-- CoordinatorImpl.mesa
-- Last edited by
-- MBrown on January 30, 1984 12:08:04 pm PST
-- Taft on 1-Feb-82 13:16:03
-- Kolling on July 13, 1983 3:27 pm
-- NOTES:
-- FORK CoordinatorFinishProcess[c, requestedOutcome] is not controlled (might try to fork
--too many) and not synchronized (outside call to Finish might get in).
-- Simultaneous Finish calls should work better (now, second caller gets result = unknown).
-- The mechanism for limiting the number of coordinators is quick and dirty, should be
--monitored in a different way.
DIRECTORY
AlpineEnvironment,
AlpineIdentity USING [myLogVolumeID, myAlpineImportHandle, myFileStore],
AlpineImport,
AlpineInternal,
AlpineTransaction,
AlpineTransMgr,
AlpineTransMgrRpcControl,
BasicTime,
ClientMap,
ConcreteTransID,
ConvertUnsafe,
Coordinator,
CoordinatorControl,
CoordinatorInternal,
CoordinatorMap,
Log,
LogControl,
LogInline,
Process USING [Detach, EnableAborts, SetTimeout, SecondsToTicks, Ticks],
RPC,
SafeStorage;
CoordinatorImpl: MONITOR LOCKS c USING c: Handle
IMPORTS
AlpineIdentity,
AlpineImport,
AlpineTransaction,
BasicTime,
ClientMap,
ConvertUnsafe,
CoordinatorInternal,
CoordinatorMap,
Log,
LogControl,
LogInline,
Process,
SafeStorage
EXPORTS
AlpineInternal,
AlpineTransaction,
AlpineTransMgr,
CoordinatorControl
= BEGIN
Conversation: TYPE = AlpineEnvironment.Conversation;
TransID: TYPE = AlpineEnvironment.TransID;
nullTransID: TransID = AlpineEnvironment.nullTransID;
RecordID: TYPE = Log.RecordID;
RecordType: TYPE = Log.RecordType;
Results: TYPE = Coordinator.Results;
InternalProgrammingError: ERROR = CODE;
Handle: TYPE = Coordinator.Handle;
nullHandle: Handle = Coordinator.nullHandle;
CoordinatorObject: PUBLIC TYPE = Coordinator.Object;
-- AlpineInternal.CoordinatorObject
secondsWaitAfterCommFailure: INT = CoordinatorInternal.SecondsWaitAfterCommFailure;
timeoutForResultsReturned: Process.Ticks =
Process.SecondsToTicks[CoordinatorInternal.SecondsTimeoutForResultsReturned];
maxCoordinators: INT ← 40 --CoordinatorInternal.MaxCoordinators--;
zone: ZONE = SafeStorage.GetSystemZone[];
Create: PUBLIC PROC [conversation: Conversation, createLocalWorker: BOOL]
RETURNS [transID: AlpineTransaction.TransID] = {
-- ! AlpineTransaction.OperationFailed[why: busy];
-- AlpineTransaction.Create.
-- Called by any Alpine client.
c: Handle;
IF CoordinatorMap.Count[] >= maxCoordinators THEN GOTO fail;
c ← ConsCoordinator[];
CoordinatorMap.Register[c];
IF createLocalWorker THEN
AlpineTransaction.CreateWorker[conversation, c.transID, AlpineIdentity.myFileStore !
AlpineTransaction.Unknown => GOTO fail];
RETURN [c.transID];
EXITS
fail => ERROR AlpineTransaction.OperationFailed[why: busy];
};
ConsCoordinator: PROC [] RETURNS [Handle] = {
newTrans: Handle = zone.NEW[Coordinator.Object ← []];
Process.SetTimeout[@newTrans.resultsReturned, timeoutForResultsReturned];
--Process.EnableAborts[@newTrans.resultsReturned];
CoordinatorInternal.NextTransID[newTrans]; --writes coordinatorBegin
RETURN [newTrans] };
RegisterWorker: PUBLIC PROC [
conversation: Conversation, trans: TransID]
RETURNS [AlpineTransMgr.RegisterWorkerResult] = {
-- ! (none);
-- AlpineTransMgr.RegisterWorker.
-- The caller is a worker (identity obtainable via "conversation") with no monitors
--locked.
RegisterWorkerEntry: ENTRY PROC [c: Handle]
RETURNS [AlpineTransMgr.RegisterWorkerResult] = {
-- c # nullHandle.
worker: AlpineImport.Handle;
IF c.state # active THEN RETURN [transNotActive];
worker ← AlpineImport.Register[ClientMap.GetName[conversation]];
FOR l: Coordinator.WorkerHandle ← c.workers, l.rest UNTIL l = NIL DO
IF worker.Equal[l.first.worker] THEN RETURN [duplicateCall];
ENDLOOP;
ConsWorker[c, worker];
RETURN [ok];
};--RegisterWorkerEntry
c: Handle = CoordinatorMap.GetHandle[trans];
IF c = nullHandle THEN RETURN [transNotActive];
RETURN [RegisterWorkerEntry[c]];
};
ConsWorker: PROC [c: Handle, w: AlpineImport.Handle] = {
l: Coordinator.WorkerHandle ← zone.CONS[[worker: w], c.workers];
c.workers ← l;
LogCoordinatorRegisterWorker[c, w];
};
CreateWithWorkers: INTERNAL PROC [c: Handle]
RETURNS [Handle] = {
-- Creates a new transaction, and simulates RegisterWorker for each worker
--of existing transaction c. Returns the new transaction, without registering it.
newTrans: Handle = ConsCoordinator[];
FOR l: Coordinator.WorkerHandle ← c.workers, l.rest UNTIL l = NIL DO
ConsWorker[newTrans, l.first.worker];
ENDLOOP;
RETURN [newTrans]
};
Finish: PUBLIC PROC [
conversation: RPC.Conversation, transID: AlpineTransaction.TransID,
requestedOutcome: AlpineTransaction.RequestedOutcome,
continue: BOOL]
RETURNS [outcome: AlpineTransaction.Outcome, newTrans: AlpineTransaction.TransID] = {
-- ! none (but may return "unknown" as a result.)
-- AlpineTransaction.Finish
newTransCoordinator: Handle;
c: Handle = CoordinatorMap.GetHandle[trans: transID];
IF c = nullHandle THEN RETURN [unknown, nullTransID];
IF requestedOutcome = abort THEN continue ← FALSE;
[outcome, newTransCoordinator] ← FinishEntry[c, requestedOutcome, continue];
IF continue AND outcome = abort
AND newTransCoordinator # nullHandle THEN {
[] ← FinishEntry[newTransCoordinator, abort, FALSE];
newTransCoordinator ← nullHandle };
RETURN [outcome, IF newTransCoordinator = nullHandle THEN
nullTransID ELSE newTransCoordinator.transID]
};
FinishEntry: ENTRY PROC [
c: Handle,
requestedOutcome: AlpineTransaction.RequestedOutcome,
continue: BOOL]
RETURNS [AlpineTransaction.Outcome, Handle] = {
-- ! (none);
newTransCoordinator: Handle ← nullHandle;
newTransID: TransID ← nullTransID;
{
IF c.finishInProgress THEN {
-- wait for finish to complete, then return unknown
WHILE c.finishInProgress DO
WAIT finishComplete;
ENDLOOP;
RETURN [unknown, nullHandle];
};
c.finishInProgress ← TRUE;
SELECT c.state FROM
active => NULL;
--normal--
collecting, completing => NULL;
--FinishEntry has been called from recovery--
complete => GOTO Done;
--FinishEntry has been called from two nearly simultaneous calls to Finish--
ENDCASE => ERROR;
IF requestedOutcome = abort THEN c.outcome ← abort;
IF continue THEN {
newTransCoordinator ← CreateWithWorkers[c];
newTransID ← newTransCoordinator.transID;
};
-- General two-phase commit protocol.
IF c.state = active THEN {
Log.Force[followingRecord: c.forceRecord];
c.state ← collecting;
};
-- c.state IN [collecting .. completing]
DO
-- Try to make progress toward c.state = complete.
noneActive: BOOL ← TRUE;
allComplete: BOOL ← TRUE;
-- Get results from previous calls, if any.
FOR w: Coordinator.WorkerHandle ← c.workers, w.rest UNTIL w = NIL DO
WITH w.first.resultsOfMostRecentCall SELECT FROM
n: Results.none => { };
p: Results.prepare => {
w.first.resultsOfMostRecentCall ← [none, none[]];
w.first.lastPrepareResult ← p;
SELECT p.communicationError FROM
none => {
w.first.communicationTrouble ← FALSE;
SELECT p.prepareResult FROM
notReady => { --vote no--
w.first.state ← complete;
IF c.outcome = commit THEN ERROR;
c.outcome ← abort;
};
readOnlyReady => { w.first.state ← complete };
ready => {
w.first.state ← ready;
c.aWorkerBecameReady ← TRUE;
};
ENDCASE => ERROR;
};
bindingFailed, callFailed, busy => { --vote no--
IF c.outcome = commit THEN ERROR;
c.outcome ← abort;
w.first.communicationTrouble ← TRUE;
w.first.timeForNextCall ← BasicTime.Update[
base: BasicTime.Now[], period: secondsWaitAfterCommFailure];
};
ENDCASE => ERROR;
};
f: Results.finish => {
w.first.resultsOfMostRecentCall ← [none, none[]];
w.first.lastFinishResult ← f;
SELECT f.communicationError FROM
none => {
w.first.communicationTrouble ← FALSE;
w.first.state ← complete;
};
bindingFailed, callFailed, busy => {
w.first.communicationTrouble ← TRUE;
w.first.timeForNextCall ← BasicTime.Update[
base: BasicTime.Now[], period: secondsWaitAfterCommFailure];
};
ENDCASE => ERROR;
};
ENDCASE => ERROR;
IF w.first.state = active THEN noneActive ← FALSE;
IF w.first.state # complete THEN allComplete ← FALSE;
ENDLOOP;
-- Make state transitions if possible.
IF c.state = collecting AND (c.outcome = abort OR noneActive) THEN {
c.state ← completing;
IF c.outcome = unknown THEN c.outcome ← commit;
LogCoordinatorCompleting[c: c, outcome: c.outcome,
force: c.outcome = abort OR c.aWorkerBecameReady];
};
IF c.state = completing AND allComplete THEN {
c.state ← complete;
LogCoordinatorComplete[c];
GOTO Done;
};
-- Issue new calls.
FOR w: Coordinator.WorkerHandle ← c.workers, w.rest UNTIL w = NIL DO
IF w.first.state # complete AND w.first.callInProgress = none AND
(NOT w.first.communicationTrouble OR IsTimeForNextCall[w]) THEN {
IF c.state = collecting AND w.first.state = active THEN {
CoordinatorInternal.PassParms[[prepare, c, w, newTransID, commit--ignored--]];
w.first.callInProgress ← prepare;
}
ELSE IF c.state = completing THEN {
CoordinatorInternal.PassParms[[finish, c, w, nullTransID--ignored--,
IF c.outcome = commit THEN commit ELSE abort]];
w.first.callInProgress ← finish;
};
};
ENDLOOP;
WAIT c.resultsReturned;
-- wake up when a result is returned or by timeout
ENDLOOP;
EXITS Done => {
c.finishInProgress ← FALSE;
CoordinatorMap.Unregister[c];
IF newTransCoordinator # nullHandle THEN
CoordinatorMap.Register[newTransCoordinator];
RETURN [c.outcome, newTransCoordinator] }
}};--FinishEntry
finishComplete: CONDITION;
IsTimeForNextCall: PROC [w: Coordinator.WorkerHandle] RETURNS [BOOL] = INLINE {
RETURN [BasicTime.Period[from: w.first.timeForNextCall, to: BasicTime.Now[]] >= 0]
};
LogCoordinatorRegisterWorker: PROC [
c: Handle, w: AlpineImport.Handle] = {
followingRecord: Log.RecordID;
rec: Coordinator.RegisterWorkerLogRep ← [];
ConvertUnsafe.AppendRope[to: LOOPHOLE[LONG[@rec.worker]], from: w.Name];
[followingRecord: followingRecord] ← Log.CoordinatorWrite[c, coordinatorRegisterWorker,
[base: @rec, length: TEXT[rec.length].SIZE]];
IF NOT w.Equal[AlpineIdentity.myAlpineImportHandle] THEN
c.forceRecord ← followingRecord;
};
LogCoordinatorCompleting: PROC [c: Handle,
outcome: AlpineEnvironment.CommitOrAbort, force: BOOL] = {
rec: Coordinator.CompletingLogRep ← [outcome: outcome];
[followingRecord: c.forceRecord] ← Log.CoordinatorWrite[c, coordinatorCompleting,
[base: @rec, length: Coordinator.CompletingLogRep.SIZE], force];
};
LogCoordinatorComplete: PROC [c: Handle] = {
[followingRecord: c.forceRecord] ← Log.CoordinatorWrite[c, coordinatorComplete,
Log.nullBlock];
};
AnalyzeCoordinatorBegin: PROC [record: RecordID, type: RecordType, trans: TransID] = {
c: Handle = zone.NEW[Coordinator.Object ← [transID: trans, beginRecord: record]];
CoordinatorMap.Register[c];
CoordinatorInternal.NoticeCoordinatorBegin[trans];
};
AnalyzeCoordinatorRegisterWorker: PROC [
record: RecordID, type: RecordType, trans: TransID] = {
worker: AlpineImport.Handle;
{ -- get worker name from log, map it into a AlpineImport.Handle.
rec: Coordinator.RegisterWorkerLogRep ← [];
status: Log.ReadProcStatus;
[status: status] ← Log.ReadForRecovery[thisRecord: record,
to: [base: @rec, length: Coordinator.RegisterWorkerLogRep.SIZE]];
IF status = destinationFull THEN ERROR InternalProgrammingError;
worker ← AlpineImport.Register[
server: ConvertUnsafe.ToRope[from: LOOPHOLE[LONG[@rec.worker]]]];
};
{ -- map trans into a coordinator handle, then add worker to volatile state
c: Handle = CoordinatorMap.GetHandle[trans];
l: Coordinator.WorkerHandle;
IF c = nullHandle THEN RETURN;
IF c.state # active THEN ERROR;
FOR l ← c.workers, l.rest UNTIL l = NIL DO
IF worker.Equal[l.first.worker] THEN ERROR InternalProgrammingError;
ENDLOOP;
l ← zone.CONS[[worker: worker], c.workers];
c.workers ← l
};
};
AnalyzeCoordinatorCompleting: PROC [
record: RecordID, type: RecordType, trans: TransID] = {
outcome: AlpineEnvironment.CommitOrAbort;
{ -- get outcome from log
rec: Coordinator.CompletingLogRep;
status: Log.ReadProcStatus;
[status: status] ← Log.ReadForRecovery[thisRecord: record,
to: [base: @rec, length: Coordinator.CompletingLogRep.SIZE]];
IF status # normal THEN ERROR InternalProgrammingError;
outcome ← rec.outcome;
};
{
c: Handle = CoordinatorMap.GetHandle[trans];
IF c = nullHandle THEN RETURN;
IF c.state # active THEN ERROR InternalProgrammingError;
c.state ← completing;
c.outcome ← outcome;
IF c.outcome = commit THEN
FOR l: Coordinator.WorkerHandle ← c.workers, l.rest UNTIL l = NIL DO
l.first.state ← ready
ENDLOOP;
};
};
AnalyzeCoordinatorComplete: PROC [
record: RecordID, type: RecordType, trans: TransID] = {
c: Handle = CoordinatorMap.GetHandle[trans];
IF c = nullHandle THEN RETURN;
IF c.state # completing THEN ERROR InternalProgrammingError;
c.state ← complete;
CoordinatorMap.Unregister[c];
};
CallAfterAnalysisPass: PUBLIC PROC [] = {
-- CoordinatorControl.CallAfterAnalysisPass
CoordinatorInternal.InitTransIDGenerator[AlpineIdentity.myLogVolumeID];
};
CallAfterUpdatePass: PUBLIC PROC [] = {
-- CoordinatorControl.CallAfterUpdatePass
EnumProc: PROC [c: Handle] RETURNS [stop: BOOL] = {
requestedOutcome: AlpineTransaction.RequestedOutcome ← abort;
SELECT c.state FROM
complete => ERROR;
active, collecting => NULL;
completing => IF c.outcome = commit THEN requestedOutcome ← commit;
ENDCASE => ERROR;
Process.Detach[FORK CoordinatorFinishProcess[c, requestedOutcome]];
RETURN [stop: FALSE];
};
CoordinatorMap.LockedEnumerate[EnumProc];
};
CoordinatorFinishProcess: PROC [c: Handle,
requestedOutcome: AlpineTransaction.RequestedOutcome] = {
[] ← FinishEntry [c, requestedOutcome, FALSE];
};
CoordinatorCheckpointProc: PROC []
RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
oldestBeginRecord: Log.RecordID ← Log.lastRecordID;
EnumProc: PROC [c: Handle] RETURNS [stop: BOOL] = {
oldestBeginRecord ← LogInline.Min[c.beginRecord, oldestBeginRecord];
RETURN [stop: FALSE];
};
CoordinatorMap.UnlockedEnumerate[EnumProc];
RETURN [oldestBeginRecord, oldestBeginRecord];
};
LogControl.RegisterAnalysisProc[coordinatorBegin, AnalyzeCoordinatorBegin];
LogControl.RegisterAnalysisProc[coordinatorRegisterWorker, AnalyzeCoordinatorRegisterWorker];
LogControl.RegisterAnalysisProc[coordinatorCompleting, AnalyzeCoordinatorCompleting];
LogControl.RegisterAnalysisProc[coordinatorComplete, AnalyzeCoordinatorComplete];
LogControl.RegisterCheckpointProc[CoordinatorCheckpointProc];
Process.SetTimeout[@finishComplete, Process.SecondsToTicks[5]];
Process.EnableAborts[@finishComplete];
END.