-- File: PtOC.mesa,  Last Edit: HGM  April 21, 1980  8:58 PM

-- This package does lots of Opens and Closes.
-- It also tests (very crudely) SendAttention and WaitForAttention.
-- It uses ByteStreams to check them too since it isn't worried about speed.

DIRECTORY
  IODefs: FROM "IODefs" USING [WriteChar, WriteLine, WriteOctal, WriteString],
  MiscDefs: FROM "MiscDefs" USING [CallDebugger],
  ProcessDefs: FROM "ProcessDefs" USING [Detach, Abort, Aborted],
  Stream: FROM "Stream" USING [Block, Handle, TimeOut, GetBlock, PutBlock,
    SendAttention, WaitForAttention, SetInputOptions, Delete, CompletionCode,
    SubSequenceType, defaultInputOptions],
  StatsDefs: FROM "StatsDefs" USING [StatBump],
  PupStream: FROM "PupStream" USING [
    CreatePupByteStreamListener, DestroyPupListener, PupAddress, PupListener,
    StreamClosing, PupByteStreamCreate, PupByteStreamMake, SecondsToTocks],
  PtDefs: FROM "PtDefs" USING [
    blockCycle, blockSize, CursorBits, DataBuffer, PtInterface, PtLookerReady,
    UnListen, Done, WaitUntilDone, recvIndex, SeeStorms, sendIndex, slow];

PtOC: PROGRAM [pt: PtDefs.PtInterface]
  IMPORTS IODefs, MiscDefs, ProcessDefs, Stream, StatsDefs, PupStream, PtDefs
  EXPORTS PtDefs =
BEGIN OPEN pt, IODefs, StatsDefs, PupStream, PtDefs;

-- Address of the most recent incoming caller, remembered by Hello so that
-- repeated connections from the same host are not re-announced.
lastCaller: PupAddress;
-- Handle of the active listener, NIL when not listening.
listener: PupListener _ NIL;
-- Counts of completed Push and Pull cycles.
pushCycles: CARDINAL _ 0;
pullCycles: CARDINAL _ 0;

-- Pull expects exactly attnsPerCycle attentions per stream; Push sends one
-- attention for every pushI whose residue MOD blocksPerAttn equals attn.
-- NOTE(review): the counts only agree if blockCycle is a nonzero multiple of
-- attnsPerCycle; blockCycle is defined in PtDefs, confirm there.
attnsPerCycle: CARDINAL = 5;
blocksPerAttn: CARDINAL = blockCycle/attnsPerCycle;

-- Prints a one line banner: s, then " bytes" or " blocks" according to mode,
-- then either " locally" (remote is myHost on myNet) or t followed by the
-- remote net (only when it differs from myNet) and host in octal.
-- Finishes the line with "." and calls SeeStorms.
Header: PROCEDURE [s, t: STRING, remote: PupAddress] =
  BEGIN
  WriteString[s];
  SELECT mode FROM
    byte => WriteString[" bytes"];
    block => WriteString[" blocks"];
    ENDCASE => ERROR;
  IF remote.host=myHost AND remote.net=myNet THEN WriteString[" locally"]
  ELSE
    BEGIN
    WriteString[" "];
    WriteString[t];
    IF remote.net#myNet THEN
      BEGIN
      WriteString[" net "];
      WriteOctal[remote.net]
      END;
    WriteString[" host "];
    WriteOctal[remote.host];
    END;
  WriteLine["."];
  SeeStorms[];
  END;

-- Sender side.  Until stopFlag is set, each cycle creates a stream to pushHim,
-- writes blockCycle blocks of blockSize bytes with attentions interleaved,
-- writes a final empty block, deletes the stream and bumps pushCycles.
-- Calls Done when the loop ends.  Only block mode is implemented; any other
-- mode raises ERROR.
Push: PROCEDURE =
  BEGIN
  pushBS: Stream.Handle _ NIL;
  buffer: DataBuffer;
  block: Stream.Block _ [@buffer,0,blockSize];
  pushI: CARDINAL;
  -- Residue that selects which blocks are followed by an attention; it is
  -- advanced every cycle so the attentions land on different blocks.
  attn: CARDINAL _ 0;
  -- format buffer (constant) here
  UNTIL stopFlag DO
    BEGIN
    ENABLE StreamClosing =>
      BEGIN
      WriteString["Push ByteStream closed: why="];
      WriteLine[whyText[why]];
      GOTO Closed
      END;
    attn _ (attn+1) MOD blocksPerAttn;
    -- A failure while connecting is reported separately from a close that
    -- happens later during the transfer.
    pushBS _ PupByteStreamCreate[pushHim,SecondsToTocks[1] !
      StreamClosing =>
        BEGIN
        WriteString["Connection failed: "];
        WriteLine[whyText[why]];
        GOTO Failed;
        END];
    IF info THEN WriteChar['O];
    SELECT mode FROM
      block =>
        BEGIN
        -- format buffer (changing) here
        FOR pushI IN [1..blockCycle] DO
          Stream.PutBlock[pushBS,block,FALSE];
          -- NOTE(review): bumps the statistic only when doStats is FALSE;
          -- presumably something else counts when it is TRUE.  Confirm.
          IF ~doStats THEN StatBump[statDataBytesSent,blockSize];
          -- slow: advance the cursor word once per block.
          IF slow THEN CursorBits[sendIndex] _ CursorBits[sendIndex]+1;
          IF (pushI MOD blocksPerAttn)=attn THEN Stream.SendAttention[pushBS,0];
          ENDLOOP;
        IF info THEN WriteChar['S];
        -- Empty block with the last argument TRUE.
        -- NOTE(review): presumably marks end of record; confirm in Stream.
        Stream.PutBlock[pushBS,[NIL,0,0],TRUE];
        -- not slow: advance the cursor word once per cycle instead.
        IF ~slow THEN CursorBits[sendIndex] _ CursorBits[sendIndex]+1;
        END;
      ENDCASE => ERROR;
    EXITS Closed, Failed => NULL;
    END;
  -- pushBS is still NIL here if the create failed.
  IF pushBS#NIL THEN Stream.Delete[pushBS];
  pushBS _ NIL;
  pushCycles _ pushCycles+1;
  ENDLOOP;
  Done[];
  END;

-- Receiver side.
--   listener = NIL: makes its own stream with PupByteStreamMake each cycle and
--     keeps cycling until stopFlag is set, then calls Done.
--   listener # NIL: the pointer is used directly as the stream handle (this is
--     how Hello passes an accepted stream); one cycle is run and the procedure
--     returns without calling Done.
-- Each cycle reads blockCycle blocks, then does one more read that is expected
-- to end with StreamClosing, and finally checks that exactly attnsPerCycle
-- attentions were seen.  Unexpected conditions call the debugger.
-- Note that the parameter shadows the module level listener variable.
Pull: PROCEDURE [listener: POINTER] =
  BEGIN
  attn: PROCESS;
  pullBS: Stream.Handle _ NIL;
  buffer: DataBuffer;
  block: Stream.Block _ [@buffer,0,blockSize];
  pullI: CARDINAL;
  bytes: CARDINAL;
  why: Stream.CompletionCode;
  sst: Stream.SubSequenceType;
  -- Attentions seen on the current stream; written by EatAttn, read here
  -- only after the JOIN.
  attns: CARDINAL;

  -- Runs as a separate process: counts attentions on pullBS, printing 'a for
  -- each, until the process is aborted.
  EatAttn: PROCEDURE =
    BEGIN
    DO
      [] _ Stream.WaitForAttention[pullBS ! ProcessDefs.Aborted => GOTO AllDone];
      WriteChar['a];
      attns _ attns+1;
      ENDLOOP;
    EXITS AllDone => NULL;
    END;

  DO
    IF listener=NIL THEN
      BEGIN
      pullBS _ PupByteStreamMake[
        pullMe.socket, pullHim, SecondsToTocks[1], wait, [0,0]];
      END
    ELSE pullBS _ listener;
    attns _ 0;
    attn _ FORK EatAttn[];
    BEGIN
    ENABLE StreamClosing =>
      BEGIN
      WriteString["Pull ByteStream closed: why="];
      WriteLine[whyText[why]];
      GOTO Closed;
      END;
    SELECT mode FROM
      block =>
        BEGIN
        Stream.SetInputOptions[pullBS,Stream.defaultInputOptions];
        FOR pullI IN [1..blockCycle] DO
          -- zorch buffer (constant) here
          -- A timeout is noted with '? (when info is set) and the read resumed.
          [bytes,why,sst] _ Stream.GetBlock[pullBS,block !
            Stream.TimeOut =>
              BEGIN
              IF info THEN WriteChar['?];
              RESUME;
              END ];
          SELECT why FROM
            normal => NULL;
            endRecord =>
              BEGIN
              MiscDefs.CallDebugger["endRecord -- ???"];
              END;
            sstChange =>
              BEGIN
              MiscDefs.CallDebugger["sstChange -- ???"];
              END;
            ENDCASE => ERROR;
          IF bytes#(blockSize) THEN
            BEGIN
            MiscDefs.CallDebugger["wrong length -- ???"];
            END;
          -- NOTE(review): Push also advances its cursor word once per cycle
          -- when slow is FALSE; there is no matching update here.  Confirm
          -- whether that asymmetry is intended.
          IF slow THEN CursorBits[recvIndex] _ CursorBits[recvIndex]+1;
          IF ~doStats THEN StatBump[statDataBytesReceived,blockSize];
          -- check buffer now
          ENDLOOP;
        IF info THEN WriteChar['R];
        END;
      ENDCASE => ERROR;
    -- this shouldn't work
    [bytes,why,sst] _ Stream.GetBlock[pullBS,block !
      Stream.TimeOut =>
        BEGIN
        IF info THEN WriteChar['?];
        RESUME;
        END;
      StreamClosing =>
        BEGIN
        IF why=remoteClose THEN GOTO Closed; -- normal case
        WriteString["Pull ByteStream closed at right time: why="];
        WriteLine[whyText[why]];
        GOTO Closed;
        END];
    -- Reached only if the extra read returned instead of signalling a close.
    WriteLine["Got something extra!"];
    EXITS Closed=>NULL;
    END;
    -- Stop the attention counter before looking at its total.
    ProcessDefs.Abort[attn];
    JOIN attn;
    IF attns#attnsPerCycle THEN MiscDefs.CallDebugger["wrong number of attns"];
    Stream.Delete[pullBS];
    pullCycles _ pullCycles+1;
    IF listener#NIL THEN
      BEGIN
      IF info THEN WriteChar['.];
      RETURN;
      END
    ELSE IF stopFlag THEN EXIT;
    ENDLOOP;
  Done[];
  END;

-- Procedure handed to CreatePupByteStreamListener by PtOCListen; it receives
-- a stream and the remote address.  When info is set it prints 'L, plus the
-- caller's net (only if not myNet, followed by '#) and host in octal when the
-- caller differs from lastCaller.  It then records the caller and runs Pull
-- on the stream in a detached process.
Hello: PROCEDURE [bs: Stream.Handle, remote: PupAddress] =
  BEGIN
  IF info THEN
    BEGIN
    WriteChar['L];
    IF lastCaller.host#remote.host OR lastCaller.net#remote.net THEN
      BEGIN
      IF remote.net#myNet THEN
        BEGIN
        WriteOctal[remote.net];
        WriteChar['#];
        END;
      WriteOctal[remote.host];
      END;
    END;
  lastCaller_remote;
  ProcessDefs.Detach[FORK Pull[bs]];
  END;

-- Starts listening on socNum with Hello as the listener procedure.
-- Does nothing if a listener is already active: overwriting the handle would
-- make the earlier listener impossible to destroy.  This mirrors the NIL
-- check in PtOCUnListen.
PtOCListen: PUBLIC PROCEDURE =
  BEGIN
  IF listener#NIL THEN RETURN;
  lastCaller _ [[0],[0],[0,0]];
  listener _ CreatePupByteStreamListener[socNum,Hello,SecondsToTocks[1]];
  END;

-- Runs one Push and one Pull process concurrently.
-- NOTE(review): WaitUntilDone[2] presumably waits for two Done calls, one
-- from each process; confirm in PtDefs.
PtOCPushPull: PUBLIC PROCEDURE =
  BEGIN
  Header["Exchanging","with",pushHim];
  PtLookerReady[];
  ProcessDefs.Detach[FORK Push[]];
  ProcessDefs.Detach[FORK Pull[NIL]];
  WaitUntilDone[2];
  END;

-- Receive only.  Calls UnListen first, then refuses to start (with a message)
-- if pullHim.socket is [0,0]; otherwise runs a single Pull process.
PtOCRecv: PUBLIC PROCEDURE =
  BEGIN
  UnListen[];
  IF pullHim.socket=[0,0] THEN
    BEGIN
    WriteLine["Need a socket number."];
    RETURN;
    END;
  Header["Receiving","from",pullHim];
  PtLookerReady[];
  ProcessDefs.Detach[FORK Pull[NIL]];
  WaitUntilDone[1];
  END;

-- Send only: runs a single Push process.
PtOCSend: PUBLIC PROCEDURE =
  BEGIN
  Header["Sending","to",pushHim];
  PtLookerReady[];
  ProcessDefs.Detach[FORK Push[]];
  WaitUntilDone[1];
  END;

-- Send only, with two Push processes running concurrently.
PtOCDoubleSend: PUBLIC PROCEDURE =
  BEGIN
  Header["Sending Double","to",pushHim];
  PtLookerReady[];
  ProcessDefs.Detach[FORK Push[]];
  ProcessDefs.Detach[FORK Push[]];
  WaitUntilDone[2];
  END;

-- Destroys the listener created by PtOCListen, if there is one, and says so.
PtOCUnListen: PUBLIC PROCEDURE =
  BEGIN
  IF listener=NIL THEN RETURN[];
  DestroyPupListener[listener];
  listener_NIL;
  WriteLine["OpenClose Unlisten."];
  END;

-- here is what gets STARTed
END.