-- WorkerImpl.mesa
-- Implements worker in two-phase commit: create, prepare, finish (commit or abort.)
-- Last edited by
--   MBrown on January 30, 1984 12:12:43 pm PST
--   Kolling on June 6, 1983 12:05 pm



-- NOTES:

-- It seems poor for CreateWorker to ERROR Unknown[coordinator] in the case that the
--coordinator is there, but refusing new RPC work (CedarRPC.CallFailed[busy].)
-- It is also poor to land in debugger due to RPC.CallFailed[protocolError].
-- The implementation of AbortUnilaterally should be improved by making a remote
--call to the coordinator, if the coordinator is remote.


  DIRECTORY
    AccessControl,
    AlpineEnvironment,
    AlpineIdentity,
    AlpineImport,
    AlpineInternal,
    AlpineTransaction,
    AlpineTransMgr,
    AlpineTransMgrRpcControl,
    BasicTime,
    ClientMap USING [GetName],
    ConversationTable,
    ConvertUnsafe,
    FileControl,
    FilePageMgr,
    Lock,
    LockControl,
    Log,
    LogControl,
    LogInline,
    Process,
    Rope,
    RPC,
    SafeStorage,
    TransactionMap,
    Worker,
    WorkerControl,
    WorkerInternal;

WorkerImpl: MONITOR LOCKS self USING self: Handle
  IMPORTS
    AccessControl,
    AlpineIdentity,
    AlpineImport,
    AlpineTransaction,
    BasicTime,
    ClientMap,
    ConversationTable,
    ConvertUnsafe,
    FileControl,
    FilePageMgr,
    Lock,
    LockControl,
    Log,
    LogControl,
    LogInline,
    Process,
    Rope,
    RPC,
    SafeStorage,
    TransactionMap,
    WorkerInternal
  EXPORTS
    AlpineTransaction,
    AlpineTransMgr,
    AlpineInternal,
    TransactionMap,
    WorkerControl,
    WorkerInternal
  = BEGIN
  Conversation: TYPE = AlpineEnvironment.Conversation;
  TransID: TYPE = AlpineEnvironment.TransID;
  nullTransID: TransID = AlpineEnvironment.nullTransID;
  FileStore: TYPE = AlpineEnvironment.FileStore;
  FileInstanceHandle: TYPE = AlpineInternal.FileInstanceHandle;
  RecordID: TYPE = Log.RecordID;
  RecordType: TYPE = Log.RecordType;
  InternalProgrammingError: ERROR = CODE;

  Handle: TYPE = Worker.Handle;
  nullHandle: Handle = Worker.nullHandle;

  TransObject: PUBLIC TYPE = Worker.Object;
    -- AlpineInternal.TransObject

  Refused: PUBLIC ERROR [why: AlpineTransMgr.Refusal] = CODE;
    --AlpineTransMgr.Refused.

  zone: ZONE = SafeStorage.GetSystemZone[];

  disableLocalCoordinatorOptimization: BOOL ← FALSE;
    -- When TRUE, worker always forces log as if coordinator were remote.

  shortWaitOver: CONDITION;
    -- Client waits here under various unlikely circumstances involving concurrent calls
    --to CreateWorker, PrepareWorker, and FinishWorker, concurrent calls to worker actions
    --that use exclusive mode StartWork, and excessive concurrency among other worker
    --actions.  The wait times out after Worker.shortWaitTime milliseconds, allowing the
    --situation to be reevaluated.  Nobody notifies shortWaitOver.

  -- Parameters to worker log watchdog:

  densityFactor: INT ← 64;
    -- if self.estimatedUpdateCost*densityFactor is larger than the amount of log tied up
    --by this transaction, then the transaction is considered too expensive to abort
    --(unless it is inactive or its log use exceeeds logWordsUsableByClient).
  longTimeSinceLastStartWork: INT ← 1000; -- seconds (about 15 minutes)
    -- if the time since last start work is greater than this, the the transaction is
    --considered inactive.
  logWordsUsableByClient: INT;
    -- absolute upper bound on  of log tied up by a transaction.
  maxLogWordsForFastRestart: INT = 3000*LONG[AlpineEnvironment.wordsPerPage];
    -- fast restart means two minutes: two passes over 3000 pages at 50 pages/sec.
  maxLogWordsForWorker: INT;
    -- A worker may comfortably keep this much log active.  Beyond this bound, the
    --worker may be aborted (but need not be).
    -- These two values are initialized during recovery (based on the log length) and not
    --changed thereafter (except perhaps from the debugger).

  CreateWorker: PUBLIC PROC [
    conversation: Conversation, transID: TransID, coordinator: FileStore] = {
    -- ! Unknown {coordinator, transID};
    -- AlpineTransaction.CreateWorker.
    -- Called by a client.
    -- The worker object monitor is not held during the call to
    --AlpineCoordinator.RegisterWorker.  This was designed to allow
    --AlpineCoordinator.Finish to hold the coordinator object monitor during its calls
    --to workers.  (AlpineCoordinator.Finish does not actually work this way).
    self: Handle;
    alreadyRegistered: BOOL ← FALSE;
    whyCallFailed: RPC.CallFailure;
    registerWorkerResult: AlpineTransMgr.RegisterWorkerResult;
    alpineTransMgr: AlpineTransMgrRpcControl.InterfaceRecord;
    conversationOut: Conversation;
    IF TransactionMap.GetHandle[transID] # nullHandle THEN alreadyRegistered ← TRUE
    ELSE self ← ConsWorker[transID, AlpineImport.Register[coordinator]];
    IF alreadyRegistered OR TransactionMap.Register[self].alreadyRegistered THEN {
      -- Drop self on the floor since transID is already registered.  We are not allowed
      --to call RegisterWorker; some other process is doing it.  Wait for
      --self.state # unknown, then return result.
      self ← TransactionMap.GetHandle[transID];
      IF self = nullHandle OR WaitUntilNotUnknown[self].workerNotActive THEN
        ERROR AlpineTransaction.Unknown[transID];
      }
    ELSE {
      -- We registered the volatile worker, so we are responsible to call RegisterWorker
      --and update volatile and permanent data structures to reflect the result.
      --We cannot return or error until self.state # unknown, lest another process hang
      --in WaitUntilNotUnknown.
      conversationOut ← ConversationTable.Fetch[self.coordinator];
      alpineTransMgr ← AlpineImport.GetTransMgrInterface[self.coordinator];
      IF conversationOut = NIL OR alpineTransMgr = NIL THEN GOTO noCoordinator;
      registerWorkerResult ← alpineTransMgr.RegisterWorker[conversationOut, transID !
        RPC.CallFailed => CHECKED { whyCallFailed ← why; GOTO callFailed } ];
      EndCreateWorker[self, IF registerWorkerResult = ok THEN create ELSE abort];
    EXITS
      noCoordinator => {
        EndCreateWorker[self, abort];
        ERROR AlpineTransaction.Unknown[coordinator] };
      callFailed => {
        EndCreateWorker[self, abort];
        SELECT whyCallFailed FROM
          timeout, unbound => {
            self.coordinator.TransMgrInterfaceCallFailed[alpineTransMgr];
            alpineTransMgr ← NIL; --must come after CallFailed is unwound
            ERROR AlpineTransaction.Unknown[coordinator] };
          busy => ERROR AlpineTransaction.Unknown[coordinator];
          runtimeProtocol, stubProtocol => ERROR;
          ENDCASE }
      };
    };

  ConsWorker: PROC [trans: TransID, coordinator: AlpineImport.Handle]
    RETURNS [Handle] = {
    -- Note: does not establish the locks field of the Worker.Object.
    RETURN [zone.NEW[Worker.Object ← [
      transID: trans,
      timeOfLastStartWork: BasicTime.Now[],
      coordinator: coordinator,
      coordinatorIsRemote: disableLocalCoordinatorOptimization OR
        NOT coordinator.Equal[AlpineIdentity.myAlpineImportHandle]]]];
    };
    
  EndCreateWorker: ENTRY PROC [self: Handle, outcome: {create, abort} ] = {
    -- self is registered in TransactionMap, in unknown state (no WorkerBegin written).
    IF self.state # unknown THEN ERROR;
    IF outcome = create THEN {
      LogWorkerBegin[self];
      self.state ← active;
      IF self.locks = NIL THEN self.locks ← LockControl.ConsTransHeader[self];
      }
    ELSE {
      -- Must change state from unknown since another process in WaitUntilNotUnknown
      --may be waiting for this event.
      self.state ← complete;
      self.outcome ← abort;
      TransactionMap.Unregister[self];
      };
    };

  WaitUntilNotUnknown: ENTRY PROC [self: Handle]
    RETURNS [workerNotActive: BOOL] = {
    WHILE self.state = unknown DO WAIT shortWaitOver ENDLOOP;
    RETURN [self.state # active];
    };

  WorkerPrepare: PUBLIC PROC [
    conversation: Conversation, trans: TransID,
    newTrans: AlpineEnvironment.TransID]
    RETURNS [AlpineTransMgr.WorkerState] = {
    -- ! Refused {transUnknown, wrongCoordinator}
    -- AlpineTransMgr.WorkerPrepare.
    -- Called by the coordinator of "trans".
    -- This procedure is prepared for duplicate calls.  Extra callers wait on the
    --global condition variable prepareFinished.
    stateAfterPrepare: AlpineEnvironment.WorkerState;
    self: Handle = TransactionMap.GetHandle[trans];
    IF self = nullHandle THEN RETURN [notReady];
    IF NOT Rope.Equal[
      s1: ClientMap.GetName[conversation], s2: self.coordinator.Name, case: FALSE] THEN
      ERROR Refused[wrongCoordinator];
    SELECT BeginPrepare[self] FROM
      preparing => NULL; -- normal case
      ready => RETURN [ready]; -- duplicate call
      ENDCASE => RETURN [notReady]; -- duplicate call
    -- We have caused self to enter the preparing state.
    IF newTrans # nullTransID THEN {
      -- Perform specialized version of CreateWorker.
      self.continueWorker ← ConsWorker[newTrans, self.coordinator];
      IF TransactionMap.Register[self.continueWorker].alreadyRegistered THEN ERROR;
        -- The newTrans is already known to us.  This probably represents an error
        --in the coordinator.
      };
    {
      IF PreventStartWork[self: self, allowDifficulty: normal].abortThisTrans THEN GOTO abort;
      -- Call any external caches now; they are allowed to do read/write pages.
      AccessControl.PhaseOneSpaceAndOwnerChanges[self !
        AccessControl.LockFailed, AccessControl.Unknown => GOTO abort];
      IF PreventStartWork[self: self, allowDifficulty: zero].abortThisTrans THEN GOTO abort;
      -- Any call to StartWork for this transaction will now fail.
      -- There is no work in progress, including waiting lock requests.
      -- We therefore have exclusive access without entering the monitor.
      stateAfterPrepare ← ReadyWorker[self];
      EXITS
        abort => {
          DO
            LockControl.AbortWaitingRequests[self];
            IF ShortWaitForNStarts[self] = 0 THEN EXIT;
            ENDLOOP;
          stateAfterPrepare ← notReady;
          };
      };
    EndPrepare[self, stateAfterPrepare];
    -- We have caused self to enter either the ready state or the completing state.
    SELECT stateAfterPrepare FROM
      notReady => NormalAbortWorker[self];
      readOnlyReady => NormalCommitWorker[self];
      ready => NULL;
      ENDCASE;
    RETURN [stateAfterPrepare]
    };

  BeginPrepare: ENTRY PROC [self: Handle]
    RETURNS [Worker.State] = {
    -- result is # unknown, active.
    -- result is = preparing iff this call caused the transition to enter
    --the preparing state.
    DO
      SELECT self.state FROM
        unknown, preparing => WAIT shortWaitOver;
        ENDCASE => EXIT;
      ENDLOOP;
    IF self.state = active THEN self.state ← preparing;
    RETURN [self.state];
    };

  ReadyWorker: PROC [self: Handle]
    RETURNS [AlpineEnvironment.WorkerState] = {
    -- Called during normal operation and during recovery.
    -- The caller has exclusive access to the transaction self, is not holding the monitor.
    readOnly: BOOL ← FileControl.CommitPhase1[self !
      Lock.Failed, Lock.TransAborting => GOTO abort];
    LockControl.UpgradeLocks[self !
      Lock.Failed, Lock.TransAborting => GOTO abort];
    RETURN [IF readOnly THEN readOnlyReady ELSE ready];
    EXITS
      abort => RETURN [notReady];
    };

  EndPrepare: ENTRY PROC [self: Handle,
    stateAfterPrepare: AlpineEnvironment.WorkerState] = {
    SELECT stateAfterPrepare FROM
      notReady => {
        LogWorkerCompleting[self: self, outcome: abort, force: FALSE];
        self.state ← completing;
        self.outcome ← abort;
        };
      readOnlyReady => {
        LogWorkerCompleting[self: self, outcome: readOnly, force: FALSE];
        self.state ← completing;
        self.outcome ← commit;
        };
      ready => {
        LogWorkerReady[self: self, force: self.coordinatorIsRemote];
        self.state ← ready;
        };
      ENDCASE => ERROR;
    };

  WorkerFinish: PUBLIC PROC [
    conversation: Conversation, trans: TransID,
    requiredOutcome: AlpineTransMgr.RequiredOutcome] = {
    -- ! Refused {wrongCoordinator, notReady};
    -- AlpineTransMgr.WorkerFinish
    -- Called by the coordinator of "trans".
    self: Handle = TransactionMap.GetHandle[trans];
    IF self = nullHandle THEN RETURN; --we must have already finished.
    IF NOT Rope.Equal[
      s1: ClientMap.GetName[conversation], s2: self.coordinator.Name, case: FALSE] THEN
      ERROR Refused[wrongCoordinator];
    IF requiredOutcome NOT IN [abort .. commit] THEN ERROR;
    LocalWorkerFinish[self, requiredOutcome, clientRequest];
    };

  AbortUnilaterally: PUBLIC PROC [
    self: Handle, why: TransactionMap.AbortReason] = {
    -- TransactionMap.AbortUnilaterally
    -- Note some duplicate logic with LocalWorkerFinish.
    SELECT BeginFinish[self, abort, returnIfPreparing] FROM
      active => ERROR Refused [notReady];
      preparing => {
        -- We cannot determine self's outcome, and are not responsible for making it
        --complete, but we are responsible for ensuring that it leaves the preparing state.
        --Otherwise it might stay there forever, waiting in the lock manager.
        -- This loop is really necessary, since neither process involved is synchronized
        --by self's or Lock's monitors.
        DO
          LockControl.AbortWaitingRequests[self];
          IF ShortWaitForState[self] # preparing THEN RETURN;
          ENDLOOP;
        };
      completing => {
        -- We have caused self to enter the completing state, self.outcome = abort.
        -- Hence we are responsible for making it complete.
        IF PreventStartWork[self, zero].abortThisTrans THEN {
          -- Some work is still in progress.  Wait for it to go away.
          DO
            LockControl.AbortWaitingRequests[self];
            IF ShortWaitForNStarts[self] = 0 THEN EXIT;
            ENDLOOP;
          };
        NormalAbortWorker[self];
        -- temporary kludge, effective since remote coordinators aren't used now.
        [] ← AlpineTransaction.Finish[
          AlpineIdentity.myLocalConversation, self.transID, abort, FALSE];
        RETURN;
        };
      ENDCASE => RETURN;
    };

  ShortWaitForState: ENTRY PROC [self: Handle]
    RETURNS [Worker.State] = INLINE {
    WAIT shortWaitOver;
    RETURN [self.state];
    };

  ShortWaitForNStarts: ENTRY PROC [self: Handle]
    RETURNS [nStarts: [0..Worker.maxStarts]] = INLINE {
    WAIT shortWaitOver;
    RETURN [self.nStarts];
    };

  LocalWorkerFinish: PROC [
    self: Handle, requiredOutcome: AlpineTransMgr.RequiredOutcome,
    whyAbort: TransactionMap.AbortReason] = {
    SELECT BeginFinish[self, requiredOutcome, waitIfPreparing] FROM
      active => ERROR Refused [notReady];
      completing => {
        -- We have caused self to enter the completing state, self.outcome = requiredOutcome.
        -- Hence we are responsible for making it complete.
        IF requiredOutcome = abort THEN {
          IF PreventStartWork[self, zero].abortThisTrans THEN {
            -- Some work is still in progress.  Wait for it to go away.
            DO
              LockControl.AbortWaitingRequests[self];
              IF ShortWaitForNStarts[self] = 0 THEN EXIT;
              ENDLOOP;
            };
          NormalAbortWorker[self]
          }
        ELSE NormalCommitWorker[self];
        };
      ENDCASE => RETURN;
    };

  BeginFinish: ENTRY PROC [self: Handle,
    requiredOutcome: AlpineTransMgr.RequiredOutcome,
    option: {returnIfPreparing, waitIfPreparing}]
    RETURNS [Worker.State] = {
    -- result is # unknown, ready.
    -- result is = active only if requiredOutcome = commit and transaction was unknown
    --or active on entry.  (Caller is in error.)
    -- result is = preparing only if returnIfPreparing = TRUE and transaction was preparing
    --on entry.  (Caller is trying to perform unilateral abort.)
    -- result is = completing iff this call caused the transaction to enter
    --the completing state.  (Caller is now responsible for making transaction complete.)
    DO
      SELECT self.state FROM
        unknown, completing => WAIT shortWaitOver;
        preparing => IF option = waitIfPreparing THEN WAIT shortWaitOver
          ELSE RETURN [preparing];
        ENDCASE => EXIT;
      ENDLOOP;
    IF requiredOutcome = abort AND (self.state = active OR self.state = ready) THEN {
      LogWorkerCompleting[self, abort, FALSE];
      self.state ← completing;
      self.outcome ← abort;
      }
    ELSE IF requiredOutcome = commit AND self.state = ready THEN {
      LogWorkerCompleting[self, commit, self.coordinatorIsRemote];
      self.state ← completing;
      self.outcome ← commit;
      };
    RETURN [self.state];
    };

  NormalCommitWorker: PROC [self: Handle] = {
    -- self.state = completing, self.outcome = commit
    -- Call any external caches now; they need to know about the commit.
    AccessControl.PhaseTwoOrAbortSpaceAndOwnerChanges[self, commit];
    CommitWorker[self];
    };

  CommitWorker: PROC [self: Handle] = {
    -- Called during normal operation and during recovery.
    FileControl.CommitPhase2[trans: self, newTrans: self.continueWorker];
    IF self.continueWorker # nullHandle THEN {
      LockControl.TransferLocks[from: self, to: self.continueWorker];
      self.continueWorker.locks ← self.locks;
      self.locks ← NIL;
      EndCreateWorker[self.continueWorker, create];
      self.continueWorker ← nullHandle;
      }
    ELSE {
      LockControl.ReleaseLocks[self];
      self.locks ← NIL;
      };
    };

  NormalAbortWorker: PROC [self: Handle] = {
    -- self.state = completing, self.outcome = abort
    -- Call any external caches now; they need to know about the abort.
    AccessControl.PhaseTwoOrAbortSpaceAndOwnerChanges[self, abort];
    AbortWorker[self];
    };

  AbortWorker: PROC [self: Handle] = {
    -- Called during normal operation and during recovery.
    FileControl.Abort[trans: self];
    LockControl.ReleaseLocks[self];
    self.locks ← NIL;
    IF self.continueWorker # nullHandle THEN {
      EndCreateWorker[self.continueWorker, abort];
      self.continueWorker ← nullHandle;
      };
    };

-- Log writing and analysis; checkpoint proc

  LogWorkerBegin: PROC [self: Handle] = {
    rec: Worker.BeginLogRep ← [];
    ConvertUnsafe.AppendRope[
      to: LOOPHOLE[LONG[@rec.coordinator]], from: self.coordinator.Name];
    [thisRecord: self.beginRecord] ← Log.Write[self, workerBegin,
      [base: @rec, length: TEXT[rec.length].SIZE]];
    };

  LogWorkerReady: PROC [self: Handle, force: BOOL] = {
    [] ← Log.Write[self, workerReady,
      Log.nullBlock, force];
    };

  LogWorkerCompleting: PROC [self: Handle,
    outcome: AlpineInternal.WorkerOutcome, force: BOOL] = {
    rec: Worker.CompletingLogRep ← [outcome: outcome];
    [] ← Log.Write[self, workerCompleting,
      [base: @rec, length: Worker.CompletingLogRep.SIZE], force];
    };

  LogWorkerComplete: PROC [self: Handle] = {
    [] ← Log.Write[self, workerComplete, Log.nullBlock];
    };

  AnalyzeWorkerBegin: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    coordinator: AlpineEnvironment.FileStore;
      { -- get coordinator name from log, map it into a FileStore.
      rec: Worker.BeginLogRep ← [];
      status: Log.ReadProcStatus;
      [status: status] ← Log.ReadForRecovery[thisRecord: record,
        to: [base: @rec, length: Worker.BeginLogRep.SIZE]];
      IF status = destinationFull THEN ERROR InternalProgrammingError;
      coordinator ← ConvertUnsafe.ToRope[from: LOOPHOLE[LONG[@rec.coordinator]]];
      };
      { -- create a worker handle from trans, setting its coordinator field from above.
      self: Handle ← ConsWorker[trans, AlpineImport.Register[coordinator]];
      IF TransactionMap.Register[self].alreadyRegistered THEN ERROR InternalProgrammingError;
      self.beginRecord ← record;
      self.locks ← LockControl.ConsTransHeader[self];
      self.allowableDifficulty ← zero;
      self.stateDuringRecovery ← active;
      };
    };

  AnalyzeWorkerReady: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    self: Handle = TransactionMap.GetHandle[trans];
    IF self = nullHandle THEN RETURN;
    IF self.stateDuringRecovery # active THEN ERROR InternalProgrammingError;
    self.stateDuringRecovery ← ready;
    };

  AnalyzeWorkerCompleting: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    outcome: AlpineInternal.WorkerOutcome;
      { -- get outcome from log
      rec: Worker.CompletingLogRep;
      status: Log.ReadProcStatus;
      [status: status] ← Log.ReadForRecovery[thisRecord: record,
        to: [base: @rec, length: Worker.CompletingLogRep.SIZE]];
      IF status # normal THEN ERROR InternalProgrammingError;
      outcome ← rec.outcome;
      };
      {
      self: Handle = TransactionMap.GetHandle[trans];
      IF self = nullHandle THEN RETURN;
      IF self.stateDuringRecovery NOT IN [active .. ready] THEN
        ERROR InternalProgrammingError;
      self.stateDuringRecovery ← IF outcome = abort THEN aborted ELSE committed;
      };
    };

  AnalyzeWorkerComplete: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    self: Handle = TransactionMap.GetHandle[trans];
    IF self = nullHandle THEN RETURN;
    SELECT self.stateDuringRecovery FROM
      active, ready => ERROR;
      committed => self.outcome ← commit;
      aborted => self.outcome ← abort
      ENDCASE => ERROR;
    LockControl.ReleaseLocks[self];
    self.locks ← NIL;
    self.state ← complete;
    TransactionMap.Unregister[self];
    };

  CallAfterAnalysisPass: PUBLIC PROC [] = {
    -- WorkerControl.CallAfterAnalysisPass
    AbortActiveWorker: PROC [self: Handle] RETURNS [stop: BOOL] = {
      SELECT self.stateDuringRecovery FROM
        active => {
          LogWorkerCompleting[self: self, outcome: abort, force: FALSE];
          self.stateDuringRecovery ← aborted;
          };
        ready, committed, aborted => NULL;
        ENDCASE => ERROR;
        RETURN [stop: FALSE];
      };
    logWordsUsableByClient ← LogControl.WordsUsableByClient[];
    maxLogWordsForWorker ← MIN[
      maxLogWordsForFastRestart, logWordsUsableByClient];
    TransactionMap.LockedEnumerate[AbortActiveWorker];
    };

  RecoverWorkerBegin: PROC [
    record: RecordID, type: RecordType, trans: Handle, outcome: Log.TransState] = {
    IF trans.state # unknown THEN ERROR;
    trans.state ← active;
    };

  RecoverWorkerReady: PROC [
    record: RecordID, type: RecordType, trans: Handle, outcome: Log.TransState] = {
    IF trans.state # active THEN ERROR;
    trans.state ← ready;
    };

  RecoverWorkerCompleting: PROC [
    record: RecordID, type: RecordType, trans: Handle, outcome: Log.TransState] = {
    trans.state ← completing;
    SELECT outcome FROM
      committed => {
        trans.state ← ready;
        IF ReadyWorker[trans] = notReady THEN ERROR;
        trans.state ← completing;  trans.outcome ← commit;
        [] ← CommitWorker[trans];
        };
      aborted => {
        trans.state ← completing;  trans.outcome ← abort;
        AbortWorker[trans];
        };
      ready => {
        trans.state ← ready;
        IF ReadyWorker[trans] # ready THEN ERROR;
        };
      ENDCASE => ERROR;
    };

  CallAfterUpdatePass: PUBLIC PROC [] = {
    -- WorkerControl.CallAfterUpdatePass
    noActiveWorkers: BOOL;
    CheckForActiveWorkers: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
      SELECT self.state FROM
        ready, fpmComplete => NULL;
        ENDCASE => noActiveWorkers ← FALSE;
      RETURN [stop: FALSE];
      };
    DO
      noActiveWorkers ← TRUE;
      TransactionMap.LockedEnumerate[CheckForActiveWorkers];
      IF noActiveWorkers THEN RETURN ELSE Process.Pause[Process.SecondsToTicks[1]];
      ENDLOOP;
    };

  WorkerCheckpointProc: PROC []
    RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
    fpmCompleteWorkerSeen: BOOL ← FALSE;
    ExamineWorkerBeforeForceOut: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
      IF self.state = fpmComplete THEN {
        self.state ← fpmCompleteBeingForcedOut;
        fpmCompleteWorkerSeen ← TRUE;
        };
      RETURN [stop: FALSE];
      };
    oldestBeginRecord: Log.RecordID ← Log.lastRecordID;
    ExamineWorkerAfterForceOut: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
      IF self.state = fpmCompleteBeingForcedOut THEN {
        LogWorkerComplete[self];
        TransactionMap.Unregister[self];
        self.state ← complete;
        }
      ELSE IF self.state # unknown THEN
        oldestBeginRecord ← LogInline.Min[self.beginRecord, oldestBeginRecord];
      RETURN [stop: FALSE];
      };
    TransactionMap.UnlockedEnumerate[ExamineWorkerBeforeForceOut];
    IF fpmCompleteWorkerSeen THEN FilePageMgr.ForceOutEverything[];
    TransactionMap.UnlockedEnumerate[ExamineWorkerAfterForceOut];
    IF LogInline.WordsFromSubtract[LogControl.RecordIDOfNextWrite[], oldestBeginRecord] >
      maxLogWordsForWorker THEN WorkerInternal.NotifyLogWatchdog[];
    RETURN [oldestBeginRecord, oldestBeginRecord];
    };

  WorkerLogWatchdogProcess: PUBLIC PROC [] = {
    transID: TransID ← nullTransID;
    w: Handle ← NIL;
    wait: BOOL ← FALSE;
    DO
      IF transID # nullTransID THEN w ← TransactionMap.GetHandle[transID];
      IF w = NIL THEN w ← GetOldestActiveWorker[];
      IF w = NIL THEN { transID ← nullTransID; wait ← TRUE }
      ELSE SELECT TestForAbort[w] FROM
        notActive => transID ← nullTransID;
        abort => {
          AbortUnilaterally[w, oldUncommittedUpdates];
          transID ← nullTransID;
          };
        dontAbort => { transID ← w.transID; wait ← TRUE };
        ENDCASE => ERROR;
      w ← NIL; -- so that w can be garbage-collected
      IF wait THEN { WorkerInternal.WaitForNotify[]; wait ← FALSE };
      ENDLOOP;
    };

  TestForAbort: ENTRY PROC [self: Handle] RETURNS [{notActive, abort, dontAbort}] = {
    logWordsForWorker: INT =
      LogInline.WordsFromSubtract[LogControl.RecordIDOfNextWrite[], self.beginRecord];
    IF self.state # active THEN RETURN [notActive];
    IF logWordsForWorker <= maxLogWordsForWorker THEN RETURN [dontAbort];
    IF logWordsForWorker > logWordsUsableByClient THEN RETURN [abort];
    IF BasicTime.Period[from: self.timeOfLastStartWork, to: BasicTime.Now[]] <
      longTimeSinceLastStartWork --active-- AND
      self.estimatedUpdateCost * densityFactor > logWordsForWorker --expensive-- THEN
      RETURN [dontAbort];
    RETURN [abort];
    };
    
  GetOldestActiveWorker: PROC [] RETURNS [Handle] = {
    w: Handle ← NIL;
    ExamineWorker: ENTRY PROC [self: Handle] RETURNS [stop: BOOL] = {
      IF self.state = active AND (w = NIL OR
        LogInline.Compare[w.beginRecord, self.beginRecord] # less) THEN
        w ← self;
      RETURN [stop: FALSE];
      };
    TransactionMap.UnlockedEnumerate[ExamineWorker];
    RETURN [w];
    };

-- Procedures exported to TransactionMap, and some related procedures.

  StartWork: PUBLIC ENTRY PROC [self: Handle,
    difficulty: AlpineInternal.WorkLevel] RETURNS [canWork: BOOL] = {
    IF difficulty > self.allowableDifficulty THEN RETURN [FALSE];
    IF self.state NOT IN [active .. preparing] THEN ERROR;
    WHILE self.nStarts = Worker.maxStarts DO
      WAIT shortWaitOver;
      IF difficulty > self.allowableDifficulty THEN RETURN [FALSE];
      ENDLOOP;
    self.nStarts ← self.nStarts + 1;
    self.timeOfLastStartWork ← BasicTime.Now[];
    RETURN [TRUE];
    };

  StopWork: PUBLIC ENTRY PROC [self: Handle, estimatedUpdateCost: INT] = {
    IF self.nStarts = 0 THEN ERROR; --impossible if clients are correctly coded--
    self.nStarts ← self.nStarts - 1;
    self.estimatedUpdateCost ← self.estimatedUpdateCost + estimatedUpdateCost;
    };

  PreventStartWork: ENTRY PROC [self: Handle, allowDifficulty: Worker.Difficulty]
    RETURNS [abortThisTrans: BOOL] = {
    -- Return abortThisTrans ~ TRUE if work is in progress.
    IF self.nStarts # 0 THEN {
      self.allowableDifficulty ← zero;
      RETURN [abortThisTrans: TRUE];
      }
    ELSE {
      self.allowableDifficulty ← allowDifficulty;
      RETURN [abortThisTrans: FALSE];
      };
    };

  GetTimeOfLastStartWork: PUBLIC ENTRY PROC [self: Handle]
    RETURNS [BasicTime.GMT] = {
    RETURN [self.timeOfLastStartWork];
    };

  GetEstimatedUpdateCost: PUBLIC ENTRY PROC [self: Handle] RETURNS [INT] = {
    RETURN [self.estimatedUpdateCost];
    };

  GetTransID: PUBLIC PROC [self: Handle] RETURNS [TransID] = {
    -- Not ENTRY because the trans field is immutable.
    RETURN [self.transID];
    };

  GetFileInstanceList: PUBLIC PROC [self: Handle]
    RETURNS [fileInstanceList: FileInstanceHandle] = {
    -- Not ENTRY because self.fileInstanceList "belongs" to the file manager.
    RETURN [self.fileInstanceList];
    };

  SetFileInstanceList: PUBLIC PROC [self: Handle,
    fileInstanceList: FileInstanceHandle] = {
    -- Not ENTRY because self.fileInstanceList "belongs" to the file manager.
    self.fileInstanceList ← fileInstanceList;
    };

  GetLockHeader: PUBLIC PROC [self: Handle]
    RETURNS [lockHeader: AlpineInternal.LockTransHeaderHandle] = {
    -- Not ENTRY because self.locks "belongs" to the lock manager.
    RETURN[self.locks];
    };

  EnableAlpineWheel: PUBLIC ENTRY PROC [self: Handle,
    conversation: Conversation, enable: BOOL] = {
    l: LIST OF RPC.ConversationID;
    c: RPC.ConversationID = RPC.GetConversationID[conversation];
    IF c = AlpineIdentity.myLocalConversationID THEN RETURN;
    FOR l ← self.enabledWheelList, l.rest UNTIL l = NIL DO
      IF l.first = c THEN EXIT;
      ENDLOOP;
    IF enable AND l = NIL THEN {
      new: LIST OF RPC.ConversationID = zone.CONS[
        first: c, rest: self.enabledWheelList];
      self.enabledWheelList ← new;
      }
    ELSE IF (NOT enable) AND l # NIL THEN {
      IF self.enabledWheelList = l THEN self.enabledWheelList ← l.rest
      ELSE
        FOR p: LIST OF RPC.ConversationID ← self.enabledWheelList, p.rest UNTIL p = NIL DO
          IF p.rest = l THEN { p.rest ← l.rest; EXIT };
          ENDLOOP;
      };
    };
     
  IsAlpineWheel: PUBLIC ENTRY PROC [self: Handle,
    conversation: Conversation] RETURNS [enabled: BOOL] = {
    c: RPC.ConversationID = RPC.GetConversationID[conversation];
    IF c = AlpineIdentity.myLocalConversationID THEN RETURN [enabled: TRUE];
    FOR l: LIST OF RPC.ConversationID ← self.enabledWheelList, l.rest UNTIL l = NIL DO
      IF l.first = c THEN RETURN [enabled: TRUE];
      ENDLOOP;
    RETURN [enabled: FALSE];
    };

  StateDuringRecovery: PUBLIC ENTRY PROC [self: Handle]
    RETURNS [TransactionMap.TransState] = {
    SELECT self.stateDuringRecovery FROM
      active => ERROR;
      ready => RETURN [ready];
      committed => RETURN [committed];
      aborted => RETURN [aborted];
      ENDCASE => ERROR;
    };

  IsCommitted: PUBLIC ENTRY PROC [self: Handle] RETURNS [BOOL] = {
    SELECT self.state FROM
      active, preparing => RETURN [FALSE];
      completing => RETURN [self.outcome = commit];
      ENDCASE => ERROR; -- locks are supposed to prevent this
    };

  AssertUpdatesAreComplete: PUBLIC ENTRY PROC [self: Handle] = {
    IF self.state # completing THEN ERROR;
    self.state ← fpmComplete;
    };


-- Initialization

  LogControl.RegisterAnalysisProc[workerBegin, AnalyzeWorkerBegin];
  LogControl.RegisterAnalysisProc[workerReady, AnalyzeWorkerReady];
  LogControl.RegisterAnalysisProc[workerCompleting, AnalyzeWorkerCompleting];
  LogControl.RegisterAnalysisProc[workerComplete, AnalyzeWorkerComplete];

  Log.RegisterRecoveryProc[workerBegin, RecoverWorkerBegin];
  Log.RegisterRecoveryProc[workerReady, RecoverWorkerReady];
  Log.RegisterRecoveryProc[workerCompleting, RecoverWorkerCompleting];

  LogControl.RegisterCheckpointProc[WorkerCheckpointProc];

  Process.SetTimeout[@shortWaitOver, Process.MsecToTicks[Worker.shortWaitTime]];
  Process.EnableAborts[@shortWaitOver];

  END.--WorkerImpl

CHANGE LOG

Changed by MBrown on February 9, 1983 11:54 am
-- Change CommitWorker to set continueWorker ← nullHandle.  (This was happening,
--unsynchronized, from the checkpoint process!)

Changed by MBrown on March 2, 1983 2:53 pm
-- Change sanity check in StartWork.

Changed by MBrown on April 3, 1983 10:32 am
-- Implemented worker log watchdog.  Changed AbortUnilaterally to call
--AlpineTransaction.Finish[..., abort, ...], making coordinator go away if it is local.
--Updated catch phrases for AccessControl errors.

Changed by MBrown on June 6, 1983 11:28 am
-- In WorkerImpl.CheckForReadyWorker, was examining worker state without entering
--monitor.  Changed logic as follows: not WorkerImpl.CallAfterUpdatePass waits until
--all workers are either ready or fpmComplete.

Changed by MBrown on June 25, 1983 9:43 pm
-- Fixed two related bugs.  If AbortUnilaterally was called with self.state = preparing,
--it simply waited for self.state # preparing.  This is no good because self may be
--waiting in the lock manager, deadlocked, and the caller of AbortUnilaterally may be
--the lock watchdog process!  This was observed several times in server operation.
-- The second bug was more subtle and probably has not yet arisen in practice.  When
--a process calls LockControl.AbortWaitingRequests, what guarantee does it have that no
--new requests will come along and wait later?  Answer: no guarantee.  This synchronization
--must be done at the level of the worker object monitor, and the lock manager does not
--enter the worker monitor.  We wrapped all calls to LockControl.AbortWaitingRequests in
--loops that are designed to ensure that when the loop exits, no more Lock.Set or
--LockControl.UpgradeLocks calls will be executed for this transaction.  This seems messy
--but it is not clear how to do better.