DIRECTORY
AlpineEnvironment,
AlpineIdentity USING [myLogVolumeID, myAlpineImportHandle, myFileStore],
AlpineImport,
AlpineInternal,
AlpineTransaction,
AlpineTransMgr,
AlpineTransMgrRpcControl,
BasicTime,
ClientMap,
ConcreteTransID,
ConvertUnsafe,
Coordinator,
CoordinatorControl,
CoordinatorExtras USING [Info],
CoordinatorInternal,
CoordinatorMap,
Log,
LogControl,
LogInline,
Process USING [Detach, EnableAborts, SetTimeout, SecondsToTicks, Ticks],
Rope USING [Cat, Concat, ROPE],
RPC USING [Conversation, GetCaller],
SafeStorage,
SkiPatrolLog USING [AbortReason, notice, TransactionAbortInfo, TransactionBeginInfo, TransactionCommitInfo, OpFailureInfo];
CoordinatorImpl:
MONITOR
LOCKS c
USING c: Handle
IMPORTS
AlpineIdentity,
AlpineImport,
AlpineTransaction,
BasicTime,
ClientMap,
ConvertUnsafe,
CoordinatorInternal,
CoordinatorMap,
Log,
LogControl,
LogInline,
Process,
Rope,
RPC,
SafeStorage,
SkiPatrolLog
EXPORTS
AlpineInternal,
AlpineTransaction,
AlpineTransMgr,
CoordinatorControl
= BEGIN
-- Local shorthand for types and constants declared in other interfaces.
Conversation: TYPE = AlpineEnvironment.Conversation;
TransID: TYPE = AlpineEnvironment.TransID;
nullTransID: TransID = AlpineEnvironment.nullTransID;
RecordID: TYPE = Log.RecordID;
RecordType: TYPE = Log.RecordType;
Results: TYPE = Coordinator.Results;
-- Raised by the log-analysis procedures in this module when a log record or the
-- volatile coordinator state is not what the analysis code expects.
InternalProgrammingError: ERROR = CODE;
-- Handle is also the type of the monitored record named in this module's LOCKS clause.
Handle: TYPE = Coordinator.Handle;
nullHandle: Handle = Coordinator.nullHandle;
AbortReason: TYPE = SkiPatrolLog.AbortReason;
-- Concrete representation of the coordinator object, made PUBLIC here.
CoordinatorObject:
PUBLIC
TYPE = Coordinator.Object;
-- Exported as AlpineInternal.CoordinatorObject.
-- Delay, in seconds, added to the current time to compute a worker's timeForNextCall
-- after a communication failure (see FinishEntry).
secondsWaitAfterCommFailure: INT = CoordinatorInternal.SecondsWaitAfterCommFailure;
-- Timeout installed on each coordinator's resultsReturned condition by ConsCoordinator.
timeoutForResultsReturned: Process.Ticks =
Process.SecondsToTicks[CoordinatorInternal.SecondsTimeoutForResultsReturned];
-- Create refuses new transactions once CoordinatorMap.Count[] reaches this limit.
-- Declared as a variable rather than a constant; presumably so it can be adjusted
-- at run time (TODO confirm).
maxCoordinators: INT ← CoordinatorInternal.MaxCoordinators;
-- Storage zone used for coordinator objects and worker list cells in this module.
zone: ZONE = SafeStorage.GetSystemZone[];
Create: PUBLIC PROC [conversation: Conversation, createLocalWorker: BOOL]
  RETURNS [transID: AlpineTransaction.TransID] = {
  -- ! AlpineTransaction.OperationFailed[why: busy];
  -- Implements AlpineTransaction.Create.  Called by any Alpine client.
  -- Builds and registers a new coordinator, records the caller's RName in it, and,
  -- if createLocalWorker is TRUE, creates a worker on this file store on the client's behalf.
  -- Every failure is reported to the client as OperationFailed[busy]; the specific
  -- cause is passed to the SkiPatrolLog operationFailed probe when one is installed.
  -- NOTE(review): when CreateWorker fails, the coordinator has already been registered
  -- and is not unregistered on the serverBusy path.  Confirm that this is intended.
  coord: Handle;
  failureNote: Rope.ROPE;
  -- Each probe is copied into a local before the NIL test so that the test and the
  -- call see the same value (prevents a race with whoever changes the probe).
  beginProbe: PROC [SkiPatrolLog.TransactionBeginInfo];
  {
    IF CoordinatorMap.Count[] >= maxCoordinators THEN {
      failureNote ← "hit limit on coordinators";
      GOTO serverBusy;
      };
    coord ← ConsCoordinator[];
    coord.extras ← NEW[CoordinatorExtras.Info];
    coord.extras.userRName ← RPC.GetCaller[conversation];
    CoordinatorMap.Register[coord];
    IF createLocalWorker THEN
      -- As an agent of the client who called us, create a worker.  Under the current
      -- implementation (coordinator and worker always on the same machine) the worker
      -- ought to know both our RName and the transaction ID.  In a future remote
      -- configuration the server might not know about the transaction yet (or might
      -- have crashed), or Grapevine might be too busy to supply our RName.  So if
      -- Unknown is raised, convert it to a server-busy error, but record what went
      -- wrong via SkiPatrolLog.
      AlpineTransaction.CreateWorker[conversation, coord.transID, AlpineIdentity.myFileStore !
        AlpineTransaction.Unknown => {
          failureNote ← SELECT what FROM
            transID => "worker didn't recognize transID",
            coordinator => "worker doesn't know who we are"
            ENDCASE => ERROR;
          GOTO serverBusy
          };
        AlpineTransaction.OperationFailed[busy] => {
          failureNote ← "RPC returned `busy'";
          GOTO serverBusy
          }
        ];
    beginProbe ← SkiPatrolLog.notice.beginTransaction;
    IF beginProbe # NIL THEN
      beginProbe[[
        transID: coord.transID,
        where: "CoordinatorImpl.Create",
        message: ""
        ]];
    RETURN [coord.transID];
    EXITS
      serverBusy => {
        failProbe: PROC [SkiPatrolLog.OpFailureInfo] ← SkiPatrolLog.notice.operationFailed;
        IF failProbe # NIL THEN
          failProbe[[
            what: busy,
            where: "CoordinatorImpl.Create",
            message: Rope.Cat[failureNote, "; user = ", RPC.GetCaller[conversation]]
            ]];
        ERROR AlpineTransaction.OperationFailed[why: busy];
        }
    }
  };
ConsCoordinator: PROC [] RETURNS [Handle] = {
  -- Allocates a coordinator object with default field values, arms its resultsReturned
  -- condition (timeout plus abortable waits), and assigns it the next transaction ID.
  -- The new coordinator is NOT registered in CoordinatorMap; that is left to the caller.
  fresh: Handle = zone.NEW[Coordinator.Object ← []];
  Process.SetTimeout[@fresh.resultsReturned, timeoutForResultsReturned];
  Process.EnableAborts[@fresh.resultsReturned];
  -- NextTransID writes the coordinatorBegin log record.
  CoordinatorInternal.NextTransID[fresh];
  RETURN [fresh]
  };
RegisterWorker: PUBLIC PROC [conversation: Conversation, trans: TransID]
  RETURNS [AlpineTransMgr.RegisterWorkerResult] = {
  -- ! (none);
  -- Implements AlpineTransMgr.RegisterWorker.
  -- The caller is a worker (identity obtainable via "conversation") with no monitors locked.
  -- Returns transNotActive if trans is unknown or no longer active, duplicateCall if
  -- this worker is already on the transaction's worker list, and ok otherwise.

  RegisterWorkerEntry: ENTRY PROC [c: Handle]
    RETURNS [AlpineTransMgr.RegisterWorkerResult] = {
    -- Requires c # nullHandle.  Runs with c's monitor lock held.
    caller: AlpineImport.Handle;
    IF c.state # active THEN RETURN [transNotActive];
    caller ← AlpineImport.Register[ClientMap.GetName[conversation]];
    -- Reject a second registration of the same worker.
    FOR known: Coordinator.WorkerHandle ← c.workers, known.rest WHILE known # NIL DO
      IF caller.Equal[known.first.worker] THEN RETURN [duplicateCall];
      ENDLOOP;
    ConsWorker[c, caller];
    RETURN [ok];
    };  -- RegisterWorkerEntry

  c: Handle = CoordinatorMap.GetHandle[trans];
  IF c # nullHandle THEN RETURN [RegisterWorkerEntry[c]];
  RETURN [transNotActive];
  };
ConsWorker: PROC [c: Handle, w: AlpineImport.Handle] = {
  -- Pushes worker w onto the front of c's worker list, then writes a
  -- coordinatorRegisterWorker log record for it.
  c.workers ← zone.CONS[[worker: w], c.workers];
  LogCoordinatorRegisterWorker[c, w];
  };
CreateWithWorkers: INTERNAL PROC [c: Handle] RETURNS [Handle] = {
  -- Creates a new transaction and simulates RegisterWorker for each worker of the
  -- existing transaction c.  Returns the new transaction without registering it.
  successor: Handle = ConsCoordinator[];
  cell: Coordinator.WorkerHandle ← c.workers;
  WHILE cell # NIL DO
    ConsWorker[successor, cell.first.worker];
    cell ← cell.rest;
    ENDLOOP;
  RETURN [successor]
  };
Finish: PUBLIC PROC [
  conversation: RPC.Conversation, transID: AlpineTransaction.TransID,
  requestedOutcome: AlpineTransaction.RequestedOutcome,
  continue: BOOL]
  RETURNS [outcome: AlpineTransaction.Outcome, newTrans: AlpineTransaction.TransID] = {
  -- ! (none) (but may return "unknown" as a result.)
  -- Implements AlpineTransaction.Finish.
  -- Drives transaction transID to completion through FinishEntry and reports the
  -- result to the SkiPatrolLog probes.  When continue is TRUE and the transaction did
  -- not abort, newTrans identifies a follow-on transaction with the same workers;
  -- otherwise newTrans is nullTransID.  An unknown transID yields [unknown, nullTransID].
  newTransCoordinator: Handle;
  c: Handle = CoordinatorMap.GetHandle[trans: transID];
  reason: AbortReason;  -- reason for aborting the transaction
  owner: Rope.ROPE;  -- owner of the to-be-finished transaction
  IF c = nullHandle THEN RETURN [unknown, nullTransID];
  -- Coordinators rebuilt by AnalyzeCoordinatorBegin never have extras assigned, so
  -- guard the dereference; a NIL rope just yields an empty owner name in the log message.
  owner ← IF c.extras = NIL THEN NIL ELSE c.extras.userRName;
  -- An abort is never continued.
  IF requestedOutcome = abort THEN continue ← FALSE;
  [outcome, newTransCoordinator, reason] ← FinishEntry[c, requestedOutcome, continue];
  -- If we asked to continue the transaction but it aborted on us, the follow-on
  -- transaction that FinishEntry created must be aborted as well.  This means that
  -- logged transaction starts might occasionally skip transaction IDs.
  IF continue AND outcome = abort AND newTransCoordinator # nullHandle THEN {
    [] ← FinishEntry[newTransCoordinator, abort, --continue:-- FALSE];
    newTransCoordinator ← nullHandle
    };
  -- If we really did create a new transaction, then record the user name in the
  -- coordinator object and note the new transaction (if the probe is enabled).
  IF newTransCoordinator # nullHandle THEN {
    logProc: PROC [SkiPatrolLog.TransactionBeginInfo];
    newTransCoordinator.extras ← NEW[CoordinatorExtras.Info];
    newTransCoordinator.extras.userRName ← RPC.GetCaller[conversation];
    IF (logProc ← SkiPatrolLog.notice.beginTransaction) # NIL THEN
      logProc[[
        transID: newTransCoordinator.transID,
        where: "CoordinatorImpl.Finish",
        message: ""
        ]];
    };
  -- Report the outcome of the finished transaction.  The owner name is given as a
  -- message because the transaction has already been unregistered, so the
  -- SkiPatrolLog routine cannot get at the handle.
  SELECT outcome FROM
    abort => {
      logProc: PROC [SkiPatrolLog.TransactionAbortInfo];
      IF (logProc ← SkiPatrolLog.notice.abortTransaction) # NIL THEN
        logProc[[
          transID: c.transID,
          where: "CoordinatorImpl.Finish",
          why: reason,
          message: Rope.Concat["Owner is ", owner]
          ]];
      };
    commit => {
      logProc: PROC [SkiPatrolLog.TransactionCommitInfo];
      IF (logProc ← SkiPatrolLog.notice.commitTransaction) # NIL THEN
        logProc[[
          transID: c.transID,
          where: "CoordinatorImpl.Finish",
          message: Rope.Concat["Owner is ", owner]
          ]];
      };
    ENDCASE => NULL;
  RETURN [outcome,
    IF newTransCoordinator = nullHandle THEN nullTransID ELSE newTransCoordinator.transID]
  };
FinishEntry: ENTRY PROC [
  c: Handle,
  requestedOutcome: AlpineTransaction.RequestedOutcome,
  continue: BOOL]
  RETURNS [AlpineTransaction.Outcome, Handle, AbortReason] = {
  -- ! (none);
  -- Runs the two-phase commit protocol for coordinator c while holding c's monitor lock.
  -- Returns the final outcome, the coordinator of the follow-on transaction (nullHandle
  -- unless continue was TRUE), and the reason for an abort (didntAbort if none was noted).
  -- If another finish of c is already in progress, waits for it to end and then returns
  -- [unknown, nullHandle, unknown].
  -- On normal completion c is unregistered from CoordinatorMap and the follow-on
  -- coordinator, if any, is registered.
  newTransCoordinator: Handle ← nullHandle;
  newTransID: TransID ← nullTransID;
  reason: AbortReason ← didntAbort;
  {
  IF c.finishInProgress THEN {
    -- wait for finish to complete, then return unknown
    WHILE c.finishInProgress DO
      WAIT finishComplete;
      ENDLOOP;
    RETURN [unknown, nullHandle, unknown];
    };
  c.finishInProgress ← TRUE;
  SELECT c.state FROM
    active => NULL;
      -- The ordinary case: Finish on a running transaction, or a transaction found
      -- still active by CallAfterUpdatePass.  Without this arm such calls fell into
      -- ENDCASE => ERROR and the active-state handling below was unreachable.
    collecting, completing => NULL;
      -- FinishEntry has been called from recovery
    complete => GOTO Done;
      -- FinishEntry has been called from two nearly simultaneous calls to Finish
    ENDCASE => ERROR;
  IF requestedOutcome = abort THEN {
    reason ← requested;
    c.outcome ← abort;
    };
  IF continue THEN {
    newTransCoordinator ← CreateWithWorkers[c];
    newTransID ← newTransCoordinator.transID;
    };
  -- General two-phase commit protocol.
  IF c.state = active THEN {
    Log.Force[followingRecord: c.forceRecord];
    c.state ← collecting;
    };
  -- Assertion: c.state IN [collecting .. completing]
  DO
    -- Try to make progress toward c.state = complete.
    noneActive: BOOL ← TRUE;
    allComplete: BOOL ← TRUE;
    -- Get results from previous calls, if any.
    FOR w: Coordinator.WorkerHandle ← c.workers, w.rest UNTIL w = NIL DO
      WITH w.first.resultsOfMostRecentCall SELECT FROM
        n: Results.none => { };
        p: Results.prepare => {
          w.first.resultsOfMostRecentCall ← [none, none[]];
          w.first.lastPrepareResult ← p;
          SELECT p.communicationError FROM
            none => {
              w.first.communicationTrouble ← FALSE;
              SELECT p.prepareResult FROM
                notReady => {
                  --vote no--
                  w.first.state ← complete;
                  IF c.outcome = commit THEN ERROR;
                  reason ← workerNotReady;
                  c.outcome ← abort;
                  };
                readOnlyReady => { w.first.state ← complete };
                ready => {
                  w.first.state ← ready;
                  c.aWorkerBecameReady ← TRUE;
                  };
                ENDCASE => ERROR;
              };
            bindingFailed, callFailed, busy => {
              --vote no--
              IF c.outcome = commit THEN ERROR;
              reason ← commError;
              c.outcome ← abort;
              w.first.communicationTrouble ← TRUE;
              -- Do not call this worker again until the back-off period has passed.
              w.first.timeForNextCall ← BasicTime.Update[
                base: BasicTime.Now[], period: secondsWaitAfterCommFailure];
              };
            ENDCASE => ERROR;
          };
        f: Results.finish => {
          w.first.resultsOfMostRecentCall ← [none, none[]];
          w.first.lastFinishResult ← f;
          SELECT f.communicationError FROM
            none => {
              w.first.communicationTrouble ← FALSE;
              w.first.state ← complete;
              };
            bindingFailed, callFailed, busy => {
              -- The outcome is already decided; just retry the finish call later.
              w.first.communicationTrouble ← TRUE;
              w.first.timeForNextCall ← BasicTime.Update[
                base: BasicTime.Now[], period: secondsWaitAfterCommFailure];
              };
            ENDCASE => ERROR;
          };
        ENDCASE => ERROR;
      IF w.first.state = active THEN noneActive ← FALSE;
      IF w.first.state # complete THEN allComplete ← FALSE;
      ENDLOOP;
    -- Make state transitions if possible.
    IF c.state = collecting AND (c.outcome = abort OR noneActive) THEN {
      c.state ← completing;
      IF c.outcome = unknown THEN c.outcome ← commit;
      -- NOTE(review): the record is forced on abort, or when some worker voted ready;
      -- confirm this is the intended forcing rule.
      LogCoordinatorCompleting[c: c, outcome: c.outcome,
        force: c.outcome = abort OR c.aWorkerBecameReady];
      };
    IF c.state = completing AND allComplete THEN {
      c.state ← complete;
      LogCoordinatorComplete[c];
      GOTO Done;
      };
    -- Issue new calls.
    FOR w: Coordinator.WorkerHandle ← c.workers, w.rest UNTIL w = NIL DO
      IF w.first.state # complete
        AND w.first.callInProgress = none
        AND (NOT w.first.communicationTrouble OR IsTimeForNextCall[w]) THEN {
        IF c.state = collecting AND w.first.state = active THEN {
          CoordinatorInternal.PassParms[[prepare, c, w, newTransID, commit--ignored--]];
          w.first.callInProgress ← prepare;
          }
        ELSE
          IF c.state = completing THEN {
            CoordinatorInternal.PassParms[[finish, c, w, nullTransID
              --ignored--,
              IF c.outcome = commit THEN commit ELSE abort]];
            w.first.callInProgress ← finish;
            };
        };
      ENDLOOP;
    WAIT c.resultsReturned;
    -- wake up when a result is returned or by timeout
    ENDLOOP;  -- monster DO
  EXITS Done => {
    c.finishInProgress ← FALSE;
    -- Wake callers parked on finishComplete at the top of this procedure; without
    -- this they would resume only when the condition's timeout expires.
    BROADCAST finishComplete;
    CoordinatorMap.Unregister[c];
    IF newTransCoordinator # nullHandle THEN
      CoordinatorMap.Register[newTransCoordinator];
    RETURN [c.outcome, newTransCoordinator, reason] }
  }};  --FinishEntry
-- Module-wide condition on which FinishEntry waits when it finds that a finish of the
-- same coordinator is already in progress.  Its timeout and abortability are set in
-- the start code at the end of this module.
finishComplete: CONDITION;
IsTimeForNextCall: PROC [w: Coordinator.WorkerHandle] RETURNS [BOOL] = INLINE {
  -- TRUE once the current time has reached the worker's timeForNextCall, i.e. the
  -- back-off period set after a communication failure has expired.
  sinceDue: INT = BasicTime.Period[from: w.first.timeForNextCall, to: BasicTime.Now[]];
  RETURN [sinceDue >= 0]
  };
LogCoordinatorRegisterWorker: PROC [c: Handle, w: AlpineImport.Handle] = {
  -- Writes a coordinatorRegisterWorker log record carrying worker w's name.
  -- Unless w is this server's own import handle, c.forceRecord is advanced to the
  -- record following the one just written.
  logRep: Coordinator.RegisterWorkerLogRep ← [];
  nextRecord: Log.RecordID;
  ConvertUnsafe.AppendRope[to: LOOPHOLE[LONG[@logRep.worker]], from: w.Name];
  [followingRecord: nextRecord] ← Log.CoordinatorWrite[c, coordinatorRegisterWorker,
    [base: @logRep, length: TEXT[logRep.length].SIZE]];
  IF w.Equal[AlpineIdentity.myAlpineImportHandle] THEN RETURN;
  c.forceRecord ← nextRecord;
  };
LogCoordinatorCompleting: PROC [
  c: Handle, outcome: AlpineEnvironment.CommitOrAbort, force: BOOL] = {
  -- Writes a coordinatorCompleting log record holding the decided outcome, passing
  -- force through to the log writer, and remembers the following record ID in c.forceRecord.
  logRep: Coordinator.CompletingLogRep ← [outcome: outcome];
  nextRecord: Log.RecordID;
  [followingRecord: nextRecord] ← Log.CoordinatorWrite[c, coordinatorCompleting,
    [base: @logRep, length: Coordinator.CompletingLogRep.SIZE], force];
  c.forceRecord ← nextRecord;
  };
LogCoordinatorComplete: PROC [c: Handle] = {
  -- Writes an empty-bodied coordinatorComplete log record for c and remembers the
  -- following record ID in c.forceRecord.
  nextRecord: Log.RecordID;
  [followingRecord: nextRecord] ← Log.CoordinatorWrite[c, coordinatorComplete, Log.nullBlock];
  c.forceRecord ← nextRecord;
  };
AnalyzeCoordinatorBegin: PROC [record: RecordID, type: RecordType, trans: TransID] = {
  -- Analysis procedure for coordinatorBegin records: rebuilds a volatile coordinator
  -- object for trans, registers it, and reports the transaction ID to CoordinatorInternal.
  -- The type argument is not used.
  recovered: Handle = zone.NEW[Coordinator.Object ← [transID: trans, beginRecord: record]];
  CoordinatorMap.Register[recovered];
  CoordinatorInternal.NoticeCoordinatorBegin[trans];
  };
AnalyzeCoordinatorRegisterWorker: PROC [
  record: RecordID, type: RecordType, trans: TransID] = {
  -- Analysis procedure for coordinatorRegisterWorker records: re-adds the logged
  -- worker to the volatile worker list of trans.  Does nothing if trans has no
  -- registered coordinator.  The type argument is not used.
  worker: AlpineImport.Handle;
  {
    -- Get the worker name from the log and map it into an AlpineImport.Handle.
    logRep: Coordinator.RegisterWorkerLogRep ← [];
    readStatus: Log.ReadProcStatus;
    [status: readStatus] ← Log.ReadForRecovery[thisRecord: record,
      to: [base: @logRep, length: Coordinator.RegisterWorkerLogRep.SIZE]];
    IF readStatus = destinationFull THEN ERROR InternalProgrammingError;
    worker ← AlpineImport.Register[
      server: ConvertUnsafe.ToRope[from: LOOPHOLE[LONG[@logRep.worker]]]];
    };
  {
    -- Map trans into a coordinator handle, then add the worker to its volatile state.
    c: Handle = CoordinatorMap.GetHandle[trans];
    IF c = nullHandle THEN RETURN;
    IF c.state # active THEN ERROR;
    -- The same worker must not appear twice for one transaction.
    FOR known: Coordinator.WorkerHandle ← c.workers, known.rest WHILE known # NIL DO
      IF worker.Equal[known.first.worker] THEN ERROR InternalProgrammingError;
      ENDLOOP;
    c.workers ← zone.CONS[[worker: worker], c.workers]
    };
  };
AnalyzeCoordinatorCompleting: PROC [
  record: RecordID, type: RecordType, trans: TransID] = {
  -- Analysis procedure for coordinatorCompleting records: moves the coordinator of
  -- trans from active to completing with the logged outcome.  For a commit, every
  -- worker is marked ready.  Does nothing if trans has no registered coordinator.
  -- The type argument is not used.
  loggedOutcome: AlpineEnvironment.CommitOrAbort;
  {
    -- Get the outcome from the log.
    logRep: Coordinator.CompletingLogRep;
    readStatus: Log.ReadProcStatus;
    [status: readStatus] ← Log.ReadForRecovery[thisRecord: record,
      to: [base: @logRep, length: Coordinator.CompletingLogRep.SIZE]];
    IF readStatus # normal THEN ERROR InternalProgrammingError;
    loggedOutcome ← logRep.outcome;
    };
  {
    c: Handle = CoordinatorMap.GetHandle[trans];
    IF c = nullHandle THEN RETURN;
    IF c.state # active THEN ERROR InternalProgrammingError;
    c.state ← completing;
    c.outcome ← loggedOutcome;
    IF loggedOutcome = commit THEN {
      cell: Coordinator.WorkerHandle ← c.workers;
      WHILE cell # NIL DO
        cell.first.state ← ready;
        cell ← cell.rest;
        ENDLOOP;
      };
    };
  };
AnalyzeCoordinatorComplete: PROC [record: RecordID, type: RecordType, trans: TransID] = {
  -- Analysis procedure for coordinatorComplete records: marks the coordinator of trans
  -- complete and drops it from CoordinatorMap.  A coordinatorComplete record for a
  -- coordinator that is not in the completing state is an internal error.
  -- Does nothing if trans has no registered coordinator.
  -- The record and type arguments are not used.
  coord: Handle = CoordinatorMap.GetHandle[trans];
  IF coord = nullHandle THEN RETURN;
  IF coord.state # completing THEN ERROR InternalProgrammingError;
  coord.state ← complete;
  CoordinatorMap.Unregister[coord];
  };
CallAfterAnalysisPass: PUBLIC PROC [] = {
  -- Implements CoordinatorControl.CallAfterAnalysisPass.
  -- Initializes the transaction ID generator, passing it this server's log volume ID.
  CoordinatorInternal.InitTransIDGenerator[AlpineIdentity.myLogVolumeID];
  };
CallAfterUpdatePass: PUBLIC PROC [] = {
  -- Implements CoordinatorControl.CallAfterUpdatePass.
  -- For every registered coordinator, forks a detached process that finishes it:
  -- with commit if the coordinator was completing with outcome commit, and with
  -- abort in every other case.

  FinishOne: PROC [c: Handle] RETURNS [stop: BOOL] = {
    wanted: AlpineTransaction.RequestedOutcome ← abort;
    SELECT c.state FROM
      active, collecting => NULL;
      completing => IF c.outcome = commit THEN wanted ← commit;
      -- A complete coordinator should already have been unregistered; finding one
      -- here, or any other state, is an error.
      ENDCASE => ERROR;
    Process.Detach[FORK CoordinatorFinishProcess[c, wanted]];
    RETURN [stop: FALSE];
    };

  CoordinatorMap.LockedEnumerate[FinishOne];
  };
CoordinatorFinishProcess: PROC [
  c: Handle, requestedOutcome: AlpineTransaction.RequestedOutcome] = {
  -- Body of the process forked by CallAfterUpdatePass: finishes c with the given
  -- outcome, without continuing into a new transaction, and discards the results.
  [] ← FinishEntry[c: c, requestedOutcome: requestedOutcome, continue: FALSE];
  };
CoordinatorCheckpointProc: PROC [] RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
  -- Checkpoint procedure registered with LogControl.  Both results are the smallest
  -- beginRecord among the registered coordinators, or Log.lastRecordID if no
  -- coordinator has a smaller one.
  oldest: Log.RecordID ← Log.lastRecordID;

  NoteBegin: PROC [c: Handle] RETURNS [stop: BOOL] = {
    oldest ← LogInline.Min[c.beginRecord, oldest];
    RETURN [stop: FALSE];
    };

  CoordinatorMap.UnlockedEnumerate[NoteBegin];
  RETURN [keepRecord: oldest, startAnalysisRecord: oldest];
  };
-- Module start code.
-- Register one analysis procedure per coordinator log record type, plus the
-- checkpoint procedure.
LogControl.RegisterAnalysisProc[coordinatorBegin, AnalyzeCoordinatorBegin];
LogControl.RegisterAnalysisProc[coordinatorRegisterWorker, AnalyzeCoordinatorRegisterWorker];
LogControl.RegisterAnalysisProc[coordinatorCompleting, AnalyzeCoordinatorCompleting];
LogControl.RegisterAnalysisProc[coordinatorComplete, AnalyzeCoordinatorComplete];
LogControl.RegisterCheckpointProc[CoordinatorCheckpointProc];
-- Waits on finishComplete time out after 5 seconds and can be aborted.
Process.SetTimeout[@finishComplete, Process.SecondsToTicks[5]];
Process.EnableAborts[@finishComplete];
END.
CHANGE LOG.
Edited June 1984, by Kupfer
changes to: Create: record the RName of the user creating the transaction. Add event logging for transaction starts.
Edited on July 1, 1984 4:11:19 pm PDT, by Kupfer
Be sure to event-log transactions that are created by Finish[]ing an old one. Also, remember to record the RName for transactions that are created that way.
Edited on July 25, 1984 9:22:20 am PDT, by Kupfer
Install new SkiPatrolLog probes and reflect the new version of SkiPatrolLog. Also, be more rigorous about the possible ERRORs that CreateWorker might return. Also, make maxCoordinators really refer to CoordinatorInternal.
changes to: CoordinatorImpl, Create, FinishEntry, Finish, AbortReason
Edited on August 6, 1984 3:04:59 pm PDT, by Kupfer
(1) Let the SkiPatrolLog routine get the user RName from the transaction ID, where possible.
(2) Remove the possible race condition in SkiPatrolLog probes by assigning the PROC to a temporary variable.
changes to: Create, Finish, CoordinatorImpl