-- CoordinatorImpl
-- Coordinator side of Alpine transactions: creates coordinators, registers workers with them,
-- drives the prepare/finish exchange with the workers when a transaction is finished, and
-- rebuilds coordinator state from the log during recovery.
-- (The original header comments of this file were not preserved in this copy.)

DIRECTORY
  AlpineEnvironment,
  AlpineIdentity USING [myLogVolumeID, myAlpineImportHandle, myFileStore],
  AlpineImport,
  AlpineInternal,
  AlpineTransaction,
  AlpineTransMgr,
  AlpineTransMgrRpcControl,
  BasicTime,
  ClientMap,
  ConcreteTransID,
  ConvertUnsafe,
  Coordinator,
  CoordinatorControl,
  CoordinatorExtras USING [Info],
  CoordinatorInternal,
  CoordinatorMap,
  AlpineLog,
  LogControl,
  LogInline,
  Process USING [Detach, EnableAborts, SetTimeout, SecondsToTicks, Ticks],
  Rope USING [Cat, Concat, ROPE],
  RPC USING [Conversation, GetCaller],
  SafeStorage,
  SkiPatrolLog USING [AbortReason, notice, TransactionAbortInfo, TransactionBeginInfo, TransactionCommitInfo, OpFailureInfo];

CoordinatorImpl: MONITOR LOCKS c USING c: Handle
  IMPORTS AlpineIdentity, AlpineImport, AlpineTransaction, BasicTime, ClientMap, ConvertUnsafe, CoordinatorInternal, CoordinatorMap, AlpineLog, LogControl, LogInline, Process, Rope, RPC, SafeStorage, SkiPatrolLog
  EXPORTS AlpineInternal, AlpineTransaction, AlpineTransMgr, CoordinatorControl =
  BEGIN

  Conversation: TYPE = AlpineEnvironment.Conversation;
  TransID: TYPE = AlpineEnvironment.TransID;
  nullTransID: TransID = AlpineEnvironment.nullTransID;
  RecordID: TYPE = AlpineLog.RecordID;
  RecordType: TYPE = AlpineLog.RecordType;
  Results: TYPE = Coordinator.Results;
  InternalProgrammingError: ERROR = CODE;
  Handle: TYPE = Coordinator.Handle;
  nullHandle: Handle = Coordinator.nullHandle;
  AbortReason: TYPE = SkiPatrolLog.AbortReason;

  CoordinatorObject: PUBLIC TYPE = Coordinator.Object;

  -- Tuning parameters, all taken from CoordinatorInternal.
  secondsWaitAfterCommFailure: INT = CoordinatorInternal.SecondsWaitAfterCommFailure;
  timeoutForResultsReturned: Process.Ticks =
    Process.SecondsToTicks[CoordinatorInternal.SecondsTimeoutForResultsReturned];
  maxCoordinators: INT _ CoordinatorInternal.MaxCoordinators;

  zone: ZONE = SafeStorage.GetSystemZone[];


  Create: PUBLIC PROC [conversation: Conversation, createLocalWorker: BOOL]
    RETURNS [transID: AlpineTransaction.TransID] = {
    -- Creates a new coordinator, records the caller's name in its extras, and registers it in
    -- CoordinatorMap.  If createLocalWorker, also asks AlpineIdentity.myFileStore to create a
    -- worker for the new transaction.
    -- Returns the new transaction's ID.
    -- Raises AlpineTransaction.OperationFailed[busy] if CoordinatorMap.Count[] has reached
    -- maxCoordinators, or if the CreateWorker call raises Unknown or OperationFailed[busy].
    c: Handle;
    message: Rope.ROPE;
    registered: BOOL _ FALSE; -- TRUE once c has been entered in CoordinatorMap
    logBeginProc: PROC [SkiPatrolLog.TransactionBeginInfo]; -- (used to prevent race condition)
    {
    IF CoordinatorMap.Count[] >= maxCoordinators THEN {
      message _ "hit limit on coordinators";
      GOTO serverBusy;
      };
    c _ ConsCoordinator[];
    -- extras must be filled in before the coordinator becomes visible through the map.
    c.extras _ NEW[CoordinatorExtras.Info];
    IF (c.extras.userRName _ RPC.GetCaller[conversation]) = NIL THEN
      c.extras.userRName _ Rope.Cat["Local - ", ClientMap.GetName[conversation]];
    CoordinatorMap.Register[c];
    registered _ TRUE;
    IF createLocalWorker THEN
      AlpineTransaction.CreateWorker[conversation, c.transID, AlpineIdentity.myFileStore !
        AlpineTransaction.Unknown => {
          message _ SELECT what FROM
            transID => "worker didn't recognize transID",
            coordinator => "worker doesn't know who we are"
            ENDCASE => ERROR;
          GOTO serverBusy };
        AlpineTransaction.OperationFailed[busy] => {
          message _ "RPC returned `busy'";
          GOTO serverBusy }
        ];
    IF (logBeginProc _ SkiPatrolLog.notice.beginTransaction) # NIL THEN
      logBeginProc[[
        transID: c.transID,
        where: "CoordinatorImpl.Create",
        message: ""
        ]];
    RETURN [c.transID];
    EXITS
      serverBusy => {
        logProc: PROC [SkiPatrolLog.OpFailureInfo];
        -- If c was already registered, the caller will never learn its transID, so nothing
        -- else would ever finish it: it would stay in CoordinatorMap (and presumably keep
        -- counting against maxCoordinators) until the next restart.  Abort it in the
        -- background, the same way CallAfterUpdatePass finishes recovered coordinators, so
        -- that this call is not held up by worker communication.
        IF registered THEN Process.Detach[FORK CoordinatorFinishProcess[c, abort]];
        IF (logProc _ SkiPatrolLog.notice.operationFailed) # NIL THEN
          logProc[[
            what: busy,
            where: "CoordinatorImpl.Create",
            message: message.Cat["; user = ", RPC.GetCaller[conversation]]
            ]];
        ERROR AlpineTransaction.OperationFailed[why: busy];
        }
    }
    };

  ConsCoordinator: PROC [] RETURNS [Handle] = {
    -- Allocates a coordinator object, sets the timeout on its resultsReturned condition, and
    -- gives it a transaction ID.  Does not register it in CoordinatorMap and does not set extras.
    newTrans: Handle = zone.NEW[Coordinator.Object _ []];
    Process.SetTimeout[@newTrans.resultsReturned, timeoutForResultsReturned];
    CoordinatorInternal.NextTransID[newTrans]; --writes coordinatorBegin
    RETURN [newTrans]
    };

  RegisterWorker: PUBLIC PROC [
    conversation: Conversation, trans: TransID]
    RETURNS [AlpineTransMgr.RegisterWorkerResult] = {
    -- Adds the calling worker (identified by ClientMap.GetName[conversation]) to the worker
    -- list of transaction trans.
    -- Returns transNotActive if trans is not in CoordinatorMap or its state is not active,
    -- duplicateCall if the worker is already on the list, and ok otherwise.
    RegisterWorkerEntry: ENTRY PROC [c: Handle]
      RETURNS [AlpineTransMgr.RegisterWorkerResult] = {
      -- Runs under c's monitor lock so the state test and list update are atomic with
      -- respect to FinishEntry.
      worker: AlpineImport.Handle;
      IF c.state # active THEN RETURN [transNotActive];
      worker _ AlpineImport.Register[ClientMap.GetName[conversation]];
      FOR l: Coordinator.WorkerHandle _ c.workers, l.rest UNTIL l = NIL DO
        IF worker.Equal[l.first.worker] THEN RETURN [duplicateCall];
        ENDLOOP;
      ConsWorker[c, worker];
      RETURN [ok];
      };--RegisterWorkerEntry
    c: Handle = CoordinatorMap.GetHandle[trans];
    IF c = nullHandle THEN RETURN [transNotActive];
    RETURN [RegisterWorkerEntry[c]];
    };

  ConsWorker: PROC [c: Handle, w: AlpineImport.Handle] = {
    -- Pushes w on the front of c's worker list and writes a coordinatorRegisterWorker log record.
    l: Coordinator.WorkerHandle _ zone.CONS[[worker: w], c.workers];
    c.workers _ l;
    LogCoordinatorRegisterWorker[c, w];
    };

  CreateWithWorkers: INTERNAL PROC [c: Handle] RETURNS [Handle] = {
    -- Creates a new coordinator with the same workers as c (used when a transaction is
    -- finished with continue = TRUE).  The new coordinator is not registered here.
    newTrans: Handle = ConsCoordinator[];
    -- Carry the owner name over now, so the new coordinator already has its extras when
    -- FinishEntry registers it in CoordinatorMap.  c.extras may be NIL for coordinators
    -- rebuilt by recovery (AnalyzeCoordinatorBegin does not set it).
    newTrans.extras _ NEW[CoordinatorExtras.Info];
    IF c.extras # NIL THEN newTrans.extras.userRName _ c.extras.userRName;
    FOR l: Coordinator.WorkerHandle _ c.workers, l.rest UNTIL l = NIL DO
      ConsWorker[newTrans, l.first.worker];
      ENDLOOP;
    RETURN [newTrans]
    };

  Finish: PUBLIC PROC [
    conversation: RPC.Conversation, transID: AlpineTransaction.TransID,
    requestedOutcome: AlpineTransaction.RequestedOutcome, continue: BOOL]
    RETURNS [outcome: AlpineTransaction.Outcome, newTrans: AlpineTransaction.TransID] = {
    -- Finishes transaction transID with the requested outcome and reports the result to the
    -- SkiPatrolLog probes.
    -- Returns [unknown, nullTransID] if transID is not in CoordinatorMap.  continue is forced
    -- to FALSE when abort is requested.  newTrans is the continuation transaction's ID, or
    -- nullTransID if there is none.
    newTransCoordinator: Handle;
    c: Handle = CoordinatorMap.GetHandle[trans: transID];
    reason: AbortReason; -- reason for aborting the transaction
    owner: Rope.ROPE; -- owner of the to-be-finished transaction
    IF c = nullHandle THEN RETURN [unknown, nullTransID];
    -- Only Create and CreateWithWorkers fill in extras; a coordinator rebuilt from the log by
    -- AnalyzeCoordinatorBegin has none (assuming the field defaults to NIL), so do not
    -- dereference it blindly.
    owner _ IF c.extras = NIL THEN NIL ELSE c.extras.userRName;
    IF requestedOutcome = abort THEN continue _ FALSE;
    [outcome, newTransCoordinator, reason] _ FinishEntry[c, requestedOutcome, continue];
    -- If the old transaction aborted, the continuation transaction must not survive either.
    IF continue AND outcome = abort AND newTransCoordinator # nullHandle THEN {
      [] _ FinishEntry[newTransCoordinator, abort, --continue:-- FALSE];
      newTransCoordinator _ nullHandle };
    -- Report the continuation transaction as a newly begun transaction.  (Its extras were set
    -- by CreateWithWorkers.)
    IF newTransCoordinator # nullHandle THEN {
      logProc: PROC [SkiPatrolLog.TransactionBeginInfo];
      IF (logProc _ SkiPatrolLog.notice.beginTransaction) # NIL THEN
        logProc[[
          transID: newTransCoordinator.transID,
          where: "CoordinatorImpl.Finish",
          message: ""
          ]];
      };
    SELECT outcome FROM
      abort => {
        logProc: PROC [SkiPatrolLog.TransactionAbortInfo];
        IF (logProc _ SkiPatrolLog.notice.abortTransaction) # NIL THEN
          logProc[[
            transID: c.transID,
            where: "CoordinatorImpl.Finish",
            why: reason,
            message: Rope.Concat["Owner is ", owner]
            ]];
        };
      commit => {
        logProc: PROC [SkiPatrolLog.TransactionCommitInfo];
        IF (logProc _ SkiPatrolLog.notice.commitTransaction) # NIL THEN
          logProc[[
            transID: c.transID,
            where: "CoordinatorImpl.Finish",
            message: Rope.Concat["Owner is ", owner]
            ]];
        -- (the owner name is given as a message because the transaction has already been
        -- unregistered, so the SkiPatrolLog routine can't get at the handle)
        };
      ENDCASE => NULL;
    RETURN [outcome,
      IF newTransCoordinator = nullHandle THEN nullTransID ELSE newTransCoordinator.transID]
    };

  FinishEntry: ENTRY PROC [
    c: Handle, requestedOutcome: AlpineTransaction.RequestedOutcome, continue: BOOL]
    RETURNS [AlpineTransaction.Outcome, Handle, AbortReason] = {
    -- Drives coordinator c through collecting -> completing -> complete under c's monitor lock.
    -- Returns [c.outcome, continuation coordinator or nullHandle, abort reason] when this call
    -- did the work, and [unknown, nullHandle, unknown] when another finish was already in
    -- progress or c was already complete.
    -- On completion c is removed from CoordinatorMap and the continuation coordinator (if
    -- any) is registered.
    newTransCoordinator: Handle _ nullHandle;
    newTransID: TransID _ nullTransID;
    reason: AbortReason _ didntAbort;
    {
    IF c.finishInProgress THEN {
      -- Someone else is finishing c: wait until they are done, then report unknown.
      -- NOTE(review): nothing in this module NOTIFYs or BROADCASTs finishComplete, so this
      -- loop only re-tests when the 5 second timeout set in the start code expires.
      WHILE c.finishInProgress DO WAIT finishComplete; ENDLOOP;
      RETURN [unknown, nullHandle, unknown];
      };
    SELECT c.state FROM
      active => NULL;
      collecting, completing => NULL;
        -- completing is the state AnalyzeCoordinatorCompleting leaves behind at recovery;
        -- CallAfterUpdatePass then calls in here for it.
      complete => RETURN [unknown, nullHandle, unknown];
      ENDCASE => ERROR;
    c.finishInProgress _ TRUE;
    IF requestedOutcome = abort THEN {
      reason _ requested;
      c.outcome _ abort;
      };
    IF continue THEN {
      newTransCoordinator _ CreateWithWorkers[c];
      newTransID _ newTransCoordinator.transID;
      };
    IF c.state = active THEN {
      -- Force the log up to c.forceRecord before any worker is asked to prepare.
      AlpineLog.Force[followingRecord: c.forceRecord];
      c.state _ collecting;
      };
    DO
      -- Each iteration: (1) absorb results returned by worker calls, (2) advance c.state if
      -- possible, (3) start any calls that are now due, (4) wait for more results.
      noneActive: BOOL _ TRUE;
      allComplete: BOOL _ TRUE;
      -- (1) Absorb returned results.
      FOR w: Coordinator.WorkerHandle _ c.workers, w.rest UNTIL w = NIL DO
        WITH w.first.resultsOfMostRecentCall SELECT FROM
          n: Results.none => { };
          p: Results.prepare => {
            w.first.resultsOfMostRecentCall _ [none, none[]];
            w.first.lastPrepareResult _ p;
            SELECT p.communicationError FROM
              none => {
                w.first.communicationTrouble _ FALSE;
                SELECT p.prepareResult FROM
                  notReady => { --vote no--
                    w.first.state _ complete;
                    IF c.outcome = commit THEN ERROR;
                    reason _ workerNotReady;
                    c.outcome _ abort;
                    };
                  readOnlyReady => { w.first.state _ complete };
                  ready => {
                    w.first.state _ ready;
                    c.aWorkerBecameReady _ TRUE;
                    };
                  ENDCASE => ERROR;
                };
              bindingFailed, callFailed, busy => { --vote no--
                IF c.outcome = commit THEN ERROR;
                reason _ commError;
                c.outcome _ abort;
                w.first.communicationTrouble _ TRUE;
                w.first.timeForNextCall _ BasicTime.Update[
                  base: BasicTime.Now[], period: secondsWaitAfterCommFailure];
                };
              ENDCASE => ERROR;
            };
          f: Results.finish => {
            w.first.resultsOfMostRecentCall _ [none, none[]];
            w.first.lastFinishResult _ f;
            SELECT f.communicationError FROM
              none => {
                w.first.communicationTrouble _ FALSE;
                w.first.state _ complete;
                };
              bindingFailed, callFailed, busy => {
                -- Keep the worker incomplete so the finish call is retried later.
                w.first.communicationTrouble _ TRUE;
                w.first.timeForNextCall _ BasicTime.Update[
                  base: BasicTime.Now[], period: secondsWaitAfterCommFailure];
                };
              ENDCASE => ERROR;
            };
          ENDCASE => ERROR;
        IF w.first.state = active THEN noneActive _ FALSE;
        IF w.first.state # complete THEN allComplete _ FALSE;
        ENDLOOP;
      -- (2) Advance the coordinator state.
      IF c.state = collecting AND (c.outcome = abort OR noneActive) THEN {
        c.state _ completing;
        IF c.outcome = unknown THEN c.outcome _ commit;
        LogCoordinatorCompleting[c: c, outcome: c.outcome,
          force: c.outcome = abort OR c.aWorkerBecameReady];
        };
      IF c.state = completing AND allComplete THEN {
        c.state _ complete;
        LogCoordinatorComplete[c];
        GOTO Done;
        };
      -- (3) Start a call to every worker that is not complete, has no call outstanding, and
      -- is either not in communication trouble or is due for a retry.
      FOR w: Coordinator.WorkerHandle _ c.workers, w.rest UNTIL w = NIL DO
        IF w.first.state # complete AND w.first.callInProgress = none AND
          (NOT w.first.communicationTrouble OR IsTimeForNextCall[w]) THEN {
          IF c.state = collecting AND w.first.state = active THEN {
            CoordinatorInternal.PassParms[[prepare, c, w, newTransID, commit--ignored--]];
            w.first.callInProgress _ prepare;
            }
          ELSE IF c.state = completing THEN {
            CoordinatorInternal.PassParms[[finish, c, w, nullTransID--ignored--, IF c.outcome = commit THEN commit ELSE abort]];
            w.first.callInProgress _ finish;
            };
          };
        ENDLOOP;
      -- (4) Wait for results; the timeout on resultsReturned (set in ConsCoordinator) also
      -- brings us back here.
      WAIT c.resultsReturned;
      ENDLOOP;-- monster DO
    EXITS
      Done => {
        c.finishInProgress _ FALSE;
        CoordinatorMap.Unregister[c];
        IF newTransCoordinator # nullHandle THEN
          CoordinatorMap.Register[newTransCoordinator];
        RETURN [c.outcome, newTransCoordinator, reason]
        }
    }};--FinishEntry

  -- Waited on (in FinishEntry) by callers that find a finish already in progress.
  finishComplete: CONDITION;

  IsTimeForNextCall: PROC [w: Coordinator.WorkerHandle] RETURNS [BOOL] = INLINE {
    -- TRUE iff the current time has reached w.first.timeForNextCall.
    RETURN [BasicTime.Period[from: w.first.timeForNextCall, to: BasicTime.Now[]] >= 0]
    };


  -- Log writing

  LogCoordinatorRegisterWorker: PROC [
    c: Handle, w: AlpineImport.Handle] = {
    -- Writes a coordinatorRegisterWorker record carrying w's name.  c.forceRecord is advanced
    -- only when w is not this server's own import handle.
    followingRecord: AlpineLog.RecordID;
    rec: Coordinator.RegisterWorkerLogRep _ [];
    ConvertUnsafe.AppendRope[to: LOOPHOLE[LONG[@rec.worker]], from: w.Name];
    [followingRecord: followingRecord] _ AlpineLog.CoordinatorWrite[c, coordinatorRegisterWorker,
      [base: @rec, length: TEXT[rec.length].SIZE]];
    IF NOT w.Equal[AlpineIdentity.myAlpineImportHandle] THEN
      c.forceRecord _ followingRecord;
    };

  LogCoordinatorCompleting: PROC [c: Handle, outcome: AlpineEnvironment.CommitOrAbort, force: BOOL] = {
    -- Writes a coordinatorCompleting record carrying outcome, passing force through to
    -- AlpineLog.CoordinatorWrite, and updates c.forceRecord.
    rec: Coordinator.CompletingLogRep _ [outcome: outcome];
    [followingRecord: c.forceRecord] _ AlpineLog.CoordinatorWrite[c, coordinatorCompleting,
      [base: @rec, length: Coordinator.CompletingLogRep.SIZE], force];
    };

  LogCoordinatorComplete: PROC [c: Handle] = {
    -- Writes an empty coordinatorComplete record and updates c.forceRecord.
    [followingRecord: c.forceRecord] _ AlpineLog.CoordinatorWrite[c, coordinatorComplete,
      AlpineLog.nullBlock];
    };


  -- Recovery: analysis procs, registered with LogControl in the start code below

  AnalyzeCoordinatorBegin: PROC [record: RecordID, type: RecordType, trans: TransID] = {
    -- Recreates and registers the coordinator for trans.  extras is not set here.
    c: Handle = zone.NEW[Coordinator.Object _ [transID: trans, beginRecord: record]];
    CoordinatorMap.Register[c];
    CoordinatorInternal.NoticeCoordinatorBegin[trans];
    };

  AnalyzeCoordinatorRegisterWorker: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    -- Re-adds a logged worker to the volatile worker list of trans (without re-logging it).
    -- Does nothing if trans has no coordinator in CoordinatorMap.
    worker: AlpineImport.Handle;
    { -- get worker name from log, map it into a AlpineImport.Handle.
    rec: Coordinator.RegisterWorkerLogRep _ [];
    status: AlpineLog.ReadProcStatus;
    [status: status] _ AlpineLog.ReadForRecovery[thisRecord: record,
      to: [base: @rec, length: Coordinator.RegisterWorkerLogRep.SIZE]];
    IF status = destinationFull THEN ERROR InternalProgrammingError;
    worker _ AlpineImport.Register[
      server: ConvertUnsafe.ToRope[from: LOOPHOLE[LONG[@rec.worker]]]];
    };
    { -- map trans into a coordinator handle, then add worker to volatile state
    c: Handle = CoordinatorMap.GetHandle[trans];
    l: Coordinator.WorkerHandle;
    IF c = nullHandle THEN RETURN;
    IF c.state # active THEN ERROR;
    FOR l _ c.workers, l.rest UNTIL l = NIL DO
      IF worker.Equal[l.first.worker] THEN ERROR InternalProgrammingError;
      ENDLOOP;
    l _ zone.CONS[[worker: worker], c.workers];
    c.workers _ l
    };
    };

  AnalyzeCoordinatorCompleting: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    -- Restores the logged outcome of trans and moves its coordinator to completing.  For a
    -- commit outcome every worker is marked ready.
    -- Does nothing if trans has no coordinator in CoordinatorMap.
    outcome: AlpineEnvironment.CommitOrAbort;
    { -- get outcome from log
    rec: Coordinator.CompletingLogRep;
    status: AlpineLog.ReadProcStatus;
    [status: status] _ AlpineLog.ReadForRecovery[thisRecord: record,
      to: [base: @rec, length: Coordinator.CompletingLogRep.SIZE]];
    IF status # normal THEN ERROR InternalProgrammingError;
    outcome _ rec.outcome;
    };
    {
    c: Handle = CoordinatorMap.GetHandle[trans];
    IF c = nullHandle THEN RETURN;
    IF c.state # active THEN ERROR InternalProgrammingError;
    c.state _ completing;
    c.outcome _ outcome;
    IF c.outcome = commit THEN
      FOR l: Coordinator.WorkerHandle _ c.workers, l.rest UNTIL l = NIL DO
        l.first.state _ ready
        ENDLOOP;
    };
    };

  AnalyzeCoordinatorComplete: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    -- Marks trans complete and drops its coordinator from CoordinatorMap.
    -- Does nothing if trans has no coordinator in CoordinatorMap.
    c: Handle = CoordinatorMap.GetHandle[trans];
    IF c = nullHandle THEN RETURN;
    IF c.state # completing THEN ERROR InternalProgrammingError;
    c.state _ complete;
    CoordinatorMap.Unregister[c];
    };

  CallAfterAnalysisPass: PUBLIC PROC [] = {
    -- Initializes the transaction ID generator for this server's log volume.
    CoordinatorInternal.InitTransIDGenerator[AlpineIdentity.myLogVolumeID];
    };

  CallAfterUpdatePass: PUBLIC PROC [] = {
    -- Forks a detached process to finish every coordinator still in CoordinatorMap: commit
    -- for one that is completing with outcome commit, abort for every other one.
    EnumProc: PROC [c: Handle] RETURNS [stop: BOOL] = {
      requestedOutcome: AlpineTransaction.RequestedOutcome _ abort;
      SELECT c.state FROM
        complete => ERROR; -- complete coordinators are unregistered by AnalyzeCoordinatorComplete
        active, collecting => NULL;
        completing => IF c.outcome = commit THEN requestedOutcome _ commit;
        ENDCASE => ERROR;
      Process.Detach[FORK CoordinatorFinishProcess[c, requestedOutcome]];
      RETURN [stop: FALSE];
      };
    CoordinatorMap.LockedEnumerate[EnumProc];
    };

  CoordinatorFinishProcess: PROC [c: Handle,
    requestedOutcome: AlpineTransaction.RequestedOutcome] = {
    -- Body of the forked processes that finish a coordinator in the background; the result of
    -- FinishEntry is discarded.
    [] _ FinishEntry [c, requestedOutcome, FALSE];
    };

  CoordinatorCheckpointProc: PROC []
    RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
    -- Returns, for both results, the smallest beginRecord among the coordinators in
    -- CoordinatorMap (AlpineLog.lastRecordID if there are none).
    oldestBeginRecord: AlpineLog.RecordID _ AlpineLog.lastRecordID;
    EnumProc: PROC [c: Handle] RETURNS [stop: BOOL] = {
      oldestBeginRecord _ LogInline.Min[c.beginRecord, oldestBeginRecord];
      RETURN [stop: FALSE];
      };
    CoordinatorMap.UnlockedEnumerate[EnumProc];
    RETURN [oldestBeginRecord, oldestBeginRecord];
    };

  -- Module start code: hook into log analysis and checkpointing, and arm finishComplete.
  LogControl.RegisterAnalysisProc[coordinatorBegin, AnalyzeCoordinatorBegin];
  LogControl.RegisterAnalysisProc[coordinatorRegisterWorker, AnalyzeCoordinatorRegisterWorker];
  LogControl.RegisterAnalysisProc[coordinatorCompleting, AnalyzeCoordinatorCompleting];
  LogControl.RegisterAnalysisProc[coordinatorComplete, AnalyzeCoordinatorComplete];
  LogControl.RegisterCheckpointProc[CoordinatorCheckpointProc];
  Process.SetTimeout[@finishComplete, Process.SecondsToTicks[5]];
  Process.EnableAborts[@finishComplete];
  END.

CHANGE LOG.

-- (Earlier entries did not survive in this copy; the two numbered items below are what remains of one of them.)
-- (1) Let the SkiPatrolLog routine get the user RName from the transaction ID, where possible.
-- (2) Remove the possible race condition in SkiPatrolLog probes by assigning the PROC to a temporary variable.

-- Review edit:
-- (1) Create: when worker creation fails after the coordinator has been registered, abort the orphaned coordinator in the background instead of leaving it in CoordinatorMap.
-- (2) Finish: tolerate coordinators whose extras is NIL (those rebuilt by recovery).
-- (3) CreateWithWorkers: give the continuation coordinator its extras before FinishEntry registers it.