-- BTreeRead.mesa
-- Copyright © 1985, 1986 by Xerox Corporation. All rights reserved.
-- This module contains everything you need to open and read BTrees.
-- Taft, November 25, 1983 3:02 pm
-- Russ Atkinson (RRA) April 28, 1986 1:00:26 pm PDT
-- Bob Hagmann April 30, 1985 7:14:30 am PDT
-- Loose ends:
-- 1. Validate's repair of the entryCount is slightly flaky, since the tree is unlocked between the EnumerateEntries call that counts the entries and the assignment to entryCount.
DIRECTORY
BTree USING [Compare, CompareEntries, Comparison, Entry, EntrySize, Key, NewRecordFromEntry, PageNumber, PagePtr, PageSize, PageStorage, PageStoragePrimitives, Reason, Record, ReferencePage, ReferenceType, Relation, ReleasePage, RepresentationPrimitives, State, statePage, UpdateState],
BTreeInternal USING [AssignRefESR, BackUpOneEntry, BTreeEntrySize, BTreePagePtr, BugType, Compare, CompareEntries, entry0Offset, entry1Offset, EntryOrdinal, entryOverhead, EntrySize, EntryTable, GetDefaultPathStk, Heap, Lock, LockMode, NewRecordFromEntry, nilOffset, nilPage, OffsetTable, OffsetTableObject, PageOffset, pageOverhead, PathEntryLE, PathStk, PathStkEntry, PathStkIndex, PathStkObject, PathToMaxDescendant, ReferencePage, ReferenceStack, ReleasePage, RepairOffsets, reservedWordsPerPage, ReturnDefaultPathStk, sealValue, Tree, TreeObject, TreeState, Unlock, WriteStatePage],
File USING [Error, wordsPerPage],
PrincOpsUtils USING [BITSHIFT, LongCopy, LongZero];
BTreeRead: MONITOR
LOCKS tree USING tree: Tree
IMPORTS BTreeInternal, File, PrincOpsUtils
EXPORTS BTree, BTreeInternal
= { OPEN BTree, BTreeInternal;
-- Shorthand for LONG CARDINAL; used in this module for entry counts, path-stack version stamps and the statistics counters.
CARD: TYPE = LONG CARDINAL;
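-- BTree. (The PUBLIC items that follow, up to the BTreeInternal heading, are this module's exports to the BTree interface.)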
-- Tree and PathStk are REFs to the object types declared in BTreeInternal.
-- The PUBLIC TYPE declarations bind the exported type names TreeObject and PathStkObject to those BTreeInternal representations.
Tree: TYPE = REF TreeObject;
TreeObject: PUBLIC TYPE = BTreeInternal.TreeObject;
PathStk: TYPE = REF PathStkObject;
PathStkObject: PUBLIC TYPE = BTreeInternal.PathStkObject;
New: PUBLIC SAFE PROC [repPrim: RepresentationPrimitives, storPrim: PageStoragePrimitives, minEntrySize: CARDINAL ← 1, initialState: State[closed..suspended] ← closed] RETURNS [tree: Tree] = CHECKED {
  -- Allocates a fresh TreeObject that remembers the client's representation and storage primitives and the minimum entry size.
  -- The object starts in initialState (closed or suspended); no storage is attached until Open is called.
  RETURN [NEW [TreeObject ← [state: [], repPrim: repPrim, storPrim: storPrim, minEntrySize: minEntrySize, objectState: initialState]]];
  };
Open: PUBLIC SAFE PROC [tree: Tree, storage: PageStorage, pageSize: PageSize ← File.wordsPerPage, initialize: BOOL ← FALSE, maintainRecomputableState: BOOL ← TRUE] = TRUSTED {
  -- Attaches storage to the tree and brings it to the open state.
  -- If the tree is already open it is first closed (via SetState) and the attempt is repeated.
  -- initialize = TRUE: writes a brand-new, zeroed state page.
  -- initialize = FALSE: reads the existing state page and validates its seal and page size.
  -- Raises Error[badSeal] or Error[wrongPageSize] if the existing state page does not match.
  -- Raises SIGNAL UpdateInProgress if the stored state says an update was in progress; if the client resumes the signal, the flag is cleared in the in-memory state and the open proceeds.
  DO
    -- first ensure that it's not already open
    tree.Lock[update, FALSE];
    IF tree.objectState#open THEN EXIT;
    tree.Unlock[];
    SetState[tree, closed];
    ENDLOOP;
  -- The tree is now locked in update mode and is not open. Derive the page-size-dependent parameters.
  tree.state ← [pageSize: pageSize];
  tree.maxFreeWords ← pageSize-pageOverhead;
  tree.maxRecordsPerPage ← tree.maxFreeWords/(tree.minEntrySize+entryOverhead);
  tree.awfullyFull ← (9*tree.maxFreeWords)/10;
  tree.prettyFull ← (2*tree.maxFreeWords)/3;
  tree.fairlyFull ← tree.maxFreeWords/2;
  tree.breathingSpace ← tree.maxFreeWords-tree.awfullyFull;
  tree.storage ← storage;
  tree.maintainRecomputableState ← maintainRecomputableState;
  tree.id ← (treeID ← treeID+1); -- this doesn't assign a reliably unique ID, since treeID is not protected by any monitor; but it doesn't really matter since the ID is merely protecting against client blunders anyway.
  {
  ENABLE UNWIND => tree.Unlock[];
  IF initialize
    THEN {
      statePtr: LONG POINTER TO TreeState ← tree.ReferencePage[statePage, new];
      PrincOpsUtils.LongZero[statePtr, pageSize];
      IF ~maintainRecomputableState THEN tree.state.entryCount ← LAST[CARD];
      statePtr^ ← tree.state;
      tree.ReleasePage[statePage, endOfUpdate];
      }
    ELSE {
      statePtr: LONG POINTER TO TreeState ← tree.ReferencePage[statePage, read];
      -- Release the state page before raising: the UNWIND handler only unlocks the tree, so raising with the page still referenced would leak the reference.
      IF statePtr.seal#sealValue THEN {
        tree.ReleasePage[statePage];
        ERROR Error[badSeal];
        };
      IF statePtr.pageSize#pageSize THEN {
        tree.ReleasePage[statePage];
        ERROR Error[wrongPageSize];
        };
      tree.state ← statePtr^;
      tree.ReleasePage[statePage];
      IF tree.state.updateInProgress THEN SIGNAL UpdateInProgress;
      };
  tree.state.updateInProgress ← FALSE;
  };
  tree.objectState ← open;
  tree.Unlock[];
  };
SetState: PUBLIC SAFE PROC [tree: Tree, state: State[closed..suspended]] = TRUSTED {
  -- Moves the tree to the closed or suspended state and drops its storage handle and cached scratch structures.
  -- Waits until no long update is in progress on an open tree before changing the state.
  DO
    WaitUnlocked: ENTRY PROC [tree: Tree] = INLINE {
      IF tree.longUpdate THEN WAIT tree.unlocked};
    tree.Lock[update, FALSE];
    IF tree.objectState#open OR ~tree.longUpdate THEN EXIT;
    tree.Unlock[];
    WaitUnlocked[tree];
    -- this actually wakes up every time the tree is unlocked, including the call to Unlock from SetUpdateInProgress
    ENDLOOP;
  -- Tree is now locked in update mode with no update in progress
  tree.objectState ← state;
  tree.storage ← NIL;
  tree.offsetTable ← NIL;
  tree.defaultPathStk ← NIL;
  tree.entryTable ← NIL;
  tree.heap ← NIL;
  tree.Unlock[];
  };
GetState: PUBLIC ENTRY SAFE PROC [tree: Tree] RETURNS [state: State, entryCount: CARD, greatestPage: PageNumber, depth: CARDINAL, storage: PageStorage] = CHECKED {
  -- Returns a snapshot of the tree's object state, entry count, greatest page number, depth and storage handle.
  -- Being an ENTRY procedure, it reads all five fields under the tree's monitor lock.
  state ← tree.objectState;
  entryCount ← tree.state.entryCount;
  greatestPage ← tree.state.greatestPage;
  depth ← tree.state.depth;
  storage ← tree.storage;
  };
Validate: PUBLIC SAFE PROC [tree: Tree] = TRUSTED {
  -- Enumerates every entry in the tree, checking that each entry fits in a page and that entries are in strictly ascending order.
  -- Raises Error[entrySizesWrong] or Error[entriesOutOfOrder] on the first violation.
  -- If the tree maintains recomputable state and the counted number of entries differs from the recorded entryCount, the recorded count is corrected and the state page rewritten.
  -- Caveat: the tree is unlocked between the enumeration and the correction, so the repaired count can be stale if another client updates the tree in between.
  CheckPair: PROC [entry: Entry] RETURNS [continue: BOOL] = {
    -- Called once per entry: checks its size, compares it with the previous entry, then remembers it as the new previous entry.
    size: CARDINAL = tree.EntrySize[entry];
    IF size+entryOverhead > tree.maxFreeWords THEN ERROR Error[entrySizesWrong];
    entryCount ← entryCount+1;
    IF firstEntry
      THEN firstEntry ← FALSE
      ELSE IF tree.CompareEntries[prevEntry, entry]#less THEN
        ERROR Error[entriesOutOfOrder];
    PrincOpsUtils.LongCopy[to: prevEntry, from: entry, nwords: size];
    RETURN [TRUE];
    };
  PrevRecord: TYPE = RECORD [entry: SEQUENCE maxLength: [0..LAST[PageSize]] OF WORD];
  -- Scratch buffer holding a copy of the previously enumerated entry.
  prevRecord: REF PrevRecord = NEW[PrevRecord[tree.state.pageSize-reservedWordsPerPage]];
  prevEntry: Entry = BASE[DESCRIPTOR[prevRecord.entry]];
  entryCount: CARD ← 0;
  firstEntry: BOOL ← TRUE;
  [] ← EnumerateEntries[tree: tree, key: NIL, Proc: CheckPair];
  IF tree.maintainRecomputableState AND entryCount#tree.state.entryCount THEN {
    tree.Lock[update];
    tree.state.entryCount ← entryCount;
    tree.WriteStatePage[endOfUpdate];
    tree.Unlock[];
    };
  };
ReadRecord: PUBLIC SAFE PROC [tree: Tree, key: Key, relation: Relation ← equal, pathStk: PathStk ← NIL, useExistingPath: BOOL ← FALSE] RETURNS [record: Record] = TRUSTED {
  -- Looks up the entry bearing the given relation to key (via ReadEntry) and returns it converted to a Record by the client's NewRecordFromEntry.
  -- Returns NIL if no such entry exists.
  EntryProc: PROC [entry: Entry] = {
    record ← IF entry#NIL THEN tree.NewRecordFromEntry[entry] ELSE NIL;
    };
  ReadEntry[tree: tree, key: key, relation: relation, pathStk: pathStk, useExistingPath: useExistingPath, Proc: EntryProc];
  };
EnumerateRecords: PUBLIC SAFE PROC [tree: Tree, key: Key, relation: Relation ← equal, pathStk: PathStk ← NIL, useExistingPath: BOOL ← FALSE, Proc: PROC [record: Record] RETURNS [continue: BOOL]] RETURNS [exhausted: BOOL] = TRUSTED {
  -- Enumerates entries in ascending order, starting at the entry selected by key and relation.
  -- Each entry is converted to a Record by the client's NewRecordFromEntry and passed to Proc; enumeration stops when Proc returns FALSE.
  -- Returns exhausted = TRUE if the end of the tree was reached, FALSE if Proc stopped the enumeration.
  EntryProc: PROC [entry: Entry] RETURNS [continue: BOOL] = {
    RETURN [Proc[tree.NewRecordFromEntry[entry]]];
    };
  -- relation must be forwarded explicitly; otherwise EnumerateEntries silently uses its own default (equal) and the caller's choice is ignored.
  exhausted ← EnumerateEntries[tree: tree, key: key, relation: relation, pathStk: pathStk, useExistingPath: useExistingPath, Proc: EntryProc];
  };
ReadEntry: PUBLIC PROC [tree: Tree, key: Key, relation: Relation ← equal, pathStk: PathStk ← NIL, useExistingPath: BOOL ← FALSE, Proc: UNSAFE PROC [entry: Entry]] = {
  -- Finds the single entry bearing the given relation to key and calls Proc with a pointer to it, or with NIL if there is none.
  -- The pointer refers directly into the referenced page and is valid only for the duration of the call to Proc.
  -- The tree is read-locked while Proc runs.
  -- Raises Error[nilPathStk] if useExistingPath is requested without supplying a pathStk.
  SELECT relation FROM
    less, lessEqual, equal => {
      pathStkWasNil: BOOL ← pathStk=NIL;
      tree.Lock[read];
      IF pathStkWasNil THEN {
        IF useExistingPath THEN {
          -- The UNWIND handler below is not yet in force, so give back the read lock before raising; otherwise the tree would stay read-locked forever.
          tree.Unlock[];
          ERROR Error[nilPathStk];
          };
        pathStk ← tree.GetDefaultPathStk[];
        };
      {
      -- Extra nesting required so that pathStkWasNil is visible in the catch phrase (yecch)!
      ENABLE UNWIND => {
        IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk];
        tree.Unlock[];
        };
      equal: BOOL ← tree.PathEntryLE[key: key, pathStk: pathStk, useExistingPath: useExistingPath].equal;
      -- PathEntryLE leaves the path at the greatest entry <= key; for "less" an exact match must be backed up by one entry.
      IF equal AND relation=less THEN [] ← PathToPreviousEntry[tree, pathStk];
      IF pathStk.top=0 OR (~equal AND relation=equal)
        THEN Proc[NIL]
        ELSE {
          pagePtr: BTreePagePtr;
          pse: LONG POINTER TO PathStkEntry;
          [ptr: pagePtr, pse: pse] ← tree.ReferenceStack[pathStk];
          Proc[@pagePtr[pse.lastOffset].entry !
            UNWIND => tree.ReleasePage[pse.pageNumber]];
          tree.ReleasePage[pse.pageNumber];
          };
      };
      IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk];
      tree.Unlock[];
      };
    greaterEqual, greater => {
      -- Enumerate forward from the entry <= key and hand the first entry that satisfies the relation to Proc.
      CallWhenRelationSatisfied: PROC [entry: Entry] RETURNS [continue: BOOL] = {
        IF key=NIL OR tree.Compare[key, entry] <= maxComparison
          THEN {Proc[entry]; RETURN [FALSE]}
          ELSE RETURN [TRUE];
        };
      maxComparison: Comparison ← IF relation=greaterEqual THEN equal ELSE less;
      exhausted: BOOL ← EnumerateEntries[tree: tree, key: key, pathStk: pathStk, useExistingPath: useExistingPath, Proc: CallWhenRelationSatisfied];
      IF exhausted THEN Proc[NIL];
      };
    ENDCASE => ERROR;
  };
EnumerateEntries: PUBLIC PROC [tree: Tree, key: Key, relation: Relation ← equal, pathStk: PathStk ← NIL, useExistingPath: BOOL ← FALSE, Proc: UNSAFE PROC [entry: Entry] RETURNS [continue: BOOL]] RETURNS [exhausted: BOOL] = {
  -- Calls Proc for successive entries in ascending order until Proc returns FALSE or the tree is exhausted.
  -- Starting point, relative to the greatest entry <= key found by PathEntryLE:
  --   less: the entry before an exact match, or the found entry itself if it is not an exact match
  --   lessEqual: the found entry
  --   equal, greaterEqual: the found entry if it matches exactly, otherwise the one after it
  --   greater: the entry after the found entry
  -- If key is less than every entry, enumeration starts at the first entry of the leftmost leaf.
  -- The entry pointer passed to Proc refers into the referenced page and is valid only during that call.
  -- The tree is read-locked for the whole enumeration.
  -- Returns TRUE if the enumeration ran off the end of the tree, FALSE if Proc stopped it.
  -- Raises Error[nilPathStk] if useExistingPath is requested without a pathStk, and Error[entrySizesWrong] if entry sizes overrun a page.
  pathStkWasNil: BOOL ← pathStk = NIL;
  tree.Lock[read];
  IF pathStkWasNil THEN {
    IF useExistingPath THEN {
      -- The UNWIND handler below is not yet in force, so give back the read lock before raising; otherwise the tree would stay read-locked forever.
      tree.Unlock[];
      ERROR Error[nilPathStk];
      };
    pathStk ← tree.GetDefaultPathStk[];
    };
  {
  -- Extra nesting required so that pathStkWasNil is visible in the catch phrase (yecch)!
  ENABLE UNWIND => {
    IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk];
    tree.Unlock[];
    };
  leafStkTop: PathStkIndex;
  pse: LONG POINTER TO PathStkEntry;
  pagePtr: BTreePagePtr ← NIL;
  -- End of the in-use portion of the page currently referenced through pagePtr.
  -- Declared outside the loop because its value must survive from one iteration to the next; it is recomputed whenever a page is (re)referenced.
  endOffset: PageOffset ← nilOffset;
  skipFirstEntry: BOOL ← FALSE;
  equal: BOOL;
  [equal: equal, depth: leafStkTop] ← tree.PathEntryLE[key: key, pathStk: pathStk, useExistingPath: useExistingPath];
  IF pathStk.top=0
    THEN {
      -- key is less than any existing entry in the tree. Set PathStk to first record in leftmost leaf page.
      IF leafStkTop=0 THEN GOTO exhausted; -- empty tree
      pathStk.top ← leafStkTop;
      pathStk.path[leafStkTop].lastOffset ← entry1Offset;
      }
    ELSE
      SELECT relation FROM
        less =>
          IF equal AND ~PathToPreviousEntry[tree, pathStk] THEN {
            pathStk.top ← leafStkTop;
            pathStk.path[leafStkTop].lastOffset ← entry1Offset;
            };
        lessEqual =>
          NULL;
        equal, greaterEqual =>
          skipFirstEntry ← ~equal;
        greater =>
          skipFirstEntry ← TRUE;
        ENDCASE;
  exhausted ← FALSE;
  pse ← @pathStk.path[pathStk.top];
  DO
    offset: PageOffset ← pse.lastOffset;
    IF pagePtr=NIL THEN {
      pagePtr ← tree.ReferencePage[pse.pageNumber];
      endOffset ← nilOffset+(tree.state.pageSize-pagePtr.freeWords);
      };
    IF offset>=endOffset
      THEN {
        -- ran off end of page. Pop up a level and do the next entry in the father page
        tree.ReleasePage[pse.pageNumber];
        IF offset>endOffset THEN ERROR Error[entrySizesWrong];
        pagePtr ← NIL;
        IF (pathStk.top ← pathStk.top-1) = 0 THEN {
          -- ran off end of tree. PathStk is no longer valid.
          pathStk.version ← LAST[CARD];
          GOTO exhausted;
          };
        pse ← pse - SIZE[PathStkEntry];
        }
      ELSE {
        -- enumerate this entry
        nxtPgNo: PageNumber ← pagePtr[offset].grPage;
        pse.offset ← offset + tree.BTreeEntrySize[@pagePtr[offset]];
        IF skipFirstEntry THEN skipFirstEntry ← FALSE
        ELSE IF ~Proc[@pagePtr[offset].entry !
          UNWIND => tree.ReleasePage[pse.pageNumber]] THEN EXIT;
        WHILE nxtPgNo#nilPage DO
          -- In a non-leaf page, find the leftmost right descendant of the entry we just did.
          tree.ReleasePage[pse.pageNumber];
          pathStk.top ← pathStk.top+1;
          pse ← pse + SIZE[PathStkEntry];
          pagePtr ← tree.ReferencePage[nxtPgNo];
          pse.pageNumber ← nxtPgNo;
          pse.nextToLastOffset ← nilOffset;
          pse.lastOffset ← entry0Offset;
          pse.offset ← entry1Offset;
          endOffset ← nilOffset+(tree.state.pageSize-pagePtr.freeWords);
          nxtPgNo ← pagePtr.minPage;
          ENDLOOP;
        };
    pse.nextToLastOffset ← pse.lastOffset;
    pse.lastOffset ← pse.offset;
    ENDLOOP;
  IF pagePtr#NIL THEN tree.ReleasePage[pse.pageNumber];
  EXITS
    exhausted => exhausted ← TRUE;
  };
  IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk];
  tree.Unlock[];
  };
SalvageEntries: PUBLIC PROC [tree: Tree, Proc: UNSAFE PROC [entry: Entry] RETURNS [continue: BOOL]] RETURNS [exhausted: BOOL] = {
  -- Scans every page after the state page, up to tree.state.greatestPage, ignoring the tree structure.
  -- Proc is called for each entry that looks plausible; entries are therefore not delivered in key order.
  -- A page is skipped if referencing it raises File.Error or if its freeWords value is out of range.
  -- Scanning within a page stops at the first entry whose size is implausible.
  -- The entry pointer passed to Proc refers into the referenced page and is valid only during that call.
  -- Returns TRUE if all pages were scanned, FALSE if Proc stopped the scan by returning FALSE.
  tree.Lock[read];
  FOR page: PageNumber IN [statePage+1 .. tree.state.greatestPage] DO
    ENABLE UNWIND => tree.Unlock[];
    pagePtr: BTreePagePtr;
    pagePtr ← tree.ReferencePage[page ! File.Error => LOOP];
    IF pagePtr.freeWords IN [0 .. CARDINAL[tree.state.pageSize-pageOverhead]] THEN {
      offset: PageOffset ← entry1Offset;
      endOffset: PageOffset = nilOffset+(tree.state.pageSize-pagePtr.freeWords);
      WHILE offset<endOffset DO
        entSize: CARDINAL = tree.BTreeEntrySize[@pagePtr[offset]];
        IF entSize<entryOverhead+tree.minEntrySize OR offset+entSize>endOffset THEN EXIT;
        -- The page must be released on every way out of this loop, including an abort requested by Proc and an error raised by Proc.
        IF ~Proc[@pagePtr[offset].entry ! UNWIND => tree.ReleasePage[page]] THEN {
          tree.ReleasePage[page];
          GOTO aborted;
          };
        offset ← offset+entSize;
        ENDLOOP;
      };
    tree.ReleasePage[page];
    REPEAT
      aborted => exhausted ← FALSE;
      FINISHED => exhausted ← TRUE;
    ENDLOOP;
  tree.Unlock[];
  };
NewPathStk: PUBLIC SAFE PROC RETURNS [pathStk: PathStk] = CHECKED {
  -- Allocates a path stack and stamps it with version LAST[CARD], which marks it as not valid for any tree version.
  fresh: PathStk = NEW[PathStkObject];
  fresh.version ← LAST[CARD]; -- not valid
  RETURN [fresh];
  };
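-- BTreeInternal. (The PUBLIC items that follow, up to the Private heading, are this module's exports to the BTreeInternal interface.)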
PathEntryLE: PUBLIC PROC [tree: Tree, key: Key, pathStk: PathStk, useExistingPath: BOOL ← FALSE] RETURNS [equal: BOOL, depth: PathStkIndex] = {
  -- Sets pathStk to describe the path from the root to the greatest entry less than or equal to key.
  -- equal is TRUE iff an entry equal to key was found.
  -- depth is the number of levels descended; on the fast path it is the leaf depth derived from the existing stack.
  -- pathStk.top is the level of the page containing the found entry, or 0 if no entry is <= key (including key = NIL).
  -- If useExistingPath is TRUE and the supplied pathStk is still valid for this tree and version, the existing path is reused when it can cheaply be shown to be correct for key.
  -- The caller is expected to hold the tree lock (all callers in this module do).
  BinSearchPage: PROC RETURNS [firstG: EntryOrdinal] = {
    -- Returns an index r such that offsetTable[r] is the offset of the first entry strictly greater than key, and fills in offsetTable[2] through offsetTable[r] with the offsets of all entries up to and including that one.
    firstBadOffset: PageOffset = nilOffset + (tree.state.pageSize - pagePtr.freeWords);
    midOffset: PageOffset ← LOOPHOLE[PrincOpsUtils.BITSHIFT[LOOPHOLE[firstBadOffset], -1]];
    curOffset: PageOffset ← entry1Offset;
    l, r: EntryOrdinal ← 2;
    -- Scan records linearly and build offsetTable until half the in-use portion of the BTree page has been examined. Then do one key comparison to determine which half of the page to direct our attention to, and build the rest of offsetTable only if necessary. On the average this saves 25% of the work of building offsetTables, and costs no extra comparison since the first comparison would examine a middle record anyway (not necessarily the same one).
    IF key#NIL THEN DO
      IF curOffset>=midOffset THEN {
        IF curOffset>=firstBadOffset OR tree.Compare[key, @pagePtr[curOffset].entry] = less THEN EXIT;
        midOffset ← firstBadOffset;
        l ← r;
        };
      curOffset ← curOffset + tree.BTreeEntrySize[@pagePtr[curOffset]];
      r ← r+1;
      offsetTable[r] ← curOffset;
      ENDLOOP;
    IF curOffset>firstBadOffset THEN ERROR Error[entrySizesWrong];
    -- At this point, offsetTable[r] is the offset of the first non-entry, which we assume to have an infinite key (which we shall never test). We shall leave r pointing at the offset of the first entry strictly greater than key.
    WHILE l<r DO
      m: EntryOrdinal ← PrincOpsUtils.BITSHIFT[l+r, -1];
      SELECT tree.Compare[key, @pagePtr[offsetTable[m]].entry] FROM
        less => r ← m;
        equal => {equal ← TRUE; RETURN [m+1]};
        greater => l ← m+1;
        ENDCASE;
      ENDLOOP;
    RETURN [r];
    };
  pse: LONG POINTER TO PathStkEntry;
  offsetTable: OffsetTable;
  curPage: PageNumber;
  pagePtr: BTreePagePtr;
  equal ← FALSE;
  IF useExistingPath AND pathStk.treeID=tree.id AND pathStk.version=tree.version AND key#NIL THEN {
    -- supplied PathStk is still valid. See if it is for the correct key.
    depth ← pathStk.top;
    IF depth=0 THEN GOTO dontUse; -- who knows whether this is the min key or not?
    WHILE pathStk.path[depth].leastSon # nilPage DO depth ← depth+1; ENDLOOP;
    [pse: pse, ptr: pagePtr] ← tree.ReferenceStack[pathStk];
    SELECT tree.Compare[key, @pagePtr[pse.lastOffset].entry] FROM
      less => useExistingPath ← FALSE; -- wrong key
      equal => equal ← TRUE; -- correct key
      greater => {
        IF depth=pathStk.top
          AND pse.offset < nilOffset+(tree.state.pageSize-pagePtr.freeWords)
          AND tree.Compare[key, @pagePtr[pse.offset].entry] = less
          THEN NULL -- key falls between this entry and next; ok
          ELSE useExistingPath ← FALSE; -- wrong key, or too hard to decide
        };
      ENDCASE;
    tree.ReleasePage[pse.pageNumber];
    IF useExistingPath THEN RETURN;
    EXITS dontUse => NULL;
    };
  -- Perform the normal BTree lookup
  pathStk.top ← 0;
  pathStk.path[0] ← [];
  offsetTable ← GetOffsetTable[tree];
  offsetTable[0] ← nilOffset;
  offsetTable[1] ← entry0Offset;
  offsetTable[2] ← entry1Offset;
  depth ← 0;
  pse ← @pathStk.path[0];
  curPage ← tree.state.rootPage;
  WHILE curPage#nilPage DO
    firstG: EntryOrdinal;
    pagePtr ← tree.ReferencePage[curPage];
    depth ← depth+1;
    pse ← pse+SIZE[PathStkEntry];
    pse.pageNumber ← curPage;
    firstG ← BinSearchPage[ !
      UNWIND => {tree.ReleasePage[curPage]; ReturnOffsetTable[tree, offsetTable]}];
    pse.offset ← offsetTable[firstG];
    pse.lastOffset ← offsetTable[firstG-1];
    pse.nextToLastOffset ← offsetTable[firstG-2];
    pse.leastSon ← pagePtr.minPage;
    IF firstG>2 THEN pathStk.top ← depth; -- real entry in this page
    AssignRefESR[@pse.eslFront, NIL];
    AssignRefESR[@pse.eslRear, NIL];
    curPage ← pagePtr[pse.lastOffset].grPage;
    tree.ReleasePage[pse.pageNumber];
    ENDLOOP;
  pathStk.treeID ← tree.id;
  pathStk.version ← tree.version;
  ReturnOffsetTable[tree, offsetTable];
  };
ReferenceStack: PUBLIC PROC [tree: Tree, pathStk: PathStk, type: ReferenceType ← read] RETURNS [pse: LONG POINTER TO PathStkEntry, ptr: PagePtr] = {
  -- Returns a pointer to the top path-stack entry together with a reference (of the requested type) to the page that entry names.
  -- The caller is responsible for the matching ReleasePage.
  topEntry: LONG POINTER TO PathStkEntry = @pathStk.path[pathStk.top];
  RETURN [pse: topEntry, ptr: tree.ReferencePage[topEntry.pageNumber, type]];
  };
BackUpOneEntry: PUBLIC PROC [tree: Tree, pse: LONG POINTER TO PathStkEntry] = {
-- Moves the path-stack entry back by one entry within its page: offset takes the old lastOffset, and lastOffset/nextToLastOffset are recomputed to match.
-- First make sure lastOffset is trustworthy before using it as the new offset.
tree.RepairOffsets[pse];
pse.offset ← pse.lastOffset;
-- Then bring lastOffset and nextToLastOffset back into agreement with the new offset.
tree.RepairOffsets[pse];
};
RepairOffsets: PUBLIC PROC [tree: Tree, pse: LONG POINTER TO PathStkEntry] = {
-- Ensures that pse.lastOffset and pse.nextToLastOffset are the offsets of the one and two entries preceding pse.offset on the page pse.pageNumber.
-- pse.offset is taken as correct; if the other two are already consistent with it, nothing is changed.
-- The page is referenced for the duration of the check and released before returning.
CheckProgression: PROC [offset1, offset2: PageOffset] RETURNS [ok: BOOL] = {
-- TRUE iff offset1 is the offset immediately preceding offset2:
-- entry0Offset is preceded by nilOffset, entry1Offset by entry0Offset, and any later offset by the one whose entry ends exactly at it.
RETURN [SELECT TRUE FROM
offset2=entry0Offset => offset1=nilOffset,
offset2=entry1Offset => offset1=entry0Offset,
offset1>=offset2 => FALSE,
ENDCASE => offset1+tree.BTreeEntrySize[@pagePtr[offset1]] = offset2];
};
pagePtr: BTreePagePtr = tree.ReferencePage[pse.pageNumber];
IF ~(CheckProgression[pse.lastOffset, pse.offset] AND CheckProgression[pse.nextToLastOffset, pse.lastOffset]) THEN {
-- Inconsistent: rescan the page from the first entry, stepping by entry size, until pse.offset is reached, remembering the two most recent offsets passed.
-- NOTE(review): the scan has no bound check; it presumably relies on pse.offset being an actual entry boundary on this page -- confirm.
newOffset: PageOffset ← entry1Offset;
lastOffset: PageOffset ← entry0Offset;
nextToLastOffset: PageOffset ← nilOffset;
WHILE newOffset#pse.offset DO
nextToLastOffset ← lastOffset;
lastOffset ← newOffset;
newOffset ← newOffset+tree.BTreeEntrySize[@pagePtr[newOffset]];
ENDLOOP;
pse.lastOffset ← lastOffset;
pse.nextToLastOffset ← nextToLastOffset;
};
tree.ReleasePage[pse.pageNumber];
};
PathToMaxDescendant: PUBLIC PROC [tree: Tree, pathStk: PathStk, page: PageNumber] = {
-- Extends pathStk downward from page, at each level selecting the last entry on the page and following its grPage link, until a nilPage link is reached.
-- Does nothing if page is nilPage.
pse: LONG POINTER TO PathStkEntry;
pagePtr: BTreePagePtr;
WHILE page#nilPage DO
pathStk.top ← pathStk.top+1;
pse ← @pathStk.path[pathStk.top];
pse.pageNumber ← page;
pagePtr ← tree.ReferencePage[page];
-- offset is set to the end of the in-use portion of the page, so lastOffset will name the last entry.
pse.offset ← nilOffset+(tree.state.pageSize-pagePtr.freeWords);
-- Placeholder values: consistent only if the page holds exactly one entry; otherwise RepairOffsets rescans the page to compute the real ones.
pse.lastOffset ← entry1Offset;
pse.nextToLastOffset ← entry0Offset;
tree.RepairOffsets[pse];
AssignRefESR[@pse.eslFront, NIL];
AssignRefESR[@pse.eslRear, NIL];
pse.leastSon ← nilPage;
-- Descend through the right link of the last entry on this page.
page ← pagePtr[pse.lastOffset].grPage;
tree.ReleasePage[pse.pageNumber];
ENDLOOP;
};
WriteStatePage: PUBLIC PROC [tree: Tree, update: UpdateState ← unchanged] = {
  -- Copies the in-memory tree state onto the state page and releases that page with the given update indication.
  storedState: LONG POINTER TO TreeState = tree.ReferencePage[statePage, write];
  storedState^ ← tree.state;
  tree.ReleasePage[statePage, update];
  };
Lock: PUBLIC ENTRY PROC [tree: Tree, mode: LockMode, checkState: BOOL ← TRUE] = {
  -- Acquires the tree's reader/writer lock in the given mode, waiting on tree.unlocked until it is available.
  -- lockCount > 0 counts the readers holding the lock, lockCount = -1 means one updater holds it, and 0 means it is free.
  -- With checkState = TRUE: raises Error[closed] if the tree is closed, and waits while the tree is suspended.
  -- With checkState = FALSE the object state is ignored, as Open and SetState require.
  DO
    SELECT tree.objectState FROM
      closed => IF checkState THEN RETURN WITH ERROR Error[closed];
      suspended => IF checkState THEN {WAIT tree.unlocked; LOOP};
      open => NULL
      ENDCASE;
    SELECT mode FROM
      read =>
        IF tree.lockCount>=0 THEN {tree.lockCount ← tree.lockCount+1; EXIT};
      update =>
        IF tree.lockCount=0 THEN {tree.lockCount ← -1; EXIT};
      ENDCASE;
    WAIT tree.unlocked; -- wait for tree to be unlocked
    ENDLOOP;
  };
Unlock: PUBLIC ENTRY PROC [tree: Tree] = {
  -- Releases one hold on the tree's reader/writer lock and wakes all waiters once the lock becomes free.
  -- A reader hold (lockCount > 0) is decremented; an updater hold (lockCount = -1) becomes 0 via the MAX.
  -- Raises Bug[treeNotLocked] if the tree was not locked at all.
  -- RETURN WITH ERROR (as in Lock) releases the monitor before the error is raised, so a handler that unwinds does not leave the tree's monitor locked.
  IF tree.lockCount=0 THEN RETURN WITH ERROR Bug[treeNotLocked];
  tree.lockCount ← MAX[tree.lockCount-1, 0];
  IF tree.lockCount=0 THEN BROADCAST tree.unlocked; -- BROADCAST, not NOTIFY, because releasing a write lock might make multiple readers eligible to enter simultaneously.
  };
GetHeapAndTable: PUBLIC ENTRY PROC [tree: Tree, pathStk: PathStk] = {
  -- Moves the tree's cached heap and entry table (if any) into pathStk, leaving the tree's cache empty.
  -- If no heap was cached, a fresh heap and entry table sized from tree.maxRecordsPerPage are allocated instead.
  -- The reuse/creation counters are updated accordingly.
  pathStk.heap ← tree.heap;
  pathStk.entryTable ← tree.entryTable;
  tree.heap ← NIL;
  tree.entryTable ← NIL;
  IF pathStk.heap=NIL
    THEN {
      pathStk.heap ← NEW[Heap[tree.maxRecordsPerPage+1]]; -- heap holds at most one page's worth of entries at any given time
      pathStk.entryTable ← NEW[EntryTable[3*tree.maxRecordsPerPage+1]]; -- may have to index scroll containing up to 3 pages' worth of entries
      heapsCreated ← heapsCreated+1;
      }
    ELSE heapsReused ← heapsReused+1;
  };
ReturnHeapAndTable: PUBLIC ENTRY PROC [tree: Tree, pathStk: PathStk] = {
-- Gives pathStk's heap and entry table back to the tree's one-slot cache, then clears them from pathStk.
-- If the tree already has a cached heap, the returned pair is simply dropped.
IF tree.heap=NIL THEN
{ tree.heap ← pathStk.heap; tree.entryTable ← pathStk.entryTable };
pathStk.heap ← NIL;
pathStk.entryTable ← NIL;
};
GetDefaultPathStk: PUBLIC ENTRY PROC [tree: Tree] RETURNS [pathStk: PathStk] = {
  -- Takes the tree's cached default path stack, or makes a new one if the cache is empty.
  -- Either way the result is stamped with version LAST[CARD], so it is not taken to be a valid existing path.
  pathStk ← tree.defaultPathStk;
  tree.defaultPathStk ← NIL;
  IF pathStk#NIL
    THEN { pathStk.version ← LAST [CARD]; pathStksReused ← pathStksReused+1 }
    ELSE { pathStk ← NewPathStk[]; pathStksCreated ← pathStksCreated+1 };
  };
ReturnDefaultPathStk: PUBLIC ENTRY PROC [tree: Tree, pathStk: PathStk] = {
-- Puts pathStk back into the tree's one-slot cache of default path stacks; if the slot is already occupied, pathStk is simply dropped.
IF tree.defaultPathStk=NIL THEN tree.defaultPathStk ← pathStk;
};
-- Raised for failures reported to clients; in this module the reasons used are badSeal, wrongPageSize, closed, nilPathStk, entrySizesWrong and entriesOutOfOrder.
Error: PUBLIC SAFE ERROR [reason: Reason] = CODE;
-- Raised as a SIGNAL by Open when the state page read from storage has updateInProgress set.
UpdateInProgress: PUBLIC SAFE SIGNAL = CODE;
-- Raised for internal inconsistencies; in this module only by Unlock, with type treeNotLocked.
Bug: PUBLIC ERROR [type: BugType] = CODE;
-- Private. (The items that follow are local to this module.)
PathToPreviousEntry: PROC [tree: Tree, pathStk: PathStk] RETURNS [ok: BOOL] = {
  -- Backs up the PathStk to refer to the previous entry in the tree. Returns TRUE normally, FALSE if there is no previous entry.
  -- When FALSE is returned, pathStk.top is 0.
  pagePtr: BTreePagePtr;
  pse: LONG POINTER TO PathStkEntry ← @pathStk.path[pathStk.top];
  IF pathStk.top=0 THEN RETURN [FALSE];
  tree.BackUpOneEntry[pse];
  [ptr: pagePtr, pse: pse] ← tree.ReferenceStack[pathStk];
  IF pagePtr[pse.lastOffset].grPage=nilPage
    THEN {
      -- this is a leaf page
      WHILE pse.lastOffset<entry1Offset DO
        -- hit beginning of current page: previous entry is left father
        tree.ReleasePage[pse.pageNumber];
        pathStk.top ← pathStk.top-1;
        IF pathStk.top=0 THEN RETURN [FALSE];
        [ptr: pagePtr, pse: pse] ← tree.ReferenceStack[pathStk];
        ENDLOOP;
      tree.ReleasePage[pse.pageNumber];
      }
    ELSE {
      -- this is not a leaf page: previous entry is rightmost left son
      page: PageNumber ← pagePtr[pse.lastOffset].grPage;
      tree.ReleasePage[pse.pageNumber];
      tree.PathToMaxDescendant[pathStk: pathStk, page: page];
      };
  RETURN [TRUE];
  };
GetOffsetTable: ENTRY PROC [tree: Tree] RETURNS [offsetTable: OffsetTable] = INLINE {
  -- Obtains the OffsetTable associated with the BTree, if it is available; otherwise makes a new one.
  -- The tree's cached table is removed while it is in use; ReturnOffsetTable puts it back.
  offsetTable ← tree.offsetTable;
  tree.offsetTable ← NIL;
  IF offsetTable#NIL
    THEN offsetTablesReused ← offsetTablesReused+1
    ELSE {
      offsetTable ← NEW[OffsetTableObject[tree.maxRecordsPerPage+2]];
      offsetTablesCreated ← offsetTablesCreated+1;
      };
  };
ReturnOffsetTable: ENTRY PROC [tree: Tree, offsetTable: OffsetTable] = INLINE {
  -- Gives back the OffsetTable.
  -- If the tree already has a cached table, the returned one is simply dropped.
  IF tree.offsetTable=NIL THEN tree.offsetTable ← offsetTable;
  };
-- Source of the tree IDs assigned by Open; incremented there without monitor protection.
treeID: CARDINAL ← 0;
-- Statistics: how often the cached scratch structures were reused versus freshly allocated.
offsetTablesReused, offsetTablesCreated: CARD ← 0;
heapsReused, heapsCreated: CARD ← 0;
pathStksReused, pathStksCreated: CARD ← 0;
}.
-- Bob Hagmann April 30, 1985 7:04:49 am PDT
-- Catch File.Error and skip those pages during SalvageEntries
-- changes to: SalvageEntries