-- File: PupPktCool.mesa - last edit:
-- MAS October 13, 1980 5:03 PM
-- HGM October 27, 1980 10:09 AM

-- Copyright Xerox Corporation 1979, 1980

DIRECTORY
StatsDefs USING [StatIncr],
CommUtilDefs USING [
AppendChar, AllocateHeapString, DisableTimeout, FreeHeapString, GetTicks,
GlobalFrame, InitializeCondition, InitializeMonitor, MsecToTicks,
SetTimeout, Ticks, UnNew],
PupRouterDefs USING [NextPupConnectionID],
PupPktPrivateDefs,
DriverDefs USING [MaybeGetFreePupBuffer, doCheck, doStats, Glitch],
PupStream USING [CloseReason, StreamClosing, PupOpenMode],
PupPktDefs USING [PupPktStream],
PupDefs USING [
Pair, defaultPupsToAllocate,
DequeuePup, GetLocalPupAddress,
DataWordsPerPupBuffer, ReturnFreePupBuffer,
PupAddress, PupBuffer, PupSocketID,
PupRouterSendThis, SetPupContentsBytes,
Tocks, veryLongWait, veryShortWait,
PupSocketMake, PupSocketKick, PupSocketDestroy],
BufferDefs USING [QueueCleanup, QueueInitialize],
PupTypes USING [
PupType, fillInSocketID, noProcessPupErrorCode,
gatewayResourceLimitsPupErrorCode];

PupPktCool: MONITOR LOCKS him.lock USING him: PupPktPrivateDefs.Instance
IMPORTS StatsDefs, CommUtilDefs, PupStream, PupRouterDefs, DriverDefs,
PupPktPrivateDefs, PupDefs, BufferDefs
EXPORTS PupPktPrivateDefs, PupStream, PupPktDefs =
BEGIN OPEN StatsDefs, PupPktPrivateDefs, DriverDefs, PupPktDefs, PupDefs;


-- Module-wide defaults. InitializeEverything copies them into each new stream instance.
myPing: BOOLEAN ← TRUE; -- copied into him.ping; changed by SetPinging
myMaxAllocate, myPathMaxAllocate: CARDINAL ← defaultPupsToAllocate; -- changed by SetMaxAllocation
myMaxBufferSize: CARDINAL ← 0; -- in bytes; 0 leaves dataBytesPerPup at its default; changed by SetMaxBufferSize

-- Passed to Glitch by Send when checking is on and there is no buffer to send.
NoBufferToSend: PUBLIC ERROR = CODE;
-- Passed to Glitch by Open when the stream is not idle.
StreamAlreadyOpen: PUBLIC ERROR = CODE;

-- 2000 msec expressed in ticks; MySendAck uses it when it sets the instance timer.
ctlRetransmitPulses: CommUtilDefs.Ticks ← CommUtilDefs.MsecToTicks[2000];

PupPktStreamCreate: PUBLIC PROCEDURE [remote: PupAddress, ticks: Tocks]
  RETURNS [PupPktStream] =
  BEGIN
  -- Short form of PupPktStreamMake: asks for the local socket ID to be filled in,
  -- uses sendRfc mode, and passes a connection ID of [0,0].
  zeroID: Pair = [0,0];
  stream: PupPktStream = PupPktStreamMake[
    PupTypes.fillInSocketID, remote, ticks, sendRfc, zeroID];
  RETURN[stream];
  END;

PupPktStreamMake: PUBLIC PROCEDURE [
  local: PupSocketID, remote: PupAddress, ticks: Tocks,
  mode: PupStream.PupOpenMode, id: Pair]
  RETURNS [PupPktStream] =
  BEGIN
  -- Creates a new PupPktHot instance, starts it, initializes its state, applies the
  -- caller's input timeout, and then runs MakeLocal to set up the connection.
  -- If MakeLocal unwinds, the half built stream is destroyed before the signal propagates.
  hot: PROGRAM RETURNS [Instance] ← NEW PupPktPrivateDefs.PupPktHot;
  him: Instance ← START hot; -- runs the initialization code of the new instance
  InitializeEverything[him];
  -- veryShortWait sets dontWait, veryLongWait disables the inputReady timeout,
  -- and any other value becomes the inputReady timeout.
  IF ticks=veryShortWait THEN him.dontWait ← TRUE
  ELSE IF ticks=veryLongWait THEN CommUtilDefs.DisableTimeout[@him.inputReady]
  ELSE CommUtilDefs.SetTimeout[@him.inputReady,ticks];
  MakeLocal[him,local,remote,mode,id ! UNWIND => PupPktStreamDestroy[@him.me] ];
  RETURN[@him.me];
  END;

PupPktStreamDestroy: PUBLIC PROCEDURE [ps: PupPktStream] =
  BEGIN
  -- Tears down the stream whose interface record is ps and then releases the
  -- frame returned by GlobalFrame for ps.put.
  -- The Instance is recovered from ps by subtracting the offset of the me field;
  -- that offset is measured on a dummy Instance value (1234) which is never dereferenced.
  frame: POINTER ← CommUtilDefs.GlobalFrame[ps.put];
  dummy: Instance = LOOPHOLE[1234];
  meOffset: INTEGER = @dummy.me-LOOPHOLE[dummy,POINTER];
  him: Instance ← LOOPHOLE[ps-meOffset];
  DestroyLocal[him];
  CommUtilDefs.UnNew[frame];
  END;

SetMaxAllocation: PUBLIC PROCEDURE [n: CARDINAL] =
  BEGIN
  -- Records n as the default allocation for streams made afterwards.
  -- The path allocation default is n capped at defaultPupsToAllocate.
  myPathMaxAllocate ← IF n<defaultPupsToAllocate THEN n ELSE defaultPupsToAllocate;
  myMaxAllocate ← n;
  END;

SetMaxBufferSize: PUBLIC PROCEDURE [n: CARDINAL] =
  BEGIN
  -- n is a size in words, capped at DataWordsPerPupBuffer; the stored default is in bytes.
  -- A result of 0 means InitializeEverything applies no override.
  words: CARDINAL = MIN[n,DataWordsPerPupBuffer[]];
  myMaxBufferSize ← 2*words;
  END;

SetPinging: PUBLIC PROCEDURE [ping: BOOLEAN] =
  -- Sets the default that InitializeEverything copies into him.ping for new streams.
  BEGIN myPing ← ping; END;

GetSenderSizeLimit: PUBLIC ENTRY PROCEDURE [him: Instance] RETURNS [CARDINAL] =
  BEGIN ENABLE UNWIND => NULL;
  -- Waits under the instance lock until outEnd is nonzero, then returns it.
  -- After each wakeup a closed stream raises StreamClosing with the recorded
  -- reason and text; the closed test is not made before the first wait.
  WHILE him.outEnd=0 DO
    WAIT him.stateChange;
    IF him.state=closed THEN
      ERROR PupStream.StreamClosing[him.whyClosed,him.text];
    ENDLOOP;
  RETURN[him.outEnd];
  END;

GetLocalAddress: PUBLIC PROCEDURE [him: Instance] RETURNS [PupAddress] =
  -- Returns the local address recorded in the instance (set by MakeLocal).
  BEGIN RETURN[him.local]; END;

InitializeEverything: PROCEDURE [him: Instance] =
  BEGIN
  -- Puts a newly started instance into its initial idle state: clears flags and
  -- counters, copies the module-wide defaults, and initializes the queues,
  -- condition variables and monitor lock.
  -- Fix: several field references were split across lines (him.sam / eNet broke
  -- the identifier sameNet in two); each one is rejoined onto a single line.
  him.state ← idle;
  him.c ← NIL;
  him.dontWait ← FALSE;
  him.dataBytesPerPup ← 2*DataWordsPerPupBuffer[]; -- default; may be overridden at the end
  him.outIntPending ← FALSE;
  him.outEnd ← 0;
  him.probeCounter ← 0;
  -- Defaults set through SetPinging and SetMaxAllocation.
  him.ping ← myPing;
  him.myMaxAllocate ← myMaxAllocate;
  him.pathMaxAllocate ← myPathMaxAllocate;
  him.hisMaxAllocate ← 0;
  him.throttle ← 0;
  him.unackedPups ← 0;
  him.allocatedPups ← 0;
  him.clumpsSinceBump ← 0;
  him.sentBuffer ← NIL;
  him.pleaseDie ← FALSE;
  him.sameNet ← FALSE;
  him.sendAck ← FALSE;
  him.aDataOut ← FALSE;
  him.retransmitTicks ← initialRetransmitTicks;
  him.whyClosed ← localClose; -- SmashClosedInternal overwrites this value only once
  him.text ← NIL;
  BufferDefs.QueueInitialize[@him.inputQueue];
  BufferDefs.QueueInitialize[@him.sentQueue];
  BEGIN OPEN CommUtilDefs;
    InitializeCondition[@him.stateChange,MsecToTicks[1000]];
    InitializeCondition[@him.retransmitterReady,MsecToTicks[100]];
    InitializeCondition[@him.inputReady,MsecToTicks[5000]];
    InitializeCondition[@him.waitingForInterrupt,MsecToTicks[5000]];
    DisableTimeout[@him.waitingForInterrupt]; -- this condition waits without a timeout
    InitializeMonitor[@him.lock];
  END;
  -- A nonzero size installed by SetMaxBufferSize replaces the full-buffer default.
  IF myMaxBufferSize#0 THEN him.dataBytesPerPup ← myMaxBufferSize;
  END;

GotOther: PUBLIC PROCEDURE [Instance, PupBuffer] = GotOtherInternal;
GotOtherInternal: INTERNAL PROCEDURE [him: Instance, b: PupBuffer] =
  BEGIN
  -- Dispatches a control Pup to its handler by Pup type; any type not listed
  -- here is only counted as rejected (when statistics are enabled).
  -- NOTE(review): the handlers receive only him and read him.c, so presumably
  -- the caller has already made b the current buffer; confirm against PupPktHot.
  SELECT b.pupType FROM
    rfc => GotRfc[him];
    end => GotEnd[him];
    endRep => GotEndReply[him];
    int => GotInt[him];
    intRep => GotIntReply[him];
    abort => GotAbort[him];
    error => GotError[him];
    ENDCASE =>
      BEGIN
      IF doStats THEN StatIncr[statPacketsRejectedBadType];
      END;
  END;

GotRfc: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  -- Handles an RFC held in the current buffer c.
  -- An idle stream accepts any RFC; every other state requires a matching connection ID.
  IF state=idle THEN
    BEGIN
    -- Adopt the sender's address and ID, answer with our own RFC, then ack.
    state ← open;
    remote ← c.address;
    OpenInit[him,c.pupID];
    SendRfcInternal[him];
    MySendAck[him];
    BROADCAST stateChange;
    RETURN;
    END;
  IF c.pupID#connectionID THEN
    BEGIN
    IF doStats THEN StatIncr[statPacketsRejectedBadID];
    RETURN;
    END;
  SELECT state FROM
    halfOpen =>
      BEGIN
      -- Reply to the RFC we sent: the connection is now open.
      state ← open;
      remote ← c.address;
      MySendAck[him];
      BROADCAST stateChange;
      END;
    open, talking =>
      -- Duplicate RFC: send ours again unless this side opened in sendRfc mode.
      IF mode#sendRfc THEN SendRfcInternal[him];
    ENDCASE => SendAbortInternal[him]; -- end, closed, finishing
  END;

GotEnd: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  -- Handles an End held in the current buffer c; a mismatched connection ID is
  -- counted (when statistics are enabled) and ignored.
  IF c.pupID#connectionID THEN
    BEGIN
    IF doStats THEN StatIncr[statPacketsRejectedBadID];
    RETURN;
    END;
  IF state=talking OR state=finishing THEN
    BEGIN
    -- Only note the remote close; no reply is sent from these states.
    state ← finishing;
    whyClosed ← remoteClose;
    END
  ELSE
    BEGIN
    SmashClosedInternal[him,remoteClose];
    SendEndReplyInternal[him];
    END;
  END;

GotEndReply: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  -- Handles an EndReply held in the current buffer c; a mismatched connection ID
  -- is counted (when statistics are enabled) and ignored.
  IF c.pupID#connectionID THEN
    BEGIN
    IF doStats THEN StatIncr[statPacketsRejectedBadID];
    RETURN;
    END;
  -- In open, talking, halfOpen or finishing the connection is aborted;
  -- in every other state the stream is simply marked closed.
  IF state=open OR state=talking OR state=halfOpen OR state=finishing THEN
    SendAbortInternal[him]
  ELSE state ← closed;
  BROADCAST stateChange;
  NOTIFY inputReady;
  END;

GotError: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  -- Handles an Error Pup held in the current buffer c.
  -- noProcess is treated like an abort: the stream is smashed closed and the
  -- error text is saved once, for StreamClosing to report.
  -- gatewayResourceLimits bumps throttle and, when the path allocation is
  -- already 1, doubles retransmitTicks up to maxRetransmitTicks.
  -- Every other error code is only counted.
  i, len: CARDINAL;
  -- Bytes that precede the error text: the Pup header plus 10+1+1 words.
  -- NOTE(review): presumably the copied header, error code and argument words
  -- of the Pup error format; confirm against the Pup specification.
  overhead: CARDINAL = bytesPerPupHeader+2*(10+1+1);
  IF doStats THEN StatIncr[statErrorPacketsReceived];
  SELECT c.errorCode FROM
    PupTypes.noProcessPupErrorCode =>
      BEGIN -- like an abort
      IF c.source.socket#remote.socket THEN RETURN;
      SELECT state FROM
        idle, closed => RETURN;
        ENDCASE => SmashClosedInternal[him,remoteReject];
      IF text=NIL THEN
        BEGIN
        -- Fix: a Pup shorter than the fixed overhead used to underflow the
        -- CARDINAL subtraction, so MIN picked 100 and junk was copied.
        -- Such a Pup now yields an empty string.
        len ← IF c.pupLength>overhead THEN c.pupLength-overhead ELSE 0;
        len ← MIN[len,100];
        text ← CommUtilDefs.AllocateHeapString[len];
        FOR i IN [0..len) DO
          CommUtilDefs.AppendChar[text,c.errorText[i]];
          ENDLOOP;
        END;
      END;
    PupTypes.gatewayResourceLimitsPupErrorCode =>
      BEGIN
      throttle ← throttle+1;
      IF pathMaxAllocate=1 THEN
        -- Beware: We may have gone unstable
        retransmitTicks ← MIN[2*retransmitTicks,maxRetransmitTicks];
      END;
    ENDCASE;
  END;

GotAbort: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  -- Handles an Abort held in the current buffer c.
  -- A mismatched connection ID is counted (when statistics are enabled) and
  -- ignored; an idle or closed stream ignores the Abort.
  -- Otherwise the abort text is saved once, then a halfOpen stream is smashed
  -- closed and any other state goes through SendAbortInternal.
  i, len: CARDINAL;
  -- Bytes that precede the abort text: the Pup header plus one word.
  -- NOTE(review): presumably the abortCode word; confirm against the buffer layout.
  overhead: CARDINAL = bytesPerPupHeader+2*(1);
  IF c.pupID#connectionID THEN
    BEGIN
    IF doStats THEN StatIncr[statPacketsRejectedBadID];
    RETURN;
    END;
  IF state=idle OR state=closed THEN RETURN;
  IF text=NIL THEN
    BEGIN
    -- Fix: a Pup shorter than the fixed overhead used to underflow the
    -- CARDINAL subtraction, so MIN picked 100 and junk was copied.
    -- Such a Pup now yields an empty string.
    len ← IF c.pupLength>overhead THEN c.pupLength-overhead ELSE 0;
    len ← MIN[len,100];
    text ← CommUtilDefs.AllocateHeapString[len];
    FOR i IN [0..len) DO
      CommUtilDefs.AppendChar[text,c.abortText[i]];
      ENDLOOP;
    END;
  SELECT state FROM
    halfOpen => BEGIN SmashClosedInternal[him,remoteReject]; END;
    ENDCASE => SendAbortInternal[him];
  END;

GotInt: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  -- Handles an Interrupt held in the current buffer c, comparing its sequence
  -- number with the next one expected (inIntSeq).
  seq: LONG INTEGER ← Flip[c.pupID];
  IF seq=inIntSeq THEN
    BEGIN
    -- New interrupt: advance, wake a waiter, and reply.
    inIntSeq ← inIntSeq+1;
    NOTIFY waitingForInterrupt;
    SendIntReply[him];
    END
  ELSE IF seq=inIntSeq-1 THEN SendIntReply[him]; -- retransmission: just reply again
  -- Anything else is a very old duplicate and is ignored.
  END;

GotIntReply: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  -- Handles an InterruptReply held in the current buffer c. A reply for the
  -- outstanding interrupt clears the pending flag and advances outIntSeq.
  -- stateChange is broadcast whether or not the reply matched.
  acked: LONG INTEGER ← Flip[c.pupID];
  IF acked=outIntSeq THEN
    BEGIN
    outIntSeq ← outIntSeq+1;
    outIntPending ← FALSE;
    END;
  BROADCAST stateChange;
  END;

SendEnd: PUBLIC PROCEDURE [him: Instance] = SendEndInternal;
SendEndInternal: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  -- Sends an End for this connection, using the current buffer c or a fresh one.
  -- The timer is restarted even when no buffer can be had; in that case nothing
  -- is sent and the probe count is left alone.
  timer ← CommUtilDefs.GetTicks[];
  IF c=NIL THEN
    BEGIN
    c ← MaybeGetFreePupBuffer[];
    IF c=NIL THEN RETURN;
    END;
  Send[him,end,connectionID,0];
  -- 15*2 is 30 sec
  probeCounter ← probeCounter+1;
  IF probeCounter>15 THEN SmashClosed[him,transmissionTimeout];
  END;

SendEndReplyInternal: INTERNAL PROCEDURE [him: Instance] =
  -- Sends an EndReply with no contents in the current buffer (see Send).
  BEGIN Send[him,endRep,him.connectionID,0]; END;

SendAbort: PUBLIC PROCEDURE [him: Instance] = SendAbortInternal;
SendAbortInternal: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  -- Smashes the stream closed (reason remoteReject) and then sends an Abort,
  -- using the current buffer c or a fresh one. With no buffer, the stream is
  -- still closed but nothing is sent.
  SmashClosedInternal[him,remoteReject];
  IF c=NIL THEN
    BEGIN
    c ← MaybeGetFreePupBuffer[];
    IF c=NIL THEN RETURN;
    END;
  c.abortCode ← 1;
  Send[him,abort,connectionID,2]; -- two bytes of contents: the abort code word
  END;

SendInt: PUBLIC PROCEDURE [him: Instance] = SendIntInternal;
SendIntInternal: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  -- Sends an Interrupt carrying outIntSeq, using the current buffer c or a
  -- fresh one. outIntTime is stamped even when no buffer is available.
  outIntTime ← CommUtilDefs.GetTicks[];
  IF c=NIL THEN
    BEGIN
    c ← MaybeGetFreePupBuffer[];
    IF c=NIL THEN RETURN;
    END;
  Send[him,int,Flop[outIntSeq],0];
  END;

SendIntReply: INTERNAL PROCEDURE [him: Instance] =
  -- Replies, in the current buffer, with the sequence number just before inIntSeq.
  BEGIN Send[him,intRep,Flop[him.inIntSeq-1],0]; END;

SendRfc: PUBLIC PROCEDURE [him: Instance] = SendRfcInternal;
SendRfcInternal: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  -- Sends an RFC carrying our local address, using the current buffer c or a
  -- fresh one. With no buffer nothing happens, and the timer is not touched.
  IF c=NIL AND (c←MaybeGetFreePupBuffer[])=NIL THEN RETURN;
  timer ← CommUtilDefs.GetTicks[];
  c.address ← local;
  Send[him,rfc,connectionID,dataBytesPerRFC];
  END;

AnswerRfc: INTERNAL PROCEDURE [him: Instance, listener: PupSocketID] =
  BEGIN OPEN him;
  -- Sends an RFC to the remote end in a fresh buffer. The Pup carries our local
  -- address as contents, but its source socket is the listener socket.
  -- Does nothing when no buffer is available.
  reply: PupBuffer ← MaybeGetFreePupBuffer[];
  IF reply=NIL THEN RETURN;
  reply.pupType ← rfc;
  reply.pupID ← connectionID;
  reply.address ← local;
  SetPupContentsBytes[reply,dataBytesPerRFC];
  reply.dest ← remote;
  reply.source ← local;
  reply.source.socket ← listener; -- must follow the whole-record assignment above
  PupRouterSendThis[reply];
  END;

-- send a control message using the current buffer
Send: INTERNAL PROCEDURE [him: Instance, thisType: PupTypes.PupType, thisID: Pair, thisLen: CARDINAL] =
  BEGIN OPEN him;
  -- Takes the current buffer away from the instance (c becomes NIL), fills in
  -- type, ID, contents length, source socket and destination, and hands it to
  -- the router. When checking is on, a missing buffer is reported via Glitch.
  outgoing: PupBuffer ← c;
  IF doCheck AND outgoing=NIL THEN Glitch[NoBufferToSend];
  c ← NIL;
  outgoing.dest ← remote;
  outgoing.source.socket ← local.socket;
  outgoing.pupType ← thisType;
  outgoing.pupID ← thisID;
  SetPupContentsBytes[outgoing,thisLen];
  PupRouterSendThis[outgoing];
  END;

CloseReason: TYPE = PupStream.CloseReason;

SmashClosed: PUBLIC PROCEDURE [Instance, CloseReason] = SmashClosedInternal;
SmashClosedInternal: INTERNAL PROCEDURE [him: Instance, why: CloseReason] =
  BEGIN
  -- Forces the stream into the closed state and wakes waiters on stateChange
  -- and inputReady. The reason is recorded only while whyClosed still holds its
  -- initial localClose value, so the first reason is not clobbered.
  IF him.whyClosed=localClose THEN him.whyClosed ← why;
  him.state ← closed;
  BROADCAST him.stateChange;
  NOTIFY him.inputReady;
  END;

StreamDied: PUBLIC PROCEDURE [him: Instance, b: PupBuffer] =
  BEGIN
  -- Returns b (if any) to the free pool and then raises StreamClosing with the
  -- recorded reason and text. Always ends by raising the error.
  IF b#NIL THEN ReturnFreePupBuffer[b];
  ERROR PupStream.StreamClosing[him.whyClosed,him.text];
  END;

Open: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  -- Actively opens an idle stream: picks a new connection ID, sends an RFC and
  -- waits for the state to become open. Raises StreamClosing if the stream gets
  -- closed meanwhile or the waits run out. A stream that is not idle is reported
  -- through Glitch.
  IF state#idle THEN
    BEGIN
    Glitch[StreamAlreadyOpen];
    RETURN;
    END;
  state ← halfOpen;
  OpenInit[him,PupRouterDefs.NextPupConnectionID[]];
  SendRfcInternal[him];
  -- At most 60 waits on stateChange; the original notes each as 1 sec, 1 min in total.
  THROUGH [0..60) WHILE state#open DO
    IF state=closed THEN ERROR PupStream.StreamClosing[whyClosed,text];
    WAIT stateChange;
    ENDLOOP;
  IF state=open THEN RETURN; -- it gets opened fast if local
  state ← closed;
  whyClosed ← transmissionTimeout;
  ERROR PupStream.StreamClosing[whyClosed,text];
  END;

OpenInit: INTERNAL PROCEDURE [him: Instance, newID: Pair] =
  BEGIN OPEN him;
  -- Starts a connection under newID: records the ID and sets every data and
  -- interrupt sequence counter to Flip[newID].
  connectionID ← newID;
  nextInputID ← nextOutputID ← ackedID ← allocatedID ← allocationID ← maxOutputID ←
    outIntSeq ← inIntSeq ← seenIntSeq ← Flip[newID];
  probeCounter ← initialRetransmissions; -- also allows first (gratuitous) ack
  sameNet ← (remote.net=local.net);
  END;

MakeLocal: ENTRY PROCEDURE [
  him: Instance, l: PupSocketID, r: PupAddress, m: PupStream.PupOpenMode, id: Pair] =
  BEGIN OPEN him; ENABLE UNWIND => NULL;
  -- Fills in addresses, mode and ID, creates the socket, forks the retransmitter
  -- and slurp processes, and then performs the step the open mode calls for.
  -- In alreadyOpened mode the local socket ID is filled in rather than taken
  -- from l; l is then used only as the source socket of the answering RFC.
  wanted: PupSocketID ← IF m=alreadyOpened THEN PupTypes.fillInSocketID ELSE l;
  local ← GetLocalPupAddress[wanted,@r];
  remote ← r;
  mode ← m;
  connectionID ← id;
  socket ← PupSocketMake[local.socket,remote,veryLongWait,id];
  retransmitterFork ← FORK retransmitter[];
  slurpFork ← FORK slurp[];
  SELECT mode FROM
    wait => NULL; -- stay idle until an RFC arrives
    sendRfc => Open[him];
    alreadyOpened =>
      BEGIN
      state ← open;
      OpenInit[him,id];
      AnswerRfc[him,l];
      MySendAck[him];
      END;
    ENDCASE => ERROR;
  END;

-- Copied (with only slight edits) from PupPktHot
MySendAck: INTERNAL PROCEDURE [him: Instance] =
  BEGIN OPEN him;
  -- Sends an Ack for nextInputID, using the current buffer c when there is one
  -- and a fresh buffer otherwise. Does nothing when no buffer is available.
  ackPup: PupBuffer;
  IF c=NIL THEN
    BEGIN
    ackPup ← MaybeGetFreePupBuffer[];
    IF ackPup=NIL THEN RETURN;
    END
  ELSE
    BEGIN
    ackPup ← c;
    c ← NIL;
    END;
  -- Body: data size, Pups we can still accept (never below 0), byte allocation.
  ackPup.pupBody ← ack [
    dataBytesPerPup,
    MAX[0,INTEGER[myMaxAllocate-inputQueue.length]],
    byteAllocate];
  allocatedID ← nextInputID+byteAllocate;
  ackPup.pupType ← ack;
  ackPup.pupID ← Flop[nextInputID];
  ackPup.pupLength ← bytesPerAck;
  ackPup.source.socket ← local.socket;
  ackPup.dest ← remote;
  PupRouterSendThis[ackPup];
  IF doStats THEN StatIncr[statAcksSent];
  -- MAXC does not send free ACK, so set clock ahead
  timer ← CommUtilDefs.GetTicks[]+ctlRetransmitPulses/16-ctlRetransmitPulses;
  END;

DestroyLocal: PROCEDURE [him: Instance] =
  BEGIN
  -- Closes the connection under the lock, joins the two forked processes, and
  -- then releases queued buffers, the socket and any saved close text.
  DestroyLocalLocked[him];
  JOIN him.retransmitterFork;
  JOIN him.slurpFork;
  -- IF sentBuffer#NIL THEN ReturnFreePupBuffer[sentBuffer];
  BufferDefs.QueueCleanup[@him.inputQueue];
  BufferDefs.QueueCleanup[@him.sentQueue];
  PupSocketDestroy[him.socket];
  IF him.text#NIL THEN CommUtilDefs.FreeHeapString[him.text];
  END;

DestroyLocalLocked: ENTRY PROCEDURE [him: Instance] = INLINE
BEGIN OPEN him;
-- Locked half of DestroyLocal: shuts the connection down and tells the forked
-- processes to finish; the caller JOINs them once this returns.
-- Step 1: give an outstanding interrupt up to 100 waits to be acknowledged.
THROUGH [0..100) WHILE outIntPending DO
IF state=closed THEN EXIT; -- probably smashed closed, do not hang
WAIT stateChange;
ENDLOOP;
-- Step 2: start the close that fits the current state. An open stream sends
-- End, a halfOpen stream is aborted, and talking or finishing waits for a
-- state change before looking again.
DO
SELECT state FROM
open=> BEGIN state←end; SendEndInternal[him]; probeCounter ← 0; EXIT; END;
talking, finishing => WAIT stateChange;
halfOpen => BEGIN SendAbortInternal[him]; EXIT; END;
ENDCASE--closed, end, idle-- => EXIT;
ENDLOOP;
-- Step 3: wait until the stream is idle or closed, returning the current
-- buffer and everything on the input queue on each pass.
THROUGH [0..10) UNTIL state=idle OR state=closed DO -- extra layer for debugging
UNTIL state=idle OR state=closed DO
IF c#NIL THEN ReturnFreePupBuffer[c]; -- probe for allocate
UNTIL (c←DequeuePup[@inputQueue])=NIL DO ReturnFreePupBuffer[c]; ENDLOOP;
WAIT stateChange;
ENDLOOP;
ENDLOOP;
-- Step 4: force closed, wait for unackedPups to reach 0, then set pleaseDie
-- and poke the retransmitter condition and the socket.
-- NOTE(review): presumably pleaseDie makes retransmitter and slurp exit; confirm in PupPktHot.
state ← closed;
NOTIFY retransmitterReady;
UNTIL unackedPups=0 DO WAIT stateChange; ENDLOOP; -- retransmitter gets them
pleaseDie ← TRUE;
NOTIFY retransmitterReady;
PupSocketKick[socket];
END;

-- initialization
END.