-- File: RPCPktIO.mesa - last edit:
-- BJackson.pa 3-Jul-85 4:05:57
-- Created? HGM, 21-Jan-84 20:38:50
-- Cedar 5, HGM, 23-Jun-85 11:34:33
-- Yetch. Similar patch on Recv, HGM, 12-Apr-83 21:58:43
-- Patch out myHost checking, HGM, 12-Apr-83 20:11:32
-- Copyright (C) 1983, 1984 , 1985, 1985, 1985 by Xerox Corporation. All rights reserved.
-- RPC: Reliable transmission and reception of packets
-- RPCPktIO.mesa
-- Andrew Birrell September 1, 1982 4:00 pm
DIRECTORY
Buffer USING [PupBuffer, ReturnBuffer],
Driver USING [GetInputBuffer],
Frame USING [Free],
Heap USING [systemZone],
Inline USING [LongCOPY],
Process USING [Detach, MsecToTicks],
ProcessOperations USING [HandleToIndex, ReadPSB],
PSB USING [PsbIndex, PsbNull],
PupDefs USING [
GetHopsToNetwork, GetLocalPupAddress, PupAddress,
PupPackageMake, PupRouterSendThis, PupSocket, PupSocketDestroy, PupSocketMake,
veryLongWait],
PupTypes USING [PupAddress, PupHostID, PupNetID, PupSocketID],
MesaRPC USING [CallFailed, unencrypted],
RPCInternal USING [
DecryptPkt, DoSignal, EncryptPkt, ReplyToRFA, RPCBinding, RPCPktStreams,
RPCSecurity, ServerMain],
MesaRPCLupine USING [DataLength, Dispatcher, Header, RPCPkt],
RPCPkt USING [
CallCount, Header, Machine, PktExchangeState, PktID, pktLengthOverhead],
RPCPrivate USING [rpcSocket],
System USING [GetClockPulses, MicrosecondsToPulses, Pulses];
RPCPktIO: MONITOR
IMPORTS
Buffer, Driver, Frame, Heap, Inline, Process, ProcessOperations, PupDefs, MesaRPC,
RPCInternal, System
EXPORTS MesaRPCLupine --Encapsulation,Header-- , RPCPkt --PktExchange,IdleReceive--
SHARES MesaRPCLupine =
BEGIN
-- Concrete representation of the header type exported to MesaRPCLupine.
Header: PUBLIC TYPE = RPCPkt.Header;
-- Converts a pointer to the abstract MesaRPCLupine.Header into a pointer to the
-- concrete Header declared above. The pointer value itself is returned unchanged.
ConcreteHeader: PROC [abstract: LONG POINTER TO MesaRPCLupine.Header]
  RETURNS [LONG POINTER TO Header] = INLINE
  BEGIN
  RETURN[abstract]
  END;
-- Call sequence counter; incremented by NewCallNumber (an ENTRY procedure
-- declared inside PktExchange) for the first packet of each call.
callSequence: RPCPkt.CallCount ← 0; -- monotonic from this host --
-- ******** sending and receiving packets ******** --
-- Our own machine address; assigned in Initialize and written into the
-- srceHost field of every packet PktExchange transmits.
myHost: RPCPkt.Machine;
-- Statistics counters.
-- sent: incremented in PktExchange after each GeneralSend.
-- recvd: incremented in EnqueueRecvd for each packet whose subType is rpc.
-- retransmitted: incremented in PktExchange on each retransmit timeout.
-- NOTE(review): sent and retransmitted are updated in PktExchange, which is not
-- an ENTRY procedure, so presumably they are only approximate; confirm.
sent: CARDINAL ← 0;
recvd: CARDINAL ← 0;
retransmitted: CARDINAL ← 0;
-- "PktExchange" implements a reliable packet exchange. There are five cases:
-- sending: transmit data, wait for ack
-- receiving: transmit ack, wait for data
-- call: transmit data, wait for data
-- endCall: transmit data, wait for ack or data (data => start of new call)
-- authReq: transmit RFA, wait for data (like "call", but no retransmissions)
-- Data and RFA packets are retransmitted until an ack is received.
-- Ack packets are retransmitted only in response to "pleaseAck" requests
-- No acknowledgement after maxTransmissions (20) transmissions is fatal (CallFailed[timeout])
-- When the transmitted packet has been acknowledged (if needed), a ping is
-- sent periodically (ack packet saying pleaseAck). If necessary the ping is
-- retransmitted until it has been acked. Failure to receive an ack for the ping
-- is fatal (CallFailed[timeout]). Provided pings continue to be acknowledged,
-- there is no limit on how long we will wait for the next data packet. If the
-- client gets impatient, he can abort this process. (This corresponds to the
-- local-machine semantics of a procedure call.)
-- Timing parameters for PktExchange, in milliseconds or seconds as named.
-- They are converted to System.Pulses by the declarations that follow.
minRetransmitMsecs: CARDINAL = 100; -- retransmit delay for local net --
msecsPerHop: CARDINAL = 500; -- approximate typical gateway hop delay? --
minPingMsecs: CARDINAL = 5000; -- initial interval between pings --
maxPingSecs: CARDINAL = 300; -- long-term ping interval --
maxTransmissions: CARDINAL = 20; -- give up after too many transmissions --
-- When FALSE, PktExchange does not raise CallFailed[timeout] on reaching
-- maxTransmissions. The authReq case still raises it on its first timeout.
signalTimeout: BOOLEAN ← TRUE; -- debugging switch --
-- The retransmission delay is incremented by minRetransmitMsecs each time we time out,
-- but is reset when we receive the desired packet. If n=maxTransmissions
-- and i=minRetransmitMsecs and j=hops*msecsPerHop, we give up after
-- i*(n*n+n)/2+n*j msecs.
-- For the local network, that comes to 21 seconds.
-- For a two-hop route, that comes to 41 seconds.
-- The ping delay is doubled at each ping, up to maxPingSecs, which is 5 minutes;
-- The maxPingSecs value is reached after about 5 minutes.
-- The timing parameters above, converted to clock pulses.
-- MicrosecondsToPulses takes microseconds, hence the factor of 1000 for the
-- millisecond values and 1000*1000 for the value given in seconds.
minRetransmitPulses: System.Pulses = System.MicrosecondsToPulses[
LONG[1000]*minRetransmitMsecs];
pulsesPerHop: System.Pulses = System.MicrosecondsToPulses[
LONG[1000]*msecsPerHop];
minPingPulses: System.Pulses = System.MicrosecondsToPulses[
LONG[1000]*minPingMsecs];
maxPingPulses: System.Pulses = System.MicrosecondsToPulses[
LONG[1000]*LONG[1000]*maxPingSecs];
-- NOTE(review): transmitLocalPkts is not referenced anywhere else in this
-- file as seen here; presumably a leftover debugging switch. Confirm before removing.
transmitLocalPkts: BOOLEAN ← TRUE;
-- PktExchange: reliable exchange of one packet with the remote end.
-- Arguments:
-- inPkt: packet to transmit; also receives the reply packet on return.
-- length: data length of the outgoing packet (header overhead is added here).
-- maxlength: largest data length the caller will accept in the reply.
-- state: which of the exchange cases described above applies.
-- signalHandler: if not NIL and state = call, a reply whose outcome is signal
-- is passed to RPCInternal.DoSignal instead of being returned.
-- Results:
-- [FALSE, ] when our packet was acknowledged and no reply packet is returned
-- (state = sending or endCall, ack received).
-- [TRUE, newLength] when a reply packet has been copied into inPkt.
-- Errors:
-- ERROR MesaRPC.CallFailed[runtimeProtocol] on a protocol violation or when
-- the reply is longer than maxlength.
-- SIGNAL MesaRPC.CallFailed[timeout] after maxTransmissions unacknowledged
-- transmissions (if signalTimeout), or on the first timeout when state = authReq.
PktExchange: PUBLIC PROC [
inPkt: MesaRPCLupine.RPCPkt, length: MesaRPCLupine.DataLength,
maxlength: MesaRPCLupine.DataLength, state: RPCPkt.PktExchangeState,
signalHandler: MesaRPCLupine.Dispatcher ← NIL]
RETURNS [newPkt: BOOLEAN, newLength: MesaRPCLupine.DataLength] =
BEGIN
-- On exit if newPkt=TRUE, the packet has been decrypted if needed. --
-- Normally, transmits inPkt and copies result into inPkt.
-- If a signal occurs, calls DoSignal which handles the signal protocol
-- up to the last resumption packet. DoSignal returns the last resume
-- packet for us to transmit, which we do by assigning it to outPkt;
-- that packet was allocated in DoSignal's local frame, which we must
-- later deallocate.
outPkt: MesaRPCLupine.RPCPkt ← inPkt; -- altered after a signal --
outPktFrame: POINTER ← NIL; -- DoSignal's local frame --
DO -- loop for signal handlers --
sentTime: System.Pulses; -- initialized after sending any packet --
-- Allocates the next call sequence number under the monitor lock.
NewCallNumber: ENTRY PROC RETURNS [RPCPkt.CallCount] = INLINE {
RETURN[callSequence ← callSequence + 1]};
reply: Buffer.PupBuffer;
recvdHeader: LONG POINTER TO Header;
myPSB: PSB.PsbIndex = ProcessOperations.HandleToIndex[
ProcessOperations.ReadPSB[]];
acked: BOOLEAN;
thisPktID: RPCPkt.PktID;
-- header is re-evaluated on each pass of this loop, so after DoSignal it
-- addresses the header of the packet DoSignal returned.
header: LONG POINTER TO Header = @outPkt.header;
pingPulses: System.Pulses ← minPingPulses;
-- Stamp the packet with our host, socket and process index.
-- EnqueueRecvd dispatches incoming packets on their destPSB field, so
-- presumably the remote end echoes srcePSB back as destPSB.
header.srceHost ← myHost;
header.srceSoc ← RPCPrivate.rpcSocket;
header.srcePSB ← myPSB;
IF header.pktID.pktSeq = 0 -- first packet of a call; yucky interface! --
THEN
BEGIN
header.type ←
SELECT state FROM
sending => [0, rpc, notEnd, pleaseAck, call],
call => [0, rpc, end, dontAck, call],
authReq => [0, rpc, end, dontAck, rfa],
ENDCASE => --receiving, endCall-- ERROR;
header.pktID.callSeq ← NewCallNumber[];
header.pktID.pktSeq ← 1;
acked ← FALSE;
END
ELSE
BEGIN
header.type ←
SELECT state FROM
sending => [0, rpc, notEnd, pleaseAck, data],
receiving => [0, rpc, end, dontAck, ack],
call => [0, rpc, end, dontAck, data],
endCall => [0, rpc, end, dontAck, data],
ENDCASE => --authReq-- ERROR;
-- An ack packet (state = receiving) keeps the sequence number of the
-- packet it acknowledges and needs no acknowledgement itself.
IF state # receiving --header.type.class = data --
THEN {header.pktID.pktSeq ← header.pktID.pktSeq + 1; acked ← FALSE}
ELSE acked ← TRUE;
END;
-- Remember the clear pktID; it is used for all comparisons below and is
-- restored into the header before pings and on the ack-only return.
thisPktID ← header.pktID;
-- From here until ClearWanting, EnqueueRecvd delivers packets addressed to
-- myPSB into waiterPkts[myPSB] instead of giving them to an idle server.
SetWanting[myPSB];
DO -- loop for pings --
ENABLE
UNWIND => {
ClearWanting[myPSB];
IF outPktFrame # NIL THEN Frame.Free[outPktFrame]};
BEGIN
transmissions: CARDINAL ← 0;
-- Initial retransmit interval: the local-net minimum plus an allowance
-- for each gateway hop to the destination network.
retransmitPulses: System.Pulses ← [minRetransmitPulses +
pulsesPerHop*PupDefs.GetHopsToNetwork[header.destHost.net]];
IF outPkt.convHandle # MesaRPC.unencrypted THEN
header.length ← RPCInternal.EncryptPkt[outPkt, length]
ELSE header.length ← length + RPCPkt.pktLengthOverhead;
header.oddByte ← no;
DO -- loop for retransmissions --
BEGIN
GeneralSend[outPkt];
sentTime ← System.GetClockPulses[];
sent ← sent + 1;
-- wait for response: an ack or the reply or a timeout --
DO -- loop for each incoming packet (including garbage) --
-- Once acked we wait a ping interval; until then a retransmit interval.
reply ← MyReceive[
myPSB, sentTime, (IF acked THEN pingPulses ELSE retransmitPulses)];
IF reply = NIL THEN
IF acked THEN GOTO ping
ELSE {header.type.ack ← pleaseAck; GOTO retransmit};
recvdHeader ← LOOPHOLE[@reply.pup.pupLength];
IF recvdHeader.type.class = rfa THEN
BEGIN
IF RPCInternal.ReplyToRFA[
reply, header, --encrypted--
thisPktID --clear-- , inPkt.convHandle] THEN
NULL-- can't set "acked", because we must retransmit our data
-- packet until we obtain the correct destPSB from some ack pkt --;
LOOP
END;
-- Decrypt only when our conversation is encrypted and the packet claims
-- to belong to it and to come from our destination host.
IF outPkt.convHandle # MesaRPC.unencrypted
AND recvdHeader.conv = header.conv
AND recvdHeader.srceHost = header.destHost THEN
[, newLength] ← RPCInternal.DecryptPkt[
recvdHeader, outPkt.convHandle]
ELSE newLength ← recvdHeader.length - RPCPkt.pktLengthOverhead;
IF recvdHeader.conv = header.conv
-- PATCH AND recvdHeader.srceHost = header.destHost
AND recvdHeader.pktID.activity = thisPktID.activity THEN -- pkt is related to our call --
SELECT TRUE FROM
recvdHeader.pktID.callSeq = thisPktID.callSeq =>
-- pkt is in our call --
SELECT TRUE FROM
-- a) pkt has next sequence number to ours --
recvdHeader.pktID.pktSeq = 1 + thisPktID.pktSeq =>
BEGIN
IF state = sending OR state = endCall THEN -- he's not allowed to generate that pktSeq! --
ERROR MesaRPC.CallFailed[
runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
SELECT recvdHeader.type.class FROM
data => GOTO done;
ack =>
BEGIN
-- This can happen if state=call and callee sent next data
-- pkt, but it was lost and now he is responding to our
-- retransmission or ping. Soon, he will retransmit his
-- data pkt. --
acked ← TRUE;
GiveBackBuffer[reply];
END;
ENDCASE => --call,rfa--
ERROR MesaRPC.CallFailed[
runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
END;
-- b) pkt has same sequence number as ours --
recvdHeader.pktID.pktSeq = thisPktID.pktSeq =>
BEGIN
SELECT recvdHeader.type.class FROM
ack => -- acknowledgement of our packet --
BEGIN
-- The ack of a call packet tells us which remote process is
-- handling the call; later packets are addressed to it.
IF header.type.class = call THEN
header.destPSB ← recvdHeader.srcePSB;
acked ← TRUE;
IF state = sending OR state = endCall THEN {
GiveBackBuffer[reply]; reply ← NIL; GOTO done}
ELSE -- state = call, authReq, or receiving --
-- Even if "pleaseAck", we don't need to ack it,
-- because other end should send data pkt soon --
GiveBackBuffer[reply];
END;
data, call => -- retransmission of his packet --
IF state = receiving THEN
IF recvdHeader.type.ack = pleaseAck THEN
BEGIN
GiveBackBuffer[reply]; reply ← NIL;
GOTO retransmit -- we're already sending an ack --
END
ELSE GiveBackBuffer[reply]
ELSE
ERROR MesaRPC.CallFailed[
runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
ENDCASE => --rfa --
ERROR MesaRPC.CallFailed[
runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
END;
-- c) pkt has earlier sequence number than ours --
recvdHeader.pktID.pktSeq < thisPktID.pktSeq =>
GiveBackBuffer[reply]; -- no need to ack it --
-- d) pkt has some future sequence number --
ENDCASE =>
ERROR MesaRPC.CallFailed[
runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
recvdHeader.pktID.callSeq > thisPktID.callSeq AND state = endCall
=>
BEGIN
IF recvdHeader.type.class # call THEN
ERROR MesaRPC.CallFailed[
runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
-- acks our last packet, so we can handle the call --
GOTO done
END
-- Setting destPSB to PsbNull makes EnqueueRecvd treat the packet as not
-- wanted by any waiting process, so it is offered to an idle server.
ENDCASE => -- wrong call: let someone else do it --
{recvdHeader.destPSB ← PSB.PsbNull; EnqueueAgain[reply]}
ELSE -- wrong conversation or activity: let someone else do it --
{recvdHeader.destPSB ← PSB.PsbNull; EnqueueAgain[reply]};
-- Incoming RFA packets don't reach here. --
ENDLOOP --for each incoming packet-- ;
EXITS
retransmit =>
BEGIN
transmissions ← transmissions + 1;
-- CallFailed is raised as a SIGNAL here: if it is resumed, the count
-- is reset and retransmission continues.
IF (transmissions = maxTransmissions AND signalTimeout)
OR state = authReq THEN
-- Don't retransmit RFA: caller will retransmit call pkt.
-- Otherwise, if a spurious worker process occurred because of call pkt
-- retransmission before response to RFA, then the spurious worker
-- process sits needlessly retransmitting RFA's until it times out.
{SIGNAL MesaRPC.CallFailed[timeout]; transmissions ← 0};
retransmitted ← retransmitted + 1;
-- Linear back-off: one more minRetransmitPulses per timeout.
retransmitPulses ← [retransmitPulses + minRetransmitPulses];
END;
END;
ENDLOOP -- for each transmission -- ;
EXITS
ping =>
BEGIN
-- Turn the outgoing packet into a ping: an ack packet with no data
-- that requests an acknowledgement. The ping interval doubles each
-- time, up to maxPingPulses.
header.type ← [0, rpc, end, pleaseAck, ack];
length ← 0;
header.pktID ← thisPktID;
acked ← FALSE;
pingPulses ← [MIN[maxPingPulses, pingPulses*2]];
END;
END;
-- only exit from loop is "GOTO done" or UNWIND --
REPEAT
done =>
BEGIN
-- This isn't covered by any UNWIND --
ClearWanting[myPSB];
IF outPktFrame # NIL THEN Frame.Free[outPktFrame];
-- reply = NIL means our packet was acked and there is nothing to return.
IF reply = NIL THEN {
--restore clear pktID-- header.pktID ← thisPktID; RETURN[FALSE, ]}
ELSE
BEGIN
IF recvdHeader.outcome = signal AND state = call
AND signalHandler # NIL THEN
[outPkt, length, outPktFrame] ← RPCInternal.DoSignal[
reply, newLength, signalHandler, inPkt.convHandle]
ELSE
BEGIN
IF newLength > maxlength THEN
ERROR MesaRPC.CallFailed[
runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
-- Copy header and data (recvdHeader.length words) into the caller's
-- packet, then release the Pup buffer.
Inline.LongCOPY[
from: recvdHeader, to: @inPkt.header,
nwords: recvdHeader.length];
GiveBackBuffer[reply];
RETURN[TRUE, newLength]
END;
END
END;
ENDLOOP -- for each ping-- ;
-- We get here only after coming back from a call of DoSignal --
ENDLOOP -- for signal handlers -- ;
-- we can't get here! --
END;
-- Copies pkt (header plus data, as many words as the header's length field
-- says) into a freshly obtained Pup buffer and hands that buffer to the Pup
-- router for transmission. Loops until a buffer is available.
GeneralSend: PROC [pkt: MesaRPCLupine.RPCPkt] =
  BEGIN
  buf: Buffer.PupBuffer;
  -- keep asking the driver until it gives us a buffer --
  DO
    buf ← Driver.GetInputBuffer[TRUE];
    IF buf # NIL THEN EXIT
    ENDLOOP;
  Inline.LongCOPY[
    from: @pkt.header,
    to: @buf.pup.pupLength,
    nwords: ConcreteHeader[@pkt.header].length];
  buf.type ← pup;
  PupDefs.PupRouterSendThis[buf];
  END;
-- Monitor-protected state shared by the receive paths.
-- idlerPkt: head of a list (linked through the buffers' next field) of packets
-- that EnqueueRecvd has queued for idle server processes; consumed by IdleReceive.
idlerPkt: Buffer.PupBuffer ← NIL;
-- idlerCond: idle servers wait here; EnqueueRecvd NOTIFYs it for each queued packet.
-- NOTE(review): timeout: 0 presumably means the WAIT never times out; confirm.
idlerCond: CONDITION ← [timeout: 0];
-- waiterCond: processes inside MyReceive wait here; the timeout lets them
-- re-check their deadline at least every minRetransmitMsecs.
waiterCond: CONDITION ← [timeout: Process.MsecToTicks[minRetransmitMsecs]];
-- waiterPkts: one slot per process index, holding at most one packet that has
-- arrived for that process and not yet been picked up by MyReceive.
WaiterArray: TYPE = ARRAY PSB.PsbIndex OF Buffer.PupBuffer;
waiterPkts: LONG POINTER TO WaiterArray ← Heap.systemZone.NEW[
WaiterArray ← ALL[NIL]];
wanting: PACKED ARRAY PSB.PsbIndex OF BOOLEAN ← ALL[FALSE]; -- PSB expects a pkt --
-- Marks process myPSB as expecting a packet, so that EnqueueRecvd delivers
-- packets addressed to it into waiterPkts[myPSB].
SetWanting: ENTRY PROC [myPSB: PSB.PsbIndex] = INLINE
  BEGIN
  wanting[myPSB] ← TRUE;
  END;
-- Marks process myPSB as no longer expecting a packet and discards any packet
-- that was delivered for it but never picked up. The slot is emptied under the
-- monitor lock; the buffer is returned after the lock has been released.
ClearWanting: PROC [myPSB: PSB.PsbIndex] = INLINE
  BEGIN
  leftover: Buffer.PupBuffer;
  TakeAndClear: ENTRY PROC = INLINE
    BEGIN
    leftover ← waiterPkts[myPSB];
    waiterPkts[myPSB] ← NIL;
    wanting[myPSB] ← FALSE;
    END;
  TakeAndClear[];
  -- ignore the stray packet; give its buffer back outside the monitor --
  IF leftover # NIL THEN GiveBackBuffer[leftover];
  END;
-- Waits for a packet to be delivered to waiterPkts[myPSB].
-- Returns the packet (and empties the slot), or NIL if none has arrived by the
-- time waitTime pulses have elapsed since sentTime. The deadline is re-checked
-- each time the WAIT on waiterCond returns.
MyReceive: ENTRY PROC [myPSB: PSB.PsbIndex, sentTime, waitTime: System.Pulses]
  RETURNS [recvd: Buffer.PupBuffer] = INLINE
  BEGIN
  ENABLE UNWIND => NULL;
  DO
    recvd ← waiterPkts[myPSB];
    -- a packet is waiting: take it out of the slot and return it --
    IF recvd # NIL THEN {waiterPkts[myPSB] ← NIL; EXIT};
    -- nothing yet: give up once the deadline has passed (recvd is NIL) --
    IF System.GetClockPulses[] - sentTime >= waitTime THEN EXIT;
    WAIT waiterCond;
    ENDLOOP;
  END;
-- Raised by IdleReceive when a queued packet's length exceeds the caller's maxlength.
RecvdPktTooLong: ERROR = CODE;
-- Raised by IdleReceive to tell a surplus server process to exit; caught in Server.
KillServer: ERROR = CODE;
-- servers: number of server processes; incremented before each FORK of Server
-- and decremented when KillServer is raised.
servers: CARDINAL ← 0;
serversMax: CARDINAL ← 20;
-- idlers: number of servers waiting in IdleReceive; incremented there and
-- decremented by EnqueueRecvd each time it queues a packet for an idler.
idlers: CARDINAL ← 0;
idlersMax: CARDINAL = 6;
idlersMin: CARDINAL = 2;
-- Number of server processes will never exceed "serversMax".
-- If number of idle servers exceeds "idlersMax", one will abort.
-- If number of idle servers drops below "idlersMin", one is forked.
-- IdleReceive: called by an idle server process to obtain its next packet.
-- Blocks until EnqueueRecvd has queued a packet on idlerPkt, copies that packet
-- (header plus data) into pkt, and returns the Pup buffer.
-- Arguments:
--   pkt: destination for the received packet.
--   maxlength: largest acceptable value of the received header's length field.
-- Errors:
--   KillServer if idlersMax servers are already idle (servers is decremented first).
--   RecvdPktTooLong if the packet's length field exceeds maxlength; pkt is left
--   untouched in that case.
-- RecvdPktTooLong used to be raised with ERROR from inside the ENTRY procedure,
-- which has no UNWIND catch phrase. That left the monitor locked if the error
-- was unwound, and the dequeued buffer was never returned. The condition is now
-- recorded inside the monitor and the error raised after the lock is released.
IdleReceive: PUBLIC PROC [pkt: MesaRPCLupine.RPCPkt, maxlength: CARDINAL] =
  BEGIN
  b: Buffer.PupBuffer ← NIL;
  tooLong: BOOLEAN ← FALSE; -- set inside the monitor, acted on after leaving it --
  InnerIdler: ENTRY PROC = INLINE
    BEGIN
    -- Too many idle servers already: this one should go away.
    IF idlers >= idlersMax THEN {
      servers ← servers - 1; RETURN WITH ERROR KillServer[]};
    idlers ← idlers + 1;
    -- EnqueueRecvd decrements idlers and NOTIFYs idlerCond when it queues a packet.
    -- NOTE(review): an ABORTED raised from this WAIT is not caught here, so
    -- presumably idlers would stay incremented; confirm whether that can happen.
    DO WAIT idlerCond; IF idlerPkt # NIL THEN EXIT ENDLOOP;
    -- Keep a minimum pool of idle servers, within the overall limit.
    IF idlers < idlersMin AND servers < serversMax THEN {
      servers ← servers + 1; Process.Detach[FORK Server[]]};
    b ← idlerPkt;
    BEGIN
    recvdHeader: LONG POINTER TO Header = LOOPHOLE[@b.pup.pupLength];
    -- unlink the packet from the head of the idler list --
    idlerPkt ← LOOPHOLE[idlerPkt.next, Buffer.PupBuffer];
    b.next ← NIL;
    IF recvdHeader.length > maxlength THEN tooLong ← TRUE
    ELSE
      Inline.LongCOPY[
        from: @b.pup.pupLength, to: @pkt.header, nwords: recvdHeader.length];
    END;
    END;
  InnerIdler[];
  GiveBackBuffer[b]; --outside monitor--
  IF tooLong THEN ERROR RecvdPktTooLong[];
  END;
-- NOTE(review): QueuesScrambled is not raised anywhere in this file as seen
-- here; presumably a leftover. Confirm before removing.
QueuesScrambled: ERROR = CODE;
-- Local name for Buffer.ReturnBuffer: returns a Pup buffer we have finished with.
GiveBackBuffer: PROC [b: Buffer.PupBuffer] = Buffer.ReturnBuffer;
-- Body of the listener process: creates a Pup socket on the RPC socket number
-- and passes every packet it receives to AcceptPkt, returning the buffer when
-- the packet is not accepted. Runs until ABORTED, then destroys the socket.
Listener: PROC =
  BEGIN
  -- Catch any packets that get as far as the Pup socket level --
  socket: PupDefs.PupSocket = PupDefs.PupSocketMake[
    local: RPCPrivate.rpcSocket, remote:, ticks: PupDefs.veryLongWait];
  DO
    ENABLE ABORTED => EXIT;
    buf: Buffer.PupBuffer = socket.get[];
    accepted: BOOLEAN = AcceptPkt[buf];
    IF NOT accepted THEN Buffer.ReturnBuffer[buf];
    ENDLOOP;
  PupDefs.PupSocketDestroy[socket];
  END;
-- Offers a received packet to EnqueueRecvd. Returns TRUE if the packet was
-- taken, FALSE if the caller still owns the buffer.
AcceptPkt: PROC [b: Buffer.PupBuffer] RETURNS [BOOLEAN] =
  BEGIN
  RETURN[EnqueueRecvd[b]]
  END;
-- Re-dispatches a packet through EnqueueRecvd, returning its buffer if nobody
-- takes it.
EnqueueAgain: PUBLIC PROC [b: Buffer.PupBuffer] =
  -- This is a procedure mainly for debugger breakpoints! --
  BEGIN
  taken: BOOLEAN = EnqueueRecvd[b];
  IF NOT taken THEN Buffer.ReturnBuffer[b];
  END;
-- Dispatches a received packet to the appropriate RPC process, if any.
-- The packet is known to be a Pup, and is addressed to our socket.
-- Returns TRUE if the packet was handed to some process (which now owns the
-- buffer), FALSE if the caller must dispose of it.
--   A packet whose subType is not rpc is refused.
--   A packet whose destPSB names a process that is wanting a packet goes into
--   that process's waiterPkts slot, unless the slot is already occupied.
--   Any other packet is pushed on the idler list, unless no server is idle.
EnqueueRecvd: PUBLIC ENTRY PROC [b: Buffer.PupBuffer] RETURNS [BOOLEAN] =
  BEGIN
  header: LONG POINTER TO Header = LOOPHOLE[@b.pup.pupLength];
  -- the check "header.destHost = myHost" is patched out --
  IF header.type.subType # rpc THEN RETURN[FALSE];
  BEGIN
  target: PSB.PsbIndex = header.destPSB;
  recvd ← recvd + 1;
  IF target IN (PSB.PsbNull..LAST[PSB.PsbIndex]] AND wanting[target] THEN
    BEGIN
    -- someone wants this packet: give them it, if their slot is free --
    IF waiterPkts[target] # NIL THEN RETURN[FALSE];
    waiterPkts[target] ← b;
    BROADCAST waiterCond;
    RETURN[TRUE]
    END;
  -- give to idle process to deal with --
  -- Pkts are dealt with LIFO by idlers, but it doesn't matter (much)
  IF idlers = 0 THEN RETURN[FALSE]; -- server too busy: throw it away! --
  b.next ← idlerPkt;
  idlerPkt ← b;
  idlers ← idlers - 1;
  NOTIFY idlerCond;
  RETURN[TRUE]
  END;
  END;
-- Handle of the forked Listener process; assigned in Initialize.
listenerProcess: PROCESS;
-- Body of a server process: runs RPCInternal.ServerMain and returns quietly
-- when KillServer is raised out of it.
Server: PROC =
  {RPCInternal.ServerMain[ ! KillServer => CONTINUE]};
-- ******** Initialization ******** --
-- Module start-up, run once from the mainline call that follows the procedure:
-- starts the Pup package, records our own address in myHost, starts the other
-- RPC modules, then forks the first server process and the listener.
-- The statement order is significant: myHost is set before any process that
-- could call PktExchange is forked.
Initialize: ENTRY PROC =
BEGIN
myAddr: PupTypes.PupAddress;
[] ← PupDefs.PupPackageMake[];
myAddr ← PupDefs.GetLocalPupAddress[RPCPrivate.rpcSocket, NIL];
myHost ← [myAddr.net, myAddr.host];
START RPCInternal.RPCBinding; -- exports "RPCInternal.exportTable" --
START RPCInternal.RPCSecurity; -- exports "RPCInternal.firstConversation" --
START RPCInternal.RPCPktStreams; -- initialize connection states --
-- Count the first server before forking it, as IdleReceive does.
servers ← servers + 1;
Process.Detach[FORK Server[]];
listenerProcess ← FORK Listener[];
END;
Initialize[];
END.