CrRPCSImpl.mesa
Copyright © 1986 by Xerox Corporation. All rights reserved.
Demers, November 21, 1986 3:01:24 pm PST
Interim Courier client runtime support for SPP-based Courier.
This depends on the size ratio BYTE::HWORD::FWORD being 1::2::4 but otherwise should work okay on left- or right-handed machines.
TODO:
Fix CheckAbort[] to actually do something!
ServerRecvBulkData isn't finished yet; also, I think it may screw up on some cases of aborting the transfer.
DIRECTORY
BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses],
CrRPC USING [BeginErrorProc, BeginRejectProc, BeginReturnProc, BulkDataCheckAbortProc, BulkDataSink, BulkDataSource, BulkDataXferProc, CallProc, ClientProcs, Error, ErrorReason, GetCARDINAL, Handle, invalidArgument, MarshallProcs, noSuchProcedure, noSuchProgram, noSuchVersion, Object, PutCARDINAL, ServerProc, ServerProcs],
CrRPCFriends USING [AbortHdr, abortMsgType, BulkDataDescriptorType, CallHdr, --callHdrBytes,-- callMsgType, courierVersionNum, CreateClientHandleProc, CreateListenerProc, LookUpServerProc, MsgHdr, --msgHdrBytes,-- NewClientObject, RegisterCreateClientHandleProc, RegisterCreateListenerProc, RejectHdr, rejectMsgType, ReturnHdr, returnMsgType, SessionHdr, sessionHdrBytes],
Endian USING [BYTE, bytesPerHWord, CardFromF, CardFromH, FFromCard, FWORD, HFromCard, HWORD],
IO USING [CharsAvail, Close, EndOfStream, GetBlock, PutBlock, STREAM, UnsafeBlock, UnsafeGetBlock, UnsafePutBlock],
PrincOpsUtils USING [LongCopy],
Process USING [Detach, Pause, SecondsToTicks, Ticks],
RefText USING [New, ObtainScratch],
Rope USING [ROPE],
XNS USING [Address, Socket, unknownSocket],
XNSSPPBuf USING [maxBodyBytes, maxBodyHWords],
XNSSPPTypes USING [endReplySST, endSST, SubSequenceType],
XNSStream USING [ConnectionClosed, Create, CreateListener, FlushInput, GetStatus, Listener, ListenerProc, Milliseconds, SendAttention, SendClose, SendCloseReply, SendEndOfMessage, SendNow, SetSSType, SetTimeouts, State, SubSequenceType, Timeout],
XNSWKS USING [courier];
CrRPCSImpl: CEDAR MONITOR
IMPORTS BasicTime, CrRPC, CrRPCFriends, Endian, IO, PrincOpsUtils, Process, RefText, XNSStream
~ {
Copied Types and Constants
-- Short local names for types and constants that are defined in other interfaces.
CARD: TYPE ~ LONG CARDINAL;
BYTE: TYPE ~ Endian.BYTE;
FWORD: TYPE ~ Endian.FWORD;
HWORD: TYPE ~ Endian.HWORD;
bytesPerWord: CARDINAL ~ Endian.bytesPerHWord; -- bytes in one HWORD

-- Conversions between local numbers and the Endian FWORD / HWORD representations.
-- Each one simply forwards to the corresponding Endian operation.
FfC: PROC [c: CARD] RETURNS [FWORD] ~ INLINE { RETURN [Endian.FFromCard[c]] };
HfC: PROC [c: CARDINAL] RETURNS [HWORD] ~ INLINE { RETURN [Endian.HFromCard[c]] };
CfF: PROC [f: FWORD] RETURNS [CARD] ~ INLINE { RETURN [Endian.CardFromF[f]] };
CfH: PROC [h: HWORD] RETURNS [CARDINAL] ~ INLINE { RETURN [Endian.CardFromH[h]] };

Handle: TYPE ~ CrRPC.Handle;

-- Capacity of one marshalling buffer, expressed in bytes and in HWORDs.
bufferBodyBytes: CARDINAL ~ XNSSPPBuf.maxBodyBytes;
bufferBodyHWords: CARDINAL ~ XNSSPPBuf.maxBodyHWords;
Handle Objects
-- Overlay used to address the body of a marshalling buffer as an array of HWORDs.
HWords: TYPE ~ MACHINE DEPENDENT RECORD [ hWords: ARRAY [0 .. bufferBodyHWords) OF HWORD ];
-- Per-handle private state, stored in the data field of a CrRPC handle.
-- The default initializers on stream, inputFlushNeeded, b and odd are required:
-- the record constructors in this module omit those fields.
DataHandle: TYPE ~ REF DataObject;
DataObject: TYPE ~ RECORD [
  next: DataHandle ← NIL, -- link used while the object sits on the handle cache list
  handle: Handle ← NIL, -- the owning handle; set while cached, cleared otherwise
  lastUsed: BasicTime.Pulses, -- clock reading taken when the handle was checked in
  stream: IO.STREAM ← NIL, -- transport stream; NIL when no connection is open
  inputFlushNeeded: BOOL ← FALSE, -- TRUE when AdvanceInput must flush unread input first
  remote: XNS.Address,
  timeoutMsec: INT,
  sendBulkData: CrRPC.BulkDataXferProc ← NIL, -- for client handles
  recvBulkData: CrRPC.BulkDataXferProc ← NIL, -- for client handles
  b: REF TEXT ← NIL, -- marshalling buffer
  hIndex: CARDINAL ← 0, -- index in HWORDs of next data, (in XNS (IDP) packet body)
  hLength: CARDINAL ← 0, -- length in HWORDs of buffer (XNS (IDP) packet body)
  odd: BOOL ← FALSE -- TRUE on odd bytes, for GetB / PutB
  ];
-- The single marshalling procedure vector shared by every handle created in this module.
-- HAlign serves as both the put-side and the get-side alignment procedure.
theMarshallProcs: REF CrRPC.MarshallProcs ~ NEW[ CrRPC.MarshallProcs ← [
  putB~PutB,
  putH~PutH,
  putF~PutF,
  unsafePutBlock~UnsafePutBlock,
  putBulkDataSource~PutBulkDataSource,
  putBulkDataSink~PutBulkDataSink,
  putHAlign~HAlign,
  getB~GetB,
  getH~GetH,
  getF~GetF,
  unsafeGetBlock~UnsafeGetBlock,
  getBulkDataSource~GetBulkDataSource,
  getBulkDataSink~GetBulkDataSink,
  getHAlign~HAlign
  ] ];
-- Procedure vector installed in client handles.
-- Destroying a client handle checks it in to the handle cache rather than closing it.
theClientProcs: REF CrRPC.ClientProcs ~ NEW[ CrRPC.ClientProcs ← [
  setRemote~SetRemote, setTimeout~SetTimeout, setHops~SetHops,
  destroy~CheckInToCache, finalize~Finalize,
  call~Call
  ] ];
-- Procedure vector installed in server handles; only getRemote is supplied here.
theServerProcs: REF CrRPC.ServerProcs ~ NEW[ CrRPC.ServerProcs ← [
  getRemote~GetRemote
  ] ];
Courier Messages
Call message (without program-specific body).
-- Fixed leading part of a call message: message header followed by call header.
CallMsgObject: TYPE ~ MACHINE DEPENDENT RECORD [
  msgHdr: CrRPCFriends.MsgHdr,
  callHdr: CrRPCFriends.CallHdr
  ];
CallMsg: TYPE ~ REF CallMsgObject;
-- SIZE counts machine words; this module uses that count directly as a count of HWORDs.
callMsgHWords: CARDINAL ~ SIZE[CallMsgObject];
callMsgBytes: CARDINAL ~ callMsgHWords*bytesPerWord; -- should be BYTES
Reject message (without program-specific body).
-- Fixed leading part of a reject message: message header followed by reject header.
RejectMsgObject: TYPE ~ MACHINE DEPENDENT RECORD [
  msgHdr: CrRPCFriends.MsgHdr,
  rejectHdr: CrRPCFriends.RejectHdr
  ];
RejectMsg: TYPE ~ REF RejectMsgObject;
-- SIZE counts machine words; this module uses that count directly as a count of HWORDs.
rejectMsgHWords: CARDINAL ~ SIZE[RejectMsgObject];
rejectMsgBytes: CARDINAL ~ rejectMsgHWords*bytesPerWord; -- should be BYTES
Return message (without program-specific body).
-- Fixed leading part of a return message: message header followed by return header.
ReturnMsgObject: TYPE ~ MACHINE DEPENDENT RECORD [
  msgHdr: CrRPCFriends.MsgHdr,
  returnHdr: CrRPCFriends.ReturnHdr
  ];
ReturnMsg: TYPE ~ REF ReturnMsgObject;
-- SIZE counts machine words; this module uses that count directly as a count of HWORDs.
returnMsgHWords: CARDINAL ~ SIZE[ReturnMsgObject];
returnMsgBytes: CARDINAL ~ returnMsgHWords*bytesPerWord; -- should be BYTES
Abort message (without program-specific body).
-- Fixed leading part of an abort message: message header followed by abort header.
AbortMsgObject: TYPE ~ MACHINE DEPENDENT RECORD [
  msgHdr: CrRPCFriends.MsgHdr,
  abortHdr: CrRPCFriends.AbortHdr
  ];
AbortMsg: TYPE ~ REF AbortMsgObject;
-- SIZE counts machine words; this module uses that count directly as a count of HWORDs.
abortMsgHWords: CARDINAL ~ SIZE[AbortMsgObject];
abortMsgBytes: CARDINAL ~ abortMsgHWords*bytesPerWord; -- should be BYTES
Client Call
Call: CrRPC.CallProc
-- [h: Handle, remotePgm: CARD, remotePgmVersion: CARDINAL, remoteProc: CARDINAL, putArgs: PutArgsProc, getResults: GetResultsProc, getError: GetErrorProc]
~ {
  -- Perform one remote call on client handle h.
  -- Steps: send the call message (reusing an open connection if there is one, otherwise
  -- connecting first), run any bulk data transfer that putArgs registered, then read and
  -- dispatch on the reply message.
  -- putArgs, getResults and getError may each be NIL, in which case they are skipped.
  -- Stream failures after the call has been sent surface as CrRPC.Error[communicationFailure].
  dH: DataHandle ~ NARROW[h.data];

  ConnectToServer: PROC ~ {
    -- Connect to the remote server. If the connect fails, dH.stream will be left NIL.
    -- The version-number pair is deliberately set to include more than one version so the server will be forced to return his version number pair immediately.
    -- ERRORS: XNSStream.ConnectionClosed, XNSStream.Timeout, CrRPC.Error.
    vLow: HWORD ~ HfC[CrRPCFriends.courierVersionNum - 1];
    vHigh: HWORD ~ HfC[CrRPCFriends.courierVersionNum];
    nRead: NAT;
    versionError: BOOL;
    sP: LONG POINTER TO CrRPCFriends.SessionHdr;
    IF dH.stream # NIL THEN ERROR;
    dH.stream ← XNSStream.Create[
      remote~dH.remote,
      getTimeout~dH.timeoutMsec, putTimeout~dH.timeoutMsec];
    TRUSTED {
      -- Overlay a session header on the body of the marshalling buffer.
      sP ← LOOPHOLE[LOOPHOLE[dH.b, LONG POINTER] + SIZE[TEXT[0]]];
      sP^ ← [lowVersion~vLow, highVersion~vHigh] };
    dH.b.length ← CrRPCFriends.sessionHdrBytes;
    [] ← IO.PutBlock[dH.stream, dH.b, 0, CrRPCFriends.sessionHdrBytes];
    XNSStream.SendNow[dH.stream];
    nRead ← IO.GetBlock[dH.stream, dH.b, 0, CrRPCFriends.sessionHdrBytes];
    dH.inputFlushNeeded ← FALSE; -- ???? Is there EOM on version number pair ????
    -- The reply was read back into the same buffer, so sP now addresses the server's pair.
    -- Compare in host representation against the version number itself, exactly as
    -- ListenerProc does, rather than against the already-converted HWORD vHigh.
    TRUSTED { versionError ← (nRead # CrRPCFriends.sessionHdrBytes)
      OR (CfH[sP.lowVersion] > CrRPCFriends.courierVersionNum)
      OR (CfH[sP.highVersion] < CrRPCFriends.courierVersionNum) };
    IF versionError THEN {
      SendCloseStream[dH];
      ERROR CrRPC.Error[h, courierVersionMismatch, "version exchange error"];
      };
    };

  SendCallMsg: PROC ~ {
    -- Send the call message.
    -- ERRORS: XNSStream.ConnectionClosed, XNSStream.Timeout
    TRUSTED {
      -- Build the fixed call header in place at the start of the buffer body.
      cP: LONG POINTER TO CallMsgObject ~
        LOOPHOLE[LOOPHOLE[dH.b, LONG POINTER] + SIZE[TEXT[0]]];
      cP^ ← [
        msgHdr~[msgType~CrRPCFriends.callMsgType],
        callHdr~[
          tID~HfC[0],
          pgmNum~FfC[remotePgm],
          pgmVersion~HfC[remotePgmVersion],
          procNum~HfC[remoteProc]
          ]
        ];
      };
    -- Arguments are marshalled directly after the header.
    dH.hIndex ← callMsgHWords;
    dH.hLength ← bufferBodyHWords;
    IF putArgs # NIL THEN putArgs[h];
    PutBuf[dH];
    XNSStream.SendEndOfMessage[dH.stream];
    };

  ReadResultMsg: PROC ~ {
    -- Read the reply and dispatch on its message type.
    -- ERRORS: CrRPC.Error, XNSStream.Timeout, XNSStream.ConnectionClosed
    ENABLE IO.EndOfStream => {
      ERROR CrRPC.Error[h, resultsTooShort, "results too short"] };
    msgType: CARDINAL;
    GetBuf[dH];
    msgType ← CrRPC.GetCARDINAL[h];
    SELECT msgType FROM
      CrRPCFriends.rejectMsgType => {
        rejectReason: CARDINAL;
        [] ← GetH[h]; -- skip the HWORD that follows the message type
        rejectReason ← CrRPC.GetCARDINAL[h];
        [] ← AdvanceInput[h~h, wait~FALSE];
        ERROR CrRPC.Error[h,
          (SELECT rejectReason FROM
            CrRPC.noSuchProgram => rejectedNoSuchProgram,
            CrRPC.noSuchVersion => rejectedNoSuchVersion,
            CrRPC.noSuchProcedure => rejectedNoSuchProcedure,
            CrRPC.invalidArgument => rejectedInvalidArgument,
            ENDCASE => rejectedUnspecified),
          "rejected"];
        };
      CrRPCFriends.returnMsgType => {
        [] ← GetH[h]; -- skip the HWORD that follows the message type
        IF getResults # NIL THEN getResults[h];
        [] ← AdvanceInput[h~h, wait~FALSE];
        };
      CrRPCFriends.abortMsgType => {
        errNum: CARDINAL;
        [] ← GetH[h]; -- skip the HWORD that follows the message type
        errNum ← CrRPC.GetCARDINAL[h];
        IF getError # NIL THEN getError[h, errNum];
        [] ← AdvanceInput[h~h, wait~FALSE];
        -- Reached only if getError is NIL or returns normally.
        ERROR CrRPC.Error[h, remoteError, "unexpected remote error"] };
      ENDCASE => {
        SendCloseStream[dH];
        ERROR CrRPC.Error[h, protocolError, "bad msgType in response"] };
    };

  ClientRdySysRecv: PROC ~ {
    -- Prepare to receive a system message. Leave dH.inputFlushNeeded = TRUE.
    -- ERRORS: CrRPC.Error, XNSStream.Timeout, XNSStream.ConnectionClosed
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    DO
      [state, sst] ← AdvanceInput[h~h, wait~TRUE];
      IF state = open THEN EXIT;
      [] ← XNSStream.GetStatus[self~dH.stream, reset~TRUE];
      ENDLOOP;
    IF sst # 0 THEN ERROR CrRPC.Error[h, protocolError, "sys sst # 0"];
    dH.inputFlushNeeded ← TRUE;
    };

  ClientRdyBDTSend: PROC RETURNS [ok: BOOL] ~ {
    -- Prepare to send bulk data: (try to) check whether the other side has sent us a reject / abort message, closed the stream, etc. In the normal case nothing arrives on the stream. If data arrives, it's a reject message from the other end, and the bulk data transfer should not be attempted. If an attention arrives, it's an abort request from the other end, so we let the bulk data transfer proceed (only to be aborted immediately).
    -- ERRORS: CrRPC.Error, XNSStream.Timeout, XNSStream.ConnectionClosed
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    [state, sst] ← AdvanceInput[h~h, wait~FALSE];
    SELECT state FROM
      open => {
        ok ← (IO.CharsAvail[self~dH.stream, wait~FALSE] = 0) };
      attention => {
        ok ← TRUE };
      ENDCASE => ERROR;
    };

  ClientRdyBDTRecv: PROC RETURNS [ok: BOOL] ~ {
    -- Prepare to receive bulk data: (try to) check whether the other side has sent us a reject / abort message, closed the stream, etc. In the normal case bulk data arrives on the stream.
    -- ERRORS: CrRPC.Error, XNSStream.Timeout, XNSStream.ConnectionClosed
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    [state, sst] ← AdvanceInput[h~h, wait~TRUE];
    SELECT state FROM
      open, endOfMessage => {
        -- A zero subsequence type means a system message arrived instead of bulk data.
        ok ← (sst # 0);
        dH.inputFlushNeeded ← ok };
      attention => {
        ok ← TRUE };
      ENDCASE => ERROR;
    };

  CheckAbortRecv: CrRPC.BulkDataCheckAbortProc ~ {
    -- Report an abort when the stream is in the attention state.
    [] ← IO.CharsAvail[self~dH.stream, wait~TRUE];
    RETURN [ XNSStream.GetStatus[self~dH.stream, reset~FALSE].state = attention ] };

  CheckAbortSend: CrRPC.BulkDataCheckAbortProc ~ {
    -- Report an abort when the stream has left the open state or any input has arrived.
    RETURN[ (XNSStream.GetStatus[self~dH.stream, reset~FALSE].state # open)
      OR (IO.CharsAvail[self~dH.stream, wait~FALSE] > 0) ] };

  -- Logic of Courier call begins here:
  BEGIN
  dH.sendBulkData ← NIL;
  dH.recvBulkData ← NIL;
  BEGIN
    IF dH.stream # NIL THEN {
      -- Try the existing connection first. On any failure drop the stream and
      -- fall through to the fresh-connection case below.
      ENABLE XNSStream.ConnectionClosed, XNSStream.Timeout, CrRPC.Error => {
        IF dH.stream # NIL THEN { IO.Close[dH.stream]; dH.stream ← NIL };
        CONTINUE };
      [] ← AdvanceInput[h~h, wait~FALSE]; -- check for close
      SendCallMsg[];
      GOTO Done };
    { -- dH.stream = NIL --
      ENABLE XNSStream.ConnectionClosed, XNSStream.Timeout => {
        IF dH.stream # NIL THEN { IO.Close[dH.stream]; dH.stream ← NIL };
        ERROR CrRPC.Error[h, communicationFailure, "communication failure"] };
      ConnectToServer[];
      SendCallMsg[];
      GOTO Done };
    EXITS
      Done => NULL;
    END;
  -- At this point we have an open connection to the server and a call message has been sent successfully.
  BEGIN
    ENABLE {
      XNSStream.ConnectionClosed, XNSStream.Timeout => {
        IF dH.stream = NIL THEN ERROR; -- can't happen
        IO.Close[dH.stream]; dH.stream ← NIL;
        ERROR CrRPC.Error[h, communicationFailure, "communication failure"] };
      };
    SELECT TRUE FROM
      (dH.sendBulkData # NIL) => {
        -- Bulk data transfer from client to server.
        IF ClientRdyBDTSend[].ok THEN {
          XNSStream.SetSSType[dH.stream, 1]; -- ???? Define 1 ????
          IF dH.sendBulkData[h, dH.stream, CheckAbortSend].abort
            THEN XNSStream.SendAttention[dH.stream, 1] -- ???? Define 1 ????
            ELSE XNSStream.SendEndOfMessage[dH.stream];
          XNSStream.SetSSType[dH.stream, 0]; -- ???? Define 0 ????
          };
        };
      (dH.recvBulkData # NIL) => {
        -- Bulk data transfer from server to client.
        IF ClientRdyBDTRecv[].ok THEN {
          -- There's bulk data (or an attention) in the stream.
          IF dH.recvBulkData[h, dH.stream, CheckAbortRecv].abort
            THEN XNSStream.SendAttention[dH.stream, 1]; -- ???? Define 1 ????
          };
        };
      ENDCASE => {
        -- No bulk data transfer in either direction.
        NULL;
        };
    ClientRdySysRecv[];
    ReadResultMsg[];
    [] ← AdvanceInput[h~h, wait~FALSE]; -- done by ReadResultMsg
    END;
  END;
  }; -- of Call
Server Process
-- Timeouts handed to XNSStream.CreateListener (as getTimeout and putTimeout) by CreateListener.
serverGetTimeout: XNSStream.Milliseconds ~ 90000;
serverPutTimeout: XNSStream.Milliseconds ~ 20000;
ServerRdySysRecv: PROC [h: Handle] ~ {
  -- Get the server side ready to read a system message.
  -- Advances the input repeatedly, resetting the stream status each time, until the
  -- stream reports the open state. A nonzero subsequence type at that point is a
  -- protocol error. On return dH.inputFlushNeeded = TRUE.
  -- ERRORS: CrRPC.Error, XNSStream.Timeout, XNSStream.ConnectionClosed
  dH: DataHandle ~ NARROW[h.data];
  curState: XNSStream.State;
  curSST: XNSStream.SubSequenceType;
  [curState, curSST] ← AdvanceInput[h~h, wait~TRUE];
  WHILE curState # open DO
    [] ← XNSStream.GetStatus[self~dH.stream, reset~TRUE];
    [curState, curSST] ← AdvanceInput[h~h, wait~TRUE];
    ENDLOOP;
  IF curSST # 0 THEN ERROR CrRPC.Error[h, protocolError, "sys sst # 0"];
  dH.inputFlushNeeded ← TRUE;
  };
ServerRdyBDTRecv: PROC [h: Handle] ~ {
  -- Get the server side ready to receive bulk data.
  -- An attention is accepted as is. In the open or endOfMessage state the subsequence
  -- type must be nonzero (otherwise no bulk data was sent), and the input is then
  -- marked as needing a flush. Any other state is a bare ERROR.
  -- ERRORS: CrRPC.Error, XNSStream.Timeout, XNSStream.ConnectionClosed
  dH: DataHandle ~ NARROW[h.data];
  curState: XNSStream.State;
  curSST: XNSStream.SubSequenceType;
  [curState, curSST] ← AdvanceInput[h~h, wait~TRUE];
  IF curState = attention THEN RETURN;
  IF (curState # open) AND (curState # endOfMessage) THEN ERROR;
  IF curSST = 0 THEN ERROR CrRPC.Error[h, protocolError, "missing bulk data"];
  dH.inputFlushNeeded ← TRUE;
  };
ServerRdyBDTSend: PROC [h: Handle] ~ {
  -- Get the server side ready to send bulk data.
  -- Normally nothing is waiting on the stream. An attention is let through so the
  -- transfer can start and be aborted by the transfer code. Input waiting in the open
  -- state is a protocol error. Any other state is a bare ERROR.
  -- ERRORS: CrRPC.Error, XNSStream.Timeout, XNSStream.ConnectionClosed
  dH: DataHandle ~ NARROW[h.data];
  curState: XNSStream.State ~ AdvanceInput[h~h, wait~FALSE].state;
  IF curState = attention THEN RETURN;
  IF curState # open THEN ERROR;
  IF IO.CharsAvail[self~dH.stream, wait~FALSE] # 0 THEN
    ERROR CrRPC.Error[h, protocolError, "client send to bdt source"];
  };
-- Mark the marshalling buffer empty: both the index and the length become zero.
ResetBuffer: PROC [dH: DataHandle] ~ INLINE {
  dH.hLength ← 0;
  dH.hIndex ← 0 };
BeginReturn: CrRPC.BeginReturnProc
-- [h: Handle]
~ {
  -- Start a return message: empty the buffer, then marshal the message type and a
  -- zero transaction ID.
  ResetBuffer[NARROW[h.data, DataHandle]];
  PutH[h, HfC[CrRPCFriends.returnMsgType]]; -- MsgHdr.msgType
  PutH[h, HfC[0]]; -- ReturnHdr.tID
  };
BeginError: CrRPC.BeginErrorProc
-- [h: Handle, errNum: CARDINAL]
~ {
  -- Start an abort message: empty the buffer, then marshal the message type, a zero
  -- transaction ID and the error number.
  ResetBuffer[NARROW[h.data, DataHandle]];
  PutH[h, HfC[CrRPCFriends.abortMsgType]]; -- MsgHdr.msgType
  PutH[h, HfC[0]]; -- AbortHdr.tID
  PutH[h, HfC[errNum]]; -- AbortHdr.errNum
  };
BeginReject: CrRPC.BeginRejectProc
-- [h: Handle, rejectReason: CARDINAL]
~ {
  -- Start a reject message: empty the buffer, then marshal the message type, a zero
  -- transaction ID and the reject reason.
  ResetBuffer[NARROW[h.data, DataHandle]];
  PutH[h, HfC[CrRPCFriends.rejectMsgType]]; -- MsgHdr.msgType
  PutH[h, HfC[0]]; -- RejectHdr.tID
  PutH[h, HfC[rejectReason]]; -- RejectHdr.rejectReason
  };
-- Worker run for each incoming connection (it is passed as the worker to
-- XNSStream.CreateListener by CreateListener).
-- It exchanges Courier version numbers, builds a server handle around the stream, then
-- serves call messages in a loop until a stream error, a CrRPC.Error or a malformed
-- message ends the conversation.
-- Exit Out closes the stream. Exit Closed does nothing because dH.stream was already NIL.
ListenerProc: XNSStream.ListenerProc
[stream: IO.STREAM, remote: XNS.Address]
~ {
sessionHdr: CrRPCFriends.SessionHdr;
callMsgObject: CallMsgObject;
rejectMsgObject: RejectMsgObject;
desiredPgm: CARD;
desiredPgmVersion: CARDINAL;
desiredProc: CARDINAL;
serverProc: CrRPC.ServerProc;
nBytes: INT;
h: Handle;
dH: DataHandle;
b: REF TEXT;
BEGIN
-- Any of these signals raised below, including from inside the server procedure, ends the conversation.
ENABLE XNSStream.ConnectionClosed, CrRPC.Error, XNSStream.Timeout => GOTO Out;
Exchange version numbers
TRUSTED {
nBytes ← IO.UnsafeGetBlock[stream, [base~LOOPHOLE[LONG[@sessionHdr]], count~CrRPCFriends.sessionHdrBytes]
! XNSStream.Timeout => {[] ← XNSStream.SendClose[stream]; GOTO Out }]};
IF nBytes # CrRPCFriends.sessionHdrBytes THEN GOTO Out;
-- Refuse the connection unless our version lies within the client's [low .. high] range.
IF (CfH[sessionHdr.lowVersion] > CrRPCFriends.courierVersionNum)
OR (CfH[sessionHdr.highVersion] < CrRPCFriends.courierVersionNum) THEN {
[] ← XNSStream.SendClose[stream]; GOTO Out };
-- Reply with a range that contains exactly our version.
sessionHdr.lowVersion ← sessionHdr.highVersion ← HfC[CrRPCFriends.courierVersionNum];
TRUSTED { IO.UnsafePutBlock[stream,
[base~LOOPHOLE[LONG[@sessionHdr]], count~CrRPCFriends.sessionHdrBytes]] };
XNSStream.SendNow[stream];
Create server handle
b ← RefText.New[nChars~bufferBodyBytes];
-- lastUsed and timeoutMsec are left without values for server handles.
dH ← NEW[DataObject ← [
next~NIL, handle~NIL, lastUsed~, stream~stream, remote~remote, timeoutMsec~, b~b
]];
h ← NEW[CrRPC.Object ← [class~$SPP, kind~server, marshallProcs~theMarshallProcs, procs~theServerProcs, data~dH, clientData~NIL]];
Process calls until timeout or unrecoverable error
DO
Get message; make sure it's a call, get desired program, version and proc number
ServerRdySysRecv[h];
-- The fixed call header is read straight from the stream, not through the marshalling buffer.
TRUSTED { nBytes ← IO.UnsafeGetBlock[stream,
[base~LOOPHOLE[LONG[@callMsgObject]], count~callMsgBytes]
! XNSStream.Timeout => {[] ← XNSStream.SendClose[stream]; GOTO Out}]};
IF (nBytes # callMsgBytes)
OR (CfH[callMsgObject.msgHdr.msgType] # CrRPCFriends.callMsgType) THEN {
[] ← XNSStream.SendClose[stream]; GOTO Out };
desiredPgm ← CfF[callMsgObject.callHdr.pgmNum];
desiredPgmVersion ← CfH[callMsgObject.callHdr.pgmVersion];
desiredProc ← CfH[callMsgObject.callHdr.procNum];
-- The lookup is by program and version only; the procedure number is passed on to the server procedure.
serverProc ← CrRPCFriends.LookUpServerProc[desiredPgm, desiredPgmVersion];
SELECT serverProc FROM
NIL => -- Don't have server, reject the call -- {
-- The reject reason is always noSuchProgram here. NOTE(review): confirm whether a version mismatch should be reported differently.
rejectMsgObject.msgHdr.msgType ← HfC[CrRPCFriends.rejectMsgType];
rejectMsgObject.rejectHdr.tID ← HfC[0];
rejectMsgObject.rejectHdr.rejectReason ← HfC[CrRPC.noSuchProgram];
TRUSTED { IO.UnsafePutBlock[stream,
[base~LOOPHOLE[LONG[@rejectMsgObject]], count~rejectMsgBytes]] };
};
ENDCASE => -- Serve the call -- {
-- Start with an empty buffer so the first Get refills it from the stream.
ResetBuffer[dH];
serverProc[h, desiredPgm, desiredPgmVersion, desiredProc, BeginReturn, BeginError, BeginReject];
IF dH.stream = NIL THEN GOTO Closed; -- client caught error
-- Flush whatever reply the server procedure marshalled.
PutBuf[dH];
};
XNSStream.SendEndOfMessage[stream];
ENDLOOP;
END;
EXITS
Out => IO.Close[stream];
Closed => NULL;
};
-- One node of the list of listeners created so far, keyed by socket.
ListenerValueObject: TYPE ~ RECORD [
  next: ListenerValue,
  socket: XNS.Socket,
  listener: XNSStream.Listener
  ];
ListenerValue: TYPE ~ REF ListenerValueObject;

-- Head of the listener list. It is searched and extended only by CreateListener.
listeners: ListenerValue ← NIL;
CreateListener: ENTRY CrRPCFriends.CreateListenerProc
-- [socket: XNS.Socket]
~ {
  -- Make sure a listener exists on the given socket, creating one at most once.
  -- XNS.unknownSocket selects the well-known Courier socket.
  existing: ListenerValue ← listeners;
  created: XNSStream.Listener;
  IF socket = XNS.unknownSocket THEN socket ← XNSWKS.courier;
  WHILE existing # NIL DO
    IF existing.socket = socket THEN RETURN; -- already listening on this socket
    existing ← existing.next;
    ENDLOOP;
  created ← XNSStream.CreateListener[
    socket~socket,
    worker~ListenerProc,
    getTimeout~serverGetTimeout,
    putTimeout~serverPutTimeout];
  -- Push the new listener on the front of the list.
  listeners ← NEW[ListenerValueObject ← [next~listeners, socket~socket, listener~created]];
  };
-- Return the remote address recorded in the handle's private data.
GetRemote: PROC [h: Handle] RETURNS [XNS.Address] ~ {
  RETURN [ NARROW[h.data, DataHandle].remote ] };
I/O and Marshalling
AdvanceInput: PROC [h: Handle, wait: BOOL ← FALSE]
  RETURNS [state: XNSStream.State, ssType: XNSStream.SubSequenceType] ~ {
  -- Advance input stream to next significant piece of input, checking for remote close and flushing unread input if needed.
  -- With wait = FALSE the procedure returns as soon as the stream is in the open state.
  -- With wait = TRUE it returns from the open state only once input is available.
  -- It also returns on an attention, and on an endOfMessage that follows an earlier
  -- endOfMessage or ssTypeChange seen during this same call.
  -- ERRORS: XNSStream.ConnectionClosed, XNSStream.Timeout, CrRPC.Error
  dH: DataHandle ~ NARROW[h.data];
  sawBreak: BOOL ← FALSE;
  IF dH.inputFlushNeeded THEN {
    [] ← XNSStream.FlushInput[self~dH.stream, wait~TRUE];
    dH.inputFlushNeeded ← FALSE };
  DO
    [state~state, ssType~ssType] ← XNSStream.GetStatus[self~dH.stream, reset~FALSE];
    -- Either end subsequence type means the remote side is closing: reply and give up.
    IF (ssType = XNSSPPTypes.endSST) OR (ssType = XNSSPPTypes.endReplySST) THEN {
      ReplyCloseStream[dH];
      ERROR CrRPC.Error[h, protocolError, "remote close"] };
    SELECT state FROM
      open => {
        IF NOT wait THEN RETURN;
        IF IO.CharsAvail[self~dH.stream, wait~TRUE] > 0 THEN RETURN;
        LOOP };
      attention => RETURN;
      endOfMessage => {
        IF sawBreak THEN RETURN;
        sawBreak ← TRUE };
      ssTypeChange => {
        sawBreak ← TRUE };
      ENDCASE => ERROR;
    -- Step past the status that was just examined.
    [] ← XNSStream.GetStatus[self~dH.stream, reset~TRUE];
    ENDLOOP;
  };
SendCloseStream: PROC [dH: DataHandle] ~ {
  -- Send a close on the stream (ignoring a closed connection or a timeout), then close
  -- the stream with abort and forget it. Does nothing when there is no stream.
  IF dH.stream = NIL THEN RETURN;
  [] ← XNSStream.SendClose[dH.stream
    ! XNSStream.ConnectionClosed, XNSStream.Timeout => CONTINUE];
  IO.Close[self~dH.stream, abort~TRUE];
  dH.stream ← NIL;
  };
ReplyCloseStream: PROC [dH: DataHandle] ~ {
  -- Send a close reply on the stream (ignoring a closed connection or a timeout), then
  -- close the stream with abort and forget it. Does nothing when there is no stream.
  IF dH.stream = NIL THEN RETURN;
  [] ← XNSStream.SendCloseReply[dH.stream
    ! XNSStream.ConnectionClosed, XNSStream.Timeout => CONTINUE];
  IO.Close[self~dH.stream, abort~TRUE];
  dH.stream ← NIL;
  };
PutBuf: PROC [dH: DataHandle] ~ {
  -- Write the HWORDs marshalled so far to the stream and restart at the front of the buffer.
  dH.b.length ← dH.hIndex*2; -- two bytes per HWORD
  IO.PutBlock[self~dH.stream, block~dH.b];
  dH.hIndex ← 0;
  };
GetBuf: PROC [dH: DataHandle, minBytes: NAT ← 2] ~ {
  -- Refill the marshalling buffer from the stream.
  -- Raises IO.EndOfStream when fewer than minBytes bytes were obtained.
  nRead: NAT ~ IO.GetBlock[self~dH.stream, block~dH.b];
  IF nRead >= minBytes
    THEN {
      dH.hLength ← nRead / 2; -- whole HWORDs only
      dH.hIndex ← 0 }
    ELSE ERROR IO.EndOfStream[dH.stream];
  };
PutB: PROC [h: Handle, byte: BYTE] ~ {
  -- Marshal one byte. Bytes go out in pairs: the first byte of a pair claims a new
  -- HWORD slot and the second fills the other half of that slot.
  dH: DataHandle ~ NARROW[h.data];
  IF NOT dH.odd
    THEN {
      -- First byte of a pair: flush if the buffer is full, then take the next slot.
      IF dH.hIndex >= dH.hLength THEN PutBuf[dH];
      dH.b[2*dH.hIndex] ← LOOPHOLE[byte];
      dH.hIndex ← dH.hIndex.SUCC;
      dH.odd ← TRUE;
      }
    ELSE {
      -- Second byte of a pair: hIndex already points past the slot.
      dH.b[2*dH.hIndex - 1] ← LOOPHOLE[byte];
      dH.odd ← FALSE;
      };
  };
PutH: PROC [h: Handle, hWord: HWORD] ~ {
  -- Marshal one HWORD, flushing first when the buffer has no room. Clears the odd-byte flag.
  dH: DataHandle ~ NARROW[h.data];
  IF dH.hIndex >= dH.hLength THEN PutBuf[dH];
  TRUSTED {
    -- View the buffer body as an array of HWORDs.
    body: LONG POINTER TO HWords ~
      LOOPHOLE[LOOPHOLE[dH.b, LONG POINTER] + SIZE[TEXT[0]]];
    body.hWords[dH.hIndex] ← hWord };
  dH.hIndex ← dH.hIndex.SUCC;
  dH.odd ← FALSE;
  };
PutF: PROC [h: Handle, fWord: FWORD] ~ {
  -- Marshal one FWORD as two consecutive HWORD slots, flushing first unless both
  -- slots fit. Clears the odd-byte flag.
  dH: DataHandle ~ NARROW[h.data];
  IF (dH.hIndex+1) >= dH.hLength THEN PutBuf[dH];
  TRUSTED {
    dest: LONG POINTER ~ LOOPHOLE[dH.b, LONG POINTER] + SIZE[TEXT[0]] + dH.hIndex;
    PrincOpsUtils.LongCopy[from~@fWord, to~dest, nwords~2];
    };
  dH.hIndex ← dH.hIndex + 2;
  dH.odd ← FALSE
  };
UnsafePutBlock: UNSAFE PROC [h: Handle, block: IO.UnsafeBlock] ~ {
  -- Write a caller-supplied block directly to the stream, after first flushing
  -- anything already marshalled into the buffer.
  dH: DataHandle ~ NARROW[h.data];
  IF dH.hIndex # 0 THEN PutBuf[dH];
  TRUSTED { IO.UnsafePutBlock[self~dH.stream, block~block] };
  };
PutBulkDataSource: PROC [h: Handle, bulkDataSource: CrRPC.BulkDataSource] ~ {
  -- Marshal a bulk data source descriptor on a client handle.
  -- A NIL source is marshalled as the null descriptor. Otherwise the immediate
  -- descriptor is marshalled and the source is saved in dH.sendBulkData for Call.
  dH: DataHandle ~ NARROW[h.data];
  IF h.kind # client THEN ERROR CrRPC.Error[h, notClientHandle, "not client handle"];
  IF bulkDataSource # NIL
    THEN {
      CrRPC.PutCARDINAL[h, ORD[CrRPCFriends.BulkDataDescriptorType.immediate]];
      TRUSTED { dH.sendBulkData ← bulkDataSource } }
    ELSE {
      CrRPC.PutCARDINAL[h, ORD[CrRPCFriends.BulkDataDescriptorType.null]] };
  };
PutBulkDataSink: PROC [h: Handle, bulkDataSink: CrRPC.BulkDataSink] ~ {
  -- Marshal a bulk data sink descriptor on a client handle.
  -- A NIL sink is marshalled as the null descriptor. Otherwise the immediate
  -- descriptor is marshalled and the sink is saved in dH.recvBulkData for Call.
  dH: DataHandle ~ NARROW[h.data];
  IF h.kind # client THEN ERROR CrRPC.Error[h, notClientHandle, "not client handle"];
  IF bulkDataSink # NIL
    THEN {
      CrRPC.PutCARDINAL[h, ORD[CrRPCFriends.BulkDataDescriptorType.immediate]];
      TRUSTED { dH.recvBulkData ← bulkDataSink } }
    ELSE {
      CrRPC.PutCARDINAL[h, ORD[CrRPCFriends.BulkDataDescriptorType.null]] };
  };
GetB: PROC [h: Handle] RETURNS [byte: BYTE] ~ {
  -- Unmarshal one byte. Bytes come in pairs: the first byte of a pair consumes a new
  -- HWORD slot and the second is taken from the other half of that slot.
  dH: DataHandle ~ NARROW[h.data];
  IF NOT dH.odd
    THEN {
      -- First byte of a pair: refill if the buffer is exhausted, then take the next slot.
      IF dH.hIndex >= dH.hLength THEN GetBuf[dH];
      byte ← LOOPHOLE[dH.b[2*dH.hIndex]];
      dH.hIndex ← dH.hIndex.SUCC;
      dH.odd ← TRUE;
      }
    ELSE {
      -- Second byte of a pair: hIndex already points past the slot.
      byte ← LOOPHOLE[dH.b[2*dH.hIndex - 1]];
      dH.odd ← FALSE;
      };
  };
GetH: PROC [h: Handle] RETURNS [hWord: HWORD] ~ {
  -- Unmarshal one HWORD, refilling the buffer first when it is exhausted.
  -- Clears the odd-byte flag.
  dH: DataHandle ~ NARROW[h.data];
  IF dH.hIndex >= dH.hLength THEN GetBuf[dH, 2];
  TRUSTED {
    -- View the buffer body as an array of HWORDs.
    body: LONG POINTER TO HWords ~
      LOOPHOLE[LOOPHOLE[dH.b, LONG POINTER] + SIZE[TEXT[0]]];
    hWord ← body.hWords[dH.hIndex] };
  dH.hIndex ← dH.hIndex.SUCC;
  dH.odd ← FALSE;
  };
GetF: PROC [h: Handle] RETURNS [fWord: FWORD] ~ {
  -- Unmarshal one FWORD. When both HWORDs are already in the buffer they are copied
  -- out together; otherwise the value is assembled from two GetH calls, which refill
  -- the buffer as needed.
  dH: DataHandle ~ NARROW[h.data];
  IF (dH.hIndex+1) < dH.hLength
    THEN {
      TRUSTED {
        src: LONG POINTER ~ LOOPHOLE[dH.b, LONG POINTER] + SIZE[TEXT[0]] + dH.hIndex;
        PrincOpsUtils.LongCopy[from~src, to~@fWord, nwords~2];
        };
      dH.hIndex ← dH.hIndex + 2;
      dH.odd ← FALSE;
      }
    ELSE {
      halves: MACHINE DEPENDENT RECORD [a, b: HWORD];
      halves.a ← GetH[h];
      halves.b ← GetH[h];
      TRUSTED { fWord ← LOOPHOLE[halves] }
      };
  };
UnsafeGetBlock: UNSAFE PROC [h: Handle, block: IO.UnsafeBlock]
  RETURNS [nBytesRead: INT ← 0] ~ {
  -- Read a block of bytes into caller-supplied storage.
  -- Bytes still held in the marshalling buffer (including a pending odd byte) are
  -- delivered first, one at a time via GetB. Whatever part of the block remains is
  -- then read directly from the stream.
  -- The result counts the bytes from both sources.
  dH: DataHandle ~ NARROW[h.data];
  WHILE (dH.hIndex < dH.hLength) OR dH.odd DO
    IF block.count = 0 THEN RETURN; -- request satisfied entirely from the buffer
    TRUSTED { block.base[block.startIndex] ← LOOPHOLE[GetB[h]] };
    block.startIndex ← block.startIndex + 1;
    block.count ← block.count - 1;
    nBytesRead ← nBytesRead + 1; -- buffered bytes count toward the result too
    ENDLOOP;
  TRUSTED {
    nBytesRead ← nBytesRead + IO.UnsafeGetBlock[self~dH.stream, block~block] };
  };
GetBulkDataSource: PROC [h: Handle] RETURNS[bulkDataSource: CrRPC.BulkDataSource] ~ {
  -- Unmarshal a bulk data source descriptor on a server handle.
  -- null yields NIL and immediate yields ServerRecvBulkData. The active and passive
  -- descriptors are not implemented. Anything else is a protocol error.
  dH: DataHandle ~ NARROW[h.data];
  typeCode: CARDINAL;
  IF h.kind # server THEN ERROR CrRPC.Error[h, notServerHandle, "not server handle"];
  typeCode ← CrRPC.GetCARDINAL[h];
  IF typeCode = ORD[CrRPCFriends.BulkDataDescriptorType.null] THEN {
    bulkDataSource ← NIL;
    RETURN };
  IF typeCode = ORD[CrRPCFriends.BulkDataDescriptorType.immediate] THEN {
    TRUSTED { bulkDataSource ← ServerRecvBulkData };
    RETURN };
  IF (typeCode = ORD[CrRPCFriends.BulkDataDescriptorType.active])
    OR (typeCode = ORD[CrRPCFriends.BulkDataDescriptorType.passive]) THEN
    ERROR CrRPC.Error[h, notImplemented, "no third party bulk data"];
  ERROR CrRPC.Error[h, protocolError, "bad bulkDataSource type"];
  };
-- Server-side bulk data source: copies bulk data arriving on the connection (dH.stream)
-- to the caller's stream, one buffer of up to bufSize bytes at a time.
-- Returns abort = TRUE when an attention is seen on the connection.
-- When checkAbort reports an abort, an attention is sent to the other side and the loop
-- ends with abort still FALSE. NOTE(review): confirm that this result is intended.
-- NOTE(review): the scratch buffer is not handed back anywhere in this procedure; RefText
-- is imported only for New and ObtainScratch. Confirm whether a release is wanted.
ServerRecvBulkData: CrRPC.BulkDataXferProc
[h: Handle, stream: IO.STREAM, checkAbort: BulkDataCheckAbortProc]
RETURNS [abort: BOOL]
~ {
dH: DataHandle ~ NARROW[h.data];
bufSize: NAT ~ 1024;
b: REF TEXT ← RefText.ObtainScratch[bufSize];
nBytes: INT;
state: XNSStream.State;
ssType: XNSSPPTypes.SubSequenceType;
abort ← FALSE;
ServerRdyBDTRecv[h];
DO
SELECT nBytes ← IO.GetBlock[self~dH.stream, block~b] FROM
0 => {
-- Nothing was read: look at (and reset) the stream status to find out why.
[state~state, ssType~ssType] ← XNSStream.GetStatus[dH.stream, TRUE];
SELECT state FROM
attention => { abort ← TRUE; EXIT };
-- End of message ends the transfer normally; no input is left to flush.
endOfMessage => { dH.inputFlushNeeded ← FALSE; EXIT };
ssTypeChange => SELECT ssType FROM
0 => ERROR CrRPC.Error[h, protocolError, "bdt no eom"];
XNSSPPTypes.endSST, XNSSPPTypes.endReplySST =>
ERROR CrRPC.Error[h, remoteClose, "remote close"];
ENDCASE => NULL; -- ???? is this an error ????
open => NULL;
ENDCASE => ERROR;
};
ENDCASE => {
-- Pass the bytes just read on to the caller's stream.
IO.PutBlock[self~stream, block~b, count~nBytes];
};
IF checkAbort[h].abort THEN {
XNSStream.SendAttention[dH.stream, 1]; -- Define 1 ????
EXIT };
ENDLOOP;
};
GetBulkDataSink: PROC [h: Handle] RETURNS[bulkDataSink: CrRPC.BulkDataSink] ~ {
  -- Unmarshal a bulk data sink descriptor on a server handle.
  -- null yields NIL and immediate yields ServerSendBulkData. The active and passive
  -- descriptors are not implemented. Anything else is a protocol error.
  dH: DataHandle ~ NARROW[h.data];
  typeCode: CARDINAL;
  IF h.kind # server THEN ERROR CrRPC.Error[h, notServerHandle, "not server handle"];
  typeCode ← CrRPC.GetCARDINAL[h];
  IF typeCode = ORD[CrRPCFriends.BulkDataDescriptorType.null] THEN {
    bulkDataSink ← NIL;
    RETURN };
  IF typeCode = ORD[CrRPCFriends.BulkDataDescriptorType.immediate] THEN {
    TRUSTED { bulkDataSink ← ServerSendBulkData };
    RETURN };
  IF (typeCode = ORD[CrRPCFriends.BulkDataDescriptorType.active])
    OR (typeCode = ORD[CrRPCFriends.BulkDataDescriptorType.passive]) THEN
    ERROR CrRPC.Error[h, notImplemented, "no third party bulk data"];
  ERROR CrRPC.Error[h, protocolError, "bad bulkDataSink type"];
  };
-- Server-side bulk data sink: copies data from the caller's stream to the connection
-- (dH.stream), one buffer of up to bufSize bytes at a time, with the subsequence type
-- set to 1 for the duration and restored to 0 afterwards.
-- Returns abort = TRUE when the connection leaves the open state during the transfer.
-- When checkAbort reports an abort, an attention is sent and abort stays FALSE.
-- NOTE(review): confirm that this result is intended.
-- NOTE(review): the scratch buffer is not handed back anywhere in this procedure; RefText
-- is imported only for New and ObtainScratch. Confirm whether a release is wanted.
ServerSendBulkData: CrRPC.BulkDataXferProc
[h: Handle, stream: IO.STREAM, checkAbort: BulkDataCheckAbortProc]
RETURNS [abort: BOOL]
~ {
dH: DataHandle ~ NARROW[h.data];
bufSize: NAT ~ 1024;
b: REF TEXT ← RefText.ObtainScratch[bufSize];
nBytes: INT;
state: XNSStream.State;
ssType: XNSSPPTypes.SubSequenceType;
abort ← FALSE;
ServerRdyBDTSend[h];
XNSStream.SetSSType[self~dH.stream, ssType~1]; -- Define 1 ????
{
DO
Check for abort from other side
[state~state, ssType~ssType] ← XNSStream.GetStatus[dH.stream, FALSE];
SELECT state FROM
open => NULL;
attention, endOfMessage, ssTypeChange => {
abort ← TRUE;
GOTO FinishWithEOM };
ENDCASE => ERROR;
Check for abort from this side
IF checkAbort[h].abort THEN {
GOTO FinishWithAttention };
Copy a block of data
SELECT nBytes ← IO.GetBlock[self~stream, block~b] FROM
0 => {
-- The caller's stream yielded nothing more: finish the transfer normally.
GOTO FinishWithEOM };
ENDCASE => {
IO.PutBlock[self~dH.stream, block~b, count~nBytes];
};
ENDLOOP;
EXITS
FinishWithAttention =>
XNSStream.SendAttention[dH.stream, 1]; -- Define 1 ????
FinishWithEOM =>
XNSStream.SendEndOfMessage[dH.stream];
};
XNSStream.SetSSType[self~dH.stream, ssType~0]; -- Define 0 ????
};
-- Align to an HWORD boundary by clearing the odd-byte flag, so that the next byte
-- put or got starts a fresh HWORD slot.
HAlign: PROC [h: Handle] ~ {
  dH: DataHandle ~ NARROW[h.data];
  dH.odd ← FALSE;
  };
Cache of Handles
We maintain a cache of recently used client handles with their transport intact. If a handle isn't used for a while, it times out: the transport is closed and the handle is destroyed.
-- Head of the list of cached client handles, linked through DataObject.next.
cache: DataHandle ← NIL;
-- Idle time, in clock pulses, beyond which CacheSweeper treats a cached handle as dead.
cachePulseOut: BasicTime.Pulses ~ BasicTime.MicrosecondsToPulses[60 * 1000000];
-- Pause between successive sweeps of the cache.
cacheSweepInterval: Process.Ticks ~ Process.SecondsToTicks[21];
-- Number of clock pulses between the given earlier reading and now.
PulsesSince: PROC [then: BasicTime.Pulses] RETURNS [BasicTime.Pulses] ~ {
  now: BasicTime.Pulses ~ BasicTime.GetClockPulses[];
  RETURN [ now - then ] };
CheckOutFromCache: ENTRY PROC [remote: XNS.Address] RETURNS [h: Handle ← NIL] ~ {
  -- Remove from the cache, and return, the first handle whose remote address equals
  -- the one given. Returns NIL when there is none.
  prev: DataHandle ← NIL;
  FOR cur: DataHandle ← cache, cur.next WHILE cur # NIL DO
    IF cur.remote = remote THEN {
      -- Unlink cur from the list.
      IF prev = NIL THEN cache ← cur.next ELSE prev.next ← cur.next;
      h ← cur.handle;
      cur.handle ← NIL; -- for finalization
      RETURN };
    prev ← cur;
    ENDLOOP;
  };
CheckInToCache: ENTRY PROC [h: Handle] ~ {
  -- Put a handle on the front of the cache, stamped with the current clock reading.
  dH: DataHandle ← NARROW[h.data];
  dH.lastUsed ← BasicTime.GetClockPulses[];
  dH.handle ← h;
  dH.next ← cache;
  cache ← dH;
  };
CacheSweeper: PROC ~ {
  -- Body of the background process that expires idle cached handles. It never returns:
  -- it repeatedly collects the expired handles, finishes them, and pauses.

  GetDeadHandles: ENTRY PROC RETURNS [deadList: DataHandle ← NIL] ~ {
    -- Split the cache in two: entries idle for more than cachePulseOut go to deadList,
    -- the others stay in the cache in their original order.
    cur, live, liveTail: DataHandle ← NIL;
    WHILE cache # NIL DO
      cur ← cache; cache ← cur.next;
      IF PulsesSince[cur.lastUsed] > cachePulseOut
        THEN {
          -- Expired: push on the dead list.
          cur.next ← deadList;
          deadList ← cur }
        ELSE {
          -- Still fresh: append to the new cache list.
          cur.next ← NIL;
          IF liveTail = NIL THEN live ← cur ELSE liveTail.next ← cur;
          liveTail ← cur };
      ENDLOOP;
    cache ← live;
    };

  DestroyDeadHandles: PROC [deadList: DataHandle] ~ {
    -- Finish every handle on the list. Runs outside the monitor.
    victim: DataHandle;
    WHILE deadList # NIL DO
      victim ← deadList;
      deadList ← victim.next;
      victim.next ← NIL;
      FinishClientHandle[victim.handle];
      ENDLOOP;
    };

  DO
    DestroyDeadHandles[ GetDeadHandles[] ];
    Process.Pause[ cacheSweepInterval ];
    ENDLOOP;
  };
Handle Creation and Destruction
SetRemote: PROC [h: Handle, remote: XNS.Address] RETURNS [Handle] ~ {
  -- Return a handle for the given remote address.
  -- If h already refers to that address, h itself is returned. Otherwise h is checked
  -- in to the cache and a handle for the new address is obtained with the same timeout.
  dH: DataHandle ~ NARROW[h.data];
  timeoutMsec: INT ← dH.timeoutMsec;
  -- The socket is always forced to the well-known Courier socket, as in
  -- CreateClientHandle. A separate test for XNS.unknownSocket would be redundant.
  remote.socket ← XNSWKS.courier;
  IF dH.remote = remote THEN RETURN[h];
  CheckInToCache[h];
  h ← CreateClientHandle[remote, timeoutMsec];
  RETURN [h] };
-- Timeout used when the caller passes 0, and the smallest timeout that is accepted.
defaultTimeoutMsec: INT ~ 15000;
minTimeoutMsec: INT ~ 100;

SetTimeout: PROC [h: Handle, timeoutMsec: INT] RETURNS [Handle] ~ {
  -- Set the handle's timeout and return the same handle.
  -- 0 selects defaultTimeoutMsec; any other value is raised to at least minTimeoutMsec.
  -- An open stream has its timeouts updated as well. If that finds the connection
  -- closed, the stream is simply dropped.
  dH: DataHandle ~ NARROW[h.data];
  IF timeoutMsec = 0
    THEN timeoutMsec ← defaultTimeoutMsec
    ELSE timeoutMsec ← MAX[timeoutMsec, minTimeoutMsec];
  IF timeoutMsec # dH.timeoutMsec THEN {
    IF dH.stream # NIL THEN {
      XNSStream.SetTimeouts[dH.stream, timeoutMsec, timeoutMsec
        ! XNSStream.ConnectionClosed => { dH.stream ← NIL; CONTINUE } ];
      };
    dH.timeoutMsec ← timeoutMsec;
    };
  RETURN [h] };
-- Hop counts are not supported by this handle class: always raises CrRPC.Error[notImplemented].
SetHops: PROC [h: Handle, low, high: NAT] RETURNS [Handle] ~ {
  ERROR CrRPC.Error[h, notImplemented, "not a broadcast handle"] };
FinishClientHandle, Finalize: PROC [h: Handle] ~ {
  -- Shut down a client handle's transport.
  -- As Finalize this runs for a handle the client has dropped, which is guaranteed not
  -- to be in the cache. The operation is idempotent.
  dH: DataHandle ~ NARROW[h.data];
  dH.handle ← NIL; -- break cycle for GC
  SendCloseStream[dH];
  };
CreateClientHandle: CrRPCFriends.CreateClientHandleProc
-- [remote: XNS.Address, timeoutMsec: INT] RETURNS [Handle]
~ {
  -- Produce a client handle for the given remote address, whose socket is always
  -- forced to the well-known Courier socket. A cached handle for that address is
  -- reused when there is one; otherwise a new handle with a fresh buffer is built.
  -- Either way the requested timeout is then applied.
  handle: Handle;
  remote.socket ← XNSWKS.courier;
  handle ← CheckOutFromCache[remote];
  IF handle = NIL THEN {
    b: REF TEXT ~ RefText.New[nChars~bufferBodyBytes];
    dH: DataHandle ~ NEW[DataObject ← [
      lastUsed~BasicTime.GetClockPulses[],
      remote~remote,
      timeoutMsec~timeoutMsec,
      b~b
      ]];
    handle ← CrRPCFriends.NewClientObject[
      class~$SPP,
      marshallProcs~theMarshallProcs,
      procs~theClientProcs,
      data~dH];
    };
  -- SetTimeout returns the handle it was given; anything else is a bare ERROR.
  IF SetTimeout[handle, timeoutMsec] # handle THEN ERROR;
  RETURN [handle];
  };
-- Module start code: register this implementation's handle and listener creation
-- procedures under the class $SPP, then start the cache sweeper as a detached process.
CrRPCFriends.RegisterCreateClientHandleProc[$SPP, CreateClientHandle];
CrRPCFriends.RegisterCreateListenerProc[$SPP, CreateListener];
TRUSTED { Process.Detach[ FORK CacheSweeper[] ] };
}.