PupPktCool.mesa
Copyright © 1985 by Xerox Corporation. All rights reserved.
Birrell June 22, 1983 5:12 pm
Levin, September 22, 1983 10:33 am
HGM, February 20, 1984 10:31:23 pm PST
DIRECTORY
ConvertUnsafe USING [ToRope],
DriverDefs USING [MaybeGetFreePupBuffer, Glitch],
CommFlags USING [doDebug, doStats],
ProcessorFace USING [GetClockPulses],
PupRouterDefs USING [NextPupConnectionID],
PupPktOps,
PupStream USING [CloseReason, StreamClosing, PupOpenMode],
PupPktDefs USING [PupPktStream],
PupDefs USING [Pair, defaultPupsToAllocate, DequeuePup, GetLocalPupAddress, ReturnFreePupBuffer, PupAddress, PupBuffer, PupSocketID, PupRouterSendThis, SetPupContentsBytes, Tocks, veryLongWait, PupSocketMake],
PupTypes USING [PupType, fillInSocketID],
Rope USING [Fetch, Length, ROPE],
StatsDefs USING [StatIncr];
PupPktCool: MONITOR LOCKS him USING him: PupPktOps.Instance
IMPORTS ConvertUnsafe, DriverDefs, ProcessorFace, PupStream, PupRouterDefs, PupPktOps, PupDefs, Rope, StatsDefs
EXPORTS PupPktOps, PupPktDefs =
BEGIN OPEN StatsDefs, PupPktOps, DriverDefs, PupPktDefs, PupDefs;
PupPktStreamObject: PUBLIC TYPE = PupPktOps.InstanceData;
myPing: BOOLEANTRUE;
myMaxAllocate, myPathMaxAllocate: CARDINAL ← defaultPupsToAllocate;
myMaxBufferSize: CARDINAL ← 0;
NoBufferToSend: PUBLIC ERROR = CODE;
StreamAlreadyOpen: PUBLIC ERROR = CODE;
PupPktStreamAbort: PUBLIC PROCEDURE [ps: PupPktStream, e: Rope.ROPE] =
BEGIN
SendAbortWithText[ps, e];
END;
GetSenderSizeLimit: PUBLIC ENTRY PROCEDURE [him: Instance] RETURNS [CARDINAL] =
BEGIN OPEN him;
ENABLE UNWIND => NULL;
UNTIL outEnd # 0 DO
WAIT stateChange;
IF state = closed THEN ERROR PupStream.StreamClosing[whyClosed, text];
ENDLOOP;
RETURN[outEnd];
END;
GetLocalAddress: PUBLIC PROCEDURE [him: Instance] RETURNS [PupAddress] =
BEGIN OPEN him; RETURN[local]; END;
GotOther: PUBLIC PROCEDURE [Instance, PupBuffer] = GotOtherInternal;
GotOtherInternal: INTERNAL PROCEDURE [him: Instance, b: PupBuffer] =
BEGIN OPEN him;
SELECT b.pupType FROM
rfc => GotRfc[him];
end => GotEnd[him];
endRep => GotEndReply[him];
abort => GotAbort[him];
error => GotError[him];
int => GotInt[him];
intRep => GotIntReply[him];
ENDCASE => IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadType];
END;
GotRfc: INTERNAL PROCEDURE [him: Instance] =
BEGIN OPEN him;
IF state # idle AND c.pupID # connectionID THEN
BEGIN
IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
RETURN;
END;
SELECT state FROM
idle =>
BEGIN
state ← open;
remote ← c.address;
OpenInit[him, c.pupID];
SendRfcInternal[him];
MySendAck[him];
BROADCAST stateChange;
END;
halfOpen =>
BEGIN
state ← open;
remote ← c.address;
MySendAck[him];
BROADCAST stateChange;
END;
open, talking => IF mode # sendRfc THEN SendRfcInternal[him];
ENDCASE --end, closed, finishing-- => SendAbortInternal[him];
END;
GotEnd: INTERNAL PROCEDURE [him: Instance] =
BEGIN OPEN him;
IF c.pupID # connectionID THEN
BEGIN
IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
RETURN;
END;
SELECT state FROM
talking, finishing => BEGIN state ← finishing; whyClosed ← remoteClose; END;
ENDCASE =>
BEGIN
SmashClosedInternal[him, remoteClose];
SendEndReplyInternal[him];
END;
END;
GotEndReply: INTERNAL PROCEDURE [him: Instance] =
BEGIN OPEN him;
IF c.pupID # connectionID THEN
BEGIN
IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
RETURN;
END;
SELECT state FROM
open, talking, halfOpen, finishing => SendAbortInternal[him];
ENDCASE --idle, end, closed-- => state ← closed;
BROADCAST stateChange;
NOTIFY inputReady;
END;
GotError: INTERNAL PROCEDURE [him: Instance] =
BEGIN OPEN him;
IF CommFlags.doStats THEN StatIncr[statErrorPacketsReceived];
SELECT c.errorCode FROM
noProcessPupErrorCode =>
BEGIN -- like an abort
IF c.source.socket # remote.socket THEN RETURN;
SELECT state FROM
idle, closed => RETURN;
ENDCASE => SmashClosedInternal[him, remoteReject];
IF text = NIL THEN
BEGIN
s: STRING = [100];
s.length ← MIN[c.pupLength - bytesPerPupHeader - 2*(10+1+1), s.maxlength];
FOR i: CARDINAL IN [0..s.length) DO
s[i] ← c.errorText[i];
ENDLOOP;
text ← ConvertUnsafe.ToRope[s];
END;
END;
gatewayResourceLimitsPupErrorCode =>
BEGIN
throttle ← throttle + 1;
IF pathMaxAllocate = 1 THEN
Beware: We may have gone unstable
retransmitPulses ←
MIN[2*retransmitPulses, maxRetransmitPulses];
END;
ENDCASE;
END;
GotAbort: INTERNAL PROCEDURE [him: Instance] =
BEGIN OPEN him;
IF c.pupID # connectionID THEN
BEGIN
IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
RETURN;
END;
IF state = idle OR state = closed THEN RETURN;
IF text = NIL THEN
BEGIN
s: STRING = [100];
s.length ← MIN[c.pupLength - bytesPerPupHeader - 2*(1), s.maxlength];
FOR i: CARDINAL IN [0..s.length) DO
s[i] ← c.abortText[i];
ENDLOOP;
text ← ConvertUnsafe.ToRope[s];
END;
SELECT state FROM
halfOpen => BEGIN SmashClosedInternal[him, remoteReject]; END;
ENDCASE => SendAbortInternal[him];
END;
GotInt: INTERNAL PROCEDURE [him: Instance] =
BEGIN OPEN him;
int: LONG INTEGER ← Flip[c.pupID];
SELECT TRUE FROM
(int = inIntSeq - 1) => SendIntReply[him]; -- retransmission
(int = inIntSeq) =>
BEGIN
inIntSeq ← inIntSeq + 1;
NOTIFY waitingForInterrupt;
SendIntReply[him];
END;
ENDCASE => RETURN; -- very old duplicate
END;
GotIntReply: INTERNAL PROCEDURE [him: Instance] =
BEGIN OPEN him;
int: LONG INTEGER ← Flip[c.pupID];
IF int = outIntSeq THEN
BEGIN outIntPending ← FALSE; outIntSeq ← outIntSeq + 1; END;
BROADCAST stateChange;
END;
SendEnd: PUBLIC PROCEDURE [him: Instance] = SendEndInternal;
SendEndInternal: INTERNAL PROCEDURE [him: Instance] =
BEGIN OPEN him;
timer ← ProcessorFace.GetClockPulses[];
IF c = NIL AND (c ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN;
Send[him, end, connectionID, 0];
15*2 is 30 sec
IF (probeCounter ← probeCounter + 1) > 15 THEN
SmashClosed[him, transmissionTimeout];
END;
SendEndReplyInternal: INTERNAL PROCEDURE [him: Instance] =
BEGIN OPEN him; Send[him, endRep, connectionID, 0]; END;
SendAbort: PUBLIC PROCEDURE [him: Instance] = SendAbortInternal;
SendAbortInternal: INTERNAL PROCEDURE [him: Instance] =
BEGIN OPEN him;
SmashClosedInternal[him, remoteReject];
IF c = NIL AND (c ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN;
c.abortCode ← 1;
Send[him, abort, connectionID, 2];
END;
SendAbortWithText: ENTRY PROCEDURE [him: Instance, e: Rope.ROPE] =
BEGIN OPEN him;
charsPerAbortHeader: CARDINAL = 2;
chars: CARDINAL = MIN[e.Length[], dataBytesPerPup - charsPerAbortHeader];
SmashClosedInternal[him, localAbort];
IF c = NIL AND (c ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN;
c.abortCode ← 1;
FOR i: CARDINAL IN [0..chars) DO c.abortText[i] ← e.Fetch[i]; ENDLOOP;
Send[him, abort, connectionID, charsPerAbortHeader + chars];
END;
SendInt: PUBLIC PROCEDURE [him: Instance] = SendIntInternal;
SendIntInternal: INTERNAL PROCEDURE [him: Instance] =
BEGIN OPEN him;
outIntTime ← ProcessorFace.GetClockPulses[];
IF c = NIL AND (c ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN;
Send[him, int, Flop[outIntSeq], 0];
END;
SendIntReply: INTERNAL PROCEDURE [him: Instance] =
BEGIN OPEN him; Send[him, intRep, Flop[inIntSeq - 1], 0]; END;
SendRfc: PUBLIC PROCEDURE [him: Instance] = SendRfcInternal;
SendRfcInternal: INTERNAL PROCEDURE [him: Instance] =
BEGIN OPEN him;
IF c = NIL THEN c ← MaybeGetFreePupBuffer[];
IF c = NIL THEN RETURN;
timer ← ProcessorFace.GetClockPulses[];
c.address ← local;
Send[him, rfc, connectionID, dataBytesPerRFC];
END;
AnswerRfc: INTERNAL PROCEDURE [him: Instance, listener: PupSocketID] =
BEGIN OPEN him;
b: PupBuffer ← MaybeGetFreePupBuffer[];
IF b = NIL THEN RETURN;
b.address ← local;
b.pupType ← rfc;
b.pupID ← connectionID;
SetPupContentsBytes[b, dataBytesPerRFC];
b.source ← local;
b.source.socket ← listener;
b.dest ← remote;
PupRouterSendThis[b];
END;
send a control message using the current buffer
Send: INTERNAL PROCEDURE [
him: Instance, thisType: PupTypes.PupType, thisID: Pair, thisLen: CARDINAL] =
BEGIN OPEN him;
b: PupBuffer ← c;
IF CommFlags.doDebug AND b = NIL THEN Glitch[NoBufferToSend];
c ← NIL;
b.pupType ← thisType;
b.pupID ← thisID;
SetPupContentsBytes[b, thisLen];
b.source ← local;
b.dest ← remote;
PupRouterSendThis[b];
END;
CloseReason: TYPE = PupStream.CloseReason;
SmashClosed: PUBLIC PROCEDURE [Instance, CloseReason] = SmashClosedInternal;
SmashClosedInternal: INTERNAL PROCEDURE [him: Instance, why: CloseReason] =
BEGIN OPEN him;
state ← closed;
IF whyClosed = localClose THEN whyClosed ← why; -- don't clobber first reason
BROADCAST stateChange;
NOTIFY inputReady;
END;
StreamDied: PUBLIC PROCEDURE [him: Instance, b: PupBuffer] =
BEGIN OPEN him;
IF b # NIL THEN ReturnFreePupBuffer[b];
ERROR PupStream.StreamClosing[whyClosed, text];
END;
Open: INTERNAL PROCEDURE [him: Instance] =
BEGIN OPEN him;
SELECT state FROM
idle =>
BEGIN
state ← halfOpen;
OpenInit[him, PupRouterDefs.NextPupConnectionID[]];
SendRfcInternal[him];
THROUGH [0..60) WHILE state # open DO
1 min total
SELECT state FROM
open => RETURN;
closed => ERROR PupStream.StreamClosing[whyClosed, text];
ENDCASE => WAIT stateChange; -- 1 sec
ENDLOOP;
IF state = open THEN RETURN; -- it gets opened fast if local
state ← closed;
whyClosed ← transmissionTimeout;
ERROR PupStream.StreamClosing[whyClosed, text];
END;
ENDCASE => Glitch[StreamAlreadyOpen];
END;
OpenInit: INTERNAL PROCEDURE [him: Instance, newID: Pair] =
BEGIN OPEN him;
sameNet ← remote.net = local.net;
probeCounter ← initialRetransmissions; -- also allows first (gratuitous) ack
connectionID ← newID;
nextInputID ← nextOutputID ← ackedID ← allocatedID ← allocationID ←
maxOutputID ← outIntSeq ← inIntSeq ← seenIntSeq ← Flip[newID];
END;
MakeLocal: PUBLIC ENTRY PROCEDURE [
him: Instance, l: PupSocketID, r: PupAddress, m: PupStream.PupOpenMode,
id: Pair] =
BEGIN OPEN him;
ENABLE UNWIND => NULL;
kludge: PupSocketID ←
IF m # alreadyOpened THEN l ELSE PupTypes.fillInSocketID;
local ← GetLocalPupAddress[kludge, r];
remote ← r;
mode ← m;
connectionID ← id;
socket ← PupSocketMake[local.socket, remote, veryLongWait, id];
retransmitterFork ← FORK Retransmitter[him];
slurpFork ← FORK Slurp[him];
SELECT mode FROM
sendRfc => Open[him];
alreadyOpened =>
BEGIN
state ← open;
OpenInit[him, id];
AnswerRfc[him, l];
MySendAck[him];
END;
wait => NULL;
ENDCASE => ERROR;
END;
Copied (with only slight edits) from PupPktHot
MySendAck: INTERNAL PROCEDURE [him: Instance] =
BEGIN OPEN him;
b: PupBuffer;
IF c # NIL THEN BEGIN b ← c; c ← NIL; END
ELSE IF (b ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN;
b.pupBody ← ack[
dataBytesPerPup, MAX[0, INTEGER[myMaxAllocate - inputQueue.length]],
byteAllocate];
allocatedID ← nextInputID + byteAllocate;
b.pupType ← ack;
b.pupID ← Flop[nextInputID];
b.pupLength ← bytesPerAck;
b.source ← local;
b.dest ← remote;
PupRouterSendThis[b];
IF CommFlags.doStats THEN StatIncr[statAcksSent];
MAXC doesn't send free ACK, so set clock ahead
timer ←
ProcessorFace.GetClockPulses[] + ctlRetransmitPulses/16 - ctlRetransmitPulses;
END;
DestroyLocalLocked: PUBLIC ENTRY PROCEDURE [him: Instance] =
BEGIN OPEN him;
THROUGH [0..100) WHILE outIntPending DO
IF state = closed THEN EXIT; -- probably smashed closed, don't hang
WAIT stateChange;
ENDLOOP;
DO
SELECT state FROM
open =>
BEGIN state ← end; SendEndInternal[him]; probeCounter ← 0; EXIT; END;
talking, finishing => WAIT stateChange;
halfOpen => BEGIN SendAbortInternal[him]; EXIT; END;
ENDCASE --closed, end, idle-- => EXIT;
ENDLOOP;
THROUGH [0..10) UNTIL state = idle OR state = closed DO
extra layer for debugging
UNTIL state = idle OR state = closed DO
IF c # NIL THEN ReturnFreePupBuffer[c]; -- probe for allocate
UNTIL (c ← DequeuePup[inputQueue]) = NIL DO
ReturnFreePupBuffer[c]; ENDLOOP;
WAIT stateChange;
ENDLOOP;
ENDLOOP;
state ← closed;
NOTIFY retransmitterReady;
UNTIL unackedPups = 0 DO WAIT stateChange; ENDLOOP; -- retransmitter gets them
pleaseDie ← TRUE;
NOTIFY retransmitterReady;
END;
END.