-- PupPktHot.mesa
-- Copyright © 1984, 1985 by Xerox Corporation. All rights reserved.
-- HGM March 14, 1981 11:40 AM
-- Andrew Birrell June 23, 1983 4:31 pm
-- Levin, August 9, 1983 9:44 am
-- Russ Atkinson, February 4, 1985 12:40:12 pm PST
DIRECTORY
Process USING [SetTimeout, MsecToTicks],
ProcessorFace USING [GetClockPulses],
StatsDefs USING [StatBump, StatIncr],
CommFlags USING [doDebug, doStats],
DriverDefs USING [MaybeGetFreePupBuffer, Glitch],
PupStream USING [StreamClosing],
PupPktDefs,
PupPktOps,
PupDefs USING [Byte, PupBuffer, DequeuePup, EnqueuePup, GetFreePupBuffer, ReturnFreePupBuffer, PupRouterSendThis],
BufferDefs USING [ReturnFreeBuffer],
PupTypes USING [PupAddress, maxDataBytesPerGatewayPup];
PupPktHot:
MONITOR
LOCKS him
USING him: Instance
IMPORTS Process, ProcessorFace, StatsDefs, PupStream, DriverDefs, PupPktOps, PupDefs, BufferDefs
EXPORTS PupPktDefs, PupPktOps = {
OPEN StatsDefs, DriverDefs, PupPktOps, PupDefs;
-- Exported representation of the stream object: it is PupPktOps.InstanceData.
PupPktStreamObject: PUBLIC TYPE = PupPktOps.InstanceData;
-- Handed to Glitch by Get when CommFlags.doStats is set and a dequeued packet
-- is not one of data, aData, mark, aMark.
NeitherDataNorMark: PUBLIC ERROR = CODE;
-- Handed to Glitch by Put when CommFlags.doDebug is set and the buffer's
-- requeueProcedure is not BufferDefs.ReturnFreeBuffer.
BufferAlreadyRequeued: PUBLIC ERROR = CODE;
-- Handed to Glitch by Put, SendPacket and WaitToSend when the stream state is
-- idle or halfOpen.
StreamNotOpen: PUBLIC ERROR = CODE;
-- Diff: returns a - b, clamped to the closed range [-maxInteger .. maxInteger]
-- so that the result fits in an INTEGER.
-- Only called by InputPacket.
Diff: PROC [a, b: INT] RETURNS [INTEGER] = {
  maxInteger: INTEGER = 77777B;
  delta: INT = a - b;
  -- Saturate at either end instead of truncating the long difference.
  IF delta > maxInteger THEN RETURN[maxInteger];
  IF delta < -maxInteger THEN RETURN[-maxInteger];
  RETURN[delta];
  };
-- FlushSentBuffers: private helper for Retransmitter.
-- Returns the buffer parked in him.sentBuffer (if any) and every buffer on
-- him.sentQueue to the free pool, decrementing him.unackedPups once per buffer.
-- On return him.sentBuffer is NIL.  Caller must hold the monitor lock.
FlushSentBuffers: INTERNAL PROC [him: Instance] = {
  DO
    IF him.sentBuffer = NIL THEN him.sentBuffer ← DequeuePup[him.sentQueue];
    IF him.sentBuffer = NIL THEN EXIT;
    him.unackedPups ← him.unackedPups - 1;
    ReturnFreePupBuffer[him.sentBuffer];
    him.sentBuffer ← NIL;
    ENDLOOP;
  };

-- Retransmitter: body of the per-stream retransmission loop.
-- Retransmit things which have not been acknowledged in a reasonable time.
-- Such things include RFCs and ENDs as well as data.
-- Runs until him.pleaseDie is set, waiting on him.retransmitterReady between
-- passes; it then drains the sent queue and does not return until
-- him.unackedPups reaches 0.
Retransmitter: PUBLIC ENTRY PROC [him: Instance] = {
  UNTIL him.pleaseDie DO
    now: PupPktOps.Pulses ← ProcessorFace.GetClockPulses[];
    SELECT him.state FROM
      open => {
        -- Idle connection: optionally ping, and probe while we have no allocation.
        IF now - him.timer > pingPulses AND him.ping THEN {
          him.probeCounter ← pingRetransmissions;
          him.allocatedPups ← 0; -- will start probing
          };
        IF now - him.timer > ctlRetransmitPulses
          AND (him.outEnd = 0 OR him.allocatedPups = 0) THEN ProbeForAck[him];
        };
      talking, finishing => {
        DO
          -- recycle things that have timed out
          IF him.sentBuffer = NIL THEN him.sentBuffer ← DequeuePup[him.sentQueue];
          IF him.sentBuffer = NIL THEN EXIT;
          IF him.ackedID - Flip[him.sentBuffer.pupID] > 0 THEN {
            -- this packet has been ACKed already
            IF (him.unackedPups ← him.unackedPups - 1) = 0 THEN {
              -- last packet has been ACKed
              SELECT him.state FROM
                talking => him.state ← open;
                finishing => him.state ← end;
                ENDCASE;
              BROADCAST him.stateChange;
              };
            ReturnFreePupBuffer[him.sentBuffer];
            him.sentBuffer ← NIL;
            }
          ELSE EXIT;
          ENDLOOP;
        IF him.sentBuffer # NIL AND now - him.timer > him.retransmitPulses THEN {
          ProbeForAck[him];
          -- ProbeForAck resets him.timer only when it obtained a buffer, so the
          -- same test still succeeding means no probe went out.
          IF now - him.timer > him.retransmitPulses THEN {
            -- couldn't get buffer, use one of ours
            IF him.sentBuffer.pupType = data THEN him.sentBuffer.pupType ← aData;
            him.timer ← ProcessorFace.GetClockPulses[];
            PupRouterSendThis[him.sentBuffer];
            IF CommFlags.doStats THEN StatIncr[statDataPacketsRetransmitted];
            him.sentBuffer ← DequeuePup[him.sentQueue];
            };
          };
        };
      halfOpen => IF now - him.timer > ctlRetransmitPulses THEN SendRfc[him];
      end => IF now - him.timer > ctlRetransmitPulses THEN SendEnd[him];
      closed =>
        -- flush anything left on the him.sentQueue
        FlushSentBuffers[him];
      ENDCASE;
    IF him.outIntPending AND now - him.outIntTime > ctlRetransmitPulses THEN
      SendInt[him];
    WAIT him.retransmitterReady;
    ENDLOOP;
  UNTIL him.unackedPups = 0 DO
    -- flush anything left on the him.sentQueue
    FlushSentBuffers[him];
    -- If a buffer is on a device output queue, wait until it comes back.
    IF him.unackedPups # 0 THEN WAIT him.retransmitterReady;
    ENDLOOP;
  };
-- ProbeForAck: send a header-only aData packet carrying the next output ID,
-- restart him.timer, and count the probe.
-- If no free buffer is available it returns at once without touching him.
-- Once him.probeCounter exceeds retransmitionsBeforeAbort the stream is
-- smashed closed with transmissionTimeout; once it exceeds probesBeforePanic
-- the retransmit interval is doubled, capped at maxRetransmitPulses.
ProbeForAck: INTERNAL PROC [him: Instance] = {
  probe: PupBuffer = MaybeGetFreePupBuffer[];
  IF probe = NIL THEN RETURN;
  -- Build the probe: nothing but a Pup header.
  probe.source ← him.local;
  probe.dest ← him.remote;
  probe.pupType ← aData;
  probe.pupID ← Flop[him.nextOutputID];
  probe.pupLength ← bytesPerPupHeader;
  him.aDataOut ← FALSE;
  him.clumpsSinceBump ← 0;
  him.timer ← ProcessorFace.GetClockPulses[];
  PupRouterSendThis[probe];
  IF CommFlags.doStats THEN StatIncr[statProbesSent];
  him.probeCounter ← him.probeCounter + 1;
  IF him.probeCounter > retransmitionsBeforeAbort THEN
    SmashClosed[him, transmissionTimeout];
  IF him.probeCounter > probesBeforePanic THEN
    him.retransmitPulses ← MIN[2*him.retransmitPulses, maxRetransmitPulses];
  };
-- ThrottleForward: allow one more packet in flight on this path.
-- Resets him.clumpsSinceBump, then (unless the retransmit interval is already
-- at maxRetransmitPulses) raises him.pathMaxAllocate by one, capped at
-- him.myMaxAllocate, and scales him.retransmitPulses by new/old allocation.
-- Declared INTERNAL because it updates monitor-protected fields and is only
-- called from InputPacket, which already holds the lock.
ThrottleForward: INTERNAL PROC [him: Instance] = {
  old: CARDINAL ← him.pathMaxAllocate;
  him.clumpsSinceBump ← 0;
  -- We can actually get one packet ahead at this point.
  IF him.retransmitPulses = maxRetransmitPulses THEN RETURN;
  him.pathMaxAllocate ← MIN[him.pathMaxAllocate + 1, him.myMaxAllocate];
  him.retransmitPulses ← (him.retransmitPulses*him.pathMaxAllocate)/old;
  };
-- ThrottleBack: reduce the number of packets we allow in flight.
-- Each step lowers him.pathMaxAllocate by one, rescales him.retransmitPulses
-- and consumes one unit of him.throttle.  Stepping stops when throttle is
-- used up, the interval is at minRetransmitPulses, or the allocation is 1.
-- Both him.throttle and him.clumpsSinceBump are zero on return.
ThrottleBack: INTERNAL PROC [him: Instance] = {
  IF him.pathMaxAllocate = 1 THEN {
    -- Already at the floor: sit out one timeout on a private condition.
    -- This is a desperate attempt to avoid an instability.
    -- It is (or was) also a nasty bug under some strange case that Andrew found.
    pause: CONDITION;
    Process.SetTimeout[@pause, Process.MsecToTicks[maxRetransmitTime]];
    WAIT pause;
    };
  WHILE him.throttle # 0
    AND him.retransmitPulses # minRetransmitPulses
    AND him.pathMaxAllocate # 1 DO
    previous: CARDINAL = him.pathMaxAllocate;
    him.pathMaxAllocate ← him.pathMaxAllocate - 1;
    -- Beware of rounding down; assume the return ack takes as long as a send packet.
    -- This goes unstable if much of the time is due to somebody else's packets.
    him.retransmitPulses ← ((him.retransmitPulses + previous)*previous)/(previous + 1);
    him.throttle ← him.throttle - 1;
    ENDLOOP;
  him.throttle ← 0;
  him.clumpsSinceBump ← 0;
  };
-- SendAck: transmit an ack for everything received so far and clear him.sendAck.
-- The buffer held in him.c is reused when there is one (him.c becomes NIL);
-- otherwise a free buffer is requested, and if none is available the procedure
-- returns without sending and leaves him.sendAck unchanged.
SendAck: INTERNAL PROC [him: Instance] = {
  reply: PupBuffer ← him.c;
  IF reply = NIL THEN {
    reply ← MaybeGetFreePupBuffer[];
    IF reply = NIL THEN RETURN;
    }
  ELSE him.c ← NIL;
  -- Advertise our allocation; the pup count is floored at zero.
  reply.pupBody ← ack[
    him.dataBytesPerPup, MAX[0, INTEGER[him.myMaxAllocate - him.inputQueue.length]],
    byteAllocate];
  reply.source ← him.local;
  reply.dest ← him.remote;
  reply.pupLength ← bytesPerAck;
  reply.pupID ← Flop[him.nextInputID];
  reply.pupType ← ack;
  him.allocatedID ← him.nextInputID + byteAllocate;
  PupRouterSendThis[reply];
  IF CommFlags.doStats THEN StatIncr[statAcksSent];
  IF him.state = open AND him.outEnd # 0 AND him.allocatedPups # 0 THEN
    him.timer ← ProcessorFace.GetClockPulses[]; -- avoid pinging
  him.sendAck ← FALSE;
  };
-- Get: hand the client the next packet from him.inputQueue.
-- With an empty queue it raises PupStream.StreamClosing if the stream is
-- closed, returns NIL if him.dontWait is set, and otherwise waits once on
-- him.inputReady.  A NIL result after the wait means nothing arrived
-- (StreamClosing is raised instead if the stream closed meanwhile).
-- When the queue has just been drained and an ack is owed, the ack is sent.
Get: PUBLIC ENTRY PROC [him: Instance] RETURNS [b: PupBuffer ← NIL] = {
  ENABLE UNWIND => NULL;
  IF him.inputQueue.length = 0 THEN {
    IF him.state = closed THEN ERROR PupStream.StreamClosing[him.whyClosed, him.text];
    IF him.dontWait THEN RETURN[NIL];
    WAIT him.inputReady;
    };
  IF him.inputQueue.length # 0 THEN b ← DequeuePup[him.inputQueue];
  IF b = NIL THEN {
    IF him.state = closed THEN ERROR PupStream.StreamClosing[him.whyClosed, him.text];
    RETURN;
    };
  IF CommFlags.doStats THEN
    SELECT b.pupType FROM
      mark, aMark => StatIncr[statMarksReceived];
      data, aData => {
        StatIncr[statDataPacketsReceived];
        StatBump[statDataBytesReceived, b.pupLength - bytesPerPupHeader];
        };
      ENDCASE => Glitch[NeitherDataNorMark];
  IF him.inputQueue.length = 0 AND him.sendAck THEN SendAck[him];
  };
-- PktsAvailable: TRUE iff at least one packet is waiting on him.inputQueue.
PktsAvailable: PUBLIC ENTRY PROC [him: Instance] RETURNS [BOOL] = {
  ENABLE UNWIND => NULL;
  somethingQueued: BOOL = (him.inputQueue.length # 0);
  RETURN[somethingQueued]
  };
-- Put: client entry for sending a data buffer on the stream.
-- open, talking: count the packet (when statistics are on) and pass it to SendPacket.
-- idle, halfOpen: Glitch with StreamNotOpen.
-- any other state (end, closed, finishing): StreamDied deals with the buffer.
-- With CommFlags.doDebug set, a buffer whose requeueProcedure is not
-- BufferDefs.ReturnFreeBuffer is reported through Glitch first.
Put: PUBLIC PROC [him: Instance, b: PupBuffer] = {
  IF CommFlags.doDebug THEN
    IF b.requeueProcedure # BufferDefs.ReturnFreeBuffer THEN
      Glitch[BufferAlreadyRequeued];
  SELECT him.state FROM
    idle, halfOpen => Glitch[StreamNotOpen];
    open, talking => {
      IF CommFlags.doStats THEN {
        StatIncr[statDataPacketsSent];
        StatBump[statDataBytesSent, b.pupLength - bytesPerPupHeader];
        };
      SendPacket[him, b];
      };
    ENDCASE => StreamDied[him, b];
  };
-- PutMark: send a one-byte aMark packet whose single data byte is `byte`.
-- Obtains its own buffer with GetFreePupBuffer and sends it through SendPacket.
PutMark: PUBLIC PROC [him: Instance, byte: Byte] = {
  pkt: PupBuffer = GetFreePupBuffer[];
  pkt.pupType ← aMark;
  pkt.pupLength ← bytesPerPupHeader + 1;
  pkt.pupBytes[0] ← byte;
  SendPacket[him, pkt];
  IF CommFlags.doStats THEN StatIncr[statMarksSent];
  };
-- SendPacket: stamp b with the next output ID and the stream's addresses,
-- send it, and, when it went out as aData or aMark, wait in WaitToSend.
-- A header-only packet just sets him.probeCounter to 1; any other packet is
-- given PutOnSentQueue as its requeue procedure, counted in him.unackedPups,
-- and moves the stream to talking.
-- idle, halfOpen: Glitch with StreamNotOpen.  Other states: StreamDied.
SendPacket: ENTRY PROC [him: Instance, b: PupBuffer] = {
  SELECT him.state FROM
    open, talking => {
      wantsAck: BOOL ← FALSE;
      b.pupID ← Flop[him.nextOutputID];
      IF b.pupLength = bytesPerPupHeader THEN him.probeCounter ← 1
      ELSE {
        b.requeueProcedure ← LOOPHOLE[PutOnSentQueue];
        b.requeueRef ← him; -- BEWARE: we rely on having another reference to the REF
        him.state ← talking;
        him.unackedPups ← him.unackedPups + 1;
        };
      b.source ← him.local;
      b.dest ← him.remote;
      him.nextOutputID ← him.nextOutputID + (b.pupLength - bytesPerPupHeader);
      -- we don't use mark, only aMark
      -- Ask for an ack when this packet uses up the byte or packet allocation.
      IF b.pupType = data
        AND ~((him.maxOutputID - him.nextOutputID) > 0 AND him.allocatedPups > him.unackedPups)
        THEN b.pupType ← aData;
      -- Decide now: once the buffer has been given to the router it is no
      -- longer ours to inspect (a header-only packet keeps whatever requeue
      -- procedure it arrived with, so it may be recycled straight away).
      wantsAck ← (b.pupType # data);
      IF wantsAck THEN {
        -- aMark or aData
        him.timer ← ProcessorFace.GetClockPulses[];
        him.aDataOut ← TRUE;
        };
      PupRouterSendThis[b];
      IF wantsAck THEN WaitToSend[him ! UNWIND => NULL];
      };
    idle, halfOpen => Glitch[StreamNotOpen];
    ENDCASE --end, closed, finishing-- => StreamDied[him, b ! UNWIND => NULL];
  };
-- WaitToSend: block the sender until it may transmit again.
-- Phase 1 waits on him.stateChange until the stream is back in open, so all
-- our packets have been acked and we don't shoot them down.
-- Phase 2 waits until there is both byte and packet allocation.
-- Afterwards ThrottleBack is applied if him.throttle is positive.
-- idle, halfOpen on entry: Glitch with StreamNotOpen.  Any state other than
-- open or talking, on entry or while waiting: StreamDied[him, NIL].
WaitToSend: INTERNAL PROC [him: Instance] = {
  SELECT him.state FROM
    idle, halfOpen => Glitch[StreamNotOpen];
    open, talking => {
      -- Phase 1: everything outstanding must be acknowledged.
      UNTIL him.state = open DO
        IF him.state = talking THEN WAIT him.stateChange
        ELSE StreamDied[him, NIL];
        ENDLOOP;
      -- Phase 2: now wait for allocate.
      WHILE ~((him.maxOutputID - him.nextOutputID) > 0
        AND him.allocatedPups > him.unackedPups) DO
        -- talking is not really expected here, but is tolerated.
        IF him.state = open OR him.state = talking THEN WAIT him.stateChange
        ELSE StreamDied[him, NIL];
        ENDLOOP;
      IF him.throttle > 0 THEN ThrottleBack[him];
      };
    ENDCASE --end, closed, finishing-- => StreamDied[him, NIL];
  };
-- PutOnSentQueue: requeue procedure installed on outgoing buffers by SendPacket.
-- Recovers the owning stream from b.requeueRef and forwards to the ENTRY
-- procedure that does the real work.
PutOnSentQueue: PROC [b: PupBuffer] = {
  owner: Instance = NARROW[b.requeueRef];
  ReallyPutOnSentQueue[owner, b];
  };
-- ReallyPutOnSentQueue: take back a buffer after it has been transmitted.
-- If the packet is already acknowledged it is freed and him.unackedPups is
-- decremented; when that reaches zero the stream moves talking to open or
-- finishing to end (any other state sends an abort) and him.stateChange is
-- broadcast.  Otherwise the buffer is parked in him.sentBuffer if that slot
-- is empty, else appended to him.sentQueue.
ReallyPutOnSentQueue: ENTRY PROC [him: Instance, b: PupBuffer] = {
  SELECT TRUE FROM
    -- Compare by difference, as Retransmitter does for the same test, so the
    -- result stays right when the IDs straddle a wrap of the ID space.
    him.ackedID - Flip[b.pupID] > 0 => {
      IF (him.unackedPups ← him.unackedPups - 1) = 0 THEN {
        -- last packet has been ACKed
        SELECT him.state FROM
          talking => him.state ← open;
          finishing => him.state ← end;
          ENDCASE => SendAbort[him];
        BROADCAST him.stateChange;
        };
      ReturnFreePupBuffer[b];
      };
    him.sentBuffer = NIL => him.sentBuffer ← b;
    ENDCASE => EnqueuePup[him.sentQueue, b];
  };
-- Slurp: body of the per-stream input loop.
-- Repeatedly takes a buffer from him.socket and feeds it to InputPacket until
-- him.pleaseDie is set.  A NIL result from the socket is simply skipped.
Slurp: PUBLIC PROC [him: Instance] = {
  WHILE ~him.pleaseDie DO
    arrived: PupBuffer = him.socket.get[];
    IF arrived = NIL THEN LOOP;
    InputPacket[him, arrived];
    ENDLOOP;
  };
-- InputPacket: process one packet that arrived on this stream's socket.
-- The buffer is parked in him.c while it is being handled; whoever consumes
-- it (input queue, SendAck, ...) sets him.c to NIL, and anything still left
-- in him.c at the end is returned to the free pool.
--   data, aData, mark, aMark: in-sequence packets are queued for Get, others
--     are dropped; acks are sent as required.
--   ack: updates timing, allocation and the acknowledged ID, recycles
--     acknowledged buffers and resends the rest.
--   anything else: handed to GotOther.
-- Packets other than rfc and error whose source socket is not the remote
-- socket are rejected.
InputPacket: ENTRY PROC [him: Instance, b: PupBuffer] = {
  ENABLE UNWIND => NULL;
  thisID: INT ← Flip[b.pupID];
  ackWantedBefore: BOOL = him.sendAck;
  him.c ← b;
  IF ~(b.pupType = rfc OR b.pupType = error)
    AND b.source.socket # him.remote.socket THEN {
    IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadSource];
    GO TO freeC;
    };
  SELECT b.pupType FROM
    data, aData, mark, aMark =>
      SELECT him.state FROM
        open, talking, end => {
          SELECT b.pupType FROM aData, aMark => him.sendAck ← TRUE; ENDCASE;
          IF b.pupLength > bytesPerPupHeader THEN {
            offset: INTEGER ← Diff[thisID, him.nextInputID];
            SELECT offset FROM
              0 => {
                -- nice: just what we wanted
                him.nextInputID ← him.nextInputID + (him.c.pupLength - bytesPerPupHeader);
                EnqueuePup[him.inputQueue, him.c];
                NOTIFY him.inputReady;
                him.c ← NIL;
                };
              ENDCASE => {
                -- out of sequence: classify for statistics, then drop it
                IF CommFlags.doStats THEN
                  SELECT offset FROM
                    IN (0..duplicateWindow] =>
                      StatIncr[statDataPacketsReceivedEarly];
                    IN (-duplicateWindow..0) =>
                      StatIncr[statDataPacketsReceivedAgain];
                    ENDCASE => StatIncr[statDataPacketsReceivedVeryLate];
                ReturnFreePupBuffer[him.c];
                him.c ← NIL;
                };
            }
          ELSE {
            -- funny length from way back there
            -- A header-only aData is a probe.  Answer it whether or not
            -- statistics are being kept; only the counting is conditional.
            IF b.pupLength = bytesPerPupHeader AND b.pupType = aData THEN {
              IF CommFlags.doStats THEN StatIncr[statProbesReceived];
              SendAck[him];
              }
            ELSE IF CommFlags.doStats THEN StatIncr[statEmptyFunnys];
            };
          -- answer probes immediately
          IF him.sendAck AND (him.inputQueue.length = 0 OR ackWantedBefore) THEN
            SendAck[him];
          };
        halfOpen => SendRfc[him];
        ENDCASE --idle, closed, finishing-- => SendAbort[him];
    ack => {
      IF b.pupLength < bytesPerAck THEN {
        IF CommFlags.doStats THEN StatIncr[statMouseTrap];
        GOTO SkipThisAck;
        };
      IF CommFlags.doStats THEN StatIncr[statAcksReceived];
      -- Try to avoid the funny stable SLOW case
      IF (thisID - him.ackedID) < 0
        OR (thisID = him.ackedID AND him.probeCounter = 0) THEN {
        IF CommFlags.doStats THEN StatIncr[statDuplicateAcks];
        GOTO SkipThisAck;
        };
      him.probeCounter ← 0;
      IF him.aDataOut THEN {
        myRetrTime: PupPktOps.Pulses ← him.retransmitPulses;
        responseTime: PupPktOps.Pulses ← ProcessorFace.GetClockPulses[] - him.timer;
        responseTime ← MIN[responseTime, maxRetransmitPulses];
        -- Derivation of the smoothing step below:
        --   him.retransmitPulses ← 2*Smooth[responseTime]
        --   him.retransmitPulses ← 2*((7/8)*(him.retransmitPulses/2)+(1/8)*responseTime)
        --   him.retransmitPulses ← (7*him.retransmitPulses+2*responseTime)/8
        myRetrTime ← (6*myRetrTime + myRetrTime + 2*responseTime)/8;
        myRetrTime ← MAX[minRetransmitPulses, MIN[myRetrTime, maxRetransmitPulses]];
        him.retransmitPulses ← myRetrTime;
        him.aDataOut ← FALSE;
        him.clumpsSinceBump ← him.clumpsSinceBump + 1;
        IF him.clumpsSinceBump > clumpsBeforeBump THEN ThrottleForward[him];
        };
      IF him.allocatedPups = 0 THEN BROADCAST him.stateChange;
      him.hisMaxAllocate ← b.numberOfPupsAhead;
      IF him.hisMaxAllocate = 0 THEN {
        him.probeCounter ← 1;
        IF CommFlags.doStats THEN StatIncr[statEmptyAlloc];
        };
      him.allocatedPups ← MIN[him.hisMaxAllocate, him.pathMaxAllocate];
      IF him.outEnd = 0 THEN BROADCAST him.stateChange; -- in case first time
      him.outEnd ← MIN[b.maximumBytesPerPup, him.dataBytesPerPup];
      IF ~him.sameNet THEN
        him.outEnd ← MIN[him.outEnd, PupTypes.maxDataBytesPerGatewayPup];
      IF (thisID - him.maxOutputID) + b.numberOfBytesAhead > 0 THEN
        him.maxOutputID ← b.numberOfBytesAhead + thisID;
      him.ackedID ← thisID;
      IF him.sentBuffer = NIL THEN him.sentBuffer ← DequeuePup[him.sentQueue];
      -- Recycle every sent buffer this ack covers.  The test is written as a
      -- difference, like the other ID comparisons in this module, so it stays
      -- right when the IDs straddle a wrap of the ID space.
      WHILE him.sentBuffer # NIL
        AND thisID - Flip[him.sentBuffer.pupID] > 0 DO
        IF (him.unackedPups ← him.unackedPups - 1) = 0 THEN {
          SELECT him.state FROM
            -- last packet has been ACKed
            talking => him.state ← open;
            finishing => him.state ← end;
            ENDCASE => SendAbort[him];
          BROADCAST him.stateChange;
          };
        ReturnFreePupBuffer[him.sentBuffer];
        him.sentBuffer ← DequeuePup[him.sentQueue];
        ENDLOOP;
      -- Whatever is still held here was not covered by the ack: send it again.
      UNTIL him.sentBuffer = NIL DO
        PupRouterSendThis[him.sentBuffer];
        IF CommFlags.doStats THEN StatIncr[statDataPacketsRetransmitted];
        him.sentBuffer ← DequeuePup[him.sentQueue];
        ENDLOOP;
      EXITS SkipThisAck => NULL;
      };
    ENDCASE => GotOther[him, him.c];
  IF him.c # NIL THEN GO TO freeC;
  EXITS freeC => { ReturnFreePupBuffer[him.c]; him.c ← NIL; };
  };
-- SendAttention: send an interrupt to the other end.
-- Waits on him.stateChange until no earlier outgoing interrupt is pending,
-- then marks one pending and sends it with SendInt.
SendAttention: PUBLIC ENTRY PROC [him: Instance] = {
  ENABLE UNWIND => NULL;
  UNTIL ~him.outIntPending DO WAIT him.stateChange; ENDLOOP;
  him.outIntPending ← TRUE;
  SendInt[him];
  };
-- WaitForAttention: block until an incoming interrupt has not yet been seen.
-- Waits on him.waitingForInterrupt while him.seenIntSeq equals him.inIntSeq,
-- then advances him.seenIntSeq by one to consume that interrupt.
WaitForAttention: PUBLIC ENTRY PROC [him: Instance] = {
  ENABLE UNWIND => NULL;
  UNTIL him.seenIntSeq # him.inIntSeq DO WAIT him.waitingForInterrupt; ENDLOOP;
  him.seenIntSeq ← him.seenIntSeq + 1;
  };
}.