-- WorkerImpl.mesa
-- Implements worker in two-phase commit: create, prepare, finish (commit or abort.)
-- Last edited by
--   MBrown on January 30, 1984 12:12:43 pm PST
--   Kolling on June 6, 1983 12:05 pm

-- NOTES:
-- It seems poor for CreateWorker to ERROR Unknown[coordinator] in the case that the
--coordinator is there, but refusing new RPC work (CedarRPC.CallFailed[busy].)
-- It is also poor to land in debugger due to RPC.CallFailed[protocolError].
-- The implementation of AbortUnilaterally should be improved by making a remote
--call to the coordinator, if the coordinator is remote.

DIRECTORY
  AccessControl,
  AlpineEnvironment,
  AlpineIdentity,
  AlpineImport,
  AlpineInternal,
  AlpineTransaction,
  AlpineTransMgr,
  AlpineTransMgrRpcControl,
  BasicTime,
  ClientMap USING [GetName],
  ConversationTable,
  ConvertUnsafe,
  FileControl,
  FilePageMgr,
  Lock,
  LockControl,
  Log,
  LogControl,
  LogInline,
  Process,
  Rope,
  RPC,
  SafeStorage,
  TransactionMap,
  Worker,
  WorkerControl,
  WorkerInternal;

-- The module is a MONITOR locked per worker object: every ENTRY procedure below takes
--a parameter named self, and that object's monitor lock is the one acquired.
WorkerImpl: MONITOR LOCKS self USING self: Handle
  IMPORTS
    AccessControl,
    AlpineIdentity,
    AlpineImport,
    AlpineTransaction,
    BasicTime,
    ClientMap,
    ConversationTable,
    ConvertUnsafe,
    FileControl,
    FilePageMgr,
    Lock,
    LockControl,
    Log,
    LogControl,
    LogInline,
    Process,
    Rope,
    RPC,
    SafeStorage,
    TransactionMap,
    WorkerInternal
  EXPORTS
    AlpineTransaction,
    AlpineTransMgr,
    AlpineInternal,
    TransactionMap,
    WorkerControl,
    WorkerInternal =
BEGIN

-- Local abbreviations for types and constants defined in other interfaces.
Conversation: TYPE = AlpineEnvironment.Conversation;
TransID: TYPE = AlpineEnvironment.TransID;
nullTransID: TransID = AlpineEnvironment.nullTransID;
FileStore: TYPE = AlpineEnvironment.FileStore;
FileInstanceHandle: TYPE = AlpineInternal.FileInstanceHandle;
RecordID: TYPE = Log.RecordID;
RecordType: TYPE = Log.RecordType;

-- Raised by the log analysis procedures when the log contents or the recovery-time
--worker state are not what this module expects.
InternalProgrammingError: ERROR = CODE;

Handle: TYPE = Worker.Handle;
nullHandle: Handle = Worker.nullHandle;

TransObject: PUBLIC TYPE = Worker.Object; -- AlpineInternal.TransObject

Refused: PUBLIC ERROR [why: AlpineTransMgr.Refusal] = CODE; --AlpineTransMgr.Refused.
zone: ZONE = SafeStorage.GetSystemZone[];
  -- All Worker.Objects and wheel-list cells in this module are allocated from this zone.

disableLocalCoordinatorOptimization: BOOL _ FALSE;
  -- When TRUE, worker always forces log as if coordinator were remote.

shortWaitOver: CONDITION;
  -- Client waits here under various unlikely circumstances involving concurrent calls
  --to CreateWorker, PrepareWorker, and FinishWorker, concurrent calls to worker actions
  --that use exclusive mode StartWork, and excessive concurrency among other worker
  --actions.  The wait times out after Worker.shortWaitTime milliseconds, allowing the
  --situation to be reevaluated.  Nobody notifies shortWaitOver.
  -- The timeout is installed by the initialization statements at the end of this module.

-- Parameters to worker log watchdog:

densityFactor: INT _ 64;
  -- if self.estimatedUpdateCost*densityFactor is larger than the amount of log tied up
  --by this transaction, then the transaction is considered too expensive to abort
  --(unless it is inactive or its log use exceeds logWordsUsableByClient).
longTimeSinceLastStartWork: INT _ 1000; -- seconds (about 17 minutes)
  -- if the time since last start work is greater than this, then the transaction is
  --considered inactive.
logWordsUsableByClient: INT;
  -- absolute upper bound on the amount of log tied up by a transaction.
maxLogWordsForFastRestart: INT = 3000*LONG[AlpineEnvironment.wordsPerPage];
  -- fast restart means two minutes: two passes over 3000 pages at 50 pages/sec.
maxLogWordsForWorker: INT;
  -- A worker may comfortably keep this much log active.  Beyond this bound, the
  --worker may be aborted (but need not be).
  -- These two values (logWordsUsableByClient and maxLogWordsForWorker) are initialized
  --during recovery (based on the log length; see CallAfterAnalysisPass) and not
  --changed thereafter (except perhaps from the debugger).


CreateWorker: PUBLIC PROC [
  conversation: Conversation, transID: TransID, coordinator: FileStore] = {
  -- ! Unknown {coordinator, transID};
  -- AlpineTransaction.CreateWorker.
  -- Called by a client.
  -- Creates (or finds) the volatile worker for transID on this server and, if this
  --call is the one that registered it, registers it with the coordinator.
  -- The worker object monitor is not held during the call to
  --AlpineCoordinator.RegisterWorker.  This was designed to allow
  --AlpineCoordinator.Finish to hold the coordinator object monitor during its calls
  --to workers.  (AlpineCoordinator.Finish does not actually work this way).
  -- Note: the conversation parameter is not examined in this procedure.
  self: Handle;
  alreadyRegistered: BOOL _ FALSE;
  whyCallFailed: RPC.CallFailure;
  registerWorkerResult: AlpineTransMgr.RegisterWorkerResult;
  alpineTransMgr: AlpineTransMgrRpcControl.InterfaceRecord;
  conversationOut: Conversation;
  -- Cheap test first; only construct a new Worker.Object if transID looks new.
  IF TransactionMap.GetHandle[transID] # nullHandle THEN alreadyRegistered _ TRUE
  ELSE self _ ConsWorker[transID, AlpineImport.Register[coordinator]];
  -- Register is the authoritative test: another process may have registered transID
  --between the GetHandle above and this call.
  IF alreadyRegistered OR TransactionMap.Register[self].alreadyRegistered THEN {
    -- Drop self on the floor since transID is already registered.  We are not allowed
    --to call RegisterWorker; some other process is doing it.  Wait for
    --self.state # unknown, then return result.
    self _ TransactionMap.GetHandle[transID];
    IF self = nullHandle OR WaitUntilNotUnknown[self].workerNotActive THEN
      ERROR AlpineTransaction.Unknown[transID];
    }
  ELSE {
    -- We registered the volatile worker, so we are responsible to call RegisterWorker
    --and update volatile and permanent data structures to reflect the result.
    --We cannot return or error until self.state # unknown, lest another process hang
    --in WaitUntilNotUnknown.
    conversationOut _ ConversationTable.Fetch[self.coordinator];
    alpineTransMgr _ AlpineImport.GetTransMgrInterface[self.coordinator];
    IF conversationOut = NIL OR alpineTransMgr = NIL THEN GOTO noCoordinator;
    registerWorkerResult _ alpineTransMgr.RegisterWorker[conversationOut, transID !
      RPC.CallFailed => CHECKED { whyCallFailed _ why; GOTO callFailed } ];
    EndCreateWorker[self, IF registerWorkerResult = ok THEN create ELSE abort];
    EXITS
      -- Both exits first move self out of the unknown state (EndCreateWorker[abort])
      --and only then raise an error.
      noCoordinator => {
        EndCreateWorker[self, abort];
        ERROR AlpineTransaction.Unknown[coordinator] };
      callFailed => {
        EndCreateWorker[self, abort];
        SELECT whyCallFailed FROM
          timeout, unbound => {
            self.coordinator.TransMgrInterfaceCallFailed[alpineTransMgr];
            alpineTransMgr _ NIL; --must come after CallFailed is unwound
            ERROR AlpineTransaction.Unknown[coordinator] };
          busy => ERROR AlpineTransaction.Unknown[coordinator];
          runtimeProtocol, stubProtocol => ERROR;
          ENDCASE }
    };
  };

ConsWorker: PROC [trans: TransID, coordinator: AlpineImport.Handle]
  RETURNS [Handle] = {
  -- Allocates a new Worker.Object for trans; fields not listed below take the
  --defaults declared in Worker.Object.
  -- Note: does not establish the locks field of the Worker.Object.
  -- coordinatorIsRemote is TRUE unless coordinator is this server's own import handle
  --(and the local-coordinator optimization has not been disabled).
  RETURN [zone.NEW[Worker.Object _ [
    transID: trans,
    timeOfLastStartWork: BasicTime.Now[],
    coordinator: coordinator,
    coordinatorIsRemote:
      disableLocalCoordinatorOptimization OR
      NOT coordinator.Equal[AlpineIdentity.myAlpineImportHandle]]]];
  };

EndCreateWorker: ENTRY PROC [self: Handle, outcome: {create, abort} ] = {
  -- self is registered in TransactionMap, in unknown state (no WorkerBegin written).
  -- outcome = create: write the WorkerBegin log record, make self active, and give it
  --a lock header if it does not already have one.
  -- outcome = abort: mark self complete/aborted and remove it from TransactionMap.
  IF self.state # unknown THEN ERROR;
  IF outcome = create THEN {
    LogWorkerBegin[self];
    self.state _ active;
    -- locks may already be non-NIL: CommitWorker installs the old worker's locks in
    --a continuation worker before calling here.
    IF self.locks = NIL THEN self.locks _ LockControl.ConsTransHeader[self];
    }
  ELSE {
    -- Must change state from unknown since another process in WaitUntilNotUnknown
    --may be waiting for this event.
    self.state _ complete;
    self.outcome _ abort;
    TransactionMap.Unregister[self];
    };
  };

WaitUntilNotUnknown: ENTRY PROC [self: Handle] RETURNS [workerNotActive: BOOL] = {
  -- Waits (polling via the shortWaitOver timeout) until self leaves the unknown state,
  --then reports whether it ended up in any state other than active.
  WHILE self.state = unknown DO WAIT shortWaitOver ENDLOOP;
  RETURN [self.state # active];
  };


WorkerPrepare: PUBLIC PROC [
  conversation: Conversation, trans: TransID, newTrans: AlpineEnvironment.TransID]
  RETURNS [AlpineTransMgr.WorkerState] = {
  -- ! Refused {transUnknown, wrongCoordinator}
  -- (As written, an unknown trans returns notReady; only wrongCoordinator is raised
  --in this procedure.)
  -- AlpineTransMgr.WorkerPrepare.
  -- Called by the coordinator of "trans".
  -- This procedure is prepared for duplicate calls.  Extra callers wait on the
  --global condition variable shortWaitOver (in BeginPrepare).
  stateAfterPrepare: AlpineEnvironment.WorkerState;
  self: Handle = TransactionMap.GetHandle[trans];
  IF self = nullHandle THEN RETURN [notReady];
  -- Only the coordinator recorded for this worker may prepare it; names are compared
  --ignoring case.
  IF NOT Rope.Equal[
    s1: ClientMap.GetName[conversation], s2: self.coordinator.Name, case: FALSE] THEN
    ERROR Refused[wrongCoordinator];
  SELECT BeginPrepare[self] FROM
    preparing => NULL; -- normal case
    ready => RETURN [ready]; -- duplicate call
    ENDCASE => RETURN [notReady]; -- duplicate call
  -- We have caused self to enter the preparing state.
  IF newTrans # nullTransID THEN {
    -- Perform specialized version of CreateWorker.
    -- The continuation worker stays in the unknown state until CommitWorker or
    --AbortWorker calls EndCreateWorker on it.
    self.continueWorker _ ConsWorker[newTrans, self.coordinator];
    IF TransactionMap.Register[self.continueWorker].alreadyRegistered THEN ERROR;
      -- The newTrans is already known to us.  This probably represents an error
      --in the coordinator.
    };
  {
  IF PreventStartWork[self: self, allowDifficulty: normal].abortThisTrans THEN GOTO abort;
  -- Call any external caches now; they are allowed to do read/write pages.
  AccessControl.PhaseOneSpaceAndOwnerChanges[self !
    AccessControl.LockFailed, AccessControl.Unknown => GOTO abort];
  IF PreventStartWork[self: self, allowDifficulty: zero].abortThisTrans THEN GOTO abort;
  -- Any call to StartWork for this transaction will now fail.
  -- There is no work in progress, including waiting lock requests.
  -- We therefore have exclusive access without entering the monitor.
  stateAfterPrepare _ ReadyWorker[self];
  EXITS
    abort => {
      -- Keep aborting waiting lock requests until no StartWork is outstanding;
      --see the June 25, 1983 change log entry for why this is a loop.
      DO
        LockControl.AbortWaitingRequests[self];
        IF ShortWaitForNStarts[self] = 0 THEN EXIT;
        ENDLOOP;
      stateAfterPrepare _ notReady;
      };
  };
  EndPrepare[self, stateAfterPrepare];
  -- We have caused self to enter either the ready state or the completing state.
  SELECT stateAfterPrepare FROM
    notReady => NormalAbortWorker[self];
    readOnlyReady => NormalCommitWorker[self];
    ready => NULL;
    ENDCASE;
  RETURN [stateAfterPrepare]
  };

BeginPrepare: ENTRY PROC [self: Handle] RETURNS [Worker.State] = {
  -- result is # unknown, active.
  -- result is = preparing iff this call caused the transaction to enter
  --the preparing state.
  -- Waits while self is unknown or is being prepared by another caller.
  DO
    SELECT self.state FROM
      unknown, preparing => WAIT shortWaitOver;
      ENDCASE => EXIT;
    ENDLOOP;
  IF self.state = active THEN self.state _ preparing;
  RETURN [self.state];
  };

ReadyWorker: PROC [self: Handle] RETURNS [AlpineEnvironment.WorkerState] = {
  -- Called during normal operation and during recovery.
  -- The caller has exclusive access to the transaction self, is not holding the monitor.
  -- Runs commit phase one in the file manager and upgrades the locks; a lock failure
  --in either step yields notReady.  The readOnlyReady/ready choice is made from the
  --BOOL returned by FileControl.CommitPhase1.
  readOnly: BOOL _ FileControl.CommitPhase1[self !
    Lock.Failed, Lock.TransAborting => GOTO abort];
  LockControl.UpgradeLocks[self !
    Lock.Failed, Lock.TransAborting => GOTO abort];
  RETURN [IF readOnly THEN readOnlyReady ELSE ready];
  EXITS
    abort => RETURN [notReady];
  };

EndPrepare: ENTRY PROC [self: Handle, stateAfterPrepare: AlpineEnvironment.WorkerState] = {
  -- Records the result of the prepare phase in the log and in self.
  -- notReady and readOnlyReady go directly to completing (outcome abort and commit
  --respectively) without forcing the log; ready writes a WorkerReady record, forced
  --only when the coordinator is remote.
  SELECT stateAfterPrepare FROM
    notReady => {
      LogWorkerCompleting[self: self, outcome: abort, force: FALSE];
      self.state _ completing;
      self.outcome _ abort;
      };
    readOnlyReady => {
      LogWorkerCompleting[self: self, outcome: readOnly, force: FALSE];
      self.state _ completing;
      self.outcome _ commit;
      };
    ready => {
      LogWorkerReady[self: self, force: self.coordinatorIsRemote];
      self.state _ ready;
      };
    ENDCASE => ERROR;
  };


WorkerFinish: PUBLIC PROC [
  conversation: Conversation, trans: TransID,
  requiredOutcome: AlpineTransMgr.RequiredOutcome] = {
  -- ! Refused {wrongCoordinator, notReady};
  -- AlpineTransMgr.WorkerFinish
  -- Called by the coordinator of "trans".
  self: Handle = TransactionMap.GetHandle[trans];
  IF self = nullHandle THEN RETURN; --we must have already finished.
  -- Only the coordinator recorded for this worker may finish it.
  IF NOT Rope.Equal[
    s1: ClientMap.GetName[conversation], s2: self.coordinator.Name, case: FALSE] THEN
    ERROR Refused[wrongCoordinator];
  IF requiredOutcome NOT IN [abort .. commit] THEN ERROR;
  LocalWorkerFinish[self, requiredOutcome, clientRequest];
  };

AbortUnilaterally: PUBLIC PROC [
  self: Handle, why: TransactionMap.AbortReason] = {
  -- TransactionMap.AbortUnilaterally
  -- Note some duplicate logic with LocalWorkerFinish.
  -- Note: the why parameter is not examined in this procedure.
  SELECT BeginFinish[self, abort, returnIfPreparing] FROM
    active => ERROR Refused [notReady];
    preparing => {
      -- We cannot determine self's outcome, and are not responsible for making it
      --complete, but we are responsible for ensuring that it leaves the preparing state.
      --Otherwise it might stay there forever, waiting in the lock manager.
      -- This loop is really necessary, since neither process involved is synchronized
      --by self's or Lock's monitors.
      DO
        LockControl.AbortWaitingRequests[self];
        IF ShortWaitForState[self] # preparing THEN RETURN;
        ENDLOOP;
      };
    completing => {
      -- We have caused self to enter the completing state, self.outcome = abort.
      -- Hence we are responsible for making it complete.
      IF PreventStartWork[self, zero].abortThisTrans THEN {
        -- Some work is still in progress.  Wait for it to go away.
        DO
          LockControl.AbortWaitingRequests[self];
          IF ShortWaitForNStarts[self] = 0 THEN EXIT;
          ENDLOOP;
        };
      NormalAbortWorker[self];
      -- temporary kludge, effective since remote coordinators aren't used now.
      [] _ AlpineTransaction.Finish[
        AlpineIdentity.myLocalConversation, self.transID, abort, FALSE];
      RETURN;
      };
    ENDCASE => RETURN;
  };

ShortWaitForState: ENTRY PROC [self: Handle] RETURNS [Worker.State] = INLINE {
  -- Waits once on shortWaitOver (normally until its timeout) and returns the state
  --then current.
  WAIT shortWaitOver;
  RETURN [self.state];
  };

ShortWaitForNStarts: ENTRY PROC [self: Handle]
  RETURNS [nStarts: [0..Worker.maxStarts]] = INLINE {
  -- Waits once on shortWaitOver (normally until its timeout) and returns the number of
  --StartWork calls then outstanding.
  WAIT shortWaitOver;
  RETURN [self.nStarts];
  };

LocalWorkerFinish: PROC [
  self: Handle, requiredOutcome: AlpineTransMgr.RequiredOutcome,
  whyAbort: TransactionMap.AbortReason] = {
  -- Drives self to the required outcome on behalf of WorkerFinish.
  -- Note: the whyAbort parameter is not examined in this procedure.
  SELECT BeginFinish[self, requiredOutcome, waitIfPreparing] FROM
    active => ERROR Refused [notReady];
    completing => {
      -- We have caused self to enter the completing state, self.outcome = requiredOutcome.
      -- Hence we are responsible for making it complete.
      IF requiredOutcome = abort THEN {
        IF PreventStartWork[self, zero].abortThisTrans THEN {
          -- Some work is still in progress.  Wait for it to go away.
          DO
            LockControl.AbortWaitingRequests[self];
            IF ShortWaitForNStarts[self] = 0 THEN EXIT;
            ENDLOOP;
          };
        NormalAbortWorker[self] }
      ELSE NormalCommitWorker[self];
      };
    ENDCASE => RETURN;
  };

BeginFinish: ENTRY PROC [self: Handle, requiredOutcome: AlpineTransMgr.RequiredOutcome,
  option: {returnIfPreparing, waitIfPreparing}]
  RETURNS [Worker.State] = {
  -- result is # unknown, ready.
  -- result is = active only if requiredOutcome = commit and transaction was unknown
  --or active on entry.  (Caller is in error.)
  -- result is = preparing only if option = returnIfPreparing and transaction was preparing
  --on entry.  (Caller is trying to perform unilateral abort.)
  -- result is = completing iff this call caused the transaction to enter
  --the completing state.  (Caller is now responsible for making transaction complete.)
  DO
    SELECT self.state FROM
      unknown, completing => WAIT shortWaitOver;
      preparing =>
        IF option = waitIfPreparing THEN WAIT shortWaitOver ELSE RETURN [preparing];
      ENDCASE => EXIT;
    ENDLOOP;
  IF requiredOutcome = abort AND (self.state = active OR self.state = ready) THEN {
    -- The abort record is not forced.
    LogWorkerCompleting[self, abort, FALSE];
    self.state _ completing;
    self.outcome _ abort;
    }
  ELSE IF requiredOutcome = commit AND self.state = ready THEN {
    -- The commit record is forced only when the coordinator is remote.
    LogWorkerCompleting[self, commit, self.coordinatorIsRemote];
    self.state _ completing;
    self.outcome _ commit;
    };
  RETURN [self.state];
  };


NormalCommitWorker: PROC [self: Handle] = {
  -- self.state = completing, self.outcome = commit
  -- Call any external caches now; they need to know about the commit.
  AccessControl.PhaseTwoOrAbortSpaceAndOwnerChanges[self, commit];
  CommitWorker[self];
  };

CommitWorker: PROC [self: Handle] = {
  -- Called during normal operation and during recovery.
  -- Runs commit phase two in the file manager, then either hands self's locks to the
  --continuation worker (and activates it) or releases them.
  FileControl.CommitPhase2[trans: self, newTrans: self.continueWorker];
  IF self.continueWorker # nullHandle THEN {
    LockControl.TransferLocks[from: self, to: self.continueWorker];
    self.continueWorker.locks _ self.locks;
    self.locks _ NIL;
    EndCreateWorker[self.continueWorker, create];
    self.continueWorker _ nullHandle;
    }
  ELSE {
    LockControl.ReleaseLocks[self];
    self.locks _ NIL;
    };
  };

NormalAbortWorker: PROC [self: Handle] = {
  -- self.state = completing, self.outcome = abort
  -- Call any external caches now; they need to know about the abort.
  AccessControl.PhaseTwoOrAbortSpaceAndOwnerChanges[self, abort];
  AbortWorker[self];
  };

AbortWorker: PROC [self: Handle] = {
  -- Called during normal operation and during recovery.
  -- Aborts self in the file manager, releases its locks, and aborts the continuation
  --worker (if any) so that it leaves the unknown state.
  FileControl.Abort[trans: self];
  LockControl.ReleaseLocks[self];
  self.locks _ NIL;
  IF self.continueWorker # nullHandle THEN {
    EndCreateWorker[self.continueWorker, abort];
    self.continueWorker _ nullHandle;
    };
  };


-- Log writing and analysis; checkpoint proc

LogWorkerBegin: PROC [self: Handle] = {
  -- Writes the WorkerBegin record, whose body carries the coordinator's name, and
  --remembers the record's ID in self.beginRecord.
  rec: Worker.BeginLogRep _ [];
  ConvertUnsafe.AppendRope[
    to: LOOPHOLE[LONG[@rec.coordinator]], from: self.coordinator.Name];
  [thisRecord: self.beginRecord] _ Log.Write[self, workerBegin,
    [base: @rec, length: TEXT[rec.length].SIZE]];
  };

LogWorkerReady: PROC [self: Handle, force: BOOL] = {
  -- Writes a WorkerReady record with an empty body; force is passed through to Log.Write.
  [] _ Log.Write[self, workerReady, Log.nullBlock, force];
  };

LogWorkerCompleting: PROC [self: Handle, outcome: AlpineInternal.WorkerOutcome,
  force: BOOL] = {
  -- Writes a WorkerCompleting record carrying outcome; force is passed through to
  --Log.Write.
  rec: Worker.CompletingLogRep _ [outcome: outcome];
  [] _ Log.Write[self, workerCompleting,
    [base: @rec, length: Worker.CompletingLogRep.SIZE], force];
  };

LogWorkerComplete: PROC [self: Handle] = {
  -- Writes a WorkerComplete record with an empty body.
  [] _ Log.Write[self, workerComplete, Log.nullBlock];
  };

AnalyzeWorkerBegin: PROC [
  record: RecordID, type: RecordType, trans: TransID] = {
  -- Analysis procedure for workerBegin records (registered at the end of this module).
  -- Recreates the volatile worker for trans from the logged coordinator name.
  coordinator: AlpineEnvironment.FileStore;
  { -- get coordinator name from log, map it into a FileStore.
  rec: Worker.BeginLogRep _ [];
  status: Log.ReadProcStatus;
  [status: status] _ Log.ReadForRecovery[thisRecord: record,
    to: [base: @rec, length: Worker.BeginLogRep.SIZE]];
  IF status = destinationFull THEN ERROR InternalProgrammingError;
  coordinator _ ConvertUnsafe.ToRope[from: LOOPHOLE[LONG[@rec.coordinator]]];
  };
  { -- create a worker handle from trans, setting its coordinator field from above.
  self: Handle _ ConsWorker[trans, AlpineImport.Register[coordinator]];
  IF TransactionMap.Register[self].alreadyRegistered THEN ERROR InternalProgrammingError;
  self.beginRecord _ record;
  self.locks _ LockControl.ConsTransHeader[self];
  self.allowableDifficulty _ zero;
  self.stateDuringRecovery _ active;
  };
  };

AnalyzeWorkerReady: PROC [
  record: RecordID, type: RecordType, trans: TransID] = {
  -- Analysis procedure for workerReady records: active => ready.
  -- Records for transactions not in TransactionMap are ignored.
  self: Handle = TransactionMap.GetHandle[trans];
  IF self = nullHandle THEN RETURN;
  IF self.stateDuringRecovery # active THEN ERROR InternalProgrammingError;
  self.stateDuringRecovery _ ready;
  };

AnalyzeWorkerCompleting: PROC [
  record: RecordID, type: RecordType, trans: TransID] = {
  -- Analysis procedure for workerCompleting records: active or ready => aborted or
  --committed, according to the logged outcome (anything other than abort counts as
  --committed).
  -- Records for transactions not in TransactionMap are ignored.
  outcome: AlpineInternal.WorkerOutcome;
  { -- get outcome from log
  rec: Worker.CompletingLogRep;
  status: Log.ReadProcStatus;
  [status: status] _ Log.ReadForRecovery[thisRecord: record,
    to: [base: @rec, length: Worker.CompletingLogRep.SIZE]];
  IF status # normal THEN ERROR InternalProgrammingError;
  outcome _ rec.outcome;
  };
  {
  self: Handle = TransactionMap.GetHandle[trans];
  IF self = nullHandle THEN RETURN;
  IF self.stateDuringRecovery NOT IN [active .. ready] THEN ERROR InternalProgrammingError;
  self.stateDuringRecovery _ IF outcome = abort THEN aborted ELSE committed;
  };
  };

AnalyzeWorkerComplete: PROC [
  record: RecordID, type: RecordType, trans: TransID] = {
  -- Analysis procedure for workerComplete records: the transaction finished before
  --the crash, so release its locks and remove it from TransactionMap.
  -- Records for transactions not in TransactionMap are ignored.
  self: Handle = TransactionMap.GetHandle[trans];
  IF self = nullHandle THEN RETURN;
  SELECT self.stateDuringRecovery FROM
    active, ready => ERROR;
    committed => self.outcome _ commit;
    aborted => self.outcome _ abort
    ENDCASE => ERROR;
  LockControl.ReleaseLocks[self];
  self.locks _ NIL;
  self.state _ complete;
  TransactionMap.Unregister[self];
  };

CallAfterAnalysisPass: PUBLIC PROC [] = {
  -- WorkerControl.CallAfterAnalysisPass
  -- Sets the log watchdog bounds, then marks every worker that the analysis pass
  --left active as aborted (writing an unforced WorkerCompleting[abort] record).
  AbortActiveWorker: PROC [self: Handle] RETURNS [stop: BOOL] = {
    SELECT self.stateDuringRecovery FROM
      active => {
        LogWorkerCompleting[self: self, outcome: abort, force: FALSE];
        self.stateDuringRecovery _ aborted;
        };
      ready, committed, aborted => NULL;
      ENDCASE => ERROR;
    RETURN [stop: FALSE];
    };
  logWordsUsableByClient _ LogControl.WordsUsableByClient[];
  maxLogWordsForWorker _ MIN[
    maxLogWordsForFastRestart, logWordsUsableByClient];
  TransactionMap.LockedEnumerate[AbortActiveWorker];
  };

RecoverWorkerBegin: PROC [
  record: RecordID, type: RecordType, trans: Handle, outcome: Log.TransState] = {
  -- Recovery procedure for workerBegin records: unknown => active.
  IF trans.state # unknown THEN ERROR;
  trans.state _ active;
  };

RecoverWorkerReady: PROC [
  record: RecordID, type: RecordType, trans: Handle, outcome: Log.TransState] = {
  -- Recovery procedure for workerReady records: active => ready.
  IF trans.state # active THEN ERROR;
  trans.state _ ready;
  };

RecoverWorkerCompleting: PROC [
  record: RecordID, type: RecordType, trans: Handle, outcome: Log.TransState] = {
  -- Recovery procedure for workerCompleting records.  Redoes the end of the
  --transaction according to outcome:
  --  committed: re-run ReadyWorker (which must not fail), then CommitWorker;
  --  aborted: AbortWorker;
  --  ready: re-run ReadyWorker (which must yield ready) and stay in the ready state.
  trans.state _ completing;
  SELECT outcome FROM
    committed => {
      trans.state _ ready;
      IF ReadyWorker[trans] = notReady THEN ERROR;
      trans.state _ completing;
      trans.outcome _ commit;
      [] _ CommitWorker[trans];
      };
    aborted => {
      trans.state _ completing;
      trans.outcome _ abort;
      AbortWorker[trans];
      };
    ready => {
      trans.state _ ready;
      IF ReadyWorker[trans] # ready THEN ERROR;
      };
    ENDCASE => ERROR;
  };

CallAfterUpdatePass: PUBLIC PROC [] = {
  -- WorkerControl.CallAfterUpdatePass
  -- Returns only when every registered worker is either ready or fpmComplete,
  --re-checking once per second.
  noActiveWorkers: BOOL;
  CheckForActiveWorkers: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
    -- ENTRY so that self.state is examined under the worker's monitor.
    SELECT self.state FROM
      ready, fpmComplete => NULL;
      ENDCASE => noActiveWorkers _ FALSE;
    RETURN [stop: FALSE];
    };
  DO
    noActiveWorkers _ TRUE;
    TransactionMap.LockedEnumerate[CheckForActiveWorkers];
    IF noActiveWorkers THEN RETURN
    ELSE Process.Pause[Process.SecondsToTicks[1]];
    ENDLOOP;
  };

WorkerCheckpointProc: PROC []
  RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
  -- Checkpoint procedure (registered at the end of this module).
  -- Pass one marks every fpmComplete worker as fpmCompleteBeingForcedOut.  If any
  --were found, the file page manager is forced out.  Pass two writes WorkerComplete
  --for the marked workers and unregisters them, and computes the oldest begin record
  --among the remaining workers whose state is not unknown.
  -- Both results are that oldest begin record (Log.lastRecordID if there is none).
  -- Notifies the log watchdog when the log between that record and the next write
  --exceeds maxLogWordsForWorker.
  fpmCompleteWorkerSeen: BOOL _ FALSE;
  ExamineWorkerBeforeForceOut: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
    IF self.state = fpmComplete THEN {
      self.state _ fpmCompleteBeingForcedOut;
      fpmCompleteWorkerSeen _ TRUE;
      };
    RETURN [stop: FALSE];
    };
  oldestBeginRecord: Log.RecordID _ Log.lastRecordID;
  ExamineWorkerAfterForceOut: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
    IF self.state = fpmCompleteBeingForcedOut THEN {
      LogWorkerComplete[self];
      TransactionMap.Unregister[self];
      self.state _ complete;
      }
    ELSE IF self.state # unknown THEN
      oldestBeginRecord _ LogInline.Min[self.beginRecord, oldestBeginRecord];
    RETURN [stop: FALSE];
    };
  TransactionMap.UnlockedEnumerate[ExamineWorkerBeforeForceOut];
  IF fpmCompleteWorkerSeen THEN FilePageMgr.ForceOutEverything[];
  TransactionMap.UnlockedEnumerate[ExamineWorkerAfterForceOut];
  IF LogInline.WordsFromSubtract[LogControl.RecordIDOfNextWrite[], oldestBeginRecord] >
    maxLogWordsForWorker THEN WorkerInternal.NotifyLogWatchdog[];
  RETURN [oldestBeginRecord, oldestBeginRecord];
  };

WorkerLogWatchdogProcess: PUBLIC PROC [] = {
  -- Body of the worker log watchdog; never returns.
  -- Each iteration picks a worker (the one remembered in transID from the previous
  --iteration if it is still registered, otherwise the oldest active worker) and
  --asks TestForAbort what to do with it.  The process blocks in
  --WorkerInternal.WaitForNotify when there is no candidate or when the candidate
  --should not be aborted yet.
  transID: TransID _ nullTransID;
  w: Handle _ NIL;
  wait: BOOL _ FALSE;
  DO
    IF transID # nullTransID THEN w _ TransactionMap.GetHandle[transID];
    IF w = NIL THEN w _ GetOldestActiveWorker[];
    IF w = NIL THEN { transID _ nullTransID; wait _ TRUE }
    ELSE SELECT TestForAbort[w] FROM
      notActive => transID _ nullTransID;
      abort => {
        AbortUnilaterally[w, oldUncommittedUpdates];
        transID _ nullTransID;
        };
      dontAbort => { transID _ w.transID; wait _ TRUE };
      ENDCASE => ERROR;
    w _ NIL; -- so that w can be garbage-collected
    IF wait THEN { WorkerInternal.WaitForNotify[]; wait _ FALSE };
    ENDLOOP;
  };

TestForAbort: ENTRY PROC [self: Handle] RETURNS [{notActive, abort, dontAbort}] = {
  -- Watchdog policy for one worker (see the parameter declarations at the top of the
  --module):
  --  not active => notActive;
  --  log use within maxLogWordsForWorker => dontAbort;
  --  log use beyond logWordsUsableByClient => abort;
  --  otherwise dontAbort only if the worker is both recently active and expensive
  --  to abort, else abort.
  logWordsForWorker: INT =
    LogInline.WordsFromSubtract[LogControl.RecordIDOfNextWrite[], self.beginRecord];
  IF self.state # active THEN RETURN [notActive];
  IF logWordsForWorker <= maxLogWordsForWorker THEN RETURN [dontAbort];
  IF logWordsForWorker > logWordsUsableByClient THEN RETURN [abort];
  IF BasicTime.Period[from: self.timeOfLastStartWork, to: BasicTime.Now[]] <
      longTimeSinceLastStartWork --active-- AND
    self.estimatedUpdateCost * densityFactor > logWordsForWorker --expensive-- THEN
    RETURN [dontAbort];
  RETURN [abort];
  };

GetOldestActiveWorker: PROC [] RETURNS [Handle] = {
  -- Returns an active worker whose beginRecord is not later than that of any other
  --active worker seen by the enumeration, or NIL if no worker is active.
  w: Handle _ NIL;
  ExamineWorker: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
    -- Replace w unless w's begin record compares less than self's.
    IF self.state = active AND
      (w = NIL OR LogInline.Compare[w.beginRecord, self.beginRecord] # less) THEN
      w _ self;
    RETURN [stop: FALSE];
    };
  TransactionMap.UnlockedEnumerate[ExamineWorker];
  RETURN [w];
  };


-- Procedures exported to TransactionMap, and some related procedures.

StartWork: PUBLIC ENTRY PROC [self: Handle, difficulty: AlpineInternal.WorkLevel]
  RETURNS [canWork: BOOL] = {
  -- Asks permission to perform an action of the given difficulty on behalf of self.
  -- Returns FALSE if difficulty exceeds self.allowableDifficulty (checked on entry and
  --again after each wait).  Otherwise counts the caller in self.nStarts, waiting
  --while Worker.maxStarts actions are already in progress, and returns TRUE.
  -- Each successful call is expected to be paired with a StopWork call.
  IF difficulty > self.allowableDifficulty THEN RETURN [FALSE];
  IF self.state NOT IN [active .. preparing] THEN ERROR;
  WHILE self.nStarts = Worker.maxStarts DO
    WAIT shortWaitOver;
    IF difficulty > self.allowableDifficulty THEN RETURN [FALSE];
    ENDLOOP;
  self.nStarts _ self.nStarts + 1;
  self.timeOfLastStartWork _ BasicTime.Now[];
  RETURN [TRUE];
  };

StopWork: PUBLIC ENTRY PROC [self: Handle, estimatedUpdateCost: INT] = {
  -- Ends an action begun with StartWork and adds its estimatedUpdateCost to the
  --running total kept in self.
  IF self.nStarts = 0 THEN ERROR; --impossible if clients are correctly coded--
  self.nStarts _ self.nStarts - 1;
  self.estimatedUpdateCost _ self.estimatedUpdateCost + estimatedUpdateCost;
  };

PreventStartWork: ENTRY PROC [self: Handle, allowDifficulty: Worker.Difficulty]
  RETURNS [abortThisTrans: BOOL] = {
  -- Return abortThisTrans ~ TRUE if work is in progress.
  -- In that case the allowable difficulty is forced to zero regardless of
  --allowDifficulty; otherwise it is set to allowDifficulty.
  IF self.nStarts # 0 THEN {
    self.allowableDifficulty _ zero;
    RETURN [abortThisTrans: TRUE];
    }
  ELSE {
    self.allowableDifficulty _ allowDifficulty;
    RETURN [abortThisTrans: FALSE];
    };
  };

GetTimeOfLastStartWork: PUBLIC ENTRY PROC [self: Handle] RETURNS [BasicTime.GMT] = {
  -- Returns the time recorded by the most recent successful StartWork (or by
  --ConsWorker if there has been none).
  RETURN [self.timeOfLastStartWork];
  };

GetEstimatedUpdateCost: PUBLIC ENTRY PROC [self: Handle] RETURNS [INT] = {
  -- Returns the total of the costs reported to StopWork so far.
  RETURN [self.estimatedUpdateCost];
  };

GetTransID: PUBLIC PROC [self: Handle] RETURNS [TransID] = {
  -- Not ENTRY because the trans field is immutable.
  RETURN [self.transID];
  };

GetFileInstanceList: PUBLIC PROC [self: Handle]
  RETURNS [fileInstanceList: FileInstanceHandle] = {
  -- Not ENTRY because self.fileInstanceList "belongs" to the file manager.
  RETURN [self.fileInstanceList];
  };

SetFileInstanceList: PUBLIC PROC [self: Handle, fileInstanceList: FileInstanceHandle] = {
  -- Not ENTRY because self.fileInstanceList "belongs" to the file manager.
  self.fileInstanceList _ fileInstanceList;
  };

GetLockHeader: PUBLIC PROC [self: Handle]
  RETURNS [lockHeader: AlpineInternal.LockTransHeaderHandle] = {
  -- Not ENTRY because self.locks "belongs" to the lock manager.
  RETURN[self.locks];
  };

EnableAlpineWheel: PUBLIC ENTRY PROC [self: Handle, conversation: Conversation,
  enable: BOOL] = {
  -- Adds (enable = TRUE) or removes (enable = FALSE) the conversation's ID from
  --self.enabledWheelList.  Adding an ID already present, or removing one that is
  --absent, does nothing.  The local conversation is never listed (IsAlpineWheel
  --treats it as always enabled).
  l: LIST OF RPC.ConversationID;
  c: RPC.ConversationID = RPC.GetConversationID[conversation];
  IF c = AlpineIdentity.myLocalConversationID THEN RETURN;
  -- Search: on exit l is the cell holding c, or NIL if c is not in the list.
  FOR l _ self.enabledWheelList, l.rest UNTIL l = NIL DO
    IF l.first = c THEN EXIT;
    ENDLOOP;
  IF enable AND l = NIL THEN {
    new: LIST OF RPC.ConversationID = zone.CONS[
      first: c, rest: self.enabledWheelList];
    self.enabledWheelList _ new;
    }
  ELSE IF (NOT enable) AND l # NIL THEN {
    -- Unlink cell l: either it is the head, or find its predecessor p.
    IF self.enabledWheelList = l THEN self.enabledWheelList _ l.rest
    ELSE FOR p: LIST OF RPC.ConversationID _ self.enabledWheelList, p.rest UNTIL p = NIL DO
      IF p.rest = l THEN { p.rest _ l.rest; EXIT };
      ENDLOOP;
    };
  };

IsAlpineWheel: PUBLIC ENTRY PROC [self: Handle, conversation: Conversation]
  RETURNS [enabled: BOOL] = {
  -- TRUE for the local conversation, and for any conversation whose ID is in
  --self.enabledWheelList.
  c: RPC.ConversationID = RPC.GetConversationID[conversation];
  IF c = AlpineIdentity.myLocalConversationID THEN RETURN [enabled: TRUE];
  FOR l: LIST OF RPC.ConversationID _ self.enabledWheelList, l.rest UNTIL l = NIL DO
    IF l.first = c THEN RETURN [enabled: TRUE];
    ENDLOOP;
  RETURN [enabled: FALSE];
  };

StateDuringRecovery: PUBLIC ENTRY PROC [self: Handle]
  RETURNS [TransactionMap.TransState] = {
  -- Maps self.stateDuringRecovery to TransactionMap.TransState.
  -- It is an ERROR to ask while the worker is still marked active.
  SELECT self.stateDuringRecovery FROM
    active => ERROR;
    ready => RETURN [ready];
    committed => RETURN [committed];
    aborted => RETURN [aborted];
    ENDCASE => ERROR;
  };

IsCommitted: PUBLIC ENTRY PROC [self: Handle] RETURNS [BOOL] = {
  -- FALSE while self is active or preparing; while completing, TRUE iff the outcome
  --is commit.  Any other state is an ERROR.
  SELECT self.state FROM
    active, preparing => RETURN [FALSE];
    completing => RETURN [self.outcome = commit];
    ENDCASE => ERROR; -- locks are supposed to prevent this
  };

AssertUpdatesAreComplete: PUBLIC ENTRY PROC [self: Handle] = {
  -- completing => fpmComplete.  WorkerCheckpointProc later moves fpmComplete
  --workers to complete.
  IF self.state # completing THEN ERROR;
  self.state _ fpmComplete;
  };


-- Initialization

-- Register this module's log analysis, recovery, and checkpoint procedures, then
--give shortWaitOver its timeout and make waits on it abortable.
LogControl.RegisterAnalysisProc[workerBegin, AnalyzeWorkerBegin];
LogControl.RegisterAnalysisProc[workerReady, AnalyzeWorkerReady];
LogControl.RegisterAnalysisProc[workerCompleting, AnalyzeWorkerCompleting];
LogControl.RegisterAnalysisProc[workerComplete, AnalyzeWorkerComplete];
Log.RegisterRecoveryProc[workerBegin, RecoverWorkerBegin];
Log.RegisterRecoveryProc[workerReady, RecoverWorkerReady];
Log.RegisterRecoveryProc[workerCompleting, RecoverWorkerCompleting];
LogControl.RegisterCheckpointProc[WorkerCheckpointProc];
Process.SetTimeout[@shortWaitOver, Process.MsecToTicks[Worker.shortWaitTime]];
Process.EnableAborts[@shortWaitOver];
END.--WorkerImpl


CHANGE LOG

Changed by MBrown on February 9, 1983 11:54 am
-- Change CommitWorker to set continueWorker _ nullHandle.  (This was happening,
--unsynchronized, from the checkpoint process!)

Changed by MBrown on March 2, 1983 2:53 pm
-- Change sanity check in StartWork.

Changed by MBrown on April 3, 1983 10:32 am
-- Implemented worker log watchdog.  Changed AbortUnilaterally to call
--AlpineTransaction.Finish[..., abort, ...], making coordinator go away if it is local.
--Updated catch phrases for AccessControl errors.

Changed by MBrown on June 6, 1983 11:28 am
-- In WorkerImpl.CheckForReadyWorker, was examining worker state without entering
--monitor.  Changed logic as follows: now WorkerImpl.CallAfterUpdatePass waits until
--all workers are either ready or fpmComplete.

Changed by MBrown on June 25, 1983 9:43 pm
-- Fixed two related bugs.  If AbortUnilaterally was called with self.state = preparing,
--it simply waited for self.state # preparing.  This is no good because self may be
--waiting in the lock manager, deadlocked, and the caller of AbortUnilaterally may be
--the lock watchdog process!  This was observed several times in server operation.
-- The second bug was more subtle and probably has not yet arisen in practice.  When
--a process calls LockControl.AbortWaitingRequests, what guarantee does it have that no
--new requests will come along and wait later?  Answer: no guarantee.  This synchronization
--must be done at the level of the worker object monitor, and the lock manager does not
--enter the worker monitor.  We wrapped all calls to LockControl.AbortWaitingRequests in
--loops that are designed to ensure that when the loop exits, no more Lock.Set or
--LockControl.UpgradeLocks calls will be executed for this transaction.  This seems messy
--but it is not clear how to do better.