DIRECTORY
ConvertUnsafe USING [ToRope],
DriverDefs USING [MaybeGetFreePupBuffer, Glitch],
CommFlags USING [doDebug, doStats],
ProcessorFace USING [GetClockPulses],
PupRouterDefs USING [NextPupConnectionID],
PupPktOps,
PupStream USING [CloseReason, StreamClosing, PupOpenMode],
PupPktDefs USING [PupPktStream],
PupDefs USING [Pair, defaultPupsToAllocate, DequeuePup, GetLocalPupAddress, ReturnFreePupBuffer, PupAddress, PupBuffer, PupSocketID, PupRouterSendThis, SetPupContentsBytes, Tocks, veryLongWait, PupSocketMake],
PupTypes USING [PupType, fillInSocketID],
Rope USING [Fetch, Length, ROPE],
StatsDefs USING [StatIncr];
PupPktCool:
MONITOR
LOCKS him
USING him: PupPktOps.Instance
IMPORTS ConvertUnsafe, DriverDefs, ProcessorFace, PupStream, PupRouterDefs, PupPktOps, PupDefs, Rope, StatsDefs
EXPORTS PupPktOps, PupPktDefs =
BEGIN OPEN StatsDefs, PupPktOps, DriverDefs, PupPktDefs, PupDefs;
-- Concrete representation of the exported opaque stream type: it is the
-- instance record declared in PupPktOps.
PupPktStreamObject: PUBLIC TYPE = PupPktOps.InstanceData;

-- Errors exported by this module.  In the code below both are handed to Glitch.
NoBufferToSend: PUBLIC ERROR = CODE;  -- Send was called with no current buffer
StreamAlreadyOpen: PUBLIC ERROR = CODE;  -- Open was called when state # idle

-- Module-wide settings.
myMaxAllocate: CARDINAL ← defaultPupsToAllocate;  -- read by MySendAck when building an ack
myPathMaxAllocate: CARDINAL ← defaultPupsToAllocate;  -- NOTE(review): not referenced in this module
myMaxBufferSize: CARDINAL ← 0;  -- NOTE(review): not referenced in this module
myPing: BOOLEAN ← TRUE;  -- NOTE(review): not referenced in this module
-- Abort the stream ps, supplying the rope e as the abort text.
-- This is only a public wrapper: all of the work is done by SendAbortWithText.
PupPktStreamAbort: PUBLIC PROCEDURE [ps: PupPktStream, e: Rope.ROPE] =
  BEGIN SendAbortWithText[him: ps, e: e]; END;
-- Return outEnd for this stream, waiting on stateChange until it is nonzero.
-- Raises PupStream.StreamClosing[whyClosed, text] if the stream is found closed
-- while outEnd is still zero.  The error is raised from inside the monitor;
-- the ENABLE UNWIND clause is what lets the entry procedure be unwound cleanly.
GetSenderSizeLimit: PUBLIC ENTRY PROCEDURE [him: Instance] RETURNS [CARDINAL] =
  BEGIN OPEN him;
  ENABLE UNWIND => NULL;
  IF outEnd = 0 THEN
    DO
      -- Test for a dead stream BEFORE the first wait as well as after every
      -- wakeup: previously a caller arriving on an already-closed stream had
      -- to sit through a full stateChange wait before learning of it.
      IF state = closed THEN ERROR PupStream.StreamClosing[whyClosed, text];
      -- On the first pass outEnd is known to be zero, so this test only fires
      -- after a wakeup, i.e. after the closed test, as it always did.
      IF outEnd # 0 THEN EXIT;
      WAIT stateChange;
    ENDLOOP;
  RETURN[outEnd];
  END;
-- Report the local Pup address stored in the instance (the `local` field,
-- which MakeLocal fills in).  Not an entry procedure: the monitor lock is not taken.
GetLocalAddress: PUBLIC PROCEDURE [him: Instance] RETURNS [PupAddress] =
  BEGIN
  RETURN[him.local];
  END;
-- GotOther is the exported name for GotOtherInternal.
GotOther: PUBLIC PROCEDURE [Instance, PupBuffer] = GotOtherInternal;

-- Dispatch a received control packet on b.pupType.
-- The individual handlers receive only the instance and look at him's current
-- buffer c.  NOTE(review): presumably the caller has made c the same buffer as
-- b before calling -- confirm against the caller.
-- Types not listed are dropped, and counted when statistics are enabled.
GotOtherInternal: INTERNAL PROCEDURE [him: Instance, b: PupBuffer] =
  BEGIN OPEN him;
  SELECT b.pupType FROM
    rfc => GotRfc[him];
    end => GotEnd[him];
    endRep => GotEndReply[him];
    abort => GotAbort[him];
    error => GotError[him];
    int => GotInt[him];
    intRep => GotIntReply[him];
    ENDCASE =>
      BEGIN
      IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadType];
      END;
  END;
-- Handle an RFC found in the current buffer c.
-- Unless the stream is idle, the packet's ID must match connectionID;
-- mismatches are dropped (and counted when statistics are enabled).
GotRfc: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  IF state # idle AND c.pupID # connectionID THEN
    BEGIN
    IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
    RETURN;
    END;
  SELECT state FROM
    idle, halfOpen =>
      BEGIN
      -- Both cases open the connection toward the address carried in the RFC.
      -- Only the idle case still has to adopt the sender's ID and answer
      -- with an RFC of its own.
      wasIdle: BOOLEAN = (state = idle);
      state ← open;
      remote ← c.address;
      IF wasIdle THEN
        BEGIN
        OpenInit[him, c.pupID];  -- must read c before SendRfcInternal gives the buffer away
        SendRfcInternal[him];
        END;
      MySendAck[him];
      BROADCAST stateChange;
      END;
    open, talking =>
      BEGIN
      -- Repeat our RFC unless we were the side that sent the original one.
      IF mode # sendRfc THEN SendRfcInternal[him];
      END;
    ENDCASE --end, closed, finishing-- => SendAbortInternal[him];
  END;
-- Handle an End found in the current buffer c.
-- Packets whose ID differs from connectionID are dropped (and counted when
-- statistics are enabled).
GotEnd: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  IF c.pupID # connectionID THEN
    BEGIN
    IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
    RETURN;
    END;
  IF state = talking OR state = finishing THEN
    BEGIN
    -- Only note the remote close here; no EndReply is sent from this branch.
    state ← finishing;
    whyClosed ← remoteClose;
    END
  ELSE
    BEGIN
    -- Every other state: close at once and answer with an EndReply.
    SmashClosedInternal[him, remoteClose];
    SendEndReplyInternal[him];
    END;
  END;
-- Handle an EndReply found in the current buffer c.
-- A reply carrying the wrong ID is dropped (and counted when statistics are
-- enabled) without waking anybody.
GotEndReply: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  IF c.pupID = connectionID THEN
    BEGIN
    SELECT state FROM
      -- An EndReply in any of these states is answered with an abort.
      open, talking, halfOpen, finishing => SendAbortInternal[him];
      ENDCASE --idle, end, closed-- => state ← closed;
    -- In either case wake everybody who might be waiting on the stream.
    BROADCAST stateChange;
    NOTIFY inputReady;
    END
  ELSE
    BEGIN
    IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
    END;
  END;
-- Handle an Error Pup found in the current buffer c.
--   noProcessPupErrorCode: treated like an abort.  Ignored unless it came from
--     the remote socket and the stream is neither idle nor closed; otherwise
--     the stream is smashed closed (remoteReject) and, if no text has been
--     recorded yet, the error text (at most 100 characters) is saved in text.
--   gatewayResourceLimitsPupErrorCode: bumps throttle and, when
--     pathMaxAllocate is already 1, doubles retransmitPulses up to
--     maxRetransmitPulses.
--   Any other code is ignored.
GotError: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  IF CommFlags.doStats THEN StatIncr[statErrorPacketsReceived];
  SELECT c.errorCode FROM
    noProcessPupErrorCode =>
      BEGIN -- like an abort
      IF c.source.socket # remote.socket THEN RETURN;
      SELECT state FROM
        idle, closed => RETURN;
        ENDCASE => SmashClosedInternal[him, remoteReject];
      IF text = NIL THEN
        BEGIN
        -- Bytes that precede the error text.  NOTE(review): 2*(10+1+1) is
        -- presumably the copied 10-word header plus the code and argument
        -- words -- confirm against the Pup error format.
        overhead: CARDINAL = bytesPerPupHeader + 2*(10+1+1);
        s: STRING = [100];
        -- The packet comes off the network, so pupLength may be shorter than
        -- the fixed overhead.  The unguarded subtraction would underflow and
        -- the MIN would then pick maxlength, copying 100 bytes of garbage.
        s.length ←
          IF c.pupLength > overhead THEN MIN[c.pupLength - overhead, s.maxlength]
          ELSE 0;
        FOR i: CARDINAL IN [0..s.length) DO s[i] ← c.errorText[i]; ENDLOOP;
        text ← ConvertUnsafe.ToRope[s];
        END;
      END;
    gatewayResourceLimitsPupErrorCode =>
      BEGIN
      throttle ← throttle + 1;
      IF pathMaxAllocate = 1 THEN
        -- Beware: We may have gone unstable
        retransmitPulses ← MIN[2*retransmitPulses, maxRetransmitPulses];
      END;
    ENDCASE;
  END;
-- Handle an Abort found in the current buffer c.
-- Dropped (and counted when statistics are enabled) if the ID differs from
-- connectionID; ignored when the stream is idle or closed.  Otherwise the
-- abort text (at most 100 characters) is saved in text if none has been
-- recorded yet, and the stream is closed: directly when halfOpen, via
-- SendAbortInternal in every other state.
GotAbort: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  IF c.pupID # connectionID THEN
    BEGIN
    IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
    RETURN;
    END;
  IF state = idle OR state = closed THEN RETURN;
  IF text = NIL THEN
    BEGIN
    -- Bytes that precede the abort text: the Pup header plus the one-word
    -- abort code (SendAbortInternal sends exactly these 2 content bytes).
    overhead: CARDINAL = bytesPerPupHeader + 2*(1);
    s: STRING = [100];
    -- The packet comes off the network, so pupLength may be shorter than the
    -- fixed overhead.  The unguarded subtraction would underflow and the MIN
    -- would then pick maxlength, copying 100 bytes of garbage.
    s.length ←
      IF c.pupLength > overhead THEN MIN[c.pupLength - overhead, s.maxlength]
      ELSE 0;
    FOR i: CARDINAL IN [0..s.length) DO s[i] ← c.abortText[i]; ENDLOOP;
    text ← ConvertUnsafe.ToRope[s];
    END;
  SELECT state FROM
    halfOpen => BEGIN SmashClosedInternal[him, remoteReject]; END;
    ENDCASE => SendAbortInternal[him];
  END;
-- Handle an Interrupt found in the current buffer c.
-- The packet ID, converted by Flip, is compared with inIntSeq:
--   one behind  => a retransmission; just repeat the reply,
--   equal       => a new interrupt; advance inIntSeq, wake a waiter, reply,
--   otherwise   => a very old duplicate; ignore it.
GotInt: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  seq: LONG INTEGER = Flip[c.pupID];
  IF seq = inIntSeq - 1 THEN
    SendIntReply[him]
  ELSE IF seq = inIntSeq THEN
    BEGIN
    inIntSeq ← inIntSeq + 1;
    NOTIFY waitingForInterrupt;
    SendIntReply[him];  -- replies with the sequence number just consumed
    END;
  END;
-- Handle an InterruptReply found in the current buffer c.
-- A reply matching outIntSeq clears the pending flag and advances the
-- sequence; stateChange is broadcast whether or not the reply matched.
GotIntReply: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  acked: LONG INTEGER = Flip[c.pupID];
  IF acked = outIntSeq THEN
    BEGIN
    outIntSeq ← outIntSeq + 1;
    outIntPending ← FALSE;
    END;
  BROADCAST stateChange;
  END;
-- SendEnd is the exported name for SendEndInternal.
SendEnd: PUBLIC PROCEDURE [him: Instance] = SendEndInternal;

-- Send an End packet and count the attempt.
-- The timer is restamped first.  If there is no current buffer and none can be
-- obtained, nothing is sent and the attempt is NOT counted.  After more than
-- 15 counted attempts the stream is smashed closed with transmissionTimeout.
SendEndInternal: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  timer ← ProcessorFace.GetClockPulses[];
  IF c = NIL AND (c ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN;
  Send[him, end, connectionID, 0];
  -- 15*2 is 30 sec
  IF (probeCounter ← probeCounter + 1) > 15 THEN
    -- Call the internal name, as the other INTERNAL procedures here do;
    -- SmashClosed is merely the exported alias of the same procedure.
    SmashClosedInternal[him, transmissionTimeout];
  END;
-- Send an EndReply carrying connectionID and no contents.
-- Uses the current buffer c via Send; this procedure does not fetch one itself.
SendEndReplyInternal: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  Send[him: him, thisType: endRep, thisID: connectionID, thisLen: 0];
  END;
-- SendAbort is the exported name for SendAbortInternal.
SendAbort: PUBLIC PROCEDURE [him: Instance] = SendAbortInternal;

-- Close the stream (remoteReject) and then try to send an Abort with no text.
-- The stream is closed even when no buffer is available for the packet.
SendAbortInternal: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  SmashClosedInternal[him, remoteReject];
  -- Reuse the current buffer if there is one, otherwise try for a free one.
  IF c = NIL THEN c ← MaybeGetFreePupBuffer[];
  IF c = NIL THEN RETURN;
  c.abortCode ← 1;
  Send[him, abort, connectionID, 2];  -- 2 content bytes: just the abort code
  END;
-- Close the stream (localAbort) and then try to send an Abort whose text is
-- the rope e, truncated to what fits in one Pup after the abort header.
-- The stream is closed even when no buffer is available for the packet.
SendAbortWithText: ENTRY PROCEDURE [him: Instance, e: Rope.ROPE] =
  BEGIN OPEN him;
  headerBytes: CARDINAL = 2;
  textBytes: CARDINAL = MIN[e.Length[], dataBytesPerPup - headerBytes];
  SmashClosedInternal[him, localAbort];
  -- Reuse the current buffer if there is one, otherwise try for a free one.
  IF c = NIL THEN c ← MaybeGetFreePupBuffer[];
  IF c = NIL THEN RETURN;
  c.abortCode ← 1;
  FOR k: CARDINAL IN [0..textBytes) DO
    c.abortText[k] ← e.Fetch[k];
    ENDLOOP;
  Send[him, abort, connectionID, headerBytes + textBytes];
  END;
-- SendInt is the exported name for SendIntInternal.
SendInt: PUBLIC PROCEDURE [him: Instance] = SendIntInternal;

-- Send an Interrupt carrying the current outgoing interrupt sequence number.
-- outIntTime is restamped even when no buffer can be obtained and nothing is sent.
SendIntInternal: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  outIntTime ← ProcessorFace.GetClockPulses[];
  -- Reuse the current buffer if there is one, otherwise try for a free one.
  IF c = NIL THEN c ← MaybeGetFreePupBuffer[];
  IF c = NIL THEN RETURN;
  Send[him, int, Flop[outIntSeq], 0];
  END;
-- Send an InterruptReply for the interrupt one behind inIntSeq.
-- Uses the current buffer c via Send; this procedure does not fetch one itself.
SendIntReply: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  Send[him: him, thisType: intRep, thisID: Flop[inIntSeq - 1], thisLen: 0];
  END;
-- SendRfc is the exported name for SendRfcInternal.
SendRfc: PUBLIC PROCEDURE [him: Instance] = SendRfcInternal;

-- Send an RFC carrying our local address and connectionID.
-- If there is no current buffer and none can be obtained, nothing is sent and
-- the timer is left untouched.
SendRfcInternal: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  IF c = NIL AND (c ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN;
  timer ← ProcessorFace.GetClockPulses[];
  c.address ← local;
  Send[him, rfc, connectionID, dataBytesPerRFC];
  END;
-- Send an RFC to the remote end whose source socket is `listener` rather than
-- this stream's own socket.  Always uses a freshly obtained buffer (never the
-- current buffer c); if none is available, nothing is sent.
AnswerRfc: INTERNAL PROCEDURE [him: Instance, listener: PupSocketID] =
  BEGIN OPEN him;
  reply: PupBuffer = MaybeGetFreePupBuffer[];
  IF reply = NIL THEN RETURN;
  reply.address ← local;
  reply.pupType ← rfc;
  reply.pupID ← connectionID;
  SetPupContentsBytes[reply, dataBytesPerRFC];
  -- Source is our address with the socket replaced by the listener's.
  reply.source ← local;
  reply.source.socket ← listener;
  reply.dest ← remote;
  PupRouterSendThis[reply];
  END;
-- Send a control message using the current buffer.
-- Fill in the current buffer c as a control Pup of the given type, ID and
-- content length, addressed from local to remote, and hand it to the router.
-- The instance gives up the buffer: c is NIL on return.
-- With CommFlags.doDebug set, a missing buffer is reported through Glitch.
Send: INTERNAL PROCEDURE [
  him: Instance, thisType: PupTypes.PupType, thisID: Pair, thisLen: CARDINAL] =
  BEGIN OPEN him;
  pkt: PupBuffer = c;
  IF CommFlags.doDebug AND pkt = NIL THEN Glitch[NoBufferToSend];
  c ← NIL;  -- the buffer now belongs to whoever sends it
  pkt.pupType ← thisType;
  pkt.pupID ← thisID;
  SetPupContentsBytes[pkt, thisLen];
  pkt.source ← local;
  pkt.dest ← remote;
  PupRouterSendThis[pkt];
  END;
-- Local shorthand for the stream close reason type.
CloseReason: TYPE = PupStream.CloseReason;

-- SmashClosed is the exported name for SmashClosedInternal.
SmashClosed: PUBLIC PROCEDURE [Instance, CloseReason] = SmashClosedInternal;

-- Force the stream into the closed state and wake everybody waiting on it.
-- `why` is recorded only while whyClosed still holds localClose.
SmashClosedInternal: INTERNAL PROCEDURE [him: Instance, why: CloseReason] =
  BEGIN OPEN him;
  state ← closed;
  SELECT whyClosed FROM
    localClose => whyClosed ← why;
    ENDCASE;  -- don't clobber first reason
  BROADCAST stateChange;
  NOTIFY inputReady;
  END;
-- Report a dead stream to the caller: give back the buffer b, if any, and
-- raise PupStream.StreamClosing with the recorded reason and text.
-- This procedure never returns normally.
StreamDied: PUBLIC PROCEDURE [him: Instance, b: PupBuffer] =
  BEGIN OPEN him;
  IF b # NIL THEN
    ReturnFreePupBuffer[b];
  ERROR PupStream.StreamClosing[whyClosed, text];
  END;
-- Actively open an idle stream: pick a fresh connection ID, send an RFC and
-- wait for the state to become open.
-- Raises PupStream.StreamClosing if the stream gets closed while waiting, or
-- (with whyClosed set to transmissionTimeout) if it is still not open after
-- 60 waits on stateChange.  Calling Open in any state other than idle is
-- reported through Glitch[StreamAlreadyOpen].
Open: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  SELECT state FROM
    idle =>
      BEGIN
      state ← halfOpen;
      OpenInit[him, PupRouterDefs.NextPupConnectionID[]];
      SendRfcInternal[him];
      THROUGH [0..60) WHILE state # open DO
        -- 1 min total
        SELECT state FROM
          open => RETURN;
          closed => ERROR PupStream.StreamClosing[whyClosed, text];
          ENDCASE => WAIT stateChange; -- 1 sec
        ENDLOOP;
      IF state = open THEN RETURN; -- it gets opened fast if local
      -- Still not open after all the waits: give up.
      state ← closed;
      whyClosed ← transmissionTimeout;
      ERROR PupStream.StreamClosing[whyClosed, text];
      END;
    ENDCASE => Glitch[StreamAlreadyOpen];
  END;
-- Reset the per-connection bookkeeping for a connection identified by newID.
-- Every sequence/ID counter in the instance starts from Flip[newID].
-- Reads remote and local, so both must already be set by the caller.
OpenInit: INTERNAL PROCEDURE [him: Instance, newID: Pair] =
  BEGIN OPEN him;
  connectionID ← newID;
  sameNet ← remote.net = local.net;
  probeCounter ← initialRetransmissions; -- also allows first (gratuitous) ack
  nextInputID ← nextOutputID ← ackedID ← allocatedID ← allocationID ←
    maxOutputID ← outIntSeq ← inIntSeq ← seenIntSeq ← Flip[newID];
  END;
-- Set up the local end of a stream and start it according to mode m.
--   l:  local socket to use (in alreadyOpened mode it is instead the
--       listener socket handed to AnswerRfc)
--   r:  remote address
--   m:  sendRfc => actively open (may raise PupStream.StreamClosing via Open)
--       alreadyOpened => mark open and answer the RFC on behalf of l
--       wait => do nothing further here
--   id: connection ID
-- Forks the Retransmitter and Slurp processes before acting on the mode.
MakeLocal: PUBLIC ENTRY PROCEDURE [
  him: Instance, l: PupSocketID, r: PupAddress, m: PupStream.PupOpenMode,
  id: Pair] =
  BEGIN OPEN him;
  ENABLE UNWIND => NULL;
  -- In alreadyOpened mode the local socket number is left to be filled in.
  wanted: PupSocketID =
    IF m = alreadyOpened THEN PupTypes.fillInSocketID ELSE l;
  mode ← m;
  connectionID ← id;
  remote ← r;
  local ← GetLocalPupAddress[wanted, r];
  socket ← PupSocketMake[local.socket, remote, veryLongWait, id];
  retransmitterFork ← FORK Retransmitter[him];
  slurpFork ← FORK Slurp[him];
  SELECT mode FROM
    sendRfc => Open[him];
    alreadyOpened =>
      BEGIN
      state ← open;
      OpenInit[him, id];
      AnswerRfc[him, l];
      MySendAck[him];
      END;
    wait => NULL;
    ENDCASE => ERROR;
  END;
-- Copied (with only slight edits) from PupPktHot.
-- Send an Ack advertising our current receive allocation.
-- Uses the current buffer c if there is one (the instance gives it up),
-- otherwise a freshly obtained one; if none is available nothing is sent and
-- no state is changed.  Also advances allocatedID and adjusts timer.
MySendAck: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  b: PupBuffer;
  IF c # NIL THEN BEGIN b ← c; c ← NIL; END
  ELSE IF (b ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN;
  -- Ack body: bytes per Pup, packets we can still accept (never negative),
  -- and the byte allocation.
  b.pupBody ← ack[
    dataBytesPerPup, MAX[0, INTEGER[myMaxAllocate - inputQueue.length]],
    byteAllocate];
  allocatedID ← nextInputID + byteAllocate;
  b.pupType ← ack;
  b.pupID ← Flop[nextInputID];
  b.pupLength ← bytesPerAck;
  b.source ← local;
  b.dest ← remote;
  PupRouterSendThis[b];
  IF CommFlags.doStats THEN StatIncr[statAcksSent];
  -- MAXC doesn't send free ACK, so set clock ahead
  timer ←
    ProcessorFace.GetClockPulses[] + ctlRetransmitPulses/16 - ctlRetransmitPulses;
  END;
-- Shut the stream down in an orderly way, under the monitor lock.
--   1. Give a pending outgoing interrupt up to 100 waits to be acknowledged.
--   2. Start the close appropriate to the current state (End if open, Abort
--      if halfOpen; talking/finishing wait for the state to change first).
--   3. Wait for the state to reach idle or closed, discarding the current
--      buffer and everything on the input queue on each pass.
--   4. Force state to closed, wait for unackedPups to drain, then set
--      pleaseDie and wake the retransmitter.
DestroyLocalLocked: PUBLIC ENTRY PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  THROUGH [0..100) WHILE outIntPending DO
    IF state = closed THEN EXIT; -- probably smashed closed, don't hang
    WAIT stateChange;
    ENDLOOP;
  DO
    SELECT state FROM
      open =>
        BEGIN state ← end; SendEndInternal[him]; probeCounter ← 0; EXIT; END;
      talking, finishing => WAIT stateChange;
      halfOpen => BEGIN SendAbortInternal[him]; EXIT; END;
      ENDCASE --closed, end, idle-- => EXIT;
    ENDLOOP;
  THROUGH [0..10) UNTIL state = idle OR state = closed DO
    -- extra layer for debugging
    UNTIL state = idle OR state = closed DO
      IF c # NIL THEN ReturnFreePupBuffer[c]; -- probe for allocate
      -- Drain the input queue; the loop leaves c = NIL.
      UNTIL (c ← DequeuePup[inputQueue]) = NIL DO
        ReturnFreePupBuffer[c]; ENDLOOP;
      WAIT stateChange;
      ENDLOOP;
    ENDLOOP;
  state ← closed;
  NOTIFY retransmitterReady;
  UNTIL unackedPups = 0 DO WAIT stateChange; ENDLOOP; -- retransmitter gets them
  pleaseDie ← TRUE;
  NOTIFY retransmitterReady;
  END;
END.