-- File: PupPktHot.mesa,  Last Edit:
--   HGM  October 13, 1980  8:08 PM
--   MAS  August 20, 1980  12:57 PM
--   MDS  June 18, 1982  3:17 PM

-- Copyright  Xerox Corporation 1979, 1980

-- Hot (frequently executed) half of the Pup packet stream implementation.
-- One instance of this monitor holds the state of one connection in "mine";
-- the START code at the bottom fills in the procedure variables of mine.me
-- and returns a pointer to that instance record.
-- Names used below but not declared here (Flip, Flop, SendRfc, SendEnd,
-- SendInt, SendAbort, SmashClosed, StreamDied, GotOther, the stat codes and
-- the timing constants) are presumably supplied by the OPENed interfaces,
-- most likely PupPktPrivateDefs; confirm there before relying on details.

DIRECTORY
  InlineDefs: FROM "InlineDefs" USING [LowHalf],
  StatsDefs: FROM "StatsDefs" USING [StatBump, StatIncr],
  CommUtilDefs: FROM "CommUtilDefs" USING [
    GetTicks, SetTimeout, MsecToTicks],
  DriverDefs: FROM "DriverDefs" USING [
    MaybeGetFreePupBuffer, doDebug, doStats, Glitch],
  PupStream: FROM "PupStream" USING [StreamClosing],
  PupPktPrivateDefs: FROM "PupPktPrivateDefs",
  PupPktDefs: FROM "PupPktDefs",
  PupDefs: FROM "PupDefs" USING [
    Byte, PupBuffer, DequeuePup, EnqueuePup,
    GetFreePupBuffer, ReturnFreePupBuffer, PupRouterSendThis],
  BufferDefs: FROM "BufferDefs" USING [ReturnFreeBuffer],
  PupTypes: FROM "PupTypes" USING [PupAddress, maxDataBytesPerGatewayPup];

PupPktHot: MONITOR
  RETURNS [POINTER TO PupPktPrivateDefs.InstanceData]
  LOCKS mine.lock
  IMPORTS
    InlineDefs, StatsDefs, CommUtilDefs, PupStream,
    DriverDefs, PupPktPrivateDefs, PupDefs, BufferDefs
  EXPORTS PupPktPrivateDefs =
BEGIN OPEN StatsDefs, DriverDefs, PupPktPrivateDefs, PupDefs;

-- Per-connection state.  The monitor lock is the lock field of this record.
mine: PupPktPrivateDefs.InstanceData;

NeitherDataNorMark: PUBLIC ERROR = CODE;
BufferAlreadyRequeued: PUBLIC ERROR = CODE;
StreamNotOpen: PUBLIC ERROR = CODE;

-- Only called by InputPacket
-- Returns a-b clamped to the range [-maxInteger..maxInteger] so that the
-- difference of two LONG INTEGER sequence numbers fits in an INTEGER.
Diff: PROCEDURE [a, b: LONG INTEGER] RETURNS [INTEGER] =
  BEGIN
  maxInteger: INTEGER = 77777B;
  temp: LONG INTEGER ← a-b;
  SELECT TRUE FROM
    temp>maxInteger => RETURN [maxInteger];
    temp<-maxInteger => RETURN [-maxInteger];
    ENDCASE => RETURN[InlineDefs.LowHalf[temp]];
  END;

Retransmitter: ENTRY PROCEDURE =
  -- Retransmit things which have not been acknowledged in a reasonable time.
  -- Such things include RFCs and ENDs as well as data.
  -- Loops until pleaseDie is set, sleeping on retransmitterReady between
  -- passes, then drains the sent queue until unackedPups reaches zero.
  BEGIN OPEN mine;
  now: CARDINAL;
  UNTIL pleaseDie DO
    now ← CommUtilDefs.GetTicks[];
    SELECT state FROM
      open =>
        BEGIN
        IF now-timer>pingTicks AND ping THEN
          BEGIN
          probeCounter ← pingRetransmissions;
          allocatedPups ← 0;  -- will start probing
          END;
        IF now-timer>ctlRetransmitTicks AND (outEnd=0 OR allocatedPups=0) THEN
          ProbeForAck[];
        END;
      talking, finishing =>
        BEGIN
        DO  -- recycle things that have timed out
          IF sentBuffer=NIL THEN sentBuffer ← DequeuePup[@sentQueue];
          IF sentBuffer=NIL THEN EXIT;
          IF ackedID-Flip[sentBuffer.pupID]>0 THEN
            BEGIN  -- this packet has been ACKed already
            IF (unackedPups←unackedPups-1)=0 THEN
              BEGIN
              SELECT state FROM  -- last packet has been ACKed
                talking => state ← open;
                finishing => state ← end;
                ENDCASE;
              BROADCAST stateChange;
              END;
            ReturnFreePupBuffer[sentBuffer];
            sentBuffer←NIL;
            END
          ELSE EXIT;
          ENDLOOP;
        IF sentBuffer#NIL AND now-timer>retransmitTicks THEN
          BEGIN
          -- ProbeForAck resets timer only when it obtained a buffer, so the
          -- repeated test below detects the case where no probe went out.
          ProbeForAck[];
          IF now-timer>retransmitTicks THEN
            BEGIN  -- couldn't get buffer, use one of ours
            IF sentBuffer.pupType=data THEN sentBuffer.pupType←aData;
            timer ← CommUtilDefs.GetTicks[];
            PupRouterSendThis[sentBuffer];
            IF doStats THEN StatIncr[statDataPacketsRetransmitted];
            sentBuffer ← DequeuePup[@sentQueue];
            END;
          END;
        END;
      halfOpen => IF now-timer>ctlRetransmitTicks THEN SendRfc[@mine];
      end => IF now-timer>ctlRetransmitTicks THEN SendEnd[@mine];
      closed =>
        BEGIN
        DO  -- flush anything left on the sentQueue
          IF sentBuffer=NIL THEN sentBuffer ← DequeuePup[@sentQueue];
          IF sentBuffer=NIL THEN EXIT;
          unackedPups ← unackedPups-1;
          ReturnFreePupBuffer[sentBuffer];
          sentBuffer ← NIL;
          ENDLOOP;
        END;
      ENDCASE;
    IF outIntPending AND now-outIntTime>ctlRetransmitTicks THEN SendInt[@mine];
    WAIT retransmitterReady;
    ENDLOOP;
  UNTIL unackedPups=0 DO
    DO  -- flush anything left on the sentQueue
      IF sentBuffer=NIL THEN sentBuffer ← DequeuePup[@sentQueue];
      IF sentBuffer=NIL THEN EXIT;
      unackedPups ← unackedPups-1;
      ReturnFreePupBuffer[sentBuffer];
      sentBuffer ← NIL;
      ENDLOOP;
    -- If a buffer is on a device output queue, wait until it comes back.
    IF unackedPups#0 THEN WAIT retransmitterReady;
    ENDLOOP;
  END;

-- Sends an empty aData packet (length = header only) to solicit an ack.
-- Does nothing if no free buffer is available.  Counts the probe, closes the
-- stream via SmashClosed after too many unanswered probes, and doubles
-- retransmitTicks (capped at maxRetransmitTicks) once past probesBeforePanic.
ProbeForAck: INTERNAL PROCEDURE =
  BEGIN OPEN mine;
  b: PupBuffer;
  IF (b←MaybeGetFreePupBuffer[])=NIL THEN RETURN;
  b.pupLength ← bytesPerPupHeader;
  b.pupType ← aData;
  b.pupID ← Flop[nextOutputID];
  b.source.socket ← local.socket;
  b.dest ← remote;
  timer ← CommUtilDefs.GetTicks[];
  aDataOut ← FALSE;
  clumpsSinceBump ← 0;
  PupRouterSendThis[b];
  IF doStats THEN StatIncr[statProbesSent];
  IF (probeCounter←probeCounter+1)>retransmitionsBeforeAbort THEN
    SmashClosed[@mine,transmissionTimeout];
  IF probeCounter>probesBeforePanic THEN
    retransmitTicks ← MIN[2*retransmitTicks,maxRetransmitTicks];
  END;

-- Raises pathMaxAllocate by one (capped at myMaxAllocate) and scales
-- retransmitTicks by the same ratio.  Marked INTERNAL, like ThrottleBack,
-- because it updates monitor data and is only called with the lock held
-- (from InputPacket).
ThrottleForward: INTERNAL PROCEDURE =
  BEGIN OPEN mine;
  old: CARDINAL ← pathMaxAllocate;
  clumpsSinceBump ← 0;
  -- We can actually get one packet ahead at this point.
  IF retransmitTicks=maxRetransmitTicks THEN RETURN;
  pathMaxAllocate ← MIN[pathMaxAllocate+1,myMaxAllocate];
  retransmitTicks ← (retransmitTicks*pathMaxAllocate)/old;
  END;

-- Lowers pathMaxAllocate, one step per unit of throttle, never below 1.
-- When already at 1 it instead pauses for maxRetransmitTime milliseconds.
ThrottleBack: INTERNAL PROCEDURE =
  BEGIN OPEN mine;
  old: CARDINAL;
  IF pathMaxAllocate=1 THEN
    BEGIN
    -- This is a desperate attempt to avoid an instability
    -- It is/(was?) also a nasty bug under some strange case that Andrew found
    pause: CONDITION;
    CommUtilDefs.SetTimeout[@pause,CommUtilDefs.MsecToTicks[maxRetransmitTime]];
    WAIT pause;
    END;
  UNTIL throttle=0 OR retransmitTicks=minRetransmitTicks OR pathMaxAllocate=1 DO
    old ← pathMaxAllocate;
    pathMaxAllocate ← pathMaxAllocate-1;
    -- Beware of rounding down, assume return ack takes as long as a send packet
    -- This goes unstable if much of the time is due to somebody else's packets
    -- retransmitTicks ← ((retransmitTicks+old)*old)/(old+1);
    -- NOTE(review): the adjustment above is commented out in the source, so
    -- old is written but never read here; confirm that is intended.
    throttle ← throttle-1;
    ENDLOOP;
  clumpsSinceBump ← throttle ← 0;
  END;

-- Builds and sends an ack carrying our current allocation.  Reuses the
-- buffer parked in c when there is one, otherwise tries for a free buffer
-- and silently gives up (sendAck stays set) if none is available.
SendAck: INTERNAL PROCEDURE =
  BEGIN OPEN mine;
  b: PupBuffer;
  IF c#NIL THEN BEGIN b←c; c←NIL; END
  ELSE IF (b←MaybeGetFreePupBuffer[])=NIL THEN RETURN;
  b.pupBody ← ack [
    dataBytesPerPup,
    MAX[0,INTEGER[myMaxAllocate-inputQueue.length]],
    byteAllocate];
  allocatedID ← nextInputID+byteAllocate;
  b.pupType ← ack;
  b.pupID ← Flop[nextInputID];
  b.pupLength ← bytesPerAck;
  b.source.socket ← local.socket;
  b.dest ← remote;
  PupRouterSendThis[b];
  IF doStats THEN StatIncr[statAcksSent];
  IF state=open AND outEnd#0 AND allocatedPups#0 THEN
    timer ← CommUtilDefs.GetTicks[];  -- avoid pinging
  sendAck ← FALSE;
  END;

-- Client input: returns the next queued data or mark packet.
-- With an empty queue it raises PupStream.StreamClosing if the stream is
-- closed, returns NIL if dontWait is set, and otherwise waits once on
-- inputReady.  A NIL result after the wait means nothing arrived (or
-- StreamClosing is raised if the stream closed meanwhile).
Get: ENTRY PROCEDURE RETURNS [b: PupBuffer] =
  BEGIN OPEN mine;
  ENABLE UNWIND => NULL;
  IF inputQueue.length=0 THEN
    SELECT TRUE FROM
      (state=closed) => ERROR PupStream.StreamClosing[whyClosed,text];
      dontWait => RETURN[NIL];
      ENDCASE => WAIT inputReady;
  IF inputQueue.length#0 THEN b ← DequeuePup[@inputQueue] ELSE b ← NIL;
  IF b=NIL THEN
    BEGIN
    IF state=closed THEN ERROR PupStream.StreamClosing[whyClosed,text];
    RETURN;
    END;
  IF doStats THEN
    SELECT b.pupType FROM
      data, aData =>
        BEGIN
        StatIncr[statDataPacketsReceived];
        StatBump[statDataBytesReceived,b.pupLength-bytesPerPupHeader];
        END;
      mark, aMark => StatIncr[statMarksReceived];
      ENDCASE => Glitch[NeitherDataNorMark];
  -- The queue just drained: send any ack that was being held back.
  IF inputQueue.length=0 AND sendAck THEN SendAck[];
  END;

-- Client output: counts the packet and hands it to SendPacket.
-- Not an ENTRY procedure; state is read here without the lock and checked
-- again under the lock inside SendPacket.
Put: PROCEDURE [b: PupBuffer]=
  BEGIN OPEN mine;
  IF doDebug AND b.requeueProcedure#BufferDefs.ReturnFreeBuffer THEN
    Glitch[BufferAlreadyRequeued];
  SELECT state FROM
    open, talking =>
      BEGIN
      IF doStats THEN
        BEGIN
        StatIncr[statDataPacketsSent];
        StatBump[statDataBytesSent,b.pupLength-bytesPerPupHeader];
        END;
      SendPacket[b];
      END;
    idle, halfOpen => Glitch[StreamNotOpen];
    ENDCASE --end, closed, finishing-- => StreamDied[@mine,b];
  END;

-- Sends a one byte aMark packet carrying the given mark byte.
PutMark: PROCEDURE [byte: Byte] =
  BEGIN OPEN mine;
  b: PupBuffer;
  b ← GetFreePupBuffer[];
  b.pupBytes[0] ← byte;
  b.pupLength ← bytesPerPupHeader+1;
  b.pupType ← aMark;
  SendPacket[b];
  IF doStats THEN StatIncr[statMarksSent];
  END;

-- Stamps b with the next output ID and addresses, then sends it.
-- A packet with a body is routed back through PutOnSentQueue when the driver
-- is done with it and counts as unacked.  A data packet that uses up the
-- allocation is promoted to aData, and after any non-data send this waits
-- (WaitToSend) for the ack.
SendPacket: ENTRY PROCEDURE [b: PupBuffer] =
  BEGIN OPEN mine;
  SELECT state FROM
    open, talking =>
      BEGIN
      b.pupID ← Flop[nextOutputID];
      IF b.pupLength=bytesPerPupHeader THEN probeCounter ← 1
      ELSE
        BEGIN
        b.requeueProcedure ← LOOPHOLE[PutOnSentQueue];
        state ← talking;
        unackedPups ← unackedPups+1;
        END;
      b.source.socket ← local.socket;
      b.dest ← remote;
      nextOutputID ← nextOutputID+(b.pupLength-bytesPerPupHeader);
      IF b.pupType=data  -- we don't use mark, only aMark
        AND ~((maxOutputID-nextOutputID)>0 AND allocatedPups>unackedPups) THEN
        b.pupType ← aData;
      IF b.pupType#data THEN  -- aMark or aData
        BEGIN
        timer ← CommUtilDefs.GetTicks[];
        aDataOut ← TRUE;
        END;
      PupRouterSendThis[b];
      IF b.pupType#data THEN WaitToSend[ ! UNWIND => NULL];
      END;
    idle, halfOpen => Glitch[StreamNotOpen];
    ENDCASE --end, closed, finishing-- => StreamDied[@mine, b ! UNWIND => NULL];
  END;

-- Blocks the sender until everything outstanding is acked and the other end
-- has granted more allocation, then applies any pending throttle request.
WaitToSend: INTERNAL PROCEDURE =
  BEGIN OPEN mine;
  SELECT state FROM
    open, talking =>
      BEGIN
      -- Wait until all our packets have been acked so we don't shoot them down.
      DO
        SELECT state FROM
          open => EXIT;
          talking => WAIT stateChange;
          ENDCASE => StreamDied[@mine,NIL];
        ENDLOOP;
      -- now wait for allocate
      UNTIL (maxOutputID-nextOutputID)>0 AND allocatedPups>unackedPups DO
        SELECT state FROM  -- not really, but .....
          open, talking => WAIT stateChange;
          ENDCASE => StreamDied[@mine,NIL];
        ENDLOOP;
      IF throttle>0 THEN ThrottleBack[];
      END;
    idle, halfOpen => Glitch[StreamNotOpen];
    ENDCASE --end, closed, finishing-- => StreamDied[@mine,NIL];
  END;

-- Requeue procedure installed on outgoing data buffers by SendPacket.
-- Frees the buffer if it is already acked, otherwise holds it for possible
-- retransmission (in sentBuffer if that slot is empty, else on sentQueue).
PutOnSentQueue: ENTRY PROCEDURE [b: PupBuffer] =
  BEGIN OPEN mine;
  -- better not SIGNAL from here
  IF ackedID-Flip[b.pupID]>0 THEN
    BEGIN
    IF (unackedPups←unackedPups-1)=0 THEN
      BEGIN
      SELECT state FROM  -- last packet has been ACKed
        talking => state ← open;
        finishing => state ← end;
        ENDCASE => SendAbort[@mine];
      BROADCAST stateChange;
      END;
    ReturnFreePupBuffer[b];
    END
  ELSE IF sentBuffer=NIL THEN sentBuffer ← b
  ELSE EnqueuePup[@sentQueue,b];
  END;

-- Receive loop: pulls packets from the socket and feeds them to InputPacket
-- until pleaseDie is set.
Slurp: PROCEDURE =
  BEGIN OPEN mine;
  b: PupBuffer;
  UNTIL pleaseDie DO
    b ← socket.get[];
    IF b#NIL THEN InputPacket[b];
    ENDLOOP;
  END;

-- Processes one arriving packet.  The buffer is parked in c while it is
-- being examined; whoever consumes it (input queue, SendAck, the free pool)
-- sets c to NIL, and anything still in c at the end is returned to the pool.
InputPacket: ENTRY PROCEDURE [b: PupBuffer] =
  BEGIN OPEN mine;
  thisID: LONG INTEGER ← Flip[b.pupID];
  c ← b;
  IF ~(b.pupType=rfc OR b.pupType=error) AND b.source.socket#remote.socket THEN
    BEGIN
    IF doStats THEN StatIncr[statPacketsRejectedBadSource];
    ReturnFreePupBuffer[b];
    c ← NIL;
    -- Fix: leave now.  b is back in the free pool and c is NIL, so falling
    -- through to the dispatch below would read a freed buffer and, for an
    -- in-sequence data packet, dereference and enqueue NIL.
    RETURN;
    END;
  SELECT b.pupType FROM
    data, aData, mark, aMark =>
      SELECT state FROM
        open, talking, end =>
          BEGIN
          SELECT b.pupType FROM
            aData, aMark => sendAck ← TRUE;
            ENDCASE;
          IF b.pupLength>bytesPerPupHeader THEN
            BEGIN
            offset: INTEGER ← Diff[thisID,nextInputID];
            SELECT offset FROM
              0 =>
                BEGIN  -- nice - just what we wanted
                nextInputID ← nextInputID+(c.pupLength-bytesPerPupHeader);
                EnqueuePup[@inputQueue,c];
                NOTIFY inputReady;
                c ← NIL;
                END;
              ENDCASE =>
                BEGIN
                -- Out of sequence: count it and drop it.
                IF doStats THEN
                  SELECT offset FROM
                    IN (0..duplicateWindow] =>
                      StatIncr[statDataPacketsReceivedEarly];
                    IN (-duplicateWindow..0) =>
                      StatIncr[statDataPacketsReceivedAgain];
                    ENDCASE => StatIncr[statDataPacketsReceivedVeryLate];
                ReturnFreePupBuffer[c];
                c ← NIL;
                END;
            END
          ELSE  -- funny length from way back there
            BEGIN
            IF doStats THEN
              IF b.pupLength=bytesPerPupHeader AND b.pupType=aData THEN
                StatIncr[statProbesReceived]
              ELSE StatIncr[statEmptyFunnys];
            END;
          -- answer probes immediately
          IF sendAck AND inputQueue.length=0 THEN SendAck[];
          END;
        halfOpen => SendRfc[@mine];
        ENDCASE --idle, closed, finishing-- => SendAbort[@mine];
    ack =>
      BEGIN
      IF b.pupLength<bytesPerAck THEN
        BEGIN
        IF doStats THEN StatIncr[statMouseTrap];
        GOTO SkipThisAck;
        END;
      IF doStats THEN StatIncr[statAcksReceived];
      -- Try to avoid the funny stable SLOW case
      IF (thisID-ackedID)<0 OR (thisID=ackedID AND probeCounter=0) THEN
        BEGIN
        IF doStats THEN StatIncr[statDuplicateAcks];
        GOTO SkipThisAck;
        END;
      probeCounter ← 0;
      IF aDataOut THEN
        BEGIN
        -- Fold the measured response time into retransmitTicks.
        responseTime, myRetrTime: CARDINAL;
        responseTime ← CommUtilDefs.GetTicks[]-timer;
        responseTime ← MIN[responseTime,maxRetransmitTicks];
        -- retransmitTicks ← 2*Smooth[responseTime]
        -- retransmitTicks ← 2*((7/8)*(retransmitTicks/2)+(1/8)*responseTime)
        -- retransmitTicks ← (7*retransmitTicks+2*responseTime)/8
        myRetrTime ← retransmitTicks;
        myRetrTime ← (6*myRetrTime+myRetrTime+2*responseTime)/8;
        myRetrTime ← MAX[minRetransmitTicks,MIN[myRetrTime,maxRetransmitTicks]];
        retransmitTicks ← myRetrTime;
        aDataOut ← FALSE;
        clumpsSinceBump ← clumpsSinceBump+1;
        IF clumpsSinceBump>clumpsBeforeBump THEN ThrottleForward[];
        END;
      IF allocatedPups=0 THEN BROADCAST stateChange;
      hisMaxAllocate ← b.numberOfPupsAhead;
      IF hisMaxAllocate=0 THEN
        BEGIN
        probeCounter ← 1;
        IF doStats THEN StatIncr[statEmptyAlloc];
        END;
      allocatedPups ← MIN[hisMaxAllocate,pathMaxAllocate];
      IF outEnd=0 THEN BROADCAST stateChange;  -- in case first time
      outEnd ← MIN[b.maximumBytesPerPup,dataBytesPerPup];
      IF ~sameNet THEN outEnd←MIN[outEnd,PupTypes.maxDataBytesPerGatewayPup];
      IF (thisID-maxOutputID)+b.numberOfBytesAhead>0 THEN
        maxOutputID ← b.numberOfBytesAhead+thisID;
      ackedID ← thisID;
      IF sentBuffer=NIL THEN sentBuffer ← DequeuePup[@sentQueue];
      -- Release every held packet this ack covers.  The test is written as a
      -- difference, the same form Retransmitter and PutOnSentQueue use for
      -- this condition, so it stays correct when sequence numbers wrap.
      WHILE sentBuffer#NIL AND thisID-Flip[sentBuffer.pupID]>0 DO
        IF (unackedPups←unackedPups-1)=0 THEN
          BEGIN
          SELECT state FROM  -- last packet has been ACKed
            talking => state ← open;
            finishing => state ← end;
            ENDCASE => SendAbort[@mine];
          BROADCAST stateChange;
          END;
        ReturnFreePupBuffer[sentBuffer];
        sentBuffer ← DequeuePup[@sentQueue];
        ENDLOOP;
      -- Whatever is still held was not covered by this ack: resend all of it.
      UNTIL sentBuffer=NIL DO
        PupRouterSendThis[sentBuffer];
        IF doStats THEN StatIncr[statDataPacketsRetransmitted];
        sentBuffer ← DequeuePup[@sentQueue];
        ENDLOOP;
      EXITS SkipThisAck => NULL;
      END;
    ENDCASE => GotOther[@mine,c];
  IF c#NIL THEN BEGIN ReturnFreePupBuffer[c]; c←NIL; END;
  END;

-- Thin wrappers binding this instance to the shared implementations.
MyGetLocalAddress: PROCEDURE RETURNS [PupTypes.PupAddress] =
  BEGIN
  RETURN[GetLocalAddress[@mine]];
  END;

MyGetSenderSizeLimit: PROCEDURE RETURNS [CARDINAL] =
  BEGIN
  RETURN[GetSenderSizeLimit[@mine]];
  END;

-- Sends an interrupt (attention).  If one is already pending, waits on
-- stateChange until it clears, checking each time that the stream is still
-- usable.
MySendAttention: ENTRY PROCEDURE =
  BEGIN OPEN mine;
  ENABLE UNWIND => NULL;
  WHILE outIntPending DO
    SELECT state FROM
      open, talking => NULL;
      idle, halfOpen => Glitch[StreamNotOpen];
      ENDCASE => StreamDied[@mine, NIL ! UNWIND => NULL];
    WAIT stateChange;
    ENDLOOP;
  outIntPending ← TRUE;
  SendInt[@mine];
  END;

-- Blocks until inIntSeq moves past seenIntSeq, then consumes one interrupt.
MyWaitForAttention: ENTRY PROCEDURE =
  BEGIN OPEN mine;
  ENABLE UNWIND => NULL;
  WHILE seenIntSeq=inIntSeq DO WAIT waitingForInterrupt; ENDLOOP;
  seenIntSeq ← seenIntSeq+1;
  END;

-- initialization
mine.me.get ← Get;  -- do cold stuff here to avoid recompilation hassles
mine.me.put ← Put;
mine.me.putMark ← PutMark;
mine.me.getSenderSizeLimit ← MyGetSenderSizeLimit;
mine.me.sendAttention ← MySendAttention;
mine.me.waitForAttention ← MyWaitForAttention;
mine.me.getLocalAddress ← MyGetLocalAddress;
mine.slurp ← Slurp;
mine.retransmitter ← Retransmitter;
RETURN[@mine];
END.  -- PupPktHot