-- File: PupPktCool.mesa, Last Edit: HGM March 14, 1981 10:48 AM DIRECTORY Storage USING [String], String USING [AppendChar], System USING [Pulses, GetClockPulses], StatsDefs USING [StatIncr], CommFlags USING [doDebug, doStats], PupRouterDefs USING [NextPupConnectionID], PupPktOps, DriverDefs USING [MaybeGetFreePupBuffer, Glitch], PupStream USING [CloseReason, StreamClosing, PupOpenMode], PupPktDefs USING [PupPktStream], PupDefs USING [ Pair, defaultPupsToAllocate, DequeuePup, GetLocalPupAddress, ReturnFreePupBuffer, PupAddress, PupBuffer, PupSocketID, PupRouterSendThis, SetPupContentsBytes, Tocks, veryLongWait, PupSocketMake], PupTypes USING [PupType, fillInSocketID]; PupPktCool: MONITOR LOCKS him.lock USING him: PupPktOps.Instance IMPORTS Storage, String, System, StatsDefs, PupStream, PupRouterDefs, DriverDefs, PupPktOps, PupDefs EXPORTS PupPktOps, PupPktDefs = BEGIN OPEN StatsDefs, PupPktOps, DriverDefs, PupPktDefs, PupDefs; myPing: BOOLEAN ← TRUE; myMaxAllocate, myPathMaxAllocate: CARDINAL ← defaultPupsToAllocate; myMaxBufferSize: CARDINAL ← 0; NoBufferToSend: PUBLIC ERROR = CODE; StreamAlreadyOpen: PUBLIC ERROR = CODE; PupPktStreamAbort: PUBLIC PROCEDURE [ps: PupPktStream, e: STRING] = BEGIN krock: Instance = NIL; offset: INTEGER = @krock.me - LOOPHOLE[krock, POINTER]; him: Instance ← LOOPHOLE[ps - offset]; SendAbortWithText[him, e]; END; GetSenderSizeLimit: PUBLIC ENTRY PROCEDURE [him: Instance] RETURNS [CARDINAL] = BEGIN OPEN him; ENABLE UNWIND => NULL; UNTIL outEnd # 0 DO WAIT stateChange; IF state = closed THEN ERROR PupStream.StreamClosing[whyClosed, text]; ENDLOOP; RETURN[outEnd]; END; GetLocalAddress: PUBLIC PROCEDURE [him: Instance] RETURNS [PupAddress] = BEGIN OPEN him; RETURN[local]; END; GotOther: PUBLIC PROCEDURE [Instance, PupBuffer] = GotOtherInternal; GotOtherInternal: INTERNAL PROCEDURE [him: Instance, b: PupBuffer] = BEGIN OPEN him; SELECT b.pupType FROM rfc => GotRfc[him]; end => GotEnd[him]; endRep => GotEndReply[him]; abort => GotAbort[him]; error => GotError[him]; int => GotInt[him]; intRep => GotIntReply[him]; ENDCASE => IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadType]; END; GotRfc: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; IF state # idle AND c.pupID # connectionID THEN BEGIN IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID]; RETURN; END; SELECT state FROM idle => BEGIN state ← open; remote ← c.address; OpenInit[him, c.pupID]; SendRfcInternal[him]; MySendAck[him]; BROADCAST stateChange; END; halfOpen => BEGIN state ← open; remote ← c.address; MySendAck[him]; BROADCAST stateChange; END; open, talking => IF mode # sendRfc THEN SendRfcInternal[him]; ENDCASE --end, closed, finishing-- => SendAbortInternal[him]; END; GotEnd: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; IF c.pupID # connectionID THEN BEGIN IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID]; RETURN; END; SELECT state FROM talking, finishing => BEGIN state ← finishing; whyClosed ← remoteClose; END; ENDCASE => BEGIN SmashClosedInternal[him, remoteClose]; SendEndReplyInternal[him]; END; END; GotEndReply: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; IF c.pupID # connectionID THEN BEGIN IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID]; RETURN; END; SELECT state FROM open, talking, halfOpen, finishing => SendAbortInternal[him]; ENDCASE --idle, end, closed-- => state ← closed; BROADCAST stateChange; NOTIFY inputReady; END; GotError: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; i, len: CARDINAL; IF CommFlags.doStats THEN StatIncr[statErrorPacketsReceived]; SELECT c.errorCode FROM noProcessPupErrorCode => BEGIN -- like an abort IF c.source.socket # remote.socket THEN RETURN; SELECT state FROM idle, closed => RETURN; ENDCASE => SmashClosedInternal[him, remoteReject]; IF text = NIL THEN BEGIN len ← c.pupLength - bytesPerPupHeader - 2*(10 + 1 + 1); len ← MIN[len, 100]; text ← Storage.String[len]; FOR i IN [0..len) DO String.AppendChar[text, c.errorText[i]]; ENDLOOP; END; END; gatewayResourceLimitsPupErrorCode => BEGIN throttle ← throttle + 1; IF pathMaxAllocate = 1 THEN -- Beware: We may have gone unstable retransmitPulses ← [MIN[System.Pulses[2*retransmitPulses], maxRetransmitPulses]]; END; ENDCASE; END; GotAbort: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; i, len: CARDINAL; IF c.pupID # connectionID THEN BEGIN IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID]; RETURN; END; IF state = idle OR state = closed THEN RETURN; IF text = NIL THEN BEGIN len ← c.pupLength - bytesPerPupHeader - 2*(1); len ← MIN[len, 100]; text ← Storage.String[len]; FOR i IN [0..len) DO String.AppendChar[text, c.abortText[i]]; ENDLOOP; END; SELECT state FROM halfOpen => BEGIN SmashClosedInternal[him, remoteReject]; END; ENDCASE => SendAbortInternal[him]; END; GotInt: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; int: LONG INTEGER ← Flip[c.pupID]; SELECT TRUE FROM (int = inIntSeq - 1) => SendIntReply[him]; -- retransmission (int = inIntSeq) => BEGIN inIntSeq ← inIntSeq + 1; NOTIFY waitingForInterrupt; SendIntReply[him]; END; ENDCASE => RETURN; -- very old duplicate END; GotIntReply: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; int: LONG INTEGER ← Flip[c.pupID]; IF int = outIntSeq THEN BEGIN outIntPending ← FALSE; outIntSeq ← outIntSeq + 1; END; BROADCAST stateChange; END; SendEnd: PUBLIC PROCEDURE [him: Instance] = SendEndInternal; SendEndInternal: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; timer ← System.GetClockPulses[]; IF c = NIL AND (c ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN; Send[him, end, connectionID, 0]; -- 15*2 is 30 sec IF (probeCounter ← probeCounter + 1) > 15 THEN SmashClosed[him, transmissionTimeout]; END; SendEndReplyInternal: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; Send[him, endRep, connectionID, 0]; END; SendAbort: PUBLIC PROCEDURE [him: Instance] = SendAbortInternal; SendAbortInternal: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; SmashClosedInternal[him, remoteReject]; IF c = NIL AND (c ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN; c.abortCode ← 1; Send[him, abort, connectionID, 2]; END; SendAbortWithText: ENTRY PROCEDURE [him: Instance, e: STRING] = BEGIN OPEN him; charsPerAbortHeader: CARDINAL = 2; chars: CARDINAL = MIN[ IF e = NIL THEN 0 ELSE e.length, dataBytesPerPup - charsPerAbortHeader]; SmashClosedInternal[him, localAbort]; IF c = NIL AND (c ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN; c.abortCode ← 1; FOR i: CARDINAL IN [0..chars) DO c.abortText[i] ← e[i]; ENDLOOP; Send[him, abort, connectionID, charsPerAbortHeader + chars]; END; SendInt: PUBLIC PROCEDURE [him: Instance] = SendIntInternal; SendIntInternal: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; outIntTime ← System.GetClockPulses[]; IF c = NIL AND (c ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN; Send[him, int, Flop[outIntSeq], 0]; END; SendIntReply: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; Send[him, intRep, Flop[inIntSeq - 1], 0]; END; SendRfc: PUBLIC PROCEDURE [him: Instance] = SendRfcInternal; SendRfcInternal: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; IF c = NIL THEN c ← MaybeGetFreePupBuffer[]; IF c = NIL THEN RETURN; timer ← System.GetClockPulses[]; c.address ← local; Send[him, rfc, connectionID, dataBytesPerRFC]; END; AnswerRfc: INTERNAL PROCEDURE [him: Instance, listener: PupSocketID] = BEGIN OPEN him; b: PupBuffer ← MaybeGetFreePupBuffer[]; IF b = NIL THEN RETURN; b.address ← local; b.pupType ← rfc; b.pupID ← connectionID; SetPupContentsBytes[b, dataBytesPerRFC]; b.source ← local; b.source.socket ← listener; b.dest ← remote; PupRouterSendThis[b]; END; -- send a control message using the current buffer Send: INTERNAL PROCEDURE [ him: Instance, thisType: PupTypes.PupType, thisID: Pair, thisLen: CARDINAL] = BEGIN OPEN him; b: PupBuffer ← c; IF CommFlags.doDebug AND b = NIL THEN Glitch[NoBufferToSend]; c ← NIL; b.pupType ← thisType; b.pupID ← thisID; SetPupContentsBytes[b, thisLen]; b.source ← local; b.dest ← remote; PupRouterSendThis[b]; END; CloseReason: TYPE = PupStream.CloseReason; SmashClosed: PUBLIC PROCEDURE [Instance, CloseReason] = SmashClosedInternal; SmashClosedInternal: INTERNAL PROCEDURE [him: Instance, why: CloseReason] = BEGIN OPEN him; state ← closed; IF whyClosed = localClose THEN whyClosed ← why; -- don't clobber first reason BROADCAST stateChange; NOTIFY inputReady; END; StreamDied: PUBLIC PROCEDURE [him: Instance, b: PupBuffer] = BEGIN OPEN him; IF b # NIL THEN ReturnFreePupBuffer[b]; ERROR PupStream.StreamClosing[whyClosed, text]; END; Open: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; SELECT state FROM idle => BEGIN state ← halfOpen; OpenInit[him, PupRouterDefs.NextPupConnectionID[]]; SendRfcInternal[him]; THROUGH [0..60) WHILE state # open DO -- 1 min total SELECT state FROM open => RETURN; closed => ERROR PupStream.StreamClosing[whyClosed, text]; ENDCASE => WAIT stateChange; -- 1 sec ENDLOOP; IF state = open THEN RETURN; -- it gets opened fast if local state ← closed; whyClosed ← transmissionTimeout; ERROR PupStream.StreamClosing[whyClosed, text]; END; ENDCASE => Glitch[StreamAlreadyOpen]; END; OpenInit: INTERNAL PROCEDURE [him: Instance, newID: Pair] = BEGIN OPEN him; sameNet ← remote.net = local.net; probeCounter ← initialRetransmissions; -- also allows first (gratuitous) ack connectionID ← newID; nextInputID ← nextOutputID ← ackedID ← allocatedID ← allocationID ← maxOutputID ← outIntSeq ← inIntSeq ← seenIntSeq ← Flip[newID]; END; MakeLocal: PUBLIC ENTRY PROCEDURE [ him: Instance, l: PupSocketID, r: PupAddress, m: PupStream.PupOpenMode, id: Pair] = BEGIN OPEN him; ENABLE UNWIND => NULL; kludge: PupSocketID ← IF m # alreadyOpened THEN l ELSE PupTypes.fillInSocketID; local ← GetLocalPupAddress[kludge, @r]; remote ← r; mode ← m; connectionID ← id; socket ← PupSocketMake[local.socket, remote, veryLongWait, id]; retransmitterFork ← FORK retransmitter[]; slurpFork ← FORK slurp[]; SELECT mode FROM sendRfc => Open[him]; alreadyOpened => BEGIN state ← open; OpenInit[him, id]; AnswerRfc[him, l]; MySendAck[him]; END; wait => NULL; ENDCASE => ERROR; END; -- Copied (with only slight edits) from PupPktHot MySendAck: INTERNAL PROCEDURE [him: Instance] = BEGIN OPEN him; b: PupBuffer; IF c # NIL THEN BEGIN b ← c; c ← NIL; END ELSE IF (b ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN; b.pupBody ← ack[ dataBytesPerPup, MAX[0, INTEGER[myMaxAllocate - inputQueue.length]], byteAllocate]; allocatedID ← nextInputID + byteAllocate; b.pupType ← ack; b.pupID ← Flop[nextInputID]; b.pupLength ← bytesPerAck; b.source ← local; b.dest ← remote; PupRouterSendThis[b]; IF CommFlags.doStats THEN StatIncr[statAcksSent]; -- MAXC doesn't send free ACK, so set clock ahead timer ← [System.GetClockPulses[] + ctlRetransmitPulses/16 - ctlRetransmitPulses]; END; DestroyLocalLocked: PUBLIC ENTRY PROCEDURE [him: Instance] = BEGIN OPEN him; THROUGH [0..100) WHILE outIntPending DO IF state = closed THEN EXIT; -- probably smashed closed, don't hang WAIT stateChange; ENDLOOP; DO SELECT state FROM open => BEGIN state ← end; SendEndInternal[him]; probeCounter ← 0; EXIT; END; talking, finishing => WAIT stateChange; halfOpen => BEGIN SendAbortInternal[him]; EXIT; END; ENDCASE --closed, end, idle-- => EXIT; ENDLOOP; THROUGH [0..10) UNTIL state = idle OR state = closed DO -- extra layer for debugging UNTIL state = idle OR state = closed DO IF c # NIL THEN ReturnFreePupBuffer[c]; -- probe for allocate UNTIL (c ← DequeuePup[@inputQueue]) = NIL DO ReturnFreePupBuffer[c]; ENDLOOP; WAIT stateChange; ENDLOOP; ENDLOOP; state ← closed; NOTIFY retransmitterReady; UNTIL unackedPups = 0 DO WAIT stateChange; ENDLOOP; -- retransmitter gets them pleaseDie ← TRUE; NOTIFY retransmitterReady; END; END.