-- File: DBSegmentImpl.mesa
-- Last edited by
-- MBrown on February 25, 1983 1:49 pm
-- Cattell on June 7, 1983 1:45 pm
-- Willie-Sue on February 3, 1983 10:47 am
DIRECTORY
Atom,
ConvertUnsafe,
Inline,
DBCache,
DBCommon,
DBEnvironment,
DBFile,
DBSegment,
DBSegmentPrivate,
DBStats,
DBStoragePagetags,
DBStorageTID USING[DBPageIncrement, TID],
File,
IO,
OrderedSymbolTable,
Rope USING [ROPE, Equal],
RTOS USING [GetPermanentDataPages];
DBSegmentImpl: PROGRAM
IMPORTS
Atom,
ConvertUnsafe,
Inline,
DBCommon,
DBEnvironment,
DBFile,
DBSegment,
DBStats,
IO,
FileTable: OrderedSymbolTable,
Rope,
RTOS
EXPORTS
DBSegment
SHARES
DBCache
= BEGIN
-- Local abbreviations for imported types; all refer to the shared DBCommon /
--DBCache definitions so that records here match the rest of the database package.
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
Trans: TYPE = DBCommon.Trans;
Segment: TYPE = DBCommon.Segment;
SegmentID: TYPE = DBCommon.SegmentID;
SegmentIndex: TYPE = DBCommon.SegmentIndex;
VersionOptions: TYPE = DBCommon.VersionOptions;
DBPage: TYPE = DBCommon.DBPage;
NullDBPage: DBPage = DBCommon.NullDBPage;
CacheHandle: TYPE = DBCache.CacheHandle;
CacheRecord: TYPE = DBCache.CacheRecord;
TID: TYPE = DBStorageTID.TID;
-- Chained hash table mapping DBPage -> CacheHandle; buckets are chained through
--the CacheRecord's hashChain field (see Insert/Delete/Lookup below).
HashTable: TYPE = RECORD [
nItems: NAT ← 0, -- number of CacheRecords in table
buckets: SEQUENCE size: NAT OF CacheHandle
];
-- Module state
initialized: BOOL ← FALSE; -- set once by Initialize; guards against re-initialization
createIndexesProc: PROC [s: Segment] RETURNS [TID, TID]; -- client proc (from Initialize) that builds the two root indices of a new segment
zone: ZONE; -- allocation zone for all storage allocated by this module
logReads: BOOL← FALSE; -- causes reads & writes to be logged on UserExec
files: FileTable.Table;
-- maps SegmentID to interesting information about a file.
cacheSize: NAT;
-- number of pages allocated to cache.
ht: REF HashTable;
-- Fast map from DBPage to CacheHandle. ht.size is a power of two.
hashMask: CARDINAL;
-- ht.size-1, used in computing hash function.
lruList: CacheHandle;
-- points to header node of doubly-linked list of CacheRecords.
-- Data structure invariants: A cache record is present in lruList for every
--allocated cache page; there is no separate "free list". occupied=FALSE means that
--the page is free, and that the other cache record fields (readonly, written)
--are undefined. occupied=TRUE means that the page contains valid data; a page is
--stored in ht iff occupied=TRUE.
-- lruList points to a cache record with no associated cache page; this "header node"
--makes the circular-list manipulations work properly. The most-recently used page is
--lruList.succ, and the least-recently used page is lruList.pred.
-- Sets up module state: the segment/file table, the page cache, and the client
--proc used to create root indices for newly initialized segments.
-- Must be called exactly once before any other proc in this interface;
--a second call raises InternalError.
Initialize: PUBLIC PROC [
z: ZONE,
nCachePages: NAT,
useCacheFile: ROPE, -- NOTE(review): not referenced in this body -- confirm against DBSegment interface
segmentInitProc: PROC [s: Segment] RETURNS [TID, TID]] = {
IF initialized THEN ERROR DBEnvironment.InternalError;
cacheSize ← nCachePages;
createIndexesProc ← segmentInitProc;
zone ← z;
-- The ordered table of files is keyed by segmentID; sentinels use NullDBPage.
FileTable.Initialize[
sentinel1: zone.NEW[FileTable.NodeRecord ← [segmentID: NullDBPage]],
sentinel2: zone.NEW[FileTable.NodeRecord ← [segmentID: NullDBPage]]];
files ← FileTable.CreateTable[
header: zone.NEW[FileTable.NodeRecord ← [segmentID: NullDBPage]]];
InitializeBufferMgr[];
initialized ← TRUE;
};
-- Declares (or re-declares) the binding between segment atom s and a disk file.
-- If the segment is already declared: its attributes are refreshed in place
--(allowed only while no transaction is open on it, else signals
--TransactionAlreadyOpen); a mismatched segmentID/segment raises
--MismatchedExistingSegment. Otherwise a new file node is inserted, keyed
--by segmentID. Sizes are given in bytes and converted to whole pages (min 1).
AttachSegment: PUBLIC PROC [
fileName: ROPE, s: Segment, segmentIndex: SegmentIndex,
readonly: BOOL, version: VersionOptions, initializeExistingFile: BOOL,
nBytesInitial, nBytesPerExtent: INT] = {
fileNode: FileTable.Node;
segmentID: SegmentID = SegmentIDFromSegmentIndex[segmentIndex];
IF NOT initialized THEN ERROR;
fileNode ← files.Lookup[segmentID];
IF fileNode # NIL THEN {
IF fileNode.trans # NIL THEN -- Client declared segment when transaction open on it
{SIGNAL DBEnvironment.Error[TransactionAlreadyOpen]; RETURN};
IF fileNode.segmentID # segmentID OR fileNode.segment # s THEN
ERROR DBEnvironment.Error[MismatchedExistingSegment];
-- Re-declaration: update the mutable attributes (see change log, May 19 1983).
fileNode.initializeExistingFile ← initializeExistingFile;
fileNode.fileName← fileName;
fileNode.version← version;
fileNode.readonly← readonly;
fileNode.pagesInitial← MAX[1, DBFile.PagesFromBytes[nBytesInitial]];
fileNode.pagesPerExtent← MAX[1, DBFile.PagesFromBytes[nBytesPerExtent]];
RETURN;
};
fileNode ← zone.NEW[FileTable.NodeRecord ← [
segmentID: segmentID,
segment: s,
readonly: readonly,
version: version,
initializeExistingFile: initializeExistingFile,
fileName: fileName,
pagesInitial: MAX[1, DBFile.PagesFromBytes[nBytesInitial]],
pagesPerExtent: MAX[1, DBFile.PagesFromBytes[nBytesPerExtent]]
]];
files.Insert[nodeToInsert: fileNode, insertKey: segmentID];
};
-- Opens a transaction on segment s and opens the segment's file under it.
-- Signals SegmentNotDeclared / TransactionAlreadyOpen for client errors.
-- If useTrans is NIL, a fresh transaction is created on the file's server;
--otherwise the caller-supplied transaction is used.
OpenTransaction: PUBLIC PROC [s: Segment, useTrans: Trans, noLog: BOOL] = {
fileNode: FileTable.Node = FindFile[s];
IF fileNode=NIL THEN
{SIGNAL DBEnvironment.Error[SegmentNotDeclared]; RETURN};
IF fileNode.trans # NIL THEN
{SIGNAL DBEnvironment.Error[TransactionAlreadyOpen]; RETURN};
IF useTrans = NIL THEN {
server: ROPE = DBFile.FileServerFromFileName[fileNode.fileName];
useTrans ← DBFile.CreateTransaction[server];
};
fileNode.trans ← useTrans;
OpenSegment[fileNode, noLog];
};
-- Opens fileNode's file under its (already set) transaction. A newly created
--file, or one the client asked to re-initialize, gets a fresh segment head
--page via InitializeSegment; an existing file is validated by CheckSegment.
OpenSegment: PROC [fileNode: FileTable.Node, noLog: BOOL] = {
createdFile: BOOL;
IF fileNode.trans = NIL THEN ERROR;
[fileNode.openFile, createdFile] ← DBFile.OpenFile[t: fileNode.trans, file: fileNode.fileName,
version: fileNode.version, discardFileContents: fileNode.initializeExistingFile,
nPagesInitial: fileNode.pagesInitial, noLog: noLog];
IF createdFile OR fileNode.initializeExistingFile THEN
-- initializeExistingFile is one-shot: cleared after the first (re)initialization.
{ InitializeSegment[fileNode]; fileNode.initializeExistingFile← FALSE}
ELSE CheckSegment[fileNode];
};
-- Returns the file name, segment index, and current transaction for s;
--[NIL, 0, NIL] if the segment has not been declared.
GetSegmentInfo: PUBLIC PROC [
s: Segment] RETURNS [filePath: ROPE, number: NAT, trans: Trans] = {
fileNode: FileTable.Node← FindFile[s];
IF fileNode=NIL THEN RETURN[NIL, 0, NIL]
ELSE RETURN[
fileNode.fileName,
SegmentIndexFromSegmentID[fileNode.segmentID],
fileNode.trans]
};
-- Commits or aborts transaction t. On commit, dirty cache pages belonging to t
--are written out first. Unless continuing the same transaction, cached pages
--for t are discarded and every file node open under t is detached.
FinishTransaction: PUBLIC PROC [t: Trans, abort: BOOL, continue: BOOL] = {
-- Detaches any file that was open under t (despite the name, it clears the
--node's openFile/trans fields rather than closing the file handle itself).
CloseFileIfNoTransaction: SAFE PROC [f: FileTable.Node]
RETURNS [stop: BOOL] = TRUSTED {
IF f.trans = t THEN { f.openFile ← NIL; f.trans ← NIL };
RETURN [stop: FALSE];
};
IF NOT abort AND NOT WriteoutCache[t] THEN ERROR; --<PUBLIC>
DBFile.FinishTransaction[t, abort, continue];
IF abort OR NOT continue THEN {
AbortCache[t];
files.EnumerateIncreasing[CloseFileIfNoTransaction];
};
};
-- Maps a TID to the segment that contains it; NIL if that segment is not declared.
SegmentFromTID: PUBLIC PROC [tid: TID] RETURNS [s: Segment] = {
node: FileTable.Node = files.Lookup[InlineSegmentIDFromDBPage[tid]];
RETURN [IF node = NIL THEN NIL ELSE node.segment];
};
-- TRUE iff tid lies in a declared segment whose file is currently open.
IsValidTID: PUBLIC PROC [tid: TID] RETURNS [valid: BOOL] = {
node: FileTable.Node = files.Lookup[InlineSegmentIDFromDBPage[tid]];
IF node = NIL THEN RETURN [FALSE];
RETURN [node.openFile # NIL];
};
-- Returns the segmentID for s. Raises SegmentNotDeclared if s was never
--attached, TransactionNotOpen if the segment's file is not open.
SegmentIDFromSegment: PUBLIC PROC [s: Segment] RETURNS [SegmentID] = {
fileNode: FileTable.Node = FindFile[s];
IF fileNode = NIL THEN
ERROR DBEnvironment.Error[SegmentNotDeclared];
IF fileNode.openFile = NIL THEN
ERROR DBEnvironment.Error[TransactionNotOpen];
RETURN [fileNode.segmentID];
};
-- Linear search of the file table for the node whose segment atom is s;
--returns NIL if not found (the table is keyed by segmentID, not by atom).
FindFile: PROC [s: Segment] RETURNS [fileNode: FileTable.Node] = {
-- If efficiency of this ever becomes important, we can attach info to prop list of atom.
StopOnMatch: SAFE PROC [f: FileTable.Node]
RETURNS [stop: BOOL] = TRUSTED {
IF f.segment = s THEN { fileNode ← f; RETURN [stop: TRUE] };
RETURN [stop: FALSE];
};
fileNode ← NIL;
files.EnumerateIncreasing[StopOnMatch];
RETURN [fileNode];
};
-- Calls enumProc for each declared segment in increasing segmentID order,
--stopping early if enumProc returns stop=TRUE.
EnumerateSegments: PUBLIC PROC [
enumProc: PROC [s: Segment, segmentIndex: SegmentIndex] RETURNS [stop: BOOL]] = {
f: FileTable.Node ← files.LookupSmallest[];
WHILE f # NIL DO
IF enumProc[f.segment, SegmentIndexFromSegmentID[f.segmentID]].stop THEN EXIT;
f ← files.LookupNextLarger[f.segmentID];
ENDLOOP;
};
-- Decomposes dbPage into its segment's open file handle, the page number
--within that file, and the segment's readonly flag. Raises TransactionNotOpen
--if the segment is undeclared or its file is not open.
FilePageFromDBPage: PROC [dbPage: DBPage]
RETURNS [readonly: BOOL, file: DBCommon.OpenFileHandle, page: CARDINAL] = {
segmentID: SegmentID;
fileNode: FileTable.Node;
[segmentID, page] ← DecomposeDBPage[dbPage];
fileNode ← files.Lookup[segmentID];
IF fileNode = NIL OR (file ← fileNode.openFile) = NIL THEN
ERROR DBEnvironment.Error[TransactionNotOpen];
RETURN [fileNode.readonly, file, page];
};
-- Procs that understand the format of the segment head page.
-- Writes a fresh segment head page (page 0 of the file): block allocator set to
--cover the whole rest of the file, segment name recorded, and the two root
--indices created via the client's createIndexesProc.
InitializeSegment: PROC [fileNode: FileTable.Node] = {
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
fileSize: CARDINAL = DBFile.GetSize[fileNode.openFile];
fileNode.readonly ← FALSE; -- a segment being (re)initialized is necessarily writable
[segmentHeadHandle, segmentHead] ← DBSegment.WritePage[fileNode.segmentID, NIL];
-- Allocator initially owns pages [1..fileSize); page 0 is the head page itself.
segmentHead↑ ← [
segmentID: fileNode.segmentID,
allocator: [
firstPageInBlock: ConsDBPage[fileNode.segmentID, 1],
nPagesInBlock: fileSize-1,
pagesPerExtent: fileNode.pagesPerExtent]];
ConvertUnsafe.AppendRope[
to: LOOPHOLE[@(segmentHead.name)], from: Atom.GetPName[fileNode.segment]];
[segmentHead.index1, segmentHead.index2] ← createIndexesProc[fileNode.segment];
DBSegment.UnlockPage[segmentHeadHandle];
};
-- Validates an existing file's segment head page: page tag, software
--compatibility version, seal, and (case-insensitively) the stored segment
--name, raising MismatchedExistingSegment on any discrepancy.
CheckSegment: PROC [fileNode: FileTable.Node] = {
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
[segmentHeadHandle, segmentHead] ← DBSegment.ReadPage[fileNode.segmentID, NIL];
IF segmentHead.tag # DBStoragePagetags.AMap OR
segmentHead.softwareCompatibilityVersion #
DBCommon.softwareCompatibilityVersion OR
segmentHead.seal # DBSegmentPrivate.Seal THEN
ERROR DBEnvironment.Error[MismatchedExistingSegment];
IF NOT Rope.Equal[
s1: Atom.GetPName[fileNode.segment],
s2: ConvertUnsafe.ToRope[LOOPHOLE[@(segmentHead.name)]],
case: FALSE] THEN ERROR DBEnvironment.Error[MismatchedExistingSegment];
DBSegment.UnlockPage[segmentHeadHandle];
};
-- Reads the two root index TIDs stored in s's segment head page.
-- Raises SegmentNotDeclared if s is unknown, MismatchedExistingSegment if the
--head page does not carry the expected AMap tag.
RootIndicesFromSegment: PUBLIC PROC [s: Segment] RETURNS [index1, index2: TID] = {
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
fileNode: FileTable.Node = FindFile[s];
IF fileNode=NIL THEN ERROR DBEnvironment.Error[SegmentNotDeclared];
[segmentHeadHandle, segmentHead] ← DBSegment.ReadPage[fileNode.segmentID, NIL];
IF segmentHead.tag # DBStoragePagetags.AMap THEN
ERROR DBEnvironment.Error[MismatchedExistingSegment];
index1 ← segmentHead.index1; index2 ← segmentHead.index2;
DBSegment.UnlockPage[segmentHeadHandle];
RETURN [index1, index2];
};
-- Allocates one page in segment s (s is the segment's head-page DBPage).
-- Preference order: (1) pop the segment's free list; (2) take the next page of
--the current contiguous block, extending the file by pagesPerExtent pages when
--the block is exhausted. The page is returned locked, tagged Unused, and
--marked written (via WritePage/NewPage).
AllocPage: PUBLIC PROC [s: DBPage] RETURNS [allocatedPage: DBPage,
allocatedPageHandle: CacheHandle, allocatedPagePtr: LONG POINTER] = {
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
DBStats.Inc[SegmentAllocPage];
[segmentHeadHandle, segmentHead] ← DBSegment.WritePage[s, NIL];
IF segmentHead.tag # DBStoragePagetags.AMap THEN
ERROR DBEnvironment.Error[MismatchedExistingSegment];
IF (allocatedPage ← segmentHead.allocator.freeList) # NullDBPage THEN {
-- The segment's free list is nonempty, so return a page from it.
allocatedFreeListPagePtr: LONG POINTER TO DBSegmentPrivate.FreeListPage;
[allocatedPageHandle, allocatedFreeListPagePtr] ← DBSegment.WritePage[allocatedPage, NIL];
IF allocatedFreeListPagePtr.tag # DBStoragePagetags.Free THEN ERROR DBEnvironment.InternalError;
segmentHead.allocator.freeList ← allocatedFreeListPagePtr.next;
allocatedPagePtr ← allocatedFreeListPagePtr;
}
ELSE {
-- The segment's free list is empty, so allocate from block allocator, perhaps extending file.
IF segmentHead.allocator.firstPageInBlock = NullDBPage THEN {
-- Block exhausted: grow the file and make the new extent the current block.
fileNode: FileTable.Node ← files.Lookup[s];
size: CARDINAL ← DBFile.GetSize[fileNode.openFile];
DBFile.SetSize[fileNode.openFile, size + segmentHead.allocator.pagesPerExtent];
segmentHead.allocator.firstPageInBlock ← ConsDBPage[s, size];
segmentHead.allocator.nPagesInBlock ← segmentHead.allocator.pagesPerExtent;
};
allocatedPage ← segmentHead.allocator.firstPageInBlock;
-- Note: nPagesInBlock is decremented and tested in one expression.
IF (segmentHead.allocator.nPagesInBlock ← segmentHead.allocator.nPagesInBlock-1) = 0 THEN
segmentHead.allocator.firstPageInBlock ← NullDBPage
ELSE segmentHead.allocator.firstPageInBlock ←
allocatedPage + DBStorageTID.DBPageIncrement;
-- NewPage (not WritePage): the page's disk contents are garbage, so don't read them.
[allocatedPageHandle, allocatedPagePtr] ← NewPage[allocatedPage];
};
DBSegment.UnlockPage[segmentHeadHandle];
LOOPHOLE[allocatedPagePtr, LONG POINTER TO DBStoragePagetags.PageHeader].pageTag ←
DBStoragePagetags.Unused;
RETURN[allocatedPage, allocatedPageHandle, allocatedPagePtr];
};--AllocPage
-- Returns page freePg (with valid cache handle freeHint, lock count exactly 1)
--to segment s's free list, pushing it on the front. InternalError if freePg is
--not in s, is already free, is the segment head page, or has other locks.
FreePage: PUBLIC PROC [s: DBPage, freePg: DBPage, freeHint: CacheHandle] = {
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
free: LONG POINTER TO DBSegmentPrivate.FreeListPage;
DBStats.Inc[SegmentFreePage];
IF s # InlineSegmentIDFromDBPage[freePg] THEN ERROR DBEnvironment.InternalError;
[segmentHeadHandle, segmentHead] ← DBSegment.WritePage[s, NIL];
IF segmentHead.tag # DBStoragePagetags.AMap THEN ERROR DBEnvironment.InternalError;
IF DBSegment.LockCount[freePg, freeHint] # 1 THEN ERROR DBEnvironment.InternalError;
free ← DBSegment.ObtainCoreLoc[freePg, freeHint];
IF free.tag = DBStoragePagetags.Free OR free.tag = DBStoragePagetags.AMap THEN
ERROR DBEnvironment.InternalError;
DBSegment.WriteLockedPage[freeHint];
-- Overwrite the page as a free-list cell linking to the old list head.
free↑ ← [next: segmentHead.allocator.freeList];
segmentHead.allocator.freeList ← freePg;
DBSegment.UnlockPage[freeHint];
DBSegment.UnlockPage[segmentHeadHandle];
};--FreePage
-- Procs that understand the bit layout of a DBPage.
-- Builds a SegmentID from a small segment index: the index is placed in the
--high bits of the 32-bit value (shifted left 7 within the high halfword,
--low halfword zero); inverse of SegmentIndexFromSegmentID.
SegmentIDFromSegmentIndex: PROC [segmentIndex: SegmentIndex]
RETURNS [SegmentID] = {
segmentID: Inline.LongNumber.num = [num[lowbits: 0,
highbits: Inline.BITSHIFT[value: segmentIndex, count: 7]]];
RETURN [LOOPHOLE[segmentID]];
};
-- Recovers the segment index from a SegmentID: inverse of
--SegmentIDFromSegmentIndex (right-shift of the high halfword by 7).
SegmentIndexFromSegmentID: PROC [segmentID: SegmentID]
RETURNS [SegmentIndex] = {
RETURN [Inline.BITSHIFT[
value: LOOPHOLE[segmentID, Inline.LongNumber.num].highbits,
count: -7]];
};
-- Public (non-inline) wrapper for InlineSegmentIDFromDBPage.
SegmentIDFromDBPage: PUBLIC PROC [dbPage: DBPage]
RETURNS [SegmentID] = {
RETURN [InlineSegmentIDFromDBPage[dbPage]]
};
-- Extracts the SegmentID from a DBPage by masking the high halfword with
--177600B (keeps the top 9 bits, the segment field); low halfword is zeroed.
InlineSegmentIDFromDBPage: PROC [dbPage: DBPage]
RETURNS [SegmentID] = INLINE {
segmentID: Inline.LongNumber.num = [num[lowbits: 0,
highbits: Inline.BITAND[LOOPHOLE[dbPage, Inline.LongNumber.num].highbits, 177600B]]];
RETURN [LOOPHOLE[segmentID]];
};
-- Splits a DBPage into [segmentID, file page number]; inverse of ConsDBPage.
-- The page number is reassembled from the low bits of the high halfword
--(shifted left 10) and the high bits of the low halfword (shifted right 6).
DecomposeDBPage: PROC [dbPage: DBPage]
RETURNS [SegmentID, CARDINAL] = {
segmentID1: Inline.LongNumber.num = [num[lowbits: 0,
highbits: Inline.BITAND[LOOPHOLE[dbPage, Inline.LongNumber.num].highbits, 177600B]]];
filePage: CARDINAL ← Inline.BITOR[
Inline.BITSHIFT[
value: LOOPHOLE[dbPage, Inline.LongNumber.num].highbits, count: 10],
Inline.BITSHIFT[
value: LOOPHOLE[dbPage, Inline.LongNumber.num].lowbits, count: -6]];
RETURN [LOOPHOLE[segmentID1], filePage];
};
-- Packs a segmentID and file page number into one DBPage.
ConsDBPage: PROC [segmentID: SegmentID, filePage: CARDINAL]
RETURNS [dbPage: DBPage] = {
-- Inverse of DecomposeDBPage
dbPage1: Inline.LongNumber.num = [num[
highbits: Inline.BITOR[
LOOPHOLE[segmentID, Inline.LongNumber.num].highbits,
Inline.BITSHIFT[value: filePage, count: -10]],
lowbits: Inline.BITSHIFT[value: filePage, count: 6]]];
RETURN[LOOPHOLE[dbPage1]];
};
-- Cache hash function: XOR of the segment field (highbits>>7) and the page
--field (lowbits>>6), masked by hashMask (ht.size-1, ht.size a power of two).
Hash: PROC [dbPage: DBPage] RETURNS [NAT] = INLINE { RETURN [
Inline.BITAND[
hashMask,
Inline.BITXOR[
Inline.BITSHIFT[
value: LOOPHOLE[dbPage, Inline.LongNumber.num].highbits,
count: -7],
Inline.BITSHIFT[
value: LOOPHOLE[dbPage, Inline.LongNumber.num].lowbits,
count: -6]]]];
};
-- Buffer manager
-- Key stored in unoccupied cache records so they never match a Lookup.
invalidKey: DBPage = 37777777777B;
-- doesn't match any true DBPage (since all DBPages have low bits 0).
-- Builds the buffer manager: hash table, LRU header node, and cacheSize cache
--pages (allocated in groups of 16).
InitializeBufferMgr: PROC [] = {
-- Number of hash buckets = smallest power of 2 not less than cacheSize.
-- NOTE(review): the loop below actually yields the largest power of 2 not
--exceeding cacheSize (e.g. cacheSize=5 gives 4) -- harmless for hashing since
--hashMask stays consistent, but the comment above overstates it; confirm intent.
hashBuckets: CARDINAL ← 1;
temp: CARDINAL ← Inline.BITSHIFT[cacheSize, -1];
UNTIL temp=0 DO
hashBuckets ← Inline.BITSHIFT[hashBuckets, 1];
temp ← Inline.BITSHIFT[temp, -1];
ENDLOOP;
ht ← zone.NEW[HashTable[hashBuckets] ← [nItems: 0, buckets: ]];
hashMask ← hashBuckets-1;
-- Create cacheRecords, format them into an lru list.
lruList ← zone.NEW[CacheRecord ← [dbPage: invalidKey, pagePtr: NIL]];
lruList.pred ← lruList.succ ← lruList; -- header node: initially an empty circular list
FOR i: NAT IN [0 .. (cacheSize+15)/16) DO
AddCachePages[16];
ENDLOOP;
};
-- Returns a locked cache handle and core pointer for page p, for reading.
-- pHint is an optional guess at the page's cache record; if it misses, the hash
--table is searched; if the page is absent it is read from its file into a
--recycled cache page. Lock count is incremented; caller must UnlockPage.
ReadPage: PUBLIC PROC [p: DBPage, pHint: CacheHandle]
RETURNS [CacheHandle, LONG POINTER] = {
IF logReads THEN
DBCommon.GetDebugStream[].PutChar['#]; -- '#' logs every ReadPage call
DBStats.Inc[CacheReadOrWrite];
IF pHint # NIL AND p = pHint.dbPage THEN GOTO ReturnPage;
-- hint is wrong, so search hash table
pHint ← Lookup[p];
IF pHint # NIL THEN GO TO ReturnPage;
-- page is not present in cache, read it in
IF logReads THEN
DBCommon.GetDebugStream[].PutChar['!]; -- '!' logs a cache miss
DBStats.Inc[CacheMiss];
pHint ← GetUnoccupiedPage[];
{ -- We expect ERRORs to be raised from DBFile.ReadFilePage
file: DBCommon.OpenFileHandle; page: CARDINAL;
[pHint.readonly, file, page] ← FilePageFromDBPage[p];
DBFile.ReadFilePage[file, page, pHint.pagePtr]
};
-- Only now (after a successful read) enter the page in the hash table, so an
--ERROR above cannot leave a half-initialized occupied record (see change log).
pHint.dbPage ← p; Insert[pHint];
pHint.lockCount ← 0;
GOTO ReturnPage;
EXITS
ReturnPage => {
MakeMRU[pHint];
pHint.lockCount ← pHint.lockCount + 1;
RETURN[pHint, pHint.pagePtr]
}
};
-- Like ReadPage, but for writing: additionally refuses readonly segments
--(WriteNotAllowed) and marks the returned page written (dirty).
WritePage: PUBLIC PROC [p: DBPage, pHint: CacheHandle]
RETURNS [CacheHandle, LONG POINTER] = {
DBStats.Inc[CacheReadOrWrite];
IF pHint # NIL AND p = pHint.dbPage THEN GO TO ReturnPage;
-- hint is wrong, so search hash table
pHint ← Lookup[p];
IF pHint # NIL THEN GOTO ReturnPage;
-- page is not present in cache, read it in
DBStats.Inc[CacheMiss];
pHint ← GetUnoccupiedPage[];
{ -- We expect ERRORs to be raised from DBFile.ReadFilePage
file: DBCommon.OpenFileHandle; page: CARDINAL;
[pHint.readonly, file, page] ← FilePageFromDBPage[p];
IF pHint.readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
DBFile.ReadFilePage[file, page, pHint.pagePtr]
};
-- Insert only after a successful read; see note in ReadPage.
pHint.dbPage ← p; Insert[pHint];
pHint.lockCount ← 0;
GOTO ReturnPage;
EXITS
ReturnPage => {
MakeMRU[pHint];
pHint.lockCount ← pHint.lockCount + 1;
pHint.written ← TRUE;
RETURN[pHint, pHint.pagePtr]
}
};
-- Like WritePage, but for a page whose disk contents are irrelevant (a freshly
--allocated page): the page is NOT read from the file. The page must not already
--be cached (InternalError otherwise). Returned locked (lockCount=1) and dirty.
NewPage: PROC [p: DBPage]
RETURNS [CacheHandle, LONG POINTER] = {
pHint: CacheHandle ← Lookup[p];
IF pHint # NIL THEN ERROR DBEnvironment.InternalError; -- [NotaNewPage]
pHint ← GetUnoccupiedPage[];
{
[pHint.readonly, , ] ← FilePageFromDBPage[p];
-- BUG FIX: the ERROR keyword was missing here, inconsistent with the identical
--guard in WritePage; the readonly violation must be raised, not fall through.
IF pHint.readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
};
pHint.dbPage ← p; Insert[pHint];
MakeMRU[pHint];
pHint.lockCount ← 1;
pHint.written ← TRUE;
RETURN[pHint, pHint.pagePtr]
};
-- Marks an already-locked page dirty so it will be written out at commit.
-- InternalError if the handle is NIL or unlocked; WriteNotAllowed if readonly.
WriteLockedPage: PUBLIC PROC [pValidHint: CacheHandle] = {
IF pValidHint = NIL OR pValidHint.lockCount = 0 THEN
ERROR DBEnvironment.InternalError; -- [AlreadyUnlocked]
IF pValidHint.readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
pValidHint.written ← TRUE;
};
-- Returns the lock count of page p; the handle must be valid (matches p)
--and the page locked, else InternalError.
LockCount: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle]
RETURNS [CARDINAL] = {
IF pValidHint = NIL OR pValidHint.dbPage # p OR pValidHint.lockCount = 0 THEN
ERROR DBEnvironment.InternalError; -- BadCacheHandle
RETURN[pValidHint.lockCount];
};
-- Returns the core address of locked page p; validity checks as in LockCount.
ObtainCoreLoc: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle]
RETURNS [LONG POINTER] = {
IF pValidHint = NIL OR pValidHint.dbPage # p OR pValidHint.lockCount = 0 THEN
ERROR DBEnvironment.InternalError; -- [BadCacheHandle];
RETURN[pValidHint.pagePtr];
};
-- Decrements a page's lock count; InternalError if already unlocked.
UnlockPage: PUBLIC PROC [pValidHint: CacheHandle] = {
IF pValidHint = NIL OR pValidHint.lockCount = 0 THEN
ERROR DBEnvironment.InternalError; -- [AlreadyUnlocked];
pValidHint.lockCount ← pValidHint.lockCount - 1;
};
-- Writes every dirty cache page belonging to transaction t back to its file.
-- Always returns TRUE (the success result is vestigial as written).
WriteoutCache: PROC [t: Trans] RETURNS [success: BOOL] = {
WritePageOutIfUsesTrans: PROC [cacheHandle: CacheHandle] = {
IF cacheHandle.written THEN {
-- NOTE(review): assumes a written page's segment is still in files
--(fileNode # NIL); confirm no path detaches a segment with dirty pages.
fileNode: FileTable.Node =
files.Lookup[InlineSegmentIDFromDBPage[cacheHandle.dbPage]];
IF fileNode.trans = t THEN WritePageOut[cacheHandle];
};
};
EnumerateHashTable[WritePageOutIfUsesTrans];
RETURN[TRUE];
};
-- Discards (without writing) every cached page belonging to transaction t.
-- Uses EnumerateLRUList because Delete may not be performed during a hash
--table enumeration (see EnumerateHashTable).
AbortCache: PROC [t: Trans] = {
DeleteIfUsesTrans: PROC [cacheHandle: CacheHandle] = {
IF cacheHandle.occupied THEN {
fileNode: FileTable.Node =
files.Lookup[InlineSegmentIDFromDBPage[cacheHandle.dbPage]];
IF fileNode.trans = t THEN Delete[cacheHandle];
};
};
EnumerateLRUList[DeleteIfUsesTrans];
};
-- Writes one cache page back to its file page and clears its dirty bit.
-- WriteNotAllowed if the page's segment is readonly (should not happen for a
--dirty page; see WritePage/WriteLockedPage guards).
WritePageOut: PROC [cacheHandle: CacheHandle] = {
readonly: BOOL; file: DBCommon.OpenFileHandle; page: CARDINAL;
DBStats.Inc[CacheWritePageBack];
[readonly, file, page] ← FilePageFromDBPage[cacheHandle.dbPage];
IF readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
DBFile.WriteFilePage[file, page, cacheHandle.pagePtr];
cacheHandle.written ← FALSE;
};
-- Buffer manager LRU list
-- creates nPages new cache records, in the LRU position.
-- Backing storage is one permanent allocation carved into page-sized pieces.
AddCachePages: PROC [nPages: NAT] = {
curCachePage: LONG POINTER ← RTOS.GetPermanentDataPages[
nPages: nPages*DBCommon.PagesPerDBPage, createUniformSwapUnits: TRUE];
FOR i: NAT IN [0..nPages) DO
p: CacheHandle ← zone.NEW[CacheRecord ← [
dbPage: invalidKey, pagePtr: curCachePage]];
curCachePage ← curCachePage + DBCommon.WordsPerPage;
-- Link p in just before the header node, i.e. at the LRU end.
p.pred ← lruList.pred; p.succ ← lruList;
lruList.pred.succ ← p; lruList.pred ← p;
ENDLOOP;
};
MakeMRU: PROC [x: CacheHandle] = INLINE {
-- Assumes that x is part of lruList. Deletes x from its current position and makes
--it first in lruList.
x.pred.succ ← x.succ; x.succ.pred ← x.pred; -- unlink x
x.succ ← lruList.succ; x.pred ← lruList; -- relink just after the header
lruList.succ.pred ← x; lruList.succ ← x;
};
EnumerateLRUList: PROC [procToCall: PROC[CacheHandle]] = {
-- Calls procToCall for each record in LRUList, in MRU to LRU order.
-- procToCall may not perform MakeMRU; use EnumerateHashTable if you want this.
-- Note that procToCall will be called for both occupied and unoccupied pages.
-- (Delete is safe here, since Delete does not disturb the LRU links.)
FOR p: CacheHandle ← lruList.succ, p.succ UNTIL p = lruList DO
procToCall[p];
ENDLOOP;
};
GetUnoccupiedPage: PROC RETURNS [CacheHandle] = {
-- Obtain an empty cache page. It will be the least-recently used page that
--unoccupied or occupied but unlocked. The page is returned with Key = garbage,
--Page = ptr to a cache page, occupied = FALSE, written = FALSE,
--and entered on lruList in no particular location.
-- Scans from the LRU end (lruList.pred) toward MRU; an occupied unlocked
--victim is flushed if dirty, then removed from the hash table.
cacheHandle: CacheHandle;
FOR cacheHandle ← lruList.pred, cacheHandle.pred UNTIL cacheHandle = lruList DO
IF ~cacheHandle.occupied THEN GOTO FoundUnoccupiedPage;
IF cacheHandle.lockCount = 0 THEN GOTO MakePageUnoccupied;
REPEAT
FoundUnoccupiedPage => NULL; -- ~cacheHandle.occupied
MakePageUnoccupied => { -- cacheHandle.occupied AND cacheHandle.lockCount=0
IF cacheHandle.written THEN WritePageOut[cacheHandle];
Delete[cacheHandle];
};
FINISHED => ERROR DBEnvironment.Fatal[AllCachePagesLocked]
ENDLOOP;
cacheHandle.written ← FALSE;
RETURN[cacheHandle];
};
-- Buffer manager hash table
EnumerateHashTable: PROC[procToCall: PROC[CacheHandle]] = {
-- Calls procToCall for each record in table.
-- procToCall may not perform Delete; use EnumerateLRUList if you want this.
-- (Delete would rewrite the hashChain links this enumeration is following.)
FOR i: NAT IN [0..ht.size) DO
FOR p: CacheHandle ← ht.buckets[i], p.hashChain UNTIL p = NIL DO
procToCall[p];
ENDLOOP;
ENDLOOP;
};
-- Returns the cache record holding dbPage, or NIL if the page is not cached.
-- Walks the hash chain of dbPage's bucket; counts probes beyond the first.
Lookup: PROC[dbPage: DBPage] RETURNS[CacheHandle] = {
bucket: CARDINAL ← Hash[dbPage];
DBStats.Inc[CacheHTLookup];
FOR p: CacheHandle ← ht.buckets[bucket], p.hashChain UNTIL p = NIL DO
IF p.dbPage = dbPage THEN RETURN[p];
DBStats.Inc[CacheHTConflictInLookup];
ENDLOOP;
RETURN[NIL];
};
-- Enters x (whose dbPage is already set) into the hash table, marking it
--occupied. InternalError if a record for the same dbPage is already present.
Insert: PROC[x: CacheHandle] = {
i: CARDINAL ← Hash[x.dbPage];
-- Make sure its not there already...
p: CacheHandle ← ht.buckets[i];
WHILE p # NIL DO
IF p.dbPage = x.dbPage THEN GOTO FoundDuplicate;
p ← p.hashChain;
REPEAT
FoundDuplicate => ERROR DBEnvironment.InternalError; -- [AlreadyInHashTable];
FINISHED => {
-- Push x on the front of its bucket's chain.
x.hashChain ← ht.buckets[i];
ht.buckets[i] ← x;
ht.nItems ← ht.nItems + 1;
x.occupied ← TRUE;
};
ENDLOOP;
};
-- Removes x from the hash table and resets it to the unoccupied state
--(dbPage = invalidKey). InternalError if x is not in the table.
-- Does not touch the LRU links, so it is safe during EnumerateLRUList.
Delete: PROC[x: CacheHandle] = {
i: CARDINAL ← Hash[x.dbPage];
p: CacheHandle ← ht.buckets[i];
IF p = x THEN {ht.buckets[i] ← x.hashChain; GOTO Deleted}; -- x heads its chain
WHILE p # NIL DO
IF p.hashChain = x THEN {p.hashChain ← x.hashChain; GOTO Deleted};
p ← p.hashChain;
REPEAT
FINISHED => ERROR DBEnvironment.InternalError; -- [NotPresentInHashTable];
ENDLOOP;
EXITS
Deleted => {
x.dbPage ← invalidKey;
x.occupied ← FALSE;
x.hashChain ← NIL;
ht.nItems ← ht.nItems - 1;
};
};
-- Buffer manager debugging
-- Consistency-check support: Assert raises a resumable SIGNAL rather than an
--ERROR so the debugger can Proceed past a failed check.
AssertionFailed: SIGNAL = CODE;
Assert: PROC [assertion: BOOL] = {IF ~assertion THEN SIGNAL AssertionFailed; };
-- So we can Proceed (in debugger) from failure.
-- Verifies the buffer manager's invariants (LRU links consistent, occupied
--counts agree between LRU list and hash table, free+occupied = cacheSize),
--optionally printing a per-page report to the debug stream. With
--printOnlyLockedPages, only locked pages are listed.
CheckState: PUBLIC PROC [doPrinting, printOnlyLockedPages: BOOL] = {
nOccupiedItems, checkOccupiedItems: CARDINAL ← 0;
nLockedItems: CARDINAL ← 0;
nFreeItems: CARDINAL ← 0;
PageHeader: TYPE = MACHINE DEPENDENT RECORD[
pageTag: CARDINAL [0..256),
dontCare: CARDINAL [0..256)];
-- Helpers to print REFs and POINTERs as numbers.
IOref: PROC[p: REF ANY] RETURNS[IO.Value] = INLINE {
RETURN[IO.card[LOOPHOLE[p, LONG CARDINAL]]]};
IOptr: PROC[p: LONG POINTER] RETURNS[IO.Value] = INLINE {
RETURN[IO.card[LOOPHOLE[p, LONG CARDINAL]]]};
out: IO.Handle ← IF doPrinting THEN DBCommon.GetDebugStream[] ELSE NIL;
-- Pass 1 (LRU order): check list links, count free/occupied/locked, print.
CheckEntry1: PROC[p: CacheHandle] = {
Assert[p.pred.succ = p];
Assert[p.succ.pred = p];
Assert[p.pagePtr # NIL];
IF ~p.occupied THEN {
nFreeItems ← nFreeItems + 1;
Assert[p.dbPage = invalidKey];
RETURN };
Assert[p.dbPage # invalidKey];
nOccupiedItems ← nOccupiedItems + 1;
IF p.lockCount > 0 THEN nLockedItems ← nLockedItems + 1;
IF p.written THEN Assert[NOT p.readonly];
IF ~doPrinting OR (p.lockCount = 0 AND printOnlyLockedPages) THEN RETURN;
{pageTag: CARDINAL = LOOPHOLE[p.pagePtr, LONG POINTER TO PageHeader].pageTag;
out.PutF["%11bB %11bB %11bB %3d",
IOref[p], IO.card[p.dbPage], IOptr[p.pagePtr], IO.card[pageTag]];
-- BUG FIX: format ended "*n" (old-Mesa escape); this file's other format
--strings use "\n", so the line never terminated -- corrected to "\n".
out.PutF[" %3d %g %g\n",
IO.card[p.lockCount], IO.bool[p.written], IO.bool[p.readonly]];
};
};
-- Pass 2 (hash table): every table entry must be occupied; recount them.
CheckEntry2: PROC[p: CacheHandle] = {
Assert[p.occupied];
checkOccupiedItems ← checkOccupiedItems + 1;
};
IF doPrinting THEN {
out.PutF["Cache size %g pages.\n", IO.card[cacheSize]];
out.PutRope[
" cacheHdl dbPage pagePtr tag lockCnt written readonly\n"];
};
EnumerateLRUList[CheckEntry1];
EnumerateHashTable[CheckEntry2];
IF doPrinting THEN
out.PutF["%g pages occupied, %g are locked\n",
IO.card[nOccupiedItems], IO.card[nLockedItems]];
Assert[nOccupiedItems = checkOccupiedItems];
Assert[ht.nItems = checkOccupiedItems];
Assert[nFreeItems + nOccupiedItems = cacheSize];
};
END.--DBSegmentImpl
CHANGE LOG
Created by MBrown on December 10, 1979 5:09 PM
Changed by MBrown on December 1, 1982 9:41 am
-- Total rewrite for multiple segments, merging in DBCacheImpl. No more support for module
--finalization. Discarded >3 pages of change log.
Changed by Cattell on January 16, 1983 2:35 pm
-- Added some more meaningful signals for various client errors for segments and transactions,
--in conjunction with new DBEnvironment interface replacing DBException.
Changed by Cattell on January 17, 1983 8:21 pm
-- AttachSegment didn't put readOnly and version into fileNode when segment re-declared, so couldn't re-open transaction with version to create new file.
Changed by Cattell on January 19, 1983 10:57 am
-- The code didn't check for FindFile returning NIL in a couple places, resulting in address faults if the segment was not declared. Put in checks and signal generation.
Changed by Willie-Sue on February 3, 1983
-- Added noLog arg to OpenTransaction and OpenSegment
Changed by MBrown on February 25, 1983 1:39 pm
-- Fixed bug in ReadPage, WritePage, NewPage: buffer left in "occupied" state after
--error (e.g. attempt to read after transaction abort).
Changed by Cattell & Donahue on May 19, 1983 2:26 pm
-- Changed AttachSegment so can re-call it with new file name for same segment atom.