PupStream.mesa
Copyright © 1985 by Xerox Corporation. All rights reserved.
Andrew Birrell May 12, 1983 10:55 am
Hal Murray, June 5, 1986 3:11:25 am PDT
DIRECTORY
IO USING [STREAM],
Pup USING [Address, Socket],
Rope USING [ROPE];
PupStream: CEDAR DEFINITIONS = {
Overview
Unlike most streams in Cedar, PupStreams are "inputOutput": it is normal to both read and write the same Pup stream. Chat and FTP, for example, send and receive data via the same stream.
You should NOT have more than one process doing Gets or more than one process doing Puts concurrently. ConsumeMark counts as a Get, and SendMark counts as a Put. Do not Close the stream from two processes concurrently. A second consecutive Close is ok.
Beware: If you only have one process talking to a Pup stream, you can easily get into buffer lockup problems. This happens when both machines are trying to send but all the input buffers (on both machines) are full. Both ends get stuck waiting for an allocation. Since there is only one process at each end, and that's trying to send, nobody will ever read anything to free up a buffer, so the allocation will never arrive.
One process will work if the user and server both operate in lock step. FTP and Grapevine operate this way. This is in contrast to Telnet where either end can start sending at any time.
You need yet another process if you are using Attentions.
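Sample Telnet-style client with separate reading and writing processes (a sketch only, not part of this interface; Reader, Talk, Display and NextKey are hypothetical client procedures, and the stream is assumed to have been created with waitForever so that Timeout is not raised):
  Reader: PROC [stream: IO.STREAM] = {
    -- The only process doing Gets on this stream. Stops at a MARK or when the stream closes.
    DO
      Display[IO.GetChar[stream ! IO.EndOfStream, PupStream.StreamClosing => EXIT]];
      ENDLOOP;
    };
  Talk: PROC [stream: IO.STREAM] = {
    -- The only process doing Puts on this stream, and the only one that Closes it.
    TRUSTED {Process.Detach[FORK Reader[stream]]};
    DO
      IO.PutChar[stream, NextKey[] ! PupStream.StreamClosing => EXIT];
      PupStream.Push[stream];  -- send the partial buffer without waiting for the ack
      ENDLOOP;
    IO.Close[stream];
    };
Because Reader keeps draining input while Talk is blocked sending, the buffer lockup described above should not occur.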
Reading a MARK byte causes an end-of-file condition. Use ConsumeMark to read beyond it. Think of this as a tape containing several files.
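Sample of reading a stream as a sequence of "files" separated by MARKs (a sketch only; Consume is a hypothetical client procedure, and endOfData is a hypothetical MARK value whose meaning would be defined by the application protocol):
  ReadFiles: PROC [stream: IO.STREAM] = {
    endOfData: PupStream.MARK = 1;  -- hypothetical value
    DO  -- one iteration per "file"
      DO  -- read up to the next MARK
        Consume[IO.GetChar[stream ! IO.EndOfStream => EXIT]];
        ENDLOOP;
      IF PupStream.ConsumeMark[stream] = endOfData THEN EXIT;
      ENDLOOP;
    };
StreamClosing and Timeout are not caught here, so they propagate to the caller.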
Types
MARK: TYPE = [0..100H);
STREAM: TYPE = IO.STREAM;
ROPE: TYPE = Rope.ROPE;
Milliseconds: TYPE = INT;
waitForever: Milliseconds = INT.LAST;
Byte Stream Interface
Create: PROC [remote: Pup.Address, getTimeout, putTimeout: Milliseconds] RETURNS [STREAM];
See SocketsFromStream if you want to know the local or remote address. (The remote address will be different from the one you asked for if a listener answers.)
Opening a connection may take a long time - over a minute. ABORT the process if you get impatient.
Errors: StreamClosing.
CheckConnection: PROC [STREAM];
Raises StreamClosing if the stream has been closed and there is no more input data in the buffers.
Errors: StreamClosing.
Abort: PROC [STREAM, ROPE];
Smashes the stream into a closed state. Any Get/Put operations will be terminated, probably with a StreamClosing error. (GetBlock may return a short block.) You still need to call IO.Close.
Errors: None.
Push: PROC [STREAM];
Sends the current buffer but doesn't wait for it to be acked. (IO.Flush does wait.)
Errors: None.
SendMark: PROC [STREAM, MARK]; -- Includes a Push
ConsumeMark: PROC [STREAM] RETURNS [MARK]; -- Skip all input until the next MARK
Errors: StreamClosing, Timeout.
SendAttention: PROC [STREAM];
Sends asynchronous "interrupt" on stream.
WaitAttention: PROC [STREAM];
Waits until other end sends an "interrupt". Use Process.Abort if you want to get out.
Errors: StreamClosing.
Summary of IO operations
IO.GetChar
Errors: IO.EndOfStream, StreamClosing, Timeout.
IO.GetBlock, IO.UnsafeGetBlock
Returns a short block on EOF (MARK).
Errors: StreamClosing, Timeout.
IO.EndOf
Errors: StreamClosing. (This is a handy way to check the state of the stream.)
IO.CharsAvail
EOF (MARK) => INT.LAST
Errors: StreamClosing, Timeout (IF wait is TRUE).
IO.PutChar
Errors: StreamClosing, Timeout.
IO.PutBlock, IO.UnsafePutBlock
Errors: StreamClosing, Timeout.
IO.Flush
This waits until the remote machine acknowledges all the data that has been buffered so far. That doesn't guarantee that the application at the remote machine will ever see it, only that it has been buffered there. PupStream.Push can be used to force any partial buffer out of local memory without waiting for it to get acknowledged.
Errors: StreamClosing, Timeout.
IO.Close
If the stream is still alive and abort is FALSE, Close tries to Flush any remaining data. It catches and ignores any errors.
Reminder: There is no way for the communication package to guarantee that the application at the remote machine ever gets to see any data leftover in the local send buffers. If that is important to your application, you must do your own application level handshaking before calling Close.
Errors: None.
ERRORs
IO.EndOfStream will be raised during input operations when GetChar encounters a MARK byte.
Timeout: SIGNAL;
RESUME to keep trying.
Timeout is raised during input operations if no data arrives within the interval specified when the stream was created (even though the other end is responding to our probes). A GetBlock may take much longer than a single timeout if data is arriving in dribbles. CharsAvail is an input operation.
Timeout is raised during output operations if no data is sent within the interval specified when the stream was created (even though the other end is responding to our probes). A PutBlock may take much longer than a single timeout if allocates are arriving in dribbles. Flush is an output operation.
Close includes a Flush if the stream is still healthy, so it may take a while. Any Timeout will be ignored.
If a Timeout happens on a block operation, you can't tell how much data has been transferred. So far, nobody has wanted to know.
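Sample of a Get that RESUMEs a few Timeouts before giving up (a sketch only; PatientGet and maxTries are hypothetical client names):
  PatientGet: PROC [stream: IO.STREAM] RETURNS [c: CHAR] = {
    maxTries: INT = 3;  -- hypothetical limit
    tries: INT ← 0;
    c ← IO.GetChar[stream ! PupStream.Timeout => {
      tries ← tries + 1;
      IF tries < maxTries THEN RESUME;  -- keep trying
      -- Otherwise fall out of the catch phrase, so Timeout propagates to the caller.
      }];
    };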
CloseReason: TYPE = {
localClose, localAbort, remoteClose, noRouteToNetwork, transmissionTimeout, remoteReject};
StreamClosing: ERROR [why: CloseReason, text: Rope.ROPE];
StreamClosing is raised during an IO operation if the stream has been aborted or closed by either end, or if the other end is not responding to our probes. It is also raised when creating a stream if there is no route to the remote end or it doesn't respond to our connection attempts.
Listener Interface
Listener: TYPE = REF ListenerRep;
ListenerRep: TYPE;
FilterProc: TYPE = PROC [clientData: REF ANY, remote: Pup.Address] RETURNS [reject: ROPE ← NIL];
(reject=NIL) => Accept
ListenerProc: TYPE = PROC [stream: STREAM, clientData: REF ANY, remote: Pup.Address];
CreateListener: PROC [
local: Pup.Socket, -- Must be WellKnown
worker: ListenerProc,
getTimeout, putTimeout: Milliseconds,
clientData: REF ANY ← NIL,
filter: FilterProc ← NIL, -- NIL => Accept all requests
echoFilter: FilterProc ← NIL] -- NIL => Answer all echos
RETURNS [Listener];
Note: local must be a small number. All the common values are included in PupWKS. All the values defined in PupWKS are WellKnown. PupName.IsWellKnown can be used to test other values.
Note: You cannot create a Listener using an arbitrary local socket. If that becomes interesting we should make an Extras interface.
An active Listener will respond to each Request For Connection packet (RFC) by:
1) Checking for duplicates.
2) Calling the client's filter. Returning any non-NIL value will reject the connection.
3) Creating a stream with the specified timeouts.
4) FORKing a new instance of worker to interact with the new stream.
5) Detaching the new process.
If filter accepts the request (returns NIL), a worker will always be FORKed. (All error conditions have already been checked.) worker must catch StreamClosing and Timeout (unless waitForever was specified) and it should Close the stream before returning.
echoFilter is handy when several instances of a server run on different machines. Grapevine, for example, flings an echo packet at all the interesting servers, and then tries to open a connection to the first one that answers. echoFilter can be used to answer "no" if filter is very likely to reject a connection attempt. (There is no promise that filter or worker will be called if echoFilter doesn't reject a packet.)
Errors: SocketNotWellKnown if local is too big.
DestroyListener: PROC [Listener];
Errors: None.
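Sample server (a sketch only; EchoWorker, BusyFilter, busy and mySocket are hypothetical client names, and a lock-step client is assumed since the worker is a single process):
  EchoWorker: PupStream.ListenerProc = {
    {
      -- Echo bytes until a MARK arrives, the stream closes, or a Get/Put times out.
      ENABLE IO.EndOfStream, PupStream.StreamClosing, PupStream.Timeout => CONTINUE;
      DO
        IO.PutChar[stream, IO.GetChar[stream]];
        PupStream.Push[stream];
        ENDLOOP;
      };
    IO.Close[stream];  -- worker should Close the stream before returning
    };
  BusyFilter: PupStream.FilterProc = {
    RETURN [IF busy THEN "Server is busy" ELSE NIL];  -- NIL => Accept
    };
  listener: PupStream.Listener ← PupStream.CreateListener[
    local: mySocket,  -- hypothetical; must be WellKnown, such as a value from PupWKS
    worker: EchoWorker,
    getTimeout: 60000, putTimeout: 60000,
    filter: BusyFilter];
  -- When the server should stop answering RFCs:
  PupStream.DestroyListener[listener];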
Rendezvous
There are three ways to open a stream. Create is the user side of the normal user to server case. CreateListener is the corresponding server side. The third case is for two processes to Rendezvous. To rendezvous, a back door communications path, such as RPC, is needed to pass the local socket number to the other process.
WaitForRendezvous is similar to Listening. It will only answer the first connection attempt. Connection requests that don't match the remote address will be rejected. Match checks each field separately. Each "null" field will match anything. (Strange connection requests are pretty unlikely since most of the world doesn't even know that our local socket number exists.)
Either Create or ActivelyRendezvous can be used to establish contact when the other process has called WaitForRendezvous. Use ActivelyRendezvous (and LocalAddress and more back door communicating) if you want to do host filtering at the other end.
Sockets: TYPE = REF SocketsRep;
SocketsRep: TYPE;
AllocateSocket: PROC [remote: Pup.Address] RETURNS [Sockets];
A Sockets can only be "used" once by passing it to WaitForRendezvous or ActivelyRendezvous.
Errors: None.
LocalAddress: PROC [Sockets] RETURNS [Pup.Address];
RemoteAddress: PROC [Sockets] RETURNS [Pup.Address];
SocketsFromStream: PROC [STREAM] RETURNS [Sockets];
Errors: None.
WaitForRendezvous: PROC [sockets: Sockets, getTimeout, putTimeout, waitTimeout: Milliseconds] RETURNS [STREAM];
Errors: SocketsAlreadyUsed.
ActivelyRendezvous: PROC [sockets: Sockets, getTimeout, putTimeout: Milliseconds] RETURNS [STREAM];
Errors: SocketsAlreadyUsed, StreamClosing.
Sample Rendezvous sequence:
Process A:   Process B:
sockets: PupStream.Sockets ← PupStream.AllocateSocket[hisAddressWithNullSocket];
me: Pup.Address ← PupStream.LocalAddress[sockets];
   him: Pup.Address;
 Now use your back door to do ProcessB.him ← ProcessA.me;
stream ← PupStream.WaitForRendezvous[sockets, 1000, 1000, PupStream.waitForever];
 Be sure to call WaitForRendezvous on ProcessA
 before trying to Create the stream on ProcessB
   stream ← PupStream.Create[him, 1000, 1000];
Errors
SocketNotWellKnown: ERROR;
SocketsAlreadyUsed: ERROR;
}.