-- File: DBSegmentImpl.mesa
-- Last edited by
  --  MBrown on February 25, 1983 1:49 pm
  --  Cattell on June 7, 1983 1:45 pm
  --  Willie-Sue on February 3, 1983 10:47 am

DIRECTORY
  Atom,
  ConvertUnsafe,
  Inline,
  DBCache,
  DBCommon,
  DBEnvironment,
  DBFile,
  DBSegment,
  DBSegmentPrivate,
  DBStats,
  DBStoragePagetags,
  DBStorageTID USING[DBPageIncrement,  TID],
  File,
  IO,
  OrderedSymbolTable,
  Rope USING [ROPE, Equal],
  RTOS USING [GetPermanentDataPages];

DBSegmentImpl: PROGRAM
  IMPORTS
    Atom,
    ConvertUnsafe,
    Inline,
    DBCommon,
    DBEnvironment,
    DBFile,
    DBSegment,
    DBStats,
    IO,
    FileTable: OrderedSymbolTable,
    Rope,
    RTOS
  EXPORTS
    DBSegment
  SHARES
    DBCache
  = BEGIN
  
  ROPE: TYPE = Rope.ROPE;
  STREAM: TYPE = IO.STREAM;
  Trans: TYPE = DBCommon.Trans;
  Segment: TYPE = DBCommon.Segment;
  SegmentID: TYPE = DBCommon.SegmentID;
  SegmentIndex: TYPE = DBCommon.SegmentIndex;
  VersionOptions: TYPE = DBCommon.VersionOptions;
  DBPage: TYPE = DBCommon.DBPage;
  NullDBPage: DBPage = DBCommon.NullDBPage;
  CacheHandle: TYPE = DBCache.CacheHandle;
  CacheRecord: TYPE = DBCache.CacheRecord;
  TID: TYPE = DBStorageTID.TID;

  HashTable: TYPE = RECORD [
    nItems: NAT ← 0, -- number of CacheRecords in table
    buckets: SEQUENCE size: NAT OF CacheHandle
    ];

  --      Module state

  initialized: BOOL ← FALSE;
  createIndexesProc: PROC [s: Segment] RETURNS [TID, TID];
  zone: ZONE;
  
  logReads: BOOL← FALSE;  -- causes reads & writes to be logged on UserExec
  
  files: FileTable.Table;
    -- maps SegmentID to interesting information about a file.
  cacheSize: NAT;
    -- number of pages allocated to cache.
  ht: REF HashTable;
    -- Fast map from DBPage to CacheHandle.  ht.size is a power of two.
  hashMask: CARDINAL;
    -- ht.size-1, used in computing hash function.
  lruList: CacheHandle;
    -- points to header node of doubly-linked list of CacheRecords.

  -- Data structure invariants:  A cache record is present in lruList for every
  --allocated cache page; there is no separate "free list".  occupied=FALSE means that
  --the page is free, and that the other cache record fields (readonly, written)
  --are undefined.  occupied=TRUE means that the page contains valid data; a page is
  --stored in ht iff occupied=TRUE.
  -- lruList points to a cache record with no associated cache page; this "header node"
  --makes the circular-list manipulations work properly.  The most-recently used page is
  --lruList.succ, and the least-recently used page is lruList.pred.


  Initialize: PUBLIC PROC [
    z: ZONE,
    nCachePages: NAT,
    useCacheFile: ROPE,
    segmentInitProc: PROC [s: Segment] RETURNS [TID, TID]] = {
    IF initialized THEN ERROR DBEnvironment.InternalError;
    cacheSize ← nCachePages;
    createIndexesProc ← segmentInitProc;
    zone ← z;
    FileTable.Initialize[
      sentinel1: zone.NEW[FileTable.NodeRecord ← [segmentID: NullDBPage]],
      sentinel2: zone.NEW[FileTable.NodeRecord ← [segmentID: NullDBPage]]];
    files ← FileTable.CreateTable[
      header: zone.NEW[FileTable.NodeRecord ← [segmentID: NullDBPage]]];
    InitializeBufferMgr[];
    initialized ← TRUE;
    };

  AttachSegment: PUBLIC PROC [
    fileName: ROPE, s: Segment, segmentIndex: SegmentIndex,
    readonly: BOOL, version: VersionOptions, initializeExistingFile: BOOL,
    nBytesInitial, nBytesPerExtent: INT] = {
    fileNode: FileTable.Node;
    segmentID: SegmentID = SegmentIDFromSegmentIndex[segmentIndex];
    IF NOT initialized THEN ERROR;
    fileNode ← files.Lookup[segmentID];
    IF fileNode # NIL THEN {
      IF fileNode.trans # NIL THEN -- Client declared segment when transaction open on it
        {SIGNAL DBEnvironment.Error[TransactionAlreadyOpen]; RETURN};
      IF fileNode.segmentID # segmentID OR fileNode.segment # s THEN
       ERROR DBEnvironment.Error[MismatchedExistingSegment];
      fileNode.initializeExistingFile ← initializeExistingFile;
      fileNode.fileName← fileName;
      fileNode.version← version;
      fileNode.readonly← readonly;
      fileNode.pagesInitial← MAX[1, DBFile.PagesFromBytes[nBytesInitial]];
      fileNode.pagesPerExtent← MAX[1, DBFile.PagesFromBytes[nBytesPerExtent]];
      RETURN;
      };
    fileNode ← zone.NEW[FileTable.NodeRecord ← [
      segmentID: segmentID,
      segment: s,
      readonly: readonly,
      version: version,
      initializeExistingFile: initializeExistingFile,
      fileName: fileName,
      pagesInitial: MAX[1, DBFile.PagesFromBytes[nBytesInitial]],
      pagesPerExtent: MAX[1, DBFile.PagesFromBytes[nBytesPerExtent]]
      ]];
    files.Insert[nodeToInsert: fileNode, insertKey: segmentID];
    };

  OpenTransaction: PUBLIC PROC [s: Segment, useTrans: Trans, noLog: BOOL] = {
    fileNode: FileTable.Node = FindFile[s];
    IF fileNode=NIL THEN
      {SIGNAL DBEnvironment.Error[SegmentNotDeclared]; RETURN};
    IF fileNode.trans # NIL THEN
      {SIGNAL DBEnvironment.Error[TransactionAlreadyOpen]; RETURN};
    IF useTrans = NIL THEN  {
      server: ROPE = DBFile.FileServerFromFileName[fileNode.fileName];
      useTrans ← DBFile.CreateTransaction[server];
      };
    fileNode.trans ← useTrans;
    OpenSegment[fileNode, noLog];
    };

  OpenSegment: PROC [fileNode: FileTable.Node, noLog: BOOL] = {
    createdFile: BOOL;
    IF fileNode.trans = NIL THEN ERROR;
    [fileNode.openFile, createdFile] ← DBFile.OpenFile[t: fileNode.trans, file: fileNode.fileName,
      version: fileNode.version, discardFileContents: fileNode.initializeExistingFile,
      nPagesInitial: fileNode.pagesInitial, noLog: noLog];
    IF createdFile OR fileNode.initializeExistingFile THEN
     { InitializeSegment[fileNode]; fileNode.initializeExistingFile← FALSE}
    ELSE CheckSegment[fileNode];
    };

  GetSegmentInfo: PUBLIC PROC [
    s: Segment] RETURNS [filePath: ROPE, number: NAT, trans: Trans] = {
    fileNode: FileTable.Node← FindFile[s];
    IF fileNode=NIL THEN RETURN[NIL, 0, NIL]
    ELSE RETURN[
      fileNode.fileName,
      SegmentIndexFromSegmentID[fileNode.segmentID],
      fileNode.trans]
    };

  FinishTransaction: PUBLIC PROC [t: Trans, abort: BOOL, continue: BOOL] = {
    CloseFileIfNoTransaction: SAFE PROC [f: FileTable.Node]
      RETURNS [stop: BOOL] = TRUSTED {
      IF f.trans = t THEN { f.openFile ← NIL;  f.trans ← NIL };
      RETURN [stop: FALSE];
      };
    IF NOT abort AND NOT WriteoutCache[t] THEN ERROR; --<PUBLIC>
    DBFile.FinishTransaction[t, abort, continue];
    IF abort OR NOT continue THEN {
      AbortCache[t];
      files.EnumerateIncreasing[CloseFileIfNoTransaction];
      };
    };

  SegmentFromTID: PUBLIC PROC [tid: TID] RETURNS [s: Segment] = {
    fileNode: FileTable.Node = files.Lookup[InlineSegmentIDFromDBPage[tid]];
    IF fileNode # NIL THEN RETURN [fileNode.segment] ELSE RETURN [NIL];
    };

  IsValidTID: PUBLIC PROC [tid: TID] RETURNS [valid: BOOL] = {
    fileNode: FileTable.Node = files.Lookup[InlineSegmentIDFromDBPage[tid]];
    RETURN [fileNode # NIL AND fileNode.openFile # NIL];
    };

  SegmentIDFromSegment: PUBLIC PROC [s: Segment] RETURNS [SegmentID] = {
    fileNode: FileTable.Node = FindFile[s];
    IF fileNode = NIL THEN
      ERROR DBEnvironment.Error[SegmentNotDeclared]; 
    IF fileNode.openFile = NIL THEN
      ERROR DBEnvironment.Error[TransactionNotOpen];
    RETURN [fileNode.segmentID];
    };

  FindFile: PROC [s: Segment] RETURNS [fileNode: FileTable.Node] = {
    -- If efficiency of this ever becomes important, we can attach info to prop list of atom.
    StopOnMatch: SAFE PROC [f: FileTable.Node]
      RETURNS [stop: BOOL] = TRUSTED {
      IF f.segment = s THEN { fileNode ← f; RETURN [stop: TRUE] };
      RETURN [stop: FALSE];
      };
    fileNode ← NIL;
    files.EnumerateIncreasing[StopOnMatch];
    RETURN [fileNode];
    };

  EnumerateSegments: PUBLIC PROC [
    enumProc: PROC [s: Segment, segmentIndex: SegmentIndex] RETURNS [stop: BOOL]] = {
    f: FileTable.Node ← files.LookupSmallest[];
    WHILE f # NIL DO
      IF enumProc[f.segment, SegmentIndexFromSegmentID[f.segmentID]].stop THEN EXIT;
      f ← files.LookupNextLarger[f.segmentID];
      ENDLOOP;
    };

  FilePageFromDBPage: PROC [dbPage: DBPage]
    RETURNS [readonly: BOOL, file: DBCommon.OpenFileHandle, page: CARDINAL] = {
    segmentID: SegmentID;
    fileNode: FileTable.Node;
    [segmentID, page] ← DecomposeDBPage[dbPage];
    fileNode ← files.Lookup[segmentID];
    IF fileNode = NIL OR (file ← fileNode.openFile) = NIL THEN
      ERROR DBEnvironment.Error[TransactionNotOpen];
    RETURN [fileNode.readonly, file, page];
    };

  --  Procs that understand the format of the segment head page.

  InitializeSegment: PROC [fileNode: FileTable.Node] = {
    segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
    segmentHeadHandle: CacheHandle;
    fileSize: CARDINAL = DBFile.GetSize[fileNode.openFile];
    fileNode.readonly ← FALSE;
    [segmentHeadHandle, segmentHead] ← DBSegment.WritePage[fileNode.segmentID, NIL];
    segmentHead↑ ← [
      segmentID: fileNode.segmentID,
      allocator: [
        firstPageInBlock: ConsDBPage[fileNode.segmentID, 1],
        nPagesInBlock: fileSize-1,
        pagesPerExtent: fileNode.pagesPerExtent]];
    ConvertUnsafe.AppendRope[
      to: LOOPHOLE[@(segmentHead.name)], from: Atom.GetPName[fileNode.segment]];
    [segmentHead.index1, segmentHead.index2] ← createIndexesProc[fileNode.segment];
    DBSegment.UnlockPage[segmentHeadHandle];
    };

  CheckSegment: PROC [fileNode: FileTable.Node] = {
    segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
    segmentHeadHandle: CacheHandle;
    [segmentHeadHandle, segmentHead] ← DBSegment.ReadPage[fileNode.segmentID, NIL];
    IF segmentHead.tag # DBStoragePagetags.AMap OR
      segmentHead.softwareCompatibilityVersion #
       DBCommon.softwareCompatibilityVersion OR
      segmentHead.seal # DBSegmentPrivate.Seal THEN
        ERROR DBEnvironment.Error[MismatchedExistingSegment];
    IF NOT Rope.Equal[
      s1: Atom.GetPName[fileNode.segment],
      s2: ConvertUnsafe.ToRope[LOOPHOLE[@(segmentHead.name)]],
      case: FALSE] THEN ERROR DBEnvironment.Error[MismatchedExistingSegment];
    DBSegment.UnlockPage[segmentHeadHandle];
    };

  RootIndicesFromSegment: PUBLIC PROC [s: Segment] RETURNS [index1, index2: TID] = {
    segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
    segmentHeadHandle: CacheHandle;
    fileNode: FileTable.Node = FindFile[s];
    IF fileNode=NIL THEN ERROR DBEnvironment.Error[SegmentNotDeclared];
    [segmentHeadHandle, segmentHead] ← DBSegment.ReadPage[fileNode.segmentID, NIL];
    IF segmentHead.tag # DBStoragePagetags.AMap THEN
      ERROR DBEnvironment.Error[MismatchedExistingSegment];
    index1 ← segmentHead.index1;  index2 ← segmentHead.index2;
    DBSegment.UnlockPage[segmentHeadHandle];
    RETURN [index1, index2];
    };

  AllocPage: PUBLIC PROC [s: DBPage] RETURNS [allocatedPage: DBPage,
    allocatedPageHandle: CacheHandle, allocatedPagePtr: LONG POINTER] = {
    segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
    segmentHeadHandle: CacheHandle;
    DBStats.Inc[SegmentAllocPage];
    [segmentHeadHandle, segmentHead] ← DBSegment.WritePage[s, NIL];
    IF segmentHead.tag # DBStoragePagetags.AMap THEN
      ERROR DBEnvironment.Error[MismatchedExistingSegment];
    IF (allocatedPage ← segmentHead.allocator.freeList) # NullDBPage THEN {
      -- The segment's free list is nonempty, so return a page from it.
      allocatedFreeListPagePtr: LONG POINTER TO DBSegmentPrivate.FreeListPage;
      [allocatedPageHandle, allocatedFreeListPagePtr] ← DBSegment.WritePage[allocatedPage, NIL];
      IF allocatedFreeListPagePtr.tag # DBStoragePagetags.Free THEN ERROR DBEnvironment.InternalError;
      segmentHead.allocator.freeList ← allocatedFreeListPagePtr.next;
      allocatedPagePtr ← allocatedFreeListPagePtr;
      }
    ELSE {
      -- The segment's free list is empty, so allocate from block allocator, perhaps extending file.
      IF segmentHead.allocator.firstPageInBlock = NullDBPage THEN {
        fileNode: FileTable.Node ← files.Lookup[s];
        size: CARDINAL ← DBFile.GetSize[fileNode.openFile];
        DBFile.SetSize[fileNode.openFile, size + segmentHead.allocator.pagesPerExtent];
        segmentHead.allocator.firstPageInBlock ← ConsDBPage[s, size];
        segmentHead.allocator.nPagesInBlock ← segmentHead.allocator.pagesPerExtent;
        };
      allocatedPage ← segmentHead.allocator.firstPageInBlock;
      IF (segmentHead.allocator.nPagesInBlock ← segmentHead.allocator.nPagesInBlock-1) = 0 THEN
        segmentHead.allocator.firstPageInBlock ← NullDBPage
      ELSE segmentHead.allocator.firstPageInBlock ←
        allocatedPage + DBStorageTID.DBPageIncrement;
      [allocatedPageHandle, allocatedPagePtr] ← NewPage[allocatedPage];
      };
    DBSegment.UnlockPage[segmentHeadHandle];
    LOOPHOLE[allocatedPagePtr, LONG POINTER TO DBStoragePagetags.PageHeader].pageTag ←
      DBStoragePagetags.Unused;
    RETURN[allocatedPage, allocatedPageHandle, allocatedPagePtr];
  };--AllocPage

  FreePage: PUBLIC PROC [s: DBPage, freePg: DBPage, freeHint: CacheHandle] = {
    segmentHead: LONG POINTER TO DBSegmentPrivate.SegmentHeadPage;
    segmentHeadHandle: CacheHandle;
    free: LONG POINTER TO DBSegmentPrivate.FreeListPage;
    DBStats.Inc[SegmentFreePage];
    IF s # InlineSegmentIDFromDBPage[freePg] THEN ERROR DBEnvironment.InternalError;
    [segmentHeadHandle, segmentHead] ← DBSegment.WritePage[s, NIL];
    IF segmentHead.tag # DBStoragePagetags.AMap THEN ERROR DBEnvironment.InternalError;
    IF DBSegment.LockCount[freePg, freeHint] # 1 THEN ERROR DBEnvironment.InternalError;
    free ← DBSegment.ObtainCoreLoc[freePg, freeHint];
    IF free.tag = DBStoragePagetags.Free OR free.tag = DBStoragePagetags.AMap THEN
      ERROR DBEnvironment.InternalError;
    DBSegment.WriteLockedPage[freeHint];
    free↑ ← [next: segmentHead.allocator.freeList];
    segmentHead.allocator.freeList ← freePg;
    DBSegment.UnlockPage[freeHint];
    DBSegment.UnlockPage[segmentHeadHandle];
  };--FreePage

  --  Procs that understand the bit layout of a DBPage.

  SegmentIDFromSegmentIndex: PROC [segmentIndex: SegmentIndex]
    RETURNS [SegmentID] = {
    segmentID: Inline.LongNumber.num = [num[lowbits: 0,
      highbits: Inline.BITSHIFT[value: segmentIndex, count: 7]]];
    RETURN [LOOPHOLE[segmentID]];
    };

  SegmentIndexFromSegmentID: PROC [segmentID: SegmentID]
    RETURNS [SegmentIndex] = {
    RETURN [Inline.BITSHIFT[
      value: LOOPHOLE[segmentID, Inline.LongNumber.num].highbits,
      count: -7]];
    };

  SegmentIDFromDBPage: PUBLIC PROC [dbPage: DBPage]
    RETURNS [SegmentID] = {
    RETURN [InlineSegmentIDFromDBPage[dbPage]]
    };

  InlineSegmentIDFromDBPage: PROC [dbPage: DBPage]
    RETURNS [SegmentID] = INLINE {
    segmentID: Inline.LongNumber.num = [num[lowbits: 0,
      highbits: Inline.BITAND[LOOPHOLE[dbPage, Inline.LongNumber.num].highbits, 177600B]]];
    RETURN [LOOPHOLE[segmentID]];
    };

  DecomposeDBPage: PROC [dbPage: DBPage]
    RETURNS [SegmentID, CARDINAL] =  {
    segmentID1: Inline.LongNumber.num = [num[lowbits: 0,
      highbits: Inline.BITAND[LOOPHOLE[dbPage, Inline.LongNumber.num].highbits, 177600B]]];
    filePage: CARDINAL ← Inline.BITOR[
      Inline.BITSHIFT[
        value: LOOPHOLE[dbPage, Inline.LongNumber.num].highbits, count: 10],
      Inline.BITSHIFT[
        value: LOOPHOLE[dbPage, Inline.LongNumber.num].lowbits, count: -6]];
    RETURN [LOOPHOLE[segmentID1], filePage];
    };

  ConsDBPage: PROC [segmentID: SegmentID, filePage: CARDINAL]
    RETURNS [dbPage: DBPage] = {
    --  Inverse of DecomposeDBPage
    dbPage1: Inline.LongNumber.num = [num[
      highbits: Inline.BITOR[
        LOOPHOLE[segmentID, Inline.LongNumber.num].highbits,
        Inline.BITSHIFT[value: filePage, count: -10]],
      lowbits: Inline.BITSHIFT[value: filePage, count: 6]]];
    RETURN[LOOPHOLE[dbPage1]];
    };

  Hash: PROC [dbPage: DBPage] RETURNS [NAT] = INLINE { RETURN [
    Inline.BITAND[
      hashMask,
      Inline.BITXOR[
        Inline.BITSHIFT[
          value: LOOPHOLE[dbPage, Inline.LongNumber.num].highbits,
          count: -7],
        Inline.BITSHIFT[
          value: LOOPHOLE[dbPage, Inline.LongNumber.num].lowbits,
          count: -6]]]];
    };

  -- Buffer manager

  invalidKey: DBPage = 37777777777B;
    -- doesn't match any true DBPage (since all DBPages have low bits 0).

  InitializeBufferMgr: PROC [] = {
    -- Number of hash buckets = smallest power of 2 not less than cacheSize.
    hashBuckets: CARDINAL ← 1;
    temp: CARDINAL ← Inline.BITSHIFT[cacheSize, -1];
    UNTIL temp=0 DO
      hashBuckets ← Inline.BITSHIFT[hashBuckets, 1];
      temp ← Inline.BITSHIFT[temp, -1];
      ENDLOOP;
    ht ← zone.NEW[HashTable[hashBuckets] ← [nItems: 0, buckets: ]];
    hashMask ← hashBuckets-1;
    -- Create cacheRecords, format them into an lru list.
    lruList ← zone.NEW[CacheRecord ← [dbPage: invalidKey, pagePtr: NIL]];
    lruList.pred ← lruList.succ ← lruList;
    FOR i: NAT IN [0 .. (cacheSize+15)/16) DO
      AddCachePages[16];
      ENDLOOP;
    };

  ReadPage: PUBLIC PROC [p: DBPage, pHint: CacheHandle]
    RETURNS [CacheHandle, LONG POINTER] = {
    IF logReads THEN
      DBCommon.GetDebugStream[].PutChar['#];
    DBStats.Inc[CacheReadOrWrite];
    IF pHint # NIL AND p = pHint.dbPage THEN GOTO ReturnPage;
    -- hint is wrong, so search hash table
    pHint ← Lookup[p];
    IF pHint # NIL THEN GO TO ReturnPage;
    -- page is not present in cache, read it in
    IF logReads THEN
      DBCommon.GetDebugStream[].PutChar['!];
    DBStats.Inc[CacheMiss];
    pHint ← GetUnoccupiedPage[];
    { -- We expect ERRORs to be raised from DBFile.ReadFilePage
      file: DBCommon.OpenFileHandle;  page: CARDINAL;
      [pHint.readonly, file, page] ← FilePageFromDBPage[p];
      DBFile.ReadFilePage[file, page, pHint.pagePtr]
      };
    pHint.dbPage ← p;  Insert[pHint];
    pHint.lockCount ← 0;
    GOTO ReturnPage;
    EXITS
      ReturnPage => {
        MakeMRU[pHint];
        pHint.lockCount ← pHint.lockCount + 1;
        RETURN[pHint, pHint.pagePtr]
      }
    };

  WritePage: PUBLIC PROC [p: DBPage, pHint: CacheHandle]
    RETURNS [CacheHandle, LONG POINTER] = {
    DBStats.Inc[CacheReadOrWrite];
    IF pHint # NIL AND p = pHint.dbPage THEN GO TO ReturnPage;
    -- hint is wrong, so search hash table
    pHint ← Lookup[p];
    IF pHint # NIL THEN GOTO ReturnPage;
    -- page is not present in cache, read it in
    DBStats.Inc[CacheMiss];
    pHint ← GetUnoccupiedPage[];
    { -- We expect ERRORs to be raised from DBFile.ReadFilePage
      file: DBCommon.OpenFileHandle;  page: CARDINAL;
      [pHint.readonly, file, page] ← FilePageFromDBPage[p];
      IF pHint.readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
      DBFile.ReadFilePage[file, page, pHint.pagePtr]
      };
    pHint.dbPage ← p;  Insert[pHint];
    pHint.lockCount ← 0;
    GOTO ReturnPage;
    EXITS
      ReturnPage => {
        MakeMRU[pHint];
        pHint.lockCount ← pHint.lockCount + 1;
        pHint.written ← TRUE;
        RETURN[pHint, pHint.pagePtr]
      }
    };

  NewPage: PROC [p: DBPage]
    RETURNS [CacheHandle, LONG POINTER] = {
    pHint: CacheHandle ← Lookup[p];
    IF pHint # NIL THEN ERROR DBEnvironment.InternalError; -- [NotaNewPage]
    pHint ← GetUnoccupiedPage[];
    {
      [pHint.readonly, , ] ← FilePageFromDBPage[p];
      IF pHint.readonly THEN DBEnvironment.Error[WriteNotAllowed];
      };
    pHint.dbPage ← p;  Insert[pHint];
    MakeMRU[pHint];
    pHint.lockCount ← 1;
    pHint.written ← TRUE;
    RETURN[pHint, pHint.pagePtr]
    };

  WriteLockedPage: PUBLIC PROC [pValidHint: CacheHandle] = {
    IF pValidHint = NIL OR pValidHint.lockCount = 0 THEN
      ERROR DBEnvironment.InternalError; -- [AlreadyUnlocked]
    IF pValidHint.readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
    pValidHint.written ← TRUE;
    };

  LockCount: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle]
    RETURNS [CARDINAL] = {
    IF pValidHint = NIL OR pValidHint.dbPage # p OR pValidHint.lockCount = 0 THEN
      ERROR DBEnvironment.InternalError; -- BadCacheHandle
    RETURN[pValidHint.lockCount];
    };

  ObtainCoreLoc: PUBLIC PROC [p: DBPage, pValidHint: CacheHandle]
    RETURNS [LONG POINTER] = {
    IF pValidHint = NIL OR pValidHint.dbPage # p OR pValidHint.lockCount = 0 THEN
      ERROR DBEnvironment.InternalError; -- [BadCacheHandle];
    RETURN[pValidHint.pagePtr];
    };

  UnlockPage: PUBLIC PROC [pValidHint: CacheHandle] = {
    IF pValidHint = NIL OR pValidHint.lockCount = 0 THEN
      ERROR DBEnvironment.InternalError; -- [AlreadyUnlocked];
    pValidHint.lockCount ← pValidHint.lockCount - 1;
    };

  WriteoutCache: PROC [t: Trans] RETURNS [success: BOOL] = {
    WritePageOutIfUsesTrans: PROC [cacheHandle: CacheHandle] = {
      IF cacheHandle.written THEN {
        fileNode: FileTable.Node =
          files.Lookup[InlineSegmentIDFromDBPage[cacheHandle.dbPage]];
        IF fileNode.trans = t THEN WritePageOut[cacheHandle];
        };
      };
    EnumerateHashTable[WritePageOutIfUsesTrans];
    RETURN[TRUE];
    };

  AbortCache: PROC [t: Trans] = {
    DeleteIfUsesTrans: PROC [cacheHandle: CacheHandle] = {
      IF cacheHandle.occupied THEN {
        fileNode: FileTable.Node =
          files.Lookup[InlineSegmentIDFromDBPage[cacheHandle.dbPage]];
        IF fileNode.trans = t THEN Delete[cacheHandle];
        };
      };
    EnumerateLRUList[DeleteIfUsesTrans];
    };

  WritePageOut: PROC [cacheHandle: CacheHandle] = {
    readonly: BOOL;  file: DBCommon.OpenFileHandle;  page: CARDINAL;
    DBStats.Inc[CacheWritePageBack];
    [readonly, file, page] ← FilePageFromDBPage[cacheHandle.dbPage];
    IF readonly THEN ERROR DBEnvironment.Error[WriteNotAllowed];
    DBFile.WriteFilePage[file, page, cacheHandle.pagePtr];
    cacheHandle.written ← FALSE;
    };

  -- Buffer manager LRU list

  AddCachePages: PROC [nPages: NAT] = {
    -- creates nPages new cache records, in the LRU position.
    curCachePage: LONG POINTER ← RTOS.GetPermanentDataPages[
      nPages: nPages*DBCommon.PagesPerDBPage, createUniformSwapUnits: TRUE];
    FOR i: NAT IN [0..nPages) DO
      p: CacheHandle ← zone.NEW[CacheRecord ← [
        dbPage: invalidKey, pagePtr: curCachePage]];
      curCachePage ← curCachePage + DBCommon.WordsPerPage;
      p.pred ← lruList.pred;  p.succ ← lruList;
      lruList.pred.succ ← p;  lruList.pred ← p;
      ENDLOOP;
    };

  MakeMRU: PROC [x: CacheHandle] = INLINE {
    -- Assumes that x is part of lruList.  Deletes x from its current position and makes
    --it first in lruList.
    x.pred.succ ← x.succ; x.succ.pred ← x.pred;
    x.succ ← lruList.succ;  x.pred ← lruList;
    lruList.succ.pred ← x; lruList.succ ← x;
    };

  EnumerateLRUList: PROC [procToCall: PROC[CacheHandle]] = {
    -- Calls procToCall for each record in LRUList, in MRU to LRU order.
    -- procToCall may not perform MakeMRU; use EnumerateHashTable if you want this.
    -- Note that procToCall will be called for both occupied and unoccupied pages.
    FOR p: CacheHandle ← lruList.succ, p.succ UNTIL p = lruList DO
      procToCall[p];
    ENDLOOP;
  };

  GetUnoccupiedPage: PROC RETURNS [CacheHandle] = {
    -- Obtain an empty cache page.  It will be the least-recently used page that
    --unoccupied or occupied but unlocked.  The page is returned with Key = garbage,
    --Page = ptr to a cache page, occupied = FALSE, written = FALSE,
    --and entered on lruList in no particular location.
    cacheHandle: CacheHandle;
    FOR cacheHandle ← lruList.pred, cacheHandle.pred UNTIL cacheHandle = lruList DO
      IF ~cacheHandle.occupied THEN GOTO FoundUnoccupiedPage;
      IF cacheHandle.lockCount = 0 THEN GOTO MakePageUnoccupied;
    REPEAT
      FoundUnoccupiedPage => NULL; -- ~cacheHandle.occupied
      MakePageUnoccupied => { -- cacheHandle.occupied AND cacheHandle.lockCount=0
        IF cacheHandle.written THEN WritePageOut[cacheHandle];
        Delete[cacheHandle];
        };
      FINISHED => ERROR DBEnvironment.Fatal[AllCachePagesLocked]
    ENDLOOP;
    cacheHandle.written ← FALSE;
    RETURN[cacheHandle];
    };

  -- Buffer manager hash table

  EnumerateHashTable: PROC[procToCall: PROC[CacheHandle]] = {
    -- Calls procToCall for each record in table.
    -- procToCall may not perform Delete; use EnumerateLRUList if you want this.
    FOR i: NAT IN [0..ht.size) DO
      FOR p: CacheHandle ← ht.buckets[i], p.hashChain UNTIL p = NIL DO
        procToCall[p];
      ENDLOOP;
    ENDLOOP;
  };

  Lookup: PROC[dbPage: DBPage] RETURNS[CacheHandle] = {
    i: CARDINAL ← Hash[dbPage];
    p: CacheHandle ← ht.buckets[i];
    DBStats.Inc[CacheHTLookup];
    WHILE p # NIL DO
      IF p.dbPage = dbPage THEN RETURN[p];
      DBStats.Inc[CacheHTConflictInLookup];
      p ← p.hashChain;
    ENDLOOP;
    RETURN[NIL];
  };

  Insert: PROC[x: CacheHandle] = {
    i: CARDINAL ← Hash[x.dbPage];
    -- Make sure its not there already...
    p: CacheHandle ← ht.buckets[i];
    WHILE p # NIL DO
      IF p.dbPage = x.dbPage THEN GOTO FoundDuplicate;
      p ← p.hashChain;
    REPEAT
      FoundDuplicate => ERROR DBEnvironment.InternalError; -- [AlreadyInHashTable];
      FINISHED => {
        x.hashChain ← ht.buckets[i];
        ht.buckets[i] ← x;
        ht.nItems ← ht.nItems + 1;
        x.occupied ← TRUE;
      };
    ENDLOOP;
  };

  Delete: PROC[x: CacheHandle] = {
    i: CARDINAL ← Hash[x.dbPage];
    p: CacheHandle ← ht.buckets[i];
    IF p = x THEN {ht.buckets[i] ← x.hashChain; GOTO Deleted};
    WHILE p # NIL DO
      IF p.hashChain = x THEN {p.hashChain ← x.hashChain; GOTO Deleted};
      p ← p.hashChain;
    REPEAT
      FINISHED => ERROR DBEnvironment.InternalError; -- [NotPresentInHashTable];
    ENDLOOP;
  EXITS
    Deleted => {
      x.dbPage ← invalidKey;
      x.occupied ← FALSE;
      x.hashChain ← NIL;
      ht.nItems ← ht.nItems - 1;
      };
  };

  -- Buffer manager debugging

  AssertionFailed: SIGNAL = CODE;
  Assert: PROC [assertion: BOOL] = {IF ~assertion THEN SIGNAL AssertionFailed; };
    -- So we can Proceed (in debugger) from failure.

  CheckState: PUBLIC PROC [doPrinting, printOnlyLockedPages: BOOL] = {
    nOccupiedItems, checkOccupiedItems: CARDINAL ← 0;
    nLockedItems: CARDINAL ← 0;
    nFreeItems: CARDINAL ← 0;

    PageHeader: TYPE = MACHINE DEPENDENT RECORD[
      pageTag: CARDINAL [0..256),
      dontCare: CARDINAL [0..256)];
    IOref: PROC[p: REF ANY] RETURNS[IO.Value] = INLINE {
      RETURN[IO.card[LOOPHOLE[p, LONG CARDINAL]]]};
    IOptr: PROC[p: LONG POINTER] RETURNS[IO.Value] = INLINE {
      RETURN[IO.card[LOOPHOLE[p, LONG CARDINAL]]]};

    out: IO.Handle ← IF doPrinting THEN DBCommon.GetDebugStream[] ELSE NIL;

    CheckEntry1: PROC[p: CacheHandle] = {
      Assert[p.pred.succ = p];
      Assert[p.succ.pred = p];
      Assert[p.pagePtr # NIL];
      IF ~p.occupied THEN {
        nFreeItems ← nFreeItems + 1;
        Assert[p.dbPage = invalidKey];
        RETURN };
      Assert[p.dbPage # invalidKey];
      nOccupiedItems ← nOccupiedItems + 1;
      IF p.lockCount > 0 THEN nLockedItems ← nLockedItems + 1;
      IF p.written THEN Assert[NOT p.readonly];
      IF ~doPrinting OR (p.lockCount = 0 AND printOnlyLockedPages) THEN RETURN;
      {pageTag: CARDINAL = LOOPHOLE[p.pagePtr, LONG POINTER TO PageHeader].pageTag;
        out.PutF["%11bB %11bB %11bB  %3d",
          IOref[p], IO.card[p.dbPage], IOptr[p.pagePtr], IO.card[pageTag]];
        out.PutF["    %3d      %g   %g*n", 
          IO.card[p.lockCount], IO.bool[p.written], IO.bool[p.readonly]];
       };
      };
    CheckEntry2: PROC[p: CacheHandle] = {
      Assert[p.occupied];
      checkOccupiedItems ← checkOccupiedItems + 1;
      };
    IF doPrinting THEN {
      out.PutF["Cache size %g pages.\n", IO.card[cacheSize]];
      out.PutRope[
        "    cacheHdl      dbPage      pagePtr    tag  lockCnt  written readonly\n"];
      };
    EnumerateLRUList[CheckEntry1];
    EnumerateHashTable[CheckEntry2];
    IF doPrinting THEN
      out.PutF["%g pages occupied, %g are locked\n",
        IO.card[nOccupiedItems], IO.card[nLockedItems]];
    Assert[nOccupiedItems = checkOccupiedItems];
    Assert[ht.nItems = checkOccupiedItems];
    Assert[nFreeItems + nOccupiedItems = cacheSize];
    };

END.--DBSegmentImpl


CHANGE LOG

Created by MBrown on December 10, 1979  5:09 PM

Changed by MBrown on December 1, 1982 9:41 am
-- Total rewrite for multiple segments, merging in DBCacheImpl.  No more support for module
--finalization.  Discarded >3 pages of change log.

Changed by Cattell on January 16, 1983 2:35 pm
-- Added some more meaningful signals for various client errors for segments and transactions,
--in conjunction with new DBEnvironment interface replacing DBException.

Changed by Cattell on January 17, 1983 8:21 pm
-- AttachSegment didn't put readOnly and version into fileNode when segment re-declared, so couldn't re-open transaction with version to create new file.

Changed by Cattell on January 19, 1983 10:57 am
-- The code didn't check for FindFile returning NIL in a couple places, resulting in address faults if the segment was not declared.  Put in checks and signal generation.

Changed by Willie-Sue on February 3, 1983
-- Added noLog arg to OpenTransaction and OpenSegment

Changed by MBrown on February 25, 1983 1:39 pm
-- Fixed bug in ReadPage, WritePage, NewPage: buffer left in "occupied" state after
--error (e.g. attempt to read after transaction abort).

Changed by Cattell & Donahue on May 19, 1983 2:26 pm
-- Changed AttachSegment so can re-call it with new file name for same segment atom.