BTree.
Tree: TYPE = REF TreeObject;
TreeObject: PUBLIC TYPE = BTreeInternal.TreeObject;
PathStk: TYPE = REF PathStkObject;
PathStkObject: PUBLIC TYPE = BTreeInternal.PathStkObject;
New:
PUBLIC
SAFE
PROCEDURE [repPrim: RepresentationPrimitives, storPrim: PageStoragePrimitives, minEntrySize:
CARDINAL ← 1, initialState: State[closed..suspended] ← closed]
RETURNS [tree: Tree] =
CHECKED
BEGIN
tree ← NEW [TreeObject ← [state: [], repPrim: repPrim, storPrim: storPrim, minEntrySize: minEntrySize, objectState: initialState]];
END;
Open:
PUBLIC
SAFE
PROCEDURE [tree: Tree, storage: PageStorage, pageSize: PageSize ← File.wordsPerPage, initialize:
BOOLEAN ←
FALSE, maintainRecomputableState:
BOOLEAN ←
TRUE] =
TRUSTED
BEGIN
DO
-- first ensure that it's not already open
tree.Lock[update, FALSE];
IF tree.objectState#open THEN EXIT;
tree.Unlock[];
SetState[tree, closed];
ENDLOOP;
tree.state ← [pageSize: pageSize];
tree.maxFreeWords ← pageSize-pageOverhead;
tree.maxRecordsPerPage ← tree.maxFreeWords/(tree.minEntrySize+entryOverhead);
tree.awfullyFull ← (9*tree.maxFreeWords)/10;
tree.prettyFull ← (2*tree.maxFreeWords)/3;
tree.fairlyFull ← tree.maxFreeWords/2;
tree.breathingSpace ← tree.maxFreeWords-tree.awfullyFull;
tree.storage ← storage;
tree.maintainRecomputableState ← maintainRecomputableState;
tree.id ← (treeID ← treeID+1); -- this doesn't assign a reliably unique ID, since treeID is not protected by any monitor; but it doesn't really matter since the ID is merely protecting against client blunders anyway.
BEGIN
ENABLE
UNWIND => tree.Unlock[];
IF initialize
THEN
BEGIN
statePtr: LONG POINTER TO TreeState ← tree.ReferencePage[statePage, new];
PrincOpsUtils.LongZero[statePtr, pageSize];
IF ~maintainRecomputableState THEN tree.state.entryCount ← LAST[LONG CARDINAL];
statePtr^ ← tree.state;
tree.ReleasePage[statePage, endOfUpdate];
END
ELSE
BEGIN
statePtr: LONG POINTER TO TreeState ← tree.ReferencePage[statePage, read];
IF statePtr.seal#sealValue THEN ERROR Error[badSeal];
IF statePtr.pageSize#pageSize THEN ERROR Error[wrongPageSize];
tree.state ← statePtr^;
tree.ReleasePage[statePage];
IF tree.state.updateInProgress THEN SIGNAL UpdateInProgress;
END;
tree.state.updateInProgress ← FALSE;
END;
tree.objectState ← open;
tree.Unlock[];
END;
SetState:
PUBLIC
SAFE
PROCEDURE [tree: Tree, state: State[closed..suspended]] =
TRUSTED
BEGIN
DO
WaitUnlocked:
ENTRY
PROCEDURE [tree: Tree] =
INLINE
{IF tree.longUpdate THEN WAIT tree.unlocked};
tree.Lock[update, FALSE];
IF tree.objectState#open OR ~tree.longUpdate THEN EXIT;
tree.Unlock[];
WaitUnlocked[tree]; -- this actually wakes up every time the tree is unlocked, including the call to Unlock from SetUpdateInProgress
ENDLOOP;
Tree is now locked in update mode with no update in progress
tree.objectState ← state;
tree.storage ← NIL;
tree.offsetTable ← NIL;
tree.defaultPathStk ← NIL;
tree.entryTable ← NIL;
tree.heap ← NIL;
tree.Unlock[];
END;
GetState:
PUBLIC ENTRY
SAFE
PROCEDURE [tree: Tree]
RETURNS [state: State, entryCount:
LONG
CARDINAL, greatestPage: PageNumber, depth:
CARDINAL, storage: PageStorage] =
CHECKED
{ RETURN [state: tree.objectState, entryCount: tree.state.entryCount, greatestPage: tree.state.greatestPage, depth: tree.state.depth, storage: tree.storage] };
Validate:
PUBLIC
SAFE
PROCEDURE [tree: Tree] =
TRUSTED
BEGIN
CheckPair:
PROCEDURE [entry: Entry]
RETURNS [continue:
BOOLEAN] =
BEGIN
size: CARDINAL = tree.EntrySize[entry];
IF size+entryOverhead > tree.maxFreeWords THEN ERROR Error[entrySizesWrong];
entryCount ← entryCount+1;
IF firstEntry THEN firstEntry ← FALSE
ELSE
IF tree.CompareEntries[prevEntry, entry]#less
THEN
ERROR Error[entriesOutOfOrder];
PrincOpsUtils.LongCopy[to: prevEntry, from: entry, nwords: size];
RETURN [TRUE];
END; -- CheckPair
PrevRecord: TYPE = RECORD [entry: SEQUENCE maxLength: [0..LAST[PageSize]] OF WORD];
prevRecord: REF PrevRecord = NEW[PrevRecord[tree.state.pageSize-reservedWordsPerPage]];
prevEntry: Entry = BASE[DESCRIPTOR[prevRecord.entry]];
entryCount: LONG CARDINAL ← 0;
firstEntry: BOOLEAN ← TRUE;
[] ← EnumerateEntries[tree: tree, key: NIL, Proc: CheckPair];
IF tree.maintainRecomputableState
AND entryCount#tree.state.entryCount
THEN
BEGIN
tree.Lock[update];
tree.state.entryCount ← entryCount;
tree.WriteStatePage[endOfUpdate];
tree.Unlock[];
END;
END;
ReadRecord:
PUBLIC
SAFE
PROCEDURE [tree: Tree, key: Key, relation: Relation ← equal, pathStk: PathStk ←
NIL, useExistingPath:
BOOLEAN ←
FALSE]
RETURNS [record: Record] =
TRUSTED
BEGIN
EntryProc:
PROCEDURE [entry: Entry] =
{ record ← IF entry#NIL THEN tree.NewRecordFromEntry[entry] ELSE NIL };
ReadEntry[tree: tree, key: key, relation: relation, pathStk: pathStk, useExistingPath: useExistingPath, Proc: EntryProc];
END;
EnumerateRecords:
PUBLIC
SAFE
PROCEDURE [tree: Tree, key: Key, relation: Relation ← equal, pathStk: PathStk ←
NIL, useExistingPath:
BOOLEAN ←
FALSE, Proc:
PROCEDURE [record: Record]
RETURNS [continue:
BOOLEAN]]
RETURNS [exhausted:
BOOLEAN] =
TRUSTED
BEGIN
EntryProc:
PROCEDURE [entry: Entry]
RETURNS [continue:
BOOLEAN] =
{ RETURN [Proc[tree.NewRecordFromEntry[entry]]] };
exhausted ← EnumerateEntries[tree: tree, key: key, pathStk: pathStk, useExistingPath: useExistingPath, Proc: EntryProc];
END;
ReadEntry:
PUBLIC
PROCEDURE [tree: Tree, key: Key, relation: Relation ← equal, pathStk: PathStk ←
NIL, useExistingPath:
BOOLEAN ←
FALSE, Proc:
UNSAFE
PROCEDURE [entry: Entry]] =
BEGIN
SELECT relation
FROM
less, lessEqual, equal =>
BEGIN
pathStkWasNil: BOOLEAN ← pathStk=NIL;
tree.Lock[read];
IF pathStkWasNil
THEN
BEGIN
IF useExistingPath THEN ERROR Error[nilPathStk];
pathStk ← tree.GetDefaultPathStk[];
END;
Extra nesting required so that pathStkWasNil is visible in the catch phrase (yecch)!
BEGIN
ENABLE
UNWIND =>
{ IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk]; tree.Unlock[] };
equal: BOOLEAN;
[equal: equal] ← tree.PathEntryLE[key: key, pathStk: pathStk, useExistingPath: useExistingPath];
IF equal AND relation=less THEN [] ← PathToPreviousEntry[tree, pathStk];
IF pathStk.top=0 OR (~equal AND relation=equal) THEN Proc[NIL]
ELSE
BEGIN
pagePtr: BTreePagePtr;
pse: LONG POINTER TO PathStkEntry;
[ptr: pagePtr, pse: pse] ← tree.ReferenceStack[pathStk];
Proc[@pagePtr[pse.lastOffset].entry !
UNWIND => tree.ReleasePage[pse.pageNumber]];
tree.ReleasePage[pse.pageNumber];
END;
END;
IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk];
tree.Unlock[];
END;
greaterEqual, greater =>
BEGIN
CallWhenRelationSatisfied:
PROCEDURE [entry: Entry]
RETURNS [continue:
BOOLEAN] =
BEGIN
IF key=
NIL
OR tree.Compare[key, entry] <= maxComparison
THEN
{Proc[entry]; RETURN [FALSE]}
ELSE RETURN [TRUE];
END;
maxComparison: Comparison ← IF relation=greaterEqual THEN equal ELSE less;
exhausted: BOOLEAN ← EnumerateEntries[tree: tree, key: key, pathStk: pathStk, useExistingPath: useExistingPath, Proc: CallWhenRelationSatisfied];
IF exhausted THEN Proc[NIL];
END;
ENDCASE => ERROR;
END;
EnumerateEntries:
PUBLIC
PROCEDURE [tree: Tree, key: Key, relation: Relation ← equal, pathStk: PathStk ←
NIL, useExistingPath:
BOOLEAN ←
FALSE, Proc:
UNSAFE
PROCEDURE [entry: Entry]
RETURNS [continue:
BOOLEAN]]
RETURNS [exhausted:
BOOLEAN] =
BEGIN
pathStkWasNil: BOOLEAN ← pathStk=NIL;
tree.Lock[read];
IF pathStkWasNil
THEN
BEGIN
IF useExistingPath THEN ERROR Error[nilPathStk];
pathStk ← tree.GetDefaultPathStk[];
END;
Extra nesting required so that pathStkWasNil is visible in the catch phrase (yecch)!
BEGIN
ENABLE
UNWIND =>
{ IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk]; tree.Unlock[] };
leafStkTop: PathStkIndex;
pse: LONG POINTER TO PathStkEntry;
pagePtr: BTreePagePtr ← NIL;
skipFirstEntry: BOOLEAN ← FALSE;
equal: BOOLEAN;
[equal: equal, depth: leafStkTop] ← tree.PathEntryLE[key: key, pathStk: pathStk, useExistingPath: useExistingPath];
IF pathStk.top=0
THEN
BEGIN -- key is less than any existing entry in the tree. Set PathStk to first record in leftmost leaf page.
IF leafStkTop=0 THEN GOTO exhausted; -- empty tree
pathStk.top ← leafStkTop;
pathStk.path[leafStkTop].lastOffset ← entry1Offset;
END
ELSE
SELECT relation
FROM
less =>
IF equal
AND ~PathToPreviousEntry[tree, pathStk]
THEN
{pathStk.top ← leafStkTop; pathStk.path[leafStkTop].lastOffset ← entry1Offset};
equal, greaterEqual =>
skipFirstEntry ← ~equal;
greater =>
skipFirstEntry ← TRUE;
ENDCASE;
exhausted ← FALSE;
pse ← @pathStk.path[pathStk.top];
DO
offset: PageOffset ← pse.lastOffset;
endOffset: PageOffset;
IF pagePtr=
NIL
THEN
BEGIN
pagePtr ← tree.ReferencePage[pse.pageNumber];
endOffset ← nilOffset+(tree.state.pageSize-pagePtr.freeWords);
END;
IF offset>=endOffset
THEN
BEGIN -- ran off end of page. Pop up a level and do the next entry in the father page
tree.ReleasePage[pse.pageNumber];
IF offset>endOffset THEN ERROR Error[entrySizesWrong];
pagePtr ← NIL;
IF (pathStk.top ← pathStk.top-1) = 0
THEN
BEGIN -- ran off end of tree. PathStk is no longer valid.
pathStk.version ← LAST[LONG CARDINAL];
GOTO exhausted;
END;
pse ← pse - SIZE[PathStkEntry];
END
ELSE
BEGIN
-- enumerate this entry
nxtPgNo: PageNumber ← pagePtr[offset].grPage;
pse.offset ← offset + tree.BTreeEntrySize[@pagePtr[offset]];
IF skipFirstEntry THEN skipFirstEntry ← FALSE
ELSE IF ~Proc[@pagePtr[offset].entry !
UNWIND => tree.ReleasePage[pse.pageNumber]] THEN EXIT;
WHILE nxtPgNo#nilPage
DO
In a non-leaf page, find the leftmost right descendant of the entry we just did.
tree.ReleasePage[pse.pageNumber];
pathStk.top ← pathStk.top+1;
pse ← pse + SIZE[PathStkEntry];
pagePtr ← tree.ReferencePage[nxtPgNo];
pse.pageNumber ← nxtPgNo;
pse.nextToLastOffset ← nilOffset;
pse.lastOffset ← entry0Offset;
pse.offset ← entry1Offset;
endOffset ← nilOffset+(tree.state.pageSize-pagePtr.freeWords);
nxtPgNo ← pagePtr.minPage;
ENDLOOP;
END;
pse.nextToLastOffset ← pse.lastOffset;
pse.lastOffset ← pse.offset;
ENDLOOP;
IF pagePtr#NIL THEN tree.ReleasePage[pse.pageNumber];
EXITS
exhausted => exhausted ← TRUE;
END;
IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk];
tree.Unlock[];
END;
SalvageEntries:
PUBLIC
PROCEDURE [tree: Tree, Proc:
UNSAFE
PROCEDURE [entry: Entry]
RETURNS [continue:
BOOLEAN]]
RETURNS [exhausted:
BOOLEAN] =
BEGIN
tree.Lock[read];
FOR page: PageNumber
IN [statePage+1 .. tree.state.greatestPage]
DO
ENABLE UNWIND => tree.Unlock[];
pagePtr: BTreePagePtr = tree.ReferencePage[page];
IF pagePtr.freeWords
IN [0 ..
CARDINAL[tree.state.pageSize-pageOverhead]]
THEN
BEGIN
offset: PageOffset ← entry1Offset;
endOffset: PageOffset = nilOffset+(tree.state.pageSize-pagePtr.freeWords);
WHILE offset<endOffset
DO
entSize: CARDINAL = tree.BTreeEntrySize[@pagePtr[offset]];
IF entSize<entryOverhead+tree.minEntrySize OR offset+entSize>endOffset THEN EXIT;
IF ~Proc[@pagePtr[offset].entry] THEN GOTO aborted;
offset ← offset+entSize;
ENDLOOP;
END;
tree.ReleasePage[page];
REPEAT
aborted => exhausted ← FALSE;
FINISHED => exhausted ← TRUE;
ENDLOOP;
tree.Unlock[];
END;
NewPathStk:
PUBLIC
SAFE
PROCEDURE
RETURNS [pathStk: PathStk] =
CHECKED
BEGIN
pathStk ← NEW[PathStkObject];
pathStk.version ← LAST[LONG CARDINAL]; -- not valid
END;
BTreeInternal.
PathEntryLE:
PUBLIC
PROCEDURE [tree: Tree, key: Key, pathStk: PathStk, useExistingPath:
BOOLEAN ←
FALSE]
RETURNS [equal:
BOOLEAN, depth: PathStkIndex] =
BEGIN
BinSearchPage:
PROCEDURE
RETURNS [firstG: EntryOrdinal] =
Returns an index r such that offsetTable[r] is the offset of the first entry strictly greater than key, and fills in offsetTable[2] through offsetTable[r] with the offsets of all entries up to and including that one.
BEGIN
firstBadOffset: PageOffset = nilOffset + (tree.state.pageSize - pagePtr.freeWords);
midOffset: PageOffset ← PrincOpsUtils.BITSHIFT[firstBadOffset, -1];
curOffset: PageOffset ← entry1Offset;
l, r: EntryOrdinal ← 2;
Scan records linearly and build offsetTable until half the in-use portion of the BTree page has been examined. Then do one key comparison to detemine which half of the page to direct our attention to, and build the rest of offsetTable only if necessary. On the average this saves 25% of the work of building offsetTables, and costs no extra comparison since the first comparison would examine a middle record anyway (not necessarily the same one).
IF key#
NIL
THEN
DO
IF curOffset>=midOffset
THEN
BEGIN
IF curOffset>=firstBadOffset OR tree.Compare[key, @pagePtr[curOffset].entry] = less THEN EXIT;
midOffset ← firstBadOffset;
l ← r;
END;
curOffset ← curOffset + tree.BTreeEntrySize[@pagePtr[curOffset]];
r ← r+1;
offsetTable[r] ← curOffset;
ENDLOOP;
IF curOffset>firstBadOffset THEN ERROR Error[entrySizesWrong];
At this point, offsetTable[r] is the offset of the first non-entry, which we assume to have an infinite key (which we shall never test). We shall leave r pointing at the offset of the first entry strictly greater than key.
WHILE l<r
DO
m: EntryOrdinal ← PrincOpsUtils.BITSHIFT[l+r, -1];
SELECT tree.Compare[key, @pagePtr[offsetTable[m]].entry]
FROM
less => r ← m;
equal => {equal ← TRUE; RETURN [m+1]};
greater => l ← m+1;
ENDCASE;
ENDLOOP;
RETURN [r];
END; -- BinSearchPage
pse: LONG POINTER TO PathStkEntry;
offsetTable: OffsetTable;
curPage: PageNumber;
pagePtr: BTreePagePtr;
equal ← FALSE;
IF useExistingPath
AND pathStk.treeID=tree.id
AND pathStk.version=tree.version
AND key#
NIL
THEN
BEGIN -- supplied PathStk is still valid. See if it is for the correct key.
depth ← pathStk.top;
IF depth=0 THEN GOTO dontUse; -- who knows whether this is the min key or not?
WHILE pathStk.path[depth].leastSon # nilPage DO depth ← depth+1; ENDLOOP;
[pse: pse, ptr: pagePtr] ← tree.ReferenceStack[pathStk];
SELECT tree.Compare[key, @pagePtr[pse.lastOffset].entry]
FROM
less => useExistingPath ← FALSE; -- wrong key
equal => equal ← TRUE; -- correct key
greater =>
BEGIN
IF depth=pathStk.top AND pse.offset < nilOffset+(tree.state.pageSize-pagePtr.freeWords) AND tree.Compare[key, @pagePtr[pse.offset].entry]=less
THEN NULL -- key falls between this entry and next; ok
ELSE useExistingPath ← FALSE; -- wrong key, or too hard to decide
END;
ENDCASE;
tree.ReleasePage[pse.pageNumber];
IF useExistingPath THEN RETURN;
END;
Perform the normal BTree lookup
pathStk.top ← 0;
pathStk.path[0] ← [];
offsetTable ← GetOffsetTable[tree];
offsetTable[0] ← nilOffset;
offsetTable[1] ← entry0Offset;
offsetTable[2] ← entry1Offset;
depth ← 0;
pse ← @pathStk.path[0];
curPage ← tree.state.rootPage;
WHILE curPage#nilPage
DO
firstG: EntryOrdinal;
pagePtr ← tree.ReferencePage[curPage];
depth ← depth+1;
pse ← pse+SIZE[PathStkEntry];
pse.pageNumber ← curPage;
firstG ← BinSearchPage[ !
UNWIND => {tree.ReleasePage[curPage]; ReturnOffsetTable[tree, offsetTable]}];
pse.offset ← offsetTable[firstG];
pse.lastOffset ← offsetTable[firstG-1];
pse.nextToLastOffset ← offsetTable[firstG-2];
pse.leastSon ← pagePtr.minPage;
IF firstG>2 THEN pathStk.top ← depth; -- real entry in this page
AssignRefESR[@pse.eslFront, NIL];
AssignRefESR[@pse.eslRear, NIL];
curPage ← pagePtr[pse.lastOffset].grPage;
tree.ReleasePage[pse.pageNumber];
ENDLOOP;
pathStk.treeID ← tree.id;
pathStk.version ← tree.version;
ReturnOffsetTable[tree, offsetTable];
END;
ReferenceStack:
PUBLIC
PROCEDURE [tree: Tree, pathStk: PathStk, type: ReferenceType ← read]
RETURNS [pse:
LONG
POINTER
TO PathStkEntry, ptr: PagePtr] =
BEGIN
pse ← @pathStk.path[pathStk.top];
ptr ← tree.ReferencePage[pse.pageNumber, type];
END;
BackUpOneEntry:
PUBLIC
PROCEDURE [tree: Tree, pse:
LONG
POINTER
TO PathStkEntry] =
BEGIN
tree.RepairOffsets[pse];
pse.offset ← pse.lastOffset;
tree.RepairOffsets[pse];
END;
RepairOffsets:
PUBLIC
PROCEDURE [tree: Tree, pse:
LONG
POINTER
TO PathStkEntry] =
BEGIN
CheckProgression:
PROCEDURE [offset1, offset2: PageOffset]
RETURNS [ok:
BOOLEAN] =
BEGIN
RETURN [
SELECT
TRUE
FROM
offset2=entry0Offset => offset1=nilOffset,
offset2=entry1Offset => offset1=entry0Offset,
offset1>=offset2 => FALSE,
ENDCASE => offset1+tree.BTreeEntrySize[@pagePtr[offset1]] = offset2];
END; -- CheckProgression
pagePtr: BTreePagePtr = tree.ReferencePage[pse.pageNumber];
IF ~(CheckProgression[pse.lastOffset, pse.offset]
AND CheckProgression[pse.nextToLastOffset, pse.lastOffset])
THEN
BEGIN
newOffset: PageOffset ← entry1Offset;
lastOffset: PageOffset ← entry0Offset;
nextToLastOffset: PageOffset ← nilOffset;
WHILE newOffset#pse.offset
DO
nextToLastOffset ← lastOffset;
lastOffset ← newOffset;
newOffset ← newOffset+tree.BTreeEntrySize[@pagePtr[newOffset]];
ENDLOOP;
pse.lastOffset ← lastOffset;
pse.nextToLastOffset ← nextToLastOffset;
END;
tree.ReleasePage[pse.pageNumber];
END;
PathToMaxDescendant:
PUBLIC
PROCEDURE [tree: Tree, pathStk: PathStk, page: PageNumber] =
BEGIN
pse: LONG POINTER TO PathStkEntry;
pagePtr: BTreePagePtr;
WHILE page#nilPage
DO
pathStk.top ← pathStk.top+1;
pse ← @pathStk.path[pathStk.top];
pse.pageNumber ← page;
pagePtr ← tree.ReferencePage[page];
pse.offset ← nilOffset+(tree.state.pageSize-pagePtr.freeWords);
pse.lastOffset ← entry1Offset;
pse.nextToLastOffset ← entry0Offset;
tree.RepairOffsets[pse];
AssignRefESR[@pse.eslFront, NIL];
AssignRefESR[@pse.eslRear, NIL];
pse.leastSon ← nilPage;
page ← pagePtr[pse.lastOffset].grPage;
tree.ReleasePage[pse.pageNumber];
ENDLOOP;
END;
WriteStatePage:
PUBLIC
PROCEDURE [tree: Tree, update: UpdateState ← unchanged] =
BEGIN
statePtr: LONG POINTER TO TreeState = tree.ReferencePage[statePage, write];
statePtr^ ← tree.state;
tree.ReleasePage[statePage, update];
END;
Lock:
PUBLIC
ENTRY
PROCEDURE [tree: Tree, mode: LockMode, checkState:
BOOLEAN ←
TRUE] =
BEGIN
DO
SELECT tree.objectState
FROM
closed => IF checkState THEN RETURN WITH ERROR Error[closed];
suspended => IF checkState THEN {WAIT tree.unlocked; LOOP};
open => NULL
ENDCASE;
SELECT mode
FROM
read =>
IF tree.lockCount>=0 THEN {tree.lockCount ← tree.lockCount+1; EXIT};
update =>
IF tree.lockCount=0 THEN {tree.lockCount ← -1; EXIT};
ENDCASE;
WAIT tree.unlocked; -- wait for tree to be unlocked
ENDLOOP;
END;
Unlock:
PUBLIC
ENTRY
PROCEDURE [tree: Tree] =
BEGIN
IF tree.lockCount=0 THEN ERROR Bug[treeNotLocked];
tree.lockCount ← MAX[tree.lockCount-1, 0];
IF tree.lockCount=0 THEN BROADCAST tree.unlocked; -- BROADCAST, not NOTIFY, because releasing a write lock might make multiple readers eligible to enter simultaneously.
END;
GetHeapAndTable:
PUBLIC
ENTRY
PROCEDURE [tree: Tree, pathStk: PathStk] =
BEGIN
pathStk.heap ← tree.heap;
tree.heap ← NIL;
pathStk.entryTable ← tree.entryTable;
tree.entryTable ← NIL;
IF pathStk.heap#NIL THEN heapsReused ← heapsReused+1
ELSE
BEGIN
pathStk.heap ← NEW[Heap[tree.maxRecordsPerPage+1]]; -- heap holds at most one page's worth of entries at any given time
pathStk.entryTable ← NEW[EntryTable[3*tree.maxRecordsPerPage+1]]; -- may have to index scroll containing up to 3 pages' worth of entries
heapsCreated ← heapsCreated+1;
END;
END;
ReturnHeapAndTable:
PUBLIC
ENTRY
PROCEDURE [tree: Tree, pathStk: PathStk] =
BEGIN
IF tree.heap=
NIL
THEN
{ tree.heap ← pathStk.heap; tree.entryTable ← pathStk.entryTable };
pathStk.heap ← NIL;
pathStk.entryTable ← NIL;
END;
GetDefaultPathStk:
PUBLIC
ENTRY
PROCEDURE [tree: Tree]
RETURNS [pathStk: PathStk] =
BEGIN
pathStk ← tree.defaultPathStk;
tree.defaultPathStk ← NIL;
IF pathStk=NIL
THEN { pathStk ← NewPathStk[]; pathStksCreated ← pathStksCreated+1 }
ELSE { pathStk.version ← LAST [LONG CARDINAL]; pathStksReused ← pathStksReused+1 };
END;
ReturnDefaultPathStk:
PUBLIC
ENTRY
PROCEDURE [tree: Tree, pathStk: PathStk] =
{ IF tree.defaultPathStk=NIL THEN tree.defaultPathStk ← pathStk };
Error: PUBLIC SAFE ERROR [reason: Reason] = CODE;
UpdateInProgress: PUBLIC SAFE SIGNAL = CODE;
Bug: PUBLIC ERROR [type: BugType] = CODE;
Private
PathToPreviousEntry:
PROCEDURE [tree: Tree, pathStk: PathStk]
RETURNS [ok:
BOOLEAN] =
Backs up the PathStk to refer to the previous entry in the tree. Returns TRUE normally, FALSE if there is no previous entry.
BEGIN
pagePtr: BTreePagePtr;
pse: LONG POINTER TO PathStkEntry ← @pathStk.path[pathStk.top];
IF pathStk.top=0 THEN RETURN [FALSE];
tree.BackUpOneEntry[pse];
[ptr: pagePtr, pse: pse] ← tree.ReferenceStack[pathStk];
IF pagePtr[pse.lastOffset].grPage=nilPage
THEN
BEGIN -- this is a leaf page
WHILE pse.lastOffset<entry1Offset
DO -- hit beginning of current page: previous entry is left father
tree.ReleasePage[pse.pageNumber];
pathStk.top ← pathStk.top-1;
IF pathStk.top=0 THEN RETURN [FALSE];
[ptr: pagePtr, pse: pse] ← tree.ReferenceStack[pathStk];
ENDLOOP;
tree.ReleasePage[pse.pageNumber];
END
ELSE BEGIN
-- this is not a leaf page: previous entry is rightmost left son
page: PageNumber ← pagePtr[pse.lastOffset].grPage;
tree.ReleasePage[pse.pageNumber];
tree.PathToMaxDescendant[pathStk: pathStk, page: page];
END;
RETURN [TRUE];
END;
GetOffsetTable:
ENTRY
PROCEDURE [tree: Tree]
RETURNS [offsetTable: OffsetTable] =
Obtains the OffsetTable associated with the BTree, if it is available; otherwise makes a new one.
BEGIN
offsetTable ← tree.offsetTable;
tree.offsetTable ← NIL;
IF offsetTable#NIL THEN offsetTablesReused ← offsetTablesReused+1
ELSE
BEGIN
offsetTable ← NEW[OffsetTableObject[tree.maxRecordsPerPage+2]];
offsetTablesCreated ← offsetTablesCreated+1;
END;
END;
ReturnOffsetTable:
ENTRY
PROCEDURE [tree: Tree, offsetTable: OffsetTable] =
Gives back the OffsetTable.
{ IF tree.offsetTable=NIL THEN tree.offsetTable ← offsetTable };
treeID: CARDINAL ← 0;
offsetTablesReused, offsetTablesCreated: LONG CARDINAL ← 0;
heapsReused, heapsCreated: LONG CARDINAL ← 0;
pathStksReused, pathStksCreated: LONG CARDINAL ← 0;
END.