DBSegmentImpl.mesa
Copyright © 1985 by Xerox Corporation. All rights reserved.
Last Edited by:
Donahue, July 2, 1986 2:08:13 pm PDT
MBrown on February 25, 1983 1:49 pm
Cattell on September 16, 1983 11:13 am
Willie-Sue on April 25, 1985 12:45:53 pm PST
Wert, August 10, 1984 6:08:10 pm PDT
Widom, September 12, 1985 4:54:35 pm PDT
DIRECTORY
AlpineEnvironment,
Atom,
Basics,
ConvertUnsafe,
DB USING [Error, Failure],
DBCommon,
DBFile,
DBSegment,
DBSegmentPrivate,
DBStats,
DBStoragePage,
File,
IO,
Rope USING [ROPE, Equal],
VM USING [Allocate, AddressForPageNumber];
DBSegmentImpl: CEDAR PROGRAM
IMPORTS Atom, Basics, ConvertUnsafe, DB, DBCommon, DBFile, DBStats, IO, Rope, VM
EXPORTS DBSegment
= BEGIN
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
Trans: TYPE = DBCommon.Transaction;
Segment: TYPE = DBCommon.Segment;
SegmentID: TYPE = DBCommon.SegmentID;
SegmentIndex: TYPE = DBCommon.SegmentIndex;
VersionOptions: TYPE = DBCommon.VersionOptions;
DBPage: TYPE = DBCommon.DBPage;
NullDBPage: DBPage = DBCommon.NullDBPage;
CacheHandle: TYPE = DBCommon.CacheHandle;
CacheRecord: TYPE = DBCommon.CacheRecord;
TID: TYPE = DBCommon.TID;
TakeALookAtThis: ERROR = CODE;
FileNode: TYPE = REF FileNodeRecord;
FileNodeRecord: TYPE = RECORD [
segmentID: DBCommon.DBPage,
segment: DBCommon.Segment ← NIL,
fileName: ROPE ← NIL,
openFile: DBCommon.OpenFileHandle ← NIL,
if openFile = NIL, then the segment is not currently open
readonly: BOOL ← TRUE,
lock: AlpineEnvironment.LockOption ← [intendWrite, wait],
version: DBCommon.VersionOptions ← OldFileOnly,
pagesInitial, pagesPerExtent: NAT ← 0,
rest: FileNode ← NIL
];
HashTable: TYPE = RECORD [
nItems: NAT ← 0, -- number of CacheRecords in table
buckets: SEQUENCE size: NAT OF CacheHandle
];
Module state
initialized: BOOL ← FALSE;
createIndexesProc: PROC [s: Segment] RETURNS [ARRAY[0..DBCommon.systemIndexCount) OF TID];
logReads: BOOL ← FALSE; -- causes reads & writes to be logged on UserExec
noteThisPage: DBPage ← 0; -- causes signal when this page is read or written
files: FileNode ← NIL;
List mapping SegmentID to interesting information about a file.
cacheSize: NAT;
Number of pages allocated to cache.
ht: REF HashTable;
Fast map from DBPage to CacheHandle. ht.size is a power of two.
hashMask: CARDINAL;
Equals ht.size-1, used in computing hash function.
lruList: CacheHandle;
Points to header node of doubly-linked list of CacheRecords.
Data structure invariants: A cache record is present in lruList for every allocated cache page; there is no separate "free list". occupied=FALSE means that the page is free, and that the other cache record fields (readonly, written) are undefined. occupied=TRUE means that the page contains valid data; a page is stored in ht iff occupied=TRUE.
The lruList points to a cache record with no associated cache page; this "header node" makes the circular-list manipulations work properly. The most-recently used page is lruList.succ, and the least-recently used page is lruList.pred.
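An illustrative picture, not part of the original module: immediately after lruList is created (before AddCachePages runs) the header is alone, so lruList.succ = lruList.pred = lruList. Once cache records A (most recently used) and B (least recently used) are linked in, lruList.succ = A, A.succ = B, B.succ = lruList, and the pred pointers run the other way around the circle. Because the header record is always present, MakeMRU and AddCachePages below need no empty-list special cases.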
Initialize: PUBLIC PROC [
nCachePages: NAT,
useCacheFile: ROPE,
segmentInitProc: PROC [s: Segment] RETURNS [ARRAY[0..DBCommon.systemIndexCount) OF TID] ] = {
IF initialized THEN ERROR DBCommon.InternalError;
cacheSize ← nCachePages;
createIndexesProc ← segmentInitProc;
InitializeBufferMgr[];
initialized ← TRUE;
};
FindSegmentID: PROC[segmentID: DBCommon.DBPage] RETURNS[found: FileNode] = {
Just a linear search; we're not going to have a tremendous number of segments open.
FOR found ← files, found.rest UNTIL found=NIL DO
IF found.segmentID=segmentID THEN RETURN ENDLOOP};
FindSegment: PROC [s: Segment] RETURNS [found: FileNode] = {
Same as above, but looks for an atom match instead of a segment ID match. If efficiency of this ever becomes important, we can attach the info to the atom's property list.
FOR found ← files, found.rest UNTIL found=NIL DO
IF found.segment=s THEN RETURN ENDLOOP };
AttachSegment: PUBLIC PROC [fileName: ROPE, s: Segment, segmentIndex: SegmentIndex, lock: AlpineEnvironment.LockOption, readonly: BOOL, version: VersionOptions, nPagesInitial, nPagesPerExtent: NAT] RETURNS[newAttachment: BOOL] = {
segmentID: SegmentID = SegmentIDFromSegmentIndex[segmentIndex];
fileNode: FileNode = FindSegmentID[segmentID];
IF NOT initialized THEN ERROR;
IF fileNode # NIL THEN {
IF fileNode.segmentID # segmentID OR fileNode.segment # s THEN
ERROR DB.Error[MismatchedExistingSegment];
newAttachment ← NOT Rope.Equal[fileNode.fileName, fileName];
fileNode.fileName ← fileName;
fileNode.version ← version;
fileNode.lock ← lock;
fileNode.readonly ← readonly;
fileNode.pagesInitial ← MAX[1, nPagesInitial];
fileNode.pagesPerExtent ← MAX[1, nPagesPerExtent];
RETURN;
};
files ← NEW[FileNodeRecord ← [
segmentID: segmentID,
segment: s,
lock: lock,
readonly: readonly,
version: version,
fileName: fileName,
pagesInitial: MAX[1, nPagesInitial],
pagesPerExtent: MAX[1, nPagesPerExtent],
rest: files
]];
newAttachment ← TRUE
};
OpenSegment: PUBLIC PROC [s: Segment, trans: Trans, eraseAfterOpening: BOOL] = {
ENABLE UNWIND => { CloseSegment[s]; FlushCache[s] };
createdFile: BOOL;
fileNode: FileNode = FindSegment[s];
[fileNode.openFile, createdFile] ← DBFile.OpenFile[t: trans, file: fileNode.fileName, version: fileNode.version, discardFileContents: eraseAfterOpening, nPagesInitial: fileNode.pagesInitial, lock: fileNode.lock, readOnly: fileNode.readonly];
IF createdFile OR eraseAfterOpening THEN InitializeSegment[fileNode]
ELSE CheckSegment[fileNode] };
CloseSegment: PUBLIC PROC [s: Segment] ~ {
fileNode: FileNode = FindSegment[s];
IF fileNode = NIL THEN RETURN;
IF fileNode.openFile # NIL THEN DBFile.Close[fileNode.openFile];
fileNode.openFile ← NIL };
GetSegmentInfo: PUBLIC PROC [s: Segment] RETURNS [filePath: ROPE, readOnly: BOOL] = {
fileNode: FileNode = FindSegment[s];
IF fileNode=NIL THEN RETURN[NIL, FALSE]
ELSE RETURN[fileNode.fileName, fileNode.readonly]
};
GetVersionStamp: PUBLIC PROC [segment: Segment] RETURNS[schemaVersion: INT] = TRUSTED {
Returns the current schema version stamp for the given segment.
fileNode: FileNode = FindSegment[segment];
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
IF fileNode=NIL THEN ERROR DB.Error[SegmentNotDeclared];
[segmentHeadHandle, segmentHead] ← ReadPage[fileNode.segmentID, NIL];
schemaVersion ← segmentHead.schemaVersionStamp;
UnlockPage[segmentHeadHandle]
};
SetVersionStamp: PUBLIC PROC [segment: Segment, to: INT] = TRUSTED {
Called when the schema for the given segment is changed; updates the schema version stamp.
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
fileNode: FileNode = FindSegment[segment];
IF fileNode=NIL THEN ERROR DB.Error[SegmentNotDeclared];
IF fileNode.readonly THEN ERROR DB.Error[WriteNotAllowed];
[segmentHeadHandle, segmentHead] ← WritePage[fileNode.segmentID, NIL];
segmentHead.schemaVersionStamp ← to;
UnlockPage[segmentHeadHandle] };
SegmentFromTID: PUBLIC PROC [tid: TID] RETURNS [s: Segment] = {
fileNode: FileNode = FindSegmentID[InlineSegmentIDFromDBPage[tid]];
IF fileNode # NIL THEN RETURN [fileNode.segment] ELSE RETURN [NIL];
};
IsValidTID: PUBLIC PROC [tid: TID] RETURNS [valid: BOOL] = {
fileNode: FileNode = FindSegmentID[InlineSegmentIDFromDBPage[tid]];
RETURN [fileNode # NIL AND fileNode.openFile # NIL];
};
SegmentIDFromSegment: PUBLIC PROC [s: Segment] RETURNS [SegmentID] = {
fileNode: FileNode = FindSegment[s];
IF fileNode = NIL THEN ERROR DB.Error[SegmentNotDeclared];
RETURN [fileNode.segmentID];
};
EnumerateSegments: PUBLIC PROC [enumProc: PROC [s: Segment, segmentIndex: SegmentIndex] RETURNS [stop: BOOL]] = {
FOR f: FileNode ← files, f.rest UNTIL f=NIL DO
IF enumProc[f.segment, SegmentIndexFromSegmentID[f.segmentID]].stop THEN EXIT;
ENDLOOP;
};
FilePageFromDBPage: PROC [dbPage: DBPage] RETURNS [readonly: BOOL, file: DBCommon.OpenFileHandle, page: CARDINAL] = {
segmentID: SegmentID;
fileNode: FileNode;
[segmentID, page] ← DecomposeDBPage[dbPage];
fileNode ← FindSegmentID[segmentID];
IF fileNode = NIL OR (file ← fileNode.openFile) = NIL THEN ERROR DB.Error[TransactionNotOpen];
RETURN [fileNode.readonly, file, page];
};
Procs that understand the format of the segment head page.
InitializeSegment: PROC [fileNode: FileNode] = TRUSTED {
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
fileSize: CARDINAL = DBFile.GetSize[fileNode.openFile];
fileNode.readonly ← FALSE;
[segmentHeadHandle, segmentHead] ← WritePage[fileNode.segmentID, NIL];
segmentHead^ ← [
segmentID: fileNode.segmentID,
allocator: [
firstPageInBlock: ConsDBPage[fileNode.segmentID, 1],
nPagesInBlock: fileSize-1,
pagesPerExtent: fileNode.pagesPerExtent],
softwareCompatibilityVersion: DBCommon.softwareCompatibilityVersion,
schemaVersionStamp: 1];
TRUSTED {
ConvertUnsafe.AppendRope[
to: LOOPHOLE[@(segmentHead.name)], from: Atom.GetPName[fileNode.segment]];
};
segmentHead.systemIndices ← createIndexesProc[fileNode.segment];
UnlockPage[segmentHeadHandle];
};
CheckSegment: PROC [fileNode: FileNode] = TRUSTED {
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
If the segment is not readonly then we need to be able to write the head page (its schema version stamp and software compatibility version).
[segmentHeadHandle, segmentHead] ← ReadPage[fileNode.segmentID, NIL];
IF segmentHead.tag # DBStoragePage.AMap OR
segmentHead.seal # DBSegmentPrivate.Seal THEN
ERROR DB.Error[MismatchedExistingSegment];
IF segmentHead.softwareCompatibilityVersion # DBCommon.softwareCompatibilityVersion THEN ERROR DB.Error[MismatchedExistingSegment];
TRUSTED {
IF NOT Rope.Equal[
s1: Atom.GetPName[fileNode.segment],
s2: ConvertUnsafe.ToRope[LOOPHOLE[@(segmentHead.name)]],
case: FALSE] THEN ERROR DB.Error[MismatchedExistingSegment];
};
UnlockPage[segmentHeadHandle];
};
RootIndicesFromSegment: PUBLIC PROC [s: Segment] RETURNS [indices: ARRAY[0..DBCommon.systemIndexCount) OF TID] = TRUSTED {
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
fileNode: FileNode = FindSegment[s];
IF fileNode=NIL THEN ERROR DB.Error[SegmentNotDeclared];
[segmentHeadHandle, segmentHead] ← ReadPage[fileNode.segmentID, NIL];
IF segmentHead.tag # DBStoragePage.AMap THEN
ERROR DB.Error[MismatchedExistingSegment];
indices ← segmentHead.systemIndices;
UnlockPage[segmentHeadHandle];
RETURN [indices];
};
AllocPage: PUBLIC PROC [s: SegmentID] RETURNS [p: DBPage, pValidHint: CacheHandle, cachePage: LONG POINTER] = TRUSTED {
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
DBStats.Inc[SegmentAllocPage];
[segmentHeadHandle, segmentHead] ← WritePage[s, NIL];
IF segmentHead.tag # DBStoragePage.AMap THEN
ERROR DB.Error[MismatchedExistingSegment];
IF (p ← segmentHead.allocator.freeList) # NullDBPage THEN {
The segment's free list is nonempty, so return a page from it.
allocatedFreeListPagePtr: LONG POINTER TO DBSegmentPrivate.FreeListPage;
[pValidHint, allocatedFreeListPagePtr] ← WritePage[p, NIL];
IF allocatedFreeListPagePtr.tag # DBStoragePage.Free THEN ERROR DBCommon.InternalError;
segmentHead.allocator.freeList ← allocatedFreeListPagePtr.next;
cachePage ← allocatedFreeListPagePtr;
}
ELSE {
The segment's free list is empty, so allocate from block allocator, perhaps extending file.
IF segmentHead.allocator.firstPageInBlock = NullDBPage THEN {
fileNode: FileNode ← FindSegmentID[s];
size: CARDINAL ← DBFile.GetSize[fileNode.openFile];
DBFile.SetSize[fileNode.openFile, size + segmentHead.allocator.pagesPerExtent];
segmentHead.allocator.firstPageInBlock ← ConsDBPage[s, size];
segmentHead.allocator.nPagesInBlock ← segmentHead.allocator.pagesPerExtent;
};
p ← segmentHead.allocator.firstPageInBlock;
IF (segmentHead.allocator.nPagesInBlock ← segmentHead.allocator.nPagesInBlock-1) = 0 THEN
segmentHead.allocator.firstPageInBlock ← NullDBPage
ELSE segmentHead.allocator.firstPageInBlock ←
p + DBCommon.DBPageIncrement;
[pValidHint, cachePage] ← NewPage[p];
};
UnlockPage[segmentHeadHandle];
TRUSTED {
LOOPHOLE[cachePage, LONG POINTER TO DBStoragePage.PageHeader].pageTag ←
DBStoragePage.Unused;
};
RETURN[p, pValidHint, cachePage];
};--AllocPage
FreePage: PUBLIC PROC [s: SegmentID, freePg: DBPage, freeHint: CacheHandle] = TRUSTED {
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
free: LONG POINTER TO DBSegmentPrivate.FreeListPage;
DBStats.Inc[SegmentFreePage];
IF s # InlineSegmentIDFromDBPage[freePg] THEN ERROR DBCommon.InternalError;
[segmentHeadHandle, segmentHead] ← WritePage[s, NIL];
IF segmentHead.tag # DBStoragePage.AMap THEN ERROR DBCommon.InternalError;
IF LockCount[freePg, freeHint] # 1 THEN ERROR DBCommon.InternalError;
free ← ObtainCoreLoc[freePg, freeHint];
IF free.tag = DBStoragePage.Free OR free.tag = DBStoragePage.AMap THEN
ERROR DBCommon.InternalError;
WriteLockedPage[freeHint];
free^ ← [next: segmentHead.allocator.freeList];
segmentHead.allocator.freeList ← freePg;
UnlockPage[freeHint];
UnlockPage[segmentHeadHandle];
};--FreePage
Procs that understand the bit layout of a DBPage.
SegmentIDFromSegmentIndex: PROC[segmentIndex: SegmentIndex] RETURNS[SegmentID] = {
sid: SegmentID;
segmentID: Basics.LongNumber.num = [num[lowbits: 0,
highbits: Basics.BITSHIFT[value: segmentIndex, count: 7]]];
TRUSTED { sid ← LOOPHOLE[segmentID] };
RETURN [sid];
};
SegmentIndexFromSegmentID: PROC [segmentID: SegmentID] RETURNS [SegmentIndex] = TRUSTED {
RETURN [Basics.BITSHIFT[
value: LOOPHOLE[segmentID, Basics.LongNumber.num].highbits,
count: -7]];
};
SegmentIDFromDBPage: PUBLIC PROC [dbPage: DBPage] RETURNS [SegmentID] = {
RETURN [InlineSegmentIDFromDBPage[dbPage]]
};
InlineSegmentIDFromDBPage: PROC [dbPage: DBPage] RETURNS [SegmentID] = TRUSTED INLINE {
segmentID: Basics.LongNumber.num = [num[lowbits: 0,
highbits: Basics.BITAND[LOOPHOLE[dbPage, Basics.LongNumber.num].highbits, 177600B]]];
RETURN [LOOPHOLE[segmentID]];
};
DecomposeDBPage: PROC [dbPage: DBPage] RETURNS [SegmentID, CARDINAL] = TRUSTED {
segmentID1: Basics.LongNumber.num = [num[lowbits: 0,
highbits: Basics.BITAND[LOOPHOLE[dbPage, Basics.LongNumber.num].highbits, 177600B]]];
filePage: CARDINAL ← Basics.BITOR[
Basics.BITSHIFT[
value: LOOPHOLE[dbPage, Basics.LongNumber.num].highbits, count: 10],
Basics.BITSHIFT[
value: LOOPHOLE[dbPage, Basics.LongNumber.num].lowbits, count: -6]];
RETURN [LOOPHOLE[segmentID1], filePage];
};
ConsDBPage: PROC [segmentID: SegmentID, filePage: CARDINAL] RETURNS [dbPage: DBPage] = TRUSTED {
Inverse of DecomposeDBPage
dbPage1: Basics.LongNumber.num = [num[
highbits: Basics.BITOR[
LOOPHOLE[segmentID, Basics.LongNumber.num].highbits,
Basics.BITSHIFT[value: filePage, count: -10]],
lowbits: Basics.BITSHIFT[value: filePage, count: 6]]];
RETURN[LOOPHOLE[dbPage1]];
};
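A worked example of the layout, illustrative and not in the original source: a DBPage carries a 9-bit segment index in highbits bits 7..15 and a 16-bit file page number split across the word, with filePage bits 10..15 in highbits bits 0..5 and filePage bits 0..9 in lowbits bits 6..15; lowbits bits 0..5 are always zero. For instance, ConsDBPage[SegmentIDFromSegmentIndex[1], 3] yields highbits = 200B and lowbits = 300B, and DecomposeDBPage maps that DBPage back to [the same segment ID, 3].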
Hash: PROC [dbPage: DBPage] RETURNS [NAT] = TRUSTED INLINE { RETURN [
Basics.BITAND[
hashMask,
Basics.BITXOR[
Basics.BITSHIFT[
value: LOOPHOLE[dbPage, Basics.LongNumber.num].highbits,
count: -7],
Basics.BITSHIFT[
value: LOOPHOLE[dbPage, Basics.LongNumber.num].lowbits,
count: -6]]]];
};
Buffer manager
invalidKey: DBPage = 37777777777B;
doesn't match any true DBPage (since all DBPages have low bits 0).
InitializeBufferMgr: PROC [] = {
Number of hash buckets = largest power of 2 not exceeding cacheSize.
hashBuckets: CARDINAL ← 1;
temp: CARDINAL ← Basics.BITSHIFT[cacheSize, -1];
UNTIL temp=0 DO
hashBuckets ← Basics.BITSHIFT[hashBuckets, 1];
temp ← Basics.BITSHIFT[temp, -1];
ENDLOOP;
ht ← NEW[HashTable[hashBuckets] ← [nItems: 0, buckets: ]];
hashMask ← hashBuckets-1;
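For example (illustrative, not in the original source): cacheSize = 50 gives hashBuckets = 32 and hashMask = 31, so Hash above reduces its mixed bits to a bucket index in [0..32).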
Create cache records and link them into the LRU list.
lruList ← NEW[CacheRecord ← [dbPage: invalidKey, pagePtr: NIL]];
lruList.pred ← lruList.succ ← lruList;
FOR i: NAT IN [0 .. (cacheSize+15)/16) DO
AddCachePages[16];
ENDLOOP;
};
ReadPage: PUBLIC PROC [p: DBPage, pHint: CacheHandle] RETURNS [CacheHandle, LONG POINTER] = {
file: DBCommon.OpenFileHandle; page: CARDINAL;
IF logReads THEN
DBCommon.GetDebugStream[].PutChar['#];
DBStats.Inc[CacheReadOrWrite];
IF p=noteThisPage THEN ERROR TakeALookAtThis;
IF pHint # NIL AND p = pHint.dbPage THEN GOTO ReturnPage;
hint is wrong, so search hash table
pHint ← Lookup[p];
IF pHint # NIL THEN GO TO ReturnPage;
page is not present in cache, read it in
IF logReads THEN DBCommon.GetDebugStream[].PutChar['!];
DBStats.Inc[CacheMiss];
pHint ← GetUnoccupiedPage[];
{ -- We expect ERRORs to be raised from DBFile.ReadFilePage
[pHint.readonly, file, page] ← FilePageFromDBPage[p];
DBFile.ReadFilePage[file, page, pHint.pagePtr]
};
pHint.dbPage ← p; Insert[pHint];
pHint.lockCount ← 0;
GOTO ReturnPage;
EXITS
ReturnPage => {
MakeMRU[pHint];
pHint.lockCount ← pHint.lockCount + 1;
RETURN[pHint, pHint.pagePtr]
}
};
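Illustrative use of the hint protocol, not part of the original module (h and ptr are hypothetical local names): a caller may retain the CacheHandle returned by an earlier ReadPage of the same page and pass it back as pHint.
h: CacheHandle; ptr: LONG POINTER;
[h, ptr] ← ReadPage[p, NIL]; -- first access: hash lookup, or a disk read on a miss
UnlockPage[h];
[h, ptr] ← ReadPage[p, h]; -- later access: if h still holds p, the hash lookup is skipped
UnlockPage[h];
A stale hint is harmless; it simply fails the dbPage check and falls through to the hash-table search.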
WritePage: PUBLIC PROC [p: DBPage, pHint: CacheHandle] RETURNS [CacheHandle, LONG POINTER] = {
DBStats.Inc[CacheReadOrWrite];
IF p=noteThisPage THEN ERROR TakeALookAtThis;
IF pHint # NIL AND p = pHint.dbPage THEN GO TO ReturnPage;
hint is wrong, so search hash table
pHint ← Lookup[p];
IF pHint # NIL THEN GOTO ReturnPage;
page is not present in cache, read it in
DBStats.Inc[CacheMiss];
pHint ← GetUnoccupiedPage[];
{ -- We expect ERRORs to be raised from DBFile.ReadFilePage
file: DBCommon.OpenFileHandle; page: CARDINAL;
[pHint.readonly, file, page] ← FilePageFromDBPage[p];
IF pHint.readonly THEN ERROR DB.Error[WriteNotAllowed];
DBFile.ReadFilePage[file, page, pHint.pagePtr]
};
pHint.dbPage ← p; Insert[pHint];
pHint.lockCount ← 0;
GOTO ReturnPage;
EXITS
ReturnPage => {
MakeMRU[pHint];
pHint.lockCount ← pHint.lockCount + 1;
pHint.written ← TRUE;
RETURN[pHint, pHint.pagePtr]
}
};
NewPage: PROC [p: DBPage] RETURNS [CacheHandle, LONG POINTER] = {
segmentID: SegmentID;
fileNode: FileNode;
pHint: CacheHandle ← Lookup[p];
IF pHint # NIL THEN ERROR DBCommon.InternalError; -- [NotaNewPage]
pHint ← GetUnoccupiedPage[];
{
[segmentID, ] ← DecomposeDBPage[p];
fileNode ← FindSegmentID[segmentID];
IF fileNode = NIL OR fileNode.openFile = NIL THEN ERROR DB.Error[TransactionNotOpen];
pHint.readonly ← fileNode.readonly;
IF pHint.readonly THEN ERROR DB.Error[WriteNotAllowed];
};
pHint.dbPage ← p; Insert[pHint];
MakeMRU[pHint];
pHint.lockCount ← 1;
pHint.written ← TRUE;
RETURN[pHint, pHint.pagePtr]
};
WriteLockedPage: PUBLIC PROC [pValidHint: CacheHandle] = {
IF pValidHint = NIL OR pValidHint.lockCount = 0 THEN
ERROR DBCommon.InternalError; -- [AlreadyUnlocked]
IF pValidHint.readonly THEN ERROR DB.Error[WriteNotAllowed];
pValidHint.written ← TRUE;
};
LockCount: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle] RETURNS [CARDINAL] = {
IF pValidHint = NIL OR pValidHint.dbPage # p OR pValidHint.lockCount = 0 THEN
ERROR DBCommon.InternalError; -- BadCacheHandle
RETURN[pValidHint.lockCount];
};
ObtainCoreLoc: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle] RETURNS [LONG POINTER] = {
IF pValidHint = NIL OR pValidHint.dbPage # p OR pValidHint.lockCount = 0 THEN
ERROR DBCommon.InternalError; -- [BadCacheHandle];
RETURN[pValidHint.pagePtr];
};
UnlockPage: PUBLIC PROC [pValidHint: CacheHandle] = {
IF pValidHint = NIL OR pValidHint.lockCount = 0 THEN
ERROR DBCommon.InternalError; -- [AlreadyUnlocked];
pValidHint.lockCount ← pValidHint.lockCount - 1;
};
WriteOutCache: PUBLIC PROC [s: Segment] = {
segmentID: SegmentID = SegmentIDFromSegment[s];
WritePageOutIfUsesTrans: PROC [cacheHandle: CacheHandle] = {
IF cacheHandle.written THEN {
thisSegment: SegmentID = InlineSegmentIDFromDBPage[cacheHandle.dbPage];
IF segmentID = thisSegment THEN WritePageOut[cacheHandle];
};
};
EnumerateHashTable[WritePageOutIfUsesTrans];
};
FlushCache: PUBLIC PROC [s: Segment] = {
segmentID: SegmentID = SegmentIDFromSegment[s];
DeleteIfUsesTrans: PROC [cacheHandle: CacheHandle] = {
IF cacheHandle.occupied THEN {
thisSegment: SegmentID = InlineSegmentIDFromDBPage[cacheHandle.dbPage];
IF segmentID = thisSegment THEN Delete[cacheHandle];
};
};
EnumerateLRUList[DeleteIfUsesTrans]
};
WritePageOut: PROC [cacheHandle: CacheHandle] = {
Send the contents of a dirty cache page to the database segment, and mark the page no longer dirty (leaving it in the cache).
readonly: BOOL; file: DBCommon.OpenFileHandle; page: CARDINAL;
DBStats.Inc[CacheWritePageBack];
[readonly, file, page] ← FilePageFromDBPage[cacheHandle.dbPage];
IF readonly THEN ERROR DB.Error[WriteNotAllowed];
DBFile.WriteFilePage[file, page, cacheHandle.pagePtr];
cacheHandle.written ← FALSE;
};
Buffer manager LRU list
AddCachePages: PROC [nPages: NAT] = TRUSTED {
Creates nPages new cache records, in the LRU position.
curCachePage: LONG POINTER ← VM.AddressForPageNumber[
VM.Allocate[count: nPages*DBCommon.PagesPerDBPage, in64K: TRUE].page];
FOR i: NAT IN [0..nPages) DO
p: CacheHandle ← NEW[CacheRecord ← [
dbPage: invalidKey, pagePtr: curCachePage]];
curCachePage ← curCachePage + DBCommon.WordsPerPage;
p.pred ← lruList.pred; p.succ ← lruList;
lruList.pred.succ ← p; lruList.pred ← p;
ENDLOOP;
};
MakeMRU: PROC [x: CacheHandle] = {
Assumes that x is part of lruList. Deletes x from its current position and makes it first in lruList.
x.pred.succ ← x.succ; x.succ.pred ← x.pred;
x.succ ← lruList.succ; x.pred ← lruList;
lruList.succ.pred ← x; lruList.succ ← x;
};
EnumerateLRUList: PROC [procToCall: PROC[CacheHandle]] = {
Calls procToCall for each record in LRUList, in MRU to LRU order. procToCall may not perform MakeMRU; use EnumerateHashTable if you want this. Note that procToCall will be called for both occupied and unoccupied pages.
FOR p: CacheHandle ← lruList.succ, p.succ UNTIL p = lruList DO
procToCall[p];
ENDLOOP;
};
GetUnoccupiedPage: PROC RETURNS [CacheHandle] = {
Obtain an empty cache page. It will be the least-recently used page that is either unoccupied, or occupied but unlocked. The record is returned with dbPage = invalidKey, pagePtr pointing to a valid cache page, occupied = FALSE, written = FALSE, and still linked into lruList in no particular position.
cacheHandle: CacheHandle;
FOR cacheHandle ← lruList.pred, cacheHandle.pred UNTIL cacheHandle = lruList DO
IF ~cacheHandle.occupied THEN GOTO FoundUnoccupiedPage;
IF cacheHandle.lockCount = 0 THEN GOTO MakePageUnoccupied;
REPEAT
FoundUnoccupiedPage => NULL; -- ~cacheHandle.occupied
MakePageUnoccupied => { -- cacheHandle.occupied AND cacheHandle.lockCount=0
IF cacheHandle.written THEN WritePageOut[cacheHandle ! DB.Failure => CONTINUE];
If the write fails, then there's no harm in releasing the page, since the transaction under which the file was opened can no longer be used
Delete[cacheHandle];
};
FINISHED => ERROR DB.Failure[$allCachePagesLocked, "no empty cache pages"]
ENDLOOP;
cacheHandle.written ← FALSE;
RETURN[cacheHandle];
};
Buffer manager hash table
EnumerateHashTable: PROC[procToCall: PROC[CacheHandle]] = {
Calls procToCall for each record in table. procToCall may not perform Delete; use EnumerateLRUList if you want this.
FOR i: NAT IN [0..ht.size) DO
FOR p: CacheHandle ← ht.buckets[i], p.hashChain UNTIL p = NIL DO
procToCall[p];
ENDLOOP;
ENDLOOP;
};
Lookup: PROC[dbPage: DBPage] RETURNS[CacheHandle] = {
Look the dbPage up in the hash table for the cache. Return the CacheHandle if it is found, else return NIL.
i: CARDINAL ← Hash[dbPage];
p: CacheHandle ← ht.buckets[i];
DBStats.Inc[CacheHTLookup];
WHILE p # NIL DO
IF p.dbPage = dbPage AND p.occupied THEN RETURN[p];
DBStats.Inc[CacheHTConflictInLookup];
p ← p.hashChain;
ENDLOOP;
IF dbPage=noteThisPage THEN ERROR TakeALookAtThis;
RETURN[NIL];
};
Insert: PROC[x: CacheHandle] = {
Add the CacheHandle x to the hash table for the cache. Generate DBCommon.InternalError if a handle with the same dbPage is already there.
i: CARDINAL ← Hash[x.dbPage];
Make sure it's not there already...
p: CacheHandle ← ht.buckets[i];
WHILE p # NIL DO
IF p.dbPage = x.dbPage THEN GOTO FoundDuplicate;
p ← p.hashChain;
REPEAT
FoundDuplicate => ERROR DBCommon.InternalError; -- [AlreadyInHashTable];
FINISHED => {
x.hashChain ← ht.buckets[i];
ht.buckets[i] ← x;
ht.nItems ← ht.nItems + 1;
x.occupied ← TRUE;
IF x.dbPage=noteThisPage THEN ERROR TakeALookAtThis;
};
ENDLOOP;
};
Delete: PROC[x: CacheHandle] = {
Delete the CacheHandle x from the hash table for the cache.
i: CARDINAL ← Hash[x.dbPage];
p: CacheHandle ← ht.buckets[i];
IF p = x THEN {ht.buckets[i] ← x.hashChain; GOTO Deleted};
WHILE p # NIL DO
IF p.hashChain = x THEN {p.hashChain ← x.hashChain; GOTO Deleted};
p ← p.hashChain;
REPEAT
FINISHED => ERROR DBCommon.InternalError; -- [NotPresentInHashTable];
ENDLOOP;
EXITS
Deleted => {
IF x.dbPage=noteThisPage THEN ERROR TakeALookAtThis;
x.dbPage ← invalidKey;
x.occupied ← FALSE;
x.hashChain ← NIL;
ht.nItems ← ht.nItems - 1;
};
};
Buffer manager debugging
AssertionFailure: ERROR = CODE;
Assert: PROC [assertion: BOOL] = {IF ~assertion THEN ERROR AssertionFailure; };
So we can Proceed (in debugger) from failure.
CheckState: PUBLIC PROC [doPrinting, printOnlyLockedPages: BOOL] = {
nOccupiedItems, checkOccupiedItems: CARDINAL ← 0;
nLockedItems: CARDINAL ← 0;
nFreeItems: CARDINAL ← 0;
PageHeader: TYPE = MACHINE DEPENDENT RECORD[
pageTag: CARDINAL [0..256),
dontCare: CARDINAL [0..256)];
IOref: PROC[p: REF ANY] RETURNS[IO.Value] = INLINE {
RETURN[IO.card[LOOPHOLE[p, LONG CARDINAL]]]};
IOptr: PROC[p: LONG POINTER] RETURNS[IO.Value] = INLINE {
RETURN[IO.card[LOOPHOLE[p, LONG CARDINAL]]]};
out: IO.STREAM ← IF doPrinting THEN DBCommon.GetDebugStream[] ELSE NIL;
CheckEntry1: PROC[p: CacheHandle] = {
Assert[p.pred.succ = p];
Assert[p.succ.pred = p];
Assert[p.pagePtr # NIL];
IF ~p.occupied THEN {
nFreeItems ← nFreeItems + 1;
Assert[p.dbPage = invalidKey];
RETURN };
Assert[p.dbPage # invalidKey];
nOccupiedItems ← nOccupiedItems + 1;
IF p.lockCount > 0 THEN nLockedItems ← nLockedItems + 1;
IF p.written THEN Assert[NOT p.readonly];
IF ~doPrinting OR (p.lockCount = 0 AND printOnlyLockedPages) THEN RETURN;
TRUSTED {
pageTag: CARDINAL = LOOPHOLE[p.pagePtr, LONG POINTER TO PageHeader].pageTag;
out.PutF["%11bB %11bB %11bB %3d",
IOref[p], IO.card[p.dbPage], IOptr[p.pagePtr], IO.card[pageTag]];
out.PutF[" %3d %g %g*n",
IO.card[p.lockCount], IO.bool[p.written]];
};
};
CheckEntry2: PROC[p: CacheHandle] = {
Assert[p.occupied];
checkOccupiedItems ← checkOccupiedItems + 1;
};
IF doPrinting THEN {
out.PutF["Cache size %g pages.\n", IO.card[cacheSize]];
out.PutRope[
" cacheHdl dbPage pagePtr tag lockCnt written readonly\n"];
};
EnumerateLRUList[CheckEntry1];
EnumerateHashTable[CheckEntry2];
IF doPrinting THEN
out.PutF["%g pages occupied, %g are locked\n",
IO.card[nOccupiedItems], IO.card[nLockedItems]];
Assert[nOccupiedItems = checkOccupiedItems];
Assert[ht.nItems = checkOccupiedItems];
Assert[nFreeItems + nOccupiedItems = cacheSize];
};
END. -- DBSegmentImpl