CrRPCEImpl.mesa
Copyright © 1986 by Xerox Corporation. All rights reserved.
Demers, November 18, 1986 2:44:00 pm PST
Interim Courier client runtime support for expedited Courier (no streams).
This depends on the size ratio BYTE::HWORD::FWORD being 1::2::4.
TODO:
Make UnsafeGetBlock and UnsafePutBlock reasonably efficient.
Make retry timeout heuristic more reasonable?
Make a server implementation!
DIRECTORY
BasicTime USING [GetClockPulses],
CommBuffer USING [Overhead],
CrRPC USING [BulkDataSink, BulkDataSource, CallProc, ClientProcs, Error, Handle, invalidArgument, MarshallProcs, noSuchProcedure, noSuchProgram, noSuchVersion, Object],
CrRPCFriends USING [AbortHdr, abortHdrBytes, abortMsgType, CallHdr, callHdrBytes, callMsgType, courierVersionNum, CreateClientHandleProc, MsgHdr, msgHdrBytes, RegisterCreateClientHandleProc, RejectHdr, rejectHdrBytes, rejectMsgType, ReturnHdr, returnHdrBytes, returnMsgType],
Endian USING [BYTE, bytesPerHWord, CardFromF, CardFromH, FFromCard, FWORD, HFromCard, HWORD],
IO USING [UnsafeBlock],
PrincOpsUtils USING [LongCopy],
XNS USING [Address, broadcastHost, broadcastNet, Net, unknownSocket],
XNSBuf USING [Buffer, Hdr, maxBodyBytes],
XNSErrorTypes USING [invalidPacketTypeErr, protocolViolationErr],
XNSExchangeTypes USING [clearinghouseServiceType],
XNSExchangeBuf USING [Buffer, Hdr, hdrBytes],
XNSRouter USING [Enumerate, Hops, maxHops, RoutingTableEntry],
XNSSocket USING [AllocBuffer, Create, Destroy, dontWait, FreeBuffer, Get, GetRemoteAddress, GetUserHWords, Handle, ReturnError, SetGetTimeout, SetNoErrors, SetRemoteAddress, SetUserHWords],
XNSSocketBackdoor USING [FlushCache, PutCached];
CrRPCEImpl: CEDAR PROGRAM
IMPORTS BasicTime, CrRPC, CrRPCFriends, Endian, PrincOpsUtils, XNSRouter, XNSSocket, XNSSocketBackdoor
~ {
CARD: TYPE ~ LONG CARDINAL;
BYTE: TYPE ~ Endian.BYTE;
FWORD: TYPE ~ Endian.FWORD;
HWORD: TYPE ~ Endian.HWORD;
bytesPerWord: CARDINAL ~ Endian.bytesPerHWord;
FfC: PROC [c: CARD] RETURNS [FWORD] ~ INLINE {
RETURN [Endian.FFromCard[c]] };
HfC: PROC [c: CARDINAL] RETURNS [HWORD] ~ INLINE {
RETURN [Endian.HFromCard[c]] };
CfF: PROC [f: FWORD] RETURNS [CARD] ~ INLINE {
RETURN [Endian.CardFromF[f]] };
CfH: PROC [h: HWORD] RETURNS [CARDINAL] ~ INLINE {
RETURN [Endian.CardFromH[h]] };
Handle: TYPE ~ CrRPC.Handle;
DataHandle: TYPE ~ REF DataObject;
DataObject: TYPE ~ RECORD [
socket: XNSSocket.Handle,
timeoutMsec: INT,
exchangeID: LONG CARDINAL,
doExpandingBroadcast: BOOLFALSE,
lowHops, highHops: XNSRouter.Hops ← 0,
b: XNSBuf.Buffer ← NIL,
hIndex: CARDINAL ← 0, -- index in HWORDs of next data, (in XNS (IDP) packet body)
hLength: CARDINAL ← 0, -- length in HWORDs of buffer (XNS (IDP) packet body)
odd: BOOLFALSE -- TRUE on odd bytes, for GetB / putB
];
theMarshallProcs: REF CrRPC.MarshallProcs
~ NEW[ CrRPC.MarshallProcs ← [
putB~PutB, putH~PutH, putF~PutF, unsafePutBlock~UnsafePutBlock, putBulkDataSource~PutBulkDataSource, putBulkDataSink~PutBulkDataSink, putHAlign~HAlign,
getB~GetB, getH~GetH, getF~GetF, unsafeGetBlock~UnsafeGetBlock, getBulkDataSource~GetBulkDataSource, getBulkDataSink~GetBulkDataSink, getHAlign~HAlign
]
];
theClientProcs: REF CrRPC.ClientProcs
~ NEW[ CrRPC.ClientProcs ← [
setRemote~SetRemote,
setTimeout~SetTimeout,
setHops~SetHops,
destroy~DestroyClient,
call~Call
]
];
Courier Messages
Header that begins an expedited-Courier exchange.
ECHdr: TYPE ~ MACHINE DEPENDENT RECORD [
lowVersion: HWORD,
highVersion: HWORD
];
ecHdrBytes: CARDINAL ~ SIZE[ECHdr]*bytesPerWord;
Network call buffer (without program-specific body).
ECCallBuffer: TYPE ~ REF ECCallBufferObject;
ECCallBufferObject: TYPE ~ MACHINE DEPENDENT RECORD [
ovh: CommBuffer.Overhead,
hdr1: XNSBuf.Hdr,
hdr2: XNSExchangeBuf.Hdr,
ecHdr: ECHdr,
msgHdr: CrRPCFriends.MsgHdr,
callHdr: CrRPCFriends.CallHdr
];
ecCallBufferBytes: CARDINAL ~ SIZE[ECCallBufferObject]*bytesPerWord;
Reject buffer (without program-specific body).
ECRejectBuffer: TYPE ~ REF ECRejectBufferObject;
ECRejectBufferObject: TYPE ~ MACHINE DEPENDENT RECORD [
ovh: CommBuffer.Overhead,
hdr1: XNSBuf.Hdr,
hdr2: XNSExchangeBuf.Hdr,
ecHdr: ECHdr,
msgHdr: CrRPCFriends.MsgHdr,
rejectHdr: CrRPCFriends.RejectHdr
];
ecRejectBufferBytes: CARDINAL ~ SIZE[ECRejectBufferObject]*bytesPerWord;
Return buffer (without program-specific body).
ECReturnBuffer: TYPE ~ REF ECReturnBufferObject;
ECReturnBufferObject: TYPE ~ MACHINE DEPENDENT RECORD [
ovh: CommBuffer.Overhead,
hdr1: XNSBuf.Hdr,
hdr2: XNSExchangeBuf.Hdr,
ecHdr: ECHdr,
msgHdr: CrRPCFriends.MsgHdr,
returnHdr: CrRPCFriends.ReturnHdr
];
ecReturnBufferBytes: CARDINAL ~ SIZE[ECReturnBufferObject]*bytesPerWord;
Abort buffer (without program-specific body).
ECAbortBuffer: TYPE ~ REF ECAbortBufferObject;
ECAbortBufferObject: TYPE ~ MACHINE DEPENDENT RECORD [
ovh: CommBuffer.Overhead,
hdr1: XNSBuf.Hdr,
hdr2: XNSExchangeBuf.Hdr,
ecHdr: ECHdr,
msgHdr: CrRPCFriends.MsgHdr,
abortHdr: CrRPCFriends.AbortHdr
];
ecAbortBufferBytes: CARDINAL ~ SIZE[ECAbortBufferObject]*bytesPerWord;
Bytes of overhead in EC packet = bytes of EC header included in XNS (IDP) body:
ecOverheadBytes: CARDINAL ~ XNSExchangeBuf.hdrBytes + ecHdrBytes + CrRPCFriends.msgHdrBytes;
Initial hIndex values
ecCallInitialHIndex: CARDINAL ~ (ecOverheadBytes + CrRPCFriends.callHdrBytes) / bytesPerWord;
ecRejectInitialHIndex: CARDINAL ~ (ecOverheadBytes + CrRPCFriends.rejectHdrBytes) / bytesPerWord;
ecReturnInitialHIndex: CARDINAL ~ (ecOverheadBytes + CrRPCFriends.returnHdrBytes) / bytesPerWord;
ecAbortInitialHIndex: CARDINAL ~ (ecOverheadBytes + CrRPCFriends.abortHdrBytes) / bytesPerWord;
Call: CrRPC.CallProc
[h: Handle, remotePgm: CARD, remotePgmVersion: CARDINAL, remoteProc: CARDINAL, putArgs: PutArgsProc, getResults: GetResultsProc, getError: GetErrorProc]
~ {
dH: DataHandle ~ NARROW[h.data];
callBuf: XNSBuf.Buffer ← NIL;
CleanUp: PROC ~ {
IF dH.b # NIL THEN { XNSSocket.FreeBuffer[dH.b]; dH.b ← NIL };
IF callBuf # NIL THEN { XNSSocket.FreeBuffer[callBuf]; callBuf ← NIL };
};
FlushReceiveQueue: PROC ~ {
XNSSocket.SetGetTimeout[dH.socket, XNSSocket.dontWait];
DO
b: XNSBuf.Buffer ~ XNSSocket.Get[dH.socket];
IF b = NIL THEN EXIT;
XNSSocket.FreeBuffer[b];
ENDLOOP;
XNSSocket.SetGetTimeout[dH.socket, dH.timeoutMsec];
};
FillCallBuffer: PROC ~ {
cB: ECCallBuffer;
dH.b ← XNSSocket.AllocBuffer[dH.socket];
TRUSTED { cB ← LOOPHOLE[dH.b] };
cB.hdr1.type ← exchange;
{ dH.exchangeID ← dH.exchangeID.SUCC;
cB.hdr2 ← [id~FfC[dH.exchangeID], type~XNSExchangeTypes.clearinghouseServiceType] };
{ v: HWORD ~ HfC[CrRPCFriends.courierVersionNum];
cB.ecHdr ← [lowVersion~v, highVersion~v] };
{ cB.msgHdr ← [msgType~HfC[CrRPCFriends.callMsgType]] };
{ cB.callHdr ← [
tID~HfC[0], pgmNum~FfC[remotePgm], pgmVersion~HfC[remotePgmVersion], procNum~HfC[remoteProc]] };
dH.hIndex ← ecCallInitialHIndex;
dH.hLength ← XNSBuf.maxBodyBytes / bytesPerWord;
putArgs[h];
XNSSocket.SetUserHWords[dH.b, dH.hIndex]; -- Yes, hIndex not hLength!
callBuf ← dH.b; dH.b ← NIL;
};
GetReply: PROC [ignoreErrors: BOOL] RETURNS [b: XNSBuf.Buffer] ~ {
DO
b ← XNSSocket.Get[dH.socket];
IF b = NIL THEN RETURN; -- timeout
SELECT b.hdr1.type FROM
error => IF ignoreErrors
THEN {
XNSSocket.FreeBuffer[b];
LOOP }
ELSE {
XNSSocket.FreeBuffer[b];
LOOP }; -- FIX THIS LATER ???? if ERROR remember to free b! ????
exchange => {
eB: XNSExchangeBuf.Buffer;
TRUSTED { eB ← LOOPHOLE[b] };
CHECK THE CLIENT TYPE HERE AS WELL ????
IF (CfF[eB.hdr2.id] # dH.exchangeID) THEN {
XNSSocket.ReturnError[b, XNSErrorTypes.protocolViolationErr];
LOOP };
RETURN };
ENDCASE => {
XNSSocket.ReturnError[b, XNSErrorTypes.invalidPacketTypeErr];
LOOP };
ENDLOOP;
};
DoBroadcastCall: PROC [net: XNS.Net, rte: XNSRouter.RoutingTableEntry] ~ {
remoteAddress: XNS.Address ← XNSSocket.GetRemoteAddress[dH.socket];
remoteAddress.net ← net;
remoteAddress.host ← XNS.broadcastHost;
XNSSocket.SetRemoteAddress[dH.socket, remoteAddress];
XNSSocketBackdoor.PutCached[callBuf];
DO
dH.b ← GetReply[ignoreErrors~TRUE];
IF dH.b = NIL THEN EXIT;
ProcessResult[ignoreRejects~TRUE];
ENDLOOP;
};
DoDirectedCall: PROC ~ {
retries: CARDINAL ~ 8;
THROUGH [1..retries] DO
XNSSocketBackdoor.FlushCache[dH.socket];
XNSSocketBackdoor.PutCached[callBuf];
dH.b ← GetReply[ignoreErrors~FALSE];
IF dH.b # NIL THEN EXIT;
ENDLOOP;
IF dH.b = NIL THEN
ERROR CrRPC.Error[h, timeout, "no response"];
ProcessResult[ignoreRejects~FALSE];
};
ProcessResult: PROC [ignoreRejects: BOOL] ~ {
mType: CARDINAL;
dH.hLength ← XNSSocket.GetUserHWords[dH.b];
TRUSTED { mType ← CfH[LOOPHOLE[dH.b,ECReturnBuffer].msgHdr.msgType] };
SELECT mType FROM
CrRPCFriends.rejectMsgType => {
rejectReason: CARDINAL;
IF ignoreRejects THEN RETURN;
TRUSTED { rejectReason ← CfH[LOOPHOLE[dH.b, ECRejectBuffer].rejectHdr.rejectReason] };
dH.hIndex ← ecRejectInitialHIndex;
ERROR CrRPC.Error[h,
(SELECT rejectReason FROM
CrRPC.noSuchProgram => rejectedNoSuchProgram,
CrRPC.noSuchVersion => rejectedNoSuchVersion,
CrRPC.noSuchProcedure => rejectedNoSuchProcedure,
CrRPC.invalidArgument => rejectedInvalidArgument,
ENDCASE => rejectedUnspecified),
"rejected"];
};
CrRPCFriends.returnMsgType => {
dH.hIndex ← ecReturnInitialHIndex;
getResults[h];
RETURN };
CrRPCFriends.abortMsgType => {
dH.hIndex ← ecAbortInitialHIndex;
IF getError # NIL THEN TRUSTED {
getError[h, CfH[(LOOPHOLE[dH.b, ECAbortBuffer]).abortHdr.errNum]]; -- will probably raise a pgm-specific ERROR
};
ERROR CrRPC.Error[h, remoteError, "unexpected remote error"];
};
ENDCASE => {
ERROR CrRPC.Error[h, unknown, "protocol error in response"];
};
};
BEGIN
ENABLE UNWIND => CleanUp[];
FlushReceiveQueue[];
FillCallBuffer[];
IF dH.doExpandingBroadcast
THEN XNSRouter.Enumerate[low~dH.lowHops, high~dH.highHops,
proc~DoBroadcastCall]
ELSE DoDirectedCall[];
CleanUp[];
END;
};
PutB: PROC [h: Handle, byte: BYTE] ~ {
dH: DataHandle ~ NARROW[h.data];
IF dH.odd
THEN {
TRUSTED { dH.b.body.bytes[2*dH.hIndex - 1] ← byte };
dH.odd ← FALSE;
}
ELSE {
IF dH.hIndex >= dH.hLength THEN
ERROR CrRPC.Error[h, argsTooLong, "args too long"];
TRUSTED { dH.b.body.bytes[2*dH.hIndex] ← byte };
dH.hIndex ← dH.hIndex.SUCC;
dH.odd ← TRUE;
};
};
PutH: PROC [h: Handle, hWord: HWORD] ~ {
dH: DataHandle ~ NARROW[h.data];
IF dH.hIndex >= dH.hLength THEN
ERROR CrRPC.Error[h, argsTooLong, "args too long"];
TRUSTED { dH.b.body.hWords[dH.hIndex] ← hWord };
dH.hIndex ← dH.hIndex.SUCC;
dH.odd ← FALSE;
};
PutF: PROC [h: Handle, fWord: FWORD] ~ {
dH: DataHandle ~ NARROW[h.data];
IF (dH.hIndex+1) >= dH.hLength THEN
ERROR CrRPC.Error[h, argsTooLong, "args too long"];
TRUSTED {
PrincOpsUtils.LongCopy[
from~@fWord,
to~@dH.b.body.hWords + dH.hIndex,
nwords~2];
};
dH.hIndex ← dH.hIndex + 2;
dH.odd ← FALSE
};
UnsafePutBlock: UNSAFE PROC [h: Handle, block: IO.UnsafeBlock] ~ {
FOR i: INT IN [block.startIndex .. block.startIndex+block.count) DO
TRUSTED { PutB[h, LOOPHOLE[block.base[i]]] };
ENDLOOP;
};
PutBulkDataSource: PROC [h: Handle, descriptor: CrRPC.BulkDataSource] ~ {
ERROR CrRPC.Error[h, notImplemented, "bulk data not implemented"] };
PutBulkDataSink: PROC [h: Handle, descriptor: CrRPC.BulkDataSink] ~ {
ERROR CrRPC.Error[h, notImplemented, "bulk data not implemented"] };
GetB: PROC [h: Handle] RETURNS [byte: BYTE] ~ {
dH: DataHandle ~ NARROW[h.data];
IF dH.odd
THEN {
TRUSTED { byte ← dH.b.body.bytes[2*dH.hIndex - 1] };
dH.odd ← FALSE;
}
ELSE {
IF dH.hIndex >= dH.hLength THEN
ERROR CrRPC.Error[h, resultsTooShort, "results too short"];
TRUSTED { byte ← dH.b.body.bytes[2*dH.hIndex] };
dH.hIndex ← dH.hIndex.SUCC;
dH.odd ← TRUE;
};
};
GetH: PROC [h: Handle] RETURNS [hWord: HWORD] ~ {
dH: DataHandle ~ NARROW[h.data];
IF dH.hIndex >= dH.hLength THEN
ERROR CrRPC.Error[h, resultsTooShort, "results too short"];
TRUSTED { hWord ← dH.b.body.hWords[dH.hIndex] };
dH.hIndex ← dH.hIndex.SUCC;
dH.odd ← FALSE;
};
GetF: PROC [h: Handle] RETURNS [fWord: FWORD] ~ {
dH: DataHandle ~ NARROW[h.data];
IF (dH.hIndex+1) >= dH.hLength THEN
ERROR CrRPC.Error[h, resultsTooShort, "results too short"];
TRUSTED {
PrincOpsUtils.LongCopy[
from~@dH.b.body.hWords + dH.hIndex,
to~@fWord,
nwords~2];
};
dH.hIndex ← dH.hIndex + 2;
dH.odd ← FALSE
};
UnsafeGetBlock: UNSAFE PROC [h: Handle, block: IO.UnsafeBlock]
RETURNS [nBytes: INT ← 0] ~ {
FOR i: INT IN [block.startIndex .. block.startIndex+block.count) DO
TRUSTED { block.base[i] ← LOOPHOLE[GetB[h]] };
ENDLOOP;
};
GetBulkDataSource: PROC [h: Handle] RETURNS [CrRPC.BulkDataSource] ~ {
ERROR CrRPC.Error[h, notImplemented, "bulk data not implemented"] };
GetBulkDataSink: PROC [h: Handle] RETURNS [CrRPC.BulkDataSink] ~ {
ERROR CrRPC.Error[h, notImplemented, "bulk data not implemented"] };
HAlign: PROC [h: Handle] ~ {
dH: DataHandle ~ NARROW[h.data];
dH.odd ← FALSE };
SetRemote: PROC [h: Handle, remote: XNS.Address] RETURNS [Handle] ~ {
BEWARE: if this is changed to return something other than h, Create[...] will have to be changed as well.
dH: DataHandle ~ NARROW[h.data];
IF remote.socket = XNS.unknownSocket
THEN ERROR CrRPC.Error[h, notImplemented, "no well-known socket"];
dH.doExpandingBroadcast ← (remote.net = XNS.broadcastNet);
dH.lowHops ← dH.highHops ← 0;
XNSSocket.SetRemoteAddress[dH.socket, remote];
RETURN [h] };
defaultTimeoutMsec: INT ~ 200;
minTimeoutMsec: INT ~ 50;
SetTimeout: PROC [h: Handle, timeoutMsec: INT] RETURNS [Handle] ~ {
BEWARE: if this is changed to return something other than h, Create[...] will have to be changed as well.
dH: DataHandle ~ NARROW[h.data];
dH.timeoutMsec ← IF timeoutMsec = 0
THEN defaultTimeoutMsec
ELSE MAX[timeoutMsec, minTimeoutMsec];
RETURN [h] };
SetHops: PROC[h: Handle, low, high: NAT] RETURNS [Handle] ~ {
dH: DataHandle ~ NARROW[h.data];
IF NOT dH.doExpandingBroadcast THEN
ERROR CrRPC.Error[h, notImplemented, "not a broadcast handle"];
dH.lowHops ← MIN[low, XNSRouter.maxHops];
dH.highHops ← MIN[high, XNSRouter.maxHops];
RETURN [h] };
DestroyClient: PROC [h: Handle] ~ {
dH: DataHandle ~ NARROW[h.data];
XNSSocket.Destroy[dH.socket];
};
CreateClient: CrRPCFriends.CreateClientHandleProc
[remote: XNS.Address, timeoutMsec: INT] RETURNS[Handle]
~ {
handle: Handle ← NIL;
socket: XNSSocket.Handle ← NIL;
dH: DataHandle ← NIL;
socket ← XNSSocket.Create[sendBuffers~1, recvBuffers~5];
XNSSocket.SetNoErrors[socket];
dH ← NEW[DataObject ← [
socket~socket, timeoutMsec~, exchangeID~BasicTime.GetClockPulses[]
]];
handle ← NEW[CrRPC.Object ← [class~$EXCHANGE, kind~client, marshallProcs~theMarshallProcs, procs~theClientProcs, data~dH]];
IF SetRemote[handle, remote] # handle THEN ERROR;
IF SetTimeout[handle, timeoutMsec] # handle THEN ERROR;
RETURN [handle];
};
CrRPCFriends.RegisterCreateClientHandleProc[$EXCHANGE, CreateClient];
}.