<> <> <> <<>> <> <<>> DIRECTORY BasicTime USING [GMT, Now, nullGMT, Period], Camelot USING [AcCommitFailed, btidT, ErSuccess, TABegin, TAEnd, TAKill, tidT], Mach USING [portT], PBasics USING [Comparison], Process USING [Detach, InitializeCondition, MsecToTicks, PauseMsec], RedBlackTree USING [Compare, Create, Delete, GetKey, Insert, Lookup, LookupNextLarger, LookupSmallest, Table, UserData], YggDIDMap USING [Abort, Commit, PreCommit], YggEnvironment USING [CommitOrAbort, DID, nullDID, nullTransID, OperationFailure, Outcome, taPort, TransID, UnknownType], YggFile USING [Abort, Commit, PreCommit], YggLock USING [ReleaseTransactionLocks], YggNaming USING [Abort, Commit, PreCommit], YggRep USING [Abort, Commit, PreCommit, VDoc], YggTransaction USING [Outcome, TransID]; YggTransactionImpl: CEDAR MONITOR IMPORTS BasicTime, Camelot, Process, RedBlackTree, YggDIDMap, YggEnvironment, YggFile, YggLock, YggNaming, YggRep EXPORTS YggTransaction ~ BEGIN <> TransID: TYPE = YggEnvironment.TransID; -- really a Camelot.tidT RequestedOutcome: TYPE = YggEnvironment.CommitOrAbort; Outcome: TYPE = YggEnvironment.Outcome; Unknown: PUBLIC ERROR [what: YggEnvironment.UnknownType] = CODE; OperationFailed: PUBLIC ERROR [why: YggEnvironment.OperationFailure] = CODE; nullTransID: PUBLIC TransID; subtransactionAndTransactionMap: RedBlackTree.Table; Trans: TYPE = REF TransRep; TransRep: TYPE = RECORD [ transID: TransID, outcome: Outcome, latched: BOOL _ FALSE, finishTime: BasicTime.GMT _ BasicTime.nullGMT, suspendTime: BasicTime.GMT _ BasicTime.nullGMT, parentTrans: Trans, -- circular data structures subTrans: LIST OF Trans, possibleDocumentUpdates: LIST OF YggRep.VDoc _ NIL, defaultContainer: YggEnvironment.DID _ YggEnvironment.nullDID ]; forABit: CONDITION; <> IsTopLevel: PUBLIC PROC [transID: TransID] RETURNS [topLevel: BOOL] ~ { IF transID.top.nodeId = transID.bottom.nodeId AND transID.top.randomBits = transID.bottom.randomBits AND transID.top.highTicker = transID.bottom.highTicker AND transID.top.lowTicker = transID.bottom.lowTicker THEN RETURN [TRUE] ELSE RETURN [FALSE]; }; IsNullTrans: PUBLIC PROC [ transID: YggTransaction.TransID] RETURNS [null: BOOL] ~ { IF transID.top.nodeId = YggEnvironment.nullTransID.top.nodeId AND transID.top.randomBits = YggEnvironment.nullTransID.top.randomBits AND transID.top.highTicker = YggEnvironment.nullTransID.top.highTicker AND transID.top.lowTicker = YggEnvironment.nullTransID.top.lowTicker THEN RETURN [TRUE] ELSE RETURN [FALSE]; }; EqualTrans: PUBLIC PROC [transID1: TransID, transID2: TransID] RETURNS [equal: BOOL] ~ { EqualBTid: PROC [bTid1, bTid2: Camelot.btidT] RETURNS [equal: BOOL] ~ { IF bTid1.nodeId = bTid2.nodeId AND bTid1.randomBits = bTid2.randomBits AND bTid1.highTicker = bTid2.highTicker AND bTid1.lowTicker = bTid2.lowTicker THEN RETURN [TRUE] ELSE RETURN [FALSE]; }; IF EqualBTid[transID1.top, transID2.top] AND EqualBTid[transID1.bottom, transID2.bottom] THEN RETURN [TRUE] ELSE RETURN [FALSE]; }; CompareTrans: PUBLIC PROC [transID1: TransID, transID2: TransID] RETURNS [comp: PBasics.Comparison] ~ { CompareBTid: PROC [bTid1, bTid2: Camelot.btidT] RETURNS [bcomp: PBasics.Comparison] ~ { SELECT bTid1.lowTicker FROM > bTid2.lowTicker => RETURN [greater]; < bTid2.lowTicker => RETURN [less]; ENDCASE => { SELECT bTid1.highTicker FROM > bTid2.highTicker => RETURN [greater]; < bTid2.highTicker => RETURN [less]; ENDCASE => { SELECT bTid1.nodeId FROM > bTid2.nodeId => RETURN [greater]; < bTid2.nodeId => RETURN [less]; ENDCASE => RETURN [equal]; }; }; }; comp _ CompareBTid[transID1.top, transID2.top]; IF comp = equal THEN comp _ CompareBTid[transID1.bottom, transID2.bottom]; }; CreateTrans: PUBLIC PROC [parentTransID: TransID] RETURNS [transID: TransID _ nullTransID] ~ { newTid: Camelot.tidT; [newTid: newTid] _ Camelot.TABegin[taPort: YggEnvironment.taPort, parentTid: parentTransID, transType: ttOvnvStandard, raiseSignal: TRUE]; IF ~noteNewTrans[newTid, parentTransID] THEN ERROR; transID _ newTid; }; NotePossibleDocumentUpdate: PUBLIC PROC [transID: TransID, vDoc: YggRep.VDoc] ~ { transFound: BOOL _ FALSE; trans: Trans _ NIL; [transFound: transFound, trans: trans] _ findTrans[transID]; IF transFound THEN { FOR pD: LIST OF YggRep.VDoc _ trans.possibleDocumentUpdates, pD.rest UNTIL pD = NIL DO IF pD.first = vDoc THEN {unlatchTrans[trans]; RETURN;}; ENDLOOP; trans.possibleDocumentUpdates _ CONS[vDoc, trans.possibleDocumentUpdates]; unlatchTrans[trans]; }; }; GetPossibleDocumentUpdates: PUBLIC PROC [transID: TransID] RETURNS [ vDocs: LIST OF YggRep.VDoc] ~ { transFound: BOOL _ FALSE; trans: Trans _ NIL; [transFound: transFound, trans: trans] _ findTrans[transID]; IF transFound THEN { unlatchTrans[trans]; RETURN [trans.possibleDocumentUpdates]; }; }; GetDefaultContainer: PUBLIC PROC [transID: TransID] RETURNS [did: YggEnvironment.DID _ YggEnvironment.nullDID] ~ { transFound: BOOL _ FALSE; trans: Trans _ NIL; [transFound: transFound, trans: trans] _ findTrans[transID]; IF transFound THEN { did _ trans.defaultContainer; unlatchTrans[trans]; }; }; SetDefaultContainer: PUBLIC PROC [transID: TransID, did: YggEnvironment.DID] ~ { transFound: BOOL _ FALSE; trans: Trans _ NIL; [transFound: transFound, trans: trans] _ findTrans[transID]; IF transFound THEN { trans.defaultContainer _ did; unlatchTrans[trans]; }; }; Check: PUBLIC PROC [ transID: YggTransaction.TransID] RETURNS [outcome: YggTransaction.Outcome _ unknown] ~ { transFound: BOOL _ FALSE; trans: Trans _ NIL; [transFound: transFound, trans: trans] _ findTrans[transID]; IF transFound THEN { unlatchTrans[trans]; RETURN [trans.outcome]; } ELSE { IF EqualTrans[transID, YggEnvironment.nullTransID] THEN RETURN [active]} ; }; GetParent: PUBLIC PROC [ transID: YggTransaction.TransID] RETURNS [transFound: BOOL _ FALSE, parentTransID: YggTransaction.TransID _ nullTransID] ~ { trans: Trans _ NIL; parentTrans: Trans _ NIL; [transFound, trans, parentTrans] _ findTrans[transID]; IF transFound AND parentTrans # NIL THEN parentTransID _ parentTrans.transID; IF transFound THEN unlatchTrans[trans]; }; Suspend: PUBLIC PROC [transID: TransID, status: INT] ~ { transFound: BOOL _ FALSE; trans: Trans _ NIL; [transFound: transFound, trans: trans] _ findTrans[transID, TRUE]; IF transFound THEN { trans.outcome _ suspended; trans.suspendTime _ BasicTime.Now[]; unlatchTrans[trans]; }; }; Finish: PUBLIC PROC [transID: TransID, requestedOutcome: RequestedOutcome] RETURNS [outcome: Outcome] ~ { transFound: BOOL _ FALSE; trans: Trans _ NIL; [transFound: transFound, trans: trans] _ findTrans[transID, TRUE]; IF transFound THEN { SELECT trans.outcome FROM active => { }; suspended => { SELECT requestedOutcome FROM commit => { unlatchTrans[trans]; ERROR OperationFailed[cantCommitSuspendedTrans]; }; abort => { unlatchTrans[trans]; ERROR OperationFailed[cantAbortSuspendedTrans]; }; ENDCASE => { unlatchTrans[trans]; ERROR; }; }; commit => { SELECT requestedOutcome FROM commit => { unlatchTrans[trans]; ERROR OperationFailed[cantCommitCommittedTrans]; }; abort => { unlatchTrans[trans]; ERROR OperationFailed[cantAbortCommittedTrans]; }; ENDCASE => { unlatchTrans[trans]; ERROR; }; }; abort => { SELECT requestedOutcome FROM commit => { unlatchTrans[trans]; ERROR OperationFailed[cantCommitAbortedTrans]; }; abort => { unlatchTrans[trans]; ERROR OperationFailed[cantAbortAbortedTrans]; }; ENDCASE => { unlatchTrans[trans]; ERROR; }; }; ENDCASE => { unlatchTrans[trans]; ERROR; }; trans.outcome _ requestedOutcome; trans.finishTime _ BasicTime.Now[]; unlatchTrans[trans]; SELECT requestedOutcome FROM commit => { status: INT; callAllPreCommitProcs[transID]; status _ Camelot.TAEnd[taPort: YggEnvironment.taPort, tid: transID, protocolType: ptNonBlocking, raiseSignal: TRUE].status; SELECT status FROM Camelot.ErSuccess => {outcome _ commit; callAllCommitProcs[transID];}; Camelot.AcCommitFailed => {outcome _ abort; callAllAbortProcs[transID];}; ENDCASE => ERROR; }; abort => { callAllAbortProcs[transID]; [] _ Camelot.TAKill[taPort: YggEnvironment.taPort, tid: transID, status: 0, raiseSignal: TRUE]; outcome _ abort; }; ENDCASE => ERROR; IF ~removeTrans[transID] THEN ERROR; } ELSE ERROR Unknown[transID]; }; <> callAllPreCommitProcs: PROC [transID: TransID] ~ { YggRep.PreCommit[transID]; -- Rep must preceed File (Rep writes its volatile form of document's contents, attributes, and links to its files during precommit, so some file size changes are possible) YggNaming.PreCommit[transID]; -- Naming must preceed File (Naming really does the updates during precommit, so some file size changes are possible) YggFile.PreCommit[transID]; -- File must preceed DIDMap (File performs its intentions during precommit) YggDIDMap.PreCommit[transID]; }; callAllCommitProcs: PROC [transID: TransID] ~ { YggRep.Commit[transID]; YggNaming.Commit[transID]; YggFile.Commit[transID]; YggDIDMap.Commit[transID]; [] _ YggLock.ReleaseTransactionLocks[transID, FALSE]; }; callAllAbortProcs: PROC [transID: TransID] ~ { YggRep.Abort[transID]; YggNaming.Abort[transID]; YggFile.Abort[transID]; YggDIDMap.Abort[transID]; [] _ YggLock.ReleaseTransactionLocks[transID, FALSE]; }; <> savedScratchTrans: Trans; savedScratchTransForEntries: Trans _ NEW[TransRep]; getScratchTrans: ENTRY PROC RETURNS [scratchTrans: Trans] ~ { ENABLE UNWIND => {}; IF savedScratchTrans # NIL THEN { scratchTrans _ savedScratchTrans; savedScratchTrans _ NIL; } ELSE { scratchTrans _ NEW[TransRep]; }; }; returnScratchRefChunk: ENTRY PROC [scratchTrans: Trans] ~ { savedScratchTrans _ scratchTrans; }; noteNewTrans: PROC [transID: TransID, parentTransID: TransID] RETURNS [parentOK: BOOL _ FALSE] ~ { newTrans: Trans; insertTrans: ENTRY PROC RETURNS [monitoredParentOK: BOOL _ FALSE] ~ { ENABLE UNWIND => {}; data: RedBlackTree.UserData; IF ~IsTopLevel[transID] THEN { savedScratchTransForEntries.transID _ parentTransID; data _ RedBlackTree.Lookup[subtransactionAndTransactionMap, savedScratchTransForEntries]; IF data = NIL THEN { RETURN[FALSE]; } ELSE { parent: Trans; parent _ NARROW[data]; parent.subTrans _ CONS[newTrans, parent.subTrans]; newTrans.parentTrans _ parent; monitoredParentOK _ TRUE; }; } ELSE monitoredParentOK _ TRUE; RedBlackTree.Insert[subtransactionAndTransactionMap, newTrans, newTrans]; RETURN [TRUE]; }; newTrans _ NEW[TransRep _ [transID: transID, outcome: active, parentTrans: NIL, subTrans: NIL]]; parentOK _ insertTrans[]; }; findTrans: ENTRY PROC [transID: TransID, setLatch: BOOL _ FALSE] RETURNS [transFound: BOOL _ FALSE, trans: Trans _ NIL, parentTrans: Trans _ NIL] ~ { ENABLE UNWIND => {}; data: RedBlackTree.UserData; savedScratchTransForEntries.transID _ transID; data _ RedBlackTree.Lookup[subtransactionAndTransactionMap, savedScratchTransForEntries]; IF data = NIL THEN { RETURN[FALSE, NIL, NIL]; } ELSE { trans _ NARROW[data]; WHILE trans.latched DO WAIT forABit ENDLOOP; trans.latched _ TRUE; RETURN[TRUE, trans, trans.parentTrans]; }; }; unlatchTrans: ENTRY PROC [trans: Trans] ~ { trans.latched _ FALSE; }; removeTrans: PROC [transID: TransID] RETURNS [transFound: BOOL _ FALSE] ~ { data: RedBlackTree.UserData; savedScratchTransForEntries.transID _ transID; data _ RedBlackTree.Lookup[subtransactionAndTransactionMap, savedScratchTransForEntries]; IF data = NIL THEN { RETURN[FALSE]; } ELSE { trans: Trans; trans _ NARROW[data]; IF trans.parentTrans # NIL THEN { prevTrans: LIST OF Trans _ NIL; FOR lot: LIST OF Trans _ trans.parentTrans.subTrans, lot.rest UNTIL lot = NIL DO IF EqualTrans[transID, lot.first.transID] THEN { IF prevTrans = NIL THEN trans.parentTrans.subTrans _ lot.rest ELSE prevTrans.rest _ lot.rest; EXIT; }; prevTrans _ lot; REPEAT FINISHED => {ERROR}; ENDLOOP; trans.parentTrans _ NIL; }; [] _ RedBlackTree.Delete[subtransactionAndTransactionMap, savedScratchTransForEntries]; RETURN[TRUE]; }; }; <> <> GetKeyProc: RedBlackTree.GetKey = { <> trans: Trans _ NARROW[data]; RETURN[ trans ]; }; CompareProc: RedBlackTree.Compare = { <> dataTrans: Trans _ NARROW[data]; keyTrans: Trans _ NARROW[k]; SELECT keyTrans.transID.bottom.nodeId.value FROM > dataTrans.transID.bottom.nodeId.value => RETURN [greater]; < dataTrans.transID.bottom.nodeId.value => RETURN [less]; ENDCASE => { SELECT keyTrans.transID.bottom.highTicker FROM > dataTrans.transID.bottom.highTicker => RETURN [greater]; < dataTrans.transID.bottom.highTicker => RETURN [less]; ENDCASE => { SELECT keyTrans.transID.bottom.lowTicker FROM > dataTrans.transID.bottom.lowTicker => RETURN [greater]; < dataTrans.transID.bottom.lowTicker => RETURN [less]; ENDCASE => RETURN [equal]; }; }; }; <> milliSecondsBetweenClean: INT _ 3000; secondsToRetainTrans: INT _ 3600; -- for debugging, a long time CleanTransactionTableProcess: PROC ~ { DO data: RedBlackTree.UserData; now: BasicTime.GMT; Process.PauseMsec[milliSecondsBetweenClean]; now _ BasicTime.Now[]; data _ RedBlackTree.LookupSmallest[subtransactionAndTransactionMap]; WHILE data # NIL DO trans: Trans; trans _ NARROW[data]; IF trans.outcome # active AND BasicTime.Period[from: trans.finishTime, to: now] > secondsToRetainTrans THEN { IF ~removeTrans[trans.transID] THEN ERROR; }; data _ RedBlackTree.LookupNextLarger[subtransactionAndTransactionMap, data]; ENDLOOP; ENDLOOP; }; <> Init: PROC ~ { subtransactionAndTransactionMap _ RedBlackTree.Create[getKey: GetKeyProc, compare: CompareProc]; TRUSTED {Process.InitializeCondition[@forABit, Process.MsecToTicks[10]]; }; TRUSTED {Process.Detach[FORK CleanTransactionTableProcess[]]}; }; Init[]; END.