-- File: PtBSP.mesa, Last Edit:
-- MAS April 21, 1980 6:23 PM
-- HGM November 12, 1979 5:37 PM
DIRECTORY
IODefs: FROM "IODefs" USING [WriteChar, WriteLine, WriteOctal, WriteString],
MiscDefs: FROM "MiscDefs" USING [CallDebugger],
ProcessDefs: FROM "ProcessDefs" USING [Detach],
Stream: FROM "Stream" USING [
GetBlock, PutBlock, SendNow,
Block, Handle, TimeOut, CompletionCode, SubSequenceType],
StatsDefs: FROM "StatsDefs" USING [StatBump],
PupStream: FROM "PupStream" USING [
CreatePupByteStreamListener, RejectThisRequest, DestroyPupListener,
PupAddress, PupListener, StreamClosing,
PupByteStreamCreate, PupByteStreamMake, SecondsToTocks],
PtDefs: FROM "PtDefs" USING [
blockCycle, blockSize, CursorBits, DataBuffer,
PtInterface, PtLookerReady, UnListen, Random, Done, WaitUntilDone,
recvIndex, SeeStorms, sendIndex, slow];
PtBSP: PROGRAM [pt: PtDefs.PtInterface]
IMPORTS IODefs, MiscDefs, ProcessDefs, Stream, StatsDefs, PupStream, PtDefs
EXPORTS PtDefs = BEGIN
OPEN pt, IODefs, StatsDefs, PupStream, PtDefs;
listener: PupListener ← NIL;
secondPushBS: Stream.Handle ← NIL;
globalPushBS: Stream.Handle ← NIL;
globalPullBS: Stream.Handle ← NIL;
Header: PROCEDURE [s, t: STRING, where: PupAddress] =
BEGIN
WriteString[s];
SELECT mode FROM
byte => WriteString[" bytes"];
block => WriteString[" blocks"];
ENDCASE => ERROR;
IF where.host=myHost AND where.net=myNet
THEN WriteString[" locally"]
ELSE BEGIN WriteString[" "]; WriteString[t];
IF where.net#myNet
THEN BEGIN WriteString[" net "]; WriteOctal[where.net] END;
WriteString[" host "]; WriteOctal[where.host]; END;
WriteLine["."];
SeeStorms[];
END;
Push: PROCEDURE =
BEGIN
pushBS: Stream.Handle ← NIL;
buffer: DataBuffer;
block: Stream.Block ← [@buffer,0,blockSize];
pushI, len, k: CARDINAL;
r: CARDINAL ← 0;
BEGIN -- for EXITS
pushBS ← PupByteStreamCreate[pushHim,SecondsToTocks[1] !
StreamClosing =>
BEGIN
WriteString["Connection failed: "];
IF text#NIL THEN
BEGIN
WriteChar[’(];
WriteString[text];
WriteString[") "];
END;
WriteLine[whyText[why]];
GOTO Failed;
END];
globalPushBS ← pushBS;
WriteChar[’O];
SELECT length FROM
short, cyclic => len ← 0; -- start at short
long, ignore => len ← blockSize;
random => NULL;
ENDCASE => ERROR;
SELECT data FROM
alternating => FOR k IN [0..blockSize) DO buffer[k] ← 125B; ENDLOOP;
ones => FOR k IN [0..blockSize) DO buffer[k] ← 377B; ENDLOOP;
zeros => FOR k IN [0..blockSize) DO buffer[k] ← 0; ENDLOOP;
random => NULL;
cyclic => NULL;
ignore => NULL;
ENDCASE => ERROR;
UNTIL stopFlag DO
ENABLE StreamClosing =>
BEGIN
WriteString["Push Stream closed: "];
IF text#NIL THEN
BEGIN
WriteChar[’(];
WriteString[text];
WriteString[") "];
END;
WriteLine[whyText[why]];
GOTO Closed;
END;
SELECT mode FROM
block =>
BEGIN
FOR pushI IN [1..blockCycle) UNTIL stopFlag DO
SELECT length FROM
short, long, ignore => NULL;
cyclic => len ← pushI;
random => len ← ((r←Random[r]) MOD blockSize+1);
ENDCASE => ERROR;
SELECT data FROM
ignore => NULL;
cyclic => FOR k IN [0..len) DO buffer[k] ← k+pushI; ENDLOOP;
ones => NULL;
zeros => NULL;
alternating => NULL;
random => FOR k IN [0..len) DO buffer[k] ← (r←Random[r]); ENDLOOP;
ENDCASE => ERROR;
block.stopIndexPlusOne ← len;
Stream.PutBlock[pushBS,block,FALSE];
IF ~doStats THEN StatBump[statDataBytesSent,blockSize];
IF slow THEN CursorBits[sendIndex] ← CursorBits[sendIndex]+1;
ENDLOOP;
END;
ENDCASE => ERROR;
IF info THEN WriteChar[’S];
Stream.SendNow[pushBS];
IF ~slow THEN CursorBits[sendIndex] ← CursorBits[sendIndex]+1;
ENDLOOP;
EXITS Closed, Failed => NULL;
END;
IF pushBS#NIL THEN pushBS.delete[pushBS];
IF secondPushBS=pushBS THEN secondPushBS←NIL;
IF globalPushBS=pushBS THEN globalPushBS←NIL;
Done[];
END;
Pull: PROCEDURE [pullBS: Stream.Handle] =
BEGIN
listener: BOOLEAN ← pullBS#NIL;
buffer: DataBuffer;
block: Stream.Block ← [@buffer,0,blockSize];
pullI, len, k: CARDINAL;
r: CARDINAL ← 0;
bytes: CARDINAL;
why: Stream.CompletionCode;
sst: Stream.SubSequenceType;
IF pullBS=NIL THEN
BEGIN
pullBS ← PupByteStreamMake[
pullMe.socket,
pullHim,
SecondsToTocks[1],
wait,
[0,0]];
globalPullBS ← pullBS;
END;
SELECT length FROM
short, cyclic => len ← 0; -- start at short
long, ignore => len ← blockSize;
random => NULL;
ENDCASE => ERROR;
UNTIL stopFlag DO
ENABLE StreamClosing =>
BEGIN
WriteString["Pull Stream closed: "];
IF text#NIL THEN
BEGIN
WriteChar[’(];
WriteString[text];
WriteString[") "];
END;
WriteLine[whyText[why]];
GOTO Closed;
END;
SELECT mode FROM
block =>
BEGIN
FOR pullI IN [1..blockCycle) UNTIL stopFlag DO
SELECT length FROM
short, long, ignore => NULL;
cyclic => len ← pullI;
random => len ← ((r←Random[r]) MOD blockSize+1);
ENDCASE => ERROR;
SELECT data FROM
ignore => NULL;
cyclic => NULL;
ones, zeros, alternating, random =>
FOR k IN [0..len) DO buffer[k] ← 123; ENDLOOP;
ENDCASE => ERROR;
block.stopIndexPlusOne ← len;
[bytes,why,sst] ← Stream.GetBlock[pullBS,block !
Stream.TimeOut =>
BEGIN
IF info THEN WriteChar[’?];
RESUME;
END ];
SELECT why FROM
normal => NULL;
endRecord => BEGIN MiscDefs.CallDebugger["endRecord -- ???"]; END;
sstChange => BEGIN MiscDefs.CallDebugger["sstChange -- ???"]; END;
ENDCASE => ERROR;
IF bytes#len THEN BEGIN MiscDefs.CallDebugger["wrong length -- ???"]; END;
SELECT data FROM
ignore => NULL;
cyclic => FOR k IN [0..len) DO
IF buffer[k]#((k+pullI) MOD 400B) THEN ERROR; ENDLOOP;
ones => FOR k IN [0..len) DO
IF buffer[k]#377B THEN ERROR; ENDLOOP;
zeros => FOR k IN [0..len) DO
IF buffer[k]#0 THEN ERROR; ENDLOOP;
alternating => FOR k IN [0..len) DO
IF buffer[k]#125B THEN ERROR; ENDLOOP;
random => FOR k IN [0..len) DO
IF buffer[k]#((r←Random[r]) MOD 400B) THEN ERROR; ENDLOOP;
ENDCASE => ERROR;
IF slow THEN CursorBits[recvIndex] ← CursorBits[recvIndex]+1;
IF ~doStats THEN StatBump[statDataBytesReceived,bytes];
-- check buffer now
ENDLOOP;
END;
ENDCASE => ERROR;
IF ~slow THEN CursorBits[recvIndex] ← CursorBits[recvIndex]+1;
IF info THEN WriteChar[’R];
REPEAT Closed => NULL;
ENDLOOP;
pullBS.delete[pullBS];
IF globalPullBS=pullBS THEN globalPullBS ← NIL;
IF ~listener THEN Done[];
END;
Hello: PROCEDURE [bs: Stream.Handle, remote: PupAddress] =
BEGIN
IF info THEN Header["receiving","from",remote];
ProcessDefs.Detach[FORK Pull[bs]];
END;
PtBSPListen: PUBLIC PROCEDURE =
BEGIN
listener ← CreatePupByteStreamListener[socNum,Hello,SecondsToTocks[1]];
END;
PtBSPPushPull: PUBLIC PROCEDURE =
BEGIN
UnListen[];
IF pullHim.socket=[0,0] THEN BEGIN
WriteLine["Need a socket number."]; RETURN; END;
Header["Exchanging","with",pushHim];
PtLookerReady[];
ProcessDefs.Detach[FORK Push[]];
ProcessDefs.Detach[FORK Pull[NIL]];
WaitUntilDone[2];
END;
PtBSPRecv: PUBLIC PROCEDURE =
BEGIN
UnListen[];
IF pullHim.socket=[0,0] THEN BEGIN
WriteLine["Need a socket number."]; RETURN; END;
Header["Receiving","from",pullHim];
PtLookerReady[];
ProcessDefs.Detach[FORK Pull[NIL]];
WaitUntilDone[1];
END;
PtBSPSend: PUBLIC PROCEDURE =
BEGIN
Header["Sending","to",pushHim];
PtLookerReady[];
ProcessDefs.Detach[FORK Push[]];
WaitUntilDone[1];
END;
PtBSPDoubleSend: PUBLIC PROCEDURE =
BEGIN
Header["Sending double","to",pushHim];
PtLookerReady[];
ProcessDefs.Detach[FORK Push[]];
ProcessDefs.Detach[FORK Push[]];
WaitUntilDone[2];
END;
PtBSPUnListen: PUBLIC PROCEDURE =
BEGIN
IF listener=NIL THEN RETURN[];
DestroyPupListener[listener]; listener←NIL;
WriteLine["ByteStream Unlisten."];
END;
rejecter: PupListener ← NIL;
RejectEverything: PROCEDURE [PupAddress] =
BEGIN
ERROR RejectThisRequest["Reject this turkey"];
END;
PtRejListen: PUBLIC PROCEDURE =
BEGIN
rejecter ← CreatePupByteStreamListener[
socNum,Hello,SecondsToTocks[1],RejectEverything];
END;
PtRejUnListen: PUBLIC PROCEDURE =
BEGIN
IF rejecter=NIL THEN RETURN[];
DestroyPupListener[rejecter]; rejecter←NIL;
WriteLine["Rejector Unlisten."];
END;
-- here is what gets STARTed
END.