<> <> <> <> <> <<>> DIRECTORY ConvertUnsafe USING [ToRope], DriverDefs USING [MaybeGetFreePupBuffer, Glitch], CommFlags USING [doDebug, doStats], ProcessorFace USING [GetClockPulses], PupRouterDefs USING [NextPupConnectionID], PupPktOps, PupStream USING [CloseReason, StreamClosing, PupOpenMode], PupPktDefs USING [PupPktStream], PupDefs USING [Pair, defaultPupsToAllocate, DequeuePup, GetLocalPupAddress, ReturnFreePupBuffer, PupAddress, PupBuffer, PupSocketID, PupRouterSendThis, SetPupContentsBytes, Tocks, veryLongWait, PupSocketMake], PupTypes USING [PupType, fillInSocketID], Rope USING [Fetch, Length, ROPE], StatsDefs USING [StatIncr]; PupPktCool: MONITOR LOCKS him USING him: PupPktOps.Instance IMPORTS ConvertUnsafe, DriverDefs, ProcessorFace, PupStream, PupRouterDefs, PupPktOps, PupDefs, Rope, StatsDefs EXPORTS PupPktOps, PupPktDefs = BEGIN OPEN StatsDefs, PupPktOps, DriverDefs, PupPktDefs, PupDefs; PupPktStreamObject: PUBLIC TYPE = PupPktOps.InstanceData; myPing: BOOLEAN _ TRUE; myMaxAllocate, myPathMaxAllocate: CARDINAL _ defaultPupsToAllocate; myMaxBufferSize: CARDINAL _ 0; NoBufferToSend: PUBLIC ERROR = CODE; StreamAlreadyOpen: PUBLIC ERROR = CODE; PupPktStreamAbort: PUBLIC PROCEDURE [ps: PupPktStream, e: Rope.ROPE] = BEGIN SendAbortWithText[ps, e]; END; GetSenderSizeLimit: PUBLIC ENTRY PROCEDURE [him: Instance] RETURNS [CARDINAL] = BEGIN OPEN him; ENABLE UNWIND => NULL; UNTIL outEnd # 0 DO WAIT stateChange; IF state = closed THEN ERROR PupStream.StreamClosing[whyClosed, text]; ENDLOOP; RETURN[outEnd]; END; GetLocalAddress: PUBLIC PROCEDURE [him: Instance] RETURNS [PupAddress] = BEGIN OPEN him; RETURN[local]; END; GotOther: PUBLIC PROCEDURE [Instance, PupBuffer] = GotOtherInternal; GotOtherInternal: INTERNAL PROCEDURE [him: Instance, b: PupBuffer] = BEGIN OPEN him; SELECT b.pupType FROM rfc => GotRfc[him]; end => GotEnd[him]; endRep => GotEndReply[him]; abort => GotAbort[him]; error => GotError[him]; int => GotInt[him]; intRep => GotIntReply[him]; ENDCASE => IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadType]; END; GotRfc: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; IF state # idle AND c.pupID # connectionID THEN BEGIN IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID]; RETURN; END; SELECT state FROM idle => BEGIN state _ open; remote _ c.address; OpenInit[him, c.pupID]; SendRfcInternal[him]; MySendAck[him]; BROADCAST stateChange; END; halfOpen => BEGIN state _ open; remote _ c.address; MySendAck[him]; BROADCAST stateChange; END; open, talking => IF mode # sendRfc THEN SendRfcInternal[him]; ENDCASE --end, closed, finishing-- => SendAbortInternal[him]; END; GotEnd: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; IF c.pupID # connectionID THEN BEGIN IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID]; RETURN; END; SELECT state FROM talking, finishing => BEGIN state _ finishing; whyClosed _ remoteClose; END; ENDCASE => BEGIN SmashClosedInternal[him, remoteClose]; SendEndReplyInternal[him]; END; END; GotEndReply: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; IF c.pupID # connectionID THEN BEGIN IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID]; RETURN; END; SELECT state FROM open, talking, halfOpen, finishing => SendAbortInternal[him]; ENDCASE --idle, end, closed-- => state _ closed; BROADCAST stateChange; NOTIFY inputReady; END; GotError: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; IF CommFlags.doStats THEN StatIncr[statErrorPacketsReceived]; SELECT c.errorCode FROM noProcessPupErrorCode => BEGIN -- like an abort IF c.source.socket # remote.socket THEN RETURN; SELECT state FROM idle, closed => RETURN; ENDCASE => SmashClosedInternal[him, remoteReject]; IF text = NIL THEN BEGIN s: STRING = [100]; s.length _ MIN[c.pupLength - bytesPerPupHeader - 2*(10+1+1), s.maxlength]; FOR i: CARDINAL IN [0..s.length) DO s[i] _ c.errorText[i]; ENDLOOP; text _ ConvertUnsafe.ToRope[s]; END; END; gatewayResourceLimitsPupErrorCode => BEGIN throttle _ throttle + 1; IF pathMaxAllocate = 1 THEN <> retransmitPulses _ MIN[2*retransmitPulses, maxRetransmitPulses]; END; ENDCASE; END; GotAbort: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; IF c.pupID # connectionID THEN BEGIN IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID]; RETURN; END; IF state = idle OR state = closed THEN RETURN; IF text = NIL THEN BEGIN s: STRING = [100]; s.length _ MIN[c.pupLength - bytesPerPupHeader - 2*(1), s.maxlength]; FOR i: CARDINAL IN [0..s.length) DO s[i] _ c.abortText[i]; ENDLOOP; text _ ConvertUnsafe.ToRope[s]; END; SELECT state FROM halfOpen => BEGIN SmashClosedInternal[him, remoteReject]; END; ENDCASE => SendAbortInternal[him]; END; GotInt: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; int: LONG INTEGER _ Flip[c.pupID]; SELECT TRUE FROM (int = inIntSeq - 1) => SendIntReply[him]; -- retransmission (int = inIntSeq) => BEGIN inIntSeq _ inIntSeq + 1; NOTIFY waitingForInterrupt; SendIntReply[him]; END; ENDCASE => RETURN; -- very old duplicate END; GotIntReply: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; int: LONG INTEGER _ Flip[c.pupID]; IF int = outIntSeq THEN BEGIN outIntPending _ FALSE; outIntSeq _ outIntSeq + 1; END; BROADCAST stateChange; END; SendEnd: PUBLIC PROCEDURE [him: Instance] = SendEndInternal; SendEndInternal: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; timer _ ProcessorFace.GetClockPulses[]; IF c = NIL AND (c _ MaybeGetFreePupBuffer[]) = NIL THEN RETURN; Send[him, end, connectionID, 0]; <<15*2 is 30 sec>> IF (probeCounter _ probeCounter + 1) > 15 THEN SmashClosed[him, transmissionTimeout]; END; SendEndReplyInternal: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; Send[him, endRep, connectionID, 0]; END; SendAbort: PUBLIC PROCEDURE [him: Instance] = SendAbortInternal; SendAbortInternal: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; SmashClosedInternal[him, remoteReject]; IF c = NIL AND (c _ MaybeGetFreePupBuffer[]) = NIL THEN RETURN; c.abortCode _ 1; Send[him, abort, connectionID, 2]; END; SendAbortWithText: ENTRY PROCEDURE [him: Instance, e: Rope.ROPE] = BEGIN OPEN him; charsPerAbortHeader: CARDINAL = 2; chars: CARDINAL = MIN[e.Length[], dataBytesPerPup - charsPerAbortHeader]; SmashClosedInternal[him, localAbort]; IF c = NIL AND (c _ MaybeGetFreePupBuffer[]) = NIL THEN RETURN; c.abortCode _ 1; FOR i: CARDINAL IN [0..chars) DO c.abortText[i] _ e.Fetch[i]; ENDLOOP; Send[him, abort, connectionID, charsPerAbortHeader + chars]; END; SendInt: PUBLIC PROCEDURE [him: Instance] = SendIntInternal; SendIntInternal: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; outIntTime _ ProcessorFace.GetClockPulses[]; IF c = NIL AND (c _ MaybeGetFreePupBuffer[]) = NIL THEN RETURN; Send[him, int, Flop[outIntSeq], 0]; END; SendIntReply: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; Send[him, intRep, Flop[inIntSeq - 1], 0]; END; SendRfc: PUBLIC PROCEDURE [him: Instance] = SendRfcInternal; SendRfcInternal: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; IF c = NIL THEN c _ MaybeGetFreePupBuffer[]; IF c = NIL THEN RETURN; timer _ ProcessorFace.GetClockPulses[]; c.address _ local; Send[him, rfc, connectionID, dataBytesPerRFC]; END; AnswerRfc: INTERNAL PROCEDURE [him: Instance, listener: PupSocketID] = BEGIN OPEN him; b: PupBuffer _ MaybeGetFreePupBuffer[]; IF b = NIL THEN RETURN; b.address _ local; b.pupType _ rfc; b.pupID _ connectionID; SetPupContentsBytes[b, dataBytesPerRFC]; b.source _ local; b.source.socket _ listener; b.dest _ remote; PupRouterSendThis[b]; END; <> Send: INTERNAL PROCEDURE [ him: Instance, thisType: PupTypes.PupType, thisID: Pair, thisLen: CARDINAL] = BEGIN OPEN him; b: PupBuffer _ c; IF CommFlags.doDebug AND b = NIL THEN Glitch[NoBufferToSend]; c _ NIL; b.pupType _ thisType; b.pupID _ thisID; SetPupContentsBytes[b, thisLen]; b.source _ local; b.dest _ remote; PupRouterSendThis[b]; END; CloseReason: TYPE = PupStream.CloseReason; SmashClosed: PUBLIC PROCEDURE [Instance, CloseReason] = SmashClosedInternal; SmashClosedInternal: INTERNAL PROCEDURE [him: Instance, why: CloseReason] = BEGIN OPEN him; state _ closed; IF whyClosed = localClose THEN whyClosed _ why; -- don't clobber first reason BROADCAST stateChange; NOTIFY inputReady; END; StreamDied: PUBLIC PROCEDURE [him: Instance, b: PupBuffer] = BEGIN OPEN him; IF b # NIL THEN ReturnFreePupBuffer[b]; ERROR PupStream.StreamClosing[whyClosed, text]; END; Open: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; SELECT state FROM idle => BEGIN state _ halfOpen; OpenInit[him, PupRouterDefs.NextPupConnectionID[]]; SendRfcInternal[him]; THROUGH [0..60) WHILE state # open DO <<1 min total>> SELECT state FROM open => RETURN; closed => ERROR PupStream.StreamClosing[whyClosed, text]; ENDCASE => WAIT stateChange; -- 1 sec ENDLOOP; IF state = open THEN RETURN; -- it gets opened fast if local state _ closed; whyClosed _ transmissionTimeout; ERROR PupStream.StreamClosing[whyClosed, text]; END; ENDCASE => Glitch[StreamAlreadyOpen]; END; OpenInit: INTERNAL PROCEDURE [him: Instance, newID: Pair] = BEGIN OPEN him; sameNet _ remote.net = local.net; probeCounter _ initialRetransmissions; -- also allows first (gratuitous) ack connectionID _ newID; nextInputID _ nextOutputID _ ackedID _ allocatedID _ allocationID _ maxOutputID _ outIntSeq _ inIntSeq _ seenIntSeq _ Flip[newID]; END; MakeLocal: PUBLIC ENTRY PROCEDURE [ him: Instance, l: PupSocketID, r: PupAddress, m: PupStream.PupOpenMode, id: Pair] = BEGIN OPEN him; ENABLE UNWIND => NULL; kludge: PupSocketID _ IF m # alreadyOpened THEN l ELSE PupTypes.fillInSocketID; local _ GetLocalPupAddress[kludge, r]; remote _ r; mode _ m; connectionID _ id; socket _ PupSocketMake[local.socket, remote, veryLongWait, id]; retransmitterFork _ FORK Retransmitter[him]; slurpFork _ FORK Slurp[him]; SELECT mode FROM sendRfc => Open[him]; alreadyOpened => BEGIN state _ open; OpenInit[him, id]; AnswerRfc[him, l]; MySendAck[him]; END; wait => NULL; ENDCASE => ERROR; END; <> MySendAck: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; b: PupBuffer; IF c # NIL THEN BEGIN b _ c; c _ NIL; END ELSE IF (b _ MaybeGetFreePupBuffer[]) = NIL THEN RETURN; b.pupBody _ ack[ dataBytesPerPup, MAX[0, INTEGER[myMaxAllocate - inputQueue.length]], byteAllocate]; allocatedID _ nextInputID + byteAllocate; b.pupType _ ack; b.pupID _ Flop[nextInputID]; b.pupLength _ bytesPerAck; b.source _ local; b.dest _ remote; PupRouterSendThis[b]; IF CommFlags.doStats THEN StatIncr[statAcksSent]; <> timer _ ProcessorFace.GetClockPulses[] + ctlRetransmitPulses/16 - ctlRetransmitPulses; END; DestroyLocalLocked: PUBLIC ENTRY PROCEDURE [him: Instance] = BEGIN OPEN him; THROUGH [0..100) WHILE outIntPending DO IF state = closed THEN EXIT; -- probably smashed closed, don't hang WAIT stateChange; ENDLOOP; DO SELECT state FROM open => BEGIN state _ end; SendEndInternal[him]; probeCounter _ 0; EXIT; END; talking, finishing => WAIT stateChange; halfOpen => BEGIN SendAbortInternal[him]; EXIT; END; ENDCASE --closed, end, idle-- => EXIT; ENDLOOP; THROUGH [0..10) UNTIL state = idle OR state = closed DO <> UNTIL state = idle OR state = closed DO IF c # NIL THEN ReturnFreePupBuffer[c]; -- probe for allocate UNTIL (c _ DequeuePup[inputQueue]) = NIL DO ReturnFreePupBuffer[c]; ENDLOOP; WAIT stateChange; ENDLOOP; ENDLOOP; state _ closed; NOTIFY retransmitterReady; UNTIL unackedPups = 0 DO WAIT stateChange; ENDLOOP; -- retransmitter gets them pleaseDie _ TRUE; NOTIFY retransmitterReady; END; END.