-- RPC: Reliable transmission and reception of packets

-- RPCPktIO.mesa

-- Andrew Birrell March 15, 1983 3:27 pm

DIRECTORY
BufferDefs   USING[ PupBuffer ],
CedarSnapshot  USING[ After, Register ],
CommUtilDefs  USING[ AllocateIocbs ],
DriverDefs   USING[ ChangeNumberOfInputBuffers ],
DriverTypes   USING[ Encapsulation, ethernetEncapsulationBytes,
         ethernetEncapsulationOffset ],
EthernetOneFace USING[ ControlBlock, ControlBlockRecord, controlBlockSize,
         DeviceHandle, GetNextDevice, GetStatus, hearSelf,
         nullDeviceHandle, QueueOutput ],
Frame    USING[ Free ],
Heap     USING[ systemZone ],
Inline    USING[ LongCOPY ],
Process    USING[ Detach, MsecToTicks, Pause, Yield ],
ProcessOperations USING[ HandleToIndex, ReadPSB ],
PSB     USING[ PsbIndex, PsbNull ],
PupDefs    USING[ GetFreePupBuffer, GetHopsToNetwork, GetLocalPupAddress,
         PupAddress, PupPackageMake, PupRouterSendThis,
         PupSocket, PupSocketDestroy, PupSocketMake,
         ReturnFreePupBuffer, veryLongWait],
PupTypes   USING[ PupAddress, PupHostID, PupNetID, PupSocketID ],
MesaRPC   USING[ CallFailed, unencrypted ],
RPCInternal   USING[ DecryptPkt, DoSignal, EncryptPkt, ReplyToRFA,
         RPCBinding, RPCPktStreams, RPCSecurity, ServerMain ],
RPCLupine   USING[ DataLength, Dispatcher, Header, RPCPkt ],
RPCPkt    USING[ CallCount, Header, Machine,
         PktExchangeState, PktID, pktLengthOverhead ],
RPCPrivate   USING[ GetRPCPackets, rpcSocket, ReturnBuffer ],
RPCWatch   USING[],
Runtime    USING[ IsBound ],
System    USING[ GetClockPulses, MicrosecondsToPulses, Pulses ];

RPCPktIO: MONITOR
IMPORTS CedarSnapshot, CommUtilDefs, DriverDefs, EthernetOneFace, Frame, Heap, Inline,
Process, ProcessOperations, PupDefs, MesaRPC, RPCInternal,
RPCPrivate, Runtime, System
EXPORTS RPCLupine--Encapsulation,Header--, RPCPkt--PktExchange,IdleReceive--,
RPCWatch--SetSpyProc--
SHARES BufferDefs, RPCLupine =

BEGIN

Encapsulation: PUBLIC TYPE = DriverTypes.Encapsulation;

Header: PUBLIC TYPE = RPCPkt.Header;

ConcreteHeader: PROC[abstract: LONG POINTER TO RPCLupine.Header]
RETURNS[LONG POINTER TO Header] = INLINE
{ RETURN[ abstract ] };

callSequence: RPCPkt.CallCount ← 0; -- monotonic from this host --



-- ******** IOCB management ******** --

-- IOCB's are kept on free chain. Allocation and freeing are imbedded in
-- "PktExchange" procedure.
-- IOCB's are allocated in first 64K of memory, in resident memory.
-- Use "ControlBlock" proc to get pointer to give to EthernetFace.

IOCBrecord: TYPE = RECORD[next: IOCB];
IOCB: TYPE = LONG POINTER TO IOCBrecord;
iocbSize: CARDINAL = MAX[EthernetOneFace.controlBlockSize, SIZE[IOCBrecord]];

freeIOCBs: IOCBNIL;



-- ******** sending and receiving packets ******** --

firstDevice: EthernetOneFace.DeviceHandle =
EthernetOneFace.GetNextDevice[EthernetOneFace.nullDeviceHandle];

myAddr: PupTypes.PupAddress;
myNet: PupTypes.PupNetID;
myDeviceHost: PupTypes.PupHostID;
myHost: RPCPkt.Machine;

sent: CARDINAL ← 0;
recvd: CARDINAL ← 0;
retransmitted: CARDINAL ← 0;

-- "PktExchange" implements a reliable packet exchange. There are five cases:
-- sending: transmit data, wait for ack
-- receiving: transmit ack, wait for data
-- call: transmit data, wait for data
-- endCall: transmit data, wait for ack or data (data => start of new call)
-- authReq: transmit RFA, wait for data (like "call", but no retransmissions)
-- Data and RFA packets are retransmitted until an ack is received.
-- Ack packets are retransmitted only in response to "pleaseAck" requests
-- No acknowledgement after 14 transmissions is fatal (CallFailed[timeout])
-- When the transmitted packet has been acknowledged (if needed), a ping is
-- sent periodically (ack packet saying pleaseAck). If necessary the ping is
-- retransmitted until it has been acked. Failure to receive an ack for the ping
-- is fatal (CallFailed[timeout]). Provided pings continue to be acknowledged,
-- there is no limit on how long we will wait for the next data packet. If the
-- client gets impatient, he can abort this process. (This corresponds to the
-- local-machine semantics of a procedure call.)

minRetransmitMsecs: CARDINAL = 100; -- retransmit delay for local net --
msecsPerHop:   CARDINAL = 500; -- approximate typical gateway hop delay? --
minPingMsecs:   CARDINAL = 5000; -- initial interval between pings --
maxPingSecs:   CARDINAL = 300; -- long-term ping interval --
maxTransmissions:  CARDINAL ← 20;  -- give up after too many transmissions --
signalTimeout:   BOOLEANTRUE; -- debugging switch --

-- The retransmission delay is incremented by minPingMsecs each time we timeout,
-- but is reset when we receive the desired packet. If n=maxTransmissions
-- and i=minRetransmitMsecs and j=hops*msecsPerHop, we give up after
-- i*(n*n+n)/2+n*j msecs.
-- For the local network, that comes to 21 seconds.
-- For a two-hop route, that comes to 41 seconds.
-- The ping delay is doubled at each ping, up to maxPingSecs, which is 5 minutes;
-- The maxPingSecs value is reached after about 5 minutes.

minRetransmitPulses: System.Pulses =
System.MicrosecondsToPulses[LONG[1000] * minRetransmitMsecs];
pulsesPerHop: System.Pulses =
System.MicrosecondsToPulses[LONG[1000] * msecsPerHop];
minPingPulses: System.Pulses =
System.MicrosecondsToPulses[LONG[1000] * minPingMsecs];
maxPingPulses: System.Pulses =
System.MicrosecondsToPulses[LONG[1000]*LONG[1000] * maxPingSecs];
transmitLocalPkts: BOOLEANTRUE;


-- Constants for WaitUntilSent (inside PktExchange) --

shortWait: System.Pulses = -- 10 msecs --
System.MicrosecondsToPulses[LONG[1000] * 10];
longerWait: System.Pulses = -- 5 secs --
System.MicrosecondsToPulses[LONG[1000]*LONG[1000] * 5];


PktExchange: PUBLIC PROC[inPkt: RPCLupine.RPCPkt, length: RPCLupine.DataLength,
maxlength: RPCLupine.DataLength,
state: RPCPkt.PktExchangeState,
signalHandler: RPCLupine.Dispatcher ← NIL]
RETURNS[newPkt: BOOLEAN, newLength: RPCLupine.DataLength] =
BEGIN
-- On exit if newPkt=TRUE, the packet has been decrypted if needed. --

-- Normally, transmits inPkt and copies result into inPkt.
-- If a signal occurs, calls DoSignal which handles the signal protocol
-- up to the last resumption packet. DoSignal returns the last resume
-- packet for us to transmit, which we do by assigning it to outPkt;
-- that packet was allocated in DoSignal's local frame, which we must
-- later deallocate.
outPkt: RPCLupine.RPCPkt ← inPkt; -- altered after a signal --
outPktFrame: POINTERNIL; -- DoSignal's local frame --

DO -- loop for signal handlers --

sentTime: System.Pulses; -- initialized after sending any packet --

iocb: IOCBNIL;
-- IOCB management procs are imbedded for efficiency! --

GetIOCB: ENTRY PROC = INLINE
BEGIN
IF freeIOCBs # NIL
THEN { iocb ← freeIOCBs; freeIOCBs ← freeIOCBs.next }
ELSE { iocb ← CommUtilDefs.AllocateIocbs[iocbSize] }
END;

ControlBlock: PROC RETURNS[ EthernetOneFace.ControlBlock ] = INLINE
{ RETURN[LOOPHOLE[iocb]] };

ReturnIOCB: PROC = INLINE
BEGIN
InnerReturn: ENTRY PROC = INLINE
{ iocb.next ← freeIOCBs; freeIOCBs ← iocb };
IF iocb # NIL THEN { WaitUntilSent[]; InnerReturn[] };
END;

WaitUntilSent: PROC = INLINE
BEGIN
-- Horrible! We must ensure that the ethernet microcode isn't still
-- using our IOCB when we free or re-use it. There are two problems:
-- (a) a suitable response coming in before we've finished
-- retransmitting our packet;
-- (b) the standard EthernetOneDriver resetting the microcode
-- (so it forgets our request).
-- To handle (a) we busy-wait for up to 10msec, then wait for single
-- ticks thereafter; to handle (b) we assume our iocb is no longer
-- being used after it's pending for 5 secs. To do better seems to
-- require changes to the EthernetOneFace.
WHILE EthernetOneFace.GetStatus[ControlBlock[]] = pending
DO IF System.GetClockPulses[] - sentTime < shortWait
THEN Process.Yield[]
ELSE IF System.GetClockPulses[] - sentTime < longerWait
THEN Process.Pause[1]
ELSE EXIT -- assume someone reset the microcode --;
ENDLOOP;
END;

NewCallNumber: ENTRY PROC RETURNS[RPCPkt.CallCount] = INLINE
{ RETURN[ callSequence ← callSequence+1 ] };

reply: BufferDefs.PupBuffer;
recvdHeader: LONG POINTER TO Header;
myPSB: PSB.PsbIndex =
ProcessOperations.HandleToIndex[ProcessOperations.ReadPSB[]];
acked: BOOLEAN;
thisPktID: RPCPkt.PktID;
header: LONG POINTER TO Header = @outPkt.header;
localHost: BOOLEAN = header.destHost = myHost;
localNet: BOOLEAN = header.destHost.net = myNet;
pingPulses: System.Pulses ← minPingPulses;
header.srceHost ← myHost;
header.srceSoc ← RPCPrivate.rpcSocket;
header.srcePSB ← myPSB;
IF header.pktID.pktSeq = 0 -- first packet of a call; yucky interface! --
THEN BEGIN
header.type ← SELECT state FROM
sending => [0, rpc, notEnd, pleaseAck, call],
call => [0, rpc, end, dontAck, call],
authReq => [0, rpc, end, dontAck, rfa],
ENDCASE => --receiving, endCall-- ERROR;
header.pktID.callSeq ← NewCallNumber[];
header.pktID.pktSeq ← 1;
acked ← FALSE;
END
ELSE BEGIN
header.type ← SELECT state FROM
sending => [0, rpc, notEnd, pleaseAck, data],
receiving => [0, rpc, end, dontAck, ack],
call => [0, rpc, end, dontAck, data],
endCall => [0, rpc, end, dontAck, data],
ENDCASE => --authReq-- ERROR;
IF state # receiving --header.type.class = data --
THEN { header.pktID.pktSeq ← header.pktID.pktSeq+1;
acked←FALSE }
ELSE acked ← TRUE;
END;
thisPktID ← header.pktID;
SetWanting[myPSB];

DO -- loop for pings --
ENABLE UNWIND => { ClearWanting[myPSB]; ReturnIOCB[];
IF outPktFrame # NIL THEN Frame.Free[outPktFrame] };
BEGIN
transmissions: CARDINAL ← 0;
retransmitPulses: System.Pulses ←
IF localNet
THEN minRetransmitPulses
ELSE [minRetransmitPulses +
pulsesPerHop * PupDefs.GetHopsToNetwork[header.destHost.net]];

IF outPkt.convHandle # MesaRPC.unencrypted
THEN header.length ← RPCInternal.EncryptPkt[outPkt, length]
ELSE header.length ← length + RPCPkt.pktLengthOverhead;
header.oddByte ← no;

DO -- loop for retransmissions --
BEGIN
IF NOT localNet
THEN GeneralSend[outPkt]
ELSE BEGIN
IF localHost
AND NOT( EthernetOneFace.hearSelf AND transmitLocalPkts )
THEN LocalSend[outPkt];
IF NOT localHost OR transmitLocalPkts
THEN BEGIN
IF iocb = NIL
THEN GetIOCB[]
ELSE WaitUntilSent[];
   -- Pup checksum --
outPkt.data[header.length-RPCPkt.pktLengthOverhead] ← 177777B;
outPkt.encapsulation ← Encapsulation[ethernetOne[,,,,,,
header.destHost.host, myDeviceHost, pup]];
EthernetOneFace.QueueOutput[firstDevice,
@outPkt.encapsulation + DriverTypes.ethernetEncapsulationOffset,
header.length + (1+DriverTypes.ethernetEncapsulationBytes)/2,
ControlBlock[]]
END;
END;
sentTime ← System.GetClockPulses[];
sent ← sent+1;
-- wait for response: an ack or the reply or a timeout --

DO -- loop for each incoming packet (including garbage) --
reply ← MyReceive[myPSB, sentTime,
(IF acked THEN pingPulses ELSE retransmitPulses)];
IF reply = NIL
THEN IF acked
THEN GOTO ping
ELSE { header.type.ack ← pleaseAck; GOTO retransmit };
recvdHeader ← LOOPHOLE[@reply.pupLength];
IF recvdHeader.type.class = rfa
THEN BEGIN
IF RPCInternal.ReplyToRFA[reply,
header,--encrypted--
thisPktID--clear--,
inPkt.convHandle]
THEN NULL-- can't set "acked", because we must retransmit our data
-- packet until we obtain the correct destPSB from some ack pkt --;
LOOP
END;
IF outPkt.convHandle # MesaRPC.unencrypted
AND recvdHeader.conv = header.conv
AND recvdHeader.srceHost = header.destHost
THEN [,newLength] ← RPCInternal.DecryptPkt[recvdHeader, outPkt.convHandle]
ELSE newLength ← recvdHeader.length - RPCPkt.pktLengthOverhead;
IF recvdHeader.conv = header.conv
AND recvdHeader.srceHost = header.destHost
AND recvdHeader.pktID.activity = thisPktID.activity
THEN -- pkt is related to our call --
SELECT TRUE FROM
recvdHeader.pktID.callSeq = thisPktID.callSeq =>
-- pkt is in our call --
SELECT TRUE FROM
-- a) pkt has next sequence number to ours --
recvdHeader.pktID.pktSeq = 1+thisPktID.pktSeq =>
BEGIN
IF state = sending OR state = endCall
THEN -- he's not allowed to generate that pktSeq! --
ERROR MesaRPC.CallFailed[runtimeProtocol !
UNWIND => GiveBackBuffer[reply] ];
SELECT recvdHeader.type.class FROM
data => GOTO done;
ack =>
BEGIN
-- This can happen if state=call and callee sent next data
-- pkt, but it was lost and now he is responding to our
-- retransmission or ping. Soon, he will retransmit his
-- data pkt. --

acked ← TRUE;
GiveBackBuffer[reply];
END;
ENDCASE => --call,rfa--
ERROR MesaRPC.CallFailed[runtimeProtocol !
UNWIND => GiveBackBuffer[reply] ];
END;
-- b) pkt has same sequence number as ours --
recvdHeader.pktID.pktSeq = thisPktID.pktSeq =>
BEGIN
SELECT recvdHeader.type.class FROM
ack => -- acknowledgement of our packet --
BEGIN
IF header.type.class = call
THEN header.destPSB ← recvdHeader.srcePSB;
acked ← TRUE;
IF state = sending OR state = endCall
THEN {GiveBackBuffer[reply]; reply←NIL; GOTO done}
ELSE -- state = call, authReq, or receiving --
-- Even if "pleaseAck", we don't need to ack it,
-- because other end should send data pkt soon --

GiveBackBuffer[reply];
END;
data, call => -- retransmisson of his packet --
IF state = receiving
THEN IF recvdHeader.type.ack = pleaseAck
THEN BEGIN
GiveBackBuffer[reply]; reply ← NIL;
GOTO retransmit -- we're already sending an ack --
END
ELSE GiveBackBuffer[reply]
ELSE ERROR MesaRPC.CallFailed[runtimeProtocol !
UNWIND => GiveBackBuffer[reply] ];
ENDCASE => --rfa --
ERROR MesaRPC.CallFailed[runtimeProtocol !
UNWIND => GiveBackBuffer[reply] ];
END;
-- c) pkt has earlier sequence number than ours --
recvdHeader.pktID.pktSeq < thisPktID.pktSeq =>
GiveBackBuffer[reply]; -- no need to ack it --
-- d) pkt has some future sequence number --
ENDCASE =>
ERROR MesaRPC.CallFailed[runtimeProtocol !
UNWIND => GiveBackBuffer[reply] ];
recvdHeader.pktID.callSeq > thisPktID.callSeq AND state=endCall =>
BEGIN
IF recvdHeader.type.class # call
THEN ERROR MesaRPC.CallFailed[runtimeProtocol !
UNWIND => GiveBackBuffer[reply] ];
-- acks our last packet, so we can handle the call --
GOTO done
END
ENDCASE => -- wrong call: let someone else do it --
{ recvdHeader.destPSB ← PSB.PsbNull; EnqueueAgain[reply] }
ELSE -- wrong conversation or activity: let someone else do it --
{ recvdHeader.destPSB ← PSB.PsbNull; EnqueueAgain[reply] };
-- Incoming RFA packets don't reach here. --
ENDLOOP--for each incoming packet--;
EXITS
retransmit =>
BEGIN
transmissions ← transmissions+1;
IF (transmissions = maxTransmissions AND signalTimeout)
OR state = authReq
-- Don't retransmit RFA: caller will retransmit call pkt.
-- Otherwise, if a spurious worker process occurred because of call pkt
-- retransmission before response to RFA, then the spurious worker
-- process sits needlessly retransmitting RFA's until it times out.
THEN { SIGNAL MesaRPC.CallFailed[timeout]; transmissions ← 0 };
retransmitted ← retransmitted+1;
retransmitPulses ← [retransmitPulses + minRetransmitPulses];
END;
END;
ENDLOOP-- for each transmission --;
EXITS
ping =>
BEGIN
header.type ← [0, rpc, end, pleaseAck, ack];
length ← 0; header.pktID ← thisPktID;
acked ← FALSE;
pingPulses ← [MIN[maxPingPulses, pingPulses * 2]];
END;
END;
-- only exit from loop is "GOTO done" or UNWIND --
REPEAT done =>
BEGIN
-- This isn't covered by any UNWIND --
ClearWanting[myPSB]; ReturnIOCB[];
IF outPktFrame # NIL THEN Frame.Free[outPktFrame];
IF reply = NIL
THEN { --restore clear pktID-- header.pktID ← thisPktID; RETURN[FALSE,] }
ELSE BEGIN
IF recvdHeader.outcome = signal AND state = call
AND signalHandler # NIL
THEN [outPkt, length, outPktFrame] ←
RPCInternal.DoSignal[reply, newLength, signalHandler, inPkt.convHandle]
ELSE BEGIN
IF newLength > maxlength
THEN ERROR MesaRPC.CallFailed[runtimeProtocol !
UNWIND => GiveBackBuffer[reply]];
Inline.LongCOPY[from: recvdHeader, to: @inPkt.header,
nwords: recvdHeader.length];
GiveBackBuffer[reply];
RETURN[TRUE,newLength]
END;
END
END;
ENDLOOP-- for each ping--;
-- We get here only after coming back from a call of DoSignal --
ENDLOOP-- for signal handlers --;
-- we can't get here! --
END;

GeneralSend: PROC[pkt: RPCLupine.RPCPkt] =
BEGIN
b: BufferDefs.PupBuffer = PupDefs.GetFreePupBuffer[];
Inline.LongCOPY[from: @(pkt.header), to: @(b.pupLength),
nwords: ConcreteHeader[@pkt.header].length];
PupDefs.PupRouterSendThis[b];
END;

LocalSend: PROC[pkt: RPCLupine.RPCPkt] =
BEGIN
-- handle talking to ourselves if the ethernet head can't --
b: BufferDefs.PupBuffer = PupDefs.GetFreePupBuffer[];
Inline.LongCOPY[from: @(pkt.header), to: @(b.pupLength),
nwords: ConcreteHeader[@pkt.header].length];
IF NOT AcceptPkt[b] THEN PupDefs.ReturnFreePupBuffer[b];
END;


idlerPkt: BufferDefs.PupBuffer ← NIL;
idlerCond: CONDITION ← [timeout:0];
waiterCond: CONDITION ← [timeout:Process.MsecToTicks[minRetransmitMsecs]];
WaiterArray: TYPE = ARRAY PSB.PsbIndex OF BufferDefs.PupBuffer;
waiterPkts: LONG POINTER TO WaiterArray ←
Heap.systemZone.NEW[WaiterArray ← ALL[NIL]];
wanting: PACKED ARRAY PSB.PsbIndex OF BOOLEANALL[FALSE]; -- PSB expects a pkt --

SetWanting: ENTRY PROC[myPSB: PSB.PsbIndex] = INLINE
{ wanting[myPSB] ← TRUE };

ClearWanting: PROC[myPSB: PSB.PsbIndex] = INLINE
BEGIN
spare: BufferDefs.PupBuffer;
InnerClear: ENTRY PROC = INLINE
BEGIN
wanting[myPSB] ← FALSE;
IF (spare ← waiterPkts[myPSB]) # NIL THEN waiterPkts[myPSB] ← NIL;
END;
InnerClear[];
IF spare # NIL THEN GiveBackBuffer[spare] --ignore it, outside monitor--;
END;


MyReceive: ENTRY PROC[myPSB: PSB.PsbIndex, sentTime, waitTime: System.Pulses]
RETURNS[recvd: BufferDefs.PupBuffer] = INLINE
BEGIN
ENABLE UNWIND => NULL;
-- Returns NIL if no packet arrives within waitTime pulses after sentTime --
DO IF (recvd ← waiterPkts[myPSB]) = NIL
THEN IF System.GetClockPulses[] - sentTime < waitTime
THEN WAIT waiterCond
ELSE EXIT
ELSE { waiterPkts[myPSB] ← NIL; EXIT };
ENDLOOP;
END;

RecvdPktTooLong: ERROR = CODE;
KillServer: ERROR = CODE;
servers: CARDINAL ← 0;
serversMax: CARDINAL ← 20;
idlers: CARDINAL ← 0;
idlersMax: CARDINAL = 6;
idlersMin: CARDINAL = 2;
-- Number of server processes will never exceed "serversMax".
-- If number of idle servers exceeds "idlersMax", one will abort.
-- If number of idle servers drops below "idlersMin", one is forked.

IdleReceive: PUBLIC PROC[pkt: RPCLupine.RPCPkt, maxlength: CARDINAL] =
BEGIN
b: BufferDefs.PupBuffer;
InnerIdler: ENTRY PROC = INLINE
BEGIN
IF idlers >= idlersMax
THEN { servers←servers-1; RETURN WITH ERROR KillServer[] };
idlers ← idlers + 1;
DO WAIT idlerCond; IF idlerPkt # NIL THEN EXIT ENDLOOP;
IF idlers < idlersMin AND servers < serversMax
THEN { servers←servers+1; Process.Detach[FORK Server[]] };
b ← idlerPkt;
BEGIN
recvdHeader: LONG POINTER TO Header = LOOPHOLE[@b.pupLength];
idlerPkt ← LOOPHOLE[idlerPkt.next,BufferDefs.PupBuffer]; b.next ← NIL;
IF recvdHeader.length > maxlength
THEN ERROR RecvdPktTooLong[] --NULL??--
ELSE Inline.LongCOPY[from: @b.pupLength, to: @pkt.header,
nwords: recvdHeader.length];
END;
END;
InnerIdler[];
GiveBackBuffer[b];--outside monitor--
END;

QueuesScrambled: ERROR = CODE;

spyProc: PROC[BufferDefs.PupBuffer] ← NIL;

SetSpyProc: PUBLIC ENTRY PROC[p: PROC[BufferDefs.PupBuffer]] =
{ spyProc ← p };

EnqueueRecvd: ENTRY PROC[b: BufferDefs.PupBuffer, report: BOOLEAN]
RETURNS[BOOLEAN] =
BEGIN
-- despatch packet to appropriate RPC process, if any --
-- Packet is known to be a Pup, and is addressed to our socket. --
header: LONG POINTER TO Header = LOOPHOLE[@b.pupLength];
IF report AND spyProc # NIL THEN spyProc[b];
IF header.destHost = myHost AND header.type.subType = rpc
THEN BEGIN
destPSB: PSB.PsbIndex = header.destPSB;
recvd ← recvd+1;
IF destPSB NOT IN (PSB.PsbNull..LAST[PSB.PsbIndex]]
OR NOT wanting[destPSB]
THEN -- give to idle process to deal with --
-- Pkts are dealt with LIFO by idlers, but it doesn't matter (much)
BEGIN
IF idlers = 0
THEN -- server too busy: throw it away! --
RETURN[FALSE]
ELSE { b.next ← idlerPkt; idlerPkt ← b;
idlers ← idlers-1; NOTIFY idlerCond };
END
ELSE -- someone wants this packet: give them it --
BEGIN
IF waiterPkts[destPSB] # NIL THEN RETURN[FALSE];
waiterPkts[destPSB] ← b; BROADCAST waiterCond;
END;
RETURN[TRUE]
END
ELSE RETURN[FALSE];
END;

GiveBackBuffer: PROC[b: BufferDefs.PupBuffer] =
-- NOTE: calls of this must be made outside our monitor, because
-- RPCPrivate.ReturnBuffer acquires the EthernetDriver monitor,
-- and the EthernetDriver may call EnqueueRecvd which acquires
-- our monitor!
IF Runtime.IsBound[RPCPrivate.ReturnBuffer]
THEN RPCPrivate.ReturnBuffer
ELSE PupDefs.ReturnFreePupBuffer;


AcceptPkt: PROC[b: BufferDefs.PupBuffer] RETURNS[BOOLEAN] =
{ RETURN[ EnqueueRecvd[b: b, report: TRUE] ] };

EnqueueAgain: PUBLIC PROC[b: BufferDefs.PupBuffer] =
{ IF NOT EnqueueRecvd[b: b, report: FALSE] THEN GiveBackBuffer[b] };


Listener: PROC =
BEGIN
-- Catch any packets that get as far as the Pup socket level --
soc: PupDefs.PupSocket = PupDefs.PupSocketMake[local: RPCPrivate.rpcSocket,
remote:,
ticks: PupDefs.veryLongWait];
DO ENABLE ABORTED => EXIT;
b: BufferDefs.PupBuffer = soc.get[];
IF NOT AcceptPkt[b] THEN PupDefs.ReturnFreePupBuffer[b];
ENDLOOP;
PupDefs.PupSocketDestroy[soc];
END;

listenerProcess: PROCESS;

Server: PROC =
BEGIN
RPCInternal.ServerMain[ ! KillServer => CONTINUE ];
END;




-- ******** Initialization ******** --

Initialize: ENTRY PROC =
BEGIN
PupDefs.PupPackageMake[];
DriverDefs.ChangeNumberOfInputBuffers[TRUE];
myAddr ← PupDefs.GetLocalPupAddress[RPCPrivate.rpcSocket,NIL];
myNet ← myAddr.net;
myDeviceHost ← myAddr.host;
myHost ← [myNet, myAddr.host];
START RPCInternal.RPCBinding; -- exports "RPCInternal.exportTable" --
START RPCInternal.RPCSecurity; -- exports "RPCInternal.firstConversation" --
START RPCInternal.RPCPktStreams; -- initialize connection states --
CedarSnapshot.Register[r: Rollback];
servers ← servers+1; Process.Detach[FORK Server[]];
listenerProcess ← FORK Listener[];
RPCPrivate.GetRPCPackets[AcceptPkt];
END;

Restart: INTERNAL PROC =
BEGIN
RESTART RPCInternal.RPCPktStreams; -- forget connection counts --
RESTART RPCInternal.RPCSecurity; -- invalidate local conversations --
RESTART RPCInternal.RPCBinding; -- nothing? --
END;

Rollback: ENTRY PROC[after: CedarSnapshot.After] =
{ IF after = rollback THEN Restart[] };

Initialize[];

END.