-- File: DBSegmentImpl.mesa
-- Last edited by
-- MBrown on February 25, 1983 1:49 pm
-- Cattell on September 16, 1983 11:13 am
-- Willie-Sue on February 3, 1983 10:47 am

-- Implements DBSegment: maps database segments onto files and provides the
-- page buffer cache used by the storage level of the database system.

DIRECTORY
  Atom, Basics, ConvertUnsafe, DBCache, DBCommon, DBEnvironment, DBFile,
  DBSegment, DBSegmentPrivate, DBStats, DBStoragePagetags,
  DBStorageTID USING[DBPageIncrement, TID],
  File, IO,
  Rope USING [ROPE, Equal],
  VM USING [Allocate, AddressForPageNumber];

DBSegmentImpl: PROGRAM
IMPORTS Atom, Basics, ConvertUnsafe, DBCommon, DBEnvironment, DBFile, DBSegment,
  DBStats, IO, Rope, VM
EXPORTS DBSegment
SHARES DBCache =
BEGIN

-- Local abbreviations for imported types.
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
Trans: TYPE = DBCommon.Trans;
Segment: TYPE = DBCommon.Segment;
SegmentID: TYPE = DBCommon.SegmentID;
SegmentIndex: TYPE = DBCommon.SegmentIndex;
VersionOptions: TYPE = DBCommon.VersionOptions;
DBPage: TYPE = DBCommon.DBPage;
NullDBPage: DBPage = DBCommon.NullDBPage;
CacheHandle: TYPE = DBCache.CacheHandle;
CacheRecord: TYPE = DBCache.CacheRecord;
TID: TYPE = DBStorageTID.TID;

-- Debugging signal: raised when the page named by noteThisPage is looked up,
-- inserted, or deleted (see Lookup, Insert, Delete below).
TakeALookAtThis: SIGNAL = CODE;

-- One FileNode per declared segment; nodes form a singly-linked list rooted
-- at the module variable "files", linked through "rest".
FileNode: TYPE = REF FileNodeRecord;
FileNodeRecord: TYPE = RECORD [
  segmentID: DBCommon.DBPage,
  segment: DBCommon.Segment _ NIL,
  trans: DBCommon.Trans _ NIL, -- non-NIL iff a transaction is currently open on this segment
  openFile: DBCommon.OpenFileHandle _ NIL, -- non-NIL iff the segment's file is open
  readonly: BOOL _ TRUE,
  version: DBCommon.VersionOptions _ OldFileOnly,
  initializeExistingFile: BOOL _ FALSE, -- TRUE means discard existing contents on next open
  fileName: ROPE _ NIL,
  pagesInitial, pagesPerExtent: NAT _ 0, -- initial file size and growth increment, in pages
  rest: FileNode _ NIL -- next node in the "files" list
  ];

-- Chained hash table mapping DBPage to CacheHandle.
HashTable: TYPE = RECORD [
  nItems: NAT _ 0, -- number of CacheRecords in table
  buckets: SEQUENCE size: NAT OF CacheHandle
  ];

-- Module state
initialized: BOOL _ FALSE;
createIndexesProc: PROC [s: Segment] RETURNS [TID, TID]; -- supplied via Initialize; builds root indexes for a new segment
logReads: BOOL_ FALSE; -- causes reads & writes to be logged on UserExec
noteThisPage: DBPage_ 0; -- causes signal when this page is read or written
files: FileNode_ NIL; -- List mapping SegmentID to interesting information about a file.
cacheSize: NAT; -- Number of pages allocated to cache.
ht: REF HashTable; -- Fast map from DBPage to CacheHandle. ht.size is a power of two.
hashMask: CARDINAL; -- Equals ht.size-1, used in computing hash function.
lruList: CacheHandle; -- Points to header node of doubly-linked list of CacheRecords.

-- Data structure invariants: A cache record is present in lruList for every
-- allocated cache page; there is no separate "free list". occupied=FALSE means that
-- the page is free, and that the other cache record fields (readonly, written)
-- are undefined. occupied=TRUE means that the page contains valid data; a page is
-- stored in ht iff occupied=TRUE.
-- The lruList points to a cache record with no associated cache page; this "header node"
-- makes the circular-list manipulations work properly. The most-recently used page is
-- lruList.succ, and the least-recently used page is lruList.pred.

-- Records the cache size and the segment-initialization callback, then builds
-- the (initially empty) buffer cache. Must be called exactly once, before any
-- other entry of this module.
Initialize: PUBLIC PROC [
  nCachePages: NAT, useCacheFile: ROPE,
  segmentInitProc: PROC [s: Segment] RETURNS [TID, TID]] = {
  IF initialized THEN ERROR DBEnvironment.InternalError;
  cacheSize _ nCachePages;
  createIndexesProc _ segmentInitProc;
  InitializeBufferMgr[];
  initialized _ TRUE;
  };

-- Returns the FileNode whose segmentID matches, or NIL if none.
FindSegmentID: PROC[segmentID: DBCommon.DBPage] RETURNS[found: FileNode] = {
  -- Just a linear search, we're not going to have a tremendous number of segments open.
  FOR found_ files, found.rest UNTIL found=NIL DO
    IF found.segmentID=segmentID THEN RETURN
    ENDLOOP};

-- Returns the FileNode whose segment atom matches, or NIL if none.
FindSegment: PROC [s: Segment] RETURNS [found: FileNode] = {
  -- Same as above, but looks for atom match instead of segment ID match.
  -- If efficiency of this ever becomes important, we can attach info to prop list of atom.
  FOR found_ files, found.rest UNTIL found=NIL DO
    IF found.segment=s THEN RETURN
    ENDLOOP};

-- Declares (or re-declares) a segment: records the file name, access mode,
-- version options, and sizing parameters in the "files" list. Re-declaration
-- is allowed only while no transaction is open on the segment, and must name
-- the same segment atom for the same segment index.
AttachSegment: PUBLIC PROC [
  fileName: ROPE, s: Segment, segmentIndex: SegmentIndex,
  readonly: BOOL, version: VersionOptions, initializeExistingFile: BOOL,
  nBytesInitial, nBytesPerExtent: INT] = {
  fileNode: FileNode;
  segmentID: SegmentID = SegmentIDFromSegmentIndex[segmentIndex];
  IF NOT initialized THEN ERROR;
  fileNode _ FindSegmentID[segmentID];
  IF fileNode # NIL THEN {
    IF fileNode.trans # NIL THEN -- Client declared segment when transaction open on it
      {SIGNAL DBEnvironment.Error[TransactionAlreadyOpen]; RETURN};
    IF fileNode.segmentID # segmentID OR fileNode.segment # s THEN
      ERROR DBEnvironment.Error[MismatchedExistingSegment];
    -- Update the existing node in place (client may supply a new file name).
    fileNode.initializeExistingFile _ initializeExistingFile;
    fileNode.fileName_ fileName;
    fileNode.version_ version;
    fileNode.readonly_ readonly;
    fileNode.pagesInitial_ MAX[1, DBFile.PagesFromBytes[nBytesInitial]];
    fileNode.pagesPerExtent_ MAX[1, DBFile.PagesFromBytes[nBytesPerExtent]];
    RETURN;
    };
  -- First declaration of this segment: prepend a new node to "files".
  files _ NEW[FileNodeRecord_ [
    segmentID: segmentID, segment: s, readonly: readonly, version: version,
    initializeExistingFile: initializeExistingFile, fileName: fileName,
    pagesInitial: MAX[1, DBFile.PagesFromBytes[nBytesInitial]],
    pagesPerExtent: MAX[1, DBFile.PagesFromBytes[nBytesPerExtent]],
    rest: files ]];
  };

-- Opens a transaction on segment s (creating one on the segment's server when
-- useTrans is NIL), then opens the segment's file under that transaction.
OpenTransaction: PUBLIC PROC [s: Segment, useTrans: Trans, noLog: BOOL] = {
  fileNode: FileNode = FindSegment[s];
  IF fileNode=NIL THEN {SIGNAL DBEnvironment.Error[SegmentNotDeclared]; RETURN};
  IF fileNode.trans # NIL THEN {SIGNAL DBEnvironment.Error[TransactionAlreadyOpen]; RETURN};
  IF useTrans = NIL THEN {
    server: ROPE = DBFile.FileServerFromFileName[fileNode.fileName];
    useTrans _ DBFile.CreateTransaction[server];
    };
  fileNode.trans _ useTrans;
  OpenSegment[fileNode, noLog];
  };

-- Opens (possibly creating) the segment's file and either initializes it as a
-- fresh segment or verifies the head page of an existing one.
OpenSegment: PROC [fileNode: FileNode, noLog: BOOL] = {
  createdFile: BOOL;
  IF fileNode.trans = NIL THEN ERROR;
  [fileNode.openFile, createdFile] _ DBFile.OpenFile[t: fileNode.trans,
    file: fileNode.fileName, version: fileNode.version,
    discardFileContents: fileNode.initializeExistingFile,
    nPagesInitial: fileNode.pagesInitial, readOnly: fileNode.readonly,
    noLog: noLog];
  IF createdFile OR fileNode.initializeExistingFile THEN {
    InitializeSegment[fileNode]; fileNode.initializeExistingFile_ FALSE}
  ELSE CheckSegment[fileNode];
  };

-- Reports what AttachSegment/OpenTransaction recorded for s; all results are
-- null/zero when the segment was never declared.
GetSegmentInfo: PUBLIC PROC [s: Segment] RETURNS [
  filePath: ROPE, number: NAT, trans: Trans, readOnly: BOOL,
  nBytesInitial, nBytesPerExtent: INT] = {
  fileNode: FileNode_ FindSegment[s];
  IF fileNode=NIL THEN RETURN[NIL, 0, NIL, FALSE, 0, 0]
  ELSE RETURN[
    fileNode.fileName, SegmentIndexFromSegmentID[fileNode.segmentID],
    fileNode.trans, fileNode.readonly,
    DBFile.BytesFromPages[fileNode.pagesInitial],
    DBFile.BytesFromPages[fileNode.pagesPerExtent]]
  };

-- Commits or aborts transaction t. On commit, dirty cache pages are written
-- out first; on abort (or when not continuing) the cached pages and the file
-- bindings for every segment under t are discarded.
FinishTransaction: PUBLIC PROC [t: Trans, abort: BOOL, continue: BOOL] = {
  -- Close or abort the transaction, and set file to NIL for file nodes under this transaction.
  IF NOT abort AND NOT WriteoutCache[t] THEN ERROR;
  DBFile.FinishTransaction[t, abort, continue];
  IF abort OR NOT continue THEN {
    AbortCache[t];
    FOR f: FileNode_ files, f.rest UNTIL f=NIL DO
      IF f.trans = t THEN {f.openFile _ NIL; f.trans _ NIL}
      ENDLOOP;
    };
  };

-- Maps a TID to the segment containing it, or NIL if that segment is not declared.
SegmentFromTID: PUBLIC PROC [tid: TID] RETURNS [s: Segment] = {
  fileNode: FileNode = FindSegmentID[InlineSegmentIDFromDBPage[tid]];
  IF fileNode # NIL THEN RETURN [fileNode.segment] ELSE RETURN [NIL];
  };

-- TRUE iff tid lies in a declared segment whose file is currently open.
IsValidTID: PUBLIC PROC [tid: TID] RETURNS [valid: BOOL] = {
  fileNode: FileNode = FindSegmentID[InlineSegmentIDFromDBPage[tid]];
  RETURN [fileNode # NIL AND fileNode.openFile # NIL];
  };

-- Maps a segment atom to its SegmentID; errors if not declared or not open.
SegmentIDFromSegment: PUBLIC PROC [s: Segment] RETURNS [SegmentID] = {
  fileNode: FileNode = FindSegment[s];
  IF fileNode = NIL THEN ERROR DBEnvironment.Error[SegmentNotDeclared];
  IF fileNode.openFile = NIL THEN ERROR DBEnvironment.Error[TransactionNotOpen];
  RETURN [fileNode.segmentID];
  };

-- Calls enumProc for each declared segment, stopping early if it returns TRUE.
EnumerateSegments: PUBLIC PROC [
  enumProc: PROC [s: Segment, segmentIndex: SegmentIndex] RETURNS [stop: BOOL]] = {
  FOR f: FileNode _ files, f.rest UNTIL f=NIL DO
    IF enumProc[f.segment, SegmentIndexFromSegmentID[f.segmentID]].stop THEN EXIT;
    ENDLOOP;
  };

-- Translates a DBPage into (readonly flag, open file handle, file page number);
-- errors if the containing segment has no open file.
FilePageFromDBPage: PROC [dbPage: DBPage] RETURNS [
  readonly: BOOL, file: DBCommon.OpenFileHandle, page: CARDINAL] = {
  segmentID: SegmentID;
  fileNode: FileNode;
  [segmentID, page] _ DecomposeDBPage[dbPage];
  fileNode _ FindSegmentID[segmentID];
  IF fileNode = NIL OR (file _ fileNode.openFile) = NIL THEN
    ERROR DBEnvironment.Error[TransactionNotOpen];
  RETURN [fileNode.readonly, file, page];
  };

-- Procs that understand the format of the segment head page.

-- Formats page 0 of a newly created (or discarded) segment file: writes the
-- head page, names the segment, and creates its root indexes.
InitializeSegment: PROC [fileNode: FileNode] = {
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  fileSize: CARDINAL = DBFile.GetSize[fileNode.openFile];
  fileNode.readonly _ FALSE; -- a fresh segment must be writable to be initialized
  [segmentHeadHandle, segmentHead] _ DBSegment.WritePage[fileNode.segmentID, NIL];
  segmentHead^ _ [
    segmentID: fileNode.segmentID,
    allocator: [
      firstPageInBlock: ConsDBPage[fileNode.segmentID, 1], -- page 0 is the head page
      nPagesInBlock: fileSize-1,
      pagesPerExtent: fileNode.pagesPerExtent]];
  ConvertUnsafe.AppendRope[
    to: LOOPHOLE[@(segmentHead.name)], from: Atom.GetPName[fileNode.segment]];
  [segmentHead.index1, segmentHead.index2] _ createIndexesProc[fileNode.segment];
  DBSegment.UnlockPage[segmentHeadHandle];
  };

-- Verifies the head page of an existing segment file: tag, software version,
-- seal, and recorded segment name must all match.
CheckSegment: PROC [fileNode: FileNode] = {
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  [segmentHeadHandle, segmentHead] _ DBSegment.ReadPage[fileNode.segmentID, NIL];
  IF segmentHead.tag # DBStoragePagetags.AMap OR
    segmentHead.softwareCompatibilityVersion # DBCommon.softwareCompatibilityVersion OR
    segmentHead.seal # DBSegmentPrivate.Seal THEN
    ERROR DBEnvironment.Error[MismatchedExistingSegment];
  IF NOT Rope.Equal[
    s1: Atom.GetPName[fileNode.segment],
    s2: ConvertUnsafe.ToRope[LOOPHOLE[@(segmentHead.name)]],
    case: FALSE] THEN
    ERROR DBEnvironment.Error[MismatchedExistingSegment];
  DBSegment.UnlockPage[segmentHeadHandle];
  };

-- Reads the two root-index TIDs from the segment's head page.
RootIndicesFromSegment: PUBLIC PROC [s: Segment] RETURNS [index1, index2: TID] = {
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  fileNode: FileNode = FindSegment[s];
  IF fileNode=NIL THEN ERROR DBEnvironment.Error[SegmentNotDeclared];
  [segmentHeadHandle, segmentHead] _ DBSegment.ReadPage[fileNode.segmentID, NIL];
  IF segmentHead.tag # DBStoragePagetags.AMap THEN
    ERROR DBEnvironment.Error[MismatchedExistingSegment];
  index1 _ segmentHead.index1;
  index2 _ segmentHead.index2;
  DBSegment.UnlockPage[segmentHeadHandle];
  RETURN [index1, index2];
  };

-- Allocates a page in segment s: first from the segment's free list, else from
-- the current block, extending the file by pagesPerExtent when the block is
-- exhausted. The page is returned locked, with tag Unused.
AllocPage: PUBLIC PROC [s: DBPage] RETURNS [
  allocatedPage: DBPage, allocatedPageHandle: CacheHandle,
  allocatedPagePtr: LONG POINTER] = {
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  DBStats.Inc[SegmentAllocPage];
  [segmentHeadHandle, segmentHead] _ DBSegment.WritePage[s, NIL];
  IF segmentHead.tag # DBStoragePagetags.AMap THEN
    ERROR DBEnvironment.Error[MismatchedExistingSegment];
  IF (allocatedPage _ segmentHead.allocator.freeList) # NullDBPage THEN {
    -- The segment's free list is nonempty, so return a page from it.
    allocatedFreeListPagePtr: LONG POINTER TO DBSegmentPrivate.FreeListPage;
    [allocatedPageHandle, allocatedFreeListPagePtr] _ DBSegment.WritePage[allocatedPage, NIL];
    IF allocatedFreeListPagePtr.tag # DBStoragePagetags.Free THEN
      ERROR DBEnvironment.InternalError;
    segmentHead.allocator.freeList _ allocatedFreeListPagePtr.next;
    allocatedPagePtr _ allocatedFreeListPagePtr;
    }
  ELSE {
    -- The segment's free list is empty, so allocate from block allocator, perhaps extending file.
    IF segmentHead.allocator.firstPageInBlock = NullDBPage THEN {
      -- Current block exhausted: grow the file by one extent and start a new block.
      fileNode: FileNode _ FindSegmentID[s];
      size: CARDINAL _ DBFile.GetSize[fileNode.openFile];
      DBFile.SetSize[fileNode.openFile, size + segmentHead.allocator.pagesPerExtent];
      segmentHead.allocator.firstPageInBlock _ ConsDBPage[s, size];
      segmentHead.allocator.nPagesInBlock _ segmentHead.allocator.pagesPerExtent;
      };
    allocatedPage _ segmentHead.allocator.firstPageInBlock;
    IF (segmentHead.allocator.nPagesInBlock _ segmentHead.allocator.nPagesInBlock-1) = 0 THEN
      segmentHead.allocator.firstPageInBlock _ NullDBPage
    ELSE segmentHead.allocator.firstPageInBlock _
      allocatedPage + DBStorageTID.DBPageIncrement;
    [allocatedPageHandle, allocatedPagePtr] _ NewPage[allocatedPage];
    };
  DBSegment.UnlockPage[segmentHeadHandle];
  LOOPHOLE[allocatedPagePtr, LONG POINTER TO DBStoragePagetags.PageHeader].pageTag _
    DBStoragePagetags.Unused;
  RETURN[allocatedPage, allocatedPageHandle, allocatedPagePtr];
  };--AllocPage

-- Returns page freePg (locked exactly once, via freeHint) to segment s's free
-- list. Errors on double-free or on freeing the segment head page.
FreePage: PUBLIC PROC [s: DBPage, freePg: DBPage, freeHint: CacheHandle] = {
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  free: LONG POINTER TO DBSegmentPrivate.FreeListPage;
  DBStats.Inc[SegmentFreePage];
  IF s # InlineSegmentIDFromDBPage[freePg] THEN ERROR DBEnvironment.InternalError;
  [segmentHeadHandle, segmentHead] _ DBSegment.WritePage[s, NIL];
  IF segmentHead.tag # DBStoragePagetags.AMap THEN ERROR DBEnvironment.InternalError;
  IF DBSegment.LockCount[freePg, freeHint] # 1 THEN ERROR DBEnvironment.InternalError;
  free _ DBSegment.ObtainCoreLoc[freePg, freeHint];
  IF free.tag = DBStoragePagetags.Free OR free.tag = DBStoragePagetags.AMap THEN
    ERROR DBEnvironment.InternalError;
  DBSegment.WriteLockedPage[freeHint];
  free^ _ [next: segmentHead.allocator.freeList];
  segmentHead.allocator.freeList _ freePg;
  DBSegment.UnlockPage[freeHint];
  DBSegment.UnlockPage[segmentHeadHandle];
  };--FreePage

-- Procs that understand the bit layout of a DBPage.
-- A SegmentID occupies the high bits of a DBPage's highbits word; a segment
-- index is that field shifted down. The two procs below are inverses.
SegmentIDFromSegmentIndex: PROC [segmentIndex: SegmentIndex] RETURNS [SegmentID] = {
  segmentID: Basics.LongNumber.num = [num[lowbits: 0,
    highbits: Basics.BITSHIFT[value: segmentIndex, count: 7]]];
  RETURN [LOOPHOLE[segmentID]];
  };

SegmentIndexFromSegmentID: PROC [segmentID: SegmentID] RETURNS [SegmentIndex] = {
  RETURN [Basics.BITSHIFT[
    value: LOOPHOLE[segmentID, Basics.LongNumber.num].highbits, count: -7]];
  };

SegmentIDFromDBPage: PUBLIC PROC [dbPage: DBPage] RETURNS [SegmentID] = {
  RETURN [InlineSegmentIDFromDBPage[dbPage]]
  };

-- Masks off everything but the segment field (177600B covers the segment bits
-- of the highbits word); result is a SegmentID with zero page number.
InlineSegmentIDFromDBPage: PROC [dbPage: DBPage] RETURNS [SegmentID] = INLINE {
  segmentID: Basics.LongNumber.num = [num[lowbits: 0,
    highbits: Basics.BITAND[LOOPHOLE[dbPage, Basics.LongNumber.num].highbits, 177600B]]];
  RETURN [LOOPHOLE[segmentID]];
  };

-- Splits a DBPage into its SegmentID and file page number. Inverse of ConsDBPage.
DecomposeDBPage: PROC [dbPage: DBPage] RETURNS [SegmentID, CARDINAL] = {
  segmentID1: Basics.LongNumber.num = [num[lowbits: 0,
    highbits: Basics.BITAND[LOOPHOLE[dbPage, Basics.LongNumber.num].highbits, 177600B]]];
  filePage: CARDINAL _ Basics.BITOR[
    Basics.BITSHIFT[
      value: LOOPHOLE[dbPage, Basics.LongNumber.num].highbits, count: 10],
    Basics.BITSHIFT[
      value: LOOPHOLE[dbPage, Basics.LongNumber.num].lowbits, count: -6]];
  RETURN [LOOPHOLE[segmentID1], filePage];
  };

ConsDBPage: PROC [segmentID: SegmentID, filePage: CARDINAL] RETURNS [dbPage: DBPage] = {
  -- Inverse of DecomposeDBPage
  dbPage1: Basics.LongNumber.num = [num[
    highbits: Basics.BITOR[
      LOOPHOLE[segmentID, Basics.LongNumber.num].highbits,
      Basics.BITSHIFT[value: filePage, count: -10]],
    lowbits: Basics.BITSHIFT[value: filePage, count: 6]]];
  RETURN[LOOPHOLE[dbPage1]];
  };

-- Hash function for the cache's hash table: XOR of segment and page fields,
-- masked to the table size (hashMask = ht.size-1, a power of two minus one).
Hash: PROC [dbPage: DBPage] RETURNS [NAT] = INLINE {
  RETURN [
    Basics.BITAND[
      hashMask,
      Basics.BITXOR[
        Basics.BITSHIFT[
          value: LOOPHOLE[dbPage, Basics.LongNumber.num].highbits, count: -7],
        Basics.BITSHIFT[
          value: LOOPHOLE[dbPage, Basics.LongNumber.num].lowbits, count: -6]]]];
  };

-- Buffer manager

invalidKey: DBPage = 37777777777B; -- doesn't match any true DBPage (since all DBPages have low bits 0).

InitializeBufferMgr: PROC [] = {
  -- Number of hash buckets = largest power of 2 not exceeding cacheSize.
  -- (The original comment claimed "smallest power of 2 not less than cacheSize",
  -- but the loop below doubles hashBuckets once per bit of cacheSize/2.)
  hashBuckets: CARDINAL _ 1;
  temp: CARDINAL _ Basics.BITSHIFT[cacheSize, -1];
  UNTIL temp=0 DO
    hashBuckets _ Basics.BITSHIFT[hashBuckets, 1];
    temp _ Basics.BITSHIFT[temp, -1];
    ENDLOOP;
  ht _ NEW[HashTable[hashBuckets] _ [nItems: 0, buckets: ]];
  hashMask _ hashBuckets-1;
  -- Create cacheRecords, format them into an lru list.
  lruList _ NEW[CacheRecord _ [dbPage: invalidKey, pagePtr: NIL]];
  lruList.pred _ lruList.succ _ lruList;
  -- NOTE(review): this rounds the page count up to a multiple of 16; if
  -- cacheSize is not a multiple of 16, more than cacheSize pages are created
  -- (and CheckState's nFreeItems+nOccupiedItems=cacheSize assertion would
  -- fail). Presumably callers always pass a multiple of 16 -- confirm.
  FOR i: NAT IN [0 .. (cacheSize+15)/16) DO
    AddCachePages[16];
    ENDLOOP;
  };

-- Returns page p locked for reading: from the cache when present (pHint may
-- short-circuit the hash lookup), else reads it from the file into an
-- unoccupied cache page. Increments the page's lock count.
ReadPage: PUBLIC PROC [p: DBPage, pHint: CacheHandle] RETURNS [CacheHandle, LONG POINTER] = {
  file: DBCommon.OpenFileHandle;
  page: CARDINAL;
  IF logReads THEN DBCommon.GetDebugStream[].PutChar['#];
  DBStats.Inc[CacheReadOrWrite];
  -- IF p=noteThisPage THEN SIGNAL TakeALookAtThis;
  IF pHint # NIL AND p = pHint.dbPage THEN GOTO ReturnPage;
  -- hint is wrong, so search hash table
  pHint _ Lookup[p];
  IF pHint # NIL THEN GO TO ReturnPage;
  -- page is not present in cache, read it in
  IF logReads THEN DBCommon.GetDebugStream[].PutChar['!];
  DBStats.Inc[CacheMiss];
  pHint _ GetUnoccupiedPage[];
  { -- We expect ERRORs to be raised from DBFile.ReadFilePage
    [pHint.readonly, file, page] _ FilePageFromDBPage[p];
    DBFile.ReadFilePage[file, page, pHint.pagePtr] };
  pHint.dbPage _ p;
  Insert[pHint];
  pHint.lockCount _ 0;
  GOTO ReturnPage;
  EXITS
    ReturnPage => {
      MakeMRU[pHint];
      pHint.lockCount _ pHint.lockCount + 1;
      RETURN[pHint, pHint.pagePtr] }
  };

-- Like ReadPage, but also marks the page dirty; on a cache miss the page must
-- belong to a writable segment.
WritePage: PUBLIC PROC [p: DBPage, pHint: CacheHandle] RETURNS [CacheHandle, LONG POINTER] = {
  DBStats.Inc[CacheReadOrWrite];
  -- IF p=noteThisPage THEN SIGNAL TakeALookAtThis;
  IF pHint # NIL AND p = pHint.dbPage THEN GO TO ReturnPage;
  -- hint is wrong, so search hash table
  pHint _ Lookup[p];
  IF pHint # NIL THEN GOTO ReturnPage;
  -- page is not present in cache, read it in
  DBStats.Inc[CacheMiss];
  pHint _ GetUnoccupiedPage[];
  { -- We expect ERRORs to be raised from DBFile.ReadFilePage
    file: DBCommon.OpenFileHandle;
    page: CARDINAL;
    [pHint.readonly, file, page] _ FilePageFromDBPage[p];
    IF pHint.readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
    DBFile.ReadFilePage[file, page, pHint.pagePtr] };
  pHint.dbPage _ p;
  Insert[pHint];
  pHint.lockCount _ 0;
  GOTO ReturnPage;
  EXITS
    ReturnPage => {
      MakeMRU[pHint];
      pHint.lockCount _ pHint.lockCount + 1;
      pHint.written _ TRUE;
      RETURN[pHint, pHint.pagePtr] }
  };

-- Binds a cache page to the newly allocated page p without reading the file
-- (the page's disk contents are about to be overwritten). Errors if p is
-- already cached. Returns the page locked once and marked dirty.
NewPage: PROC [p: DBPage] RETURNS [CacheHandle, LONG POINTER] = {
  pHint: CacheHandle _ Lookup[p];
  IF pHint # NIL THEN ERROR DBEnvironment.InternalError; -- [NotaNewPage]
  pHint _ GetUnoccupiedPage[];
  {
    [pHint.readonly, , ] _ FilePageFromDBPage[p];
    -- BUG FIX: the original omitted the ERROR keyword here, so the raise was
    -- never performed; every sibling raise site writes ERROR DBEnvironment.Error[...].
    IF pHint.readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
    };
  pHint.dbPage _ p;
  Insert[pHint];
  MakeMRU[pHint];
  pHint.lockCount _ 1;
  pHint.written _ TRUE;
  RETURN[pHint, pHint.pagePtr]
  };

-- Marks an already-locked page dirty; errors if unlocked or readonly.
WriteLockedPage: PUBLIC PROC [pValidHint: CacheHandle] = {
  IF pValidHint = NIL OR pValidHint.lockCount = 0 THEN
    ERROR DBEnvironment.InternalError; -- [AlreadyUnlocked]
  IF pValidHint.readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
  pValidHint.written _ TRUE;
  };

-- Returns the lock count of page p; the hint must be valid and locked.
LockCount: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle] RETURNS [CARDINAL] = {
  IF pValidHint = NIL OR pValidHint.dbPage # p OR pValidHint.lockCount = 0 THEN
    ERROR DBEnvironment.InternalError; -- BadCacheHandle
  RETURN[pValidHint.lockCount];
  };

-- Returns the in-core address of page p; the hint must be valid and locked.
ObtainCoreLoc: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle] RETURNS [LONG POINTER] = {
  IF pValidHint = NIL OR pValidHint.dbPage # p OR pValidHint.lockCount = 0 THEN
    ERROR DBEnvironment.InternalError; -- [BadCacheHandle];
  RETURN[pValidHint.pagePtr];
  };

-- Decrements a page's lock count; errors if already unlocked.
UnlockPage: PUBLIC PROC [pValidHint: CacheHandle] = {
  IF pValidHint = NIL OR pValidHint.lockCount = 0 THEN
    ERROR DBEnvironment.InternalError; -- [AlreadyUnlocked];
  pValidHint.lockCount _ pValidHint.lockCount - 1;
  };

-- Writes every dirty cache page belonging to transaction t back to its file.
WriteoutCache: PROC [t: Trans] RETURNS [success: BOOL] = {
  WritePageOutIfUsesTrans: PROC [cacheHandle: CacheHandle] = {
    IF cacheHandle.written THEN {
      fileNode: FileNode = FindSegmentID[InlineSegmentIDFromDBPage[cacheHandle.dbPage]];
      IF fileNode.trans = t THEN WritePageOut[cacheHandle];
      };
    };
  EnumerateHashTable[WritePageOutIfUsesTrans];
  RETURN[TRUE];
  };

-- Discards (without writing) every cache page belonging to transaction t.
AbortCache: PROC [t: Trans] = {
  DeleteIfUsesTrans: PROC [cacheHandle: CacheHandle] = {
    IF cacheHandle.occupied THEN {
      fileNode: FileNode = FindSegmentID[InlineSegmentIDFromDBPage[cacheHandle.dbPage]];
      IF fileNode.trans = t THEN Delete[cacheHandle];
      };
    };
  EnumerateLRUList[DeleteIfUsesTrans];
  };

WritePageOut: PROC [cacheHandle: CacheHandle] = {
  -- Send the contents of a dirty cache page to the database segment, and mark
  -- the page no longer dirty (leaving it in the cache).
  readonly: BOOL;
  file: DBCommon.OpenFileHandle;
  page: CARDINAL;
  DBStats.Inc[CacheWritePageBack];
  [readonly, file, page] _ FilePageFromDBPage[cacheHandle.dbPage];
  IF readonly THEN
    -- The following signals should never happen, if Mark's code works as I believe it does...
    SIGNAL DBEnvironment.Error[WriteNotAllowed]
  ELSE DBFile.WriteFilePage[file, page, cacheHandle.pagePtr];
  cacheHandle.written _ FALSE;
  };

-- Buffer manager LRU list

AddCachePages: PROC [nPages: NAT] = {
  -- Creates nPages new cache records, in the LRU position.
  curCachePage: LONG POINTER _ VM.AddressForPageNumber[
    VM.Allocate[count: nPages*DBCommon.PagesPerDBPage, in64K: TRUE].page];
  FOR i: NAT IN [0..nPages) DO
    p: CacheHandle _ NEW[CacheRecord _ [
      dbPage: invalidKey, pagePtr: curCachePage]];
    curCachePage _ curCachePage + DBCommon.WordsPerPage;
    -- Splice p in just before the header node (the LRU end of the list).
    p.pred _ lruList.pred;
    p.succ _ lruList;
    lruList.pred.succ _ p;
    lruList.pred _ p;
    ENDLOOP;
  };

MakeMRU: PROC [x: CacheHandle] = {
  -- Assumes that x is part of lruList. Deletes x from its current position and makes
  -- it first in lruList.
  x.pred.succ _ x.succ;
  x.succ.pred _ x.pred;
  x.succ _ lruList.succ;
  x.pred _ lruList;
  lruList.succ.pred _ x;
  lruList.succ _ x;
  };

EnumerateLRUList: PROC [procToCall: PROC[CacheHandle]] = {
  -- Calls procToCall for each record in LRUList, in MRU to LRU order.
  -- procToCall may not perform MakeMRU; use EnumerateHashTable if you want this.
  -- Note that procToCall will be called for both occupied and unoccupied pages.
  FOR p: CacheHandle _ lruList.succ, p.succ UNTIL p = lruList DO
    procToCall[p];
    ENDLOOP;
  };

GetUnoccupiedPage: PROC RETURNS [CacheHandle] = {
  -- Obtain an empty cache page. It will be the least-recently used page that is
  -- unoccupied or occupied but unlocked. The page is returned with Key = garbage,
  -- Page = ptr to a cache page, occupied = FALSE, written = FALSE,
  -- and entered on lruList in no particular location.
  cacheHandle: CacheHandle;
  FOR cacheHandle _ lruList.pred, cacheHandle.pred UNTIL cacheHandle = lruList DO
    IF ~cacheHandle.occupied THEN GOTO FoundUnoccupiedPage;
    IF cacheHandle.lockCount = 0 THEN GOTO MakePageUnoccupied;
    REPEAT
      FoundUnoccupiedPage => NULL; -- ~cacheHandle.occupied
      MakePageUnoccupied => { -- cacheHandle.occupied AND cacheHandle.lockCount=0
        IF cacheHandle.written THEN WritePageOut[cacheHandle];
        Delete[cacheHandle];
        };
      FINISHED => ERROR DBEnvironment.Failure[$allCachePagesLocked]
    ENDLOOP;
  cacheHandle.written _ FALSE;
  RETURN[cacheHandle];
  };

-- Buffer manager hash table

EnumerateHashTable: PROC[procToCall: PROC[CacheHandle]] = {
  -- Calls procToCall for each record in table.
  -- procToCall may not perform Delete; use EnumerateLRUList if you want this.
  FOR i: NAT IN [0..ht.size) DO
    FOR p: CacheHandle _ ht.buckets[i], p.hashChain UNTIL p = NIL DO
      procToCall[p];
      ENDLOOP;
    ENDLOOP;
  };

Lookup: PROC[dbPage: DBPage] RETURNS[CacheHandle] = {
  -- Look the dbPage up in the hash table for the cache. Return the CacheHandle if
  -- it is found, else return NIL.
  i: CARDINAL _ Hash[dbPage];
  p: CacheHandle _ ht.buckets[i];
  DBStats.Inc[CacheHTLookup];
  WHILE p # NIL DO
    IF p.dbPage = dbPage THEN RETURN[p];
    DBStats.Inc[CacheHTConflictInLookup];
    p _ p.hashChain;
    ENDLOOP;
  IF dbPage=noteThisPage THEN SIGNAL TakeALookAtThis;
  RETURN[NIL];
  };

Insert: PROC[x: CacheHandle] = {
  -- Add the CacheHandle x to the hash table for the cache. Generate InternalError if
  -- a handle with the same dbPage is already there.
  i: CARDINAL _ Hash[x.dbPage];
  -- Make sure its not there already...
  p: CacheHandle _ ht.buckets[i];
  WHILE p # NIL DO
    IF p.dbPage = x.dbPage THEN GOTO FoundDuplicate;
    p _ p.hashChain;
    REPEAT
      FoundDuplicate => ERROR DBEnvironment.InternalError; -- [AlreadyInHashTable];
      FINISHED => {
        x.hashChain _ ht.buckets[i];
        ht.buckets[i] _ x;
        ht.nItems _ ht.nItems + 1;
        x.occupied _ TRUE;
        IF x.dbPage=noteThisPage THEN SIGNAL TakeALookAtThis;
        };
    ENDLOOP;
  };

Delete: PROC[x: CacheHandle] = {
  -- Delete the CacheHandle x from the hash table for the cache.
  i: CARDINAL _ Hash[x.dbPage];
  p: CacheHandle _ ht.buckets[i];
  IF p = x THEN {ht.buckets[i] _ x.hashChain; GOTO Deleted};
  WHILE p # NIL DO
    IF p.hashChain = x THEN {p.hashChain _ x.hashChain; GOTO Deleted};
    p _ p.hashChain;
    REPEAT
      FINISHED => ERROR DBEnvironment.InternalError; -- [NotPresentInHashTable];
    ENDLOOP;
  EXITS
    Deleted => {
      IF x.dbPage=noteThisPage THEN SIGNAL TakeALookAtThis;
      x.dbPage _ invalidKey;
      x.occupied _ FALSE;
      x.hashChain _ NIL;
      ht.nItems _ ht.nItems - 1;
      };
  };

-- Buffer manager debugging

AssertionFailed: SIGNAL = CODE;

Assert: PROC [assertion: BOOL] = {
  IF ~assertion THEN SIGNAL AssertionFailed; -- So we can Proceed (in debugger) from failure.
  };
-- Consistency-checks the buffer cache (LRU list links, hash table membership,
-- occupancy counts), optionally printing a table of cache entries to the
-- debug stream. printOnlyLockedPages restricts the printout to locked pages.
CheckState: PUBLIC PROC [doPrinting, printOnlyLockedPages: BOOL] = {
  nOccupiedItems, checkOccupiedItems: CARDINAL _ 0;
  nLockedItems: CARDINAL _ 0;
  nFreeItems: CARDINAL _ 0;
  PageHeader: TYPE = MACHINE DEPENDENT RECORD[
    pageTag: CARDINAL [0..256), dontCare: CARDINAL [0..256)];
  -- Helpers to print REF / LONG POINTER values as numbers.
  IOref: PROC[p: REF ANY] RETURNS[IO.Value] = INLINE {
    RETURN[IO.card[LOOPHOLE[p, LONG CARDINAL]]]};
  IOptr: PROC[p: LONG POINTER] RETURNS[IO.Value] = INLINE {
    RETURN[IO.card[LOOPHOLE[p, LONG CARDINAL]]]};
  out: IO.STREAM _ IF doPrinting THEN DBCommon.GetDebugStream[] ELSE NIL;
  -- Pass 1 (over the LRU list): verify list links and per-entry invariants,
  -- count free/occupied/locked entries, and optionally print each entry.
  CheckEntry1: PROC[p: CacheHandle] = {
    Assert[p.pred.succ = p];
    Assert[p.succ.pred = p];
    Assert[p.pagePtr # NIL];
    IF ~p.occupied THEN {
      nFreeItems _ nFreeItems + 1;
      Assert[p.dbPage = invalidKey];
      RETURN };
    Assert[p.dbPage # invalidKey];
    nOccupiedItems _ nOccupiedItems + 1;
    IF p.lockCount > 0 THEN nLockedItems _ nLockedItems + 1;
    IF p.written THEN Assert[NOT p.readonly];
    IF ~doPrinting OR (p.lockCount = 0 AND printOnlyLockedPages) THEN RETURN;
    {pageTag: CARDINAL = LOOPHOLE[p.pagePtr, LONG POINTER TO PageHeader].pageTag;
      out.PutF["%11bB %11bB %11bB %3d",
        IOref[p], IO.card[p.dbPage], IOptr[p.pagePtr], IO.card[pageTag]];
      -- BUG FIX: the format string ended in "*n" (pre-Cedar escape); every other
      -- PutF in this proc uses "\n", so the newline was printed literally.
      out.PutF[" %3d %g %g\n",
        IO.card[p.lockCount], IO.bool[p.written], IO.bool[p.readonly]];
      };
    };
  -- Pass 2 (over the hash table): every entry in ht must be occupied.
  CheckEntry2: PROC[p: CacheHandle] = {
    Assert[p.occupied];
    checkOccupiedItems _ checkOccupiedItems + 1;
    };
  IF doPrinting THEN {
    out.PutF["Cache size %g pages.\n", IO.card[cacheSize]];
    out.PutRope[
      "  cacheHdl      dbPage     pagePtr tag lockCnt written readonly\n"];
    };
  EnumerateLRUList[CheckEntry1];
  EnumerateHashTable[CheckEntry2];
  IF doPrinting THEN out.PutF["%g pages occupied, %g are locked\n",
    IO.card[nOccupiedItems], IO.card[nLockedItems]];
  -- Occupied count from the LRU list must match the hash table's contents.
  Assert[nOccupiedItems = checkOccupiedItems];
  Assert[ht.nItems = checkOccupiedItems];
  Assert[nFreeItems + nOccupiedItems = cacheSize];
  };

END.--DBSegmentImpl

CHANGE LOG

Created by MBrown on December 10, 1979 5:09 PM

Changed by MBrown on December 1, 1982 9:41 am
-- Total rewrite for multiple
segments, merging in DBCacheImpl. No more support for module
--finalization. Discarded >3 pages of change log.

Changed by Cattell on January 16, 1983 2:35 pm
-- Added some more meaningful signals for various client errors for segments and transactions,
--in conjunction with new DBEnvironment interface replacing DBException.

Changed by Cattell on January 17, 1983 8:21 pm
-- AttachSegment didn't put readOnly and version into fileNode when segment re-declared, so
--couldn't re-open transaction with version to create new file.

Changed by Cattell on January 19, 1983 10:57 am
-- The code didn't check for FindFile returning NIL in a couple of places, resulting in address
--faults if the segment was not declared. Put in checks and signal generation.

Changed by Willie-Sue on February 3, 1983
-- Added noLog arg to OpenTransaction and OpenSegment.

Changed by MBrown on February 25, 1983 1:39 pm
-- Fixed bug in ReadPage, WritePage, NewPage: buffer left in "occupied" state after
--error (e.g. attempt to read after transaction abort).

Changed by Cattell & Donahue on May 19, 1983 2:26 pm
-- Changed AttachSegment so can re-call it with new file name for same segment atom.

Changed by Cattell on July 11, 1983 3:46 pm
-- Pass readOnly bit through to DBFileAlpine. Temporarily removed check of readOnly bit in
--WritePageOut; we've already warned the user, and this error keeps us from closing the segment
--after getting WriteNotAllowed.

Changed by Cattell on September 14, 1983 11:43 am
-- Removed the DBSegmentParticularTable stuff, which I'm sure Mark added with good intentions.
--We don't need a red-black tree package to keep track of at most half a dozen segments! This
--change reduces the memory requirements and complexity of this module considerably. The fancy
--compilation required to do this confuses the spelling corrector and MakeConfig, to boot.