NetworkStreamImpl.mesa
Copyright Ó 1989, 1991, 1992 by Xerox Corporation. All rights reserved.
Demers, May 7, 1990 9:21 am PDT
Last tweaked by Mike Spreitzer on November 9, 1989 4:36:12 pm PST
Willie-s, September 30, 1991 1:49 pm PDT
Michael Plass, November 22, 1991 4:21 pm PST
Christian Jacobi, July 24, 1992 2:05 pm PDT
DIRECTORY
CommTimer,
Convert, -- debug only
CStrings,
FinalizeOps,
IO,
NetworkStream,
Process,
RefText, -- debug only
Rope,
UXStrings -- debug only
;
NetworkStreamImpl: CEDAR MONITOR
IMPORTS CommTimer, Convert, FinalizeOps, IO, NetworkStream, Process, RefText, UXStrings
EXPORTS NetworkStream
~ {
Types
-- Local shorthand for the rope type exported by the Rope interface.
ROPE: TYPE ~ Rope.ROPE;
-- Local shorthand for the stream type exported by the IO interface.
STREAM: TYPE ~ IO.STREAM;
Parameters
-- First argument handed to CommTimer.CreateTimer by StartCodeForSendSoon.
-- NOTE(review): presumably the timer granularity in milliseconds; confirm against CommTimer.
sendSoonGrainSizeMsec: INT ¬ 50;
-- Second argument handed to CommTimer.CreateTimer by StartCodeForSendSoon.
-- NOTE(review): presumably the typical expected delay in milliseconds; confirm against CommTimer.
sendSoonExpectedWaitMsec: INT ¬ 2000;
Public Errors
-- Raised by RaiseIOError when ec is Failure and the first code is $networkStreamError;
-- the codes delivered are the remainder of the list after that first atom.
Error: PUBLIC ERROR [which: REF, codes: LIST OF ATOM, msg: ROPE] ~ CODE;
-- Raised as a SIGNAL by RaiseIOError when ec is Failure, the first code is $timeout,
-- and the signalTimeout flag of the stream data is set.
Timeout: PUBLIC SIGNAL [which: REF, codes: LIST OF ATOM, msg: ROPE] ¬ CODE;
Error Reporting Implementation
-- Returns the identity of the calling process, obtained from Process.GetCurrent and
-- type-punned to PROCESS so it can be stored in and compared against ErrorDetailsRecord.process.
Self: PROC [] RETURNS [PROCESS] ~ TRUSTED INLINE {
RETURN [ LOOPHOLE[ Process.GetCurrent[] ] ] };
-- One node of the singly linked list kept in the pendingErrors field of a stream data record.
-- Each node holds the error details recorded by one process (see AddErrorDetails).
ErrorDetails: TYPE ~ REF ErrorDetailsRecord;
ErrorDetailsRecord: TYPE ~ RECORD [
-- Next node in the list, or NIL at the end.
next: ErrorDetails,
-- The process that recorded these details; used as the lookup key.
process: PROCESS,
-- Error code atoms supplied to RaiseIOError.
codes: LIST OF ATOM,
-- Human-readable message supplied to RaiseIOError.
msg: ROPE
];
-- Stand-in stream data record used by GetIOErrorDetails and RaiseIOError when they are
-- given a NIL stream, so that error details can still be recorded and looked up.
nilStreamData: NetworkStream.NetworkStreamData ¬ NEW[
NetworkStream.NetworkStreamDataRecord ¬ [
registration~NIL,
procs~NIL
]
];
-- Pushes a new details record, tagged with the calling process, onto the front of
-- d.pendingErrors. Does nothing when d is NIL.
AddErrorDetails: ENTRY PROC [d: NetworkStream.NetworkStreamData, codes: LIST OF ATOM, msg: ROPE] ~ {
  IF d # NIL THEN
    d.pendingErrors ¬ NEW[ErrorDetailsRecord ¬ [
      next~NARROW[d.pendingErrors],
      process~Self[],
      codes~codes,
      msg~msg
      ]];
  };
-- Unlinks the first details record on d.pendingErrors that belongs to the calling process.
-- Does nothing when d is NIL or when the process has no record on the list.
DeleteErrorDetails: ENTRY PROC [d: NetworkStream.NetworkStreamData] ~ {
  me: PROCESS;
  before: ErrorDetails ¬ NIL;
  IF d = NIL THEN RETURN;
  me ¬ Self[];
  FOR cur: ErrorDetails ¬ NARROW[d.pendingErrors], cur.next WHILE cur # NIL DO
    IF cur.process = me THEN {
      -- Splice cur out, fixing either the list head or the predecessor link.
      IF before = NIL THEN d.pendingErrors ¬ cur.next ELSE before.next ¬ cur.next;
      cur.next ¬ NIL; -- helps GC
      RETURN;
      };
    before ¬ cur;
    ENDLOOP;
  };
-- Returns the codes and message most recently recorded on d.pendingErrors by the calling
-- process, or [NIL, NIL] when d is NIL or the process has no record on the list.
-- The former self-comparison of Process.GetCurrent[] could never fail and has been removed,
-- so no needless calls are made while the monitor lock is held.
LookupErrorDetails: ENTRY PROC [d: NetworkStream.NetworkStreamData] RETURNS [codes: LIST OF ATOM, msg: ROPE] ~ {
  self: PROCESS;
  IF d = NIL THEN RETURN [codes~NIL, msg~NIL];
  self ¬ Self[];
  -- Records are pushed at the front, so the first match is the newest one for this process.
  FOR p: ErrorDetails ¬ NARROW[d.pendingErrors], p.next WHILE p # NIL DO
    IF p.process = self THEN RETURN [codes~p.codes, msg~p.msg];
    ENDLOOP;
  RETURN [codes~NIL, msg~NIL];
  };
-- Returns the error details that the calling process recorded for stream `which`.
-- A NIL stream is looked up in the shared nilStreamData record.
GetIOErrorDetails: PUBLIC PROC [which: IO.STREAM]
RETURNS [codes: LIST OF ATOM, msg: ROPE] ~ {
  data: NetworkStream.NetworkStreamData ¬ nilStreamData;
  IF which # NIL THEN data ¬ NARROW[which.streamData];
  [codes, msg] ¬ LookupErrorDetails[data];
  };
-- Records (codes, msg) for the calling process on the stream data, then raises the error or
-- signal selected by ec and the first atom of codes:
--   ec = Failure, codes = NIL: IO.Error
--   ec = Failure, $timeout: SIGNAL Timeout if d.signalTimeout, otherwise IO.Error
--   ec = Failure, $endOfStream: IO.EndOfStream
--   ec = Failure, $networkStreamError: Error, carrying codes.rest
--   anything else: IO.Error
-- The recorded details are removed when the raise is unwound, or after a resumed Timeout.
RaiseIOError: PUBLIC PROC [ec: IO.ErrorCode, stream: STREAM, codes: LIST OF ATOM, msg: ROPE] ~ {
d: NetworkStream.NetworkStreamData;
-- A NIL stream uses the shared nilStreamData record.
d ¬ IF stream # NIL THEN NARROW[stream.streamData] ELSE nilStreamData;
{
-- Details stay recorded until the raise is unwound; DBCallProtected, for example, calls
-- GetIOErrorDetails from inside its IO.Error catch phrase.
ENABLE UNWIND => { DeleteErrorDetails[d] };
AddErrorDetails[d, codes, msg];
SELECT ec FROM
Failure => {
IF codes = NIL THEN ERROR IO.Error[ec, stream];
SELECT codes.first FROM
$timeout => {
IF NOT d.signalTimeout THEN ERROR IO.Error[ec, stream];
-- Timeout is a SIGNAL; if it is resumed, control falls through to the cleanup below.
SIGNAL Timeout[stream, codes, msg]
};
$endOfStream => {
ERROR IO.EndOfStream[stream];
};
$networkStreamError => {
-- The leading $networkStreamError atom is stripped before the codes are passed on.
ERROR Error[stream, codes.rest, msg];
};
ENDCASE => {
ERROR IO.Error[ec, stream];
};
};
ENDCASE => {
ERROR IO.Error[ec, stream];
};
};
-- Reached only when a Timeout signal was resumed.
DeleteErrorDetails[d];
};
Registration
-- List of all current registrations; searched by Register, Enumerate and LookupRegistration.
registrations: LIST OF NetworkStream.Registration ¬ NIL;
-- Adds, replaces or removes the registration for (protocolFamily, transportClass).
-- The callback proc supplies the new registration:
--   no existing entry: a non-NIL result is completed and pushed onto the list
--   existing entry: an identical result leaves it alone, another non-NIL result
--     replaces it after completion, and a NIL result removes it from the list
-- A NIL protocolFamily or transportClass is reported through RaiseIOError.
-- NOTE(review): only the lookup runs under the monitor lock; the list updates below do not.
Register: PUBLIC PROC [
protocolFamily: ATOM,
transportClass: ATOM,
proc: NetworkStream.RegistrationCallbackProc ]
~ {
-- prev and each are set by LookupForRegister: each is the matching list cell (NIL if
-- none) and prev is the cell before it (NIL if each is the head).
prev: LIST OF NetworkStream.Registration;
each: LIST OF NetworkStream.Registration;
rNew: NetworkStream.Registration;
LookupForRegister: ENTRY PROC ~ {
ENABLE UNWIND => NULL;
prev ¬ NIL; each ¬ registrations;
DO
IF each = NIL THEN EXIT;
IF (each.first.protocolFamily = protocolFamily)
AND (each.first.transportClass = transportClass) THEN EXIT;
prev ¬ each; each ¬ each.rest;
ENDLOOP;
};
IF protocolFamily = NIL THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $protocolFamily], "missing protocol family"];
IF transportClass = NIL THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $transportClass], "missing transport class"];
LookupForRegister[];
SELECT TRUE FROM
(each = NIL ) => {
rNew ¬ proc[NIL];
IF rNew # NIL
THEN registrations ¬ CONS[
CompleteRegistration[protocolFamily, transportClass, rNew],
registrations];
};
(each.first.protocolFamily = protocolFamily)
AND (each.first.transportClass = transportClass) => {
-- NOTE(review): the callback is given NIL here even though an entry exists; confirm
-- whether each.first was intended as the argument.
rNew ¬ proc[NIL];
SELECT TRUE FROM
rNew = each.first => NULL;
rNew # NIL => each.first ¬
CompleteRegistration[protocolFamily, transportClass, rNew];
-- rNew is NIL in the two remaining arms: unlink the existing cell.
prev = NIL => registrations ¬ each.rest;
ENDCASE => prev.rest ¬ each.rest;
};
ENDCASE => ERROR;
};
-- Validates registration r against the expected family and class, then fills every NIL
-- procedure slot that has a default. The registration and its proc records are modified
-- in place and r itself is returned. Problems are reported through RaiseIOError with a
-- $networkStreamError code list. r is assumed to be non-NIL (both callers check).
CompleteRegistration: PROC [family: ATOM, class: ATOM, r: NetworkStream.Registration]
RETURNS [NetworkStream.Registration]
~ {
insp: NetworkStream.NetworkStreamProcs ¬ r.inNetworkStreamProcs;
onsp: NetworkStream.NetworkStreamProcs ¬ r.outNetworkStreamProcs;
lp: NetworkStream.ListenerProcs ¬ r.listenerProcs;
-- Validation of the mandatory parts.
IF r.protocolFamily # family THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $protocolFamily], "bad protocol family"];
IF r.transportClass # class THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $transportClass], "bad transport class"];
IF (r.inStreamProcs = NIL) OR (insp = NIL) THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $inputProcs], "bad input procs"];
IF (r.outStreamProcs = NIL) OR (onsp = NIL) THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $outputProcs], "bad output procs"];
IF (r.createListener # NIL) AND (lp = NIL) THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $listenerProcs], "bad listener procs"];
IF (r.createStreams = NIL) AND (r.createListener = NIL) THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $create], "no create proc for streams or listener"];
IF r.createStreams = NIL THEN r.createStreams ¬ DefaultRegisteredCreateStreamsProc;
-- Defaults for the input stream; operations that only make sense for output get the
-- Input... variants. getIndexDetails has no default and must be supplied.
IF insp.getStreamInfo = NIL THEN insp.getStreamInfo ¬ DefaultGetStreamInfoProc;
IF insp.getTimeout = NIL THEN insp.getTimeout ¬ DefaultGetTimeoutProc;
IF insp.setTimeout = NIL THEN insp.setTimeout ¬ DefaultSetTimeoutProc;
IF insp.sendSoon = NIL THEN insp.sendSoon ¬ InputSendSoonProc;
IF insp.getIndexDetails = NIL THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $getIndexDetails], "no input GetIndexDetails proc"];
IF insp.getStreamState = NIL THEN insp.getStreamState ¬ DefaultGetStreamStateProc;
IF insp.setSubStreamType = NIL THEN insp.setSubStreamType ¬ InputSetSubStreamTypeProc;
IF insp.sendEndOfMessage = NIL THEN insp.sendEndOfMessage ¬ InputSendEndOfMessageProc;
IF insp.waitAttention = NIL THEN insp.waitAttention ¬ DefaultWaitAttentionProc;
IF insp.sendAttention = NIL THEN insp.sendAttention ¬ InputSendAttentionProc;
IF insp.waitAck = NIL THEN insp.waitAck ¬ InputWaitAckProc;
IF insp.finalizeStream = NIL THEN insp.finalizeStream ¬ DefaultFinalizeStreamProc;
-- Defaults for the output stream; operations that only make sense for input get the
-- Output... variants. getIndexDetails again has no default.
IF onsp.getStreamInfo = NIL THEN onsp.getStreamInfo ¬ DefaultGetStreamInfoProc;
IF onsp.getTimeout = NIL THEN onsp.getTimeout ¬ DefaultGetTimeoutProc;
IF onsp.setTimeout = NIL THEN onsp.setTimeout ¬ DefaultSetTimeoutProc;
IF onsp.sendSoon = NIL THEN onsp.sendSoon ¬ DefaultSendSoonProc;
IF onsp.getIndexDetails = NIL THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $getIndexDetails], "no output GetIndexDetails proc"];
IF onsp.getStreamState = NIL THEN onsp.getStreamState ¬ OutputGetStreamStateProc;
IF onsp.setSubStreamType = NIL THEN onsp.setSubStreamType ¬ DefaultSetSubStreamTypeProc;
IF onsp.sendEndOfMessage = NIL THEN onsp.sendEndOfMessage ¬ DefaultSendEndOfMessageProc;
IF onsp.waitAttention = NIL THEN onsp.waitAttention ¬ OutputWaitAttentionProc;
IF onsp.sendAttention = NIL THEN onsp.sendAttention ¬ DefaultSendAttentionProc;
IF onsp.waitAck = NIL THEN onsp.waitAck ¬ DefaultWaitAckProc;
IF onsp.finalizeStream = NIL THEN onsp.finalizeStream ¬ DefaultFinalizeStreamProc;
-- Listener side: a missing createListener becomes a proc that reports "not implemented";
-- when listener procs are present, destroyListener is mandatory.
IF r.createListener = NIL THEN r.createListener ¬ DefaultRegisteredCreateListenerProc;
IF lp # NIL THEN {
IF lp.getListenerInfo = NIL THEN lp.getListenerInfo ¬ DefaultGetListenerInfoProc;
IF lp.destroyListener = NIL THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $Register, $destroyListener], "bad destroyListener proc"];
IF lp.finalizeListener = NIL THEN lp.finalizeListener ¬ DefaultFinalizeListenerProc;
};
RETURN [r];
};
-- Calls proc with the (protocolFamily, transportClass) of every registration that matches
-- the filters, stopping as soon as proc returns FALSE. A NIL filter matches everything.
-- The callback runs with the monitor lock held, so an error raised by the client must not
-- leave the monitor locked: the UNWIND catch phrase makes sure the lock is released when
-- the stack is unwound through this procedure.
-- NOTE(review): because the lock is held, proc should not call back into entry procedures
-- of this module.
Enumerate: PUBLIC ENTRY PROC [
families: ATOM,
classes: ATOM,
proc: NetworkStream.EnumerateCallbackProc ]
~ {
  ENABLE UNWIND => NULL;
  FOR each: LIST OF NetworkStream.Registration ¬ registrations, each.rest WHILE each # NIL DO
    IF (families # NIL) AND (families # each.first.protocolFamily) THEN LOOP;
    IF (classes # NIL) AND (classes # each.first.transportClass) THEN LOOP;
    IF NOT proc[each.first.protocolFamily, each.first.transportClass] THEN EXIT;
    ENDLOOP;
  };
-- Returns the first registration on the list that matches both arguments, or NIL when none
-- does. A NIL protocolFamily or transportClass acts as a wildcard for that field.
LookupRegistration: ENTRY PROC [protocolFamily: ATOM, transportClass: ATOM]
RETURNS [r: NetworkStream.Registration] ~ {
  FOR tail: LIST OF NetworkStream.Registration ¬ registrations, tail.rest WHILE tail # NIL DO
    IF ((protocolFamily = NIL) OR (protocolFamily = tail.first.protocolFamily))
      AND ((transportClass = NIL) OR (transportClass = tail.first.transportClass))
      THEN RETURN [tail.first];
    ENDLOOP;
  RETURN [NIL];
  };
Listener Creation
-- Allocates a listener record bound to the given registration and its listener procs;
-- all other fields take their declared defaults.
CreateUninitializedListener: PUBLIC PROC [registration: NetworkStream.Registration] RETURNS [listener: NetworkStream.Listener]
~ {
  RETURN [NEW[NetworkStream.ListenerRecord ¬ [
    registration~registration,
    procs~registration.listenerProcs
    ]]];
  };
-- Looks up the registration for (protocolFamily, transportClass), asks it to create a
-- listener, and enables finalization of the result on theFQ. Reports $notRegistered
-- through RaiseIOError when no registration matches.
CreateListener: PUBLIC PROC [protocolFamily: ATOM, local: ROPE, transportClass: ATOM, transportParameters: REF, listenerWorkerProc: NetworkStream.ListenerWorkerProc, listenerWorkerClientData: REF] RETURNS [listener: NetworkStream.Listener]
~ {
  found: NetworkStream.Registration ¬ LookupRegistration[protocolFamily, transportClass];
  IF found = NIL THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $notRegistered], "specified transport not registered"];
  listener ¬ found.createListener[found, local, transportParameters, listenerWorkerProc, listenerWorkerClientData];
  [] ¬ FinalizeOps.EnableFinalization[listener, theFQ];
  };
-- Installed by CompleteRegistration when a registration has no createListener proc.
-- Always reports that listeners are not implemented.
DefaultRegisteredCreateListenerProc: NetworkStream.RegisteredCreateListenerProc ~ {
RaiseIOError[Failure, NIL, LIST[$networkStreamError, $InitListener, $notImplemented], "listeners not implemented"];
};
-- Default getListenerInfo: reports the family and class from the listener's registration
-- and the local name, worker proc and client data stored in the listener itself.
DefaultGetListenerInfoProc: NetworkStream.GetListenerInfoProc -- [listener] RETURNS [protocolFamily, local, transportClass, proc, clientData] -- ~ {
  -- Fields read straight from the listener record.
  local ¬ listener.local;
  proc ¬ listener.listenerWorkerProc;
  clientData ¬ listener.listenerWorkerClientData;
  -- Fields taken from the registration the listener was created under.
  protocolFamily ¬ listener.registration.protocolFamily;
  transportClass ¬ listener.registration.transportClass;
  };
-- Default finalizeListener: destroys the listener on a best-effort basis. IO.Error,
-- IO.EndOfStream, Error and Timeout raised during the destroy are deliberately swallowed.
DefaultFinalizeListenerProc: NetworkStream.FinalizeListenerProc ~ {
ENABLE IO.Error, IO.EndOfStream, Error, Timeout => CONTINUE;
NetworkStream.DestroyListener[listener];
};
Stream Creation
-- Builds an input and an output IO stream for the given registration. Each stream gets its
-- own fresh stream data record that points at the registration and at the matching
-- (in or out) network stream procs.
CreateUninitializedStreams: PUBLIC PROC [registration: NetworkStream.Registration] RETURNS [in: IO.STREAM, out: IO.STREAM] ~ {
  inData: NetworkStream.NetworkStreamData ¬ NEW[NetworkStream.NetworkStreamDataRecord ¬ [
    registration~registration, procs~registration.inNetworkStreamProcs]];
  outData: NetworkStream.NetworkStreamData ¬ NEW[NetworkStream.NetworkStreamDataRecord ¬ [
    registration~registration, procs~registration.outNetworkStreamProcs]];
  in ¬ IO.CreateStream[streamProcs~registration.inStreamProcs, streamData~inData];
  out ¬ IO.CreateStream[streamProcs~registration.outStreamProcs, streamData~outData];
  };
-- Looks up the registration for (protocolFamily, transportClass), asks it to create a
-- stream pair, and enables finalization of both streams on theFQ. Reports $notRegistered
-- through RaiseIOError when no registration matches.
CreateStreams: PUBLIC PROC [protocolFamily: ATOM, remote: ROPE, transportClass: ATOM, timeout: NetworkStream.Milliseconds, transportParameters: REF] RETURNS [in: IO.STREAM, out: IO.STREAM] ~ {
  found: NetworkStream.Registration ¬ LookupRegistration[protocolFamily, transportClass];
  IF found = NIL THEN RaiseIOError[Failure, NIL, LIST[$networkStreamError, $notRegistered], "specified transport not registered"];
  [in, out] ¬ found.createStreams[found, remote, timeout, transportParameters];
  [] ¬ FinalizeOps.EnableFinalization[in, theFQ];
  [] ¬ FinalizeOps.EnableFinalization[out, theFQ];
  };
-- Installed by CompleteRegistration when a registration has no createStreams proc.
-- Always reports that connecting is not implemented.
DefaultRegisteredCreateStreamsProc: NetworkStream.RegisteredCreateStreamsProc ~ {
RaiseIOError[Failure, NIL, LIST[$networkStreamError, $CreateStreams, $notImplemented], "connect not implemented"];
};
Default Stream Procs
-- Default getStreamInfo: reports the family and class from the stream's registration and
-- the local and remote names stored in its stream data.
DefaultGetStreamInfoProc: NetworkStream.GetStreamInfoProc ~ {
  data: NetworkStream.NetworkStreamData ¬ NARROW[stream.streamData];
  protocolFamily ¬ data.registration.protocolFamily;
  local ¬ data.local;
  remote ¬ data.remote;
  transportClass ¬ data.registration.transportClass;
  };
-- Default getTimeout: reports the timeout and signalTimeout fields of the stream data.
DefaultGetTimeoutProc: NetworkStream.GetTimeoutProc ~ {
  data: NetworkStream.NetworkStreamData ¬ NARROW[stream.streamData];
  timeout ¬ data.timeout;
  signalTimeout ¬ data.signalTimeout;
  };
-- Default setTimeout: accepts only NetworkStream.waitForever; any other timeout is
-- reported as not implemented for this stream.
DefaultSetTimeoutProc: NetworkStream.SetTimeoutProc ~ {
  IF timeout = NetworkStream.waitForever THEN RETURN;
  RaiseIOError[NotImplementedForThisStream, stream, LIST[$notImplemented, $SetTimeout], "SetTimeout not implemented"];
  };
-- Default getStreamState: always reports an open stream with substream type 0 and
-- attention type 0. A request to reset the state is reported as not implemented.
DefaultGetStreamStateProc: NetworkStream.GetStreamStateProc ~ {
IF reset THEN RaiseIOError[NotImplementedForThisStream, in, LIST[$notImplemented, $GetStreamState], "reset stream state not implemented"];
RETURN [streamState~open, subStreamType~0, attentionType~0];
};
-- getStreamState default installed for output streams: always reports not implemented.
OutputGetStreamStateProc: NetworkStream.GetStreamStateProc ~ {
RaiseIOError[NotImplementedForThisStream, in, LIST[$notImplemented, $GetStreamState], "GetStreamState not implemented for output stream"];
-- RaiseIOError raises IO.Error for this error code, so this line is not expected to run.
ERROR;
};
-- Default setSubStreamType: accepts only substream type 0; any other value is reported
-- as not implemented for this stream.
DefaultSetSubStreamTypeProc: NetworkStream.SetSubStreamTypeProc ~ {
  IF subStreamType = 0 THEN RETURN;
  RaiseIOError[NotImplementedForThisStream, out, LIST[$notImplemented, $SetSubStreamType], "SetSubStreamType not implemented"];
  };
-- setSubStreamType default installed for input streams: always reports not implemented.
InputSetSubStreamTypeProc: NetworkStream.SetSubStreamTypeProc ~ {
RaiseIOError[NotImplementedForThisStream, out, LIST[$notImplemented, $SetSubStreamType], "SetSubStreamType not implemented for input stream"];
};
-- Default sendEndOfMessage for output streams: always reports not implemented.
DefaultSendEndOfMessageProc: NetworkStream.SendEndOfMessageProc ~ {
RaiseIOError[NotImplementedForThisStream, out, LIST[$notImplemented, $SendEndOfMessage], "SendEndOfMessage not implemented"];
};
-- sendEndOfMessage default installed for input streams: always reports not implemented.
InputSendEndOfMessageProc: NetworkStream.SendEndOfMessageProc ~ {
RaiseIOError[NotImplementedForThisStream, out, LIST[$notImplemented, $SendEndOfMessage], "SendEndOfMessage not implemented for input stream"];
};
-- Default waitAttention for streams whose transport never delivers attentions. It waits
-- for the requested timeout and then reports a $timeout failure through RaiseIOError.
-- If that report returns (a resumed Timeout signal), the wait is repeated.
DefaultWaitAttentionProc: NetworkStream.WaitAttentionProc ~ {
EntryWaitAttention: ENTRY PROC ~ {
ENABLE UNWIND => NULL;
-- A condition variable local to this call; no NOTIFY for it appears in this module,
-- so the WAIT below is expected to end only by timeout or abort.
attn: CONDITION;
TRUSTED {
Process.EnableAborts[@attn];
IF timeout = NetworkStream.waitForever
THEN { Process.DisableTimeout[@attn] }
ELSE { Process.SetTimeout[@attn, Process.MsecToTicks[timeout]] };
};
WAIT attn;
};
DO
EntryWaitAttention[];
RaiseIOError[Failure, in, LIST[$timeout, $waitAttention], "WaitAttention timeout"];
ENDLOOP;
};
-- waitAttention default installed for output streams: always reports not implemented.
OutputWaitAttentionProc: NetworkStream.WaitAttentionProc ~ {
RaiseIOError[NotImplementedForThisStream, in, LIST[$notImplemented, $WaitAttention], "WaitAttention not implemented for output stream"];
-- RaiseIOError raises IO.Error for this error code, so this line is not expected to run.
RETURN [0]
};
-- Default sendAttention for output streams: always reports not implemented.
DefaultSendAttentionProc: NetworkStream.SendAttentionProc ~ {
RaiseIOError[NotImplementedForThisStream, out, LIST[$notImplemented, $SendAttention], "SendAttention not implemented"];
};
-- sendAttention default installed for input streams: always reports not implemented.
InputSendAttentionProc: NetworkStream.SendAttentionProc ~ {
RaiseIOError[NotImplementedForThisStream, out, LIST[$notImplemented, $SendAttention], "SendAttention not implemented for input stream"];
};
-- waitAck default installed for input streams: always reports not implemented.
InputWaitAckProc: NetworkStream.WaitAckProc ~ {
RaiseIOError[NotImplementedForThisStream, out, LIST[$notImplemented, $WaitAck], "WaitAck not implemented for input stream"];
};
-- Default waitAck for output streams. It succeeds immediately when index is 0 or when the
-- stream's ackIndex has already reached the requested index; a negative index stands for
-- the stream's current bufferIndex. Actually waiting is reported as not implemented.
DefaultWaitAckProc: NetworkStream.WaitAckProc ~ {
  acked, buffered: INT;
  IF index # 0 THEN {
    [ackIndex~acked, bufferIndex~buffered] ¬ NetworkStream.GetIndexDetails[out];
    IF index < 0 THEN index ¬ buffered;
    IF acked < index THEN
      RaiseIOError[NotImplementedForThisStream, out, LIST[$notImplemented, $WaitAck], "WaitAck not implemented"];
    };
  };
-- Default finalizeStream: closes the stream on a best-effort basis. IO.Error,
-- IO.EndOfStream, Error and Timeout raised during the close are deliberately swallowed.
DefaultFinalizeStreamProc: NetworkStream.FinalizeStreamProc ~ {
ENABLE IO.Error, IO.EndOfStream, Error, Timeout => CONTINUE;
IO.Close[stream];
};
-- DefaultSendSoonProc: NetworkStream.SendSoonProc ~ { (see below) };
-- InputSendSoonProc: NetworkStream.SendSoonProc ~ { (see below) };
SendSoon Implementation
Invariant:
d.sendSoonIndex >= 0 ==> eventually an IO.Flush will be performed to flush beyond that index value.
d.sendSoonIndex < 0 ==> stream is not on sendSoonList.
THIS SHOULD BE FIXED TO HAVE A SEPARATE PROCESS PER STREAM - ajd.
-- Timer used by DefaultSendSoonProc to schedule deferred flushes; created by StartCodeForSendSoon.
sendSoonCommTimer: CommTimer.Timer;
-- Last element of a circular list of streams linked through the sendSoonNext field of their
-- stream data; the head is the tail's sendSoonNext. NIL means the list is empty.
sendSoonTail: STREAM ¬ NIL;
-- Notified by AddToSendSoonList; awaited by GetFromSendSoonList while the list is empty.
sendSoonListNotEmpty: CONDITION;
-- Blocks until the send-soon list is non-empty, then removes its head stream and returns it
-- together with the sendSoonIndex that was recorded for it. The stream's sendSoonIndex is
-- reset to -1 and its link cleared, marking it as no longer on the list.
GetFromSendSoonList: ENTRY PROC RETURNS [s: STREAM, sendSoonIndex: INT] ~ {
ENABLE UNWIND => NULL;
tailData, sData: NetworkStream.NetworkStreamData;
WHILE sendSoonTail = NIL DO WAIT sendSoonListNotEmpty ENDLOOP;
tailData ¬ NARROW[sendSoonTail.streamData];
-- The list is circular, so the head is the element after the tail.
s ¬ tailData.sendSoonNext;
sData ¬ NARROW[s.streamData];
-- Removing the only element empties the list; otherwise the tail skips over the head.
IF s = sendSoonTail
THEN sendSoonTail ¬ NIL
ELSE tailData.sendSoonNext ¬ sData.sendSoonNext;
sendSoonIndex ¬ sData.sendSoonIndex;
sData.sendSoonIndex ¬ -1;
sData.sendSoonNext ¬ NIL;
};
-- Appends stream s to the tail of the circular send-soon list and notifies the daemon.
-- A stream whose sendSoonNext link is already set is treated as queued and left alone.
AddToSendSoonList: ENTRY PROC [s: STREAM] ~ {
ENABLE UNWIND => NULL;
sData, tailData: NetworkStream.NetworkStreamData;
sData ¬ NARROW[s.streamData];
IF sData.sendSoonNext # NIL THEN RETURN;
IF sendSoonTail = NIL
THEN {
-- First element: it is both tail and head, so it links to itself.
sendSoonTail ¬ sData.sendSoonNext ¬ s;
}
ELSE {
tailData ¬ NARROW[sendSoonTail.streamData];
-- The new tail points at the current head; the old tail then points at the new tail.
sData.sendSoonNext ¬ tailData.sendSoonNext;
sendSoonTail ¬ tailData.sendSoonNext ¬ s;
};
NOTIFY sendSoonListNotEmpty;
};
-- Timer callback scheduled by DefaultSendSoonProc; clientData is the output stream, which
-- is queued for the daemon to flush.
SendSoonEventProc: CommTimer.EventProc -- [clientData] -- ~ {
AddToSendSoonList[NARROW[clientData]];
};
-- Body of the background process forked by StartCodeForSendSoon. It loops forever, taking
-- streams off the send-soon list and flushing each one when the stream's bufferIndex is
-- still below the recorded sendSoonIndex. IO.Error, Error, Timeout and ABORTED raised
-- while doing so are swallowed so that the daemon keeps running.
SendSoonDaemon: PROC ~ {
s: STREAM;
sendSoonIndex: INT;
DO
[s, sendSoonIndex] ¬ GetFromSendSoonList[];
{ENABLE IO.Error, Error, Timeout, ABORTED => CONTINUE;
IF NetworkStream.GetIndexDetails[s].bufferIndex < sendSoonIndex
THEN IO.Flush[s];
};
ENDLOOP;
};
-- Default sendSoon for output streams. With when = 0 the stream is flushed at once.
-- Otherwise the current stream index is recorded in the stream data and, if no deferred
-- flush was already pending (previous index negative), a timer event is scheduled that
-- queues the stream for the send-soon daemon.
DefaultSendSoonProc: NetworkStream.SendSoonProc ~ {
  d: NetworkStream.NetworkStreamData;
  oldIndex: INT ¬ 0;
  EntrySwapIndex: ENTRY PROC ~ {
    -- IO.GetIndex may raise an error; the UNWIND catch phrase makes sure the monitor lock
    -- is released if the stack is unwound through this entry procedure.
    ENABLE UNWIND => NULL;
    oldIndex ¬ d.sendSoonIndex;
    d.sendSoonIndex ¬ IO.GetIndex[out];
    };
  IF when = 0 THEN { IO.Flush[out]; RETURN };
  d ¬ NARROW[out.streamData];
  EntrySwapIndex[];
  -- A non-negative old index means a flush is already scheduled or queued for this stream.
  IF oldIndex < 0 THEN [] ¬ CommTimer.ScheduleEvent[sendSoonCommTimer, when, SendSoonEventProc, out];
  };
-- sendSoon default installed for input streams: always reports not implemented.
InputSendSoonProc: NetworkStream.SendSoonProc ~ {
RaiseIOError[NotImplementedForThisStream, out, LIST[$notImplemented, $SendSoon], "SendSoon not implemented for input stream"];
};
-- One-time initialization for the SendSoon machinery: creates the shared timer from the
-- two sendSoon parameters and forks the detached daemon process.
StartCodeForSendSoon: PROC ~ {
sendSoonCommTimer ¬ CommTimer.CreateTimer[sendSoonGrainSizeMsec, sendSoonExpectedWaitMsec];
TRUSTED { Process.Detach[ FORK SendSoonDaemon[] ] };
};
FinalizeOps
-- Finalization queue on which CreateListener and CreateStreams enable their results;
-- objects on it are handed to Finalizer.
theFQ: FinalizeOps.CallQueue ¬ FinalizeOps.CreateCallQueue[Finalizer];
-- Finalization callback: dispatches to the finalizeStream proc of a stream or to the
-- finalizeListener proc of a listener. Any other object type is a programming error.
-- ABORTED raised during finalization is swallowed.
Finalizer: FinalizeOps.FinalizeProc ~ {
ENABLE ABORTED => CONTINUE;
WITH object SELECT FROM
s: IO.STREAM => {
d: NetworkStream.NetworkStreamData ¬ NARROW[s.streamData];
d.procs.finalizeStream[s];
};
l: NetworkStream.Listener => {
l.procs.finalizeListener[l];
};
ENDCASE => ERROR;
};
Debugging Stuff
-- When TRUE, the DB... debugging procedures print progress and error messages.
debugMsgs: BOOL ¬ FALSE;
-- Turns debug messages on (i # 0) or off (i = 0) and returns the previous setting as 1 or 0.
SetDebugMsgs: PROC [i: INT] RETURNS [prev: INT] ~ {
  IF debugMsgs THEN prev ¬ 1 ELSE prev ¬ 0;
  debugMsgs ¬ (i # 0);
  };
-- Direct binding to the external print routine named in the string literal.
DBMsgInner: PROC [fmt, s: CStrings.CString] ~ TRUSTED MACHINE CODE { "XR←PrintF" };
-- Format string passed with every message, so message text is printed as data.
debugFmt: CStrings.CString ¬ UXStrings.Create["%s"];
-- Prints rope r through DBMsgInner after converting it to a C string.
DBMsg: PROC [r: ROPE] ~ {
  DBMsgInner[debugFmt, UXStrings.Create[r]];
  };
-- Prints one debug line: m1, then every code atom followed by a space, then m2.
DBPutErr: PROC [m1: ROPE, codes: LIST OF ATOM, m2: ROPE] ~ {
  remaining: LIST OF ATOM ¬ codes;
  DBMsg[m1];
  DBMsg[" codes: "];
  UNTIL remaining = NIL DO
    DBMsg[Convert.RopeFromAtom[remaining.first]];
    DBMsg[" "];
    remaining ¬ remaining.rest;
    ENDLOOP;
  DBMsg[", msg: "];
  DBMsg[m2];
  DBMsg["\n"];
  };
-- Converts a C string to an atom by way of a rope; 100 is the second argument given to
-- UXStrings.ToRope.
DBAtomFromString: PROC [s: CStrings.CString] RETURNS [ATOM] ~ {
  RETURN [Convert.AtomFromRope[UXStrings.ToRope[s, 100]]];
  };
-- Debug listener worker: echoes every character read from in back on out, requesting an
-- immediate send after each one, until end of stream. Both streams are closed on normal
-- exit and also when an error unwinds through this procedure.
DBEchoWorker: NetworkStream.ListenerWorkerProc -- [listener, in, out] -- ~ {
ENABLE UNWIND => { IO.Close[in, TRUE]; IO.Close[out, TRUE] };
c: CHAR;
DO
c ¬ IO.GetChar[in ! IO.EndOfStream => EXIT];
IO.PutChar[out, c];
-- A delay of 0 makes SendSoon flush right away.
NetworkStream.SendSoon[out, 0];
ENDLOOP;
IO.Close[in, TRUE]; IO.Close[out, TRUE];
};
-- Debug helper: creates a listener whose connections are served by DBEchoWorker, with no
-- transport parameters and no client data.
DBStartEchoer: PROC [pf: ATOM, local: ROPE, tc: ATOM]
RETURNS [listener: NetworkStream.Listener] ~ {
  RETURN [NetworkStream.CreateListener[pf, local, tc, NIL, DBEchoWorker, NIL]];
  };
-- Debug listener worker: like DBEchoWorker, but it also exercises stream-state handling
-- and error reporting. Each loop iteration does one of three things:
--   after an end-of-stream: reads and resets the stream state, exits if the remote side
--     closed, and resumes echoing once the state is open again
--   after an error: optionally prints the recorded details and exits
--   otherwise: reads one character and echoes it
-- Both streams are closed on exit and when an error unwinds through this procedure.
DBComplexEchoWorker: NetworkStream.ListenerWorkerProc -- [listener, in, out] -- ~ {
ENABLE UNWIND => { IO.Close[in, TRUE]; IO.Close[out, TRUE] };
c: CHAR;
atEndOfStream: BOOL ¬ FALSE;
streamState: NetworkStream.StreamState;
subStreamType: NetworkStream.SubStreamType;
attentionType: NetworkStream.AttentionType;
-- 0 = no error; 1 = NetworkStream.Error; 2 = NetworkStream.Timeout; 3 = IO.Error.
errorKind: CARD ¬ 0;
errorCodes: LIST OF ATOM;
errorMsg: ROPE;
IF debugMsgs THEN DBMsg["Echoer entered\n"];
DO
IF atEndOfStream THEN {
-- The TRUE argument asks GetStreamState to reset the state after reading it.
[streamState, subStreamType, attentionType] ¬
NetworkStream.GetStreamState[in, TRUE];
IF debugMsgs THEN {
m: ROPE ¬ IO.PutFR["Echoer atEndOfStream, state %g sst %g attn %g\n", IO.card[ORD[streamState]], IO.card[subStreamType], IO.card[attentionType]];
DBMsg[m];
};
IF streamState = remoteClosed THEN EXIT;
IF streamState = open THEN
atEndOfStream ¬ FALSE;
LOOP;
};
IF errorKind # 0 THEN {
IF debugMsgs THEN {
DBPutErr[IO.PutFR1["Echoer err %g ", IO.card[errorKind]], errorCodes, errorMsg];
};
EXIT;
};
{
-- Every catch phrase records what happened and restarts the loop, so the branches above
-- deal with it on the next iteration.
ENABLE {
NetworkStream.Error => {
errorKind ¬ 1; errorCodes ¬ codes; errorMsg ¬ msg;
LOOP };
NetworkStream.Timeout => {
errorKind ¬ 2; errorCodes ¬ codes; errorMsg ¬ msg;
LOOP };
IO.Error => {
-- IO.Error carries no codes or message itself; fetch the details recorded for this process.
errorKind ¬ 3; [codes~errorCodes, msg~errorMsg] ¬ NetworkStream.GetIOErrorDetails[stream];
LOOP };
IO.EndOfStream => {
atEndOfStream ¬ TRUE;
LOOP };
};
c ¬ IO.GetChar[in];
};
IF debugMsgs THEN {
m: ROPE ¬ IO.PutFR1["Echoer got %g, replying ...\n", IO.char[c]];
DBMsg[m];
};
IO.PutChar[out, c];
IF debugMsgs THEN {
DBMsg["echoer done putchar ...\n"];
};
-- A delay of 0 makes SendSoon flush right away.
NetworkStream.SendSoon[out, 0];
IF debugMsgs THEN {
DBMsg["echoer done SendSoon.\n"];
};
ENDLOOP;
IF debugMsgs THEN {
DBMsg["Echoer exit\n"];
};
IO.Close[in, TRUE]; IO.Close[out, TRUE];
};
-- Debug helper: creates a listener whose connections are served by DBComplexEchoWorker,
-- with no transport parameters and no client data.
DBStartComplexEchoer: PROC [pf: ATOM, local: ROPE, tc: ATOM]
RETURNS [listener: NetworkStream.Listener] ~ {
  RETURN [NetworkStream.CreateListener[pf, local, tc, NIL, DBComplexEchoWorker, NIL]];
  };
-- Debug record holding the protocol family, local name and transport class of a listener.
DBListenerInfo: TYPE ~ RECORD [pf: ATOM, local: ROPE, tc: ATOM];
-- Debug helper: returns a freshly allocated record filled in from NetworkStream.GetListenerInfo.
DBGetListenerInfo: PROC [l: NetworkStream.Listener] RETURNS [info: REF DBListenerInfo] ~ {
info ¬ NEW[DBListenerInfo ¬ [NIL, NIL, NIL]];
-- Only three of the results are extracted; the others are ignored.
[protocolFamily~info.pf, local~info.local, transportClass~info.tc] ¬ NetworkStream.GetListenerInfo[l];
};
-- Debug record holding the protocol family, local and remote names, and transport class of a stream.
DBStreamInfo: TYPE ~ RECORD [pf: ATOM, local: ROPE, remote: ROPE, tc: ATOM];
-- Debug helper: returns a freshly allocated record filled in from NetworkStream.GetStreamInfo.
DBGetStreamInfo: PROC [s: IO.STREAM] RETURNS [info: REF DBStreamInfo] ~ {
info ¬ NEW[DBStreamInfo ¬ [NIL, NIL, NIL, NIL]];
[protocolFamily~info.pf, local~info.local, remote~info.remote, transportClass~info.tc] ¬ NetworkStream.GetStreamInfo[s];
};
-- Debug record bundling the two streams of one connection.
DBStreamPair: TYPE ~ RECORD [ in, out: IO.STREAM ];
-- Debug helper: opens a connection with NetworkStream.CreateStreams (no transport
-- parameters) and returns both streams in a freshly allocated record.
DBCreateStreamPair: PROC [pf: ATOM, remote: ROPE, tc: ATOM, timeoutMsec: CARD32] RETURNS [streamPair: REF DBStreamPair] ~ {
  fromRemote, toRemote: IO.STREAM;
  [fromRemote, toRemote] ¬ NetworkStream.CreateStreams[pf, remote, tc, timeoutMsec, NIL];
  streamPair ¬ NEW[DBStreamPair ¬ [fromRemote, toRemote]];
  };
-- Debug helper: runs callee and, when debugMsgs is on, prints and swallows
-- NetworkStream.Error, NetworkStream.Timeout, IO.Error and IO.EndOfStream.
-- When debugMsgs is off, each of them is passed on to the caller unchanged.
DBCallProtected: PROC [callee: PROC] ~ {
ENABLE {
NetworkStream.Error => {
IF debugMsgs THEN {
DBPutErr["NS.Err", codes, msg];
CONTINUE;
};
REJECT;
};
NetworkStream.Timeout => {
IF debugMsgs THEN {
DBPutErr["NS.Timeout", codes, msg];
CONTINUE;
};
REJECT;
};
IO.Error => {
IF debugMsgs THEN {
-- IO.Error carries no codes or message; fetch the details recorded for this process.
codes: LIST OF ATOM;
msg: ROPE;
[codes, msg] ¬ NetworkStream.GetIOErrorDetails[stream];
DBPutErr["IO.Err", codes, msg];
CONTINUE;
};
REJECT;
};
IO.EndOfStream => {
IF debugMsgs THEN {
DBPutErr["IO.EndOfStream", NIL, NIL];
CONTINUE;
};
REJECT;
};
};
callee[];
};
-- Debug helper: writes character c to stream s under DBCallProtected.
DBPutC: PROC [s: IO.STREAM, c: CHAR] ~ {
  WriteOne: PROC ~ { IO.PutChar[s, c] };
  DBCallProtected[WriteOne];
  };
-- Debug helper: flushes stream s under DBCallProtected.
DBFlushC: PROC [s: IO.STREAM] ~ {
  FlushIt: PROC ~ { IO.Flush[s] };
  DBCallProtected[FlushIt];
  };
-- Debug helper: reads one character from stream s under DBCallProtected. The result stays
-- '? when the read fails and the error is swallowed.
DBGetC: PROC [s: IO.STREAM] RETURNS[c: CHAR] ~ {
  ReadOne: PROC ~ { c ¬ IO.GetChar[s] };
  c ¬ '?;
  DBCallProtected[ReadOne];
  };
-- Debug helper: reads up to n characters from stream s into a new text buffer under
-- DBCallProtected, and prints a message when fewer than n arrive and debugMsgs is on.
-- b is assigned inside the protected call, so it keeps its default value if allocation
-- itself fails.
DBGetBlock: PROC [s: IO.STREAM, n: INT] RETURNS [b: REF TEXT] ~ {
DoIt: PROC ~ {
nGot: INT;
b ¬ RefText.New[n];
nGot ¬ IO.GetBlock[s, b, 0, n];
IF (nGot < n) AND (debugMsgs) THEN {
DBMsg[IO.PutFR["GetBlock got only %g of %g\n", IO.int[nGot], IO.int[n]]];
};
};
DBCallProtected[DoIt];
};
-- Debug helper: writes n copies of character c to stream s as one block, under
-- DBCallProtected.
DBPutBlock: PROC [s: IO.STREAM, n: INT, c: CHAR] ~ {
  DoIt: PROC ~ {
    b: REF TEXT ¬ RefText.New[n];
    FOR i: INT IN [0..n) DO
      b[i] ¬ c;
      ENDLOOP;
    -- Filling the characters does not change the text's length field, so set it
    -- explicitly; otherwise a PutBlock that clips to block.length would write nothing.
    -- NOTE(review): assumes RefText.New returns a text of length 0; confirm in RefText.
    b.length ¬ n;
    IO.PutBlock[s, b, 0, n];
    };
  DBCallProtected[DoIt];
  };
Start
-- Module start code: sets up the SendSoon timer and daemon when the module is started.
StartCodeForSendSoon[];
}.