Overview
This is a generic network stream package. It provides (most of) the functionality of a TCP, XNS or other network stream, with a uniform interface.
Transport providers register themselves by "protocolFamily" and "transportClass".
The protocol family implies the addressing domain, e.g. $XNS, $ARPA, $ISO. In this interface addresses are represented as ROPEs. The interpretation of these ROPEs is protocolFamily-specific
The transport class may be generic, describing a collection of supported functions, e.g. $basicStream, $SPPEmulation. It may also identify a specific transport protocol within the family, e.g. $SPP, $TCP, $TP4. A transport provider will probably register itself under more than one transport class, e.g. TCP will be registered under (at least) $basicStream and $TCP.
Warning: closing the output stream of the local stream pair does won't necessarily cause an immediate end-of-file on the input stream of the remote stream pair, even though the documentation in the IO interface suggests it should. The end-of-file isn't guaranteed to happen until both streams of the local stream pair have been closed.
Basic Types and Constants
ROPE: TYPE ~ Rope.ROPE;
STREAM: TYPE ~ IO.STREAM;
Error:
ERROR[which:
REF, codes:
LIST
OF
ATOM, msg:
ROPE];
Non-stream errors.
The which parameter is either NIL or the Listener or STREAM to which the error refers.
The list of codes is ordered from most general to most specific.
The msg parameter is human-readable.
Milliseconds: TYPE ~ CARD;
waitForever: Milliseconds ~ CARD.LAST;
dontWait: Milliseconds ~ 0;
Timeout:
READONLY
SIGNAL [which:
REF, codes:
LIST
OF
ATOM, msg:
ROPE];
Raised during input operation if no data arrives within the specified getTimeout interval (although the other end of a stream may be responding to our probes).
Raised during output operation if no data can be sent within the specified putTimeout interval (although the other end of a stream may be responding to our probes).
The which parameter is either NIL or the STREAM or Listener to which the signal applies.
The codes parameter is LIST[$timeout, $read, ...] or LIST[$timeout, $write, ...] or LIST[$timeout, $waitAttention, ...].
The msg parameter is human-readable.
If this signal is RESUMEd, it can be raised again after another timeout interval ...
If all timeouts are set to waitForever, Timeout will never be raised.
If a stream has signalTimeout set to FALSE, Timeout will never be raised.
GetIOErrorDetails:
PROC [which:
STREAM]
RETURNS [codes:
LIST
OF
ATOM, msg:
ROPE];
Client can call this proc from a catch phrase for IO.Error[...]; it will return a detailed description of the failure as in Error or Timeout above.
Basic Stream Operations
Every transport provider is expected to implement these.
CreateStreams:
PROC [protocolFamily:
ATOM, remote:
ROPE, transportClass:
ATOM ¬
NIL, timeout: Milliseconds ¬ waitForever, transportParameters:
REF ¬
NIL]
RETURNS [in:
STREAM, out:
STREAM];
Initiate connection to (remote) host.
If transportClass = NIL, use any registered transport in given protocol family.
The timeout argument affects stream creation only; the initial timeout value of the created streams is waitForever, but can be changed by SetTimeout[...].
The transportParameters argument is for transport-class-specific tuning, and may always be specified as NIL to get best-effort defaults.
! Error[...]
! Timeout[...]
GetStreamInfoProc: TYPE ~ PROC [stream: STREAM]
RETURNS [protocolFamily: ATOM, local: ROPE, remote: ROPE, transportClass: ATOM];
GetStreamInfo: GetStreamInfoProc ~
INLINE {
[protocolFamily, local, remote, transportClass] ¬ NARROW[stream.streamData, NetworkStreamData].procs.getStreamInfo[stream];
};
Get protocol family, local address, remote address (if connected), transport class.
GetTimeoutProc: TYPE ~ PROC [stream: STREAM] RETURNS [timeout: Milliseconds, signalTimeout: BOOL];
GetTimeout: GetTimeoutProc ~
INLINE {
[timeout, signalTimeout] ¬ NARROW[stream.streamData, NetworkStreamData].procs.getTimeout[stream];
};
SetTimeoutProc: TYPE ~ PROC [stream: STREAM, timeout: Milliseconds ¬ waitForever, signalTimeout: BOOL ¬ FALSE];
SetTimeout: SetTimeoutProc ~
INLINE {
NARROW[stream.streamData, NetworkStreamData].procs.setTimeout[stream, timeout, signalTimeout];
};
Get or set the timeout value associated with a stream.
If signalTimeout is TRUE, a timeout raises the (resumable) Timeout signal; if signalTimeout is FALSE, it raises IO.Error[Failure, self].
SendSoonProc: TYPE ~ PROC [out: STREAM, when: Milliseconds ¬ 0];
SendSoon: SendSoonProc ~
INLINE {
NARROW[out.streamData, NetworkStreamData].procs.sendSoon[out, when];
};
Ensure that at least all bytes that have been put to the stream prior to now are flushed (IO.Flush[...]) no later than (now+when).
! IO.Error[StreamClosed, ...]
Listener Operations
A transport provider is free not to implement listeners.
ListenerWorkerProc:
TYPE ~
PROC [listener: Listener, in:
STREAM, out:
STREAM];
CreateListener:
PROC [
protocolFamily: ATOM,
local: ROPE ¬ NIL,
transportClass: ATOM ¬ NIL,
transportParameters: REF ¬ NIL,
listenerWorkerProc: ListenerWorkerProc,
listenerWorkerClientData: REF ¬ NIL]
RETURNS [listener: Listener];
Create a listener at the specified local address (which may include a port).
If transportClass = NIL, use any registered transport in given protocol family.
The transportParameters argument is for transport-class-specific tuning, and may always be specified as NIL to get best-effort defaults.
The ListenerWorkerProc is called with a newly-created stream pair for each connection request. The stream timeouts are initially infinite, but can be changed with SetTimeout.
Note that in and out might be the same IO.STREAM.
! Error[...]
GetListenerInfoProc: TYPE ~ PROC [listener: Listener] RETURNS [protocolFamily: ATOM, local: ROPE, transportClass: ATOM, proc: ListenerWorkerProc, clientData: REF];
GetListenerInfo: GetListenerInfoProc ~
INLINE {
[protocolFamily, local, transportClass, proc, clientData] ¬ listener.procs.getListenerInfo[listener];
};
Get transport class, local address, worker proc and clientData associated with listener.
DestroyListenerProc: TYPE ~ PROC [listener: Listener];
DestroyListener: DestroyListenerProc ~
INLINE {
listener.procs.destroyListener[listener];
};
! No ERRORs
Specialized Stream Operations
A transport provider is free not to implement these completely.
AttentionType:
TYPE ~
CARD;
Data sent with in-band / out-of-band attention.
See StreamState.
SubStreamType:
TYPE ~
CARD;
Type data sent with contiguous substream. When this changes, IO.EndOfStream is raised.
See StreamState.
StreamState:
TYPE ~ {
open, -- normal state
remoteClosed, -- closed by other end
subStreamTypeChange, -- at SubStreamType change boundary
endOfMessage, -- at EndOfMessage boundary
attention -- at attention boundary
};
Change in stream state causes IO.EndOfStream, which must be cleared by calling GetStreamState with reset~TRUE, as described below.
Default (but possibly incomplete) implementations of the following are provided by every transport. They should work in usual cases, but may raise IO.Error[NotImplementedForThisStream, ...] in exceptional cases:
GetIndexDetailsProc: TYPE ~ PROC [stream: STREAM] RETURNS [index: INT, bufferIndex: INT, ackIndex: INT ¬ -1];
GetIndexDetails: GetIndexDetailsProc ~
INLINE {
[index, bufferIndex, ackIndex] ¬ NARROW[stream.streamData, NetworkStreamData].procs.getIndexDetails[stream];
};
The returned index value is like IO.GetIndex[stream].
The returned bufferIndex value reflects the other end of any internal buffering maintained by the (top-level) transport implementation:
(index <= bufferIndex) for an input stream;
(index >= bufferIndex) for an output stream;
(index = bufferIndex) immediately after IO.Flush[] for an output stream.
The ackIndex value is the index of the first unacknowledged byte. The transport may not implement this feature, in which case the default value -1 is always returned.
GetStreamStateProc: TYPE ~ PROC [in: STREAM, reset: BOOL ¬ TRUE] RETURNS [streamState: StreamState, subStreamType: SubStreamType, attentionType: AttentionType];
GetStreamState: GetStreamStateProc ~
INLINE {
[streamState, subStreamType, attentionType] ¬ NARROW[in.streamData, NetworkStreamData].procs.getStreamState[in, reset];
};
Return state of (input) stream. The returned attentionType is meaningful only if state is attention.
If reset is TRUE and the returned state is (attention, endOfMessage or subStreamTypeChange) then the EndOfStream condition due to that state is cleared. In this case the implementation may raise IO.Error[NotImplementedForThisStream, ...].
WaitAttentionProc: TYPE ~ PROC [in: IO.STREAM, timeout: Milliseconds ¬ waitForever] RETURNS [attentionType: AttentionType];
WaitAttention: WaitAttentionProc ~
INLINE {
attentionType ¬ NARROW[in.streamData, NetworkStreamData].procs.waitAttention[in, timeout];
};
Wait for an out-of-band attention and return it (see SendAttention).
If out-of-band is not implemented by the transport provider, this just waits (abortably) for the specified timeout.
! IO.Error[StreamClosed, ...]
! Timeout[...]
SetSubStreamTypeProc: TYPE ~ PROC [out: STREAM, subStreamType: SubStreamType];
SetSubStreamType: SetSubStreamTypeProc ~
INLINE {
NARROW[out.streamData, NetworkStreamData].procs.setSubStreamType[out, subStreamType];
};
Change the SubStreamType associated with the (output) stream. "Changing" to the current SubStreamType is a no-op.
An empty subsequence — two consecutive SubSequenceType change boundaries with no data in between — is possible.
If substream types are not implemented by the transport provider, then setting the type to a nonzero value may raise IO.Error[NotImplementedForThisStream, ...].
! IO.Error[StreamClosed, ...]
! Timeout[...]
The following may raise IO.Error[NotImplementedForThisStream, ...] in all cases:
WaitAckProc: TYPE ~ PROC [out: IO.STREAM, timeout: Milliseconds ¬ waitForever, index: INT ¬ -1];
WaitAck: WaitAckProc ~
INLINE {
NARROW[out.streamData, NetworkStreamData].procs.waitAck[out, timeout, index];
};
Wait for all data before the specified index to be acknowledged.
If index < 0, use current bufferIndex value (see GetIndexDetails).
! IO.Error[StreamClosed, ...]
! Timeout[...]
SendAttentionProc: TYPE ~ PROC [out: IO.STREAM, attentionType: AttentionType];
SendAttention: SendAttentionProc ~
INLINE {
NARROW[out.streamData, NetworkStreamData].procs.sendAttention[out, attentionType];
};
Send an attention packet with the given attentionType as its data contents.
The attention is sent both out-of-band and in-band.
At the receiving end, an out-of-band attention is detected using WaitAttention[self]; an in-band attention causes an end-of-file condition, which must be cleared using GetStatus[..., reset~TRUE]. When the in-band attention is cleared, the out-of-band attention is deleted as well.
! Error[StreamClosed]
! Timeout
SendEndOfMessageProc: TYPE ~ PROC [out: STREAM];
SendEndOfMessage: SendEndOfMessageProc ~
INLINE {
NARROW[out.streamData, NetworkStreamData].procs.sendEndOfMessage[out];
};
If possible, set end-of-message at current output position and do SendNow[out].
! Error[StreamClosed]
! Timeout
Implementation Details and Registration
RegisteredCreateStreamsProc:
TYPE ~
PROC [registration: Registration, remote:
ROPE, timeout: Milliseconds, transportParameters:
REF]
RETURNS [in:
STREAM, out:
STREAM];
Client implementation of CreateStreams (called by CreateStreams). It should:
Call CreateUninitializedStreams[registration].
Set up NetworkStreamDataRecord data fields.
Establish the connection.
It's okay to call RaiseIOError[...] on either stream here, or to raise Error[...].
FinalizeStreamProc:
TYPE ~
PROC [stream:
STREAM];
Called by PFinalize when client has dropped a stream. The stream is about to disappear; this proc should recover any resources allocated to it.
RegisteredCreateListenerProc:
TYPE ~
PROC [registration: Registration, local:
ROPE, transportParameters:
REF, listenerWorkerProc: ListenerWorkerProc, listenerWorkerClientData:
REF]
RETURNS [listener: Listener];
Client Implementation of CreateListener (called by CreateListener). It should:
Call CreateUninitializedListener[registration].
Set up its ListenerRecord data field.
Start listening.
It's okay to raise Error[...] here.
FinalizeListenerProc:
TYPE ~
PROC [listener: Listener];
Called by PFinalize when client has dropped a listener. The listener is about to disappear; this proc should recover any resources allocated to it.
NetworkStreamProcs: TYPE ~ REF NetworkStreamProcsRecord;
NetworkStreamProcsRecord:
TYPE ~
RECORD [
getStreamInfo: GetStreamInfoProc ¬ NIL,
getTimeout: GetTimeoutProc ¬ NIL,
setTimeout: SetTimeoutProc ¬ NIL,
sendSoon: SendSoonProc ¬ NIL,
getIndexDetails: GetIndexDetailsProc ¬ NIL,
getStreamState: GetStreamStateProc ¬ NIL,
setSubStreamType: SetSubStreamTypeProc ¬ NIL,
sendEndOfMessage: SendEndOfMessageProc ¬ NIL,
waitAttention: WaitAttentionProc ¬ NIL,
sendAttention: SendAttentionProc ¬ NIL,
waitAck: WaitAckProc ¬ NIL,
finalizeStream: FinalizeStreamProc ¬ NIL
];
NetworkStreamData: TYPE ~ REF NetworkStreamDataRecord;
NetworkStreamDataRecord:
TYPE ~
RECORD [
registration: Registration,
procs: NetworkStreamProcs,
remote: ROPE ¬ NIL,
local: ROPE ¬ NIL,
timeout: Milliseconds ¬ waitForever,
signalTimeout: BOOL ¬ FALSE,
pendingErrors: REF ¬ NIL,
sendSoonIndex: INT ¬ -1,
sendSoonNext: STREAM ¬ NIL,
data: REF ¬ NIL,
extension: REF ¬ NIL
];
Listener: TYPE ~ REF ListenerRecord;
ListenerRecord:
TYPE ~
RECORD [
registration: Registration,
procs: ListenerProcs,
local: ROPE ¬ NIL,
listenerWorkerProc: ListenerWorkerProc ¬ NIL,
listenerWorkerClientData: REF ¬ NIL,
data: REF ¬ NIL,
extension: REF ¬ NIL
];
ListenerProcs: TYPE ~ REF ListenerProcsRecord;
ListenerProcsRecord:
TYPE ~
RECORD [
getListenerInfo: GetListenerInfoProc,
destroyListener: DestroyListenerProc,
finalizeListener: FinalizeListenerProc ¬ NIL
];
Registration: TYPE ~ REF RegistrationRecord;
RegistrationRecord:
TYPE ~
RECORD [
protocolFamily: ATOM,
transportClass: ATOM,
createStreams: RegisteredCreateStreamsProc,
inStreamProcs: REF IO.StreamProcs,
inNetworkStreamProcs: NetworkStreamProcs,
outStreamProcs: REF IO.StreamProcs,
outNetworkStreamProcs: NetworkStreamProcs,
createListener: RegisteredCreateListenerProc,
listenerProcs: ListenerProcs ¬ NIL,
clientData: REF ¬ NIL
];
RaiseIOError:
PROC [ec:
IO.ErrorCode, stream:
STREAM, codes:
LIST
OF
ATOM, msg:
ROPE];
To be called by transport providers to raise IO.Error, NetworkStream.Error or Timeout as appropriate:
If ec # Failure it raises IO.Error[ec, ...], and the handler can use GetIOErrorDetails to access codes and msg.
If ec = Failure and codes.first = $timeout, it raises Timeout[...] if stream has signalTimeout = TRUE, and IO.Error[Failure, ...] otherwise.
IF ec = Failure and codes.first = $networkStreamError, it raises Error[stream, codes.rest, msg].
Else it raises Error[Failure, ...], and the handler can use GetIOErrorDetails to access codes and msg.
Note this proc usually raises an ERROR, and thus does not return. However, a call to RaiseIOError[Failure, LIST[$timeout, ...], ...] can return if the resulting Timeout[...] signal is RESUMEd.
CreateUninitializedStreams:
PROC [registration: Registration]
RETURNS [in:
STREAM, out:
STREAM];
To be called by transport providers to create a pair of STREAM objects. The resulting streams are initialized from the Registration as much as possible. The client is expected to fill in the NetworkStreamDataRecord remote and data fields and do whatever is necessary to finish opening the streams.
CreateUninitializedListener:
PROC [registration: Registration]
RETURNS [listener: Listener];
To be called by transport providers to create a Listener object. The resulting listener is initialized from the Registration as much as possible. The client is expected to fill in the ListenerRecord local and data fields and do whatever is necessary to start up the listener.
Register:
PROC [protocolFamily:
ATOM, transportClass:
ATOM, proc: RegistrationCallbackProc];
Register a transport implementation.
RegistrationCallbackProc:
TYPE ~
PROC [old: Registration]
RETURNS [new: Registration];
Called with old Registration for transport class (NIL if none exists); should return new Registration, which may be equal to the old one or may be NIL to delete the registration.
}.