-- CoordinatorImpl.mesa
-- Last edited by
--   MBrown on January 30, 1984 12:08:04 pm PST
--   Taft on 1-Feb-82 13:16:03
--   Kolling on July 13, 1983 3:27 pm

-- NOTES:
--   FORK CoordinatorFinishProcess[c, requestedOutcome] is not controlled (might try to fork
--     too many) and not synchronized (outside call to Finish might get in).
--   Simultaneous Finish calls should work better (now, second caller gets result = unknown).
--   The mechanism for limiting the number of coordinators is quick and dirty, should be
--     monitored in a different way.

DIRECTORY
  AlpineEnvironment,
  AlpineIdentity USING [myLogVolumeID, myAlpineImportHandle, myFileStore],
  AlpineImport,
  AlpineInternal,
  AlpineTransaction,
  AlpineTransMgr,
  AlpineTransMgrRpcControl,
  BasicTime,
  ClientMap,
  ConcreteTransID,
  ConvertUnsafe,
  Coordinator,
  CoordinatorControl,
  CoordinatorInternal,
  CoordinatorMap,
  Log,
  LogControl,
  LogInline,
  Process USING [Detach, EnableAborts, SetTimeout, SecondsToTicks, Ticks],
  RPC,
  SafeStorage;

CoordinatorImpl: MONITOR LOCKS c USING c: Handle
  IMPORTS
    AlpineIdentity, AlpineImport, AlpineTransaction, BasicTime, ClientMap, ConvertUnsafe,
    CoordinatorInternal, CoordinatorMap, Log, LogControl, LogInline, Process, SafeStorage
  EXPORTS
    AlpineInternal, AlpineTransaction, AlpineTransMgr, CoordinatorControl =
  BEGIN

  -- Local abbreviations for types and constants defined in other interfaces.
  Conversation: TYPE = AlpineEnvironment.Conversation;
  TransID: TYPE = AlpineEnvironment.TransID;
  nullTransID: TransID = AlpineEnvironment.nullTransID;
  RecordID: TYPE = Log.RecordID;
  RecordType: TYPE = Log.RecordType;
  Results: TYPE = Coordinator.Results;

  -- Raised when log analysis finds a state this module believes cannot occur.
  InternalProgrammingError: ERROR = CODE;

  Handle: TYPE = Coordinator.Handle;
  nullHandle: Handle = Coordinator.nullHandle;
  CoordinatorObject: PUBLIC TYPE = Coordinator.Object;
    -- AlpineInternal.CoordinatorObject

  -- Delay, in seconds, before a worker that had a communication failure is called again.
  secondsWaitAfterCommFailure: INT = CoordinatorInternal.SecondsWaitAfterCommFailure;
  -- Timeout installed on each coordinator's resultsReturned condition.
  timeoutForResultsReturned: Process.Ticks =
    Process.SecondsToTicks[CoordinatorInternal.SecondsTimeoutForResultsReturned];
  -- Create refuses new transactions once CoordinatorMap.Count reaches this value.
  maxCoordinators: INT _ 40 --CoordinatorInternal.MaxCoordinators--;

  zone: ZONE = SafeStorage.GetSystemZone[];


  Create: PUBLIC PROC [conversation: Conversation, createLocalWorker: BOOL]
    RETURNS [transID: AlpineTransaction.TransID] = {
    -- ! AlpineTransaction.OperationFailed[why: busy];
    -- AlpineTransaction.Create.
    -- Called by any Alpine client.
    -- Makes and registers a new coordinator and, if createLocalWorker, asks for a worker
    -- on AlpineIdentity.myFileStore.  Returns the new transaction's ID.
    c: Handle;
    workerCreated: BOOL _ TRUE;
    IF CoordinatorMap.Count[] >= maxCoordinators THEN GOTO fail;
    c _ ConsCoordinator[];
    CoordinatorMap.Register[c];
    IF createLocalWorker THEN
      AlpineTransaction.CreateWorker[conversation, c.transID, AlpineIdentity.myFileStore !
        AlpineTransaction.Unknown => { workerCreated _ FALSE; CONTINUE }];
    IF NOT workerCreated THEN {
      -- The coordinator is already in CoordinatorMap.  Abort it (FinishEntry unregisters
      -- it on completion) so that it does not stay registered forever and count against
      -- maxCoordinators.  This is the same way Finish discards an unwanted new transaction.
      [] _ FinishEntry[c, abort, FALSE];
      GOTO fail;
      };
    RETURN [c.transID];
    EXITS
      fail => ERROR AlpineTransaction.OperationFailed[why: busy];
    };

  ConsCoordinator: PROC [] RETURNS [Handle] = {
    -- Allocates a coordinator object with default field values, installs the timeout on
    -- its resultsReturned condition, and assigns it a transaction ID.
    -- The result is not entered in CoordinatorMap; that is left to the caller.
    newTrans: Handle = zone.NEW[Coordinator.Object _ []];
    Process.SetTimeout[@newTrans.resultsReturned, timeoutForResultsReturned];
    --Process.EnableAborts[@newTrans.resultsReturned];
    CoordinatorInternal.NextTransID[newTrans]; --writes coordinatorBegin
    RETURN [newTrans]
    };

  RegisterWorker: PUBLIC PROC [conversation: Conversation, trans: TransID]
    RETURNS [AlpineTransMgr.RegisterWorkerResult] = {
    -- ! (none);
    -- AlpineTransMgr.RegisterWorker.
    -- The caller is a worker (identity obtainable via "conversation") with no monitors
    -- locked.
    -- Returns transNotActive if trans has no coordinator or the coordinator is not in the
    -- active state, duplicateCall if this worker is already on the list, ok otherwise.
    RegisterWorkerEntry: ENTRY PROC [c: Handle]
      RETURNS [AlpineTransMgr.RegisterWorkerResult] = {
      -- c # nullHandle.
      worker: AlpineImport.Handle;
      IF c.state # active THEN RETURN [transNotActive];
      worker _ AlpineImport.Register[ClientMap.GetName[conversation]];
      FOR l: Coordinator.WorkerHandle _ c.workers, l.rest UNTIL l = NIL DO
        IF worker.Equal[l.first.worker] THEN RETURN [duplicateCall];
        ENDLOOP;
      ConsWorker[c, worker];
      RETURN [ok];
      };--RegisterWorkerEntry
    c: Handle = CoordinatorMap.GetHandle[trans];
    IF c = nullHandle THEN RETURN [transNotActive];
    RETURN [RegisterWorkerEntry[c]];
    };

  ConsWorker: PROC [c: Handle, w: AlpineImport.Handle] = {
    -- Pushes w onto the front of c.workers and logs the registration.
    l: Coordinator.WorkerHandle _ zone.CONS[[worker: w], c.workers];
    c.workers _ l;
    LogCoordinatorRegisterWorker[c, w];
    };

  CreateWithWorkers: INTERNAL PROC [c: Handle] RETURNS [Handle] = {
    -- Creates a new transaction, and simulates RegisterWorker for each worker
    -- of existing transaction c.  Returns the new transaction, without registering it.
    newTrans: Handle = ConsCoordinator[];
    FOR l: Coordinator.WorkerHandle _ c.workers, l.rest UNTIL l = NIL DO
      ConsWorker[newTrans, l.first.worker];
      ENDLOOP;
    RETURN [newTrans]
    };

  Finish: PUBLIC PROC [
    conversation: RPC.Conversation, transID: AlpineTransaction.TransID,
    requestedOutcome: AlpineTransaction.RequestedOutcome, continue: BOOL]
    RETURNS [outcome: AlpineTransaction.Outcome, newTrans: AlpineTransaction.TransID] = {
    -- ! none (but may return "unknown" as a result.)
    -- AlpineTransaction.Finish
    -- Returns [unknown, nullTransID] if transID has no coordinator.  newTrans is non-null
    -- only when continue was requested and the outcome was not abort.
    newTransCoordinator: Handle;
    c: Handle = CoordinatorMap.GetHandle[trans: transID];
    IF c = nullHandle THEN RETURN [unknown, nullTransID];
    -- A requested abort never continues into a new transaction.
    IF requestedOutcome = abort THEN continue _ FALSE;
    [outcome, newTransCoordinator] _ FinishEntry[c, requestedOutcome, continue];
    IF continue AND outcome = abort AND newTransCoordinator # nullHandle THEN {
      -- The commit turned into an abort, so the continuation transaction that FinishEntry
      -- created and registered is aborted too and is not reported to the caller.
      [] _ FinishEntry[newTransCoordinator, abort, FALSE];
      newTransCoordinator _ nullHandle;
      };
    RETURN [outcome,
      IF newTransCoordinator = nullHandle THEN nullTransID ELSE newTransCoordinator.transID]
    };

  FinishEntry: ENTRY PROC [
    c: Handle, requestedOutcome: AlpineTransaction.RequestedOutcome, continue: BOOL]
    RETURNS [AlpineTransaction.Outcome, Handle] = {
    -- ! (none);
    -- Drives coordinator c to the complete state and returns its outcome together with
    -- the continuation coordinator (nullHandle unless continue was requested).
    -- If another FinishEntry is already running on c, waits for it to end and returns
    -- [unknown, nullHandle].
    newTransCoordinator: Handle _ nullHandle;
    newTransID: TransID _ nullTransID;
    {
    IF c.finishInProgress THEN {
      -- wait for finish to complete, then return unknown
      WHILE c.finishInProgress DO WAIT finishComplete; ENDLOOP;
      RETURN [unknown, nullHandle];
      };
    c.finishInProgress _ TRUE;
    SELECT c.state FROM
      active => NULL; --normal--
      collecting, completing => NULL; --FinishEntry has been called from recovery--
      complete => GOTO Done; --FinishEntry has been called from two nearly simultaneous calls to Finish--
      ENDCASE => ERROR;
    IF requestedOutcome = abort THEN c.outcome _ abort;
    IF continue THEN {
      newTransCoordinator _ CreateWithWorkers[c];
      newTransID _ newTransCoordinator.transID;
      };
    -- General two-phase commit protocol.
    IF c.state = active THEN {
      Log.Force[followingRecord: c.forceRecord];
      c.state _ collecting;
      };
    -- c.state IN [collecting .. completing]
    DO
      -- Try to make progress toward c.state = complete.
      noneActive: BOOL _ TRUE;
      allComplete: BOOL _ TRUE;
      -- Get results from previous calls, if any.
      FOR w: Coordinator.WorkerHandle _ c.workers, w.rest UNTIL w = NIL DO
        WITH w.first.resultsOfMostRecentCall SELECT FROM
          n: Results.none => { };
          p: Results.prepare => {
            -- Consume the result so it is processed only once.
            w.first.resultsOfMostRecentCall _ [none, none[]];
            w.first.lastPrepareResult _ p;
            SELECT p.communicationError FROM
              none => {
                w.first.communicationTrouble _ FALSE;
                SELECT p.prepareResult FROM
                  notReady => { --vote no--
                    w.first.state _ complete;
                    IF c.outcome = commit THEN ERROR;
                    c.outcome _ abort;
                    };
                  readOnlyReady => { w.first.state _ complete };
                  ready => {
                    w.first.state _ ready;
                    c.aWorkerBecameReady _ TRUE;
                    };
                  ENDCASE => ERROR;
                };
              bindingFailed, callFailed, busy => { --vote no--
                IF c.outcome = commit THEN ERROR;
                c.outcome _ abort;
                w.first.communicationTrouble _ TRUE;
                w.first.timeForNextCall _ BasicTime.Update[
                  base: BasicTime.Now[], period: secondsWaitAfterCommFailure];
                };
              ENDCASE => ERROR;
            };
          f: Results.finish => {
            -- Consume the result so it is processed only once.
            w.first.resultsOfMostRecentCall _ [none, none[]];
            w.first.lastFinishResult _ f;
            SELECT f.communicationError FROM
              none => {
                w.first.communicationTrouble _ FALSE;
                w.first.state _ complete;
                };
              bindingFailed, callFailed, busy => {
                -- The worker stays incomplete; the call is retried after the delay.
                w.first.communicationTrouble _ TRUE;
                w.first.timeForNextCall _ BasicTime.Update[
                  base: BasicTime.Now[], period: secondsWaitAfterCommFailure];
                };
              ENDCASE => ERROR;
            };
          ENDCASE => ERROR;
        IF w.first.state = active THEN noneActive _ FALSE;
        IF w.first.state # complete THEN allComplete _ FALSE;
        ENDLOOP;
      -- Make state transitions if possible.
      IF c.state = collecting AND (c.outcome = abort OR noneActive) THEN {
        c.state _ completing;
        IF c.outcome = unknown THEN c.outcome _ commit;
        LogCoordinatorCompleting[c: c, outcome: c.outcome,
          force: c.outcome = abort OR c.aWorkerBecameReady];
        };
      IF c.state = completing AND allComplete THEN {
        c.state _ complete;
        LogCoordinatorComplete[c];
        GOTO Done;
        };
      -- Issue new calls.
      FOR w: Coordinator.WorkerHandle _ c.workers, w.rest UNTIL w = NIL DO
        IF w.first.state # complete AND w.first.callInProgress = none AND
          (NOT w.first.communicationTrouble OR IsTimeForNextCall[w]) THEN {
          IF c.state = collecting AND w.first.state = active THEN {
            CoordinatorInternal.PassParms[[prepare, c, w, newTransID, commit--ignored--]];
            w.first.callInProgress _ prepare;
            }
          ELSE IF c.state = completing THEN {
            CoordinatorInternal.PassParms[[finish, c, w, nullTransID--ignored--,
              IF c.outcome = commit THEN commit ELSE abort]];
            w.first.callInProgress _ finish;
            };
          };
        ENDLOOP;
      WAIT c.resultsReturned; -- wake up when a result is returned or by timeout
      ENDLOOP;
    EXITS
      Done => {
        c.finishInProgress _ FALSE;
        -- Wake any caller waiting above for this finish to end, so that it need not
        -- sleep until the timeout on finishComplete expires.  The condition is shared by
        -- all coordinators; waiters on other coordinators re-test their own
        -- finishInProgress flag and wait again.
        BROADCAST finishComplete;
        CoordinatorMap.Unregister[c];
        IF newTransCoordinator # nullHandle THEN
          CoordinatorMap.Register[newTransCoordinator];
        RETURN [c.outcome, newTransCoordinator]
        }
    }};--FinishEntry

  -- Waited on by FinishEntry callers that find a finish already in progress.
  -- One condition serves all coordinators; its timeout is set in the module
  -- initialization at the end of this file.
  finishComplete: CONDITION;

  IsTimeForNextCall: PROC [w: Coordinator.WorkerHandle] RETURNS [BOOL] = INLINE {
    -- TRUE when the current time has reached w.first.timeForNextCall.
    RETURN [BasicTime.Period[from: w.first.timeForNextCall, to: BasicTime.Now[]] >= 0]
    };

  LogCoordinatorRegisterWorker: PROC [c: Handle, w: AlpineImport.Handle] = {
    -- Writes a coordinatorRegisterWorker record carrying the name of w.
    -- c.forceRecord is advanced only when w is not AlpineIdentity.myAlpineImportHandle.
    followingRecord: Log.RecordID;
    rec: Coordinator.RegisterWorkerLogRep _ [];
    ConvertUnsafe.AppendRope[to: LOOPHOLE[LONG[@rec.worker]], from: w.Name];
    [followingRecord: followingRecord] _ Log.CoordinatorWrite[c, coordinatorRegisterWorker,
      [base: @rec, length: TEXT[rec.length].SIZE]];
    IF NOT w.Equal[AlpineIdentity.myAlpineImportHandle] THEN
      c.forceRecord _ followingRecord;
    };

  LogCoordinatorCompleting: PROC [
    c: Handle, outcome: AlpineEnvironment.CommitOrAbort, force: BOOL] = {
    -- Writes a coordinatorCompleting record holding outcome, passing force through to
    -- Log.CoordinatorWrite, and stores the returned followingRecord in c.forceRecord.
    rec: Coordinator.CompletingLogRep _ [outcome: outcome];
    [followingRecord: c.forceRecord] _ Log.CoordinatorWrite[c, coordinatorCompleting,
      [base: @rec, length: Coordinator.CompletingLogRep.SIZE], force];
    };

  LogCoordinatorComplete: PROC [c: Handle] = {
    -- Writes a coordinatorComplete record with no body.
    [followingRecord: c.forceRecord] _
      Log.CoordinatorWrite[c, coordinatorComplete, Log.nullBlock];
    };


  -- Log analysis procedures, registered with LogControl in the module initialization.

  AnalyzeCoordinatorBegin: PROC [record: RecordID, type: RecordType, trans: TransID] = {
    -- Rebuilds and registers the coordinator for trans from its coordinatorBegin record.
    c: Handle = zone.NEW[Coordinator.Object _ [transID: trans, beginRecord: record]];
    CoordinatorMap.Register[c];
    CoordinatorInternal.NoticeCoordinatorBegin[trans];
    };

  AnalyzeCoordinatorRegisterWorker: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    -- Re-adds a logged worker to the coordinator for trans.  Does nothing if trans has
    -- no coordinator in CoordinatorMap.
    worker: AlpineImport.Handle;
    { -- get worker name from log, map it into a AlpineImport.Handle.
    rec: Coordinator.RegisterWorkerLogRep _ [];
    status: Log.ReadProcStatus;
    [status: status] _ Log.ReadForRecovery[thisRecord: record,
      to: [base: @rec, length: Coordinator.RegisterWorkerLogRep.SIZE]];
    IF status = destinationFull THEN ERROR InternalProgrammingError;
    worker _ AlpineImport.Register[
      server: ConvertUnsafe.ToRope[from: LOOPHOLE[LONG[@rec.worker]]]];
    };
    { -- map trans into a coordinator handle, then add worker to volatile state
    c: Handle = CoordinatorMap.GetHandle[trans];
    l: Coordinator.WorkerHandle;
    IF c = nullHandle THEN RETURN;
    -- Same named error as the other analysis procedures use for an unexpected state.
    IF c.state # active THEN ERROR InternalProgrammingError;
    FOR l _ c.workers, l.rest UNTIL l = NIL DO
      IF worker.Equal[l.first.worker] THEN ERROR InternalProgrammingError;
      ENDLOOP;
    -- Unlike ConsWorker, no log record is written here.
    l _ zone.CONS[[worker: worker], c.workers];
    c.workers _ l
    };
    };

  AnalyzeCoordinatorCompleting: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    -- Moves the coordinator for trans to the completing state with the logged outcome.
    -- Does nothing if trans has no coordinator in CoordinatorMap.
    outcome: AlpineEnvironment.CommitOrAbort;
    { -- get outcome from log
    rec: Coordinator.CompletingLogRep;
    status: Log.ReadProcStatus;
    [status: status] _ Log.ReadForRecovery[thisRecord: record,
      to: [base: @rec, length: Coordinator.CompletingLogRep.SIZE]];
    IF status # normal THEN ERROR InternalProgrammingError;
    outcome _ rec.outcome;
    };
    {
    c: Handle = CoordinatorMap.GetHandle[trans];
    IF c = nullHandle THEN RETURN;
    IF c.state # active THEN ERROR InternalProgrammingError;
    c.state _ completing;
    c.outcome _ outcome;
    -- For a commit, every worker is marked ready.
    IF c.outcome = commit THEN
      FOR l: Coordinator.WorkerHandle _ c.workers, l.rest UNTIL l = NIL DO
        l.first.state _ ready
        ENDLOOP;
    };
    };

  AnalyzeCoordinatorComplete: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    -- Marks the coordinator for trans complete and removes it from CoordinatorMap.
    -- Does nothing if trans has no coordinator in CoordinatorMap.
    c: Handle = CoordinatorMap.GetHandle[trans];
    IF c = nullHandle THEN RETURN;
    IF c.state # completing THEN ERROR InternalProgrammingError;
    c.state _ complete;
    CoordinatorMap.Unregister[c];
    };

  CallAfterAnalysisPass: PUBLIC PROC [] = {
    -- CoordinatorControl.CallAfterAnalysisPass
    CoordinatorInternal.InitTransIDGenerator[AlpineIdentity.myLogVolumeID];
    };

  CallAfterUpdatePass: PUBLIC PROC [] = {
    -- CoordinatorControl.CallAfterUpdatePass
    -- Forks a detached finishing process for every coordinator in CoordinatorMap.
    -- A coordinator is finished with commit only if it is in the completing state with
    -- outcome commit; every other coordinator is finished with abort.
    EnumProc: PROC [c: Handle] RETURNS [stop: BOOL] = {
      requestedOutcome: AlpineTransaction.RequestedOutcome _ abort;
      SELECT c.state FROM
        complete => ERROR;
        active, collecting => NULL;
        completing => IF c.outcome = commit THEN requestedOutcome _ commit;
        ENDCASE => ERROR;
      Process.Detach[FORK CoordinatorFinishProcess[c, requestedOutcome]];
      RETURN [stop: FALSE];
      };
    CoordinatorMap.LockedEnumerate[EnumProc];
    };

  CoordinatorFinishProcess: PROC [
    c: Handle, requestedOutcome: AlpineTransaction.RequestedOutcome] = {
    -- Body of the processes forked by CallAfterUpdatePass; the result is discarded.
    [] _ FinishEntry [c, requestedOutcome, FALSE];
    };

  CoordinatorCheckpointProc: PROC []
    RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
    -- Returns, for both results, the minimum beginRecord over the coordinators in
    -- CoordinatorMap, or Log.lastRecordID if none has a smaller one.
    oldestBeginRecord: Log.RecordID _ Log.lastRecordID;
    EnumProc: PROC [c: Handle] RETURNS [stop: BOOL] = {
      oldestBeginRecord _ LogInline.Min[c.beginRecord, oldestBeginRecord];
      RETURN [stop: FALSE];
      };
    CoordinatorMap.UnlockedEnumerate[EnumProc];
    RETURN [oldestBeginRecord, oldestBeginRecord];
    };


  -- Module initialization: register the analysis and checkpoint procedures, then
  -- configure the shared finishComplete condition.
  LogControl.RegisterAnalysisProc[coordinatorBegin, AnalyzeCoordinatorBegin];
  LogControl.RegisterAnalysisProc[coordinatorRegisterWorker, AnalyzeCoordinatorRegisterWorker];
  LogControl.RegisterAnalysisProc[coordinatorCompleting, AnalyzeCoordinatorCompleting];
  LogControl.RegisterAnalysisProc[coordinatorComplete, AnalyzeCoordinatorComplete];
  LogControl.RegisterCheckpointProc[CoordinatorCheckpointProc];
  Process.SetTimeout[@finishComplete, Process.SecondsToTicks[5]];
  Process.EnableAborts[@finishComplete];
  END.