-- LogImpl.mesa
-- Last edited by
--   MBrown on January 30, 1984 10:19:45 am PST

  DIRECTORY
    AlpineEnvironment,
    AlpineInline,
    AlpineInternal USING [],
    AlpineZones USING [static],
    Coordinator USING [Handle, Object],
    ConvertUnsafe,
    FilePageMgr,
    InitProcs,
    Log,
    LogBasic,
    LogControl,
    LogInline,
    LogRep,
    Process,
    RestartFile,
    Rope,
    SafeStorage USING [ReclaimCollectibleObjects],
    TransactionMap,
    Worker USING [Handle, Object];

LogImpl: MONITOR
  IMPORTS
    AlpineInline,
    AlpineZones,
    ConvertUnsafe,
    FilePageMgr,
    InitProcs,
    LogBasic,
    LogInline,
    LogRep,
    Process,
    RestartFile,
    Rope,
    SafeStorage,
    TransactionMap
  EXPORTS
    AlpineInternal,
    Log,
    LogControl =
  BEGIN
  -- Short local names for types and constants defined in other interfaces.
  VolumeID: TYPE = AlpineEnvironment.VolumeID;
  FileID: TYPE = AlpineEnvironment.FileID;
  FileStore: TYPE = AlpineEnvironment.FileStore;
  TransID: TYPE = AlpineEnvironment.TransID;
  nullTransID: TransID = AlpineEnvironment.nullTransID;
  PageNumber: TYPE = AlpineEnvironment.PageNumber;
  PageCount: TYPE = AlpineEnvironment.PageCount;
  WordNumber: TYPE = LogBasic.WordNumber;
  WordCount: TYPE = LogBasic.WordCount;
  RecordID: TYPE = Log.RecordID;
  nullRecordID: RecordID = Log.nullRecordID;
  Block: TYPE = Log.Block;
  RecordType: TYPE = Log.RecordType;
  wordsPerPage: CARDINAL = AlpineEnvironment.wordsPerPage;

  -- ClientProgrammingError is raised by the Register procs below when a client tries
  --to register after registration has been turned off.
  -- InternalProgrammingError is raised when a consistency check inside this module fails.
  ClientProgrammingError: ERROR = CODE;
  InternalProgrammingError: ERROR = CODE;

  -- Concrete definitions of exported types.  NOTE(review): presumably these bind the
  --opaque types of the exported AlpineInternal interface; confirm against AlpineInternal.
  TransObject: PUBLIC TYPE = Worker.Object;
  CoordinatorObject: PUBLIC TYPE = Coordinator.Object;

  WriteFailed: PUBLIC ERROR = CODE;
    -- Log.WriteFailed

  Compare: PUBLIC PROC [a, b: RecordID] RETURNS [Log.Comparison] = {
    -- Log.Compare
    -- Orders two log record IDs; the work is done by LogInline.Compare.
    ordering: Log.Comparison = LogInline.Compare[a, b];
    RETURN [ordering];
    };

  Format: PUBLIC PROC [
    logVolume: VolumeID, logFile, restartFile: FileID, myFileStore: FileStore] = {
    -- LogControl.Format
    -- Initializes a log file and restart file pair: covers every page of the log with
    --a noop record, then writes (and forces) a CheckpointComplete record and closes
    --the log for writing.
    totalPages, firstBatch: PageCount;
    checkpointStart: RecordID;
    LogBasic.EstablishLogFile[logVolume, logFile];
    totalPages ← LogBasic.LogFileSize[];
    firstBatch ← totalPages/2;
    RestartFile.EstablishRestartFile[logVolume, restartFile];
    -- A noop record is precisely one page long, so writing totalPages of them touches
    --each page of the log file once.  The records are written in two batches, with a
    --Release after the first batch.
    LogBasic.OpenForPut[nextPage: 0, version: 0, nextRecord: nullRecordID];
    LogBasic.Release[beforeRecord: nullRecordID];
    LogBasic.Release[beforeRecord: WriteNoopRecords[firstBatch].followingRecord];
    [] ← WriteNoopRecords[totalPages - firstBatch];
    -- The ID of the next record to be put serves as both the start-analysis and the
    --keep record of the first checkpoint.  If all has gone well, that record starts
    --at word 0, i.e. at the beginning of page 0 of the log file.
    checkpointStart ← LogBasic.RecordIDOfNextPut[];
    IF LogBasic.WordNumberFromRecordID[checkpointStart] # 0 THEN
      ERROR InternalProgrammingError;
    LogBasic.Release[beforeRecord: checkpointStart];
    -- Write the CheckpointComplete record (this also updates the restart file).
    [] ← WriteCheckpointCompleteRecord[
      startAnalysisRecord: checkpointStart,
      keepRecord: checkpointStart,
      myFileStore: myFileStore];
    LogBasic.CloseForPut[];
    };

  WriteNoopRecords: PUBLIC PROC [nPages: PageCount]
    RETURNS [followingRecord: RecordID] = {
    -- Appends nPages noop records to the log and returns the ID of the record
    --position following the last one written.
    -- If nPages <= 0 nothing is written and the result is the ID of the next record
    --to be put (previously the result was left unassigned in that case, and a negative
    --count would have looped without terminating).
    -- The log must already be open for writing.
    -- This proc has a large frame.
    noopRecordBody: LogRep.NoopRecord ← [];
    followingRecord ← LogBasic.RecordIDOfNextPut[];
    FOR page: PageCount ← 0, page+1 UNTIL page >= nPages DO
      [followingRecord: followingRecord] ← LogBasic.Put[from:
        [base: @noopRecordBody, length: SIZE[LogRep.NoopRecord]]];
      ENDLOOP;
    RETURN [followingRecord];
    };

  WriteCheckpointCompleteRecord: PROC [
    startAnalysisRecord, keepRecord: RecordID, myFileStore: FileStore]
    RETURNS [thisRecord: RecordID] = {
    -- Builds a CheckpointComplete record carrying the two record IDs and the file
    --store name, puts it in the log with force, then records its position in the
    --restart file.
    -- This call waits both for a log force and for a restart file write before returning.
    body: LogRep.CheckpointCompleteRecord ←
      [startAnalysisRecordID: startAnalysisRecord, keepRecordID: keepRecord];
    -- Copy the file store name into the string field of the record body.
    ConvertUnsafe.AppendRope[
      to: LOOPHOLE[LONG[@(body.myFileStore)]], from: myFileStore];
    -- The length written depends on the length of the file store name.
    [thisRecord: thisRecord] ← LogBasic.Put[
      from: [base: @body,
        length: LogRep.CheckpointCompleteRecordSize[myFileStore.Length]],
      force: TRUE, writeID: TRUE];
    { restartWord: WordNumber = LogBasic.WordNumberFromRecordID[thisRecord];
      RestartFile.WriteRestartRecord[restartWord, thisRecord];
      };
    };

  Recover: PUBLIC PROC [logVolume: VolumeID, logFile, restartFile: FileID] = {
    -- Performs restart recovery on the given log and restart files:
    --  1. locate the newest usable CheckpointComplete record,
    --  2. run the analysis pass, then force the records written after it,
    --  3. run the update pass,
    --  4. take a checkpoint and switch LogBasic to normal operation.
    -- InitProcs is called before and after each pass.
    -- Raises InternalProgrammingError if no usable checkpoint record can be found.
    myFileStore: FileStore;
      -- set by CheckpointCompleteRecordFromWord from the checkpoint record it reads.

    DiscoverWhereToStartAnalysisPass: PROC [] RETURNS [
      checkpointWord: WordNumber, checkpointRecord, startAnalysisRecord: RecordID] = {
      -- First trusts the restart file; if the record it points at cannot be read as a
      --CheckpointComplete record, or its ID disagrees with the restart file, falls
      --back to a sequential scan of the log.

      CheckpointCompleteRecordFromWord: PROC [checkpointWord: WordNumber] RETURNS [
        ok: BOOL, checkpointRecord, startAnalysisRecord: RecordID] = {
        -- Updates: Recover.myFileStore.
        -- Returns ok = FALSE if any error detected.
          {
          checkpointCompleteRecordBody: LogRep.CheckpointCompleteRecord;
          notStartOfRecord: BOOL;
          currentRecord: RecordID;
          [notStartOfRecord: notStartOfRecord, currentRecord: currentRecord] ←
            LogBasic.OpenRecordStreamFromWord[checkpointWord ! LogBasic.InvalidPage =>
              GOTO failReturn ];
          IF notStartOfRecord THEN GOTO failReturn;
          IF LogBasic.CheckCurrentRecord[].truncated THEN GOTO failReturn;
          IF LogBasic.GetCurrentRecord[currentRecord, [base: @checkpointCompleteRecordBody,
            length: SIZE[LogRep.CheckpointCompleteRecord]]].status = destinationFull OR
            checkpointCompleteRecordBody.type # checkpointComplete THEN GOTO failReturn;
          myFileStore ← ConvertUnsafe.ToRope[
            from: LOOPHOLE[LONG[@checkpointCompleteRecordBody.myFileStore]]];
          checkpointRecord ← checkpointCompleteRecordBody.thisRecordID;
          startAnalysisRecord ← checkpointCompleteRecordBody.startAnalysisRecordID;
          ok ← TRUE;
          EXITS
            failReturn => { ok ← FALSE };
          };
        -- The stream is closed on the success path and on every failure path.
        LogBasic.CloseRecordStream[];
        RETURN [ok, checkpointRecord, startAnalysisRecord];
        };--CheckpointCompleteRecordFromWord

      FindNewestCheckpointCompleteRecord: PROC [] RETURNS [WordNumber] = {
        -- Do a sequential scan of log, starting at LogBasic.LocateFirstRecord[].
        -- Returns the word position of the last untruncated CheckpointComplete record
        --seen; raises InternalProgrammingError if there is none.
        bigRecordID: RecordID = LogInline.RecordIDFromWordNumber[
          AlpineInline.WordsFromPages[LogBasic.LogFileSize[]]];
        firstWord: WordNumber = LogBasic.LocateFirstRecord[];
        checkpointRecord, currentRecord: RecordID;
        foundCheckpointRecord: BOOL ← FALSE;
        notStartOfRecord: BOOL;
        [notStartOfRecord: notStartOfRecord, currentRecord: currentRecord] ←
          LogBasic.OpenRecordStreamFromWord[firstWord];
        IF notStartOfRecord THEN ERROR InternalProgrammingError;
        DO
          nextRecord: RecordID;
          endOfLog, truncatedRecord: BOOL;
          currentType: RecordType ← PeekCurrentType[currentRecord];
          [endOfLog, truncatedRecord, nextRecord] ← LogBasic.AdvanceRecordStream[];
          IF currentType = checkpointComplete AND NOT truncatedRecord THEN {
            checkpointRecord ← currentRecord;
            foundCheckpointRecord ← TRUE;
            };
          IF endOfLog THEN EXIT;
          currentRecord ← nextRecord;
          ENDLOOP;
        -- Close the scan stream before returning: the caller immediately opens a new
        --record stream on the word returned here.
        LogBasic.CloseRecordStream[];
        IF NOT foundCheckpointRecord THEN ERROR InternalProgrammingError;
        -- Convert the record ID to a word position within the log file: subtract the
        --log size (as a record ID) when the ID is not less than it.
        RETURN [LogInline.WordsFromSubtract[larger: checkpointRecord, smaller:
          IF Compare[checkpointRecord, bigRecordID] # less THEN bigRecordID
          ELSE nullRecordID]];
        };--FindNewestCheckpointCompleteRecord

      checkpointRecordFromRestartFile: RecordID;
      ok: BOOL;
      [checkpointWord, checkpointRecordFromRestartFile] ←
        RestartFile.ReadRestartRecord[];
      [ok, checkpointRecord, startAnalysisRecord] ←
        CheckpointCompleteRecordFromWord[checkpointWord];
      IF NOT ok OR checkpointRecord # checkpointRecordFromRestartFile THEN {
        checkpointWord ← FindNewestCheckpointCompleteRecord[];
        [ok, checkpointRecord, startAnalysisRecord] ←
          CheckpointCompleteRecordFromWord[checkpointWord];
        IF NOT ok THEN ERROR InternalProgrammingError;
        };
      RETURN [checkpointWord, checkpointRecord, startAnalysisRecord];
      };--DiscoverWhereToStartAnalysisPass

    checkpointWord: WordNumber; checkpointRecord: RecordID;
      startAnalysisRecord: RecordID;

    [] ← LogBasic.EstablishLogFile[logVolume, logFile];
    RestartFile.EstablishRestartFile[logVolume, restartFile];
    [checkpointWord, checkpointRecord, startAnalysisRecord] ←
      DiscoverWhereToStartAnalysisPass[];

    InitProcs.CalledBeforeAnalysisPass[myFileStore, logVolume];
    AnalysisPass[checkpointWord, checkpointRecord, startAnalysisRecord];
    InitProcs.CalledAfterAnalysisPass[];
      -- This writes log records.  To make them available for reading by the update
      --pass, they must be forced.
    LogBasic.Force[followingRecord: LogBasic.RecordIDOfNextPut[]];

    InitProcs.CalledBeforeUpdatePass[];
    UpdatePass[checkpointWord, checkpointRecord, startAnalysisRecord,
      LogBasic.RecordIDOfNextPut[]];
    InitProcs.CalledAfterUpdatePass[];

    -- Null "previous" IDs are passed for this first checkpoint after restart.
    [] ← Checkpoint[nullRecordID, nullRecordID, myFileStore];
    LogBasic.AssertNormalOperation[];
    };--Recover

  AnalysisPass: PROC [
    checkpointWord: WordNumber, checkpointRecord, firstRecord: RecordID] = {
    -- Scans log, calling procs registered for analysis pass of recovery.
    -- The three arguments are handed to LogBasic.OpenRecordStreamFromCheckpoint to
    --position the scan.  Truncated records are remembered (NoteTruncatedRecord) for
    --the update pass.  On return the log is open for writing.
    -- Raises InternalProgrammingError if the stream does not open at the start of a
    --record, or if no CheckpointComplete record was seen during the scan.
    currentRecord, nextRecord: RecordID;
    -- No analysis or checkpoint procs may be registered once the scan has begun.
    TurnOffCheckpointRegistration[];
    { notStartOfRecord: BOOL;
      [notStartOfRecord: notStartOfRecord, currentRecord: currentRecord] ←
        LogBasic.OpenRecordStreamFromCheckpoint[checkpointWord: checkpointWord,
        checkpointRecord: checkpointRecord, firstRecord: firstRecord];
      IF notStartOfRecord THEN ERROR InternalProgrammingError;
      };
    DO
      currentRecordType: RecordType ← PeekCurrentType[currentRecord]; -- no errors
      analysisProc: LogControl.AnalysisProc;
      -- Dispatch only when the type has a registered proc and the record is complete.
      IF currentRecordType IN AnalysisProcsIndex AND
       (analysisProc ← analysisProcs[currentRecordType]) # NIL AND
       NOT LogBasic.CheckCurrentRecord[].truncated THEN {
        transID: TransID ← PeekCurrentTrans[currentRecord];
        analysisProc[currentRecord, currentRecordType, transID];
        };
      { endOfLog, truncatedRecord: BOOL;
        [endOfLog: endOfLog, truncatedRecord: truncatedRecord, currentRecord: nextRecord] ←
          LogBasic.AdvanceRecordStream[];
        -- Append to the truncated-record list that UpdatePass walks.
        IF truncatedRecord THEN NoteTruncatedRecord[currentRecord];
        IF endOfLog THEN EXIT;
        };
      currentRecord ← nextRecord;
      ENDLOOP;
    -- Analysis complete.  If no checkpoint seen, we are in big trouble.
    -- (checkpointState is filled in by CheckpointCompleteAnalysisProc.)
    IF NOT checkpointState.checkpointSeen THEN ERROR InternalProgrammingError;
    -- Overwrite enough pages past the end to ensure that we see no inconsistency due
    --to out-of-order log page writes.  Then open log for writing at correct position.
    -- The noop pages are written with the opposite page version (1 - version) and
    --forced; the log is then closed and reopened at the same page with the original
    --version, so later writes start at the end of the log found by the scan.
    { nextPageToWrite: PageNumber;  versionOfNextPageToWrite: LogRep.PageVersion;
      [nextPageToWrite, versionOfNextPageToWrite] ← LogBasic.GetCurrentPageAndVersion[];
      LogBasic.CloseRecordStream[];
      LogBasic.OpenForPut[nextPage: nextPageToWrite,
        version: 1-versionOfNextPageToWrite, nextRecord: nextRecord];
      -- Tell LogBasic that everything before keepRecord may be overwritten.
      LogBasic.Release[beforeRecord: checkpointState.keepRecord];
      LogBasic.Force[followingRecord:
        WriteNoopRecords[nPages: LogBasic.maxInvalidLogPages].followingRecord];
      LogBasic.CloseForPut[];
      LogBasic.OpenForPut[nextPage: nextPageToWrite,
        version: versionOfNextPageToWrite, nextRecord: nextRecord]
      };
    };--AnalysisPass

  RoundUpToPages: PROC [recordID: RecordID] RETURNS [RecordID] = {
    -- Returns recordID unchanged when it lies on a page boundary; otherwise advances
    --it by the number of words remaining in its page.
    offsetInPage: CARDINAL = LogInline.WordInPageFromRecordID[recordID];
    IF offsetInPage = 0 THEN RETURN [recordID];
    RETURN [LogInline.AddC[recordID, wordsPerPage-offsetInPage]];
    };

  -- Head and tail of the list of truncated records, in the order they were noted.
  firstTruncatedRecord, lastTruncatedRecord: LIST OF RecordID ← NIL;

  NoteTruncatedRecord: PROC [record: RecordID] = {
    -- Appends record to the tail of the truncated-record list.
    cell: LIST OF RecordID = CONS [first: record, rest: NIL];
    IF firstTruncatedRecord = NIL THEN firstTruncatedRecord ← cell;
    IF lastTruncatedRecord # NIL THEN lastTruncatedRecord.rest ← cell;
    lastTruncatedRecord ← cell;
    };

  CheckpointCompleteAnalysisProc: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    -- Analysis proc for CheckpointComplete records.  It is called once per such record,
    --so after the scan checkpointState holds the keepRecord of the LAST one.
    -- Raises ERROR if the record overflows the buffer or has the wrong type.
    body: LogRep.CheckpointCompleteRecord;
    readStatus: Log.ReadProcStatus;
    [status: readStatus] ← LogBasic.GetCurrentRecord[currentRecord: record,
      to: [base: @body, length: SIZE[LogRep.CheckpointCompleteRecord]]];
    IF readStatus = destinationFull OR body.type # checkpointComplete THEN ERROR;
    checkpointState.keepRecord ← body.keepRecordID;
    checkpointState.checkpointSeen ← TRUE;
    };

  -- Result of the analysis pass concerning checkpoints.  Written by
  --CheckpointCompleteAnalysisProc, read by AnalysisPass.
  CheckpointState: TYPE = RECORD [
    checkpointSeen: BOOL ← FALSE,
      -- TRUE once any CheckpointComplete record has been analyzed.
    keepRecord: RecordID
      -- keepRecordID of the most recently analyzed CheckpointComplete record.
    ];
  checkpointState: CheckpointState;

  UpdatePass: PROC [checkpointWord: WordNumber,
    checkpointRecord, firstRecord, nextRecordToWrite: RecordID] = {
    -- Rescans the log from the same starting point as the analysis pass, calling the
    --registered recovery proc for each record whose transaction is known to
    --TransactionMap.  The scan stops on reaching nextRecordToWrite.
    -- Records noted as truncated by the analysis pass are skipped; any disagreement
    --with that list, or with the expected end of the log, raises
    --InternalProgrammingError.
    -- NOTE(review): the record stream opened here is not closed with
    --LogBasic.CloseRecordStream before returning; confirm whether that is intended.
    currentTruncatedRecord: LIST OF RecordID ← firstTruncatedRecord;
    notStartOfRecord: BOOL;
    currentRecord: RecordID;
    expectTruncatedRecord: BOOL;
    -- No recovery procs may be registered once the scan has begun.
    TurnOffRecoveryRegistration[];
    [notStartOfRecord: notStartOfRecord, currentRecord: currentRecord] ←
      LogBasic.OpenRecordStreamFromCheckpoint[checkpointWord: checkpointWord,
      checkpointRecord: checkpointRecord, firstRecord: firstRecord];
    IF notStartOfRecord THEN ERROR InternalProgrammingError;
    DO
      expectTruncatedRecord ← FALSE;
      {
      currentRecordType: RecordType;
      recoveryProc: Log.RecoveryProc;
      -- If this record is the next one on the truncated list, consume the list entry
      --and skip straight to advancing the stream.
      IF currentTruncatedRecord # NIL THEN
        SELECT Compare[currentRecord, currentTruncatedRecord.first] FROM
          less => NULL;
          equal => {
            expectTruncatedRecord ← TRUE;
            currentTruncatedRecord ← currentTruncatedRecord.rest;
            GOTO GetNext };
          greater => ERROR InternalProgrammingError;
          ENDCASE => ERROR;
      -- Normal termination: the scan has reached the position of the next write.
      SELECT Compare[currentRecord, nextRecordToWrite] FROM
        less => NULL;
        equal => EXIT;
        greater => ERROR InternalProgrammingError;
        ENDCASE => ERROR;
      IF (currentRecordType ← PeekCurrentType[currentRecord]) IN RecoveryProcsIndex AND
        (recoveryProc ← recoveryProcs[currentRecordType]) # NIL THEN {
        transID: TransID ← PeekCurrentTrans[currentRecord];
        transHandle: TransactionMap.Handle = TransactionMap.GetHandle[transID];
        -- Records of transactions with no TransactionMap handle are ignored.
        IF transHandle # TransactionMap.nullHandle THEN {
          s: TransactionMap.TransState ← TransactionMap.StateDuringRecovery[transHandle];
          -- Any state other than committed or aborted is reported as ready.
          state: Log.TransState ← IF s = committed THEN committed
            ELSE IF s = aborted THEN aborted ELSE ready;
          recoveryProc[currentRecord, currentRecordType, transHandle, state];
          };
        };
      GOTO GetNext;
      EXITS
        GetNext => {
          endOfLog, truncatedRecord: BOOL;  nextRecord: RecordID;
          [endOfLog: endOfLog, truncatedRecord: truncatedRecord, currentRecord: nextRecord] ←
            LogBasic.AdvanceRecordStream[];
        -- AND binds tighter than OR: the error is raised either when the log ends
        --somewhere other than nextRecordToWrite, or when the truncation status of
        --this record differs from what the analysis pass noted.
        IF endOfLog AND (nextRecord # nextRecordToWrite) OR
          (truncatedRecord # expectTruncatedRecord) THEN ERROR InternalProgrammingError;
        currentRecord ← nextRecord;
        };
      };
      ENDLOOP;
    FilePageMgr.ForceOutEverything[];
    };

  PeekCurrentType: PROC [thisRecord: RecordID] RETURNS [recordType: RecordType] = {
    -- Reads just the type header of the current stream record and returns its type.
    -- Raises InternalProgrammingError if the record is too short to hold the header.
    header: LogRep.RecordTypeHeader;
    readStatus: Log.ReadProcStatus;
    [status: readStatus] ← LogBasic.GetCurrentRecord[currentRecord: thisRecord,
      to: [base: @header, length: SIZE[LogRep.RecordTypeHeader]]];
    IF readStatus = sourceExhausted THEN ERROR InternalProgrammingError;
    recordType ← header.type;
    };

  PeekCurrentTrans: PROC [thisRecord: RecordID] RETURNS [TransID] = {
    -- Reads the transaction header of the current stream record and returns its
    --transaction ID, or nullTransID if the record is too short to hold the header.
    header: LogRep.TransactionHeader;
    readStatus: Log.ReadProcStatus;
    [status: readStatus] ← LogBasic.GetCurrentRecord[currentRecord: thisRecord,
      to: [base: @header, length: SIZE[LogRep.TransactionHeader]]];
    RETURN [IF readStatus = sourceExhausted THEN nullTransID ELSE header.transID];
    };

  ReadForRecovery: PUBLIC PROC [
    thisRecord: RecordID, wordsToSkip: CARDINAL ← 0, to: Block]
    RETURNS [status: Log.ReadProcStatus, wordsRead: CARDINAL] = {
    -- This is a Log.ReadProc.
    -- Reads from the current record of the recovery stream.  The transaction header
    --is skipped in addition to the caller's wordsToSkip, and is not counted in the
    --wordsRead result.
    headerWords: CARDINAL = SIZE[LogRep.TransactionHeader];
    -- Saturate rather than overflow when adding the header size to the skip count.
    wordsToSkip ← IF wordsToSkip > LAST[CARDINAL] - headerWords THEN LAST[CARDINAL]
      ELSE wordsToSkip + headerWords;
    [status: status, wordsRead: wordsRead] ←
      LogBasic.GetCurrentRecord[currentRecord: thisRecord,
        to: [base: NIL, length: wordsToSkip, rest: @to]];
    wordsRead ← wordsRead - headerWords;
    };

  -- Log writing
  -- Exported to Log

  Write: PUBLIC PROC [
    trans: Worker.Handle, recordType: RecordType, recordData: Block, force: BOOL]
    RETURNS [thisRecord, followingRecord: RecordID] = {
    -- Writes one log record on behalf of a worker transaction: a transaction header
    --(record type and trans.transID) followed by recordData.
    header: LogRep.TransactionHeader ← [type: recordType, transID: trans.transID];
    [thisRecord, followingRecord] ← LogBasic.Put[
      from: [base: @header, length: SIZE[LogRep.TransactionHeader], rest: @recordData],
      writeID: FALSE, force: force];
    };

  CoordinatorWrite: PUBLIC PROC [
    trans: Coordinator.Handle, recordType: RecordType, recordData: Block, force: BOOL]
    RETURNS [thisRecord, followingRecord: RecordID] = {
    -- Same as Write, but on behalf of a coordinator transaction: a transaction header
    --(record type and trans.transID) followed by recordData.
    header: LogRep.TransactionHeader ← [type: recordType, transID: trans.transID];
    [thisRecord, followingRecord] ← LogBasic.Put[
      from: [base: @header, length: SIZE[LogRep.TransactionHeader], rest: @recordData],
      writeID: FALSE, force: force];
    };

  Force: PUBLIC PROC [followingRecord: RecordID] = {
    -- Forces the log up to followingRecord; a null record ID is a no-op.
    IF followingRecord = nullRecordID THEN RETURN;
    LogBasic.Force[followingRecord];
    };


  -- Miscellaneous info concerning the log.
  -- Exported to LogControl

  WordsUsableByClient: PUBLIC PROC [] RETURNS [INT] = {
    -- Three quarters of the log file's pages (rounded down), expressed in words.
    logPages: PageCount = LogBasic.LogFileSize[];
    usablePages: PageCount = 3*logPages/4;
    RETURN [usablePages*wordsPerPage];
    };

  RecordIDOfNextWrite: PUBLIC PROC [] RETURNS [RecordID] = {
    -- Returns the ID that LogBasic will assign to the next record put in the log.
    next: RecordID = LogBasic.RecordIDOfNextPut[];
    RETURN [next];
    };

  -- Log reading
  -- Exported to Log

  Read: PUBLIC PROC [
    thisRecord: RecordID, wordsToSkip: CARDINAL, to: Log.Block]
    RETURNS [status: Log.ReadProcStatus, wordsRead: CARDINAL] = {
    -- Reads from the log record thisRecord.  The transaction header is skipped in
    --addition to the caller's wordsToSkip, and is not counted in the wordsRead result.
    headerWords: CARDINAL = SIZE[LogRep.TransactionHeader];
    -- Saturate rather than overflow when adding the header size to the skip count.
    wordsToSkip ← IF wordsToSkip > LAST[CARDINAL] - headerWords THEN LAST[CARDINAL]
      ELSE wordsToSkip + headerWords;
    [status: status, wordsRead: wordsRead] ← LogBasic.Get[
      thisRecord: thisRecord, to: [base: NIL, length: wordsToSkip, rest: @to]];
    wordsRead ← wordsRead - headerWords;
    };

  -- Recovery procs
  -- Exported to Log

  -- Set once the update pass begins; RegisterRecoveryProc refuses registrations
  --after that.
  noMoreRecoveryRegistration: BOOL ← FALSE;

  TurnOffRecoveryRegistration: ENTRY PROC [] = INLINE {
    -- Called by UpdatePass before it starts scanning the log.
    noMoreRecoveryRegistration ← TRUE;
    };

  -- Table of recovery procs, indexed by record type.  The index range excludes its
  --upper bound (reserved).  All entries start out NIL.
  RecoveryProcsIndex: TYPE = RecordType [noop .. reserved);
  RecoveryProcsArray: TYPE = ARRAY RecoveryProcsIndex OF Log.RecoveryProc;
  recoveryProcs: REF RecoveryProcsArray =
    AlpineZones.static.NEW[RecoveryProcsArray ← ALL [NIL]];

  RegisterRecoveryProc: PUBLIC ENTRY PROC [
    recordType: RecordType, recoveryProc: Log.RecoveryProc] = {
    -- Installs recoveryProc as the recovery proc for recordType, replacing any
    --previous entry.
    -- Raises ClientProgrammingError once registration has been turned off, and
    --InternalProgrammingError if recordType is outside RecoveryProcsIndex.
    IF noMoreRecoveryRegistration THEN
      RETURN WITH ERROR ClientProgrammingError
    ELSE IF recordType NOT IN RecoveryProcsIndex THEN
      RETURN WITH ERROR InternalProgrammingError
    ELSE
      recoveryProcs[recordType] ← recoveryProc;
    };


  -- Analysis procs, checkpoint procs
  -- Exported to LogControl

  -- Set once the analysis pass begins.  After that, analysis and checkpoint procs can
  --no longer be registered, and Checkpoint is allowed to call the checkpoint procs.
  noMoreCheckpointRegistration: BOOL ← FALSE;

  TurnOffCheckpointRegistration: ENTRY PROC [] = INLINE {
    -- Called by AnalysisPass before it starts scanning the log.
    noMoreCheckpointRegistration ← TRUE;
    };

  IsCheckpointProcCallable: ENTRY PROC [] RETURNS [BOOL] = INLINE {
    -- TRUE exactly when registration has been turned off.
    RETURN [noMoreCheckpointRegistration];
    };

  -- Table of analysis procs, indexed by record type.  The index range includes both
  --of its bounds.  All entries start out NIL.
  AnalysisProcsIndex: TYPE = RecordType [checkpointBegin .. workerComplete];
  AnalysisProcsArray: TYPE = ARRAY AnalysisProcsIndex OF LogControl.AnalysisProc;
  analysisProcs: REF AnalysisProcsArray =
    AlpineZones.static.NEW[AnalysisProcsArray ← ALL [NIL]];

  RegisterAnalysisProc: PUBLIC ENTRY PROC [
    recordType: RecordType, analysisProc: LogControl.AnalysisProc] = {
    -- Installs analysisProc as the analysis proc for recordType, replacing any
    --previous entry.
    -- Raises ClientProgrammingError once registration has been turned off, and
    --InternalProgrammingError if recordType is outside AnalysisProcsIndex.
    IF noMoreCheckpointRegistration THEN
      RETURN WITH ERROR ClientProgrammingError
    ELSE IF recordType NOT IN AnalysisProcsIndex THEN
      RETURN WITH ERROR InternalProgrammingError
    ELSE
      analysisProcs[recordType] ← analysisProc;
    };

  -- Fixed-size table of checkpoint procs; entries [0 .. nCheckpointProcs) are in use.
  checkpointProcs: ARRAY [0 .. maxCheckpointProcs) OF LogControl.CheckpointProc ← ALL [NIL];
  maxCheckpointProcs: INT = 5;
  nCheckpointProcs: INT [0 .. maxCheckpointProcs] ← 0;

  RegisterCheckpointProc: PUBLIC ENTRY PROC [
    checkpointProc: LogControl.CheckpointProc] = {
    -- Adds checkpointProc to the end of the checkpoint proc table.
    -- Raises ClientProgrammingError once registration has been turned off, and
    --InternalProgrammingError if the table is already full.
    IF noMoreCheckpointRegistration THEN
      RETURN WITH ERROR ClientProgrammingError
    ELSE IF nCheckpointProcs = maxCheckpointProcs THEN
      RETURN WITH ERROR InternalProgrammingError
    ELSE {
      checkpointProcs[nCheckpointProcs] ← checkpointProc;
      nCheckpointProcs ← nCheckpointProcs + 1;
      };
    };

  ForkCheckpointProcess: PUBLIC PROC [myFileStore: FileStore,
    wakeupPeriodInSeconds: NAT] = {
    -- Starts CheckpointProcess as a detached process and returns immediately.
    checkpointer: PROCESS = FORK CheckpointProcess[myFileStore, wakeupPeriodInSeconds];
    Process.Detach[checkpointer];
    };

  CheckpointProcess: PROC [myFileStore: FileStore, wakeupPeriodInSeconds: NAT] = {
    -- Body of the checkpoint process; it never returns.  Each cycle pauses for the
    --wakeup period, takes a checkpoint (passing in the results of the previous one),
    --and then asks SafeStorage to reclaim collectible objects.
    pause: Process.Ticks = Process.SecondsToTicks[wakeupPeriodInSeconds];
    lastKeep, lastStartAnalysis: RecordID ← nullRecordID;
    DO
      Process.Pause[pause];
      [lastKeep, lastStartAnalysis] ←
        Checkpoint[lastKeep, lastStartAnalysis, myFileStore];
      SafeStorage.ReclaimCollectibleObjects[suspendMe: TRUE, traceAndSweep: FALSE];
      ENDLOOP;
    };

  Checkpoint: PROC [
    previousKeepRecord, previousStartAnalysisRecord: RecordID,
    myFileStore: FileStore]
    RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
    -- Computes new keep and start-analysis record IDs: each starts as the ID of the
    --next record to be put, and is lowered to the minimum of the values reported by
    --the registered checkpoint procs.
    -- Writes a checkpointCompleteRecord only if it contains a value different than
    --the previous checkpointCompleteRecord, and in that case also releases the log
    --before keepRecord.
    -- The previous values are supplied by the caller, which should pass the results
    --of this call to the next one (the arguments are passed by value, so nothing is
    --stored back into them here).
    -- Raises InternalProgrammingError if called before checkpoint registration is
    --turned off, or if either new ID is less than its previous value.
    differentRecordIDsSeen: BOOL ← FALSE;
    NoticeDifferentRecordIDs: PROC [old, new: RecordID] = {
      -- IDs may only stay the same or advance from one checkpoint to the next.
      SELECT Compare[old, new] FROM
        less => differentRecordIDsSeen ← TRUE;
        greater => ERROR InternalProgrammingError;
        ENDCASE;
      };
    keepRecord ← startAnalysisRecord ← LogBasic.RecordIDOfNextPut[];
    IF NOT IsCheckpointProcCallable[] THEN ERROR InternalProgrammingError;
    FOR i: INT [0 .. maxCheckpointProcs] IN [0 .. nCheckpointProcs) DO
      thisKeepRecord, thisStartAnalysisRecord: RecordID;
      [thisKeepRecord, thisStartAnalysisRecord] ← checkpointProcs[i][];
      keepRecord ← LogInline.Min[thisKeepRecord, keepRecord];
      startAnalysisRecord ← LogInline.Min[thisStartAnalysisRecord, startAnalysisRecord]
      ENDLOOP;
    NoticeDifferentRecordIDs[previousKeepRecord, keepRecord];
    NoticeDifferentRecordIDs[previousStartAnalysisRecord, startAnalysisRecord];
    IF differentRecordIDsSeen THEN {
      [] ← WriteCheckpointCompleteRecord[
        startAnalysisRecord: startAnalysisRecord,
        keepRecord: keepRecord,
        myFileStore: myFileStore];
      LogBasic.Release[beforeRecord: keepRecord];
      };
    RETURN [keepRecord, startAnalysisRecord];
    };

  -- Module initialization: register this module's own analysis proc, so that the
  --analysis pass records the keepRecord of each CheckpointComplete record it scans.
  RegisterAnalysisProc[
    recordType: checkpointComplete,
    analysisProc: CheckpointCompleteAnalysisProc];

  END.

CHANGE LOG

Changed by MBrown on April 1, 1983 1:25 pm
-- Checkpoint process is now implemented here, not in InitProcsImpl(!).

Changed by MBrown on June 16, 1983 9:35 am
-- Fixed bug in AnalysisPass.  After analysis is complete, we must write a few noop records to
--clean up the log tail, but we failed to call LogBasic.Release[keepRecord] to tell lower-level
--log routines that overwriting those pages is ok.

Changed by MBrown on June 27, 1983 10:06 am
-- Eliminated bug/feature in AnalysisPass that reclaimed "extra" log pages occupied by
--a multi-page truncated record found at end of log.  This never really worked
--(it has caused several restart failures) and the LogBasic abstraction does not
--allow for its clean implementation.  How much is it worth?
-- Added call to LogBasic.AssertNormalOperation at end of restart; this permits
--restart to write closer to the beginning of the log than normal clients are allowed to.

Changed by MBrown on January 30, 1984 10:20:09 am PST
-- Cedar 5.0: don't mention Environment.