DBIndexImpl:
CEDAR
PROGRAM
IMPORTS
DBCommon,
DBSegment,
DBStats,
DBIndexOp,
DBIndexMod,
DBIndexPage,
DBIndexScan,
DBStorageTuple,
IO,
RefText,
Rope
EXPORTS DBIndex, DBStorage
= BEGIN OPEN DBIndexPage;
Types
IndexScanObject: PUBLIC TYPE = DBStorageConcrete.IndexScanObject;
IndexScanHandle: TYPE = REF IndexScanObject;
Page: TYPE = DBIndex.Page;
IndexKey: TYPE = DBIndex.IndexKey;
Core: TYPE = DBIndex.Core;
State: TYPE = DBIndex.State;
RealIndexHandle: TYPE = DBIndex.RealIndexHandle;
ItemHandle: TYPE = DBIndex.ItemHandle;
FullPage: CARDINAL = DBIndex.FullPage;
OverHead: CARDINAL = DBIndex.OverHead;
ROPE: TYPE = Rope.ROPE;
Signals
BadBTree: SIGNAL = CODE;
KeyIsNotFound: ERROR = CODE;
TakeALookAtThis: SIGNAL = CODE; -- for debugging
Constants
DeletionOfNonExistentKey: CARDINAL = 201B;
ThisIndexWasNotCreated: CARDINAL = 202B;
Global vars (mainly for debugging)
CheckFlag: BOOLEAN ← FALSE; -- do checking for illegal B-Tree format after each operation
ExtraCheckFlag: BOOLEAN ← FALSE; -- check for unusual but not illegal cases
PrintFlag: BOOLEAN ← FALSE; -- turn on debugging output
PrintBreadth: CARDINAL ← 4; -- for debug output: # key entries to print per page
PrintInternalValues: BOOL ← TRUE; -- for debug output: print values associated with internal keys
PrintLeafValues: BOOL ← FALSE; -- for debug output: print values associated with leaf keys
opCount: INT ← 0; -- counts number of operations for printing purposes
watchForThisKey: ROPE ← NIL; -- prints this key when entered or deleted
stopAtOpCount: INT ← 0; -- signals when get to this op count
deletionTurnedOn: BOOL ← TRUE; -- this should normally be TRUE, to free B-Tree pages
tryRecovery: BOOL ← FALSE; -- tries to recover from bad B-Tree; this should normally be FALSE
prevDB, nextDB: DBCommon.DBPage; -- Used only by check programs
Public procs
CreateIndex:
PUBLIC
PROC [x: DBStorage.IndexHandle] = {
DBIndexScan.CreateNewRealIndexHandle[DBStorageTuple.TIDOfTuple[x]];
WriteIndexTuple[tuple: x, root: DBCommon.NullDBPage, depth: 0]
};
DestroyIndex:
PUBLIC
PROC [x: DBStorage.IndexHandle] = {
Destroys the tree itself and the index handle but not the tuple the latter is stored in
q: RealIndexHandle ← DBIndexScan.GetOldRealIndexHandle[x];
DestroyTree[q];
DBIndexScan.DestroyIndexHandle[q];
WriteIndexTuple[tuple: x, root: DBCommon.NullDBPage, depth: 0]
};
InsertIntoIndex:
PUBLIC
PROC[x: DBStorage.IndexHandle, k:
ROPE, v: DBStorage.TupleHandle] =
TRUSTED {
newKey: REF TEXT;
keyText: REF TEXT = LOOPHOLE[RefText.TrustTextRopeAsText[Rope.Flatten[k]]];
q: RealIndexHandle ← DBIndexScan.GetOldRealIndexHandle[x];
newP, overFlow, root: Page;
state: State;
tid: DBCommon.TID ← DBStorageTuple.TIDOfTuple[v];
size: CARDINAL ← CheckTreeInit[q, "Inserting", k];
DBStats.Inc[DBIndexInsert];
IF q.depth = 0
THEN {
newP ← DBIndexPage.CreateEmptyPage[q, 1, -- the level -- q.segment];
DBIndexMod.InsertTheFirstLeafEntry[newP, keyText, tid];
PutRootPageToIndexHandle[q, newP];
IncrementDepth[q];
DBIndexScan.PutScanIndex[q, newP];
WriteIndexTuple[tuple: x, root: newP.db, depth: 1];
DBIndexPage.UnlockPage[newP] }
ELSE {
root ← GetRootPage[q];
[state, overFlow, newKey] ← InsertIntoTree[root, keyText, tid];
SELECT state
FROM
deleteFromNextPage => ERROR KeyIsNotFound;
normal => NULL;
split => SplitRoot[tuple: x, root: q, overflow: overFlow, splitKey: newKey];
ENDCASE => ERROR;
DBIndexPage.UnlockPage[root] };
CheckTreeFinal[q, size + 1] };
DeleteFromIndex:
PUBLIC
PROC [x: DBStorage.IndexHandle, k: Rope.
ROPE, v: DBStorage.TupleHandle] =
TRUSTED {
tid: DBCommon.TID = DBStorageTuple.TIDOfTuple[v];
q: RealIndexHandle = DBIndexScan.GetOldRealIndexHandle[x];
keyText: REF TEXT = LOOPHOLE[RefText.TrustTextRopeAsText[Rope.Flatten[k]]];
newRoot, overflow, root: Page;
state: State;
newKey: REF TEXT;
size: CARDINAL ← CheckTreeInit[q, "Deleting", k];
DBStats.Inc[DBIndexDelete];
IF q.depth = 0
THEN ERROR DBCommon.InternalError -- Deleting entry from empty index!
ELSE
{root ← GetRootPage[q];
[state, overflow, newKey] ← DeleteFromTree[root, keyText, tid];
SELECT state
FROM
deleteFromNextPage => ERROR KeyIsNotFound;
normal => DBIndexPage.UnlockPage[root];
merge =>
TRUSTED {
IF root.depth = 1
AND root.pointer.size = 0
THEN
Deleted last entry in index (on root page). Destroy the root page.
{
IF ExtraCheckFlag
THEN
SIGNAL TakeALookAtThis;
DBIndexPage.DestroyPage[q.segment, root, root.db];
DBIndexScan.DeleteHandle[q];
DBIndexScan.ScanForNullTree[q];
WriteIndexTuple[tuple: x, root: q.rootDB, depth: 0] }
ELSE
One entry left on a non-root page; want to merge it.
IF root.depth > 1
AND root.pointer.size = 1
THEN
{
IF ExtraCheckFlag
THEN
SIGNAL TakeALookAtThis;
newRoot ← DeleteRoot[root, q.rootDB];
PutRootPageToIndexHandle[q, newRoot];
DecrementDepth[q];
WriteIndexTuple[tuple: x, root: q.rootDB, depth: q.depth];
DBIndexPage.UnlockPage[newRoot] }
ELSE
DBIndexPage.UnlockPage[root];
};
split =>
{SplitRoot[tuple: x, root: q, overflow: overflow, splitKey: newKey];
DBIndexPage.UnlockPage[root] }
ENDCASE => ERROR };
CheckTreeFinal[q, size - 1] }; -- end of DeleteFromIndex
OpenScanIndex:
PUBLIC
PROC [x: DBStorage.IndexHandle, y: DBStorage.Selection, startPosition: DBStorage.FirstLast]
RETURNS [scan: DBStorage.IndexScanHandle] =
TRUSTED {
index: CARDINAL;
page: DBCommon.DBPage;
q: RealIndexHandle = DBIndexScan.GetOldRealIndexHandle[x];
size: CARDINAL = CheckTreeInit[q, "Scanning index from", y.lowerBound];
IF
NOT y.lowerBoundInfinity
AND
NOT y.includeLowerBound
THEN
y.lowerBound ← Rope.Concat[y.lowerBound, "\000"];
IF
NOT y.upperBoundInfinity
AND y.includeUpperBound
THEN
y.upperBound ← Rope.Concat[y.upperBound, "\000"];
IF q.depth = 0 THEN {page ← DBCommon.NullDBPage; index ← 0}
ELSE
IF startPosition = First
THEN
IF y.lowerBoundInfinity THEN {page ← SearchTheFirstPage[q]; index ← 0}
ELSE [page, index] ← Search[q, LOOPHOLE[RefText.TrustTextRopeAsText[Rope.Flatten[y.lowerBound]]]]
ELSE
IF y.upperBoundInfinity THEN [page, index] ← SearchTheLastPage[q]
ELSE [page, index] ← Search[q, LOOPHOLE[RefText.TrustTextRopeAsText[Rope.Flatten[y.upperBound]]]];
scan ← DBIndexScan.CreateScanHandle[q, y, startPosition, page, index];
CheckTreeFinal[q, size]
};
NextScanIndex:
PUBLIC
PROC [x: IndexScanHandle]
RETURNS [DBStorage.TupleHandle] =
TRUSTED {
Note that we fetch the tuple currently ref'd by x and THEN increment the scan to the next tuple, as the convention is that x already refs the next tuple to scan.
page: Page;
tuple: DBStorage.TupleHandle;
size: CARDINAL;
IF x=NIL THEN RETURN[NIL]; -- scans set to NIL when released.
size ← CheckTreeInit[x.tree, "NextScanIndex"];
IF x.this = DBCommon.NullDBPage THEN RETURN [NIL];
page ← DBIndexPage.GetPage[x.tree, x.this, 1];
IF x.index >= page.pointer.size
THEN { IF page.pointer.right # DBCommon.NullDBPage THEN ERROR; tuple ← NIL }
ELSE {
key: IndexKey ← Key[page, x.index];
IF NOT x.upperBoundInfinity AND ( x.includeUpperBound AND Less[x.upperBound, key] OR NOT x.includeUpperBound AND LessEq[x.upperBound, key] ) THEN tuple ← NIL
ELSE {
tuple ← DBStorageTuple.ConsTupleObject[tid: Tid[page, x.index]];
page ← IncrementIndex[x, page] } };
DBIndexPage.UnlockPage[page];
CheckTreeFinal[x.tree, size];
RETURN [tuple]
};
NextScanKey:
PUBLIC
PROC [x: IndexScanHandle]
RETURNS [key: DBIndex.IndexKey] =
TRUSTED {
Note that we fetch the tuple currently ref'd by x and THEN increment the scan to the next tuple, as the convention is that x already refs the next tuple to scan.
page: Page;
IF x=NIL THEN RETURN[NIL]; -- scans set to NIL when released.
IF x.this = DBCommon.NullDBPage THEN RETURN [NIL];
page ← DBIndexPage.GetPage[x.tree, x.this, 1];
IF x.index >= page.pointer.size
THEN { IF page.pointer.right # DBCommon.NullDBPage THEN ERROR; key ← NIL }
ELSE {
key ← Key[page, x.index];
IF
NOT x.upperBoundInfinity
AND (x.includeUpperBound
AND Less[x.upperBound, key]
OR
NOT x.includeUpperBound
AND LessEq[x.upperBound, key])
THEN { key ← NIL; x.direction ← finished }
ELSE x.direction ← next };
RETURN [key]
};
PrevScanIndex:
PUBLIC
PROC[x: IndexScanHandle]
RETURNS [tuple: DBStorage.TupleHandle] = {
Decrement scan and then fetch tuple ref'd by x.
page: Page;
size: CARDINAL;
IF x=NIL THEN RETURN[NIL]; -- scans set to NIL when released.
size ← CheckTreeInit[x.tree, "PrevScanIndex"];
IF x.this = DBCommon.NullDBPage THEN RETURN [NIL];
page ← DBIndexPage.GetPage[x.tree, x.this, 1];
IF x.index = 0 AND NoPreviousPage[page] THEN tuple ← NIL
ELSE {
key: IndexKey ← Key[page, x.index];
page ← DecrementIndex[x, page];
IF NOT x.lowerBoundInfinity AND ( x.includeLowerBound AND Greater[x.lowerBound, key] OR NOT x.includeLowerBound AND GreaterEq[x.lowerBound, key] )
THEN tuple ← NIL
ELSE tuple ← DBStorageTuple.ConsTupleObject[Tid[page, x.index]] };
DBIndexPage.UnlockPage[page];
CheckTreeFinal[x.tree, size];
RETURN [tuple]
}; -- PredScanIndex
PrevScanKey:
PUBLIC
PROC[x: IndexScanHandle]
RETURNS [key: DBIndex.IndexKey] = {
page: Page;
IF x=NIL THEN RETURN[NIL]; -- scans set to NIL when released.
IF x.this = DBCommon.NullDBPage THEN RETURN [NIL];
page ← DBIndexPage.GetPage[x.tree, x.this, 1];
IF x.index = 0 AND NoPreviousPage[page] THEN { key ← NIL; x.direction ← finished }
ELSE {
key ← Key[page, x.index];
IF NOT x.lowerBoundInfinity AND (x.includeLowerBound AND Greater[x.lowerBound, key] OR NOT x.includeLowerBound AND GreaterEq[x.lowerBound, key])
THEN {key ← NIL; x.direction ← finished}
ELSE x.direction ← prev };
RETURN [key]
}; -- PrevScanKey
ThisScanTuple:
PUBLIC
PROC[x: IndexScanHandle]
RETURNS [tuple: DBStorage.TupleHandle] = {
IF x.direction = finished THEN RETURN[NIL];
BEGIN
page: Page ← IF x.direction = next THEN DBIndexPage.GetPage[x.tree, x.this, 1] ELSE DecrementIndex[x, DBIndexPage.GetPage[x.tree, x.this, 1]];
tuple ← DBStorageTuple.ConsTupleObject[tid: Tid[page, x.index]];
IF x.direction = next THEN page ← IncrementIndex[x, page];
DBIndexPage.UnlockPage[page]
END };
IgnoreEntry:
PUBLIC PROC[x: IndexScanHandle] = {
page: Page = IF x.direction = next THEN DBIndexPage.GetPage[x.tree, x.this, 1] ELSE DecrementIndex[x, DBIndexPage.GetPage[x.tree, x.this, 1]];
DBIndexPage.UnlockPage[page] };
TupleForKey:
PUBLIC
PROC[x: DBStorage.IndexHandle, key: Rope.
ROPE]
RETURNS[tuple: DBStorage.TupleHandle] =
TRUSTED {
q: RealIndexHandle = DBIndexScan.GetOldRealIndexHandle[x];
IF q.rootDB = DBCommon.NullDBPage THEN tuple ← NIL
ELSE
TRUSTED
BEGIN
root: Page = GetRootPage[q];
tuplePage: DBCommon.DBPage;
thisPage: Page;
tupleIndex: CARDINAL;
[tuplePage, tupleIndex] ← SearchTree[root, LOOPHOLE[Rope.Flatten[key]]];
thisPage ← DBIndexPage.GetPage[q, tuplePage, q.depth];
IF tupleIndex >= thisPage.pointer.size
OR Less[key, Key[thisPage, tupleIndex]]
THEN tuple ← NIL
ELSE tuple ← DBStorageTuple.ConsTupleObject[Tid[thisPage, tupleIndex]];
DBIndexPage.UnlockPage[thisPage];
DBIndexPage.UnlockPage[root];
END;
DBIndexScan.DeleteHandle[q] };
CallAfterFinishTransaction:
PUBLIC
PROC [s: DBCommon.Segment] = {
DBIndexPage.DestroyPageList[s];
DBIndexScan.FinalizeIndexHandle[s];
DBIndexScan.FreeScan[s] };
Private Procedures
Insertion
InsertIntoTree:
PROC[page: Page, key:
REF
TEXT, value: DBCommon.TID]
RETURNS [State, Page,
REF
TEXT] = {
Inserts <key, value> into the tree emanating from page a[0]<=key. Caller must deallocate IndexKey returned.
i: CARDINAL;
newKey, newSplitKey: REF TEXT;
newOverFlow, overFlow: Page;
q: Page;
state: State;
IF page.depth = 1
-- leaf --
THEN {
DBStats.Inc[BTreeSearchPage];
IF DBIndexOp.OverFlow[page, key]
THEN {
[overFlow, newKey] ← DBIndexOp.SplitLeaf[page, key, value];
RETURN [split, overFlow, newKey] }
ELSE {
SlideLeaf[page, key, value];
RETURN [normal, NIL, NIL] } }
ELSE {
DBStats.Inc[BTreeSearchPage];
i ← FindTheLastInternalKey[page, key];
i=0&key<a[1] | i<a.length-1&a[i]<=key<a[i+1] | i=a.length-1&a[i]<=key
q ← DBIndexPage.GetPage[page.tree, Value[page, i], page.depth - 1];
[state, overFlow, newKey] ← InsertIntoTree[q, key, value];
SELECT state
FROM
normal => { DBIndexPage.UnlockPage[q]; RETURN [normal, NIL, NIL] };
split => {
[state, newOverFlow, newSplitKey] ←
DBIndexOp.InsertInInternalPage[page, i, overFlow.db, newKey];
DBIndexPage.UnlockPage[q];
DBIndexPage.UnlockPage[overFlow];
RETURN [state, newOverFlow, newSplitKey] }
ENDCASE => ERROR}
};
SplitRoot:
PROC [tuple: DBStorage.TupleHandle, root: RealIndexHandle, overflow: Page,
splitKey: REF TEXT] = {
Splits the root into two and creates another page with two entries, root and overflow
newP: Page;
newP ← DBIndexPage.CreateEmptyPage[root, root.depth + 1, root.segment];
DBIndexMod.InsertTwoEntriesToEmptyPage[newP, root.rootDB, splitKey, overflow.db];
PutRootPageToIndexHandle[root, newP];
IncrementDepth[root];
DBIndexPage.UnlockPage[overflow];
WriteIndexTuple[tuple: tuple, root: newP.db, depth: root.depth];
DBIndexPage.UnlockPage[newP]
};
Deletion
DeleteFromTree:
PROC[page: Page, key:
REF TEXT, tid: DBCommon.TID]
RETURNS [State, Page,
REF TEXT] = {
Deletes the entry with given key. If multiple entries, then the entry with the value tid is deleted. Deletion has difficult problems in cases like ours, where we allow multiple entries with the same key. If we find the entry at the beginning of the page, there is a possibility that the same key exists in the left page. Therefore, the search algorithm is that in every internal page, look for the left most key which has the same or equal value to the argument key. If the same, go down the left of the key; if greater go down the right.
i: CARDINAL;
newKey, retKey: REF TEXT;
overflow, retOverflow, q: Page;
state: State;
DBIndexPage.CheckTag[page.pointer];
IF page.depth = 1
THEN {
DBStats.Inc[BTreeSearchPage];
[state, ,] ← DBIndexOp.DeleteFromLeafPage[page, key, tid];
DBIndexPage.CheckTag[page.pointer];
RETURN [state, NIL, NIL] };
DBStats.Inc[BTreeSearchPage];
i ← FindTheFirstInternalKey[page, key];
i=0&k<=a[i] OR 0<i<max[a]&a[i-1]<k<=a[i] OR i=max[a]&a[i]<key;
DO q ← DBIndexPage.GetPage[page.tree, Value[page, i], page.depth - 1];
[state, overflow, newKey] ← DeleteFromTree[q, key, tid];
SELECT state
FROM
deleteFromNextPage => {
DBIndexPage.UnlockPage[q];
i ← i + 1;
TRUSTED {IF i >= page.pointer.size THEN RETURN [state, overflow, newKey]}
};
normal => { DBIndexPage.UnlockPage[q]; RETURN [normal, NIL, NIL]};
merge => {
[state, overflow, retKey] ← MergeInThisPage[page, q, i];
RETURN [state, overflow, retKey] };
split => {
[state, retOverflow, retKey] ← DBIndexOp.InsertInInternalPage[page, i, overflow.db, newKey];
DBIndexPage.UnlockPage[q];
DBIndexPage.UnlockPage[overflow];
RETURN [state, retOverflow, retKey] }
ENDCASE => ERROR
ENDLOOP
}; -- DeleteFromTree
DestroyTree:
PROC [q: RealIndexHandle] = {
Deallocates all file pages of Btree
DestroyBelow:
PROC [tree: RealIndexHandle, root: Page] = {
Deallocates all file pages emanating from this page. The root page is not destroyed. If there is no root page (an empty B-Tree), simply return.
db: DBCommon.DBPage;
i: CARDINAL;
q: Page;
IF root.depth = 1
THEN RETURN -- leaves already freed by parent
ELSE
TRUSTED {
FOR i
IN [0..root.pointer.size)
DO
q ← DBIndexPage.GetPage[tree, db ← Value[root, i], root.depth - 1
! DBIndexPage.BadPage => IF tryRecovery THEN LOOP];
DestroyBelow[tree, q];
DBIndexPage.DestroyPage[tree.segment, q, db]
ENDLOOP
};
}; -- end DestroyBelow
root: Page;
IF q.root=NIL AND q.rootDB=0 THEN RETURN;
root ← GetRootPage[q];
DestroyBelow[q, root];
DBIndexPage.DestroyPage[q.segment, root, q.rootDB];
q.rootDB ← DBCommon.NullDBPage;
q.root ← NIL;
q.depth ← 0;
q.free ← TRUE
};
PutRootPageToIndexHandle:
PROC [q: RealIndexHandle, p: Page] = {
Sets DBPage and CacheHint of root
q.rootDB ← p.db; q.root ← p.cache
};
IncrementDepth: PROC [q: RealIndexHandle] = {q.depth ← q.depth + 1};
GetRootPage:
PROC [q: RealIndexHandle]
RETURNS [Page] = {
RETURN[DBIndexPage.GetPage[tree: q, db: q.rootDB, level: q.depth]] };
DeleteRoot:
PROC [p: Page, r: DBCommon.DBPage]
RETURNS [Page] = {
p contains one element. Deletes p and returns the son
son: Page ← DBIndexPage.GetPage[p.tree, Value[p, 0], p.depth - 1];
obtain the first item
DBIndexPage.DestroyPage[p.tree.segment, p, r];
RETURN [son]
};
DecrementDepth: PROC [q: RealIndexHandle] = {q.depth ← q.depth - 1};
IncrementIndex:
PROC [x: IndexScanHandle, p: Page]
RETURNS [Page] =
TRUSTED {
Increments the index of scan. If it falls off the page, then the right page is fetched and
returned. The original page is deallocated in that case.
db: DBCommon.DBPage;
q: Page;
IF x.index >= p.pointer.size - 1
THEN {
[q, db] ← GetRightPage[p];
IF q = NIL THEN {x.index ← x.index + 1; RETURN [p]};
DBIndexPage.UnlockPage[p];
x.this ← db;
x.index ← 0;
RETURN [q] }
ELSE { x.index ← x.index + 1; RETURN [p] }
};
DecrementIndex:
PROC [x: IndexScanHandle, p: Page]
RETURNS [Page] =
TRUSTED {
Returns the left page if the previous element is in the left page
q: Page;
db: DBCommon.DBPage;
IF x.index # 0 THEN {x.index ← x.index - 1; RETURN [p]};
[q, db] ← GetLeftPage[p];
IF q = NIL THEN RETURN [p];
DBIndexPage.UnlockPage[p];
x.this ← db;
x.index ← q.pointer.size - 1;
RETURN [q]
};
GetRightPage:
PROC [p: Page]
RETURNS [Page, DBCommon.DBPage] =
TRUSTED {
p is a leaf page. Reads in the right page. If no right page, returns NIL
rightDB: DBCommon.DBPage;
IF (rightDB ← p.pointer.right) = DBCommon.NullDBPage
THEN
RETURN [NIL, DBCommon.NullDBPage];
RETURN [DBIndexPage.GetPage[p.tree, rightDB, 1], rightDB]
};
GetLeftPage:
PROC [p: Page]
RETURNS [Page, DBCommon.DBPage] =
TRUSTED {
p is a leaf page. Reads in the left page. If no left page, returns NIL
leftDB: DBCommon.DBPage;
IF (leftDB ← p.pointer.left) = DBCommon.NullDBPage
THEN
RETURN [NIL, DBCommon.NullDBPage];
RETURN [DBIndexPage.GetPage[p.tree, leftDB, 1], leftDB]
};
Internal movement of Btree pages
SlideLeaf:
PROC [p: Page, key:
REF TEXT, value:
LONG
CARDINAL] = {
Shifts part of entries in p and inserts, key and value It is guranteed that there won't be any overflow.
index: CARDINAL ← FindTheLastLeafKey[p, key];
DBIndexMod.SlideLeafAt[p, index, key, value]
};
MergeInThisPage:
PROC [p: Page, son: Page, index:
CARDINAL]
RETURNS [State, Page,
REF TEXT] = {
son is the index-th entry in p. son is sparse, so balance it. Returns normal if p is normal, delete if p needs balancing, split if p needs splitting, merge if p is less than half full This procedure does a UnlockPage or DestroyPage on son, caller should not touch it. Caller must free IndexKey returned.
state: State;
page: Page;
key: REF TEXT;
IF son.depth = 1
THEN [state, page, key] ← DBIndexOp.MergeInLeafPage[p, son, index]
ELSE [state, page, key] ← MergeInInternalPage[p, son, index];
IF state = delete
THEN
{state ← DBIndexMod.RemoveFromInternal[p, index]; DestroyThisPage[son]}
ELSE DBIndexPage.UnlockPage[son];
RETURN [state, page, key]
};
MergeInInternalPage:
PROC [parent: Page, son: Page, index:
CARDINAL]
RETURNS [State, Page,
REF TEXT] =
TRUSTED {
"son" is an internal page. "son" is the index-th entry in "parent" "son" is sparse, so balance it. Returns normal if son is normal, delete if son needs balancing, split if son needs splitting, Caller must deallocate IndexKey returned.
freeSon: CARDINAL ← FreeSpace[son];
i: CARDINAL;
newKey, tempKey: REF TEXT;
sizeSon: CARDINAL ← son.pointer.size;
state: State;
overflow: Page;
We balance son with its right brother except in the unusual case in which it has no right brother (when index=parent.pointer.size-1). There will always be a left or right brother because we don't permit "only child" leaf or internal pages.
IF index # parent.pointer.size - 1
THEN {
-- balance the son with the right page
right: Page ← DBIndexPage.GetPage[parent.tree, Value[parent, index + 1], son.depth];
IF MergableToRightInternal
[parent: parent, leftFree: freeSon, rightFree: FreeSpace[right],
leftAt: index] THEN {
move ALL entries from son to right; caller will delete son
DBIndexMod.MoveEntriesToRightInternal[from: son, to: right, key: DBIndexOp.RopeForKey[Key[parent, index + 1]], nentries: sizeSon];
DBIndexPage.UnlockPage[right];
RETURN [delete, NIL, NIL] };
Move zero or more entries from right to son to balance
i ← JustOver[right, DBIndex.HalfPage - FrontSize[son, sizeSon - 1] - sizeSon];
If move would move no entries, or would leave zero or one entry on right,
then don't do it: return and leave tree as is. We are moving i entries.
IF i = 0
OR i >= right.pointer.size - 1
THEN
{DBIndexPage.UnlockPage[right]; RETURN[normal, NIL, NIL]}; -- ... so do nothing
tempKey ← DBIndexOp.RopeForKey[Key[right, i]];
DBIndexMod.MoveEntriesToLeftInternal[from: right, to: son, key: DBIndexOp.RopeForKey[Key[parent, index + 1]], nentries: i];
DBIndexPage.UnlockPage[right];
[state, overflow, newKey] ← DBIndexOp.ChangeKey[parent, tempKey, index + 1] }
ELSE
{
-- no right brother: balance with the left
left: Page ←
DBIndexPage.GetPage[parent.tree, Value[parent, index - 1], son.depth];
IF MergableToLeftInternal
[parent: parent, leftFree: FreeSpace[left], rightFree: freeSon,
rightAt: index] THEN {
move ALL entries from son to left; caller will delete son
DBIndexMod.MoveEntriesToLeftInternal[from: son, to: left, key: DBIndexOp.RopeForKey[Key[parent, index]], nentries: sizeSon];
DBIndexPage.UnlockPage[left];
RETURN [delete, NIL, NIL] };
Move zero or more entries from left to son to balance, leaving i+1 in left:
i ← JustOver[left, DBIndex.HalfPage];
If JustOver would have us move zero entries, or all but one entry in left,
then don't do it: return and leave tree as is.
IF left.pointer.size = i + 1
OR i = 0
THEN
{DBIndexPage.UnlockPage[left]; RETURN[normal, NIL, NIL]};
tempKey ← DBIndexOp.RopeForKey[Key[left, i + 1]];
DBIndexMod.MoveEntriesToRightInternal
[from: left, to: son, key: DBIndexOp.RopeForKey[Key[parent, index]],
nentries: left.pointer.size - i - 1];
DBIndexPage.UnlockPage[left];
[state, overflow, newKey] ← DBIndexOp.ChangeKey[parent, tempKey, index] };
RETURN [state, overflow, newKey]
};
MergableToRightInternal:
PROC [parent: Page, leftFree, rightFree:
CARDINAL, leftAt:
CARDINAL]
RETURNS [
BOOLEAN] =
TRUSTED {
"left" is the leftAt-th son of "parent" and "right" is just to the right of "left".
Returns TRUE iff left and right can be merged.
key: IndexKey ← Key[parent, leftAt+1];
length: CARDINAL ← (key.length + 3) / 2;
Space used after merging less than a FullPage? Following is equivalent to val returned:
(FullPage - leftFree) + (FullPage - rightFree) - Overhead + length < FullPage
RETURN [rightFree + leftFree + OverHead > FullPage + length]
};
MergableToLeftInternal:
PROC [parent: Page, leftFree, rightFree:
CARDINAL, rightAt:
CARDINAL]
RETURNS [
BOOLEAN] =
TRUSTED {
"right" is the rightAt-th son of "parent" and "left" is just to the left of "right". Returns TRUE iff left and right can be merged.
key: IndexKey ← Key[parent, rightAt];
length: CARDINAL ← (key.length + 3) / 2;
RETURN [rightFree + leftFree + OverHead > FullPage + length]
};
NoPreviousPage:
PROC [page: Page]
RETURNS [
BOOLEAN] =
TRUSTED {
RETURN [page.pointer.left = DBCommon.NullDBPage]
};
DestroyThisPage:
PROC [page: Page] =
TRUSTED
Does some checks, then call the real DestroyPage; use for pages removed from B-Tree. Currently only called by MergeInThisPage, we call directly if destroying whole tree.
BEGIN
IF page.pointer.size#0 THEN SIGNAL TakeALookAtThis;
IF page.pointer.left#DBCommon.NullDBPage
OR page.pointer.right#DBCommon.NullDBPage
THEN
SIGNAL TakeALookAtThis; -- trying to destroy leaf without calling DBIndexOp.RemoveLinks
IF deletionTurnedOn THEN DBIndexPage.DestroyPage[page.tree.segment, page, page.db]
ELSE DBIndexPage.UnlockPage[page];
END;
Manipulations on IndexScanHandles
Index: PROC [x: IndexScanHandle] RETURNS [CARDINAL] = {RETURN [x.index]};
Search
Search:
PROC [r: RealIndexHandle, y:
REF TEXT]
RETURNS [DBCommon.DBPage,
CARDINAL] = {
Returns the index i such that db[i-1]<y<=db[i]
p: Page ← GetRootPage[r];
db: DBCommon.DBPage;
i: CARDINAL;
[db, i] ← SearchTree[p, y];
DBIndexPage.UnlockPage[p];
RETURN [db, i] };
SearchTree:
PROC [page: Page, key:
REF TEXT]
RETURNS [db: DBCommon.DBPage, i:
CARDINAL] = {
Returns the index i such that db[i-1]<key<=db[i]
son: Page;
temp: DBCommon.DBPage;
GotoRightKey:
PROC =
TRUSTED {
Assigns <dbpage of right page, and 0> if there is a right page. Do nothing is there is no right page
core: LONG POINTER TO Core ← page.pointer;
IF core.right = DBCommon.NullDBPage THEN RETURN;
db ← core.right;
i ← 0
};
IF page.depth = 1
THEN
-- leaf --
{db ← page.db;
i ← FindTheFirstLeafKey[page, key];
TRUSTED {IF i = page.pointer.size THEN GotoRightKey[] };
RETURN [db, i]};
i ← FindTheFirstInternalKey[page, key];
son ← DBIndexPage.GetPage[page.tree, temp ← Value[page, i], page.depth - 1];
[db, i] ← SearchTree[son, key];
DBIndexPage.UnlockPage[son];
RETURN [db, i]
};
SearchTheFirstPage:
PROC [r: RealIndexHandle]
RETURNS [DBCommon.DBPage] = {
p: Page;
db: DBCommon.DBPage;
IF r.depth = 1 THEN RETURN [r.rootDB];
p ← GetRootPage[r];
db ← SearchTheFirstTree[p];
DBIndexPage.UnlockPage[p];
RETURN [db]
};
SearchTheFirstTree:
PROC [p: Page]
RETURNS [DBCommon.DBPage] = {
db: DBCommon.DBPage;
IF p.depth = 2 THEN RETURN [Value[p, 0]]
ELSE {
son: Page ← DBIndexPage.GetPage[p.tree, Value[p, 0], p.depth - 1];
db ← SearchTheFirstTree[son];
DBIndexPage.UnlockPage[son];
RETURN [db] }
};
SearchTheLastPage:
PROC [r: RealIndexHandle]
RETURNS [DBCommon.DBPage,
CARDINAL] = {
p: Page ← GetRootPage[r];
db: DBCommon.DBPage;
i, s: CARDINAL;
IF r.depth = 1
THEN
TRUSTED {s ← p.pointer.size; DBIndexPage.UnlockPage[p]; RETURN [r.rootDB, s]};
[db, i] ← SearchTheLastTree[p];
DBIndexPage.UnlockPage[p];
RETURN [db, i]
};
SearchTheLastTree:
PROC [p: Page]
RETURNS [DBCommon.DBPage,
CARDINAL] =
TRUSTED {
l: CARDINAL ← p.depth;
s: CARDINAL ← p.pointer.size;
index, sonSize: CARDINAL;
sonDB: DBCommon.DBPage ← Value[p, s - 1];
son: Page;
db: DBCommon.DBPage;
son ← DBIndexPage.GetPage[p.tree, sonDB, l - 1];
IF l = 2
THEN {
sonSize ← son.pointer.size;
DBIndexPage.UnlockPage[son];
RETURN [sonDB, sonSize] }
ELSE {
[db, index] ← SearchTheLastTree[son];
DBIndexPage.UnlockPage[son];
RETURN [db, index] }
};
Index tuple
WriteIndexTuple:
PROC [tuple: DBStorage.TupleHandle, root: DBCommon.DBPage, depth:
CARDINAL] = {
Writes out the new information into database
tid: DBCommon.TID = DBStorageTuple.TIDOfTuple[tuple];
page: DBCommon.DBPage = DBCommon.DecomposeTID[tid].page;
segment: DBCommon.SegmentID = DBSegment.SegmentIDFromDBPage[page];
ttr: DBIndexPage.TupleTree = NEW[DBIndexPage.TupleTreeRecord← [root, depth, tid, segment]];
DBIndexPage.WriteIndexObject[tuple, ttr]
};
Check procedures
CheckTreeInit:
PROC [q: RealIndexHandle, op:
ROPE, item:
ROPE ←
NIL]
RETURNS [size:
CARDINAL] = {
prevDB ← DBCommon.NullDBPage;
IF NOT CheckFlag THEN RETURN;
opCount← opCount+1;
IF opCount=stopAtOpCount THEN SIGNAL TakeALookAtThis;
IF watchForThisKey#
NIL
AND
watchForThisKey.Equal[item.Substr[0, watchForThisKey.Length[]]] THEN
WriteHeaderStat[Rope.Cat["******Monitored key: ", op], item];
IF PrintFlag THEN WriteHeaderStat[op, item];
IF q.depth = 0 THEN RETURN [0]
ELSE {
root: Page ← GetRootPage[q];
temp: IndexKey;
savePrintingFlag: BOOL← PrintFlag;
PrintFlag← FALSE; -- only do printing during CheckTreeFinal, not CheckTreeInit
[size, temp] ← CheckPage[root, NIL];
PrintFlag← savePrintingFlag;
DBIndexPage.UnlockPage[root] }
};
CheckTreeFinal:
PROC [q: RealIndexHandle, size:
CARDINAL] = {
Checks tree for proper syntactic form. Note: page depth is 1 for leaf nodes, and for internal tree nodes depth is height of the tree (max distance from leaves). If the tree is empty, then q.depth=0 and there is NO root node (the root node is allocated/deallocated when the first/last entry is inserted/deleted).
sonSize: CARDINAL;
prevDB ← DBCommon.NullDBPage;
IF NOT CheckFlag THEN RETURN;
IF q.depth = 0
THEN
IF size # 0 THEN SIGNAL BadBTree ELSE RETURN;
{root: Page ← GetRootPage[q];
temp: IndexKey;
[sonSize, temp] ← CheckPage[root, NIL];
IF sonSize # size THEN SIGNAL BadBTree;
DBIndexPage.UnlockPage[root]};
IF nextDB # DBCommon.NullDBPage THEN SIGNAL BadBTree
};
CheckPage:
PROC [page: Page, start: IndexKey]
RETURNS [size:
CARDINAL, end: IndexKey] =
TRUSTED {
Debugging routine: check that this page of index and its descendants ok
er: IndexKey;
son: Page;
i, sonSize: CARDINAL;
size ← 0;
CheckCacheLock[page.db, page.cache];
CheckPageSize[page];
IF page.depth=0 THEN SIGNAL BadBTree;
IF page.depth = 1
THEN
{[size, end] ← CheckLeafPage[page, start]; RETURN};
IF PrintFlag THEN WritePageStat[page];
IF page.pointer.size=0
-- empty pages never permitted
OR page.pointer.size=1 -- only children of internal pages not permitted
OR page.pointer.size>1000 -- probably a subtraction from zero
THEN SIGNAL BadBTree;
CheckInternalEntries[page];
son ← DBIndexPage.GetPage[page.tree, Value[page, 0], page.depth - 1];
[sonSize, end] ← CheckPage[son, start];
size ← size + sonSize;
DBIndexPage.UnlockPage[son];
FOR i
IN [1..page.pointer.size)
DO
er ← Key[page, i];
IF end # NIL AND NOT LessEq[RefText.TrustTextAsRope[DBIndexOp.RopeForKey[er]], end] THEN SIGNAL BadBTree;
son ← DBIndexPage.GetPage[page.tree, Value[page, i], page.depth - 1];
[sonSize, end] ← CheckPage[son, er];
size ← size + sonSize;
DBIndexPage.UnlockPage[son]
ENDLOOP
};
CheckLeafPage:
PROC [page: Page, start: IndexKey]
RETURNS [size:
CARDINAL, end: IndexKey] =
TRUSTED {
i: CARDINAL;
prevdesc: IndexKey ← start;
left, right: DBCommon.DBPage;
left ← page.pointer.left;
right ← page.pointer.right;
IF left # prevDB THEN SIGNAL BadBTree;
prevDB ← page.db;
nextDB ← right;
size ← 0;
IF PrintFlag THEN WritePageStat[page];
IF page.pointer.size=0 OR page.pointer.size>1000 THEN SIGNAL BadBTree;
CheckLeafEntries[page];
FOR i
IN [0..page.pointer.size)
DO
end ← Key[page, i];
size ← size + 1;
IF prevdesc # NIL AND NOT LessEq[RefText.TrustTextAsRope[DBIndexOp.RopeForKey[end]], prevdesc] THEN SIGNAL BadBTree;
prevdesc ← end
ENDLOOP
};
WriteHeaderStat:
PROC [op:
ROPE, item:
ROPE] = {
debugStream: IO.STREAM← DBCommon.GetDebugStream[];
debugStream.PutF["**Op #%g: %g %g\n", IO.int[opCount], IO.rope[op], IO.rope[item]]
};
WritePageStat:
PROC [page: Page] =
TRUSTED {
Write page to debug stream, on one indented line, including up to PrintBreadth entries.
depth: CARDINAL ← page.depth;
debugStream: IO.STREAM← DBCommon.GetDebugStream[];
IF depth>10 THEN SIGNAL TakeALookAtThis;
THROUGH [0..5-depth)
DO
debugStream.PutF[" "];
ENDLOOP;
debugStream.PutF["%8bB (%2d):", IO.card[page.db], IO.card[page.pointer.size]];
FOR i: CARDINAL IN [0..page.pointer.size) DO WriteEntry[page, i] ENDLOOP;
debugStream.PutChar['\n];
};
WriteEntry:
PROC[page: Page, entryNum:
CARDINAL] =
TRUSTED {
entry: ItemHandle = CoreAddr[page, entryNum];
debugStream: IO.STREAM = DBCommon.GetDebugStream[];
IF entryNum#0
OR page.depth=1
THEN
BEGIN -- non-leaf pages have no key for the zero-th entry, so do only leaves & non-0th entries
debugStream.PutChar[' ];
IF entry.length>DBIndex.CoreIndexSize
THEN
debugStream.PutF[" ! entry too long(%g) ! ", IO.card[entry.length]]
ELSE
FOR j:
CARDINAL
IN [0..entry.length)
DO
WriteChar[debugStream, entry.text[j]] ENDLOOP;
END;
IF PrintInternalValues
AND page.depth#1
OR PrintLeafValues
AND page.depth=1
THEN
debugStream.PutF[" (%bB)", IO.card[entry.value]];
};
WriteKey:
PROC[entry: IndexKey] =
TRUSTED {
For calling from the debugger; print the whole key rather than 1st 10 chars as above
debugStream: IO.STREAM← DBCommon.GetDebugStream[];
debugStream.PutF["(length %g): ", IO.int[entry.length]];
FOR j: CARDINAL IN [0..entry.length) DO WriteChar[debugStream, entry.text[j]] ENDLOOP;
debugStream.PutChar['\n];
};
WriteItem:
PROC[entry: ItemHandle] =
TRUSTED {
For calling from the debugger; print the whole key rather than 1st 10 chars as above
debugStream: IO.STREAM← DBCommon.GetDebugStream[];
debugStream.PutF["(length %g): ", IO.int[entry.length]];
FOR j: CARDINAL IN [0..entry.length) DO WriteChar[debugStream, entry.text[j]] ENDLOOP;
debugStream.PutChar['\n];
};
WriteChar:
PROC[stream:
IO.
STREAM, char:
CHAR] = {
i: CARDINAL = LOOPHOLE[char];
IF char>176C OR char<040C THEN stream.PutF["\\\\%g", IO.card[i]]
ELSE stream.PutChar[char]
};
CheckPageSize:
PROC [page: Page] =
TRUSTED {
IF page.pointer.size = 0 THEN SIGNAL BadBTree
};
CheckCacheLock:
PROC [db: DBCommon.DBPage, cache: DBCommon.CacheHandle] = {
IF DBSegment.LockCount[db, cache] # 1 THEN SIGNAL BadBTree;
};
CheckLeafEntries:
PROC [page: Page] =
TRUSTED {
endIndex: LONG POINTER TO UNSPECIFIED ← EndAddr[page];
entry: ItemHandle;
i: CARDINAL;
FOR i
IN [0..page.pointer.size)
DO
entry ← CoreAddr[page, i];
IF @entry.text + (entry.length + 1) / 2 # endIndex THEN SIGNAL BadBTree;
endIndex ← entry
ENDLOOP
};
CheckInternalEntries:
PROC [page: Page] =
TRUSTED {
endIndex: LONG POINTER TO UNSPECIFIED ← EndAddr[page] - 2;
entry: ItemHandle;
i: CARDINAL;
FOR i
IN [1..page.pointer.size)
DO
entry ← CoreAddr[page, i];
IF @entry.text + (entry.length + 1) / 2 # endIndex THEN SIGNAL BadBTree;
endIndex ← entry
ENDLOOP
};
Main body
DBIndexScan.InitScan[];
END.