CrRPCCMUXImpl.mesa
Copyright Ó 1986, 1988, 1989, 1991, 1992 by Xerox Corporation. All rights reserved.
Demers, April 26, 1990 8:36 am PDT
Willie-Sue, January 3, 1989 3:21:26 pm PST
Tim Diebert: June 8, 1989 5:23:42 pm PDT
Foote, March 11, 1991 1:34 pm PST
Willie-s, March 26, 1993 3:48 pm PST
Courier runtime support for CMUX transport.
WARNING: byte-addressed machines only!
TODO: Ugh, the whole thing about switching the stream from input to output to BDT is not clearly thought out for servers yet! The localAbort bdt state may never be used!
DIRECTORY
Basics,
CrRPC,
CrRPCBuggy,
CrRPCBackdoor,
IO,
Process,
ProcStream,
RefText,
Rope,
RuntimeError,
SimpleFeedback,
--UnixTypes,
XNS,
XNSWKS;
CrRPCCMUXImpl: CEDAR MONITOR
IMPORTS Basics, CrRPC, CrRPCBackdoor, IO, Process, ProcStream, RefText, RuntimeError, SimpleFeedback
EXPORTS CrRPCBuggy
~ {
Debugging
debugMsgs: BOOL ¬ FALSE;
CRRPCCMUXSetDebugMsgs: PROC [val: INT] RETURNS [oldVal: INT] ~ {
oldVal ¬ (IF debugMsgs THEN 1 ELSE 0);
debugMsgs ¬ (val # 0);
};
DebugMsg: PROC [r: Rope.ROPE] ~ TRUSTED {
SimpleFeedback.Append[$CrRPC, oneLiner, $debugging, r];
};
Copied Types and Constants
byteAddressedOrElse: BOOL[TRUE..TRUE] ¬ (BYTES[INT32] = UNITS[INT32]);
BytePtr: TYPE ~ LONG POINTER TO BYTE;
alwaysSmashTheSocket: BOOL ¬ TRUE;
myHandleClass: CrRPC.HandleClass ¬ $CMUX;
serverPauseMsec: CARD32 ¬ 2500; -- after bad error
serverGetTimeoutMsec: CARD32 ¬ 2500; -- controls responsiveness of server shutdown
waitForever: CARD32 ¬ 0;
FWORD: TYPE ~ Basics.FWORD;
HWORD: TYPE ~ Basics.HWORD;
ROPE: TYPE ~ Rope.ROPE;
STREAM: TYPE ~ IO.STREAM;
Ticks: TYPE ~ Process.Ticks;
Handle: TYPE ~ CrRPC.Handle;
Glue
CProc: TYPE ~ RECORD[CARD32]; -- Opaque C procedure
XRCProcFromMProc: PROC [PROC] RETURNS [CProc] ~ TRUSTED MACHINE CODE { "XR𡤌ProcFromMProc" };
CMUXDescriptor: TYPE ~ RECORD [ val: INT32 ];
nullCMUXDescriptor: CMUXDescriptor ~ [-1];
ErrCodeFromCMUXDescriptor: PROC [d: CMUXDescriptor] RETURNS [INT] ~ INLINE { RETURN [IF d.val >= 0 THEN 0 ELSE d.val] };
EIO: INT ~ 5;
ERANGE: INT ~ 34;
ETIMEDOUT: INT ~ 60;
EBADMSG: INT ~ 76;
XRCMUXCreate: PROC RETURNS [CMUXDescriptor] ~ TRUSTED MACHINE CODE { "XR𡤌MUXCreate" };
XRCMUXDestroy: PROC [d: CMUXDescriptor] ~ TRUSTED MACHINE CODE { "XR𡤌MUXDestroy" };
CMUXAddrType: TYPE ~ MACHINE DEPENDENT {
defaultAddr(0),
defaultName(1),
xnsAddr(2),
xnsName(3),
last(CARD.LAST)
};
XRCMUXConnect: PROC [d: CMUXDescriptor, type: CMUXAddrType, addr: BytePtr, addrLen: CARD32] RETURNS [INT] ~ TRUSTED MACHINE CODE { "XR𡤌MUXConnect" };
CmuxAllocaterProc: TYPE ~ CProc;
XRCMUXReadMsg: UNSAFE PROC [d: CMUXDescriptor, ignoreAborts: BOOL, timeoutMsec: CARD32, allocaterFunc: CmuxAllocaterProc, allocaterClientData: REF] RETURNS [nBytesRead: INT] ~ TRUSTED MACHINE CODE { "XR𡤌MUXReadMsg" };
XRCMUXWriteMsg: PROC [d: CMUXDescriptor, hdr: BytePtr, hdrBytes: CARD32, body: BytePtr, bodyBytes: CARD32] RETURNS [INT] ~ TRUSTED MACHINE CODE { "XR𡤌MUXWriteMsg" };
XRCMUXBDTRead: UNSAFE PROC [d: CMUXDescriptor, buf: BytePtr, bufBytes: CARD32] RETURNS [nBytesRead: INT] ~ TRUSTED MACHINE CODE { "XR𡤌MUXBDTRead" };
XRCMUXBDTWrite: PROC [d: CMUXDescriptor, buf: BytePtr, bufBytes: CARD32] RETURNS [nBytesWritten: INT] ~ TRUSTED MACHINE CODE { "XR𡤌MUXBDTWrite" };
XRCMUXFinishBDT: PROC [d: CMUXDescriptor, source: BOOL, sendAbort: BOOL] RETURNS[INT] ~ TRUSTED MACHINE CODE { "XR𡤌MUXFinishBDT" };
XRCMUXServerAdd: PROC [d: CMUXDescriptor, pgm: CARD32, loVersion: CARD32, hiVersion: CARD32] RETURNS[INT] ~ TRUSTED MACHINE CODE { "XR𡤌MUXServerAdd" };
XRCMUXServerQuit: PROC [d: CMUXDescriptor] RETURNS[INT] ~ TRUSTED MACHINE CODE { "XR𡤌MUXServerQuit" };
Handle Objects
BDTState: TYPE ~ {
ok, -- normal
eof, -- got eof on input
abort, -- got abort from other end
localAbort, -- aborted from this side
error -- got fatal I/O error
};
Phase: TYPE ~ {
inactive, 
msgIn,
msgOut,
bdtIn,
bdtOut
};
DataHandle: TYPE ~ REF DataObject;
DataObject: TYPE ~ RECORD [
next: DataHandle ¬ NIL,
handle: Handle ¬ NIL,
descriptor: CMUXDescriptor ¬ nullCMUXDescriptor,
lastUsed: Ticks ¬ 0,
buf: REF TEXT ¬ NIL,
inStream: STREAM ¬ NIL,
outStream: STREAM ¬ NIL,
stream: STREAM ¬ NIL,
bdtState: BDTState ¬ ok,
phase: Phase ¬ inactive,
tID: HWORD ¬ [0, 0], -- of current call
remote: REF XNS.Address ¬ NIL,
sendBulkData: CrRPC.BulkDataXferProc ¬ NIL, -- for client handles
recvBulkData: CrRPC.BulkDataXferProc ¬ NIL, -- for client handles
continuation: CrRPC.ContinuationProc ¬ NIL, -- for server handles
continuationClientData: REF -- for server handles
];
theClientProcs: REF CrRPC.ProcsObject
~ NEW[ CrRPC.ProcsObject ¬ [
destroy~CheckInToCache,
setParameters~SetParameters,
call~Call,
getBulkDataSource~ErrorGetBulkDataSource,
getBulkDataSink~ErrorGetBulkDataSink,
putBulkDataSource~PutBulkDataSource,
putBulkDataSink~PutBulkDataSink,
readBulkDataStream~ReadBulkDataStream,
writeBulkDataSegment~WriteBulkDataSegment,
callContinuation~CallContinuation,
setContinuation~ErrorSetContinuation,
finalize~Finalize
]
];
theServerProcs: REF CrRPC.ProcsObject
~ NEW[ CrRPC.ProcsObject ¬ [
destroy~CheckInToCache,
setParameters~SetParameters,
call~ErrorCall,
getBulkDataSource~GetBulkDataSource,
getBulkDataSink~GetBulkDataSink,
putBulkDataSource~ErrorPutBulkDataSource,
putBulkDataSink~ErrorPutBulkDataSink,
readBulkDataStream~ReadBulkDataStream,
writeBulkDataSegment~WriteBulkDataSegment,
callContinuation~ErrorCallContinuation,
setContinuation~SetContinuation,
finalize~ErrorFinalize
]
];
Error Reporting
RaiseError: PROC [h: Handle, reason: CrRPC.ErrorReason, errno: INT, msg: ROPE] ~ {
IF errno < 0 THEN errno ¬ -errno;
IF errno > 0 THEN msg ¬ IO.PutFR["%g (errno %g)", IO.rope[msg], IO.int[errno]];
IF reason = unknown THEN reason ¬ other;
IF debugMsgs THEN DebugMsg[IO.PutFR["RaiseError reason %g msg %g\n", IO.int[ORD[reason]], IO.rope[msg]]];
ERROR CrRPC.Error[h, reason, msg];
};
Parameters
Class-specific operations:
$GetRemote returns a REF to the XNS.Address of the remote site associated with this handle. (Works for either client or server handles).
SetParameters: CrRPC.SetParametersProc
[h: Handle, op: ATOM, argument: REF] RETURNS [hNew: Handle, result: REF]
~ {
dH: DataHandle ~ NARROW[h.data];
hNew ¬ h;
SELECT op FROM
$GetRemote => result ¬ NEW[XNS.Address ¬ (dH.remote)­];
ENDCASE => ERROR CrRPC.Error[h, unknownOperation, NIL];
};
Client Call
ConnectToServer: PROC [h: Handle] ~ {
Connect to the remote server. If the connect fails, dH.descriptor will be left nullCMUXDescriptor.
ERRORS: CrRPC.Error.
dH: DataHandle ~ NARROW[h.data];
d: CMUXDescriptor;
ans: INT;
IF dH.descriptor # nullCMUXDescriptor THEN ERROR;
d ¬ XRCMUXCreate[];
ans ¬ ErrCodeFromCMUXDescriptor[d];
IF ans # 0 THEN RaiseError[h, communicationFailure, ans, "can't create CMUX descriptor"];
ans ¬ XRCMUXConnect[d, xnsAddr, LOOPHOLE[dH.remote], BYTES[XNS.Address]];
IF ans # 0 THEN {
XRCMUXDestroy[d];
RaiseError[h, cantConnectToRemote, ans, "can't set CMUX descriptor address"];
};
dH.descriptor ¬ d;
};
DisconnectFromServer: PROC [h: Handle] ~ {
Disconnect from the remote server, setting dH.descriptor to nullCMUXDescriptor.
ERRORS: None.
dH: DataHandle ~ NARROW[h.data];
IF dH.descriptor # nullCMUXDescriptor THEN {
XRCMUXDestroy[dH.descriptor];
dH.descriptor ¬ nullCMUXDescriptor;
};
};
ErrorCall: CrRPC.CallProc ~ {
RaiseError[h, notClientHandle, 0, "call using server handle"] };
GetMsgAllocater: -- CmuxAllocaterProc -- PROC [startIndex: CARD32, nBytes: CARD32, clientData: REF] RETURNS [BytePtr] ~ TRUSTED {
dH: DataHandle ¬ NARROW[clientData];
limit: CARD ¬ startIndex+nBytes;
IF dH.buf = NIL THEN {
IF startIndex # 0 THEN ERROR; -- should be less extreme ... ajd
dH.buf ¬ RefText.New[limit];
};
IF dH.buf.maxLength < startIndex THEN ERROR; -- should be less extreme ... ajd
IF dH.buf.maxLength < limit THEN {
newBuf: REF TEXT ¬ RefText.New[limit];
IF startIndex > 0 THEN TRUSTED {
[] ¬ Basics.ByteBlt[
from ~ [
blockPointer ~ LOOPHOLE[dH.buf],
startIndex ~ BYTES[TEXT[0]],
stopIndexPlusOne ~ BYTES[TEXT[0]] + startIndex
],
to ~ [
blockPointer ~ LOOPHOLE[newBuf],
startIndex ~ BYTES[TEXT[0]],
stopIndexPlusOne ~ BYTES[TEXT[0]] + startIndex
]
];
};
dH.buf ¬ newBuf;
};
RETURN[ LOOPHOLE[dH.buf, BytePtr] + BYTES[TEXT[0]] + startIndex ];
};
BDTGetProc: ProcStream.GetProc -- UNSAFE PROC [clientData: REF, offset: CARD, block: IO.UnsafeBlock] RETURNS[nBytesRead: INT, atEnd: BOOL] -- ~ {
dH: DataHandle ¬ NARROW[clientData];
IF debugMsgs THEN DebugMsg[IO.PutFR1["BDTGetProc req %g ", IO.int[block.count]]];
SELECT dH.bdtState FROM
ok => TRUSTED {
nBytesRead ¬ XRCMUXBDTRead[dH.descriptor, LOOPHOLE[block.base + block.startIndex], block.count];
IF debugMsgs THEN DebugMsg["calling CMUXBDRTRead, "];
};
eof, abort, localAbort => {
nBytesRead ¬ 0;
IF debugMsgs THEN DebugMsg["already eof/abort, "];
};
error => {
nBytesRead ¬ -EIO;
IF debugMsgs THEN DebugMsg["already error, "];
};
ENDCASE => ERROR;
atEnd ¬ TRUE;
SELECT nBytesRead FROM
block.count => atEnd ¬ FALSE;
(-EBADMSG) => { dH.bdtState ¬ abort; nBytesRead ¬ 0 };
< 0 => dH.bdtState ¬ error;
< block.count => IF dH.bdtState = ok THEN dH.bdtState ¬ eof;
ENDCASE => ERROR;
IF debugMsgs THEN DebugMsg[IO.PutFR["BDTGetProc ret bytes %g atEnd %g\n", IO.int[nBytesRead], IO.bool[atEnd]]];
};
BDTPutProc: ProcStream.PutProc -- PROC [clientData: REF, offset: CARD, block: IO.UnsafeBlock] RETURNS[nBytesWritten: INT] -- ~ {
dH: DataHandle ¬ NARROW[clientData];
SELECT dH.bdtState FROM
ok => TRUSTED {
nBytesWritten ¬ XRCMUXBDTWrite[dH.descriptor, LOOPHOLE[block.base + block.startIndex], block.count];
};
abort, localAbort => {
nBytesWritten ¬ block.count;
};
error => {
nBytesWritten ¬ -EIO;
};
eof => {
ERROR;
};
ENDCASE => ERROR;
SELECT nBytesWritten FROM
(-ERANGE) => { dH.bdtState ¬ abort; nBytesWritten ¬ block.count };
< 0 => dH.bdtState ¬ error;
ENDCASE => NULL;
};
Call: CrRPC.CallProc -- [h: Handle, remotePgm: CARD32, remotePgmVersion: CARD16, remoteProc: CARD16, putArgs: PutArgsProc, getResults: GetResultsProc, getError: GetErrorProc] -- ~ {
dH: DataHandle ~ NARROW[h.data];
BDTCheckAbort: CrRPC.BulkDataCheckAbortProc ~ {
RETURN[ dH.bdtState = abort ];
};
Logic of Courier call begins here:
BEGIN
ENABLE {
IO.Error -- [ec, stream] -- => {
msg: ROPE ¬ IO.PutFR1[ "I/O Error, ec %g", IO.card[ORD[ec]] ];
SELECT dH.phase FROM
bdtIn, bdtOut => {
tCode: INT; tMsg: ROPE;
[code~tCode, msg~tMsg] ¬ ProcStream.GetErrorDetails[stream];
msg ¬ IO.PutFR["%g (BDT error %g: %g)", IO.rope[msg], IO.int[ABS[tCode]], IO.rope[tMsg]];
};
ENDCASE;
RaiseError[h, communicationFailure, 0, msg];
};
IO.EndOfStream -- [stream] -- => {
msg: ROPE ¬ ( SELECT dH.phase FROM
msgOut => "I/O EndOfStream, args send (!)",
msgIn => "I/O EndOfStream, results recv",
bdtIn => "I/O EndOfStream, BDT read",
bdtOut => "I/O EndOfStream, BDT write (!)",
ENDCASE => "I/O EndOfStream, inactive (!)" );
RaiseError[h, remoteClose, 0, msg];
};
RuntimeError.BoundsFault => {
RaiseError[h, protocolError, 0, "data bounds fault"];
};
};
IF dH.descriptor = nullCMUXDescriptor THEN ConnectToServer[h];
dH.sendBulkData ¬ NIL;
dH.recvBulkData ¬ NIL;
Format call message ...
dH.phase ¬ msgOut;
dH.outStream ¬ IO.TOS[dH.buf, dH.outStream]; dH.buf ¬ NIL;
IF putArgs # NIL THEN putArgs[h, dH.outStream];
IO.Close[dH.outStream]; ???
dH.buf ¬ IO.TextFromTOS[dH.outStream];
BEGIN
ENABLE UNWIND => DisconnectFromServer[h];
Send call message ...
TRUSTED {
callMsgHdr: CrRPCBackdoor.CallMsgHdr ¬ [
msgHdr~[msgType~CrRPCBackdoor.callMsgType],
callHdr~[
tID~[0, 0],
pgmNum~Basics.FFromCard32[remotePgm],
pgmVersion~Basics.HFromCard16[remotePgmVersion],
procNum~Basics.HFromCard16[remoteProc]
]
];
ans: INT;
callMsgHdr.callHdr.tID ← dH.tID ← ???; (unused).
IF debugMsgs THEN DebugMsg[IO.PutFR["call pgm %g proc %g args %g bytes\n", IO.card[remotePgm], IO.card[remoteProc], IO.card[dH.buf.length]]];
ans ¬ XRCMUXWriteMsg[dH.descriptor,
LOOPHOLE[@callMsgHdr], CrRPCBackdoor.callMsgHdrBytes,
LOOPHOLE[dH.buf, BytePtr] + BYTES[TEXT[0]], dH.buf.length];
IF ans < 0 THEN RaiseError[h, communicationFailure, ans, "can't send call msg"];
};
Do BDT transfer ...
BEGIN
bdtProc: CrRPC.BulkDataXferProc ¬ NIL;
localAbort: BOOL;
ans: INT;
SELECT TRUE FROM
(dH.sendBulkData # NIL) => {
need TRUSTED here now for Cedar10.0
TRUSTED { bdtProc ¬ dH.sendBulkData }; dH.phase ¬ bdtIn;
};
(dH.recvBulkData # NIL) => {
need TRUSTED here now for Cedar10.0
TRUSTED { bdtProc ¬ dH.recvBulkData }; dH.phase ¬ bdtOut;
};
ENDCASE;
IF bdtProc # NIL THEN {
dH.bdtState ¬ ok;
dH.stream ¬ ProcStream.PIOS[dH, BDTGetProc, BDTPutProc, NIL, NIL, dH.stream, FALSE, FALSE];
IF debugMsgs THEN DebugMsg[IO.PutFR["BDT xfer stream %x (%g)\n", IO.int[LOOPHOLE[dH.stream]], IO.rope[IF dH.sendBulkData # NIL THEN "source" ELSE "sink"]]];
localAbort ¬ bdtProc[h, dH.stream, BDTCheckAbort];
IF (dH.sendBulkData # NIL) AND (NOT localAbort)
THEN IO.Flush[dH.stream];
ans ¬ XRCMUXFinishBDT[dH.descriptor, TRUE, localAbort];
IF ans < 0 THEN RaiseError[h, bulkDataError, ans, "bulk data transfer failed"];
IF debugMsgs THEN DebugMsg["BDT xfer done"];
IF dH.recvBulkData # NIL THEN {
charsRead, charsUnread: INT;
charsRead ¬ IO.GetIndex[dH.stream];
IF debugMsgs THEN DebugMsg[IO.PutFR1[" (%g bytes read, ", IO.int[charsRead]]];
charsUnread ¬ IO.CharsAvail[dH.stream, FALSE];
IF charsUnread = INT.LAST
THEN { IF debugMsgs THEN DebugMsg["end of stream)"] }
ELSE { IF debugMsgs THEN DebugMsg[IO.PutFR1["%g unread", IO.int[charsUnread]]] };
};
IF debugMsgs THEN DebugMsg["\n"];
};
END;
Read reply message ...
TRUSTED {
ans: INT;
dH.phase ¬ msgIn;
ans ¬ XRCMUXReadMsg[dH.descriptor, TRUE, waitForever, XRCProcFromMProc[LOOPHOLE[GetMsgAllocater]], dH];
IF ans < 0 THEN RaiseError[h, communicationFailure, ans, "can't read reply msg"];
IF debugMsgs THEN DebugMsg[IO.PutFR["reply pgm %g proc %g reply %g bytes\n", IO.card[remotePgm], IO.card[remoteProc], IO.int[ans]]];
dH.buf.length ¬ ans;
};
END;
Interpret reply message ...
BEGIN
msgType: HWORD;
dH.inStream ¬ IO.TIS[dH.buf]; dH.buf ¬ NIL;
msgType ¬ CrRPC.GetHWord[dH.inStream];
SELECT msgType FROM
CrRPCBackdoor.rejectMsgType => {
rejectReason: CARDINAL;
[] ¬ CrRPC.GetHWord[dH.inStream];
rejectReason ¬ CrRPC.GetCard16[dH.inStream];
RaiseError[h,
(SELECT rejectReason FROM
CrRPC.noSuchProgram => rejectedNoSuchProgram,
CrRPC.noSuchVersion => rejectedNoSuchVersion,
CrRPC.noSuchProcedure => rejectedNoSuchProcedure,
CrRPC.invalidArgument => rejectedInvalidArgument,
ENDCASE => rejectedUnspecified),
0,
"rejected"];
};
CrRPCBackdoor.returnMsgType => {
[] ¬ CrRPC.GetHWord[dH.inStream];
IF getResults # NIL THEN getResults[h, dH.inStream];
};
CrRPCBackdoor.abortMsgType => {
errNum: CARDINAL;
[] ¬ CrRPC.GetHWord[dH.inStream];
errNum ¬ CrRPC.GetCard16[dH.inStream];
IF getError # NIL THEN getError[h, dH.inStream, errNum];
RaiseError[h, remoteError, 0, "unexpected remote error"];
};
ENDCASE => {
DisconnectFromServer[h];
RaiseError[h, protocolError, 0, IO.PutFR1["bad msgType (%g) in response", IO.card[Basics.Card16FromH[msgType]]]];
};
END;
dH.phase ¬ inactive;
END;
}; -- of Call
Server Process
RefreshServerCMUXDescriptor: PROC [h: Handle, pgm: CARD32, pgmVersion: CARD16] ~ {
ans: INT;
dH: DataHandle;
d: CMUXDescriptor;
dH ¬ NARROW[h.data];
IF dH.descriptor # nullCMUXDescriptor THEN RETURN;
d ¬ XRCMUXCreate[];
ans ¬ ErrCodeFromCMUXDescriptor[d];
IF ans # 0 THEN RaiseError[h, communicationFailure, ans, "can't create CMUX descriptor"];
ans ¬ XRCMUXServerAdd[d, pgm, CARD32[pgmVersion], CARD32[pgmVersion]];
IF ans # 0 THEN {
XRCMUXDestroy[d];
RaiseError[h, communicationFailure, ans, "can't add service to CMUX"];
};
dH.descriptor ¬ d;
};
SmashServerCMUXDescriptor: PROC [h: Handle] ~ {
dH: DataHandle;
d: CMUXDescriptor;
dH ¬ NARROW[h.data];
IF (d ¬ dH.descriptor) = nullCMUXDescriptor THEN RETURN;
[] ¬ XRCMUXServerQuit[d];
XRCMUXDestroy[d];
dH.descriptor ¬ nullCMUXDescriptor;
};
CleanupServerBDT: PROC [h: Handle, dH: DataHandle] ~ {
ans: INT ¬ 0;
SELECT dH.phase FROM
bdtIn => {
ans ¬ XRCMUXFinishBDT[dH.descriptor, FALSE, (dH.bdtState = localAbort)];
};
bdtOut => {
IF dH.bdtState = ok THEN IO.Flush[dH.stream];
ans ¬ XRCMUXFinishBDT[dH.descriptor, TRUE, (dH.bdtState = localAbort)];
};
ENDCASE;
IF ans < 0 THEN RaiseError[h, bulkDataError, ans, "bulk data transfer failed"];
dH.phase ¬ msgOut;
};
BeginReturn: CrRPC.BeginReturnProc -- [h: Handle] -- ~ {
dH: DataHandle ¬ NARROW[h.data];
s: STREAM;
CleanupServerBDT[h, dH];
s ¬ dH.stream ¬ IO.TOS[dH.buf, dH.stream]; dH.buf ¬ NIL;
CrRPC.PutHWord[s, CrRPCBackdoor.returnMsgType];
CrRPC.PutHWord[s, dH.tID];
};
BeginError: CrRPC.BeginErrorProc -- [h: Handle, errNum: CARDINAL] -- ~ {
dH: DataHandle ¬ NARROW[h.data];
s: STREAM;
CleanupServerBDT[h, dH];
s ¬ dH.stream ¬ IO.TOS[dH.buf, dH.stream]; dH.buf ¬ NIL;
CrRPC.PutHWord[s, CrRPCBackdoor.abortMsgType];
CrRPC.PutHWord[s, dH.tID];
CrRPC.PutCard16[s, errNum];
};
BeginReject: CrRPC.BeginRejectProc -- [h: Handle, rejectReason: CARDINAL] -- ~ {
dH: DataHandle ¬ NARROW[h.data];
s: STREAM;
CleanupServerBDT[h, dH];
s ¬ dH.stream ¬ IO.TOS[dH.buf, dH.stream]; dH.buf ¬ NIL;
CrRPC.PutHWord[s, CrRPCBackdoor.rejectMsgType];
CrRPC.PutHWord[s, dH.tID];
CrRPC.PutCard16[s, rejectReason];
};
ServerDaemon: PROC [h: Handle, pgm: CARD32, pgmVersion: CARD16, serverProc: CrRPC.ServerProc, stopServerQueryProc: CrRPC.StopServerQueryProc] ~ {
dH: DataHandle ¬ NARROW[h.data];
pgmF: FWORD ¬ Basics.FFromCard32[pgm];
pgmVersionH: HWORD ¬ Basics.HFromCard16[pgmVersion];
DO
BEGIN
ENABLE {
IO.Error -- [ec, stream] -- => {
IF debugMsgs THEN {
DebugMsg[IO.PutFR1[ "I/O Error, ec %g, ", IO.card[ORD[ec]] ]];
DebugMsg[IO.PutFR1[ "phase %g\n", IO.card[ORD[dH.phase]] ]];
};
CONTINUE;
};
IO.EndOfStream -- [stream] -- => {
IF debugMsgs THEN {
DebugMsg[ "I/O end of stream\n" ];
};
CONTINUE;
};
CrRPC.Error -- [h, errorReason, text] -- => {
IF debugMsgs THEN {
DebugMsg[IO.PutFR[ "CrRPC.Error %g (%g)\n", IO.card[ORD[errorReason]], IO.rope[text] ]];
};
CONTINUE;
};
};
DO
procNum: CARD16;
IF dH.descriptor = nullCMUXDescriptor THEN EXIT;
IF stopServerQueryProc # NIL
THEN IF stopServerQueryProc[h]
THEN GOTO Done;
dH.phase ¬ msgIn;
At this point we have a handle with a valid descriptor in it.
Read call message ...
TRUSTED BEGIN
ans: INT;
ans ¬ XRCMUXReadMsg[dH.descriptor, TRUE, serverGetTimeoutMsec, XRCProcFromMProc[LOOPHOLE[GetMsgAllocater]], dH];
IF ans <= 0 THEN {
IF ans = (-ETIMEDOUT) THEN LOOP;
IF debugMsgs THEN DebugMsg[IO.PutFR1[ "CMUXReadMsg error %g\n", IO.int[ans] ]];
EXIT;
};
IF debugMsgs THEN DebugMsg[IO.PutFR["got call, pgm %g, %g bytes\n", IO.card[pgm], IO.int[ans]]];
dH.buf.length ¬ ans;
END;
Interpret call message ...
BEGIN
msgType: HWORD;
s: STREAM;
s ¬ dH.stream ¬ IO.TIS[dH.buf, dH.stream]; dH.buf ¬ NIL;
msgType ¬ CrRPC.GetHWord[dH.stream];
SELECT msgType FROM
CrRPCBackdoor.callMsgType => {
dH.tID ¬ CrRPC.GetHWord[s];
IF CrRPC.GetFWord[s] # pgmF
THEN RaiseError[h, other, 0, "CMUX server pgm # botch"];
IF CrRPC.GetHWord[s] # pgmVersionH
THEN RaiseError[h, other, 0, "CMUX server version # botch"];
procNum ¬ CrRPC.GetCard16[s]
};
ENDCASE => {
IF debugMsgs THEN DebugMsg[IO.PutFR1["CMUX bad msg %g\n", IO.card[Basics.Card16FromH[msgType]]]];
RaiseError[h, protocolError, 0, "CMUX delivered non-call msg"];
};
END;
Call server proc ...
BEGIN
serverProc[h, dH.stream, pgm, pgmVersion, procNum, BeginReturn, BeginError, BeginReject];
END;
Send the return/reject/error message ...
BEGIN
ans: INT;
b: REF TEXT;
IF dH.phase # msgOut THEN {
IF debugMsgs THEN DebugMsg[IO.PutFR1["bad phase after server proc %g\n", IO.card[ORD[dH.phase]]]];
};
b ¬ dH.buf ¬ IO.TextFromTOS[dH.stream];
IF debugMsgs THEN DebugMsg[IO.PutFR["sending reply, pgm %g proc %g len %g\n", IO.card[pgm], IO.card[procNum], IO.card[b.length]]];
ans ¬ XRCMUXWriteMsg[dH.descriptor,
LOOPHOLE[b, BytePtr] + BYTES[TEXT[0]], b.length,
NIL, 0];
IF ans < 0 THEN RaiseError[h, communicationFailure, ans, "can't send reply msg"];
END;
dH.phase ¬ inactive;
ENDLOOP;
END;
SmashServerCMUXDescriptor[h];
IF debugMsgs THEN DebugMsg["CMUX server pause\n"];
Process.PauseMsec[serverPauseMsec];
RefreshServerCMUXDescriptor[h, pgm, pgmVersion ! CrRPC.Error => CONTINUE ];
REPEAT
Done => NULL;
ENDLOOP;
SmashServerCMUXDescriptor[h];
};
StartServerInstance: CrRPCBackdoor.StartServerInstanceProc
[pgm: CARD32, pgmVersion: CARD16, local: CrRPC.RefAddress]
~ {
h: Handle;
dH: DataHandle;
serverProc: CrRPC.ServerProc;
stopServerQueryProc: CrRPC.StopServerQueryProc;
[serverProc, stopServerQueryProc] ¬ CrRPCBackdoor.LookUpServerProcs[pgm, pgmVersion];
IF serverProc = NIL THEN RaiseError[NIL, rejectedNoSuchProgram, 0, "no server proc for remote program"];
dH ¬ NEW[DataObject ¬ []];
h ¬ NEW[CrRPC.Object ¬ [
class~myHandleClass,
kind~server,
procs~theServerProcs,
data~dH,
clientData~NIL
]];
dH.handle ¬ h;
RefreshServerCMUXDescriptor[h, pgm, pgmVersion];
TRUSTED {
Process.Detach[ FORK ServerDaemon[h, pgm, pgmVersion, serverProc, stopServerQueryProc] ];
};
};
I/O and Marshalling
ErrorPutBulkDataSource: CrRPC.PutBulkDataXferProcProc -- [h: Handle, s: STREAM, proc: BulkDataXferProc]-- ~ {
RaiseError[h, notClientHandle, 0, "putBDTSource on server handle"];
};
PutBulkDataSource: CrRPC.PutBulkDataXferProcProc -- [h: Handle, s: STREAM, proc: BulkDataXferProc] -- ~ {
dH: DataHandle ~ NARROW[h.data];
IF proc = NIL
THEN {
CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.null]] }
ELSE {
CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate]];
TRUSTED { dH.sendBulkData ¬ proc }};
};
ErrorPutBulkDataSink: CrRPC.PutBulkDataXferProcProc -- [h: Handle, s: STREAM, proc: BulkDataXferProc] -- ~ {
RaiseError[h, notClientHandle, 0, "putBDTSink on server handle"];
};
PutBulkDataSink: CrRPC.PutBulkDataXferProcProc -- [h: Handle, s: STREAM, proc: BulkDataXferProc] -- ~ {
dH: DataHandle ~ NARROW[h.data];
IF proc = NIL
THEN {
CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.null]] }
ELSE {
CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate]];
TRUSTED { dH.recvBulkData ¬ proc }};
};
PutBulkDataSinkBuggy: PUBLIC CrRPC.PutBulkDataXferProcProc -- [h: Handle, s: STREAM, proc: BulkDataXferProc] -- ~ {
This is a workaround for a bug in the AdobeServer.
AdobeCourierServerImplA.GetSystemDefaultUserFile expects a
bulk data transfer proc, but doesn't deserriallize the bulk data transfer tag.
dH: DataHandle ~ NARROW[h.data];
TRUSTED { dH.recvBulkData ¬ proc };
};
ErrorGetBulkDataSource: PROC [h: Handle, s: STREAM]
RETURNS[bulkDataSource: CrRPC.BulkDataSource] ~ {
RaiseError[h, notServerHandle, 0, "GetBDTSource on client handle"];
};
GetBulkDataSource: PROC [h: Handle, s: STREAM]
RETURNS[bulkDataSource: CrRPC.BulkDataSource] ~ {
dH: DataHandle ~ NARROW[h.data];
typeCode: CARDINAL;
typeCode ¬ CrRPC.GetCard16[s];
SELECT typeCode FROM
ORD[CrRPCBackdoor.BulkDataDescriptorType.null] => {
bulkDataSource ¬ NIL };
ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate] => TRUSTED {
bulkDataSource ¬ ServerRecvBulkData };
ORD[CrRPCBackdoor.BulkDataDescriptorType.active],
ORD[CrRPCBackdoor.BulkDataDescriptorType.passive] => {
RaiseError[h, notImplemented, 0, "no third party bulk data"] };
ENDCASE => {
RaiseError[h, protocolError, 0, "bad bulkDataSource type"] };
};
ServerRecvBulkData: CrRPC.BulkDataXferProc -- [h: Handle, s: IO.STREAM, checkAbort: BulkDataCheckAbortProc] RETURNS [abort: BOOL] -- ~ {
dH: DataHandle ~ NARROW[h.data];
bufSize: NAT ~ 2048;
b: REF TEXT ~ RefText.ObtainScratch[bufSize];
nReadSoFar: CARD ¬ 0;
dH.phase ¬ bdtIn; dH.bdtState ¬ ok;
DO
nBytes: INT;
atEnd: BOOL;
TRUSTED {
[nBytes, atEnd] ¬ BDTGetProc[dH, nReadSoFar, [base~LOOPHOLE[b], startIndex~BYTES[TEXT[0]], count~bufSize]];
};
IF nBytes < 0 THEN RaiseError[h, communicationFailure, -nBytes, "BDT read failure"];
IF (nBytes > 0) THEN {
nReadSoFar ¬ nReadSoFar + nBytes;
IO.UnsafePutBlock[s, [base~LOOPHOLE[b], startIndex~BYTES[TEXT[0]], count~nBytes]];
};
IF atEnd THEN EXIT;
IF checkAbort[h].abort THEN {
dH.bdtState ¬ localAbort;
EXIT;
};
ENDLOOP;
RefText.ReleaseScratch[b];
RETURN [dH.bdtState = abort];
};
ErrorGetBulkDataSink: PROC [h: Handle, s: STREAM]
RETURNS[bulkDataSink: CrRPC.BulkDataSink] ~ {
RaiseError[h, notServerHandle, 0, "GetBDTSink on client handle"];
};
GetBulkDataSink: PROC [h: Handle, s: STREAM]
RETURNS[bulkDataSink: CrRPC.BulkDataSink] ~ {
dH: DataHandle ~ NARROW[h.data];
typeCode: CARDINAL;
typeCode ¬ CrRPC.GetCard16[s];
SELECT typeCode FROM
ORD[CrRPCBackdoor.BulkDataDescriptorType.null] => {
bulkDataSink ¬ NIL };
ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate] => TRUSTED {
bulkDataSink ¬ ServerSendBulkData };
ORD[CrRPCBackdoor.BulkDataDescriptorType.active],
ORD[CrRPCBackdoor.BulkDataDescriptorType.passive] => {
RaiseError[h, notImplemented, 0, "no third party bulk data"] };
ENDCASE => {
RaiseError[h, protocolError, 0, "bad bulkDataSink type"] };
};
ServerSendBulkData: CrRPC.BulkDataXferProc -- [h: Handle, s: IO.STREAM, checkAbort: BulkDataCheckAbortProc] RETURNS [abort: BOOL] -- ~ {
dH: DataHandle ~ NARROW[h.data];
bufSize: NAT ~ 1024;
b: REF TEXT ~ RefText.ObtainScratch[bufSize];
nWrittenSoFar: CARD ¬ 0;
dH.phase ¬ bdtOut; dH.bdtState ¬ ok;
DO
nRead, nWritten: INT;
nRead ¬ IO.GetBlock[s, b];
IF nRead <= 0 THEN EXIT;
nWritten ¬ BDTPutProc[dH, nWrittenSoFar, [base~LOOPHOLE[b], startIndex~BYTES[TEXT[0]], count~nRead]];
IF dH.bdtState # ok THEN EXIT;
IF checkAbort[h].abort THEN {
dH.bdtState ¬ localAbort;
EXIT;
};
ENDLOOP;
RefText.ReleaseScratch[b];
RETURN[ dH.bdtState = abort ];
};
Bulk Data Streams
nextSegment: CARD16 ~ 0;
lastSegment: CARD16 ~ 1;
ReadBulkDataStream: CrRPC.ReadBulkDataStreamProc
[h: Handle, s: STREAM, checkAbort: BulkDataCheckAbortProc, readValue: BulkDataValueProc] RETURNS [abort: BOOL]
~ {
segmentKind: CARD16;
len: CARD16;
DO
IF checkAbort[h] THEN RETURN[FALSE];
segmentKind ¬ CrRPC.GetCard16[s];
len ¬ CrRPC.GetCard16[s];
THROUGH [1..len] DO
IF checkAbort[h] THEN RETURN[FALSE];
IF readValue[s].abort THEN RETURN[TRUE];
ENDLOOP;
IF segmentKind = lastSegment THEN RETURN[FALSE];
ENDLOOP;
};
WriteBulkDataSegment: CrRPC.WriteBulkDataSegmentProc
[h: Handle, s: STREAM, checkAbort: BulkDataCheckAbortProc, writeValue: BulkDataValueProc, n: CARDINAL] RETURNS [abort: BOOL, heAborted: BOOL]
~ {
CrRPC.PutCard16[s, (IF n > 0 THEN nextSegment ELSE lastSegment)];
CrRPC.PutCard16[s, n];
abort ¬ heAborted ¬ FALSE;
THROUGH [1..n] DO
IF checkAbort[h] THEN { heAborted ¬ TRUE; RETURN };
IF writeValue[s].abort THEN { abort ¬ TRUE; RETURN };
ENDLOOP;
};
Continuations
ErrorCallContinuation: CrRPC.CallContinuationProc ~ {
RaiseError[h, notClientHandle, 0, "CallContinuation on server handle"] };
CallContinuation: CrRPC.CallContinuationProc
[h: Handle, proc: ContinuationProc, clientData: REF]
~ {
dH: DataHandle ~ NARROW[h.data];
RaiseError[h, notImplemented, 0, "continuations not implemented"];
};
ErrorSetContinuation: CrRPC.SetContinuationProc ~ {
RaiseError[h, notServerHandle, 0, "SetContinuation on client handle"] };
SetContinuation: CrRPC.SetContinuationProc
[h: Handle, proc: ContinuationProc, clientData: REF]
~ {
dH: DataHandle ~ NARROW[h.data];
RaiseError[h, notImplemented, 0, "continuations not implemented"];
TRUSTED { dH.continuation ¬ proc };
dH.continuationClientData ¬ clientData };
Cache of Handles
We maintain a cache of recently used client handles with their transport intact. If a handle isn't used for awhile, it times out, the transport is closed and the handle destroyed.
cache: DataHandle ¬ NIL;
cacheTimeoutTicks: Ticks ¬ Process.SecondsToTicks[8];
cacheSweepIntervalTicks: Ticks ¬ Process.SecondsToTicks[8];
Now: PROC RETURNS [Ticks] ~ TRUSTED MACHINE CODE { "XR←TicksSinceBoot" };
TicksSince: PROC [then: Ticks] RETURNS [Ticks] ~ { RETURN [ Now[] - then ] };
CheckOutFromCache: ENTRY PROC [remote: XNS.Address] RETURNS [h: Handle ¬ NIL] ~ {
dH: DataHandle ¬ cache;
prevH: DataHandle ¬ NIL;
WHILE dH # NIL DO
IF (dH.remote)­ = remote THEN {
IF prevH = NIL THEN cache ¬ dH.next ELSE prevH.next ¬ dH.next;
h ¬ dH.handle;
dH.handle ¬ NIL; -- for finalization
dH.lastUsed ¬ Now[];
EXIT };
prevH ¬ dH; dH ¬ dH.next;
ENDLOOP;
};
CheckInToCache: ENTRY PROC [h: Handle] ~ {
dH: DataHandle ¬ NARROW[h.data];
IF debugMsgs THEN DebugMsg[IO.PutFR1["CMUXCheckIn d=%g\n", IO.int[dH.descriptor]]];
dH.handle ¬ h;
dH.lastUsed ¬ Now[];
dH.next ¬ cache;
cache ¬ dH };
CacheSweeper: PROC ~ {
GetDeadHandles: ENTRY PROC RETURNS [deadList: DataHandle ¬ NIL] ~ {
dH, newCache, newCacheTail: DataHandle ¬ NIL;
WHILE cache # NIL DO
dH ¬ cache; cache ¬ cache.next;
IF TicksSince[dH.lastUsed] <= cacheTimeoutTicks
THEN -- add to new cache -- {
dH.next ¬ NIL;
IF newCacheTail = NIL THEN newCache ¬ dH ELSE newCacheTail.next ¬ dH;
newCacheTail ¬ dH }
ELSE -- add to dead list -- {
dH.next ¬ deadList;
deadList ¬ dH };
ENDLOOP;
cache ¬ newCache;
};
DestroyDeadHandles: PROC [deadList: DataHandle] ~ {
dH: DataHandle;
IF deadList # NIL THEN {
IF debugMsgs THEN DebugMsg[IO.PutFR1["CMUXCacheSweeper %g\n", IO.card[LOOPHOLE[deadList]]]];
};
WHILE deadList # NIL DO
dH ¬ deadList; deadList ¬ dH.next;
BuryDeadHandle[dH.handle];
ENDLOOP;
};
DO
DestroyDeadHandles[ GetDeadHandles[] ];
Process.Pause[ cacheSweepIntervalTicks ];
ENDLOOP;
};
BuryDeadHandle: PROC [h: Handle] ~ {
dH: DataHandle ~ NARROW[h.data];
dH.handle ¬ NIL; -- break cycle for GC
dH.stream ¬ NIL; -- break cycle with ProcStream
IF debugMsgs THEN DebugMsg[IO.PutFR1["BuryDeahHandle d=%g\n", IO.int[dH.descriptor]]];
DisconnectFromServer[h];
h.data ¬ NIL;
};
Handle Creation and Destruction
ErrorFinalize: PROC [h: Handle] RETURNS [renewHandle: BOOL] ~ { ERROR };
Finalize: PROC [h: Handle] RETURNS [renewHandle: BOOL ¬ FALSE] ~ {
A client has dropped the handle h, which is guaranteed not to be in the cache.
dH: DataHandle ~ NARROW[h.data];
IF dH = NIL THEN -- handle is already dead -- RETURN;
IF debugMsgs THEN DebugMsg[IO.PutFR1["Finalizing d=%g\n", IO.int[dH.descriptor]]];
IF TicksSince[dH.lastUsed] <= cacheTimeoutTicks THEN {
CheckInToCache[h];
RETURN [TRUE]; };
BuryDeadHandle[h];
RETURN [FALSE];
};
CreateClientHandle: CrRPCBackdoor.CreateClientHandleProc
[remote: CrRPC.RefAddress] RETURNS [Handle]
~ {
handle: Handle;
dH: DataHandle;
addr: REF XNS.Address;
IF remote = NIL THEN
RaiseError[NIL, addressInappropriateForClass, 0, "missing remote address"];
WITH remote SELECT FROM
rA: REF XNS.Address => addr ¬ NEW[XNS.Address ¬ (rA­)];
ENDCASE => RaiseError[NIL, addressInappropriateForClass, 0, "not XNS address"];
IF ( alwaysSmashTheSocket ) OR ( addr.socket = XNS.unknownSocket )
THEN addr.socket ¬ XNSWKS.courier;
handle ¬ CheckOutFromCache[addr­];
IF handle = NIL THEN {
dH ¬ NEW[DataObject ¬ [lastUsed~Now[], remote~addr]];
handle ¬ CrRPCBackdoor.NewClientObject[class~$CMUX, procs~theClientProcs, data~dH];
};
RETURN [handle];
};
Startup
CrRPCBackdoor.RegisterCreateClientHandleProc[myHandleClass, CreateClientHandle];
CrRPCBackdoor.RegisterStartServerInstanceProc[myHandleClass, StartServerInstance];
TRUSTED { Process.Detach[ FORK CacheSweeper[] ] };
}.