-- XNSStreamImpl
-- Implementation module for the XNSStream interface. It exports XNSStream and is built on the TLI
-- transport calls (TOpen, TBind, TConnect, TSnd, TRcv, ...) over the device named by
-- MentatInterface.xnsSPPDevice.
-- NOTE(review): the original file header comments (title, copyright, authors) were lost; restore them if available.

DIRECTORY
  Basics USING [BITAND, BITLSHIFT, BITOR, BITRSHIFT, ByteBlt, FillBytes, RawBytes, RawChars],
  CStrings USING [CString],
  IO USING [CreateStream, CreateStreamProcs, EndOfStream, PutFR, PutFR1, STREAM, StreamProcs, UnsafeBlock],
  IOUtils USING [AmbushStream, closedStreamProcs],
  MentatInterface USING [maxXNSPacketSize, xnsSPPDevice],
  Process USING [Detach, EnableAborts, MsecToTicks, SetTimeout, Ticks],
  Rope USING [Concat, ROPE],
  RuntimeError USING [BoundsFault],
  TLI USING [CONNECT, ConnectionData, ConnectionDataRep, DATA, DISCONNECT, EXDATA, EXPEDITED, LISTEN, MORE, ORDREL, States, TBind, TBindCall, TCall, TClose, TConnect, TERROR, TGetState, TLook, TOpen, TRcv, TRcvRel, TSnd, TSndDis, TSndRel, UDERR ],
  UXStrings USING [Create],
  UnixErrno USING [Errno, GetErrno],
  UnixTypes USING [CHARPtr],
  XNS USING [Address, IsMulticastHost],
  XNSSPPTypes USING [defaultSST, SubSequenceType],
  XNSStream USING [AttentionType, CloseReason, dontWait, Milliseconds, State, waitForever];

-- The monitor lock lives in each HandleRep (LOCKS handle), so every ENTRY procedure locks only
-- the stream it is given.
XNSStreamImpl: CEDAR MONITOR LOCKS handle USING handle: Handle
  IMPORTS Basics, IO, IOUtils, Process, Rope, RuntimeError, TLI, UXStrings, UnixErrno, XNS
  EXPORTS XNSStream
~ BEGIN

-- Types and constants copied from the interfaces

ROPE: TYPE ~ Rope.ROPE;
Milliseconds: TYPE ~ XNSStream.Milliseconds; -- TYPE ~ CARD32;
waitForever: Milliseconds ~ XNSStream.waitForever; -- 1800000 1/2 hour of milliseconds
dontWait: Milliseconds ~ XNSStream.dontWait; -- 0;
CloseReason: TYPE ~ XNSStream.CloseReason;
State: TYPE ~ XNSStream.State;
SubSequenceType: TYPE ~ XNSSPPTypes.SubSequenceType;
defaultSST: SubSequenceType ~ XNSSPPTypes.defaultSST;
AttentionType: TYPE ~ XNSStream.AttentionType; -- BYTE;
STREAM: TYPE ~ IO.STREAM;

-- Packet buffers
-- A Buffer holds one packet worth of bytes plus the per-packet status (attention, eom, sst).
-- Buffers are linked through "next" into the free and ready chains kept in the handle.

BufferIndex: TYPE ~ [0 .. MentatInterface.maxXNSPacketSize];
BufferState: TYPE ~ { empty, emptying, filling, full };
Buffer: TYPE ~ REF BufferRep;
BufferRep: TYPE ~ RECORD [
  next: Buffer ¬ NIL, -- the next one on the chain. This is so that we can get more buffers filled ahead.
  state: BufferState ¬ empty,
  attention: BOOL ¬ FALSE, -- does the buffer contain expedited data
  sequence: INT ¬ 0, -- the inband attention sequence number. This is used to remove OOB attentions from the queue
  eom: BOOL ¬ FALSE, -- was eom set in this packet
  sst: SubSequenceType ¬ 0, -- the data stream type of this packet
  index: BufferIndex ¬ 0, -- position of the next byte to read (input) or to fill (output)
  bytes: BufferIndex ¬ 0, -- number of bytes in the packet
  buffer: POINTER, -- conveniently set up for us ahead of time; points at the bytes of bufferRef
  charPTR: UnixTypes.CHARPtr, -- also set up for us; the same address in the form the TLI calls take
  bufferRef: REF TEXT, -- so we can hold onto the bytes
  flags: REF INT -- this is here so we don't allocate one on each call to ReadBuffer
  ];

-- Out-of-band attention records, chained from HandleRep.recvdOBAttnList

OBAttn: TYPE ~ REF OBAttnObject;
OBAttnObject: TYPE ~ RECORD [
  next: OBAttn,
  sequence: INT, -- which attention packet is this. The idea is that this number is assigned by us rather than by the packet sequence number. It's used to correlate both the OOB and in band copy.
  attentionType: AttentionType
  ];

-- Argument records for the TLI calls. Each TBindCall/TCall is paired with the XNS.Address that
-- provides the storage its addr field points at.

Calls: TYPE ~ REF CallsRep;
CallsRep: TYPE ~ RECORD [
  reqTBind: REF TLI.TBindCall, -- what socket should try to use during TBind
  reqTBindAddress: REF XNS.Address, -- the storage behind the reqTBind
  retTBind: REF TLI.TBindCall, -- what socket did we actually bind to
  retTBindAddress: REF XNS.Address, -- the storage behind the retTBind
  sendTCall: REF TLI.TCall, -- used as the requested address of the TConnect
  sendTCallAddress: REF XNS.Address, -- the storage behind sendTCall
  rcvTCall: REF TLI.TCall, -- used as the answer from TConnect
  rcvTCallAddress: REF XNS.Address -- the storage behind rcvTCall
  ];

-- Per-stream state. This record is the streamData of the IO.STREAM and carries the monitor lock.

Handle: TYPE ~ REF HandleRep;
HandleRep: TYPE ~ MONITORED RECORD [
  next: Handle ¬ NIL,
  cd: TLI.ConnectionData,
  calls: Calls, -- the various call data structures we need
  local: XNS.Address ¬ NULL, -- what socket are we using
  remote: XNS.Address ¬ NULL, -- who are we connected to
  closed: BOOL ¬ FALSE, -- for later calls to do things on this stream
  closeReason: CloseReason ¬ unknown,
  closeText: ROPE ¬ NIL,

  -- auxiliary (forked) processes
  auxProcCnt: CARDINAL ¬ 0, -- number of forked processes that have not yet reported done
  auxProcsDone: CONDITION, -- notified when auxProcCnt reaches 0

  -- timeouts
  getTimeout: Milliseconds ¬ 0,
  putTimeout: Milliseconds ¬ 0,

  -- input
  currentInputBuffer: Buffer ¬ NIL, -- the buffer we are working on
  currentInputSST: SubSequenceType ¬ defaultSST, -- what the sst is now
  inBuffers: Buffer, -- the input buffer chain. This contains all buffers that contain data to be passed to the client
  inFreeBuffers: Buffer, -- the free buffers available for input
  inputWaiting: BOOL ¬ FALSE, -- are we currently in ReceiveBuffer
  inputReady: CONDITION,
  inputBufferFree: CONDITION, -- this indicates that there is a newly freed buffer
  outOfBandAttnNumber: INT ¬ 0, -- this number is assigned to and then incremented whenever an out of band attention is received. This number should never be found in the recvdOBAttnList since that means we wrapped around
  inBandAttnNumber: INT ¬ 0, -- this number is assigned to the inband copy of the attention packet. It is used to locate the oob attention to kill when this one is delivered.
  recvdOBAttnList: OBAttn ¬ NIL, -- Out-of-Band Attentions received and not yet read by WaitAttention
  recvdOBAttn: CONDITION, -- wake up for WaitAttention
  inputSSType: SubSequenceType ¬ 0, -- SubSequenceType last seen by the stream client

  -- output
  currentOutputBuffer: Buffer ¬ NIL, -- the buffer we are working on
  currentOutputSST: SubSequenceType ¬ defaultSST, -- what the sst is now
  outputBuffers: Buffer, -- the output buffer list. These are ready for sending
  outputFreeBuffers: Buffer, -- the list of unused output buffers
  outputReady: CONDITION,
  outputBufferFree: CONDITION, -- indicates a buffer is available for output
  flusher: CONDITION, -- there is a process watching us fill up buffers. When this fires, the buffer gets sent

  -- miscellaneous
  spare: REF ¬ NIL -- for no good reason
  ];

ByteArrayPtr: TYPE = LONG POINTER TO Basics.RawBytes;

-- Constants

packetSize: INT ~ MentatInterface.maxXNSPacketSize; -- for now, only deal with TRcv and TSnd calls using this size buffer. This size also determines the amount of storage in the buffer.buffer part.
flushTime: Process.Ticks ~ Process.MsecToTicks[500]; -- flush every 1/2 second <> ConnectionClosed: PUBLIC ERROR [why: CloseReason, text: Rope.ROPE] ~ CODE; Timeout: PUBLIC SIGNAL ~ CODE; BadSysCall: ERROR [why: ROPE] ~ CODE; LookEvent: ERROR [lookEvent: INT] ~ CODE; SystemError: ERROR ~ CODE; <> streamProcs: REF IO.StreamProcs; CreateStreamProcs: PROC ~ { streamProcs ¬ IO.CreateStreamProcs [ variety: $inputOutput, class: $XNS, getChar: GetChar, getBlock: GetBlock, unsafeGetBlock: UnsafeGetBlock, endOf: EndOf, charsAvail: CharsAvail, putChar: PutChar, unsafePutBlock: UnsafePutBlock, flush: Flush, close: Close ]; }; GetChar: PROC [self: STREAM] RETURNS [CHAR] = TRUSTED { buff: PACKED ARRAY [0..3] OF CHAR; bp: ByteArrayPtr = LOOPHOLE[LONG[@buff]]; IF UnsafeGetBlock[self, [base: bp, startIndex: 0, count: 1]] = 0 THEN ERROR IO.EndOfStream[self]; RETURN[buff[0]] }; GetBlock: PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT] RETURNS [nBytesRead: NAT] = TRUSTED { nBytesRead ¬ UnsafeGetBlock[self, [ base: LOOPHOLE[block, ByteArrayPtr]+SIZE[TEXT[0]], startIndex: startIndex, count: MAX[MIN[INT[count], INT[block.maxLength]-startIndex], 0] ]]; block.length ¬ startIndex + nBytesRead; RETURN[nBytesRead]; }; UnsafeGetBlock: UNSAFE PROC [self: STREAM, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT ¬ 0] ~ { handle: Handle ~ NARROW[self.streamData]; IF block.startIndex < 0 OR block.count < 0 THEN ERROR RuntimeError.BoundsFault; TRUSTED {nBytesRead ¬ EntryUnsafeGetBlock[handle: handle, block: block]}; RETURN[nBytesRead]; }; EntryUnsafeGetBlock: ENTRY PROC [handle: Handle, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT ¬ 0] ~ { -- with all these things that change the state kept by the handle, we best be in a monitor ENABLE UNWIND => NULL; startIndex: INT ¬ block.startIndex; stop: INT ~ block.startIndex+block.count; nBytes: NAT; buffer: Buffer ¬ handle.currentInputBuffer; WHILE startIndex < stop DO IF handle.closed THEN RETURN WITH ERROR 
ConnectionClosed[handle.closeReason, handle.closeText]; IF buffer = NIL THEN { -- we need to get a buffer buffer ¬ GetFirstInputBuffer[handle: handle]; -- this will block until a packet arrives handle.currentInputBuffer ¬ buffer; -- so that we know where we are. PrintF[IO.PutFR1["buffer = %g\n", [text[buffer.bufferRef]]]]; }; IF buffer.attention THEN RETURN[nBytesRead]; -- pass along any of the bytes that might have already been read into the block. If this is really the buffer from the previous call, it's ok since nBytesRead will still be 0. Let the caller worry about EndOfStream. IF buffer.sst # handle.currentInputSST THEN RETURN[nBytesRead]; -- the currentSST isn't updated until the client calls GetStatus[..., reset: TRUE] to get passed it. This means EndOfStream IF buffer.index = buffer.bytes THEN { -- this buffer is used up. Check for EOM IF buffer.eom THEN RETURN [nBytesRead] -- no more bytes in the packet and it's EOM ELSE { -- we need to get more bytes since this buffer is empty and it's not EOM FreeInputBuffer[handle: handle, buffer: buffer]; -- give back the empty one buffer ¬ NIL; -- This is really cheating since the buffer = NIL check is going to fetch a fresh one for us when we go around LOOP; }; }; <> TRUSTED { nBytes ¬ Basics.ByteBlt[ to: [ blockPointer: block.base, startIndex: startIndex, stopIndexPlusOne: stop], from: [ blockPointer: buffer.buffer, startIndex: buffer.index, stopIndexPlusOne: buffer.bytes] ]; }; <<>> <> buffer.index ¬ buffer.index + nBytes; nBytesRead ¬ nBytesRead + nBytes; startIndex ¬ startIndex + nBytes; ENDLOOP; -- WHILE startIndex < stop }; EndOf: PROC [self: STREAM] RETURNS [BOOL] ~ { handle: Handle ~ NARROW[self.streamData]; RETURN [EntryEndOf[handle]]; }; EntryEndOf: ENTRY PROC [handle: Handle] RETURNS [BOOL] ~ { ENABLE UNWIND => NULL; buffer: Buffer ~ handle.currentInputBuffer; IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; IF buffer = NIL THEN RETURN[FALSE]; -- I'm not 
really sure what this means RETURN [ buffer.attention OR buffer.sst # handle.currentInputSST OR ((buffer.index = buffer.bytes) AND buffer.eom) ]; }; CharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [nChars: INT] ~ { handle: Handle ~ NARROW[self.streamData]; RETURN[EntryCharsAvail[handle: handle, wait: wait]]; }; EntryCharsAvail: ENTRY PROC [handle: Handle, wait: BOOL] RETURNS [INT] = { ENABLE UNWIND => NULL; DO buffer: Buffer ~ handle.currentInputBuffer; IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; IF buffer = NIL THEN { -- there isn't a buffer now IF NOT wait THEN RETURN[0]; -- there aren't any bytes WAIT handle.inputReady; LOOP; -- there should be one now }; IF buffer.attention OR buffer.sst # handle.currentInputSST THEN RETURN[LAST[INT]]; -- GetChar will raise EndOfStream IF buffer.index < buffer.bytes THEN RETURN [(buffer.bytes - buffer.index)]; IF buffer.eom THEN RETURN [LAST[INT]]; ENDLOOP; }; PutChar: PROC [self: STREAM, char: CHAR] = TRUSTED { buff: PACKED ARRAY [0..3] OF CHAR; bp: ByteArrayPtr ¬ LOOPHOLE[LONG[@buff]]; buff[0] ¬ char; UnsafePutBlock[self, [base: bp, startIndex: 0, count: 1]]; }; UnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] ~ { handle: Handle ~ NARROW[self.streamData]; IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; IF block.startIndex < 0 OR block.count < 0 THEN ERROR RuntimeError.BoundsFault; TRUSTED {EntryUnsafePutBlock[handle: handle, block: block]}; }; EntryUnsafePutBlock: ENTRY PROC [handle: Handle, block: IO.UnsafeBlock] ~ { startIndex: INT ¬ block.startIndex; stop: INT ~ block.startIndex+block.count; nBytes: NAT; buffer: Buffer ¬ handle.currentOutputBuffer; WHILE startIndex < stop DO IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; IF buffer = NIL THEN { -- we need to get a buffer buffer ¬ AllocateOutputBuffer[handle: handle]; -- this will block one is free 
handle.currentOutputBuffer ¬ buffer; -- so that we know where we are. buffer.index ¬ 0; -- insure that the buffer is empty before we start filling it up buffer.bytes ¬ LAST[BufferIndex]; -- set the last byte position }; TRUSTED { nBytes ¬ Basics.ByteBlt[ to: [ blockPointer: buffer.buffer, startIndex: buffer.bytes, stopIndexPlusOne: LAST[BufferIndex]], from: [ blockPointer: block.base, startIndex: startIndex, stopIndexPlusOne: stop] ]; }; buffer.index ¬ buffer.index + nBytes; startIndex ¬ startIndex + nBytes; IF buffer.index = buffer.bytes THEN { -- the buffer is full buffer.state ¬ full; buffer.attention ¬ FALSE; -- this isn't an attention byte buffer.eom ¬ FALSE; -- not eom either buffer.sst ¬ handle.currentOutputSST; -- set the SST. The currentOutputSST is changed by SetSSType. AppendOutputBuffer[handle: handle, buffer: buffer]; buffer ¬ NIL; -- so that we get a fresh one LOOP; -- go around again to see if we got the whole block }; ENDLOOP; -- WHILE startIndex < stop }; Flush: PROC [self: STREAM] ~ { handle: Handle ~ NARROW[self.streamData]; IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; }; Close: PROC [self: STREAM, abort: BOOL] ~ { handle: Handle ~ NARROW[self.streamData]; OrderlyClose[handle: handle, initiate: TRUE]; EntryNotifyClosed[handle: handle, closeReason: localClose, closeText: "close called"]; CloseTheStream[self: self]; }; BlastClose: PROC [handle: Handle] = { cd: TLI.ConnectionData ~ handle.cd; [] ¬ TLI.TClose[cd: cd]; }; OrderlyClose: PROC [handle: Handle, initiate: BOOL] = { cd: TLI.ConnectionData ~ handle.cd; state: TLI.States; IF initiate THEN [] ¬ TLI.TSndRel[cd: cd]; state ¬ TLI.TGetState[cd: cd]; SELECT state FROM inrel => [] ¬ TLI.TRcvRel[cd: cd]; unbnd, idle, outrel => NULL; ENDCASE => [] ¬ TLI.TSndDis[cd: cd, call: NIL]; [] ¬ TLI.TClose[cd: cd]; RETURN; }; CloseTheStream: PROC [self: STREAM] = { IF self.streamProcs = IOUtils.closedStreamProcs THEN RETURN; IOUtils.AmbushStream[self: self, 
streamProcs: IOUtils.closedStreamProcs, streamData: self.streamData]; }; NotifyClosed: INTERNAL PROC [handle: Handle, closeReason: CloseReason, closeText: ROPE] ~ { IF NOT handle.closed THEN { handle.closeReason ¬ closeReason; handle.closeText ¬ closeText; handle.closed ¬ TRUE; }; BROADCAST handle.inputReady; BROADCAST handle.inputBufferFree; BROADCAST handle.recvdOBAttn; BROADCAST handle.outputReady; BROADCAST handle.outputBufferFree; BROADCAST handle.flusher; }; EntryNotifyClosed: ENTRY PROC [handle: Handle, closeReason: CloseReason, closeText: ROPE] ~ { ENABLE UNWIND => NULL; NotifyClosed[handle, closeReason, closeText] }; <> createBuffers: CARDINAL ¬ 6; -- ???? Create: PUBLIC PROC [remote: XNS.Address, getTimeout: Milliseconds ¬ waitForever, putTimeout: Milliseconds ¬ waitForever] RETURNS [IO.STREAM] = { handle: Handle ¬ NIL; cd: TLI.ConnectionData ~ AllocateCD[]; calls: Calls ~ AllocateCalls[]; path: CStrings.CString ~ UXStrings.Create[MentatInterface.xnsSPPDevice]; return: INT; retTBind: REF TLI.TBindCall ~ calls.retTBind; sendCall: REF TLI.TCall ~ calls.sendTCall; rcvCall: REF TLI.TCall ~ calls.rcvTCall; calls.sendTCallAddress­ ¬ remote; -- load up the address we want to talk to IF XNS.IsMulticastHost[remote.host] -- check to see if it's multicast THEN ERROR ConnectionClosed[noRoute, "multicast remote"]; <> <> IF TLI.TOpen[cd: cd, path: path, flags: [access: RDWR]] < 0 THEN { -- can't open the device error: ROPE ~ IO.PutFR["TOpen failed: cd.errno = %g, errno = %g", [integer[LOOPHOLE[cd.errno]]], [integer[LOOPHOLE[UnixErrno.GetErrno[]]]]]; ERROR ConnectionClosed[localClose, error]; }; IF TLI.TBind[cd: cd, request: NIL, return: retTBind] < 0 THEN { -- can't bind to any socket error: ROPE ~ IO.PutFR["TBind failed: cd.errno = %g, errno = %g", [integer[LOOPHOLE[cd.errno]]], [integer[LOOPHOLE[UnixErrno.GetErrno[]]]]]; [] ¬ TLI.TClose[cd: cd]; ERROR ConnectionClosed[localClose, error]; }; return ¬ TLI.TConnect[cd: cd, sndCall: sendCall, rcvCall: rcvCall]; IF return 
< 0 THEN { error: ROPE ¬ IO.PutFR["TConnect failed: cd.errno = %g, errno = %g ", [integer[LOOPHOLE[cd.errno]]], [integer[LOOPHOLE[UnixErrno.GetErrno[]]]]]; IF cd.errno = TLOOK THEN { IsEvent: PROC [event: INT] RETURNS [BOOL] = INLINE { RETURN[ Basics.BITAND[events, event] # 0]; }; events: INT ¬ TLI.TLook[cd: cd]; error ¬ IO.PutFR["%g\nTLook event = %g\n", [rope[error]], [integer[events]]]; IF IsEvent[TLI.LISTEN] THEN error ¬ Rope.Concat[error, "LISTEN "]; IF IsEvent[TLI.CONNECT] THEN error ¬ Rope.Concat[error, "CONNECT "]; IF IsEvent[TLI.DATA] THEN error ¬ Rope.Concat[error, "DATA "]; IF IsEvent[TLI.EXDATA] THEN error ¬ Rope.Concat[error, "EXDATA "]; IF IsEvent[TLI.DISCONNECT] THEN { error ¬ Rope.Concat[error, "DISCONNECT "]; ERROR ConnectionClosed[remoteClose, error]; }; IF IsEvent[TLI.TERROR] THEN error ¬ Rope.Concat[error, "TERROR "]; IF IsEvent[TLI.UDERR] THEN error ¬ Rope.Concat[error, "UDERR "]; IF IsEvent[TLI.ORDREL] THEN error ¬ Rope.Concat[error, "ORDREL "]; }; [] ¬ TLI.TClose[cd: cd]; ERROR ConnectionClosed[localClose, error]; }; <> handle ¬ MakeHandle[sendBuffers: createBuffers, recvBuffers: createBuffers, getTimeout: getTimeout, putTimeout: putTimeout]; handle.cd ¬ cd; handle.calls ¬ calls; handle.remote ¬ calls.rcvTCallAddress­; handle.local ¬ calls.retTBindAddress­; RETURN [IO.CreateStream[streamProcs: streamProcs, streamData: handle]]; }; GetLocal: PUBLIC PROC [self: STREAM] RETURNS [local: XNS.Address] = { handle: Handle = NARROW[self.streamData]; RETURN [EntryGetLocal[handle: handle]]; }; EntryGetLocal: ENTRY PROC [handle: Handle] RETURNS [local: XNS.Address] = { ENABLE UNWIND => NULL; RETURN[handle.local]; }; GetRemote: PUBLIC PROC [self: STREAM] RETURNS [remote: XNS.Address] = { handle: Handle = NARROW[self.streamData]; RETURN [EntryGetRemote[handle: handle]]; }; EntryGetRemote: ENTRY PROC [handle: Handle] RETURNS [local: XNS.Address] = { ENABLE UNWIND => NULL; RETURN[handle.remote]; }; GetTimeouts: PUBLIC PROC [self: STREAM] RETURNS [getTimeout, 
putTimeout: Milliseconds ¬ 0] = { <<Body>> }; SetTimeouts: PUBLIC PROC [self: STREAM, getTimeout, putTimeout: Milliseconds ¬ waitForever] = { <<Body>> }; SetSSType: PUBLIC PROC [self: STREAM, ssType: SubSequenceType] = { handle: Handle ~ NARROW[self.streamData]; EntrySetSSType[handle: handle, ssType: ssType]; }; EntrySetSSType: ENTRY PROC [handle: Handle, ssType: SubSequenceType] = { ENABLE UNWIND => NULL; buffer: Buffer ¬ handle.currentOutputBuffer; IF buffer # NIL THEN { -- we aren't at a boundry. We need to force one buffer.state ¬ full; buffer.attention ¬ FALSE; -- this isn't an attention byte buffer.eom ¬ FALSE; -- not eom either buffer.sst ¬ handle.currentOutputSST; -- set the SST. The currentOutputSST is changed by SetSSType. AppendOutputBuffer[handle: handle, buffer: buffer]; handle.currentOutputSST ¬ ssType; -- set the SST for all new packets RETURN; -- we did what we needed to }; <> handle.currentOutputSST ¬ ssType; -- set the SST for all new packets }; SendEndOfMessage: PUBLIC PROC [self: STREAM] = { handle: Handle ~ NARROW[self.streamData]; EntrySendEndOfMessage[handle: handle]; }; EntrySendEndOfMessage: ENTRY PROC [handle: Handle] = { ENABLE UNWIND => NULL; buffer: Buffer ¬ handle.currentOutputBuffer; IF buffer # NIL THEN { -- we aren't at a boundry. We need to force one buffer.state ¬ full; buffer.attention ¬ FALSE; -- this isn't an attention byte buffer.eom ¬ TRUE; -- not eom either buffer.sst ¬ handle.currentOutputSST; -- set the SST. The currentOutputSST is changed by SetSSType. AppendOutputBuffer[handle: handle, buffer: buffer]; handle.currentOutputBuffer ¬ NIL; -- we have set eom and forced the buffer onto the queue. 
RETURN; -- we did what we needed to }; <> buffer ¬ AllocateOutputBuffer[handle: handle]; -- this will block one is free buffer.index ¬ 0; -- insure that the buffer is empty before we start filling it up buffer.bytes ¬ 0; buffer.state ¬ full; buffer.attention ¬ FALSE; -- this isn't an attention byte buffer.eom ¬ TRUE; buffer.sst ¬ handle.currentOutputSST; AppendOutputBuffer[handle: handle, buffer: buffer]; handle.currentOutputBuffer ¬ NIL; }; SendAttention: PUBLIC PROC [self: STREAM, attentionType: AttentionType] = { <<Body>> }; SendClose: PUBLIC PROC [self: STREAM] RETURNS [ok: BOOL ¬ FALSE] = { <<Body>> }; SendCloseReply: PUBLIC PROC [self: STREAM] RETURNS [ok: BOOL ¬ FALSE] = { <<Body>> }; SendNow: PUBLIC PROC [self: STREAM] = { <<Body>> }; FlushInput: PUBLIC PROC [self: STREAM, wait: BOOL ¬ FALSE] RETURNS [bytesSkipped: CARD ¬ 0] = { <<Body>> }; WaitAttention: PUBLIC PROC [self: STREAM, waitTimeout: Milliseconds ¬ waitForever] RETURNS [AttentionType] = { RETURN[0]; }; GetStatus: PUBLIC PROC [self: STREAM, reset: BOOL ¬ TRUE] RETURNS [state: State, ssType: SubSequenceType, attentionType: AttentionType] = { handle: Handle = NARROW[self.streamData]; [state, ssType, attentionType] ¬ EntryGetStatus[handle, reset]; }; <> EntryGetStatus: ENTRY PROC [handle: Handle, reset: BOOL] RETURNS [state: State, ssType: SubSequenceType, attentionType: AttentionType] ~ { ENABLE UNWIND => NULL; buffer: Buffer ¬ handle.currentInputBuffer; IF handle.closed THEN RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText]; IF buffer = NIL THEN -- I think that the only state we can have now is -- RETURN[state: open, ssType: handle.currentInputSST, attentionType: 0]; IF buffer.attention THEN { state ¬ attention; ssType ¬ buffer.sst; -- the sst of the attention packet attentionType ¬ LOOPHOLE[buffer.bufferRef[0]]; -- still ugly IF reset THEN { -- dispose of this packet p: OBAttn ~ handle.recvdOBAttnList; IF (p # NIL) AND (p.sequence = handle.inBandAttnNumber) THEN -- delete it 
from the expidited queue handle.recvdOBAttnList ¬ p.next; FreeInputBuffer[handle: handle, buffer: buffer]; handle.currentInputBuffer ¬ NIL; -- so that UnsafeGetBlock can make progress }; RETURN; }; -- IF buffer.attention IF buffer.sst # handle.currentInputSST THEN { IF reset THEN handle.currentInputSST ¬ buffer.sst; RETURN[state: ssTypeChange, ssType: buffer.sst, attentionType: 0]; }; IF buffer.eom THEN { IF reset THEN { -- give the packet back FreeInputBuffer[handle: handle, buffer: buffer]; handle.currentInputBuffer ¬ NIL; -- so that UnsafeGetBlock can make progress }; RETURN [state: endOfMessage, ssType: handle.currentInputSST, attentionType: 0]; }; RETURN [state: open, ssType: handle.currentInputSST, attentionType: 0]; }; <> AllocateCD: PROC RETURNS [cd: TLI.ConnectionData] = { bytes: CARD32 ~ BYTES[TLI.ConnectionDataRep]; cd ¬ NEW[TLI.ConnectionDataRep]; -- the rest gets filled in in the call to TOpen TRUSTED {Basics.FillBytes[dstBase: LOOPHOLE[cd], dstStart: 0, count: bytes, value: 0]}; }; AllocateCalls: PROC RETURNS [calls: Calls] = { calls ¬ NEW[CallsRep]; calls.reqTBindAddress ¬ NEW[XNS.Address]; calls.reqTBind ¬ NEW[TLI.TBindCall]; calls.reqTBind.addr.maxlen ¬ calls.reqTBind.addr.len ¬ BYTES[XNS.Address]; TRUSTED {calls.reqTBind.addr.char ¬ LOOPHOLE[calls.reqTBindAddress, UnixTypes.CHARPtr]}; calls.retTBindAddress ¬ NEW[XNS.Address]; calls.retTBind ¬ NEW[TLI.TBindCall]; calls.retTBind.addr.maxlen ¬ calls.retTBind.addr.len ¬ BYTES[XNS.Address]; TRUSTED {calls.retTBind.addr.char ¬ LOOPHOLE[calls.retTBindAddress, UnixTypes.CHARPtr]}; calls.sendTCallAddress ¬ NEW[XNS.Address]; calls.sendTCall ¬ NEW[TLI.TCall]; calls.sendTCall.addr.maxlen ¬ calls.sendTCall.addr.len ¬ BYTES[XNS.Address]; TRUSTED {calls.sendTCall.addr.char ¬ LOOPHOLE[calls.sendTCallAddress, UnixTypes.CHARPtr]}; calls.rcvTCallAddress ¬ NEW[XNS.Address]; calls.rcvTCall ¬ NEW[TLI.TCall]; calls.rcvTCall.addr.maxlen ¬ calls.rcvTCall.addr.len ¬ BYTES[XNS.Address]; TRUSTED {calls.rcvTCall.addr.char ¬ 
LOOPHOLE[calls.rcvTCallAddress, UnixTypes.CHARPtr]}; }; MakeHandle: PROC [sendBuffers, recvBuffers: CARD, getTimeout, putTimeout: Milliseconds] RETURNS [handle: Handle] ~ { handle ¬ NEW[HandleRep]; handle.cd ¬ NIL; -- filled in by our caller since it was needed before we got called handle.calls ¬ NIL; -- filled in by our caller since it was needed before we got called InitConditions[handle: handle]; InitTimeouts[handle, getTimeout, putTimeout]; handle.inBuffers ¬ NIL; handle.inFreeBuffers ¬ AllocateBuffers[number: recvBuffers]; handle.outputBuffers ¬ NIL; handle.outputFreeBuffers ¬ AllocateBuffers[number: sendBuffers]; Process.Detach[FORK Pull[handle]]; handle.auxProcCnt ¬ handle.auxProcCnt.SUCC; Process.Detach[FORK Push[handle]]; handle.auxProcCnt ¬ handle.auxProcCnt.SUCC; }; InitConditions: PROC [handle: Handle] ~ { TRUSTED { Process.EnableAborts[@handle.auxProcsDone]; -- ???? Process.EnableAborts[@handle.inputReady]; Process.EnableAborts[@handle.inputBufferFree]; Process.EnableAborts[@handle.recvdOBAttn]; Process.EnableAborts[@handle.outputReady]; Process.EnableAborts[@handle.outputBufferFree]; Process.EnableAborts[@handle.flusher]; }; }; InitTimeouts: PROC [handle: Handle, getTimeout, putTimeout: Milliseconds] ~ { TRUSTED { Process.SetTimeout[@handle.flusher, flushTime]; }; }; AllocateBuffers: PROC [number: INT] RETURNS [buffer: Buffer ¬ NIL] = { FOR i: INT IN [0 .. 
number) DO new: Buffer ~ NEW[BufferRep]; new.bufferRef ¬ NEW[TEXT[LAST[BufferIndex]]]; new.buffer ¬ LOOPHOLE[new.bufferRef, POINTER] + BYTES[TEXT[0]]; -- to the bits new.charPTR ¬ LOOPHOLE[new.buffer, UnixTypes.CHARPtr]; new.flags ¬ NEW[INT ¬ 0]; IF buffer = NIL THEN buffer ¬ new ELSE {buffer.next ¬ new; buffer ¬ buffer.next}; ENDLOOP; }; <> AllocateInputBuffer: ENTRY PROC [handle: Handle] RETURNS [buffer: Buffer]~ { ENABLE UNWIND => NULL; WHILE handle.inFreeBuffers = NIL DO -- we need to wait until one becomes available WAIT handle.inputBufferFree; ENDLOOP; buffer ¬ handle.inFreeBuffers; handle.inFreeBuffers ¬ handle.inFreeBuffers.next; -- who cares it it's nil }; FreeInputBuffer: INTERNAL PROC [handle: Handle, buffer: Buffer] = { ENABLE UNWIND => NULL; tmp: Buffer ¬ handle.inFreeBuffers; -- might be nil buffer.state ¬ empty; handle.inFreeBuffers ¬ buffer; handle.inFreeBuffers.next ¬ tmp; -- hang the free ones on the list BROADCAST handle.inputBufferFree; -- tell someone who is waiting for a buffer }; AppendInputBuffer: ENTRY PROC [handle: Handle, buffer: Buffer] = { ENABLE UNWIND => NULL; tmp: Buffer ¬ handle.inBuffers; -- the current head IF tmp = NIL THEN { -- first time handle.inBuffers ¬ buffer; BROADCAST handle.inputReady; RETURN; }; UNTIL tmp.next = NIL DO -- find the end tmp ¬ tmp.next; ENDLOOP; tmp.next ¬ buffer; -- tack it on BROADCAST handle.inputReady; }; GetFirstInputBuffer: INTERNAL PROC [handle: Handle] RETURNS [buffer: Buffer] = { ENABLE UNWIND => NULL; WHILE handle.inBuffers = NIL DO -- we need to wait until one becomes available WAIT handle.inputReady; ENDLOOP; buffer ¬ handle.inBuffers; -- take off the first one handle.inBuffers ¬ handle.inBuffers.next; -- might be nil, but who cares }; AllocateOutputBuffer: INTERNAL PROC [handle: Handle] RETURNS [buffer: Buffer]~ { ENABLE UNWIND => NULL; WHILE handle.outputFreeBuffers = NIL DO -- we need to wait until one becomes available WAIT handle.outputBufferFree; ENDLOOP; buffer ¬ 
handle.outputFreeBuffers; handle.outputFreeBuffers ¬ handle.outputFreeBuffers.next; -- who cares if it's nil }; FreeOutputBuffer: ENTRY PROC [handle: Handle, buffer: Buffer] = { ENABLE UNWIND => NULL; tmp: Buffer ¬ handle.outputFreeBuffers; -- might be nil buffer.state ¬ empty; handle.outputFreeBuffers ¬ buffer; handle.outputFreeBuffers.next ¬ tmp; -- hang the free ones on the list BROADCAST handle.outputBufferFree; -- tell someone who is waiting for a buffer }; AppendOutputBuffer: INTERNAL PROC [handle: Handle, buffer: Buffer] = { ENABLE UNWIND => NULL; tmp: Buffer ¬ handle.outputBuffers; -- the current head IF tmp = NIL THEN { -- first time handle.outputBuffers ¬ buffer; BROADCAST handle.outputReady; RETURN; }; UNTIL tmp.next = NIL DO -- find the end tmp ¬ tmp.next; ENDLOOP; tmp.next ¬ buffer; -- tack it on BROADCAST handle.outputReady; }; GetFirstOutputBuffer: ENTRY PROC [handle: Handle] RETURNS [buffer: Buffer] = { ENABLE UNWIND => NULL; WHILE handle.outputBuffers = NIL DO -- we need to wait until one becomes available WAIT handle.outputReady; ENDLOOP; buffer ¬ handle.outputBuffers; -- take off the first one handle.outputBuffers ¬ handle.outputBuffers.next; -- might be nil, but who cares }; <> NotifyAuxProcDone: ENTRY PROC [handle: Handle] ~ { ENABLE UNWIND => NULL; IF handle.auxProcCnt = 0 THEN ERROR; handle.auxProcCnt ¬ handle.auxProcCnt.PRED; IF handle.auxProcCnt = 0 THEN NOTIFY handle.auxProcsDone; }; WaitAuxProcsDone: ENTRY PROC [handle: Handle] ~ { ENABLE UNWIND => NULL; WHILE handle.auxProcCnt > 0 DO WAIT handle.auxProcsDone; ENDLOOP; }; <> Pull: PROC [handle: Handle] ~ { SetGetting: ENTRY PROC [handle: Handle] = {handle.inputWaiting ¬ TRUE}; UnSetGetting: ENTRY PROC [handle: Handle] = {handle.inputWaiting ¬ FALSE}; WHILE NOT handle.closed DO nRcv: INT; cd: TLI.ConnectionData ~ handle.cd; event: INT ¬ 0; look: BOOL ¬ FALSE; buffer: Buffer ¬ AllocateInputBuffer[handle: handle]; -- get (wait) for a free buffer. 
-- This really doesn't allocate anything, but that's the function it performs
-- (What follows is the remainder of the receive loop; the head of the enclosing procedure lies above this point.)
    SetGetting[handle: handle];
    nRcv ¬ ReceiveBuffer[cd: cd, buffer: buffer
      ! LookEvent => {event ¬ lookEvent; look ¬ TRUE; CONTINUE} ]; -- get the packet; a LookEvent is remembered in event/look and handled below
    UnSetGetting[handle: handle];
    IF NOT look THEN {
      -- ordinary receive: decode the packet and queue it for the reader
      ProcessBuffer[handle: handle, buffer: buffer, nRcv: nRcv];
      AppendInputBuffer[handle: handle, buffer: buffer];
      }
    ELSE {
      -- an asynchronous TLI event was reported instead of data
      SELECT event FROM
        TLI.ORDREL => { -- the other end wants us to close in an orderly way
          PrintF["Orderly close requested"];
          OrderlyClose[handle: handle, initiate: FALSE]; -- set up for the close. This proc returns only when we are closed with the other end
          EXIT;
          };
        TLI.DISCONNECT => { -- the other end just wants us to go away
          PrintF["Disconnect requested"];
          BlastClose[handle: handle]; -- don't really care how we close, just close
          EXIT;
          };
        TLI.LISTEN => {ERROR BadSysCall["LISTEN"]}; -- a connect indication received
        TLI.CONNECT => {ERROR BadSysCall["CONNECT"]}; -- a connect confirmation received
        TLI.DATA => {ERROR BadSysCall["DATA"]}; -- normal data
        TLI.EXDATA => {ERROR BadSysCall["EXDATA"]}; -- expedited data
        TLI.TERROR => {ERROR BadSysCall["TERROR"]}; -- a real problem
        TLI.UDERR => {ERROR BadSysCall["UDERR"]}; -- should probably not happen either
        ENDCASE => {ERROR BadSysCall["ENDCASE"]}; -- unknown event type
      };
    ENDLOOP;
  NotifyAuxProcDone[handle]; -- reached only through the EXITs above
  };

-- ReceiveBuffer: receive one packet from the transport into buffer.
--   cd: the TLI connection handed to TLI.TRcv (and to TLI.TLook when cd.errno is TLOOK).
--   buffer: expected to be in state empty; buffer.charPTR is the receive area and buffer.flags is the REF INT that TRcv fills in.
-- Returns the value returned by TLI.TRcv. On the normal path buffer.state goes empty, filling, full.
-- Raises LookEvent[event] when cd.errno = TLOOK and BadSysCall[...] for the other cd.errno values.
-- The receive is retried when errno is EINTR and cd.errno is not TLOOK.
-- NOTE(review): every exit other than the normal one (the ERRORs, and the EWOULDBLOCK return of a negative count) leaves buffer.state = filling; confirm that callers cope with that.
ReceiveBuffer: PROC [cd: TLI.ConnectionData, buffer: Buffer] RETURNS [INT] = {
  flags: REF INT ~ buffer.flags; -- will also contain the sst field
  nRcv: INT ¬ 0;
  IF buffer.state # empty THEN SystemError; -- this is probably not right, but I don't know what to do for the moment
  buffer.state ¬ filling; -- set the flag for anyone that might be waiting on it
  DO -- the loop is in here because of the errno and # TLOOK stuff in the error handler
    nRcv ¬ TLI.TRcv[cd: cd, buf: buffer.charPTR, nbytes: packetSize, flags: flags];
    IF nRcv < 0 THEN { -- something is wrong
      error: UnixErrno.Errno ~ UnixErrno.GetErrno[];
      IF (error = EINTR) AND (cd.errno # TLOOK) THEN LOOP; -- I'm not sure what this does (as written: retry the TRcv after an interrupted call that is not reporting a TLOOK event)
      SELECT cd.errno FROM
        TNODATA => { -- this is set when the underlying GetMsg returns < 0 and EAGAIN is the errno. I think that this can't happen according to the SunOS documentation
          IF error = EWOULDBLOCK THEN RETURN[nRcv]; -- I'm not sure that this can happen under pcr since the XR¬GetMsg is done by pcr as blocked IO.
          ERROR BadSysCall["TNODATA"];
          };
        TSYSERR => { -- this is the general catch-all for things that have gone wrong. I don't think that this can be recovered from
          ERROR BadSysCall["TSYSERR"];
          };
        TLOOK => { -- an asynchronous event happened
          event: INT ~ TLI.TLook[cd: cd]; -- find out which event it was and hand it to the caller
          ERROR LookEvent[event];
          };
        TBADF => -- a bad fd on TAccept -- ERROR BadSysCall["TBADF"];
        TOUTSTATE => -- reports out of state -- ERROR BadSysCall["TOUTSTATE"];
        TBUFOVFLW => -- control buffer too small -- ERROR BadSysCall["TBUFOVFLW"];
        TFLOW => { -- indicates that flow control is invoked on this call. I think that this needs to be retried
          ERROR BadSysCall["TFLOW"];
          };
        TNODIS => -- we didn't get the right answer on a disconnect -- ERROR BadSysCall["TNODIS"];
        TNOREL => -- we didn't get the orderly release -- ERROR BadSysCall["TNOREL"];
        TNOTSUPPORT => -- something is really wrong -- ERROR BadSysCall["TNOTSUPPORT"];
        TSTATECHNG => -- state is changing -- ERROR BadSysCall["TSTATECHNG"];
        ENDCASE => ERROR BadSysCall["something else"];
      }; -- something is wrong
    buffer.state ¬ full; -- set the flag for anyone that might be waiting on it
    RETURN[nRcv];
    ENDLOOP;
  };

-- ProcessBuffer: (monitor ENTRY) record what was received in buffer and wake the waiters on handle.inputReady.
--   handle: the stream whose monitor is entered and whose attention counters are advanced.
--   buffer: the buffer just filled; its flags REF INT carries the sst, attention and more bits.
--   nRcv: the byte count, stored as buffer.bytes.
-- An attention packet must carry exactly one byte, otherwise ERROR SystemError is raised.
-- The attention copy with the MORE bit set is treated as the out-of-band copy and queued through GotOBAttn;
-- the copy without it is tagged as the in-band copy with its own sequence number.
-- For ordinary data, eom is the complement of the MORE bit and bufferRef.length is set to nRcv.
-- NOTE(review): bufferRef.length is not updated on the two attention paths; confirm that this is intended.
-- NOTE(review): ReceiveBuffer can return a negative count (EWOULDBLOCK); presumably assigning that to buffer.bytes fails its range check; verify against the caller.
ProcessBuffer: ENTRY PROC [handle: Handle, buffer: Buffer, nRcv: INT] = {
  ENABLE UNWIND => NULL;
  cd: TLI.ConnectionData ~ handle.cd; -- not referenced in the rest of this procedure
  flags: REF INT ~ buffer.flags; -- also contains the sst field
  PrintF["ProcessBuffer"];
  PrintF[IO.PutFR1[" nRcv = %g ", [integer[nRcv]]]];
  buffer.index ¬ 0; -- reset the index
  buffer.bytes ¬ nRcv; -- set the number of bytes in the packet
  <<Pick the subsequence type out of the flags word>>
  buffer.sst ¬ GetSST[flags: flags­];
  <<>>
  <<Attention packets: both copies return from inside this IF>>
  IF GetAttention[flags: flags­] THEN { -- attention byte
    IF GetMore[flags: flags­] THEN { -- this is the expedited (OOB) copy of the attention
      sequence: INT ~ handle.outOfBandAttnNumber;
      IF buffer.bytes # 1 THEN ERROR SystemError; -- not good since there's supposed to be only one byte
      GotOBAttn[handle: handle, sequence: sequence, type: LOOPHOLE[buffer.bufferRef[0]]]; -- ugly ain't it
      handle.outOfBandAttnNumber ¬ handle.outOfBandAttnNumber + 1; -- update the number to get ready for the next one
      buffer.state ¬ full; -- mark our condition
      PrintF[" attention, OOB\n"];
      BROADCAST handle.inputReady;
      RETURN;
      }
    ELSE { -- this is the second (InBand) copy
      sequence: INT ~ handle.inBandAttnNumber;
      IF buffer.bytes # 1 THEN ERROR SystemError; -- not good since there's supposed to be only one byte
      buffer.attention ¬ TRUE; -- this is an attention packet
      buffer.sequence ¬ sequence; -- to correlate with the OOB one
      handle.inBandAttnNumber ¬ handle.inBandAttnNumber + 1;
      buffer.state ¬ full; -- mark our condition
      PrintF[" attention, inline\n"];
      BROADCAST handle.inputReady;
      RETURN;
      };
    };
  <<>>
  <<Ordinary data packet>>
  buffer.eom ¬ NOT GetMore[flags: flags­]; -- end of message when the MORE bit is clear
  buffer.state ¬ full; -- mark our condition
  BROADCAST handle.inputReady;
  PrintF[IO.PutFR1["eom = %g\n", [boolean[buffer.eom]]]];
  buffer.bufferRef.length ¬ nRcv; -- make the REF TEXT agree with the byte count
  };

-- GotOBAttn: (INTERNAL, called from ProcessBuffer with the monitor held) append a new out-of-band
-- attention record carrying sequence and type to the end of handle.recvdOBAttnList, then BROADCAST handle.recvdOBAttn.
GotOBAttn: INTERNAL PROC [handle: Handle, sequence: INT, type: AttentionType] ~ {
  p, prev, new: OBAttn;
  prev ¬ NIL;
  p ¬ handle.recvdOBAttnList;
  DO -- walk to the end of the list; afterwards prev is the last element (NIL for an empty list) and p is NIL
    IF p = NIL THEN EXIT;
    prev ¬ p;
    p ¬ p.next;
    ENDLOOP;
  new ¬ NEW[OBAttnObject ¬ [next: p, sequence: sequence, attentionType: type]];
  IF prev = NIL THEN handle.recvdOBAttnList ¬ new ELSE prev.next ¬ new;
  BROADCAST handle.recvdOBAttn;
  };

<<Sending>>

-- Push: loop that runs while handle.closed is FALSE, taking output buffers one at a time (the send itself follows below).
Push: PROC [handle: Handle] ~ { -- there had best be only one of these per stream or things might get real confused
  cd: TLI.ConnectionData ~ handle.cd;
  sent: INT ¬ 0;
  WHILE NOT handle.closed DO
    buffer: Buffer ¬ GetFirstOutputBuffer[handle: handle]; -- get a buffer to send.
-- This will block until one is ready
    -- build the TLI flags word for this packet: sst in bits 8..15, plus the MORE and EXPEDITED bits
    flags: INT ¬ SetSST[sst: buffer.sst, flags: 0];
    flags ¬ SetMore[more: NOT buffer.eom, flags: flags];
    flags ¬ SetAttention[attention: buffer.attention, flags: flags];
    sent ¬ TLI.TSnd[cd: cd, buf: buffer.charPTR, nbytes: buffer.bytes, flags: flags];
    IF sent < 0 OR sent # buffer.bytes THEN { -- we need to disconnect
      BlastClose[handle: handle];
      handle.closed ¬ TRUE;
      EXIT;
      };
    FreeOutputBuffer[handle: handle, buffer: buffer];
    ENDLOOP;
  NotifyAuxProcDone[handle];
  };

<<Packing and unpacking the TLI flags word>>

-- GetAttention: TRUE when the TLI.EXPEDITED bit is set in flags.
GetAttention: PROC [flags: INT] RETURNS [attention: BOOL] = {
  RETURN[ Basics.BITAND[flags, TLI.EXPEDITED] # 0 ];
  };

-- GetMore: TRUE when the TLI.MORE bit is set in flags.
GetMore: PROC [flags: INT] RETURNS [more: BOOL] = {
  RETURN[ Basics.BITAND[flags, TLI.MORE] # 0 ];
  };

-- GetSST: extract the subsequence type, which is carried in bits 8..15 of flags.
GetSST: PROC [flags: INT] RETURNS [SubSequenceType] = {
  tmp: INT ~ Basics.BITAND[Basics.BITRSHIFT[value: flags, count: 8], 0FFh]; -- the sst
  -- after the mask tmp cannot exceed 255, so the ERROR arm is purely defensive
  IF tmp <= 255 THEN RETURN[LOOPHOLE[tmp]] ELSE ERROR SystemError;
  };

-- SetAttention: return flags with the TLI.EXPEDITED bit added when attention is TRUE; otherwise flags unchanged.
SetAttention: PROC [attention: BOOL, flags: INT ¬ 0] RETURNS [INT] = {
  RETURN[(IF attention THEN Basics.BITOR[flags, TLI.EXPEDITED] ELSE flags)];
  };

-- SetMore: return flags with the TLI.MORE bit added when more is TRUE; otherwise flags unchanged.
SetMore: PROC [more: BOOL, flags: INT ¬ 0] RETURNS [INT] = {
  RETURN[(IF more THEN Basics.BITOR[flags, TLI.MORE] ELSE flags)];
  };

-- SetSST: return flags with sst placed in bits 8..15, the position GetSST reads it back from.
--   sst: the subsequence type to encode.
--   flags: the existing flag bits; they are OR-ed into the result unchanged.
-- Fix: the shift used to be applied to flags instead of to the sst, so the sst was dropped
-- (Push, which passes flags: 0, always sent an sst of 0) and the caller's flag bits leaked into the sst field.
SetSST: PROC [sst: SubSequenceType, flags: INT ¬ 0] RETURNS [INT] = {
  tmp: INT ¬ sst;
  tmp ¬ Basics.BITLSHIFT[value: tmp, count: 8]; -- move the sst up 1 byte
  tmp ¬ Basics.BITAND[tmp, 0FF00h]; -- make sure that it's only the sst
  tmp ¬ Basics.BITOR[tmp, flags]; -- merge in the caller's flag bits
  RETURN[tmp];
  };

<<Debugging output>>

-- UnixPrintF: direct binding to the C library printf; the argument is used as the format string.
UnixPrintF: PROC [format: CStrings.CString] RETURNS [INT] = TRUSTED MACHINE CODE {"printf"};

-- PrintF: convert rope to a C string and print it through UnixPrintF; the printf result is discarded.
-- NOTE(review): the rope is passed as the printf format, so a '%' inside it would be read as a conversion; keep such characters out of the callers' text.
PrintF: PROC [rope: ROPE] = {
  s: CStrings.CString ~ UXStrings.Create[rope];
  [] ¬ UnixPrintF[s];
  };

<<Module start code>>
CreateStreamProcs[];

END.