-- File: PtOC.mesa, Last Edit: HGM April 21, 1980 8:58 PM
-- This package does lots of Opens and Closes.
-- It also tests (very crudely) SendAttention and WaitForAttention.
-- It uses ByteStreams to check them too since it isn’t worried about speed.
DIRECTORY
IODefs: FROM "IODefs" USING [WriteChar, WriteLine, WriteOctal, WriteString],
MiscDefs: FROM "MiscDefs" USING [CallDebugger],
ProcessDefs: FROM "ProcessDefs" USING [Detach, Abort, Aborted],
Stream: FROM "Stream" USING [Block, Handle, TimeOut,
GetBlock, PutBlock, SendAttention, WaitForAttention, SetInputOptions, Delete,
CompletionCode, SubSequenceType, defaultInputOptions],
StatsDefs: FROM "StatsDefs" USING [StatBump],
PupStream: FROM "PupStream" USING [
CreatePupByteStreamListener, DestroyPupListener,
PupAddress, PupListener, StreamClosing,
PupByteStreamCreate, PupByteStreamMake, SecondsToTocks],
PtDefs: FROM "PtDefs" USING [
blockCycle, blockSize, CursorBits, DataBuffer,
PtInterface, PtLookerReady, UnListen, Done, WaitUntilDone,
recvIndex, SeeStorms, sendIndex, slow];
PtOC: PROGRAM [pt: PtDefs.PtInterface]
IMPORTS IODefs, MiscDefs, ProcessDefs, Stream, StatsDefs, PupStream, PtDefs
EXPORTS PtDefs = BEGIN
OPEN pt, IODefs, StatsDefs, PupStream, PtDefs;
-- Address of the most recent caller seen by Hello.  Hello compares against it
-- so that the net/host of a repeat caller is not printed again.
-- PtOCListen resets it to all zeros.
lastCaller: PupAddress;
-- Listener created by PtOCListen and destroyed by PtOCUnListen; NIL when idle.
-- Note that Pull has a parameter of the same name which hides this one there.
listener: PupListener ← NIL;
-- Number of Push loop iterations so far (also bumped when the open fails).
pushCycles: CARDINAL ← 0;
-- Number of Pull cycles so far.
pullCycles: CARDINAL ← 0;
-- Number of attentions Pull expects to have counted at the end of each cycle.
attnsPerCycle: CARDINAL = 5;
-- Push sends one attention in every run of blocksPerAttn blocks.
-- NOTE(review): Pull only sees exactly attnsPerCycle attentions per cycle if
-- blockCycle is a multiple of attnsPerCycle; blockCycle comes from PtDefs,
-- so confirm its value there.
blocksPerAttn: CARDINAL = blockCycle/attnsPerCycle;
-- Header: print the one line banner that introduces a test run.
--   s       leading verb, printed first
--   t       preposition printed before the remote address
--   remote  address of the partner machine
-- Output is s, then " bytes" or " blocks" depending on mode (any other mode
-- raises ERROR), then either " locally" when remote is this host on this net,
-- or t followed by the net (only when it differs from myNet) and the host,
-- both via WriteOctal.  The line is finished with WriteLine["."] and then
-- SeeStorms[] is called.
Header: PROCEDURE [s, t: STRING, remote: PupAddress] =
  BEGIN
  sameNet: BOOLEAN ← (remote.net=myNet);
  IODefs.WriteString[s];
  SELECT mode FROM
    byte => IODefs.WriteString[" bytes"];
    block => IODefs.WriteString[" blocks"];
    ENDCASE => ERROR;
  IF sameNet AND remote.host=myHost THEN IODefs.WriteString[" locally"]
  ELSE
    BEGIN
    IODefs.WriteString[" "];
    IODefs.WriteString[t];
    -- the net number is only interesting when it is not our own
    IF ~sameNet THEN
      BEGIN
      IODefs.WriteString[" net "];
      IODefs.WriteOctal[remote.net];
      END;
    IODefs.WriteString[" host "];
    IODefs.WriteOctal[remote.host];
    END;
  IODefs.WriteLine["."];
  PtDefs.SeeStorms[];
  END;
-- Push: sending side of the open/close test.
-- Until stopFlag is set, each pass opens a byte stream to pushHim, writes
-- blockCycle copies of the block [@buffer,0,blockSize] with attentions mixed
-- in, writes a final empty block, and deletes the stream again.
-- Done[] is called once when the loop finally exits.
Push: PROCEDURE =
BEGIN
pushBS: Stream.Handle ← NIL;
buffer: DataBuffer;
block: Stream.Block ← [@buffer,0,blockSize];
pushI: CARDINAL;
-- residue (MOD blocksPerAttn) of the block numbers that get an attention
-- during the current pass; it advances by one every pass
attn: CARDINAL ← 0;
-- format buffer (constant) here
UNTIL stopFlag DO
-- Any StreamClosing raised while the stream is in use is reported and ends
-- this pass through the Closed exit.
BEGIN ENABLE StreamClosing =>
BEGIN
WriteString["Push ByteStream closed: why="];
WriteLine[whyText[why]];
GOTO Closed
END;
attn ← (attn+1) MOD blocksPerAttn;
-- A StreamClosing raised by the open itself is caught here instead and
-- reported as a failed connection; pushBS is still NIL in that case.
pushBS ← PupByteStreamCreate[pushHim,SecondsToTocks[1] !
StreamClosing =>
BEGIN
WriteString["Connection failed: "];
WriteLine[whyText[why]];
GOTO Failed;
END];
IF info THEN WriteChar[’O];
-- Only block mode is implemented; any other mode raises ERROR.
SELECT mode FROM
block =>
BEGIN
-- format buffer (changing) here
FOR pushI IN [1..blockCycle] DO
Stream.PutBlock[pushBS,block,FALSE];
IF ~doStats THEN StatBump[statDataBytesSent,blockSize];
-- slow mode bumps the cursor word for every block
IF slow THEN CursorBits[sendIndex] ← CursorBits[sendIndex]+1;
-- send an attention after each block whose number matches this pass
IF (pushI MOD blocksPerAttn)=attn THEN Stream.SendAttention[pushBS,0];
ENDLOOP;
IF info THEN WriteChar[’S];
-- empty block with the last argument TRUE
-- (presumably the end of record flag; confirm against the Stream interface)
Stream.PutBlock[pushBS,[NIL,0,0],TRUE];
-- fast mode bumps the cursor word only once per pass
IF ~slow THEN CursorBits[sendIndex] ← CursorBits[sendIndex]+1;
END;
ENDCASE => ERROR;
EXITS Closed, Failed => NULL;
END;
-- Cleanup common to normal completion, Closed and Failed.
IF pushBS#NIL THEN Stream.Delete[pushBS];
pushBS ← NIL;
-- counted even when the pass failed or was cut short
pushCycles ← pushCycles+1;
ENDLOOP;
Done[];
END;
-- Pull: receiving side of the open/close test.
--   listener  NIL means active mode: each cycle makes its own stream with
--             PupByteStreamMake on pullMe.socket towards pullHim, and the
--             procedure keeps cycling until stopFlag is set, then calls Done[].
--             Otherwise the value is used directly as the stream to read
--             (Hello passes the stream it was handed); one cycle is run and
--             the procedure returns without calling Done[].
--             This parameter hides the module level variable of the same name.
-- One cycle reads blockCycle blocks, then tries one more read that is expected
-- to end with StreamClosing, then checks that attnsPerCycle attentions were
-- counted by a helper process, and finally deletes the stream.
Pull: PROCEDURE [listener: POINTER] =
  BEGIN
  attn: PROCESS;
  pullBS: Stream.Handle ← NIL;
  buffer: DataBuffer;
  block: Stream.Block ← [@buffer,0,blockSize];
  pullI: CARDINAL;
  bytes: CARDINAL;
  why: Stream.CompletionCode;
  sst: Stream.SubSequenceType;
  attns: CARDINAL;
  -- EatAttn: runs as a forked process.  Counts attentions arriving on pullBS
  -- and prints an a for each one until the process is aborted.
  EatAttn: PROCEDURE =
    BEGIN
    DO
      [] ← Stream.WaitForAttention[pullBS ! ProcessDefs.Aborted => GOTO AllDone];
      WriteChar[’a];
      attns ← attns+1;
      ENDLOOP;
    EXITS AllDone => NULL;
    END;
  DO
    IF listener=NIL THEN
      BEGIN
      pullBS ← PupByteStreamMake[
        pullMe.socket,
        pullHim,
        SecondsToTocks[1],
        wait,
        [0,0]];
      END
    ELSE pullBS ← listener;
    attns ← 0;
    attn ← FORK EatAttn[];
    -- Any StreamClosing while reading is reported and ends the cycle early.
    BEGIN ENABLE StreamClosing =>
      BEGIN
      WriteString["Pull ByteStream closed: why="];
      WriteLine[whyText[why]];
      GOTO Closed;
      END;
    -- Only block mode is implemented; any other mode raises ERROR.
    SELECT mode FROM
      block =>
        BEGIN
        Stream.SetInputOptions[pullBS,Stream.defaultInputOptions];
        FOR pullI IN [1..blockCycle] DO
          -- zorch buffer (constant) here
          -- a timeout is noted with a ? and the read is resumed
          [bytes,why,sst] ← Stream.GetBlock[pullBS,block !
            Stream.TimeOut =>
              BEGIN
              IF info THEN WriteChar[’?];
              RESUME;
              END ];
          -- every data block must complete normally and be full length
          SELECT why FROM
            normal => NULL;
            endRecord => BEGIN MiscDefs.CallDebugger["endRecord -- ???"]; END;
            sstChange => BEGIN MiscDefs.CallDebugger["sstChange -- ???"]; END;
            ENDCASE => ERROR;
          IF bytes#(blockSize) THEN BEGIN MiscDefs.CallDebugger["wrong length -- ???"]; END;
          -- slow mode bumps the cursor word for every block
          IF slow THEN CursorBits[recvIndex] ← CursorBits[recvIndex]+1;
          IF ~doStats THEN StatBump[statDataBytesReceived,blockSize];
          -- check buffer now
          ENDLOOP;
        IF info THEN WriteChar[’R];
        -- Fast mode bumps the cursor word once per cycle, the same way Push
        -- does for sendIndex.  Without this the receive indicator never
        -- moves unless slow is set.
        IF ~slow THEN CursorBits[recvIndex] ← CursorBits[recvIndex]+1;
        END;
      ENDCASE => ERROR;
    -- this shouldn’t work
    -- The sender should have closed by now, so this read is expected to
    -- raise StreamClosing; remoteClose is the quiet, normal outcome.
    [bytes,why,sst] ← Stream.GetBlock[pullBS,block !
      Stream.TimeOut =>
        BEGIN
        IF info THEN WriteChar[’?];
        RESUME;
        END;
      StreamClosing =>
        BEGIN
        IF why=remoteClose THEN GOTO Closed; -- normal case
        WriteString["Pull ByteStream closed at right time: why="];
        WriteLine[whyText[why]];
        GOTO Closed;
        END];
    WriteLine["Got something extra!"];
    EXITS Closed=>NULL;
    END;
    -- Stop the attention counter before looking at its total.
    ProcessDefs.Abort[attn];
    JOIN attn;
    IF attns#attnsPerCycle THEN MiscDefs.CallDebugger["wrong number of attns"];
    Stream.Delete[pullBS];
    pullCycles ← pullCycles+1;
    -- A stream handed in by the caller is good for exactly one cycle.
    IF listener#NIL
      THEN BEGIN IF info THEN WriteChar[’.]; RETURN; END
      ELSE IF stopFlag THEN EXIT;
    ENDLOOP;
  Done[];
  END;
-- Hello: procedure handed to CreatePupByteStreamListener by PtOCListen.
--   bs      the stream for the new connection
--   remote  address of the caller
-- When info is set it prints an L, and, if the caller differs from
-- lastCaller, the caller host in octal, preceded by the net and a # when the
-- net is not myNet.  It then records the caller in lastCaller and forks a
-- detached Pull on the stream.
Hello: PROCEDURE [bs: Stream.Handle, remote: PupAddress] =
  BEGIN
  IF info THEN
    BEGIN
    IODefs.WriteChar[’L];
    -- only spell out the address when it is not the same caller as last time
    IF ~(lastCaller.host=remote.host AND lastCaller.net=remote.net) THEN
      BEGIN
      IF remote.net#myNet THEN
        BEGIN
        IODefs.WriteOctal[remote.net];
        IODefs.WriteChar[’#];
        END;
      IODefs.WriteOctal[remote.host];
      END;
    END;
  lastCaller ← remote;
  ProcessDefs.Detach[FORK Pull[bs]];
  END;
-- PtOCListen: start listening for incoming connections on socNum.
-- Clears lastCaller, then creates a byte stream listener that calls Hello
-- for each connection and stores it in the module level listener.
PtOCListen: PUBLIC PROCEDURE =
  BEGIN
  lastCaller ← [net: [0], host: [0], socket: [0,0]];
  listener ← PupStream.CreatePupByteStreamListener[
    socNum, Hello, PupStream.SecondsToTocks[1]];
  END;
-- PtOCPushPull: run a sender and a receiver at the same time.
-- Prints the banner for pushHim, calls PtLookerReady[], forks one detached
-- Push and one detached active Pull, then waits for both to call Done.
PtOCPushPull: PUBLIC PROCEDURE =
  BEGIN
  Header[s: "Exchanging", t: "with", remote: pushHim];
  PtDefs.PtLookerReady[];
  ProcessDefs.Detach[FORK Push[]];
  ProcessDefs.Detach[FORK Pull[NIL]];
  PtDefs.WaitUntilDone[2];
  END;
-- PtOCRecv: run a single active receiver.
-- Always calls UnListen[] first.  If pullHim has no socket number it prints a
-- complaint and does nothing more; otherwise it prints the banner, calls
-- PtLookerReady[], forks a detached active Pull and waits for it to call Done.
PtOCRecv: PUBLIC PROCEDURE =
  BEGIN
  PtDefs.UnListen[];
  IF pullHim.socket#[0,0] THEN
    BEGIN
    Header[s: "Receiving", t: "from", remote: pullHim];
    PtDefs.PtLookerReady[];
    ProcessDefs.Detach[FORK Pull[NIL]];
    PtDefs.WaitUntilDone[1];
    END
  ELSE IODefs.WriteLine["Need a socket number."];
  END;
-- PtOCSend: run a single sender.
-- Prints the banner for pushHim, calls PtLookerReady[], forks one detached
-- Push and waits for it to call Done.
PtOCSend: PUBLIC PROCEDURE =
  BEGIN
  Header[s: "Sending", t: "to", remote: pushHim];
  PtDefs.PtLookerReady[];
  ProcessDefs.Detach[FORK Push[]];
  PtDefs.WaitUntilDone[1];
  END;
-- PtOCDoubleSend: run two senders side by side.
-- Prints the banner for pushHim, calls PtLookerReady[], forks two detached
-- Push processes and waits for both of them to call Done.
PtOCDoubleSend: PUBLIC PROCEDURE =
  BEGIN
  Header[s: "Sending Double", t: "to", remote: pushHim];
  PtDefs.PtLookerReady[];
  -- first sender
  ProcessDefs.Detach[FORK Push[]];
  -- second sender
  ProcessDefs.Detach[FORK Push[]];
  PtDefs.WaitUntilDone[2];
  END;
-- PtOCUnListen: stop listening, if a listener is active.
-- Destroys the listener, clears the module level variable and prints a
-- confirmation.  Does nothing at all when no listener exists.
PtOCUnListen: PUBLIC PROCEDURE =
  BEGIN
  IF listener#NIL THEN
    BEGIN
    PupStream.DestroyPupListener[listener];
    listener ← NIL;
    IODefs.WriteLine["OpenClose Unlisten."];
    END;
  END;
-- here is what gets STARTed
END.