-- RPCPktIO: packet-level transport for RPC on top of Pup sockets.
-- This module provides:
--   PktExchange: exchange one packet with a remote host, with retransmission,
--     acknowledgements and pings;
--   IdleReceive: lets a server process wait for an incoming packet that is not
--     claimed by a process currently inside PktExchange;
--   Listener / EnqueueRecvd: take packets from the PupWKS.rpc socket and hand each
--     one either to the process named by its destPSB or to an idle server.
-- All shared state below is protected by the module monitor.

DIRECTORY
  BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses],
  Booting USING [RegisterProcs, RollbackProc],
  PrincOpsUtils USING [LongCopy, PsbHandleToIndex, ReadPSB],
  Process USING [Detach, MsecToTicks, priorityClient3, priorityForeground, SetPriority],
  PrincOps USING [PsbIndex, PsbNull],
  Pup USING [Address, Host, Net, Socket],
  PupBuffer USING [Buffer],
  PupHop USING [InitialTimeoutPulses],
  PupSocket USING [AllocBuffer, CreateServer, FreeBuffer, Get, GetMyAddress, Send, Socket, waitForever],
  PupSocketBackdoor USING [SetDirectReceive],
  PupWKS USING [rpc],
  RPC USING [CallFailed, unencrypted],
  RPCInternal USING [DecryptPkt, DoSignal, EncryptPkt, PktExchangeState, pktLengthOverhead, ReplyToRFA, RollbackRPCBinding, RollbackRPCPktStreams, RollbackRPCSecurity, ServerMain, StartRPCBinding, StartRPCPktStreams, StartRPCSecurity],
  RPCLupine USING [DataLength, Dispatcher, Header, RPCPkt],
  RPCPkt USING [CallCount, Header, Machine, PktID];

RPCPktIO: MONITOR
  IMPORTS BasicTime, Booting, PrincOpsUtils, Process, PupHop, PupSocket, PupSocketBackdoor, RPC, RPCInternal
  EXPORTS RPCInternal, RPCLupine
  SHARES RPCLupine = {

  -- Concrete representation of the packet header type exported as RPCLupine.Header.
  Header: PUBLIC TYPE = RPCPkt.Header;

  -- Converts a pointer to the abstract header into a pointer to the concrete header.
  ConcreteHeader: PROC [abstract: LONG POINTER TO RPCLupine.Header]
      RETURNS [LONG POINTER TO Header] = INLINE {
    RETURN [ abstract ];
    };

  callSequence: RPCPkt.CallCount _ 0; -- monotonic from this host --


  <<******** sending and receiving packets ********>>

  myHost: RPCPkt.Machine; -- set by Initialize from PupSocket.GetMyAddress

  -- Statistics counters: packets sent by PktExchange, rpc packets seen by
  -- EnqueueRecvd, and retransmissions performed by PktExchange.
  sent: CARDINAL _ 0;
  recvd: CARDINAL _ 0;
  retransmitted: CARDINAL _ 0;

  <<"PktExchange" implements a reliable packet exchange. There are five cases:>>
  -- (summary reconstructed from the code of PktExchange below)
  --   sending:   send a data packet asking for an ack; finished when the ack arrives
  --   receiving: send an ack; finished when the next data packet arrives
  --   call:      send a call or data packet; finished when the next data packet arrives
  --   endCall:   send a data packet; finished on an ack, or on a call packet with a
  --              larger call sequence number (=> start of new call)
  --   authReq:   send an rfa packet; every timeout is reported via RPC.CallFailed[timeout]

  minRetransmitMsecs: CARDINAL = 100; -- retransmit delay for local net --
  msecsPerHop: CARDINAL = 500; -- approximate typical gateway hop delay? --
  minPingMsecs: CARDINAL = 5000; -- initial interval between pings --
  maxPingSecs: CARDINAL = 300; -- long-term ping interval --
  maxTransmissions: CARDINAL _ 20; -- give up after too many transmissions --
  signalTimeout: BOOL _ TRUE; -- debugging switch --

  -- The same intervals expressed in clock pulses.
  minRetransmitPulses: BasicTime.Pulses =
    BasicTime.MicrosecondsToPulses[LONG[1000] * minRetransmitMsecs];
  pulsesPerHop: BasicTime.Pulses =
    BasicTime.MicrosecondsToPulses[LONG[1000] * msecsPerHop];
  minPingPulses: BasicTime.Pulses =
    BasicTime.MicrosecondsToPulses[LONG[1000] * minPingMsecs];
  maxPingPulses: BasicTime.Pulses =
    BasicTime.MicrosecondsToPulses[LONG[1000]*LONG[1000] * maxPingSecs];

  transmitLocalPkts: BOOL _ TRUE;

  shortWait: BasicTime.Pulses = -- 10 msecs --
    BasicTime.MicrosecondsToPulses[LONG[1000] * 10];
  longerWait: BasicTime.Pulses = -- 5 secs --
    BasicTime.MicrosecondsToPulses[LONG[1000]*LONG[1000] * 5];

  -- Monitor-protected state shared by the listener, PktExchange callers and idle servers.
  -- idlerPkt: list (linked through ovh.next) of packets handed to idle servers.
  -- idlerCond: idle servers wait here; EnqueueRecvd does one NOTIFY per queued packet.
  -- waiterCond: PktExchange callers wait here; it times out so MyReceive can recheck the clock.
  -- waiterPkts: one slot per process, holding a packet delivered for that process.
  -- wanting: TRUE for a process while it is inside PktExchange and accepting packets.
  idlerPkt: PupBuffer.Buffer _ NIL;
  idlerCond: CONDITION _ [timeout:0];
  waiterCond: CONDITION _ [timeout:Process.MsecToTicks[minRetransmitMsecs]];
  waiterPkts: REF WaiterArray = NEW[WaiterArray _ ALL[NIL]];
  WaiterArray: TYPE = ARRAY PrincOps.PsbIndex OF PupBuffer.Buffer;
  wanting: REF WantingArray = NEW[WantingArray _ ALL[FALSE]];
  WantingArray: TYPE = PACKED ARRAY PrincOps.PsbIndex OF BOOL;

  RecvdPktTooLong: ERROR = CODE; -- raised by IdleReceive
  KillServer: ERROR = CODE; -- raised by IdleReceive, caught in Server

  -- servers counts forked Server processes; idlers counts servers waiting in
  -- IdleReceive that have not yet been handed a packet.
  servers: NAT _ 0;
  serversMax: NAT _ 20;
  idlers: NAT _ 0;
  idlersMax: NAT = 6;
  idlersMin: NAT = 2;

  -- PktExchange: transmit the packet in inPkt and wait for the response that "state" calls for.
  --   inPkt: packet to send; when TRUE is returned it has been overwritten by the received packet.
  --   length: data length of the outgoing packet (the header overhead is added here).
  --   maxlength: largest acceptable data length of a received packet.
  --   state: which of the five exchange cases to perform (see above).
  --   signalHandler: if not NIL, state = call and the reply has outcome = signal, the reply is
  --     passed to RPCInternal.DoSignal and the exchange is repeated with the packet it returns.
  -- Returns [FALSE, len] when the exchange ended with an ack (no packet delivered) and
  -- [TRUE, len] when a packet was copied into inPkt.
  -- Raises RPC.CallFailed[runtimeProtocol] on protocol violations and, as a resumable
  -- SIGNAL, RPC.CallFailed[timeout] after too many transmissions.
  PktExchange: PUBLIC PROC [inPkt: RPCLupine.RPCPkt, length: RPCLupine.DataLength,
      maxlength: RPCLupine.DataLength, state: RPCInternal.PktExchangeState,
      signalHandler: RPCLupine.Dispatcher _ NIL]
      RETURNS [newPkt: BOOL, newLength: RPCLupine.DataLength] = {
    outPkt: RPCLupine.RPCPkt _ inPkt; -- altered after a signal --
    outPktFrame: POINTER _ NIL; -- DoSignal's local frame --
    newLen: NAT _ 0;
    DO -- loop for signal handlers --
      sentTime: BasicTime.Pulses; -- initialized after sending any packet --
      NewCallNumber: ENTRY PROC RETURNS [RPCPkt.CallCount] = INLINE {
        RETURN [ callSequence _ callSequence+1 ];
        };
      reply: PupBuffer.Buffer;
      recvdHeader: LONG POINTER TO Header;
      myPSB: PrincOps.PsbIndex = PrincOpsUtils.PsbHandleToIndex[PrincOpsUtils.ReadPSB[]];
      acked: BOOL;
      thisPktID: RPCPkt.PktID;
      header: LONG POINTER TO Header = @outPkt.header;
      pingPulses: BasicTime.Pulses _ minPingPulses;
      header.srceHost _ myHost;
      header.srceSoc _ PupWKS.rpc;
      header.srcePSB _ myPSB;
      IF header.pktID.pktSeq = 0
        THEN { -- first packet of a call; yucky interface!
          header.type _ SELECT state FROM
            sending => [0, rpc, notEnd, pleaseAck, call],
            call => [0, rpc, end, dontAck, call],
            authReq => [0, rpc, end, dontAck, rfa],
            ENDCASE => ERROR; --receiving, endCall
          header.pktID.callSeq _ NewCallNumber[];
          header.pktID.pktSeq _ 1;
          acked _ FALSE;
          }
        ELSE {
          header.type _ SELECT state FROM
            sending => [0, rpc, notEnd, pleaseAck, data],
            receiving => [0, rpc, end, dontAck, ack],
            call => [0, rpc, end, dontAck, data],
            endCall => [0, rpc, end, dontAck, data],
            ENDCASE => ERROR; --authReq
          IF state # receiving
            THEN { -- header.type.class = data
              header.pktID.pktSeq _ header.pktID.pktSeq+1;
              acked _ FALSE;
              }
            ELSE acked _ TRUE;
          };
      thisPktID _ header.pktID;
      SetWanting[myPSB];
      DO -- loop for pings --
        ENABLE UNWIND => {
          ClearWanting[myPSB];
          };
        {
        transmissions: CARDINAL _ 0;
        retransmitPulses: BasicTime.Pulses;
        retransmitPulses _ PupHop.InitialTimeoutPulses[header.destHost.net, minRetransmitPulses];
        IF retransmitPulses = 0 THEN retransmitPulses _ minRetransmitPulses; -- unreachable
        IF outPkt.convHandle # RPC.unencrypted
          THEN header.length _ RPCInternal.EncryptPkt[outPkt, length]
          ELSE header.length _ length + RPCInternal.pktLengthOverhead;
        header.oddByte _ no;
        DO -- loop for retransmissions --
          {
          GeneralSend[outPkt];
          sentTime _ BasicTime.GetClockPulses[];
          sent _ sent+1;
          DO -- loop for each incoming packet (including garbage) --
            reply _ MyReceive[
              myPSB, sentTime, (IF acked THEN pingPulses ELSE retransmitPulses)];
            -- Timed out: ping if our packet was acknowledged, otherwise retransmit asking for an ack.
            IF reply = NIL THEN
              IF acked
                THEN GOTO ping
                ELSE {
                  header.type.ack _ pleaseAck;
                  GOTO retransmit
                  };
            recvdHeader _ LOOPHOLE[@reply.byteLength];
            IF recvdHeader.type.class = rfa THEN {
              [] _ RPCInternal.ReplyToRFA[
                reply, header--encrypted--, thisPktID--clear--, inPkt.convHandle];
              -- NOTE(review): reply is not freed here; presumably ReplyToRFA disposes of it.
              LOOP;
              };
            IF outPkt.convHandle # RPC.unencrypted
              AND recvdHeader.conv = header.conv
              AND recvdHeader.srceHost = header.destHost
              THEN {
                ok: BOOL;
                [ok, newLen] _ RPCInternal.DecryptPkt[recvdHeader, outPkt.convHandle];
                IF ~ok AND recvdHeader.type.class#ack THEN
                  ERROR RPC.CallFailed[runtimeProtocol !
                    UNWIND => GiveBackBuffer[reply] ];
                }
              ELSE newLen _ recvdHeader.length - RPCInternal.pktLengthOverhead;
            IF recvdHeader.conv = header.conv
              AND recvdHeader.srceHost = header.destHost
              AND recvdHeader.pktID.activity = thisPktID.activity
              THEN -- pkt is related to our call
                SELECT TRUE FROM
                  recvdHeader.pktID.callSeq = thisPktID.callSeq => -- pkt is in our call
                    SELECT TRUE FROM
                      -- the packet following ours
                      recvdHeader.pktID.pktSeq = 1+thisPktID.pktSeq => {
                        IF state = sending OR state = endCall THEN
                          -- these states finish on an ack, so a next packet is a protocol error
                          ERROR RPC.CallFailed[runtimeProtocol !
                            UNWIND => GiveBackBuffer[reply] ];
                        SELECT recvdHeader.type.class FROM
                          data => GOTO done;
                          ack => {
                            -- treat our packet as acknowledged and keep waiting
                            acked _ TRUE;
                            GiveBackBuffer[reply];
                            };
                          ENDCASE => --call,rfa--
                            ERROR RPC.CallFailed[runtimeProtocol !
                              UNWIND => GiveBackBuffer[reply] ];
                        };
                      -- same sequence number as our packet
                      recvdHeader.pktID.pktSeq = thisPktID.pktSeq => {
                        SELECT recvdHeader.type.class FROM
                          ack => {
                            -- address later packets to the process that acknowledged our call packet
                            IF header.type.class = call THEN
                              header.destPSB _ recvdHeader.srcePSB;
                            acked _ TRUE;
                            IF state = sending OR state = endCall
                              THEN {GiveBackBuffer[reply]; reply_NIL; GOTO done}
                              ELSE -- state = call, authReq, or receiving --
                                -- keep waiting for the data packet
                                GiveBackBuffer[reply];
                            };
                          data, call => -- retransmission of his packet --
                            IF state = receiving
                              THEN
                                IF recvdHeader.type.ack = pleaseAck
                                  THEN {
                                    GiveBackBuffer[reply];
                                    reply _ NIL;
                                    GOTO retransmit -- we're already sending an ack --
                                    }
                                  ELSE GiveBackBuffer[reply]
                              ELSE
                                ERROR RPC.CallFailed[runtimeProtocol !
                                  UNWIND => GiveBackBuffer[reply] ];
                          ENDCASE => --rfa --
                            ERROR RPC.CallFailed[runtimeProtocol !
                              UNWIND => GiveBackBuffer[reply] ];
                        };
                      -- an earlier packet of this call
                      recvdHeader.pktID.pktSeq < thisPktID.pktSeq =>
                        GiveBackBuffer[reply]; -- no need to ack it --
                      -- a sequence number more than one ahead of ours
                      ENDCASE =>
                        ERROR RPC.CallFailed[runtimeProtocol !
                          UNWIND => GiveBackBuffer[reply] ];
                  recvdHeader.pktID.callSeq > thisPktID.callSeq AND state=endCall => {
                    IF recvdHeader.type.class # call THEN
                      ERROR RPC.CallFailed[runtimeProtocol !
                        UNWIND => GiveBackBuffer[reply] ];
                    -- a later call in the same activity ends this exchange; deliver its call packet
                    GOTO done
                    }
                  ENDCASE => {
                    -- not part of this exchange: with destPSB = PsbNull, EnqueueRecvd
                    -- offers the packet to an idle server instead of to us
                    recvdHeader.destPSB _ PrincOps.PsbNull;
                    EnqueueAgain[reply];
                    }
              ELSE {
                -- unrelated conversation, host or activity: requeue for an idle server
                recvdHeader.destPSB _ PrincOps.PsbNull;
                EnqueueAgain[reply];
                };
            ENDLOOP--for each incoming packet--;
          EXITS retransmit => {
            transmissions _ transmissions+1;
            -- For authReq every timeout is reported. If the SIGNAL is resumed,
            -- counting starts over and transmission continues.
            IF (transmissions = maxTransmissions AND signalTimeout)
              OR state = authReq
              THEN { SIGNAL RPC.CallFailed[timeout]; transmissions _ 0 };
            retransmitted _ retransmitted+1;
            retransmitPulses _ retransmitPulses + minRetransmitPulses;
            };
          };
          ENDLOOP-- for each transmission --;
        EXITS ping => {
          -- Nothing heard since the ack: send an empty ack packet that requests an ack,
          -- and double the ping interval up to maxPingPulses.
          header.type _ [0, rpc, end, pleaseAck, ack];
          length _ 0;
          header.pktID _ thisPktID;
          acked _ FALSE;
          pingPulses _ MIN[maxPingPulses, pingPulses * 2];
          };
        };
        REPEAT done => {
          -- stop accepting packets; ClearWanting also frees any packet already queued for us
          ClearWanting[myPSB];
          IF reply = NIL
            THEN { --restore clear pktID--
              header.pktID _ thisPktID;
              RETURN [FALSE, newLen]
              }
            ELSE {
              IF recvdHeader.outcome = signal AND state = call AND signalHandler # NIL
                THEN
                  -- falls out of the ping loop and goes round the signal-handler loop again
                  [outPkt, length, outPktFrame] _
                    RPCInternal.DoSignal[reply, newLen, signalHandler, inPkt.convHandle]
                ELSE {
                  IF newLen > maxlength THEN
                    ERROR RPC.CallFailed[runtimeProtocol !
                      UNWIND => GiveBackBuffer[reply]];
                  PrincOpsUtils.LongCopy[
                    from: recvdHeader, to: @inPkt.header, nwords: recvdHeader.length];
                  GiveBackBuffer[reply];
                  RETURN [TRUE, newLen]
                  };
              }
          };
        ENDLOOP-- for each ping--;
      ENDLOOP-- for signal handlers --;
    };

  -- Copies pkt (header.length words starting at its header) into a freshly allocated
  -- Pup buffer and sends it to the destination found in the copied header.
  GeneralSend: PROC [pkt: RPCLupine.RPCPkt] = {
    b: PupBuffer.Buffer = PupSocket.AllocBuffer[socket];
    PrincOpsUtils.LongCopy[from: @(pkt.header), to: @(b.byteLength),
      nwords: ConcreteHeader[@pkt.header].length];
    PupSocket.Send[socket, b, b.dest];
    };

  -- Marks process myPSB as accepting packets addressed to it.
  SetWanting: ENTRY PROC [myPSB: PrincOps.PsbIndex] = INLINE {
    wanting[myPSB] _ TRUE;
    };

  -- Marks process myPSB as no longer accepting packets, and frees any packet
  -- that was delivered to its slot but never consumed.
  ClearWanting: PROC [myPSB: PrincOps.PsbIndex] = INLINE {
    spare: PupBuffer.Buffer;
    InnerClear: ENTRY PROC = INLINE {
      wanting[myPSB] _ FALSE;
      IF (spare _ waiterPkts[myPSB]) # NIL THEN waiterPkts[myPSB] _ NIL;
      };
    InnerClear[];
    IF spare # NIL THEN GiveBackBuffer[spare] --ignore it, outside monitor--;
    };

  -- Waits for a packet to appear in waiterPkts[myPSB] and removes it.
  -- Returns NIL once waitTime pulses have elapsed since sentTime without a packet.
  MyReceive: ENTRY PROC [myPSB: PrincOps.PsbIndex, sentTime, waitTime: BasicTime.Pulses]
      RETURNS [recvd: PupBuffer.Buffer] = INLINE {
    -- NOTE(review): the UNWIND catch is presumably here so that the monitor lock is
    -- released if the WAIT is unwound; confirm against the Mesa manual.
    ENABLE UNWIND => NULL;
    DO
      IF (recvd _ waiterPkts[myPSB]) = NIL
        THEN
          IF BasicTime.GetClockPulses[] - sentTime < waitTime
            THEN WAIT waiterCond
            ELSE EXIT
        ELSE {
          waiterPkts[myPSB] _ NIL;
          EXIT
          };
      ENDLOOP;
    };

  -- IdleReceive: called by a server process to wait for the next packet that
  -- EnqueueRecvd hands to an idle server. The packet is copied into pkt.
  --   Raises KillServer (monitor released) if idlersMax servers are already idle.
  --   Raises RecvdPktTooLong if the header length of the packet exceeds maxlength;
  --   in that case nothing is copied into pkt.
  -- May fork one more Server when fewer than idlersMin servers remain idle.
  IdleReceive: PUBLIC PROC [pkt: RPCLupine.RPCPkt, maxlength: CARDINAL] = {
    b: PupBuffer.Buffer;
    tooLong: BOOL _ FALSE; -- set inside the monitor, acted on after it is released
    InnerIdler: ENTRY PROC = {
      IF idlers >= idlersMax THEN {
        servers _ servers-1;
        RETURN WITH ERROR KillServer[]
        };
      idlers _ idlers + 1;
      DO
        WAIT idlerCond;
        IF idlerPkt # NIL THEN EXIT;
        ENDLOOP;
      -- EnqueueRecvd already decremented idlers when it queued the packet.
      IF idlers < idlersMin AND servers < serversMax THEN {
        servers_servers+1;
        Process.Detach[FORK Server[]];
        };
      b _ idlerPkt;
      {
      recvdHeader: LONG POINTER TO Header = LOOPHOLE[@b.byteLength];
      idlerPkt _ NARROW[idlerPkt.ovh.next];
      b.ovh.next _ NIL;
      -- Do not raise the ERROR here: this entry procedure has no UNWIND catch, so
      -- unwinding from inside it would leave the monitor locked and b unfreed.
      IF recvdHeader.length > maxlength
        THEN tooLong _ TRUE
        ELSE PrincOpsUtils.LongCopy[
          from: @b.byteLength, to: @pkt.header, nwords: recvdHeader.length];
      };
      };
    InnerIdler[];
    GiveBackBuffer[b];--outside monitor--
    IF tooLong THEN ERROR RecvdPktTooLong[];
    };

  QueuesScrambled: ERROR = CODE;

  -- EnqueueRecvd: dispatches one received buffer. Returns NIL if the buffer was queued
  -- (ownership passes to the receiver), or b itself if it was not accepted and the caller
  -- must dispose of it. This procedure is also registered as the direct-receive
  -- procedure of the socket in Listener. The socket and foo arguments are not used here.
  EnqueueRecvd: ENTRY SAFE PROC [socket: PupSocket.Socket, b: PupBuffer.Buffer, foo: REF ANY]
      RETURNS [PupBuffer.Buffer] = TRUSTED { -- SAFE/TRUSTED For PupSocketBackdoor
    header: LONG POINTER TO Header = LOOPHOLE[@b.byteLength];
    IF header.type.subType = rpc
      THEN {
        destPSB: PrincOps.PsbIndex = header.destPSB;
        recvd _ recvd+1;
        IF ~(destPSB IN (PrincOps.PsbNull..LAST[PrincOps.PsbIndex]]) OR ~wanting[destPSB]
          THEN {
            -- No process is waiting for this packet: give it to an idle server, if any.
            IF idlers = 0 THEN RETURN[b]; -- too busy, drop the packet
            b.ovh.next _ idlerPkt;
            idlerPkt _ b;
            idlers _ idlers-1;
            NOTIFY idlerCond;
            }
          ELSE {
            -- The addressed process is in PktExchange: fill its slot unless already occupied.
            IF waiterPkts[destPSB] # NIL THEN RETURN [b];
            waiterPkts[destPSB] _ b;
            BROADCAST waiterCond;
            };
        RETURN[NIL]
        }
      ELSE RETURN[b];
    };

  -- Offers b to EnqueueRecvd again and frees it if it is not accepted.
  EnqueueAgain: PUBLIC PROC [b: PupBuffer.Buffer] = {
    b _ EnqueueRecvd[socket, b, NIL];
    IF b # NIL THEN GiveBackBuffer[b];
    };

  socket: PupSocket.Socket; -- assigned by Listener

  -- Body of the listener process: creates the server socket on PupWKS.rpc, registers
  -- EnqueueRecvd as its direct-receive procedure, then loops forever dispatching
  -- whatever PupSocket.Get returns.
  Listener: PROC = {
    Process.SetPriority[Process.priorityClient3]; -- Same as Drivers
    socket _ PupSocket.CreateServer[
      local: PupWKS.rpc,
      sendBuffers: 10,
      recvBuffers: 10,
      getTimeout: PupSocket.waitForever];
    PupSocketBackdoor.SetDirectReceive[socket, EnqueueRecvd, NIL];
    DO
      b: PupBuffer.Buffer _ PupSocket.Get[socket];
      b _ EnqueueRecvd[socket, b, NIL];
      IF b # NIL THEN GiveBackBuffer[b];
      ENDLOOP;
    };

  -- Allocates a buffer from the RPC socket.
  AllocBuffer: PUBLIC PROC RETURNS [b: PupBuffer.Buffer] = {
    RETURN[PupSocket.AllocBuffer[socket]];
    };

  -- Sends b on the RPC socket to the destination recorded in b.dest.
  SendBuffer: PUBLIC PROC [b: PupBuffer.Buffer] = {
    PupSocket.Send[socket, b, b.dest];
    };

  -- Returns b to the Pup buffer pool.
  GiveBackBuffer: PUBLIC PROC [b: PupBuffer.Buffer] = {
    PupSocket.FreeBuffer[b];
    };

  listenerProcess: PROCESS;

  -- Body of a server process: runs RPCInternal.ServerMain until KillServer is raised.
  Server: PROC = {
    Process.SetPriority[Process.priorityForeground];
    RPCInternal.ServerMain[ !
      KillServer => CONTINUE ];
    };


  <<******** Initialization ********>>

  -- Records this host's address, starts the other RPC components, registers the
  -- rollback procedure, and forks the first server and the listener.
  -- NOTE(review): socket is assigned only once Listener runs, so procedures that
  -- use socket rely on the listener having started first.
  Initialize: ENTRY PROC = {
    myAddr: Pup.Address _ PupSocket.GetMyAddress[];
    myHost _ [myAddr.net, myAddr.host];
    RPCInternal.StartRPCBinding[myHost]; -- exports "RPCInternal.exportTable"
    RPCInternal.StartRPCSecurity[myHost]; -- exports "RPCInternal.firstConversation"
    RPCInternal.StartRPCPktStreams[myHost]; -- initialize connection states
    Booting.RegisterProcs[r: Rollback];
    servers _ servers+1;
    Process.Detach[FORK Server[]];
    listenerProcess _ FORK Listener[];
    };

  -- Rollback procedure registered with Booting: resets the state of the other RPC
  -- components while holding the monitor.
  Rollback: Booting.RollbackProc = TRUSTED {
    InnerRollback: ENTRY PROC = TRUSTED {
      RPCInternal.RollbackRPCPktStreams[]; -- forget connection counts
      RPCInternal.RollbackRPCSecurity[]; -- invalidate local conversations
      RPCInternal.RollbackRPCBinding[]; -- nothing?
      };
    InnerRollback[];
    };

  Initialize[];

  }.