-- File: DBSegmentImpl.mesa
-- Last edited by:
--   MBrown on February 25, 1983 1:49 pm
--   Cattell on June 7, 1983 1:45 pm
--   Willie-Sue on February 3, 1983 10:47 am

-- Implements DBSegment: maps database segments to backing files, and provides the
-- page buffer manager (LRU cache plus hash table) used by the storage level.

DIRECTORY
  Atom, ConvertUnsafe, Inline, DBCache, DBCommon, DBEnvironment, DBFile,
  DBSegment, DBSegmentPrivate, DBStats, DBStoragePagetags,
  DBStorageTID USING[DBPageIncrement, TID],
  File, IO, OrderedSymbolTable,
  Rope USING [ROPE, Equal],
  RTOS USING [GetPermanentDataPages];

DBSegmentImpl: PROGRAM
  IMPORTS Atom, ConvertUnsafe, Inline, DBCommon, DBEnvironment, DBFile, DBSegment, DBStats, IO, FileTable: OrderedSymbolTable, Rope, RTOS
  EXPORTS DBSegment
  SHARES DBCache =
BEGIN

-- Local abbreviations for imported types.
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
Trans: TYPE = DBCommon.Trans;
Segment: TYPE = DBCommon.Segment;
SegmentID: TYPE = DBCommon.SegmentID;
SegmentIndex: TYPE = DBCommon.SegmentIndex;
VersionOptions: TYPE = DBCommon.VersionOptions;
DBPage: TYPE = DBCommon.DBPage;
NullDBPage: DBPage = DBCommon.NullDBPage;
CacheHandle: TYPE = DBCache.CacheHandle;
CacheRecord: TYPE = DBCache.CacheRecord;
TID: TYPE = DBStorageTID.TID;

-- Chained hash table mapping DBPage to CacheHandle; buckets hold chain heads.
HashTable: TYPE = RECORD [
  nItems: NAT _ 0, -- number of CacheRecords in table
  buckets: SEQUENCE size: NAT OF CacheHandle
  ];

-- Module state
initialized: BOOL _ FALSE; -- set once by Initialize; guards against re-initialization
createIndexesProc: PROC [s: Segment] RETURNS [TID, TID]; -- client proc, called when a segment is first created
zone: ZONE; -- allocation zone for file nodes and cache records
logReads: BOOL_ FALSE; -- causes reads & writes to be logged on UserExec
files: FileTable.Table; -- maps SegmentID to interesting information about a file.
cacheSize: NAT; -- number of pages allocated to cache.
ht: REF HashTable; -- Fast map from DBPage to CacheHandle. ht.size is a power of two.
hashMask: CARDINAL; -- ht.size-1, used in computing hash function.
lruList: CacheHandle; -- points to header node of doubly-linked list of CacheRecords.

-- Data structure invariants: A cache record is present in lruList for every
-- allocated cache page; there is no separate "free list".
-- occupied=FALSE means that the page is free, and that the other cache record fields
-- (readonly, written) are undefined.  occupied=TRUE means that the page contains valid
-- data; a page is stored in ht iff occupied=TRUE.
-- lruList points to a cache record with no associated cache page; this "header node"
-- makes the circular-list manipulations work properly.  The most-recently used page is
-- lruList.succ, and the least-recently used page is lruList.pred.

-- One-time module initialization: records cache size, zone, and the client's
-- segment-initialization proc, then builds the file table and the buffer manager.
-- Raises InternalError if called twice.
Initialize: PUBLIC PROC [
  z: ZONE, nCachePages: NAT, useCacheFile: ROPE,
  segmentInitProc: PROC [s: Segment] RETURNS [TID, TID]] = {
  IF initialized THEN ERROR DBEnvironment.InternalError;
  cacheSize _ nCachePages;
  createIndexesProc _ segmentInitProc;
  zone _ z;
  -- NOTE(review): useCacheFile is accepted but never referenced in this module; confirm intent.
  FileTable.Initialize[
    sentinel1: zone.NEW[FileTable.NodeRecord _ [segmentID: NullDBPage]],
    sentinel2: zone.NEW[FileTable.NodeRecord _ [segmentID: NullDBPage]]];
  files _ FileTable.CreateTable[
    header: zone.NEW[FileTable.NodeRecord _ [segmentID: NullDBPage]]];
  InitializeBufferMgr[];
  initialized _ TRUE;
  };

-- Declares (or re-declares) the file that backs segment s.  If the segment is already
-- known, refreshes its parameters in place (permitted only when no transaction is open
-- on it, and only if the segmentID and segment atom match the existing declaration).
AttachSegment: PUBLIC PROC [
  fileName: ROPE, s: Segment, segmentIndex: SegmentIndex, readonly: BOOL,
  version: VersionOptions, initializeExistingFile: BOOL,
  nBytesInitial, nBytesPerExtent: INT] = {
  fileNode: FileTable.Node;
  segmentID: SegmentID = SegmentIDFromSegmentIndex[segmentIndex];
  IF NOT initialized THEN ERROR;
  fileNode _ files.Lookup[segmentID];
  IF fileNode # NIL THEN {
    IF fileNode.trans # NIL THEN
      -- Client declared segment when transaction open on it
      {SIGNAL DBEnvironment.Error[TransactionAlreadyOpen]; RETURN};
    IF fileNode.segmentID # segmentID OR fileNode.segment # s THEN
      ERROR DBEnvironment.Error[MismatchedExistingSegment];
    -- Re-declaration of a known segment: update parameters (see change log, May 19, 1983).
    fileNode.initializeExistingFile _ initializeExistingFile;
    fileNode.fileName_ fileName;
    fileNode.version_ version;
    fileNode.readonly_ readonly;
    fileNode.pagesInitial_ MAX[1, DBFile.PagesFromBytes[nBytesInitial]];
    fileNode.pagesPerExtent_ MAX[1, DBFile.PagesFromBytes[nBytesPerExtent]];
    RETURN;
    };
  fileNode _ zone.NEW[FileTable.NodeRecord _ [
    segmentID: segmentID, segment: s, readonly: readonly, version: version,
    initializeExistingFile: initializeExistingFile, fileName: fileName,
    pagesInitial: MAX[1, DBFile.PagesFromBytes[nBytesInitial]],
    pagesPerExtent: MAX[1, DBFile.PagesFromBytes[nBytesPerExtent]] ]];
  files.Insert[nodeToInsert: fileNode, insertKey: segmentID];
  };

-- Associates a transaction with segment s and opens its file.  If useTrans is NIL,
-- a new transaction is created on the segment's file server.
OpenTransaction: PUBLIC PROC [s: Segment, useTrans: Trans, noLog: BOOL] = {
  fileNode: FileTable.Node = FindFile[s];
  IF fileNode=NIL THEN {SIGNAL DBEnvironment.Error[SegmentNotDeclared]; RETURN};
  IF fileNode.trans # NIL THEN {SIGNAL DBEnvironment.Error[TransactionAlreadyOpen]; RETURN};
  IF useTrans = NIL THEN {
    server: ROPE = DBFile.FileServerFromFileName[fileNode.fileName];
    useTrans _ DBFile.CreateTransaction[server];
    };
  fileNode.trans _ useTrans;
  OpenSegment[fileNode, noLog];
  };

-- Opens the segment's backing file under its transaction; initializes the segment head
-- page if the file is new (or is being deliberately reinitialized), else validates it.
OpenSegment: PROC [fileNode: FileTable.Node, noLog: BOOL] = {
  createdFile: BOOL;
  IF fileNode.trans = NIL THEN ERROR;
  [fileNode.openFile, createdFile] _ DBFile.OpenFile[t: fileNode.trans,
    file: fileNode.fileName, version: fileNode.version,
    discardFileContents: fileNode.initializeExistingFile,
    nPagesInitial: fileNode.pagesInitial, noLog: noLog];
  IF createdFile OR fileNode.initializeExistingFile THEN {
    InitializeSegment[fileNode]; fileNode.initializeExistingFile_ FALSE}
  ELSE CheckSegment[fileNode];
  };

-- Returns the file name, segment index, and open transaction for s;
-- [NIL, 0, NIL] if the segment was never declared.
GetSegmentInfo: PUBLIC PROC [
  s: Segment] RETURNS [filePath: ROPE, number: NAT, trans: Trans] = {
  fileNode: FileTable.Node_ FindFile[s];
  IF fileNode=NIL THEN RETURN[NIL, 0, NIL]
  ELSE RETURN[
    fileNode.fileName, SegmentIndexFromSegmentID[fileNode.segmentID], fileNode.trans]
  };

-- Ends participation of transaction t: on commit, writes back all dirty cached pages
-- for t; on abort (or when not continuing), discards t's cached pages and detaches t
-- from every file node that used it.
FinishTransaction: PUBLIC PROC [t: Trans, abort: BOOL, continue: BOOL] = {
  CloseFileIfNoTransaction: SAFE PROC [f: FileTable.Node] RETURNS [stop: BOOL] = TRUSTED {
    IF f.trans = t THEN { f.openFile _ NIL; f.trans _ NIL };
    RETURN [stop: FALSE];
    };
  IF NOT abort AND NOT WriteoutCache[t] THEN ERROR;
  -- DBFile.FinishTransaction[t, abort, continue];
  IF abort OR NOT continue THEN {
    AbortCache[t];
    files.EnumerateIncreasing[CloseFileIfNoTransaction];
    };
  };

-- Maps a TID to the segment containing it, or NIL if its segment is unknown.
SegmentFromTID: PUBLIC PROC [tid: TID] RETURNS [s: Segment] = {
  fileNode: FileTable.Node = files.Lookup[InlineSegmentIDFromDBPage[tid]];
  IF fileNode # NIL THEN RETURN [fileNode.segment] ELSE RETURN [NIL];
  };

-- TRUE iff tid lies in a declared segment whose file is currently open.
IsValidTID: PUBLIC PROC [tid: TID] RETURNS [valid: BOOL] = {
  fileNode: FileTable.Node = files.Lookup[InlineSegmentIDFromDBPage[tid]];
  RETURN [fileNode # NIL AND fileNode.openFile # NIL];
  };

-- Maps a segment atom to its SegmentID; requires the segment to be declared
-- and its transaction to be open.
SegmentIDFromSegment: PUBLIC PROC [s: Segment] RETURNS [SegmentID] = {
  fileNode: FileTable.Node = FindFile[s];
  IF fileNode = NIL THEN ERROR DBEnvironment.Error[SegmentNotDeclared];
  IF fileNode.openFile = NIL THEN ERROR DBEnvironment.Error[TransactionNotOpen];
  RETURN [fileNode.segmentID];
  };

-- Linear search of the file table for the node whose segment atom is s; NIL if absent.
FindFile: PROC [s: Segment] RETURNS [fileNode: FileTable.Node] = {
  -- If efficiency of this ever becomes important, we can attach info to prop list of atom.
  StopOnMatch: SAFE PROC [f: FileTable.Node] RETURNS [stop: BOOL] = TRUSTED {
    IF f.segment = s THEN { fileNode _ f; RETURN [stop: TRUE] };
    RETURN [stop: FALSE];
    };
  fileNode _ NIL;
  files.EnumerateIncreasing[StopOnMatch];
  RETURN [fileNode];
  };

-- Calls enumProc for each declared segment in increasing SegmentID order,
-- stopping early if enumProc returns stop=TRUE.
EnumerateSegments: PUBLIC PROC [
  enumProc: PROC [s: Segment, segmentIndex: SegmentIndex] RETURNS [stop: BOOL]] = {
  f: FileTable.Node _ files.LookupSmallest[];
  WHILE f # NIL DO
    IF enumProc[f.segment, SegmentIndexFromSegmentID[f.segmentID]].stop THEN EXIT;
    f _ files.LookupNextLarger[f.segmentID];
    ENDLOOP;
  };

-- Decomposes a DBPage into its open file handle and file page number, plus the
-- segment's readonly flag; requires the owning segment's transaction to be open.
FilePageFromDBPage: PROC [dbPage: DBPage] RETURNS [readonly: BOOL, file: DBCommon.OpenFileHandle, page: CARDINAL] = {
  segmentID: SegmentID;
  fileNode: FileTable.Node;
  [segmentID, page] _ DecomposeDBPage[dbPage];
  fileNode _ files.Lookup[segmentID];
  IF fileNode = NIL OR (file _ fileNode.openFile) = NIL THEN
    ERROR DBEnvironment.Error[TransactionNotOpen];
  RETURN [fileNode.readonly, file, page];
  };

-- Procs that understand the format of the segment head page.
-- Writes a fresh segment head page for a newly created (or reinitialized) file:
-- installs the segmentID, block allocator covering pages [1..fileSize), the segment's
-- print name, and the two root indexes produced by the client's createIndexesProc.
InitializeSegment: PROC [fileNode: FileTable.Node] = {
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  fileSize: CARDINAL = DBFile.GetSize[fileNode.openFile];
  fileNode.readonly _ FALSE; -- must be writable to lay down the head page
  [segmentHeadHandle, segmentHead] _ DBSegment.WritePage[fileNode.segmentID, NIL];
  segmentHead^ _ [
    segmentID: fileNode.segmentID,
    allocator: [
      firstPageInBlock: ConsDBPage[fileNode.segmentID, 1], -- page 0 is the head page itself
      nPagesInBlock: fileSize-1,
      pagesPerExtent: fileNode.pagesPerExtent]];
  ConvertUnsafe.AppendRope[
    to: LOOPHOLE[@(segmentHead.name)], from: Atom.GetPName[fileNode.segment]];
  [segmentHead.index1, segmentHead.index2] _ createIndexesProc[fileNode.segment];
  DBSegment.UnlockPage[segmentHeadHandle];
  };

-- Validates the head page of an existing segment file: page tag, software
-- compatibility version, seal, and (case-insensitively) the stored segment name
-- must all match, else Error[MismatchedExistingSegment].
CheckSegment: PROC [fileNode: FileTable.Node] = {
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  [segmentHeadHandle, segmentHead] _ DBSegment.ReadPage[fileNode.segmentID, NIL];
  IF segmentHead.tag # DBStoragePagetags.AMap
    OR segmentHead.softwareCompatibilityVersion # DBCommon.softwareCompatibilityVersion
    OR segmentHead.seal # DBSegmentPrivate.Seal THEN
    ERROR DBEnvironment.Error[MismatchedExistingSegment];
  IF NOT Rope.Equal[
    s1: Atom.GetPName[fileNode.segment],
    s2: ConvertUnsafe.ToRope[LOOPHOLE[@(segmentHead.name)]],
    case: FALSE] THEN
    ERROR DBEnvironment.Error[MismatchedExistingSegment];
  DBSegment.UnlockPage[segmentHeadHandle];
  };

-- Reads the two root index TIDs stored in the segment head page of s.
RootIndicesFromSegment: PUBLIC PROC [s: Segment] RETURNS [index1, index2: TID] = {
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  fileNode: FileTable.Node = FindFile[s];
  IF fileNode=NIL THEN ERROR DBEnvironment.Error[SegmentNotDeclared];
  [segmentHeadHandle, segmentHead] _ DBSegment.ReadPage[fileNode.segmentID, NIL];
  IF segmentHead.tag # DBStoragePagetags.AMap THEN
    ERROR DBEnvironment.Error[MismatchedExistingSegment];
  index1 _ segmentHead.index1;
  index2 _ segmentHead.index2;
  DBSegment.UnlockPage[segmentHeadHandle];
  RETURN [index1, index2];
  };

-- Allocates one page in segment s.  Takes first from the segment's free list; if
-- that is empty, carves the next page out of the block allocator, extending the file
-- by pagesPerExtent pages when the block is exhausted.  Returns the page locked,
-- with its tag set to Unused.
AllocPage: PUBLIC PROC [s: DBPage] RETURNS [allocatedPage: DBPage, allocatedPageHandle: CacheHandle, allocatedPagePtr: LONG POINTER] = {
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  DBStats.Inc[SegmentAllocPage];
  [segmentHeadHandle, segmentHead] _ DBSegment.WritePage[s, NIL];
  IF segmentHead.tag # DBStoragePagetags.AMap THEN
    ERROR DBEnvironment.Error[MismatchedExistingSegment];
  IF (allocatedPage _ segmentHead.allocator.freeList) # NullDBPage THEN {
    -- The segment's free list is nonempty, so return a page from it.
    allocatedFreeListPagePtr: LONG POINTER TO DBSegmentPrivate.FreeListPage;
    [allocatedPageHandle, allocatedFreeListPagePtr] _ DBSegment.WritePage[allocatedPage, NIL];
    IF allocatedFreeListPagePtr.tag # DBStoragePagetags.Free THEN
      ERROR DBEnvironment.InternalError;
    segmentHead.allocator.freeList _ allocatedFreeListPagePtr.next; -- unlink head of free list
    allocatedPagePtr _ allocatedFreeListPagePtr;
    }
  ELSE {
    -- The segment's free list is empty, so allocate from block allocator, perhaps extending file.
    IF segmentHead.allocator.firstPageInBlock = NullDBPage THEN {
      -- Block exhausted: grow the file by one extent and start a new block there.
      fileNode: FileTable.Node _ files.Lookup[s];
      size: CARDINAL _ DBFile.GetSize[fileNode.openFile];
      DBFile.SetSize[fileNode.openFile, size + segmentHead.allocator.pagesPerExtent];
      segmentHead.allocator.firstPageInBlock _ ConsDBPage[s, size];
      segmentHead.allocator.nPagesInBlock _ segmentHead.allocator.pagesPerExtent;
      };
    allocatedPage _ segmentHead.allocator.firstPageInBlock;
    IF (segmentHead.allocator.nPagesInBlock _ segmentHead.allocator.nPagesInBlock-1) = 0 THEN
      segmentHead.allocator.firstPageInBlock _ NullDBPage
    ELSE segmentHead.allocator.firstPageInBlock _ allocatedPage + DBStorageTID.DBPageIncrement;
    [allocatedPageHandle, allocatedPagePtr] _ NewPage[allocatedPage];
    };
  DBSegment.UnlockPage[segmentHeadHandle];
  LOOPHOLE[allocatedPagePtr, LONG POINTER TO DBStoragePagetags.PageHeader].pageTag _
    DBStoragePagetags.Unused;
  RETURN[allocatedPage, allocatedPageHandle, allocatedPagePtr];
  };--AllocPage

-- Returns page freePg of segment s to the segment's free list.  The page must be
-- locked exactly once (by the caller) and must not already be free or a map page.
FreePage: PUBLIC PROC [s: DBPage, freePg: DBPage, freeHint: CacheHandle] = {
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  free: LONG POINTER TO DBSegmentPrivate.FreeListPage;
  DBStats.Inc[SegmentFreePage];
  IF s # InlineSegmentIDFromDBPage[freePg] THEN ERROR DBEnvironment.InternalError;
  [segmentHeadHandle, segmentHead] _ DBSegment.WritePage[s, NIL];
  IF segmentHead.tag # DBStoragePagetags.AMap THEN ERROR DBEnvironment.InternalError;
  IF DBSegment.LockCount[freePg, freeHint] # 1 THEN ERROR DBEnvironment.InternalError;
  free _ DBSegment.ObtainCoreLoc[freePg, freeHint];
  IF free.tag = DBStoragePagetags.Free OR free.tag = DBStoragePagetags.AMap THEN
    ERROR DBEnvironment.InternalError; -- double free, or attempt to free a map page
  DBSegment.WriteLockedPage[freeHint];
  free^ _ [next: segmentHead.allocator.freeList]; -- push onto head of free list
  segmentHead.allocator.freeList _ freePg;
  DBSegment.UnlockPage[freeHint];
  DBSegment.UnlockPage[segmentHeadHandle];
  };--FreePage

-- Procs that understand the bit layout of a DBPage.
-- Builds a SegmentID from a small segment index by shifting it into the
-- high-order bits of the 32-bit word (lowbits are always 0).
SegmentIDFromSegmentIndex: PROC [segmentIndex: SegmentIndex] RETURNS [SegmentID] = {
  segmentID: Inline.LongNumber.num =
    [num[lowbits: 0, highbits: Inline.BITSHIFT[value: segmentIndex, count: 7]]];
  RETURN [LOOPHOLE[segmentID]];
  };

-- Inverse of SegmentIDFromSegmentIndex.
SegmentIndexFromSegmentID: PROC [segmentID: SegmentID] RETURNS [SegmentIndex] = {
  RETURN [Inline.BITSHIFT[
    value: LOOPHOLE[segmentID, Inline.LongNumber.num].highbits, count: -7]];
  };

-- Exported (non-inline) wrapper for InlineSegmentIDFromDBPage.
SegmentIDFromDBPage: PUBLIC PROC [dbPage: DBPage] RETURNS [SegmentID] = {
  RETURN [InlineSegmentIDFromDBPage[dbPage]] };

-- Extracts the SegmentID field of a DBPage by masking the segment bits
-- (mask 177600B) out of the high-order word.
InlineSegmentIDFromDBPage: PROC [dbPage: DBPage] RETURNS [SegmentID] = INLINE {
  segmentID: Inline.LongNumber.num = [num[lowbits: 0,
    highbits: Inline.BITAND[LOOPHOLE[dbPage, Inline.LongNumber.num].highbits, 177600B]]];
  RETURN [LOOPHOLE[segmentID]];
  };

-- Splits a DBPage into its SegmentID and the page number within the segment's file.
DecomposeDBPage: PROC [dbPage: DBPage] RETURNS [SegmentID, CARDINAL] = {
  segmentID1: Inline.LongNumber.num = [num[lowbits: 0,
    highbits: Inline.BITAND[LOOPHOLE[dbPage, Inline.LongNumber.num].highbits, 177600B]]];
  filePage: CARDINAL _ Inline.BITOR[
    Inline.BITSHIFT[ value: LOOPHOLE[dbPage, Inline.LongNumber.num].highbits, count: 10],
    Inline.BITSHIFT[ value: LOOPHOLE[dbPage, Inline.LongNumber.num].lowbits, count: -6]];
  RETURN [LOOPHOLE[segmentID1], filePage];
  };

ConsDBPage: PROC [segmentID: SegmentID, filePage: CARDINAL] RETURNS [dbPage: DBPage] = {
  -- Inverse of DecomposeDBPage
  dbPage1: Inline.LongNumber.num = [num[
    highbits: Inline.BITOR[
      LOOPHOLE[segmentID, Inline.LongNumber.num].highbits,
      Inline.BITSHIFT[value: filePage, count: -10]],
    lowbits: Inline.BITSHIFT[value: filePage, count: 6]]];
  RETURN[LOOPHOLE[dbPage1]];
  };

-- Hash function for the page cache: XOR of shifted high and low halves of the
-- DBPage, masked down to a bucket index by hashMask (ht.size-1).
Hash: PROC [dbPage: DBPage] RETURNS [NAT] = INLINE {
  RETURN [ Inline.BITAND[ hashMask, Inline.BITXOR[
    Inline.BITSHIFT[ value: LOOPHOLE[dbPage, Inline.LongNumber.num].highbits, count: -7],
    Inline.BITSHIFT[ value: LOOPHOLE[dbPage, Inline.LongNumber.num].lowbits, count: -6]]]];
  };

-- Buffer manager

invalidKey: DBPage = 37777777777B; -- doesn't match any true DBPage (since all DBPages have low bits 0).

InitializeBufferMgr: PROC [] = {
  -- Number of hash buckets = smallest power of 2 not less than cacheSize.
  hashBuckets: CARDINAL _ 1;
  temp: CARDINAL _ Inline.BITSHIFT[cacheSize, -1];
  UNTIL temp=0 DO
    hashBuckets _ Inline.BITSHIFT[hashBuckets, 1];
    temp _ Inline.BITSHIFT[temp, -1];
    ENDLOOP;
  ht _ zone.NEW[HashTable[hashBuckets] _ [nItems: 0, buckets: ]];
  hashMask _ hashBuckets-1;
  -- Create cacheRecords, format them into an lru list.
  lruList _ zone.NEW[CacheRecord _ [dbPage: invalidKey, pagePtr: NIL]]; -- header node
  lruList.pred _ lruList.succ _ lruList;
  -- Allocate cache pages in batches of 16 (rounds cacheSize up to a multiple of 16).
  FOR i: NAT IN [0 .. (cacheSize+15)/16) DO AddCachePages[16]; ENDLOOP;
  };

-- Returns page p locked for reading, faulting it in from the file on a cache miss.
-- pHint is an optional guess at p's cache record; it is verified before use.
ReadPage: PUBLIC PROC [p: DBPage, pHint: CacheHandle] RETURNS [CacheHandle, LONG POINTER] = {
  IF logReads THEN DBCommon.GetDebugStream[].PutChar['#];
  DBStats.Inc[CacheReadOrWrite];
  IF pHint # NIL AND p = pHint.dbPage THEN GOTO ReturnPage;
  -- hint is wrong, so search hash table
  pHint _ Lookup[p];
  IF pHint # NIL THEN GO TO ReturnPage;
  -- page is not present in cache, read it in
  IF logReads THEN DBCommon.GetDebugStream[].PutChar['!];
  DBStats.Inc[CacheMiss];
  pHint _ GetUnoccupiedPage[];
  { -- We expect ERRORs to be raised from DBFile.ReadFilePage
  file: DBCommon.OpenFileHandle; page: CARDINAL;
  [pHint.readonly, file, page] _ FilePageFromDBPage[p];
  DBFile.ReadFilePage[file, page, pHint.pagePtr] };
  -- Only now (after a successful read) is the buffer marked occupied, so an ERROR
  -- above does not leave a half-filled occupied page (see change log, Feb 25, 1983).
  pHint.dbPage _ p;
  Insert[pHint];
  pHint.lockCount _ 0;
  GOTO ReturnPage;
  EXITS
  ReturnPage => {
    MakeMRU[pHint];
    pHint.lockCount _ pHint.lockCount + 1;
    RETURN[pHint, pHint.pagePtr] }
  };

-- Like ReadPage, but also marks the page dirty (written _ TRUE) and raises
-- Error[WriteNotAllowed] for readonly segments.
WritePage: PUBLIC PROC [p: DBPage, pHint: CacheHandle] RETURNS [CacheHandle, LONG POINTER] = {
  DBStats.Inc[CacheReadOrWrite];
  IF pHint # NIL AND p = pHint.dbPage THEN GO TO ReturnPage;
  -- hint is wrong, so search hash table
  pHint _ Lookup[p];
  IF pHint # NIL THEN GOTO ReturnPage;
  -- page is not present in cache, read it in
  DBStats.Inc[CacheMiss];
  pHint _ GetUnoccupiedPage[];
  { -- We expect ERRORs to be raised from DBFile.ReadFilePage
  file: DBCommon.OpenFileHandle; page: CARDINAL;
  [pHint.readonly, file, page] _ FilePageFromDBPage[p];
  IF pHint.readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
  DBFile.ReadFilePage[file, page, pHint.pagePtr] };
  pHint.dbPage _ p;
  Insert[pHint];
  pHint.lockCount _ 0;
  GOTO ReturnPage;
  EXITS
  ReturnPage => {
    MakeMRU[pHint];
    pHint.lockCount _ pHint.lockCount + 1;
    pHint.written _ TRUE;
    RETURN[pHint, pHint.pagePtr] }
  };

-- Binds a cache buffer to page p WITHOUT reading the file (used by AllocPage for
-- freshly allocated pages whose disk contents are irrelevant).  Returns the page
-- locked once and marked dirty.  p must not already be cached.
NewPage: PROC [p: DBPage] RETURNS [CacheHandle, LONG POINTER] = {
  pHint: CacheHandle _ Lookup[p];
  IF pHint # NIL THEN ERROR DBEnvironment.InternalError; -- [NotaNewPage]
  pHint _ GetUnoccupiedPage[];
  { [pHint.readonly, , ] _ FilePageFromDBPage[p];
  -- NOTE(review): unlike WritePage there is no ERROR keyword before the next call;
  -- confirm the signal is still raised as intended here.
  IF pHint.readonly THEN DBEnvironment.Error[WriteNotAllowed]; };
  pHint.dbPage _ p;
  Insert[pHint];
  MakeMRU[pHint];
  pHint.lockCount _ 1;
  pHint.written _ TRUE;
  RETURN[pHint, pHint.pagePtr] };

-- Marks an already-locked page dirty; the page must be locked and writable.
WriteLockedPage: PUBLIC PROC [pValidHint: CacheHandle] = {
  IF pValidHint = NIL OR pValidHint.lockCount = 0 THEN
    ERROR DBEnvironment.InternalError; -- [AlreadyUnlocked]
  IF pValidHint.readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
  pValidHint.written _ TRUE;
  };

-- Returns the lock count of page p; pValidHint must actually denote p and be locked.
LockCount: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle] RETURNS [CARDINAL] = {
  IF pValidHint = NIL OR pValidHint.dbPage # p OR pValidHint.lockCount = 0 THEN
    ERROR DBEnvironment.InternalError; -- BadCacheHandle
  RETURN[pValidHint.lockCount];
  };

-- Returns the in-core address of locked page p.
ObtainCoreLoc: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle] RETURNS [LONG POINTER] = {
  IF pValidHint = NIL OR pValidHint.dbPage # p OR pValidHint.lockCount = 0 THEN
    ERROR DBEnvironment.InternalError; -- [BadCacheHandle];
  RETURN[pValidHint.pagePtr];
  };

-- Decrements a page's lock count; raises InternalError if already unlocked.
UnlockPage: PUBLIC PROC [pValidHint: CacheHandle] = {
  IF pValidHint = NIL OR pValidHint.lockCount = 0 THEN
    ERROR DBEnvironment.InternalError; -- [AlreadyUnlocked];
  pValidHint.lockCount _ pValidHint.lockCount - 1;
  };

-- Writes back every dirty cached page belonging to transaction t.  Always
-- returns TRUE (failures surface as ERRORs from the write path).
WriteoutCache: PROC [t: Trans] RETURNS [success: BOOL] = {
  WritePageOutIfUsesTrans: PROC [cacheHandle: CacheHandle] = {
    IF cacheHandle.written THEN {
      fileNode: FileTable.Node = files.Lookup[InlineSegmentIDFromDBPage[cacheHandle.dbPage]];
      IF fileNode.trans = t THEN WritePageOut[cacheHandle];
      };
    };
  EnumerateHashTable[WritePageOutIfUsesTrans];
  RETURN[TRUE];
  };

-- Discards (without writing) every cached page belonging to transaction t.
AbortCache: PROC [t: Trans] = {
  DeleteIfUsesTrans: PROC [cacheHandle: CacheHandle] = {
    IF cacheHandle.occupied THEN {
      fileNode: FileTable.Node = files.Lookup[InlineSegmentIDFromDBPage[cacheHandle.dbPage]];
      IF fileNode.trans = t THEN Delete[cacheHandle];
      };
    };
  EnumerateLRUList[DeleteIfUsesTrans];
  };

-- Writes one dirty cache page back to its file and clears its written flag.
WritePageOut: PROC [cacheHandle: CacheHandle] = {
  readonly: BOOL; file: DBCommon.OpenFileHandle; page: CARDINAL;
  DBStats.Inc[CacheWritePageBack];
  [readonly, file, page] _ FilePageFromDBPage[cacheHandle.dbPage];
  IF readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
  DBFile.WriteFilePage[file, page, cacheHandle.pagePtr];
  cacheHandle.written _ FALSE;
  };

-- Buffer manager LRU list

AddCachePages: PROC [nPages: NAT] = {
  -- creates nPages new cache records, in the LRU position.
  curCachePage: LONG POINTER _ RTOS.GetPermanentDataPages[
    nPages: nPages*DBCommon.PagesPerDBPage, createUniformSwapUnits: TRUE];
  FOR i: NAT IN [0..nPages) DO
    p: CacheHandle _ zone.NEW[CacheRecord _ [ dbPage: invalidKey, pagePtr: curCachePage]];
    curCachePage _ curCachePage + DBCommon.WordsPerPage;
    -- Splice p in just before the header node, i.e. at the LRU end.
    p.pred _ lruList.pred;
    p.succ _ lruList;
    lruList.pred.succ _ p;
    lruList.pred _ p;
    ENDLOOP;
  };

MakeMRU: PROC [x: CacheHandle] = INLINE {
  -- Assumes that x is part of lruList.  Deletes x from its current position and makes
  -- it first in lruList.
  x.pred.succ _ x.succ;
  x.succ.pred _ x.pred;
  x.succ _ lruList.succ;
  x.pred _ lruList;
  lruList.succ.pred _ x;
  lruList.succ _ x;
  };

EnumerateLRUList: PROC [procToCall: PROC[CacheHandle]] = {
  -- Calls procToCall for each record in LRUList, in MRU to LRU order.
  -- procToCall may not perform MakeMRU; use EnumerateHashTable if you want this.
  -- Note that procToCall will be called for both occupied and unoccupied pages.
  FOR p: CacheHandle _ lruList.succ, p.succ UNTIL p = lruList DO
    procToCall[p];
    ENDLOOP;
  };

GetUnoccupiedPage: PROC RETURNS [CacheHandle] = {
  -- Obtain an empty cache page.  It will be the least-recently used page that is
  -- unoccupied, or occupied but unlocked.  The page is returned with Key = garbage,
  -- Page = ptr to a cache page, occupied = FALSE, written = FALSE,
  -- and entered on lruList in no particular location.
  cacheHandle: CacheHandle;
  FOR cacheHandle _ lruList.pred, cacheHandle.pred UNTIL cacheHandle = lruList DO
    IF ~cacheHandle.occupied THEN GOTO FoundUnoccupiedPage;
    IF cacheHandle.lockCount = 0 THEN GOTO MakePageUnoccupied;
    REPEAT
    FoundUnoccupiedPage => NULL; -- ~cacheHandle.occupied
    MakePageUnoccupied => {
      -- cacheHandle.occupied AND cacheHandle.lockCount=0
      -- Evict: write the victim back if dirty, then remove it from the hash table.
      IF cacheHandle.written THEN WritePageOut[cacheHandle];
      Delete[cacheHandle];
      };
    FINISHED => ERROR DBEnvironment.Fatal[AllCachePagesLocked]
    ENDLOOP;
  cacheHandle.written _ FALSE;
  RETURN[cacheHandle];
  };

-- Buffer manager hash table

EnumerateHashTable: PROC[procToCall: PROC[CacheHandle]] = {
  -- Calls procToCall for each record in table.
  -- procToCall may not perform Delete; use EnumerateLRUList if you want this.
  FOR i: NAT IN [0..ht.size) DO
    FOR p: CacheHandle _ ht.buckets[i], p.hashChain UNTIL p = NIL DO
      procToCall[p];
      ENDLOOP;
    ENDLOOP;
  };

-- Finds the cache record for dbPage by chasing its bucket's chain; NIL on miss.
Lookup: PROC[dbPage: DBPage] RETURNS[CacheHandle] = {
  i: CARDINAL _ Hash[dbPage];
  p: CacheHandle _ ht.buckets[i];
  DBStats.Inc[CacheHTLookup];
  WHILE p # NIL DO
    IF p.dbPage = dbPage THEN RETURN[p];
    DBStats.Inc[CacheHTConflictInLookup];
    p _ p.hashChain;
    ENDLOOP;
  RETURN[NIL];
  };

-- Inserts x (keyed by x.dbPage) at the head of its bucket chain and marks it
-- occupied; duplicates are an internal error.
Insert: PROC[x: CacheHandle] = {
  i: CARDINAL _ Hash[x.dbPage];
  -- Make sure its not there already...
  p: CacheHandle _ ht.buckets[i];
  WHILE p # NIL DO
    IF p.dbPage = x.dbPage THEN GOTO FoundDuplicate;
    p _ p.hashChain;
    REPEAT
    FoundDuplicate => ERROR DBEnvironment.InternalError; -- [AlreadyInHashTable];
    FINISHED => {
      x.hashChain _ ht.buckets[i];
      ht.buckets[i] _ x;
      ht.nItems _ ht.nItems + 1;
      x.occupied _ TRUE;
      };
    ENDLOOP;
  };

-- Removes x from its bucket chain and resets it to the free state
-- (dbPage = invalidKey, occupied = FALSE).
Delete: PROC[x: CacheHandle] = {
  i: CARDINAL _ Hash[x.dbPage];
  p: CacheHandle _ ht.buckets[i];
  IF p = x THEN {ht.buckets[i] _ x.hashChain; GOTO Deleted};
  WHILE p # NIL DO
    IF p.hashChain = x THEN {p.hashChain _ x.hashChain; GOTO Deleted};
    p _ p.hashChain;
    REPEAT
    FINISHED => ERROR DBEnvironment.InternalError; -- [NotPresentInHashTable];
    ENDLOOP;
  EXITS
  Deleted => {
    x.dbPage _ invalidKey;
    x.occupied _ FALSE;
    x.hashChain _ NIL;
    ht.nItems _ ht.nItems - 1;
    };
  };

-- Buffer manager debugging

AssertionFailed: SIGNAL = CODE;

Assert: PROC [assertion: BOOL] =
  {IF ~assertion THEN SIGNAL AssertionFailed; }; -- So we can Proceed (in debugger) from failure.

-- Consistency checker for the buffer manager: verifies LRU-list linkage, the
-- occupied/ht invariants, and the page counts; optionally prints each entry.
CheckState: PUBLIC PROC [doPrinting, printOnlyLockedPages: BOOL] = {
  nOccupiedItems, checkOccupiedItems: CARDINAL _ 0;
  nLockedItems: CARDINAL _ 0;
  nFreeItems: CARDINAL _ 0;
  PageHeader: TYPE = MACHINE DEPENDENT RECORD[
    pageTag: CARDINAL [0..256), dontCare: CARDINAL [0..256)];
  IOref: PROC[p: REF ANY] RETURNS[IO.Value] = INLINE {
    RETURN[IO.card[LOOPHOLE[p, LONG CARDINAL]]]};
  IOptr: PROC[p: LONG POINTER] RETURNS[IO.Value] = INLINE {
    RETURN[IO.card[LOOPHOLE[p, LONG CARDINAL]]]};
  out: IO.Handle _ IF doPrinting THEN DBCommon.GetDebugStream[] ELSE NIL;
  CheckEntry1: PROC[p: CacheHandle] = {
    -- Per-record checks while walking the LRU list.
    Assert[p.pred.succ = p];
    Assert[p.succ.pred = p];
    Assert[p.pagePtr # NIL];
    IF ~p.occupied THEN {
      nFreeItems _ nFreeItems + 1; Assert[p.dbPage = invalidKey]; RETURN };
    Assert[p.dbPage # invalidKey];
    nOccupiedItems _ nOccupiedItems + 1;
    IF p.lockCount > 0 THEN nLockedItems _ nLockedItems + 1;
    IF p.written THEN Assert[NOT p.readonly];
    IF ~doPrinting OR (p.lockCount = 0 AND printOnlyLockedPages) THEN RETURN;
    {pageTag: CARDINAL = LOOPHOLE[p.pagePtr, LONG POINTER TO PageHeader].pageTag;
    out.PutF["%11bB %11bB %11bB %3d", IOref[p], IO.card[p.dbPage], IOptr[p.pagePtr],
      IO.card[pageTag]];
    -- NOTE(review): "*n" below looks like a typo for "\n"; confirm before changing.
    out.PutF[" %3d %g %g*n", IO.card[p.lockCount], IO.bool[p.written], IO.bool[p.readonly]];
    };
    };
  CheckEntry2: PROC[p: CacheHandle] = {
    -- Every record reachable from ht must be occupied.
    Assert[p.occupied]; checkOccupiedItems _ checkOccupiedItems + 1; };
  IF doPrinting THEN {
    out.PutF["Cache size %g pages.\n", IO.card[cacheSize]];
    out.PutRope[ " cacheHdl dbPage pagePtr tag lockCnt written readonly\n"];
    };
  EnumerateLRUList[CheckEntry1];
  EnumerateHashTable[CheckEntry2];
  IF doPrinting THEN
    out.PutF["%g pages occupied, %g are locked\n", IO.card[nOccupiedItems],
      IO.card[nLockedItems]];
  Assert[nOccupiedItems = checkOccupiedItems];
  Assert[ht.nItems = checkOccupiedItems];
  Assert[nFreeItems + nOccupiedItems = cacheSize];
  };

END.--DBSegmentImpl

CHANGE LOG

Created by MBrown on December 10, 1979 5:09 PM

Changed by MBrown on December 1, 1982 9:41 am
-- Total rewrite for multiple segments, merging in DBCacheImpl.  No more support for module
--finalization.  Discarded >3 pages of change log.

Changed by Cattell on January 16, 1983 2:35 pm
-- Added some more meaningful signals for various client errors for segments and transactions,
--in conjunction with new DBEnvironment interface replacing DBException.

Changed by Cattell on January 17, 1983 8:21 pm
-- AttachSegment didn't put readOnly and version into fileNode when segment re-declared, so couldn't re-open transaction with version to create new file.

Changed by Cattell on January 19, 1983 10:57 am
-- The code didn't check for FindFile returning NIL in a couple places, resulting in address faults if the segment was not declared.  Put in checks and signal generation.

Changed by Willie-Sue on February 3, 1983
-- Added noLog arg to OpenTransaction and OpenSegment

Changed by MBrown on February 25, 1983 1:39 pm
-- Fixed bug in ReadPage, WritePage, NewPage: buffer left in "occupied" state after
--error (e.g.
attempt to read after transaction abort). Changed by Cattell & Donahue on May 19, 1983 2:26 pm -- Changed AttachSegment so can re-call it with new file name for same segment atom.