-- File: DBSegmentImpl.mesa
-- Last edited by
--   MBrown on February 25, 1983 1:49 pm
--   Cattell on June 7, 1983 1:45 pm
--   Willie-Sue on February 3, 1983 10:47 am

DIRECTORY
  Atom,
  ConvertUnsafe,
  Inline,
  DBCache,
  DBCommon,
  DBEnvironment,
  DBFile,
  DBSegment,
  DBSegmentPrivate,
  DBStats,
  DBStoragePagetags,
  DBStorageTID USING[DBPageIncrement, TID],
  File,
  IO,
  OrderedSymbolTable,
  Rope USING [ROPE, Equal],
  RTOS USING [GetPermanentDataPages];

DBSegmentImpl: PROGRAM
  IMPORTS
    Atom, ConvertUnsafe, Inline, DBCommon, DBEnvironment, DBFile, DBSegment, DBStats, IO,
    FileTable: OrderedSymbolTable, Rope, RTOS
  EXPORTS DBSegment
  SHARES DBCache
  = BEGIN

ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
Trans: TYPE = DBCommon.Trans;
Segment: TYPE = DBCommon.Segment;
SegmentID: TYPE = DBCommon.SegmentID;
SegmentIndex: TYPE = DBCommon.SegmentIndex;
VersionOptions: TYPE = DBCommon.VersionOptions;
DBPage: TYPE = DBCommon.DBPage;
NullDBPage: DBPage = DBCommon.NullDBPage;
CacheHandle: TYPE = DBCache.CacheHandle;
CacheRecord: TYPE = DBCache.CacheRecord;
TID: TYPE = DBStorageTID.TID;

HashTable: TYPE = RECORD [
  nItems: NAT ← 0, -- number of CacheRecords in table
  buckets: SEQUENCE size: NAT OF CacheHandle
  ];

-- Module state

initialized: BOOL ← FALSE;
  -- set by Initialize; AttachSegment refuses to run until it is TRUE.
createIndexesProc: PROC [s: Segment] RETURNS [TID, TID];
  -- supplied to Initialize; called by InitializeSegment to obtain the two root index TIDs.
zone: ZONE;
  -- all REF storage allocated by this module comes from here.
logReads: BOOL← FALSE; -- causes reads & writes to be logged on UserExec
files: FileTable.Table;
  -- maps SegmentID to interesting information about a file.
cacheSize: NAT;
  -- number of pages allocated to cache.
ht: REF HashTable;
  -- Fast map from DBPage to CacheHandle. ht.size is a power of two.
hashMask: CARDINAL;
  -- ht.size-1, used in computing hash function.
lruList: CacheHandle;
  -- points to header node of doubly-linked list of CacheRecords.

-- Data structure invariants: A cache record is present in lruList for every
--allocated cache page; there is no separate "free list". occupied=FALSE means that
--the page is free, and that the other cache record fields (readonly, written)
--are undefined. occupied=TRUE means that the page contains valid data; a page is
--stored in ht iff occupied=TRUE.
-- lruList points to a cache record with no associated cache page; this "header node"
--makes the circular-list manipulations work properly. The most-recently used page is
--lruList.succ, and the least-recently used page is lruList.pred.


Initialize: PUBLIC PROC [
  z: ZONE,
  nCachePages: NAT,
  useCacheFile: ROPE,
  segmentInitProc: PROC [s: Segment] RETURNS [TID, TID]] = {
  -- One-time module setup: records the zone, the requested cache size and the index
  --creation callback, creates the (empty) file table, and builds the buffer manager.
  -- Raises DBEnvironment.InternalError if called a second time.
  -- useCacheFile is not referenced by this procedure.
  IF initialized THEN ERROR DBEnvironment.InternalError;
  zone ← z;
  createIndexesProc ← segmentInitProc;
  cacheSize ← nCachePages;
  FileTable.Initialize[
    sentinel1: zone.NEW[FileTable.NodeRecord ← [segmentID: NullDBPage]],
    sentinel2: zone.NEW[FileTable.NodeRecord ← [segmentID: NullDBPage]]];
  files ← FileTable.CreateTable[
    header: zone.NEW[FileTable.NodeRecord ← [segmentID: NullDBPage]]];
  InitializeBufferMgr[];
  initialized ← TRUE;
  };

AttachSegment: PUBLIC PROC [
  fileName: ROPE, s: Segment, segmentIndex: SegmentIndex, readonly: BOOL,
  version: VersionOptions, initializeExistingFile: BOOL,
  nBytesInitial, nBytesPerExtent: INT] = {
  -- Declares (or re-declares) segment s as living in file fileName.
  -- First declaration: a new file table node is created and inserted.
  -- Re-declaration: the existing node is updated in place, provided no transaction is
  --open on it and it names the same segment atom.
  -- Both page counts are clamped to be at least 1.
  segmentID: SegmentID = SegmentIDFromSegmentIndex[segmentIndex];
  node: FileTable.Node;
  IF NOT initialized THEN ERROR;
  node ← files.Lookup[segmentID];
  IF node = NIL THEN {
    -- No entry for this segment index yet, so make one.
    node ← zone.NEW[FileTable.NodeRecord ← [
      segmentID: segmentID,
      segment: s,
      readonly: readonly,
      version: version,
      initializeExistingFile: initializeExistingFile,
      fileName: fileName,
      pagesInitial: MAX[1, DBFile.PagesFromBytes[nBytesInitial]],
      pagesPerExtent: MAX[1, DBFile.PagesFromBytes[nBytesPerExtent]]
      ]];
    files.Insert[nodeToInsert: node, insertKey: segmentID];
    RETURN;
    };
  IF node.trans # NIL THEN {
    -- Client declared segment when transaction open on it
    SIGNAL DBEnvironment.Error[TransactionAlreadyOpen];
    RETURN;
    };
  IF node.segmentID # segmentID OR node.segment # s THEN
    ERROR DBEnvironment.Error[MismatchedExistingSegment];
  node.initializeExistingFile ← initializeExistingFile;
  node.fileName← fileName;
  node.version← version;
  node.readonly← readonly;
  node.pagesInitial← MAX[1, DBFile.PagesFromBytes[nBytesInitial]];
  node.pagesPerExtent← MAX[1, DBFile.PagesFromBytes[nBytesPerExtent]];
  };

OpenTransaction: PUBLIC PROC [s: Segment, useTrans: Trans, noLog: BOOL] = {
  -- Associates a transaction with the declared segment s and opens its file.
  -- If useTrans is NIL a new transaction is created on the file's server.
  -- Signals SegmentNotDeclared / TransactionAlreadyOpen (and returns if resumed).
  node: FileTable.Node = FindFile[s];
  IF node = NIL THEN {SIGNAL DBEnvironment.Error[SegmentNotDeclared]; RETURN};
  IF node.trans # NIL THEN {SIGNAL DBEnvironment.Error[TransactionAlreadyOpen]; RETURN};
  node.trans ←
    IF useTrans # NIL THEN useTrans
    ELSE DBFile.CreateTransaction[DBFile.FileServerFromFileName[node.fileName]];
  OpenSegment[node, noLog];
  };

OpenSegment: PROC [fileNode: FileTable.Node, noLog: BOOL] = {
  -- Opens the file behind fileNode under fileNode.trans (which must be non-NIL).
  -- A newly created file, or one the client asked to re-initialize, gets a fresh
  --segment head page; otherwise the existing head page is validated.
  wasCreated: BOOL;
  IF fileNode.trans = NIL THEN ERROR;
  [fileNode.openFile, wasCreated] ← DBFile.OpenFile[
    t: fileNode.trans,
    file: fileNode.fileName,
    version: fileNode.version,
    discardFileContents: fileNode.initializeExistingFile,
    nPagesInitial: fileNode.pagesInitial,
    noLog: noLog];
  IF NOT (wasCreated OR fileNode.initializeExistingFile) THEN {
    CheckSegment[fileNode];
    RETURN;
    };
  InitializeSegment[fileNode];
  fileNode.initializeExistingFile← FALSE;
  };

GetSegmentInfo: PUBLIC PROC [
  s: Segment] RETURNS [filePath: ROPE, number: NAT, trans: Trans] = {
  -- Returns the file name, segment index and current transaction recorded for s,
  --or [NIL, 0, NIL] if s has not been declared.
  node: FileTable.Node← FindFile[s];
  IF node = NIL THEN RETURN[NIL, 0, NIL];
  RETURN[node.fileName, SegmentIndexFromSegmentID[node.segmentID], node.trans];
  };

FinishTransaction: PUBLIC PROC [t: Trans, abort: BOOL, continue: BOOL] = {
  -- Commits or aborts t. On commit, written cache pages belonging to t are flushed first.
  -- When the transaction is aborted or is not being continued, t's pages are dropped
  --from the cache and every file table entry using t is marked closed.
  ForgetFileOfTrans: SAFE PROC [f: FileTable.Node] RETURNS [stop: BOOL] = TRUSTED {
    -- Enumeration callback: clears openFile and trans on entries owned by t.
    IF f.trans = t THEN {
      f.openFile ← NIL;
      f.trans ← NIL;
      };
    RETURN [stop: FALSE];
    };
  IF NOT abort AND NOT WriteoutCache[t] THEN ERROR; --<PUBLIC>
  DBFile.FinishTransaction[t, abort, continue];
  IF abort OR NOT continue THEN {
    AbortCache[t];
    files.EnumerateIncreasing[ForgetFileOfTrans];
    };
  };

SegmentFromTID: PUBLIC PROC [tid: TID] RETURNS [s: Segment] = {
  -- Maps a TID to the segment atom declared for its segment ID, or NIL if none.
  node: FileTable.Node = files.Lookup[InlineSegmentIDFromDBPage[tid]];
  IF node = NIL THEN RETURN [NIL];
  RETURN [node.segment];
  };

IsValidTID: PUBLIC PROC [tid: TID] RETURNS [valid: BOOL] = {
  -- TRUE iff tid's segment is declared and its file is currently open.
  node: FileTable.Node = files.Lookup[InlineSegmentIDFromDBPage[tid]];
  RETURN [node # NIL AND node.openFile # NIL];
  };

SegmentIDFromSegment: PUBLIC PROC [s: Segment] RETURNS [SegmentID] = {
  -- Returns the SegmentID of s. Raises SegmentNotDeclared if s is unknown and
  --TransactionNotOpen if its file is not open.
  node: FileTable.Node = FindFile[s];
  IF node = NIL THEN ERROR DBEnvironment.Error[SegmentNotDeclared];
  IF node.openFile = NIL THEN ERROR DBEnvironment.Error[TransactionNotOpen];
  RETURN [node.segmentID];
  };

FindFile: PROC [s: Segment] RETURNS [fileNode: FileTable.Node] = {
  -- Linear search of the file table for the entry whose segment atom is s; NIL if absent.
  -- If efficiency of this ever becomes important, we can attach info to prop list of atom.
  MatchSegment: SAFE PROC [f: FileTable.Node] RETURNS [stop: BOOL] = TRUSTED {
    IF f.segment # s THEN RETURN [stop: FALSE];
    fileNode ← f;
    RETURN [stop: TRUE];
    };
  fileNode ← NIL;
  files.EnumerateIncreasing[MatchSegment];
  RETURN [fileNode];
  };

EnumerateSegments: PUBLIC PROC [
  enumProc: PROC [s: Segment, segmentIndex: SegmentIndex] RETURNS [stop: BOOL]] = {
  -- Calls enumProc for each declared segment in increasing SegmentID order,
  --stopping early when enumProc returns stop=TRUE.
  FOR node: FileTable.Node ← files.LookupSmallest[], files.LookupNextLarger[node.segmentID]
    UNTIL node = NIL DO
    IF enumProc[node.segment, SegmentIndexFromSegmentID[node.segmentID]].stop THEN EXIT;
    ENDLOOP;
  };

FilePageFromDBPage: PROC [dbPage: DBPage]
  RETURNS [readonly: BOOL, file: DBCommon.OpenFileHandle, page: CARDINAL] = {
  -- Translates a DBPage into the open file that holds it, the page number within that
  --file, and the readonly flag of its segment.
  -- Raises TransactionNotOpen if the segment is undeclared or its file is not open.
  segmentID: SegmentID;
  node: FileTable.Node;
  [segmentID, page] ← DecomposeDBPage[dbPage];
  node ← files.Lookup[segmentID];
  IF node = NIL THEN ERROR DBEnvironment.Error[TransactionNotOpen];
  file ← node.openFile;
  IF file = NIL THEN ERROR DBEnvironment.Error[TransactionNotOpen];
  RETURN [node.readonly, file, page];
  };


-- Procs that understand the format of the segment head page.
InitializeSegment: PROC [fileNode: FileTable.Node] = {
  -- Writes a fresh segment head page for fileNode's (open) file.
  -- The block allocator initially covers every page after the head page
  --(pages 1 .. fileSize-1), and the two root index TIDs come from createIndexesProc.
  -- Side effect: forces fileNode.readonly to FALSE before writing.
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  fileSize: CARDINAL = DBFile.GetSize[fileNode.openFile];
  fileNode.readonly ← FALSE;
  [segmentHeadHandle, segmentHead] ← DBSegment.WritePage[fileNode.segmentID, NIL];
  -- Fields not named in the constructor take whatever defaults SegmentHeadPage declares
  --(declaration not visible in this file).
  segmentHead↑ ← [
    segmentID: fileNode.segmentID,
    allocator: [
      firstPageInBlock: ConsDBPage[fileNode.segmentID, 1],
      nPagesInBlock: fileSize-1,
      pagesPerExtent: fileNode.pagesPerExtent]];
  ConvertUnsafe.AppendRope[
    to: LOOPHOLE[@(segmentHead.name)], from: Atom.GetPName[fileNode.segment]];
  [segmentHead.index1, segmentHead.index2] ← createIndexesProc[fileNode.segment];
  DBSegment.UnlockPage[segmentHeadHandle];
  };

CheckSegment: PROC [fileNode: FileTable.Node] = {
  -- Validates the head page of an existing segment file: page tag, software
  --compatibility version, seal, and (case-insensitively) the stored segment name.
  -- Raises DBEnvironment.Error[MismatchedExistingSegment] on any mismatch.
  -- The head page is unlocked before the error is raised, so a failed check does not
  --leave a cache page permanently locked.
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  [segmentHeadHandle, segmentHead] ← DBSegment.ReadPage[fileNode.segmentID, NIL];
  IF segmentHead.tag # DBStoragePagetags.AMap OR
    segmentHead.softwareCompatibilityVersion # DBCommon.softwareCompatibilityVersion OR
    segmentHead.seal # DBSegmentPrivate.Seal THEN {
    DBSegment.UnlockPage[segmentHeadHandle];
    ERROR DBEnvironment.Error[MismatchedExistingSegment];
    };
  IF NOT Rope.Equal[
    s1: Atom.GetPName[fileNode.segment],
    s2: ConvertUnsafe.ToRope[LOOPHOLE[@(segmentHead.name)]],
    case: FALSE] THEN {
    DBSegment.UnlockPage[segmentHeadHandle];
    ERROR DBEnvironment.Error[MismatchedExistingSegment];
    };
  DBSegment.UnlockPage[segmentHeadHandle];
  };

RootIndicesFromSegment: PUBLIC PROC [s: Segment] RETURNS [index1, index2: TID] = {
  -- Returns the two root index TIDs stored in the head page of segment s.
  -- Raises SegmentNotDeclared if s is unknown, and MismatchedExistingSegment if the
  --head page does not carry the AMap tag (the page is unlocked before raising).
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  fileNode: FileTable.Node = FindFile[s];
  IF fileNode=NIL THEN ERROR DBEnvironment.Error[SegmentNotDeclared];
  [segmentHeadHandle, segmentHead] ← DBSegment.ReadPage[fileNode.segmentID, NIL];
  IF segmentHead.tag # DBStoragePagetags.AMap THEN {
    DBSegment.UnlockPage[segmentHeadHandle];
    ERROR DBEnvironment.Error[MismatchedExistingSegment];
    };
  index1 ← segmentHead.index1;
  index2 ← segmentHead.index2;
  DBSegment.UnlockPage[segmentHeadHandle];
  RETURN [index1, index2];
  };

AllocPage: PUBLIC PROC [s: DBPage]
  RETURNS [allocatedPage: DBPage, allocatedPageHandle: CacheHandle,
    allocatedPagePtr: LONG POINTER] = {
  -- Allocates one page in the segment whose head page is s.
  -- Pages come from the segment's free list if it is nonempty, otherwise from the
  --block allocator, extending the file by pagesPerExtent pages when the block is used up.
  -- The page is returned locked (via WritePage or NewPage) with its page tag set to Unused.
  -- Raises MismatchedExistingSegment if s is not a segment head page (the head page is
  --unlocked first), and InternalError if the free list head is not tagged Free.
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  DBStats.Inc[SegmentAllocPage];
  [segmentHeadHandle, segmentHead] ← DBSegment.WritePage[s, NIL];
  IF segmentHead.tag # DBStoragePagetags.AMap THEN {
    DBSegment.UnlockPage[segmentHeadHandle];
    ERROR DBEnvironment.Error[MismatchedExistingSegment];
    };
  IF (allocatedPage ← segmentHead.allocator.freeList) # NullDBPage THEN {
    -- The segment's free list is nonempty, so return a page from it.
    allocatedFreeListPagePtr: LONG POINTER TO DBSegmentPrivate.FreeListPage;
    [allocatedPageHandle, allocatedFreeListPagePtr] ← DBSegment.WritePage[allocatedPage, NIL];
    IF allocatedFreeListPagePtr.tag # DBStoragePagetags.Free THEN
      ERROR DBEnvironment.InternalError;
    segmentHead.allocator.freeList ← allocatedFreeListPagePtr.next;
    allocatedPagePtr ← allocatedFreeListPagePtr;
    }
  ELSE {
    -- The segment's free list is empty, so allocate from block allocator, perhaps extending file.
    IF segmentHead.allocator.firstPageInBlock = NullDBPage THEN {
      -- Current block is used up: grow the file by one extent and start a new block
      --at the old end of file.
      fileNode: FileTable.Node ← files.Lookup[s];
      size: CARDINAL ← DBFile.GetSize[fileNode.openFile];
      DBFile.SetSize[fileNode.openFile, size + segmentHead.allocator.pagesPerExtent];
      segmentHead.allocator.firstPageInBlock ← ConsDBPage[s, size];
      segmentHead.allocator.nPagesInBlock ← segmentHead.allocator.pagesPerExtent;
      };
    allocatedPage ← segmentHead.allocator.firstPageInBlock;
    -- Take the first page of the block; NullDBPage marks the block as empty.
    IF (segmentHead.allocator.nPagesInBlock ← segmentHead.allocator.nPagesInBlock-1) = 0 THEN
      segmentHead.allocator.firstPageInBlock ← NullDBPage
    ELSE
      segmentHead.allocator.firstPageInBlock ← allocatedPage + DBStorageTID.DBPageIncrement;
    [allocatedPageHandle, allocatedPagePtr] ← NewPage[allocatedPage];
    };
  DBSegment.UnlockPage[segmentHeadHandle];
  LOOPHOLE[allocatedPagePtr, LONG POINTER TO DBStoragePagetags.PageHeader].pageTag ←
    DBStoragePagetags.Unused;
  RETURN[allocatedPage, allocatedPageHandle, allocatedPagePtr];
  };--AllocPage

FreePage: PUBLIC PROC [s: DBPage, freePg: DBPage, freeHint: CacheHandle] = {
  -- Returns freePg to the free list of the segment whose head page is s.
  -- freePg must belong to segment s, must be locked exactly once by the caller (through
  --freeHint), and must not already be tagged Free or AMap; any violation raises
  --InternalError. On success both freePg and the segment head page are unlocked.
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  free: LONG POINTER TO DBSegmentPrivate.FreeListPage;
  DBStats.Inc[SegmentFreePage];
  IF s # InlineSegmentIDFromDBPage[freePg] THEN ERROR DBEnvironment.InternalError;
  [segmentHeadHandle, segmentHead] ← DBSegment.WritePage[s, NIL];
  IF segmentHead.tag # DBStoragePagetags.AMap THEN ERROR DBEnvironment.InternalError;
  IF DBSegment.LockCount[freePg, freeHint] # 1 THEN ERROR DBEnvironment.InternalError;
  free ← DBSegment.ObtainCoreLoc[freePg, freeHint];
  IF free.tag = DBStoragePagetags.Free OR free.tag = DBStoragePagetags.AMap THEN
    ERROR DBEnvironment.InternalError;
  DBSegment.WriteLockedPage[freeHint];
  -- Push freePg on the front of the segment's free list.
  free↑ ← [next: segmentHead.allocator.freeList];
  segmentHead.allocator.freeList ← freePg;
  DBSegment.UnlockPage[freeHint];
  DBSegment.UnlockPage[segmentHeadHandle];
  };--FreePage


-- Procs that understand the bit layout of a DBPage.
SegmentIDFromSegmentIndex: PROC [segmentIndex: SegmentIndex] RETURNS [SegmentID] = {
  -- Places segmentIndex in the high-order word, shifted left by 7 bits; low word is 0.
  segmentID: Inline.LongNumber.num = [num[lowbits: 0,
    highbits: Inline.BITSHIFT[value: segmentIndex, count: 7]]];
  RETURN [LOOPHOLE[segmentID]];
  };

SegmentIndexFromSegmentID: PROC [segmentID: SegmentID] RETURNS [SegmentIndex] = {
  -- Inverse of SegmentIDFromSegmentIndex: high-order word shifted right by 7 bits.
  RETURN [Inline.BITSHIFT[
    value: LOOPHOLE[segmentID, Inline.LongNumber.num].highbits,
    count: -7]];
  };

SegmentIDFromDBPage: PUBLIC PROC [dbPage: DBPage] RETURNS [SegmentID] = {
  -- Exported (non-inline) form of InlineSegmentIDFromDBPage.
  RETURN [InlineSegmentIDFromDBPage[dbPage]]
  };

InlineSegmentIDFromDBPage: PROC [dbPage: DBPage] RETURNS [SegmentID] = INLINE {
  -- Keeps only the segment bits of dbPage (mask 177600B on the high word, low word 0).
  segmentID: Inline.LongNumber.num = [num[lowbits: 0,
    highbits: Inline.BITAND[LOOPHOLE[dbPage, Inline.LongNumber.num].highbits, 177600B]]];
  RETURN [LOOPHOLE[segmentID]];
  };

DecomposeDBPage: PROC [dbPage: DBPage] RETURNS [SegmentID, CARDINAL] = {
  -- Splits dbPage into its SegmentID (as in InlineSegmentIDFromDBPage) and its page
  --number within the file: (high word shifted left 10) OR (low word shifted right 6).
  segmentID1: Inline.LongNumber.num = [num[lowbits: 0,
    highbits: Inline.BITAND[LOOPHOLE[dbPage, Inline.LongNumber.num].highbits, 177600B]]];
  filePage: CARDINAL ← Inline.BITOR[
    Inline.BITSHIFT[
      value: LOOPHOLE[dbPage, Inline.LongNumber.num].highbits,
      count: 10],
    Inline.BITSHIFT[
      value: LOOPHOLE[dbPage, Inline.LongNumber.num].lowbits,
      count: -6]];
  RETURN [LOOPHOLE[segmentID1], filePage];
  };

ConsDBPage: PROC [segmentID: SegmentID, filePage: CARDINAL] RETURNS [dbPage: DBPage] = {
  -- Inverse of DecomposeDBPage
  dbPage1: Inline.LongNumber.num = [num[
    highbits: Inline.BITOR[
      LOOPHOLE[segmentID, Inline.LongNumber.num].highbits,
      Inline.BITSHIFT[value: filePage, count: -10]],
    lowbits: Inline.BITSHIFT[value: filePage, count: 6]]];
  RETURN[LOOPHOLE[dbPage1]];
  };

Hash: PROC [dbPage: DBPage] RETURNS [NAT] = INLINE {
  -- Bucket index for dbPage: XOR of (high word shifted right 7) and
  --(low word shifted right 6), masked with hashMask.
  RETURN [
    Inline.BITAND[
      hashMask,
      Inline.BITXOR[
        Inline.BITSHIFT[
          value: LOOPHOLE[dbPage, Inline.LongNumber.num].highbits,
          count: -7],
        Inline.BITSHIFT[
          value: LOOPHOLE[dbPage, Inline.LongNumber.num].lowbits,
          count: -6]]]];
  };


-- Buffer manager

invalidKey: DBPage = 37777777777B;
  -- doesn't match any true DBPage (since all DBPages have low bits 0).

InitializeBufferMgr: PROC [] = {
  -- Builds the hash table and the LRU list of cache records.
  -- Number of hash buckets = largest power of 2 not greater than cacheSize
  --(1 when cacheSize is 0). Collisions are chained, so correctness does not depend
  --on the exact bucket count.
  -- Cache pages are created 16 at a time, so the number of cache records actually
  --created is cacheSize rounded up to a multiple of 16.
  hashBuckets: CARDINAL ← 1;
  temp: CARDINAL ← Inline.BITSHIFT[cacheSize, -1];
  UNTIL temp=0 DO
    hashBuckets ← Inline.BITSHIFT[hashBuckets, 1];
    temp ← Inline.BITSHIFT[temp, -1];
    ENDLOOP;
  ht ← zone.NEW[HashTable[hashBuckets] ← [nItems: 0, buckets: ]];
  hashMask ← hashBuckets-1;
  -- Create cacheRecords, format them into an lru list.
  lruList ← zone.NEW[CacheRecord ← [dbPage: invalidKey, pagePtr: NIL]];
  lruList.pred ← lruList.succ ← lruList;
  FOR i: NAT IN [0 .. (cacheSize+15)/16) DO
    AddCachePages[16];
    ENDLOOP;
  };

ReadPage: PUBLIC PROC [p: DBPage, pHint: CacheHandle]
  RETURNS [CacheHandle, LONG POINTER] = {
  -- Returns page p locked (lock count incremented) and made most-recently-used.
  -- pHint is tried first; if it is NIL or refers to another page the hash table is
  --searched, and on a miss the page is read from its file into a reclaimed cache page.
  IF logReads THEN DBCommon.GetDebugStream[].PutChar['#];
  DBStats.Inc[CacheReadOrWrite];
  IF pHint # NIL AND p = pHint.dbPage THEN GOTO ReturnPage;
  -- hint is wrong, so search hash table
  pHint ← Lookup[p];
  IF pHint # NIL THEN GO TO ReturnPage;
  -- page is not present in cache, read it in
  IF logReads THEN DBCommon.GetDebugStream[].PutChar['!];
  DBStats.Inc[CacheMiss];
  pHint ← GetUnoccupiedPage[];
  { -- We expect ERRORs to be raised from DBFile.ReadFilePage
    file: DBCommon.OpenFileHandle;
    page: CARDINAL;
    [pHint.readonly, file, page] ← FilePageFromDBPage[p];
    DBFile.ReadFilePage[file, page, pHint.pagePtr]
    };
  pHint.dbPage ← p;
  Insert[pHint];
  pHint.lockCount ← 0;
  GOTO ReturnPage;
  EXITS
    ReturnPage => {
      MakeMRU[pHint];
      pHint.lockCount ← pHint.lockCount + 1;
      RETURN[pHint, pHint.pagePtr]
      }
  };

WritePage: PUBLIC PROC [p: DBPage, pHint: CacheHandle]
  RETURNS [CacheHandle, LONG POINTER] = {
  -- Like ReadPage, but the page is also marked written.
  -- Raises DBEnvironment.Error[WriteNotAllowed] if the page belongs to a readonly
  --segment. The check is made on the miss path before the file is read, and again on
  --the common exit so that a page already in the cache is also rejected here (as
  --WriteLockedPage does) instead of failing later when the page is written back.
  DBStats.Inc[CacheReadOrWrite];
  IF pHint # NIL AND p = pHint.dbPage THEN GO TO ReturnPage;
  -- hint is wrong, so search hash table
  pHint ← Lookup[p];
  IF pHint # NIL THEN GOTO ReturnPage;
  -- page is not present in cache, read it in
  DBStats.Inc[CacheMiss];
  pHint ← GetUnoccupiedPage[];
  { -- We expect ERRORs to be raised from DBFile.ReadFilePage
    file: DBCommon.OpenFileHandle;
    page: CARDINAL;
    [pHint.readonly, file, page] ← FilePageFromDBPage[p];
    IF pHint.readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
    DBFile.ReadFilePage[file, page, pHint.pagePtr]
    };
  pHint.dbPage ← p;
  Insert[pHint];
  pHint.lockCount ← 0;
  GOTO ReturnPage;
  EXITS
    ReturnPage => {
      -- Reject writes to readonly pages before touching LRU order, lock count or
      --the written flag.
      IF pHint.readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
      MakeMRU[pHint];
      pHint.lockCount ← pHint.lockCount + 1;
      pHint.written ← TRUE;
      RETURN[pHint, pHint.pagePtr]
      }
  };

NewPage: PROC [p: DBPage] RETURNS [CacheHandle, LONG POINTER] = {
  -- Claims a cache page for p without reading the file (the caller is about to
  --define the page's contents). p must not already be in the cache.
  -- The page is returned locked once, marked written, and most-recently-used.
  -- Raises InternalError if p is already cached and Error[WriteNotAllowed] if p's
  --segment is readonly.
  pHint: CacheHandle ← Lookup[p];
  IF pHint # NIL THEN ERROR DBEnvironment.InternalError; -- [NotaNewPage]
  pHint ← GetUnoccupiedPage[];
  {
    [pHint.readonly, , ] ← FilePageFromDBPage[p];
    -- Raised with ERROR, like the other WriteNotAllowed sites, so it cannot be resumed
    --into the code below.
    IF pHint.readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
    };
  pHint.dbPage ← p;
  Insert[pHint];
  MakeMRU[pHint];
  pHint.lockCount ← 1;
  pHint.written ← TRUE;
  RETURN[pHint, pHint.pagePtr]
  };

WriteLockedPage: PUBLIC PROC [pValidHint: CacheHandle] = {
  -- Marks an already-locked page as written.
  -- Raises InternalError if the handle is NIL or unlocked, WriteNotAllowed if readonly.
  IF pValidHint = NIL OR pValidHint.lockCount = 0 THEN
    ERROR DBEnvironment.InternalError; -- [AlreadyUnlocked]
  IF pValidHint.readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
  pValidHint.written ← TRUE;
  };

LockCount: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle] RETURNS [CARDINAL] = {
  -- Returns the lock count of page p; the handle must be valid for p and locked.
  IF pValidHint = NIL OR pValidHint.dbPage # p OR pValidHint.lockCount = 0 THEN
    ERROR DBEnvironment.InternalError; -- BadCacheHandle
  RETURN[pValidHint.lockCount];
  };

ObtainCoreLoc: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle] RETURNS [LONG POINTER] = {
  -- Returns the in-memory address of locked page p; the handle must be valid for p.
  IF pValidHint = NIL OR pValidHint.dbPage # p OR pValidHint.lockCount = 0 THEN
    ERROR DBEnvironment.InternalError; -- [BadCacheHandle];
  RETURN[pValidHint.pagePtr];
  };

UnlockPage: PUBLIC PROC [pValidHint: CacheHandle] = {
  -- Decrements the lock count of a locked page.
  IF pValidHint = NIL OR pValidHint.lockCount = 0 THEN
    ERROR DBEnvironment.InternalError; -- [AlreadyUnlocked];
  pValidHint.lockCount ← pValidHint.lockCount - 1;
  };

WriteoutCache: PROC [t: Trans] RETURNS [success: BOOL] = {
  -- Writes back every written cache page whose segment is using transaction t.
  -- Always returns TRUE; failures surface as ERRORs from WritePageOut.
  WritePageOutIfUsesTrans: PROC [cacheHandle: CacheHandle] = {
    IF cacheHandle.written THEN {
      fileNode: FileTable.Node =
        files.Lookup[InlineSegmentIDFromDBPage[cacheHandle.dbPage]];
      IF fileNode.trans = t THEN WritePageOut[cacheHandle];
      };
    };
  EnumerateHashTable[WritePageOutIfUsesTrans];
  RETURN[TRUE];
  };

AbortCache: PROC [t: Trans] = {
  -- Removes from the cache (without writing back) every occupied page whose segment
  --is using transaction t. Uses the LRU enumeration because Delete is not permitted
  --during EnumerateHashTable.
  DeleteIfUsesTrans: PROC [cacheHandle: CacheHandle] = {
    IF cacheHandle.occupied THEN {
      fileNode: FileTable.Node =
        files.Lookup[InlineSegmentIDFromDBPage[cacheHandle.dbPage]];
      IF fileNode.trans = t THEN Delete[cacheHandle];
      };
    };
  EnumerateLRUList[DeleteIfUsesTrans];
  };

WritePageOut: PROC [cacheHandle: CacheHandle] = {
  -- Writes one cache page back to its file and clears its written flag.
  -- Raises WriteNotAllowed if the page's segment is readonly.
  readonly: BOOL;
  file: DBCommon.OpenFileHandle;
  page: CARDINAL;
  DBStats.Inc[CacheWritePageBack];
  [readonly, file, page] ← FilePageFromDBPage[cacheHandle.dbPage];
  IF readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
  DBFile.WriteFilePage[file, page, cacheHandle.pagePtr];
  cacheHandle.written ← FALSE;
  };


-- Buffer manager LRU list

AddCachePages: PROC [nPages: NAT] = {
  -- creates nPages new cache records, in the LRU position.
  -- NOTE(review): storage is requested as nPages*DBCommon.PagesPerDBPage pages but the
  --pointer advances by DBCommon.WordsPerPage per record; confirm WordsPerPage is the
  --size of a whole DB page.
  curCachePage: LONG POINTER ← RTOS.GetPermanentDataPages[
    nPages: nPages*DBCommon.PagesPerDBPage, createUniformSwapUnits: TRUE];
  FOR i: NAT IN [0..nPages) DO
    p: CacheHandle ← zone.NEW[CacheRecord ← [
      dbPage: invalidKey, pagePtr: curCachePage]];
    curCachePage ← curCachePage + DBCommon.WordsPerPage;
    -- Link p in just before the header node, i.e. at the LRU end.
    p.pred ← lruList.pred;
    p.succ ← lruList;
    lruList.pred.succ ← p;
    lruList.pred ← p;
    ENDLOOP;
  };

MakeMRU: PROC [x: CacheHandle] = INLINE {
  -- Assumes that x is part of lruList. Deletes x from its current position and makes
  --it first in lruList.
  x.pred.succ ← x.succ;
  x.succ.pred ← x.pred;
  x.succ ← lruList.succ;
  x.pred ← lruList;
  lruList.succ.pred ← x;
  lruList.succ ← x;
  };

EnumerateLRUList: PROC [procToCall: PROC[CacheHandle]] = {
  -- Calls procToCall for each record in LRUList, in MRU to LRU order.
  -- procToCall may not perform MakeMRU; use EnumerateHashTable if you want this.
  -- Note that procToCall will be called for both occupied and unoccupied pages.
  FOR p: CacheHandle ← lruList.succ, p.succ UNTIL p = lruList DO
    procToCall[p];
    ENDLOOP;
  };

GetUnoccupiedPage: PROC RETURNS [CacheHandle] = {
  -- Obtain an empty cache page. It will be the least-recently used page that is
  --unoccupied or occupied but unlocked. The page is returned with Key = garbage,
  --Page = ptr to a cache page, occupied = FALSE, written = FALSE,
  --and entered on lruList in no particular location.
  -- An occupied victim is written back first if it is dirty.
  -- Raises DBEnvironment.Fatal[AllCachePagesLocked] if every page is locked.
  cacheHandle: CacheHandle;
  FOR cacheHandle ← lruList.pred, cacheHandle.pred UNTIL cacheHandle = lruList DO
    IF ~cacheHandle.occupied THEN GOTO FoundUnoccupiedPage;
    IF cacheHandle.lockCount = 0 THEN GOTO MakePageUnoccupied;
    REPEAT
      FoundUnoccupiedPage => NULL; -- ~cacheHandle.occupied
      MakePageUnoccupied => { -- cacheHandle.occupied AND cacheHandle.lockCount=0
        IF cacheHandle.written THEN WritePageOut[cacheHandle];
        Delete[cacheHandle];
        };
      FINISHED => ERROR DBEnvironment.Fatal[AllCachePagesLocked]
    ENDLOOP;
  cacheHandle.written ← FALSE;
  RETURN[cacheHandle];
  };


-- Buffer manager hash table

EnumerateHashTable: PROC[procToCall: PROC[CacheHandle]] = {
  -- Calls procToCall for each record in table.
  -- procToCall may not perform Delete; use EnumerateLRUList if you want this.
  FOR i: NAT IN [0..ht.size) DO
    FOR p: CacheHandle ← ht.buckets[i], p.hashChain UNTIL p = NIL DO
      procToCall[p];
      ENDLOOP;
    ENDLOOP;
  };

Lookup: PROC[dbPage: DBPage] RETURNS[CacheHandle] = {
  -- Returns the cache record holding dbPage, or NIL if the page is not cached.
  i: CARDINAL ← Hash[dbPage];
  p: CacheHandle ← ht.buckets[i];
  DBStats.Inc[CacheHTLookup];
  WHILE p # NIL DO
    IF p.dbPage = dbPage THEN RETURN[p];
    DBStats.Inc[CacheHTConflictInLookup];
    p ← p.hashChain;
    ENDLOOP;
  RETURN[NIL];
  };

Insert: PROC[x: CacheHandle] = {
  -- Adds x to the head of its hash chain and marks it occupied.
  -- Raises InternalError if a record with the same dbPage is already present.
  i: CARDINAL ← Hash[x.dbPage];
  -- Make sure it is not there already...
  p: CacheHandle ← ht.buckets[i];
  WHILE p # NIL DO
    IF p.dbPage = x.dbPage THEN GOTO FoundDuplicate;
    p ← p.hashChain;
    REPEAT
      FoundDuplicate => ERROR DBEnvironment.InternalError; -- [AlreadyInHashTable];
      FINISHED => {
        x.hashChain ← ht.buckets[i];
        ht.buckets[i] ← x;
        ht.nItems ← ht.nItems + 1;
        x.occupied ← TRUE;
        };
    ENDLOOP;
  };

Delete: PROC[x: CacheHandle] = {
  -- Unlinks x from its hash chain, resets its key to invalidKey and marks it unoccupied.
  -- Raises InternalError if x is not in the table. Does not touch lockCount or written.
  i: CARDINAL ← Hash[x.dbPage];
  p: CacheHandle ← ht.buckets[i];
  IF p = x THEN {ht.buckets[i] ← x.hashChain; GOTO Deleted};
  WHILE p # NIL DO
    IF p.hashChain = x THEN {p.hashChain ← x.hashChain; GOTO Deleted};
    p ← p.hashChain;
    REPEAT
      FINISHED => ERROR DBEnvironment.InternalError; -- [NotPresentInHashTable];
    ENDLOOP;
  EXITS
    Deleted => {
      x.dbPage ← invalidKey;
      x.occupied ← FALSE;
      x.hashChain ← NIL;
      ht.nItems ← ht.nItems - 1;
      };
  };


-- Buffer manager debugging

AssertionFailed: SIGNAL = CODE;

Assert: PROC [assertion: BOOL] = {IF ~assertion THEN SIGNAL AssertionFailed; };
  -- So we can Proceed (in debugger) from failure.