XNSStreamImpl.mesa
Copyright © 1989, 1991 by Xerox Corporation. All rights reserved.
Tim Diebert: November 21, 1989 6:06:18 pm PST
Willie-s, December 9, 1991 6:58 pm PST
DIRECTORY
Basics USING [BITAND, BITLSHIFT, BITOR, BITRSHIFT, ByteBlt, FillBytes, RawBytes, RawChars],
CStrings USING [CString],
IO USING [CreateStream, CreateStreamProcs, EndOfStream, PutFR, PutFR1, STREAM, StreamProcs, UnsafeBlock],
IOUtils USING [AmbushStream, closedStreamProcs],
MentatInterface USING [maxXNSPacketSize, xnsSPPDevice],
Process USING [Detach, EnableAborts, MsecToTicks, SetTimeout, Ticks],
Rope USING [Concat, ROPE],
RuntimeError USING [BoundsFault],
TLI USING [CONNECT, ConnectionData, ConnectionDataRep, DATA, DISCONNECT, EXDATA, EXPEDITED, LISTEN, MORE, ORDREL, States, TBind, TBindCall, TCall, TClose, TConnect, TERROR, TGetState, TLook, TOpen, TRcv, TRcvRel, TSnd, TSndDis, TSndRel, UDERR ],
UXStrings USING [Create],
UnixErrno USING [Errno, GetErrno],
UnixTypes USING [CHARPtr],
XNS USING [Address, IsMulticastHost],
XNSSPPTypes USING [defaultSST, SubSequenceType],
XNSStream USING [AttentionType, CloseReason, dontWait, Milliseconds, State, waitForever];
XNSStreamImpl: CEDAR MONITOR
LOCKS handle USING handle: Handle
IMPORTS Basics, IO, IOUtils, Process, Rope, RuntimeError, TLI, UXStrings, UnixErrno, XNS
EXPORTS XNSStream
~ BEGIN
Copied types
-- Local aliases for types and constants defined by the imported interfaces.
ROPE: TYPE ~ Rope.ROPE;
Milliseconds: TYPE ~ XNSStream.Milliseconds; -- TYPE ~ CARD32;
waitForever: Milliseconds ~ XNSStream.waitForever; -- 1800000 1/2 hour of milliseconds
dontWait: Milliseconds ~ XNSStream.dontWait; -- 0;
CloseReason: TYPE ~ XNSStream.CloseReason;
State: TYPE ~ XNSStream.State;
SubSequenceType: TYPE ~ XNSSPPTypes.SubSequenceType;
defaultSST: SubSequenceType ~ XNSSPPTypes.defaultSST;
AttentionType: TYPE ~ XNSStream.AttentionType; -- BYTE;
STREAM: TYPE ~ IO.STREAM;
Local types
While there is a lot of mention made here about packets, it is really an attempt to convince the Mentat kernel code to talk to us with 'packets'. This stuff is set up this way based upon some understanding of the kernel code as it is today, November 9, 1989. If the kernel changes in some significant way, all of this should continue to work, but with some performance changes. You decide which way it changed.
BufferIndex: TYPE ~ [0 .. MentatInterface.maxXNSPacketSize]; -- a byte offset or byte count within one packet buffer
BufferState: TYPE ~ { empty, emptying, filling, full }; -- life cycle of a packet buffer
Buffer: TYPE ~ REF BufferRep;
-- One packet-sized buffer together with the per-packet bookkeeping kept for it.
BufferRep: TYPE ~ RECORD [
  next: Buffer ¬ NIL, -- the next one on the chain. This is so that we can get more buffers filled ahead.
  state: BufferState ¬ empty,
  attention: BOOL ¬ FALSE, -- does the buffer contain expedited data
  sequence: INT ¬ 0, -- the inband attention sequence number. This is used to remove OOB attentions from the queue
  eom: BOOL ¬ FALSE, -- was eom set in this packet
  sst: SubSequenceType ¬ 0, -- the data stream type of this packet
  index: BufferIndex ¬ 0, -- input: offset of the next byte to copy out to the client; output: advanced by each copy into the buffer
  bytes: BufferIndex ¬ 0, -- input: offset at which copying out of this buffer stops
  buffer: POINTER, -- conveniently set up for us ahead of time
  charPTR: UnixTypes.CHARPtr, -- also set up for us
  bufferRef: REF TEXT, -- so we can hold onto the bytes
  flags: REF INT -- this is here so we don't allocate one on each call to ReadBuffer
  ];
-- One entry in the list of out-of-band attentions received and not yet consumed.
OBAttn: TYPE ~ REF OBAttnObject;
OBAttnObject: TYPE ~ RECORD [
  next: OBAttn,
  sequence: INT, -- which attention packet is this. The idea is that this number is assigned by us rather than by the packet sequence number. It's used to correlate both the OOB and in band copy.
  attentionType: AttentionType
  ];
-- The TLI call descriptors used by one connection, each paired with the address record its addr field points at.
Calls: TYPE ~ REF CallsRep;
CallsRep: TYPE ~ RECORD [
  reqTBind: REF TLI.TBindCall, -- what socket should try to use during TBind
  reqTBindAddress: REF XNS.Address, -- the storage behind the reqTBind
  retTBind: REF TLI.TBindCall, -- what socket did we actually bind to
  retTBindAddress: REF XNS.Address, -- the storage behind the retTBind
  sendTCall: REF TLI.TCall, -- used as the requested address of the TConnect
  sendTCallAddress: REF XNS.Address, -- the storage behind sendTCall
  rcvTCall: REF TLI.TCall, -- used as the answer from TConnect
  rcvTCallAddress: REF XNS.Address -- the storage behind rcvTCall
  ];
-- Per-connection state. The record is MONITORED: ENTRY and INTERNAL procedures of this module lock it through the handle.
Handle: TYPE ~ REF HandleRep;
HandleRep: TYPE ~ MONITORED RECORD [
  next: Handle ¬ NIL,
  cd: TLI.ConnectionData,
  calls: Calls, -- the various call data structures we need
  local: XNS.Address ¬ NULL, -- what socket are we using
  remote: XNS.Address ¬ NULL, -- who are we connected to
  closed: BOOL ¬ FALSE, -- for later calls to do things on this stream
  closeReason: CloseReason ¬ unknown,
  closeText: ROPE ¬ NIL,
  -- Auxiliary processes associated with stream:
  auxProcCnt: CARDINAL ¬ 0,
  auxProcsDone: CONDITION,
  -- Timeouts:
  getTimeout: Milliseconds ¬ 0,
  putTimeout: Milliseconds ¬ 0,
  -- sendSoonEvent: CommTimer.Event ¬ CommTimer.nullEvent, (disabled: CommTimer appears in neither DIRECTORY nor IMPORTS, and nothing visible uses this field)
  -- Input stuff
  currentInputBuffer: Buffer ¬ NIL, -- the buffer we are working on
  currentInputSST: SubSequenceType ¬ defaultSST, -- what the sst is now
  inBuffers: Buffer, -- the input buffer chain. This contains all buffers that contain data to be passed to the client
  inFreeBuffers: Buffer, -- the free buffers available for input
  inputWaiting: BOOL ¬ FALSE, -- are we currently in ReceiveBuffer
  inputReady: CONDITION,
  inputBufferFree: CONDITION, -- this indicates that there is a newly freed buffer
  outOfBandAttnNumber: INT ¬ 0, -- this number is assigned to and then incremented whenever an out of band attention is received. This number should never be found in the recvdOBAttnList since that means we wrapped around
  inBandAttnNumber: INT ¬ 0, -- this number is assigned to the inband copy of the attention packet. It is used to locate the oob attention to kill when this one is delivered.
  recvdOBAttnList: OBAttn ¬ NIL, -- Out-of-Band Attentions received and not yet read by WaitAttention
  recvdOBAttn: CONDITION, -- wake up for WaitAttention
  inputSSType: SubSequenceType ¬ 0, -- SubSequenceType last seen by the stream client
  -- Output stuff
  currentOutputBuffer: Buffer ¬ NIL, -- the buffer we are working on
  currentOutputSST: SubSequenceType ¬ defaultSST, -- what the sst is now
  outputBuffers: Buffer, -- the output buffer list. These are ready for sending
  outputFreeBuffers: Buffer, -- the list of unused output buffers
  outputReady: CONDITION,
  outputBufferFree: CONDITION, -- indicates a buffer is available for output
  flusher: CONDITION, -- there is a process watching us fill up buffers. When this fires, the buffer gets sent
  -- Spare
  spare: REF ¬ NIL -- for no good reason
  ];
ByteArrayPtr: TYPE = LONG POINTER TO Basics.RawBytes; -- the pointer form handed to UnsafeGetBlock and UnsafePutBlock as the block base
-- Local constants
packetSize: INT ~ MentatInterface.maxXNSPacketSize; -- for now, only deal with TRcv and TSnd calls using this size buffer. This size also determines the amount of storage in the buffer.buffer part.
flushTime: Process.Ticks ~ Process.MsecToTicks[500]; -- flush every 1/2 second
-- Signals and Errors
ConnectionClosed: PUBLIC ERROR [why: CloseReason, text: Rope.ROPE] ~ CODE; -- raised by stream operations once the handle is marked closed, and by Create on failure
Timeout: PUBLIC SIGNAL ~ CODE;
BadSysCall: ERROR [why: ROPE] ~ CODE; -- a TLI call failed in a way this module does not handle
LookEvent: ERROR [lookEvent: INT] ~ CODE; -- carries the TLook event from ReceiveBuffer to Pull
SystemError: ERROR ~ CODE;
Stream Procs
streamProcs: REF IO.StreamProcs; -- the procedure record handed to IO.CreateStream by Create
-- Builds the stream procedure record that routes IO operations on an XNS stream to the procedures of this module.
CreateStreamProcs: PROC ~ {
  streamProcs ¬ IO.CreateStreamProcs [
    variety: $inputOutput, class: $XNS,
    getChar: GetChar,
    getBlock: GetBlock,
    unsafeGetBlock: UnsafeGetBlock,
    endOf: EndOf,
    charsAvail: CharsAvail,
    putChar: PutChar,
    unsafePutBlock: UnsafePutBlock,
    flush: Flush,
    close: Close
    ];
  };
-- Reads one character by way of UnsafeGetBlock; raises IO.EndOfStream when no byte is delivered.
GetChar: PROC [self: STREAM] RETURNS [CHAR] = TRUSTED {
  scratch: PACKED ARRAY [0..3] OF CHAR; -- landing area for the single character
  scratchPtr: ByteArrayPtr = LOOPHOLE[LONG[@scratch]];
  got: INT = UnsafeGetBlock[self, [base: scratchPtr, startIndex: 0, count: 1]];
  IF got = 0 THEN ERROR IO.EndOfStream[self];
  RETURN[scratch[0]]
  };
-- Safe block read into a REF TEXT: clamps the request to the room left after startIndex, then sets block.length to cover what was read.
GetBlock: PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT] RETURNS [nBytesRead: NAT] = TRUSTED {
  room: INT = MAX[MIN[INT[count], INT[block.maxLength]-startIndex], 0]; -- never negative, never past maxLength
  nBytesRead ¬ UnsafeGetBlock[self, [
    base: LOOPHOLE[block, ByteArrayPtr]+SIZE[TEXT[0]], -- step over the TEXT header to the characters
    startIndex: startIndex,
    count: room ]];
  block.length ¬ startIndex + nBytesRead;
  };
-- Validates the block bounds, then performs the read under the handle's monitor. A result smaller than block.count is how end of stream is reported to the caller.
UnsafeGetBlock: UNSAFE PROC [self: STREAM, block: IO.UnsafeBlock]
  RETURNS
  [nBytesRead: INT ¬ 0] ~ {
  handle: Handle ~ NARROW[self.streamData];
  IF MIN[block.startIndex, block.count] < 0 THEN ERROR RuntimeError.BoundsFault; -- either one negative is a fault
  TRUSTED {nBytesRead ¬ EntryUnsafeGetBlock[handle: handle, block: block]};
  };
-- Copies up to block.count bytes from the queued input buffers into block, under the monitor.
-- Returns early with the bytes copied so far when the current buffer is an attention packet, carries a different SST, or is an exhausted end-of-message packet; the caller treats a short count as end of stream.
-- Raises ConnectionClosed when the handle is closed, including when the close happens while waiting for input.
EntryUnsafeGetBlock: ENTRY PROC [handle: Handle, block: IO.UnsafeBlock]
  RETURNS [nBytesRead: INT ¬ 0] ~ { -- with all these things that change the state kept by the handle, we best be in a monitor
  ENABLE UNWIND => NULL;
  startIndex: INT ¬ block.startIndex;
  stop: INT ~ block.startIndex+block.count;
  nBytes: NAT;
  buffer: Buffer ¬ handle.currentInputBuffer;
  WHILE startIndex < stop DO
    IF handle.closed THEN
      RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
    IF buffer = NIL THEN { -- we need to get a buffer
      IF handle.inBuffers = NIL THEN { -- nothing queued: wait here and go around so that the closed test above is repeated after every wakeup
        WAIT handle.inputReady;
        LOOP;
        };
      buffer ¬ GetFirstInputBuffer[handle: handle]; -- does not wait, the chain is known to be non-empty
      handle.currentInputBuffer ¬ buffer; -- so that we know where we are.
      PrintF[IO.PutFR1["buffer = %g\n", [text[buffer.bufferRef]]]];
      };
    IF buffer.attention THEN RETURN[nBytesRead]; -- pass along any of the bytes that might have already been read into the block. If this is really the buffer from the previous call, it's ok since nBytesRead will still be 0. Let the caller worry about EndOfStream.
    IF buffer.sst # handle.currentInputSST THEN RETURN[nBytesRead]; -- the currentSST isn't updated until the client calls GetStatus[..., reset: TRUE] to get past it. This means EndOfStream
    IF buffer.index = buffer.bytes THEN { -- this buffer is used up. Check for EOM
      IF buffer.eom
        THEN RETURN [nBytesRead] -- no more bytes in the packet and it's EOM
        ELSE { -- we need to get more bytes since this buffer is empty and it's not EOM
          FreeInputBuffer[handle: handle, buffer: buffer]; -- give back the empty one
          handle.currentInputBuffer ¬ NIL; -- the freed buffer must not stay published: other entry procedures read this field while we wait
          buffer ¬ NIL; -- the buffer = NIL check fetches a fresh one for us when we go around
          LOOP;
          };
      };
    -- Once we get here we should have a buffer that has bytes left in it and there are no conditions that will create EndOfStream
    TRUSTED {
      nBytes ¬ Basics.ByteBlt[
        to: [
          blockPointer: block.base,
          startIndex: startIndex,
          stopIndexPlusOne: stop],
        from: [
          blockPointer: buffer.buffer,
          startIndex: buffer.index,
          stopIndexPlusOne: buffer.bytes]
        ];
      };
    -- We copied nBytes worth of data out of the packet. Update the counts and go around again.
    buffer.index ¬ buffer.index + nBytes;
    nBytesRead ¬ nBytesRead + nBytes;
    startIndex ¬ startIndex + nBytes;
    ENDLOOP; -- WHILE startIndex < stop
  };
-- Stream endOf procedure: forwards to the monitored test on the handle.
EndOf: PROC [self: STREAM] RETURNS [BOOL] ~ {
  RETURN [EntryEndOf[NARROW[self.streamData, Handle]]];
  };
-- TRUE when the current input buffer cannot yield a data byte: it is an attention packet, it carries a different SST, or it is a fully consumed end-of-message packet.
-- With no current buffer the answer is FALSE. Raises ConnectionClosed on a closed handle.
EntryEndOf: ENTRY PROC [handle: Handle] RETURNS [BOOL] ~ {
  ENABLE UNWIND => NULL;
  current: Buffer ~ handle.currentInputBuffer;
  IF handle.closed THEN
    RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
  IF current = NIL THEN RETURN[FALSE]; -- nothing has been fetched yet
  IF current.attention THEN RETURN[TRUE];
  IF current.sst # handle.currentInputSST THEN RETURN[TRUE];
  RETURN [current.eom AND (current.index = current.bytes)];
  };
-- Stream charsAvail procedure: forwards to the monitored implementation.
CharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [nChars: INT] ~ {
  nChars ¬ EntryCharsAvail[handle: NARROW[self.streamData, Handle], wait: wait];
  };
-- Reports how many bytes can be read without waiting.
-- Returns 0 when nothing is queued and wait is FALSE; returns LAST[INT] when the next read would report end of stream (attention packet, SST change, or consumed end-of-message packet).
-- Raises ConnectionClosed on a closed handle.
EntryCharsAvail: ENTRY PROC [handle: Handle, wait: BOOL] RETURNS [INT] = {
  ENABLE UNWIND => NULL;
  DO
    buffer: Buffer ¬ handle.currentInputBuffer;
    IF handle.closed THEN
      RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
    IF buffer = NIL THEN { -- there isn't a current buffer
      IF handle.inBuffers = NIL THEN { -- and nothing has arrived either
        IF NOT wait THEN RETURN[0]; -- there aren't any bytes
        WAIT handle.inputReady;
        LOOP; -- go around so that the closed test is repeated
        };
      -- A packet has arrived but was never made current; waiting on inputReady alone would never notice it.
      buffer ¬ GetFirstInputBuffer[handle: handle]; -- does not wait, the chain is known to be non-empty
      handle.currentInputBuffer ¬ buffer;
      };
    IF buffer.attention OR buffer.sst # handle.currentInputSST THEN RETURN[LAST[INT]]; -- GetChar will raise EndOfStream
    IF buffer.index < buffer.bytes THEN RETURN [(buffer.bytes - buffer.index)];
    IF buffer.eom THEN RETURN [LAST[INT]];
    -- Used up and not EOM: release it, as the read path does, and look at the next packet. Going around without doing this would spin forever while holding the monitor.
    FreeInputBuffer[handle: handle, buffer: buffer];
    handle.currentInputBuffer ¬ NIL;
    ENDLOOP;
  };
-- Writes one character by way of UnsafePutBlock.
PutChar: PROC [self: STREAM, char: CHAR] = TRUSTED {
  cell: PACKED ARRAY [0..3] OF CHAR; -- holds the single character being sent
  cellPtr: ByteArrayPtr ~ LOOPHOLE[LONG[@cell]];
  cell[0] ¬ char;
  UnsafePutBlock[self, [base: cellPtr, startIndex: 0, count: 1]];
  };
-- Validates the request, then queues the bytes for output under the handle's monitor.
-- Raises ConnectionClosed on a closed handle and RuntimeError.BoundsFault on a negative start index or count.
UnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] ~ {
  handle: Handle ~ NARROW[self.streamData];
  IF handle.closed THEN -- this is not an ENTRY procedure and holds no lock, so a plain ERROR is the right form here
    ERROR ConnectionClosed[handle.closeReason, handle.closeText];
  IF block.startIndex < 0 OR block.count < 0 THEN ERROR RuntimeError.BoundsFault;
  TRUSTED {EntryUnsafePutBlock[handle: handle, block: block]};
  };
-- Copies block into output buffers under the monitor. Each buffer that fills up is stamped with the current output SST and appended to the ready chain; a partly filled buffer stays in handle.currentOutputBuffer for the next call.
-- Raises ConnectionClosed when the handle is closed.
EntryUnsafePutBlock: ENTRY PROC [handle: Handle, block: IO.UnsafeBlock] ~ {
  ENABLE UNWIND => NULL; -- release the monitor if a wait for a free buffer is aborted
  startIndex: INT ¬ block.startIndex;
  stop: INT ~ block.startIndex+block.count;
  nBytes: NAT;
  buffer: Buffer ¬ handle.currentOutputBuffer;
  WHILE startIndex < stop DO
    IF handle.closed THEN
      RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
    IF buffer = NIL THEN { -- we need to get a buffer
      buffer ¬ AllocateOutputBuffer[handle: handle]; -- this will block until one is free
      handle.currentOutputBuffer ¬ buffer; -- so that we know where we are.
      buffer.index ¬ 0; -- ensure that the buffer is empty before we start filling it up
      buffer.bytes ¬ LAST[BufferIndex]; -- set the last byte position
      };
    TRUSTED {
      nBytes ¬ Basics.ByteBlt[
        to: [
          blockPointer: buffer.buffer,
          startIndex: buffer.index, -- append after what is already there; starting at buffer.bytes left no room and copied nothing
          stopIndexPlusOne: LAST[BufferIndex]],
        from: [
          blockPointer: block.base,
          startIndex: startIndex,
          stopIndexPlusOne: stop]
        ];
      };
    buffer.index ¬ buffer.index + nBytes;
    startIndex ¬ startIndex + nBytes;
    IF buffer.index = buffer.bytes THEN { -- the buffer is full
      buffer.state ¬ full;
      buffer.attention ¬ FALSE; -- this isn't an attention byte
      buffer.eom ¬ FALSE; -- not eom either
      buffer.sst ¬ handle.currentOutputSST; -- set the SST. The currentOutputSST is changed by SetSSType.
      AppendOutputBuffer[handle: handle, buffer: buffer];
      handle.currentOutputBuffer ¬ NIL; -- it is queued now; a later call must not fill or queue it a second time
      buffer ¬ NIL; -- so that we get a fresh one
      LOOP; -- go around again to see if we got the whole block
      };
    ENDLOOP; -- WHILE startIndex < stop
  };
-- Stream flush procedure. It only checks for a closed connection and does not itself send the partly filled output buffer.
Flush: PROC [self: STREAM] ~ {
  handle: Handle ~ NARROW[self.streamData];
  IF handle.closed THEN -- this is not an ENTRY procedure and holds no lock, so a plain ERROR is the right form here
    ERROR ConnectionClosed[handle.closeReason, handle.closeText];
  };
-- Stream close procedure: releases the transport connection, marks the handle closed (which wakes every waiter), then swaps in the closed-stream procedures.
-- NOTE(review): the abort argument is not consulted; every close takes the orderly path.
Close: PROC [self: STREAM, abort: BOOL] ~ {
  handle: Handle ~ NARROW[self.streamData];
  OrderlyClose[handle: handle, initiate: TRUE]; -- we start the release ourselves
  EntryNotifyClosed[handle: handle, closeReason: localClose, closeText: "close called"];
  CloseTheStream[self: self];
  };
-- Closes the transport endpoint immediately, with no release handshake. The TClose result is discarded.
BlastClose: PROC [handle: Handle] = {
  [] ¬ TLI.TClose[cd: handle.cd];
  };
-- Closes the transport endpoint, choosing the remaining step from the TLI state.
-- initiate: TRUE when this side starts the release, in which case TSndRel is issued first.
-- The results of the TLI calls are discarded.
OrderlyClose: PROC [handle: Handle, initiate: BOOL] = {
  cd: TLI.ConnectionData ~ handle.cd;
  state: TLI.States;
  IF initiate THEN [] ¬ TLI.TSndRel[cd: cd];
  state ¬ TLI.TGetState[cd: cd];
  SELECT state FROM
    inrel => [] ¬ TLI.TRcvRel[cd: cd]; -- take in the release indication
    unbnd, idle, outrel => NULL; -- nothing further is sent in these states
    ENDCASE => [] ¬ TLI.TSndDis[cd: cd, call: NIL]; -- any other state: disconnect
  [] ¬ TLI.TClose[cd: cd];
  RETURN;
  };
-- Replaces the stream's procedures with IOUtils.closedStreamProcs, keeping the stream data. Does nothing if that has already been done.
CloseTheStream: PROC [self: STREAM] = {
  IF self.streamProcs # IOUtils.closedStreamProcs THEN
    IOUtils.AmbushStream[self: self, streamProcs: IOUtils.closedStreamProcs,
      streamData: self.streamData];
  };
-- Marks the handle closed, recording the reason only the first time, and wakes every process waiting on any of the handle's conditions. The caller must hold the monitor.
NotifyClosed: INTERNAL PROC [handle: Handle, closeReason: CloseReason, closeText: ROPE] ~ {
  IF NOT handle.closed THEN { -- the first close wins; later calls keep the original reason
    handle.closeReason ¬ closeReason;
    handle.closeText ¬ closeText;
    handle.closed ¬ TRUE; -- set last, after the reason and text are in place
    };
  -- Wake everybody, even on a repeated call.
  BROADCAST handle.inputReady;
  BROADCAST handle.inputBufferFree;
  BROADCAST handle.recvdOBAttn;
  BROADCAST handle.outputReady;
  BROADCAST handle.outputBufferFree;
  BROADCAST handle.flusher;
  };
-- Monitor entry wrapper around NotifyClosed for callers that do not hold the lock.
EntryNotifyClosed: ENTRY PROC [handle: Handle, closeReason: CloseReason, closeText: ROPE] ~ {
  ENABLE UNWIND => NULL;
  NotifyClosed[handle: handle, closeReason: closeReason, closeText: closeText];
  };
Byte Stream Interface
createBuffers: CARDINAL ¬ 6;  -- number of buffers Create requests for each direction; the value 6 is unexplained (????)
-- Opens a connection to remote and returns a stream on it.
-- remote: the address to connect to; a multicast host is rejected.
-- getTimeout, putTimeout: only waitForever is accepted at present.
-- Raises ConnectionClosed if an argument is rejected or if opening, binding or connecting fails. On every failure after TOpen has succeeded the transport endpoint is closed before the error is raised.
Create: PUBLIC PROC [remote: XNS.Address, getTimeout: Milliseconds ¬ waitForever, putTimeout: Milliseconds ¬ waitForever] RETURNS [IO.STREAM] = {
  handle: Handle ¬ NIL;
  cd: TLI.ConnectionData ~ AllocateCD[];
  calls: Calls ~ AllocateCalls[];
  path: CStrings.CString ~ UXStrings.Create[MentatInterface.xnsSPPDevice];
  return: INT;
  retTBind: REF TLI.TBindCall ~ calls.retTBind;
  sendCall: REF TLI.TCall ~ calls.sendTCall;
  rcvCall: REF TLI.TCall ~ calls.rcvTCall;
  calls.sendTCallAddress­ ¬ remote; -- load up the address we want to talk to
  IF XNS.IsMulticastHost[remote.host] -- check to see if it's multicast
    THEN ERROR ConnectionClosed[noRoute, "multicast remote"];
  IF getTimeout # waitForever THEN
    ERROR ConnectionClosed[localClose, "getTimeout # forever"];
  IF putTimeout # waitForever THEN
    ERROR ConnectionClosed[localClose, "putTimeout # forever"];
  IF TLI.TOpen[cd: cd, path: path, flags: [access: RDWR]] < 0 THEN { -- can't open the device
    error: ROPE ~ IO.PutFR["TOpen failed: cd.errno = %g, errno = %g", [integer[LOOPHOLE[cd.errno]]], [integer[LOOPHOLE[UnixErrno.GetErrno[]]]]];
    ERROR ConnectionClosed[localClose, error];
    };
  IF TLI.TBind[cd: cd, request: NIL, return: retTBind] < 0 THEN { -- can't bind to any socket
    error: ROPE ~ IO.PutFR["TBind failed: cd.errno = %g, errno = %g", [integer[LOOPHOLE[cd.errno]]], [integer[LOOPHOLE[UnixErrno.GetErrno[]]]]];
    [] ¬ TLI.TClose[cd: cd];
    ERROR ConnectionClosed[localClose, error];
    };
  return ¬ TLI.TConnect[cd: cd, sndCall: sendCall, rcvCall: rcvCall];
  IF return < 0 THEN {
    error: ROPE ¬ IO.PutFR["TConnect failed: cd.errno = %g, errno = %g ", [integer[LOOPHOLE[cd.errno]]], [integer[LOOPHOLE[UnixErrno.GetErrno[]]]]];
    IF cd.errno = TLOOK THEN { -- an event is pending: spell out which ones in the error text
      IsEvent: PROC [event: INT] RETURNS [BOOL] = INLINE {
        RETURN[ Basics.BITAND[events, event] # 0];
        };
      events: INT ¬ TLI.TLook[cd: cd];
      error ¬ IO.PutFR["%g\nTLook event = %g\n", [rope[error]], [integer[events]]];
      IF IsEvent[TLI.LISTEN] THEN error ¬ Rope.Concat[error, "LISTEN "];
      IF IsEvent[TLI.CONNECT] THEN error ¬ Rope.Concat[error, "CONNECT "];
      IF IsEvent[TLI.DATA] THEN error ¬ Rope.Concat[error, "DATA "];
      IF IsEvent[TLI.EXDATA] THEN error ¬ Rope.Concat[error, "EXDATA "];
      IF IsEvent[TLI.DISCONNECT] THEN {
        error ¬ Rope.Concat[error, "DISCONNECT "];
        [] ¬ TLI.TClose[cd: cd]; -- give the endpoint back here too, as every other failure path does
        ERROR ConnectionClosed[remoteClose, error];
        };
      IF IsEvent[TLI.TERROR] THEN error ¬ Rope.Concat[error, "TERROR "];
      IF IsEvent[TLI.UDERR] THEN error ¬ Rope.Concat[error, "UDERR "];
      IF IsEvent[TLI.ORDREL] THEN error ¬ Rope.Concat[error, "ORDREL "];
      };
    [] ¬ TLI.TClose[cd: cd];
    ERROR ConnectionClosed[localClose, error];
    };
  -- Ok, we have a connection established with the requested host. Create a handle and start filling in the pieces.
  handle ¬ MakeHandle[sendBuffers: createBuffers, recvBuffers: createBuffers,
    getTimeout: getTimeout, putTimeout: putTimeout];
  handle.cd ¬ cd;
  handle.calls ¬ calls;
  handle.remote ¬ calls.rcvTCallAddress­;
  handle.local ¬ calls.retTBindAddress­;
  RETURN [IO.CreateStream[streamProcs: streamProcs, streamData: handle]];
  };
-- Returns the local address recorded in the stream's handle.
GetLocal: PUBLIC PROC [self: STREAM] RETURNS [local: XNS.Address] = {
  local ¬ EntryGetLocal[handle: NARROW[self.streamData, Handle]];
  };
-- Reads handle.local under the monitor.
EntryGetLocal: ENTRY PROC [handle: Handle] RETURNS [local: XNS.Address] = {
  ENABLE UNWIND => NULL;
  local ¬ handle.local;
  };
-- Returns the remote address recorded in the stream's handle.
GetRemote: PUBLIC PROC [self: STREAM] RETURNS [remote: XNS.Address] = {
  remote ¬ EntryGetRemote[handle: NARROW[self.streamData, Handle]];
  };
-- Reads handle.remote under the monitor. The result is named local, but it carries the remote address.
EntryGetRemote: ENTRY PROC [handle: Handle] RETURNS [local: XNS.Address] = {
  ENABLE UNWIND => NULL;
  local ¬ handle.remote;
  };
-- Not implemented: the body is empty, so both results keep their declared default of 0.
GetTimeouts: PUBLIC PROC [self: STREAM]
  RETURNS [getTimeout, putTimeout: Milliseconds ¬ 0] = {
  -- Body
  };
-- Not implemented: the body is empty and the arguments are ignored.
SetTimeouts: PUBLIC PROC [self: STREAM, getTimeout, putTimeout: Milliseconds ¬ waitForever] = {
  -- Body
  };
-- Changes the subsequence type used for data written from now on.
SetSSType: PUBLIC PROC [self: STREAM, ssType: SubSequenceType] = {
  EntrySetSSType[handle: NARROW[self.streamData, Handle], ssType: ssType];
  };
-- Sets the SST for all packets started after this call. A partly filled output buffer is closed off and queued under the old SST first, so one packet never mixes two types.
EntrySetSSType: ENTRY PROC [handle: Handle, ssType: SubSequenceType] = {
  ENABLE UNWIND => NULL;
  buffer: Buffer ~ handle.currentOutputBuffer;
  IF buffer # NIL THEN { -- we aren't at a boundary. We need to force one
    buffer.state ¬ full;
    buffer.attention ¬ FALSE; -- this isn't an attention byte
    buffer.eom ¬ FALSE; -- not eom either
    buffer.sst ¬ handle.currentOutputSST; -- the data already in it goes out under the old SST
    AppendOutputBuffer[handle: handle, buffer: buffer];
    handle.currentOutputBuffer ¬ NIL; -- it is queued now; later writes must start a fresh buffer instead of reusing this one
    };
  handle.currentOutputSST ¬ ssType; -- set the SST for all new packets
  };
-- Marks the end of the current message on the output side.
SendEndOfMessage: PUBLIC PROC [self: STREAM] = {
  EntrySendEndOfMessage[handle: NARROW[self.streamData, Handle]];
  };
-- Queues an end-of-message packet. The partly filled output buffer is used if there is one; otherwise a fresh buffer is taken and queued with zero length.
EntrySendEndOfMessage: ENTRY PROC [handle: Handle] = {
  ENABLE UNWIND => NULL;
  packet: Buffer ¬ handle.currentOutputBuffer;
  IF packet = NIL THEN { -- we are at a buffer boundary: get a buffer and make it empty
    packet ¬ AllocateOutputBuffer[handle: handle]; -- waits until one is free
    packet.index ¬ 0;
    packet.bytes ¬ 0;
    };
  -- From here on both cases are the same: stamp the packet and queue it.
  packet.state ¬ full;
  packet.attention ¬ FALSE; -- this isn't an attention byte
  packet.eom ¬ TRUE; -- this packet ends the message
  packet.sst ¬ handle.currentOutputSST;
  AppendOutputBuffer[handle: handle, buffer: packet];
  handle.currentOutputBuffer ¬ NIL; -- the next write starts a fresh buffer
  };
-- Not implemented: the body is empty and nothing is sent.
SendAttention: PUBLIC PROC [self: STREAM, attentionType: AttentionType] = {
  -- Body
  };
-- Not implemented: the body is empty, so ok keeps its declared default of FALSE.
SendClose: PUBLIC PROC [self: STREAM] RETURNS [ok: BOOL ¬ FALSE] = {
  -- Body
  };
-- Not implemented: the body is empty, so ok keeps its declared default of FALSE.
SendCloseReply: PUBLIC PROC [self: STREAM] RETURNS [ok: BOOL ¬ FALSE] = {
  -- Body
  };
-- Not implemented: the body is empty.
SendNow: PUBLIC PROC [self: STREAM] = {
  -- Body
  };
-- Not implemented: the body is empty, so bytesSkipped keeps its declared default of 0.
FlushInput: PUBLIC PROC [self: STREAM, wait: BOOL ¬ FALSE]
  RETURNS [bytesSkipped: CARD ¬ 0] = {
  -- Body
  };
-- Placeholder: returns attention type 0 at once, without waiting and without looking at the stream.
WaitAttention: PUBLIC PROC [self: STREAM, waitTimeout: Milliseconds ¬ waitForever]
  RETURNS [AttentionType] = {
  RETURN[0];
  };
-- Reports why reading stopped; with reset TRUE it also clears the condition so that reading can go on.
GetStatus: PUBLIC PROC [self: STREAM, reset: BOOL ¬ TRUE]
  RETURNS [state: State, ssType: SubSequenceType, attentionType: AttentionType] = {
  [state, ssType, attentionType] ¬ EntryGetStatus[NARROW[self.streamData, Handle], reset];
  };
-- For reference only, State is already declared in this module as XNSStream.State and must not be declared again: State: TYPE ~ { open, ssTypeChange, endOfMessage, attention };
-- Classifies the current input buffer, under the monitor.
-- attention: the buffer is an attention packet; its first byte is returned as the attention type.
-- ssTypeChange: the buffer carries an SST different from the current input SST.
-- endOfMessage: the buffer is an end-of-message packet whose bytes have all been read.
-- open: none of the above, or there is no current buffer.
-- With reset TRUE the reported condition is cleared: the attention or end-of-message packet is released, or the current input SST is moved to the new value.
-- Raises ConnectionClosed on a closed handle.
EntryGetStatus: ENTRY PROC [handle: Handle, reset: BOOL]
  RETURNS [state: State, ssType: SubSequenceType, attentionType: AttentionType] ~ {
  ENABLE UNWIND => NULL;
  buffer: Buffer ¬ handle.currentInputBuffer;
  IF handle.closed THEN
    RETURN WITH ERROR ConnectionClosed[handle.closeReason, handle.closeText];
  IF buffer = NIL THEN -- with nothing fetched, open is the only state we can report
    RETURN
      [state: open, ssType: handle.currentInputSST, attentionType: 0];
  IF buffer.attention THEN {
    state ¬ attention;
    ssType ¬ buffer.sst; -- the sst of the attention packet
    attentionType ¬ LOOPHOLE[buffer.bufferRef[0]]; -- still ugly
    IF reset THEN { -- dispose of this packet
      p: OBAttn ~ handle.recvdOBAttnList;
      IF (p # NIL) AND (p.sequence = handle.inBandAttnNumber) THEN -- delete it from the expedited queue
        handle.recvdOBAttnList ¬ p.next;
      FreeInputBuffer[handle: handle, buffer: buffer];
      handle.currentInputBuffer ¬ NIL; -- so that UnsafeGetBlock can make progress
      };
    RETURN;
    }; -- IF buffer.attention
  IF buffer.sst # handle.currentInputSST THEN {
    IF reset THEN handle.currentInputSST ¬ buffer.sst;
    RETURN[state: ssTypeChange, ssType: buffer.sst, attentionType: 0];
    };
  IF buffer.eom AND (buffer.index = buffer.bytes) THEN { -- same test as EntryEndOf: a packet with unread bytes is still open, and must not be thrown away by a reset
    IF reset THEN { -- give the packet back
      FreeInputBuffer[handle: handle, buffer: buffer];
      handle.currentInputBuffer ¬ NIL; -- so that UnsafeGetBlock can make progress
      };
    RETURN [state: endOfMessage, ssType: handle.currentInputSST, attentionType: 0];
    };
  RETURN [state: open, ssType: handle.currentInputSST, attentionType: 0];
  };
Allocations
-- Allocates a connection data record and clears every byte of it.
AllocateCD: PROC RETURNS [cd: TLI.ConnectionData] = {
  cd ¬ NEW[TLI.ConnectionDataRep]; -- the rest gets filled in by the call to TOpen
  TRUSTED {
    Basics.FillBytes[dstBase: LOOPHOLE[cd], dstStart: 0, count: BYTES[TLI.ConnectionDataRep], value: 0];
    };
  };
-- Allocates the four TLI call descriptors together with the address record behind each one, and points every descriptor's addr field at its record with both lengths set to the size of an XNS.Address.
AllocateCalls: PROC RETURNS [calls: Calls] = {
  calls ¬ NEW[CallsRep];
  -- requested bind address
  calls.reqTBindAddress ¬ NEW[XNS.Address];
  calls.reqTBind ¬ NEW[TLI.TBindCall];
  calls.reqTBind.addr.maxlen ¬ calls.reqTBind.addr.len ¬ BYTES[XNS.Address];
  TRUSTED {calls.reqTBind.addr.char ¬ LOOPHOLE[calls.reqTBindAddress, UnixTypes.CHARPtr]};
  -- bind address returned by TBind
  calls.retTBindAddress ¬ NEW[XNS.Address];
  calls.retTBind ¬ NEW[TLI.TBindCall];
  calls.retTBind.addr.maxlen ¬ calls.retTBind.addr.len ¬ BYTES[XNS.Address];
  TRUSTED {calls.retTBind.addr.char ¬ LOOPHOLE[calls.retTBindAddress, UnixTypes.CHARPtr]};
  -- address given to TConnect
  calls.sendTCallAddress ¬ NEW[XNS.Address];
  calls.sendTCall ¬ NEW[TLI.TCall];
  calls.sendTCall.addr.maxlen ¬ calls.sendTCall.addr.len ¬ BYTES[XNS.Address];
  TRUSTED {calls.sendTCall.addr.char ¬ LOOPHOLE[calls.sendTCallAddress, UnixTypes.CHARPtr]};
  -- address answered by TConnect
  calls.rcvTCallAddress ¬ NEW[XNS.Address];
  calls.rcvTCall ¬ NEW[TLI.TCall];
  calls.rcvTCall.addr.maxlen ¬ calls.rcvTCall.addr.len ¬ BYTES[XNS.Address];
  TRUSTED {calls.rcvTCall.addr.char ¬ LOOPHOLE[calls.rcvTCallAddress, UnixTypes.CHARPtr]};
  };
-- Builds a handle: initializes its conditions and timeouts, gives it its free buffer chains, and starts the Pull and Push helper processes.
-- sendBuffers, recvBuffers: how many buffers to allocate for each direction.
-- The cd and calls fields are left NIL for the caller to fill in.
MakeHandle: PROC [sendBuffers, recvBuffers: CARD, getTimeout, putTimeout: Milliseconds]
  RETURNS [handle: Handle] ~ {
  handle ¬ NEW[HandleRep];
  handle.cd ¬ NIL; -- filled in by our caller since it was needed before we got called
  handle.calls ¬ NIL; -- filled in by our caller since it was needed before we got called
  InitConditions[handle: handle];
  InitTimeouts[handle, getTimeout, putTimeout];
  handle.inBuffers ¬ NIL;
  handle.inFreeBuffers ¬ AllocateBuffers[number: recvBuffers];
  handle.outputBuffers ¬ NIL;
  handle.outputFreeBuffers ¬ AllocateBuffers[number: sendBuffers];
  -- Count both helpers before either one exists. Counting after the FORK lets a helper that finishes early reach NotifyAuxProcDone while the count is still 0.
  handle.auxProcCnt ¬ handle.auxProcCnt + 2;
  Process.Detach[FORK Pull[handle]];
  Process.Detach[FORK Push[handle]];
  };
-- Enables aborts on every condition variable of the handle.
InitConditions: PROC [handle: Handle] ~ {
  TRUSTED {
    Process.EnableAborts[@handle.auxProcsDone]; -- ????
    Process.EnableAborts[@handle.inputReady];
    Process.EnableAborts[@handle.inputBufferFree];
    Process.EnableAborts[@handle.recvdOBAttn];
    Process.EnableAborts[@handle.outputReady];
    Process.EnableAborts[@handle.outputBufferFree];
    Process.EnableAborts[@handle.flusher];
    };
  };
-- Records the requested timeouts in the handle and sets the flusher condition to time out every flushTime ticks.
-- Only the flusher condition gets a timeout; the get and put values are stored but not applied to any condition here.
InitTimeouts: PROC [handle: Handle, getTimeout, putTimeout: Milliseconds] ~ {
  handle.getTimeout ¬ getTimeout; -- previously dropped on the floor
  handle.putTimeout ¬ putTimeout;
  TRUSTED {
    Process.SetTimeout[@handle.flusher, flushTime];
    };
  };
-- Allocates number packet buffers and returns them as one chain linked through next. Returns NIL when number is not positive.
AllocateBuffers: PROC [number: INT] RETURNS [buffer: Buffer ¬ NIL] = {
  FOR i: INT IN [0 .. number) DO
    new: Buffer ~ NEW[BufferRep];
    new.bufferRef ¬ NEW[TEXT[LAST[BufferIndex]]];
    new.buffer ¬ LOOPHOLE[new.bufferRef, POINTER] + BYTES[TEXT[0]]; -- to the bits
    new.charPTR ¬ LOOPHOLE[new.buffer, UnixTypes.CHARPtr];
    new.flags ¬ NEW[INT ¬ 0];
    -- Push onto the front. Walking the result variable toward the tail handed back only the last buffer and lost all the others.
    new.next ¬ buffer;
    buffer ¬ new;
    ENDLOOP;
  };
Buffer management
-- Takes one buffer off the input free chain, waiting until a buffer is available. The buffer comes back unlinked.
AllocateInputBuffer: ENTRY PROC [handle: Handle] RETURNS [buffer: Buffer]~ {
  ENABLE UNWIND => NULL;
  WHILE handle.inFreeBuffers = NIL DO -- we need to wait until one becomes available
    WAIT handle.inputBufferFree;
    ENDLOOP;
  buffer ¬ handle.inFreeBuffers;
  handle.inFreeBuffers ¬ buffer.next; -- who cares if it's nil
  buffer.next ¬ NIL; -- cut the link: a buffer that still points into the free chain drags the free buffers along when it is appended to the ready chain
  };
-- Puts buffer back on the front of the input free chain and wakes anyone waiting for a free buffer. The caller must hold the monitor.
FreeInputBuffer: INTERNAL PROC [handle: Handle, buffer: Buffer] = {
  ENABLE UNWIND => NULL;
  buffer.state ¬ empty;
  buffer.next ¬ handle.inFreeBuffers; -- the old head, possibly NIL, goes behind it
  handle.inFreeBuffers ¬ buffer;
  BROADCAST handle.inputBufferFree; -- tell someone who is waiting for a buffer
  };
-- Appends buffer to the end of the chain of input buffers ready for the client, and wakes the readers.
AppendInputBuffer: ENTRY PROC [handle: Handle, buffer: Buffer] = {
  ENABLE UNWIND => NULL;
  tmp: Buffer ¬ handle.inBuffers; -- the current head
  buffer.next ¬ NIL; -- it becomes the tail; a link left over from the free chain must not be carried into this one
  IF tmp = NIL THEN { -- first time
    handle.inBuffers ¬ buffer;
    BROADCAST handle.inputReady;
    RETURN;
    };
  UNTIL tmp.next = NIL DO -- find the end
    tmp ¬ tmp.next;
    ENDLOOP;
  tmp.next ¬ buffer; -- tack it on
  BROADCAST handle.inputReady;
  };
-- Removes and returns the first buffer of the ready input chain, waiting on inputReady while the chain is empty. The caller must hold the monitor.
GetFirstInputBuffer: INTERNAL PROC [handle: Handle] RETURNS [buffer: Buffer] = {
  ENABLE UNWIND => NULL;
  UNTIL handle.inBuffers # NIL DO -- nothing there yet
    WAIT handle.inputReady;
    ENDLOOP;
  buffer ¬ handle.inBuffers; -- take off the first one
  handle.inBuffers ¬ buffer.next; -- might be nil, but who cares
  };
-- Takes one buffer off the output free chain, waiting until a buffer is available. The buffer comes back unlinked. The caller must hold the monitor.
AllocateOutputBuffer: INTERNAL PROC [handle: Handle] RETURNS [buffer: Buffer]~ {
  ENABLE UNWIND => NULL;
  WHILE handle.outputFreeBuffers = NIL DO -- we need to wait until one becomes available
    WAIT handle.outputBufferFree;
    ENDLOOP;
  buffer ¬ handle.outputFreeBuffers;
  handle.outputFreeBuffers ¬ buffer.next; -- who cares if it's nil
  buffer.next ¬ NIL; -- cut the link: a buffer that still points into the free chain drags the free buffers along when it is appended to the ready chain
  };
-- Puts buffer back on the front of the output free chain and wakes anyone waiting for a free buffer.
FreeOutputBuffer: ENTRY PROC [handle: Handle, buffer: Buffer] = {
  ENABLE UNWIND => NULL;
  buffer.state ¬ empty;
  buffer.next ¬ handle.outputFreeBuffers; -- the old head, possibly NIL, goes behind it
  handle.outputFreeBuffers ¬ buffer;
  BROADCAST handle.outputBufferFree; -- tell someone who is waiting for a buffer
  };
-- Appends buffer to the end of the chain of output buffers ready for sending, and wakes whoever waits on outputReady. The caller must hold the monitor.
AppendOutputBuffer: INTERNAL PROC [handle: Handle, buffer: Buffer] = {
  ENABLE UNWIND => NULL;
  tmp: Buffer ¬ handle.outputBuffers; -- the current head
  buffer.next ¬ NIL; -- it becomes the tail; a link left over from the free chain must not be carried into this one
  IF tmp = NIL THEN { -- first time
    handle.outputBuffers ¬ buffer;
    BROADCAST handle.outputReady;
    RETURN;
    };
  UNTIL tmp.next = NIL DO -- find the end
    tmp ¬ tmp.next;
    ENDLOOP;
  tmp.next ¬ buffer; -- tack it on
  BROADCAST handle.outputReady;
  };
-- Removes and returns the first buffer of the ready output chain, waiting on outputReady while the chain is empty.
GetFirstOutputBuffer: ENTRY PROC [handle: Handle] RETURNS [buffer: Buffer] = {
  ENABLE UNWIND => NULL;
  UNTIL handle.outputBuffers # NIL DO -- nothing to send yet
    WAIT handle.outputReady;
    ENDLOOP;
  buffer ¬ handle.outputBuffers; -- take off the first one
  handle.outputBuffers ¬ buffer.next; -- might be nil, but who cares
  };
Process management
-- Called by a helper process as it finishes: lowers the count of live helpers and signals auxProcsDone when the last one is gone. Raises ERROR if the count is already 0.
NotifyAuxProcDone: ENTRY PROC [handle: Handle] ~ {
  ENABLE UNWIND => NULL;
  IF handle.auxProcCnt = 0 THEN ERROR; -- more helpers finished than were started
  handle.auxProcCnt ¬ handle.auxProcCnt - 1;
  IF handle.auxProcCnt = 0 THEN NOTIFY handle.auxProcsDone;
  };
-- Waits until every helper process of the handle has reported that it is done.
WaitAuxProcsDone: ENTRY PROC [handle: Handle] ~ {
  ENABLE UNWIND => NULL;
  UNTIL handle.auxProcCnt = 0 DO
    WAIT handle.auxProcsDone;
    ENDLOOP;
  };
Input (Pull) Process
-- The input helper process. It loops taking a free input buffer, receiving one packet into it, and queueing it for the client.
-- An ORDREL or DISCONNECT event reported by the receive ends the loop after the connection is closed; any other event raises BadSysCall.
-- NOTE(review): on the two exits below the handle is not marked closed and the buffer in hand is not freed; confirm that something outside this procedure wakes blocked readers.
Pull: PROC [handle: Handle] ~ {
  SetGetting: ENTRY PROC [handle: Handle] = {handle.inputWaiting ¬ TRUE};
  UnSetGetting: ENTRY PROC [handle: Handle] = {handle.inputWaiting ¬ FALSE};
  WHILE NOT handle.closed DO
    nRcv: INT;
    cd: TLI.ConnectionData ~ handle.cd;
    event: INT ¬ 0;
    look: BOOL ¬ FALSE; -- set when ReceiveBuffer reported an event instead of data
    buffer: Buffer ¬ AllocateInputBuffer[handle: handle]; -- get (wait) for a free buffer. This really doesn't allocate anything, but that's the function it performs
    SetGetting[handle: handle];
    nRcv ¬ ReceiveBuffer[cd: cd, buffer: buffer !
      LookEvent => {event ¬ lookEvent; look ¬ TRUE; CONTINUE}
      ]; -- get the packet
    UnSetGetting[handle: handle];
    IF NOT look
      THEN {
        ProcessBuffer[handle: handle, buffer: buffer, nRcv: nRcv];
        AppendInputBuffer[handle: handle, buffer: buffer];
        }
      ELSE {
        SELECT event FROM
          TLI.ORDREL => { -- the other end wants us to close in an orderly way
            PrintF["Orderly close requested"];
            OrderlyClose[handle: handle, initiate: FALSE]; -- set up for the close. This proc returns only when we are closed with the other end
            EXIT;
            };
          TLI.DISCONNECT => { -- the other end just wants us to go away
            PrintF["Disconnect requested"];
            BlastClose[handle: handle]; -- don't really care how we close, just close
            EXIT;
            };
          TLI.LISTEN => {ERROR BadSysCall["LISTEN"]}; -- a connect indication received
          TLI.CONNECT => {ERROR BadSysCall["CONNECT"]}; -- a connect confirmation received
          TLI.DATA => {ERROR BadSysCall["DATA"]}; -- normal data
          TLI.EXDATA => {ERROR BadSysCall["EXDATA"]}; -- expedited data
          TLI.TERROR => {ERROR BadSysCall["TERROR"]}; -- a real problem
          TLI.UDERR => {ERROR BadSysCall["UDERR"]}; -- should probably not happen either
          ENDCASE => {ERROR BadSysCall["ENDCASE"]}; -- unknown event type
        };
    ENDLOOP;
  NotifyAuxProcDone[handle];
  };
-- Receives one packet from the transport into buffer and returns the TRcv result.
-- A receive interrupted by EINTR with no event pending is retried. A pending event is reported by raising LookEvent with the TLook value. Every other failure raises BadSysCall, except TNODATA with EWOULDBLOCK, which returns the negative result.
-- Raises SystemError if the buffer handed in is not empty.
ReceiveBuffer: PROC [cd: TLI.ConnectionData, buffer: Buffer] RETURNS [INT] = {
  flags: REF INT ~ buffer.flags; -- will also contain the sst field
  nRcv: INT ¬ 0;
  IF buffer.state # empty THEN ERROR SystemError; -- raised explicitly, like every other error in this module. This is probably not right, but I don't know what to do for the moment
  buffer.state ¬ filling; -- set the flag for anyone that might be waiting on it
  DO -- the loop is in here because of the errno and # TLOOK stuff in the error handler
    nRcv ¬ TLI.TRcv[cd: cd, buf: buffer.charPTR, nbytes: packetSize, flags: flags];
    IF nRcv < 0 THEN { -- something is wrong
      error: UnixErrno.Errno ~ UnixErrno.GetErrno[];
      IF (error = EINTR) AND (cd.errno # TLOOK) THEN LOOP; -- interrupted with no event pending: try the receive again
      SELECT cd.errno FROM
        TNODATA => { -- this is set when the underlying GetMsg returns < 0 and EAGAIN is the errno. I think that this can't happen according to the SunOS documentation
          IF error = EWOULDBLOCK THEN RETURN[nRcv]; -- I'm not sure that this can happen under pcr since the XR¬GetMsg is done by pcr as blocked IO.
          ERROR BadSysCall["TNODATA"];
          };
        TSYSERR => { -- this is the general catch-all for things that have gone wrong. I don't think that this can be recovered from
          ERROR BadSysCall["TSYSERR"];
          };
        TLOOK => { -- an asynchronous event happened
          event: INT ~ TLI.TLook[cd: cd];
          ERROR LookEvent[event];
          };
        TBADF => -- a bad fd on TAccept -- ERROR BadSysCall["TBADF"];
        TOUTSTATE => -- reports out of state -- ERROR BadSysCall["TOUTSTATE"];
        TBUFOVFLW => -- control buffer too small -- ERROR BadSysCall["TBUFOVFLW"];
        TFLOW => { -- indicates that flow control is invoked on this call. I think that this needs to be retried
          ERROR BadSysCall["TFLOW"];
          };
        TNODIS => -- we didn't get the right answer on a disconnect -- ERROR BadSysCall["TNODIS"];
        TNOREL => -- we didn't get the orderly release -- ERROR BadSysCall["TNOREL"];
        TNOTSUPPORT => -- something is really wrong -- ERROR BadSysCall["TNOTSUPPORT"];
        TSTATECHNG => -- state is changing -- ERROR BadSysCall["TSTATECHNG"];
        ENDCASE => ERROR BadSysCall["something else"];
      }; -- something is wrong
    buffer.state ¬ full; -- set the flag for anyone that might be waiting on it
    RETURN[nRcv];
    ENDLOOP;
  };
ProcessBuffer: ENTRY PROC [handle: Handle, buffer: Buffer, nRcv: INT] = {
  -- Classifies a freshly received packet of nRcv bytes and fills in the buffer's bookkeeping fields.
  -- Plain data packets get their eom flag and length set.
  -- Attention packets arrive twice: the expedited (out-of-band) copy is recorded on the handle's OB attention list, and the in-band copy is tagged with a sequence number so the two can be correlated.
  -- In every case the buffer is marked full and handle.inputReady is broadcast.
  -- Raises SystemError if an attention packet does not hold exactly one byte.
  ENABLE UNWIND => NULL;
  cd: TLI.ConnectionData ~ handle.cd; -- not referenced below
  flags: REF INT ~ buffer.flags; -- also contains the sst field
  PrintF["ProcessBuffer"];
  PrintF[IO.PutFR1[" nRcv = %g ", [integer[nRcv]]]];
  buffer.index ¬ 0; -- start reading at the front of the packet
  buffer.bytes ¬ nRcv; -- number of bytes in the packet
  -- Determine the SubSequenceType of the incoming packet. The gray book says that this field is present even in attention packets, so pass it along
  buffer.sst ¬ GetSST[flags: flags­];
  IF NOT GetAttention[flags: flags­] THEN { -- an ordinary data packet; check for EOM
    buffer.eom ¬ NOT GetMore[flags: flags­];
    buffer.state ¬ full; -- mark our condition
    BROADCAST handle.inputReady;
    PrintF[IO.PutFR1["eom = %g\n", [boolean[buffer.eom]]]];
    buffer.bufferRef.length ¬ nRcv;
    RETURN;
    };
  -- From here on we are dealing with an attention byte.
  IF GetMore[flags: flags­]
    THEN { -- the expedited (OOB) copy of the attention
      obSequence: INT ~ handle.outOfBandAttnNumber;
      IF buffer.bytes # 1 THEN ERROR SystemError; -- an attention is supposed to be exactly one byte
      GotOBAttn[handle: handle, sequence: obSequence, type: LOOPHOLE[buffer.bufferRef[0]]]; -- the single data byte is the attention type
      handle.outOfBandAttnNumber ¬ handle.outOfBandAttnNumber + 1; -- get ready for the next one
      buffer.state ¬ full; -- mark our condition
      PrintF[" attention, OOB\n"];
      BROADCAST handle.inputReady;
      }
    ELSE { -- the second (in-band) copy
      ibSequence: INT ~ handle.inBandAttnNumber;
      IF buffer.bytes # 1 THEN ERROR SystemError; -- an attention is supposed to be exactly one byte
      buffer.attention ¬ TRUE; -- this is an attention packet
      buffer.sequence ¬ ibSequence; -- to correlate with the OOB one
      handle.inBandAttnNumber ¬ handle.inBandAttnNumber + 1;
      buffer.state ¬ full; -- mark our condition
      PrintF[" attention, inline\n"];
      BROADCAST handle.inputReady;
      };
  };
GotOBAttn: INTERNAL PROC [handle: Handle, sequence: INT, type: AttentionType] ~ {
  -- Appends a record of a received out-of-band attention (its sequence number and type) to the end of handle.recvdOBAttnList, then broadcasts handle.recvdOBAttn.
  -- INTERNAL: the caller must already hold the monitor lock on handle.
  tail: OBAttn ¬ NIL; -- last element of the existing list, NIL if the list is empty
  entry: OBAttn;
  FOR cur: OBAttn ¬ handle.recvdOBAttnList, cur.next WHILE cur # NIL DO
    tail ¬ cur;
    ENDLOOP;
  entry ¬ NEW[OBAttnObject ¬ [next: NIL, sequence: sequence, attentionType: type]];
  IF tail = NIL
    THEN handle.recvdOBAttnList ¬ entry -- list was empty; the new entry becomes the head
    ELSE tail.next ¬ entry;
  BROADCAST handle.recvdOBAttn;
  };
Output (Push) Process
Push: PROC [handle: Handle] ~ {
  -- Output process body: repeatedly takes the next ready output buffer, encodes its sst / eom / attention state into the TLI flags word, and sends it with TLI.TSnd.
  -- A failed or short send closes the stream abruptly and ends the loop.
  -- NotifyAuxProcDone is called on the way out in either case.
  -- There had best be only one of these per stream or things might get real confused.
  cd: TLI.ConnectionData ~ handle.cd;
  nSent: INT ¬ 0;
  UNTIL handle.closed DO
    outBuf: Buffer ¬ GetFirstOutputBuffer[handle: handle]; -- blocks until a buffer is ready to send
    sstBits: INT ~ SetSST[sst: outBuf.sst, flags: 0];
    moreBits: INT ~ SetMore[more: NOT outBuf.eom, flags: sstBits];
    sndFlags: INT ~ SetAttention[attention: outBuf.attention, flags: moreBits];
    nSent ¬ TLI.TSnd[cd: cd, buf: outBuf.charPTR, nbytes: outBuf.bytes, flags: sndFlags];
    IF nSent < 0 OR nSent # outBuf.bytes THEN { -- error or partial send: we need to disconnect
      BlastClose[handle: handle];
      handle.closed ¬ TRUE;
      EXIT;
      };
    FreeOutputBuffer[handle: handle, buffer: outBuf];
    ENDLOOP;
  NotifyAuxProcDone[handle];
  };
Useful things for dealing with the TLI
GetAttention: PROC [flags: INT] RETURNS [attention: BOOL] = {
  -- TRUE iff the TLI.EXPEDITED bit is set in flags.
  attention ¬ Basics.BITAND[flags, TLI.EXPEDITED] # 0;
  };
GetMore: PROC [flags: INT] RETURNS [more: BOOL] = {
  -- TRUE iff the TLI.MORE bit is set in flags.
  more ¬ Basics.BITAND[flags, TLI.MORE] # 0;
  };
GetSST: PROC [flags: INT] RETURNS [SubSequenceType] = {
  -- Extracts the sub-sequence type carried in bits 8..15 of flags.
  -- Raises SystemError if the extracted value exceeds 255 (the mask already limits it to one byte).
  sstBits: INT ~ Basics.BITAND[Basics.BITRSHIFT[value: flags, count: 8], 0FFh]; -- second byte of the flags word
  IF sstBits > 255 THEN ERROR SystemError;
  RETURN[LOOPHOLE[sstBits]];
  };
SetAttention: PROC [attention: BOOL, flags: INT ¬ 0] RETURNS [INT] = {
  -- Returns flags with the TLI.EXPEDITED bit turned on when attention is TRUE; otherwise returns flags unchanged.
  IF attention THEN RETURN[Basics.BITOR[flags, TLI.EXPEDITED]];
  RETURN[flags];
  };
SetMore: PROC [more: BOOL, flags: INT ¬ 0] RETURNS [INT] = {
  -- Returns flags with the TLI.MORE bit turned on when more is TRUE; otherwise returns flags unchanged.
  IF more THEN RETURN[Basics.BITOR[flags, TLI.MORE]];
  RETURN[flags];
  };
SetSST: PROC [sst: SubSequenceType, flags: INT ¬ 0] RETURNS [INT] = {
  -- Returns flags with sst stored in bits 8..15, the same field that GetSST reads back.
  -- The other bits of flags are passed through unchanged (the sst byte is OR-ed in, so any sst bits already present in flags are kept).
  -- Fix: the shift previously operated on flags instead of the sst value, so the sst argument was thrown away and never reached the flags word.
  sstField: INT ¬ sst;
  sstField ¬ Basics.BITLSHIFT[value: sstField, count: 8]; -- move the sst up 1 byte
  sstField ¬ Basics.BITAND[sstField, 0FF00h]; -- make sure that it's only the sst
  RETURN[Basics.BITOR[sstField, flags]];
  };
Debugging stuff
-- Direct binding to the external routine named "printf": format is passed as its only argument and the INT result is returned.
-- NOTE(review): no further arguments can be supplied through this binding, so format presumably must not contain conversions that consume arguments — confirm.
UnixPrintF: PROC [format: CStrings.CString] RETURNS [INT] = TRUSTED MACHINE CODE {"printf"};
PrintF: PROC [rope: ROPE] = {
  -- Debugging aid: converts rope to a C string and prints it via UnixPrintF, discarding the result.
  -- The rope is handed over as the format argument itself.
  [] ¬ UnixPrintF[UXStrings.Create[rope]];
  };
Start trap
-- Module start code: the only statement in the module body.
-- NOTE(review): IO.CreateStreamProcs is also listed in DIRECTORY; this argument-less, unqualified call presumably refers to a procedure of the same name defined earlier in this module — confirm.
CreateStreamProcs[];
END.