-- CoordinatorImpl: coordinator side of the Alpine two-phase commit protocol.
-- Each coordinator object is its own monitor (MONITOR LOCKS c USING c: Handle).

DIRECTORY
  AlpineEnvironment,
  AlpineIdentity USING [myLogVolumeID, myAlpineImportHandle, myFileStore],
  AlpineImport,
  AlpineInternal,
  AlpineTransaction,
  AlpineTransMgr,
  AlpineTransMgrRpcControl,
  BasicTime,
  ClientMap,
  ConcreteTransID,
  ConvertUnsafe,
  Coordinator,
  CoordinatorControl,
  CoordinatorExtras USING [Info],
  CoordinatorInternal,
  CoordinatorMap,
  AlpineLog,
  LogControl,
  LogInline,
  Process USING [Detach, EnableAborts, SetTimeout, SecondsToTicks, Ticks],
  Rope USING [Cat, Concat, ROPE],
  RPC USING [Conversation, GetCaller],
  SafeStorage,
  SkiPatrolLog USING [AbortReason, notice, TransactionAbortInfo, TransactionBeginInfo, TransactionCommitInfo, OpFailureInfo];

CoordinatorImpl: MONITOR LOCKS c USING c: Handle
  IMPORTS AlpineIdentity, AlpineImport, AlpineTransaction, BasicTime, ClientMap, ConvertUnsafe, CoordinatorInternal, CoordinatorMap, AlpineLog, LogControl, LogInline, Process, Rope, RPC, SafeStorage, SkiPatrolLog
  EXPORTS AlpineInternal, AlpineTransaction, AlpineTransMgr, CoordinatorControl
  = BEGIN

  Conversation: TYPE = AlpineEnvironment.Conversation;
  TransID: TYPE = AlpineEnvironment.TransID;
  nullTransID: TransID = AlpineEnvironment.nullTransID;
  RecordID: TYPE = AlpineLog.RecordID;
  RecordType: TYPE = AlpineLog.RecordType;
  Results: TYPE = Coordinator.Results;
  InternalProgrammingError: ERROR = CODE;

  Handle: TYPE = Coordinator.Handle;
  nullHandle: Handle = Coordinator.nullHandle;
  AbortReason: TYPE = SkiPatrolLog.AbortReason;

  -- Exported as AlpineInternal.CoordinatorObject.
  CoordinatorObject: PUBLIC TYPE = Coordinator.Object;

  secondsWaitAfterCommFailure: INT = CoordinatorInternal.SecondsWaitAfterCommFailure;
  timeoutForResultsReturned: Process.Ticks =
    Process.SecondsToTicks[CoordinatorInternal.SecondsTimeoutForResultsReturned];
  -- Upper bound on simultaneously registered coordinators; checked (unmonitored) in Create.
  maxCoordinators: INT _ CoordinatorInternal.MaxCoordinators;

  zone: ZONE = SafeStorage.GetSystemZone[];

  -- AlpineTransaction.Create.  Called by any Alpine client.
  -- ! AlpineTransaction.OperationFailed[why: busy];
  -- Makes and registers a new coordinator, records the caller's RName in it, and
  -- (if createLocalWorker) creates a worker on the local file store on behalf of the caller.
  -- If CreateWorker raises Unknown or OperationFailed[busy], the failure is converted
  -- to a server-busy error and what went wrong is recorded via SkiPatrolLog.
  Create: PUBLIC PROC [conversation: Conversation, createLocalWorker: BOOL]
    RETURNS [transID: AlpineTransaction.TransID] = {
    c: Handle;
    message: Rope.ROPE;
    -- TRUE once c has been entered in CoordinatorMap, so that the failure exit
    -- knows there is a coordinator to dispose of.
    registered: BOOL _ FALSE;
    logBeginProc: PROC [SkiPatrolLog.TransactionBeginInfo]; -- (used to prevent race condition)
    {
    IF CoordinatorMap.Count[] >= maxCoordinators THEN {
      message _ "hit limit on coordinators";
      GOTO serverBusy;
      };
    c _ ConsCoordinator[];
    c.extras _ NEW[CoordinatorExtras.Info];
    -- RPC.GetCaller yields NIL for some callers; fall back to the ClientMap name.
    IF (c.extras.userRName _ RPC.GetCaller[conversation]) = NIL THEN
      c.extras.userRName _ Rope.Cat["Local - ", ClientMap.GetName[conversation]];
    CoordinatorMap.Register[c];
    registered _ TRUE;
    IF createLocalWorker THEN
      AlpineTransaction.CreateWorker[conversation, c.transID, AlpineIdentity.myFileStore !
        AlpineTransaction.Unknown => {
          message _ SELECT what FROM
            transID => "worker didn't recognize transID",
            coordinator => "worker doesn't know who we are"
            ENDCASE => ERROR;
          GOTO serverBusy
          };
        AlpineTransaction.OperationFailed[busy] => {
          message _ "RPC returned `busy'";
          GOTO serverBusy
          }
        ];
    -- The probe is copied to a local before the NIL test so that it cannot be
    -- cleared between the test and the call.
    IF (logBeginProc _ SkiPatrolLog.notice.beginTransaction) # NIL THEN
      logBeginProc[[
        transID: c.transID,
        where: "CoordinatorImpl.Create",
        message: ""
        ]];
    RETURN [c.transID];
    EXITS
      serverBusy => {
        logProc: PROC [SkiPatrolLog.OpFailureInfo];
        -- The client never learns this transID, so nobody else will ever Finish it.
        -- Abort it in the background (same pattern as CallAfterUpdatePass); otherwise
        -- it stays in CoordinatorMap forever and counts against maxCoordinators.
        IF registered THEN Process.Detach[FORK CoordinatorFinishProcess[c, abort]];
        IF (logProc _ SkiPatrolLog.notice.operationFailed) # NIL THEN
          logProc[[
            what: busy,
            where: "CoordinatorImpl.Create",
            message: message.Cat["; user = ", RPC.GetCaller[conversation]]
            ]];
        ERROR AlpineTransaction.OperationFailed[why: busy];
        }
    }
    };

  -- Allocates a coordinator object, sets the timeout on its resultsReturned
  -- condition, and assigns it the next transaction ID.  Does not register it.
  ConsCoordinator: PROC [] RETURNS [Handle] = {
    newTrans: Handle = zone.NEW[Coordinator.Object _ []];
    Process.SetTimeout[@newTrans.resultsReturned, timeoutForResultsReturned];
    CoordinatorInternal.NextTransID[newTrans]; --writes coordinatorBegin
    RETURN [newTrans]
    };

  -- AlpineTransMgr.RegisterWorker.
  -- ! (none);
  -- The caller is a worker (identity obtainable via "conversation") with no monitors locked.
  -- Returns transNotActive if trans is unknown or no longer active, duplicateCall if
  -- this worker is already on the coordinator's worker list, and ok otherwise.
  RegisterWorker: PUBLIC PROC [
    conversation: Conversation, trans: TransID]
    RETURNS [AlpineTransMgr.RegisterWorkerResult] = {
    RegisterWorkerEntry: ENTRY PROC [c: Handle]
      RETURNS [AlpineTransMgr.RegisterWorkerResult] = {
      -- c # nullHandle.
      worker: AlpineImport.Handle;
      IF c.state # active THEN RETURN [transNotActive];
      worker _ AlpineImport.Register[ClientMap.GetName[conversation]];
      FOR l: Coordinator.WorkerHandle _ c.workers, l.rest UNTIL l = NIL DO
        IF worker.Equal[l.first.worker] THEN RETURN [duplicateCall];
        ENDLOOP;
      ConsWorker[c, worker];
      RETURN [ok];
      };--RegisterWorkerEntry
    c: Handle = CoordinatorMap.GetHandle[trans];
    IF c = nullHandle THEN RETURN [transNotActive];
    RETURN [RegisterWorkerEntry[c]];
    };

  -- Pushes worker w on the front of c.workers and writes the
  -- coordinatorRegisterWorker log record for it.
  ConsWorker: PROC [c: Handle, w: AlpineImport.Handle] = {
    l: Coordinator.WorkerHandle _ zone.CONS[[worker: w], c.workers];
    c.workers _ l;
    LogCoordinatorRegisterWorker[c, w];
    };

  -- Creates a new transaction, and simulates RegisterWorker for each worker of
  -- existing transaction c.  Returns the new transaction, without registering it.
  CreateWithWorkers: INTERNAL PROC [c: Handle] RETURNS [Handle] = {
    newTrans: Handle = ConsCoordinator[];
    FOR l: Coordinator.WorkerHandle _ c.workers, l.rest UNTIL l = NIL DO
      ConsWorker[newTrans, l.first.worker];
      ENDLOOP;
    RETURN [newTrans]
    };

  -- AlpineTransaction.Finish.
  -- ! (none) (but may return "unknown" as a result.)
  -- Drives transaction transID to completion via FinishEntry, then reports the
  -- outcome through the SkiPatrolLog probes.  If continue is requested and the
  -- transaction commits, newTrans is the ID of a successor transaction with the
  -- same workers; otherwise newTrans is nullTransID.
  Finish: PUBLIC PROC [
    conversation: RPC.Conversation, transID: AlpineTransaction.TransID,
    requestedOutcome: AlpineTransaction.RequestedOutcome, continue: BOOL]
    RETURNS [outcome: AlpineTransaction.Outcome, newTrans: AlpineTransaction.TransID] = {
    newTransCoordinator: Handle;
    c: Handle = CoordinatorMap.GetHandle[trans: transID];
    reason: AbortReason; -- reason for aborting the transaction
    owner: Rope.ROPE; -- owner of the to-be-finished transaction
    IF c = nullHandle THEN RETURN [unknown, nullTransID];
    -- Coordinators rebuilt by recovery, and successors made by CreateWithWorkers
    -- that have not yet been given extras, carry no CoordinatorExtras.Info.
    -- Treat them as having an unknown (NIL) owner rather than dereferencing NIL.
    owner _ IF c.extras # NIL THEN c.extras.userRName ELSE NIL;
    IF requestedOutcome = abort THEN continue _ FALSE;
    [outcome, newTransCoordinator, reason] _ FinishEntry[c, requestedOutcome, continue];
    -- If we asked to continue the transaction but it aborted on us, the successor
    -- that FinishEntry created must itself be aborted.
    IF continue AND outcome = abort AND newTransCoordinator # nullHandle THEN {
      [] _ FinishEntry[newTransCoordinator, abort, --continue:-- FALSE];
      newTransCoordinator _ nullHandle
      };
    -- If we really did create a new transaction, record the user name in the
    -- coordinator object and note the new transaction (if the probe is enabled).
    IF newTransCoordinator # nullHandle THEN {
      logProc: PROC [SkiPatrolLog.TransactionBeginInfo];
      newTransCoordinator.extras _ NEW[CoordinatorExtras.Info];
      newTransCoordinator.extras.userRName _ owner;
      IF (logProc _ SkiPatrolLog.notice.beginTransaction) # NIL THEN
        logProc[[
          transID: newTransCoordinator.transID,
          where: "CoordinatorImpl.Finish",
          message: ""
          ]];
      };
    -- The owner name is given as a message because the transaction has already
    -- been unregistered, so the SkiPatrolLog routine can't get at the handle.
    SELECT outcome FROM
      abort => {
        logProc: PROC [SkiPatrolLog.TransactionAbortInfo];
        IF (logProc _ SkiPatrolLog.notice.abortTransaction) # NIL THEN
          logProc[[
            transID: c.transID,
            where: "CoordinatorImpl.Finish",
            why: reason,
            message: Rope.Concat["Owner is ", owner]
            ]];
        };
      commit => {
        logProc: PROC [SkiPatrolLog.TransactionCommitInfo];
        IF (logProc _ SkiPatrolLog.notice.commitTransaction) # NIL THEN
          logProc[[
            transID: c.transID,
            where: "CoordinatorImpl.Finish",
            message: Rope.Concat["Owner is ", owner]
            ]];
        };
      ENDCASE => NULL;
    RETURN [outcome,
      IF newTransCoordinator = nullHandle THEN nullTransID ELSE newTransCoordinator.transID]
    };

  -- General two-phase commit protocol, run under c's monitor lock.
  -- ! (none);
  -- Returns the final outcome, the (now registered) successor coordinator if
  -- continue was requested (else nullHandle), and the reason for an abort.
  -- A caller that finds another FinishEntry already running on c waits for it
  -- and then gets [unknown, nullHandle, unknown].
  FinishEntry: ENTRY PROC [
    c: Handle, requestedOutcome: AlpineTransaction.RequestedOutcome, continue: BOOL]
    RETURNS [AlpineTransaction.Outcome, Handle, AbortReason] = {
    newTransCoordinator: Handle _ nullHandle;
    newTransID: TransID _ nullTransID;
    reason: AbortReason _ didntAbort;
    {
    IF c.finishInProgress THEN {
      -- wait for finish to complete, then return unknown
      WHILE c.finishInProgress DO WAIT finishComplete; ENDLOOP;
      RETURN [unknown, nullHandle, unknown];
      };
    SELECT c.state FROM
      active => NULL; -- normal
      collecting, completing => NULL; -- FinishEntry has been called from recovery
      complete => -- two nearly simultaneous calls to Finish
        RETURN [unknown, nullHandle, unknown];
      ENDCASE => ERROR;
    c.finishInProgress _ TRUE;
    IF requestedOutcome = abort THEN {
      reason _ requested;
      c.outcome _ abort;
      };
    IF continue THEN {
      newTransCoordinator _ CreateWithWorkers[c];
      newTransID _ newTransCoordinator.transID;
      };
    IF c.state = active THEN {
      AlpineLog.Force[followingRecord: c.forceRecord];
      c.state _ collecting;
      };
    DO
      -- c.state IN [collecting .. completing]
      -- Try to make progress toward c.state = complete.
      noneActive: BOOL _ TRUE;
      allComplete: BOOL _ TRUE;
      -- Get results from previous calls, if any.
      FOR w: Coordinator.WorkerHandle _ c.workers, w.rest UNTIL w = NIL DO
        WITH w.first.resultsOfMostRecentCall SELECT FROM
          n: Results.none => { };
          p: Results.prepare => {
            w.first.resultsOfMostRecentCall _ [none, none[]];
            w.first.lastPrepareResult _ p;
            SELECT p.communicationError FROM
              none => {
                w.first.communicationTrouble _ FALSE;
                SELECT p.prepareResult FROM
                  notReady => { --vote no--
                    w.first.state _ complete;
                    IF c.outcome = commit THEN ERROR;
                    reason _ workerNotReady;
                    c.outcome _ abort;
                    };
                  readOnlyReady => { w.first.state _ complete };
                  ready => {
                    w.first.state _ ready;
                    c.aWorkerBecameReady _ TRUE;
                    };
                  ENDCASE => ERROR;
                };
              bindingFailed, callFailed, busy => { --vote no--
                IF c.outcome = commit THEN ERROR;
                reason _ commError;
                c.outcome _ abort;
                w.first.communicationTrouble _ TRUE;
                w.first.timeForNextCall _ BasicTime.Update[
                  base: BasicTime.Now[], period: secondsWaitAfterCommFailure];
                };
              ENDCASE => ERROR;
            };
          f: Results.finish => {
            w.first.resultsOfMostRecentCall _ [none, none[]];
            w.first.lastFinishResult _ f;
            SELECT f.communicationError FROM
              none => {
                w.first.communicationTrouble _ FALSE;
                w.first.state _ complete;
                };
              bindingFailed, callFailed, busy => {
                -- Leave the worker incomplete; the finish call is retried after a delay.
                w.first.communicationTrouble _ TRUE;
                w.first.timeForNextCall _ BasicTime.Update[
                  base: BasicTime.Now[], period: secondsWaitAfterCommFailure];
                };
              ENDCASE => ERROR;
            };
          ENDCASE => ERROR;
        IF w.first.state = active THEN noneActive _ FALSE;
        IF w.first.state # complete THEN allComplete _ FALSE;
        ENDLOOP;
      -- Make state transitions if possible.
      IF c.state = collecting AND (c.outcome = abort OR noneActive) THEN {
        c.state _ completing;
        IF c.outcome = unknown THEN c.outcome _ commit;
        LogCoordinatorCompleting[c: c, outcome: c.outcome,
          force: c.outcome = abort OR c.aWorkerBecameReady];
        };
      IF c.state = completing AND allComplete THEN {
        c.state _ complete;
        LogCoordinatorComplete[c];
        GOTO Done;
        };
      -- Issue new calls.
      FOR w: Coordinator.WorkerHandle _ c.workers, w.rest UNTIL w = NIL DO
        IF w.first.state # complete AND w.first.callInProgress = none AND
          (NOT w.first.communicationTrouble OR IsTimeForNextCall[w]) THEN {
          IF c.state = collecting AND w.first.state = active THEN {
            CoordinatorInternal.PassParms[[prepare, c, w, newTransID, commit--ignored--]];
            w.first.callInProgress _ prepare;
            }
          ELSE IF c.state = completing THEN {
            CoordinatorInternal.PassParms[[finish, c, w, nullTransID--ignored--,
              IF c.outcome = commit THEN commit ELSE abort]];
            w.first.callInProgress _ finish;
            };
          };
        ENDLOOP;
      -- wake up when a result is returned or by timeout
      WAIT c.resultsReturned;
      ENDLOOP;-- monster DO
    EXITS
      Done => {
        c.finishInProgress _ FALSE;
        -- Wake any second caller parked on finishComplete now, instead of leaving it
        -- to sleep out the condition's timeout.  The condition is shared by all
        -- coordinators; waiters recheck their own finishInProgress flag in a loop.
        BROADCAST finishComplete;
        CoordinatorMap.Unregister[c];
        IF newTransCoordinator # nullHandle THEN
          CoordinatorMap.Register[newTransCoordinator];
        RETURN [c.outcome, newTransCoordinator, reason]
        }
    }};--FinishEntry

  -- Waited on by a FinishEntry caller that finds a finish already in progress.
  -- Its timeout is set in the module initialization at the end of this program.
  finishComplete: CONDITION;

  -- TRUE once the current time has reached w.first.timeForNextCall.
  IsTimeForNextCall: PROC [w: Coordinator.WorkerHandle] RETURNS [BOOL] = INLINE {
    RETURN [BasicTime.Period[from: w.first.timeForNextCall, to: BasicTime.Now[]] >= 0]
    };

  -- Writes a coordinatorRegisterWorker record naming worker w.  Unless w is this
  -- server's own import handle, the record becomes c.forceRecord.
  LogCoordinatorRegisterWorker: PROC [
    c: Handle, w: AlpineImport.Handle] = {
    followingRecord: AlpineLog.RecordID;
    rec: Coordinator.RegisterWorkerLogRep _ [];
    ConvertUnsafe.AppendRope[to: LOOPHOLE[LONG[@rec.worker]], from: w.Name];
    [followingRecord: followingRecord] _ AlpineLog.CoordinatorWrite[c,
      coordinatorRegisterWorker, [base: @rec, length: TEXT[rec.length].SIZE]];
    IF NOT w.Equal[AlpineIdentity.myAlpineImportHandle] THEN
      c.forceRecord _ followingRecord;
    };

  -- Writes a coordinatorCompleting record carrying the decided outcome; the
  -- force argument is passed through to AlpineLog.CoordinatorWrite.
  LogCoordinatorCompleting: PROC [c: Handle,
    outcome: AlpineEnvironment.CommitOrAbort, force: BOOL] = {
    rec: Coordinator.CompletingLogRep _ [outcome: outcome];
    [followingRecord: c.forceRecord] _ AlpineLog.CoordinatorWrite[c,
      coordinatorCompleting, [base: @rec, length: Coordinator.CompletingLogRep.SIZE], force];
    };

  -- Writes a coordinatorComplete record (no data block).
  LogCoordinatorComplete: PROC [c: Handle] = {
    [followingRecord: c.forceRecord] _ AlpineLog.CoordinatorWrite[c,
      coordinatorComplete, AlpineLog.nullBlock];
    };

  -- Recovery analysis: a coordinatorBegin record re-creates and registers the coordinator.
  AnalyzeCoordinatorBegin: PROC [record: RecordID, type: RecordType, trans: TransID] = {
    c: Handle = zone.NEW[Coordinator.Object _ [transID: trans, beginRecord: record]];
    CoordinatorMap.Register[c];
    CoordinatorInternal.NoticeCoordinatorBegin[trans];
    };

  -- Recovery analysis: a coordinatorRegisterWorker record adds the named worker
  -- to the coordinator's volatile worker list.  Ignored if trans is not in the map.
  AnalyzeCoordinatorRegisterWorker: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    worker: AlpineImport.Handle;
    { -- get worker name from log, map it into a AlpineImport.Handle.
    rec: Coordinator.RegisterWorkerLogRep _ [];
    status: AlpineLog.ReadProcStatus;
    [status: status] _ AlpineLog.ReadForRecovery[thisRecord: record,
      to: [base: @rec, length: Coordinator.RegisterWorkerLogRep.SIZE]];
    IF status = destinationFull THEN ERROR InternalProgrammingError;
    worker _ AlpineImport.Register[
      server: ConvertUnsafe.ToRope[from: LOOPHOLE[LONG[@rec.worker]]]];
    };
    { -- map trans into a coordinator handle, then add worker to volatile state
    c: Handle = CoordinatorMap.GetHandle[trans];
    l: Coordinator.WorkerHandle;
    IF c = nullHandle THEN RETURN;
    IF c.state # active THEN ERROR;
    FOR l _ c.workers, l.rest UNTIL l = NIL DO
      IF worker.Equal[l.first.worker] THEN ERROR InternalProgrammingError;
      ENDLOOP;
    l _ zone.CONS[[worker: worker], c.workers];
    c.workers _ l
    };
    };

  -- Recovery analysis: a coordinatorCompleting record moves the coordinator to
  -- completing with the logged outcome; on commit every worker is marked ready.
  AnalyzeCoordinatorCompleting: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    outcome: AlpineEnvironment.CommitOrAbort;
    { -- get outcome from log
    rec: Coordinator.CompletingLogRep;
    status: AlpineLog.ReadProcStatus;
    [status: status] _ AlpineLog.ReadForRecovery[thisRecord: record,
      to: [base: @rec, length: Coordinator.CompletingLogRep.SIZE]];
    IF status # normal THEN ERROR InternalProgrammingError;
    outcome _ rec.outcome;
    };
    {
    c: Handle = CoordinatorMap.GetHandle[trans];
    IF c = nullHandle THEN RETURN;
    IF c.state # active THEN ERROR InternalProgrammingError;
    c.state _ completing;
    c.outcome _ outcome;
    IF c.outcome = commit THEN
      FOR l: Coordinator.WorkerHandle _ c.workers, l.rest UNTIL l = NIL DO
        l.first.state _ ready
        ENDLOOP;
    };
    };

  -- Recovery analysis: a coordinatorComplete record retires the coordinator.
  AnalyzeCoordinatorComplete: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    c: Handle = CoordinatorMap.GetHandle[trans];
    IF c = nullHandle THEN RETURN;
    IF c.state # completing THEN ERROR InternalProgrammingError;
    c.state _ complete;
    CoordinatorMap.Unregister[c];
    };

  -- CoordinatorControl.CallAfterAnalysisPass
  CallAfterAnalysisPass: PUBLIC PROC [] = {
    CoordinatorInternal.InitTransIDGenerator[AlpineIdentity.myLogVolumeID];
    };

  -- CoordinatorControl.CallAfterUpdatePass
  -- Forks a finish process for every coordinator still in the map: commit for
  -- those already completing with outcome commit, abort for all others.
  CallAfterUpdatePass: PUBLIC PROC [] = {
    EnumProc: PROC [c: Handle] RETURNS [stop: BOOL] = {
      requestedOutcome: AlpineTransaction.RequestedOutcome _ abort;
      SELECT c.state FROM
        complete => ERROR;
        active, collecting => NULL;
        completing => IF c.outcome = commit THEN requestedOutcome _ commit;
        ENDCASE => ERROR;
      Process.Detach[FORK CoordinatorFinishProcess[c, requestedOutcome]];
      RETURN [stop: FALSE];
      };
    CoordinatorMap.LockedEnumerate[EnumProc];
    };

  -- Body of the forked finish processes: runs FinishEntry without continuation
  -- and discards the results.
  CoordinatorFinishProcess: PROC [c: Handle,
    requestedOutcome: AlpineTransaction.RequestedOutcome] = {
    [] _ FinishEntry [c, requestedOutcome, FALSE];
    };

  -- Checkpoint callback: reports the oldest beginRecord among registered
  -- coordinators (AlpineLog.lastRecordID if there are none) for both results.
  CoordinatorCheckpointProc: PROC []
    RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
    oldestBeginRecord: AlpineLog.RecordID _ AlpineLog.lastRecordID;
    EnumProc: PROC [c: Handle] RETURNS [stop: BOOL] = {
      oldestBeginRecord _ LogInline.Min[c.beginRecord, oldestBeginRecord];
      RETURN [stop: FALSE];
      };
    CoordinatorMap.UnlockedEnumerate[EnumProc];
    RETURN [oldestBeginRecord, oldestBeginRecord];
    };

  -- Module initialization: hook the analysis and checkpoint procedures into the
  -- log machinery, and give finishComplete a 5 second timeout as a backstop.
  LogControl.RegisterAnalysisProc[coordinatorBegin, AnalyzeCoordinatorBegin];
  LogControl.RegisterAnalysisProc[coordinatorRegisterWorker, AnalyzeCoordinatorRegisterWorker];
  LogControl.RegisterAnalysisProc[coordinatorCompleting, AnalyzeCoordinatorCompleting];
  LogControl.RegisterAnalysisProc[coordinatorComplete, AnalyzeCoordinatorComplete];
  LogControl.RegisterCheckpointProc[CoordinatorCheckpointProc];
  Process.SetTimeout[@finishComplete, Process.SecondsToTicks[5]];
  Process.EnableAborts[@finishComplete];

  END.

CHANGE LOG. $CoordinatorImpl.mesa Copyright c 1984 by Xerox Corporation. All rights reserved. Carl Hauser, October 4, 1985 1:24:49 pm PDT Last edited by Taft on 1-Feb-82 13:16:03 Kolling on July 13, 1983 3:27 pm MBrown on January 30, 1984 12:08:04 pm PST Last Edited by: Kupfer, August 6, 1984 3:05:48 pm PDT NOTES: FORK CoordinatorFinishProcess[c, requestedOutcome] is not controlled (might try to fork too many) and not synchronized (outside call to Finish might get in). Simultaneous Finish calls should work better (now, second caller gets result = unknown).
The mechanism for limiting the number of coordinators is quick and dirty, should be monitored in a different way. AlpineInternal.CoordinatorObject ! AlpineTransaction.OperationFailed[why: busy]; AlpineTransaction.Create. Called by any Alpine client. As an agent of the client who called us, create a worker. Under the current implementation (coordinator and worker always on the same machine), we really oughta know our own RName and the current transaction ID. However, in the glorious future, it is possible that the (remote) server won't know about the transaction yet (or that the server crashed), and it's possible that Grapevine is too busy to tell the server our RName. So if Unknown is raised, convert it to a server-busy error, but record what went wrong via SkiPatrolLog. Process.EnableAborts[@newTrans.resultsReturned]; ! (none); AlpineTransMgr.RegisterWorker. The caller is a worker (identity obtainable via "conversation") with no monitors locked. c # nullHandle. Creates a new transaction, and simulates RegisterWorker for each worker of existing transaction c. Returns the new transaction, without registering it. ! (none) (but may return "unknown" as a result.) AlpineTransaction.Finish If we asked to continue the transaction but it aborted on us, we have to clean up (well, I think that's what happens next). I guess this means that logged transaction starts might occasionally skip transaction ID's. If we really did create a new transaction, then record the user name in the coordinator object and note the new transaction (if the probe is enabled). (the owner name is given as a message because the transaction has already been unregistered, so the SkiPatrolLog routine can't get at the handle) ! (none); wait for finish to complete, then return unknown normal-- FinishEntry has been called from recovery-- FinishEntry has been called from two nearly simultaneous calls to Finish-- General two-phase commit protocol. c.state IN [collecting .. 
completing] Try to make progress toward c.state = complete. Get results from previous calls, if any. Make state transitions if possible. Issue new calls. wake up when a result is returned or by timeout CoordinatorControl.CallAfterAnalysisPass CoordinatorControl.CallAfterUpdatePass Edited June 1984, by Kupfer changes to: Create: record the RName of the user creating the transaction. Add event logging for transaction starts. Edited on July 1, 1984 4:11:19 pm PDT, by Kupfer Be sure to event-log transactions which are created by Finish[]ing old one. Also, remember to record the RName for transactions that are created that way. Edited on July 25, 1984 9:22:20 am PDT, by Kupfer Install new SkiPatrolLog probes and reflect the new version of SkiPatrolLog. Also, be more rigorous about the possible ERRORs that CreateWorker might return. Also, make maxCoordinators really refer to CoordinatorInternal. changes to: CoordinatorImpl, Create, FinishEntry, Finish, AbortReason Edited on August 6, 1984 3:04:59 pm PDT, by Kupfer (1) Let the SkiPatrolLog routine get the user RName from the transaction ID, where possible. (2) Remove the possible race condition in SkiPatrolLog probes by assigning the PROC to a temporary variable. changes to: Create, Finish, CoordinatorImpl Carl Hauser, October 4, 1985 1:24:32 pm PDT Change "Log" to "AlpineLog" ΚQ˜Icodešœ™šœ Οmœ1™K˜——Kšžœ.˜3K˜—K˜—K˜K˜—šŸœžœžœ ˜-Kšœžœ˜5K˜IKšœ0™0Kšœ+ ˜DKšžœ˜K˜—šŸœžœžœ˜K˜,Kšžœ*˜1Kšœ ™ Kšœ™KšœX™XšŸœžœžœ ˜+Kšžœ*˜1Kšœ™K˜Kšžœžœžœ˜1K˜@šžœ1žœžœž˜DKšžœžœžœ˜šœ ˜ Kšœ%˜%Kšœ ˜ Kšœ ˜ K˜——K˜—šžœ ž˜˜ Jšœ žœ%˜2šžœ4žœž˜>šœ ˜ K˜Kšœ ˜ K˜ K˜(K˜——K˜—˜ Jšœ žœ&˜3šžœ5žœž˜?šœ ˜ K˜Kšœ ˜ K˜(K˜—Kšœ‘™‘—K˜—Jšžœ ˜—šžœ žœ"ž˜9Kšœ žœ˜-—K˜K˜—šŸ œžœžœ˜K˜ K˜5Kšœ žœ˜Kšžœ5˜