-- Copyright (C) 1983, 1984 by Xerox Corporation. All rights reserved.
-- RPCPktIO.mesa, HGM, 21-Jan-84 20:38:50
-- Cedar 5, HGM, 21-Jan-84 20:38:47
-- Yetch. Similar patch on Recv, HGM, 12-Apr-83 21:58:43
-- Patch out myHost checking, HGM, 12-Apr-83 20:11:32

-- RPC: Reliable transmission and reception of packets

-- RPCPktIO.mesa

-- Andrew Birrell September 1, 1982 4:00 pm

DIRECTORY
  Buffer USING [PupBuffer, ReturnBuffer],
  Driver USING [GetInputBuffer],
  Frame USING [Free],
  Heap USING [systemZone],
  Inline USING [LongCOPY],
  Process USING [Detach, MsecToTicks],
  ProcessOperations USING [HandleToIndex, ReadPSB],
  PSB USING [PsbIndex, PsbNull],
  PupDefs USING [
    GetHopsToNetwork, GetLocalPupAddress, PupAddress, PupPackageMake,
    PupRouterSendThis, PupSocket, PupSocketDestroy, PupSocketMake, veryLongWait],
  PupTypes USING [PupAddress, PupHostID, PupNetID, PupSocketID],
  MesaRPC USING [CallFailed, unencrypted],
  RPCInternal USING [
    DecryptPkt, DoSignal, EncryptPkt, ReplyToRFA, RPCBinding, RPCPktStreams,
    RPCSecurity, ServerMain],
  MesaRPCLupine USING [DataLength, Dispatcher, Header, RPCPkt],
  RPCPkt USING [
    CallCount, Header, Machine, PktExchangeState, PktID, pktLengthOverhead],
  RPCPrivate USING [rpcSocket],
  System USING [GetClockPulses, MicrosecondsToPulses, Pulses];

-- Packet-level layer of RPC: reliable packet exchange for callers and
-- callees (PktExchange), delivery of incoming packets to the process that
-- expects them (EnqueueRecvd), and a pool of idle server processes that pick
-- up packets nobody is waiting for (IdleReceive).

RPCPktIO: MONITOR
  IMPORTS
    Buffer, Driver, Frame, Heap, Inline, Process, ProcessOperations, PupDefs,
    MesaRPC, RPCInternal, System
  EXPORTS MesaRPCLupine --Encapsulation,Header-- , RPCPkt --PktExchange,IdleReceive--
  SHARES MesaRPCLupine =
BEGIN

-- Concrete representation of the opaque MesaRPCLupine.Header type.
Header: PUBLIC TYPE = RPCPkt.Header;

-- Views a client-visible (abstract) header pointer as a pointer to the
-- concrete RPCPkt.Header; nothing is copied or checked.  This type-checks
-- because this module exports Header as RPCPkt.Header (declared above).
ConcreteHeader: PROC [abstract: LONG POINTER TO MesaRPCLupine.Header]
  RETURNS [LONG POINTER TO Header] = INLINE {RETURN[abstract]};

-- Source of callSeq values for new calls; incremented under the monitor lock.
callSequence: RPCPkt.CallCount ← 0; -- monotonic from this host --

-- ******** sending and receiving packets ******** --

-- Our own machine address; set by Initialize from GetLocalPupAddress and
-- stamped into the srceHost of every packet PktExchange sends.
myHost: RPCPkt.Machine;
-- Statistics: packets transmitted by PktExchange, RPC packets seen by
-- EnqueueRecvd, and retransmissions after a timeout.
sent: CARDINAL ← 0;
recvd: CARDINAL ← 0;
retransmitted: CARDINAL ← 0;

-- "PktExchange" implements a reliable packet exchange.  There are five cases:
--   sending: transmit data, wait for ack
--   receiving: transmit ack, wait for data
--   call: transmit data, wait for data
--   endCall: transmit data, wait for ack or data (data => start of new call)
--   authReq: transmit RFA, wait for data (like "call", but no retransmissions)
-- Data and RFA packets are retransmitted until an ack is received.
-- Ack packets are retransmitted only in response to "pleaseAck" requests.
-- After maxTransmissions (20) transmissions without an acknowledgement we
-- raise the SIGNAL CallFailed[timeout] (only if signalTimeout is TRUE; an
-- authReq signals at its first timeout).  If a handler resumes the signal,
-- retransmission continues with the transmission count reset.
-- When the transmitted packet has been acknowledged (if needed), a ping is
-- sent periodically (ack packet saying pleaseAck).  If necessary the ping is
-- retransmitted until it has been acked.  Failure to receive an ack for the
-- ping is handled the same way (CallFailed[timeout]).  Provided pings continue
-- to be acknowledged, there is no limit on how long we will wait for the next
-- data packet.  If the client gets impatient, he can abort this process.
-- (This corresponds to the local-machine semantics of a procedure call.)

minRetransmitMsecs: CARDINAL = 100; -- retransmit delay for local net --
msecsPerHop: CARDINAL = 500; -- approximate typical gateway hop delay? --
minPingMsecs: CARDINAL = 5000; -- initial interval between pings --
maxPingSecs: CARDINAL = 300; -- long-term ping interval --
maxTransmissions: CARDINAL = 20; -- give up after too many transmissions --
signalTimeout: BOOLEAN ← TRUE; -- debugging switch --

-- The retransmission delay starts at minRetransmitMsecs + hops*msecsPerHop
-- and is incremented by minRetransmitMsecs each time we timeout; it starts
-- afresh for each new packet (or ping) we transmit.  If n=maxTransmissions
-- and i=minRetransmitMsecs and j=hops*msecsPerHop, we give up after
-- i*(n*n+n)/2+n*j msecs.
-- For the local network, that comes to 21 seconds.
-- For a two-hop route, that comes to 41 seconds.
-- The ping delay is doubled at each ping, up to maxPingSecs, which is 5 minutes;
-- The maxPingSecs value is reached after about 5 minutes.
-- The timing constants above, converted to processor clock pulses.
minRetransmitPulses: System.Pulses = System.MicrosecondsToPulses[
  LONG[1000]*minRetransmitMsecs];
pulsesPerHop: System.Pulses = System.MicrosecondsToPulses[
  LONG[1000]*msecsPerHop];
minPingPulses: System.Pulses = System.MicrosecondsToPulses[
  LONG[1000]*minPingMsecs];
maxPingPulses: System.Pulses = System.MicrosecondsToPulses[
  LONG[1000]*LONG[1000]*maxPingSecs];

-- NOTE(review): transmitLocalPkts is not referenced anywhere in this module;
-- possibly a debugger switch like signalTimeout.  Confirm before removing.
transmitLocalPkts: BOOLEAN ← TRUE;

-- Performs one reliable exchange of the kind selected by "state" (see the
-- five cases described with the timing constants).
--   inPkt: the packet to transmit, carrying "length" words of data.  A pktSeq
--     of zero marks the first packet of a call; it gets a fresh callSeq.
--   maxlength: largest data length that may be copied back into inPkt; a
--     longer reply raises CallFailed[runtimeProtocol].
--   signalHandler: if non-NIL and state=call, a reply whose outcome is
--     "signal" is handed to RPCInternal.DoSignal instead of being returned.
-- Returns newPkt=FALSE when the exchange ended with an ack of our packet
-- (only possible for sending and endCall).  Otherwise newPkt=TRUE, the
-- received packet has been copied into inPkt, and newLength is its data length.
-- Raises CallFailed[runtimeProtocol] on protocol violations, and SIGNALs
-- CallFailed[timeout] after too many unanswered transmissions.
PktExchange: PUBLIC PROC [
  inPkt: MesaRPCLupine.RPCPkt, length: MesaRPCLupine.DataLength,
  maxlength: MesaRPCLupine.DataLength, state: RPCPkt.PktExchangeState,
  signalHandler: MesaRPCLupine.Dispatcher ← NIL]
  RETURNS [newPkt: BOOLEAN, newLength: MesaRPCLupine.DataLength] =
  BEGIN
  -- On exit if newPkt=TRUE, the packet has been decrypted if needed. --
  -- Normally, transmits inPkt and copies result into inPkt.
  -- If a signal occurs, calls DoSignal which handles the signal protocol
  -- up to the last resumption packet.  DoSignal returns the last resume
  -- packet for us to transmit, which we do by assigning it to outPkt;
  -- that packet was allocated in DoSignal's local frame, which we must
  -- later deallocate.
  outPkt: MesaRPCLupine.RPCPkt ← inPkt; -- altered after a signal --
  outPktFrame: POINTER ← NIL; -- DoSignal's local frame --
  DO -- loop for signal handlers --
    sentTime: System.Pulses; -- initialized after sending any packet --
    NewCallNumber: ENTRY PROC RETURNS [RPCPkt.CallCount] = INLINE {
      RETURN[callSequence ← callSequence + 1]};
    reply: Buffer.PupBuffer;
    recvdHeader: LONG POINTER TO Header;
    myPSB: PSB.PsbIndex = ProcessOperations.HandleToIndex[
      ProcessOperations.ReadPSB[]];
    acked: BOOLEAN;
    thisPktID: RPCPkt.PktID;
    header: LONG POINTER TO Header = @outPkt.header;
    pingPulses: System.Pulses ← minPingPulses;
    -- Stamp our own address.  EnqueueRecvd routes incoming packets by their
    -- destPSB, so the peer is expected to address its replies to myPSB.
    header.srceHost ← myHost;
    header.srceSoc ← RPCPrivate.rpcSocket;
    header.srcePSB ← myPSB;
    -- Choose the packet type and sequence number for what we send first.
    IF header.pktID.pktSeq = 0 -- first packet of a call; yucky interface! --
      THEN BEGIN
        header.type ← SELECT state FROM
          sending => [0, rpc, notEnd, pleaseAck, call],
          call => [0, rpc, end, dontAck, call],
          authReq => [0, rpc, end, dontAck, rfa],
          ENDCASE => --receiving, endCall-- ERROR;
        header.pktID.callSeq ← NewCallNumber[];
        header.pktID.pktSeq ← 1;
        acked ← FALSE;
        END
      ELSE BEGIN
        header.type ← SELECT state FROM
          sending => [0, rpc, notEnd, pleaseAck, data],
          receiving => [0, rpc, end, dontAck, ack],
          call => [0, rpc, end, dontAck, data],
          endCall => [0, rpc, end, dontAck, data],
          ENDCASE => --authReq-- ERROR;
        -- Data packets take the next sequence number and still need an ack.
        -- An ack packet (state=receiving) keeps the pktSeq already in the
        -- header, presumably that of the packet being acknowledged.
        IF state # receiving --header.type.class = data --
          THEN {header.pktID.pktSeq ← header.pktID.pktSeq + 1; acked ← FALSE}
          ELSE acked ← TRUE;
        END;
    -- Clear-text copy of our pktID; EncryptPkt below may alter the header.
    thisPktID ← header.pktID;
    -- From now on EnqueueRecvd delivers packets for myPSB to waiterPkts.
    SetWanting[myPSB];
    DO -- loop for pings --
      ENABLE UNWIND => {
        ClearWanting[myPSB]; IF outPktFrame # NIL THEN Frame.Free[outPktFrame]};
      BEGIN
      transmissions: CARDINAL ← 0;
      -- Initial retransmission delay grows with the hop count to the peer.
      retransmitPulses: System.Pulses ← [minRetransmitPulses
        + pulsesPerHop*PupDefs.GetHopsToNetwork[header.destHost.net]];
      IF outPkt.convHandle # MesaRPC.unencrypted
        THEN header.length ← RPCInternal.EncryptPkt[outPkt, length]
        ELSE header.length ← length + RPCPkt.pktLengthOverhead;
      header.oddByte ← no;
      DO -- loop for retransmissions --
        BEGIN
        GeneralSend[outPkt];
        sentTime ← System.GetClockPulses[];
        sent ← sent + 1;
        -- wait for response: an ack or the reply or a timeout --
        DO -- loop for each incoming packet (including garbage) --
          reply ← MyReceive[
            myPSB, sentTime, (IF acked THEN pingPulses ELSE retransmitPulses)];
          -- Timeout: ping if our packet is already acked; otherwise
          -- retransmit it, this time explicitly asking for an ack.
          IF reply = NIL THEN
            IF acked THEN GOTO ping
            ELSE {header.type.ack ← pleaseAck; GOTO retransmit};
          recvdHeader ← LOOPHOLE[@reply.pup.pupLength];
          -- NOTE(review): reply is not given back on this path, so ReplyToRFA
          -- presumably takes ownership of the buffer; confirm.
          IF recvdHeader.type.class = rfa THEN
            BEGIN
            IF RPCInternal.ReplyToRFA[
              reply, header, --encrypted-- thisPktID --clear-- , inPkt.convHandle]
              THEN NULL-- can't set "acked", because we must retransmit our data
              -- packet until we obtain the correct destPSB from some ack pkt --;
            LOOP
            END;
          -- Decrypt only packets of our conversation that come from our peer;
          -- for anything else just compute the data length.
          IF outPkt.convHandle # MesaRPC.unencrypted
            AND recvdHeader.conv = header.conv
            AND recvdHeader.srceHost = header.destHost
            THEN [, newLength] ← RPCInternal.DecryptPkt[
              recvdHeader, outPkt.convHandle]
            ELSE newLength ← recvdHeader.length - RPCPkt.pktLengthOverhead;
          -- NOTE(review): the srceHost test below is commented out, matching
          -- the "Similar patch on Recv" history entry.  This layout was
          -- reconstructed from a line-collapsed copy; confirm against master.
          IF recvdHeader.conv = header.conv
            -- PATCH AND recvdHeader.srceHost = header.destHost
            AND recvdHeader.pktID.activity = thisPktID.activity
            THEN -- pkt is related to our call --
            SELECT TRUE FROM
              recvdHeader.pktID.callSeq = thisPktID.callSeq => -- pkt is in our call --
                SELECT TRUE FROM
                  -- a) pkt has next sequence number to ours --
                  recvdHeader.pktID.pktSeq = 1 + thisPktID.pktSeq =>
                    BEGIN
                    IF state = sending OR state = endCall THEN
                      -- he's not allowed to generate that pktSeq! --
                      ERROR MesaRPC.CallFailed[
                        runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
                    SELECT recvdHeader.type.class FROM
                      data => GOTO done;
                      ack =>
                        BEGIN
                        -- This can happen if state=call and callee sent next data
                        -- pkt, but it was lost and now he is responding to our
                        -- retransmission or ping.  Soon, he will retransmit his
                        -- data pkt. --
                        acked ← TRUE;
                        GiveBackBuffer[reply];
                        END;
                      ENDCASE => --call,rfa--
                        ERROR MesaRPC.CallFailed[
                          runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
                    END;
                  -- b) pkt has same sequence number as ours --
                  recvdHeader.pktID.pktSeq = thisPktID.pktSeq =>
                    BEGIN
                    SELECT recvdHeader.type.class FROM
                      ack => -- acknowledgement of our packet --
                        BEGIN
                        -- An ack of a call packet tells us which process at
                        -- the other end is serving this call.
                        IF header.type.class = call THEN
                          header.destPSB ← recvdHeader.srcePSB;
                        acked ← TRUE;
                        IF state = sending OR state = endCall
                          THEN {GiveBackBuffer[reply]; reply ← NIL; GOTO done}
                          ELSE -- state = call, authReq, or receiving --
                            -- Even if "pleaseAck", we don't need to ack it,
                            -- because other end should send data pkt soon --
                            GiveBackBuffer[reply];
                        END;
                      data, call => -- retransmission of his packet --
                        -- In state receiving our outgoing packet is the ack,
                        -- so resend it if the peer asks for one.
                        IF state = receiving THEN
                          IF recvdHeader.type.ack = pleaseAck THEN
                            BEGIN
                            GiveBackBuffer[reply];
                            reply ← NIL;
                            GOTO retransmit -- we're already sending an ack --
                            END
                          ELSE GiveBackBuffer[reply]
                        ELSE ERROR MesaRPC.CallFailed[
                          runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
                      ENDCASE => --rfa --
                        ERROR MesaRPC.CallFailed[
                          runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
                    END;
                  -- c) pkt has earlier sequence number than ours --
                  recvdHeader.pktID.pktSeq < thisPktID.pktSeq =>
                    GiveBackBuffer[reply]; -- no need to ack it --
                  -- d) pkt has some future sequence number --
                  ENDCASE => ERROR MesaRPC.CallFailed[
                    runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
              recvdHeader.pktID.callSeq > thisPktID.callSeq AND state = endCall =>
                BEGIN
                IF recvdHeader.type.class # call THEN
                  ERROR MesaRPC.CallFailed[
                    runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
                -- acks our last packet, so we can handle the call --
                GOTO done
                END
              ENDCASE => -- wrong call: let someone else do it --
                {recvdHeader.destPSB ← PSB.PsbNull; EnqueueAgain[reply]}
            ELSE -- wrong conversation or activity: let someone else do it --
              {recvdHeader.destPSB ← PSB.PsbNull; EnqueueAgain[reply]};
          -- Incoming RFA packets don't reach here. --
          ENDLOOP --for each incoming packet-- ;
        EXITS
          retransmit =>
            BEGIN
            transmissions ← transmissions + 1;
            IF (transmissions = maxTransmissions AND signalTimeout)
              OR state = authReq THEN
              -- Don't retransmit RFA: caller will retransmit call pkt.
              -- Otherwise, if a spurious worker process occurred because of call pkt
              -- retransmission before response to RFA, then the spurious worker
              -- process sits needlessly retransmitting RFA's until it times out.
              -- NOTE(review): if the SIGNAL is resumed, the count restarts at
              -- zero but retransmitPulses keeps growing.
              {SIGNAL MesaRPC.CallFailed[timeout]; transmissions ← 0};
            retransmitted ← retransmitted + 1;
            retransmitPulses ← [retransmitPulses + minRetransmitPulses];
            END;
        END;
        ENDLOOP -- for each transmission -- ;
      EXITS
        ping =>
          BEGIN
          -- Turn the outgoing packet into a ping: an empty ack that asks for
          -- an ack, sent with the clear pktID.  The next ping interval doubles,
          -- capped at maxPingPulses.
          header.type ← [0, rpc, end, pleaseAck, ack];
          length ← 0;
          header.pktID ← thisPktID;
          acked ← FALSE;
          pingPulses ← [MIN[maxPingPulses, pingPulses*2]];
          END;
      END;
      -- only exit from loop is "GOTO done" or UNWIND --
      REPEAT
        done =>
          BEGIN
          -- This isn't covered by any UNWIND --
          ClearWanting[myPSB];
          -- NOTE(review): the previous DoSignal frame is freed here while outPkt
          -- may still point into it.  Nothing reads outPkt again before DoSignal
          -- reassigns it, because reply = NIL (below) occurs only for sending
          -- and endCall, and DoSignal only for call.
          IF outPktFrame # NIL THEN Frame.Free[outPktFrame];
          -- reply = NIL means our packet was acked and no data came back.
          IF reply = NIL
            THEN { --restore clear pktID-- header.pktID ← thisPktID; RETURN[FALSE, ]}
            ELSE
              BEGIN
              -- A remote signal: DoSignal runs the signal protocol and hands
              -- back its last resume packet, which the outer loop then sends.
              IF recvdHeader.outcome = signal AND state = call
                AND signalHandler # NIL
                THEN [outPkt, length, outPktFrame] ← RPCInternal.DoSignal[
                  reply, newLength, signalHandler, inPkt.convHandle]
                ELSE
                  BEGIN
                  IF newLength > maxlength THEN
                    ERROR MesaRPC.CallFailed[
                      runtimeProtocol ! UNWIND => GiveBackBuffer[reply]];
                  Inline.LongCOPY[
                    from: recvdHeader, to: @inPkt.header, nwords: recvdHeader.length];
                  GiveBackBuffer[reply];
                  RETURN[TRUE, newLength]
                  END;
              END
          END;
      ENDLOOP -- for each ping-- ;
    -- We get here only after coming back from a call of DoSignal --
    ENDLOOP -- for signal handlers -- ;
  -- we can't get here! --
  END;

-- Copies pkt's header and data (header.length words, which the caller must
-- already have set) into a Pup buffer and hands it to the Pup router.
-- The buffer is not returned here; presumably the router disposes of it.
GeneralSend: PROC [pkt: MesaRPCLupine.RPCPkt] =
  BEGIN
  b: Buffer.PupBuffer;
  -- Keep asking until the driver supplies a buffer.
  UNTIL (b ← Driver.GetInputBuffer[TRUE]) # NIL DO ENDLOOP;
  Inline.LongCOPY[
    from: @(pkt.header), to: @(b.pup.pupLength),
    nwords: ConcreteHeader[@pkt.header].length];
  PupDefs.PupRouterSendThis[b];
  END;

-- ******** monitor data for packet delivery ******** --

-- Packets waiting for an idle server, chained through "next" (pushed LIFO
-- by EnqueueRecvd, popped by IdleReceive).
idlerPkt: Buffer.PupBuffer ← NIL;
-- Idle servers WAIT here until EnqueueRecvd NOTIFYs.
-- NOTE(review): timeout 0 is presumably "no timeout"; confirm.
idlerCond: CONDITION ← [timeout: 0];
-- Processes in MyReceive WAIT here; the timeout wakes them periodically so
-- they can recheck their deadline.
waiterCond: CONDITION ← [timeout: Process.MsecToTicks[minRetransmitMsecs]];
-- One pending packet slot per process, indexed by PSB.
WaiterArray: TYPE = ARRAY PSB.PsbIndex OF Buffer.PupBuffer;
waiterPkts: LONG POINTER TO WaiterArray ← Heap.systemZone.NEW[
  WaiterArray ← ALL[NIL]];
wanting: PACKED ARRAY PSB.PsbIndex OF BOOLEAN ← ALL[FALSE]; -- PSB expects a pkt --

-- Marks myPSB as expecting packets, so EnqueueRecvd delivers to its slot.
SetWanting: ENTRY PROC [myPSB: PSB.PsbIndex] = INLINE {wanting[myPSB] ← TRUE};

-- Marks myPSB as no longer expecting packets and discards any packet that
-- was delivered to its slot but never consumed.
ClearWanting: PROC [myPSB: PSB.PsbIndex] = INLINE
  BEGIN
  spare: Buffer.PupBuffer;
  InnerClear: ENTRY PROC = INLINE
    BEGIN
    wanting[myPSB] ← FALSE;
    IF (spare ← waiterPkts[myPSB]) # NIL THEN waiterPkts[myPSB] ← NIL;
    END;
  InnerClear[];
  IF spare # NIL THEN GiveBackBuffer[spare] --ignore it, outside monitor-- ;
  END;

-- Waits for the next packet delivered to myPSB's slot and removes it.
MyReceive: ENTRY PROC [myPSB: PSB.PsbIndex, sentTime, waitTime: System.Pulses]
  RETURNS [recvd: Buffer.PupBuffer] = INLINE
  BEGIN
  -- Mesa idiom: an UNWIND catch phrase makes the monitor lock be released if
  -- this ENTRY proc is unwound (e.g. by ABORTED during the WAIT).
  ENABLE UNWIND => NULL;
  -- Returns NIL if no packet arrives within waitTime pulses after sentTime --
  -- The result name "recvd" shadows the module-level recvd counter here.
  DO
    IF (recvd ← waiterPkts[myPSB]) = NIL
      THEN IF System.GetClockPulses[] - sentTime < waitTime
        THEN WAIT waiterCond ELSE EXIT
      ELSE {waiterPkts[myPSB] ← NIL; EXIT};
    ENDLOOP;
  END;

RecvdPktTooLong: ERROR = CODE;
-- Raised by IdleReceive to make a surplus idle server process terminate.
KillServer: ERROR = CODE;

servers: CARDINAL ← 0;
serversMax: CARDINAL ← 20;
idlers: CARDINAL ← 0;
idlersMax: CARDINAL = 6;
idlersMin: CARDINAL = 2;
-- Number of server processes will never exceed "serversMax".
-- If number of idle servers exceeds "idlersMax", one will abort.
-- If number of idle servers drops below "idlersMin", one is forked.
-- Called by an idle server process to obtain the next packet that no waiting
-- process claimed (see EnqueueRecvd), and copy it into pkt.
--   maxlength: compared with the packet's header.length, i.e. the total
--     header plus data length in words.
-- Raises KillServer when idlersMax servers are already idle, and
-- RecvdPktTooLong if the packet does not fit.  Both are raised after the
-- monitor lock has been released, and the buffer is returned if we unwind.
IdleReceive: PUBLIC PROC [pkt: MesaRPCLupine.RPCPkt, maxlength: CARDINAL] =
  BEGIN
  b: Buffer.PupBuffer ← NIL; -- the dequeued packet, once InnerIdler has one --
  InnerIdler: ENTRY PROC = INLINE
    BEGIN
    -- Release the monitor lock if unwound, e.g. by ABORTED during the WAIT
    -- below (same idiom as MyReceive).
    ENABLE UNWIND => NULL;
    IF idlers >= idlersMax THEN {
      servers ← servers - 1; RETURN WITH ERROR KillServer[]};
    idlers ← idlers + 1;
    -- EnqueueRecvd decrements idlers when it queues a packet and notifies us.
    DO WAIT idlerCond; IF idlerPkt # NIL THEN EXIT ENDLOOP;
    -- Keep a minimum pool of idle servers while this one is busy.
    IF idlers < idlersMin AND servers < serversMax THEN {
      servers ← servers + 1; Process.Detach[FORK Server[]]};
    b ← idlerPkt;
    BEGIN
    recvdHeader: LONG POINTER TO Header = LOOPHOLE[@b.pup.pupLength];
    idlerPkt ← LOOPHOLE[idlerPkt.next, Buffer.PupBuffer];
    b.next ← NIL;
    -- RETURN WITH ERROR (as for KillServer) unlocks the monitor before the
    -- error propagates, so handlers never run inside the monitor.
    IF recvdHeader.length > maxlength THEN
      RETURN WITH ERROR RecvdPktTooLong[]; --NULL??--
    Inline.LongCOPY[
      from: @b.pup.pupLength, to: @pkt.header, nwords: recvdHeader.length];
    END;
    END;
  -- If we are unwound after a packet was dequeued (RecvdPktTooLong), give
  -- the buffer back instead of leaking it.
  InnerIdler[! UNWIND => IF b # NIL THEN GiveBackBuffer[b]];
  GiveBackBuffer[b]; --outside monitor--
  END;

-- NOTE(review): not raised anywhere in this module.
QueuesScrambled: ERROR = CODE;

GiveBackBuffer: PROC [b: Buffer.PupBuffer] = Buffer.ReturnBuffer;

-- Process body: receives RPC packets that reach the Pup socket level and
-- hands them to EnqueueRecvd; runs until ABORTED, then destroys its socket.
Listener: PROC =
  BEGIN
  -- Catch any packets that get as far as the Pup socket level --
  soc: PupDefs.PupSocket = PupDefs.PupSocketMake[
    local: RPCPrivate.rpcSocket, remote:, ticks: PupDefs.veryLongWait];
  DO
    ENABLE ABORTED => EXIT;
    b: Buffer.PupBuffer = soc.get[];
    -- get yields NIL when its timeout expires with no packet (presumed
    -- PupDefs semantics; confirm).  EnqueueRecvd would dereference NIL.
    IF b = NIL THEN LOOP;
    IF NOT AcceptPkt[b] THEN Buffer.ReturnBuffer[b];
    ENDLOOP;
  PupDefs.PupSocketDestroy[soc];
  END;

-- TRUE if the packet was taken over by EnqueueRecvd.
AcceptPkt: PROC [b: Buffer.PupBuffer] RETURNS [BOOLEAN] = {
  RETURN[EnqueueRecvd[b]]};

-- Re-dispatches a packet, returning the buffer if nobody takes it.
EnqueueAgain: PUBLIC PROC [b: Buffer.PupBuffer] =
  -- This is a procedure mainly for debugger breakpoints! --
  {IF NOT EnqueueRecvd[b] THEN Buffer.ReturnBuffer[b]};

-- Dispatches an incoming RPC packet.  Returns TRUE if the packet was taken:
-- given to the process that is waiting for it, or queued for an idle
-- server.  Returns FALSE if the caller must dispose of it: not an RPC
-- packet, no idle server, or the target process still has an undelivered
-- packet.
EnqueueRecvd: PUBLIC ENTRY PROC [b: Buffer.PupBuffer] RETURNS [BOOLEAN] =
  BEGIN
  -- despatch packet to appropriate RPC process, if any --
  -- Packet is known to be a Pup, and is addressed to our socket. --
  header: LONG POINTER TO Header = LOOPHOLE[@b.pup.pupLength];
  -- The destHost test is patched out (see the history at the top of the file).
  IF --header.destHost = myHost AND-- header.type.subType = rpc THEN
    BEGIN
    destPSB: PSB.PsbIndex = header.destPSB;
    recvd ← recvd + 1;
    IF destPSB NOT IN (PSB.PsbNull..LAST[PSB.PsbIndex]] OR NOT wanting[destPSB]
      THEN -- give to idle process to deal with --
        -- Pkts are dealt with LIFO by idlers, but it doesn't matter (much)
        BEGIN
        IF idlers = 0 THEN -- server too busy: throw it away! -- RETURN[FALSE]
        ELSE {b.next ← idlerPkt; idlerPkt ← b; idlers ← idlers - 1; NOTIFY idlerCond};
        END
      ELSE -- someone wants this packet: give them it --
        BEGIN
        IF waiterPkts[destPSB] # NIL THEN RETURN[FALSE];
        waiterPkts[destPSB] ← b;
        -- All waiters share one condition; each rechecks its own slot.
        BROADCAST waiterCond;
        END;
    RETURN[TRUE]
    END
  ELSE RETURN[FALSE];
  END;

-- The Listener process forked by Initialize.
listenerProcess: PROCESS;

-- Body of each forked server process; ends quietly on KillServer (raised by
-- IdleReceive, presumably called from ServerMain).
Server: PROC =
  BEGIN
  RPCInternal.ServerMain[ ! KillServer => CONTINUE];
  END;

-- ******** Initialization ******** --

-- Starts the Pup package, records our address, starts the other RPC modules,
-- and forks the first server process and the Listener.
Initialize: ENTRY PROC =
  BEGIN
  myAddr: PupTypes.PupAddress;
  PupDefs.PupPackageMake[];
  myAddr ← PupDefs.GetLocalPupAddress[RPCPrivate.rpcSocket, NIL];
  myHost ← [myAddr.net, myAddr.host];
  START RPCInternal.RPCBinding; -- exports "RPCInternal.exportTable" --
  START RPCInternal.RPCSecurity; -- exports "RPCInternal.firstConversation" --
  START RPCInternal.RPCPktStreams; -- initialize connection states --
  servers ← servers + 1;
  Process.Detach[FORK Server[]];
  listenerProcess ← FORK Listener[];
  END;

Initialize[];

END.