<> <> <> DIRECTORY Process USING [SetTimeout, MsecToTicks], ProcessorFace USING [GetClockPulses], StatsDefs USING [StatBump, StatIncr], CommFlags USING [doDebug, doStats], DriverDefs USING [MaybeGetFreePupBuffer, Glitch], PupStream USING [StreamClosing], PupPktDefs, PupPktOps, PupDefs USING [ Byte, PupBuffer, DequeuePup, EnqueuePup, GetFreePupBuffer, ReturnFreePupBuffer, PupRouterSendThis], BufferDefs USING [ReturnFreeBuffer], PupTypes USING [PupAddress, maxDataBytesPerGatewayPup]; PupPktHot: MONITOR LOCKS him USING him: Instance IMPORTS Process, ProcessorFace, StatsDefs, PupStream, DriverDefs, PupPktOps, PupDefs, BufferDefs EXPORTS PupPktDefs, PupPktOps = BEGIN OPEN StatsDefs, DriverDefs, PupPktOps, PupDefs; PupPktStreamObject: PUBLIC TYPE = PupPktOps.InstanceData; NeitherDataNorMark: PUBLIC ERROR = CODE; BufferAlreadyRequeued: PUBLIC ERROR = CODE; StreamNotOpen: PUBLIC ERROR = CODE; <> Diff: PROCEDURE [a, b: LONG INTEGER] RETURNS [INTEGER] = BEGIN maxInteger: INTEGER = 77777B; temp: LONG INTEGER _ a - b; SELECT TRUE FROM temp > maxInteger => RETURN[maxInteger]; temp < -maxInteger => RETURN[-maxInteger]; ENDCASE => RETURN[temp]; END; Retransmitter: PUBLIC ENTRY PROCEDURE[him: Instance] = <> <> BEGIN OPEN him; now: PupPktOps.Pulses; UNTIL pleaseDie DO now _ ProcessorFace.GetClockPulses[]; SELECT state FROM open => BEGIN IF now - timer > pingPulses AND ping THEN BEGIN probeCounter _ pingRetransmissions; allocatedPups _ 0; -- will start probing END; IF now - timer > ctlRetransmitPulses AND (outEnd = 0 OR allocatedPups = 0) THEN ProbeForAck[him]; END; talking, finishing => BEGIN DO <> IF sentBuffer = NIL THEN sentBuffer _ DequeuePup[sentQueue]; IF sentBuffer = NIL THEN EXIT; IF ackedID - Flip[sentBuffer.pupID] > 0 THEN BEGIN -- this packet has been ACKed already IF (unackedPups _ unackedPups - 1) = 0 THEN BEGIN SELECT state FROM -- last packet has been ACKed talking => state _ open; finishing => state _ end; ENDCASE; BROADCAST stateChange; END; ReturnFreePupBuffer[sentBuffer]; sentBuffer _ NIL; END ELSE EXIT; ENDLOOP; IF sentBuffer # NIL AND now - timer > retransmitPulses THEN BEGIN ProbeForAck[him]; IF now - timer > retransmitPulses THEN BEGIN -- couldn't get buffer, use one of ours IF sentBuffer.pupType = data THEN sentBuffer.pupType _ aData; timer _ ProcessorFace.GetClockPulses[]; PupRouterSendThis[sentBuffer]; IF CommFlags.doStats THEN StatIncr[statDataPacketsRetransmitted]; sentBuffer _ DequeuePup[sentQueue]; END; END; END; halfOpen => IF now - timer > ctlRetransmitPulses THEN SendRfc[him]; end => IF now - timer > ctlRetransmitPulses THEN SendEnd[him]; closed => BEGIN DO <> IF sentBuffer = NIL THEN sentBuffer _ DequeuePup[sentQueue]; IF sentBuffer = NIL THEN EXIT; unackedPups _ unackedPups - 1; ReturnFreePupBuffer[sentBuffer]; sentBuffer _ NIL; ENDLOOP; END; ENDCASE; IF outIntPending AND now - outIntTime > ctlRetransmitPulses THEN SendInt[him]; WAIT retransmitterReady; ENDLOOP; UNTIL unackedPups = 0 DO DO <> IF sentBuffer = NIL THEN sentBuffer _ DequeuePup[sentQueue]; IF sentBuffer = NIL THEN EXIT; unackedPups _ unackedPups - 1; ReturnFreePupBuffer[sentBuffer]; sentBuffer _ NIL; ENDLOOP; <> IF unackedPups # 0 THEN WAIT retransmitterReady; ENDLOOP; END; ProbeForAck: INTERNAL PROCEDURE[him: Instance] = BEGIN OPEN him; b: PupBuffer; IF (b _ MaybeGetFreePupBuffer[]) = NIL THEN RETURN; b.pupLength _ bytesPerPupHeader; b.pupType _ aData; b.pupID _ Flop[nextOutputID]; b.source _ local; b.dest _ remote; timer _ ProcessorFace.GetClockPulses[]; aDataOut _ FALSE; clumpsSinceBump _ 0; PupRouterSendThis[b]; IF CommFlags.doStats THEN StatIncr[statProbesSent]; IF (probeCounter _ probeCounter + 1) > retransmitionsBeforeAbort THEN SmashClosed[him, transmissionTimeout]; IF probeCounter > probesBeforePanic THEN retransmitPulses _ MIN[2*retransmitPulses, maxRetransmitPulses]; END; ThrottleForward: PROCEDURE[him: Instance] = BEGIN OPEN him; old: CARDINAL _ pathMaxAllocate; clumpsSinceBump _ 0; <> IF retransmitPulses = maxRetransmitPulses THEN RETURN; pathMaxAllocate _ MIN[pathMaxAllocate + 1, myMaxAllocate]; retransmitPulses _ (retransmitPulses*pathMaxAllocate)/old; END; ThrottleBack: INTERNAL PROCEDURE[him: Instance] = BEGIN OPEN him; old: CARDINAL; IF pathMaxAllocate = 1 THEN BEGIN -- This is a desperate attempt to avoid an instability <> pause: CONDITION; Process.SetTimeout[@pause, Process.MsecToTicks[maxRetransmitTime]]; WAIT pause; END; UNTIL throttle = 0 OR retransmitPulses = minRetransmitPulses OR pathMaxAllocate = 1 DO old _ pathMaxAllocate; pathMaxAllocate _ pathMaxAllocate - 1; <> <> <> throttle _ throttle - 1; ENDLOOP; clumpsSinceBump _ throttle _ 0; END; SendAck: INTERNAL PROCEDURE[him: Instance] = BEGIN OPEN him; b: PupBuffer; IF c # NIL THEN BEGIN b _ c; c _ NIL; END ELSE IF (b _ MaybeGetFreePupBuffer[]) = NIL THEN RETURN; b.pupBody _ ack[ dataBytesPerPup, MAX[0, INTEGER[myMaxAllocate - inputQueue.length]], byteAllocate]; allocatedID _ nextInputID + byteAllocate; b.pupType _ ack; b.pupID _ Flop[nextInputID]; b.pupLength _ bytesPerAck; b.source _ local; b.dest _ remote; PupRouterSendThis[b]; IF CommFlags.doStats THEN StatIncr[statAcksSent]; IF state = open AND outEnd # 0 AND allocatedPups # 0 THEN timer _ ProcessorFace.GetClockPulses[]; -- avoid pinging sendAck _ FALSE; END; Get: PUBLIC ENTRY PROCEDURE[him: Instance] RETURNS [b: PupBuffer] = BEGIN OPEN him; ENABLE UNWIND => NULL; IF inputQueue.length = 0 THEN SELECT TRUE FROM (state = closed) => ERROR PupStream.StreamClosing[whyClosed, text]; dontWait => RETURN[NIL]; ENDCASE => WAIT inputReady; IF inputQueue.length # 0 THEN b _ DequeuePup[inputQueue] ELSE b _ NIL; IF b = NIL THEN BEGIN IF state = closed THEN ERROR PupStream.StreamClosing[whyClosed, text]; RETURN; END; IF CommFlags.doStats THEN SELECT b.pupType FROM data, aData => BEGIN StatIncr[statDataPacketsReceived]; StatBump[statDataBytesReceived, b.pupLength - bytesPerPupHeader]; END; mark, aMark => StatIncr[statMarksReceived]; ENDCASE => Glitch[NeitherDataNorMark]; IF inputQueue.length = 0 AND sendAck THEN SendAck[him]; END; PktsAvailable: PUBLIC ENTRY PROCEDURE[him: Instance] RETURNS [BOOLEAN] = BEGIN OPEN him; ENABLE UNWIND => NULL; RETURN[ inputQueue.length # 0 ] END; Put: PUBLIC PROCEDURE [him: Instance, b: PupBuffer] = BEGIN OPEN him; IF CommFlags.doDebug AND b.requeueProcedure # BufferDefs.ReturnFreeBuffer THEN Glitch[BufferAlreadyRequeued]; SELECT state FROM open, talking => BEGIN IF CommFlags.doStats THEN BEGIN StatIncr[statDataPacketsSent]; StatBump[statDataBytesSent, b.pupLength - bytesPerPupHeader]; END; SendPacket[him, b]; END; idle, halfOpen => Glitch[StreamNotOpen]; ENDCASE --end, closed, finishing-- => StreamDied[him, b]; END; PutMark: PUBLIC PROCEDURE [him: Instance, byte: Byte] = BEGIN OPEN him; b: PupBuffer; b _ GetFreePupBuffer[]; b.pupBytes[0] _ byte; b.pupLength _ bytesPerPupHeader + 1; b.pupType _ aMark; SendPacket[him, b]; IF CommFlags.doStats THEN StatIncr[statMarksSent]; END; SendPacket: ENTRY PROCEDURE [him: Instance, b: PupBuffer] = BEGIN OPEN him; SELECT state FROM open, talking => BEGIN b.pupID _ Flop[nextOutputID]; IF b.pupLength = bytesPerPupHeader THEN probeCounter _ 1 ELSE BEGIN b.requeueProcedure _ LOOPHOLE[PutOnSentQueue]; b.requeueRef _ him; -- BEWARE: we rely on having another reference to the REF state _ talking; unackedPups _ unackedPups + 1; END; b.source _ local; b.dest _ remote; nextOutputID _ nextOutputID + (b.pupLength - bytesPerPupHeader); IF b.pupType = data -- we don't use mark, only aMark AND ~((maxOutputID - nextOutputID) > 0 AND allocatedPups > unackedPups) THEN b.pupType _ aData; IF b.pupType # data THEN -- aMark or aData BEGIN timer _ ProcessorFace.GetClockPulses[]; aDataOut _ TRUE; END; PupRouterSendThis[b]; IF b.pupType # data THEN WaitToSend[him ! UNWIND => NULL]; END; idle, halfOpen => Glitch[StreamNotOpen]; ENDCASE --end, closed, finishing-- => StreamDied[him, b ! UNWIND => NULL]; END; WaitToSend: INTERNAL PROCEDURE[him: Instance] = BEGIN OPEN him; SELECT state FROM open, talking => BEGIN <> DO SELECT state FROM open => EXIT; talking => WAIT stateChange; ENDCASE => StreamDied[him, NIL]; ENDLOOP; <> UNTIL (maxOutputID - nextOutputID) > 0 AND allocatedPups > unackedPups DO SELECT state FROM <> open, talking => WAIT stateChange; ENDCASE => StreamDied[him, NIL]; ENDLOOP; IF throttle > 0 THEN ThrottleBack[him]; END; idle, halfOpen => Glitch[StreamNotOpen]; ENDCASE --end, closed, finishing-- => StreamDied[him, NIL]; END; PutOnSentQueue: PROCEDURE [b: PupBuffer] = { ReallyPutOnSentQueue[NARROW[b.requeueRef], b] }; ReallyPutOnSentQueue: ENTRY PROCEDURE [him: Instance, b: PupBuffer] = BEGIN OPEN him; IF ackedID > Flip[b.pupID] THEN BEGIN IF (unackedPups _ unackedPups - 1) = 0 THEN BEGIN SELECT state FROM -- last packet has been ACKed talking => state _ open; finishing => state _ end; ENDCASE => SendAbort[him]; BROADCAST stateChange; END; ReturnFreePupBuffer[b]; END ELSE IF sentBuffer = NIL THEN sentBuffer _ b ELSE EnqueuePup[sentQueue, b]; END; Slurp: PUBLIC PROCEDURE[him: Instance] = BEGIN OPEN him; b: PupBuffer; UNTIL pleaseDie DO b _ socket.get[]; IF b # NIL THEN InputPacket[him, b]; ENDLOOP; END; InputPacket: ENTRY PROCEDURE [him: Instance, b: PupBuffer] = BEGIN OPEN him; thisID: LONG INTEGER _ Flip[b.pupID]; ackWantedBefore: BOOLEAN = sendAck; c _ b; IF ~(b.pupType = rfc OR b.pupType = error) AND b.source.socket # remote.socket THEN BEGIN IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadSource]; ReturnFreePupBuffer[b]; c _ NIL; END; SELECT b.pupType FROM data, aData, mark, aMark => SELECT state FROM open, talking, end => BEGIN SELECT b.pupType FROM aData, aMark => sendAck _ TRUE; ENDCASE; IF b.pupLength > bytesPerPupHeader THEN BEGIN offset: INTEGER _ Diff[thisID, nextInputID]; SELECT offset FROM 0 => BEGIN -- nice - just what we wanted nextInputID _ nextInputID + (c.pupLength - bytesPerPupHeader); EnqueuePup[inputQueue, c]; NOTIFY inputReady; c _ NIL; END; ENDCASE => BEGIN IF CommFlags.doStats THEN SELECT offset FROM IN (0..duplicateWindow] => StatIncr[statDataPacketsReceivedEarly]; IN (-duplicateWindow..0) => StatIncr[statDataPacketsReceivedAgain]; ENDCASE => StatIncr[statDataPacketsReceivedVeryLate]; ReturnFreePupBuffer[c]; c _ NIL; END; END ELSE -- funny length from way back there BEGIN IF CommFlags.doStats THEN IF b.pupLength = bytesPerPupHeader AND b.pupType = aData THEN BEGIN StatIncr[statProbesReceived]; SendAck[him]; END ELSE StatIncr[statEmptyFunnys]; END; <> IF sendAck AND (inputQueue.length = 0 OR ackWantedBefore) THEN SendAck[him]; END; halfOpen => SendRfc[him]; ENDCASE --idle, closed, finishing-- => SendAbort[him]; ack => BEGIN IF b.pupLength < bytesPerAck THEN BEGIN IF CommFlags.doStats THEN StatIncr[statMouseTrap]; GOTO SkipThisAck; END; IF CommFlags.doStats THEN StatIncr[statAcksReceived]; <> IF (thisID - ackedID) < 0 OR (thisID = ackedID AND probeCounter = 0) THEN BEGIN IF CommFlags.doStats THEN StatIncr[statDuplicateAcks]; GOTO SkipThisAck; END; probeCounter _ 0; IF aDataOut THEN BEGIN myRetrTime, responseTime: PupPktOps.Pulses; responseTime _ ProcessorFace.GetClockPulses[] - timer; responseTime _ MIN[responseTime, maxRetransmitPulses]; <> <> <> myRetrTime _ retransmitPulses; myRetrTime _ (6*myRetrTime + myRetrTime + 2*responseTime)/8; myRetrTime _ MAX[minRetransmitPulses, MIN[myRetrTime, maxRetransmitPulses]]; retransmitPulses _ myRetrTime; aDataOut _ FALSE; clumpsSinceBump _ clumpsSinceBump + 1; IF clumpsSinceBump > clumpsBeforeBump THEN ThrottleForward[him]; END; IF allocatedPups = 0 THEN BROADCAST stateChange; hisMaxAllocate _ b.numberOfPupsAhead; IF hisMaxAllocate = 0 THEN BEGIN probeCounter _ 1; IF CommFlags.doStats THEN StatIncr[statEmptyAlloc]; END; allocatedPups _ MIN[hisMaxAllocate, pathMaxAllocate]; IF outEnd = 0 THEN BROADCAST stateChange; -- in case first time outEnd _ MIN[b.maximumBytesPerPup, dataBytesPerPup]; IF ~sameNet THEN outEnd _ MIN[outEnd, PupTypes.maxDataBytesPerGatewayPup]; IF (thisID - maxOutputID) + b.numberOfBytesAhead > 0 THEN maxOutputID _ b.numberOfBytesAhead + thisID; ackedID _ thisID; IF sentBuffer = NIL THEN sentBuffer _ DequeuePup[sentQueue]; WHILE sentBuffer # NIL AND thisID > Flip[sentBuffer.pupID] DO IF (unackedPups _ unackedPups - 1) = 0 THEN BEGIN SELECT state FROM -- last packet has been ACKed talking => state _ open; finishing => state _ end; ENDCASE => SendAbort[him]; BROADCAST stateChange; END; ReturnFreePupBuffer[sentBuffer]; sentBuffer _ DequeuePup[sentQueue]; ENDLOOP; UNTIL sentBuffer = NIL DO PupRouterSendThis[sentBuffer]; IF CommFlags.doStats THEN StatIncr[statDataPacketsRetransmitted]; sentBuffer _ DequeuePup[sentQueue]; ENDLOOP; EXITS SkipThisAck => NULL; END; ENDCASE => GotOther[him, c]; IF c # NIL THEN BEGIN ReturnFreePupBuffer[c]; c _ NIL; END; END; SendAttention: PUBLIC ENTRY PROCEDURE[him: Instance] = BEGIN OPEN him; ENABLE UNWIND => NULL; WHILE outIntPending DO WAIT stateChange; ENDLOOP; outIntPending _ TRUE; SendInt[him]; END; WaitForAttention: PUBLIC ENTRY PROCEDURE[him: Instance] = BEGIN OPEN him; ENABLE UNWIND => NULL; WHILE seenIntSeq = inIntSeq DO WAIT waitingForInterrupt; ENDLOOP; seenIntSeq _ seenIntSeq + 1; END; END.