<<>> <> <> <> <> <> <> <> <> DIRECTORY Basics, Finalize, IO, NetworkStream, Process, RefText, Rope ; NetworkStreamSPPOnBasicStreamImpl: CEDAR MONITOR LOCKS data USING data: PrivateStreamData IMPORTS Basics, Finalize, IO, NetworkStream, Process, RefText ~ { OPEN NS: NetworkStream; <> HWORD: TYPE ~ Basics.HWORD; ROPE: TYPE ~ Rope.ROPE; STREAM: TYPE ~ IO.STREAM; Milliseconds: TYPE ~ NetworkStream.Milliseconds; maxData: CARDINAL ~ 504; -- maxData+SIZE[DGramHeaderObject] = 512 <> myTransportClass: ATOM ¬ $SPP; transportClassUnderMe: ATOM ¬ $basicStream; protocolFamiliesUnderMe: LIST OF ATOM ¬ LIST[$ARPA]; outBufBytes: CARD ¬ 2048; <> <> <<>> <> <> <> <<};>> <<>> <> <<>> <> <<>> <> <> <> <<};>> <> <> LayeredHdr: TYPE ~ REF LayeredHdrObject; LayeredHdrObject: TYPE ~ MACHINE DEPENDENT RECORD [ length: HWORD, sst: HWORD, eom: HWORD, attn: HWORD ]; PrivateStreamData: TYPE ~ REF PrivateStreamDataObject; PrivateStreamDataObject: TYPE ~ MONITORED RECORD [ underStream: STREAM ¬ NIL, sst: NS.SubStreamType ¬ 0, hdrBytesProcessed: INT ¬ 0, inHdr: LayeredHdrObject ¬ [ [0,0], [0,0], [0,0], [0,0] ], inBytesLeft: INT ¬ 0, outBuf: REF TEXT ¬ NIL ]; myInStreamProcs: REF IO.StreamProcs ¬ IO.CreateStreamProcs[ variety~input, class~$networkStream, getChar~MyGetChar, unsafeGetBlock~MyUnsafeGetBlock, endOf~MyEndOf, charsAvail~MyCharsAvail, reset~MyInReset, close~MyClose, getIndex~MyGetIndex ]; myOutStreamProcs: REF IO.StreamProcs ¬ IO.CreateStreamProcs[ variety~output, class~$networkStream, putChar~MyPutChar, unsafePutBlock~MyUnsafePutBlock, flush~MyFlush, close~MyClose, getIndex~MyGetIndex ]; PrivateStreamDataFromStream: PROC [s: STREAM] RETURNS [PrivateStreamData] ~ INLINE { RETURN [NARROW[NARROW[s.streamData, NS.NetworkStreamData].data]] }; StreamPairFromUnderStreamPair: PROC [registration: NS.Registration, uIn: STREAM, uOut: STREAM] RETURNS [in: STREAM, out: STREAM] ~ { nsdIn, nsdOut: NS.NetworkStreamData; outBuf: REF TEXT; [in, out] ¬ NS.CreateUninitializedStreams[registration]; nsdIn ¬ NARROW[in.streamData]; [local~nsdIn.local, remote~nsdIn.remote] ¬ NS.GetStreamInfo[uIn]; nsdIn.data ¬ NEW[PrivateStreamDataObject ¬ [underStream~uIn]]; nsdOut ¬ NARROW[out.streamData]; [local~nsdOut.local, remote~nsdOut.remote] ¬ NS.GetStreamInfo[uOut]; outBuf ¬ RefText.New[outBufBytes]; nsdOut.data ¬ NEW[PrivateStreamDataObject ¬ [underStream~uOut, outBuf~outBuf]]; }; MyRegisteredCreateStreamsProc: NS.RegisteredCreateStreamsProc -- [registration, remote, timeout, transportParameters] RETURNS [in, out] -- ~ { tIn, tOut: STREAM; nsdIn, nsdOut: NS.NetworkStreamData; [tIn, tOut] ¬ NS.CreateStreams[registration.protocolFamily, remote, transportClassUnderMe, timeout, transportParameters]; -- this might fail, raise Error unwinding me ??? [in, out] ¬ StreamPairFromUnderStreamPair[registration, tIn, tOut]; }; Push: INTERNAL PROC [data: PrivateStreamData, self: STREAM, sendIfNoData: BOOL, sendEOM: BOOL] ~ { b: REF TEXT ¬ data.outBuf; IF (b.length # 0) OR sendIfNoData OR sendEOM THEN { SendHeader[data, self, b.length, sendEOM]; IF b.length > 0 THEN IO.PutBlock[data.underStream, b, 0, b.length]; b.length ¬ 0; }; }; SendHeader: INTERNAL PROC [data: PrivateStreamData, self: STREAM, nBytes: CARDINAL, sendEOM: BOOL] ~ { outHdr: LayeredHdrObject; outHdr.length ¬ Basics.HFromCard16[nBytes]; outHdr.sst ¬ [hi~0, lo~data.sst]; outHdr.eom ¬ [hi~0, lo~IF sendEOM THEN 1 ELSE 0]; outHdr.attn ¬ [0, 0]; TRUSTED { IO.UnsafePutBlock[ data.underStream, [base~LOOPHOLE[@outHdr], startIndex~0, count~BYTES[LayeredHdrObject]] ] }; data.hdrBytesProcessed ¬ data.hdrBytesProcessed + BYTES[LayeredHdrObject]; }; ReadHeader: INTERNAL PROC [data: PrivateStreamData, self: STREAM] ~ { n: INT; TRUSTED { n ¬ IO.UnsafeGetBlock[ data.underStream, [base~LOOPHOLE[@data.inHdr], startIndex~0, count~BYTES[LayeredHdrObject]] ]; }; IF n # BYTES[LayeredHdrObject] THEN NS.RaiseIOError[Failure, self, LIST[$hdrErr], "hdr end of stream"]; data.hdrBytesProcessed ¬ data.hdrBytesProcessed + BYTES[LayeredHdrObject]; <> SELECT Basics.Card16FromH[data.inHdr.eom] FROM 0, 1 => NULL; ENDCASE => NS.RaiseIOError[Failure, self, LIST[$hdrErr], "hdr eom err"]; SELECT Basics.Card16FromH[data.inHdr.attn] FROM 0, 1 => NULL; ENDCASE => NS.RaiseIOError[Failure, self, LIST[$hdrErr], "hdr attn err"]; SELECT Basics.Card16FromH[data.inHdr.sst] FROM < 256 => NULL; ENDCASE => NS.RaiseIOError[Failure, self, LIST[$hdrErr], "hdr sst err"]; data.inBytesLeft ¬ Basics.Card16FromH[data.inHdr.length]; }; MyEntryGetChar: ENTRY PROC [data: PrivateStreamData, self: STREAM] RETURNS [c: CHAR] ~ { ENABLE UNWIND => NULL; DO IF data.inHdr.sst.lo # data.sst THEN ERROR IO.EndOfStream[self]; IF data.inBytesLeft > 0 THEN EXIT; IF data.inHdr.eom # [0, 0] THEN ERROR IO.EndOfStream[self]; ReadHeader[data, self]; -- sets data.inBytesLeft ENDLOOP; c ¬ IO.GetChar[data.underStream]; data.inBytesLeft ¬ data.inBytesLeft.PRED; }; MyGetChar: PROC [self: STREAM] RETURNS [c: CHAR] ~ { data: PrivateStreamData ¬ PrivateStreamDataFromStream[self]; c ¬ MyEntryGetChar[data, self]; }; MyEntryEndOf: ENTRY PROC [data: PrivateStreamData, self: STREAM] RETURNS [BOOL] ~ { ENABLE UNWIND => NULL; IF data.inHdr.sst.lo # data.sst THEN RETURN[TRUE]; IF data.inBytesLeft > 0 THEN RETURN[FALSE]; IF data.inHdr.eom # [0, 0] THEN RETURN[TRUE]; RETURN[ IO.EndOf[data.underStream] ]; }; MyEndOf: PROC [self: STREAM] RETURNS [BOOL] ~ { data: PrivateStreamData ¬ PrivateStreamDataFromStream[self]; RETURN[ MyEntryEndOf[data, self] ]; }; MyEntryCharsAvail: ENTRY PROC [data: PrivateStreamData, self: STREAM, wait: BOOL] RETURNS [charsAvail: INT] ~ { ENABLE UNWIND => NULL; DO IF data.inHdr.sst.lo # data.sst THEN RETURN[INT.LAST]; IF (data.inBytesLeft = 0) AND (data.inHdr.eom # [0, 0]) THEN RETURN[INT.LAST]; charsAvail ¬ IO.CharsAvail[data.underStream, wait]; IF IO.EndOf[data.underStream] THEN RETURN[INT.LAST]; IF data.inBytesLeft > 0 THEN RETURN[ MIN[charsAvail, data.inBytesLeft] ]; IF (charsAvail < BYTES[LayeredHdrObject]) AND (NOT wait) THEN RETURN[0]; ReadHeader[data, self]; ENDLOOP; }; MyCharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [charsAvail: INT] ~ { data: PrivateStreamData ¬ PrivateStreamDataFromStream[self]; RETURN[ MyEntryCharsAvail[data, self, wait] ]; }; MyEntryUnsafeGetBlock: ENTRY UNSAFE PROC [data: PrivateStreamData, self: STREAM, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT ¬ 0] ~ { ENABLE UNWIND => NULL; nGot: INT; count: INT ¬ block.count; WHILE count > 0 DO <> IF data.inHdr.sst.lo # data.sst THEN EXIT; -- substreamType change IF data.inBytesLeft = 0 THEN { IF data.inHdr.eom # [0, 0] THEN EXIT; -- endOfMessage IF IO.EndOf[data.underStream] THEN EXIT; -- endOfStream ReadHeader[data, self]; LOOP; }; <> block.count ¬ MIN[count, data.inBytesLeft]; TRUSTED { nGot ¬ IO.UnsafeGetBlock[data.underStream, block]; }; IF nGot = 0 THEN EXIT; -- race? data.inBytesLeft ¬ data.inBytesLeft - nGot; <> nBytesRead ¬ nBytesRead + nGot; block.startIndex ¬ block.startIndex + nGot; count ¬ count - nGot; ENDLOOP; }; MyUnsafeGetBlock: UNSAFE PROC [self: STREAM, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT] ~ { data: PrivateStreamData ¬ PrivateStreamDataFromStream[self]; TRUSTED { nBytesRead ¬ MyEntryUnsafeGetBlock[data, self, block] }; RETURN; }; MyEntryPutChar: ENTRY PROC [data: PrivateStreamData, self: STREAM, char: CHAR] ~ { ENABLE UNWIND => NULL; b: REF TEXT ¬ data.outBuf; IF b.length >= maxData THEN Push[data, self, FALSE, FALSE]; b[b.length] ¬ char; b.length ¬ b.length.SUCC; }; MyPutChar: PROC [self: STREAM, char: CHAR] ~ { data: PrivateStreamData ¬ PrivateStreamDataFromStream[self]; MyEntryPutChar[data, self, char]; }; MyEntryUnsafePutBlock: ENTRY PROC [data: PrivateStreamData, self: STREAM, block: IO.UnsafeBlock] ~ { ENABLE UNWIND => NULL; b: REF TEXT ¬ data.outBuf; PushB: PROC ~ { nToMove: INT ¬ MIN[block.count, (maxData - b.length)]; IF nToMove = 0 THEN RETURN; TRUSTED { startIndex: INT ¬ BYTES[TEXT[0]] + b.length; [] ¬ Basics.ByteBlt[ to~[ blockPointer~LOOPHOLE[b], startIndex~startIndex, stopIndexPlusOne~(startIndex + nToMove)], from~[ blockPointer~block.base, startIndex~block.startIndex, stopIndexPlusOne~(block.startIndex + nToMove)] ]; }; b.length ¬ b.length + nToMove; block.startIndex ¬ block.startIndex + nToMove; block.count ¬ block.count - nToMove; }; PushB[]; IF block.count = 0 THEN RETURN; Push[data, self, FALSE, FALSE]; WHILE block.count > maxData DO SendHeader[data, self, maxData, FALSE]; TRUSTED { IO.UnsafePutBlock[data.underStream, [block.base, block.startIndex, maxData]]; }; block.startIndex ¬ block.startIndex + maxData; block.count ¬ block.count - maxData; ENDLOOP; PushB[]; }; MyUnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] ~ { data: PrivateStreamData ¬ PrivateStreamDataFromStream[self]; MyEntryUnsafePutBlock[data, self, block]; }; MyEntryFlush: ENTRY PROC [data: PrivateStreamData, self: STREAM] ~ { ENABLE UNWIND => NULL; Push[data, self, FALSE, FALSE]; }; MyFlush: PROC [self: STREAM] ~ { data: PrivateStreamData ¬ PrivateStreamDataFromStream[self]; MyEntryFlush[data, self]; IO.Flush[data.underStream]; }; MyEntryInReset: ENTRY PROC [data: PrivateStreamData, self: STREAM] ~ { ENABLE UNWIND => NULL; resetBufBytes: INT ~ 1024; resetBuf: REF TEXT ¬ RefText.ObtainScratch[resetBufBytes]; nGot: INT; { ENABLE UNWIND => RefText.ReleaseScratch[resetBuf]; DO IF data.inHdr.sst.lo # data.sst THEN EXIT; IF data.inBytesLeft = 0 THEN { IF data.inHdr.eom # [0, 0] THEN EXIT; IF IO.EndOf[data.underStream] THEN EXIT; ReadHeader[data, self]; LOOP; }; nGot ¬ IO.GetBlock[data.underStream, resetBuf, 0, data.inBytesLeft]; data.inBytesLeft ¬ data.inBytesLeft - nGot; ENDLOOP; }; RefText.ReleaseScratch[resetBuf]; }; MyInReset: PROC [self: STREAM] ~ { data: PrivateStreamData ¬ PrivateStreamDataFromStream[self]; MyEntryInReset[data, self]; }; MyClose: PROC [self: STREAM, abort: BOOL] ~ { data: PrivateStreamData ¬ PrivateStreamDataFromStream[self]; IO.Close[data.underStream, abort]; }; MyEntryGetIndex: ENTRY PROC [data: PrivateStreamData, self: STREAM] RETURNS [INT] ~ { ENABLE UNWIND => NULL; RETURN[ IO.GetIndex[data.underStream] - data.hdrBytesProcessed ]; }; MyGetIndex: PROC [self: STREAM] RETURNS [INT] ~ { data: PrivateStreamData ¬ PrivateStreamDataFromStream[self]; RETURN[ MyEntryGetIndex[data, self] ]; }; myInNetworkStreamProcs: NS.NetworkStreamProcs ¬ NEW[ NS.NetworkStreamProcsRecord ¬ [ setTimeout~MySetTimeout, sendSoon~MySendSoon, getIndexDetails~MyInGetIndexDetails, getStreamState~MyGetStreamState <> <> ] ]; myOutNetworkStreamProcs: NS.NetworkStreamProcs ¬ NEW[ NS.NetworkStreamProcsRecord ¬ [ setTimeout~MySetTimeout, sendSoon~MySendSoon, getIndexDetails~MyOutGetIndexDetails, setSubStreamType~MySetSubStreamType, sendEndOfMessage~MySendEndOfMessage <> ] ]; MyGetTimeout: NS.GetTimeoutProc -- [stream] RETURNS [timeout, signalTimeout] -- ~ { data: PrivateStreamData ¬ PrivateStreamDataFromStream[stream]; [timeout, signalTimeout] ¬ NS.GetTimeout[data.underStream]; }; MySetTimeout: NS.SetTimeoutProc -- [stream, timeout, signalTimeout] -- ~ { data: PrivateStreamData ¬ PrivateStreamDataFromStream[stream]; NS.SetTimeout[data.underStream, timeout, signalTimeout]; }; MyEntrySendSoon: ENTRY PROC [data: PrivateStreamData, out: STREAM] ~ { ENABLE UNWIND => NULL; Push[data, out, FALSE, FALSE]; }; MySendSoon: NS.SendSoonProc -- [out, when] -- ~ { data: PrivateStreamData ¬ PrivateStreamDataFromStream[out]; MyEntrySendSoon[data, out]; NS.SendSoon[data.underStream, when]; }; MyEntryGetIndexDetails: ENTRY PROC [data: PrivateStreamData, stream: STREAM, output: BOOL] RETURNS [index: INT, bufferIndex: INT, ackIndex: INT] ~ { ENABLE UNWIND => NULL; [index, bufferIndex, ackIndex] ¬ NS.GetIndexDetails[data.underStream]; index ¬ index - data.hdrBytesProcessed; IF output THEN index ¬ index + data.outBuf.length; <> bufferIndex ¬ bufferIndex - data.hdrBytesProcessed; IF ackIndex # (-1) THEN ackIndex ¬ ackIndex - data.hdrBytesProcessed; }; MyInGetIndexDetails: NS.GetIndexDetailsProc -- [stream] RETURNS [index, bufferIndex, ackIndex] -- ~ { data: PrivateStreamData ¬ PrivateStreamDataFromStream[stream]; [index, bufferIndex, ackIndex] ¬ MyEntryGetIndexDetails[data, stream, FALSE]; }; MyOutGetIndexDetails: NS.GetIndexDetailsProc -- [stream] RETURNS [index, bufferIndex, ackIndex] -- ~ { data: PrivateStreamData ¬ PrivateStreamDataFromStream[stream]; [index, bufferIndex, ackIndex] ¬ MyEntryGetIndexDetails[data, stream, TRUE]; }; MyEntryGetStreamState: ENTRY PROC [data: PrivateStreamData, in: STREAM, reset: BOOL] RETURNS [streamState: NS.StreamState, subStreamType: NS.SubStreamType, attentionType: NS.AttentionType] ~ { ENABLE UNWIND => NULL; subStreamType ¬ data.sst; attentionType ¬ 0; DO IF data.inHdr.sst.lo # data.sst THEN { IF reset THEN data.sst ¬ data.inHdr.sst.lo; streamState ¬ subStreamTypeChange; subStreamType ¬ data.inHdr.sst.lo; RETURN; }; IF data.inBytesLeft > 0 THEN { streamState ¬ open; RETURN; }; IF data.inHdr.eom # [0, 0] THEN { IF reset THEN data.inHdr.eom ¬ [0, 0]; streamState ¬ endOfMessage; RETURN; }; IF IO.EndOf[data.underStream] THEN { streamState ¬ remoteClosed; RETURN; }; IF (IO.CharsAvail[data.underStream, FALSE] < BYTES[LayeredHdrObject]) THEN { streamState ¬ open; RETURN; }; ReadHeader[data, in]; ENDLOOP; }; MyGetStreamState: NS.GetStreamStateProc -- [in, reset] RETURNS [streamState, subStreamType, attentionType] -- ~ { data: PrivateStreamData ¬ PrivateStreamDataFromStream[in]; [streamState, subStreamType, attentionType] ¬ MyEntryGetStreamState[data, in, reset]; }; MyEntrySetSubStreamType: ENTRY PROC [data: PrivateStreamData, out: STREAM, subStreamType: NS.SubStreamType] ~ { ENABLE UNWIND => NULL; IF subStreamType # data.sst THEN { Push[data~data, self~out, sendIfNoData~TRUE, sendEOM~FALSE]; data.sst ¬ subStreamType; NS.SendSoon[data.underStream, 0]; }; }; MySetSubStreamType: NS.SetSubStreamTypeProc -- [out, subStreamType] -- ~ { data: PrivateStreamData ¬ PrivateStreamDataFromStream[out]; IF subStreamType # data.sst THEN MyEntrySetSubStreamType[data, out, subStreamType]; }; MyEntrySendEndOfMessage: ENTRY PROC [data: PrivateStreamData, out: STREAM] ~ { ENABLE UNWIND => NULL; Push[data~data, self~out, sendIfNoData~TRUE, sendEOM~TRUE]; NS.SendSoon[data.underStream, 0]; }; MySendEndOfMessage: NS.SendEndOfMessageProc -- [out] -- ~ { data: PrivateStreamData ¬ PrivateStreamDataFromStream[out]; MyEntrySendEndOfMessage[data, out]; }; <> myListenerProcs: NS.ListenerProcs ¬ NEW[NS.ListenerProcsRecord ¬ [ getListenerInfo~NIL, destroyListener~MyDestroyListener ]]; MyDestroyListener: NS.DestroyListenerProc -- [listener] -- ~ { underListener: NS.Listener ¬ NARROW[listener.data]; NS.DestroyListener[underListener]; }; UnderListenerWorkerClientData: TYPE ~ REF UnderListenerWorkerClientDataObject; UnderListenerWorkerClientDataObject: TYPE ~ RECORD [ overListenerHandle: Finalize.Handle ]; MyUnderListenerWorkerProc: NS.ListenerWorkerProc -- [listener, in, out] -- ~ { underListenerWorkerClientData: UnderListenerWorkerClientData; overListener: NS.Listener; newIn, newOut: STREAM; underListenerWorkerClientData ¬ NARROW[listener.listenerWorkerClientData]; overListener ¬ NARROW[ Finalize.HandleToObject[underListenerWorkerClientData.overListenerHandle] ]; WHILE overListener.data = NIL DO Process.PauseMsec[50] ENDLOOP; -- race with CreateListener IF overListener.listenerWorkerProc = NIL THEN RETURN; [newIn, newOut] ¬ StreamPairFromUnderStreamPair[overListener.registration, in, out]; overListener.listenerWorkerProc[overListener, newIn, newOut]; }; dummyListenerFQ: Finalize.FinalizationQueue ¬ Finalize.NewFQ[]; MyRegisteredCreateListenerProc: NS.RegisteredCreateListenerProc -- [registration, local, transportParameters, listenerWorkerProc, listenerWorkerClientData] RETURNS [listener] -- ~ { underListener: NS.Listener ¬ NIL; disguisedListener: UnderListenerWorkerClientData; listener ¬ NS.CreateUninitializedListener[registration]; disguisedListener ¬ NEW[UnderListenerWorkerClientDataObject ¬ [NIL] ]; disguisedListener.overListenerHandle ¬ Finalize.EnableFinalization[listener, dummyListenerFQ]; [] ¬ Finalize.DisableFinalization[disguisedListener.overListenerHandle]; underListener ¬ NS.CreateListener[registration.protocolFamily, local, transportClassUnderMe, NIL, MyUnderListenerWorkerProc, disguisedListener]; listener.local ¬ NS.GetListenerInfo[underListener].local; listener.listenerWorkerProc ¬ listenerWorkerProc; listener.listenerWorkerClientData ¬ listenerWorkerClientData; listener.data ¬ underListener; }; <> <> RegisterSelf: PROC ~ { thisRegistration: NS.Registration; MyRegistrationCallbackProc: NS.RegistrationCallbackProc -- [old] RETURNS [new] -- ~ { RETURN[thisRegistration] }; FOR each: LIST OF ATOM ¬ protocolFamiliesUnderMe, each.rest WHILE each # NIL DO thisRegistration ¬ NEW[NS.RegistrationRecord ¬ [ protocolFamily~each.first, transportClass~myTransportClass, createStreams~MyRegisteredCreateStreamsProc, inStreamProcs~myInStreamProcs, inNetworkStreamProcs~myInNetworkStreamProcs, outStreamProcs~myOutStreamProcs, outNetworkStreamProcs~myOutNetworkStreamProcs, createListener~MyRegisteredCreateListenerProc, listenerProcs~myListenerProcs, clientData~NIL ] ]; NS.Register[each.first, myTransportClass, MyRegistrationCallbackProc]; ENDLOOP; }; RegisterSelf[]; }.