<> <> <> <> <> <> DIRECTORY BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses], Booting USING [CheckpointProc, RegisterProcs, RollbackProc], BufferDefs USING [PupBuffer], CommUtilDefs USING [AllocateIocb], DriverDefs USING [ChangeNumberOfInputBuffers], DriverTypes USING [Encapsulation, ethernetEncapsulationBytes, ethernetEncapsulationOffset], EthernetOneFace USING [ControlBlock, ControlBlockRecord, controlBlockSize, DeviceHandle, GetNextDevice, GetStatus, hearSelf, nullDeviceHandle, QueueOutput], PrincOpsUtils USING [IsBound, LongCopy, PsbHandleToIndex, ReadPSB], Process USING [Detach, MsecToTicks, Pause, Yield], PrincOps USING [PsbIndex, PsbNull], PupDefs USING [AnyLocalPupAddress, GetFreePupBuffer, GetHopsToNetwork, PupAddress, PupPackageMake, PupRouterSendThis, PupSocket, PupSocketDestroy, PupSocketMake, ReturnFreePupBuffer, veryLongWait], PupRouterDefs USING [maxHop], PupTypes USING [PupAddress, PupHostID, PupNetID, PupSocketID], RPC USING [CallFailed, unencrypted], RPCInternal USING [DecryptPkt, DoSignal, EncryptPkt, ReplyToRFA, RPCBinding, RPCPktStreams, RPCSecurity, ServerMain], RPCLupine USING [DataLength, Dispatcher, Header, RPCPkt], RPCPkt USING [CallCount, Header, Machine, PktExchangeState, PktID, pktLengthOverhead], RPCPrivate USING [GetRPCPackets, rpcSocket, ReturnBuffer], RPCWatch USING []; RPCPktIO: MONITOR IMPORTS BasicTime, Booting, CommUtilDefs, DriverDefs, EthernetOneFace, PrincOpsUtils, Process, PupDefs, RPC, RPCInternal, RPCPrivate EXPORTS RPCLupine--Encapsulation,Header--, RPCPkt--PktExchange,IdleReceive--, RPCWatch--SetSpyProc-- SHARES BufferDefs, RPCLupine = { Encapsulation: PUBLIC TYPE = DriverTypes.Encapsulation; Header: PUBLIC TYPE = RPCPkt.Header; ConcreteHeader: PROC [abstract: LONG POINTER TO RPCLupine.Header] RETURNS [LONG POINTER TO Header] = INLINE { RETURN [ abstract ]; }; callSequence: RPCPkt.CallCount _ 0; -- monotonic from this host -- <<******** IOCB management ********>> <> <> <> IOCBrecord: TYPE = RECORD[next: IOCB]; IOCB: TYPE = LONG POINTER TO IOCBrecord; iocbSize: CARDINAL = MAX[EthernetOneFace.controlBlockSize, SIZE[IOCBrecord]]; freeIOCBs: IOCB _ NIL; <<******** sending and receiving packets ********>> firstDevice: EthernetOneFace.DeviceHandle = EthernetOneFace.GetNextDevice[EthernetOneFace.nullDeviceHandle]; myAddr: PupTypes.PupAddress; myNet: PupTypes.PupNetID; myDeviceHost: PupTypes.PupHostID; myHost: RPCPkt.Machine; sent: CARDINAL _ 0; recvd: CARDINAL _ 0; retransmitted: CARDINAL _ 0; <<"PktExchange" implements a reliable packet exchange. There are five cases:>> <> <> <> < start of new call)>> <> <> minRetransmitMsecs: CARDINAL = 100; -- retransmit delay for local net -- msecsPerHop: CARDINAL = 500; -- approximate typical gateway hop delay? -- minPingMsecs: CARDINAL = 5000; -- initial interval between pings -- maxPingSecs: CARDINAL = 300; -- long-term ping interval -- maxTransmissions: CARDINAL _ 20; -- give up after too many transmissions -- signalTimeout: BOOL _ TRUE; -- debugging switch -- <> minRetransmitPulses: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[LONG[1000] * minRetransmitMsecs]; pulsesPerHop: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[LONG[1000] * msecsPerHop]; minPingPulses: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[LONG[1000] * minPingMsecs]; maxPingPulses: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[LONG[1000]*LONG[1000] * maxPingSecs]; transmitLocalPkts: BOOL _ TRUE; <> shortWait: BasicTime.Pulses = -- 10 msecs -- BasicTime.MicrosecondsToPulses[LONG[1000] * 10]; longerWait: BasicTime.Pulses = -- 5 secs -- BasicTime.MicrosecondsToPulses[LONG[1000]*LONG[1000] * 5]; <> idlerPkt: BufferDefs.PupBuffer _ NIL; idlerCond: CONDITION _ [timeout:0]; waiterCond: CONDITION _ [timeout:Process.MsecToTicks[minRetransmitMsecs]]; waiterPkts: REF WaiterArray = NEW[WaiterArray _ ALL[NIL]]; WaiterArray: TYPE = ARRAY PrincOps.PsbIndex OF BufferDefs.PupBuffer; wanting: REF WantingArray = NEW[WantingArray _ ALL[FALSE]]; WantingArray: TYPE = PACKED ARRAY PrincOps.PsbIndex OF BOOL; <> <> <<>> RecvdPktTooLong: ERROR = CODE; KillServer: ERROR = CODE; servers: CARDINAL _ 0; serversMax: CARDINAL _ 20; idlers: CARDINAL _ 0; idlersMax: NAT _ 4; idlersMin: NAT _ 2; PktExchange: PUBLIC PROC [inPkt: RPCLupine.RPCPkt, length: RPCLupine.DataLength, maxlength: RPCLupine.DataLength, state: RPCPkt.PktExchangeState, signalHandler: RPCLupine.Dispatcher _ NIL] RETURNS [newPkt: BOOL, newLength: RPCLupine.DataLength] = { <> <> outPkt: RPCLupine.RPCPkt _ inPkt; -- altered after a signal -- outPktFrame: POINTER _ NIL; -- DoSignal's local frame -- newLen: NAT _ 0; <> DO -- loop for signal handlers -- sentTime: BasicTime.Pulses; -- initialized after sending any packet -- iocb: IOCB _ NIL; <> GetIOCB: ENTRY PROC = INLINE { IF freeIOCBs # NIL THEN { iocb _ freeIOCBs; freeIOCBs _ freeIOCBs.next } ELSE { iocb _ CommUtilDefs.AllocateIocb[iocbSize] } }; ControlBlock: PROC RETURNS [ EthernetOneFace.ControlBlock ] = INLINE { RETURN [LOOPHOLE[iocb]]; }; ReturnIOCB: PROC = INLINE { InnerReturn: ENTRY PROC = INLINE { iocb.next _ freeIOCBs; freeIOCBs _ iocb; }; IF iocb # NIL THEN { WaitUntilSent[]; InnerReturn[] }; }; WaitUntilSent: PROC = INLINE { <> <<(a) a suitable response coming in before we've finished retransmitting our packet;>> <<(b) the standard EthernetOneDriver resetting the microcode (so it forgets our request).>> <> WHILE EthernetOneFace.GetStatus[ControlBlock[]] = pending DO IF BasicTime.GetClockPulses[] - sentTime < shortWait THEN Process.Yield[] ELSE IF BasicTime.GetClockPulses[] - sentTime < longerWait THEN Process.Pause[1] ELSE EXIT -- assume someone reset the microcode --; ENDLOOP; }; NewCallNumber: ENTRY PROC RETURNS [RPCPkt.CallCount] = INLINE { RETURN [ callSequence _ callSequence+1 ]; }; reply: BufferDefs.PupBuffer; recvdHeader: LONG POINTER TO Header; myPSB: PrincOps.PsbIndex = PrincOpsUtils.PsbHandleToIndex[PrincOpsUtils.ReadPSB[]]; acked: BOOL; thisPktID: RPCPkt.PktID; header: LONG POINTER TO Header = @outPkt.header; localHost: BOOL = header.destHost = myHost; localNet: BOOL = header.destHost.net = myNet; pingPulses: BasicTime.Pulses _ minPingPulses; header.srceHost _ myHost; header.srceSoc _ RPCPrivate.rpcSocket; header.srcePSB _ myPSB; IF header.pktID.pktSeq = 0 -- first packet of a call; yucky interface! -- THEN { header.type _ SELECT state FROM sending => [0, rpc, notEnd, pleaseAck, call], call => [0, rpc, end, dontAck, call], authReq => [0, rpc, end, dontAck, rfa], ENDCASE => --receiving, endCall-- ERROR; header.pktID.callSeq _ NewCallNumber[]; header.pktID.pktSeq _ 1; acked _ FALSE; } ELSE { header.type _ SELECT state FROM sending => [0, rpc, notEnd, pleaseAck, data], receiving => [0, rpc, end, dontAck, ack], call => [0, rpc, end, dontAck, data], endCall => [0, rpc, end, dontAck, data], ENDCASE => --authReq-- ERROR; IF state # receiving --header.type.class = data -- THEN { header.pktID.pktSeq _ header.pktID.pktSeq+1; acked_FALSE } ELSE acked _ TRUE; }; thisPktID _ header.pktID; SetWanting[myPSB]; DO -- loop for pings -- ENABLE UNWIND => { ClearWanting[myPSB]; ReturnIOCB[]; <> }; { foo: CARDINAL _ 0; transmissions: CARDINAL _ 0; hops: CARDINAL _ IF localNet THEN 0 ELSE PupDefs.GetHopsToNetwork[header.destHost.net]; retransmitPulses: BasicTime.Pulses _ minRetransmitPulses; <> IF hops IN [1..PupRouterDefs.maxHop] THEN retransmitPulses _ minRetransmitPulses + pulsesPerHop * hops; IF outPkt.convHandle # RPC.unencrypted THEN header.length _ RPCInternal.EncryptPkt[outPkt, length] ELSE header.length _ length + RPCPkt.pktLengthOverhead; header.oddByte _ no; DO -- loop for retransmissions -- { IF NOT localNet OR firstDevice = EthernetOneFace.nullDeviceHandle THEN GeneralSend[outPkt] ELSE { IF localHost AND NOT( EthernetOneFace.hearSelf AND transmitLocalPkts ) THEN LocalSend[outPkt]; IF NOT localHost OR transmitLocalPkts THEN { IF iocb = NIL THEN GetIOCB[] ELSE WaitUntilSent[]; -- Pup checksum -- outPkt.data[header.length-RPCPkt.pktLengthOverhead] _ 177777B; outPkt.encapsulation _ Encapsulation[ethernetOne[,,,,,, header.destHost.host, myDeviceHost, pup]]; EthernetOneFace.QueueOutput[firstDevice, @outPkt.encapsulation + DriverTypes.ethernetEncapsulationOffset, header.length + (1+DriverTypes.ethernetEncapsulationBytes)/2, ControlBlock[]] }; }; sentTime _ BasicTime.GetClockPulses[]; sent _ sent+1; <> DO -- loop for each incoming packet (including garbage) -- reply _ MyReceive[ myPSB, sentTime, (IF acked THEN pingPulses ELSE retransmitPulses)]; IF reply = NIL THEN IF acked THEN GOTO ping ELSE { header.type.ack _ pleaseAck; GOTO retransmit }; recvdHeader _ LOOPHOLE[@reply.pupLength]; IF recvdHeader.type.class = rfa THEN { [] _ RPCInternal.ReplyToRFA[ reply, header--encrypted--, thisPktID--clear--, inPkt.convHandle]; <> LOOP; }; IF outPkt.convHandle # RPC.unencrypted AND recvdHeader.conv = header.conv AND recvdHeader.srceHost = header.destHost THEN { ok: BOOL; [ok, newLen] _ RPCInternal.DecryptPkt[recvdHeader, outPkt.convHandle]; IF ~ok AND recvdHeader.type.class#ack THEN ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply] ]; } ELSE newLen _ recvdHeader.length - RPCPkt.pktLengthOverhead; IF recvdHeader.conv = header.conv AND recvdHeader.srceHost = header.destHost AND recvdHeader.pktID.activity = thisPktID.activity THEN -- pkt is related to our call -- SELECT TRUE FROM recvdHeader.pktID.callSeq = thisPktID.callSeq => <> SELECT TRUE FROM <> recvdHeader.pktID.pktSeq = 1+thisPktID.pktSeq => { IF state = sending OR state = endCall THEN <> ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply] ]; SELECT recvdHeader.type.class FROM data => GOTO done; ack => { <> acked _ TRUE; GiveBackBuffer[reply]; }; ENDCASE => --call,rfa-- ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply] ]; }; <> recvdHeader.pktID.pktSeq = thisPktID.pktSeq => { SELECT recvdHeader.type.class FROM ack => { <> IF header.type.class = call THEN header.destPSB _ recvdHeader.srcePSB; acked _ TRUE; IF state = sending OR state = endCall THEN {GiveBackBuffer[reply]; reply_NIL; GOTO done} ELSE -- state = call, authReq, or receiving -- <> <> GiveBackBuffer[reply]; }; data, call => -- retransmisson of his packet -- IF state = receiving THEN IF recvdHeader.type.ack = pleaseAck THEN { GiveBackBuffer[reply]; reply _ NIL; GOTO retransmit -- we're already sending an ack -- } ELSE GiveBackBuffer[reply] ELSE ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply] ]; ENDCASE => --rfa -- ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply] ]; }; <> recvdHeader.pktID.pktSeq < thisPktID.pktSeq => GiveBackBuffer[reply]; -- no need to ack it -- <> ENDCASE => ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply] ]; recvdHeader.pktID.callSeq > thisPktID.callSeq AND state=endCall => { IF recvdHeader.type.class # call THEN ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply] ]; <> GOTO done } ENDCASE => { <> recvdHeader.destPSB _ PrincOps.PsbNull; EnqueueAgain[reply]; } ELSE { <> recvdHeader.destPSB _ PrincOps.PsbNull; EnqueueAgain[reply]; }; <> ENDLOOP--for each incoming packet--; EXITS retransmit => { transmissions _ transmissions+1; IF (transmissions = maxTransmissions AND signalTimeout) OR state = authReq <> THEN { SIGNAL RPC.CallFailed[timeout]; transmissions _ 0 }; retransmitted _ retransmitted+1; retransmitPulses _ retransmitPulses + minRetransmitPulses; }; }; ENDLOOP-- for each transmission --; EXITS ping => { header.type _ [0, rpc, end, pleaseAck, ack]; length _ 0; header.pktID _ thisPktID; acked _ FALSE; pingPulses _ MIN[maxPingPulses, pingPulses * 2]; }; }; <> REPEAT done => { <> ClearWanting[myPSB]; ReturnIOCB[]; <> IF reply = NIL THEN { --restore clear pktID-- header.pktID _ thisPktID; RETURN [FALSE, newLen] } ELSE { IF recvdHeader.outcome = signal AND state = call AND signalHandler # NIL THEN [outPkt, length, outPktFrame] _ RPCInternal.DoSignal[reply, newLen, signalHandler, inPkt.convHandle] ELSE { IF newLen > maxlength THEN ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply]]; PrincOpsUtils.LongCopy[ from: recvdHeader, to: @inPkt.header, nwords: recvdHeader.length]; GiveBackBuffer[reply]; RETURN [TRUE, newLen] }; } }; ENDLOOP-- for each ping--; <> ENDLOOP-- for signal handlers --; <> }; GeneralSend: PROC [pkt: RPCLupine.RPCPkt] = { b: BufferDefs.PupBuffer = PupDefs.GetFreePupBuffer[]; PrincOpsUtils.LongCopy[from: @(pkt.header), to: @(b.pupLength), nwords: ConcreteHeader[@pkt.header].length]; PupDefs.PupRouterSendThis[b]; }; LocalSend: PROC [pkt: RPCLupine.RPCPkt] = { <> b: BufferDefs.PupBuffer = PupDefs.GetFreePupBuffer[]; PrincOpsUtils.LongCopy[ from: @(pkt.header), to: @(b.pupLength), nwords: ConcreteHeader[@pkt.header].length]; IF NOT AcceptPkt[b] THEN PupDefs.ReturnFreePupBuffer[b]; }; SetWanting: ENTRY PROC [myPSB: PrincOps.PsbIndex] = INLINE { wanting[myPSB] _ TRUE; }; ClearWanting: PROC [myPSB: PrincOps.PsbIndex] = INLINE { spare: BufferDefs.PupBuffer; InnerClear: ENTRY PROC = INLINE { wanting[myPSB] _ FALSE; IF (spare _ waiterPkts[myPSB]) # NIL THEN waiterPkts[myPSB] _ NIL; }; InnerClear[]; IF spare # NIL THEN GiveBackBuffer[spare] --ignore it, outside monitor--; }; MyReceive: ENTRY PROC [myPSB: PrincOps.PsbIndex, sentTime, waitTime: BasicTime.Pulses] RETURNS [recvd: BufferDefs.PupBuffer] = INLINE { ENABLE UNWIND => NULL; <> DO IF (recvd _ waiterPkts[myPSB]) = NIL THEN IF BasicTime.GetClockPulses[] - sentTime < waitTime THEN WAIT waiterCond ELSE EXIT ELSE { waiterPkts[myPSB] _ NIL; EXIT }; ENDLOOP; }; IdleReceive: PUBLIC PROC [pkt: RPCLupine.RPCPkt, maxlength: CARDINAL] = { b: BufferDefs.PupBuffer; InnerIdler: ENTRY PROC = { IF idlers >= idlersMax THEN { servers_servers-1; RETURN WITH ERROR KillServer[] }; idlers _ idlers + 1; DO WAIT idlerCond; IF idlerPkt # NIL THEN EXIT ENDLOOP; IF idlers < idlersMin AND servers < serversMax THEN { servers_servers+1; Process.Detach[FORK Server[]]; }; b _ idlerPkt; { recvdHeader: LONG POINTER TO Header = LOOPHOLE[@b.pupLength]; idlerPkt _ LOOPHOLE[idlerPkt.next, BufferDefs.PupBuffer]; b.next _ NIL; IF recvdHeader.length > maxlength THEN ERROR RecvdPktTooLong[] --NULL??-- ELSE PrincOpsUtils.LongCopy[ from: @b.pupLength, to: @pkt.header, nwords: recvdHeader.length]; }; }; InnerIdler[]; GiveBackBuffer[b];--outside monitor-- }; QueuesScrambled: ERROR = CODE; spyProc: PROC [BufferDefs.PupBuffer] _ NIL; SetSpyProc: PUBLIC ENTRY SAFE PROC [p: PROC [BufferDefs.PupBuffer]] = CHECKED { spyProc _ p; }; EnqueueRecvd: ENTRY PROC [b: BufferDefs.PupBuffer, report: BOOL] RETURNS [BOOL] = { <> header: LONG POINTER TO Header = LOOPHOLE[@b.pupLength]; IF report AND spyProc # NIL THEN spyProc[b]; IF header.destHost = myHost AND header.type.subType = rpc THEN { destPSB: PrincOps.PsbIndex = header.destPSB; recvd _ recvd+1; IF destPSB NOT IN (PrincOps.PsbNull..LAST[PrincOps.PsbIndex]] OR NOT wanting[destPSB] THEN { <> <> IF idlers = 0 THEN RETURN [FALSE]; -- too busy, drop the packet b.next _ idlerPkt; idlerPkt _ b; idlers _ idlers-1; NOTIFY idlerCond; } ELSE { <> IF waiterPkts[destPSB] # NIL THEN RETURN [FALSE]; waiterPkts[destPSB] _ b; BROADCAST waiterCond; }; RETURN [TRUE] } ELSE RETURN [FALSE]; }; GiveBackBuffer: PROC [b: BufferDefs.PupBuffer] = <> IF PrincOpsUtils.IsBound[LOOPHOLE[RPCPrivate.ReturnBuffer]] THEN RPCPrivate.ReturnBuffer ELSE PupDefs.ReturnFreePupBuffer; AcceptPkt: PROC [b: BufferDefs.PupBuffer] RETURNS [BOOL] = { RETURN [ EnqueueRecvd[b: b, report: TRUE] ]; }; EnqueueAgain: PUBLIC PROC [b: BufferDefs.PupBuffer] = { IF NOT EnqueueRecvd[b: b, report: FALSE] THEN GiveBackBuffer[b]; }; Listener: PROC = { <> soc: PupDefs.PupSocket = PupDefs.PupSocketMake[ local: RPCPrivate.rpcSocket, remote:, ticks: PupDefs.veryLongWait]; DO ENABLE ABORTED => EXIT; b: BufferDefs.PupBuffer = soc.get[]; IF b = NIL THEN LOOP; <> IF NOT AcceptPkt[b] THEN PupDefs.ReturnFreePupBuffer[b]; ENDLOOP; PupDefs.PupSocketDestroy[soc]; }; listenerProcess: PROCESS; Server: PROC = { RPCInternal.ServerMain[ ! KillServer => CONTINUE ]; }; <<******** Initialization ********>> Initialize: ENTRY PROC = { PupDefs.PupPackageMake[]; DriverDefs.ChangeNumberOfInputBuffers[TRUE]; myAddr _ PupDefs.AnyLocalPupAddress[RPCPrivate.rpcSocket]; myNet _ myAddr.net; myDeviceHost _ myAddr.host; myHost _ [myNet, myAddr.host]; START RPCInternal.RPCBinding; -- exports "RPCInternal.exportTable" -- START RPCInternal.RPCSecurity; -- exports "RPCInternal.firstConversation" -- START RPCInternal.RPCPktStreams; -- initialize connection states -- Booting.RegisterProcs[c: Checkpoint, r: Rollback]; servers _ servers+1; Process.Detach[FORK Server[]]; listenerProcess _ FORK Listener[]; }; Checkpoint: Booting.CheckpointProc = TRUSTED { RPCPrivate.GetRPCPackets[NIL]; }; Rollback: Booting.RollbackProc = TRUSTED { InnerRollback: ENTRY PROC = TRUSTED { RESTART RPCInternal.RPCPktStreams; -- forget connection counts -- RESTART RPCInternal.RPCSecurity; -- invalidate local conversations -- RESTART RPCInternal.RPCBinding; -- nothing? -- }; InnerRollback[]; RPCPrivate.GetRPCPackets[AcceptPkt]; }; Initialize[]; RPCPrivate.GetRPCPackets[AcceptPkt]; }. <> <> <> <> <> <>