WorkerImpl:
-- CEDAR ? --
MONITOR
LOCKS self
USING self: Handle
IMPORTS
AccessControl,
AlpineIdentity,
AlpineImport,
AlpineTransaction,
BasicTime,
ClientMap,
ConversationTable,
ConvertUnsafe,
FileControl,
FilePageMgr,
Lock,
LockControl,
Log,
LogControl,
LogInline,
Process,
Rope,
RPC,
SafeStorage,
SkiPatrolLog,
TransactionMap,
WorkerInternal
EXPORTS
AlpineTransaction,
AlpineTransMgr,
AlpineInternal,
SkiPatrolHooks,
TransactionMap,
WorkerControl,
WorkerInternal
= BEGIN
-- Local abbreviations for types and constants defined in other interfaces.
Conversation: TYPE = AlpineEnvironment.Conversation;
TransID: TYPE = AlpineEnvironment.TransID;
nullTransID: TransID = AlpineEnvironment.nullTransID;
FileStore: TYPE = AlpineEnvironment.FileStore;
FileInstanceHandle: TYPE = AlpineInternal.FileInstanceHandle;
RecordID: TYPE = Log.RecordID;
RecordType: TYPE = Log.RecordType;

InternalProgrammingError: ERROR = CODE;
  -- Raised by the log analysis procs in this module when the log contradicts the expected worker state.

Handle: TYPE = Worker.Handle;
nullHandle: Handle = Worker.nullHandle;

TransObject: PUBLIC TYPE = Worker.Object;
  -- AlpineInternal.TransObject

Refused: PUBLIC ERROR [why: AlpineTransMgr.Refusal] = CODE;
  -- AlpineTransMgr.Refused.

zone: ZONE = SafeStorage.GetSystemZone[];

disableLocalCoordinatorOptimization: BOOL ← FALSE;
  -- When TRUE, worker always forces log as if coordinator were remote.

shortWaitOver: CONDITION;
  -- Client waits here under various unlikely circumstances involving concurrent calls to
  -- CreateWorker, PrepareWorker, and FinishWorker, concurrent calls to worker actions that use
  -- exclusive mode StartWork, and excessive concurrency among other worker actions.
  -- The wait times out after Worker.shortWaitTime milliseconds, allowing the situation to be
  -- reevaluated. Nobody notifies shortWaitOver.

-- Parameters to worker log watchdog:

densityFactor: INT ← 64;
  -- If self.estimatedUpdateCost*densityFactor is larger than the amount of log tied up by this
  -- transaction, then the transaction is considered too expensive to abort (unless it is
  -- inactive or its log use exceeds logWordsUsableByClient).

longTimeSinceLastStartWork: INT ← 1000;
  -- Seconds (about 17 minutes).
  -- If the time since the last StartWork is greater than this, the transaction is considered
  -- inactive.

logWordsUsableByClient: INT;
  -- Absolute upper bound on the amount of log tied up by a transaction.

maxLogWordsForFastRestart: INT = 3000*LONG[AlpineEnvironment.wordsPerPage];
  -- Fast restart means two minutes: two passes over 3000 pages at 50 pages/sec.

maxLogWordsForWorker: INT;
  -- A worker may comfortably keep this much log active. Beyond this bound, the worker may be
  -- aborted (but need not be).
  -- logWordsUsableByClient and maxLogWordsForWorker are initialized during recovery (based on
  -- the log length) and not changed thereafter (except perhaps from the debugger).
-- Worker-related procs
CreateWorker: PUBLIC PROC [
  conversation: Conversation, transID: TransID, coordinator: FileStore] = {
  -- ! Unknown {coordinator, transID}, OperationFailed {busy};
  -- AlpineTransaction.CreateWorker.
  -- Called by a client.
  -- Creates the volatile worker for transID on this server and registers it with the
  -- coordinator. If another process is already creating the same worker, waits for that
  -- process to finish and reports its result instead.
  -- The worker object monitor is not held during the call to AlpineCoordinator.RegisterWorker.
  -- This was designed to allow AlpineCoordinator.Finish to hold the coordinator object monitor
  -- during its calls to workers. (AlpineCoordinator.Finish does not actually work this way).
  self: Handle;
  alreadyRegistered: BOOL ← FALSE;
  whyCallFailed: RPC.CallFailure;
  registerWorkerResult: AlpineTransMgr.RegisterWorkerResult;
  alpineTransMgr: AlpineTransMgrRpcControl.InterfaceRecord;
  conversationOut: Conversation;
  IF TransactionMap.GetHandle[transID] # nullHandle THEN alreadyRegistered ← TRUE
  ELSE self ← ConsWorker[transID, AlpineImport.Register[coordinator]];
  IF alreadyRegistered OR TransactionMap.Register[self].alreadyRegistered THEN {
    -- Drop self on the floor since transID is already registered. We are not allowed to call
    -- RegisterWorker; some other process is doing it. Wait for self.state # unknown, then
    -- return result.
    self ← TransactionMap.GetHandle[transID];
    IF self = nullHandle OR WaitUntilNotUnknown[self].workerNotActive THEN
      ERROR AlpineTransaction.Unknown[transID];
    }
  ELSE {
    -- We registered the volatile worker, so we are responsible to call RegisterWorker and
    -- update volatile and permanent data structures to reflect the result.
    -- We cannot return or error until self.state # unknown, lest another process hang in
    -- WaitUntilNotUnknown.
    conversationOut ← ConversationTable.Fetch[self.coordinator];
    alpineTransMgr ← AlpineImport.GetTransMgrInterface[self.coordinator];
    IF conversationOut = NIL OR alpineTransMgr = NIL THEN GOTO noCoordinator;
    registerWorkerResult ← alpineTransMgr.RegisterWorker[conversationOut, transID !
      RPC.CallFailed => CHECKED { whyCallFailed ← why; GOTO callFailed } ];
    EndCreateWorker[self, IF registerWorkerResult = ok THEN create ELSE abort];
    EXITS
      noCoordinator => {
        -- Every exit first moves the worker out of the unknown state, then reports the failure.
        EndCreateWorker[self, abort];
        ERROR AlpineTransaction.Unknown[coordinator] };
      callFailed => {
        EndCreateWorker[self, abort];
        SELECT whyCallFailed FROM
          timeout, unbound => {
            self.coordinator.TransMgrInterfaceCallFailed[alpineTransMgr];
            alpineTransMgr ← NIL; --must come after CallFailed is unwound
            ERROR AlpineTransaction.Unknown[coordinator] };
          busy =>
            ERROR AlpineTransaction.OperationFailed[busy];
          runtimeProtocol, stubProtocol => ERROR;
          ENDCASE
        }
    };
  };
ConsWorker: PROC [trans: TransID, coordinator: AlpineImport.Handle] RETURNS [Handle] = {
  -- Allocates a new Worker.Object from the system zone for transaction trans.
  -- The object records the coordinator, the current time as timeOfLastStartWork, and whether
  -- the coordinator is to be treated as remote (always TRUE when
  -- disableLocalCoordinatorOptimization is set).
  -- Note: does not establish the locks field of the Worker.Object.
  RETURN [zone.NEW[Worker.Object ← [
    transID: trans,
    timeOfLastStartWork: BasicTime.Now[],
    coordinator: coordinator,
    coordinatorIsRemote: disableLocalCoordinatorOptimization OR
      NOT coordinator.Equal[AlpineIdentity.myAlpineImportHandle]]]];
  };
EndCreateWorker: ENTRY PROC [self: Handle, outcome: {create, abort} ] = {
  -- Moves a newly registered worker out of the unknown state.
  -- outcome = create: writes the WorkerBegin log record, makes the worker active, and gives it
  -- a lock header if it has none.
  -- outcome = abort: marks the worker complete/aborted and removes it from TransactionMap.
  -- On entry self is registered in TransactionMap, in unknown state (no WorkerBegin written);
  -- any other state raises ERROR.
  IF self.state # unknown THEN ERROR;
  IF outcome = create THEN {
    LogWorkerBegin[self];
    self.state ← active;
    IF self.locks = NIL THEN self.locks ← LockControl.ConsTransHeader[self];
    }
  ELSE {
    -- Must change state from unknown since another process in WaitUntilNotUnknown may be
    -- waiting for this event.
    self.state ← complete;
    self.outcome ← abort;
    TransactionMap.Unregister[self];
    };
  };
WaitUntilNotUnknown: ENTRY PROC [self: Handle] RETURNS [workerNotActive: BOOL] = {
  -- Blocks (polling on the shortWaitOver timeout) until self has left the unknown state.
  -- Returns TRUE when the state it then finds is anything other than active.
  UNTIL self.state # unknown DO WAIT shortWaitOver ENDLOOP;
  workerNotActive ← (self.state # active);
  RETURN;
  };
WorkerPrepare: PUBLIC PROC [
  conversation: Conversation, trans: TransID,
  newTrans: AlpineEnvironment.TransID]
  RETURNS [AlpineTransMgr.WorkerState] = {
  -- ! Refused {wrongCoordinator}
  -- AlpineTransMgr.WorkerPrepare.
  -- Called by the coordinator of "trans".
  -- Returns notReady if trans is unknown here; raises Refused[wrongCoordinator] if the caller's
  -- name does not match the worker's coordinator name (case-insensitive comparison).
  -- This procedure is prepared for duplicate calls. Extra callers wait inside BeginPrepare
  -- (on the module-wide condition variable shortWaitOver) until the first caller has left the
  -- preparing state.
  stateAfterPrepare: AlpineEnvironment.WorkerState;
  self: Handle = TransactionMap.GetHandle[trans];
  IF self = nullHandle THEN RETURN [notReady];
  IF NOT Rope.Equal[s1: ClientMap.GetName[conversation], s2: self.coordinator.Name, case: FALSE] THEN ERROR Refused[wrongCoordinator];
  SELECT BeginPrepare[self] FROM
    preparing => NULL; -- normal case
    ready => RETURN [ready]; -- duplicate call
    ENDCASE => RETURN [notReady]; -- duplicate call
  -- We have caused self to enter the preparing state.
  IF newTrans # nullTransID THEN {
    -- Perform specialized version of CreateWorker.
    self.continueWorker ← ConsWorker[newTrans, self.coordinator];
    IF TransactionMap.Register[self.continueWorker].alreadyRegistered THEN
      ERROR;
    -- (ERROR case:) The newTrans is already known to us. This probably represents an error in
    -- the coordinator.
    };
  {
    IF PreventStartWork[self: self, allowDifficulty: normal].abortThisTrans THEN GOTO abort;
    -- Call any external caches now; they are allowed to do read/write pages.
    AccessControl.PhaseOneSpaceAndOwnerChanges[self !
      AccessControl.LockFailed, AccessControl.Unknown => GOTO abort];
    IF PreventStartWork[self: self, allowDifficulty: zero].abortThisTrans THEN GOTO abort;
    -- Any call to StartWork for this transaction will now fail.
    -- There is no work in progress, including waiting lock requests. We therefore have
    -- exclusive access without entering the monitor.
    stateAfterPrepare ← ReadyWorker[self];
    EXITS
      abort => {
        -- Keep aborting waiting lock requests until no StartWork is outstanding.
        DO
          LockControl.AbortWaitingRequests[self];
          IF ShortWaitForNStarts[self] = 0 THEN EXIT;
          ENDLOOP;
        stateAfterPrepare ← notReady;
        };
    };
  EndPrepare[self, stateAfterPrepare];
  -- We have caused self to enter either the ready state or the completing state.
  SELECT stateAfterPrepare FROM
    notReady => NormalAbortWorker[self];
    readOnlyReady => NormalCommitWorker[self];
    ready => NULL;
    ENDCASE;
  RETURN [stateAfterPrepare]
  };
BeginPrepare: ENTRY PROC [self: Handle] RETURNS [Worker.State] = {
  -- Waits until self is neither unknown nor preparing; if it is then active, moves it to
  -- preparing. Returns the resulting state.
  -- result is # unknown, active.
  -- result is = preparing iff this call caused the transition to enter the preparing state.
  WHILE self.state = unknown OR self.state = preparing DO WAIT shortWaitOver ENDLOOP;
  IF self.state = active THEN self.state ← preparing;
  RETURN [self.state];
  };
ReadyWorker: PROC [self: Handle] RETURNS [AlpineEnvironment.WorkerState] = {
  -- Runs commit phase one on the files of self and then upgrades its locks.
  -- Returns readOnlyReady or ready on success (according to the result of
  -- FileControl.CommitPhase1), and notReady if either step raises Lock.Failed or
  -- Lock.TransAborting. A Lock.Failed is also reported to SkiPatrolLog when a lockConflict
  -- notice proc is installed.
  -- Called during normal operation and during recovery.
  -- The caller has exclusive access to the transaction self, is not holding the monitor.
  lockRoutine: Rope.ROPE; -- (name of LockControl routine called)
  howFailed: AlpineEnvironment.LockFailure;
  readOnly: BOOL;
  {readOnly ← FileControl.CommitPhase1[self !
    Lock.Failed => {
      lockRoutine ← "WorkerImpl.ReadyWorker (calling CommitPhase1)";
      howFailed ← why;
      GOTO lockProblem
      };
    Lock.TransAborting => GOTO abort];
  LockControl.UpgradeLocks[self !
    Lock.Failed => {
      lockRoutine ← "WorkerImpl.ReadyWorker (calling UpgradeLocks)";
      howFailed ← why;
      GOTO lockProblem
      };
    Lock.TransAborting => GOTO abort];
  RETURN [IF readOnly THEN readOnlyReady ELSE ready];
  EXITS
    lockProblem => {
      logProc: PROC [SkiPatrolLog.LockConflictInfo];
      IF (logProc ← SkiPatrolLog.notice.lockConflict) # NIL THEN
        logProc[[
          what: howFailed,
          where: lockRoutine,
          transID: self.transID,
          mode: none,
          message: "(sigh) attempted locking mode not known"
          ]];
      RETURN [notReady];
      };
    abort => RETURN [notReady];
  }};
EndPrepare: ENTRY PROC [self: Handle, stateAfterPrepare: AlpineEnvironment.WorkerState] = {
  -- Records the result of the prepare phase in the log and in the worker object.
  --   ready: writes WorkerReady (forced when the coordinator is remote); state becomes ready.
  --   notReady: writes an unforced WorkerCompleting[abort]; state becomes completing/abort.
  --   readOnlyReady: writes an unforced WorkerCompleting[readOnly]; state becomes
  --     completing/commit.
  -- Any other value raises ERROR.
  SELECT stateAfterPrepare FROM
    ready => {
      LogWorkerReady[self: self, force: self.coordinatorIsRemote];
      self.state ← ready;
      };
    notReady, readOnlyReady => {
      aborting: BOOL = (stateAfterPrepare = notReady);
      LogWorkerCompleting[
        self: self, outcome: (IF aborting THEN abort ELSE readOnly), force: FALSE];
      self.state ← completing;
      self.outcome ← IF aborting THEN abort ELSE commit;
      };
    ENDCASE => ERROR;
  };
WorkerFinish: PUBLIC PROC [
  conversation: Conversation, trans: TransID,
  requiredOutcome: AlpineTransMgr.RequiredOutcome] = {
  -- ! Refused {wrongCoordinator, notReady};
  -- AlpineTransMgr.WorkerFinish
  -- Called by the coordinator of "trans".
  -- Returns quietly if trans is no longer registered; otherwise checks that the caller is the
  -- worker's coordinator (case-insensitive name comparison) and finishes the worker with
  -- requiredOutcome via LocalWorkerFinish.
  self: Handle = TransactionMap.GetHandle[trans];
  IF self = nullHandle THEN RETURN; --we must have already finished.
  IF NOT Rope.Equal[
    s1: ClientMap.GetName[conversation], s2: self.coordinator.Name, case: FALSE] THEN
    ERROR Refused[wrongCoordinator];
  IF requiredOutcome NOT IN [abort .. commit] THEN ERROR;
  LocalWorkerFinish[self, requiredOutcome, clientRequest];
  };
AbortUnilaterally: PUBLIC PROC [
  self: Handle, why: TransactionMap.AbortReason] = {
  -- TransactionMap.AbortUnilaterally
  -- Aborts self without being asked to by its coordinator. The why argument is not used in
  -- this body.
  -- Note some duplicate logic with LocalWorkerFinish.
  SELECT BeginFinish[self, abort, returnIfPreparing] FROM
    active => ERROR Refused [notReady];
    preparing => {
      -- We cannot determine self's outcome, and are not responsible for making it complete,
      -- but we are responsible for ensuring that it leaves the preparing state. Otherwise it
      -- might stay there forever, waiting in the lock manager. This loop is really necessary,
      -- since neither process involved is synchronized by self's or Lock's monitors.
      DO
        LockControl.AbortWaitingRequests[self];
        IF ShortWaitForState[self] # preparing THEN RETURN;
        ENDLOOP;
      };
    completing => {
      -- We have caused self to enter the completing state, self.outcome = abort.
      -- Hence we are responsible for making it complete.
      IF PreventStartWork[self, zero].abortThisTrans THEN {
        -- Some work is still in progress. Wait for it to go away.
        DO
          LockControl.AbortWaitingRequests[self];
          IF ShortWaitForNStarts[self] = 0 THEN EXIT;
          ENDLOOP;
        };
      NormalAbortWorker[self];
      -- Temporary kludge, effective since remote coordinators aren't used now.
      [] ← AlpineTransaction.Finish[
        AlpineIdentity.myLocalConversation, self.transID, abort, FALSE];
      RETURN;
      };
    ENDCASE => RETURN;
  };
ShortWaitForState: ENTRY PROC [self: Handle] RETURNS [Worker.State] = INLINE {
  -- Performs one WAIT on shortWaitOver, then reports the state self is in afterwards.
  stateAfterWait: Worker.State;
  WAIT shortWaitOver;
  stateAfterWait ← self.state;
  RETURN [stateAfterWait];
  };
ShortWaitForNStarts: ENTRY PROC [self: Handle]
  RETURNS [nStarts: [0..Worker.maxStarts]] = INLINE {
  -- Performs one WAIT on shortWaitOver, then reports self.nStarts as it stands afterwards.
  WAIT shortWaitOver;
  nStarts ← self.nStarts;
  RETURN;
  };
LocalWorkerFinish: PROC [
  self: Handle, requiredOutcome: AlpineTransMgr.RequiredOutcome,
  whyAbort: TransactionMap.AbortReason] = {
  -- Drives self to completion with requiredOutcome, waiting first if it is still preparing.
  -- Raises Refused[notReady] when BeginFinish reports the worker as active. Does nothing when
  -- some other caller already owns the completion. The whyAbort argument is not used in this
  -- body.
  SELECT BeginFinish[self, requiredOutcome, waitIfPreparing] FROM
    active => ERROR Refused [notReady];
    completing => {
      -- We have caused self to enter the completing state, self.outcome = requiredOutcome.
      -- Hence we are responsible for making it complete.
      IF requiredOutcome = abort THEN {
        IF PreventStartWork[self, zero].abortThisTrans THEN {
          -- Some work is still in progress. Wait for it to go away.
          DO
            LockControl.AbortWaitingRequests[self];
            IF ShortWaitForNStarts[self] = 0 THEN EXIT;
            ENDLOOP;
          };
        NormalAbortWorker[self]
        }
      ELSE NormalCommitWorker[self];
      };
    ENDCASE => RETURN;
  };
BeginFinish: ENTRY PROC [self: Handle,
  requiredOutcome: AlpineTransMgr.RequiredOutcome,
  option: {returnIfPreparing, waitIfPreparing}]
  RETURNS [Worker.State] = {
  -- Waits until self is in a state from which finishing can be decided, then, when the
  -- required outcome is permitted, logs WorkerCompleting and moves self to completing.
  -- result is # unknown, ready.
  -- result is = active only if requiredOutcome = commit and transaction was unknown or active
  -- on entry. (Caller is in error.)
  -- result is = preparing only if option = returnIfPreparing and transaction was preparing on
  -- entry. (Caller is trying to perform unilateral abort.)
  -- result is = completing iff this call caused the transaction to enter the completing
  -- state. (Caller is now responsible for making transaction complete.)
  DO
    SELECT self.state FROM
      unknown, completing => WAIT shortWaitOver;
      preparing =>
        IF option = waitIfPreparing THEN
          WAIT shortWaitOver
        ELSE RETURN [preparing];
      ENDCASE => EXIT;
    ENDLOOP;
  IF requiredOutcome = abort AND (self.state = active OR self.state = ready) THEN {
    -- Abort is allowed from active or ready; the log record is not forced.
    LogWorkerCompleting[self, abort, FALSE];
    self.state ← completing;
    self.outcome ← abort;
    }
  ELSE
    IF requiredOutcome = commit AND self.state = ready THEN {
      -- Commit is allowed only from ready; the record is forced when the coordinator is remote.
      LogWorkerCompleting[self, commit, self.coordinatorIsRemote];
      self.state ← completing;
      self.outcome ← commit;
      };
  RETURN [self.state];
  };
NormalCommitWorker: PROC [self: Handle] = {
  -- Commit path used during normal operation (not recovery).
  -- Expects self.state = completing, self.outcome = commit.
  -- Call any external caches now; they need to know about the commit.
  AccessControl.PhaseTwoOrAbortSpaceAndOwnerChanges[self, commit];
  CommitWorker[self];
  };
CommitWorker: PROC [self: Handle] = {
  -- Runs commit phase two on the files of self, then disposes of its locks: they are handed to
  -- self.continueWorker when there is one (which is then made active through
  -- EndCreateWorker), and released otherwise.
  -- Called during normal operation and during recovery.
  FileControl.CommitPhase2[trans: self, newTrans: self.continueWorker];
  IF self.continueWorker # nullHandle THEN {
    LockControl.TransferLocks[from: self, to: self.continueWorker];
    self.continueWorker.locks ← self.locks;
    self.locks ← NIL;
    EndCreateWorker[self.continueWorker, create];
    self.continueWorker ← nullHandle;
    }
  ELSE {
    LockControl.ReleaseLocks[self];
    self.locks ← NIL;
    };
  };
NormalAbortWorker: PROC [self: Handle] = {
  -- Abort path used during normal operation (not recovery).
  -- Expects self.state = completing, self.outcome = abort.
  -- Call any external caches now; they need to know about the abort.
  AccessControl.PhaseTwoOrAbortSpaceAndOwnerChanges[self, abort];
  AbortWorker[self];
  };
AbortWorker: PROC [self: Handle] = {
  -- Reports the abort to SkiPatrolLog (when an abortTransaction notice proc is installed),
  -- aborts the file work of self, releases its locks, and aborts any continuation worker that
  -- was created for it.
  -- Called during normal operation and during recovery.
  logProc: PROC [SkiPatrolLog.TransactionAbortInfo];
  IF (logProc ← SkiPatrolLog.notice.abortTransaction) # NIL THEN
    logProc[[
      transID: self.transID,
      where: "WorkerImpl.AbortWorker",
      locks: self.locks,
      why: lockInfo,
      message: ""
      ]];
  FileControl.Abort[trans: self];
  LockControl.ReleaseLocks[self];
  self.locks ← NIL;
  IF self.continueWorker # nullHandle THEN {
    EndCreateWorker[self.continueWorker, abort];
    self.continueWorker ← nullHandle;
    };
  };
-- Log writing and analysis; checkpoint proc
LogWorkerBegin:
PROC [self: Handle] = {
-- Writes the workerBegin log record for self and remembers its record ID in self.beginRecord.
-- The record body is a Worker.BeginLogRep holding the coordinator's name.
rec: Worker.BeginLogRep ← [];
-- Copy the coordinator name (a rope) into the record's coordinator field.
ConvertUnsafe.AppendRope[
to: LOOPHOLE[LONG[@rec.coordinator]], from: self.coordinator.Name];
-- The length written is the size of a TEXT of rec.length characters.
-- NOTE(review): presumably this trims the record to the name actually stored; confirm against
-- the definition of Worker.BeginLogRep.
[thisRecord: self.beginRecord] ← Log.Write[self, workerBegin,
[base: @rec, length: TEXT[rec.length].SIZE]];
};
LogWorkerReady: PROC [self: Handle, force: BOOL] = {
  -- Writes a workerReady record with an empty body for self; force is passed on to Log.Write.
  -- The record ID that Log.Write returns is discarded.
  [] ← Log.Write[self, workerReady, Log.nullBlock, force];
  };
LogWorkerCompleting: PROC [
  self: Handle, outcome: AlpineInternal.WorkerOutcome, force: BOOL] = {
  -- Writes a workerCompleting record for self whose body carries the given outcome.
  -- force is passed on to Log.Write; the returned record ID is discarded.
  completingRep: Worker.CompletingLogRep ← [outcome: outcome];
  [] ← Log.Write[
    self, workerCompleting,
    [base: @completingRep, length: Worker.CompletingLogRep.SIZE], force];
  };
LogWorkerComplete: PROC [self: Handle] = {
  -- Writes a workerComplete record with an empty body for self.
  -- No force argument is supplied to Log.Write; the returned record ID is discarded.
  [] ← Log.Write[self, workerComplete, Log.nullBlock];
  };
AnalyzeWorkerBegin: PROC [record: RecordID, type: RecordType, trans: TransID] = {
  -- Analysis proc for workerBegin records (registered at module start).
  -- Rebuilds and registers the volatile worker for trans from the logged coordinator name,
  -- leaving it active for recovery purposes with no work allowed.
  coordinatorName: AlpineEnvironment.FileStore;
  {
    -- Read the record body and turn the stored name back into a FileStore.
    beginRep: Worker.BeginLogRep ← [];
    readStatus: Log.ReadProcStatus;
    [status: readStatus] ← Log.ReadForRecovery[
      thisRecord: record, to: [base: @beginRep, length: Worker.BeginLogRep.SIZE]];
    IF readStatus = destinationFull THEN ERROR InternalProgrammingError;
    coordinatorName ← ConvertUnsafe.ToRope[from: LOOPHOLE[LONG[@beginRep.coordinator]]];
    };
  {
    -- Build the worker; a transaction that is already registered is a programming error.
    worker: Handle ← ConsWorker[trans, AlpineImport.Register[coordinatorName]];
    IF TransactionMap.Register[worker].alreadyRegistered THEN ERROR InternalProgrammingError;
    worker.beginRecord ← record;
    worker.locks ← LockControl.ConsTransHeader[worker];
    worker.allowableDifficulty ← zero;
    worker.stateDuringRecovery ← active;
    };
  };
AnalyzeWorkerReady: PROC [record: RecordID, type: RecordType, trans: TransID] = {
  -- Analysis proc for workerReady records: advances the recovery state of trans from active
  -- to ready. Records for transactions that are not registered are ignored; any other
  -- starting state is a programming error.
  worker: Handle = TransactionMap.GetHandle[trans];
  IF worker # nullHandle THEN {
    IF worker.stateDuringRecovery # active THEN ERROR InternalProgrammingError;
    worker.stateDuringRecovery ← ready;
    };
  };
AnalyzeWorkerCompleting: PROC [record: RecordID, type: RecordType, trans: TransID] = {
  -- Analysis proc for workerCompleting records: reads the logged outcome and sets the
  -- recovery state of trans to aborted or committed accordingly. Records for transactions
  -- that are not registered are ignored (after the record has been read).
  loggedOutcome: AlpineInternal.WorkerOutcome;
  {
    -- Fetch the outcome from the record body; anything but a normal read is an error.
    completingRep: Worker.CompletingLogRep;
    readStatus: Log.ReadProcStatus;
    [status: readStatus] ← Log.ReadForRecovery[
      thisRecord: record, to: [base: @completingRep, length: Worker.CompletingLogRep.SIZE]];
    IF readStatus # normal THEN ERROR InternalProgrammingError;
    loggedOutcome ← completingRep.outcome;
    };
  {
    worker: Handle = TransactionMap.GetHandle[trans];
    IF worker = nullHandle THEN RETURN;
    -- Only an active or ready worker may start completing.
    IF worker.stateDuringRecovery NOT IN [active .. ready] THEN
      ERROR InternalProgrammingError;
    worker.stateDuringRecovery ← IF loggedOutcome = abort THEN aborted ELSE committed;
    };
  };
AnalyzeWorkerComplete: PROC [record: RecordID, type: RecordType, trans: TransID] = {
  -- Analysis proc for workerComplete records: the transaction finished before the crash, so
  -- its outcome is recorded, its locks are released, and it is removed from TransactionMap.
  -- Records for transactions that are not registered are ignored.
  worker: Handle = TransactionMap.GetHandle[trans];
  IF worker = nullHandle THEN RETURN;
  SELECT worker.stateDuringRecovery FROM
    committed => worker.outcome ← commit;
    aborted => worker.outcome ← abort;
    ENDCASE => ERROR;  -- this includes active and ready: a complete record needs a completing record first
  LockControl.ReleaseLocks[worker];
  worker.locks ← NIL;
  worker.state ← complete;
  TransactionMap.Unregister[worker];
  };
CallAfterAnalysisPass: PUBLIC PROC [] = {
  -- WorkerControl.CallAfterAnalysisPass
  -- Sets the two log-size bounds used by the log watchdog, then marks as aborted every worker
  -- that the analysis pass left in the active state.
  AbortActiveWorker: PROC [self: Handle] RETURNS [stop: BOOL] = {
    -- Enumeration proc: logs an unforced WorkerCompleting[abort] for a worker that is still
    -- active. Workers that are ready, committed or aborted are left alone.
    SELECT self.stateDuringRecovery FROM
      active => {
        LogWorkerCompleting[self: self, outcome: abort, force: FALSE];
        self.stateDuringRecovery ← aborted;
        };
      ready, committed, aborted => NULL;
      ENDCASE => ERROR;
    RETURN [stop: FALSE];
    };
  logWordsUsableByClient ← LogControl.WordsUsableByClient[];
  maxLogWordsForWorker ← MIN[maxLogWordsForFastRestart, logWordsUsableByClient];
  TransactionMap.LockedEnumerate[AbortActiveWorker];
  };
RecoverWorkerBegin: PROC [
  record: RecordID, type: RecordType, trans: Handle, outcome: Log.TransState] = {
  -- Recovery proc for workerBegin records: moves trans from unknown to active.
  -- Any other starting state raises ERROR.
  IF trans.state = unknown THEN trans.state ← active ELSE ERROR;
  };
RecoverWorkerReady: PROC [
  record: RecordID, type: RecordType, trans: Handle, outcome: Log.TransState] = {
  -- Recovery proc for workerReady records: moves trans from active to ready.
  -- Any other starting state raises ERROR.
  IF trans.state = active THEN trans.state ← ready ELSE ERROR;
  };
RecoverWorkerCompleting: PROC [
  record: RecordID, type: RecordType, trans: Handle, outcome: Log.TransState] = {
  -- Recovery proc for workerCompleting records. Redoes the completion of trans according to
  -- the outcome determined for it:
  --   committed: re-runs ReadyWorker (which must not answer notReady), then commits.
  --   aborted: aborts.
  --   ready: re-runs ReadyWorker, which must answer ready; the worker stays ready.
  -- Any other outcome raises ERROR, with trans.state left at completing.
  trans.state ← completing;
  SELECT outcome FROM
    committed => {
      trans.state ← ready;
      IF ReadyWorker[trans] = notReady THEN ERROR;
      trans.state ← completing; trans.outcome ← commit;
      -- CommitWorker has no results, so it is called as a plain statement, as in
      -- NormalCommitWorker.
      CommitWorker[trans];
      };
    aborted => {
      trans.state ← completing; trans.outcome ← abort;
      AbortWorker[trans];
      };
    ready => {
      trans.state ← ready;
      IF ReadyWorker[trans] # ready THEN ERROR;
      };
    ENDCASE => ERROR;
  };
CallAfterUpdatePass: PUBLIC PROC [] = {
  -- WorkerControl.CallAfterUpdatePass
  -- Does not return until every registered worker is in the ready or fpmComplete state.
  -- The check is repeated once a second.
  noActiveWorkers: BOOL;
  CheckForActiveWorkers: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
    -- Enumeration proc: clears noActiveWorkers on meeting a worker in any other state.
    SELECT self.state FROM
      ready, fpmComplete => NULL;
      ENDCASE => noActiveWorkers ← FALSE;
    RETURN [stop: FALSE];
    };
  DO
    noActiveWorkers ← TRUE;
    TransactionMap.LockedEnumerate[CheckForActiveWorkers];
    IF noActiveWorkers THEN RETURN ELSE Process.Pause[Process.SecondsToTicks[1]];
    ENDLOOP;
  };
WorkerCheckpointProc: PROC [] RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
  -- Checkpoint proc registered with LogControl at module start.
  -- Finishes off workers whose updates are complete (forcing out the file page manager first,
  -- if any such worker exists) and returns, for both results, the oldest begin record among
  -- the workers that remain. Also wakes the log watchdog when the span from that record to the
  -- next log write exceeds maxLogWordsForWorker.
  sawFpmComplete: BOOL ← FALSE;
  ExamineWorkerBeforeForceOut: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
    -- First pass: mark each fpmComplete worker as being forced out.
    IF self.state = fpmComplete THEN {
      self.state ← fpmCompleteBeingForcedOut;
      sawFpmComplete ← TRUE;
      };
    RETURN [stop: FALSE];
    };
  oldest: Log.RecordID ← Log.lastRecordID;
  ExamineWorkerAfterForceOut: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
    -- Second pass: complete the workers marked in the first pass; every other worker that
    -- has left the unknown state contributes its begin record to the minimum.
    SELECT self.state FROM
      fpmCompleteBeingForcedOut => {
        LogWorkerComplete[self];
        TransactionMap.Unregister[self];
        self.state ← complete;
        };
      unknown => NULL;
      ENDCASE => oldest ← LogInline.Min[self.beginRecord, oldest];
    RETURN [stop: FALSE];
    };
  TransactionMap.UnlockedEnumerate[ExamineWorkerBeforeForceOut];
  IF sawFpmComplete THEN FilePageMgr.ForceOutEverything[];
  TransactionMap.UnlockedEnumerate[ExamineWorkerAfterForceOut];
  IF LogInline.WordsFromSubtract[LogControl.RecordIDOfNextWrite[], oldest] > maxLogWordsForWorker
    THEN WorkerInternal.NotifyLogWatchdog[];
  RETURN [oldest, oldest];
  };
WorkerLogWatchdogProcess: PUBLIC PROC [] = {
  -- Body of the log watchdog; it loops forever.
  -- Each round examines one worker: the one remembered from the previous round if it is still
  -- registered, otherwise the oldest active worker. TestForAbort decides whether it is
  -- aborted, forgotten, or kept under watch. The process sleeps in
  -- WorkerInternal.WaitForNotify when there is nothing to examine or the worker was spared.
  watchedID: TransID ← nullTransID;
  candidate: Handle ← NIL;
  mustWait: BOOL ← FALSE;
  DO
    IF watchedID # nullTransID THEN candidate ← TransactionMap.GetHandle[watchedID];
    IF candidate = NIL THEN candidate ← GetOldestActiveWorker[];
    IF candidate = NIL THEN { watchedID ← nullTransID; mustWait ← TRUE }
    ELSE
      SELECT TestForAbort[candidate] FROM
        notActive => watchedID ← nullTransID;
        abort => {
          noticeProc: PROC [SkiPatrolLog.TransactionAbortInfo];
          IF (noticeProc ← SkiPatrolLog.notice.abortTransaction) # NIL THEN
            noticeProc[[
              transID: candidate.transID,
              where: "WorkerImpl.WorkerLogWatchdogProcess",
              why: watchDog,
              message: "had old uncommitted updates"
              ]];
          AbortUnilaterally[candidate, oldUncommittedUpdates];
          watchedID ← nullTransID;
          };
        dontAbort => { watchedID ← candidate.transID; mustWait ← TRUE };
        ENDCASE => ERROR;
    candidate ← NIL; -- drop the reference so the worker can be garbage-collected
    IF mustWait THEN { WorkerInternal.WaitForNotify[]; mustWait ← FALSE };
    ENDLOOP;
  };
TestForAbort: PUBLIC ENTRY SAFE PROC [self: Handle]
  RETURNS [SkiPatrolHooks.ShouldAbort] = TRUSTED {
  -- Decides whether the log watchdog should abort self.
  --   notActive: self is not in the active state.
  --   dontAbort: self ties up no more than maxLogWordsForWorker of log, or it has done a
  --     StartWork recently and its estimated update cost (scaled by densityFactor) exceeds
  --     the log it ties up.
  --   abort: self ties up more than logWordsUsableByClient, or neither excuse applies.
  logWordsForWorker: INT =
    LogInline.WordsFromSubtract[LogControl.RecordIDOfNextWrite[], self.beginRecord];
  IF self.state # active THEN RETURN [notActive];
  IF logWordsForWorker <= maxLogWordsForWorker THEN RETURN [dontAbort];
  IF logWordsForWorker > logWordsUsableByClient THEN RETURN [abort];
  {
    recentlyActive: BOOL =
      BasicTime.Period[from: self.timeOfLastStartWork, to: BasicTime.Now[]] <
        longTimeSinceLastStartWork;
    RETURN [
      IF recentlyActive AND self.estimatedUpdateCost*densityFactor > logWordsForWorker
        THEN dontAbort ELSE abort];
    };
  };
InactiveWorker: PUBLIC ENTRY SAFE PROC [self: Handle] RETURNS [BOOLEAN] ~ CHECKED {
  -- "Inactive" = haven't done a "StartWork" for some number of seconds.
  -- Returns TRUE when more than longTimeSinceLastStartWork seconds have passed since
  -- self.timeOfLastStartWork.
  RETURN [BasicTime.Period[from: self.timeOfLastStartWork, to: BasicTime.Now[]] > longTimeSinceLastStartWork];
  };
GetOldestActiveWorker: PROC [] RETURNS [Handle] = {
  -- Enumerates TransactionMap and returns an active worker that no other active worker
  -- precedes by begin record, or NIL when no worker is active.
  oldest: Handle ← NIL;
  ExamineWorker: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
    -- Adopt self when it is active and the current choice does not compare less than it.
    IF self.state = active THEN {
      IF oldest = NIL OR LogInline.Compare[oldest.beginRecord, self.beginRecord] # less THEN
        oldest ← self
      };
    RETURN [stop: FALSE];
    };
  TransactionMap.UnlockedEnumerate[ExamineWorker];
  RETURN [oldest];
  };
-- Procedures exported to TransactionMap, and some related procedures.
StartWork: PUBLIC ENTRY PROC [self: Handle, difficulty: AlpineInternal.WorkLevel]
  RETURNS [canWork: BOOL] = {
  -- Asks permission to begin a unit of work of the given difficulty on self.
  -- Returns FALSE when difficulty exceeds self.allowableDifficulty, either at once or after
  -- waiting for a free start slot. On success, counts the start and stamps
  -- timeOfLastStartWork. Raises ERROR when permission would be granted but self is neither
  -- active nor preparing.
  IF difficulty > self.allowableDifficulty THEN RETURN [canWork: FALSE];
  IF self.state NOT IN [active .. preparing] THEN ERROR;
  UNTIL self.nStarts # Worker.maxStarts DO
    -- All start slots are in use; wait, then recheck whether work is still allowed.
    WAIT shortWaitOver;
    IF difficulty > self.allowableDifficulty THEN RETURN [canWork: FALSE];
    ENDLOOP;
  self.nStarts ← self.nStarts + 1;
  self.timeOfLastStartWork ← BasicTime.Now[];
  RETURN [canWork: TRUE];
  };
StopWork: PUBLIC ENTRY PROC [self: Handle, estimatedUpdateCost: INT] = {
  -- Ends a unit of work begun with StartWork: adds estimatedUpdateCost to the running total
  -- kept in self and gives back one start slot.
  IF self.nStarts = 0 THEN ERROR; -- impossible if clients are correctly coded
  self.estimatedUpdateCost ← self.estimatedUpdateCost + estimatedUpdateCost;
  self.nStarts ← self.nStarts - 1;
  };
PreventStartWork: ENTRY PROC [self: Handle, allowDifficulty: Worker.Difficulty]
  RETURNS [abortThisTrans: BOOL] = {
  -- Lowers the level of work that StartWork will accept for self.
  -- Return abortThisTrans ~ TRUE if work is in progress; in that case all further work is
  -- refused (allowableDifficulty becomes zero). Otherwise allowableDifficulty becomes
  -- allowDifficulty and the result is FALSE.
  IF self.nStarts # 0 THEN {
    self.allowableDifficulty ← zero;
    RETURN [abortThisTrans: TRUE];
    }
  ELSE {
    self.allowableDifficulty ← allowDifficulty;
    RETURN [abortThisTrans: FALSE];
    };
  };
GetTimeOfLastStartWork: PUBLIC ENTRY PROC [self: Handle] RETURNS [BasicTime.GMT] = {
  -- Reads self.timeOfLastStartWork under the monitor lock. The field is set when the worker
  -- is constructed and again by each successful StartWork.
  RETURN [self.timeOfLastStartWork];
  };
GetEstimatedUpdateCost: PUBLIC ENTRY PROC [self: Handle] RETURNS [INT] = {
  -- Reads, under the monitor lock, the running total of the costs reported to StopWork.
  RETURN [self.estimatedUpdateCost];
  };
GetTransID: PUBLIC PROC [self: Handle] RETURNS [TransID] = {
  -- Returns the transaction ID of self.
  -- Not ENTRY because the transID field is immutable.
  RETURN [self.transID];
  };
GetFileInstanceList: PUBLIC PROC [self: Handle]
  RETURNS [fileInstanceList: FileInstanceHandle] = {
  -- Returns the file instance list stored in self.
  -- Not ENTRY because self.fileInstanceList "belongs" to the file manager.
  RETURN [self.fileInstanceList];
  };
SetFileInstanceList: PUBLIC PROC [self: Handle, fileInstanceList: FileInstanceHandle] = {
  -- Stores fileInstanceList in self.
  -- Not ENTRY because self.fileInstanceList "belongs" to the file manager.
  self.fileInstanceList ← fileInstanceList;
  };
GetLockHeader: PUBLIC PROC [self: Handle]
  RETURNS [lockHeader: AlpineInternal.LockTransHeaderHandle] = {
  -- Returns the lock header stored in self. It is NIL once the locks have been released or
  -- transferred.
  -- Not ENTRY because self.locks "belongs" to the lock manager.
  RETURN[self.locks];
  };
EnableAlpineWheel: PUBLIC ENTRY PROC [
  self: Handle, conversation: Conversation, enable: BOOL] = {
  -- Adds the conversation's ID to self.enabledWheelList (enable = TRUE) or removes it from
  -- the list (enable = FALSE). Enabling an ID that is already listed and disabling one that
  -- is not listed are no-ops, as is any call made on the local conversation.
  found: LIST OF RPC.ConversationID;
  id: RPC.ConversationID = RPC.GetConversationID[conversation];
  IF id = AlpineIdentity.myLocalConversationID THEN RETURN;
  -- Locate the list cell holding id; found is NIL when id is not listed.
  FOR found ← self.enabledWheelList, found.rest UNTIL found = NIL DO
    IF found.first = id THEN EXIT;
    ENDLOOP;
  IF enable THEN {
    -- Push a new cell on the front unless id is already present.
    IF found = NIL THEN
      self.enabledWheelList ← zone.CONS[first: id, rest: self.enabledWheelList];
    }
  ELSE IF found # NIL THEN {
    -- Unlink the cell: it is either the list head or the successor of some earlier cell.
    IF self.enabledWheelList = found THEN self.enabledWheelList ← found.rest
    ELSE
      FOR prev: LIST OF RPC.ConversationID ← self.enabledWheelList, prev.rest
        UNTIL prev = NIL DO
        IF prev.rest = found THEN { prev.rest ← found.rest; EXIT };
        ENDLOOP;
    };
  };
IsAlpineWheel: PUBLIC ENTRY PROC [self: Handle, conversation: Conversation]
  RETURNS [enabled: BOOL] = {
  -- TRUE when the conversation is the local conversation or when its ID appears in
  -- self.enabledWheelList.
  id: RPC.ConversationID = RPC.GetConversationID[conversation];
  IF id = AlpineIdentity.myLocalConversationID THEN RETURN [enabled: TRUE];
  enabled ← FALSE;
  -- Walk the list, stopping at the first cell that matches.
  FOR cell: LIST OF RPC.ConversationID ← self.enabledWheelList, cell.rest
    UNTIL cell = NIL OR enabled DO
    enabled ← (cell.first = id);
    ENDLOOP;
  RETURN;
  };
StateDuringRecovery: PUBLIC ENTRY PROC [self: Handle]
  RETURNS [TransactionMap.TransState] = {
  -- Translates self.stateDuringRecovery into the TransactionMap view of it.
  -- Only ready, committed and aborted can be reported; any other value (active included)
  -- raises ERROR.
  SELECT self.stateDuringRecovery FROM
    aborted => RETURN [aborted];
    committed => RETURN [committed];
    ready => RETURN [ready];
    ENDCASE => ERROR;
  };
IsCommitted: PUBLIC ENTRY PROC [self: Handle] RETURNS [BOOL] = {
  -- TRUE only when self is completing with outcome commit; FALSE while it is active or
  -- preparing, or completing with another outcome. Any other state raises ERROR.
  SELECT self.state FROM
    completing => RETURN [self.outcome = commit];
    active, preparing => RETURN [FALSE];
    ENDCASE => ERROR; -- locks are supposed to prevent this
  };
AssertUpdatesAreComplete: PUBLIC ENTRY PROC [self: Handle] = {
  -- Moves self from completing to fpmComplete. Any other starting state raises ERROR.
  IF self.state = completing THEN self.state ← fpmComplete ELSE ERROR;
  };
TransIDFromTransHandle: PUBLIC SAFE PROC [h: AlpineInternal.TransHandle]
  RETURNS [AlpineEnvironment.TransID] ~ CHECKED {
  -- For SkiPatrolHooks.
  -- Returns the transaction ID held in the worker object behind h, or nullTransID when h is
  -- NIL.
  wh: Worker.Handle;
  IF h = NIL THEN
    RETURN [AlpineEnvironment.nullTransID]
  ELSE {
    -- The opaque handle is viewed as a Worker.Handle; the LOOPHOLE requires TRUSTED.
    TRUSTED {wh ← LOOPHOLE[h]};
    RETURN [wh.transID]
    }
  };
-- Module initialization.
-- Register the analysis procs, one for each worker log record type.
LogControl.RegisterAnalysisProc[workerBegin, AnalyzeWorkerBegin];
LogControl.RegisterAnalysisProc[workerReady, AnalyzeWorkerReady];
LogControl.RegisterAnalysisProc[workerCompleting, AnalyzeWorkerCompleting];
LogControl.RegisterAnalysisProc[workerComplete, AnalyzeWorkerComplete];
-- Register the recovery procs. No recovery proc is registered for workerComplete.
Log.RegisterRecoveryProc[workerBegin, RecoverWorkerBegin];
Log.RegisterRecoveryProc[workerReady, RecoverWorkerReady];
Log.RegisterRecoveryProc[workerCompleting, RecoverWorkerCompleting];
LogControl.RegisterCheckpointProc[WorkerCheckpointProc];
-- Every WAIT on shortWaitOver times out after Worker.shortWaitTime milliseconds.
Process.SetTimeout[@shortWaitOver, Process.MsecToTicks[Worker.shortWaitTime]];
-- NOTE(review): with aborts enabled on shortWaitOver, presumably a WAIT inside the ENTRY procs
-- of this module can be aborted; those procs show no UNWIND handling. Confirm this is intended.
Process.EnableAborts[@shortWaitOver];
END.--WorkerImpl