Last Edited by:
Donahue, July 2, 1986 2:08:13 pm PDT
MBrown on February 25, 1983 1:49 pm
Cattell on September 16, 1983 11:13 am
Willie-Sue on April 25, 1985 12:45:53 pm PST
Wert, August 10, 1984 6:08:10 pm PDT
Widom, September 12, 1985 4:54:35 pm PDT
DBSegmentImpl: CEDAR PROGRAM
IMPORTS Atom, Basics, ConvertUnsafe, DB, DBCommon, DBFile, DBStats, IO, Rope, VM
EXPORTS DBSegment
= BEGIN
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
Trans: TYPE = DBCommon.Transaction;
Segment: TYPE = DBCommon.Segment;
SegmentID: TYPE = DBCommon.SegmentID;
SegmentIndex: TYPE = DBCommon.SegmentIndex;
VersionOptions: TYPE = DBCommon.VersionOptions;
DBPage: TYPE = DBCommon.DBPage;
NullDBPage: DBPage = DBCommon.NullDBPage;
CacheHandle: TYPE = DBCommon.CacheHandle;
CacheRecord: TYPE = DBCommon.CacheRecord;
TID: TYPE = DBCommon.TID;
TakeALookAtThis: ERROR = CODE;
FileNode: TYPE = REF FileNodeRecord;
FileNodeRecord: TYPE = RECORD [
segmentID: DBCommon.DBPage,
segment: DBCommon.Segment ← NIL,
fileName: ROPE ← NIL,
openFile: DBCommon.OpenFileHandle ← NIL, -- if openFile = NIL, then the segment is not currently open
readonly: BOOL ← TRUE,
lock: AlpineEnvironment.LockOption ← [intendWrite, wait],
version: DBCommon.VersionOptions ← OldFileOnly,
pagesInitial, pagesPerExtent: NAT ← 0,
rest: FileNode ← NIL
];
HashTable: TYPE = RECORD [
nItems: NAT ← 0, -- number of CacheRecords in table
buckets: SEQUENCE size: NAT OF CacheHandle
];
-- Module state
initialized: BOOL ← FALSE;
createIndexesProc: PROC [s: Segment] RETURNS [ARRAY[0..DBCommon.systemIndexCount) OF TID];
logReads: BOOL ← FALSE; -- causes reads & writes to be logged on UserExec
noteThisPage: DBPage ← 0; -- causes signal when this page is read or written
files: FileNode ← NIL;
-- List mapping SegmentID to interesting information about a file.
cacheSize: NAT;
-- Number of pages allocated to cache.
ht: REF HashTable;
-- Fast map from DBPage to CacheHandle.  ht.size is a power of two.
hashMask: CARDINAL;
-- Equals ht.size-1, used in computing hash function.
lruList: CacheHandle;
-- Points to header node of doubly-linked list of CacheRecords.
-- Data structure invariants: A cache record is present in lruList for every allocated cache page;
-- there is no separate "free list".  occupied=FALSE means that the page is free, and that the other
-- cache record fields (readonly, written) are undefined.  occupied=TRUE means that the page contains
-- valid data; a page is stored in ht iff occupied=TRUE.
-- The lruList points to a cache record with no associated cache page; this "header node" makes the
-- circular-list manipulations work properly.  The most-recently used page is lruList.succ, and the
-- least-recently used page is lruList.pred.
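-- As a hedged illustration only (MakeMRU is referenced below and presumably defined later in this
-- file; its exact code may differ), moving a cache record h to the most-recently-used position of
-- the circular list amounts to the standard splice:
--   h.pred.succ ← h.succ; h.succ.pred ← h.pred;    unlink h from its current position
--   h.succ ← lruList.succ; h.pred ← lruList;       relink h just after the header node
--   h.succ.pred ← h; lruList.succ ← h;
-- The header node guarantees that lruList.succ and lruList.pred are always defined, so no
-- empty-list special case is needed.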
Initialize: PUBLIC PROC [
nCachePages: NAT,
useCacheFile: ROPE,
segmentInitProc: PROC [s: Segment] RETURNS [ARRAY[0..DBCommon.systemIndexCount) OF TID] ] = {
IF initialized THEN ERROR DBCommon.InternalError;
cacheSize ← nCachePages;
createIndexesProc ← segmentInitProc;
InitializeBufferMgr[];
initialized ← TRUE;
};
FindSegmentID: PROC [segmentID: DBCommon.DBPage] RETURNS [found: FileNode] = {
-- Just a linear search; we're not going to have a tremendous number of segments open.
FOR found ← files, found.rest UNTIL found = NIL DO
IF found.segmentID = segmentID THEN RETURN;
ENDLOOP;
};
FindSegment: PROC [s: Segment] RETURNS [found: FileNode] = {
-- Same as above, but looks for atom match instead of segment ID match.  If efficiency of this ever
-- becomes important, we can attach info to prop list of atom.
FOR found ← files, found.rest UNTIL found = NIL DO
IF found.segment = s THEN RETURN;
ENDLOOP;
};
AttachSegment: PUBLIC PROC [
fileName: ROPE, s: Segment, segmentIndex: SegmentIndex, lock: AlpineEnvironment.LockOption,
readonly: BOOL, version: VersionOptions, nPagesInitial, nPagesPerExtent: NAT]
RETURNS [newAttachment: BOOL] = {
segmentID: SegmentID = SegmentIDFromSegmentIndex[segmentIndex];
fileNode: FileNode = FindSegmentID[segmentID];
IF NOT initialized THEN ERROR;
IF fileNode # NIL THEN {
IF fileNode.segmentID # segmentID OR fileNode.segment # s THEN
ERROR DB.Error[MismatchedExistingSegment];
newAttachment ← NOT Rope.Equal[fileNode.fileName, fileName];
fileNode.fileName ← fileName;
fileNode.version ← version;
fileNode.lock ← lock;
fileNode.readonly ← readonly;
fileNode.pagesInitial ← MAX[1, nPagesInitial];
fileNode.pagesPerExtent ← MAX[1, nPagesPerExtent];
RETURN;
};
files ← NEW[FileNodeRecord ← [
segmentID: segmentID,
segment: s,
lock: lock,
readonly: readonly,
version: version,
fileName: fileName,
pagesInitial: MAX[1, nPagesInitial],
pagesPerExtent: MAX[1, nPagesPerExtent],
rest: files
]];
newAttachment ← TRUE
};
OpenSegment: PUBLIC PROC [s: Segment, trans: Trans, eraseAfterOpening: BOOL] = {
ENABLE UNWIND => { CloseSegment[s]; FlushCache[s] };
createdFile: BOOL;
fileNode: FileNode = FindSegment[s];
[fileNode.openFile, createdFile] ← DBFile.OpenFile[t: trans, file: fileNode.fileName, version: fileNode.version, discardFileContents: eraseAfterOpening, nPagesInitial: fileNode.pagesInitial, lock: fileNode.lock, readOnly: fileNode.readonly];
IF createdFile OR eraseAfterOpening THEN InitializeSegment[fileNode]
ELSE CheckSegment[fileNode] };
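-- A typical client sequence, sketched from the procedures in this interface (illustrative only;
-- the file name, atom, segment index, transaction, and page counts below are made-up values):
--   Initialize[nCachePages: 50, useCacheFile: NIL, segmentInitProc: myInitProc];
--   [] ← AttachSegment[fileName: "MyData.segment", s: $MyData, segmentIndex: ix,
--     lock: [intendWrite, wait], readonly: FALSE, version: OldFileOnly,
--     nPagesInitial: 32, nPagesPerExtent: 16];
--   OpenSegment[s: $MyData, trans: t, eraseAfterOpening: FALSE];
-- When finished, a client would typically call WriteOutCache and CloseSegment (and FlushCache to
-- discard the segment's cached pages), as defined below.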
CloseSegment: PUBLIC PROC [s: Segment] ~ {
fileNode: FileNode = FindSegment[s];
IF fileNode = NIL THEN RETURN;
IF fileNode.openFile # NIL THEN DBFile.Close[fileNode.openFile];
fileNode.openFile ← NIL };
GetSegmentInfo: PUBLIC PROC [s: Segment] RETURNS [filePath: ROPE, readOnly: BOOL] = {
fileNode: FileNode = FindSegment[s];
IF fileNode=NIL THEN RETURN[NIL, FALSE]
ELSE RETURN[fileNode.fileName, fileNode.readonly]
};
GetVersionStamp: PUBLIC PROC [segment: Segment] RETURNS [schemaVersion: INT] = TRUSTED {
-- Returns the current schema version stamp for the given segment.
fileNode: FileNode = FindSegment[segment];
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
IF fileNode=NIL THEN ERROR DB.Error[SegmentNotDeclared];
[segmentHeadHandle, segmentHead] ← ReadPage[fileNode.segmentID, NIL];
schemaVersion ← segmentHead.schemaVersionStamp;
UnlockPage[segmentHeadHandle]
};
SetVersionStamp: PUBLIC PROC [segment: Segment, to: INT] = TRUSTED {
-- Called when the schema for the given segment is changed; updates the schema version stamp.
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
fileNode: FileNode = FindSegment[segment];
IF fileNode=NIL THEN ERROR DB.Error[SegmentNotDeclared];
IF fileNode.readonly THEN ERROR DB.Error[WriteNotAllowed];
[segmentHeadHandle, segmentHead] ← WritePage[fileNode.segmentID, NIL];
segmentHead.schemaVersionStamp ← to;
UnlockPage[segmentHeadHandle] };
SegmentFromTID: PUBLIC PROC [tid: TID] RETURNS [s: Segment] = {
fileNode: FileNode = FindSegmentID[InlineSegmentIDFromDBPage[tid]];
IF fileNode # NIL THEN RETURN [fileNode.segment] ELSE RETURN [NIL];
};
IsValidTID: PUBLIC PROC [tid: TID] RETURNS [valid: BOOL] = {
fileNode: FileNode = FindSegmentID[InlineSegmentIDFromDBPage[tid]];
RETURN [fileNode # NIL AND fileNode.openFile # NIL];
};
SegmentIDFromSegment: PUBLIC PROC [s: Segment] RETURNS [SegmentID] = {
fileNode: FileNode = FindSegment[s];
IF fileNode = NIL THEN ERROR DB.Error[SegmentNotDeclared];
RETURN [fileNode.segmentID];
};
EnumerateSegments: PUBLIC PROC [
enumProc: PROC [s: Segment, segmentIndex: SegmentIndex] RETURNS [stop: BOOL]] = {
FOR f: FileNode ← files, f.rest UNTIL f = NIL DO
IF enumProc[f.segment, SegmentIndexFromSegmentID[f.segmentID]].stop THEN EXIT;
ENDLOOP;
};
FilePageFromDBPage: PROC [dbPage: DBPage]
RETURNS [readonly: BOOL, file: DBCommon.OpenFileHandle, page: CARDINAL] = {
segmentID: SegmentID;
fileNode: FileNode;
[segmentID, page] ← DecomposeDBPage[dbPage];
fileNode ← FindSegmentID[segmentID];
IF fileNode = NIL OR (file ← fileNode.openFile) = NIL THEN ERROR DB.Error[TransactionNotOpen];
RETURN [fileNode.readonly, file, page];
};
-- Procs that understand the format of the segment head page.
InitializeSegment: PROC [fileNode: FileNode] = TRUSTED {
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
fileSize: CARDINAL = DBFile.GetSize[fileNode.openFile];
fileNode.readonly ← FALSE;
[segmentHeadHandle, segmentHead] ← WritePage[fileNode.segmentID, NIL];
segmentHead^ ← [
segmentID: fileNode.segmentID,
allocator: [
firstPageInBlock: ConsDBPage[fileNode.segmentID, 1],
nPagesInBlock: fileSize-1,
pagesPerExtent: fileNode.pagesPerExtent],
softwareCompatibilityVersion: DBCommon.softwareCompatibilityVersion,
schemaVersionStamp: 1];
TRUSTED {
ConvertUnsafe.AppendRope[
to: LOOPHOLE[@(segmentHead.name)], from: Atom.GetPName[fileNode.segment]];
};
segmentHead.systemIndices ← createIndexesProc[fileNode.segment];
UnlockPage[segmentHeadHandle];
};
CheckSegment: PROC [fileNode: FileNode] = TRUSTED {
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
-- If the segment is not readonly then we need to be able to write the head page (its schema version stamp and software compatibility version).
[segmentHeadHandle, segmentHead] ← ReadPage[fileNode.segmentID, NIL];
IF segmentHead.tag # DBStoragePage.AMap OR segmentHead.seal # DBSegmentPrivate.Seal THEN
ERROR DB.Error[MismatchedExistingSegment];
IF segmentHead.softwareCompatibilityVersion # DBCommon.softwareCompatibilityVersion THEN ERROR DB.Error[MismatchedExistingSegment];
TRUSTED {
IF NOT Rope.Equal[
s1: Atom.GetPName[fileNode.segment],
s2: ConvertUnsafe.ToRope[LOOPHOLE[@(segmentHead.name)]],
case: FALSE] THEN ERROR DB.Error[MismatchedExistingSegment];
};
UnlockPage[segmentHeadHandle];
};
RootIndicesFromSegment: PUBLIC PROC [s: Segment]
RETURNS [indices: ARRAY[0..DBCommon.systemIndexCount) OF TID] = TRUSTED {
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
fileNode: FileNode = FindSegment[s];
IF fileNode=NIL THEN ERROR DB.Error[SegmentNotDeclared];
[segmentHeadHandle, segmentHead] ← ReadPage[fileNode.segmentID, NIL];
IF segmentHead.tag # DBStoragePage.AMap THEN ERROR DB.Error[MismatchedExistingSegment];
indices ← segmentHead.systemIndices;
UnlockPage[segmentHeadHandle];
RETURN [indices];
};
AllocPage: PUBLIC PROC [s: SegmentID]
RETURNS [p: DBPage, pValidHint: CacheHandle, cachePage: LONG POINTER] = TRUSTED {
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
DBStats.Inc[SegmentAllocPage];
[segmentHeadHandle, segmentHead] ← WritePage[s, NIL];
IF segmentHead.tag # DBStoragePage.AMap THEN ERROR DB.Error[MismatchedExistingSegment];
IF (p ← segmentHead.allocator.freeList) # NullDBPage THEN {
-- The segment's free list is nonempty, so return a page from it.
allocatedFreeListPagePtr: LONG POINTER TO DBSegmentPrivate.FreeListPage;
[pValidHint, allocatedFreeListPagePtr] ← WritePage[p, NIL];
IF allocatedFreeListPagePtr.tag # DBStoragePage.Free THEN ERROR DBCommon.InternalError;
segmentHead.allocator.freeList ← allocatedFreeListPagePtr.next;
cachePage ← allocatedFreeListPagePtr;
}
ELSE {
-- The segment's free list is empty, so allocate from block allocator, perhaps extending file.
IF segmentHead.allocator.firstPageInBlock = NullDBPage THEN {
fileNode: FileNode ← FindSegmentID[s];
size: CARDINAL ← DBFile.GetSize[fileNode.openFile];
DBFile.SetSize[fileNode.openFile, size + segmentHead.allocator.pagesPerExtent];
segmentHead.allocator.firstPageInBlock ← ConsDBPage[s, size];
segmentHead.allocator.nPagesInBlock ← segmentHead.allocator.pagesPerExtent;
};
p ← segmentHead.allocator.firstPageInBlock;
IF (segmentHead.allocator.nPagesInBlock ← segmentHead.allocator.nPagesInBlock-1) = 0 THEN
segmentHead.allocator.firstPageInBlock ← NullDBPage
ELSE segmentHead.allocator.firstPageInBlock ← p + DBCommon.DBPageIncrement;
[pValidHint, cachePage] ← NewPage[p];
};
UnlockPage[segmentHeadHandle];
TRUSTED {
LOOPHOLE[cachePage, LONG POINTER TO DBStoragePage.PageHeader].pageTag ← DBStoragePage.Unused;
};
RETURN[p, pValidHint, cachePage];
};--AllocPage
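-- In summary: AllocPage takes a page from the segment's free list when one is available; otherwise
-- it takes the next page of the current block, growing the file by allocator.pagesPerExtent pages
-- and starting a new block when the current block is exhausted.  In either case the page comes back
-- locked (via WritePage or NewPage) with its pageTag set to Unused, for the caller to format.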
FreePage: PUBLIC PROC [s: SegmentID, freePg: DBPage, freeHint: CacheHandle] = TRUSTED {
segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
segmentHeadHandle: CacheHandle;
free: LONG POINTER TO DBSegmentPrivate.FreeListPage;
DBStats.Inc[SegmentFreePage];
IF s # InlineSegmentIDFromDBPage[freePg] THEN ERROR DBCommon.InternalError;
[segmentHeadHandle, segmentHead] ← WritePage[s, NIL];
IF segmentHead.tag # DBStoragePage.AMap THEN ERROR DBCommon.InternalError;
IF LockCount[freePg, freeHint] # 1 THEN ERROR DBCommon.InternalError;
free ← ObtainCoreLoc[freePg, freeHint];
IF free.tag = DBStoragePage.Free OR free.tag = DBStoragePage.AMap THEN
ERROR DBCommon.InternalError;
WriteLockedPage[freeHint];
free^ ← [next: segmentHead.allocator.freeList];
segmentHead.allocator.freeList ← freePg;
UnlockPage[freeHint];
UnlockPage[segmentHeadHandle];
};--FreePage
-- Buffer manager
invalidKey: DBPage = 37777777777B;
-- doesn't match any true DBPage (since all DBPages have low bits 0).
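-- (A DBPage apparently packs the segment identification in its high-order bits and the file page
-- number, scaled by DBCommon.DBPageIncrement, in its low-order bits; see ConsDBPage,
-- DecomposeDBPage, and the "p + DBCommon.DBPageIncrement" step in AllocPage.  The all-ones octal
-- value above therefore cannot collide with a real page key.)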
InitializeBufferMgr: PROC [] = {
-- Number of hash buckets = largest power of 2 not exceeding cacheSize (equal to cacheSize when it is a power of two).
hashBuckets: CARDINAL ← 1;
temp: CARDINAL ← Basics.BITSHIFT[cacheSize, -1];
UNTIL temp=0
DO
hashBuckets ← Basics.BITSHIFT[hashBuckets, 1];
temp ← Basics.BITSHIFT[temp, -1];
ENDLOOP;
ht ← NEW[HashTable[hashBuckets] ← [nItems: 0, buckets: ]];
hashMask ← hashBuckets-1;
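-- For example, cacheSize = 50 yields hashBuckets = 32 and hashMask = 31; a power-of-two cacheSize
-- such as 64 yields hashBuckets = 64 and hashMask = 63.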
-- Create CacheRecords and link them into an LRU list.
lruList ← NEW[CacheRecord ← [dbPage: invalidKey, pagePtr: NIL]];
lruList.pred ← lruList.succ ← lruList;
FOR i: NAT IN [0 .. (cacheSize+15)/16) DO
AddCachePages[16];
ENDLOOP;
};
ReadPage: PUBLIC PROC [p: DBPage, pHint: CacheHandle] RETURNS [CacheHandle, LONG POINTER] = {
file: DBCommon.OpenFileHandle; page: CARDINAL;
IF logReads THEN DBCommon.GetDebugStream[].PutChar['#];
DBStats.Inc[CacheReadOrWrite];
IF p=noteThisPage THEN ERROR TakeALookAtThis;
IF pHint # NIL AND p = pHint.dbPage THEN GOTO ReturnPage;
-- hint is wrong, so search hash table
pHint ← Lookup[p];
IF pHint # NIL THEN GO TO ReturnPage;
-- page is not present in cache, read it in
IF logReads THEN DBCommon.GetDebugStream[].PutChar['!];
DBStats.Inc[CacheMiss];
pHint ← GetUnoccupiedPage[];
{
-- We expect ERRORs to be raised from DBFile.ReadFilePage
[pHint.readonly, file, page] ← FilePageFromDBPage[p];
DBFile.ReadFilePage[file, page, pHint.pagePtr]
};
pHint.dbPage ← p; Insert[pHint];
pHint.lockCount ← 0;
GOTO ReturnPage;
EXITS
ReturnPage => {
MakeMRU[pHint];
pHint.lockCount ← pHint.lockCount + 1;
RETURN[pHint, pHint.pagePtr]
}
};
WritePage: PUBLIC PROC [p: DBPage, pHint: CacheHandle] RETURNS [CacheHandle, LONG POINTER] = {
DBStats.Inc[CacheReadOrWrite];
IF p=noteThisPage THEN ERROR TakeALookAtThis;
IF pHint # NIL AND p = pHint.dbPage THEN GO TO ReturnPage;
-- hint is wrong, so search hash table
pHint ← Lookup[p];
IF pHint # NIL THEN GOTO ReturnPage;
-- page is not present in cache, read it in
DBStats.Inc[CacheMiss];
pHint ← GetUnoccupiedPage[];
{
-- We expect ERRORs to be raised from DBFile.ReadFilePage
file: DBCommon.OpenFileHandle; page: CARDINAL;
[pHint.readonly, file, page] ← FilePageFromDBPage[p];
IF pHint.readonly THEN ERROR DB.Error[WriteNotAllowed];
DBFile.ReadFilePage[file, page, pHint.pagePtr]
};
pHint.dbPage ← p; Insert[pHint];
pHint.lockCount ← 0;
GOTO ReturnPage;
EXITS
ReturnPage => {
MakeMRU[pHint];
pHint.lockCount ← pHint.lockCount + 1;
pHint.written ← TRUE;
RETURN[pHint, pHint.pagePtr]
}
};
NewPage: PROC [p: DBPage] RETURNS [CacheHandle, LONG POINTER] = {
segmentID: SegmentID;
fileNode: FileNode;
pHint: CacheHandle ← Lookup[p];
IF pHint # NIL THEN ERROR DBCommon.InternalError; -- [NotaNewPage]
pHint ← GetUnoccupiedPage[];
{
[segmentID, ] ← DecomposeDBPage[p];
fileNode ← FindSegmentID[segmentID];
IF fileNode = NIL OR fileNode.openFile = NIL THEN ERROR DB.Error[TransactionNotOpen];
pHint.readonly ← fileNode.readonly;
IF pHint.readonly THEN ERROR DB.Error[WriteNotAllowed];
};
pHint.dbPage ← p; Insert[pHint];
MakeMRU[pHint];
pHint.lockCount ← 1;
pHint.written ← TRUE;
RETURN[pHint, pHint.pagePtr]
};
WriteLockedPage: PUBLIC PROC [pValidHint: CacheHandle] = {
IF pValidHint = NIL OR pValidHint.lockCount = 0 THEN
ERROR DBCommon.InternalError; -- [AlreadyUnlocked]
IF pValidHint.readonly THEN ERROR DB.Error[WriteNotAllowed];
pValidHint.written ← TRUE;
};
LockCount: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle] RETURNS [CARDINAL] = {
IF pValidHint = NIL OR pValidHint.dbPage # p OR pValidHint.lockCount = 0 THEN
ERROR DBCommon.InternalError; -- BadCacheHandle
RETURN[pValidHint.lockCount];
};
ObtainCoreLoc: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle] RETURNS [LONG POINTER] = {
IF pValidHint = NIL OR pValidHint.dbPage # p OR pValidHint.lockCount = 0 THEN
ERROR DBCommon.InternalError; -- [BadCacheHandle];
RETURN[pValidHint.pagePtr];
};
UnlockPage: PUBLIC PROC [pValidHint: CacheHandle] = {
IF pValidHint = NIL OR pValidHint.lockCount = 0 THEN
ERROR DBCommon.InternalError; -- [AlreadyUnlocked];
pValidHint.lockCount ← pValidHint.lockCount - 1;
};
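-- A hedged sketch of the locking discipline implied by the procedures above (the names h, ptr, p,
-- and cachedHint are illustrative, not part of the interface): a caller obtains a page with
-- ReadPage or WritePage, either of which raises the lock count; it may promote a read to a write
-- with WriteLockedPage; and it must balance every acquisition with a call to UnlockPage:
--   [h, ptr] ← ReadPage[p, cachedHint];    cachedHint may be NIL or stale, it is only a hint
--   ... examine the page through ptr ...
--   UnlockPage[h];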
WriteOutCache: PUBLIC PROC [s: Segment] = {
segmentID: SegmentID = SegmentIDFromSegment[s];
WritePageOutIfUsesTrans: PROC [cacheHandle: CacheHandle] = {
IF cacheHandle.written THEN {
thisSegment: SegmentID = InlineSegmentIDFromDBPage[cacheHandle.dbPage];
IF segmentID = thisSegment THEN WritePageOut[cacheHandle];
};
};
EnumerateHashTable[WritePageOutIfUsesTrans];
};
FlushCache: PUBLIC PROC [s: Segment] = {
segmentID: SegmentID = SegmentIDFromSegment[s];
DeleteIfUsesTrans: PROC [cacheHandle: CacheHandle] = {
IF cacheHandle.occupied THEN {
thisSegment: SegmentID = InlineSegmentIDFromDBPage[cacheHandle.dbPage];
IF segmentID = thisSegment THEN Delete[cacheHandle];
};
};
EnumerateLRUList[DeleteIfUsesTrans]
};
WritePageOut: PROC [cacheHandle: CacheHandle] = {
-- Send the contents of a dirty cache page to the database segment, and mark the page no longer dirty (leaving it in the cache).
readonly: BOOL; file: DBCommon.OpenFileHandle; page: CARDINAL;
DBStats.Inc[CacheWritePageBack];
[readonly, file, page] ← FilePageFromDBPage[cacheHandle.dbPage];
IF readonly THEN ERROR DB.Error[WriteNotAllowed];
DBFile.WriteFilePage[file, page, cacheHandle.pagePtr];
cacheHandle.written ← FALSE;
};