-- File: PupPktCool.mesa - last edit:
-- MAS October 13, 1980 5:03 PM
-- HGM October 27, 1980 10:09 AM
-- Copyright Xerox Corporation 1979, 1980

-- Creation, destruction and control-packet handling for Pup packet streams.
-- Each stream is an Instance obtained by NEWing PupPktPrivateDefs.PupPktHot.
-- This monitor locks on the lock inside that instance (him.lock).

DIRECTORY
  StatsDefs USING [StatIncr],
  CommUtilDefs USING [
    AppendChar, AllocateHeapString, DisableTimeout, FreeHeapString, GetTicks,
    GlobalFrame, InitializeCondition, InitializeMonitor, MsecToTicks,
    SetTimeout, Ticks, UnNew],
  PupRouterDefs USING [NextPupConnectionID],
  PupPktPrivateDefs,
  DriverDefs USING [MaybeGetFreePupBuffer, doCheck, doStats, Glitch],
  PupStream USING [CloseReason, StreamClosing, PupOpenMode],
  PupPktDefs USING [PupPktStream],
  PupDefs USING [
    Pair, defaultPupsToAllocate, DequeuePup, GetLocalPupAddress,
    DataWordsPerPupBuffer, ReturnFreePupBuffer, PupAddress, PupBuffer,
    PupSocketID, PupRouterSendThis, SetPupContentsBytes, Tocks, veryLongWait,
    veryShortWait, PupSocketMake, PupSocketKick, PupSocketDestroy],
  BufferDefs USING [QueueCleanup, QueueInitialize],
  PupTypes USING [
    PupType, fillInSocketID, noProcessPupErrorCode,
    gatewayResourceLimitsPupErrorCode];

PupPktCool: MONITOR LOCKS him.lock USING him: PupPktPrivateDefs.Instance
  IMPORTS StatsDefs, CommUtilDefs, PupStream, PupRouterDefs, DriverDefs, PupPktPrivateDefs, PupDefs, BufferDefs
  EXPORTS PupPktPrivateDefs, PupStream, PupPktDefs =
  BEGIN OPEN StatsDefs, PupPktPrivateDefs, DriverDefs, PupPktDefs, PupDefs;

  -- Module-wide defaults. InitializeEverything copies them into each new
  -- instance, so changing them only affects streams created afterwards.
  myPing: BOOLEAN _ TRUE;
  myMaxAllocate, myPathMaxAllocate: CARDINAL _ defaultPupsToAllocate;
  myMaxBufferSize: CARDINAL _ 0; -- 0 means keep the value computed from DataWordsPerPupBuffer

  NoBufferToSend: PUBLIC ERROR = CODE;
  StreamAlreadyOpen: PUBLIC ERROR = CODE;

  -- 2 seconds expressed in ticks; MySendAck uses it to adjust him.timer.
  ctlRetransmitPulses: CommUtilDefs.Ticks _ CommUtilDefs.MsecToTicks[2000];

  -- Creates a stream in sendRfc mode to remote, letting the local socket be
  -- filled in and passing [0,0] as the id. ticks is handled as in
  -- PupPktStreamMake.
  PupPktStreamCreate: PUBLIC PROCEDURE [remote: PupAddress, ticks: Tocks]
    RETURNS [PupPktStream] =
    BEGIN
    RETURN[PupPktStreamMake[PupTypes.fillInSocketID,remote,ticks,sendRfc,[0,0]]];
    END;

  -- Creates and starts a new PupPktHot instance, initializes its state, sets
  -- the inputReady timeout from ticks and then runs MakeLocal for the given
  -- mode. If MakeLocal unwinds, the half-built stream is destroyed again.
  -- Returns a pointer to the me field of the instance.
  PupPktStreamMake: PUBLIC PROCEDURE [
    local: PupSocketID, remote: PupAddress, ticks: Tocks,
    mode: PupStream.PupOpenMode, id: Pair]
    RETURNS [PupPktStream] =
    BEGIN
    new: PROGRAM RETURNS [Instance] _ NEW PupPktPrivateDefs.PupPktHot;
    him: Instance;
    him _ START new; -- Do initialization code
    InitializeEverything[him];
    SELECT ticks FROM
      veryShortWait => him.dontWait _ TRUE;
      veryLongWait => CommUtilDefs.DisableTimeout[@him.inputReady];
      ENDCASE => CommUtilDefs.SetTimeout[@him.inputReady,ticks];
    MakeLocal[him,local,remote,mode,id !
      UNWIND => PupPktStreamDestroy[@him.me] ];
    RETURN[@him.me];
    END;

  -- Shuts down the stream and releases the program instance behind it.
  -- The Instance is recovered from ps by subtracting the offset of the me
  -- field, which is measured on a dummy Instance value (krock) that is never
  -- dereferenced. The frame to UnNew is found from the put procedure in ps.
  PupPktStreamDestroy: PUBLIC PROCEDURE [ps: PupPktStream] =
    BEGIN
    who: POINTER _ CommUtilDefs.GlobalFrame[ps.put];
    krock: Instance = LOOPHOLE[1234];
    offset: INTEGER = @krock.me-LOOPHOLE[krock,POINTER];
    him: Instance _ LOOPHOLE[ps-offset];
    DestroyLocal[him];
    CommUtilDefs.UnNew[who];
    END;

  -- Sets the default allocation for streams created later. The path
  -- allocation is capped at defaultPupsToAllocate.
  SetMaxAllocation: PUBLIC PROCEDURE [n: CARDINAL] =
    BEGIN
    myMaxAllocate _ n;
    myPathMaxAllocate _ MIN[defaultPupsToAllocate,n];
    END;

  -- Sets the default data size for streams created later. n is capped at
  -- DataWordsPerPupBuffer and doubled before being stored.
  SetMaxBufferSize: PUBLIC PROCEDURE [n: CARDINAL] =
    BEGIN
    myMaxBufferSize _ 2*MIN[n,DataWordsPerPupBuffer[]];
    END;

  -- Sets the default value of the ping flag for streams created later.
  SetPinging: PUBLIC PROCEDURE [ping: BOOLEAN] =
    BEGIN
    myPing _ ping;
    END;

  -- Waits on stateChange until outEnd is nonzero and returns it.
  -- Raises PupStream.StreamClosing if the stream is found closed after a wait.
  GetSenderSizeLimit: PUBLIC ENTRY PROCEDURE [him: Instance] RETURNS [CARDINAL] =
    BEGIN OPEN him;
    ENABLE UNWIND => NULL;
    UNTIL outEnd#0 DO
      WAIT stateChange;
      IF state=closed THEN ERROR PupStream.StreamClosing[whyClosed,text];
      ENDLOOP;
    RETURN[outEnd];
    END;

  -- Returns the local address stored in the instance (read without the lock).
  GetLocalAddress: PUBLIC PROCEDURE [him: Instance] RETURNS [PupAddress] =
    BEGIN OPEN him;
    RETURN[local];
    END;

  -- Puts a freshly started instance into its idle state: clears counters and
  -- flags, copies the module defaults, and initializes the queues, the
  -- condition variables (with their timeouts) and the monitor lock.
  InitializeEverything: PROCEDURE [him: Instance] =
    BEGIN
    him.state _ idle;
    him.c _ NIL;
    him.dontWait _ FALSE;
    him.dataBytesPerPup _ 2*DataWordsPerPupBuffer[];
    him.outIntPending _ FALSE;
    him.outEnd _ 0;
    him.probeCounter _ 0;
    him.ping _ myPing;
    him.myMaxAllocate _ myMaxAllocate;
    him.pathMaxAllocate _ myPathMaxAllocate;
    him.hisMaxAllocate _ 0;
    him.throttle _ 0;
    him.unackedPups _ 0;
    him.allocatedPups _ 0;
    him.clumpsSinceBump _ 0;
    him.sentBuffer _ NIL;
    him.pleaseDie _ FALSE;
    him.sameNet _ FALSE;
    him.sendAck _ FALSE;
    him.aDataOut _ FALSE;
    him.retransmitTicks _ initialRetransmitTicks;
    him.whyClosed _ localClose; -- localClose doubles as "no reason recorded yet", see SmashClosedInternal
    him.text _ NIL;
    BufferDefs.QueueInitialize[@him.inputQueue];
    BufferDefs.QueueInitialize[@him.sentQueue];
    BEGIN OPEN CommUtilDefs;
    InitializeCondition[@him.stateChange,MsecToTicks[1000]];
    InitializeCondition[@him.retransmitterReady,MsecToTicks[100]];
    InitializeCondition[@him.inputReady,MsecToTicks[5000]];
    InitializeCondition[@him.waitingForInterrupt,MsecToTicks[5000]];
    DisableTimeout[@him.waitingForInterrupt];
    InitializeMonitor[@him.lock];
    END;
    -- a nonzero module default overrides the size computed above
    IF myMaxBufferSize#0 THEN him.dataBytesPerPup _ myMaxBufferSize;
    END;

  -- Exported name for GotOtherInternal.
  GotOther: PUBLIC PROCEDURE [Instance, PupBuffer] = GotOtherInternal;

  -- Dispatches a control Pup on the type found in b. The handlers themselves
  -- read the packet through him.c.
  -- NOTE(review): presumably the caller has already set him.c to b; confirm
  -- against the caller in PupPktHot.
  GotOtherInternal: INTERNAL PROCEDURE [him: Instance, b: PupBuffer] =
    BEGIN OPEN him;
    SELECT b.pupType FROM
      rfc => GotRfc[him];
      end => GotEnd[him];
      endRep => GotEndReply[him];
      abort => GotAbort[him];
      error => GotError[him];
      int => GotInt[him];
      intRep => GotIntReply[him];
      ENDCASE => IF doStats THEN StatIncr[statPacketsRejectedBadType];
    END;

  -- Handles an RFC in c. Unless the stream is idle, an RFC whose ID differs
  -- from connectionID is counted and dropped. Otherwise the action depends
  -- on the state: idle and halfOpen become open, open and talking may repeat
  -- our RFC, and anything else answers with an abort.
  GotRfc: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    IF state#idle AND c.pupID#connectionID THEN
      BEGIN
      IF doStats THEN StatIncr[statPacketsRejectedBadID];
      RETURN;
      END;
    SELECT state FROM
      idle =>
        BEGIN
        state _ open;
        remote _ c.address;
        OpenInit[him,c.pupID];
        SendRfcInternal[him];
        MySendAck[him];
        BROADCAST stateChange;
        END;
      halfOpen =>
        BEGIN
        state _ open;
        remote _ c.address;
        MySendAck[him];
        BROADCAST stateChange;
        END;
      open, talking => IF mode#sendRfc THEN SendRfcInternal[him];
      ENDCASE --end, closed, finishing-- => SendAbortInternal[him];
    END;

  -- Handles an End in c. A mismatched ID is counted and dropped. While
  -- talking or finishing the stream moves to finishing with remoteClose as
  -- the reason. In any other state it is smashed closed and an EndReply is
  -- sent back.
  GotEnd: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    IF c.pupID#connectionID THEN
      BEGIN
      IF doStats THEN StatIncr[statPacketsRejectedBadID];
      RETURN;
      END;
    SELECT state FROM
      talking, finishing =>
        BEGIN
        state _ finishing;
        whyClosed _ remoteClose;
        END;
      ENDCASE =>
        BEGIN
        SmashClosedInternal[him,remoteClose];
        SendEndReplyInternal[him];
        END;
    END;

  -- Handles an EndReply in c. A mismatched ID is counted and dropped. In
  -- open, talking, halfOpen or finishing the reply is unexpected and the
  -- stream is aborted. Otherwise the state becomes closed. Waiters on
  -- stateChange and inputReady are woken in both cases.
  GotEndReply: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    IF c.pupID#connectionID THEN
      BEGIN
      IF doStats THEN StatIncr[statPacketsRejectedBadID];
      RETURN;
      END;
    SELECT state FROM
      open, talking, halfOpen, finishing => SendAbortInternal[him];
      ENDCASE --idle, end, closed-- => state_closed;
    BROADCAST stateChange;
    NOTIFY inputReady;
    END;

  -- Handles an Error Pup in c.
  -- noProcess from the remote socket is treated like an abort: unless the
  -- stream is idle or closed it is smashed closed with remoteReject and up
  -- to 100 characters of the error text are saved in text (first one wins).
  -- gatewayResourceLimits bumps throttle and, when pathMaxAllocate is 1,
  -- doubles retransmitTicks up to maxRetransmitTicks.
  GotError: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    i, len: CARDINAL;
    -- Bytes in front of the error text: the Pup header plus 10+1+1 words.
    -- NOTE(review): the 10+1+1 words are presumably the copied header of the
    -- offending Pup, the error code and the error argument; confirm against
    -- the Pup error format.
    textOffset: CARDINAL = bytesPerPupHeader+2*(10+1+1);
    IF doStats THEN StatIncr[statErrorPacketsReceived];
    SELECT c.errorCode FROM
      PupTypes.noProcessPupErrorCode =>
        BEGIN -- like an abort
        IF c.source.socket#remote.socket THEN RETURN;
        SELECT state FROM
          idle, closed => RETURN;
          ENDCASE => SmashClosedInternal[him,remoteReject];
        IF text=NIL THEN
          BEGIN
          -- A short or malformed packet must not make the CARDINAL
          -- subtraction wrap around, which would copy 100 bytes of junk.
          len _ IF c.pupLength>textOffset
            THEN MIN[c.pupLength-textOffset,100] ELSE 0;
          text _ CommUtilDefs.AllocateHeapString[len];
          FOR i IN [0..len) DO
            CommUtilDefs.AppendChar[text,c.errorText[i]];
            ENDLOOP;
          END;
        END;
      PupTypes.gatewayResourceLimitsPupErrorCode =>
        BEGIN
        throttle _ throttle+1;
        IF pathMaxAllocate=1 THEN -- Beware: We may have gone unstable
          retransmitTicks _ MIN[2*retransmitTicks,maxRetransmitTicks];
        END;
      ENDCASE;
    END;

  -- Handles an Abort in c. A mismatched ID is counted and dropped, and an
  -- abort for an idle or closed stream is ignored. Up to 100 characters of
  -- the abort text are saved in text (first one wins). A halfOpen stream is
  -- then smashed closed with remoteReject. Any other state goes through
  -- SendAbortInternal, which also smashes it closed.
  GotAbort: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    i, len: CARDINAL;
    -- Bytes in front of the abort text: the Pup header plus one word.
    -- NOTE(review): the one word is presumably the abort code; confirm.
    textOffset: CARDINAL = bytesPerPupHeader+2*(1);
    IF c.pupID#connectionID THEN
      BEGIN
      IF doStats THEN StatIncr[statPacketsRejectedBadID];
      RETURN;
      END;
    IF state=idle OR state=closed THEN RETURN;
    IF text=NIL THEN
      BEGIN
      -- Same guard as in GotError: never let the length wrap around.
      len _ IF c.pupLength>textOffset
        THEN MIN[c.pupLength-textOffset,100] ELSE 0;
      text _ CommUtilDefs.AllocateHeapString[len];
      FOR i IN [0..len) DO
        CommUtilDefs.AppendChar[text,c.abortText[i]];
        ENDLOOP;
      END;
    SELECT state FROM
      halfOpen =>
        BEGIN
        SmashClosedInternal[him,remoteReject];
        END;
      ENDCASE => SendAbortInternal[him];
    END;

  -- Handles an Interrupt in c. The previous sequence number is a
  -- retransmission and is only acknowledged again. The expected number
  -- advances inIntSeq, wakes waitingForInterrupt and is acknowledged.
  -- Anything else is ignored.
  GotInt: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    int: LONG INTEGER _ Flip[c.pupID];
    SELECT TRUE FROM
      (int=inIntSeq-1) => SendIntReply[him]; -- retransmission
      (int=inIntSeq) =>
        BEGIN
        inIntSeq _ inIntSeq+1;
        NOTIFY waitingForInterrupt;
        SendIntReply[him];
        END;
      ENDCASE => RETURN; -- very old duplicate
    END;

  -- Handles an InterruptReply in c. A reply for the outstanding interrupt
  -- clears outIntPending and advances outIntSeq. stateChange is broadcast
  -- whether or not the sequence number matched.
  GotIntReply: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    int: LONG INTEGER _ Flip[c.pupID];
    IF int=outIntSeq THEN
      BEGIN
      outIntPending _ FALSE;
      outIntSeq _ outIntSeq+1;
      END;
    BROADCAST stateChange;
    END;

  -- Exported name for SendEndInternal.
  SendEnd: PUBLIC PROCEDURE [him: Instance] = SendEndInternal;

  -- Sends an End, taking a free buffer if c is empty and giving up quietly
  -- if none is available. After more than 15 sends the stream is smashed
  -- closed with transmissionTimeout.
  SendEndInternal: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    timer _ CommUtilDefs.GetTicks[];
    IF c=NIL AND (c_MaybeGetFreePupBuffer[])=NIL THEN RETURN;
    Send[him,end,connectionID,0];
    -- 15*2 is 30 sec
    -- SmashClosedInternal is the same procedure as the exported SmashClosed.
    IF (probeCounter_probeCounter+1)>15 THEN
      SmashClosedInternal[him,transmissionTimeout];
    END;

  -- Sends an EndReply in the current buffer c. Send requires c to be non-NIL.
  SendEndReplyInternal: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    Send[him,endRep,connectionID,0];
    END;

  -- Exported name for SendAbortInternal.
  SendAbort: PUBLIC PROCEDURE [him: Instance] = SendAbortInternal;

  -- Smashes the stream closed with remoteReject, then sends an Abort with
  -- abort code 1 if a buffer is available. The stream is closed even when
  -- no buffer can be found.
  SendAbortInternal: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    SmashClosedInternal[him,remoteReject];
    IF c=NIL AND (c_MaybeGetFreePupBuffer[])=NIL THEN RETURN;
    c.abortCode _ 1;
    Send[him,abort,connectionID,2];
    END;

  -- Exported name for SendIntInternal.
  SendInt: PUBLIC PROCEDURE [him: Instance] = SendIntInternal;

  -- Records the send time in outIntTime and sends an Interrupt carrying
  -- outIntSeq. Gives up quietly if no buffer is available.
  SendIntInternal: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    outIntTime _ CommUtilDefs.GetTicks[];
    IF c=NIL AND (c_MaybeGetFreePupBuffer[])=NIL THEN RETURN;
    Send[him,int,Flop[outIntSeq],0];
    END;

  -- Acknowledges the most recently accepted interrupt (inIntSeq-1) using the
  -- current buffer c. Send requires c to be non-NIL.
  SendIntReply: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    Send[him,intRep,Flop[inIntSeq-1],0];
    END;

  -- Exported name for SendRfcInternal.
  SendRfc: PUBLIC PROCEDURE [him: Instance] = SendRfcInternal;

  -- Sends an RFC that carries our local address. Gives up quietly, without
  -- touching timer, if no buffer is available.
  SendRfcInternal: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    IF c=NIL THEN c _ MaybeGetFreePupBuffer[];
    IF c=NIL THEN RETURN;
    timer _ CommUtilDefs.GetTicks[];
    c.address _ local;
    Send[him,rfc,connectionID,dataBytesPerRFC];
    END;

  -- Sends an RFC in a fresh buffer with the source socket replaced by
  -- listener. The body still carries our own local address. Does nothing if
  -- no buffer is available and leaves c alone.
  AnswerRfc: INTERNAL PROCEDURE [him: Instance, listener: PupSocketID] =
    BEGIN OPEN him;
    b: PupBuffer _ MaybeGetFreePupBuffer[];
    IF b=NIL THEN RETURN;
    b.address _ local;
    b.pupType _ rfc;
    b.pupID _ connectionID;
    SetPupContentsBytes[b,dataBytesPerRFC];
    b.source _ local;
    b.source.socket _ listener;
    b.dest _ remote;
    PupRouterSendThis[b];
    END;

  -- send a control message using the current buffer
  -- The buffer is taken from c, which is set to NIL. When doCheck is on, a
  -- NIL buffer is reported through Glitch[NoBufferToSend].
  Send: INTERNAL PROCEDURE [him: Instance, thisType: PupTypes.PupType,
    thisID: Pair, thisLen: CARDINAL] =
    BEGIN OPEN him;
    b: PupBuffer _ c;
    IF doCheck AND b=NIL THEN Glitch[NoBufferToSend];
    c _ NIL;
    b.pupType _ thisType;
    b.pupID _ thisID;
    SetPupContentsBytes[b,thisLen];
    b.source.socket _ local.socket;
    b.dest _ remote;
    PupRouterSendThis[b];
    END;

  CloseReason: TYPE = PupStream.CloseReason;

  -- Exported name for SmashClosedInternal.
  SmashClosed: PUBLIC PROCEDURE [Instance, CloseReason] = SmashClosedInternal;

  -- Forces the stream into the closed state and wakes waiters on stateChange
  -- and inputReady. why is recorded only if no earlier reason was stored.
  SmashClosedInternal: INTERNAL PROCEDURE [him: Instance, why: CloseReason] =
    BEGIN OPEN him;
    state _ closed;
    IF whyClosed=localClose THEN whyClosed _ why; -- don't clobber first reason
    BROADCAST stateChange;
    NOTIFY inputReady;
    END;

  -- Returns b to the free pool if it is not NIL, then raises
  -- PupStream.StreamClosing with the recorded reason and text.
  StreamDied: PUBLIC PROCEDURE [him: Instance, b: PupBuffer] =
    BEGIN OPEN him;
    IF b#NIL THEN ReturnFreePupBuffer[b];
    ERROR PupStream.StreamClosing[whyClosed,text];
    END;

  -- Actively opens an idle stream: picks a new connection ID, sends an RFC
  -- and waits on stateChange for the state to become open. Raises
  -- PupStream.StreamClosing if the stream gets closed meanwhile or is still
  -- not open after 60 waits. Called in any other state it reports
  -- Glitch[StreamAlreadyOpen].
  Open: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    SELECT state FROM
      idle =>
        BEGIN
        state _ halfOpen;
        OpenInit[him,PupRouterDefs.NextPupConnectionID[]];
        SendRfcInternal[him];
        THROUGH [0..60) WHILE state#open DO -- 1 min total
          SELECT state FROM
            open => RETURN;
            closed => ERROR PupStream.StreamClosing[whyClosed,text];
            ENDCASE => WAIT stateChange; -- 1 sec
          ENDLOOP;
        IF state=open THEN RETURN; -- it gets opened fast if local
        state _ closed;
        whyClosed _ transmissionTimeout;
        ERROR PupStream.StreamClosing[whyClosed,text];
        END;
      ENDCASE => Glitch[StreamAlreadyOpen];
    END;

  -- Sets the per-connection state for a new connection ID: sameNet, the
  -- probe counter, and every sequence number, all starting at Flip[newID].
  OpenInit: INTERNAL PROCEDURE [him: Instance, newID: Pair] =
    BEGIN OPEN him;
    sameNet _ remote.net=local.net;
    probeCounter _ initialRetransmissions; -- also allows first (gratuitous) ack
    connectionID _ newID;
    nextInputID _ nextOutputID _ ackedID _ allocatedID _ allocationID _
      maxOutputID _ outIntSeq _ inIntSeq _ seenIntSeq _ Flip[newID];
    END;

  -- Fills in the addresses, creates the socket, forks the retransmitter and
  -- slurp processes and then acts on the mode:
  -- sendRfc opens actively (see Open), alreadyOpened goes straight to open
  -- and answers the RFC from socket l, wait leaves the stream idle.
  -- In alreadyOpened mode l is the listener socket, so the local socket is
  -- left to be filled in and l is used only as the source of the answer.
  MakeLocal: ENTRY PROCEDURE [
    him: Instance, l: PupSocketID, r: PupAddress,
    m: PupStream.PupOpenMode, id: Pair] =
    BEGIN OPEN him;
    ENABLE UNWIND => NULL;
    kludge: PupSocketID _ IF m#alreadyOpened THEN l ELSE PupTypes.fillInSocketID;
    local _ GetLocalPupAddress[kludge,@r];
    remote _ r;
    mode _ m;
    connectionID _ id;
    socket _ PupSocketMake[local.socket,remote,veryLongWait,id];
    retransmitterFork _ FORK retransmitter[];
    slurpFork _ FORK slurp[];
    SELECT mode FROM
      sendRfc => Open[him];
      alreadyOpened =>
        BEGIN
        state _ open;
        OpenInit[him,id];
        AnswerRfc[him,l];
        MySendAck[him];
        END;
      wait => NULL;
      ENDCASE => ERROR;
    END;

  -- Copied (with only slight edits) from PupPktHot
  -- Sends an Ack carrying our data size, the remaining buffer allocation and
  -- byteAllocate, using c if it is available and a fresh buffer otherwise.
  -- Gives up quietly if there is no buffer.
  -- NOTE(review): under OPEN him, myMaxAllocate presumably resolves to the
  -- instance field rather than the module variable; confirm.
  MySendAck: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    b: PupBuffer;
    IF c#NIL THEN BEGIN b_c; c_NIL; END
    ELSE IF (b_MaybeGetFreePupBuffer[])=NIL THEN RETURN;
    b.pupBody _ ack [
      dataBytesPerPup,
      MAX[0,INTEGER[myMaxAllocate-inputQueue.length]],
      byteAllocate];
    allocatedID _ nextInputID+byteAllocate;
    b.pupType _ ack;
    b.pupID _ Flop[nextInputID];
    b.pupLength _ bytesPerAck;
    b.source.socket _ local.socket;
    b.dest _ remote;
    PupRouterSendThis[b];
    IF doStats THEN StatIncr[statAcksSent];
    -- MAXC doesn't send free ACK, so set clock ahead
    timer _ CommUtilDefs.GetTicks[]+ctlRetransmitPulses/16-ctlRetransmitPulses;
    END;

  -- Tears the stream down. The locked part closes the connection and asks
  -- the helper processes to exit. The JOINs are done here without the lock,
  -- and then the queues, the socket and any saved text are released.
  DestroyLocal: PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    DestroyLocalLocked[him];
    JOIN retransmitterFork;
    JOIN slurpFork;
    -- IF sentBuffer#NIL THEN ReturnFreePupBuffer[sentBuffer];
    BufferDefs.QueueCleanup[@inputQueue];
    BufferDefs.QueueCleanup[@sentQueue];
    PupSocketDestroy[socket];
    IF text#NIL THEN CommUtilDefs.FreeHeapString[text];
    END;

  -- Locked half of DestroyLocal. In order:
  -- 1. waits a bounded number of times for a pending outgoing interrupt;
  -- 2. starts the close for the current state (End when open, Abort when
  --    halfOpen, wait while talking or finishing);
  -- 3. waits a bounded number of times for idle or closed, discarding input;
  -- 4. forces closed, waits for unackedPups to reach 0, then sets pleaseDie
  --    and kicks the retransmitter and the socket.
  DestroyLocalLocked: ENTRY PROCEDURE [him: Instance] = INLINE
    BEGIN OPEN him;
    THROUGH [0..100) WHILE outIntPending DO
      IF state=closed THEN EXIT; -- probably smashed closed, don't hang
      WAIT stateChange;
      ENDLOOP;
    DO
      SELECT state FROM
        open =>
          BEGIN
          state_end;
          SendEndInternal[him];
          probeCounter _ 0;
          EXIT;
          END;
        talking, finishing => WAIT stateChange;
        halfOpen =>
          BEGIN
          SendAbortInternal[him];
          EXIT;
          END;
        ENDCASE--closed, end, idle-- => EXIT;
      ENDLOOP;
    THROUGH [0..10) UNTIL state=idle OR state=closed DO -- extra layer for debugging
      UNTIL state=idle OR state=closed DO
        IF c#NIL THEN ReturnFreePupBuffer[c]; -- probe for allocate
        UNTIL (c_DequeuePup[@inputQueue])=NIL DO
          ReturnFreePupBuffer[c];
          ENDLOOP;
        WAIT stateChange;
        ENDLOOP;
      ENDLOOP;
    state _ closed;
    NOTIFY retransmitterReady;
    UNTIL unackedPups=0 DO WAIT stateChange; ENDLOOP; -- retransmitter gets them
    pleaseDie _ TRUE;
    NOTIFY retransmitterReady;
    PupSocketKick[socket];
    END;

  -- initialization
  END.
-- NOTE(review): the line below looks like an editor formatting trailer that
-- followed the end of the program text; it is kept unchanged. Confirm.
(1792)\4042t10 10t0 42t10 4t0 16t10 7t0 16t10 2t0 22t10 5t0