Copied Types and Constants
byteAddressedOrElse: BOOL[TRUE..TRUE] ¬ (BYTES[INT32] = UNITS[INT32]);
BytePtr: TYPE ~ LONG POINTER TO BYTE;
alwaysSmashTheSocket: BOOL ¬ TRUE;
myHandleClass: CrRPC.HandleClass ¬ $CMUX;
serverPauseMsec: CARD32 ¬ 2500; -- after bad error
serverGetTimeoutMsec: CARD32 ¬ 2500; -- controls responsiveness of server shutdown
waitForever: CARD32 ¬ 0;
FWORD: TYPE ~ Basics.FWORD;
HWORD: TYPE ~ Basics.HWORD;
ROPE: TYPE ~ Rope.ROPE;
STREAM: TYPE ~ IO.STREAM;
Ticks: TYPE ~ Process.Ticks;
Handle: TYPE ~ CrRPC.Handle;
Glue
CProc: TYPE ~ RECORD[CARD32]; -- Opaque C procedure
XRCProcFromMProc: PROC [PROC] RETURNS [CProc] ~ TRUSTED MACHINE CODE { "XR𡤌ProcFromMProc" };
CMUXDescriptor: TYPE ~ RECORD [ val: INT32 ];
nullCMUXDescriptor: CMUXDescriptor ~ [-1];
ErrCodeFromCMUXDescriptor: PROC [d: CMUXDescriptor] RETURNS [INT] ~ INLINE { RETURN [IF d.val >= 0 THEN 0 ELSE d.val] };
EIO: INT ~ 5;
ERANGE: INT ~ 34;
ETIMEDOUT: INT ~ 60;
EBADMSG: INT ~ 76;
XRCMUXCreate:
PROC
RETURNS [CMUXDescriptor] ~
TRUSTED
MACHINE
CODE { "XR𡤌MUXCreate" };
XRCMUXDestroy:
PROC [d: CMUXDescriptor] ~
TRUSTED
MACHINE
CODE { "XR𡤌MUXDestroy" };
CMUXAddrType:
TYPE ~
MACHINE
DEPENDENT {
defaultAddr(0),
defaultName(1),
xnsAddr(2),
xnsName(3),
last(CARD.LAST)
};
XRCMUXConnect:
PROC [d: CMUXDescriptor, type: CMUXAddrType, addr: BytePtr, addrLen:
CARD32]
RETURNS [
INT] ~
TRUSTED
MACHINE
CODE { "XR𡤌MUXConnect" };
CmuxAllocaterProc:
TYPE ~ CProc;
XRCMUXReadMsg:
UNSAFE
PROC [d: CMUXDescriptor, ignoreAborts:
BOOL, timeoutMsec:
CARD32, allocaterFunc: CmuxAllocaterProc, allocaterClientData:
REF]
RETURNS [nBytesRead:
INT] ~
TRUSTED
MACHINE
CODE { "XR𡤌MUXReadMsg" };
XRCMUXWriteMsg:
PROC [d: CMUXDescriptor, hdr: BytePtr, hdrBytes:
CARD32, body: BytePtr, bodyBytes:
CARD32]
RETURNS [
INT] ~
TRUSTED
MACHINE
CODE { "XR𡤌MUXWriteMsg" };
XRCMUXBDTRead:
UNSAFE
PROC [d: CMUXDescriptor, buf: BytePtr, bufBytes:
CARD32]
RETURNS [nBytesRead:
INT] ~
TRUSTED
MACHINE
CODE { "XR𡤌MUXBDTRead" };
XRCMUXBDTWrite:
PROC [d: CMUXDescriptor, buf: BytePtr, bufBytes:
CARD32]
RETURNS [nBytesWritten:
INT] ~
TRUSTED
MACHINE
CODE { "XR𡤌MUXBDTWrite" };
XRCMUXFinishBDT: PROC [d: CMUXDescriptor, source: BOOL, sendAbort: BOOL] RETURNS[INT] ~ TRUSTED MACHINE CODE { "XR𡤌MUXFinishBDT" };
XRCMUXServerAdd: PROC [d: CMUXDescriptor, pgm: CARD32, loVersion: CARD32, hiVersion: CARD32] RETURNS[INT] ~ TRUSTED MACHINE CODE { "XR𡤌MUXServerAdd" };
XRCMUXServerQuit: PROC [d: CMUXDescriptor] RETURNS[INT] ~ TRUSTED MACHINE CODE { "XR𡤌MUXServerQuit" };
Handle Objects
BDTState:
TYPE ~ {
ok, -- normal
eof, -- got eof on input
abort, -- got abort from other end
localAbort, -- aborted from this side
error -- got fatal I/O error
};
Phase:
TYPE ~ {
inactive,
msgIn,
msgOut,
bdtIn,
bdtOut
};
DataHandle: TYPE ~ REF DataObject;
DataObject:
TYPE ~
RECORD [
next: DataHandle ¬ NIL,
handle: Handle ¬ NIL,
descriptor: CMUXDescriptor ¬ nullCMUXDescriptor,
lastUsed: Ticks ¬ 0,
buf: REF TEXT ¬ NIL,
inStream: STREAM ¬ NIL,
outStream: STREAM ¬ NIL,
stream: STREAM ¬ NIL,
bdtState: BDTState ¬ ok,
phase: Phase ¬ inactive,
tID: HWORD ¬ [0, 0], -- of current call
remote: REF XNS.Address ¬ NIL,
sendBulkData: CrRPC.BulkDataXferProc ¬ NIL, -- for client handles
recvBulkData: CrRPC.BulkDataXferProc ¬ NIL, -- for client handles
continuation: CrRPC.ContinuationProc ¬ NIL, -- for server handles
continuationClientData: REF -- for server handles
];
theClientProcs:
REF CrRPC.ProcsObject
~
NEW[ CrRPC.ProcsObject ¬ [
destroy~CheckInToCache,
setParameters~SetParameters,
call~Call,
getBulkDataSource~ErrorGetBulkDataSource,
getBulkDataSink~ErrorGetBulkDataSink,
putBulkDataSource~PutBulkDataSource,
putBulkDataSink~PutBulkDataSink,
readBulkDataStream~ReadBulkDataStream,
writeBulkDataSegment~WriteBulkDataSegment,
callContinuation~CallContinuation,
setContinuation~ErrorSetContinuation,
finalize~Finalize
]
];
theServerProcs:
REF CrRPC.ProcsObject
~
NEW[ CrRPC.ProcsObject ¬ [
destroy~CheckInToCache,
setParameters~SetParameters,
call~ErrorCall,
getBulkDataSource~GetBulkDataSource,
getBulkDataSink~GetBulkDataSink,
putBulkDataSource~ErrorPutBulkDataSource,
putBulkDataSink~ErrorPutBulkDataSink,
readBulkDataStream~ReadBulkDataStream,
writeBulkDataSegment~WriteBulkDataSegment,
callContinuation~ErrorCallContinuation,
setContinuation~SetContinuation,
finalize~ErrorFinalize
]
];
Error Reporting
RaiseError:
PROC [h: Handle, reason: CrRPC.ErrorReason, errno:
INT, msg:
ROPE] ~ {
IF errno < 0 THEN errno ¬ -errno;
IF errno > 0 THEN msg ¬ IO.PutFR["%g (errno %g)", IO.rope[msg], IO.int[errno]];
IF reason = unknown THEN reason ¬ other;
IF debugMsgs THEN DebugMsg[IO.PutFR["RaiseError reason %g msg %g\n", IO.int[ORD[reason]], IO.rope[msg]]];
ERROR CrRPC.Error[h, reason, msg];
};
Client Call
ConnectToServer:
PROC [h: Handle] ~ {
Connect to the remote server. If the connect fails, dH.descriptor will be left nullCMUXDescriptor.
ERRORS: CrRPC.Error.
dH: DataHandle ~ NARROW[h.data];
d: CMUXDescriptor;
ans: INT;
IF dH.descriptor # nullCMUXDescriptor THEN ERROR;
d ¬ XRCMUXCreate[];
ans ¬ ErrCodeFromCMUXDescriptor[d];
IF ans # 0 THEN RaiseError[h, communicationFailure, ans, "can't create CMUX descriptor"];
ans ¬ XRCMUXConnect[d, xnsAddr, LOOPHOLE[dH.remote], BYTES[XNS.Address]];
IF ans # 0
THEN {
XRCMUXDestroy[d];
RaiseError[h, cantConnectToRemote, ans, "can't set CMUX descriptor address"];
};
dH.descriptor ¬ d;
};
DisconnectFromServer:
PROC [h: Handle] ~ {
Disconnect from the remote server, setting dH.descriptor to nullCMUXDescriptor.
ERRORS: None.
dH: DataHandle ~ NARROW[h.data];
IF dH.descriptor # nullCMUXDescriptor
THEN {
XRCMUXDestroy[dH.descriptor];
dH.descriptor ¬ nullCMUXDescriptor;
};
};
ErrorCall: CrRPC.CallProc ~ {
RaiseError[h, notClientHandle, 0, "call using server handle"] };
GetMsgAllocater:
-- CmuxAllocaterProc --
PROC [startIndex:
CARD32, nBytes:
CARD32, clientData:
REF]
RETURNS [BytePtr] ~
TRUSTED {
dH: DataHandle ¬ NARROW[clientData];
limit: CARD ¬ startIndex+nBytes;
IF dH.buf =
NIL THEN {
IF startIndex # 0 THEN ERROR; -- should be less extreme ... ajd
dH.buf ¬ RefText.New[limit];
};
IF dH.buf.maxLength < startIndex THEN ERROR; -- should be less extreme ... ajd
IF dH.buf.maxLength < limit
THEN {
newBuf: REF TEXT ¬ RefText.New[limit];
IF startIndex > 0
THEN
TRUSTED {
[] ¬ Basics.ByteBlt[
from ~ [
blockPointer ~ LOOPHOLE[dH.buf],
startIndex ~ BYTES[TEXT[0]],
stopIndexPlusOne ~ BYTES[TEXT[0]] + startIndex
],
to ~ [
blockPointer ~ LOOPHOLE[newBuf],
startIndex ~ BYTES[TEXT[0]],
stopIndexPlusOne ~ BYTES[TEXT[0]] + startIndex
]
];
};
dH.buf ¬ newBuf;
};
RETURN[ LOOPHOLE[dH.buf, BytePtr] + BYTES[TEXT[0]] + startIndex ];
};
BDTGetProc: ProcStream.GetProc
-- UNSAFE PROC [clientData: REF, offset: CARD, block: IO.UnsafeBlock] RETURNS[nBytesRead: INT, atEnd: BOOL] -- ~ {
dH: DataHandle ¬ NARROW[clientData];
IF debugMsgs THEN DebugMsg[IO.PutFR1["BDTGetProc req %g ", IO.int[block.count]]];
SELECT dH.bdtState
FROM
ok =>
TRUSTED {
nBytesRead ¬ XRCMUXBDTRead[dH.descriptor, LOOPHOLE[block.base + block.startIndex], block.count];
IF debugMsgs THEN DebugMsg["calling CMUXBDRTRead, "];
};
eof, abort, localAbort => {
nBytesRead ¬ 0;
IF debugMsgs THEN DebugMsg["already eof/abort, "];
};
error => {
nBytesRead ¬ -EIO;
IF debugMsgs THEN DebugMsg["already error, "];
};
ENDCASE => ERROR;
atEnd ¬ TRUE;
SELECT nBytesRead
FROM
block.count => atEnd ¬ FALSE;
(-EBADMSG) => { dH.bdtState ¬ abort; nBytesRead ¬ 0 };
< 0 => dH.bdtState ¬ error;
< block.count => IF dH.bdtState = ok THEN dH.bdtState ¬ eof;
ENDCASE => ERROR;
IF debugMsgs THEN DebugMsg[IO.PutFR["BDTGetProc ret bytes %g atEnd %g\n", IO.int[nBytesRead], IO.bool[atEnd]]];
};
BDTPutProc: ProcStream.PutProc
-- PROC [clientData: REF, offset: CARD, block: IO.UnsafeBlock] RETURNS[nBytesWritten: INT] -- ~ {
dH: DataHandle ¬ NARROW[clientData];
SELECT dH.bdtState
FROM
ok =>
TRUSTED {
nBytesWritten ¬ XRCMUXBDTWrite[dH.descriptor, LOOPHOLE[block.base + block.startIndex], block.count];
};
abort, localAbort => {
nBytesWritten ¬ block.count;
};
error => {
nBytesWritten ¬ -EIO;
};
ENDCASE => ERROR;
SELECT nBytesWritten
FROM
(-ERANGE) => { dH.bdtState ¬ abort; nBytesWritten ¬ block.count };
< 0 => dH.bdtState ¬ error;
ENDCASE => NULL;
};
Call: CrRPC.CallProc
-- [h: Handle, remotePgm: CARD32, remotePgmVersion: CARD16, remoteProc: CARD16, putArgs: PutArgsProc, getResults: GetResultsProc, getError: GetErrorProc] -- ~ {
dH: DataHandle ~
NARROW[h.data];
BDTCheckAbort: CrRPC.BulkDataCheckAbortProc ~ {
RETURN[ dH.bdtState = abort ];
};
Logic of Courier call begins here:
BEGIN
ENABLE {
IO.Error
-- [ec, stream] -- => {
msg: ROPE ¬ IO.PutFR1[ "I/O Error, ec %g", IO.card[ORD[ec]] ];
SELECT dH.phase
FROM
bdtIn, bdtOut => {
tCode: INT; tMsg: ROPE;
[code~tCode, msg~tMsg] ¬ ProcStream.GetErrorDetails[stream];
msg ¬ IO.PutFR["%g (BDT error %g: %g)", IO.rope[msg], IO.int[ABS[tCode]], IO.rope[tMsg]];
};
ENDCASE;
RaiseError[h, communicationFailure, 0, msg];
};
IO.EndOfStream
-- [stream] -- => {
msg:
ROPE ¬ (
SELECT dH.phase
FROM
msgOut => "I/O EndOfStream, args send (!)",
msgIn => "I/O EndOfStream, results recv",
bdtIn => "I/O EndOfStream, BDT read",
bdtOut => "I/O EndOfStream, BDT write (!)",
ENDCASE => "I/O EndOfStream, inactive (!)" );
RaiseError[h, remoteClose, 0, msg];
};
RuntimeError.BoundsFault => {
RaiseError[h, protocolError, 0, "data bounds fault"];
};
};
IF dH.descriptor = nullCMUXDescriptor THEN ConnectToServer[h];
dH.sendBulkData ¬ NIL;
dH.recvBulkData ¬ NIL;
Format call message ...
dH.phase ¬ msgOut;
dH.outStream ¬ IO.TOS[dH.buf, dH.outStream]; dH.buf ¬ NIL;
IF putArgs # NIL THEN putArgs[h, dH.outStream];
IO.Close[dH.outStream]; ???
dH.buf ¬ IO.TextFromTOS[dH.outStream];
BEGIN
ENABLE UNWIND => DisconnectFromServer[h];
Send call message ...
TRUSTED {
callMsgHdr: CrRPCBackdoor.CallMsgHdr ¬ [
msgHdr~[msgType~CrRPCBackdoor.callMsgType],
callHdr~[
tID~[0, 0],
pgmNum~Basics.FFromCard32[remotePgm],
pgmVersion~Basics.HFromCard16[remotePgmVersion],
procNum~Basics.HFromCard16[remoteProc]
]
];
ans: INT;
callMsgHdr.callHdr.tID ← dH.tID ← ???; (unused).
IF debugMsgs THEN DebugMsg[IO.PutFR["call pgm %g proc %g args %g bytes\n", IO.card[remotePgm], IO.card[remoteProc], IO.card[dH.buf.length]]];
ans ¬ XRCMUXWriteMsg[dH.descriptor,
LOOPHOLE[@callMsgHdr], CrRPCBackdoor.callMsgHdrBytes,
LOOPHOLE[dH.buf, BytePtr] + BYTES[TEXT[0]], dH.buf.length];
IF ans < 0 THEN RaiseError[h, communicationFailure, ans, "can't send call msg"];
};
Do BDT transfer ...
BEGIN
bdtProc: CrRPC.BulkDataXferProc ¬ NIL;
localAbort: BOOL;
ans: INT;
SELECT
TRUE
FROM
(dH.sendBulkData #
NIL) => {
need TRUSTED here now for Cedar10.0
TRUSTED { bdtProc ¬ dH.sendBulkData }; dH.phase ¬ bdtIn;
};
(dH.recvBulkData #
NIL) => {
need TRUSTED here now for Cedar10.0
TRUSTED { bdtProc ¬ dH.recvBulkData }; dH.phase ¬ bdtOut;
};
ENDCASE;
IF bdtProc #
NIL
THEN {
dH.bdtState ¬ ok;
dH.stream ¬ ProcStream.PIOS[dH, BDTGetProc, BDTPutProc, NIL, NIL, dH.stream, FALSE, FALSE];
IF debugMsgs THEN DebugMsg[IO.PutFR["BDT xfer stream %x (%g)\n", IO.int[LOOPHOLE[dH.stream]], IO.rope[IF dH.sendBulkData # NIL THEN "source" ELSE "sink"]]];
localAbort ¬ bdtProc[h, dH.stream, BDTCheckAbort];
IF (dH.sendBulkData # NIL) AND (NOT localAbort)
THEN IO.Flush[dH.stream];
ans ¬ XRCMUXFinishBDT[dH.descriptor, TRUE, localAbort];
IF ans < 0 THEN RaiseError[h, bulkDataError, ans, "bulk data transfer failed"];
IF debugMsgs THEN DebugMsg["BDT xfer done"];
IF dH.recvBulkData #
NIL
THEN {
charsRead, charsUnread: INT;
charsRead ¬ IO.GetIndex[dH.stream];
IF debugMsgs THEN DebugMsg[IO.PutFR1[" (%g bytes read, ", IO.int[charsRead]]];
charsUnread ¬ IO.CharsAvail[dH.stream, FALSE];
IF charsUnread =
INT.
LAST
THEN { IF debugMsgs THEN DebugMsg["end of stream)"] }
ELSE { IF debugMsgs THEN DebugMsg[IO.PutFR1["%g unread", IO.int[charsUnread]]] };
};
IF debugMsgs THEN DebugMsg["\n"];
};
END;
Read reply message ...
TRUSTED {
ans: INT;
dH.phase ¬ msgIn;
ans ¬ XRCMUXReadMsg[dH.descriptor, TRUE, waitForever, XRCProcFromMProc[LOOPHOLE[GetMsgAllocater]], dH];
IF ans < 0 THEN RaiseError[h, communicationFailure, ans, "can't read reply msg"];
IF debugMsgs THEN DebugMsg[IO.PutFR["reply pgm %g proc %g reply %g bytes\n", IO.card[remotePgm], IO.card[remoteProc], IO.int[ans]]];
dH.buf.length ¬ ans;
};
Interpret reply message ...
BEGIN
msgType: HWORD;
dH.inStream ¬ IO.TIS[dH.buf]; dH.buf ¬ NIL;
msgType ¬ CrRPC.GetHWord[dH.inStream];
SELECT msgType
FROM
CrRPCBackdoor.rejectMsgType => {
rejectReason: CARDINAL;
[] ¬ CrRPC.GetHWord[dH.inStream];
rejectReason ¬ CrRPC.GetCard16[dH.inStream];
RaiseError[h,
(
SELECT rejectReason
FROM
CrRPC.noSuchProgram => rejectedNoSuchProgram,
CrRPC.noSuchVersion => rejectedNoSuchVersion,
CrRPC.noSuchProcedure => rejectedNoSuchProcedure,
CrRPC.invalidArgument => rejectedInvalidArgument,
ENDCASE => rejectedUnspecified),
0,
"rejected"];
};
CrRPCBackdoor.returnMsgType => {
[] ¬ CrRPC.GetHWord[dH.inStream];
IF getResults # NIL THEN getResults[h, dH.inStream];
};
CrRPCBackdoor.abortMsgType => {
errNum: CARDINAL;
[] ¬ CrRPC.GetHWord[dH.inStream];
errNum ¬ CrRPC.GetCard16[dH.inStream];
IF getError # NIL THEN getError[h, dH.inStream, errNum];
RaiseError[h, remoteError, 0, "unexpected remote error"];
};
ENDCASE => {
DisconnectFromServer[h];
RaiseError[h, protocolError, 0, IO.PutFR1["bad msgType (%g) in response", IO.card[Basics.Card16FromH[msgType]]]];
};
END;
dH.phase ¬ inactive;
}; -- of Call
Server Process
RefreshServerCMUXDescriptor:
PROC [h: Handle, pgm:
CARD32, pgmVersion:
CARD16] ~ {
ans: INT;
dH: DataHandle;
d: CMUXDescriptor;
dH ¬ NARROW[h.data];
IF dH.descriptor # nullCMUXDescriptor THEN RETURN;
d ¬ XRCMUXCreate[];
ans ¬ ErrCodeFromCMUXDescriptor[d];
IF ans # 0 THEN RaiseError[h, communicationFailure, ans, "can't create CMUX descriptor"];
ans ¬ XRCMUXServerAdd[d, pgm, CARD32[pgmVersion], CARD32[pgmVersion]];
IF ans # 0
THEN {
XRCMUXDestroy[d];
RaiseError[h, communicationFailure, ans, "can't add service to CMUX"];
};
dH.descriptor ¬ d;
};
SmashServerCMUXDescriptor:
PROC [h: Handle] ~ {
dH: DataHandle;
d: CMUXDescriptor;
dH ¬ NARROW[h.data];
IF (d ¬ dH.descriptor) = nullCMUXDescriptor THEN RETURN;
[] ¬ XRCMUXServerQuit[d];
XRCMUXDestroy[d];
dH.descriptor ¬ nullCMUXDescriptor;
};
CleanupServerBDT:
PROC [h: Handle, dH: DataHandle] ~ {
ans: INT ¬ 0;
SELECT dH.phase
FROM
bdtIn => {
ans ¬ XRCMUXFinishBDT[dH.descriptor, FALSE, (dH.bdtState = localAbort)];
};
bdtOut => {
IF dH.bdtState = ok THEN IO.Flush[dH.stream];
ans ¬ XRCMUXFinishBDT[dH.descriptor, TRUE, (dH.bdtState = localAbort)];
};
ENDCASE;
IF ans < 0 THEN RaiseError[h, bulkDataError, ans, "bulk data transfer failed"];
dH.phase ¬ msgOut;
};
BeginReturn: CrRPC.BeginReturnProc
-- [h: Handle] -- ~ {
dH: DataHandle ¬ NARROW[h.data];
s: STREAM;
CleanupServerBDT[h, dH];
s ¬ dH.stream ¬ IO.TOS[dH.buf, dH.stream]; dH.buf ¬ NIL;
CrRPC.PutHWord[s, CrRPCBackdoor.returnMsgType];
CrRPC.PutHWord[s, dH.tID];
};
BeginError: CrRPC.BeginErrorProc
-- [h: Handle, errNum: CARDINAL] -- ~ {
dH: DataHandle ¬ NARROW[h.data];
s: STREAM;
CleanupServerBDT[h, dH];
s ¬ dH.stream ¬ IO.TOS[dH.buf, dH.stream]; dH.buf ¬ NIL;
CrRPC.PutHWord[s, CrRPCBackdoor.abortMsgType];
CrRPC.PutHWord[s, dH.tID];
CrRPC.PutCard16[s, errNum];
};
BeginReject: CrRPC.BeginRejectProc
-- [h: Handle, rejectReason: CARDINAL] -- ~ {
dH: DataHandle ¬ NARROW[h.data];
s: STREAM;
CleanupServerBDT[h, dH];
s ¬ dH.stream ¬ IO.TOS[dH.buf, dH.stream]; dH.buf ¬ NIL;
CrRPC.PutHWord[s, CrRPCBackdoor.rejectMsgType];
CrRPC.PutHWord[s, dH.tID];
CrRPC.PutCard16[s, rejectReason];
};
ServerDaemon:
PROC [h: Handle, pgm:
CARD32, pgmVersion:
CARD16, serverProc: CrRPC.ServerProc, stopServerQueryProc: CrRPC.StopServerQueryProc] ~ {
dH: DataHandle ¬ NARROW[h.data];
pgmF: FWORD ¬ Basics.FFromCard32[pgm];
pgmVersionH: HWORD ¬ Basics.HFromCard16[pgmVersion];
DO
BEGIN
ENABLE {
IO.Error
-- [ec, stream] -- => {
IF debugMsgs
THEN {
DebugMsg[IO.PutFR1[ "I/O Error, ec %g, ", IO.card[ORD[ec]] ]];
DebugMsg[IO.PutFR1[ "phase %g\n", IO.card[ORD[dH.phase]] ]];
};
CONTINUE;
};
IO.EndOfStream
-- [stream] -- => {
IF debugMsgs
THEN {
DebugMsg[ "I/O end of stream\n" ];
};
CONTINUE;
};
CrRPC.Error
-- [h, errorReason, text] -- => {
IF debugMsgs
THEN {
DebugMsg[IO.PutFR[ "CrRPC.Error %g (%g)\n", IO.card[ORD[errorReason]], IO.rope[text] ]];
};
CONTINUE;
};
};
DO
procNum: CARD16;
IF dH.descriptor = nullCMUXDescriptor THEN EXIT;
IF stopServerQueryProc #
NIL
THEN
IF stopServerQueryProc[h]
THEN GOTO Done;
dH.phase ¬ msgIn;
At this point we have a handle with a valid descriptor in it.
Read call message ...
TRUSTED
BEGIN
ans: INT;
ans ¬ XRCMUXReadMsg[dH.descriptor, TRUE, serverGetTimeoutMsec, XRCProcFromMProc[LOOPHOLE[GetMsgAllocater]], dH];
IF ans <= 0
THEN {
IF ans = (-ETIMEDOUT) THEN LOOP;
IF debugMsgs THEN DebugMsg[IO.PutFR1[ "CMUXReadMsg error %g\n", IO.int[ans] ]];
EXIT;
};
IF debugMsgs THEN DebugMsg[IO.PutFR["got call, pgm %g, %g bytes\n", IO.card[pgm], IO.int[ans]]];
dH.buf.length ¬ ans;
END;
Interpret call message ...
BEGIN
msgType: HWORD;
s: STREAM;
s ¬ dH.stream ¬ IO.TIS[dH.buf, dH.stream]; dH.buf ¬ NIL;
msgType ¬ CrRPC.GetHWord[dH.stream];
SELECT msgType
FROM
CrRPCBackdoor.callMsgType => {
dH.tID ¬ CrRPC.GetHWord[s];
IF CrRPC.GetFWord[s] # pgmF
THEN RaiseError[h, other, 0, "CMUX server pgm # botch"];
IF CrRPC.GetHWord[s] # pgmVersionH
THEN RaiseError[h, other, 0, "CMUX server version # botch"];
procNum ¬ CrRPC.GetCard16[s]
};
ENDCASE => {
IF debugMsgs THEN DebugMsg[IO.PutFR1["CMUX bad msg %g\n", IO.card[Basics.Card16FromH[msgType]]]];
RaiseError[h, protocolError, 0, "CMUX delivered non-call msg"];
};
END;
Call server proc ...
BEGIN
serverProc[h, dH.stream, pgm, pgmVersion, procNum, BeginReturn, BeginError, BeginReject];
END;
Send the return/reject/error message ...
BEGIN
ans: INT;
b: REF TEXT;
IF dH.phase # msgOut
THEN {
IF debugMsgs THEN DebugMsg[IO.PutFR1["bad phase after server proc %g\n", IO.card[ORD[dH.phase]]]];
};
b ¬ dH.buf ¬ IO.TextFromTOS[dH.stream];
IF debugMsgs THEN DebugMsg[IO.PutFR["sending reply, pgm %g proc %g len %g\n", IO.card[pgm], IO.card[procNum], IO.card[b.length]]];
ans ¬ XRCMUXWriteMsg[dH.descriptor,
LOOPHOLE[b, BytePtr] + BYTES[TEXT[0]], b.length,
NIL, 0];
IF ans < 0 THEN RaiseError[h, communicationFailure, ans, "can't send reply msg"];
END;
dH.phase ¬ inactive;
ENDLOOP;
SmashServerCMUXDescriptor[h];
IF debugMsgs THEN DebugMsg["CMUX server pause\n"];
Process.PauseMsec[serverPauseMsec];
RefreshServerCMUXDescriptor[h, pgm, pgmVersion ! CrRPC.Error => CONTINUE ];
ENDLOOP;
SmashServerCMUXDescriptor[h];
};
StartServerInstance: CrRPCBackdoor.StartServerInstanceProc
[pgm: CARD32, pgmVersion: CARD16, local: CrRPC.RefAddress]
~ {
h: Handle;
dH: DataHandle;
serverProc: CrRPC.ServerProc;
stopServerQueryProc: CrRPC.StopServerQueryProc;
[serverProc, stopServerQueryProc] ¬ CrRPCBackdoor.LookUpServerProcs[pgm, pgmVersion];
IF serverProc = NIL THEN RaiseError[NIL, rejectedNoSuchProgram, 0, "no server proc for remote program"];
dH ¬ NEW[DataObject ¬ []];
h ¬
NEW[CrRPC.Object ¬ [
class~myHandleClass,
kind~server,
procs~theServerProcs,
data~dH,
clientData~NIL
]];
dH.handle ¬ h;
RefreshServerCMUXDescriptor[h, pgm, pgmVersion];
TRUSTED {
Process.Detach[ FORK ServerDaemon[h, pgm, pgmVersion, serverProc, stopServerQueryProc] ];
};
};
I/O and Marshalling
ErrorPutBulkDataSource: CrRPC.PutBulkDataXferProcProc
-- [h: Handle, s: STREAM, proc: BulkDataXferProc]-- ~ {
RaiseError[h, notClientHandle, 0, "putBDTSource on server handle"];
};
PutBulkDataSource: CrRPC.PutBulkDataXferProcProc
-- [h: Handle, s: STREAM, proc: BulkDataXferProc] -- ~ {
dH: DataHandle ~ NARROW[h.data];
IF proc =
NIL
THEN {
CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.null]] }
ELSE {
CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate]];
TRUSTED { dH.sendBulkData ¬ proc }};
};
ErrorPutBulkDataSink: CrRPC.PutBulkDataXferProcProc
-- [h: Handle, s: STREAM, proc: BulkDataXferProc] -- ~ {
RaiseError[h, notClientHandle, 0, "putBDTSink on server handle"];
};
PutBulkDataSink: CrRPC.PutBulkDataXferProcProc
-- [h: Handle, s: STREAM, proc: BulkDataXferProc] -- ~ {
dH: DataHandle ~ NARROW[h.data];
IF proc =
NIL
THEN {
CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.null]] }
ELSE {
CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate]];
TRUSTED { dH.recvBulkData ¬ proc }};
};
PutBulkDataSinkBuggy: PUBLIC CrRPC.PutBulkDataXferProcProc
-- [h: Handle, s: STREAM, proc: BulkDataXferProc] -- ~ {
This is a workaround for a bug in the AdobeServer.
AdobeCourierServerImplA.GetSystemDefaultUserFile expects a
bulk data transfer proc, but doesn't deserriallize the bulk data transfer tag.
dH: DataHandle ~ NARROW[h.data];
TRUSTED { dH.recvBulkData ¬ proc };
};
ErrorGetBulkDataSource:
PROC [h: Handle, s:
STREAM]
RETURNS[bulkDataSource: CrRPC.BulkDataSource] ~ {
RaiseError[h, notServerHandle, 0, "GetBDTSource on client handle"];
};
GetBulkDataSource:
PROC [h: Handle, s:
STREAM]
RETURNS[bulkDataSource: CrRPC.BulkDataSource] ~ {
dH: DataHandle ~ NARROW[h.data];
typeCode: CARDINAL;
typeCode ¬ CrRPC.GetCard16[s];
SELECT typeCode
FROM
ORD[CrRPCBackdoor.BulkDataDescriptorType.null] => {
bulkDataSource ¬ NIL };
ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate] =>
TRUSTED {
bulkDataSource ¬ ServerRecvBulkData };
ORD[CrRPCBackdoor.BulkDataDescriptorType.active],
ORD[CrRPCBackdoor.BulkDataDescriptorType.passive] => {
RaiseError[h, notImplemented, 0, "no third party bulk data"] };
ENDCASE => {
RaiseError[h, protocolError, 0, "bad bulkDataSource type"] };
};
ServerRecvBulkData: CrRPC.BulkDataXferProc
-- [h: Handle, s: IO.STREAM, checkAbort: BulkDataCheckAbortProc] RETURNS [abort: BOOL] -- ~ {
dH: DataHandle ~ NARROW[h.data];
bufSize: NAT ~ 2048;
b: REF TEXT ~ RefText.ObtainScratch[bufSize];
nReadSoFar: CARD ¬ 0;
dH.phase ¬ bdtIn; dH.bdtState ¬ ok;
DO
nBytes: INT;
atEnd: BOOL;
TRUSTED {
[nBytes, atEnd] ¬ BDTGetProc[dH, nReadSoFar, [base~LOOPHOLE[b], startIndex~BYTES[TEXT[0]], count~bufSize]];
};
IF nBytes < 0 THEN RaiseError[h, communicationFailure, -nBytes, "BDT read failure"];
IF (nBytes > 0)
THEN {
nReadSoFar ¬ nReadSoFar + nBytes;
IO.UnsafePutBlock[s, [base~LOOPHOLE[b], startIndex~BYTES[TEXT[0]], count~nBytes]];
};
IF atEnd THEN EXIT;
IF checkAbort[h].abort
THEN {
dH.bdtState ¬ localAbort;
EXIT;
};
ENDLOOP;
RefText.ReleaseScratch[b];
RETURN [dH.bdtState = abort];
};
ErrorGetBulkDataSink:
PROC [h: Handle, s:
STREAM]
RETURNS[bulkDataSink: CrRPC.BulkDataSink] ~ {
RaiseError[h, notServerHandle, 0, "GetBDTSink on client handle"];
};
GetBulkDataSink:
PROC [h: Handle, s:
STREAM]
RETURNS[bulkDataSink: CrRPC.BulkDataSink] ~ {
dH: DataHandle ~ NARROW[h.data];
typeCode: CARDINAL;
typeCode ¬ CrRPC.GetCard16[s];
SELECT typeCode
FROM
ORD[CrRPCBackdoor.BulkDataDescriptorType.null] => {
bulkDataSink ¬ NIL };
ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate] =>
TRUSTED {
bulkDataSink ¬ ServerSendBulkData };
ORD[CrRPCBackdoor.BulkDataDescriptorType.active],
ORD[CrRPCBackdoor.BulkDataDescriptorType.passive] => {
RaiseError[h, notImplemented, 0, "no third party bulk data"] };
ENDCASE => {
RaiseError[h, protocolError, 0, "bad bulkDataSink type"] };
};
ServerSendBulkData: CrRPC.BulkDataXferProc
-- [h: Handle, s: IO.STREAM, checkAbort: BulkDataCheckAbortProc] RETURNS [abort: BOOL] -- ~ {
dH: DataHandle ~ NARROW[h.data];
bufSize: NAT ~ 1024;
b: REF TEXT ~ RefText.ObtainScratch[bufSize];
nWrittenSoFar: CARD ¬ 0;
dH.phase ¬ bdtOut; dH.bdtState ¬ ok;
DO
nRead, nWritten: INT;
nRead ¬ IO.GetBlock[s, b];
IF nRead <= 0 THEN EXIT;
nWritten ¬ BDTPutProc[dH, nWrittenSoFar, [base~LOOPHOLE[b], startIndex~BYTES[TEXT[0]], count~nRead]];
IF dH.bdtState # ok THEN EXIT;
IF checkAbort[h].abort
THEN {
dH.bdtState ¬ localAbort;
EXIT;
};
ENDLOOP;
RefText.ReleaseScratch[b];
RETURN[ dH.bdtState = abort ];
};
Bulk Data Streams
nextSegment: CARD16 ~ 0;
lastSegment: CARD16 ~ 1;
ReadBulkDataStream: CrRPC.ReadBulkDataStreamProc
[h: Handle, s: STREAM, checkAbort: BulkDataCheckAbortProc, readValue: BulkDataValueProc] RETURNS [abort: BOOL]
~ {
segmentKind: CARD16;
len: CARD16;
DO
IF checkAbort[h] THEN RETURN[FALSE];
segmentKind ¬ CrRPC.GetCard16[s];
len ¬ CrRPC.GetCard16[s];
THROUGH [1..len]
DO
IF checkAbort[h] THEN RETURN[FALSE];
IF readValue[s].abort THEN RETURN[TRUE];
ENDLOOP;
IF segmentKind = lastSegment THEN RETURN[FALSE];
ENDLOOP;
};
WriteBulkDataSegment: CrRPC.WriteBulkDataSegmentProc
[h: Handle, s: STREAM, checkAbort: BulkDataCheckAbortProc, writeValue: BulkDataValueProc, n: CARDINAL] RETURNS [abort: BOOL, heAborted: BOOL]
~ {
CrRPC.PutCard16[s, (IF n > 0 THEN nextSegment ELSE lastSegment)];
CrRPC.PutCard16[s, n];
abort ¬ heAborted ¬ FALSE;
THROUGH [1..n]
DO
IF checkAbort[h] THEN { heAborted ¬ TRUE; RETURN };
IF writeValue[s].abort THEN { abort ¬ TRUE; RETURN };
ENDLOOP;
};
Continuations
ErrorCallContinuation: CrRPC.CallContinuationProc ~ {
RaiseError[h, notClientHandle, 0, "CallContinuation on server handle"] };
CallContinuation: CrRPC.CallContinuationProc
[h: Handle, proc: ContinuationProc, clientData: REF]
~ {
dH: DataHandle ~ NARROW[h.data];
RaiseError[h, notImplemented, 0, "continuations not implemented"];
};
ErrorSetContinuation: CrRPC.SetContinuationProc ~ {
RaiseError[h, notServerHandle, 0, "SetContinuation on client handle"] };
SetContinuation: CrRPC.SetContinuationProc
[h: Handle, proc: ContinuationProc, clientData: REF]
~ {
dH: DataHandle ~ NARROW[h.data];
RaiseError[h, notImplemented, 0, "continuations not implemented"];
TRUSTED { dH.continuation ¬ proc };
dH.continuationClientData ¬ clientData };
Cache of Handles
We maintain a cache of recently used client handles with their transport intact. If a handle isn't used for awhile, it times out, the transport is closed and the handle destroyed.
cache: DataHandle ¬ NIL;
cacheTimeoutTicks: Ticks ¬ Process.SecondsToTicks[8];
cacheSweepIntervalTicks: Ticks ¬ Process.SecondsToTicks[8];
Now: PROC RETURNS [Ticks] ~ TRUSTED MACHINE CODE { "XR←TicksSinceBoot" };
TicksSince: PROC [then: Ticks] RETURNS [Ticks] ~ { RETURN [ Now[] - then ] };
CheckOutFromCache:
ENTRY
PROC [remote:
XNS.Address]
RETURNS [h: Handle ¬
NIL] ~ {
dH: DataHandle ¬ cache;
prevH: DataHandle ¬ NIL;
WHILE dH #
NIL
DO
IF (dH.remote) = remote
THEN {
IF prevH = NIL THEN cache ¬ dH.next ELSE prevH.next ¬ dH.next;
h ¬ dH.handle;
dH.handle ¬ NIL; -- for finalization
dH.lastUsed ¬ Now[];
EXIT };
prevH ¬ dH; dH ¬ dH.next;
ENDLOOP;
};
CheckInToCache:
ENTRY
PROC [h: Handle] ~ {
dH: DataHandle ¬ NARROW[h.data];
IF debugMsgs THEN DebugMsg[IO.PutFR1["CMUXCheckIn d=%g\n", IO.int[dH.descriptor]]];
dH.handle ¬ h;
dH.lastUsed ¬ Now[];
dH.next ¬ cache;
cache ¬ dH };
CacheSweeper:
PROC ~ {
GetDeadHandles:
ENTRY
PROC
RETURNS [deadList: DataHandle ¬
NIL] ~ {
dH, newCache, newCacheTail: DataHandle ¬ NIL;
WHILE cache #
NIL
DO
dH ¬ cache; cache ¬ cache.next;
IF TicksSince[dH.lastUsed] <= cacheTimeoutTicks
THEN
-- add to new cache -- {
dH.next ¬ NIL;
IF newCacheTail = NIL THEN newCache ¬ dH ELSE newCacheTail.next ¬ dH;
newCacheTail ¬ dH }
ELSE
-- add to dead list -- {
dH.next ¬ deadList;
deadList ¬ dH };
ENDLOOP;
cache ¬ newCache;
};
DestroyDeadHandles:
PROC [deadList: DataHandle] ~ {
dH: DataHandle;
IF deadList #
NIL
THEN {
IF debugMsgs THEN DebugMsg[IO.PutFR1["CMUXCacheSweeper %g\n", IO.card[LOOPHOLE[deadList]]]];
};
WHILE deadList #
NIL
DO
dH ¬ deadList; deadList ¬ dH.next;
BuryDeadHandle[dH.handle];
ENDLOOP;
};
DO
DestroyDeadHandles[ GetDeadHandles[] ];
Process.Pause[ cacheSweepIntervalTicks ];
ENDLOOP;
};
BuryDeadHandle:
PROC [h: Handle] ~ {
dH: DataHandle ~ NARROW[h.data];
dH.handle ¬ NIL; -- break cycle for GC
dH.stream ¬ NIL; -- break cycle with ProcStream
IF debugMsgs THEN DebugMsg[IO.PutFR1["BuryDeahHandle d=%g\n", IO.int[dH.descriptor]]];
DisconnectFromServer[h];
h.data ¬ NIL;
};
Handle Creation and Destruction
ErrorFinalize: PROC [h: Handle] RETURNS [renewHandle: BOOL] ~ { ERROR };
Finalize:
PROC [h: Handle]
RETURNS [renewHandle:
BOOL ¬
FALSE] ~ {
A client has dropped the handle h, which is guaranteed not to be in the cache.
dH: DataHandle ~ NARROW[h.data];
IF dH = NIL THEN -- handle is already dead -- RETURN;
IF debugMsgs THEN DebugMsg[IO.PutFR1["Finalizing d=%g\n", IO.int[dH.descriptor]]];
IF TicksSince[dH.lastUsed] <= cacheTimeoutTicks
THEN {
CheckInToCache[h];
RETURN [TRUE]; };
BuryDeadHandle[h];
RETURN [FALSE];
};
CreateClientHandle: CrRPCBackdoor.CreateClientHandleProc
[remote: CrRPC.RefAddress] RETURNS [Handle]
~ {
handle: Handle;
dH: DataHandle;
addr: REF XNS.Address;
IF remote =
NIL
THEN
RaiseError[NIL, addressInappropriateForClass, 0, "missing remote address"];
WITH remote
SELECT
FROM
rA: REF XNS.Address => addr ¬ NEW[XNS.Address ¬ (rA)];
ENDCASE => RaiseError[NIL, addressInappropriateForClass, 0, "not XNS address"];
IF ( alwaysSmashTheSocket ) OR ( addr.socket = XNS.unknownSocket )
THEN addr.socket ¬ XNSWKS.courier;
handle ¬ CheckOutFromCache[addr];
IF handle =
NIL
THEN {
dH ¬ NEW[DataObject ¬ [lastUsed~Now[], remote~addr]];
handle ¬ CrRPCBackdoor.NewClientObject[class~$CMUX, procs~theClientProcs, data~dH];
};
RETURN [handle];
};
CrRPCBackdoor.RegisterCreateClientHandleProc[myHandleClass, CreateClientHandle];
CrRPCBackdoor.RegisterStartServerInstanceProc[myHandleClass, StartServerInstance];
}.