-- CoordinatorImpl.mesa
-- Last edited by
--   MBrown on January 30, 1984 12:08:04 pm PST
--   Taft on  1-Feb-82 13:16:03
--   Kolling on July 13, 1983 3:27 pm

-- NOTES:
--  FORK CoordinatorFinishProcess[c, requestedOutcome] is not controlled (might try to fork
--too many) and not synchronized (outside call to Finish might get in).
-- Simultaneous Finish calls should work better (now, second caller gets result = unknown).
-- The mechanism for limiting the number of coordinators is quick and dirty, should be
--monitored in a different way.


  DIRECTORY
    AlpineEnvironment,
    AlpineIdentity USING [myLogVolumeID, myAlpineImportHandle, myFileStore],
    AlpineImport,
    AlpineInternal,
    AlpineTransaction,
    AlpineTransMgr,
    AlpineTransMgrRpcControl,
    BasicTime,
    ClientMap,
    ConcreteTransID,
    ConvertUnsafe,
    Coordinator,
    CoordinatorControl,
    CoordinatorInternal,
    CoordinatorMap,
    Log,
    LogControl,
    LogInline,
    Process USING [Detach, EnableAborts, SetTimeout, SecondsToTicks, Ticks],
    RPC,
    SafeStorage;

CoordinatorImpl: MONITOR LOCKS c USING c: Handle
  IMPORTS
    AlpineIdentity,
    AlpineImport,
    AlpineTransaction,
    BasicTime,
    ClientMap,
    ConvertUnsafe,
    CoordinatorInternal,
    CoordinatorMap,
    Log,
    LogControl,
    LogInline,
    Process,
    SafeStorage
  EXPORTS
    AlpineInternal,
    AlpineTransaction,
    AlpineTransMgr,
    CoordinatorControl
  = BEGIN
  Conversation: TYPE = AlpineEnvironment.Conversation;
  TransID: TYPE = AlpineEnvironment.TransID;
  nullTransID: TransID = AlpineEnvironment.nullTransID;
  RecordID: TYPE = Log.RecordID;
  RecordType: TYPE = Log.RecordType;
  Results: TYPE = Coordinator.Results;
  InternalProgrammingError: ERROR = CODE;
  Handle: TYPE = Coordinator.Handle;
  nullHandle: Handle = Coordinator.nullHandle;

  CoordinatorObject: PUBLIC TYPE = Coordinator.Object;
    -- AlpineInternal.CoordinatorObject

  secondsWaitAfterCommFailure: INT = CoordinatorInternal.SecondsWaitAfterCommFailure;
  timeoutForResultsReturned: Process.Ticks =
    Process.SecondsToTicks[CoordinatorInternal.SecondsTimeoutForResultsReturned];
  maxCoordinators: INT ← 40 --CoordinatorInternal.MaxCoordinators--;

  zone: ZONE = SafeStorage.GetSystemZone[];


  Create: PUBLIC PROC [conversation: Conversation, createLocalWorker: BOOL]
    RETURNS [transID: AlpineTransaction.TransID] = {
    -- ! AlpineTransaction.OperationFailed[why: busy];
    -- AlpineTransaction.Create.
    -- Called by any Alpine client.
    c: Handle;
    IF CoordinatorMap.Count[] >= maxCoordinators THEN GOTO fail;
    c ← ConsCoordinator[];
    CoordinatorMap.Register[c];
    IF createLocalWorker THEN
      AlpineTransaction.CreateWorker[conversation, c.transID, AlpineIdentity.myFileStore !
        AlpineTransaction.Unknown => GOTO fail];
    RETURN [c.transID];
    EXITS
      fail => ERROR AlpineTransaction.OperationFailed[why: busy];
    };

  ConsCoordinator: PROC [] RETURNS [Handle] = {
    newTrans: Handle = zone.NEW[Coordinator.Object ← []];
    Process.SetTimeout[@newTrans.resultsReturned, timeoutForResultsReturned];
    --Process.EnableAborts[@newTrans.resultsReturned];
    CoordinatorInternal.NextTransID[newTrans]; --writes coordinatorBegin
    RETURN [newTrans] };
    
  RegisterWorker: PUBLIC PROC [
    conversation: Conversation, trans: TransID] 
    RETURNS [AlpineTransMgr.RegisterWorkerResult] = {
    -- ! (none);
    -- AlpineTransMgr.RegisterWorker.
    -- The caller is a worker (identity obtainable via "conversation") with no monitors
    --locked.
    RegisterWorkerEntry: ENTRY PROC [c: Handle]
      RETURNS [AlpineTransMgr.RegisterWorkerResult] = {
      -- c # nullHandle.
      worker: AlpineImport.Handle;
      IF c.state # active THEN RETURN [transNotActive];
      worker ← AlpineImport.Register[ClientMap.GetName[conversation]];
      FOR l: Coordinator.WorkerHandle ← c.workers, l.rest UNTIL l = NIL DO
        IF worker.Equal[l.first.worker] THEN RETURN [duplicateCall];
        ENDLOOP;
      ConsWorker[c, worker];
      RETURN [ok];
      };--RegisterWorkerEntry
    c: Handle = CoordinatorMap.GetHandle[trans];
    IF c = nullHandle THEN RETURN [transNotActive];
    RETURN [RegisterWorkerEntry[c]];
    };

  ConsWorker: PROC [c: Handle, w: AlpineImport.Handle] = {
    l: Coordinator.WorkerHandle ← zone.CONS[[worker: w], c.workers];
    c.workers ← l;
    LogCoordinatorRegisterWorker[c, w];
    };

  CreateWithWorkers: INTERNAL PROC [c: Handle]
    RETURNS [Handle] = {
    -- Creates a new transaction, and simulates RegisterWorker for each worker
    --of existing transaction c.  Returns the new transaction, without registering it.
    newTrans: Handle = ConsCoordinator[];
    FOR l: Coordinator.WorkerHandle ← c.workers, l.rest UNTIL l = NIL DO
      ConsWorker[newTrans, l.first.worker];
      ENDLOOP;
    RETURN [newTrans]
    };

  Finish: PUBLIC PROC [
    conversation: RPC.Conversation, transID: AlpineTransaction.TransID,
    requestedOutcome: AlpineTransaction.RequestedOutcome,
    continue: BOOL]
    RETURNS [outcome: AlpineTransaction.Outcome, newTrans: AlpineTransaction.TransID] = {
    -- ! none (but may return "unknown" as a result.)
    -- AlpineTransaction.Finish
    newTransCoordinator: Handle;
    c: Handle = CoordinatorMap.GetHandle[trans: transID];
    IF c = nullHandle THEN RETURN [unknown, nullTransID];
    IF requestedOutcome = abort THEN continue ← FALSE;
    [outcome, newTransCoordinator] ← FinishEntry[c, requestedOutcome, continue];
    IF continue AND outcome = abort
      AND newTransCoordinator # nullHandle THEN {
      [] ← FinishEntry[newTransCoordinator, abort, FALSE];
      newTransCoordinator ← nullHandle };
    RETURN [outcome, IF newTransCoordinator = nullHandle THEN
      nullTransID ELSE newTransCoordinator.transID]
    };

  FinishEntry: ENTRY PROC [
    c: Handle,
    requestedOutcome: AlpineTransaction.RequestedOutcome,
    continue: BOOL] 
    RETURNS [AlpineTransaction.Outcome, Handle] = {
    -- ! (none);
    newTransCoordinator: Handle ← nullHandle;
    newTransID: TransID ← nullTransID;
    {
    IF c.finishInProgress THEN {
      -- wait for finish to complete, then return unknown
      WHILE c.finishInProgress DO
        WAIT finishComplete;
        ENDLOOP;
      RETURN [unknown, nullHandle];
      };
    c.finishInProgress ← TRUE;
    SELECT c.state FROM
      active => NULL;
        --normal--
      collecting, completing => NULL;
        --FinishEntry has been called from recovery--
      complete => GOTO Done;
        --FinishEntry has been called from two nearly simultaneous calls to Finish--
      ENDCASE => ERROR;
    IF requestedOutcome = abort THEN c.outcome ← abort;
    IF continue THEN {
      newTransCoordinator ← CreateWithWorkers[c];
      newTransID ← newTransCoordinator.transID;
      };

    -- General two-phase commit protocol.
    IF c.state = active THEN {
      Log.Force[followingRecord: c.forceRecord];
      c.state ← collecting;
      };
    -- c.state IN [collecting .. completing]
    DO
    -- Try to make progress toward c.state = complete.
    noneActive: BOOL ← TRUE;
    allComplete: BOOL ← TRUE;
    -- Get results from previous calls, if any.
    FOR w: Coordinator.WorkerHandle ← c.workers, w.rest UNTIL w = NIL DO
      WITH w.first.resultsOfMostRecentCall SELECT FROM
        n: Results.none => { };
        p: Results.prepare => {
          w.first.resultsOfMostRecentCall ← [none, none[]];
          w.first.lastPrepareResult ← p;
          SELECT p.communicationError FROM
            none => {
              w.first.communicationTrouble ← FALSE;
              SELECT p.prepareResult FROM
                notReady => { --vote no--
                  w.first.state ← complete;
                  IF c.outcome = commit THEN ERROR;
                  c.outcome ← abort;
                  };
                readOnlyReady => { w.first.state ← complete };
                ready => {
                  w.first.state ← ready;
                  c.aWorkerBecameReady ← TRUE;
                  };
                ENDCASE => ERROR;
              };
            bindingFailed, callFailed, busy => { --vote no--
              IF c.outcome = commit THEN ERROR;
              c.outcome ← abort;
              w.first.communicationTrouble ← TRUE;
              w.first.timeForNextCall ← BasicTime.Update[
                base: BasicTime.Now[], period: secondsWaitAfterCommFailure];
              };
            ENDCASE => ERROR;
          };
        f: Results.finish => {
          w.first.resultsOfMostRecentCall ← [none, none[]];
          w.first.lastFinishResult ← f;
          SELECT f.communicationError FROM
            none => {
              w.first.communicationTrouble ← FALSE;
              w.first.state ← complete;
              };
            bindingFailed, callFailed, busy => {
              w.first.communicationTrouble ← TRUE;
              w.first.timeForNextCall ← BasicTime.Update[
                base: BasicTime.Now[], period: secondsWaitAfterCommFailure];
              };
            ENDCASE => ERROR;
          };
        ENDCASE => ERROR;
      IF w.first.state = active THEN noneActive ← FALSE;
      IF w.first.state # complete THEN allComplete ← FALSE;
      ENDLOOP;
    -- Make state transitions if possible.
    IF c.state = collecting AND (c.outcome = abort OR noneActive) THEN {
      c.state ← completing;
      IF c.outcome = unknown THEN c.outcome ← commit;
      LogCoordinatorCompleting[c: c, outcome: c.outcome,
        force: c.outcome = abort OR c.aWorkerBecameReady];
      };
    IF c.state = completing AND allComplete THEN {
      c.state ← complete;
      LogCoordinatorComplete[c];
      GOTO Done;
      };
    -- Issue new calls.
    FOR w: Coordinator.WorkerHandle ← c.workers, w.rest UNTIL w = NIL DO
      IF w.first.state # complete AND w.first.callInProgress = none AND
        (NOT w.first.communicationTrouble OR IsTimeForNextCall[w]) THEN {
        IF c.state = collecting AND w.first.state = active THEN {
          CoordinatorInternal.PassParms[[prepare, c, w, newTransID, commit--ignored--]];
          w.first.callInProgress ← prepare;
          }
        ELSE IF c.state = completing THEN {
          CoordinatorInternal.PassParms[[finish, c, w, nullTransID--ignored--,
            IF c.outcome = commit THEN commit ELSE abort]];
          w.first.callInProgress ← finish;
          };
        };
      ENDLOOP;
    WAIT c.resultsReturned;
      -- wake up when a result is returned or by timeout
    ENDLOOP;
    EXITS Done => {
      c.finishInProgress ← FALSE;
      CoordinatorMap.Unregister[c];
      IF newTransCoordinator # nullHandle THEN
        CoordinatorMap.Register[newTransCoordinator];
      RETURN [c.outcome, newTransCoordinator] }
    }};--FinishEntry

  finishComplete: CONDITION;

  IsTimeForNextCall: PROC [w: Coordinator.WorkerHandle] RETURNS [BOOL] = INLINE {
    RETURN [BasicTime.Period[from: w.first.timeForNextCall, to: BasicTime.Now[]] >= 0]
    };

  LogCoordinatorRegisterWorker: PROC [
    c: Handle, w: AlpineImport.Handle] = {
    followingRecord: Log.RecordID;
    rec: Coordinator.RegisterWorkerLogRep ← [];
    ConvertUnsafe.AppendRope[to: LOOPHOLE[LONG[@rec.worker]], from: w.Name];
    [followingRecord: followingRecord] ← Log.CoordinatorWrite[c, coordinatorRegisterWorker,
      [base: @rec, length: TEXT[rec.length].SIZE]];
    IF NOT w.Equal[AlpineIdentity.myAlpineImportHandle] THEN
      c.forceRecord ← followingRecord;
    };

  LogCoordinatorCompleting: PROC [c: Handle,
    outcome: AlpineEnvironment.CommitOrAbort, force: BOOL] = {
    rec: Coordinator.CompletingLogRep ← [outcome: outcome];
    [followingRecord: c.forceRecord] ← Log.CoordinatorWrite[c, coordinatorCompleting,
      [base: @rec, length: Coordinator.CompletingLogRep.SIZE], force];
    };

  LogCoordinatorComplete: PROC [c: Handle] = {
    [followingRecord: c.forceRecord] ← Log.CoordinatorWrite[c, coordinatorComplete,
      Log.nullBlock];
    };

  AnalyzeCoordinatorBegin: PROC [record: RecordID, type: RecordType, trans: TransID] = {
    c: Handle = zone.NEW[Coordinator.Object ← [transID: trans, beginRecord: record]];
    CoordinatorMap.Register[c];
    CoordinatorInternal.NoticeCoordinatorBegin[trans];
    };

  AnalyzeCoordinatorRegisterWorker: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    worker: AlpineImport.Handle;
      { -- get worker name from log, map it into a AlpineImport.Handle.
      rec: Coordinator.RegisterWorkerLogRep ← [];
      status: Log.ReadProcStatus;
      [status: status] ← Log.ReadForRecovery[thisRecord: record,
        to: [base: @rec, length: Coordinator.RegisterWorkerLogRep.SIZE]];
      IF status = destinationFull THEN ERROR InternalProgrammingError;
      worker ← AlpineImport.Register[
        server: ConvertUnsafe.ToRope[from: LOOPHOLE[LONG[@rec.worker]]]];
      };
      { -- map trans into a coordinator handle, then add worker to volatile state
      c: Handle = CoordinatorMap.GetHandle[trans];
      l: Coordinator.WorkerHandle;
      IF c = nullHandle THEN RETURN;
      IF c.state # active THEN ERROR;
      FOR l ← c.workers, l.rest UNTIL l = NIL DO
        IF worker.Equal[l.first.worker] THEN ERROR InternalProgrammingError;
        ENDLOOP;
      l ← zone.CONS[[worker: worker], c.workers];
      c.workers ← l
      };
    };

  AnalyzeCoordinatorCompleting: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    outcome: AlpineEnvironment.CommitOrAbort;
      { -- get outcome from log
      rec: Coordinator.CompletingLogRep;
      status: Log.ReadProcStatus;
      [status: status] ← Log.ReadForRecovery[thisRecord: record,
        to: [base: @rec, length: Coordinator.CompletingLogRep.SIZE]];
      IF status # normal THEN ERROR InternalProgrammingError;
      outcome ← rec.outcome;
      };
      {
      c: Handle = CoordinatorMap.GetHandle[trans];
      IF c = nullHandle THEN RETURN;
      IF c.state # active THEN ERROR InternalProgrammingError;
      c.state ← completing;
      c.outcome ← outcome;
      IF c.outcome = commit THEN
        FOR l: Coordinator.WorkerHandle ← c.workers, l.rest UNTIL l = NIL DO
          l.first.state ← ready
          ENDLOOP;
      };
    };

  AnalyzeCoordinatorComplete: PROC [
    record: RecordID, type: RecordType, trans: TransID] = {
    c: Handle = CoordinatorMap.GetHandle[trans];
    IF c = nullHandle THEN RETURN;
    IF c.state # completing THEN ERROR InternalProgrammingError;
    c.state ← complete;
    CoordinatorMap.Unregister[c];
    };

  CallAfterAnalysisPass: PUBLIC PROC [] = {
    -- CoordinatorControl.CallAfterAnalysisPass
    CoordinatorInternal.InitTransIDGenerator[AlpineIdentity.myLogVolumeID];
    };

  CallAfterUpdatePass: PUBLIC PROC [] = {
    -- CoordinatorControl.CallAfterUpdatePass
    EnumProc: PROC [c: Handle] RETURNS [stop: BOOL] = {
      requestedOutcome: AlpineTransaction.RequestedOutcome ← abort;
      SELECT c.state FROM
        complete => ERROR;
        active, collecting => NULL;
        completing => IF c.outcome = commit THEN requestedOutcome ← commit;
        ENDCASE => ERROR;
      Process.Detach[FORK CoordinatorFinishProcess[c, requestedOutcome]];
      RETURN [stop: FALSE];
      };
    CoordinatorMap.LockedEnumerate[EnumProc];
    };

  CoordinatorFinishProcess: PROC [c: Handle,
    requestedOutcome: AlpineTransaction.RequestedOutcome] = {
    [] ← FinishEntry [c, requestedOutcome, FALSE];
    };

  CoordinatorCheckpointProc: PROC []
    RETURNS [keepRecord, startAnalysisRecord: RecordID] = {
    oldestBeginRecord: Log.RecordID ← Log.lastRecordID;
    EnumProc: PROC [c: Handle] RETURNS [stop: BOOL] = {
      oldestBeginRecord ← LogInline.Min[c.beginRecord, oldestBeginRecord];
      RETURN [stop: FALSE];
      };
    CoordinatorMap.UnlockedEnumerate[EnumProc];
    RETURN [oldestBeginRecord, oldestBeginRecord];
    };

  LogControl.RegisterAnalysisProc[coordinatorBegin, AnalyzeCoordinatorBegin];
  LogControl.RegisterAnalysisProc[coordinatorRegisterWorker, AnalyzeCoordinatorRegisterWorker];
  LogControl.RegisterAnalysisProc[coordinatorCompleting, AnalyzeCoordinatorCompleting];
  LogControl.RegisterAnalysisProc[coordinatorComplete, AnalyzeCoordinatorComplete];

  LogControl.RegisterCheckpointProc[CoordinatorCheckpointProc];

  Process.SetTimeout[@finishComplete, Process.SecondsToTicks[5]];
  Process.EnableAborts[@finishComplete];

  END.