-- File: RPCPktIO.mesa - last edit:
-- BJackson.pa          3-Jul-85  4:05:57
-- Created? HGM,       21-Jan-84 20:38:50
-- Cedar 5, HGM,       23-Jun-85 11:34:33
-- Yetch.  Similar patch on Recv, HGM, 12-Apr-83 21:58:43
-- Patch out myHost checking, HGM, 12-Apr-83 20:11:32

-- Copyright (C) 1983, 1984 , 1985, 1985, 1985 by Xerox Corporation. All rights reserved. 

-- RPC: Reliable transmission and reception of packets

-- RPCPktIO.mesa

-- Andrew Birrell  September 1, 1982 4:00 pm

DIRECTORY
  Buffer USING [PupBuffer, ReturnBuffer],
  Driver USING [GetInputBuffer],
  Frame USING [Free],
  Heap USING [systemZone],
  Inline USING [LongCOPY],
  Process USING [Detach, MsecToTicks],
  ProcessOperations USING [HandleToIndex, ReadPSB],
  PSB USING [PsbIndex, PsbNull],
  PupDefs USING [
    GetHopsToNetwork, GetLocalPupAddress, PupAddress,
    PupPackageMake, PupRouterSendThis, PupSocket, PupSocketDestroy, PupSocketMake,
    veryLongWait],
  PupTypes USING [PupAddress, PupHostID, PupNetID, PupSocketID],
  MesaRPC USING [CallFailed, unencrypted],
  RPCInternal USING [
    DecryptPkt, DoSignal, EncryptPkt, ReplyToRFA, RPCBinding, RPCPktStreams,
    RPCSecurity, ServerMain],
  MesaRPCLupine USING [DataLength, Dispatcher, Header, RPCPkt],
  RPCPkt USING [
    CallCount, Header, Machine, PktExchangeState, PktID, pktLengthOverhead],
  RPCPrivate USING [rpcSocket],
  System USING [GetClockPulses, MicrosecondsToPulses, Pulses];

RPCPktIO: MONITOR
  IMPORTS
    Buffer, Driver, Frame, Heap, Inline, Process, ProcessOperations, PupDefs, MesaRPC,
    RPCInternal, System
  EXPORTS MesaRPCLupine --Encapsulation,Header-- , RPCPkt  --PktExchange,IdleReceive--
  SHARES MesaRPCLupine =
  BEGIN

  -- Concrete representation of the exported type Header: it is RPCPkt.Header.
  Header: PUBLIC TYPE = RPCPkt.Header;

  ConcreteHeader: PROC [abstract: LONG POINTER TO MesaRPCLupine.Header]
    RETURNS [LONG POINTER TO Header] = INLINE
    BEGIN
    -- Gives access to the fields of a MesaRPCLupine.Header through this
    -- module's concrete Header type; the pointer itself is returned unchanged.
    RETURN[abstract]
    END;

  -- Last call sequence number handed out; incremented by NewCallNumber
  -- (an ENTRY procedure inside PktExchange) for the first packet of each call.
  callSequence: RPCPkt.CallCount ← 0;  -- monotonic from this host --


  -- ******** sending and receiving packets ******** --

  -- This machine's net/host pair; set in Initialize and copied into the
  -- srceHost field of every packet sent by PktExchange.
  myHost: RPCPkt.Machine;

  -- Statistics counters (plain CARDINALs, never reset in this module).
  sent: CARDINAL ← 0;  -- packets passed to GeneralSend by PktExchange
  recvd: CARDINAL ← 0;  -- rpc packets seen by EnqueueRecvd
  retransmitted: CARDINAL ← 0;  -- times PktExchange took its "retransmit" exit

  -- "PktExchange" implements a reliable packet exchange.  There are five cases:
  --    sending: transmit data, wait for ack
  --    receiving: transmit ack, wait for data
  --    call: transmit data, wait for data
  --    endCall: transmit data, wait for ack or data (data => start of new call)
  --    authReq: transmit RFA, wait for data (like "call", but no retransmissions)
  -- Data and RFA packets are retransmitted until an ack is received.
  -- Ack packets are retransmitted only in response to "pleaseAck" requests
  -- No acknowledgement after maxTransmissions transmissions is fatal (CallFailed[timeout])
  -- When the transmitted packet has been acknowledged (if needed), a ping is
  -- sent periodically (ack packet saying pleaseAck).  If necessary the ping is
  -- retransmitted until it has been acked.  Failure to receive an ack for the ping
  -- is fatal (CallFailed[timeout]).  Provided pings continue to be acknowledged,
  -- there is no limit on how long we will wait for the next data packet.  If the
  -- client gets impatient, he can abort this process.  (This corresponds to the
  -- local-machine semantics of a procedure call.)

  -- Timing parameters for PktExchange, in milliseconds or seconds as named.
  -- Their pulse equivalents are computed from these further down.
  minRetransmitMsecs: CARDINAL = 100;  -- retransmit delay for local net --
  msecsPerHop: CARDINAL = 500;  -- approximate typical gateway hop delay? --
  minPingMsecs: CARDINAL = 5000;  -- initial interval between pings --
  maxPingSecs: CARDINAL = 300;  -- long-term ping interval --
  maxTransmissions: CARDINAL = 20;  -- give up after too many transmissions --
  -- When FALSE, reaching maxTransmissions does not raise CallFailed[timeout]
  -- (state = authReq still does; see the "retransmit" exit in PktExchange).
  signalTimeout: BOOLEAN ← TRUE;  -- debugging switch --

  -- The retransmission delay is incremented by minRetransmitMsecs each time we time out,
  -- but is reset when we receive the desired packet.  If n=maxTransmissions
  -- and i=minRetransmitMsecs and j=hops*msecsPerHop, we give up after
  --   i*(n*n+n)/2+n*j msecs.
  -- For the local network, that comes to 21 seconds.
  -- For a two-hop route, that comes to 41 seconds.
  -- The ping delay is doubled at each ping, up to maxPingSecs, which is 5 minutes;
  -- The maxPingSecs value is reached after about 5 minutes.

  -- The intervals above converted to clock pulses; each argument to
  -- System.MicrosecondsToPulses is the interval expressed in microseconds.
  minRetransmitPulses: System.Pulses = System.MicrosecondsToPulses[
    LONG[1000]*minRetransmitMsecs];
  pulsesPerHop: System.Pulses = System.MicrosecondsToPulses[
    LONG[1000]*msecsPerHop];
  minPingPulses: System.Pulses = System.MicrosecondsToPulses[
    LONG[1000]*minPingMsecs];
  maxPingPulses: System.Pulses = System.MicrosecondsToPulses[
    LONG[1000]*LONG[1000]*maxPingSecs];
  -- NOTE(review): not referenced anywhere else in this file as shown.
  transmitLocalPkts: BOOLEAN ← TRUE;


  PktExchange: PUBLIC PROC [
    inPkt: MesaRPCLupine.RPCPkt, length: MesaRPCLupine.DataLength,
    maxlength: MesaRPCLupine.DataLength, state: RPCPkt.PktExchangeState,
    signalHandler: MesaRPCLupine.Dispatcher ← NIL]
    RETURNS [newPkt: BOOLEAN, newLength: MesaRPCLupine.DataLength] =
    BEGIN
    -- Transmits a packet and waits for the response that "state" calls for,
    -- retransmitting and pinging as needed.
    -- Returns [FALSE, ] when our packet was acknowledged in state sending or
    -- endCall; otherwise copies the received packet into inPkt and returns
    -- [TRUE, newLength].
    -- Raises ERROR MesaRPC.CallFailed[runtimeProtocol] on a packet that is
    -- not legal at this point, or when newLength exceeds maxlength.
    -- Raises SIGNAL MesaRPC.CallFailed[timeout] when the retransmit count
    -- reaches maxTransmissions (if signalTimeout), or on any retransmit
    -- exit when state = authReq.
    -- On exit if newPkt=TRUE, the packet has been decrypted if needed. --

    -- Normally, transmits inPkt and copies result into inPkt.
    -- If a signal occurs, calls DoSignal which handles the signal protocol
    -- up to the last resumption packet.  DoSignal returns the last resume
    -- packet for us to transmit, which we do by assigning it to outPkt;
    -- that packet was allocated in DoSignal's local frame, which we must
    -- later deallocate.
    outPkt: MesaRPCLupine.RPCPkt ← inPkt;  -- altered after a signal --
    outPktFrame: POINTER ← NIL;  -- DoSignal's local frame --

    DO  -- loop for signal handlers --

      sentTime: System.Pulses;  -- initialized after sending any packet --

      NewCallNumber: ENTRY PROC RETURNS [RPCPkt.CallCount] = INLINE {
        RETURN[callSequence ← callSequence + 1]};

      reply: Buffer.PupBuffer;
      recvdHeader: LONG POINTER TO Header;
      myPSB: PSB.PsbIndex = ProcessOperations.HandleToIndex[
        ProcessOperations.ReadPSB[]];
      acked: BOOLEAN;
      thisPktID: RPCPkt.PktID;
      header: LONG POINTER TO Header = @outPkt.header;
      pingPulses: System.Pulses ← minPingPulses;
      -- Fill in the source fields of the outgoing header, then choose its
      -- packet type and sequence numbers from "state".
      header.srceHost ← myHost;
      header.srceSoc ← RPCPrivate.rpcSocket;
      header.srcePSB ← myPSB;
      IF header.pktID.pktSeq = 0  -- first packet of a call;  yucky interface! --
        THEN
        BEGIN
        header.type ←
          SELECT state FROM
            sending => [0, rpc, notEnd, pleaseAck, call],
            call => [0, rpc, end, dontAck, call],
            authReq => [0, rpc, end, dontAck, rfa],
            ENDCASE => --receiving, endCall-- ERROR;
        header.pktID.callSeq ← NewCallNumber[];
        header.pktID.pktSeq ← 1;
        acked ← FALSE;
        END
      ELSE
        BEGIN
        header.type ←
          SELECT state FROM
            sending => [0, rpc, notEnd, pleaseAck, data],
            receiving => [0, rpc, end, dontAck, ack],
            call => [0, rpc, end, dontAck, data],
            endCall => [0, rpc, end, dontAck, data],
            ENDCASE => --authReq-- ERROR;
        IF state # receiving  --header.type.class = data --
          THEN {header.pktID.pktSeq ← header.pktID.pktSeq + 1; acked ← FALSE}
        ELSE acked ← TRUE;
        END;
      -- Remember the packet ID now; it is restored into the header before
      -- each ping and before returning on an ack.
      thisPktID ← header.pktID;
      -- From here until ClearWanting, EnqueueRecvd delivers packets whose
      -- destPSB is myPSB to this process instead of to an idle server.
      SetWanting[myPSB];

      DO  -- loop for pings --
        ENABLE
          UNWIND => {
            ClearWanting[myPSB];
            IF outPktFrame # NIL THEN Frame.Free[outPktFrame]};
        BEGIN
        -- Both of these start afresh on every pass round the ping loop.
        transmissions: CARDINAL ← 0;
        retransmitPulses: System.Pulses ← [minRetransmitPulses +
              pulsesPerHop*PupDefs.GetHopsToNetwork[header.destHost.net]];

        IF outPkt.convHandle # MesaRPC.unencrypted THEN
          header.length ← RPCInternal.EncryptPkt[outPkt, length]
        ELSE header.length ← length + RPCPkt.pktLengthOverhead;
        header.oddByte ← no;

        DO  -- loop for retransmissions --
          BEGIN
          GeneralSend[outPkt];
          sentTime ← System.GetClockPulses[];
          sent ← sent + 1;
          -- wait for response: an ack or the reply or a timeout --

          DO  -- loop for each incoming packet (including garbage) --
            -- Once acked we wait a ping interval; before that, a retransmit
            -- interval.  MyReceive returns NIL when the interval expires.
            reply ← MyReceive[
              myPSB, sentTime, (IF acked THEN pingPulses ELSE retransmitPulses)];
            IF reply = NIL THEN
              IF acked THEN GOTO ping
              ELSE {header.type.ack ← pleaseAck; GOTO retransmit};
            -- The RPC header is read in place from the Pup buffer, starting
            -- at its pupLength field.
            recvdHeader ← LOOPHOLE[@reply.pup.pupLength];
            IF recvdHeader.type.class = rfa THEN
              BEGIN
              IF RPCInternal.ReplyToRFA[
                reply, header,  --encrypted--
                thisPktID --clear-- , inPkt.convHandle] THEN
                  NULL-- can't set "acked", because we must retransmit our data
                      -- packet until we obtain the correct destPSB from some ack pkt --;
              LOOP
              END;
            IF outPkt.convHandle # MesaRPC.unencrypted
              AND recvdHeader.conv = header.conv
              AND recvdHeader.srceHost = header.destHost THEN
              [, newLength] ← RPCInternal.DecryptPkt[
                recvdHeader, outPkt.convHandle]
            ELSE newLength ← recvdHeader.length - RPCPkt.pktLengthOverhead;
            IF recvdHeader.conv = header.conv
-- PATCH      AND recvdHeader.srceHost = header.destHost
              AND recvdHeader.pktID.activity = thisPktID.activity THEN  -- pkt is related to our call --
              SELECT TRUE FROM
                recvdHeader.pktID.callSeq = thisPktID.callSeq =>
                  -- pkt is in our call --
                  SELECT TRUE FROM
                    -- a) pkt has next sequence number to ours --
                    recvdHeader.pktID.pktSeq = 1 + thisPktID.pktSeq =>
                      BEGIN
                      IF state = sending OR state = endCall THEN  -- he's not allowed to generate that pktSeq! --
                        ERROR MesaRPC.CallFailed[
                          runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
                      SELECT recvdHeader.type.class FROM
                        data => GOTO done;
                        ack =>
                          BEGIN
                          -- This can happen if state=call and callee sent next data
                          -- pkt, but it was lost and now he is responding to our
                          -- retransmission or ping.  Soon, he will retransmit his
                          -- data pkt. --
                          acked ← TRUE;
                          GiveBackBuffer[reply];
                          END;
                        ENDCASE =>  --call,rfa--
                          ERROR MesaRPC.CallFailed[
                            runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
                      END;
                    -- b) pkt has same sequence number as ours --
                    recvdHeader.pktID.pktSeq = thisPktID.pktSeq =>
                      BEGIN
                      SELECT recvdHeader.type.class FROM
                        ack =>  -- acknowledgement of our packet --
                          BEGIN
                          IF header.type.class = call THEN
                            header.destPSB ← recvdHeader.srcePSB;
                          acked ← TRUE;
                          IF state = sending OR state = endCall THEN {
                            GiveBackBuffer[reply]; reply ← NIL; GOTO done}
                          ELSE  -- state = call, authReq, or receiving --
                            -- Even if "pleaseAck", we don't need to ack it,
                            -- because other end should send data pkt soon --
                            GiveBackBuffer[reply];
                          END;
                        data, call =>  -- retransmisson of his packet --
                          IF state = receiving THEN
                            IF recvdHeader.type.ack = pleaseAck THEN
			      BEGIN
			      GiveBackBuffer[reply]; reply ← NIL;
			      GOTO retransmit -- we're already sending an ack --
			      END
                            ELSE GiveBackBuffer[reply]
                          ELSE
                            ERROR MesaRPC.CallFailed[
                              runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
                        ENDCASE =>  --rfa --
                          ERROR MesaRPC.CallFailed[
                            runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
                      END;
                    -- c) pkt has earlier sequence number than ours --
                    recvdHeader.pktID.pktSeq < thisPktID.pktSeq =>
                      GiveBackBuffer[reply];  -- no need to ack it --
                    -- d) pkt has some future sequence number --
                    ENDCASE =>
                      ERROR MesaRPC.CallFailed[
                        runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
                recvdHeader.pktID.callSeq > thisPktID.callSeq AND state = endCall
                  =>
                  BEGIN
                  IF recvdHeader.type.class # call THEN
                    ERROR MesaRPC.CallFailed[
                      runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
                  -- acks our last packet, so we can handle the call --
                  GOTO done
                  END
                ENDCASE =>  -- wrong call: let someone else do it --
                  {recvdHeader.destPSB ← PSB.PsbNull; EnqueueAgain[reply]}
            ELSE  -- wrong conversation or activity: let someone else do it --
              {recvdHeader.destPSB ← PSB.PsbNull; EnqueueAgain[reply]};
            -- Incoming RFA packets don't reach here. --
            ENDLOOP --for each incoming packet-- ;
          EXITS
            retransmit =>
              BEGIN
              -- Reached on a receive timeout before our packet was acked, and
              -- also when state = receiving and the peer re-sent its packet
              -- asking for an ack; both count towards maxTransmissions.
              transmissions ← transmissions + 1;
              IF (transmissions = maxTransmissions AND signalTimeout)
              OR state = authReq THEN
                 -- Don't retransmit RFA: caller will retransmit call pkt.
                 -- Otherwise, if a spurious worker process occurred because of call pkt
                 -- retransmission before response to RFA, then the spurious worker
                 -- process sits needlessly retransmitting RFA's until it times out.
                {SIGNAL MesaRPC.CallFailed[timeout]; transmissions ← 0};
              retransmitted ← retransmitted + 1;
              -- Linear back-off: each retry waits one more minRetransmitPulses.
              retransmitPulses ← [retransmitPulses + minRetransmitPulses];
              END;
          END;
          ENDLOOP -- for each transmission -- ;
        EXITS
          ping =>
            BEGIN
            -- Turn the outgoing packet into an empty ack that requests an ack,
            -- and double the ping interval up to maxPingPulses.
            header.type ← [0, rpc, end, pleaseAck, ack];
            length ← 0;
            header.pktID ← thisPktID;
            acked ← FALSE;
            pingPulses ← [MIN[maxPingPulses, pingPulses*2]];
            END;
        END;
        -- only exit from loop is "GOTO done" or UNWIND --
        REPEAT
          done =>
            BEGIN
            -- This isn't covered by any UNWIND --
            ClearWanting[myPSB];
            IF outPktFrame # NIL THEN Frame.Free[outPktFrame];
            -- reply = NIL means we finished on an ack: there is no new packet.
            IF reply = NIL THEN {
              --restore clear pktID-- header.pktID ← thisPktID; RETURN[FALSE, ]}
            ELSE
              BEGIN
              IF recvdHeader.outcome = signal AND state = call
                AND signalHandler # NIL THEN
                -- DoSignal supplies the next packet to send; go round the
                -- outer loop again with it.
                [outPkt, length, outPktFrame] ← RPCInternal.DoSignal[
                  reply, newLength, signalHandler, inPkt.convHandle]
              ELSE
                BEGIN
                IF newLength > maxlength THEN
                  ERROR MesaRPC.CallFailed[
                    runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
                -- Copy recvdHeader.length words of the reply over inPkt.
                Inline.LongCOPY[
                  from: recvdHeader, to: @inPkt.header,
                  nwords: recvdHeader.length];
                GiveBackBuffer[reply];
                RETURN[TRUE, newLength]
                END;
              END
            END;
        ENDLOOP -- for each ping-- ;
      -- We get here only after coming back from a call of DoSignal --
      ENDLOOP -- for signal handlers -- ;
    -- we can't get here! --
    END;

  GeneralSend: PROC [pkt: MesaRPCLupine.RPCPkt] =
    BEGIN
    -- Copies pkt, starting at its header, into a buffer obtained from the
    -- driver and passes that buffer to PupDefs.PupRouterSendThis.
    buf: Buffer.PupBuffer ← NIL;
    hdr: LONG POINTER TO Header = ConcreteHeader[@pkt.header];
    -- Keep asking until the driver actually supplies a buffer.
    WHILE buf = NIL DO buf ← Driver.GetInputBuffer[TRUE] ENDLOOP;
    -- The header's own length field says how many words to copy.
    Inline.LongCOPY[
      from: @(pkt.header), to: @(buf.pup.pupLength), nwords: hdr.length];
    buf.type ← pup;
    PupDefs.PupRouterSendThis[buf];
    END;


  -- Monitor data shared between EnqueueRecvd, IdleReceive and PktExchange.

  -- Head of the list (linked through "next") of packets awaiting an idle
  -- server; EnqueueRecvd pushes on the front, IdleReceive pops from the front.
  idlerPkt: Buffer.PupBuffer ← NIL;
  -- Idle servers wait here; NOTIFYed once per packet pushed onto idlerPkt.
  -- NOTE(review): timeout 0 presumably means "no timeout"; confirm.
  idlerCond: CONDITION ← [timeout: 0];
  -- Processes inside MyReceive wait here; BROADCAST whenever any waiterPkts
  -- slot is filled, and timed so waiters can re-check their deadline.
  waiterCond: CONDITION ← [timeout: Process.MsecToTicks[minRetransmitMsecs]];
  WaiterArray: TYPE = ARRAY PSB.PsbIndex OF Buffer.PupBuffer;
  -- One slot per process: the packet (if any) delivered to it but not yet
  -- collected by MyReceive or ClearWanting.
  waiterPkts: LONG POINTER TO WaiterArray ← Heap.systemZone.NEW[
    WaiterArray ← ALL[NIL]];
  wanting: PACKED ARRAY PSB.PsbIndex OF BOOLEAN ← ALL[FALSE];  -- PSB expects a pkt --

  SetWanting: ENTRY PROC [myPSB: PSB.PsbIndex] = INLINE
    BEGIN
    -- Under the monitor lock, record that process myPSB expects a packet.
    wanting[myPSB] ← TRUE;
    END;

  ClearWanting: PROC [myPSB: PSB.PsbIndex] = INLINE
    BEGIN
    -- Stops delivery of packets to process myPSB and discards any packet
    -- that was delivered to its slot but never collected.
    leftover: Buffer.PupBuffer;
    TakeSlot: ENTRY PROC = INLINE
      BEGIN
      -- Inside the monitor: clear the flag and empty the slot.
      wanting[myPSB] ← FALSE;
      leftover ← waiterPkts[myPSB];
      waiterPkts[myPSB] ← NIL;
      END;
    TakeSlot[];
    IF leftover # NIL THEN GiveBackBuffer[leftover];  -- returned outside the monitor
    END;


  MyReceive: ENTRY PROC [myPSB: PSB.PsbIndex, sentTime, waitTime: System.Pulses]
    RETURNS [recvd: Buffer.PupBuffer] = INLINE
    BEGIN
    ENABLE UNWIND => NULL;
    -- Collects the packet delivered to myPSB's slot, waiting on waiterCond
    -- until one arrives.  Gives up and returns NIL once waitTime pulses have
    -- elapsed since sentTime with the slot still empty.
    DO
      recvd ← waiterPkts[myPSB];
      IF recvd # NIL THEN {waiterPkts[myPSB] ← NIL; EXIT};
      IF System.GetClockPulses[] - sentTime >= waitTime THEN EXIT;
      WAIT waiterCond;
      ENDLOOP;
    END;

  -- Raised by IdleReceive when a queued packet's length exceeds the caller's
  -- maxlength.
  RecvdPktTooLong: ERROR = CODE;
  -- Raised by IdleReceive to make a surplus server exit; caught in Server.
  KillServer: ERROR = CODE;
  -- Count of server processes; incremented before each FORK of Server and
  -- decremented when IdleReceive raises KillServer.
  servers: CARDINAL ← 0;
  serversMax: CARDINAL ← 20;
  -- Count of servers waiting in IdleReceive that have not yet been assigned
  -- a packet; EnqueueRecvd decrements it for each packet it queues.
  idlers: CARDINAL ← 0;
  idlersMax: CARDINAL = 6;
  idlersMin: CARDINAL = 2;
  -- Number of server processes will never exceed "serversMax".
  -- If a server is about to go idle while "idlersMax" are already idle, it
  -- is told to quit (KillServer).
  -- If number of idle servers drops below "idlersMin", one is forked.

  IdleReceive: PUBLIC PROC [pkt: MesaRPCLupine.RPCPkt, maxlength: CARDINAL] =
    BEGIN
    -- Called by an idle server process: waits for EnqueueRecvd to queue a
    -- packet for the idle servers, then copies that packet into pkt.
    --   pkt: destination; recvdHeader.length words are copied to @pkt.header.
    --   maxlength: largest header length field the caller can accept.
    -- Raises KillServer (after decrementing "servers") if idlersMax servers
    -- are already idle, and RecvdPktTooLong if the packet exceeds maxlength.
    -- May fork another Server if the idle count has dropped below idlersMin.
    b: Buffer.PupBuffer ← NIL;
    tooLong: BOOLEAN ← FALSE;
    InnerIdler: ENTRY PROC = INLINE
      BEGIN
      IF idlers >= idlersMax THEN {
        servers ← servers - 1; RETURN WITH ERROR KillServer[]};
      idlers ← idlers + 1;
      -- EnqueueRecvd decrements "idlers" and NOTIFYs once per queued packet.
      DO WAIT idlerCond; IF idlerPkt # NIL THEN EXIT ENDLOOP;
      IF idlers < idlersMin AND servers < serversMax THEN {
        servers ← servers + 1; Process.Detach[FORK Server[]]};
      b ← idlerPkt;
      BEGIN
      recvdHeader: LONG POINTER TO Header = LOOPHOLE[@b.pup.pupLength];
      -- Unlink the packet from the front of the idler list.
      idlerPkt ← LOOPHOLE[idlerPkt.next, Buffer.PupBuffer];
      b.next ← NIL;
      -- Do not raise the error here: an ERROR propagating out of an ENTRY
      -- procedure without RETURN WITH ERROR would leave the monitor locked
      -- and the buffer unreturned.  Note the condition and raise it below.
      IF recvdHeader.length > maxlength THEN tooLong ← TRUE
      ELSE
        Inline.LongCOPY[
          from: @b.pup.pupLength, to: @pkt.header, nwords: recvdHeader.length];
      END;
      END;
    InnerIdler[];
    GiveBackBuffer[b];  --outside monitor--
    IF tooLong THEN ERROR RecvdPktTooLong[];  -- lock released, buffer returned
    END;

  -- NOTE(review): declared but not raised anywhere in this file as shown.
  QueuesScrambled: ERROR = CODE;


  -- Local name for Buffer.ReturnBuffer; used wherever this module has
  -- finished with a Pup buffer.
  GiveBackBuffer: PROC [b: Buffer.PupBuffer] = Buffer.ReturnBuffer;



  Listener: PROC =
    BEGIN
    -- Body of the listener process: picks up packets that reach the Pup
    -- socket for rpcSocket and offers each one to AcceptPkt, returning the
    -- buffer when it is refused.  Runs until ABORTED, then destroys the socket.
    socket: PupDefs.PupSocket = PupDefs.PupSocketMake[
      local: RPCPrivate.rpcSocket, remote:, ticks: PupDefs.veryLongWait];
    DO
      ENABLE ABORTED => EXIT;
      pkt: Buffer.PupBuffer = socket.get[];
      accepted: BOOLEAN = AcceptPkt[pkt];
      IF NOT accepted THEN Buffer.ReturnBuffer[pkt];
      ENDLOOP;
    PupDefs.PupSocketDestroy[socket];
    END;

  AcceptPkt: PROC[b: Buffer.PupBuffer] RETURNS[BOOLEAN] =
    BEGIN
    -- Offers b to EnqueueRecvd and reports whether it was taken; when FALSE
    -- the caller still owns the buffer.
    taken: BOOLEAN = EnqueueRecvd[b];
    RETURN[taken]
    END;
  EnqueueAgain: PUBLIC PROC [b: Buffer.PupBuffer] =
    -- This is a procedure mainly for debugger breakpoints! --
    BEGIN
    -- Re-offer b for dispatch; if nobody takes it, return the buffer.
    IF EnqueueRecvd[b] THEN RETURN;
    Buffer.ReturnBuffer[b];
    END;

  EnqueueRecvd: PUBLIC ENTRY PROC [b: Buffer.PupBuffer] RETURNS [BOOLEAN] =
    BEGIN
    -- Dispatches a received packet to the RPC process that should handle it.
    -- Packet is known to be a Pup, and is addressed to our socket.
    -- Returns TRUE if the packet was taken (queued for a waiting process or
    -- for an idle server); FALSE if the caller must dispose of the buffer.
    -- (A check that destHost = myHost used to be made here; it is patched out.)
    hdr: LONG POINTER TO Header = LOOPHOLE[@b.pup.pupLength];
    IF hdr.type.subType # rpc THEN RETURN[FALSE];
    BEGIN
    target: PSB.PsbIndex = hdr.destPSB;
    -- TRUE when the packet names a real process that is expecting one.
    hasWaiter: BOOLEAN =
      target IN (PSB.PsbNull..LAST[PSB.PsbIndex]] AND wanting[target];
    recvd ← recvd + 1;
    IF hasWaiter THEN
      BEGIN
      -- someone wants this packet: give them it, unless their slot is
      -- still occupied by an earlier packet
      IF waiterPkts[target] # NIL THEN RETURN[FALSE];
      waiterPkts[target] ← b;
      BROADCAST waiterCond;
      RETURN[TRUE]
      END;
    -- Otherwise give it to an idle server process to deal with.
    -- Pkts are dealt with LIFO by idlers, but it doesn't matter (much)
    IF idlers = 0 THEN RETURN[FALSE];  -- server too busy: throw it away!
    b.next ← idlerPkt;
    idlerPkt ← b;
    idlers ← idlers - 1;
    NOTIFY idlerCond;
    RETURN[TRUE]
    END;
    END;


  -- Handle of the process running Listener; assigned in Initialize.
  listenerProcess: PROCESS;

  Server: PROC =
    BEGIN
    -- Body of each forked server process: runs RPCInternal.ServerMain and
    -- finishes quietly when KillServer is raised.
    RPCInternal.ServerMain[ ! KillServer => CONTINUE];
    END;




  -- ******** Initialization ******** --

  Initialize: ENTRY PROC =
    BEGIN
    -- One-time start-up, run with the monitor lock held: records this
    -- machine's address, starts the other RPC modules, then forks the first
    -- server process and the listener.
    myAddr: PupTypes.PupAddress;
    [] ← PupDefs.PupPackageMake[];
    myAddr ← PupDefs.GetLocalPupAddress[RPCPrivate.rpcSocket, NIL];
    myHost ← [myAddr.net, myAddr.host];
    START RPCInternal.RPCBinding;  -- exports "RPCInternal.exportTable" --
    START RPCInternal.RPCSecurity;  -- exports "RPCInternal.firstConversation" --
    START RPCInternal.RPCPktStreams;  -- initialize connection states --
    -- Count the server before forking it, as IdleReceive does.
    servers ← servers + 1;
    Process.Detach[FORK Server[]];
    listenerProcess ← FORK Listener[];
    END;

  -- Module body: initialization runs when the module is started.
  Initialize[];

  END.