-- WorkerImpl: the worker side of Alpine two-phase commit.
-- Creates workers (CreateWorker), prepares them (WorkerPrepare), and finishes
-- them with commit or abort (WorkerFinish, AbortUnilaterally). Also supplies
-- the log analysis and recovery procs for worker log records, the worker
-- checkpoint proc, and the worker log watchdog process.
DIRECTORY
  AccessControl,
  AlpineEnvironment,
  AlpineIdentity,
  AlpineImport,
  AlpineInternal,
  AlpineTransaction,
  AlpineTransMgr,
  AlpineTransMgrRpcControl,
  BasicTime,
  ClientMap USING [GetName],
  ConversationTable,
  ConvertUnsafe,
  FileControl,
  FilePageMgr,
  Lock,
  LockControl,
  AlpineLog,
  LogControl,
  LogInline,
  Process,
  Rope,
  RPC,
  SafeStorage,
  SkiPatrolHooks USING [ShouldAbort],
  SkiPatrolLog USING [LockConflictInfo, notice, TransactionAbortInfo],
  TransactionMap,
  Worker,
  WorkerControl,
  WorkerInternal;

WorkerImpl: -- CEDAR ? -- MONITOR
  LOCKS self USING self: Handle
  IMPORTS
    AccessControl, AlpineIdentity, AlpineImport, AlpineTransaction,
    AlpineTransMgrRpcControl, BasicTime, ClientMap, ConversationTable,
    ConvertUnsafe, FileControl, FilePageMgr, Lock, LockControl, AlpineLog,
    LogControl, LogInline, Process, Rope, RPC, SafeStorage, SkiPatrolLog,
    TransactionMap, WorkerInternal
  EXPORTS
    AlpineTransaction, AlpineTransMgr, AlpineInternal, SkiPatrolHooks,
    TransactionMap, WorkerControl, WorkerInternal
  = BEGIN

Conversation: TYPE = AlpineEnvironment.Conversation;
TransID: TYPE = AlpineEnvironment.TransID;
nullTransID: TransID = AlpineEnvironment.nullTransID;
FileStore: TYPE = AlpineEnvironment.FileStore;
FileInstanceHandle: TYPE = AlpineInternal.FileInstanceHandle;
RecordID: TYPE = AlpineLog.RecordID;
RecordType: TYPE = AlpineLog.RecordType;

InternalProgrammingError: ERROR = CODE;

Handle: TYPE = Worker.Handle;
nullHandle: Handle = Worker.nullHandle;

-- AlpineInternal.TransObject
TransObject: PUBLIC TYPE = Worker.Object;

-- AlpineTransMgr.Refused.
Refused: PUBLIC ERROR [why: AlpineTransMgr.Refusal] = CODE;

zone: ZONE = SafeStorage.GetSystemZone[];

-- When TRUE, the worker always forces the log as if the coordinator were
-- remote (ConsWorker then sets coordinatorIsRemote for every worker).
disableLocalCoordinatorOptimization: BOOL _ FALSE;

-- Clients wait here under various unlikely circumstances: concurrent calls to
-- CreateWorker, WorkerPrepare and WorkerFinish; concurrent calls to worker
-- actions that use exclusive-mode StartWork; excessive concurrency among other
-- worker actions. The initialization code sets a timeout of
-- Worker.shortWaitTime milliseconds, so each WAIT ends and the situation is
-- reevaluated. Nobody notifies shortWaitOver.
shortWaitOver: CONDITION;

-- Parameters to the worker log watchdog (see TestForAbort).
-- If self.estimatedUpdateCost*densityFactor is larger than the amount of log
-- tied up by this transaction, the transaction is considered too expensive to
-- abort (unless it is inactive or its log use exceeds logWordsUsableByClient).
densityFactor: INT _ 64;

-- If the time since the last StartWork is greater than this, the transaction
-- is considered inactive.
longTimeSinceLastStartWork: INT _ 1000; -- seconds (about 17 minutes)

-- Absolute upper bound on the amount of log a transaction may tie up.
logWordsUsableByClient: INT;

-- Fast restart means two minutes: two passes over 3000 pages at 50 pages/sec.
maxLogWordsForFastRestart: INT = 3000*LONG[AlpineEnvironment.wordsPerPage];

-- A worker may comfortably keep this much log active. Beyond this bound the
-- worker may be aborted (but need not be). This value and
-- logWordsUsableByClient are set in CallAfterAnalysisPass (based on the log
-- length) and not changed thereafter (except perhaps from the debugger).
maxLogWordsForWorker: INT;


-- Worker-related procs

-- AlpineTransaction.CreateWorker. Called by a client.
-- ! AlpineTransaction.Unknown {coordinator, transID},
-- ! AlpineTransaction.OperationFailed {busy}
-- The worker object monitor is not held during the call to RegisterWorker.
-- If a worker for transID is already registered, some other process owns the
-- RegisterWorker call; we wait for that worker to leave the unknown state and
-- report whether it became active.
-- Otherwise we registered the volatile worker, so we must call RegisterWorker
-- and then EndCreateWorker. We may not return or raise an error while
-- self.state = unknown, lest another process hang in WaitUntilNotUnknown.
-- If the coordinator refuses the registration (result # ok), the worker is
-- aborted and Unknown[transID] is raised instead of returning normally for a
-- worker that no longer exists.
CreateWorker: PUBLIC PROC [
    conversation: Conversation, transID: TransID, coordinator: FileStore] = {
  self: Handle;
  alreadyRegistered: BOOL _ FALSE;
  whyCallFailed: RPC.CallFailure;
  registerWorkerResult: AlpineTransMgr.RegisterWorkerResult;
  alpineTransMgr: AlpineTransMgrRpcControl.InterfaceRecord;
  conversationOut: Conversation;
  IF TransactionMap.GetHandle[transID] # nullHandle THEN alreadyRegistered _ TRUE
  ELSE self _ ConsWorker[transID, AlpineImport.Register[coordinator]];
  IF alreadyRegistered OR TransactionMap.Register[self].alreadyRegistered THEN {
    -- Drop our own self (if any) on the floor; another process registered transID
    -- and is responsible for calling RegisterWorker.
    self _ TransactionMap.GetHandle[transID];
    IF self = nullHandle OR WaitUntilNotUnknown[self].workerNotActive THEN
      ERROR AlpineTransaction.Unknown[transID];
    }
  ELSE {
    -- We registered the volatile worker; every exit below calls EndCreateWorker.
    conversationOut _ ConversationTable.Fetch[self.coordinator];
    alpineTransMgr _ AlpineImport.GetTransMgrInterface[self.coordinator];
    IF conversationOut = NIL OR alpineTransMgr = NIL THEN GOTO noCoordinator;
    registerWorkerResult _ alpineTransMgr.RegisterWorker[conversationOut, transID !
      RPC.CallFailed => CHECKED { whyCallFailed _ why; GOTO callFailed } ];
    -- The coordinator did not accept this worker; do not report success.
    IF registerWorkerResult # ok THEN GOTO coordinatorRefused;
    EndCreateWorker[self, create];
    EXITS
      noCoordinator => {
        EndCreateWorker[self, abort];
        ERROR AlpineTransaction.Unknown[coordinator] };
      coordinatorRefused => {
        EndCreateWorker[self, abort];
        ERROR AlpineTransaction.Unknown[transID] };
      callFailed => {
        EndCreateWorker[self, abort];
        SELECT whyCallFailed FROM
          timeout, unbound => {
            self.coordinator.TransMgrInterfaceCallFailed[alpineTransMgr];
            alpineTransMgr _ NIL; --must come after CallFailed is unwound
            ERROR AlpineTransaction.Unknown[coordinator] };
          busy => ERROR AlpineTransaction.OperationFailed[busy];
          runtimeProtocol, stubProtocol => ERROR;
          ENDCASE
        }
    };
  };

-- Allocates a new worker object for trans, in its default (unknown) state.
-- Note: does not establish the locks field of the Worker.Object.
ConsWorker: PROC [trans: TransID, coordinator: AlpineImport.Handle] RETURNS [Handle] = {
  RETURN [zone.NEW[Worker.Object _ [
    transID: trans,
    timeOfLastStartWork: BasicTime.Now[],
    coordinator: coordinator,
    coordinatorIsRemote: disableLocalCoordinatorOptimization OR
      NOT coordinator.Equal[AlpineIdentity.myAlpineImportHandle]]]];
  };

-- Moves a registered worker out of the unknown state (no WorkerBegin written yet).
-- create: writes the WorkerBegin record, makes self active, ensures a lock header.
-- abort: marks self complete with outcome abort and unregisters it.
-- The state must change from unknown, since another process may be waiting in
-- WaitUntilNotUnknown for this event.
EndCreateWorker: ENTRY PROC [self: Handle, outcome: {create, abort} ] = {
  IF self.state # unknown THEN ERROR;
  IF outcome = create THEN {
    LogWorkerBegin[self];
    self.state _ active;
    IF self.locks = NIL THEN self.locks _ LockControl.ConsTransHeader[self];
    }
  ELSE {
    self.state _ complete;
    self.outcome _ abort;
    TransactionMap.Unregister[self];
    };
  };

-- Waits (in shortWaitOver steps) until self leaves the unknown state;
-- returns TRUE if it did not become active.
WaitUntilNotUnknown: ENTRY PROC [self: Handle] RETURNS [workerNotActive: BOOL] = {
  WHILE self.state = unknown DO WAIT shortWaitOver ENDLOOP;
  RETURN [self.state # active];
  };

-- AlpineTransMgr.WorkerPrepare. Called by the coordinator of trans.
-- ! Refused {wrongCoordinator}
-- Tolerates duplicate calls: extra callers wait in BeginPrepare (on
-- shortWaitOver) and then report ready or notReady from the state they find.
-- If newTrans # nullTransID, a continuation worker is created and registered.
WorkerPrepare: PUBLIC PROC [
    conversation: Conversation, trans: TransID, newTrans: AlpineEnvironment.TransID]
    RETURNS [AlpineTransMgr.WorkerState] = {
  stateAfterPrepare: AlpineEnvironment.WorkerState;
  self: Handle = TransactionMap.GetHandle[trans];
  IF self = nullHandle THEN RETURN [notReady];
  IF NOT Rope.Equal[s1: ClientMap.GetName[conversation], s2: self.coordinator.Name, case: FALSE]
    THEN ERROR Refused[wrongCoordinator];
  SELECT BeginPrepare[self] FROM
    preparing => NULL; -- normal case
    ready => RETURN [ready]; -- duplicate call
    ENDCASE => RETURN [notReady]; -- duplicate call
  -- We have caused self to enter the preparing state.
  IF newTrans # nullTransID THEN {
    -- Specialized version of CreateWorker for the continuation transaction.
    self.continueWorker _ ConsWorker[newTrans, self.coordinator];
    -- newTrans already known here probably represents an error in the coordinator.
    IF TransactionMap.Register[self.continueWorker].alreadyRegistered THEN ERROR;
    };
  {
  IF PreventSomeWork[self: self, allowDifficulty: normal].workInProgress THEN {
    -- Some work is still in progress. Wait for it to go away.
    DO
      IF ShortWaitForNStarts[self] = 0 THEN EXIT;
      ENDLOOP;
    };
  -- Call any external caches now; they are allowed to read and write pages.
  AccessControl.PhaseOneSpaceAndOwnerChanges[self !
    AccessControl.LockFailed, AccessControl.Unknown => GOTO abort];
  IF PreventStartWork[self: self, allowDifficulty: zero].abortThisTrans THEN GOTO abort;
  -- StartWork now refuses any difficulty above zero and no work is in progress,
  -- so ReadyWorker runs with exclusive access without entering the monitor.
  stateAfterPrepare _ ReadyWorker[self];
  EXITS
    abort => {
      -- Keep cancelling waiting lock requests until no StartWork is outstanding.
      DO
        LockControl.AbortWaitingRequests[self];
        IF ShortWaitForNStarts[self] = 0 THEN EXIT;
        ENDLOOP;
      stateAfterPrepare _ notReady;
      };
  };
  EndPrepare[self, stateAfterPrepare];
  -- self is now either ready or completing.
  SELECT stateAfterPrepare FROM
    notReady => NormalAbortWorker[self];
    readOnlyReady => NormalCommitWorker[self];
    ready => NULL;
    ENDCASE;
  RETURN [stateAfterPrepare]
  };

-- Waits while self is unknown or preparing, then moves an active worker to
-- preparing. Result is neither unknown nor active; it is preparing iff this
-- call caused self to enter the preparing state.
BeginPrepare: ENTRY PROC [self: Handle] RETURNS [Worker.State] = {
  DO
    SELECT self.state FROM
      unknown, preparing => WAIT shortWaitOver;
      ENDCASE => EXIT;
    ENDLOOP;
  IF self.state = active THEN self.state _ preparing;
  RETURN [self.state];
  };

-- Runs FileControl.CommitPhase1 and LockControl.UpgradeLocks for self.
-- Returns readOnlyReady or ready on success, notReady on Lock.Failed (after
-- reporting to the SkiPatrol lockConflict probe, if any) or Lock.TransAborting.
-- Called during normal operation and during recovery. The caller has exclusive
-- access to self and is not holding the monitor.
ReadyWorker: PROC [self: Handle] RETURNS [AlpineEnvironment.WorkerState] = {
  lockRoutine: Rope.ROPE; -- (name of LockControl routine called)
  howFailed: AlpineEnvironment.LockFailure;
  readOnly: BOOL;
  {
  readOnly _ FileControl.CommitPhase1[self !
    Lock.Failed => {
      lockRoutine _ "WorkerImpl.ReadyWorker (calling CommitPhase1)";
      howFailed _ why;
      GOTO lockProblem };
    Lock.TransAborting => GOTO abort];
  LockControl.UpgradeLocks[self !
    Lock.Failed => {
      lockRoutine _ "WorkerImpl.ReadyWorker (calling UpgradeLocks)";
      howFailed _ why;
      GOTO lockProblem };
    Lock.TransAborting => GOTO abort];
  RETURN [IF readOnly THEN readOnlyReady ELSE ready];
  EXITS
    lockProblem => {
      -- Copy the probe into a local before testing it for NIL.
      logProc: PROC [SkiPatrolLog.LockConflictInfo];
      IF (logProc _ SkiPatrolLog.notice.lockConflict) # NIL THEN
        logProc[[
          what: howFailed,
          where: lockRoutine,
          transID: self.transID,
          mode: none,
          specifics: unknown[],
          message: "(sigh) attempted locking mode not known"
          ]];
      RETURN [notReady];
      };
    abort => RETURN [notReady];
  }};

-- Logs the result of phase one and sets the corresponding state:
-- notReady => completing/abort; readOnlyReady => completing/commit (logged as
-- readOnly); ready => ready (log forced iff the coordinator is remote).
EndPrepare: ENTRY PROC [self: Handle, stateAfterPrepare: AlpineEnvironment.WorkerState] = {
  SELECT stateAfterPrepare FROM
    notReady => {
      LogWorkerCompleting[self: self, outcome: abort, force: FALSE];
      self.state _ completing;
      self.outcome _ abort;
      };
    readOnlyReady => {
      LogWorkerCompleting[self: self, outcome: readOnly, force: FALSE];
      self.state _ completing;
      self.outcome _ commit;
      };
    ready => {
      LogWorkerReady[self: self, force: self.coordinatorIsRemote];
      self.state _ ready;
      };
    ENDCASE => ERROR;
  };

-- AlpineTransMgr.WorkerFinish. Called by the coordinator of trans.
-- ! Refused {wrongCoordinator, notReady}
-- An unknown trans is ignored (the worker must have already finished).
WorkerFinish: PUBLIC PROC [
    conversation: Conversation, trans: TransID,
    requiredOutcome: AlpineTransMgr.RequiredOutcome] = {
  self: Handle = TransactionMap.GetHandle[trans];
  IF self = nullHandle THEN RETURN; --we must have already finished.
  IF NOT Rope.Equal[
      s1: ClientMap.GetName[conversation], s2: self.coordinator.Name, case: FALSE] THEN
    ERROR Refused[wrongCoordinator];
  IF requiredOutcome NOT IN [abort .. commit] THEN ERROR;
  LocalWorkerFinish[self, requiredOutcome, clientRequest];
  };

-- TransactionMap.AbortUnilaterally. Shares some logic with LocalWorkerFinish.
-- The why argument is not used by this implementation.
AbortUnilaterally: PUBLIC PROC [
    self: Handle, why: TransactionMap.AbortReason] = {
  SELECT BeginFinish[self, abort, returnIfPreparing] FROM
    active => ERROR Refused [notReady];
    preparing => {
      -- We cannot determine self's outcome and are not responsible for making it
      -- complete, but we must make sure it leaves the preparing state; otherwise
      -- it might stay there forever, waiting in the lock manager. The loop is
      -- needed because neither process involved is synchronized by self's or
      -- Lock's monitors.
      DO
        LockControl.AbortWaitingRequests[self];
        IF ShortWaitForState[self] # preparing THEN RETURN;
        ENDLOOP;
      };
    completing => {
      -- We caused self to enter completing with outcome abort, so we are
      -- responsible for making it complete.
      IF PreventStartWork[self, zero].abortThisTrans THEN {
        -- Some work is still in progress. Wait for it to go away.
        DO
          LockControl.AbortWaitingRequests[self];
          IF ShortWaitForNStarts[self] = 0 THEN EXIT;
          ENDLOOP;
        };
      NormalAbortWorker[self];
      -- Temporary kludge, effective since remote coordinators aren't used now.
      [] _ AlpineTransaction.Finish[
        AlpineIdentity.myLocalConversation, self.transID, abort, FALSE];
      RETURN;
      };
    ENDCASE => RETURN;
  };

-- Waits one shortWaitOver timeout, then returns self.state.
ShortWaitForState: ENTRY PROC [self: Handle] RETURNS [Worker.State] = INLINE {
  WAIT shortWaitOver;
  RETURN [self.state];
  };

-- Waits one shortWaitOver timeout, then returns the number of outstanding StartWorks.
ShortWaitForNStarts: ENTRY PROC [self: Handle] RETURNS [nStarts: [0..Worker.maxStarts]] = INLINE {
  WAIT shortWaitOver;
  RETURN [self.nStarts];
  };

-- Finishes self with requiredOutcome; waits if self is preparing.
-- ! Refused {notReady} if self is still active (a commit request for a worker
-- that never became ready). The whyAbort argument is not used here.
LocalWorkerFinish: PROC [
    self: Handle, requiredOutcome: AlpineTransMgr.RequiredOutcome,
    whyAbort: TransactionMap.AbortReason] = {
  SELECT BeginFinish[self, requiredOutcome, waitIfPreparing] FROM
    active => ERROR Refused [notReady];
    completing => {
      -- We caused self to enter completing with outcome = requiredOutcome, so we
      -- are responsible for making it complete.
      IF requiredOutcome = abort THEN {
        IF PreventStartWork[self, zero].abortThisTrans THEN {
          -- Some work is still in progress. Wait for it to go away.
          DO
            LockControl.AbortWaitingRequests[self];
            IF ShortWaitForNStarts[self] = 0 THEN EXIT;
            ENDLOOP;
          };
        NormalAbortWorker[self]
        }
      ELSE NormalCommitWorker[self];
      };
    ENDCASE => RETURN;
  };

-- Waits while self is unknown or completing (and while preparing, unless
-- option = returnIfPreparing), then moves self to completing when allowed:
-- abort from active or ready, commit from ready.
-- Result is neither unknown nor ready.
-- active only if requiredOutcome = commit and self was unknown or active on
-- entry (the caller is in error).
-- preparing only if option = returnIfPreparing and self was preparing on entry
-- (the caller is trying to perform unilateral abort).
-- completing iff this call caused self to enter completing (the caller is now
-- responsible for making the transaction complete).
BeginFinish: ENTRY PROC [self: Handle, requiredOutcome: AlpineTransMgr.RequiredOutcome,
    option: {returnIfPreparing, waitIfPreparing}] RETURNS [Worker.State] = {
  DO
    SELECT self.state FROM
      unknown, completing => WAIT shortWaitOver;
      preparing => IF option = waitIfPreparing THEN WAIT shortWaitOver
        ELSE RETURN [preparing];
      ENDCASE => EXIT;
    ENDLOOP;
  IF requiredOutcome = abort AND (self.state = active OR self.state = ready) THEN {
    LogWorkerCompleting[self, abort, FALSE];
    self.state _ completing;
    self.outcome _ abort;
    }
  ELSE IF requiredOutcome = commit AND self.state = ready THEN {
    LogWorkerCompleting[self, commit, self.coordinatorIsRemote];
    self.state _ completing;
    self.outcome _ commit;
    };
  RETURN [self.state];
  };

-- Requires self.state = completing, self.outcome = commit.
-- Tells external caches (AccessControl) about the commit, then commits.
NormalCommitWorker: PROC [self: Handle] = {
  AccessControl.PhaseTwoOrAbortSpaceAndOwnerChanges[transHandle: self, outcome: commit,
    continueTrans: self.continueWorker];
  CommitWorker[self];
  };

-- Runs FileControl.CommitPhase2. If there is a continuation worker, its locks
-- are transferred to it and it is made active; otherwise self's locks are
-- released. Called during normal operation and during recovery.
CommitWorker: PROC [self: Handle] = {
  FileControl.CommitPhase2[trans: self, newTrans: self.continueWorker];
  IF self.continueWorker # nullHandle THEN {
    LockControl.TransferLocks[from: self, to: self.continueWorker];
    self.continueWorker.locks _ self.locks;
    self.locks _ NIL;
    EndCreateWorker[self.continueWorker, create];
    self.continueWorker _ nullHandle;
    }
  ELSE {
    LockControl.ReleaseLocks[self];
    self.locks _ NIL;
    };
  };

-- Requires self.state = completing, self.outcome = abort.
-- Tells external caches (AccessControl) about the abort, then aborts.
NormalAbortWorker: PROC [self: Handle] = {
  AccessControl.PhaseTwoOrAbortSpaceAndOwnerChanges[transHandle: self, outcome: abort];
  AbortWorker[self];
  };

-- Reports to the SkiPatrol abortTransaction probe (if any), aborts self's file
-- updates, releases its locks, and aborts any continuation worker.
-- Called during normal operation and during recovery.
AbortWorker: PROC [self: Handle] = {
  logProc: PROC [SkiPatrolLog.TransactionAbortInfo];
  IF (logProc _ SkiPatrolLog.notice.abortTransaction) # NIL THEN
    logProc[[
      transID: self.transID,
      where: "WorkerImpl.AbortWorker",
      locks: self.locks,
      why: lockInfo,
      message: ""
      ]];
  FileControl.Abort[trans: self];
  LockControl.ReleaseLocks[self];
  self.locks _ NIL;
  IF self.continueWorker # nullHandle THEN {
    EndCreateWorker[self.continueWorker, abort];
    self.continueWorker _ nullHandle;
    };
  };


-- Log writing and analysis; checkpoint proc

-- Writes a WorkerBegin record holding the coordinator name; remembers its
-- RecordID in self.beginRecord.
LogWorkerBegin: PROC [self: Handle] = {
  rec: Worker.BeginLogRep _ [];
  ConvertUnsafe.AppendRope[
    to: LOOPHOLE[LONG[@rec.coordinator]], from: self.coordinator.Name];
  [thisRecord: self.beginRecord] _ AlpineLog.Write[self, workerBegin,
    [base: @rec, length: TEXT[rec.length].SIZE]];
  };

-- Writes an empty WorkerReady record, forcing the log if force.
LogWorkerReady: PROC [self: Handle, force: BOOL] = {
  [] _ AlpineLog.Write[self, workerReady, AlpineLog.nullBlock, force];
  };

-- Writes a WorkerCompleting record holding outcome, forcing the log if force.
LogWorkerCompleting: PROC [self: Handle, outcome: AlpineInternal.WorkerOutcome, force: BOOL] = {
  rec: Worker.CompletingLogRep _ [outcome: outcome];
  [] _ AlpineLog.Write[self, workerCompleting,
    [base: @rec, length: Worker.CompletingLogRep.SIZE], force];
  };

-- Writes an empty WorkerComplete record.
LogWorkerComplete: PROC [self: Handle] = {
  [] _ AlpineLog.Write[self, workerComplete, AlpineLog.nullBlock];
  };

-- Analysis proc for WorkerBegin: rebuilds and registers a worker for trans,
-- with stateDuringRecovery = active and allowableDifficulty = zero.
AnalyzeWorkerBegin: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
  coordinator: AlpineEnvironment.FileStore;
  { -- get coordinator name from log, map it into a FileStore.
  rec: Worker.BeginLogRep _ [];
  status: AlpineLog.ReadProcStatus;
  [status: status] _ AlpineLog.ReadForRecovery[thisRecord: record,
    to: [base: @rec, length: Worker.BeginLogRep.SIZE]];
  -- Only destinationFull is an error here; presumably because LogWorkerBegin
  -- writes a length computed from the name, which may be shorter than rec.
  IF status = destinationFull THEN ERROR InternalProgrammingError;
  coordinator _ ConvertUnsafe.ToRope[from: LOOPHOLE[LONG[@rec.coordinator]]];
  };
  { -- create a worker handle from trans, setting its coordinator field from above.
  self: Handle _ ConsWorker[trans, AlpineImport.Register[coordinator]];
  IF TransactionMap.Register[self].alreadyRegistered THEN ERROR InternalProgrammingError;
  self.beginRecord _ record;
  self.locks _ LockControl.ConsTransHeader[self];
  self.allowableDifficulty _ zero;
  self.stateDuringRecovery _ active;
  };
  };

-- Analysis proc for WorkerReady: active => ready.
AnalyzeWorkerReady: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
  self: Handle = TransactionMap.GetHandle[trans];
  IF self = nullHandle THEN RETURN;
  IF self.stateDuringRecovery # active THEN ERROR InternalProgrammingError;
  self.stateDuringRecovery _ ready;
  };

-- Analysis proc for WorkerCompleting: active or ready => aborted (outcome
-- abort) or committed (any other logged outcome).
AnalyzeWorkerCompleting: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
  outcome: AlpineInternal.WorkerOutcome;
  { -- get outcome from log
  rec: Worker.CompletingLogRep;
  status: AlpineLog.ReadProcStatus;
  [status: status] _ AlpineLog.ReadForRecovery[thisRecord: record,
    to: [base: @rec, length: Worker.CompletingLogRep.SIZE]];
  IF status # normal THEN ERROR InternalProgrammingError;
  outcome _ rec.outcome;
  };
  {
  self: Handle = TransactionMap.GetHandle[trans];
  IF self = nullHandle THEN RETURN;
  IF self.stateDuringRecovery NOT IN [active .. ready] THEN ERROR InternalProgrammingError;
  self.stateDuringRecovery _ IF outcome = abort THEN aborted ELSE committed;
  };
  };

-- Analysis proc for WorkerComplete: records the final outcome, releases locks
-- and unregisters the worker.
AnalyzeWorkerComplete: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
  self: Handle = TransactionMap.GetHandle[trans];
  IF self = nullHandle THEN RETURN;
  SELECT self.stateDuringRecovery FROM
    active, ready => ERROR;
    committed => self.outcome _ commit;
    aborted => self.outcome _ abort
    ENDCASE => ERROR;
  LockControl.ReleaseLocks[self];
  self.locks _ NIL;
  self.state _ complete;
  TransactionMap.Unregister[self];
  };

-- WorkerControl.CallAfterAnalysisPass
-- Sets the log watchdog bounds from the log size, then logs an abort for every
-- worker still active after analysis.
CallAfterAnalysisPass: PUBLIC PROC [] = {
  AbortActiveWorker: PROC [self: Handle] RETURNS [stop: BOOL] = {
    SELECT self.stateDuringRecovery FROM
      active => {
        LogWorkerCompleting[self: self, outcome: abort, force: FALSE];
        self.stateDuringRecovery _ aborted;
        };
      ready, committed, aborted => NULL;
      ENDCASE => ERROR;
    RETURN [stop: FALSE];
    };
  logWordsUsableByClient _ LogControl.WordsUsableByClient[];
  maxLogWordsForWorker _ MIN[
    maxLogWordsForFastRestart, logWordsUsableByClient];
  TransactionMap.LockedEnumerate[AbortActiveWorker];
  };

-- Recovery proc for WorkerBegin: unknown => active.
RecoverWorkerBegin: PROC [
    record: RecordID, type: RecordType, trans: Handle, outcome: AlpineLog.TransState] = {
  IF trans.state # unknown THEN ERROR;
  trans.state _ active;
  };

-- Recovery proc for WorkerReady: active => ready.
RecoverWorkerReady: PROC [
    record: RecordID, type: RecordType, trans: Handle, outcome: AlpineLog.TransState] = {
  IF trans.state # active THEN ERROR;
  trans.state _ ready;
  };

-- Recovery proc for WorkerCompleting: redoes commit, abort, or readying of the
-- worker according to the transaction outcome determined by recovery.
RecoverWorkerCompleting: PROC [
    record: RecordID, type: RecordType, trans: Handle, outcome: AlpineLog.TransState] = {
  trans.state _ completing;
  SELECT outcome FROM
    committed => {
      trans.state _ ready;
      IF ReadyWorker[trans] = notReady THEN ERROR;
      trans.state _ completing;
      trans.outcome _ commit;
      [] _ CommitWorker[trans];
      };
    aborted => {
      trans.state _ completing;
      trans.outcome _ abort;
      AbortWorker[trans];
      };
    ready => {
      trans.state _ ready;
      IF ReadyWorker[trans] # ready THEN ERROR;
      };
    ENDCASE => ERROR;
  };

-- WorkerControl.CallAfterUpdatePass
-- Polls once per second until every worker is either ready or fpmComplete.
CallAfterUpdatePass: PUBLIC PROC [] = {
  noActiveWorkers: BOOL;
  CheckForActiveWorkers: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
    SELECT self.state FROM
      ready, fpmComplete => NULL;
      ENDCASE => noActiveWorkers _ FALSE;
    RETURN [stop: FALSE];
    };
  DO
    noActiveWorkers _ TRUE;
    TransactionMap.LockedEnumerate[CheckForActiveWorkers];
    IF noActiveWorkers THEN RETURN ELSE Process.Pause[Process.SecondsToTicks[1]];
    ENDLOOP;
  };

-- Checkpoint proc. Workers in fpmComplete are marked, file pages are forced out,
-- then those workers get a WorkerComplete record and are unregistered. Returns
-- the oldest beginRecord of the remaining (non-unknown) workers for both results,
-- and notifies the log watchdog if that record ties up more than
-- maxLogWordsForWorker words of log.
WorkerCheckpointProc: PROC [] RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
  fpmCompleteWorkerSeen: BOOL _ FALSE;
  ExamineWorkerBeforeForceOut: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
    IF self.state = fpmComplete THEN {
      self.state _ fpmCompleteBeingForcedOut;
      fpmCompleteWorkerSeen _ TRUE;
      };
    RETURN [stop: FALSE];
    };
  oldestBeginRecord: AlpineLog.RecordID _ AlpineLog.lastRecordID;
  ExamineWorkerAfterForceOut: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
    IF self.state = fpmCompleteBeingForcedOut THEN {
      LogWorkerComplete[self];
      TransactionMap.Unregister[self];
      self.state _ complete;
      }
    ELSE IF self.state # unknown THEN
      oldestBeginRecord _ LogInline.Min[self.beginRecord, oldestBeginRecord];
    RETURN [stop: FALSE];
    };
  TransactionMap.UnlockedEnumerate[ExamineWorkerBeforeForceOut];
  IF fpmCompleteWorkerSeen THEN FilePageMgr.ForceOutEverything[];
  TransactionMap.UnlockedEnumerate[ExamineWorkerAfterForceOut];
  IF oldestBeginRecord # AlpineLog.lastRecordID THEN
    IF LogInline.WordsFromSubtract[LogControl.RecordIDOfNextWrite[], oldestBeginRecord] >
        maxLogWordsForWorker THEN WorkerInternal.NotifyLogWatchdog[];
  RETURN [oldestBeginRecord, oldestBeginRecord];
  };

-- Log watchdog loop. Examines the oldest active worker (or the one it last
-- chose not to abort), aborts it unilaterally if TestForAbort says so, and
-- otherwise waits for WorkerInternal.NotifyLogWatchdog.
WorkerLogWatchdogProcess: PUBLIC PROC [] = {
  transID: TransID _ nullTransID;
  w: Handle _ NIL;
  wait: BOOL _ FALSE;
  DO
    IF transID # nullTransID THEN w _ TransactionMap.GetHandle[transID];
    IF w = NIL THEN w _ GetOldestActiveWorker[];
    IF w = NIL THEN { transID _ nullTransID; wait _ TRUE }
    ELSE SELECT TestForAbort[w] FROM
      notActive => transID _ nullTransID;
      abort => {
        logProc: PROC [SkiPatrolLog.TransactionAbortInfo];
        IF (logProc _ SkiPatrolLog.notice.abortTransaction) # NIL THEN
          logProc[[
            transID: w.transID,
            where: "WorkerImpl.WorkerLogWatchdogProcess",
            why: watchDog,
            message: "had old uncommitted updates"
            ]];
        AbortUnilaterally[w, oldUncommittedUpdates];
        transID _ nullTransID;
        };
      dontAbort => { transID _ w.transID; wait _ TRUE };
      ENDCASE => ERROR;
    w _ NIL; -- so that w can be garbage-collected
    IF wait THEN { WorkerInternal.WaitForNotify[]; wait _ FALSE };
    ENDLOOP;
  };

-- Decides whether the log watchdog should abort self.
-- notActive if self.state # active; dontAbort if its log use is within
-- maxLogWordsForWorker; abort if it exceeds logWordsUsableByClient; otherwise
-- dontAbort only if it is both recently active and expensive.
TestForAbort: PUBLIC ENTRY SAFE PROC [self: Handle]
    RETURNS [SkiPatrolHooks.ShouldAbort] = TRUSTED {
  logWordsForWorker: INT =
    LogInline.WordsFromSubtract[LogControl.RecordIDOfNextWrite[], self.beginRecord];
  IF self.state # active THEN RETURN [notActive];
  IF logWordsForWorker <= maxLogWordsForWorker THEN RETURN [dontAbort];
  IF logWordsForWorker > logWordsUsableByClient THEN RETURN [abort];
  IF BasicTime.Period[from: self.timeOfLastStartWork, to: BasicTime.Now[]]
      < longTimeSinceLastStartWork --active--
    AND self.estimatedUpdateCost * densityFactor > logWordsForWorker --expensive--
    THEN RETURN [dontAbort];
  RETURN [abort];
  };

-- "Inactive" = no StartWork for more than longTimeSinceLastStartWork seconds.
InactiveWorker: PUBLIC ENTRY SAFE PROC [self: Handle] RETURNS [BOOLEAN] ~ CHECKED {
  RETURN [BasicTime.Period[from: self.timeOfLastStartWork, to: BasicTime.Now[]] >
    longTimeSinceLastStartWork];
  };

-- Returns the active worker whose beginRecord is oldest, or NIL if none.
GetOldestActiveWorker: PROC [] RETURNS [Handle] = {
  w: Handle _ NIL;
  ExamineWorker: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
    IF self.state = active AND
        (w = NIL OR LogInline.Compare[w.beginRecord, self.beginRecord] # less) THEN
      w _ self;
    RETURN [stop: FALSE];
    };
  TransactionMap.UnlockedEnumerate[ExamineWorker];
  RETURN [w];
  };


-- Procedures exported to TransactionMap, and some related procedures.

-- Admits a unit of work of the given difficulty if self allows it and is
-- active or preparing; waits while self is unknown or already has
-- Worker.maxStarts outstanding starts.
StartWork: PUBLIC ENTRY PROC [self: Handle, difficulty: AlpineInternal.WorkLevel]
    RETURNS [canWork: BOOL] = {
  IF difficulty > self.allowableDifficulty THEN RETURN [FALSE];
  WHILE self.state = unknown DO WAIT shortWaitOver ENDLOOP;
  IF self.state NOT IN [active .. preparing] THEN RETURN [FALSE];
  WHILE self.nStarts = Worker.maxStarts DO
    WAIT shortWaitOver;
    IF difficulty > self.allowableDifficulty THEN RETURN [FALSE];
    ENDLOOP;
  self.nStarts _ self.nStarts + 1;
  self.timeOfLastStartWork _ BasicTime.Now[];
  RETURN [TRUE];
  };

-- Ends a unit of work admitted by StartWork and accumulates its estimated cost.
StopWork: PUBLIC ENTRY PROC [self: Handle, estimatedUpdateCost: INT] = {
  IF self.nStarts = 0 THEN ERROR; --impossible if clients are correctly coded--
  self.nStarts _ self.nStarts - 1;
  self.estimatedUpdateCost _ self.estimatedUpdateCost + estimatedUpdateCost;
  };

-- Returns abortThisTrans = TRUE (and allows only zero difficulty) if work is
-- in progress; otherwise sets the allowed difficulty to allowDifficulty.
PreventStartWork: ENTRY PROC [self: Handle, allowDifficulty: Worker.Difficulty]
    RETURNS [abortThisTrans: BOOL] = {
  IF self.nStarts # 0 THEN {
    self.allowableDifficulty _ zero;
    RETURN [abortThisTrans: TRUE];
    }
  ELSE {
    self.allowableDifficulty _ allowDifficulty;
    RETURN [abortThisTrans: FALSE];
    };
  };

-- Sets the allowed difficulty; returns workInProgress = TRUE if work is in progress.
PreventSomeWork: ENTRY PROC [self: Handle, allowDifficulty: Worker.Difficulty]
    RETURNS [workInProgress: BOOL] = {
  self.allowableDifficulty _ allowDifficulty;
  RETURN [workInProgress: self.nStarts # 0];
  };

GetTimeOfLastStartWork: PUBLIC ENTRY PROC [self: Handle] RETURNS [BasicTime.GMT] = {
  RETURN [self.timeOfLastStartWork];
  };

GetEstimatedUpdateCost: PUBLIC ENTRY PROC [self: Handle] RETURNS [INT] = {
  RETURN [self.estimatedUpdateCost];
  };

-- Not ENTRY because the transID field is immutable.
GetTransID: PUBLIC PROC [self: Handle] RETURNS [TransID] = {
  RETURN [self.transID];
  };

-- Not ENTRY because self.fileInstanceList "belongs" to the file manager.
GetFileInstanceList: PUBLIC PROC [self: Handle]
    RETURNS [fileInstanceList: FileInstanceHandle] = {
  RETURN [self.fileInstanceList];
  };

-- Not ENTRY because self.fileInstanceList "belongs" to the file manager.
SetFileInstanceList: PUBLIC PROC [self: Handle, fileInstanceList: FileInstanceHandle] = {
  self.fileInstanceList _ fileInstanceList;
  };

-- Not ENTRY because self.locks "belongs" to the lock manager.
GetLockHeader: PUBLIC PROC [self: Handle]
    RETURNS [lockHeader: AlpineInternal.LockTransHeaderHandle] = {
  RETURN[self.locks];
  };

-- Adds (enable) or removes (NOT enable) the conversation's ID in
-- self.enabledWheelList. Requests on the local conversation are ignored.
EnableAlpineWheel: PUBLIC ENTRY PROC [self: Handle, conversation: Conversation, enable: BOOL] = {
  l: LIST OF RPC.ConversationID;
  c: RPC.ConversationID = RPC.GetConversationID[conversation];
  IF c = AlpineIdentity.myLocalConversationID THEN RETURN;
  FOR l _ self.enabledWheelList, l.rest UNTIL l = NIL DO
    IF l.first = c THEN EXIT;
    ENDLOOP;
  -- l is now the cell holding c, or NIL if c is not in the list.
  IF enable AND l = NIL THEN {
    new: LIST OF RPC.ConversationID = zone.CONS[
      first: c, rest: self.enabledWheelList];
    self.enabledWheelList _ new;
    }
  ELSE IF (NOT enable) AND l # NIL THEN {
    IF self.enabledWheelList = l THEN self.enabledWheelList _ l.rest
    ELSE FOR p: LIST OF RPC.ConversationID _ self.enabledWheelList, p.rest UNTIL p = NIL DO
      IF p.rest = l THEN { p.rest _ l.rest; EXIT };
      ENDLOOP;
    };
  };

-- TRUE for the local conversation or any conversation enabled by EnableAlpineWheel.
IsAlpineWheel: PUBLIC ENTRY PROC [self: Handle, conversation: Conversation]
    RETURNS [enabled: BOOL] = {
  c: RPC.ConversationID = RPC.GetConversationID[conversation];
  IF c = AlpineIdentity.myLocalConversationID THEN RETURN [enabled: TRUE];
  FOR l: LIST OF RPC.ConversationID _ self.enabledWheelList, l.rest UNTIL l = NIL DO
    IF l.first = c THEN RETURN [enabled: TRUE];
    ENDLOOP;
  RETURN [enabled: FALSE];
  };

-- Maps self.stateDuringRecovery to TransactionMap.TransState; active is an error.
StateDuringRecovery: PUBLIC ENTRY PROC [self: Handle] RETURNS [TransactionMap.TransState] = {
  SELECT self.stateDuringRecovery FROM
    active => ERROR;
    ready => RETURN [ready];
    committed => RETURN [committed];
    aborted => RETURN [aborted];
    ENDCASE => ERROR;
  };

-- FALSE while active or preparing; while completing, TRUE iff outcome = commit.
IsCommitted: PUBLIC ENTRY PROC [self: Handle] RETURNS [BOOL] = {
  SELECT self.state FROM
    active, preparing => RETURN [FALSE];
    completing => RETURN [self.outcome = commit];
    ENDCASE => ERROR; -- locks are supposed to prevent this
  };

-- completing => fpmComplete; any other state is an error.
AssertUpdatesAreComplete: PUBLIC ENTRY PROC [self: Handle] = {
  IF self.state # completing THEN ERROR;
  self.state _ fpmComplete;
  };

-- For SkiPatrolHooks. Returns nullTransID for a NIL handle.
TransIDFromTransHandle: PUBLIC SAFE PROC [h: AlpineInternal.TransHandle]
    RETURNS [AlpineEnvironment.TransID] ~ CHECKED {
  wh: Worker.Handle;
  IF h = NIL THEN RETURN [AlpineEnvironment.nullTransID]
  ELSE {
    TRUSTED {wh _ LOOPHOLE[h]};
    RETURN [wh.transID]
    }
  };


-- Initialization

LogControl.RegisterAnalysisProc[workerBegin, AnalyzeWorkerBegin];
LogControl.RegisterAnalysisProc[workerReady, AnalyzeWorkerReady];
LogControl.RegisterAnalysisProc[workerCompleting, AnalyzeWorkerCompleting];
LogControl.RegisterAnalysisProc[workerComplete, AnalyzeWorkerComplete];
AlpineLog.RegisterRecoveryProc[workerBegin, RecoverWorkerBegin];
AlpineLog.RegisterRecoveryProc[workerReady, RecoverWorkerReady];
AlpineLog.RegisterRecoveryProc[workerCompleting, RecoverWorkerCompleting];
LogControl.RegisterCheckpointProc[WorkerCheckpointProc];
Process.SetTimeout[@shortWaitOver, Process.MsecToTicks[Worker.shortWaitTime]];
Process.EnableAborts[@shortWaitOver];

END.--WorkerImpl CHANGE LOG Changed by MBrown on February 9, 1983 11:54 am Changed by MBrown on March 2, 1983 2:53 pm Changed by MBrown on April 3, 1983 10:32 am Changed by MBrown on June 6, 1983 11:28 am Changed by MBrown on June 25, 1983 9:43 pm #.WorkerImpl.mesa Copyright c 1984, 1985 by Xerox Corporation. All rights reserved. Implements worker in two-phase commit: create, prepare, finish (commit or abort.) Hauser, March 28, 1985 9:50:00 am PST Carl Hauser, March 28, 1986 2:36:34 pm PST Last edited by Kolling on June 6, 1983 12:05 pm MBrown on January 30, 1984 12:12:43 pm PST Kupfer on March 7, 1985 2:26:53 pm PST NOTES: It is poor to land in debugger due to RPC.CallFailed[protocolError]. The implementation of AbortUnilaterally should be improved by making a remote call to the coordinator, if the coordinator is remote. AlpineInternal.TransObject AlpineTransMgr.Refused. When TRUE, worker always forces log as if coordinator were remote. Client waits here under various unlikely circumstances involving concurrent calls to CreateWorker, PrepareWorker, and FinishWorker, concurrent calls to worker actions that use exclusive mode StartWork, and excessive concurrency among other worker actions. The wait times out after Worker.shortWaitTime milliseconds, allowing the situation to be reevaluated. Nobody notifies shortWaitOver. 
Parameters to worker log watchdog: if self.estimatedUpdateCost*densityFactor is larger than the amount of log tied up by this transaction, then the transaction is considered too expensive to abort (unless it is inactive or its log use exceeds logWordsUsableByClient). If the time since the last StartWork is greater than this, then the transaction is considered inactive. Absolute upper bound on the amount of log tied up by a transaction. Fast restart means two minutes: two passes over 3000 pages at 50 pages/sec. A worker may comfortably keep this much log active. Beyond this bound, the worker may be aborted (but need not be). These two values are initialized during recovery (based on the log length) and not changed thereafter (except perhaps from the debugger). Worker-related procs ! Unknown {coordinator, transID}, OperationFailed {busy}; AlpineTransaction.CreateWorker. Called by a client. The worker object monitor is not held during the call to AlpineCoordinator.RegisterWorker. This was designed to allow AlpineCoordinator.Finish to hold the coordinator object monitor during its calls to workers. (AlpineCoordinator.Finish does not actually work this way.) Drop self on the floor since transID is already registered. We are not allowed to call RegisterWorker; some other process is doing it. Wait for self.state # unknown, then return result. We registered the volatile worker, so we are responsible for calling RegisterWorker and updating volatile and permanent data structures to reflect the result. We cannot return or error until self.state # unknown, lest another process hang in WaitUntilNotUnknown. Note: does not establish the locks field of the Worker.Object. self is registered in TransactionMap, in unknown state (no WorkerBegin written). Must change state from unknown since another process in WaitUntilNotUnknown may be waiting for this event. ! Refused {wrongCoordinator} AlpineTransMgr.WorkerPrepare. Called by the coordinator of "trans". This procedure is prepared for duplicate calls. 
Extra callers wait (in BeginPrepare) on the global condition variable shortWaitOver. We have caused self to enter the preparing state. Perform specialized version of CreateWorker. The newTrans is already known to us. This probably represents an error in the coordinator. Some work is still in progress. Wait for it to go away. Call any external caches now; they are allowed to do read/write pages. Any call to StartWork for this transaction will now fail. There is no work in progress, including waiting lock requests. We therefore have exclusive access without entering the monitor. We have caused self to enter either the ready state or the completing state. result is # unknown, active. result is = preparing iff this call caused self to enter the preparing state. Called during normal operation and during recovery. The caller has exclusive access to the transaction self and is not holding the monitor. ! Refused {wrongCoordinator, notReady}; AlpineTransMgr.WorkerFinish Called by the coordinator of "trans". TransactionMap.AbortUnilaterally Note some duplicate logic with LocalWorkerFinish. We cannot determine self's outcome, and are not responsible for making it complete, but we are responsible for ensuring that it leaves the preparing state. Otherwise it might stay there forever, waiting in the lock manager. This loop is really necessary, since neither process involved is synchronized by self's or Lock's monitors. We have caused self to enter the completing state, self.outcome = abort. Hence we are responsible for making it complete. Some work is still in progress. Wait for it to go away. Temporary kludge, effective since remote coordinators aren't used now. We have caused self to enter the completing state, self.outcome = requiredOutcome. Hence we are responsible for making it complete. Some work is still in progress. Wait for it to go away. result is # unknown, ready. result is = active only if requiredOutcome = commit and transaction was unknown or active on entry. 
(Caller is in error.) result is = preparing only if option = returnIfPreparing and transaction was preparing on entry. (Caller is trying to perform unilateral abort.) result is = completing iff this call caused the transaction to enter the completing state. (Caller is now responsible for making transaction complete.) self.state = completing, self.outcome = commit Call any external caches now; they need to know about the commit. Called during normal operation and during recovery. self.state = completing, self.outcome = abort Call any external caches now; they need to know about the abort. Called during normal operation and during recovery. Log writing and analysis; checkpoint proc WorkerControl.CallAfterAnalysisPass WorkerControl.CallAfterUpdatePass "Inactive" = haven't done a "StartWork" for some number of seconds. Procedures exported to TransactionMap, and some related procedures. Return abortThisTrans ~ TRUE if work is in progress. Return workInProgress ~ TRUE if work is in progress. Not ENTRY because the transID field is immutable. Not ENTRY because self.fileInstanceList "belongs" to the file manager. Not ENTRY because self.fileInstanceList "belongs" to the file manager. Not ENTRY because self.locks "belongs" to the lock manager. For SkiPatrolHooks. Initialization Change CommitWorker to set continueWorker _ nullHandle. (This was happening, unsynchronized, from the checkpoint process!) Change sanity check in StartWork. Implemented worker log watchdog. Changed AbortUnilaterally to call AlpineTransaction.Finish[..., abort, ...], making coordinator go away if it is local. Updated catch phrases for AccessControl errors. In WorkerImpl.CheckForReadyWorker, was examining worker state without entering monitor. Changed logic as follows: now WorkerImpl.CallAfterUpdatePass waits until all workers are either ready or fpmComplete. Fixed two related bugs. If AbortUnilaterally was called with self.state = preparing, it simply waited for self.state # preparing. 
This is no good because self may be waiting in the lock manager, deadlocked, and the caller of AbortUnilaterally may be the lock watchdog process! This was observed several times in server operation. The second bug was more subtle and probably has not yet arisen in practice. When a process calls LockControl.AbortWaitingRequests, what guarantee does it have that no new requests will come along and wait later? Answer: no guarantee. This synchronization must be done at the level of the worker object monitor, and the lock manager does not enter the worker monitor. We wrapped all calls to LockControl.AbortWaitingRequests in loops that are designed to ensure that when the loop exits, no more Lock.Set or LockControl.UpgradeLocks calls will be executed for this transaction. This seems messy but it is not clear how to do better. Edited June 1984, by Kupfer Add InactiveWorker function. Edited on July 12, 1984 3:10:32 pm PDT, by Kupfer changes to: StartWork, StopWork: Reflect change to SkiPatrolLog. Edited on July 25, 1984 11:05:24 am PDT, by Kupfer (1) Get rid of "work" probes and add probe for locking errors and for worker abort. (2) Make CreateWorker be cleaner about what ERRORs it returns. (3) Add TransIDFromTransHandle. changes to: ReadyWorker, StartWork, StopWork, WorkerImpl, WorkerLogWatchdogProcess, AbortWorker Edited on August 6, 1984 3:40:12 pm PDT, by Kupfer (1) Make TransIDFromTransHandle handle a NIL argument gracefully. (2) fix a comment about what signals WorkerPrepare generates. (3) Remove the possible race condition in SkiPatrolLog probes by assigning the PROC to a temporary variable. changes to: WorkerImpl, AbortWorker, WorkerLogWatchdogProcess, WorkerPrepare, TransIDFromTransHandle Edited on February 14, 1985 5:29:18 pm PST, by Kupfer Add the "specifics" field to the SkiPatrolLog call in ReadyWorker. 
Carl Hauser, October 4, 1985 1:40:26 pm PDT Change "Log" to "AlpineLog" Κš˜Icodešœ™Jšœ Οmœ7™BšœQ™QK™%K™*—K˜šœ™Kšœ ™ Kšœ*™*Kšœ&™&—K˜Kšœ™K˜KšœD™DKšœ„™„˜K˜šΟk ˜ K˜K˜K˜K˜ K˜K˜K˜K˜K˜ Kšœ žœ ˜K˜K˜K˜ K˜ K˜K˜ K˜ K˜ K˜ K˜K˜Kšžœ˜K˜ Kšœžœ˜#Kšœ žœ2˜DK˜K˜K˜K˜——unitš œ Οc œžœžœžœ ˜?šž˜K˜K˜K˜ K˜K˜K˜ K˜ K˜K˜K˜ K˜ K˜K˜ K˜ K˜ K˜ K˜K˜Kšžœ˜K˜ K˜ K˜K˜—šž˜K˜K˜K˜K˜K˜K˜K˜K˜—Kšœž˜Kšœžœ"˜4Kšœ žœ˜*K˜5Kšœ žœ˜.Kšœžœ%˜=Kšœ žœ˜$Kšœ žœ˜(Kšœžœžœ˜'K˜Kšœžœ˜K˜'K˜šœ žœžœ˜)Kšœ™K˜—šœ žœžœ!žœ˜;Kšœ™K˜—Kšœžœ˜)K˜šœ%žœžœ˜2KšœB™BK˜—šœž œ˜Kšœ†™†—Iheadšœ"™"šœžœ˜Kšœι™ι—šœžœ Ÿ˜EKšœc™c—šœžœ˜Kšœ9™9—šœžœžœ!˜KKšœK™K—šœžœ˜Kšœώ™ώ—M™šΟn œžœžœ˜K˜IKšœ9™9Kšœ™Kšœ™Kšœ™K˜ Kšœžœžœ˜ Kšœžœ ˜K˜:K˜9K˜K˜Kšžœ0žœž˜OKšžœ@˜Dšžœžœ1žœ˜NKšœ»™»K˜)šžœžœ+ž˜FKšžœ$˜)—K˜—šžœ˜Kšœ˜™˜Kšœg™gK˜™>šžœžœ˜"K˜K˜%K˜šœ9ž˜;Kšžœ;˜>——K˜—š œžœžœ.˜IKšœP™PKšžœžœžœ˜#šžœžœ˜K˜K˜Kšžœžœžœ0˜HK˜—šžœ˜Kšœj™jK˜K˜K˜ K˜—K˜—š œžœžœ˜.Kšžœžœ˜#Kšžœžœžœžœ˜9Kšžœ˜K˜—š  œžœžœ˜K˜+K˜$Kšžœ!˜(Kšœ™Kšœ™Kšœ%™%Kšœu™uK˜1K˜/Kšžœžœžœ ˜,KšžœžœRžœž œ˜„šžœž˜Kšœ žœŸ˜!Kšœ žœ Ÿ˜*Kšžœžœ Ÿ˜/—Kšœ1™1šžœžœ˜ Kšœ,™,K˜=šžœ@žœžœ˜MKšœ[™[—K˜—˜šžœEžœ˜MKšœ8™8šž˜Kšžœžœžœ˜+Kšžœ˜—K˜K˜—KšœF™F˜1Kšœ3žœ˜?—KšžœDžœžœ˜VKšœ9™9Kšœ€™€K˜&šž˜˜ šž˜K˜'Kšžœžœžœ˜+Kšžœ˜—K˜K˜——K˜—K˜$KšœL™Lšžœž˜K˜$K˜*Kšœ žœ˜Kšžœ˜—Kšžœ˜K˜—š  œžœžœ˜'Kšžœ˜Kšœ™KšœW™Wšž˜šžœ ž˜Kšœžœ˜)Kšžœžœ˜—Kšžœ˜—Kšžœžœ˜3Kšžœ˜K˜—š  œžœ˜ Kšžœ$˜+Kšœ3™3KšœT™TKšœžœŸ'˜@Kšœ)˜)Kšœ žœ˜šœ+˜+šœ˜Kšœ>˜>Kšœ˜Kšžœ ˜Kšœ˜—Kšœžœ˜"—˜šœ˜Kšœ>˜>Kšœ˜Kšžœ ˜Kšœ˜—Kšœžœ˜"—Kšžœžœ žœžœ˜3šž˜šœ˜Kšœ žœ!˜.šžœ0žœž˜:šœ ˜ Kšœ˜K˜K˜Kšœ ˜ K˜K˜2K˜——Kšžœ ˜K˜—Kšœ žœ ˜—Kšœ˜—š  œžœžœ˜%K˜5šžœž˜˜ Kšœ7žœ˜>K˜K˜K˜—˜Kšœ:žœ˜AK˜K˜K˜—˜ K˜K˜)šžœ5ž˜?Kšœ žœ˜#˜KšœR™RKšœ0™0šžœžœ˜!šžœ-žœ˜5Kšœ8™8šž˜K˜'Kšžœžœžœ˜+Kšžœ˜—K˜—K˜K˜—Kšžœ˜K˜—Kšžœžœ˜—K˜—š  œžœžœn˜…Kšžœ˜Kšœ™Kšœz™zKšœ™Kšœ˜™˜šž˜šžœ ž˜Kšœžœ˜*šœ žœžœžœ˜@Kšžœžœ ˜—Kšžœžœ˜—Kšžœ˜—šžœžœžœžœ˜QKšœ!žœ˜(K˜K˜K˜—šžœžœžœžœ˜>K˜šœ ˜ K˜Kšœ ˜ Kšœ˜K˜K˜ K˜——K˜K˜Kšœ žœ˜šžœ"žœ˜*K˜,K˜!K˜—K˜——šœ)™)š œžœ˜'K˜˜Kšœžœžœ2˜C—˜CKšœžœ žœ˜-—K˜—š œžœžœ˜4˜'K˜—K˜—š œžœ˜(Kšœ.žœ˜7K˜2˜,Kšœ-žœ ˜;—K˜—š œžœ˜*K˜@K˜—š œžœ˜K˜7˜)KšœŸ:˜K˜#K˜—Kšœžœ˜"Kšžœžœ˜Kšžœžœ˜—K˜—K˜:šœžœ˜K˜3—K˜2K˜—š œžœ˜K˜UKšžœžœžœ˜$K˜K˜—š 
œžœ˜K˜UKšžœžœžœ˜#K˜K˜—š œžœ˜K˜UK˜šžœ ž˜˜K˜Kšžœžœžœ˜,K˜2K˜K˜—˜ K˜1K˜K˜—˜ K˜Kšžœžœžœ˜)K˜—Kšžœžœ˜—K˜—š œžœžœ˜'Kšœ!™!Kšœžœ˜š  œžœžœžœžœ˜Išžœ ž˜Kšœžœ˜Kšžœžœ˜#—Kšžœžœ˜K˜—šž˜Kšœžœ˜K˜6Kšžœžœžœžœ*˜MKšžœ˜—K˜—š œžœ˜Kšžœ0˜7Kšœžœžœ˜$š  œžœžœžœžœ˜Ošžœžœ˜"K˜'Kšœžœ˜K˜—Kšžœžœ˜K˜—K˜?š  œžœžœžœžœ˜Nšžœ(žœ˜0K˜K˜ K˜K˜—šžœžœž˜!K˜G—Kšžœžœ˜K˜—K˜>Kšžœžœ"˜?K˜=šžœ,ž˜2šžœS˜UKšœžœ$˜=——Kšžœ(˜.K˜—š œžœžœ˜,K˜Kšœ žœ˜Kšœžœžœ˜šž˜Kšžœžœ'˜DKšžœžœžœ˜,Kšžœžœžœ!žœ˜6šžœžœž˜ K˜#˜ Kšœ žœ%˜2šžœ4žœž˜>šœ ˜ K˜Kšœ-˜-Kšœ˜K˜&K˜——K˜,K˜K˜—Kšœ+žœ˜2Kšžœžœ˜—KšœžœŸ%˜.Kšžœžœ*žœ˜>Kšžœ˜—K˜—š  œžœžœžœžœžœ žœ˜dšœžœ˜K˜P—Kšžœžœžœ ˜/Kšžœ+žœžœ ˜EKšžœ,žœžœ ˜BšžœH˜JKšœŸ œž˜)Kšœ=Ÿ œž˜OKšžœ ˜—Kšžœ ˜K˜—Lš œžœžœžœžœžœžœžœ˜SšœC™CKšžœf˜lK˜—š œžœžœ ˜3Kšœ žœ˜š   œžœžœžœžœ˜Ašžœžœžœž˜&Kšœ;ž˜?K˜ —Kšžœžœ˜K˜—K˜0Kšžœ˜ K˜——šœC™Cš   œžœžœžœ6žœ žœ˜mKšžœ'žœžœžœ˜=Kšžœžœžœžœ˜9Kš žœ žœžœžœžœžœ˜?šžœ!ž˜(Kšžœ˜Kšžœ'žœžœžœ˜=Kšžœ˜—K˜ K˜+Kšžœžœ˜K˜—š  œžœžœžœ%žœ˜HKšžœžœžœŸ-˜MK˜ K˜JK˜—š œžœžœ3˜OKšžœžœ˜"Kšœ4™4šžœžœ˜K˜ Kšžœžœ˜K˜—šžœ˜K˜+Kšžœžœ˜K˜—K˜—š œžœžœ3˜NKšžœžœ˜"K˜+Kšœ4™4Kšžœ$˜*K˜—K˜š œžœžœžœ˜8Kšžœ žœ˜Kšžœ˜"K˜—š  œžœžœžœžœžœ˜JKšžœ˜"K˜—š  œžœžœžœ˜Kšœ;™;Kšžœ ˜K˜—š œžœžœžœ˜3Kšœ$žœ˜-Kšœžœžœžœ˜Kšœžœžœ!˜Kšžœžœžœ˜&K˜K˜—š  œžœžœžœ!žœžœ˜xJšœ™Jšœ˜šžœžœž˜Jšžœ ˜&—šžœ˜Jšžœžœ˜Jšžœ ˜J˜—J˜K˜K˜——Kšœ™˜K˜AK˜AK˜KK˜GK˜K˜@K˜@K˜JK˜K˜8K˜K˜NK˜%K˜KšžœŸ ˜K˜—Kšžœž˜ K˜K˜.Kšœ{™{K˜K˜*Kšœ!™!K˜K˜+Kšœ™™™Kšœ/™/K˜K˜*KšœΞ™ΞK˜K˜*KšœΜ™ΜKšœό™όK™šœ™Kšœ™K™—šœ1™1Kšœ Οrœ!™@K™—šœ2™2KšœS™SKšœ>™>Kšœ™Kšœ ‘S™_—™2KšœC™CKšœ=™=Kšœl™lKšœ ‘X™d—™5Kšœ6‘ œ™B—™+K™—K™—…—_„ŸL