-- DBSegmentImpl.mesa
-- Copyright (C) 1985 by Xerox Corporation. All rights reserved.
-- Last Edited by:
--   Donahue, July 2, 1986 2:08:13 pm PDT
--   MBrown on February 25, 1983 1:49 pm
--   Cattell on September 16, 1983 11:13 am
--   Willie-Sue on April 25, 1985 12:45:53 pm PST
--   Wert, August 10, 1984 6:08:10 pm PDT
--   Widom, September 12, 1985 4:54:35 pm PDT

DIRECTORY
  AlpineEnvironment,
  Atom,
  Basics,
  ConvertUnsafe,
  DB USING [Error, Failure],
  DBCommon,
  DBFile,
  DBSegment,
  DBSegmentPrivate,
  DBStats,
  DBStoragePage,
  File,
  IO,
  Rope USING [ROPE, Equal],
  VM USING [Allocate, AddressForPageNumber];

DBSegmentImpl: CEDAR PROGRAM
  IMPORTS Atom, Basics, ConvertUnsafe, DB, DBCommon, DBFile, DBStats, IO, Rope, VM
  EXPORTS DBSegment =
  BEGIN

  ROPE: TYPE = Rope.ROPE;
  STREAM: TYPE = IO.STREAM;
  Trans: TYPE = DBCommon.Transaction;
  Segment: TYPE = DBCommon.Segment;
  SegmentID: TYPE = DBCommon.SegmentID;
  SegmentIndex: TYPE = DBCommon.SegmentIndex;
  VersionOptions: TYPE = DBCommon.VersionOptions;
  DBPage: TYPE = DBCommon.DBPage;
  NullDBPage: DBPage = DBCommon.NullDBPage;
  CacheHandle: TYPE = DBCommon.CacheHandle;
  CacheRecord: TYPE = DBCommon.CacheRecord;
  TID: TYPE = DBCommon.TID;

  TakeALookAtThis: ERROR = CODE;

  FileNode: TYPE = REF FileNodeRecord;
  FileNodeRecord: TYPE = RECORD [
    segmentID: DBCommon.DBPage,
    segment: DBCommon.Segment _ NIL,
    fileName: ROPE _ NIL,
    openFile: DBCommon.OpenFileHandle _ NIL,  -- if openFile = NIL, then the segment is not currently open
    readonly: BOOL _ TRUE,
    lock: AlpineEnvironment.LockOption _ [intendWrite, wait],
    version: DBCommon.VersionOptions _ OldFileOnly,
    pagesInitial, pagesPerExtent: NAT _ 0,
    rest: FileNode _ NIL
    ];

  HashTable: TYPE = RECORD [
    nItems: NAT _ 0,  -- number of CacheRecords in table
    buckets: SEQUENCE size: NAT OF CacheHandle
    ];

  -- Module state

  initialized: BOOL _ FALSE;
  createIndexesProc: PROC [s: Segment] RETURNS [ARRAY[0..DBCommon.systemIndexCount) OF TID];
  logReads: BOOL _ FALSE;  -- causes reads & writes to be logged on UserExec
  noteThisPage: DBPage _ 0;  -- causes signal when this page is read or written
  files: FileNode _ NIL;  -- list mapping SegmentID to interesting information about a file
  cacheSize: NAT;  -- number of pages allocated to cache
  ht: REF HashTable;  -- fast map from DBPage to CacheHandle; ht.size is a power of two
  hashMask: CARDINAL;  -- equals ht.size-1, used in computing hash function
  lruList: CacheHandle;  -- points to header node of doubly-linked list of CacheRecords

  -- Data structure invariants: A cache record is present in lruList for every allocated
  -- cache page; there is no separate "free list".  occupied=FALSE means that the page is
  -- free, and that the other cache record fields (readonly, written) are undefined.
  -- occupied=TRUE means that the page contains valid data; a page is stored in ht iff
  -- occupied=TRUE.  The lruList points to a cache record with no associated cache page;
  -- this "header node" makes the circular-list manipulations work properly.  The
  -- most-recently used page is lruList.succ, and the least-recently used page is
  -- lruList.pred.

  Initialize: PUBLIC PROC [
    nCachePages: NAT,
    useCacheFile: ROPE,
    segmentInitProc: PROC [s: Segment] RETURNS [ARRAY[0..DBCommon.systemIndexCount) OF TID]
    ] = {
    IF initialized THEN ERROR DBCommon.InternalError;
    cacheSize _ nCachePages;
    createIndexesProc _ segmentInitProc;
    InitializeBufferMgr[];
    initialized _ TRUE;
    };

  FindSegmentID: PROC [segmentID: DBCommon.DBPage] RETURNS [found: FileNode] = {
    -- Just a linear search; we're not going to have a tremendous number of segments open.
    FOR found _ files, found.rest UNTIL found = NIL DO
      IF found.segmentID = segmentID THEN RETURN
      ENDLOOP
    };

  FindSegment: PROC [s: Segment] RETURNS [found: FileNode] = {
    -- Same as above, but looks for atom match instead of segment ID match.  If efficiency
    -- of this ever becomes important, we can attach info to prop list of atom.
    FOR found _ files, found.rest UNTIL found = NIL DO
      IF found.segment = s THEN RETURN
      ENDLOOP
    };

  AttachSegment: PUBLIC PROC [
    fileName: ROPE, s: Segment, segmentIndex: SegmentIndex,
    lock: AlpineEnvironment.LockOption, readonly: BOOL, version: VersionOptions,
    nPagesInitial, nPagesPerExtent: NAT
    ] RETURNS [newAttachment: BOOL] = {
    segmentID: SegmentID = SegmentIDFromSegmentIndex[segmentIndex];
    fileNode: FileNode = FindSegmentID[segmentID];
    IF NOT initialized THEN ERROR;
    IF fileNode # NIL THEN {
      IF fileNode.segmentID # segmentID OR fileNode.segment # s THEN
        ERROR DB.Error[MismatchedExistingSegment];
      newAttachment _ NOT Rope.Equal[fileNode.fileName, fileName];
      fileNode.fileName _ fileName;
      fileNode.version _ version;
      fileNode.lock _ lock;
      fileNode.readonly _ readonly;
      fileNode.pagesInitial _ MAX[1, nPagesInitial];
      fileNode.pagesPerExtent _ MAX[1, nPagesPerExtent];
      RETURN;
      };
    files _ NEW[FileNodeRecord _ [
      segmentID: segmentID, segment: s, lock: lock, readonly: readonly,
      version: version, fileName: fileName,
      pagesInitial: MAX[1, nPagesInitial], pagesPerExtent: MAX[1, nPagesPerExtent],
      rest: files
      ]];
    newAttachment _ TRUE
    };

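  -- Illustrative sketch (added; hypothetical client code, not part of the original module).
  -- A client of DBSegment typically initializes the cache once, attaches each segment, and
  -- then opens it under a transaction, e.g.:
  --
  --   Initialize[nCachePages: 100, useCacheFile: NIL, segmentInitProc: MyCreateIndexes];
  --   [] _ AttachSegment[
  --     fileName: "[Server]<CedarDB>Foo.segment", s: $Foo, segmentIndex: 1,
  --     lock: [intendWrite, wait], readonly: FALSE, version: OldFileOnly,
  --     nPagesInitial: 32, nPagesPerExtent: 32];
  --   OpenSegment[s: $Foo, trans: myTrans, eraseAfterOpening: FALSE];
  --
  -- Here MyCreateIndexes, myTrans, the file name, and the page counts are placeholders.
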
  OpenSegment: PUBLIC PROC [s: Segment, trans: Trans, eraseAfterOpening: BOOL] = {
    ENABLE UNWIND => { CloseSegment[s]; FlushCache[s] };
    createdFile: BOOL;
    fileNode: FileNode = FindSegment[s];
    [fileNode.openFile, createdFile] _ DBFile.OpenFile[
      t: trans, file: fileNode.fileName, version: fileNode.version,
      discardFileContents: eraseAfterOpening, nPagesInitial: fileNode.pagesInitial,
      lock: fileNode.lock, readOnly: fileNode.readonly];
    IF createdFile OR eraseAfterOpening
      THEN InitializeSegment[fileNode]
      ELSE CheckSegment[fileNode]
    };

  CloseSegment: PUBLIC PROC [s: Segment] ~ {
    fileNode: FileNode = FindSegment[s];
    IF fileNode # NIL AND fileNode.openFile # NIL THEN DBFile.Close[fileNode.openFile];
    fileNode.openFile _ NIL
    };

  GetSegmentInfo: PUBLIC PROC [s: Segment] RETURNS [filePath: ROPE, readOnly: BOOL] = {
    fileNode: FileNode = FindSegment[s];
    IF fileNode = NIL
      THEN RETURN[NIL, FALSE]
      ELSE RETURN[fileNode.fileName, fileNode.readonly]
    };

  GetVersionStamp: PUBLIC PROC [segment: Segment] RETURNS [schemaVersion: INT] = TRUSTED {
    -- Returns the current schema version stamp for the given segment.
    fileNode: FileNode = FindSegment[segment];
    segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
    segmentHeadHandle: CacheHandle;
    IF fileNode = NIL THEN ERROR DB.Error[SegmentNotDeclared];
    [segmentHeadHandle, segmentHead] _ ReadPage[fileNode.segmentID, NIL];
    schemaVersion _ segmentHead.schemaVersionStamp;
    UnlockPage[segmentHeadHandle]
    };

  SetVersionStamp: PUBLIC PROC [segment: Segment, to: INT] = TRUSTED {
    -- Called when the schema for the given segment is changed; updates the schema version stamp.
    segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
    segmentHeadHandle: CacheHandle;
    fileNode: FileNode = FindSegment[segment];
    IF fileNode = NIL THEN ERROR DB.Error[SegmentNotDeclared];
    IF fileNode.readonly THEN ERROR DB.Error[WriteNotAllowed];
    [segmentHeadHandle, segmentHead] _ WritePage[fileNode.segmentID, NIL];
    segmentHead.schemaVersionStamp _ to;
    UnlockPage[segmentHeadHandle]
    };

  SegmentFromTID: PUBLIC PROC [tid: TID] RETURNS [s: Segment] = {
    fileNode: FileNode = FindSegmentID[InlineSegmentIDFromDBPage[tid]];
    IF fileNode # NIL THEN RETURN [fileNode.segment] ELSE RETURN [NIL];
    };

  IsValidTID: PUBLIC PROC [tid: TID] RETURNS [valid: BOOL] = {
    fileNode: FileNode = FindSegmentID[InlineSegmentIDFromDBPage[tid]];
    RETURN [fileNode # NIL AND fileNode.openFile # NIL];
    };

  SegmentIDFromSegment: PUBLIC PROC [s: Segment] RETURNS [SegmentID] = {
    fileNode: FileNode = FindSegment[s];
    IF fileNode = NIL THEN ERROR DB.Error[SegmentNotDeclared];
    RETURN [fileNode.segmentID];
    };

  EnumerateSegments: PUBLIC PROC [
    enumProc: PROC [s: Segment, segmentIndex: SegmentIndex] RETURNS [stop: BOOL]] = {
    FOR f: FileNode _ files, f.rest UNTIL f = NIL DO
      IF enumProc[f.segment, SegmentIndexFromSegmentID[f.segmentID]].stop THEN EXIT;
      ENDLOOP;
    };

  FilePageFromDBPage: PROC [dbPage: DBPage]
    RETURNS [readonly: BOOL, file: DBCommon.OpenFileHandle, page: CARDINAL] = {
    segmentID: SegmentID;
    fileNode: FileNode;
    [segmentID, page] _ DecomposeDBPage[dbPage];
    fileNode _ FindSegmentID[segmentID];
    IF fileNode = NIL OR (file _ fileNode.openFile) = NIL THEN ERROR DB.Error[TransactionNotOpen];
    RETURN [fileNode.readonly, file, page];
    };

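  -- Illustrative note (added, not part of the original module).  SegmentFromTID and
  -- IsValidTID above work because, as their calls assume, a TID carries its segment in the
  -- same bit positions as a DBPage, so InlineSegmentIDFromDBPage (defined later in this
  -- module) can extract the owning segment directly from a tuple identifier;
  -- FilePageFromDBPage then maps any DBPage in that segment to the pair
  -- [open file, file page number] used by DBFile.
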
  -- Procs that understand the format of the segment head page.

  InitializeSegment: PROC [fileNode: FileNode] = TRUSTED {
    segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
    segmentHeadHandle: CacheHandle;
    fileSize: CARDINAL = DBFile.GetSize[fileNode.openFile];
    fileNode.readonly _ FALSE;
    -- If the segment is not readonly then we need to be able to write the head page
    -- (its schema version stamp and software compatibility version).
    [segmentHeadHandle, segmentHead] _ WritePage[fileNode.segmentID, NIL];
    segmentHead^ _ [
      segmentID: fileNode.segmentID,
      allocator: [
        firstPageInBlock: ConsDBPage[fileNode.segmentID, 1],
        nPagesInBlock: fileSize-1,
        pagesPerExtent: fileNode.pagesPerExtent],
      softwareCompatibilityVersion: DBCommon.softwareCompatibilityVersion,
      schemaVersionStamp: 1];
    TRUSTED {
      ConvertUnsafe.AppendRope[
        to: LOOPHOLE[@(segmentHead.name)], from: Atom.GetPName[fileNode.segment]];
      };
    segmentHead.systemIndices _ createIndexesProc[fileNode.segment];
    UnlockPage[segmentHeadHandle];
    };

  CheckSegment: PROC [fileNode: FileNode] = TRUSTED {
    segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
    segmentHeadHandle: CacheHandle;
    [segmentHeadHandle, segmentHead] _ ReadPage[fileNode.segmentID, NIL];
    IF segmentHead.tag # DBStoragePage.AMap OR segmentHead.seal # DBSegmentPrivate.Seal THEN
      ERROR DB.Error[MismatchedExistingSegment];
    IF segmentHead.softwareCompatibilityVersion # DBCommon.softwareCompatibilityVersion THEN
      ERROR DB.Error[MismatchedExistingSegment];
    TRUSTED {
      IF NOT Rope.Equal[
        s1: Atom.GetPName[fileNode.segment],
        s2: ConvertUnsafe.ToRope[LOOPHOLE[@(segmentHead.name)]],
        case: FALSE] THEN ERROR DB.Error[MismatchedExistingSegment];
      };
    UnlockPage[segmentHeadHandle];
    };

  RootIndicesFromSegment: PUBLIC PROC [s: Segment]
    RETURNS [indices: ARRAY[0..DBCommon.systemIndexCount) OF TID] = TRUSTED {
    segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
    segmentHeadHandle: CacheHandle;
    fileNode: FileNode = FindSegment[s];
    IF fileNode = NIL THEN ERROR DB.Error[SegmentNotDeclared];
    [segmentHeadHandle, segmentHead] _ ReadPage[fileNode.segmentID, NIL];
    IF segmentHead.tag # DBStoragePage.AMap THEN ERROR DB.Error[MismatchedExistingSegment];
    indices _ segmentHead.systemIndices;
    UnlockPage[segmentHeadHandle];
    RETURN [indices];
    };

  AllocPage: PUBLIC PROC [s: SegmentID]
    RETURNS [p: DBPage, pValidHint: CacheHandle, cachePage: LONG POINTER] = TRUSTED {
    segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
    segmentHeadHandle: CacheHandle;
    DBStats.Inc[SegmentAllocPage];
    [segmentHeadHandle, segmentHead] _ WritePage[s, NIL];
    IF segmentHead.tag # DBStoragePage.AMap THEN ERROR DB.Error[MismatchedExistingSegment];
    IF (p _ segmentHead.allocator.freeList) # NullDBPage THEN {
      -- The segment's free list is nonempty, so return a page from it.
      allocatedFreeListPagePtr: LONG POINTER TO DBSegmentPrivate.FreeListPage;
      [pValidHint, allocatedFreeListPagePtr] _ WritePage[p, NIL];
      IF allocatedFreeListPagePtr.tag # DBStoragePage.Free THEN ERROR DBCommon.InternalError;
      segmentHead.allocator.freeList _ allocatedFreeListPagePtr.next;
      cachePage _ allocatedFreeListPagePtr;
      }
    ELSE {
      -- The segment's free list is empty, so allocate from block allocator, perhaps extending file.
      IF segmentHead.allocator.firstPageInBlock = NullDBPage THEN {
        fileNode: FileNode _ FindSegmentID[s];
        size: CARDINAL _ DBFile.GetSize[fileNode.openFile];
        DBFile.SetSize[fileNode.openFile, size + segmentHead.allocator.pagesPerExtent];
        segmentHead.allocator.firstPageInBlock _ ConsDBPage[s, size];
        segmentHead.allocator.nPagesInBlock _ segmentHead.allocator.pagesPerExtent;
        };
      p _ segmentHead.allocator.firstPageInBlock;
      IF (segmentHead.allocator.nPagesInBlock _ segmentHead.allocator.nPagesInBlock-1) = 0
        THEN segmentHead.allocator.firstPageInBlock _ NullDBPage
        ELSE segmentHead.allocator.firstPageInBlock _ p + DBCommon.DBPageIncrement;
      [pValidHint, cachePage] _ NewPage[p];
      };
    UnlockPage[segmentHeadHandle];
    TRUSTED {
      LOOPHOLE[cachePage, LONG POINTER TO DBStoragePage.PageHeader].pageTag _ DBStoragePage.Unused;
      };
    RETURN[p, pValidHint, cachePage];
    };  --AllocPage

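  -- Worked example (added for illustration; the numbers are hypothetical).  Suppose a
  -- segment's free list is empty and its current block is exhausted (firstPageInBlock =
  -- NullDBPage) with pagesPerExtent = 32.  AllocPage then grows the underlying file by 32
  -- pages via DBFile.SetSize, makes the first new page the start of the block, and hands
  -- out pages from the block one DBCommon.DBPageIncrement at a time; when the 32nd page is
  -- taken, firstPageInBlock goes back to NullDBPage.  FreePage, below, never shrinks the
  -- file: a freed page is overwritten with a free-list record and pushed onto the head of
  -- the segment's free list, where the first branch of AllocPage will reuse it.
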
  FreePage: PUBLIC PROC [s: SegmentID, freePg: DBPage, freeHint: CacheHandle] = TRUSTED {
    segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
    segmentHeadHandle: CacheHandle;
    free: LONG POINTER TO DBSegmentPrivate.FreeListPage;
    DBStats.Inc[SegmentFreePage];
    IF s # InlineSegmentIDFromDBPage[freePg] THEN ERROR DBCommon.InternalError;
    [segmentHeadHandle, segmentHead] _ WritePage[s, NIL];
    IF segmentHead.tag # DBStoragePage.AMap THEN ERROR DBCommon.InternalError;
    IF LockCount[freePg, freeHint] # 1 THEN ERROR DBCommon.InternalError;
    free _ ObtainCoreLoc[freePg, freeHint];
    IF free.tag = DBStoragePage.Free OR free.tag = DBStoragePage.AMap THEN ERROR DBCommon.InternalError;
    WriteLockedPage[freeHint];
    free^ _ [next: segmentHead.allocator.freeList];
    segmentHead.allocator.freeList _ freePg;
    UnlockPage[freeHint];
    UnlockPage[segmentHeadHandle];
    };  --FreePage

  -- Procs that understand the bit layout of a DBPage.

  SegmentIDFromSegmentIndex: PROC [segmentIndex: SegmentIndex] RETURNS [SegmentID] = {
    sid: SegmentID;
    segmentID: Basics.LongNumber.num = [num[
      lowbits: 0, highbits: Basics.BITSHIFT[value: segmentIndex, count: 7]]];
    TRUSTED { sid _ LOOPHOLE[segmentID] };
    RETURN [sid];
    };

  SegmentIndexFromSegmentID: PROC [segmentID: SegmentID] RETURNS [SegmentIndex] = TRUSTED {
    RETURN [Basics.BITSHIFT[
      value: LOOPHOLE[segmentID, Basics.LongNumber.num].highbits, count: -7]];
    };

  SegmentIDFromDBPage: PUBLIC PROC [dbPage: DBPage] RETURNS [SegmentID] = {
    RETURN [InlineSegmentIDFromDBPage[dbPage]]
    };

  InlineSegmentIDFromDBPage: PROC [dbPage: DBPage] RETURNS [SegmentID] = TRUSTED INLINE {
    segmentID: Basics.LongNumber.num = [num[
      lowbits: 0,
      highbits: Basics.BITAND[LOOPHOLE[dbPage, Basics.LongNumber.num].highbits, 177600B]]];
    RETURN [LOOPHOLE[segmentID]];
    };

  DecomposeDBPage: PROC [dbPage: DBPage] RETURNS [SegmentID, CARDINAL] = TRUSTED {
    segmentID1: Basics.LongNumber.num = [num[
      lowbits: 0,
      highbits: Basics.BITAND[LOOPHOLE[dbPage, Basics.LongNumber.num].highbits, 177600B]]];
    filePage: CARDINAL _ Basics.BITOR[
      Basics.BITSHIFT[value: LOOPHOLE[dbPage, Basics.LongNumber.num].highbits, count: 10],
      Basics.BITSHIFT[value: LOOPHOLE[dbPage, Basics.LongNumber.num].lowbits, count: -6]];
    RETURN [LOOPHOLE[segmentID1], filePage];
    };

  ConsDBPage: PROC [segmentID: SegmentID, filePage: CARDINAL] RETURNS [dbPage: DBPage] = TRUSTED {
    -- Inverse of DecomposeDBPage.
    dbPage1: Basics.LongNumber.num = [num[
      highbits: Basics.BITOR[
        LOOPHOLE[segmentID, Basics.LongNumber.num].highbits,
        Basics.BITSHIFT[value: filePage, count: -10]],
      lowbits: Basics.BITSHIFT[value: filePage, count: 6]]];
    RETURN[LOOPHOLE[dbPage1]];
    };

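  -- Worked example (added for illustration; derived from the procedures above, not part of
  -- the original module).  A DBPage packs a segment identifier into the top 9 bits of its
  -- high word (mask 177600B) and a 16-bit file page number into the remaining bits; the low
  -- 6 bits of the low word are always zero.  For segmentIndex = 5 and filePage = 2049:
  --   SegmentIDFromSegmentIndex[5]:  highbits = BITSHIFT[5, 7] = 1200B, lowbits = 0
  --   ConsDBPage[sid, 2049]:         highbits = 1200B OR BITSHIFT[2049, -10] = 1202B
  --                                  lowbits  = BITSHIFT[2049, 6] = 100B
  --   DecomposeDBPage then recovers  segmentID.highbits = BITAND[1202B, 177600B] = 1200B
  --                                  filePage = BITOR[BITSHIFT[1202B, 10], BITSHIFT[100B, -6]]
  --                                           = 4000B OR 1B = 4001B = 2049
  --   SegmentIndexFromSegmentID:     BITSHIFT[1200B, -7] = 5
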
  -- Buffer manager

  Hash: PROC [dbPage: DBPage] RETURNS [NAT] = TRUSTED INLINE {
    RETURN [Basics.BITAND[
      hashMask,
      Basics.BITXOR[
        Basics.BITSHIFT[value: LOOPHOLE[dbPage, Basics.LongNumber.num].highbits, count: -7],
        Basics.BITSHIFT[value: LOOPHOLE[dbPage, Basics.LongNumber.num].lowbits, count: -6]]]];
    };

  invalidKey: DBPage = 37777777777B;  -- doesn't match any true DBPage (since all DBPages have low bits 0)

  InitializeBufferMgr: PROC [] = {
    -- Number of hash buckets = largest power of 2 not exceeding cacheSize.
    hashBuckets: CARDINAL _ 1;
    temp: CARDINAL _ Basics.BITSHIFT[cacheSize, -1];
    UNTIL temp = 0 DO
      hashBuckets _ Basics.BITSHIFT[hashBuckets, 1];
      temp _ Basics.BITSHIFT[temp, -1];
      ENDLOOP;
    ht _ NEW[HashTable[hashBuckets] _ [nItems: 0, buckets: ]];
    hashMask _ hashBuckets-1;
    -- Create cacheRecords, format them into an lru list.
    lruList _ NEW[CacheRecord _ [dbPage: invalidKey, pagePtr: NIL]];
    lruList.pred _ lruList.succ _ lruList;
    FOR i: NAT IN [0 .. (cacheSize+15)/16) DO AddCachePages[16]; ENDLOOP;
    };

  ReadPage: PUBLIC PROC [p: DBPage, pHint: CacheHandle] RETURNS [CacheHandle, LONG POINTER] = {
    file: DBCommon.OpenFileHandle;
    page: CARDINAL;
    IF logReads THEN DBCommon.GetDebugStream[].PutChar['#];
    DBStats.Inc[CacheReadOrWrite];
    -- IF p=noteThisPage THEN ERROR TakeALookAtThis;
    IF pHint # NIL AND p = pHint.dbPage THEN GOTO ReturnPage;
    -- hint is wrong, so search hash table
    pHint _ Lookup[p];
    IF pHint # NIL THEN GO TO ReturnPage;
    -- page is not present in cache, read it in
    IF logReads THEN DBCommon.GetDebugStream[].PutChar['!];
    DBStats.Inc[CacheMiss];
    pHint _ GetUnoccupiedPage[];
    { -- We expect ERRORs to be raised from DBFile.ReadFilePage
      [pHint.readonly, file, page] _ FilePageFromDBPage[p];
      DBFile.ReadFilePage[file, page, pHint.pagePtr] };
    pHint.dbPage _ p;
    Insert[pHint];
    pHint.lockCount _ 0;
    GOTO ReturnPage;
    EXITS
      ReturnPage => {
        MakeMRU[pHint];
        pHint.lockCount _ pHint.lockCount + 1;
        RETURN[pHint, pHint.pagePtr] }
    };

  WritePage: PUBLIC PROC [p: DBPage, pHint: CacheHandle] RETURNS [CacheHandle, LONG POINTER] = {
    DBStats.Inc[CacheReadOrWrite];
    -- IF p=noteThisPage THEN ERROR TakeALookAtThis;
    IF pHint # NIL AND p = pHint.dbPage THEN GO TO ReturnPage;
    -- hint is wrong, so search hash table
    pHint _ Lookup[p];
    IF pHint # NIL THEN GOTO ReturnPage;
    -- page is not present in cache, read it in
    DBStats.Inc[CacheMiss];
    pHint _ GetUnoccupiedPage[];
    { -- We expect ERRORs to be raised from DBFile.ReadFilePage
      file: DBCommon.OpenFileHandle;
      page: CARDINAL;
      [pHint.readonly, file, page] _ FilePageFromDBPage[p];
      IF pHint.readonly THEN ERROR DB.Error[WriteNotAllowed];
      DBFile.ReadFilePage[file, page, pHint.pagePtr] };
    pHint.dbPage _ p;
    Insert[pHint];
    pHint.lockCount _ 0;
    GOTO ReturnPage;
    EXITS
      ReturnPage => {
        MakeMRU[pHint];
        pHint.lockCount _ pHint.lockCount + 1;
        pHint.written _ TRUE;
        RETURN[pHint, pHint.pagePtr] }
    };

  NewPage: PROC [p: DBPage] RETURNS [CacheHandle, LONG POINTER] = {
    segmentID: SegmentID;
    fileNode: FileNode;
    pHint: CacheHandle _ Lookup[p];
    IF pHint # NIL THEN ERROR DBCommon.InternalError;  -- [NotaNewPage]
    pHint _ GetUnoccupiedPage[];
    {
      [segmentID, ] _ DecomposeDBPage[p];
      fileNode _ FindSegmentID[segmentID];
      IF fileNode = NIL OR fileNode.openFile = NIL THEN ERROR DB.Error[TransactionNotOpen];
      pHint.readonly _ fileNode.readonly;
      IF pHint.readonly THEN ERROR DB.Error[WriteNotAllowed];
      };
    pHint.dbPage _ p;
    Insert[pHint];
    MakeMRU[pHint];
    pHint.lockCount _ 1;
    pHint.written _ TRUE;
    RETURN[pHint, pHint.pagePtr]
    };

  WriteLockedPage: PUBLIC PROC [pValidHint: CacheHandle] = {
    IF pValidHint = NIL OR pValidHint.lockCount = 0 THEN
      ERROR DBCommon.InternalError;  -- [AlreadyUnlocked]
    IF pValidHint.readonly THEN ERROR DB.Error[WriteNotAllowed];
    pValidHint.written _ TRUE;
    };

  LockCount: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle] RETURNS [CARDINAL] = {
    IF pValidHint = NIL OR pValidHint.dbPage # p OR pValidHint.lockCount = 0 THEN
      ERROR DBCommon.InternalError;  -- [BadCacheHandle]
    RETURN[pValidHint.lockCount];
    };

  ObtainCoreLoc: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle] RETURNS [LONG POINTER] = {
    IF pValidHint = NIL OR pValidHint.dbPage # p OR pValidHint.lockCount = 0 THEN
      ERROR DBCommon.InternalError;  -- [BadCacheHandle]
    RETURN[pValidHint.pagePtr];
    };

  UnlockPage: PUBLIC PROC [pValidHint: CacheHandle] = {
    IF pValidHint = NIL OR pValidHint.lockCount = 0 THEN
      ERROR DBCommon.InternalError;  -- [AlreadyUnlocked]
    pValidHint.lockCount _ pValidHint.lockCount - 1;
    };

  WriteOutCache: PUBLIC PROC [s: Segment] = {
    segmentID: SegmentID = SegmentIDFromSegment[s];
    WritePageOutIfUsesTrans: PROC [cacheHandle: CacheHandle] = {
      IF cacheHandle.written THEN {
        thisSegment: SegmentID = InlineSegmentIDFromDBPage[cacheHandle.dbPage];
        IF segmentID = thisSegment THEN WritePageOut[cacheHandle];
        };
      };
    EnumerateHashTable[WritePageOutIfUsesTrans];
    };

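  -- Illustrative sketch (added; hypothetical client code, not part of the original module).
  -- A caller of the buffer manager holds a page locked between ReadPage/WritePage and
  -- UnlockPage, and keeps the CacheHandle as a hint for later revisits, e.g.:
  --
  --   ReadTag: PROC [p: DBPage, hint: CacheHandle]
  --     RETURNS [tag: CARDINAL, newHint: CacheHandle] = TRUSTED {
  --     ptr: LONG POINTER;
  --     [newHint, ptr] _ ReadPage[p, hint];  -- lockCount is now at least 1
  --     tag _ LOOPHOLE[ptr, LONG POINTER TO DBStoragePage.PageHeader].pageTag;
  --     UnlockPage[newHint];  -- release the lock; the page stays cached
  --     };
  --
  -- A writer would call WritePage (or WriteLockedPage on an already-locked handle) so the
  -- page is marked written and later flushed by WriteOutCache.
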
  FlushCache: PUBLIC PROC [s: Segment] = {
    segmentID: SegmentID = SegmentIDFromSegment[s];
    DeleteIfUsesTrans: PROC [cacheHandle: CacheHandle] = {
      IF cacheHandle.occupied THEN {
        thisSegment: SegmentID = InlineSegmentIDFromDBPage[cacheHandle.dbPage];
        IF segmentID = thisSegment THEN Delete[cacheHandle];
        };
      };
    EnumerateLRUList[DeleteIfUsesTrans]
    };

  WritePageOut: PROC [cacheHandle: CacheHandle] = {
    -- Send the contents of a dirty cache page to the database segment, and mark the page
    -- no longer dirty (leaving it in the cache).
    readonly: BOOL;
    file: DBCommon.OpenFileHandle;
    page: CARDINAL;
    DBStats.Inc[CacheWritePageBack];
    [readonly, file, page] _ FilePageFromDBPage[cacheHandle.dbPage];
    IF readonly THEN ERROR DB.Error[WriteNotAllowed];
    DBFile.WriteFilePage[file, page, cacheHandle.pagePtr];
    cacheHandle.written _ FALSE;
    };

  -- Buffer manager LRU list

  AddCachePages: PROC [nPages: NAT] = TRUSTED {
    -- Creates nPages new cache records, in the LRU position.
    curCachePage: LONG POINTER _ VM.AddressForPageNumber[
      VM.Allocate[count: nPages*DBCommon.PagesPerDBPage, in64K: TRUE].page];
    FOR i: NAT IN [0..nPages) DO
      p: CacheHandle _ NEW[CacheRecord _ [dbPage: invalidKey, pagePtr: curCachePage]];
      curCachePage _ curCachePage + DBCommon.WordsPerPage;
      p.pred _ lruList.pred;
      p.succ _ lruList;
      lruList.pred.succ _ p;
      lruList.pred _ p;
      ENDLOOP;
    };

  MakeMRU: PROC [x: CacheHandle] = {
    -- Assumes that x is part of lruList.  Deletes x from its current position and makes it
    -- first in lruList.
    x.pred.succ _ x.succ;
    x.succ.pred _ x.pred;
    x.succ _ lruList.succ;
    x.pred _ lruList;
    lruList.succ.pred _ x;
    lruList.succ _ x;
    };

  EnumerateLRUList: PROC [procToCall: PROC[CacheHandle]] = {
    -- Calls procToCall for each record in LRUList, in MRU to LRU order.  procToCall may not
    -- perform MakeMRU; use EnumerateHashTable if you want this.  Note that procToCall will
    -- be called for both occupied and unoccupied pages.
    FOR p: CacheHandle _ lruList.succ, p.succ UNTIL p = lruList DO procToCall[p]; ENDLOOP;
    };

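  -- Illustrative note (added, not part of the original module).  With the header node H and
  -- three cache records A, B, C, the circular list after AddCachePages is
  --   H <-> A <-> B <-> C <-> (back to H)
  -- so lruList.succ = A is the MRU end and lruList.pred = C is the LRU end.  MakeMRU[C]
  -- splices C out and reinserts it just after H:
  --   H <-> C <-> A <-> B <-> (back to H)
  -- GetUnoccupiedPage, below, therefore scans from lruList.pred via the pred pointers,
  -- i.e. from the least recently used record toward the most recently used one.
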
  GetUnoccupiedPage: PROC RETURNS [CacheHandle] = {
    -- Obtain an empty cache page.  It will be the least-recently used page that is
    -- unoccupied or occupied but unlocked.  The page is returned with Key = garbage,
    -- Page = ptr to a cache page, occupied = FALSE, written = FALSE, and entered on
    -- lruList in no particular location.
    cacheHandle: CacheHandle;
    FOR cacheHandle _ lruList.pred, cacheHandle.pred UNTIL cacheHandle = lruList DO
      IF ~cacheHandle.occupied THEN GOTO FoundUnoccupiedPage;
      IF cacheHandle.lockCount = 0 THEN GOTO MakePageUnoccupied;
      REPEAT
        FoundUnoccupiedPage => NULL;  -- ~cacheHandle.occupied
        MakePageUnoccupied => {  -- cacheHandle.occupied AND cacheHandle.lockCount=0
          -- If the write fails, then there's no harm in releasing the page, since the
          -- transaction under which the file was opened can no longer be used.
          IF cacheHandle.written THEN WritePageOut[cacheHandle ! DB.Failure => CONTINUE];
          Delete[cacheHandle];
          };
        FINISHED => ERROR DB.Failure[$allCachePagesLocked, "no empty cache pages"]
      ENDLOOP;
    cacheHandle.written _ FALSE;
    RETURN[cacheHandle];
    };

  -- Buffer manager hash table

  EnumerateHashTable: PROC [procToCall: PROC[CacheHandle]] = {
    -- Calls procToCall for each record in table.  procToCall may not perform Delete; use
    -- EnumerateLRUList if you want this.
    FOR i: NAT IN [0..ht.size) DO
      FOR p: CacheHandle _ ht.buckets[i], p.hashChain UNTIL p = NIL DO
        procToCall[p];
        ENDLOOP;
      ENDLOOP;
    };

  Lookup: PROC [dbPage: DBPage] RETURNS [CacheHandle] = {
    -- Look the dbPage up in the hash table for the cache.  Return the CacheHandle if it is
    -- found, else return NIL.
    i: CARDINAL _ Hash[dbPage];
    p: CacheHandle _ ht.buckets[i];
    DBStats.Inc[CacheHTLookup];
    WHILE p # NIL DO
      IF p.dbPage = dbPage AND p.occupied THEN RETURN[p];
      DBStats.Inc[CacheHTConflictInLookup];
      p _ p.hashChain;
      ENDLOOP;
    IF dbPage = noteThisPage THEN ERROR TakeALookAtThis;
    RETURN[NIL];
    };

  Insert: PROC [x: CacheHandle] = {
    -- Add the CacheHandle x to the hash table for the cache.  Generate
    -- DBCommon.InternalError if a handle with the same dbPage is already there.
    i: CARDINAL _ Hash[x.dbPage];
    p: CacheHandle _ ht.buckets[i];
    -- Make sure it's not there already...
    WHILE p # NIL DO
      IF p.dbPage = x.dbPage THEN GOTO FoundDuplicate;
      p _ p.hashChain;
      REPEAT
        FoundDuplicate => ERROR DBCommon.InternalError;  -- [AlreadyInHashTable]
        FINISHED => {
          x.hashChain _ ht.buckets[i];
          ht.buckets[i] _ x;
          ht.nItems _ ht.nItems + 1;
          x.occupied _ TRUE;
          IF x.dbPage = noteThisPage THEN ERROR TakeALookAtThis;
          };
      ENDLOOP;
    };

  Delete: PROC [x: CacheHandle] = {
    -- Delete the CacheHandle x from the hash table for the cache.
    i: CARDINAL _ Hash[x.dbPage];
    p: CacheHandle _ ht.buckets[i];
    IF p = x THEN {ht.buckets[i] _ x.hashChain; GOTO Deleted};
    WHILE p # NIL DO
      IF p.hashChain = x THEN {p.hashChain _ x.hashChain; GOTO Deleted};
      p _ p.hashChain;
      REPEAT
        FINISHED => ERROR DBCommon.InternalError;  -- [NotPresentInHashTable]
      ENDLOOP;
    EXITS
      Deleted => {
        IF x.dbPage = noteThisPage THEN ERROR TakeALookAtThis;
        x.dbPage _ invalidKey;
        x.occupied _ FALSE;
        x.hashChain _ NIL;
        ht.nItems _ ht.nItems - 1;
        };
    };

  -- Buffer manager debugging

  AssertionFailure: ERROR = CODE;  -- so we can Proceed (in debugger) from failure

  Assert: PROC [assertion: BOOL] = {IF ~assertion THEN ERROR AssertionFailure; };

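  -- Illustrative note (added, not part of the original module).  CheckState, below, is the
  -- consistency checker for the structures above: it walks lruList verifying the doubly
  -- linked pointers, counts occupied and locked entries, re-counts the occupied entries
  -- through the hash table, and asserts that the two counts agree and that
  -- occupied + free = cacheSize.  A hypothetical debugging call that prints every cache
  -- entry would be, e.g.:
  --   CheckState[doPrinting: TRUE, printOnlyLockedPages: FALSE];
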
  CheckState: PUBLIC PROC [doPrinting, printOnlyLockedPages: BOOL] = {
    nOccupiedItems, checkOccupiedItems: CARDINAL _ 0;
    nLockedItems: CARDINAL _ 0;
    nFreeItems: CARDINAL _ 0;
    PageHeader: TYPE = MACHINE DEPENDENT RECORD [
      pageTag: CARDINAL [0..256), dontCare: CARDINAL [0..256)];
    IOref: PROC [p: REF ANY] RETURNS [IO.Value] = INLINE {
      RETURN[IO.card[LOOPHOLE[p, LONG CARDINAL]]]};
    IOptr: PROC [p: LONG POINTER] RETURNS [IO.Value] = INLINE {
      RETURN[IO.card[LOOPHOLE[p, LONG CARDINAL]]]};
    out: IO.STREAM _ IF doPrinting THEN DBCommon.GetDebugStream[] ELSE NIL;
    CheckEntry1: PROC [p: CacheHandle] = {
      Assert[p.pred.succ = p];
      Assert[p.succ.pred = p];
      Assert[p.pagePtr # NIL];
      IF ~p.occupied THEN {
        nFreeItems _ nFreeItems + 1; Assert[p.dbPage = invalidKey]; RETURN };
      Assert[p.dbPage # invalidKey];
      nOccupiedItems _ nOccupiedItems + 1;
      IF p.lockCount > 0 THEN nLockedItems _ nLockedItems + 1;
      IF p.written THEN Assert[NOT p.readonly];
      IF ~doPrinting OR (p.lockCount = 0 AND printOnlyLockedPages) THEN RETURN;
      TRUSTED {
        pageTag: CARDINAL = LOOPHOLE[p.pagePtr, LONG POINTER TO PageHeader].pageTag;
        out.PutF["%11bB %11bB %11bB %3d",
          IOref[p], IO.card[p.dbPage], IOptr[p.pagePtr], IO.card[pageTag]];
        out.PutF[" %3d %g %g\n",
          IO.card[p.lockCount], IO.bool[p.written], IO.bool[p.readonly]];
        };
      };
    CheckEntry2: PROC [p: CacheHandle] = {
      Assert[p.occupied];
      checkOccupiedItems _ checkOccupiedItems + 1;
      };
    IF doPrinting THEN {
      out.PutF["Cache size %g pages.\n", IO.card[cacheSize]];
      out.PutRope[" cacheHdl dbPage pagePtr tag lockCnt written readonly\n"];
      };
    EnumerateLRUList[CheckEntry1];
    EnumerateHashTable[CheckEntry2];
    IF doPrinting THEN out.PutF["%g pages occupied, %g are locked\n",
      IO.card[nOccupiedItems], IO.card[nLockedItems]];
    Assert[nOccupiedItems = checkOccupiedItems];
    Assert[ht.nItems = checkOccupiedItems];
    Assert[nFreeItems + nOccupiedItems = cacheSize];
    };

  END.  -- DBSegmentImpl