/* RpcPktStreams.c
   Call-oriented packet streams, based on PktExchange
   Last modified by D. Swinehart, November 11, 1982 10:14 am
   L. Stewart December 27, 1982  3:07 PM, flush nested declarations
   L. Stewart January 1, 1983  4:21 PM, flush AllocZero
 */
/* L. Stewart & DCS, ack debugging, rpct, January 21, 1983  4:33 PM
   DCS, March 31, 1983 11:51 am, match Cedar 4.0 version.
   LCS, DCS, April 14, 1983  4:09 PM, rpc debug, Cedar 4.0 
 */

#include <Queue.h>
#include <Env.h>
#include <Signal.h>
#include <context.h>
#include <Rpc.h>
#include <RpcInternal.h>
#include <RpcLupine.h>
#include <RpcPkt.h>
#include <RPCBind.h>

    /* from RpcPktIO, etc. */
extern struct ExportInstance exportTable[1];
extern int used;	/* # current exports */
extern struct PktConversationID *firstConversation;
extern int GetConnectionState();
extern int IdleReceive();
extern struct Queue *myCtxQ;
extern union Machine myHost;
extern int *mySoc;
extern int PktExchange();
extern int rpct;

    /* from RPCSignals */
extern int CallFailed;
extern int CONT();
extern int RejectUnbound;
extern int RejectProtocol;

    /* from Pup package */
extern SendPup();
extern struct PBI *GetPBI();
extern int DoubleEq();
extern int MultEq();

    /* From Contexts */
extern CurrentContext();
extern Block();

    /* From Queue */
extern Enqueue();
extern int *Unqueue();

    /* From Utils */
extern int DoubleComp();
extern int DoubleDiff();
extern int DoubleInc();
extern Move2();
extern int StringSize();

    /* from OS */
extern int *GetFixed();
extern MoveBlock();
extern Zero();
extern int apply();

/* Event counters: idlerAckCount is incremented in GenerateIdlerResponse,
   idlerRequeueCount in EnqueueForNewPSB.  Nothing in this file reads them. */
static int idlerAckCount;
static int idlerRequeueCount;
/* Signal code raised by StartCall via ERROR(); assigned from CODE() in
   StreamInitialize. */
int MisusedConversation;
/* Callee statics and structures */

/* One known caller connection.  Entries are chained off a bucket of the
   "connections" hash table (see ServerMain and NoteConnection). */
struct ConnectionData {
  struct ConnectionData *next;	/* next entry in the same bucket; 0 terminates the chain */
  struct ConnectionID id;	/* lookup key: conversation, caller host and activity */
  struct CallCount callCt;	/* updated by ServerMain each time a newer call is accepted */
  struct ConversationObject *conv; };	/* copied into the packet's convHandle on a match */
 /* Size passed to GetFixed; presumably a count of 2-byte words -- TODO confirm */
 #define lenConnectionData (sizeof(struct ConnectionData)/2)

/* Hash table of connection chains.  The storage is obtained with GetFixed(128)
   in StreamInitialize, and every index used in this file is masked with 127. */
struct ConnectionData *connections[0];
	/* POINTER TO ARRAY HashKey OF POINTER TO ConnectionData */

/* Record that a ServerMain process places on the "callees" queue while it is
   serving a call, so that FindCallee can match packets against calls in progress. */
struct CalleeState {
  struct CalleeState *next;	/* link used when walking callees.head in FindCallee */
  num callee;	/* PSB.PSBIndex */
  struct Header *state; };	/* header of the owning process's packet buffer */
 /* Size passed to GetFixed; presumably a count of 2-byte words -- TODO confirm */
 #define lenCalleeState (sizeof(struct CalleeState)/2)

/* Queue of calls currently being served: filled by AddCallee, emptied by
   RemoveCallee, scanned by FindCallee. */
struct Queue callees; /* pointer to CalleeState */
/* Maximum receive length ServerMain passes to PktExchange when returning a result. */
#define serverDataLength maxDataLength
	/* RPCLupine */

/* LookupCaller return codes */
/* NOTE(review): none of these codes is referenced in the visible code;
   ServerMain inlines the lookup and branches to the labels CallerNew,
   CallerOld, CallerPhoney and CallerUnknown instead. */
#define lcNew 0
#define lcOld 1
#define lcPhoney 2
#define lcUnknown 3

/* ********** Caller ********** */

/* For each PSB that initiates a call, record last callee PSB, and use
   that PSB as destPSB hint for next call, to obtain implicit ack of
   last result packet.  The fact that the destPSB will be wrong if we next talk to
   a different server host is only a slight pessimization. */

/* Storage is obtained with GetFixed(maxPSB+1) in StreamInitialize; written by
   RecordCallDest and read by StartCall. */
int lastCallDest[0];	/* ARRAY {0..maxPSB;}; OF PSB */

/* Store the packet's srcePSB in lastCallDest, indexed by the packet's destPSB.
   StartCall reads that slot back as the destPSB hint for the next call. */
static RecordCallDest(header) struct Header *header; {
  int slot;
  slot = header->destPSB;
  lastCallDest[slot] = header->srcePSB; }

/* During a call, a single packet is used for buffering all data sent
   and received.  Whenever the client of RPCLupine has possession of
   the buffer (after StartCall), the buffer is set up correctly for
   transmitting.  I.e. buffer.header.dest = the remote machine.  Thus,
   this is true on exit from StartCall, and on entry and exit to/from
   Call and the dispatchers.  This
   causes an extra call of SetupResponse in Call in the case where
   there will be no subsequent call of ReceiveExtraPkt, but it
   preserves his sanity.
   .. or does it?  Remove this if never will there be such a call? */

StartCall(callPkt, interface, localConversation)
	struct PBI *callPkt;
	struct ImportInstance *interface;
	struct Conversation *localConversation; {
  int myPSB;
  struct Header *header;
  struct RPCCtx *CtxRunning;
  CtxRunning = CurrentContext();
  myPSB = CtxRunning->user.myPSB;
  header = callPkt->pup;
  header->destHost = interface->host;
  header->destSoc.LS = rpcSocket;
  header->destPSB = lastCallDest[myPSB];
  callPkt->convHandle = localConversation;
  if (localConversation == unencrypted)
  	Move2(&header->conv, firstConversation);
  else {
    /* ?? Move2(&header->conv, GetPktConversation[localConversation]); ?? */
    Move2(&header->conv, &localConversation->id.count);
    if (localConversation->id.originator.w == myHost.w)
    	header->conv.fields &= notCalleeMask; /* header->conv.originator = caller */
    else if (header->destHost.w == localConversation->id.originator.w)
    	header->conv.fields |= calleeField; /* header->conv.originator = callee */
    else ERROR(MisusedConversation); };
  header->pktID.activity = myPSB;
  /* header.pktID.callSeq gets filled in by PktExchange -- */
  header->pktID.pktSeq = 0; /* => new call -- */
  MoveBlock(&header->callSpec.dispatcherDet, &interface->dispatcherDet, lenDispatcherDetails);
  Block(); };

/* Perform one call: send the first callLength words of pkt with PktExchange,
   receive the reply into the same buffer, and return the reply length.
   Records the replying PSB with RecordCallDest, then raises CallFailed with
   "unbound" or "runtimeProtocol" when the reply's outcome is not "result".
   On exit the header has been turned around with SetupResponse.  A reply whose
   eom bit is set also raises CallFailed(runtimeProtocol).
   Each switch arm ends in an explicit break so that a resumed SIGNAL cannot
   fall through into the next arm and be raised a second time. */
int/* returnLength: */
Call(pkt, callLength, maxReturnLength)
	struct PBI *pkt; int callLength, maxReturnLength; {
  int returnLength;
  struct Header *recvdHeader;
  recvdHeader = pkt->pup;
  returnLength = PktExchange(pkt, callLength, maxReturnLength, calling);
  RecordCallDest(recvdHeader);
  switch (recvdHeader->callSpec.outcome) {
    case result: break;
    case unbound: SIGNAL(CallFailed, unbound); break;
    case protocol: /* same report as any other unexpected outcome */
    default: SIGNAL(CallFailed, runtimeProtocol); break; } /* call failed, somehow, maybe a signal */
  SetupResponse(recvdHeader);
  if ((recvdHeader->type.fields)&eomField)  SIGNAL(CallFailed, runtimeProtocol);
  return returnLength; }

/* ******** Protocol implementation: callee and packets-while-notWanting ******** */

/* Send an ack for "recvd" on behalf of an idler process.
   A fresh packet is obtained from GetPBI, the received header (turned around
   by SetupResponse) is copied into it, its type is set to typeAck and its
   source is set to this host, the RPC socket and the destPSB that the received
   packet carried on entry.  The packet is encrypted! */
GenerateIdlerResponse(recvd) struct PBI *recvd; { /* Ack */
  struct PBI *reply;
  struct Header *replyHdr;
  struct Header *inHdr;
  num worker;

  reply = GetPBI(mySoc);
  replyHdr = reply->pup;
  inHdr = recvd->pup;

  /* Capture destPSB (as adjusted by FindCallee) before SetupResponse overwrites it. */
  worker = inHdr->destPSB;
  ++idlerAckCount;

  SetupResponse(inHdr);
  MoveBlock(replyHdr, inHdr, lenHeader);

  replyHdr->type.fields = typeAck;
  replyHdr->srceHost.w = myHost.w;
  replyHdr->srceSoc.LS = rpcSocket;
  replyHdr->srcePSB = worker;

  SendPup(reply); /* hit it hard, and wish it well. */
  Block(); }

/* Copy the received packet into a fresh PBI and hand the copy to EnqueueAgain.
   The number of words copied is the header's length field, byte-swapped and
   halved. */
static EnqueueForNewPSB(recvd) struct PBI *recvd; {
	struct PBI *copy;
	struct Header *copyHdr;
	struct Header *inHdr;
	int words;

	copy = GetPBI(mySoc);
	copyHdr = copy->pup;
	inHdr = recvd->pup;
	++idlerRequeueCount;
	words = (Swab(inHdr->length)>>1);
	MoveBlock(copyHdr, inHdr, words);
	EnqueueAgain(copy); }

/* The callee keeps globally accessible state describing its current calls,
   so that it can respond to pings.  AddCallee puts one such state block on
   the "callees" queue. */

static AddCallee(stateBlock) struct CalleeState *stateBlock; {
  Enqueue(&callees, stateBlock); }

 static RemoveCallee(stateBlock) struct CalleeState *stateBlock; {
 	if (Unqueue(&callees, stateBlock)==0) SIGNAL(ERROR, 0); Block(); };

static int /* BOOLEAN */
FindCallee(given) struct Header *given; {
/* Scans the "callees" queue for a call matching "given" and returns true iff
   one is found, even if the callee's pktSeq differs.  On a match, "given"s
   destPSB is overwritten with the callee's PSB.  Assumes pkt has been
   decrypted.  Block() is called once before returning either way. */
  struct CalleeState *entry;
  int found;
  found = false;
  for (entry = (struct CalleeState *) callees.head; entry; entry = entry->next) {
    /* One comparison covers conversation, originator, activity and callCt;
       this works because callCt follows ConvCt in the header. */
    if (MultEq(&entry->state->conv, &given->conv, lenPktConversationID+1+lenCallCount)) {
      given->destPSB = entry->callee;
      found = true;
      break; } }
  Block();
  return found; }

/* Forget connection state, so that subsequent calls will cause an RFA */
ForgetConnections() {
	int i;
	struct ConnectionData *connection;
	struct ConnectionData *next;
	for (i=0; i<128; ++i) {
		connection = connections[i];
		while (connection) {
			next = connection->next;
/*
			Free(myZone, connection);
 */
			connection = next; };
		connections[i]=0; }; };

/* For each calling RPCPkt.ConnectionID we must maintain a sequence number, being the last call
   initiated on that conversation, so that we can eliminate duplicate call request packets.
   This information is maintained as a hash table with linked overflow.  The hash function is
   (connection.caller XOR connection.activity) MOD 128. The hash table is altered by
   LookupCaller and EndConnection.

   Received packets are dispatched to "ServerMain" processes (through IdleReceive) if the addressed
   process is not wanting to receive any packets at the time, or if the destPSB is PsbNull.  Thus
   ServerMain serves both as the listener waiting for RFC's on a conventional rendezvous
   protocol, and as the process listening to the incoming per-connection socket in more
   heavyweight protocols.  There are several cases.  The packet may be the first packet of a new
   call - in this case, this process will handle the call.  The packet may be an old duplicate packet
   from a dead call - in this case the packet can be ignored.  The packet may be a retransmission
   in a current call - in this case an ack may be required.  Remember that packets can arrive here
   in both the caller and callee hosts! */
/**/
/* Signal handler installed by ServerMain with ENABLE: removes the callee state
   block whose address is stored in seal->data[0] from the "callees" queue,
   then returns CONTINUE.  The first two arguments are ignored. */
static int RemC(nil1, nil2, seal)
	int nil1, nil2; struct Seal1 *seal; {
  RemoveCallee(seal->data[0]);
  return CONTINUE; }

ServerMain() {
  int myPSB;
  struct PBI *myPkt;
  struct Header *recvdHeader;
  struct CalleeState *myStateBlock;
  struct Seal1 s1;
  struct Seal s2;
  struct Seal s4;
  struct Seal1 s3;
  int /* BOOL */ decrypted, newPkt;
  int newLength;
  int ackLen;
  struct ConnectionData *connection;
  struct ConnectionID id;
  int /*BOOL*/ ok;
  struct DispatcherDetails *target;
  struct ExportInstance *instance;
  int resultLength, *rout, hint;
  int argv[4];
  struct CallCount aCall;
  struct Conversation *conv;
  struct RPCCtx *CtxRunning;
  int lll; /* Some sort of length */
  int oldDest; /* PsbIndex */
  int /*BOOL*/ knownCallee;
  CtxRunning = CurrentContext();
  myPSB = CtxRunning->user.myPSB;
  myPkt = GetPBI(mySoc)
  recvdHeader = myPkt->pup;
  myStateBlock = (struct CalleeState *) GetFixed(lenCalleeState);
  myStateBlock->callee = myPSB;
  myStateBlock->state = recvdHeader;
  newPkt = (decrypted = (newLength = 0));
  myStateBlock->next = 0;
  connection = 0;
  s1.data[0] = (s3.data[0] = (int) myStateBlock);
  
/* newPkt = true at top of loop iff we have the first pkt of next call already. */
/* At top of loop, myPkt is decrypted if newPkt = true -- */
/* ... Deal with ABORTED some day */
while (true) {
  DISABLE(&s1); DISABLE(&s2); DISABLE(&s3); DISABLE(&s4);
  if (newPkt == 0)  {
  	IdleReceive(myPkt, maxPupWords);
  	newPkt = true; decrypted = false;
	/* <<TEMP: Catch a Call/Ack Packet!!>> */
	if ((recvdHeader->type.fields&classField) == call
		&& ((recvdHeader->type.fields&ackField) !=0))  newPkt = newPkt; };
  Move2(&id.conv, &recvdHeader->conv);
  id.caller = recvdHeader->srceHost;
  id.activity = recvdHeader->srcePSB;
   
  /* LookupCaller: If pkt starts call and conversationID is unknown, returns "unknown";
     If pkt starts call and isn't duplicate, adds us as callee, returns "new";
     If pkt is part of some previously initiated call, returns "old";
     If pkt is part of some call with unknown ConnectionID, returns "phoney";
     If decrypted pkt is inconsistent, returns "phoney". Otherwise, returns "old".
     On entry, packet has previously been decrypted iff "decrypted".
     On exit if result is "new", pkt is decrypted
     On exit if "decrypted", then myPkt.convHandle is set.
     Note that if result is "old", pkt may or may not be decrypted. */
  /* In the Cedar 4.0 version (as of 4/1/83), there's a Cleanup procedure to deal with the
  	use of spaces to allocate non-MDS buffers.  Not needed here. */
  Block();
  connection = connections[(Swab(id.caller.w ↑ id.activity)) & 127];
  while (connection)  {
    if (MultEq(&id.conv, &connection->id.conv, lenPktConversationID+1) &&
    recvdHeader->srcePSB == connection->id.activity)  {
      myPkt->convHandle = connection->conv;
      if (decrypted == false) {
      	if (connection->conv != unencrypted) {
      		ok = DecryptPkt(recvdHeader, myPkt->convHandle, &newLength);
      		decrypted=true;
      		if (ok==false) goto CallerPhoney; };
      	else { newLength = (Swab(recvdHeader->length) >>1) - pktLengthOverhead;
      			decrypted=true; }; };
      if (recvdHeader->pktID.activity != recvdHeader->srcePSB)  goto CallerPhoney;
      if ((recvdHeader->type.fields&classField) != call)  goto CallerOld;
      if (DoubleComp(&recvdHeader->pktID.callCt, &connection->callCt)>0)  {
        if (recvdHeader->pktID.pktSeq != swapped1)  goto CallerPhoney;
		Move2(&connection->callCt, &recvdHeader->pktID.callCt);
		AddCallee(myStateBlock);
		goto CallerNew; };
      else goto CallerOld; };
      connection = connection->next; };
  if ((recvdHeader->type.fields&classField) == call) goto CallerUnknown; else goto CallerOld;
  SIGNAL(ERROR, 0); /* no where to go! */
  /**/
  /* switch LookupCaller (...) */
      CallerNew: {
		Block();
		target = &recvdHeader->callSpec.dispatcherDet;
		SetupResponse(recvdHeader);
		if (((hint=Swab(target->dispatcherHint)) >= used) ||
		   ((instance = &exportTable[hint]) != instance) ||
		   (!DoubleEq(&target->dispatcherID, &instance->id))) {
			recvdHeader->callSpec.outcome = unbound; resultLength = 0; };
		else {
			rout = instance->dispatcher;
			ENABLE(UNWIND, &RemC, &s3);
			if (ENABLE(RejectUnbound, &CONT, &s4))  {
				recvdHeader->callSpec.outcome = unbound; resultLength = 0; };
			else if (ENABLE(RejectProtocol, &CONT, &s2)) {
				recvdHeader->callSpec.outcome = protocol; resultLength = 0; };
			else if (ENABLE(CallFailed, &RemC, &s1))  { newPkt=false; continue; };
    		/* ***** This here's the RPC ***** */
    		else {
			argv[0] = (int) myPkt;
			argv[1] = newLength;
			argv[2] = (int) connection->conv;
			argv[3] = (int) instance->dispatcherArgs;
			resultLength= apply(argv, rout, 4); };
    		/* *********** */
			DISABLE(&s2); DISABLE(&s3); DISABLE(&s4);
			};
		RemoveCallee(myStateBlock);
        if (ENABLE(CallFailed, &CONT, &s1))  newPkt=false;
		else {
	   		newLength = PktExchange(myPkt, resultLength, serverDataLength, endCall);
	   		newPkt = (newLength >= 0); };
	   	if (newPkt) decrypted = true;
		/* Now newPkt = false or myPkt contains start of new call -- */
		continue;};
    CallerUnknown: {
    	Block();
		ok = (ENABLE(CallFailed, &CONT, &s1) == false && 
	        GetConnectionState(decrypted, myPkt, &id, &aCall, &conv, &lll) == true);
		if (ok) {
			if (newPkt == false) SIGNAL(ERROR, 0);
			if (decrypted == false) { decrypted = true; newLength = lll; };
			NoteConnection(&id, &aCall, conv); };
		else newPkt = false;
		continue ; };
    CallerPhoney: /* ignorable packet */
		newPkt = false; continue;
    CallerOld: {
    	/* Pkt may or may not have been decrypted.  If the packet came to us because it had an
    		incorrect destPSB, we should try correcting it and giving it to the correct process.  This
    		ensures that destPSB is only a hint.  Also, because of the restrictions on generating
    		ack's (described below), there are cases where an ack is required but only the correct
    		worker process is allowed to generate it. */
    	Block();
    	oldDest = recvdHeader->destPSB;
    	knownCallee = ((decrypted == true) && (FindCallee(recvdHeader) != 0));
    	if ((knownCallee==true) && (recvdHeader->destPSB != oldDest)) {
    		/* destPSB his was wrong: requeue pkt for correct process.  Note that if correct process
    			doesn't want the packet right now, it may come back to an idler process, but it will
    			have correct destPSB. */
    		if (decrypted) {
    			if (myPkt->convHandle == unencrypted)
    				ackLen = pktLengthOverhead + newLength;
    			else ackLen = EncryptPkt(myPkt, newLength);
    			recvdHeader->length = Swab(ackLen << 1); };
    		EnqueueForNewPSB(myPkt); };
    	else {
    		/* We're here because the packet doesn't start a new call.  We should respond if the
    			packet is a retransmission or a ping.  We generate an ack only if the packet has
    			eom-end.  Therefore, the last packet in any direction may only be sent when the
    			worker process has generated the ack for the preceding packet in that direction.
    			Therefore, the last packet in any direction comes to an idler process only after the
    			worker process has received a previous transmission of that packet (because of the
    			way "wanting" is set in PktExchange.)  We assume that class=data isn't used for
    			pings.  If we're still working on the call, we generate an ack containing the worker
    			process's PSBIndex.  Beware when caller and callee are on the same host! */
    		if ((recvdHeader->type.fields&ackField) &&
    			((recvdHeader->type.fields&eomField) == 0) &&
    			(((recvdHeader->type.fields&classField) == xtraData) || (knownCallee==true))) {
    				if ((!decrypted) || (myPkt->convHandle == unencrypted))
    					ackLen = pktLengthOverhead;
    				else ackLen = EncryptPkt(myPkt, 0);
    				recvdHeader->length = Swab(ackLen << 1);
    				GenerateIdlerResponse(myPkt); }; };
       		newPkt=false;
		continue; }; }; /* endloop */ };
		
/* Two-word operand passed to DoubleDiff in NoteConnection.  one[0] is set to
   swapped1 in StreamInitialize; one[1] is never written in this file. */
static int one[2];	
/* Record a caller connection in the "connections" hash table.
     id     - connection key; its caller and activity fields select the bucket
     callCt - call count reported for the connection
     conv   - conversation handle to store with the entry
   If an entry with an equal id is already chained in the bucket, returns
   at once (without calling Block).  Otherwise a new entry is obtained from
   GetFixed and appended to the chain.  The stored call count is callCt less
   "one" (via DoubleDiff), presumably so that the call which led here still
   compares as newer in ServerMain -- TODO confirm.
   Fix relative to the previous text: the bucket expression used a mis-encoded
   character where the XOR operator was intended. */
static NoteConnection(id, callCt, conv) 
	struct ConnectionID *id;
	struct CallCount *callCt;
	struct Conversation *conv; {
  struct ConnectionData *dataPtr;
  struct ConnectionData *connection;
  /* The bucket slot itself is treated as a ConnectionData: "next" is the
     first member, so dataPtr->next names the head of the chain. */
  dataPtr =
	(struct ConnectionData *) &connections[(Swab(id->caller.w ^ id->activity)) & 127];
  connection = dataPtr->next;
  while (connection)   { /* SELECT TRUE FROM .. */
    if (MultEq(id, &connection->id, lenConnectionID))  return; /* Already there!? -- */
    dataPtr = connection;
    connection = dataPtr->next; }; /* endloop */
  connection = (struct ConnectionData *) GetFixed(lenConnectionData);
  connection->next = 0;
  MoveBlock(&connection->id, id, lenConnectionID);
  Move2(&connection->callCt, callCt);
  DoubleDiff(&connection->callCt, one);
  connection->conv = conv;
  /* dataPtr is the last element visited (or the bucket slot), so this appends. */
  dataPtr->next = connection;  Block(); };

 /* Turn a received header around for the reply: the destination host and PSB
    are taken from the source fields, the destination socket is the RPC
    socket, and the outcome is preset to "result".  Ends with Block(). */
 SetupResponse(header) struct Header *header; {
  header->callSpec.outcome = result;
  header->destSoc.LS = rpcSocket;
  header->destPSB = header->srcePSB;
  header->destHost = header->srceHost;
  Block(); }

/* One-time set-up for this module: initialises the constant "one", obtains
   storage for lastCallDest (maxPSB+1 words) and for the 128-bucket
   "connections" table, and allocates the MisusedConversation signal code.
   The buckets are cleared explicitly: ServerMain and NoteConnection walk each
   chain until they reach a 0 link, and nothing visible here guarantees that
   GetFixed returns zeroed storage. */
StreamInitialize() {
  int i;
  one[0] = swapped1;
  lastCallDest = GetFixed(maxPSB+1);
  connections = (struct ConnectionData **) GetFixed(128);
  for (i = 0; i < 128; ++i) connections[i] = 0;
  MisusedConversation = CODE(); }
  
/* Restart hook: drops all remembered connection state by calling
   ForgetConnections.  (The call previously lacked its terminating semicolon.) */
StreamRestart() { ForgetConnections(); }