Last Edited by:
Donahue, March 10, 1986 8:35:45 am PST
MBrown on February 25, 1983 1:49 pm
Cattell on September 16, 1983 11:13 am
Willie-Sue on April 25, 1985 12:45:53 pm PST
Wert, August 10, 1984 6:08:10 pm PDT
Widom, September 12, 1985 4:54:35 pm PDT
DBSegmentImpl:
CEDAR
PROGRAM
IMPORTS Atom, Basics, ConvertUnsafe, DB, DBCommon, DBFile, DBStats, IO, Rope, VM
EXPORTS DBSegment
= BEGIN
-- Local abbreviations for types that are declared in other interfaces.
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
Trans: TYPE = DBCommon.Transaction;
Segment: TYPE = DBCommon.Segment;
SegmentID: TYPE = DBCommon.SegmentID;
SegmentIndex: TYPE = DBCommon.SegmentIndex;
VersionOptions: TYPE = DBCommon.VersionOptions;
DBPage: TYPE = DBCommon.DBPage;
NullDBPage: DBPage = DBCommon.NullDBPage;
CacheHandle: TYPE = DBCommon.CacheHandle;
CacheRecord: TYPE = DBCommon.CacheRecord;
TID: TYPE = DBStorageTID.TID;
-- Debugging trap: raised by ReadPage and WritePage when the requested page equals noteThisPage.
TakeALookAtThis: ERROR = CODE;
-- One FileNodeRecord describes one attached segment. The records are chained through
-- `rest`; AttachSegment prepends new records to the list held in `files`.
FileNode: TYPE = REF FileNodeRecord;
FileNodeRecord: TYPE = RECORD [
  segmentID: DBCommon.DBPage,  -- key searched by FindSegmentID
  segment: DBCommon.Segment ← NIL,  -- key searched by FindSegment
  fileName: ROPE ← NIL,  -- handed to DBFile.OpenFile by OpenSegment
  openFile: DBCommon.OpenFileHandle ← NIL,
    -- if openFile = NIL, then the segment is not currently open
  readonly: BOOL ← TRUE,
  lock: AlpineEnvironment.LockOption ← [intendWrite, wait],
  version: DBCommon.VersionOptions ← OldFileOnly,
  pagesInitial, pagesPerExtent: NAT ← 0,  -- AttachSegment stores MAX[1, requested value]
  rest: FileNode ← NIL  -- next record in the list, NIL at the end
  ];
-- Hash table over cache records; the single instance `ht` is allocated by InitializeBufferMgr.
HashTable: TYPE = RECORD [
nItems: NAT ← 0, -- number of CacheRecords in table
buckets: SEQUENCE size: NAT OF CacheHandle
];
-- Module state

initialized: BOOL ← FALSE;
  -- Set TRUE by Initialize; a second Initialize raises DB.InternalError.
createIndexesProc: PROC [s: Segment] RETURNS [ARRAY[0..DBCommon.systemIndexCount) OF TID];
  -- Saved by Initialize; called by InitializeSegment to build the system indices.
logReads: BOOL ← FALSE;
  -- When TRUE, ReadPage writes '# for every call and '! for every cache miss
  -- to DBCommon.GetDebugStream[].
noteThisPage: DBPage ← 0;
  -- ReadPage and WritePage raise TakeALookAtThis when asked for this page.
files: FileNode ← NIL;
  -- List mapping SegmentID to interesting information about a file.
cacheSize: NAT;
  -- Number of pages allocated to cache.
ht: REF HashTable;
  -- Fast map from DBPage to CacheHandle. ht.size is a power of two.
hashMask: CARDINAL;
  -- Equals ht.size-1, used in computing hash function.
lruList: CacheHandle;
  -- Points to header node of doubly-linked list of CacheRecords.

-- Data structure invariants: A cache record is present in lruList for every
-- allocated cache page; there is no separate "free list". occupied=FALSE means that
-- the page is free, and that the other cache record fields (readonly, written)
-- are undefined. occupied=TRUE means that the page contains valid data; a page is
-- stored in ht iff occupied=TRUE.
-- The lruList points to a cache record with no associated cache page; this "header node"
-- makes the circular-list manipulations work properly. The most-recently used page is
-- lruList.succ, and the least-recently used page is lruList.pred.
Initialize: PUBLIC PROC [
  nCachePages: NAT,
  useCacheFile: ROPE,
  segmentInitProc: PROC [s: Segment] RETURNS [ARRAY[0..DBCommon.systemIndexCount) OF TID] ] = {
  -- One-time module setup: records the cache size and the index-creation callback,
  -- then builds the buffer manager structures.
  -- useCacheFile is accepted but not used by this procedure.
  -- Raises DB.InternalError if the module has already been initialized.
  IF initialized
    THEN ERROR DB.InternalError
    ELSE {
      cacheSize ← nCachePages;
      createIndexesProc ← segmentInitProc;
      InitializeBufferMgr[];  -- reads cacheSize, so it must follow the assignment above
      initialized ← TRUE;
      };
  };
FindSegmentID: PROC [segmentID: DBCommon.DBPage] RETURNS [found: FileNode] = {
  -- Returns the record in `files` whose segmentID matches, or NIL if there is none.
  -- A linear scan: only a handful of segments are expected to be attached.
  found ← files;
  WHILE found # NIL AND found.segmentID # segmentID DO
    found ← found.rest;
    ENDLOOP;
  };
FindSegment: PROC [s: Segment] RETURNS [found: FileNode] = {
  -- Returns the first record in `files` whose segment atom equals s, or NIL if there is none.
  -- Linear scan; if this ever matters for speed the record could be hung on the
  -- atom's property list instead.
  found ← files;
  WHILE found # NIL AND found.segment # s DO
    found ← found.rest;
    ENDLOOP;
  };
AttachSegment: PUBLIC PROC [
  fileName: ROPE, s: Segment, segmentIndex: SegmentIndex,
  lock: AlpineEnvironment.LockOption, readonly: BOOL, version: VersionOptions,
  nPagesInitial, nPagesPerExtent: NAT] = {
  -- Declares segment s (or redeclares it) under the given segment index.
  -- If a record already exists for the index it must name the same segment atom,
  -- otherwise DB.Error[MismatchedExistingSegment] is raised; its file name, version,
  -- lock, readonly flag and page counts are then overwritten.
  -- Otherwise a new record is pushed onto the front of `files`.
  -- Both page counts are clamped to at least 1.
  -- Raises an anonymous ERROR if Initialize has not been called.
  segmentID: SegmentID = SegmentIDFromSegmentIndex[segmentIndex];
  existing: FileNode = FindSegmentID[segmentID];
  initialPages: NAT = MAX[1, nPagesInitial];
  extentPages: NAT = MAX[1, nPagesPerExtent];
  IF NOT initialized THEN ERROR;
  IF existing = NIL THEN {
    files ← NEW[FileNodeRecord ← [
      segmentID: segmentID,
      segment: s,
      fileName: fileName,
      readonly: readonly,
      lock: lock,
      version: version,
      pagesInitial: initialPages,
      pagesPerExtent: extentPages,
      rest: files
      ]];
    RETURN;
    };
  IF existing.segmentID # segmentID OR existing.segment # s THEN
    ERROR DB.Error[MismatchedExistingSegment];
  existing.fileName ← fileName;
  existing.version ← version;
  existing.lock ← lock;
  existing.readonly ← readonly;
  existing.pagesInitial ← initialPages;
  existing.pagesPerExtent ← extentPages;
  };
OpenSegment: PUBLIC PROC [s: Segment, trans: Trans, eraseAfterOpening: BOOL] = {
  -- Opens the file behind segment s under transaction trans, using the options
  -- recorded by AttachSegment.
  -- A newly created or erased file gets a fresh head page (InitializeSegment);
  -- an existing file has its head page validated (CheckSegment).
  -- When erasing, the transaction is committed with continue: TRUE afterwards.
  -- Raises DB.Error[SegmentNotDeclared] if s was never attached. This matches the
  -- other segment-keyed procedures and replaces a NIL dereference.
  createdFile: BOOL;
  fileNode: FileNode = FindSegment[s];
  IF fileNode = NIL THEN ERROR DB.Error[SegmentNotDeclared];
  [fileNode.openFile, createdFile] ← DBFile.OpenFile[
    t: trans, file: fileNode.fileName, version: fileNode.version,
    discardFileContents: eraseAfterOpening, nPagesInitial: fileNode.pagesInitial,
    lock: fileNode.lock, readOnly: fileNode.readonly];
  IF createdFile OR eraseAfterOpening
    THEN InitializeSegment[fileNode]
    ELSE CheckSegment[fileNode];
  IF eraseAfterOpening THEN DBFile.FinishTransaction[t: trans, abort: FALSE, continue: TRUE];
  };
CloseSegment: PUBLIC PROC [s: Segment] = {
  -- Closes the file behind segment s and marks the segment as not open (openFile = NIL).
  -- Raises DB.Error[SegmentNotDeclared] if s was never attached. This matches the
  -- other segment-keyed procedures and replaces a NIL dereference.
  fileNode: FileNode = FindSegment[s];
  IF fileNode = NIL THEN ERROR DB.Error[SegmentNotDeclared];
  DBFile.Close[fileNode.openFile];
  fileNode.openFile ← NIL;
  };
GetSegmentInfo: PUBLIC PROC [s: Segment] RETURNS [filePath: ROPE, readOnly: BOOL] = {
  -- Reports the file name and readonly flag recorded for segment s.
  -- An undeclared segment yields [NIL, FALSE] rather than an error.
  node: FileNode = FindSegment[s];
  filePath ← NIL;
  readOnly ← FALSE;
  IF node # NIL THEN {
    filePath ← node.fileName;
    readOnly ← node.readonly;
    };
  };
GetVersionStamp: PUBLIC PROC [segment: Segment] RETURNS [schemaVersion: INT] = TRUSTED {
  -- Reads the schema version stamp out of the segment's head page.
  -- Raises DB.Error[SegmentNotDeclared] if the segment was never attached.
  headHandle: CacheHandle;
  head: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  node: FileNode = FindSegment[segment];
  IF node = NIL THEN ERROR DB.Error[SegmentNotDeclared];
  [headHandle, head] ← ReadPage[node.segmentID, NIL];  -- the head page's DBPage is the segmentID
  schemaVersion ← head.schemaVersionStamp;
  UnlockPage[headHandle];  -- balance the lock taken by ReadPage
  RETURN [schemaVersion];
  };
SetVersionStamp: PUBLIC PROC [segment: Segment, to: INT] = TRUSTED {
  -- Stores a new schema version stamp in the segment's head page; called when the
  -- schema of the segment changes.
  -- Raises DB.Error[SegmentNotDeclared] for an unattached segment and
  -- DB.Error[WriteNotAllowed] for a readonly one.
  node: FileNode = FindSegment[segment];
  headHandle: CacheHandle;
  head: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  IF node = NIL THEN ERROR DB.Error[SegmentNotDeclared];
  IF node.readonly THEN ERROR DB.Error[WriteNotAllowed];
  [headHandle, head] ← WritePage[node.segmentID, NIL];  -- marks the head page dirty
  head.schemaVersionStamp ← to;
  UnlockPage[headHandle];
  };
SegmentFromTID: PUBLIC PROC [tid: TID] RETURNS [s: Segment] = {
  -- Maps a TID to the segment atom attached under the TID's segment ID,
  -- or NIL when no such segment has been attached.
  node: FileNode = FindSegmentID[InlineSegmentIDFromDBPage[tid]];
  RETURN [IF node = NIL THEN NIL ELSE node.segment];
  };
IsValidTID: PUBLIC PROC [tid: TID] RETURNS [valid: BOOL] = {
  -- TRUE iff the TID's segment has been attached and its file is currently open.
  node: FileNode = FindSegmentID[InlineSegmentIDFromDBPage[tid]];
  valid ← FALSE;
  IF node # NIL THEN valid ← (node.openFile # NIL);
  };
SegmentIDFromSegment: PUBLIC PROC [s: Segment] RETURNS [SegmentID] = {
  -- Returns the SegmentID under which s was attached.
  -- Raises DB.Error[SegmentNotDeclared] if s was never attached.
  node: FileNode = FindSegment[s];
  IF node # NIL THEN RETURN [node.segmentID];
  ERROR DB.Error[SegmentNotDeclared];
  };
EnumerateSegments: PUBLIC PROC [
  enumProc: PROC [s: Segment, segmentIndex: SegmentIndex] RETURNS [stop: BOOL]] = {
  -- Calls enumProc once per attached segment, in `files` list order
  -- (most recently attached first), stopping early when enumProc returns stop = TRUE.
  node: FileNode ← files;
  WHILE node # NIL DO
    IF enumProc[node.segment, SegmentIndexFromSegmentID[node.segmentID]].stop THEN RETURN;
    node ← node.rest;  -- advance only after the callback, as the FOR-loop form would
    ENDLOOP;
  };
FilePageFromDBPage: PROC [dbPage: DBPage]
  RETURNS [readonly: BOOL, file: DBCommon.OpenFileHandle, page: CARDINAL] = {
  -- Translates a DBPage into the open file handle and page number within that file,
  -- together with the owning segment's readonly flag.
  -- Raises DB.Error[TransactionNotOpen] if the segment is not attached or not open.
  segmentID: SegmentID;
  node: FileNode;
  [segmentID, page] ← DecomposeDBPage[dbPage];
  node ← FindSegmentID[segmentID];
  IF node = NIL THEN ERROR DB.Error[TransactionNotOpen];
  file ← node.openFile;
  IF file = NIL THEN ERROR DB.Error[TransactionNotOpen];
  readonly ← node.readonly;
  };
-- Procs that understand the format of the segment head page.
InitializeSegment: PROC [fileNode: FileNode] = TRUSTED {
  -- Formats page 0 of a new or erased segment file as the segment head page:
  -- records the segment ID, the block allocator state, the software compatibility
  -- version, schema version stamp 1, the segment name, and the system indices
  -- produced by createIndexesProc.
  -- The allocator block initially covers file pages [1 .. fileSize). If the file holds
  -- only the head page the block is recorded as empty (NullDBPage, 0), so AllocPage
  -- extends the file instead of handing out a page that does not exist and
  -- underflowing nPagesInBlock.
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  fileSize: CARDINAL = DBFile.GetSize[fileNode.openFile];
  hasDataPages: BOOL = fileSize > 1;
  -- WritePage refuses to fetch pages of a readonly segment, so the flag is cleared first.
  fileNode.readonly ← FALSE;
  [segmentHeadHandle, segmentHead] ← WritePage[fileNode.segmentID, NIL];
  segmentHead^ ← [
    segmentID: fileNode.segmentID,
    allocator: [
      firstPageInBlock:
        IF hasDataPages THEN ConsDBPage[fileNode.segmentID, 1] ELSE NullDBPage,
      nPagesInBlock: IF hasDataPages THEN fileSize-1 ELSE 0,
      pagesPerExtent: fileNode.pagesPerExtent],
    softwareCompatibilityVersion: DBCommon.softwareCompatibilityVersion,
    schemaVersionStamp: 1];
  ConvertUnsafe.AppendRope[
    to: LOOPHOLE[@(segmentHead.name)], from: Atom.GetPName[fileNode.segment]];
  segmentHead.systemIndices ← createIndexesProc[fileNode.segment];
  UnlockPage[segmentHeadHandle];
  };
CheckSegment: PROC [fileNode: FileNode] = TRUSTED {
  -- Validates the head page of an existing segment file. The page must carry the
  -- AMap tag and the expected seal, the current software compatibility version, and
  -- a name equal (ignoring case) to the segment atom's print name.
  -- Raises DB.Error[MismatchedExistingSegment] if any check fails.
  -- The head page is only read here, and it is unlocked before the error is raised
  -- so that a failed check does not leave the page locked in the cache.
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  ok: BOOL;
  [segmentHeadHandle, segmentHead] ← ReadPage[fileNode.segmentID, NIL];
  ok ← segmentHead.tag = DBStoragePagetags.AMap
    AND segmentHead.seal = DBSegmentPrivate.Seal
    AND segmentHead.softwareCompatibilityVersion = DBCommon.softwareCompatibilityVersion;
  -- Compare names only on a page that already looks like a valid head page.
  IF ok THEN
    ok ← Rope.Equal[
      s1: Atom.GetPName[fileNode.segment],
      s2: ConvertUnsafe.ToRope[LOOPHOLE[@(segmentHead.name)]],
      case: FALSE];
  UnlockPage[segmentHeadHandle];
  IF NOT ok THEN ERROR DB.Error[MismatchedExistingSegment];
  };
RootIndicesFromSegment: PUBLIC PROC [s: Segment]
  RETURNS [indices: ARRAY[0..DBCommon.systemIndexCount) OF TID] = TRUSTED {
  -- Returns the system index TIDs stored in the head page of segment s.
  -- Raises DB.Error[SegmentNotDeclared] if s was never attached, and
  -- DB.Error[MismatchedExistingSegment] if the head page lacks the AMap tag.
  -- The head page is unlocked before the mismatch error is raised, so the error
  -- path does not leave it locked in the cache.
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  tagOK: BOOL;
  fileNode: FileNode = FindSegment[s];
  IF fileNode = NIL THEN ERROR DB.Error[SegmentNotDeclared];
  [segmentHeadHandle, segmentHead] ← ReadPage[fileNode.segmentID, NIL];
  tagOK ← segmentHead.tag = DBStoragePagetags.AMap;
  IF tagOK THEN indices ← segmentHead.systemIndices;
  UnlockPage[segmentHeadHandle];
  IF NOT tagOK THEN ERROR DB.Error[MismatchedExistingSegment];
  RETURN [indices];
  };
AllocPage: PUBLIC PROC [s: SegmentID]
  RETURNS [p: DBPage, pValidHint: CacheHandle, cachePage: LONG POINTER] = TRUSTED {
  -- Allocates one page in segment s and returns it locked in the cache with its
  -- page tag set to Unused.
  -- The page comes from the segment's free list if that is non-empty, otherwise from
  -- the block allocator; when the block is used up the file is first grown by
  -- allocator.pagesPerExtent pages.
  -- Raises DB.Error[MismatchedExistingSegment] if the head page lacks the AMap tag
  -- (the head page is unlocked first), and DB.InternalError if a free-list page is
  -- not tagged Free.
  segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  segmentHeadHandle: CacheHandle;
  DBStats.Inc[SegmentAllocPage];
  [segmentHeadHandle, segmentHead] ← WritePage[s, NIL];
  IF segmentHead.tag # DBStoragePagetags.AMap THEN {
    UnlockPage[segmentHeadHandle];  -- do not leave the head page locked on the error path
    ERROR DB.Error[MismatchedExistingSegment];
    };
  IF (p ← segmentHead.allocator.freeList) # NullDBPage
    THEN {
      -- The segment's free list is nonempty, so return a page from it.
      allocatedFreeListPagePtr: LONG POINTER TO DBSegmentPrivate.FreeListPage;
      [pValidHint, allocatedFreeListPagePtr] ← WritePage[p, NIL];
      IF allocatedFreeListPagePtr.tag # DBStoragePagetags.Free THEN ERROR DB.InternalError;
      segmentHead.allocator.freeList ← allocatedFreeListPagePtr.next;
      cachePage ← allocatedFreeListPagePtr;
      }
    ELSE {
      -- The segment's free list is empty, so allocate from block allocator, perhaps extending file.
      -- A block with no pages left counts as exhausted even if firstPageInBlock is not
      -- null; otherwise the decrement below would underflow and a page outside the
      -- block would be handed out.
      IF segmentHead.allocator.firstPageInBlock = NullDBPage
        OR segmentHead.allocator.nPagesInBlock = 0
        THEN {
          fileNode: FileNode ← FindSegmentID[s];
          size: CARDINAL ← DBFile.GetSize[fileNode.openFile];
          DBFile.SetSize[fileNode.openFile, size + segmentHead.allocator.pagesPerExtent];
          segmentHead.allocator.firstPageInBlock ← ConsDBPage[s, size];
          segmentHead.allocator.nPagesInBlock ← segmentHead.allocator.pagesPerExtent;
          };
      p ← segmentHead.allocator.firstPageInBlock;
      IF (segmentHead.allocator.nPagesInBlock ← segmentHead.allocator.nPagesInBlock-1) = 0
        THEN segmentHead.allocator.firstPageInBlock ← NullDBPage
        ELSE segmentHead.allocator.firstPageInBlock ← p + DBStorageTID.DBPageIncrement;
      [pValidHint, cachePage] ← NewPage[p];
      };
  UnlockPage[segmentHeadHandle];
  LOOPHOLE[cachePage, LONG POINTER TO DBStoragePagetags.PageHeader].pageTag ←
    DBStoragePagetags.Unused;
  RETURN [p, pValidHint, cachePage];
  };--AllocPage
FreePage: PUBLIC PROC [s: SegmentID, freePg: DBPage, freeHint: CacheHandle] = TRUSTED {
  -- Returns page freePg to segment s by pushing it onto the front of the segment's
  -- free list. The caller must hold exactly one lock on the page (via freeHint);
  -- that lock is released here.
  -- Raises DB.InternalError if freePg is not in segment s, the head page lacks the
  -- AMap tag, the lock count is not 1, or the page is already tagged Free or AMap.
  head: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
  headHandle: CacheHandle;
  victim: LONG POINTER TO DBSegmentPrivate.FreeListPage;
  DBStats.Inc[SegmentFreePage];
  IF InlineSegmentIDFromDBPage[freePg] # s THEN ERROR DB.InternalError;
  [headHandle, head] ← WritePage[s, NIL];
  -- The tag is tested first; LockCount is consulted only when the tag is good.
  IF head.tag # DBStoragePagetags.AMap OR LockCount[freePg, freeHint] # 1 THEN
    ERROR DB.InternalError;
  victim ← ObtainCoreLoc[freePg, freeHint];
  IF NOT (victim.tag # DBStoragePagetags.Free AND victim.tag # DBStoragePagetags.AMap) THEN
    ERROR DB.InternalError;
  WriteLockedPage[freeHint];  -- mark the page dirty before overwriting it
  victim^ ← [next: head.allocator.freeList];
  head.allocator.freeList ← freePg;
  UnlockPage[freeHint];
  UnlockPage[headHandle];
  };--FreePage
-- Buffer manager

invalidKey: DBPage = 37777777777B;
  -- Stored in the dbPage field of cache records when they are created.
  -- Doesn't match any true DBPage (since all DBPages have low bits 0).
InitializeBufferMgr: PROC [] = {
  -- Builds the cache data structures: the hash table, the LRU list header node,
  -- and the cache pages themselves.
  -- Number of hash buckets = smallest power of 2 not less than cacheSize (at least 1).
  -- The previous shift loop produced the largest power of 2 not greater than
  -- cacheSize, i.e. too few buckets whenever cacheSize is not a power of 2.
  -- Doubling stops once another doubling could exceed LAST[NAT], the bound of the
  -- HashTable sequence.
  hashBuckets: CARDINAL ← 1;
  UNTIL hashBuckets >= cacheSize OR hashBuckets > LAST[NAT]/2 DO
    hashBuckets ← Basics.BITSHIFT[hashBuckets, 1];
    ENDLOOP;
  ht ← NEW[HashTable[hashBuckets] ← [nItems: 0, buckets: ]];
  hashMask ← hashBuckets-1;
  -- Create cacheRecords, format them into an lru list.
  -- The header node has no cache page and initially points at itself.
  lruList ← NEW[CacheRecord ← [dbPage: invalidKey, pagePtr: NIL]];
  lruList.pred ← lruList.succ ← lruList;
  -- Pages are added 16 at a time, so the cache holds cacheSize rounded up to a multiple of 16.
  FOR i: NAT IN [0 .. (cacheSize+15)/16) DO
    AddCachePages[16];
    ENDLOOP;
  };
ReadPage: PUBLIC PROC [p: DBPage, pHint: CacheHandle] RETURNS [CacheHandle, LONG POINTER] = {
  -- Returns page p locked in the cache: its cache handle and the address of its contents.
  -- pHint is tried first; if it is NIL or refers to another page the hash table is
  -- searched, and on a miss the page is read from its file into a reclaimed cache page.
  -- The page becomes most-recently-used and its lock count is incremented.
  -- Raises TakeALookAtThis when p = noteThisPage. Errors from FilePageFromDBPage and
  -- DBFile.ReadFilePage propagate; the reclaimed cache page has not been inserted
  -- into the hash table at that point.
  IF logReads THEN DBCommon.GetDebugStream[].PutChar['#];
  DBStats.Inc[CacheReadOrWrite];
  IF p = noteThisPage THEN ERROR TakeALookAtThis;
  IF pHint = NIL OR pHint.dbPage # p THEN {
    -- hint is missing or wrong, so search hash table
    pHint ← Lookup[p];
    IF pHint = NIL THEN {
      -- page is not present in cache, read it in
      file: DBCommon.OpenFileHandle;
      page: CARDINAL;
      IF logReads THEN DBCommon.GetDebugStream[].PutChar['!];
      DBStats.Inc[CacheMiss];
      pHint ← GetUnoccupiedPage[];
      [pHint.readonly, file, page] ← FilePageFromDBPage[p];
      DBFile.ReadFilePage[file, page, pHint.pagePtr];
      pHint.dbPage ← p;
      Insert[pHint];
      pHint.lockCount ← 0;
      };
    };
  MakeMRU[pHint];
  pHint.lockCount ← pHint.lockCount + 1;
  RETURN [pHint, pHint.pagePtr];
  };
WritePage: PUBLIC PROC [p: DBPage, pHint: CacheHandle] RETURNS [CacheHandle, LONG POINTER] = {
  -- Like ReadPage, but the returned page is also marked written (dirty).
  -- On a cache miss the page is read from its file first; if the owning segment is
  -- readonly, DB.Error[WriteNotAllowed] is raised before anything is read.
  -- Raises TakeALookAtThis when p = noteThisPage.
  -- NOTE(review): when the page is already cached (hint or hash-table hit) the
  -- readonly flag is not consulted here — confirm that callers check it themselves.
  DBStats.Inc[CacheReadOrWrite];
  IF p = noteThisPage THEN ERROR TakeALookAtThis;
  IF pHint = NIL OR pHint.dbPage # p THEN {
    -- hint is missing or wrong, so search hash table
    pHint ← Lookup[p];
    IF pHint = NIL THEN {
      -- page is not present in cache, read it in
      file: DBCommon.OpenFileHandle;
      page: CARDINAL;
      DBStats.Inc[CacheMiss];
      pHint ← GetUnoccupiedPage[];
      [pHint.readonly, file, page] ← FilePageFromDBPage[p];
      IF pHint.readonly THEN ERROR DB.Error[WriteNotAllowed];
      DBFile.ReadFilePage[file, page, pHint.pagePtr];
      pHint.dbPage ← p;
      Insert[pHint];
      pHint.lockCount ← 0;
      };
    };
  MakeMRU[pHint];
  pHint.lockCount ← pHint.lockCount + 1;
  pHint.written ← TRUE;
  RETURN [pHint, pHint.pagePtr];
  };
NewPage: PROC [p: DBPage] RETURNS [CacheHandle, LONG POINTER] = {
  -- Enters page p into the cache without reading it from the file; used for
  -- pages that are being allocated. The page is returned locked once, dirty, and
  -- most-recently-used.
  -- Raises DB.InternalError if p is already cached, DB.Error[TransactionNotOpen] if
  -- its segment is not attached or not open, and DB.Error[WriteNotAllowed] if the
  -- segment is readonly.
  segmentID: SegmentID;
  owner: FileNode;
  fresh: CacheHandle;
  IF Lookup[p] # NIL THEN ERROR DB.InternalError; -- [NotaNewPage]
  fresh ← GetUnoccupiedPage[];
  [segmentID, ] ← DecomposeDBPage[p];
  owner ← FindSegmentID[segmentID];
  IF owner = NIL OR owner.openFile = NIL THEN ERROR DB.Error[TransactionNotOpen];
  fresh.readonly ← owner.readonly;
  IF fresh.readonly THEN ERROR DB.Error[WriteNotAllowed];
  fresh.dbPage ← p;
  Insert[fresh];
  MakeMRU[fresh];
  fresh.lockCount ← 1;
  fresh.written ← TRUE;
  RETURN [fresh, fresh.pagePtr];
  };
WriteLockedPage: PUBLIC PROC [pValidHint: CacheHandle] = {
  -- Marks an already-locked cache page as written (dirty).
  -- Raises DB.InternalError if the handle is NIL or the page is not locked, and
  -- DB.Error[WriteNotAllowed] if the page is flagged readonly.
  IF pValidHint # NIL AND pValidHint.lockCount # 0
    THEN {
      IF pValidHint.readonly THEN ERROR DB.Error[WriteNotAllowed];
      pValidHint.written ← TRUE;
      }
    ELSE ERROR DB.InternalError; -- [AlreadyUnlocked]
  };
LockCount: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle] RETURNS [CARDINAL] = {
  -- Returns the number of locks currently held on page p.
  -- The handle must be non-NIL, refer to page p, and be locked at least once;
  -- otherwise DB.InternalError is raised, so the result is never 0.
  handleOK: BOOL =
    pValidHint # NIL AND pValidHint.dbPage = p AND pValidHint.lockCount # 0;
  IF NOT handleOK THEN ERROR DB.InternalError; -- BadCacheHandle
  RETURN [pValidHint.lockCount];
  };
ObtainCoreLoc: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle] RETURNS [LONG POINTER] = {
  -- Returns the memory address of the cached contents of page p.
  -- Raises DB.InternalError unless the handle is non-NIL, refers to page p, and is locked.
  IF pValidHint = NIL THEN ERROR DB.InternalError; -- [BadCacheHandle]
  IF pValidHint.dbPage # p THEN ERROR DB.InternalError; -- [BadCacheHandle]
  IF pValidHint.lockCount = 0 THEN ERROR DB.InternalError; -- [BadCacheHandle]
  RETURN [pValidHint.pagePtr];
  };
UnlockPage: PUBLIC PROC [pValidHint: CacheHandle] = {
  -- Releases one lock on a cache page.
  -- Raises DB.InternalError if the handle is NIL or the page is not locked.
  IF pValidHint # NIL AND pValidHint.lockCount > 0
    THEN pValidHint.lockCount ← pValidHint.lockCount - 1
    ELSE ERROR DB.InternalError; -- [AlreadyUnlocked]
  };
WriteOutCache: PUBLIC PROC [s: Segment] = {
  -- Writes every dirty cached page belonging to segment s back to its file.
  -- The pages stay in the cache. Raises DB.Error[SegmentNotDeclared]
  -- (from SegmentIDFromSegment) if s was never attached.
  target: SegmentID = SegmentIDFromSegment[s];
  WriteIfDirtyAndInSegment: PROC [cacheHandle: CacheHandle] = {
    IF NOT cacheHandle.written THEN RETURN;
    IF InlineSegmentIDFromDBPage[cacheHandle.dbPage] = target THEN
      WritePageOut[cacheHandle];
    };
  EnumerateHashTable[WriteIfDirtyAndInSegment];
  };
FlushCache: PUBLIC PROC [s: Segment] = {
  -- Calls Delete on every occupied cache page that belongs to segment s.
  -- Nothing is written back here. Raises DB.Error[SegmentNotDeclared]
  -- (from SegmentIDFromSegment) if s was never attached.
  target: SegmentID = SegmentIDFromSegment[s];
  DropIfInSegment: PROC [cacheHandle: CacheHandle] = {
    IF NOT cacheHandle.occupied THEN RETURN;
    IF InlineSegmentIDFromDBPage[cacheHandle.dbPage] = target THEN Delete[cacheHandle];
    };
  EnumerateLRUList[DropIfInSegment];
  };
WritePageOut: PROC [cacheHandle: CacheHandle] = {
  -- Copies a dirty cache page to its place in the segment file and clears the
  -- page's written flag; the page itself remains in the cache.
  -- Raises DB.Error[WriteNotAllowed] if the owning segment is readonly, and whatever
  -- FilePageFromDBPage raises if the segment is not open.
  segmentIsReadonly: BOOL;
  target: DBCommon.OpenFileHandle;
  filePage: CARDINAL;
  DBStats.Inc[CacheWritePageBack];
  [segmentIsReadonly, target, filePage] ← FilePageFromDBPage[cacheHandle.dbPage];
  IF segmentIsReadonly THEN ERROR DB.Error[WriteNotAllowed];
  DBFile.WriteFilePage[target, filePage, cacheHandle.pagePtr];
  cacheHandle.written ← FALSE;
  };
-- Buffer manager LRU list
AddCachePages: PROC [nPages: NAT] = TRUSTED {
  -- Allocates virtual memory for nPages cache pages and creates one cache record
  -- per page, each appended at the least-recently-used end of lruList
  -- (just before the header node).
  -- New records carry dbPage = invalidKey.
  nextFrame: LONG POINTER ← VM.AddressForPageNumber[
    VM.Allocate[count: nPages*DBCommon.PagesPerDBPage, in64K: TRUE].page];
  THROUGH [0..nPages) DO
    rec: CacheHandle = NEW[CacheRecord ← [dbPage: invalidKey, pagePtr: nextFrame]];
    tail: CacheHandle = lruList.pred;
    rec.pred ← tail;
    rec.succ ← lruList;
    tail.succ ← rec;
    lruList.pred ← rec;
    -- assumes DBCommon.WordsPerPage is the size in words of one DB page — TODO confirm
    nextFrame ← nextFrame + DBCommon.WordsPerPage;
    ENDLOOP;
  };
MakeMRU: PROC [x: CacheHandle] = {
  -- Moves x to the most-recently-used position (directly after the header node).
  -- x must already be linked into lruList.
  before: CacheHandle = x.pred;
  after: CacheHandle = x.succ;
  oldFirst: CacheHandle;
  -- Unlink x from its current position.
  before.succ ← after;
  after.pred ← before;
  -- Read the head's successor only after unlinking: if x was already first,
  -- the successor is now x's former neighbour rather than x itself.
  oldFirst ← lruList.succ;
  x.pred ← lruList;
  x.succ ← oldFirst;
  oldFirst.pred ← x;
  lruList.succ ← x;
  };
EnumerateLRUList: PROC [procToCall: PROC[CacheHandle]] = {
  -- Applies procToCall to every cache record on lruList, from most- to
  -- least-recently used, skipping the header node.
  -- Both occupied and unoccupied records are visited.
  -- procToCall must not call MakeMRU, because the traversal follows succ links
  -- after each call; use EnumerateHashTable when reordering is needed.
  cursor: CacheHandle ← lruList.succ;
  WHILE cursor # lruList DO
    procToCall[cursor];
    cursor ← cursor.succ;
    ENDLOOP;
  };
GetUnoccupiedPage: PROC RETURNS [CacheHandle] = {
  -- Finds a cache page that can be reused, scanning from the least-recently-used end.
  -- The first record that is either unoccupied, or occupied but unlocked, is chosen.
  -- An occupied victim is written back if dirty and then removed with Delete.
  -- The record is returned with written = FALSE and its list position unchanged;
  -- the caller is expected to fill in the page and insert it.
  -- Raises DB.Failure[$allCachePagesLocked, ...] if every page is occupied and locked.
  victim: CacheHandle ← lruList.pred;
  UNTIL victim = lruList DO
    IF NOT victim.occupied THEN EXIT;
    IF victim.lockCount = 0 THEN {
      IF victim.written THEN WritePageOut[victim];
      Delete[victim];
      EXIT;
      };
    victim ← victim.pred;
    ENDLOOP;
  -- Reaching the header node means the scan found nothing reusable.
  IF victim = lruList THEN ERROR DB.Failure[$allCachePagesLocked, "no empty cache pages"];
  victim.written ← FALSE;
  RETURN [victim];
  };