DIRECTORY
  Basics, ConstArith, YggAccessControl, YggEnvironment, YggIdentity, YggImport, YggInternal,
  YggTransaction, YggTransMgr, BasicTime,
  YggClientMap USING [GetName],
  YggConversationTable, ConvertUnsafe, YggFileControl, YggFilePageMgr, YggLock, YggLockControl,
  YggLog, YggLogControl, YggLogInline, YggDummyProcess, Rope, YggDummyRPC,
  YggMonitoringHooks USING [ShouldAbort],
  YggMonitoringLog USING [LockConflictInfo, notice, TransactionAbortInfo],
  YggTransactionMap, YggWorker, YggWorkerControl, YggWorkerInternal, VM;

-- YggWorkerImpl implements the worker side of two-phase commit:
-- create, prepare, finish (commit or abort), plus log analysis/recovery
-- procs, the checkpoint proc and the worker log watchdog.
YggWorkerImpl: CEDAR MONITOR LOCKS self USING self: Handle
  IMPORTS ConstArith, YggAccessControl, YggIdentity, YggImport, YggTransaction, BasicTime,
    YggClientMap, YggConversationTable, ConvertUnsafe, YggFileControl, YggFilePageMgr, YggLock,
    YggLockControl, YggLog, YggLogControl, YggLogInline, YggDummyProcess, Rope, YggDummyRPC,
    YggMonitoringLog, YggTransactionMap, YggWorkerInternal, VM
  EXPORTS YggTransaction, YggTransMgr, YggInternal, YggMonitoringHooks, YggTransactionMap,
    YggWorkerControl, YggWorkerInternal =
BEGIN

  Conversation: TYPE = YggEnvironment.Conversation;
  TransID: TYPE = YggEnvironment.TransID;
  nullTransID: TransID = YggEnvironment.nullTransID;
  FileStore: TYPE = YggEnvironment.FileStore;
  RecordID: TYPE = YggLog.RecordID;
  RecordType: TYPE = YggLog.RecordType;
  InternalProgrammingError: ERROR = CODE;

  Handle: TYPE = YggWorker.Handle;
  nullHandle: Handle = YggWorker.nullHandle;

  TransObject: PUBLIC TYPE = YggWorker.Object;
    -- YggInternal.TransObject

  Refused: PUBLIC ERROR [why: YggTransMgr.Refusal] = CODE;
    -- YggTransMgr.Refused

  disableLocalCoordinatorOptimization: BOOL _ FALSE;
    -- When TRUE, worker always forces log as if coordinator were remote.

  shortWaitOver: CONDITION;
    -- Waited on by every WAIT in this module.  Its timeout is set in the
    -- initialization code at the end of the module to YggWorker.shortWaitTime;
    -- no NOTIFY of this condition appears in this module, so waiters rely on
    -- the timeout and then reevaluate the situation.

  -- Parameters to worker log watchdog:

  densityFactor: INT _ 64;
    -- TestForAbort treats a worker as too expensive to abort while
    -- self.estimatedUpdateCost*densityFactor exceeds the log it ties up
    -- (unless it is inactive or exceeds logWordsUsableByClient).
  longTimeSinceLastStartWork: INT _ 1000; -- seconds (about 15 minutes)
    -- A worker with no StartWork for longer than this is considered inactive.
  logWordsUsableByClient: ConstArith.Const;
    -- Absolute upper bound on the amount of log tied up by a transaction.
  maxLogWordsForFastRestart: ConstArith.Const -- = ConstArith.FromInt[3000*VM.wordsPerPage] -- ;
    -- Fast restart means two minutes: two passes over 3000 pages at 50 pages/sec.
  maxLogWordsForWorker: ConstArith.Const;
    -- A worker may comfortably keep this much log active.  Beyond this bound the
    -- worker may be aborted (but need not be).
    -- logWordsUsableByClient and maxLogWordsForWorker are set in CallAfterAnalysisPass.


  -- Worker-related procs

  CreateWorker: PUBLIC PROC [
    conversation: Conversation, transID: TransID, coordinator: FileStore] = {
    -- YggTransaction.CreateWorker.
    -- Registers a volatile worker for transID, or, if one is already registered,
    -- waits for it to leave the unknown state.
    -- Raises YggTransaction.Unknown.
    self: Handle;
    alreadyRegistered: BOOL _ FALSE;
    registerWorkerResult: YggTransMgr.RegisterWorkerResult;
    alpineTransMgr: REF _ NIL;
    conversationOut: Conversation;
    IF YggTransactionMap.GetTransHandle[transID] # nullHandle THEN alreadyRegistered _ TRUE
    ELSE self _ ConsWorker[transID, YggImport.Register[coordinator]];
    IF alreadyRegistered OR YggTransactionMap.Register[self].alreadyRegistered THEN {
      -- Drop our own self on the floor since transID is already registered:
      -- some other process is responsible for finishing the creation.
      -- Wait for self.state # unknown, then return result.
      self _ YggTransactionMap.GetTransHandle[transID];
      IF self = nullHandle OR WaitUntilNotUnknown[self].workerNotActive THEN
        ERROR YggTransaction.Unknown[transID];
      }
    ELSE {
      -- We registered the volatile worker, so we must move it out of the unknown
      -- state (via EndCreateWorker) before returning or raising an error, lest
      -- another process hang in WaitUntilNotUnknown.
      conversationOut _ YggConversationTable.Fetch[self.coordinator];
      -- NOTE(review): nothing visible here ever assigns alpineTransMgr or
      -- registerWorkerResult (the RegisterWorker RPC appears to be stubbed out),
      -- so as written this test always exits through noCoordinator.  Confirm
      -- whether that is intended before relying on CreateWorker succeeding.
      IF conversationOut = NIL OR alpineTransMgr = NIL THEN GOTO noCoordinator;
      EndCreateWorker[self, IF registerWorkerResult = ok THEN create ELSE abort];
      EXITS
        noCoordinator => {
          EndCreateWorker[self, abort];
          ERROR YggTransaction.Unknown[coordinator] };
      };
    };

  ConsWorker: PROC [trans: TransID, coordinator: YggImport.Handle]
    RETURNS [Handle] = {
    -- Allocates a new worker object.
    -- Note: does not establish the locks field of the YggWorker.Object.
    RETURN [NEW[YggWorker.Object _ [
      transID: trans,
      timeOfLastStartWork: BasicTime.Now[],
      coordinator: coordinator,
      coordinatorIsRemote: disableLocalCoordinatorOptimization OR
        NOT coordinator.Equal[YggIdentity.myAlpineImportHandle]]]];
    };

  EndCreateWorker: ENTRY PROC [self: Handle, outcome: {create, abort} ] = {
    -- Requires self.state = unknown on entry (raises ERROR otherwise).
    -- create: writes the WorkerBegin record and makes the worker active.
    -- abort: marks the worker complete/aborted and unregisters it.
    -- Either way the state stops being unknown, which is what processes in
    -- WaitUntilNotUnknown are waiting for.
    IF self.state # unknown THEN ERROR;
    IF outcome = create THEN {
      LogWorkerBegin[self];
      self.state _ active;
      IF self.locks = NIL THEN self.locks _ YggLockControl.ConsTransHeader[self];
      }
    ELSE {
      self.state _ complete;
      self.outcome _ abort;
      YggTransactionMap.Unregister[self];
      };
    };

  WaitUntilNotUnknown: ENTRY PROC [self: Handle]
    RETURNS [workerNotActive: BOOL] = {
    -- Waits until self leaves the unknown state; TRUE iff it is then not active.
    WHILE self.state = unknown DO WAIT shortWaitOver ENDLOOP;
    RETURN [self.state # active];
    };

  WorkerPrepare: PUBLIC PROC [
    conversation: Conversation, trans: TransID, newTrans: YggEnvironment.TransID]
    RETURNS [YggTransMgr.WorkerState] = {
    -- YggTransMgr.WorkerPrepare.  Phase one of commit for "trans".
    -- Raises Refused[wrongCoordinator] when the caller's name does not match
    -- the worker's coordinator.  Prepared for duplicate calls.
    stateAfterPrepare: YggEnvironment.WorkerState;
    self: Handle = YggTransactionMap.GetTransHandle[trans];
    IF self = nullHandle THEN RETURN [notReady];
    IF NOT Rope.Equal[s1: YggClientMap.GetName[conversation], s2: self.coordinator.Name, case: FALSE] THEN
      ERROR Refused[wrongCoordinator];
    SELECT BeginPrepare[self] FROM
      preparing => NULL; -- normal case
      ready => RETURN [ready]; -- duplicate call
      ENDCASE => RETURN [notReady]; -- duplicate call
    -- We have caused self to enter the preparing state.
    IF newTrans # nullTransID THEN {
      -- Perform specialized version of CreateWorker for the continuation transaction.
      self.continueWorker _ ConsWorker[newTrans, self.coordinator];
      IF YggTransactionMap.Register[self.continueWorker].alreadyRegistered THEN ERROR;
        -- newTrans is already known to us; probably an error in the coordinator.
      };
    {
    IF PreventSomeWork[self: self, allowDifficulty: normal].workInProgress THEN {
      -- Some work is still in progress.  Wait for it to go away.
      DO
        IF ShortWaitForNStarts[self] = 0 THEN EXIT;
        ENDLOOP;
      };
    -- Call any external caches now; they are allowed to do read/write pages.
    YggAccessControl.PhaseOneSpaceAndOwnerChanges[self !
      YggAccessControl.LockFailed, YggAccessControl.Unknown => GOTO abort];
    IF PreventStartWork[self: self, allowDifficulty: zero].abortThisTrans THEN GOTO abort;
    -- allowableDifficulty is now zero and nStarts was zero when checked.
    stateAfterPrepare _ ReadyWorker[self];
    EXITS
      abort => {
        -- Keep aborting waiting lock requests until no work remains started.
        DO
          YggLockControl.AbortWaitingRequests[self];
          IF ShortWaitForNStarts[self] = 0 THEN EXIT;
          ENDLOOP;
        stateAfterPrepare _ notReady;
        };
    };
    EndPrepare[self, stateAfterPrepare];
    -- We have caused self to enter either the ready state or the completing state.
    SELECT stateAfterPrepare FROM
      notReady => NormalAbortWorker[self];
      readOnlyReady => NormalCommitWorker[self];
      ready => NULL;
      ENDCASE;
    RETURN [stateAfterPrepare]
    };

  BeginPrepare: ENTRY PROC [self: Handle]
    RETURNS [YggWorker.State] = {
    -- Waits out the unknown and preparing states, then moves active to preparing.
    -- Result is = preparing iff this call caused the transition.
    DO
      SELECT self.state FROM
        unknown, preparing => WAIT shortWaitOver;
        ENDCASE => EXIT;
      ENDLOOP;
    IF self.state = active THEN self.state _ preparing;
    RETURN [self.state];
    };

  ReadyWorker: PROC [self: Handle]
    RETURNS [YggEnvironment.WorkerState] = {
    -- Called during normal operation and during recovery; not an ENTRY proc.
    -- Runs CommitPhase1 and UpgradeLocks.  Returns notReady if either raises
    -- YggLock.Failed (after reporting to the lockConflict probe, if any) or
    -- YggLock.TransAborting; otherwise readOnlyReady or ready.
    lockRoutine: Rope.ROPE; -- (name of YggLockControl routine called)
    howFailed: YggEnvironment.LockFailure;
    readOnly: BOOL;
    {readOnly _ YggFileControl.CommitPhase1[self !
      YggLock.Failed => {
        lockRoutine _ "YggWorkerImpl.ReadyWorker (calling CommitPhase1)";
        howFailed _ why;
        GOTO lockProblem
        };
      YggLock.TransAborting => GOTO abort];
    YggLockControl.UpgradeLocks[self !
      YggLock.Failed => {
        lockRoutine _ "YggWorkerImpl.ReadyWorker (calling UpgradeLocks)";
        howFailed _ why;
        GOTO lockProblem
        };
      YggLock.TransAborting => GOTO abort];
    RETURN [IF readOnly THEN readOnlyReady ELSE ready];
    EXITS
      lockProblem => {
        -- The probe proc is copied to a local before the NIL test and the call.
        logProc: PROC [YggMonitoringLog.LockConflictInfo];
        IF (logProc _ YggMonitoringLog.notice.lockConflict) # NIL THEN
          logProc[[
            what: howFailed,
            where: lockRoutine,
            transID: self.transID,
            mode: none,
            specifics: unknown[],
            message: "(sigh) attempted locking mode not known"
            ]];
        RETURN [notReady];
        };
      abort => RETURN [notReady];
    }};

  EndPrepare: ENTRY PROC [self: Handle,
    stateAfterPrepare: YggEnvironment.WorkerState] = {
    -- Logs the result of the prepare phase and sets self.state (and, for the
    -- completing cases, self.outcome) accordingly.
    SELECT stateAfterPrepare FROM
      notReady => {
        LogWorkerCompleting[self: self, outcome: abort, force: FALSE];
        self.state _ completing;
        self.outcome _ abort;
        };
      readOnlyReady => {
        LogWorkerCompleting[self: self, outcome: readOnly, force: FALSE];
        self.state _ completing;
        self.outcome _ commit;
        };
      ready => {
        -- The ready record is forced only when the coordinator is remote.
        LogWorkerReady[self: self, force: self.coordinatorIsRemote];
        self.state _ ready;
        };
      ENDCASE => ERROR;
    };

  WorkerFinish: PUBLIC PROC [
    conversation: Conversation, trans: TransID, requiredOutcome: YggTransMgr.RequiredOutcome] = {
    -- YggTransMgr.WorkerFinish.  Phase two of commit for "trans".
    -- Raises Refused[wrongCoordinator]; Refused[notReady] can come from LocalWorkerFinish.
    self: Handle = YggTransactionMap.GetTransHandle[trans];
    IF self = nullHandle THEN RETURN; --we must have already finished.
    IF NOT Rope.Equal[
      s1: YggClientMap.GetName[conversation], s2: self.coordinator.Name, case: FALSE] THEN
      ERROR Refused[wrongCoordinator];
    IF requiredOutcome NOT IN [abort .. commit] THEN ERROR;
    LocalWorkerFinish[self, requiredOutcome, clientRequest];
    };

  AbortUnilaterally: PUBLIC PROC [
    self: Handle, why: YggTransactionMap.AbortReason] = {
    -- YggTransactionMap.AbortUnilaterally.
    -- Note some duplicate logic with LocalWorkerFinish.
    SELECT BeginFinish[self, abort, returnIfPreparing] FROM
      active => ERROR Refused [notReady];
      preparing => {
        -- Another process is preparing self.  Keep aborting its waiting lock
        -- requests until it leaves the preparing state; otherwise it might
        -- stay there, waiting in the lock manager.
        DO
          YggLockControl.AbortWaitingRequests[self];
          IF ShortWaitForState[self] # preparing THEN RETURN;
          ENDLOOP;
        };
      completing => {
        -- We have caused self to enter the completing state, self.outcome = abort.
        -- Hence we are responsible for making it complete.
        IF PreventStartWork[self, zero].abortThisTrans THEN {
          -- Some work is still in progress.  Wait for it to go away.
          DO
            YggLockControl.AbortWaitingRequests[self];
            IF ShortWaitForNStarts[self] = 0 THEN EXIT;
            ENDLOOP;
          };
        NormalAbortWorker[self];
        -- Also finish the transaction (with abort) through YggTransaction.Finish
        -- on the local conversation.
        [] _ YggTransaction.Finish[
          YggIdentity.myLocalConversation, self.transID, abort, FALSE];
        RETURN;
        };
      ENDCASE => RETURN;
    };

  ShortWaitForState: ENTRY PROC [self: Handle]
    RETURNS [YggWorker.State] = INLINE {
    -- One WAIT on shortWaitOver, then the current state.
    WAIT shortWaitOver;
    RETURN [self.state];
    };

  ShortWaitForNStarts: ENTRY PROC [self: Handle]
    RETURNS [nStarts: [0..YggWorker.maxStarts]] = INLINE {
    -- One WAIT on shortWaitOver, then the current count of started work.
    WAIT shortWaitOver;
    RETURN [self.nStarts];
    };

  LocalWorkerFinish: PROC [
    self: Handle, requiredOutcome: YggTransMgr.RequiredOutcome,
    whyAbort: YggTransactionMap.AbortReason] = {
    -- Drives self to completion with requiredOutcome if this call is the one
    -- that moved it into the completing state.  whyAbort is not used here.
    SELECT BeginFinish[self, requiredOutcome, waitIfPreparing] FROM
      active => ERROR Refused [notReady];
      completing => {
        -- We have caused self to enter the completing state,
        -- self.outcome = requiredOutcome.  We must make it complete.
        IF requiredOutcome = abort THEN {
          IF PreventStartWork[self, zero].abortThisTrans THEN {
            -- Some work is still in progress.  Wait for it to go away.
            DO
              YggLockControl.AbortWaitingRequests[self];
              IF ShortWaitForNStarts[self] = 0 THEN EXIT;
              ENDLOOP;
            };
          NormalAbortWorker[self] }
        ELSE NormalCommitWorker[self];
        };
      ENDCASE => RETURN;
    };

  BeginFinish: ENTRY PROC [self: Handle,
    requiredOutcome: YggTransMgr.RequiredOutcome,
    option: {returnIfPreparing, waitIfPreparing}]
    RETURNS [YggWorker.State] = {
    -- Waits out the unknown and completing states (and preparing, when
    -- option = waitIfPreparing), then:
    --   abort on an active or ready worker: log it and enter completing/abort;
    --   commit on a ready worker: log it and enter completing/commit.
    -- Returns preparing immediately when option = returnIfPreparing and the
    -- worker is preparing.  Otherwise returns the state after any transition,
    -- so completing means this call made the transition.
    DO
      SELECT self.state FROM
        unknown, completing => WAIT shortWaitOver;
        preparing => IF option = waitIfPreparing THEN WAIT shortWaitOver
          ELSE RETURN [preparing];
        ENDCASE => EXIT;
      ENDLOOP;
    IF requiredOutcome = abort AND (self.state = active OR self.state = ready) THEN {
      LogWorkerCompleting[self, abort, FALSE];
      self.state _ completing;
      self.outcome _ abort;
      }
    ELSE IF requiredOutcome = commit AND self.state = ready THEN {
      LogWorkerCompleting[self, commit, self.coordinatorIsRemote];
      self.state _ completing;
      self.outcome _ commit;
      };
    RETURN [self.state];
    };

  NormalCommitWorker: PROC [self: Handle] = {
    -- Expects self.state = completing, self.outcome = commit.
    -- Tells access control about the commit, then commits the worker.
    YggAccessControl.PhaseTwoOrAbortSpaceAndOwnerChanges[transHandle: self,
      outcome: commit, continueTrans: self.continueWorker];
    CommitWorker[self];
    };

  CommitWorker: PROC [self: Handle] = {
    -- Called during normal operation and during recovery.
    -- Runs CommitPhase2, then either hands the locks to the continuation
    -- worker (and activates it) or releases them.
    YggFileControl.CommitPhase2[trans: self, newTrans: self.continueWorker];
    IF self.continueWorker # nullHandle THEN {
      YggLockControl.TransferLocks[from: self, to: self.continueWorker];
      self.continueWorker.locks _ self.locks;
      self.locks _ NIL;
      EndCreateWorker[self.continueWorker, create];
      self.continueWorker _ nullHandle;
      }
    ELSE {
      YggLockControl.ReleaseLocks[self];
      self.locks _ NIL;
      };
    };

  NormalAbortWorker: PROC [self: Handle] = {
    -- Expects self.state = completing, self.outcome = abort.
    -- Tells access control about the abort, then aborts the worker.
    YggAccessControl.PhaseTwoOrAbortSpaceAndOwnerChanges[transHandle: self,
      outcome: abort];
    AbortWorker[self];
    };

  AbortWorker: PROC [self: Handle] = {
    -- Called during normal operation and during recovery.
    -- Reports to the abortTransaction probe (if any), aborts file updates,
    -- releases locks, and aborts the continuation worker if there is one.
    logProc: PROC [YggMonitoringLog.TransactionAbortInfo];
    IF (logProc _ YggMonitoringLog.notice.abortTransaction) # NIL THEN
      logProc[[
        transID: self.transID,
        where: "WorkerImpl.AbortWorker",
        locks: self.locks,
        why: lockInfo,
        message: ""
        ]];
    YggFileControl.Abort[trans: self];
    YggLockControl.ReleaseLocks[self];
    self.locks _ NIL;
    IF self.continueWorker # nullHandle THEN {
      EndCreateWorker[self.continueWorker, abort];
      self.continueWorker _ nullHandle;
      };
    };


  -- Log writing and analysis; checkpoint proc

  LogWorkerBegin: PROC [self: Handle] = {
    -- Writes a workerBegin record carrying the coordinator name and remembers
    -- its record ID in self.beginRecord.
    rec: YggWorker.BeginLogRep _ [];
    TRUSTED {
      ConvertUnsafe.AppendRope[
        to: LOOPHOLE[LONG[@rec.coordinator]], from: self.coordinator.Name];
      [thisRecord: self.beginRecord] _ YggLog.Write[self, analysis, workerBegin,
        [base: @rec, length: TEXT[rec.length].SIZE]];
      };
    };

  LogWorkerReady: PROC [self: Handle, force: BOOL] = {
    -- Writes an empty workerReady record, forcing the log iff force.
    [] _ YggLog.Write[self, analysis, workerReady, YggLog.nullBlock, force];
    };

  LogWorkerCompleting: PROC [self: Handle, outcome: YggInternal.WorkerOutcome,
    force: BOOL] = {
    -- Writes a workerCompleting record carrying outcome, forcing the log iff force.
    rec: YggWorker.CompletingLogRep _ [outcome: outcome];
    TRUSTED {[] _ YggLog.Write[self, analysis, workerCompleting,
      [base: @rec, length: YggWorker.CompletingLogRep.SIZE], force];
      };
    };

  LogWorkerComplete: PROC [self: Handle] = {
    -- Writes an empty workerComplete record.
    [] _ YggLog.Write[self, analysis, workerComplete, YggLog.nullBlock];
    };

  AnalyzeWorkerBegin: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    -- Analysis proc for workerBegin: recreates and registers the worker.
    coordinator: YggEnvironment.FileStore;
    { -- get coordinator name from log, map it into a FileStore.
    rec: YggWorker.BeginLogRep _ [];
    status: YggLog.ReadProcStatus;
    TRUSTED {
      [status: status] _ YggLog.ReadForRecovery[thisRecord: record,
        to: [base: @rec, length: YggWorker.BeginLogRep.SIZE]];
      IF status = destinationFull THEN ERROR InternalProgrammingError;
      coordinator _ ConvertUnsafe.ToRope[from: LOOPHOLE[LONG[@rec.coordinator]]];
      };
    };
    { -- create a worker handle from trans, setting its coordinator field from above.
    self: Handle _ ConsWorker[trans, YggImport.Register[coordinator]];
    IF YggTransactionMap.Register[self].alreadyRegistered THEN
      ERROR InternalProgrammingError;
    self.beginRecord _ record;
    self.locks _ YggLockControl.ConsTransHeader[self];
    self.allowableDifficulty _ zero;
    self.stateDuringRecovery _ active;
    };
    };

  AnalyzeWorkerReady: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    -- Analysis proc for workerReady: active becomes ready.
    self: Handle = YggTransactionMap.GetTransHandle[trans];
    IF self = nullHandle THEN RETURN;
    IF self.stateDuringRecovery # active THEN ERROR InternalProgrammingError;
    self.stateDuringRecovery _ ready;
    };

  AnalyzeWorkerCompleting: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    -- Analysis proc for workerCompleting: records aborted or committed.
    outcome: YggInternal.WorkerOutcome;
    { -- get outcome from log
    rec: YggWorker.CompletingLogRep;
    status: YggLog.ReadProcStatus;
    TRUSTED {[status: status] _ YggLog.ReadForRecovery[thisRecord: record,
      to: [base: @rec, length: YggWorker.CompletingLogRep.SIZE]]; };
    IF status # normal THEN ERROR InternalProgrammingError;
    outcome _ rec.outcome;
    };
    {
    self: Handle = YggTransactionMap.GetTransHandle[trans];
    IF self = nullHandle THEN RETURN;
    IF self.stateDuringRecovery NOT IN [active .. ready] THEN
      ERROR InternalProgrammingError;
    -- Any outcome other than abort (commit, readOnly) counts as committed.
    self.stateDuringRecovery _ IF outcome = abort THEN aborted ELSE committed;
    };
    };

  AnalyzeWorkerComplete: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    -- Analysis proc for workerComplete: the worker is finished, so set its
    -- outcome, release its locks and unregister it.
    self: Handle = YggTransactionMap.GetTransHandle[trans];
    IF self = nullHandle THEN RETURN;
    SELECT self.stateDuringRecovery FROM
      active, ready => ERROR;
      committed => self.outcome _ commit;
      aborted => self.outcome _ abort
      ENDCASE => ERROR;
    YggLockControl.ReleaseLocks[self];
    self.locks _ NIL;
    self.state _ complete;
    YggTransactionMap.Unregister[self];
    };

  CallAfterAnalysisPass: PUBLIC PROC [] = {
    -- YggWorkerControl.CallAfterAnalysisPass.
    -- Sets the watchdog log bounds, then logs an abort for every worker that
    -- the analysis pass left in the active state.
    AbortActiveWorker: PROC [self: Handle] RETURNS [stop: BOOL] = {
      SELECT self.stateDuringRecovery FROM
        active => {
          LogWorkerCompleting[self: self, outcome: abort, force: FALSE];
          self.stateDuringRecovery _ aborted;
          };
        ready, committed, aborted => NULL;
        ENDCASE => ERROR;
      RETURN [stop: FALSE];
      };
    TRUSTED {
      logWordsUsableByClient _ ConstArith.FromInt[YggLogControl.WordsUsableByClient[]];
      -- maxLogWordsForWorker is the comfortable bound and must not exceed the
      -- absolute bound logWordsUsableByClient, so take the SMALLER of the two.
      -- (Taking the larger, as before, made TestForAbort's activity and cost
      -- heuristics unreachable, since it returns dontAbort below
      -- maxLogWordsForWorker and abort above logWordsUsableByClient.)
      maxLogWordsForWorker _
        IF ConstArith.Compare[maxLogWordsForFastRestart, logWordsUsableByClient] = less
          THEN maxLogWordsForFastRestart
          ELSE logWordsUsableByClient;
      };
    YggTransactionMap.LockedEnumerate[AbortActiveWorker];
    };

  RecoverWorkerBegin: PROC [
    record: RecordID, type: RecordType, trans: Handle, outcome: YggLog.TransState] = {
    -- Recovery proc for workerBegin: unknown becomes active.
    IF trans.state # unknown THEN ERROR;
    trans.state _ active;
    };

  RecoverWorkerReady: PROC [
    record: RecordID, type: RecordType, trans: Handle, outcome: YggLog.TransState] = {
    -- Recovery proc for workerReady: active becomes ready.
    IF trans.state # active THEN ERROR;
    trans.state _ ready;
    };

  RecoverWorkerCompleting: PROC [
    record: RecordID, type: RecordType, trans: Handle, outcome: YggLog.TransState] = {
    -- Recovery proc for workerCompleting: redoes the commit or abort, or
    -- re-readies the worker, according to the transaction outcome passed in.
    trans.state _ completing;
    SELECT outcome FROM
      committed => {
        -- Pass through the ready state again before committing.
        trans.state _ ready;
        IF ReadyWorker[trans] = notReady THEN ERROR;
        trans.state _ completing;
        trans.outcome _ commit;
        [] _ CommitWorker[trans];
        };
      aborted => {
        trans.state _ completing;
        trans.outcome _ abort;
        AbortWorker[trans];
        };
      ready => {
        trans.state _ ready;
        IF ReadyWorker[trans] # ready THEN ERROR;
        };
      ENDCASE => ERROR;
    };

  CallAfterUpdatePass: PUBLIC PROC [] = {
    -- YggWorkerControl.CallAfterUpdatePass.
    -- Polls once a second until every registered worker is ready or fpmComplete.
    noActiveWorkers: BOOL;
    CheckForActiveWorkers: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
      SELECT self.state FROM
        ready, fpmComplete => NULL;
        ENDCASE => noActiveWorkers _ FALSE;
      RETURN [stop: FALSE];
      };
    DO
      noActiveWorkers _ TRUE;
      YggTransactionMap.LockedEnumerate[CheckForActiveWorkers];
      IF noActiveWorkers THEN RETURN
      ELSE YggDummyProcess.Pause[YggDummyProcess.SecondsToTicks[1]];
      ENDLOOP;
    };

  WorkerCheckpointProc: PROC []
    RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
    -- Checkpoint proc (registered with YggLogControl at module initialization).
    -- Pass 1 marks fpmComplete workers; if any were seen, everything is forced
    -- out; pass 2 logs those workers complete and unregisters them, and finds
    -- the oldest begin record among the remaining non-unknown workers.
    -- Both results are that oldest begin record (YggLog.lastRecordID if none).
    fpmCompleteWorkerSeen: BOOL _ FALSE;
    ExamineWorkerBeforeForceOut: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
      IF self.state = fpmComplete THEN {
        self.state _ fpmCompleteBeingForcedOut;
        fpmCompleteWorkerSeen _ TRUE;
        };
      RETURN [stop: FALSE];
      };
    oldestBeginRecord: YggLog.RecordID _ YggLog.lastRecordID;
    ExamineWorkerAfterForceOut: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
      IF self.state = fpmCompleteBeingForcedOut THEN {
        LogWorkerComplete[self];
        YggTransactionMap.Unregister[self];
        self.state _ complete;
        }
      ELSE IF self.state # unknown THEN
        oldestBeginRecord _ YggLogInline.Min[self.beginRecord, oldestBeginRecord];
      RETURN [stop: FALSE];
      };
    YggTransactionMap.UnlockedEnumerate[ExamineWorkerBeforeForceOut];
    IF fpmCompleteWorkerSeen THEN YggFilePageMgr.ForceOutEverything[];
    YggTransactionMap.UnlockedEnumerate[ExamineWorkerAfterForceOut];
    IF oldestBeginRecord # YggLog.lastRecordID THEN {
      distanceFromOldestBegin: ConstArith.Const;
      distanceFromOldestBegin _ YggLogInline.WordsFromSubtract[
        YggLogControl.RecordIDOfNextWrite[], oldestBeginRecord];
      -- Wake the watchdog when the oldest worker ties up more log than the
      -- comfortable bound.  (This used to compare the word distance against
      -- oldestBeginRecord, a record ID rather than a word-count threshold.)
      TRUSTED {IF ConstArith.Compare[distanceFromOldestBegin, maxLogWordsForWorker] = greater THEN
        YggWorkerInternal.NotifyLogWatchdog[];};
      };
    RETURN [oldestBeginRecord, oldestBeginRecord];
    };

  WorkerLogWatchdogProcess: PUBLIC PROC [] = {
    -- Loops forever.  Each pass picks the worker remembered from the previous
    -- pass (or else the oldest active worker), asks TestForAbort about it,
    -- and aborts it unilaterally if told to.  Waits for a notify whenever
    -- there is no candidate or the candidate should not be aborted.
    transID: TransID _ nullTransID;
    w: Handle _ NIL;
    wait: BOOL _ FALSE;
    DO
      IF transID # nullTransID THEN w _ YggTransactionMap.GetTransHandle[transID];
      IF w = NIL THEN w _ GetOldestActiveWorker[];
      IF w = NIL THEN { transID _ nullTransID; wait _ TRUE }
      ELSE SELECT TestForAbort[w] FROM
        notActive => transID _ nullTransID;
        abort => {
          logProc: PROC [YggMonitoringLog.TransactionAbortInfo];
          IF (logProc _ YggMonitoringLog.notice.abortTransaction) # NIL THEN
            logProc[[
              transID: w.transID,
              where: "WorkerImpl.WorkerLogWatchdogProcess",
              why: watchDog,
              message: "had old uncommitted updates"
              ]];
          AbortUnilaterally[w, oldUncommittedUpdates];
          transID _ nullTransID;
          };
        dontAbort => { transID _ w.transID; wait _ TRUE };
        ENDCASE => ERROR;
      w _ NIL; -- so that w can be garbage-collected
      IF wait THEN { YggWorkerInternal.WaitForNotify[]; wait _ FALSE };
      ENDLOOP;
    };

  TestForAbort: PUBLIC ENTRY SAFE PROC [self: Handle] RETURNS [YggMonitoringHooks.ShouldAbort] = TRUSTED {
    -- For YggMonitoringHooks.  Decides whether the watchdog should abort self:
    --   not active                              => notActive
    --   log use below maxLogWordsForWorker      => dontAbort
    --   log use above logWordsUsableByClient    => abort
    --   recently active AND expensive to redo   => dontAbort
    --   otherwise                               => abort
    logWordsForWorker: YggEnvironment.WordCount =
      YggLogInline.WordsFromSubtract[YggLogControl.RecordIDOfNextWrite[], self.beginRecord];
    IF self.state # active THEN RETURN [notActive];
    IF ConstArith.Compare[logWordsForWorker, maxLogWordsForWorker] = less THEN RETURN [dontAbort];
    IF ConstArith.Compare[logWordsForWorker, logWordsUsableByClient] = greater THEN RETURN [abort];
    IF BasicTime.Period[from: self.timeOfLastStartWork, to: BasicTime.Now[]] <
      longTimeSinceLastStartWork --active-- AND
      self.estimatedUpdateCost * densityFactor > ConstArith.ToInt[logWordsForWorker] --expensive-- THEN
      RETURN [dontAbort];
    RETURN [abort];
    };

  InactiveWorker: PUBLIC ENTRY SAFE PROC [self: Handle] RETURNS [BOOLEAN] ~ CHECKED {
    -- "Inactive" = no StartWork for more than longTimeSinceLastStartWork seconds.
    RETURN [BasicTime.Period[from: self.timeOfLastStartWork, to: BasicTime.Now[]] > longTimeSinceLastStartWork];
    };

  GetOldestActiveWorker: PROC [] RETURNS [Handle] = {
    -- Among active workers, returns the one whose begin record compares lowest
    -- (on ties the later-enumerated worker wins); NIL if none is active.
    w: Handle _ NIL;
    ExamineWorker: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
      IF self.state = active AND
        (w = NIL OR YggLogInline.Compare[w.beginRecord, self.beginRecord] # less) THEN
        w _ self;
      RETURN [stop: FALSE];
      };
    YggTransactionMap.UnlockedEnumerate[ExamineWorker];
    RETURN [w];
    };


  -- Procedures exported to YggTransactionMap, and some related procedures.

  StartWork: PUBLIC ENTRY PROC [self: Handle, difficulty: YggInternal.WorkLevel] RETURNS [canWork: BOOL] = {
    -- Asks permission to start a piece of work at the given difficulty.
    -- FALSE when difficulty exceeds self.allowableDifficulty (checked on entry
    -- and after each wait for a free start slot) or when the worker, once out
    -- of the unknown state, is not active or preparing.  On TRUE, nStarts has
    -- been incremented and timeOfLastStartWork updated; callers are expected
    -- to balance with StopWork.
    IF difficulty > self.allowableDifficulty THEN RETURN [FALSE];
    WHILE self.state = unknown DO WAIT shortWaitOver ENDLOOP;
    IF self.state NOT IN [active .. preparing] THEN RETURN [FALSE];
    WHILE self.nStarts = YggWorker.maxStarts DO
      WAIT shortWaitOver;
      IF difficulty > self.allowableDifficulty THEN RETURN [FALSE];
      ENDLOOP;
    self.nStarts _ self.nStarts + 1;
    self.timeOfLastStartWork _ BasicTime.Now[];
    RETURN [TRUE];
    };

  StopWork: PUBLIC ENTRY PROC [self: Handle] = {
    -- Ends one piece of work begun by a successful StartWork.
    IF self.nStarts = 0 THEN ERROR; --impossible if clients are correctly coded--
    self.nStarts _ self.nStarts - 1;
    };

  PreventStartWork: ENTRY PROC [self: Handle, allowDifficulty: YggWorker.Difficulty]
    RETURNS [abortThisTrans: BOOL] = {
    -- Returns abortThisTrans = TRUE if work is in progress; in that case the
    -- allowable difficulty is forced to zero instead of allowDifficulty.
    IF self.nStarts # 0 THEN {
      self.allowableDifficulty _ zero;
      RETURN [abortThisTrans: TRUE];
      }
    ELSE {
      self.allowableDifficulty _ allowDifficulty;
      RETURN [abortThisTrans: FALSE];
      };
    };

  PreventSomeWork: ENTRY PROC [self: Handle, allowDifficulty: YggWorker.Difficulty]
    RETURNS [workInProgress: BOOL] = {
    -- Sets the allowable difficulty unconditionally.
    -- Returns workInProgress = TRUE if work is in progress.
    self.allowableDifficulty _ allowDifficulty;
    RETURN [workInProgress: self.nStarts # 0];
    };

  GetTimeOfLastStartWork: PUBLIC ENTRY PROC [self: Handle] RETURNS [BasicTime.GMT] = {
    RETURN [self.timeOfLastStartWork];
    };

  GetEstimatedUpdateCost: PUBLIC ENTRY PROC [self: Handle] RETURNS [INT] = {
    RETURN [self.estimatedUpdateCost];
    };

  GetTransID: PUBLIC PROC [self: Handle] RETURNS [TransID] = {
    -- Not an ENTRY proc: reads self.transID without taking the monitor.
    RETURN [self.transID];
    };

  GetOpenDocList: PUBLIC PROC [self: Handle] RETURNS [openDocList: YggInternal.OpenDoc] = {
    -- Not an ENTRY proc: reads self.openDocList without taking the monitor.
    RETURN [self.openDocList];
    };

  SetOpenDocList: PUBLIC PROC [self: Handle, documentList: YggInternal.OpenDoc] = {
    -- Not an ENTRY proc: writes self.openDocList without taking the monitor.
    self.openDocList _ documentList;
    };

  GetLockHeader: PUBLIC PROC [self: Handle] RETURNS [lockHeader: YggInternal.LockTransHeader] = {
    -- Not an ENTRY proc: reads self.locks without taking the monitor.
    RETURN[self.locks];
    };

  EnableAlpineWheel: PUBLIC ENTRY PROC [self: Handle, conversation: Conversation, enable: BOOL] = {
    -- Adds (enable) or removes (NOT enable) the conversation's ID in
    -- self.enabledWheelList.  A no-op for the local conversation, for enabling
    -- an ID already present, and for disabling one that is absent.
    l: LIST OF YggDummyRPC.ConversationID;
    c: YggDummyRPC.ConversationID = YggDummyRPC.GetConversationID[conversation];
    IF c = YggIdentity.myLocalConversationID THEN RETURN;
    -- Find the cell holding c; l = NIL afterwards means c is not in the list.
    FOR l _ self.enabledWheelList, l.rest UNTIL l = NIL DO
      IF l.first = c THEN EXIT;
      ENDLOOP;
    IF enable AND l = NIL THEN {
      new: LIST OF YggDummyRPC.ConversationID = CONS[
        first: c, rest: self.enabledWheelList];
      self.enabledWheelList _ new;
      }
    ELSE IF (NOT enable) AND l # NIL THEN {
      -- Unlink cell l, either from the head or from its predecessor.
      IF self.enabledWheelList = l THEN self.enabledWheelList _ l.rest
      ELSE FOR p: LIST OF YggDummyRPC.ConversationID _ self.enabledWheelList, p.rest
        UNTIL p = NIL DO
        IF p.rest = l THEN { p.rest _ l.rest; EXIT };
        ENDLOOP;
      };
    };

  IsAlpineWheel: PUBLIC ENTRY PROC [self: Handle, conversation: Conversation]
    RETURNS [enabled: BOOL] = {
    -- TRUE for the local conversation, or when the conversation's ID is in
    -- self.enabledWheelList.
    c: YggDummyRPC.ConversationID = YggDummyRPC.GetConversationID[conversation];
    IF c = YggIdentity.myLocalConversationID THEN RETURN [enabled: TRUE];
    FOR l: LIST OF YggDummyRPC.ConversationID _ self.enabledWheelList, l.rest
      UNTIL l = NIL DO
      IF l.first = c THEN RETURN [enabled: TRUE];
      ENDLOOP;
    RETURN [enabled: FALSE];
    };

  StateDuringRecovery: PUBLIC ENTRY PROC [self: Handle]
    RETURNS [YggTransactionMap.TransState] = {
    -- Maps self.stateDuringRecovery to a TransState; active (or anything else
    -- unexpected) raises ERROR.
    SELECT self.stateDuringRecovery FROM
      active => ERROR;
      ready => RETURN [ready];
      committed => RETURN [committed];
      aborted => RETURN [aborted];
      ENDCASE => ERROR;
    };

  IsCommitted: PUBLIC ENTRY PROC [self: Handle] RETURNS [BOOL] = {
    -- FALSE while active or preparing; while completing, TRUE iff the outcome
    -- is commit.  Any other state raises ERROR.
    SELECT self.state FROM
      active, preparing => RETURN [FALSE];
      completing => RETURN [self.outcome = commit];
      ENDCASE => ERROR; -- locks are supposed to prevent this
    };

  AssertUpdatesAreComplete: PUBLIC ENTRY PROC [self: Handle] = {
    -- Moves a completing worker to fpmComplete; any other state raises ERROR.
    IF self.state # completing THEN ERROR;
    self.state _ fpmComplete;
    };

  TransIDFromTransHandle: PUBLIC SAFE PROC [h: YggInternal.TransHandle] RETURNS [YggEnvironment.TransID] ~ CHECKED {
    -- For YggMonitoringHooks.  Returns nullTransID for a NIL handle.
    wh: YggWorker.Handle;
    IF h = NIL THEN RETURN [YggEnvironment.nullTransID]
    ELSE {
      TRUSTED {wh _ LOOPHOLE[h]};
      RETURN [wh.transID]
      }
    };


  -- Initialization

  YggLogControl.RegisterAnalysisProc[workerBegin, AnalyzeWorkerBegin];
  YggLogControl.RegisterAnalysisProc[workerReady, AnalyzeWorkerReady];
  YggLogControl.RegisterAnalysisProc[workerCompleting, AnalyzeWorkerCompleting];
  YggLogControl.RegisterAnalysisProc[workerComplete, AnalyzeWorkerComplete];
  YggLog.RegisterRecoveryProc[workerBegin, RecoverWorkerBegin];
  YggLog.RegisterRecoveryProc[workerReady, RecoverWorkerReady];
  YggLog.RegisterRecoveryProc[workerCompleting, RecoverWorkerCompleting];
  YggLogControl.RegisterCheckpointProc[WorkerCheckpointProc];
  TRUSTED {
    maxLogWordsForFastRestart _ ConstArith.FromInt[3000*VM.wordsPerPage];
    -- Give every WAIT on shortWaitOver a timeout of YggWorker.shortWaitTime msec.
    YggDummyProcess.SetTimeout[@shortWaitOver, YggDummyProcess.MsecToTicks[YggWorker.shortWaitTime]];
    YggDummyProcess.EnableAborts[@shortWaitOver];
    };
END.--WorkerImpl
CHANGE LOG Changed by MBrown on February 9, 1983 11:54 am Changed by MBrown on March 2, 1983 2:53 pm Changed by MBrown on April 3, 1983 10:32 am Changed by MBrown on June 6, 1983 11:28 am Changed by MBrown on June 25, 1983 9:43 pm %κYggWorkerImpl.mesa Copyright Σ 1984, 1985, 1988 by Xerox Corporation. All rights reserved. Implements worker in two-phase commit: create, prepare, finish (commit or abort.) Last edited by Kolling on June 6, 1983 12:05 pm MBrown on January 30, 1984 12:12:43 pm PST Kupfer on March 7, 1985 2:26:53 pm PST Carl Hauser, March 28, 1986 2:36:34 pm PST Bob Hagmann May 27, 1988 9:56:26 am PDT NOTES: It is poor to land in debugger due to YggDummyRPC.CallFailed[protocolError]. The implementation of AbortUnilaterally should be improved by making a remote call to the coordinator, if the coordinator is remote. YggInternal.TransObject YggTransMgr.Refused. When TRUE, worker always forces log as if coordinator were remote. 
Client waits here under various unlikely circumstances involving concurrent calls to CreateWorker, PrepareWorker, and FinishWorker, concurrent calls to worker actions that use exclusive mode StartWork, and excessive concurrency among other worker actions. The wait times out after YggWorker.shortWaitTime milliseconds, allowing the situation to be reevaluated. Nobody notifies shortWaitOver. Parameters to worker log watchdog: if self.estimatedUpdateCost*densityFactor is larger than the amount of log tied up by this transaction, then the transaction is considered too expensive to abort (unless it is inactive or its log use exceeds logWordsUsableByClient). if the time since last start work is greater than this, then the transaction is considered inactive. absolute upper bound on the amount of log tied up by a transaction. fast restart means two minutes: two passes over 3000 pages at 50 pages/sec. A worker may comfortably keep this much log active. Beyond this bound, the worker may be aborted (but need not be). These two values are initialized during recovery (based on the log length) and not changed thereafter (except perhaps from the debugger). Worker-related procs ! Unknown {coordinator, transID}, OperationFailed {busy}; YggTransaction.CreateWorker. Called by a client. The worker object monitor is not held during the call to AlpineCoordinator.RegisterWorker. This was designed to allow AlpineCoordinator.Finish to hold the coordinator object monitor during its calls to workers. (AlpineCoordinator.Finish does not actually work this way). alpineTransMgr: AlpineTransMgrRpcControl.InterfaceRecord; Drop self on the floor since transID is already registered. We are not allowed to call RegisterWorker; some other process is doing it. Wait for self.state # unknown, then return result. We registered the volatile worker, so we are responsible to call RegisterWorker and update volatile and permanent data structures to reflect the result. 
We cannot return or error until self.state # unknown, lest another process hang in WaitUntilNotUnknown. alpineTransMgr _ YggImport.GetTransMgrInterface[self.coordinator]; registerWorkerResult _ alpineTransMgr.RegisterWorker[conversationOut, transID ! YggDummyRPC.CallFailed => CHECKED { whyCallFailed _ why; GOTO callFailed } ]; callFailed => { EndCreateWorker[self, abort]; SELECT whyCallFailed FROM timeout, unbound => { self.coordinator.TransMgrInterfaceCallFailed[alpineTransMgr]; alpineTransMgr _ NIL; --must come after CallFailed is unwound ERROR YggTransaction.Unknown[coordinator] }; busy => ERROR YggTransaction.OperationFailed[busy]; runtimeProtocol, stubProtocol => ERROR; ENDCASE } Note: does not establish the locks field of the YggWorker.Object. self is registered in YggTransactionMap, in unknown state (no WorkerBegin written). Must change state from unknown since another process in WaitUntilNotUnknown may be waiting for this event. ! Refused {wrongCoordinator} YggTransMgr.WorkerPrepare. Called by the coordinator of "trans". This procedure is prepared for duplicate calls. Extra callers wait on the global condition variable prepareFinished. We have caused self to enter the preparing state. Perform specialized version of CreateWorker. The newTrans is already known to us. This probably represents an error in the coordinator. Some work is still in progress. Wait for it to go away. Call any external caches now; they are allowed to do read/write pages. Any call to StartWork for this transaction will now fail. There is no work in progress, including waiting lock requests. We therefore have exclusive access without entering the monitor. We have caused self to enter either the ready state or the completing state. result is # unknown, active. result is = preparing iff this call caused the transition to enter the preparing state. Called during normal operation and during recovery. The caller has exclusive access to the transaction self, is not holding the monitor. ! 
Refused {wrongCoordinator, notReady}; YggTransMgr.WorkerFinish Called by the coordinator of "trans". YggTransactionMap.AbortUnilaterally Note some duplicate logic with LocalWorkerFinish. We cannot determine self's outcome, and are not responsible for making it complete, but we are responsible for ensuring that it leaves the preparing state. Otherwise it might stay there forever, waiting in the lock manager. This loop is really necessary, since neither process involved is synchronized by self's or YggLock's monitors. We have caused self to enter the completing state, self.outcome = abort. Hence we are responsible for making it complete. Some work is still in progress. Wait for it to go away. temporary kludge, effective since remote coordinators aren't used now. We have caused self to enter the completing state, self.outcome = requiredOutcome. Hence we are responsible for making it complete. Some work is still in progress. Wait for it to go away. result is # unknown, ready. result is = active only if requiredOutcome = commit and transaction was unknown or active on entry. (Caller is in error.) result is = preparing only if returnIfPreparing = TRUE and transaction was preparing on entry. (Caller is trying to perform unilateral abort.) result is = completing iff this call caused the transaction to enter the completing state. (Caller is now responsible for making transaction complete.) self.state = completing, self.outcome = commit Call any external caches now; they need to know about the commit. Called during normal operation and during recovery. self.state = completing, self.outcome = abort Call any external caches now; they need to know about the abort. Called during normal operation and during recovery. Log writing and analysis; checkpoint proc YggWorkerControl.CallAfterAnalysisPass YggWorkerControl.CallAfterUpdatePass "Inactive" = haven't done a "StartWork" for some number of seconds. Procedures exported to YggTransactionMap, and some related procedures. 
Return abortThisTrans ~ TRUE if work is in progress. Return workInProgress ~ TRUE if work is in progress. Not ENTRY because the trans field is immutable. Not ENTRY because self.openDocList "belongs" to the open doc manager. Not ENTRY because self.openDocList "belongs" to the open doc manager. Not ENTRY because self.locks "belongs" to the lock manager. For YggMonitoringHooks. Initialization Change CommitWorker to set continueWorker _ nullHandle. (This was happening, unsynchronized, from the checkpoint process!) Change sanity check in StartWork. Implemented worker log watchdog. Changed AbortUnilaterally to call YggTransaction.Finish[..., abort, ...], making coordinator go away if it is local. Updated catch phrases for YggAccessControl errors. In WorkerImpl.CheckForReadyWorker, was examining worker state without entering monitor. Changed logic as follows: now WorkerImpl.CallAfterUpdatePass waits until all workers are either ready or fpmComplete. Fixed two related bugs. If AbortUnilaterally was called with self.state = preparing, it simply waited for self.state # preparing. This is no good because self may be waiting in the lock manager, deadlocked, and the caller of AbortUnilaterally may be the lock watchdog process! This was observed several times in server operation. The second bug was more subtle and probably has not yet arisen in practice. When a process calls YggLockControl.AbortWaitingRequests, what guarantee does it have that no new requests will come along and wait later? Answer: no guarantee. This synchronization must be done at the level of the worker object monitor, and the lock manager does not enter the worker monitor. We wrapped all calls to YggLockControl.AbortWaitingRequests in loops that are designed to ensure that when the loop exits, no more YggLock.Set or YggLockControl.UpgradeLocks calls will be executed for this transaction. This seems messy but it is not clear how to do better. Edited June 1984, by Kupfer Add InactiveWorker function. 
Edited on July 12, 1984 3:10:32 pm PDT, by Kupfer changes to: StartWork, StopWork: Reflect change to YggMonitoringLog. Edited on July 25, 1984 11:05:24 am PDT, by Kupfer (1) Get rid of "work" probes and add probe for locking errors and for worker abort. (2) Make CreateWorker be cleaner about what ERRORs it returns. (3) Add TransIDFromTransHandle. changes to: ReadyWorker, StartWork, StopWork, WorkerImpl, WorkerLogWatchdogProcess, AbortWorker Edited on August 6, 1984 3:40:12 pm PDT, by Kupfer (1) Make TransIDFromTransHandle handle a NIL argument gracefully. (2) fix a comment about what signals WorkerPrepare generates. (3) Remove the possible race condition in YggMonitoringLog probes by assigning the PROC to a temporary variable. changes to: WorkerImpl, AbortWorker, WorkerLogWatchdogProcess, WorkerPrepare, TransIDFromTransHandle Edited on February 14, 1985 5:29:18 pm PST, by Kupfer Add the "specifics" field to the YggMonitoringLog call in ReadyWorker. Carl Hauser, October 4, 1985 1:40:26 pm PDT Change "Log" to "YggLog" Κl˜Icodešœ™JšœH™HKšœQ™Qšœ™Kšœ ™ Kšœ*™*Kšœ&™&K™*K™'—K˜Kšœ™K˜KšœL™LKšœ„™„˜K˜šΟk ˜ K˜K˜ Kšœ˜Kšœ˜Kšœ ˜ Kšœ ˜ Kšœ ˜ Kšœ˜Kšœ ˜ K˜ Kšœ œ ˜Kšœ˜K˜Kšœ˜Kšœ˜Kšœ˜Kšœ˜Kšœ˜Kšœ˜Kšœ ˜ Kšœ˜K˜Kš œ˜ Kšœœ˜'Kšœœ2˜HKšœ˜Kšœ ˜ Kšœ˜Kšœ˜Kšœ˜——unitš Οn œœœœœ ˜:š˜K˜ Kšœ˜Kšœ ˜ Kšœ ˜ Kšœ˜K˜ Kšœ ˜ Kšœ˜K˜Kšœ˜Kšœ˜Kšœ˜Kšœ˜Kšœ˜Kšœ˜Kšœ ˜ Kšœ˜K˜Kš œ˜ Kšœ˜Kšœ˜Kšœ˜K˜—š˜Kšœ˜Kšœ ˜ Kšœ ˜ Kšœ˜Kšœ˜Kšœ˜Kšœ˜K˜—Kšœ˜Kšœœ˜1Kšœ œ˜'Kšœ2˜2Kšœ œ˜+Kšœ œ˜!Kšœ œ˜%Kšœœœ˜'K˜Kšœœ˜ Kšœ*˜*K˜šœ œœ˜,Kšœ™K˜—šœ œœœ˜8Kšœ™—K˜šœ%œœ˜2KšœB™BK˜—šœ œ˜Kšœ‰™‰—Iheadšœ"™"šœœ˜Kšœι™ι—šœœ Οc˜EKšœc™c—šœ)˜)Kšœ9™9—šœ_˜_KšœK™K—šœ'˜'Kšœώ™ώ—M™šž œœœ˜K˜IKšœ9™9Kšœ™Kšœ™Kšœ™K˜ Kšœœœ˜ Kšœ7˜7K˜K™9K˜K˜Kšœ8œ˜WKšœ=˜Ašœœ4œ˜QKšœ»™»Kšœ1˜1šœœ+˜FKšœ!˜&—K˜—šœ˜Kšœ˜™˜Kšœg™gKšœ?˜?KšœB™BKš œœœœœœ˜I™OKš œœœ™M—Kšœœœœ˜K—š˜˜K˜Kšœ'˜,—™K™šœ™™K™=KšœœŸ'™=Kšœ'™,—šœ™Kšœ&™+—Kšœ!œ™'Kšœ™—Kšœ™—K˜—K˜—šž œœ0˜@Kšœ ˜KšœA™Ašœœ˜ 
K˜K˜%K˜šœ9˜;Kšœ8˜;——K˜—šžœœœ.˜IKšœS™SKšœœœ˜#šœœ˜K˜K˜Kšœœœ3˜KK˜—šœ˜Kšœj™jK˜K˜Kšœ#˜#K˜—K˜—šžœœœ˜.Kšœœ˜#Kšœœœœ˜9Kšœ˜K˜—šž œœœ˜K˜+Kšœ!˜!Kšœ˜%Kšœ™Kšœ™Kšœ%™%Kšœu™uKšœ.˜.Kšœ7˜7Kšœœœ ˜,KšœœUœ œ˜‡šœ˜Kšœ œŸ˜!Kšœ œ Ÿ˜*Kšœœ Ÿ˜/—Kšœ1™1šœœ˜ Kšœ,™,K˜=šœCœœ˜PKšœ[™[—K˜—˜šœEœ˜MKšœ8™8š˜Kšœœœ˜+Kšœ˜—K˜K˜—KšœF™Fšœ4˜4Kšœ9œ˜E—KšœDœœ˜VKšœ9™9Kšœ€™€K˜&š˜˜ š˜Kšœ*˜*Kšœœœ˜+Kšœ˜—K˜K˜——K˜—K˜$KšœL™Lšœ˜K˜$K˜*Kšœ œ˜Kšœ˜—Kšœ˜K˜—šž œœœ˜'Kšœ˜Kšœ™KšœW™Wš˜šœ ˜Kšœœ˜)Kšœœ˜—Kšœ˜—Kšœœ˜3Kšœ˜K˜—šž œœ˜ Kšœ!˜(Kšœ3™3KšœT™TKšœœŸ*˜CKšœ&˜&Kšœ œ˜šœ.˜.šœ˜KšœA˜AKšœ˜Kšœ ˜Kšœ˜—Kšœœ˜%—šœ"˜"šœ˜KšœA˜AKšœ˜Kšœ ˜Kšœ˜—Kšœœ˜%—Kšœœ œœ˜3š˜šœ˜Kšœ œ%˜2šœ4œ˜>šœ ˜ Kšœ˜K˜K˜Kšœ ˜ K˜K˜2K˜——Kšœ ˜K˜—Kšœ œ ˜—Kšœ˜—šž œœœ˜%Kšœ2˜2šœ˜˜ Kšœ7œ˜>K˜K˜K˜—˜Kšœ:œ˜AK˜K˜K˜—˜ K˜—Kšœœœ˜7K˜K˜K˜Kšœ7˜7Kšœœœ˜!šœœœ˜9Kšœ˜—Kšœœœ œ ˜JK˜—K˜—šžœœ˜K˜7Kšœ7˜7Kšœœœ˜!šœ˜$Kšœœ˜K˜#K˜Kšœœ˜—Kšœ"˜"Kšœ œ˜K˜Kšœ#˜#K˜—šžœœœ˜)Kšœ&™&šžœœœœ˜?šœ˜$˜ Kšœ7œ˜>K˜#K˜—Kšœœ˜"Kšœœ˜Kšœœ˜—K˜—šœ ˜ KšœQ˜QKšœ₯˜₯K˜—Kšœ5˜5K˜—šžœœ˜KšœR˜RKšœœœ˜$K˜K˜—šžœœ˜KšœR˜RKšœœœ˜#K˜K˜—šžœœ˜KšœR˜RK˜šœ ˜˜K˜Kšœœœ˜,K˜2K˜K˜—˜ K˜1K˜K˜—˜ K˜Kšœœœ˜)K˜—Kšœœ˜—K˜—šžœœœ˜'Kšœ$™$Kšœœ˜š žœœœœœ˜Išœ ˜Kšœœ˜Kšœœ˜#—Kšœœ˜K˜—š˜Kšœœ˜Kšœ9˜9Kšœœœœ:˜]Kšœ˜—K˜—šžœœ˜Kšœ0˜7Kšœœœ˜$š žœœœœœ˜Ošœœ˜"K˜'Kšœœ˜K˜—Kšœœ˜K˜—Kšœ9˜9š žœœœœœ˜Nšœ(œ˜0K˜Kšœ#˜#K˜K˜—šœœ˜!KšœJ˜J—Kšœœ˜K˜—KšœA˜AKšœœ%˜BKšœ@˜@šœ)˜1K˜*Kšœr˜rKšœUœ)˜‚K˜—Kšœ(˜.K˜—šžœœœ˜,K˜Kšœ œ˜Kšœœœ˜š˜Kšœœ/˜LKšœœœ˜,Kšœœœ!œ˜6šœœ˜ K˜#˜ Kšœ œ)˜6šœ8œ˜Bšœ ˜ K˜Kšœ-˜-Kšœ˜K˜&K˜——K˜,K˜K˜—Kšœ+œ˜2Kšœœ˜—KšœœŸ%˜.Kšœœ-œ˜AKšœ˜—K˜—šž œœœœœœ$œ˜hšœ-˜-KšœV˜V—Kšœœœ ˜/KšœDœœ ˜^KšœIœœ ˜_šœH˜JKšœŸ œ˜)KšœOŸ œ˜aKšœ ˜—Kšœ ˜K˜—Lšžœœœœœœœœ˜SšœC™CKšœf˜lK˜—šžœœœ ˜3Kšœ œ˜š ž œœœœœ˜Ašœœœ˜&Kšœ>˜BK˜ —Kšœœ˜K˜—Kšœ3˜3Kšœ˜ K˜——šœF™Fš ž œœœœ3œ œ˜jKšœ'œœœ˜=Kšœœœœ˜9Kš œ œœœœœ˜?šœ$˜+Kšœ˜Kšœ'œœœ˜=Kšœ˜—K˜ 
K˜+Kšœœ˜K˜—šžœœœœ˜.KšœœœŸ-˜MK˜ K˜—šžœœœ6˜RKšœœ˜"Kšœ4™4šœœ˜K˜ Kšœœ˜K˜—šœ˜K˜+Kšœœ˜K˜—K˜—šžœœœ6˜QKšœœ˜"K˜+Kšœ4™4Kšœ$˜*K˜—K˜šžœœœœ˜8Kšœ œ˜Kšœ˜"K˜—š žœœœœœœ˜JKšœ˜"K˜—šž œœœœ˜Kšœœœ˜&K˜K˜—š žœœœœœœ˜rJšœ™Jšœ˜šœœ˜Jšœ˜#—šœ˜Jšœœ˜Jšœ ˜J˜—J˜K˜K˜——Kšœ™˜KšœD˜DKšœD˜DKšœN˜NKšœJ˜JK˜Kšœ=˜=Kšœ=˜=KšœG˜GK˜Kšœ;˜;K˜šœ ˜ KšœF˜FKšœa˜aKšœ-˜-K˜—K˜KšœŸ ˜K˜—Kšœ˜ K˜K˜.Kšœ{™{K˜K˜*Kšœ!™!K˜K˜+Kšœ–™–Kšœ2™2K˜K˜*KšœΞ™ΞK˜K˜*KšœΜ™ΜKšœˆ™ˆK™šœ™Kšœ™K™—šœ1™1Kšœ Οrœ%™DK™—šœ2™2KšœS™SKšœ>™>Kšœ™Kšœ  S™_—™2KšœC™CKšœ=™=Kšœp™pKšœ  X™d—™5Kšœ:  œ™F—™+Kšœ™—K™—…—_j£ΐ