DBIndexImpl:
CEDAR
PROGRAM
IMPORTS
DB,
DBCommon,
DBSegment,
DBStats,
DBStorageInternal,
DBIndexFilePage,
DBIndexHandle,
DBIndexOp,
DBIndexMod,
DBIndexPage,
DBIndexScan,
IO,
Rope
EXPORTS
DBIndex,
DBStorage
= BEGIN OPEN DBIndexFilePage;
Types
IndexScanObject: PUBLIC TYPE = DBStorageConcrete.IndexScanObject;
IndexScanHandle: TYPE = REF IndexScanObject;
Page: TYPE = DBIndex.Page;
IndexKey: TYPE = DBIndex.IndexKey;
Core: TYPE = DBIndex.Core;
State: TYPE = DBIndex.State;
RealIndexHandle: TYPE = DBIndex.RealIndexHandle;
ItemHandle: TYPE = DBIndex.ItemHandle;
FullPage: CARDINAL = DBIndex.FullPage;
OverHead: CARDINAL = DBIndex.OverHead;
ROPE: TYPE = Rope.ROPE;
Signals
BadBTree: SIGNAL = CODE;
KeyIsNotFound: ERROR = CODE;
TakeALookAtThis: SIGNAL = CODE; -- for debugging
Constants
DeletionOfNonExistentKey: CARDINAL = 201B;
ThisIndexWasNotCreated: CARDINAL = 202B;
Global vars (mainly for debugging)
CheckFlag: BOOLEAN ← FALSE; -- do checking for illegal B-Tree format after each operation
ExtraCheckFlag: BOOLEAN ← FALSE; -- check for unusual but not illegal cases
PrintFlag: BOOLEAN ← FALSE; -- turn on debugging output
PrintBreadth: CARDINAL← 4; -- for debug output: # key entries to print per page
PrintInternalValues: BOOL← TRUE; -- for debug output: print values associated with internal keys
PrintLeafValues: BOOL← FALSE; -- for debug output: print values associated with leaf keys
opCount: INT← 0; -- counts number of operations for printing purposes
watchForThisKey: ROPE← NIL; -- prints this key when entered or deleted
stopAtOpCount: INT← 0; -- signals when get to this op count
deletionTurnedOn: BOOL ← FALSE; -- this should normally be TRUE, to free B-Tree pages
tryRecovery: BOOL ← FALSE; -- tries to recover from bad B-Tree; this should normally be FALSE
prevDB, nextDB: DBCommon.DBPage; -- Used only by check programs
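-- prevDB/nextDB thread the leaf chain during a consistency check: CheckTreeInit and CheckTreeFinal
-- reset prevDB, CheckLeafPage verifies that each leaf's left link equals prevDB and records its
-- right link in nextDB, and CheckTreeFinal requires nextDB to be null when the walk is done.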
Public procs
CreateIndex:
PUBLIC
PROC [x: DBStorage.IndexHandle] = {
DBIndexHandle.CreateNewRealIndexHandle[DBStorageInternal.TIDOfTuple[x]];
WriteIndexTuple[tuple: x, root: DBCommon.NullDBPage, depth: 0]
};
DestroyIndex:
PUBLIC
PROC [x: DBStorage.IndexHandle] = {
-- Destroys the tree itself and the index handle but not the tuple the latter is stored in
q: RealIndexHandle ← DBIndexHandle.GetOldRealIndexHandle[x];
DestroyTree[q];
DBIndexHandle.DestroyIndexHandle[q];
WriteIndexTuple[tuple: x, root: DBCommon.NullDBPage, depth: 0]
};
InsertIntoIndex:
PUBLIC
PROC [x: DBStorage.IndexHandle, k:
ROPE, v: DBStorage.TupleHandle] = {
key, newKey: IndexKey;
q: RealIndexHandle ← DBIndexHandle.GetOldRealIndexHandle[x];
newP, overFlow, root: Page;
state: State;
tid: DBStorageInternal.TID ← DBStorageInternal.TIDOfTuple[v];
size: CARDINAL ← CheckTreeInit[q, "Inserting", k];
DBStats.Inc[DBIndexInsert];
key ← CreateIndexKey[k];
IF q.depth = 0
THEN
{newP ← DBIndexPage.CreateEmptyPage[q, 1 -- the level --, q.segment];
DBIndexMod.InsertTheFirstLeafEntry[newP, key, tid];
PutRootPageToIndexHandle[q, newP];
IncrementDepth[q];
DBIndexScan.PutScanIndex[q, newP];
WriteIndexTuple[tuple: x, root: newP.db, depth: 1];
DBIndexPage.UnlockPage[newP] }
ELSE
{root ← GetRootPage[q];
[state, overFlow, newKey] ← InsertIntoTree[root, key, tid];
SELECT state
FROM
deleteFromNextPage => ERROR KeyIsNotFound;
normal => NULL;
split => SplitRoot[tuple: x, root: q, overflow: overFlow, splitKey: newKey];
ENDCASE => ERROR;
FreeKey[newKey];
DBIndexPage.UnlockPage[root] };
CheckTreeFinal[q, size + 1];
FreeKey[key]
};
DeleteFromIndex:
PUBLIC
PROC [x: DBStorage.IndexHandle, k:
ROPE, v: DBStorage.TupleHandle] = {
tid: DBStorageInternal.TID ← DBStorageInternal.TIDOfTuple[v];
q: RealIndexHandle ← DBIndexHandle.GetOldRealIndexHandle[x];
newRoot, overflow, root: Page;
state: State;
key, newKey: IndexKey;
size: CARDINAL ← CheckTreeInit[q, "Deleting", k];
DBStats.Inc[DBIndexDelete];
key ← CreateIndexKey[k];
IF q.depth = 0
THEN ERROR DB.InternalError -- Deleting entry from empty index!
ELSE
{root ← GetRootPage[q];
[state, overflow, newKey] ← DeleteFromTree[root, key, tid];
SELECT state
FROM
deleteFromNextPage => ERROR KeyIsNotFound;
normal => DBIndexPage.UnlockPage[root];
merge =>
TRUSTED {
IF root.depth = 1
AND root.pointer.size = 0
THEN
-- Deleted last entry in index (on root page). Destroy the root page.
{
IF ExtraCheckFlag
THEN
SIGNAL TakeALookAtThis;
DBIndexPage.DestroyPage[q.segment, root, root.db];
DBIndexHandle.DeleteHandle[q];
DBIndexScan.ScanForNullTree[q];
WriteIndexTuple[tuple: x, root: q.rootDB, depth: 0] }
ELSE
-- Only one entry left on the (non-leaf) root page; delete the root and promote its son.
IF root.depth > 1
AND root.pointer.size = 1
THEN
{
IF ExtraCheckFlag
THEN
SIGNAL TakeALookAtThis;
newRoot ← DeleteRoot[root, q.rootDB];
PutRootPageToIndexHandle[q, newRoot];
DecrementDepth[q];
WriteIndexTuple[tuple: x, root: q.rootDB, depth: q.depth];
DBIndexPage.UnlockPage[newRoot] }
ELSE
DBIndexPage.UnlockPage[root];
};
split =>
{SplitRoot[tuple: x, root: q, overflow: overflow, splitKey: newKey];
DBIndexPage.UnlockPage[root] }
ENDCASE => ERROR;
FreeKey[newKey] };
CheckTreeFinal[q, size - 1];
FreeKey[key]
}; -- end of DeleteFromIndex
OpenScanIndex:
PUBLIC
PROC [x: DBStorage.IndexHandle, y: DBStorage.Selection, startPosition: DBStorage.FirstLast]
RETURNS [scan: DBStorage.IndexScanHandle] = {
index: CARDINAL;
key: IndexKey;
page: DBCommon.DBPage;
q: RealIndexHandle ← DBIndexHandle.GetOldRealIndexHandle[x];
CreateTheNextKey:
PROC [key:
ROPE]
RETURNS [IndexKey] =
INLINE {RETURN[CreateIndexKey[Rope.Concat[key, "\000"]]]};
size: CARDINAL ← CheckTreeInit[q, "Scanning index from", y.lowerBound];
IF q.depth = 0
THEN
{page ← DBCommon.NullDBPage;
index ← 0}
ELSE
IF startPosition = First
THEN
IF y.lowerBoundInfinity
THEN {page ← SearchTheFirstPage[q]; index ← 0}
ELSE
{
IF y.includeLowerBound
THEN
[page, index] ←
Search[q, key ← CreateIndexKey[y.lowerBound]]
ELSE
{key ← CreateTheNextKey[y.lowerBound];
[page, index] ← Search[q, key]};
FreeKey[key]}
ELSE
IF y.upperBoundInfinity
THEN [page, index] ← SearchTheLastPage[q]
ELSE
{
IF y.includeUpperBound
THEN
{key ← CreateTheNextKey[y.upperBound];
[page, index] ← Search[q, key]}
ELSE
[page, index] ←
Search[q, key ← CreateIndexKey[y.upperBound]];
FreeKey[key]};
scan ← DBIndexScan.CreateScanHandle[q, y, startPosition, page, index];
CheckTreeFinal[q, size]
};
NextScanIndex:
PUBLIC
PROC [x: IndexScanHandle]
RETURNS [DBStorage.TupleHandle] =
TRUSTED {
-- Note that we fetch the tuple currently ref'd by x and THEN increment the scan to the next tuple, as the convention is that x already refs the next tuple to scan. This won't do for the intended high-level semantics of scans.
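-- (In other words, x.index always designates the entry to return next: NextScanIndex returns that
-- entry and then advances via IncrementIndex, while PrevScanIndex first backs up via DecrementIndex
-- and then fetches; see PrevScanIndex below.)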
page: Page;
tuple: DBStorage.TupleHandle;
size: CARDINAL;
IF x=NIL THEN RETURN[NIL]; -- scans set to NIL when released.
size ← CheckTreeInit[x.tree, "NextScanIndex"];
IF x.this = DBCommon.NullDBPage THEN RETURN [NIL];
page ← DBIndexPage.GetPage[x.tree, x.this, 1];
IF x.index >= page.pointer.size
-- check for consistency
THEN { IF page.pointer.right # DBCommon.NullDBPage THEN ERROR; tuple ← NIL }
ELSE
{key: IndexKey ← Key[page, x.index];
IF
NOT x.upperBoundInfinity
AND ( x.includeUpperBound
AND Less[x.upperBound, key]
OR
NOT x.includeUpperBound
AND LessEq[x.upperBound, key] )
THEN tuple ← NIL
ELSE
{tuple ← DBStorageInternal.MakeTupleHandle[tid: Tid[page, x.index]];
page ← IncrementIndex[x, page]} };
DBIndexPage.UnlockPage[page];
CheckTreeFinal[x.tree, size];
RETURN [tuple]
};
NextScanKey:
PUBLIC
PROC [x: IndexScanHandle]
RETURNS [key: DBIndex.IndexKey] =
TRUSTED {
-- Note that we fetch the tuple currently ref'd by x and THEN increment the scan to the next tuple, as the convention is that x already refs the next tuple to scan. This won't do for the intended high-level semantics of scans.
page: Page;
IF x=NIL THEN RETURN[NIL]; -- scans set to NIL when released.
IF x.this = DBCommon.NullDBPage THEN RETURN [NIL];
page ← DBIndexPage.GetPage[x.tree, x.this, 1];
IF x.index >= page.pointer.size
-- check for consistency
THEN { IF page.pointer.right # DBCommon.NullDBPage THEN ERROR; key ← NIL }
ELSE {
key ← Key[page, x.index];
IF
NOT x.upperBoundInfinity
AND (x.includeUpperBound
AND Less[x.upperBound, key]
OR
NOT x.includeUpperBound
AND LessEq[x.upperBound, key])
THEN { key ← NIL; x.direction ← finished }
ELSE x.direction ← next };
RETURN [key]
};
PrevScanIndex:
PUBLIC
PROC[x: IndexScanHandle]
RETURNS [tuple: DBStorage.TupleHandle] = {
-- Decrement scan and then fetch tuple ref'd by x.
page: Page;
size: CARDINAL;
IF x=NIL THEN RETURN[NIL]; -- scans set to NIL when released.
size ← CheckTreeInit[x.tree, "PrevScanIndex"];
IF x.this = DBCommon.NullDBPage THEN RETURN [NIL];
page ← DBIndexPage.GetPage[x.tree, x.this, 1];
IF x.index = 0
AND NoPreviousPage[page]
THEN tuple ← NIL
ELSE
{key: IndexKey ← Key[page, x.index];
page ← DecrementIndex[x, page];
IF
NOT x.lowerBoundInfinity
AND ( x.includeLowerBound
AND Greater[x.lowerBound, key]
OR
NOT x.includeLowerBound
AND GreaterEq[x.lowerBound, key] )
THEN tuple ← NIL
ELSE
tuple ← DBStorageInternal.MakeTupleHandle[tid: Tid[page, x.index]]};
DBIndexPage.UnlockPage[page];
CheckTreeFinal[x.tree, size];
RETURN [tuple]
}; -- PrevScanIndex
PrevScanKey:
PUBLIC
PROC[x: IndexScanHandle]
RETURNS [key: DBIndex.IndexKey] = {
page: Page;
IF x=NIL THEN RETURN[NIL]; -- scans set to NIL when released.
IF x.this = DBCommon.NullDBPage THEN RETURN [NIL];
page ← DBIndexPage.GetPage[x.tree, x.this, 1];
IF x.index = 0
AND NoPreviousPage[page]
THEN { key ← NIL; x.direction ← finished }
ELSE {
key ← Key[page, x.index];
IF
NOT x.lowerBoundInfinity
AND (x.includeLowerBound
AND Greater[x.lowerBound, key]
OR
NOT x.includeLowerBound
AND GreaterEq[x.lowerBound, key])
THEN {key ← NIL; x.direction ← finished}
ELSE x.direction ← prev };
RETURN [key]
}; -- PrevScanKey
ThisScanTuple:
PUBLIC
PROC[x: IndexScanHandle]
RETURNS [tuple: DBStorage.TupleHandle] = {
IF x.direction = finished THEN RETURN[NIL];
BEGIN
page: Page ← IF x.direction = next THEN DBIndexPage.GetPage[x.tree, x.this, 1] ELSE DecrementIndex[x, DBIndexPage.GetPage[x.tree, x.this, 1]];
tuple ← DBStorageInternal.MakeTupleHandle[tid: Tid[page, x.index]];
IF x.direction = next THEN page ← IncrementIndex[x, page];
DBIndexPage.UnlockPage[page]
END };
IgnoreEntry:
PUBLIC PROC[x: IndexScanHandle] = {
page: Page = IF x.direction = next THEN DBIndexPage.GetPage[x.tree, x.this, 1] ELSE DecrementIndex[x, DBIndexPage.GetPage[x.tree, x.this, 1]];
DBIndexPage.UnlockPage[page] };
InitializeIndex: PUBLIC PROC = {};
CallAfterFinishTransaction:
PUBLIC
PROC [s: DBCommon.Segment] =
{FinalizeIndex[s]};
FinalizeIndex:
PROC [s: DBCommon.Segment] = {
DBIndexPage.DestroyPageList[s];
DBIndexHandle.FinalizeIndexHandle[s];
DBIndexScan.FreeScan[s]
};
Private Procedures
Insertion
InsertIntoTree:
PROC
[page: Page, key: IndexKey, value: DBStorageInternal.TID]
RETURNS [State, Page, IndexKey] = {
-- Inserts <key, value> into the tree emanating from page; a[0] <= key.
-- Caller must deallocate the IndexKey returned.
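-- The recursive contract, as used by the callers: [normal, NIL, NIL] means the insertion fit in
-- this subtree; [split, overFlow, splitKey] means this subtree's root page split, and the caller
-- must insert <splitKey, overFlow.db> into its own page (SplitRoot handles a split that propagates
-- all the way up to the root).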
i: CARDINAL;
newKey, newSplitKey: IndexKey;
newOverFlow, overFlow: Page;
q: Page;
state: State;
IF page.depth = 1
-- leaf --
THEN
{DBStats.Inc[BTreeSearchPage];
IF DBIndexOp.OverFlow[page, key]
THEN
{[overFlow, newKey] ← DBIndexOp.SplitLeaf[page, key, value];
RETURN [split, overFlow, newKey]}
ELSE
{SlideLeaf[page, key, value];
RETURN [normal, NIL, NIL]}}
ELSE
{DBStats.Inc[BTreeSearchPage];
i ← FindTheLastInternalKey[page, key];
-- i=0&key<a[1] | i<a.length-1&a[i]<=key<a[i+1] | i=a.length-1&a[i]<=key
q ← DBIndexPage.GetPage[page.tree, Value[page, i], page.depth - 1];
[state, overFlow, newKey] ← InsertIntoTree[q, key, value];
SELECT state
FROM
normal =>
{DBIndexPage.UnlockPage[q];
FreeKey[newKey];
RETURN [normal, NIL, NIL]};
split =>
{[state, newOverFlow, newSplitKey] ←
DBIndexOp.InsertInInternalPage[page, i, overFlow.db, newKey];
FreeKey[newKey];
DBIndexPage.UnlockPage[q];
DBIndexPage.UnlockPage[overFlow];
RETURN [state, newOverFlow, newSplitKey]}
ENDCASE => ERROR}
};
SplitRoot:
PROC
[tuple: DBStorage.TupleHandle, root: RealIndexHandle, overflow: Page,
splitKey: IndexKey] = {
-- Splits the root into two and creates another page with two entries, root and overflow
newP: Page;
newP ← DBIndexPage.CreateEmptyPage[root, root.depth + 1, root.segment];
DBIndexMod.InsertTwoEntriesToEmptyPage[newP, root.rootDB, splitKey, overflow.db];
PutRootPageToIndexHandle[root, newP];
IncrementDepth[root];
DBIndexPage.UnlockPage[overflow];
WriteIndexTuple[tuple: tuple, root: newP.db, depth: root.depth];
DBIndexPage.UnlockPage[newP]
};
Deletion
DeleteFromTree:
PROC
[page: Page, key: IndexKey, tid: DBStorageInternal.TID]
RETURNS [State, Page, IndexKey] = {
-- Deletes the entry with the given key. Caller must deallocate the
-- IndexKey returned. If there are multiple entries, then the entry
-- with the value tid is deleted. Deletion has difficult problems in cases
-- like ours, where we allow multiple entries with the same key.
-- If we find the entry at the beginning of the page, there is a
-- possibility that the same key exists in the left page. Therefore,
-- the search algorithm is: in every internal page, look for the
-- leftmost key whose value is equal to or greater than the argument
-- key. If equal, go down to the left of that key; if greater, go
-- down to the right.
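-- For example: if duplicate "b" entries have been split across two leaf pages, the separator key
-- above them can itself be "b", so equal keys may lie in the son to the left of that separator.
-- The descent therefore starts at the leftmost son that could hold the key, and the
-- deleteFromNextPage state (handled in the loop below) moves the search one son to the right
-- when the entry is not found.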
i: CARDINAL;
newKey, retKey: IndexKey;
overflow, retOverflow, q: Page;
state: State;
DBIndexPage.CheckTag[page.pointer];
IF page.depth = 1
THEN
{DBStats.Inc[BTreeSearchPage];
[state, ,] ← DBIndexOp.DeleteFromLeafPage[page, key, tid];
DBIndexPage.CheckTag[page.pointer];
RETURN [state, NIL, NIL]};
DBStats.Inc[BTreeSearchPage];
i ← FindTheFirstInternalKey[page, key];
-- i=0&k<=a[i] OR 0<i<max[a]&a[i-1]<k<=a[i] OR i=max[a]&a[i]<key
DO q ← DBIndexPage.GetPage[page.tree, Value[page, i], page.depth - 1];
[state, overflow, newKey] ← DeleteFromTree[q, key, tid];
SELECT state
FROM
deleteFromNextPage => {
DBIndexPage.UnlockPage[q];
i ← i + 1;
TRUSTED {IF i >= page.pointer.size THEN RETURN [state, overflow, newKey]}
};
normal =>
{DBIndexPage.UnlockPage[q];
FreeKey[newKey];
RETURN [normal, NIL, NIL]};
merge =>
{[state, overflow, retKey] ← MergeInThisPage[page, q, i];
FreeKey[newKey];
RETURN [state, overflow, retKey]};
split =>
{[state, retOverflow, retKey] ← DBIndexOp.InsertInInternalPage[page, i, overflow.db, newKey];
FreeKey[newKey];
DBIndexPage.UnlockPage[q];
DBIndexPage.UnlockPage[overflow];
RETURN [state, retOverflow, retKey]}
ENDCASE => ERROR
ENDLOOP
}; -- DeleteFromTree
DestroyTree:
PROC [q: RealIndexHandle] = {
-- Deallocates all file pages of the B-Tree.
DestroyBelow:
PROC [tree: RealIndexHandle, root: Page] = {
-- Deallocates all file pages emanating from this page.
-- The root page is not destroyed. If there is no root page
-- (an empty B-Tree), simply return.
db: DBCommon.DBPage;
i: CARDINAL;
q: Page;
IF root.depth = 1
THEN RETURN -- leaves already freed by parent
ELSE
TRUSTED {
FOR i
IN [0..root.pointer.size)
DO
q ← DBIndexPage.GetPage[tree, db ← Value[root, i], root.depth - 1
! DBIndexPage.BadPage => IF tryRecovery THEN LOOP];
DestroyBelow[tree, q];
DBIndexPage.DestroyPage[tree.segment, q, db]
ENDLOOP
};
}; -- end DestroyBelow
root: Page;
IF q.root=NIL AND q.rootDB=0 THEN RETURN;
root ← GetRootPage[q];
DestroyBelow[q, root];
DBIndexPage.DestroyPage[q.segment, root, q.rootDB];
q.rootDB ← DBCommon.NullDBPage;
q.root ← NIL;
q.depth ← 0;
q.free ← TRUE
};
PutRootPageToIndexHandle:
PROC [q: RealIndexHandle, p: Page] = {
-- Sets DBPage and CacheHint of root
q.rootDB ← p.db; q.root ← p.cache
};
IncrementDepth: PROC [q: RealIndexHandle] = {q.depth ← q.depth + 1};
GetRootPage:
PROC [q: RealIndexHandle]
RETURNS [Page] = {
RETURN[DBIndexPage.GetPage[tree: q, db: q.rootDB, level: q.depth]]};
DeleteRoot:
PROC [p: Page, r: DBCommon.DBPage]
RETURNS [Page] = {
-- p contains one element. Deletes p and returns the son
son: Page ← DBIndexPage.GetPage[p.tree, Value[p, 0], p.depth - 1];
-- obtain the first item
DBIndexPage.DestroyPage[p.tree.segment, p, r];
RETURN [son]
};
DecrementDepth: PROC [q: RealIndexHandle] = {q.depth ← q.depth - 1};
IncrementIndex:
PROC [x: IndexScanHandle, p: Page]
RETURNS [Page] =
TRUSTED {
-- Increments the index of scan. If it falls off the page, then the right page is fetched and
-- returned. The original page is deallocated in that case.
db: DBCommon.DBPage;
q: Page;
IF x.index >= p.pointer.size - 1
THEN
{[q, db] ← GetRightPage[p];
IF q = NIL THEN {x.index ← x.index + 1; RETURN [p]};
DBIndexPage.UnlockPage[p];
x.this ← db;
x.index ← 0;
RETURN [q]}
ELSE
{x.index ← x.index + 1; RETURN [p]}
};
DecrementIndex:
PROC [x: IndexScanHandle, p: Page]
RETURNS [Page] =
TRUSTED {
-- Returns the left page if the previous element is in the left page
q: Page;
db: DBCommon.DBPage;
IF x.index # 0 THEN {x.index ← x.index - 1; RETURN [p]};
[q, db] ← GetLeftPage[p];
IF q = NIL THEN RETURN [p];
DBIndexPage.UnlockPage[p];
x.this ← db;
x.index ← q.pointer.size - 1;
RETURN [q]
};
GetRightPage:
PROC [p: Page]
RETURNS [Page, DBCommon.DBPage] =
TRUSTED {
-- p is a leaf page. Reads in the right page. If no right page, returns NIL
rightDB: DBCommon.DBPage;
IF (rightDB ← p.pointer.right) = DBCommon.NullDBPage
THEN
RETURN [NIL, DBCommon.NullDBPage];
RETURN [DBIndexPage.GetPage[p.tree, rightDB, 1], rightDB]
};
GetLeftPage:
PROC [p: Page]
RETURNS [Page, DBCommon.DBPage] =
TRUSTED {
-- p is a leaf page. Reads in the left page. If no left page, returns NIL
leftDB: DBCommon.DBPage;
IF (leftDB ← p.pointer.left) = DBCommon.NullDBPage
THEN
RETURN [NIL, DBCommon.NullDBPage];
RETURN [DBIndexPage.GetPage[p.tree, leftDB, 1], leftDB]
};
Internal movement of Btree pages
SlideLeaf:
PROC [p: Page, key: IndexKey, value:
LONG
CARDINAL] = {
-- Shifts part of the entries in p and inserts key and value.
-- It is guaranteed that there won't be any overflow.
index: CARDINAL ← FindTheLastLeafKey[p, key];
DBIndexMod.SlideLeafAt[p, index, key, value]
};
MergeInThisPage:
PROC
[p: Page, son: Page, index: CARDINAL]
RETURNS [State, Page, IndexKey] = {
-- son is the index-th entry in p.
-- son is sparse, so balance it.
-- Returns normal if p is normal,
-- delete if p needs balancing,
-- split if p needs splitting,
-- merge if p is less than half full.
-- This procedure does an UnlockPage or DestroyPage on son; the caller should not touch it.
-- Caller must free the IndexKey returned.
state: State;
page: Page;
key: IndexKey;
IF son.depth = 1
THEN [state, page, key] ← DBIndexOp.MergeInLeafPage[p, son, index]
ELSE [state, page, key] ← MergeInInternalPage[p, son, index];
IF state = delete
THEN
{state ← DBIndexMod.RemoveFromInternal[p, index]; DestroyThisPage[son]}
ELSE
DBIndexPage.UnlockPage[son];
RETURN [state, page, key]
};
MergeInInternalPage:
PROC [parent: Page, son: Page, index:
CARDINAL]
RETURNS [State, Page, IndexKey] = TRUSTED {
"son" is an internal page.
"son" is the index-th entry in "parent"
"son" is sparse, so balance it.
Returns normal if son is normal,
delete if son needs balancing,
split if son needs splitting,
Caller must deallocate IndexKey returned.
freeSon: CARDINAL ← FreeSpace[son];
i: CARDINAL;
newKey, tempKey: IndexKey;
sizeSon: CARDINAL ← son.pointer.size;
state: State;
overflow: Page;
-- We balance son with its right brother except in the unusual case in which
-- it has no right brother (when index=parent.pointer.size-1). There will always
-- be a left or right brother because we don't permit "only child" leaf or internal
-- pages.
IF index # parent.pointer.size - 1
THEN
{
-- balance the son with the right page
right: Page ←
DBIndexPage.GetPage[parent.tree, Value[parent, index + 1], son.depth];
IF MergableToRightInternal
[parent: parent, leftFree: freeSon, rightFree: FreeSpace[right],
leftAt: index] THEN
-- move ALL entries from son to right; caller will delete son
{DBIndexMod.MoveEntriesToRightInternal
[from: son, to: right, key: Key[parent, index + 1],
nentries: sizeSon];
DBIndexPage.UnlockPage[right];
RETURN [delete, NIL, NIL]};
-- Move zero or more entries from right to son to balance
i ← JustOver[right, DBIndex.HalfPage - FrontSize[son, sizeSon - 1] - sizeSon];
-- If the move would move no entries, or would leave zero or one entry on right,
-- then don't do it: return and leave the tree as is. We are moving i entries.
IF i = 0
OR i >= right.pointer.size - 1
THEN
{DBIndexPage.UnlockPage[right]; RETURN[normal, NIL, NIL]}; -- ... so do nothing
tempKey ← CopyKey[Key[right, i]];
DBIndexMod.MoveEntriesToLeftInternal
[from: right, to: son, key: Key[parent, index + 1], nentries: i];
DBIndexPage.UnlockPage[right];
[state, overflow, newKey] ← DBIndexOp.ChangeKey[parent, tempKey, index + 1];
FreeKey[tempKey]}
ELSE
{
-- no right brother: balance with the left
left: Page ←
DBIndexPage.GetPage[parent.tree, Value[parent, index - 1], son.depth];
IF MergableToLeftInternal
[parent: parent, leftFree: FreeSpace[left], rightFree: freeSon,
rightAt: index] THEN
-- move ALL entries from son to left; caller will delete son
{DBIndexMod.MoveEntriesToLeftInternal
[from: son, to: left, key: Key[parent, index], nentries: sizeSon];
DBIndexPage.UnlockPage[left];
RETURN [delete, NIL, NIL]};
-- Move zero or more entries from left to son to balance, leaving i+1 in left:
i ← JustOver[left, DBIndex.HalfPage];
-- If JustOver would have us move zero entries, or all but one entry in left,
-- then don't do it: return and leave the tree as is.
IF left.pointer.size = i + 1
OR i = 0
THEN
{DBIndexPage.UnlockPage[left]; RETURN[normal, NIL, NIL]};
tempKey ← CopyKey[Key[left, i + 1]];
DBIndexMod.MoveEntriesToRightInternal
[from: left, to: son, key: Key[parent, index],
nentries: left.pointer.size - i - 1];
DBIndexPage.UnlockPage[left];
[state, overflow, newKey] ← DBIndexOp.ChangeKey[parent, tempKey, index];
FreeKey[tempKey]};
RETURN [state, overflow, newKey]
};
MergableToRightInternal:
PROC
[parent: Page, leftFree, rightFree: CARDINAL, leftAt: CARDINAL]
RETURNS [BOOLEAN] = TRUSTED {
"left" is the leftAt-th son of "parent" and "right" is just to the right of "left".
Returns TRUE iff left and right can be merged.
key: IndexKey ← Key[parent, leftAt+1];
length: CARDINAL ← (key.length + 3) / 2;
-- Space used after merging less than a FullPage? The following is equivalent to the value returned:
--   (FullPage - leftFree) + (FullPage - rightFree) - OverHead + length < FullPage
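-- Derivation: the above is 2*FullPage - leftFree - rightFree - OverHead + length < FullPage,
-- i.e. FullPage + length < leftFree + rightFree + OverHead, which is the test returned below.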
RETURN [rightFree + leftFree + OverHead > FullPage + length]
};
MergableToLeftInternal:
PROC
[parent: Page, leftFree, rightFree: CARDINAL, rightAt: CARDINAL]
RETURNS [BOOLEAN] = TRUSTED {
"right" is the rightAt-th son of "parent" and "left" is just to the
left of "right".
Returns TRUE iff left and right can be merged.
key: IndexKey ← Key[parent, rightAt];
length: CARDINAL ← (key.length + 3) / 2;
RETURN [rightFree + leftFree + OverHead > FullPage + length]
};
NoPreviousPage:
PROC [page: Page]
RETURNS [
BOOLEAN] =
TRUSTED {
RETURN [page.pointer.left = DBCommon.NullDBPage]
};
DestroyThisPage:
PROC [page: Page] =
TRUSTED
-- Does some checks, then calls the real DestroyPage; used for pages removed from the B-Tree.
-- Currently only called by MergeInThisPage; we call DestroyPage directly when destroying the whole tree.
BEGIN
IF page.pointer.size#0 THEN SIGNAL TakeALookAtThis;
IF page.pointer.left#DBCommon.NullDBPage
OR page.pointer.right#DBCommon.NullDBPage
THEN
SIGNAL TakeALookAtThis; -- trying to destroy leaf without calling DBIndexOp.RemoveLinks
IF deletionTurnedOn
THEN
DBIndexPage.DestroyPage[page.tree.segment, page, page.db]
ELSE
DBIndexPage.UnlockPage[page];
END;
Manipulations on IndexScanHandles
Index: PROC [x: IndexScanHandle] RETURNS [CARDINAL] = {RETURN [x.index]};
Search
Search:
PROC
[r: RealIndexHandle, y: IndexKey]
RETURNS [DBCommon.DBPage, CARDINAL] = {
-- Returns the index i such that db[i-1]<y<=db[i]
p: Page ← GetRootPage[r];
db: DBCommon.DBPage;
i: CARDINAL;
[db, i] ← SearchTree[p, y];
DBIndexPage.UnlockPage[p];
RETURN [db, i]
};
SearchTree:
PROC
[page: Page, key: IndexKey]
RETURNS [db: DBCommon.DBPage, i: CARDINAL] = {
-- Returns the index i such that db[i-1]<key<=db[i]
son: Page;
temp: DBCommon.DBPage;
GotoRightKey:
PROC =
TRUSTED {
-- Assigns <dbpage of right page, 0> if there is a right page.
-- Does nothing if there is no right page.
core: LONG POINTER TO Core ← page.pointer;
IF core.right = DBCommon.NullDBPage THEN RETURN;
db ← core.right;
i ← 0
};
IF page.depth = 1
THEN
-- leaf --
{db ← page.db;
i ← FindTheFirstLeafKey[page, key];
TRUSTED {IF i = page.pointer.size THEN GotoRightKey[] };
RETURN [db, i]};
i ← FindTheFirstInternalKey[page, key];
son ← DBIndexPage.GetPage[page.tree, temp ← Value[page, i], page.depth - 1];
[db, i] ← SearchTree[son, key];
DBIndexPage.UnlockPage[son];
RETURN [db, i]
};
SearchTheFirstPage:
PROC [r: RealIndexHandle]
RETURNS [DBCommon.DBPage] = {
p: Page;
db: DBCommon.DBPage;
IF r.depth = 1 THEN RETURN [r.rootDB];
p ← GetRootPage[r];
db ← SearchTheFirstTree[p];
DBIndexPage.UnlockPage[p];
RETURN [db]
};
SearchTheFirstTree:
PROC [p: Page]
RETURNS [DBCommon.DBPage] = {
db: DBCommon.DBPage;
IF p.depth = 2
THEN RETURN [Value[p, 0]]
ELSE
{son: Page ← DBIndexPage.GetPage[p.tree, Value[p, 0], p.depth - 1];
db ← SearchTheFirstTree[son];
DBIndexPage.UnlockPage[son];
RETURN [db]}
};
SearchTheLastPage:
PROC
[r: RealIndexHandle]
RETURNS [DBCommon.DBPage, CARDINAL] = {
p: Page ← GetRootPage[r];
db: DBCommon.DBPage;
i, s: CARDINAL;
IF r.depth = 1
THEN
TRUSTED {s ← p.pointer.size; DBIndexPage.UnlockPage[p]; RETURN [r.rootDB, s]};
[db, i] ← SearchTheLastTree[p];
DBIndexPage.UnlockPage[p];
RETURN [db, i]
};
SearchTheLastTree:
PROC [p: Page]
RETURNS [DBCommon.DBPage,
CARDINAL] =
TRUSTED {
l: CARDINAL ← p.depth;
s: CARDINAL ← p.pointer.size;
index, sonSize: CARDINAL;
sonDB: DBCommon.DBPage ← Value[p, s - 1];
son: Page;
db: DBCommon.DBPage;
son ← DBIndexPage.GetPage[p.tree, sonDB, l - 1];
IF l = 2
THEN
{sonSize ← son.pointer.size; DBIndexPage.UnlockPage[son]; RETURN [sonDB, sonSize]}
ELSE
{[db, index] ← SearchTheLastTree[son]; DBIndexPage.UnlockPage[son]; RETURN [db, index]}
};
Index tuple
WriteIndexTuple:
PROC
[tuple: DBStorage.TupleHandle, root: DBCommon.DBPage, depth: CARDINAL] = {
-- Writes out the new information into the database.
ttr: DBStorageInternal.TupleTree←
NEW[DBStorageInternal.TupleTreeRecord← [
root, depth, DBStorageInternal.TIDOfTuple[tuple],
DBStorageInternal.SegmentIDOfTuple[tuple]]];
DBStorageInternal.WriteIndexObject[tuple, ttr]
};
Check procedures
CheckTreeInit:
PROC [q: RealIndexHandle, op:
ROPE, item:
ROPE←
NIL]
RETURNS [size:
CARDINAL] = {
prevDB ← DBCommon.NullDBPage;
IF NOT CheckFlag THEN RETURN;
opCount← opCount+1;
IF opCount=stopAtOpCount THEN SIGNAL TakeALookAtThis;
IF watchForThisKey#
NIL
AND
watchForThisKey.Equal[item.Substr[0, watchForThisKey.Length[]]] THEN
WriteHeaderStat[Rope.Cat["******Monitored key: ", op], item];
IF PrintFlag THEN WriteHeaderStat[op, item];
IF q.depth = 0
THEN RETURN [0]
ELSE
{root: Page ← GetRootPage[q];
temp: IndexKey;
savePrintingFlag: BOOL← PrintFlag;
PrintFlag← FALSE; -- only do printing during CheckTreeFinal, not CheckTreeInit
[size, temp] ← CheckPage[root, NIL];
PrintFlag← savePrintingFlag;
DBIndexPage.UnlockPage[root]}
};
CheckTreeFinal:
PROC [q: RealIndexHandle, size:
CARDINAL] = {
-- Checks tree for proper syntactic form.
-- Note: page depth is 1 for leaf nodes, and for internal tree nodes depth is height
-- of the tree (max distance from leaves). If the tree is empty, then q.depth=0
-- and there is NO root node (the root node is allocated/deallocated when the first/last entry
-- is inserted/deleted).
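-- For example: a one-page index (leaf root) has q.depth=1 and root.depth=1; after the first
-- root split, q.depth=2 and the root is an internal page at depth 2 whose sons are leaves at depth 1.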
sonSize: CARDINAL;
prevDB ← DBCommon.NullDBPage;
IF NOT CheckFlag THEN RETURN;
IF q.depth = 0
THEN
IF size # 0
THEN SIGNAL BadBTree
ELSE RETURN;
{root: Page ← GetRootPage[q];
temp: IndexKey;
[sonSize, temp] ← CheckPage[root, NIL];
IF sonSize # size THEN SIGNAL BadBTree;
DBIndexPage.UnlockPage[root]};
IF nextDB # DBCommon.NullDBPage THEN SIGNAL BadBTree
};
CheckPage:
PROC
[page: Page, start: IndexKey] RETURNS [size: CARDINAL, end: IndexKey] = TRUSTED {
-- Debugging routine: checks that this page of the index and its descendants are OK.
er: IndexKey;
son: Page;
i, sonSize: CARDINAL;
size ← 0;
CheckCacheLock[page.db, page.cache];
CheckPageSize[page];
IF page.depth=0 THEN SIGNAL BadBTree;
IF page.depth = 1
THEN
{[size, end] ← CheckLeafPage[page, start]; RETURN};
IF PrintFlag THEN WritePageStat[page];
IF page.pointer.size=0
-- empty pages never permitted
OR page.pointer.size=1 -- only children of internal pages not permitted
OR page.pointer.size>1000 -- probably a subtraction from zero
THEN SIGNAL BadBTree;
CheckInternalEntries[page];
son ← DBIndexPage.GetPage[page.tree, Value[page, 0], page.depth - 1];
[sonSize, end] ← CheckPage[son, start];
size ← size + sonSize;
DBIndexPage.UnlockPage[son];
FOR i
IN [1..page.pointer.size)
DO
er ← Key[page, i];
IF end # NIL AND Greater[end, er] THEN SIGNAL BadBTree;
son ← DBIndexPage.GetPage[page.tree, Value[page, i], page.depth - 1];
[sonSize, end] ← CheckPage[son, er];
size ← size + sonSize;
DBIndexPage.UnlockPage[son]
ENDLOOP
};
CheckLeafPage:
PROC
[page: Page, start: IndexKey] RETURNS [size: CARDINAL, end: IndexKey] = TRUSTED {
i: CARDINAL;
prevdesc: IndexKey ← start;
left, right: DBCommon.DBPage;
left ← page.pointer.left;
right ← page.pointer.right;
IF left # prevDB THEN SIGNAL BadBTree;
prevDB ← page.db;
nextDB ← right;
size ← 0;
IF PrintFlag THEN WritePageStat[page];
IF page.pointer.size=0 OR page.pointer.size>1000 THEN SIGNAL BadBTree;
CheckLeafEntries[page];
FOR i
IN [0..page.pointer.size)
DO
end ← Key[page, i];
size ← size + 1;
IF prevdesc # NIL AND Greater[prevdesc, end] THEN SIGNAL BadBTree;
prevdesc ← end
ENDLOOP
};
WriteHeaderStat:
PROC [op:
ROPE, item:
ROPE] = {
debugStream: IO.STREAM← DBCommon.GetDebugStream[];
debugStream.PutF["**Op #%g: %g %g\n", IO.int[opCount], IO.rope[op], IO.rope[item]]
};
WritePageStat:
PROC [page: Page] =
TRUSTED {
-- Write page to debug stream, on one indented line, including up to PrintBreadth entries.
depth: CARDINAL ← page.depth;
debugStream: IO.STREAM← DBCommon.GetDebugStream[];
IF depth>10 THEN SIGNAL TakeALookAtThis;
THROUGH [0..5-depth)
DO
debugStream.PutF[" "];
ENDLOOP;
debugStream.PutF[
"%8bB (%2d):", IO.card[page.db], IO.card[page.pointer.size]];
FOR i:
CARDINAL
IN [0..
MIN[PrintBreadth, page.pointer.size])
DO
WriteEntry[page, i] ENDLOOP;
IF page.pointer.size>PrintBreadth
AND page.pointer.size<100
THEN
{debugStream.PutF[" ... "]; WriteEntry[page, page.pointer.size-1]};
debugStream.PutChar['\n];
};
WriteEntry:
PROC[page: Page, entryNum:
CARDINAL] =
TRUSTED {
entry: ItemHandle← CoreAddr[page, entryNum];
debugStream: IO.STREAM← DBCommon.GetDebugStream[];
IF entryNum#0
OR page.depth=1
THEN
BEGIN -- non-leaf pages have no key for the zero-th entry, so do only leaves & non-0th entries
debugStream.PutChar[' ];
IF entry.length>DBIndex.CoreIndexSize
THEN
debugStream.PutF[" ! entry too long(%g) ! ", IO.card[entry.length]]
ELSE
FOR j:
CARDINAL
IN [0..
MIN[12, entry.length])
DO
WriteChar[debugStream, entry.text[j]] ENDLOOP;
END;
IF PrintInternalValues
AND page.depth#1
OR PrintLeafValues
AND page.depth=1
THEN
debugStream.PutF[" (%bB)", IO.card[entry.value]];
};
WriteKey:
PROC[entry: IndexKey] =
TRUSTED {
-- For calling from the debugger; prints the whole key rather than the first 12 chars as above.
debugStream: IO.STREAM← DBCommon.GetDebugStream[];
debugStream.PutF["(length %g): ", IO.int[entry.length]];
FOR j:
CARDINAL
IN [0..
MIN[300, entry.length])
DO
WriteChar[debugStream, entry.text[j]] ENDLOOP;
debugStream.PutChar['\n];
};
WriteItem:
PROC[entry: ItemHandle] =
TRUSTED {
-- For calling from the debugger; prints the whole key rather than the first 12 chars as above.
debugStream: IO.STREAM← DBCommon.GetDebugStream[];
debugStream.PutF["(length %g): ", IO.int[entry.length]];
FOR j:
CARDINAL
IN [0..
MIN[300, entry.length])
DO
WriteChar[debugStream, entry.text[j]] ENDLOOP;
debugStream.PutChar['\n];
};
WriteChar:
PROC[stream:
IO.
STREAM, char:
CHAR] = {
i: CARDINAL ← LOOPHOLE[char];
IF char>176C OR char<040C THEN stream.PutF["\\\\%g", IO.card[i]]
ELSE stream.PutChar[char]
};
CheckPageSize:
PROC [page: Page] =
TRUSTED {
IF page.pointer.size = 0 THEN SIGNAL BadBTree
};
CheckCacheLock:
PROC [db: DBCommon.DBPage, cache: DBCommon.CacheHandle] = {
IF DBSegment.LockCount[db, cache] # 1 THEN SIGNAL BadBTree;
};
CheckLeafEntries:
PROC [page: Page] =
TRUSTED {
endIndex: LONG POINTER TO UNSPECIFIED ← EndAddr[page];
entry: ItemHandle;
i: CARDINAL;
FOR i
IN [0..page.pointer.size)
DO
entry ← CoreAddr[page, i];
IF @entry.text + (entry.length + 1) / 2 # endIndex THEN SIGNAL BadBTree;
endIndex ← entry
ENDLOOP
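-- (The loop above verifies that the entries are packed end to end: entry 0's text ends exactly at
-- EndAddr[page], and each subsequent entry's text ends exactly where the previous entry begins,
-- with (length + 1) / 2 words of text per entry.)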
};
CheckInternalEntries:
PROC [page: Page] =
TRUSTED {
endIndex: LONG POINTER TO UNSPECIFIED ← EndAddr[page] - 2;
entry: ItemHandle;
i: CARDINAL;
FOR i
IN [1..page.pointer.size)
DO
entry ← CoreAddr[page, i];
IF @entry.text + (entry.length + 1) / 2 # endIndex THEN SIGNAL BadBTree;
endIndex ← entry
ENDLOOP
};
Main body
DBIndexScan.InitScan[]
END.