-- NetworkStreamTCPImpl.mesa
-- Copyright (c) 1989, 1992 by Xerox Corporation.  All rights reserved.
-- Demers, September 20, 1989 8:16:49 am PDT
-- Christian Jacobi, July 24, 1992 2:04 pm PDT
-- Chauser, January 6, 1992 10:53 am PST
--
-- Registers stream and listener implementations with NetworkStream for protocol
-- family $ARPA, transport classes $TCP and $basicStream, built on the operations
-- exported by NetworkStreamSupportTCP (opened below as NSS).

DIRECTORY
  Basics,
  Finalize,
  IO,
  NetworkStream,
  NetworkStreamSupportTCP,
  Process,
  RefText,
  Rope,
  RuntimeError
  ;

-- Monitor discipline: LOCKS d USING d: PrivateStreamData means that every ENTRY
-- procedure locks the monitored record passed as its parameter named d.
NetworkStreamTCPImpl: CEDAR MONITOR
  LOCKS d USING d: PrivateStreamData
  IMPORTS Basics, Finalize, IO, NetworkStream, NetworkStreamSupportTCP, Process, RefText, RuntimeError
  ~ {
  OPEN NS: NetworkStream, NSS: NetworkStreamSupportTCP;

  -- Parameters

  myProtocolFamily: ATOM ¬ $ARPA;
  myTransportClasses: LIST OF ATOM ¬ LIST[$TCP, $basicStream];
  -- Sizes passed to RefText.New for the per-stream input and output buffers.
  defaultInBufferLength: CARDINAL ¬ 2048;
  defaultOutBufferLength: CARDINAL ¬ 2048;

  -- Types

  CompletionCode: TYPE ~ NSS.CompletionCode;
  ROPE: TYPE ~ Rope.ROPE;
  STREAM: TYPE ~ IO.STREAM;

  -- Stream Implementation

  -- What the most recent FillBuf found at the end of the data it received.
  BoundaryCondition: TYPE ~ { none, attention, endOfStream };

  PrivateStreamData: TYPE ~ REF PrivateStreamDataRecord;
  -- Per-direction state.  The input and output halves of one connection each have
  -- their own record (and so their own monitor lock); both refer to the same sd.
  PrivateStreamDataRecord: TYPE ~ MONITORED RECORD [
    sd: NSS.StreamDescriptor,  -- descriptor shared by both halves
    sibling: PrivateStreamData,  -- the other half; set to NIL by MustDoClose
    streamIndex: INT ¬ 0,  -- count of bytes the client has read or written
    buffer: REF TEXT ¬ NIL,
    bufferIndex: CARDINAL ¬ 0,  -- input: next unread byte; output: first unsent byte
    localClose: BOOL ¬ FALSE,  -- set once this half has been closed locally
    boundaryCondition: BoundaryCondition ¬ none
    ];

  -- Creates the two private records for one connection, links them as siblings
  -- and allocates their buffers with the given lengths.
  NewPrivateStreamDataPair: PROC [sd: NSS.StreamDescriptor, inBufLen: CARDINAL, outBufLen: CARDINAL] RETURNS [dIn, dOut: PrivateStreamData] ~ {
    dIn ¬ NEW[ PrivateStreamDataRecord ¬ [sd~sd] ];
    dOut ¬ NEW[ PrivateStreamDataRecord ¬ [sd~sd] ];
    dIn.sibling ¬ dOut;
    dOut.sibling ¬ dIn;
    dIn.buffer ¬ RefText.New[inBufLen];
    dOut.buffer ¬ RefText.New[outBufLen];
    };

  -- Accessors: an IO.STREAM's streamData is an NS.NetworkStreamData, whose data
  -- field holds this module's PrivateStreamData.
  NetworkStreamDataFromStream: PROC [s: STREAM] RETURNS [NS.NetworkStreamData] ~ INLINE {
    RETURN [NARROW[s.streamData]] };

  PrivateStreamDataFromStream: PROC [s: STREAM] RETURNS [PrivateStreamData] ~ INLINE {
    RETURN [NARROW[NARROW[s.streamData, NS.NetworkStreamData].data]] };

  PrivateStreamDataFromNetworkStreamData: PROC [d: NS.NetworkStreamData] RETURNS [PrivateStreamData] ~ INLINE {
    RETURN [NARROW[d.data]] };

  -- Raises the "stream closed" error for s through NS.RaiseIOError.
  RaiseClosed: PROC [s: STREAM] ~ {
    NS.RaiseIOError[StreamClosed, s, LIST[$streamClosed], "stream closed"];
    };

  -- Raises the "end of stream" failure for s through NS.RaiseIOError.
  RaiseEndOf: PROC [s: STREAM] ~ {
    NS.RaiseIOError[Failure, s, LIST[$endOfStream], "end of stream"];
    };

  -- Raises a Failure for s whose code list is codes prefixed with the atom for cc.
  -- When there is no stream (s = NIL) and the atom is not $timeout, the list is
  -- additionally prefixed with $networkStreamError.
  RaiseErrorFromCC: PROC [s: STREAM, cc: NSS.CompletionCode, codes: LIST OF ATOM, msg: ROPE] ~ {
    cca: ATOM;
    cca ¬ NSS.AtomFromCC[cc];
    codes ¬ CONS[cca, codes];
    IF (s = NIL) AND (cca # $timeout) THEN codes ¬ CONS[$networkStreamError, codes];
    NS.RaiseIOError[Failure, s, codes, msg];
    };

  -- Opens a connection to remote and returns the resulting stream pair.
  -- On any unwind after the descriptor exists, the descriptor is destroyed
  -- (with the abort flag TRUE) so that it does not leak.
  MyRegisteredCreateStreamsProc: NS.RegisteredCreateStreamsProc -- [registration, remote, timeout, transportParameters] RETURNS [in, out] -- ~ {
    sd: NSS.StreamDescriptor ¬ NIL;
    {
    ENABLE UNWIND => {
      IF sd # NIL THEN [] ¬ NSS.DestroyStreamDescriptor[sd, TRUE];
      };
    cc: NSS.CompletionCode;
    tRemote, tLocal: ROPE;
    nsdIn, nsdOut: NS.NetworkStreamData;
    [cc, sd] ¬ NSS.CreateStreamDescriptor[remote, timeout];
    IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[NIL, cc, LIST[$connect], "can't connect"];
    [cc, tLocal, tRemote] ¬ NSS.AddressesFromStreamDescriptor[sd];
    IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[NIL, cc, LIST[$getStreamInfo], "can't get local/remote address"];
    [in, out] ¬ NS.CreateUninitializedStreams[registration];
    nsdIn ¬ NARROW[in.streamData];
    nsdOut ¬ NARROW[out.streamData];
    nsdIn.local ¬ nsdOut.local ¬ tLocal;
    nsdIn.remote ¬ nsdOut.remote ¬ tRemote;
    [dIn~nsdIn.data, dOut~nsdOut.data] ¬ NewPrivateStreamDataPair[sd, defaultInBufferLength, defaultOutBufferLength];
    };
    };

  -- Refills the input buffer of d from the connection.  Not an ENTRY proc: it is
  -- called only from ENTRY procs that already hold d's lock.
  -- The buffer must be fully consumed on entry, otherwise ERROR.
  -- On a short read NSS.AtIBAttn is asked whether the data stopped at an attention;
  -- a short read of zero bytes that is not at an attention is treated as end of stream.
  FillBuf: PROC [s: STREAM, d: PrivateStreamData] ~ {
    b: REF TEXT ¬ d.buffer;
    cc: NSS.CompletionCode;
    nBytes: INT;
    atAttn: BOOL;
    IF d.bufferIndex < b.length THEN ERROR;
    TRUSTED {
      [cc, nBytes] ¬ NSS.Receive[
        d.sd,
        [
          -- presumably the address of the first character, just past the TEXT header
          base~LOOPHOLE[ LOOPHOLE[b, LONG POINTER]+SIZE[TEXT[0]] ],
          startIndex~0,
          count~b.maxLength
          ]
        ];
      };
    IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[s, cc, LIST[$Read], "read error"];
    IF nBytes < b.maxLength
      THEN {
        [cc, atAttn] ¬ NSS.AtIBAttn[d.sd];
        IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[s, cc, LIST[$AtIBAttn], "read error (AtIBAttn)"];
        d.boundaryCondition ¬ SELECT TRUE FROM
          atAttn => attention,
          nBytes = 0 => endOfStream,
          ENDCASE => none;
        }
      ELSE {
        d.boundaryCondition ¬ none;
        };
    d.bufferIndex ¬ 0;
    b.length ¬ nBytes;
    };

  -- Sends the unsent part of the output buffer (bufferIndex up to length) and
  -- empties the buffer.  Called only from ENTRY procs holding d's lock.
  FlushBuf: PROC [s: STREAM, d: PrivateStreamData] ~ {
    b: REF TEXT ¬ d.buffer;
    cc: NSS.CompletionCode;
    TRUSTED {
      cc ¬ NSS.Send[
        d.sd,
        [
          base~LOOPHOLE[ LOOPHOLE[b, LONG POINTER]+SIZE[TEXT[0]] ],
          startIndex~d.bufferIndex,
          count~b.length-d.bufferIndex
          ]
        ];
      };
    IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[s, cc, LIST[$Write], "write error"];
    d.bufferIndex ¬ b.length ¬ 0;
    };

  -- Returns the next input character, refilling the buffer as needed.
  -- Raises "stream closed" after a local close and "end of stream" when the buffer
  -- is empty and a boundary condition (attention or end of stream) is pending.
  MyEntryGetChar: ENTRY PROC [s: STREAM, d: PrivateStreamData] RETURNS [c: CHAR] ~ {
    ENABLE UNWIND => NULL;
    b: REF TEXT ¬ d.buffer;
    i: CARDINAL;
    DO
      IF d.localClose THEN RaiseClosed[s];
      IF (i ¬ d.bufferIndex) < b.length THEN EXIT;
      IF d.boundaryCondition # none THEN RaiseEndOf[s];
      FillBuf[s, d];
      ENDLOOP;
    c ¬ b[i];
    d.bufferIndex ¬ i + 1;
    d.streamIndex ¬ d.streamIndex + 1;
    };

  MyGetChar: PROC [self: STREAM] RETURNS [CHAR] ~ {
    RETURN [MyEntryGetChar[self, PrivateStreamDataFromStream[self]]];
    };

  -- Copies up to block.count bytes into block, refilling the buffer as needed.
  -- Stops early, returning the count so far, when the buffer is empty and a
  -- boundary condition is pending.
  MyEntryUnsafeGetBlock: ENTRY PROC [s: STREAM, d: PrivateStreamData, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT ¬ 0] ~ CHECKED {
    ENABLE UNWIND => NULL;
    b: REF TEXT ¬ d.buffer;
    i: CARD;
    len: INT;
    nBytesMoreWanted: INT ¬ block.count;
    blockIndex: INT ¬ block.startIndex;
    DO
      IF d.localClose THEN RaiseClosed[s];
      IF nBytesMoreWanted <= 0 THEN RETURN;
      i ¬ d.bufferIndex;
      len ¬ INT[b.length] - INT[i];
      IF len <= 0 THEN {
        IF d.boundaryCondition # none THEN RETURN;
        FillBuf[s, d];
        LOOP;
        };
      IF len > nBytesMoreWanted THEN len ¬ nBytesMoreWanted;
      TRUSTED {
        -- len is replaced by the count that ByteBlt reports
        len ¬ Basics.ByteBlt[
          to~[
            blockPointer~LOOPHOLE[block.base],
            startIndex~blockIndex,
            stopIndexPlusOne~blockIndex+len
            ],
          from~[
            blockPointer~LOOPHOLE[b, LONG POINTER]+SIZE[TEXT[0]],
            startIndex~i,
            stopIndexPlusOne~i+len
            ]
          ];
        };
      d.bufferIndex ¬ i + len;
      d.streamIndex ¬ d.streamIndex + len;
      nBytesRead ¬ nBytesRead + len;
      blockIndex ¬ blockIndex + len;
      nBytesMoreWanted ¬ nBytesMoreWanted - len;
      ENDLOOP;
    };

  -- Rejects negative startIndex or count before entering the monitor.
  MyUnsafeGetBlock: UNSAFE PROC [self: STREAM, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT] ~ {
    d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
    IF block.startIndex < 0 OR block.count < 0 THEN ERROR RuntimeError.BoundsFault;
    RETURN [MyEntryUnsafeGetBlock[self, d, block]];
    };

  -- TRUE only when the buffer is empty and a boundary condition is pending.
  -- Never reads from the connection.
  MyEntryEndOf: ENTRY PROC [s: STREAM, d: PrivateStreamData] RETURNS [BOOL] ~ {
    ENABLE UNWIND => NULL;
    b: REF TEXT ¬ d.buffer;
    IF d.localClose THEN RaiseClosed[s];
    IF d.bufferIndex < b.length THEN RETURN [FALSE];
    RETURN [d.boundaryCondition # none];
    };

  MyEndOf: PROC [self: STREAM] RETURNS [BOOL] ~ {
    RETURN [MyEntryEndOf[self, PrivateStreamDataFromStream[self]]];
    };

  -- Returns the number of buffered bytes if any; INT.LAST if a boundary condition
  -- is pending; 0 if nothing is ready and wait is FALSE; otherwise refills the
  -- buffer and tries again.
  MyEntryCharsAvail: ENTRY PROC [s: STREAM, d: PrivateStreamData, wait: BOOL] RETURNS [INT] ~ {
    ENABLE UNWIND => NULL;
    b: REF TEXT ¬ d.buffer;
    nLeftInBuffer: INT;
    cc: NSS.CompletionCode;
    inputReady: BOOL;
    DO
      IF d.localClose THEN RaiseClosed[s];
      nLeftInBuffer ¬ INT[b.length] - INT[d.bufferIndex];
      IF nLeftInBuffer > 0 THEN RETURN [nLeftInBuffer];
      IF d.boundaryCondition # none THEN RETURN [INT.LAST];
      [cc, inputReady] ¬ NSS.InputReady[d.sd, FALSE];
      -- NOTE(review): this is the only stream operation that reports its error with
      -- a NIL stream instead of s, which also makes RaiseErrorFromCC add
      -- $networkStreamError.  Possibly a copy of the connect-time calls; confirm
      -- whether clients depend on it before changing.
      IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[NIL, cc, LIST[$CharsAvail], "can't test input ready"];
      IF (NOT inputReady) AND (NOT wait) THEN RETURN[0];
      FillBuf[s, d];
      ENDLOOP;
    };

  MyCharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [INT] ~ {
    RETURN [MyEntryCharsAvail[self, PrivateStreamDataFromStream[self], wait]];
    };

  -- Appends char to the output buffer, flushing first if the buffer is full.
  MyEntryPutChar: ENTRY PROC [s: STREAM, d: PrivateStreamData, char: CHAR] ~ {
    ENABLE UNWIND => NULL;
    b: REF TEXT ¬ d.buffer;
    i: CARDINAL;
    DO
      IF d.localClose THEN RaiseClosed[s];
      IF (i ¬ b.length) < b.maxLength THEN EXIT;
      FlushBuf[s, d];
      ENDLOOP;
    d.streamIndex ¬ d.streamIndex + 1;
    b.length ¬ i + 1;
    b[i] ¬ char;
    };

  MyPutChar: PROC [self: STREAM, char: CHAR] ~ {
    MyEntryPutChar[self, PrivateStreamDataFromStream[self], char];
    };

  -- Copies block.count bytes from block into the output buffer, flushing whenever
  -- the buffer fills.  Data left in the buffer at return is not sent until a
  -- later flush.
  MyEntryUnsafePutBlock: ENTRY PROC [s: STREAM, d: PrivateStreamData, block: IO.UnsafeBlock] ~ CHECKED {
    ENABLE UNWIND => NULL;
    b: REF TEXT ¬ d.buffer;
    i: CARD;
    len: INT;
    nBytesMoreToWrite: INT ¬ block.count;
    blockIndex: INT ¬ block.startIndex;
    DO
      IF d.localClose THEN RaiseClosed[s];
      IF nBytesMoreToWrite <= 0 THEN RETURN;
      i ¬ b.length;
      len ¬ INT[b.maxLength] - INT[i];
      IF len <= 0 THEN {
        FlushBuf[s, d];
        LOOP;
        };
      IF len > nBytesMoreToWrite THEN len ¬ nBytesMoreToWrite;
      TRUSTED {
        len ¬ Basics.ByteBlt[
          to~[
            blockPointer~LOOPHOLE[b, LONG POINTER]+SIZE[TEXT[0]],
            startIndex~i,
            stopIndexPlusOne~i+len
            ],
          from~[
            blockPointer~LOOPHOLE[block.base],
            startIndex~blockIndex,
            stopIndexPlusOne~blockIndex+len
            ]
          ];
        };
      b.length ¬ i + len;
      d.streamIndex ¬ d.streamIndex + len;
      blockIndex ¬ blockIndex + len;
      nBytesMoreToWrite ¬ nBytesMoreToWrite - len;
      ENDLOOP;
    };

  -- Rejects negative startIndex or count before entering the monitor.
  MyUnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] ~ {
    d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
    IF block.startIndex < 0 OR block.count < 0 THEN ERROR RuntimeError.BoundsFault;
    MyEntryUnsafePutBlock[self, d, block];
    };

  -- Sends any buffered output.
  MyEntryFlush: ENTRY PROC [s: STREAM, d: PrivateStreamData] ~ {
    ENABLE UNWIND => NULL;
    IF d.localClose THEN RaiseClosed[s];
    IF d.buffer.length > 0 THEN FlushBuf[s, d];
    };

  MyFlush: PROC [self: STREAM] ~ {
    MyEntryFlush[self, PrivateStreamDataFromStream[self]];
    };

  -- Discards input: skips whatever is buffered, then keeps refilling and skipping
  -- until a boundary condition is pending.  Returns quietly (no error) if the
  -- stream has been closed locally.
  MyEntryInputReset: ENTRY PROC [s: STREAM, d: PrivateStreamData] ~ {
    ENABLE UNWIND => NULL;
    n: CARDINAL;
    DO
      IF d.localClose THEN RETURN;
      IF (n ¬ (d.buffer.length - d.bufferIndex)) > 0 THEN {
        d.bufferIndex ¬ d.bufferIndex + n;
        d.streamIndex ¬ d.streamIndex + n;
        };
      IF d.boundaryCondition # none THEN RETURN;
      FillBuf[s, d];
      ENDLOOP;
    };

  MyInputReset: PROC [self: STREAM] ~ {
    MyEntryInputReset[self, PrivateStreamDataFromStream[self]];
    };

  -- A record that exists only to supply the monitor lock used by MustDoClose.
  specialLockForMustDoClose: PrivateStreamData ¬ NEW[ PrivateStreamDataRecord ¬ []];

  -- Marks the half named real as locally closed and reports whether the caller
  -- must now destroy the shared descriptor, i.e. whether the sibling half had
  -- already been closed.  Returns FALSE if real was already closed.
  -- Both halves are examined, so the lock is not real's own: d defaults to
  -- specialLockForMustDoClose and all calls serialize on that one record.
  -- ENABLE UNWIND => NULL is present, as in every other ENTRY proc of this module,
  -- so that the lock is released if the can't-happen ERROR below is raised and
  -- unwound; without it the shared lock would stay held and every later close
  -- would block.
  -- NOTE(review): the angle-bracket pair after each parameter name is reproduced
  -- exactly as found in the file; it looks like residue of formatted (Tioga)
  -- markup.  Confirm against a pristine copy.
  MustDoClose: ENTRY PROC [real<>: PrivateStreamData, d<>: PrivateStreamData ¬ specialLockForMustDoClose] RETURNS [doClose: BOOL] ~ {
    ENABLE UNWIND => NULL;
    mySibling: PrivateStreamData;
    IF real.localClose THEN RETURN[FALSE];
    real.localClose ¬ TRUE;
    IF (mySibling ¬ real.sibling) = NIL THEN ERROR;
    real.sibling ¬ NIL;
    RETURN[ mySibling.localClose ];
    };

  -- Closes one half.  The descriptor is destroyed only when the other half is
  -- already closed.  A failure from the destroy is reported unless abort is TRUE.
  MyIOClose: PROC [self: STREAM, abort: BOOL] ~ {
    d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
    cc: NSS.CompletionCode;
    IF NOT MustDoClose[d] THEN RETURN;
    [cc] ¬ NSS.DestroyStreamDescriptor[d.sd, abort];
    IF NSS.FailureCC[cc] AND (NOT abort) THEN RaiseErrorFromCC[self, cc, LIST[$Close], "I/O error in close"];
    };

  -- Close for the output half: flushes first unless aborting.  An error raised by
  -- the flush propagates and the close is then not performed (the catch phrase
  -- that would have suppressed it is commented out).
  MyOutputClose: PROC [self: STREAM, abort: BOOL] ~ {
    d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
    IF d.localClose THEN RETURN;
    IF NOT abort THEN IO.Flush[self -- ! IO.Error, NS.Timeout => CONTINUE -- ];
    MyIOClose[self, abort];
    };

  MyEntryGetIndex: ENTRY PROC [s: STREAM, d: PrivateStreamData] RETURNS [INT] ~ {
    ENABLE UNWIND => NULL;
    IF d.localClose THEN RaiseClosed[s];
    RETURN[d.streamIndex];
    };

  MyGetIndex: PROC [self: STREAM] RETURNS [INT] ~ {
    d: PrivateStreamData ~ PrivateStreamDataFromStream[self];
    RETURN[MyEntryGetIndex[self, d]];
    };

  -- IO-level procedure tables for the two directions.
  myInStreamProcs: REF IO.StreamProcs ¬ IO.CreateStreamProcs[
    variety~input,
    class~$networkStream,
    getChar~MyGetChar,
    unsafeGetBlock~MyUnsafeGetBlock,
    endOf~MyEndOf,
    charsAvail~MyCharsAvail,
    reset~MyInputReset,
    close~MyIOClose,
    getIndex~MyGetIndex
    ];

  myOutStreamProcs: REF IO.StreamProcs ¬ IO.CreateStreamProcs[
    variety~output,
    class~$networkStream,
    putChar~MyPutChar,
    unsafePutBlock~MyUnsafePutBlock,
    flush~MyFlush,
    close~MyOutputClose,
    getIndex~MyGetIndex
    ];

  -- Records the timeout in nsd and applies it to the descriptor: the put timeout
  -- when output is TRUE, the get timeout otherwise.
  MyEntrySetTimeout: ENTRY PROC [s: STREAM, nsd: NS.NetworkStreamData, d: PrivateStreamData, timeout: NS.Milliseconds, signalTimeout: BOOL, output: BOOL] ~ {
    ENABLE UNWIND => NULL;
    cc: NSS.CompletionCode;
    IF d.localClose THEN RaiseClosed[s];
    nsd.timeout ¬ timeout;
    nsd.signalTimeout ¬ signalTimeout;
    IF output
      THEN cc ¬ NSS.SetPutTimeout[d.sd, timeout]
      ELSE cc ¬ NSS.SetGetTimeout[d.sd, timeout];
    IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[s, cc, LIST[$SetTimeout], "can't set timeout"];
    };

  MyInputSetTimeout: NS.SetTimeoutProc -- [stream, timeout, signalTimeout] -- ~ {
    nsd: NS.NetworkStreamData ¬ NARROW[stream.streamData];
    d: PrivateStreamData ¬ NARROW[nsd.data];
    MyEntrySetTimeout[stream, nsd, d, timeout, signalTimeout, FALSE];
    };

  MyOutputSetTimeout: NS.SetTimeoutProc -- [stream, timeout, signalTimeout] -- ~ {
    nsd: NS.NetworkStreamData ¬ NARROW[stream.streamData];
    d: PrivateStreamData ¬ NARROW[nsd.data];
    MyEntrySetTimeout[stream, nsd, d, timeout, signalTimeout, TRUE];
    };

  -- Returns the client-visible index together with the number of bytes currently
  -- held in the buffer (length minus bufferIndex).
  MyEntryGetIndexDetails: ENTRY PROC [s: STREAM, d: PrivateStreamData] RETURNS [index, bytesInBuffer: INT] ~ {
    ENABLE UNWIND => NULL;
    IF d.localClose THEN RaiseClosed[s];
    bytesInBuffer ¬ INT[d.buffer.length] - INT[d.bufferIndex];
    index ¬ d.streamIndex;
    };

  -- Input: the buffer index is ahead of the client index by the bytes still
  -- buffered.  The third result (ackIndex) is always -1.
  MyInputGetIndexDetails: NS.GetIndexDetailsProc -- [stream] RETURNS [index, bufferIndex, ackIndex] -- ~ {
    d: PrivateStreamData ~ PrivateStreamDataFromStream[stream];
    bytesInBuffer: INT;
    [index~index, bytesInBuffer~bytesInBuffer] ¬ MyEntryGetIndexDetails[stream, d];
    RETURN[ index, index+bytesInBuffer, -1 ];
    };

  -- Output: the buffer index is behind the client index by the bytes not yet sent.
  -- The third result (ackIndex) is always -1.
  MyOutputGetIndexDetails: NS.GetIndexDetailsProc -- [stream] RETURNS [index, bufferIndex, ackIndex] -- ~ {
    d: PrivateStreamData ~ PrivateStreamDataFromStream[stream];
    bytesInBuffer: INT;
    [index~index, bytesInBuffer~bytesInBuffer] ¬ MyEntryGetIndexDetails[stream, d];
    RETURN[ index, index-bytesInBuffer, -1 ];
    };

  -- Reports open while unread bytes remain in the buffer; otherwise maps the
  -- pending boundary condition to a stream state.  When reset is TRUE a pending
  -- attention is cleared; end of stream is never cleared.
  MyEntryGetStreamState: ENTRY PROC [s: STREAM, d: PrivateStreamData, reset: BOOL] RETURNS [ss: NS.StreamState] ~ {
    ENABLE UNWIND => NULL;
    IF d.localClose THEN RaiseClosed[s];
    IF d.buffer.length > d.bufferIndex THEN RETURN [ss~open];
    SELECT d.boundaryCondition FROM
      none => ss ¬ open;
      attention => { ss ¬ attention; IF reset THEN d.boundaryCondition ¬ none };
      endOfStream => { ss ¬ remoteClosed };
      ENDCASE => ERROR;
    };

  -- subStreamType and attentionType are always reported as 0.
  MyGetStreamState: NS.GetStreamStateProc -- [in, reset] RETURNS [streamState, subStreamType, attentionType] -- ~ {
    d: PrivateStreamData ~ PrivateStreamDataFromStream[in];
    streamState ¬ MyEntryGetStreamState[in, d, reset];
    subStreamType ¬ 0;
    attentionType ¬ 0;
    };

  MyWaitAttention: NS.WaitAttentionProc -- [in, timeout] RETURNS [attentionType] -- ~ {
    -- This doesn't need to (and better not) be an ENTRY proc!
    -- The value NSS.WaitOOBAttn delivers in at is not used; the result is always 0.
    d: PrivateStreamData ~ PrivateStreamDataFromStream[in];
    cc: NSS.CompletionCode;
    at: NS.AttentionType;
    IF d.localClose THEN RaiseClosed[in];
    [cc, at] ¬ NSS.WaitOOBAttn[d.sd, timeout];
    IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[in, cc, LIST[$WaitAttn], "error in wait attention proc"];
    RETURN [0];
    };

  MySendAttention: NS.SendAttentionProc -- [out, attentionType] -- ~ {
    -- This doesn't need to be an ENTRY proc!
    d: PrivateStreamData ~ PrivateStreamDataFromStream[out];
    cc: NSS.CompletionCode;
    IF d.localClose THEN RaiseClosed[out];
    [cc] ¬ NSS.SendAttn[d.sd, attentionType];
    IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[out, cc, LIST[$SendAttn], "error in send attention proc"];
    };

  -- NetworkStream-level procedure tables for the two directions.
  myInNetworkStreamProcs: NS.NetworkStreamProcs ¬ NEW[ NS.NetworkStreamProcsRecord ¬ [
    setTimeout~MyInputSetTimeout,
    getIndexDetails~MyInputGetIndexDetails,
    getStreamState~MyGetStreamState,
    waitAttention~MyWaitAttention
    ] ];

  myOutNetworkStreamProcs: NS.NetworkStreamProcs ¬ NEW[ NS.NetworkStreamProcsRecord ¬ [
    setTimeout~MyOutputSetTimeout,
    getIndexDetails~MyOutputGetIndexDetails,
    sendAttention~MySendAttention
    ] ];

  -- Listener Implementation

  PrivateListenerData: TYPE ~ REF PrivateListenerDataRecord;
  PrivateListenerDataRecord: TYPE ~ RECORD [
    ld: NSS.ListenerDescriptor,
    daemon: PROCESS,  -- the forked ListenerDaemon; joined by MyDestroyListener
    destroyed: BOOL ¬ FALSE
    ];

  -- Accept loop, run as a forked process.  For every accepted connection it builds
  -- a stream pair and forks (detached) the listener's worker proc on it.
  -- Ends quietly when the process is aborted (ABORTED => CONTINUE).
  -- The listener is reached only through the Finalize handle and the local
  -- variable is cleared after each use; presumably so that this process does not
  -- keep the listener object alive (TODO confirm against the Finalize interface).
  ListenerDaemon: PROC [h: Finalize.Handle -- for listener -- ] ~ {
    ENABLE ABORTED => CONTINUE;
    listener: NS.Listener;
    d: PrivateListenerData;
    r: NS.Registration;
    cc: NSS.CompletionCode ¬ NSS.successCC;
    sd: NSS.StreamDescriptor ¬ NIL;
    tLocal, tRemote: ROPE;
    in, out: STREAM;
    nsdIn, nsdOut: NS.NetworkStreamData;
    -- Runs at the top of every iteration and on unwind: backs off after a failure
    -- and destroys a descriptor that was accepted but not handed to a stream pair.
    CleanUp: PROC ~ {
      IF NSS.FailureCC[cc] THEN Process.PauseMsec[50]; -- reduce CPU load
      IF sd # NIL THEN [] ¬ NSS.DestroyStreamDescriptor[sd, TRUE];
      sd ¬ NIL;
      };
    listener ¬ NARROW[Finalize.HandleToObject[h]];
    d ¬ NARROW[listener.data];
    r ¬ listener.registration;
    listener ¬ NIL;
    {
    ENABLE UNWIND => CleanUp[];
    DO
      CleanUp[];
      [cc, sd] ¬ NSS.AcceptConnection[d.ld];
      IF NSS.FailureCC[cc] THEN { sd ¬ NIL -- (unnecessary) --; LOOP };
      [cc, tLocal, tRemote] ¬ NSS.AddressesFromStreamDescriptor[sd];
      IF NSS.FailureCC[cc] THEN LOOP;
      [in, out] ¬ NS.CreateUninitializedStreams[r];
      nsdIn ¬ NARROW[in.streamData];
      nsdOut ¬ NARROW[out.streamData];
      nsdIn.local ¬ nsdOut.local ¬ tLocal;
      nsdIn.remote ¬ nsdOut.remote ¬ tRemote;
      [dIn~nsdIn.data, dOut~nsdOut.data] ¬ NewPrivateStreamDataPair[sd, defaultInBufferLength, defaultOutBufferLength];
      sd ¬ NIL;  -- now owned by the stream pair; CleanUp must not destroy it
      listener ¬ NARROW[Finalize.HandleToObject[h]];
      TRUSTED {
        Process.Detach[ FORK listener.listenerWorkerProc[listener, in, out] ];
        };
      listener ¬ NIL;
      ENDLOOP;
    };
    };

  -- Queue given to Finalize.EnableFinalization below; nothing in this module
  -- reads it.
  dummyListenerFQ: Finalize.FinalizationQueue ¬ Finalize.NewFQ[];

  -- Creates a listener on local, fills in the listener object and forks its
  -- daemon.  On unwind before the daemon is forked the listener descriptor is
  -- destroyed; ld is cleared at the end so a later unwind leaves it alone.
  MyRegisteredCreateListenerProc: NS.RegisteredCreateListenerProc -- [registration, local, transportParameters, listenerWorkerProc, listenerWorkerClientData] RETURNS [listener] -- ~ {
    ld: NSS.ListenerDescriptor ¬ NIL;
    {
    ENABLE UNWIND => {
      IF ld # NIL THEN [] ¬ NSS.DestroyListenerDescriptor[ld];
      };
    d: PrivateListenerData;
    h: Finalize.Handle ¬ NIL;
    cc: NSS.CompletionCode;
    [cc, ld] ¬ NSS.CreateListenerDescriptor[local];
    IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[NIL, cc, LIST[$CreateListener], "can't create listener"];
    [cc, local] ¬ NSS.AddressFromListenerDescriptor[ld];
    IF NSS.FailureCC[cc] THEN RaiseErrorFromCC[NIL, cc, LIST[$CreateListener], "can't determine listener address"];
    listener ¬ NS.CreateUninitializedListener[registration];
    -- Finalization is enabled and at once disabled; presumably only to obtain
    -- the handle h that is passed to the daemon (TODO confirm).
    h ¬ Finalize.EnableFinalization[listener, dummyListenerFQ];
    [] ¬ Finalize.DisableFinalization[h];
    listener.local ¬ local;
    listener.listenerWorkerProc ¬ listenerWorkerProc;
    listener.listenerWorkerClientData ¬ listenerWorkerClientData;
    listener.data ¬ d ¬ NEW [PrivateListenerDataRecord];
    d.ld ¬ ld;
    d.daemon ¬ FORK ListenerDaemon[h];
    ld ¬ NIL;
    };
    };

  -- A record that exists only to supply the monitor lock used by
  -- TestAndSetListenerDestroyed.
  destroyListenerLock: PrivateStreamData ¬ NEW[PrivateStreamDataRecord ¬ [sd~NIL, sibling~NIL]];

  -- Atomically sets pld.destroyed and returns its previous value.
  TestAndSetListenerDestroyed: ENTRY PROC [pld: PrivateListenerData, d: PrivateStreamData ¬ destroyListenerLock] RETURNS [alreadyDestroyed: BOOL] ~ {
    ENABLE UNWIND => NULL;
    alreadyDestroyed ¬ pld.destroyed;
    pld.destroyed ¬ TRUE;
    };

  -- Destroys the listener once: destroys its descriptor, then aborts the daemon
  -- process and waits for it to finish.  Later calls return immediately.
  MyDestroyListener: NS.DestroyListenerProc -- [listener] -- ~ {
    pld: PrivateListenerData ¬ NARROW[listener.data];
    IF TestAndSetListenerDestroyed[pld].alreadyDestroyed THEN RETURN;
    [] ¬ NSS.DestroyListenerDescriptor[pld.ld];
    TRUSTED {
      Process.Abort[pld.daemon];
      JOIN pld.daemon;
      };
    };

  myListenerProcs: NS.ListenerProcs ¬ NEW[NS.ListenerProcsRecord ¬ [
    destroyListener~MyDestroyListener
    ]];

  -- Debugging Stuff

  -- Worker that copies each character from in to out, calling NS.SendSoon after
  -- every character, until IO.EndOfStream; both streams are then closed with the
  -- abort flag TRUE, also on unwind.
  EchoWorker: NetworkStream.ListenerWorkerProc -- [listener, in, out] -- ~ {
    ENABLE UNWIND => { IO.Close[in, TRUE]; IO.Close[out, TRUE] };
    c: CHAR;
    DO
      c ¬ IO.GetChar[in ! IO.EndOfStream => EXIT];
      IO.PutChar[out, c];
      NS.SendSoon[out, 0];
      ENDLOOP;
    IO.Close[in, TRUE];
    IO.Close[out, TRUE];
    };

  StartEchoer: PROC [pf: ATOM, local: ROPE, tc: ATOM] RETURNS [listener: NetworkStream.Listener] ~ {
    listener ¬ NetworkStream.CreateListener[pf, local, tc, NIL, EchoWorker, NIL];
    };

  StartEchoerArpaBasicStream: PROC [local: ROPE] RETURNS [listener: NetworkStream.Listener] ~ {
    listener ¬ NetworkStream.CreateListener[$ARPA, local, $basicStream, NIL, EchoWorker, NIL];
    };

  StreamPair: TYPE ~ RECORD [ in, out: IO.STREAM ];

  CreateStreamPairArpaBasicStream: PROC [remote: ROPE, timeoutMsec: CARD32] RETURNS [streamPair: REF StreamPair] ~ {
    streamPair ¬ NEW[StreamPair ¬ [NIL, NIL]];
    [streamPair.in, streamPair.out] ¬ NetworkStream.CreateStreams[$ARPA, remote, $basicStream, timeoutMsec, NIL];
    };

  -- Thin wrappers around IO.PutChar, IO.Flush and IO.GetChar.
  PutC: PROC [s: IO.STREAM, c: CHAR] ~ { IO.PutChar[s, c] };
  FlushC: PROC [s: IO.STREAM] ~ { IO.Flush[s] };
  GetC: PROC [s: IO.STREAM] RETURNS[c: CHAR] ~ { c ¬ IO.GetChar[s] };

  -- Registration

  -- Registers one NS.RegistrationRecord per transport class in myTransportClasses.
  -- The callback ignores the old registration and returns the record just built.
  RegisterSelf: PROC ~ {
    registrationForThisTransportClass: NS.Registration;
    MyRegistrationCallbackProc: NS.RegistrationCallbackProc -- [old] RETURNS [new] -- ~ {
      RETURN[registrationForThisTransportClass] };
    FOR each: LIST OF ATOM ¬ myTransportClasses, each.rest WHILE each # NIL DO
      registrationForThisTransportClass ¬ NEW[NS.RegistrationRecord ¬ [
        protocolFamily~myProtocolFamily,
        transportClass~each.first,
        createStreams~MyRegisteredCreateStreamsProc,
        inStreamProcs~myInStreamProcs,
        inNetworkStreamProcs~myInNetworkStreamProcs,
        outStreamProcs~myOutStreamProcs,
        outNetworkStreamProcs~myOutNetworkStreamProcs,
        createListener~MyRegisteredCreateListenerProc,
        listenerProcs~myListenerProcs,
        clientData~NIL
        ] ];
      NS.Register[myProtocolFamily, each.first, MyRegistrationCallbackProc];
      ENDLOOP;
    };

  -- Module start code: register as soon as the module is started.
  RegisterSelf[];

  }.