MachCamelotEmulationForSunOSImpl:
CEDAR
MONITOR
IMPORTS BasicTime, CamelotRecoverable, CountedVM, FS, IO, PBasics, Process, RedBlackTree, YggBuffMan, YggLog, YggLogBasic, YggLogControl, YggdrasilInit, YggRestartFile, YggTransaction, VM
EXPORTS Camelot, Mach, CamelotRecoverable, YggdrasilInit, YggMonitoringLog, YggTransaction
= BEGIN
OPEN Camelot, Mach;
-- Local shorthand for the ROPE type of the Rope interface.
ROPE: TYPE = Rope.ROPE;
-- Exported virtual memory procedures
vmAllocate: PUBLIC PROC [targetTask: vmTaskT, address: vmAddressT, size: vmSizeT, anywhere: BOOL, raiseSignal: BOOL]
  RETURNS [mappedAddress: vmAddressT ← 0, kernCode: kernReturnT ← -1] ~ TRUSTED {
  -- Grab enough VM pages to hold size bytes and hand back the address of the first page.
  -- targetTask, address, anywhere and raiseSignal are accepted for interface compatibility
  -- but are not consulted by this emulation.
  grabbed: VM.Interval ~ VM.Allocate[count: VM.PagesForBytes[size]];
  mappedAddress ← LOOPHOLE[VM.AddressForPageNumber[grabbed.page]];
  RETURN [mappedAddress, KernSuccess];
  };
-- Every pager-backed mapping created by vmAllocateWithPagerInner; new items are consed onto the front.
VMAllocList: LIST OF AllocItem ← NIL;
-- Bookkeeping for one pager-backed region.
AllocItem:
TYPE =
RECORD [
-- paging object that backs the region (mapped to a file by FileForPagingObject)
pagingObject: pagingObjectT,
-- byte offset of the region within the paging object
offset: vmOffsetT,
-- length of the region in bytes
size: vmSizeT,
-- address of the in-memory image; vmDeallocate resets it to 0
mappedAddress: vmAddressT,
-- CountedVM handle holding the in-memory image; NIL when no image is held
cvmHandle: CountedVM.Handle,
-- TRUE while mapped through vmAllocateWithPager; FALSE after zero fill or deallocation
allocated: BOOL ← TRUE,
-- set by noteDirtyOfMemory, cleared when the region is written back
dirtied: BOOL ← FALSE,
-- time of the most recent noteDirtyOfMemory hit
timeOfLastDirty: BasicTime.GMT ← BasicTime.earliestGMT,
-- not assigned anywhere in this part of the file; presumably maintained by the checkpoint code -- TODO confirm
timeOfLastWrite: BasicTime.GMT ← BasicTime.earliestGMT,
-- value of CheckPointEphocNumber at the most recent noteDirtyOfMemory hit
dirtiedDuringEphoc: CARD ← 0,
-- not assigned anywhere in this part of the file; presumably maintained by the checkpoint code -- TODO confirm
ephocOfLastWrite: CARD ← 0,
-- vmDeallocate waits until this is FALSE before writing the region back
ownedByCheckpointProcess: BOOL ← FALSE
];
vmAllocateWithPager: PUBLIC PROC [targetTask: vmTaskT, address: vmAddressT, size: vmSizeT, anywhere: BOOL, pagingObject: pagingObjectT, offset: vmOffsetT, raiseSignal: BOOL]
  RETURNS [mappedAddress: vmAddressT ← 0, kernCode: kernReturnT ← -1] ~ TRUSTED {
  -- Map some externally backed memory into VM.
  -- All of the work happens under the monitor in vmAllocateWithPagerInner; this wrapper only
  -- asks for a real mapping (zeroFillOnly = FALSE). targetTask, address, anywhere and
  -- raiseSignal are not consulted.
  [mappedAddress: mappedAddress, kernCode: kernCode] ← vmAllocateWithPagerInner[
    size: size, pagingObject: pagingObject, offset: offset, zeroFillOnly: FALSE];
  };
vmAllocateWithPagerInner: ENTRY PROC [size: vmSizeT, pagingObject: pagingObjectT, offset: vmOffsetT, zeroFillOnly: BOOL]
  RETURNS [mappedAddress: vmAddressT ← 0, kernCode: kernReturnT ← -1] ~ TRUSTED {
  -- Map some externally backed memory into VM, or (zeroFillOnly) zero its in-memory image.
  --   size, offset: extent of the region within pagingObject, in bytes; both must be an exact
  --     number of FS pages or ERROR is raised.
  --   zeroFillOnly: TRUE => fill the image with zeros and leave the item unallocated;
  --     FALSE => read the image from the backing file (unless one is already held) and mark
  --     the item allocated.
  -- Returns the address of the in-memory image and KernSuccess.
  -- Raises ERROR if an item with the same object/offset/size is already allocated.
  -- Fix: the original also called VM.Allocate here and dropped the resulting interval; the
  -- address returned to the caller comes from the CountedVM handle, so that interval was
  -- never used or freed and leaked VM on every call.
  loai: LIST OF AllocItem;
  cvmHandle: CountedVM.Handle;
  pages: INT ← -1;
  firstPage: INT ← -1;
  -- Look for an existing item that describes exactly this region.
  FOR loai ← VMAllocList, loai.rest UNTIL loai = NIL DO
    IF loai.first.pagingObject # pagingObject THEN LOOP;
    IF loai.first.offset > offset+size THEN LOOP;
    IF offset > loai.first.offset+loai.first.size THEN LOOP;
    IF loai.first.offset = offset AND loai.first.size = size THEN {
      IF loai.first.allocated THEN ERROR;
      EXIT;
      };
    ENDLOOP;
  -- Reuse the image already held by the item, otherwise get fresh counted VM.
  IF loai = NIL THEN cvmHandle ← CountedVM.Allocate[words: size/BYTES[WORD]]
  ELSE {
    IF loai.first.cvmHandle = NIL THEN cvmHandle ← CountedVM.Allocate[words: size/BYTES[WORD]]
    ELSE cvmHandle ← loai.first.cvmHandle;
    };
  mappedAddress ← LOOPHOLE[cvmHandle.pointer];
  -- Both the size and the offset must be page aligned.
  pages ← FS.PagesForBytes[size];
  IF INT[size] # FS.BytesForPages[pages] THEN ERROR;
  firstPage ← FS.PagesForBytes[offset];
  IF INT[offset] # FS.BytesForPages[firstPage] THEN ERROR;
  IF loai = NIL THEN {
    -- First sighting of this region: the item starts with no handle so that the code below
    -- knows the image still has to be filled in.
    loai ← VMAllocList ← CONS[[pagingObject: pagingObject, offset: offset, size: size, mappedAddress: mappedAddress, cvmHandle: NIL], VMAllocList];
    };
  IF zeroFillOnly
    THEN {
      where: LONG POINTER ← LOOPHOLE[mappedAddress];
      nWordsLeft: CARD32 ← size/PBasics.bytesPerWord;
      -- Zero the image in chunks of at most 10000 words.
      WHILE nWordsLeft > 0 DO
        fillThisTime: CARD32 ← MIN[nWordsLeft, 10000];
        PBasics.Fill[where: where, nWords: fillThisTime, value: 0];
        where ← where + fillThisTime * UNITS[PBasics.Word];
        nWordsLeft ← nWordsLeft - fillThisTime;
        ENDLOOP;
      loai.first.allocated ← FALSE;
      loai.first.mappedAddress ← mappedAddress;
      loai.first.cvmHandle ← cvmHandle;
      }
    ELSE {
      IF loai.first.cvmHandle = NIL THEN {
        -- No image held yet: bring it in from the backing file.
        FS.Read[file: FileForPagingObject[loai.first.pagingObject], from: firstPage, nPages: pages, to: LOOPHOLE[mappedAddress]];
        loai.first.mappedAddress ← mappedAddress;
        loai.first.cvmHandle ← cvmHandle;
        };
      loai.first.allocated ← TRUE;
      };
  kernCode ← KernSuccess;
  };
vmDeallocate: PUBLIC ENTRY PROC [targetTask: vmTaskT, address: vmAddressT, size: vmSizeT, raiseSignal: BOOL]
  RETURNS [kernCode: kernReturnT ← -1] ~ {
  -- Unmap a region that was mapped by vmAllocateWithPager.
  -- The region is found by its mapped address, written back to its backing file, and its
  -- item is marked clean, unallocated and without an in-memory image.
  -- Raises ERROR if no item is mapped at address or if size differs from the recorded size.
  -- targetTask and raiseSignal are not consulted; kernCode is never assigned here, so the
  -- caller sees its -1 default.
  -- Fix: the original also called VM.Free on loai.first.interval, but AllocItem has no
  -- interval field; the image lives in the CountedVM handle, which is dropped below.
  -- NOTE(review): the wait for ownedByCheckpointProcess pauses while holding the monitor lock;
  -- confirm that the checkpoint process can clear the flag without entering this monitor.
  FOR loai: LIST OF AllocItem ← VMAllocList, loai.rest UNTIL loai = NIL DO
    IF loai.first.mappedAddress = address THEN {
      pages: INT ← -1;
      firstPage: INT ← -1;
      IF size # loai.first.size THEN ERROR;
      WHILE loai.first.ownedByCheckpointProcess DO Process.Pause[1] ENDLOOP;
      pages ← FS.PagesForBytes[loai.first.size];
      firstPage ← FS.PagesForBytes[loai.first.offset];
      -- Write the whole region back before letting go of the image.
      FS.Write[file: FileForPagingObject[loai.first.pagingObject], to: firstPage, nPages: pages, from: LOOPHOLE[address]];
      loai.first.dirtied ← FALSE;
      loai.first.allocated ← FALSE;
      loai.first.mappedAddress ← 0;
      loai.first.cvmHandle ← NIL;
      EXIT;
      };
    REPEAT FINISHED => ERROR;
    ENDLOOP;
  };
noteDirtyOfMemory: ENTRY PROC [optr: optrT, size: uInt] ~ {
  -- Record that the bytes [optr.lowOffset, optr.lowOffset + size) of optr's segment are about
  -- to be modified: the first allocation item of the matching paging object whose range
  -- overlaps (or touches) those bytes is marked dirty and stamped with the current time and
  -- checkpoint epoch.
  -- Raises ERROR if no item covers the range.
  -- Fix: the original test was inverted. It skipped an item only when the paging object
  -- matched AND the item lay both above and below the range, which can never hold, so the
  -- first item on VMAllocList was always the one marked dirty.
  po: pagingObjectT;
  po ← PagingObjectForSegementId[optr.segmentId];
  FOR loai: LIST OF AllocItem ← VMAllocList, loai.rest UNTIL loai = NIL DO
    -- Skip items of other paging objects and items wholly above or wholly below the range.
    IF loai.first.pagingObject # po THEN LOOP;
    IF loai.first.offset > optr.lowOffset + size THEN LOOP;
    IF loai.first.offset + loai.first.size < optr.lowOffset THEN LOOP;
    loai.first.dirtied ← TRUE;
    loai.first.timeOfLastDirty ← BasicTime.Now[];
    loai.first.dirtiedDuringEphoc ← CheckPointEphocNumber;
    EXIT;
    REPEAT FINISHED => ERROR;
    ENDLOOP;
  };
-- Exported port procedures
nameServerPort: PUBLIC PROC RETURNS [p: portT] ~ {
  -- Get my port to the name server (name←server←port). The emulation always answers port 3.
  RETURN [[3]];
  };
MachPortsLookup: PUBLIC PROC [targetTask: taskT, raiseSignal: BOOL]
  RETURNS [intPortSet: portArrayT, intPortArrayCount: INT, kernCode: kernReturnT] ~ {
  -- Get the set of ports registered for the task. The emulation always answers a freshly
  -- allocated array holding the four ports 0..3; targetTask and raiseSignal are not consulted.
  fixedPorts: REF ARRAY[0..3] OF portT ← NEW[ARRAY[0..3] OF portT ← [[0], [1], [2], [3]]];
  intPortSet ← LOOPHOLE[fixedPorts];
  intPortArrayCount ← 4;
  kernCode ← KernSuccess;
  };
-- Most recently handed out port number; portAllocate increments it before use.
nextPort: portT ← [10];
portAllocate: PUBLIC PROC [targetTask: taskT, raiseSignal: BOOL]
  RETURNS [newPort: portT, kernCode: kernReturnT ← -1] ~ TRUSTED {
  -- Allocate a new port: advance the counter and return the new value.
  -- targetTask and raiseSignal are not consulted.
  -- NOTE(review): this is not an ENTRY procedure, so the update of nextPort is not
  -- protected by the monitor lock.
  bumped: portT ← [nextPort + 1];
  nextPort ← bumped;
  RETURN [newPort: bumped, kernCode: KernSuccess];
  };
portRestrict: PUBLIC PROC [targetTask: taskT, port: portT, raiseSignal: BOOL]
  RETURNS [kernCode: kernReturnT ← -1] ~ {
  -- Restricts port so that msgReceive must be given the port number, not PortDefault.
  -- The emulation does nothing and reports success.
  RETURN [KernSuccess];
  };
portUnrestrict: PUBLIC PROC [targetTask: taskT, port: portT, raiseSignal: BOOL]
  RETURNS [kernCode: kernReturnT ← -1] ~ {
  -- Unrestricts port so that a msgReceive on PortDefault can receive from this port.
  -- The emulation does nothing and reports success.
  RETURN [KernSuccess];
  };
-- Exported recoverable storage management procedures
DSInitialize: PUBLIC PROC [dsPort: portT, raiseSignal: BOOL]
  RETURNS [serverID: serverIdT, tsPort, mPort, sPort: portT, sharedMemAddr: vmAddressT, seqDescList: ListOfSegmentDesc ← NIL, seqPortList: ListOfPorts, kernCode: Mach.kernReturnT ← -1] ~ TRUSTED {
  -- Initialize the data server.
  -- Answers fixed identifiers and ports, describes the single recoverable-memory segment
  -- (segment 1066, backed by RecoverableMemoryPagingObject), and installs the table that
  -- relates segment ids, paging objects and backing files. dsPort and raiseSignal are not
  -- consulted.

  -- Fixed identity and ports.
  serverID ← [1989];
  tsPort ← [4];
  mPort ← [5];
  sPort ← [6];
  sharedMemAddr ← 0;

  -- The one recoverable segment and the paging object that backs it.
  seqDescList ← LIST[[serverId: [1234], segmentId: [1066], logicalDisk: 'Z, unused: 'z, highSize: 0, lowSize: RecoverableMemoryPagingSize]];
  seqPortList ← LIST[RecoverableMemoryPagingObject];

  -- Segment id / paging object / backing file table used by the lookup procedures.
  SegmentIdPagingObjectMap ← LIST[
    [[1066], RecoverableMemoryPagingObject, CamelotRecoverable.CamelotRecoverableFile],
    [YggSunOS.LogSegmentID, LogPagingObject, CamelotRecoverable.CamelotLogFile],
    [[1492], [9], CamelotRecoverable.RestartFile]];

  kernCode ← KernSuccess;
  };
CamelotRecoverableInit: PUBLIC PROC
  RETURNS [seqDescList: Camelot.ListOfSegmentDesc, seqPortList: Mach.ListOfPorts] ~ {
  -- Describe the two segments other than recoverable memory: the log segment
  -- (YggSunOS.LogSegmentID, paged by LogPagingObject) and segment 1492 (paged by object 9,
  -- 4096 bytes). The two result lists are in corresponding order.
  seqPortList ← LIST[LogPagingObject, [9]];
  seqDescList ← LIST[
    [serverId: [1234], segmentId: YggSunOS.LogSegmentID, logicalDisk: 'K, unused: 'k, highSize: 0, lowSize: YggSunOS.LogFileSize * YggSunOS.LogBytesPerPage],
    [serverId: [1234], segmentId: [1492], logicalDisk: 'R, unused: 'r, highSize: 0, lowSize: 4096]];
  };
-- Paging object that backs recoverable memory (segment 1066 in DSInitialize).
RecoverableMemoryPagingObject: pagingObjectT ← [7];
-- Value reported as lowSize of the recoverable memory segment by DSInitialize.
RecoverableMemoryPagingSize: CARD32 ← 40960000;
-- Paging object that backs the log segment.
LogPagingObject: pagingObjectT ← [8];
-- Table relating segment ids, paging objects and backing files; assigned by DSInitialize.
SegmentIdPagingObjectMap: LIST OF SegmentIdPagingObjectMapItem;
-- One row of SegmentIdPagingObjectMap.
SegmentIdPagingObjectMapItem:
TYPE =
RECORD[
segmentId: segmentIdT,
pagingObject: pagingObjectT,
backingFile: FS.OpenFile
];
PagingObjectForSegementId: PROC [segmentId: segmentIdT] RETURNS [pagingObject: pagingObjectT] ~ {
  -- Answer the paging object registered for segmentId in SegmentIdPagingObjectMap.
  -- Raises ERROR when the segment id is not in the table.
  cursor: LIST OF SegmentIdPagingObjectMapItem ← SegmentIdPagingObjectMap;
  WHILE cursor # NIL DO
    IF cursor.first.segmentId = segmentId THEN RETURN [cursor.first.pagingObject];
    cursor ← cursor.rest;
    ENDLOOP;
  ERROR;
  };
SegementIdForPagingObject: PROC [pagingObject: pagingObjectT] RETURNS [segmentId: segmentIdT] ~ {
  -- Answer the segment id registered for pagingObject in SegmentIdPagingObjectMap.
  -- Raises ERROR when the paging object is not in the table.
  cursor: LIST OF SegmentIdPagingObjectMapItem ← SegmentIdPagingObjectMap;
  WHILE cursor # NIL DO
    IF cursor.first.pagingObject = pagingObject THEN RETURN [cursor.first.segmentId];
    cursor ← cursor.rest;
    ENDLOOP;
  ERROR;
  };
FileForPagingObject: PROC [pagingObject: pagingObjectT] RETURNS [backingFile: FS.OpenFile] ~ {
  -- Answer the backing file registered for pagingObject in SegmentIdPagingObjectMap.
  -- Raises ERROR when the paging object is not in the table.
  cursor: LIST OF SegmentIdPagingObjectMapItem ← SegmentIdPagingObjectMap;
  WHILE cursor # NIL DO
    IF cursor.first.pagingObject = pagingObject THEN RETURN [cursor.first.backingFile];
    cursor ← cursor.rest;
    ENDLOOP;
  ERROR;
  };
DSPinObject: PUBLIC PROC [dsPort: portT, tid: tidT, optr: optrT, size: uInt, raiseSignal: BOOL]
  RETURNS [kernCode: Mach.kernReturnT ← -1] ~ {
  -- Pin an object in preparation for modification. optr is the Camelot recoverable storage
  -- "address", not the VM address.
  -- The pin is remembered for tid and the covering memory is noted as dirty. dsPort and
  -- raiseSignal are not consulted.
  pinOptr: optrT ← optr;
  pinSize: uInt ← size;
  IF BYTES[UNIT] = 2 THEN {
    -- Two-byte units: widen the range so that it starts on an even offset and has an even
    -- length that still covers the requested bytes.
    pinOptr.lowOffset ← PBasics.BITAND[optr.lowOffset, 0FFFFFFFEH];
    pinSize ← PBasics.BITAND[(optr.lowOffset - pinOptr.lowOffset) + size + 1, 0FFFFFFFEH];
    };
  rememberPin[tid, pinOptr, pinSize];
  noteDirtyOfMemory[pinOptr, pinSize];
  RETURN [KernSuccess];
  };
OpenTraceFileAndStream: PROC ~ {
  -- Create the log-trace file and leave a stream on it in TraceFileStream.
  TraceFileStream ← FS.StreamOpen[
    fileName: "///Trace/LogTrace.txt",
    accessOptions: $create,
    keep: 8,
    createByteCount: 25600];
  };
DSLogNewValue: PUBLIC PROC [dsPort: portT, tid: tidT, optr: optrT, newValue: pointerT, newValueCnt: INT, raiseSignal: BOOL]
  RETURNS [kernCode: Mach.kernReturnT ← -1] ~ {
  -- Send a new value of an object to the log.
  -- A redo record holding newValueCnt bytes (rounded up to whole 4-byte words) is written
  -- without forcing the log; the pins of tid on this object are then marked as logged.
  -- dsPort and raiseSignal are not consulted.
  loggedAt: YggLog.RecordID;
  [thisRecord: loggedAt] ← YggLog.Write[
    trans: tid, logRecordPhaseType: redo, recordType: writeBytes, optr: optr,
    recordData: [base: LOOPHOLE[newValue], length: (newValueCnt+3)/4, rest: NIL],
    force: FALSE];
  IF TraceLogWriting THEN {
    -- Trace output is written under the monitor lock so that lines do not interleave.
    emitTrace: ENTRY PROC ~ {
      IO.PutF[TraceFileStream, "New value at %g for seg: %g, offset: %b, byteSize: %g(%b)\n", IO.card[loggedAt.low], IO.card[optr.segmentId.value], IO.card[optr.lowOffset], IO.card[newValueCnt], IO.card[newValueCnt]];
      IO.Flush[TraceFileStream];
      };
    IF TraceFileStream = NIL THEN OpenTraceFileAndStream[];
    emitTrace[];
    };
  monitoredChangePin[transID: tid, loggingNow: TRUE, forceClear: FALSE, optr: optr, size: newValueCnt];
  RETURN [KernSuccess];
  };
-- Count of old/new log requests whose new value was at most two bytes long.
NumberOfShortLogRecords: INT ← 0;
DSLogOldValueNewValue: PUBLIC PROC [dsPort: portT, tid: tidT, optr: optrT, oldValue: pointerT, oldValueCnt: INT, newValue: pointerT, newValueCnt: INT, raiseSignal: BOOL]
  RETURNS [kernCode: Mach.kernReturnT ← -1] ~ {
  -- Send the old and the new value of an object to the log.
  -- An undo record holding the old value and a redo record holding the new value are written
  -- (neither forces the log); the pins of tid on this object are then marked as logged.
  -- Byte counts are rounded up to whole 4-byte words. dsPort and raiseSignal are not consulted.
  -- Fix: the undo record is now sized from oldValueCnt. The original used newValueCnt for the
  -- old value's buffer, which logs the wrong number of words whenever the two counts differ.
  thisRecord: YggLog.RecordID;
  [] ← YggLog.Write[trans: tid, logRecordPhaseType: undo,
    recordType: writeBytes, optr: optr, recordData: [base: LOOPHOLE[oldValue], length: (oldValueCnt+3)/4, rest: NIL], force: FALSE];
  IF newValueCnt <= 2 THEN {
    NumberOfShortLogRecords ← NumberOfShortLogRecords + 1;
    };
  [thisRecord: thisRecord] ← YggLog.Write[trans: tid, logRecordPhaseType: redo,
    recordType: writeBytes, optr: optr, recordData: [base: LOOPHOLE[newValue], length: (newValueCnt+3)/4, rest: NIL], force: FALSE];
  IF TraceLogWriting AND TraceFileStream = NIL THEN OpenTraceFileAndStream[];
  IF TraceLogWriting THEN {
    -- Trace output is written under the monitor lock so that lines do not interleave.
    p: ENTRY PROC ~ {
      IO.PutF[TraceFileStream, "Old/new at %b for seg: %g, offset: %b, byteSize: %g(%b)\n", IO.card[thisRecord.low], IO.card[optr.segmentId.value], IO.card[optr.lowOffset], IO.card[newValueCnt], IO.card[newValueCnt]];
      IO.Flush[TraceFileStream];
      };
    p[];
    };
  monitoredChangePin[transID: tid, loggingNow: TRUE, forceClear: FALSE, optr: optr, size: oldValueCnt];
  kernCode ← KernSuccess;
  };
DSQInit: PUBLIC PROC [sharedMemAddr: Mach.vmAddressT] ~ {
  -- Init the shared memory queue. The emulation has no such queue, so there is nothing to do.
  };
DSQPreflush: PUBLIC PROC [dsPort: Mach.portT, optr: optrT, sizeInBytes: uInt] ~ {
  -- Preflush some dirty memory: write the in-memory image of the region that starts at
  -- optr.lowOffset in optr's segment back to its backing file and mark the region clean.
  -- The region stays mapped.
  -- Raises ERROR if no region starts at that offset, if sizeInBytes is not the region's
  -- recorded size, or if the region has no in-memory image. dsPort is not consulted.
  -- Fix: the original also called VM.Free on loai.first.interval, but AllocItem has no
  -- interval field, and the image must stay available after a preflush anyway.
  -- NOTE(review): unlike vmDeallocate this is not an ENTRY procedure although it walks and
  -- updates VMAllocList; confirm that callers serialize access.
  po: pagingObjectT;
  po ← PagingObjectForSegementId[optr.segmentId];
  FOR loai: LIST OF AllocItem ← VMAllocList, loai.rest UNTIL loai = NIL DO
    IF loai.first.pagingObject = po AND loai.first.offset = optr.lowOffset THEN {
      backingFile: FS.OpenFile;
      pages: INT ← -1;
      firstPage: INT ← -1;
      IF sizeInBytes # loai.first.size THEN ERROR;
      IF loai.first.cvmHandle = NIL THEN ERROR;
      pages ← FS.PagesForBytes[loai.first.size];
      firstPage ← FS.PagesForBytes[loai.first.offset];
      backingFile ← FileForPagingObject[loai.first.pagingObject];
      FS.Write[file: backingFile, to: firstPage, nPages: pages, from: LOOPHOLE[loai.first.mappedAddress]];
      loai.first.dirtied ← FALSE;
      EXIT;
      };
    REPEAT FINISHED => ERROR;
    ENDLOOP;
  };
DSQZeroFill: PUBLIC PROC [dsPort: Mach.portT, optr: optrT, sizeInBytes: uInt] ~ {
  -- Zero some memory: the region of sizeInBytes bytes at optr.lowOffset in optr's segment
  -- gets a zero-filled in-memory image. The address and return code answered by the inner
  -- procedure are discarded. dsPort is not consulted.
  [] ← vmAllocateWithPagerInner[
    size: sizeInBytes,
    pagingObject: PagingObjectForSegementId[optr.segmentId],
    offset: optr.lowOffset,
    zeroFillOnly: TRUE];
  };
-- Exported transaction management procedures
TAAddApplication: PUBLIC PROC [tPort: portT, atPort: portT, authName: Rope.ROPE, raiseSignal: BOOL]
  RETURNS [applicationID: applicationIdT, taPort: portT, kernCode: Mach.kernReturnT ← -1] ~ TRUSTED {
  -- Initialize an application to the transaction manager.
  -- The emulation only reports success; applicationID and taPort are not assigned here and
  -- none of the arguments is consulted.
  kernCode ← KernSuccess;
  };
TABegin: PUBLIC PROC [taPort: portT, parentTid: tidT, transType: transactionTypeT, raiseSignal: BOOL]
  RETURNS [newTid: tidT, kernCode: Mach.kernReturnT ← -1] ~ TRUSTED {
  -- Start a new transaction: obtain a fresh transaction id and enter it in the transaction
  -- table. Raises ERROR if the table refuses the new id. taPort, parentTid, transType and
  -- raiseSignal are not consulted.
  registered: BOOL;
  newTid ← GetNextTrans[];
  registered ← noteNewTrans[newTid];
  IF NOT registered THEN ERROR;
  RETURN [newTid, KernSuccess];
  };
TAEnd:
PUBLIC
PROC [taPort: portT, tid: tidT, protocolType: protocolTypeT, raiseSignal:
BOOL]
RETURNS [timestamp: timestampT, status:
INT, kernCode: Mach.kernReturnT ← -1] ~
TRUSTED {
-- Try to commit a transaction.
-- Writes a forced commitTrans record carrying the two tickers of tid.top, releases all of
-- the transaction's pins, and removes the transaction from the transaction table.
-- Raises ERROR if the transaction is not in the table. timestamp is not assigned here;
-- taPort, protocolType and raiseSignal are not consulted.
thisRecord: YggLog.RecordID;
transID: YggTransaction.TransID ← LOOPHOLE[tid];
block: LIST OF YggLog.Block;
blockArrayOfWords: LONG POINTER TO ARRAY[0..1024/4) OF CARD32;
-- Build the two-word record body in a scratch block.
block ← getScratchBlock[];
block.first.length ← 2;
blockArrayOfWords ← LOOPHOLE[block.first.base];
blockArrayOfWords[0] ← tid.top.highTicker;
blockArrayOfWords[1] ← tid.top.lowTicker;
-- force: TRUE is passed so the commit record is forced.
[thisRecord: thisRecord] ← YggLog.Write[trans: tid, logRecordPhaseType: analysis,
recordType: commitTrans, optr: [[0],0,0], recordData: block.first, force: TRUE];
returnScratchBlock[block];
IF TraceLogWriting AND TraceFileStream = NIL THEN OpenTraceFileAndStream[];
IF TraceLogWriting
THEN {
-- Trace output is written under the monitor lock.
p:
ENTRY
PROC ~
TRUSTED {
IO.PutF[TraceFileStream, "TAEnd at %g\n", IO.card[thisRecord.low] ];
IO.Flush[TraceFileStream];
};
p[];
};
-- A zero optr with size 0 asks changePin to sweep every bucket the transaction touched;
-- with forceClear FALSE every pin found must already be in the pinAndLogged state.
monitoredChangePin[transID: transID, loggingNow: FALSE, forceClear: FALSE, optr: [[0],0,0] , size: 0];
IF ~removeTrans[transID] THEN ERROR;
status ← ErSuccess;
kernCode ← KernSuccess;
};
TAKill:
PUBLIC
PROC [taPort: portT, tid: tidT, status:
INT, raiseSignal:
BOOL]
RETURNS [kernCode: Mach.kernReturnT ← -1] ~
TRUSTED {
-- Try to abort a transaction.
-- Suspends the transaction, writes a forced abortTrans record carrying the two tickers of
-- tid.top, and releases all of the transaction's pins. taPort, status and raiseSignal are
-- not consulted.
-- NOTE(review): unlike TAEnd this never calls removeTrans, so the transaction stays in the
-- transaction table -- confirm whether some other component removes it.
thisRecord: YggLog.RecordID;
transID: YggTransaction.TransID ← LOOPHOLE[tid];
block: LIST OF YggLog.Block;
blockArrayOfWords: LONG POINTER TO ARRAY[0..1024/4) OF CARD32;
-- Build the two-word record body in a scratch block.
block ← getScratchBlock[];
block.first.length ← 2;
blockArrayOfWords ← LOOPHOLE[block.first.base];
blockArrayOfWords[0] ← tid.top.highTicker;
blockArrayOfWords[1] ← tid.top.lowTicker;
YggTransaction.Suspend[transID, ErWaitingTransAborted];
[thisRecord: thisRecord] ← YggLog.Write[trans: tid, logRecordPhaseType: analysis,
recordType: abortTrans, optr: [[0],0,0], recordData: block.first, force: TRUE];
-- Insert backwards scan of log, calling CamelotMIG.SRRestoreObjectX
IF TraceLogWriting AND TraceFileStream = NIL THEN OpenTraceFileAndStream[];
IF TraceLogWriting
THEN {
-- Trace output is written under the monitor lock.
p:
ENTRY
PROC ~ TRUSTED {
IO.PutF[TraceFileStream, "TAKill at %g\n", IO.card[thisRecord.low] ];
IO.Flush[TraceFileStream];
};
p[];
};
-- A zero optr with size 0 asks changePin to sweep every bucket the transaction touched.
-- NOTE(review): with forceClear FALSE, changePin raises ERROR for any pin that has not been
-- logged yet; confirm that an aborting transaction can never hold such a pin.
monitoredChangePin[transID: tid, loggingNow: FALSE, forceClear: FALSE, optr: [[0],0,0] , size: 0];
returnScratchBlock[block];
kernCode ← KernSuccess;
};
-- Internal transaction procedures
-- Spare TransRep handed out (and cleared) by getScratchTrans.
savedScratchTrans: Trans;
-- Scratch object used as the lookup/delete key for the transaction table; the visible uses
-- are in ENTRY or INTERNAL procedures.
savedScratchTransForEntries: Trans ← NEW[TransRep];
-- The transaction table: a red-black tree in which each Trans is stored as both data and key.
subtransactionAndTransactionMap: RedBlackTree.Table;
-- Object answered by innerFindTrans for the null transaction.
checkpointNullTransObj: Trans ← NEW[TransRep];
Trans: TYPE = REF TransRep;
-- Per-transaction state kept in the transaction table.
TransRep:
TYPE =
RECORD [
transID: YggEnvironment.TransID,
outcome: YggEnvironment.Outcome,
-- set by innerFindTrans when asked to latch, cleared by unlatchTrans
latched: BOOL ← FALSE,
finishTime: BasicTime.GMT ← BasicTime.nullGMT,
suspendTime: BasicTime.GMT ← BasicTime.nullGMT,
-- 64-bit set of pin buckets the transaction has put pins in: bit n of low32 stands for
-- bucket n, bit n of high32 for bucket n+32
pageBucketsModified: RECORD[
low32: CARD32,
high32: CARD32
]
];
-- Waited on by innerFindTrans (latched transaction) and rememberPin (pins prevented).
forABit: CONDITION;
getScratchTrans: ENTRY PROC RETURNS [scratchTrans: Trans] ~ {
  -- Answer a TransRep for temporary use: the saved spare if there is one (which is then no
  -- longer saved), otherwise a newly allocated one.
  ENABLE UNWIND => {};
  scratchTrans ← savedScratchTrans;
  IF scratchTrans = NIL
    THEN scratchTrans ← NEW[TransRep]
    ELSE savedScratchTrans ← NIL;
  };
noteNewTrans: PROC [transID: YggEnvironment.TransID] RETURNS [parentOK: BOOL ← TRUE] ~ {
  -- Enter a new, active transaction with no modified buckets in the transaction table.
  -- Raises ERROR if transID is not a top-level transaction or is already in the table.
  -- parentOK is never changed from its TRUE default.
  newTrans: Trans;
  insertTrans: ENTRY PROC ~ {
    ENABLE UNWIND => {};
    IF ~YggTransaction.IsTopLevel[transID] THEN ERROR;
    -- The id must not be in the table yet.
    savedScratchTransForEntries.transID ← transID;
    IF RedBlackTree.Lookup[subtransactionAndTransactionMap, savedScratchTransForEntries] # NIL THEN ERROR;
    RedBlackTree.Insert[subtransactionAndTransactionMap, newTrans, newTrans];
    };
  newTrans ← NEW[TransRep ← [transID: transID, outcome: active, pageBucketsModified: [0, 0] ]];
  insertTrans[];
  };
StateDuringRecovery: PUBLIC PROC [transID: YggEnvironment.TransID]
  RETURNS [outcome: YggEnvironment.Outcome] ~ {
  -- Answer commit when transID is present in the transaction table and unknown otherwise.
  known: BOOL ← FALSE;
  [transFound: known] ← FindTrans[transID];
  IF ~known THEN RETURN [unknown];
  RETURN [commit];
  };
FindTrans: ENTRY PROC [transID: YggEnvironment.TransID, setLatch: BOOL ← FALSE]
  RETURNS [transFound: BOOL ← FALSE, trans: Trans ← NIL] ~ {
  -- Monitor-entry wrapper around innerFindTrans for callers that do not hold the lock.
  ENABLE UNWIND => {};
  [transFound: transFound, trans: trans] ← innerFindTrans[transID: transID, setLatch: setLatch];
  };
innerFindTrans: INTERNAL PROC [transID: YggEnvironment.TransID, setLatch: BOOL ← FALSE]
  RETURNS [transFound: BOOL ← FALSE, trans: Trans ← NIL] ~ {
  -- Look transID up in the transaction table.
  -- The null transaction always maps to checkpointNullTransObj. For any other id the answer
  -- is [FALSE, NIL] when it is not in the table; otherwise the caller waits until the
  -- transaction is unlatched, optionally latches it, and gets [TRUE, trans].
  ENABLE UNWIND => {};
  hit: RedBlackTree.UserData;
  IF YggTransaction.IsNullTrans[transID] THEN RETURN [transFound: TRUE, trans: checkpointNullTransObj];
  savedScratchTransForEntries.transID ← transID;
  hit ← RedBlackTree.Lookup[subtransactionAndTransactionMap, savedScratchTransForEntries];
  IF hit = NIL THEN RETURN [FALSE, NIL];
  trans ← NARROW[hit];
  UNTIL ~trans.latched DO WAIT forABit ENDLOOP;
  IF setLatch THEN trans.latched ← TRUE;
  RETURN [TRUE, trans];
  };
unlatchTrans: ENTRY PROC [trans: Trans] ~ {
  -- Release the latch on trans and wake everybody waiting on forABit.
  -- Fix: innerFindTrans waits on forABit while a transaction is latched, but the original
  -- cleared the latch without signalling, so waiters were never woken by the unlatch itself.
  -- BROADCAST is used because forABit is shared with other wait conditions; every waiter
  -- rechecks its own condition after waking.
  trans.latched ← FALSE;
  BROADCAST forABit;
  };
removeTrans: ENTRY PROC [transID: YggEnvironment.TransID] RETURNS [transFound: BOOL ← FALSE] ~ {
  -- Delete transID from the transaction table. Answers FALSE when it was not there.
  hit: RedBlackTree.UserData;
  doomed: Trans;
  savedScratchTransForEntries.transID ← transID;
  hit ← RedBlackTree.Lookup[subtransactionAndTransactionMap, savedScratchTransForEntries];
  IF hit = NIL THEN RETURN [FALSE];
  -- The NARROW checks that the table entry really is a Trans.
  doomed ← NARROW[hit];
  [] ← RedBlackTree.Delete[subtransactionAndTransactionMap, savedScratchTransForEntries];
  RETURN [TRUE];
  };
-- Internal pin-tracking procedures
-- Mask selecting the low six bits of a page number (077B = 63 = noBuckets - 1).
pageMask: CARD32 = 077B;
-- Number of pin hash buckets.
noBuckets: CARD = 64;
-- Mask selecting the byte offset within a bucket page (01777B = 1023 = bytesPerBucket - 1).
byteMask: CARD32 = 01777B;
-- Size in bytes of the pages that pins are split on.
bytesPerBucket: CARD = 1024;
-- Pin lists, indexed by hashOptr of the pinned offset.
pageBuckets: ARRAY [0..noBuckets) OF LIST OF pinItem;
-- One pin: a byte range within one page, held by one transaction.
pinItem:
TYPE =
RECORD[
transID: YggEnvironment.TransID,
optr: optrT,
size: uInt,
pc: pinCode
];
pinCode:
TYPE = {pin, pinAndLogged, preventPin};
-- pin => it's pinned and maybe updated
-- pinAndLogged => it's pinned and updated, and the log write has been buffered. A flush of the log will make everything OK.
-- preventPin => disallow pins for this page
hashOptr: PROC [optr: optrT] RETURNS [CARD] ~ {
  -- Bucket number for optr: the low six bits of its page number, where the page number is
  -- the low offset shifted right by ten bits.
  RETURN [PBasics.BITAND[PBasics.BITRSHIFT[optr.lowOffset, 10], pageMask]];
  };
-- Debugging aid: set when a pin request names a segment id that is nonzero but below 50.
tooLow: BOOL ← FALSE;
-- Debugging aid: pin requests starting at this offset bump debugHits.
debugAddress: CARD32 ← 1;
-- Record pins held by transID on the byte range [optr.lowOffset, optr.lowOffset + size).
-- The range is split on bytesPerBucket boundaries and one pinItem is added per page.
-- While a page carries a preventPin item the caller waits on forABit and retries.
-- Raises ERROR if transID is not in the transaction table.
-- NOTE(review): doPreventPin is never consulted; every item is entered with code pin.
rememberPin:
ENTRY
PROC [transID: YggEnvironment.TransID, optr: optrT, size: uInt, doPreventPin:
BOOL ←
FALSE] ~ {
nowOptr: optrT;
sizeLeft: uInt;
-- Add one pin for a range lying within a single page. Answers TRUE, without adding
-- anything, when pins on that page are currently prevented.
innerRememberPin:
INTERNAL
PROC [thisOptr: optrT, bytes:
CARD]
RETURNS [tryAgain:
BOOL ←
FALSE] ~ {
ENABLE UNWIND => {};
hashPage: CARD;
lowPageNo: CARD32;
transFound: BOOL ← FALSE;
trans: Trans ← NIL;
lowPageNo ← PBasics.BITRSHIFT[thisOptr.lowOffset, 10];
hashPage ← hashOptr[thisOptr];
-- Several pages share a bucket, so compare page numbers before honoring a preventPin.
FOR lopi:
LIST
OF pinItem ← pageBuckets[hashPage], lopi.rest
UNTIL lopi =
NIL
DO
loopLowPageNo: CARD32;
loopLowPageNo ← PBasics.BITRSHIFT[lopi.first.optr.lowOffset, 10];
IF loopLowPageNo = lowPageNo AND lopi.first.pc = preventPin THEN RETURN[TRUE]
ENDLOOP;
pageBuckets[hashPage] ← CONS[[transID, thisOptr, bytes, pin], pageBuckets[hashPage]];
-- NOTE(review): the pin is already in the bucket when the lookup below fails.
[transFound, trans] ← innerFindTrans[transID];
IF ~transFound THEN ERROR;
-- Remember in the transaction which bucket now holds one of its pins.
IF hashPage < 32
THEN {
trans.pageBucketsModified.low32 ← PBasics.BITOR[PBasics.BITLSHIFT[value: 1, count: hashPage], trans.pageBucketsModified.low32];
}
ELSE {
trans.pageBucketsModified.high32 ← PBasics.BITOR[PBasics.BITLSHIFT[value: 1, count: hashPage-32], trans.pageBucketsModified.high32];
};
};
nowOptr ← optr;
sizeLeft ← size;
IF optr.segmentId.value < 50
AND optr.segmentId.value # 0
THEN {
tooLow ← TRUE;
};
IF optr.lowOffset = debugAddress
THEN {
debugHits ← debugHits + 1;
};
-- Walk the range one page at a time.
WHILE sizeLeft > 0
DO
firstByteOnPage: CARD;
bytesThisPage: CARD;
firstByteOnPage ← PBasics.BITAND[nowOptr.lowOffset, byteMask];
bytesThisPage ← MIN[sizeLeft, bytesPerBucket - firstByteOnPage];
IF bytesThisPage = 0 OR bytesThisPage > bytesPerBucket THEN ERROR;
IF innerRememberPin[nowOptr, bytesThisPage]
THEN {
-- Pins on this page are prevented: wait, then retry the same page.
WAIT forABit;
LOOP;
};
sizeLeft ← sizeLeft - bytesThisPage;
nowOptr.lowOffset ← nowOptr.lowOffset + bytesThisPage;
-- Every piece after the first must start on a page boundary.
IF sizeLeft # 0 AND PBasics.BITAND[nowOptr.lowOffset, byteMask] # 0 THEN ERROR;
ENDLOOP;
};
monitoredChangePin: ENTRY PROC [transID: YggTransaction.TransID, loggingNow: BOOL, optr: optrT, forceClear: BOOL, size: uInt] ~ {
  -- Monitor-entry wrapper around changePin for callers that do not hold the lock.
  -- Note that optr and forceClear come in the opposite order in changePin's parameter list,
  -- hence the keyword call.
  changePin[transID: transID, loggingNow: loggingNow, forceClear: forceClear, optr: optr, size: size];
  };
-- Advance or release pins held by transID.
--   loggingNow = TRUE: matching pins in state pin become pinAndLogged.
--   loggingNow = FALSE: matching pins are removed from their buckets; unless forceClear is
--     TRUE each of them must already be pinAndLogged, otherwise ERROR is raised.
--   optr = [[0],0,0] with size = 0: apply to every bucket recorded in the transaction's
--     pageBucketsModified set; otherwise apply, page by page, to the pins that start exactly
--     where each piece of [optr.lowOffset, optr.lowOffset + size) starts.
-- Raises ERROR if the transaction is unknown or if an expected pin is missing.
changePin:
INTERNAL
PROC [transID: YggTransaction.TransID, loggingNow:
BOOL, forceClear:
BOOL, optr: optrT, size: uInt] ~ {
trans: Trans;
transFound: BOOL ← TRUE;
lowMod: CARD32;
highMod: CARD32;
gotIt: BOOL ← FALSE;
nowOptr: optrT;
sizeLeft: uInt;
-- Process the pins of this transaction in one bucket. With exactMatch (and loggingNow)
-- only pins whose segment and low offset equal thisOptr's are considered; bytes is not
-- consulted. Answers TRUE when at least one pin was advanced or removed.
removePinFromBucket:
PROC [bucket:
CARD, exactMatch:
BOOL, thisOptr: optrT, bytes:
CARD]
RETURNS [gotOne:
BOOL ←
FALSE] ~ {
-- prev is the last item kept so far; it is needed to unlink a removed item.
prev: LIST OF pinItem ← NIL;
FOR lopi:
LIST
OF pinItem ← pageBuckets[bucket], lopi.rest
UNTIL lopi =
NIL
DO
IF YggTransaction.EqualTrans[trans.transID, lopi.first.transID]
AND (~loggingNow
OR ~exactMatch
OR (lopi.first.optr.segmentId = thisOptr.segmentId
AND lopi.first.optr.lowOffset = thisOptr.lowOffset))
THEN {
IF loggingNow
THEN {
-- Only pins that have not been logged yet are advanced.
IF lopi.first.pc # pin THEN LOOP;
lopi.first.pc ← pinAndLogged;
}
ELSE {
IF ~forceClear AND lopi.first.pc # pinAndLogged THEN ERROR;
IF prev = NIL THEN pageBuckets[bucket] ← lopi.rest
ELSE prev.rest ← lopi.rest;
};
gotOne ← TRUE;
-- Skip the prev update below: a removed item must not become prev.
LOOP;
};
prev ← lopi;
ENDLOOP;
};
[transFound, trans] ← innerFindTrans[transID];
IF ~transFound THEN ERROR;
IF optr.lowOffset = debugAddress
THEN {
debugHits ← debugHits + 1;
};
IF optr = [[0],0,0]
AND size = 0
THEN {
-- Whole-transaction sweep: visit every bucket whose bit is set in pageBucketsModified.
gotOne: BOOL ← FALSE;
lowMod ← trans.pageBucketsModified.low32;
FOR bucktNo:
INT
IN [0..32)
DO
IF PBasics.
BITAND[lowMod, 1] = 1
THEN {
gotOne ← removePinFromBucket[bucktNo, FALSE, optr, size];
gotIt ← gotIt OR gotOne;
-- A set bit promises at least one pin of this transaction in the bucket.
IF ~gotOne THEN ERROR;
};
lowMod ← PBasics.BITRSHIFT[value: lowMod, count: 1];
ENDLOOP;
highMod ← trans.pageBucketsModified.high32;
FOR bucktNo:
INT
IN [32..64)
DO
IF PBasics.
BITAND[highMod, 1] = 1
THEN {
gotOne ← removePinFromBucket[bucktNo, FALSE, optr, size];
gotIt ← gotIt OR gotOne;
IF ~gotOne THEN ERROR;
};
highMod ← PBasics.BITRSHIFT[value: highMod, count: 1];
ENDLOOP;
-- All pins are gone, so no bucket is modified any more.
IF ~loggingNow THEN trans.pageBucketsModified ← [0 ,0];
}
ELSE {
-- Single object: walk its range one page at a time, as rememberPin did.
nowOptr ← optr;
sizeLeft ← size;
IF optr.segmentId.value < 50
AND optr.segmentId.value # 0
THEN {
tooLow ← TRUE;
};
WHILE sizeLeft > 0
DO
firstByteOnPage: CARD;
bytesThisPage: CARD;
bucktNo: INT;
gotOne: BOOL ← FALSE;
firstByteOnPage ← PBasics.BITAND[nowOptr.lowOffset, byteMask];
bytesThisPage ← MIN[sizeLeft, bytesPerBucket - firstByteOnPage];
IF bytesThisPage = 0 OR bytesThisPage > bytesPerBucket THEN ERROR;
bucktNo ← hashOptr[nowOptr];
IF ~(gotOne ← removePinFromBucket[bucktNo, TRUE, nowOptr, bytesThisPage]) THEN ERROR;
sizeLeft ← sizeLeft - bytesThisPage;
nowOptr.lowOffset ← nowOptr.lowOffset + bytesThisPage;
IF sizeLeft # 0 AND PBasics.BITAND[nowOptr.lowOffset, byteMask] # 0 THEN ERROR;
ENDLOOP;
-- NOTE(review): this clears the whole bucket set although only one object's pins were
-- released; the visible callers of this branch always pass loggingNow = TRUE.
IF ~loggingNow THEN trans.pageBucketsModified ← [0 ,0];
};
};