BTree.
Tree: TYPE = REF TreeObject;
TreeObject: PUBLIC TYPE = BTreeInternal.TreeObject;
PathStk: TYPE = REF PathStkObject;
PathStkObject: PUBLIC TYPE = BTreeInternal.PathStkObject;
PathStkEntryPtr: TYPE = POINTER TO PathStkEntry;
BTreeEntryPtr: TYPE = POINTER TO BTreeEntry;
silly: REF ¬ NEW[TreeObject];
DeleteKey:
PUBLIC
SAFE
PROC
[tree: Tree, key: Key, pathStk: PathStk ¬
NIL, useExistingPath:
BOOL ¬
FALSE]
RETURNS [found:
BOOL] =
TRUSTED {
FatherMayNeedWork:
PROC
RETURNS [needsWork:
BOOL] = {
This code assumes that the son page is pointed to by the fatherPage[lastOffset].grPage and that this condition is preserved by InsertRecords.
pagePtr, otherPtr: BTreePagePtr;
fatherPSE: PathStkEntryPtr;
fatherFreeWords: CARD;
pse ¬ @pathStk.path[pathStk.top];
IF pse.eslFront=
NIL
THEN needsWork ¬
FALSE
ELSE {
tree.InsertRecords[pathStk];
needsWork ¬ pathStk.top#0 AND pathStk.path[pathStk.top-1].eslFront#NIL;
};
pagePtr ¬ tree.ReferencePage[pse.pageNumber];
IF pathStk.top=1
AND pagePtr.freeBytes=tree.maxFreeBytes
THEN {
Bye-bye, old root page!
tree.state.rootPage ¬ pagePtr.minPage;
tree.state.depth ¬ tree.state.depth-1;
tree.ReleasePage[pse.pageNumber];
tree.FreePage[pse.pageNumber];
RETURN [FALSE];
};
IF pathStk.top=1
OR tree.maxFreeBytes-pagePtr.freeBytes >= tree.prettyFull
THEN {
tree.ReleasePage[pse.pageNumber];
RETURN;
};
Page is not sufficiently full. Try to merge with left or right brother page. This is done by extracting the entire contents of this page (plus one father entry) into the ESL, freeing this page, repositioning to the brother page, and calling InsertRecords. Of course, there may not actually be enough space in the brother page(s), in which case InsertRecords will turn around and allocate a new page. But in any event the overall balance of the tree should be improved.
AppendEntSeqRecord[
pse: pse,
esr: MakeEntSeqRecord[
entSeq: FirstEntry[pagePtr],
length: tree.maxFreeBytes-pagePtr.freeBytes]];
fatherPSE ¬ @pathStk.path[pathStk.top-1];
otherPtr ¬ tree.ReferencePage[fatherPSE.pageNumber];
fatherFreeWords ¬ otherPtr.freeBytes;
tree.ReleasePage[fatherPSE.pageNumber];
IF fatherPSE.offset < nilOffset+(tree.state.pageSize-fatherFreeWords)
OR fatherPSE.eslFront#
NIL
THEN {
the current page has a right brother
rtBroPg: PageNumber;
esr: REF EntSeqRecord;
[esr: esr, grPage: rtBroPg] ¬ tree.RemoveEntry[fatherPSE];
AppendEntSeqRecord[pse: pse, esr: esr];
otherPtr ¬ tree.ReferencePage[fatherPSE.pageNumber, write];
IF otherPtr[fatherPSE.lastOffset].grPage # pse.pageNumber
THEN
ERROR Bug[mcCreightWasWrong];
otherPtr[fatherPSE.lastOffset].grPage ¬ rtBroPg;
tree.ReleasePage[fatherPSE.pageNumber];
otherPtr ¬ tree.ReferencePage[rtBroPg, write];
otherPtr.minPage ¬ pagePtr.minPage;
tree.ReleasePage[rtBroPg];
tree.ReleasePage[pse.pageNumber];
tree.FreePage[pse.pageNumber];
pse.pageNumber ¬ rtBroPg;
pse.offset ¬ entry1Offset;
pse.lastOffset ¬ entry0Offset;
pse.nextToLastOffset ¬ nilOffset;
}
ELSE {
the current page surely has a left brother
esr: REF EntSeqRecord;
tree.ReleasePage[pse.pageNumber];
[esr: esr] ¬ tree.BackUpAndRemoveEntry[fatherPSE];
PushEntSeqRecord[pse: pse, esr: esr];
tree.FreePage[pse.pageNumber];
otherPtr ¬ tree.ReferencePage[fatherPSE.pageNumber];
pse.pageNumber ¬ otherPtr[fatherPSE.lastOffset].grPage;
tree.ReleasePage[fatherPSE.pageNumber];
pagePtr ¬ tree.ReferencePage[pse.pageNumber];
pse.offset ¬ nilOffset+(tree.state.pageSize-pagePtr.freeBytes);
pse.lastOffset ¬ entry1Offset;
pse.nextToLastOffset ¬ entry0Offset;
tree.ReleasePage[pse.pageNumber];
tree.RepairOffsets[pse];
};
IF pse.eslFront#NIL THEN tree.InsertRecords[pathStk];
RETURN [TRUE];
};
pathStkWasNil: BOOL ¬ pathStk=NIL;
pse: PathStkEntryPtr;
tree.Lock[update];
IF pathStkWasNil
THEN {
IF useExistingPath THEN ERROR Error[nilPathStk];
pathStk ¬ tree.GetDefaultPathStk[];
};
{
Extra nesting required so that pathStkWasNil is visible in the catch phrase (yecch)!
ENABLE
UNWIND =>
{
IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk];
tree.Unlock[];
};
origStkTop: PathStkIndex;
pagePtr: BTreePagePtr ¬ NIL;
descendantPg: PageNumber;
simpleDelete:
BOOL ¬
FALSE;
entry is "simple" to delete if it is in a leaf page and removing it will still leave the page at least "prettyFull".
equal: BOOL ¬ tree.PathEntryLE[key: key, pathStk: pathStk, useExistingPath: useExistingPath].equal;
IF ~equal
THEN {
IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk];
tree.Unlock[];
RETURN [FALSE];
};
origStkTop ¬ pathStk.top;
[ptr: pagePtr, pse: pse] ¬ tree.ReferenceStack[pathStk];
descendantPg ¬ pagePtr[pse.nextToLastOffset].grPage;
IF descendantPg = nilPage
THEN {
bteSize: CARD ¬ tree.BTreeEntrySize[@pagePtr[pse.lastOffset]];
IF tree.maxFreeBytes - (pagePtr.freeBytes + bteSize) >= tree.prettyFull
THEN
simpleDelete ¬ TRUE;
};
tree.ReleasePage[pse.pageNumber];
tree.version ¬ tree.version+1;
invalidate existing PathStks that refer to this tree
Set up to delete the entry. If it is in a leaf page, we just remove it. If it is in an interior page, we must find a leaf entry to replace it with.
tree.BackUpOneEntry[pse]; -- pse.offset should index deletion victim
IF simpleDelete
THEN {
tree.AdjustTreeState[update: unchanged, deltaEntryCount: -1];
pagePtr ¬ tree.ReferencePage[pse.pageNumber, write];
[] ¬ tree.RemoveEntry[pse];
tree.ReleasePage[pse.pageNumber, IF tree.longUpdate THEN unchanged ELSE endOfUpdate];
}
ELSE {
tree.AdjustTreeState[update: startOfUpdate, deltaEntryCount: -1];
Deletion surrogate is one with greatest key less than victim's
tree.PathToMaxDescendant[pathStk: pathStk, page: descendantPg];
IF pathStk.top > origStkTop
THEN {
dpse: PathStkEntryPtr = @pathStk.path[pathStk.top];
leafESR: REF EntSeqRecord ¬ tree.BackUpAndRemoveEntry[dpse].esr;
[grPage: leafESR.entSeqP.grPage] ¬ tree.RemoveEntry[pse];
discard returned ESR
AppendEntSeqRecord[pse: pse, esr: leafESR];
}
ELSE [] ¬ tree.RemoveEntry[pse];
tree.GetHeapAndTable[pathStk];
DO
needsWork:
BOOL ¬ FatherMayNeedWork[ !
UNWIND => tree.ReturnHeapAndTable[pathStk]];
IF pathStk.top=0 OR (~needsWork AND pathStk.top<=origStkTop) THEN EXIT
ELSE pathStk.top ¬ pathStk.top-1;
ENDLOOP;
pathStk.top ¬ 0;
tree.ReturnHeapAndTable[pathStk];
tree.AdjustTreeState[update: endOfUpdate, deltaEntryCount: 0];
};
};
IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk];
tree.Unlock[];
RETURN [TRUE];
};
UpdateRecord:
PUBLIC
SAFE
PROC
[tree: Tree, key: Key, pathStk: PathStk ¬
NIL, useExistingPath:
BOOL ¬
FALSE,
record: Record, updateType: UpdateType ¬ insertOrReplace] =
TRUSTED {
ProduceEntry: EachEntryProc = {
Basics.MoveWords[dst: entry, src: tree.EntryFromRecord[record], count: BytesToWords[bytes]];
};
bytes: EntSize = tree.EntrySize[tree.EntryFromRecord[record]];
UpdateEntry[tree: tree, key: key, pathStk: pathStk, useExistingPath: useExistingPath, bytes: bytes, Proc: ProduceEntry, updateType: updateType];
};
UpdateEntry:
PUBLIC
PROC
[tree: Tree, key: Key, pathStk: PathStk ¬
NIL,
useExistingPath:
BOOL ¬
FALSE, bytes: EntSize,
Proc: EachEntryProc, updateType: UpdateType ¬ insertOrReplace] = {
CallEntryProc: EachEntryProc = {
Proc[entry];
IF tree.EntrySize[entry]#bytes
OR tree.Compare[key, entry]#equal
THEN
ERROR Error[wrongEntryProduced];
};
pathStkWasNil: BOOL ¬ pathStk=NIL;
tree.Lock[update];
IF pathStkWasNil
THEN {
IF useExistingPath THEN ERROR Error[nilPathStk];
pathStk ¬ tree.GetDefaultPathStk[];
};
Extra nesting required so that pathStkWasNil is visible in the catch phrase (yecch)!
{
ENABLE
UNWIND => {
IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk];
tree.Unlock[];
};
leafStkTop: PathStkIndex;
equal: BOOL;
pse: PathStkEntryPtr;
pagePtr: BTreePagePtr ¬ NIL;
foundEntSize: CARD ¬ 0; -- zero means there is not an existing entry with this key
IF
CARD[bytes+entryOverhead]
NOT
IN [1+entryOverhead..tree.maxFreeBytes]
THEN
ERROR Error[entrySizesWrong];
[equal: equal, depth: leafStkTop] ¬ tree.PathEntryLE[key: key, pathStk: pathStk, useExistingPath: useExistingPath];
IF equal
THEN {
IF updateType=insert THEN ERROR Error[wrongUpdateType];
[pse: pse, ptr: pagePtr] ¬ tree.ReferenceStack[pathStk];
foundEntSize ¬ tree.EntrySize[@pagePtr[pse.lastOffset].entry];
tree.ReleasePage[pse.pageNumber];
}
ELSE IF updateType=replace THEN ERROR Error[wrongUpdateType];
To minimize average insertion time, perform the update in one of three ways (in increasing order of difficulty, as measured by amount of temporary storage allocated and amount of data copied):
1. If replacing an existing entry of the same size, just overwrite it.
2. If the new entry fits on the page (after removing the old entry if any), just slide up the entries beyond the insertion point and insert the new entry.
3. Otherwise, leave the new entry as an EntSeqRecord at the appropriate stack level, and let InsertRecords cope with the problem.
This code also takes care not to perform the startOfUpdate and endOfUpdate write references to the state page when the update consists of only a single page write.
tree.version ¬ tree.version+1; -- invalidate existing PathStks that refer to this tree
pse ¬ @pathStk.path[pathStk.top];
IF bytes=foundEntSize
THEN {
new record same length as old; just copy it over
tree.AdjustTreeState[update: unchanged, deltaEntryCount: 0];
pagePtr ¬ tree.ReferencePage[pse.pageNumber, write];
CallEntryProc[@pagePtr[pse.lastOffset].entry];
tree.ReleasePage[pse.pageNumber, IF tree.longUpdate THEN unchanged ELSE endOfUpdate];
}
ELSE {
removedEntGrPage: PageNumber ¬ nilPage;
newEntryFits: BOOL ¬ FALSE;
IF foundEntSize=0
THEN {
no old entry to remove, and we will insert at the leaf level
pathStk.top ¬ leafStkTop;
pse ¬ @pathStk.path[pathStk.top];
};
pathStk.top and pse now designate the page into which to insert the new entry.
IF pathStk.top>0
THEN {
delta: CARD ¬ 0;
IF foundEntSize # 0 THEN delta ¬ foundEntSize+entryOverhead;
pagePtr ¬ tree.ReferencePage[pse.pageNumber];
newEntryFits ¬ (bytes+entryOverhead) <= (pagePtr.freeBytes+delta);
tree.ReleasePage[pse.pageNumber];
};
tree.AdjustTreeState[update: IF newEntryFits THEN unchanged ELSE startOfUpdate, deltaEntryCount: IF foundEntSize=0 THEN 1 ELSE 0];
IF pathStk.top>0 THEN pagePtr ¬ tree.ReferencePage[pse.pageNumber, write];
IF foundEntSize#0
THEN
first remove and discard old entry, but save its descendant pointer
[grPage: removedEntGrPage] ¬ tree.BackUpAndRemoveEntry[pse];
IF newEntryFits
THEN {
new entry fits on the page; slide the greater entries out of the way and drop the new entry in
entPtr: BTreeEntryPtr ¬ @pagePtr[pse.offset];
Basics.MoveWords[
dst: LOOPHOLE[entPtr+bytes+entryOverhead],
src: LOOPHOLE[entPtr],
count: BytesToWords[nilOffset+(tree.state.pageSize-pagePtr.freeBytes)-pse.offset]];
CallEntryProc[@entPtr.entry];
entPtr.grPage ¬ removedEntGrPage;
pagePtr.freeBytes ¬ pagePtr.freeBytes - (bytes+entryOverhead);
tree.ReleasePage[pse.pageNumber, IF tree.longUpdate THEN unchanged ELSE endOfUpdate];
}
ELSE {
new entry does not fit (or there isn't yet a page to fit it into)
esr: REF EntSeqRecord ¬ NEW[EntSeqRecord[(bytes+entryOverhead)/SIZE[BYTE]]];
esr.entSeqP ¬ LOOPHOLE[BASE[DESCRIPTOR[esr.entSeq]]];
esr.entSeqLen ¬ bytes+entryOverhead;
CallEntryProc[@esr.entSeqP.entry];
esr.entSeqP.grPage ¬ removedEntGrPage;
AppendEntSeqRecord[pse: pse, esr: esr];
IF pathStk.top>0 THEN tree.ReleasePage[pse.pageNumber];
tree.GetHeapAndTable[pathStk];
WHILE pathStk.path[pathStk.top].eslFront#
NIL
DO
tree.InsertRecords[pathStk !
UNWIND => tree.ReturnHeapAndTable[pathStk]];
IF pathStk.top=0 THEN EXIT ELSE pathStk.top ¬ pathStk.top-1;
ENDLOOP;
tree.ReturnHeapAndTable[pathStk];
tree.AdjustTreeState[update: endOfUpdate, deltaEntryCount: 0];
};
};
};
IF pathStkWasNil THEN tree.ReturnDefaultPathStk[pathStk];
tree.Unlock[];
};
SetUpdateInProgress:
PUBLIC
SAFE
PROC [tree: Tree, updateInProgress:
BOOL] =
TRUSTED {
tree.Lock[update];
tree.longUpdate ¬ updateInProgress;
tree.AdjustTreeState[
update: IF updateInProgress THEN startOfUpdate ELSE endOfUpdate,
deltaEntryCount: 0];
tree.Unlock[];
};
Private "cool" procedures
The following procedures are logically local to InsertRecords and are not called anywhere else. They are separated out because they are infrequently called and might be packaged separately, should we ever decide to package this code at all.
ComplexInsertRecords:
PROC
[tree: Tree, pathStk: PathStk]
RETURNS [rtBroPg1: PageNumber] = {
Called when not all the entries will fit on the current page. All of this page's entries have been extracted into the ESL for this level. Tries to spill over onto the right brother page, or onto the left brother page if there isn't a right brother, or onto a new page if neither brother exists. Returns rtBroPg1=nilPage if this is successful. Otherwise, repositions the current level of the pathStk (if necessary) so that a right brother exists, and returns the right brother's page number. This procedure is responsible for redistributing the entries among the two pages so as to minimize size of the entry promoted to the father page. Note that this considers only brothers and not cousins or more distant relatives.
pse: PathStkEntryPtr = @pathStk.path[pathStk.top];
fatherPSE: PathStkEntryPtr = @pathStk.path[pathStk.top-1];
entryTable: REF EntryTable = pathStk.entryTable;
oneBrotherEnough: BOOL ¬ FALSE;
fatherIndex, bestFatherIndex: EntryOrdinal;
bestFatherSize: CARD ¬ tree.maxFreeBytes+1;
fatherPSE.leastSon ¬ pse.pageNumber; -- in case this is the root page splitting
rtBroPg1 ¬ nilPage;
IF pathStk.top>1
THEN {
rtBroPg1 ¬ FindRightBrother[
tree: tree, pathStk: pathStk, spaceNeeded: -tree.maxFreeBytes];
IF rtBroPg1=nilPage
THEN
This may look strange, but see the comment below
rtBroPg1 ¬ FindLeftBrother[
tree: tree, pathStk: pathStk, spaceNeeded: -tree.maxFreeBytes];
};
IF rtBroPg1=nilPage THEN rtBroPg1 ¬ tree.AllocatePage[];
At this point, we have two pages in hand, pse.pageNumber and rtBroPg1. All of their entries have been extracted into the ESL, so they may be considered blank pages. We will use rtBroPg1 as the right brother of the current page regardless of whether it was formerly the right brother, the left brother, or newly allocated.
IF entryTable.length<3
THEN
ERROR Bug[tooFewEntries];
there must be at least one entry each from this page, the brother page, and the father page
fatherIndex ¬ FillLeftPage[tree: tree, pathStk: pathStk];
The idea next is to send the shortest entry into the father page such that the current page is at least "pretty" full (if we have such a choice).
DO
pl0, pl1, fatherSize: CARD;
pl1 ¬ EntryIntervalSize[pathStk: pathStk, leftFather: fatherIndex];
IF pl1 > tree.maxFreeBytes THEN EXIT;
pl0 ¬ EntryIntervalSize[pathStk: pathStk, rightFather: fatherIndex];
IF pl0=0 OR pl0+pl1 > tree.maxFreeBytes+tree.awfullyFull THEN EXIT;
Still enough room in right brother page. See if this is the shortest father entry, and try moving one more entry into right brother page.
fatherSize ¬ IndexedEntrySize[pathStk: pathStk, index: fatherIndex];
IF fatherSize<bestFatherSize
THEN {
bestFatherIndex ¬ fatherIndex;
bestFatherSize ¬ fatherSize;
oneBrotherEnough ¬ TRUE;
};
fatherIndex ¬ fatherIndex-1;
ENDLOOP;
IF oneBrotherEnough
THEN {
breakSize: CARD = EntryIntervalSize[pathStk: pathStk, rightFather: bestFatherIndex];
totalSize: CARD = EntryIntervalSize[pathStk: pathStk];
WritePage[tree: tree, pse: pse, number: pse.pageNumber, words: breakSize];
PushEntSeqRecord[pse: fatherPSE, esr: WriteRightBrother[tree: tree, pse: pse, rtBroPg: rtBroPg1, words: totalSize-breakSize]];
rtBroPg1 ¬ nilPage;
};
};
HairyInsertRecords:
PROC [tree: Tree, pathStk: PathStk, rtBroPg1: PageNumber] = {
Called when not all the entries will fit on the current page and the right brother page. Pours all the entries into the current and right brother pages and either the second right brother page or the left brother page, creating a new second right brother page if neither exists or there is still not enough space. This procedure is responsible for redistributing the entries among the three pages so as to minimize the sum of sizes of the entries promoted to the father page. Note that this considers only brothers and not cousins or more distant relatives.
TrickleDown:
PROC [emptyIndex: HeapIndex, entry: EntryOrdinal] = {
sonSize: CARD = IndexedEntrySize[pathStk: pathStk, index: entry];
son: HeapIndex ¬ emptyIndex;
DO
father: HeapIndex ¬ son/2;
fatherEnt: EntryOrdinal;
IF
LOOPHOLE[father,
CARD] <= 0
THEN
EXIT;
Someday we'll take this line out and replace it with the following one; this
is a compiler bug! No code gets generated for some reason
IF father <= 0 THEN EXIT;
fatherEnt ¬ heap.entries[father];
IF IndexedEntrySize[pathStk: pathStk, index: fatherEnt] <= sonSize THEN EXIT;
heap.entries[son] ¬ fatherEnt;
entryTable.map[fatherEnt].heapPos ¬ son;
son ¬ father;
ENDLOOP;
heap.entries[son] ¬ entry;
entryTable.map[entry].heapPos ¬ son;
};
Body of Hairy Insert Records
entryTable: REF EntryTable = pathStk.entryTable;
heap: REF Heap = pathStk.heap;
pse: PathStkEntryPtr = @pathStk.path[pathStk.top];
fatherPSE: PathStkEntryPtr = @pathStk.path[pathStk.top-1]; -- father's pse
rtBroPg2: PageNumber;
fatherIndex, fatherIndex2, bestFatherIndex, bestFatherIndex2: EntryOrdinal;
minFeasIndex, maxFeasIndex: EntryOrdinal;
bestFatherSizeSum: CARD ¬ 2*tree.maxFreeBytes + 1;
twoBrothersEnough: BOOL ¬ FALSE;
breakSize1, breakSize2, totalSize: CARD;
fatherESR: REF EntSeqRecord;
See how much free space our second brother page would have to contain in order to handle the overflow. This is done by pretending to fill up this page and the first right brother page and seeing what is left over.
fatherIndex ¬ FillLeftPage[tree: tree, pathStk: pathStk];
fatherIndex2 ¬ FillLeftPage[tree: tree, pathStk: pathStk, leftFather: fatherIndex];
The current page can't be the root, because one brother would surely have been enough in that case; so we don't have to pussyfoot when calling FindRightBrother.
rtBroPg2 ¬ FindRightBrother[
tree: tree,
pathStk: pathStk,
spaceNeeded: EntryIntervalSize[
pathStk: pathStk,
leftFather: fatherIndex2] + 2*tree.breathingSpace];
IF rtBroPg2=nilPage
THEN {
no luck, try the left brother
fe2: EntryOrdinal = FillRightPage[tree: tree, pathStk: pathStk];
fe: EntryOrdinal = FillRightPage[tree: tree, pathStk: pathStk, rightFather: fe2];
rtBroPg2 ¬ FindLeftBrother[tree: tree, pathStk: pathStk, spaceNeeded: EntryIntervalSize[pathStk: pathStk, leftFather: 0, rightFather: fe] + 2*tree.breathingSpace];
IF rtBroPg2=nilPage
THEN rtBroPg2 ¬ tree.AllocatePage[] -- still no luck, allocate new page
ELSE {
left brother had space, but fatherIndexes are now invalid
fatherIndex ¬ FillLeftPage[tree: tree, pathStk: pathStk];
fatherIndex2 ¬ FillLeftPage[tree: tree, pathStk: pathStk, leftFather: fatherIndex];
};
};
IF entryTable.length < 5
THEN
ERROR Bug[tooFewEntries];
there must be two entries from the father page and at least one entry each from this page and the two brother pages
Now figure out how to divide the entries among the three pages in a way that minimizes the sum of the sizes of the two entries sent to the father page while attempting to keep the pages at least "fairly full". The way this is done is as follows. The left cut point (fatherIndex) is swept leftward from its initial maximum possible value, and all possible right cut points for the initial left cut point are thrown into a heap ordered by entry size. As the left cut point moves left, some possible right cut points are added and some are removed. At each step, the minimum-size entry for the right cut point is on the top of the heap. The sum of that and the entry for the left cut point is computed and the minimum remembered.
heap.length ¬ 0;
maxFeasIndex ¬ fatherIndex2;
WHILE EntryIntervalSize[pathStk: pathStk, leftFather: maxFeasIndex] <= tree.fairlyFull
DO
maxFeasIndex ¬ maxFeasIndex-1;
ENDLOOP;
minFeasIndex ¬ maxFeasIndex+1;
WHILE EntryIntervalSize[pathStk: pathStk, rightFather: fatherIndex]
> (
IF twoBrothersEnough
THEN tree.prettyFull
ELSE 0)
DO
WHILE EntryIntervalSize[
pathStk: pathStk,
leftFather: fatherIndex,
rightFather: minFeasIndex-1] > 0 AND EntryIntervalSize[pathStk: pathStk, leftFather: minFeasIndex-1] <= tree.maxFreeBytes DO
minFeasIndex ¬ minFeasIndex-1;
IF minFeasIndex <= maxFeasIndex
THEN {
AddToHeap
heap.length ¬ heap.length+1;
TrickleDown[emptyIndex: heap.length, entry: minFeasIndex];
};
ENDLOOP;
WHILE EntryIntervalSize[pathStk: pathStk, leftFather: fatherIndex, rightFather: maxFeasIndex] > tree.maxFreeBytes
DO
IF maxFeasIndex >= minFeasIndex
THEN {
RemoveFromHeap
heapPos: HeapIndex = entryTable.map[maxFeasIndex].heapPos;
heap.length ¬ heap.length-1;
IF heapPos <= heap.length
THEN {
replacementEntry: EntryOrdinal = heap.entries[heap.length+1];
IF IndexedEntrySize[pathStk: pathStk, index: replacementEntry] <= IndexedEntrySize[pathStk: pathStk, index: maxFeasIndex]
THEN TrickleDown[emptyIndex: heapPos, entry: replacementEntry]
ELSE {
SiftUp
emptyIndex: HeapIndex ¬ heapPos;
entrySize: CARD = IndexedEntrySize[pathStk: pathStk, index: replacementEntry];
DO
son: HeapIndex ¬ emptyIndex*2;
sonEntry: EntryOrdinal;
IF son > heap.length THEN EXIT;
sonEntry ¬ heap.entries[son];
IF son < heap.length
AND IndexedEntrySize[pathStk: pathStk, index: heap.entries[son+1]] < IndexedEntrySize[pathStk: pathStk, index: sonEntry]
THEN {
son ¬ son+1;
sonEntry ¬ heap.entries[son];
};
IF IndexedEntrySize[pathStk: pathStk, index: sonEntry] >= entrySize THEN EXIT;
heap.entries[emptyIndex] ¬ sonEntry;
entryTable.map[sonEntry].heapPos ¬ emptyIndex;
emptyIndex ¬ son;
ENDLOOP;
heap.entries[emptyIndex] ¬ replacementEntry;
entryTable.map[replacementEntry].heapPos ¬ emptyIndex;
};
};
};
maxFeasIndex ¬ maxFeasIndex-1;
ENDLOOP;
IF heap.length>0
THEN {
fatherSizeSum: CARD;
fatherIndex2 ¬ heap.entries[1];
fatherSizeSum ¬ IndexedEntrySize[pathStk: pathStk, index: fatherIndex] + IndexedEntrySize[pathStk: pathStk, index: fatherIndex2];
IF fatherSizeSum<bestFatherSizeSum
THEN {
twoBrothersEnough ¬ TRUE;
bestFatherSizeSum ¬ fatherSizeSum;
bestFatherIndex ¬ fatherIndex;
bestFatherIndex2 ¬ fatherIndex2;
};
};
fatherIndex ¬ fatherIndex-1;
ENDLOOP;
IF ~twoBrothersEnough THEN ERROR Bug[twoBrothersNotEnough];
Write the three pages and promote the two father entries to the next level.
breakSize1 ¬ EntryIntervalSize[pathStk: pathStk, rightFather: bestFatherIndex];
breakSize2 ¬ EntryIntervalSize[pathStk: pathStk, rightFather: bestFatherIndex2];
totalSize ¬ EntryIntervalSize[pathStk: pathStk];
WritePage[tree: tree, pse: pse, number: pse.pageNumber, words: breakSize1];
fatherESR ¬ WriteRightBrother[tree: tree, pse: pse, rtBroPg: rtBroPg1, words: breakSize2-breakSize1];
PushEntSeqRecord[pse: fatherPSE, esr: WriteRightBrother[tree: tree, pse: pse, rtBroPg: rtBroPg2, words: totalSize-breakSize2]];
PushEntSeqRecord[pse: fatherPSE, esr: fatherESR];
};
FindRightBrother:
PROC [tree: Tree, pathStk: PathStk, spaceNeeded:
INT]
RETURNS [rtBroPg: PageNumber] = {
Finds the right brother of the current page, and determines whether it has room for at least spaceNeeded additional words. If so, removes the father entry and all right brother entries and appends them to the ESL for this level. Returns nilPage if there is no right brother or it is too full. Passing a spaceNeeded argument of -tree.maxFreeBytes will find the right brother if it exists, regardless of how full it is.
pse: PathStkEntryPtr = @pathStk.path[pathStk.top];
fatherPSE: PathStkEntryPtr = @pathStk.path[pathStk.top-1];
fatherEntSize: CARD;
pagePtr: BTreePagePtr;
fatherESR, rtBroESR: REF EntSeqRecord;
IF fatherPSE.eslFront=
NIL
THEN {
pagePtr ¬ tree.ReferencePage[fatherPSE.pageNumber];
IF fatherPSE.offset = nilOffset+(tree.state.pageSize-pagePtr.freeBytes)
THEN {
no right brother
tree.ReleasePage[fatherPSE.pageNumber];
RETURN [nilPage];
};
fatherEntSize ¬ tree.BTreeEntrySize[@pagePtr[fatherPSE.offset]];
rtBroPg ¬ pagePtr[fatherPSE.offset].grPage;
tree.ReleasePage[fatherPSE.pageNumber];
}
ELSE {
fatherEntSize ¬ tree.BTreeEntrySize[fatherPSE.eslFront.entSeqP];
rtBroPg ¬ fatherPSE.eslFront.entSeqP.grPage;
};
pagePtr ¬ tree.ReferencePage[rtBroPg];
IF
LOOPHOLE[pagePtr.freeBytes-fatherEntSize,
INT] < spaceNeeded
THEN {
right brother too full
tree.ReleasePage[rtBroPg];
RETURN [nilPage];
};
rtBroESR ¬ MakeEntSeqRecord[
entSeq: FirstEntry[pagePtr],
length: tree.maxFreeBytes-pagePtr.freeBytes];
tree.ReleasePage[rtBroPg];
[esr: fatherESR] ¬ tree.RemoveEntry[pse: fatherPSE];
AppendEntSeqLengths[tree: tree, pathStk: pathStk, esr: fatherESR];
AppendEntSeqRecord[pse: pse, esr: fatherESR];
AppendEntSeqLengths[tree: tree, pathStk: pathStk, esr: rtBroESR];
AppendEntSeqRecord[pse: pse, esr: rtBroESR];
};
FindLeftBrother:
PROC [tree: Tree, pathStk: PathStk, spaceNeeded:
INT]
RETURNS [ltBroPg: PageNumber] = {
Finds the left brother of the current page, and determines whether it has room for at least spaceNeeded additional words. If so, backs up one entry at the father's level, removes the father entry and all left brother entries, and inserts them at the front of the ESL for this level. Returns nilPage if there is no left brother or it is too full. Passing a spaceNeeded argument of -tree.maxFreeBytes will find the left brother if it exists, regardless of how full it is.
pse: PathStkEntryPtr = @pathStk.path[pathStk.top];
fatherPSE: PathStkEntryPtr = @pathStk.path[pathStk.top-1];
fatherPagePtr, ltBroPagePtr, rtBroPagePtr: BTreePagePtr;
fatherESR, ltBroESR: REF EntSeqRecord;
fatherEntSize: CARD;
rtBroOfLtBroPg: PageNumber;
IF fatherPSE.offset <= entry1Offset THEN RETURN [nilPage];
fatherPagePtr ¬ tree.ReferencePage[fatherPSE.pageNumber];
ltBroPg ¬ fatherPagePtr[fatherPSE.nextToLastOffset].grPage;
rtBroOfLtBroPg ¬ fatherPagePtr[fatherPSE.lastOffset].grPage;
fatherEntSize ¬ tree.BTreeEntrySize[@fatherPagePtr[fatherPSE.lastOffset]];
tree.ReleasePage[fatherPSE.pageNumber];
ltBroPagePtr ¬ tree.ReferencePage[ltBroPg];
IF
LOOPHOLE[ltBroPagePtr.freeBytes-fatherEntSize,
INT] < spaceNeeded
THEN {
tree.ReleasePage[ltBroPg];
RETURN [nilPage];
};
ltBroESR ¬ MakeEntSeqRecord[
entSeq: FirstEntry[ltBroPagePtr],
length: tree.maxFreeBytes-ltBroPagePtr.freeBytes];
fatherPagePtr ¬ tree.ReferencePage[fatherPSE.pageNumber, write];
fatherPagePtr[fatherPSE.nextToLastOffset].grPage ¬ rtBroOfLtBroPg;
tree.ReleasePage[fatherPSE.pageNumber];
[esr: fatherESR] ¬ tree.BackUpAndRemoveEntry[pse: fatherPSE];
rtBroPagePtr ¬ tree.ReferencePage[rtBroOfLtBroPg, write];
fatherESR.entSeqP.grPage ¬ rtBroPagePtr.minPage;
rtBroPagePtr.minPage ¬ ltBroPagePtr.minPage;
tree.ReleasePage[rtBroOfLtBroPg];
tree.ReleasePage[ltBroPg];
PushEntSeqLengths[tree: tree, pathStk: pathStk, esr: fatherESR];
PushEntSeqRecord[pse: pse, esr: fatherESR];
PushEntSeqLengths[tree: tree, pathStk: pathStk, esr: ltBroESR];
PushEntSeqRecord[pse: pse, esr: ltBroESR];
};
WriteRightBrother:
PROC [tree: Tree, pse: PathStkEntryPtr, rtBroPg: PageNumber, words:
CARD]
RETURNS [fatherESR:
REF EntSeqRecord] = {
Removes words' worth of entries from the front of the ESL for this level, and writes all but the first entry into rtBroPg. Designates the first entry as the (left) father of rtBroPg, and returns a new ESR containing it. Also sets the page's freeBytes and minPage fields appropriately.
pagePtr: BTreePagePtr;
minPage: PageNumber;
[esr: fatherESR, grPage: minPage] ¬ tree.RemoveEntry[pse: pse];
words ¬ words-fatherESR.entSeqLen;
pagePtr ¬ tree.ReferencePage[rtBroPg, write];
pagePtr.minPage ¬ minPage;
tree.ReleasePage[rtBroPg];
WritePage[tree: tree, pse: pse, number: rtBroPg, words: words];
fatherESR.entSeqP.grPage ¬ rtBroPg;
};
WritePage:
PROC [tree: Tree, pse: PathStkEntryPtr, number: PageNumber, words:
CARD] = {
Removes words' worth of entries from the front of the ESL for this level, and writes them into the page designated by number. Sets the page's freeBytes appropriately, but does not touch minPage.
pagePtr: BTreePagePtr = tree.ReferencePage[number, write];
DepositESL[tree: tree, pse: pse, block: FirstEntry[pagePtr], length: words];
pagePtr.freeBytes ¬ tree.maxFreeBytes-words;
tree.ReleasePage[number];
};
IndexedEntrySize:
PROC [pathStk: PathStk, index: EntryOrdinal]
RETURNS [words:
CARD] =
INLINE {
words ¬ EntryIntervalSize[pathStk: pathStk, leftFather: index-1, rightFather: index+1];
RETURN [words];
};
FillLeftPage:
PROC [tree: Tree, pathStk: PathStk, leftFather, rightFather: EntryOrdinal ¬ 0]
RETURNS [midFather: EntryOrdinal] = {
Finds the largest entry ordinal in (leftFather .. rightFather) such that all the entries in (leftFather .. midFather) will fit in one BTree page. If rightFather = 0 then it is defaulted to pathStk.entryTable.length+1.
IF rightFather=0 THEN rightFather ¬ pathStk.entryTable.length+1;
midFather ¬ leftFather+2;
WHILE midFather<rightFather-2
AND EntryIntervalSize[pathStk: pathStk, leftFather: leftFather, rightFather: midFather+1] <= tree.maxFreeBytes
DO
midFather ¬ midFather+1;
ENDLOOP;
};
FillRightPage:
PROC [tree: Tree, pathStk: PathStk, leftFather, rightFather: EntryOrdinal ¬ 0]
RETURNS [midFather: EntryOrdinal] = {
Finds the smallest entry ordinal in (leftFather .. rightFather) such that all the entries in (midFather .. rightFather) will fit in one BTree page. If rightFather = 0 then it is defaulted to pathStk.entryTable.length+1.
IF rightFather=0 THEN rightFather ¬ pathStk.entryTable.length+1;
midFather ¬ rightFather-2;
WHILE midFather>leftFather+2
AND EntryIntervalSize[pathStk: pathStk, leftFather: midFather-1, rightFather: rightFather] <= tree.maxFreeBytes
DO
midFather ¬ midFather-1;
ENDLOOP;
};
MoveOverlapped: PUBLIC PROC [dst: POINTER, src: POINTER, nBytes: CARD] = {
WordPtr: TYPE = POINTER TO WORD;
IF LOOPHOLE[dst, CARD] > LOOPHOLE[src, CARD]
AND LOOPHOLE[dst, CARD] - LOOPHOLE[src, CARD] < nBytes THEN {
The blocks overlap, so the move must be in the reverse direction
WHILE nBytes > 0 DO
nBytes ¬ nBytes - 1 * BYTES[WORD];
LOOPHOLE[dst+nBytes, WordPtr] ¬ LOOPHOLE[src+nBytes, WordPtr];
ENDLOOP;
RETURN;
};
Destination either overlaps with source in the reverse direction, or overlaps not at all.
We use Basics.
Basics.MoveWords[dst: dst, src: src, count: (nBytes+BYTES[WORD]-1)/(BYTES[WORD])];
};
BytesToWords:
PROC[nBytes:
CARD]
RETURNS[nWords:
CARD] ~
INLINE
{ RETURN[ (nBytes+BYTES[WORD]-1)/BYTES[WORD] ]; };
FirstEntry:
PROC [pagePtr: BTreePagePtr]
RETURNS [BTreeEntryPtr] =
INLINE {
Returns a pointer to the first BTree entry in the page pointed at by pagePtr
RETURN [LOOPHOLE[pagePtr + pageOverhead]];
};
PrintPage: PROC [pagePtr: BTreePagePtr, pageSize: CARD, freeBytes: CARD] = {
entries: CARD = pageSize - freeBytes;
ptrToPage: CARD = LOOPHOLE[pagePtr, CARD];
i: CARD ← ptrToPage;
WHILE i < (ptrToPage + entries) DO
IO.PutRope[outStream, Rope.Cat[Convert.RopeFromCard[LOOPHOLE[i, CARD]],
": "]];
IO.PutRope[outStream, Rope.Cat[Convert.RopeFromCard[LOOPHOLE[LOOPHOLE[i, POINTER TO CARD]^, CARD]], "\n"]];
i ← i + 1*SIZE[BYTE];
ENDLOOP;
IO.PutRope[outStream, "\n"];
};
[inStream, outStream, errStream, hasEcho] ← SimpleStreams.Create[windowSystem: $uxio];
}.