-- PupPktHot.mesa
-- Copyright © 1984, 1985 by Xerox Corporation. All rights reserved.
-- HGM March 14, 1981 11:40 AM
-- Andrew Birrell June 23, 1983 4:31 pm
-- Levin, August 9, 1983 9:44 am
-- Russ Atkinson, February 4, 1985 12:40:12 pm PST
DIRECTORY
Process USING [SetTimeout, MsecToTicks],
ProcessorFace USING [GetClockPulses],
StatsDefs USING [StatBump, StatIncr],
CommFlags USING [doDebug, doStats],
DriverDefs USING [MaybeGetFreePupBuffer, Glitch],
PupStream USING [StreamClosing],
PupPktDefs,
PupPktOps,
PupDefs USING [Byte, PupBuffer, DequeuePup, EnqueuePup, GetFreePupBuffer, ReturnFreePupBuffer, PupRouterSendThis],
BufferDefs USING [ReturnFreeBuffer],
PupTypes USING [PupAddress, maxDataBytesPerGatewayPup];
-- Monitor whose lock is taken on the Instance passed as `him` to each ENTRY procedure.
PupPktHot: MONITOR LOCKS him USING him: Instance
IMPORTS Process, ProcessorFace, StatsDefs, PupStream, DriverDefs, PupPktOps, PupDefs, BufferDefs
EXPORTS PupPktDefs, PupPktOps = {
OPEN StatsDefs, DriverDefs, PupPktOps, PupDefs;
-- Concrete representation of the exported stream object type.
PupPktStreamObject: PUBLIC TYPE = PupPktOps.InstanceData;
-- Passed to Glitch by Get (when CommFlags.doStats) if a dequeued pup is not data, aData, mark or aMark.
NeitherDataNorMark: PUBLIC ERROR = CODE;
-- Passed to Glitch by Put (when CommFlags.doDebug) if b.requeueProcedure is not BufferDefs.ReturnFreeBuffer.
BufferAlreadyRequeued: PUBLIC ERROR = CODE;
-- Passed to Glitch by Put, SendPacket and WaitToSend when him.state is idle or halfOpen.
StreamNotOpen: PUBLIC ERROR = CODE;
-- Only called by InputPacket
Diff: PROC [a, b: INT] RETURNS [INTEGER] = {
  -- Returns a - b, clamped to the symmetric range [-77777B .. 77777B].
  limit: INTEGER = 77777B;
  delta: INT = a - b;
  IF delta > limit THEN RETURN[limit];
  IF delta < -limit THEN RETURN[-limit];
  RETURN[delta];
  };
Retransmitter: PUBLIC ENTRY PROC [him: Instance] = {
-- Retransmit things which have not been acknowledged in a reasonable time.
-- Such things include RFCs and ENDs as well as data.
-- Makes one pass per iteration and then waits on him.retransmitterReady. Once him.pleaseDie
-- is set it falls into the final loop, which runs until him.unackedPups reaches 0.
UNTIL him.pleaseDie DO
now: PupPktOps.Pulses ← ProcessorFace.GetClockPulses[];
SELECT him.state FROM
open => {
-- when pinging is enabled and the timer is older than pingPulses, zero the allocation so the test below probes
IF now - him.timer > pingPulses AND him.ping THEN {
him.probeCounter ← pingRetransmissions;
him.allocatedPups ← 0; -- will start probing
};
-- probe once the control timeout has passed and we have either no outEnd or no allocation
IF now - him.timer > ctlRetransmitPulses AND
(him.outEnd = 0 OR him.allocatedPups = 0) THEN ProbeForAck[him];
};
talking, finishing => {
DO
-- recycle things that have timed out
IF him.sentBuffer = NIL THEN him.sentBuffer ← DequeuePup[him.sentQueue];
IF him.sentBuffer = NIL THEN EXIT;
IF him.ackedID - Flip[him.sentBuffer.pupID] > 0 THEN {
-- this packet has been ACKed already
IF (him.unackedPups ← him.unackedPups - 1) = 0 THEN {
-- last packet has been ACKed
SELECT him.state FROM
talking => him.state ← open;
finishing => him.state ← end;
ENDCASE;
BROADCAST him.stateChange;
};
ReturnFreePupBuffer[him.sentBuffer];
him.sentBuffer ← NIL;
}
ELSE EXIT;
ENDLOOP;
-- head of the sent queue is still unacknowledged and the retransmit interval has expired: try a probe first
IF him.sentBuffer # NIL AND now - him.timer > him.retransmitPulses THEN {
ProbeForAck[him];
-- ProbeForAck resets him.timer only when it obtained a buffer.
-- NOTE(review): this re-test uses the `now` sampled before the probe; confirm Pulses arithmetic behaves as intended here.
IF now - him.timer > him.retransmitPulses THEN {
-- couldn't get buffer, use one of ours
IF him.sentBuffer.pupType = data THEN him.sentBuffer.pupType ← aData;
him.timer ← ProcessorFace.GetClockPulses[];
PupRouterSendThis[him.sentBuffer];
IF CommFlags.doStats THEN StatIncr[statDataPacketsRetransmitted];
-- the buffer just sent is no longer ours to hold; pick up the next one from the queue
him.sentBuffer ← DequeuePup[him.sentQueue];
};
};
};
-- connection set-up and tear-down packets are repeated on the control timeout
halfOpen => IF now - him.timer > ctlRetransmitPulses THEN SendRfc[him];
end => IF now - him.timer > ctlRetransmitPulses THEN SendEnd[him];
closed => {
DO
-- flush anything left on the him.sentQueue
IF him.sentBuffer = NIL THEN him.sentBuffer ← DequeuePup[him.sentQueue];
IF him.sentBuffer = NIL THEN EXIT;
him.unackedPups ← him.unackedPups - 1;
ReturnFreePupBuffer[him.sentBuffer];
him.sentBuffer ← NIL;
ENDLOOP;
};
ENDCASE;
-- repeat a pending outgoing interrupt on the control timeout
IF him.outIntPending AND now - him.outIntTime > ctlRetransmitPulses THEN
SendInt[him];
WAIT him.retransmitterReady;
ENDLOOP;
-- him.pleaseDie is set: return every outstanding buffer before leaving
UNTIL him.unackedPups = 0 DO
DO
-- flush anything left on the him.sentQueue
IF him.sentBuffer = NIL THEN him.sentBuffer ← DequeuePup[him.sentQueue];
IF him.sentBuffer = NIL THEN EXIT;
him.unackedPups ← him.unackedPups - 1;
ReturnFreePupBuffer[him.sentBuffer];
him.sentBuffer ← NIL;
ENDLOOP;
-- If a buffer is on a device output queue, wait until it comes back.
IF him.unackedPups # 0 THEN WAIT him.retransmitterReady;
ENDLOOP;
};
ProbeForAck: INTERNAL PROC [him: Instance] = {
  -- Send a header-only aData pup carrying the next output ID to solicit an ack.
  -- Does nothing at all when no free buffer is available.
  probe: PupBuffer ← MaybeGetFreePupBuffer[];
  IF probe = NIL THEN RETURN;
  -- fill in the probe
  probe.pupType ← aData;
  probe.pupLength ← bytesPerPupHeader;
  probe.pupID ← Flop[him.nextOutputID];
  probe.source ← him.local;
  probe.dest ← him.remote;
  -- restart the timing state before the pup goes out
  him.timer ← ProcessorFace.GetClockPulses[];
  him.aDataOut ← FALSE;
  him.clumpsSinceBump ← 0;
  PupRouterSendThis[probe];
  IF CommFlags.doStats THEN StatIncr[statProbesSent];
  -- too many unanswered probes closes the stream
  him.probeCounter ← him.probeCounter + 1;
  IF him.probeCounter > retransmitionsBeforeAbort THEN
    SmashClosed[him, transmissionTimeout];
  -- after probesBeforePanic probes, double the retransmit interval up to its ceiling
  IF him.probeCounter > probesBeforePanic THEN
    him.retransmitPulses ← MIN[2*him.retransmitPulses, maxRetransmitPulses];
  };
ThrottleForward: INTERNAL PROC [him: Instance] = {
  -- Raise the path allocation by one pup (never above him.myMaxAllocate) and scale
  -- him.retransmitPulses by the same ratio. Resets him.clumpsSinceBump in every case.
  -- Declared INTERNAL because it updates monitor-protected fields of him: its only
  -- caller is the ENTRY procedure InputPacket, and INTERNAL lets the compiler check
  -- that the monitor lock is held, as it already does for ThrottleBack.
  old: CARDINAL ← him.pathMaxAllocate;
  him.clumpsSinceBump ← 0;
  -- We can actually get one packet ahead at this point.
  -- Leave the allocation alone while the retransmit interval sits at its ceiling.
  IF him.retransmitPulses = maxRetransmitPulses THEN RETURN;
  him.pathMaxAllocate ← MIN[him.pathMaxAllocate + 1, him.myMaxAllocate];
  him.retransmitPulses ← (him.retransmitPulses*him.pathMaxAllocate)/old;
  };
ThrottleBack: INTERNAL PROC [him: Instance] = {
  -- Give back path allocation, one pup per unit of him.throttle, shrinking
  -- him.retransmitPulses as it goes; clears him.throttle and him.clumpsSinceBump on exit.
  IF him.pathMaxAllocate = 1 THEN {
    -- This is a desperate attempt to avoid an instability
    -- It is/(was?) also a nasty bug under some strange case that Andrew found
    napTime: CONDITION;
    Process.SetTimeout[@napTime, Process.MsecToTicks[maxRetransmitTime]];
    WAIT napTime;
    };
  WHILE him.throttle # 0 AND him.retransmitPulses # minRetransmitPulses AND him.pathMaxAllocate # 1 DO
    before: CARDINAL ← him.pathMaxAllocate;
    him.pathMaxAllocate ← him.pathMaxAllocate - 1;
    -- Beware of rounding down, assume return ack takes as long as a send packet
    -- This goes unstable if much of the time is due to somebody else's packets
    him.retransmitPulses ← ((him.retransmitPulses + before)*before)/(before + 1);
    him.throttle ← him.throttle - 1;
    ENDLOOP;
  him.throttle ← 0;
  him.clumpsSinceBump ← 0;
  };
SendAck: INTERNAL PROC [him: Instance] = {
  -- Send an ack pup describing our current allocation. Reuses the buffer parked in
  -- him.c when there is one, otherwise takes a free buffer; gives up silently if none.
  reply: PupBuffer ← him.c;
  IF reply = NIL
    THEN {
      reply ← MaybeGetFreePupBuffer[];
      IF reply = NIL THEN RETURN;
      }
    ELSE him.c ← NIL;
  reply.pupBody ← ack[
    him.dataBytesPerPup, MAX[0, INTEGER[him.myMaxAllocate - him.inputQueue.length]],
    byteAllocate];
  him.allocatedID ← him.nextInputID + byteAllocate;
  reply.pupType ← ack;
  reply.pupID ← Flop[him.nextInputID];
  reply.pupLength ← bytesPerAck;
  reply.source ← him.local;
  reply.dest ← him.remote;
  PupRouterSendThis[reply];
  IF CommFlags.doStats THEN StatIncr[statAcksSent];
  IF him.state = open AND him.outEnd # 0 AND him.allocatedPups # 0 THEN
    him.timer ← ProcessorFace.GetClockPulses[]; -- avoid pinging
  him.sendAck ← FALSE;
  };
Get: PUBLIC ENTRY PROC [him: Instance] RETURNS [b: PupBuffer ← NIL] = {
  -- Return the next pup from him.inputQueue, or NIL if none is available.
  -- Raises PupStream.StreamClosing when the stream is closed and there is nothing to deliver.
  ENABLE UNWIND => NULL;
  IF him.inputQueue.length = 0 THEN {
    IF him.state = closed THEN ERROR PupStream.StreamClosing[him.whyClosed, him.text];
    IF him.dontWait THEN RETURN[NIL];
    -- wait once only; the queue is re-examined below
    WAIT him.inputReady;
    };
  IF him.inputQueue.length # 0 THEN b ← DequeuePup[him.inputQueue];
  IF b = NIL THEN {
    IF him.state = closed THEN ERROR PupStream.StreamClosing[him.whyClosed, him.text];
    RETURN;
    };
  IF CommFlags.doStats THEN
    SELECT b.pupType FROM
      mark, aMark => StatIncr[statMarksReceived];
      data, aData => {
        StatIncr[statDataPacketsReceived];
        StatBump[statDataBytesReceived, b.pupLength - bytesPerPupHeader];
        };
      ENDCASE => Glitch[NeitherDataNorMark];
  -- the queue has just been drained and an ack is owed: send it now
  IF him.inputQueue.length = 0 AND him.sendAck THEN SendAck[him];
  };
PktsAvailable: PUBLIC ENTRY PROC [him: Instance] RETURNS [BOOL] = {
  -- TRUE when him.inputQueue holds at least one pup.
  ENABLE UNWIND => NULL;
  somethingQueued: BOOL = (him.inputQueue.length # 0);
  RETURN[somethingQueued]
  };
Put: PUBLIC PROC [him: Instance, b: PupBuffer] = {
  -- Send the client's buffer b on the stream; the real work is done by SendPacket.
  IF CommFlags.doDebug THEN
    IF b.requeueProcedure # BufferDefs.ReturnFreeBuffer THEN Glitch[BufferAlreadyRequeued];
  SELECT him.state FROM
    idle, halfOpen => Glitch[StreamNotOpen];
    open, talking => {
      IF CommFlags.doStats THEN {
        StatIncr[statDataPacketsSent];
        StatBump[statDataBytesSent, b.pupLength - bytesPerPupHeader];
        };
      SendPacket[him, b];
      };
    ENDCASE --end, closed, finishing-- => StreamDied[him, b];
  };
PutMark: PUBLIC PROC [him: Instance, byte: Byte] = {
  -- Build a one-byte aMark pup holding `byte` and send it through SendPacket.
  markPup: PupBuffer ← GetFreePupBuffer[];
  markPup.pupType ← aMark;
  markPup.pupLength ← bytesPerPupHeader + 1;
  markPup.pupBytes[0] ← byte;
  SendPacket[him, markPup];
  IF CommFlags.doStats THEN StatIncr[statMarksSent];
  };
SendPacket: ENTRY PROC [him: Instance, b: PupBuffer] = {
-- Stamp b with the next output ID and our addresses and hand it to PupRouterSendThis.
-- In states idle/halfOpen it calls Glitch[StreamNotOpen]; in any remaining state it calls StreamDied[him, b].
SELECT him.state FROM
open, talking => {
b.pupID ← Flop[him.nextOutputID];
-- a header-only pup just sets probeCounter; anything longer is counted as unacked and
-- routed to PutOnSentQueue when the buffer is requeued
IF b.pupLength = bytesPerPupHeader
THEN him.probeCounter ← 1
ELSE {
b.requeueProcedure ← LOOPHOLE[PutOnSentQueue];
b.requeueRef ← him; -- BEWARE: we rely on having another reference to the REF
him.state ← talking;
him.unackedPups ← him.unackedPups + 1;
};
b.source ← him.local;
b.dest ← him.remote;
-- output IDs advance by the number of content bytes
him.nextOutputID ← him.nextOutputID + (b.pupLength - bytesPerPupHeader);
-- we don't use mark, only aMark
-- upgrade data to aData when either the byte window or the pup allocation is used up
IF b.pupType = data AND ~((him.maxOutputID - him.nextOutputID) > 0 AND him.allocatedPups > him.unackedPups) THEN b.pupType ← aData;
IF b.pupType # data THEN {
-- aMark or aData
him.timer ← ProcessorFace.GetClockPulses[];
him.aDataOut ← TRUE;
};
PupRouterSendThis[b];
-- anything other than plain data goes through WaitToSend before returning to the caller
IF b.pupType # data THEN WaitToSend[him ! UNWIND => NULL];
};
idle, halfOpen => Glitch[StreamNotOpen];
ENDCASE --end, closed, finishing-- => StreamDied[him, b ! UNWIND => NULL];
};
WaitToSend: INTERNAL PROC [him: Instance] = {
-- Block the sender until the state is open again and there is both byte window and pup allocation.
-- Calls StreamDied[him, NIL] if the stream leaves the open/talking states while waiting,
-- and Glitch[StreamNotOpen] if entered in idle or halfOpen.
SELECT him.state FROM
open, talking => {
-- Wait until all our packets have been acked so we don't shoot them down.
DO
SELECT him.state FROM
open => EXIT;
talking => WAIT him.stateChange;
ENDCASE => StreamDied[him, NIL];
ENDLOOP;
-- now wait for allocate
UNTIL (him.maxOutputID - him.nextOutputID) > 0 AND him.allocatedPups > him.unackedPups DO
SELECT him.state FROM
-- not really, but .....
open, talking => WAIT him.stateChange;
ENDCASE => StreamDied[him, NIL];
ENDLOOP;
-- apply any pending throttle request before letting the sender continue
IF him.throttle > 0 THEN ThrottleBack[him];
};
idle, halfOpen => Glitch[StreamNotOpen];
ENDCASE --end, closed, finishing-- => StreamDied[him, NIL];
};
PutOnSentQueue: PROC [b: PupBuffer] = {
  -- Requeue procedure installed by SendPacket: recover the owning Instance from
  -- b.requeueRef and pass the buffer to the monitored ReallyPutOnSentQueue.
  owner: Instance = NARROW[b.requeueRef];
  ReallyPutOnSentQueue[owner, b];
  };
ReallyPutOnSentQueue: ENTRY PROC [him: Instance, b: PupBuffer] = {
  -- Take back a buffer that has been sent: free it if it is already acknowledged,
  -- otherwise hold it in him.sentBuffer or, if that is occupied, on him.sentQueue.
  IF him.ackedID > Flip[b.pupID] THEN {
    him.unackedPups ← him.unackedPups - 1;
    IF him.unackedPups = 0 THEN {
      -- last packet has been ACKed
      SELECT him.state FROM
        talking => him.state ← open;
        finishing => him.state ← end;
        ENDCASE => SendAbort[him];
      BROADCAST him.stateChange;
      };
    ReturnFreePupBuffer[b];
    }
  ELSE IF him.sentBuffer = NIL THEN him.sentBuffer ← b
  ELSE EnqueuePup[him.sentQueue, b];
  };
Slurp: PUBLIC PROC [him: Instance] = {
  -- Pull pups from him.socket and feed each one to InputPacket until him.pleaseDie is set.
  WHILE ~him.pleaseDie DO
    arrived: PupBuffer ← him.socket.get[];
    IF arrived # NIL THEN InputPacket[him, arrived];
    ENDLOOP;
  };
InputPacket: ENTRY PROC [him: Instance, b: PupBuffer] = {
  -- Process one pup received on this stream's socket.
  --   data, aData, mark, aMark: queue in-sequence pups for the client, drop the rest,
  --     and send an ack when one is owed;
  --   ack: update round-trip timing and allocation, free acknowledged buffers and
  --     resend whatever is still unacknowledged;
  --   anything else: hand to GotOther.
  -- The buffer is parked in him.c so SendAck can reuse it; whatever is still in
  -- him.c at the end is returned to the free pool.
  ENABLE UNWIND => NULL;
  thisID: INT ← Flip[b.pupID];
  ackWantedBefore: BOOL = him.sendAck;
  him.c ← b;
  -- except for rfc and error pups, the source socket must be our partner's
  IF ~(b.pupType = rfc OR b.pupType = error) AND b.source.socket # him.remote.socket THEN {
    IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadSource];
    GO TO freeC;
    };
  SELECT b.pupType FROM
    data, aData, mark, aMark =>
      SELECT him.state FROM
        open, talking, end => {
          -- the "a" variants ask for an acknowledgement
          SELECT b.pupType FROM aData, aMark => him.sendAck ← TRUE; ENDCASE;
          IF b.pupLength > bytesPerPupHeader
            THEN {
              offset: INTEGER ← Diff[thisID, him.nextInputID];
              SELECT offset FROM
                0 => {
                  -- nice - just what we wanted
                  him.nextInputID ← him.nextInputID + (him.c.pupLength - bytesPerPupHeader);
                  EnqueuePup[him.inputQueue, him.c];
                  NOTIFY him.inputReady;
                  him.c ← NIL;
                  };
                ENDCASE => {
                  -- out of sequence: count it and throw it away
                  IF CommFlags.doStats THEN
                    SELECT offset FROM
                      IN (0..duplicateWindow] =>
                        StatIncr[statDataPacketsReceivedEarly];
                      IN (-duplicateWindow..0) =>
                        StatIncr[statDataPacketsReceivedAgain];
                      ENDCASE => StatIncr[statDataPacketsReceivedVeryLate];
                  ReturnFreePupBuffer[him.c];
                  him.c ← NIL;
                  };
              }
            ELSE {
              -- funny length from way back there
              -- A header-only aData is a probe and is acknowledged at once. The SendAck
              -- must not sit under the doStats test, or the protocol would behave
              -- differently depending on whether statistics are compiled in.
              IF b.pupLength = bytesPerPupHeader AND b.pupType = aData
                THEN {
                  IF CommFlags.doStats THEN StatIncr[statProbesReceived];
                  SendAck[him];
                  }
                ELSE IF CommFlags.doStats THEN StatIncr[statEmptyFunnys];
              };
          -- answer probes immediately
          IF him.sendAck AND (him.inputQueue.length = 0 OR ackWantedBefore) THEN
            SendAck[him];
          };
        halfOpen => SendRfc[him];
        ENDCASE --idle, closed, finishing-- => SendAbort[him];
    ack => {
      IF b.pupLength < bytesPerAck THEN {
        IF CommFlags.doStats THEN StatIncr[statMouseTrap];
        GOTO SkipThisAck;
        };
      IF CommFlags.doStats THEN StatIncr[statAcksReceived];
      -- Try to avoid the funny stable SLOW case
      IF (thisID - him.ackedID) < 0 OR (thisID = him.ackedID AND him.probeCounter = 0) THEN {
        IF CommFlags.doStats THEN StatIncr[statDuplicateAcks];
        GOTO SkipThisAck;
        };
      him.probeCounter ← 0;
      IF him.aDataOut THEN {
        -- this ack answers a timed aData/aMark: fold the response time into the retransmit interval
        myRetrTime: PupPktOps.Pulses ← him.retransmitPulses;
        responseTime: PupPktOps.Pulses ← ProcessorFace.GetClockPulses[] - him.timer;
        responseTime ← MIN[responseTime, maxRetransmitPulses];
        -- him.retransmitPulses ← 2*Smooth[responseTime]
        -- him.retransmitPulses ← 2*((7/8)*(him.retransmitPulses/2)+(1/8)*responseTime)
        -- him.retransmitPulses ← (7*him.retransmitPulses+2*responseTime)/8
        myRetrTime ← (6*myRetrTime + myRetrTime + 2*responseTime)/8;
        myRetrTime ← MAX[minRetransmitPulses, MIN[myRetrTime, maxRetransmitPulses]];
        him.retransmitPulses ← myRetrTime;
        him.aDataOut ← FALSE;
        him.clumpsSinceBump ← him.clumpsSinceBump + 1;
        IF him.clumpsSinceBump > clumpsBeforeBump THEN ThrottleForward[him];
        };
      -- wake senders that were waiting for an allocation
      IF him.allocatedPups = 0 THEN BROADCAST him.stateChange;
      him.hisMaxAllocate ← b.numberOfPupsAhead;
      IF him.hisMaxAllocate = 0 THEN {
        him.probeCounter ← 1;
        IF CommFlags.doStats THEN StatIncr[statEmptyAlloc];
        };
      him.allocatedPups ← MIN[him.hisMaxAllocate, him.pathMaxAllocate];
      IF him.outEnd = 0 THEN BROADCAST him.stateChange; -- in case first time
      him.outEnd ← MIN[b.maximumBytesPerPup, him.dataBytesPerPup];
      IF ~him.sameNet THEN
        him.outEnd ← MIN[him.outEnd, PupTypes.maxDataBytesPerGatewayPup];
      -- move the byte window forward only, never back
      IF (thisID - him.maxOutputID) + b.numberOfBytesAhead > 0 THEN
        him.maxOutputID ← b.numberOfBytesAhead + thisID;
      him.ackedID ← thisID;
      -- release every held buffer whose ID precedes the acknowledged ID
      IF him.sentBuffer = NIL THEN him.sentBuffer ← DequeuePup[him.sentQueue];
      WHILE him.sentBuffer # NIL AND thisID > Flip[him.sentBuffer.pupID] DO
        IF (him.unackedPups ← him.unackedPups - 1) = 0 THEN {
          SELECT him.state FROM -- last packet has been ACKed
            talking => him.state ← open;
            finishing => him.state ← end;
            ENDCASE => SendAbort[him];
          BROADCAST him.stateChange;
          };
        ReturnFreePupBuffer[him.sentBuffer];
        him.sentBuffer ← DequeuePup[him.sentQueue];
        ENDLOOP;
      -- whatever is left was not covered by this ack: send it again
      UNTIL him.sentBuffer = NIL DO
        PupRouterSendThis[him.sentBuffer];
        IF CommFlags.doStats THEN StatIncr[statDataPacketsRetransmitted];
        him.sentBuffer ← DequeuePup[him.sentQueue];
        ENDLOOP;
      EXITS SkipThisAck => NULL;
      };
    ENDCASE => GotOther[him, him.c];
  IF him.c # NIL THEN GO TO freeC;
  EXITS freeC => { ReturnFreePupBuffer[him.c]; him.c ← NIL; };
  };
SendAttention: PUBLIC ENTRY PROC [him: Instance] = {
  -- Wait until no outgoing interrupt is pending, then mark one pending and send it.
  ENABLE UNWIND => NULL;
  UNTIL ~him.outIntPending DO
    WAIT him.stateChange;
    ENDLOOP;
  him.outIntPending ← TRUE;
  SendInt[him];
  };
WaitForAttention: PUBLIC ENTRY PROC [him: Instance] = {
  -- Block until him.inIntSeq differs from him.seenIntSeq, then advance him.seenIntSeq by one.
  ENABLE UNWIND => NULL;
  UNTIL him.seenIntSeq # him.inIntSeq DO
    WAIT him.waitingForInterrupt;
    ENDLOOP;
  him.seenIntSeq ← him.seenIntSeq + 1;
  };
}.