RPC: Reliable transmission and reception of packets
RPCPktIO.mesa
Andrew Birrell October 24, 1983 4:26 pm
DIRECTORY
BasicTime USING[ GetClockPulses, MicrosecondsToPulses, Pulses ],
Booting USING[ CheckpointProc, RegisterProcs, RollbackProc ],
BufferDefs USING[ PupBuffer ],
CommUtilDefs USING[ AllocateIocb ],
DriverDefs USING[ ChangeNumberOfInputBuffers ],
DriverTypes USING[ Encapsulation, ethernetEncapsulationBytes, ethernetEncapsulationOffset ],
EthernetOneFace USING[ ControlBlock, ControlBlockRecord, controlBlockSize, DeviceHandle, GetNextDevice, GetStatus, hearSelf, nullDeviceHandle, QueueOutput ],
PrincOpsUtils USING[ Free, IsBound, LongCOPY, PsbHandleToIndex, ReadPSB ],
Process USING[ Detach, MsecToTicks, Pause, Yield ],
PrincOps USING[ PsbIndex, PsbNull ],
PupDefs USING[ AnyLocalPupAddress, GetFreePupBuffer, GetHopsToNetwork, PupAddress, PupPackageMake, PupRouterSendThis, PupSocket, PupSocketDestroy, PupSocketMake, ReturnFreePupBuffer, veryLongWait],
PupTypes USING[ PupAddress, PupHostID, PupNetID, PupSocketID ],
RPC USING[ CallFailed, unencrypted ],
RPCInternal USING[ DecryptPkt, DoSignal, EncryptPkt, ReplyToRFA, RPCBinding, RPCPktStreams, RPCSecurity, ServerMain ],
RPCLupine USING[ DataLength, Dispatcher, Header, RPCPkt ],
RPCPkt USING[ CallCount, Header, Machine, PktExchangeState, PktID, pktLengthOverhead ],
RPCPrivate USING[ GetRPCPackets, rpcSocket, ReturnBuffer ],
RPCWatch USING[];
RPCPktIO: MONITOR
IMPORTS BasicTime, Booting, CommUtilDefs, DriverDefs, EthernetOneFace, PrincOpsUtils, Process, PupDefs, RPC, RPCInternal, RPCPrivate
EXPORTS RPCLupine--Encapsulation,Header--, RPCPkt--PktExchange,IdleReceive--, RPCWatch--SetSpyProc--
SHARES BufferDefs, RPCLupine =
BEGIN
Encapsulation: PUBLIC TYPE = DriverTypes.Encapsulation;
Header: PUBLIC TYPE = RPCPkt.Header;
ConcreteHeader: PROC[abstract: LONG POINTER TO RPCLupine.Header]
RETURNS[LONG POINTER TO Header] = INLINE
{ RETURN[ abstract ] };
callSequence: RPCPkt.CallCount ← 0; -- monotonic from this host --
******** IOCB management ********
IOCB's are kept on free chain. Allocation and freeing are imbedded in "PktExchange" procedure.
IOCB's are allocated in first 64K of memory, in resident memory.
Use "ControlBlock" proc to get pointer to give to EthernetFace.
IOCBrecord: TYPE = RECORD[next: IOCB];
IOCB: TYPE = LONG POINTER TO IOCBrecord;
iocbSize: CARDINAL = MAX[EthernetOneFace.controlBlockSize, SIZE[IOCBrecord]];
freeIOCBs: IOCBNIL;
******** sending and receiving packets ********
firstDevice: EthernetOneFace.DeviceHandle =
EthernetOneFace.GetNextDevice[EthernetOneFace.nullDeviceHandle];
myAddr: PupTypes.PupAddress;
myNet: PupTypes.PupNetID;
myDeviceHost: PupTypes.PupHostID;
myHost: RPCPkt.Machine;
sent: CARDINAL ← 0;
recvd: CARDINAL ← 0;
retransmitted: CARDINAL ← 0;
"PktExchange" implements a reliable packet exchange. There are five cases:
sending: transmit data, wait for ack
receiving: transmit ack, wait for data
call: transmit data, wait for data
endCall: transmit data, wait for ack or data (data => start of new call)
authReq: transmit RFA, wait for data (like "call", but no retransmissions)
Data and RFA packets are retransmitted until an ack is received. Ack packets are retransmitted only in response to "pleaseAck" requests. No acknowledgement after 14 transmissions is fatal (CallFailed[timeout]). When the transmitted packet has been acknowledged (if needed), a ping is sent periodically (ack packet saying pleaseAck). If necessary the ping is retransmitted until it has been acked. Failure to receive an ack for the ping is fatal (CallFailed[timeout]). Provided pings continue to be acknowledged, there is no limit on how long we will wait for the next data packet. If the client gets impatient, he can abort this process. (This corresponds to the local-machine semantics of a procedure call.)
minRetransmitMsecs: CARDINAL = 100; -- retransmit delay for local net --
msecsPerHop: CARDINAL = 500; -- approximate typical gateway hop delay? --
minPingMsecs: CARDINAL = 5000; -- initial interval between pings --
maxPingSecs: CARDINAL = 300; -- long-term ping interval --
maxTransmissions: CARDINAL ← 20; -- give up after too many transmissions --
signalTimeout: BOOLEANTRUE; -- debugging switch --
The retransmission delay is incremented by minPingMsecs each time we timeout, but is reset when we receive the desired packet. If n=maxTransmissions and i=minRetransmitMsecs and j=hops*msecsPerHop, we give up after i*(n*n+n)/2+n*j msecs. For the local network, that comes to 21 seconds. For a two-hop route, that comes to 41 seconds. The ping delay is doubled at each ping, up to maxPingSecs, which is 5 minutes. The maxPingSecs value is reached after about 5 minutes.
minRetransmitPulses: BasicTime.Pulses =
BasicTime.MicrosecondsToPulses[LONG[1000] * minRetransmitMsecs];
pulsesPerHop: BasicTime.Pulses =
BasicTime.MicrosecondsToPulses[LONG[1000] * msecsPerHop];
minPingPulses: BasicTime.Pulses =
BasicTime.MicrosecondsToPulses[LONG[1000] * minPingMsecs];
maxPingPulses: BasicTime.Pulses =
BasicTime.MicrosecondsToPulses[LONG[1000]*LONG[1000] * maxPingSecs];
transmitLocalPkts: BOOLEANTRUE;
Constants for WaitUntilSent (inside PktExchange)
shortWait: BasicTime.Pulses = -- 10 msecs --
BasicTime.MicrosecondsToPulses[LONG[1000] * 10];
longerWait: BasicTime.Pulses = -- 5 secs --
BasicTime.MicrosecondsToPulses[LONG[1000]*LONG[1000] * 5];
PktExchange: PUBLIC PROC[inPkt: RPCLupine.RPCPkt, length: RPCLupine.DataLength,
maxlength: RPCLupine.DataLength,
state: RPCPkt.PktExchangeState,
signalHandler: RPCLupine.Dispatcher ← NIL]
RETURNS[newPkt: BOOLEAN, newLength: RPCLupine.DataLength] =
BEGIN
On exit if newPkt=TRUE, the packet has been decrypted if needed.
Normally, transmits inPkt and copies result into inPkt. If a signal occurs, calls DoSignal which handles the signal protocol up to the last resumption packet. DoSignal returns the last resume packet for us to transmit, which we do by assigning it to outPkt; that packet was allocated in DoSignal's local frame, which we must later deallocate.
outPkt: RPCLupine.RPCPkt ← inPkt; -- altered after a signal --
outPktFrame: POINTERNIL; -- DoSignal's local frame --
DO -- loop for signal handlers --
sentTime: BasicTime.Pulses; -- initialized after sending any packet --
iocb: IOCBNIL;
IOCB management procs are imbedded for efficiency!
GetIOCB: ENTRY PROC = INLINE
BEGIN
IF freeIOCBs # NIL
THEN { iocb ← freeIOCBs; freeIOCBs ← freeIOCBs.next }
ELSE { iocb ← CommUtilDefs.AllocateIocb[iocbSize] }
END;
ControlBlock: PROC RETURNS[ EthernetOneFace.ControlBlock ] = INLINE
{ RETURN[LOOPHOLE[iocb]] };
ReturnIOCB: PROC = INLINE
BEGIN
InnerReturn: ENTRY PROC = INLINE
{ iocb.next ← freeIOCBs; freeIOCBs ← iocb };
IF iocb # NIL THEN { WaitUntilSent[]; InnerReturn[] };
END;
WaitUntilSent: PROC = INLINE
BEGIN
Horrible! We must ensure that the ethernet microcode isn't still using our IOCB when we free or re-use it. There are two problems:
(a) a suitable response coming in before we've finished retransmitting our packet;
(b) the standard EthernetOneDriver resetting the microcode (so it forgets our request).
To handle (a) we busy-wait for up to 10msec, then wait for single ticks thereafter; to handle (b) we assume our iocb is no longer being used after it's pending for 5 secs. To do better seems to require changes to the EthernetOneFace.
WHILE EthernetOneFace.GetStatus[ControlBlock[]] = pending
DO IF BasicTime.GetClockPulses[] - sentTime < shortWait
THEN Process.Yield[]
ELSE IF BasicTime.GetClockPulses[] - sentTime < longerWait
THEN Process.Pause[1]
ELSE EXIT -- assume someone reset the microcode --;
ENDLOOP;
END;
NewCallNumber: ENTRY PROC RETURNS[RPCPkt.CallCount] = INLINE
{ RETURN[ callSequence ← callSequence+1 ] };
reply: BufferDefs.PupBuffer;
recvdHeader: LONG POINTER TO Header;
myPSB: PrincOps.PsbIndex = PrincOpsUtils.PsbHandleToIndex[PrincOpsUtils.ReadPSB[]];
acked: BOOLEAN;
thisPktID: RPCPkt.PktID;
header: LONG POINTER TO Header = @outPkt.header;
localHost: BOOLEAN = header.destHost = myHost;
localNet: BOOLEAN = header.destHost.net = myNet;
pingPulses: BasicTime.Pulses ← minPingPulses;
header.srceHost ← myHost;
header.srceSoc ← RPCPrivate.rpcSocket;
header.srcePSB ← myPSB;
IF header.pktID.pktSeq = 0 -- first packet of a call; yucky interface! --
THEN BEGIN
header.type ← SELECT state FROM
sending => [0, rpc, notEnd, pleaseAck, call],
call => [0, rpc, end, dontAck, call],
authReq => [0, rpc, end, dontAck, rfa],
ENDCASE => --receiving, endCall-- ERROR;
header.pktID.callSeq ← NewCallNumber[];
header.pktID.pktSeq ← 1;
acked ← FALSE;
END
ELSE BEGIN
header.type ← SELECT state FROM
sending => [0, rpc, notEnd, pleaseAck, data],
receiving => [0, rpc, end, dontAck, ack],
call => [0, rpc, end, dontAck, data],
endCall => [0, rpc, end, dontAck, data],
ENDCASE => --authReq-- ERROR;
IF state # receiving --header.type.class = data --
THEN { header.pktID.pktSeq ← header.pktID.pktSeq+1; acked←FALSE }
ELSE acked ← TRUE;
END;
thisPktID ← header.pktID;
SetWanting[myPSB];
DO -- loop for pings --
ENABLE UNWIND => { ClearWanting[myPSB]; ReturnIOCB[];
IF outPktFrame # NIL THEN PrincOpsUtils.Free[outPktFrame] };
BEGIN
transmissions: CARDINAL ← 0;
retransmitPulses: BasicTime.Pulses ←
IF localNet
THEN minRetransmitPulses
ELSE minRetransmitPulses + pulsesPerHop * PupDefs.GetHopsToNetwork[header.destHost.net];
IF outPkt.convHandle # RPC.unencrypted
THEN header.length ← RPCInternal.EncryptPkt[outPkt, length]
ELSE header.length ← length + RPCPkt.pktLengthOverhead;
header.oddByte ← no;
DO -- loop for retransmissions --
BEGIN
IF NOT localNet
THEN GeneralSend[outPkt]
ELSE BEGIN
IF localHost AND NOT( EthernetOneFace.hearSelf AND transmitLocalPkts )
THEN LocalSend[outPkt];
IF NOT localHost OR transmitLocalPkts
THEN BEGIN
IF iocb = NIL THEN GetIOCB[] ELSE WaitUntilSent[];
-- Pup checksum --
outPkt.data[header.length-RPCPkt.pktLengthOverhead] ← 177777B;
outPkt.encapsulation ← Encapsulation[ethernetOne[,,,,,,
header.destHost.host, myDeviceHost, pup]];
EthernetOneFace.QueueOutput[firstDevice,
@outPkt.encapsulation + DriverTypes.ethernetEncapsulationOffset,
header.length + (1+DriverTypes.ethernetEncapsulationBytes)/2,
ControlBlock[]]
END;
END;
sentTime ← BasicTime.GetClockPulses[];
sent ← sent+1;
-- wait for response: an ack or the reply or a timeout --
DO -- loop for each incoming packet (including garbage) --
reply ← MyReceive[myPSB, sentTime,
(IF acked THEN pingPulses ELSE retransmitPulses)];
IF reply = NIL
THEN IF acked
THEN GOTO ping
ELSE { header.type.ack ← pleaseAck; GOTO retransmit };
recvdHeader ← LOOPHOLE[@reply.pupLength];
IF recvdHeader.type.class = rfa
THEN BEGIN
IF RPCInternal.ReplyToRFA[
reply, header--encrypted--, thisPktID--clear--, inPkt.convHandle]
THEN NULL-- can't set "acked", because we must retransmit our data packet until we obtain the correct destPSB from some ack pkt --;
LOOP
END;
IF outPkt.convHandle # RPC.unencrypted
AND recvdHeader.conv = header.conv
AND recvdHeader.srceHost = header.destHost
THEN [,newLength] ← RPCInternal.DecryptPkt[recvdHeader, outPkt.convHandle]
ELSE newLength ← recvdHeader.length - RPCPkt.pktLengthOverhead;
IF recvdHeader.conv = header.conv
AND recvdHeader.srceHost = header.destHost
AND recvdHeader.pktID.activity = thisPktID.activity
THEN -- pkt is related to our call --
SELECT TRUE FROM
recvdHeader.pktID.callSeq = thisPktID.callSeq =>
pkt is in our call
SELECT TRUE FROM
a) pkt has next sequence number to ours
recvdHeader.pktID.pktSeq = 1+thisPktID.pktSeq =>
BEGIN
IF state = sending OR state = endCall
THEN -- he's not allowed to generate that pktSeq! --
ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply] ];
SELECT recvdHeader.type.class FROM
data => GOTO done;
ack =>
BEGIN
This can happen if state=call and callee sent next data
pkt, but it was lost and now he is responding to our
retransmission or ping. Soon, he will retransmit his
-- data pkt. --
acked ← TRUE;
GiveBackBuffer[reply];
END;
ENDCASE => --call,rfa--
ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply] ];
END;
b) pkt has same sequence number as ours
recvdHeader.pktID.pktSeq = thisPktID.pktSeq =>
BEGIN
SELECT recvdHeader.type.class FROM
ack => -- acknowledgement of our packet --
BEGIN
IF header.type.class = call
THEN header.destPSB ← recvdHeader.srcePSB;
acked ← TRUE;
IF state = sending OR state = endCall
THEN {GiveBackBuffer[reply]; reply←NIL; GOTO done}
ELSE -- state = call, authReq, or receiving --
Even if "pleaseAck", we don't need to ack it,
-- because other end should send data pkt soon --
GiveBackBuffer[reply];
END;
data, call => -- retransmisson of his packet --
IF state = receiving
THEN IF recvdHeader.type.ack = pleaseAck
THEN BEGIN
GiveBackBuffer[reply]; reply ← NIL;
GOTO retransmit -- we're already sending an ack --
END
ELSE GiveBackBuffer[reply]
ELSE ERROR RPC.CallFailed[runtimeProtocol !
UNWIND => GiveBackBuffer[reply] ];
ENDCASE => --rfa --
ERROR RPC.CallFailed[runtimeProtocol !
UNWIND => GiveBackBuffer[reply] ];
END;
c) pkt has earlier sequence number than ours
recvdHeader.pktID.pktSeq < thisPktID.pktSeq =>
GiveBackBuffer[reply]; -- no need to ack it --
d) pkt has some future sequence number
ENDCASE =>
ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply] ];
recvdHeader.pktID.callSeq > thisPktID.callSeq AND state=endCall =>
BEGIN
IF recvdHeader.type.class # call
THEN ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply] ];
acks our last packet, so we can handle the call
GOTO done
END
ENDCASE => -- wrong call: let someone else do it --
{ recvdHeader.destPSB ← PrincOps.PsbNull; EnqueueAgain[reply] }
ELSE -- wrong conversation or activity: let someone else do it --
{ recvdHeader.destPSB ← PrincOps.PsbNull; EnqueueAgain[reply] };
Incoming RFA packets don't reach here.
ENDLOOP--for each incoming packet--;
EXITS
retransmit =>
BEGIN
transmissions ← transmissions+1;
IF (transmissions = maxTransmissions AND signalTimeout)
OR state = authReq
Don't retransmit RFA: caller will retransmit call pkt. Otherwise, if a spurious worker process occurred because of call pkt retransmission before response to RFA, then the spurious worker process sits needlessly retransmitting RFA's until it times out.
THEN { SIGNAL RPC.CallFailed[timeout]; transmissions ← 0 };
retransmitted ← retransmitted+1;
retransmitPulses ← retransmitPulses + minRetransmitPulses;
END;
END;
ENDLOOP-- for each transmission --;
EXITS
ping =>
BEGIN
header.type ← [0, rpc, end, pleaseAck, ack];
length ← 0; header.pktID ← thisPktID;
acked ← FALSE;
pingPulses ← MIN[maxPingPulses, pingPulses * 2];
END;
END;
only exit from loop is "GOTO done" or UNWIND
REPEAT done =>
BEGIN
-- This isn't covered by any UNWIND --
ClearWanting[myPSB]; ReturnIOCB[];
IF outPktFrame # NIL THEN PrincOpsUtils.Free[outPktFrame];
IF reply = NIL
THEN { --restore clear pktID-- header.pktID ← thisPktID; RETURN[FALSE,] }
ELSE BEGIN
IF recvdHeader.outcome = signal AND state = call
AND signalHandler # NIL
THEN [outPkt, length, outPktFrame] ←
RPCInternal.DoSignal[reply, newLength, signalHandler, inPkt.convHandle]
ELSE BEGIN
IF newLength > maxlength
THEN ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
PrincOpsUtils.LongCOPY[from: recvdHeader, to: @inPkt.header,
nwords: recvdHeader.length];
GiveBackBuffer[reply];
RETURN[TRUE,newLength]
END;
END
END;
ENDLOOP-- for each ping--;
We get here only after coming back from a call of DoSignal
ENDLOOP-- for signal handlers --;
we can't get here!
END;
GeneralSend: PROC[pkt: RPCLupine.RPCPkt] =
BEGIN
b: BufferDefs.PupBuffer = PupDefs.GetFreePupBuffer[];
PrincOpsUtils.LongCOPY[from: @(pkt.header), to: @(b.pupLength),
nwords: ConcreteHeader[@pkt.header].length];
PupDefs.PupRouterSendThis[b];
END;
LocalSend: PROC[pkt: RPCLupine.RPCPkt] =
BEGIN
handle talking to ourselves if the ethernet head can't
b: BufferDefs.PupBuffer = PupDefs.GetFreePupBuffer[];
PrincOpsUtils.LongCOPY[from: @(pkt.header), to: @(b.pupLength),
nwords: ConcreteHeader[@pkt.header].length];
IF NOT AcceptPkt[b] THEN PupDefs.ReturnFreePupBuffer[b];
END;
idlerPkt: BufferDefs.PupBuffer ← NIL;
idlerCond: CONDITION ← [timeout:0];
waiterCond: CONDITION ← [timeout:Process.MsecToTicks[minRetransmitMsecs]];
WaiterArray: TYPE = ARRAY PrincOps.PsbIndex OF BufferDefs.PupBuffer;
waiterPkts: REF WaiterArray ← NEW[WaiterArray ← ALL[NIL]];
wanting: PACKED ARRAY PrincOps.PsbIndex OF BOOLEANALL[FALSE]; -- PSB expects a pkt --
SetWanting: ENTRY PROC[myPSB: PrincOps.PsbIndex] = INLINE
{ wanting[myPSB] ← TRUE };
ClearWanting: PROC[myPSB: PrincOps.PsbIndex] = INLINE
BEGIN
spare: BufferDefs.PupBuffer;
InnerClear: ENTRY PROC = INLINE
BEGIN
wanting[myPSB] ← FALSE;
IF (spare ← waiterPkts[myPSB]) # NIL THEN waiterPkts[myPSB] ← NIL;
END;
InnerClear[];
IF spare # NIL THEN GiveBackBuffer[spare] --ignore it, outside monitor--;
END;
MyReceive: ENTRY PROC[myPSB: PrincOps.PsbIndex, sentTime, waitTime: BasicTime.Pulses]
RETURNS[recvd: BufferDefs.PupBuffer] = INLINE
BEGIN
ENABLE UNWIND => NULL;
Returns NIL if no packet arrives within waitTime pulses after sentTime
DO IF (recvd ← waiterPkts[myPSB]) = NIL
THEN IF BasicTime.GetClockPulses[] - sentTime < waitTime
THEN WAIT waiterCond
ELSE EXIT
ELSE { waiterPkts[myPSB] ← NIL; EXIT };
ENDLOOP;
END;
RecvdPktTooLong: ERROR = CODE;
KillServer: ERROR = CODE;
servers: CARDINAL ← 0;
serversMax: CARDINAL ← 20;
idlers: CARDINAL ← 0;
idlersMax: CARDINAL = 6;
idlersMin: CARDINAL = 2;
Number of server processes will never exceed "serversMax". If number of idle servers exceeds "idlersMax", one will abort. If number of idle servers drops below "idlersMin", one is forked.
IdleReceive: PUBLIC PROC[pkt: RPCLupine.RPCPkt, maxlength: CARDINAL] =
BEGIN
b: BufferDefs.PupBuffer;
InnerIdler: ENTRY PROC = INLINE
BEGIN
IF idlers >= idlersMax
THEN { servers←servers-1; RETURN WITH ERROR KillServer[] };
idlers ← idlers + 1;
DO WAIT idlerCond; IF idlerPkt # NIL THEN EXIT ENDLOOP;
IF idlers < idlersMin AND servers < serversMax
THEN { servers←servers+1; Process.Detach[FORK Server[]] };
b ← idlerPkt;
BEGIN
recvdHeader: LONG POINTER TO Header = LOOPHOLE[@b.pupLength];
idlerPkt ← LOOPHOLE[idlerPkt.next,BufferDefs.PupBuffer]; b.next ← NIL;
IF recvdHeader.length > maxlength
THEN ERROR RecvdPktTooLong[] --NULL??--
ELSE PrincOpsUtils.LongCOPY[from: @b.pupLength, to: @pkt.header,
nwords: recvdHeader.length];
END;
END;
InnerIdler[];
GiveBackBuffer[b];--outside monitor--
END;
QueuesScrambled: ERROR = CODE;
spyProc: PROC[BufferDefs.PupBuffer] ← NIL;
SetSpyProc: PUBLIC ENTRY SAFE PROC[p: PROC[BufferDefs.PupBuffer]] = CHECKED
{ spyProc ← p };
EnqueueRecvd: ENTRY PROC[b: BufferDefs.PupBuffer, report: BOOLEAN]
RETURNS[BOOLEAN] =
BEGIN
Dispatch packet to appropriate RPC process, if any. Packet is known to be a Pup, and is addressed to our socket.
header: LONG POINTER TO Header = LOOPHOLE[@b.pupLength];
IF report AND spyProc # NIL THEN spyProc[b];
IF header.destHost = myHost AND header.type.subType = rpc
THEN BEGIN
destPSB: PrincOps.PsbIndex = header.destPSB;
recvd ← recvd+1;
IF destPSB NOT IN (PrincOps.PsbNull..LAST[PrincOps.PsbIndex]]
OR NOT wanting[destPSB]
THEN -- give to idle process to deal with --
Pkts are dealt with LIFO by idlers, but it doesn't matter (much)
BEGIN
IF idlers = 0
THEN -- server too busy: throw it away! --
RETURN[FALSE]
ELSE { b.next ← idlerPkt; idlerPkt ← b;
idlers ← idlers-1; NOTIFY idlerCond };
END
ELSE -- someone wants this packet: give them it --
BEGIN
IF waiterPkts[destPSB] # NIL THEN RETURN[FALSE];
waiterPkts[destPSB] ← b; BROADCAST waiterCond;
END;
RETURN[TRUE]
END
ELSE RETURN[FALSE];
END;
GiveBackBuffer: PROC[b: BufferDefs.PupBuffer] =
NOTE: calls of this must be made outside our monitor, because RPCPrivate.ReturnBuffer acquires the EthernetDriver monitor, and the EthernetDriver may call EnqueueRecvd which acquires our monitor!
IF PrincOpsUtils.IsBound[RPCPrivate.ReturnBuffer]
THEN RPCPrivate.ReturnBuffer
ELSE PupDefs.ReturnFreePupBuffer;
AcceptPkt: PROC[b: BufferDefs.PupBuffer] RETURNS[BOOLEAN] =
{ RETURN[ EnqueueRecvd[b: b, report: TRUE] ] };
EnqueueAgain: PUBLIC PROC[b: BufferDefs.PupBuffer] =
{ IF NOT EnqueueRecvd[b: b, report: FALSE] THEN GiveBackBuffer[b] };
Listener: PROC =
BEGIN
Catch any packets that get as far as the Pup socket level
soc: PupDefs.PupSocket = PupDefs.PupSocketMake[local: RPCPrivate.rpcSocket,
remote:,
ticks: PupDefs.veryLongWait];
DO ENABLE ABORTED => EXIT;
b: BufferDefs.PupBuffer = soc.get[];
IF NOT AcceptPkt[b] THEN PupDefs.ReturnFreePupBuffer[b];
ENDLOOP;
PupDefs.PupSocketDestroy[soc];
END;
listenerProcess: PROCESS;
Server: PROC =
BEGIN
RPCInternal.ServerMain[ ! KillServer => CONTINUE ];
END;
******** Initialization ********
Initialize: ENTRY PROC =
BEGIN
PupDefs.PupPackageMake[];
DriverDefs.ChangeNumberOfInputBuffers[TRUE];
myAddr ← PupDefs.AnyLocalPupAddress[RPCPrivate.rpcSocket];
myNet ← myAddr.net;
myDeviceHost ← myAddr.host;
myHost ← [myNet, myAddr.host];
START RPCInternal.RPCBinding; -- exports "RPCInternal.exportTable" --
START RPCInternal.RPCSecurity; -- exports "RPCInternal.firstConversation" --
START RPCInternal.RPCPktStreams; -- initialize connection states --
Booting.RegisterProcs[c: Checkpoint, r: Rollback];
servers ← servers+1; Process.Detach[FORK Server[]];
listenerProcess ← FORK Listener[];
END;
Checkpoint: Booting.CheckpointProc = TRUSTED
{ RPCPrivate.GetRPCPackets[NIL] };
Rollback: Booting.RollbackProc = TRUSTED
BEGIN
InnerRollback: ENTRY PROC = TRUSTED
BEGIN
RESTART RPCInternal.RPCPktStreams; -- forget connection counts --
RESTART RPCInternal.RPCSecurity; -- invalidate local conversations --
RESTART RPCInternal.RPCBinding; -- nothing? --
END;
InnerRollback[];
RPCPrivate.GetRPCPackets[AcceptPkt];
END;
Initialize[];
RPCPrivate.GetRPCPackets[AcceptPkt];
END.