/* RPCPktIO.c */
/*
 Last modified by D. Swinehart, November 11, 1982 10:06 am
 L. Stewart October 21, 1982  1:17 PM, change RPC process stack size
 L. Stewart November 28, 1982  3:08 PM,
    formatting, InitQueue, remove AllocZero
 L. Stewart December 27, 1982  2:47 PM,
    remove all nested declarations, (compiler bugs, junp out of loops)
 L. Stewart & DCS, ack debugging, rpct, January 21, 1983  4:33 PM
 L. Stewart, March 6, 1983  4:08 PM, remove Zone and Alloc
 D. Swinehart, March 31, 1983 10:39 am, update to match Cedar 4.0 protocol implementation.
 L. Stewart, March 31, 1983  2:59 PM, split into rpcpktiob.c
 LCS, DCS, April 14, 1983  4:10 PM, decrease call failed
 LCS, DCS, April 28, 1983  2:14 PM, comment bug, conv matching
 */

#include <Context.h>
#include <Queue.h>
#include <Env.h>
#include <Signal.h>
#include <rpc.h>
#include <rpcinternal.h>
#include <rpclupine.h>
#include <rpcpkt.h>

    /* Imported from RPCSecurity, RPCPktStreams */
extern int /*BOOL*/ ReplyToRFA();
extern SignalInitialize();

    /* From RPCSignals */
extern int CallFailed;

    /* From Pup Package */
extern SendPup();
extern WaitUntilSent();
extern struct PBI *GetPBI();
extern int GetPupHost(); /* returns machine, but can't declare that */
extern InitPupLevel1();
extern OpenLevel1Socket(/*lclPort, PortProc, queue*/);
extern RequestRoute(/*net*/);
extern InitCalendar(/*retries*/);
extern int Min();
extern int/*BOOL*/ MultEq();
extern int/*BOOL*/ DoubleEq();
extern ReleasePBI();

    /* From Queue */
extern int *Dequeue();
extern Enqueue();
extern InitQueue();

    /* From Timer package */
extern SetTmr();
extern int /*BOOL*/ TmrExp();

    /* From Context */
extern Block();
extern CurrentContext();

    /* From RPCUtils */
extern int DoubleComp();
extern int DoubleInc();
extern Move2();
extern StartNProcess();

    /* From OS */
extern int *CallersFrame();
extern MoveBlock();
extern Zero();

union Machine myHost;	/* net,,host */
struct Queue *myCtxQ;
int *mySoc;
static struct CallCount callSequence;	/* pointer to CallCount generator */
static int sent;	/* statistics */
static int recvd;
static int retransmitted;
static int ReceivePktTooLong; /* SIGNAL */
static int listenerRunning;	/* set false to abort listener */
static int signalTimeout;	/* debugging: false to avoid transmission timeouts */
static int maxTransmissions /* Normally a large constant; reduced during broadcasts. */

/* Listener-server statics */
struct Queue	idlerQ;
static int idlers;	/* CARDINAL; -- number idle servers */
static int wanting[maxPSBp1];	/* ARRAY PSB.PsbIndex OF BOOLEAN */
static struct PBI *waiterPkts[maxPSBp1];	/* ARRAY PSB.PsbIndex OF BufferDefs.PupBuffer */
static struct RPCCtx *contexts[maxPSBp1];	/* contexts corresp. to wanting, for debugging. */

/* "PktExchange" implements a reliable packet exchange.
  There are three cases: 
      calling: transmit data, wait for data 
      endCall: transmit data, wait for ack or data (data => start of new call) 
      authReq: transmit RFA, wait for data (like "call", but no retransmissions) 

Data and RFA packets are retransmitted until an ack is received. 
Ack packets are retransmitted only in response
    to "pleaseAck" requests 
No acknowledgement after 14 transmissions is fatal
    (returns timeout) 
When the transmitted packet has been acknowledged (if needed), a ping is sent periodically (ack packet saying pleaseAck).  If necessary the ping is retransmitted until it has been acked.  Failure to receive an ack for the ping is fatal ( returns timeout).  Provided pings continue to be acknowledged, there is no limit on how long we will wait for the next data packet.  If the client gets impatient, he can reinitialize the package.  (This corresponds to the local-machine semantics of a procedure call.)
 */

#define minRetransmitMsecs 250
	/* retransmit delay, local or remote, happy medium */
#define minPingMsecs 5000
	/* initial interval between pings */
#define maxPingSecs 30
	/* long-term ping interval, was 300 sec, but that wraps! */
#define defaultMaxTransmissions 5
	/* give up after too many */
int broadcastRetransmissions;
	/* broadcasts succeed quickly or not at all -- used for binding, etc. */

/* The retransmission delay is incremented by minPingMsecs each time we timeout, but is reset when we receive the desired packet.  If n=maxTransmissions and i=minRetransmitMsecs, we give up after i*(n*n+n)/2+n msecs. The algorithm operates the same for any number (including 0) of gateway hops. That comes to around 20 seconds. The ping delay is doubled at each ping, up to maxPingSecs, which is 5 minutes; The maxPingSecs value is reached after about 5 minutes. If we are willing to limit the duration of a procedure call, it may be possible to eliminate the pings, allowing a simple timeout on the endCall.
 */

static int minRetransmitPulses; /* Must be set up in initialization code */
static int minPingPulses;	/*  to above values in some reasonable tick */
static int maxPingPulses;	/*  units. */

int rpct;

/**/
static Cl(sig, code, sl)
  int sig, code, *sl;
  {
  CleanUp(sl);
  };

int /* newLength: */
PktExchange(inPkt, length, maxLength, state)
  struct PBI *inPkt;
  int length, maxLength, state;
  {
  /* returns -1 if no packet */
  /* inPkt is a pbi, properly loaded (except for length, I guess.) 
        On exit if newPkt=TRUE, the packet has been decrypted if needed. 
        Normally, transmits inPkt and copies result into inPkt. 
        If a signal occurs, die a horrible death.  We don't do signals. */

  struct PBI *reply;
  struct Seal1 s1, *sl;
  struct Queue myQ;
  int myPSB, acked, pingPulses;
  struct RPCCtx *CtxRunning;
  struct Header *header, *recvdHeader;
  struct PktID thisPktID;
  int newLength, pktLength;
  int transmissions, retransmitPulses, pLen;
  int /* BOOL */ isConv;
  struct PBI *rp;
  
  sl = &s1; s1.data[0] = (int) &reply; reply=0;
  DISABLE(sl);

  /* WARNING: calling context must be of "type" RPCCtx */
  CtxRunning = CurrentContext();
  myPSB = CtxRunning->user.myPSB;
  header = inPkt->pup;
  myQ.head = 0;
  inPkt->queue = &myQ;
  InitQueue(&myQ);
  Enqueue(&myQ, inPkt);
  pingPulses = minPingPulses;
  header->srceHost.w = myHost.w;
  header->srceSoc.LS = rpcSocket;
  header->srcePSB = myPSB;
  maxTransmissions = defaultMaxTransmissions;
  if (header->destHost.b.host == 0) {
    /* This is a broadcast packet!! */
    maxTransmissions = broadcastRetransmissions;
    header->destSoc.LS = rpcBcstSocket;
    };
  if (header->pktID.pktSeq == 0) {
    switch (state) {
      case calling: header->type.fields = typeCall; break;
      case authReq: header->type.fields = typeRFA; break;
      default: SIGNAL(ERROR, 0);
      }; /* endCall */
    NewCallNumber(&header->pktID.callCt);
    header->pktID.pktSeq=swapped1;
    };
  else {
    switch (state) {
      case endCall: header->type.fields = typeData; break;
      case calling: /* Subsequent call packets -- we don't do it. */
      default: SIGNAL(ERROR, 0);
      }; /* authReq */
    header->pktID.pktSeq += swapped1;
    };
  acked=false;
  MoveBlock(&thisPktID, &header->pktID, lenPktID);
  ENABLE(UNWIND, &Cl, sl);
  for (;;) { /* loop for Pings */
    transmissions = 0;
    retransmitPulses=minRetransmitPulses;
    if (inPkt->convHandle != unencrypted)
      pLen = EncryptPkt(inPkt, length);
    else pLen = (length + pktLengthOverhead);
    header->length = swab(pLen<< 1);
  
    for (;;) { /* loop for ReTransmissions */
      SetWanting(myPSB);
  	
      WaitUntilSent(inPkt);	/* make sure transmission's complete??? */
      SendPup(inPkt);
      sent += 1;
      /* wait for response: an ack or the reply or a timeout -- */
      for (;;) { /* loop for Receiving each incoming packet (including garbage) */
  
        /* Main Packet Receive Loop */

/*  start of include <RPCRcvPktIO.ic>  */
/* RPCRcvPktIO.c -- inner loop of PktExchange routine
	Last edited by D. Swinehart, July 20, 1982  2:04 PM
	
	This whole thing is embedded in a "while (true) { }" loop */

SetWanting(myPSB);
reply = MyReceive(myPSB, ((acked)? pingPulses: retransmitPulses));
if (reply == 0)
	if (acked) goto Ping;
	else { header->type.fields |= ackField; goto Retransmit; };
recvdHeader = reply->pup;
/* SELECT TRUE FROM .. */
if ((recvdHeader->type.fields&classField) == rfa) {
        rp=reply;
        reply=0;
	if (ReplyToRFA(rp, header /*encrypted*/,
	&thisPktID /*clear*/, inPkt->convHandle)) acked=acked;
		/* can't set "acked", because we must retransmit our data packet until we obtain the
			correct destPSB from some ack pkt */
	continue; };
	/* XOR should be 0 for match, unless call was a broadcast and response isn't */
isConv = swab(recvdHeader->srceHost.w ↑ header->destHost.w); /* comparison results */
isConv = (isConv==0 || isConv==recvdHeader->srceHost.b.host); /* equal or near-equal */
isConv = (DoubleEq(&recvdHeader->conv,&header->conv)  && isConv); /*our conv*/
if ((inPkt->convHandle != unencrypted) && isConv) {
	DecryptPkt(recvdHeader, inPkt->convHandle, &newLength);
	pktLength = newLength + pktLengthOverhead; }
else {
	pktLength = (swab(recvdHeader->length)) >> 1;
	newLength = pktLength - pktLengthOverhead; };
if (isConv && (recvdHeader->pktID.activity == thisPktID.activity))
	/* pkt is related to our call -- */
	/* SELECT TRUE FROM .. */
	if (DoubleEq(&recvdHeader->pktID.callCt, &thisPktID.callCt))
		/* pkt is in our call */
	    /* SELECT TRUE FROM .. */
	    
	    /* a) pkt has next sequence number to ours -- */
	    if (recvdHeader->pktID.pktSeq == swapped1+thisPktID.pktSeq) {
			if (state == endCall) /* he's not allowed to generate that pktSeq! */
	        	SIGNAL(CallFailed, runtimeProtocol);
			switch (recvdHeader->type.fields&classField) {
	        	case xtraData: break;
	        	case ack: {
	        		/* This can happen if state=call and callee sent next data pkt, but it was
	        			lost and now he is responding to our retransmission or ping.  Soon, he
	        			will retransmit his data packet. */
	        			acked = true;
	        			CleanUp(sl);
	        			continue; };
	        	default: SIGNAL(CallFailed, runtimeProtocol); }; /* call, rfa */
			goto Done; };
		
	    /* b) pkt has same sequence number as ours -- */
	    else if (recvdHeader->pktID.pktSeq == thisPktID.pktSeq) {
			switch (recvdHeader->type.fields&classField) {
				case ack:  { /* acknowledgement of our packet -- */
					if ((header->type.fields&classField) == call) header->destPSB = recvdHeader->srcePSB;
					acked = true;
					if (state == endCall) { CleanUp(sl); goto Done; };
		  			else /* state = calling or authReq -- */
		  				/* Even if "pleaseAck, we don't need to ack it,
		  				because other end should send data pkt soon */
		  				CleanUp(sl);
		  			break; };
	        	case xtraData: case call: /* retransmission of his packet;
	      							      interesting only in multi-packet systems */
	        	default: SIGNAL(CallFailed, runtimeProtocol); };
			};
			
		/* c) pkt has earlier sequence number than ours --
			   [ assume sequence numbers stay small enough to compare "nums" directly ]	  */
	    else if (recvdHeader->pktID.pktSeq < thisPktID.pktSeq) CleanUp(sl); /* no need to ack it */
	    
	    /* d) pkt has some future sequence number -- */
	    else SIGNAL(CallFailed, runtimeProtocol);
	/* end pkt is in our call */
	else
		if (((DoubleComp(&recvdHeader->pktID.callCt, &thisPktID.callCt)+1) > 1) &&
			(state == endCall)) {
		if ((recvdHeader->type.fields&classField) != call) SIGNAL(CallFailed, runtimeProtocol);
	    /* acks our last packet, so we can handle the call -- */
	    goto Done; };
	/* ... eliminate redundancy someday. */
	else { /* wrong call; let someone else do it -- */
		recvdHeader->destPSB = nullPSB; EnqueueAgain(reply); reply=0; };
	/* end pkt is in our conversation */
else { recvdHeader->destPSB = nullPSB; EnqueueAgain(reply); reply=0; };
/* end packet analysis */
/*  end of include <RPCRcvPktIO.ic>  */
  
        /* Incoming RFA packets don't reach here */
        }; /* endloop for each incoming packet */
/**/
      /* EXITS */
      Retransmit:
        transmissions += 1;
        if ((transmissions == maxTransmissions) || state == authReq) {
        	/* Don't retransmit RFA: caller will retransmit call pkt.
        		Otherwise, if a spurious worker process occurred because of call pkt
        		retransmission before response to RFA, then the spurious worker
        		process sits needlessly retransmitting RFA's until it times out. */
        if (signalTimeout) SIGNAL(CallFailed, timeout);
        transmissions = 0; retransmitPulses = minRetransmitPulses*maxTransmissions;
        };
      retransmitted += 1;
      retransmitPulses += minRetransmitPulses;
      Block();
      continue;
      }; /* end loop for retransmissions */
    /* EXITS .. */
    Ping:
      header->type.fields = typePing;
      length=0;
      MoveBlock(&header->pktID, &thisPktID, lenPktID);
      acked=false;
      pingPulses = pingPulses*2;
      if ( (pingPulses<0) || (pingPulses>maxPingPulses) ) pingPulses = maxPingPulses;
      Block();
    }; /* end loop for pings -- */
  /* EXITS .. */
  Done: {
    ClearWanting(myPSB);
    if (reply == 0) {
      MoveBlock(&header->pktID, &thisPktID, lenPktID);
      return -1;
      };
    if ((recvdHeader->callSpec.outcome == signal) ||
      (newLength > maxLength)) SIGNAL(CallFailed, runtimeProtocol);
    MoveBlock(header, recvdHeader, pktLength);
    CleanUp(sl);
    Block();
    return newLength;
    };
  /* Since we're not handling signals, we can't get here. */
  SIGNAL(ERROR, 0);
  };

static CleanUp(sl)
  struct Seal1 *sl;
  { /* UNWIND proc */
    /* WARNING: calling context must be of "type" RPCCtx */
  int myPSB, *lvReply;
  struct RPCCtx *CtxRunning;
  CtxRunning = CurrentContext();
  myPSB = CtxRunning->user.myPSB;
  ClearWanting(myPSB);
  lvReply = (int *) sl->data[0];
  if (*lvReply != 0) ReleasePBI(*lvReply);
  *lvReply=0;
  };
/**/
static int NewCallNumber(lvResult)
  struct CallCount *lvResult;
  {
  DoubleInc(&callSequence, 1);
  Move2(lvResult, &callSequence);
  };

static SetWanting(myPSB)
  int myPSB;
  {
  wanting[myPSB]=true;
  };

static ClearWanting(myPSB)
  int myPSB;
  {
  wanting[myPSB]=false;
  if (waiterPkts[myPSB]) {
    ReleasePBI(waiterPkts[myPSB]);
    waiterPkts[myPSB] = 0;
    };
  };

static struct PBI *MyReceive(myPSB, waitTime)
  int myPSB, waitTime;
  {
  /* Returns NIL if no packet arrives within waitTime pulses after sentTime -- */
  int timer;
    struct PBI *recvd;
  timer=0;
  SetTmr(waitTime, &timer);	/* sentTime is implicit, if SetTimer called quickly enuf. */
  for (;;) {
    recvd = waiterPkts[myPSB];
    if (recvd) {
      waiterPkts[myPSB] = 0;
      return recvd;
      };
    if (TmrExp(&timer)) return 0;
    Block();
    };
  };
      
IdleReceive(pkt, maxLength)
  struct PBI *pkt;
  int maxLength;
  {
  struct PBI *b;
  struct Header *recvdHeader;
  int pktLength;
  idlers += 1;
  while (idlerQ.head == 0) Block(); /* WAIT idlerCond */
  b = (struct PBI *) Dequeue(&idlerQ);
  recvdHeader = b->pup;
  pktLength = (swab(recvdHeader->length)) >> 1;
  if (pktLength <= maxLength) MoveBlock(pkt->pup, recvdHeader, pktLength);
  if (rpct) PutChar('i');
  ReleasePBI(b);
  if (pktLength > maxLength) SIGNAL(ReceivePktTooLong, 0);
  };
  
 /* Current Cedar version (as of 4/1/83) adds AcceptPkt and adds report argument to
 	EnqueueRecvd.  All to get Cedar Spy involved, and to avoid having PupWatch see
 	packets twice.  None of that is necessary here.  This is still compatible with the Cedar 4.0
 	version of Cedar RPC.  See your local Waterlily dif file for future changes that do have
 	to be reflected here. */
int /*BOOL*/
EnqueueRecvd(b)
  struct PBI *b;
  { /* the main scheduler. */
  /* dispatch packet to appropriate process, or something --  */
  /* packet is known to be a PUP addressed to the RPC socket -- */
  struct Header *header;
  int destPSB;
  header = b->pup;
  if (header->destHost.w != myHost.w) return false;
  if ((header->type.fields&subtypeField) != rpc) return false;
  /* << TEMP: Catch aCalls >> */
  if (((header->type.fields&classField)==call) &&
    ((header->type.fields&ackField)!=0)) b=b;
  /* << end TEMP: Catch aCalls >> */
  destPSB = header->destPSB;
  recvd += 1;
  if ((destPSB <= 0) || (destPSB > maxPSB) || (wanting[destPSB] == false)) {
    /* give to idle process to deal with -- */
    /* pkts are dealt with FIFO by idlers, isn't that wonderful? -- */
    if (idlers == 0) return false; /* server too busy, throw away */
    Enqueue(&idlerQ, b);
    if (rpct) PutChar('e');
    idlers -= 1;
    };
  else { /* someone wants this packet: give them it -- */
    if (waiterPkts[destPSB]) return false; /* two packets to same process! */
    waiterPkts[destPSB]=b;
    if (rpct) PutChar('w');
    };
  return true;
  };
      
EnqueueAgain(b)
  struct PBI *b;
  {
  if (EnqueueRecvd(b) == false) {
    ReleasePBI(b);
    if (rpct) PutChar('d');
    };
  };

EnqueueBcst(b)
  struct PBI *b;
  { /* Incoming broadcast dispatcher. */
  struct Header *header;
  int destPSB;
  header = b->pup;
  header->destHost.b.host = myHost.b.host;
  header->destSoc.LS = rpcSocket;
  EnqueueAgain(b);
  };

static int generator;

int NewPSB(context)
  struct RPCCtx *context;
  { /* generates non-null PSB unique to this epoch. */
  if (++generator > maxPSB) SIGNAL(ERROR, 0);
  contexts[generator] = context;
  return generator;
  };
/**/
/* ******** Initialization ******** -- */

RPCInitialize(pCtxQ)
  struct Queue *pCtxQ
  {
  /* do any non-socket specific Pup initialization */
  /* Pulses are 1 ms. long  on this machine!! */
  listenerRunning = (signalTimeout = true);
  minRetransmitPulses = minRetransmitMsecs;
  minPingPulses = minPingMsecs;
  maxPingPulses = maxPingSecs*1000;
  broadcastRetransmissions = 5;
  myCtxQ=pCtxQ;
  SignalInitialize();
  Zero(waiterPkts, maxPSB + 1);
  Zero(wanting, maxPSB + 1);
  Zero(contexts, maxPSB + 1);
  ReceivePktTooLong = CODE();
  };