DIRECTORY
Process USING [SetTimeout, MsecToTicks],
ProcessorFace USING [GetClockPulses],
StatsDefs USING [StatBump, StatIncr],
CommFlags USING [doDebug, doStats],
DriverDefs USING [MaybeGetFreePupBuffer, Glitch],
PupStream USING [StreamClosing],
PupPktDefs,
PupPktOps,
PupDefs
USING [
Byte, PupBuffer, DequeuePup, EnqueuePup, GetFreePupBuffer,
ReturnFreePupBuffer, PupRouterSendThis],
BufferDefs USING [ReturnFreeBuffer],
PupTypes USING [PupAddress, maxDataBytesPerGatewayPup];
PupPktHot:
MONITOR
LOCKS him
USING him: Instance
IMPORTS
Process, ProcessorFace, StatsDefs, PupStream, DriverDefs, PupPktOps, PupDefs, BufferDefs
EXPORTS PupPktDefs, PupPktOps =
BEGIN OPEN StatsDefs, DriverDefs, PupPktOps, PupDefs;
PupPktStreamObject: PUBLIC TYPE = PupPktOps.InstanceData;
NeitherDataNorMark: PUBLIC ERROR = CODE;
BufferAlreadyRequeued: PUBLIC ERROR = CODE;
StreamNotOpen: PUBLIC ERROR = CODE;
Only called by InputPacket
Diff:
PROCEDURE [a, b:
LONG
INTEGER]
RETURNS [
INTEGER] =
BEGIN
maxInteger: INTEGER = 77777B;
temp: LONG INTEGER ← a - b;
SELECT
TRUE
FROM
temp > maxInteger => RETURN[maxInteger];
temp < -maxInteger => RETURN[-maxInteger];
ENDCASE => RETURN[temp];
END;
Retransmitter:
PUBLIC
ENTRY
PROCEDURE[him: Instance] =
Retransmit things which have not been acknowledged in a reasonable time.
Such things include RFCs and ENDs as well as data.
BEGIN OPEN him;
now: PupPktOps.Pulses;
UNTIL pleaseDie
DO
now ← ProcessorFace.GetClockPulses[];
SELECT state
FROM
open =>
BEGIN
IF now - timer > pingPulses
AND ping
THEN
BEGIN
probeCounter ← pingRetransmissions;
allocatedPups ← 0; -- will start probing
END;
IF now - timer > ctlRetransmitPulses
AND
(outEnd = 0 OR allocatedPups = 0) THEN ProbeForAck[him];
END;
talking, finishing =>
BEGIN
DO
recycle things that have timed out
IF sentBuffer = NIL THEN sentBuffer ← DequeuePup[sentQueue];
IF sentBuffer = NIL THEN EXIT;
IF ackedID - Flip[sentBuffer.pupID] > 0
THEN
BEGIN -- this packet has been ACKed already
IF (unackedPups ← unackedPups - 1) = 0
THEN
BEGIN
SELECT state FROM -- last packet has been ACKed
talking => state ← open;
finishing => state ← end;
ENDCASE;
BROADCAST stateChange;
END;
ReturnFreePupBuffer[sentBuffer];
sentBuffer ← NIL;
END
ELSE EXIT;
ENDLOOP;
IF sentBuffer #
NIL
AND now - timer > retransmitPulses
THEN
BEGIN
ProbeForAck[him];
IF now - timer > retransmitPulses
THEN
BEGIN -- couldn't get buffer, use one of ours
IF sentBuffer.pupType = data THEN sentBuffer.pupType ← aData;
timer ← ProcessorFace.GetClockPulses[];
PupRouterSendThis[sentBuffer];
IF CommFlags.doStats THEN StatIncr[statDataPacketsRetransmitted];
sentBuffer ← DequeuePup[sentQueue];
END;
END;
END;
halfOpen => IF now - timer > ctlRetransmitPulses THEN SendRfc[him];
end => IF now - timer > ctlRetransmitPulses THEN SendEnd[him];
closed =>
BEGIN
DO
flush anything left on the sentQueue
IF sentBuffer = NIL THEN sentBuffer ← DequeuePup[sentQueue];
IF sentBuffer = NIL THEN EXIT;
unackedPups ← unackedPups - 1;
ReturnFreePupBuffer[sentBuffer];
sentBuffer ← NIL;
ENDLOOP;
END;
ENDCASE;
IF outIntPending
AND now - outIntTime > ctlRetransmitPulses
THEN
SendInt[him];
WAIT retransmitterReady;
ENDLOOP;
UNTIL unackedPups = 0
DO
DO
flush anything left on the sentQueue
IF sentBuffer = NIL THEN sentBuffer ← DequeuePup[sentQueue];
IF sentBuffer = NIL THEN EXIT;
unackedPups ← unackedPups - 1;
ReturnFreePupBuffer[sentBuffer];
sentBuffer ← NIL;
ENDLOOP;
If a buffer is on a device output queue, wait until it comes back.
IF unackedPups # 0 THEN WAIT retransmitterReady;
ENDLOOP;
END;
ProbeForAck:
INTERNAL
PROCEDURE[him: Instance] =
BEGIN OPEN him;
b: PupBuffer;
IF (b ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN;
b.pupLength ← bytesPerPupHeader;
b.pupType ← aData;
b.pupID ← Flop[nextOutputID];
b.source ← local;
b.dest ← remote;
timer ← ProcessorFace.GetClockPulses[];
aDataOut ← FALSE;
clumpsSinceBump ← 0;
PupRouterSendThis[b];
IF CommFlags.doStats THEN StatIncr[statProbesSent];
IF (probeCounter ← probeCounter + 1) > retransmitionsBeforeAbort
THEN
SmashClosed[him, transmissionTimeout];
IF probeCounter > probesBeforePanic
THEN
retransmitPulses ←
MIN[2*retransmitPulses, maxRetransmitPulses];
END;
ThrottleForward:
PROCEDURE[him: Instance] =
BEGIN OPEN him;
old: CARDINAL ← pathMaxAllocate;
clumpsSinceBump ← 0;
We can actually get one packet ahead at this point.
IF retransmitPulses = maxRetransmitPulses THEN RETURN;
pathMaxAllocate ← MIN[pathMaxAllocate + 1, myMaxAllocate];
retransmitPulses ← (retransmitPulses*pathMaxAllocate)/old;
END;
ThrottleBack:
INTERNAL
PROCEDURE[him: Instance] =
BEGIN OPEN him;
old: CARDINAL;
IF pathMaxAllocate = 1
THEN
BEGIN -- This is a desperate attempt to avoid an instability
It is/(was?) also a nasty bug under some strange case that Andrew found
pause: CONDITION;
Process.SetTimeout[@pause, Process.MsecToTicks[maxRetransmitTime]];
WAIT pause;
END;
UNTIL throttle = 0
OR retransmitPulses = minRetransmitPulses
OR
pathMaxAllocate = 1 DO
old ← pathMaxAllocate;
pathMaxAllocate ← pathMaxAllocate - 1;
Beware of rounding down, assume return ack takes as long as a send packet
This goes unstable if much of the time is due to somebody else's packets
retransmitPulses ← ((retransmitPulses+old)*old)/(old+1);
throttle ← throttle - 1;
ENDLOOP;
clumpsSinceBump ← throttle ← 0;
END;
SendAck:
INTERNAL
PROCEDURE[him: Instance] =
BEGIN OPEN him;
b: PupBuffer;
IF c # NIL THEN BEGIN b ← c; c ← NIL; END
ELSE IF (b ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN;
b.pupBody ← ack[
dataBytesPerPup, MAX[0, INTEGER[myMaxAllocate - inputQueue.length]],
byteAllocate];
allocatedID ← nextInputID + byteAllocate;
b.pupType ← ack;
b.pupID ← Flop[nextInputID];
b.pupLength ← bytesPerAck;
b.source ← local;
b.dest ← remote;
PupRouterSendThis[b];
IF CommFlags.doStats THEN StatIncr[statAcksSent];
IF state = open
AND outEnd # 0
AND allocatedPups # 0
THEN
timer ← ProcessorFace.GetClockPulses[]; -- avoid pinging
sendAck ← FALSE;
END;
Get:
PUBLIC
ENTRY
PROCEDURE[him: Instance]
RETURNS [b: PupBuffer] =
BEGIN OPEN him;
ENABLE UNWIND => NULL;
IF inputQueue.length = 0
THEN
SELECT
TRUE
FROM
(state = closed) => ERROR PupStream.StreamClosing[whyClosed, text];
dontWait => RETURN[NIL];
ENDCASE => WAIT inputReady;
IF inputQueue.length # 0 THEN b ← DequeuePup[inputQueue] ELSE b ← NIL;
IF b =
NIL
THEN
BEGIN
IF state = closed THEN ERROR PupStream.StreamClosing[whyClosed, text];
RETURN;
END;
IF CommFlags.doStats
THEN
SELECT b.pupType
FROM
data, aData =>
BEGIN
StatIncr[statDataPacketsReceived];
StatBump[statDataBytesReceived, b.pupLength - bytesPerPupHeader];
END;
mark, aMark => StatIncr[statMarksReceived];
ENDCASE => Glitch[NeitherDataNorMark];
IF inputQueue.length = 0 AND sendAck THEN SendAck[him];
END;
PktsAvailable:
PUBLIC
ENTRY
PROCEDURE[him: Instance]
RETURNS [
BOOLEAN] =
BEGIN OPEN him;
ENABLE UNWIND => NULL;
RETURN[ inputQueue.length # 0 ]
END;
Put:
PUBLIC
PROCEDURE [him: Instance, b: PupBuffer] =
BEGIN OPEN him;
IF CommFlags.doDebug
AND b.requeueProcedure # BufferDefs.ReturnFreeBuffer
THEN
Glitch[BufferAlreadyRequeued];
SELECT state
FROM
open, talking =>
BEGIN
IF CommFlags.doStats
THEN
BEGIN
StatIncr[statDataPacketsSent];
StatBump[statDataBytesSent, b.pupLength - bytesPerPupHeader];
END;
SendPacket[him, b];
END;
idle, halfOpen => Glitch[StreamNotOpen];
ENDCASE --end, closed, finishing-- => StreamDied[him, b];
END;
PutMark:
PUBLIC
PROCEDURE [him: Instance, byte: Byte] =
BEGIN OPEN him;
b: PupBuffer;
b ← GetFreePupBuffer[];
b.pupBytes[0] ← byte;
b.pupLength ← bytesPerPupHeader + 1;
b.pupType ← aMark;
SendPacket[him, b];
IF CommFlags.doStats THEN StatIncr[statMarksSent];
END;
SendPacket:
ENTRY
PROCEDURE [him: Instance, b: PupBuffer] =
BEGIN OPEN him;
SELECT state
FROM
open, talking =>
BEGIN
b.pupID ← Flop[nextOutputID];
IF b.pupLength = bytesPerPupHeader THEN probeCounter ← 1
ELSE
BEGIN
b.requeueProcedure ← LOOPHOLE[PutOnSentQueue];
b.requeueRef ← him; -- BEWARE: we rely on having another reference to the REF
state ← talking;
unackedPups ← unackedPups + 1;
END;
b.source ← local;
b.dest ← remote;
nextOutputID ← nextOutputID + (b.pupLength - bytesPerPupHeader);
IF b.pupType = data
-- we don't use mark, only aMark
AND ~((maxOutputID - nextOutputID) > 0 AND allocatedPups > unackedPups)
THEN b.pupType ← aData;
IF b.pupType # data
THEN
-- aMark or aData
BEGIN timer ← ProcessorFace.GetClockPulses[]; aDataOut ← TRUE; END;
PupRouterSendThis[b];
IF b.pupType # data THEN WaitToSend[him ! UNWIND => NULL];
END;
idle, halfOpen => Glitch[StreamNotOpen];
ENDCASE --end, closed, finishing-- => StreamDied[him, b ! UNWIND => NULL];
END;
WaitToSend:
INTERNAL
PROCEDURE[him: Instance] =
BEGIN OPEN him;
SELECT state
FROM
open, talking =>
BEGIN
Wait until all our packets have been acked so we don't shoot them down.
DO
SELECT state
FROM
open => EXIT;
talking => WAIT stateChange;
ENDCASE => StreamDied[him, NIL];
ENDLOOP;
now wait for allocate
UNTIL (maxOutputID - nextOutputID) > 0
AND allocatedPups > unackedPups
DO
SELECT state
FROM
not really, but .....
open, talking => WAIT stateChange;
ENDCASE => StreamDied[him, NIL];
ENDLOOP;
IF throttle > 0 THEN ThrottleBack[him];
END;
idle, halfOpen => Glitch[StreamNotOpen];
ENDCASE --end, closed, finishing-- => StreamDied[him, NIL];
END;
PutOnSentQueue:
PROCEDURE [b: PupBuffer] =
{ ReallyPutOnSentQueue[NARROW[b.requeueRef], b] };
ReallyPutOnSentQueue:
ENTRY
PROCEDURE [him: Instance, b: PupBuffer] =
BEGIN OPEN him;
IF ackedID > Flip[b.pupID]
THEN
BEGIN
IF (unackedPups ← unackedPups - 1) = 0
THEN
BEGIN
SELECT state FROM -- last packet has been ACKed
talking => state ← open;
finishing => state ← end;
ENDCASE => SendAbort[him];
BROADCAST stateChange;
END;
ReturnFreePupBuffer[b];
END
ELSE IF sentBuffer = NIL THEN sentBuffer ← b ELSE EnqueuePup[sentQueue, b];
END;
Slurp:
PUBLIC
PROCEDURE[him: Instance] =
BEGIN OPEN him;
b: PupBuffer;
UNTIL pleaseDie
DO
b ← socket.get[];
IF b # NIL THEN InputPacket[him, b];
ENDLOOP;
END;
InputPacket:
ENTRY
PROCEDURE [him: Instance, b: PupBuffer] =
BEGIN OPEN him;
thisID: LONG INTEGER ← Flip[b.pupID];
ackWantedBefore: BOOLEAN = sendAck;
c ← b;
IF ~(b.pupType = rfc
OR b.pupType = error)
AND b.source.socket # remote.socket
THEN
BEGIN
IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadSource];
ReturnFreePupBuffer[b];
c ← NIL;
END;
SELECT b.pupType FROM
data, aData, mark, aMark =>
SELECT state
FROM
open, talking, end =>
BEGIN
SELECT b.pupType FROM aData, aMark => sendAck ← TRUE; ENDCASE;
IF b.pupLength > bytesPerPupHeader
THEN
BEGIN
offset: INTEGER ← Diff[thisID, nextInputID];
SELECT offset
FROM
0 =>
BEGIN -- nice - just what we wanted
nextInputID ← nextInputID + (c.pupLength - bytesPerPupHeader);
EnqueuePup[inputQueue, c];
NOTIFY inputReady;
c ← NIL;
END;
ENDCASE =>
BEGIN
IF CommFlags.doStats
THEN
SELECT offset
FROM
IN (0..duplicateWindow] =>
StatIncr[statDataPacketsReceivedEarly];
IN (-duplicateWindow..0) =>
StatIncr[statDataPacketsReceivedAgain];
ENDCASE => StatIncr[statDataPacketsReceivedVeryLate];
ReturnFreePupBuffer[c];
c ← NIL;
END;
END
ELSE
-- funny length from way back there
BEGIN
IF CommFlags.doStats
THEN
IF b.pupLength = bytesPerPupHeader
AND b.pupType = aData
THEN
BEGIN StatIncr[statProbesReceived]; SendAck[him]; END
ELSE StatIncr[statEmptyFunnys];
END;
answer probes immediately
IF sendAck
AND (inputQueue.length = 0
OR ackWantedBefore)
THEN
SendAck[him];
END;
halfOpen => SendRfc[him];
ENDCASE --idle, closed, finishing-- => SendAbort[him];
ack =>
BEGIN
IF b.pupLength < bytesPerAck
THEN
BEGIN
IF CommFlags.doStats THEN StatIncr[statMouseTrap];
GOTO SkipThisAck;
END;
IF CommFlags.doStats THEN StatIncr[statAcksReceived];
Try to avoid the funny stable SLOW case
IF (thisID - ackedID) < 0
OR (thisID = ackedID
AND probeCounter = 0)
THEN
BEGIN
IF CommFlags.doStats THEN StatIncr[statDuplicateAcks];
GOTO SkipThisAck;
END;
probeCounter ← 0;
IF aDataOut
THEN
BEGIN
myRetrTime, responseTime: PupPktOps.Pulses;
responseTime ← ProcessorFace.GetClockPulses[] - timer;
responseTime ← MIN[responseTime, maxRetransmitPulses];
retransmitPulses ← 2*Smooth[responseTime]
retransmitPulses ← 2*((7/8)*(retransmitPulses/2)+(1/8)*responseTime)
retransmitPulses ← (7*retransmitPulses+2*responseTime)/8
myRetrTime ← retransmitPulses;
myRetrTime ← (6*myRetrTime + myRetrTime + 2*responseTime)/8;
myRetrTime ←
MAX[minRetransmitPulses, MIN[myRetrTime, maxRetransmitPulses]];
retransmitPulses ← myRetrTime;
aDataOut ← FALSE;
clumpsSinceBump ← clumpsSinceBump + 1;
IF clumpsSinceBump > clumpsBeforeBump THEN ThrottleForward[him];
END;
IF allocatedPups = 0 THEN BROADCAST stateChange;
hisMaxAllocate ← b.numberOfPupsAhead;
IF hisMaxAllocate = 0
THEN
BEGIN
probeCounter ← 1;
IF CommFlags.doStats THEN StatIncr[statEmptyAlloc];
END;
allocatedPups ← MIN[hisMaxAllocate, pathMaxAllocate];
IF outEnd = 0 THEN BROADCAST stateChange; -- in case first time
outEnd ← MIN[b.maximumBytesPerPup, dataBytesPerPup];
IF ~sameNet THEN outEnd ← MIN[outEnd, PupTypes.maxDataBytesPerGatewayPup];
IF (thisID - maxOutputID) + b.numberOfBytesAhead > 0
THEN
maxOutputID ← b.numberOfBytesAhead + thisID;
ackedID ← thisID;
IF sentBuffer = NIL THEN sentBuffer ← DequeuePup[sentQueue];
WHILE sentBuffer #
NIL
AND thisID > Flip[sentBuffer.pupID]
DO
IF (unackedPups ← unackedPups - 1) = 0
THEN
BEGIN
SELECT state
FROM
-- last packet has been ACKed
talking => state ← open;
finishing => state ← end;
ENDCASE => SendAbort[him];
BROADCAST stateChange;
END;
ReturnFreePupBuffer[sentBuffer];
sentBuffer ← DequeuePup[sentQueue];
ENDLOOP;
UNTIL sentBuffer =
NIL
DO
PupRouterSendThis[sentBuffer];
IF CommFlags.doStats THEN StatIncr[statDataPacketsRetransmitted];
sentBuffer ← DequeuePup[sentQueue];
ENDLOOP;
EXITS SkipThisAck => NULL;
END;
ENDCASE => GotOther[him, c];
IF c # NIL THEN BEGIN ReturnFreePupBuffer[c]; c ← NIL; END;
END;
SendAttention:
PUBLIC
ENTRY
PROCEDURE[him: Instance] =
BEGIN OPEN him;
ENABLE UNWIND => NULL;
WHILE outIntPending DO WAIT stateChange; ENDLOOP;
outIntPending ← TRUE;
SendInt[him];
END;
WaitForAttention:
PUBLIC
ENTRY
PROCEDURE[him: Instance] =
BEGIN OPEN him;
ENABLE UNWIND => NULL;
WHILE seenIntSeq = inIntSeq DO WAIT waitingForInterrupt; ENDLOOP;
seenIntSeq ← seenIntSeq + 1;
END;
END.