-- Copyright (C) 1983 by Xerox Corporation. All rights reserved. -- File: PupPktHot.mesa, Last Edit: AOF 17-Jan-85 15:05:24 -- Tim Diebert, 7-Oct-85 10:27:19 DIRECTORY Buffer USING [GetBuffer, ReturnBuffer], Inline USING [LowHalf], Process USING [EnableAborts, SetTimeout, MsecToTicks], System USING [Pulses, GetClockPulses], Stats USING [StatBump, StatIncr], CommFlags USING [doDebug, doStats], Driver USING [Glitch], PupStream USING [StreamClosing], PupPktDefs, PupPktOps, PupDefs USING [Byte, PupBuffer, DequeuePup, EnqueuePup, PupRouterSendThis], PupTypes USING [PupAddress, maxDataBytesPerGatewayPup]; PupPktHot: MONITOR RETURNS [POINTER TO PupPktOps.InstanceData]LOCKS mine.lock IMPORTS Buffer, Inline, Process, System, Stats, PupStream, Driver, PupPktOps, PupDefs EXPORTS PupPktOps = BEGIN OPEN PupPktOps, PupDefs; mine: PupPktOps.InstanceData; NeitherDataNorMark: PUBLIC ERROR = CODE; BufferAlreadyRequeued: PUBLIC ERROR = CODE; StreamNotOpen: PUBLIC ERROR = CODE; -- Only called by InputPacket Diff: PROCEDURE [a, b: LONG INTEGER] RETURNS [INTEGER] = BEGIN maxInteger: INTEGER = 77777B; temp: LONG INTEGER ← a - b; SELECT TRUE FROM temp > maxInteger => RETURN[maxInteger]; temp < -maxInteger => RETURN[-maxInteger]; ENDCASE => RETURN[Inline.LowHalf[temp]]; END; Retransmitter: ENTRY PROCEDURE = -- Retransmit things which have not been acknowledged in a reasonable time. -- Such things include RFCs and ENDs as well as data. BEGIN OPEN mine; ENABLE UNWIND => NULL; now: System.Pulses; UNTIL pleaseDie DO now ← System.GetClockPulses[]; SELECT state FROM open => BEGIN IF now - timer > pingPulses AND ping THEN BEGIN probeCounter ← pingRetransmissions; allocatedPups ← 0; -- will start probing END; IF now - timer > ctlRetransmitPulses AND (outEnd = 0 OR allocatedPups = 0) THEN BEGIN ProbeForAck[]; now ← System.GetClockPulses[]; END; END; talking, finishing => BEGIN DO -- recycle things that have timed out IF sentBuffer = NIL THEN sentBuffer ← DequeuePup[@sentQueue]; IF sentBuffer = NIL THEN EXIT; IF ackedID - Flip[sentBuffer.pup.pupID] > 0 THEN BEGIN -- this packet has been ACKed already IF (unackedPups ← unackedPups - 1) = 0 THEN BEGIN SELECT state FROM -- last packet has been ACKed talking => state ← open; finishing => state ← end; ENDCASE; BROADCAST stateChange; END; Buffer.ReturnBuffer[sentBuffer]; sentBuffer ← NIL; END ELSE EXIT; ENDLOOP; IF sentBuffer # NIL AND now - timer > retransmitPulses THEN BEGIN ProbeForAck[]; now ← System.GetClockPulses[]; IF now - timer > retransmitPulses THEN BEGIN -- couldn't get buffer, use one of ours IF sentBuffer.pup.pupType = data THEN sentBuffer.pup.pupType ← aData; timer ← System.GetClockPulses[]; PupRouterSendThis[sentBuffer]; IF CommFlags.doStats THEN Stats.StatIncr[statDataPacketsRetransmitted]; sentBuffer ← DequeuePup[@sentQueue]; END; END; END; halfOpen => IF now - timer > ctlRetransmitPulses THEN SendRfc[@mine]; end => IF now - timer > ctlRetransmitPulses THEN SendEnd[@mine]; closed => BEGIN DO -- flush anything left on the sentQueue IF sentBuffer = NIL THEN sentBuffer ← DequeuePup[@sentQueue]; IF sentBuffer = NIL THEN EXIT; unackedPups ← unackedPups - 1; Buffer.ReturnBuffer[sentBuffer]; sentBuffer ← NIL; ENDLOOP; END; ENDCASE; IF outIntPending AND now - outIntTime > ctlRetransmitPulses THEN SendInt[@mine]; WAIT retransmitterReady; ENDLOOP; UNTIL unackedPups = 0 DO DO -- flush anything left on the sentQueue IF sentBuffer = NIL THEN sentBuffer ← DequeuePup[@sentQueue]; IF sentBuffer = NIL THEN EXIT; unackedPups ← unackedPups - 1; Buffer.ReturnBuffer[sentBuffer]; sentBuffer ← NIL; ENDLOOP; -- If a buffer is on a device output queue, wait until it comes back. IF unackedPups # 0 THEN WAIT retransmitterReady; ENDLOOP; END; ProbeForAck: INTERNAL PROCEDURE = BEGIN OPEN mine; b: PupBuffer; IF (b ← Buffer.GetBuffer[ pup, PupPktOps.pupBuffers, send, fullBuffer, FALSE]) = NIL THEN RETURN; b.pup.pupLength ← bytesPerPupHeader; b.pup.pupType ← aData; b.pup.pupID ← Flop[nextOutputID]; b.pup.source ← local; b.pup.dest ← remote; timer ← System.GetClockPulses[]; aDataOut ← FALSE; clumpsSinceBump ← 0; PupRouterSendThis[b]; IF CommFlags.doStats THEN Stats.StatIncr[statProbesSent]; IF (probeCounter ← probeCounter + 1) > retransmitionsBeforeAbort THEN SmashClosed[@mine, transmissionTimeout]; IF probeCounter > probesBeforePanic THEN retransmitPulses ← [ MIN[System.Pulses[2*retransmitPulses], maxRetransmitPulses]]; END; ThrottleForward: PROCEDURE = BEGIN OPEN mine; old: CARDINAL ← pathMaxAllocate; clumpsSinceBump ← 0; -- We can actually get one packet ahead at this point. IF retransmitPulses = maxRetransmitPulses THEN RETURN; pathMaxAllocate ← MIN[pathMaxAllocate + 1, myMaxAllocate]; retransmitPulses ← [(retransmitPulses*pathMaxAllocate)/old]; END; ThrottleBack: INTERNAL PROCEDURE = BEGIN OPEN mine; old: CARDINAL; IF pathMaxAllocate = 1 THEN BEGIN -- This is a desperate attempt to avoid an instability -- It is/(was?) also a nasty bug under some strange case that Andrew found pause: CONDITION; Process.SetTimeout[@pause, Process.MsecToTicks[maxRetransmitTime]]; Process.EnableAborts[@pause]; WAIT pause; END; UNTIL throttle = 0 OR retransmitPulses = minRetransmitPulses OR pathMaxAllocate = 1 DO old ← pathMaxAllocate; pathMaxAllocate ← pathMaxAllocate - 1; -- Beware of rounding down, assume return ack takes as long as a send packet -- This goes unstable if much of the time is due to somebody else's packets -- retransmitPulses ← ((retransmitPulses+old)*old)/(old+1); throttle ← throttle - 1; ENDLOOP; clumpsSinceBump ← throttle ← 0; END; SendAck: INTERNAL PROCEDURE = BEGIN OPEN mine; b: PupBuffer; IF c # NIL THEN BEGIN b ← c; c ← NIL; END ELSE IF (b ← Buffer.GetBuffer[ pup, PupPktOps.pupBuffers, send, fullBuffer, FALSE]) = NIL THEN RETURN; b.pup.pupBody ← ack[ dataBytesPerPup, MAX[0, INTEGER[myMaxAllocate - inputQueue.length]], byteAllocate]; allocatedID ← nextInputID + byteAllocate; b.pup.pupType ← ack; b.pup.pupID ← Flop[nextInputID]; b.pup.pupLength ← bytesPerAck; b.pup.source ← local; b.pup.dest ← remote; PupRouterSendThis[b]; IF CommFlags.doStats THEN Stats.StatIncr[statAcksSent]; IF state = open AND outEnd # 0 AND allocatedPups # 0 THEN timer ← System.GetClockPulses[]; -- avoid pinging sendAck ← FALSE; END; Get: ENTRY PROCEDURE RETURNS [b: PupBuffer] = BEGIN OPEN mine; ENABLE UNWIND => NULL; IF inputQueue.length = 0 THEN SELECT TRUE FROM (state = closed) => ERROR PupStream.StreamClosing[whyClosed, text]; dontWait => RETURN[NIL]; ENDCASE => WAIT inputReady; IF inputQueue.length # 0 THEN b ← DequeuePup[@inputQueue] ELSE b ← NIL; IF b = NIL THEN BEGIN IF state = closed THEN ERROR PupStream.StreamClosing[whyClosed, text]; RETURN; END; IF CommFlags.doStats THEN SELECT b.pup.pupType FROM data, aData => BEGIN Stats.StatIncr[statDataPacketsReceived]; Stats.StatBump[statDataBytesReceived, b.pup.pupLength - bytesPerPupHeader]; END; mark, aMark => Stats.StatIncr[statMarksReceived]; ENDCASE => Driver.Glitch[NeitherDataNorMark]; IF inputQueue.length = 0 AND sendAck THEN SendAck[]; END; Put: PROCEDURE [b: PupBuffer] = BEGIN OPEN mine; IF CommFlags.doDebug AND b.requeueProcedure # Buffer.ReturnBuffer THEN Driver.Glitch[BufferAlreadyRequeued]; SELECT state FROM open, talking => BEGIN IF CommFlags.doStats THEN BEGIN Stats.StatIncr[statDataPacketsSent]; Stats.StatBump[statDataBytesSent, b.pup.pupLength - bytesPerPupHeader]; END; SendPacket[b]; END; idle, halfOpen => Driver.Glitch[StreamNotOpen]; ENDCASE --end, closed, finishing-- => StreamDied[@mine, b]; END; PutMark: PROCEDURE [byte: Byte] = BEGIN OPEN mine; b: PupBuffer; b ← Buffer.GetBuffer[pup, PupPktOps.pupBuffers, send, smallBuffer]; b.pup.pupBytes[0] ← byte; b.pup.pupLength ← bytesPerPupHeader + 1; b.pup.pupType ← aMark; SendPacket[b]; IF CommFlags.doStats THEN Stats.StatIncr[statMarksSent]; END; SendPacket: ENTRY PROCEDURE [b: PupBuffer] = BEGIN OPEN mine; ENABLE UNWIND => NULL; SELECT state FROM open, talking => BEGIN b.pup.pupID ← Flop[nextOutputID]; IF b.pup.pupLength = bytesPerPupHeader THEN probeCounter ← 1 ELSE BEGIN b.requeueProcedure ← LOOPHOLE[PutOnSentQueue]; state ← talking; unackedPups ← unackedPups + 1; END; b.pup.source ← local; b.pup.dest ← remote; nextOutputID ← nextOutputID + (b.pup.pupLength - bytesPerPupHeader); IF b.pup.pupType = data -- we don't use mark, only aMark AND ~((maxOutputID - nextOutputID) > 0 AND allocatedPups > unackedPups) THEN b.pup.pupType ← aData; IF b.pup.pupType # data THEN -- aMark or aData BEGIN timer ← System.GetClockPulses[]; aDataOut ← TRUE; END; PupRouterSendThis[b]; IF b.pup.pupType # data THEN WaitToSend[ ! UNWIND => NULL]; END; idle, halfOpen => Driver.Glitch[StreamNotOpen]; ENDCASE --end, closed, finishing-- => StreamDied[@mine, b ! UNWIND => NULL]; END; WaitToSend: INTERNAL PROCEDURE = BEGIN OPEN mine; SELECT state FROM open, talking => BEGIN -- Wait until all our packets have been acked so we don't shoot them down. DO SELECT state FROM open => EXIT; talking => WAIT stateChange; ENDCASE => StreamDied[@mine, NIL]; ENDLOOP; -- now wait for allocate UNTIL (maxOutputID - nextOutputID) > 0 AND allocatedPups > unackedPups DO SELECT state FROM -- not really, but ..... open, talking => WAIT stateChange; ENDCASE => StreamDied[@mine, NIL]; ENDLOOP; IF throttle > 0 THEN ThrottleBack[]; END; idle, halfOpen => Driver.Glitch[StreamNotOpen]; ENDCASE --end, closed, finishing-- => StreamDied[@mine, NIL]; END; PutOnSentQueue: ENTRY PROCEDURE [b: PupBuffer] = BEGIN OPEN mine; -- better not SIGNAL from here IF ackedID > Flip[b.pup.pupID] THEN BEGIN IF (unackedPups ← unackedPups - 1) = 0 THEN BEGIN SELECT state FROM -- last packet has been ACKed talking => state ← open; finishing => state ← end; ENDCASE => SendAbort[@mine]; BROADCAST stateChange; END; Buffer.ReturnBuffer[b]; END ELSE IF sentBuffer = NIL THEN sentBuffer ← b ELSE EnqueuePup[@sentQueue, b]; END; Slurp: PROCEDURE = BEGIN OPEN mine; b: PupBuffer; UNTIL pleaseDie DO b ← socket.get[]; IF b # NIL THEN InputPacket[b]; ENDLOOP; END; InputPacket: ENTRY PROCEDURE [b: PupBuffer] = BEGIN OPEN mine; thisID: LONG INTEGER ← Flip[b.pup.pupID]; c ← b; IF ~(b.pup.pupType = rfc OR b.pup.pupType = error) AND (b.pup.source.socket # remote.socket) THEN BEGIN IF CommFlags.doStats THEN Stats.StatIncr[statPacketsRejectedBadSource]; Buffer.ReturnBuffer[b]; c ← NIL; RETURN; END; SELECT b.pup.pupType FROM data, aData, mark, aMark => SELECT state FROM open, talking, end => BEGIN SELECT b.pup.pupType FROM aData, aMark => sendAck ← TRUE; ENDCASE; IF b.pup.pupLength > bytesPerPupHeader THEN BEGIN offset: INTEGER ← Diff[thisID, nextInputID]; SELECT offset FROM 0 => BEGIN -- nice - just what we wanted nextInputID ← nextInputID + (c.pup.pupLength - bytesPerPupHeader); EnqueuePup[@inputQueue, c]; NOTIFY inputReady; c ← NIL; END; ENDCASE => BEGIN IF CommFlags.doStats THEN SELECT offset FROM IN (0..duplicateWindow] => Stats.StatIncr[statDataPacketsReceivedEarly]; IN (-duplicateWindow..0) => Stats.StatIncr[statDataPacketsReceivedAgain]; ENDCASE => Stats.StatIncr[statDataPacketsReceivedVeryLate]; Buffer.ReturnBuffer[c]; c ← NIL; END; END ELSE -- funny length from way back there BEGIN IF (b.pup.pupLength = bytesPerPupHeader) AND (b.pup.pupType = aData) THEN BEGIN IF CommFlags.doStats THEN Stats.StatIncr[statProbesReceived]; SendAck[]; END ELSE IF CommFlags.doStats THEN Stats.StatIncr[statEmptyFunnys]; END; -- answer probes immediately IF sendAck AND inputQueue.length = 0 THEN SendAck[]; END; halfOpen => SendRfc[@mine]; ENDCASE --idle, closed, finishing-- => SendAbort[@mine]; ack => BEGIN IF b.pup.pupLength < bytesPerAck THEN BEGIN IF CommFlags.doStats THEN Stats.StatIncr[statMouseTrap]; GOTO SkipThisAck; END; IF CommFlags.doStats THEN Stats.StatIncr[statAcksReceived]; -- Try to avoid the funny stable SLOW case IF (thisID - ackedID) < 0 OR (thisID = ackedID AND probeCounter = 0) THEN BEGIN IF CommFlags.doStats THEN Stats.StatIncr[statDuplicateAcks]; GOTO SkipThisAck; END; probeCounter ← 0; IF aDataOut THEN BEGIN myRetrTime, responseTime: System.Pulses; responseTime ← [System.GetClockPulses[] - timer]; responseTime ← [MIN[responseTime, maxRetransmitPulses]]; -- retransmitPulses ← 2*Smooth[responseTime] -- retransmitPulses ← 2*((7/8)*(retransmitPulses/2)+(1/8)*responseTime) -- retransmitPulses ← (7*retransmitPulses+2*responseTime)/8 myRetrTime ← retransmitPulses; myRetrTime ← [(6*myRetrTime + myRetrTime + 2*responseTime)/8]; myRetrTime ← [ MAX[minRetransmitPulses, MIN[myRetrTime, maxRetransmitPulses]]]; retransmitPulses ← myRetrTime; aDataOut ← FALSE; clumpsSinceBump ← clumpsSinceBump + 1; IF clumpsSinceBump > clumpsBeforeBump THEN ThrottleForward[]; END; IF allocatedPups = 0 THEN BROADCAST stateChange; hisMaxAllocate ← b.pup.numberOfPupsAhead; IF hisMaxAllocate = 0 THEN BEGIN probeCounter ← 1; IF CommFlags.doStats THEN Stats.StatIncr[statEmptyAlloc]; END; allocatedPups ← MIN[hisMaxAllocate, pathMaxAllocate]; IF outEnd = 0 THEN BROADCAST stateChange; -- in case first time outEnd ← MIN[b.pup.maximumBytesPerPup, dataBytesPerPup]; IF ~sameNet THEN outEnd ← MIN[outEnd, PupTypes.maxDataBytesPerGatewayPup]; IF (thisID - maxOutputID) + b.pup.numberOfBytesAhead > 0 THEN maxOutputID ← b.pup.numberOfBytesAhead + thisID; ackedID ← thisID; IF sentBuffer = NIL THEN sentBuffer ← DequeuePup[@sentQueue]; WHILE sentBuffer # NIL AND thisID > Flip[sentBuffer.pup.pupID] DO IF (unackedPups ← unackedPups - 1) = 0 THEN BEGIN SELECT state FROM -- last packet has been ACKed talking => state ← open; finishing => state ← end; ENDCASE => SendAbort[@mine]; BROADCAST stateChange; END; Buffer.ReturnBuffer[sentBuffer]; sentBuffer ← DequeuePup[@sentQueue]; ENDLOOP; UNTIL sentBuffer = NIL DO PupRouterSendThis[sentBuffer]; IF CommFlags.doStats THEN Stats.StatIncr[statDataPacketsRetransmitted]; sentBuffer ← DequeuePup[@sentQueue]; ENDLOOP; EXITS SkipThisAck => NULL; END; ENDCASE => GotOther[@mine, c]; IF c # NIL THEN BEGIN Buffer.ReturnBuffer[c]; c ← NIL; END; END; MyGetLocalAddress: PROCEDURE RETURNS [PupTypes.PupAddress] = BEGIN RETURN[GetLocalAddress[@mine]]; END; MyGetSenderSizeLimit: PROCEDURE RETURNS [CARDINAL] = BEGIN RETURN[GetSenderSizeLimit[@mine]]; END; MySendAttention: ENTRY PROCEDURE = BEGIN OPEN mine; ENABLE UNWIND => NULL; WHILE outIntPending DO WAIT stateChange; ENDLOOP; outIntPending ← TRUE; SendInt[@mine]; END; MyWaitForAttention: ENTRY PROCEDURE = BEGIN OPEN mine; ENABLE UNWIND => NULL; WHILE seenIntSeq = inIntSeq DO WAIT waitingForInterrupt; ENDLOOP; seenIntSeq ← seenIntSeq + 1; END; -- initialization mine.me.get ← Get; -- do cold stuff here to avoid recompilation hassels mine.me.put ← Put; mine.me.putMark ← PutMark; mine.me.getSenderSizeLimit ← MyGetSenderSizeLimit; mine.me.sendAttention ← MySendAttention; mine.me.waitForAttention ← MyWaitForAttention; mine.me.getLocalAddress ← MyGetLocalAddress; mine.slurp ← Slurp; mine.retransmitter ← Retransmitter; RETURN[@mine]; END. LOG 19-May-83 16:31:21 By: SMA Action: Converted to new BufferMgr. 24-May-83 11:08:44 By: SMA Action: Get some buffers without wait. 6-Jun-83 15:32:51 By: SMA Action: Enable aborts on condition variables. 10-Dec-84 17:16:19 By: SMA Action: Put in HGM's fix for PupPktHot. 14-Dec-84 14:55:09 By: SMA Action: Added SetWaitTime and GetWaitTime.