WorkerImpl.mesa
Copyright © 1984 by Xerox Corporation. All rights reserved.
Implements worker in two-phase commit: create, prepare, finish (commit or abort).
Last edited by
Kolling on June 6, 1983 12:05 pm
MBrown on January 30, 1984 12:12:43 pm PST
Kupfer on August 6, 1984 3:41:38 pm PDT
NOTES:
It is poor behavior to land in the debugger because of RPC.CallFailed[protocolError].
The implementation of AbortUnilaterally should be improved by making a remote call to the coordinator, if the coordinator is remote.
DIRECTORY
AccessControl,
AlpineEnvironment,
AlpineIdentity,
AlpineImport,
AlpineInternal,
AlpineTransaction,
AlpineTransMgr,
AlpineTransMgrRpcControl,
BasicTime,
ClientMap USING [GetName],
ConversationTable,
ConvertUnsafe,
FileControl,
FilePageMgr,
Lock,
LockControl,
Log,
LogControl,
LogInline,
Process,
Rope,
RPC,
SafeStorage,
SkiPatrolHooks USING [ShouldAbort],
SkiPatrolLog USING [LockConflictInfo, notice, TransactionAbortInfo],
TransactionMap,
Worker,
WorkerControl,
WorkerInternal;
WorkerImpl: -- CEDAR ? -- MONITOR LOCKS self USING self: Handle
IMPORTS
AccessControl,
AlpineIdentity,
AlpineImport,
AlpineTransaction,
BasicTime,
ClientMap,
ConversationTable,
ConvertUnsafe,
FileControl,
FilePageMgr,
Lock,
LockControl,
Log,
LogControl,
LogInline,
Process,
Rope,
RPC,
SafeStorage,
SkiPatrolLog,
TransactionMap,
WorkerInternal
EXPORTS
AlpineTransaction,
AlpineTransMgr,
AlpineInternal,
SkiPatrolHooks,
TransactionMap,
WorkerControl,
WorkerInternal
= BEGIN
Conversation: TYPE = AlpineEnvironment.Conversation;
TransID: TYPE = AlpineEnvironment.TransID;
nullTransID: TransID = AlpineEnvironment.nullTransID;
FileStore: TYPE = AlpineEnvironment.FileStore;
FileInstanceHandle: TYPE = AlpineInternal.FileInstanceHandle;
RecordID: TYPE = Log.RecordID;
RecordType: TYPE = Log.RecordType;
InternalProgrammingError: ERROR = CODE;
Handle: TYPE = Worker.Handle;
nullHandle: Handle = Worker.nullHandle;
TransObject: PUBLIC TYPE = Worker.Object;
AlpineInternal.TransObject
Refused: PUBLIC ERROR [why: AlpineTransMgr.Refusal] = CODE;
AlpineTransMgr.Refused.
zone: ZONE = SafeStorage.GetSystemZone[];
disableLocalCoordinatorOptimization: BOOL ← FALSE;
When TRUE, the worker always forces the log as if the coordinator were remote.
shortWaitOver: CONDITION;
Client waits here under various unlikely circumstances involving concurrent calls to CreateWorker, PrepareWorker, and FinishWorker, concurrent calls to worker actions that use exclusive mode StartWork, and excessive concurrency among other worker actions. The wait times out after Worker.shortWaitTime milliseconds, allowing the situation to be reevaluated. Nobody notifies shortWaitOver.
Parameters to worker log watchdog:
densityFactor: INT ← 64;
If self.estimatedUpdateCost*densityFactor is larger than the amount of log tied up by this transaction, then the transaction is considered too expensive to abort (unless it is inactive or its log use exceeds logWordsUsableByClient).
longTimeSinceLastStartWork: INT ← 1000; -- seconds (about 17 minutes)
If the time since the last StartWork is greater than this, then the transaction is considered inactive.
logWordsUsableByClient: INT;
Absolute upper bound on the amount of log tied up by a transaction.
maxLogWordsForFastRestart: INT = 3000*LONG[AlpineEnvironment.wordsPerPage];
fast restart means two minutes: two passes over 3000 pages at 50 pages/sec.
maxLogWordsForWorker: INT;
A worker may comfortably keep this much log active. Beyond this bound, the worker may be aborted (but need not be). These two values are initialized during recovery (based on the log length) and not changed thereafter (except perhaps from the debugger).
Worker-related procs
CreateWorker: PUBLIC PROC [
conversation: Conversation, transID: TransID, coordinator: FileStore] = {
! Unknown {coordinator, transID}, OperationFailed {busy};
AlpineTransaction.CreateWorker.
Called by a client.
The worker object monitor is not held during the call to AlpineCoordinator.RegisterWorker. This was designed to allow AlpineCoordinator.Finish to hold the coordinator object monitor during its calls to workers. (AlpineCoordinator.Finish does not actually work this way.)
self: Handle;
alreadyRegistered: BOOL ← FALSE;
whyCallFailed: RPC.CallFailure;
registerWorkerResult: AlpineTransMgr.RegisterWorkerResult;
alpineTransMgr: AlpineTransMgrRpcControl.InterfaceRecord;
conversationOut: Conversation;
IF TransactionMap.GetHandle[transID] # nullHandle THEN alreadyRegistered ← TRUE
ELSE self ← ConsWorker[transID, AlpineImport.Register[coordinator]];
IF alreadyRegistered OR TransactionMap.Register[self].alreadyRegistered THEN {
Drop self on the floor since transID is already registered. We are not allowed to call RegisterWorker; some other process is doing it. Wait for self.state # unknown, then return result.
self ← TransactionMap.GetHandle[transID];
IF self = nullHandle OR WaitUntilNotUnknown[self].workerNotActive THEN
ERROR AlpineTransaction.Unknown[transID];
}
ELSE {
We registered the volatile worker, so we are responsible for calling RegisterWorker and for updating volatile and permanent data structures to reflect the result.
We cannot return or error until self.state # unknown, lest another process hang in WaitUntilNotUnknown.
conversationOut ← ConversationTable.Fetch[self.coordinator];
alpineTransMgr ← AlpineImport.GetTransMgrInterface[self.coordinator];
IF conversationOut = NIL OR alpineTransMgr = NIL THEN GOTO noCoordinator;
registerWorkerResult ← alpineTransMgr.RegisterWorker[conversationOut, transID !
RPC.CallFailed => CHECKED { whyCallFailed ← why; GOTO callFailed } ];
EndCreateWorker[self, IF registerWorkerResult = ok THEN create ELSE abort];
EXITS
noCoordinator => {
EndCreateWorker[self, abort];
ERROR AlpineTransaction.Unknown[coordinator] };
callFailed => {
EndCreateWorker[self, abort];
SELECT whyCallFailed FROM
timeout, unbound => {
self.coordinator.TransMgrInterfaceCallFailed[alpineTransMgr];
alpineTransMgr ← NIL; --must come after CallFailed is unwound
ERROR AlpineTransaction.Unknown[coordinator] };
busy =>
ERROR AlpineTransaction.OperationFailed[busy];
runtimeProtocol, stubProtocol => ERROR;
ENDCASE
}
};
};
ConsWorker: PROC [trans: TransID, coordinator: AlpineImport.Handle]
RETURNS [Handle] = {
Note: does not establish the locks field of the Worker.Object.
RETURN [zone.NEW[Worker.Object ← [
transID: trans,
timeOfLastStartWork: BasicTime.Now[],
coordinator: coordinator,
coordinatorIsRemote: disableLocalCoordinatorOptimization OR
NOT coordinator.Equal[AlpineIdentity.myAlpineImportHandle]]]];
};
EndCreateWorker: ENTRY PROC [self: Handle, outcome: {create, abort} ] = {
self is registered in TransactionMap, in unknown state (no WorkerBegin written).
IF self.state # unknown THEN ERROR;
IF outcome = create THEN {
LogWorkerBegin[self];
self.state ← active;
IF self.locks = NIL THEN self.locks ← LockControl.ConsTransHeader[self];
}
ELSE {
Must change state from unknown since another process in WaitUntilNotUnknown may be waiting for this event.
self.state ← complete;
self.outcome ← abort;
TransactionMap.Unregister[self];
};
};
WaitUntilNotUnknown: ENTRY PROC [self: Handle]
RETURNS [workerNotActive: BOOL] = {
WHILE self.state = unknown DO WAIT shortWaitOver ENDLOOP;
RETURN [self.state # active];
};
WorkerPrepare: PUBLIC PROC [
conversation: Conversation, trans: TransID,
newTrans: AlpineEnvironment.TransID]
RETURNS [AlpineTransMgr.WorkerState] = {
! Refused {wrongCoordinator}
AlpineTransMgr.WorkerPrepare.
Called by the coordinator of "trans".
This procedure is prepared for duplicate calls. Extra callers wait in BeginPrepare (on the condition variable shortWaitOver) until the first caller has moved self out of the preparing state.
stateAfterPrepare: AlpineEnvironment.WorkerState;
self: Handle = TransactionMap.GetHandle[trans];
IF self = nullHandle THEN RETURN [notReady];
IF NOT Rope.Equal[s1: ClientMap.GetName[conversation], s2: self.coordinator.Name, case: FALSE] THEN ERROR Refused[wrongCoordinator];
SELECT BeginPrepare[self] FROM
preparing => NULL; -- normal case
ready => RETURN [ready]; -- duplicate call
ENDCASE => RETURN [notReady]; -- duplicate call
We have caused self to enter the preparing state.
IF newTrans # nullTransID THEN {
Perform specialized version of CreateWorker.
self.continueWorker ← ConsWorker[newTrans, self.coordinator];
IF TransactionMap.Register[self.continueWorker].alreadyRegistered THEN ERROR;
The newTrans is already known to us. This probably represents an error in the coordinator.
};
{
IF PreventStartWork[self: self, allowDifficulty: normal].abortThisTrans THEN GOTO abort;
Call any external caches now; they are allowed to read and write pages.
AccessControl.PhaseOneSpaceAndOwnerChanges[self !
AccessControl.LockFailed, AccessControl.Unknown => GOTO abort];
IF PreventStartWork[self: self, allowDifficulty: zero].abortThisTrans THEN GOTO abort;
Any call to StartWork for this transaction will now fail.
There is no work in progress, including waiting lock requests. We therefore have exclusive access without entering the monitor.
stateAfterPrepare ← ReadyWorker[self];
EXITS
abort => {
DO
LockControl.AbortWaitingRequests[self];
IF ShortWaitForNStarts[self] = 0 THEN EXIT;
ENDLOOP;
stateAfterPrepare ← notReady;
};
};
EndPrepare[self, stateAfterPrepare];
We have caused self to enter either the ready state or the completing state.
SELECT stateAfterPrepare FROM
notReady => NormalAbortWorker[self];
readOnlyReady => NormalCommitWorker[self];
ready => NULL;
ENDCASE;
RETURN [stateAfterPrepare]
};
BeginPrepare: ENTRY PROC [self: Handle]
RETURNS [Worker.State] = {
result is # unknown, active.
result is = preparing iff this call caused the transaction to enter the preparing state.
DO
SELECT self.state FROM
unknown, preparing => WAIT shortWaitOver;
ENDCASE => EXIT;
ENDLOOP;
IF self.state = active THEN self.state ← preparing;
RETURN [self.state];
};
ReadyWorker: PROC [self: Handle]
RETURNS [AlpineEnvironment.WorkerState] = {
Called during normal operation and during recovery.
The caller has exclusive access to the transaction self and is not holding the monitor.
lockRoutine: Rope.ROPE;  -- (name of LockControl routine called)
howFailed: AlpineEnvironment.LockFailure;
readOnly: BOOL;
{readOnly ← FileControl.CommitPhase1[self !
Lock.Failed => {
lockRoutine ← "WorkerImpl.ReadyWorker (calling CommitPhase1)";
howFailed ← why;
GOTO lockProblem
};
Lock.TransAborting => GOTO abort];
LockControl.UpgradeLocks[self !
Lock.Failed => {
lockRoutine ← "WorkerImpl.ReadyWorker (calling UpgradeLocks)";
howFailed ← why;
GOTO lockProblem
};
Lock.TransAborting => GOTO abort];
RETURN [IF readOnly THEN readOnlyReady ELSE ready];
EXITS
lockProblem => {
logProc: PROC [SkiPatrolLog.LockConflictInfo];
IF (logProc ← SkiPatrolLog.notice.lockConflict) # NIL THEN
logProc[[
what: howFailed,
where: lockRoutine,
transID: self.transID,
mode: none,
message: "(sigh) attempted locking mode not known"
]];
RETURN [notReady];
};
abort => RETURN [notReady];
}};
EndPrepare: ENTRY PROC [self: Handle,
stateAfterPrepare: AlpineEnvironment.WorkerState] = {
SELECT stateAfterPrepare FROM
notReady => {
LogWorkerCompleting[self: self, outcome: abort, force: FALSE];
self.state ← completing;
self.outcome ← abort;
};
readOnlyReady => {
LogWorkerCompleting[self: self, outcome: readOnly, force: FALSE];
self.state ← completing;
self.outcome ← commit;
};
ready => {
LogWorkerReady[self: self, force: self.coordinatorIsRemote];
self.state ← ready;
};
ENDCASE => ERROR;
};
WorkerFinish: PUBLIC PROC [
conversation: Conversation, trans: TransID,
requiredOutcome: AlpineTransMgr.RequiredOutcome] = {
! Refused {wrongCoordinator, notReady};
AlpineTransMgr.WorkerFinish
Called by the coordinator of "trans".
self: Handle = TransactionMap.GetHandle[trans];
IF self = nullHandle THEN RETURN; --we must have already finished.
IF NOT Rope.Equal[
s1: ClientMap.GetName[conversation], s2: self.coordinator.Name, case: FALSE] THEN
ERROR Refused[wrongCoordinator];
IF requiredOutcome NOT IN [abort .. commit] THEN ERROR;
LocalWorkerFinish[self, requiredOutcome, clientRequest];
};
AbortUnilaterally: PUBLIC PROC [
self: Handle, why: TransactionMap.AbortReason] = {
TransactionMap.AbortUnilaterally
Note some duplicate logic with LocalWorkerFinish.
SELECT BeginFinish[self, abort, returnIfPreparing] FROM
active => ERROR Refused [notReady];
preparing => {
We cannot determine self's outcome, and are not responsible for making it complete, but we are responsible for ensuring that it leaves the preparing state. Otherwise it might stay there forever, waiting in the lock manager. This loop is really necessary, since neither process involved is synchronized by self's or Lock's monitors.
DO
LockControl.AbortWaitingRequests[self];
IF ShortWaitForState[self] # preparing THEN RETURN;
ENDLOOP;
};
completing => {
We have caused self to enter the completing state, self.outcome = abort.
Hence we are responsible for making it complete.
IF PreventStartWork[self, zero].abortThisTrans THEN {
Some work is still in progress. Wait for it to go away.
DO
LockControl.AbortWaitingRequests[self];
IF ShortWaitForNStarts[self] = 0 THEN EXIT;
ENDLOOP;
};
NormalAbortWorker[self];
Temporary kludge; it works only because remote coordinators are not used at present.
[] ← AlpineTransaction.Finish[
AlpineIdentity.myLocalConversation, self.transID, abort, FALSE];
RETURN;
};
ENDCASE => RETURN;
};
ShortWaitForState: ENTRY PROC [self: Handle]
RETURNS [Worker.State] = INLINE {
WAIT shortWaitOver;
RETURN [self.state];
};
ShortWaitForNStarts: ENTRY PROC [self: Handle]
RETURNS [nStarts: [0..Worker.maxStarts]] = INLINE {
WAIT shortWaitOver;
RETURN [self.nStarts];
};
LocalWorkerFinish: PROC [
self: Handle, requiredOutcome: AlpineTransMgr.RequiredOutcome,
whyAbort: TransactionMap.AbortReason] = {
SELECT BeginFinish[self, requiredOutcome, waitIfPreparing] FROM
active => ERROR Refused [notReady];
completing => {
We have caused self to enter the completing state, self.outcome = requiredOutcome.
Hence we are responsible for making it complete.
IF requiredOutcome = abort THEN {
IF PreventStartWork[self, zero].abortThisTrans THEN {
Some work is still in progress. Wait for it to go away.
DO
LockControl.AbortWaitingRequests[self];
IF ShortWaitForNStarts[self] = 0 THEN EXIT;
ENDLOOP;
};
NormalAbortWorker[self]
}
ELSE NormalCommitWorker[self];
};
ENDCASE => RETURN;
};
BeginFinish: ENTRY PROC [self: Handle,
requiredOutcome: AlpineTransMgr.RequiredOutcome,
option: {returnIfPreparing, waitIfPreparing}]
RETURNS [Worker.State] = {
result is # unknown, ready.
result is = active only if requiredOutcome = commit and transaction was unknown or active on entry. (Caller is in error.)
result is = preparing only if option = returnIfPreparing and transaction was preparing on entry. (Caller is trying to perform unilateral abort.)
result is = completing iff this call caused the transaction to enter the completing state. (Caller is now responsible for making transaction complete.)
DO
SELECT self.state FROM
unknown, completing => WAIT shortWaitOver;
preparing => IF option = waitIfPreparing THEN WAIT shortWaitOver
ELSE RETURN [preparing];
ENDCASE => EXIT;
ENDLOOP;
IF requiredOutcome = abort AND (self.state = active OR self.state = ready) THEN {
LogWorkerCompleting[self, abort, FALSE];
self.state ← completing;
self.outcome ← abort;
}
ELSE IF requiredOutcome = commit AND self.state = ready THEN {
LogWorkerCompleting[self, commit, self.coordinatorIsRemote];
self.state ← completing;
self.outcome ← commit;
};
RETURN [self.state];
};
NormalCommitWorker: PROC [self: Handle] = {
self.state = completing, self.outcome = commit
Call any external caches now; they need to know about the commit.
AccessControl.PhaseTwoOrAbortSpaceAndOwnerChanges[self, commit];
CommitWorker[self];
};
CommitWorker: PROC [self: Handle] = {
Called during normal operation and during recovery.
FileControl.CommitPhase2[trans: self, newTrans: self.continueWorker];
IF self.continueWorker # nullHandle THEN {
LockControl.TransferLocks[from: self, to: self.continueWorker];
self.continueWorker.locks ← self.locks;
self.locks ← NIL;
EndCreateWorker[self.continueWorker, create];
self.continueWorker ← nullHandle;
}
ELSE {
LockControl.ReleaseLocks[self];
self.locks ← NIL;
};
};
NormalAbortWorker: PROC [self: Handle] = {
self.state = completing, self.outcome = abort
Call any external caches now; they need to know about the abort.
AccessControl.PhaseTwoOrAbortSpaceAndOwnerChanges[self, abort];
AbortWorker[self];
};
AbortWorker: PROC [self: Handle] = {
logProc: PROC [SkiPatrolLog.TransactionAbortInfo];
Called during normal operation and during recovery.
IF (logProc ← SkiPatrolLog.notice.abortTransaction) # NIL THEN
logProc[[
transID: self.transID,
where: "WorkerImpl.AbortWorker",
locks: self.locks,
why: lockInfo,
message: ""
]];
FileControl.Abort[trans: self];
LockControl.ReleaseLocks[self];
self.locks ← NIL;
IF self.continueWorker # nullHandle THEN {
EndCreateWorker[self.continueWorker, abort];
self.continueWorker ← nullHandle;
};
};
Log writing and analysis; checkpoint proc
LogWorkerBegin: PROC [self: Handle] = {
rec: Worker.BeginLogRep ← [];
ConvertUnsafe.AppendRope[
to: LOOPHOLE[LONG[@rec.coordinator]], from: self.coordinator.Name];
[thisRecord: self.beginRecord] ← Log.Write[self, workerBegin,
[base: @rec, length: TEXT[rec.length].SIZE]];
};
LogWorkerReady: PROC [self: Handle, force: BOOL] = {
[] ← Log.Write[self, workerReady,
Log.nullBlock, force];
};
LogWorkerCompleting: PROC [self: Handle,
outcome: AlpineInternal.WorkerOutcome, force: BOOL] = {
rec: Worker.CompletingLogRep ← [outcome: outcome];
[] ← Log.Write[self, workerCompleting,
[base: @rec, length: Worker.CompletingLogRep.SIZE], force];
};
LogWorkerComplete: PROC [self: Handle] = {
[] ← Log.Write[self, workerComplete, Log.nullBlock];
};
AnalyzeWorkerBegin: PROC [
record: RecordID, type: RecordType, trans: TransID] = {
coordinator: AlpineEnvironment.FileStore;
{ -- get coordinator name from log, map it into a FileStore.
rec: Worker.BeginLogRep ← [];
status: Log.ReadProcStatus;
[status: status] ← Log.ReadForRecovery[thisRecord: record,
to: [base: @rec, length: Worker.BeginLogRep.SIZE]];
IF status = destinationFull THEN ERROR InternalProgrammingError;
coordinator ← ConvertUnsafe.ToRope[from: LOOPHOLE[LONG[@rec.coordinator]]];
};
{ -- create a worker handle from trans, setting its coordinator field from above.
self: Handle ← ConsWorker[trans, AlpineImport.Register[coordinator]];
IF TransactionMap.Register[self].alreadyRegistered THEN ERROR InternalProgrammingError;
self.beginRecord ← record;
self.locks ← LockControl.ConsTransHeader[self];
self.allowableDifficulty ← zero;
self.stateDuringRecovery ← active;
};
};
AnalyzeWorkerReady: PROC [
record: RecordID, type: RecordType, trans: TransID] = {
self: Handle = TransactionMap.GetHandle[trans];
IF self = nullHandle THEN RETURN;
IF self.stateDuringRecovery # active THEN ERROR InternalProgrammingError;
self.stateDuringRecovery ← ready;
};
AnalyzeWorkerCompleting: PROC [
record: RecordID, type: RecordType, trans: TransID] = {
outcome: AlpineInternal.WorkerOutcome;
{ -- get outcome from log
rec: Worker.CompletingLogRep;
status: Log.ReadProcStatus;
[status: status] ← Log.ReadForRecovery[thisRecord: record,
to: [base: @rec, length: Worker.CompletingLogRep.SIZE]];
IF status # normal THEN ERROR InternalProgrammingError;
outcome ← rec.outcome;
};
{
self: Handle = TransactionMap.GetHandle[trans];
IF self = nullHandle THEN RETURN;
IF self.stateDuringRecovery NOT IN [active .. ready] THEN
ERROR InternalProgrammingError;
self.stateDuringRecovery ← IF outcome = abort THEN aborted ELSE committed;
};
};
AnalyzeWorkerComplete: PROC [
record: RecordID, type: RecordType, trans: TransID] = {
self: Handle = TransactionMap.GetHandle[trans];
IF self = nullHandle THEN RETURN;
SELECT self.stateDuringRecovery FROM
active, ready => ERROR;
committed => self.outcome ← commit;
aborted => self.outcome ← abort
ENDCASE => ERROR;
LockControl.ReleaseLocks[self];
self.locks ← NIL;
self.state ← complete;
TransactionMap.Unregister[self];
};
CallAfterAnalysisPass: PUBLIC PROC [] = {
WorkerControl.CallAfterAnalysisPass
AbortActiveWorker: PROC [self: Handle] RETURNS [stop: BOOL] = {
SELECT self.stateDuringRecovery FROM
active => {
LogWorkerCompleting[self: self, outcome: abort, force: FALSE];
self.stateDuringRecovery ← aborted;
};
ready, committed, aborted => NULL;
ENDCASE => ERROR;
RETURN [stop: FALSE];
};
logWordsUsableByClient ← LogControl.WordsUsableByClient[];
maxLogWordsForWorker ← MIN[
maxLogWordsForFastRestart, logWordsUsableByClient];
TransactionMap.LockedEnumerate[AbortActiveWorker];
};
RecoverWorkerBegin: PROC [
record: RecordID, type: RecordType, trans: Handle, outcome: Log.TransState] = {
IF trans.state # unknown THEN ERROR;
trans.state ← active;
};
RecoverWorkerReady: PROC [
record: RecordID, type: RecordType, trans: Handle, outcome: Log.TransState] = {
IF trans.state # active THEN ERROR;
trans.state ← ready;
};
RecoverWorkerCompleting: PROC [
record: RecordID, type: RecordType, trans: Handle, outcome: Log.TransState] = {
trans.state ← completing;
SELECT outcome FROM
committed => {
trans.state ← ready;
IF ReadyWorker[trans] = notReady THEN ERROR;
trans.state ← completing; trans.outcome ← commit;
[] ← CommitWorker[trans];
};
aborted => {
trans.state ← completing; trans.outcome ← abort;
AbortWorker[trans];
};
ready => {
trans.state ← ready;
IF ReadyWorker[trans] # ready THEN ERROR;
};
ENDCASE => ERROR;
};
CallAfterUpdatePass: PUBLIC PROC [] = {
WorkerControl.CallAfterUpdatePass
noActiveWorkers: BOOL;
CheckForActiveWorkers: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
SELECT self.state FROM
ready, fpmComplete => NULL;
ENDCASE => noActiveWorkers ← FALSE;
RETURN [stop: FALSE];
};
DO
noActiveWorkers ← TRUE;
TransactionMap.LockedEnumerate[CheckForActiveWorkers];
IF noActiveWorkers THEN RETURN ELSE Process.Pause[Process.SecondsToTicks[1]];
ENDLOOP;
};
WorkerCheckpointProc: PROC []
RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
fpmCompleteWorkerSeen: BOOL ← FALSE;
ExamineWorkerBeforeForceOut: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
IF self.state = fpmComplete THEN {
self.state ← fpmCompleteBeingForcedOut;
fpmCompleteWorkerSeen ← TRUE;
};
RETURN [stop: FALSE];
};
oldestBeginRecord: Log.RecordID ← Log.lastRecordID;
ExamineWorkerAfterForceOut: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
IF self.state = fpmCompleteBeingForcedOut THEN {
LogWorkerComplete[self];
TransactionMap.Unregister[self];
self.state ← complete;
}
ELSE IF self.state # unknown THEN
oldestBeginRecord ← LogInline.Min[self.beginRecord, oldestBeginRecord];
RETURN [stop: FALSE];
};
TransactionMap.UnlockedEnumerate[ExamineWorkerBeforeForceOut];
IF fpmCompleteWorkerSeen THEN FilePageMgr.ForceOutEverything[];
TransactionMap.UnlockedEnumerate[ExamineWorkerAfterForceOut];
IF LogInline.WordsFromSubtract[LogControl.RecordIDOfNextWrite[], oldestBeginRecord] >
maxLogWordsForWorker THEN WorkerInternal.NotifyLogWatchdog[];
RETURN [oldestBeginRecord, oldestBeginRecord];
};
WorkerLogWatchdogProcess: PUBLIC PROC [] = {
transID: TransID ← nullTransID;
w: Handle ← NIL;
wait: BOOL ← FALSE;
DO
IF transID # nullTransID THEN w ← TransactionMap.GetHandle[transID];
IF w = NIL THEN w ← GetOldestActiveWorker[];
IF w = NIL THEN { transID ← nullTransID; wait ← TRUE }
ELSE SELECT TestForAbort[w] FROM
notActive => transID ← nullTransID;
abort => {
logProc: PROC [SkiPatrolLog.TransactionAbortInfo];
IF (logProc ← SkiPatrolLog.notice.abortTransaction) # NIL THEN
logProc[[
transID: w.transID,
where: "WorkerImpl.WorkerLogWatchdogProcess",
why: watchDog,
message: "had old uncommitted updates"
]];
AbortUnilaterally[w, oldUncommittedUpdates];
transID ← nullTransID;
};
dontAbort => { transID ← w.transID; wait ← TRUE };
ENDCASE => ERROR;
w ← NIL; -- so that w can be garbage-collected
IF wait THEN { WorkerInternal.WaitForNotify[]; wait ← FALSE };
ENDLOOP;
};
TestForAbort: PUBLIC ENTRY SAFE PROC [self: Handle] RETURNS [SkiPatrolHooks.ShouldAbort] = TRUSTED {
logWordsForWorker: INT =
LogInline.WordsFromSubtract[LogControl.RecordIDOfNextWrite[], self.beginRecord];
IF self.state # active THEN RETURN [notActive];
IF logWordsForWorker <= maxLogWordsForWorker THEN RETURN [dontAbort];
IF logWordsForWorker > logWordsUsableByClient THEN RETURN [abort];
IF BasicTime.Period[from: self.timeOfLastStartWork, to: BasicTime.Now[]] <
longTimeSinceLastStartWork --active-- AND
self.estimatedUpdateCost * densityFactor > logWordsForWorker --expensive-- THEN
RETURN [dontAbort];
RETURN [abort];
};
InactiveWorker: PUBLIC ENTRY SAFE PROC [self: Handle] RETURNS [BOOLEAN] ~ CHECKED {
"Inactive" means that no StartWork has been done for more than longTimeSinceLastStartWork seconds.
RETURN [BasicTime.Period[from: self.timeOfLastStartWork, to: BasicTime.Now[]] > longTimeSinceLastStartWork];
};
GetOldestActiveWorker: PROC [] RETURNS [Handle] = {
w: Handle ← NIL;
ExamineWorker: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
IF self.state = active AND (w = NIL OR
LogInline.Compare[w.beginRecord, self.beginRecord] # less) THEN
w ← self;
RETURN [stop: FALSE];
};
TransactionMap.UnlockedEnumerate[ExamineWorker];
RETURN [w];
};
Procedures exported to TransactionMap, and some related procedures.
StartWork: PUBLIC ENTRY PROC [self: Handle, difficulty: AlpineInternal.WorkLevel] RETURNS [canWork: BOOL] = {
IF difficulty > self.allowableDifficulty THEN RETURN [FALSE];
IF self.state NOT IN [active .. preparing] THEN ERROR;
WHILE self.nStarts = Worker.maxStarts DO
WAIT shortWaitOver;
IF difficulty > self.allowableDifficulty THEN RETURN [FALSE];
ENDLOOP;
self.nStarts ← self.nStarts + 1;
self.timeOfLastStartWork ← BasicTime.Now[];
RETURN [TRUE];
};
StopWork: PUBLIC ENTRY PROC [self: Handle, estimatedUpdateCost: INT] = {
IF self.nStarts = 0 THEN ERROR; --impossible if clients are correctly coded--
self.nStarts ← self.nStarts - 1;
self.estimatedUpdateCost ← self.estimatedUpdateCost + estimatedUpdateCost;
};
PreventStartWork: ENTRY PROC [self: Handle, allowDifficulty: Worker.Difficulty]
RETURNS [abortThisTrans: BOOL] = {
Return abortThisTrans ~ TRUE if work is in progress.
IF self.nStarts # 0 THEN {
self.allowableDifficulty ← zero;
RETURN [abortThisTrans: TRUE];
}
ELSE {
self.allowableDifficulty ← allowDifficulty;
RETURN [abortThisTrans: FALSE];
};
};
GetTimeOfLastStartWork: PUBLIC ENTRY PROC [self: Handle]
RETURNS [BasicTime.GMT] = {
RETURN [self.timeOfLastStartWork];
};
GetEstimatedUpdateCost: PUBLIC ENTRY PROC [self: Handle] RETURNS [INT] = {
RETURN [self.estimatedUpdateCost];
};
GetTransID: PUBLIC PROC [self: Handle] RETURNS [TransID] = {
Not ENTRY because the transID field is immutable.
RETURN [self.transID];
};
GetFileInstanceList: PUBLIC PROC [self: Handle]
RETURNS [fileInstanceList: FileInstanceHandle] = {
Not ENTRY because self.fileInstanceList "belongs" to the file manager.
RETURN [self.fileInstanceList];
};
SetFileInstanceList: PUBLIC PROC [self: Handle,
fileInstanceList: FileInstanceHandle] = {
Not ENTRY because self.fileInstanceList "belongs" to the file manager.
self.fileInstanceList ← fileInstanceList;
};
GetLockHeader: PUBLIC PROC [self: Handle]
RETURNS [lockHeader: AlpineInternal.LockTransHeaderHandle] = {
Not ENTRY because self.locks "belongs" to the lock manager.
RETURN[self.locks];
};
EnableAlpineWheel: PUBLIC ENTRY PROC [self: Handle,
conversation: Conversation, enable: BOOL] = {
l: LIST OF RPC.ConversationID;
c: RPC.ConversationID = RPC.GetConversationID[conversation];
IF c = AlpineIdentity.myLocalConversationID THEN RETURN;
FOR l ← self.enabledWheelList, l.rest UNTIL l = NIL DO
IF l.first = c THEN EXIT;
ENDLOOP;
IF enable AND l = NIL THEN {
new: LIST OF RPC.ConversationID = zone.CONS[
first: c, rest: self.enabledWheelList];
self.enabledWheelList ← new;
}
ELSE IF (NOT enable) AND l # NIL THEN {
IF self.enabledWheelList = l THEN self.enabledWheelList ← l.rest
ELSE
FOR p: LIST OF RPC.ConversationID ← self.enabledWheelList, p.rest UNTIL p = NIL DO
IF p.rest = l THEN { p.rest ← l.rest; EXIT };
ENDLOOP;
};
};
IsAlpineWheel: PUBLIC ENTRY PROC [self: Handle,
conversation: Conversation] RETURNS [enabled: BOOL] = {
c: RPC.ConversationID = RPC.GetConversationID[conversation];
IF c = AlpineIdentity.myLocalConversationID THEN RETURN [enabled: TRUE];
FOR l: LIST OF RPC.ConversationID ← self.enabledWheelList, l.rest UNTIL l = NIL DO
IF l.first = c THEN RETURN [enabled: TRUE];
ENDLOOP;
RETURN [enabled: FALSE];
};
StateDuringRecovery: PUBLIC ENTRY PROC [self: Handle]
RETURNS [TransactionMap.TransState] = {
SELECT self.stateDuringRecovery FROM
active => ERROR;
ready => RETURN [ready];
committed => RETURN [committed];
aborted => RETURN [aborted];
ENDCASE => ERROR;
};
IsCommitted: PUBLIC ENTRY PROC [self: Handle] RETURNS [BOOL] = {
SELECT self.state FROM
active, preparing => RETURN [FALSE];
completing => RETURN [self.outcome = commit];
ENDCASE => ERROR; -- locks are supposed to prevent this
};
AssertUpdatesAreComplete: PUBLIC ENTRY PROC [self: Handle] = {
IF self.state # completing THEN ERROR;
self.state ← fpmComplete;
};
TransIDFromTransHandle: PUBLIC SAFE PROC [h: AlpineInternal.TransHandle] RETURNS [AlpineEnvironment.TransID] ~ CHECKED {
For SkiPatrolHooks.
wh: Worker.Handle;
IF h = NIL THEN
RETURN [AlpineEnvironment.nullTransID]
ELSE {
TRUSTED {wh ← LOOPHOLE[h]};
RETURN [wh.transID]
}
};
Initialization
LogControl.RegisterAnalysisProc[workerBegin, AnalyzeWorkerBegin];
LogControl.RegisterAnalysisProc[workerReady, AnalyzeWorkerReady];
LogControl.RegisterAnalysisProc[workerCompleting, AnalyzeWorkerCompleting];
LogControl.RegisterAnalysisProc[workerComplete, AnalyzeWorkerComplete];
Log.RegisterRecoveryProc[workerBegin, RecoverWorkerBegin];
Log.RegisterRecoveryProc[workerReady, RecoverWorkerReady];
Log.RegisterRecoveryProc[workerCompleting, RecoverWorkerCompleting];
LogControl.RegisterCheckpointProc[WorkerCheckpointProc];
Process.SetTimeout[@shortWaitOver, Process.MsecToTicks[Worker.shortWaitTime]];
Process.EnableAborts[@shortWaitOver];
END.--WorkerImpl
CHANGE LOG
Changed by MBrown on February 9, 1983 11:54 am
Change CommitWorker to set continueWorker ← nullHandle. (This was happening, unsynchronized, from the checkpoint process!)
Changed by MBrown on March 2, 1983 2:53 pm
Change sanity check in StartWork.
Changed by MBrown on April 3, 1983 10:32 am
Implemented worker log watchdog. Changed AbortUnilaterally to call AlpineTransaction.Finish[..., abort, ...], making coordinator go away if it is local.
Updated catch phrases for AccessControl errors.
Changed by MBrown on June 6, 1983 11:28 am
In WorkerImpl.CheckForReadyWorker, worker state was examined without entering the monitor. Changed logic as follows: now WorkerImpl.CallAfterUpdatePass waits until all workers are either ready or fpmComplete.
Changed by MBrown on June 25, 1983 9:43 pm
Fixed two related bugs. If AbortUnilaterally was called with self.state = preparing, it simply waited for self.state # preparing. This is no good because self may be waiting in the lock manager, deadlocked, and the caller of AbortUnilaterally may be the lock watchdog process! This was observed several times in server operation.
The second bug was more subtle and probably has not yet arisen in practice. When a process calls LockControl.AbortWaitingRequests, what guarantee does it have that no new requests will come along and wait later? Answer: no guarantee. This synchronization must be done at the level of the worker object monitor, and the lock manager does not enter the worker monitor. We wrapped all calls to LockControl.AbortWaitingRequests in loops that are designed to ensure that when the loop exits, no more Lock.Set or LockControl.UpgradeLocks calls will be executed for this transaction. This seems messy but it is not clear how to do better.
Edited June 1984, by Kupfer
Add InactiveWorker function.
Edited on July 12, 1984 3:10:32 pm PDT, by Kupfer
changes to: StartWork, StopWork: Reflect change to SkiPatrolLog.
Edited on July 25, 1984 11:05:24 am PDT, by Kupfer
(1) Get rid of "work" probes and add probe for locking errors and for worker abort.
(2) Make CreateWorker be cleaner about what ERRORs it returns.
(3) Add TransIDFromTransHandle.
changes to: ReadyWorker, StartWork, StopWork, WorkerImpl, WorkerLogWatchdogProcess, AbortWorker
Edited on August 6, 1984 3:40:12 pm PDT, by Kupfer
(1) Make TransIDFromTransHandle handle a NIL argument gracefully.
(2) Fix a comment about what signals WorkerPrepare generates.
(3) Remove the possible race condition in SkiPatrolLog probes by assigning the PROC to a temporary variable.
changes to: WorkerImpl, AbortWorker, WorkerLogWatchdogProcess, WorkerPrepare, TransIDFromTransHandle