CrRPCSPPImpl.mesa
Copyright © 1986, 1988, 1989, 1991 by Xerox Corporation. All rights reserved.
Demers, August 31, 1988 2:24:34 pm PDT
Willie-Sue, January 3, 1989 3:21:26 pm PST
Tim Diebert: June 8, 1989 5:23:42 pm PDT
Courier runtime support for SPP transport.
DIRECTORY
Basics USING [Card32FromF, Card16FromH, FFromCard32, FWORD, HFromCard16, HWORD],
BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses],
CrRPC USING [BeginErrorProc, BeginRejectProc, BeginReturnProc, BulkDataCheckAbortProc, BulkDataSink, BulkDataSource, BulkDataValueProc, BulkDataXferProc, CallContinuationProc, CallProc, ContinuationProc, Error, ErrorReason, GetBulkDataXferProcProc, GetCard16, GetHWord, Handle, invalidArgument, noSuchProcedure, noSuchProgram, noSuchVersion, Object, ProcsObject, PutBulkDataXferProcProc, PutCard16, ReadBulkDataStreamProc, ServerProc, SetContinuationProc, SetParametersProc, WriteBulkDataSegmentProc],
CrRPCBackdoor USING [AbortHdr, AbortMsgHdr, abortMsgHdrBytes, abortMsgType, BulkDataDescriptorType, CallHdr, CallMsgHdr, callMsgHdrBytes, callMsgType, courierVersionNum, CreateClientHandleProc, CreateListenerProc, LookUpServerProc, MsgHdr, --msgHdrBytes,-- NewClientObject, RegisterCreateClientHandleProc, RegisterCreateListenerProc, RejectHdr, RejectMsgHdr, rejectMsgHdrBytes, rejectMsgType, RenewClientObject, ReturnHdr, ReturnMsgHdr, returnMsgHdrBytes, returnMsgType, SessionHdr, sessionHdrBytes],
IO USING [CharsAvail, Close, EndOf, EndOfStream, Error, GetBlock, PutBlock, PutFR1, STREAM, UnsafeBlock, UnsafeGetBlock, UnsafePutBlock],
Process USING [Detach, Pause, SecondsToTicks, Ticks],
RefText USING [ObtainScratch, ReleaseScratch],
Rope USING [ROPE],
RuntimeError USING [BoundsFault],
XNS USING [Address, Socket, unknownSocket],
XNSSPPTypes USING [bulkDataSST, defaultSST, endReplySST, endSST, SubSequenceType],
XNSStream USING [ConnectionClosed, Create, CreateListener, FlushInput, GetStatus, Listener, ListenerProc, SendAttention, SendClose, SendCloseReply, SendEndOfMessage, SendNow, SetSSType, State, SubSequenceType, Timeout, waitForever],
XNSWKS USING [courier];
CrRPCSPPImpl: CEDAR MONITOR
IMPORTS Basics, BasicTime, CrRPC, CrRPCBackdoor, IO, Process, RefText, RuntimeError, XNSStream
~ {
Copied Types and Constants
-- Local short names for types declared in imported interfaces.
FWORD: TYPE ~ Basics.FWORD;
HWORD: TYPE ~ Basics.HWORD;
STREAM: TYPE ~ IO.STREAM;
Handle: TYPE ~ CrRPC.Handle;

-- Conversion shorthands.  Each one simply forwards to the Basics routine of the
-- corresponding name; the abbreviations read "F from C", "H from C", "C from F", "C from H".
FfC: PROC [c: CARD32] RETURNS [FWORD] ~ INLINE {
  -- CARD32 to FWORD, via Basics.FFromCard32.
  RETURN [Basics.FFromCard32[c]];
  };
HfC: PROC [c: CARD16] RETURNS [HWORD] ~ INLINE {
  -- CARD16 to HWORD, via Basics.HFromCard16.
  RETURN [Basics.HFromCard16[c]];
  };
CfF: PROC [f: FWORD] RETURNS [CARD32] ~ INLINE {
  -- FWORD to CARD32, via Basics.Card32FromF.
  RETURN [Basics.Card32FromF[f]];
  };
CfH: PROC [h: HWORD] RETURNS [CARD16] ~ INLINE {
  -- HWORD to CARD16, via Basics.Card16FromH.
  RETURN [Basics.Card16FromH[h]];
  };
CharsAvail Hack
The "official" semantics of IO.CharsAvail is "the number of times IO.GetChar could be called without blocking", whence IO.CharsAvail must return a nonzero value at end of stream. This code was written assuming CharsAvail is 0 at end of stream. This ought to be fixed, but it's a bit subtle.
MyCharsAvail: PROC [self: STREAM, wait: BOOL ¬ FALSE] RETURNS [n: INT] ~ INLINE {
  -- Like IO.CharsAvail, except that the answer is forced to 0 when the stream reports end of stream.
  -- IO.EndOf is consulted only when IO.CharsAvail returned a nonzero count.
  n ¬ IO.CharsAvail[self, wait];
  IF n = 0 THEN RETURN;
  IF IO.EndOf[self] THEN n ¬ 0;
  };
Handle Objects
-- Per-handle transport state for the SPP class.  A CrRPC handle's data field is NARROWed to
-- DataHandle throughout this module.
DataHandle: TYPE ~ REF DataObject;
DataObject: TYPE ~ RECORD [
-- Link used to chain records on the handle cache list (and on the sweeper's dead list).
next: DataHandle ¬ NIL,
-- Back pointer to the owning handle: set by CheckInToCache, cleared by CheckOutFromCache and BuryDeadHandle.
handle: Handle ¬ NIL,
-- Clock pulses at creation or at the most recent cache check in / check out; compared against cachePulseOut.
lastUsed: BasicTime.Pulses,
-- The transport stream; NIL when no connection exists (the close and smash procs reset it to NIL).
stream: IO.STREAM ¬ NIL,
-- When TRUE, AdvanceInput flushes unread input before examining the stream status.
inputFlushNeeded: BOOL ¬ FALSE,
-- Remote address: passed to XNSStream.Create by Call, and used as the cache lookup key.
remote: XNS.Address,
sendBulkData: CrRPC.BulkDataXferProc ¬ NIL, -- for client handles
recvBulkData: CrRPC.BulkDataXferProc ¬ NIL, -- for client handles
continuation: CrRPC.ContinuationProc ¬ NIL, -- for server handles
continuationClientData: REF -- for server handles
];
-- Operation table installed in client handles (see CreateClientHandle).
-- Operations that only make sense on a server handle are bound to Error... procs,
-- each of which raises CrRPC.Error with reason notServerHandle.
theClientProcs: REF CrRPC.ProcsObject
~ NEW[ CrRPC.ProcsObject ¬ [
destroy~CheckInToCache,
setParameters~SetParameters,
call~Call,
getBulkDataSource~ErrorGetBulkDataSource,
getBulkDataSink~ErrorGetBulkDataSink,
putBulkDataSource~PutBulkDataSource,
putBulkDataSink~PutBulkDataSink,
readBulkDataStream~ReadBulkDataStream,
writeBulkDataSegment~WriteBulkDataSegment,
callContinuation~CallContinuation,
setContinuation~ErrorSetContinuation,
finalize~Finalize
]
];
-- Operation table installed in server handles (see ListenerProc).
-- Operations that only make sense on a client handle are bound to Error... procs:
-- most raise CrRPC.Error with reason notClientHandle; ErrorFinalize raises a bare ERROR.
-- NOTE(review): destroy is bound to CheckInToCache here as well, which would place a server
-- handle on the client handle cache; confirm that this is intended.
theServerProcs: REF CrRPC.ProcsObject
~ NEW[ CrRPC.ProcsObject ¬ [
destroy~CheckInToCache,
setParameters~SetParameters,
call~ErrorCall,
getBulkDataSource~GetBulkDataSource,
getBulkDataSink~GetBulkDataSink,
putBulkDataSource~ErrorPutBulkDataSource,
putBulkDataSink~ErrorPutBulkDataSink,
readBulkDataStream~ReadBulkDataStream,
writeBulkDataSegment~WriteBulkDataSegment,
callContinuation~ErrorCallContinuation,
setContinuation~SetContinuation,
finalize~ErrorFinalize
]
];
Variables
-- When TRUE, CreateClientHandle replaces the socket of every remote address with XNSWKS.courier,
-- not only when the caller supplied XNS.unknownSocket.
alwaysSmashTheSocket: BOOL ¬ FALSE;
Parameters
Class-specific operations:
$GetRemote returns a REF to the XNS.Address of the remote site associated with this handle. (Works for either client or server handles).
SetParameters: CrRPC.SetParametersProc
-- [h: Handle, op: ATOM, argument: REF] RETURNS [hNew: Handle, result: REF]
~ {
  -- Class-specific parameter operation.  Only $GetRemote is recognized: it answers a freshly
  -- allocated copy of the remote address recorded in the handle's data.  The handle itself is
  -- returned unchanged.  Any other op raises CrRPC.Error with reason unknownOperation.
  dH: DataHandle ~ NARROW[h.data];
  IF op # $GetRemote THEN ERROR CrRPC.Error[h, unknownOperation, NIL];
  hNew ¬ h;
  result ¬ NEW[XNS.Address ¬ dH.remote];
  };
Client Call
Notes:
Before raising CrRPC.Error[h, ...] require that the stream NARROW[h.data].stream be in a reasonable state (possibly NIL).
The ONLY (expected) error that may be passed to clients is CrRPC.Error — stream class-specific errors MUST be caught here.
We catch RuntimeError.BoundsFault on read operations, because that could be raised by any getResults or getBulkData proc, and catching it here enables us to fix up the state of the transport stream.
ErrorCall: CrRPC.CallProc ~ {
  -- Bound to the call slot of the server proc table: a call attempted on a server handle is always rejected.
  ERROR CrRPC.Error[h, notClientHandle, "call using server handle"];
  };
Call: CrRPC.CallProc
-- [h: Handle, remotePgm: CARD32, remotePgmVersion: CARD16, remoteProc: CARD16, putArgs: PutArgsProc, getResults: GetResultsProc, getError: GetErrorProc]
~ {
  -- Perform one Courier call on a client handle.
  -- Steps: (1) send the call message, first on the existing stream if there is one and, should that
  -- fail, on a freshly created connection; (2) run at most one bulk data transfer, in whichever
  -- direction putArgs registered through PutBulkDataSource / PutBulkDataSink; (3) read the
  -- result message, which is a return, a reject or an abort.
  -- Stream and IO errors are converted to CrRPC.Error after the stream has been closed, so that
  -- dH.stream is either usable or NIL whenever CrRPC.Error reaches the client.
  dH: DataHandle ~ NARROW[h.data];

  ConnectToServer: PROC ~ {
    -- Connect to the remote server.  If the connect fails, dH.stream will be left NIL.
    -- The version-number pair is deliberately set to include more than one version so the server
    -- will be forced to return his version number pair immediately.
    -- ERRORS: XNSStream.ConnectionClosed, CrRPC.Error, IO.Error.
    sessionHdrOut: CrRPCBackdoor.SessionHdr ¬ [
      lowVersion~HfC[CrRPCBackdoor.courierVersionNum - 1],
      highVersion~HfC[CrRPCBackdoor.courierVersionNum]];
    sessionHdrIn: CrRPCBackdoor.SessionHdr;
    nRead: NAT;
    IF dH.stream # NIL THEN ERROR; -- caller must only connect when no stream exists
    dH.stream ¬ XNSStream.Create[remote~dH.remote,
      getTimeout~XNSStream.waitForever, putTimeout~XNSStream.waitForever];
    TRUSTED {
      IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@sessionHdrOut]], startIndex~0, count~CrRPCBackdoor.sessionHdrBytes]] };
    XNSStream.SendNow[dH.stream];
    TRUSTED {
      nRead ¬ IO.UnsafeGetBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@sessionHdrIn]], startIndex~0, count~CrRPCBackdoor.sessionHdrBytes]] };
    dH.inputFlushNeeded ¬ FALSE; -- ???? Is there EOM on version number pair ????
    -- Reject a short read, or a version range from the server that excludes our version.
    IF (nRead # CrRPCBackdoor.sessionHdrBytes)
      OR (CfH[sessionHdrIn.lowVersion] > CrRPCBackdoor.courierVersionNum)
      OR (CfH[sessionHdrIn.highVersion] < CrRPCBackdoor.courierVersionNum) THEN {
      SendCloseStream[dH];
      ERROR CrRPC.Error[h, courierVersionMismatch, "Courier version error"];
      };
    };

  SendCallMsg: PROC ~ {
    -- Send the call message: header, then the arguments written by putArgs, then end of message.
    -- ERRORS: XNSStream.ConnectionClosed, IO.Error
    callMsgHdr: CrRPCBackdoor.CallMsgHdr ¬ [
      msgHdr~[msgType~CrRPCBackdoor.callMsgType],
      callHdr~[
        tID~[0, 0],
        pgmNum~FfC[remotePgm],
        pgmVersion~HfC[remotePgmVersion],
        procNum~HfC[remoteProc]
        ]
      ];
    TRUSTED {
      IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@callMsgHdr]], startIndex~0, count~CrRPCBackdoor.callMsgHdrBytes]] };
    IF putArgs # NIL THEN putArgs[h, dH.stream];
    XNSStream.SendEndOfMessage[dH.stream];
    };

  ReadResultMsg: PROC ~ {
    -- Read and dispatch on the result message.  Returns normally only for a return message;
    -- reject and abort messages, and unknown message types, end in CrRPC.Error.
    -- Every arm that leaves the stream open calls AdvanceInput before it finishes.
    -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed, IO.Error, IO.EndOfStream
    msgType: HWORD ¬ CrRPC.GetHWord[dH.stream];
    SELECT msgType FROM
      CrRPCBackdoor.rejectMsgType => {
        rejectReason: CARDINAL;
        [] ¬ CrRPC.GetHWord[dH.stream]; -- skip one header word (not examined)
        rejectReason ¬ CrRPC.GetCard16[dH.stream];
        [] ¬ AdvanceInput[h~h, wait~FALSE];
        ERROR CrRPC.Error[h,
          (SELECT rejectReason FROM
            CrRPC.noSuchProgram => rejectedNoSuchProgram,
            CrRPC.noSuchVersion => rejectedNoSuchVersion,
            CrRPC.noSuchProcedure => rejectedNoSuchProcedure,
            CrRPC.invalidArgument => rejectedInvalidArgument,
            ENDCASE => rejectedUnspecified),
          "rejected"];
        };
      CrRPCBackdoor.returnMsgType => {
        [] ¬ CrRPC.GetHWord[dH.stream]; -- skip one header word (not examined)
        IF getResults # NIL THEN getResults[h, dH.stream];
        [] ¬ AdvanceInput[h~h, wait~FALSE];
        };
      CrRPCBackdoor.abortMsgType => {
        errNum: CARDINAL;
        [] ¬ CrRPC.GetHWord[dH.stream]; -- skip one header word (not examined)
        errNum ¬ CrRPC.GetCard16[dH.stream];
        IF getError # NIL THEN getError[h, dH.stream, errNum];
        [] ¬ AdvanceInput[h~h, wait~FALSE];
        -- Reached only when getError is NIL or returned normally.
        ERROR CrRPC.Error[h, remoteError, "unexpected remote error"] };
      ENDCASE => {
        SendCloseStream[dH];
        ERROR CrRPC.Error[h, protocolError, "bad msgType in response"] };
    };

  ClientRdySysRecv: PROC ~ {
    -- Prepare to receive a system message.  Leave dH.inputFlushNeeded = TRUE.
    -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed (from AdvanceInput, GetStatus)
    -- ???? SAME AS ServerRdySysRecv! ????
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    DO
      [state, sst] ¬ AdvanceInput[h~h, wait~TRUE];
      IF state = open THEN EXIT;
      [] ¬ XNSStream.GetStatus[self~dH.stream, reset~TRUE]; -- reset the status and look again
      ENDLOOP;
    IF sst # 0 THEN {
      SmashStream[dH];
      ERROR CrRPC.Error[h, protocolError, "sys sst # 0"] };
    dH.inputFlushNeeded ¬ TRUE;
    };

  ClientRdyBDTSend: PROC RETURNS [ok: BOOL] ~ {
    -- Prepare to send bulk data: (try to) check whether the other side has sent us a reject / abort
    -- message, closed the stream, etc.  In the normal case nothing arrives on the stream.  If data
    -- arrives, it's a reject message from the other end, and the bulk data transfer should not be
    -- attempted.  If an attention arrives, it's an abort request from the other end, so we let the
    -- bulk data transfer proceed (only to be aborted immediately).
    -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed (from AdvanceInput, CharsAvail)
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    [state, sst] ¬ AdvanceInput[h~h, wait~FALSE];
    SELECT state FROM
      open => {
        ok ¬ (MyCharsAvail[self~dH.stream, wait~FALSE] = 0) };
      attention => {
        ok ¬ TRUE };
      ENDCASE => ERROR;
    };

  ClientRdyBDTRecv: PROC RETURNS [ok: BOOL] ~ {
    -- Prepare to receive bulk data: (try to) check whether the other side has sent us a reject /
    -- abort message, closed the stream, etc.  In the normal case bulk data arrives on the stream.
    -- A zero subsequence type means no bulk data is present, so ok is FALSE.
    -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed (from AdvanceInput)
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    [state, sst] ¬ AdvanceInput[h~h, wait~TRUE];
    SELECT state FROM
      open, endOfMessage => {
        ok ¬ (sst # 0);
        dH.inputFlushNeeded ¬ ok };
      attention => {
        ok ¬ TRUE };
      ENDCASE => ERROR;
    };

  CheckAbortRecv: CrRPC.BulkDataCheckAbortProc ~ {
    -- Handed to the receive transfer proc: waits for input, then reports whether an attention is pending.
    [] ¬ IO.CharsAvail[self~dH.stream, wait~TRUE]; -- MyCharsAvail not needed here!
    RETURN [ XNSStream.GetStatus[self~dH.stream, reset~FALSE].state = attention ] };

  CheckAbortSend: CrRPC.BulkDataCheckAbortProc ~ {
    -- Handed to the send transfer proc: reports abort if the stream status is not open or if
    -- any input has arrived.
    RETURN[ (XNSStream.GetStatus[self~dH.stream, reset~FALSE].state # open)
      OR (MyCharsAvail[self~dH.stream, wait~FALSE] > 0) ] };

  -- Logic of Courier call begins here:
  BEGIN
    -- Cleared so that only transfer procs registered by this call's putArgs are seen below.
    dH.sendBulkData ¬ NIL;
    dH.recvBulkData ¬ NIL;
    BEGIN
      IF dH.stream # NIL THEN {
        -- First try the existing stream.  On any failure the stream is smashed and control
        -- continues with the fresh-connection attempt that follows this statement.
        ENABLE XNSStream.ConnectionClosed, CrRPC.Error, IO.Error => {
          SmashStream[dH];
          CONTINUE };
        [] ¬ AdvanceInput[h~h, wait~FALSE]; -- check for close
        SendCallMsg[];
        GOTO Done };
      { -- dH.stream = NIL --
        ENABLE {
          XNSStream.ConnectionClosed => {
            SmashStream[dH];
            ERROR CrRPC.Error[h, communicationFailure, IO.PutFR1["Stream failure: %g", [rope[text]]]] };
          IO.Error => {
            SmashStream[dH];
            ERROR CrRPC.Error[h, communicationFailure, "IO.Error"] };
          };
        ConnectToServer[];
        SendCallMsg[];
        GOTO Done };
      EXITS
        Done => NULL;
      END;
    -- At this point we have an open connection to the server and a call message has been sent successfully.
    BEGIN
      ENABLE {
        XNSStream.ConnectionClosed => {
          SmashStream[dH];
          ERROR CrRPC.Error[h, communicationFailure, IO.PutFR1["Stream failure: %g", [rope[text]]]] };
        IO.Error => {
          SmashStream[dH];
          ERROR CrRPC.Error[h, communicationFailure, "IO.Error"] };
        };
      SELECT TRUE FROM
        (dH.sendBulkData # NIL) => {
          -- Bulk data transfer from client to server.
          IF ClientRdyBDTSend[].ok THEN {
            XNSStream.SetSSType[dH.stream, XNSSPPTypes.bulkDataSST];
            IF dH.sendBulkData[h, dH.stream, CheckAbortSend].abort
              THEN XNSStream.SendAttention[dH.stream, 1] -- ???? Define 1 ????
              ELSE XNSStream.SendEndOfMessage[dH.stream];
            XNSStream.SetSSType[dH.stream, XNSSPPTypes.defaultSST];
            };
          };
        (dH.recvBulkData # NIL) => {
          -- Bulk data transfer from server to client.
          ENABLE {
            IO.EndOfStream => {
              SendCloseStream[dH];
              ERROR CrRPC.Error[h, bulkDataError, "end of stream"] };
            RuntimeError.BoundsFault => {
              SendCloseStream[dH];
              ERROR CrRPC.Error[h, bulkDataError, "bulk data value bounds fault"] };
            };
          IF ClientRdyBDTRecv[].ok THEN {
            -- There's bulk data (or an attention) in the stream.
            IF dH.recvBulkData[h, dH.stream, CheckAbortRecv].abort
              THEN XNSStream.SendAttention[dH.stream, 1]; -- ???? Define 1 ????
            };
          };
        ENDCASE => {
          -- No bulk data transfer in either direction.
          NULL;
          };
      {
        ENABLE {
          IO.EndOfStream => {
            SendCloseStream[dH];
            ERROR CrRPC.Error[h, resultsError, "end of stream"] };
          RuntimeError.BoundsFault => {
            SendCloseStream[dH];
            ERROR CrRPC.Error[h, resultsError, "result value bounds fault"] };
          };
        ClientRdySysRecv[];
        ReadResultMsg[];
        -- No further AdvanceInput call is made here: ReadResultMsg has already advanced the
        -- input on every path that returns normally, and a second call could consume stream
        -- status that belongs to the next call on this handle.
        };
      END;
    END;
  }; -- of Call
Server Process
Notes:
Any ERROR — CrRPC.Error, XNSStream.ConnectionClosed, IO.Error, IO.EndOfStream, RuntimeError.BoundsFault — eventually causes the transport to be discarded without attempting to serve any more calls on it.
ServerRdySysRecv: PROC [h: Handle] ~ {
  -- Prepare to receive a system message.  Leave dH.inputFlushNeeded = TRUE.
  -- Input is advanced (waiting) until the stream state is open; any other state is reset and
  -- skipped.  A nonzero subsequence type at that point is a protocol error.
  -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed
  dH: DataHandle ~ NARROW[h.data];
  state: XNSStream.State;
  sst: XNSStream.SubSequenceType;
  [state, sst] ¬ AdvanceInput[h~h, wait~TRUE];
  WHILE state # open DO
    [] ¬ XNSStream.GetStatus[self~dH.stream, reset~TRUE];
    [state, sst] ¬ AdvanceInput[h~h, wait~TRUE];
    ENDLOOP;
  IF sst # 0 THEN ERROR CrRPC.Error[h, protocolError, "sys sst # 0"];
  dH.inputFlushNeeded ¬ TRUE;
  };
ServerRdyBDTRecv: PROC [h: Handle] ~ {
  -- Prepare to receive bulk data: (try to) check whether the other side has sent us anything
  -- besides bulk data, closed the stream, etc.  In the normal case bulk data arrives on the stream.
  -- An attention is accepted as is.  For open or endOfMessage a zero subsequence type means the
  -- bulk data is missing; otherwise the handle is marked as needing an input flush.
  -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed
  dH: DataHandle ~ NARROW[h.data];
  state: XNSStream.State;
  sst: XNSStream.SubSequenceType;
  [state, sst] ¬ AdvanceInput[h~h, wait~TRUE];
  IF state = attention THEN RETURN;
  IF (state # open) AND (state # endOfMessage) THEN ERROR;
  IF sst = 0 THEN ERROR CrRPC.Error[h, protocolError, "missing bulk data"];
  dH.inputFlushNeeded ¬ TRUE;
  };
ServerRdyBDTSend: PROC [h: Handle] ~ {
  -- Prepare to send bulk data: (try to) check whether the other side has closed the stream or
  -- aborted the transfer.  In the normal case nothing arrives on the stream.  If an attention
  -- arrives, it's an abort request from the other end, so we let the bulk data transfer proceed
  -- (only to be aborted immediately).
  -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed
  dH: DataHandle ~ NARROW[h.data];
  state: XNSStream.State;
  sst: XNSStream.SubSequenceType;
  [state, sst] ¬ AdvanceInput[h~h, wait~FALSE];
  IF state = attention THEN RETURN;
  IF state # open THEN ERROR;
  -- Stream is open: any pending input means the client is sending when it should be receiving.
  IF MyCharsAvail[self~dH.stream, wait~FALSE] # 0 THEN
    ERROR CrRPC.Error[h, protocolError, "client send to bdt source"];
  };
BeginReturn: CrRPC.BeginReturnProc
-- [h: Handle]
~ {
  -- Write the header of a Courier return message (transaction id zero) on the handle's stream.
  -- Nothing else is written and end of message is not sent here.
  -- ERRORS: XNSStream.ConnectionClosed, IO.Error
  dH: DataHandle ~ NARROW[h.data];
  hdr: CrRPCBackdoor.ReturnMsgHdr ¬ [
    msgHdr~[msgType~CrRPCBackdoor.returnMsgType],
    returnHdr~[tID~[hi~0, lo~0]]];
  TRUSTED {
    IO.UnsafePutBlock[
      self~dH.stream,
      block~[base~LOOPHOLE[LONG[@hdr]], startIndex~0, count~CrRPCBackdoor.returnMsgHdrBytes]];
    };
  };
BeginError: CrRPC.BeginErrorProc
-- [h: Handle, errNum: CARDINAL]
~ {
  -- Write the header of a Courier abort message carrying errNum (transaction id zero) on the
  -- handle's stream.  Nothing else is written and end of message is not sent here.
  -- ERRORS: XNSStream.ConnectionClosed, IO.Error
  dH: DataHandle ~ NARROW[h.data];
  hdr: CrRPCBackdoor.AbortMsgHdr ¬ [
    msgHdr~[msgType~CrRPCBackdoor.abortMsgType],
    abortHdr~[tID~[0, 0], errNum~HfC[errNum]]];
  TRUSTED {
    IO.UnsafePutBlock[
      self~dH.stream,
      block~[base~LOOPHOLE[LONG[@hdr]], startIndex~0, count~CrRPCBackdoor.abortMsgHdrBytes]];
    };
  };
BeginReject: CrRPC.BeginRejectProc
-- [h: Handle, rejectReason: CARDINAL]
~ {
  -- Write the header of a Courier reject message carrying rejectReason (transaction id zero) on
  -- the handle's stream.  Nothing else is written and end of message is not sent here.
  -- ERRORS: XNSStream.ConnectionClosed, IO.Error
  dH: DataHandle ~ NARROW[h.data];
  hdr: CrRPCBackdoor.RejectMsgHdr ¬ [
    msgHdr~[msgType~CrRPCBackdoor.rejectMsgType],
    rejectHdr~[tID~[0, 0], rejectReason~HfC[rejectReason]]];
  TRUSTED {
    IO.UnsafePutBlock[
      self~dH.stream,
      block~[base~LOOPHOLE[LONG[@hdr]], startIndex~0, count~CrRPCBackdoor.rejectMsgHdrBytes]];
    };
  };
ListenerProc: XNSStream.ListenerProc
-- [stream: IO.STREAM, remote: XNS.Address]
~ {
  -- Worker run for each incoming connection (registered by CreateListener).
  -- Exchanges Courier version numbers, builds a server handle around the stream, then serves
  -- call messages in a loop until an error occurs or a continuation takes the stream away.
  -- Exit labels:
  --   Close: send a close on the stream (ignoring ConnectionClosed), then abort-close it.
  --   Out: the connection is already closed; just abort-close the stream.
  --   Closed, Smashed: nothing is done to the stream here.
  sessionHdr: CrRPCBackdoor.SessionHdr;
  callMsgHdr: CrRPCBackdoor.CallMsgHdr;
  desiredPgm: CARD32;
  desiredPgmVersion: CARD16;
  desiredProc: CARD16;
  serverProc: CrRPC.ServerProc;
  nBytes: INT;
  h: Handle;
  dH: DataHandle;
  BEGIN
    ENABLE {
      XNSStream.ConnectionClosed => GOTO Out;
      -- IO.EndOfStream is included so that a premature end of input (for example while a server
      -- proc is reading its arguments) discards the transport like the other errors listed here,
      -- instead of escaping from the listener with the stream left open.
      CrRPC.Error, IO.Error, IO.EndOfStream, XNSStream.Timeout, RuntimeError.BoundsFault => GOTO Close;
      };
    -- Exchange version numbers
    TRUSTED {
      nBytes ¬ IO.UnsafeGetBlock[stream, [base~LOOPHOLE[LONG[@sessionHdr]], count~CrRPCBackdoor.sessionHdrBytes]]};
    IF nBytes # CrRPCBackdoor.sessionHdrBytes THEN GOTO Close;
    IF (CfH[sessionHdr.lowVersion] > CrRPCBackdoor.courierVersionNum)
      OR (CfH[sessionHdr.highVersion] < CrRPCBackdoor.courierVersionNum) THEN {
      GOTO Close };
    -- Reply with exactly our version as both ends of the range.
    sessionHdr.lowVersion ¬ sessionHdr.highVersion ¬ HfC[CrRPCBackdoor.courierVersionNum];
    TRUSTED { IO.UnsafePutBlock[stream,
      [base~LOOPHOLE[LONG[@sessionHdr]], count~CrRPCBackdoor.sessionHdrBytes]] };
    XNSStream.SendNow[stream];
    -- Create server handle
    dH ¬ NEW[DataObject ¬ [lastUsed~, stream~stream, remote~remote]];
    h ¬ NEW[CrRPC.Object ¬ [class~$SPP, kind~server, procs~theServerProcs, data~dH, clientData~NIL]];
    -- Process calls until timeout or unrecoverable error
    DO
      -- Get message; make sure it's a call, get desired program, version and proc number
      ServerRdySysRecv[h];
      TRUSTED { nBytes ¬ IO.UnsafeGetBlock[stream,
        [base~LOOPHOLE[LONG[@callMsgHdr]], startIndex~0, count~CrRPCBackdoor.callMsgHdrBytes]]};
      IF (nBytes # CrRPCBackdoor.callMsgHdrBytes)
        OR (callMsgHdr.msgHdr.msgType # CrRPCBackdoor.callMsgType) THEN {
        GOTO Close };
      desiredPgm ¬ CfF[callMsgHdr.callHdr.pgmNum];
      desiredPgmVersion ¬ CfH[callMsgHdr.callHdr.pgmVersion];
      desiredProc ¬ CfH[callMsgHdr.callHdr.procNum];
      serverProc ¬ CrRPCBackdoor.LookUpServerProc[desiredPgm, desiredPgmVersion];
      SELECT serverProc FROM
        NIL => -- Don't have server, reject the call -- {
          rejectMsgHdr: CrRPCBackdoor.RejectMsgHdr ¬ [
            msgHdr~[msgType~CrRPCBackdoor.rejectMsgType],
            rejectHdr~[tID~[0,0], rejectReason~HfC[CrRPC.noSuchProgram]]
            ];
          TRUSTED { IO.UnsafePutBlock[stream,
            [base~LOOPHOLE[LONG[@rejectMsgHdr]], startIndex~0, count~CrRPCBackdoor.rejectMsgHdrBytes]] };
          XNSStream.SendEndOfMessage[stream] };
        ENDCASE => -- Serve the call -- {
          serverProc[h, stream, desiredPgm, desiredPgmVersion, desiredProc, BeginReturn, BeginError, BeginReject];
          IF dH.stream = NIL THEN GOTO Smashed; -- client caught error
          XNSStream.SendEndOfMessage[stream];
          IF dH.continuation # NIL THEN {
            -- A continuation was registered through SetContinuation: hand it the stream.
            -- NIL back means the stream is gone; a different stream is not supported here.
            newStream: IO.STREAM
              ¬ dH.continuation[dH.stream, dH.continuationClientData];
            IF newStream = NIL THEN GOTO Closed;
            IF newStream # dH.stream THEN GOTO Smashed;
            };
          };
      ENDLOOP;
    END;
  EXITS
    Close => {
      [] ¬ XNSStream.SendClose[stream
        ! XNSStream.ConnectionClosed => CONTINUE];
      IO.Close[self~stream, abort~TRUE];
      };
    Out => {
      IO.Close[self~stream, abort~TRUE];
      };
    Closed, Smashed => NULL;
  };
-- One record per socket on which CreateListener has started a listener.
ListenerValue: TYPE ~ REF ListenerValueObject;
ListenerValueObject: TYPE ~ RECORD [
-- Next record in the list headed by the listeners variable.
next: ListenerValue,
-- The socket being listened on; CreateListener searches the list by this field.
socket: XNS.Socket,
-- The listener object returned by XNSStream.CreateListener.
listener: XNSStream.Listener
];
-- Head of the list of active listeners; new records are pushed on the front by CreateListener.
listeners: ListenerValue ¬ NIL;
CreateListener: ENTRY CrRPCBackdoor.CreateListenerProc
-- [local: CrRPC.RefAddress]
~ {
  -- Start a Courier listener on the socket named by local, unless one already exists.
  -- local may be NIL, a REF XNS.Address (its socket field is used) or a REF XNS.Socket; any other
  -- type raises CrRPC.Error with reason addressInappropriateForClass.  An unknown socket is
  -- replaced by XNSWKS.courier.
  -- This is an ENTRY procedure, so the monitor lock must be given up on every error exit:
  -- RETURN WITH ERROR releases it before the error is raised, and the UNWIND catch phrase
  -- releases it if XNSStream.CreateListener (or anything else called here) raises an error.
  ENABLE UNWIND => NULL;
  newListener: XNSStream.Listener;
  socket: XNS.Socket ¬ XNS.unknownSocket;
  IF local # NIL THEN WITH local SELECT FROM
    rA: REF XNS.Address => socket ¬ rA.socket;
    rS: REF XNS.Socket => socket ¬ rS­;
    ENDCASE => RETURN WITH ERROR CrRPC.Error[NIL, addressInappropriateForClass, NIL];
  IF socket = XNS.unknownSocket THEN socket ¬ XNSWKS.courier;
  -- Nothing to do if a listener was already created for this socket.
  FOR p: ListenerValue ¬ listeners, p.next WHILE p # NIL DO
    IF p.socket = socket THEN RETURN;
    ENDLOOP;
  newListener ¬ XNSStream.CreateListener[socket~socket, worker~ListenerProc,
    getTimeout~XNSStream.waitForever, putTimeout~XNSStream.waitForever];
  listeners ¬ NEW[ListenerValueObject ¬
    [next~listeners, socket~socket, listener~newListener]];
  };
I/O and Marshalling
-- AdvanceInput returns the stream state and subsequence type found once the input has been advanced.
-- It returns when: the state is open (at once if wait is FALSE, otherwise once MyCharsAvail reports
-- input), the state is attention, or a second endOfMessage is seen after a break.
-- If the subsequence type is endSST or endReplySST the stream is closed with a close reply and
-- CrRPC.Error[protocolError] is raised.
AdvanceInput: PROC [h: Handle, wait: BOOL ¬ FALSE]
RETURNS [state: XNSStream.State, ssType: XNSStream.SubSequenceType] ~ {
Advance input stream to next significant piece of input, checking for remote close and flushing unread input if needed.
ERRORS: XNSStream.ConnectionClosed, CrRPC.Error
dH: DataHandle ~ NARROW[h.data];
-- Set once this call has passed over an endOfMessage or ssTypeChange status.
sawBreak: BOOL ¬ FALSE;
-- Discard whatever is left unread of the previous message, if the handle was marked so.
IF dH.inputFlushNeeded THEN {
[] ¬ XNSStream.FlushInput[self~dH.stream, wait~TRUE];
dH.inputFlushNeeded ¬ FALSE };
DO
-- Peek at the status without resetting it, so that a RETURN leaves it for the caller to see.
[state~state, ssType~ssType] ¬ XNSStream.GetStatus[self~dH.stream, reset~FALSE];
IF (ssType = XNSSPPTypes.endSST) OR (ssType = XNSSPPTypes.endReplySST) THEN {
ReplyCloseStream[dH];
ERROR CrRPC.Error[h, protocolError, "remote close"] };
SELECT state FROM
open => {
IF NOT wait THEN RETURN;
IF MyCharsAvail[self~dH.stream, wait~TRUE] > 0 THEN RETURN;
-- Nothing readable yet: look at the status again (the reset at the loop bottom is skipped).
LOOP };
attention => RETURN;
endOfMessage => {
IF sawBreak THEN RETURN;
sawBreak ¬ TRUE };
ssTypeChange => {
sawBreak ¬ TRUE };
ENDCASE => ERROR;
-- Reached for endOfMessage and ssTypeChange only: reset the status to step past it.
[] ¬ XNSStream.GetStatus[self~dH.stream, reset~TRUE];
ENDLOOP;
};
SmashStream: PROC [dH: DataHandle] ~ {
  -- Abort-close the handle's stream, if any, without notifying the remote end, and forget it.
  -- ERRORS: none.
  IF dH.stream = NIL THEN RETURN;
  IO.Close[self~dH.stream, abort~TRUE];
  dH.stream ¬ NIL;
  };
SendCloseStream: PROC [dH: DataHandle] ~ {
  -- Send a close on the handle's stream, if any, then abort-close the stream and forget it.
  -- XNSStream.ConnectionClosed raised while sending the close is ignored.
  -- ERRORS: none.
  IF dH.stream = NIL THEN RETURN;
  [] ¬ XNSStream.SendClose[dH.stream
    ! XNSStream.ConnectionClosed => CONTINUE];
  IO.Close[self~dH.stream, abort~TRUE];
  dH.stream ¬ NIL;
  };
ReplyCloseStream: PROC [dH: DataHandle] ~ {
  -- Send a close reply on the handle's stream, if any, then abort-close the stream and forget it.
  -- XNSStream.ConnectionClosed and XNSStream.Timeout raised while sending the reply are ignored.
  -- ERRORS: none.
  IF dH.stream = NIL THEN RETURN;
  [] ¬ XNSStream.SendCloseReply[dH.stream
    ! XNSStream.ConnectionClosed, XNSStream.Timeout => CONTINUE];
  IO.Close[self~dH.stream, abort~TRUE];
  dH.stream ¬ NIL;
  };
ErrorPutBulkDataSource: CrRPC.PutBulkDataXferProcProc
-- [h: Handle, s: STREAM, proc: BulkDataXferProc]
~ {
  -- Server proc table entry: putting a bulk data source is a client-only operation.
  ERROR CrRPC.Error[h, notClientHandle, "putBDTSource on server handle"];
  };
PutBulkDataSource: CrRPC.PutBulkDataXferProcProc
-- [h: Handle, s: STREAM, proc: BulkDataXferProc]
~ {
  -- Marshal a bulk data source descriptor onto s.
  -- With a transfer proc the descriptor is "immediate" and the proc is remembered in the handle so
  -- that Call can run the client-to-server transfer.  Without one the descriptor is "null" and the
  -- handle is left untouched.
  dH: DataHandle ~ NARROW[h.data];
  IF proc # NIL THEN {
    CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate]];
    TRUSTED { dH.sendBulkData ¬ proc };
    RETURN;
    };
  CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.null]];
  };
ErrorPutBulkDataSink: CrRPC.PutBulkDataXferProcProc
-- [h: Handle, s: STREAM, proc: BulkDataXferProc]
~ {
  -- Server proc table entry: putting a bulk data sink is a client-only operation.
  ERROR CrRPC.Error[h, notClientHandle, "putBDTSink on server handle"];
  };
PutBulkDataSink: CrRPC.PutBulkDataXferProcProc
-- [h: Handle, s: STREAM, proc: BulkDataXferProc]
~ {
  -- Marshal a bulk data sink descriptor onto s.
  -- With a transfer proc the descriptor is "immediate" and the proc is remembered in the handle so
  -- that Call can run the server-to-client transfer.  Without one the descriptor is "null" and the
  -- handle is left untouched.
  dH: DataHandle ~ NARROW[h.data];
  IF proc # NIL THEN {
    CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate]];
    TRUSTED { dH.recvBulkData ¬ proc };
    RETURN;
    };
  CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.null]];
  };
ErrorGetBulkDataSource: PROC [h: Handle, s: STREAM]
RETURNS[bulkDataSource: CrRPC.BulkDataSource] ~ {
  -- Client proc table entry: getting a bulk data source is a server-only operation.
  ERROR CrRPC.Error[h, notServerHandle, "GetBDTSource on client handle"];
  };
GetBulkDataSource: PROC [h: Handle, s: STREAM]
RETURNS[bulkDataSource: CrRPC.BulkDataSource] ~ {
  -- Unmarshal a bulk data source descriptor from s.
  -- null gives NIL; immediate gives ServerRecvBulkData; the third party forms (active, passive)
  -- are not implemented; any other type code is a protocol error.
  dH: DataHandle ~ NARROW[h.data];
  typeCode: CARDINAL ~ CrRPC.GetCard16[s];
  SELECT typeCode FROM
    ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate] => TRUSTED {
      bulkDataSource ¬ ServerRecvBulkData };
    ORD[CrRPCBackdoor.BulkDataDescriptorType.null] => {
      bulkDataSource ¬ NIL };
    ORD[CrRPCBackdoor.BulkDataDescriptorType.active],
    ORD[CrRPCBackdoor.BulkDataDescriptorType.passive] => {
      ERROR CrRPC.Error[h, notImplemented, "no third party bulk data"] };
    ENDCASE => {
      ERROR CrRPC.Error[h, protocolError, "bad bulkDataSource type"] };
  };
-- ServerRecvBulkData is the transfer proc handed out by GetBulkDataSource for an immediate descriptor.
-- It copies bulk data from the handle's transport stream to s in blocks of up to 1024 bytes.
-- abort is TRUE only when the remote end interrupted the transfer with an attention.
ServerRecvBulkData: CrRPC.BulkDataXferProc
[h: Handle, s: IO.STREAM, checkAbort: BulkDataCheckAbortProc]
RETURNS [abort: BOOL]
~ {
dH: DataHandle ~ NARROW[h.data];
bufSize: NAT ~ 1024;
b: REF TEXT ~ RefText.ObtainScratch[bufSize];
nBytes: INT;
state: XNSStream.State;
ssType: XNSSPPTypes.SubSequenceType;
abort ¬ FALSE;
ServerRdyBDTRecv[h];
DO
SELECT nBytes ¬ IO.GetBlock[self~dH.stream, block~b] FROM
0 => {
-- Nothing was read: fetch (and reset) the stream status to find out why.
[state~state, ssType~ssType] ¬ XNSStream.GetStatus[dH.stream, TRUE];
SELECT state FROM
attention => { abort ¬ TRUE; EXIT };
-- Normal end of the bulk data; the message is fully consumed, so no flush is needed.
endOfMessage => { dH.inputFlushNeeded ¬ FALSE; EXIT };
ssTypeChange => SELECT ssType FROM
0 => ERROR CrRPC.Error[h, protocolError, "bdt no eom"];
XNSSPPTypes.endSST, XNSSPPTypes.endReplySST =>
ERROR CrRPC.Error[h, remoteClose, "remote close"];
ENDCASE => NULL; -- ???? is this an error ????
open => NULL;
ENDCASE => ERROR;
};
ENDCASE => {
IO.PutBlock[self~s, block~b, count~nBytes];
};
-- Local abort request: notify the remote end with an attention.  abort is left FALSE on this path.
IF checkAbort[h].abort THEN {
XNSStream.SendAttention[dH.stream, 1]; -- Define 1 ????
EXIT };
ENDLOOP;
-- NOTE(review): the scratch buffer is released only on the normal exits above, not when an ERROR propagates.
RefText.ReleaseScratch[b];
};
ErrorGetBulkDataSink: PROC [h: Handle, s: STREAM]
RETURNS[bulkDataSink: CrRPC.BulkDataSink] ~ {
  -- Client proc table entry: getting a bulk data sink is a server-only operation.
  ERROR CrRPC.Error[h, notServerHandle, "GetBDTSink on client handle"];
  };
GetBulkDataSink: PROC [h: Handle, s: STREAM]
RETURNS[bulkDataSink: CrRPC.BulkDataSink] ~ {
  -- Unmarshal a bulk data sink descriptor from s.
  -- null gives NIL; immediate gives ServerSendBulkData; the third party forms (active, passive)
  -- are not implemented; any other type code is a protocol error.
  dH: DataHandle ~ NARROW[h.data];
  typeCode: CARDINAL ~ CrRPC.GetCard16[s];
  SELECT typeCode FROM
    ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate] => TRUSTED {
      bulkDataSink ¬ ServerSendBulkData };
    ORD[CrRPCBackdoor.BulkDataDescriptorType.null] => {
      bulkDataSink ¬ NIL };
    ORD[CrRPCBackdoor.BulkDataDescriptorType.active],
    ORD[CrRPCBackdoor.BulkDataDescriptorType.passive] => {
      ERROR CrRPC.Error[h, notImplemented, "no third party bulk data"] };
    ENDCASE => {
      ERROR CrRPC.Error[h, protocolError, "bad bulkDataSink type"] };
  };
-- ServerSendBulkData is the transfer proc handed out by GetBulkDataSink for an immediate descriptor.
-- It copies data from s to the handle's transport stream in blocks of up to 1024 bytes, sent under
-- the bulk data subsequence type; the default subsequence type is restored afterwards.
-- abort is TRUE only when the remote end interrupted the transfer.
ServerSendBulkData: CrRPC.BulkDataXferProc
[h: Handle, s: IO.STREAM, checkAbort: BulkDataCheckAbortProc]
RETURNS [abort: BOOL]
~ {
dH: DataHandle ~ NARROW[h.data];
bufSize: NAT ~ 1024;
b: REF TEXT ~ RefText.ObtainScratch[bufSize];
nBytes: INT;
state: XNSStream.State;
ssType: XNSSPPTypes.SubSequenceType;
abort ¬ FALSE;
ServerRdyBDTSend[h];
XNSStream.SetSSType[self~dH.stream, ssType~XNSSPPTypes.bulkDataSST];
{
DO
Check for abort from other side
[state~state, ssType~ssType] ¬ XNSStream.GetStatus[dH.stream, FALSE];
SELECT state FROM
open => NULL;
-- Any status other than open is taken as the remote end interrupting the transfer.
attention, endOfMessage, ssTypeChange => {
abort ¬ TRUE;
GOTO FinishWithEOM };
ENDCASE => ERROR;
Check for abort from this side
IF checkAbort[h].abort THEN {
GOTO FinishWithAttention };
Copy a block of data
SELECT nBytes ¬ IO.GetBlock[self~s, block~b] FROM
-- A zero-length read from s ends the transfer normally.
0 => {
GOTO FinishWithEOM };
ENDCASE => {
IO.PutBlock[self~dH.stream, block~b, count~nBytes];
};
ENDLOOP;
EXITS
-- Local abort: notify the remote end with an attention; abort is left FALSE on this path.
FinishWithAttention =>
XNSStream.SendAttention[dH.stream, 1]; -- Define 1 ????
FinishWithEOM =>
XNSStream.SendEndOfMessage[dH.stream];
};
-- NOTE(review): the two cleanup steps below are skipped when an ERROR propagates out of the loop.
RefText.ReleaseScratch[b];
XNSStream.SetSSType[self~dH.stream, ssType~XNSSPPTypes.defaultSST];
};
Bulk Data Streams
-- Segment kind codes of a bulk data stream.  WriteBulkDataSegment emits lastSegment for an empty
-- segment and nextSegment otherwise; ReadBulkDataStream stops after a segment marked lastSegment.
nextSegment: CARD16 ~ 0;
lastSegment: CARD16 ~ 1;
ReadBulkDataStream: CrRPC.ReadBulkDataStreamProc
-- [h: Handle, s: STREAM, checkAbort: BulkDataCheckAbortProc, readValue: BulkDataValueProc] RETURNS [abort: BOOL]
~ {
  -- Read a segmented bulk data stream from s.  Each segment is a kind word and a count word followed
  -- by that many values, each consumed by readValue.  Reading stops after a segment whose kind is
  -- lastSegment.  The result is TRUE only when readValue asked to abort; when checkAbort reports an
  -- abort the procedure stops early and returns FALSE.
  -- Stream and IO errors are converted to CrRPC.Error (raised with a NIL handle).
  ENABLE {
    XNSStream.ConnectionClosed => {
      ERROR CrRPC.Error[NIL, communicationFailure, IO.PutFR1["Stream failure: %g", [rope[text]]]] };
    IO.Error, IO.EndOfStream => {
      ERROR CrRPC.Error[NIL, communicationFailure, "IO.Error"] };
    RuntimeError.BoundsFault =>
      ERROR CrRPC.Error[NIL, bulkDataError, "bulk data value bounds fault"];
    };
  kind: CARD16;
  count: CARD16;
  DO
    -- Abort is polled once before each segment header and once before each value.
    IF checkAbort[h] THEN RETURN [FALSE];
    kind ¬ CrRPC.GetCard16[s];
    count ¬ CrRPC.GetCard16[s];
    THROUGH [1..count] DO
      IF checkAbort[h] THEN RETURN [FALSE];
      IF readValue[s].abort THEN RETURN [TRUE];
      ENDLOOP;
    IF kind = lastSegment THEN RETURN [FALSE];
    ENDLOOP;
  };
WriteBulkDataSegment: CrRPC.WriteBulkDataSegmentProc
-- [h: Handle, s: STREAM, checkAbort: BulkDataCheckAbortProc, writeValue: BulkDataValueProc, n: CARDINAL] RETURNS [abort: BOOL, heAborted: BOOL]
~ {
  -- Write one segment of a bulk data stream to s: a kind word (lastSegment when n is zero,
  -- nextSegment otherwise), the count n, then n values produced by writeValue.
  -- heAborted is set when checkAbort reports an abort; abort is set when writeValue asks to abort.
  -- Either one ends the segment early.
  -- Stream and IO errors are converted to CrRPC.Error (raised with a NIL handle).
  ENABLE {
    XNSStream.ConnectionClosed => {
      ERROR CrRPC.Error[NIL, communicationFailure, IO.PutFR1["Stream failure: %g", [rope[text]]]] };
    IO.Error, IO.EndOfStream => {
      ERROR CrRPC.Error[NIL, communicationFailure, "IO.Error"] };
    RuntimeError.BoundsFault =>
      ERROR CrRPC.Error[NIL, bulkDataError, "bulk data value bounds fault"];
    };
  heAborted ¬ FALSE;
  abort ¬ FALSE;
  CrRPC.PutCard16[s, (IF n > 0 THEN nextSegment ELSE lastSegment)];
  CrRPC.PutCard16[s, n];
  THROUGH [1..n] DO
    IF checkAbort[h] THEN { heAborted ¬ TRUE; RETURN };
    IF writeValue[s].abort THEN { abort ¬ TRUE; RETURN };
    ENDLOOP;
  };
Continuations
ErrorCallContinuation: CrRPC.CallContinuationProc ~ {
  -- Server proc table entry: calling a continuation is a client-only operation.
  ERROR CrRPC.Error[h, notClientHandle, "CallContinuation on server handle"];
  };
CallContinuation: CrRPC.CallContinuationProc
-- [h: Handle, proc: ContinuationProc, clientData: REF]
~ {
  -- Run a client-side continuation on the handle's transport stream.
  -- proc is applied to the current stream and clientData; a NIL proc stands for "keep the stream".
  -- The stream returned by proc replaces dH.stream.  Only NIL or the very same stream is
  -- supported; any other stream raises CrRPC.Error with reason notImplemented.
  -- Timeout, connection and IO errors raised inside proc smash the stream and are converted to
  -- CrRPC.Error; a bounds fault is converted without smashing the stream.
  dH: DataHandle ~ NARROW[h.data];
  resultStream: IO.STREAM;
  {
    ENABLE {
      XNSStream.Timeout => {
        SmashStream[dH];
        ERROR CrRPC.Error[h, communicationFailure, "Stream timeout"] };
      XNSStream.ConnectionClosed => {
        SmashStream[dH];
        ERROR CrRPC.Error[h, communicationFailure, IO.PutFR1["Stream failure: %g", [rope[text]]]] };
      IO.Error, IO.EndOfStream => {
        SmashStream[dH];
        ERROR CrRPC.Error[h, communicationFailure, "IO.Error"] };
      RuntimeError.BoundsFault =>
        ERROR CrRPC.Error[NIL, bulkDataError, "bulk data value bounds fault"];
      };
    IF proc = NIL
      THEN resultStream ¬ dH.stream
      ELSE resultStream ¬ proc[dH.stream, clientData];
    };
  IF (resultStream # NIL) AND (resultStream # dH.stream) THEN
    ERROR CrRPC.Error[NIL, notImplemented, NIL];
  dH.stream ¬ resultStream;
  };
ErrorSetContinuation: CrRPC.SetContinuationProc ~ {
  -- Client proc table entry: setting a continuation is a server-only operation.
  ERROR CrRPC.Error[h, notServerHandle, "SetContinuation on client handle"];
  };
SetContinuation: CrRPC.SetContinuationProc
-- [h: Handle, proc: ContinuationProc, clientData: REF]
~ {
  -- Record a continuation and its client data in the server handle.  ListenerProc invokes it after
  -- the reply to the current call has been sent.
  dH: DataHandle ~ NARROW[h.data];
  TRUSTED { dH.continuation ¬ proc };
  dH.continuationClientData ¬ clientData;
  };
Cache of Handles
We maintain a cache of recently used client handles with their transport intact. If a handle isn't used for a while, it times out, the transport is closed and the handle is destroyed.
-- Head of the list of cached handle records, linked through DataObject.next.
-- It is read and written only inside ENTRY procedures of this monitor.
cache: DataHandle ¬ NIL;
-- A cached handle unused for longer than this (8 seconds, expressed in clock pulses) is considered dead.
cachePulseOut: BasicTime.Pulses ~ BasicTime.MicrosecondsToPulses[8 * 1000000];
-- Pause between two passes of CacheSweeper (8 seconds, expressed in process ticks).
cacheSweepInterval: Process.Ticks ~ Process.SecondsToTicks[8];
PulsesSince: PROC [then: BasicTime.Pulses] RETURNS [BasicTime.Pulses] ~ {
  -- Number of clock pulses elapsed between then and the current clock reading.
  now: BasicTime.Pulses ~ BasicTime.GetClockPulses[];
  RETURN [now - then];
  };
CheckOutFromCache: ENTRY PROC [remote: XNS.Address] RETURNS [h: Handle ¬ NIL] ~ {
  -- Find the first cached record whose remote address equals remote, unlink it from the cache and
  -- answer its handle.  The result is NIL when no cached record matches.
  prev: DataHandle ¬ NIL;
  FOR cur: DataHandle ¬ cache, cur.next WHILE cur # NIL DO
    IF cur.remote = remote THEN {
      -- Unlink: the predecessor (or the list head) now skips over cur.
      IF prev = NIL THEN cache ¬ cur.next ELSE prev.next ¬ cur.next;
      h ¬ cur.handle;
      cur.handle ¬ NIL; -- for finalization
      cur.lastUsed ¬ BasicTime.GetClockPulses[];
      EXIT };
    prev ¬ cur;
    ENDLOOP;
  };
CheckInToCache: ENTRY PROC [h: Handle] ~ {
  -- Put the handle's data record on the front of the cache and stamp it with the current time.
  -- The request is ignored when:
  --   h is NIL, or its data is NIL (the handle has already been buried by BuryDeadHandle);
  --   the record is already checked in (its handle field is non-NIL).  Linking it a second time
  --   would make the record point to itself, and GetDeadHandles would then loop forever while
  --   holding the monitor lock.
  -- The UNWIND catch phrase makes sure the monitor lock is released if an error (for example a
  -- failed NARROW) propagates out of this ENTRY procedure.
  ENABLE UNWIND => NULL;
  dH: DataHandle;
  IF h = NIL THEN RETURN;
  dH ¬ NARROW[h.data];
  IF dH = NIL THEN RETURN;
  IF dH.handle # NIL THEN RETURN;
  dH.handle ¬ h;
  dH.lastUsed ¬ BasicTime.GetClockPulses[];
  dH.next ¬ cache;
  cache ¬ dH };
-- CacheSweeper is the body of the background process forked by the module start code.
-- It loops forever: collect the cached handles that have timed out, bury them, pause for cacheSweepInterval.
CacheSweeper: PROC ~ {
-- Under the monitor lock, split the cache into the records still fresh (kept, in their original
-- order) and the records unused for more than cachePulseOut (returned as deadList).
GetDeadHandles: ENTRY PROC RETURNS [deadList: DataHandle ¬ NIL] ~ {
dH, newCache, newCacheTail: DataHandle ¬ NIL;
WHILE cache # NIL DO
-- Pop the first record off the old cache list.
dH ¬ cache; cache ¬ cache.next;
IF PulsesSince[dH.lastUsed] <= cachePulseOut
THEN -- add to new cache -- {
dH.next ¬ NIL;
IF newCacheTail = NIL THEN newCache ¬ dH ELSE newCacheTail.next ¬ dH;
newCacheTail ¬ dH }
ELSE -- add to dead list -- {
dH.next ¬ deadList;
deadList ¬ dH };
ENDLOOP;
cache ¬ newCache;
};
-- Outside the monitor lock, close the transport of every record on deadList.
DestroyDeadHandles: PROC [deadList: DataHandle] ~ {
dH: DataHandle;
WHILE deadList # NIL DO
dH ¬ deadList; deadList ¬ dH.next;
BuryDeadHandle[dH.handle];
ENDLOOP;
};
DO
DestroyDeadHandles[ GetDeadHandles[] ];
Process.Pause[ cacheSweepInterval ];
ENDLOOP;
};
BuryDeadHandle: PROC [h: Handle] ~ {
  -- Dispose of a handle for good: close its transport stream (sending a close to the remote end)
  -- and cut both links between the handle and its data record.
  dH: DataHandle ~ NARROW[h.data];
  dH.handle ¬ NIL; -- break cycle for GC
  SendCloseStream[dH];
  h.data ¬ NIL;
  };
Handle Creation and Destruction
ErrorFinalize: PROC [h: Handle] ~ {
  -- Server proc table entry for finalize: always raises ERROR.
  ERROR;
  };
Finalize: PROC [h: Handle] ~ {
  -- A client has dropped the handle h, which is guaranteed not to be in the cache.
  -- A handle used within the last cachePulseOut pulses is renewed and put back in the cache;
  -- an older one is buried.  A handle that has already been buried (NIL data) is left alone.
  dH: DataHandle ~ NARROW[h.data];
  IF dH = NIL THEN RETURN; -- handle is already dead
  IF PulsesSince[dH.lastUsed] > cachePulseOut THEN {
    BuryDeadHandle[h];
    RETURN };
  CrRPCBackdoor.RenewClientObject[h];
  CheckInToCache[h];
  };
CreateClientHandle: CrRPCBackdoor.CreateClientHandleProc
-- [remote: CrRPC.RefAddress] RETURNS [Handle]
~ {
  -- Answer a client handle for the given remote address.
  -- remote must be a non-NIL REF XNS.Address, otherwise CrRPC.Error is raised with reason
  -- addressInappropriateForClass.  An unknown socket (or any socket, when alwaysSmashTheSocket is
  -- set) is replaced by XNSWKS.courier.  A cached handle for the address is reused when there is
  -- one; otherwise a new handle with no transport stream is created.
  cached: Handle;
  dH: DataHandle;
  addr: XNS.Address;
  IF remote = NIL THEN
    ERROR CrRPC.Error[NIL, addressInappropriateForClass, "missing remote address"];
  WITH remote SELECT FROM
    rA: REF XNS.Address => addr ¬ rA­;
    ENDCASE => ERROR CrRPC.Error[NIL, addressInappropriateForClass, NIL];
  IF ( alwaysSmashTheSocket ) OR ( addr.socket = XNS.unknownSocket )
    THEN addr.socket ¬ XNSWKS.courier;
  cached ¬ CheckOutFromCache[addr];
  IF cached # NIL THEN RETURN [cached];
  dH ¬ NEW[DataObject ¬ [lastUsed~BasicTime.GetClockPulses[], remote~addr]];
  RETURN [CrRPCBackdoor.NewClientObject[class~$SPP, procs~theClientProcs, data~dH]];
  };
-- Module start code: register this implementation as the $SPP class for both client handle
-- creation and listener creation, then fork the cache sweeper as a detached process.
CrRPCBackdoor.RegisterCreateClientHandleProc[$SPP, CreateClientHandle];
CrRPCBackdoor.RegisterCreateListenerProc[$SPP, CreateListener];
TRUSTED { Process.Detach[ FORK CacheSweeper[] ] };
}.