-- YggLogImpl
-- Implements the YggLog and YggLogControl interfaces on top of YggLogBasic:
-- formatting the log, the analysis and update passes run by Recover, client reads
-- and writes of log records, registration of recovery, analysis and checkpoint
-- procedures, and periodic checkpointing.

DIRECTORY
  Basics,
  Camelot,
  CamelotRecoverable,
  ConstArith,
  PBasics,
  Process,
  Rope,
  SafeStorage,
  YggEnvironment,
  YggInternal,
  YggLog,
  YggLogBasic,
  YggLogControl,
  YggLogInline,
  YggLogRep,
  YggRecoveryPassProcs,
  YggRestartFile,
  YggTransaction;

YggLogImpl: CEDAR MONITOR
  IMPORTS
    CamelotRecoverable, ConstArith, Process, SafeStorage, YggLog, YggLogBasic, YggLogInline, YggRestartFile, YggTransaction
  EXPORTS YggLog, YggLogControl
  = BEGIN

  -- Local abbreviations for types and constants defined in other interfaces.
  FileStore: TYPE = YggEnvironment.FileStore;
  TransID: TYPE = YggEnvironment.TransID;
  nullTransID: TransID = YggEnvironment.nullTransID;
  PageNumber: TYPE = YggEnvironment.PageNumber;
  PageCount: TYPE = YggEnvironment.PageCount;
  WordNumber: TYPE = YggLogBasic.WordNumber;
  WordCount: TYPE = YggLogBasic.WordCount;
  RecordID: TYPE = YggLog.RecordID;
  nullRecordID: RecordID = YggLog.nullRecordID;
  Block: TYPE = YggLog.Block;
  RecordType: TYPE = YggLog.RecordType;
  LogRecordPhaseType: TYPE = YggLog.LogRecordPhaseType;

  -- Errors private to this module.
  ClientProgrammingError: ERROR = CODE;
  InternalProgrammingError: ERROR = CODE;

  -- Exported error.
  WriteFailed: PUBLIC ERROR = CODE;

  -- Exported log geometry; assigned by Format.
  logDeviceCharacteristics: PUBLIC YggEnvironment.DeviceCharacteristics;
  wordsPerPage: PUBLIC CARD; -- words in a Log page
  bytesPerPage: PUBLIC CARD; -- bytes in a Log page

  -- ID of the most recent record written by Write with phase type undo or redoUndo.
  lastRecordWithUndo: RecordID _ nullRecordID;

  -- Divisor applied to SIZE[...] and WORDS[...] when computing Block lengths for YggLogBasic.Put.
  WordsINWORDS: CARD = WORDS[CARD32];

  Format: PUBLIC PROC [ ] = {
    -- Initializes the log file: sets the exported device characteristics, writes noop
    -- records over the log in two batches, then writes and forces a first
    -- checkpoint complete record and records it in the restart file.
    -- Raises InternalProgrammingError if the position reached after the noop
    -- records does not map to the null value.
    logSize: PageCount;
    thisCheckpointBeginRecord: RecordID;
    YggLogBasic.EstablishLogFile[];
    logSize _ YggLogBasic.LogFileSize[];
    logDeviceCharacteristics _ NEW[YggEnvironment.DeviceCharacteristicsRep _ [wordsPerPage: 1024/BYTES[WORD], bytesPerPage: 1024, logWordsPerPage: 8, logBytesPerPage: 10]];
    wordsPerPage _ logDeviceCharacteristics.wordsPerPage;
    bytesPerPage _ wordsPerPage * Basics.bytesPerWord;
    -- Write logSize noop records in two batches, releasing everything before the end
    -- of the first batch prior to writing the second.
    YggLogBasic.OpenForPut[nextPage: 0, version: 0, nextRecord: nullRecordID];
    YggLogBasic.Release[beforeRecord: nullRecordID];
    YggLogBasic.Release[beforeRecord: WriteNoopRecords[logSize/2].followingRecord];
    [] _ WriteNoopRecords[logSize - logSize/2];
    -- The first checkpoint record goes wherever the next Put lands.
    thisCheckpointBeginRecord _ YggLogBasic.RecordIDOfNextPut[];
    -- NOTE(review): presumably this checks that the noop records exactly filled the log
    -- so that the next Put wraps to its beginning; confirm against YggLogBasic.
    IF YggLogBasic.WordNumberFromRecordID[thisCheckpointBeginRecord] # YggLog.nullRecordID THEN ERROR InternalProgrammingError;
    YggLogBasic.Release[beforeRecord: thisCheckpointBeginRecord];
    [] _ WriteCheckpointCompleteRecord[
      startAnalysisRecord: thisCheckpointBeginRecord,
      keepRecord: thisCheckpointBeginRecord,
      length: SIZE[YggLogRep.CheckpointCompleteRecord]/WordsINWORDS
      ];
    YggLogBasic.CloseForPut[];
    };

  WriteNoopRecords: PUBLIC PROC [nPages: PageCount] RETURNS [followingRecord: RecordID] = {
    -- Puts nPages noop records, one Put per loop iteration, and returns the
    -- followingRecord result of the last Put.
    -- NOTE(review): when nPages = 0 no Put is made and followingRecord is never assigned here.
    noopRecordBody: YggLogRep.NoopRecord _ [];
    FOR page: PageCount _ 0, page+1 UNTIL page = nPages DO
      TRUSTED {
        [followingRecord: followingRecord] _ YggLogBasic.Put[
          from: [base: @noopRecordBody, length: SIZE[YggLogRep.NoopRecord]/SIZE[CARD32]],
          optr: [[0], 0, 0]];
        };
      ENDLOOP;
    RETURN [followingRecord];
    };

  WriteCheckpointCompleteRecord: PROC [
    startAnalysisRecord, keepRecord: RecordID, length: INT]
    RETURNS [thisRecord: RecordID] = {
    -- Builds a checkpoint complete record holding startAnalysisRecord and keepRecord,
    -- puts it with writeID and force both TRUE, and then writes the word number and
    -- ID of the new record to the restart file.
    -- length is passed through unchanged as the length of the Block handed to Put.
    checkpointCompleteRecordBody: YggLogRep.CheckpointCompleteRecord _ [startAnalysisRecordID: startAnalysisRecord, keepRecordID: keepRecord];
    TRUSTED {
      [thisRecord: thisRecord] _ YggLogBasic.Put[
        from: [base: @checkpointCompleteRecordBody, length: length],
        optr: [[0], 0, 0],
        writeID: TRUE,
        force: TRUE];
      };
    YggRestartFile.WriteRestartRecord[YggLogBasic.WordNumberFromRecordID[thisRecord], thisRecord];
    };

  DiscoverWhereToStartAnalysisPass: PUBLIC PROC [errorOnBadCheckpoint: BOOL]
    RETURNS [
      ok: BOOL,
      checkpointWord: WordNumber,
      checkpointRecord, startAnalysisRecord: RecordID] = {
    -- Reads the restart record, then reads the checkpoint complete record at the word
    -- it names. ok is FALSE when no valid checkpoint complete record could be read there.
    -- When errorOnBadCheckpoint is TRUE, raises ERROR if the checkpoint is not ok or
    -- if its record ID differs from the one stored in the restart file.

    CheckpointCompleteRecordFromWord: PROC [checkpointWord: WordNumber]
      RETURNS [
        ok: BOOL,
        checkpointRecord, startAnalysisRecord: RecordID] = {
      -- Opens a record stream at checkpointWord and tries to read a checkpoint complete
      -- record from it. Any failure (invalid page, not at the start of a record,
      -- truncated record, destination full, wrong record type) yields ok = FALSE.
      -- The record stream is closed on every path.
      {
        checkpointCompleteRecordBody: YggLogRep.CheckpointCompleteRecord;
        notStartOfRecord: BOOL;
        currentRecord: RecordID;
        [notStartOfRecord: notStartOfRecord, currentRecord: currentRecord] _
          YggLogBasic.OpenRecordStreamFromWord[checkpointWord !
            YggLogBasic.InvalidPage => GOTO failReturn ];
        IF notStartOfRecord THEN GOTO failReturn;
        IF YggLogBasic.CheckCurrentRecord[].truncated THEN GOTO failReturn;
        TRUSTED {
          IF YggLogBasic.GetCurrentRecord[currentRecord, [base: @checkpointCompleteRecordBody, length: SIZE[YggLogRep.CheckpointCompleteRecord]]].status = destinationFull
            OR checkpointCompleteRecordBody.type # checkpointComplete THEN GOTO failReturn;
          };
        checkpointRecord _ checkpointCompleteRecordBody.thisRecordID;
        -- Fixed: this used to copy thisRecordID again, so the start-analysis point stored
        -- by WriteCheckpointCompleteRecord was never used.
        startAnalysisRecord _ checkpointCompleteRecordBody.startAnalysisRecordID;
        ok _ TRUE;
        EXITS
          failReturn => { ok _ FALSE };
        };
      YggLogBasic.CloseRecordStream[];
      RETURN [ok, checkpointRecord, startAnalysisRecord];
      };--CheckpointCompleteRecordFromWord

    checkpointRecordFromRestartFile: RecordID;
    [checkpointWord, checkpointRecordFromRestartFile] _ YggRestartFile.ReadRestartRecord[];
    [ok, checkpointRecord, startAnalysisRecord] _ CheckpointCompleteRecordFromWord[checkpointWord];
    IF errorOnBadCheckpoint AND
      (NOT ok OR checkpointRecord # checkpointRecordFromRestartFile) THEN ERROR;
    RETURN [ok, checkpointWord, checkpointRecord, startAnalysisRecord];
    };--DiscoverWhereToStartAnalysisPass

  Recover: PUBLIC PROC [] = {
    -- Runs recovery: establishes the log file, locates the last checkpoint, runs the
    -- analysis pass, forces the log, runs the update pass, takes a checkpoint, and
    -- then asserts normal operation and reports completion to CamelotRecoverable.
    checkpointWord: WordNumber;
    checkpointRecord: RecordID;
    startAnalysisRecord: RecordID;
    ok: BOOL;
    [] _ YggLogBasic.EstablishLogFile[];
    -- errorOnBadCheckpoint is TRUE, so a bad checkpoint raises ERROR rather than returning ok = FALSE.
    [ok, checkpointWord, checkpointRecord, startAnalysisRecord] _
      DiscoverWhereToStartAnalysisPass[TRUE];
    AnalysisPass[checkpointWord, checkpointRecord, startAnalysisRecord];
    -- Force everything written so far before starting the update pass.
    YggLogBasic.Force[followingRecord: YggLogBasic.RecordIDOfNextPut[]];
    -- The update pass stops when it reaches the record ID of the next Put.
    UpdatePass[checkpointWord, checkpointRecord, startAnalysisRecord, YggLogBasic.RecordIDOfNextPut[]];
    -- Passing null previous values makes Checkpoint write a record whenever the new
    -- values compare greater than nullRecordID.
    [] _ Checkpoint[nullRecordID, nullRecordID];
    YggLogBasic.AssertNormalOperation[];
    CamelotRecoverable.CamelotRecoveryComplete[];
    };--Recover

  AnalysisPass: PROC [
    checkpointWord: WordNumber, checkpointRecord, firstRecord: RecordID] = {
    -- Scans the log from firstRecord to the end, calling the registered analysis proc
    -- (if any) for each record that is not truncated, and noting each truncated record
    -- for UpdatePass. Afterwards the log is reopened for writing at the page where the
    -- scan ended.
    -- Raises InternalProgrammingError if the stream does not open at the start of a
    -- record or if no checkpoint complete record was seen during the scan.
    currentRecord, nextRecord: RecordID;
    TurnOffCheckpointRegistration[];
    {
      notStartOfRecord: BOOL;
      [notStartOfRecord: notStartOfRecord, currentRecord: currentRecord] _
        YggLogBasic.OpenRecordStreamFromCheckpoint[checkpointWord: checkpointWord, checkpointRecord: checkpointRecord, firstRecord: firstRecord];
      IF notStartOfRecord THEN ERROR InternalProgrammingError;
      };
    DO
      currentRecordType: RecordType _ PeekCurrentType[currentRecord]; -- no errors
      analysisProc: YggLogControl.AnalysisProc;
      IF currentRecordType IN AnalysisProcsIndex
        AND (analysisProc _ analysisProcs[currentRecordType]) # NIL
        AND NOT YggLogBasic.CheckCurrentRecord[].truncated THEN {
        transID: TransID _ PeekCurrentTrans[currentRecord];
        analysisProc[currentRecord, currentRecordType, transID];
        };
      {
        endOfLog, truncatedRecord: BOOL;
        [endOfLog: endOfLog, truncatedRecord: truncatedRecord, currentRecord: nextRecord] _
          YggLogBasic.AdvanceRecordStream[];
        IF truncatedRecord THEN NoteTruncatedRecord[currentRecord];
        IF endOfLog THEN EXIT;
        };
      currentRecord _ nextRecord;
      ENDLOOP;
    -- checkpointSeen is set by CheckpointCompleteAnalysisProc.
    IF NOT checkpointState.checkpointSeen THEN ERROR InternalProgrammingError;
    -- Open for put at the current page with the opposite page version, release up to
    -- the checkpoint keepRecord, write and force maxInvalidLogPages noop records, then
    -- close and reopen at the same page and record with the original version.
    -- NOTE(review): presumably this overwrites stale pages beyond the end of the log; confirm.
    {
      nextPageToWrite: PageNumber;
      versionOfNextPageToWrite: YggLogRep.PageVersion;
      [nextPageToWrite, versionOfNextPageToWrite] _ YggLogBasic.GetCurrentPageAndVersion[];
      YggLogBasic.CloseRecordStream[];
      YggLogBasic.OpenForPut[nextPage: nextPageToWrite, version: 1-versionOfNextPageToWrite, nextRecord: nextRecord];
      YggLogBasic.Release[beforeRecord: checkpointState.keepRecord];
      YggLogBasic.Force[followingRecord: WriteNoopRecords[nPages: YggLogBasic.maxInvalidLogPages].followingRecord];
      YggLogBasic.CloseForPut[];
      YggLogBasic.OpenForPut[nextPage: nextPageToWrite, version: versionOfNextPageToWrite, nextRecord: nextRecord]
      };
    };--AnalysisPass

  RoundUpToPages: PROC [recordID: RecordID] RETURNS [RecordID] = {
    -- Returns recordID unchanged when its word-in-page offset is zero; otherwise
    -- returns recordID advanced by the number of words left in its page.
    words: CARDINAL = YggLogInline.WordInPageFromRecordID[recordID, YggLog.wordsPerPage];
    RETURN [
      IF words = 0 THEN recordID
      ELSE YggLogInline.AddC[recordID, YggLog.wordsPerPage-words]];
    };

  -- Head and tail of the list of truncated records found by AnalysisPass, in the order found.
  firstTruncatedRecord, lastTruncatedRecord: LIST OF RecordID _ NIL;

  NoteTruncatedRecord: PROC [record: RecordID] = {
    -- Appends record to the end of the truncated record list.
    new: LIST OF RecordID _ CONS [first: record, rest: NIL];
    IF lastTruncatedRecord # NIL THEN lastTruncatedRecord.rest _ new;
    lastTruncatedRecord _ new;
    IF firstTruncatedRecord = NIL THEN firstTruncatedRecord _ new;
    };

  CheckpointCompleteAnalysisProc: PROC [
    record: RecordID, type: RecordType, trans: TransID] = TRUSTED {
    -- Analysis proc for checkpoint complete records; registered by the module start
    -- code at the end of this file. Saves the keepRecordID of the record in
    -- checkpointState and marks a checkpoint as seen.
    -- Raises ERROR if the record does not fit in the destination or has the wrong type.
    checkpointCompleteRecordBody: YggLogRep.CheckpointCompleteRecord;
    IF YggLogBasic.GetCurrentRecord[currentRecord: record, to: [base: @checkpointCompleteRecordBody, length: SIZE[YggLogRep.CheckpointCompleteRecord]]].status = destinationFull
      OR checkpointCompleteRecordBody.type # checkpointComplete THEN ERROR;
    checkpointState.keepRecord _ checkpointCompleteRecordBody.keepRecordID;
    checkpointState.checkpointSeen _ TRUE;
    };

  -- What the analysis pass has learned from checkpoint complete records.
  CheckpointState: TYPE = RECORD [
    checkpointSeen: BOOL _ FALSE,
    keepRecord: RecordID
    ];
  checkpointState: CheckpointState;

  UpdatePass: PROC [checkpointWord: WordNumber,
    checkpointRecord, firstRecord, nextRecordToWrite: RecordID] = {
    -- Scans the log from firstRecord up to nextRecordToWrite, calling the registered
    -- recovery proc for each record that has one and belongs to a non-null transaction.
    -- Records noted as truncated by AnalysisPass are skipped without calling any proc.
    -- Raises InternalProgrammingError when the scan disagrees with what AnalysisPass
    -- saw: an unexpected record ID, an unexpected truncation, or the end of the log
    -- reached at a record other than nextRecordToWrite.
    currentTruncatedRecord: LIST OF RecordID _ firstTruncatedRecord;
    notStartOfRecord: BOOL;
    currentRecord: RecordID;
    expectTruncatedRecord: BOOL;
    TurnOffRecoveryRegistration[];
    [notStartOfRecord: notStartOfRecord, currentRecord: currentRecord] _
      YggLogBasic.OpenRecordStreamFromCheckpoint[checkpointWord: checkpointWord, checkpointRecord: checkpointRecord, firstRecord: firstRecord];
    IF notStartOfRecord THEN ERROR InternalProgrammingError;
    DO
      expectTruncatedRecord _ FALSE;
      {
        currentRecordType: RecordType;
        recoveryProc: YggLog.RecoveryProc;
        -- The truncated list is in scan order, so only its head needs to be checked.
        IF currentTruncatedRecord # NIL THEN TRUSTED {
          SELECT ConstArith.Compare[currentRecord, currentTruncatedRecord.first] FROM
            less => NULL;
            equal => {
              expectTruncatedRecord _ TRUE;
              currentTruncatedRecord _ currentTruncatedRecord.rest;
              GOTO GetNext };
            greater => ERROR InternalProgrammingError;
            ENDCASE => ERROR;
          };
        -- Stop on reaching the record that will be written next.
        TRUSTED {
          SELECT ConstArith.Compare[currentRecord, nextRecordToWrite] FROM
            less => NULL;
            equal => EXIT;
            greater => ERROR InternalProgrammingError;
            ENDCASE => ERROR;
          };
        IF (currentRecordType _ PeekCurrentType[currentRecord]) IN RecoveryProcsIndex
          AND (recoveryProc _ recoveryProcs[currentRecordType]) # NIL THEN {
          transID: TransID _ PeekCurrentTrans[currentRecord];
          IF ~YggTransaction.IsNullTrans[transID] THEN {
            s: YggTransaction.Outcome _ YggTransaction.StateDuringRecovery[transID];
            recoveryProc[currentRecord, currentRecordType, transID, s];
            };
          };
        GOTO GetNext;
        EXITS
          GetNext => {
            endOfLog, truncatedRecord: BOOL;
            nextRecord: RecordID;
            [endOfLog: endOfLog, truncatedRecord: truncatedRecord, currentRecord: nextRecord] _
              YggLogBasic.AdvanceRecordStream[];
            -- AND binds tighter than OR: the error is raised when the log ends at the
            -- wrong record, or when truncation differs from what was expected.
            IF endOfLog AND (nextRecord # nextRecordToWrite)
              OR (truncatedRecord # expectTruncatedRecord) THEN
              ERROR InternalProgrammingError;
            currentRecord _ nextRecord;
            };
        };
      ENDLOOP;
    };

  PeekCurrentType: PUBLIC PROC [thisRecord: RecordID]
    RETURNS [recordType: RecordType] = {
    -- Reads the first WORDS[CARD32] of the current record into a header and returns
    -- its type field. Raises InternalProgrammingError if the source is exhausted.
    recordTypeHeader: YggLogRep.TransactionHeader; -- was RecordTypeHeader
    status: YggLog.ReadProcStatus;
    TRUSTED {
      [status: status] _ YggLogBasic.GetCurrentRecord[currentRecord: thisRecord, to: [base: @recordTypeHeader, length: WORDS[CARD32]]];
      }; -- size was YggLogRep.RecordTypeHeader.SIZE, but that is 1 and is a problem on the D-machines for PBasics.Copy
    IF status = sourceExhausted THEN ERROR InternalProgrammingError;
    RETURN [recordTypeHeader.type];
    };

  PeekCurrentTrans: PROC [thisRecord: RecordID]
    RETURNS [TransID] = {
    -- Reads a transaction header from the current record and returns its transID,
    -- or nullTransID if the source is exhausted.
    transactionHeader: YggLogRep.TransactionHeader;
    status: YggLog.ReadProcStatus;
    TRUSTED {
      [status: status] _ YggLogBasic.GetCurrentRecord[currentRecord: thisRecord, to: [base: @transactionHeader, length: YggLogRep.TransactionHeader.SIZE]];
      };
    IF status = sourceExhausted THEN RETURN [nullTransID]
    ELSE RETURN [transactionHeader.transID];
    };

  PeekCurrentTransactionHeader: PUBLIC PROC [thisRecord: RecordID]
    RETURNS [YggLogRep.TransactionHeader] = {
    -- Reads and returns the transaction header of the current record. If the source is
    -- exhausted, returns a header of type noop with nullTransID.
    transactionHeader: YggLogRep.TransactionHeader;
    status: YggLog.ReadProcStatus;
    TRUSTED {
      [status: status] _ YggLogBasic.GetCurrentRecord[currentRecord: thisRecord, to: [base: @transactionHeader, length: YggLogRep.TransactionHeader.SIZE]];
      };
    IF status = sourceExhausted THEN RETURN [[0, noop, 0, nullTransID, [[0],0,0]]]
    ELSE RETURN [transactionHeader];
    };

  ReadForRecovery: PUBLIC PROC [
    thisRecord: RecordID, wordsToSkip: CARDINAL _ 0, to: Block]
    RETURNS [status: YggLog.ReadProcStatus, wordsRead: CARDINAL] = {
    -- Reads from the current record of the record stream into to, skipping the
    -- transaction header plus wordsToSkip further words. The header size is
    -- subtracted from the wordsRead count that is returned.
    -- Saturate at LAST[CARDINAL] instead of overflowing when the header size is added.
    IF wordsToSkip > LAST[CARDINAL] - SIZE[YggLogRep.TransactionHeader] THEN
      wordsToSkip _ LAST[CARDINAL]
    ELSE wordsToSkip _ wordsToSkip + SIZE[YggLogRep.TransactionHeader];
    -- The leading Block has base NIL and length wordsToSkip; data is delivered to to.
    TRUSTED {
      [status: status, wordsRead: wordsRead] _ YggLogBasic.GetCurrentRecord[currentRecord: thisRecord, to: [base: NIL, length: wordsToSkip, rest: @to]];
      };
    wordsRead _ wordsRead - SIZE[YggLogRep.TransactionHeader];
    };

  -- Instrumentation updated by Write: SameAgain counts consecutive writes that have
  -- the same optr.lowOffset and the same length, for lengths of at least LengthCutOff.
  LengthCutOff: CARD32 _ 10;
  LastOffset: CARD32 _ 55;
  LastWordCount: CARD _ 1066;
  SameAgain: CARD _ 0;

  Write: PUBLIC PROC [trans: YggEnvironment.TransID, logRecordPhaseType: LogRecordPhaseType, recordType: RecordType, optr: Camelot.optrT, recordData: Block, force: BOOL, writeID: BOOL ]
    RETURNS [thisRecord, followingRecord: RecordID, thisWordNumber: YggEnvironment.WordNumber] = {
    -- Writes one log record made of a transaction header (recordType, trans, optr)
    -- followed by recordData, passing force and writeID through to YggLogBasic.Put.
    -- Returns the results of that Put. When logRecordPhaseType is undo or redoUndo
    -- the new record ID is also saved in lastRecordWithUndo.

    recordThisAddress: ENTRY PROC ~ {
      -- Updates the instrumentation counters above.
      IF recordData.length >= LengthCutOff THEN {
        IF optr.lowOffset = LastOffset AND recordData.length = LastWordCount THEN SameAgain _ SameAgain + 1;
        };
      LastOffset _ optr.lowOffset;
      LastWordCount _ recordData.length;
      };

    recordHeader: YggLogRep.TransactionHeader _ [type: recordType, transID: trans, optr: optr];
    recordThisAddress[];
    TRUSTED {
      [thisRecord, followingRecord, thisWordNumber] _ YggLogBasic.Put[
        from: [base: @recordHeader, length: WORDS[YggLogRep.TransactionHeader]/WordsINWORDS, rest: @recordData],
        optr: optr,
        force: force,
        writeID: writeID];
      };
    IF logRecordPhaseType = undo OR logRecordPhaseType = redoUndo THEN lastRecordWithUndo _ thisRecord;
    };

  Force: PUBLIC PROC [followingRecord: RecordID] = {
    -- Forces the log through followingRecord; does nothing for nullRecordID.
    IF followingRecord # nullRecordID THEN YggLogBasic.Force[followingRecord];
    };

  Read: PUBLIC PROC [
    thisRecord: RecordID, wordsToSkip: CARDINAL, to: YggLog.Block]
    RETURNS [status: YggLog.ReadProcStatus, wordsRead: CARDINAL] = {
    -- Reads the record identified by thisRecord into to using YggLogBasic.Get, skipping
    -- the transaction header plus wordsToSkip further words. The header size is
    -- subtracted from the wordsRead count that is returned.
    -- Saturate at LAST[CARDINAL] instead of overflowing when the header size is added.
    IF wordsToSkip > LAST[CARDINAL] - SIZE[YggLogRep.TransactionHeader] THEN
      wordsToSkip _ LAST[CARDINAL]
    ELSE wordsToSkip _ wordsToSkip + SIZE[YggLogRep.TransactionHeader];
    TRUSTED {
      [status: status, wordsRead: wordsRead] _ YggLogBasic.Get[
        thisRecord: thisRecord,
        to: [base: NIL, length: wordsToSkip, rest: @to]];
      };
    wordsRead _ wordsRead - SIZE[YggLogRep.TransactionHeader];
    };

  -- Recovery proc registration. Registration is refused once UpdatePass has started.
  noMoreRecoveryRegistration: BOOL _ FALSE;

  TurnOffRecoveryRegistration: ENTRY PROC [] = INLINE {
    noMoreRecoveryRegistration _ TRUE;
    };

  RecoveryProcsIndex: TYPE = RecordType [noop .. reserved);
  RecoveryProcsArray: TYPE = ARRAY RecoveryProcsIndex OF YggLog.RecoveryProc;
  recoveryProcs: REF RecoveryProcsArray = SafeStorage.GetSystemZone[].NEW[RecoveryProcsArray _ ALL [NIL]];

  RegisterRecoveryProc: PUBLIC ENTRY PROC [
    recordType: RecordType, recoveryProc: YggLog.RecoveryProc] = {
    -- Installs recoveryProc as the recovery proc for recordType, replacing any
    -- previous entry.
    -- Raises ClientProgrammingError once registration has been turned off, and
    -- InternalProgrammingError if recordType is outside RecoveryProcsIndex.
    IF noMoreRecoveryRegistration THEN RETURN WITH ERROR ClientProgrammingError;
    IF recordType NOT IN RecoveryProcsIndex THEN
      RETURN WITH ERROR InternalProgrammingError;
    recoveryProcs[recordType] _ recoveryProc;
    };

  -- Analysis and checkpoint proc registration. Registration is refused once
  -- AnalysisPass has started.
  noMoreCheckpointRegistration: BOOL _ FALSE;

  TurnOffCheckpointRegistration: ENTRY PROC [] = INLINE {
    noMoreCheckpointRegistration _ TRUE;
    };

  IsCheckpointProcCallable: ENTRY PROC [] RETURNS [BOOL] = INLINE {
    -- TRUE once checkpoint registration has been turned off.
    RETURN [noMoreCheckpointRegistration];
    };

  AnalysisProcsIndex: TYPE = RecordType [noop .. writeBytes];
  AnalysisProcsArray: TYPE = ARRAY AnalysisProcsIndex OF YggLogControl.AnalysisProc;
  analysisProcs: REF AnalysisProcsArray = SafeStorage.GetSystemZone[].NEW[AnalysisProcsArray _ ALL [NIL]];

  RegisterAnalysisProc: PUBLIC ENTRY PROC [
    recordType: RecordType, analysisProc: YggLogControl.AnalysisProc] = {
    -- Installs analysisProc as the analysis proc for recordType, replacing any
    -- previous entry.
    -- Raises ClientProgrammingError once registration has been turned off, and
    -- InternalProgrammingError if recordType is outside AnalysisProcsIndex.
    IF noMoreCheckpointRegistration THEN RETURN WITH ERROR ClientProgrammingError;
    IF recordType NOT IN AnalysisProcsIndex THEN
      RETURN WITH ERROR InternalProgrammingError;
    analysisProcs[recordType] _ analysisProc;
    };

  -- Registered checkpoint procs occupy checkpointProcs[0 .. nCheckpointProcs).
  checkpointProcs: ARRAY [0 .. maxCheckpointProcs) OF YggLogControl.CheckpointProc _ ALL [NIL];
  maxCheckpointProcs: INT = 5;
  nCheckpointProcs: INT [0 .. maxCheckpointProcs] _ 0;

  RegisterCheckpointProc: PUBLIC ENTRY PROC [
    checkpointProc: YggLogControl.CheckpointProc] = {
    -- Appends checkpointProc to the list called by Checkpoint.
    -- Raises ClientProgrammingError once registration has been turned off, and
    -- InternalProgrammingError when all maxCheckpointProcs slots are in use.
    IF noMoreCheckpointRegistration THEN RETURN WITH ERROR ClientProgrammingError;
    IF nCheckpointProcs = maxCheckpointProcs THEN
      RETURN WITH ERROR InternalProgrammingError;
    checkpointProcs[nCheckpointProcs] _ checkpointProc;
    nCheckpointProcs _ nCheckpointProcs + 1;
    };

  ForkCheckpointYggDummyProcess: PUBLIC PROC [
    wakeupPeriodInSeconds: NAT] = TRUSTED {
    -- Forks and detaches the periodic checkpoint process.
    Process.Detach[FORK CheckpointYggDummyProcess[wakeupPeriodInSeconds]];
    };

  CheckpointYggDummyProcess: PROC [wakeupPeriodInSeconds: NAT] = {
    -- Loops forever: pauses for wakeupPeriodInSeconds, takes a checkpoint, then asks
    -- SafeStorage to reclaim collectible objects. The values returned by each
    -- checkpoint are passed to the next one as its previous values.
    keepRecord, startAnalysisRecord: RecordID _ nullRecordID;
    Process.SetPriority[Process.priorityForeground];
    DO
      Process.Pause[Process.SecondsToTicks[wakeupPeriodInSeconds]];
      [keepRecord, startAnalysisRecord] _ Checkpoint[keepRecord, startAnalysisRecord];
      SafeStorage.ReclaimCollectibleObjects[suspendMe: TRUE, traceAndSweep: FALSE];
      ENDLOOP;
    };

  Checkpoint: PROC [
    previousKeepRecord, previousStartAnalysisRecord: RecordID]
    RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
    -- Asks every registered checkpoint proc for its keep and start-analysis record IDs
    -- and takes the minimum of each, starting from the record ID of the next Put.
    -- If either minimum is greater than the corresponding previous value, writes a
    -- checkpoint complete record and releases the log before keepRecord.
    -- Raises InternalProgrammingError if called before checkpoint registration has been
    -- turned off, or if either new value is less than its previous value.
    -- Callers pass the returned pair back in as the previous values on the next call.
    differentRecordIDsSeen: BOOL _ FALSE;

    NoticeDifferentRecordIDs: PROC [old, new: RecordID] = {
      -- Sets differentRecordIDsSeen when old is less than new; the values may not go backwards.
      comp: PBasics.Comparison;
      TRUSTED {comp _ ConstArith.Compare[old, new]; };
      SELECT comp FROM
        less => differentRecordIDsSeen _ TRUE;
        greater => ERROR InternalProgrammingError;
        ENDCASE;
      };

    keepRecord _ startAnalysisRecord _ YggLogBasic.RecordIDOfNextPut[];
    IF NOT IsCheckpointProcCallable[] THEN ERROR InternalProgrammingError;
    FOR i: INT [0 .. maxCheckpointProcs] IN [0 .. nCheckpointProcs) DO
      thisKeepRecord, thisStartAnalysisRecord: RecordID;
      [thisKeepRecord, thisStartAnalysisRecord] _ checkpointProcs[i][];
      keepRecord _ YggLogInline.Min[thisKeepRecord, keepRecord];
      startAnalysisRecord _ YggLogInline.Min[thisStartAnalysisRecord, startAnalysisRecord]
      ENDLOOP;
    NoticeDifferentRecordIDs[previousKeepRecord, keepRecord];
    NoticeDifferentRecordIDs[previousStartAnalysisRecord, startAnalysisRecord];
    IF differentRecordIDsSeen THEN {
      -- Fixed: the length was a hard-coded 128 (marked "fix this"). It is now computed
      -- the same way as in Format, so Put copies exactly the record body.
      [] _ WriteCheckpointCompleteRecord[
        startAnalysisRecord: startAnalysisRecord,
        keepRecord: keepRecord,
        length: SIZE[YggLogRep.CheckpointCompleteRecord]/WordsINWORDS
        ];
      YggLogBasic.Release[beforeRecord: keepRecord];
      };
    RETURN [keepRecord, startAnalysisRecord];
    };

  -- Module start code: have the analysis pass recognize checkpoint complete records.
  RegisterAnalysisProc[
    recordType: checkpointComplete, analysisProc: CheckpointCompleteAnalysisProc];

  END.