-- File: PtPkt.mesa, Last Edit:
-- MAS April 21, 1980 6:52 PM
-- HGM November 19, 1979 12:02 PM

DIRECTORY
IODefs
: FROM "IODefs" USING [WriteChar, WriteLine, WriteOctal, WriteString],
ProcessDefs: FROM "ProcessDefs" USING [Detach],
StatsDefs: FROM "StatsDefs" USING [StatBump, StatIncr],
PupStream: FROM "PupStream" USING [StreamClosing],
PupPktDefs: FROM "PupPktDefs" USING [
CreatePupPktStreamListener, DestroyPupListener, PupListener,
PupPktStream, PupPktStreamCreate, PupPktStreamDestroy, PupPktStreamMake],
PupDefs: FROM "PupDefs" USING [
GetFreePupBuffer, ReturnFreePupBuffer, PupAddress, PupBuffer,
GetPupContentsBytes, SetPupContentsWords, SecondsToTocks],
PtDefs: FROM "PtDefs" USING [
CursorBits, recvIndex, sendIndex, slow,
DataMismatch, LengthMismatch, packetCycle, PtInterface,
PtLookerReady, Random, SeeStorms, UnListen, Done, WaitUntilDone];

PtPkt: PROGRAM [
pt: PtDefs.PtInterface]
IMPORTS
IODefs, ProcessDefs,
StatsDefs, PupStream, PupPktDefs, PupDefs, PtDefs
EXPORTS PtDefs =
BEGIN OPEN
pt, IODefs, StatsDefs, PupPktDefs, PupDefs, PtDefs;

-- Module-level handles; NIL means "none at the moment".
-- listener is set by PtPktListen and cleared by PtPktUnListen.
listener: PupListener ← NIL;
-- Push stores its stream in the first of these two slots that is NIL and
-- clears that slot again when it finishes (two slots because
-- PtPktDoubleSend forks two Push processes).
globalPushPS: PupPktStream ← NIL;
secondPushPS: PupPktStream ← NIL;
-- Set by Pull only when Pull creates its own stream (it was called with NIL);
-- cleared by Pull when that stream is destroyed.
globalPullPS: PupPktStream ← NIL;

Header: PROCEDURE [s, t: STRING, where: PupAddress] =
  BEGIN
  -- Prints a two line banner describing the test about to run, then calls
  -- SeeStorms.
  --   s      leading verb, printed first (callers pass e.g. "Sending")
  --   t      preposition printed before the remote address ("to", "from", ...)
  --   where  address of the other party
  -- Line 1: s, the configured length mode, " packets", then either
  --         " locally" (where is this host on this net) or t followed by
  --         the net (only if it differs from myNet) and the host, in octal.
  -- Line 2: the configured data pattern.
  -- Raises ERROR if length or data holds a value not listed below.
  sameNet: BOOLEAN ← where.net=myNet;
  isLocal: BOOLEAN ← sameNet AND where.host=myHost;

  WriteString[s];
  SELECT length FROM
    ignore => WriteString[" unknown length"];
    cyclic => WriteString[" cyclic length"];
    random => WriteString[" random length"];
    long => WriteString[" long"];
    short => WriteString[" short"];
    ENDCASE => ERROR;
  WriteString[" packets"];

  IF isLocal THEN WriteString[" locally"]
  ELSE
    BEGIN
    WriteString[" "];
    WriteString[t];
    -- the net number is only interesting when it is not our own
    IF ~sameNet THEN
      BEGIN
      WriteString[" net "];
      WriteOctal[where.net];
      END;
    WriteString[" host "];
    WriteOctal[where.host];
    END;
  WriteLine["."];

  WriteString["The data pattern is"];
  SELECT data FROM
    ignore => WriteString[" not being generated/checked"];
    cyclic => WriteString[" cyclic"];
    random => WriteString[" random"];
    zeros => WriteString[" all zeros"];
    ones => WriteString[" all ones"];
    alternating => WriteString[" alternating ones and zeros"];
    ENDCASE => ERROR;
  WriteLine["."];

  SeeStorms[];
  END;

-- Push: sender side of the packet stream test.
-- Opens a packet stream to pushHim, then sends packets built according to
-- the length and data settings until stopFlag becomes TRUE or the stream
-- closes.  Always finishes by destroying the stream (if one was opened),
-- releasing its global slot, and calling Done[].
-- Raises ERROR if both global stream slots are already taken, or if length
-- or data holds a value not listed in the SELECTs below.
Push: PROCEDURE =
BEGIN
pushPS: PupPktStream ← NIL;
pushI: CARDINAL;
-- k indexes words within a packet; l is the packet length in words
k, l: CARDINAL;
-- r is the running state fed back into Random; it starts at 0
r: CARDINAL ← 0;
b: PupBuffer;
BEGIN -- extra BEGIN for EXITS
-- Open the connection.  If StreamClosing is raised while connecting, print
-- the reason and leave through Failed; pushPS is still NIL in that case.
-- (text and why are presumably the arguments of StreamClosing: confirm.)
pushPS ←
PupPktStreamCreate[pushHim,SecondsToTocks[1] !
PupStream.StreamClosing =>
BEGIN
WriteString["Connection failed: "];
IF text#NIL THEN
BEGIN
WriteChar[’(];
WriteString[text];
WriteString[") "];
END;
WriteLine[whyText[why]];
GOTO
Failed;
END
];
-- Record the stream in whichever global slot is free; both busy is an ERROR.
SELECT NIL FROM
globalPushPS => globalPushPS ← pushPS;
secondPushPS => secondPushPS ← pushPS;
ENDCASE => ERROR;
IF info THEN WriteChar[’o];
PtLookerReady[];
-- Initial length.  For random, l is left unset here and chosen per packet.
SELECT length FROM -- can’t use zero
short => l←1;
cyclic => l←1; -- start at short
long, ignore => l←dataWordsPerPup;
random => NULL;
ENDCASE => ERROR;
-- Outer loop: one pass per cycle of packetCycle packets.
UNTIL stopFlag DO
FOR pushI IN [0..packetCycle) UNTIL stopFlag DO
-- StreamClosing raised anywhere in this loop body prints the reason and
-- leaves both loops through Closed.
ENABLE PupStream.StreamClosing =>
BEGIN
WriteString["Push PktStream closed: "];
IF text#NIL THEN
BEGIN
WriteChar[’(];
WriteString[text];
WriteString[") "];
END;
WriteLine[whyText[why]];
GOTO
Closed;
END;
b←GetFreePupBuffer[];
-- Per packet length for the varying modes.
-- NOTE(review): cyclic sets l to pushI, which is 0 for the first packet of
-- every cycle even though the comment above says zero cannot be used; Pull
-- expects the same value, so both ends must change together: confirm.
SELECT length FROM
short, long, ignore => NULL;
cyclic => l←pushI;
random => l←((r←Random[r]) MOD dataWordsPerPup)+1;
ENDCASE => ERROR;
-- Fill the first l words with the selected pattern (ignore leaves the
-- buffer contents as they were).
SELECT data FROM
ignore => NULL;
cyclic => FOR k IN [0..l) DO b.pupWords[k]←k+pushI; ENDLOOP;
ones => FOR k IN [0..l) DO b.pupWords[k]←177777B; ENDLOOP;
zeros => FOR k IN [0..l) DO b.pupWords[k]←0; ENDLOOP;
alternating => FOR k IN [0..l) DO b.pupWords[k]←125252B; ENDLOOP;
random => FOR k IN [0..l) DO b.pupWords[k]←(r←Random[r]); ENDLOOP;
ENDCASE => ERROR;
SetPupContentsWords[b,l];
-- NOTE(review): the counters are bumped when doStats is FALSE; presumably
-- doStats means some other layer is already counting: confirm.
IF ~doStats THEN
BEGIN
StatIncr[statDataPacketsSent];
StatBump[statDataBytesSent,l*2];
END;
pushPS.put[b];
-- Cursor feedback: once per packet in slow mode, otherwise once per cycle.
IF slow THEN CursorBits[sendIndex] ← CursorBits[sendIndex]+1;
ENDLOOP;
IF ~slow THEN CursorBits[sendIndex] ← CursorBits[sendIndex]+1;
IF info THEN WriteChar[’s];
ENDLOOP;
EXITS
Closed, Failed => NULL;
END;
-- Common cleanup for the normal, Closed and Failed paths.
IF pushPS#NIL THEN PupPktStreamDestroy[pushPS];
IF globalPushPS=pushPS THEN globalPushPS←NIL;
IF secondPushPS=pushPS THEN secondPushPS←NIL;
Done[];
END;

Pull: PROCEDURE [pullPS: PupPktStream] =
  BEGIN
  -- Receiver side of the packet stream test.
  --   pullPS  stream to read from, or NIL to have Pull make its own stream
  --           (from pullMe.socket to pullHim) and record it in globalPullPS.
  -- Reads packets until stopFlag becomes TRUE or the stream closes, checking
  -- each one against the length and data settings.  A bad length raises
  -- SIGNAL LengthMismatch and bad contents raise SIGNAL DataMismatch.
  -- Always destroys the stream on the way out.  Done[] is called only when
  -- Pull made the stream itself, i.e. when it was not handed one by Hello.
  -- Raises ERROR if length or data holds a value not listed in the SELECTs.
  --
  -- Fix: with length=ignore the word count l used to stay unassigned while
  -- the data checks below still looped over [0..l).  It is now taken from
  -- the size of each received packet.
  fromListener: BOOLEAN ← pullPS#NIL; -- renamed: no longer hides the global listener
  pullI: CARDINAL;
  bytes: CARDINAL; -- content length of the current packet, in bytes
  k, l: CARDINAL; -- k indexes words; l is the number of words to check
  r: CARDINAL ← 0; -- running state fed back into Random; starts at 0 as in Push
  b: PupBuffer;

  IF pullPS=NIL THEN
    BEGIN
    -- NOTE(review): the meaning of the wait and [0,0] arguments is defined
    -- by PupPktStreamMake, not visible here.
    pullPS ← PupPktStreamMake[pullMe.socket, pullHim, SecondsToTocks[1], wait, [0,0]];
    globalPullPS ← pullPS;
    END;

  -- Initial expected length.  random and ignore are settled per packet.
  SELECT length FROM
    short => l←1;
    cyclic => l←1; -- start at short
    long => l←dataWordsPerPup;
    random, ignore => NULL;
    ENDCASE => ERROR;

  UNTIL stopFlag DO
    FOR pullI IN [0..packetCycle) UNTIL stopFlag DO
      -- StreamClosing raised anywhere in this loop body prints the reason
      -- and leaves both loops through Closed.
      ENABLE PupStream.StreamClosing =>
        BEGIN
        WriteString["Pull PktStream closed: "];
        IF text#NIL THEN
          BEGIN
          WriteChar[’(];
          WriteString[text];
          WriteString[") "];
          END;
        WriteLine[whyText[why]];
        GOTO Closed;
        END;

      -- get returns NIL when nothing arrived; print a ? and retry unless
      -- asked to stop.
      UNTIL (b←pullPS.get[])#NIL DO
        IF stopFlag THEN GOTO AllDone;
        WriteChar[’?];
        ENDLOOP;

      -- Cursor feedback: once per packet in slow mode, else once per cycle.
      IF slow THEN CursorBits[recvIndex] ← CursorBits[recvIndex]+1;
      bytes ← GetPupContentsBytes[b];

      -- NOTE(review): counters are bumped when doStats is FALSE, matching
      -- Push; presumably another layer counts when it is TRUE: confirm.
      IF ~doStats THEN
        BEGIN
        StatIncr[statDataPacketsReceived];
        StatBump[statDataBytesReceived,bytes];
        END;

      -- Settle the expected word count for this packet and verify it where
      -- the mode defines one.
      SELECT length FROM
        short, long => NULL;
        -- no expectation: check exactly the words that actually arrived
        ignore => l ← bytes/2;
        cyclic => IF (bytes)#(l←pullI)*2 THEN SIGNAL LengthMismatch;
        random =>
          BEGIN
          l ← ((r←Random[r]) MOD dataWordsPerPup)+1;
          IF (bytes)#l*2 THEN SIGNAL LengthMismatch;
          END;
        ENDCASE => ERROR;

      -- Compare the first l words with the pattern Push generates.
      SELECT data FROM
        ignore => NULL;
        cyclic => FOR k IN [0..l) DO
          IF b.pupWords[k]#(k+pullI) THEN SIGNAL DataMismatch; ENDLOOP;
        ones => FOR k IN [0..l) DO
          IF b.pupWords[k]#177777B THEN SIGNAL DataMismatch; ENDLOOP;
        zeros => FOR k IN [0..l) DO
          IF b.pupWords[k]#0 THEN SIGNAL DataMismatch; ENDLOOP;
        alternating => FOR k IN [0..l) DO
          IF b.pupWords[k]#125252B THEN SIGNAL DataMismatch; ENDLOOP;
        random => FOR k IN [0..l) DO
          IF b.pupWords[k]#(r←Random[r]) THEN SIGNAL DataMismatch; ENDLOOP;
        ENDCASE => ERROR;

      ReturnFreePupBuffer[b];
      ENDLOOP;
    IF ~slow THEN CursorBits[recvIndex] ← CursorBits[recvIndex]+1;
    IF info THEN WriteChar[’r];
    REPEAT AllDone, Closed=>NULL;
    ENDLOOP;

  -- Common cleanup for the normal, AllDone and Closed paths.
  PupPktStreamDestroy[pullPS];
  IF pullPS=globalPullPS THEN globalPullPS ← NIL;
  IF ~fromListener THEN Done[];
  END;

Hello: PROCEDURE [ps: PupPktStream, remote: PupAddress] =
  BEGIN
  -- Procedure handed to CreatePupPktStreamListener by PtPktListen.
  --   ps      packet stream passed in by the listener
  --   remote  address of the other end, used only for the banner
  -- Prints the "receiving ... from" banner when info is set, then runs Pull
  -- on ps in a detached process.
  receiver: PROCESS;
  IF info THEN Header[s: "receiving", t: "from", where: remote];
  receiver ← FORK Pull[ps];
  ProcessDefs.Detach[receiver];
  END;

PtPktListen: PUBLIC PROCEDURE =
  BEGIN
  -- Starts a packet stream listener on socNum; each stream it produces is
  -- handed to Hello.  The handle is kept in listener so that PtPktUnListen
  -- can destroy it later.
  -- Fix: a listener left over from an earlier call is destroyed first.
  -- Before, a second call overwrote the only handle to the old listener,
  -- which could then never be destroyed.
  IF listener#NIL THEN
    BEGIN
    DestroyPupListener[listener];
    listener ← NIL; -- do not keep a stale handle if the create below fails
    END;
  listener ← CreatePupPktStreamListener[socNum,Hello,SecondsToTocks[1]];
  END;

PtPktPushPull: PUBLIC PROCEDURE =
  BEGIN
  -- Runs a sender and a receiver at the same time and waits for both.
  -- Calls UnListen first.  If pullHim has no socket number it prints a
  -- complaint and does nothing else.
  sender, receiver: PROCESS;
  UnListen[];
  IF pullHim.socket#[0,0] THEN
    BEGIN
    Header[s: "Exchanging", t: "with", where: pullHim];
    PtLookerReady[];
    sender ← FORK Push[];
    ProcessDefs.Detach[sender];
    receiver ← FORK Pull[NIL]; -- NIL: Pull makes its own stream
    ProcessDefs.Detach[receiver];
    WaitUntilDone[2]; -- one Done[] from Push, one from Pull
    END
  ELSE WriteLine["Need a socket number."];
  END;

PtPktRecv: PUBLIC PROCEDURE =
  BEGIN
  -- Runs a single receiver and waits for it to finish.
  -- Calls UnListen first.  If pullHim has no socket number it prints a
  -- complaint and does nothing else.
  receiver: PROCESS;
  UnListen[];
  IF pullHim.socket#[0,0] THEN
    BEGIN
    Header[s: "receiving", t: "from", where: pullHim];
    PtLookerReady[];
    receiver ← FORK Pull[NIL]; -- NIL: Pull makes its own stream
    ProcessDefs.Detach[receiver];
    WaitUntilDone[1];
    END
  ELSE WriteLine["Need a socket number."];
  END;

PtPktSend: PUBLIC PROCEDURE =
  BEGIN
  -- Runs a single sender toward pushHim and waits for it to call Done.
  sender: PROCESS;
  Header[s: "Sending", t: "to", where: pushHim];
  PtLookerReady[];
  sender ← FORK Push[];
  ProcessDefs.Detach[sender];
  WaitUntilDone[1];
  END;

PtPktDoubleSend: PUBLIC PROCEDURE =
  BEGIN
  -- Runs two senders toward pushHim at once and waits for both to call Done.
  Header[s: "Sending Double", t: "to", where: pushHim];
  PtLookerReady[];
  -- fork and detach the two Push processes one after the other
  THROUGH [0..2) DO
    ProcessDefs.Detach[FORK Push[]];
    ENDLOOP;
  WaitUntilDone[2];
  END;

PtPktUnListen: PUBLIC PROCEDURE =
  BEGIN
  -- Destroys the listener started by PtPktListen, if there is one, and
  -- reports that it did so.  Does nothing (and prints nothing) otherwise.
  IF listener#NIL THEN
    BEGIN
    DestroyPupListener[listener];
    listener ← NIL;
    WriteLine["PktStream Unlisten."];
    END;
  END;

-- here is what gets STARTed
END.