-- YggCoordinatorImpl
-- Coordinator side of two-phase commit. Create, RegisterWorker, Check and Finish are the
-- client- and worker-facing entry points. FinishEntry drives prepare and finish calls to
-- every registered worker (via YggCoordinatorInternal.PassParms) and logs the coordinator's
-- state changes. The Analyze* procedures rebuild coordinator state from those log records
-- during recovery, and CallAfterUpdatePass forks processes that finish whatever is left.
-- Monitor locks are per coordinator object (LOCKS c USING c: Handle).

DIRECTORY
  YggEnvironment,
  YggIdentity USING [myLogVolumeID, myAlpineImportHandle, myFileStore],
  YggImport,
  YggInternal,
  YggTransaction,
  YggTransMgr,
  BasicTime,
  YggClientMap,
  YggConcreteTransID,
  ConvertUnsafe,
  YggCoordinator,
  YggCoordinatorControl,
  YggCoordinatorInternal,
  YggCoordinatorMap,
  YggLog,
  YggLogBasic,
  YggLogControl,
  YggLogInline,
  YggDummyProcess USING [Detach, EnableAborts, SetTimeout, SecondsToTicks, Ticks],
  Rope USING [Cat, Concat, ROPE],
  YggDummyRPC USING [Conversation, GetCaller],
  SafeStorage,
  YggMonitoringLog USING [AbortReason, notice, TransactionAbortInfo, TransactionBeginInfo, TransactionCommitInfo, OpFailureInfo];

YggCoordinatorImpl: CEDAR MONITOR
  LOCKS c USING c: Handle
  IMPORTS
    YggIdentity, YggImport, YggTransaction, BasicTime, YggClientMap, ConvertUnsafe,
    YggCoordinatorInternal, YggCoordinatorMap, YggLog, YggLogBasic, YggLogControl,
    YggLogInline, YggDummyProcess, Rope, YggDummyRPC, SafeStorage, YggMonitoringLog
  EXPORTS YggInternal, YggTransaction, YggTransMgr, YggCoordinatorControl
  = BEGIN

Conversation: TYPE = YggEnvironment.Conversation;
TransID: TYPE = YggEnvironment.TransID;
nullTransID: TransID = YggEnvironment.nullTransID;
RecordID: TYPE = YggLog.RecordID;
RecordType: TYPE = YggLog.RecordType;
Results: TYPE = YggCoordinator.Results;
InternalProgrammingError: ERROR = CODE;
Handle: TYPE = YggCoordinator.Handle;
nullHandle: Handle = YggCoordinator.nullHandle;
AbortReason: TYPE = YggMonitoringLog.AbortReason;
CoordinatorObject: PUBLIC TYPE = YggCoordinator.Object;

-- After a failed call to a worker, FinishEntry waits this many seconds before calling it again.
secondsWaitAfterCommFailure: INT = YggCoordinatorInternal.SecondsWaitAfterCommFailure;
-- Timeout on each coordinator's resultsReturned condition (set in ConsCoordinator).
timeoutForResultsReturned: YggDummyProcess.Ticks = YggDummyProcess.SecondsToTicks[YggCoordinatorInternal.SecondsTimeoutForResultsReturned];
-- Create refuses new transactions once this many coordinators are registered.
maxCoordinators: INT _ 500;
-- Create refuses new transactions, and Finish refuses to continue a transaction, once log
-- usage exceeds this fraction of the log file size.
usableLogFraction: REAL _ 0.5;
zone: ZONE = SafeStorage.GetSystemZone[];

Create: PUBLIC PROC [conversation: Conversation, createLocalWorker: BOOL]
  RETURNS [transID: YggTransaction.TransID] = {
  -- Creates and registers a new coordinator and returns its transaction ID. If
  -- createLocalWorker, it also creates a worker for the transaction on this file store.
  -- Raises YggTransaction.OperationFailed[busy] in three cases:
  --   the coordinator limit is reached;
  --   the log is too full;
  --   the local worker rejects the transaction (YggTransaction.Unknown).
  c: Handle _ nullHandle; -- stays nullHandle until a coordinator has been created
  message: Rope.ROPE;
  logBeginProc: PROC [YggMonitoringLog.TransactionBeginInfo]; -- (used to prevent race condition)
  {
  IF YggCoordinatorMap.Count[] >= maxCoordinators THEN {
    message _ "hit limit on coordinators";
    GOTO serverBusy;
    };
  IF YggLogBasic.LogUsage[] > usableLogFraction*YggLogBasic.LogFileSize[] THEN {
    message _ "log too full";
    GOTO serverBusy;
    };
  c _ ConsCoordinator[];
  c.extras _ NEW[YggCoordinator.Info];
  -- Record who owns the transaction: the RPC caller, or the local client's name if none.
  IF (c.extras.userRName _ YggDummyRPC.GetCaller[conversation]) = NIL THEN
    c.extras.userRName _ Rope.Cat["Local - ", YggClientMap.GetName[conversation]];
  YggCoordinatorMap.Register[c];
  IF createLocalWorker THEN
    YggTransaction.CreateWorker[conversation, c.transID, YggIdentity.myFileStore
      ! YggTransaction.Unknown => {
          message _ SELECT what FROM
            transID => "worker didn't recognize transID",
            coordinator => "worker doesn't know who we are"
            ENDCASE => ERROR;
          GOTO serverBusy };
      ];
  -- The notice procedure is copied into a local before the NIL test so that the test and
  -- the call see the same value.
  IF (logBeginProc _ YggMonitoringLog.notice.beginTransaction) # NIL THEN
    logBeginProc[[transID: c.transID, where: "CoordinatorImpl.Create", message: ""]];
  RETURN [c.transID];
  EXITS
    serverBusy => {
      logProc: PROC [YggMonitoringLog.OpFailureInfo];
      -- If the coordinator was already registered (local worker creation failed), abort it.
      -- Otherwise it would stay in YggCoordinatorMap forever, counting against
      -- maxCoordinators and holding back CoordinatorCheckpointProc's oldest begin record.
      IF c # nullHandle THEN [] _ FinishEntry[c, abort, --continue:-- FALSE];
      IF (logProc _ YggMonitoringLog.notice.operationFailed) # NIL THEN
        logProc[[what: busy, where: "CoordinatorImpl.Create", message: message.Cat["; user = ", YggDummyRPC.GetCaller[conversation]]]];
      ERROR YggTransaction.OperationFailed[why: busy];
      }
  }
};

ConsCoordinator: PROC [] RETURNS [Handle] = {
  -- Allocates a new coordinator object and sets the timeout on its resultsReturned condition.
  -- It then assigns the object a fresh transaction ID.
  newTrans: Handle = zone.NEW[YggCoordinator.Object _ []];
  TRUSTED {YggDummyProcess.SetTimeout[@newTrans.resultsReturned, timeoutForResultsReturned];};
  YggCoordinatorInternal.NextTransID[newTrans]; --writes coordinatorBegin
  RETURN [newTrans]
};

RegisterWorker: PUBLIC PROC [conversation: Conversation, trans: TransID]
  RETURNS [YggTransMgr.RegisterWorkerResult] = {
  -- Adds the caller (named via YggClientMap.GetName[conversation]) as a worker of trans.
  -- Returns:
  --   transNotActive if trans has no registered coordinator or is no longer active;
  --   duplicateCall if that worker is already registered;
  --   ok otherwise.
  RegisterWorkerEntry: ENTRY PROC [c: Handle] RETURNS [YggTransMgr.RegisterWorkerResult] = {
    worker: YggImport.Handle;
    IF c.state # active THEN RETURN [transNotActive];
    worker _ YggImport.Register[YggClientMap.GetName[conversation]];
    FOR l: YggCoordinator.WorkerHandle _ c.workers, l.rest UNTIL l = NIL DO
      IF worker.Equal[l.first.worker] THEN RETURN [duplicateCall];
      ENDLOOP;
    ConsWorker[c, worker];
    RETURN [ok];
    };--RegisterWorkerEntry
  c: Handle = YggCoordinatorMap.GetHandle[trans];
  IF c = nullHandle THEN RETURN [transNotActive];
  RETURN [RegisterWorkerEntry[c]];
};

ConsWorker: PROC [c: Handle, w: YggImport.Handle] = {
  -- Prepends w to c's worker list and writes a coordinatorRegisterWorker log record.
  -- It is called from RegisterWorkerEntry and CreateWithWorkers.
  l: YggCoordinator.WorkerHandle _ zone.CONS[[worker: w], c.workers];
  c.workers _ l;
  LogCoordinatorRegisterWorker[c, w];
};

CreateWithWorkers: INTERNAL PROC [c: Handle] RETURNS [Handle] = {
  -- Creates a new coordinator that has the same workers as c (used to continue a transaction).
  newTrans: Handle = ConsCoordinator[];
  FOR l: YggCoordinator.WorkerHandle _ c.workers, l.rest UNTIL l = NIL DO
    ConsWorker[newTrans, l.first.worker];
    ENDLOOP;
  RETURN [newTrans]
};

Check: PUBLIC PROC [transID: YggTransaction.TransID]
  RETURNS [outcome: YggTransaction.Outcome] = {
  -- Returns the outcome currently recorded for transID (read without c's monitor lock).
  c: Handle = YggCoordinatorMap.GetHandle[trans: transID];
  -- GetHandle yields nullHandle for a transaction that is not registered (Finish and
  -- RegisterWorker test for this too). Report unknown instead of dereferencing it.
  IF c = nullHandle THEN RETURN [unknown];
  outcome _ c.outcome;
};

Finish: PUBLIC PROC [
  conversation: YggDummyRPC.Conversation, transID: YggTransaction.TransID,
  requestedOutcome: YggTransaction.RequestedOutcome, continue: BOOL]
  RETURNS [outcome: YggTransaction.Outcome, newTrans: YggTransaction.TransID] = {
  -- Commits or aborts transID and reports the result to YggMonitoringLog.
  -- If continue is TRUE, the transaction commits, and the log is not too full, Finish also
  -- starts a continuation transaction with the same workers and returns its ID in newTrans.
  -- Otherwise newTrans is nullTransID.
  -- Returns [unknown, nullTransID] if transID has no registered coordinator.
  newTransCoordinator: Handle;
  c: Handle = YggCoordinatorMap.GetHandle[trans: transID];
  reason: AbortReason; -- reason for aborting the transaction
  owner: Rope.ROPE; -- owner of the to-be-finished transaction
  IF c = nullHandle THEN RETURN [unknown, nullTransID];
  -- Coordinators rebuilt from the log by AnalyzeCoordinatorBegin have no extras.
  owner _ IF c.extras = NIL THEN NIL ELSE c.extras.userRName;
  IF requestedOutcome = abort THEN continue _ FALSE;
  -- Do not start a continuation transaction when the log is too full.
  -- This is the same threshold that Create applies to new transactions.
  IF YggLogBasic.LogUsage[] > usableLogFraction*YggLogBasic.LogFileSize[] THEN continue _ FALSE;
  [outcome, newTransCoordinator, reason] _ FinishEntry[c, requestedOutcome, continue];
  -- The continuation was created before the outcome was known; discard it if c aborted.
  IF continue AND outcome = abort AND newTransCoordinator # nullHandle THEN {
    [] _ FinishEntry[newTransCoordinator, abort, --continue:-- FALSE];
    newTransCoordinator _ nullHandle
    };
  IF newTransCoordinator # nullHandle THEN {
    logProc: PROC [YggMonitoringLog.TransactionBeginInfo];
    newTransCoordinator.extras _ NEW[YggCoordinator.Info];
    newTransCoordinator.extras.userRName _ owner;
    IF (logProc _ YggMonitoringLog.notice.beginTransaction) # NIL THEN
      logProc[[transID: newTransCoordinator.transID, where: "CoordinatorImpl.Finish", message: ""]];
    };
  SELECT outcome FROM
    abort => {
      logProc: PROC [YggMonitoringLog.TransactionAbortInfo];
      IF (logProc _ YggMonitoringLog.notice.abortTransaction) # NIL THEN
        logProc[[transID: c.transID, where: "CoordinatorImpl.Finish", why: reason, message: Rope.Concat["Owner is ", owner]]];
      };
    commit => {
      logProc: PROC [YggMonitoringLog.TransactionCommitInfo];
      IF (logProc _ YggMonitoringLog.notice.commitTransaction) # NIL THEN
        logProc[[transID: c.transID, where: "CoordinatorImpl.Finish", message: Rope.Concat["Owner is ", owner]]];
      -- (the owner name is given as a message because the transaction has already been
      -- unregistered, so the YggMonitoringLog routine can't get at the handle)
      };
    ENDCASE => NULL;
  RETURN [outcome, IF newTransCoordinator = nullHandle THEN nullTransID ELSE newTransCoordinator.transID]
};

FinishEntry: ENTRY PROC [
  c: Handle, requestedOutcome: YggTransaction.RequestedOutcome, continue: BOOL]
  RETURNS [YggTransaction.Outcome, Handle, AbortReason] = {
  -- Drives coordinator c to completion through these states:
  --   active: force the log up to c.forceRecord, then move to collecting;
  --   collecting: issue prepare calls until the outcome is abort or no worker is still active;
  --   completing: log the decision and issue finish calls until every worker is complete;
  --   complete: log completion, unregister c, and register the continuation (if any).
  -- Call results appear in each worker's resultsOfMostRecentCall. This procedure WAITs on
  -- c.resultsReturned between passes. (Presumably the results are stored by the call
  -- machinery started through PassParms; confirm in YggCoordinatorInternal.)
  -- Returns [outcome, continuation coordinator or nullHandle, abort reason].
  -- If another finish is already in progress, or c is already complete, it returns unknown.
  newTransCoordinator: Handle _ nullHandle;
  newTransID: TransID _ nullTransID;
  reason: AbortReason _ didntAbort;
  {
  IF c.finishInProgress THEN {
    -- Another process is finishing c: wait until it is done, then report unknown.
    WHILE c.finishInProgress DO WAIT finishComplete; ENDLOOP;
    RETURN [unknown, nullHandle, unknown];
    };
  SELECT c.state FROM
    active => NULL;
    collecting, completing => NULL; -- e.g. coordinators restarted by CallAfterUpdatePass
    complete => RETURN [unknown, nullHandle, unknown];
    ENDCASE => ERROR;
  c.finishInProgress _ TRUE;
  IF requestedOutcome = abort THEN {
    reason _ requested;
    c.outcome _ abort;
    };
  IF continue THEN {
    newTransCoordinator _ CreateWithWorkers[c];
    newTransID _ newTransCoordinator.transID;
    };
  IF c.state = active THEN {
    YggLog.Force[followingRecord: c.forceRecord];
    c.state _ collecting;
    };
  DO
    noneActive: BOOL _ TRUE;
    allComplete: BOOL _ TRUE;
    -- Pass 1: absorb the results of any calls that have returned.
    FOR w: YggCoordinator.WorkerHandle _ c.workers, w.rest UNTIL w = NIL DO
      WITH w.first.resultsOfMostRecentCall SELECT FROM
        n: Results.none => { };
        p: Results.prepare => TRUSTED {
          w.first.resultsOfMostRecentCall _ [none, none[]];
          w.first.lastPrepareResult _ p;
          SELECT p.communicationError FROM
            none => {
              w.first.communicationTrouble _ FALSE;
              SELECT p.prepareResult FROM
                notReady => { --vote no--
                  w.first.state _ complete;
                  -- cannot happen: commit is chosen only once no worker is still active
                  IF c.outcome = commit THEN ERROR;
                  reason _ workerNotReady;
                  c.outcome _ abort;
                  };
                readOnlyReady => { w.first.state _ complete };
                ready => {
                  w.first.state _ ready;
                  c.aWorkerBecameReady _ TRUE;
                  };
                ENDCASE => ERROR;
              };
            bindingFailed, callFailed, busy => { --vote no--
              IF c.outcome = commit THEN ERROR;
              reason _ commError;
              c.outcome _ abort;
              w.first.communicationTrouble _ TRUE;
              w.first.timeForNextCall _ BasicTime.Update[base: BasicTime.Now[], period: secondsWaitAfterCommFailure];
              };
            ENDCASE => ERROR;
          };
        f: Results.finish => TRUSTED {
          w.first.resultsOfMostRecentCall _ [none, none[]];
          w.first.lastFinishResult _ f;
          SELECT f.communicationError FROM
            none => {
              w.first.communicationTrouble _ FALSE;
              w.first.state _ complete;
              };
            bindingFailed, callFailed, busy => {
              -- retry the finish call after secondsWaitAfterCommFailure
              w.first.communicationTrouble _ TRUE;
              w.first.timeForNextCall _ BasicTime.Update[base: BasicTime.Now[], period: secondsWaitAfterCommFailure];
              };
            ENDCASE => ERROR;
          };
        ENDCASE => ERROR;
      IF w.first.state = active THEN noneActive _ FALSE;
      IF w.first.state # complete THEN allComplete _ FALSE;
      ENDLOOP;
    -- Decide the outcome: abort if anyone voted no, commit once nobody is still active.
    IF c.state = collecting AND (c.outcome = abort OR noneActive) THEN {
      c.state _ completing;
      IF c.outcome = unknown THEN c.outcome _ commit;
      LogCoordinatorCompleting[c: c, outcome: c.outcome, force: c.outcome = abort OR c.aWorkerBecameReady];
      };
    IF c.state = completing AND allComplete THEN {
      c.state _ complete;
      LogCoordinatorComplete[c];
      GOTO Done;
      };
    -- Pass 2: start a call to every incomplete worker that is idle and not in comm backoff.
    FOR w: YggCoordinator.WorkerHandle _ c.workers, w.rest UNTIL w = NIL DO
      IF w.first.state # complete AND w.first.callInProgress = none AND
        (NOT w.first.communicationTrouble OR IsTimeForNextCall[w]) THEN {
        IF c.state = collecting AND w.first.state = active THEN {
          YggCoordinatorInternal.PassParms[[prepare, c, w, newTransID, commit--ignored--]];
          w.first.callInProgress _ prepare;
          }
        ELSE IF c.state = completing THEN {
          YggCoordinatorInternal.PassParms[[finish, c, w, nullTransID--ignored--, IF c.outcome = commit THEN commit ELSE abort]];
          w.first.callInProgress _ finish;
          };
        };
      ENDLOOP;
    WAIT c.resultsReturned;
    ENDLOOP;-- monster DO
  EXITS
    Done => {
      c.finishInProgress _ FALSE;
      -- Wake duplicate finishers waiting above; previously nothing signalled finishComplete,
      -- so they slept until its 5-second timeout expired.
      BROADCAST finishComplete;
      YggCoordinatorMap.Unregister[c];
      IF newTransCoordinator # nullHandle THEN YggCoordinatorMap.Register[newTransCoordinator];
      RETURN [c.outcome, newTransCoordinator, reason]
      }
  }};--FinishEntry

-- Waited on by FinishEntry when a finish of the same coordinator is already in progress.
-- It is shared by all coordinators, so it is broadcast and waiters recheck their own flag.
finishComplete: CONDITION;

IsTimeForNextCall: PROC [w: YggCoordinator.WorkerHandle] RETURNS [BOOL] = INLINE {
  -- TRUE once w's post-failure backoff time has been reached.
  RETURN [BasicTime.Period[from: w.first.timeForNextCall, to: BasicTime.Now[]] >= 0]
};

LogCoordinatorRegisterWorker: PROC [c: Handle, w: YggImport.Handle] = {
  -- Writes a coordinatorRegisterWorker record carrying w's name.
  -- Only registrations of workers other than myAlpineImportHandle advance c.forceRecord,
  -- which FinishEntry forces before collecting votes.
  followingRecord: YggLog.RecordID;
  rec: YggCoordinator.RegisterWorkerLogRep _ [];
  TRUSTED {
    ConvertUnsafe.AppendRope[to: LOOPHOLE[LONG[@rec.worker]], from: w.Name];
    [followingRecord: followingRecord] _ YggLog.CoordinatorWrite[c, coordinatorRegisterWorker, [base: @rec, length: TEXT[rec.length].SIZE]];
    };
  IF NOT w.Equal[YggIdentity.myAlpineImportHandle] THEN c.forceRecord _ followingRecord;
};

LogCoordinatorCompleting: PROC [c: Handle, outcome: YggEnvironment.CommitOrAbort, force: BOOL] = TRUSTED {
  -- Writes the coordinatorCompleting record (the decided outcome), forcing it if requested.
  rec: YggCoordinator.CompletingLogRep _ [outcome: outcome];
  [followingRecord: c.forceRecord] _ YggLog.CoordinatorWrite[c, coordinatorCompleting, [base: @rec, length: YggCoordinator.CompletingLogRep.SIZE], force];
};

LogCoordinatorComplete: PROC [c: Handle] = {
  -- Writes an empty coordinatorComplete record.
  [followingRecord: c.forceRecord] _ YggLog.CoordinatorWrite[c, coordinatorComplete, YggLog.nullBlock];
};

AnalyzeCoordinatorBegin: PROC [record: RecordID, type: RecordType, trans: TransID] = {
  -- Recovery: registers a coordinator object for trans, remembering its begin record.
  c: Handle = zone.NEW[YggCoordinator.Object _ [transID: trans, beginRecord: record]];
  YggCoordinatorMap.Register[c];
  YggCoordinatorInternal.NoticeCoordinatorBegin[trans];
};

AnalyzeCoordinatorRegisterWorker: PROC [record: RecordID, type: RecordType, trans: TransID] = {
  -- Recovery: re-adds the logged worker to trans's coordinator.
  worker: YggImport.Handle;
  { -- get worker name from log, map it into a YggImport.Handle.
  rec: YggCoordinator.RegisterWorkerLogRep _ [];
  status: YggLog.ReadProcStatus;
  TRUSTED {[status: status] _ YggLog.ReadForRecovery[thisRecord: record, to: [base: @rec, length: YggCoordinator.RegisterWorkerLogRep.SIZE]];};
  IF status = destinationFull THEN ERROR InternalProgrammingError;
  TRUSTED {worker _ YggImport.Register[server: ConvertUnsafe.ToRope[from: LOOPHOLE[LONG[@rec.worker]]]]; };
  };
  { -- map trans into a coordinator handle, then add worker to volatile state
  c: Handle = YggCoordinatorMap.GetHandle[trans];
  l: YggCoordinator.WorkerHandle;
  IF c = nullHandle THEN RETURN; -- no coordinator registered for trans: ignore the record
  IF c.state # active THEN ERROR InternalProgrammingError;
  FOR l _ c.workers, l.rest UNTIL l = NIL DO
    IF worker.Equal[l.first.worker] THEN ERROR InternalProgrammingError;
    ENDLOOP;
  l _ zone.CONS[[worker: worker], c.workers];
  c.workers _ l
  };
};

AnalyzeCoordinatorCompleting: PROC [record: RecordID, type: RecordType, trans: TransID] = {
  -- Recovery: moves trans's coordinator to completing with the logged outcome.
  -- On commit, every worker is marked ready.
  outcome: YggEnvironment.CommitOrAbort;
  { -- get outcome from log
  rec: YggCoordinator.CompletingLogRep;
  status: YggLog.ReadProcStatus;
  TRUSTED {[status: status] _ YggLog.ReadForRecovery[thisRecord: record, to: [base: @rec, length: YggCoordinator.CompletingLogRep.SIZE]];};
  IF status # normal THEN ERROR InternalProgrammingError;
  outcome _ rec.outcome;
  };
  {
  c: Handle = YggCoordinatorMap.GetHandle[trans];
  IF c = nullHandle THEN RETURN;
  IF c.state # active THEN ERROR InternalProgrammingError;
  c.state _ completing;
  c.outcome _ outcome;
  IF c.outcome = commit THEN
    FOR l: YggCoordinator.WorkerHandle _ c.workers, l.rest UNTIL l = NIL DO
      l.first.state _ ready
      ENDLOOP;
  };
};

AnalyzeCoordinatorComplete: PROC [record: RecordID, type: RecordType, trans: TransID] = {
  -- Recovery: marks trans's coordinator complete and unregisters it.
  c: Handle = YggCoordinatorMap.GetHandle[trans];
  IF c = nullHandle THEN RETURN;
  IF c.state # completing THEN ERROR InternalProgrammingError;
  c.state _ complete;
  YggCoordinatorMap.Unregister[c];
};

CallAfterAnalysisPass: PUBLIC PROC [] = {
  -- Initializes the transaction ID generator from this server's log volume ID.
  YggCoordinatorInternal.InitTransIDGenerator[YggIdentity.myLogVolumeID];
};

CallAfterUpdatePass: PUBLIC PROC [] = {
  -- For each coordinator still registered after recovery, forks a process that finishes it.
  -- It commits if the log recorded a commit decision (completing with outcome commit) and
  -- aborts otherwise.
  EnumProc: PROC [c: Handle] RETURNS [stop: BOOL] = {
    requestedOutcome: YggTransaction.RequestedOutcome _ abort;
    SELECT c.state FROM
      complete => ERROR;
      active, collecting => NULL;
      completing => IF c.outcome = commit THEN requestedOutcome _ commit;
      ENDCASE => ERROR;
    TRUSTED {YggDummyProcess.Detach[FORK CoordinatorFinishProcess[c, requestedOutcome]];};
    RETURN [stop: FALSE];
    };
  YggCoordinatorMap.LockedEnumerate[EnumProc];
};

CoordinatorFinishProcess: PROC [c: Handle, requestedOutcome: YggTransaction.RequestedOutcome] = {
  -- Body of the processes forked by CallAfterUpdatePass.
  [] _ FinishEntry [c, requestedOutcome, FALSE];
};

CoordinatorCheckpointProc: PROC [] RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
  -- Returns the oldest begin record of any registered coordinator as both the record to keep
  -- and the analysis start. It returns YggLog.lastRecordID if no coordinator is registered.
  oldestBeginRecord: YggLog.RecordID _ YggLog.lastRecordID;
  EnumProc: PROC [c: Handle] RETURNS [stop: BOOL] = {
    oldestBeginRecord _ YggLogInline.Min[c.beginRecord, oldestBeginRecord];
    RETURN [stop: FALSE];
    };
  YggCoordinatorMap.UnlockedEnumerate[EnumProc];
  RETURN [oldestBeginRecord, oldestBeginRecord];
};

-- Module initialization: hook into log analysis and checkpointing.
YggLogControl.RegisterAnalysisProc[coordinatorBegin, AnalyzeCoordinatorBegin];
YggLogControl.RegisterAnalysisProc[coordinatorRegisterWorker, AnalyzeCoordinatorRegisterWorker];
YggLogControl.RegisterAnalysisProc[coordinatorCompleting, AnalyzeCoordinatorCompleting];
YggLogControl.RegisterAnalysisProc[coordinatorComplete, AnalyzeCoordinatorComplete];
YggLogControl.RegisterCheckpointProc[CoordinatorCheckpointProc];
TRUSTED {
  YggDummyProcess.SetTimeout[@finishComplete, YggDummyProcess.SecondsToTicks[5]];
  YggDummyProcess.EnableAborts[@finishComplete];
  };

END.

CHANGE LOG.
<> <> <> <> <> <> <> <> <<(1) Let the YggMonitoringLog routine get the user RName from the transaction ID, where possible.>> <<(2) Remove the possible race condition in YggMonitoringLog probes by assigning the PROC to a temporary variable.>> <> <> <> <<>>