-- LogImpl.mesa
-- Copyright © 1985 by Xerox Corporation. All rights reserved.
-- Last edited by
--   MBrown on January 30, 1984 10:19:45 am PST
--   Hauser, March 8, 1985 10:51:12 am PST
DIRECTORY
AlpineEnvironment,
AlpineInline,
AlpineInternal USING [],
AlpineZones USING [static],
Coordinator USING [Handle, Object],
ConvertUnsafe,
FilePageMgr,
InitProcs,
Log,
LogBasic,
LogControl,
LogInline,
LogRep,
Process,
RestartFile,
Rope,
SafeStorage USING [ReclaimCollectibleObjects],
TransactionMap,
Worker USING [Handle, Object];
LogImpl: MONITOR
IMPORTS
AlpineInline,
AlpineZones,
ConvertUnsafe,
FilePageMgr,
InitProcs,
LogBasic,
LogInline,
LogRep,
Process,
RestartFile,
Rope,
SafeStorage,
TransactionMap
EXPORTS
AlpineInternal,
Log,
LogControl =
BEGIN
-- Local abbreviations for types and constants defined in the Alpine, LogBasic and Log interfaces.
VolumeID: TYPE = AlpineEnvironment.VolumeID;
FileID: TYPE = AlpineEnvironment.FileID;
FileStore: TYPE = AlpineEnvironment.FileStore;
TransID: TYPE = AlpineEnvironment.TransID;
nullTransID: TransID = AlpineEnvironment.nullTransID;
PageNumber: TYPE = AlpineEnvironment.PageNumber;
PageCount: TYPE = AlpineEnvironment.PageCount;
WordNumber: TYPE = LogBasic.WordNumber;
WordCount: TYPE = LogBasic.WordCount;
RecordID: TYPE = Log.RecordID;
nullRecordID: RecordID = Log.nullRecordID;
Block: TYPE = Log.Block;
RecordType: TYPE = Log.RecordType;
wordsPerPage: CARDINAL = AlpineEnvironment.wordsPerPage;
-- Errors private to this module (not PUBLIC). ClientProgrammingError is raised when a
-- registration arrives after registration has been turned off; InternalProgrammingError
-- marks a violated internal invariant.
ClientProgrammingError: ERROR = CODE;
InternalProgrammingError: ERROR = CODE;
-- Concrete types bound to opaque PUBLIC types (presumably those declared in
-- AlpineInternal, which this module EXPORTS -- confirm against that interface).
TransObject: PUBLIC TYPE = Worker.Object;
CoordinatorObject: PUBLIC TYPE = Coordinator.Object;
-- Log.WriteFailed
-- Declared and exported here; nothing in this module raises it.
WriteFailed: PUBLIC ERROR = CODE;
Compare: PUBLIC PROC [a, b: RecordID] RETURNS [Log.Comparison] = {
  -- Log.Compare
  -- Orders two record IDs.  All of the work is delegated to LogInline.Compare.
  RETURN [LogInline.Compare[a, b]];
  };
Format: PUBLIC PROC [
  logVolume: VolumeID, logFile, restartFile: FileID, myFileStore: FileStore] = {
  -- LogControl.Format
  -- Initializes a log file and its restart file: fills every log page with a noop
  -- record, then writes a single CheckpointComplete record (and the matching restart
  -- record) so that a later Recover has a checkpoint to start from.
  --   logVolume, logFile: identify the log file to format.
  --   restartFile: identifies the restart file on the same volume.
  --   myFileStore: name stored in the CheckpointComplete record.
  -- Raises InternalProgrammingError if, after the noop records are written, the next
  -- record would not start at word 0 of the log.
  logSize: PageCount;
  thisCheckpointBeginRecord: RecordID;
  LogBasic.EstablishLogFile[logVolume, logFile];
  logSize ← LogBasic.LogFileSize[];
  RestartFile.EstablishRestartFile[logVolume, restartFile];
  -- Write a noop record on each page of the log file. A noop record is precisely
  -- one page long.
  LogBasic.OpenForPut[nextPage: 0, version: 0, nextRecord: nullRecordID];
  LogBasic.Release[beforeRecord: nullRecordID];
  -- The pages are written in two batches; everything before the end of the first batch
  -- is released before the second batch is written.
  LogBasic.Release[beforeRecord: WriteNoopRecords[logSize/2].followingRecord];
  [] ← WriteNoopRecords[logSize - logSize/2];
  -- Write a CheckpointBegin record, then a CheckpointComplete record pointing
  -- to it (and the restart record in the restart file).
  -- NOTE(review): no separate CheckpointBegin record is put here; the ID captured below
  -- is that of the next record to be put, which is the CheckpointComplete record itself.
  thisCheckpointBeginRecord ← LogBasic.RecordIDOfNextPut[];
  -- If all has gone well, the CheckpointBegin record was written as first record on
  -- page 0 of the log file.
  IF LogBasic.WordNumberFromRecordID[thisCheckpointBeginRecord] # 0 THEN
    ERROR InternalProgrammingError;
  LogBasic.Release[beforeRecord: thisCheckpointBeginRecord];
  [] ← WriteCheckpointCompleteRecord[
    startAnalysisRecord: thisCheckpointBeginRecord,
    keepRecord: thisCheckpointBeginRecord,
    myFileStore: myFileStore];
  LogBasic.CloseForPut[];
  };
WriteNoopRecords: PUBLIC PROC [nPages: PageCount]
  RETURNS [followingRecord: RecordID] = {
  -- This proc has a large frame.
  -- Puts nPages noop records into the log (the log must already be open for put) and
  -- returns the followingRecord result of the last Put.
  -- When nPages is not positive nothing is written and the ID of the next record to be
  -- put is returned, so that callers never receive an unassigned result.  This module
  -- passes RecordIDOfNextPut to the same Release/Force parameters that receive this
  -- result, which is why that value is used for the empty case.
  noopRecordBody: LogRep.NoopRecord ← [];
  IF nPages <= 0 THEN RETURN [LogBasic.RecordIDOfNextPut[]];
  FOR page: PageCount ← 0, page+1 UNTIL page = nPages DO
    [followingRecord: followingRecord] ← LogBasic.Put[from:
      [base: @noopRecordBody, length: SIZE[LogRep.NoopRecord]]];
    ENDLOOP;
  RETURN [followingRecord];
  };
WriteCheckpointCompleteRecord: PROC [
  startAnalysisRecord, keepRecord: RecordID, myFileStore: FileStore]
  RETURNS [thisRecord: RecordID] = {
  -- This call waits both for a log force and for a restart file write before returning.
  -- Builds a CheckpointComplete record holding startAnalysisRecord, keepRecord and the
  -- text of myFileStore, puts it with force and writeID set, then records its word
  -- number and record ID in the restart file.  Returns the ID of the record written.
  checkpointCompleteRecordBody: LogRep.CheckpointCompleteRecord ←
    [startAnalysisRecordID: startAnalysisRecord, keepRecordID: keepRecord];
  -- Copy the rope into the string field embedded in the record body.
  -- NOTE(review): no length check is made here; presumably the field is large enough
  -- for any FileStore name -- confirm against LogRep.
  ConvertUnsafe.AppendRope[
    to: LOOPHOLE[LONG[@(checkpointCompleteRecordBody.myFileStore)]], from: myFileStore];
  -- The length written depends on the rope length rather than on the full record SIZE.
  [thisRecord: thisRecord] ← LogBasic.Put[from: [base: @checkpointCompleteRecordBody,
    length: LogRep.CheckpointCompleteRecordSize[myFileStore.Length]],
    writeID: TRUE, force: TRUE];
  RestartFile.WriteRestartRecord[
    LogBasic.WordNumberFromRecordID[thisRecord], thisRecord];
  };
Recover: PUBLIC PROC [logVolume: VolumeID, logFile, restartFile: FileID] = {
  -- Restart-time recovery.  Locates the newest CheckpointComplete record, runs the
  -- analysis pass and then the update pass over the log (calling the InitProcs hooks
  -- around each), takes a checkpoint, and finally tells LogBasic that normal operation
  -- has begun.
  --   logVolume, logFile: identify the log file.
  --   restartFile: identifies the restart file on the same volume.
  -- Raises InternalProgrammingError if no usable CheckpointComplete record can be found.
  myFileStore: FileStore;  -- set by CheckpointCompleteRecordFromWord below

  DiscoverWhereToStartAnalysisPass: PROC [] RETURNS [
    checkpointWord: WordNumber, checkpointRecord, startAnalysisRecord: RecordID] = {
    -- Tries the checkpoint named by the restart file first; if that record cannot be
    -- read or does not carry the record ID stored in the restart file, falls back to a
    -- sequential scan of the log.

    CheckpointCompleteRecordFromWord: PROC [checkpointWord: WordNumber] RETURNS [
      ok: BOOL, checkpointRecord, startAnalysisRecord: RecordID] = {
      -- Updates: Recover.myFileStore.
      -- Returns ok = FALSE if any error detected.
      -- When ok is FALSE the two record results have not been assigned.
      {
        checkpointCompleteRecordBody: LogRep.CheckpointCompleteRecord;
        notStartOfRecord: BOOL;
        currentRecord: RecordID;
        [notStartOfRecord: notStartOfRecord, currentRecord: currentRecord] ←
          LogBasic.OpenRecordStreamFromWord[checkpointWord ! LogBasic.InvalidPage =>
            GOTO failReturn ];
        IF notStartOfRecord THEN GOTO failReturn;
        IF LogBasic.CheckCurrentRecord[].truncated THEN GOTO failReturn;
        IF LogBasic.GetCurrentRecord[currentRecord, [base: @checkpointCompleteRecordBody,
          length: SIZE[LogRep.CheckpointCompleteRecord]]].status = destinationFull OR
          checkpointCompleteRecordBody.type # checkpointComplete THEN GOTO failReturn;
        myFileStore ← ConvertUnsafe.ToRope[
          from: LOOPHOLE[LONG[@checkpointCompleteRecordBody.myFileStore]]];
        checkpointRecord ← checkpointCompleteRecordBody.thisRecordID;
        startAnalysisRecord ← checkpointCompleteRecordBody.startAnalysisRecordID;
        ok ← TRUE;
        EXITS
          failReturn => { ok ← FALSE };
        };
      -- The stream is closed on both the success path and the failure path.
      LogBasic.CloseRecordStream[];
      RETURN [ok, checkpointRecord, startAnalysisRecord];
      };--CheckpointCompleteRecordFromWord

    FindNewestCheckpointCompleteRecord: PROC [] RETURNS [WordNumber] = {
      -- Do a sequential scan of log, starting at LogBasic.LocateFirstRecord[].
      -- Remembers the last untruncated checkpointComplete record seen and returns its
      -- word position.  Raises InternalProgrammingError if the scan does not start on a
      -- record boundary or finds no such record.
      bigRecordID: RecordID = LogInline.RecordIDFromWordNumber[
        AlpineInline.WordsFromPages[LogBasic.LogFileSize[]]];
      firstWord: WordNumber = LogBasic.LocateFirstRecord[];
      checkpointRecord, currentRecord: RecordID;
      foundCheckpointRecord: BOOL ← FALSE;
      notStartOfRecord: BOOL;
      [notStartOfRecord: notStartOfRecord, currentRecord: currentRecord] ←
        LogBasic.OpenRecordStreamFromWord[firstWord];
      IF notStartOfRecord THEN ERROR InternalProgrammingError;
      DO
        nextRecord: RecordID;
        endOfLog, truncatedRecord: BOOL;
        currentType: RecordType ← PeekCurrentType[currentRecord];
        [endOfLog, truncatedRecord, nextRecord] ← LogBasic.AdvanceRecordStream[];
        IF currentType = checkpointComplete AND NOT truncatedRecord THEN {
          checkpointRecord ← currentRecord;
          foundCheckpointRecord ← TRUE;
          };
        IF endOfLog THEN EXIT;
        currentRecord ← nextRecord;
        ENDLOOP;
      IF NOT foundCheckpointRecord THEN ERROR InternalProgrammingError;
      -- Convert the record ID to a word number: subtract the ID that corresponds to the
      -- full log length when the record ID is not less than it, otherwise subtract
      -- nullRecordID.
      RETURN [LogInline.WordsFromSubtract[larger: checkpointRecord, smaller:
        IF Compare[checkpointRecord, bigRecordID] # less THEN bigRecordID
        ELSE nullRecordID]];
      };--FindNewestCheckpointCompleteRecord

    checkpointRecordFromRestartFile: RecordID;
    ok: BOOL;
    [checkpointWord, checkpointRecordFromRestartFile] ←
      RestartFile.ReadRestartRecord[];
    [ok, checkpointRecord, startAnalysisRecord] ←
      CheckpointCompleteRecordFromWord[checkpointWord];
    IF NOT ok OR checkpointRecord # checkpointRecordFromRestartFile THEN {
      checkpointWord ← FindNewestCheckpointCompleteRecord[];
      [ok, checkpointRecord, startAnalysisRecord] ←
        CheckpointCompleteRecordFromWord[checkpointWord];
      IF NOT ok THEN ERROR InternalProgrammingError;
      };
    RETURN [checkpointWord, checkpointRecord, startAnalysisRecord];
    };--DiscoverWhereToStartAnalysisPass

  checkpointWord: WordNumber; checkpointRecord: RecordID;
  startAnalysisRecord: RecordID;
  [] ← LogBasic.EstablishLogFile[logVolume, logFile];
  RestartFile.EstablishRestartFile[logVolume, restartFile];
  [checkpointWord, checkpointRecord, startAnalysisRecord] ←
    DiscoverWhereToStartAnalysisPass[];
  InitProcs.CalledBeforeAnalysisPass[myFileStore, logVolume];
  AnalysisPass[checkpointWord, checkpointRecord, startAnalysisRecord];
  InitProcs.CalledAfterAnalysisPass[];
  -- This writes log records. To make them available for reading by the update
  -- pass, they must be forced.
  LogBasic.Force[followingRecord: LogBasic.RecordIDOfNextPut[]];
  InitProcs.CalledBeforeUpdatePass[];
  UpdatePass[checkpointWord, checkpointRecord, startAnalysisRecord,
    LogBasic.RecordIDOfNextPut[]];
  InitProcs.CalledAfterUpdatePass[];
  [] ← Checkpoint[nullRecordID, nullRecordID, myFileStore];
  LogBasic.AssertNormalOperation[];
  };--Recover
AnalysisPass: PROC [
  checkpointWord: WordNumber, checkpointRecord, firstRecord: RecordID] = {
  -- Scans log, calling procs registered for analysis pass of recovery.
  -- Registration of analysis and checkpoint procs is turned off first.  Each record
  -- whose type has a registered analysis proc, and which is not truncated, is passed to
  -- that proc; truncated records are remembered via NoteTruncatedRecord for the update
  -- pass.  On return the log is open for put at the position following the last record.
  -- Raises InternalProgrammingError if the scan does not start on a record boundary or
  -- no CheckpointComplete record was seen.
  currentRecord, nextRecord: RecordID;
  TurnOffCheckpointRegistration[];
  { notStartOfRecord: BOOL;
    [notStartOfRecord: notStartOfRecord, currentRecord: currentRecord] ←
      LogBasic.OpenRecordStreamFromCheckpoint[checkpointWord: checkpointWord,
        checkpointRecord: checkpointRecord, firstRecord: firstRecord];
    IF notStartOfRecord THEN ERROR InternalProgrammingError;
    };
  DO
    currentRecordType: RecordType ← PeekCurrentType[currentRecord]; -- no errors
    analysisProc: LogControl.AnalysisProc;
    IF currentRecordType IN AnalysisProcsIndex AND
      (analysisProc ← analysisProcs[currentRecordType]) # NIL AND
      NOT LogBasic.CheckCurrentRecord[].truncated THEN {
      transID: TransID ← PeekCurrentTrans[currentRecord];
      analysisProc[currentRecord, currentRecordType, transID];
      };
    { endOfLog, truncatedRecord: BOOL;
      [endOfLog: endOfLog, truncatedRecord: truncatedRecord, currentRecord: nextRecord] ←
        LogBasic.AdvanceRecordStream[];
      IF truncatedRecord THEN NoteTruncatedRecord[currentRecord];
      IF endOfLog THEN EXIT;
      };
    currentRecord ← nextRecord;
    ENDLOOP;
  -- Analysis complete. If no checkpoint seen, we are in big trouble.
  IF NOT checkpointState.checkpointSeen THEN ERROR InternalProgrammingError;
  -- Overwrite enough pages past the end to ensure that we see no inconsistency due
  -- to out-of-order log page writes. Then open log for writing at correct position.
  { nextPageToWrite: PageNumber; versionOfNextPageToWrite: LogRep.PageVersion;
    [nextPageToWrite, versionOfNextPageToWrite] ← LogBasic.GetCurrentPageAndVersion[];
    LogBasic.CloseRecordStream[];
    -- First open with the opposite page version to write the noop records ...
    LogBasic.OpenForPut[nextPage: nextPageToWrite,
      version: 1-versionOfNextPageToWrite, nextRecord: nextRecord];
    -- Release tells LogBasic that pages before keepRecord may be overwritten.
    LogBasic.Release[beforeRecord: checkpointState.keepRecord];
    LogBasic.Force[followingRecord:
      WriteNoopRecords[nPages: LogBasic.maxInvalidLogPages].followingRecord];
    LogBasic.CloseForPut[];
    -- ... then reopen at the same page with the original version for real writing.
    LogBasic.OpenForPut[nextPage: nextPageToWrite,
      version: versionOfNextPageToWrite, nextRecord: nextRecord]
    };
  };--AnalysisPass
RoundUpToPages: PROC [recordID: RecordID] RETURNS [RecordID] = {
  -- Returns recordID unchanged when its word-in-page offset is zero; otherwise returns
  -- the ID advanced by the number of words remaining in that page.
  offsetInPage: CARDINAL = LogInline.WordInPageFromRecordID[recordID];
  IF offsetInPage = 0 THEN RETURN [recordID];
  RETURN [LogInline.AddC[recordID, wordsPerPage - offsetInPage]];
  };
-- Head and tail of the list of truncated records found by the analysis pass, kept in
-- the order in which they were noted.  UpdatePass walks the list from the head.
firstTruncatedRecord, lastTruncatedRecord: LIST OF RecordID ← NIL;
NoteTruncatedRecord: PROC [record: RecordID] = {
  -- Appends record to the end of the truncated-record list.
  cell: LIST OF RecordID = CONS[record, NIL];
  IF firstTruncatedRecord = NIL THEN firstTruncatedRecord ← cell;
  IF lastTruncatedRecord # NIL THEN lastTruncatedRecord.rest ← cell;
  lastTruncatedRecord ← cell;
  };
CheckpointCompleteAnalysisProc: PROC [
  record: RecordID, type: RecordType, trans: TransID] = {
  -- We use this to find the keepRecord value in the LAST checkpoint record.
  -- Analysis proc registered for checkpointComplete records.  Reads the current record,
  -- saves its keepRecordID in checkpointState and marks a checkpoint as seen; each call
  -- overwrites the previous value, so the last record scanned wins.
  -- Raises an unnamed ERROR if the read reports destinationFull or the record read is
  -- not of type checkpointComplete.  The type and trans arguments are not used.
  checkpointCompleteRecordBody: LogRep.CheckpointCompleteRecord;
  IF LogBasic.GetCurrentRecord[currentRecord: record, to:
    [base: @checkpointCompleteRecordBody,
      length: SIZE[LogRep.CheckpointCompleteRecord]]].status = destinationFull OR
    checkpointCompleteRecordBody.type # checkpointComplete THEN ERROR;
  checkpointState.keepRecord ← checkpointCompleteRecordBody.keepRecordID;
  checkpointState.checkpointSeen ← TRUE;
  };
-- State gathered by CheckpointCompleteAnalysisProc during the analysis pass.
CheckpointState: TYPE = RECORD [
  checkpointSeen: BOOL ← FALSE,  -- TRUE once any CheckpointComplete record was analyzed
  keepRecord: RecordID           -- keepRecordID of the most recently analyzed one
  ];
checkpointState: CheckpointState;
UpdatePass: PROC [checkpointWord: WordNumber,
checkpointRecord, firstRecord, nextRecordToWrite: RecordID] = {
-- Update pass of recovery.  Re-scans the log from the same starting position as the
-- analysis pass and, for each record whose type has a registered recovery proc and
-- whose transaction has a handle in TransactionMap, calls that proc with the
-- transaction's recovery state.  Registration of recovery procs is turned off first.
-- The scan stops when the current record equals nextRecordToWrite; afterwards
-- FilePageMgr.ForceOutEverything is called.
-- Raises InternalProgrammingError whenever the scan disagrees with what the analysis
-- pass recorded (truncated records, end of log) or runs past nextRecordToWrite.
currentTruncatedRecord: LIST OF RecordID ← firstTruncatedRecord;
notStartOfRecord: BOOL;
currentRecord: RecordID;
expectTruncatedRecord: BOOL;
TurnOffRecoveryRegistration[];
[notStartOfRecord: notStartOfRecord, currentRecord: currentRecord] ←
LogBasic.OpenRecordStreamFromCheckpoint[checkpointWord: checkpointWord,
checkpointRecord: checkpointRecord, firstRecord: firstRecord];
IF notStartOfRecord THEN ERROR InternalProgrammingError;
DO
expectTruncatedRecord ← FALSE;
{
currentRecordType: RecordType;
recoveryProc: Log.RecoveryProc;
-- A record noted as truncated by the analysis pass is skipped without calling any
-- recovery proc.  Truncated records must be met in the order they were noted: passing
-- the head of the list without matching it is an error.
IF currentTruncatedRecord # NIL THEN
SELECT Compare[currentRecord, currentTruncatedRecord.first] FROM
less => NULL;
equal => {
expectTruncatedRecord ← TRUE;
currentTruncatedRecord ← currentTruncatedRecord.rest;
GOTO GetNext };
greater => ERROR InternalProgrammingError;
ENDCASE => ERROR;
-- Stop exactly at nextRecordToWrite; finding a larger ID means the scan overshot.
SELECT Compare[currentRecord, nextRecordToWrite] FROM
less => NULL;
equal => EXIT;
greater => ERROR InternalProgrammingError;
ENDCASE => ERROR;
IF (currentRecordType ← PeekCurrentType[currentRecord]) IN RecoveryProcsIndex AND
(recoveryProc ← recoveryProcs[currentRecordType]) # NIL THEN {
transID: TransID ← PeekCurrentTrans[currentRecord];
transHandle: TransactionMap.Handle = TransactionMap.GetHandle[transID];
-- Records whose transaction has no handle in the map are ignored.
IF transHandle # TransactionMap.nullHandle THEN {
s: TransactionMap.TransState ← TransactionMap.StateDuringRecovery[transHandle];
-- Any state other than committed or aborted is reported to the proc as ready.
state: Log.TransState ← IF s = committed THEN committed
ELSE IF s = aborted THEN aborted ELSE ready;
recoveryProc[currentRecord, currentRecordType, transHandle, state];
};
};
GOTO GetNext;
EXITS
GetNext => {
endOfLog, truncatedRecord: BOOL; nextRecord: RecordID;
[endOfLog: endOfLog, truncatedRecord: truncatedRecord, currentRecord: nextRecord] ←
LogBasic.AdvanceRecordStream[];
-- Error if the log ends anywhere other than at nextRecordToWrite, or if the stream's
-- truncation report differs from what the truncated-record list predicted.
IF endOfLog AND (nextRecord # nextRecordToWrite) OR
(truncatedRecord # expectTruncatedRecord) THEN ERROR InternalProgrammingError;
currentRecord ← nextRecord;
};
};
ENDLOOP;
FilePageMgr.ForceOutEverything[];
};
PeekCurrentType: PROC [thisRecord: RecordID] RETURNS [recordType: RecordType] = {
  -- Reads just the type header of the stream's current record and returns its type.
  -- Raises InternalProgrammingError if the record is too short to hold the header.
  header: LogRep.RecordTypeHeader;
  readStatus: Log.ReadProcStatus;
  [status: readStatus] ← LogBasic.GetCurrentRecord[
    currentRecord: thisRecord,
    to: [base: @header, length: SIZE[LogRep.RecordTypeHeader]]];
  IF readStatus = sourceExhausted THEN ERROR InternalProgrammingError;
  recordType ← header.type;
  };
PeekCurrentTrans: PROC [thisRecord: RecordID] RETURNS [TransID] = {
  -- Reads the transaction header of the stream's current record and returns its
  -- transaction ID, or nullTransID when the record is too short to hold that header.
  header: LogRep.TransactionHeader;
  readStatus: Log.ReadProcStatus;
  [status: readStatus] ← LogBasic.GetCurrentRecord[
    currentRecord: thisRecord,
    to: [base: @header, length: SIZE[LogRep.TransactionHeader]]];
  RETURN [IF readStatus = sourceExhausted THEN nullTransID ELSE header.transID];
  };
ReadForRecovery: PUBLIC PROC [
  thisRecord: RecordID, wordsToSkip: CARDINAL ← 0, to: Block]
  RETURNS [status: Log.ReadProcStatus, wordsRead: CARDINAL] = {
  -- This is a Log.ReadProc.
  -- Reads from the record stream's current record (LogBasic.GetCurrentRecord), skipping
  -- the transaction header plus wordsToSkip further words before filling `to`.
  --   thisRecord: ID of the current record.
  --   wordsToSkip: client words to skip after the header.
  --   to: destination block(s).
  -- Returns the status from LogBasic and the word count it reported less the header size.
  -- NOTE(review): the subtraction is unchecked; presumably LogBasic never reports fewer
  -- words than the header size here -- confirm.
  headerWords: CARDINAL = SIZE[LogRep.TransactionHeader];
  -- Add the header to the skip count, saturating at LAST[CARDINAL] rather than overflowing.
  IF wordsToSkip > LAST[CARDINAL] - headerWords THEN
    wordsToSkip ← LAST[CARDINAL]
  ELSE
    wordsToSkip ← wordsToSkip + headerWords;
  -- A block with base NIL serves as the skipped prefix; the client's block is chained after it.
  [status: status, wordsRead: wordsRead] ←
    LogBasic.GetCurrentRecord[currentRecord: thisRecord,
      to: [base: NIL, length: wordsToSkip, rest: @to]];
  wordsRead ← wordsRead - headerWords;
  };
-- Log writing
-- Exported to Log
Write: PUBLIC PROC [
  trans: Worker.Handle, recordType: RecordType, recordData: Block, force: BOOL]
  RETURNS [thisRecord, followingRecord: RecordID] = {
  -- Puts one log record for a worker transaction: a transaction header built from
  -- recordType and trans.transID, followed by recordData.  `force` is passed through to
  -- LogBasic.Put.  Returns the ID of the record written and of the one that follows it.
  header: LogRep.TransactionHeader ← [type: recordType, transID: trans.transID];
  [thisRecord: thisRecord, followingRecord: followingRecord] ← LogBasic.Put[
    from: [base: @header, length: SIZE[LogRep.TransactionHeader], rest: @recordData],
    writeID: FALSE,
    force: force];
  };
CoordinatorWrite: PUBLIC PROC [
  trans: Coordinator.Handle, recordType: RecordType, recordData: Block, force: BOOL]
  RETURNS [thisRecord, followingRecord: RecordID] = {
  -- Same as Write, but the transaction ID is taken from a coordinator handle.
  header: LogRep.TransactionHeader ← [type: recordType, transID: trans.transID];
  [thisRecord: thisRecord, followingRecord: followingRecord] ← LogBasic.Put[
    from: [base: @header, length: SIZE[LogRep.TransactionHeader], rest: @recordData],
    writeID: FALSE,
    force: force];
  };
Force: PUBLIC PROC [followingRecord: RecordID] = {
  -- Forces the log via LogBasic.Force with the given followingRecord.
  -- Does nothing when followingRecord is nullRecordID.
  IF followingRecord = nullRecordID THEN RETURN;
  LogBasic.Force[followingRecord: followingRecord];
  };
-- Miscellaneous info concerning the log.
-- Exported to LogControl
WordsUsableByClient: PUBLIC PROC [] RETURNS [INT] = {
  -- Returns three quarters of the log file's page count (integer division), expressed
  -- in words.
  logPages: PageCount = LogBasic.LogFileSize[];
  usablePages: PageCount = (3*logPages)/4;
  RETURN [usablePages*wordsPerPage];
  };
RecordIDOfNextWrite: PUBLIC PROC [] RETURNS [RecordID] = {
  -- Returns the value of LogBasic.RecordIDOfNextPut.
  nextID: RecordID = LogBasic.RecordIDOfNextPut[];
  RETURN [nextID];
  };
-- Log reading
-- Exported to Log
Read: PUBLIC PROC [
  thisRecord: RecordID, wordsToSkip: CARDINAL, to: Log.Block]
  RETURNS [status: Log.ReadProcStatus, wordsRead: CARDINAL] = {
  -- Reads record thisRecord through LogBasic.Get, skipping the transaction header plus
  -- wordsToSkip further words before filling `to`.  Returns the status from LogBasic
  -- and the word count it reported less the header size.
  headerWords: CARDINAL = SIZE[LogRep.TransactionHeader];
  -- Saturate at LAST[CARDINAL] rather than overflow when adding the header size.
  wordsToSkip ← IF wordsToSkip > LAST[CARDINAL] - headerWords
    THEN LAST[CARDINAL]
    ELSE wordsToSkip + headerWords;
  -- The leading block with base NIL stands for the skipped words.
  [status: status, wordsRead: wordsRead] ← LogBasic.Get[
    thisRecord: thisRecord,
    to: [base: NIL, length: wordsToSkip, rest: @to]];
  wordsRead ← wordsRead - headerWords;
  };
-- Recovery procs
-- Exported to Log
-- TRUE once the update pass has begun; RegisterRecoveryProc then refuses new procs.
noMoreRecoveryRegistration: BOOL ← FALSE;
TurnOffRecoveryRegistration: ENTRY PROC [] = INLINE {
  -- Sets the flag under the monitor lock.
  noMoreRecoveryRegistration ← TRUE;
  };
-- Table of recovery procs, indexed by the record types in [noop .. reserved).
-- An entry of NIL means that no proc is registered for that type.
RecoveryProcsIndex: TYPE = RecordType [noop .. reserved);
RecoveryProcsArray: TYPE = ARRAY RecoveryProcsIndex OF Log.RecoveryProc;
recoveryProcs: REF RecoveryProcsArray =
  AlpineZones.static.NEW[RecoveryProcsArray ← ALL [NIL]];
RegisterRecoveryProc: PUBLIC ENTRY PROC [
  recordType: RecordType, recoveryProc: Log.RecoveryProc] = {
  -- Installs recoveryProc as the proc called by the update pass for records of
  -- recordType, replacing any earlier registration for that type.
  -- Raises ClientProgrammingError if registration has been turned off, and
  -- InternalProgrammingError if recordType lies outside RecoveryProcsIndex.
  IF noMoreRecoveryRegistration THEN
    RETURN WITH ERROR ClientProgrammingError
  ELSE IF recordType IN RecoveryProcsIndex THEN
    recoveryProcs[recordType] ← recoveryProc
  ELSE
    RETURN WITH ERROR InternalProgrammingError;
  };
-- Analysis procs, checkpoint procs
-- Exported to LogControl
-- TRUE once the analysis pass has begun.  From then on analysis and checkpoint procs
-- can no longer be registered, and Checkpoint is allowed to call the checkpoint procs.
noMoreCheckpointRegistration: BOOL ← FALSE;
TurnOffCheckpointRegistration: ENTRY PROC [] = INLINE {
  -- Sets the flag under the monitor lock.
  noMoreCheckpointRegistration ← TRUE;
  };
IsCheckpointProcCallable: ENTRY PROC [] RETURNS [BOOL] = INLINE {
  -- Reads the flag under the monitor lock.
  RETURN [noMoreCheckpointRegistration];
  };
-- Table of analysis procs, indexed by the record types in
-- [checkpointBegin .. workerComplete].  NIL means that no proc is registered.
AnalysisProcsIndex: TYPE = RecordType [checkpointBegin .. workerComplete];
AnalysisProcsArray: TYPE = ARRAY AnalysisProcsIndex OF LogControl.AnalysisProc;
analysisProcs: REF AnalysisProcsArray =
  AlpineZones.static.NEW[AnalysisProcsArray ← ALL [NIL]];
RegisterAnalysisProc: PUBLIC ENTRY PROC [
  recordType: RecordType, analysisProc: LogControl.AnalysisProc] = {
  -- Installs analysisProc as the proc called by the analysis pass for records of
  -- recordType, replacing any earlier registration for that type.
  -- Raises ClientProgrammingError if registration has been turned off, and
  -- InternalProgrammingError if recordType lies outside AnalysisProcsIndex.
  IF noMoreCheckpointRegistration THEN
    RETURN WITH ERROR ClientProgrammingError
  ELSE IF recordType IN AnalysisProcsIndex THEN
    analysisProcs[recordType] ← analysisProc
  ELSE
    RETURN WITH ERROR InternalProgrammingError;
  };
-- Fixed-size table of checkpoint procs; the first nCheckpointProcs entries are in use.
maxCheckpointProcs: INT = 5;
checkpointProcs: ARRAY [0 .. maxCheckpointProcs) OF LogControl.CheckpointProc ← ALL [NIL];
nCheckpointProcs: INT [0 .. maxCheckpointProcs] ← 0;
RegisterCheckpointProc: PUBLIC ENTRY PROC [
  checkpointProc: LogControl.CheckpointProc] = {
  -- Appends checkpointProc to the table of procs that Checkpoint calls.
  -- Raises ClientProgrammingError if registration has been turned off, and
  -- InternalProgrammingError if the table is already full.
  IF noMoreCheckpointRegistration THEN
    RETURN WITH ERROR ClientProgrammingError
  ELSE IF nCheckpointProcs < maxCheckpointProcs THEN {
    checkpointProcs[nCheckpointProcs] ← checkpointProc;
    nCheckpointProcs ← nCheckpointProcs + 1;
    }
  ELSE
    RETURN WITH ERROR InternalProgrammingError;
  };
ForkCheckpointProcess: PUBLIC PROC [myFileStore: FileStore,
  wakeupPeriodInSeconds: NAT] = {
  -- Starts CheckpointProcess as a detached process and returns immediately.
  checkpointer: PROCESS = FORK CheckpointProcess[myFileStore, wakeupPeriodInSeconds];
  Process.Detach[checkpointer];
  };
CheckpointProcess: PROC [myFileStore: FileStore, wakeupPeriodInSeconds: NAT] = {
  -- Body of the background checkpoint process; it never returns.  In each cycle it
  -- pauses for wakeupPeriodInSeconds, takes a checkpoint (feeding the previous cycle's
  -- results back in), and then asks SafeStorage to reclaim collectible objects.
  lastKeep, lastStartAnalysis: RecordID ← nullRecordID;
  DO
    Process.Pause[Process.SecondsToTicks[wakeupPeriodInSeconds]];
    [keepRecord: lastKeep, startAnalysisRecord: lastStartAnalysis] ← Checkpoint[
      previousKeepRecord: lastKeep,
      previousStartAnalysisRecord: lastStartAnalysis,
      myFileStore: myFileStore];
    SafeStorage.ReclaimCollectibleObjects[suspendMe: TRUE, traceAndSweep: FALSE];
    ENDLOOP;
  };
Checkpoint: PROC [
  previousKeepRecord, previousStartAnalysisRecord: RecordID,
  myFileStore: FileStore]
  RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
  -- Writes a checkpointCompleteRecord only if it contains a value different than
  -- the previous checkpointCompleteRecord.
  -- Asks every registered checkpoint proc for its keep and start-analysis records and
  -- takes the minimum of each, starting from the ID of the next record to be put.
  --   previousKeepRecord, previousStartAnalysisRecord: results of the previous call
  --     (nullRecordID on the first call).
  --   myFileStore: name stored in the CheckpointComplete record.
  -- Returns the newly computed values, whether or not a record was written.
  -- Raises InternalProgrammingError if checkpoint registration is still open or if
  -- either value is smaller than its predecessor.
  differentRecordIDsSeen: BOOL ← FALSE;
  NoticeDifferentRecordIDs: PROC [old, new: RecordID] = {
    -- Sets the flag when new is beyond old; going backwards is an error.
    SELECT Compare[old, new] FROM
      less => differentRecordIDsSeen ← TRUE;
      greater => ERROR InternalProgrammingError;
      ENDCASE;
    };
  keepRecord ← startAnalysisRecord ← LogBasic.RecordIDOfNextPut[];
  IF NOT IsCheckpointProcCallable[] THEN ERROR InternalProgrammingError;
  FOR i: INT [0 .. maxCheckpointProcs] IN [0 .. nCheckpointProcs) DO
    thisKeepRecord, thisStartAnalysisRecord: RecordID;
    [thisKeepRecord, thisStartAnalysisRecord] ← checkpointProcs[i][];
    keepRecord ← LogInline.Min[thisKeepRecord, keepRecord];
    startAnalysisRecord ← LogInline.Min[thisStartAnalysisRecord, startAnalysisRecord]
    ENDLOOP;
  NoticeDifferentRecordIDs[previousKeepRecord, keepRecord];
  NoticeDifferentRecordIDs[previousStartAnalysisRecord, startAnalysisRecord];
  IF differentRecordIDsSeen THEN {
    [] ← WriteCheckpointCompleteRecord[
      startAnalysisRecord: startAnalysisRecord,
      keepRecord: keepRecord,
      myFileStore: myFileStore];
    -- Log space before keepRecord may now be reused.
    LogBasic.Release[beforeRecord: keepRecord];
    };
  RETURN [keepRecord, startAnalysisRecord];
  };
-- Module initialization: have the analysis pass hand every checkpointComplete record
-- to CheckpointCompleteAnalysisProc.
RegisterAnalysisProc[checkpointComplete, CheckpointCompleteAnalysisProc];
END.
CHANGE LOG
Changed by MBrown on April 1, 1983 1:25 pm
Checkpoint process is now implemented here, not in InitProcsImpl(!).
Changed by MBrown on June 16, 1983 9:35 am
Fixed bug in AnalysisPass. After analysis is complete, we must write a few noop records to
clean up the log tail, but we failed to call LogBasic.Release[keepRecord] to tell lower-level
log routines that overwriting those pages is ok.
Changed by MBrown on June 27, 1983 10:06 am
Eliminated bug/feature in AnalysisPass that reclaimed "extra" log pages occupied by
a multi-page truncated record found at end of log. This never really worked
(it has caused several restart failures) and the LogBasic abstraction does not
allow for its clean implementation. How much is it worth?
Added call to LogBasic.AssertNormalOperation at end of restart; this permits
restart to write closer to the beginning of the log than normal clients are allowed to.
Changed by MBrown on January 30, 1984 10:20:09 am PST
Cedar 5.0: don't mention Environment.
Hauser, March 8, 1985 10:50:52 am PST
Nodified, added copyright.