-- DBSegmentImpl.mesa
-- Last Edited by: MBrown on February 25, 1983 1:49 pm
-- Last Edited by: Cattell on September 16, 1983 11:13 am
-- Last Edited by: Willie-Sue on February 3, 1983 10:47 am
-- Last Edited by: Wert, August 10, 1984 6:08:10 pm PDT
DIRECTORY
Atom,
Basics,
ConvertUnsafe,
DBCache,
DBCommon,
DBEnvironment,
DBFile,
DBSegment,
DBSegmentExtras,
DBSegmentPrivate,
DBStats,
DBStoragePagetags,
DBStorageTID USING[DBPageIncrement, TID],
File,
IO,
Rope USING [ROPE, Equal],
VM USING [Allocate, AddressForPageNumber];
DBSegmentImpl: PROGRAM
IMPORTS
Atom,
Basics,
ConvertUnsafe,
DBCommon,
DBEnvironment,
DBFile,
DBSegment,
DBStats,
IO,
Rope,
VM
EXPORTS
DBSegment, DBSegmentExtras
SHARES
DBCache
= BEGIN
-- Local abbreviations for imported types.
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
Trans: TYPE = DBCommon.Trans;
Segment: TYPE = DBCommon.Segment;
SegmentID: TYPE = DBCommon.SegmentID;
SegmentIndex: TYPE = DBCommon.SegmentIndex;
VersionOptions: TYPE = DBCommon.VersionOptions;
DBPage: TYPE = DBCommon.DBPage;
NullDBPage: DBPage = DBCommon.NullDBPage;
CacheHandle: TYPE = DBCache.CacheHandle;
CacheRecord: TYPE = DBCache.CacheRecord;
TID: TYPE = DBStorageTID.TID;
-- Debugging signal: raised when the page named by noteThisPage is looked up,
-- inserted, or deleted (see Lookup, Insert, Delete).
TakeALookAtThis: SIGNAL = CODE;
FileNode: TYPE = REF FileNodeRecord;
-- One node per declared segment, linked through rest into the list "files".
FileNodeRecord: TYPE = RECORD [
segmentID: DBCommon.DBPage, -- segment's identifying DBPage (page 0 of the segment)
segment: DBCommon.Segment ← NIL, -- the segment atom supplied to AttachSegment
trans: DBCommon.Trans ← NIL, -- open transaction, or NIL when none
openFile: DBCommon.OpenFileHandle ← NIL, -- valid only while a transaction is open
readonly: BOOL ← TRUE,
version: DBCommon.VersionOptions ← OldFileOnly,
initializeExistingFile: BOOL ← FALSE, -- TRUE forces (re)formatting at next open
fileName: ROPE ← NIL,
pagesInitial, pagesPerExtent: NAT ← 0, -- initial file size and growth increment, in pages
versionNumber: INT ← -1, -- file version seen at last open; -1 before first open
fileChanged: BOOL ← FALSE, -- Indicates was due to interference
aborted: BOOL ← FALSE, -- Indicates transaction abort
rest: FileNode ← NIL
];
HashTable: TYPE = RECORD [
nItems: NAT ← 0, -- number of CacheRecords in table
buckets: SEQUENCE size: NAT OF CacheHandle -- chained through CacheRecord.hashChain
];
-- Module state
initialized: BOOL ← FALSE;
createIndexesProc: PROC [s: Segment] RETURNS [TID, TID];
-- Supplied via Initialize; creates the two root indices of a newly formatted segment.
logReads: BOOL← FALSE; -- causes reads & writes to be logged on UserExec
noteThisPage: DBPage← 0; -- causes signal when this page is read or written
files: FileNode← NIL;
-- List mapping SegmentID to interesting information about a file.
cacheSize: NAT;
-- Number of pages allocated to cache.
ht: REF HashTable;
-- Fast map from DBPage to CacheHandle. ht.size is a power of two.
hashMask: CARDINAL;
-- Equals ht.size-1, used in computing hash function.
lruList: CacheHandle;
-- Points to header node of doubly-linked list of CacheRecords.
-- Data structure invariants: A cache record is present in lruList for every
-- allocated cache page; there is no separate "free list". occupied=FALSE means that
-- the page is free, and that the other cache record fields (readonly, written)
-- are undefined. occupied=TRUE means that the page contains valid data; a page is
-- stored in ht iff occupied=TRUE.
-- The lruList points to a cache record with no associated cache page; this "header node"
-- makes the circular-list manipulations work properly. The most-recently used page is
-- lruList.succ, and the least-recently used page is lruList.pred.
Initialize: PUBLIC PROC [
nCachePages: NAT,
useCacheFile: ROPE,
segmentInitProc: PROC [s: Segment] RETURNS [TID, TID]] = {
-- Module initialization: records the cache size and the proc that creates a new
-- segment's root indices, then builds the buffer manager. Callable only once.
-- NOTE(review): useCacheFile is not referenced in this body — apparently vestigial.
IF initialized THEN ERROR DBEnvironment.InternalError;
cacheSize ← nCachePages;
createIndexesProc ← segmentInitProc;
InitializeBufferMgr[];
initialized ← TRUE;
};
FindSegmentID: PROC[segmentID: DBCommon.DBPage] RETURNS[found: FileNode] = {
-- Returns the FileNode with the given segmentID, or NIL if none is declared.
-- Just a linear search, we're not going to have a tremendous number of segments open.
FOR found← files, found.rest UNTIL found=NIL DO
IF found.segmentID=segmentID THEN RETURN ENDLOOP};
FindSegment: PROC [s: Segment] RETURNS [found: FileNode] = {
-- Returns the FileNode for segment atom s, or NIL if s is not declared.
-- Same as above, but looks for atom match instead of segment ID match.
-- If efficiency of this ever becomes important, we can attach info to prop list of atom.
FOR found← files, found.rest UNTIL found=NIL DO
IF found.segment=s THEN RETURN ENDLOOP};
AttachSegment: PUBLIC PROC [
fileName: ROPE, s: Segment, segmentIndex: SegmentIndex,
readonly: BOOL, version: VersionOptions, initializeExistingFile: BOOL,
nBytesInitial, nBytesPerExtent: INT] = {
-- Declares (or re-declares) segment s, binding it to fileName and to the SegmentID
-- derived from segmentIndex. Byte counts are converted to whole pages, at least 1 each.
-- Re-declaration is refused while a transaction is open on the segment.
fileNode: FileNode;
segmentID: SegmentID = SegmentIDFromSegmentIndex[segmentIndex];
IF NOT initialized THEN ERROR;
fileNode ← FindSegmentID[segmentID];
IF fileNode # NIL THEN {
-- Segment index already declared: update the existing node in place.
IF fileNode.trans # NIL THEN -- Client declared segment when transaction open on it
{SIGNAL DBEnvironment.Error[TransactionAlreadyOpen]; RETURN};
IF fileNode.segmentID # segmentID OR fileNode.segment # s THEN
ERROR DBEnvironment.Error[MismatchedExistingSegment];
fileNode.initializeExistingFile ← initializeExistingFile;
fileNode.fileName← fileName;
fileNode.version← version;
fileNode.readonly← readonly;
fileNode.pagesInitial← MAX[1, DBFile.PagesFromBytes[nBytesInitial]];
fileNode.pagesPerExtent← MAX[1, DBFile.PagesFromBytes[nBytesPerExtent]];
RETURN;
};
-- New segment: push a fresh node onto the front of the files list.
files ← NEW[FileNodeRecord← [
segmentID: segmentID,
segment: s,
readonly: readonly,
version: version,
initializeExistingFile: initializeExistingFile,
fileName: fileName,
pagesInitial: MAX[1, DBFile.PagesFromBytes[nBytesInitial]],
pagesPerExtent: MAX[1, DBFile.PagesFromBytes[nBytesPerExtent]],
rest: files
]];
};
OpenTransaction: PUBLIC PROC [s: Segment, useTrans: Trans, noLog: BOOL]
RETURNS [fileChanged: BOOL] = {
-- Opens a transaction on segment s; when useTrans is NIL a new transaction is
-- created on the segment file's server. Returns OpenSegment's fileChanged result.
-- NOTE(review): on the two SIGNAL-and-RETURN paths below, fileChanged is returned
-- uninitialized — confirm callers proceed only after catching the signal.
fileNode: FileNode = FindSegment[s];
IF fileNode=NIL THEN
{SIGNAL DBEnvironment.Error[SegmentNotDeclared]; RETURN};
IF fileNode.trans # NIL THEN
{SIGNAL DBEnvironment.Error[TransactionAlreadyOpen]; RETURN};
IF useTrans = NIL THEN {
server: ROPE = DBFile.FileServerFromFileName[fileNode.fileName];
useTrans ← DBFile.CreateTransaction[server];
};
fileNode.trans ← useTrans;
RETURN[OpenSegment[fileNode, noLog]];
};
OpenSegment: PROC [fileNode: FileNode, noLog: BOOL] RETURNS [fileChanged: BOOL] = {
-- Opens (or creates) the segment's file under fileNode.trans, then either formats a
-- fresh segment (InitializeSegment) or validates an existing one (CheckSegment).
-- fileChanged is TRUE when the file version differs from the cached one, when the
-- previous transaction aborted, or when the file was just (re)initialized.
createdFile: BOOL; newVersion: INT;
IF fileNode.trans = NIL THEN ERROR;
[fileNode.openFile, createdFile, newVersion] ← DBFile.OpenFile[
t: fileNode.trans, file: fileNode.fileName,
version: fileNode.version, discardFileContents: fileNode.initializeExistingFile,
nPagesInitial: fileNode.pagesInitial, readOnly: fileNode.readonly, noLog: noLog];
fileNode.fileChanged ← (newVersion # fileNode.versionNumber);
fileNode.versionNumber ← newVersion;
fileChanged ← fileNode.fileChanged OR fileNode.aborted;
fileNode.aborted ← FALSE; -- the abort has now been reported once
IF createdFile OR fileNode.initializeExistingFile THEN
{ InitializeSegment[fileNode];
fileNode.initializeExistingFile ← FALSE;
fileChanged ← TRUE; } -- force trashing of cached stuff (for DB.EraseSegment)
ELSE CheckSegment[fileNode];
};
GetSegmentInfo: PUBLIC PROC [s: Segment] RETURNS [
filePath: ROPE, number: NAT, trans: Trans,
readOnly: BOOL, nBytesInitial, nBytesPerExtent: INT] = {
-- Reports the declaration parameters of segment s (page counts converted back to
-- bytes). Returns all-null results if s has never been declared.
fileNode: FileNode← FindSegment[s];
IF fileNode=NIL THEN RETURN[NIL, 0, NIL, FALSE, 0, 0]
ELSE RETURN[
fileNode.fileName,
SegmentIndexFromSegmentID[fileNode.segmentID],
fileNode.trans,
fileNode.readonly,
DBFile.BytesFromPages[fileNode.pagesInitial],
DBFile.BytesFromPages[fileNode.pagesPerExtent]]
};
GetFileChanged: PUBLIC PROC [s: Segment] RETURNS [fileChanged: BOOL] = {
-- Reports the fileChanged status recorded when segment s was last opened;
-- FALSE when s has never been declared.
fileNode: FileNode← FindSegment[s];
IF fileNode = NIL THEN RETURN [FALSE] ELSE RETURN [fileNode.fileChanged];
};
FinishTransaction: PUBLIC PROC [t: Trans, abort: BOOL, continue: BOOL] = {
-- Close or abort the transaction, and set file to NIL for file nodes under this transaction.
-- On commit, dirty cache pages are written out first and the cached file version
-- numbers are refreshed; on abort (or non-continuation) the cache entries for t's
-- files are discarded and their FileNodes detached from the transaction.
IF NOT abort AND NOT WriteoutCache[t] THEN ERROR;
-- Update the cached version numbers for files opened with this transaction
IF NOT abort THEN {
DBFile.FinishTransaction[t, FALSE, TRUE]; -- Force version number update
FOR f: FileNode ← files, f.rest UNTIL f=NIL DO
IF f.trans = t THEN {
f.versionNumber ← DBFile.VersionNumberFromOpenFile[f.openFile];
};
ENDLOOP;
};
IF NOT continue THEN DBFile.FinishTransaction[t, abort, continue];
IF abort OR NOT continue THEN {
AbortCache[t];
FOR f: FileNode← files, f.rest UNTIL f=NIL DO
IF f.trans = t THEN {
f.openFile ← NIL;
f.trans ← NIL;
IF abort THEN f.aborted ← TRUE; -- Forces fileChanged indication later
};
ENDLOOP;
};
};
SegmentFromTID: PUBLIC PROC [tid: TID] RETURNS [s: Segment] = {
-- Maps a tuple identifier to the segment atom of the segment containing it;
-- NIL when the tid's segment has never been declared.
fileNode: FileNode = FindSegmentID[InlineSegmentIDFromDBPage[tid]];
IF fileNode = NIL THEN RETURN [NIL] ELSE RETURN [fileNode.segment];
};
IsValidTID: PUBLIC PROC [tid: TID] RETURNS [valid: BOOL] = {
-- TRUE iff tid lies in a declared segment that currently has its file open.
fileNode: FileNode = FindSegmentID[InlineSegmentIDFromDBPage[tid]];
RETURN [fileNode # NIL AND fileNode.openFile # NIL];
};
SegmentIDFromSegment: PUBLIC PROC [s: Segment] RETURNS [SegmentID] = {
-- Maps segment atom to SegmentID; errors if undeclared or no transaction is open.
fileNode: FileNode = FindSegment[s];
IF fileNode = NIL THEN
ERROR DBEnvironment.Error[SegmentNotDeclared];
IF fileNode.openFile = NIL THEN
ERROR DBEnvironment.Error[TransactionNotOpen];
RETURN [fileNode.segmentID];
};
EnumerateSegments: PUBLIC PROC [
enumProc: PROC [s: Segment, segmentIndex: SegmentIndex] RETURNS [stop: BOOL]] = {
-- Calls enumProc for each declared segment (most recently declared first),
-- stopping early if enumProc returns stop=TRUE.
FOR f: FileNode ← files, f.rest UNTIL f=NIL DO
IF enumProc[f.segment, SegmentIndexFromSegmentID[f.segmentID]].stop THEN EXIT;
ENDLOOP;
};
FilePageFromDBPage: PROC [dbPage: DBPage]
RETURNS [readonly: BOOL, file: DBCommon.OpenFileHandle, page: CARDINAL] = {
-- Translates a DBPage into (readonly flag, open file handle, file page number).
-- Errors if the segment is undeclared or its file is not open under a transaction.
segmentID: SegmentID;
fileNode: FileNode;
[segmentID, page] ← DecomposeDBPage[dbPage];
fileNode ← FindSegmentID[segmentID];
IF fileNode = NIL OR (file ← fileNode.openFile) = NIL THEN
ERROR DBEnvironment.Error[TransactionNotOpen];
RETURN [fileNode.readonly, file, page];
};
-- Procs that understand the format of the segment head page.
InitializeSegment: PROC [fileNode: FileNode] = {
-- Formats the file as a fresh segment: writes the segment head page (page 0),
-- recording the segment's name, its page allocator state, and the two root index
-- TIDs produced by createIndexesProc.
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
fileSize: CARDINAL = DBFile.GetSize[fileNode.openFile];
fileNode.readonly ← FALSE; -- must be writable in order to format
[segmentHeadHandle, segmentHead] ← DBSegment.WritePage[fileNode.segmentID, NIL];
segmentHead^ ← [
segmentID: fileNode.segmentID,
allocator: [
firstPageInBlock: ConsDBPage[fileNode.segmentID, 1], -- pages 1..fileSize-1 are allocatable
nPagesInBlock: fileSize-1,
pagesPerExtent: fileNode.pagesPerExtent]];
ConvertUnsafe.AppendRope[
to: LOOPHOLE[@(segmentHead.name)], from: Atom.GetPName[fileNode.segment]];
[segmentHead.index1, segmentHead.index2] ← createIndexesProc[fileNode.segment];
DBSegment.UnlockPage[segmentHeadHandle];
};
CheckSegment: PROC [fileNode: FileNode] = {
-- Validates an existing segment's head page: page tag, software compatibility
-- version, seal, and (case-insensitively) the stored segment name must all match.
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
[segmentHeadHandle, segmentHead] ← DBSegment.ReadPage[fileNode.segmentID, NIL];
IF segmentHead.tag # DBStoragePagetags.AMap OR
segmentHead.softwareCompatibilityVersion #
DBCommon.softwareCompatibilityVersion OR
segmentHead.seal # DBSegmentPrivate.Seal THEN
ERROR DBEnvironment.Error[MismatchedExistingSegment];
IF NOT Rope.Equal[
s1: Atom.GetPName[fileNode.segment],
s2: ConvertUnsafe.ToRope[LOOPHOLE[@(segmentHead.name)]],
case: FALSE] THEN ERROR DBEnvironment.Error[MismatchedExistingSegment];
DBSegment.UnlockPage[segmentHeadHandle];
};
RootIndicesFromSegment: PUBLIC PROC [s: Segment] RETURNS [index1, index2: TID] = {
-- Reads the two root index TIDs from segment s's head page.
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
fileNode: FileNode = FindSegment[s];
IF fileNode=NIL THEN ERROR DBEnvironment.Error[SegmentNotDeclared];
[segmentHeadHandle, segmentHead] ← DBSegment.ReadPage[fileNode.segmentID, NIL];
IF segmentHead.tag # DBStoragePagetags.AMap THEN
ERROR DBEnvironment.Error[MismatchedExistingSegment];
index1 ← segmentHead.index1; index2 ← segmentHead.index2;
DBSegment.UnlockPage[segmentHeadHandle];
RETURN [index1, index2];
};
AllocPage: PUBLIC PROC [s: DBPage] RETURNS [allocatedPage: DBPage,
allocatedPageHandle: CacheHandle, allocatedPagePtr: LONG POINTER] = {
-- Allocates a page in segment s: first from the segment's free list, else from the
-- contiguous block allocator, extending the file by pagesPerExtent when the block
-- is exhausted. The page is returned write-locked with tag Unused.
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
DBStats.Inc[SegmentAllocPage];
[segmentHeadHandle, segmentHead] ← DBSegment.WritePage[s, NIL];
IF segmentHead.tag # DBStoragePagetags.AMap THEN
ERROR DBEnvironment.Error[MismatchedExistingSegment];
IF (allocatedPage ← segmentHead.allocator.freeList) # NullDBPage THEN {
-- The segment's free list is nonempty, so return a page from it.
allocatedFreeListPagePtr: LONG POINTER TO DBSegmentPrivate.FreeListPage;
[allocatedPageHandle, allocatedFreeListPagePtr] ← DBSegment.WritePage[allocatedPage, NIL];
IF allocatedFreeListPagePtr.tag # DBStoragePagetags.Free THEN ERROR DBEnvironment.InternalError;
segmentHead.allocator.freeList ← allocatedFreeListPagePtr.next; -- unlink head of free list
allocatedPagePtr ← allocatedFreeListPagePtr;
}
ELSE {
-- The segment's free list is empty, so allocate from block allocator, perhaps extending file.
IF segmentHead.allocator.firstPageInBlock = NullDBPage THEN {
-- Block exhausted too: grow the file by one extent and make it the new block.
fileNode: FileNode ← FindSegmentID[s];
size: CARDINAL ← DBFile.GetSize[fileNode.openFile];
DBFile.SetSize[fileNode.openFile, size + segmentHead.allocator.pagesPerExtent];
segmentHead.allocator.firstPageInBlock ← ConsDBPage[s, size];
segmentHead.allocator.nPagesInBlock ← segmentHead.allocator.pagesPerExtent;
};
allocatedPage ← segmentHead.allocator.firstPageInBlock;
IF (segmentHead.allocator.nPagesInBlock ← segmentHead.allocator.nPagesInBlock-1) = 0 THEN
segmentHead.allocator.firstPageInBlock ← NullDBPage
ELSE segmentHead.allocator.firstPageInBlock ←
allocatedPage + DBStorageTID.DBPageIncrement; -- advance to next page in block
[allocatedPageHandle, allocatedPagePtr] ← NewPage[allocatedPage];
};
DBSegment.UnlockPage[segmentHeadHandle];
LOOPHOLE[allocatedPagePtr, LONG POINTER TO DBStoragePagetags.PageHeader].pageTag ←
DBStoragePagetags.Unused;
RETURN[allocatedPage, allocatedPageHandle, allocatedPagePtr];
};--AllocPage
FreePage: PUBLIC PROC [s: DBPage, freePg: DBPage, freeHint: CacheHandle] = {
-- Returns page freePg of segment s to the segment's free list. The caller must
-- hold exactly one lock on freePg; the page must not already be free or the head page.
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
free: LONG POINTER TO DBSegmentPrivate.FreeListPage;
DBStats.Inc[SegmentFreePage];
IF s # InlineSegmentIDFromDBPage[freePg] THEN ERROR DBEnvironment.InternalError;
[segmentHeadHandle, segmentHead] ← DBSegment.WritePage[s, NIL];
IF segmentHead.tag # DBStoragePagetags.AMap THEN ERROR DBEnvironment.InternalError;
IF DBSegment.LockCount[freePg, freeHint] # 1 THEN ERROR DBEnvironment.InternalError;
free ← DBSegment.ObtainCoreLoc[freePg, freeHint];
IF free.tag = DBStoragePagetags.Free OR free.tag = DBStoragePagetags.AMap THEN
ERROR DBEnvironment.InternalError; -- double free, or attempt to free the head page
DBSegment.WriteLockedPage[freeHint];
free^ ← [next: segmentHead.allocator.freeList]; -- link onto front of free list
segmentHead.allocator.freeList ← freePg;
DBSegment.UnlockPage[freeHint];
DBSegment.UnlockPage[segmentHeadHandle];
};--FreePage
-- Procs that understand the bit layout of a DBPage.
SegmentIDFromSegmentIndex: PROC [segmentIndex: SegmentIndex]
RETURNS [SegmentID] = {
-- A SegmentID is a DBPage whose page-number bits are zero: the segment index is
-- shifted into the high bits of the high word (see DecomposeDBPage's mask 177600B).
segmentID: Basics.LongNumber.num = [num[lowbits: 0,
highbits: Basics.BITSHIFT[value: segmentIndex, count: 7]]];
RETURN [LOOPHOLE[segmentID]];
};
SegmentIndexFromSegmentID: PROC [segmentID: SegmentID]
RETURNS [SegmentIndex] = {
-- Inverse of SegmentIDFromSegmentIndex: recover the index from the high word.
RETURN [Basics.BITSHIFT[
value: LOOPHOLE[segmentID, Basics.LongNumber.num].highbits,
count: -7]];
};
SegmentIDFromDBPage: PUBLIC PROC [dbPage: DBPage]
RETURNS [SegmentID] = {
-- Exported (non-inline) wrapper for InlineSegmentIDFromDBPage.
RETURN [InlineSegmentIDFromDBPage[dbPage]]
};
InlineSegmentIDFromDBPage: PROC [dbPage: DBPage]
RETURNS [SegmentID] = INLINE {
-- Extracts the SegmentID by masking the segment-index bits (177600B) of the high
-- word and zeroing everything else.
segmentID: Basics.LongNumber.num = [num[lowbits: 0,
highbits: Basics.BITAND[LOOPHOLE[dbPage, Basics.LongNumber.num].highbits, 177600B]]];
RETURN [LOOPHOLE[segmentID]];
};
DecomposeDBPage: PROC [dbPage: DBPage] RETURNS [SegmentID, CARDINAL] = {
-- Splits a DBPage into its SegmentID (high-word bits under mask 177600B) and its
-- 16-bit file page number, which straddles the word boundary: the low bits of the
-- high word supply the top of the page number, the high 10 bits of the low word the
-- rest. The bottom 6 bits of the low word are zero for any valid DBPage.
segmentID1: Basics.LongNumber.num = [num[lowbits: 0,
highbits: Basics.BITAND[LOOPHOLE[dbPage, Basics.LongNumber.num].highbits, 177600B]]];
filePage: CARDINAL ← Basics.BITOR[
Basics.BITSHIFT[
value: LOOPHOLE[dbPage, Basics.LongNumber.num].highbits, count: 10],
Basics.BITSHIFT[
value: LOOPHOLE[dbPage, Basics.LongNumber.num].lowbits, count: -6]];
RETURN [LOOPHOLE[segmentID1], filePage];
};
ConsDBPage: PROC [segmentID: SegmentID, filePage: CARDINAL]
RETURNS [dbPage: DBPage] = {
-- Inverse of DecomposeDBPage
-- ORs the segment-index bits with the file page number distributed across the
-- word boundary (top 6 bits into the high word, remainder shifted up 6 in the low word).
dbPage1: Basics.LongNumber.num = [num[
highbits: Basics.BITOR[
LOOPHOLE[segmentID, Basics.LongNumber.num].highbits,
Basics.BITSHIFT[value: filePage, count: -10]],
lowbits: Basics.BITSHIFT[value: filePage, count: 6]]];
RETURN[LOOPHOLE[dbPage1]];
};
Hash: PROC [dbPage: DBPage] RETURNS [NAT] = INLINE { RETURN [
-- Cache hash function: XORs the segment-index bits (high word shifted down 7) with
-- the page-number bits (low word shifted down 6), then masks to the table size
-- (hashMask = ht.size-1, a power of two minus one).
Basics.BITAND[
hashMask,
Basics.BITXOR[
Basics.BITSHIFT[
value: LOOPHOLE[dbPage, Basics.LongNumber.num].highbits,
count: -7],
Basics.BITSHIFT[
value: LOOPHOLE[dbPage, Basics.LongNumber.num].lowbits,
count: -6]]]];
};
-- Buffer manager
-- Sentinel dbPage stored in free cache records (and the lruList header node).
invalidKey: DBPage = 37777777777B;
-- doesn't match any true DBPage (since all DBPages have low bits 0).
InitializeBufferMgr: PROC [] = {
-- Builds the cache hash table and the circular LRU list, then allocates cache pages.
-- Number of hash buckets = smallest power of 2 not less than cacheSize.
hashBuckets: CARDINAL ← 1;
temp: CARDINAL ← Basics.BITSHIFT[cacheSize, -1];
UNTIL temp=0 DO
hashBuckets ← Basics.BITSHIFT[hashBuckets, 1];
temp ← Basics.BITSHIFT[temp, -1];
ENDLOOP;
ht ← NEW[HashTable[hashBuckets] ← [nItems: 0, buckets: ]];
hashMask ← hashBuckets-1;
-- Create cacheRecords, format them into an lru list.
lruList ← NEW[CacheRecord ← [dbPage: invalidKey, pagePtr: NIL]]; -- header node; never holds data
lruList.pred ← lruList.succ ← lruList;
-- NOTE(review): pages are added in batches of 16, so a cacheSize that is not a
-- multiple of 16 is effectively rounded up; CheckState's final page-count assertion
-- appears to assume cacheSize MOD 16 = 0 — confirm.
FOR i: NAT IN [0 .. (cacheSize+15)/16) DO
AddCachePages[16];
ENDLOOP;
};
ReadPage: PUBLIC PROC [p: DBPage, pHint: CacheHandle]
RETURNS [CacheHandle, LONG POINTER] = {
-- Returns a read-locked cache page holding dbPage p, reading it from the file on a
-- cache miss. pHint is an optional guess at p's cache record; a wrong hint costs
-- only a hash lookup. On every path the page is made MRU and its lockCount bumped.
file: DBCommon.OpenFileHandle; page: CARDINAL;
IF logReads THEN
DBCommon.GetDebugStream[].PutChar['#]; -- '#' logs a page access
DBStats.Inc[CacheReadOrWrite];
-- IF p=noteThisPage THEN SIGNAL TakeALookAtThis;
IF pHint # NIL AND p = pHint.dbPage THEN GOTO ReturnPage;
-- hint is wrong, so search hash table
pHint ← Lookup[p];
IF pHint # NIL THEN GO TO ReturnPage;
-- page is not present in cache, read it in
IF logReads THEN
DBCommon.GetDebugStream[].PutChar['!]; -- '!' logs a cache miss
DBStats.Inc[CacheMiss];
pHint ← GetUnoccupiedPage[];
{ -- We expect ERRORs to be raised from DBFile.ReadFilePage
[pHint.readonly, file, page] ← FilePageFromDBPage[p];
DBFile.ReadFilePage[file, page, pHint.pagePtr]
};
pHint.dbPage ← p; Insert[pHint];
pHint.lockCount ← 0; -- incremented to 1 at ReturnPage
GOTO ReturnPage;
EXITS
ReturnPage => {
MakeMRU[pHint];
pHint.lockCount ← pHint.lockCount + 1;
RETURN[pHint, pHint.pagePtr]
}
};
WritePage: PUBLIC PROC [p: DBPage, pHint: CacheHandle]
RETURNS [CacheHandle, LONG POINTER] = {
-- Like ReadPage, but marks the page written (dirty) and refuses readonly segments.
-- The page's current contents are still read in on a miss, since the caller may
-- update only part of it.
DBStats.Inc[CacheReadOrWrite];
-- IF p=noteThisPage THEN SIGNAL TakeALookAtThis;
IF pHint # NIL AND p = pHint.dbPage THEN GO TO ReturnPage;
-- hint is wrong, so search hash table
pHint ← Lookup[p];
IF pHint # NIL THEN GOTO ReturnPage;
-- page is not present in cache, read it in
DBStats.Inc[CacheMiss];
pHint ← GetUnoccupiedPage[];
{ -- We expect ERRORs to be raised from DBFile.ReadFilePage
file: DBCommon.OpenFileHandle; page: CARDINAL;
[pHint.readonly, file, page] ← FilePageFromDBPage[p];
IF pHint.readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
DBFile.ReadFilePage[file, page, pHint.pagePtr]
};
pHint.dbPage ← p; Insert[pHint];
pHint.lockCount ← 0; -- incremented to 1 at ReturnPage
GOTO ReturnPage;
EXITS
ReturnPage => {
MakeMRU[pHint];
pHint.lockCount ← pHint.lockCount + 1;
pHint.written ← TRUE;
RETURN[pHint, pHint.pagePtr]
}
};
NewPage: PROC [p: DBPage]
RETURNS [CacheHandle, LONG POINTER] = {
-- Creates a cache page for newly-allocated dbPage p without reading the file (the
-- page's disk contents are garbage by definition). p must not already be cached.
-- Returns the page write-locked (lockCount=1) and marked written.
-- Fix: the WriteNotAllowed check below lacked the ERROR keyword (cf. WritePage),
-- so the error was raised as a resumable SIGNAL instead of an ERROR.
pHint: CacheHandle ← Lookup[p];
IF pHint # NIL THEN ERROR DBEnvironment.InternalError; -- [NotaNewPage]
pHint ← GetUnoccupiedPage[];
{
[pHint.readonly, , ] ← FilePageFromDBPage[p]; -- also checks transaction is open
IF pHint.readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
};
pHint.dbPage ← p; Insert[pHint];
MakeMRU[pHint];
pHint.lockCount ← 1;
pHint.written ← TRUE;
RETURN[pHint, pHint.pagePtr]
};
WriteLockedPage: PUBLIC PROC [pValidHint: CacheHandle] = {
-- Marks an already-locked cache page dirty; errors if unlocked or readonly.
IF pValidHint = NIL OR pValidHint.lockCount = 0 THEN
ERROR DBEnvironment.InternalError; -- [AlreadyUnlocked]
IF pValidHint.readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
pValidHint.written ← TRUE;
};
LockCount: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle]
RETURNS [CARDINAL] = {
-- Returns the lock count of p's cache page; pValidHint must be p's locked handle.
IF pValidHint = NIL OR pValidHint.dbPage # p OR pValidHint.lockCount = 0 THEN
ERROR DBEnvironment.InternalError; -- BadCacheHandle
RETURN[pValidHint.lockCount];
};
ObtainCoreLoc: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle]
RETURNS [LONG POINTER] = {
-- Returns the in-core address of p's page; pValidHint must be p's locked handle.
IF pValidHint = NIL OR pValidHint.dbPage # p OR pValidHint.lockCount = 0 THEN
ERROR DBEnvironment.InternalError; -- [BadCacheHandle];
RETURN[pValidHint.pagePtr];
};
UnlockPage: PUBLIC PROC [pValidHint: CacheHandle] = {
-- Releases one lock on a cache page; errors on over-release.
IF pValidHint = NIL OR pValidHint.lockCount = 0 THEN
ERROR DBEnvironment.InternalError; -- [AlreadyUnlocked];
pValidHint.lockCount ← pValidHint.lockCount - 1;
};
WriteoutCache: PROC [t: Trans] RETURNS [success: BOOL] = {
-- Writes out every dirty cache page belonging to a file open under transaction t.
-- As written, always returns TRUE (DBFile.WriteFilePage errors propagate instead).
WritePageOutIfUsesTrans: PROC [cacheHandle: CacheHandle] = {
IF cacheHandle.written THEN {
fileNode: FileNode =
FindSegmentID[InlineSegmentIDFromDBPage[cacheHandle.dbPage]];
IF fileNode.trans = t THEN WritePageOut[cacheHandle];
};
};
EnumerateHashTable[WritePageOutIfUsesTrans];
RETURN[TRUE];
};
AbortCache: PROC [t: Trans] = {
-- Discards (without writing) every cache page belonging to a file open under t.
-- Uses EnumerateLRUList because Delete is not safe during EnumerateHashTable.
DeleteIfUsesTrans: PROC [cacheHandle: CacheHandle] = {
IF cacheHandle.occupied THEN {
fileNode: FileNode =
FindSegmentID[InlineSegmentIDFromDBPage[cacheHandle.dbPage]];
IF fileNode.trans = t THEN Delete[cacheHandle];
};
};
EnumerateLRUList[DeleteIfUsesTrans];
};
WritePageOut: PROC [cacheHandle: CacheHandle] = {
-- Send the contents of a dirty cache page to the database segment, and mark
-- the page no longer dirty (leaving it in the cache).
readonly: BOOL; file: DBCommon.OpenFileHandle; page: CARDINAL;
DBStats.Inc[CacheWritePageBack];
[readonly, file, page] ← FilePageFromDBPage[cacheHandle.dbPage];
IF readonly THEN
-- The following signals should never happen, if Mark's code works as I believe it does...
SIGNAL DBEnvironment.Error[WriteNotAllowed]
ELSE
DBFile.WriteFilePage[file, page, cacheHandle.pagePtr];
cacheHandle.written ← FALSE;
};
-- Buffer manager LRU list
AddCachePages: PROC [nPages: NAT] = {
-- Creates nPages new cache records, in the LRU position.
-- Backing storage for all nPages is one contiguous VM allocation, carved into
-- page-sized pieces; each record starts unoccupied (dbPage = invalidKey).
curCachePage: LONG POINTER ← VM.AddressForPageNumber[
VM.Allocate[count: nPages*DBCommon.PagesPerDBPage, in64K: TRUE].page];
FOR i: NAT IN [0..nPages) DO
p: CacheHandle ← NEW[CacheRecord ← [
dbPage: invalidKey, pagePtr: curCachePage]];
curCachePage ← curCachePage + DBCommon.WordsPerPage;
p.pred ← lruList.pred; p.succ ← lruList; -- splice in just before the header (LRU end)
lruList.pred.succ ← p; lruList.pred ← p;
ENDLOOP;
};
MakeMRU: PROC [x: CacheHandle] = {
-- Assumes that x is part of lruList. Deletes x from its current position and makes
-- it first in lruList.
x.pred.succ ← x.succ; x.succ.pred ← x.pred; -- unlink x
x.succ ← lruList.succ; x.pred ← lruList; -- relink just after the header (MRU end)
lruList.succ.pred ← x; lruList.succ ← x;
};
EnumerateLRUList: PROC [procToCall: PROC[CacheHandle]] = {
-- Calls procToCall for each record in LRUList, in MRU to LRU order.
-- procToCall may not perform MakeMRU; use EnumerateHashTable if you want this.
-- Note that procToCall will be called for both occupied and unoccupied pages.
FOR p: CacheHandle ← lruList.succ, p.succ UNTIL p = lruList DO
procToCall[p];
ENDLOOP;
};
GetUnoccupiedPage: PROC RETURNS [CacheHandle] = {
-- Obtain an empty cache page. It will be the least-recently used page that is
-- unoccupied or occupied but unlocked. The page is returned with Key = garbage,
-- Page = ptr to a cache page, occupied = FALSE, written = FALSE,
-- and entered on lruList in no particular location.
cacheHandle: CacheHandle;
-- Scan from the LRU end (lruList.pred) toward the MRU end.
FOR cacheHandle ← lruList.pred, cacheHandle.pred UNTIL cacheHandle = lruList DO
IF ~cacheHandle.occupied THEN GOTO FoundUnoccupiedPage;
IF cacheHandle.lockCount = 0 THEN GOTO MakePageUnoccupied;
REPEAT
FoundUnoccupiedPage => NULL; -- ~cacheHandle.occupied
MakePageUnoccupied => { -- cacheHandle.occupied AND cacheHandle.lockCount=0
IF cacheHandle.written THEN WritePageOut[cacheHandle]; -- evict: flush if dirty
Delete[cacheHandle];
};
FINISHED => ERROR DBEnvironment.Failure[$allCachePagesLocked]
ENDLOOP;
cacheHandle.written ← FALSE;
RETURN[cacheHandle];
};
-- Buffer manager hash table
EnumerateHashTable: PROC[procToCall: PROC[CacheHandle]] = {
-- Calls procToCall for each record in table.
-- procToCall may not perform Delete; use EnumerateLRUList if you want this.
-- Only occupied pages are seen (a page is in ht iff occupied=TRUE).
FOR i: NAT IN [0..ht.size) DO
FOR p: CacheHandle ← ht.buckets[i], p.hashChain UNTIL p = NIL DO
procToCall[p];
ENDLOOP;
ENDLOOP;
};
Lookup: PROC[dbPage: DBPage] RETURNS[CacheHandle] = {
-- Look the dbPage up in the hash table for the cache. Return the CacheHandle if
-- it is found, else return NIL.
i: CARDINAL ← Hash[dbPage];
p: CacheHandle ← ht.buckets[i];
DBStats.Inc[CacheHTLookup];
WHILE p # NIL DO
IF p.dbPage = dbPage THEN RETURN[p];
DBStats.Inc[CacheHTConflictInLookup]; -- counts chain links traversed past the first
p ← p.hashChain;
ENDLOOP;
IF dbPage=noteThisPage THEN SIGNAL TakeALookAtThis; -- debugging hook (see noteThisPage)
RETURN[NIL];
};
Insert: PROC[x: CacheHandle] = {
-- Add the CacheHandle x to the hash table for the cache. Generate InternalError if
-- a handle with the same dbPage is already there.
-- Also marks x occupied, maintaining the invariant: in ht iff occupied.
i: CARDINAL ← Hash[x.dbPage];
-- Make sure its not there already...
p: CacheHandle ← ht.buckets[i];
WHILE p # NIL DO
IF p.dbPage = x.dbPage THEN GOTO FoundDuplicate;
p ← p.hashChain;
REPEAT
FoundDuplicate => ERROR DBEnvironment.InternalError; -- [AlreadyInHashTable];
FINISHED => {
x.hashChain ← ht.buckets[i]; -- link onto front of bucket chain
ht.buckets[i] ← x;
ht.nItems ← ht.nItems + 1;
x.occupied ← TRUE;
IF x.dbPage=noteThisPage THEN SIGNAL TakeALookAtThis; -- debugging hook
};
ENDLOOP;
};
Delete: PROC[x: CacheHandle] = {
-- Delete the CacheHandle x from the hash table for the cache.
-- Marks x unoccupied and resets its key; x remains on the LRU list.
i: CARDINAL ← Hash[x.dbPage];
p: CacheHandle ← ht.buckets[i];
IF p = x THEN {ht.buckets[i] ← x.hashChain; GOTO Deleted}; -- x heads its bucket chain
WHILE p # NIL DO
IF p.hashChain = x THEN {p.hashChain ← x.hashChain; GOTO Deleted};
p ← p.hashChain;
REPEAT
FINISHED => ERROR DBEnvironment.InternalError; -- [NotPresentInHashTable];
ENDLOOP;
EXITS
Deleted => {
IF x.dbPage=noteThisPage THEN SIGNAL TakeALookAtThis; -- debugging hook
x.dbPage ← invalidKey;
x.occupied ← FALSE;
x.hashChain ← NIL;
ht.nItems ← ht.nItems - 1;
};
};
-- Buffer manager debugging
-- Raised (resumably) by Assert when an invariant check fails.
AssertionFailed: SIGNAL = CODE;
Assert: PROC [assertion: BOOL] = {IF ~assertion THEN SIGNAL AssertionFailed; };
-- So we can Proceed (in debugger) from failure.
CheckState: PUBLIC PROC [doPrinting, printOnlyLockedPages: BOOL] = {
-- Debugging: walks the LRU list and the hash table, checking the cache invariants
-- documented at lruList above, and optionally printing each (locked) entry.
-- Fix: the per-entry trailer format ended in "*n", which IO.PutF prints literally;
-- every other format in this proc ends with "\n", so use that here too.
nOccupiedItems, checkOccupiedItems: CARDINAL ← 0;
nLockedItems: CARDINAL ← 0;
nFreeItems: CARDINAL ← 0;
PageHeader: TYPE = MACHINE DEPENDENT RECORD[
pageTag: CARDINAL [0..256),
dontCare: CARDINAL [0..256)];
-- Helpers to print REFs and pointers as (octal) numbers.
IOref: PROC[p: REF ANY] RETURNS[IO.Value] = INLINE {
RETURN[IO.card[LOOPHOLE[p, LONG CARDINAL]]]};
IOptr: PROC[p: LONG POINTER] RETURNS[IO.Value] = INLINE {
RETURN[IO.card[LOOPHOLE[p, LONG CARDINAL]]]};
out: IO.STREAM ← IF doPrinting THEN DBCommon.GetDebugStream[] ELSE NIL;
CheckEntry1: PROC[p: CacheHandle] = {
-- LRU-list pass: verify list links and per-entry invariants, count, and print.
Assert[p.pred.succ = p];
Assert[p.succ.pred = p];
Assert[p.pagePtr # NIL];
IF ~p.occupied THEN {
nFreeItems ← nFreeItems + 1;
Assert[p.dbPage = invalidKey];
RETURN };
Assert[p.dbPage # invalidKey];
nOccupiedItems ← nOccupiedItems + 1;
IF p.lockCount > 0 THEN nLockedItems ← nLockedItems + 1;
IF p.written THEN Assert[NOT p.readonly]; -- a readonly page must never be dirty
IF ~doPrinting OR (p.lockCount = 0 AND printOnlyLockedPages) THEN RETURN;
{pageTag: CARDINAL = LOOPHOLE[p.pagePtr, LONG POINTER TO PageHeader].pageTag;
out.PutF["%11bB %11bB %11bB %3d",
IOref[p], IO.card[p.dbPage], IOptr[p.pagePtr], IO.card[pageTag]];
out.PutF[" %3d %g %g\n",
IO.card[p.lockCount], IO.bool[p.written], IO.bool[p.readonly]];
};
};
CheckEntry2: PROC[p: CacheHandle] = {
-- Hash-table pass: everything in ht must be occupied; count for cross-check.
Assert[p.occupied];
checkOccupiedItems ← checkOccupiedItems + 1;
};
IF doPrinting THEN {
out.PutF["Cache size %g pages.\n", IO.card[cacheSize]];
out.PutRope[
" cacheHdl dbPage pagePtr tag lockCnt written readonly\n"];
};
EnumerateLRUList[CheckEntry1];
EnumerateHashTable[CheckEntry2];
IF doPrinting THEN
out.PutF["%g pages occupied, %g are locked\n",
IO.card[nOccupiedItems], IO.card[nLockedItems]];
Assert[nOccupiedItems = checkOccupiedItems];
Assert[ht.nItems = checkOccupiedItems];
Assert[nFreeItems + nOccupiedItems = cacheSize];
};
END.--DBSegmentImpl
CHANGE LOG
Created by MBrown on December 10, 1979 5:09 PM
Changed by MBrown on December 1, 1982 9:41 am
-- Total rewrite for multiple segments, merging in DBCacheImpl. No more support for module
--finalization. Discarded >3 pages of change log.
Changed by Cattell on January 16, 1983 2:35 pm
-- Added some more meaningful signals for various client errors for segments and transactions,
--in conjunction with new DBEnvironment interface replacing DBException.
Changed by Cattell on January 17, 1983 8:21 pm
-- AttachSegment didn't put readOnly and version into fileNode when segment re-declared, so couldn't re-open transaction with version to create new file.
Changed by Cattell on January 19, 1983 10:57 am
-- The code didn't check for FindFile returning NIL in a couple places, resulting in address faults if the segment was not declared. Put in checks and signal generation.
Changed by Willie-Sue on February 3, 1983
-- Added noLog arg to OpenTransaction and OpenSegment
Changed by MBrown on February 25, 1983 1:39 pm
-- Fixed bug in ReadPage, WritePage, NewPage: buffer left in "occupied" state after
--error (e.g. attempt to read after transaction abort).
Changed by Cattell & Donahue on May 19, 1983 2:26 pm
-- Changed AttachSegment so can re-call it with new file name for same segment atom.
Changed by Cattell on July 11, 1983 3:46 pm
-- Pass readOnly bit through to DBFileAlpine. Temporarily removed check of readOnly bit in WritePageOut, we've already warned user, and this error keeps us from closing segment after getting WriteNotAllowed.
Changed by Cattell on September 14, 1983 11:43 am
-- Removed the DBSegmentParticularTable stuff, which I'm sure Mark added with good intentions. We don't need a red black tree package to keep track of at most half a dozen segments! This change reduces the memory requirements and complexity of this module considerably. The fancy compilation required to do this confuses the spelling corrector and MakeConfig, to boot.
Changed by wert on July 26, 1984 4:37:36 pm PDT
-- Hacked FinishTransaction[] so that it updates the cached version number
Changed by wert on August 10, 1984 6:06:05 pm PDT
-- Hacked OpenSegment[] so that it claims the file has changed when it had to initialize it