-- File: DBStorageImplB.mesa
-- Last edited by:
--  MBrown on December 3, 1982 12:05 pm
--  Cattell on 26-Sep-81 13:58:54
--   Willie-Sue on June 24, 1982 12:20 pm
-- Last Edited by: Cattell, January 16, 1983 11:43 am

  DIRECTORY
    DBCache USING [CacheHandle],
    DBCommon USING [DBPage],
    DBEnvironment,
    DBHeapStorage USING [Node],
    DBStats USING [Inc],
    DBSegment USING [AllocPage, FreePage, ObtainCoreLoc, ReadPage, WritePage,
      WriteLockedPage, UnlockPage, SegmentIDFromDBPage],
    DBStorage USING [FieldHandle, TuplesetHandle, MaxSystemTupleID, TupleHandle],
    DBStorageExtTuple USING [TupleBody],
    DBStorageField USING [CheckFieldHandleType, FieldOffset, NWordsInField, GroupIDOfField],
    DBStorageGroup USING [TIDFieldBody, TIDFieldPart, GroupList, GroupListEntry, HeadFieldPart],
    DBStoragePagetags USING [Tuple, SystemTuple, AssertPageIsAnyTuplePage,
      AssertPageIsOverflowTuplePage, OverflowTuple],
    DBStoragePrivate USING [TupleFromTID],
    DBStorageString USING [EString, IString, LString, SizeOfNullEString, SizeOfNullIString],
    DBStorageTID USING [TID, NullTID, TIDSlotMask, ConsTID, DecomposeTID],
    DBStorageTSDict USING [NEntries, TSDict, TSDictSlotIndex],
    DBStorageTuple USING [ConsTupleObject],
    DBStorageVec USING [TagOfPage, HighSlotIndexOfPage, SetTypeOfSlot, TypeOfSlot, VecOfSlot,
      LengthOfVec, VecPage, AllocVec, ModifyVec, FreeVec, WordsInLargestAllocableVec,
      InitializeVecPage],
    DBStorageVectags,
    DBTuplesConcrete USING [TupleObject],
    Inline USING [LongCOPY, LowHalf],
    Rope USING [ROPE, Fetch, Size, Text],
    RopeInline USING [NewText];

DBStorageImplB: PROGRAM
  IMPORTS
    DBEnvironment,
    DBHeapStorage,
    DBSegment,
    DBStats,
    DBStorageField,
    DBStoragePagetags,
    DBStoragePrivate,
    DBStorageTID,
    DBStorageTSDict,
    DBStorageTuple, 
    DBStorageVec,
    DBStorageVectags,
    I: Inline,
    Rope,
    RopeInline
  EXPORTS
    DBEnvironment,
    DBStorage,
    DBStorageGroup,
    DBStoragePrivate
  SHARES
    Rope
  = BEGIN OPEN DBCommon, DBEnvironment, DBStorageTID;

  -- This module exports part of DBStorage: field reading/writing, excluding groups.
  -- It exports low-level support procs to DBStorageGroup for group field reading/writing.

  --  PROBLEMS, THINGS YET TO BE DONE:
  --  This version allows no overflow of tuples, only strings.


  DoChecks: BOOLEAN = TRUE;

  -- Type exported to DBTuplesOpaque (thence to DBStorage.)
  TupleObject: PUBLIC TYPE = DBTuplesConcrete.TupleObject;
  TupleHandle: TYPE = REF TupleObject;
  StoredTupleHandle: TYPE = REF TupleObject.stored;

  --    Public procedures: Read field

  ReadTupleset: PUBLIC PROC[x: TupleHandle] RETURNS[DBStorage.TuplesetHandle] = {
    -- Returns a tuple handle for the tuple representing x's tupleset.
    xs: StoredTupleHandle = NARROW[x];
    tsh: TupleHandle;
    dbPagePtr: LONG POINTER;
    s: CARDINAL;
    systemTuple: BOOLEAN;
    DBStats.Inc[StorageReadTupleset];
    BEGIN--read in the page containing x
      dbPage: DBCommon.DBPage;
      [dbPage, s] ← DecomposeTID[xs.tid];
      -- s is now the slot index of tuple x
      [xs.cacheHint, dbPagePtr] ← DBSegment.ReadPage[dbPage, xs.cacheHint];
    END;
    -- always check the page tag before doing an access...
    SELECT DBStorageVec.TagOfPage[LOOPHOLE[dbPagePtr]] FROM
      DBStoragePagetags.Tuple => systemTuple ← FALSE;
      DBStoragePagetags.SystemTuple => systemTuple ← TRUE;
    ENDCASE => ERROR DBEnvironment.InternalError; -- [BadPageTag];
    s ← DBStorageVec.TypeOfSlot[dbPagePtr, s];
    -- s is now the "local tuplesetID" of tuple x
    BEGIN--do the lookup in the tupleset dictionary
      tsDict: LONG POINTER TO DBStorageTSDict.TSDict
       ← LOOPHOLE[DBStorageVec.VecOfSlot[dbPagePtr, DBStorageTSDict.TSDictSlotIndex]];
      IF DoChecks AND s NOT IN [1..DBStorageTSDict.NEntries[tsDict]] THEN
        ERROR DBEnvironment.InternalError; -- [BadVecTag];
        -- when indirect tuples are allowed, this must change to look in the indirect
        --tuple for the local tuplesetID.
      tsh ← IF systemTuple THEN DBStoragePrivate.TupleFromTID[tsDict.seq[s].tuplesetID]
            ELSE DBStorageTuple.ConsTupleObject[tid: tsDict.seq[s].tuplesetID, cacheHint: NIL];
    END;
    DBSegment.UnlockPage[xs.cacheHint];
    RETURN[tsh];
  };--ReadTupleset

  Read1Word: PUBLIC PROC[x: TupleHandle, f: DBStorage.FieldHandle]
   RETURNS[CARDINAL] = {
    tupleBase: LONG POINTER TO DBStorageExtTuple.TupleBody;
    cacheHint: DBCache.CacheHandle;
    result: CARDINAL;
    DBStats.Inc[StorageReadField];
    [tupleBase,cacheHint] ← ReadableTupleBase[x];
    DBStorageField.CheckFieldHandleType[f, OneWord];
    CheckFieldAccess[tupleBase, f];
    result ← ReadCardinal[tupleBase + DBStorageField.FieldOffset[f]];
    DBSegment.UnlockPage[cacheHint];
    RETURN[result];
  };--Read1Word

  Read2Word: PUBLIC PROC[x: TupleHandle, f: DBStorage.FieldHandle]
   RETURNS[LONG CARDINAL] = {
    tupleBase: LONG POINTER TO DBStorageExtTuple.TupleBody;
    cacheHint: DBCache.CacheHandle;
    result: LONG CARDINAL;
    DBStats.Inc[StorageReadField];
    [tupleBase,cacheHint] ← ReadableTupleBase[x];
    DBStorageField.CheckFieldHandleType[f, TwoWord];
    CheckFieldAccess[tupleBase, f];
    result ← ReadLongCardinal[tupleBase + DBStorageField.FieldOffset[f]];
    DBSegment.UnlockPage[cacheHint];
    RETURN[result];
  };--Read2Word

  ReadNWord: PUBLIC PROC[x: TupleHandle, f: DBStorage.FieldHandle]
   RETURNS[POINTER TO UNSPECIFIED] = {
    -- Result points to an n-word record, where n is encoded in f.
    tupleBase: LONG POINTER TO DBStorageExtTuple.TupleBody;
    cacheHint: DBCache.CacheHandle;
    result: POINTER TO UNSPECIFIED;
    DBStats.Inc[StorageReadField];
    [tupleBase,cacheHint] ← ReadableTupleBase[x];
    DBStorageField.CheckFieldHandleType[f, NWord];
    CheckFieldAccess[tupleBase, f];
    result ← ReadRecord[tupleBase + DBStorageField.FieldOffset[f],
                        DBStorageField.NWordsInField[f]];
    DBSegment.UnlockPage[cacheHint];
    RETURN[result];
  };--ReadNWord

  GetNWordBase: PUBLIC PROC[x: TupleHandle, f: DBStorage.FieldHandle]
   RETURNS[LONG POINTER, DBCache.CacheHandle] = {
    -- EXPORTed to DBStoragePrivate.
    tupleBase: LONG POINTER TO DBStorageExtTuple.TupleBody;
    cacheHint: DBCache.CacheHandle;
    DBStats.Inc[StorageReadField];
    [tupleBase,cacheHint] ← ReadableTupleBase[x];
    DBStorageField.CheckFieldHandleType[f, NWord];
    CheckFieldAccess[tupleBase, f];
    RETURN[tupleBase + DBStorageField.FieldOffset[f], cacheHint];
  };--GetNWordBase

  ReadVarByte:
   PUBLIC PROC[x: TupleHandle, f: DBStorage.FieldHandle] RETURNS[Rope.ROPE] = {
    tupleBase: LONG POINTER TO DBStorageExtTuple.TupleBody;
    cacheHint: DBCache.CacheHandle;
    dbPage: DBPage; pagePtr: LONG POINTER TO DBStorageVec.VecPage;
    result: Rope.ROPE;
    DBStats.Inc[StorageReadField];
    [tupleBase,cacheHint] ← ReadableTupleBase[x];
    --temporary crock
    [dbPage,] ← DecomposeTID[x.tid];  pagePtr ← DBSegment.ObtainCoreLoc[dbPage, cacheHint];
    DBStorageField.CheckFieldHandleType[f, VarByte];
    CheckFieldAccess[tupleBase, f];
    result ← ReadString[pagePtr, LOOPHOLE[tupleBase + DBStorageField.FieldOffset[f]],
                        DBStorageField.NWordsInField[f]];
    DBSegment.UnlockPage[cacheHint];
    RETURN[result];
  };--ReadVarByte

  --    Public procedures: Write field

  Write1Word:
   PUBLIC PROC[x: TupleHandle, f: DBStorage.FieldHandle, val: CARDINAL] = {
    tupleBase: LONG POINTER TO DBStorageExtTuple.TupleBody;
    cacheHint: DBCache.CacheHandle;
    DBStats.Inc[StorageWriteField];
    [tupleBase,cacheHint] ← ReadableTupleBase[x];
    DBSegment.WriteLockedPage[cacheHint];
    DBStorageField.CheckFieldHandleType[f, OneWord];
    CheckFieldAccess[tupleBase, f];
    WriteCardinal[tupleBase + DBStorageField.FieldOffset[f], val];
    DBSegment.UnlockPage[cacheHint];
  };--Write1Word

  Write2Word:
   PUBLIC PROC[x: TupleHandle, f: DBStorage.FieldHandle, val: LONG CARDINAL] = {
    tupleBase: LONG POINTER TO DBStorageExtTuple.TupleBody;
    cacheHint: DBCache.CacheHandle;
    DBStats.Inc[StorageWriteField];
    [tupleBase,cacheHint] ← ReadableTupleBase[x];
    DBSegment.WriteLockedPage[cacheHint];
    DBStorageField.CheckFieldHandleType[f, TwoWord];
    CheckFieldAccess[tupleBase, f];
    WriteLongCardinal[tupleBase + DBStorageField.FieldOffset[f], val];
    DBSegment.UnlockPage[cacheHint];
  };--Write2Word

  WriteNWord: PUBLIC PROC
   [x: TupleHandle, f: DBStorage.FieldHandle, val: POINTER TO UNSPECIFIED] = {
    --  Last parm points to an n-word record, where n is encoded in f.
    tupleBase: LONG POINTER TO DBStorageExtTuple.TupleBody;
    cacheHint: DBCache.CacheHandle;
    DBStats.Inc[StorageWriteField];
    [tupleBase,cacheHint] ← ReadableTupleBase[x];
    DBSegment.WriteLockedPage[cacheHint];
    DBStorageField.CheckFieldHandleType[f, NWord];
    CheckFieldAccess[tupleBase, f];
    WriteRecord[tupleBase + DBStorageField.FieldOffset[f],
                DBStorageField.NWordsInField[f], val];
    DBSegment.UnlockPage[cacheHint];
  };--WriteNWord

  WriteVarByte:
   PUBLIC PROC[x: TupleHandle, f: DBStorage.FieldHandle, val: Rope.ROPE] = {
    tupleBase: LONG POINTER TO DBStorageExtTuple.TupleBody;
    cacheHint: DBCache.CacheHandle;
    dbPage: DBPage;  slot: CARDINAL;  pagePtr: LONG POINTER TO DBStorageVec.VecPage;
    DBStats.Inc[StorageWriteField];
    [tupleBase,cacheHint] ← ReadableTupleBase[x];
    --temporary crock
    [dbPage, slot] ← DecomposeTID[x.tid];  pagePtr ← DBSegment.ObtainCoreLoc[dbPage, cacheHint];
    DBSegment.WriteLockedPage[cacheHint];
    DBStorageField.CheckFieldHandleType[f, VarByte];
    CheckFieldAccess[tupleBase, f];
    WriteString[dbPage, pagePtr, slot, DBStorageField.FieldOffset[f],
                DBStorageField.NWordsInField[f], val];
    DBSegment.UnlockPage[cacheHint];
  };--WriteVarByte

  --    Primitives for the above


  ReadableTupleBase: PROC[x: TupleHandle]
   RETURNS[--tupleBase-- LONG POINTER TO DBStorageExtTuple.TupleBody,
           -- cacheHint-- DBCache.CacheHandle] = {
    -- Returns tupleBase, a pointer to the body of tuple x (in the cache, locked
    --readonly), and cacheHint, a CacheHandle for the page containing this pointer (so
    --that the page can be unlocked once data has been read from the tuple).  Has
    --the side effect of updating the CacheHandle in x.
    -- Intended for use only on real tuples (checks for slot type in range).
    -- Called from: Read1Word, Read2Word, ReadNWord, ReadVarWord, ReadVarByte.
    xs: StoredTupleHandle = NARROW[x];
    dbPage: DBCommon.DBPage;
    dbPagePtr: LONG POINTER;
    slotIndex: CARDINAL;
    slotType: CARDINAL;
    [dbPage, slotIndex] ← DecomposeTID[xs.tid];
    [xs.cacheHint, dbPagePtr] ← DBSegment.ReadPage[dbPage, xs.cacheHint];
    -- we always check the page tag before doing an access...
    DBStoragePagetags.AssertPageIsAnyTuplePage[dbPagePtr];
    slotType ← DBStorageVec.TypeOfSlot[dbPagePtr, slotIndex];
    SELECT slotType FROM
      IN [1..DBStorageVectags.MaxTuplesetPerPage] => BEGIN
        RETURN[LOOPHOLE[DBStorageVec.VecOfSlot[dbPagePtr, slotIndex]], xs.cacheHint];
      END;
      DBStorageVectags.IndTupleType => BEGIN
        ERROR DBEnvironment.InternalError; -- [Unknown];
      END;
    ENDCASE => ERROR DBEnvironment.InternalError; -- [BadVecTag];
  };--ReadableTupleBase

  CheckFieldAccess: PROC
   [t: LONG POINTER TO DBStorageExtTuple.TupleBody, f: DBStorage.FieldHandle] = {
    -- Complains if an access to field f will extend past the end of the tuple t's field
    --data area.
    IF DBStorageField.FieldOffset[f] + DBStorageField.NWordsInField[f] > t.groupOffset THEN
      ERROR DBEnvironment.InternalError; -- [BadFieldHandle];
  };--CheckFieldAccess

  ReadCardinal: PROC[p: LONG POINTER] RETURNS[CARDINAL] = --INLINE-- {
    RETURN[LOOPHOLE[p,LONG POINTER TO CARDINAL]↑];
  };--ReadCardinal

  WriteCardinal: PROC[p: LONG POINTER, v: CARDINAL] = --INLINE-- {
    LOOPHOLE[p,LONG POINTER TO CARDINAL]↑ ← v;
  };--WriteCardinal


  ReadLongCardinal: PROC[p: LONG POINTER] RETURNS[LONG CARDINAL] = --INLINE-- {
    RETURN[LOOPHOLE[p,LONG POINTER TO LONG CARDINAL]↑];
  };--ReadLongCardinal

  WriteLongCardinal: PROC[p: LONG POINTER, v: LONG CARDINAL] = --INLINE-- {
    LOOPHOLE[p,LONG POINTER TO LONG CARDINAL]↑ ← v;
  };--WriteLongCardinal


  ReadRecord: PROC [p: LONG POINTER, nWords: CARDINAL]
    RETURNS [POINTER TO UNSPECIFIED] = --INLINE-- {
    result: POINTER TO UNSPECIFIED ← DBHeapStorage.Node[nWords];
    I.LongCOPY[from:p, nwords:nWords, to:LONG[result]];
    RETURN[result];
    };

  WriteRecord: PROC [p: LONG POINTER, nWords: CARDINAL,
    r: POINTER TO UNSPECIFIED] = --INLINE-- {
    I.LongCOPY[from: LONG[r], nwords:nWords, to:p];
    };

  WordsForString: PROC[nChars: CARDINAL] RETURNS[CARDINAL] = INLINE {
    RETURN[(nChars+1)/2]
    };

  ReadString: PROC[pagePtr: LONG POINTER TO DBStorageVec.VecPage,
    p: LONG POINTER TO DBStorageString.IString, nWords: CARDINAL]
    RETURNS[Rope.ROPE] = {
    -- here nWords refers to the length of the VarByte field being read
    result: Rope.Text;
    wordsForInlineString: CARDINAL ← WordsForString[p.bytesInString];
    IF DoChecks AND DBStorageString.SizeOfNullIString + wordsForInlineString > nWords THEN
      ERROR DBEnvironment.InternalError;--p.bytesInString and field handle are inconsistent
    IF p.slotOfExtension = 0 THEN { --handle simple inline string
      result ← RopeInline.NewText[p.bytesInString];
      I.LongCOPY[from: @p.words[0], nwords: wordsForInlineString,
        to: LOOPHOLE[result, LONG POINTER] + SIZE[TEXT[0]]];
      }
    ELSE {--handle "long string"
      --allocate storage for whole result and copy in the initial portion
      lString: LONG POINTER TO DBStorageString.LString;
      totalWordsForString: CARDINAL;
      IF DoChecks AND p.bytesInString # 2*(nWords - DBStorageString.SizeOfNullIString) THEN
        ERROR InternalError; -- TrashedPage, extension used only if inline space is full
      IF DoChecks AND
       DBStorageVec.TypeOfSlot[pagePtr, p.slotOfExtension] # DBStorageVectags.LString THEN
        ERROR InternalError; --slot type inconsistent
      lString ← LOOPHOLE[DBStorageVec.VecOfSlot[pagePtr, p.slotOfExtension]];
      totalWordsForString ← wordsForInlineString + WordsForString[lString.bytesInRemString];
      result ← RopeInline.NewText[p.bytesInString + lString.bytesInRemString];
      I.LongCOPY[from: @p.words[0], nwords: wordsForInlineString,
        to: LOOPHOLE[result, LONG POINTER] + SIZE[TEXT[0]]];
      ReadBigString[eStringID: lString.eStringID,
        copyTo: LOOPHOLE[result, LONG POINTER] + SIZE[TEXT[0]] + wordsForInlineString,
        wordsToCopy: totalWordsForString - wordsForInlineString];
      };
    RETURN[result];
    };

  ReadBigString: PROC[eStringID: TID, copyTo: LONG POINTER, wordsToCopy: CARDINAL] = {
    -- Called from: ReadString.
    wordsCopied: CARDINAL ← 0;
    WHILE eStringID # NullTID DO
      wordsForCurrentText: CARDINAL;
      sPagePtr: LONG POINTER TO DBStorageVec.VecPage; sCacheHdl: DBCache.CacheHandle;
      sDBPage: DBPage; slotIndex: CARDINAL;
      eString: LONG POINTER TO DBStorageString.EString;
      [sDBPage, slotIndex] ← DecomposeTID[eStringID];
      [sCacheHdl, sPagePtr] ← DBSegment.ReadPage[sDBPage, NIL];
      IF DBStorageVec.TypeOfSlot[sPagePtr, slotIndex] # DBStorageVectags.EString THEN
        ERROR DBEnvironment.InternalError; --slot type inconsistent
      eString ← LOOPHOLE[DBStorageVec.VecOfSlot[sPagePtr, slotIndex]];
      wordsForCurrentText ← eString.header.length - DBStorageString.SizeOfNullEString;
      IF wordsToCopy < (wordsCopied + wordsForCurrentText) THEN
        ERROR DBEnvironment.InternalError; --string length inconsistent
      I.LongCOPY[from: @eString.words[0], nwords: wordsForCurrentText,
                          to: copyTo + wordsCopied];
      wordsCopied ← wordsCopied + wordsForCurrentText;
      eStringID ← eString.eStringID;
      DBSegment.UnlockPage[sCacheHdl];
    ENDLOOP;
    IF wordsToCopy # wordsCopied THEN
      ERROR DBEnvironment.InternalError; --string length inconsistent
  };--ReadBigString
  
  CardinalFromLongInteger: PROC [i: LONG INTEGER] RETURNS [CARDINAL] = INLINE {
    IF i < 0 OR i > LAST[CARDINAL] THEN ERROR;
    RETURN [I.LowHalf[i]] };

  WriteString: PROC[dbPage: DBPage, pagePtr: LONG POINTER TO DBStorageVec.VecPage,
   iStringSlot: CARDINAL, iStringOffset: CARDINAL, nWords: CARDINAL, s: Rope.ROPE] = {
    -- here nWords refers to the length of the VarByte field being written.
    --note that page pagePtr is write-locked in the cache.
    p: LONG POINTER TO DBStorageString.IString ←
      LOOPHOLE[DBStorageVec.VecOfSlot[pagePtr, iStringSlot] + iStringOffset];
    bytesInS: CARDINAL = CardinalFromLongInteger[s.Size[]];
    totalWordsForString: CARDINAL ← WordsForString[bytesInS];
    -- get rid of extension, if any
    IF p.slotOfExtension # 0 THEN FreeBigString[pagePtr, p.slotOfExtension];
    IF SIZE[DBStorageString.IString] + totalWordsForString <= nWords THEN {
      -- string fits into body of tuple, just copy it in
      p↑ ← [bytesInString: bytesInS, slotOfExtension: 0, rest: ];
      FOR i: CARDINAL IN [0..bytesInS) DO p.text[i] ← s.Fetch[i] ENDLOOP }
    ELSE {
      -- long string, go off and do it
      WriteBigString[dbPage: dbPage, pagePtr: pagePtr, iStringSlot: iStringSlot,
        iStringOffset: iStringOffset, iWords: nWords, from: s, nBytes: bytesInS];
    };--IF
  };--WriteString

  FreeBigString: PROC[sPagePtr: LONG POINTER TO DBStorageVec.VecPage, slotIndex: CARDINAL] = {
    -- starting from the LString at slotIdex on the given page, free all space on all pages
    --associated with this string.
    sCacheHdl: DBCache.CacheHandle; sDBPage: DBPage;
    lString: LONG POINTER TO DBStorageString.LString;
    eStringID: TID;
    DBStorageVectags.AssertVecIsLString[sPagePtr, slotIndex];
    lString ← LOOPHOLE[DBStorageVec.VecOfSlot[sPagePtr, slotIndex]];
    eStringID ← lString.eStringID;
    DBStorageVec.FreeVec[sPagePtr, slotIndex]; --sPagePtr is already write-enabled
    WHILE eStringID # NullTID DO
      -- chain through pages, freeing vecs.  if page becomes empty, free it, too
      eString: LONG POINTER TO DBStorageString.EString;
      [sDBPage, slotIndex] ← DecomposeTID[eStringID];
      [sCacheHdl, sPagePtr] ← DBSegment.WritePage[sDBPage, NIL];
      DBStoragePagetags.AssertPageIsOverflowTuplePage[sPagePtr];
      DBStorageVectags.AssertVecIsEString[sPagePtr, slotIndex];
      eString ← LOOPHOLE[DBStorageVec.VecOfSlot[sPagePtr, slotIndex]];
      eStringID ← eString.eStringID;
      DBStorageVec.FreeVec[sPagePtr, slotIndex];
      IF DBStorageVec.HighSlotIndexOfPage[sPagePtr] = 0 THEN {
        DBSegment.FreePage[DBSegment.SegmentIDFromDBPage[sDBPage], sDBPage, sCacheHdl]; }
      ELSE {
        DBSegment.UnlockPage[sCacheHdl];
      };
    ENDLOOP;
  };--FreeBigString

  WriteBigString: PROC[dbPage: DBPage, pagePtr: LONG POINTER TO DBStorageVec.VecPage,
   iStringSlot: CARDINAL, iStringOffset: CARDINAL, iWords: CARDINAL, from: Rope.ROPE,
   nBytes: CARDINAL] = {
    -- Writes a long string (longer than lengthHint).  dbPage,,pagePtr is page on which
    --iString is located; we need dbPage to identify the segment when allocating a page.
    --iWords is total length of iString.
    iString: LONG POINTER TO DBStorageString.IString ←
      LOOPHOLE[DBStorageVec.VecOfSlot[pagePtr, iStringSlot] + iStringOffset];
    wordsThisWrite: CARDINAL;
    wordsToWrite: CARDINAL ← WordsForString[nBytes];
    wordsWritten: CARDINAL ← 0;
    lStringSlot: CARDINAL;
    link: LONG POINTER TO TID;  linkHdl: DBCache.CacheHandle;
    -- Write the inline portion of iString.
    wordsThisWrite ← iWords - SIZE[DBStorageString.IString];
    IF wordsThisWrite >= wordsToWrite THEN ERROR DBEnvironment.InternalError; --[TrashedPage];
    iString.bytesInString ← 2*wordsThisWrite;
    FOR i: CARDINAL IN [0..2*wordsThisWrite) DO
      iString.text[i] ← from.Fetch[i]
      ENDLOOP;
    wordsToWrite ← wordsToWrite - wordsThisWrite;
    wordsWritten ← wordsWritten + wordsThisWrite;
    -- Allocate an LString for off-page ref, point iString to it, but store no data words in it.
    {
      success: BOOLEAN;
      lString: LONG POINTER TO DBStorageString.LString;
      [lStringSlot, success] ← DBStorageVec.AllocVec[pagePtr, SIZE[DBStorageString.LString]];
      IF ~success THEN ERROR InternalError; -- PageOverflowNotImplemented
      DBStorageVec.SetTypeOfSlot[pagePtr, lStringSlot, DBStorageVectags.LString];
      iString ← LOOPHOLE[DBStorageVec.VecOfSlot[pagePtr, iStringSlot] + iStringOffset];
      iString.slotOfExtension ← lStringSlot;
      lString ← LOOPHOLE[DBStorageVec.VecOfSlot[pagePtr, lStringSlot]];
      lString.bytesInRemString ← nBytes - 2*wordsWritten;
      link ← @lString.eStringID;  linkHdl ← NIL;
    };
    -- Write the rest of the string.  For now, all writing goes to other pages.
    WHILE wordsToWrite > 0 DO
      -- Find a page to write on.
      sDBPage: DBPage; sCacheHdl: DBCache.CacheHandle; sPagePtr: LONG POINTER TO DBStorageVec.VecPage;
      FOR curSlot: CARDINAL DECREASING IN [1..DBStorageVec.HighSlotIndexOfPage[pagePtr]] DO
        -- Is this a candidate overflow page?
        IF (DBStorageVec.TypeOfSlot[pagePtr, curSlot] # DBStorageVectags.LString) OR
           (curSlot = lStringSlot) THEN LOOP;
        -- Will the page pointed to by this LString hold the remaining string?
        --Read the page and see.
        {
          lString: LONG POINTER TO DBStorageString.LString ←
           LOOPHOLE[DBStorageVec.VecOfSlot[pagePtr, curSlot]];
          [sDBPage,] ← DecomposeTID[lString.eStringID];
          [sCacheHdl, sPagePtr] ← DBSegment.ReadPage[sDBPage, NIL];
          DBStoragePagetags.AssertPageIsOverflowTuplePage[sPagePtr];
          IF DBStorageVec.WordsInLargestAllocableVec[sPagePtr] >=
             (wordsToWrite + DBStorageString.SizeOfNullEString) THEN GOTO FoundPage;
          DBSegment.UnlockPage[sCacheHdl];
        };
      REPEAT
        FoundPage => { DBSegment.WriteLockedPage[sCacheHdl]; };
        FINISHED => {
          -- Remainder of string will not fit on any existing page that is an immediate descendant
          --of pagePtr.  Allocate a new page to hold at least a prefix of the remainder.
          [sDBPage, sCacheHdl, sPagePtr] ←
            DBSegment.AllocPage[DBSegment.SegmentIDFromDBPage[dbPage]];
          DBSegment.WriteLockedPage[sCacheHdl];
          DBStorageVec.InitializeVecPage[sPagePtr, DBStoragePagetags.OverflowTuple];
        }--FINISHED
      ENDLOOP;
      -- Here sDBPage, sCacheHdl, and sPagePtr ref the writeable page where a prefix of the remaining
      --string will now be stored.  link points to a location on a write-locked cache page that
      --needs to be filled with the "TID" of this new string, and linkHdl, if # NIL, is a cache
      --handle that must be release once this write has been done.
      -- Now, write the next piece of string and update link, linkHdl.
      {
        vecLen, eStringSlot: CARDINAL;
        success: BOOLEAN;
        eString: LONG POINTER TO DBStorageString.EString;
        vecLen ← MIN[wordsToWrite + DBStorageString.SizeOfNullEString,--what we want--
                     DBStorageVec.WordsInLargestAllocableVec[sPagePtr]--the most we can get--];
        IF DBStorageString.SizeOfNullEString >= vecLen THEN ERROR InternalError; -- TrashedPage
        wordsThisWrite ← vecLen - DBStorageString.SizeOfNullEString;
        [eStringSlot, success] ← DBStorageVec.AllocVec[sPagePtr, vecLen];
        IF ~success THEN ERROR DBEnvironment.InternalError; --huh?
        DBStorageVec.SetTypeOfSlot[sPagePtr, eStringSlot, DBStorageVectags.EString];
        eString ← LOOPHOLE[DBStorageVec.VecOfSlot[sPagePtr, eStringSlot]];
        link↑ ← DBStorageTID.ConsTID[sDBPage, eStringSlot];
        IF linkHdl # NIL THEN DBSegment.UnlockPage[linkHdl];
        link ← @eString.eStringID;  linkHdl ← sCacheHdl;
	FOR i: CARDINAL IN [0..MIN[2*wordsThisWrite, nBytes-2*wordsWritten]) DO
	  eString.text[i] ← from.Fetch[2*wordsWritten + i]
	  ENDLOOP;
        wordsToWrite ← wordsToWrite - wordsThisWrite;
        wordsWritten ← wordsWritten + wordsThisWrite;
      };
    ENDLOOP;
    -- Terminate the list of EStrings with NullTID.
    link↑ ← NullTID;
    IF linkHdl # NIL THEN DBSegment.UnlockPage[linkHdl];
  };--WriteBigString

  -- Low-level group support


  ReadGroupField: PUBLIC PROC[x: TupleHandle, f: DBStorage.FieldHandle,
   part: DBStorageGroup.TIDFieldPart] RETURNS[TupleHandle] = {
    -- returns the indicated part of x's group field f.
    tupleBase: LONG POINTER TO DBStorageExtTuple.TupleBody;
    cacheHint: DBCache.CacheHandle;
    p: LONG POINTER TO DBStorageGroup.TIDFieldBody;
    resultTID: TID;
    result: TupleHandle;
    [tupleBase,cacheHint] ← ReadableTupleBase[x];
    DBStorageField.CheckFieldHandleType[f, Group];
    CheckFieldAccess[tupleBase, f];
    p ← LOOPHOLE[tupleBase + DBStorageField.FieldOffset[f]];
    resultTID ← SELECT part FROM
                  headTID => p.headTID,
                  prevTID => p.prevTID,
                  nextTID => p.nextTID,
                ENDCASE => ERROR;
    result ← IF resultTID = NullTID THEN NIL
             ELSE DBStorageTuple.ConsTupleObject[tid: resultTID, cacheHint: NIL];
    DBSegment.UnlockPage[cacheHint];
    RETURN[result];
  };--ReadGroupField

  WriteGroupField: PUBLIC PROC[x: TupleHandle, f: DBStorage.FieldHandle,
   part: DBStorageGroup.TIDFieldPart, val: TupleHandle] = {
    -- writes val's TID into the indicated part of x's group field f.
    tupleBase: LONG POINTER TO DBStorageExtTuple.TupleBody;
    cacheHint: DBCache.CacheHandle;
    p: LONG POINTER TO DBStorageGroup.TIDFieldBody;
    valTID: TID;
    [tupleBase,cacheHint] ← ReadableTupleBase[x];
    DBSegment.WriteLockedPage[cacheHint];
    DBStorageField.CheckFieldHandleType[f, Group];
    CheckFieldAccess[tupleBase, f];
    valTID ← IF val = NIL THEN NullTID ELSE val.tid; 
    p ← LOOPHOLE[tupleBase + DBStorageField.FieldOffset[f]];
    SELECT part FROM
      headTID => p.headTID ← valTID;
      prevTID => p.prevTID ← valTID;
      nextTID => p.nextTID ← valTID;
    ENDCASE => ERROR;
    DBSegment.UnlockPage[cacheHint];
  };--WriteGroupField

  ReadHeadField: PUBLIC PROC[x: TupleHandle, f: DBStorage.FieldHandle,
   part: DBStorageGroup.HeadFieldPart]
   RETURNS[TupleHandle] = {
    -- returns the indicated part of x's GroupListEntry for group field f.
    -- error if no such entry.  result is never NIL.
    tupleBase: LONG POINTER TO DBStorageExtTuple.TupleBody;
    cacheHint: DBCache.CacheHandle;
    p: LONG POINTER TO DBStorageGroup.GroupListEntry;
    result: TupleHandle;
    [tupleBase,cacheHint] ← ReadableTupleBase[x];
    p ← GetGroupListEntry[tupleBase, f];
    result ← SELECT part FROM
      firstTID => DBStorageTuple.ConsTupleObject[tid: p.firstTID, cacheHint: NIL],
      lastTID => DBStorageTuple.ConsTupleObject[tid: p.lastTID, cacheHint: NIL],
    ENDCASE => ERROR;
    DBSegment.UnlockPage[cacheHint];
    RETURN[result];
  };--ReadHeadField

  WriteHeadField: PUBLIC PROC[x: TupleHandle, f: DBStorage.FieldHandle,
   part: DBStorageGroup.HeadFieldPart, val: TupleHandle] = {
    -- writes val's tid into the indicated part of x's GroupListEntry for group field f.
    -- error if no such entry.  val is never NIL.
    tupleBase: LONG POINTER TO DBStorageExtTuple.TupleBody;
    cacheHint: DBCache.CacheHandle;
    p: LONG POINTER TO DBStorageGroup.GroupListEntry;
    [tupleBase,cacheHint] ← ReadableTupleBase[x];
    DBSegment.WriteLockedPage[cacheHint];
    p ← GetGroupListEntry[tupleBase, f];
    SELECT part FROM
      firstTID => p.firstTID ← val.tid;
      lastTID => p.lastTID ← val.tid;
    ENDCASE => ERROR;
    DBSegment.UnlockPage[cacheHint];
  };--WriteHeadField

  GetGroupListEntry: PROC[tupleBase: LONG POINTER TO DBStorageExtTuple.TupleBody,
   f: DBStorage.FieldHandle]
   RETURNS[LONG POINTER TO DBStorageGroup.GroupListEntry] = {
    -- returns long pointer to tupleBase's GroupListEntry for group field f.
    -- error if no such entry.  used by ReadHeadField, WriteHeadField.
    groupList: LONG DESCRIPTOR FOR DBStorageGroup.GroupList;
    i: CARDINAL;
    DBStorageField.CheckFieldHandleType[f, Group];
    groupList ← GroupListFromTupleBase[tupleBase];
    FOR i IN [0..LENGTH[groupList]) DO
      IF groupList[i].groupID=DBStorageField.GroupIDOfField[f] THEN RETURN[@groupList[i]];
    ENDLOOP;
    ERROR DBEnvironment.InternalError; -- GroupListEntryNotFound 
  };--GetGroupListEntry

  GroupListFromTupleBase: PUBLIC PROC[p: LONG POINTER TO DBStorageExtTuple.TupleBody]
   RETURNS[LONG DESCRIPTOR FOR DBStorageGroup.GroupList] = {
    -- returns groupList of tuple p.  this is the only proc that has any business
    --reading p.groupOffset (except CheckFieldAccess).
    groupListLen: INTEGER ← DBStorageVec.LengthOfVec[LOOPHOLE[p]] - p.groupOffset;
    IF DoChecks AND
     (groupListLen < 0 OR groupListLen MOD SIZE[DBStorageGroup.GroupListEntry] # 0) THEN
      ERROR DBEnvironment.InternalError; -- BadGroupListLength
    RETURN[DESCRIPTOR[
     LOOPHOLE[p+p.groupOffset, LONG POINTER TO DBStorageGroup.GroupList],
     groupListLen/SIZE[DBStorageGroup.GroupListEntry]  ]];
  };--GroupListFromTupleBase

  GroupListFromTuple: PUBLIC PROC[x: TupleHandle]
   RETURNS[LONG DESCRIPTOR FOR DBStorageGroup.GroupList, DBCache.CacheHandle] = {
    -- Returns the BASE and LENGTH of the tuple's GroupList.  The BASE is a LONG POINTER to a locked
    --readonly cache page, and the second result is the cache handle to be used for unlocking it.
    tupleBase: LONG POINTER TO DBStorageExtTuple.TupleBody;
    cacheHint: DBCache.CacheHandle;
    [tupleBase,cacheHint] ← ReadableTupleBase[x];
    RETURN[GroupListFromTupleBase[tupleBase],cacheHint];
  };--GroupListFromTuple

  CreateHeadEntry: PUBLIC PROC[x: TupleHandle, f: DBStorage.FieldHandle] = {
    -- creates a group list entry in tuple x for group field f.
    -- error if already such an entry
    tupleBase: LONG POINTER TO DBStorageExtTuple.TupleBody;
    cacheHint: DBCache.CacheHandle;
    groupList: LONG DESCRIPTOR FOR DBStorageGroup.GroupList;
    [tupleBase,cacheHint] ← ReadableTupleBase[x];
    DBStorageField.CheckFieldHandleType[f, Group];
    groupList ← GroupListFromTupleBase[tupleBase];
    BEGIN -- check to make sure we don't duplicate an entry
      i: CARDINAL;
      FOR i IN [0..LENGTH[groupList]) DO
        IF groupList[i].groupID = DBStorageField.GroupIDOfField[f] THEN BEGIN
          DBSegment.UnlockPage[cacheHint];
          ERROR DBEnvironment.InternalError; -- GroupListEntryAlreadyThere;
        END;--IF
      ENDLOOP;
    END;
    BEGIN -- expand the group list. this code is a crock, and WON'T work with overflow pages!
      xs: StoredTupleHandle = NARROW[x];
      dbPage: DBCommon.DBPage; p: LONG POINTER TO DBStorageVec.VecPage; slotIndex: CARDINAL;
      success: BOOLEAN;
      [dbPage,slotIndex] ← DecomposeTID[xs.tid];
      [xs.cacheHint, p] ← DBSegment.WritePage[dbPage, xs.cacheHint];
      -- tag has already been checked, and the tuple must be here (no overflow pages!) 
      success ← DBStorageVec.ModifyVec[p, slotIndex, SIZE[DBStorageGroup.GroupListEntry], TRUE];
      -- for now, error on overflow.  this is where the hair in handling overflow pages
      --will come.
      IF ~success THEN ERROR InternalError; -- ProbablyBadExpectedNTupleRefs;
      tupleBase ← LOOPHOLE[DBStorageVec.VecOfSlot[p, slotIndex]]; 
      groupList ← GroupListFromTupleBase[tupleBase];
      groupList[LENGTH[groupList]-1] ←
        [groupID:  DBStorageField.GroupIDOfField[f],
         firstTID: NullTID,
         lastTID:  NullTID];
      DBSegment.UnlockPage[xs.cacheHint];
    END;
  DBSegment.UnlockPage[cacheHint];
  };--CreateHeadEntry

  DestroyHeadEntry: PUBLIC PROC[x: TupleHandle, f: DBStorage.FieldHandle] = {
    -- removes the group list entry in tuple x for group field f.
    -- error if no such entry
    cacheHint: DBCache.CacheHandle;
    tupleBase: LONG POINTER TO DBStorageExtTuple.TupleBody;
    groupList: LONG DESCRIPTOR FOR DBStorageGroup.GroupList;
    entry: CARDINAL;
    [tupleBase,cacheHint] ← ReadableTupleBase[x];
    DBStorageField.CheckFieldHandleType[f, Group];
    groupList ← GroupListFromTupleBase[tupleBase];
    -- find the entry to delete, error if not found
    FOR entry IN [0..LENGTH[groupList]) DO
      IF groupList[entry].groupID = DBStorageField.GroupIDOfField[f] THEN EXIT;
    REPEAT
      FINISHED => BEGIN
        DBSegment.UnlockPage[cacheHint];
        ERROR InternalError; -- GroupListEntryNotFound
      END;--FINISHED
    ENDLOOP;
    BEGIN -- move entries with index > entry down one spot (could use LongCOPY...)
      i: CARDINAL;
      FOR i IN [entry+1..LENGTH[groupList]) DO
        groupList[i-1] ← groupList[i]
      ENDLOOP;
    END;
    BEGIN -- contract the group list. this code is a crock, and WON'T work with overflow pages!
      xs: StoredTupleHandle = NARROW[x];
      dbPage: DBCommon.DBPage; p: LONG POINTER TO DBStorageVec.VecPage; slotIndex: CARDINAL;
      success: BOOLEAN;
      [dbPage,slotIndex] ← DecomposeTID[xs.tid];
      [xs.cacheHint, p] ← DBSegment.WritePage[dbPage, xs.cacheHint];
      -- tag has already been checked, and the tuple must be here (no overflow pages!) 
      success ← DBStorageVec.ModifyVec[p, slotIndex, -SIZE[DBStorageGroup.GroupListEntry], TRUE];
      -- should always succeed!
      IF ~success THEN ERROR DBEnvironment.InternalError;
      DBSegment.UnlockPage[xs.cacheHint];
    END;
  DBSegment.UnlockPage[cacheHint];
  };--DestroyHeadEntry

  systemTupleTable: REF ARRAY [1..DBStorage.MaxSystemTupleID] OF DBStorage.TupleHandle ← NIL;
    -- The contents are probably not really DBStorage.TupleHandles, but we never look!  These
    --are passed back to the tuple level as results of ReadTupleset or GetGroupIDs on
    --dictionary tuples.  All accesses to systemTupleTable are through the following two
    --procedures:

  SetSystemTupleTable: PUBLIC PROC[sTTPtr:
   REF ARRAY [1..DBStorage.MaxSystemTupleID] OF DBStorage.TupleHandle] = {
    -- EXPORTed to DBStorage
    systemTupleTable ← sTTPtr;
  };--SetSystemTupleTable

  TupleFromTID: PUBLIC PROC[id: TID] RETURNS[DBStorage.TupleHandle] = {
    -- EXPORTed to DBStorageSystemTupleTable
    result: DBStorage.TupleHandle;
    IF DoChecks AND id NOT IN [1..DBStorage.MaxSystemTupleID] THEN
      ERROR DBEnvironment.InternalError; -- [BadSystemTupleID];
    result ← systemTupleTable[I.LowHalf[id]];
    IF DoChecks AND result = NIL THEN
      ERROR DBEnvironment.InternalError; -- [NoSystemTupleForID];
    RETURN[result];
  };--TupleFromTID


END.--DBStorageImplB


CHANGE LOG

Created by MBrown on February 16, 1980  12:07 AM
-- Read1Word, Read2Word, and ReadNWord were taken from earlier version, coded February
--10 (no overflow page reads).

Changed by MBrown on February 27, 1980  11:07 PM
-- Further fleshing out, including ReadTupleset and ReadVarByte.

Changed by MBrown on February 28, 1980  3:29 PM
-- Complete modulo restrictions on overflow pages and out-of-line strings.

Changed by MBrown on 28-Feb-80 21:51
-- Make sure that slotOfExtension is set to 0, even though no extensions are allowed.

Changed by MBrown on February 29, 1980  11:10 AM
-- WriteRecord/String/Array deallocated their inputs, which was supposed to be done by
--the DBStorage client.

Changed by MBrown on March 6, 1980  3:26 PM
-- Coded interim implementation of groups.

Changed by MBrown on  6-Mar-80 21:36
-- Blunder: updated group list through index i rather than nEntries.  I thought the
--right thing but just copied the code without implementing the thought!  At same time,
--noticed that a LONG POINTER into a tuple was being held across a call to ModifyVec,
--which is not OK.

Changed by MBrown on  6-Mar-80 22:09
-- Blunder: followed nextTID rather than prevTID in PrevInGroup.  This was coded by
--transforming NextInGroup, and this change was missed.

Changed by MBrown on April 16, 1980  11:15 PM
-- Group section has now been almost completely rewritten, and now is supposed to work
--even when multiple scans are in progress, etc.

Changed by MBrown on April 17, 1980  4:57 PM
-- The above runs DBTest3-DBTest6 successfully.  But I noted a bug in comparing tuple
--handles or field handles for equality by pointer equality in group scan fixups.  Also
--changed GetGroupIDs to return an element of systemTupleTable when the TID is small;
--this  problem was noted while writing a test program that attempts to implement
--dictionary tuples for both tuplesets and attributes.

Changed by MBrown on April 17, 1980  10:19 PM
-- Added USING clauses.

Changed by MBrown on April 21, 1980  11:48 PM
-- CloseScanGroup deallocated tuple handles without checking for NIL; it now checks.
--Now runs StCTst1 successfully.  But this test doesn't use multiple concurrent scans...

Changed by MBrown on June 1, 1980  12:05 PM
-- Localizing references to groupOffset in preparation for overflow page work.
--Introduced GroupListFromTupleBase as a replacement for NEntriesInGroupList; this
--cleaned things up a great deal.

Changed by MBrown on June 8, 1980  8:40 PM
-- Changes to accomodate DBStorage.FieldObject as an exported type.  CopyFieldObject
--and FreeFieldObject moved to DBStorageField.

Changed by MBrown on June 10, 1980  10:37 PM
-- Changes to accomodate DBStorage.TupleObject as an exported type.

Changed by MBrown on June 17, 1980  1:25 PM
-- Changes to allow moving group scan implementation elsewhere.

Changed by MBrown on June 18, 1980  8:23 PM
-- Made GroupListFromTupleBody public.

Changed by MBrown on July 24, 1980  10:41 AM
-- On WriteVarByte of a too-long string, raise Retryable[LAST[CARDINAL]].  If resumed, store
--as much of the string as possible, followed by "~~".

Changed by MBrown on July 24, 1980  11:16 PM
-- Implemented GetNWordBase.

Changed by MBrown on August 12, 1980  3:59 PM
-- Starting a simple implementation of "long strings" (overflow pages).  The simplicity comes in
--FORCING the string onto an overflow page, instead of trying to fit it onto the same page as its
--tuple.  This would cause group list page overflows, which we aren't ready to handle yet.

Changed by MBrown on August 21, 1980  10:17 AM
-- More work on long strings.

Changed by MBrown on August 22, 1980  9:15 PM
-- Finished coding on long strings.

Changed by MBrown on August 23, 1980  11:09 AM
-- Bug found on first (single-stepping) test run: in ReadBigString, failed to follow the eStringID
--link to the next EString, thus we tried to copy the same EString over and over.  Caught by
--check for "impossible" error.

Changed by MBrown on August 23, 1980  11:23 AM
-- (Later in same test run) in WriteBigString, failed Unlock a page after
--it was examined to see if allocation could take place on it, and there was not enough room.  Caught
--by DBSegment.FreePage, which found the lock count > 1.

Changed by MBrown on August 23, 1980  11:32 AM
-- (Later in same test run) in FreeBigString, Unlocked a page after it had been Freed.

Comment by MBrown on August 23, 1980  11:43 AM
-- First test runs to completion.  Needed now: cache state printer, stronger exercise for
--big strings.

Changed by MBrown on August 28, 1980  4:50 PM
-- Bug (revealed by Eric Schmidt's first program): I was holding onto a vec pointer across a call
--to AllocVec for that page.  The treatment of pointers into pages needs to be cleaned up throughout
--the storage level, maybe by introducing some new objects that can hold together all of the right
--stuff.

Changed by MBrown on September 26, 1980  3:46 PM
-- Converted to new DBException.

Changed by MBrown on December 7, 1980  12:30 PM
-- Added DBStats calls.

Changed by MBrown on February 27, 1981  9:29 PM
-- Pre-Pilot conversions, alloc storage using DBHeapStorage.Node.

Changed by MBrown on 20-Jun-81 14:53:44
-- Cedar conversion, tuple rep is variant record, some results are collectible.

Changed by MBrown on 15-Jul-81 10:22:48
-- @<REF TEXT>.text does not do what you might expect (it gives a pointer to the maxlength
--field!).  Use LOOPHOLE[<REF TEXT>, LONG POINTER] + TEXTDataOffset instead.

Changed by MBrown on 29-Aug-81 23:02:52
-- Was returning a REF TEXT as a Rope.  Return a REF Rope.RopeRep[text] instead.

Changed by Cattell on 26-Sep-81 13:58:16
-- Added TupleFromTID.

Changed by Willie-Sue on June 24, 1982 12:20 pm
-- Rope.Ref => Rope.ROPE