<> <> <> DIRECTORY BasicTime USING[ GetClockPulses, MicrosecondsToPulses, Pulses ], Booting USING[ CheckpointProc, RegisterProcs, RollbackProc ], BufferDefs USING[ PupBuffer ], CommUtilDefs USING[ AllocateIocb ], DriverDefs USING[ ChangeNumberOfInputBuffers ], DriverTypes USING[ Encapsulation, ethernetEncapsulationBytes, ethernetEncapsulationOffset ], EthernetOneFace USING[ ControlBlock, ControlBlockRecord, controlBlockSize, DeviceHandle, GetNextDevice, GetStatus, hearSelf, nullDeviceHandle, QueueOutput ], PrincOpsUtils USING[ Free, IsBound, LongCOPY, PsbHandleToIndex, ReadPSB ], Process USING[ Detach, MsecToTicks, Pause, Yield ], PrincOps USING[ PsbIndex, PsbNull ], PupDefs USING[ AnyLocalPupAddress, GetFreePupBuffer, GetHopsToNetwork, PupAddress, PupPackageMake, PupRouterSendThis, PupSocket, PupSocketDestroy, PupSocketMake, ReturnFreePupBuffer, veryLongWait], PupTypes USING[ PupAddress, PupHostID, PupNetID, PupSocketID ], RPC USING[ CallFailed, unencrypted ], RPCInternal USING[ DecryptPkt, DoSignal, EncryptPkt, ReplyToRFA, RPCBinding, RPCPktStreams, RPCSecurity, ServerMain ], RPCLupine USING[ DataLength, Dispatcher, Header, RPCPkt ], RPCPkt USING[ CallCount, Header, Machine, PktExchangeState, PktID, pktLengthOverhead ], RPCPrivate USING[ GetRPCPackets, rpcSocket, ReturnBuffer ], RPCWatch USING[]; RPCPktIO: MONITOR IMPORTS BasicTime, Booting, CommUtilDefs, DriverDefs, EthernetOneFace, PrincOpsUtils, Process, PupDefs, RPC, RPCInternal, RPCPrivate EXPORTS RPCLupine--Encapsulation,Header--, RPCPkt--PktExchange,IdleReceive--, RPCWatch--SetSpyProc-- SHARES BufferDefs, RPCLupine = BEGIN Encapsulation: PUBLIC TYPE = DriverTypes.Encapsulation; Header: PUBLIC TYPE = RPCPkt.Header; ConcreteHeader: PROC[abstract: LONG POINTER TO RPCLupine.Header] RETURNS[LONG POINTER TO Header] = INLINE { RETURN[ abstract ] }; callSequence: RPCPkt.CallCount _ 0; -- monotonic from this host -- <<******** IOCB management ********>> <> <> <> IOCBrecord: TYPE = RECORD[next: IOCB]; IOCB: TYPE = LONG POINTER TO IOCBrecord; iocbSize: CARDINAL = MAX[EthernetOneFace.controlBlockSize, SIZE[IOCBrecord]]; freeIOCBs: IOCB _ NIL; <<******** sending and receiving packets ********>> firstDevice: EthernetOneFace.DeviceHandle = EthernetOneFace.GetNextDevice[EthernetOneFace.nullDeviceHandle]; myAddr: PupTypes.PupAddress; myNet: PupTypes.PupNetID; myDeviceHost: PupTypes.PupHostID; myHost: RPCPkt.Machine; sent: CARDINAL _ 0; recvd: CARDINAL _ 0; retransmitted: CARDINAL _ 0; <<"PktExchange" implements a reliable packet exchange. There are five cases:>> <> <> <> < start of new call)>> <> <> minRetransmitMsecs: CARDINAL = 100; -- retransmit delay for local net -- msecsPerHop: CARDINAL = 500; -- approximate typical gateway hop delay? -- minPingMsecs: CARDINAL = 5000; -- initial interval between pings -- maxPingSecs: CARDINAL = 300; -- long-term ping interval -- maxTransmissions: CARDINAL _ 20; -- give up after too many transmissions -- signalTimeout: BOOLEAN _ TRUE; -- debugging switch -- <> minRetransmitPulses: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[LONG[1000] * minRetransmitMsecs]; pulsesPerHop: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[LONG[1000] * msecsPerHop]; minPingPulses: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[LONG[1000] * minPingMsecs]; maxPingPulses: BasicTime.Pulses = BasicTime.MicrosecondsToPulses[LONG[1000]*LONG[1000] * maxPingSecs]; transmitLocalPkts: BOOLEAN _ TRUE; <> shortWait: BasicTime.Pulses = -- 10 msecs -- BasicTime.MicrosecondsToPulses[LONG[1000] * 10]; longerWait: BasicTime.Pulses = -- 5 secs -- BasicTime.MicrosecondsToPulses[LONG[1000]*LONG[1000] * 5]; PktExchange: PUBLIC PROC[inPkt: RPCLupine.RPCPkt, length: RPCLupine.DataLength, maxlength: RPCLupine.DataLength, state: RPCPkt.PktExchangeState, signalHandler: RPCLupine.Dispatcher _ NIL] RETURNS[newPkt: BOOLEAN, newLength: RPCLupine.DataLength] = BEGIN <> <> outPkt: RPCLupine.RPCPkt _ inPkt; -- altered after a signal -- outPktFrame: POINTER _ NIL; -- DoSignal's local frame -- DO -- loop for signal handlers -- sentTime: BasicTime.Pulses; -- initialized after sending any packet -- iocb: IOCB _ NIL; <> GetIOCB: ENTRY PROC = INLINE BEGIN IF freeIOCBs # NIL THEN { iocb _ freeIOCBs; freeIOCBs _ freeIOCBs.next } ELSE { iocb _ CommUtilDefs.AllocateIocb[iocbSize] } END; ControlBlock: PROC RETURNS[ EthernetOneFace.ControlBlock ] = INLINE { RETURN[LOOPHOLE[iocb]] }; ReturnIOCB: PROC = INLINE BEGIN InnerReturn: ENTRY PROC = INLINE { iocb.next _ freeIOCBs; freeIOCBs _ iocb }; IF iocb # NIL THEN { WaitUntilSent[]; InnerReturn[] }; END; WaitUntilSent: PROC = INLINE BEGIN <> <<(a) a suitable response coming in before we've finished retransmitting our packet;>> <<(b) the standard EthernetOneDriver resetting the microcode (so it forgets our request).>> <> WHILE EthernetOneFace.GetStatus[ControlBlock[]] = pending DO IF BasicTime.GetClockPulses[] - sentTime < shortWait THEN Process.Yield[] ELSE IF BasicTime.GetClockPulses[] - sentTime < longerWait THEN Process.Pause[1] ELSE EXIT -- assume someone reset the microcode --; ENDLOOP; END; NewCallNumber: ENTRY PROC RETURNS[RPCPkt.CallCount] = INLINE { RETURN[ callSequence _ callSequence+1 ] }; reply: BufferDefs.PupBuffer; recvdHeader: LONG POINTER TO Header; myPSB: PrincOps.PsbIndex = PrincOpsUtils.PsbHandleToIndex[PrincOpsUtils.ReadPSB[]]; acked: BOOLEAN; thisPktID: RPCPkt.PktID; header: LONG POINTER TO Header = @outPkt.header; localHost: BOOLEAN = header.destHost = myHost; localNet: BOOLEAN = header.destHost.net = myNet; pingPulses: BasicTime.Pulses _ minPingPulses; header.srceHost _ myHost; header.srceSoc _ RPCPrivate.rpcSocket; header.srcePSB _ myPSB; IF header.pktID.pktSeq = 0 -- first packet of a call; yucky interface! -- THEN BEGIN header.type _ SELECT state FROM sending => [0, rpc, notEnd, pleaseAck, call], call => [0, rpc, end, dontAck, call], authReq => [0, rpc, end, dontAck, rfa], ENDCASE => --receiving, endCall-- ERROR; header.pktID.callSeq _ NewCallNumber[]; header.pktID.pktSeq _ 1; acked _ FALSE; END ELSE BEGIN header.type _ SELECT state FROM sending => [0, rpc, notEnd, pleaseAck, data], receiving => [0, rpc, end, dontAck, ack], call => [0, rpc, end, dontAck, data], endCall => [0, rpc, end, dontAck, data], ENDCASE => --authReq-- ERROR; IF state # receiving --header.type.class = data -- THEN { header.pktID.pktSeq _ header.pktID.pktSeq+1; acked_FALSE } ELSE acked _ TRUE; END; thisPktID _ header.pktID; SetWanting[myPSB]; DO -- loop for pings -- ENABLE UNWIND => { ClearWanting[myPSB]; ReturnIOCB[]; IF outPktFrame # NIL THEN PrincOpsUtils.Free[outPktFrame] }; BEGIN transmissions: CARDINAL _ 0; retransmitPulses: BasicTime.Pulses _ IF localNet THEN minRetransmitPulses ELSE minRetransmitPulses + pulsesPerHop * PupDefs.GetHopsToNetwork[header.destHost.net]; IF outPkt.convHandle # RPC.unencrypted THEN header.length _ RPCInternal.EncryptPkt[outPkt, length] ELSE header.length _ length + RPCPkt.pktLengthOverhead; header.oddByte _ no; DO -- loop for retransmissions -- BEGIN IF NOT localNet THEN GeneralSend[outPkt] ELSE BEGIN IF localHost AND NOT( EthernetOneFace.hearSelf AND transmitLocalPkts ) THEN LocalSend[outPkt]; IF NOT localHost OR transmitLocalPkts THEN BEGIN IF iocb = NIL THEN GetIOCB[] ELSE WaitUntilSent[]; -- Pup checksum -- outPkt.data[header.length-RPCPkt.pktLengthOverhead] _ 177777B; outPkt.encapsulation _ Encapsulation[ethernetOne[,,,,,, header.destHost.host, myDeviceHost, pup]]; EthernetOneFace.QueueOutput[firstDevice, @outPkt.encapsulation + DriverTypes.ethernetEncapsulationOffset, header.length + (1+DriverTypes.ethernetEncapsulationBytes)/2, ControlBlock[]] END; END; sentTime _ BasicTime.GetClockPulses[]; sent _ sent+1; -- wait for response: an ack or the reply or a timeout -- DO -- loop for each incoming packet (including garbage) -- reply _ MyReceive[myPSB, sentTime, (IF acked THEN pingPulses ELSE retransmitPulses)]; IF reply = NIL THEN IF acked THEN GOTO ping ELSE { header.type.ack _ pleaseAck; GOTO retransmit }; recvdHeader _ LOOPHOLE[@reply.pupLength]; IF recvdHeader.type.class = rfa THEN BEGIN IF RPCInternal.ReplyToRFA[ reply, header--encrypted--, thisPktID--clear--, inPkt.convHandle] THEN NULL-- can't set "acked", because we must retransmit our data packet until we obtain the correct destPSB from some ack pkt --; LOOP END; IF outPkt.convHandle # RPC.unencrypted AND recvdHeader.conv = header.conv AND recvdHeader.srceHost = header.destHost THEN [,newLength] _ RPCInternal.DecryptPkt[recvdHeader, outPkt.convHandle] ELSE newLength _ recvdHeader.length - RPCPkt.pktLengthOverhead; IF recvdHeader.conv = header.conv AND recvdHeader.srceHost = header.destHost AND recvdHeader.pktID.activity = thisPktID.activity THEN -- pkt is related to our call -- SELECT TRUE FROM recvdHeader.pktID.callSeq = thisPktID.callSeq => <> SELECT TRUE FROM <> recvdHeader.pktID.pktSeq = 1+thisPktID.pktSeq => BEGIN IF state = sending OR state = endCall THEN -- he's not allowed to generate that pktSeq! -- ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply] ]; SELECT recvdHeader.type.class FROM data => GOTO done; ack => BEGIN <> <> <> -- data pkt. -- acked _ TRUE; GiveBackBuffer[reply]; END; ENDCASE => --call,rfa-- ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply] ]; END; <> recvdHeader.pktID.pktSeq = thisPktID.pktSeq => BEGIN SELECT recvdHeader.type.class FROM ack => -- acknowledgement of our packet -- BEGIN IF header.type.class = call THEN header.destPSB _ recvdHeader.srcePSB; acked _ TRUE; IF state = sending OR state = endCall THEN {GiveBackBuffer[reply]; reply_NIL; GOTO done} ELSE -- state = call, authReq, or receiving -- <> -- because other end should send data pkt soon -- GiveBackBuffer[reply]; END; data, call => -- retransmisson of his packet -- IF state = receiving THEN IF recvdHeader.type.ack = pleaseAck THEN BEGIN GiveBackBuffer[reply]; reply _ NIL; GOTO retransmit -- we're already sending an ack -- END ELSE GiveBackBuffer[reply] ELSE ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply] ]; ENDCASE => --rfa -- ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply] ]; END; <> recvdHeader.pktID.pktSeq < thisPktID.pktSeq => GiveBackBuffer[reply]; -- no need to ack it -- <> ENDCASE => ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply] ]; recvdHeader.pktID.callSeq > thisPktID.callSeq AND state=endCall => BEGIN IF recvdHeader.type.class # call THEN ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply] ]; <> GOTO done END ENDCASE => -- wrong call: let someone else do it -- { recvdHeader.destPSB _ PrincOps.PsbNull; EnqueueAgain[reply] } ELSE -- wrong conversation or activity: let someone else do it -- { recvdHeader.destPSB _ PrincOps.PsbNull; EnqueueAgain[reply] }; <> ENDLOOP--for each incoming packet--; EXITS retransmit => BEGIN transmissions _ transmissions+1; IF (transmissions = maxTransmissions AND signalTimeout) OR state = authReq <> THEN { SIGNAL RPC.CallFailed[timeout]; transmissions _ 0 }; retransmitted _ retransmitted+1; retransmitPulses _ retransmitPulses + minRetransmitPulses; END; END; ENDLOOP-- for each transmission --; EXITS ping => BEGIN header.type _ [0, rpc, end, pleaseAck, ack]; length _ 0; header.pktID _ thisPktID; acked _ FALSE; pingPulses _ MIN[maxPingPulses, pingPulses * 2]; END; END; <> REPEAT done => BEGIN -- This isn't covered by any UNWIND -- ClearWanting[myPSB]; ReturnIOCB[]; IF outPktFrame # NIL THEN PrincOpsUtils.Free[outPktFrame]; IF reply = NIL THEN { --restore clear pktID-- header.pktID _ thisPktID; RETURN[FALSE,] } ELSE BEGIN IF recvdHeader.outcome = signal AND state = call AND signalHandler # NIL THEN [outPkt, length, outPktFrame] _ RPCInternal.DoSignal[reply, newLength, signalHandler, inPkt.convHandle] ELSE BEGIN IF newLength > maxlength THEN ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[reply]]; PrincOpsUtils.LongCOPY[from: recvdHeader, to: @inPkt.header, nwords: recvdHeader.length]; GiveBackBuffer[reply]; RETURN[TRUE,newLength] END; END END; ENDLOOP-- for each ping--; <> ENDLOOP-- for signal handlers --; <> END; GeneralSend: PROC[pkt: RPCLupine.RPCPkt] = BEGIN b: BufferDefs.PupBuffer = PupDefs.GetFreePupBuffer[]; PrincOpsUtils.LongCOPY[from: @(pkt.header), to: @(b.pupLength), nwords: ConcreteHeader[@pkt.header].length]; PupDefs.PupRouterSendThis[b]; END; LocalSend: PROC[pkt: RPCLupine.RPCPkt] = BEGIN <> b: BufferDefs.PupBuffer = PupDefs.GetFreePupBuffer[]; PrincOpsUtils.LongCOPY[from: @(pkt.header), to: @(b.pupLength), nwords: ConcreteHeader[@pkt.header].length]; IF NOT AcceptPkt[b] THEN PupDefs.ReturnFreePupBuffer[b]; END; idlerPkt: BufferDefs.PupBuffer _ NIL; idlerCond: CONDITION _ [timeout:0]; waiterCond: CONDITION _ [timeout:Process.MsecToTicks[minRetransmitMsecs]]; WaiterArray: TYPE = ARRAY PrincOps.PsbIndex OF BufferDefs.PupBuffer; waiterPkts: REF WaiterArray _ NEW[WaiterArray _ ALL[NIL]]; wanting: PACKED ARRAY PrincOps.PsbIndex OF BOOLEAN _ ALL[FALSE]; -- PSB expects a pkt -- SetWanting: ENTRY PROC[myPSB: PrincOps.PsbIndex] = INLINE { wanting[myPSB] _ TRUE }; ClearWanting: PROC[myPSB: PrincOps.PsbIndex] = INLINE BEGIN spare: BufferDefs.PupBuffer; InnerClear: ENTRY PROC = INLINE BEGIN wanting[myPSB] _ FALSE; IF (spare _ waiterPkts[myPSB]) # NIL THEN waiterPkts[myPSB] _ NIL; END; InnerClear[]; IF spare # NIL THEN GiveBackBuffer[spare] --ignore it, outside monitor--; END; MyReceive: ENTRY PROC[myPSB: PrincOps.PsbIndex, sentTime, waitTime: BasicTime.Pulses] RETURNS[recvd: BufferDefs.PupBuffer] = INLINE BEGIN ENABLE UNWIND => NULL; <> DO IF (recvd _ waiterPkts[myPSB]) = NIL THEN IF BasicTime.GetClockPulses[] - sentTime < waitTime THEN WAIT waiterCond ELSE EXIT ELSE { waiterPkts[myPSB] _ NIL; EXIT }; ENDLOOP; END; RecvdPktTooLong: ERROR = CODE; KillServer: ERROR = CODE; servers: CARDINAL _ 0; serversMax: CARDINAL _ 20; idlers: CARDINAL _ 0; idlersMax: CARDINAL = 6; idlersMin: CARDINAL = 2; <> IdleReceive: PUBLIC PROC[pkt: RPCLupine.RPCPkt, maxlength: CARDINAL] = BEGIN b: BufferDefs.PupBuffer; InnerIdler: ENTRY PROC = INLINE BEGIN IF idlers >= idlersMax THEN { servers_servers-1; RETURN WITH ERROR KillServer[] }; idlers _ idlers + 1; DO WAIT idlerCond; IF idlerPkt # NIL THEN EXIT ENDLOOP; IF idlers < idlersMin AND servers < serversMax THEN { servers_servers+1; Process.Detach[FORK Server[]] }; b _ idlerPkt; BEGIN recvdHeader: LONG POINTER TO Header = LOOPHOLE[@b.pupLength]; idlerPkt _ LOOPHOLE[idlerPkt.next,BufferDefs.PupBuffer]; b.next _ NIL; IF recvdHeader.length > maxlength THEN ERROR RecvdPktTooLong[] --NULL??-- ELSE PrincOpsUtils.LongCOPY[from: @b.pupLength, to: @pkt.header, nwords: recvdHeader.length]; END; END; InnerIdler[]; GiveBackBuffer[b];--outside monitor-- END; QueuesScrambled: ERROR = CODE; spyProc: PROC[BufferDefs.PupBuffer] _ NIL; SetSpyProc: PUBLIC ENTRY SAFE PROC[p: PROC[BufferDefs.PupBuffer]] = CHECKED { spyProc _ p }; EnqueueRecvd: ENTRY PROC[b: BufferDefs.PupBuffer, report: BOOLEAN] RETURNS[BOOLEAN] = BEGIN <> header: LONG POINTER TO Header = LOOPHOLE[@b.pupLength]; IF report AND spyProc # NIL THEN spyProc[b]; IF header.destHost = myHost AND header.type.subType = rpc THEN BEGIN destPSB: PrincOps.PsbIndex = header.destPSB; recvd _ recvd+1; IF destPSB NOT IN (PrincOps.PsbNull..LAST[PrincOps.PsbIndex]] OR NOT wanting[destPSB] THEN -- give to idle process to deal with -- <> BEGIN IF idlers = 0 THEN -- server too busy: throw it away! -- RETURN[FALSE] ELSE { b.next _ idlerPkt; idlerPkt _ b; idlers _ idlers-1; NOTIFY idlerCond }; END ELSE -- someone wants this packet: give them it -- BEGIN IF waiterPkts[destPSB] # NIL THEN RETURN[FALSE]; waiterPkts[destPSB] _ b; BROADCAST waiterCond; END; RETURN[TRUE] END ELSE RETURN[FALSE]; END; GiveBackBuffer: PROC[b: BufferDefs.PupBuffer] = <> IF PrincOpsUtils.IsBound[RPCPrivate.ReturnBuffer] THEN RPCPrivate.ReturnBuffer ELSE PupDefs.ReturnFreePupBuffer; AcceptPkt: PROC[b: BufferDefs.PupBuffer] RETURNS[BOOLEAN] = { RETURN[ EnqueueRecvd[b: b, report: TRUE] ] }; EnqueueAgain: PUBLIC PROC[b: BufferDefs.PupBuffer] = { IF NOT EnqueueRecvd[b: b, report: FALSE] THEN GiveBackBuffer[b] }; Listener: PROC = BEGIN <> soc: PupDefs.PupSocket = PupDefs.PupSocketMake[local: RPCPrivate.rpcSocket, remote:, ticks: PupDefs.veryLongWait]; DO ENABLE ABORTED => EXIT; b: BufferDefs.PupBuffer = soc.get[]; IF NOT AcceptPkt[b] THEN PupDefs.ReturnFreePupBuffer[b]; ENDLOOP; PupDefs.PupSocketDestroy[soc]; END; listenerProcess: PROCESS; Server: PROC = BEGIN RPCInternal.ServerMain[ ! KillServer => CONTINUE ]; END; <<******** Initialization ********>> Initialize: ENTRY PROC = BEGIN PupDefs.PupPackageMake[]; DriverDefs.ChangeNumberOfInputBuffers[TRUE]; myAddr _ PupDefs.AnyLocalPupAddress[RPCPrivate.rpcSocket]; myNet _ myAddr.net; myDeviceHost _ myAddr.host; myHost _ [myNet, myAddr.host]; START RPCInternal.RPCBinding; -- exports "RPCInternal.exportTable" -- START RPCInternal.RPCSecurity; -- exports "RPCInternal.firstConversation" -- START RPCInternal.RPCPktStreams; -- initialize connection states -- Booting.RegisterProcs[c: Checkpoint, r: Rollback]; servers _ servers+1; Process.Detach[FORK Server[]]; listenerProcess _ FORK Listener[]; END; Checkpoint: Booting.CheckpointProc = TRUSTED { RPCPrivate.GetRPCPackets[NIL] }; Rollback: Booting.RollbackProc = TRUSTED BEGIN InnerRollback: ENTRY PROC = TRUSTED BEGIN RESTART RPCInternal.RPCPktStreams; -- forget connection counts -- RESTART RPCInternal.RPCSecurity; -- invalidate local conversations -- RESTART RPCInternal.RPCBinding; -- nothing? -- END; InnerRollback[]; RPCPrivate.GetRPCPackets[AcceptPkt]; END; Initialize[]; RPCPrivate.GetRPCPackets[AcceptPkt]; END.