-- File: DBIndexOpImpl.mesa
-- Contents: Operations on B-Tree leaf pages
-- Last edited by:
--   Suzuki:   2-Apr-81 15:49:32
--   MBrown:  February 27, 1981  9:39 PM
--   Cattell:   September 29, 1983 1:24 pm
--   Willie-Sue:  June 4, 1982 9:40 am

DIRECTORY
  DBCache, 
  DBCommon, 
  DBIndexFilePage, 
  DBIndex, 
  DBIndexOp, 
  DBIndexMod,
  DBIndexPage, 
  DBIndexScan USING [ManipulateScanIndex], 
  DBStorage, 
  DBStorageConcrete, 
  DBStorageInternal, 
  DBStoragePagetags;

DBIndexOpImpl: PROGRAM IMPORTS

  DBIndexFilePage, 
  DBIndexMod,
  DBIndexPage, 
  DBIndexScan
EXPORTS
  DBIndexOp, 
  DBStorage

= BEGIN OPEN DBIndexFilePage;
  
  IndexScanObject: PUBLIC TYPE = DBStorageConcrete.IndexScanObject; 
  IndexScanHandle: TYPE = REF IndexScanObject;
  
  Page: TYPE = DBIndex.Page;
  IndexKey: TYPE = DBIndex.IndexKey;
  ItemHandle: TYPE = DBIndex.ItemHandle;
  State: TYPE = DBIndex.State;
  
  OverHead: CARDINAL = DBIndex.OverHead;
  FullPage: CARDINAL = DBIndex.FullPage;
  HalfPage: CARDINAL = DBIndex.HalfPage; 
  
  -- Signals
  
  KeyIsNotFound: SIGNAL = CODE; 
  OutOfBound: ERROR = CODE; 
  PageOverflow: ERROR = CODE; 
  UnsplittablePage: SIGNAL = CODE;
  ThisLooksSuspicious: SIGNAL = CODE;
  BadBTree: SIGNAL = CODE; 
  
  -- Globals
  
  splitKeyBuffer: IndexKey← NIL;
    -- Used to store the result of MakeSplitKey
  
  -- Procedures
  
  CheckLength: PROC[c: CARDINAL] RETURNS[CARDINAL] =
    -- Use to check that cardinals are reasonable in key lengths and indexes into core.
    {IF c>250 THEN SIGNAL ThisLooksSuspicious; RETURN[c]}; 
  
  
  OverFlow: PUBLIC PROC [p: Page, key: IndexKey] RETURNS [BOOLEAN] = {
    RETURN [FreeSpace[p] < CARDINAL[(key.length + 9) / 2]]
    }; 
  
  EntrySize: PUBLIC PROC [p: Page, i: CARDINAL] RETURNS [CARDINAL] = {
    -- Returns the size of the entry: key plus value
    IF p.depth # 1 AND i = 0 
       THEN RETURN [2] 
       ELSE 
         RETURN [CheckLength[(KeyLength[p, i] + 7) / 2]]
    }; 
  
  KeyLength: PUBLIC PROC [page: Page, index: CARDINAL] RETURNS [CARDINAL] = {
    -- Returns the length of the key of the index-th entry
    p: ItemHandle ← CoreAddr[page, index];
    RETURN [CheckLength[p.length]]
    }; 
  
  MakeSplitKey: PROC [left, right: IndexKey] RETURNS [IndexKey] = {
    -- Assert: left<=right.  Creates a shortest key, s.t. left<=key<=right
    i, length: CARDINAL;
    FOR i IN [0..left.length) DO 
        IF left.text[i] # right.text[i] THEN GO TO Found
        REPEAT
          Found => length ← i + 1;
          FINISHED => length ← left.length;
        ENDLOOP; 
    DBIndexFilePage.FreeKey[splitKeyBuffer];
    splitKeyBuffer← DBIndexFilePage.AllocateIndexKey[length]; 
    FOR i IN [0..length) DO 
        splitKeyBuffer.text[i] ← right.text[i]
        ENDLOOP; 
    RETURN [splitKeyBuffer]
    }; 
  
  KeyInBetween: PROC [page: Page, after: CARDINAL] RETURNS [IndexKey] = {
    -- Creates a split key between page[after] and page[after+1]
    RETURN [MakeSplitKey[Key[page, after], Key[page, after + 1]]]
    }; 
  
  SplitPage: PUBLIC PROC [
    p: Page, loc: CARDINAL, insertSize: INTEGER]
    RETURNS [ splitLoc: CARDINAL, splitKey: IndexKey, overflow: Page] =
    -- Splits page p.  Caller must free splitKey returned.
    BEGIN
    splitLoc ← FindSplitLoc[p, loc, insertSize];
    overflow ← DBIndexPage.CreateEmptyPage[p.tree, p.depth, p.tree.segment];
    splitKey ← DBIndexMod.SplitEntriesToRightPage[from: p, to: overflow, at: splitLoc];
    END;

  FindSplitLoc: PROC [p: Page, loc: CARDINAL, size: INTEGER] RETURNS [CARDINAL] =
    BEGIN
    upper: CARDINAL ← p.pointer.size - 1;
    lower: CARDINAL ← 0;
    middle: CARDINAL;
    compareSize: CARDINAL;
    WHILE upper > lower DO
      middle ← (upper + lower)/2;
      IF middle = lower THEN EXIT;
      IF middle > loc THEN compareSize ← DBIndex.HalfPage - size
      ELSE compareSize ← DBIndex.HalfPage;
      IF FrontSize[p, middle] > compareSize THEN
	     IF FrontSize[p, middle - 1] <= compareSize THEN RETURN[middle + 1]
	     ELSE upper ← middle
      ELSE lower ← middle;
      ENDLOOP;
    IF upper=p.pointer.size-1 THEN RETURN[upper];
    IF lower#0 THEN RETURN[lower];
    ERROR UnsplittablePage;
    END;

  SplitLeaf: PUBLIC PROC[p: Page, key: IndexKey, value: LONG CARDINAL]
       RETURNS [Page, IndexKey] = {
    -- The page overflows if we insert <key, value> pair.  We allocate 
    -- a page to the right of p and moves contents, and insert 
    -- <key, value> at the same time.
    insertIndex: CARDINAL; 
    insertSize: CARDINAL ← CheckLength[(key.length + 9) / 2]; 
    index: CARDINAL ← FindTheLastLeafKey[p, key]; 
    frontSize: CARDINAL ← IF index = 0 THEN 0 ELSE FrontSize[p, index - 1]; 
    -- Returns the size occupied by entries from 0 to index
    overflow: Page; 
    splitIndex: CARDINAL; 
    splitKey: IndexKey; 
    SplitAtTheKey: PROC = {
      splitKey ← MakeSplitKey[Key[p, index - 1], key]; 
      splitKey ← CopyKey[splitKey]; 
      MoveRightAfterSecond[from: p, to: overflow, after: index - 1, key: key, value: value]
      };
    
    overflow ← DBIndexPage.CreateEmptyPage[p.tree, p.depth, p.tree.segment]; 
    DBIndexMod.SetLinks[left: p, new: overflow]; 
    IF frontSize + index + OverHead > HalfPage + 1 OR index >= p.pointer.size THEN 
       {splitIndex ← JustOver[p, HalfPage]; 
        IF splitIndex = index OR splitIndex = index - 1 THEN 
           {SplitAtTheKey[]; 
            RETURN [overflow, splitKey]}; 
        IF splitIndex = p.pointer.size - 1 THEN ERROR PageOverflow; 
        splitKey ← KeyInBetween[p, splitIndex]; 
        splitKey ← CopyKey[splitKey]; 
        insertIndex ← 
          DBIndexMod.MoveRightAndInsert
            [from: p, to: overflow, after: splitIndex, at: index - 1, key: key, value: value]; 
        MoveScanIndexRight[from: p, to: overflow, after: splitIndex]; 
        IncrementScanIndex[overflow, insertIndex]; 
        RETURN [overflow, splitKey]}; 
    IF index # 0 AND frontSize + index + OverHead <= HalfPage + 1 AND 
         frontSize + index + OverHead + insertSize > HalfPage 
       THEN 
         splitIndex ← index - 1 
       ELSE 
         splitIndex ← JustOver[p, HalfPage - insertSize]; 
    IF splitIndex = index - 1 
       THEN 
         splitKey ← MakeSplitKey[key, Key[p, splitIndex + 1]] 
       ELSE 
         splitKey ← KeyInBetween[p, splitIndex]; 
    splitKey ← CopyKey[splitKey]; 
    [] ← DBIndexMod.MoveEntriesToRightLeaf
        [from: p, to: overflow, nentries: p.pointer.size - splitIndex - 1]; 
    DBIndexMod.SlideLeafAt[p: p, index: index, key: key, value: value]; 
    RETURN [overflow, splitKey]
    }; -- SplitLeaf
  
  SplitInternal: PUBLIC PROC
    [p: Page, insertLoc: CARDINAL, key: IndexKey, value: LONG CARDINAL]
    RETURNS [Page, IndexKey] = {
    -- Splits the page p into half and inserts <key, value>.
    -- Returns the <Page, DBPage> of the newlyu created page, which
    -- locates to the right of p, and the key to separates the two pages.
    -- Caller must deallocate IndexKey returned.
    insertSize: INTEGER ← (key.length + 1) / 2 + 3; 
    splitLoc: CARDINAL; 
    splitKey, checkKey: IndexKey; 
    overflow: Page;
    [splitLoc, splitKey, overflow] ← SplitPage[p, insertLoc, insertSize]; 
    IF splitLoc > insertLoc THEN
      [,, checkKey] ← InsertInInternalPage[p, insertLoc, value, key] 
    ELSE 
      [,, checkKey] ← InsertInInternalPage[overflow, insertLoc - splitLoc, value, key]; 
    RETURN [overflow, splitKey]
    }; 
  
  InsertInInternalPage: PUBLIC PROC [
    p: Page, i: CARDINAL, newPage: LONG CARDINAL, newKey: IndexKey]
    RETURNS [State, Page, IndexKey] = {
    -- Works only for an internal node
    -- Insert <newKey, newPage> to the right of index
    -- Caller must always deallocate IndexKey returned.
    overflow: Page; 
    splitKey: IndexKey;
    IF OverFlow[p, newKey] 
       THEN 
         {[overflow, splitKey] ← SplitInternal[p, i, newKey, newPage]; 
          RETURN [split, overflow, splitKey]} 
       ELSE 
         {DBIndexMod.SlideLeafAt[p, i + 1, newKey, newPage]; 
          RETURN [normal, NIL, NIL]}
    }; 
  
  DeleteFromLeafPage: PUBLIC PROC [p: Page, key: IndexKey, tid: DBStorageInternal.TID]
       RETURNS [State, Page, IndexKey] = {
    endIndex, i: CARDINAL; 
    state: State;
    i ← FindTheFirstLeafKey[p, key]; 
    IF i = CheckLength[endIndex ← p.pointer.size] THEN 
       RETURN [deleteFromNextPage, NIL, NIL]; 
    DO IF Less[key, Key[p, i]] THEN ERROR KeyIsNotFound; 
       IF Tid[p, i] = tid THEN 
          {state ← DBIndexMod.RemoveFromLeaf[p, i]; 
           DBIndexPage.CheckTag[p.pointer]; 
           RETURN [state, NIL, NIL]}; 
       i ← i + 1; 
       IF i >= endIndex THEN RETURN [deleteFromNextPage, NIL, NIL]
       ENDLOOP
    }; -- DeleteFromLeafPage
  
  
  MergeInLeafPage: PUBLIC PROC [p: Page, son: Page, index: CARDINAL]
       RETURNS [State, Page, IndexKey] = {
    -- Caller must free IndexKey returned if non-NIL.
    freeSon: CARDINAL ← FreeSpace[son]; 
    i: CARDINAL; 
    newKey, temp1: IndexKey; 
    sizeSon: CARDINAL ← son.pointer.size; 
    state: State; 
    overflow: Page;
    {IF index # p.pointer.size - 1 
        -- balance with the right page
        THEN 
          {right: Page ← DBIndexPage.GetPage[p.tree, Value[p, index + 1], son.depth]; 
           IF FreeSpace[right] + freeSon + OverHead > FullPage THEN 
              {DBIndexMod.MoveEntriesToRightLeaf[from: son, to: right, nentries: sizeSon]; 
               DBIndexMod.RemoveLinks[son]; 
               DBIndexPage.UnlockPage[right]; 
               RETURN [delete, NIL, NIL]}; 
           i ← JustOver[right, HalfPage - FrontSize[son, sizeSon - 1] - sizeSon]; 
           IF i = 0 THEN {DBIndexPage.UnlockPage[right]; RETURN [normal, NIL, NIL]}; 
           newKey ← KeyInBetween[right, i - 1]; 
           DBIndexMod.MoveEntriesToLeftLeaf[from: right, to: son, nentries: i]; 
           DBIndexPage.UnlockPage[right]; 
           index ← index + 1; 
           GO TO Final} 
        ELSE 
          {-- balance with the left page
           left: Page ← DBIndexPage.GetPage[p.tree, Value[p, index - 1], son.depth]; 
           IF FreeSpace[left] + freeSon + OverHead > FullPage THEN 
              {DBIndexMod.MoveEntriesToLeftLeaf[from: son, to: left, nentries: sizeSon]; 
               DBIndexMod.RemoveLinks[son]; 
               DBIndexPage.UnlockPage[left]; 
               RETURN [delete, NIL, NIL]}; 
           i ← JustOver[left, HalfPage]; 
           IF i = left.pointer.size - 1 THEN 
              {DBIndexPage.UnlockPage[left]; RETURN [normal, NIL, NIL]}; 
           newKey ← KeyInBetween[left, i]; 
           DBIndexMod.MoveEntriesToRightLeaf
             [from: left, to: son, nentries: left.pointer.size - i - 1]; 
           DBIndexPage.UnlockPage[left]; 
           GO TO Final}
     EXITS
       Final => 
         {[state, overflow, temp1] ← ChangeKey[p, newKey, index]; 
          RETURN [state, overflow, temp1]}}
    }; 
  
ChangeKey: PUBLIC PROC [p: Page, newKey: IndexKey, index: CARDINAL]
       RETURNS [state: State, overflow: Page, key: IndexKey] = {
    -- Changes index in p to be newKey, splitting p if necessary.
    -- Caller must de-allocate IndexKey returned.
    IF newKey.length>DBIndex.CoreIndexSize THEN ERROR;
    state ← ComputeResultOfKeyChange[p, newKey, index]; 
    SELECT state FROM 
      normal => 
        {DBIndexMod.ReplaceKey[p, newKey, index]; 
         RETURN [normal, NIL, NIL]};
      split => 
        {[overflow, newKey] ← ReplaceKeyAndSplit[p, newKey, index]; 
         RETURN [split, overflow, newKey]};
      merge => 
        {DBIndexMod.ReplaceKey[p, newKey, index]; 
         RETURN [merge, NIL, NIL]}
      ENDCASE => ERROR
    }; 
  
  ComputeResultOfKeyChange: PROC
       [page: Page, key: IndexKey, index: CARDINAL]
       RETURNS [State] = {
    -- The index-th key of page will be replaced by "key".
    -- Returns whether this replacement requires page merge, split, 
    -- or no action
    oldKeyLength: CARDINAL ← KeyLength[page, index]; 
    free: CARDINAL ← FreeSpace[page];
    IF key.length > oldKeyLength 
       THEN 
         IF key.length > oldKeyLength + free 
            THEN RETURN [split] ELSE RETURN [normal] 
       ELSE 
         IF oldKeyLength + free > HalfPage + key.length 
            THEN RETURN [merge] 
            ELSE 
              RETURN [normal]
    }; 
  
  ReplaceKeyAndSplit: PROC
       [p: Page, key: IndexKey, at: CARDINAL] RETURNS [Page, IndexKey] = {
    -- Replaces the key at "at" by "key". This will cause the split of the page "p".
    -- Caller must free the IndexKey returned!
    insertSize: INTEGER ← (key.length + 1) / 2 + 3 - EntrySize[p, at]; 
    splitLoc: CARDINAL; 
    splitKey: IndexKey; 
    overflow: Page;
    IF insertSize>DBIndex.CoreIndexSize THEN ERROR;
    [splitLoc, splitKey, overflow] ←  SplitPage[p, at, insertSize]; 
    IF splitLoc > at THEN
      DBIndexMod.ReplaceKey[p, key, at] 
    ELSE IF splitLoc < at THEN
      DBIndexMod.ReplaceKey[overflow, key, at - splitLoc]
    ELSE -- splitLoc=at, the new split key should just be passed upward
      splitKey← CopyKey[key]; 
    RETURN [overflow, splitKey]
    }; 
  
  MoveScanIndexLeft: PUBLIC PROC [from, to: Page, nentries: CARDINAL] = {
    -- We are moving nentries from "from" to "to"; "from" is to the right of "to".
    -- Adjust any scan indices open on the first nentries of "from", to point to "to".
    MoveLeft: PROC [scan: IndexScanHandle] = {
      IF scan.index < nentries OR to.pointer.size = 0 
         THEN 
           {scan.index ← scan.index + to.pointer.size - nentries; scan.this ← to.db} 
         ELSE 
           scan.index ← scan.index - nentries
      };
    DBIndexScan.ManipulateScanIndex[from, 0, MoveLeft]
    }; 
  
  MoveScanIndexRight: PUBLIC PROC [from, to: Page, after: CARDINAL] = {
    -- Moves any scan index on "from" to "to", if the index is greater than "after".
    Move: PROC [scan: IndexScanHandle] = {
      scan.index ← scan.index - after - 1; scan.this ← to.db
      };
    DBIndexScan.ManipulateScanIndex[from, after + 1, Move]
    }; 
  
  MoveRightAfterSecond: PROC[
    from, to: Page, after: CARDINAL, key: IndexKey, value: LONG CARDINAL] = {
    -- Make the first entry of "to" be <key,value> and move the entries [after..size)
    -- of "from" just after that.  "to" is to the right of "from", and is empty to start with.
    DBIndexMod.InsertTheFirstLeafEntry[to, key, value]; 
    DBIndexMod.MoveEntriesToRightLeafAfter[from, to, from.pointer.size - after - 1]; 
    IncrementScanIndex[to, 0]; -- should be no refs to this new page, but Nori put this here
    }; 
  
  DecrementScanIndex: PUBLIC PROC [page: Page, atOrAfter: CARDINAL] = {
    -- Decrements any effected scan indices by one, if they are on "page" with index 
    -- is greater than "after"
    Decrement: PROC [scan: IndexScanHandle] = {scan.index ← scan.index - 1};
    DBIndexScan.ManipulateScanIndex[page, atOrAfter, Decrement]
    }; 
  
  IncrementScanIndex: PUBLIC PROC [
    page: Page, atOrAfter: CARDINAL, howMuch: CARDINAL← 1] = {
    -- Increments any effected scan indices by howMuch, if they are on "page" with index 
    -- is greater or equal to "atOrAfter"
    Increment: PROC [scan: IndexScanHandle] = {scan.index ← scan.index + howMuch};
    DBIndexScan.ManipulateScanIndex[page, atOrAfter, Increment]
    }; 
  
  END. 


Change Log

Changed the meaning of SlideAt
  by Suzuki: November 24, 1980  1:04 PM

Changed SplitLeaf so that there won't be an overflow even if index=0
  by Suzuki: November 25, 1980  9:08 AM

Changed MoveEntriesToRightLeaf so that it works even if toSize=0
  by Suzuki: November 25, 1980  9:21 AM

Changed offset in MoveEntriesToRightLeaf
  by Suzuki: November 25, 1980  9:52 AM

Changed the definition of offset in MoveEntriesToRightLeaf so that it works even when the entire page is moved
	by Suzuki December 2, 1980  10:08 AM

Changed FindSplitLoc; it now uses FrontSize instead of CoreIndex.
	by Suzuki December 3, 1980  10:07 AM

Changed MoveRightAndInsert; it works now even if key is inserted at the index 0.
	by Suzuki December 4, 1980  10:57 AM

Allocate storage from DBHeapStorage; define SplitKeyBufferWords.
	by MBrown February 27, 1981  6:12 PM
	
All WriteAndFreePage are changed to FreePage.
	by Suzuki  2-Apr-81 14:31:47

Changed SplitLeaf, MoveEntriesToRightLeaf, MoveEntriesToLeftLeaf, Remove, ReplaceKey, SlideAt: WriteLockedPage is added.
	by Suzuki  2-Apr-81 15:52:37

Ran through formatter.
	by Cattell  May 25, 1982 7:17 pm

Added CheckLengths in various places where small numbers should be passed as arguments.  Also adde ThisLooksSuspicious SIGNAL.
	by Cattell May 28, 1982 4:39 pm

Add check in ShiftRight for by=0
	by Willie-Sue June 4, 1982 9:40 am

Removed most of the OPENs at the beginning.  Yuck.  Added some checks on key lengths.
	by Cattell June 19, 1982 3:23 pm

Fixed bug #7!  Both MoveEntriesToRightLeaf and MoveEntriesToLeftLeaf can be called with nentries=0 and an EMPTY from page.  But these die in SizeOfEntries passed fromSize-1 (= 65536 as CARDINAL).  I will still allow call with nentries=0, but only if fromSize=0, and will not call SizeOfEntries till after checking for nentries=0.  Added more consistency checks.  Particularly, CheckPage and CheckLeafPage check for empty pages, which should never be left after an operation has been completed.
	by Cattell July 28, 1982 11:48 am

Oops!  Boo-boo in above fix to MoveEntriesToLeftLeaf, it should pass SizeOfEntries nentries-1, not fromSize-1.
	by Cattell July 29, 1982 12:49 pm

Changed some data structures from POINTERs to REFs.
	by Cattell August 6, 1982 5:01 pm

Made RemoveLinks public.
	by Cattell August 19, 1982 2:18 pm

Major re-organization: moved most procedures to new module DBIndexMod(Impl).  This module never touches B-tree pages directly, now.  The move involved inventing new procedures and re-organizing existing ones for the cleaner interface.
	by Cattell September 22, 1982 8:48 am

B-Tree bug fix #20: In ReplaceKeyAndSplit, called by ChangeKey, called by MergeInInternalPage.  IF the page happens to be be split at exactly the same place that the key was being changed (to reflect the merge of the sons by MergeInInternalPage), then an exception has to be made and the key must be passed up to be changed in the grandparent instead.  So now the IF has three cases, >, <, =.
	by Cattell December 29, 1982 7:37 pm

Moved some more procs here from DBIndexImpl, DBIndexFilePageImpl.
	by Cattell January 17, 1983 6:11 pm

B-Tree bug fix #21:  I don't see how MoveScanIndexLeft could ever have worked!  If scan.index<nentries (i.e., the scan pointed into one of the nentries that were moved to the left brother), was setting scan.index ← scan.index + to.pointer.size, pointing off the end of the page.  Added a few comments to these procedures.  Also, bug fix #22:  If to.pointer.size is zero, and scan.index=nentries, then scan is left pointing off into page about to be deleted!  Added "OR to.pointer.size = 0" to IF statement. 
	by Cattell May 27, 1983 1:51 pm

In conjunction with B-Tree bug fix #22: Changed IncrementScanIndex to have "atOrAfter" and "howMuch" arguments instead of just "at" argument, because DBIndexModImpl.MoveEntriesToRightLeaf needed this.  Also added atOrAfter or after to DecrementScanIndex for symmetry.  Changed all other refs to these routines to be consistent (add one first).

Converted to Cedar 5.0: made splitKeyBuffer allocated by DBIndexFilePage.