DIRECTORY
Basics USING [Card32FromF, Card16FromH, FFromCard32, FWORD, HFromCard16, HWORD],
BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses],
CrRPC USING [BeginErrorProc, BeginRejectProc, BeginReturnProc, BulkDataCheckAbortProc, BulkDataSink, BulkDataSource, BulkDataValueProc, BulkDataXferProc, CallContinuationProc, CallProc, ContinuationProc, Error, ErrorReason, GetBulkDataXferProcProc, GetCard16, GetHWord, Handle, invalidArgument, noSuchProcedure, noSuchProgram, noSuchVersion, Object, ProcsObject, PutBulkDataXferProcProc, PutCard16, ReadBulkDataStreamProc, ServerProc, SetContinuationProc, SetParametersProc, WriteBulkDataSegmentProc],
CrRPCBackdoor USING [AbortHdr, AbortMsgHdr, abortMsgHdrBytes, abortMsgType, BulkDataDescriptorType, CallHdr, CallMsgHdr, callMsgHdrBytes, callMsgType, courierVersionNum, CreateClientHandleProc, CreateListenerProc, LookUpServerProc, MsgHdr, --msgHdrBytes,-- NewClientObject, RegisterCreateClientHandleProc, RegisterCreateListenerProc, RejectHdr, RejectMsgHdr, rejectMsgHdrBytes, rejectMsgType, RenewClientObject, ReturnHdr, ReturnMsgHdr, returnMsgHdrBytes, returnMsgType, SessionHdr, sessionHdrBytes],
IO USING [CharsAvail, Close, EndOf, EndOfStream, Error, GetBlock, PutBlock, PutFR1, STREAM, UnsafeBlock, UnsafeGetBlock, UnsafePutBlock],
Process USING [Detach, Pause, SecondsToTicks, Ticks],
RefText USING [ObtainScratch, ReleaseScratch],
Rope USING [ROPE],
RuntimeError USING [BoundsFault],
XNS USING [Address, Socket, unknownSocket],
XNSSPPTypes USING [bulkDataSST, defaultSST, endReplySST, endSST, SubSequenceType],
XNSStream USING [ConnectionClosed, Create, CreateListener, FlushInput, GetStatus, Listener, ListenerProc, SendAttention, SendClose, SendCloseReply, SendEndOfMessage, SendNow, SetSSType, State, SubSequenceType, Timeout, waitForever],
XNSWKS USING [courier];
CrRPCSPPImpl:
CEDAR
MONITOR
IMPORTS Basics, BasicTime, CrRPC, CrRPCBackdoor, IO, Process, RefText, RuntimeError, XNSStream
~ {
-- Copied Types and Constants
-- Short local names for types imported from Basics and IO.
FWORD: TYPE ~ Basics.FWORD; -- converted to and from CARD32 by FfC / CfF
HWORD: TYPE ~ Basics.HWORD; -- converted to and from CARD16 by HfC / CfH
STREAM: TYPE ~ IO.STREAM;
-- FfC: CARD32 to FWORD, a thin inline wrapper over Basics.FFromCard32.
FfC: PROC [c: CARD32] RETURNS [FWORD] ~ INLINE {
  RETURN [Basics.FFromCard32[c]];
  };
-- HfC: CARD16 to HWORD, a thin inline wrapper over Basics.HFromCard16.
HfC: PROC [c: CARD16] RETURNS [HWORD] ~ INLINE {
  RETURN [Basics.HFromCard16[c]];
  };
-- CfF: FWORD to CARD32, a thin inline wrapper over Basics.Card32FromF.
CfF: PROC [f: FWORD] RETURNS [CARD32] ~ INLINE {
  RETURN [Basics.Card32FromF[f]];
  };
-- CfH: HWORD to CARD16, a thin inline wrapper over Basics.Card16FromH.
CfH: PROC [h: HWORD] RETURNS [CARD16] ~ INLINE {
  RETURN [Basics.Card16FromH[h]];
  };
-- Local short name for the CrRPC handle type used by every operation in this module.
Handle: TYPE ~ CrRPC.Handle;
-- CharsAvail Hack
-- The "official" semantics of IO.CharsAvail is "the number of times IO.GetChar could be called without blocking", whence IO.CharsAvail must return a nonzero value at end of stream. This code was written assuming CharsAvail is 0 at end of stream. This ought to be fixed, but it's a bit subtle.
-- MyCharsAvail: IO.CharsAvail with the result forced to 0 when the stream is at end of stream.
-- self: stream to query; wait: passed straight through to IO.CharsAvail.
-- IO.EndOf is consulted only when IO.CharsAvail returned a nonzero count.
MyCharsAvail: PROC [self: STREAM, wait: BOOL ¬ FALSE] RETURNS [n: INT] ~ INLINE {
  n ¬ IO.CharsAvail[self, wait];
  IF n = 0 THEN RETURN;
  IF IO.EndOf[self] THEN n ¬ 0;
  };
-- Handle Objects
-- DataObject is the SPP-specific state stored in the data field of a CrRPC handle.
DataHandle: TYPE ~ REF DataObject;
DataObject:
TYPE ~
RECORD [
-- link used to chain objects on the handle cache and on the sweeper's dead list
next: DataHandle ¬ NIL,
-- back pointer to the owning handle; set at check-in, cleared at check-out and burial
handle: Handle ¬ NIL,
-- clock pulses recorded at creation, check-in and check-out; drives cache timeout
lastUsed: BasicTime.Pulses,
-- transport stream, or NIL when no connection is currently open
stream: IO.STREAM ¬ NIL,
-- when TRUE, AdvanceInput flushes unread input before looking at stream status
inputFlushNeeded: BOOL ¬ FALSE,
-- address of the remote site; also the key compared by CheckOutFromCache
remote: XNS.Address,
sendBulkData: CrRPC.BulkDataXferProc ¬ NIL, -- for client handles
recvBulkData: CrRPC.BulkDataXferProc ¬ NIL, -- for client handles
continuation: CrRPC.ContinuationProc ¬ NIL, -- for server handles
continuationClientData: REF -- for server handles
];
-- Operation table installed in client handles (see CreateClientHandle).
-- Operations that only make sense on a server handle are bound to Error... procs.
theClientProcs: REF CrRPC.ProcsObject ~ NEW[CrRPC.ProcsObject ¬ [
  destroy~CheckInToCache, -- destroying a client handle returns it to the cache
  setParameters~SetParameters,
  call~Call,
  getBulkDataSource~ErrorGetBulkDataSource, -- server-only
  getBulkDataSink~ErrorGetBulkDataSink, -- server-only
  putBulkDataSource~PutBulkDataSource,
  putBulkDataSink~PutBulkDataSink,
  readBulkDataStream~ReadBulkDataStream,
  writeBulkDataSegment~WriteBulkDataSegment,
  callContinuation~CallContinuation,
  setContinuation~ErrorSetContinuation, -- server-only
  finalize~Finalize
  ]];
-- Operation table installed in server handles (see ListenerProc).
-- Operations that only make sense on a client handle are bound to Error... procs.
theServerProcs: REF CrRPC.ProcsObject ~ NEW[CrRPC.ProcsObject ¬ [
  destroy~CheckInToCache,
  setParameters~SetParameters,
  call~ErrorCall, -- client-only
  getBulkDataSource~GetBulkDataSource,
  getBulkDataSink~GetBulkDataSink,
  putBulkDataSource~ErrorPutBulkDataSource, -- client-only
  putBulkDataSink~ErrorPutBulkDataSink, -- client-only
  readBulkDataStream~ReadBulkDataStream,
  writeBulkDataSegment~WriteBulkDataSegment,
  callContinuation~ErrorCallContinuation, -- client-only
  setContinuation~SetContinuation,
  finalize~ErrorFinalize
  ]];
-- Variables
-- When TRUE, CreateClientHandle replaces the socket of every remote address with XNSWKS.courier,
-- not just addresses whose socket is XNS.unknownSocket.
alwaysSmashTheSocket: BOOL ¬ FALSE;
-- Parameters
-- Class-specific operations:
-- $GetRemote returns a REF to the XNS.Address of the remote site associated with this handle. (Works for either client or server handles.)
-- SetParameters: class-specific operations on an SPP handle.
-- op = $GetRemote: result is a freshly allocated REF XNS.Address holding a copy of the handle's remote address.
-- Any other op raises CrRPC.Error[h, unknownOperation, NIL]. The argument parameter is not consulted.
-- hNew is always the handle that was passed in.
SetParameters: CrRPC.SetParametersProc
  -- [h: Handle, op: ATOM, argument: REF] RETURNS [hNew: Handle, result: REF]
  ~ {
  dH: DataHandle ~ NARROW[h.data];
  hNew ¬ h;
  SELECT op FROM
    $GetRemote => result ¬ NEW[XNS.Address ¬ dH.remote];
    ENDCASE => ERROR CrRPC.Error[h, unknownOperation, NIL];
  };
-- Client Call
-- Notes:
-- Before raising CrRPC.Error[h, ...] we require that the stream NARROW[h.data].stream be in a reasonable state (possibly NIL).
-- The ONLY (expected) error that may be passed to clients is CrRPC.Error — stream class-specific errors MUST be caught here.
-- We catch RuntimeError.BoundsFault on read operations, because that could be raised by any getResults or getBulkData proc, and catching it here enables us to fix up the state of the transport stream.
-- ErrorCall: the call operation of server handles; always raises notClientHandle.
ErrorCall: CrRPC.CallProc ~ {
  ERROR CrRPC.Error[h, notClientHandle, "call using server handle"];
  };
-- Call: perform one Courier remote call over the SPP stream held in the handle.
-- Phases, in order:
--   1. Send the call message, reusing dH.stream if present; if that fails the stream is smashed,
--      a new connection is made and the call message is sent again.
--   2. If putArgs registered a bulk data proc (dH.sendBulkData / dH.recvBulkData), run the transfer.
--   3. Read the reply: return (getResults is called), abort (getError is called, then
--      CrRPC.Error[remoteError]) or reject (CrRPC.Error[rejected...]).
-- XNSStream.ConnectionClosed and IO.Error are converted to CrRPC.Error[communicationFailure] after the
-- stream is smashed; IO.EndOfStream and RuntimeError.BoundsFault while reading are converted to
-- bulkDataError / resultsError after the stream is closed.
-- Changes from the previous text: the Done exit label targeted by GOTO Done is declared, the outer
-- BEGIN is closed, comment text is delimited, and the stray duplicate AdvanceInput after
-- ReadResultMsg is kept only as a comment.
Call: CrRPC.CallProc
  -- [h: Handle, remotePgm: CARD32, remotePgmVersion: CARD16, remoteProc: CARD16, putArgs: PutArgsProc, getResults: GetResultsProc, getError: GetErrorProc]
  ~ {
  dH: DataHandle ~ NARROW[h.data];

  ConnectToServer: PROC ~ {
    -- Connect to the remote server. If the connect fails, dH.stream will be left NIL.
    -- The version-number pair is deliberately set to include more than one version so the server will be forced to return his version number pair immediately.
    -- ERRORS: XNSStream.ConnectionClosed, CrRPC.Error, IO.Error.
    sessionHdrOut: CrRPCBackdoor.SessionHdr ¬ [
      lowVersion~HfC[CrRPCBackdoor.courierVersionNum - 1],
      highVersion~HfC[CrRPCBackdoor.courierVersionNum]];
    sessionHdrIn: CrRPCBackdoor.SessionHdr;
    nRead: NAT;
    IF dH.stream # NIL THEN ERROR;
    dH.stream ¬ XNSStream.Create[remote~dH.remote,
      getTimeout~XNSStream.waitForever, putTimeout~XNSStream.waitForever];
    TRUSTED {
      IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@sessionHdrOut]], startIndex~0, count~CrRPCBackdoor.sessionHdrBytes]] };
    XNSStream.SendNow[dH.stream];
    TRUSTED {
      nRead ¬ IO.UnsafeGetBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@sessionHdrIn]], startIndex~0, count~CrRPCBackdoor.sessionHdrBytes]] };
    dH.inputFlushNeeded ¬ FALSE; -- ???? Is there EOM on version number pair ????
    -- Reject a short read or a server version range that excludes ours.
    IF (nRead # CrRPCBackdoor.sessionHdrBytes)
      OR (CfH[sessionHdrIn.lowVersion] > CrRPCBackdoor.courierVersionNum)
      OR (CfH[sessionHdrIn.highVersion] < CrRPCBackdoor.courierVersionNum)
      THEN {
        SendCloseStream[dH];
        ERROR CrRPC.Error[h, courierVersionMismatch, "Courier version error"];
        };
    };

  SendCallMsg: PROC ~ {
    -- Send the call message: header, then arguments via putArgs, then end of message.
    -- ERRORS: XNSStream.ConnectionClosed, IO.Error
    callMsgHdr: CrRPCBackdoor.CallMsgHdr ¬ [
      msgHdr~[msgType~CrRPCBackdoor.callMsgType],
      callHdr~[
        tID~[0, 0],
        pgmNum~FfC[remotePgm],
        pgmVersion~HfC[remotePgmVersion],
        procNum~HfC[remoteProc]
        ]
      ];
    TRUSTED {
      IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@callMsgHdr]], startIndex~0, count~CrRPCBackdoor.callMsgHdrBytes]] };
    IF putArgs # NIL THEN putArgs[h, dH.stream];
    XNSStream.SendEndOfMessage[dH.stream];
    };

  ReadResultMsg: PROC ~ {
    -- Read and dispatch on the reply message. Every non-error path ends with AdvanceInput.
    -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed, IO.Error, IO.EndOfStream
    msgType: HWORD ¬ CrRPC.GetHWord[dH.stream];
    SELECT msgType FROM
      CrRPCBackdoor.rejectMsgType => {
        rejectReason: CARDINAL;
        [] ¬ CrRPC.GetHWord[dH.stream]; -- skip one header word
        rejectReason ¬ CrRPC.GetCard16[dH.stream];
        [] ¬ AdvanceInput[h~h, wait~FALSE];
        ERROR CrRPC.Error[h,
          (SELECT rejectReason FROM
            CrRPC.noSuchProgram => rejectedNoSuchProgram,
            CrRPC.noSuchVersion => rejectedNoSuchVersion,
            CrRPC.noSuchProcedure => rejectedNoSuchProcedure,
            CrRPC.invalidArgument => rejectedInvalidArgument,
            ENDCASE => rejectedUnspecified),
          "rejected"];
        };
      CrRPCBackdoor.returnMsgType => {
        [] ¬ CrRPC.GetHWord[dH.stream]; -- skip one header word
        IF getResults # NIL THEN getResults[h, dH.stream];
        [] ¬ AdvanceInput[h~h, wait~FALSE];
        };
      CrRPCBackdoor.abortMsgType => {
        errNum: CARDINAL;
        [] ¬ CrRPC.GetHWord[dH.stream]; -- skip one header word
        errNum ¬ CrRPC.GetCard16[dH.stream];
        IF getError # NIL THEN getError[h, dH.stream, errNum];
        [] ¬ AdvanceInput[h~h, wait~FALSE];
        -- Reached only if getError returned normally instead of raising.
        ERROR CrRPC.Error[h, remoteError, "unexpected remote error"] };
      ENDCASE => {
        SendCloseStream[dH];
        ERROR CrRPC.Error[h, protocolError, "bad msgType in response"] };
    };

  ClientRdySysRecv: PROC ~ {
    -- Prepare to receive a system message. Leave dH.inputFlushNeeded = TRUE.
    -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed (from AdvanceInput, GetStatus)
    -- ???? SAME AS ServerRdySysRecv! ????
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    DO
      [state, sst] ¬ AdvanceInput[h~h, wait~TRUE];
      IF state = open THEN EXIT;
      [] ¬ XNSStream.GetStatus[self~dH.stream, reset~TRUE];
      ENDLOOP;
    IF sst # 0 THEN {
      SmashStream[dH];
      ERROR CrRPC.Error[h, protocolError, "sys sst # 0"] };
    dH.inputFlushNeeded ¬ TRUE;
    };

  ClientRdyBDTSend: PROC RETURNS [ok: BOOL] ~ {
    -- Prepare to send bulk data — (try to) check whether the other side has sent us a reject / abort message, closed the stream, etc. In the normal case nothing arrives on the stream. If data arrives, it's a reject message from the other end, and the bulk data transfer should not be attempted. If an attention arrives, it's an abort request from the other end, so we let the bulk data transfer proceed (only to be aborted immediately).
    -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed (from AdvanceInput, CharsAvail)
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    [state, sst] ¬ AdvanceInput[h~h, wait~FALSE];
    SELECT state FROM
      open => {
        ok ¬ (MyCharsAvail[self~dH.stream, wait~FALSE] = 0) };
      attention => {
        ok ¬ TRUE };
      ENDCASE => ERROR;
    };

  ClientRdyBDTRecv: PROC RETURNS [ok: BOOL] ~ {
    -- Prepare to receive bulk data — (try to) check whether the other side has sent us a reject / abort message, closed the stream, etc. In the normal case bulk data arrives on the stream.
    -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed (from AdvanceInput)
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    [state, sst] ¬ AdvanceInput[h~h, wait~TRUE];
    SELECT state FROM
      open, endOfMessage => {
        ok ¬ (sst # 0);
        dH.inputFlushNeeded ¬ ok };
      attention => {
        ok ¬ TRUE };
      ENDCASE => ERROR;
    };

  CheckAbortRecv: CrRPC.BulkDataCheckAbortProc ~ {
    [] ¬ IO.CharsAvail[self~dH.stream, wait~TRUE]; -- MyCharsAvail not needed here!
    RETURN [ XNSStream.GetStatus[self~dH.stream, reset~FALSE].state = attention ] };

  CheckAbortSend: CrRPC.BulkDataCheckAbortProc ~ {
    RETURN[ (XNSStream.GetStatus[self~dH.stream, reset~FALSE].state # open)
      OR (MyCharsAvail[self~dH.stream, wait~FALSE] > 0) ] };

  -- Logic of Courier call begins here:
  BEGIN
  dH.sendBulkData ¬ NIL;
  dH.recvBulkData ¬ NIL;

  BEGIN
    IF dH.stream # NIL THEN {
      -- Try the cached connection first; on any failure smash it and fall through to reconnect.
      ENABLE XNSStream.ConnectionClosed, CrRPC.Error,
        IO.Error => {
          SmashStream[dH];
          CONTINUE };
      [] ¬ AdvanceInput[h~h, wait~FALSE]; -- check for close
      SendCallMsg[];
      GOTO Done };
    {
      -- dH.stream = NIL --
      ENABLE {
        XNSStream.ConnectionClosed => {
          SmashStream[dH];
          ERROR CrRPC.Error[h, communicationFailure, IO.PutFR1["Stream failure: %g", [rope[text]]]] };
        IO.Error => {
          SmashStream[dH];
          ERROR CrRPC.Error[h, communicationFailure, "IO.Error"] };
        };
      ConnectToServer[];
      SendCallMsg[];
      GOTO Done };
    EXITS
      Done => NULL; -- both successful paths arrive here
    END;

  -- At this point we have an open connection to the server and a call message has been sent successfully.
  BEGIN
    ENABLE {
      XNSStream.ConnectionClosed => {
        SmashStream[dH];
        ERROR CrRPC.Error[h, communicationFailure, IO.PutFR1["Stream failure: %g", [rope[text]]]] };
      IO.Error => {
        SmashStream[dH];
        ERROR CrRPC.Error[h, communicationFailure, "IO.Error"] };
      };
    SELECT TRUE FROM
      (dH.sendBulkData # NIL) => {
        -- Bulk data transfer from client to server.
        IF ClientRdyBDTSend[].ok THEN {
          XNSStream.SetSSType[dH.stream, XNSSPPTypes.bulkDataSST];
          IF dH.sendBulkData[h, dH.stream, CheckAbortSend].abort
            THEN XNSStream.SendAttention[dH.stream, 1] -- ???? Define 1 ????
            ELSE XNSStream.SendEndOfMessage[dH.stream];
          XNSStream.SetSSType[dH.stream, XNSSPPTypes.defaultSST];
          };
        };
      (dH.recvBulkData # NIL) => {
        -- Bulk data transfer from server to client.
        ENABLE {
          IO.EndOfStream => {
            SendCloseStream[dH];
            ERROR CrRPC.Error[h, bulkDataError, "end of stream"] };
          RuntimeError.BoundsFault => {
            SendCloseStream[dH];
            ERROR CrRPC.Error[h, bulkDataError, "bulk data value bounds fault"] };
          };
        IF ClientRdyBDTRecv[].ok THEN {
          -- There's bulk data (or an attention) in the stream.
          IF dH.recvBulkData[h, dH.stream, CheckAbortRecv].abort
            THEN XNSStream.SendAttention[dH.stream, 1]; -- ???? Define 1 ????
          };
        };
      ENDCASE => {
        -- No bulk data transfer in either direction.
        NULL;
        };
    {
      ENABLE {
        IO.EndOfStream => {
          SendCloseStream[dH];
          ERROR CrRPC.Error[h, resultsError, "end of stream"] };
        RuntimeError.BoundsFault => {
          SendCloseStream[dH];
          ERROR CrRPC.Error[h, resultsError, "result value bounds fault"] };
        };
      ClientRdySysRecv[];
      ReadResultMsg[];
      -- No trailing AdvanceInput[h~h, wait~FALSE] here: ReadResultMsg has already done it.
      };
    END;
  END;
  }; -- of Call
-- Server Process
-- Notes:
-- Any ERROR — CrRPC.Error, XNSStream.ConnectionClosed, IO.Error, IO.EndOfStream, RuntimeError.BoundsFault — eventually causes the transport to be discarded without attempting to serve any more calls on it.
-- ServerRdySysRecv: wait until ordinary (system, subsequence type 0) input is ready on the server stream.
ServerRdySysRecv: PROC [h: Handle] ~ {
  -- Prepare to receive a system message. Leave dH.inputFlushNeeded = TRUE.
  -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed
  dH: DataHandle ~ NARROW[h.data];
  state: XNSStream.State;
  sst: XNSStream.SubSequenceType;
  DO
    [state, sst] ¬ AdvanceInput[h~h, wait~TRUE];
    IF state = open THEN EXIT;
    -- Any other state is reset and skipped; only ordinary data is of interest here.
    [] ¬ XNSStream.GetStatus[self~dH.stream, reset~TRUE];
    ENDLOOP;
  IF sst # 0 THEN ERROR CrRPC.Error[h, protocolError, "sys sst # 0"];
  dH.inputFlushNeeded ¬ TRUE;
  };
-- ServerRdyBDTRecv: wait for the start of incoming bulk data on the server stream.
ServerRdyBDTRecv: PROC [h: Handle] ~ {
  -- Prepare to receive bulk data — (try to) check whether the other side has sent us anything besides bulk data — closed the stream, etc. In the normal case bulk data arrives on the stream.
  -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed
  -- NOTE(review): an attention state here falls into ENDCASE => ERROR, unlike ClientRdyBDTRecv
  -- which accepts it; confirm whether that is intended.
  dH: DataHandle ~ NARROW[h.data];
  state: XNSStream.State;
  sst: XNSStream.SubSequenceType;
  [state, sst] ¬ AdvanceInput[h~h, wait~TRUE];
  SELECT state FROM
    open, endOfMessage => {
      -- Subsequence type 0 means ordinary data arrived where bulk data was expected.
      IF sst = 0 THEN ERROR CrRPC.Error[h, protocolError, "missing bulk data"];
      dH.inputFlushNeeded ¬ TRUE };
    ENDCASE => ERROR;
  };
-- ServerRdyBDTSend: check the input side of the server stream before sending bulk data.
ServerRdyBDTSend: PROC [h: Handle] ~ {
  -- Prepare to send bulk data — (try to) check whether the other side has closed the stream or aborted the transfer. In the normal case nothing arrives on the stream. If an attention arrives, it's an abort request from the other end, so we let the bulk data transfer proceed (only to be aborted immediately).
  -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed
  dH: DataHandle ~ NARROW[h.data];
  state: XNSStream.State;
  sst: XNSStream.SubSequenceType;
  [state, sst] ¬ AdvanceInput[h~h, wait~FALSE];
  SELECT state FROM
    open => {
      IF MyCharsAvail[self~dH.stream, wait~FALSE] # 0 THEN
        ERROR CrRPC.Error[h, protocolError, "client send to bdt source"];
      };
    -- Abort request from the client: proceed, as the comment above promises and as
    -- ClientRdyBDTSend does. ServerSendBulkData sees the attention on its first status
    -- check and finishes at once. Previously this state fell into ENDCASE => ERROR.
    attention => NULL;
    ENDCASE => ERROR;
  };
-- BeginReturn: write the header of a return message on the handle's stream.
-- Only the header is written; ListenerProc sends end of message after the server proc returns.
BeginReturn: CrRPC.BeginReturnProc
  -- [h: Handle]
  ~ {
  -- ERRORS: XNSStream.ConnectionClosed, IO.Error
  dH: DataHandle ~ NARROW[h.data];
  returnMsgHdr: CrRPCBackdoor.ReturnMsgHdr ¬ [
    msgHdr~[msgType~CrRPCBackdoor.returnMsgType],
    returnHdr~[tID~[hi~0, lo~0]]
    ];
  TRUSTED {
    IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@returnMsgHdr]], startIndex~0, count~CrRPCBackdoor.returnMsgHdrBytes]] };
  };
-- BeginError: write the header of an abort message carrying errNum on the handle's stream.
-- Only the header is written; ListenerProc sends end of message after the server proc returns.
BeginError: CrRPC.BeginErrorProc
  -- [h: Handle, errNum: CARDINAL]
  ~ {
  -- ERRORS: XNSStream.ConnectionClosed, IO.Error
  dH: DataHandle ~ NARROW[h.data];
  abortMsgHdr: CrRPCBackdoor.AbortMsgHdr ¬ [
    msgHdr~[msgType~CrRPCBackdoor.abortMsgType],
    abortHdr~[tID~[0,0], errNum~HfC[errNum]]
    ];
  TRUSTED {
    IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@abortMsgHdr]], startIndex~0, count~CrRPCBackdoor.abortMsgHdrBytes]] };
  };
-- BeginReject: write the header of a reject message carrying rejectReason on the handle's stream.
-- Only the header is written; ListenerProc sends end of message after the server proc returns.
BeginReject: CrRPC.BeginRejectProc
  -- [h: Handle, rejectReason: CARDINAL]
  ~ {
  -- ERRORS: XNSStream.ConnectionClosed, IO.Error
  dH: DataHandle ~ NARROW[h.data];
  rejectMsgHdr: CrRPCBackdoor.RejectMsgHdr ¬ [
    msgHdr~[msgType~CrRPCBackdoor.rejectMsgType],
    rejectHdr~[tID~[0,0], rejectReason~HfC[rejectReason]]
    ];
  TRUSTED {
    IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@rejectMsgHdr]], startIndex~0, count~CrRPCBackdoor.rejectMsgHdrBytes]] };
  };
-- ListenerProc: worker run for each incoming SPP connection (installed by CreateListener).
-- Exchanges Courier version numbers, builds a server handle around the stream, then serves call
-- messages in a loop until an error occurs or a continuation takes over the stream.
-- Exits:
--   Close: send a close (ignoring ConnectionClosed) and abort the stream.
--   Out: the connection is already closed; just abort the stream.
--   Closed, Smashed: the stream is no longer ours to close; do nothing.
-- IO.EndOfStream is now in the Close list, so a server proc that reads past the end of its
-- arguments discards the transport (as the notes for this section state) instead of letting the
-- signal escape from the worker.
ListenerProc: XNSStream.ListenerProc
  -- [stream: IO.STREAM, remote: XNS.Address]
  ~ {
  sessionHdr: CrRPCBackdoor.SessionHdr;
  callMsgHdr: CrRPCBackdoor.CallMsgHdr;
  desiredPgm: CARD32;
  desiredPgmVersion: CARD16;
  desiredProc: CARD16;
  serverProc: CrRPC.ServerProc;
  nBytes: INT;
  h: Handle;
  dH: DataHandle;
  BEGIN
    ENABLE {
      XNSStream.ConnectionClosed => GOTO Out;
      CrRPC.Error, IO.Error, IO.EndOfStream, XNSStream.Timeout, RuntimeError.BoundsFault => GOTO Close;
      };

    -- Exchange version numbers
    TRUSTED {
      nBytes ¬ IO.UnsafeGetBlock[stream, [base~LOOPHOLE[LONG[@sessionHdr]], count~CrRPCBackdoor.sessionHdrBytes]]};
    IF nBytes # CrRPCBackdoor.sessionHdrBytes THEN GOTO Close;
    IF (CfH[sessionHdr.lowVersion] > CrRPCBackdoor.courierVersionNum)
      OR (CfH[sessionHdr.highVersion] < CrRPCBackdoor.courierVersionNum)
      THEN {
        GOTO Close };
    -- Reply with exactly our version as both ends of the range.
    sessionHdr.lowVersion ¬ sessionHdr.highVersion ¬ HfC[CrRPCBackdoor.courierVersionNum];
    TRUSTED { IO.UnsafePutBlock[stream,
      [base~LOOPHOLE[LONG[@sessionHdr]], count~CrRPCBackdoor.sessionHdrBytes]] };
    XNSStream.SendNow[stream];

    -- Create server handle
    dH ¬ NEW[DataObject ¬ [lastUsed~, stream~stream, remote~remote]];
    h ¬ NEW[CrRPC.Object ¬ [class~$SPP, kind~server, procs~theServerProcs, data~dH, clientData~NIL]];

    -- Process calls until timeout or unrecoverable error
    DO
      -- Get message; make sure it's a call, get desired program, version and proc number
      ServerRdySysRecv[h];
      TRUSTED { nBytes ¬ IO.UnsafeGetBlock[stream,
        [base~LOOPHOLE[LONG[@callMsgHdr]], startIndex~0, count~CrRPCBackdoor.callMsgHdrBytes]]};
      IF (nBytes # CrRPCBackdoor.callMsgHdrBytes)
        OR (callMsgHdr.msgHdr.msgType # CrRPCBackdoor.callMsgType)
        THEN {
          GOTO Close };
      desiredPgm ¬ CfF[callMsgHdr.callHdr.pgmNum];
      desiredPgmVersion ¬ CfH[callMsgHdr.callHdr.pgmVersion];
      desiredProc ¬ CfH[callMsgHdr.callHdr.procNum];
      serverProc ¬ CrRPCBackdoor.LookUpServerProc[desiredPgm, desiredPgmVersion];
      SELECT serverProc FROM
        NIL =>
          -- Don't have server, reject the call -- {
          rejectMsgHdr: CrRPCBackdoor.RejectMsgHdr ¬ [
            msgHdr~[msgType~CrRPCBackdoor.rejectMsgType],
            rejectHdr~[tID~[0,0], rejectReason~HfC[CrRPC.noSuchProgram]]
            ];
          TRUSTED { IO.UnsafePutBlock[stream,
            [base~LOOPHOLE[LONG[@rejectMsgHdr]], startIndex~0, count~CrRPCBackdoor.rejectMsgHdrBytes]] };
          XNSStream.SendEndOfMessage[stream] };
        ENDCASE =>
          -- Serve the call -- {
          serverProc[h, stream, desiredPgm, desiredPgmVersion, desiredProc, BeginReturn, BeginError, BeginReject];
          IF dH.stream = NIL THEN GOTO Smashed; -- client caught error
          XNSStream.SendEndOfMessage[stream];
          IF dH.continuation # NIL THEN {
            -- A continuation set by the server proc gets the stream after the reply is sent.
            newStream: IO.STREAM
              ¬ dH.continuation[dH.stream, dH.continuationClientData];
            IF newStream = NIL THEN GOTO Closed;
            IF newStream # dH.stream THEN GOTO Smashed;
            };
          };
      ENDLOOP;
    END;
  EXITS
    Close => {
      [] ¬ XNSStream.SendClose[stream
        ! XNSStream.ConnectionClosed => CONTINUE];
      IO.Close[self~stream, abort~TRUE];
      };
    Out => {
      IO.Close[self~stream, abort~TRUE];
      };
    Closed, Smashed => NULL;
  };
-- One record per socket on which CreateListener has started a listener.
ListenerValue: TYPE ~ REF ListenerValueObject;
ListenerValueObject:
TYPE ~
RECORD [
next: ListenerValue,
socket: XNS.Socket,
listener: XNSStream.Listener
];
-- Head of the list of active listeners; read and updated only inside CreateListener (an ENTRY proc).
listeners: ListenerValue ¬
NIL;
-- CreateListener: start an SPP listener for Courier calls on a local socket.
-- local may be NIL, a REF XNS.Address (its socket field is used) or a REF XNS.Socket; any other
-- type raises CrRPC.Error[NIL, addressInappropriateForClass, NIL]. An unknown socket is replaced
-- by XNSWKS.courier. If a listener already exists for the socket, nothing is done.
-- Monitor discipline: this is an ENTRY procedure, so the error is raised with RETURN WITH ERROR
-- (which gives up the monitor lock first) and an UNWIND handler releases the lock if a called
-- procedure's signal is unwound through here. A plain ERROR would leave the monitor locked.
CreateListener: ENTRY CrRPCBackdoor.CreateListenerProc
  -- [local: CrRPC.RefAddress]
  ~ {
  ENABLE UNWIND => NULL;
  newListener: XNSStream.Listener;
  socket: XNS.Socket ¬ XNS.unknownSocket;
  IF local # NIL THEN
    WITH local SELECT FROM
      rA: REF XNS.Address => socket ¬ rA.socket;
      rS: REF XNS.Socket => socket ¬ rS^; -- dereference: socket is a value, rS is a REF
      ENDCASE => RETURN WITH ERROR CrRPC.Error[NIL, addressInappropriateForClass, NIL];
  IF socket = XNS.unknownSocket THEN socket ¬ XNSWKS.courier;
  -- At most one listener per socket.
  FOR p: ListenerValue ¬ listeners, p.next WHILE p # NIL DO
    IF p.socket = socket THEN RETURN;
    ENDLOOP;
  newListener ¬ XNSStream.CreateListener[socket~socket, worker~ListenerProc,
    getTimeout~XNSStream.waitForever, putTimeout~XNSStream.waitForever];
  listeners ¬ NEW[ListenerValueObject ¬
    [next~listeners, socket~socket, listener~newListener]];
  };
-- I/O and Marshalling
-- AdvanceInput: move the handle's input stream to the next significant piece of input.
-- wait: when FALSE, return as soon as the stream state is open; when TRUE, block until
--   characters are available (or some other state is reported).
-- Returns the stream state and subsequence type last read from XNSStream.GetStatus.
-- If the remote end has sent a close (endSST or endReplySST) the close is answered, the stream
-- is dropped (dH.stream becomes NIL) and CrRPC.Error[h, protocolError, "remote close"] is raised.
AdvanceInput: PROC [h: Handle, wait: BOOL ¬ FALSE]
  RETURNS [state: XNSStream.State, ssType: XNSStream.SubSequenceType] ~ {
  -- Advance input stream to next significant piece of input, checking for remote close and flushing unread input if needed.
  -- ERRORS: XNSStream.ConnectionClosed, CrRPC.Error
  dH: DataHandle ~ NARROW[h.data];
  sawBreak: BOOL ¬ FALSE; -- TRUE once an endOfMessage or ssTypeChange has been passed over
  IF dH.inputFlushNeeded THEN {
    [] ¬ XNSStream.FlushInput[self~dH.stream, wait~TRUE];
    dH.inputFlushNeeded ¬ FALSE };
  DO
    [state~state, ssType~ssType] ¬ XNSStream.GetStatus[self~dH.stream, reset~FALSE];
    IF (ssType = XNSSPPTypes.endSST)
      OR (ssType = XNSSPPTypes.endReplySST)
      THEN {
        ReplyCloseStream[dH];
        ERROR CrRPC.Error[h, protocolError, "remote close"] };
    SELECT state FROM
      open => {
        IF NOT wait THEN RETURN;
        IF MyCharsAvail[self~dH.stream, wait~TRUE] > 0 THEN RETURN;
        LOOP };
      attention => RETURN;
      endOfMessage => {
        -- The first break is skipped; a second end of message is reported to the caller.
        IF sawBreak THEN RETURN;
        sawBreak ¬ TRUE };
      ssTypeChange => {
        sawBreak ¬ TRUE };
      ENDCASE => ERROR;
    -- Reset the status just examined so the next GetStatus reports what follows it.
    [] ¬ XNSStream.GetStatus[self~dH.stream, reset~TRUE];
    ENDLOOP;
  };
-- SmashStream: abort the handle's stream without any close handshake and forget it.
-- Does nothing when dH.stream is already NIL.
SmashStream: PROC [dH: DataHandle] ~ {
  -- ERRORS: none.
  IF dH.stream # NIL THEN {
    IO.Close[self~dH.stream, abort~TRUE];
    dH.stream ¬ NIL };
  };
-- SendCloseStream: send a close on the handle's stream, then abort and forget it.
-- A ConnectionClosed raised while sending the close is ignored.
-- Does nothing when dH.stream is already NIL.
SendCloseStream: PROC [dH: DataHandle] ~ {
  -- ERRORS: none.
  IF dH.stream # NIL THEN {
    [] ¬ XNSStream.SendClose[dH.stream
      ! XNSStream.ConnectionClosed => CONTINUE];
    IO.Close[self~dH.stream, abort~TRUE];
    dH.stream ¬ NIL };
  };
-- ReplyCloseStream: answer a close received from the remote end, then abort and forget the stream.
-- ConnectionClosed and Timeout raised while sending the reply are ignored.
-- Does nothing when dH.stream is already NIL.
ReplyCloseStream: PROC [dH: DataHandle] ~ {
  -- ERRORS: none.
  IF dH.stream # NIL THEN {
    [] ¬ XNSStream.SendCloseReply[dH.stream
      ! XNSStream.ConnectionClosed, XNSStream.Timeout => CONTINUE];
    IO.Close[self~dH.stream, abort~TRUE];
    dH.stream ¬ NIL };
  };
-- ErrorPutBulkDataSource: putBulkDataSource operation of server handles; always raises notClientHandle.
ErrorPutBulkDataSource: CrRPC.PutBulkDataXferProcProc
  -- [h: Handle, s: STREAM, proc: BulkDataXferProc]
  ~ { ERROR CrRPC.Error[h, notClientHandle, "putBDTSource on server handle"] };
-- PutBulkDataSource: marshal a bulk data source descriptor into s.
-- proc = NIL writes the null descriptor type. Otherwise the immediate descriptor type is written
-- and proc is recorded in dH.sendBulkData, which Call runs after sending the call message.
PutBulkDataSource: CrRPC.PutBulkDataXferProcProc
  -- [h: Handle, s: STREAM, proc: BulkDataXferProc]
  ~ {
  dH: DataHandle ~ NARROW[h.data];
  IF proc = NIL
    THEN {
      CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.null]] }
    ELSE {
      CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate]];
      TRUSTED { dH.sendBulkData ¬ proc }};
  };
-- ErrorPutBulkDataSink: putBulkDataSink operation of server handles; always raises notClientHandle.
ErrorPutBulkDataSink: CrRPC.PutBulkDataXferProcProc
  -- [h: Handle, s: STREAM, proc: BulkDataXferProc]
  ~ { ERROR CrRPC.Error[h, notClientHandle, "putBDTSink on server handle"] };
-- PutBulkDataSink: marshal a bulk data sink descriptor into s.
-- proc = NIL writes the null descriptor type. Otherwise the immediate descriptor type is written
-- and proc is recorded in dH.recvBulkData, which Call runs after sending the call message.
PutBulkDataSink: CrRPC.PutBulkDataXferProcProc
  -- [h: Handle, s: STREAM, proc: BulkDataXferProc]
  ~ {
  dH: DataHandle ~ NARROW[h.data];
  IF proc = NIL
    THEN {
      CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.null]] }
    ELSE {
      CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate]];
      TRUSTED { dH.recvBulkData ¬ proc }};
  };
-- ErrorGetBulkDataSource: getBulkDataSource operation of client handles; always raises notServerHandle.
ErrorGetBulkDataSource: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSource: CrRPC.BulkDataSource] ~ {
  ERROR CrRPC.Error[h, notServerHandle, "GetBDTSource on client handle"];
  };
-- GetBulkDataSource: unmarshal a bulk data source descriptor from s (server side).
-- null yields NIL, immediate yields ServerRecvBulkData; the third-party descriptor types raise
-- notImplemented and anything else raises protocolError.
GetBulkDataSource: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSource: CrRPC.BulkDataSource] ~ {
  dH: DataHandle ~ NARROW[h.data];
  descriptorKind: CARDINAL ~ CrRPC.GetCard16[s];
  SELECT descriptorKind FROM
    ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate] =>
      TRUSTED { bulkDataSource ¬ ServerRecvBulkData };
    ORD[CrRPCBackdoor.BulkDataDescriptorType.null] =>
      bulkDataSource ¬ NIL;
    ORD[CrRPCBackdoor.BulkDataDescriptorType.active],
    ORD[CrRPCBackdoor.BulkDataDescriptorType.passive] =>
      ERROR CrRPC.Error[h, notImplemented, "no third party bulk data"];
    ENDCASE =>
      ERROR CrRPC.Error[h, protocolError, "bad bulkDataSource type"];
  };
-- ServerRecvBulkData: copy incoming bulk data from the handle's transport stream to s.
-- Returns abort = TRUE when the sender signalled an attention. Copying stops at end of message,
-- on attention, or when checkAbort says to stop (an attention is then sent to the other end).
-- ERRORS: CrRPC.Error (protocolError, remoteClose), plus whatever the streams raise.
-- The scratch buffer is now released by an UNWIND handler as well, so it is returned when an
-- error is unwound through this procedure; before, only the normal path released it.
ServerRecvBulkData: CrRPC.BulkDataXferProc
  -- [h: Handle, s: IO.STREAM, checkAbort: BulkDataCheckAbortProc] RETURNS [abort: BOOL]
  ~ {
  dH: DataHandle ~ NARROW[h.data];
  bufSize: NAT ~ 1024;
  b: REF TEXT ~ RefText.ObtainScratch[bufSize];
  nBytes: INT;
  state: XNSStream.State;
  ssType: XNSSPPTypes.SubSequenceType;
  abort ¬ FALSE;
  {
    ENABLE UNWIND => RefText.ReleaseScratch[b];
    ServerRdyBDTRecv[h];
    DO
      SELECT nBytes ¬ IO.GetBlock[self~dH.stream, block~b] FROM
        0 => {
          -- Nothing read: find out why from the stream status (and reset it).
          [state~state, ssType~ssType] ¬ XNSStream.GetStatus[dH.stream, TRUE];
          SELECT state FROM
            attention => { abort ¬ TRUE; EXIT };
            endOfMessage => { dH.inputFlushNeeded ¬ FALSE; EXIT };
            ssTypeChange =>
              SELECT ssType FROM
                0 => ERROR CrRPC.Error[h, protocolError, "bdt no eom"];
                XNSSPPTypes.endSST, XNSSPPTypes.endReplySST =>
                  ERROR CrRPC.Error[h, remoteClose, "remote close"];
                ENDCASE => NULL; -- ???? is this an error ????
            open => NULL;
            ENDCASE => ERROR;
          };
        ENDCASE => {
          IO.PutBlock[self~s, block~b, count~nBytes];
          };
      IF checkAbort[h].abort THEN {
        XNSStream.SendAttention[dH.stream, 1]; -- Define 1 ????
        EXIT };
      ENDLOOP;
    };
  RefText.ReleaseScratch[b];
  };
-- ErrorGetBulkDataSink: getBulkDataSink operation of client handles; always raises notServerHandle.
ErrorGetBulkDataSink: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSink: CrRPC.BulkDataSink] ~ {
  ERROR CrRPC.Error[h, notServerHandle, "GetBDTSink on client handle"];
  };
-- GetBulkDataSink: unmarshal a bulk data sink descriptor from s (server side).
-- null yields NIL, immediate yields ServerSendBulkData; the third-party descriptor types raise
-- notImplemented and anything else raises protocolError.
GetBulkDataSink: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSink: CrRPC.BulkDataSink] ~ {
  dH: DataHandle ~ NARROW[h.data];
  descriptorKind: CARDINAL ~ CrRPC.GetCard16[s];
  SELECT descriptorKind FROM
    ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate] =>
      TRUSTED { bulkDataSink ¬ ServerSendBulkData };
    ORD[CrRPCBackdoor.BulkDataDescriptorType.null] =>
      bulkDataSink ¬ NIL;
    ORD[CrRPCBackdoor.BulkDataDescriptorType.active],
    ORD[CrRPCBackdoor.BulkDataDescriptorType.passive] =>
      ERROR CrRPC.Error[h, notImplemented, "no third party bulk data"];
    ENDCASE =>
      ERROR CrRPC.Error[h, protocolError, "bad bulkDataSink type"];
  };
-- ServerSendBulkData: copy bulk data from s to the handle's transport stream.
-- The stream's subsequence type is bulkDataSST during the transfer and defaultSST afterwards.
-- Returns abort = TRUE when the receiver interrupted the transfer (attention, end of message or
-- subsequence type change seen on the input side). The transfer ends with an end of message,
-- or with an attention when checkAbort says to stop.
-- ERRORS: CrRPC.Error from ServerRdyBDTSend, plus whatever the streams raise.
-- The scratch buffer is now released by an UNWIND handler as well, so it is returned when an
-- error is unwound through this procedure; before, only the normal path released it.
ServerSendBulkData: CrRPC.BulkDataXferProc
  -- [h: Handle, s: IO.STREAM, checkAbort: BulkDataCheckAbortProc] RETURNS [abort: BOOL]
  ~ {
  dH: DataHandle ~ NARROW[h.data];
  bufSize: NAT ~ 1024;
  b: REF TEXT ~ RefText.ObtainScratch[bufSize];
  nBytes: INT;
  state: XNSStream.State;
  ssType: XNSSPPTypes.SubSequenceType;
  abort ¬ FALSE;
  {
    ENABLE UNWIND => RefText.ReleaseScratch[b];
    ServerRdyBDTSend[h];
    XNSStream.SetSSType[self~dH.stream, ssType~XNSSPPTypes.bulkDataSST];
    {
      DO
        -- Check for abort from other side
        [state~state, ssType~ssType] ¬ XNSStream.GetStatus[dH.stream, FALSE];
        SELECT state FROM
          open => NULL;
          attention, endOfMessage, ssTypeChange => {
            abort ¬ TRUE;
            GOTO FinishWithEOM };
          ENDCASE => ERROR;
        -- Check for abort from this side
        IF checkAbort[h].abort THEN {
          GOTO FinishWithAttention };
        -- Copy a block of data
        SELECT nBytes ¬ IO.GetBlock[self~s, block~b] FROM
          0 => {
            GOTO FinishWithEOM };
          ENDCASE => {
            IO.PutBlock[self~dH.stream, block~b, count~nBytes];
            };
        ENDLOOP;
      EXITS
        FinishWithAttention =>
          XNSStream.SendAttention[dH.stream, 1]; -- Define 1 ????
        FinishWithEOM =>
          XNSStream.SendEndOfMessage[dH.stream];
      };
    };
  RefText.ReleaseScratch[b];
  XNSStream.SetSSType[self~dH.stream, ssType~XNSSPPTypes.defaultSST];
  };
-- Bulk Data Streams
-- Segment kind codes written by WriteBulkDataSegment and tested by ReadBulkDataStream.
nextSegment: CARD16 ~ 0; -- more segments follow
lastSegment: CARD16 ~ 1; -- ReadBulkDataStream returns after reading a segment of this kind
-- ReadBulkDataStream: read a segmented bulk data stream from s.
-- Each segment is a CARD16 kind, a CARD16 count, then count values each read by readValue.
-- Reading ends after a segment whose kind is lastSegment.
-- Returns TRUE only when readValue asked to abort; returns FALSE at the normal end and also
-- when checkAbort reported an abort.
-- ERRORS: stream failures are converted to CrRPC.Error[NIL, communicationFailure, ...] and
-- RuntimeError.BoundsFault to CrRPC.Error[NIL, bulkDataError, ...]; note the NIL handle.
ReadBulkDataStream: CrRPC.ReadBulkDataStreamProc
  -- [h: Handle, s: STREAM, checkAbort: BulkDataCheckAbortProc, readValue: BulkDataValueProc] RETURNS [abort: BOOL]
  ~ {
  ENABLE {
    XNSStream.ConnectionClosed => {
      ERROR CrRPC.Error[NIL, communicationFailure, IO.PutFR1["Stream failure: %g", [rope[text]]]] };
    IO.Error,
    IO.EndOfStream => {
      ERROR CrRPC.Error[NIL, communicationFailure, "IO.Error"] };
    RuntimeError.BoundsFault =>
      ERROR CrRPC.Error[NIL, bulkDataError, "bulk data value bounds fault"];
    };
  segmentKind: CARD16;
  len: CARD16;
  DO
    IF checkAbort[h] THEN RETURN[FALSE];
    segmentKind ¬ CrRPC.GetCard16[s];
    len ¬ CrRPC.GetCard16[s];
    THROUGH [1..len] DO
      IF checkAbort[h] THEN RETURN[FALSE];
      IF readValue[s].abort THEN RETURN[TRUE];
      ENDLOOP;
    IF segmentKind = lastSegment THEN RETURN[FALSE];
    ENDLOOP;
  };
-- WriteBulkDataSegment: write one segment of a segmented bulk data stream to s.
-- Writes the segment kind (nextSegment when n > 0, lastSegment when n = 0), then n, then up to
-- n values each written by writeValue.
-- Returns heAborted = TRUE when checkAbort reported an abort and abort = TRUE when writeValue
-- asked to abort; in both cases the segment is left incomplete.
-- ERRORS: stream failures are converted to CrRPC.Error[NIL, communicationFailure, ...] and
-- RuntimeError.BoundsFault to CrRPC.Error[NIL, bulkDataError, ...]; note the NIL handle.
WriteBulkDataSegment: CrRPC.WriteBulkDataSegmentProc
  -- [h: Handle, s: STREAM, checkAbort: BulkDataCheckAbortProc, writeValue: BulkDataValueProc, n: CARDINAL] RETURNS [abort: BOOL, heAborted: BOOL]
  ~ {
  ENABLE {
    XNSStream.ConnectionClosed => {
      ERROR CrRPC.Error[NIL, communicationFailure, IO.PutFR1["Stream failure: %g", [rope[text]]]] };
    IO.Error,
    IO.EndOfStream => {
      ERROR CrRPC.Error[NIL, communicationFailure, "IO.Error"] };
    RuntimeError.BoundsFault =>
      ERROR CrRPC.Error[NIL, bulkDataError, "bulk data value bounds fault"];
    };
  CrRPC.PutCard16[s, (IF n > 0 THEN nextSegment ELSE lastSegment)];
  CrRPC.PutCard16[s, n];
  abort ¬ heAborted ¬ FALSE;
  THROUGH [1..n] DO
    IF checkAbort[h] THEN { heAborted ¬ TRUE; RETURN };
    IF writeValue[s].abort THEN { abort ¬ TRUE; RETURN };
    ENDLOOP;
  };
-- Continuations
-- ErrorCallContinuation: callContinuation operation of server handles; always raises notClientHandle.
ErrorCallContinuation: CrRPC.CallContinuationProc ~ {
  ERROR CrRPC.Error[h, notClientHandle, "CallContinuation on server handle"];
  };
-- CallContinuation: hand the client handle's transport stream to proc.
-- proc returns the stream the handle should keep using. Returning NIL detaches the stream from
-- the handle; returning a different stream raises CrRPC.Error[NIL, notImplemented, NIL].
-- A NIL proc leaves the handle unchanged.
-- ERRORS: Timeout, ConnectionClosed, IO.Error and IO.EndOfStream raised by proc smash the stream
-- and become CrRPC.Error[h, communicationFailure, ...]; BoundsFault becomes
-- CrRPC.Error[NIL, bulkDataError, ...] without touching the stream.
CallContinuation: CrRPC.CallContinuationProc
  -- [h: Handle, proc: ContinuationProc, clientData: REF]
  ~ {
  dH: DataHandle ~ NARROW[h.data];
  newStream: IO.STREAM;
  {
    ENABLE {
      XNSStream.Timeout => {
        SmashStream[dH];
        ERROR CrRPC.Error[h, communicationFailure, "Stream timeout"] };
      XNSStream.ConnectionClosed => {
        SmashStream[dH];
        ERROR CrRPC.Error[h, communicationFailure, IO.PutFR1["Stream failure: %g", [rope[text]]]] };
      IO.Error,
      IO.EndOfStream => {
        SmashStream[dH];
        ERROR CrRPC.Error[h, communicationFailure, "IO.Error"] };
      RuntimeError.BoundsFault =>
        ERROR CrRPC.Error[NIL, bulkDataError, "bulk data value bounds fault"];
      };
    newStream ¬ (IF proc # NIL THEN proc[dH.stream, clientData] ELSE dH.stream);
    };
  IF (newStream # NIL) AND (newStream # dH.stream) THEN
    ERROR CrRPC.Error[NIL, notImplemented, NIL];
  dH.stream ¬ newStream;
  };
-- ErrorSetContinuation: setContinuation operation of client handles; always raises notServerHandle.
ErrorSetContinuation: CrRPC.SetContinuationProc ~ {
  ERROR CrRPC.Error[h, notServerHandle, "SetContinuation on client handle"];
  };
-- SetContinuation: record a continuation and its client data in a server handle.
-- ListenerProc runs the continuation after the reply to the current call has been sent.
SetContinuation: CrRPC.SetContinuationProc
  -- [h: Handle, proc: ContinuationProc, clientData: REF]
  ~ {
  dH: DataHandle ~ NARROW[h.data];
  TRUSTED { dH.continuation ¬ proc };
  dH.continuationClientData ¬ clientData };
-- Cache of Handles
-- We maintain a cache of recently used client handles with their transport intact. If a handle isn't used for a while, it times out, the transport is closed and the handle destroyed.
-- Head of the list of cached client handles, linked through DataObject.next.
-- Read and written only by ENTRY procedures (CheckOutFromCache, CheckInToCache, GetDeadHandles).
cache: DataHandle ¬ NIL;
-- A handle idle for longer than this (8 seconds, expressed in clock pulses) is considered dead.
cachePulseOut: BasicTime.Pulses ~ BasicTime.MicrosecondsToPulses[8 * 1000000];
-- Pause between sweeps of the cache by CacheSweeper.
cacheSweepInterval: Process.Ticks ~ Process.SecondsToTicks[8];
-- PulsesSince: clock pulses elapsed between then and the current clock reading.
PulsesSince: PROC [then: BasicTime.Pulses] RETURNS [BasicTime.Pulses] ~ {
  now: BasicTime.Pulses ~ BasicTime.GetClockPulses[];
  RETURN [now - then];
  };
-- CheckOutFromCache: remove from the cache the first handle whose remote address equals remote.
-- Returns that handle, or NIL when the cache holds none for the address.
-- The data object's handle field is cleared (for finalization) and lastUsed is refreshed.
CheckOutFromCache: ENTRY PROC [remote: XNS.Address] RETURNS [h: Handle ¬ NIL] ~ {
  pred: DataHandle ¬ NIL; -- element preceding cur, or NIL while cur is the head
  FOR cur: DataHandle ¬ cache, cur.next WHILE cur # NIL DO
    IF cur.remote # remote THEN { pred ¬ cur; LOOP };
    -- Found: unlink cur from the list.
    IF pred # NIL THEN pred.next ¬ cur.next ELSE cache ¬ cur.next;
    h ¬ cur.handle;
    cur.handle ¬ NIL; -- for finalization
    cur.lastUsed ¬ BasicTime.GetClockPulses[];
    RETURN;
    ENDLOOP;
  };
-- CheckInToCache: put handle h at the head of the cache and stamp it with the current time.
-- Also serves as the destroy operation of both procedure tables.
CheckInToCache: ENTRY PROC [h: Handle] ~ {
  entry: DataHandle ~ NARROW[h.data];
  entry.lastUsed ¬ BasicTime.GetClockPulses[];
  entry.handle ¬ h;
  entry.next ¬ cache;
  cache ¬ entry;
  };
-- CacheSweeper: background loop (forked by the module start code) that periodically removes
-- timed-out handles from the cache and destroys them. Never returns.
CacheSweeper: PROC ~ {

  -- Under the monitor lock, split the cache into live entries (kept in their original order)
  -- and timed-out entries (returned as a list linked through next).
  GetDeadHandles: ENTRY PROC RETURNS [deadList: DataHandle ¬ NIL] ~ {
    keepHead, keepTail: DataHandle ¬ NIL;
    cur: DataHandle ¬ NIL;
    UNTIL cache = NIL DO
      cur ¬ cache;
      cache ¬ cur.next;
      IF PulsesSince[cur.lastUsed] > cachePulseOut
        THEN {
          -- timed out: push onto the dead list
          cur.next ¬ deadList;
          deadList ¬ cur }
        ELSE {
          -- still fresh: append to the surviving list
          cur.next ¬ NIL;
          IF keepTail # NIL THEN keepTail.next ¬ cur ELSE keepHead ¬ cur;
          keepTail ¬ cur };
      ENDLOOP;
    cache ¬ keepHead;
    };

  -- Outside the monitor, close the transport of every handle on the dead list.
  DestroyDeadHandles: PROC [deadList: DataHandle] ~ {
    victim: DataHandle;
    UNTIL deadList = NIL DO
      victim ¬ deadList;
      deadList ¬ victim.next;
      BuryDeadHandle[victim.handle];
      ENDLOOP;
    };

  DO
    DestroyDeadHandles[ GetDeadHandles[] ];
    Process.Pause[ cacheSweepInterval ];
    ENDLOOP;
  };
-- BuryDeadHandle: tear down a handle that has timed out.
-- Clears the back pointer from the data object, closes the transport stream (if any) with a
-- close handshake, then detaches the data object so that Finalize treats the handle as dead.
BuryDeadHandle:
PROC [h: Handle] ~ {
dH: DataHandle ~ NARROW[h.data];
dH.handle ¬ NIL; -- break cycle for GC
SendCloseStream[dH];
h.data ¬ NIL };
-- Handle Creation and Destruction
-- ErrorFinalize: finalize operation of server handles; raises an anonymous ERROR if ever called.
ErrorFinalize: PROC [h: Handle] ~ {
  ERROR;
  };
-- Finalize: finalize operation of client handles.
-- A handle that was used within the cache timeout is renewed and put back in the cache;
-- otherwise its transport is closed and the handle is buried.
Finalize: PROC [h: Handle] ~ {
  -- A client has dropped the handle h, which is guaranteed not to be in the cache.
  dH: DataHandle ~ NARROW[h.data];
  IF dH = NIL THEN -- handle is already dead -- RETURN;
  IF PulsesSince[dH.lastUsed] <= cachePulseOut THEN {
    CrRPCBackdoor.RenewClientObject[h];
    CheckInToCache[h];
    RETURN };
  BuryDeadHandle[h];
  };
-- CreateClientHandle: return a client handle for the given remote XNS address.
-- remote must be a non-NIL REF XNS.Address, else CrRPC.Error[NIL, addressInappropriateForClass, ...]
-- is raised. An unknown socket (or any socket, when alwaysSmashTheSocket is TRUE) is replaced
-- by XNSWKS.courier. A cached handle for the address is reused when one exists; otherwise a new
-- handle with no open stream is created (Call connects on first use).
CreateClientHandle: CrRPCBackdoor.CreateClientHandleProc
  -- [remote: CrRPC.RefAddress] RETURNS [Handle]
  ~ {
  handle: Handle;
  dH: DataHandle;
  addr: XNS.Address;
  IF remote = NIL THEN
    ERROR CrRPC.Error[NIL, addressInappropriateForClass, "missing remote address"];
  WITH remote SELECT FROM
    rA: REF XNS.Address => addr ¬ rA^; -- copy the referent: addr is a value and is modified below
    ENDCASE => ERROR CrRPC.Error[NIL, addressInappropriateForClass, NIL];
  IF ( alwaysSmashTheSocket ) OR ( addr.socket = XNS.unknownSocket )
    THEN addr.socket ¬ XNSWKS.courier;
  handle ¬ CheckOutFromCache[addr];
  IF handle = NIL THEN {
    dH ¬ NEW[DataObject ¬ [lastUsed~BasicTime.GetClockPulses[], remote~addr]];
    handle ¬ CrRPCBackdoor.NewClientObject[class~$SPP, procs~theClientProcs, data~dH];
    };
  RETURN [handle];
  };
-- Module start code: register this module as the $SPP transport class for both client handle
-- creation and listener creation, then fork the cache sweeper as a detached process.
CrRPCBackdoor.RegisterCreateClientHandleProc[$SPP, CreateClientHandle];
CrRPCBackdoor.RegisterCreateListenerProc[$SPP, CreateListener];
TRUSTED { Process.Detach[ FORK CacheSweeper[] ] };
}.