-- NetworkStreamTCPImpl.mesa
-- Copyright © 1989, 1992 by Xerox Corporation. All rights reserved.
-- Demers, September 20, 1989 8:16:49 am PDT
-- Christian Jacobi, July 24, 1992 2:04 pm PDT
-- Chauser, January 6, 1992 10:53 am PST
DIRECTORY
Basics,
Finalize,
IO,
NetworkStream,
NetworkStreamSupportTCP,
Process,
RefText,
Rope,
RuntimeError
;
NetworkStreamTCPImpl: CEDAR MONITOR
LOCKS d USING d: PrivateStreamData
IMPORTS Basics, Finalize, IO, NetworkStream, NetworkStreamSupportTCP, Process, RefText, RuntimeError
~ {
OPEN NS: NetworkStream, NSS: NetworkStreamSupportTCP;
-- Parameters
-- Protocol family under which RegisterSelf registers this implementation.
myProtocolFamily: ATOM ¬ $ARPA;
-- Transport classes served by this module; RegisterSelf makes one registration per atom.
myTransportClasses: LIST OF ATOM ¬ LIST[$TCP, $basicStream];
-- Lengths handed to NewPrivateStreamDataPair for the input and output buffers of each new stream pair.
defaultInBufferLength: CARDINAL ¬ 2048;
defaultOutBufferLength: CARDINAL ¬ 2048;
-- Types
-- Local short names for types declared in the imported interfaces.
CompletionCode: TYPE ~ NSS.CompletionCode;
ROPE: TYPE ~ Rope.ROPE;
STREAM: TYPE ~ IO.STREAM;
-- Stream Implementation
-- What FillBuf found after the most recent receive: nothing special, an attention, or end of stream.
BoundaryCondition: TYPE ~ { none, attention, endOfStream };
-- Per-direction stream state; the record is its own monitor (see the LOCKS clause of this module).
PrivateStreamData: TYPE ~ REF PrivateStreamDataRecord;
PrivateStreamDataRecord: TYPE ~ MONITORED RECORD [
-- Descriptor shared by the input and output halves of one connection.
sd: NSS.StreamDescriptor,
-- The record for the opposite direction; MustDoClose sets it to NIL when this half is closed.
sibling: PrivateStreamData,
-- Count of characters read from (input) or written to (output) this stream so far.
streamIndex: INT ¬ 0,
-- Staging buffer; for output, characters in [0..length) are waiting to be sent.
buffer: REF TEXT ¬ NIL,
-- For input, index of the next unread character in buffer; for output, passed to NSS.Send as the start index.
bufferIndex: CARDINAL ¬ 0,
-- Set by MustDoClose once this half has been closed locally.
localClose: BOOL ¬ FALSE,
-- Set by FillBuf; a value other than none stops further reads once the buffer is drained.
boundaryCondition: BoundaryCondition ¬ none
];
NewPrivateStreamDataPair: PROC [sd: NSS.StreamDescriptor, inBufLen: CARDINAL, outBufLen: CARDINAL] RETURNS [dIn, dOut: PrivateStreamData] ~ {
  -- Builds the input and output state records for one connection.
  -- Both records share the descriptor sd, each gets its own freshly allocated buffer
  -- (inBufLen / outBufLen long), and the two are cross-linked through their sibling fields.
  dIn ¬ NEW[ PrivateStreamDataRecord ¬ [sd~sd] ];
  dOut ¬ NEW[ PrivateStreamDataRecord ¬ [sd~sd] ];
  -- Cross-link the two halves so that close can tell when both are done.
  dOut.sibling ¬ dIn;
  dIn.sibling ¬ dOut;
  -- Attach the buffers.
  dIn.buffer ¬ RefText.New[inBufLen];
  dOut.buffer ¬ RefText.New[outBufLen];
  };
NetworkStreamDataFromStream: PROC [s: STREAM] RETURNS [NS.NetworkStreamData] ~ INLINE {
  -- Recovers the NetworkStream-level data stored in the stream's streamData field.
  nsd: NS.NetworkStreamData ~ NARROW[s.streamData];
  RETURN [nsd] };
PrivateStreamDataFromStream: PROC [s: STREAM] RETURNS [PrivateStreamData] ~ INLINE {
  -- Two-step narrowing: stream -> NetworkStream data -> this module's private record.
  nsd: NS.NetworkStreamData ~ NARROW[s.streamData];
  RETURN [NARROW[nsd.data]] };
PrivateStreamDataFromNetworkStreamData: PROC [d: NS.NetworkStreamData] RETURNS [PrivateStreamData] ~ INLINE {
  -- Narrows the data field of a NetworkStream data record to this module's private record.
  private: PrivateStreamData ~ NARROW[d.data];
  RETURN [private] };
RaiseClosed: PROC [s: STREAM] ~ {
  -- Reports, through NS.RaiseIOError, that s has already been closed locally.
  codes: LIST OF ATOM ~ LIST[$streamClosed];
  NS.RaiseIOError[StreamClosed, s, codes, "stream closed"];
  };
RaiseEndOf: PROC [s: STREAM] ~ {
  -- Reports, through NS.RaiseIOError, an attempt to read past a boundary condition on s.
  codes: LIST OF ATOM ~ LIST[$endOfStream];
  NS.RaiseIOError[Failure, s, codes, "end of stream"];
  };
RaiseErrorFromCC: PROC [s: STREAM, cc: NSS.CompletionCode, codes: LIST OF ATOM, msg: ROPE] ~ {
  -- Turns a failing completion code into an I/O error raised through NS.RaiseIOError.
  -- The atom for cc is put in front of the caller's codes; when there is no stream (s = NIL)
  -- and the failure is not a timeout, $networkStreamError is put in front of that.
  reason: ATOM ~ NSS.AtomFromCC[cc];
  allCodes: LIST OF ATOM ¬ CONS[reason, codes];
  IF (s = NIL) AND (reason # $timeout) THEN allCodes ¬ CONS[$networkStreamError, allCodes];
  NS.RaiseIOError[Failure, s, allCodes, msg];
  };
-- Opens a connection to remote and returns the stream pair that wraps it.
-- Raises an I/O error (via RaiseErrorFromCC, with no stream) if the connection cannot be made
-- or its addresses cannot be read.
MyRegisteredCreateStreamsProc: NS.RegisteredCreateStreamsProc -- [registration, remote, timeout, transportParameters] RETURNS [in, out] -- ~ {
sd: NSS.StreamDescriptor ¬ NIL;
{
-- If anything below raises, the descriptor (if one was obtained) is destroyed with the second argument TRUE.
ENABLE UNWIND => {
IF sd # NIL THEN [] ¬ NSS.DestroyStreamDescriptor[sd, TRUE];
};
cc: NSS.CompletionCode;
tRemote, tLocal: ROPE;
nsdIn, nsdOut: NS.NetworkStreamData;
[cc, sd] ¬ NSS.CreateStreamDescriptor[remote, timeout];
IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[NIL, cc, LIST[$connect], "can't connect"];
[cc, tLocal, tRemote] ¬ NSS.AddressesFromStreamDescriptor[sd];
IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[NIL, cc, LIST[$getStreamInfo], "can't get local/remote address"];
[in, out] ¬ NS.CreateUninitializedStreams[registration];
nsdIn ¬ NARROW[in.streamData];
nsdOut ¬ NARROW[out.streamData];
-- Both directions report the same pair of addresses.
nsdIn.local ¬ nsdOut.local ¬ tLocal;
nsdIn.remote ¬ nsdOut.remote ¬ tRemote;
-- Attach this module's private state; from here on the descriptor is reachable through the streams.
[dIn~nsdIn.data, dOut~nsdOut.data] ¬ NewPrivateStreamDataPair[sd, defaultInBufferLength, defaultOutBufferLength];
};
};
-- Refills d.buffer with one NSS.Receive call and records the boundary condition that followed the data.
-- Every caller in this file is an ENTRY procedure, so d's monitor lock is held on entry.
-- s is used only for error reporting. Raises an I/O error (via RaiseErrorFromCC) if Receive or AtIBAttn fails.
FillBuf: PROC [s: STREAM, d: PrivateStreamData] ~ {
b: REF TEXT ¬ d.buffer;
cc: NSS.CompletionCode;
nBytes: INT;
atAttn: BOOL;
-- The buffer must be fully consumed; refilling would otherwise overwrite unread characters.
IF d.bufferIndex < b.length THEN ERROR;
TRUSTED {
[cc, nBytes] ¬ NSS.Receive[
d.sd,
[
-- Address SIZE[TEXT[0]] past the start of the TEXT object; presumably its first character (TODO confirm layout).
base~LOOPHOLE[ LOOPHOLE[b, LONG POINTER]+SIZE[TEXT[0]] ],
startIndex~0,
count~b.maxLength
]
];
};
IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[s, cc, LIST[$Read], "read error"];
-- A short read is the only case in which a boundary condition is looked for.
IF nBytes < b.maxLength
THEN {
[cc, atAttn] ¬ NSS.AtIBAttn[d.sd];
IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[s, cc, LIST[$AtIBAttn], "read error (AtIBAttn)"];
-- Attention takes precedence; an empty read without attention is treated as end of stream.
d.boundaryCondition ¬ SELECT TRUE FROM
atAttn => attention,
nBytes = 0 => endOfStream,
ENDCASE => none;
}
ELSE {
d.boundaryCondition ¬ none;
};
-- Publish the new contents: reading restarts at index 0 with nBytes valid characters.
d.bufferIndex ¬ 0;
b.length ¬ nBytes;
};
-- Sends the characters of d.buffer in [d.bufferIndex .. length) with one NSS.Send call, then empties the buffer.
-- Every caller in this file is an ENTRY procedure, so d's monitor lock is held on entry.
-- Raises an I/O error (via RaiseErrorFromCC) if Send fails; in that case the buffer is left untouched.
FlushBuf: PROC [s: STREAM, d: PrivateStreamData] ~ {
b: REF TEXT ¬ d.buffer;
cc: NSS.CompletionCode;
TRUSTED {
cc ¬ NSS.Send[
d.sd,
[
-- Address SIZE[TEXT[0]] past the start of the TEXT object; presumably its first character (TODO confirm layout).
base~LOOPHOLE[ LOOPHOLE[b, LONG POINTER]+SIZE[TEXT[0]] ],
startIndex~d.bufferIndex,
count~b.length-d.bufferIndex
]
];
};
IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[s, cc, LIST[$Write], "write error"];
-- Everything was handed to Send; reset to an empty buffer.
d.bufferIndex ¬ b.length ¬ 0;
};
MyEntryGetChar: ENTRY PROC [s: STREAM, d: PrivateStreamData] RETURNS [c: CHAR] ~ {
  -- Returns the next input character, refilling the buffer from the network as needed.
  -- Raises "stream closed" if the stream was closed locally, and "end of stream" if the
  -- buffer is empty and the last fill ended at a boundary condition.
  ENABLE UNWIND => NULL;
  text: REF TEXT ~ d.buffer;
  next: CARDINAL;
  DO
    IF d.localClose THEN RaiseClosed[s];
    next ¬ d.bufferIndex;
    IF next < text.length THEN EXIT;
    -- Buffer exhausted: only refill if no boundary condition is pending.
    IF d.boundaryCondition # none THEN RaiseEndOf[s];
    FillBuf[s, d];
    ENDLOOP;
  c ¬ text[next];
  d.bufferIndex ¬ next + 1;
  d.streamIndex ¬ d.streamIndex + 1;
  };
MyGetChar: PROC [self: STREAM] RETURNS [CHAR] ~ {
  -- Stream-procs entry point: looks up the private state and does the work under its lock.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  RETURN [MyEntryGetChar[self, d]];
  };
-- Copies up to block.count characters from the stream into block, starting at block.startIndex.
-- Returns the number of characters copied, which is less than block.count only if a boundary
-- condition is reached first. Raises "stream closed" if the stream was closed locally.
MyEntryUnsafeGetBlock: ENTRY PROC [s: STREAM, d: PrivateStreamData, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT ¬ 0] ~ CHECKED {
ENABLE UNWIND => NULL;
b: REF TEXT ¬ d.buffer;
i: CARD;
len: INT;
nBytesMoreWanted: INT ¬ block.count;
blockIndex: INT ¬ block.startIndex;
DO
IF d.localClose THEN RaiseClosed[s];
IF nBytesMoreWanted <= 0 THEN RETURN;
i ¬ d.bufferIndex;
-- Characters still unread in the buffer.
len ¬ INT[b.length] - INT[i];
IF len <= 0 THEN {
-- Buffer drained: stop at a boundary condition, otherwise refill and try again.
IF d.boundaryCondition # none THEN RETURN;
FillBuf[s, d];
LOOP;
};
IF len > nBytesMoreWanted THEN len ¬ nBytesMoreWanted;
TRUSTED {
-- ByteBlt returns the count it moved; that count drives all the bookkeeping below.
len ¬ Basics.ByteBlt[
to~[
blockPointer~LOOPHOLE[block.base],
startIndex~blockIndex,
stopIndexPlusOne~blockIndex+len
],
from~[
-- Address SIZE[TEXT[0]] past the start of the TEXT object; presumably its first character (TODO confirm layout).
blockPointer~LOOPHOLE[b, LONG POINTER]+SIZE[TEXT[0]],
startIndex~i,
stopIndexPlusOne~i+len
]
];
};
d.bufferIndex ¬ i + len;
d.streamIndex ¬ d.streamIndex + len;
nBytesRead ¬ nBytesRead + len;
blockIndex ¬ blockIndex + len;
nBytesMoreWanted ¬ nBytesMoreWanted - len;
ENDLOOP;
};
MyUnsafeGetBlock: UNSAFE PROC [self: STREAM, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT] ~ {
  -- Stream-procs entry point for block reads.
  -- Rejects a negative start index or count with RuntimeError.BoundsFault before taking the lock.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  IF block.startIndex < 0 THEN ERROR RuntimeError.BoundsFault;
  IF block.count < 0 THEN ERROR RuntimeError.BoundsFault;
  nBytesRead ¬ MyEntryUnsafeGetBlock[self, d, block];
  };
MyEntryEndOf: ENTRY PROC [s: STREAM, d: PrivateStreamData] RETURNS [BOOL] ~ {
  -- TRUE exactly when the buffer is drained and the last fill ended at a boundary condition.
  -- Never touches the network. Raises "stream closed" if the stream was closed locally.
  ENABLE UNWIND => NULL;
  IF d.localClose THEN RaiseClosed[s];
  RETURN [(d.bufferIndex >= d.buffer.length) AND (d.boundaryCondition # none)];
  };
MyEndOf: PROC [self: STREAM] RETURNS [BOOL] ~ {
  -- Stream-procs entry point: looks up the private state and asks under its lock.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  RETURN [MyEntryEndOf[self, d]];
  };
MyEntryCharsAvail: ENTRY PROC [s: STREAM, d: PrivateStreamData, wait: BOOL] RETURNS [INT] ~ {
  -- Reports how many characters can be read without going to the network.
  -- Returns the unread count if the buffer is non-empty, INT.LAST if the buffer is empty at a
  -- boundary condition, and 0 if nothing is ready and wait is FALSE. Otherwise it fills the
  -- buffer and re-evaluates. Raises "stream closed" if the stream was closed locally.
  ENABLE UNWIND => NULL;
  text: REF TEXT ~ d.buffer;
  unread: INT;
  cc: NSS.CompletionCode;
  ready: BOOL;
  DO
    IF d.localClose THEN RaiseClosed[s];
    unread ¬ INT[text.length] - INT[d.bufferIndex];
    SELECT TRUE FROM
      unread > 0 => RETURN [unread];
      d.boundaryCondition # none => RETURN [INT.LAST];
      ENDCASE => NULL;
    [cc, ready] ¬ NSS.InputReady[d.sd, FALSE];
    -- NOTE(review): this is the only per-stream operation that reports its failure with a NIL
    -- stream instead of s; confirm whether that is intended.
    IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[NIL, cc, LIST[$CharsAvail], "can't test input ready"];
    IF NOT (ready OR wait) THEN RETURN[0];
    FillBuf[s, d];
    ENDLOOP;
  };
MyCharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [INT] ~ {
  -- Stream-procs entry point: looks up the private state and asks under its lock.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  RETURN [MyEntryCharsAvail[self, d, wait]];
  };
MyEntryPutChar: ENTRY PROC [s: STREAM, d: PrivateStreamData, char: CHAR] ~ {
  -- Appends char to the output buffer, sending the buffer first whenever it is full.
  -- Raises "stream closed" if the stream was closed locally.
  ENABLE UNWIND => NULL;
  text: REF TEXT ~ d.buffer;
  slot: CARDINAL;
  DO
    IF d.localClose THEN RaiseClosed[s];
    slot ¬ text.length;
    IF slot < text.maxLength THEN EXIT;
    -- No room left: push the buffered characters out and look again.
    FlushBuf[s, d];
    ENDLOOP;
  d.streamIndex ¬ d.streamIndex + 1;
  text.length ¬ slot + 1;
  text[slot] ¬ char;
  };
MyPutChar: PROC [self: STREAM, char: CHAR] ~ {
  -- Stream-procs entry point: looks up the private state and writes under its lock.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  MyEntryPutChar[self, d, char];
  };
-- Copies block.count characters, starting at block.startIndex, from block into the output buffer,
-- sending the buffer each time it fills up. Raises "stream closed" if the stream was closed locally.
MyEntryUnsafePutBlock: ENTRY PROC [s: STREAM, d: PrivateStreamData, block: IO.UnsafeBlock] ~ CHECKED {
ENABLE UNWIND => NULL;
b: REF TEXT ¬ d.buffer;
i: CARD;
len: INT;
nBytesMoreToWrite: INT ¬ block.count;
blockIndex: INT ¬ block.startIndex;
DO
IF d.localClose THEN RaiseClosed[s];
IF nBytesMoreToWrite <= 0 THEN RETURN;
i ¬ b.length;
-- Free space remaining in the buffer.
len ¬ INT[b.maxLength] - INT[i];
IF len <= 0 THEN {
-- Buffer full: send it and try again.
FlushBuf[s, d];
LOOP;
};
IF len > nBytesMoreToWrite THEN len ¬ nBytesMoreToWrite;
TRUSTED {
-- ByteBlt returns the count it moved; that count drives all the bookkeeping below.
len ¬ Basics.ByteBlt[
to~[
-- Address SIZE[TEXT[0]] past the start of the TEXT object; presumably its first character (TODO confirm layout).
blockPointer~LOOPHOLE[b, LONG POINTER]+SIZE[TEXT[0]],
startIndex~i,
stopIndexPlusOne~i+len
],
from~[
blockPointer~LOOPHOLE[block.base],
startIndex~blockIndex,
stopIndexPlusOne~blockIndex+len
]
];
};
b.length ¬ i + len;
d.streamIndex ¬ d.streamIndex + len;
blockIndex ¬ blockIndex + len;
nBytesMoreToWrite ¬ nBytesMoreToWrite - len;
ENDLOOP;
};
MyUnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] ~ {
  -- Stream-procs entry point for block writes.
  -- Rejects a negative start index or count with RuntimeError.BoundsFault before taking the lock.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  IF block.startIndex < 0 THEN ERROR RuntimeError.BoundsFault;
  IF block.count < 0 THEN ERROR RuntimeError.BoundsFault;
  MyEntryUnsafePutBlock[self, d, block];
  };
MyEntryFlush: ENTRY PROC [s: STREAM, d: PrivateStreamData] ~ {
  -- Sends whatever is waiting in the output buffer; does nothing when the buffer is empty.
  -- Raises "stream closed" if the stream was closed locally.
  ENABLE UNWIND => NULL;
  pending: REF TEXT ~ d.buffer;
  IF d.localClose THEN RaiseClosed[s];
  IF pending.length > 0 THEN FlushBuf[s, d];
  };
MyFlush: PROC [self: STREAM] ~ {
  -- Stream-procs entry point: looks up the private state and flushes under its lock.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  MyEntryFlush[self, d];
  };
MyEntryInputReset: ENTRY PROC [s: STREAM, d: PrivateStreamData] ~ {
  -- Discards input: drops what is buffered and keeps receiving and dropping until a fill ends
  -- at a boundary condition. Discarded characters still count towards streamIndex.
  -- Returns quietly (no error) if the stream was closed locally.
  ENABLE UNWIND => NULL;
  unread: CARDINAL;
  DO
    IF d.localClose THEN RETURN;
    unread ¬ d.buffer.length - d.bufferIndex;
    IF unread > 0 THEN {
      d.streamIndex ¬ d.streamIndex + unread;
      d.bufferIndex ¬ d.bufferIndex + unread;
      };
    IF d.boundaryCondition # none THEN RETURN;
    FillBuf[s, d];
    ENDLOOP;
  };
MyInputReset: PROC [self: STREAM] ~ {
  -- Stream-procs entry point: looks up the private state and resets under its lock.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  MyEntryInputReset[self, d];
  };
-- A record used only for its monitor lock: it is the default lock argument of MustDoClose,
-- so every close in this module is serialized on it.
specialLockForMustDoClose: PrivateStreamData ¬ NEW[ PrivateStreamDataRecord ¬ []];
MustDoClose: ENTRY PROC [real<<to be closed>>: PrivateStreamData, d<<special lock>>: PrivateStreamData ¬ specialLockForMustDoClose] RETURNS [doClose: BOOL] ~ {
  -- Marks real as locally closed and decides whether the caller must destroy the shared descriptor.
  -- Runs under the monitor of d (by default the single module-wide close lock), not under real's own lock.
  -- Returns FALSE if real was already closed, or if its sibling is still open; returns TRUE only
  -- when the sibling had already been closed, i.e. the caller is closing the last half.
  -- Raises ERROR if real is not yet closed but has no sibling.
  --
  -- The UNWIND catch phrase makes sure the lock is given up if that ERROR unwinds through here,
  -- as in every other ENTRY procedure of this module; without it the shared close lock would
  -- stay held and block every later close.
  ENABLE UNWIND => NULL;
  mySibling: PrivateStreamData;
  IF real.localClose THEN RETURN[FALSE];
  real.localClose ¬ TRUE;
  IF (mySibling ¬ real.sibling) = NIL THEN ERROR;
  -- Drop this half's link; the sibling's link back to real is left in place.
  real.sibling ¬ NIL;
  RETURN[ mySibling.localClose ];
  };
MyIOClose: PROC [self: STREAM, abort: BOOL] ~ {
  -- Closes this half of the connection. The shared descriptor is destroyed only when
  -- MustDoClose says this was the last half still open. A failure while destroying is
  -- reported as an I/O error unless abort is TRUE.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  cc: NSS.CompletionCode;
  IF MustDoClose[d] THEN {
    [cc] ¬ NSS.DestroyStreamDescriptor[d.sd, abort];
    IF NSS.FailureCC[cc] AND (NOT abort) THEN RaiseErrorFromCC[self, cc, LIST[$Close], "I/O error in close"];
    };
  };
MyOutputClose: PROC [self: STREAM, abort: BOOL] ~ {
  -- Close for the output half: flush buffered output first (unless aborting), then close.
  -- Does nothing if the stream was already closed locally.
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  IF NOT d.localClose THEN {
    -- No catch phrase is attached to the flush (an earlier one, ! IO.Error, NS.Timeout => CONTINUE,
    -- was left commented out), so a failing flush propagates and the close below is skipped.
    IF NOT abort THEN IO.Flush[self];
    MyIOClose[self, abort];
    };
  };
MyEntryGetIndex: ENTRY PROC [s: STREAM, d: PrivateStreamData] RETURNS [INT] ~ {
  -- Returns the running character count of the stream.
  -- Raises "stream closed" if the stream was closed locally.
  ENABLE UNWIND => NULL;
  position: INT;
  IF d.localClose THEN RaiseClosed[s];
  position ¬ d.streamIndex;
  RETURN[position];
  };
MyGetIndex: PROC [self: STREAM] RETURNS [INT] ~ {
  -- Stream-procs entry point shared by the input and output halves.
  RETURN[MyEntryGetIndex[self, PrivateStreamDataFromStream[self]]];
  };
-- IO stream procedures for the input half of a connection; installed by RegisterSelf.
myInStreamProcs: REF IO.StreamProcs ¬ IO.CreateStreamProcs[
variety~input,
class~$networkStream,
getChar~MyGetChar,
unsafeGetBlock~MyUnsafeGetBlock,
endOf~MyEndOf,
charsAvail~MyCharsAvail,
reset~MyInputReset,
-- Input close has nothing to flush, so it goes straight to MyIOClose.
close~MyIOClose,
getIndex~MyGetIndex
];
-- IO stream procedures for the output half of a connection; installed by RegisterSelf.
myOutStreamProcs: REF IO.StreamProcs ¬ IO.CreateStreamProcs[
variety~output,
class~$networkStream,
putChar~MyPutChar,
unsafePutBlock~MyUnsafePutBlock,
flush~MyFlush,
-- Output close flushes (unless aborting) before closing; see MyOutputClose.
close~MyOutputClose,
getIndex~MyGetIndex
];
MyEntrySetTimeout: ENTRY PROC [s: STREAM, nsd: NS.NetworkStreamData, d: PrivateStreamData, timeout: NS.Milliseconds, signalTimeout: BOOL, output: BOOL] ~ {
  -- Records the timeout settings in nsd and applies the timeout to the descriptor:
  -- the put timeout when output is TRUE, the get timeout otherwise.
  -- Raises "stream closed" if the stream was closed locally, or an I/O error if the
  -- descriptor rejects the new timeout (nsd has already been updated by then).
  ENABLE UNWIND => NULL;
  cc: NSS.CompletionCode;
  IF d.localClose THEN RaiseClosed[s];
  nsd.timeout ¬ timeout;
  nsd.signalTimeout ¬ signalTimeout;
  cc ¬ (IF output THEN NSS.SetPutTimeout[d.sd, timeout] ELSE NSS.SetGetTimeout[d.sd, timeout]);
  IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[s, cc, LIST[$SetTimeout], "can't set timeout"];
  };
MyInputSetTimeout: NS.SetTimeoutProc -- [stream, timeout, signalTimeout] -- ~ {
  -- Sets the get (receive) timeout of an input stream.
  nsd: NS.NetworkStreamData ~ NARROW[stream.streamData];
  d: PrivateStreamData ~ PrivateStreamDataFromNetworkStreamData[nsd];
  MyEntrySetTimeout[s~stream, nsd~nsd, d~d, timeout~timeout, signalTimeout~signalTimeout, output~FALSE];
  };
MyOutputSetTimeout: NS.SetTimeoutProc -- [stream, timeout, signalTimeout] -- ~ {
  -- Sets the put (send) timeout of an output stream.
  nsd: NS.NetworkStreamData ~ NARROW[stream.streamData];
  d: PrivateStreamData ~ PrivateStreamDataFromNetworkStreamData[nsd];
  MyEntrySetTimeout[s~stream, nsd~nsd, d~d, timeout~timeout, signalTimeout~signalTimeout, output~TRUE];
  };
MyEntryGetIndexDetails: ENTRY PROC [s: STREAM, d: PrivateStreamData] RETURNS [index, bytesInBuffer: INT] ~ {
  -- Takes a consistent snapshot of the stream position and of the number of characters
  -- between bufferIndex and the end of the buffer contents.
  -- Raises "stream closed" if the stream was closed locally.
  ENABLE UNWIND => NULL;
  IF d.localClose THEN RaiseClosed[s];
  index ¬ d.streamIndex;
  bytesInBuffer ¬ INT[d.buffer.length] - INT[d.bufferIndex];
  };
MyInputGetIndexDetails: NS.GetIndexDetailsProc -- [stream] RETURNS [index, bufferIndex, ackIndex] -- ~ {
  -- Input side: the second result runs ahead of index by the characters received but not yet read.
  -- The third result is always -1.
  buffered: INT;
  [index, buffered] ¬ MyEntryGetIndexDetails[stream, PrivateStreamDataFromStream[stream]];
  RETURN[ index, index+buffered, -1 ];
  };
MyOutputGetIndexDetails: NS.GetIndexDetailsProc -- [stream] RETURNS [index, bufferIndex, ackIndex] -- ~ {
  -- Output side: the second result lags index by the characters written but still in the buffer.
  -- The third result is always -1.
  buffered: INT;
  [index, buffered] ¬ MyEntryGetIndexDetails[stream, PrivateStreamDataFromStream[stream]];
  RETURN[ index, index-buffered, -1 ];
  };
MyEntryGetStreamState: ENTRY PROC [s: STREAM, d: PrivateStreamData, reset: BOOL] RETURNS [ss: NS.StreamState] ~ {
  -- Maps the input half's state to an NS.StreamState.
  -- While unread characters remain the stream is simply open; after that the pending boundary
  -- condition decides. With reset TRUE a pending attention is cleared as it is reported.
  -- Raises "stream closed" if the stream was closed locally.
  ENABLE UNWIND => NULL;
  IF d.localClose THEN RaiseClosed[s];
  IF d.bufferIndex < d.buffer.length THEN RETURN [ss~open];
  SELECT d.boundaryCondition FROM
    none => RETURN [ss~open];
    attention => {
      IF reset THEN d.boundaryCondition ¬ none;
      RETURN [ss~attention];
      };
    endOfStream => RETURN [ss~remoteClosed];
    ENDCASE => ERROR;
  };
MyGetStreamState: NS.GetStreamStateProc -- [in, reset] RETURNS [streamState, subStreamType, attentionType] -- ~ {
  -- Reports the state of the input stream; this implementation always reports 0 for both
  -- the sub-stream type and the attention type.
  subStreamType ¬ 0;
  attentionType ¬ 0;
  streamState ¬ MyEntryGetStreamState[in, PrivateStreamDataFromStream[in], reset];
  };
MyWaitAttention: NS.WaitAttentionProc -- [in, timeout] RETURNS [attentionType] -- ~ {
  -- Waits, through NSS.WaitOOBAttn, for an attention on the input stream or for the timeout.
  -- Raises "stream closed" if the stream was already closed locally, or an I/O error if the wait fails.
  --
  -- This doesn't need to (and better not) be an ENTRY proc!
  -- (Presumably because the wait would otherwise hold the stream's monitor lock for its
  -- whole duration; TODO confirm.)
  d: PrivateStreamData ~ PrivateStreamDataFromStream[in];
  cc: NSS.CompletionCode;
  at: NS.AttentionType;
  -- Read without the lock; a close racing with this check is caught by the wait failing.
  IF d.localClose THEN RaiseClosed[in];
  [cc, at] ¬ NSS.WaitOOBAttn[d.sd, timeout];
  IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[in, cc, LIST[$WaitAttn], "error in wait attention proc"];
  -- NOTE(review): the attention type delivered in at is discarded and 0 is returned, matching
  -- MyGetStreamState which also always reports 0; confirm that this is intended.
  RETURN [0];
  };
MySendAttention: NS.SendAttentionProc -- [out, attentionType] -- ~ {
  -- Sends an attention of the given type on the output stream through NSS.SendAttn.
  -- Buffered output is not flushed here. Raises "stream closed" if the stream was already
  -- closed locally, or an I/O error if the send fails.
  --
  -- This doesn't need to be an ENTRY proc!
  d: PrivateStreamData ~ PrivateStreamDataFromStream[out];
  cc: NSS.CompletionCode;
  -- Read without the lock; a close racing with this check is caught by the send failing.
  IF d.localClose THEN RaiseClosed[out];
  [cc] ¬ NSS.SendAttn[d.sd, attentionType];
  IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[out, cc, LIST[$SendAttn], "error in send attention proc"];
  };
-- NetworkStream-level procedures for the input half; fields not listed here keep the record's defaults.
myInNetworkStreamProcs: NS.NetworkStreamProcs ¬ NEW[
NS.NetworkStreamProcsRecord ¬ [
setTimeout~MyInputSetTimeout,
getIndexDetails~MyInputGetIndexDetails,
getStreamState~MyGetStreamState,
waitAttention~MyWaitAttention
]
];
-- NetworkStream-level procedures for the output half; fields not listed here keep the record's defaults.
myOutNetworkStreamProcs: NS.NetworkStreamProcs ¬ NEW[
NS.NetworkStreamProcsRecord ¬ [
setTimeout~MyOutputSetTimeout,
getIndexDetails~MyOutputGetIndexDetails,
sendAttention~MySendAttention
]
];
-- Listener Implementation
-- This module's private state for one listener, stored in the listener's data field.
PrivateListenerData: TYPE ~ REF PrivateListenerDataRecord;
PrivateListenerDataRecord: TYPE ~ RECORD [
-- Descriptor on which ListenerDaemon accepts connections.
ld: NSS.ListenerDescriptor,
-- The forked ListenerDaemon process; aborted and joined by MyDestroyListener.
daemon: PROCESS,
-- Set (once) by TestAndSetListenerDestroyed.
destroyed: BOOL ¬ FALSE
];
-- Body of the process forked for each listener: accepts connections forever, wraps each one in
-- a stream pair and forks the listener's worker procedure on it. Ends when the process is aborted.
-- h is a finalization handle for the listener; the listener itself is only referenced briefly,
-- presumably so that this process does not keep it alive (TODO confirm against Finalize).
ListenerDaemon: PROC [h: Finalize.Handle -- for listener -- ] ~ {
ENABLE ABORTED => CONTINUE;
listener: NS.Listener;
d: PrivateListenerData;
r: NS.Registration;
cc: NSS.CompletionCode ¬ NSS.successCC;
sd: NSS.StreamDescriptor ¬ NIL;
tLocal, tRemote: ROPE;
in, out: STREAM;
nsdIn, nsdOut: NS.NetworkStreamData;
-- Runs at the top of every iteration and on unwind: pauses after a failure and destroys
-- any descriptor that was accepted but not yet handed over to a stream pair.
CleanUp: PROC ~ {
IF NSS.FailureCC[cc] THEN Process.PauseMsec[50]; -- reduce CPU load
IF sd # NIL THEN [] ¬ NSS.DestroyStreamDescriptor[sd, TRUE];
sd ¬ NIL;
};
listener ¬ NARROW[Finalize.HandleToObject[h]];
d ¬ NARROW[listener.data];
r ¬ listener.registration;
-- Drop the reference; only d and r are needed while waiting for connections.
listener ¬ NIL;
{ ENABLE UNWIND => CleanUp[];
DO
CleanUp[];
[cc, sd] ¬ NSS.AcceptConnection[d.ld];
IF NSS.FailureCC[cc] THEN { sd ¬ NIL -- (unnecessary) --; LOOP };
[cc, tLocal, tRemote] ¬ NSS.AddressesFromStreamDescriptor[sd];
-- On failure sd is still set, so the CleanUp at the top of the loop destroys it.
IF NSS.FailureCC[cc] THEN LOOP;
[in, out] ¬ NS.CreateUninitializedStreams[r];
nsdIn ¬ NARROW[in.streamData];
nsdOut ¬ NARROW[out.streamData];
nsdIn.local ¬ nsdOut.local ¬ tLocal;
nsdIn.remote ¬ nsdOut.remote ¬ tRemote;
[dIn~nsdIn.data, dOut~nsdOut.data] ¬ NewPrivateStreamDataPair[sd, defaultInBufferLength, defaultOutBufferLength];
-- The streams own the descriptor now; clearing sd keeps CleanUp from destroying it.
sd ¬ NIL;
-- NOTE(review): the result is used without a NIL check; confirm HandleToObject cannot return NIL here.
listener ¬ NARROW[Finalize.HandleToObject[h]];
TRUSTED {
Process.Detach[ FORK listener.listenerWorkerProc[listener, in, out] ];
};
listener ¬ NIL;
ENDLOOP;
};
};
-- Finalization queue passed to Finalize.EnableFinalization when a listener handle is created;
-- nothing in this file ever reads from it.
dummyListenerFQ: Finalize.FinalizationQueue ¬ Finalize.NewFQ[];
-- Creates a listener on the local address and forks the daemon process that accepts its connections.
-- Raises an I/O error (via RaiseErrorFromCC, with no stream) if the listener descriptor cannot be
-- created or its address cannot be determined.
MyRegisteredCreateListenerProc: NS.RegisteredCreateListenerProc -- [registration, local, transportParameters, listenerWorkerProc, listenerWorkerClientData] RETURNS [listener] -- ~ {
ld: NSS.ListenerDescriptor ¬ NIL;
-- Until ownership passes to the daemon (ld ¬ NIL below), an unwind destroys the descriptor.
{ ENABLE UNWIND => { IF ld # NIL THEN [] ¬ NSS.DestroyListenerDescriptor[ld]; };
d: PrivateListenerData;
h: Finalize.Handle ¬ NIL;
cc: NSS.CompletionCode;
[cc, ld] ¬ NSS.CreateListenerDescriptor[local];
IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[NIL, cc, LIST[$CreateListener], "can't create listener"];
-- Replace the requested address by the one the descriptor reports.
[cc, local] ¬ NSS.AddressFromListenerDescriptor[ld];
IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[NIL, cc, LIST[$CreateListener], "can't determine listener address"];
listener ¬ NS.CreateUninitializedListener[registration];
-- Enable then immediately disable finalization: presumably done only to obtain the handle h
-- that is given to the daemon (TODO confirm against the Finalize interface).
h ¬ Finalize.EnableFinalization[listener, dummyListenerFQ];
[] ¬ Finalize.DisableFinalization[h];
listener.local ¬ local;
listener.listenerWorkerProc ¬ listenerWorkerProc;
listener.listenerWorkerClientData ¬ listenerWorkerClientData;
listener.data ¬ d ¬ NEW [PrivateListenerDataRecord];
d.ld ¬ ld;
d.daemon ¬ FORK ListenerDaemon[h];
ld ¬ NIL;
};
};
-- A record used only for its monitor lock: it is the default lock argument of TestAndSetListenerDestroyed.
destroyListenerLock: PrivateStreamData ¬ NEW[PrivateStreamDataRecord ¬ [sd~NIL, sibling~NIL]];
TestAndSetListenerDestroyed: ENTRY PROC [pld: PrivateListenerData, d: PrivateStreamData ¬ destroyListenerLock] RETURNS [alreadyDestroyed: BOOL] ~ {
  -- Under the monitor of d, reports whether pld was already marked destroyed and marks it so.
  -- Only the first caller for a given pld gets FALSE.
  ENABLE UNWIND => NULL;
  alreadyDestroyed ¬ pld.destroyed;
  IF NOT alreadyDestroyed THEN pld.destroyed ¬ TRUE;
  };
MyDestroyListener: NS.DestroyListenerProc -- [listener] -- ~ {
  -- Tears a listener down: destroys its descriptor, then aborts the daemon process and waits
  -- for it to finish. Only the first call for a listener has any effect.
  pld: PrivateListenerData ~ NARROW[listener.data];
  IF NOT TestAndSetListenerDestroyed[pld].alreadyDestroyed THEN {
    [] ¬ NSS.DestroyListenerDescriptor[pld.ld];
    TRUSTED {
      Process.Abort[pld.daemon];
      JOIN pld.daemon;
      };
    };
  };
-- Listener procedures installed by RegisterSelf; only destroyListener is supplied.
myListenerProcs: NS.ListenerProcs ¬ NEW[NS.ListenerProcsRecord ¬ [
destroyListener~MyDestroyListener
]];
-- Debugging Stuff
EchoWorker: NetworkStream.ListenerWorkerProc -- [listener, in, out] -- ~ {
  -- Debugging worker: copies every character from in to out, asking for prompt transmission
  -- after each one, until IO.EndOfStream is raised on in. Both streams are then closed with the
  -- second argument TRUE, as they also are if anything unwinds through here.
  ENABLE UNWIND => { IO.Close[in, TRUE]; IO.Close[out, TRUE] };
  ch: CHAR;
  DO
    ch ¬ IO.GetChar[in ! IO.EndOfStream => EXIT];
    IO.PutChar[out, ch];
    NS.SendSoon[out, 0];
    ENDLOOP;
  IO.Close[in, TRUE];
  IO.Close[out, TRUE];
  };
StartEchoer: PROC [pf: ATOM, local: ROPE, tc: ATOM]
RETURNS [listener: NetworkStream.Listener] ~ {
  -- Debugging aid: creates a listener for protocol family pf and transport class tc at the
  -- local address, with EchoWorker serving each connection.
  RETURN [NetworkStream.CreateListener[pf, local, tc, NIL, EchoWorker, NIL]];
  };
StartEchoerArpaBasicStream: PROC [local: ROPE] RETURNS [listener: NetworkStream.Listener] ~ {
  -- Debugging aid: an echo listener for the $ARPA family and the $basicStream transport class.
  RETURN [NetworkStream.CreateListener[$ARPA, local, $basicStream, NIL, EchoWorker, NIL]];
  };
-- The two streams of one connection, bundled so that they can be returned as a single REF
-- (see CreateStreamPairArpaBasicStream).
StreamPair: TYPE ~ RECORD [ in, out: IO.STREAM ];
CreateStreamPairArpaBasicStream: PROC [remote: ROPE, timeoutMsec: CARD32] RETURNS [streamPair: REF StreamPair] ~ {
  -- Debugging aid: connects to remote over $ARPA / $basicStream and returns both streams in one record.
  streamPair ¬ NEW[StreamPair ¬ [in~NIL, out~NIL]];
  [streamPair.in, streamPair.out] ¬ NetworkStream.CreateStreams[$ARPA, remote, $basicStream, timeoutMsec, NIL];
  };
PutC: PROC [s: IO.STREAM, c: CHAR] ~ {
  -- Debugging shorthand for IO.PutChar.
  IO.PutChar[s, c];
  };
FlushC: PROC [s: IO.STREAM] ~ {
  -- Debugging shorthand for IO.Flush.
  IO.Flush[s];
  };
GetC: PROC [s: IO.STREAM] RETURNS[c: CHAR] ~ {
  -- Debugging shorthand for IO.GetChar.
  RETURN [IO.GetChar[s]];
  };
-- Registration
RegisterSelf: PROC ~ {
  -- Registers this implementation with NetworkStream once for every transport class in
  -- myTransportClasses, all under myProtocolFamily.
  current: NS.Registration;
  -- Handed to NS.Register; ignores any previous registration and supplies the record built
  -- for the transport class currently being registered.
  SupplyCurrent: NS.RegistrationCallbackProc -- [old] RETURNS [new] -- ~ { RETURN[current] };
  FOR tail: LIST OF ATOM ¬ myTransportClasses, tail.rest UNTIL tail = NIL DO
    class: ATOM ~ tail.first;
    current ¬ NEW[NS.RegistrationRecord ¬ [
      protocolFamily~myProtocolFamily,
      transportClass~class,
      createStreams~MyRegisteredCreateStreamsProc,
      inStreamProcs~myInStreamProcs,
      inNetworkStreamProcs~myInNetworkStreamProcs,
      outStreamProcs~myOutStreamProcs,
      outNetworkStreamProcs~myOutNetworkStreamProcs,
      createListener~MyRegisteredCreateListenerProc,
      listenerProcs~myListenerProcs,
      clientData~NIL
      ] ];
    NS.Register[myProtocolFamily, class, SupplyCurrent];
    ENDLOOP;
  };
-- Module start code: register with NetworkStream as soon as this module is started.
RegisterSelf[];
}.