DIRECTORY
BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses],
CrRPC USING [BeginErrorProc, BeginRejectProc, BeginReturnProc, BulkDataCheckAbortProc, BulkDataSink, BulkDataSource, BulkDataXferProc, CallProc, ClientProcs, Error, ErrorReason, GetCARDINAL, Handle, invalidArgument, MarshallProcs, noSuchProcedure, noSuchProgram, noSuchVersion, Object, PutCARDINAL, ServerProc, ServerProcs],
CrRPCFriends USING [AbortHdr, abortMsgType, BulkDataDescriptorType, CallHdr, --callHdrBytes,-- callMsgType, courierVersionNum, CreateClientHandleProc, CreateListenerProc, LookUpServerProc, MsgHdr, --msgHdrBytes,-- NewClientObject, RegisterCreateClientHandleProc, RegisterCreateListenerProc, RejectHdr, rejectMsgType, ReturnHdr, returnMsgType, SessionHdr, sessionHdrBytes],
Endian USING [BYTE, bytesPerHWord, CardFromF, CardFromH, FFromCard, FWORD, HFromCard, HWORD],
IO USING [CharsAvail, Close, EndOfStream, GetBlock, PutBlock, STREAM, UnsafeBlock, UnsafeGetBlock, UnsafePutBlock],
PrincOpsUtils USING [LongCopy],
Process USING [Detach, Pause, SecondsToTicks, Ticks],
RefText USING [New, ObtainScratch, ReleaseScratch],
Rope USING [ROPE],
XNS USING [Address, Socket, unknownSocket],
XNSSPPBuf USING [maxBodyBytes, maxBodyHWords],
XNSSPPTypes USING [endReplySST, endSST, SubSequenceType],
XNSStream USING [ConnectionClosed, Create, CreateListener, FlushInput, GetStatus, Listener, ListenerProc, Milliseconds, SendAttention, SendClose, SendCloseReply, SendEndOfMessage, SendNow, SetSSType, SetTimeouts, State, SubSequenceType, Timeout],
XNSWKS USING [courier];
CrRPCSImpl:
CEDAR
MONITOR
IMPORTS BasicTime, CrRPC, CrRPCFriends, Endian, IO, PrincOpsUtils, Process, RefText, XNSStream
~ {
-- Copied Types and Constants
-- Short local names for the unsigned and Endian types used by the marshalling code in this module.
CARD: TYPE ~ LONG CARDINAL;
BYTE: TYPE ~ Endian.BYTE;
FWORD: TYPE ~ Endian.FWORD;
HWORD: TYPE ~ Endian.HWORD;
-- Factor applied to SIZE[...] below to turn the header record sizes into byte counts.
bytesPerWord: CARDINAL ~ Endian.bytesPerHWord;
-- FfC: convert a CARD to an Endian.FWORD; a thin inline wrapper over Endian.FFromCard.
FfC: PROC [c: CARD] RETURNS [FWORD] ~ INLINE {
  RETURN [Endian.FFromCard[c]];
  };
-- HfC: convert a CARDINAL to an Endian.HWORD; a thin inline wrapper over Endian.HFromCard.
HfC: PROC [c: CARDINAL] RETURNS [HWORD] ~ INLINE {
  RETURN [Endian.HFromCard[c]];
  };
-- CfF: convert an Endian.FWORD back to a CARD; a thin inline wrapper over Endian.CardFromF.
CfF: PROC [f: FWORD] RETURNS [CARD] ~ INLINE {
  RETURN [Endian.CardFromF[f]];
  };
-- CfH: convert an Endian.HWORD back to a CARDINAL; a thin inline wrapper over Endian.CardFromH.
CfH: PROC [h: HWORD] RETURNS [CARDINAL] ~ INLINE {
  RETURN [Endian.CardFromH[h]];
  };
-- Local name for the RPC handle type.
Handle: TYPE ~ CrRPC.Handle;
-- Capacity of the per-handle marshalling buffer, in bytes and in half words (taken from XNSSPPBuf).
bufferBodyBytes: CARDINAL ~ XNSSPPBuf.maxBodyBytes;
bufferBodyHWords: CARDINAL ~ XNSSPPBuf.maxBodyHWords;
-- Handle Objects
-- Overlay type that lets PutH and GetH address the body of the marshalling buffer as an array of half words.
HWords: TYPE ~ MACHINE DEPENDENT RECORD [
  hWords: ARRAY [0 .. bufferBodyHWords) OF HWORD
  ];
-- Per-handle transport state; procedures in this module reach it with NARROW[h.data].
DataHandle: TYPE ~ REF DataObject;
DataObject:
TYPE ~
RECORD [
next: DataHandle ← NIL, -- link to the next entry while on the client handle cache list
handle: Handle ← NIL, -- back pointer to the owning handle; set by CheckInToCache, cleared on check-out and on finish
lastUsed: BasicTime.Pulses, -- clock reading taken at creation or at the last check-in; read by the cache sweeper
stream: IO.STREAM ← NIL, -- stream to the other end; NIL when no connection is held
inputFlushNeeded: BOOL ← FALSE, -- TRUE when AdvanceInput must flush unread input before examining stream status
remote: XNS.Address, -- address of the other end
timeoutMsec: INT, -- get and put timeout handed to XNSStream for client handles
sendBulkData: CrRPC.BulkDataXferProc ← NIL, -- for client handles
recvBulkData: CrRPC.BulkDataXferProc ← NIL, -- for client handles
b: REF TEXT ← NIL, -- the marshalling buffer
hIndex: CARDINAL ← 0, -- index in HWORDs of next data, (in XNS (IDP) packet body)
hLength: CARDINAL ← 0, -- length in HWORDs of buffer (XNS (IDP) packet body)
odd: BOOL ← FALSE -- TRUE when GetB / PutB has used only the first byte of the half word just before hIndex
];
-- Marshalling procedure vector shared by every SPP handle, client or server. HAlign serves both the put and the get side.
theMarshallProcs: REF CrRPC.MarshallProcs ~ NEW[ CrRPC.MarshallProcs ← [
  putB~PutB,
  putH~PutH,
  putF~PutF,
  unsafePutBlock~UnsafePutBlock,
  putBulkDataSource~PutBulkDataSource,
  putBulkDataSink~PutBulkDataSink,
  putHAlign~HAlign,
  getB~GetB,
  getH~GetH,
  getF~GetF,
  unsafeGetBlock~UnsafeGetBlock,
  getBulkDataSource~GetBulkDataSource,
  getBulkDataSink~GetBulkDataSink,
  getHAlign~HAlign
  ] ];
-- Client procedure vector. Note that destroy does not close anything: it parks the handle in the cache (CheckInToCache).
theClientProcs: REF CrRPC.ClientProcs ~ NEW[ CrRPC.ClientProcs ← [
  setRemote~SetRemote,
  setTimeout~SetTimeout,
  setHops~SetHops,
  destroy~CheckInToCache,
  finalize~Finalize,
  call~Call
  ] ];
-- Server procedure vector; the only server-side operation supplied here is GetRemote.
theServerProcs: REF CrRPC.ServerProcs ~ NEW[ CrRPC.ServerProcs ← [
  getRemote~GetRemote
  ] ];
-- Courier Messages
-- Call message (without program-specific body).
CallMsg: TYPE ~ REF CallMsgObject;
CallMsgObject: TYPE ~ MACHINE DEPENDENT RECORD [
  msgHdr: CrRPCFriends.MsgHdr,
  callHdr: CrRPCFriends.CallHdr
  ];
-- Header length in half words, and the same length scaled to bytes.
callMsgHWords: CARDINAL ~ SIZE[CallMsgObject];
callMsgBytes: CARDINAL ~ SIZE[CallMsgObject]*bytesPerWord; -- should be BYTES
-- Reject message (without program-specific body).
RejectMsg: TYPE ~ REF RejectMsgObject;
RejectMsgObject: TYPE ~ MACHINE DEPENDENT RECORD [
  msgHdr: CrRPCFriends.MsgHdr,
  rejectHdr: CrRPCFriends.RejectHdr
  ];
-- Header length in half words, and the same length scaled to bytes.
rejectMsgHWords: CARDINAL ~ SIZE[RejectMsgObject];
rejectMsgBytes: CARDINAL ~ SIZE[RejectMsgObject]*bytesPerWord; -- should be BYTES
-- Return message (without program-specific body).
ReturnMsg: TYPE ~ REF ReturnMsgObject;
ReturnMsgObject: TYPE ~ MACHINE DEPENDENT RECORD [
  msgHdr: CrRPCFriends.MsgHdr,
  returnHdr: CrRPCFriends.ReturnHdr
  ];
-- Header length in half words, and the same length scaled to bytes.
returnMsgHWords: CARDINAL ~ SIZE[ReturnMsgObject];
returnMsgBytes: CARDINAL ~ SIZE[ReturnMsgObject]*bytesPerWord; -- should be BYTES
-- Abort message (without program-specific body).
AbortMsg: TYPE ~ REF AbortMsgObject;
AbortMsgObject: TYPE ~ MACHINE DEPENDENT RECORD [
  msgHdr: CrRPCFriends.MsgHdr,
  abortHdr: CrRPCFriends.AbortHdr
  ];
-- Header length in half words, and the same length scaled to bytes.
abortMsgHWords: CARDINAL ~ SIZE[AbortMsgObject];
abortMsgBytes: CARDINAL ~ SIZE[AbortMsgObject]*bytesPerWord; -- should be BYTES
-- Client Call
Call: CrRPC.CallProc
  -- [h: Handle, remotePgm: CARD, remotePgmVersion: CARDINAL, remoteProc: CARDINAL, putArgs: PutArgsProc, getResults: GetResultsProc, getError: GetErrorProc]
  -- Client side of one Courier call on handle h. Reuses the stream held in the handle if there is one (falling back to a fresh connection if that attempt fails), sends the call message with the arguments written by putArgs, runs whichever bulk data transfer was registered while marshalling, then reads the reply, handing it to getResults or getError.
  -- ERRORS: CrRPC.Error. Once the call message has been sent, XNSStream.ConnectionClosed and XNSStream.Timeout are converted to CrRPC.Error[communicationFailure] after the stream has been closed.
  ~ {
  dH: DataHandle ~ NARROW[h.data];

  ConnectToServer: PROC ~ {
    -- Connect to the remote server. If the connect fails, dH.stream will be left NIL.
    -- The version-number pair is deliberately set to include more than one version so the server will be forced to return his version number pair immediately.
    -- ERRORS: XNSStream.ConnectionClosed, XNSStream.Timeout, CrRPC.Error.
    vLow: HWORD ~ HfC[CrRPCFriends.courierVersionNum - 1];
    vHigh: HWORD ~ HfC[CrRPCFriends.courierVersionNum];
    nRead: NAT;
    versionError: BOOL;
    sP: LONG POINTER TO CrRPCFriends.SessionHdr;
    IF dH.stream # NIL THEN ERROR;
    dH.stream ← XNSStream.Create[
      remote~dH.remote,
      getTimeout~dH.timeoutMsec, putTimeout~dH.timeoutMsec];
    TRUSTED {
      -- sP overlays the session header on the body of the marshalling buffer.
      sP ← LOOPHOLE[LOOPHOLE[dH.b, LONG POINTER] + SIZE[TEXT[0]]];
      sP^ ← [lowVersion~vLow, highVersion~vHigh] };
    dH.b.length ← CrRPCFriends.sessionHdrBytes;
    [] ← IO.PutBlock[dH.stream, dH.b, 0, CrRPCFriends.sessionHdrBytes];
    XNSStream.SendNow[dH.stream];
    nRead ← IO.GetBlock[dH.stream, dH.b, 0, CrRPCFriends.sessionHdrBytes];
    dH.inputFlushNeeded ← FALSE; -- ???? Is there EOM on version number pair ????
    TRUSTED {
      -- Compare in host order against the version constant, exactly as ListenerProc does, rather than against the wire-format vHigh.
      versionError ← (nRead # CrRPCFriends.sessionHdrBytes)
        OR (CfH[sP.lowVersion] > CrRPCFriends.courierVersionNum)
        OR (CfH[sP.highVersion] < CrRPCFriends.courierVersionNum) };
    IF versionError THEN {
      SendCloseStream[dH];
      ERROR CrRPC.Error[h, courierVersionMismatch, "version exchange error"];
      };
    };

  SendCallMsg: PROC ~ {
    -- Send the call message.
    -- ERRORS: XNSStream.ConnectionClosed, XNSStream.Timeout
    TRUSTED {
      cP: LONG POINTER TO CallMsgObject ~
        LOOPHOLE[LOOPHOLE[dH.b, LONG POINTER] + SIZE[TEXT[0]]];
      cP^ ← [
        msgHdr~[msgType~CrRPCFriends.callMsgType],
        callHdr~[
          tID~HfC[0],
          pgmNum~FfC[remotePgm],
          pgmVersion~HfC[remotePgmVersion],
          procNum~HfC[remoteProc]
          ]
        ];
      };
    dH.hIndex ← callMsgHWords;
    dH.hLength ← bufferBodyHWords;
    -- Start the arguments on a half word boundary. A stale odd flag left by the previous message would make the first PutB overwrite the last byte of the call header.
    dH.odd ← FALSE;
    IF putArgs # NIL THEN putArgs[h];
    PutBuf[dH];
    XNSStream.SendEndOfMessage[dH.stream];
    };

  ReadResultMsg: PROC ~ {
    -- Read the reply message and dispatch on its type: reject, return or abort.
    -- ERRORS: CrRPC.Error, XNSStream.Timeout, XNSStream.ConnectionClosed
    ENABLE
      IO.EndOfStream => {
        ERROR CrRPC.Error[h, resultsTooShort, "results too short"] };
    msgType: CARDINAL;
    GetBuf[dH];
    msgType ← CrRPC.GetCARDINAL[h];
    SELECT msgType FROM
      CrRPCFriends.rejectMsgType => {
        rejectReason: CARDINAL;
        [] ← GetH[h]; -- skip the transaction ID
        rejectReason ← CrRPC.GetCARDINAL[h];
        [] ← AdvanceInput[h~h, wait~FALSE];
        ERROR CrRPC.Error[h,
          (
          SELECT rejectReason FROM
            CrRPC.noSuchProgram => rejectedNoSuchProgram,
            CrRPC.noSuchVersion => rejectedNoSuchVersion,
            CrRPC.noSuchProcedure => rejectedNoSuchProcedure,
            CrRPC.invalidArgument => rejectedInvalidArgument,
            ENDCASE => rejectedUnspecified),
          "rejected"];
        };
      CrRPCFriends.returnMsgType => {
        [] ← GetH[h]; -- skip the transaction ID
        IF getResults # NIL THEN getResults[h];
        [] ← AdvanceInput[h~h, wait~FALSE];
        };
      CrRPCFriends.abortMsgType => {
        errNum: CARDINAL;
        [] ← GetH[h]; -- skip the transaction ID
        errNum ← CrRPC.GetCARDINAL[h];
        IF getError # NIL THEN getError[h, errNum];
        [] ← AdvanceInput[h~h, wait~FALSE];
        -- Reached only when getError is NIL or returns normally.
        ERROR CrRPC.Error[h, remoteError, "unexpected remote error"] };
      ENDCASE => {
        SendCloseStream[dH];
        ERROR CrRPC.Error[h, protocolError, "bad msgType in response"] };
    };

  ClientRdySysRecv: PROC ~ {
    -- Prepare to receive a system message. Leave dH.inputFlushNeeded = TRUE.
    -- ERRORS: CrRPC.Error, XNSStream.Timeout, XNSStream.ConnectionClosed
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    DO
      [state, sst] ← AdvanceInput[h~h, wait~TRUE];
      IF state = open THEN EXIT;
      [] ← XNSStream.GetStatus[self~dH.stream, reset~TRUE];
      ENDLOOP;
    IF sst # 0 THEN ERROR CrRPC.Error[h, protocolError, "sys sst # 0"];
    dH.inputFlushNeeded ← TRUE;
    };

  ClientRdyBDTSend: PROC RETURNS [ok: BOOL] ~ {
    -- Prepare to send bulk data — (try to) check whether the other side has sent us a reject / abort message, closed the stream, etc. In the normal case nothing arrives on the stream. If data arrives, it's a reject message from the other end, and the bulk data transfer should not be attempted. If an attention arrives, it's an abort request from the other end, so we let the bulk data transfer proceed (only to be aborted immediately).
    -- ERRORS: CrRPC.Error, XNSStream.Timeout, XNSStream.ConnectionClosed
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    [state, sst] ← AdvanceInput[h~h, wait~FALSE];
    SELECT state FROM
      open => {
        ok ← (IO.CharsAvail[self~dH.stream, wait~FALSE] = 0) };
      attention => {
        ok ← TRUE };
      ENDCASE => ERROR;
    };

  ClientRdyBDTRecv: PROC RETURNS [ok: BOOL] ~ {
    -- Prepare to receive bulk data — (try to) check whether the other side has sent us a reject / abort message, closed the stream, etc. In the normal case bulk data arrives on the stream.
    -- ERRORS: CrRPC.Error, XNSStream.Timeout, XNSStream.ConnectionClosed
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    [state, sst] ← AdvanceInput[h~h, wait~TRUE];
    SELECT state FROM
      open, endOfMessage => {
        ok ← (sst # 0);
        dH.inputFlushNeeded ← ok };
      attention => {
        ok ← TRUE };
      ENDCASE => ERROR;
    };

  CheckAbortRecv: CrRPC.BulkDataCheckAbortProc ~ {
    -- Wait for input, then report whether the stream is in the attention state.
    [] ← IO.CharsAvail[self~dH.stream, wait~TRUE];
    RETURN [ XNSStream.GetStatus[self~dH.stream, reset~FALSE].state = attention ] };

  CheckAbortSend: CrRPC.BulkDataCheckAbortProc ~ {
    -- Report an abort when the stream is no longer in the open state or when anything has arrived on it.
    RETURN[ (XNSStream.GetStatus[self~dH.stream, reset~FALSE].state # open)
      OR (IO.CharsAvail[self~dH.stream, wait~FALSE] > 0) ] };

  -- Logic of Courier call begins here:
  dH.sendBulkData ← NIL;
  dH.recvBulkData ← NIL;
  BEGIN
    IF dH.stream # NIL THEN {
      -- Try the stream already held by the handle. Any failure closes it and falls through to a fresh connection.
      ENABLE XNSStream.ConnectionClosed, XNSStream.Timeout, CrRPC.Error => {
        IF dH.stream # NIL THEN { IO.Close[dH.stream]; dH.stream ← NIL };
        CONTINUE };
      [] ← AdvanceInput[h~h, wait~FALSE]; -- check for close
      SendCallMsg[];
      GOTO Done };
    {
      -- dH.stream = NIL --
      ENABLE XNSStream.ConnectionClosed, XNSStream.Timeout => {
        IF dH.stream # NIL THEN { IO.Close[dH.stream]; dH.stream ← NIL };
        ERROR CrRPC.Error[h, communicationFailure, "communication failure"] };
      ConnectToServer[];
      SendCallMsg[];
      GOTO Done };
    EXITS
      -- Leaving through the exit takes control out of the ENABLE scopes above before the reply is processed.
      Done => NULL;
  END;
  -- At this point we have an open connection to the server and a call message has been sent successfully.
  BEGIN
    ENABLE {
      XNSStream.ConnectionClosed, XNSStream.Timeout => {
        IF dH.stream = NIL THEN ERROR; -- can't happen
        IO.Close[dH.stream]; dH.stream ← NIL;
        ERROR CrRPC.Error[h, communicationFailure, "communication failure"] };
      };
    SELECT TRUE FROM
      (dH.sendBulkData # NIL) => {
        -- Bulk data transfer from client to server.
        IF ClientRdyBDTSend[].ok THEN {
          XNSStream.SetSSType[dH.stream, 1]; -- ???? Define 1 ????
          IF dH.sendBulkData[h, dH.stream, CheckAbortSend].abort
            THEN XNSStream.SendAttention[dH.stream, 1] -- ???? Define 1 ????
            ELSE XNSStream.SendEndOfMessage[dH.stream];
          XNSStream.SetSSType[dH.stream, 0]; -- ???? Define 0 ????
          };
        };
      (dH.recvBulkData # NIL) => {
        -- Bulk data transfer from server to client.
        IF ClientRdyBDTRecv[].ok THEN {
          -- There's bulk data (or an attention) in the stream.
          IF dH.recvBulkData[h, dH.stream, CheckAbortRecv].abort
            THEN XNSStream.SendAttention[dH.stream, 1]; -- ???? Define 1 ????
          };
        };
      ENDCASE => {
        -- No bulk data transfer in either direction.
        NULL;
        };
    ClientRdySysRecv[];
    ReadResultMsg[];
    [] ← AdvanceInput[h~h, wait~FALSE]; -- done by ReadResultMsg
  END;
  }; -- of Call
-- Server Process
-- Get and put timeouts handed to XNSStream.CreateListener for server connections.
serverGetTimeout: XNSStream.Milliseconds ~ 90000;
serverPutTimeout: XNSStream.Milliseconds ~ 20000;
ServerRdySysRecv: PROC [h: Handle] ~ {
  -- Prepare to receive a system message. Leave dH.inputFlushNeeded = TRUE.
  -- Keeps advancing the input, resetting the stream status each time, until the stream reports the open state.
  -- ERRORS: CrRPC.Error, XNSStream.Timeout, XNSStream.ConnectionClosed
  data: DataHandle ~ NARROW[h.data];
  status: XNSStream.State;
  subSeq: XNSStream.SubSequenceType;
  [status, subSeq] ← AdvanceInput[h~h, wait~TRUE];
  WHILE status # open DO
    [] ← XNSStream.GetStatus[self~data.stream, reset~TRUE];
    [status, subSeq] ← AdvanceInput[h~h, wait~TRUE];
    ENDLOOP;
  IF subSeq # 0 THEN ERROR CrRPC.Error[h, protocolError, "sys sst # 0"];
  data.inputFlushNeeded ← TRUE;
  };
ServerRdyBDTRecv: PROC [h: Handle] ~ {
  -- Prepare to receive bulk data — (try to) check whether the other side has sent us anything besides bulk data — closed the stream, etc. In the normal case bulk data arrives on the stream.
  -- ERRORS: CrRPC.Error, XNSStream.Timeout, XNSStream.ConnectionClosed
  data: DataHandle ~ NARROW[h.data];
  status: XNSStream.State;
  subSeq: XNSStream.SubSequenceType;
  [status, subSeq] ← AdvanceInput[h~h, wait~TRUE];
  -- Only the open and endOfMessage states are expected here.
  IF (status # open) AND (status # endOfMessage) THEN ERROR;
  IF subSeq = 0 THEN ERROR CrRPC.Error[h, protocolError, "missing bulk data"];
  data.inputFlushNeeded ← TRUE;
  };
ServerRdyBDTSend: PROC [h: Handle] ~ {
  -- Prepare to send bulk data — (try to) check whether the other side has closed the stream or aborted the transfer. In the normal case nothing arrives on the stream. If an attention arrives, it's an abort request from the other end, so we let the bulk data transfer proceed (only to be aborted immediately).
  -- ERRORS: CrRPC.Error, XNSStream.Timeout, XNSStream.ConnectionClosed
  data: DataHandle ~ NARROW[h.data];
  status: XNSStream.State;
  subSeq: XNSStream.SubSequenceType;
  [status, subSeq] ← AdvanceInput[h~h, wait~FALSE];
  -- As written, any state other than open is treated as an uncatchable ERROR.
  IF status # open THEN ERROR;
  IF IO.CharsAvail[self~data.stream, wait~FALSE] # 0 THEN
    ERROR CrRPC.Error[h, protocolError, "client send to bdt source"];
  };
-- ResetBuffer: mark the marshalling buffer empty. The odd-byte flag is cleared as well, so a GetB or PutB that follows the reset starts on a half word boundary instead of indexing the byte before the start of the buffer.
ResetBuffer: PROC [dH: DataHandle] ~ INLINE {
  dH.hIndex ← dH.hLength ← 0;
  dH.odd ← FALSE;
  };
BeginReturn: CrRPC.BeginReturnProc
  -- [h: Handle]
  -- Start a return message: empty the buffer, then write the message type and a zero transaction ID.
  ~ {
  data: DataHandle ~ NARROW[h.data];
  ResetBuffer[data];
  PutH[h, HfC[CrRPCFriends.returnMsgType]]; -- MsgHdr.msgType
  PutH[h, HfC[0]]; -- ReturnHdr.tID
  };
BeginError: CrRPC.BeginErrorProc
  -- [h: Handle, errNum: CARDINAL]
  -- Start an abort message: empty the buffer, then write the message type, a zero transaction ID and the error number.
  ~ {
  data: DataHandle ~ NARROW[h.data];
  ResetBuffer[data];
  PutH[h, HfC[CrRPCFriends.abortMsgType]]; -- MsgHdr.msgType
  PutH[h, HfC[0]]; -- AbortHdr.tID
  PutH[h, HfC[errNum]]; -- AbortHdr.errNum
  };
BeginReject: CrRPC.BeginRejectProc
  -- [h: Handle, rejectReason: CARDINAL]
  -- Start a reject message: empty the buffer, then write the message type, a zero transaction ID and the reject reason.
  ~ {
  data: DataHandle ~ NARROW[h.data];
  ResetBuffer[data];
  PutH[h, HfC[CrRPCFriends.rejectMsgType]]; -- MsgHdr.msgType
  PutH[h, HfC[0]]; -- RejectHdr.tID
  PutH[h, HfC[rejectReason]]; -- RejectHdr.rejectReason
  };
ListenerProc: XNSStream.ListenerProc
  -- [stream: IO.STREAM, remote: XNS.Address]
  -- Worker run for each incoming connection: exchange Courier version numbers, build a server handle around the stream, then serve call messages in a loop until a timeout, a close or an error ends the session.
  ~ {
  sessionHdr: CrRPCFriends.SessionHdr;
  callMsgObject: CallMsgObject;
  rejectMsgObject: RejectMsgObject;
  desiredPgm: CARD;
  desiredPgmVersion: CARDINAL;
  desiredProc: CARDINAL;
  serverProc: CrRPC.ServerProc;
  nBytes: INT;
  h: Handle;
  dH: DataHandle;
  b: REF TEXT;
  BEGIN
    ENABLE XNSStream.ConnectionClosed, CrRPC.Error, XNSStream.Timeout => GOTO Out;

    -- Exchange version numbers
    TRUSTED {
      nBytes ← IO.UnsafeGetBlock[stream,
        [base~LOOPHOLE[LONG[@sessionHdr]], count~CrRPCFriends.sessionHdrBytes]
        ! XNSStream.Timeout => {[] ← XNSStream.SendClose[stream]; GOTO Out }] };
    IF nBytes # CrRPCFriends.sessionHdrBytes THEN GOTO Out;
    IF (CfH[sessionHdr.lowVersion] > CrRPCFriends.courierVersionNum)
      OR (CfH[sessionHdr.highVersion] < CrRPCFriends.courierVersionNum) THEN {
      [] ← XNSStream.SendClose[stream];
      GOTO Out };
    sessionHdr.lowVersion ← sessionHdr.highVersion ← HfC[CrRPCFriends.courierVersionNum];
    TRUSTED {
      IO.UnsafePutBlock[stream,
        [base~LOOPHOLE[LONG[@sessionHdr]], count~CrRPCFriends.sessionHdrBytes]] };
    XNSStream.SendNow[stream];

    -- Create server handle
    b ← RefText.New[nChars~bufferBodyBytes];
    dH ← NEW[DataObject ← [
      next~NIL, handle~NIL, lastUsed~, stream~stream, remote~remote, timeoutMsec~, b~b
      ]];
    h ← NEW[CrRPC.Object ← [class~$SPP, kind~server, marshallProcs~theMarshallProcs, procs~theServerProcs, data~dH, clientData~NIL]];

    -- Process calls until timeout or unrecoverable error
    DO
      -- Get message; make sure it's a call, get desired program, version and proc number
      ServerRdySysRecv[h];
      TRUSTED {
        nBytes ← IO.UnsafeGetBlock[stream,
          [base~LOOPHOLE[LONG[@callMsgObject]], count~callMsgBytes]
          ! XNSStream.Timeout => {[] ← XNSStream.SendClose[stream]; GOTO Out}] };
      IF (nBytes # callMsgBytes)
        OR (CfH[callMsgObject.msgHdr.msgType] # CrRPCFriends.callMsgType) THEN {
        [] ← XNSStream.SendClose[stream];
        GOTO Out };
      desiredPgm ← CfF[callMsgObject.callHdr.pgmNum];
      desiredPgmVersion ← CfH[callMsgObject.callHdr.pgmVersion];
      desiredProc ← CfH[callMsgObject.callHdr.procNum];
      serverProc ← CrRPCFriends.LookUpServerProc[desiredPgm, desiredPgmVersion];
      IF serverProc = NIL
        THEN {
          -- Don't have server, reject the call
          rejectMsgObject.msgHdr.msgType ← HfC[CrRPCFriends.rejectMsgType];
          rejectMsgObject.rejectHdr.tID ← HfC[0];
          rejectMsgObject.rejectHdr.rejectReason ← HfC[CrRPC.noSuchProgram];
          TRUSTED {
            IO.UnsafePutBlock[stream,
              [base~LOOPHOLE[LONG[@rejectMsgObject]], count~rejectMsgBytes]] };
          }
        ELSE {
          -- Serve the call
          ResetBuffer[dH];
          serverProc[h, desiredPgm, desiredPgmVersion, desiredProc, BeginReturn, BeginError, BeginReject];
          IF dH.stream = NIL THEN GOTO Closed; -- client caught error
          PutBuf[dH];
          };
      XNSStream.SendEndOfMessage[stream];
      ENDLOOP;
  END;
  EXITS
    -- These exits lie outside the ENABLE scope above, so an error raised by IO.Close is not caught by it.
    Out => IO.Close[stream];
    Closed => NULL;
  };
-- Singly linked list of the listeners created so far, one per socket; searched and extended by CreateListener.
ListenerValue: TYPE ~ REF ListenerValueObject;
ListenerValueObject:
TYPE ~
RECORD [
next: ListenerValue,
socket: XNS.Socket,
listener: XNSStream.Listener
];
-- Head of the listener list.
listeners: ListenerValue ←
NIL;
CreateListener: ENTRY CrRPCFriends.CreateListenerProc
  -- [socket: XNS.Socket]
  -- Make sure a listener exists on the given socket (the Courier well-known socket when the socket is unknown). Does nothing if one is already on the list.
  ~ {
  created: XNSStream.Listener;
  IF socket = XNS.unknownSocket THEN socket ← XNSWKS.courier;
  FOR each: ListenerValue ← listeners, each.next WHILE each # NIL DO
    IF each.socket = socket THEN RETURN;
    ENDLOOP;
  created ← XNSStream.CreateListener[
    socket~socket,
    worker~ListenerProc,
    getTimeout~serverGetTimeout,
    putTimeout~serverPutTimeout];
  -- Push the new listener on the front of the list.
  listeners ← NEW[ListenerValueObject ← [next~listeners, socket~socket, listener~created]];
  };
GetRemote: PROC [h: Handle] RETURNS [XNS.Address] ~ {
  -- Return the remote address recorded in the handle's data object.
  data: DataHandle ~ NARROW[h.data];
  RETURN [data.remote];
  };
-- I/O and Marshalling
AdvanceInput: PROC [h: Handle, wait: BOOL ← FALSE]
  RETURNS [state: XNSStream.State, ssType: XNSStream.SubSequenceType] ~ {
  -- Advance input stream to next significant piece of input, checking for remote close and flushing unread input if needed.
  -- ERRORS: XNSStream.ConnectionClosed, XNSStream.Timeout, CrRPC.Error
  dH: DataHandle ~ NARROW[h.data];
  sawBreak: BOOL ← FALSE; -- TRUE once an endOfMessage or ssTypeChange has been stepped over
  IF dH.inputFlushNeeded THEN {
    [] ← XNSStream.FlushInput[self~dH.stream, wait~TRUE];
    dH.inputFlushNeeded ← FALSE };
  DO
    [state~state, ssType~ssType] ← XNSStream.GetStatus[self~dH.stream, reset~FALSE];
    -- Either close subsequence type means the other end is closing: reply, drop the stream and report it.
    IF (ssType = XNSSPPTypes.endSST) OR (ssType = XNSSPPTypes.endReplySST) THEN {
      ReplyCloseStream[dH];
      ERROR CrRPC.Error[h, protocolError, "remote close"] };
    SELECT state FROM
      open => {
        IF NOT wait THEN RETURN;
        IF IO.CharsAvail[self~dH.stream, wait~TRUE] > 0 THEN RETURN;
        LOOP };
      attention =>
        RETURN;
      endOfMessage => {
        -- The first endOfMessage seen is stepped over; a second one is returned to the caller.
        IF sawBreak THEN RETURN;
        sawBreak ← TRUE };
      ssTypeChange => {
        sawBreak ← TRUE };
      ENDCASE => ERROR;
    -- Reset the status so that the next GetStatus reports what follows.
    [] ← XNSStream.GetStatus[self~dH.stream, reset~TRUE];
    ENDLOOP;
  };
SendCloseStream: PROC [dH: DataHandle] ~ {
  -- Initiate a close on the handle's stream, if it has one, then abort the stream locally and forget it. ConnectionClosed and Timeout from the close request are ignored.
  IF dH.stream = NIL THEN RETURN;
  [] ← XNSStream.SendClose[dH.stream
    ! XNSStream.ConnectionClosed, XNSStream.Timeout => CONTINUE];
  IO.Close[self~dH.stream, abort~TRUE];
  dH.stream ← NIL;
  };
ReplyCloseStream: PROC [dH: DataHandle] ~ {
  -- Answer a close from the other end on the handle's stream, if it has one, then abort the stream locally and forget it. ConnectionClosed and Timeout from the close reply are ignored.
  IF dH.stream = NIL THEN RETURN;
  [] ← XNSStream.SendCloseReply[dH.stream
    ! XNSStream.ConnectionClosed, XNSStream.Timeout => CONTINUE];
  IO.Close[self~dH.stream, abort~TRUE];
  dH.stream ← NIL;
  };
PutBuf: PROC [dH: DataHandle] ~ {
  -- Write the first hIndex half words of the marshalling buffer to the stream and mark the buffer empty.
  dH.b.length ← 2*dH.hIndex;
  IO.PutBlock[self~dH.stream, block~dH.b];
  dH.hIndex ← 0;
  -- Any half-filled half word has just been written out. Clearing the flag keeps a later PutB from addressing byte 2*0 - 1 (for example a PutB that follows UnsafePutBlock).
  dH.odd ← FALSE;
  };
GetBuf: PROC [dH: DataHandle, minBytes: NAT ← 2] ~ {
  -- Refill the marshalling buffer from the stream. Raises IO.EndOfStream when fewer than minBytes bytes arrive. The half word count is the byte count divided by two, so a trailing odd byte is not counted.
  got: NAT ~ IO.GetBlock[self~dH.stream, block~dH.b];
  IF got < minBytes THEN ERROR IO.EndOfStream[dH.stream];
  dH.hIndex ← 0;
  dH.hLength ← got / 2;
  };
PutB: PROC [h: Handle, byte: BYTE] ~ {
  -- Marshal one byte. Bytes are packed two to a half word: the first byte of a pair claims a new half word, the second fills its other half.
  data: DataHandle ~ NARROW[h.data];
  IF data.odd THEN {
    -- Second byte of the pair: hIndex already points past the half word being filled.
    data.b[2*data.hIndex - 1] ← LOOPHOLE[byte];
    data.odd ← FALSE;
    RETURN };
  IF data.hIndex >= data.hLength THEN PutBuf[data];
  data.b[2*data.hIndex] ← LOOPHOLE[byte];
  data.hIndex ← data.hIndex.SUCC;
  data.odd ← TRUE;
  };
PutH: PROC [h: Handle, hWord: HWORD] ~ {
  -- Marshal one half word, writing the buffer out first when it is full. Leaves the buffer half word aligned.
  data: DataHandle ~ NARROW[h.data];
  IF data.hIndex >= data.hLength THEN PutBuf[data];
  TRUSTED {
    -- View the body of the REF TEXT as an array of half words.
    body: LONG POINTER TO HWords ~
      LOOPHOLE[LOOPHOLE[data.b, LONG POINTER] + SIZE[TEXT[0]]];
    body.hWords[data.hIndex] ← hWord };
  data.hIndex ← data.hIndex.SUCC;
  data.odd ← FALSE;
  };
PutF: PROC [h: Handle, fWord: FWORD] ~ {
  -- Marshal one full word (two half words), writing the buffer out first unless both half words fit. Leaves the buffer half word aligned.
  data: DataHandle ~ NARROW[h.data];
  IF (data.hIndex+1) >= data.hLength THEN PutBuf[data];
  TRUSTED {
    -- Address of the next free half word in the body of the REF TEXT.
    dest: LONG POINTER ~ LOOPHOLE[data.b, LONG POINTER] + SIZE[TEXT[0]] + data.hIndex;
    PrincOpsUtils.LongCopy[from~@fWord, to~dest, nwords~2];
    };
  data.hIndex ← data.hIndex + 2;
  data.odd ← FALSE;
  };
UnsafePutBlock: UNSAFE PROC [h: Handle, block: IO.UnsafeBlock] ~ {
  -- Send a caller-supplied block directly on the stream. Anything already marshalled in the buffer is written out first so that the bytes stay in order.
  data: DataHandle ~ NARROW[h.data];
  IF data.hIndex > 0 THEN PutBuf[data];
  TRUSTED {
    IO.UnsafePutBlock[self~data.stream, block~block] };
  };
PutBulkDataSource: PROC [h: Handle, bulkDataSource: CrRPC.BulkDataSource] ~ {
  -- Marshal a bulk data source descriptor (client handles only). A NIL source is sent as the null descriptor. Otherwise the immediate descriptor is sent and the source is remembered in the handle, so that Call runs the transfer after the call message.
  data: DataHandle ~ NARROW[h.data];
  IF h.kind # client THEN ERROR CrRPC.Error[h, notClientHandle, "not client handle"];
  IF bulkDataSource # NIL
    THEN {
      CrRPC.PutCARDINAL[h, ORD[CrRPCFriends.BulkDataDescriptorType.immediate]];
      TRUSTED { data.sendBulkData ← bulkDataSource } }
    ELSE {
      CrRPC.PutCARDINAL[h, ORD[CrRPCFriends.BulkDataDescriptorType.null]] };
  };
PutBulkDataSink: PROC [h: Handle, bulkDataSink: CrRPC.BulkDataSink] ~ {
  -- Marshal a bulk data sink descriptor (client handles only). A NIL sink is sent as the null descriptor. Otherwise the immediate descriptor is sent and the sink is remembered in the handle, so that Call runs the transfer after the call message.
  data: DataHandle ~ NARROW[h.data];
  IF h.kind # client THEN ERROR CrRPC.Error[h, notClientHandle, "not client handle"];
  IF bulkDataSink # NIL
    THEN {
      CrRPC.PutCARDINAL[h, ORD[CrRPCFriends.BulkDataDescriptorType.immediate]];
      TRUSTED { data.recvBulkData ← bulkDataSink } }
    ELSE {
      CrRPC.PutCARDINAL[h, ORD[CrRPCFriends.BulkDataDescriptorType.null]] };
  };
GetB: PROC [h: Handle] RETURNS [byte: BYTE] ~ {
  -- Unmarshal one byte. Bytes are packed two to a half word: the first read of a pair consumes a half word, the second returns its other half.
  data: DataHandle ~ NARROW[h.data];
  IF data.odd THEN {
    -- Second byte of the pair: hIndex already points past the half word being read.
    byte ← LOOPHOLE[data.b[2*data.hIndex - 1]];
    data.odd ← FALSE;
    RETURN };
  IF data.hIndex >= data.hLength THEN GetBuf[data];
  byte ← LOOPHOLE[data.b[2*data.hIndex]];
  data.hIndex ← data.hIndex.SUCC;
  data.odd ← TRUE;
  };
GetH: PROC [h: Handle] RETURNS [hWord: HWORD] ~ {
  -- Unmarshal one half word, refilling the buffer from the stream when it is exhausted. Leaves the buffer half word aligned.
  data: DataHandle ~ NARROW[h.data];
  IF data.hIndex >= data.hLength THEN GetBuf[data, 2];
  TRUSTED {
    -- View the body of the REF TEXT as an array of half words.
    body: LONG POINTER TO HWords ~
      LOOPHOLE[LOOPHOLE[data.b, LONG POINTER] + SIZE[TEXT[0]]];
    hWord ← body.hWords[data.hIndex] };
  data.hIndex ← data.hIndex.SUCC;
  data.odd ← FALSE;
  };
GetF: PROC [h: Handle] RETURNS [fWord: FWORD] ~ {
  -- Unmarshal one full word. When both half words are already buffered they are copied out in one step. Otherwise they are fetched one at a time with GetH, which refills the buffer as needed.
  data: DataHandle ~ NARROW[h.data];
  IF (data.hIndex+1) < data.hLength
    THEN {
      TRUSTED {
        PrincOpsUtils.LongCopy[
          from~(LOOPHOLE[data.b, LONG POINTER] + SIZE[TEXT[0]] + data.hIndex),
          to~@fWord,
          nwords~2];
        };
      data.hIndex ← data.hIndex + 2;
      data.odd ← FALSE;
      }
    ELSE {
      -- The full word straddles a buffer boundary (or the buffer is empty).
      pair: MACHINE DEPENDENT RECORD [a, b: HWORD];
      pair.a ← GetH[h];
      pair.b ← GetH[h];
      TRUSTED { fWord ← LOOPHOLE[pair] }
      };
  };
UnsafeGetBlock: UNSAFE PROC [h: Handle, block: IO.UnsafeBlock]
  RETURNS [nBytesRead: INT ← 0] ~ {
  -- Read bytes into a caller-supplied block. Data still sitting in the marshalling buffer is delivered first, one byte at a time; the remainder of the block is then read directly from the stream.
  -- The result counts the bytes taken from the buffer as well as those read from the stream.
  dH: DataHandle ~ NARROW[h.data];
  WHILE (dH.hIndex < dH.hLength) OR dH.odd DO
    IF block.count = 0 THEN RETURN;
    TRUSTED { block.base[block.startIndex] ← LOOPHOLE[GetB[h]] };
    block.startIndex ← block.startIndex + 1;
    block.count ← block.count - 1;
    nBytesRead ← nBytesRead + 1; -- bytes drained from the buffer are part of the result
    ENDLOOP;
  TRUSTED {
    nBytesRead ← nBytesRead + IO.UnsafeGetBlock[self~dH.stream, block~block] };
  };
GetBulkDataSource: PROC [h: Handle] RETURNS [bulkDataSource: CrRPC.BulkDataSource] ~ {
  -- Unmarshal a bulk data source descriptor (server handles only). The null descriptor yields NIL and the immediate descriptor yields ServerRecvBulkData. Third party (active / passive) descriptors are not implemented.
  data: DataHandle ~ NARROW[h.data];
  descriptor: CARDINAL;
  IF h.kind # server THEN ERROR CrRPC.Error[h, notServerHandle, "not server handle"];
  descriptor ← CrRPC.GetCARDINAL[h];
  SELECT descriptor FROM
    ORD[CrRPCFriends.BulkDataDescriptorType.null] => {
      bulkDataSource ← NIL };
    ORD[CrRPCFriends.BulkDataDescriptorType.immediate] =>
      TRUSTED {
        bulkDataSource ← ServerRecvBulkData };
    ORD[CrRPCFriends.BulkDataDescriptorType.active],
    ORD[CrRPCFriends.BulkDataDescriptorType.passive] => {
      ERROR CrRPC.Error[h, notImplemented, "no third party bulk data"] };
    ENDCASE => {
      ERROR CrRPC.Error[h, protocolError, "bad bulkDataSource type"] };
  };
ServerRecvBulkData: CrRPC.BulkDataXferProc
  -- [h: Handle, stream: IO.STREAM, checkAbort: BulkDataCheckAbortProc]
  -- RETURNS [abort: BOOL]
  -- Server side of a client-to-server bulk transfer: copy blocks from the connection (dH.stream) to the caller's stream until end of message, an attention from the other end, or a local abort request.
  -- The scratch buffer is handed back to RefText on every way out, including when an error unwinds through this procedure.
  ~ {
  dH: DataHandle ~ NARROW[h.data];
  bufSize: NAT ~ 1024;
  b: REF TEXT ← RefText.ObtainScratch[bufSize];
  nBytes: INT;
  state: XNSStream.State;
  ssType: XNSSPPTypes.SubSequenceType;
  abort ← FALSE;
  {
    ENABLE UNWIND => RefText.ReleaseScratch[b];
    ServerRdyBDTRecv[h];
    DO
      SELECT nBytes ← IO.GetBlock[self~dH.stream, block~b] FROM
        0 => {
          -- Nothing was read: look at (and reset) the stream status to find out why.
          [state~state, ssType~ssType] ← XNSStream.GetStatus[dH.stream, TRUE];
          SELECT state FROM
            attention => { abort ← TRUE; EXIT };
            endOfMessage => { dH.inputFlushNeeded ← FALSE; EXIT };
            ssTypeChange =>
              SELECT ssType FROM
                0 => ERROR CrRPC.Error[h, protocolError, "bdt no eom"];
                XNSSPPTypes.endSST, XNSSPPTypes.endReplySST =>
                  ERROR CrRPC.Error[h, remoteClose, "remote close"];
                ENDCASE => NULL; -- ???? is this an error ????
            open => NULL;
            ENDCASE => ERROR;
          };
        ENDCASE => {
          IO.PutBlock[self~stream, block~b, count~nBytes];
          };
      IF checkAbort[h].abort THEN {
        XNSStream.SendAttention[dH.stream, 1]; -- Define 1 ????
        EXIT };
      ENDLOOP;
    };
  RefText.ReleaseScratch[b];
  };
GetBulkDataSink: PROC [h: Handle] RETURNS [bulkDataSink: CrRPC.BulkDataSink] ~ {
  -- Unmarshal a bulk data sink descriptor (server handles only). The null descriptor yields NIL and the immediate descriptor yields ServerSendBulkData. Third party (active / passive) descriptors are not implemented.
  data: DataHandle ~ NARROW[h.data];
  descriptor: CARDINAL;
  IF h.kind # server THEN ERROR CrRPC.Error[h, notServerHandle, "not server handle"];
  descriptor ← CrRPC.GetCARDINAL[h];
  SELECT descriptor FROM
    ORD[CrRPCFriends.BulkDataDescriptorType.null] => {
      bulkDataSink ← NIL };
    ORD[CrRPCFriends.BulkDataDescriptorType.immediate] =>
      TRUSTED {
        bulkDataSink ← ServerSendBulkData };
    ORD[CrRPCFriends.BulkDataDescriptorType.active],
    ORD[CrRPCFriends.BulkDataDescriptorType.passive] => {
      ERROR CrRPC.Error[h, notImplemented, "no third party bulk data"] };
    ENDCASE => {
      ERROR CrRPC.Error[h, protocolError, "bad bulkDataSink type"] };
  };
ServerSendBulkData: CrRPC.BulkDataXferProc
  -- [h: Handle, stream: IO.STREAM, checkAbort: BulkDataCheckAbortProc]
  -- RETURNS [abort: BOOL]
  -- Server side of a server-to-client bulk transfer: copy blocks from the caller's stream to the connection (dH.stream) under subsequence type 1, finishing with an end of message, or with an attention when the local side asks for an abort.
  -- The scratch buffer is handed back to RefText on every way out, including when an error unwinds through this procedure.
  ~ {
  dH: DataHandle ~ NARROW[h.data];
  bufSize: NAT ~ 1024;
  b: REF TEXT ← RefText.ObtainScratch[bufSize];
  nBytes: INT;
  state: XNSStream.State;
  ssType: XNSSPPTypes.SubSequenceType;
  abort ← FALSE;
  {
    ENABLE UNWIND => RefText.ReleaseScratch[b];
    ServerRdyBDTSend[h];
    XNSStream.SetSSType[self~dH.stream, ssType~1]; -- Define 1 ????
    {
      DO
        -- Check for abort from other side
        [state~state, ssType~ssType] ← XNSStream.GetStatus[dH.stream, FALSE];
        SELECT state FROM
          open => NULL;
          attention, endOfMessage, ssTypeChange => {
            abort ← TRUE;
            GOTO FinishWithEOM };
          ENDCASE => ERROR;
        -- Check for abort from this side
        IF checkAbort[h].abort THEN {
          GOTO FinishWithAttention };
        -- Copy a block of data
        SELECT nBytes ← IO.GetBlock[self~stream, block~b] FROM
          0 => {
            GOTO FinishWithEOM };
          ENDCASE => {
            IO.PutBlock[self~dH.stream, block~b, count~nBytes];
            };
        ENDLOOP;
      EXITS
        FinishWithAttention =>
          XNSStream.SendAttention[dH.stream, 1]; -- Define 1 ????
        FinishWithEOM =>
          XNSStream.SendEndOfMessage[dH.stream];
      };
    XNSStream.SetSSType[self~dH.stream, ssType~0]; -- Define 0 ????
    };
  RefText.ReleaseScratch[b];
  };
HAlign: PROC [h: Handle] ~ {
  -- Force half word alignment: forget any half-used half word so that the next byte starts a new one.
  data: DataHandle ~ NARROW[h.data];
  data.odd ← FALSE;
  };
-- Cache of Handles
-- We maintain a cache of recently used client handles with their transport intact. If a handle isn't used for a while, it times out, the transport is closed, and the handle is destroyed.
-- Head of the list of cached client handles, linked through DataObject.next.
cache: DataHandle ← NIL;
-- A cached handle whose age exceeds this many pulses (60 * 1000000 microseconds) is destroyed by the sweeper.
cachePulseOut: BasicTime.Pulses ~ BasicTime.MicrosecondsToPulses[60 * 1000000];
-- Pause between sweeps of the cache.
cacheSweepInterval: Process.Ticks ~ Process.SecondsToTicks[21];
PulsesSince: PROC [then: BasicTime.Pulses] RETURNS [BasicTime.Pulses] ~ {
  -- Number of clock pulses between the given reading and now.
  now: BasicTime.Pulses ~ BasicTime.GetClockPulses[];
  RETURN [ now - then ];
  };
CheckOutFromCache: ENTRY PROC [remote: XNS.Address] RETURNS [h: Handle ← NIL] ~ {
  -- Remove and return the first cached handle whose remote address matches, or NIL when there is none.
  prev: DataHandle ← NIL;
  FOR cur: DataHandle ← cache, cur.next WHILE cur # NIL DO
    IF cur.remote = remote THEN {
      -- Unlink cur from the list.
      IF prev = NIL THEN cache ← cur.next ELSE prev.next ← cur.next;
      h ← cur.handle;
      cur.handle ← NIL; -- for finalization
      EXIT };
    prev ← cur;
    ENDLOOP;
  };
CheckInToCache: ENTRY PROC [h: Handle] ~ {
  -- Park a client handle at the front of the cache, recording the owning handle and the time of check-in.
  entry: DataHandle ← NARROW[h.data];
  entry.handle ← h;
  entry.lastUsed ← BasicTime.GetClockPulses[];
  entry.next ← cache;
  cache ← entry;
  };
CacheSweeper: PROC ~ {
  -- Background loop: every cacheSweepInterval, split the cache into handles still young enough to keep and handles that have timed out, then finish the timed-out ones.

  GetDeadHandles: ENTRY PROC RETURNS [deadList: DataHandle ← NIL] ~ {
    -- Rebuild the cache, keeping the survivors in their original order, and return the timed-out entries as a list linked through next.
    cur, kept, keptTail: DataHandle ← NIL;
    WHILE cache # NIL DO
      cur ← cache;
      cache ← cache.next;
      IF PulsesSince[cur.lastUsed] <= cachePulseOut
        THEN {
          -- add to new cache
          cur.next ← NIL;
          IF keptTail = NIL THEN kept ← cur ELSE keptTail.next ← cur;
          keptTail ← cur }
        ELSE {
          -- add to dead list
          cur.next ← deadList;
          deadList ← cur };
      ENDLOOP;
    cache ← kept;
    };

  DestroyDeadHandles: PROC [deadList: DataHandle] ~ {
    -- Finish every handle on the list, unlinking each entry as it goes.
    victim: DataHandle;
    WHILE deadList # NIL DO
      victim ← deadList;
      deadList ← victim.next;
      victim.next ← NIL;
      FinishClientHandle[victim.handle];
      ENDLOOP;
    };

  DO
    DestroyDeadHandles[ GetDeadHandles[] ];
    Process.Pause[ cacheSweepInterval ];
    ENDLOOP;
  };
-- Handle Creation and Destruction
SetRemote: PROC [h: Handle, remote: XNS.Address] RETURNS [Handle] ~ {
  -- Return a handle for the given remote address. If h already addresses it, h itself is returned. Otherwise h is parked in the cache and a handle for the new address is obtained from CreateClientHandle with the same timeout.
  dH: DataHandle ~ NARROW[h.data];
  timeoutMsec: INT ← dH.timeoutMsec;
  -- The socket is always forced to the Courier well-known socket, exactly as CreateClientHandle does. The former test for XNS.unknownSocket was dead code, because this assignment overwrote its result.
  -- NOTE(review): if callers are meant to reach servers listening on other sockets, both this procedure and CreateClientHandle need to change together — confirm the intent.
  remote.socket ← XNSWKS.courier;
  IF dH.remote = remote THEN RETURN[h];
  CheckInToCache[h];
  h ← CreateClientHandle[remote, timeoutMsec];
  RETURN [h] };
-- Timeout used by SetTimeout when the caller passes 0.
defaultTimeoutMsec: INT ~ 15000;
-- Smallest timeout SetTimeout will accept; smaller non-zero requests are raised to this value.
minTimeoutMsec: INT ~ 100;
SetTimeout: PROC [h: Handle, timeoutMsec: INT] RETURNS [Handle] ~ {
  -- Set the get / put timeout of the handle. A request of 0 selects defaultTimeoutMsec, and any other value is raised to at least minTimeoutMsec. Always returns h.
  dH: DataHandle ~ NARROW[h.data];
  timeoutMsec ←
    IF timeoutMsec = 0
      THEN defaultTimeoutMsec
      ELSE MAX[timeoutMsec, minTimeoutMsec];
  IF timeoutMsec = dH.timeoutMsec THEN RETURN[h];
  IF dH.stream # NIL THEN {
    -- If the connection turns out to be closed, close the local stream before forgetting it, as the other ConnectionClosed handlers in this module do, instead of dropping it unclosed.
    XNSStream.SetTimeouts[dH.stream, timeoutMsec, timeoutMsec
      ! XNSStream.ConnectionClosed => {
        IO.Close[self~dH.stream, abort~TRUE];
        dH.stream ← NIL;
        CONTINUE } ];
    };
  dH.timeoutMsec ← timeoutMsec;
  RETURN [h] };
SetHops: PROC [h: Handle, low, high: NAT] RETURNS [Handle] ~ {
  -- Not supported for this handle class; always raises CrRPC.Error[notImplemented].
  ERROR CrRPC.Error[h, notImplemented, "not a broadcast handle"];
  };
FinishClientHandle, Finalize: PROC [h: Handle] ~ {
  -- In the Finalize case, a client has dropped the handle h, which is guaranteed not to be in the cache. Note this operation is idempotent.
  -- Both names share this body: drop the back pointer to the handle and close the stream, if there is one.
  data: DataHandle ~ NARROW[h.data];
  data.handle ← NIL; -- break cycle for GC
  SendCloseStream[data];
  };
CreateClientHandle: CrRPCFriends.CreateClientHandleProc
  -- [remote: XNS.Address, timeoutMsec: INT] RETURNS [Handle]
  -- Return a client handle for the remote address, taking one from the cache when possible and building a new one otherwise. The socket is always forced to the Courier well-known socket. The requested timeout is applied through SetTimeout.
  ~ {
  result: Handle;
  data: DataHandle;
  buffer: REF TEXT;
  remote.socket ← XNSWKS.courier;
  result ← CheckOutFromCache[remote];
  IF result = NIL THEN {
    -- Nothing cached for this address: build a fresh handle. No stream is opened here; Call connects when it finds the stream NIL.
    buffer ← RefText.New[nChars~bufferBodyBytes];
    data ← NEW[DataObject ← [
      lastUsed~BasicTime.GetClockPulses[],
      remote~remote,
      timeoutMsec~timeoutMsec,
      b~buffer
      ]];
    result ← CrRPCFriends.NewClientObject[class~$SPP, marshallProcs~theMarshallProcs, procs~theClientProcs, data~data];
    };
  -- SetTimeout returns the handle it was given; anything else is a bug.
  IF SetTimeout[result, timeoutMsec] # result THEN ERROR;
  RETURN [result];
  };
-- Module start code: register this implementation under the class $SPP for both client handle creation and listener creation, then start the cache sweeper as a detached process.
CrRPCFriends.RegisterCreateClientHandleProc[$SPP, CreateClientHandle];
CrRPCFriends.RegisterCreateListenerProc[$SPP, CreateListener];
TRUSTED { Process.Detach[ FORK CacheSweeper[] ] };
}.