-- CoordinatorImpl.mesa
-- Copyright © 1984 by Xerox Corporation. All rights reserved.
-- Last edited by
--   Taft on 1-Feb-82 13:16:03
--   Kolling on July 13, 1983 3:27 pm
--   MBrown on January 30, 1984 12:08:04 pm PST
-- Last Edited by: Kupfer, August 6, 1984 3:05:48 pm PDT
-- NOTES:
-- FORK CoordinatorFinishProcess[c, requestedOutcome] is not controlled (it might try to fork too many processes) and not synchronized (an outside call to Finish might get in).
-- Simultaneous Finish calls should work better (currently the second caller gets outcome = unknown).
-- The mechanism for limiting the number of coordinators is quick and dirty and should be monitored in a different way.
DIRECTORY
AlpineEnvironment,
AlpineIdentity USING [myLogVolumeID, myAlpineImportHandle, myFileStore],
AlpineImport,
AlpineInternal,
AlpineTransaction,
AlpineTransMgr,
AlpineTransMgrRpcControl,
BasicTime,
ClientMap,
ConcreteTransID,
ConvertUnsafe,
Coordinator,
CoordinatorControl,
CoordinatorExtras USING [Info],
CoordinatorInternal,
CoordinatorMap,
Log,
LogControl,
LogInline,
Process USING [Detach, EnableAborts, SetTimeout, SecondsToTicks, Ticks],
Rope USING [Cat, Concat, ROPE],
RPC USING [Conversation, GetCaller],
SafeStorage,
SkiPatrolLog USING [AbortReason, notice, TransactionAbortInfo, TransactionBeginInfo, TransactionCommitInfo, OpFailureInfo];
CoordinatorImpl: MONITOR LOCKS c USING c: Handle
IMPORTS
AlpineIdentity,
AlpineImport,
AlpineTransaction,
BasicTime,
ClientMap,
ConvertUnsafe,
CoordinatorInternal,
CoordinatorMap,
Log,
LogControl,
LogInline,
Process,
Rope,
RPC,
SafeStorage,
SkiPatrolLog
EXPORTS
AlpineInternal,
AlpineTransaction,
AlpineTransMgr,
CoordinatorControl
= BEGIN
Conversation: TYPE = AlpineEnvironment.Conversation;
TransID: TYPE = AlpineEnvironment.TransID;
nullTransID: TransID = AlpineEnvironment.nullTransID;
RecordID: TYPE = Log.RecordID;
RecordType: TYPE = Log.RecordType;
Results: TYPE = Coordinator.Results;
InternalProgrammingError: ERROR = CODE;
Handle: TYPE = Coordinator.Handle;
nullHandle: Handle = Coordinator.nullHandle;
AbortReason: TYPE = SkiPatrolLog.AbortReason;
CoordinatorObject: PUBLIC TYPE = Coordinator.Object;
-- AlpineInternal.CoordinatorObject
secondsWaitAfterCommFailure: INT = CoordinatorInternal.SecondsWaitAfterCommFailure;
timeoutForResultsReturned: Process.Ticks =
Process.SecondsToTicks[CoordinatorInternal.SecondsTimeoutForResultsReturned];
maxCoordinators: INT ← CoordinatorInternal.MaxCoordinators;
zone: ZONE = SafeStorage.GetSystemZone[];
Create: PUBLIC PROC [conversation: Conversation, createLocalWorker: BOOL]
RETURNS [transID: AlpineTransaction.TransID] = {
-- ! AlpineTransaction.OperationFailed[why: busy];
-- AlpineTransaction.Create.
-- Called by any Alpine client.
c: Handle;
message: Rope.ROPE;
logBeginProc: PROC [SkiPatrolLog.TransactionBeginInfo]; -- (used to prevent race condition)
{IF CoordinatorMap.Count[] >= maxCoordinators THEN {
message ← "hit limit on coordinators";
GOTO serverBusy;
};
c ← ConsCoordinator[];
c.extras ← NEW[CoordinatorExtras.Info];
c.extras.userRName ← RPC.GetCaller[conversation];
CoordinatorMap.Register[c];
IF createLocalWorker THEN
-- As an agent of the client who called us, create a worker.
-- Under the current implementation (coordinator and worker always on the same machine),
-- we ought to know our own RName and the current transaction ID.
-- In the future, however, a remote server might not know about the transaction yet
-- (or might have crashed), and Grapevine might be too busy to tell the server our RName.
-- So if Unknown is raised, convert it to a server-busy error, but record what went wrong
-- via SkiPatrolLog.
AlpineTransaction.CreateWorker[conversation, c.transID, AlpineIdentity.myFileStore !
AlpineTransaction.Unknown => {
message ← SELECT what FROM
transID => "worker didn't recognize transID",
coordinator => "worker doesn't know who we are"
ENDCASE => ERROR;
GOTO serverBusy
};
AlpineTransaction.OperationFailed[busy] => {
message ← "RPC returned `busy'";
GOTO serverBusy
}
];
IF (logBeginProc ← SkiPatrolLog.notice.beginTransaction) # NIL THEN
logBeginProc[[
transID: c.transID,
where: "CoordinatorImpl.Create",
message: ""
]];
RETURN [c.transID];
EXITS
serverBusy => {
logProc: PROC [SkiPatrolLog.OpFailureInfo];
IF (logProc ← SkiPatrolLog.notice.operationFailed) # NIL THEN
logProc[[
what: busy,
where: "CoordinatorImpl.Create",
message: message.Cat["; user = ", RPC.GetCaller[conversation]]
]];
ERROR AlpineTransaction.OperationFailed[why: busy];
}
}
};
ConsCoordinator: PROC [] RETURNS [Handle] = {
newTrans: Handle = zone.NEW[Coordinator.Object ← []];
Process.SetTimeout[@newTrans.resultsReturned, timeoutForResultsReturned];
Process.EnableAborts[@newTrans.resultsReturned];
CoordinatorInternal.NextTransID[newTrans]; --writes coordinatorBegin
RETURN [newTrans] };
RegisterWorker: PUBLIC PROC [
conversation: Conversation, trans: TransID]
RETURNS [AlpineTransMgr.RegisterWorkerResult] = {
-- ! (none);
-- AlpineTransMgr.RegisterWorker.
-- The caller is a worker (identity obtainable via "conversation") with no monitors locked.
RegisterWorkerEntry: ENTRY PROC [c: Handle]
RETURNS [AlpineTransMgr.RegisterWorkerResult] = {
-- c # nullHandle.
worker: AlpineImport.Handle;
IF c.state # active THEN RETURN [transNotActive];
worker ← AlpineImport.Register[ClientMap.GetName[conversation]];
FOR l: Coordinator.WorkerHandle ← c.workers, l.rest UNTIL l = NIL DO
IF worker.Equal[l.first.worker] THEN RETURN [duplicateCall];
ENDLOOP;
ConsWorker[c, worker];
RETURN [ok];
};--RegisterWorkerEntry
c: Handle = CoordinatorMap.GetHandle[trans];
IF c = nullHandle THEN RETURN [transNotActive];
RETURN [RegisterWorkerEntry[c]];
};
ConsWorker: PROC [c: Handle, w: AlpineImport.Handle] = {
l: Coordinator.WorkerHandle ← zone.CONS[[worker: w], c.workers];
c.workers ← l;
LogCoordinatorRegisterWorker[c, w];
};
CreateWithWorkers: INTERNAL PROC [c: Handle]
RETURNS [Handle] = {
-- Creates a new transaction, and simulates RegisterWorker for each worker of existing transaction c.
-- Returns the new transaction, without registering it.
newTrans: Handle = ConsCoordinator[];
FOR l: Coordinator.WorkerHandle ← c.workers, l.rest UNTIL l = NIL DO
ConsWorker[newTrans, l.first.worker];
ENDLOOP;
RETURN [newTrans]
};
Finish: PUBLIC PROC [
conversation: RPC.Conversation, transID: AlpineTransaction.TransID,
requestedOutcome: AlpineTransaction.RequestedOutcome,
continue: BOOL]
RETURNS [outcome: AlpineTransaction.Outcome, newTrans: AlpineTransaction.TransID] = {
-- ! (none) (but may return "unknown" as a result.)
-- AlpineTransaction.Finish
newTransCoordinator: Handle;
c: Handle = CoordinatorMap.GetHandle[trans: transID];
reason: AbortReason;  -- reason for aborting the transaction
owner: Rope.ROPE;  -- owner of the to-be-finished transaction
IF c = nullHandle THEN RETURN [unknown, nullTransID];
owner ← c.extras.userRName;
IF requestedOutcome = abort THEN continue ← FALSE;
[outcome, newTransCoordinator, reason] ← FinishEntry[c, requestedOutcome, continue];
-- If we asked to continue the transaction but it aborted, the new transaction created
-- for the continuation has to be aborted as well. This presumably means that logged
-- transaction starts might occasionally skip transaction IDs.
IF continue AND outcome = abort
AND newTransCoordinator # nullHandle THEN {
[] ← FinishEntry[newTransCoordinator, abort, --continue:-- FALSE];
newTransCoordinator ← nullHandle };
-- If we really did create a new transaction, then record the user name in the coordinator
-- object and note the new transaction (if the probe is enabled).
IF newTransCoordinator # nullHandle THEN {
logProc: PROC [SkiPatrolLog.TransactionBeginInfo];
newTransCoordinator.extras ← NEW[CoordinatorExtras.Info];
newTransCoordinator.extras.userRName ← RPC.GetCaller[conversation];
IF (logProc ← SkiPatrolLog.notice.beginTransaction) # NIL THEN
logProc[[
transID: newTransCoordinator.transID,
where: "CoordinatorImpl.Finish",
message: ""
]];
};
SELECT outcome FROM
abort => {
logProc: PROC [SkiPatrolLog.TransactionAbortInfo];
IF (logProc ← SkiPatrolLog.notice.abortTransaction) # NIL THEN
logProc[[
transID: c.transID,
where: "CoordinatorImpl.Finish",
why: reason,
message: Rope.Concat["Owner is ", owner]
]];
};
commit => {
logProc: PROC [SkiPatrolLog.TransactionCommitInfo];
IF (logProc ← SkiPatrolLog.notice.commitTransaction) # NIL THEN
logProc[[
transID: c.transID,
where: "CoordinatorImpl.Finish",
message: Rope.Concat["Owner is ", owner]
]];
-- (The owner name is given as a message because the transaction has already been
-- unregistered, so the SkiPatrolLog routine can't get at the handle.)
};
ENDCASE => NULL;
RETURN [outcome, IF newTransCoordinator = nullHandle THEN
nullTransID ELSE newTransCoordinator.transID]
};
FinishEntry: ENTRY PROC [
c: Handle,
requestedOutcome: AlpineTransaction.RequestedOutcome,
continue: BOOL]
RETURNS [AlpineTransaction.Outcome, Handle, AbortReason] = {
-- ! (none);
newTransCoordinator: Handle ← nullHandle;
newTransID: TransID ← nullTransID;
reason: AbortReason ← didntAbort;
{
IF c.finishInProgress THEN {
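-- Wait for the finish already in progress to complete, then return unknown.
-- Nothing in this module NOTIFYs finishComplete, so each WAIT below ends only by the
-- timeout set in the module's start code (or by an abort).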
WHILE c.finishInProgress DO
WAIT finishComplete;
ENDLOOP;
RETURN [unknown, nullHandle, unknown];
};
c.finishInProgress ← TRUE;
SELECT c.state FROM
active => NULL;
-- normal
collecting, completing => NULL;
-- FinishEntry has been called from recovery
complete => GOTO Done;
-- FinishEntry has been called from two nearly simultaneous calls to Finish
ENDCASE => ERROR;
IF requestedOutcome = abort THEN {
reason ← requested;
c.outcome ← abort;
};
IF continue THEN {
newTransCoordinator ← CreateWithWorkers[c];
newTransID ← newTransCoordinator.transID;
};
-- General two-phase commit protocol.
IF c.state = active THEN {
Log.Force[followingRecord: c.forceRecord];
c.state ← collecting;
};
-- c.state IN [collecting .. completing]
DO
-- Try to make progress toward c.state = complete.
noneActive: BOOL ← TRUE;
allComplete: BOOL ← TRUE;
-- Get results from previous calls, if any.
FOR w: Coordinator.WorkerHandle ← c.workers, w.rest UNTIL w = NIL DO
WITH w.first.resultsOfMostRecentCall SELECT FROM
n: Results.none => { };
p: Results.prepare => {
w.first.resultsOfMostRecentCall ← [none, none[]];
w.first.lastPrepareResult ← p;
SELECT p.communicationError FROM
none => {
w.first.communicationTrouble ← FALSE;
SELECT p.prepareResult FROM
notReady => { --vote no--
w.first.state ← complete;
IF c.outcome = commit THEN ERROR;
reason ← workerNotReady;
c.outcome ← abort;
};
readOnlyReady => { w.first.state ← complete };
ready => {
w.first.state ← ready;
c.aWorkerBecameReady ← TRUE;
};
ENDCASE => ERROR;
};
bindingFailed, callFailed, busy => { --vote no--
IF c.outcome = commit THEN ERROR;
reason ← commError;
c.outcome ← abort;
w.first.communicationTrouble ← TRUE;
w.first.timeForNextCall ← BasicTime.Update[
base: BasicTime.Now[], period: secondsWaitAfterCommFailure];
};
ENDCASE => ERROR;
};
f: Results.finish => {
w.first.resultsOfMostRecentCall ← [none, none[]];
w.first.lastFinishResult ← f;
SELECT f.communicationError FROM
none => {
w.first.communicationTrouble ← FALSE;
w.first.state ← complete;
};
bindingFailed, callFailed, busy => {
w.first.communicationTrouble ← TRUE;
w.first.timeForNextCall ← BasicTime.Update[
base: BasicTime.Now[], period: secondsWaitAfterCommFailure];
};
ENDCASE => ERROR;
};
ENDCASE => ERROR;
IF w.first.state = active THEN noneActive ← FALSE;
IF w.first.state # complete THEN allComplete ← FALSE;
ENDLOOP;
-- Make state transitions if possible.
IF c.state = collecting AND (c.outcome = abort OR noneActive) THEN {
c.state ← completing;
IF c.outcome = unknown THEN c.outcome ← commit;
LogCoordinatorCompleting[c: c, outcome: c.outcome,
force: c.outcome = abort OR c.aWorkerBecameReady];
};
IF c.state = completing AND allComplete THEN {
c.state ← complete;
LogCoordinatorComplete[c];
GOTO Done;
};
-- Issue new calls.
FOR w: Coordinator.WorkerHandle ← c.workers, w.rest UNTIL w = NIL DO
IF w.first.state # complete AND w.first.callInProgress = none AND
(NOT w.first.communicationTrouble OR IsTimeForNextCall[w]) THEN {
IF c.state = collecting AND w.first.state = active THEN {
CoordinatorInternal.PassParms[[prepare, c, w, newTransID, commit--ignored--]];
w.first.callInProgress ← prepare;
}
ELSE IF c.state = completing THEN {
CoordinatorInternal.PassParms[[finish, c, w, nullTransID--ignored--,
IF c.outcome = commit THEN commit ELSE abort]];
w.first.callInProgress ← finish;
};
};
ENDLOOP;
WAIT c.resultsReturned;
-- Wake up when a result is returned or on timeout.
ENDLOOP;-- monster DO
EXITS Done => {
c.finishInProgress ← FALSE;
CoordinatorMap.Unregister[c];
IF newTransCoordinator # nullHandle THEN
CoordinatorMap.Register[newTransCoordinator];
RETURN [c.outcome, newTransCoordinator, reason] }
}};--FinishEntry
finishComplete: CONDITION;
IsTimeForNextCall: PROC [w: Coordinator.WorkerHandle] RETURNS [BOOL] = INLINE {
RETURN [BasicTime.Period[from: w.first.timeForNextCall, to: BasicTime.Now[]] >= 0]
};
LogCoordinatorRegisterWorker: PROC [
c: Handle, w: AlpineImport.Handle] = {
followingRecord: Log.RecordID;
rec: Coordinator.RegisterWorkerLogRep ← [];
ConvertUnsafe.AppendRope[to: LOOPHOLE[LONG[@rec.worker]], from: w.Name];
[followingRecord: followingRecord] ← Log.CoordinatorWrite[c, coordinatorRegisterWorker,
[base: @rec, length: TEXT[rec.length].SIZE]];
IF NOT w.Equal[AlpineIdentity.myAlpineImportHandle] THEN
c.forceRecord ← followingRecord;
};
LogCoordinatorCompleting: PROC [c: Handle,
outcome: AlpineEnvironment.CommitOrAbort, force: BOOL] = {
rec: Coordinator.CompletingLogRep ← [outcome: outcome];
[followingRecord: c.forceRecord] ← Log.CoordinatorWrite[c, coordinatorCompleting,
[base: @rec, length: Coordinator.CompletingLogRep.SIZE], force];
};
LogCoordinatorComplete: PROC [c: Handle] = {
[followingRecord: c.forceRecord] ← Log.CoordinatorWrite[c, coordinatorComplete,
Log.nullBlock];
};
AnalyzeCoordinatorBegin: PROC [record: RecordID, type: RecordType, trans: TransID] = {
c: Handle = zone.NEW[Coordinator.Object ← [transID: trans, beginRecord: record]];
CoordinatorMap.Register[c];
CoordinatorInternal.NoticeCoordinatorBegin[trans];
};
AnalyzeCoordinatorRegisterWorker: PROC [
record: RecordID, type: RecordType, trans: TransID] = {
worker: AlpineImport.Handle;
{ -- get worker name from log, map it into an AlpineImport.Handle.
rec: Coordinator.RegisterWorkerLogRep ← [];
status: Log.ReadProcStatus;
[status: status] ← Log.ReadForRecovery[thisRecord: record,
to: [base: @rec, length: Coordinator.RegisterWorkerLogRep.SIZE]];
IF status = destinationFull THEN ERROR InternalProgrammingError;
worker ← AlpineImport.Register[
server: ConvertUnsafe.ToRope[from: LOOPHOLE[LONG[@rec.worker]]]];
};
{ -- map trans into a coordinator handle, then add worker to volatile state
c: Handle = CoordinatorMap.GetHandle[trans];
l: Coordinator.WorkerHandle;
IF c = nullHandle THEN RETURN;
IF c.state # active THEN ERROR;
FOR l ← c.workers, l.rest UNTIL l = NIL DO
IF worker.Equal[l.first.worker] THEN ERROR InternalProgrammingError;
ENDLOOP;
l ← zone.CONS[[worker: worker], c.workers];
c.workers ← l
};
};
AnalyzeCoordinatorCompleting: PROC [
record: RecordID, type: RecordType, trans: TransID] = {
outcome: AlpineEnvironment.CommitOrAbort;
{ -- get outcome from log
rec: Coordinator.CompletingLogRep;
status: Log.ReadProcStatus;
[status: status] ← Log.ReadForRecovery[thisRecord: record,
to: [base: @rec, length: Coordinator.CompletingLogRep.SIZE]];
IF status # normal THEN ERROR InternalProgrammingError;
outcome ← rec.outcome;
};
{
c: Handle = CoordinatorMap.GetHandle[trans];
IF c = nullHandle THEN RETURN;
IF c.state # active THEN ERROR InternalProgrammingError;
c.state ← completing;
c.outcome ← outcome;
IF c.outcome = commit THEN
FOR l: Coordinator.WorkerHandle ← c.workers, l.rest UNTIL l = NIL DO
l.first.state ← ready
ENDLOOP;
};
};
AnalyzeCoordinatorComplete: PROC [
record: RecordID, type: RecordType, trans: TransID] = {
c: Handle = CoordinatorMap.GetHandle[trans];
IF c = nullHandle THEN RETURN;
IF c.state # completing THEN ERROR InternalProgrammingError;
c.state ← complete;
CoordinatorMap.Unregister[c];
};
CallAfterAnalysisPass: PUBLIC PROC [] = {
-- CoordinatorControl.CallAfterAnalysisPass
CoordinatorInternal.InitTransIDGenerator[AlpineIdentity.myLogVolumeID];
};
CallAfterUpdatePass: PUBLIC PROC [] = {
-- CoordinatorControl.CallAfterUpdatePass
EnumProc: PROC [c: Handle] RETURNS [stop: BOOL] = {
requestedOutcome: AlpineTransaction.RequestedOutcome ← abort;
SELECT c.state FROM
complete => ERROR;
active, collecting => NULL;
completing => IF c.outcome = commit THEN requestedOutcome ← commit;
ENDCASE => ERROR;
Process.Detach[FORK CoordinatorFinishProcess[c, requestedOutcome]];
RETURN [stop: FALSE];
};
CoordinatorMap.LockedEnumerate[EnumProc];
};
CoordinatorFinishProcess: PROC [c: Handle,
requestedOutcome: AlpineTransaction.RequestedOutcome] = {
[] ← FinishEntry [c, requestedOutcome, FALSE];
};
CoordinatorCheckpointProc: PROC []
RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
oldestBeginRecord: Log.RecordID ← Log.lastRecordID;
EnumProc: PROC [c: Handle] RETURNS [stop: BOOL] = {
oldestBeginRecord ← LogInline.Min[c.beginRecord, oldestBeginRecord];
RETURN [stop: FALSE];
};
CoordinatorMap.UnlockedEnumerate[EnumProc];
RETURN [oldestBeginRecord, oldestBeginRecord];
};
LogControl.RegisterAnalysisProc[coordinatorBegin, AnalyzeCoordinatorBegin];
LogControl.RegisterAnalysisProc[coordinatorRegisterWorker, AnalyzeCoordinatorRegisterWorker];
LogControl.RegisterAnalysisProc[coordinatorCompleting, AnalyzeCoordinatorCompleting];
LogControl.RegisterAnalysisProc[coordinatorComplete, AnalyzeCoordinatorComplete];
LogControl.RegisterCheckpointProc[CoordinatorCheckpointProc];
Process.SetTimeout[@finishComplete, Process.SecondsToTicks[5]];
Process.EnableAborts[@finishComplete];
END.
-- CHANGE LOG.
-- Edited June 1984, by Kupfer
--   changes to: Create: record the RName of the user creating the transaction. Add event logging for transaction starts.
-- Edited on July 1, 1984 4:11:19 pm PDT, by Kupfer
--   Be sure to event-log transactions that are created by Finish[]ing an old one. Also, remember to record the RName for transactions that are created that way.
-- Edited on July 25, 1984 9:22:20 am PDT, by Kupfer
--   Install new SkiPatrolLog probes and reflect the new version of SkiPatrolLog. Also, be more rigorous about the possible ERRORs that CreateWorker might raise. Also, make maxCoordinators really refer to CoordinatorInternal.
--   changes to: CoordinatorImpl, Create, FinishEntry, Finish, AbortReason
-- Edited on August 6, 1984 3:04:59 pm PDT, by Kupfer
--   (1) Let the SkiPatrolLog routine get the user RName from the transaction ID, where possible.
--   (2) Remove the possible race condition in SkiPatrolLog probes by assigning the PROC to a temporary variable.
--   changes to: Create, Finish, CoordinatorImpl