-- LogImpl.mesa -- Last edited by -- MBrown on January 30, 1984 10:19:45 am PST DIRECTORY AlpineEnvironment, AlpineInline, AlpineInternal USING [], AlpineZones USING [static], Coordinator USING [Handle, Object], ConvertUnsafe, FilePageMgr, InitProcs, Log, LogBasic, LogControl, LogInline, LogRep, Process, RestartFile, Rope, SafeStorage USING [ReclaimCollectibleObjects], TransactionMap, Worker USING [Handle, Object]; LogImpl: MONITOR IMPORTS AlpineInline, AlpineZones, ConvertUnsafe, FilePageMgr, InitProcs, LogBasic, LogInline, LogRep, Process, RestartFile, Rope, SafeStorage, TransactionMap EXPORTS AlpineInternal, Log, LogControl = BEGIN VolumeID: TYPE = AlpineEnvironment.VolumeID; FileID: TYPE = AlpineEnvironment.FileID; FileStore: TYPE = AlpineEnvironment.FileStore; TransID: TYPE = AlpineEnvironment.TransID; nullTransID: TransID = AlpineEnvironment.nullTransID; PageNumber: TYPE = AlpineEnvironment.PageNumber; PageCount: TYPE = AlpineEnvironment.PageCount; WordNumber: TYPE = LogBasic.WordNumber; WordCount: TYPE = LogBasic.WordCount; RecordID: TYPE = Log.RecordID; nullRecordID: RecordID = Log.nullRecordID; Block: TYPE = Log.Block; RecordType: TYPE = Log.RecordType; wordsPerPage: CARDINAL = AlpineEnvironment.wordsPerPage; ClientProgrammingError: ERROR = CODE; InternalProgrammingError: ERROR = CODE; TransObject: PUBLIC TYPE = Worker.Object; CoordinatorObject: PUBLIC TYPE = Coordinator.Object; WriteFailed: PUBLIC ERROR = CODE; -- Log.WriteFailed Compare: PUBLIC PROC [a, b: RecordID] RETURNS [Log.Comparison] = { -- Log.Compare RETURN [LogInline.Compare[a, b]] }; Format: PUBLIC PROC [ logVolume: VolumeID, logFile, restartFile: FileID, myFileStore: FileStore] = { -- LogControl.Format logSize: PageCount; thisCheckpointBeginRecord: RecordID; LogBasic.EstablishLogFile[logVolume, logFile]; logSize _ LogBasic.LogFileSize[]; RestartFile.EstablishRestartFile[logVolume, restartFile]; -- Write a noop record on each page of the log file. A noop record is precisely --one page long. LogBasic.OpenForPut[nextPage: 0, version: 0, nextRecord: nullRecordID]; LogBasic.Release[beforeRecord: nullRecordID]; LogBasic.Release[beforeRecord: WriteNoopRecords[logSize/2].followingRecord]; [] _ WriteNoopRecords[logSize - logSize/2]; -- Write a CheckpointBegin record, then a CheckpointComplete record pointing --to it (and the restart record in the restart file). thisCheckpointBeginRecord _ LogBasic.RecordIDOfNextPut[]; -- If all has gone well, the CheckpointBegin record was written as first record on --page 0 of the log file. IF LogBasic.WordNumberFromRecordID[thisCheckpointBeginRecord] # 0 THEN ERROR InternalProgrammingError; LogBasic.Release[beforeRecord: thisCheckpointBeginRecord]; [] _ WriteCheckpointCompleteRecord[ startAnalysisRecord: thisCheckpointBeginRecord, keepRecord: thisCheckpointBeginRecord, myFileStore: myFileStore]; LogBasic.CloseForPut[]; }; WriteNoopRecords: PUBLIC PROC [nPages: PageCount] RETURNS [followingRecord: RecordID] = { -- This proc has a large frame. noopRecordBody: LogRep.NoopRecord _ []; FOR page: PageCount _ 0, page+1 UNTIL page = nPages DO [followingRecord: followingRecord] _ LogBasic.Put[from: [base: @noopRecordBody, length: SIZE[LogRep.NoopRecord]]]; ENDLOOP; RETURN [followingRecord]; }; WriteCheckpointCompleteRecord: PROC [ startAnalysisRecord, keepRecord: RecordID, myFileStore: FileStore] RETURNS [thisRecord: RecordID] = { -- This call waits both for a log force and for a restart file write before returning. checkpointCompleteRecordBody: LogRep.CheckpointCompleteRecord _ [startAnalysisRecordID: startAnalysisRecord, keepRecordID: keepRecord]; ConvertUnsafe.AppendRope[ to: LOOPHOLE[LONG[@(checkpointCompleteRecordBody.myFileStore)]], from: myFileStore]; [thisRecord: thisRecord] _ LogBasic.Put[from: [base: @checkpointCompleteRecordBody, length: LogRep.CheckpointCompleteRecordSize[myFileStore.Length]], writeID: TRUE, force: TRUE]; RestartFile.WriteRestartRecord[ LogBasic.WordNumberFromRecordID[thisRecord], thisRecord]; }; Recover: PUBLIC PROC [logVolume: VolumeID, logFile, restartFile: FileID] = { myFileStore: FileStore; DiscoverWhereToStartAnalysisPass: PROC [] RETURNS [ checkpointWord: WordNumber, checkpointRecord, startAnalysisRecord: RecordID] = { CheckpointCompleteRecordFromWord: PROC [checkpointWord: WordNumber] RETURNS [ ok: BOOL, checkpointRecord, startAnalysisRecord: RecordID] = { -- Updates: Recover.myFileStore. -- Returns ok = FALSE if any error detected. { checkpointCompleteRecordBody: LogRep.CheckpointCompleteRecord; notStartOfRecord: BOOL; currentRecord: RecordID; [notStartOfRecord: notStartOfRecord, currentRecord: currentRecord] _ LogBasic.OpenRecordStreamFromWord[checkpointWord ! LogBasic.InvalidPage => GOTO failReturn ]; IF notStartOfRecord THEN GOTO failReturn; IF LogBasic.CheckCurrentRecord[].truncated THEN GOTO failReturn; IF LogBasic.GetCurrentRecord[currentRecord, [base: @checkpointCompleteRecordBody, length: SIZE[LogRep.CheckpointCompleteRecord]]].status = destinationFull OR checkpointCompleteRecordBody.type # checkpointComplete THEN GOTO failReturn; myFileStore _ ConvertUnsafe.ToRope[ from: LOOPHOLE[LONG[@checkpointCompleteRecordBody.myFileStore]]]; checkpointRecord _ checkpointCompleteRecordBody.thisRecordID; startAnalysisRecord _ checkpointCompleteRecordBody.startAnalysisRecordID; ok _ TRUE; EXITS failReturn => { ok _ FALSE }; }; LogBasic.CloseRecordStream[]; RETURN [ok, checkpointRecord, startAnalysisRecord]; };--CheckpointCompleteRecordFromWord FindNewestCheckpointCompleteRecord: PROC [] RETURNS [WordNumber] = { -- Do a sequential scan of log, starting at LogBasic.LocateFirstRecord[]. bigRecordID: RecordID = LogInline.RecordIDFromWordNumber[ AlpineInline.WordsFromPages[LogBasic.LogFileSize[]]]; firstWord: WordNumber = LogBasic.LocateFirstRecord[]; checkpointRecord, currentRecord: RecordID; foundCheckpointRecord: BOOL _ FALSE; notStartOfRecord: BOOL; [notStartOfRecord: notStartOfRecord, currentRecord: currentRecord] _ LogBasic.OpenRecordStreamFromWord[firstWord]; IF notStartOfRecord THEN ERROR InternalProgrammingError; DO nextRecord: RecordID; endOfLog, truncatedRecord: BOOL; currentType: RecordType _ PeekCurrentType[currentRecord]; [endOfLog, truncatedRecord, nextRecord] _ LogBasic.AdvanceRecordStream[]; IF currentType = checkpointComplete AND NOT truncatedRecord THEN { checkpointRecord _ currentRecord; foundCheckpointRecord _ TRUE; }; IF endOfLog THEN EXIT; currentRecord _ nextRecord; ENDLOOP; IF NOT foundCheckpointRecord THEN ERROR InternalProgrammingError; RETURN [LogInline.WordsFromSubtract[larger: checkpointRecord, smaller: IF Compare[checkpointRecord, bigRecordID] # less THEN bigRecordID ELSE nullRecordID]]; };--FindNewestCheckpointCompleteRecord checkpointRecordFromRestartFile: RecordID; ok: BOOL; [checkpointWord, checkpointRecordFromRestartFile] _ RestartFile.ReadRestartRecord[]; [ok, checkpointRecord, startAnalysisRecord] _ CheckpointCompleteRecordFromWord[checkpointWord]; IF NOT ok OR checkpointRecord # checkpointRecordFromRestartFile THEN { checkpointWord _ FindNewestCheckpointCompleteRecord[]; [ok, checkpointRecord, startAnalysisRecord] _ CheckpointCompleteRecordFromWord[checkpointWord]; IF NOT ok THEN ERROR InternalProgrammingError; }; RETURN [checkpointWord, checkpointRecord, startAnalysisRecord]; };--DiscoverWhereToStartAnalysisPass checkpointWord: WordNumber; checkpointRecord: RecordID; startAnalysisRecord: RecordID; [] _ LogBasic.EstablishLogFile[logVolume, logFile]; RestartFile.EstablishRestartFile[logVolume, restartFile]; [checkpointWord, checkpointRecord, startAnalysisRecord] _ DiscoverWhereToStartAnalysisPass[]; InitProcs.CalledBeforeAnalysisPass[myFileStore, logVolume]; AnalysisPass[checkpointWord, checkpointRecord, startAnalysisRecord]; InitProcs.CalledAfterAnalysisPass[]; -- This writes log records. To make them available for reading by the update --pass, they must be forced. LogBasic.Force[followingRecord: LogBasic.RecordIDOfNextPut[]]; InitProcs.CalledBeforeUpdatePass[]; UpdatePass[checkpointWord, checkpointRecord, startAnalysisRecord, LogBasic.RecordIDOfNextPut[]]; InitProcs.CalledAfterUpdatePass[]; [] _ Checkpoint[nullRecordID, nullRecordID, myFileStore]; LogBasic.AssertNormalOperation[]; };--Recover AnalysisPass: PROC [ checkpointWord: WordNumber, checkpointRecord, firstRecord: RecordID] = { -- Scans log, calling procs registered for analysis pass of recovery. currentRecord, nextRecord: RecordID; TurnOffCheckpointRegistration[]; { notStartOfRecord: BOOL; [notStartOfRecord: notStartOfRecord, currentRecord: currentRecord] _ LogBasic.OpenRecordStreamFromCheckpoint[checkpointWord: checkpointWord, checkpointRecord: checkpointRecord, firstRecord: firstRecord]; IF notStartOfRecord THEN ERROR InternalProgrammingError; }; DO currentRecordType: RecordType _ PeekCurrentType[currentRecord]; -- no errors analysisProc: LogControl.AnalysisProc; IF currentRecordType IN AnalysisProcsIndex AND (analysisProc _ analysisProcs[currentRecordType]) # NIL AND NOT LogBasic.CheckCurrentRecord[].truncated THEN { transID: TransID _ PeekCurrentTrans[currentRecord]; analysisProc[currentRecord, currentRecordType, transID]; }; { endOfLog, truncatedRecord: BOOL; [endOfLog: endOfLog, truncatedRecord: truncatedRecord, currentRecord: nextRecord] _ LogBasic.AdvanceRecordStream[]; IF truncatedRecord THEN NoteTruncatedRecord[currentRecord]; IF endOfLog THEN EXIT; }; currentRecord _ nextRecord; ENDLOOP; -- Analysis complete. If no checkpoint seen, we are in big trouble. IF NOT checkpointState.checkpointSeen THEN ERROR InternalProgrammingError; -- Overwrite enough pages past the end to ensure that we see no inconsistency due --to out-of-order log page writes. Then open log for writing at correct position. { nextPageToWrite: PageNumber; versionOfNextPageToWrite: LogRep.PageVersion; [nextPageToWrite, versionOfNextPageToWrite] _ LogBasic.GetCurrentPageAndVersion[]; LogBasic.CloseRecordStream[]; LogBasic.OpenForPut[nextPage: nextPageToWrite, version: 1-versionOfNextPageToWrite, nextRecord: nextRecord]; LogBasic.Release[beforeRecord: checkpointState.keepRecord]; LogBasic.Force[followingRecord: WriteNoopRecords[nPages: LogBasic.maxInvalidLogPages].followingRecord]; LogBasic.CloseForPut[]; LogBasic.OpenForPut[nextPage: nextPageToWrite, version: versionOfNextPageToWrite, nextRecord: nextRecord] }; };--AnalysisPass RoundUpToPages: PROC [recordID: RecordID] RETURNS [RecordID] = { words: CARDINAL = LogInline.WordInPageFromRecordID[recordID]; RETURN [ IF words= 0 THEN recordID ELSE LogInline.AddC[recordID, wordsPerPage-words]]; }; firstTruncatedRecord, lastTruncatedRecord: LIST OF RecordID _ NIL; NoteTruncatedRecord: PROC [record: RecordID] = { new: LIST OF RecordID _ CONS [first: record, rest: NIL]; IF lastTruncatedRecord # NIL THEN lastTruncatedRecord.rest _ new; lastTruncatedRecord _ new; IF firstTruncatedRecord = NIL THEN firstTruncatedRecord _ new; }; CheckpointCompleteAnalysisProc: PROC [ record: RecordID, type: RecordType, trans: TransID] = { -- We use this to find the keepRecord value in the LAST checkpoint record. checkpointCompleteRecordBody: LogRep.CheckpointCompleteRecord; IF LogBasic.GetCurrentRecord[currentRecord: record, to: [base: @checkpointCompleteRecordBody, length: SIZE[LogRep.CheckpointCompleteRecord]]].status = destinationFull OR checkpointCompleteRecordBody.type # checkpointComplete THEN ERROR; checkpointState.keepRecord _ checkpointCompleteRecordBody.keepRecordID; checkpointState.checkpointSeen _ TRUE; }; CheckpointState: TYPE = RECORD [ checkpointSeen: BOOL _ FALSE, keepRecord: RecordID ]; checkpointState: CheckpointState; UpdatePass: PROC [checkpointWord: WordNumber, checkpointRecord, firstRecord, nextRecordToWrite: RecordID] = { currentTruncatedRecord: LIST OF RecordID _ firstTruncatedRecord; notStartOfRecord: BOOL; currentRecord: RecordID; expectTruncatedRecord: BOOL; TurnOffRecoveryRegistration[]; [notStartOfRecord: notStartOfRecord, currentRecord: currentRecord] _ LogBasic.OpenRecordStreamFromCheckpoint[checkpointWord: checkpointWord, checkpointRecord: checkpointRecord, firstRecord: firstRecord]; IF notStartOfRecord THEN ERROR InternalProgrammingError; DO expectTruncatedRecord _ FALSE; { currentRecordType: RecordType; recoveryProc: Log.RecoveryProc; IF currentTruncatedRecord # NIL THEN SELECT Compare[currentRecord, currentTruncatedRecord.first] FROM less => NULL; equal => { expectTruncatedRecord _ TRUE; currentTruncatedRecord _ currentTruncatedRecord.rest; GOTO GetNext }; greater => ERROR InternalProgrammingError; ENDCASE => ERROR; SELECT Compare[currentRecord, nextRecordToWrite] FROM less => NULL; equal => EXIT; greater => ERROR InternalProgrammingError; ENDCASE => ERROR; IF (currentRecordType _ PeekCurrentType[currentRecord]) IN RecoveryProcsIndex AND (recoveryProc _ recoveryProcs[currentRecordType]) # NIL THEN { transID: TransID _ PeekCurrentTrans[currentRecord]; transHandle: TransactionMap.Handle = TransactionMap.GetHandle[transID]; IF transHandle # TransactionMap.nullHandle THEN { s: TransactionMap.TransState _ TransactionMap.StateDuringRecovery[transHandle]; state: Log.TransState _ IF s = committed THEN committed ELSE IF s = aborted THEN aborted ELSE ready; recoveryProc[currentRecord, currentRecordType, transHandle, state]; }; }; GOTO GetNext; EXITS GetNext => { endOfLog, truncatedRecord: BOOL; nextRecord: RecordID; [endOfLog: endOfLog, truncatedRecord: truncatedRecord, currentRecord: nextRecord] _ LogBasic.AdvanceRecordStream[]; IF endOfLog AND (nextRecord # nextRecordToWrite) OR (truncatedRecord # expectTruncatedRecord) THEN ERROR InternalProgrammingError; currentRecord _ nextRecord; }; }; ENDLOOP; FilePageMgr.ForceOutEverything[]; }; PeekCurrentType: PROC [thisRecord: RecordID] RETURNS [recordType: RecordType] = { recordTypeHeader: LogRep.RecordTypeHeader; status: Log.ReadProcStatus; [status: status] _ LogBasic.GetCurrentRecord[currentRecord: thisRecord, to: [base: @recordTypeHeader, length: LogRep.RecordTypeHeader.SIZE]]; IF status = sourceExhausted THEN ERROR InternalProgrammingError; RETURN [recordTypeHeader.type]; }; PeekCurrentTrans: PROC [thisRecord: RecordID] RETURNS [TransID] = { transactionHeader: LogRep.TransactionHeader; status: Log.ReadProcStatus; [status: status] _ LogBasic.GetCurrentRecord[currentRecord: thisRecord, to: [base: @transactionHeader, length: LogRep.TransactionHeader.SIZE]]; IF status = sourceExhausted THEN RETURN [nullTransID] ELSE RETURN [transactionHeader.transID]; }; ReadForRecovery: PUBLIC PROC [ thisRecord: RecordID, wordsToSkip: CARDINAL _ 0, to: Block] RETURNS [status: Log.ReadProcStatus, wordsRead: CARDINAL] = { -- This is a Log.ReadProc. IF wordsToSkip > LAST[CARDINAL] - SIZE[LogRep.TransactionHeader] THEN wordsToSkip _ LAST[CARDINAL] ELSE wordsToSkip _ wordsToSkip + SIZE[LogRep.TransactionHeader]; [status: status, wordsRead: wordsRead] _ LogBasic.GetCurrentRecord[currentRecord: thisRecord, to: [base: NIL, length: wordsToSkip, rest: @to]]; wordsRead _ wordsRead - SIZE[LogRep.TransactionHeader]; }; -- Log writing -- Exported to Log Write: PUBLIC PROC [ trans: Worker.Handle, recordType: RecordType, recordData: Block, force: BOOL] RETURNS [thisRecord, followingRecord: RecordID] = { recordHeader: LogRep.TransactionHeader _ [type: recordType, transID: trans.transID]; [thisRecord, followingRecord] _ LogBasic.Put[ from: [base: @recordHeader, length: SIZE[LogRep.TransactionHeader], rest: @recordData], force: force, writeID: FALSE]; }; CoordinatorWrite: PUBLIC PROC [ trans: Coordinator.Handle, recordType: RecordType, recordData: Block, force: BOOL] RETURNS [thisRecord, followingRecord: RecordID] = { recordHeader: LogRep.TransactionHeader _ [type: recordType, transID: trans.transID]; [thisRecord, followingRecord] _ LogBasic.Put[ from: [base: @recordHeader, length: SIZE[LogRep.TransactionHeader], rest: @recordData], force: force, writeID: FALSE]; }; Force: PUBLIC PROC [followingRecord: RecordID] = { IF followingRecord # nullRecordID THEN LogBasic.Force[followingRecord]; }; -- Miscellaneous info concerning the log. -- Exported to LogControl WordsUsableByClient: PUBLIC PROC [] RETURNS [INT] = { RETURN [(3*LogBasic.LogFileSize[]/4)*wordsPerPage]; }; RecordIDOfNextWrite: PUBLIC PROC [] RETURNS [RecordID] = { RETURN [LogBasic.RecordIDOfNextPut[]]; }; -- Log reading -- Exported to Log Read: PUBLIC PROC [ thisRecord: RecordID, wordsToSkip: CARDINAL, to: Log.Block] RETURNS [status: Log.ReadProcStatus, wordsRead: CARDINAL] = { IF wordsToSkip > LAST[CARDINAL] - SIZE[LogRep.TransactionHeader] THEN wordsToSkip _ LAST[CARDINAL] ELSE wordsToSkip _ wordsToSkip + SIZE[LogRep.TransactionHeader]; [status: status, wordsRead: wordsRead] _ LogBasic.Get[ thisRecord: thisRecord, to: [base: NIL, length: wordsToSkip, rest: @to]]; wordsRead _ wordsRead - SIZE[LogRep.TransactionHeader]; }; -- Recovery procs -- Exported to Log noMoreRecoveryRegistration: BOOL _ FALSE; TurnOffRecoveryRegistration: ENTRY PROC [] = INLINE { noMoreRecoveryRegistration _ TRUE; }; RecoveryProcsIndex: TYPE = RecordType [noop .. reserved); RecoveryProcsArray: TYPE = ARRAY RecoveryProcsIndex OF Log.RecoveryProc; recoveryProcs: REF RecoveryProcsArray = AlpineZones.static.NEW[RecoveryProcsArray _ ALL [NIL]]; RegisterRecoveryProc: PUBLIC ENTRY PROC [ recordType: RecordType, recoveryProc: Log.RecoveryProc] = { IF noMoreRecoveryRegistration THEN RETURN WITH ERROR ClientProgrammingError; IF recordType NOT IN RecoveryProcsIndex THEN RETURN WITH ERROR InternalProgrammingError; recoveryProcs[recordType] _ recoveryProc; }; -- Analysis procs, checkpoint procs -- Exported to LogControl noMoreCheckpointRegistration: BOOL _ FALSE; TurnOffCheckpointRegistration: ENTRY PROC [] = INLINE { noMoreCheckpointRegistration _ TRUE; }; IsCheckpointProcCallable: ENTRY PROC [] RETURNS [BOOL] = INLINE { RETURN [noMoreCheckpointRegistration]; }; AnalysisProcsIndex: TYPE = RecordType [checkpointBegin .. workerComplete]; AnalysisProcsArray: TYPE = ARRAY AnalysisProcsIndex OF LogControl.AnalysisProc; analysisProcs: REF AnalysisProcsArray = AlpineZones.static.NEW[AnalysisProcsArray _ ALL [NIL]]; RegisterAnalysisProc: PUBLIC ENTRY PROC [ recordType: RecordType, analysisProc: LogControl.AnalysisProc] = { IF noMoreCheckpointRegistration THEN RETURN WITH ERROR ClientProgrammingError; IF recordType NOT IN AnalysisProcsIndex THEN RETURN WITH ERROR InternalProgrammingError; analysisProcs[recordType] _ analysisProc; }; checkpointProcs: ARRAY [0 .. maxCheckpointProcs) OF LogControl.CheckpointProc _ ALL [NIL]; maxCheckpointProcs: INT = 5; nCheckpointProcs: INT [0 .. maxCheckpointProcs] _ 0; RegisterCheckpointProc: PUBLIC ENTRY PROC [ checkpointProc: LogControl.CheckpointProc] = { IF noMoreCheckpointRegistration THEN RETURN WITH ERROR ClientProgrammingError; IF nCheckpointProcs = maxCheckpointProcs THEN RETURN WITH ERROR InternalProgrammingError; checkpointProcs[nCheckpointProcs] _ checkpointProc; nCheckpointProcs _ nCheckpointProcs + 1; }; ForkCheckpointProcess: PUBLIC PROC [myFileStore: FileStore, wakeupPeriodInSeconds: NAT] = { Process.Detach[FORK CheckpointProcess[myFileStore, wakeupPeriodInSeconds]]; }; CheckpointProcess: PROC [myFileStore: FileStore, wakeupPeriodInSeconds: NAT] = { keepRecord, startAnalysisRecord: RecordID _ nullRecordID; DO Process.Pause[Process.SecondsToTicks[wakeupPeriodInSeconds]]; [keepRecord, startAnalysisRecord] _ Checkpoint[keepRecord, startAnalysisRecord, myFileStore]; SafeStorage.ReclaimCollectibleObjects[suspendMe: TRUE, traceAndSweep: FALSE]; ENDLOOP; }; Checkpoint: PROC [ previousKeepRecord, previousStartAnalysisRecord: RecordID, myFileStore: FileStore] RETURNS [keepRecord, startAnalysisRecord: RecordID] = { -- Writes a checkpointCompleteRecord only if it contains a value different than --the previous checkpointCompleteRecord. onlineLogLength: INT = AlpineInline.WordsFromPages[LogBasic.LogFileSize[]]; differentRecordIDsSeen: BOOL _ FALSE; NoticeDifferentRecordIDs: PROC [old, new: RecordID] = { SELECT Compare[old, new] FROM less => differentRecordIDsSeen _ TRUE; greater => ERROR InternalProgrammingError; ENDCASE; }; keepRecord _ startAnalysisRecord _ LogBasic.RecordIDOfNextPut[]; IF NOT IsCheckpointProcCallable[] THEN ERROR InternalProgrammingError; FOR i: INT [0 .. maxCheckpointProcs] IN [0 .. nCheckpointProcs) DO thisKeepRecord, thisStartAnalysisRecord: RecordID; [thisKeepRecord, thisStartAnalysisRecord] _ checkpointProcs[i][]; keepRecord _ LogInline.Min[thisKeepRecord, keepRecord]; startAnalysisRecord _ LogInline.Min[thisStartAnalysisRecord, startAnalysisRecord] ENDLOOP; NoticeDifferentRecordIDs[previousKeepRecord, keepRecord]; NoticeDifferentRecordIDs[previousStartAnalysisRecord, startAnalysisRecord]; IF differentRecordIDsSeen THEN { [] _ WriteCheckpointCompleteRecord[ startAnalysisRecord: startAnalysisRecord, keepRecord: keepRecord, myFileStore: myFileStore]; LogBasic.Release[beforeRecord: keepRecord]; previousKeepRecord _ keepRecord; previousStartAnalysisRecord _ startAnalysisRecord; }; RETURN [keepRecord, startAnalysisRecord]; }; RegisterAnalysisProc[ recordType: checkpointComplete, analysisProc: CheckpointCompleteAnalysisProc]; END. CHANGE LOG Changed by MBrown on April 1, 1983 1:25 pm -- Checkpoint process is now implemented here, not in InitProcsImpl(!). Changed by MBrown on June 16, 1983 9:35 am -- Fixed bug in AnalysisPass. After analysis is complete, we must write a few noop records to --clean up the log tail, but we failed to call LogBasic.Release[keepRecord] to tell lower-level --log routines that overwriting those pages is ok. Changed by MBrown on June 27, 1983 10:06 am -- Eliminated bug/feature in AnalysisPass that reclaimed "extra" log pages occupied by --a multi-page truncated record found at end of log. This never really worked --(it has caused several restart failures) and the LogBasic abstraction does not --allow for its clean implementation. How much is it worth? -- Added call to LogBasic.AssertNormalOperation at end of restart; this permits --restart to write closer to the beginning of the log than normal clients are allowed to. Changed by MBrown on January 30, 1984 10:20:09 am PST -- Cedar 5.0: don't mention Environment.