YggLogImpl:
CEDAR MONITOR
IMPORTS
CamelotRecoverable, ConstArith, Process, SafeStorage, YggLog, YggLogBasic, YggLogInline, YggRestartFile, YggTransaction
EXPORTS
YggLog, YggLogControl =
BEGIN
-- Local abbreviations for types and constants defined in other interfaces.
FileStore: TYPE = YggEnvironment.FileStore;
TransID: TYPE = YggEnvironment.TransID;
nullTransID: TransID = YggEnvironment.nullTransID;
PageNumber: TYPE = YggEnvironment.PageNumber;
PageCount: TYPE = YggEnvironment.PageCount;
WordNumber: TYPE = YggLogBasic.WordNumber;
WordCount: TYPE = YggLogBasic.WordCount;
RecordID: TYPE = YggLog.RecordID;
nullRecordID: RecordID = YggLog.nullRecordID;
Block: TYPE = YggLog.Block;
RecordType: TYPE = YggLog.RecordType;
LogRecordPhaseType: TYPE = YggLog.LogRecordPhaseType;
-- ClientProgrammingError is raised by the Register procs once registration has been turned off.
ClientProgrammingError: ERROR = CODE;
-- InternalProgrammingError is raised when a consistency check inside this module fails.
InternalProgrammingError: ERROR = CODE;
-- WriteFailed is declared here but is not raised anywhere in the visible part of this file.
WriteFailed: PUBLIC ERROR = CODE;
-- The next three variables are assigned by Format.
logDeviceCharacteristics: PUBLIC YggEnvironment.DeviceCharacteristics;
wordsPerPage: PUBLIC CARD; -- words in a Log page
bytesPerPage: PUBLIC CARD; -- bytes in a Log page
-- Assigned by Write: ID of the most recent record written with phase type undo or redoUndo.
lastRecordWithUndo: RecordID ← nullRecordID;
-- WORDS[CARD32]; used as the divisor when a SIZE or WORDS value is passed as a length to YggLogBasic.Put.
WordsINWORDS: CARD = WORDS[CARD32];
Format: PUBLIC PROC [ ] = {
  -- Formats the log file: records the device characteristics, fills the whole file with
  -- noop records, and then writes an initial CheckpointComplete record.
  -- Raises InternalProgrammingError if the position check after the noop fill fails.
  checkpointBegin: RecordID;
  halfwayRecord: RecordID;
  pagesInLog: PageCount;
  firstHalf: PageCount;

  YggLogBasic.EstablishLogFile[];
  pagesInLog ← YggLogBasic.LogFileSize[];
  logDeviceCharacteristics ← NEW[YggEnvironment.DeviceCharacteristicsRep ← [
    wordsPerPage: 1024/BYTES[WORD],
    bytesPerPage: 1024,
    logWordsPerPage: 8,
    logBytesPerPage: 10]];
  wordsPerPage ← logDeviceCharacteristics.wordsPerPage;
  bytesPerPage ← wordsPerPage * Basics.bytesPerWord;

  -- Write a noop record on each page of the log file. A noop record is precisely
  -- one page long. The fill is done in two halves, with a Release after the first half.
  YggLogBasic.OpenForPut[nextPage: 0, version: 0, nextRecord: nullRecordID];
  YggLogBasic.Release[beforeRecord: nullRecordID];
  firstHalf ← pagesInLog/2;
  halfwayRecord ← WriteNoopRecords[firstHalf].followingRecord;
  YggLogBasic.Release[beforeRecord: halfwayRecord];
  [] ← WriteNoopRecords[pagesInLog - firstHalf];

  -- Original note: write a CheckpointBegin record, then a CheckpointComplete record
  -- pointing to it (and the restart record in the restart file).
  -- NOTE(review): the visible code only captures the ID of the next record to be put;
  -- no separate CheckpointBegin record is written here.
  checkpointBegin ← YggLogBasic.RecordIDOfNextPut[];

  -- If all has gone well, that record ID denotes the first record on page 0 of the log file.
  -- NOTE(review): a word number is compared against YggLog.nullRecordID here; confirm
  -- that this is the intended representation of word 0.
  IF YggLogBasic.WordNumberFromRecordID[checkpointBegin] # YggLog.nullRecordID THEN
    ERROR InternalProgrammingError;

  YggLogBasic.Release[beforeRecord: checkpointBegin];
  [] ← WriteCheckpointCompleteRecord[
    startAnalysisRecord: checkpointBegin,
    keepRecord: checkpointBegin,
    length: SIZE[YggLogRep.CheckpointCompleteRecord]/WordsINWORDS];
  YggLogBasic.CloseForPut[];
};
WriteNoopRecords: PUBLIC PROC [nPages: PageCount] RETURNS [followingRecord: RecordID] = {
  -- Puts nPages noop records into the log, one Put per page, and returns the
  -- followingRecord reported by the last Put.
  -- Original note: this proc has a large frame.
  -- NOTE(review): when nPages = 0 no Put happens and followingRecord is never assigned.
  noopBody: YggLogRep.NoopRecord ← [];
  pagesDone: PageCount ← 0;
  UNTIL pagesDone = nPages DO
    TRUSTED {
      [followingRecord: followingRecord] ← YggLogBasic.Put[
        from: [base: @noopBody, length: SIZE[YggLogRep.NoopRecord]/SIZE[CARD32]],
        optr: [[0], 0, 0]];
    };
    pagesDone ← pagesDone + 1;
  ENDLOOP;
  RETURN [followingRecord];
};
WriteCheckpointCompleteRecord: PROC [startAnalysisRecord, keepRecord: RecordID, length: INT]
  RETURNS [thisRecord: RecordID] = {
  -- Puts a CheckpointComplete record carrying startAnalysisRecord and keepRecord, then
  -- writes the restart record that points at it.
  --   length: passed to YggLogBasic.Put as the length of the record body.
  -- Returns the ID of the record just written.
  -- Original note: this call waits both for a log force and for a restart file write
  -- before returning.
  body: YggLogRep.CheckpointCompleteRecord ←
    [startAnalysisRecordID: startAnalysisRecord, keepRecordID: keepRecord];
  TRUSTED {
    [thisRecord: thisRecord] ← YggLogBasic.Put[
      from: [base: @body, length: length],
      optr: [[0], 0, 0],
      force: TRUE,
      writeID: TRUE];
  };
  -- The restart record holds both the word number and the record ID of the checkpoint.
  YggRestartFile.WriteRestartRecord[
    YggLogBasic.WordNumberFromRecordID[thisRecord],
    thisRecord];
};
DiscoverWhereToStartAnalysisPass: PUBLIC PROC [errorOnBadCheckpoint: BOOL]
  RETURNS [ok: BOOL, checkpointWord: WordNumber, checkpointRecord, startAnalysisRecord: RecordID] = {
  -- Reads the restart record, then reads the CheckpointComplete record it points at.
  --   errorOnBadCheckpoint: when TRUE, raises ERROR if the checkpoint record cannot be read
  --     or if its record ID differs from the one stored in the restart file.
  -- Returns:
  --   ok: FALSE if any problem was detected while reading the checkpoint record.
  --   checkpointWord: the word number read from the restart file.
  --   checkpointRecord: the thisRecordID field of the checkpoint record.
  --   startAnalysisRecord: the startAnalysisRecordID field of the checkpoint record.

  CheckpointCompleteRecordFromWord: PROC [checkpointWord: WordNumber]
    RETURNS [ok: BOOL, checkpointRecord, startAnalysisRecord: RecordID] = {
    -- Returns ok = FALSE if any error detected.
    {
      checkpointCompleteRecordBody: YggLogRep.CheckpointCompleteRecord;
      notStartOfRecord: BOOL;
      currentRecord: RecordID;
      [notStartOfRecord: notStartOfRecord, currentRecord: currentRecord] ←
        YggLogBasic.OpenRecordStreamFromWord[checkpointWord ! YggLogBasic.InvalidPage =>
          GOTO failReturn ];
      IF notStartOfRecord THEN GOTO failReturn;
      IF YggLogBasic.CheckCurrentRecord[].truncated THEN GOTO failReturn;
      TRUSTED {
        IF YggLogBasic.GetCurrentRecord[currentRecord, [base: @checkpointCompleteRecordBody,
          length: SIZE[YggLogRep.CheckpointCompleteRecord]]].status = destinationFull OR
          checkpointCompleteRecordBody.type # checkpointComplete THEN GOTO failReturn;
      };
      checkpointRecord ← checkpointCompleteRecordBody.thisRecordID;
      -- Analysis starts at the record ID stored in the checkpoint, which is not
      -- necessarily the checkpoint record itself.
      startAnalysisRecord ← checkpointCompleteRecordBody.startAnalysisRecordID;
      ok ← TRUE;
      EXITS
        failReturn => { ok ← FALSE };
    };
    -- The stream is closed on the success path and on every failure path.
    YggLogBasic.CloseRecordStream[];
    RETURN [ok, checkpointRecord, startAnalysisRecord];
  };--CheckpointCompleteRecordFromWord

  checkpointRecordFromRestartFile: RecordID;
  [checkpointWord, checkpointRecordFromRestartFile] ←
    YggRestartFile.ReadRestartRecord[];
  [ok, checkpointRecord, startAnalysisRecord] ←
    CheckpointCompleteRecordFromWord[checkpointWord];
  IF errorOnBadCheckpoint AND (NOT ok OR checkpointRecord # checkpointRecordFromRestartFile) THEN ERROR;
  RETURN [ok, checkpointWord, checkpointRecord, startAnalysisRecord];
};--DiscoverWhereToStartAnalysisPass
Recover: PUBLIC PROC [] = {
  -- Runs log recovery: locates the last checkpoint, runs the analysis pass and the update
  -- pass (with the YggRecoveryPassProcs hooks called around each), takes a checkpoint,
  -- and then signals that normal operation may begin.
  checkpointWord: WordNumber;
  checkpointRecord: RecordID;
  startAnalysisRecord: RecordID;
  [] ← YggLogBasic.EstablishLogFile[];
  -- The ok result is not extracted: with errorOnBadCheckpoint = TRUE a bad checkpoint
  -- raises ERROR instead of returning ok = FALSE.
  [checkpointWord: checkpointWord, checkpointRecord: checkpointRecord,
    startAnalysisRecord: startAnalysisRecord] ←
    DiscoverWhereToStartAnalysisPass[TRUE];
  YggRecoveryPassProcs.CalledBeforeAnalysisPass[];
  AnalysisPass[checkpointWord, checkpointRecord, startAnalysisRecord];
  YggRecoveryPassProcs.CalledAfterAnalysisPass[];
  -- This writes log records. To make them available for reading by the update
  -- pass, they must be forced.
  YggLogBasic.Force[followingRecord: YggLogBasic.RecordIDOfNextPut[]];
  YggRecoveryPassProcs.CalledBeforeUpdatePass[];
  -- The update pass stops when it reaches the record ID of the next put.
  UpdatePass[checkpointWord, checkpointRecord, startAnalysisRecord,
    YggLogBasic.RecordIDOfNextPut[]];
  YggRecoveryPassProcs.CalledAfterUpdatePass[];
  [] ← Checkpoint[nullRecordID, nullRecordID];
  YggLogBasic.AssertNormalOperation[];
  CamelotRecoverable.CamelotRecoveryComplete[];
};--Recover
AnalysisPass: PROC [checkpointWord: WordNumber, checkpointRecord, firstRecord: RecordID] = {
  -- Scans log, calling procs registered for analysis pass of recovery.
  -- On return the log is open for writing at the position where the scan ended.
  -- Raises InternalProgrammingError if the stream does not open at the start of a record
  -- or if no checkpoint record was seen during the scan.
  cur, following: RecordID;
  TurnOffCheckpointRegistration[];
  {
    misaligned: BOOL;
    [notStartOfRecord: misaligned, currentRecord: cur] ←
      YggLogBasic.OpenRecordStreamFromCheckpoint[
        checkpointWord: checkpointWord,
        checkpointRecord: checkpointRecord,
        firstRecord: firstRecord];
    IF misaligned THEN ERROR InternalProgrammingError;
  };
  DO
    atEnd, wasTruncated: BOOL;
    recType: RecordType ← PeekCurrentType[cur]; -- no errors
    handler: YggLogControl.AnalysisProc;
    -- Call the registered proc only for record types inside the table, with a non-NIL
    -- entry, and only when the current record is not truncated.
    IF recType IN AnalysisProcsIndex THEN {
      handler ← analysisProcs[recType];
      IF handler # NIL THEN {
        IF NOT YggLogBasic.CheckCurrentRecord[].truncated THEN {
          transID: TransID ← PeekCurrentTrans[cur];
          handler[cur, recType, transID];
        };
      };
    };
    [endOfLog: atEnd, truncatedRecord: wasTruncated, currentRecord: following] ←
      YggLogBasic.AdvanceRecordStream[];
    -- Truncated records are remembered so that UpdatePass can expect them.
    IF wasTruncated THEN NoteTruncatedRecord[cur];
    IF atEnd THEN EXIT;
    cur ← following;
  ENDLOOP;
  -- Analysis complete. If no checkpoint seen, we are in big trouble.
  IF NOT checkpointState.checkpointSeen THEN ERROR InternalProgrammingError;
  -- Overwrite enough pages past the end to ensure that we see no inconsistency due
  -- to out-of-order log page writes. Then open log for writing at correct position.
  {
    resumePage: PageNumber;
    resumeVersion: YggLogRep.PageVersion;
    [resumePage, resumeVersion] ← YggLogBasic.GetCurrentPageAndVersion[];
    YggLogBasic.CloseRecordStream[];
    -- First open with the opposite version to write and force the noop records.
    YggLogBasic.OpenForPut[
      nextPage: resumePage, version: 1-resumeVersion, nextRecord: following];
    YggLogBasic.Release[beforeRecord: checkpointState.keepRecord];
    YggLogBasic.Force[followingRecord:
      WriteNoopRecords[nPages: YggLogBasic.maxInvalidLogPages].followingRecord];
    YggLogBasic.CloseForPut[];
    -- Then reopen at the same page with the version that was current.
    YggLogBasic.OpenForPut[
      nextPage: resumePage, version: resumeVersion, nextRecord: following]
  };
};--AnalysisPass
RoundUpToPages: PROC [recordID: RecordID] RETURNS [RecordID] = {
  -- Returns recordID unchanged when its word-in-page offset is 0; otherwise returns
  -- recordID advanced by the number of words remaining in its page.
  offsetInPage: CARDINAL = YggLogInline.WordInPageFromRecordID[recordID, YggLog.wordsPerPage];
  IF offsetInPage = 0 THEN RETURN [recordID];
  RETURN [YggLogInline.AddC[recordID, YggLog.wordsPerPage-offsetInPage]];
};
-- Head and tail of the list of truncated records, kept in the order they were noted.
firstTruncatedRecord, lastTruncatedRecord: LIST OF RecordID ← NIL;

NoteTruncatedRecord: PROC [record: RecordID] = {
  -- Appends record to the end of the truncated-record list.
  cell: LIST OF RecordID ← CONS [first: record, rest: NIL];
  -- The first cell ever added becomes the head.
  IF firstTruncatedRecord = NIL THEN firstTruncatedRecord ← cell;
  -- Link behind the current tail, if there is one, then advance the tail.
  IF lastTruncatedRecord # NIL THEN lastTruncatedRecord.rest ← cell;
  lastTruncatedRecord ← cell;
};
CheckpointCompleteAnalysisProc: PROC [record: RecordID, type: RecordType, trans: TransID] = TRUSTED {
  -- Analysis proc registered for checkpointComplete records.
  -- We use this to find the keepRecord value in the LAST checkpoint record: every call
  -- overwrites checkpointState.keepRecord, so the last record scanned wins.
  -- Raises ERROR if the read reports destinationFull or the record type is not
  -- checkpointComplete.
  body: YggLogRep.CheckpointCompleteRecord;
  status: YggLog.ReadProcStatus;
  [status: status] ← YggLogBasic.GetCurrentRecord[
    currentRecord: record,
    to: [base: @body, length: SIZE[YggLogRep.CheckpointCompleteRecord]]];
  IF status = destinationFull THEN ERROR;
  IF body.type # checkpointComplete THEN ERROR;
  checkpointState.keepRecord ← body.keepRecordID;
  checkpointState.checkpointSeen ← TRUE;
};
-- State gathered by CheckpointCompleteAnalysisProc during the analysis pass.
CheckpointState:
TYPE =
RECORD [
checkpointSeen: BOOL ← FALSE, -- set TRUE once a checkpointComplete record has been processed
keepRecord: RecordID -- keepRecordID of the most recently processed checkpointComplete record
];
-- Read by AnalysisPass after its scan: checkpointSeen must be TRUE, and keepRecord is
-- passed to YggLogBasic.Release.
checkpointState: CheckpointState;
-- Update pass of recovery: rescans the log from the checkpoint and calls the recovery proc
-- registered for each record's type, stopping when the scan reaches nextRecordToWrite.
--   checkpointWord, checkpointRecord, firstRecord: passed to OpenRecordStreamFromCheckpoint
--     to position the record stream.
--   nextRecordToWrite: the record ID at which the scan ends.
-- Raises InternalProgrammingError when the stream does not open at the start of a record,
-- when the scan passes a noted truncated record or nextRecordToWrite without hitting it
-- exactly, or when end-of-log / truncation reports do not match expectations.
-- NOTE(review): RecoveryProcsIndex, recoveryProcs and TurnOffRecoveryRegistration are not
-- defined in the visible part of this file.
UpdatePass:
PROC [checkpointWord: WordNumber,
checkpointRecord, firstRecord, nextRecordToWrite: RecordID] = {
-- Cursor into the list of truncated records noted by NoteTruncatedRecord.
currentTruncatedRecord: LIST OF RecordID ← firstTruncatedRecord;
notStartOfRecord: BOOL;
currentRecord: RecordID;
expectTruncatedRecord: BOOL;
TurnOffRecoveryRegistration[];
[notStartOfRecord: notStartOfRecord, currentRecord: currentRecord] ←
YggLogBasic.OpenRecordStreamFromCheckpoint[checkpointWord: checkpointWord,
checkpointRecord: checkpointRecord, firstRecord: firstRecord];
IF notStartOfRecord THEN ERROR InternalProgrammingError;
DO
expectTruncatedRecord ← FALSE;
{
currentRecordType: RecordType;
recoveryProc: YggLog.RecoveryProc;
-- If the current record is the next noted truncated record, skip it without calling
-- any recovery proc and expect the stream to report it as truncated.
IF currentTruncatedRecord #
NIL
THEN TRUSTED {
SELECT ConstArith.Compare[currentRecord, currentTruncatedRecord.first]
FROM
less => NULL;
equal => {
expectTruncatedRecord ← TRUE;
currentTruncatedRecord ← currentTruncatedRecord.rest;
GOTO GetNext };
greater => ERROR InternalProgrammingError;
ENDCASE => ERROR;
};
-- The pass is finished when the scan reaches nextRecordToWrite exactly.
TRUSTED {
SELECT ConstArith.Compare[currentRecord, nextRecordToWrite]
FROM
less => NULL;
equal => EXIT;
greater => ERROR InternalProgrammingError;
ENDCASE => ERROR;
};
-- Dispatch to the recovery proc for this record type, but only for records that
-- carry a non-null transaction.
IF (currentRecordType ← PeekCurrentType[currentRecord])
IN RecoveryProcsIndex
AND
(recoveryProc ← recoveryProcs[currentRecordType]) # NIL THEN {
transID: TransID ← PeekCurrentTrans[currentRecord];
IF ~YggTransaction.IsNullTrans[transID]
THEN {
s: YggTransaction.Outcome ← YggTransaction.StateDuringRecovery[transID];
recoveryProc[currentRecord, currentRecordType, transID, s];
};
};
GOTO GetNext;
EXITS
-- Advance to the next record; reached both from the truncated-record skip and from
-- the normal path.
GetNext => {
endOfLog, truncatedRecord: BOOL; nextRecord: RecordID;
[endOfLog: endOfLog, truncatedRecord: truncatedRecord, currentRecord: nextRecord] ←
YggLogBasic.AdvanceRecordStream[];
-- End of log is only acceptable at nextRecordToWrite, and the truncation report must
-- agree with what the truncated-record list predicted.
IF endOfLog
AND (nextRecord # nextRecordToWrite)
OR
(truncatedRecord # expectTruncatedRecord) THEN ERROR InternalProgrammingError;
currentRecord ← nextRecord;
};
};
ENDLOOP;
FilePageMgr.ForceOutEverything[];
};
PeekCurrentType: PUBLIC PROC [thisRecord: RecordID] RETURNS [recordType: RecordType] = {
  -- Reads the front of the current record and returns its type field.
  -- Raises InternalProgrammingError if the read reports sourceExhausted.
  header: YggLogRep.TransactionHeader; -- was RecordTypeHeader
  readStatus: YggLog.ReadProcStatus;
  TRUSTED {
    -- size was YggLogRep.RecordTypeHeader.SIZE, but that is 1 and is a problem on the D-machines for PBasics.Copy
    [status: readStatus] ← YggLogBasic.GetCurrentRecord[
      currentRecord: thisRecord,
      to: [base: @header, length: WORDS[CARD32]]];
  };
  IF readStatus = sourceExhausted THEN ERROR InternalProgrammingError;
  RETURN [header.type];
};
PeekCurrentTrans: PROC [thisRecord: RecordID] RETURNS [TransID] = {
  -- Reads the transaction header of the current record and returns its transID,
  -- or nullTransID when the read reports sourceExhausted.
  header: YggLogRep.TransactionHeader;
  readStatus: YggLog.ReadProcStatus;
  TRUSTED {
    [status: readStatus] ← YggLogBasic.GetCurrentRecord[
      currentRecord: thisRecord,
      to: [base: @header, length: YggLogRep.TransactionHeader.SIZE]];
  };
  RETURN [IF readStatus = sourceExhausted THEN nullTransID ELSE header.transID];
};
PeekCurrentTransactionHeader: PUBLIC PROC [thisRecord: RecordID]
  RETURNS [YggLogRep.TransactionHeader] = {
  -- Reads and returns the transaction header of the current record. When the read
  -- reports sourceExhausted, returns a placeholder header with type noop and nullTransID.
  header: YggLogRep.TransactionHeader;
  readStatus: YggLog.ReadProcStatus;
  TRUSTED {
    [status: readStatus] ← YggLogBasic.GetCurrentRecord[
      currentRecord: thisRecord,
      to: [base: @header, length: YggLogRep.TransactionHeader.SIZE]];
  };
  IF readStatus # sourceExhausted THEN RETURN [header];
  RETURN [[0, noop, 0, nullTransID, [[0],0,0]]];
};
ReadForRecovery: PUBLIC PROC [thisRecord: RecordID, wordsToSkip: CARDINAL ← 0, to: Block]
  RETURNS [status: YggLog.ReadProcStatus, wordsRead: CARDINAL] = {
  -- This is a YggLog.ReadProc.
  -- Reads the current record into the caller's block, skipping the transaction header
  -- plus wordsToSkip further words. wordsRead is reported with the header size subtracted.
  headerWords: CARDINAL = SIZE[YggLogRep.TransactionHeader];
  -- Add the header to the skip count, saturating at LAST[CARDINAL] instead of overflowing.
  wordsToSkip ← IF wordsToSkip > LAST[CARDINAL] - headerWords
    THEN LAST[CARDINAL]
    ELSE wordsToSkip + headerWords;
  TRUSTED {
    -- The leading block has a NIL base and covers the skipped words; the caller's block follows.
    [status: status, wordsRead: wordsRead] ← YggLogBasic.GetCurrentRecord[
      currentRecord: thisRecord,
      to: [base: NIL, length: wordsToSkip, rest: @to]];
  };
  wordsRead ← wordsRead - headerWords;
};
-- YggLog writing
-- Exported to YggLog
-- Bookkeeping updated on every call of Write; it is not read anywhere else in the visible file.
LengthCutOff: CARD32 ← 10; -- minimum recordData.length for a write to be counted as a repeat
LastOffset: CARD32 ← 55; -- optr.lowOffset of the previous call to Write
LastWordCount: CARD ← 1066; -- recordData.length of the previous call to Write
SameAgain: CARD ← 0; -- number of counted writes whose offset and length matched the previous call
Write: PUBLIC PROC [
  trans: YggEnvironment.TransID,
  logRecordPhaseType: LogRecordPhaseType,
  recordType: RecordType,
  optr: Camelot.optrT,
  recordData: Block,
  force: BOOL,
  writeID: BOOL ]
  RETURNS [thisRecord, followingRecord: RecordID, thisWordNumber: YggEnvironment.WordNumber] = {
  -- Puts one log record: a transaction header built from recordType, trans and optr,
  -- followed by recordData. force and writeID are passed through to YggLogBasic.Put.
  -- Returns the results of YggLogBasic.Put.
  -- Side effects: updates the repeat-write counters, and sets lastRecordWithUndo when
  -- logRecordPhaseType is undo or redoUndo.

  noteRepeatedWrite: ENTRY PROC ~ {
    -- Counts a write that is at least LengthCutOff long and has the same offset and
    -- length as the previous call, then remembers this call's offset and length.
    IF recordData.length >= LengthCutOff
      AND optr.lowOffset = LastOffset
      AND recordData.length = LastWordCount THEN SameAgain ← SameAgain + 1;
    LastOffset ← optr.lowOffset;
    LastWordCount ← recordData.length;
  };

  header: YggLogRep.TransactionHeader ← [type: recordType, transID: trans, optr: optr];
  noteRepeatedWrite[];
  TRUSTED {
    -- The header block is chained to the caller's data through the rest field.
    [thisRecord, followingRecord, thisWordNumber] ← YggLogBasic.Put[
      from: [
        base: @header,
        length: WORDS[YggLogRep.TransactionHeader]/WordsINWORDS,
        rest: @recordData],
      optr: optr,
      force: force,
      writeID: writeID];
  };
  SELECT logRecordPhaseType FROM
    undo, redoUndo => lastRecordWithUndo ← thisRecord;
    ENDCASE => NULL;
};
Force: PUBLIC PROC [followingRecord: RecordID] = {
  -- Forces the log via YggLogBasic.Force[followingRecord]; does nothing when
  -- followingRecord is nullRecordID.
  IF followingRecord = nullRecordID THEN RETURN;
  YggLogBasic.Force[followingRecord];
};
-- Analysis procs, checkpoint procs
-- Exported to YggLogControl
-- TRUE once registration is closed. Checked by RegisterAnalysisProc and
-- RegisterCheckpointProc, and reported by IsCheckpointProcCallable.
noMoreCheckpointRegistration: BOOL ← FALSE;

TurnOffCheckpointRegistration: ENTRY PROC [] = INLINE {
  -- Closes registration; called by AnalysisPass before it starts scanning.
  noMoreCheckpointRegistration ← TRUE;
};
IsCheckpointProcCallable: ENTRY PROC [] RETURNS [BOOL] = INLINE {
  -- Returns TRUE once registration has been turned off; Checkpoint refuses to run
  -- before that.
  RETURN [noMoreCheckpointRegistration];
};
-- Record types for which an analysis proc may be registered.
AnalysisProcsIndex: TYPE = RecordType [noop .. writeBytes];
AnalysisProcsArray: TYPE = ARRAY AnalysisProcsIndex OF YggLogControl.AnalysisProc;
-- Table of registered analysis procs, indexed by record type. Every entry starts as NIL;
-- entries are filled in by RegisterAnalysisProc and read by AnalysisPass.
analysisProcs:
REF AnalysisProcsArray =
SafeStorage.GetSystemZone[].NEW[AnalysisProcsArray ← ALL [NIL]];
RegisterAnalysisProc: PUBLIC ENTRY PROC [
  recordType: RecordType, analysisProc: YggLogControl.AnalysisProc] = {
  -- Installs analysisProc as the analysis-pass handler for recordType, replacing any
  -- handler already registered for that type.
  -- Raises ClientProgrammingError if registration has been turned off, and
  -- InternalProgrammingError if recordType lies outside AnalysisProcsIndex.
  IF noMoreCheckpointRegistration THEN RETURN WITH ERROR ClientProgrammingError;
  IF recordType NOT IN AnalysisProcsIndex THEN RETURN WITH ERROR InternalProgrammingError;
  analysisProcs[recordType] ← analysisProc;
};
-- Registered checkpoint procs; slots [0 .. nCheckpointProcs) are in use.
-- Filled by RegisterCheckpointProc and called in order by Checkpoint.
checkpointProcs: ARRAY [0 .. maxCheckpointProcs) OF YggLogControl.CheckpointProc ← ALL [NIL];
maxCheckpointProcs: INT = 5; -- capacity of checkpointProcs
nCheckpointProcs: INT [0 .. maxCheckpointProcs] ← 0; -- number of procs registered so far
RegisterCheckpointProc: PUBLIC ENTRY PROC [checkpointProc: YggLogControl.CheckpointProc] = {
  -- Appends checkpointProc to the table of procs that Checkpoint calls.
  -- Raises ClientProgrammingError if registration has been turned off, and
  -- InternalProgrammingError if the table already holds maxCheckpointProcs entries.
  IF noMoreCheckpointRegistration THEN RETURN WITH ERROR ClientProgrammingError;
  IF nCheckpointProcs = maxCheckpointProcs THEN RETURN WITH ERROR InternalProgrammingError;
  checkpointProcs[nCheckpointProcs] ← checkpointProc;
  nCheckpointProcs ← nCheckpointProcs + 1;
};
ForkCheckpointYggDummyProcess: PUBLIC PROC [wakeupPeriodInSeconds: NAT] = TRUSTED {
  -- Forks CheckpointYggDummyProcess with the given wakeup period and detaches it.
  Process.Detach[FORK CheckpointYggDummyProcess[wakeupPeriodInSeconds]];
};
CheckpointYggDummyProcess: PROC [wakeupPeriodInSeconds: NAT] = {
  -- Body of the periodic checkpoint process; loops without exiting.
  -- Each cycle pauses for wakeupPeriodInSeconds, takes a checkpoint, and then asks
  -- SafeStorage to reclaim collectible objects.
  lastKeep: RecordID ← nullRecordID;
  lastStartAnalysis: RecordID ← nullRecordID;
  Process.SetPriority[Process.priorityForeground];
  DO
    Process.Pause[Process.SecondsToTicks[wakeupPeriodInSeconds]];
    -- The results of one checkpoint are passed to the next, so Checkpoint can tell
    -- whether anything has changed.
    [lastKeep, lastStartAnalysis] ← Checkpoint[lastKeep, lastStartAnalysis];
    SafeStorage.ReclaimCollectibleObjects[suspendMe: TRUE, traceAndSweep: FALSE];
  ENDLOOP;
};
Checkpoint: PROC [previousKeepRecord, previousStartAnalysisRecord: RecordID]
  RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
  -- Computes new keepRecord and startAnalysisRecord values as the minimum over the ID of
  -- the next put and the values returned by every registered checkpoint proc.
  -- Writes a checkpointCompleteRecord only if it contains a value different than
  -- the previous checkpointCompleteRecord, and then releases the log before keepRecord.
  --   previousKeepRecord, previousStartAnalysisRecord: the values returned by the
  --     previous call (nullRecordID is passed on the first call).
  -- Returns the newly computed values, whether or not a record was written.
  -- Raises InternalProgrammingError if registration is still open, or if either new
  -- value compares less than the corresponding previous value.
  differentRecordIDsSeen: BOOL ← FALSE;

  NoticeDifferentRecordIDs: PROC [old, new: RecordID] = {
    -- Sets differentRecordIDsSeen when old compares less than new.
    comp: PBasics.Comparison;
    TRUSTED {comp ← ConstArith.Compare[old, new]; };
    SELECT comp FROM
      less => differentRecordIDsSeen ← TRUE;
      greater => ERROR InternalProgrammingError;
      ENDCASE;
  };

  keepRecord ← startAnalysisRecord ← YggLogBasic.RecordIDOfNextPut[];
  IF NOT IsCheckpointProcCallable[] THEN ERROR InternalProgrammingError;
  FOR i: INT [0 .. maxCheckpointProcs] IN [0 .. nCheckpointProcs) DO
    thisKeepRecord, thisStartAnalysisRecord: RecordID;
    [thisKeepRecord, thisStartAnalysisRecord] ← checkpointProcs[i][];
    keepRecord ← YggLogInline.Min[thisKeepRecord, keepRecord];
    startAnalysisRecord ← YggLogInline.Min[thisStartAnalysisRecord, startAnalysisRecord]
  ENDLOOP;
  NoticeDifferentRecordIDs[previousKeepRecord, keepRecord];
  NoticeDifferentRecordIDs[previousStartAnalysisRecord, startAnalysisRecord];
  IF differentRecordIDsSeen THEN {
    -- The length is the size of the record body, the same expression Format uses.
    -- A larger length would make Put read past the local record body, and readers of
    -- this record supply a buffer of exactly SIZE[CheckpointCompleteRecord].
    [] ← WriteCheckpointCompleteRecord[
      startAnalysisRecord: startAnalysisRecord,
      keepRecord: keepRecord,
      length: SIZE[YggLogRep.CheckpointCompleteRecord]/WordsINWORDS];
    YggLogBasic.Release[beforeRecord: keepRecord];
  };
  RETURN [keepRecord, startAnalysisRecord];
};
-- Module initialization: register the analysis proc that records the keepRecord of each
-- checkpointComplete record seen during the analysis pass.
RegisterAnalysisProc[
recordType: checkpointComplete,
analysisProc: CheckpointCompleteAnalysisProc];
END.