-- LogBasicCoreImpl.mesa
-- Implements stream log reading, log tail management, random log reading.
-- Last edited by
--   MBrown on January 30, 1984 5:50:24 pm PST

  DIRECTORY
    AlpineEnvironment,
    AlpineInternal,
    AlpineInline,
    FileMap,
    FilePageMgr,
    Log,
    LogBasic,
    LogInline,
    LogBasicInternal,
    LogRep,
    PrincOpsUtils,
    Process,
    RuntimeError;

LogBasicCoreImpl: MONITOR
  IMPORTS
    AlpineInline,
    FileMap,
    FilePageMgr,
    Log,
    LogInline,
    PrincOpsUtils,
    Process,
    RuntimeError
  EXPORTS
    LogBasic,
    LogBasicInternal =
  BEGIN
  PageCount: TYPE = AlpineEnvironment.PageCount;
  PageNumber: TYPE = AlpineEnvironment.PageNumber;
  PageRun: TYPE = AlpineEnvironment.PageRun;
  FileID: TYPE = AlpineEnvironment.FileID;
  VolumeID: TYPE = AlpineEnvironment.VolumeID;
  WordNumber: TYPE = LogBasic.WordNumber;
  WordCount: TYPE = LogBasic.WordCount;
  RecordID: TYPE = Log.RecordID;
  nullRecordID: RecordID = Log.nullRecordID;
  Comparison: TYPE = Log.Comparison;
  wordsPerPage: CARDINAL = AlpineEnvironment.wordsPerPage;

  CallerProgrammingError: ERROR = CODE;
  InternalProgrammingError: ERROR = CODE;

  Header: PROC [p: LONG POINTER] RETURNS [LONG POINTER TO LogRep.Header] = INLINE {
    RETURN [LOOPHOLE[p]] };

  -- Static log file state

  logFileHandle: FileMap.Handle;
  logFileSize: PageCount;
  logFileLength: WordCount;
  logChunkSize: INT = 4;


  PageRunStartingWith: PROC [p: PageNumber] RETURNS [pr: PageRun] = INLINE {
    RETURN [[firstPage: p, count: MIN[logFileSize-p, logChunkSize]]] };

  PageFollowing: PROC [pr: PageRun] RETURNS [p: PageNumber] = INLINE {
    p ← pr.firstPage+pr.count;
    RETURN [IF p = logFileSize THEN 0 ELSE p] };

  IsMemberOfPageRun: PROC [p: PageNumber, pr: PageRun] RETURNS [BOOL] = INLINE {
    RETURN [p IN [pr.firstPage .. pr.firstPage+pr.count)] };

  EstablishLogFile: PUBLIC PROC [
    logVolume: AlpineEnvironment.VolumeID, logFile: AlpineEnvironment.FileID] = {
    -- LogBasic.EstablishLogFile
    logFileHandle ← FileMap.Register[volumeID: logVolume, fileID: logFile];
    logFileSize ← FilePageMgr.GetSize[logFileHandle];
    logFileLength ← AlpineInline.WordsFromPages[logFileSize];
    };

  LogFileSize: PUBLIC PROC [] RETURNS [PageCount] = {
    RETURN[logFileSize];
    };

  -- Log tail.

  -- This data structure keeps status information about log chunks near the log tail. 
  --Specifically, any chunk that may be dirty but which has not yet been forced out
  --must be present in the table.  The table is managed as a queue which always has
  --at least one element in it.  When any chunk completes a forceout, its slot in the
  --table is marked free.  A slot is reused if it is free and corresponds to the earliest
  --log page in the table.

  -- Types
  ChunkTableSize: CARDINAL = 4;
  ChunkTableIndex: TYPE = [0..ChunkTableSize);
  ChunkTableRec: TYPE = RECORD [
    chunk: FilePageMgr.VMPageSet ← FilePageMgr.nullVMPageSet,
      -- Contains a PageRun (PageNumber, Count) and the LONG POINTER to the mapped pages.
    recordID: Log.RecordID ← Log.nullRecordID,
      -- ID of first word of first page of chunk.
    forceInProgress: BOOL ← FALSE
      -- Set TRUE before forking process to force final update to the chunk; set FALSE when
      --this process finishes.  So if chunk is not the tail and NOT forceInProgress, it is eligible
      --for re-use.
    ];
  NextChunkTableIndex: PROC [i: ChunkTableIndex] RETURNS [ChunkTableIndex] = INLINE {
    RETURN [IF i = LAST[ChunkTableIndex] THEN FIRST[ChunkTableIndex] ELSE SUCC[i]] };

  -- State
  -- Variables below are only updated by AdvanceChunk, except that the TRUE -> FALSE
  --transition of forceInProgress for a chunk is made by ForceChunk
  --(forked by AdvanceChunk), and oldestFilePage is updated by Release.
  chunkTable: ARRAY ChunkTableIndex OF ChunkTableRec;
  tail: ChunkTableIndex;
  completion: CONDITION;
    -- Wait here for a forceInProgress = FALSE in chunkTable.
  curVersion: LogRep.PageVersion;
    -- Version bit for pages in tail chunk
  nextTailChunkID: Log.RecordID;
    -- RecordID of first word of first page of NEXT tail chunk.  Used to bounds-check RecordIDs
    --presented to Force and Get.  Redundant; can be computed from chunkTable[tail] .
  oldestFilePage: PageNumber;
    -- The least-recently written page of the log file that cannot be overwritten.
  enforceBufferPages: BOOL;
  nBufferPages: INT = 32;
    -- If enforceBufferPages, then do not allow writing within nBufferPages pages
    --of oldestFilePage

  -- Operations on the log tail.

  AdvanceChunk: PUBLIC ENTRY PROC [] RETURNS [
    version: LogRep.PageVersion, pagesInChunk: INT, firstPagePtr: LONG POINTER] = {
    -- LogBasicInternal.AdvanceChunk
    -- Caller must hold the log tail monitor; hence only one caller at a time.
    newTail: ChunkTableIndex;
    -- Fork a process to write the current tail chunk.
    chunkTable[tail].forceInProgress ← TRUE;
    Process.Detach[FORK ForceChunk[index: tail]];
    -- Wait, if necessary, for room in the chunk table to allocate the next tail chunk.
    DO
      newTail ← NextChunkTableIndex[tail];
      IF NOT chunkTable[newTail].forceInProgress THEN EXIT;
      WAIT completion;
      ENDLOOP;
    IF chunkTable[newTail].chunk.pages # NIL THEN
      FilePageMgr.ReleaseVMPageSet[
        vMPageSet: chunkTable[newTail].chunk, releaseState: clean, keep: TRUE];
    {
    -- Determine the page number of the first page, and the number of pages, in
    --the next chunk.
    nextPage: PageNumber ← PageFollowing[chunkTable[tail].chunk.pageRun];
    IF nextPage = 0 THEN curVersion ← 1 - curVersion;
    -- Get new chunk, update chunk table
    tail ← newTail;
    chunkTable[tail].recordID ← nextTailChunkID;
    chunkTable[tail].chunk ← FilePageMgr.UseLogPages[
      fileHandle: logFileHandle, pageRun: PageRunStartingWith[nextPage]];
    { -- hack to avoid completely filling up log file
      biasedOldestFilePage: PageNumber ← oldestFilePage - nBufferPages;
      IF biasedOldestFilePage < 0 THEN
        biasedOldestFilePage ← biasedOldestFilePage + logFileSize;
      IF (enforceBufferPages AND
        IsMemberOfPageRun[biasedOldestFilePage, chunkTable[tail].chunk.pageRun]) OR
        IsMemberOfPageRun[oldestFilePage, chunkTable[tail].chunk.pageRun]
        THEN ERROR Log.WriteFailed;
      };
    nextTailChunkID ← LogInline.AddC[r: nextTailChunkID,
      words: AlpineInline.ShortWordsFromPages[chunkTable[tail].chunk.pageRun.count]];
    };
    RETURN [
      version: curVersion,
      pagesInChunk: chunkTable[tail].chunk.pageRun.count,
      firstPagePtr: chunkTable[tail].chunk.pages];
    };

  ForceChunk: PROC [index: ChunkTableIndex] = {
    -- On entry, chunkTable[index].forceInProgress.
    -- Force the chunk, and set forceInProgress ← false.
    -- Note: do NOT hold monitor during force out.
    BroadcastCompletion: ENTRY PROC [] =  INLINE {
      chunkTable[index].forceInProgress ← FALSE;
      BROADCAST completion;
      };
    FilePageMgr.ForceOutVMPageSet[chunkTable[index].chunk];
    BroadcastCompletion[];
    };

  ForceTo: PUBLIC PROC [followingRecord: RecordID] = {
    -- LogBasicInternal.ForceTo
    -- If the word before followingRecord is in the current tail chunk, force that chunk
    --(using this process).  Then wait for other chunks that precede followingRecord to
    --go out.
    TailForceRequired: ENTRY PROC []
      RETURNS [doForce: BOOL, chunk: FilePageMgr.VMPageSet] = {
      -- followingRecord must not be later than page about to be written.
      IF Log.Compare[followingRecord, nextTailChunkID] = greater THEN
        RETURN WITH ERROR CallerProgrammingError;
      IF Log.Compare[chunkTable[tail].recordID, followingRecord] = less THEN {
        FilePageMgr.ShareVMPageSet[chunkTable[tail].chunk];
        RETURN [TRUE, chunkTable[tail].chunk] }
      ELSE RETURN [FALSE, FilePageMgr.nullVMPageSet]
      };
    AwaitForce: ENTRY PROC [] = {
      -- Return when all chunks that precede followingRecord are out.
      FOR currentChunk: ChunkTableIndex IN ChunkTableIndex DO
        IF Log.Compare[chunkTable[currentChunk].recordID, followingRecord] = less THEN
          UNTIL NOT chunkTable[currentChunk].forceInProgress DO
            WAIT completion;
            ENDLOOP;
        ENDLOOP;
      };
    doForce: BOOL;  chunk: FilePageMgr.VMPageSet;
    [doForce, chunk] ← TailForceRequired[];
    IF doForce THEN {
      FilePageMgr.ForceOutVMPageSet[vMPageSet: chunk];
      FilePageMgr.ReleaseVMPageSet[vMPageSet: chunk, releaseState: clean, keep: TRUE] };
    AwaitForce[];
    };

  OpenCoreForPut: PUBLIC ENTRY PROC [
    nextPage: PageNumber, version: LogRep.PageVersion, nextRecord: RecordID]
    RETURNS [pagesInChunk: INT, firstPagePtr: LONG POINTER] = {
    -- LogBasicInternal.OpenCoreForPut
    enforceBufferPages ← FALSE;
    curVersion ← version;
    tail ← FIRST[ChunkTableIndex];
    chunkTable[tail].recordID ← nextRecord;
    chunkTable[tail].chunk ← FilePageMgr.UseLogPages[
      fileHandle: logFileHandle, pageRun: PageRunStartingWith[nextPage]];
    nextTailChunkID ← LogInline.AddC[r: chunkTable[tail].recordID,
      words: AlpineInline.ShortWordsFromPages[chunkTable[tail].chunk.pageRun.count]];
    RETURN [
      pagesInChunk: chunkTable[tail].chunk.pageRun.count, 
      firstPagePtr: chunkTable[tail].chunk.pages];
    };

  AssertNormalOperation: PUBLIC ENTRY PROC [] = {
    enforceBufferPages ← TRUE;
    };

  CloseCoreForPut: PUBLIC ENTRY PROC [] = {
    -- LogBasicInternal.CloseCoreForPut
    FOR i: ChunkTableIndex IN ChunkTableIndex DO
      WHILE chunkTable[i].forceInProgress DO
        WAIT completion;
        ENDLOOP;
      FilePageMgr.ForceOutVMPageSet[chunkTable[i].chunk];
      FilePageMgr.ReleaseVMPageSet[
        vMPageSet: chunkTable[i].chunk, releaseState: clean, keep: FALSE];
      chunkTable[i] ← [];
      ENDLOOP;
    };

  WordNumberFromRecordID: PUBLIC ENTRY PROC [thisRecord: RecordID]
    RETURNS [result: WordNumber] = {
    -- LogBasic.WordNumberFromRecordID
    offsetFromNextTailChunk: WordCount;
    IF Log.Compare[thisRecord, nextTailChunkID] = greater THEN GOTO tooLarge;
    offsetFromNextTailChunk ← LogInline.WordsFromSubtract[
      larger: nextTailChunkID, smaller: thisRecord ! RuntimeError.BoundsFault => GOTO tooSmall];
    IF offsetFromNextTailChunk >= logFileLength THEN GOTO tooSmall;
    result ← AlpineInline.WordsFromPages[
      pages: PageFollowing[chunkTable[tail].chunk.pageRun]] - offsetFromNextTailChunk;
    RETURN [IF result < 0 THEN result + logFileLength ELSE result];
    EXITS
      tooLarge => ERROR CallerProgrammingError;
      tooSmall => ERROR CallerProgrammingError;
    };

  VerifyGet: ENTRY PROC [page: PageCount, pagePtr: LONG POINTER] = {
    -- Called from Get if it encounters a page marked invalid.
    IF Header[pagePtr].valid THEN RETURN;
    IF IsMemberOfPageRun[page, chunkTable[tail].chunk.pageRun] THEN RETURN;
    ERROR; 
    };

  Release: PUBLIC PROC [beforeRecord: RecordID] = {
    -- LogBasic.Release
    AssignToOldestFilePage: ENTRY PROC [newOldestFilePage: PageNumber] = INLINE {
      oldestFilePage ← newOldestFilePage };
    AssignToOldestFilePage[AlpineInline.PagesFromWords[
      WordNumberFromRecordID[beforeRecord]]];
    };

  -- A readonly page stream on the log.
  -- This stream has a limited capability for backup: it can save one page position that
  --MUST be returned to later.
  -- End of log is determined at the page level.

  nChunksReadAhead: CARDINAL = 4;

  PageStreamObject: TYPE = RECORD [
    chunk: FilePageMgr.VMPageSet ← FilePageMgr.nullVMPageSet,
    savePageRun: PageRun,
    pageInPageRun, savePageInPageRun: CARDINAL,
    expectedVersion: LogRep.PageVersion,
    pagePtr: LONG POINTER,
    posSaved: BOOL,
    nextReadAheadPage: PageNumber ];
  pageStream: PageStreamObject;

  OpenPageStream: PROC [p: PageNumber] RETURNS [endOfLog: BOOL] = {
    -- The initial stream position is NOT saved.
    pageStream.chunk ←
      FilePageMgr.ReadLogPages[
        fileHandle: logFileHandle, pageRun: PageRunStartingWith[p]];
    pageStream.pagePtr ← pageStream.chunk.pages;
    pageStream.pageInPageRun ← 0;
    pageStream.nextReadAheadPage ← PageFollowing[pageStream.chunk.pageRun];
    THROUGH [1..nChunksReadAhead] DO ReadAhead[] ENDLOOP;
    IF NOT Header[pageStream.pagePtr].valid THEN RETURN [TRUE];
    pageStream.expectedVersion ← Header[pageStream.pagePtr].version;
    pageStream.posSaved ← FALSE;
    RETURN [FALSE];
    };

  ClosePageStream: PROC [] = {
    FilePageMgr.ReleaseVMPageSet[
      vMPageSet: pageStream.chunk, releaseState: clean, keep: FALSE];
    pageStream.chunk ← FilePageMgr.nullVMPageSet;
    };

  ReadAhead: PROC [] = {
    FilePageMgr.ReadAheadLogPages[
      logFileHandle, PageRunStartingWith[pageStream.nextReadAheadPage]];
    pageStream.nextReadAheadPage ←
      IF pageStream.nextReadAheadPage < logFileSize - logChunkSize THEN
        pageStream.nextReadAheadPage + logChunkSize
      ELSE 0;
    };
    
  SavePageStreamPos: PROC [] = {
    pageStream.savePageRun ← pageStream.chunk.pageRun;
    pageStream.savePageInPageRun ← pageStream.pageInPageRun;
    pageStream.posSaved ← TRUE;
    };

  CurrentPageNumber: PROC [] RETURNS [PageNumber] = INLINE {
    RETURN [pageStream.chunk.pageRun.firstPage + pageStream.pageInPageRun];
    };

  ForgetSavedPageStreamPos: PROC [] = {
    -- It is only valid to forget when positioned at the saved position.
    IF NOT pageStream.posSaved THEN RETURN;
    IF pageStream.savePageRun # pageStream.chunk.pageRun THEN
      ERROR InternalProgrammingError;
    pageStream.posSaved ← FALSE;
    };

  AdvancePageStream: PROC [] RETURNS [endOfLog: BOOL] = {
    pageStream.pageInPageRun ← pageStream.pageInPageRun+1;
    IF pageStream.pageInPageRun = pageStream.chunk.pageRun.count THEN {
      p: PageNumber;
      FilePageMgr.ReleaseVMPageSet[
        vMPageSet: pageStream.chunk, releaseState: clean, keep: pageStream.posSaved];
      p ← PageFollowing[pageStream.chunk.pageRun];
      IF p = 0 THEN pageStream.expectedVersion ← 1 - pageStream.expectedVersion;
      pageStream.chunk ←
        FilePageMgr.ReadLogPages[fileHandle: logFileHandle, pageRun: PageRunStartingWith[p]];
      pageStream.pagePtr ← pageStream.chunk.pages;
      pageStream.pageInPageRun ← 0;
      IF NOT pageStream.posSaved THEN ReadAhead[];
      }
    ELSE
      pageStream.pagePtr ← pageStream.pagePtr + wordsPerPage;
    RETURN[ (NOT Header[pageStream.pagePtr].valid) OR
      (pageStream.expectedVersion # Header[pageStream.pagePtr].version) ];
    };

  SetPageStreamPosFromSavedPos: PROC [] = {
    IF NOT pageStream.posSaved THEN ERROR InternalProgrammingError;
    IF pageStream.chunk.pageRun # pageStream.savePageRun THEN {
      FilePageMgr.ReleaseVMPageSet[vMPageSet: pageStream.chunk, releaseState: clean, keep: TRUE];
      pageStream.chunk ←
        FilePageMgr.ReadLogPages[fileHandle: logFileHandle, pageRun: pageStream.savePageRun];
      pageStream.expectedVersion ←
        Header[pageStream.pagePtr].version;
      };
    pageStream.pageInPageRun ← pageStream.savePageInPageRun;
    pageStream.pagePtr ← pageStream.chunk.pages +
      AlpineInline.ShortWordsFromPages[pageStream.pageInPageRun];
    };

  -- A readonly log record stream.

  -- Log reading during recovery is significantly different than log reading for carrying out intentions.  In the case of recovery, we expect to encounter the log end while reading; in carrying out intentions we expect all log records to be perfectly formed.  Also, log reading for recovery is sequential, while log reading for intentions is more random.  In the case of recovery we need the ability to first "peek" and then read; for intentions, reading is enough.

  RecordStreamObject: TYPE = RECORD [
    id: RecordID
      -- of current log record, whose first word is on the current page stream page.
    ];
  recordStream: RecordStreamObject;

  OpenRecordStreamFromWord: PUBLIC PROC [firstWord: WordNumber]
    RETURNS [notStartOfRecord: BOOL, currentRecord: RecordID] = {
    notStartOfRecord ← OpenRecordStream[firstWord: firstWord,
      firstRecord: LogInline.RecordIDFromWordNumber[firstWord]];
    RETURN [notStartOfRecord, recordStream.id];
    };

  OpenRecordStreamFromCheckpoint: PUBLIC PROC [
    checkpointWord: WordNumber, checkpointRecord: RecordID, firstRecord: RecordID]
    RETURNS [notStartOfRecord: BOOL, currentRecord: RecordID] = {
    offsetFromCheckpoint, firstWord: WordCount;
    IF Log.Compare[checkpointRecord, firstRecord] = less THEN
      ERROR CallerProgrammingError;
    offsetFromCheckpoint ← LogInline.WordsFromSubtract[
      larger: checkpointRecord, smaller: firstRecord ! RuntimeError.BoundsFault => GOTO tooSmall];
    IF offsetFromCheckpoint >= logFileLength THEN GOTO tooSmall;
    firstWord ← checkpointWord - offsetFromCheckpoint;
    IF firstWord < 0 THEN firstWord ← firstWord + logFileLength;
    notStartOfRecord ← OpenRecordStream[firstWord: firstWord, firstRecord: firstRecord];
    RETURN [notStartOfRecord, firstRecord];
    EXITS
      tooSmall => ERROR CallerProgrammingError;
    };

  OpenRecordStream: PROC [firstWord: WordNumber, firstRecord: RecordID]
    RETURNS [notStartOfRecord: BOOL] = {
    p: PageNumber;
    w: CARDINAL [0..wordsPerPage);
    [page: p, wordInPage: w] ← AlpineInline.DecomposeWords[firstWord];
    IF LogInline.WordInPageFromRecordID[firstRecord] # w THEN
      ERROR CallerProgrammingError;
    IF OpenPageStream[p].endOfLog THEN GOTO invalidPage;
    SavePageStreamPos[];
    recordStream ← [id: firstRecord];
    RETURN [VerifyCurrentRecordID[].notStartOfRecord];
    EXITS
      invalidPage => ERROR InvalidPage;
    };

  InvalidPage: PUBLIC ERROR = CODE;

  CurrentWordNumber: PROC [] RETURNS [WordNumber] = {
    RETURN [AlpineInline.WordsFromPages[CurrentPageNumber[]] +
      LogInline.WordInPageFromRecordID[recordStream.id]];
    };

  CloseRecordStream: PUBLIC PROC [] = {
    ClosePageStream[];
    };

  VerifyCurrentRecordID: PROC [] RETURNS [notStartOfRecord: BOOL] = {
    wordInPage: CARDINAL ← LogInline.WordInPageFromRecordID[recordStream.id];
    { w: CARDINAL ← 0;
      UNTIL w >= wordInPage DO
        w ← w + Header[pageStream.pagePtr+w].nWords;
        ENDLOOP;
      IF w > wordInPage THEN RETURN [TRUE];
      };
    RETURN [Header[pageStream.pagePtr+wordInPage].isContinuation];
    };

  CheckCurrentRecord: PUBLIC PROC [] RETURNS [truncated: BOOL] = {
    wordInPage: CARDINAL ← LogInline.WordInPageFromRecordID[recordStream.id];
    IF VerifyCurrentRecordID[].notStartOfRecord THEN ERROR InternalProgrammingError;
    IF Header[pageStream.pagePtr+wordInPage].hasContinuation THEN {
      IF (Header[pageStream.pagePtr+wordInPage].nWords + wordInPage) # wordsPerPage THEN
        GOTO checkFailed;
      DO
        IF AdvancePageStream[].endOfLog THEN GOTO truncatedRecord;
        IF NOT Header[pageStream.pagePtr].isContinuation
          THEN GOTO truncatedRecord;
        IF NOT Header[pageStream.pagePtr].hasContinuation THEN EXIT;
        IF Header[pageStream.pagePtr].nWords # wordsPerPage THEN GOTO checkFailed;
        ENDLOOP;
      IF Header[pageStream.pagePtr].nWords NOT IN [1 .. wordsPerPage] THEN
        GOTO checkFailed;
      }
    ELSE {
      IF Header[pageStream.pagePtr+wordInPage].nWords
        NOT IN [1 .. wordsPerPage-wordInPage] THEN GOTO checkFailed;
      };
    SetPageStreamPosFromSavedPos[];
    RETURN [FALSE];
    EXITS
      checkFailed => { SetPageStreamPosFromSavedPos[];  ERROR InternalProgrammingError };
      truncatedRecord => { SetPageStreamPosFromSavedPos[];  RETURN [TRUE] };
    }; 

  GetCurrentRecord: PUBLIC PROC [currentRecord: RecordID, to: Log.Block]
    RETURNS [status: Log.ReadProcStatus, wordsRead: CARDINAL] = {
    wordInPage: CARDINAL ← LogInline.WordInPageFromRecordID[recordStream.id];
    wordsRead ← 0;
    IF LogInline.Compare[currentRecord, recordStream.id] # equal THEN
      ERROR CallerProgrammingError;
    DO
      -- Read one log block from this page.
      wordsThisBlock, wordsThisCopy: CARDINAL;
      hasContinuation, isContinuation: BOOL;
      -- Read the header for the log block.
      [hasContinuation: hasContinuation, isContinuation: isContinuation, nWords: wordsThisBlock] ←
        Header[pageStream.pagePtr+wordInPage]↑;
      IF wordsRead > 0 AND NOT isContinuation THEN GOTO truncatedRecord;
      wordsThisBlock ← wordsThisBlock-SIZE[LogRep.Header];
      wordInPage ← wordInPage+SIZE[LogRep.Header];
      DO
        -- Copy a run of words to "to".
        wordsThisCopy ← MIN[to.length, wordsThisBlock];
        IF to.base # NIL THEN {
          PrincOpsUtils.LongCopy[
            to: to.base, from: pageStream.pagePtr+wordInPage, nwords: wordsThisCopy];
          to.base ← to.base+wordsThisCopy;
          };
        wordsRead ← wordsRead+wordsThisCopy;
        -- Now what?
        IF wordsThisCopy # to.length THEN {
          -- to.length > wordsThisCopy = wordsThisBlock; go read next log block.
          to.length ← to.length-wordsThisCopy;
          IF NOT hasContinuation THEN GOTO sourceExhausted;
          IF AdvancePageStream[].endOfLog THEN GOTO truncatedRecord;
          wordInPage ← 0;
          GOTO nextBlock;
          }
        ELSE IF to.rest # NIL THEN {
          -- wordsThisCopy = to.length AND to.rest # NIL, so not done copying.
          to ← to.rest↑;
          wordsThisBlock ← wordsThisBlock-wordsThisCopy;
          wordInPage ← wordInPage + wordsThisCopy;
          }
        ELSE {
          -- wordsThisCopy = to.length AND to.rest = NIL, so destination full; return.
          SetPageStreamPosFromSavedPos[];
          RETURN [
            IF (NOT hasContinuation AND wordsThisBlock = wordsThisCopy) THEN normal
            ELSE destinationFull, wordsRead]
          }
        REPEAT
          nextBlock => NULL;
        ENDLOOP;
      ENDLOOP;
    EXITS
      sourceExhausted => {
        SetPageStreamPosFromSavedPos[];
        RETURN [sourceExhausted, wordsRead]
        };
      truncatedRecord => {
        SetPageStreamPosFromSavedPos[];
        ERROR CallerProgrammingError;
        };
    };

  GetCurrentPageAndVersion: PUBLIC PROC []
    RETURNS [PageNumber, LogRep.PageVersion] = {
    RETURN[CurrentPageNumber[], pageStream.expectedVersion];
    };

  AdvanceRecordStream: PUBLIC PROC []
    RETURNS [endOfLog, truncatedRecord: BOOL, currentRecord: RecordID] = {
    -- It is ok to call Advance when recordStream is in a bad state, e.g. not pointing to the start
    --of a record.  It should be pointing to the start of a block, however.  Note that
    --wordInPage = 0 is sufficient.
    wordsInRecord: CARDINAL ← 0;
    endOfLog ← truncatedRecord ← FALSE;
    { -- block for EXITS
    wordInPage: CARDINAL ← LogInline.WordInPageFromRecordID[recordStream.id];
    ForgetSavedPageStreamPos[];
    WHILE Header[pageStream.pagePtr+wordInPage].hasContinuation DO
      wordsInRecord ← wordsInRecord + Header[pageStream.pagePtr+wordInPage].nWords;
      wordInPage ← 0;
      IF AdvancePageStream[].endOfLog THEN {
        endOfLog ← truncatedRecord ← TRUE;  GOTO return };
      IF NOT Header[pageStream.pagePtr].isContinuation THEN {
        truncatedRecord ← TRUE;  GOTO return };
      ENDLOOP;
    { wordsInLastBlock: CARDINAL ← Header[pageStream.pagePtr+wordInPage].nWords;
    wordInPage ← wordInPage + wordsInLastBlock;
    wordsInRecord ← wordsInRecord + wordsInLastBlock };
    IF wordInPage < wordsPerPage AND
      Header[pageStream.pagePtr+wordInPage].nWords = 0 THEN {
      wordsInRecord ← wordsInRecord + (wordsPerPage-wordInPage);
      wordInPage ← wordsPerPage;
      };
    IF wordInPage = wordsPerPage AND AdvancePageStream[].endOfLog THEN {
      endOfLog ← TRUE;
      };
    GOTO return
    EXITS return => {
      SavePageStreamPos[];
      recordStream.id ← LogInline.AddC[recordStream.id, wordsInRecord];
      RETURN[endOfLog, truncatedRecord, recordStream.id] };
    }};

  -- Recovering from loss of the restart record.

  -- This code checks several invariants of the log that are not checked when the restart record is used.  In particular, a log with more than one "end" will be discovered by this code, but not when the restart record is used.  So it may be useful to run this code during debugging of the system.

  nTransitionPages: INT = 10 * logChunkSize;

  LocateFirstRecord: PUBLIC PROC [] RETURNS [firstRecord: WordNumber] = {
    notStartOfRecord: BOOL;
    endOfLog: BOOL;
    [notStartOfRecord: notStartOfRecord] ← OpenRecordStreamFromWord[
      AlpineInline.WordsFromPages[LocateStartingPage[]]];
    IF notStartOfRecord THEN [endOfLog: endOfLog] ← AdvanceRecordStream[];
    IF endOfLog THEN ERROR InternalProgrammingError;
    firstRecord ← CurrentWordNumber[];
    CloseRecordStream[];
    RETURN [firstRecord];
    };

  LocateStartingPage: PROC [] RETURNS [PageNumber] = {
    -- Scans the log once.
    SkipAhead: PROC [] = {
      -- A scan is in progress, but EndOfLog has been seen.
      -- Move forward nTransitionPages pages.
      nextPage: PageNumber = CurrentPageNumber[] + nTransitionPages;
      ClosePageStream[];
      IF OpenPageStream[nextPage].endOfLog THEN ERROR InternalProgrammingError;
      };
    startPage, beginPage: PageNumber;
    { -- Read to find a page in a "clean" area.
    IF OpenPageStream[0].endOfLog THEN GOTO skip;
    THROUGH [0 .. nTransitionPages) DO
      IF AdvancePageStream[].endOfLog THEN GOTO skip;
      ENDLOOP;
    EXITS
      skip => SkipAhead[];
    };
    startPage ← CurrentPageNumber[];
    -- Next EndOfLog really IS the end of the log!
    DO  IF AdvancePageStream[].endOfLog THEN EXIT  ENDLOOP;
    -- Skip past any junk that might follow the EndOfLog, then save the page number.
    SkipAhead[];
    beginPage ← CurrentPageNumber[];
    -- Now read back to startPage.  Should see no more EndOfLogs.
    UNTIL CurrentPageNumber[] = startPage DO
      IF AdvancePageStream[].endOfLog THEN ERROR InternalProgrammingError;
      ENDLOOP;
    ClosePageStream[];
    RETURN [beginPage];
    };


  -- Random log reading.


  Get: PUBLIC PROC [thisRecord: Log.RecordID, to: Log.Block]
    RETURNS [status: Log.ReadProcStatus, wordsRead: CARDINAL] = {
    -- LogBasic.Get
    -- Enable UNWIND => ReleaseChunk[]

    chunk: FilePageMgr.VMPageSet;
    pagePtr: LONG POINTER;
    pagesLeftInChunk: CARDINAL;

    GetChunk: PROC [nextPage: PageNumber]  = {
      chunk ←
        FilePageMgr.ReadLogPages[
          fileHandle: logFileHandle, pageRun: PageRunStartingWith[nextPage]];
      pagePtr ← chunk.pages;
      pagesLeftInChunk ← chunk.pageRun.count - 1;
      };
  
    ReleaseChunk: PROC [] = INLINE {
      FilePageMgr.ReleaseVMPageSet[vMPageSet: chunk, releaseState: clean, keep: FALSE];
      };

    AdvancePage: PROC [] = INLINE {
      IF pagesLeftInChunk = 0 THEN {
        nextPage: PageNumber ← PageFollowing[chunk.pageRun];
        ReleaseChunk[];
        IF nextPage = 0 THEN pageVersion ← 1-pageVersion;
        GetChunk[nextPage];
        }
      ELSE {
        pagePtr ← pagePtr + AlpineEnvironment.wordsPerPage;
        pagesLeftInChunk ← pagesLeftInChunk - 1;
        };
      };

    isFirstPage: BOOL ← TRUE;
    pageVersion: LogRep.PageVersion ← 0;
    wordInPage: CARDINAL;

    Verify: PROC [] = {
      -- Ensure that wordInPage points to a LogRep.Header, assuming that page
      --is properly formatted.
      w: CARDINAL ← 0;
      UNTIL w = wordInPage DO
        w ← w + Header[pagePtr+w].nWords;
        IF w > wordInPage THEN ERROR InternalProgrammingError;
        ENDLOOP;
      };

      {
      firstPage: PageNumber;
      [page: firstPage, wordInPage: wordInPage] ← AlpineInline.DecomposeWords[
        WordNumberFromRecordID[thisRecord]];
      GetChunk[firstPage];
      };
    Verify[];
    wordsRead ← 0;
    DO
      -- Read one log block from this page.
      wordsThisBlock, wordsThisCopy: CARDINAL;
      hasContinuation, isContinuation: BOOL;
      version: LogRep.PageVersion;
      -- Read valid bit from the header for first block on this page.
      IF NOT Header[pagePtr].valid THEN
        VerifyGet[
          chunk.pageRun.firstPage+(chunk.pageRun.count-(1+pagesLeftInChunk)), pagePtr];
      -- Read the header for this log block.
      [version: version, hasContinuation: hasContinuation, isContinuation: isContinuation,
       nWords: wordsThisBlock] ← Header[pagePtr+wordInPage]↑;
      wordsThisBlock ← wordsThisBlock-LogRep.Header.SIZE;
      wordInPage ← wordInPage+LogRep.Header.SIZE;
      IF isFirstPage THEN {
        IF isContinuation THEN ERROR InternalProgrammingError;
        pageVersion ← version;
        isFirstPage ← FALSE }
      ELSE {
        IF NOT isContinuation OR (version # pageVersion) THEN
          ERROR InternalProgrammingError
        };
      DO
        -- Copy a run of words to "to".
        wordsThisCopy ← MIN[to.length, wordsThisBlock];
        IF to.base # NIL THEN {
          PrincOpsUtils.LongCopy[
            to: to.base, from: pagePtr+wordInPage, nwords: wordsThisCopy];
          to.base ← to.base+wordsThisCopy;
          };
        wordsRead ← wordsRead+wordsThisCopy;
        -- Now what?
        IF wordsThisCopy # to.length THEN {
          -- to.length > wordsThisCopy = wordsThisBlock; go read next log block.
          to.length ← to.length-wordsThisCopy;
          IF NOT hasContinuation THEN {
            ReleaseChunk[];
            RETURN[sourceExhausted, wordsRead]
            };
          AdvancePage[];
          wordInPage ← 0;
          GOTO nextBlock;
          }
        ELSE IF to.rest # NIL THEN {
          -- wordsThisCopy = to.length AND to.rest # NIL, so not done copying.
          to ← to.rest↑;
          wordsThisBlock ← wordsThisBlock-wordsThisCopy;
          wordInPage ← wordInPage + wordsThisCopy;
          }
        ELSE {
          -- wordsThisCopy = to.length AND to.rest = NIL, so destination full; return.
          ReleaseChunk[];
          RETURN [
            IF (NOT hasContinuation AND wordsThisBlock = wordsThisCopy) THEN normal
            ELSE destinationFull, wordsRead]
          }
        REPEAT
          nextBlock => NULL;
        ENDLOOP;
      ENDLOOP;
    };
  END.


OPEN PROBLEMS
Need a way to get the log chunk size from FilePageMgr (see logChunkSize constant above).

CHANGE LOG

Created by MBrown on June 15, 1982 3:38 pm
-- Introduce a separate monitor for "core" log data structures.

Changed by MBrown on September 10, 1982 9:45 pm
-- Bug: attempted to release an uninitialized chunk, in AdvanceChunk.
-- Now runs simple log read/write test.

Changed by MBrown on September 25, 1982 3:27 pm
-- First "complete" implementation of LogBasic.

Changed by MBrown on November 16, 1982 10:10 pm
-- Revised Get to be more closely analogous to GetCurrentRecord.

Changed by MBrown on April 3, 1983 4:17 pm
-- Test "checkpointRecord <= firstRecord" changed to checkpointRecord < firstRecord",
--since equality is now ok.

Changed by MBrown on June 27, 1983 9:56 am
-- Added AssertNormalOperation proc.  Now, if enforceBufferPages, log write
--fails if within nBufferPages pages of wrap-around (in AdvanceChunk).

Changed by MBrown on January 30, 1984 10:18:19 am PST
-- Cedar 5.0: use PrincOpsUtils.