-- NetworkStreamTCPImpl
-- Implements the NetworkStream stream and listener operations for the $ARPA protocol
-- family (transport classes $TCP and $basicStream) on top of NetworkStreamSupportTCP.
-- Each connection is presented as a pair of buffered IO streams (in, out) that share one
-- NSS.StreamDescriptor. The module registers itself with NetworkStream when started.

DIRECTORY
  Basics,
  Finalize,
  IO,
  NetworkStream,
  NetworkStreamSupportTCP,
  Process,
  RefText,
  Rope,
  RuntimeError
  ;

NetworkStreamTCPImpl: CEDAR MONITOR LOCKS d USING d: PrivateStreamData
IMPORTS Basics, Finalize, IO, NetworkStream, NetworkStreamSupportTCP, Process, RefText, RuntimeError
~ {
OPEN NS: NetworkStream, NSS: NetworkStreamSupportTCP;

-- Parameters

myProtocolFamily: ATOM ¬ $ARPA;
myTransportClasses: LIST OF ATOM ¬ LIST[$TCP, $basicStream];

defaultInBufferLength: CARDINAL ¬ 2048;
defaultOutBufferLength: CARDINAL ¬ 2048;

-- Types

CompletionCode: TYPE ~ NSS.CompletionCode;
ROPE: TYPE ~ Rope.ROPE;
STREAM: TYPE ~ IO.STREAM;

-- Streams

-- Condition recorded by FillBuf after the most recent receive.
BoundaryCondition: TYPE ~ { none, attention, endOfStream };

PrivateStreamData: TYPE ~ REF PrivateStreamDataRecord;
PrivateStreamDataRecord: TYPE ~ MONITORED RECORD [
  sd: NSS.StreamDescriptor,  -- descriptor; the in and out halves of a pair hold the same one
  sibling: PrivateStreamData,  -- the other half of the pair; set to NIL by MustDoClose
  streamIndex: INT ¬ 0,  -- running count of bytes gotten (input) or put (output)
  buffer: REF TEXT ¬ NIL,
  bufferIndex: CARDINAL ¬ 0,  -- input: next unread byte; output: start of the span FlushBuf sends
  localClose: BOOL ¬ FALSE,  -- set by MustDoClose once this half has been closed
  boundaryCondition: BoundaryCondition ¬ none
  ];

-- Allocates the private data for an input/output stream pair sharing descriptor sd,
-- links the two records as siblings, and gives each a buffer of the requested length.
NewPrivateStreamDataPair: PROC [sd: NSS.StreamDescriptor, inBufLen: CARDINAL, outBufLen: CARDINAL]
  RETURNS [dIn, dOut: PrivateStreamData] ~ {
  dIn ¬ NEW[ PrivateStreamDataRecord ¬ [sd~sd] ];
  dOut ¬ NEW[ PrivateStreamDataRecord ¬ [sd~sd] ];
  dIn.sibling ¬ dOut;
  dOut.sibling ¬ dIn;
  dIn.buffer ¬ RefText.New[inBufLen];
  dOut.buffer ¬ RefText.New[outBufLen];
  };

-- Accessors that narrow the opaque streamData / data fields to their concrete types.
NetworkStreamDataFromStream: PROC [s: STREAM] RETURNS [NS.NetworkStreamData] ~ INLINE {
  RETURN [NARROW[s.streamData]] };

PrivateStreamDataFromStream: PROC [s: STREAM] RETURNS [PrivateStreamData] ~ INLINE {
  RETURN [NARROW[NARROW[s.streamData, NS.NetworkStreamData].data]] };

PrivateStreamDataFromNetworkStreamData: PROC [d: NS.NetworkStreamData] RETURNS [PrivateStreamData] ~ INLINE {
  RETURN [NARROW[d.data]] };

-- Error raising helpers

RaiseClosed: PROC [s: STREAM] ~ {
  NS.RaiseIOError[StreamClosed, s, LIST[$streamClosed], "stream closed"];
  };

RaiseEndOf: PROC [s: STREAM] ~ {
  NS.RaiseIOError[Failure, s, LIST[$endOfStream], "end of stream"];
  };

-- Raises a Failure error whose code list is codes with the atom for cc prepended.
-- When no stream is supplied (s = NIL) and the completion code is not $timeout,
-- $networkStreamError is prepended as well.
RaiseErrorFromCC: PROC [s: STREAM, cc: NSS.CompletionCode, codes: LIST OF ATOM, msg: ROPE] ~ {
  cca: ATOM;
  cca ¬ NSS.AtomFromCC[cc];
  codes ¬ CONS[cca, codes];
  IF (s = NIL) AND (cca # $timeout) THEN codes ¬ CONS[$networkStreamError, codes];
  NS.RaiseIOError[Failure, s, codes, msg];
  };

-- Stream creation

-- Connects to remote, then builds an initialized in/out stream pair around the new
-- descriptor. If an error unwinds through this procedure the descriptor, if one was
-- obtained, is destroyed.
MyRegisteredCreateStreamsProc: NS.RegisteredCreateStreamsProc -- [registration, remote, timeout, transportParameters] RETURNS [in, out] -- ~ {
  sd: NSS.StreamDescriptor ¬ NIL;
  {
  ENABLE UNWIND => {
    IF sd # NIL THEN [] ¬ NSS.DestroyStreamDescriptor[sd, TRUE];
    };
  cc: NSS.CompletionCode;
  tRemote, tLocal: ROPE;
  nsdIn, nsdOut: NS.NetworkStreamData;
  [cc, sd] ¬ NSS.CreateStreamDescriptor[remote, timeout];
  IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[NIL, cc, LIST[$connect], "can't connect"];
  [cc, tLocal, tRemote] ¬ NSS.AddressesFromStreamDescriptor[sd];
  IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[NIL, cc, LIST[$getStreamInfo], "can't get local/remote address"];
  [in, out] ¬ NS.CreateUninitializedStreams[registration];
  nsdIn ¬ NARROW[in.streamData];
  nsdOut ¬ NARROW[out.streamData];
  nsdIn.local ¬ nsdOut.local ¬ tLocal;
  nsdIn.remote ¬ nsdOut.remote ¬ tRemote;
  [dIn~nsdIn.data, dOut~nsdOut.data] ¬ NewPrivateStreamDataPair[sd, defaultInBufferLength, defaultOutBufferLength];
  };
  };

-- Buffer management (called with the monitor lock on d held by the ENTRY callers)

-- Refills the input buffer with one receive of up to maxLength bytes.
-- The buffer must be fully consumed on entry; otherwise ERROR is raised.
-- When fewer than maxLength bytes arrive, the boundary condition is set to attention
-- if NSS.AtIBAttn says so, else to endOfStream if nothing arrived, else to none.
FillBuf: PROC [s: STREAM, d: PrivateStreamData] ~ {
  b: REF TEXT ¬ d.buffer;
  cc: NSS.CompletionCode;
  nBytes: INT;
  atAttn: BOOL;
  IF d.bufferIndex < b.length THEN ERROR;
  TRUSTED {
    -- The base pointer is the REF TEXT address advanced by SIZE[TEXT[0]], i.e. past the header.
    [cc, nBytes] ¬ NSS.Receive[ d.sd, [ base~LOOPHOLE[ LOOPHOLE[b, LONG POINTER]+SIZE[TEXT[0]] ], startIndex~0, count~b.maxLength ] ];
    };
  IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[s, cc, LIST[$Read], "read error"];
  IF nBytes < b.maxLength
    THEN {
      [cc, atAttn] ¬ NSS.AtIBAttn[d.sd];
      IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[s, cc, LIST[$AtIBAttn], "read error (AtIBAttn)"];
      d.boundaryCondition ¬ SELECT TRUE FROM
        atAttn => attention,
        nBytes = 0 => endOfStream,
        ENDCASE => none;
      }
    ELSE {
      d.boundaryCondition ¬ none;
      };
  d.bufferIndex ¬ 0;
  b.length ¬ nBytes;
  };

-- Sends buffer bytes [bufferIndex .. length) and then empties the buffer.
-- On a send failure an error is raised and the buffer is left as it was.
FlushBuf: PROC [s: STREAM, d: PrivateStreamData] ~ {
  b: REF TEXT ¬ d.buffer;
  cc: NSS.CompletionCode;
  TRUSTED {
    cc ¬ NSS.Send[ d.sd, [ base~LOOPHOLE[ LOOPHOLE[b, LONG POINTER]+SIZE[TEXT[0]] ], startIndex~d.bufferIndex, count~b.length-d.bufferIndex ] ];
    };
  IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[s, cc, LIST[$Write], "write error"];
  d.bufferIndex ¬ b.length ¬ 0;
  };

-- Input operations

-- Returns the next input character, refilling the buffer as needed.
-- Raises stream closed if this half is closed, and end of stream if the buffer is empty
-- and a boundary condition (attention or endOfStream) is pending.
MyEntryGetChar: ENTRY PROC [s: STREAM, d: PrivateStreamData] RETURNS [c: CHAR] ~ {
  ENABLE UNWIND => NULL;
  b: REF TEXT ¬ d.buffer;
  i: CARDINAL;
  DO
    IF d.localClose THEN RaiseClosed[s];
    IF (i ¬ d.bufferIndex) < b.length THEN EXIT;
    IF d.boundaryCondition # none THEN RaiseEndOf[s];
    FillBuf[s, d];
    ENDLOOP;
  c ¬ b[i];
  d.bufferIndex ¬ i + 1;
  d.streamIndex ¬ d.streamIndex + 1;
  };

MyGetChar: PROC [self: STREAM] RETURNS [CHAR] ~ {
  RETURN [MyEntryGetChar[self, PrivateStreamDataFromStream[self]]];
  };

-- Copies up to block.count bytes into block, refilling the buffer as needed.
-- Returns early with the bytes read so far when the buffer is empty and a boundary
-- condition is pending.
MyEntryUnsafeGetBlock: ENTRY PROC [s: STREAM, d: PrivateStreamData, block: IO.UnsafeBlock]
  RETURNS [nBytesRead: INT ¬ 0] ~ CHECKED {
  ENABLE UNWIND => NULL;
  b: REF TEXT ¬ d.buffer;
  i: CARD;
  len: INT;
  nBytesMoreWanted: INT ¬ block.count;
  blockIndex: INT ¬ block.startIndex;
  DO
    IF d.localClose THEN RaiseClosed[s];
    IF nBytesMoreWanted <= 0 THEN RETURN;
    i ¬ d.bufferIndex;
    len ¬ INT[b.length] - INT[i];
    IF len <= 0 THEN {
      IF d.boundaryCondition # none THEN RETURN;
      FillBuf[s, d];
      LOOP;
      };
    IF len > nBytesMoreWanted THEN len ¬ nBytesMoreWanted;
    TRUSTED {
      len ¬ Basics.ByteBlt[
        to~[ blockPointer~LOOPHOLE[block.base], startIndex~blockIndex, stopIndexPlusOne~blockIndex+len ],
        from~[ blockPointer~LOOPHOLE[b, LONG POINTER]+SIZE[TEXT[0]], startIndex~i, stopIndexPlusOne~i+len ]
        ];
      };
    d.bufferIndex ¬ i + len;
    d.streamIndex ¬ d.streamIndex + len;
    nBytesRead ¬ nBytesRead + len;
    blockIndex ¬ blockIndex + len;
    nBytesMoreWanted ¬ nBytesMoreWanted - len;
    ENDLOOP;
  };

-- Rejects negative startIndex or count before taking the monitor lock.
MyUnsafeGetBlock: UNSAFE PROC [self: STREAM, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT] ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  IF block.startIndex < 0 OR block.count < 0 THEN ERROR RuntimeError.BoundsFault;
  RETURN [MyEntryUnsafeGetBlock[self, d, block]];
  };

-- TRUE only when the buffer is empty and a boundary condition is pending.
-- Does not attempt a receive.
MyEntryEndOf: ENTRY PROC [s: STREAM, d: PrivateStreamData] RETURNS [BOOL] ~ {
  ENABLE UNWIND => NULL;
  b: REF TEXT ¬ d.buffer;
  IF d.localClose THEN RaiseClosed[s];
  IF d.bufferIndex < b.length THEN RETURN [FALSE];
  RETURN [d.boundaryCondition # none];
  };

MyEndOf: PROC [self: STREAM] RETURNS [BOOL] ~ {
  RETURN [MyEntryEndOf[self, PrivateStreamDataFromStream[self]]];
  };

-- Returns the number of buffered bytes if any; INT.LAST if the buffer is empty and a
-- boundary condition is pending; 0 if no input is ready and wait is FALSE.
-- Otherwise it refills the buffer and tries again.
MyEntryCharsAvail: ENTRY PROC [s: STREAM, d: PrivateStreamData, wait: BOOL] RETURNS [INT] ~ {
  ENABLE UNWIND => NULL;
  b: REF TEXT ¬ d.buffer;
  nLeftInBuffer: INT;
  cc: NSS.CompletionCode;
  inputReady: BOOL;
  DO
    IF d.localClose THEN RaiseClosed[s];
    nLeftInBuffer ¬ INT[b.length] - INT[d.bufferIndex];
    IF nLeftInBuffer > 0 THEN RETURN [nLeftInBuffer];
    IF d.boundaryCondition # none THEN RETURN [INT.LAST];
    [cc, inputReady] ¬ NSS.InputReady[d.sd, FALSE];
    -- NOTE(review): NIL rather than s is passed here, unlike the other stream operations,
    -- so $networkStreamError is added to the codes. Confirm this is intended.
    IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[NIL, cc, LIST[$CharsAvail], "can't test input ready"];
    IF (NOT inputReady) AND (NOT wait) THEN RETURN[0];
    FillBuf[s, d];
    ENDLOOP;
  };

MyCharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [INT] ~ {
  RETURN [MyEntryCharsAvail[self, PrivateStreamDataFromStream[self], wait]];
  };

-- Output operations

-- Appends char to the output buffer, flushing first if the buffer is full.
MyEntryPutChar: ENTRY PROC [s: STREAM, d: PrivateStreamData, char: CHAR] ~ {
  ENABLE UNWIND => NULL;
  b: REF TEXT ¬ d.buffer;
  i: CARDINAL;
  DO
    IF d.localClose THEN RaiseClosed[s];
    IF (i ¬ b.length) < b.maxLength THEN EXIT;
    FlushBuf[s, d];
    ENDLOOP;
  d.streamIndex ¬ d.streamIndex + 1;
  b.length ¬ i + 1;
  b[i] ¬ char;
  };

MyPutChar: PROC [self: STREAM, char: CHAR] ~ {
  MyEntryPutChar[self, PrivateStreamDataFromStream[self], char];
  };

-- Copies block.count bytes from block into the output buffer, flushing whenever the
-- buffer fills.
MyEntryUnsafePutBlock: ENTRY PROC [s: STREAM, d: PrivateStreamData, block: IO.UnsafeBlock] ~ CHECKED {
  ENABLE UNWIND => NULL;
  b: REF TEXT ¬ d.buffer;
  i: CARD;
  len: INT;
  nBytesMoreToWrite: INT ¬ block.count;
  blockIndex: INT ¬ block.startIndex;
  DO
    IF d.localClose THEN RaiseClosed[s];
    IF nBytesMoreToWrite <= 0 THEN RETURN;
    i ¬ b.length;
    len ¬ INT[b.maxLength] - INT[i];
    IF len <= 0 THEN {
      FlushBuf[s, d];
      LOOP;
      };
    IF len > nBytesMoreToWrite THEN len ¬ nBytesMoreToWrite;
    TRUSTED {
      len ¬ Basics.ByteBlt[
        to~[ blockPointer~LOOPHOLE[b, LONG POINTER]+SIZE[TEXT[0]], startIndex~i, stopIndexPlusOne~i+len ],
        from~[ blockPointer~LOOPHOLE[block.base], startIndex~blockIndex, stopIndexPlusOne~blockIndex+len ]
        ];
      };
    b.length ¬ i + len;
    d.streamIndex ¬ d.streamIndex + len;
    blockIndex ¬ blockIndex + len;
    nBytesMoreToWrite ¬ nBytesMoreToWrite - len;
    ENDLOOP;
  };

-- Rejects negative startIndex or count before taking the monitor lock.
MyUnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  IF block.startIndex < 0 OR block.count < 0 THEN ERROR RuntimeError.BoundsFault;
  MyEntryUnsafePutBlock[self, d, block];
  };

MyEntryFlush: ENTRY PROC [s: STREAM, d: PrivateStreamData] ~ {
  ENABLE UNWIND => NULL;
  IF d.localClose THEN RaiseClosed[s];
  IF d.buffer.length > 0 THEN FlushBuf[s, d];
  };

MyFlush: PROC [self: STREAM] ~ {
  MyEntryFlush[self, PrivateStreamDataFromStream[self]];
  };

-- Reset and close

-- Discards buffered input and keeps receiving and discarding until a boundary condition
-- (attention or endOfStream) is pending. Returns silently if the stream is closed.
MyEntryInputReset: ENTRY PROC [s: STREAM, d: PrivateStreamData] ~ {
  ENABLE UNWIND => NULL;
  n: CARDINAL;
  DO
    IF d.localClose THEN RETURN;
    IF (n ¬ (d.buffer.length - d.bufferIndex)) > 0 THEN {
      d.bufferIndex ¬ d.bufferIndex + n;
      d.streamIndex ¬ d.streamIndex + n;
      };
    IF d.boundaryCondition # none THEN RETURN;
    FillBuf[s, d];
    ENDLOOP;
  };

MyInputReset: PROC [self: STREAM] ~ {
  MyEntryInputReset[self, PrivateStreamDataFromStream[self]];
  };

-- A record used only for its monitor lock: every MustDoClose call that takes the default
-- for d serializes on this one lock.
specialLockForMustDoClose: PrivateStreamData ¬ NEW[ PrivateStreamDataRecord ¬ []];

-- Marks real as locally closed and detaches it from its sibling.
-- real is the stream half being closed; d only selects the monitor lock to hold.
-- Returns TRUE when real was not already closed and its sibling already was, in which
-- case the caller is the one that must destroy the shared descriptor.
-- Returns FALSE if real was already closed.
MustDoClose: ENTRY PROC [real: PrivateStreamData, d: PrivateStreamData ¬ specialLockForMustDoClose]
  RETURNS [doClose: BOOL] ~ {
  -- Release the monitor lock if the ERROR below unwinds through this procedure; without
  -- this handler the shared close lock would stay held. This matches the other ENTRY
  -- procedures in this module.
  ENABLE UNWIND => NULL;
  mySibling: PrivateStreamData;
  IF real.localClose THEN RETURN[FALSE];
  real.localClose ¬ TRUE;
  IF (mySibling ¬ real.sibling) = NIL THEN ERROR;
  real.sibling ¬ NIL;
  RETURN[ mySibling.localClose ];
  };

-- Closes this half. The descriptor is destroyed only when MustDoClose reports that the
-- other half is already closed. A destroy failure is reported unless abort is TRUE.
MyIOClose: PROC [self: STREAM, abort: BOOL] ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  cc: NSS.CompletionCode;
  IF NOT MustDoClose[d] THEN RETURN;
  [cc] ¬ NSS.DestroyStreamDescriptor[d.sd, abort];
  IF NSS.FailureCC[cc] AND (NOT abort) THEN RaiseErrorFromCC[self, cc, LIST[$Close], "I/O error in close"];
  };

-- Output close: flushes pending output first unless aborting, then closes as MyIOClose.
MyOutputClose: PROC [self: STREAM, abort: BOOL] ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  IF d.localClose THEN RETURN;
  IF NOT abort THEN IO.Flush[self -- ! IO.Error, NS.Timeout => CONTINUE -- ];
  MyIOClose[self, abort];
  };

MyEntryGetIndex: ENTRY PROC [s: STREAM, d: PrivateStreamData] RETURNS [INT] ~ {
  ENABLE UNWIND => NULL;
  IF d.localClose THEN RaiseClosed[s];
  RETURN[d.streamIndex];
  };

MyGetIndex: PROC [self: STREAM] RETURNS [INT] ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
  RETURN[MyEntryGetIndex[self, d]];
  };

-- IO stream procedure tables

myInStreamProcs: REF IO.StreamProcs ¬ IO.CreateStreamProcs[
  variety~input,
  class~$networkStream,
  getChar~MyGetChar,
  unsafeGetBlock~MyUnsafeGetBlock,
  endOf~MyEndOf,
  charsAvail~MyCharsAvail,
  reset~MyInputReset,
  close~MyIOClose,
  getIndex~MyGetIndex
  ];

myOutStreamProcs: REF IO.StreamProcs ¬ IO.CreateStreamProcs[
  variety~output,
  class~$networkStream,
  putChar~MyPutChar,
  unsafePutBlock~MyUnsafePutBlock,
  flush~MyFlush,
  close~MyOutputClose,
  getIndex~MyGetIndex
  ];

-- NetworkStream operations

-- Records the timeout settings in nsd and applies the timeout to the descriptor:
-- the put timeout when output is TRUE, otherwise the get timeout.
MyEntrySetTimeout: ENTRY PROC [s: STREAM, nsd: NS.NetworkStreamData, d: PrivateStreamData,
  timeout: NS.Milliseconds, signalTimeout: BOOL, output: BOOL] ~ {
  ENABLE UNWIND => NULL;
  cc: NSS.CompletionCode;
  IF d.localClose THEN RaiseClosed[s];
  nsd.timeout ¬ timeout;
  nsd.signalTimeout ¬ signalTimeout;
  IF output
    THEN cc ¬ NSS.SetPutTimeout[d.sd, timeout]
    ELSE cc ¬ NSS.SetGetTimeout[d.sd, timeout];
  IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[s, cc, LIST[$SetTimeout], "can't set timeout"];
  };

MyInputSetTimeout: NS.SetTimeoutProc -- [stream, timeout, signalTimeout] -- ~ {
  nsd: NS.NetworkStreamData ¬ NARROW[stream.streamData];
  d: PrivateStreamData ¬ NARROW[nsd.data];
  MyEntrySetTimeout[stream, nsd, d, timeout, signalTimeout, FALSE];
  };

MyOutputSetTimeout: NS.SetTimeoutProc -- [stream, timeout, signalTimeout] -- ~ {
  nsd: NS.NetworkStreamData ¬ NARROW[stream.streamData];
  d: PrivateStreamData ¬ NARROW[nsd.data];
  MyEntrySetTimeout[stream, nsd, d, timeout, signalTimeout, TRUE];
  };

-- Returns the stream index together with the number of bytes currently in the buffer
-- (length minus bufferIndex).
MyEntryGetIndexDetails: ENTRY PROC [s: STREAM, d: PrivateStreamData]
  RETURNS [index, bytesInBuffer: INT] ~ {
  ENABLE UNWIND => NULL;
  IF d.localClose THEN RaiseClosed[s];
  bytesInBuffer ¬ INT[d.buffer.length] - INT[d.bufferIndex];
  index ¬ d.streamIndex;
  };

-- Input: the buffer index is the stream index plus the bytes still buffered. ackIndex is -1.
MyInputGetIndexDetails: NS.GetIndexDetailsProc -- [stream] RETURNS [index, bufferIndex, ackIndex] -- ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[stream];
  bytesInBuffer: INT;
  [index~index, bytesInBuffer~bytesInBuffer] ¬ MyEntryGetIndexDetails[stream, d];
  RETURN[ index, index+bytesInBuffer, -1 ];
  };

-- Output: the buffer index is the stream index minus the bytes still buffered. ackIndex is -1.
MyOutputGetIndexDetails: NS.GetIndexDetailsProc -- [stream] RETURNS [index, bufferIndex, ackIndex] -- ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[stream];
  bytesInBuffer: INT;
  [index~index, bytesInBuffer~bytesInBuffer] ¬ MyEntryGetIndexDetails[stream, d];
  RETURN[ index, index-bytesInBuffer, -1 ];
  };

-- Reports open while buffered input remains. Otherwise maps the pending boundary
-- condition to a stream state. With reset TRUE, a pending attention is cleared.
MyEntryGetStreamState: ENTRY PROC [s: STREAM, d: PrivateStreamData, reset: BOOL]
  RETURNS [ss: NS.StreamState] ~ {
  ENABLE UNWIND => NULL;
  IF d.localClose THEN RaiseClosed[s];
  IF d.buffer.length > d.bufferIndex THEN RETURN [ss~open];
  SELECT d.boundaryCondition FROM
    none => ss ¬ open;
    attention => { ss ¬ attention; IF reset THEN d.boundaryCondition ¬ none };
    endOfStream => { ss ¬ remoteClosed };
    ENDCASE => ERROR;
  };

-- subStreamType and attentionType are always reported as 0.
MyGetStreamState: NS.GetStreamStateProc -- [in, reset] RETURNS [streamState, subStreamType, attentionType] -- ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[in];
  streamState ¬ MyEntryGetStreamState[in, d, reset];
  subStreamType ¬ 0;
  attentionType ¬ 0;
  };

-- Waits for an out-of-band attention on the descriptor. This is not an ENTRY procedure,
-- so localClose is read without the monitor lock.
-- NOTE(review): the value returned by NSS.WaitOOBAttn in at is discarded and 0 is always
-- returned. Confirm whether returning at was intended.
MyWaitAttention: NS.WaitAttentionProc -- [in, timeout] RETURNS [attentionType] -- ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[in];
  cc: NSS.CompletionCode;
  at: NS.AttentionType;
  IF d.localClose THEN RaiseClosed[in];
  [cc, at] ¬ NSS.WaitOOBAttn[d.sd, timeout];
  IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[in, cc, LIST[$WaitAttn], "error in wait attention proc"];
  RETURN [0];
  };

-- Sends an attention on the descriptor. This is not an ENTRY procedure, so localClose is
-- read without the monitor lock.
MySendAttention: NS.SendAttentionProc -- [out, attentionType] -- ~ {
  d: PrivateStreamData ~ PrivateStreamDataFromStream[out];
  cc: NSS.CompletionCode;
  IF d.localClose THEN RaiseClosed[out];
  [cc] ¬ NSS.SendAttn[d.sd, attentionType];
  IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[out, cc, LIST[$SendAttn], "error in send attention proc"];
  };

myInNetworkStreamProcs: NS.NetworkStreamProcs ¬ NEW[ NS.NetworkStreamProcsRecord ¬ [
  setTimeout~MyInputSetTimeout,
  getIndexDetails~MyInputGetIndexDetails,
  getStreamState~MyGetStreamState,
  waitAttention~MyWaitAttention
  ] ];

myOutNetworkStreamProcs: NS.NetworkStreamProcs ¬ NEW[ NS.NetworkStreamProcsRecord ¬ [
  setTimeout~MyOutputSetTimeout,
  getIndexDetails~MyOutputGetIndexDetails,
  sendAttention~MySendAttention
  ] ];

-- Listeners

PrivateListenerData: TYPE ~ REF PrivateListenerDataRecord;
PrivateListenerDataRecord: TYPE ~ RECORD [
  ld: NSS.ListenerDescriptor,
  daemon: PROCESS,  -- the forked ListenerDaemon
  destroyed: BOOL ¬ FALSE  -- set by TestAndSetListenerDestroyed
  ];

-- Accept loop for one listener. For each accepted connection it builds an in/out stream
-- pair and forks the listener's worker procedure on it, detached.
-- The listener is reached through the Finalize handle h and the local REF is cleared
-- after each use, so this process does not hold on to the listener between connections.
-- The process ends quietly when aborted.
ListenerDaemon: PROC [h: Finalize.Handle -- for listener -- ] ~ {
  ENABLE ABORTED => CONTINUE;
  listener: NS.Listener;
  d: PrivateListenerData;
  r: NS.Registration;
  cc: NSS.CompletionCode ¬ NSS.successCC;
  sd: NSS.StreamDescriptor ¬ NIL;
  tLocal, tRemote: ROPE;
  in, out: STREAM;
  nsdIn, nsdOut: NS.NetworkStreamData;

  -- Runs at the top of every iteration and on unwind: pauses after a failure and
  -- destroys any descriptor that was not handed over to a stream pair.
  CleanUp: PROC ~ {
    IF NSS.FailureCC[cc] THEN Process.PauseMsec[50];  -- reduce CPU load
    IF sd # NIL THEN [] ¬ NSS.DestroyStreamDescriptor[sd, TRUE];
    sd ¬ NIL;
    };

  listener ¬ NARROW[Finalize.HandleToObject[h]];
  d ¬ NARROW[listener.data];
  r ¬ listener.registration;
  listener ¬ NIL;
  {
  ENABLE UNWIND => CleanUp[];
  DO
    CleanUp[];
    [cc, sd] ¬ NSS.AcceptConnection[d.ld];
    IF NSS.FailureCC[cc] THEN { sd ¬ NIL -- (unnecessary) --; LOOP };
    [cc, tLocal, tRemote] ¬ NSS.AddressesFromStreamDescriptor[sd];
    IF NSS.FailureCC[cc] THEN LOOP;
    [in, out] ¬ NS.CreateUninitializedStreams[r];
    nsdIn ¬ NARROW[in.streamData];
    nsdOut ¬ NARROW[out.streamData];
    nsdIn.local ¬ nsdOut.local ¬ tLocal;
    nsdIn.remote ¬ nsdOut.remote ¬ tRemote;
    [dIn~nsdIn.data, dOut~nsdOut.data] ¬ NewPrivateStreamDataPair[sd, defaultInBufferLength, defaultOutBufferLength];
    sd ¬ NIL;  -- the stream pair now holds the descriptor, so CleanUp must not destroy it
    listener ¬ NARROW[Finalize.HandleToObject[h]];
    TRUSTED { Process.Detach[ FORK listener.listenerWorkerProc[listener, in, out] ]; };
    listener ¬ NIL;
    ENDLOOP;
  };
  };

-- Queue passed to Finalize.EnableFinalization below. Finalization is disabled again
-- immediately afterwards.
dummyListenerFQ: Finalize.FinalizationQueue ¬ Finalize.NewFQ[];

-- Creates a listener on local, fills in the listener record, and forks its ListenerDaemon.
-- If an error unwinds before the daemon is started, the listener descriptor is destroyed.
MyRegisteredCreateListenerProc: NS.RegisteredCreateListenerProc -- [registration, local, transportParameters, listenerWorkerProc, listenerWorkerClientData] RETURNS [listener] -- ~ {
  ld: NSS.ListenerDescriptor ¬ NIL;
  {
  ENABLE UNWIND => {
    IF ld # NIL THEN [] ¬ NSS.DestroyListenerDescriptor[ld];
    };
  d: PrivateListenerData;
  h: Finalize.Handle ¬ NIL;
  cc: NSS.CompletionCode;
  [cc, ld] ¬ NSS.CreateListenerDescriptor[local];
  IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[NIL, cc, LIST[$CreateListener], "can't create listener"];
  [cc, local] ¬ NSS.AddressFromListenerDescriptor[ld];
  IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[NIL, cc, LIST[$CreateListener], "can't determine listener address"];
  listener ¬ NS.CreateUninitializedListener[registration];
  h ¬ Finalize.EnableFinalization[listener, dummyListenerFQ];
  [] ¬ Finalize.DisableFinalization[h];
  listener.local ¬ local;
  listener.listenerWorkerProc ¬ listenerWorkerProc;
  listener.listenerWorkerClientData ¬ listenerWorkerClientData;
  listener.data ¬ d ¬ NEW [PrivateListenerDataRecord];
  d.ld ¬ ld;
  d.daemon ¬ FORK ListenerDaemon[h];
  ld ¬ NIL;  -- the listener record now holds the descriptor
  };
  };

-- A record used only for its monitor lock by TestAndSetListenerDestroyed.
destroyListenerLock: PrivateStreamData ¬ NEW[PrivateStreamDataRecord ¬ [sd~NIL, sibling~NIL]];

-- Sets pld.destroyed under the lock of d and returns its previous value.
TestAndSetListenerDestroyed: ENTRY PROC [pld: PrivateListenerData, d: PrivateStreamData ¬ destroyListenerLock]
  RETURNS [alreadyDestroyed: BOOL] ~ {
  ENABLE UNWIND => NULL;
  alreadyDestroyed ¬ pld.destroyed;
  pld.destroyed ¬ TRUE;
  };

-- Destroys the listener descriptor, then aborts the daemon and waits for it to finish.
-- Only the first call for a given listener has any effect.
MyDestroyListener: NS.DestroyListenerProc -- [listener] -- ~ {
  pld: PrivateListenerData ¬ NARROW[listener.data];
  IF TestAndSetListenerDestroyed[pld].alreadyDestroyed THEN RETURN;
  [] ¬ NSS.DestroyListenerDescriptor[pld.ld];
  TRUSTED {
    Process.Abort[pld.daemon];
    JOIN pld.daemon;
    };
  };

myListenerProcs: NS.ListenerProcs ¬ NEW[NS.ListenerProcsRecord ¬ [
  destroyListener~MyDestroyListener
  ]];

-- Test helpers

-- Listener worker that copies each input character to the output until end of stream,
-- then closes both streams with abort TRUE. Both are also closed on unwind.
EchoWorker: NetworkStream.ListenerWorkerProc -- [listener, in, out] -- ~ {
  ENABLE UNWIND => { IO.Close[in, TRUE]; IO.Close[out, TRUE] };
  c: CHAR;
  DO
    c ¬ IO.GetChar[in ! IO.EndOfStream => EXIT];
    IO.PutChar[out, c];
    NS.SendSoon[out, 0];
    ENDLOOP;
  IO.Close[in, TRUE];
  IO.Close[out, TRUE];
  };

StartEchoer: PROC [pf: ATOM, local: ROPE, tc: ATOM] RETURNS [listener: NetworkStream.Listener] ~ {
  listener ¬ NetworkStream.CreateListener[pf, local, tc, NIL, EchoWorker, NIL];
  };

StartEchoerArpaBasicStream: PROC [local: ROPE] RETURNS [listener: NetworkStream.Listener] ~ {
  listener ¬ NetworkStream.CreateListener[$ARPA, local, $basicStream, NIL, EchoWorker, NIL];
  };

StreamPair: TYPE ~ RECORD [ in, out: IO.STREAM ];

CreateStreamPairArpaBasicStream: PROC [remote: ROPE, timeoutMsec: CARD32] RETURNS [streamPair: REF StreamPair] ~ {
  streamPair ¬ NEW[StreamPair ¬ [NIL, NIL]];
  [streamPair.in, streamPair.out] ¬ NetworkStream.CreateStreams[$ARPA, remote, $basicStream, timeoutMsec, NIL];
  };

PutC: PROC [s: IO.STREAM, c: CHAR] ~ { IO.PutChar[s, c] };

FlushC: PROC [s: IO.STREAM] ~ { IO.Flush[s] };

GetC: PROC [s: IO.STREAM] RETURNS[c: CHAR] ~ { c ¬ IO.GetChar[s] };

-- Registration

-- Registers this implementation with NetworkStream once per transport class in
-- myTransportClasses, each with a fresh registration record.
RegisterSelf: PROC ~ {
  registrationForThisTransportClass: NS.Registration;
  -- Ignores the old registration and returns the one built for the current iteration.
  MyRegistrationCallbackProc: NS.RegistrationCallbackProc -- [old] RETURNS [new] -- ~ {
    RETURN[registrationForThisTransportClass] };
  FOR each: LIST OF ATOM ¬ myTransportClasses, each.rest WHILE each # NIL DO
    registrationForThisTransportClass ¬ NEW[NS.RegistrationRecord ¬ [
      protocolFamily~myProtocolFamily,
      transportClass~each.first,
      createStreams~MyRegisteredCreateStreamsProc,
      inStreamProcs~myInStreamProcs,
      inNetworkStreamProcs~myInNetworkStreamProcs,
      outStreamProcs~myOutStreamProcs,
      outNetworkStreamProcs~myOutNetworkStreamProcs,
      createListener~MyRegisteredCreateListenerProc,
      listenerProcs~myListenerProcs,
      clientData~NIL
      ] ];
    NS.Register[myProtocolFamily, each.first, MyRegistrationCallbackProc];
    ENDLOOP;
  };

RegisterSelf[];

}.