DIRECTORY
  CommTimer,
  Convert,
  -- (Convert: debug only)
  CStrings,
  FinalizeOps,
  IO,
  NetworkStream,
  Process,
  RefText,
  -- (RefText: debug only)
  Rope,
  UXStrings
  -- (UXStrings: debug only)
  ;

-- NetworkStreamImpl: monitor module exporting the NetworkStream interface.
-- NOTE: every end-of-line comment is kept on a line of its own; when this text is
-- collapsed onto a single line an open "--" comment swallows the code that follows it.
NetworkStreamImpl: CEDAR MONITOR
  IMPORTS CommTimer, Convert, FinalizeOps, IO, NetworkStream, Process, RefText, UXStrings
  EXPORTS NetworkStream
  ~ {

-- Types

ROPE: TYPE ~ Rope.ROPE;
STREAM: TYPE ~ IO.STREAM;

-- Parameters
-- Both values are passed to CommTimer.CreateTimer by StartCodeForSendSoon.

sendSoonGrainSizeMsec: INT ¬ 50;
sendSoonExpectedWaitMsec: INT ¬ 2000;

-- Public Errors

Error: PUBLIC ERROR [which: REF, codes: LIST OF ATOM, msg: ROPE] ~ CODE;
Timeout: PUBLIC SIGNAL [which: REF, codes: LIST OF ATOM, msg: ROPE] ¬ CODE;

-- Error Reporting Implementation
-- Each stream's NetworkStreamData carries a list (pendingErrors) of ErrorDetails records,
-- one per process that currently has an error in flight on that stream.

-- Self: the current process, LOOPHOLEd to PROCESS so it can be stored and compared.
Self: PROC [] RETURNS [PROCESS] ~ TRUSTED INLINE {
  RETURN [ LOOPHOLE[ Process.GetCurrent[] ] ] };

ErrorDetails: TYPE ~ REF ErrorDetailsRecord;
ErrorDetailsRecord: TYPE ~ RECORD [
  next: ErrorDetails,
  process: PROCESS,
  codes: LIST OF ATOM,
  msg: ROPE
  ];

-- Stand-in stream data used by GetIOErrorDetails / RaiseIOError when the stream is NIL.
nilStreamData: NetworkStream.NetworkStreamData ¬ NEW[ NetworkStream.NetworkStreamDataRecord ¬ [
  registration~NIL, procs~NIL ] ];

-- AddErrorDetails: push a record {current process, codes, msg} on the front of d.pendingErrors.
-- Does nothing when d is NIL.
AddErrorDetails: ENTRY PROC [d: NetworkStream.NetworkStreamData, codes: LIST OF ATOM, msg: ROPE] ~ {
  -- Release the monitor lock if NARROW or NEW faults (matches the other entry procs in this module).
  ENABLE UNWIND => NULL;
  IF d = NIL THEN RETURN;
  d.pendingErrors ¬ NEW[ErrorDetailsRecord ¬ [
    next~NARROW[d.pendingErrors],
    process~Self[],
    codes~codes,
    msg~msg ]];
  };

-- DeleteErrorDetails: unlink the first record on d.pendingErrors that belongs to the
-- current process.  Does nothing when d is NIL or no such record exists.
DeleteErrorDetails: ENTRY PROC [d: NetworkStream.NetworkStreamData] ~ {
  ENABLE UNWIND => NULL;
  p, prev: ErrorDetails ¬ NIL;
  self: PROCESS;
  IF d = NIL THEN RETURN;
  self ¬ Self[];
  p ¬ NARROW[d.pendingErrors];
  DO
    IF p = NIL THEN RETURN;
    IF p.process = self THEN EXIT;
    prev ¬ p;
    p ¬ p.next;
    ENDLOOP;
  IF prev = NIL THEN d.pendingErrors ¬ p.next ELSE prev.next ¬ p.next;
  -- Clearing the link of the removed record helps GC.
  p.next ¬ NIL;
  };

-- LookupErrorDetails: return the codes and msg recorded for the current process on d,
-- or [NIL, NIL] when d is NIL or nothing is recorded.
LookupErrorDetails: ENTRY PROC [d: NetworkStream.NetworkStreamData] RETURNS [codes: LIST OF ATOM, msg: ROPE] ~ {
  -- The explicit ERROR below would otherwise propagate with the monitor still locked.
  ENABLE UNWIND => NULL;
  self: PROCESS;
  p: ErrorDetails;
  -- Sanity check kept from the original: two successive GetCurrent results must agree.
  IF Process.GetCurrent[] # Process.GetCurrent[] THEN ERROR;
  IF d = NIL THEN RETURN [codes~NIL, msg~NIL];
  self ¬ Self[];
  p ¬ NARROW[d.pendingErrors];
  DO
    IF p = NIL THEN RETURN [codes~NIL, msg~NIL];
    IF p.process = self THEN RETURN [codes~p.codes, msg~p.msg];
    p ¬ p.next;
    ENDLOOP;
  };

-- GetIOErrorDetails: return the error details recorded for the calling process on stream
-- `which` (or on nilStreamData when `which` is NIL).
GetIOErrorDetails: PUBLIC PROC [which: IO.STREAM] RETURNS [codes: LIST OF ATOM, msg: ROPE] ~ {
  d: NetworkStream.NetworkStreamData;
  d ¬ IF which # NIL THEN NARROW[which.streamData] ELSE nilStreamData;
  [codes, msg] ¬ LookupErrorDetails[d];
  };

-- RaiseIOError: record [codes, msg] for the calling process, then raise the error selected
-- by ec and codes.first:
--   Failure, $timeout            => SIGNAL Timeout if the stream asked for it, else IO.Error
--   Failure, $endOfStream        => IO.EndOfStream
--   Failure, $networkStreamError => Error (with codes.rest)
--   anything else                => IO.Error[ec, stream]
-- The recorded details are removed when the raise unwinds, or when Timeout is resumed.
RaiseIOError: PUBLIC PROC [ec: IO.ErrorCode, stream: STREAM, codes: LIST OF ATOM, msg: ROPE] ~ {
  d: NetworkStream.NetworkStreamData;
  d ¬ IF stream # NIL THEN NARROW[stream.streamData] ELSE nilStreamData;
  {
    ENABLE UNWIND => { DeleteErrorDetails[d] };
    AddErrorDetails[d, codes, msg];
    SELECT ec FROM
      Failure => {
        IF codes = NIL THEN ERROR IO.Error[ec, stream];
        SELECT codes.first FROM
          $timeout => {
            -- d is NIL when the stream carries no stream data; the error-detail procs
            -- tolerate that, so do not dereference it here either.
            IF (d = NIL) OR (NOT d.signalTimeout) THEN ERROR IO.Error[ec, stream];
            SIGNAL Timeout[stream, codes, msg] };
          $endOfStream => {
            ERROR IO.EndOfStream[stream];
            };
          $networkStreamError => {
            ERROR Error[stream, codes.rest, msg];
            };
          ENDCASE => {
            ERROR IO.Error[ec, stream];
            };
        };
      ENDCASE => {
        ERROR IO.Error[ec, stream];
        };
    };
  -- Reached only when a Timeout signal was resumed.
  DeleteErrorDetails[d];
  };

-- Registration

registrations: LIST OF NetworkStream.Registration ¬ NIL;

-- Register: add, replace or remove the registration for [protocolFamily, transportClass].
-- proc is called to obtain the new registration; a NIL result removes an existing entry.
-- NOTE(review): only the lookup runs under the monitor lock; the updates to
-- `registrations` below happen outside it -- confirm callers serialize registration.
-- NOTE(review): proc is passed NIL even when an old registration exists, yet the result
-- is compared with the old one -- confirm against RegistrationCallbackProc's contract.
Register: PUBLIC PROC [ protocolFamily: ATOM, transportClass: ATOM, proc: NetworkStream.RegistrationCallbackProc ] ~ {
  prev: LIST OF NetworkStream.Registration;
  each: LIST OF NetworkStream.Registration;
  rNew: NetworkStream.Registration;
  -- Leaves `each` at the matching list cell (NIL if none) and `prev` at its predecessor.
  LookupForRegister: ENTRY PROC ~ {
    ENABLE UNWIND => NULL;
    prev ¬ NIL;
    each ¬ registrations;
    DO
      IF each = NIL THEN EXIT;
      IF (each.first.protocolFamily = protocolFamily) AND (each.first.transportClass = transportClass) THEN EXIT;
      prev ¬ each;
      each ¬ each.rest;
      ENDLOOP;
    };
  IF protocolFamily = NIL THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $protocolFamily], "missing protocol family"];
  IF transportClass = NIL THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $transportClass], "missing transport class"];
  LookupForRegister[];
  SELECT TRUE FROM
    (each = NIL ) => {
      -- No existing entry: prepend the completed registration, if one was supplied.
      rNew ¬ proc[NIL];
      IF rNew # NIL THEN
        registrations ¬ CONS[ CompleteRegistration[protocolFamily, transportClass, rNew], registrations];
      };
    (each.first.protocolFamily = protocolFamily) AND (each.first.transportClass = transportClass) => {
      rNew ¬ proc[NIL];
      SELECT TRUE FROM
        rNew = each.first => NULL;
        rNew # NIL => each.first ¬ CompleteRegistration[protocolFamily, transportClass, rNew];
        prev = NIL => registrations ¬ each.rest;
        ENDCASE => prev.rest ¬ each.rest;
      };
    ENDCASE => ERROR;
  };

-- CompleteRegistration: validate r against [family, class], raise (via RaiseIOError) for
-- missing mandatory procs, fill every optional NIL proc with this module's default, and
-- return r.  The proc records referenced by r are updated in place.
CompleteRegistration: PROC [family: ATOM, class: ATOM, r: NetworkStream.Registration] RETURNS [NetworkStream.Registration] ~ {
  insp: NetworkStream.NetworkStreamProcs ¬ r.inNetworkStreamProcs;
  onsp: NetworkStream.NetworkStreamProcs ¬ r.outNetworkStreamProcs;
  lp: NetworkStream.ListenerProcs ¬ r.listenerProcs;

  -- Validation.
  IF r.protocolFamily # family THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $protocolFamily], "bad protocol family"];
  IF r.transportClass # class THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $transportClass], "bad transport class"];
  IF (r.inStreamProcs = NIL) OR (insp = NIL) THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $inputProcs], "bad input procs"];
  IF (r.outStreamProcs = NIL) OR (onsp = NIL) THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $outputProcs], "bad output procs"];
  IF (r.createListener # NIL) AND (lp = NIL) THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $listenerProcs], "bad listener procs"];

  IF (r.createStreams = NIL) AND (r.createListener = NIL) THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $create], "no create proc for streams or listener"];

  IF r.createStreams = NIL THEN r.createStreams ¬ DefaultRegisteredCreateStreamsProc;

  -- Input-stream procs.
  IF insp.getStreamInfo = NIL THEN insp.getStreamInfo ¬ DefaultGetStreamInfoProc;
  IF insp.getTimeout = NIL THEN insp.getTimeout ¬ DefaultGetTimeoutProc;
  IF insp.setTimeout = NIL THEN insp.setTimeout ¬ DefaultSetTimeoutProc;
  IF insp.sendSoon = NIL THEN insp.sendSoon ¬ InputSendSoonProc;
  IF insp.getIndexDetails = NIL THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $getIndexDetails], "no input GetIndexDetails proc"];
  IF insp.getStreamState = NIL THEN insp.getStreamState ¬ DefaultGetStreamStateProc;
  IF insp.setSubStreamType = NIL THEN insp.setSubStreamType ¬ InputSetSubStreamTypeProc;
  IF insp.sendEndOfMessage = NIL THEN insp.sendEndOfMessage ¬ InputSendEndOfMessageProc;
  IF insp.waitAttention = NIL THEN insp.waitAttention ¬ DefaultWaitAttentionProc;
  IF insp.sendAttention = NIL THEN insp.sendAttention ¬ InputSendAttentionProc;
  IF insp.waitAck = NIL THEN insp.waitAck ¬ InputWaitAckProc;
  IF insp.finalizeStream = NIL THEN insp.finalizeStream ¬ DefaultFinalizeStreamProc;

  -- Output-stream procs.
  IF onsp.getStreamInfo = NIL THEN onsp.getStreamInfo ¬ DefaultGetStreamInfoProc;
  IF onsp.getTimeout = NIL THEN onsp.getTimeout ¬ DefaultGetTimeoutProc;
  IF onsp.setTimeout = NIL THEN onsp.setTimeout ¬ DefaultSetTimeoutProc;
  IF onsp.sendSoon = NIL THEN onsp.sendSoon ¬ DefaultSendSoonProc;
  IF onsp.getIndexDetails = NIL THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $getIndexDetails], "no output GetIndexDetails proc"];
  IF onsp.getStreamState = NIL THEN onsp.getStreamState ¬ OutputGetStreamStateProc;
  IF onsp.setSubStreamType = NIL THEN onsp.setSubStreamType ¬ DefaultSetSubStreamTypeProc;
  IF onsp.sendEndOfMessage = NIL THEN onsp.sendEndOfMessage ¬ DefaultSendEndOfMessageProc;
  IF onsp.waitAttention = NIL THEN onsp.waitAttention ¬ OutputWaitAttentionProc;
  IF onsp.sendAttention = NIL THEN onsp.sendAttention ¬ DefaultSendAttentionProc;
  IF onsp.waitAck = NIL THEN onsp.waitAck ¬ DefaultWaitAckProc;
  IF onsp.finalizeStream = NIL THEN onsp.finalizeStream ¬ DefaultFinalizeStreamProc;

  IF r.createListener = NIL THEN r.createListener ¬ DefaultRegisteredCreateListenerProc;

  -- Listener procs (only when the registration supplies a listener proc record).
  IF lp # NIL THEN {
    IF lp.getListenerInfo = NIL THEN lp.getListenerInfo ¬ DefaultGetListenerInfoProc;
    IF lp.destroyListener = NIL THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $destroyListener], "bad destroyListener proc"];
    IF lp.finalizeListener = NIL THEN lp.finalizeListener ¬ DefaultFinalizeListenerProc;
    };

  RETURN [r];
  };

-- Enumerate: call proc for each registration matching families / classes (NIL matches
-- anything); stop as soon as proc returns FALSE.
-- NOTE(review): proc runs with the monitor lock held, so it must not call back into the
-- entry procedures of this module.
Enumerate: PUBLIC ENTRY PROC [ families: ATOM, classes: ATOM, proc: NetworkStream.EnumerateCallbackProc ] ~ {
  -- The client callback may raise; make sure the monitor lock is released on unwind.
  ENABLE UNWIND => NULL;
  FOR each: LIST OF NetworkStream.Registration ¬ registrations, each.rest WHILE each # NIL DO
    IF (families # NIL) AND (families # each.first.protocolFamily) THEN LOOP;
    IF (classes # NIL) AND (classes # each.first.transportClass) THEN LOOP;
    IF NOT proc[each.first.protocolFamily, each.first.transportClass] THEN EXIT;
    ENDLOOP;
  };

-- LookupRegistration: first registration matching both arguments (NIL matches anything),
-- or NIL when there is none.
LookupRegistration: ENTRY PROC [protocolFamily: ATOM, transportClass: ATOM] RETURNS [r: NetworkStream.Registration] ~ {
  FOR each: LIST OF NetworkStream.Registration ¬ registrations, each.rest WHILE each # NIL DO
    IF (protocolFamily # NIL) AND (protocolFamily # each.first.protocolFamily) THEN LOOP;
    IF (transportClass # NIL) AND (transportClass # each.first.transportClass) THEN LOOP;
    RETURN [each.first];
    ENDLOOP;
  RETURN [NIL];
  };

-- Listener Creation

-- CreateUninitializedListener: a fresh listener record bound to registration and its listenerProcs.
CreateUninitializedListener: PUBLIC PROC [registration: NetworkStream.Registration] RETURNS [listener: NetworkStream.Listener] ~ {
  listener ¬ NEW[NetworkStream.ListenerRecord ¬ [
    registration~registration,
    procs~registration.listenerProcs
    ]];
  };

-- CreateListener: look up the registration, have it create the listener, and enable
-- finalization of the result on theFQ.  Raises (via RaiseIOError) if nothing is registered.
CreateListener: PUBLIC PROC [protocolFamily: ATOM, local: ROPE, transportClass: ATOM, transportParameters: REF, listenerWorkerProc: NetworkStream.ListenerWorkerProc, listenerWorkerClientData: REF] RETURNS [listener: NetworkStream.Listener] ~ {
  reg: NetworkStream.Registration;
  reg ¬ LookupRegistration[protocolFamily, transportClass];
  IF reg = NIL THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $notRegistered], "specified transport not registered"];
  listener ¬ reg.createListener[reg, local, transportParameters, listenerWorkerProc, listenerWorkerClientData];
  [] ¬ FinalizeOps.EnableFinalization[listener, theFQ];
  };

-- Default createListener: listeners are not supported by this registration.
DefaultRegisteredCreateListenerProc: NetworkStream.RegisteredCreateListenerProc ~ {
  RaiseIOError[Failure, NIL, LIST[$networkStreamError, $InitListener, $notImplemented], "listeners not implemented"];
  };

-- Default getListenerInfo: report the fields stored in the listener and its registration.
DefaultGetListenerInfoProc: NetworkStream.GetListenerInfoProc -- [listener] RETURNS [protocolFamily, local, transportClass, proc, clientData] -- ~ {
  r: NetworkStream.Registration ¬ listener.registration;
  protocolFamily ¬ r.protocolFamily;
  local ¬ listener.local;
  transportClass ¬ r.transportClass;
  proc ¬ listener.listenerWorkerProc;
  clientData ¬ listener.listenerWorkerClientData;
  };

-- Default finalizeListener: destroy the listener, ignoring stream and network errors.
DefaultFinalizeListenerProc: NetworkStream.FinalizeListenerProc ~ {
  ENABLE IO.Error, IO.EndOfStream, Error, Timeout => CONTINUE;
  NetworkStream.DestroyListener[listener];
  };

-- Stream Creation

-- CreateUninitializedStreams: an in/out stream pair, each with its own NetworkStreamData
-- bound to registration and to the matching proc records.
CreateUninitializedStreams: PUBLIC PROC [registration: NetworkStream.Registration] RETURNS [in: IO.STREAM, out: IO.STREAM] ~ {
  inNetworkStreamData: NetworkStream.NetworkStreamData;
  outNetworkStreamData: NetworkStream.NetworkStreamData;
  inNetworkStreamData ¬ NEW[NetworkStream.NetworkStreamDataRecord ¬ [registration~registration, procs~registration.inNetworkStreamProcs]];
  in ¬ IO.CreateStream[streamProcs~registration.inStreamProcs, streamData~inNetworkStreamData];
  outNetworkStreamData ¬ NEW[NetworkStream.NetworkStreamDataRecord ¬ [registration~registration, procs~registration.outNetworkStreamProcs]];
  out ¬ IO.CreateStream[streamProcs~registration.outStreamProcs, streamData~outNetworkStreamData];
  };

-- CreateStreams: look up the registration, have it create the stream pair, and enable
-- finalization of both streams on theFQ.  Raises (via RaiseIOError) if nothing is registered.
CreateStreams: PUBLIC PROC [protocolFamily: ATOM, remote: ROPE, transportClass: ATOM, timeout: NetworkStream.Milliseconds, transportParameters: REF] RETURNS [in: IO.STREAM, out: IO.STREAM] ~ {
  reg: NetworkStream.Registration;
  reg ¬ LookupRegistration[protocolFamily, transportClass];
  IF reg = NIL THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $notRegistered], "specified transport not registered"];
  [in, out] ¬ reg.createStreams[reg, remote, timeout, transportParameters];
  [] ¬ FinalizeOps.EnableFinalization[in, theFQ];
  [] ¬ FinalizeOps.EnableFinalization[out, theFQ];
  };

-- Default createStreams: connecting is not supported by this registration.
DefaultRegisteredCreateStreamsProc: NetworkStream.RegisteredCreateStreamsProc ~ {
  RaiseIOError[Failure, NIL, LIST[$networkStreamError, $CreateStreams, $notImplemented], "connect not implemented"];
  };

-- Default Stream Procs
-- "Default..." procs are installed where the registration leaves a proc NIL; the
-- "Input..." / "Output..." variants reject operations on the wrong direction of stream.

DefaultGetStreamInfoProc: NetworkStream.GetStreamInfoProc ~ {
  d: NetworkStream.NetworkStreamData ¬ NARROW[stream.streamData];
  RETURN [protocolFamily~d.registration.protocolFamily, local~d.local, remote~d.remote, transportClass~d.registration.transportClass]
  };

DefaultGetTimeoutProc: NetworkStream.GetTimeoutProc ~ {
  d: NetworkStream.NetworkStreamData ¬ NARROW[stream.streamData];
  RETURN [timeout~d.timeout, signalTimeout~d.signalTimeout]
  };

-- Only the "wait forever" timeout is accepted by default.
DefaultSetTimeoutProc: NetworkStream.SetTimeoutProc ~ {
  IF timeout # NetworkStream.waitForever THEN
    RaiseIOError[NotImplementedForThisStream, stream, LIST[$notImplemented, $SetTimeout], "SetTimeout not implemented"];
  };

-- Reports an open stream with zero sub-stream and attention types; reset is rejected.
DefaultGetStreamStateProc: NetworkStream.GetStreamStateProc ~ {
  IF reset THEN
    RaiseIOError[NotImplementedForThisStream, in, LIST[$notImplemented, $GetStreamState], "reset stream state not implemented"];
  RETURN [streamState~open, subStreamType~0, attentionType~0];
  };

OutputGetStreamStateProc: NetworkStream.GetStreamStateProc ~ {
  RaiseIOError[NotImplementedForThisStream, in, LIST[$notImplemented, $GetStreamState], "GetStreamState not implemented for output stream"];
  ERROR;
  };

-- Only sub-stream type 0 is accepted by default.
DefaultSetSubStreamTypeProc: NetworkStream.SetSubStreamTypeProc ~ {
  IF subStreamType # 0 THEN
    RaiseIOError[NotImplementedForThisStream, out, LIST[$notImplemented, $SetSubStreamType], "SetSubStreamType not implemented"];
  };

InputSetSubStreamTypeProc: NetworkStream.SetSubStreamTypeProc ~ {
  RaiseIOError[NotImplementedForThisStream, out, LIST[$notImplemented, $SetSubStreamType], "SetSubStreamType not implemented for input stream"];
  };

DefaultSendEndOfMessageProc: NetworkStream.SendEndOfMessageProc ~ {
  RaiseIOError[NotImplementedForThisStream, out, LIST[$notImplemented, $SendEndOfMessage], "SendEndOfMessage not implemented"];
  };

InputSendEndOfMessageProc: NetworkStream.SendEndOfMessageProc ~ {
  RaiseIOError[NotImplementedForThisStream, out, LIST[$notImplemented, $SendEndOfMessage], "SendEndOfMessage not implemented for input stream"];
  };

-- Default waitAttention: wait on a private condition for the requested timeout, then
-- report a timeout through RaiseIOError; loops if the Timeout signal is resumed.
DefaultWaitAttentionProc: NetworkStream.WaitAttentionProc ~ {
  EntryWaitAttention: ENTRY PROC ~ {
    ENABLE UNWIND => NULL;
    attn: CONDITION;
    TRUSTED {
      Process.EnableAborts[@attn];
      IF timeout = NetworkStream.waitForever
        THEN { Process.DisableTimeout[@attn] }
        ELSE { Process.SetTimeout[@attn, Process.MsecToTicks[timeout]] };
      };
    WAIT attn;
    };
  DO
    EntryWaitAttention[];
    RaiseIOError[Failure, in, LIST[$timeout, $waitAttention], "WaitAttention timeout"];
    ENDLOOP;
  };

OutputWaitAttentionProc: NetworkStream.WaitAttentionProc ~ {
  RaiseIOError[NotImplementedForThisStream, in, LIST[$notImplemented, $WaitAttention], "WaitAttention not implemented for output stream"];
  RETURN [0]
  };

DefaultSendAttentionProc: NetworkStream.SendAttentionProc ~ {
  RaiseIOError[NotImplementedForThisStream, out, LIST[$notImplemented, $SendAttention], "SendAttention not implemented"];
  };

InputSendAttentionProc: NetworkStream.SendAttentionProc ~ {
  RaiseIOError[NotImplementedForThisStream, out, LIST[$notImplemented, $SendAttention], "SendAttention not implemented for input stream"];
  };

InputWaitAckProc: NetworkStream.WaitAckProc ~ {
  RaiseIOError[NotImplementedForThisStream, out, LIST[$notImplemented, $WaitAck], "WaitAck not implemented for input stream"];
  };

-- Default waitAck: succeeds immediately when nothing needs acknowledging (index = 0, or
-- the acknowledged index has already reached the target); a negative index means
-- "the current bufferIndex".  Actually waiting is not implemented.
DefaultWaitAckProc: NetworkStream.WaitAckProc ~ {
  ackIndex, bufferIndex: INT;
  IF index = 0 THEN RETURN;
  [ackIndex~ackIndex, bufferIndex~bufferIndex] ¬ NetworkStream.GetIndexDetails[out];
  IF index < 0 THEN index ¬ bufferIndex;
  IF ackIndex >= index THEN RETURN;
  RaiseIOError[NotImplementedForThisStream, out, LIST[$notImplemented, $WaitAck], "WaitAck not implemented"];
  };

-- Default finalizeStream: close the stream, ignoring stream and network errors.
DefaultFinalizeStreamProc: NetworkStream.FinalizeStreamProc ~ {
  ENABLE IO.Error, IO.EndOfStream, Error, Timeout => CONTINUE;
  IO.Close[stream];
  };

-- SendSoon Implementation
-- Invariant (from the original file's notes):
--   d.sendSoonIndex >= 0 ==> eventually an IO.Flush will be performed to flush beyond that index value.
--   d.sendSoonIndex < 0 ==> stream is not on sendSoonList.
-- THIS SHOULD BE FIXED TO HAVE A SEPARATE PROCESS PER STREAM - ajd.

sendSoonCommTimer: CommTimer.Timer;
-- Tail of a circular list of streams linked through sendSoonNext; NIL when the list is empty.
sendSoonTail: STREAM ¬ NIL;
sendSoonListNotEmpty: CONDITION;

-- GetFromSendSoonList: block until the send-soon list is non-empty, then remove its head.
-- Returns the stream and the sendSoonIndex that was pending for it, and marks the stream
-- as no longer queued (sendSoonIndex = -1, sendSoonNext = NIL).
GetFromSendSoonList: ENTRY PROC RETURNS [s: STREAM, sendSoonIndex: INT] ~ {
  ENABLE UNWIND => NULL;
  tailData, sData: NetworkStream.NetworkStreamData;
  WHILE sendSoonTail = NIL DO WAIT sendSoonListNotEmpty ENDLOOP;
  tailData ¬ NARROW[sendSoonTail.streamData];
  -- The list is circular, so the element after the tail is the head.
  s ¬ tailData.sendSoonNext;
  sData ¬ NARROW[s.streamData];
  IF s = sendSoonTail
    THEN sendSoonTail ¬ NIL
    ELSE tailData.sendSoonNext ¬ sData.sendSoonNext;
  sendSoonIndex ¬ sData.sendSoonIndex;
  sData.sendSoonIndex ¬ -1;
  sData.sendSoonNext ¬ NIL;
  };

-- AddToSendSoonList: append s at the tail of the circular send-soon list (no-op if it is
-- already linked) and wake the daemon.
AddToSendSoonList: ENTRY PROC [s: STREAM] ~ {
  ENABLE UNWIND => NULL;
  sData, tailData: NetworkStream.NetworkStreamData;
  sData ¬ NARROW[s.streamData];
  IF sData.sendSoonNext # NIL THEN RETURN;
  IF sendSoonTail = NIL
    THEN {
      -- Single element: it is its own successor.
      sendSoonTail ¬ sData.sendSoonNext ¬ s;
      }
    ELSE {
      tailData ¬ NARROW[sendSoonTail.streamData];
      sData.sendSoonNext ¬ tailData.sendSoonNext;
      sendSoonTail ¬ tailData.sendSoonNext ¬ s;
      };
  NOTIFY sendSoonListNotEmpty;
  };

-- Timer callback: queue the stream passed as clientData for flushing by the daemon.
SendSoonEventProc: CommTimer.EventProc -- [clientData] -- ~ {
  AddToSendSoonList[NARROW[clientData]];
  };

-- SendSoonDaemon: forever take streams off the send-soon list and flush those whose
-- bufferIndex is still below the recorded sendSoonIndex.  Errors from one stream are
-- swallowed so the daemon keeps serving the others.
SendSoonDaemon: PROC ~ {
  s: STREAM;
  sendSoonIndex: INT;
  DO
    [s, sendSoonIndex] ¬ GetFromSendSoonList[];
    {
      ENABLE IO.Error, Error, Timeout, ABORTED => CONTINUE;
      IF NetworkStream.GetIndexDetails[s].bufferIndex < sendSoonIndex THEN IO.Flush[s];
      };
    ENDLOOP;
  };

-- Default sendSoon for output streams: when = 0 flushes immediately; otherwise record the
-- current index and, if no flush was already pending (old index < 0), schedule a timer
-- event that will queue the stream for the daemon.
DefaultSendSoonProc: NetworkStream.SendSoonProc ~ {
  d: NetworkStream.NetworkStreamData;
  oldIndex: INT ¬ 0;
  EntrySwapIndex: ENTRY PROC ~ {
    -- IO.GetIndex may raise; release the monitor lock if it does.
    ENABLE UNWIND => NULL;
    oldIndex ¬ d.sendSoonIndex;
    d.sendSoonIndex ¬ IO.GetIndex[out];
    };
  IF when = 0 THEN { IO.Flush[out]; RETURN };
  d ¬ NARROW[out.streamData];
  EntrySwapIndex[];
  IF oldIndex < 0 THEN [] ¬ CommTimer.ScheduleEvent[sendSoonCommTimer, when, SendSoonEventProc, out];
  };

InputSendSoonProc: NetworkStream.SendSoonProc ~ {
  RaiseIOError[NotImplementedForThisStream, out, LIST[$notImplemented, $SendSoon], "SendSoon not implemented for input stream"];
  };

-- Create the send-soon timer and start the (detached) daemon process.
StartCodeForSendSoon: PROC ~ {
  sendSoonCommTimer ¬ CommTimer.CreateTimer[sendSoonGrainSizeMsec, sendSoonExpectedWaitMsec];
  TRUSTED { Process.Detach[ FORK SendSoonDaemon[] ] };
  };

-- FinalizeOps

theFQ: FinalizeOps.CallQueue ¬ FinalizeOps.CreateCallQueue[Finalizer];

-- Finalizer: dispatch a finalized object to the finalize proc of its stream or listener.
Finalizer: FinalizeOps.FinalizeProc ~ {
  ENABLE ABORTED => CONTINUE;
  WITH object SELECT FROM
    s: IO.STREAM => {
      d: NetworkStream.NetworkStreamData ¬ NARROW[s.streamData];
      d.procs.finalizeStream[s];
      };
    l: NetworkStream.Listener => {
      l.procs.finalizeListener[l];
      };
    ENDCASE => ERROR;
  };

-- Debugging Stuff

debugMsgs: BOOL ¬ FALSE;

-- SetDebugMsgs: turn debug messages on (i # 0) or off; returns the previous setting as 1/0.
SetDebugMsgs: PROC [i: INT] RETURNS [prev: INT] ~ {
  prev ¬ IF debugMsgs THEN 1 ELSE 0;
  debugMsgs ¬ (i # 0);
  };

-- Direct call on the C-level "XR_PrintF".
DBMsgInner: PROC [fmt, s: CStrings.CString] ~ TRUSTED MACHINE CODE { "XR_PrintF" };

debugFmt: CStrings.CString ¬ UXStrings.Create["%s"];

-- DBMsg: print rope r through XR_PrintF using the "%s" format.
DBMsg: PROC [r: ROPE] ~ {
  s: CStrings.CString ¬ UXStrings.Create[r];
  DBMsgInner[debugFmt, s];
  };

-- DBPutErr: print m1, the list of error codes, and the message m2 on one line.
DBPutErr: PROC [m1: ROPE, codes: LIST OF ATOM, m2: ROPE] ~ {
  DBMsg[m1];
  DBMsg[" codes: "];
  FOR each: LIST OF ATOM ¬ codes, each.rest WHILE each # NIL DO
    DBMsg[Convert.RopeFromAtom[each.first]];
    DBMsg[" "];
    ENDLOOP;
  DBMsg[", msg: "];
  DBMsg[m2];
  DBMsg["\n"];
  };

DBAtomFromString: PROC [s: CStrings.CString] RETURNS [ATOM] ~ {
  r: ROPE ¬ UXStrings.ToRope[s, 100];
  RETURN [Convert.AtomFromRope[r]];
  };

-- DBEchoWorker: echo every character read from `in` to `out` (flushing each one) until
-- end of stream, then close both streams.  Both are also closed on unwind.
DBEchoWorker: NetworkStream.ListenerWorkerProc -- [listener, in, out] -- ~ {
  ENABLE UNWIND => { IO.Close[in, TRUE]; IO.Close[out, TRUE] };
  c: CHAR;
  DO
    c ¬ IO.GetChar[in ! IO.EndOfStream => EXIT];
    IO.PutChar[out, c];
    NetworkStream.SendSoon[out, 0];
    ENDLOOP;
  IO.Close[in, TRUE];
  IO.Close[out, TRUE];
  };

DBStartEchoer: PROC [pf: ATOM, local: ROPE, tc: ATOM] RETURNS [listener: NetworkStream.Listener] ~ {
  listener ¬ NetworkStream.CreateListener[pf, local, tc, NIL, DBEchoWorker, NIL];
  };

-- DBComplexEchoWorker: like DBEchoWorker, but on end of stream it resets and inspects the
-- stream state (exiting only when the remote end has closed), and on any error it
-- records the error details, optionally prints them, and exits.
DBComplexEchoWorker: NetworkStream.ListenerWorkerProc -- [listener, in, out] -- ~ {
  ENABLE UNWIND => { IO.Close[in, TRUE]; IO.Close[out, TRUE] };
  c: CHAR;
  atEndOfStream: BOOL ¬ FALSE;
  streamState: NetworkStream.StreamState;
  subStreamType: NetworkStream.SubStreamType;
  attentionType: NetworkStream.AttentionType;
  -- errorKind: 0 = none, 1 = NetworkStream.Error, 2 = NetworkStream.Timeout, 3 = IO.Error.
  errorKind: CARD ¬ 0;
  errorCodes: LIST OF ATOM;
  errorMsg: ROPE;
  IF debugMsgs THEN DBMsg["Echoer entered\n"];
  DO
    IF atEndOfStream THEN {
      [streamState, subStreamType, attentionType] ¬ NetworkStream.GetStreamState[in, TRUE];
      IF debugMsgs THEN {
        m: ROPE ¬ IO.PutFR["Echoer atEndOfStream, state %g sst %g attn %g\n", IO.card[ORD[streamState]], IO.card[subStreamType], IO.card[attentionType]];
        DBMsg[m];
        };
      IF streamState = remoteClosed THEN EXIT;
      atEndOfStream ¬ FALSE;
      LOOP;
      };
    IF errorKind # 0 THEN {
      IF debugMsgs THEN {
        DBPutErr[IO.PutFR1["Echoer err %g ", IO.card[errorKind]], errorCodes, errorMsg];
        };
      EXIT;
      };
    {
      -- Each handler records what happened and restarts the loop, where it is acted upon.
      ENABLE {
        NetworkStream.Error => { errorKind ¬ 1; errorCodes ¬ codes; errorMsg ¬ msg; LOOP };
        NetworkStream.Timeout => { errorKind ¬ 2; errorCodes ¬ codes; errorMsg ¬ msg; LOOP };
        IO.Error => { errorKind ¬ 3; [codes~errorCodes, msg~errorMsg] ¬ NetworkStream.GetIOErrorDetails[stream]; LOOP };
        IO.EndOfStream => { atEndOfStream ¬ TRUE; LOOP };
        };
      c ¬ IO.GetChar[in];
      };
    IF debugMsgs THEN {
      m: ROPE ¬ IO.PutFR1["Echoer got %g, replying ...\n", IO.char[c]];
      DBMsg[m];
      };
    IO.PutChar[out, c];
    IF debugMsgs THEN { DBMsg["echoer done putchar ...\n"]; };
    NetworkStream.SendSoon[out, 0];
    IF debugMsgs THEN { DBMsg["echoer done SendSoon.\n"]; };
    ENDLOOP;
  IF debugMsgs THEN { DBMsg["Echoer exit\n"]; };
  IO.Close[in, TRUE];
  IO.Close[out, TRUE];
  };

DBStartComplexEchoer: PROC [pf: ATOM, local: ROPE, tc: ATOM] RETURNS [listener: NetworkStream.Listener] ~ {
  listener ¬ NetworkStream.CreateListener[pf, local, tc, NIL, DBComplexEchoWorker, NIL];
  };

DBListenerInfo: TYPE ~ RECORD [pf: ATOM, local: ROPE, tc: ATOM];

DBGetListenerInfo: PROC [l: NetworkStream.Listener] RETURNS [info: REF DBListenerInfo] ~ {
  info ¬ NEW[DBListenerInfo ¬ [NIL, NIL, NIL]];
  [protocolFamily~info.pf, local~info.local, transportClass~info.tc] ¬ NetworkStream.GetListenerInfo[l];
  };

DBStreamInfo: TYPE ~ RECORD [pf: ATOM, local: ROPE, remote: ROPE, tc: ATOM];

DBGetStreamInfo: PROC [s: IO.STREAM] RETURNS [info: REF DBStreamInfo] ~ {
  info ¬ NEW[DBStreamInfo ¬ [NIL, NIL, NIL, NIL]];
  [protocolFamily~info.pf, local~info.local, remote~info.remote, transportClass~info.tc] ¬ NetworkStream.GetStreamInfo[s];
  };

DBStreamPair: TYPE ~ RECORD [ in, out: IO.STREAM ];

DBCreateStreamPair: PROC [pf: ATOM, remote: ROPE, tc: ATOM, timeoutMsec: CARD32] RETURNS [streamPair: REF DBStreamPair] ~ {
  streamPair ¬ NEW[DBStreamPair ¬ [NIL, NIL]];
  [streamPair.in, streamPair.out] ¬ NetworkStream.CreateStreams[pf, remote, tc, timeoutMsec, NIL];
  };

-- DBCallProtected: run callee; when debug messages are on, print and swallow stream and
-- network errors, otherwise let them propagate (REJECT).
DBCallProtected: PROC [callee: PROC] ~ {
  ENABLE {
    NetworkStream.Error => {
      IF debugMsgs THEN { DBPutErr["NS.Err", codes, msg]; CONTINUE; };
      REJECT;
      };
    NetworkStream.Timeout => {
      IF debugMsgs THEN { DBPutErr["NS.Timeout", codes, msg]; CONTINUE; };
      REJECT;
      };
    IO.Error => {
      IF debugMsgs THEN {
        codes: LIST OF ATOM;
        msg: ROPE;
        [codes, msg] ¬ NetworkStream.GetIOErrorDetails[stream];
        DBPutErr["IO.Err", codes, msg];
        CONTINUE;
        };
      REJECT;
      };
    IO.EndOfStream => {
      IF debugMsgs THEN { DBPutErr["IO.EndOfStream", NIL, NIL]; CONTINUE; };
      REJECT;
      };
    };
  callee[];
  };

DBPutC: PROC [s: IO.STREAM, c: CHAR] ~ {
  DoIt: PROC ~ { IO.PutChar[s, c] };
  DBCallProtected[DoIt];
  };

DBFlushC: PROC [s: IO.STREAM] ~ {
  DoIt: PROC ~ { IO.Flush[s] };
  DBCallProtected[DoIt];
  };

-- DBGetC: read one character; returns '? if the read failed and the error was swallowed.
DBGetC: PROC [s: IO.STREAM] RETURNS[c: CHAR] ~ {
  DoIt: PROC ~ { c ¬ IO.GetChar[s] };
  c ¬ '?;
  DBCallProtected[DoIt];
  };

-- DBGetBlock: read up to n characters from s into a fresh text buffer; when debug
-- messages are on, report a short read.  b stays NIL if nothing could be allocated or
-- the call was cut short by a swallowed error.
DBGetBlock: PROC [s: IO.STREAM, n: INT] RETURNS [b: REF TEXT] ~ {
  DoIt: PROC ~ {
    nGot: INT;
    b ¬ RefText.New[n];
    nGot ¬ IO.GetBlock[s, b, 0, n];
    IF (nGot < n) AND (debugMsgs) THEN {
      DBMsg[IO.PutFR["GetBlock got only %g of %g\n", IO.int[nGot], IO.int[n]]];
      };
    };
  DBCallProtected[DoIt];
  };

-- DBPutBlock: write n copies of character c to s as a single block.
DBPutBlock: PROC [s: IO.STREAM, n: INT, c: CHAR] ~ {
  DoIt: PROC ~ {
    b: REF TEXT ¬ RefText.New[n];
    FOR i: INT IN [0..n) DO b[i] ¬ c; ENDLOOP;
    -- The loop above fills the buffer without setting its length, which the original
    -- left untouched.  IO.PutBlock is believed to clip the transfer at block.length
    -- (TODO confirm against the IO interface), so mark all n characters as valid.
    b.length ¬ n;
    IO.PutBlock[s, b, 0, n];
    };
  DBCallProtected[DoIt];
  };

-- Start

StartCodeForSendSoon[];

}.
   NetworkStreamImpl.mesa Copyright Σ 1989, 1991, 1992 by Xerox Corporation. All rights reserved. Demers, May 7, 1990 9:21 am PDT Last tweaked by Mike Spreitzer on November 9, 1989 4:36:12 pm PST Willie-s, September 30, 1991 1:49 pm PDT Michael Plass, November 22, 1991 4:21 pm PST Christian Jacobi, July 24, 1992 2:05 pm PDT Types Parameters Public Errors Error Reporting Implementation Registration Listener Creation Stream Creation Default Stream Procs DefaultSendSoonProc: NetworkStream.FinalizeListenerProc ~ { (see below) }; InputSendSoonProc: NetworkStream.FinalizeListenerProc ~ { (see below) }; SendSoon Implementation Invariant: d.sendSoonIndex >= 0 ==> eventually an IO.Flush will be performed to flush beyond that index value. d.sendSoonIndex < 0 ==> stream is not on sendSoonList. THIS SHOULD BE FIXED TO HAVE A SEPARATE PROCESS PER STREAM - ajd. 
FinalizeOps Debugging Stuff IF streamState = open THEN Start Κe–(cedarcode) style•NewlineDelimiter ™code™Kšœ Οeœ=™HK™K™AK™(K™,K™+K˜—šΟk ˜ K˜ Kšœ Οc ˜K˜ K˜ Kšžœ˜K˜K˜Kšœ Ÿ ˜Kšœ˜Kšœ Ÿ ˜K˜K˜—šΟnœžœž˜ Kšžœ"žœ,˜WKšžœ˜K˜head™Kšžœžœžœ˜Kšžœžœžœžœ˜—™ Kšœžœ˜ Kšœžœ˜%—™ š œžœžœ žœ žœžœžœžœžœ˜HK˜—š œžœžœ žœ žœžœžœžœžœ˜KK˜——™š  œžœžœžœžœžœ˜2Kšžœžœ˜.K˜—Kšœžœžœ˜,šœžœžœ˜#K˜Kšœ žœ˜Kšœžœžœžœ˜Kšœž˜ K˜K˜—šœ1žœ˜5šœ)˜)Kšœ žœ˜Kšœž˜ K˜—K˜K˜—š œžœžœ-žœžœžœžœ˜dKšžœžœžœžœ˜šœžœ˜,Kšœžœ˜Kšœ˜K˜ K˜K˜—K˜K˜—š œžœžœ)˜GKšœžœ˜Kšœžœ˜Kšžœžœžœžœ˜Kšœ˜Kšœžœ˜šž˜Kšžœžœžœžœ˜Kšžœžœžœ˜K˜Kšžœ˜—Kšžœžœžœžœ˜DKšœ žœŸ ˜K˜K˜—š œžœžœ&žœ žœžœžœžœ˜pKšœžœ˜Kšœ˜Kšžœ-žœžœ˜:Kš žœžœžœžœžœžœ˜,Kšœ˜Kšœžœ˜šž˜Kš žœžœžœžœžœžœ˜,Kšžœžœžœ˜;Kšœ ˜ Kšžœ˜—K˜K˜—š œžœžœ žœžœžœ žœžœžœžœ˜^Kšœ#˜#Kš œžœ žœžœžœžœ˜DKšœ%˜%K˜K˜—š  œžœžœžœžœ žœžœžœžœ˜`Kšœ#˜#Kš œžœ žœžœžœžœ˜F˜Kšžœžœ˜+K˜šžœž˜˜ Kšžœ žœžœ˜/šžœ ž˜šœ ˜ Kš žœžœžœžœžœ˜7Kšžœ˜"K˜—šœ˜Kšžœžœ˜K˜—˜Kšžœ ˜%K˜—šžœ˜ Kšžœžœ˜K˜——K˜—šžœ˜ Kšžœžœ˜K˜——K˜—K˜K˜——™ šœžœžœžœ˜8K˜—š  œžœžœžœžœ0˜rKšœ˜Kšœžœžœ˜)Kšœžœžœ˜)Kšœ!˜!š œžœžœ˜!Kšžœžœžœ˜Kšœžœ˜!šž˜Kšžœžœžœžœ˜Kšžœ.žœ.žœžœ˜kK˜Kšžœ˜—K˜—Kš žœžœžœžœžœN˜ŠKš žœžœžœžœžœN˜ŠKšœ˜šžœžœž˜šœžœ˜Kšœ žœ˜šžœž˜ KšžœžœM˜f—K˜—šœ-žœ2˜bKšœ žœ˜šžœžœž˜Kšœžœ˜KšœžœL˜VKšœžœ˜(Kšžœ˜!—K˜—Kšžœžœ˜—K˜K˜—š  œžœ žœ žœ!žœ˜zKšœ˜Kšœ@˜@KšœA˜Ašœ2˜2K˜—KšžœžœžœžœJ˜‹KšžœžœžœžœJ˜ŠKšžœžœžœ žœžœžœžœB˜‘Kšžœžœžœ žœžœžœžœD˜”KšžœžœžœžœžœžœžœH˜—K˜KšžœžœžœžœžœžœžœU˜±K˜Kšžœžœžœ6˜SK˜Kšžœžœžœ/˜OKšžœžœžœ)˜FKšžœžœžœ)˜FKšžœžœžœ#˜>Kš žœžœžœžœžœU˜—Kšžœžœžœ1˜RKšžœžœžœ3˜VKšžœžœžœ3˜VKšžœžœžœ/˜OKšžœžœžœ-˜MKšžœžœžœ!˜;Kšžœžœžœ1˜RK˜Kšžœžœ/˜OKšžœžœžœ)˜FKšžœžœžœ)˜FKšžœžœžœ%˜@Kš žœžœžœžœžœV˜˜Kšžœžœžœ0˜QKšžœžœžœ5˜XKšžœžœžœ5˜XKšžœžœžœ.˜NKšžœžœžœ/˜OKšžœžœžœ#˜=Kšžœžœžœ1˜RK˜Kšžœžœžœ8˜VK˜šžœžœžœ˜Kšžœžœžœ1˜QKš žœžœžœžœžœP˜Kšžœžœžœ3˜TK˜K˜—Kšžœ˜ K˜K˜—š   œž œžœ žœ žœ.˜jK˜š žœžœžœ7žœžœž˜[Kš žœ žœžœ(žœžœ˜IKš žœ žœžœ'žœžœ˜GKšžœžœ<žœžœ˜LKšžœ˜—K˜K˜—š  œžœžœžœžœžœ$˜wš žœžœžœ7žœžœž˜[Kš žœžœžœ.žœžœ˜UKš žœžœžœ.žœžœ˜UKšžœ˜Kšžœ˜—Kšžœžœ˜ K˜——™š œžœžœ,žœ#˜~Kšœ˜šœ žœ!˜/K˜Kšœ ˜ Kšœ˜—K˜K˜—š œžœžœžœ žœžœžœRžœžœ#˜οKšœ˜Kšœ ˜ Kšœ9˜9Kš 
žœžœžœžœžœM˜~Kšœm˜mK˜5K˜K˜—š #œ0˜SKšœžœT˜sK˜K˜—š Πbnœ$ŸRœ˜”K˜6Kšœ"˜"K˜Kšœ"˜"Kšœ#˜#Kšœ/˜/K˜K˜—š œ(˜CKšžœžœžœ žœ˜˜y—K˜K˜—š œ&˜?Kšžœžœ/žœJ˜ŠKšžœ6˜Kšœ.žœX˜ŠKšžœ˜K˜K˜—š œ(˜CKšžœžœ0žœJ˜—K˜K˜—š œ(˜AKšœ/žœ[˜ŽK˜K˜—š œ(˜CKšœ/žœJ˜}K˜K˜—š œ(˜AKšœ/žœ[˜ŽK˜K˜—š œ%˜=š œžœžœ˜"Kšžœžœžœ˜Kšœž œ˜šžœ˜ Kšœ˜šžœ$˜&Kšžœ"˜&Kšžœ>˜B—Kšœ˜—Kšžœ˜ K˜—šž˜K˜Kšœžœ5˜SKšžœ˜—K˜K˜—š œ%˜Kšœ žœ˜+Kšœ˜Kšœžœ˜šžœ˜Kšžœž˜Kšžœ,˜0—Kšœ$˜$Kšœ˜Kšœžœ˜K˜K˜—š œžœžœžœ˜-Kšžœžœžœ˜Kšœ1˜1Kšœžœ˜Kšžœžœžœžœ˜(šžœž˜šžœ˜Kšœ&˜&Kšœ˜—šžœ˜Kšœ žœ˜+Kšœ+˜+Kšœ)˜)K˜——Kšžœ˜K˜K˜—š œŸœ˜=Kšœžœ˜&Kšœ˜K˜—š œžœ˜Kšœžœ˜ Kšœžœ˜šž˜šœ+˜+Kš œžœžœžœžœ˜6šžœ=˜?Kšžœžœ ˜—Kšœ˜—Kšžœ˜—K˜K˜—š œ ˜3Kšœ#˜#Kšœ žœ˜š œžœžœ˜Kšœ˜Kšœžœ˜#K˜—Kšžœ žœžœ žœ˜+Kšœžœ˜K˜KšžœžœO˜c˜K˜——š œ ˜1Kšœ/žœK˜~K˜K˜—š œžœ˜Kšœ[˜[Kšžœžœ˜4K˜——™ K˜FK˜š  œ˜'Kšžœžœžœ˜šžœžœž˜šœžœžœ˜Kšœ%žœ˜:Kšœ˜K˜—˜Kšœ˜K˜—Kšžœžœ˜—K˜—K˜—™Kšœ žœžœ˜K˜š   œžœžœžœžœ˜3Kšœžœ žœžœ˜"Kšœ˜K˜K˜—Kš   œžœžœžœžœ˜SK˜K˜5š œžœžœ˜K˜*Kšœ˜K˜—K˜š œžœžœ žœžœžœžœ˜