MachCamelotEmulationForSunOS.mesa
Copyright © 1988, 1989 by Xerox Corporation. All rights reserved.
Bob Hagmann May 8, 1989 3:11:02 pm PDT
DIRECTORY
BasicTime,
Camelot,
CamelotRecoverable,
CountedVM,
File,
FileStream,
FS,
FSBackdoor,
IO,
Mach,
PBasics,
Process,
RedBlackTree,
Rope,
YggDID,
YggDIDPrivate,
YggDIDMap,
YggDIDMapPrivate,
YggdrasilInit,
YggLock,
YggLog,
YggLogControl,
YggFixedNames,
YggEnvironment,
YggFile,
YggFileStream,
YggIndexMaint,
YggInternal,
YggMonitoringLog,
YggRep,
YggRestartFile,
YggTransaction,
VM;
MachCamelotEmulationForSunOS: CEDAR MONITOR
IMPORTS BasicTime, CamelotRecoverable, CountedVM, FS, PBasics, Process, RedBlackTree, YggLog, YggLogControl, YggdrasilInit, YggRestartFile, YggTransaction, VM
EXPORTS Camelot, Mach, YggdrasilInit, YggMonitoringLog
= BEGIN
OPEN Camelot, Mach;
-- Local shorthand for the rope type.
ROPE: TYPE = Rope.ROPE;
-- Exported junk
-- Empty procedure record, declared PUBLIC (this module EXPORTS YggMonitoringLog).
notice: PUBLIC YggMonitoringLog.ProcsRecord ← [];
-- Concrete bindings of the DID and document types to their private representations.
DID: PUBLIC TYPE ~ REF DIDRep;
DIDRep: PUBLIC TYPE ~ YggDIDPrivate.DIDRep;
Document: TYPE = REF DocumentRep;
DocumentRep: PUBLIC TYPE = YggDIDMapPrivate.DocumentRep;
-- Global data
-- Counter from which GetNextTrans derives transaction identifiers.
NextTransCount: CARD ← 10001;
-- Current checkpoint epoch; CheckpointProcess increments it once per pass.
CheckPointEphocNumber: CARD ← 1;
Exported task procedures
-- Signals declared for reporting emulated Mach failures.
-- They are not raised anywhere in the visible code of this module.
MachCall: PUBLIC SIGNAL [errorCode: msgReturnT, explanation: Rope.ROPE] = CODE;
MachAnomaly: PUBLIC SIGNAL [explanation: Rope.ROPE] = CODE;
taskSelf: PUBLIC PROC RETURNS [targetTask: taskT] ~ {
	-- Emulated task_self: every caller gets the same fixed task handle, [1].
	RETURN [targetTask: [1]];
	};
taskNotify: PUBLIC PROC RETURNS [notifyPort: portT] ~ {
	-- Emulated task_notify: the notify port is the fixed port [2].
	RETURN [notifyPort: [2]];
	};
Exported virtual memory procedures
vmAllocate: PUBLIC PROC [targetTask: vmTaskT, address: vmAddressT, size: vmSizeT, anywhere: BOOL,raiseSignal: BOOL] RETURNS [mappedAddress: vmAddressT ← 0, kernCode: kernReturnT ← -1] ~ TRUSTED {
	-- Allocate enough VM pages to hold `size` bytes and return the address of the first page.
	-- targetTask, address, anywhere and raiseSignal are accepted but not consulted.
	-- The interval is not recorded anywhere in this module.
	newInterval: VM.Interval ← VM.Allocate[count: VM.PagesForBytes[size]];
	kernCode ← KernSuccess;
	mappedAddress ← LOOPHOLE[VM.AddressForPageNumber[newInterval.page]];
	};
-- Every pager-backed region this module has mapped or zero-filled; guarded by the module monitor.
VMAllocList: LIST OF AllocItem ← NIL;
-- One pager-backed region.  (The BOOL defaults below had lost their "←" in the extracted text.)
AllocItem: TYPE = RECORD [
	pagingObject: pagingObjectT,	-- paging object (port) backing the region
	offset: vmOffsetT,	-- byte offset of the region within the paging object
	size: vmSizeT,	-- length of the region in bytes
	mappedAddress: vmAddressT,	-- address of the in-memory copy; 0 once deallocated
	cvmHandle: CountedVM.Handle,	-- handle for the in-memory copy; NIL once deallocated
	allocated: BOOL ← TRUE,	-- TRUE while mapped through vmAllocateWithPager
	dirtied: BOOL ← FALSE,	-- set by noteDirtyOfMemory, cleared by the checkpoint process
	timeOfLastDirty: BasicTime.GMT ← BasicTime.earliestGMT,
	timeOfLastWrite: BasicTime.GMT ← BasicTime.earliestGMT,
	dirtiedDuringEphoc: CARD ← 0,	-- checkpoint epoch in which the region was last dirtied
	ephocOfLastWrite: CARD ← 0,	-- checkpoint epoch in which the region was last written back
	ownedByCheckpointProcess: BOOL ← FALSE	-- set while the checkpoint process is flushing the region
	];
vmAllocateWithPager: PUBLIC PROC [targetTask: vmTaskT, address: vmAddressT, size: vmSizeT, anywhere: BOOL, pagingObject: pagingObjectT, offset: vmOffsetT, raiseSignal: BOOL] RETURNS [mappedAddress: vmAddressT ← 0, kernCode: kernReturnT ← -1] ~ TRUSTED {
	-- Map externally backed memory into VM.
	-- Delegates to the monitored worker with zero-fill disabled; targetTask, address, anywhere and raiseSignal are not consulted.
	[mappedAddress: mappedAddress, kernCode: kernCode] ← vmAllocateWithPagerInner[size: size, pagingObject: pagingObject, offset: offset, zeroFillOnly: FALSE];
	};
vmAllocateWithPagerInner: ENTRY PROC [size: vmSizeT, pagingObject: pagingObjectT, offset: vmOffsetT, zeroFillOnly: BOOL] RETURNS [mappedAddress: vmAddressT ← 0, kernCode: kernReturnT ← -1] ~ TRUSTED {
	-- Map some externally backed memory into VM, or (zeroFillOnly) give the region a zeroed buffer.
	-- size and offset must be whole multiples of the FS page size, else ERROR.
	-- Mapping a region that is already marked allocated is an ERROR.
	-- Fixes relative to the earlier version:
	--   * no extra VM.Allocate interval is created (it was never used or freed);
	--   * a region seen for the first time is actually read from its backing file
	--     (the new list item used to be created with cvmHandle already set, so the
	--     "cvmHandle = NIL" test that guards the read could never succeed);
	--   * when a buffer left by a zero-fill is reused, the address returned is the
	--     recorded one rather than a fresh, unrecorded buffer.
	loai: LIST OF AllocItem;
	cvmHandle: CountedVM.Handle;
	pages: INT ← -1;
	firstPage: INT ← -1;
	-- Look for an item describing exactly this region; partially overlapping items are ignored.
	FOR loai ← VMAllocList, loai.rest UNTIL loai = NIL DO
		IF loai.first.pagingObject # pagingObject THEN LOOP;
		IF loai.first.offset = offset AND loai.first.size = size THEN {
			IF loai.first.allocated THEN ERROR;
			EXIT;
			};
		ENDLOOP;
	-- Validate alignment before any memory is allocated.
	pages ← FS.PagesForBytes[size];
	IF INT[size] # FS.BytesForPages[pages] THEN ERROR;
	firstPage ← FS.PagesForBytes[offset];
	IF INT[offset] # FS.BytesForPages[firstPage] THEN ERROR;
	IF loai = NIL THEN {
		-- First sighting of this region: record it with no buffer yet.
		loai ← VMAllocList ← CONS[[pagingObject: pagingObject, offset: offset, size: size, mappedAddress: 0, cvmHandle: NIL, allocated: FALSE], VMAllocList];
		};
	IF zeroFillOnly OR loai.first.cvmHandle = NIL THEN {
		cvmHandle ← CountedVM.Allocate[words: size/BYTES[WORD]];
		mappedAddress ← LOOPHOLE[cvmHandle.pointer];
		IF zeroFillOnly THEN {
			-- Zero the buffer in chunks of at most 10000 words.
			where: LONG POINTER ← LOOPHOLE[mappedAddress];
			nWordsLeft: CARD32 ← size/PBasics.bytesPerWord;
			WHILE nWordsLeft > 0 DO
				fillThisTime: CARD32 ← MIN[nWordsLeft, 10000];
				PBasics.Fill[where: where, nWords: fillThisTime, value: 0];
				where ← where + fillThisTime * UNITS[PBasics.Word];
				nWordsLeft ← nWordsLeft - fillThisTime;
				ENDLOOP;
			}
		ELSE {
			-- Load the region's current contents from its backing file.
			FS.Read[file: FileForPagingObject[loai.first.pagingObject], from: firstPage, nPages: pages, to: LOOPHOLE[mappedAddress]];
			};
		loai.first.mappedAddress ← mappedAddress;
		loai.first.cvmHandle ← cvmHandle;
		}
	ELSE {
		-- A buffer already exists (left by an earlier zero fill); hand back that one.
		mappedAddress ← loai.first.mappedAddress;
		};
	-- A zero-filled region is not yet mapped by a client; a mapped one is.
	loai.first.allocated ← ~zeroFillOnly;
	kernCode ← KernSuccess;
	};
vmDeallocate: PUBLIC ENTRY PROC [targetTask: vmTaskT, address: vmAddressT, size: vmSizeT, raiseSignal: BOOL] RETURNS [kernCode: kernReturnT ← -1] ~ {
	-- Unmap a pager-backed region: write its contents back to the backing file and release the buffer.
	-- Only regions recorded in VMAllocList can be unmapped; any other address raises ERROR, as does a size mismatch.
	-- targetTask and raiseSignal are not consulted.
	-- Fixes: the buffer is released through its CountedVM handle (AllocItem has no "interval" field),
	-- already-released items are skipped, and KernSuccess is returned on success (kernCode used to stay -1).
	FOR loai: LIST OF AllocItem ← VMAllocList, loai.rest UNTIL loai = NIL DO
		IF loai.first.cvmHandle # NIL AND loai.first.mappedAddress = address THEN {
			pages: INT ← -1;
			firstPage: INT ← -1;
			IF size # loai.first.size THEN ERROR;
			pages ← FS.PagesForBytes[loai.first.size];
			firstPage ← FS.PagesForBytes[loai.first.offset];
			FS.Write[file: FileForPagingObject[loai.first.pagingObject], to: firstPage, nPages: pages, from: LOOPHOLE[address]];
			TRUSTED{CountedVM.Free[loai.first.cvmHandle];};
			loai.first.allocated ← FALSE;
			loai.first.mappedAddress ← 0;
			loai.first.cvmHandle ← NIL;
			kernCode ← KernSuccess;
			EXIT;
			};
		REPEAT FINISHED => ERROR;
		ENDLOOP;
	};
noteDirtyOfMemory: ENTRY PROC [optr: optrT, size: uInt] ~ {
	-- Mark every recorded region that overlaps [optr.lowOffset, optr.lowOffset+size] as dirtied in the current checkpoint epoch.
	-- Regions that merely touch the range count as overlapping, as in the original bounds.
	-- Fixes: the disjointness test joined its two halves with AND, which can never hold for a sane
	-- range, so every region was marked dirty; and the loop ended in "REPEAT FINISHED => ERROR" with
	-- no EXIT, so every call raised ERROR after the scan.
	-- NOTE(review): optr.segmentId is not compared against the region's paging object — confirm whether it should be.
	FOR loai: LIST OF AllocItem ← VMAllocList, loai.rest UNTIL loai = NIL DO
		IF (loai.first.offset > optr.lowOffset + size) OR (loai.first.offset + loai.first.size < optr.lowOffset) THEN LOOP;
		loai.first.dirtied ← TRUE;
		loai.first.timeOfLastDirty ← BasicTime.Now[];
		loai.first.dirtiedDuringEphoc ← CheckPointEphocNumber;
		ENDLOOP;
	};
Exported message procedures
msgSend: PUBLIC PROC [header: REF msgHeaderT, option: msgOptionT, timeout: INT, raiseSignal: BOOL] RETURNS [msgCode: msgReturnT ← -1] ~ {
-- Send a message.
-- Not implemented in this emulation: every call raises ERROR.
ERROR;
};
msgReceive: PUBLIC PROC [header: REF msgHeaderT, option: msgOptionT, timeout: INT, raiseSignal: BOOL] RETURNS [msgCode: msgReturnT ← -1] ~ {
-- Receive a message.
-- (The real call modifies the header.)
-- In this emulation the call never returns: it pauses in an endless loop and ignores timeout.
DO
Process.Pause[33];
ENDLOOP;
};
Exported port procedures
nameServerPort: PUBLIC PROC RETURNS [p: portT] ~ {
	-- Emulated name_server_port: the name server is always the fixed port [3].
	RETURN [p: [3]];
	};
MachPortsLookup: PUBLIC PROC [targetTask: taskT, raiseSignal: BOOL] RETURNS [intPortSet: portArrayT, intPortArrayCount: INT, kernCode: kernReturnT] ~ {
	-- Emulated lookup of the task's initial ports (service_port): always reports the four fixed ports [0]..[3].
	-- A fresh array is allocated on every call.
	fixedPorts: REF ARRAY[0..3] OF portT ← NEW[ARRAY[0..3] OF portT ← [[0], [1], [2], [3]]];
	intPortSet ← LOOPHOLE[fixedPorts];
	kernCode ← KernSuccess;
	intPortArrayCount ← 4;
	};
-- Most recently allocated port; portAllocate hands out the successors of [10].
nextPort: portT ← [10];
portAllocate: PUBLIC PROC [targetTask: taskT, raiseSignal: BOOL] RETURNS [newPort: portT, kernCode: kernReturnT ← -1] ~ TRUSTED {
-- Allocate a new emulated port by advancing nextPort; always succeeds.
-- NOTE(review): nextPort is updated outside the module monitor (this is not an ENTRY proc) — confirm callers are serialized.
nextPort ← [nextPort + 1];
newPort ← nextPort;
kernCode ← KernSuccess;
};
portRestrict: PUBLIC PROC [targetTask: taskT, port: portT, raiseSignal: BOOL] RETURNS [kernCode: kernReturnT ← -1] ~ {
	-- In Mach this restricts the port so that msgReceive must name it explicitly rather than use PortDefault.
	-- The emulation keeps no per-port state and simply reports success.
	RETURN [kernCode: KernSuccess];
	};
portUnrestrict: PUBLIC PROC [targetTask: taskT, port: portT, raiseSignal: BOOL] RETURNS [kernCode: kernReturnT ← -1] ~ {
	-- In Mach this unrestricts the port so that a msgReceive on PortDefault can receive from it.
	-- The emulation keeps no per-port state and simply reports success.
	RETURN [kernCode: KernSuccess];
	};
Netname
netnameCheckIn: PUBLIC PROC [ServPort: portT, portName: Rope.ROPE, signature: portT, portId: portT, raiseSignal: BOOL] RETURNS [kernCode: kernReturnT] ~ {
	-- "Check in a name into the local name space."
	-- The emulation records nothing and reports success.
	RETURN [kernCode: KernSuccess];
	};
Exported recoverable storage management procedures
DSInitialize: PUBLIC PROC [dsPort: portT, raiseSignal: BOOL] RETURNS [serverID: serverIdT, tsPort, mPort, sPort: portT, sharedMemAddr: vmAddressT, seqDescList: ListOfSegmentDesc ← NIL, seqPortList: ListOfPorts, kernCode: Mach.kernReturnT ← -1] ~ TRUSTED {
-- Initialize the data server.
-- Returns fixed identifiers and ports, describes three segments (ids 1066, 42 and 1492),
-- and, as a side effect, rebuilds SegmentIdPagingObjectMap so each segment's paging object
-- ([7], LogPagingObject, [9]) is tied to its backing file from CamelotRecoverable.
-- dsPort and raiseSignal are not consulted.
serverID ← [1989];
tsPort ← [4];
mPort ← [5];
sPort ← [6];
sharedMemAddr ← 0;
seqDescList ← LIST[[serverId: [1234], segmentId: [1066], logicalDisk: 'Z, unused: 'z, highSize: 0, lowSize: 40960000], [serverId: [1234], segmentId: [42], logicalDisk: 'K, unused: 'k, highSize: 0, lowSize: 1048576], [serverId: [1234], segmentId: [1492], logicalDisk: 'R, unused: 'r, highSize: 0, lowSize: 4096]];
seqPortList ← LIST[[7], LogPagingObject, [9]];
-- The order here parallels seqDescList and seqPortList above.
SegmentIdPagingObjectMap ← LIST[
[[1066], [7], CamelotRecoverable.CamelotRecoverableFile],
[[42], LogPagingObject, CamelotRecoverable.CamelotLogFile],
[[1492], [9], CamelotRecoverable.RestartFile]];
kernCode ← KernSuccess;
};
-- Paging object of the log segment; the checkpoint process skips regions backed by it.
LogPagingObject: pagingObjectT ← [8];
-- Segment / paging object / backing file table; filled in by DSInitialize.
SegmentIdPagingObjectMap: LIST OF SegmentIdPagingObjectMapItem;
SegmentIdPagingObjectMapItem: TYPE = RECORD[
segmentId: segmentIdT,
pagingObject: pagingObjectT,
backingFile: FS.OpenFile
];
PagingObjectForSegementId: PROC [segmentId: segmentIdT] RETURNS [pagingObject: pagingObjectT] ~ {
	-- Translate a segment id into its paging object via SegmentIdPagingObjectMap.
	-- Raises ERROR when the segment id is not in the map.
	entry: LIST OF SegmentIdPagingObjectMapItem ← SegmentIdPagingObjectMap;
	WHILE entry # NIL DO
		IF entry.first.segmentId = segmentId THEN RETURN [entry.first.pagingObject];
		entry ← entry.rest;
		ENDLOOP;
	ERROR;
	};
FileForPagingObject: PROC [pagingObject: pagingObjectT] RETURNS [backingFile: FS.OpenFile] ~ {
	-- Translate a paging object into the open file that backs it via SegmentIdPagingObjectMap.
	-- Raises ERROR when the paging object is not in the map.
	entry: LIST OF SegmentIdPagingObjectMapItem ← SegmentIdPagingObjectMap;
	WHILE entry # NIL DO
		IF entry.first.pagingObject = pagingObject THEN RETURN [entry.first.backingFile];
		entry ← entry.rest;
		ENDLOOP;
	ERROR;
	};
DSPinObject: PUBLIC PROC [dsPort: portT, tid: tidT, optr: optrT, size: uInt, raiseSignal: BOOL] RETURNS [kernCode: Mach.kernReturnT ← -1] ~ {
	-- Pin an object in preparation for modification.
	-- optr is the Camelot recoverable storage "address", not the VM address.
	-- The pin is recorded against the transaction first, then the covering memory is marked dirty.
	rememberPin[transID: tid, optr: optr, size: size];
	noteDirtyOfMemory[optr: optr, size: size];
	RETURN [kernCode: KernSuccess];
	};
DSLogNewValue: PUBLIC PROC [dsPort: portT, tid: tidT, optr: optrT, newValue: pointerT, newValueCnt: INT, raiseSignal: BOOL] RETURNS [kernCode: Mach.kernReturnT ← -1] ~ {
	-- Send a new value of an object to the log as an unforced redo record,
	-- then mark the transaction's pins as logged.
	-- The record length is newValueCnt rounded up to a whole number of 4-byte units.
	[] ← YggLog.Write[
		trans: tid,
		logRecordPhaseType: redo,
		recordType: writeBytes,
		optr: optr,
		recordData: [base: LOOPHOLE[newValue], length: (newValueCnt+3)/4, rest: NIL],
		force: FALSE];
	monitoredChangePin[transID: tid, loggingNow: TRUE];
	RETURN [kernCode: KernSuccess];
	};
DSLogOldValueNewValue: PUBLIC PROC [dsPort: portT, tid: tidT, optr: optrT, oldValue: pointerT, oldValueCnt: INT, newValue: pointerT, newValueCnt: INT, raiseSignal: BOOL] RETURNS [kernCode: Mach.kernReturnT ← -1] ~ {
	-- Send both the old and the new value of an object to the log: an undo record carrying
	-- oldValue followed by a redo record carrying newValue, neither forced.  The transaction's
	-- pins are then marked as logged.
	-- Each length is its own byte count rounded up to whole 4-byte units.
	-- Fix: the undo record used newValueCnt for its length, so an old value whose size differed
	-- from the new one was logged truncated or over-read.
	[] ← YggLog.Write[trans: tid, logRecordPhaseType: undo,
		recordType: writeBytes, optr: optr, recordData: [base: LOOPHOLE[oldValue], length: (oldValueCnt+3)/4, rest: NIL], force: FALSE];
	[] ← YggLog.Write[trans: tid, logRecordPhaseType: redo,
		recordType: writeBytes, optr: optr, recordData: [base: LOOPHOLE[newValue], length: (newValueCnt+3)/4, rest: NIL], force: FALSE];
	monitoredChangePin[transID: tid, loggingNow: TRUE];
	kernCode ← KernSuccess;
	};
DSQInit: PUBLIC PROC [sharedMemAddr: Mach.vmAddressT] ~ {
-- Init the shared memory queue (emulation does nothing; sharedMemAddr is ignored).
};
DSQPreflush: PUBLIC PROC [dsPort: Mach.portT, optr: optrT, sizeInBytes: uInt] ~ {
	-- Preflush some dirty memory: write the region that starts at optr back to its backing file.
	-- The region must be recorded in VMAllocList with exactly sizeInBytes bytes, else ERROR.
	-- Fix: the buffer is no longer freed here.  The old code freed "loai.first.interval", a field
	-- AllocItem does not have, and releasing the buffer while the item still records
	-- mappedAddress/cvmHandle would leave a dangling mapping.  The region stays mapped after the write.
	-- NOTE(review): VMAllocList is read here without the module monitor (not an ENTRY proc) — confirm that is intended.
	po: pagingObjectT;
	po ← PagingObjectForSegementId[optr.segmentId];
	FOR loai: LIST OF AllocItem ← VMAllocList, loai.rest UNTIL loai = NIL DO
		IF loai.first.pagingObject = po AND loai.first.offset = optr.lowOffset THEN {
			backingFile: FS.OpenFile;
			pages: INT ← -1;
			firstPage: INT ← -1;
			IF sizeInBytes # loai.first.size THEN ERROR;
			pages ← FS.PagesForBytes[loai.first.size];
			firstPage ← FS.PagesForBytes[loai.first.offset];
			backingFile ← FileForPagingObject[loai.first.pagingObject];
			FS.Write[file: backingFile, to: firstPage, nPages: pages, from: LOOPHOLE[loai.first.mappedAddress]];
			EXIT;
			};
		REPEAT FINISHED => ERROR;
		ENDLOOP;
	};
DSQZeroFill: PUBLIC PROC [dsPort: Mach.portT, optr: optrT, sizeInBytes: uInt] ~ {
	-- Zero some memory: give the region at optr a zero-filled buffer via the monitored allocator.
	-- The allocator's results are discarded; dsPort is not consulted.
	[] ← vmAllocateWithPagerInner[size: sizeInBytes, pagingObject: PagingObjectForSegementId[optr.segmentId], offset: optr.lowOffset, zeroFillOnly: TRUE];
	};
Exported transaction management procedures
TAAddApplication: PUBLIC PROC [tPort: portT, atPort: portT, authName: Rope.ROPE, raiseSignal: BOOL] RETURNS [applicationID: applicationIdT, taPort: portT, kernCode: Mach.kernReturnT ← -1] ~ TRUSTED {
-- Initialize an application to the transaction manager.
-- The emulation only reports success.
-- NOTE(review): applicationID and taPort are never assigned here, so callers receive whatever the defaults for those types are — confirm nobody relies on them.
kernCode ← KernSuccess;
};
TABegin: PUBLIC ENTRY PROC [taPort: portT, parentTid: tidT, transType: transactionTypeT, raiseSignal: BOOL] RETURNS [newTid: tidT, kernCode: Mach.kernReturnT ← -1] ~ TRUSTED {
-- Start a new transaction: mint a fresh identifier and register it in the transaction map.
-- Raises ERROR if registration is refused.  taPort, parentTid, transType and raiseSignal are not consulted.
-- Runs under the module monitor, which also serializes the NextTransCount update in GetNextTrans.
newTid ← GetNextTrans[];
IF ~noteNewTrans[newTid] THEN ERROR;
kernCode ← KernSuccess;
};
TAEnd: PUBLIC PROC [taPort: portT, tid: tidT, protocolType: protocolTypeT, raiseSignal: BOOL] RETURNS [timestamp: timestampT, status: INT, kernCode: Mach.kernReturnT ← -1] ~ TRUSTED {
-- Try to commit a transaction.
-- Writes a forced commitTrans record whose two data words are the top-level tickers of tid,
-- releases the transaction's pins, and removes it from the transaction map (ERROR if absent).
-- NOTE(review): timestamp is never assigned — confirm callers do not use it.
transID: YggTransaction.TransID ← LOOPHOLE[tid];
block: LIST OF YggLog.Block;
blockArrayOfWords: LONG POINTER TO ARRAY[0..1024/4) OF CARD32;
block ← getScratchBlock[];
-- Only the first two words of the scratch block carry data.
block.first.length ← 2;
blockArrayOfWords ← LOOPHOLE[block.first.base];
blockArrayOfWords[0] ← tid.top.highTicker;
blockArrayOfWords[1] ← tid.top.lowTicker;
[] ← YggLog.Write[trans: tid, logRecordPhaseType: analysis,
recordType: commitTrans, optr: [[0],0,0], recordData: block.first, force: TRUE];
returnScratchBlock[block];
monitoredChangePin[transID: transID, loggingNow: FALSE];
IF ~removeTrans[transID] THEN ERROR;
status ← ErSuccess;
kernCode ← KernSuccess;
};
TAKill: PUBLIC PROC [taPort: portT, tid: tidT, status: INT, raiseSignal: BOOL] RETURNS [kernCode: Mach.kernReturnT ← -1] ~ TRUSTED {
	-- Try to abort a transaction.
	-- Suspends the transaction, writes a forced abortTrans record whose two data words are the
	-- top-level tickers of tid, releases the transaction's pins, and removes it from the
	-- transaction map.  taPort, status and raiseSignal are not consulted.
	-- Fix: the aborted transaction is now removed from the map, as TAEnd does for a committed one;
	-- it used to stay registered indefinitely.
	-- TODO: insert the backwards scan of the log, calling CamelotMIG.SRRestoreObjectX.
	transID: YggTransaction.TransID ← LOOPHOLE[tid];
	block: LIST OF YggLog.Block;
	blockArrayOfWords: LONG POINTER TO ARRAY[0..1024/4) OF CARD32;
	block ← getScratchBlock[];
	-- Only the first two words of the scratch block carry data.
	block.first.length ← 2;
	blockArrayOfWords ← LOOPHOLE[block.first.base];
	blockArrayOfWords[0] ← tid.top.highTicker;
	blockArrayOfWords[1] ← tid.top.lowTicker;
	YggTransaction.Suspend[transID, ErWaitingTransAborted];
	[] ← YggLog.Write[trans: tid, logRecordPhaseType: analysis,
		recordType: abortTrans, optr: [[0],0,0], recordData: block.first, force: TRUE];
	monitoredChangePin[transID: tid, loggingNow: FALSE];
	returnScratchBlock[block];
	IF ~removeTrans[transID] THEN ERROR;
	kernCode ← KernSuccess;
	};
Internal block allocation
-- Free list of 1024-byte scratch blocks, linked through the list cells themselves.
savedScratchBlocks: LIST OF YggLog.Block ← NIL;
getScratchBlock: ENTRY PROC RETURNS [block: LIST OF YggLog.Block] ~ {
	-- Hand out a one-element list holding a scratch block: reuse one from the free list if
	-- possible, otherwise allocate a fresh 1024-byte VM interval for it.
	-- A reused block keeps whatever length its previous user set; callers set length themselves.
	-- Fix: a reused cell is detached from the free list before it is returned, so the caller
	-- never holds a reference into the remaining free blocks.
	ENABLE UNWIND => {};
	IF savedScratchBlocks # NIL THEN {
		block ← savedScratchBlocks;
		savedScratchBlocks ← savedScratchBlocks.rest;
		block.rest ← NIL;
		}
	ELSE {
		interval: VM.Interval;
		interval ← VM.Allocate[count: VM.PagesForBytes[1024]];
		block ← LIST[[base: LOOPHOLE[VM.AddressForPageNumber[interval.page]], length: 1024/4, rest: NIL]];
		};
	};
returnScratchBlock: ENTRY PROC [block: LIST OF YggLog.Block] ~ {
-- Push a scratch block (a list cell obtained from getScratchBlock) back onto the free list.
-- The cell's rest field is overwritten to link it in.
block.rest ← savedScratchBlocks;
savedScratchBlocks ← block;
};
Internal transaction procedures
-- One-deep cache of a spare TransRep, handed out by getScratchTrans.
savedScratchTrans: Trans;
-- Probe record used as the lookup key for the transaction map; only touched under the monitor.
savedScratchTransForEntries: Trans ← NEW[TransRep];
-- All known transactions, ordered by the "bottom" part of their transaction id (see CompareProc).
subtransactionAndTransactionMap: RedBlackTree.Table;
-- Stand-in transaction object returned by innerFindTrans for the null transaction (used by the checkpoint process).
checkpointNullTransObj: Trans ← NEW[TransRep];
Trans: TYPE = REF TransRep;
-- (The BOOL default below had lost its "←" in the extracted text.)
TransRep: TYPE = RECORD [
	transID: YggEnvironment.TransID,
	outcome: YggEnvironment.Outcome,
	latched: BOOL ← FALSE,	-- TRUE while some process holds the transaction latched
	finishTime: BasicTime.GMT ← BasicTime.nullGMT,
	suspendTime: BasicTime.GMT ← BasicTime.nullGMT,
	-- One bit per pageBuckets slot in which this transaction has recorded a pin:
	-- buckets 0..31 in low32, buckets 32..63 in high32.
	pageBucketsModified: RECORD[
		low32: CARD32,
		high32: CARD32
		]
	];
-- Waited on (with a timeout set in Init) while a transaction is latched or a page is protected by preventPin.
forABit: CONDITION;
getScratchTrans: ENTRY PROC RETURNS [scratchTrans: Trans] ~ {
	-- Hand out a TransRep: the cached spare if there is one (emptying the cache), otherwise a new record.
	ENABLE UNWIND => {};
	scratchTrans ← savedScratchTrans;
	IF scratchTrans = NIL
		THEN scratchTrans ← NEW[TransRep]
		ELSE savedScratchTrans ← NIL;
	};
noteNewTrans: INTERNAL PROC [transID: YggEnvironment.TransID] RETURNS [parentOK: BOOL ← FALSE] ~ {
	-- Register a new transaction in subtransactionAndTransactionMap.
	-- Returns TRUE once the transaction is inserted; raises ERROR if the id is already registered.
	-- Must be called with the module monitor held (its visible caller, TABegin, is an ENTRY proc).
	-- Fixes:
	--   * the result is now set to TRUE on success; it used to stay FALSE, so TABegin always raised ERROR;
	--   * the insertion is no longer a nested ENTRY proc, which would block forever trying to
	--     re-enter the monitor its ENTRY caller already holds;
	--   * the duplicate check applies to every id.  The old code raised ERROR for top-level ids,
	--     which are the only ids GetNextTrans produces.
	newTrans: Trans;
	data: RedBlackTree.UserData;
	savedScratchTransForEntries.transID ← transID;
	data ← RedBlackTree.Lookup[subtransactionAndTransactionMap, savedScratchTransForEntries];
	IF data # NIL THEN ERROR;
	newTrans ← NEW[TransRep ← [transID: transID, outcome: active, pageBucketsModified: [0, 0] ]];
	RedBlackTree.Insert[subtransactionAndTransactionMap, newTrans, newTrans];
	RETURN [TRUE];
	};
innerFindTrans: INTERNAL PROC [transID: YggEnvironment.TransID, setLatch: BOOL ← FALSE] RETURNS [transFound: BOOL ← FALSE, trans: Trans ← NIL] ~ {
	-- Look up a transaction in the map.  The null transaction maps to checkpointNullTransObj.
	-- When setLatch is TRUE, waits until the transaction is unlatched and then latches it;
	-- the caller must release it with unlatchTrans.
	-- Fix: setLatch is now honoured.  The latch used to be taken on every lookup, and since the
	-- visible callers never unlatch, a second lookup of the same transaction waited forever.
	ENABLE UNWIND => {};
	data: RedBlackTree.UserData;
	IF YggTransaction.IsNullTrans[transID] THEN RETURN [transFound: TRUE, trans: checkpointNullTransObj];
	savedScratchTransForEntries.transID ← transID;
	data ← RedBlackTree.Lookup[subtransactionAndTransactionMap, savedScratchTransForEntries];
	IF data = NIL THEN {
		RETURN[FALSE, NIL];
		}
	ELSE {
		trans ← NARROW[data];
		IF setLatch THEN {
			WHILE trans.latched DO WAIT forABit ENDLOOP;
			trans.latched ← TRUE;
			};
		RETURN[TRUE, trans];
		};
	};
unlatchTrans: ENTRY PROC [trans: Trans] ~ {
	-- Release a transaction latch and wake any process waiting for it on forABit.
	-- Change: waiters are now signalled instead of being left to discover the release when
	-- their wait times out.
	trans.latched ← FALSE;
	BROADCAST forABit;
	};
removeTrans: ENTRY PROC [transID: YggEnvironment.TransID] RETURNS [transFound: BOOL ← FALSE] ~ {
	-- Remove a transaction from the map.  Returns FALSE if it was not registered.
	-- (Restores the "←" lost from the result default and drops a local that was narrowed but never used.)
	savedScratchTransForEntries.transID ← transID;
	IF RedBlackTree.Lookup[subtransactionAndTransactionMap, savedScratchTransForEntries] = NIL THEN RETURN[FALSE];
	[] ← RedBlackTree.Delete[subtransactionAndTransactionMap, savedScratchTransForEntries];
	RETURN[TRUE];
	};
Internal red black procs
A Red Black tree is used to store and find transactions/subtransactions. The tree is indexed by subtransaction.
GetKeyProc: RedBlackTree.GetKey = {
	-- PROC [data: UserData] RETURNS [Key]
	-- A transaction record serves as its own key; the NARROW checks that data really is a Trans.
	RETURN [NARROW[data, Trans]];
	};
CompareProc: RedBlackTree.Compare = {
	-- PROC [k: Key, data: UserData] RETURNS [Basics.Comparison]
	-- Orders transactions by the "bottom" part of their id: node id value first,
	-- then highTicker, then lowTicker.
	stored: Trans ← NARROW[data];
	wanted: Trans ← NARROW[k];
	IF wanted.transID.bottom.nodeId.value > stored.transID.bottom.nodeId.value THEN RETURN [greater];
	IF wanted.transID.bottom.nodeId.value < stored.transID.bottom.nodeId.value THEN RETURN [less];
	IF wanted.transID.bottom.highTicker > stored.transID.bottom.highTicker THEN RETURN [greater];
	IF wanted.transID.bottom.highTicker < stored.transID.bottom.highTicker THEN RETURN [less];
	IF wanted.transID.bottom.lowTicker > stored.transID.bottom.lowTicker THEN RETURN [greater];
	IF wanted.transID.bottom.lowTicker < stored.transID.bottom.lowTicker THEN RETURN [less];
	RETURN [equal];
	};
Internal pin bookkeeping procedures
-- Pins are hashed by 1024-byte page: the page number is the offset shifted right by 10 bits,
-- and its low six bits (pageMask) select one of the noBuckets buckets.
pageMask: CARD32 = 077B;
noBuckets: CARD = 64;
-- Mask for the byte position within a 1024-byte page.
byteMask: CARD32 = 01777B;
bytesPerBucket: CARD = 1024;
-- Pins recorded per bucket; guarded by the module monitor.
pageBuckets: ARRAY [0..noBuckets) OF LIST OF pinItem;
pinItem: TYPE = RECORD[
transID: YggEnvironment.TransID,
optr: optrT,
size: uInt,
pc: pinCode
];
pinCode: TYPE = {pin, pinAndLogged, preventPin};
-- pin => it's pinned and maybe updated
-- pinAndLogged => it's pinned and updated, and the log write has been buffered. A flush of the log will make everything OK.
-- preventPin => disallow pins for this page
hashOptr: PROC [optr: optrT] RETURNS [CARD] ~ {
	-- Bucket index for an object pointer: the low six bits of its 1024-byte page number.
	RETURN[PBasics.BITAND[PBasics.BITRSHIFT[optr.lowOffset, 10], pageMask]];
	};
rememberPin: ENTRY PROC [transID: YggEnvironment.TransID, optr: optrT, size: uInt, doPreventPin: BOOL ← FALSE] ~ {
	-- Record that transID has pinned the `size` bytes starting at optr.
	-- The range is split at 1024-byte page boundaries and one pinItem is recorded per page.
	-- If a page is currently protected by a preventPin item, the call waits on forABit and retries that page.
	-- With doPreventPin the items are recorded as preventPin, which blocks later pins of those pages.
	-- Raises ERROR if transID is not a known transaction.
	-- Fixes:
	--   * doPreventPin is honoured (items were always recorded as plain pins);
	--   * the last piece is clipped to the bytes remaining.  Previously a whole page was always
	--     subtracted, so sizeLeft went below zero unless the range ended exactly on a page boundary;
	--   * the transaction is looked up before the pin is recorded, so a failed lookup leaves no stray pin.
	nowOptr: optrT;
	sizeLeft: uInt;
	innerRememberPin: INTERNAL PROC [thisOptr: optrT, bytes: CARD] RETURNS [tryAgain: BOOL ← FALSE] ~ {
		-- Record one page-sized piece; returns TRUE if the page is protected and the caller should retry.
		ENABLE UNWIND => {};
		hashPage: CARD;
		lowPageNo: CARD32;
		transFound: BOOL ← FALSE;
		trans: Trans ← NIL;
		lowPageNo ← PBasics.BITRSHIFT[thisOptr.lowOffset, 10];
		hashPage ← hashOptr[thisOptr];
		FOR lopi: LIST OF pinItem ← pageBuckets[hashPage], lopi.rest UNTIL lopi = NIL DO
			loopLowPageNo: CARD32;
			loopLowPageNo ← PBasics.BITRSHIFT[lopi.first.optr.lowOffset, 10];
			IF loopLowPageNo = lowPageNo AND lopi.first.pc = preventPin THEN RETURN[TRUE]
			ENDLOOP;
		[transFound, trans] ← innerFindTrans[transID];
		IF ~transFound THEN ERROR;
		pageBuckets[hashPage] ← CONS[[transID, thisOptr, bytes, IF doPreventPin THEN preventPin ELSE pin], pageBuckets[hashPage]];
		-- Remember which bucket this transaction touched so changePin can find the item again.
		IF hashPage < 32 THEN {
			trans.pageBucketsModified.low32 ← PBasics.BITOR[PBasics.BITLSHIFT[value: 1, count: hashPage], trans.pageBucketsModified.low32];
			}
		ELSE {
			trans.pageBucketsModified.high32 ← PBasics.BITOR[PBasics.BITLSHIFT[value: 1, count: hashPage-32], trans.pageBucketsModified.high32];
			};
		};
	nowOptr ← optr;
	sizeLeft ← size;
	WHILE sizeLeft > 0 DO
		firstByteOnPage: CARD;
		bytesThisPage: CARD;
		firstByteOnPage ← PBasics.BITAND[nowOptr.lowOffset, byteMask];
		-- Up to the end of this page, but never more than what is left of the range.
		bytesThisPage ← MIN[bytesPerBucket - firstByteOnPage, sizeLeft];
		IF bytesThisPage = 0 OR bytesThisPage > bytesPerBucket THEN ERROR;
		IF innerRememberPin[nowOptr, bytesThisPage] THEN {
			WAIT forABit;
			LOOP;
			};
		sizeLeft ← sizeLeft - bytesThisPage;
		nowOptr.lowOffset ← nowOptr.lowOffset + bytesThisPage;
		-- If more remains, the next piece must start on a page boundary.
		IF sizeLeft > 0 AND PBasics.BITAND[nowOptr.lowOffset, byteMask] # 0 THEN ERROR;
		ENDLOOP;
	};
monitoredChangePin: ENTRY PROC [transID: YggTransaction.TransID, loggingNow: BOOL] ~ {
-- Monitor-entering wrapper around changePin, for callers that do not already hold the module monitor.
changePin[transID, loggingNow];
};
changePin: INTERNAL PROC [transID: YggTransaction.TransID, loggingNow: BOOL] ~ {
	-- Advance the state of every pin held by transID.
	--   loggingNow = TRUE:  plain pins become pinAndLogged.
	--   loggingNow = FALSE: the transaction's pins are removed from their buckets.
	-- Only buckets flagged in the transaction's pageBucketsModified are visited.
	-- Raises ERROR if the transaction is unknown, if a flagged bucket holds none of its pins,
	-- if a preventPin item is asked to log, or if a pin is released without having been logged.
	-- Fixes:
	--   * buckets 32..63 are driven by high32 (the second loop tested the already exhausted lowMod,
	--     so pins in those buckets were never touched);
	--   * logging again is allowed: items already pinAndLogged are left as they are instead of
	--     raising ERROR on a transaction's second log write;
	--   * preventPin items may be released, so a checkpoint can lift its protection;
	--   * `prev` advances over items that stay in the list, keeping unlinking correct.
	trans: Trans;
	transFound: BOOL ← TRUE;
	lowMod: CARD32;
	highMod: CARD32;
	removePinFromBucket: PROC [bucket: CARD] ~ {
		gotOne: BOOL ← FALSE;
		prev: LIST OF pinItem ← NIL;
		FOR lopi: LIST OF pinItem ← pageBuckets[bucket], lopi.rest UNTIL lopi = NIL DO
			IF YggTransaction.EqualTrans[trans.transID, lopi.first.transID] THEN {
				gotOne ← TRUE;
				IF loggingNow THEN {
					IF lopi.first.pc = preventPin THEN ERROR;
					lopi.first.pc ← pinAndLogged;
					prev ← lopi;
					}
				ELSE {
					IF lopi.first.pc = pin THEN ERROR;
					IF prev = NIL THEN pageBuckets[bucket] ← lopi.rest
					ELSE prev.rest ← lopi.rest;
					};
				}
			ELSE prev ← lopi;
			ENDLOOP;
		IF ~gotOne THEN ERROR;
		};
	[transFound, trans] ← innerFindTrans[transID];
	IF ~transFound THEN ERROR;
	lowMod ← trans.pageBucketsModified.low32;
	FOR bucktNo: INT IN [0..32) DO
		IF PBasics.BITAND[lowMod, 1] = 1 THEN {
			removePinFromBucket[bucktNo];
			};
		lowMod ← PBasics.BITRSHIFT[value: lowMod, count: 1];
		ENDLOOP;
	highMod ← trans.pageBucketsModified.high32;
	FOR bucktNo: INT IN [32..64) DO
		IF PBasics.BITAND[highMod, 1] = 1 THEN {
			removePinFromBucket[bucktNo];
			};
		highMod ← PBasics.BITRSHIFT[value: highMod, count: 1];
		ENDLOOP;
	};
Name server
CALookup: PUBLIC PROC [nameServerPort: Mach.portT, name: Rope.ROPE, site: Rope.ROPE, numberWanted: INT, maxSeconds: INT, raiseSignal: BOOL] RETURNS [portList: Mach.ListOfPorts, kernCode: Mach.kernReturnT] ~ {
	-- Lookup for applications.
	-- The emulation ignores all arguments and always answers with the single port [9].
	kernCode ← KernSuccess;
	portList ← LIST[[9]];
	};
Local procs
GetNextTrans: PROC RETURNS [transID: YggTransaction.TransID] ~ {
	-- Mint the next transaction id: bump NextTransCount and use the new value as the
	-- lowTicker of both the top and the bottom part of the id.
	ticker: CARD ← NextTransCount + 1;
	NextTransCount ← ticker;
	transID ← [top: [lowTicker: ticker], bottom: [lowTicker: ticker]];
	};
DoCommit: PROC [transID: YggTransaction.TransID, doCommit: BOOL] ~ {
	-- Finish the transaction with outcome commit or abort; the result of Finish is discarded.
	IF doCommit
		THEN [] ← YggTransaction.Finish[transID, commit]
		ELSE [] ← YggTransaction.Finish[transID, abort];
	};
YggdrasilInit
STServer: PUBLIC PROC [inMsg: REF Camelot.camlibSysReqMsgT, outMsg: REF Camelot.camlibSysRepMsgT] RETURNS [messageUnderstood: BOOL ← FALSE] ~ {
	-- Emulation stub: handles no messages and always returns FALSE.
	-- (Restores the "←" lost from the result default.)
	};
SRServer: PUBLIC PROC [inMsg: REF Camelot.camlibSysReqMsgT, outMsg: REF Camelot.camlibSysRepMsgT] RETURNS [messageUnderstood: BOOL ← FALSE] ~ {
	-- Emulation stub: handles no messages and always returns FALSE.
	-- (Restores the "←" lost from the result default.)
	};
ATServer: PUBLIC PROC [inMsg: REF Camelot.camlibSysReqMsgT, outMsg: REF Camelot.camlibSysRepMsgT] RETURNS [messageUnderstood: BOOL ← FALSE] ~ {
	-- Emulation stub: handles no messages and always returns FALSE.
	-- (Restores the "←" lost from the result default.)
	};
Checkpoint process
-- Pause between checkpoint passes.
milliSecondsBetweenCheckpoint: INT ← 30000;
-- A dirty region is written back once more than this many epochs have passed since its last write.
ephocsBetweenCleans: CARD ← 2;
CheckpointProcess: PROC ~ {
	-- Background loop, forked from Init; never returns.  Each pass:
	--   1. sleeps, then advances CheckPointEphocNumber;
	--   2. repeatedly picks a dirty, non-log region that has not been written for more than
	--      ephocsBetweenCleans epochs, records pins for it under the null transaction with
	--      doPreventPin set, writes it to its backing file, and releases those pins;
	--   3. writes a forced checkpointComplete log record carrying the epoch number and stores
	--      that record's id in the restart file.
	-- Fixes:
	--   * the staleness threshold uses ephocsBetweenCleans instead of a hard-coded 2 (the
	--     variable was otherwise unused);
	--   * each region is written to the file that backs its own paging object.  Everything used
	--     to be written to CamelotRecoverableFile, including regions of other segments;
	--   * an unused local was dropped.
	-- NOTE(review): segmentId is built from the paging object's port number, which does not match
	-- the segment ids set up in DSInitialize; only the offset is used for hashing, so this is
	-- harmless here, but confirm.
	DO
		loai: LIST OF AllocItem;	-- set by findABufferToFlush to the region chosen for flushing
		now: BasicTime.GMT;
		segmentId: segmentIdT;
		Process.PauseMsec[milliSecondsBetweenCheckpoint];
		now ← BasicTime.Now[];
		CheckPointEphocNumber ← CheckPointEphocNumber + 1;
		DO
			findABufferToFlush: ENTRY PROC RETURNS [gotOne: BOOL ← FALSE] ~ {
				-- Leaves the chosen region in the enclosing loai and claims it for this process.
				FOR loai ← VMAllocList, loai.rest UNTIL loai = NIL DO
					IF loai.first.dirtied AND loai.first.pagingObject # LogPagingObject THEN {
						IF CheckPointEphocNumber - loai.first.ephocOfLastWrite > ephocsBetweenCleans THEN {
							loai.first.ownedByCheckpointProcess ← TRUE;
							RETURN [TRUE];
							};
						};
					ENDLOOP;
				};
			IF ~findABufferToFlush[] THEN EXIT;
			checkpointNullTransObj.pageBucketsModified ← [0, 0];
			segmentId ← [loai.first.pagingObject.portNumber];
			rememberPin[transID: YggEnvironment.nullTransID, optr: [segmentId: segmentId, highOffset: 0, lowOffset: loai.first.offset], size: loai.first.size, doPreventPin: TRUE];
			{
				pages: INT ← -1;
				firstPage: INT ← -1;
				pages ← FS.PagesForBytes[loai.first.size];
				firstPage ← FS.PagesForBytes[loai.first.offset];
				FS.Write[file: FileForPagingObject[loai.first.pagingObject], to: firstPage, nPages: pages, from: LOOPHOLE[loai.first.mappedAddress]];
				loai.first.timeOfLastWrite ← now;
				loai.first.ephocOfLastWrite ← CheckPointEphocNumber;
				loai.first.dirtied ← FALSE;
				loai.first.ownedByCheckpointProcess ← FALSE;
				};
			monitoredChangePin [transID: YggEnvironment.nullTransID, loggingNow: FALSE];
			ENDLOOP;
		{
			block: LIST OF YggLog.Block;
			blockArrayOfWords: LONG POINTER TO ARRAY[0..1024/4) OF CARD32;
			thisRecord: YggLog.RecordID;
			block ← getScratchBlock[];
			-- The record carries a single word: the epoch number.
			block.first.length ← 1;
			blockArrayOfWords ← LOOPHOLE[block.first.base];
			TRUSTED {blockArrayOfWords[0] ← CheckPointEphocNumber;};
			[thisRecord: thisRecord] ← YggLog.Write[trans: YggEnvironment.nullTransID, logRecordPhaseType: other, recordType: checkpointComplete, optr: [[0],0,0], recordData: block.first, force: TRUE];
			returnScratchBlock[block];
			YggRestartFile.WriteRestartRecord[recordIDForCheckpointCompleteRecord: thisRecord];
			};
		ENDLOOP;
	};
Initialization
Init: PROC = {
-- Module start-up, run once from the module body.
-- Creates the transaction map, gives forABit a 10 ms timeout so that waits on it wake up periodically,
-- forks the detached checkpoint process, then runs log recovery and signals that recovery is complete.
subtransactionAndTransactionMap ← RedBlackTree.Create[getKey: GetKeyProc, compare: CompareProc];
TRUSTED {Process.InitializeCondition[@forABit, Process.MsecToTicks[10]]; };
TRUSTED {Process.Detach[FORK CheckpointProcess[]]};
YggLogControl.Recover[];
YggdrasilInit.RecoveryComplete[];
};
Init[];
END.