-- BTreeWrite.mesa
-- Operations for inserting new BTree entries and replacing existing ones.
-- Last Edited by: Taft, March 24, 1983 5:07 pm
DIRECTORY
BTree,
BTreeInternal,
Inline USING [LongCOPY],
Utilities USING [LongMove];
BTreeWrite: PROGRAM
IMPORTS BTree, BTreeInternal, Inline, Utilities
EXPORTS BTree, BTreeInternal
= BEGIN OPEN BTree, BTreeInternal;
-- BTree.
Tree: TYPE = REF TreeObject;
TreeObject: PUBLIC TYPE = BTreeInternal.TreeObject;
PathStk: TYPE = REF PathStkObject;
PathStkObject: PUBLIC TYPE = BTreeInternal.PathStkObject;
UpdateRecord: PUBLIC SAFE PROCEDURE [tree: Tree, key: Key, pathStk: PathStk ← NIL, useExistingPath: BOOLEANFALSE, record: Record] = TRUSTED
BEGIN
UpdateEntry[tree: tree, key: key, pathStk: pathStk, useExistingPath: useExistingPath, entry: tree.EntryFromRecord[record]];
END;
UpdateEntry: PUBLIC PROCEDURE [tree: Tree, key: Key, pathStk: PathStk ← NIL, useExistingPath: BOOLEANFALSE, entry: Entry] =
BEGIN
pathStkWasNil: BOOLEAN ← pathStk=NIL;
tree.Lock[update];
IF pathStkWasNil THEN
BEGIN
IF useExistingPath THEN ERROR Error[nilPathStk];
pathStk ← tree.GetDefaultPathStk[];
END;
-- Extra nesting required so that pathStkWasNil is visible in the catch phrase (yecch)!
BEGIN ENABLE UNWIND =>
{ IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk]; tree.Unlock[] };
leafStkTop: PathStkIndex;
pse: LONG POINTER TO PathStkEntry;
pagePtr: BTreePagePtr ← NIL;
foundEntSize: CARDINAL ← 0; -- zero means there is not an existing entry with this key
newEntSize: CARDINAL;
IF tree.Compare[key, entry]#equal THEN ERROR Error[wrongKeyForRecord];
newEntSize ← tree.EntrySize[entry];
IF newEntSize+entryOverhead NOT IN [1+entryOverhead..tree.maxFreeWords] THEN
ERROR Error[entrySizesWrong];
leafStkTop ← tree.PathEntryLE[key: key, pathStk: pathStk, useExistingPath: useExistingPath];
IF pathStk.top>0 THEN
BEGIN
[ptr: pagePtr, pse: pse] ← tree.ReferenceStack[pathStk];
IF tree.Compare[key, @pagePtr[pse.lastOffset].entry]=equal THEN
foundEntSize ← tree.EntrySize[@pagePtr[pse.lastOffset].entry];
tree.ReleasePage[pse.pageNumber];
END;
-- To minimize average insertion time, perform the update in one of three ways (in increasing order of difficulty, as measured by amount of temporary storage allocated and amount of data copied):
-- 1. If replacing an existing entry of the same size, just overwrite it.
-- 2. If the new entry fits on the page (after removing the old entry if any), just slide up the entries beyond the insertion point and insert the new entry.
-- 3. Otherwise, leave the new entry as an EntSeqRecord at the appropriate stack level, and let InsertRecords cope with the problem.
-- This code also takes care not to perform the startOfUpdate and endOfUpdate write references to the state page when the update consists of only a single page write.
tree.version ← tree.version+1; -- invalidate existing PathStks that refer to this tree
pse ← @pathStk.path[pathStk.top];
IF newEntSize=foundEntSize THEN
BEGIN -- new record same length as old; just copy it over
tree.AdjustTreeState[update: unchanged, deltaEntryCount: 0];
pagePtr ← tree.ReferencePage[pse.pageNumber, write];
Inline.LongCOPY[to: @pagePtr[pse.lastOffset].entry, from: entry, nwords: newEntSize];
tree.ReleasePage[pse.pageNumber, endOfUpdate];
END
ELSE BEGIN
removedEntGrPage: PageNumber ← nilPage;
newEntryFits: BOOLEANFALSE;
IF foundEntSize=0 THEN
BEGIN -- no old entry to remove, and we will insert at the leaf level
pathStk.top ← leafStkTop;
pse ← @pathStk.path[pathStk.top];
END;
-- pathStk.top and pse now designate the page into which to insert the new entry.
IF pathStk.top>0 THEN
BEGIN
pagePtr ← tree.ReferencePage[pse.pageNumber];
newEntryFits ← newEntSize+entryOverhead <= pagePtr.freeWords + (IF foundEntSize=0 THEN 0 ELSE foundEntSize+entryOverhead);
tree.ReleasePage[pse.pageNumber];
END;
tree.AdjustTreeState[update: IF newEntryFits THEN unchanged ELSE startOfUpdate, deltaEntryCount: IF foundEntSize=0 THEN 1 ELSE 0];
IF pathStk.top>0 THEN pagePtr ← tree.ReferencePage[pse.pageNumber, write];
IF foundEntSize#0 THEN
-- first remove and discard old entry, but save its descendant pointer
[grPage: removedEntGrPage] ← tree.BackUpAndRemoveEntry[pse];
IF newEntryFits THEN
BEGIN -- new entry fits on the page; slide the greater entries out of the way and drop the new entry in
entPtr: LONG POINTER TO BTreeEntry ← @pagePtr[pse.offset];
Utilities.LongMove[pSink: entPtr+newEntSize+entryOverhead, pSource: entPtr, size: nilOffset+(tree.state.pageSize-pagePtr.freeWords)-pse.offset];
Inline.LongCOPY[to: @entPtr.entry, from: entry, nwords: newEntSize];
entPtr.grPage ← removedEntGrPage;
pagePtr.freeWords ← pagePtr.freeWords - (newEntSize+entryOverhead);
tree.ReleasePage[pse.pageNumber, endOfUpdate];
END
ELSE BEGIN -- new entry does not fit (or there isn't yet a page to fit it into)
esr: REF EntSeqRecord ← tempZone.NEW[EntSeqRecord[newEntSize+entryOverhead]];
esr.entSeqP ← LOOPHOLE[BASE[DESCRIPTOR[esr.entSeq]]];
esr.entSeqLen ← newEntSize+entryOverhead;
Inline.LongCOPY[to: @esr.entSeqP.entry, from: entry, nwords: newEntSize];
esr.entSeqP.grPage ← removedEntGrPage;
AppendEntSeqRecord[pse: pse, esr: esr];
IF pathStk.top>0 THEN tree.ReleasePage[pse.pageNumber];
tree.GetHeapAndTable[pathStk];
WHILE pathStk.path[pathStk.top].eslFront#NIL DO
tree.InsertRecords[pathStk !
UNWIND => tree.ReturnHeapAndTable[pathStk]];
IF pathStk.top=0 THEN EXIT ELSE pathStk.top ← pathStk.top-1;
ENDLOOP;
tree.ReturnHeapAndTable[pathStk];
tree.AdjustTreeState[update: endOfUpdate, deltaEntryCount: 0];
END;
END;
END;
IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk];
tree.Unlock[];
END;
-- BTreeInternal.
AdjustTreeState: PUBLIC PROCEDURE [tree: Tree, update: UpdateState, deltaEntryCount: INTEGER] =
BEGIN
IF tree.maintainRecomputableState THEN
BEGIN -- normal update
IF tree.state.entryCount#LAST[LONG CARDINAL] THEN
tree.state.entryCount ← tree.state.entryCount+deltaEntryCount;
IF update#unchanged THEN
BEGIN
tree.state.updateInProgress ← update=startOfUpdate;
tree.WriteStatePage[update: update];
END;
END
ELSE IF deltaEntryCount#0 AND tree.state.entryCount#LAST[LONG CARDINAL] THEN
BEGIN -- remember that the entryCount is no longer being maintained
tree.state.entryCount ← LAST[LONG CARDINAL];
tree.WriteStatePage[];
END;
END;
BackUpOneEntry: PUBLIC PROCEDURE [tree: Tree, pse: LONG POINTER TO PathStkEntry] =
BEGIN
tree.RepairOffsets[pse];
pse.offset ← pse.lastOffset;
tree.RepairOffsets[pse];
END;
RepairOffsets: PUBLIC PROCEDURE [tree: Tree, pse: LONG POINTER TO PathStkEntry] =
BEGIN
CheckProgression: PROCEDURE [offset1, offset2: PageOffset] RETURNS [ok: BOOLEAN] =
BEGIN
RETURN [SELECT TRUE FROM
offset2=entry0Offset => offset1=nilOffset,
offset2=entry1Offset => offset1=entry0Offset,
offset1>=offset2 => FALSE,
ENDCASE => offset1+tree.BTreeEntrySize[@pagePtr[offset1]] = offset2];
END; -- CheckProgression
pagePtr: BTreePagePtr = tree.ReferencePage[pse.pageNumber];
IF ~(CheckProgression[pse.lastOffset, pse.offset] AND CheckProgression[pse.nextToLastOffset, pse.lastOffset]) THEN
BEGIN
newOffset: PageOffset ← entry1Offset;
lastOffset: PageOffset ← entry0Offset;
nextToLastOffset: PageOffset ← nilOffset;
WHILE newOffset#pse.offset DO
nextToLastOffset ← lastOffset;
lastOffset ← newOffset;
newOffset ← newOffset+tree.BTreeEntrySize[@pagePtr[newOffset]];
ENDLOOP;
pse.lastOffset ← lastOffset;
pse.nextToLastOffset ← nextToLastOffset;
END;
tree.ReleasePage[pse.pageNumber];
END;
InsertRecords: PUBLIC PROCEDURE [tree: Tree, pathStk: PathStk] =
BEGIN
pse: LONG POINTER TO PathStkEntry = @pathStk.path[pathStk.top];
IF pse.eslFront#NIL THEN
BEGIN
pathStk.entryTable.length ← 0;
pathStk.entryTable.map[0].cumEntSize ← 0;
FOR esr: REF EntSeqRecord ← pse.eslFront, esr.fwdP UNTIL esr=NIL DO
AppendEntSeqLengths[tree: tree, pathStk: pathStk, esr: esr];
ENDLOOP;
IF pathStk.top=0 THEN MakeNewRoot[tree: tree, pathStk: pathStk]
ELSE BEGIN
pagePtr: BTreePagePtr = tree.ReferencePage[pse.pageNumber, write];
tailBlkPtr: LONG POINTER TO BTreeEntry = @pagePtr[pse.offset];
tailBlkLen: CARDINAL = (nilOffset+tree.state.pageSize-pagePtr.freeWords)-pse.offset;
wordsToInsert: CARDINAL = EntryIntervalSize[pathStk: pathStk];
IF wordsToInsert<=pagePtr.freeWords THEN
BEGIN -- all entries fit the current page. Hurrah!
Utilities.LongMove[pSink: tailBlkPtr+wordsToInsert, pSource: tailBlkPtr, size: nilOffset+(tree.state.pageSize-pagePtr.freeWords)-pse.offset];
DepositESL[tree: tree, pse: pse, block: tailBlkPtr, length: wordsToInsert];
pagePtr.freeWords ← pagePtr.freeWords-wordsToInsert;
tree.ReleasePage[pse.pageNumber];
END
ELSE BEGIN -- not all the entries will fit on the current page. This is getting complex.
rtBroPg1: PageNumber;
esr: REF EntSeqRecord ← MakeEntSeqRecord[@pagePtr[entry1Offset], pse.offset-entry1Offset];
PushEntSeqRecord[pse: pse, esr: esr];
PushEntSeqLengths[tree: tree, pathStk: pathStk, esr: esr];
esr ← MakeEntSeqRecord[tailBlkPtr, tailBlkLen];
AppendEntSeqRecord[pse: pse, esr: esr];
AppendEntSeqLengths[tree: tree, pathStk: pathStk, esr: esr];
tree.ReleasePage[pse.pageNumber];
rtBroPg1 ← ComplexInsertRecords[tree: tree, pathStk: pathStk];
IF rtBroPg1#nilPage THEN HairyInsertRecords[tree: tree, pathStk: pathStk, rtBroPg1: rtBroPg1];
END;
END;
IF pse.eslFront#NIL THEN ERROR Bug[entriesLeftOver];
END;
END;
MakeEntSeqRecord: PUBLIC PROCEDURE [entSeq: LONG POINTER TO BTreeEntry, length: CARDINAL] RETURNS [esr: REF EntSeqRecord] =
BEGIN
IF length=0 THEN RETURN [NIL];
esr ← tempZone.NEW[EntSeqRecord[length]];
esr.entSeqP ← LOOPHOLE[BASE[DESCRIPTOR[esr.entSeq]]];
esr.entSeqLen ← length;
Inline.LongCOPY[to: esr.entSeqP, from: entSeq, nwords: length];
END;
AppendEntSeqRecord: PUBLIC PROCEDURE [pse: LONG POINTER TO PathStkEntry, esr: REF EntSeqRecord] =
BEGIN
IF esr#NIL THEN
BEGIN
esr.fwdP ← NIL;
IF pse.eslRear=NIL THEN AssignRefESR[@pse.eslFront, esr] ELSE pse.eslRear.fwdP ← esr;
AssignRefESR[@pse.eslRear, esr];
END;
END;
PushEntSeqRecord: PUBLIC PROCEDURE [pse: LONG POINTER TO PathStkEntry, esr: REF EntSeqRecord] =
BEGIN
IF esr#NIL THEN
BEGIN
esr.fwdP ← pse.eslFront;
AssignRefESR[@pse.eslFront, esr];
IF pse.eslRear=NIL THEN AssignRefESR[@pse.eslRear, esr];
END;
END;
RemoveEntry: PUBLIC PROCEDURE [tree: Tree, pse: LONG POINTER TO PathStkEntry, ignoreESL: BOOLEANFALSE] RETURNS [esr: REF EntSeqRecord, grPage: PageNumber] =
BEGIN
BasicRemoveEntry: PROCEDURE RETURNS [esr: REF EntSeqRecord] =
-- Removes the entry from the BTree page at pse.offset and returns an EntSeqRecord containing it.
BEGIN
pagePtr: BTreePagePtr = tree.ReferencePage[pse.pageNumber, write];
entSize: CARDINAL = tree.BTreeEntrySize[@pagePtr[pse.offset]];
esr ← MakeEntSeqRecord[entSeq: @pagePtr[pse.offset], length: entSize];
pagePtr.freeWords ← pagePtr.freeWords+entSize;
Inline.LongCOPY[to: @pagePtr[pse.offset], from: @pagePtr[pse.offset]+entSize, nwords: nilOffset+(tree.state.pageSize-pagePtr.freeWords)-pse.offset];
tree.ReleasePage[pse.pageNumber];
END;
RemoveFromEntSeqRecord: PROCEDURE RETURNS [esr: REF EntSeqRecord] =
-- Removes the first entry from the ESL at pse.offset (there had better be one) and returns an EntSeqRecord containing it.
BEGIN
entSize: CARDINAL = tree.BTreeEntrySize[pse.eslFront.entSeqP];
esr ← tempZone.NEW[EntSeqRecord[entSize]];
esr.entSeqP ← LOOPHOLE[BASE[DESCRIPTOR[esr.entSeq]]];
esr.entSeqLen ← entSize;
DepositESL[tree: tree, pse: pse, block: esr.entSeqP, length: entSize];
END;
esr ← IF ignoreESL OR pse.eslFront=NIL THEN BasicRemoveEntry[] ELSE RemoveFromEntSeqRecord[];
grPage ← esr.entSeqP.grPage;
esr.entSeqP.grPage ← nilPage;
IF grPage#nilPage THEN
BEGIN
pagePtr: BTreePagePtr = tree.ReferencePage[grPage];
esr.entSeqP.grPage ← pagePtr.minPage;
tree.ReleasePage[grPage];
END;
END;
BackUpAndRemoveEntry: PUBLIC PROCEDURE [tree: Tree, pse: LONG POINTER TO PathStkEntry] RETURNS [esr: REF EntSeqRecord, grPage: PageNumber] =
BEGIN
tree.BackUpOneEntry[pse];
[esr: esr, grPage: grPage] ← tree.RemoveEntry[pse: pse, ignoreESL: TRUE];
END;
AllocatePage: PUBLIC SAFE PROCEDURE [tree: Tree] RETURNS [number: PageNumber] = TRUSTED
BEGIN
pagePtr: BTreePagePtr;
IF tree.state.firstFreePage=nilPage THEN
BEGIN
number ← (tree.state.greatestPage ← tree.state.greatestPage+1);
pagePtr ← tree.ReferencePage[number, new];
END
ELSE BEGIN
number ← tree.state.firstFreePage;
pagePtr ← tree.ReferencePage[number, write];
IF pagePtr.freeWords#freePageMarker THEN ERROR Bug[pageNotFree];
tree.state.firstFreePage ← pagePtr.minPage;
END;
pagePtr.freeWords ← tree.maxFreeWords;
tree.ReleasePage[number];
END;
FreePage: PUBLIC SAFE PROCEDURE [tree: Tree, number: PageNumber] = TRUSTED
BEGIN
pagePtr: BTreePagePtr = tree.ReferencePage[number, write];
IF pagePtr.freeWords=freePageMarker THEN ERROR Bug[pageAlreadyFree];
pagePtr.freeWords ← freePageMarker;
pagePtr.minPage ← tree.state.firstFreePage;
tree.state.firstFreePage ← number;
tree.ReleasePage[number];
END;
-- Private procedures
AppendEntSeqLengths: PROCEDURE [tree: Tree, pathStk: PathStk, esr: REF EntSeqRecord] =
-- Appends the cumulative lengths of the entries in the EntSeqRecord to the EntryTable held by the pathStk. It is ok for esr to be NIL.
BEGIN
IF esr#NIL THEN
BEGIN
entryTable: REF EntryTable ← pathStk.entryTable;
index: EntryOrdinal ← entryTable.length;
wordsLeft: CARDINAL ← esr.entSeqLen;
entry: LONG POINTER TO BTreeEntry ← esr.entSeqP;
WHILE wordsLeft>0 DO
entrySize: CARDINAL = tree.BTreeEntrySize[entry];
index ← index+1;
IF index >= entryTable.maxLength THEN ERROR Bug[tooManyEntriesInPage];
entryTable.map[index].cumEntSize ← entryTable.map[index-1].cumEntSize+entrySize;
entry ← entry+entrySize;
wordsLeft ← wordsLeft-entrySize;
ENDLOOP;
entryTable.length ← index;
END;
END;
PushEntSeqLengths: PROCEDURE [tree: Tree, pathStk: PathStk, esr: REF EntSeqRecord] =
-- Inserts the cumulative lengths of the EntSeqRecord's entries at the front of the EntryTable held by the pathStk, and appropriately adjusts the cumulative lengths of the ones already there. This must never be done while we have an active heap, or disaster will ensue. It is ok for esr to be NIL.
BEGIN
IF esr#NIL THEN
BEGIN
entryTable: REF EntryTable = pathStk.entryTable;
oldLen: EntryOrdinal = entryTable.length;
tempFirstOldIndex: EntryOrdinal = entryTable.maxLength-oldLen;
newLen: EntryOrdinal;
delta: CARDINAL;
-- Move existing stuff out of the way
Utilities.LongMove[pSink: @entryTable.map[tempFirstOldIndex], pSource: @entryTable.map[1], size: oldLen*SIZE[EntryTableRec]];
-- Now compute the new lengths as if the old entries weren't there
entryTable.length ← 0;
AppendEntSeqLengths[tree: tree, pathStk: pathStk, esr: esr];
newLen ← entryTable.length;
IF newLen >= tempFirstOldIndex THEN ERROR Bug[tooManyEntriesInPage];
entryTable.length ← newLen+oldLen;
-- Now make the old lengths contiguous with the new ones and adjust the cumulative lengths of the old entries
delta ← entryTable.map[newLen].cumEntSize;
FOR i: EntryOrdinal IN [0..oldLen) DO
entryTable.map[newLen+1+i].cumEntSize ← entryTable.map[tempFirstOldIndex+i].cumEntSize+delta;
ENDLOOP;
END;
END;
DepositESL: PROCEDURE [tree: Tree, pse: LONG POINTER TO PathStkEntry, block: LONG POINTER TO BTreeEntry, length: CARDINAL] =
-- Removes entries from pse's ESL and deposits as many as will fit into the storage described by block and length. Raises Bug[depositESL] if the ESL is exhausted before the block is used up or if the end of the block would fall in the middle of an entry.
BEGIN
WHILE length#0 AND pse.eslFront#NIL DO
esr: REF EntSeqRecord ← pse.eslFront;
entSeqP: LONG POINTER TO BTreeEntry = esr.entSeqP;
IF esr.entSeqLen <= length THEN
BEGIN
Inline.LongCOPY[to: block, from: entSeqP, nwords: esr.entSeqLen];
block ← block+esr.entSeqLen;
length ← length-esr.entSeqLen;
AssignRefESR[@pse.eslFront, esr.fwdP];
esr.fwdP ← NIL;
END
ELSE BEGIN
firstEntSize: CARDINAL = tree.BTreeEntrySize[entSeqP];
IF firstEntSize <= length THEN
BEGIN
Inline.LongCOPY[to: block, from: entSeqP, nwords: firstEntSize];
block ← block+firstEntSize;
length ← length-firstEntSize;
esr.entSeqP ← entSeqP+firstEntSize;
esr.entSeqLen ← esr.entSeqLen-firstEntSize;
END
ELSE ERROR Bug[depositESL]; -- block would end in middle of entry
END;
ENDLOOP;
IF length#0 THEN ERROR Bug[depositESL]; -- ESL exhausted
IF pse.eslFront=NIL THEN AssignRefESR[@pse.eslRear, NIL];
END;
EntryIntervalSize: PROCEDURE [pathStk: PathStk, leftFather, rightFather: EntryOrdinal ← 0] RETURNS [words: CARDINAL] =
-- Computes the number of words occupied by the ESL entries bounded by the leftFather and rightFather ordinals, not inclusive. Note that it is ok for leftFather and rightFather to designate nonexistent entries, i.e., leftFather = 0 and rightFather = pathStk.entryTable.length+1. If rightFather = 0 then it is defaulted to pathStk.entryTable.length+1.
BEGIN
IF rightFather=0 THEN rightFather ← pathStk.entryTable.length+1;
RETURN [pathStk.entryTable.map[rightFather-1].cumEntSize - pathStk.entryTable.map[leftFather].cumEntSize];
END;
-- Private "cool" procedures
-- The following procedures are logically local to InsertRecords and are not called anywhere else. They are separated out because they are infrequently called and might be packaged separately, should we ever decide to package this code at all.
MakeNewRoot: PROCEDURE [tree: Tree, pathStk: PathStk] =
-- Makes a new root page given a pathStk now at level 0 and with a non-empty ESL.
BEGIN
wordsToInsert: CARDINAL = EntryIntervalSize[pathStk: pathStk, leftFather: 0];
newRootPage: PageNumber = tree.AllocatePage[];
pagePtr: BTreePagePtr = tree.ReferencePage[newRootPage, write];
IF tree.state.depth >= maxLevelsInTree THEN ERROR Error[depthExceeded];
pagePtr.minPage ← pathStk.path[0].leastSon;
tree.ReleasePage[newRootPage];
IF wordsToInsert > tree.maxFreeWords THEN ERROR Bug[newRootOverflow];
WritePage[tree: tree, pse: @pathStk.path[0], number: newRootPage, words: wordsToInsert];
tree.state.rootPage ← newRootPage;
tree.state.depth ← tree.state.depth+1;
END;
ComplexInsertRecords: PROCEDURE [tree: Tree, pathStk: PathStk] RETURNS [rtBroPg1: PageNumber] =
-- Called when not all the entries will fit on the current page. All of this page's entries have been extracted into the ESL for this level. Tries to spill over onto the right brother page, or onto the left brother page if there isn't a right brother, or onto a new page if neither brother exists. Returns rtBroPg1=nilPage if this is successful. Otherwise, repositions the current level of the pathStk (if necessary) so that a right brother exists, and returns the right brother's page number. This procedure is responsible for redistributing the entries among the two pages so as to minimize size of the entry promoted to the father page. Note that this considers only brothers and not cousins or more distant relatives.
BEGIN
pse: LONG POINTER TO PathStkEntry = @pathStk.path[pathStk.top];
fatherPSE: LONG POINTER TO PathStkEntry = @pathStk.path[pathStk.top-1];
entryTable: REF EntryTable = pathStk.entryTable;
oneBrotherEnough: BOOLEANFALSE;
fatherIndex, bestFatherIndex: EntryOrdinal;
bestFatherSize: CARDINAL ← tree.maxFreeWords+1;
fatherPSE.leastSon ← pse.pageNumber; -- in case this is the root page splitting
rtBroPg1 ← nilPage;
IF pathStk.top>1 THEN
BEGIN
rtBroPg1 ← FindRightBrother[tree: tree, pathStk: pathStk, spaceNeeded: -tree.maxFreeWords];
IF rtBroPg1=nilPage THEN
-- This may look strange, but see the comment below
rtBroPg1 ← FindLeftBrother[tree: tree, pathStk: pathStk, spaceNeeded: -tree.maxFreeWords];
END;
IF rtBroPg1=nilPage THEN rtBroPg1 ← tree.AllocatePage[];
-- At this point, we have two pages in hand, pse.pageNumber and rtBroPg1. All of their entries have been extracted into the ESL, so they may be considered blank pages. We will use rtBroPg1 as the right brother of the current page regardless of whether it was formerly the right brother, the left brother, or newly allocated.
IF entryTable.length<3 THEN ERROR Bug[tooFewEntries]; -- there must be at least one entry each from this page, the brother page, and the father page
fatherIndex ← FillLeftPage[tree: tree, pathStk: pathStk];
-- The idea next is to send the shortest entry into the father page such that the current page is at least "pretty" full (if we have such a choice).
DO
pl0, pl1, fatherSize: CARDINAL;
pl1 ← EntryIntervalSize[pathStk: pathStk, leftFather: fatherIndex];
IF pl1 > tree.maxFreeWords THEN EXIT;
pl0 ← EntryIntervalSize[pathStk: pathStk, rightFather: fatherIndex];
IF pl0=0 OR pl0+pl1 > tree.maxFreeWords+tree.awfullyFull THEN EXIT;
-- Still enough room in right brother page. See if this is the shortest father entry, and try moving one more entry into right brother page.
fatherSize ← IndexedEntrySize[pathStk: pathStk, index: fatherIndex];
IF fatherSize<bestFatherSize THEN
BEGIN
bestFatherIndex ← fatherIndex;
bestFatherSize ← fatherSize;
oneBrotherEnough ← TRUE;
END;
fatherIndex ← fatherIndex-1;
ENDLOOP;
IF oneBrotherEnough THEN
BEGIN
breakSize: CARDINAL = EntryIntervalSize[pathStk: pathStk, rightFather: bestFatherIndex];
totalSize: CARDINAL = EntryIntervalSize[pathStk: pathStk];
WritePage[tree: tree, pse: pse, number: pse.pageNumber, words: breakSize];
PushEntSeqRecord[pse: fatherPSE, esr: WriteRightBrother[tree: tree, pse: pse, rtBroPg: rtBroPg1, words: totalSize-breakSize]];
rtBroPg1 ← nilPage;
END;
END;
HairyInsertRecords: PROCEDURE [tree: Tree, pathStk: PathStk, rtBroPg1: PageNumber] =
-- Called when not all the entries will fit on the current page and the right brother page. Pours all the entries into the current and right brother pages and either the second right brother page or the left brother page, creating a new second right brother page if neither exists or there is still not enough space. This procedure is responsible for redistributing the entries among the three pages so as to minimize the sum of sizes of the entries promoted to the father page. Note that this considers only brothers and not cousins or more distant relatives.
BEGIN
AddToHeap: PROCEDURE [entry: EntryOrdinal] =
BEGIN
heap.length ← heap.length+1;
TrickleDown[emptyIndex: heap.length, entry: entry];
END; -- AddToHeap
RemoveFromHeap: PROCEDURE [entry: EntryOrdinal] =
BEGIN
heapPos: HeapIndex = entryTable.map[entry].heapPos;
heap.length ← heap.length-1;
IF heapPos <= heap.length THEN
BEGIN
replacementEntry: EntryOrdinal = heap.entries[heap.length+1];
IF IndexedEntrySize[pathStk: pathStk, index: replacementEntry] <= IndexedEntrySize[pathStk: pathStk, index: entry]
THEN TrickleDown[emptyIndex: heapPos, entry: replacementEntry]
ELSE SiftUp[emptyIndex: heapPos, entry: replacementEntry];
END;
END; -- RemoveFromHeap
TrickleDown: PROCEDURE [emptyIndex: HeapIndex, entry: EntryOrdinal] =
BEGIN
sonSize: CARDINAL = IndexedEntrySize[pathStk: pathStk, index: entry];
son: HeapIndex ← emptyIndex;
DO
father: HeapIndex ← son/2;
fatherEnt: EntryOrdinal;
IF father<=0 THEN EXIT;
fatherEnt ← heap.entries[father];
IF IndexedEntrySize[pathStk: pathStk, index: fatherEnt] <= sonSize THEN EXIT;
heap.entries[son] ← fatherEnt;
entryTable.map[fatherEnt].heapPos ← son;
son ← father;
ENDLOOP;
heap.entries[son] ← entry;
entryTable.map[entry].heapPos ← son;
END; -- TrickleDown
SiftUp: PROCEDURE [emptyIndex: HeapIndex, entry: EntryOrdinal] =
BEGIN
entrySize: CARDINAL = IndexedEntrySize[pathStk: pathStk, index: entry];
DO
son: HeapIndex ← emptyIndex*2;
sonEntry: EntryOrdinal;
IF son > heap.length THEN EXIT;
sonEntry ← heap.entries[son];
IF son < heap.length AND IndexedEntrySize[pathStk: pathStk, index: heap.entries[son+1]] < IndexedEntrySize[pathStk: pathStk, index: sonEntry] THEN
{ son ← son+1; sonEntry ← heap.entries[son] };
IF IndexedEntrySize[pathStk: pathStk, index: sonEntry] >= entrySize THEN EXIT;
heap.entries[emptyIndex] ← sonEntry;
entryTable.map[sonEntry].heapPos ← emptyIndex;
emptyIndex ← son;
ENDLOOP;
heap.entries[emptyIndex] ← entry;
entryTable.map[entry].heapPos ← emptyIndex;
END; -- SiftUp
entryTable: REF EntryTable = pathStk.entryTable;
heap: REF Heap = pathStk.heap;
pse: LONG POINTER TO PathStkEntry = @pathStk.path[pathStk.top];
fatherPSE: LONG POINTER TO PathStkEntry = @pathStk.path[pathStk.top-1]; -- father's pse
rtBroPg2: PageNumber;
fatherIndex, fatherIndex2, bestFatherIndex, bestFatherIndex2: EntryOrdinal;
minFeasIndex, maxFeasIndex: EntryOrdinal;
bestFatherSizeSum: CARDINAL ← 2*tree.maxFreeWords + 1;
twoBrothersEnough: BOOLEANFALSE;
breakSize1, breakSize2, totalSize: CARDINAL;
fatherESR: REF EntSeqRecord;
-- See how much free space our second brother page would have to contain in order to handle the overflow. This is done by pretending to fill up this page and the first right brother page and seeing what is left over.
fatherIndex ← FillLeftPage[tree: tree, pathStk: pathStk];
fatherIndex2 ← FillLeftPage[tree: tree, pathStk: pathStk, leftFather: fatherIndex];
-- The current page can't be the root, because one brother would surely have been enough in that case; so we don't have to pussyfoot when calling FindRightBrother.
rtBroPg2 ← FindRightBrother[tree: tree, pathStk: pathStk, spaceNeeded: EntryIntervalSize[pathStk: pathStk, leftFather: fatherIndex2] + 2*tree.breathingSpace];
IF rtBroPg2=nilPage THEN
BEGIN -- no luck, try the left brother
fe2: EntryOrdinal = FillRightPage[tree: tree, pathStk: pathStk];
fe: EntryOrdinal = FillRightPage[tree: tree, pathStk: pathStk, rightFather: fe2];
rtBroPg2 ← FindLeftBrother[tree: tree, pathStk: pathStk, spaceNeeded: EntryIntervalSize[pathStk: pathStk, leftFather: 0, rightFather: fe] + 2*tree.breathingSpace];
IF rtBroPg2=nilPage THEN rtBroPg2 ← tree.AllocatePage[] -- still no luck, allocate new page
ELSE BEGIN -- left brother had space, but fatherIndexes are now invalid
fatherIndex ← FillLeftPage[tree: tree, pathStk: pathStk];
fatherIndex2 ← FillLeftPage[tree: tree, pathStk: pathStk, leftFather: fatherIndex];
END;
END;
IF entryTable.length<5 THEN ERROR Bug[tooFewEntries]; -- there must be two entries from the father page and at least one entry each from this page and the two brother pages
-- Now figure out how to divide the entries among the three pages in a way that minimizes the sum of the sizes of the two entries sent to the father page while attempting to keep the pages at least "fairly full". The way this is done is as follows. The left cut point (fatherIndex) is swept leftward from its initial maximum possible value, and all possible right cut points for the initial left cut point are thrown into a heap ordered by entry size. As the left cut point moves left, some possible right cut points are added and some are removed. At each step, the minimum-size entry for the right cut point is on the top of the heap. The sum of that and the entry for the left cut point is computed and the minimum remembered.
heap.length ← 0;
maxFeasIndex ← fatherIndex2;
WHILE EntryIntervalSize[pathStk: pathStk, leftFather: maxFeasIndex] <= tree.fairlyFull DO
maxFeasIndex ← maxFeasIndex-1;
ENDLOOP;
minFeasIndex ← maxFeasIndex+1;
WHILE EntryIntervalSize[pathStk: pathStk, rightFather: fatherIndex] > (IF twoBrothersEnough THEN tree.prettyFull ELSE 0) DO
WHILE EntryIntervalSize[pathStk: pathStk, leftFather: fatherIndex, rightFather: minFeasIndex-1] > 0 AND EntryIntervalSize[pathStk: pathStk, leftFather: minFeasIndex-1] <= tree.maxFreeWords DO
minFeasIndex ← minFeasIndex-1;
IF minFeasIndex <= maxFeasIndex THEN AddToHeap[minFeasIndex];
ENDLOOP;
WHILE EntryIntervalSize[pathStk: pathStk, leftFather: fatherIndex, rightFather: maxFeasIndex] > tree.maxFreeWords DO
IF maxFeasIndex >= minFeasIndex THEN RemoveFromHeap[maxFeasIndex];
maxFeasIndex ← maxFeasIndex-1;
ENDLOOP;
IF heap.length>0 THEN
BEGIN
fatherSizeSum: CARDINAL;
fatherIndex2 ← heap.entries[1];
fatherSizeSum ← IndexedEntrySize[pathStk: pathStk, index: fatherIndex] + IndexedEntrySize[pathStk: pathStk, index: fatherIndex2];
IF fatherSizeSum<bestFatherSizeSum THEN
BEGIN
twoBrothersEnough ← TRUE;
bestFatherSizeSum ← fatherSizeSum;
bestFatherIndex ← fatherIndex;
bestFatherIndex2 ← fatherIndex2;
END;
END;
fatherIndex ← fatherIndex-1;
ENDLOOP;
IF ~twoBrothersEnough THEN ERROR Bug[twoBrothersNotEnough];
-- Write the three pages and promote the two father entries to the next level.
breakSize1 ← EntryIntervalSize[pathStk: pathStk, rightFather: bestFatherIndex];
breakSize2 ← EntryIntervalSize[pathStk: pathStk, rightFather: bestFatherIndex2];
totalSize ← EntryIntervalSize[pathStk: pathStk];
WritePage[tree: tree, pse: pse, number: pse.pageNumber, words: breakSize1];
fatherESR ← WriteRightBrother[tree: tree, pse: pse, rtBroPg: rtBroPg1, words: breakSize2-breakSize1];
PushEntSeqRecord[pse: fatherPSE, esr: WriteRightBrother[tree: tree, pse: pse, rtBroPg: rtBroPg2, words: totalSize-breakSize2]];
PushEntSeqRecord[pse: fatherPSE, esr: fatherESR];
END;
FindRightBrother: PROCEDURE [tree: Tree, pathStk: PathStk, spaceNeeded: INTEGER] RETURNS [rtBroPg: PageNumber] =
-- Finds the right brother of the current page, and determines whether it has room for at least spaceNeeded additional words. If so, removes the father entry and all right brother entries and appends them to the ESL for this level. Returns nilPage if there is no right brother or it is too full. Passing a spaceNeeded argument of -tree.maxFreeWords will find the right brother if it exists, regardless of how full it is.
BEGIN
pse: LONG POINTER TO PathStkEntry = @pathStk.path[pathStk.top];
fatherPSE: LONG POINTER TO PathStkEntry = @pathStk.path[pathStk.top-1];
fatherEntSize: CARDINAL;
pagePtr: BTreePagePtr;
fatherESR, rtBroESR: REF EntSeqRecord;
IF fatherPSE.eslFront=NIL THEN
BEGIN
pagePtr ← tree.ReferencePage[fatherPSE.pageNumber];
IF fatherPSE.offset = nilOffset+(tree.state.pageSize-pagePtr.freeWords) THEN
{ tree.ReleasePage[fatherPSE.pageNumber]; RETURN [nilPage] }; -- no right brother
fatherEntSize ← tree.BTreeEntrySize[@pagePtr[fatherPSE.offset]];
rtBroPg ← pagePtr[fatherPSE.offset].grPage;
tree.ReleasePage[fatherPSE.pageNumber];
END
ELSE BEGIN
fatherEntSize ← tree.BTreeEntrySize[fatherPSE.eslFront.entSeqP];
rtBroPg ← fatherPSE.eslFront.entSeqP.grPage;
END;
pagePtr ← tree.ReferencePage[rtBroPg];
IF LOOPHOLE[pagePtr.freeWords-fatherEntSize, INTEGER] < spaceNeeded THEN
{ tree.ReleasePage[rtBroPg]; RETURN [nilPage] }; -- right brother too full
rtBroESR ← MakeEntSeqRecord[entSeq: @pagePtr.entries, length: tree.maxFreeWords-pagePtr.freeWords];
tree.ReleasePage[rtBroPg];
[esr: fatherESR] ← tree.RemoveEntry[pse: fatherPSE];
AppendEntSeqLengths[tree: tree, pathStk: pathStk, esr: fatherESR];
AppendEntSeqRecord[pse: pse, esr: fatherESR];
AppendEntSeqLengths[tree: tree, pathStk: pathStk, esr: rtBroESR];
AppendEntSeqRecord[pse: pse, esr: rtBroESR];
END;
FindLeftBrother: PROCEDURE [tree: Tree, pathStk: PathStk, spaceNeeded: INTEGER] RETURNS [ltBroPg: PageNumber] =
-- Finds the left brother of the current page, and determines whether it has room for at least spaceNeeded additional words. If so, backs up one entry at the father's level, removes the father entry and all left brother entries, and inserts them at the front of the ESL for this level. Returns nilPage if there is no left brother or it is too full. Passing a spaceNeeded argument of -tree.maxFreeWords will find the left brother if it exists, regardless of how full it is.
BEGIN
pse: LONG POINTER TO PathStkEntry = @pathStk.path[pathStk.top];
fatherPSE: LONG POINTER TO PathStkEntry = @pathStk.path[pathStk.top-1];
fatherPagePtr, ltBroPagePtr, rtBroPagePtr: BTreePagePtr;
fatherESR, ltBroESR: REF EntSeqRecord;
fatherEntSize: CARDINAL;
rtBroOfLtBroPg: PageNumber;
IF fatherPSE.offset <= entry1Offset THEN RETURN [nilPage];
fatherPagePtr ← tree.ReferencePage[fatherPSE.pageNumber];
ltBroPg ← fatherPagePtr[fatherPSE.nextToLastOffset].grPage;
rtBroOfLtBroPg ← fatherPagePtr[fatherPSE.lastOffset].grPage;
fatherEntSize ← tree.BTreeEntrySize[@fatherPagePtr[fatherPSE.lastOffset]];
tree.ReleasePage[fatherPSE.pageNumber];
ltBroPagePtr ← tree.ReferencePage[ltBroPg];
IF LOOPHOLE[ltBroPagePtr.freeWords-fatherEntSize, INTEGER] < spaceNeeded THEN
{ tree.ReleasePage[ltBroPg]; RETURN [nilPage] };
ltBroESR ← MakeEntSeqRecord[entSeq: @ltBroPagePtr.entries, length: tree.maxFreeWords-ltBroPagePtr.freeWords];
fatherPagePtr ← tree.ReferencePage[fatherPSE.pageNumber, write];
fatherPagePtr[fatherPSE.nextToLastOffset].grPage ← rtBroOfLtBroPg;
tree.ReleasePage[fatherPSE.pageNumber];
[esr: fatherESR] ← tree.BackUpAndRemoveEntry[pse: fatherPSE];
rtBroPagePtr ← tree.ReferencePage[rtBroOfLtBroPg, write];
fatherESR.entSeqP.grPage ← rtBroPagePtr.minPage;
rtBroPagePtr.minPage ← ltBroPagePtr.minPage;
tree.ReleasePage[rtBroOfLtBroPg];
tree.ReleasePage[ltBroPg];
PushEntSeqLengths[tree: tree, pathStk: pathStk, esr: fatherESR];
PushEntSeqRecord[pse: pse, esr: fatherESR];
PushEntSeqLengths[tree: tree, pathStk: pathStk, esr: ltBroESR];
PushEntSeqRecord[pse: pse, esr: ltBroESR];
END;
WriteRightBrother: PROCEDURE [tree: Tree, pse: LONG POINTER TO PathStkEntry, rtBroPg: PageNumber, words: CARDINAL] RETURNS [fatherESR: REF EntSeqRecord] =
-- Removes words' worth of entries from the front of the ESL for this level, and writes all but the first entry into rtBroPg. Designates the first entry as the (left) father of rtBroPg, and returns a new ESR containing it. Also sets the page's freeWords and minPage fields appropriately.
BEGIN
pagePtr: BTreePagePtr;
minPage: PageNumber;
[esr: fatherESR, grPage: minPage] ← tree.RemoveEntry[pse: pse];
words ← words-fatherESR.entSeqLen;
pagePtr ← tree.ReferencePage[rtBroPg, write];
pagePtr.minPage ← minPage;
tree.ReleasePage[rtBroPg];
WritePage[tree: tree, pse: pse, number: rtBroPg, words: words];
fatherESR.entSeqP.grPage ← rtBroPg;
END;
WritePage: PROCEDURE [tree: Tree, pse: LONG POINTER TO PathStkEntry, number: PageNumber, words: CARDINAL] =
-- Removes words' worth of entries from the front of the ESL for this level, and writes them into the page designated by number. Sets the page's freeWords appropriately, but does not touch minPage.
BEGIN
pagePtr: BTreePagePtr = tree.ReferencePage[number, write];
DepositESL[tree: tree, pse: pse, block: @pagePtr.entries, length: words];
pagePtr.freeWords ← tree.maxFreeWords-words;
tree.ReleasePage[number];
END;
IndexedEntrySize: PROCEDURE [pathStk: PathStk, index: EntryOrdinal] RETURNS [words: CARDINAL] = INLINE
{ RETURN [EntryIntervalSize[pathStk: pathStk, leftFather: index-1, rightFather: index+1]] };
FillLeftPage: PROCEDURE [tree: Tree, pathStk: PathStk, leftFather, rightFather: EntryOrdinal ← 0] RETURNS [midFather: EntryOrdinal] =
-- Finds the largest entry ordinal in (leftFather .. rightFather) such that all the entries in (leftFather .. midFather) will fit in one BTree page. If rightFather = 0 then it is defaulted to pathStk.entryTable.length+1.
BEGIN
IF rightFather=0 THEN rightFather ← pathStk.entryTable.length+1;
midFather ← leftFather+2;
WHILE midFather<rightFather-2 AND EntryIntervalSize[pathStk: pathStk, leftFather: leftFather, rightFather: midFather+1] <= tree.maxFreeWords DO
midFather ← midFather+1;
ENDLOOP;
END;
FillRightPage: PROCEDURE [tree: Tree, pathStk: PathStk, leftFather, rightFather: EntryOrdinal ← 0] RETURNS [midFather: EntryOrdinal] =
-- Finds the smallest entry ordinal in (leftFather .. rightFather) such that all the entries in (midFather .. rightFather) will fit in one BTree page. If rightFather = 0 then it is defaulted to pathStk.entryTable.length+1.
BEGIN
IF rightFather=0 THEN rightFather ← pathStk.entryTable.length+1;
midFather ← rightFather-2;
WHILE midFather>leftFather+2 AND EntryIntervalSize[pathStk: pathStk, leftFather: midFather-1, rightFather: rightFather] <= tree.maxFreeWords DO
midFather ← midFather-1;
ENDLOOP;
END;
END.