-- BTreeRead.mesa
-- This module contains everything you need to open and read BTrees.
-- Last Edited by: Taft, November 25, 1983 3:02 pm
-- Loose ends:
-- 1. Validate's repair of the entryCount is slightly flaky, since the tree is unlocked between the EnumerateEntries call that counts the entries and the setting of the entryCount.
DIRECTORY
BTree,
BTreeInternal,
File USING [wordsPerPage],
PrincOpsUtils USING [BITSHIFT, LongCopy, LongZero];
BTreeRead: MONITOR LOCKS tree USING tree: Tree
IMPORTS BTreeInternal, PrincOpsUtils
EXPORTS BTree, BTreeInternal
= BEGIN OPEN BTree, BTreeInternal;
-- BTree.
-- Tree is a REF to the TreeObject whose concrete type is exported below.
Tree: TYPE = REF TreeObject;
-- Concrete representation of the exported TreeObject type, taken from BTreeInternal.
TreeObject: PUBLIC TYPE = BTreeInternal.TreeObject;
-- PathStk is a REF to the PathStkObject whose concrete type is exported below.
PathStk: TYPE = REF PathStkObject;
-- Concrete representation of the exported PathStkObject type, taken from BTreeInternal.
PathStkObject: PUBLIC TYPE = BTreeInternal.PathStkObject;
New: PUBLIC SAFE PROCEDURE [repPrim: RepresentationPrimitives, storPrim: PageStoragePrimitives, minEntrySize: CARDINAL ← 1, initialState: State[closed..suspended] ← closed] RETURNS [tree: Tree] = CHECKED
  -- Allocates a fresh TreeObject that records the client's representation and storage
  -- primitives and the minimum entry size, and starts out in initialState.
  -- The persistent state record is left at its defaults; Open fills it in.
  {
  RETURN [NEW [TreeObject ← [
    state: [],
    repPrim: repPrim,
    storPrim: storPrim,
    minEntrySize: minEntrySize,
    objectState: initialState]]];
  };
Open: PUBLIC SAFE PROCEDURE [tree: Tree, storage: PageStorage, pageSize: PageSize ← File.wordsPerPage, initialize: BOOLEAN ← FALSE, maintainRecomputableState: BOOLEAN ← TRUE] = TRUSTED
  -- Associates tree with storage and puts it in the open state.
  -- If the tree is already open it is first closed via SetState.
  -- initialize=TRUE writes a fresh state page; otherwise the existing state page is read
  -- and checked.
  -- Raises Error[badSeal] or Error[wrongPageSize] if the stored state page does not match,
  -- and SIGNALs UpdateInProgress if the stored state says an update was in progress
  -- (resuming the signal lets the open proceed).
  BEGIN
  DO -- first ensure that it's not already open
    tree.Lock[update, FALSE];
    IF tree.objectState#open THEN EXIT;
    tree.Unlock[];
    SetState[tree, closed];
    ENDLOOP;
  -- The tree is now update-locked and not open. Derive the fullness thresholds from pageSize.
  tree.state ← [pageSize: pageSize];
  tree.maxFreeWords ← pageSize-pageOverhead;
  tree.maxRecordsPerPage ← tree.maxFreeWords/(tree.minEntrySize+entryOverhead);
  tree.awfullyFull ← (9*tree.maxFreeWords)/10;
  tree.prettyFull ← (2*tree.maxFreeWords)/3;
  tree.fairlyFull ← tree.maxFreeWords/2;
  tree.breathingSpace ← tree.maxFreeWords-tree.awfullyFull;
  tree.storage ← storage;
  tree.maintainRecomputableState ← maintainRecomputableState;
  tree.id ← (treeID ← treeID+1); -- this doesn't assign a reliably unique ID, since treeID is not protected by any monitor; but it doesn't really matter since the ID is merely protecting against client blunders anyway.
  BEGIN ENABLE UNWIND => tree.Unlock[];
    IF initialize THEN
      BEGIN
      statePtr: LONG POINTER TO TreeState ← tree.ReferencePage[statePage, new];
      PrincOpsUtils.LongZero[statePtr, pageSize];
      -- An entryCount of LAST[LONG CARDINAL] is stored when the count is not being maintained.
      IF ~maintainRecomputableState THEN tree.state.entryCount ← LAST[LONG CARDINAL];
      statePtr^ ← tree.state;
      tree.ReleasePage[statePage, endOfUpdate];
      END
    ELSE BEGIN
      statePtr: LONG POINTER TO TreeState ← tree.ReferencePage[statePage, read];
      sealOK: BOOLEAN = (statePtr.seal=sealValue);
      sizeOK: BOOLEAN = (statePtr.pageSize=pageSize);
      IF sealOK AND sizeOK THEN tree.state ← statePtr^;
      -- Release the state page before raising any error, so that a failed Open
      -- does not leave the page referenced.
      tree.ReleasePage[statePage];
      IF ~sealOK THEN ERROR Error[badSeal];
      IF ~sizeOK THEN ERROR Error[wrongPageSize];
      IF tree.state.updateInProgress THEN SIGNAL UpdateInProgress;
      END;
    tree.state.updateInProgress ← FALSE;
    END;
  tree.objectState ← open;
  tree.Unlock[];
  END;
SetState: PUBLIC SAFE PROCEDURE [tree: Tree, state: State[closed..suspended]] = TRUSTED
  -- Moves the tree into the given non-open state (closed or suspended) and drops its
  -- references to the storage and to the cached work areas.
  -- Waits until the tree can be update-locked with no long update in progress.
  {
  WaitUnlocked: ENTRY PROCEDURE [tree: Tree] = INLINE
    { IF tree.longUpdate THEN WAIT tree.unlocked };
  DO
    tree.Lock[update, FALSE];
    IF ~(tree.objectState=open AND tree.longUpdate) THEN EXIT;
    tree.Unlock[];
    WaitUnlocked[tree]; -- this actually wakes up every time the tree is unlocked, including the call to Unlock from SetUpdateInProgress
    ENDLOOP;
  -- Tree is now locked in update mode with no update in progress
  tree.objectState ← state;
  tree.storage ← NIL;
  tree.offsetTable ← NIL;
  tree.defaultPathStk ← NIL;
  tree.entryTable ← NIL;
  tree.heap ← NIL;
  tree.Unlock[];
  };
GetState: PUBLIC ENTRY SAFE PROCEDURE [tree: Tree] RETURNS [state: State, entryCount: LONG CARDINAL, greatestPage: PageNumber, depth: CARDINAL, storage: PageStorage] = CHECKED
  -- Snapshot of the tree's object state, entry count, greatest page number, depth
  -- and storage, read under the monitor.
  BEGIN
  state ← tree.objectState;
  entryCount ← tree.state.entryCount;
  greatestPage ← tree.state.greatestPage;
  depth ← tree.state.depth;
  storage ← tree.storage;
  END;
Validate: PUBLIC SAFE PROCEDURE [tree: Tree] = TRUSTED
  -- Enumerates every entry in the tree, checking that each entry fits in a page and that
  -- entries appear in strictly ascending order.
  -- Raises Error[entrySizesWrong] or Error[entriesOutOfOrder] on a violation.
  -- If the tree maintains its recomputable state and the stored entryCount disagrees with
  -- the number of entries enumerated, the stored count is corrected and the state page rewritten.
  BEGIN
  CheckPair: PROCEDURE [entry: Entry] RETURNS [continue: BOOLEAN] =
    BEGIN
    size: CARDINAL = tree.EntrySize[entry];
    IF size+entryOverhead > tree.maxFreeWords THEN ERROR Error[entrySizesWrong];
    entryCount ← entryCount+1;
    IF firstEntry THEN firstEntry ← FALSE
    ELSE IF tree.CompareEntries[prevEntry, entry]#less THEN
      ERROR Error[entriesOutOfOrder];
    -- Remember this entry so that the next one can be compared against it.
    PrincOpsUtils.LongCopy[to: prevEntry, from: entry, nwords: size];
    RETURN [TRUE];
    END; -- CheckPair
  PrevRecord: TYPE = RECORD [entry: SEQUENCE maxLength: [0..LAST[PageSize]] OF WORD];
  prevRecord: REF PrevRecord = NEW[PrevRecord[tree.state.pageSize-reservedWordsPerPage]];
  prevEntry: Entry = BASE[DESCRIPTOR[prevRecord.entry]];
  entryCount: LONG CARDINAL ← 0;
  firstEntry: BOOLEAN ← TRUE;
  [] ← EnumerateEntries[tree: tree, key: NIL, Proc: CheckPair];
  IF tree.maintainRecomputableState AND entryCount#tree.state.entryCount THEN
    BEGIN
    tree.Lock[update];
    BEGIN ENABLE UNWIND => tree.Unlock[]; -- do not leave the tree update-locked if the write fails
      tree.state.entryCount ← entryCount;
      tree.WriteStatePage[endOfUpdate];
      END;
    tree.Unlock[];
    END;
  END;
ReadRecord: PUBLIC SAFE PROCEDURE [tree: Tree, key: Key, relation: Relation ← equal, pathStk: PathStk ← NIL, useExistingPath: BOOLEAN ← FALSE] RETURNS [record: Record] = TRUSTED
  -- Looks up the entry standing in the given relation to key and returns it converted to
  -- a Record by the NewRecordFromEntry operation, or NIL if ReadEntry finds no such entry.
  BEGIN
  EntryProc: PROCEDURE [entry: Entry] =
    { record ← IF entry#NIL THEN tree.NewRecordFromEntry[entry] ELSE NIL };
  ReadEntry[tree: tree, key: key, relation: relation, pathStk: pathStk, useExistingPath: useExistingPath, Proc: EntryProc];
  END;
EnumerateRecords: PUBLIC SAFE PROCEDURE [tree: Tree, key: Key, relation: Relation ← equal, pathStk: PathStk ← NIL, useExistingPath: BOOLEAN ← FALSE, Proc: PROCEDURE [record: Record] RETURNS [continue: BOOLEAN]] RETURNS [exhausted: BOOLEAN] = TRUSTED
  -- Record-level wrapper around EnumerateEntries: each enumerated entry is converted to a
  -- Record by the NewRecordFromEntry operation and handed to Proc.
  -- Returns whatever EnumerateEntries returns.
  BEGIN
  EntryProc: PROCEDURE [entry: Entry] RETURNS [continue: BOOLEAN] =
    { RETURN [Proc[tree.NewRecordFromEntry[entry]]] };
  -- The caller's relation must be forwarded; otherwise EnumerateEntries silently uses its default.
  exhausted ← EnumerateEntries[tree: tree, key: key, relation: relation, pathStk: pathStk, useExistingPath: useExistingPath, Proc: EntryProc];
  END;
ReadEntry: PUBLIC PROCEDURE [tree: Tree, key: Key, relation: Relation ← equal, pathStk: PathStk ← NIL, useExistingPath: BOOLEAN ← FALSE, Proc: UNSAFE PROCEDURE [entry: Entry]] =
  -- Finds the entry standing in the given relation to key and calls Proc with it while the
  -- tree is read-locked, or calls Proc[NIL] if there is no such entry.
  -- If pathStk is NIL the tree's default PathStk is borrowed for the duration of the call.
  -- Raises Error[nilPathStk] if useExistingPath is TRUE but no pathStk was supplied.
  BEGIN
  SELECT relation FROM
    less, lessEqual, equal =>
      BEGIN
      pathStkWasNil: BOOLEAN ← pathStk=NIL;
      tree.Lock[read];
      IF pathStkWasNil THEN
        BEGIN
        -- Release the read lock before complaining: we are not yet inside the UNWIND scope.
        IF useExistingPath THEN {tree.Unlock[]; ERROR Error[nilPathStk]};
        pathStk ← tree.GetDefaultPathStk[];
        END;
      -- Extra nesting required so that pathStkWasNil is visible in the catch phrase (yecch)!
      BEGIN ENABLE UNWIND =>
        { IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk]; tree.Unlock[] };
        equal: BOOLEAN;
        [equal: equal] ← tree.PathEntryLE[key: key, pathStk: pathStk, useExistingPath: useExistingPath];
        -- PathEntryLE lands on the last entry not greater than key; for a strict "less" step back past an exact match.
        IF equal AND relation=less THEN [] ← PathToPreviousEntry[tree, pathStk];
        IF pathStk.top=0 OR (~equal AND relation=equal) THEN Proc[NIL]
        ELSE BEGIN
          pagePtr: BTreePagePtr;
          pse: LONG POINTER TO PathStkEntry;
          [ptr: pagePtr, pse: pse] ← tree.ReferenceStack[pathStk];
          Proc[@pagePtr[pse.lastOffset].entry !
            UNWIND => tree.ReleasePage[pse.pageNumber]];
          tree.ReleasePage[pse.pageNumber];
          END;
        END;
      IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk];
      tree.Unlock[];
      END;
    greaterEqual, greater =>
      BEGIN
      -- Enumerate forward from key and stop at the first entry that satisfies the relation.
      CallWhenRelationSatisfied: PROCEDURE [entry: Entry] RETURNS [continue: BOOLEAN] =
        BEGIN
        IF key=NIL OR tree.Compare[key, entry] <= maxComparison THEN
          {Proc[entry]; RETURN [FALSE]}
        ELSE RETURN [TRUE];
        END;
      maxComparison: Comparison ← IF relation=greaterEqual THEN equal ELSE less;
      exhausted: BOOLEAN ← EnumerateEntries[tree: tree, key: key, pathStk: pathStk, useExistingPath: useExistingPath, Proc: CallWhenRelationSatisfied];
      IF exhausted THEN Proc[NIL];
      END;
    ENDCASE => ERROR;
  END;
EnumerateEntries: PUBLIC PROCEDURE [tree: Tree, key: Key, relation: Relation ← equal, pathStk: PathStk ← NIL, useExistingPath: BOOLEAN ← FALSE, Proc: UNSAFE PROCEDURE [entry: Entry] RETURNS [continue: BOOLEAN]] RETURNS [exhausted: BOOLEAN] =
  -- Calls Proc for successive entries in ascending order, starting at a position determined
  -- by key and relation, until Proc returns FALSE (result FALSE) or the entries run out
  -- (result TRUE). The tree is read-locked for the duration.
  -- If pathStk is NIL the tree's default PathStk is borrowed for the duration of the call.
  -- Raises Error[nilPathStk] if useExistingPath is TRUE but no pathStk was supplied, and
  -- Error[entrySizesWrong] if an entry runs past the in-use part of its page.
  BEGIN
  pathStkWasNil: BOOLEAN ← pathStk=NIL;
  tree.Lock[read];
  IF pathStkWasNil THEN
    BEGIN
    -- Release the read lock before complaining: we are not yet inside the UNWIND scope.
    IF useExistingPath THEN {tree.Unlock[]; ERROR Error[nilPathStk]};
    pathStk ← tree.GetDefaultPathStk[];
    END;
  -- Extra nesting required so that pathStkWasNil is visible in the catch phrase (yecch)!
  BEGIN ENABLE UNWIND =>
    { IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk]; tree.Unlock[] };
    leafStkTop: PathStkIndex;
    pse: LONG POINTER TO PathStkEntry;
    pagePtr: BTreePagePtr ← NIL;
    -- endOffset must survive from one loop iteration to the next (it is recomputed only when
    -- a page is newly referenced), so it is declared here rather than inside the loop body.
    endOffset: PageOffset ← nilOffset;
    skipFirstEntry: BOOLEAN ← FALSE;
    equal: BOOLEAN;
    [equal: equal, depth: leafStkTop] ← tree.PathEntryLE[key: key, pathStk: pathStk, useExistingPath: useExistingPath];
    IF pathStk.top=0 THEN
      BEGIN -- key is less than any existing entry in the tree. Set PathStk to first record in leftmost leaf page.
      IF leafStkTop=0 THEN GOTO exhausted; -- empty tree
      pathStk.top ← leafStkTop;
      pathStk.path[leafStkTop].lastOffset ← entry1Offset;
      END
    ELSE
      SELECT relation FROM
        less =>
          IF equal AND ~PathToPreviousEntry[tree, pathStk] THEN
            {pathStk.top ← leafStkTop; pathStk.path[leafStkTop].lastOffset ← entry1Offset};
        lessEqual =>
          NULL;
        equal, greaterEqual =>
          skipFirstEntry ← ~equal;
        greater =>
          skipFirstEntry ← TRUE;
        ENDCASE;
    exhausted ← FALSE;
    pse ← @pathStk.path[pathStk.top];
    DO
      offset: PageOffset ← pse.lastOffset;
      IF pagePtr=NIL THEN
        BEGIN
        pagePtr ← tree.ReferencePage[pse.pageNumber];
        endOffset ← nilOffset+(tree.state.pageSize-pagePtr.freeWords);
        END;
      IF offset>=endOffset THEN
        BEGIN -- ran off end of page. Pop up a level and do the next entry in the father page
        tree.ReleasePage[pse.pageNumber];
        IF offset>endOffset THEN ERROR Error[entrySizesWrong];
        pagePtr ← NIL;
        IF (pathStk.top ← pathStk.top-1) = 0 THEN
          BEGIN -- ran off end of tree. PathStk is no longer valid.
          pathStk.version ← LAST[LONG CARDINAL];
          GOTO exhausted;
          END;
        pse ← pse - SIZE[PathStkEntry];
        END
      ELSE BEGIN -- enumerate this entry
        nxtPgNo: PageNumber ← pagePtr[offset].grPage;
        pse.offset ← offset + tree.BTreeEntrySize[@pagePtr[offset]];
        IF skipFirstEntry THEN skipFirstEntry ← FALSE
        ELSE IF ~Proc[@pagePtr[offset].entry !
          UNWIND => tree.ReleasePage[pse.pageNumber]] THEN EXIT;
        WHILE nxtPgNo#nilPage DO
          -- In a non-leaf page, find the leftmost right descendant of the entry we just did.
          tree.ReleasePage[pse.pageNumber];
          pathStk.top ← pathStk.top+1;
          pse ← pse + SIZE[PathStkEntry];
          pagePtr ← tree.ReferencePage[nxtPgNo];
          pse.pageNumber ← nxtPgNo;
          pse.nextToLastOffset ← nilOffset;
          pse.lastOffset ← entry0Offset;
          pse.offset ← entry1Offset;
          endOffset ← nilOffset+(tree.state.pageSize-pagePtr.freeWords);
          nxtPgNo ← pagePtr.minPage;
          ENDLOOP;
        END;
      -- Advance the current stack entry so that lastOffset names the next entry to visit.
      pse.nextToLastOffset ← pse.lastOffset;
      pse.lastOffset ← pse.offset;
      ENDLOOP;
    -- Reached only via EXIT, i.e. Proc asked to stop; the current page may still be referenced.
    IF pagePtr#NIL THEN tree.ReleasePage[pse.pageNumber];
    EXITS
      exhausted => exhausted ← TRUE;
    END;
  IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk];
  tree.Unlock[];
  END;
SalvageEntries: PUBLIC PROCEDURE [tree: Tree, Proc: UNSAFE PROCEDURE [entry: Entry] RETURNS [continue: BOOLEAN]] RETURNS [exhausted: BOOLEAN] =
  -- Scans every page after the state page, up to tree.state.greatestPage, in page-number
  -- order and ignoring the tree structure. Proc is called for each entry whose size looks
  -- plausible; the scan of a page stops at the first implausible entry.
  -- Returns FALSE if Proc asked to stop, TRUE if all pages were scanned.
  -- The tree is read-locked for the duration.
  BEGIN
  tree.Lock[read];
  FOR page: PageNumber IN [statePage+1 .. tree.state.greatestPage] DO
    ENABLE UNWIND => tree.Unlock[];
    pagePtr: BTreePagePtr = tree.ReferencePage[page];
    -- Only look inside pages whose free-word count is within the possible range.
    IF pagePtr.freeWords IN [0 .. CARDINAL[tree.state.pageSize-pageOverhead]] THEN
      BEGIN
      offset: PageOffset ← entry1Offset;
      endOffset: PageOffset = nilOffset+(tree.state.pageSize-pagePtr.freeWords);
      WHILE offset<endOffset DO
        entSize: CARDINAL = tree.BTreeEntrySize[@pagePtr[offset]];
        IF entSize<entryOverhead+tree.minEntrySize OR offset+entSize>endOffset THEN EXIT;
        -- The page must be released on every way out of the loop body, including an early
        -- stop requested by Proc and an unwind through Proc.
        IF ~Proc[@pagePtr[offset].entry !
          UNWIND => tree.ReleasePage[page]] THEN
          {tree.ReleasePage[page]; GOTO aborted};
        offset ← offset+entSize;
        ENDLOOP;
      END;
    tree.ReleasePage[page];
    REPEAT
      aborted => exhausted ← FALSE;
      FINISHED => exhausted ← TRUE;
    ENDLOOP;
  tree.Unlock[];
  END;
NewPathStk: PUBLIC SAFE PROCEDURE RETURNS [pathStk: PathStk] = CHECKED
  -- Allocates a PathStkObject and stamps it with version LAST[LONG CARDINAL],
  -- which marks it as not valid.
  {
  fresh: PathStk = NEW[PathStkObject];
  fresh.version ← LAST[LONG CARDINAL]; -- not valid
  RETURN [fresh];
  };
-- BTreeInternal.
PathEntryLE: PUBLIC PROCEDURE [tree: Tree, key: Key, pathStk: PathStk, useExistingPath: BOOLEAN ← FALSE] RETURNS [equal: BOOLEAN, depth: PathStkIndex] =
  -- Sets up pathStk as the path from the root to the last entry that is not greater than key.
  -- equal is TRUE if an entry comparing equal to key was found.
  -- depth is the number of levels visited on the way down; pathStk.top is the deepest level
  -- at which the search position lies beyond a real entry, and is 0 if there is no such level.
  -- If useExistingPath is TRUE and pathStk is still valid for this tree and version, the
  -- existing path is reused when it can cheaply be shown to be the right one for key.
  BEGIN
  BinSearchPage: PROCEDURE RETURNS [firstG: EntryOrdinal] =
    -- Returns an index r such that offsetTable[r] is the offset of the first entry strictly
    -- greater than key, and fills in offsetTable[2] through offsetTable[r] with the offsets
    -- of all entries up to and including that one.
    BEGIN
    firstBadOffset: PageOffset = nilOffset + (tree.state.pageSize - pagePtr.freeWords);
    midOffset: PageOffset ← PrincOpsUtils.BITSHIFT[firstBadOffset, -1];
    curOffset: PageOffset ← entry1Offset;
    l, r: EntryOrdinal ← 2;
    -- Scan records linearly and build offsetTable until half the in-use portion of the BTree
    -- page has been examined. Then do one key comparison to determine which half of the page
    -- to direct our attention to, and build the rest of offsetTable only if necessary. On the
    -- average this saves 25% of the work of building offsetTables, and costs no extra
    -- comparison since the first comparison would examine a middle record anyway (not
    -- necessarily the same one).
    IF key#NIL THEN DO
      IF curOffset>=midOffset THEN
        BEGIN
        IF curOffset>=firstBadOffset OR tree.Compare[key, @pagePtr[curOffset].entry] = less THEN EXIT;
        midOffset ← firstBadOffset;
        l ← r;
        END;
      curOffset ← curOffset + tree.BTreeEntrySize[@pagePtr[curOffset]];
      r ← r+1;
      offsetTable[r] ← curOffset;
      ENDLOOP;
    IF curOffset>firstBadOffset THEN ERROR Error[entrySizesWrong];
    -- At this point, offsetTable[r] is the offset of the first non-entry, which we assume to
    -- have an infinite key (which we shall never test). We shall leave r pointing at the
    -- offset of the first entry strictly greater than key.
    WHILE l<r DO
      m: EntryOrdinal ← PrincOpsUtils.BITSHIFT[l+r, -1];
      SELECT tree.Compare[key, @pagePtr[offsetTable[m]].entry] FROM
        less => r ← m;
        equal => {equal ← TRUE; RETURN [m+1]};
        greater => l ← m+1;
        ENDCASE;
      ENDLOOP;
    RETURN [r];
    END; -- BinSearchPage
  pse: LONG POINTER TO PathStkEntry;
  offsetTable: OffsetTable;
  curPage: PageNumber;
  pagePtr: BTreePagePtr;
  equal ← FALSE;
  IF useExistingPath AND pathStk.treeID=tree.id AND pathStk.version=tree.version AND key#NIL THEN
    BEGIN -- supplied PathStk is still valid. See if it is for the correct key.
    depth ← pathStk.top;
    IF depth=0 THEN GOTO dontUse; -- who knows whether this is the min key or not?
    -- Recover the full depth of the recorded path by following leastSon links below top.
    WHILE pathStk.path[depth].leastSon # nilPage DO depth ← depth+1; ENDLOOP;
    [pse: pse, ptr: pagePtr] ← tree.ReferenceStack[pathStk];
    SELECT tree.Compare[key, @pagePtr[pse.lastOffset].entry] FROM
      less => useExistingPath ← FALSE; -- wrong key
      equal => equal ← TRUE; -- correct key
      greater =>
        BEGIN
        IF depth=pathStk.top AND pse.offset < nilOffset+(tree.state.pageSize-pagePtr.freeWords) AND tree.Compare[key, @pagePtr[pse.offset].entry]=less
          THEN NULL -- key falls between this entry and next; ok
          ELSE useExistingPath ← FALSE; -- wrong key, or too hard to decide
        END;
      ENDCASE;
    tree.ReleasePage[pse.pageNumber];
    IF useExistingPath THEN RETURN;
    EXITS
      dontUse => NULL;
    END;
  -- Perform the normal BTree lookup
  pathStk.top ← 0;
  pathStk.path[0] ← [];
  offsetTable ← GetOffsetTable[tree];
  offsetTable[0] ← nilOffset;
  offsetTable[1] ← entry0Offset;
  offsetTable[2] ← entry1Offset;
  depth ← 0;
  pse ← @pathStk.path[0];
  curPage ← tree.state.rootPage;
  WHILE curPage#nilPage DO
    firstG: EntryOrdinal;
    pagePtr ← tree.ReferencePage[curPage];
    depth ← depth+1;
    pse ← pse+SIZE[PathStkEntry];
    pse.pageNumber ← curPage;
    firstG ← BinSearchPage[ !
      UNWIND => {tree.ReleasePage[curPage]; ReturnOffsetTable[tree, offsetTable]}];
    pse.offset ← offsetTable[firstG];
    pse.lastOffset ← offsetTable[firstG-1];
    pse.nextToLastOffset ← offsetTable[firstG-2];
    pse.leastSon ← pagePtr.minPage;
    IF firstG>2 THEN pathStk.top ← depth; -- real entry in this page
    AssignRefESR[@pse.eslFront, NIL];
    AssignRefESR[@pse.eslRear, NIL];
    -- Descend through the page pointer that follows the entry just selected.
    curPage ← pagePtr[pse.lastOffset].grPage;
    tree.ReleasePage[pse.pageNumber];
    ENDLOOP;
  pathStk.treeID ← tree.id;
  pathStk.version ← tree.version;
  ReturnOffsetTable[tree, offsetTable];
  END;
ReferenceStack: PUBLIC PROCEDURE [tree: Tree, pathStk: PathStk, type: ReferenceType ← read] RETURNS [pse: LONG POINTER TO PathStkEntry, ptr: PagePtr] =
  -- Returns a pointer to the top entry of pathStk together with a reference to the page
  -- that entry names. The caller is responsible for the matching ReleasePage.
  {
  topEntry: LONG POINTER TO PathStkEntry = @pathStk.path[pathStk.top];
  RETURN [pse: topEntry, ptr: tree.ReferencePage[topEntry.pageNumber, type]];
  };
BackUpOneEntry: PUBLIC PROCEDURE [tree: Tree, pse: LONG POINTER TO PathStkEntry] =
  -- Moves the position recorded in pse back by one entry within its page.
  {
  -- Make sure lastOffset really is the predecessor of offset before stepping onto it.
  tree.RepairOffsets[pse];
  pse.offset ← pse.lastOffset;
  -- Re-derive lastOffset and nextToLastOffset for the new position.
  tree.RepairOffsets[pse];
  };
RepairOffsets: PUBLIC PROCEDURE [tree: Tree, pse: LONG POINTER TO PathStkEntry] =
  -- Checks that pse.lastOffset and pse.nextToLastOffset name the two entries immediately
  -- preceding pse.offset; if not, recomputes them by scanning the page from its first entry.
  {
  CheckProgression: PROCEDURE [offset1, offset2: PageOffset] RETURNS [ok: BOOLEAN] =
    -- TRUE iff offset1 is the position immediately preceding offset2.
    {
    IF offset2=entry0Offset THEN RETURN [offset1=nilOffset];
    IF offset2=entry1Offset THEN RETURN [offset1=entry0Offset];
    IF offset1>=offset2 THEN RETURN [FALSE];
    RETURN [offset1+tree.BTreeEntrySize[@pagePtr[offset1]] = offset2];
    }; -- CheckProgression
  pagePtr: BTreePagePtr = tree.ReferencePage[pse.pageNumber];
  IF ~CheckProgression[pse.lastOffset, pse.offset] OR ~CheckProgression[pse.nextToLastOffset, pse.lastOffset] THEN
    {
    -- Walk forward from the first entry, trailing two offsets behind, until pse.offset is reached.
    scan: PageOffset ← entry1Offset;
    prev: PageOffset ← entry0Offset;
    prevPrev: PageOffset ← nilOffset;
    UNTIL scan=pse.offset DO
      prevPrev ← prev;
      prev ← scan;
      scan ← scan+tree.BTreeEntrySize[@pagePtr[scan]];
      ENDLOOP;
    pse.lastOffset ← prev;
    pse.nextToLastOffset ← prevPrev;
    };
  tree.ReleasePage[pse.pageNumber];
  };
PathToMaxDescendant: PUBLIC PROCEDURE [tree: Tree, pathStk: PathStk, page: PageNumber] =
  -- Extends pathStk downward from page, at each level positioning on the last entry of the
  -- page and following its grPage link, until a nil page link is reached.
  {
  level: LONG POINTER TO PathStkEntry;
  sonPtr: BTreePagePtr;
  UNTIL page=nilPage DO
    pathStk.top ← pathStk.top+1;
    level ← @pathStk.path[pathStk.top];
    level.pageNumber ← page;
    sonPtr ← tree.ReferencePage[page];
    -- Point offset at the end of the in-use part of the page; the two trailing offsets are
    -- set to starting values and then corrected by RepairOffsets.
    level.offset ← nilOffset+(tree.state.pageSize-sonPtr.freeWords);
    level.lastOffset ← entry1Offset;
    level.nextToLastOffset ← entry0Offset;
    tree.RepairOffsets[level];
    AssignRefESR[@level.eslFront, NIL];
    AssignRefESR[@level.eslRear, NIL];
    level.leastSon ← nilPage;
    page ← sonPtr[level.lastOffset].grPage;
    tree.ReleasePage[level.pageNumber];
    ENDLOOP;
  };
WriteStatePage: PUBLIC PROCEDURE [tree: Tree, update: UpdateState ← unchanged] =
  -- Copies the in-memory tree.state record onto the state page and releases the page with
  -- the given update state.
  {
  onPage: LONG POINTER TO TreeState = tree.ReferencePage[statePage, write];
  onPage^ ← tree.state;
  tree.ReleasePage[statePage, update];
  };
Lock: PUBLIC ENTRY PROCEDURE [tree: Tree, mode: LockMode, checkState: BOOLEAN ← TRUE] =
  -- Acquires the tree lock in read or update mode, waiting on tree.unlocked as needed.
  -- lockCount counts the readers when it is positive and is set to -1 while the tree is
  -- update-locked.
  -- When checkState is TRUE, a closed tree raises Error[closed] and a suspended tree makes
  -- the caller wait. When checkState is FALSE the object state is ignored.
  BEGIN
  DO
    SELECT tree.objectState FROM
      closed => IF checkState THEN RETURN WITH ERROR Error[closed];
      suspended => IF checkState THEN {WAIT tree.unlocked; LOOP};
      open => NULL
      ENDCASE;
    SELECT mode FROM
      read =>
        IF tree.lockCount>=0 THEN {tree.lockCount ← tree.lockCount+1; EXIT};
      update =>
        IF tree.lockCount=0 THEN {tree.lockCount ← -1; EXIT};
      ENDCASE;
    WAIT tree.unlocked; -- wait for tree to be unlocked
    ENDLOOP;
  END;
Unlock: PUBLIC ENTRY PROCEDURE [tree: Tree] =
  -- Releases one hold on the tree lock: a reader decrements the count, and an updater
  -- (count -1) resets it to 0. Waiters are woken when the count reaches 0.
  -- Raises Bug[treeNotLocked] if the tree was not locked.
  BEGIN
  -- RETURN WITH ERROR, as in Lock, so that the monitor is released before the error is
  -- raised; a plain ERROR here would leave the tree's monitor held if the error were caught.
  IF tree.lockCount=0 THEN RETURN WITH ERROR Bug[treeNotLocked];
  tree.lockCount ← MAX[tree.lockCount-1, 0];
  IF tree.lockCount=0 THEN BROADCAST tree.unlocked; -- BROADCAST, not NOTIFY, because releasing a write lock might make multiple readers eligible to enter simultaneously.
  END;
GetHeapAndTable: PUBLIC ENTRY PROCEDURE [tree: Tree, pathStk: PathStk] =
  -- Gives pathStk a heap and an entry table: the pair cached in the tree if there is one,
  -- otherwise a newly allocated pair. The tree's cached pair is cleared either way.
  {
  pathStk.heap ← tree.heap;
  pathStk.entryTable ← tree.entryTable;
  tree.heap ← NIL;
  tree.entryTable ← NIL;
  IF pathStk.heap=NIL THEN
    {
    pathStk.heap ← NEW[Heap[tree.maxRecordsPerPage+1]]; -- heap holds at most one page's worth of entries at any given time
    pathStk.entryTable ← NEW[EntryTable[3*tree.maxRecordsPerPage+1]]; -- may have to index scroll containing up to 3 pages' worth of entries
    heapsCreated ← heapsCreated+1;
    }
  ELSE heapsReused ← heapsReused+1;
  };
ReturnHeapAndTable: PUBLIC ENTRY PROCEDURE [tree: Tree, pathStk: PathStk] =
  -- Hands pathStk's heap and entry table back to the tree if the tree has none cached,
  -- and in any case detaches them from pathStk.
  {
  IF tree.heap=NIL THEN
    BEGIN
    tree.entryTable ← pathStk.entryTable;
    tree.heap ← pathStk.heap;
    END;
  pathStk.entryTable ← NIL;
  pathStk.heap ← NIL;
  };
GetDefaultPathStk: PUBLIC ENTRY PROCEDURE [tree: Tree] RETURNS [pathStk: PathStk] =
  -- Takes the tree's cached default PathStk if there is one, marking it not valid;
  -- otherwise creates a new one. The tree's cache slot is emptied either way.
  {
  cached: PathStk = tree.defaultPathStk;
  tree.defaultPathStk ← NIL;
  IF cached#NIL THEN
    {
    cached.version ← LAST [LONG CARDINAL];
    pathStksReused ← pathStksReused+1;
    RETURN [cached];
    };
  pathStk ← NewPathStk[];
  pathStksCreated ← pathStksCreated+1;
  };
ReturnDefaultPathStk: PUBLIC ENTRY PROCEDURE [tree: Tree, pathStk: PathStk] =
  -- Puts pathStk into the tree's default slot unless that slot is already occupied.
  BEGIN
  IF tree.defaultPathStk#NIL THEN RETURN;
  tree.defaultPathStk ← pathStk;
  END;
-- Raised in this module with reasons closed, badSeal, wrongPageSize, nilPathStk, entrySizesWrong and entriesOutOfOrder.
Error: PUBLIC SAFE ERROR [reason: Reason] = CODE;
-- Raised as a SIGNAL by Open when the stored tree state has updateInProgress set.
UpdateInProgress: PUBLIC SAFE SIGNAL = CODE;
-- Raised in this module by Unlock with type treeNotLocked.
Bug: PUBLIC ERROR [type: BugType] = CODE;
-- Private
PathToPreviousEntry: PROCEDURE [tree: Tree, pathStk: PathStk] RETURNS [ok: BOOLEAN] =
  -- Backs up the PathStk to refer to the previous entry in the tree.
  -- Returns TRUE normally, FALSE if there is no previous entry.
  {
  curPtr: BTreePagePtr;
  cur: LONG POINTER TO PathStkEntry ← @pathStk.path[pathStk.top];
  sonPage: PageNumber;
  IF pathStk.top=0 THEN RETURN [FALSE];
  tree.BackUpOneEntry[cur];
  [ptr: curPtr, pse: cur] ← tree.ReferenceStack[pathStk];
  sonPage ← curPtr[cur.lastOffset].grPage;
  IF sonPage#nilPage THEN
    { -- this is not a leaf page: previous entry is rightmost left son
    tree.ReleasePage[cur.pageNumber];
    tree.PathToMaxDescendant[pathStk: pathStk, page: sonPage];
    RETURN [TRUE];
    };
  -- this is a leaf page
  UNTIL cur.lastOffset>=entry1Offset DO
    -- hit beginning of current page: previous entry is left father
    tree.ReleasePage[cur.pageNumber];
    pathStk.top ← pathStk.top-1;
    IF pathStk.top=0 THEN RETURN [FALSE];
    [ptr: curPtr, pse: cur] ← tree.ReferenceStack[pathStk];
    ENDLOOP;
  tree.ReleasePage[cur.pageNumber];
  RETURN [TRUE];
  };
GetOffsetTable: ENTRY PROCEDURE [tree: Tree] RETURNS [offsetTable: OffsetTable] =
  -- Obtains the OffsetTable associated with the BTree, if it is available;
  -- otherwise makes a new one. The tree's slot is emptied either way.
  {
  cached: OffsetTable = tree.offsetTable;
  tree.offsetTable ← NIL;
  IF cached=NIL THEN
    {
    offsetTable ← NEW[OffsetTableObject[tree.maxRecordsPerPage+2]];
    offsetTablesCreated ← offsetTablesCreated+1;
    }
  ELSE
    {
    offsetTable ← cached;
    offsetTablesReused ← offsetTablesReused+1;
    };
  };
ReturnOffsetTable: ENTRY PROCEDURE [tree: Tree, offsetTable: OffsetTable] =
  -- Gives back the OffsetTable; it is kept only if the tree has none cached.
  BEGIN
  IF tree.offsetTable#NIL THEN RETURN;
  tree.offsetTable ← offsetTable;
  END;
-- Source of the ids that Open assigns to trees; incremented there without monitor protection.
treeID: CARDINAL ← 0;
-- Counters bumped by GetOffsetTable.
offsetTablesReused, offsetTablesCreated: LONG CARDINAL ← 0;
-- Counters bumped by GetHeapAndTable.
heapsReused, heapsCreated: LONG CARDINAL ← 0;
-- Counters bumped by GetDefaultPathStk.
pathStksReused, pathStksCreated: LONG CARDINAL ← 0;
END.