<> <> <> <> <> <> <<>> DIRECTORY Process USING [SetTimeout, MsecToTicks], ProcessorFace USING [GetClockPulses], StatsDefs USING [StatBump, StatIncr], CommFlags USING [doDebug, doStats], DriverDefs USING [MaybeGetFreePupBuffer, Glitch], PupStream USING [StreamClosing], PupPktDefs, PupPktOps, PupDefs USING [Byte, PupBuffer, DequeuePup, EnqueuePup, GetFreePupBuffer, ReturnFreePupBuffer, PupRouterSendThis], BufferDefs USING [ReturnFreeBuffer], PupTypes USING [PupAddress, maxDataBytesPerGatewayPup]; PupPktHot: MONITOR LOCKS him USING him: Instance IMPORTS Process, ProcessorFace, StatsDefs, PupStream, DriverDefs, PupPktOps, PupDefs, BufferDefs EXPORTS PupPktDefs, PupPktOps = { OPEN StatsDefs, DriverDefs, PupPktOps, PupDefs; PupPktStreamObject: PUBLIC TYPE = PupPktOps.InstanceData; NeitherDataNorMark: PUBLIC ERROR = CODE; BufferAlreadyRequeued: PUBLIC ERROR = CODE; StreamNotOpen: PUBLIC ERROR = CODE; <> Diff: PROC [a, b: INT] RETURNS [INTEGER] = { maxInteger: INTEGER = 77777B; temp: INT _ a - b; SELECT TRUE FROM temp > maxInteger => RETURN[maxInteger]; temp < -maxInteger => RETURN[-maxInteger]; ENDCASE => RETURN[temp]; }; Retransmitter: PUBLIC ENTRY PROC [him: Instance] = { <> <> UNTIL him.pleaseDie DO now: PupPktOps.Pulses _ ProcessorFace.GetClockPulses[]; SELECT him.state FROM open => { IF now - him.timer > pingPulses AND him.ping THEN { him.probeCounter _ pingRetransmissions; him.allocatedPups _ 0; -- will start probing }; IF now - him.timer > ctlRetransmitPulses AND (him.outEnd = 0 OR him.allocatedPups = 0) THEN ProbeForAck[him]; }; talking, finishing => { DO <> IF him.sentBuffer = NIL THEN him.sentBuffer _ DequeuePup[him.sentQueue]; IF him.sentBuffer = NIL THEN EXIT; IF him.ackedID - Flip[him.sentBuffer.pupID] > 0 THEN { <> IF (him.unackedPups _ him.unackedPups - 1) = 0 THEN { <> SELECT him.state FROM talking => him.state _ open; finishing => him.state _ end; ENDCASE; BROADCAST him.stateChange; }; ReturnFreePupBuffer[him.sentBuffer]; him.sentBuffer _ NIL; } ELSE EXIT; ENDLOOP; IF him.sentBuffer # NIL AND now - him.timer > him.retransmitPulses THEN { ProbeForAck[him]; IF now - him.timer > him.retransmitPulses THEN { <> IF him.sentBuffer.pupType = data THEN him.sentBuffer.pupType _ aData; him.timer _ ProcessorFace.GetClockPulses[]; PupRouterSendThis[him.sentBuffer]; IF CommFlags.doStats THEN StatIncr[statDataPacketsRetransmitted]; him.sentBuffer _ DequeuePup[him.sentQueue]; }; }; }; halfOpen => IF now - him.timer > ctlRetransmitPulses THEN SendRfc[him]; end => IF now - him.timer > ctlRetransmitPulses THEN SendEnd[him]; closed => { DO <> IF him.sentBuffer = NIL THEN him.sentBuffer _ DequeuePup[him.sentQueue]; IF him.sentBuffer = NIL THEN EXIT; him.unackedPups _ him.unackedPups - 1; ReturnFreePupBuffer[him.sentBuffer]; him.sentBuffer _ NIL; ENDLOOP; }; ENDCASE; IF him.outIntPending AND now - him.outIntTime > ctlRetransmitPulses THEN SendInt[him]; WAIT him.retransmitterReady; ENDLOOP; UNTIL him.unackedPups = 0 DO DO <> IF him.sentBuffer = NIL THEN him.sentBuffer _ DequeuePup[him.sentQueue]; IF him.sentBuffer = NIL THEN EXIT; him.unackedPups _ him.unackedPups - 1; ReturnFreePupBuffer[him.sentBuffer]; him.sentBuffer _ NIL; ENDLOOP; <> IF him.unackedPups # 0 THEN WAIT him.retransmitterReady; ENDLOOP; }; ProbeForAck: INTERNAL PROC [him: Instance] = { b: PupBuffer _ MaybeGetFreePupBuffer[]; IF b = NIL THEN RETURN; b.pupLength _ bytesPerPupHeader; b.pupType _ aData; b.pupID _ Flop[him.nextOutputID]; b.source _ him.local; b.dest _ him.remote; him.timer _ ProcessorFace.GetClockPulses[]; him.aDataOut _ FALSE; him.clumpsSinceBump _ 0; PupRouterSendThis[b]; IF CommFlags.doStats THEN StatIncr[statProbesSent]; IF (him.probeCounter _ him.probeCounter + 1) > retransmitionsBeforeAbort THEN SmashClosed[him, transmissionTimeout]; IF him.probeCounter > probesBeforePanic THEN him.retransmitPulses _ MIN[2*him.retransmitPulses, maxRetransmitPulses]; }; ThrottleForward: PROC [him: Instance] = { old: CARDINAL _ him.pathMaxAllocate; him.clumpsSinceBump _ 0; <> IF him.retransmitPulses = maxRetransmitPulses THEN RETURN; him.pathMaxAllocate _ MIN[him.pathMaxAllocate + 1, him.myMaxAllocate]; him.retransmitPulses _ (him.retransmitPulses*him.pathMaxAllocate)/old; }; ThrottleBack: INTERNAL PROC [him: Instance] = { IF him.pathMaxAllocate = 1 THEN { <> <> pause: CONDITION; Process.SetTimeout[@pause, Process.MsecToTicks[maxRetransmitTime]]; WAIT pause; }; UNTIL him.throttle = 0 OR him.retransmitPulses = minRetransmitPulses OR him.pathMaxAllocate = 1 DO old: CARDINAL _ him.pathMaxAllocate; him.pathMaxAllocate _ him.pathMaxAllocate - 1; <> <> <> him.throttle _ him.throttle - 1; ENDLOOP; him.clumpsSinceBump _ him.throttle _ 0; }; SendAck: INTERNAL PROC [him: Instance] = { b: PupBuffer _ him.c; IF b # NIL THEN him.c _ NIL ELSE IF (b _ MaybeGetFreePupBuffer[]) = NIL THEN RETURN; b.pupBody _ ack[ him.dataBytesPerPup, MAX[0, INTEGER[him.myMaxAllocate - him.inputQueue.length]], byteAllocate]; him.allocatedID _ him.nextInputID + byteAllocate; b.pupType _ ack; b.pupID _ Flop[him.nextInputID]; b.pupLength _ bytesPerAck; b.source _ him.local; b.dest _ him.remote; PupRouterSendThis[b]; IF CommFlags.doStats THEN StatIncr[statAcksSent]; IF him.state = open AND him.outEnd # 0 AND him.allocatedPups # 0 THEN him.timer _ ProcessorFace.GetClockPulses[]; -- avoid pinging him.sendAck _ FALSE; }; Get: PUBLIC ENTRY PROC [him: Instance] RETURNS [b: PupBuffer _ NIL] = { ENABLE UNWIND => NULL; IF him.inputQueue.length = 0 THEN SELECT TRUE FROM (him.state = closed) => ERROR PupStream.StreamClosing[him.whyClosed, him.text]; him.dontWait => RETURN[NIL]; ENDCASE => WAIT him.inputReady; IF him.inputQueue.length # 0 THEN b _ DequeuePup[him.inputQueue]; IF b = NIL THEN { IF him.state = closed THEN ERROR PupStream.StreamClosing[him.whyClosed, him.text]; RETURN; }; IF CommFlags.doStats THEN SELECT b.pupType FROM data, aData => { StatIncr[statDataPacketsReceived]; StatBump[statDataBytesReceived, b.pupLength - bytesPerPupHeader]; }; mark, aMark => StatIncr[statMarksReceived]; ENDCASE => Glitch[NeitherDataNorMark]; IF him.inputQueue.length = 0 AND him.sendAck THEN SendAck[him]; }; PktsAvailable: PUBLIC ENTRY PROC [him: Instance] RETURNS [BOOL] = { ENABLE UNWIND => NULL; RETURN[ him.inputQueue.length # 0 ] }; Put: PUBLIC PROC [him: Instance, b: PupBuffer] = { IF CommFlags.doDebug AND b.requeueProcedure # BufferDefs.ReturnFreeBuffer THEN Glitch[BufferAlreadyRequeued]; SELECT him.state FROM open, talking => { IF CommFlags.doStats THEN { StatIncr[statDataPacketsSent]; StatBump[statDataBytesSent, b.pupLength - bytesPerPupHeader]; }; SendPacket[him, b]; }; idle, halfOpen => Glitch[StreamNotOpen]; ENDCASE --end, closed, finishing-- => StreamDied[him, b]; }; PutMark: PUBLIC PROC [him: Instance, byte: Byte] = { b: PupBuffer _ GetFreePupBuffer[]; b.pupBytes[0] _ byte; b.pupLength _ bytesPerPupHeader + 1; b.pupType _ aMark; SendPacket[him, b]; IF CommFlags.doStats THEN StatIncr[statMarksSent]; }; SendPacket: ENTRY PROC [him: Instance, b: PupBuffer] = { SELECT him.state FROM open, talking => { b.pupID _ Flop[him.nextOutputID]; IF b.pupLength = bytesPerPupHeader THEN him.probeCounter _ 1 ELSE { b.requeueProcedure _ LOOPHOLE[PutOnSentQueue]; b.requeueRef _ him; -- BEWARE: we rely on having another reference to the REF him.state _ talking; him.unackedPups _ him.unackedPups + 1; }; b.source _ him.local; b.dest _ him.remote; him.nextOutputID _ him.nextOutputID + (b.pupLength - bytesPerPupHeader); <> IF b.pupType = data AND ~((him.maxOutputID - him.nextOutputID) > 0 AND him.allocatedPups > him.unackedPups) THEN b.pupType _ aData; IF b.pupType # data THEN { <> him.timer _ ProcessorFace.GetClockPulses[]; him.aDataOut _ TRUE; }; PupRouterSendThis[b]; IF b.pupType # data THEN WaitToSend[him ! UNWIND => NULL]; }; idle, halfOpen => Glitch[StreamNotOpen]; ENDCASE --end, closed, finishing-- => StreamDied[him, b ! UNWIND => NULL]; }; WaitToSend: INTERNAL PROC [him: Instance] = { SELECT him.state FROM open, talking => { <> DO SELECT him.state FROM open => EXIT; talking => WAIT him.stateChange; ENDCASE => StreamDied[him, NIL]; ENDLOOP; <> UNTIL (him.maxOutputID - him.nextOutputID) > 0 AND him.allocatedPups > him.unackedPups DO SELECT him.state FROM <> open, talking => WAIT him.stateChange; ENDCASE => StreamDied[him, NIL]; ENDLOOP; IF him.throttle > 0 THEN ThrottleBack[him]; }; idle, halfOpen => Glitch[StreamNotOpen]; ENDCASE --end, closed, finishing-- => StreamDied[him, NIL]; }; PutOnSentQueue: PROC [b: PupBuffer] = { ReallyPutOnSentQueue[NARROW[b.requeueRef], b]; }; ReallyPutOnSentQueue: ENTRY PROC [him: Instance, b: PupBuffer] = { SELECT TRUE FROM him.ackedID > Flip[b.pupID] => { IF (him.unackedPups _ him.unackedPups - 1) = 0 THEN { <> SELECT him.state FROM talking => him.state _ open; finishing => him.state _ end; ENDCASE => SendAbort[him]; BROADCAST him.stateChange; }; ReturnFreePupBuffer[b]; }; him.sentBuffer = NIL => him.sentBuffer _ b; ENDCASE => EnqueuePup[him.sentQueue, b]; }; Slurp: PUBLIC PROC [him: Instance] = { UNTIL him.pleaseDie DO b: PupBuffer _ him.socket.get[]; IF b # NIL THEN InputPacket[him, b]; ENDLOOP; }; InputPacket: ENTRY PROC [him: Instance, b: PupBuffer] = { ENABLE UNWIND => NULL; thisID: INT _ Flip[b.pupID]; ackWantedBefore: BOOL = him.sendAck; him.c _ b; IF ~(b.pupType = rfc OR b.pupType = error) AND b.source.socket # him.remote.socket THEN { IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadSource]; GO TO freeC; }; SELECT b.pupType FROM data, aData, mark, aMark => SELECT him.state FROM open, talking, end => { SELECT b.pupType FROM aData, aMark => him.sendAck _ TRUE; ENDCASE; IF b.pupLength > bytesPerPupHeader THEN { offset: INTEGER _ Diff[thisID, him.nextInputID]; SELECT offset FROM 0 => { <> him.nextInputID _ him.nextInputID + (him.c.pupLength - bytesPerPupHeader); EnqueuePup[him.inputQueue, him.c]; NOTIFY him.inputReady; him.c _ NIL; }; ENDCASE => { IF CommFlags.doStats THEN SELECT offset FROM IN (0..duplicateWindow] => StatIncr[statDataPacketsReceivedEarly]; IN (-duplicateWindow..0) => StatIncr[statDataPacketsReceivedAgain]; ENDCASE => StatIncr[statDataPacketsReceivedVeryLate]; ReturnFreePupBuffer[him.c]; him.c _ NIL; }; } ELSE { <> IF CommFlags.doStats THEN IF b.pupLength = bytesPerPupHeader AND b.pupType = aData THEN { StatIncr[statProbesReceived]; SendAck[him]; } ELSE StatIncr[statEmptyFunnys]; }; <> IF him.sendAck AND (him.inputQueue.length = 0 OR ackWantedBefore) THEN SendAck[him]; }; halfOpen => SendRfc[him]; ENDCASE --idle, closed, finishing-- => SendAbort[him]; ack => { IF b.pupLength < bytesPerAck THEN { IF CommFlags.doStats THEN StatIncr[statMouseTrap]; GOTO SkipThisAck; }; IF CommFlags.doStats THEN StatIncr[statAcksReceived]; <> IF (thisID - him.ackedID) < 0 OR (thisID = him.ackedID AND him.probeCounter = 0) THEN { IF CommFlags.doStats THEN StatIncr[statDuplicateAcks]; GOTO SkipThisAck; }; him.probeCounter _ 0; IF him.aDataOut THEN { myRetrTime: PupPktOps.Pulses _ him.retransmitPulses; responseTime: PupPktOps.Pulses _ ProcessorFace.GetClockPulses[] - him.timer; responseTime _ MIN[responseTime, maxRetransmitPulses]; <> <> <> myRetrTime _ (6*myRetrTime + myRetrTime + 2*responseTime)/8; myRetrTime _ MAX[minRetransmitPulses, MIN[myRetrTime, maxRetransmitPulses]]; him.retransmitPulses _ myRetrTime; him.aDataOut _ FALSE; him.clumpsSinceBump _ him.clumpsSinceBump + 1; IF him.clumpsSinceBump > clumpsBeforeBump THEN ThrottleForward[him]; }; IF him.allocatedPups = 0 THEN BROADCAST him.stateChange; him.hisMaxAllocate _ b.numberOfPupsAhead; IF him.hisMaxAllocate = 0 THEN { him.probeCounter _ 1; IF CommFlags.doStats THEN StatIncr[statEmptyAlloc]; }; him.allocatedPups _ MIN[him.hisMaxAllocate, him.pathMaxAllocate]; IF him.outEnd = 0 THEN BROADCAST him.stateChange; -- in case first time him.outEnd _ MIN[b.maximumBytesPerPup, him.dataBytesPerPup]; IF ~him.sameNet THEN him.outEnd _ MIN[him.outEnd, PupTypes.maxDataBytesPerGatewayPup]; IF (thisID - him.maxOutputID) + b.numberOfBytesAhead > 0 THEN him.maxOutputID _ b.numberOfBytesAhead + thisID; him.ackedID _ thisID; IF him.sentBuffer = NIL THEN him.sentBuffer _ DequeuePup[him.sentQueue]; WHILE him.sentBuffer # NIL AND thisID > Flip[him.sentBuffer.pupID] DO IF (him.unackedPups _ him.unackedPups - 1) = 0 THEN { SELECT him.state FROM -- last packet has been ACKed talking => him.state _ open; finishing => him.state _ end; ENDCASE => SendAbort[him]; BROADCAST him.stateChange; }; ReturnFreePupBuffer[him.sentBuffer]; him.sentBuffer _ DequeuePup[him.sentQueue]; ENDLOOP; UNTIL him.sentBuffer = NIL DO PupRouterSendThis[him.sentBuffer]; IF CommFlags.doStats THEN StatIncr[statDataPacketsRetransmitted]; him.sentBuffer _ DequeuePup[him.sentQueue]; ENDLOOP; EXITS SkipThisAck => NULL; }; ENDCASE => GotOther[him, him.c]; IF him.c # NIL THEN GO TO freeC; EXITS freeC => { ReturnFreePupBuffer[him.c]; him.c _ NIL; }; }; SendAttention: PUBLIC ENTRY PROC [him: Instance] = { ENABLE UNWIND => NULL; WHILE him.outIntPending DO WAIT him.stateChange; ENDLOOP; him.outIntPending _ TRUE; SendInt[him]; }; WaitForAttention: PUBLIC ENTRY PROC [him: Instance] = { ENABLE UNWIND => NULL; WHILE him.seenIntSeq = him.inIntSeq DO WAIT him.waitingForInterrupt; ENDLOOP; him.seenIntSeq _ him.seenIntSeq + 1; }; }.