<> <> <> <> <> <> <> <> <> <> <> DIRECTORY DBCommon, DBIndex, DBIndexOp, DBIndexMod, DBIndexPage, DBIndexScan, DBSegment, DBStats, DBStorage, DBStorageConcrete, DBStorageTuple USING[TIDOfTuple, ConsTupleObject], IO, RefText, Rope; DBIndexImpl: CEDAR PROGRAM IMPORTS DBCommon, DBSegment, DBStats, DBIndexOp, DBIndexMod, DBIndexPage, DBIndexScan, DBStorageTuple, IO, RefText, Rope EXPORTS DBIndex, DBStorage = BEGIN OPEN DBIndexPage; <> IndexScanObject: PUBLIC TYPE = DBStorageConcrete.IndexScanObject; IndexScanHandle: TYPE = REF IndexScanObject; Page: TYPE = DBIndex.Page; IndexKey: TYPE = DBIndex.IndexKey; Core: TYPE = DBIndex.Core; State: TYPE = DBIndex.State; RealIndexHandle: TYPE = DBIndex.RealIndexHandle; ItemHandle: TYPE = DBIndex.ItemHandle; FullPage: CARDINAL = DBIndex.FullPage; OverHead: CARDINAL = DBIndex.OverHead; ROPE: TYPE = Rope.ROPE; <> BadBTree: SIGNAL = CODE; KeyIsNotFound: ERROR = CODE; TakeALookAtThis: SIGNAL = CODE; -- for debugging <> DeletionOfNonExistentKey: CARDINAL = 201B; ThisIndexWasNotCreated: CARDINAL = 202B; <> CheckFlag: BOOLEAN _ FALSE; -- do checking for illegal B-Tree format after each operation ExtraCheckFlag: BOOLEAN _ FALSE; -- check for unusual but not illegal cases PrintFlag: BOOLEAN _ FALSE; -- turn on debugging output PrintBreadth: CARDINAL _ 4; -- for debug output: # key entries to print per page PrintInternalValues: BOOL _ TRUE; -- for debug output: print values associated with internal keys PrintLeafValues: BOOL _ FALSE; -- for debug output: print values associated with leaf keys opCount: INT _ 0; -- counts number of operations for printing purposes watchForThisKey: ROPE _ NIL; -- prints this key when entered or deleted stopAtOpCount: INT _ 0; -- signals when get to this op count deletionTurnedOn: BOOL _ TRUE; -- this should normally be TRUE, to free B-Tree pages tryRecovery: BOOL _ FALSE; -- tries to recover from bad B-Tree; this should normally be FALSE prevDB, nextDB: DBCommon.DBPage; -- Used only by check programs <> CreateIndex: PUBLIC PROC [x: DBStorage.IndexHandle] = { DBIndexScan.CreateNewRealIndexHandle[DBStorageTuple.TIDOfTuple[x]]; WriteIndexTuple[tuple: x, root: DBCommon.NullDBPage, depth: 0] }; DestroyIndex: PUBLIC PROC [x: DBStorage.IndexHandle] = { <> q: RealIndexHandle _ DBIndexScan.GetOldRealIndexHandle[x]; DestroyTree[q]; DBIndexScan.DestroyIndexHandle[q]; WriteIndexTuple[tuple: x, root: DBCommon.NullDBPage, depth: 0] }; InsertIntoIndex: PUBLIC PROC[x: DBStorage.IndexHandle, k: ROPE, v: DBStorage.TupleHandle] = TRUSTED { newKey: REF TEXT; keyText: REF TEXT = LOOPHOLE[RefText.TrustTextRopeAsText[Rope.Flatten[k]]]; q: RealIndexHandle _ DBIndexScan.GetOldRealIndexHandle[x]; newP, overFlow, root: Page; state: State; tid: DBCommon.TID _ DBStorageTuple.TIDOfTuple[v]; size: CARDINAL _ CheckTreeInit[q, "Inserting", k]; DBStats.Inc[DBIndexInsert]; IF q.depth = 0 THEN { newP _ DBIndexPage.CreateEmptyPage[q, 1, -- the level -- q.segment]; DBIndexMod.InsertTheFirstLeafEntry[newP, keyText, tid]; PutRootPageToIndexHandle[q, newP]; IncrementDepth[q]; DBIndexScan.PutScanIndex[q, newP]; WriteIndexTuple[tuple: x, root: newP.db, depth: 1]; DBIndexPage.UnlockPage[newP] } ELSE { root _ GetRootPage[q]; [state, overFlow, newKey] _ InsertIntoTree[root, keyText, tid]; SELECT state FROM deleteFromNextPage => ERROR KeyIsNotFound; normal => NULL; split => SplitRoot[tuple: x, root: q, overflow: overFlow, splitKey: newKey]; ENDCASE => ERROR; DBIndexPage.UnlockPage[root] }; CheckTreeFinal[q, size + 1] }; DeleteFromIndex: PUBLIC PROC [x: DBStorage.IndexHandle, k: Rope.ROPE, v: DBStorage.TupleHandle] = TRUSTED { tid: DBCommon.TID = DBStorageTuple.TIDOfTuple[v]; q: RealIndexHandle = DBIndexScan.GetOldRealIndexHandle[x]; keyText: REF TEXT = LOOPHOLE[RefText.TrustTextRopeAsText[Rope.Flatten[k]]]; newRoot, overflow, root: Page; state: State; newKey: REF TEXT; size: CARDINAL _ CheckTreeInit[q, "Deleting", k]; DBStats.Inc[DBIndexDelete]; IF q.depth = 0 THEN ERROR DBCommon.InternalError -- Deleting entry from empty index! ELSE {root _ GetRootPage[q]; [state, overflow, newKey] _ DeleteFromTree[root, keyText, tid]; SELECT state FROM deleteFromNextPage => ERROR KeyIsNotFound; normal => DBIndexPage.UnlockPage[root]; merge => TRUSTED { IF root.depth = 1 AND root.pointer.size = 0 THEN <> {IF ExtraCheckFlag THEN SIGNAL TakeALookAtThis; DBIndexPage.DestroyPage[q.segment, root, root.db]; DBIndexScan.DeleteHandle[q]; DBIndexScan.ScanForNullTree[q]; WriteIndexTuple[tuple: x, root: q.rootDB, depth: 0] } ELSE <> IF root.depth > 1 AND root.pointer.size = 1 THEN {IF ExtraCheckFlag THEN SIGNAL TakeALookAtThis; newRoot _ DeleteRoot[root, q.rootDB]; PutRootPageToIndexHandle[q, newRoot]; DecrementDepth[q]; WriteIndexTuple[tuple: x, root: q.rootDB, depth: q.depth]; DBIndexPage.UnlockPage[newRoot] } ELSE DBIndexPage.UnlockPage[root]; }; split => {SplitRoot[tuple: x, root: q, overflow: overflow, splitKey: newKey]; DBIndexPage.UnlockPage[root] } ENDCASE => ERROR }; CheckTreeFinal[q, size - 1] }; -- end of DeleteFromIndex OpenScanIndex: PUBLIC PROC [x: DBStorage.IndexHandle, y: DBStorage.Selection, startPosition: DBStorage.FirstLast] RETURNS [scan: DBStorage.IndexScanHandle] = TRUSTED { index: CARDINAL; page: DBCommon.DBPage; q: RealIndexHandle = DBIndexScan.GetOldRealIndexHandle[x]; size: CARDINAL = CheckTreeInit[q, "Scanning index from", y.lowerBound]; IF NOT y.lowerBoundInfinity AND NOT y.includeLowerBound THEN y.lowerBound _ Rope.Concat[y.lowerBound, "\000"]; IF NOT y.upperBoundInfinity AND y.includeUpperBound THEN y.upperBound _ Rope.Concat[y.upperBound, "\000"]; IF q.depth = 0 THEN {page _ DBCommon.NullDBPage; index _ 0} ELSE IF startPosition = First THEN IF y.lowerBoundInfinity THEN {page _ SearchTheFirstPage[q]; index _ 0} ELSE [page, index] _ Search[q, LOOPHOLE[RefText.TrustTextRopeAsText[Rope.Flatten[y.lowerBound]]]] ELSE IF y.upperBoundInfinity THEN [page, index] _ SearchTheLastPage[q] ELSE [page, index] _ Search[q, LOOPHOLE[RefText.TrustTextRopeAsText[Rope.Flatten[y.upperBound]]]]; scan _ DBIndexScan.CreateScanHandle[q, y, startPosition, page, index]; CheckTreeFinal[q, size] }; NextScanIndex: PUBLIC PROC [x: IndexScanHandle]RETURNS [DBStorage.TupleHandle] = TRUSTED { <> page: Page; tuple: DBStorage.TupleHandle; size: CARDINAL; IF x=NIL THEN RETURN[NIL]; -- scans set to NIL when released. size _ CheckTreeInit[x.tree, "NextScanIndex"]; IF x.this = DBCommon.NullDBPage THEN RETURN [NIL]; page _ DBIndexPage.GetPage[x.tree, x.this, 1]; IF x.index >= page.pointer.size THEN { IF page.pointer.right # DBCommon.NullDBPage THEN ERROR; tuple _ NIL } ELSE { key: IndexKey _ Key[page, x.index]; IF NOT x.upperBoundInfinity AND ( x.includeUpperBound AND Less[x.upperBound, key] OR NOT x.includeUpperBound AND LessEq[x.upperBound, key] ) THEN tuple _ NIL ELSE { tuple _ DBStorageTuple.ConsTupleObject[tid: Tid[page, x.index]]; page _ IncrementIndex[x, page] } }; DBIndexPage.UnlockPage[page]; CheckTreeFinal[x.tree, size]; RETURN [tuple] }; NextScanKey: PUBLIC PROC [x: IndexScanHandle]RETURNS [key: DBIndex.IndexKey] = TRUSTED { <> page: Page; IF x=NIL THEN RETURN[NIL]; -- scans set to NIL when released. IF x.this = DBCommon.NullDBPage THEN RETURN [NIL]; page _ DBIndexPage.GetPage[x.tree, x.this, 1]; IF x.index >= page.pointer.size THEN { IF page.pointer.right # DBCommon.NullDBPage THEN ERROR; key _ NIL } ELSE { key _ Key[page, x.index]; IF NOT x.upperBoundInfinity AND (x.includeUpperBound AND Less[x.upperBound, key] OR NOT x.includeUpperBound AND LessEq[x.upperBound, key]) THEN { key _ NIL; x.direction _ finished } ELSE x.direction _ next }; RETURN [key] }; PrevScanIndex: PUBLIC PROC[x: IndexScanHandle]RETURNS [tuple: DBStorage.TupleHandle] = { <> page: Page; size: CARDINAL; IF x=NIL THEN RETURN[NIL]; -- scans set to NIL when released. size _ CheckTreeInit[x.tree, "PrevScanIndex"]; IF x.this = DBCommon.NullDBPage THEN RETURN [NIL]; page _ DBIndexPage.GetPage[x.tree, x.this, 1]; IF x.index = 0 AND NoPreviousPage[page] THEN tuple _ NIL ELSE { key: IndexKey _ Key[page, x.index]; page _ DecrementIndex[x, page]; IF NOT x.lowerBoundInfinity AND ( x.includeLowerBound AND Greater[x.lowerBound, key] OR NOT x.includeLowerBound AND GreaterEq[x.lowerBound, key] ) THEN tuple _ NIL ELSE tuple _ DBStorageTuple.ConsTupleObject[Tid[page, x.index]] }; DBIndexPage.UnlockPage[page]; CheckTreeFinal[x.tree, size]; RETURN [tuple] }; -- PredScanIndex PrevScanKey: PUBLIC PROC[x: IndexScanHandle] RETURNS [key: DBIndex.IndexKey] = { page: Page; IF x=NIL THEN RETURN[NIL]; -- scans set to NIL when released. IF x.this = DBCommon.NullDBPage THEN RETURN [NIL]; page _ DBIndexPage.GetPage[x.tree, x.this, 1]; IF x.index = 0 AND NoPreviousPage[page] THEN { key _ NIL; x.direction _ finished } ELSE { key _ Key[page, x.index]; IF NOT x.lowerBoundInfinity AND (x.includeLowerBound AND Greater[x.lowerBound, key] OR NOT x.includeLowerBound AND GreaterEq[x.lowerBound, key]) THEN {key _ NIL; x.direction _ finished} ELSE x.direction _ prev }; RETURN [key] }; -- PrevScanKey ThisScanTuple: PUBLIC PROC[x: IndexScanHandle]RETURNS [tuple: DBStorage.TupleHandle] = { IF x.direction = finished THEN RETURN[NIL]; BEGIN page: Page _ IF x.direction = next THEN DBIndexPage.GetPage[x.tree, x.this, 1] ELSE DecrementIndex[x, DBIndexPage.GetPage[x.tree, x.this, 1]]; tuple _ DBStorageTuple.ConsTupleObject[tid: Tid[page, x.index]]; IF x.direction = next THEN page _ IncrementIndex[x, page]; DBIndexPage.UnlockPage[page] END }; IgnoreEntry: PUBLIC PROC[x: IndexScanHandle] = { page: Page = IF x.direction = next THEN DBIndexPage.GetPage[x.tree, x.this, 1] ELSE DecrementIndex[x, DBIndexPage.GetPage[x.tree, x.this, 1]]; DBIndexPage.UnlockPage[page] }; TupleForKey: PUBLIC PROC[x: DBStorage.IndexHandle, key: Rope.ROPE] RETURNS[tuple: DBStorage.TupleHandle] = TRUSTED { q: RealIndexHandle = DBIndexScan.GetOldRealIndexHandle[x]; IF q.rootDB = DBCommon.NullDBPage THEN tuple _ NIL ELSE TRUSTED BEGIN root: Page = GetRootPage[q]; tuplePage: DBCommon.DBPage; thisPage: Page; tupleIndex: CARDINAL; [tuplePage, tupleIndex] _ SearchTree[root, LOOPHOLE[Rope.Flatten[key]]]; thisPage _ DBIndexPage.GetPage[q, tuplePage, q.depth]; IF tupleIndex >= thisPage.pointer.size OR Less[key, Key[thisPage, tupleIndex]] THEN tuple _ NIL ELSE tuple _ DBStorageTuple.ConsTupleObject[Tid[thisPage, tupleIndex]]; DBIndexPage.UnlockPage[thisPage]; DBIndexPage.UnlockPage[root]; END; DBIndexScan.DeleteHandle[q] }; CallAfterFinishTransaction: PUBLIC PROC [s: DBCommon.Segment] = { DBIndexPage.DestroyPageList[s]; DBIndexScan.FinalizeIndexHandle[s]; DBIndexScan.FreeScan[s] }; <> <> InsertIntoTree: PROC[page: Page, key: REF TEXT, value: DBCommon.TID] RETURNS [State, Page, REF TEXT] = { < into the tree emanating from page a[0]<=key. Caller must deallocate IndexKey returned.>> i: CARDINAL; newKey, newSplitKey: REF TEXT; newOverFlow, overFlow: Page; q: Page; state: State; IF page.depth = 1 -- leaf -- THEN { DBStats.Inc[BTreeSearchPage]; IF DBIndexOp.OverFlow[page, key] THEN { [overFlow, newKey] _ DBIndexOp.SplitLeaf[page, key, value]; RETURN [split, overFlow, newKey] } ELSE { SlideLeaf[page, key, value]; RETURN [normal, NIL, NIL] } } ELSE { DBStats.Inc[BTreeSearchPage]; i _ FindTheLastInternalKey[page, key]; <> q _ DBIndexPage.GetPage[page.tree, Value[page, i], page.depth - 1]; [state, overFlow, newKey] _ InsertIntoTree[q, key, value]; SELECT state FROM normal => { DBIndexPage.UnlockPage[q]; RETURN [normal, NIL, NIL] }; split => { [state, newOverFlow, newSplitKey] _ DBIndexOp.InsertInInternalPage[page, i, overFlow.db, newKey]; DBIndexPage.UnlockPage[q]; DBIndexPage.UnlockPage[overFlow]; RETURN [state, newOverFlow, newSplitKey] } ENDCASE => ERROR} }; SplitRoot: PROC [tuple: DBStorage.TupleHandle, root: RealIndexHandle, overflow: Page, splitKey: REF TEXT] = { <> newP: Page; newP _ DBIndexPage.CreateEmptyPage[root, root.depth + 1, root.segment]; DBIndexMod.InsertTwoEntriesToEmptyPage[newP, root.rootDB, splitKey, overflow.db]; PutRootPageToIndexHandle[root, newP]; IncrementDepth[root]; DBIndexPage.UnlockPage[overflow]; WriteIndexTuple[tuple: tuple, root: newP.db, depth: root.depth]; DBIndexPage.UnlockPage[newP] }; <> DeleteFromTree: PROC[page: Page, key: REF TEXT, tid: DBCommon.TID] RETURNS [State, Page, REF TEXT] = { <> i: CARDINAL; newKey, retKey: REF TEXT; overflow, retOverflow, q: Page; state: State; DBIndexPage.CheckTag[page.pointer]; IF page.depth = 1 THEN { DBStats.Inc[BTreeSearchPage]; [state, ,] _ DBIndexOp.DeleteFromLeafPage[page, key, tid]; DBIndexPage.CheckTag[page.pointer]; RETURN [state, NIL, NIL] }; DBStats.Inc[BTreeSearchPage]; i _ FindTheFirstInternalKey[page, key]; <> DO q _ DBIndexPage.GetPage[page.tree, Value[page, i], page.depth - 1]; [state, overflow, newKey] _ DeleteFromTree[q, key, tid]; SELECT state FROM deleteFromNextPage => { DBIndexPage.UnlockPage[q]; i _ i + 1; TRUSTED {IF i >= page.pointer.size THEN RETURN [state, overflow, newKey]} }; normal => { DBIndexPage.UnlockPage[q]; RETURN [normal, NIL, NIL]}; merge => { [state, overflow, retKey] _ MergeInThisPage[page, q, i]; RETURN [state, overflow, retKey] }; split => { [state, retOverflow, retKey] _ DBIndexOp.InsertInInternalPage[page, i, overflow.db, newKey]; DBIndexPage.UnlockPage[q]; DBIndexPage.UnlockPage[overflow]; RETURN [state, retOverflow, retKey] } ENDCASE => ERROR ENDLOOP }; -- DeleteFromTree DestroyTree: PROC [q: RealIndexHandle] = { <> DestroyBelow: PROC [tree: RealIndexHandle, root: Page] = { <> db: DBCommon.DBPage; i: CARDINAL; q: Page; IF root.depth = 1 THEN RETURN -- leaves already freed by parent ELSE TRUSTED { FOR i IN [0..root.pointer.size) DO q _ DBIndexPage.GetPage[tree, db _ Value[root, i], root.depth - 1 ! DBIndexPage.BadPage => IF tryRecovery THEN LOOP]; DestroyBelow[tree, q]; DBIndexPage.DestroyPage[tree.segment, q, db] ENDLOOP }; }; -- end DestroyBelow root: Page; IF q.root=NIL AND q.rootDB=0 THEN RETURN; root _ GetRootPage[q]; DestroyBelow[q, root]; DBIndexPage.DestroyPage[q.segment, root, q.rootDB]; q.rootDB _ DBCommon.NullDBPage; q.root _ NIL; q.depth _ 0; q.free _ TRUE }; PutRootPageToIndexHandle: PROC [q: RealIndexHandle, p: Page] = { <> q.rootDB _ p.db; q.root _ p.cache }; IncrementDepth: PROC [q: RealIndexHandle] = {q.depth _ q.depth + 1}; GetRootPage: PROC [q: RealIndexHandle] RETURNS [Page] = { RETURN[DBIndexPage.GetPage[tree: q, db: q.rootDB, level: q.depth]] }; DeleteRoot: PROC [p: Page, r: DBCommon.DBPage] RETURNS [Page] = { <

> son: Page _ DBIndexPage.GetPage[p.tree, Value[p, 0], p.depth - 1]; <> DBIndexPage.DestroyPage[p.tree.segment, p, r]; RETURN [son] }; DecrementDepth: PROC [q: RealIndexHandle] = {q.depth _ q.depth - 1}; IncrementIndex: PROC [x: IndexScanHandle, p: Page] RETURNS [Page] = TRUSTED { <> <> db: DBCommon.DBPage; q: Page; IF x.index >= p.pointer.size - 1 THEN { [q, db] _ GetRightPage[p]; IF q = NIL THEN {x.index _ x.index + 1; RETURN [p]}; DBIndexPage.UnlockPage[p]; x.this _ db; x.index _ 0; RETURN [q] } ELSE { x.index _ x.index + 1; RETURN [p] } }; DecrementIndex: PROC [x: IndexScanHandle, p: Page] RETURNS [Page] = TRUSTED { <> q: Page; db: DBCommon.DBPage; IF x.index # 0 THEN {x.index _ x.index - 1; RETURN [p]}; [q, db] _ GetLeftPage[p]; IF q = NIL THEN RETURN [p]; DBIndexPage.UnlockPage[p]; x.this _ db; x.index _ q.pointer.size - 1; RETURN [q] }; GetRightPage: PROC [p: Page] RETURNS [Page, DBCommon.DBPage] = TRUSTED { <

> rightDB: DBCommon.DBPage; IF (rightDB _ p.pointer.right) = DBCommon.NullDBPage THEN RETURN [NIL, DBCommon.NullDBPage]; RETURN [DBIndexPage.GetPage[p.tree, rightDB, 1], rightDB] }; GetLeftPage: PROC [p: Page] RETURNS [Page, DBCommon.DBPage] = TRUSTED { <

> leftDB: DBCommon.DBPage; IF (leftDB _ p.pointer.left) = DBCommon.NullDBPage THEN RETURN [NIL, DBCommon.NullDBPage]; RETURN [DBIndexPage.GetPage[p.tree, leftDB, 1], leftDB] }; <> SlideLeaf: PROC [p: Page, key: REF TEXT, value: LONG CARDINAL] = { <> index: CARDINAL _ FindTheLastLeafKey[p, key]; DBIndexMod.SlideLeafAt[p, index, key, value] }; MergeInThisPage: PROC [p: Page, son: Page, index: CARDINAL] RETURNS [State, Page, REF TEXT] = { <> state: State; page: Page; key: REF TEXT; IF son.depth = 1 THEN [state, page, key] _ DBIndexOp.MergeInLeafPage[p, son, index] ELSE [state, page, key] _ MergeInInternalPage[p, son, index]; IF state = delete THEN {state _ DBIndexMod.RemoveFromInternal[p, index]; DestroyThisPage[son]} ELSE DBIndexPage.UnlockPage[son]; RETURN [state, page, key] }; MergeInInternalPage: PROC [parent: Page, son: Page, index: CARDINAL] RETURNS [State, Page, REF TEXT] = TRUSTED { <<"son" is an internal page. "son" is the index-th entry in "parent" "son" is sparse, so balance it. Returns normal if son is normal, delete if son needs balancing, split if son needs splitting, Caller must deallocate IndexKey returned.>> freeSon: CARDINAL _ FreeSpace[son]; i: CARDINAL; newKey, tempKey: REF TEXT; sizeSon: CARDINAL _ son.pointer.size; state: State; overflow: Page; <> IF index # parent.pointer.size - 1 THEN {-- balance the son with the right page right: Page _ DBIndexPage.GetPage[parent.tree, Value[parent, index + 1], son.depth]; IF MergableToRightInternal [parent: parent, leftFree: freeSon, rightFree: FreeSpace[right], leftAt: index] THEN { <> DBIndexMod.MoveEntriesToRightInternal[from: son, to: right, key: DBIndexOp.RopeForKey[Key[parent, index + 1]], nentries: sizeSon]; DBIndexPage.UnlockPage[right]; RETURN [delete, NIL, NIL] }; <> i _ JustOver[right, DBIndex.HalfPage - FrontSize[son, sizeSon - 1] - sizeSon]; <> <> IF i = 0 OR i >= right.pointer.size - 1 THEN {DBIndexPage.UnlockPage[right]; RETURN[normal, NIL, NIL]}; -- ... so do nothing tempKey _ DBIndexOp.RopeForKey[Key[right, i]]; DBIndexMod.MoveEntriesToLeftInternal[from: right, to: son, key: DBIndexOp.RopeForKey[Key[parent, index + 1]], nentries: i]; DBIndexPage.UnlockPage[right]; [state, overflow, newKey] _ DBIndexOp.ChangeKey[parent, tempKey, index + 1] } ELSE {-- no right brother: balance with the left left: Page _ DBIndexPage.GetPage[parent.tree, Value[parent, index - 1], son.depth]; IF MergableToLeftInternal [parent: parent, leftFree: FreeSpace[left], rightFree: freeSon, rightAt: index] THEN { <> DBIndexMod.MoveEntriesToLeftInternal[from: son, to: left, key: DBIndexOp.RopeForKey[Key[parent, index]], nentries: sizeSon]; DBIndexPage.UnlockPage[left]; RETURN [delete, NIL, NIL] }; <> i _ JustOver[left, DBIndex.HalfPage]; <> <> IF left.pointer.size = i + 1 OR i = 0 THEN {DBIndexPage.UnlockPage[left]; RETURN[normal, NIL, NIL]}; tempKey _ DBIndexOp.RopeForKey[Key[left, i + 1]]; DBIndexMod.MoveEntriesToRightInternal [from: left, to: son, key: DBIndexOp.RopeForKey[Key[parent, index]], nentries: left.pointer.size - i - 1]; DBIndexPage.UnlockPage[left]; [state, overflow, newKey] _ DBIndexOp.ChangeKey[parent, tempKey, index] }; RETURN [state, overflow, newKey] }; MergableToRightInternal: PROC [parent: Page, leftFree, rightFree: CARDINAL, leftAt: CARDINAL] RETURNS [BOOLEAN] = TRUSTED { <<"left" is the leftAt-th son of "parent" and "right" is just to the right of "left". >> <> key: IndexKey _ Key[parent, leftAt+1]; length: CARDINAL _ (key.length + 3) / 2; <> <<(FullPage - leftFree) + (FullPage - rightFree) - Overhead + length < FullPage>> RETURN [rightFree + leftFree + OverHead > FullPage + length] }; MergableToLeftInternal: PROC [parent: Page, leftFree, rightFree: CARDINAL, rightAt: CARDINAL] RETURNS [BOOLEAN] = TRUSTED { <<"right" is the rightAt-th son of "parent" and "left" is just to the left of "right". Returns TRUE iff left and right can be merged.>> key: IndexKey _ Key[parent, rightAt]; length: CARDINAL _ (key.length + 3) / 2; RETURN [rightFree + leftFree + OverHead > FullPage + length] }; NoPreviousPage: PROC [page: Page] RETURNS [BOOLEAN] = TRUSTED { RETURN [page.pointer.left = DBCommon.NullDBPage] }; DestroyThisPage: PROC [page: Page] = TRUSTED <> BEGIN IF page.pointer.size#0 THEN SIGNAL TakeALookAtThis; IF page.pointer.left#DBCommon.NullDBPage OR page.pointer.right#DBCommon.NullDBPage THEN SIGNAL TakeALookAtThis; -- trying to destroy leaf without calling DBIndexOp.RemoveLinks IF deletionTurnedOn THEN DBIndexPage.DestroyPage[page.tree.segment, page, page.db] ELSE DBIndexPage.UnlockPage[page]; END; <> Index: PROC [x: IndexScanHandle] RETURNS [CARDINAL] = {RETURN [x.index]}; <> Search: PROC [r: RealIndexHandle, y: REF TEXT] RETURNS [DBCommon.DBPage, CARDINAL] = { <> p: Page _ GetRootPage[r]; db: DBCommon.DBPage; i: CARDINAL; [db, i] _ SearchTree[p, y]; DBIndexPage.UnlockPage[p]; RETURN [db, i] }; SearchTree: PROC [page: Page, key: REF TEXT] RETURNS [db: DBCommon.DBPage, i: CARDINAL] = { <> son: Page; temp: DBCommon.DBPage; GotoRightKey: PROC = TRUSTED { < if there is a right page. Do nothing is there is no right page>> core: LONG POINTER TO Core _ page.pointer; IF core.right = DBCommon.NullDBPage THEN RETURN; db _ core.right; i _ 0 }; IF page.depth = 1 THEN -- leaf -- {db _ page.db; i _ FindTheFirstLeafKey[page, key]; TRUSTED {IF i = page.pointer.size THEN GotoRightKey[] }; RETURN [db, i]}; i _ FindTheFirstInternalKey[page, key]; son _ DBIndexPage.GetPage[page.tree, temp _ Value[page, i], page.depth - 1]; [db, i] _ SearchTree[son, key]; DBIndexPage.UnlockPage[son]; RETURN [db, i] }; SearchTheFirstPage: PROC [r: RealIndexHandle] RETURNS [DBCommon.DBPage] = { p: Page; db: DBCommon.DBPage; IF r.depth = 1 THEN RETURN [r.rootDB]; p _ GetRootPage[r]; db _ SearchTheFirstTree[p]; DBIndexPage.UnlockPage[p]; RETURN [db] }; SearchTheFirstTree: PROC [p: Page] RETURNS [DBCommon.DBPage] = { db: DBCommon.DBPage; IF p.depth = 2 THEN RETURN [Value[p, 0]] ELSE { son: Page _ DBIndexPage.GetPage[p.tree, Value[p, 0], p.depth - 1]; db _ SearchTheFirstTree[son]; DBIndexPage.UnlockPage[son]; RETURN [db] } }; SearchTheLastPage: PROC [r: RealIndexHandle] RETURNS [DBCommon.DBPage, CARDINAL] = { p: Page _ GetRootPage[r]; db: DBCommon.DBPage; i, s: CARDINAL; IF r.depth = 1 THEN TRUSTED {s _ p.pointer.size; DBIndexPage.UnlockPage[p]; RETURN [r.rootDB, s]}; [db, i] _ SearchTheLastTree[p]; DBIndexPage.UnlockPage[p]; RETURN [db, i] }; SearchTheLastTree: PROC [p: Page] RETURNS [DBCommon.DBPage, CARDINAL] = TRUSTED { l: CARDINAL _ p.depth; s: CARDINAL _ p.pointer.size; index, sonSize: CARDINAL; sonDB: DBCommon.DBPage _ Value[p, s - 1]; son: Page; db: DBCommon.DBPage; son _ DBIndexPage.GetPage[p.tree, sonDB, l - 1]; IF l = 2 THEN { sonSize _ son.pointer.size; DBIndexPage.UnlockPage[son]; RETURN [sonDB, sonSize] } ELSE { [db, index] _ SearchTheLastTree[son]; DBIndexPage.UnlockPage[son]; RETURN [db, index] } }; <> WriteIndexTuple: PROC [tuple: DBStorage.TupleHandle, root: DBCommon.DBPage, depth: CARDINAL] = { <> tid: DBCommon.TID = DBStorageTuple.TIDOfTuple[tuple]; page: DBCommon.DBPage = DBCommon.DecomposeTID[tid].page; segment: DBCommon.SegmentID = DBSegment.SegmentIDFromDBPage[page]; ttr: DBIndexPage.TupleTree = NEW[DBIndexPage.TupleTreeRecord_ [root, depth, tid, segment]]; DBIndexPage.WriteIndexObject[tuple, ttr] }; <> CheckTreeInit: PROC [q: RealIndexHandle, op: ROPE, item: ROPE _ NIL] RETURNS [size: CARDINAL] = { prevDB _ DBCommon.NullDBPage; IF NOT CheckFlag THEN RETURN; opCount_ opCount+1; IF opCount=stopAtOpCount THEN SIGNAL TakeALookAtThis; IF watchForThisKey#NIL AND watchForThisKey.Equal[item.Substr[0, watchForThisKey.Length[]]] THEN WriteHeaderStat[Rope.Cat["******Monitored key: ", op], item]; IF PrintFlag THEN WriteHeaderStat[op, item]; IF q.depth = 0 THEN RETURN [0] ELSE { root: Page _ GetRootPage[q]; temp: IndexKey; savePrintingFlag: BOOL_ PrintFlag; PrintFlag_ FALSE; -- only do printing during CheckTreeFinal, not CheckTreeInit [size, temp] _ CheckPage[root, NIL]; PrintFlag_ savePrintingFlag; DBIndexPage.UnlockPage[root] } }; CheckTreeFinal: PROC [q: RealIndexHandle, size: CARDINAL] = { <> sonSize: CARDINAL; prevDB _ DBCommon.NullDBPage; IF NOT CheckFlag THEN RETURN; IF q.depth = 0 THEN IF size # 0 THEN SIGNAL BadBTree ELSE RETURN; {root: Page _ GetRootPage[q]; temp: IndexKey; [sonSize, temp] _ CheckPage[root, NIL]; IF sonSize # size THEN SIGNAL BadBTree; DBIndexPage.UnlockPage[root]}; IF nextDB # DBCommon.NullDBPage THEN SIGNAL BadBTree }; CheckPage: PROC [page: Page, start: IndexKey] RETURNS [size: CARDINAL, end: IndexKey] = TRUSTED { <> er: IndexKey; son: Page; i, sonSize: CARDINAL; size _ 0; CheckCacheLock[page.db, page.cache]; CheckPageSize[page]; IF page.depth=0 THEN SIGNAL BadBTree; IF page.depth = 1 THEN {[size, end] _ CheckLeafPage[page, start]; RETURN}; IF PrintFlag THEN WritePageStat[page]; IF page.pointer.size=0 -- empty pages never permitted OR page.pointer.size=1 -- only children of internal pages not permitted OR page.pointer.size>1000 -- probably a subtraction from zero THEN SIGNAL BadBTree; CheckInternalEntries[page]; son _ DBIndexPage.GetPage[page.tree, Value[page, 0], page.depth - 1]; [sonSize, end] _ CheckPage[son, start]; size _ size + sonSize; DBIndexPage.UnlockPage[son]; FOR i IN [1..page.pointer.size) DO er _ Key[page, i]; IF end # NIL AND NOT LessEq[RefText.TrustTextAsRope[DBIndexOp.RopeForKey[er]], end] THEN SIGNAL BadBTree; son _ DBIndexPage.GetPage[page.tree, Value[page, i], page.depth - 1]; [sonSize, end] _ CheckPage[son, er]; size _ size + sonSize; DBIndexPage.UnlockPage[son] ENDLOOP }; CheckLeafPage: PROC [page: Page, start: IndexKey] RETURNS [size: CARDINAL, end: IndexKey] = TRUSTED { i: CARDINAL; prevdesc: IndexKey _ start; left, right: DBCommon.DBPage; left _ page.pointer.left; right _ page.pointer.right; IF left # prevDB THEN SIGNAL BadBTree; prevDB _ page.db; nextDB _ right; size _ 0; IF PrintFlag THEN WritePageStat[page]; IF page.pointer.size=0 OR page.pointer.size>1000 THEN SIGNAL BadBTree; CheckLeafEntries[page]; FOR i IN [0..page.pointer.size) DO end _ Key[page, i]; size _ size + 1; IF prevdesc # NIL AND NOT LessEq[RefText.TrustTextAsRope[DBIndexOp.RopeForKey[end]], prevdesc] THEN SIGNAL BadBTree; prevdesc _ end ENDLOOP }; WriteHeaderStat: PROC [op: ROPE, item: ROPE] = { debugStream: IO.STREAM_ DBCommon.GetDebugStream[]; debugStream.PutF["**Op #%g: %g %g\n", IO.int[opCount], IO.rope[op], IO.rope[item]] }; WritePageStat: PROC [page: Page] = TRUSTED { <> depth: CARDINAL _ page.depth; debugStream: IO.STREAM_ DBCommon.GetDebugStream[]; IF depth>10 THEN SIGNAL TakeALookAtThis; THROUGH [0..5-depth) DO debugStream.PutF[" "]; ENDLOOP; debugStream.PutF["%8bB (%2d):", IO.card[page.db], IO.card[page.pointer.size]]; FOR i: CARDINAL IN [0..page.pointer.size) DO WriteEntry[page, i] ENDLOOP; debugStream.PutChar['\n]; }; WriteEntry: PROC[page: Page, entryNum: CARDINAL] = TRUSTED { entry: ItemHandle = CoreAddr[page, entryNum]; debugStream: IO.STREAM = DBCommon.GetDebugStream[]; IF entryNum#0 OR page.depth=1 THEN BEGIN -- non-leaf pages have no key for the zero-th entry, so do only leaves & non-0th entries debugStream.PutChar[' ]; IF entry.length>DBIndex.CoreIndexSize THEN debugStream.PutF[" ! entry too long(%g) ! ", IO.card[entry.length]] ELSE FOR j: CARDINAL IN [0..entry.length) DO WriteChar[debugStream, entry.text[j]] ENDLOOP; END; IF PrintInternalValues AND page.depth#1 OR PrintLeafValues AND page.depth=1 THEN debugStream.PutF[" (%bB)", IO.card[entry.value]]; }; WriteKey: PROC[entry: IndexKey] = TRUSTED { <> debugStream: IO.STREAM_ DBCommon.GetDebugStream[]; debugStream.PutF["(length %g): ", IO.int[entry.length]]; FOR j: CARDINAL IN [0..entry.length) DO WriteChar[debugStream, entry.text[j]] ENDLOOP; debugStream.PutChar['\n]; }; WriteItem: PROC[entry: ItemHandle] = TRUSTED { <> debugStream: IO.STREAM_ DBCommon.GetDebugStream[]; debugStream.PutF["(length %g): ", IO.int[entry.length]]; FOR j: CARDINAL IN [0..entry.length) DO WriteChar[debugStream, entry.text[j]] ENDLOOP; debugStream.PutChar['\n]; }; WriteChar: PROC[stream: IO.STREAM, char: CHAR] = { i: CARDINAL = LOOPHOLE[char]; IF char>176C OR char<040C THEN stream.PutF["\\\\%g", IO.card[i]] ELSE stream.PutChar[char] }; CheckPageSize: PROC [page: Page] = TRUSTED { IF page.pointer.size = 0 THEN SIGNAL BadBTree }; CheckCacheLock: PROC [db: DBCommon.DBPage, cache: DBCommon.CacheHandle] = { IF DBSegment.LockCount[db, cache] # 1 THEN SIGNAL BadBTree; }; CheckLeafEntries: PROC [page: Page] = TRUSTED { endIndex: LONG POINTER TO UNSPECIFIED _ EndAddr[page]; entry: ItemHandle; i: CARDINAL; FOR i IN [0..page.pointer.size) DO entry _ CoreAddr[page, i]; IF @entry.text + (entry.length + 1) / 2 # endIndex THEN SIGNAL BadBTree; endIndex _ entry ENDLOOP }; CheckInternalEntries: PROC [page: Page] = TRUSTED { endIndex: LONG POINTER TO UNSPECIFIED _ EndAddr[page] - 2; entry: ItemHandle; i: CARDINAL; FOR i IN [1..page.pointer.size) DO entry _ CoreAddr[page, i]; IF @entry.text + (entry.length + 1) / 2 # endIndex THEN SIGNAL BadBTree; endIndex _ entry ENDLOOP }; <

> DBIndexScan.InitScan[]; END. Change Log: Added to check page tag in CreateRootPage by Suzuki November 24, 1980 9:06 AM Added to PutScanIndex in InsertIntoIndex by Suzuki November 24, 1980 10:55 AM Changed DeleteFromIndex so that if there is no element as the result of removing, the tree is destroyed by Suzuki November 24, 1980 10:55 AM Changed OpenScanIndex so that if the tree is empty, just create RealScanObject by Suzuki November 24, 1980 10:55 AM Added NormalizeKey so that the last byte of the last word of the string is 0 if the length is odd by Suzuki November 24, 1980 4:40 PM Added a statement to set core.size in InsertTwoEntriesToEmptyPage by Suzuki November 25, 1980 8:35 AM Added CheckTreeInit & CheckTreeFinal by Suzuki November 28, 1980 4:19 PM Changed the definition of offset in MoveEntriesToRightInternal so that it works even when the entire page is moved by Suzuki December 2, 1980 10:08 AM Renamed Split to SplitInternal by Suzuki: December 3, 1980 10:14 AM Allocate storage from DBHeapStorage, define FreeIndexKey, use CWF for printing. by MBrown: February 27, 1981 5:56 PM Changed DBIndexPage.WriteAndUnlockPage to DBIndexPage.UnlockPage. Added WriteLockedPage in MoveEntriesTORightInternal and MoveEntriesToLeftInternal. by Suzuki: 2-Apr-81 15:41 Converted to Cedar by Cattell: 6-Jul-81 19:12:36 Use IOStream for printing. by Cattell: 11-Dec-81 10:06:11 DeleteFromIndex should generate ERROR KeyIsNotFound when state = deleteFromNextPage. Also, ran through mesa formatter because the TABs screwed up Tioga's formatting. by Cattell: May 6, 1982 6:09 pm DestroyTree didn't work when all the entries had been deleted from the tree, because the root page simply disappears in this case and GetRootPage does a DBSegment.ReadPage[0, NIL] (why doesn't ReadPage complain?). GetRootPage failed on pointer.tag.pageTag # DBStoragePagetags.BTree check. by Cattell: May 25, 1982 6:27 pm Put in checks for empty pages because zero-size pages aren't being cleaned up, and added other debugging checks. Added some comments to procs, will add more. by Cattell: May 27, 1982 11:01 am Added set of printing routines, extended WritePageStat to print individual keys and values. Added global variables for various debugging and printing flags. Removed most of the 8 OPENs (!) in the module header. Made printing procedures more robust. Check for unusually large keys. by Cattell: June 19, 1982 1:30 pm Fixed two bugs in MergeInInternalPage. Before calling MergeEntriesToLeftInternal, must COPY Key[right, i] else the bits move out from under it and a garbage key is later stored in a parent page. Also, a similar problem when calling MergeEntriesToRightInternal. I'm not sure keys are always being DE-allocated when they are allocated, but I traced up all the potential lines of call from MergeInInternalPage, InsertInThisPage, DeleteFromTree, etc., and found no error assuming the procedures in DBIndexInternalImpl are correct. Added more comments, this module still badly needs reasonable documentation. by Cattell: June 20, 1982 6:15 pm Changed by Willie-Sue on June 25, 1982 9:21 am < IO>> Changed by Willie-Sue on July 7, 1982 11:01 am <> Changed by Cattell on July 26, 1982 1:03 pm <> Changed by Cattell on July 28, 1982 12:25 pm <> Changed by Cattell on August 2, 1982 12:34 pm <= right.pointer.size - 1, but its intentions are still obscure.>> Changed by Cattell on August 2, 1982 12:55 pm <<#9.5: Need to do UnlockPage[left] when return early in MergeInInternalPage as described above.>> Changed by Cattell on August 4, 1982 9:14 am <> Changed by Cattell on August 5, 1982 5:55 pm <> Changed by Cattell on August 6, 1982 4:53 pm <> Changed by Cattell on August 18, 1982 1:00 pm <> Changed by Cattell on August 19, 1982 8:13 am <> Changed by Cattell on August 19, 1982 10:26 am <> Changed by Cattell on August 19, 1982 7:03 pm <> Changed by Cattell on August 20, 1982 12:15 pm <> Changed by Cattell on September 21, 1982 8:45 pm <> Changed by Cattell on September 23, 1982 10:28 am <> Changed by Cattell on January 17, 1983 6:20 pm <> Changed by Willie-Sue on February 18, 1985 <>