XNSStream.mesa
Copyright © 1985 by Xerox Corporation. All rights reserved.
Hal Murray, January 17, 1986 3:04:15 am PST
Demers, December 19, 1986 8:52:26 pm PST
TODO:
Rendezvous.
DIRECTORY
IO USING [STREAM],
Rope USING [ROPE],
XNS USING [Address, Socket],
XNSSPPTypes USING [SubSequenceType];
XNSStream: CEDAR DEFINITIONS ~ {
Overview
Unlike most streams in Cedar, XNSStreams are "inputOutput": It is normal to both read and write an instance of an XNS stream.
Beware: If you only have one process talking to an XNS stream, you can easily get into buffer lockup problems. This happens when both machines are trying to send but all the input buffers (on both machines) are full. Both ends get stuck waiting for an allocation. Since there is only one process at each end, and that's trying to send, nobody will ever read anything to free up a buffer, so the allocation will never arrive.
One process will work if the user and server both operate in lock step.
You need yet another process if you are using Attentions.
Reading an in-line ATTENTION byte, a SubSequenceType change, or an EndOfMessage causes an end-of-file condition. Use GetStatus[..., reset~TRUE] to read beyond it. Think of the stream as a tape containing several files.
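The tape analogy can be made concrete with a small model. The following is a Python sketch, not the Cedar implementation: TapeModel, read_all_files and the marker strings are hypothetical names, and an empty queue is treated as "no more data", whereas a real stream would block or raise Timeout.

```python
from collections import deque

# Hypothetical model: the receive queue holds data bytes (ints)
# interleaved with boundary markers (strings).
OPEN, SS_TYPE_CHANGE, END_OF_MESSAGE, ATTENTION = (
    "open", "ssTypeChange", "endOfMessage", "attention")


class TapeModel:
    def __init__(self, items):
        self.items = deque(items)

    def end_of(self):
        """True when parked at a marker, or when the queue is empty."""
        return not self.items or isinstance(self.items[0], str)

    def get_byte(self):
        if self.end_of():
            raise EOFError("at a boundary; call get_status(reset=True)")
        return self.items.popleft()

    def get_status(self, reset=True):
        """Report the pending marker; with reset=True, step past it."""
        if self.items and isinstance(self.items[0], str):
            state = self.items[0]
            if reset:
                self.items.popleft()
            return state
        return OPEN


def read_all_files(stream):
    """Read every 'file' on the tape, clearing each boundary in turn."""
    files, current = [], []
    while True:
        while not stream.end_of():
            current.append(stream.get_byte())
        state = stream.get_status(reset=True)
        if state == OPEN:  # no marker pending: the model's queue is drained
            if current:
                files.append((bytes(current), OPEN))
            return files
        files.append((bytes(current), state))
        current = []
```

Each tuple returned by read_all_files pairs one "file" with the boundary that ended it. Two adjacent markers yield an empty file, which corresponds to the empty subsequence case.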
There are two basic ways to create a stream:
(1) the usual client-server interaction uses Create[...] at the client end and CreateListener at the server end.
(2) rendezvous uses a yet-to-be-determined mechanism.
BasicTypes
BYTE: TYPE ~ [0..100H);
Timeouts are kept internally in BasicTime.Pulses and Process.Ticks. To avoid overflow, they are limited to 1/2 hour.
Milliseconds: TYPE ~ LONG CARDINAL;
waitForever: Milliseconds ~ 1800000; -- 1/2 hour of milliseconds
dontWait: Milliseconds ~ 0;
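The half-hour ceiling is easy to check by arithmetic. The Python sketch below only illustrates the numbers: clamp_timeout is a hypothetical helper, and whether the real implementation clamps an oversized value or treats it some other way is an assumption not confirmed by this interface.

```python
# Illustrative arithmetic only; clamp_timeout is a hypothetical helper.
WAIT_FOREVER_MS = 1_800_000  # waitForever from the interface
DONT_WAIT_MS = 0             # dontWait from the interface


def clamp_timeout(ms: int) -> int:
    """Limit a requested timeout to the half-hour ceiling (assumed behavior)."""
    if ms < 0:
        raise ValueError("Milliseconds is a LONG CARDINAL and cannot be negative")
    return min(ms, WAIT_FOREVER_MS)


# 30 minutes * 60 seconds * 1000 milliseconds
assert WAIT_FOREVER_MS == 30 * 60 * 1000
```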
SubSequenceType: TYPE ~ XNSSPPTypes.SubSequenceType;
AttentionType: TYPE ~ [0..100H);
Listening
Listener: TYPE ~ REF ListenerObject;
ListenerObject: TYPE;
FilterProc: TYPE = PROC [remote: XNS.Address] RETURNS [accept: BOOL];
ListenerProc: TYPE = PROC [stream: IO.STREAM, remote: XNS.Address];
CreateListener: PROC [
socket: XNS.Socket,
worker: ListenerProc,
getTimeout, putTimeout: Milliseconds ← waitForever,
filter: FilterProc ← NIL, -- NIL => Accept all requests
echoFilter: FilterProc ← NIL] -- NIL => Answer all echos
RETURNS [Listener];
An active Listener will respond to a connection request by:
1) Checking for duplicates.
2) Calling the client's filter. Returning FALSE will reject the connection.
3) Creating a stream with the specified timeouts.
4) FORKing a new instance of worker to interact with the new stream.
5) Detaching the new process.
If filter returns TRUE, a worker will always be FORKed. (All error conditions have already been checked.) worker must catch ConnectionClosed and Timeout (unless waitForever was specified) and it should Close the stream before returning.
echoFilter is handy when several instances of a server run on different machines. Grapevine, for example, flings an echo packet at all the interesting servers, and then tries to open a connection to the first one that answers. echoFilter can be used to answer "no" if filter is very likely to reject a connection attempt. (There is no promise that filter or worker will be called if echoFilter doesn't reject a packet.)
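The five steps a Listener takes on a connection request can be sketched as follows. This is a Python model under stated assumptions, not the Cedar code: ListenerModel, on_request, make_stream and the (remote, conn_id) duplicate key are all hypothetical, and a daemon thread stands in for a FORKed and detached process. Echo handling is left out.

```python
import threading


class ListenerModel:
    """Hypothetical model of the listener steps; all names are illustrative."""

    def __init__(self, worker, make_stream, filter_proc=None):
        self.worker = worker            # called as worker(stream, remote)
        self.make_stream = make_stream  # stands in for stream creation
        self.filter_proc = filter_proc  # None => accept all requests
        self.active = set()             # (remote, conn_id) pairs already seen
        self.lock = threading.Lock()

    def on_request(self, remote, conn_id):
        with self.lock:
            # 1) Duplicate check: a retransmitted request must not fork twice.
            if (remote, conn_id) in self.active:
                return "duplicate"
            # 2) Client filter: returning False rejects the connection.
            if self.filter_proc is not None and not self.filter_proc(remote):
                return "rejected"
            self.active.add((remote, conn_id))
        # 3) Create the stream (the real one carries the listener's timeouts).
        stream = self.make_stream(remote)
        # 4) Fork a new worker instance for this stream.
        # 5) Detach it: the listener never joins the thread.
        threading.Thread(
            target=self.worker, args=(stream, remote), daemon=True
        ).start()
        return "accepted"
```

As in the interface, once the filter accepts a request the worker always runs, so the worker itself is responsible for handling errors and closing its stream.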
DestroyListener: PROC [Listener];
Rendezvous
To be arranged.
Byte Stream Interface
Create: PROC [remote: XNS.Address, getTimeout: Milliseconds ← waitForever, putTimeout: Milliseconds ← waitForever] RETURNS [IO.STREAM];
Opening a connection may take a long time - over a minute. ABORT the process if you get impatient.
! ConnectionClosed.
GetTimeouts: PROC [self: IO.STREAM] RETURNS [getTimeout, putTimeout: Milliseconds];
SetTimeouts: PROC [self: IO.STREAM, getTimeout, putTimeout: Milliseconds ← waitForever];
Get or set the timeouts associated with an existing stream.
! ConnectionClosed.
SetSSType: PROC [self: IO.STREAM, ssType: SubSequenceType];
Change the SubSequenceType associated with the stream (output). "Changing" to the current SubSequenceType is a no-op.
The receiving end will see an end-of-file condition, which must be cleared using GetStatus[..., reset~TRUE], at each SubSequenceType change boundary.
An empty subsequence — two consecutive SubSequenceType change boundaries with no data in between — is possible.
! ConnectionClosed, Timeout.
SendEndOfMessage: PROC [self: IO.STREAM];
Set the end-of-message bit in the current output packet and do SendNow[self].
The receiving end will see an end-of-file condition, which must be cleared using GetStatus[..., reset~TRUE].
! ConnectionClosed, Timeout.
SendAttention: PROC [self: IO.STREAM, attentionType: AttentionType];
Send an attention packet with the given attentionType as its data contents.
The attention is sent both out-of-band and in-band.
At the receiving end, an out-of-band attention is detected using WaitAttention[self]; an in-band attention causes an end-of-file condition, which must be cleared using GetStatus[..., reset~TRUE]. When the in-band attention is cleared, the out-of-band attention is deleted as well.
! ConnectionClosed
SendClose: PROC [self: IO.STREAM] RETURNS [ok: BOOL];
The XNS close protocol: Send SSType change to XNSSPPTypes.endSST. Call IO.Flush[self]. Look for a timely reply with SSType of XNSSPPTypes.endReplySST. If one is received, send SSType change to XNSSPPTypes.endReplySST and return TRUE; otherwise return FALSE.
No ERRORS.
SendCloseReply: PROC [self: IO.STREAM] RETURNS [ok: BOOL];
The XNS close reply protocol: Send SSType change to XNSSPPTypes.endReplySST. Call IO.Flush[self]. Look for a timely reply with SSType of XNSSPPTypes.endReplySST. If one is received, return TRUE; otherwise return FALSE.
No ERRORS.
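The close handshake described for SendClose and SendCloseReply can be modeled with two queues, one per direction. This is a Python sketch under assumptions: the queues carry only SSType changes, the string constants stand in for XNSSPPTypes.endSST and XNSSPPTypes.endReplySST, and the 1-second default stands in for "timely". Like the Cedar procedures, both functions report failure by returning False rather than raising.

```python
import queue

END_SST = "endSST"             # stands in for XNSSPPTypes.endSST
END_REPLY_SST = "endReplySST"  # stands in for XNSSPPTypes.endReplySST


def send_close(tx, rx, timeout=1.0):
    """Initiator: send endSST, await endReplySST, then confirm with endReplySST."""
    tx.put(END_SST)
    try:
        if rx.get(timeout=timeout) != END_REPLY_SST:
            return False
    except queue.Empty:  # no timely reply
        return False
    tx.put(END_REPLY_SST)
    return True


def send_close_reply(tx, rx, timeout=1.0):
    """Responder: send endReplySST, then await the initiator's endReplySST."""
    tx.put(END_REPLY_SST)
    try:
        return rx.get(timeout=timeout) == END_REPLY_SST
    except queue.Empty:  # no timely confirmation
        return False
```

In this model the responder calls send_close_reply after it has read END_SST from its receive queue, which corresponds to seeing the SSType change to endSST on the real stream.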
SendNow: PROC [self: IO.STREAM];
Like IO.Flush[self], except that Flush waits for the packets to be acknowledged and SendNow doesn't.
! ConnectionClosed
FlushInput: PROC [self: IO.STREAM, wait: BOOL ← FALSE]
RETURNS [bytesSkipped: LONG CARDINAL];
Discard some input. Return at the next EndOfStream position, or (if wait is FALSE) when the input queue is empty.
After calling FlushInput[..., wait~TRUE], self.EndOf[] is guaranteed to be TRUE (unless Timeout was raised).
! ConnectionClosed
! Timeout (only if wait = TRUE)
WaitAttention: PROC [self: IO.STREAM, waitTimeout: Milliseconds ← waitForever]
RETURNS [AttentionType];
Wait for an out-of-band attention and return it.
! ConnectionClosed, Timeout
State: TYPE ~ { open, ssTypeChange, endOfMessage, attention };
GetStatus: PROC [self: IO.STREAM, reset: BOOL ← TRUE]
RETURNS [state: State, ssType: SubSequenceType, attentionType: AttentionType];
Return status of stream. The returned attentionType is meaningful only if state is attention. If reset is TRUE and the returned state is attention, endOfMessage or ssTypeChange then the EndOfStream condition due to that state is cleared.
! ConnectionClosed
SIGNALs and ERRORs
Timeout: SIGNAL;
Raised during input operation if no data arrives within the getTimeout interval specified when the stream was created (although the other end is responding to our probes).
Raised during output operation if no data can be sent within the putTimeout interval specified when the stream was created (although the other end is responding to our probes).
CloseReason: TYPE = {
notClosed, unknown, localClose, noResponse, noRoute, remoteReject, remoteClose, timeout };
ConnectionClosed: ERROR [why: CloseReason, text: Rope.ROPE];
Raised during an i/o operation if the stream has been closed by either end, or if the other end is not responding to our probes. Also raised when creating a stream if creation fails.
}.