-- MachCamelotEmulationForSunOSImpl.mesa
-- Copyright © 1988, 1989 by Xerox Corporation. All rights reserved.
-- Bob Hagmann July 6, 1989 9:38:54 am PDT
DIRECTORY
BasicTime,
Camelot,
CamelotRecoverable,
CountedVM,
File,
FileStream,
FS,
FSBackdoor,
IO,
Mach,
PBasics,
Process,
RedBlackTree,
Rope,
YggBuffMan,
YggDID,
YggDIDPrivate,
YggDIDMap,
YggDIDMapPrivate,
YggdrasilInit,
YggLock,
YggLog,
YggLogBasic,
YggLogControl,
YggLogRep,
YggFixedNames,
YggEnvironment,
YggFile,
YggFileStream,
YggIndexMaint,
YggInternal,
YggMonitoringLog,
YggSunOS,
YggRep,
YggRestartFile,
YggTransaction,
VM;
MachCamelotEmulationForSunOSImpl: CEDAR MONITOR
IMPORTS BasicTime, CamelotRecoverable, CountedVM, FS, IO, PBasics, Process, RedBlackTree, YggBuffMan, YggLog, YggLogBasic, YggLogControl, YggdrasilInit, YggRestartFile, YggTransaction, VM
EXPORTS Camelot, Mach, CamelotRecoverable, YggdrasilInit, YggMonitoringLog, YggTransaction
= BEGIN
OPEN Camelot, Mach;
ROPE: TYPE = Rope.ROPE;

-- Exported junk

-- Exported YggMonitoringLog procedure record, built from the empty constructor.
notice: PUBLIC YggMonitoringLog.ProcsRecord ← [];
DID: PUBLIC TYPE ~ REF DIDRep;
DIDRep: PUBLIC TYPE ~ YggDIDPrivate.DIDRep;
Document: TYPE = REF DocumentRep;
DocumentRep: PUBLIC TYPE = YggDIDMapPrivate.DocumentRep;

-- Log tracing.  While TraceLogWriting is TRUE, the procedures in this module that write log records also append one text line per record to TraceFileStream.
TraceLogWriting: BOOL ← TRUE;
-- NIL until OpenTraceFileAndStream has been called.
TraceFileStream: IO.STREAM ← NIL;
-- Divisor applied to SIZE[...] results when CheckpointProcess builds log blocks.
WordsINWORDS: CARD = WORDS[CARD32];

-- Global data

-- Polled by msgReceive and CheckpointProcess; both leave their loops once it is TRUE.
GoAway: BOOL ← FALSE;
-- Ticker of the most recently issued transaction; GetNextTrans increments it before use.
NextTransCount: CARD ← 10001;
-- Incremented once per pass of CheckpointProcess and stamped into AllocItems when they are dirtied or written.
CheckPointEphocNumber: CARD ← 1;
-- Exported task procedures
-- Signal carrying a Mach return code plus a textual explanation.  Nothing in the visible part of this module raises it.
MachCall: PUBLIC SIGNAL [errorCode: msgReturnT, explanation: Rope.ROPE] = CODE;
-- Signal carrying only a textual explanation.  Nothing in the visible part of this module raises it.
MachAnomaly: PUBLIC SIGNAL [explanation: Rope.ROPE] = CODE;
taskSelf: PUBLIC PROC RETURNS [targetTask: taskT] ~ {
  -- Get my task.  This emulation always answers the constant task [1].
  targetTask ← [1];
  };
taskNotify: PUBLIC PROC RETURNS [notifyPort: portT] ~ {
  -- Get my notify port (task_notify).  This emulation always answers the constant port [2].
  notifyPort ← [2];
  };
-- Exported virtual memory procedures
vmAllocate: PUBLIC PROC [targetTask: vmTaskT, address: vmAddressT, size: vmSizeT, anywhere: BOOL,raiseSignal: BOOL] RETURNS [mappedAddress: vmAddressT ← 0, kernCode: kernReturnT ← -1] ~ TRUSTED {
  -- Grab some VM.
  -- Allocates enough VM pages to hold size bytes and returns the address of the first page.
  -- targetTask, address, anywhere and raiseSignal are accepted for interface compatibility but are not examined.
  -- Always reports KernSuccess when VM.Allocate returns normally.
  interval: VM.Interval;
  interval ← VM.Allocate[count: VM.PagesForBytes[size]];
  mappedAddress ← LOOPHOLE[VM.AddressForPageNumber[interval.page]];
  kernCode ← KernSuccess;
  };
-- Every region of a paging object that has ever been mapped or zero filled.  Items are pushed on the front by vmAllocateWithPagerInner; nothing in the visible code removes one.
VMAllocList: LIST OF AllocItem ← NIL;
AllocItem: TYPE = RECORD [
  pagingObject: pagingObjectT,  -- backing object of the region
  offset: vmOffsetT,  -- byte offset of the region within the backing object
  size: vmSizeT,  -- byte length of the region
  mappedAddress: vmAddressT,  -- address of the in-memory copy; vmDeallocate resets it to 0
  cvmHandle: CountedVM.Handle,  -- storage holding the in-memory copy; NIL when there is none
  allocated: BOOL ← TRUE,  -- TRUE while mapped through vmAllocateWithPager; FALSE after a zero fill or a deallocate
  dirtied: BOOL ← FALSE,  -- set by noteDirtyOfMemory, cleared whenever the region is written to its file
  timeOfLastDirty: BasicTime.GMT ← BasicTime.earliestGMT,
  timeOfLastWrite: BasicTime.GMT ← BasicTime.earliestGMT,
  dirtiedDuringEphoc: CARD ← 0,  -- CheckPointEphocNumber at the time of the last dirtying
  ephocOfLastWrite: CARD ← 0,  -- CheckPointEphocNumber at the time of the last checkpoint write
  ownedByCheckpointProcess: BOOL ← FALSE  -- TRUE while CheckpointProcess is flushing this item
  ];
vmAllocateWithPager: PUBLIC PROC [targetTask: vmTaskT, address: vmAddressT, size: vmSizeT, anywhere: BOOL, pagingObject: pagingObjectT, offset: vmOffsetT, raiseSignal: BOOL] RETURNS [mappedAddress: vmAddressT ← 0, kernCode: kernReturnT ← -1] ~ TRUSTED {
  -- Map some externally backed memory into VM.
  -- Thin wrapper: forwards size, pagingObject and offset to the monitored inner procedure with zeroFillOnly = FALSE.
  -- targetTask, address, anywhere and raiseSignal are not examined.
  [mappedAddress, kernCode] ← vmAllocateWithPagerInner[size, pagingObject, offset, FALSE];
  };
vmAllocateWithPagerInner: ENTRY PROC [size: vmSizeT, pagingObject: pagingObjectT, offset: vmOffsetT, zeroFillOnly: BOOL] RETURNS [mappedAddress: vmAddressT ← 0, kernCode: kernReturnT ← -1] ~ TRUSTED {
  -- Map some externally backed memory into VM.
  -- Finds (or creates) the AllocItem for exactly [offset, offset+size) of pagingObject and gives it an in-memory copy.
  -- zeroFillOnly = FALSE: the copy is read from the backing file unless the item already has one; the item is marked allocated.
  -- zeroFillOnly = TRUE: the copy is filled with zeros and the item is left marked not allocated.
  -- Raises ERROR if the exact region is already allocated, or if size or offset is not a whole number of FS pages.
  -- The copy lives in CountedVM storage only.  An earlier version also called VM.Allocate here and never used or freed the interval; that leak has been removed.
  loai: LIST OF AllocItem;
  cvmHandle: CountedVM.Handle;
  pages: INT ← -1;
  firstPage: INT ← -1;
  -- Look for an item describing exactly this region; items that merely overlap or touch it are passed over.
  FOR loai ← VMAllocList, loai.rest UNTIL loai = NIL DO
    IF loai.first.pagingObject # pagingObject THEN LOOP;
    IF loai.first.offset > offset+size THEN LOOP;
    IF offset > loai.first.offset+loai.first.size THEN LOOP;
    IF loai.first.offset = offset AND loai.first.size = size THEN {
      IF loai.first.allocated THEN ERROR;
      EXIT;
      };
    ENDLOOP;
  -- Reuse the item's storage when it still has some, otherwise get fresh storage.
  IF loai = NIL THEN cvmHandle ← CountedVM.Allocate[words:size/BYTES[WORD]]
  ELSE {
    IF loai.first.cvmHandle = NIL THEN cvmHandle ← CountedVM.Allocate[words:size/BYTES[WORD]]
    ELSE cvmHandle ← loai.first.cvmHandle;
    };
  mappedAddress ← LOOPHOLE[cvmHandle.pointer];
  -- Both the length and the offset must be page aligned.
  pages ← FS.PagesForBytes[size];
  IF INT[size] # FS.BytesForPages[pages] THEN ERROR;
  firstPage ← FS.PagesForBytes[offset];
  IF INT[offset] # FS.BytesForPages[firstPage] THEN ERROR;
  IF loai = NIL THEN {
    -- New item: cvmHandle starts NIL so that the read branch below knows the contents still have to be fetched.
    loai ← VMAllocList ← CONS[[pagingObject: pagingObject, offset: offset, size: size, mappedAddress: mappedAddress, cvmHandle: NIL], VMAllocList];
    };
  IF zeroFillOnly THEN {
    where: LONG POINTER ← LOOPHOLE[mappedAddress];
    nWordsLeft: CARD32 ← size/PBasics.bytesPerWord;
    -- Fill in pieces of at most 10000 words.
    WHILE nWordsLeft > 0 DO
      fillThisTime: CARD32 ← MIN[nWordsLeft, 10000];
      PBasics.Fill[where: where, nWords: fillThisTime, value: 0];
      where ← where + fillThisTime * UNITS[PBasics.Word];
      nWordsLeft ← nWordsLeft - fillThisTime;
      ENDLOOP;
    loai.first.allocated ← FALSE;
    loai.first.mappedAddress ← mappedAddress;
    loai.first.cvmHandle ← cvmHandle;
    }
  ELSE {
    IF loai.first.cvmHandle = NIL THEN {
      FS.Read[file: FileForPagingObject[loai.first.pagingObject], from: firstPage, nPages: pages, to: LOOPHOLE[mappedAddress]];
      loai.first.mappedAddress ← mappedAddress;
      loai.first.cvmHandle ← cvmHandle;
      };
    loai.first.allocated ← TRUE;
    };
  kernCode ← KernSuccess;
  };
vmDeallocate: PUBLIC PROC [targetTask: vmTaskT, address: vmAddressT, size: vmSizeT, raiseSignal: BOOL] RETURNS [kernCode: kernReturnT ← -1] ~ {
  -- Unmap some memory from VM, whether externally backed or not.
  -- Finds the AllocItem mapped at address, writes its contents to the backing file and marks it unmapped.
  -- Raises ERROR if nothing is mapped at address or if size differs from the mapped size.
  -- targetTask and raiseSignal are not examined.
  -- The monitor is entered only for each attempt.  While CheckpointProcess owns the item this procedure pauses outside the monitor,
  -- because CheckpointProcess must enter the monitor (rememberPin) before it gives the item back; pausing inside would deadlock.
  tryDeallocate: ENTRY PROC RETURNS [busy: BOOL ← FALSE] ~ {
    FOR loai: LIST OF AllocItem ← VMAllocList, loai.rest UNTIL loai = NIL DO
      IF loai.first.mappedAddress = address THEN {
        pages: INT ← -1;
        firstPage: INT ← -1;
        IF size # loai.first.size THEN ERROR;
        IF loai.first.ownedByCheckpointProcess THEN RETURN [TRUE];
        pages ← FS.PagesForBytes[loai.first.size];
        firstPage ← FS.PagesForBytes[loai.first.offset];
        FS.Write[file: FileForPagingObject[loai.first.pagingObject], to: firstPage, nPages: pages, from: LOOPHOLE[address]];
        -- The copy is held through cvmHandle, so dropping the handle is the only release needed.
        -- (AllocItem has no VM interval field to free.)  NOTE(review): confirm CountedVM reclaims storage once the handle is dropped.
        loai.first.dirtied ← FALSE;
        loai.first.allocated ← FALSE;
        loai.first.mappedAddress ← 0;
        loai.first.cvmHandle ← NIL;
        EXIT;
        };
      REPEAT FINISHED => ERROR;
      ENDLOOP;
    };
  WHILE tryDeallocate[] DO Process.Pause[1] ENDLOOP;
  kernCode ← KernSuccess;
  };
noteDirtyOfMemory: ENTRY PROC [optr: optrT, size: uInt] ~ {
  -- Mark as dirty every AllocItem of optr's segment that intersects [optr.lowOffset, optr.lowOffset+size].
  -- Raises ERROR when no item of that paging object intersects the range.
  -- The previous test required an item to lie both after and before the range at once, which is never true,
  -- so the first item on the list was always the one marked, whatever its paging object or offsets.
  po: pagingObjectT;
  foundOne: BOOL ← FALSE;
  po ← PagingObjectForSegementId[optr.segmentId];
  FOR loai: LIST OF AllocItem ← VMAllocList, loai.rest UNTIL loai = NIL DO
    IF loai.first.pagingObject # po THEN LOOP;
    IF loai.first.offset > optr.lowOffset + size THEN LOOP;  -- item starts after the range ends
    IF loai.first.offset + loai.first.size < optr.lowOffset THEN LOOP;  -- item ends before the range starts
    loai.first.dirtied ← TRUE;
    loai.first.timeOfLastDirty ← BasicTime.Now[];
    loai.first.dirtiedDuringEphoc ← CheckPointEphocNumber;
    foundOne ← TRUE;
    ENDLOOP;
  IF ~foundOne THEN ERROR;
  };
-- Exported message procedures
msgSend: PUBLIC PROC [header: REF msgHeaderT, option: msgOptionT, timeout: INT, raiseSignal: BOOL] RETURNS [msgCode: msgReturnT ← -1] ~ {
  -- Send a message.  Not supported by this emulation: any call raises ERROR.
  ERROR;
  };
msgReceive: PUBLIC PROC [header: REF msgHeaderT, option: msgOptionT, timeout: INT, raiseSignal: BOOL] RETURNS [msgCode: msgReturnT ← -1] ~ {
  -- Receive a message.
  -- The original note here reads "Modifies the header!"; this emulation never touches header.
  -- No message ever arrives: the caller pauses in a loop until GoAway becomes TRUE, then gets Mach.RcvTimedOut.
  -- option, timeout and raiseSignal are not examined.
  DO
    Process.Pause[33];
    IF GoAway THEN RETURN[Mach.RcvTimedOut];
    ENDLOOP;
  };
-- Exported port procedures
nameServerPort: PUBLIC PROC RETURNS [p: portT] ~ {
  -- Get my port to the name server (name_server_port).  This emulation always answers the constant port [3].
  p ← [3];
  };
MachPortsLookup: PUBLIC PROC [targetTask: taskT, raiseSignal: BOOL] RETURNS [intPortSet: portArrayT, intPortArrayCount: INT, kernCode: kernReturnT] ~ {
  -- Get my port to the service port (service_port).
  -- Answers a freshly allocated array holding the four ports [0], [1], [2], [3], a count of 4, and KernSuccess.
  -- targetTask and raiseSignal are not examined.
  xPortArray: REF ARRAY[0..3] OF portT ← NEW[ARRAY[0..3] OF portT ← [[0], [1], [2], [3]]];
  intPortArrayCount ← 4;
  kernCode ← KernSuccess;
  intPortSet ← LOOPHOLE[xPortArray];
  };
-- Most recently issued port number; portAllocate increments it before each use, so the first port issued is [11].
nextPort: portT ← [10];
portAllocate: PUBLIC PROC [targetTask: taskT, raiseSignal: BOOL] RETURNS [newPort: portT, kernCode: kernReturnT ← -1] ~ TRUSTED {
  -- Allocate a new port: bump nextPort and answer the new value with KernSuccess.
  -- targetTask and raiseSignal are not examined.
  -- NOTE(review): nextPort is updated outside the monitor; confirm callers never allocate ports concurrently.
  nextPort ← [nextPort + 1];
  newPort ← nextPort;
  kernCode ← KernSuccess;
  };
portRestrict: PUBLIC PROC [targetTask: taskT, port: portT, raiseSignal: BOOL] RETURNS [kernCode: kernReturnT ← -1] ~ {
  -- Restricts port so that msgReceive must be given the port number, not PortDefault.
  -- The emulation keeps no per-port state: the call does nothing and answers KernSuccess.
  kernCode ← KernSuccess;
  };
portUnrestrict: PUBLIC PROC [targetTask: taskT, port: portT, raiseSignal: BOOL] RETURNS [kernCode: kernReturnT ← -1] ~ {
  -- Unrestricts port so that PortDefault given to msgReceive can receive from this port.
  -- The emulation keeps no per-port state: the call does nothing and answers KernSuccess.
  kernCode ← KernSuccess;
  };
-- Netname
netnameCheckIn: PUBLIC PROC [ServPort: portT, portName: Rope.ROPE, signature: portT, portId: portT, raiseSignal: BOOL] RETURNS [kernCode: kernReturnT] ~ {
  -- "check in a name into the local name space"
  -- The emulation records nothing: every argument is ignored and KernSuccess is answered.
  kernCode ← KernSuccess;
  };
-- Exported recoverable storage management procedures
DSInitialize: PUBLIC PROC [dsPort: portT, raiseSignal: BOOL] RETURNS [serverID: serverIdT, tsPort, mPort, sPort: portT, sharedMemAddr: vmAddressT, seqDescList: ListOfSegmentDesc ← NIL, seqPortList: ListOfPorts, kernCode: Mach.kernReturnT ← -1] ~ TRUSTED {
  -- Initialize the data server.
  -- Answers fixed identifiers (server 1989, ports [4], [5], [6], no shared memory) and a single recoverable segment, number 1066.
  -- Side effect: (re)builds SegmentIdPagingObjectMap with three entries: the recoverable segment, the log segment and the restart segment (1492, paging object [9]).
  -- dsPort and raiseSignal are not examined.
  serverID ← [1989];
  tsPort ← [4];
  mPort ← [5];
  sPort ← [6];
  sharedMemAddr ← 0;
  seqDescList ← LIST[[serverId: [1234], segmentId: [1066], logicalDisk: 'Z, unused: 'z, highSize: 0, lowSize: RecoverableMemoryPagingSize]];
  seqPortList ← LIST[RecoverableMemoryPagingObject];
  SegmentIdPagingObjectMap ← LIST[
    [[1066], RecoverableMemoryPagingObject, CamelotRecoverable.CamelotRecoverableFile],
    [YggSunOS.LogSegmentID, LogPagingObject, CamelotRecoverable.CamelotLogFile],
    [[1492], [9], CamelotRecoverable.RestartFile]];
  kernCode ← KernSuccess;
  };
CamelotRecoverableInit: PUBLIC PROC RETURNS [seqDescList: Camelot.ListOfSegmentDesc, seqPortList: Mach.ListOfPorts] ~ {
  -- Describe the two segments this module itself relies on, the log and the restart file, together with their paging objects.
  -- The two result lists are in the same order: log first, restart second.
  seqPortList ← LIST[LogPagingObject, [9]];
  seqDescList ← LIST[
    [serverId: [1234], segmentId: YggSunOS.LogSegmentID, logicalDisk: 'K, unused: 'k, highSize: 0, lowSize: YggSunOS.LogFileSize * YggSunOS.LogBytesPerPage],
    [serverId: [1234], segmentId: [1492], logicalDisk: 'R, unused: 'r, highSize: 0, lowSize: 4096]];
  };
-- Paging object standing for the recoverable memory segment (segment 1066 in DSInitialize).
RecoverableMemoryPagingObject: pagingObjectT ← [7];
-- Byte size advertised for the recoverable segment; also the extent zeroed by ZeroBackingStore.
RecoverableMemoryPagingSize: CARD32 ← 40960000;
-- Paging object standing for the log segment; CheckpointProcess never flushes items that belong to it.
LogPagingObject: pagingObjectT ← [8];
-- Association between segment ids, paging objects and backing files.  Assigned by DSInitialize; the three lookup procedures below search it.
SegmentIdPagingObjectMap: LIST OF SegmentIdPagingObjectMapItem;
SegmentIdPagingObjectMapItem: TYPE = RECORD[
segmentId: segmentIdT,
pagingObject: pagingObjectT,
backingFile: FS.OpenFile
];
PagingObjectForSegementId: PROC [segmentId: segmentIdT] RETURNS [pagingObject: pagingObjectT] ~ {
  -- Answer the paging object registered for segmentId in SegmentIdPagingObjectMap; ERROR if the segment is not registered.
  entry: LIST OF SegmentIdPagingObjectMapItem ← SegmentIdPagingObjectMap;
  WHILE entry # NIL DO
    IF entry.first.segmentId = segmentId THEN RETURN[entry.first.pagingObject];
    entry ← entry.rest;
    ENDLOOP;
  ERROR;
  };
SegementIdForPagingObject: PROC [pagingObject: pagingObjectT] RETURNS [segmentId: segmentIdT] ~ {
  -- Answer the segment id registered for pagingObject in SegmentIdPagingObjectMap; ERROR if the paging object is not registered.
  entry: LIST OF SegmentIdPagingObjectMapItem ← SegmentIdPagingObjectMap;
  WHILE entry # NIL DO
    IF entry.first.pagingObject = pagingObject THEN RETURN[entry.first.segmentId];
    entry ← entry.rest;
    ENDLOOP;
  ERROR;
  };
FileForPagingObject: PROC [pagingObject: pagingObjectT] RETURNS [backingFile: FS.OpenFile] ~ {
  -- Answer the backing file registered for pagingObject in SegmentIdPagingObjectMap; ERROR if the paging object is not registered.
  entry: LIST OF SegmentIdPagingObjectMapItem ← SegmentIdPagingObjectMap;
  WHILE entry # NIL DO
    IF entry.first.pagingObject = pagingObject THEN RETURN[entry.first.backingFile];
    entry ← entry.rest;
    ENDLOOP;
  ERROR;
  };
DSPinObject: PUBLIC PROC [dsPort: portT, tid: tidT, optr: optrT, size: uInt, raiseSignal: BOOL] RETURNS [kernCode: Mach.kernReturnT ← -1] ~ {
  -- Pin an object in preparation for modification.  optr is the Camelot recoverable storage "address", not the VM address.
  -- Records the pin for transaction tid (rememberPin) and marks the covering mapped memory dirty (noteDirtyOfMemory).
  -- dsPort and raiseSignal are not examined.
  effectiveOptr: optrT ← optr;
  effectiveSize: uInt ← size;
  IF BYTES[UNIT] = 2 THEN {
    -- Two-byte addressing units: round the start down and the end up to even byte offsets.
    effectiveOptr.lowOffset ← PBasics.BITAND[optr.lowOffset, 0FFFFFFFEH];
    effectiveSize ← PBasics.BITAND[optr.lowOffset - effectiveOptr.lowOffset + size + 1, 0FFFFFFFEH];
    };
  rememberPin[tid, effectiveOptr, effectiveSize];
  noteDirtyOfMemory[effectiveOptr, effectiveSize];
  kernCode ← KernSuccess;
  };
OpenTraceFileAndStream: PROC ~ {
  -- Create a new version of the log trace file and leave a stream onto it in TraceFileStream.
  newStream: IO.STREAM;
  newStream ← FS.StreamOpen[
    fileName: "///Trace/LogTrace.txt",
    accessOptions: $create,
    keep: 8,
    createByteCount: 25600];
  TraceFileStream ← newStream;
  };
DSLogNewValue: PUBLIC PROC [dsPort: portT, tid: tidT, optr: optrT, newValue: pointerT, newValueCnt: INT, raiseSignal: BOOL] RETURNS [kernCode: Mach.kernReturnT ← -1] ~ {
  -- Send a new value of an object to the log.
  -- Writes one unforced redo record of (newValueCnt+3)/4 words taken from newValue, optionally traces it, then moves the matching pins of tid to the logged state.
  -- dsPort and raiseSignal are not examined.
  thisRecord: YggLog.RecordID;
  [thisRecord: thisRecord] ← YggLog.Write[trans: tid, logRecordPhaseType: redo,
    recordType: writeBytes, optr: optr, recordData: [base: LOOPHOLE[newValue], length: (newValueCnt+3)/4, rest: NIL], force: FALSE];
  IF TraceLogWriting AND TraceFileStream = NIL THEN OpenTraceFileAndStream[];
  IF TraceLogWriting THEN {
    -- The trace line is produced inside the monitor.
    p: ENTRY PROC ~ {
      IO.PutF[TraceFileStream, "New value at %g for seg: %g, offset: %b, byteSize: %g(%b)\n", IO.card[thisRecord.low], IO.card[optr.segmentId.value], IO.card[optr.lowOffset], IO.card[newValueCnt], IO.card[newValueCnt]];
      IO.Flush[TraceFileStream];
      };
    p[];
    };
  monitoredChangePin[transID: tid, loggingNow: TRUE, forceClear: FALSE, optr: optr, size: newValueCnt];
  kernCode ← KernSuccess;
  };
-- Statistic: number of DSLogOldValueNewValue calls whose new value was at most two bytes long.
NumberOfShortLogRecords: INT ← 0;
DSLogOldValueNewValue: PUBLIC PROC [dsPort: portT, tid: tidT, optr: optrT, oldValue: pointerT, oldValueCnt: INT, newValue: pointerT, newValueCnt: INT, raiseSignal: BOOL] RETURNS [kernCode: Mach.kernReturnT ← -1] ~ {
  -- Send the old and the new value of an object to the log.
  -- Writes an unforced undo record holding the old value, then an unforced redo record holding the new value,
  -- optionally traces the redo record, then moves the matching pins of tid to the logged state.
  -- dsPort and raiseSignal are not examined.
  thisRecord: YggLog.RecordID;
  -- The undo record is sized from oldValueCnt.  It used to be sized from newValueCnt, which mis-sizes the record whenever the two counts differ.
  [] ← YggLog.Write[trans: tid, logRecordPhaseType: undo,
    recordType: writeBytes, optr: optr, recordData: [base: LOOPHOLE[oldValue], length: (oldValueCnt+3)/4, rest: NIL], force: FALSE];
  IF newValueCnt <= 2 THEN {
    NumberOfShortLogRecords ← NumberOfShortLogRecords + 1;
    };
  [thisRecord: thisRecord] ← YggLog.Write[trans: tid, logRecordPhaseType: redo,
    recordType: writeBytes, optr: optr, recordData: [base: LOOPHOLE[newValue], length: (newValueCnt+3)/4, rest: NIL], force: FALSE];
  IF TraceLogWriting AND TraceFileStream = NIL THEN OpenTraceFileAndStream[];
  IF TraceLogWriting THEN {
    -- The trace line is produced inside the monitor.
    p: ENTRY PROC ~ {
      IO.PutF[TraceFileStream, "Old/new at %b for seg: %g, offset: %b, byteSize: %g(%b)\n", IO.card[thisRecord.low], IO.card[optr.segmentId.value], IO.card[optr.lowOffset], IO.card[newValueCnt], IO.card[newValueCnt]];
      IO.Flush[TraceFileStream];
      };
    p[];
    };
  -- NOTE(review): the pin range is taken from oldValueCnt here but from newValueCnt in DSLogNewValue; confirm the two counts are always equal.
  monitoredChangePin[transID: tid, loggingNow: TRUE, forceClear: FALSE, optr: optr, size: oldValueCnt];
  kernCode ← KernSuccess;
  };
DSQInit: PUBLIC PROC [sharedMemAddr: Mach.vmAddressT] ~ {
  -- Init the shared memory queue (emulation does nothing).
  };
DSQPreflush: PUBLIC PROC [dsPort: Mach.portT, optr: optrT, sizeInBytes: uInt] ~ {
  -- Preflush some dirty memory.
  -- Finds the AllocItem of optr's segment that starts at optr.lowOffset, writes its in-memory copy to the backing file and clears its dirty flag.
  -- Raises ERROR if there is no such item, if sizeInBytes differs from the item's size, or if the item has no in-memory copy.
  -- The item stays mapped afterwards.  An earlier version freed a VM interval here through a field AllocItem does not have; that call has been removed.
  -- NOTE(review): VMAllocList is walked here without entering the monitor; confirm that is intended.
  po: pagingObjectT;
  po ← PagingObjectForSegementId[optr.segmentId];
  FOR loai: LIST OF AllocItem ← VMAllocList, loai.rest UNTIL loai = NIL DO
    IF loai.first.pagingObject = po AND loai.first.offset = optr.lowOffset THEN {
      backingFile: FS.OpenFile;
      pages: INT ← -1;
      firstPage: INT ← -1;
      IF sizeInBytes # loai.first.size THEN ERROR;
      IF loai.first.cvmHandle = NIL THEN ERROR;
      pages ← FS.PagesForBytes[loai.first.size];
      firstPage ← FS.PagesForBytes[loai.first.offset];
      backingFile ← FileForPagingObject[loai.first.pagingObject];
      FS.Write[file: backingFile, to: firstPage, nPages: pages, from: LOOPHOLE[loai.first.mappedAddress]];
      loai.first.dirtied ← FALSE;
      EXIT;
      };
    REPEAT FINISHED => ERROR;
    ENDLOOP;
  };
DSQZeroFill: PUBLIC PROC [dsPort: Mach.portT, optr: optrT, sizeInBytes: uInt] ~ {
  -- Zero some memory.
  -- Asks vmAllocateWithPagerInner for a zero filled in-memory copy of sizeInBytes bytes at optr.lowOffset of optr's segment; its results are discarded.
  -- dsPort is not examined.
  po: pagingObjectT;
  po ← PagingObjectForSegementId[optr.segmentId];
  [] ← vmAllocateWithPagerInner[sizeInBytes, po, optr.lowOffset, TRUE];
  };
-- Exported transaction management procedures
TAAddApplication: PUBLIC PROC [tPort: portT, atPort: portT, authName: Rope.ROPE, raiseSignal: BOOL] RETURNS [applicationID: applicationIdT, taPort: portT, kernCode: Mach.kernReturnT ← -1] ~ TRUSTED {
  -- Initialize an application to the transaction manager.
  -- The emulation only reports KernSuccess; applicationID and taPort are never assigned here.
  kernCode ← KernSuccess;
  };
TABegin: PUBLIC PROC [taPort: portT, parentTid: tidT, transType: transactionTypeT, raiseSignal: BOOL] RETURNS [newTid: tidT, kernCode: Mach.kernReturnT ← -1] ~ TRUSTED {
  -- Start a new transaction.
  -- Draws a fresh identifier from GetNextTrans and registers it through noteNewTrans; ERROR if registration reports failure.
  -- taPort, parentTid, transType and raiseSignal are not examined.
  newTid ← GetNextTrans[];
  IF ~noteNewTrans[newTid] THEN ERROR;
  kernCode ← KernSuccess;
  };
TAEnd: PUBLIC PROC [taPort: portT, tid: tidT, protocolType: protocolTypeT, raiseSignal: BOOL] RETURNS [timestamp: timestampT, status: INT, kernCode: Mach.kernReturnT ← -1] ~ TRUSTED {
  -- Try to commit a transaction.
  -- Forces a commitTrans record carrying the top-level tickers of tid to the log, optionally traces it,
  -- releases every pin of the transaction, and removes the transaction from the table (ERROR if it was not there).
  -- timestamp is never assigned; taPort, protocolType and raiseSignal are not examined.
  thisRecord: YggLog.RecordID;
  transID: YggTransaction.TransID ← LOOPHOLE[tid];
  block: LIST OF YggLog.Block;
  blockArrayOfWords: LONG POINTER TO ARRAY[0..1024/4) OF CARD32;
  block ← getScratchBlock[];
  block.first.length ← 2;
  blockArrayOfWords ← LOOPHOLE[block.first.base];
  blockArrayOfWords[0] ← tid.top.highTicker;
  blockArrayOfWords[1] ← tid.top.lowTicker;
  [thisRecord: thisRecord] ← YggLog.Write[trans: tid, logRecordPhaseType: analysis,
    recordType: commitTrans, optr: [[0],0,0], recordData: block.first, force: TRUE];
  returnScratchBlock[block];
  IF TraceLogWriting AND TraceFileStream = NIL THEN OpenTraceFileAndStream[];
  IF TraceLogWriting THEN {
    -- The trace line is produced inside the monitor.
    p: ENTRY PROC ~ TRUSTED {
      IO.PutF[TraceFileStream, "TAEnd at %g\n", IO.card[thisRecord.low] ];
      IO.Flush[TraceFileStream];
      };
    p[];
    };
  -- The null optr with size 0 selects every bucket the transaction touched.
  monitoredChangePin[transID: transID, loggingNow: FALSE, forceClear: FALSE, optr: [[0],0,0] , size: 0];
  IF ~removeTrans[transID] THEN ERROR;
  status ← ErSuccess;
  kernCode ← KernSuccess;
  };
TAKill: PUBLIC PROC [taPort: portT, tid: tidT, status: INT, raiseSignal: BOOL] RETURNS [kernCode: Mach.kernReturnT ← -1] ~ TRUSTED {
  -- Try to abort a transaction.
  -- Suspends the transaction, forces an abortTrans record carrying the top-level tickers of tid to the log,
  -- optionally traces it, and releases every pin of the transaction.
  -- taPort, status and raiseSignal are not examined.
  -- NOTE(review): unlike TAEnd, this procedure never calls removeTrans, so the transaction stays in the table; confirm that is intended.
  thisRecord: YggLog.RecordID;
  transID: YggTransaction.TransID ← LOOPHOLE[tid];
  block: LIST OF YggLog.Block;
  blockArrayOfWords: LONG POINTER TO ARRAY[0..1024/4) OF CARD32;
  block ← getScratchBlock[];
  block.first.length ← 2;
  blockArrayOfWords ← LOOPHOLE[block.first.base];
  blockArrayOfWords[0] ← tid.top.highTicker;
  blockArrayOfWords[1] ← tid.top.lowTicker;
  YggTransaction.Suspend[transID, ErWaitingTransAborted];
  [thisRecord: thisRecord] ← YggLog.Write[trans: tid, logRecordPhaseType: analysis,
    recordType: abortTrans, optr: [[0],0,0], recordData: block.first, force: TRUE];
  -- Insert backwards scan of log, calling CamelotMIG.SRRestoreObjectX
  IF TraceLogWriting AND TraceFileStream = NIL THEN OpenTraceFileAndStream[];
  IF TraceLogWriting THEN {
    -- The trace line is produced inside the monitor.
    p: ENTRY PROC ~ TRUSTED {
      IO.PutF[TraceFileStream, "TAKill at %g\n", IO.card[thisRecord.low] ];
      IO.Flush[TraceFileStream];
      };
    p[];
    };
  -- The null optr with size 0 selects every bucket the transaction touched.
  monitoredChangePin[transID: tid, loggingNow: FALSE, forceClear: FALSE, optr: [[0],0,0] , size: 0];
  returnScratchBlock[block];
  kernCode ← KernSuccess;
  };
-- Internal block allocation
-- Free list of scratch blocks, linked through the list cells themselves.
savedScratchBlocks: LIST OF YggLog.Block ← NIL;
getScratchBlock: ENTRY PROC RETURNS [block: LIST OF YggLog.Block] ~ {
  -- Answer a one-element list whose block refers to 1024 bytes of VM.
  -- A cell from the free list is reused when there is one; otherwise fresh VM is allocated.
  -- A reused block keeps whatever length its previous user stored, so callers must set block.first.length themselves.
  ENABLE UNWIND => {};
  IF savedScratchBlocks # NIL THEN {
    block ← savedScratchBlocks;
    savedScratchBlocks ← savedScratchBlocks.rest;
    -- Detach the cell so the caller cannot reach the rest of the free list through it.
    block.rest ← NIL;
    }
  ELSE {
    interval: VM.Interval;
    interval ← VM.Allocate[count: VM.PagesForBytes[1024]];
    block ← LIST[[base: LOOPHOLE[VM.AddressForPageNumber[interval.page]], length: 1024/4, rest: NIL]];
    };
  };
returnScratchBlock: ENTRY PROC [block: LIST OF YggLog.Block] ~ {
  -- Give a scratch block back: push its list cell on the front of savedScratchBlocks.
  oldFreeList: LIST OF YggLog.Block ← savedScratchBlocks;
  block.rest ← oldFreeList;
  savedScratchBlocks ← block;
  };
-- Internal transaction procedures
-- One-deep cache consulted by getScratchTrans.
savedScratchTrans: Trans;
-- Lookup key reused by the monitored procedures: they store a transID in it and hand it to RedBlackTree.
savedScratchTransForEntries: Trans ← NEW[TransRep];
-- Table of known transactions.  It is not created anywhere in the visible part of this module.
subtransactionAndTransactionMap: RedBlackTree.Table;
-- Stand-in object that innerFindTrans answers for the null transaction; CheckpointProcess pins pages under it.
checkpointNullTransObj: Trans ← NEW[TransRep];
Trans: TYPE = REF TransRep;
TransRep: TYPE = RECORD [
  transID: YggEnvironment.TransID,
  outcome: YggEnvironment.Outcome,
  latched: BOOL ← FALSE,  -- while TRUE, innerFindTrans makes other seekers of this transaction wait
  finishTime: BasicTime.GMT ← BasicTime.nullGMT,
  suspendTime: BasicTime.GMT ← BasicTime.nullGMT,
  -- One bit per entry of pageBuckets: bit i of low32 is bucket i, bit i of high32 is bucket 32+i.
  -- A bit is set when the transaction records a pin in that bucket.
  pageBucketsModified: RECORD[
    low32: CARD32,
    high32: CARD32
    ]
  ];
-- Condition on which the monitored procedures wait before retrying (a latched transaction, a page with pins prevented).
forABit: CONDITION;
getScratchTrans: ENTRY PROC RETURNS [scratchTrans: Trans] ~ {
  -- Answer a transaction object for temporary use: the cached one when present (emptying the cache), otherwise a new one.
  ENABLE UNWIND => {};
  scratchTrans ← savedScratchTrans;
  IF scratchTrans = NIL THEN scratchTrans ← NEW[TransRep]
  ELSE savedScratchTrans ← NIL;
  };
noteNewTrans: PROC [transID: YggEnvironment.TransID] RETURNS [parentOK: BOOL ← TRUE] ~ {
  -- Register a new top-level transaction in subtransactionAndTransactionMap with outcome active and no buckets modified.
  -- Raises ERROR if transID is not top level or is already registered.  parentOK is never changed from TRUE.
  newTrans: Trans;
  insertTrans: ENTRY PROC ~ {
    ENABLE UNWIND => {};
    data: RedBlackTree.UserData;
    IF YggTransaction.IsTopLevel[transID] THEN {
      savedScratchTransForEntries.transID ← transID;
      data ← RedBlackTree.Lookup[subtransactionAndTransactionMap, savedScratchTransForEntries];
      IF data # NIL THEN ERROR;
      }
    ELSE ERROR;
    RedBlackTree.Insert[subtransactionAndTransactionMap, newTrans, newTrans];
    };
  newTrans ← NEW[TransRep ← [transID: transID, outcome: active, pageBucketsModified: [0, 0] ]];
  insertTrans[];
  };
StateDuringRecovery: PUBLIC PROC [transID: YggEnvironment.TransID] RETURNS [outcome: YggEnvironment.Outcome] ~ {
  -- Answer commit when transID is present in the transaction table (or is the null transaction), unknown otherwise.
  -- The transaction object that was found is not examined.
  transFound: BOOL ← FALSE;
  trans: Trans ← NIL;
  [transFound, trans] ← FindTrans[transID];
  IF transFound THEN {
    RETURN[commit];
    }
  ELSE RETURN[unknown];
  };
FindTrans: ENTRY PROC [transID: YggEnvironment.TransID, setLatch: BOOL ← FALSE] RETURNS [transFound: BOOL ← FALSE, trans: Trans ← NIL] ~ {
  -- Monitor-entering wrapper around innerFindTrans; see that procedure for the meaning of the arguments and results.
  ENABLE UNWIND => {};
  [transFound, trans] ← innerFindTrans[transID, setLatch];
  };
innerFindTrans: INTERNAL PROC [transID: YggEnvironment.TransID, setLatch: BOOL ← FALSE] RETURNS [transFound: BOOL ← FALSE, trans: Trans ← NIL] ~ {
  -- Look transID up in subtransactionAndTransactionMap.
  -- The null transaction is always found and maps to checkpointNullTransObj, without any latch handling.
  -- For any other transaction found, waits on forABit while it is latched, then latches it when setLatch is TRUE.
  -- Answers [FALSE, NIL] when the transaction is not in the table.
  ENABLE UNWIND => {};
  data: RedBlackTree.UserData;
  IF YggTransaction.IsNullTrans[transID] THEN RETURN [transFound: TRUE, trans: checkpointNullTransObj];
  savedScratchTransForEntries.transID ← transID;
  data ← RedBlackTree.Lookup[subtransactionAndTransactionMap, savedScratchTransForEntries];
  IF data = NIL THEN {
    RETURN[FALSE, NIL];
    }
  ELSE {
    trans ← NARROW[data];
    WHILE trans.latched DO WAIT forABit ENDLOOP;
    IF setLatch THEN trans.latched ← TRUE;
    RETURN[TRUE, trans];
    };
  };
unlatchTrans: ENTRY PROC [trans: Trans] ~ {
  -- Release the latch on trans and wake every process waiting on forABit, so that seekers blocked in innerFindTrans retest the latch at once.
  trans.latched ← FALSE;
  BROADCAST forABit;
  };
removeTrans: ENTRY PROC [transID: YggEnvironment.TransID] RETURNS [transFound: BOOL ← FALSE] ~ {
  -- Delete transID from subtransactionAndTransactionMap.  Answers TRUE if it was present, FALSE otherwise.
  data: RedBlackTree.UserData;
  savedScratchTransForEntries.transID ← transID;
  data ← RedBlackTree.Lookup[subtransactionAndTransactionMap, savedScratchTransForEntries];
  IF data = NIL THEN {
    RETURN[FALSE];
    }
  ELSE {
    trans: Trans;
    -- The result is not used further; the NARROW still checks that the table entry really is a Trans.
    trans ← NARROW[data];
    [] ← RedBlackTree.Delete[subtransactionAndTransactionMap, savedScratchTransForEntries];
    RETURN[TRUE];
    };
  };
-- Internal red black procs
-- A Red Black tree is used to store and find transactions/subtransactions. The tree is indexed by subtransaction.
GetKeyProc: RedBlackTree.GetKey = {
  -- PROC [data: UserData] RETURNS [Key]
  -- A transaction object serves as its own key.
  trans: Trans ← NARROW[data];
  RETURN[ trans ];
  };
CompareProc: RedBlackTree.Compare = {
  -- PROC [k: Key, data: UserData] RETURNS [Basics.Comparison]
  -- Orders transactions by the bottom part of their transID: node id first, then highTicker, then lowTicker.
  dataTrans: Trans ← NARROW[data];
  keyTrans: Trans ← NARROW[k];
  SELECT keyTrans.transID.bottom.nodeId.value FROM
    > dataTrans.transID.bottom.nodeId.value => RETURN [greater];
    < dataTrans.transID.bottom.nodeId.value => RETURN [less];
    ENDCASE => {
      SELECT keyTrans.transID.bottom.highTicker FROM
        > dataTrans.transID.bottom.highTicker => RETURN [greater];
        < dataTrans.transID.bottom.highTicker => RETURN [less];
        ENDCASE => {
          SELECT keyTrans.transID.bottom.lowTicker FROM
            > dataTrans.transID.bottom.lowTicker => RETURN [greater];
            < dataTrans.transID.bottom.lowTicker => RETURN [less];
            ENDCASE => RETURN [equal];
          };
      };
  };
-- Internal transaction procedures
-- Pins are kept per 1024-byte page, hashed into 64 buckets by the low six bits of the page number.
pageMask: CARD32 = 077B;  -- selects the bucket index from a page number
noBuckets: CARD = 64;
byteMask: CARD32 = 01777B;  -- selects the byte position within a page
bytesPerBucket: CARD = 1024;
pageBuckets: ARRAY [0..noBuckets) OF LIST OF pinItem;
pinItem: TYPE = RECORD[
  transID: YggEnvironment.TransID,
  optr: optrT,
  size: uInt,
  pc: pinCode
  ];
pinCode: TYPE = {pin, pinAndLogged, preventPin};
-- pin => it's pinned and maybe updated
-- pinAndLogged => it's pinned and updated, and the log write has been buffered. A flush of the log will make everything OK.
-- preventPin => disallow pins for this page
hashOptr: PROC [optr: optrT] RETURNS [CARD] ~ {
  -- Bucket index for optr: its low offset shifted right by 10 bits (the page number), masked with pageMask.
  RETURN[PBasics.BITAND[PBasics.BITRSHIFT[optr.lowOffset, 10], pageMask]];
  };
-- Debugging aids maintained by rememberPin and changePin.
tooLow: BOOL ← FALSE;  -- set once a nonzero segment id below 50 has been seen
debugAddress: CARD32 ← 1;  -- low offset being watched
debugHits: INT ← 0;  -- number of calls whose optr.lowOffset equalled debugAddress
rememberPin: ENTRY PROC [transID: YggEnvironment.TransID, optr: optrT, size: uInt, doPreventPin: BOOL ← FALSE] ~ {
  -- Record that transID has pinned the size bytes starting at optr.
  -- The range is cut at 1024-byte page boundaries and one pinItem per page is pushed on the page's bucket;
  -- the bucket's bit is set in the transaction's pageBucketsModified.
  -- doPreventPin = TRUE records the entries as preventPin instead of pin.  Previously the argument was ignored and
  -- every entry was recorded as pin, so the preventPin test in innerRememberPin could never succeed.
  -- While a page carries a preventPin entry, a caller waits on forABit and then retries that page.
  -- NOTE(review): the wait ends only when forABit is signalled or times out; confirm that forABit has a timeout.
  -- Raises ERROR if transID is unknown, or if the page arithmetic goes wrong.
  nowOptr: optrT;
  sizeLeft: uInt;
  newPinCode: pinCode ← IF doPreventPin THEN preventPin ELSE pin;
  innerRememberPin: INTERNAL PROC [thisOptr: optrT, bytes: CARD] RETURNS [tryAgain: BOOL ← FALSE] ~ {
    -- Record one page-sized piece.  Answers TRUE, recording nothing, when pins are currently prevented for the page.
    ENABLE UNWIND => {};
    hashPage: CARD;
    lowPageNo: CARD32;
    transFound: BOOL ← FALSE;
    trans: Trans ← NIL;
    lowPageNo ← PBasics.BITRSHIFT[thisOptr.lowOffset, 10];
    hashPage ← hashOptr[thisOptr];
    FOR lopi: LIST OF pinItem ← pageBuckets[hashPage], lopi.rest UNTIL lopi = NIL DO
      loopLowPageNo: CARD32;
      loopLowPageNo ← PBasics.BITRSHIFT[lopi.first.optr.lowOffset, 10];
      IF loopLowPageNo = lowPageNo AND lopi.first.pc = preventPin THEN RETURN[TRUE]
      ENDLOOP;
    pageBuckets[hashPage] ← CONS[[transID, thisOptr, bytes, newPinCode], pageBuckets[hashPage]];
    [transFound, trans] ← innerFindTrans[transID];
    IF ~transFound THEN ERROR;
    IF hashPage < 32 THEN {
      trans.pageBucketsModified.low32 ← PBasics.BITOR[PBasics.BITLSHIFT[value: 1, count: hashPage], trans.pageBucketsModified.low32];
      }
    ELSE {
      trans.pageBucketsModified.high32 ← PBasics.BITOR[PBasics.BITLSHIFT[value: 1, count: hashPage-32], trans.pageBucketsModified.high32];
      };
    };
  nowOptr ← optr;
  sizeLeft ← size;
  IF optr.segmentId.value < 50 AND optr.segmentId.value # 0 THEN {
    tooLow ← TRUE;
    };
  IF optr.lowOffset = debugAddress THEN {
    debugHits ← debugHits + 1;
    };
  WHILE sizeLeft > 0 DO
    firstByteOnPage: CARD;
    bytesThisPage: CARD;
    firstByteOnPage ← PBasics.BITAND[nowOptr.lowOffset, byteMask];
    bytesThisPage ← MIN[sizeLeft, bytesPerBucket - firstByteOnPage];
    IF bytesThisPage = 0 OR bytesThisPage > bytesPerBucket THEN ERROR;
    IF innerRememberPin[nowOptr, bytesThisPage] THEN {
      WAIT forABit;
      LOOP;
      };
    sizeLeft ← sizeLeft - bytesThisPage;
    nowOptr.lowOffset ← nowOptr.lowOffset + bytesThisPage;
    -- Every piece after the first must start on a page boundary.
    IF sizeLeft # 0 AND PBasics.BITAND[nowOptr.lowOffset, byteMask] # 0 THEN ERROR;
    ENDLOOP;
  };
monitoredChangePin: ENTRY PROC [transID: YggTransaction.TransID, loggingNow: BOOL, optr: optrT, forceClear: BOOL, size: uInt] ~ {
  -- Monitor-entering wrapper around changePin.  The arguments are passed by name because the two procedures list optr and forceClear in different orders.
  changePin[transID: transID, loggingNow: loggingNow, forceClear: forceClear, optr: optr, size: size];
  };
changePin: INTERNAL PROC [transID: YggTransaction.TransID, loggingNow: BOOL, forceClear: BOOL, optr: optrT, size: uInt] ~ {
  -- Advance or drop pins recorded for transID.
  -- loggingNow = TRUE: matching entries in state pin become pinAndLogged.
  -- loggingNow = FALSE: matching entries are removed; unless forceClear is TRUE, an entry that is not pinAndLogged raises ERROR.
  -- optr = [[0],0,0] with size = 0 selects every entry of the transaction in every bucket flagged in its pageBucketsModified.
  -- Any other optr selects, page by page, the entries whose segment and low offset equal the start of each piece of [optr, optr+size).
  -- Raises ERROR if transID is unknown or if a bucket that should hold a matching entry holds none.
  -- After entries have been removed, processes waiting on forABit are woken so they can retest
  -- (rememberPin waits there when it meets a preventPin entry).
  trans: Trans;
  transFound: BOOL ← TRUE;
  lowMod: CARD32;
  highMod: CARD32;
  gotIt: BOOL ← FALSE;
  nowOptr: optrT;
  sizeLeft: uInt;
  removePinFromBucket: PROC [bucket: CARD, exactMatch: BOOL, thisOptr: optrT, bytes: CARD] RETURNS [gotOne: BOOL ← FALSE] ~ {
    -- Process the entries of one bucket; gotOne tells whether any entry was advanced or removed.
    -- prev trails the scan and is advanced only past entries that stay on the list, so an unlink always splices correctly.
    prev: LIST OF pinItem ← NIL;
    FOR lopi: LIST OF pinItem ← pageBuckets[bucket], lopi.rest UNTIL lopi = NIL DO
      IF YggTransaction.EqualTrans[trans.transID, lopi.first.transID] AND (~loggingNow OR ~exactMatch OR (lopi.first.optr.segmentId = thisOptr.segmentId AND lopi.first.optr.lowOffset = thisOptr.lowOffset)) THEN {
        IF loggingNow THEN {
          IF lopi.first.pc # pin THEN LOOP;
          lopi.first.pc ← pinAndLogged;
          }
        ELSE {
          IF ~forceClear AND lopi.first.pc # pinAndLogged THEN ERROR;
          IF prev = NIL THEN pageBuckets[bucket] ← lopi.rest
          ELSE prev.rest ← lopi.rest;
          };
        gotOne ← TRUE;
        LOOP;
        };
      prev ← lopi;
      ENDLOOP;
    };
  [transFound, trans] ← innerFindTrans[transID];
  IF ~transFound THEN ERROR;
  IF optr.lowOffset = debugAddress THEN {
    debugHits ← debugHits + 1;
    };
  IF optr = [[0],0,0] AND size = 0 THEN {
    gotOne: BOOL ← FALSE;
    -- Visit the buckets flagged in the low word, then those flagged in the high word.
    lowMod ← trans.pageBucketsModified.low32;
    FOR bucktNo: INT IN [0..32) DO
      IF PBasics.BITAND[lowMod, 1] = 1 THEN {
        gotOne ← removePinFromBucket[bucktNo, FALSE, optr, size];
        gotIt ← gotIt OR gotOne;
        IF ~gotOne THEN ERROR;
        };
      lowMod ← PBasics.BITRSHIFT[value: lowMod, count: 1];
      ENDLOOP;
    highMod ← trans.pageBucketsModified.high32;
    FOR bucktNo: INT IN [32..64) DO
      IF PBasics.BITAND[highMod, 1] = 1 THEN {
        gotOne ← removePinFromBucket[bucktNo, FALSE, optr, size];
        gotIt ← gotIt OR gotOne;
        IF ~gotOne THEN ERROR;
        };
      highMod ← PBasics.BITRSHIFT[value: highMod, count: 1];
      ENDLOOP;
    IF ~loggingNow THEN trans.pageBucketsModified ← [0 ,0];
    }
  ELSE {
    nowOptr ← optr;
    sizeLeft ← size;
    IF optr.segmentId.value < 50 AND optr.segmentId.value # 0 THEN {
      tooLow ← TRUE;
      };
    -- Walk the range one page-sized piece at a time, exactly as rememberPin did when it recorded the pins.
    WHILE sizeLeft > 0 DO
      firstByteOnPage: CARD;
      bytesThisPage: CARD;
      bucktNo: INT;
      gotOne: BOOL ← FALSE;
      firstByteOnPage ← PBasics.BITAND[nowOptr.lowOffset, byteMask];
      bytesThisPage ← MIN[sizeLeft, bytesPerBucket - firstByteOnPage];
      IF bytesThisPage = 0 OR bytesThisPage > bytesPerBucket THEN ERROR;
      bucktNo ← hashOptr[nowOptr];
      IF ~(gotOne ← removePinFromBucket[bucktNo, TRUE, nowOptr, bytesThisPage]) THEN ERROR;
      sizeLeft ← sizeLeft - bytesThisPage;
      nowOptr.lowOffset ← nowOptr.lowOffset + bytesThisPage;
      IF sizeLeft # 0 AND PBasics.BITAND[nowOptr.lowOffset, byteMask] # 0 THEN ERROR;
      ENDLOOP;
    IF ~loggingNow THEN trans.pageBucketsModified ← [0 ,0];
    };
  IF ~loggingNow THEN BROADCAST forABit;
  };
-- Name server
CALookup: PUBLIC PROC [nameServerPort: Mach.portT, name: Rope.ROPE, site: Rope.ROPE, numberWanted: INT, maxSeconds: INT, raiseSignal: BOOL] RETURNS [portList: Mach.ListOfPorts, kernCode: Mach.kernReturnT] ~ {
  -- Lookup for applications.
  -- Every lookup succeeds and answers the single constant port [9]; none of the arguments is examined.
  portList ← LIST[[9]];
  kernCode ← KernSuccess;
  };
-- Local procs
GetNextTrans: ENTRY PROC RETURNS [transID: YggTransaction.TransID] ~ {
  -- Issue the next transaction identifier: NextTransCount is incremented and its new value becomes the lowTicker of both the top and the bottom part.
  NextTransCount ← NextTransCount + 1;
  transID ← [
    top: [lowTicker: NextTransCount],
    bottom: [lowTicker: NextTransCount]];
  };
DoCommit: PROC [transID: YggTransaction.TransID, doCommit: BOOL] ~ {
  -- Finish transID through YggTransaction, asking for commit when doCommit is TRUE and for abort otherwise.  The result of Finish is discarded.
  IF doCommit THEN [] ← YggTransaction.Finish[transID, commit]
  ELSE [] ← YggTransaction.Finish[transID, abort];
  };
-- YggdrasilInit
STServer: PUBLIC PROC [inMsg: REF Camelot.camlibSysReqMsgT, outMsg: REF Camelot.camlibSysRepMsgT] RETURNS [messageUnderstood: BOOL ← FALSE] ~ {
  -- Emulation stub: does nothing and reports the message as not understood.
  };
SRServer: PUBLIC PROC [inMsg: REF Camelot.camlibSysReqMsgT, outMsg: REF Camelot.camlibSysRepMsgT] RETURNS [messageUnderstood: BOOL ← FALSE] ~ {
  -- Emulation stub: does nothing and reports the message as not understood.
  };
ATServer: PUBLIC PROC [inMsg: REF Camelot.camlibSysReqMsgT, outMsg: REF Camelot.camlibSysRepMsgT] RETURNS [messageUnderstood: BOOL ← FALSE] ~ {
  -- Emulation stub: does nothing and reports the message as not understood.
  };
-- Checkpoint process
-- Pause taken by CheckpointProcess before every pass except the first.
milliSecondsBetweenCheckpoint: INT ← 30000;
-- NOTE(review): not referenced in the visible code; CheckpointProcess compares against a literal 2. Presumably the minimum epoch gap before a dirty buffer is rewritten; confirm.
ephocsBetweenCleans: CARD ← 2;
CheckpointProcess: PROC ~ {
  -- Body of the background checkpointer.  Loops until GoAway is TRUE.  Each pass:
  -- 1. pauses milliSecondsBetweenCheckpoint (except before the first pass) and advances CheckPointEphocNumber;
  -- 2. repeatedly picks a dirty, allocated, non-log AllocItem whose last checkpoint write is more than ephocsBetweenCleans epochs old,
  --    pins its range under the null transaction asking for pins to be prevented, writes it to its backing file, then drops those pins;
  -- 3. puts a checkpoint-complete record in the log (once through YggLogBasic.Put, once through YggLog.Write) and records the latter in the restart file.
  -- The epoch gap now comes from ephocsBetweenCleans (it was a literal 2), and the write now goes to the item's own backing file
  -- (it always went to CamelotRecoverable.CamelotRecoverableFile, which is wrong for an item of any other paging object).
  FOR checkpointCount: INT IN [1 ..INT.LAST] DO
    loai: LIST OF AllocItem;
    now: BasicTime.GMT;
    segmentId: segmentIdT;
    IF checkpointCount > 1 THEN Process.PauseMsec[milliSecondsBetweenCheckpoint];
    IF GoAway THEN EXIT;
    now ← BasicTime.Now[];
    CheckPointEphocNumber ← CheckPointEphocNumber + 1;
    DO
      findABufferToFlush: ENTRY PROC RETURNS [gotOne: BOOL ← FALSE] ~ {
        -- On success the enclosing variable loai is left at the chosen item, which is marked as owned by this process.
        FOR loai ← VMAllocList, loai.rest UNTIL loai = NIL DO
          IF loai.first.dirtied AND loai.first.allocated AND loai.first.cvmHandle # NIL AND loai.first.pagingObject # LogPagingObject THEN {
            IF CheckPointEphocNumber - loai.first.ephocOfLastWrite > ephocsBetweenCleans THEN {
              loai.first.ownedByCheckpointProcess ← TRUE;
              RETURN [TRUE];
              };
            };
          ENDLOOP;
        };
      IF ~findABufferToFlush[] THEN EXIT;
      -- Start the null transaction's bucket map afresh so that the release below visits only buckets touched for this item.
      checkpointNullTransObj.pageBucketsModified ← [0, 0];
      segmentId ← SegementIdForPagingObject[loai.first.pagingObject];
      rememberPin[transID: YggEnvironment.nullTransID, optr: [segmentId: segmentId, highOffset: 0, lowOffset: loai.first.offset], size: loai.first.size, doPreventPin: TRUE];
      {
        pages: INT ← -1;
        firstPage: INT ← -1;
        pages ← FS.PagesForBytes[loai.first.size];
        firstPage ← FS.PagesForBytes[loai.first.offset];
        FS.Write[file: FileForPagingObject[loai.first.pagingObject], to: firstPage, nPages: pages, from: LOOPHOLE[loai.first.mappedAddress]];
        loai.first.timeOfLastWrite ← now;
        loai.first.ephocOfLastWrite ← CheckPointEphocNumber;
        loai.first.dirtied ← FALSE;
        loai.first.ownedByCheckpointProcess ← FALSE;
        };
      monitoredChangePin [transID: YggEnvironment.nullTransID, loggingNow: FALSE, forceClear: TRUE, optr: [[0],0,0], size: 0];
      ENDLOOP;
    {
      checkpointCompleteRecordBody: YggLogRep.CheckpointCompleteRecord ←
        [startAnalysisRecordID: YggLog.nullRecordID, keepRecordID: YggLog.nullRecordID, checkPointEphocNumber: CheckPointEphocNumber];
      thisRecord: YggLog.RecordID;
      thisWordNumber: YggEnvironment.WordNumber;
      TRUSTED {
        checkpointCompleteRecordBody.blockArrayOfWords[0] ← 5910;
        checkpointCompleteRecordBody.blockArrayOfWords[1] ← 42;
        [thisRecord: thisRecord, thisWordNumber: thisWordNumber] ← YggLogBasic.Put[
          from: [base: @checkpointCompleteRecordBody, length: SIZE[YggLogRep.CheckpointCompleteRecord]/WordsINWORDS],
          optr: [[0], 0, 0],
          writeID: TRUE,
          force: TRUE];
        IF TraceLogWriting AND TraceFileStream = NIL THEN OpenTraceFileAndStream[];
        IF TraceLogWriting THEN TRUSTED {
          -- The trace line is produced inside the monitor.
          p: ENTRY PROC ~ TRUSTED {
            IO.PutF[TraceFileStream, "Checkpoint at %g\n", IO.card[thisRecord.low]];
            IO.Flush[TraceFileStream];
            };
          p[];
          };
        [thisRecord: thisRecord, thisWordNumber: thisWordNumber] ← YggLog.Write[
          trans: YggEnvironment.nullTransID,
          logRecordPhaseType: other,
          recordType: checkpointComplete,
          optr: [[0],0,0],
          recordData: [base: @checkpointCompleteRecordBody, length: SIZE[YggLogRep.CheckpointCompleteRecord]/WordsINWORDS],
          force: TRUE,
          writeID: TRUE];
        };
      YggRestartFile.WriteRestartRecord[wordNumberForCheckpointCompleteRecord: thisWordNumber, recordIDForCheckpointCompleteRecord: thisRecord];
      };
    ENDLOOP;
  };
-- Zero backing store
ZeroBackingStore: PROC = {
  -- Overwrite the recoverable-memory backing file with zeros.
  -- A single zero-filled scratch buffer of chunkBytes bytes is written repeatedly, starting at
  -- byte offset 0 and advancing one chunk at a time while the offset is below
  -- RecoverableMemoryPagingSize.
  chunkBytes: CARD32 = 32768;
  -- The segment id and paging object are looked up here; neither value is consulted again below.
  segment: segmentIdT ← SegementIdForPagingObject[RecoverableMemoryPagingObject];
  pagingObj: pagingObjectT ← PagingObjectForSegementId[segment];
  -- Scratch buffer that serves as the source of every write.
  zeros: CountedVM.Handle ← CountedVM.Allocate[words: chunkBytes/BYTES[WORD]];
  TRUSTED {PBasics.Fill[where: LOOPHOLE[zeros.pointer], nWords: chunkBytes/PBasics.bytesPerWord, value: 0];};
  FOR offset: CARD32 ← 0, offset + chunkBytes WHILE offset < RecoverableMemoryPagingSize DO
    FS.Write[file: CamelotRecoverable.CamelotRecoverableFile, to: FS.PagesForBytes[offset], nPages: FS.PagesForBytes[chunkBytes], from: LOOPHOLE[zeros.pointer]];
    ENDLOOP;
  zeros ← NIL; -- drop the local reference to the scratch buffer
  };
ZeroTwoMegsOfBackingStore: PROC = {
  -- Overwrite the first 2*1024*1024 bytes of the recoverable-memory backing file with zeros.
  -- A single zero-filled scratch buffer of chunkBytes bytes is written repeatedly, starting at
  -- byte offset 0 and advancing one chunk at a time while the offset is below the limit.
  chunkBytes: CARD32 = 32768;
  limit: CARD32 ← 1024;
  segment: segmentIdT;
  pagingObj: pagingObjectT;
  zeros: CountedVM.Handle;
  -- The limit is computed at run time in two steps, as the original code does.
  limit ← 2*1024*limit;
  -- The segment id and paging object are looked up here; neither value is consulted again below.
  segment ← SegementIdForPagingObject[RecoverableMemoryPagingObject];
  pagingObj ← PagingObjectForSegementId[segment];
  -- Scratch buffer that serves as the source of every write.
  zeros ← CountedVM.Allocate[words: chunkBytes/BYTES[WORD]];
  TRUSTED {PBasics.Fill[where: LOOPHOLE[zeros.pointer], nWords: chunkBytes/PBasics.bytesPerWord, value: 0];};
  FOR offset: CARD32 ← 0, offset + chunkBytes WHILE offset < limit DO
    FS.Write[file: CamelotRecoverable.CamelotRecoverableFile, to: FS.PagesForBytes[offset], nPages: FS.PagesForBytes[chunkBytes], from: LOOPHOLE[zeros.pointer]];
    ENDLOOP;
  zeros ← NIL; -- drop the local reference to the scratch buffer
  };
-- Pager-based variant of ZeroBackingStore: walks the recoverable-memory paging object in
-- 4096-byte steps from offset 0 while the offset is below RecoverableMemoryPagingSize.
-- NOTE(review): a procedure with this same name is also declared earlier in this file;
-- presumably only one of the two is meant to be live -- confirm which.
ZeroBackingStore: PROC = {
segmentId: segmentIdT;
po: pagingObjectT;
lowOffset: CARD32 ← 0;
segmentId ← SegementIdForPagingObject[RecoverableMemoryPagingObject];
po ← PagingObjectForSegementId[segmentId];
WHILE lowOffset < RecoverableMemoryPagingSize DO
mappedAddress: vmAddressT ← 0;
-- Allocate 4096 bytes against the paging object at this offset, ...
[mappedAddress: mappedAddress] ← vmAllocateWithPagerInner[4096, po, lowOffset, TRUE];
-- ... ask for the same 4096-byte range of the segment to be zero filled, ...
DSQZeroFill[dsPort: YggEnvironment.dsPort, optr: [segmentId: segmentId, highOffset: 0, lowOffset: lowOffset], sizeInBytes: 4096];
-- ... then release the allocation before moving on (the result of vmDeallocate is discarded).
[] ← vmDeallocate[taskSelf[], mappedAddress, 4096, TRUE];
lowOffset ← lowOffset + 4096;
ENDLOOP;
};
-- Log analysis and processing
LogAnalysisProc: YggLogControl.AnalysisProc = {
  -- PROC [record: RecordID, type: RecordType, trans: TransID];
  -- Analysis of commitTrans and abortTrans log records (the two record types that Init
  -- registers this procedure for).
  --   commitTrans: the transaction is recorded via noteNewTrans; its result is discarded.
  --   abortTrans: nothing to do.
  --   any other record type: raises ERROR.
  SELECT type FROM
    commitTrans => [] ← noteNewTrans[trans];
    abortTrans => NULL;
    ENDCASE => ERROR;
  };
-- Scratch buffer of 4096 32-bit words that LogRecoveryProc reads log record data into.
-- It is allocated by Init and set to NIL by CamelotRecoveryComplete.
RecoveryBlock: REF ARRAY [0..4096) OF CARD32;
LogRecoveryProc: YggLog.RecoveryProc = {
  -- PROC [record: RecordID, type: RecordType, trans: YggEnvironment.TransID, outcome: YggEnvironment.Outcome];
  -- Recovery of writeBytes log records (the record type that Init registers this procedure for).
  -- When FindTrans reports the transaction as found, the record is read into RecoveryBlock and
  -- handed to YggBuffMan.WriteBytes at the optr taken from the record's transaction header.
  -- When the transaction is not found the record is ignored.
  -- Raises ERROR for any other record type, when the read reports destinationFull, and when no
  -- words were read.
  SELECT type FROM
    writeBytes => {
      IF FindTrans[trans, FALSE].transFound THEN {
        status: YggLog.ReadProcStatus;
        wordsRead: CARDINAL; -- MD words
        recordHeader: YggLogRep.TransactionHeader;
        recordData: LONG POINTER;
        recordHeader ← YggLogControl.PeekCurrentTransactionHeader[record];
        [status: status, wordsRead: wordsRead] ← YggLog.ReadForRecovery[thisRecord: record, wordsToSkip: 0, to: [base: LOOPHOLE[RecoveryBlock], length: 4096]];
        -- destinationFull presumably means the record did not fit in the 4096-word buffer.
        IF status = destinationFull THEN ERROR;
        IF wordsRead = 0 THEN ERROR; -- fix this
        recordData ← LOOPHOLE[RecoveryBlock, LONG POINTER];
        -- Now we have:
        --   wordsRead words of data starting at recordData
        --   (an earlier note called this count sizeOfRecordDataInWords and described 32 bit words)
        --   it is for optr of recordHeader.optr
        --     = [segmentId: segmentIdT, highOffset: CARD16, lowOffset: CARD32]
        TRUSTED {YggBuffMan.WriteBytes[optr: recordHeader.optr, value: recordData, valueCnt: wordsRead*BYTES[WORD]];};
        };
      };
    ENDCASE => ERROR;
  };
-- Initialization
Init: PROC = {
  -- One-time module setup: create the transaction map, arm the forABit condition variable,
  -- allocate the recovery scratch buffer, and hook this module's procedures into the log.
  subtransactionAndTransactionMap ← RedBlackTree.Create[getKey: GetKeyProc, compare: CompareProc];
  TRUSTED {
    -- forABit gets a timeout equivalent to 10 milliseconds.
    shortWait: Process.Ticks ← Process.MsecToTicks[10];
    Process.InitializeCondition[@forABit, shortWait];
    };
  RecoveryBlock ← NEW[ARRAY [0..4096) OF CARD32];
  -- The analysis pass sees commitTrans and abortTrans records.
  YggLogControl.RegisterAnalysisProc[recordType: commitTrans, analysisProc: LogAnalysisProc];
  YggLogControl.RegisterAnalysisProc[recordType: abortTrans, analysisProc: LogAnalysisProc];
  -- The recovery pass sees writeBytes records.
  YggLog.RegisterRecoveryProc[recordType: writeBytes, recoveryProc: LogRecoveryProc];
  };
CamelotRecoveryComplete: PUBLIC PROC [] ~ {
  -- Called when log recovery has finished: release the recovery scratch buffer, start over
  -- with an empty transaction map, launch the checkpoint process, and notify YggdrasilInit.
  RecoveryBlock ← NIL;
  -- A fresh tree discards the transactions that were seen during recovery.
  subtransactionAndTransactionMap ← RedBlackTree.Create[getKey: GetKeyProc, compare: CompareProc];
  TRUSTED {
    -- The checkpoint process is forked and immediately detached; it is never joined.
    checkpointer: PROCESS ← FORK CheckpointProcess[];
    Process.Detach[checkpointer];
    };
  YggdrasilInit.RecoveryComplete[];
  };
-- Module start code: run the one-time initialization defined by Init above.
Init[];
END.