-- WorkerImpl.mesa
-- Implements worker in two-phase commit: create, prepare, finish (commit or abort.)
-- Last edited by
-- MBrown on January 30, 1984 12:12:43 pm PST
-- Kolling on June 6, 1983 12:05 pm
-- NOTES:
-- It seems poor for CreateWorker to ERROR Unknown[coordinator] in the case that the
--coordinator is there, but refusing new RPC work (CedarRPC.CallFailed[busy].)
-- It is also poor to land in debugger due to RPC.CallFailed[protocolError].
-- The implementation of AbortUnilaterally should be improved by making a remote
--call to the coordinator, if the coordinator is remote.
DIRECTORY
AccessControl,
AlpineEnvironment,
AlpineIdentity,
AlpineImport,
AlpineInternal,
AlpineTransaction,
AlpineTransMgr,
AlpineTransMgrRpcControl,
BasicTime,
ClientMap USING [GetName],
ConversationTable,
ConvertUnsafe,
FileControl,
FilePageMgr,
Lock,
LockControl,
Log,
LogControl,
LogInline,
Process,
Rope,
RPC,
SafeStorage,
TransactionMap,
Worker,
WorkerControl,
WorkerInternal;
WorkerImpl: MONITOR LOCKS self USING self: Handle
IMPORTS
AccessControl,
AlpineIdentity,
AlpineImport,
AlpineTransaction,
BasicTime,
ClientMap,
ConversationTable,
ConvertUnsafe,
FileControl,
FilePageMgr,
Lock,
LockControl,
Log,
LogControl,
LogInline,
Process,
Rope,
RPC,
SafeStorage,
TransactionMap,
WorkerInternal
EXPORTS
AlpineTransaction,
AlpineTransMgr,
AlpineInternal,
TransactionMap,
WorkerControl,
WorkerInternal
= BEGIN
-- Local abbreviations for types and constants defined in other interfaces.
Conversation: TYPE = AlpineEnvironment.Conversation;
TransID: TYPE = AlpineEnvironment.TransID;
nullTransID: TransID = AlpineEnvironment.nullTransID;
FileStore: TYPE = AlpineEnvironment.FileStore;
FileInstanceHandle: TYPE = AlpineInternal.FileInstanceHandle;
RecordID: TYPE = Log.RecordID;
RecordType: TYPE = Log.RecordType;
-- Raised by the log analysis procedures when the log contents are inconsistent.
InternalProgrammingError: ERROR = CODE;
Handle: TYPE = Worker.Handle;
nullHandle: Handle = Worker.nullHandle;
TransObject: PUBLIC TYPE = Worker.Object;
-- AlpineInternal.TransObject
Refused: PUBLIC ERROR [why: AlpineTransMgr.Refusal] = CODE;
--AlpineTransMgr.Refused.
-- Storage zone used for Worker.Object and wheel-list allocation in this module.
zone: ZONE = SafeStorage.GetSystemZone[];
disableLocalCoordinatorOptimization: BOOL ← FALSE;
-- When TRUE, worker always forces log as if coordinator were remote.
shortWaitOver: CONDITION;
-- Client waits here under various unlikely circumstances involving concurrent calls
--to CreateWorker, PrepareWorker, and FinishWorker, concurrent calls to worker actions
--that use exclusive mode StartWork, and excessive concurrency among other worker
--actions. The wait times out after Worker.shortWaitTime milliseconds, allowing the
--situation to be reevaluated. Nobody notifies shortWaitOver.
-- Parameters to worker log watchdog:
densityFactor: INT ← 64;
-- if self.estimatedUpdateCost*densityFactor is larger than the amount of log tied up
--by this transaction, then the transaction is considered too expensive to abort
--(unless it is inactive or its log use exceeds logWordsUsableByClient).
longTimeSinceLastStartWork: INT ← 1000; -- seconds (about 15 minutes)
-- if the time since last start work is greater than this, then the transaction is
--considered inactive.
logWordsUsableByClient: INT;
-- absolute upper bound on the amount of log tied up by a transaction.
maxLogWordsForFastRestart: INT = 3000*LONG[AlpineEnvironment.wordsPerPage];
-- fast restart means two minutes: two passes over 3000 pages at 50 pages/sec.
maxLogWordsForWorker: INT;
-- A worker may comfortably keep this much log active. Beyond this bound, the
--worker may be aborted (but need not be).
-- These two values (logWordsUsableByClient and maxLogWordsForWorker) are initialized
--during recovery (based on the log length) and not changed thereafter (except perhaps
--from the debugger).
CreateWorker: PUBLIC PROC [
conversation: Conversation, transID: TransID, coordinator: FileStore] = {
-- ! Unknown {coordinator, transID};
-- AlpineTransaction.CreateWorker.
-- Called by a client.
-- Makes this server a worker for transID, registering with the named coordinator.
-- The conversation argument is not referenced in the body; the call to the
--coordinator uses the conversation fetched from ConversationTable instead.
-- The worker object monitor is not held during the call to
--AlpineCoordinator.RegisterWorker. This was designed to allow
--AlpineCoordinator.Finish to hold the coordinator object monitor during its calls
--to workers. (AlpineCoordinator.Finish does not actually work this way).
self: Handle;
alreadyRegistered: BOOL ← FALSE;
whyCallFailed: RPC.CallFailure;
registerWorkerResult: AlpineTransMgr.RegisterWorkerResult;
alpineTransMgr: AlpineTransMgrRpcControl.InterfaceRecord;
conversationOut: Conversation;
-- Only construct a new worker object if the transaction is not yet in the map.
IF TransactionMap.GetHandle[transID] # nullHandle THEN alreadyRegistered ← TRUE
ELSE self ← ConsWorker[transID, AlpineImport.Register[coordinator]];
-- self has not been assigned when alreadyRegistered is TRUE, so this test depends on
--OR skipping its right operand in that case.
IF alreadyRegistered OR TransactionMap.Register[self].alreadyRegistered THEN {
-- Drop self on the floor since transID is already registered. We are not allowed
--to call RegisterWorker; some other process is doing it. Wait for
--self.state # unknown, then return result.
self ← TransactionMap.GetHandle[transID];
IF self = nullHandle OR WaitUntilNotUnknown[self].workerNotActive THEN
ERROR AlpineTransaction.Unknown[transID];
}
ELSE {
-- We registered the volatile worker, so we are responsible to call RegisterWorker
--and update volatile and permanent data structures to reflect the result.
--We cannot return or error until self.state # unknown, lest another process hang
--in WaitUntilNotUnknown.
conversationOut ← ConversationTable.Fetch[self.coordinator];
alpineTransMgr ← AlpineImport.GetTransMgrInterface[self.coordinator];
IF conversationOut = NIL OR alpineTransMgr = NIL THEN GOTO noCoordinator;
-- The failure reason is saved and handled in the EXITS clause, after the signal
--has been unwound.
registerWorkerResult ← alpineTransMgr.RegisterWorker[conversationOut, transID !
RPC.CallFailed => CHECKED { whyCallFailed ← why; GOTO callFailed } ];
-- Any result other than ok aborts the worker, but CreateWorker still returns normally.
EndCreateWorker[self, IF registerWorkerResult = ok THEN create ELSE abort];
EXITS
noCoordinator => {
-- Leave the unknown state before raising, so that waiters are released.
EndCreateWorker[self, abort];
ERROR AlpineTransaction.Unknown[coordinator] };
callFailed => {
EndCreateWorker[self, abort];
SELECT whyCallFailed FROM
timeout, unbound => {
self.coordinator.TransMgrInterfaceCallFailed[alpineTransMgr];
alpineTransMgr ← NIL; --must come after CallFailed is unwound
ERROR AlpineTransaction.Unknown[coordinator] };
busy => ERROR AlpineTransaction.Unknown[coordinator];
runtimeProtocol, stubProtocol => ERROR;
-- NOTE(review): the bare ENDCASE below does nothing. If RPC.CallFailure has values
--other than those listed, CreateWorker returns normally after aborting the worker.
--Confirm that the list above is exhaustive.
ENDCASE }
};
};
ConsWorker: PROC [trans: TransID, coordinator: AlpineImport.Handle]
  RETURNS [Handle] = {
  -- Allocates a fresh Worker.Object for trans from the module zone.
  -- The locks field is deliberately left unset; callers establish it later.
  -- The coordinator is treated as remote when the local-coordinator optimization is
  --disabled, or when it differs from this server's own import handle.
  treatAsRemote: BOOL = disableLocalCoordinatorOptimization OR
    NOT coordinator.Equal[AlpineIdentity.myAlpineImportHandle];
  RETURN [zone.NEW[Worker.Object ← [
    transID: trans,
    timeOfLastStartWork: BasicTime.Now[],
    coordinator: coordinator,
    coordinatorIsRemote: treatAsRemote]]];
  };
EndCreateWorker: ENTRY PROC [self: Handle, outcome: {create, abort} ] = {
  -- Moves a newly registered worker out of the unknown state, under self's monitor.
  -- On entry self is registered in TransactionMap and still unknown (no WorkerBegin
  --record has been written); any other state is a programming error.
  IF self.state # unknown THEN ERROR;
  IF outcome = abort THEN {
    -- The state must leave unknown even on failure, because another process may be
    --polling for that transition in WaitUntilNotUnknown.
    self.state ← complete;
    self.outcome ← abort;
    TransactionMap.Unregister[self];
    }
  ELSE {
    -- Success: log the begin record, activate, and make sure a lock header exists.
    LogWorkerBegin[self];
    self.state ← active;
    IF self.locks = NIL THEN self.locks ← LockControl.ConsTransHeader[self];
    };
  };
WaitUntilNotUnknown: ENTRY PROC [self: Handle]
  RETURNS [workerNotActive: BOOL] = {
  -- Polls (using the timeout on shortWaitOver) until self has left the unknown state.
  -- Returns TRUE when the worker then turns out to be in any state other than active.
  UNTIL self.state # unknown DO WAIT shortWaitOver ENDLOOP;
  RETURN [workerNotActive: self.state # active];
  };
WorkerPrepare: PUBLIC PROC [
conversation: Conversation, trans: TransID,
newTrans: AlpineEnvironment.TransID]
RETURNS [AlpineTransMgr.WorkerState] = {
-- ! Refused {transUnknown, wrongCoordinator}
-- NOTE(review): this body raises only Refused[wrongCoordinator]; a trans that is not in
--TransactionMap yields a notReady result instead of Refused[transUnknown].
-- AlpineTransMgr.WorkerPrepare.
-- Called by the coordinator of "trans".
-- This procedure is prepared for duplicate calls. Extra callers wait in BeginPrepare
--on the global condition variable shortWaitOver until the first caller's prepare ends.
-- A non-null newTrans asks for a continuation worker to be created and registered; it
--is activated or aborted later, when this worker commits or aborts.
stateAfterPrepare: AlpineEnvironment.WorkerState;
self: Handle = TransactionMap.GetHandle[trans];
IF self = nullHandle THEN RETURN [notReady];
-- Only the coordinator recorded in the worker may prepare it (case-insensitive compare).
IF NOT Rope.Equal[
s1: ClientMap.GetName[conversation], s2: self.coordinator.Name, case: FALSE] THEN
ERROR Refused[wrongCoordinator];
SELECT BeginPrepare[self] FROM
preparing => NULL; -- normal case
ready => RETURN [ready]; -- duplicate call
ENDCASE => RETURN [notReady]; -- duplicate call
-- We have caused self to enter the preparing state.
IF newTrans # nullTransID THEN {
-- Perform specialized version of CreateWorker.
self.continueWorker ← ConsWorker[newTrans, self.coordinator];
IF TransactionMap.Register[self.continueWorker].alreadyRegistered THEN ERROR;
-- The newTrans is already known to us. This probably represents an error
--in the coordinator.
};
{
-- First pass: abort if any work is in progress; otherwise permit normal-difficulty work.
IF PreventStartWork[self: self, allowDifficulty: normal].abortThisTrans THEN GOTO abort;
-- Call any external caches now; they are allowed to do read/write pages.
AccessControl.PhaseOneSpaceAndOwnerChanges[self !
AccessControl.LockFailed, AccessControl.Unknown => GOTO abort];
-- Second pass: abort if work is still in progress; otherwise shut off all further work.
IF PreventStartWork[self: self, allowDifficulty: zero].abortThisTrans THEN GOTO abort;
-- Any call to StartWork for this transaction will now fail.
-- There is no work in progress, including waiting lock requests.
-- We therefore have exclusive access without entering the monitor.
stateAfterPrepare ← ReadyWorker[self];
EXITS
abort => {
-- Repeatedly abort waiting lock requests until no started work remains.
DO
LockControl.AbortWaitingRequests[self];
IF ShortWaitForNStarts[self] = 0 THEN EXIT;
ENDLOOP;
stateAfterPrepare ← notReady;
};
};
EndPrepare[self, stateAfterPrepare];
-- We have caused self to enter either the ready state or the completing state.
SELECT stateAfterPrepare FROM
notReady => NormalAbortWorker[self];
readOnlyReady => NormalCommitWorker[self];
ready => NULL;
ENDCASE;
RETURN [stateAfterPrepare]
};
BeginPrepare: ENTRY PROC [self: Handle]
  RETURNS [Worker.State] = {
  -- Waits out the transient unknown and preparing states, then claims the prepare
  --if the worker is active.
  -- The result is never unknown or active.
  -- The result is preparing iff this very call moved the worker from active to
  --preparing.
  WHILE self.state = unknown OR self.state = preparing DO
    WAIT shortWaitOver;
    ENDLOOP;
  IF self.state = active THEN self.state ← preparing;
  RETURN [self.state];
  };
ReadyWorker: PROC [self: Handle]
  RETURNS [AlpineEnvironment.WorkerState] = {
  -- Runs commit phase one for the files, then upgrades the locks.
  -- Called during normal operation and during recovery.
  -- The caller has exclusive access to self and does not hold its monitor.
  -- Lock.Failed or Lock.TransAborting from either step yields notReady.
  readOnly: BOOL;
  readOnly ← FileControl.CommitPhase1[self !
    Lock.Failed, Lock.TransAborting => GOTO abort];
  LockControl.UpgradeLocks[self !
    Lock.Failed, Lock.TransAborting => GOTO abort];
  IF readOnly THEN RETURN [readOnlyReady];
  RETURN [ready];
  EXITS
    abort => RETURN [notReady];
  };
EndPrepare: ENTRY PROC [self: Handle,
  stateAfterPrepare: AlpineEnvironment.WorkerState] = {
  -- Records the result of prepare in the log and in the worker, under self's monitor.
  -- In every case the log record is written before the volatile state changes.
  SELECT stateAfterPrepare FROM
    ready => {
      -- The ready record is forced only when the coordinator is treated as remote.
      LogWorkerReady[self: self, force: self.coordinatorIsRemote];
      self.state ← ready;
      };
    readOnlyReady => {
      -- A read-only worker skips the ready state and completes as a commit.
      LogWorkerCompleting[self: self, outcome: readOnly, force: FALSE];
      self.state ← completing;
      self.outcome ← commit;
      };
    notReady => {
      LogWorkerCompleting[self: self, outcome: abort, force: FALSE];
      self.state ← completing;
      self.outcome ← abort;
      };
    ENDCASE => ERROR;
  };
WorkerFinish: PUBLIC PROC [
  conversation: Conversation, trans: TransID,
  requiredOutcome: AlpineTransMgr.RequiredOutcome] = {
  -- ! Refused {wrongCoordinator, notReady};
  -- AlpineTransMgr.WorkerFinish
  -- Called by the coordinator of "trans" to drive the worker to requiredOutcome.
  self: Handle = TransactionMap.GetHandle[trans];
  -- An unregistered transaction means the finish has already happened.
  IF self = nullHandle THEN RETURN;
  -- Only the recorded coordinator may finish the worker (case-insensitive compare).
  IF NOT Rope.Equal[
    s1: ClientMap.GetName[conversation], s2: self.coordinator.Name, case: FALSE] THEN
    ERROR Refused[wrongCoordinator];
  IF requiredOutcome IN [abort .. commit] THEN
    LocalWorkerFinish[self, requiredOutcome, clientRequest]
  ELSE ERROR;
  };
AbortUnilaterally: PUBLIC PROC [
self: Handle, why: TransactionMap.AbortReason] = {
-- TransactionMap.AbortUnilaterally
-- Aborts the worker self without a request from its coordinator.
-- The why argument is not referenced in this body.
-- Note some duplicate logic with LocalWorkerFinish.
SELECT BeginFinish[self, abort, returnIfPreparing] FROM
-- NOTE(review): with requiredOutcome = abort, BeginFinish turns an active worker into
--completing, so the active arm looks unreachable here. Confirm before relying on it.
active => ERROR Refused [notReady];
preparing => {
-- We cannot determine self's outcome, and are not responsible for making it
--complete, but we are responsible for ensuring that it leaves the preparing state.
--Otherwise it might stay there forever, waiting in the lock manager.
-- This loop is really necessary, since neither process involved is synchronized
--by self's or Lock's monitors.
DO
LockControl.AbortWaitingRequests[self];
IF ShortWaitForState[self] # preparing THEN RETURN;
ENDLOOP;
};
completing => {
-- We have caused self to enter the completing state, self.outcome = abort.
-- Hence we are responsible for making it complete.
IF PreventStartWork[self, zero].abortThisTrans THEN {
-- Some work is still in progress. Wait for it to go away.
DO
LockControl.AbortWaitingRequests[self];
IF ShortWaitForNStarts[self] = 0 THEN EXIT;
ENDLOOP;
};
NormalAbortWorker[self];
-- temporary kludge, effective since remote coordinators aren't used now.
-- The result of Finish is discarded.
[] ← AlpineTransaction.Finish[
AlpineIdentity.myLocalConversation, self.transID, abort, FALSE];
RETURN;
};
-- Any other state: this call did not start the finish, so there is nothing to do.
ENDCASE => RETURN;
};
ShortWaitForState: ENTRY PROC [self: Handle]
  RETURNS [Worker.State] = INLINE {
  -- Performs one wait on shortWaitOver (which releases self's monitor), then reports
  --the worker state seen afterwards.
  stateAfterWait: Worker.State;
  WAIT shortWaitOver;
  stateAfterWait ← self.state;
  RETURN [stateAfterWait];
  };
ShortWaitForNStarts: ENTRY PROC [self: Handle]
  RETURNS [nStarts: [0..Worker.maxStarts]] = INLINE {
  -- Performs one wait on shortWaitOver (which releases self's monitor), then reports
  --how many StartWork calls are still outstanding.
  WAIT shortWaitOver;
  RETURN [nStarts: self.nStarts];
  };
LocalWorkerFinish: PROC [
  self: Handle, requiredOutcome: AlpineTransMgr.RequiredOutcome,
  whyAbort: TransactionMap.AbortReason] = {
  -- Drives self to requiredOutcome, waiting if a prepare is currently under way.
  -- Raises Refused[notReady] when BeginFinish reports the worker as active.
  -- whyAbort is accepted but not referenced in this body.
  SELECT BeginFinish[self, requiredOutcome, waitIfPreparing] FROM
    completing => {
      -- This call moved self into completing with self.outcome = requiredOutcome,
      --so this process must carry the worker through to completion.
      IF requiredOutcome # abort THEN NormalCommitWorker[self]
      ELSE {
        IF PreventStartWork[self, zero].abortThisTrans THEN {
          -- Work is still in progress: keep aborting waiting lock requests until
          --no started work remains.
          DO
            LockControl.AbortWaitingRequests[self];
            IF ShortWaitForNStarts[self] = 0 THEN EXIT;
            ENDLOOP;
          };
        NormalAbortWorker[self];
        };
      };
    active => ERROR Refused [notReady];
    ENDCASE => RETURN;
  };
BeginFinish: ENTRY PROC [self: Handle,
  requiredOutcome: AlpineTransMgr.RequiredOutcome,
  option: {returnIfPreparing, waitIfPreparing}]
  RETURNS [Worker.State] = {
  -- Claims the finish of self if it is in a state from which requiredOutcome is allowed.
  -- The result is never unknown or ready.
  -- The result is active only if requiredOutcome = commit and the transaction was
  --unknown or active on entry. (Caller is in error.)
  -- The result is preparing only if option = returnIfPreparing and the transaction
  --was preparing on entry. (Caller is trying to perform unilateral abort.)
  -- The result is completing iff this call caused the transaction to enter the
  --completing state. (Caller is now responsible for making transaction complete.)
  DO
    SELECT self.state FROM
      unknown, completing => WAIT shortWaitOver;
      preparing =>
        IF option = waitIfPreparing THEN WAIT shortWaitOver
        ELSE RETURN [preparing];
      ENDCASE => EXIT;
    ENDLOOP;
  SELECT requiredOutcome FROM
    abort =>
      -- Abort is allowed from active or ready; the log record is not forced.
      IF self.state = active OR self.state = ready THEN {
        LogWorkerCompleting[self, abort, FALSE];
        self.state ← completing;
        self.outcome ← abort;
        };
    commit =>
      -- Commit is allowed only from ready; force the record for a remote coordinator.
      IF self.state = ready THEN {
        LogWorkerCompleting[self, commit, self.coordinatorIsRemote];
        self.state ← completing;
        self.outcome ← commit;
        };
    ENDCASE;
  RETURN [self.state];
  };
NormalCommitWorker: PROC [self: Handle] = {
-- Commit path used during normal operation: notifies AccessControl of the commit,
--then runs the commit work shared with recovery (CommitWorker).
-- self.state = completing, self.outcome = commit
-- Call any external caches now; they need to know about the commit.
AccessControl.PhaseTwoOrAbortSpaceAndOwnerChanges[self, commit];
CommitWorker[self];
};
CommitWorker: PROC [self: Handle] = {
  -- Performs commit phase two for self, then disposes of its locks.
  -- Called during normal operation and during recovery.
  FileControl.CommitPhase2[trans: self, newTrans: self.continueWorker];
  IF self.continueWorker = nullHandle THEN {
    -- No continuation transaction: simply drop the locks.
    LockControl.ReleaseLocks[self];
    self.locks ← NIL;
    }
  ELSE {
    -- Hand the locks to the continuation worker, activate it, and then forget it.
    LockControl.TransferLocks[from: self, to: self.continueWorker];
    self.continueWorker.locks ← self.locks;
    self.locks ← NIL;
    EndCreateWorker[self.continueWorker, create];
    self.continueWorker ← nullHandle;
    };
  };
NormalAbortWorker: PROC [self: Handle] = {
-- Abort path used during normal operation: notifies AccessControl of the abort,
--then runs the abort work shared with recovery (AbortWorker).
-- self.state = completing, self.outcome = abort
-- Call any external caches now; they need to know about the abort.
AccessControl.PhaseTwoOrAbortSpaceAndOwnerChanges[self, abort];
AbortWorker[self];
};
AbortWorker: PROC [self: Handle] = {
  -- Undoes the file work of self and releases its locks.
  -- Called during normal operation and during recovery.
  FileControl.Abort[trans: self];
  LockControl.ReleaseLocks[self];
  self.locks ← NIL;
  IF self.continueWorker = nullHandle THEN RETURN;
  -- A continuation worker registered by WorkerPrepare is aborted along with self.
  EndCreateWorker[self.continueWorker, abort];
  self.continueWorker ← nullHandle;
  };
-- Log writing and analysis; checkpoint proc
LogWorkerBegin: PROC [self: Handle] = {
-- Writes a workerBegin log record carrying the coordinator's name, and remembers the
--ID of that record in self.beginRecord.
rec: Worker.BeginLogRep ← [];
-- Copy the coordinator name (a rope) into the record's coordinator field.
ConvertUnsafe.AppendRope[
to: LOOPHOLE[LONG[@rec.coordinator]], from: self.coordinator.Name];
-- No force argument is supplied for this write.
[thisRecord: self.beginRecord] ← Log.Write[self, workerBegin,
[base: @rec, length: TEXT[rec.length].SIZE]];
};
LogWorkerReady: PROC [self: Handle, force: BOOL] = {
-- Writes a workerReady log record with an empty body (Log.nullBlock) for self.
-- force is passed through to Log.Write; the result of the write is discarded.
[] ← Log.Write[self, workerReady,
Log.nullBlock, force];
};
LogWorkerCompleting: PROC [self: Handle,
outcome: AlpineInternal.WorkerOutcome, force: BOOL] = {
-- Writes a workerCompleting log record for self whose body records outcome.
-- force is passed through to Log.Write; the result of the write is discarded.
rec: Worker.CompletingLogRep ← [outcome: outcome];
[] ← Log.Write[self, workerCompleting,
[base: @rec, length: Worker.CompletingLogRep.SIZE], force];
};
LogWorkerComplete: PROC [self: Handle] = {
-- Writes a workerComplete log record with an empty body (Log.nullBlock) for self.
-- No force argument is supplied; the result of the write is discarded.
[] ← Log.Write[self, workerComplete, Log.nullBlock];
};
AnalyzeWorkerBegin: PROC [
record: RecordID, type: RecordType, trans: TransID] = {
-- Analysis procedure registered for workerBegin records.
-- Rebuilds a volatile worker for trans from the logged coordinator name and registers
--it in TransactionMap with stateDuringRecovery = active.
-- Raises InternalProgrammingError if the record overflows the buffer or if trans is
--already registered. The type argument is not referenced.
coordinator: AlpineEnvironment.FileStore;
{ -- get coordinator name from log, map it into a FileStore.
rec: Worker.BeginLogRep ← [];
status: Log.ReadProcStatus;
[status: status] ← Log.ReadForRecovery[thisRecord: record,
to: [base: @rec, length: Worker.BeginLogRep.SIZE]];
IF status = destinationFull THEN ERROR InternalProgrammingError;
coordinator ← ConvertUnsafe.ToRope[from: LOOPHOLE[LONG[@rec.coordinator]]];
};
{ -- create a worker handle from trans, setting its coordinator field from above.
self: Handle ← ConsWorker[trans, AlpineImport.Register[coordinator]];
IF TransactionMap.Register[self].alreadyRegistered THEN ERROR InternalProgrammingError;
self.beginRecord ← record;
self.locks ← LockControl.ConsTransHeader[self];
-- With allowableDifficulty = zero, StartWork refuses any request of greater difficulty.
self.allowableDifficulty ← zero;
self.stateDuringRecovery ← active;
};
};
AnalyzeWorkerReady: PROC [
  record: RecordID, type: RecordType, trans: TransID] = {
  -- Analysis procedure registered for workerReady records.
  -- Advances the recovery state of trans from active to ready. A transaction that is
  --not registered is ignored; any other recovery state is an internal error.
  self: Handle = TransactionMap.GetHandle[trans];
  IF self # nullHandle THEN
    SELECT self.stateDuringRecovery FROM
      active => self.stateDuringRecovery ← ready;
      ENDCASE => ERROR InternalProgrammingError;
  };
AnalyzeWorkerCompleting: PROC [
  record: RecordID, type: RecordType, trans: TransID] = {
  -- Analysis procedure registered for workerCompleting records.
  -- Reads the logged outcome and marks the recovery state of trans as aborted or
  --committed. A transaction that is not registered is ignored, but only after the
  --record has been read and checked.
  outcome: AlpineInternal.WorkerOutcome;
  self: Handle;
  { -- get outcome from log
    rec: Worker.CompletingLogRep;
    status: Log.ReadProcStatus;
    [status: status] ← Log.ReadForRecovery[thisRecord: record,
      to: [base: @rec, length: Worker.CompletingLogRep.SIZE]];
    IF status # normal THEN ERROR InternalProgrammingError;
    outcome ← rec.outcome;
    };
  self ← TransactionMap.GetHandle[trans];
  IF self = nullHandle THEN RETURN;
  IF self.stateDuringRecovery IN [active .. ready] THEN
    self.stateDuringRecovery ← IF outcome = abort THEN aborted ELSE committed
  ELSE ERROR InternalProgrammingError;
  };
AnalyzeWorkerComplete: PROC [
  record: RecordID, type: RecordType, trans: TransID] = {
  -- Analysis procedure registered for workerComplete records.
  -- The transaction had finished before the crash: record its outcome, release its
  --locks, and remove it from TransactionMap. An unregistered transaction is ignored.
  self: Handle = TransactionMap.GetHandle[trans];
  IF self = nullHandle THEN RETURN;
  SELECT self.stateDuringRecovery FROM
    committed => self.outcome ← commit;
    aborted => self.outcome ← abort;
    -- A complete record is only legal after a completing record, so active, ready,
    --and anything else is an error.
    ENDCASE => ERROR;
  LockControl.ReleaseLocks[self];
  self.locks ← NIL;
  self.state ← complete;
  TransactionMap.Unregister[self];
  };
CallAfterAnalysisPass: PUBLIC PROC [] = {
  -- WorkerControl.CallAfterAnalysisPass
  -- Sets the log-size limits used by the watchdog, then aborts every worker that the
  --analysis pass left in the active recovery state.
  AbortIfStillActive: PROC [self: Handle] RETURNS [stop: BOOL] = {
    SELECT self.stateDuringRecovery FROM
      ready, committed, aborted => NULL;
      active => {
        -- Never reached ready before the crash, so it is logged as aborted (unforced).
        LogWorkerCompleting[self: self, outcome: abort, force: FALSE];
        self.stateDuringRecovery ← aborted;
        };
      ENDCASE => ERROR;
    RETURN [stop: FALSE];
    };
  logWordsUsableByClient ← LogControl.WordsUsableByClient[];
  maxLogWordsForWorker ← MIN[
    maxLogWordsForFastRestart, logWordsUsableByClient];
  TransactionMap.LockedEnumerate[AbortIfStillActive];
  };
RecoverWorkerBegin: PROC [
  record: RecordID, type: RecordType, trans: Handle, outcome: Log.TransState] = {
  -- Recovery procedure registered for workerBegin records.
  -- Moves trans from unknown to active; any other starting state is an error.
  SELECT trans.state FROM
    unknown => trans.state ← active;
    ENDCASE => ERROR;
  };
RecoverWorkerReady: PROC [
  record: RecordID, type: RecordType, trans: Handle, outcome: Log.TransState] = {
  -- Recovery procedure registered for workerReady records.
  -- Moves trans from active to ready; any other starting state is an error.
  SELECT trans.state FROM
    active => trans.state ← ready;
    ENDCASE => ERROR;
  };
RecoverWorkerCompleting: PROC [
  record: RecordID, type: RecordType, trans: Handle, outcome: Log.TransState] = {
  -- Recovery procedure registered for workerCompleting records.
  -- Re-establishes the volatile state of trans according to outcome, the
  --transaction state supplied by the caller.
  trans.state ← completing;
  SELECT outcome FROM
    ready => {
      -- Still in doubt: redo the prepare work, which must end in ready.
      trans.state ← ready;
      IF ReadyWorker[trans] # ready THEN ERROR;
      };
    aborted => {
      trans.state ← completing;
      trans.outcome ← abort;
      AbortWorker[trans];
      };
    committed => {
      -- Redo the prepare work first (it must not fail), then finish the commit.
      trans.state ← ready;
      IF ReadyWorker[trans] = notReady THEN ERROR;
      trans.state ← completing;
      trans.outcome ← commit;
      CommitWorker[trans];
      };
    ENDCASE => ERROR;
  };
CallAfterUpdatePass: PUBLIC PROC [] = {
  -- WorkerControl.CallAfterUpdatePass
  -- Returns only when every registered worker is in the ready or fpmComplete state,
  --checking again after a one-second pause for as long as some worker is not.
  allSettled: BOOL;
  NoteUnsettledWorker: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
    -- Examines self.state under self's monitor.
    IF self.state # ready AND self.state # fpmComplete THEN allSettled ← FALSE;
    RETURN [stop: FALSE];
    };
  DO
    allSettled ← TRUE;
    TransactionMap.LockedEnumerate[NoteUnsettledWorker];
    IF allSettled THEN EXIT;
    Process.Pause[Process.SecondsToTicks[1]];
    ENDLOOP;
  };
WorkerCheckpointProc: PROC []
RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
-- Checkpoint procedure registered with LogControl during module initialization.
-- Completes workers whose updates are finished (state fpmComplete), and reports the
--smallest beginRecord among the remaining workers as both results.
-- Works in two enumeration passes with a FilePageMgr force-out in between.
fpmCompleteWorkerSeen: BOOL ← FALSE;
ExamineWorkerBeforeForceOut: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
-- Pass one: mark each fpmComplete worker so that pass two completes only the workers
--that were already fpmComplete before the force-out.
IF self.state = fpmComplete THEN {
self.state ← fpmCompleteBeingForcedOut;
fpmCompleteWorkerSeen ← TRUE;
};
RETURN [stop: FALSE];
};
-- Starts at Log.lastRecordID and is lowered by pass two; it keeps that initial value
--if pass two finds no worker to count.
oldestBeginRecord: Log.RecordID ← Log.lastRecordID;
ExamineWorkerAfterForceOut: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
-- Pass two: write workerComplete for the marked workers and unregister them. Every
--other worker that is not in the unknown state contributes its beginRecord to the
--minimum.
IF self.state = fpmCompleteBeingForcedOut THEN {
LogWorkerComplete[self];
TransactionMap.Unregister[self];
self.state ← complete;
}
ELSE IF self.state # unknown THEN
oldestBeginRecord ← LogInline.Min[self.beginRecord, oldestBeginRecord];
RETURN [stop: FALSE];
};
TransactionMap.UnlockedEnumerate[ExamineWorkerBeforeForceOut];
-- The force-out is skipped when pass one marked no worker.
IF fpmCompleteWorkerSeen THEN FilePageMgr.ForceOutEverything[];
TransactionMap.UnlockedEnumerate[ExamineWorkerAfterForceOut];
-- Wake the log watchdog when the oldest worker ties up more log than a worker may
--comfortably keep active.
IF LogInline.WordsFromSubtract[LogControl.RecordIDOfNextWrite[], oldestBeginRecord] >
maxLogWordsForWorker THEN WorkerInternal.NotifyLogWatchdog[];
RETURN [oldestBeginRecord, oldestBeginRecord];
};
WorkerLogWatchdogProcess: PUBLIC PROC [] = {
-- Body of the worker log watchdog. The loop below has no exit, so this procedure
--does not return.
-- Each iteration picks a candidate worker (the one remembered from the previous
--iteration if it is still registered, otherwise the oldest active worker), asks
--TestForAbort about it, and aborts it unilaterally if told to.
-- The process sleeps in WorkerInternal.WaitForNotify when there is no candidate or
--when the candidate should not be aborted yet.
transID: TransID ← nullTransID;
w: Handle ← NIL;
wait: BOOL ← FALSE;
DO
-- Only a TransID is kept between iterations (w is reset to NIL below), so the worker
--is looked up again here.
IF transID # nullTransID THEN w ← TransactionMap.GetHandle[transID];
IF w = NIL THEN w ← GetOldestActiveWorker[];
IF w = NIL THEN { transID ← nullTransID; wait ← TRUE }
ELSE SELECT TestForAbort[w] FROM
notActive => transID ← nullTransID;
abort => {
AbortUnilaterally[w, oldUncommittedUpdates];
transID ← nullTransID;
};
-- Remember this worker and examine it again after the next notification.
dontAbort => { transID ← w.transID; wait ← TRUE };
ENDCASE => ERROR;
w ← NIL; -- so that w can be garbage-collected
IF wait THEN { WorkerInternal.WaitForNotify[]; wait ← FALSE };
ENDLOOP;
};
TestForAbort: ENTRY PROC [self: Handle] RETURNS [{notActive, abort, dontAbort}] = {
  -- Decides, under self's monitor, whether the log watchdog should abort self.
  -- wordsHeld is the amount of log between self's begin record and the next write.
  wordsHeld: INT =
    LogInline.WordsFromSubtract[LogControl.RecordIDOfNextWrite[], self.beginRecord];
  IF self.state # active THEN RETURN [notActive]
  ELSE IF wordsHeld <= maxLogWordsForWorker THEN RETURN [dontAbort]
  ELSE IF wordsHeld > logWordsUsableByClient THEN RETURN [abort];
  -- In between the two limits: spare the transaction only if it has started work
  --recently (active) and its estimated update cost is large relative to the log it
  --holds (expensive).
  RETURN [
    IF BasicTime.Period[from: self.timeOfLastStartWork, to: BasicTime.Now[]] <
      longTimeSinceLastStartWork AND
      self.estimatedUpdateCost * densityFactor > wordsHeld
    THEN dontAbort ELSE abort];
  };
GetOldestActiveWorker: PROC [] RETURNS [Handle] = {
  -- Scans all registered workers and returns an active one whose begin record is
  --least, or NIL when no worker is active.
  oldest: Handle ← NIL;
  ConsiderWorker: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
    -- Examines self under its monitor. self replaces the current candidate unless
    --the candidate's begin record compares as less than self's.
    IF self.state = active THEN
      IF oldest = NIL OR
        LogInline.Compare[oldest.beginRecord, self.beginRecord] # less THEN
        oldest ← self;
    RETURN [stop: FALSE];
    };
  TransactionMap.UnlockedEnumerate[ConsiderWorker];
  RETURN [oldest];
  };
-- Procedures exported to TransactionMap, and some related procedures.
StartWork: PUBLIC ENTRY PROC [self: Handle,
  difficulty: AlpineInternal.WorkLevel] RETURNS [canWork: BOOL] = {
  -- Asks permission to begin one unit of work of the given difficulty on self.
  -- Returns FALSE when difficulty exceeds what the worker currently allows. On TRUE
  --the count of started work has been incremented and the time of last start updated.
  IF difficulty > self.allowableDifficulty THEN RETURN [canWork: FALSE];
  IF self.state NOT IN [active .. preparing] THEN ERROR;
  -- Too much concurrent work: poll until a slot frees up, checking after each wait
  --that the work is still allowed.
  UNTIL self.nStarts # Worker.maxStarts DO
    WAIT shortWaitOver;
    IF difficulty > self.allowableDifficulty THEN RETURN [canWork: FALSE];
    ENDLOOP;
  self.timeOfLastStartWork ← BasicTime.Now[];
  self.nStarts ← self.nStarts + 1;
  RETURN [canWork: TRUE];
  };
StopWork: PUBLIC ENTRY PROC [self: Handle, estimatedUpdateCost: INT] = {
  -- Ends one unit of work begun by StartWork, adding estimatedUpdateCost to the
  --worker's running total.
  IF self.nStarts = 0 THEN ERROR; --impossible if clients are correctly coded
  self.estimatedUpdateCost ← self.estimatedUpdateCost + estimatedUpdateCost;
  self.nStarts ← self.nStarts - 1;
  };
PreventStartWork: ENTRY PROC [self: Handle, allowDifficulty: Worker.Difficulty]
  RETURNS [abortThisTrans: BOOL] = {
  -- Restricts the work that StartWork will admit for self from now on.
  -- If work is in progress, all further work is shut off (difficulty zero) and the
  --result is TRUE. Otherwise the limit becomes allowDifficulty and the result is FALSE.
  workInProgress: BOOL = (self.nStarts # 0);
  IF workInProgress THEN self.allowableDifficulty ← zero
  ELSE self.allowableDifficulty ← allowDifficulty;
  RETURN [abortThisTrans: workInProgress];
  };
GetTimeOfLastStartWork: PUBLIC ENTRY PROC [self: Handle]
RETURNS [BasicTime.GMT] = {
-- Returns self.timeOfLastStartWork, read under self's monitor. The field is set when
--the worker is constructed and again on each successful StartWork.
RETURN [self.timeOfLastStartWork];
};
GetEstimatedUpdateCost: PUBLIC ENTRY PROC [self: Handle] RETURNS [INT] = {
-- Returns self.estimatedUpdateCost, read under self's monitor. StopWork adds to
--this total.
RETURN [self.estimatedUpdateCost];
};
GetTransID: PUBLIC PROC [self: Handle] RETURNS [TransID] = {
-- Returns the transaction ID of the worker.
-- Not ENTRY because the transID field is immutable.
RETURN [self.transID];
};
GetFileInstanceList: PUBLIC PROC [self: Handle]
RETURNS [fileInstanceList: FileInstanceHandle] = {
-- Returns the file instance list stored in the worker.
-- Not ENTRY because self.fileInstanceList "belongs" to the file manager.
RETURN [self.fileInstanceList];
};
SetFileInstanceList: PUBLIC PROC [self: Handle,
fileInstanceList: FileInstanceHandle] = {
-- Stores fileInstanceList in the worker, replacing the previous value.
-- Not ENTRY because self.fileInstanceList "belongs" to the file manager.
self.fileInstanceList ← fileInstanceList;
};
GetLockHeader: PUBLIC PROC [self: Handle]
RETURNS [lockHeader: AlpineInternal.LockTransHeaderHandle] = {
-- Returns the lock header stored in the worker. Other procedures in this module set
--the field to NIL once the locks have been released or transferred.
-- Not ENTRY because self.locks "belongs" to the lock manager.
RETURN[self.locks];
};
EnableAlpineWheel: PUBLIC ENTRY PROC [self: Handle,
  conversation: Conversation, enable: BOOL] = {
  -- Adds the conversation to (enable = TRUE) or removes it from (enable = FALSE) the
  --worker's list of wheel-enabled conversations. Requests made on the local
  --conversation are ignored, and adding or removing twice has no further effect.
  l: LIST OF RPC.ConversationID;
  c: RPC.ConversationID = RPC.GetConversationID[conversation];
  IF c = AlpineIdentity.myLocalConversationID THEN RETURN;
  -- Find the list cell holding c; l is NIL afterwards when there is none.
  FOR l ← self.enabledWheelList, l.rest UNTIL l = NIL DO
    IF l.first = c THEN EXIT;
    ENDLOOP;
  IF l = NIL THEN {
    -- Not present: push a new cell on the front when enabling.
    IF enable THEN
      self.enabledWheelList ← zone.CONS[first: c, rest: self.enabledWheelList];
    }
  ELSE IF NOT enable THEN {
    -- Present and being disabled: unlink cell l from the list.
    IF self.enabledWheelList = l THEN self.enabledWheelList ← l.rest
    ELSE
      FOR p: LIST OF RPC.ConversationID ← self.enabledWheelList, p.rest UNTIL p = NIL DO
        IF p.rest = l THEN { p.rest ← l.rest; EXIT };
        ENDLOOP;
    };
  };
IsAlpineWheel: PUBLIC ENTRY PROC [self: Handle,
  conversation: Conversation] RETURNS [enabled: BOOL] = {
  -- Reports whether the conversation is wheel-enabled for this worker. The local
  --conversation is always enabled; any other conversation is enabled only while it
  --appears on self.enabledWheelList.
  c: RPC.ConversationID = RPC.GetConversationID[conversation];
  l: LIST OF RPC.ConversationID ← self.enabledWheelList;
  IF c = AlpineIdentity.myLocalConversationID THEN RETURN [enabled: TRUE];
  -- Walk the list until it ends or a cell holding c is found.
  UNTIL l = NIL OR l.first = c DO l ← l.rest ENDLOOP;
  RETURN [enabled: l # NIL];
  };
StateDuringRecovery: PUBLIC ENTRY PROC [self: Handle]
  RETURNS [TransactionMap.TransState] = {
  -- Translates the worker's recovery state into a TransactionMap.TransState.
  -- Only ready, committed and aborted have a translation; asking about a worker in
  --any other recovery state (active included) is an error.
  SELECT self.stateDuringRecovery FROM
    aborted => RETURN [aborted];
    committed => RETURN [committed];
    ready => RETURN [ready];
    ENDCASE => ERROR;
  };
IsCommitted: PUBLIC ENTRY PROC [self: Handle] RETURNS [BOOL] = {
  -- Reports whether the transaction has committed. A completing worker answers from
  --its recorded outcome; an active or preparing worker answers FALSE.
  SELECT self.state FROM
    completing => RETURN [self.outcome = commit];
    active, preparing => RETURN [FALSE];
    ENDCASE => ERROR; -- locks are supposed to prevent this
  };
AssertUpdatesAreComplete: PUBLIC ENTRY PROC [self: Handle] = {
  -- Moves a completing worker to fpmComplete, the state that WorkerCheckpointProc
  --looks for. Calling this in any other state is an error.
  SELECT self.state FROM
    completing => self.state ← fpmComplete;
    ENDCASE => ERROR;
  };
-- Initialization
-- Module start code: hooks this module into log analysis, recovery, and checkpointing,
--then configures the shared condition variable.
-- Analysis procedures, one for each worker record type.
LogControl.RegisterAnalysisProc[workerBegin, AnalyzeWorkerBegin];
LogControl.RegisterAnalysisProc[workerReady, AnalyzeWorkerReady];
LogControl.RegisterAnalysisProc[workerCompleting, AnalyzeWorkerCompleting];
LogControl.RegisterAnalysisProc[workerComplete, AnalyzeWorkerComplete];
-- Recovery procedures. None is registered for workerComplete records.
Log.RegisterRecoveryProc[workerBegin, RecoverWorkerBegin];
Log.RegisterRecoveryProc[workerReady, RecoverWorkerReady];
Log.RegisterRecoveryProc[workerCompleting, RecoverWorkerCompleting];
LogControl.RegisterCheckpointProc[WorkerCheckpointProc];
-- Nobody notifies shortWaitOver, so its waiters rely on this timeout to wake up and
--check their condition again.
Process.SetTimeout[@shortWaitOver, Process.MsecToTicks[Worker.shortWaitTime]];
Process.EnableAborts[@shortWaitOver];
END.--WorkerImpl
CHANGE LOG
Changed by MBrown on February 9, 1983 11:54 am
-- Change CommitWorker to set continueWorker ← nullHandle. (This was happening,
--unsynchronized, from the checkpoint process!)
Changed by MBrown on March 2, 1983 2:53 pm
-- Change sanity check in StartWork.
Changed by MBrown on April 3, 1983 10:32 am
-- Implemented worker log watchdog. Changed AbortUnilaterally to call
--AlpineTransaction.Finish[..., abort, ...], making coordinator go away if it is local.
--Updated catch phrases for AccessControl errors.
Changed by MBrown on June 6, 1983 11:28 am
-- In WorkerImpl.CheckForReadyWorker, was examining worker state without entering
--monitor. Changed logic as follows: now WorkerImpl.CallAfterUpdatePass waits until
--all workers are either ready or fpmComplete.
Changed by MBrown on June 25, 1983 9:43 pm
-- Fixed two related bugs. If AbortUnilaterally was called with self.state = preparing,
--it simply waited for self.state # preparing. This is no good because self may be
--waiting in the lock manager, deadlocked, and the caller of AbortUnilaterally may be
--the lock watchdog process! This was observed several times in server operation.
-- The second bug was more subtle and probably has not yet arisen in practice. When
--a process calls LockControl.AbortWaitingRequests, what guarantee does it have that no
--new requests will come along and wait later? Answer: no guarantee. This synchronization
--must be done at the level of the worker object monitor, and the lock manager does not
--enter the worker monitor. We wrapped all calls to LockControl.AbortWaitingRequests in
--loops that are designed to ensure that when the loop exits, no more Lock.Set or
--LockControl.UpgradeLocks calls will be executed for this transaction. This seems messy
--but it is not clear how to do better.