<> <> <> <<>> <> <> <<>> <> <> <> <> DIRECTORY BasicTime USING [GetClockPulses], CommBuffer USING [Overhead], CrRPC USING [BulkDataSink, BulkDataSource, CallProc, ClientProcs, Error, Handle, invalidArgument, MarshallProcs, noSuchProcedure, noSuchProgram, noSuchVersion, Object], CrRPCFriends USING [AbortHdr, abortHdrBytes, abortMsgType, CallHdr, callHdrBytes, callMsgType, courierVersionNum, CreateClientHandleProc, MsgHdr, msgHdrBytes, RegisterCreateClientHandleProc, RejectHdr, rejectHdrBytes, rejectMsgType, ReturnHdr, returnHdrBytes, returnMsgType], Endian USING [BYTE, bytesPerHWord, CardFromF, CardFromH, FFromCard, FWORD, HFromCard, HWORD], IO USING [UnsafeBlock], PrincOpsUtils USING [LongCopy], XNS USING [Address, broadcastHost, broadcastNet, Net, unknownSocket], XNSBuf USING [Buffer, Hdr, maxBodyBytes], XNSErrorTypes USING [invalidPacketTypeErr, protocolViolationErr], XNSExchangeTypes USING [clearinghouseServiceType], XNSExchangeBuf USING [Buffer, Hdr, hdrBytes], XNSRouter USING [Enumerate, Hops, maxHops, RoutingTableEntry], XNSSocket USING [AllocBuffer, Create, Destroy, dontWait, FreeBuffer, Get, GetRemoteAddress, GetUserHWords, Handle, ReturnError, SetGetTimeout, SetNoErrors, SetRemoteAddress, SetUserHWords], XNSSocketBackdoor USING [FlushCache, PutCached]; CrRPCEImpl: CEDAR PROGRAM IMPORTS BasicTime, CrRPC, CrRPCFriends, Endian, PrincOpsUtils, XNSRouter, XNSSocket, XNSSocketBackdoor ~ { CARD: TYPE ~ LONG CARDINAL; BYTE: TYPE ~ Endian.BYTE; FWORD: TYPE ~ Endian.FWORD; HWORD: TYPE ~ Endian.HWORD; bytesPerWord: CARDINAL ~ Endian.bytesPerHWord; FfC: PROC [c: CARD] RETURNS [FWORD] ~ INLINE { RETURN [Endian.FFromCard[c]] }; HfC: PROC [c: CARDINAL] RETURNS [HWORD] ~ INLINE { RETURN [Endian.HFromCard[c]] }; CfF: PROC [f: FWORD] RETURNS [CARD] ~ INLINE { RETURN [Endian.CardFromF[f]] }; CfH: PROC [h: HWORD] RETURNS [CARDINAL] ~ INLINE { RETURN [Endian.CardFromH[h]] }; Handle: TYPE ~ CrRPC.Handle; DataHandle: TYPE ~ REF DataObject; DataObject: TYPE ~ RECORD [ socket: XNSSocket.Handle, timeoutMsec: INT, exchangeID: LONG CARDINAL, doExpandingBroadcast: BOOL _ FALSE, lowHops, highHops: XNSRouter.Hops _ 0, b: XNSBuf.Buffer _ NIL, hIndex: CARDINAL _ 0, -- index in HWORDs of next data, (in XNS (IDP) packet body) hLength: CARDINAL _ 0, -- length in HWORDs of buffer (XNS (IDP) packet body) odd: BOOL _ FALSE -- TRUE on odd bytes, for GetB / putB ]; theMarshallProcs: REF CrRPC.MarshallProcs ~ NEW[ CrRPC.MarshallProcs _ [ putB~PutB, putH~PutH, putF~PutF, unsafePutBlock~UnsafePutBlock, putBulkDataSource~PutBulkDataSource, putBulkDataSink~PutBulkDataSink, putHAlign~HAlign, getB~GetB, getH~GetH, getF~GetF, unsafeGetBlock~UnsafeGetBlock, getBulkDataSource~GetBulkDataSource, getBulkDataSink~GetBulkDataSink, getHAlign~HAlign ] ]; theClientProcs: REF CrRPC.ClientProcs ~ NEW[ CrRPC.ClientProcs _ [ setRemote~SetRemote, setTimeout~SetTimeout, setHops~SetHops, destroy~DestroyClient, call~Call ] ]; <> <
> ECHdr: TYPE ~ MACHINE DEPENDENT RECORD [ lowVersion: HWORD, highVersion: HWORD ]; ecHdrBytes: CARDINAL ~ SIZE[ECHdr]*bytesPerWord; <> ECCallBuffer: TYPE ~ REF ECCallBufferObject; ECCallBufferObject: TYPE ~ MACHINE DEPENDENT RECORD [ ovh: CommBuffer.Overhead, hdr1: XNSBuf.Hdr, hdr2: XNSExchangeBuf.Hdr, ecHdr: ECHdr, msgHdr: CrRPCFriends.MsgHdr, callHdr: CrRPCFriends.CallHdr ]; ecCallBufferBytes: CARDINAL ~ SIZE[ECCallBufferObject]*bytesPerWord; <> ECRejectBuffer: TYPE ~ REF ECRejectBufferObject; ECRejectBufferObject: TYPE ~ MACHINE DEPENDENT RECORD [ ovh: CommBuffer.Overhead, hdr1: XNSBuf.Hdr, hdr2: XNSExchangeBuf.Hdr, ecHdr: ECHdr, msgHdr: CrRPCFriends.MsgHdr, rejectHdr: CrRPCFriends.RejectHdr ]; ecRejectBufferBytes: CARDINAL ~ SIZE[ECRejectBufferObject]*bytesPerWord; <> ECReturnBuffer: TYPE ~ REF ECReturnBufferObject; ECReturnBufferObject: TYPE ~ MACHINE DEPENDENT RECORD [ ovh: CommBuffer.Overhead, hdr1: XNSBuf.Hdr, hdr2: XNSExchangeBuf.Hdr, ecHdr: ECHdr, msgHdr: CrRPCFriends.MsgHdr, returnHdr: CrRPCFriends.ReturnHdr ]; ecReturnBufferBytes: CARDINAL ~ SIZE[ECReturnBufferObject]*bytesPerWord; <> ECAbortBuffer: TYPE ~ REF ECAbortBufferObject; ECAbortBufferObject: TYPE ~ MACHINE DEPENDENT RECORD [ ovh: CommBuffer.Overhead, hdr1: XNSBuf.Hdr, hdr2: XNSExchangeBuf.Hdr, ecHdr: ECHdr, msgHdr: CrRPCFriends.MsgHdr, abortHdr: CrRPCFriends.AbortHdr ]; ecAbortBufferBytes: CARDINAL ~ SIZE[ECAbortBufferObject]*bytesPerWord; <> ecOverheadBytes: CARDINAL ~ XNSExchangeBuf.hdrBytes + ecHdrBytes + CrRPCFriends.msgHdrBytes; <<>> <> ecCallInitialHIndex: CARDINAL ~ (ecOverheadBytes + CrRPCFriends.callHdrBytes) / bytesPerWord; ecRejectInitialHIndex: CARDINAL ~ (ecOverheadBytes + CrRPCFriends.rejectHdrBytes) / bytesPerWord; ecReturnInitialHIndex: CARDINAL ~ (ecOverheadBytes + CrRPCFriends.returnHdrBytes) / bytesPerWord; ecAbortInitialHIndex: CARDINAL ~ (ecOverheadBytes + CrRPCFriends.abortHdrBytes) / bytesPerWord; Call: CrRPC.CallProc <<[h: Handle, remotePgm: CARD, remotePgmVersion: CARDINAL, remoteProc: CARDINAL, putArgs: PutArgsProc, getResults: GetResultsProc, getError: GetErrorProc]>> ~ { dH: DataHandle ~ NARROW[h.data]; callBuf: XNSBuf.Buffer _ NIL; CleanUp: PROC ~ { IF dH.b # NIL THEN { XNSSocket.FreeBuffer[dH.b]; dH.b _ NIL }; IF callBuf # NIL THEN { XNSSocket.FreeBuffer[callBuf]; callBuf _ NIL }; }; FlushReceiveQueue: PROC ~ { XNSSocket.SetGetTimeout[dH.socket, XNSSocket.dontWait]; DO b: XNSBuf.Buffer ~ XNSSocket.Get[dH.socket]; IF b = NIL THEN EXIT; XNSSocket.FreeBuffer[b]; ENDLOOP; XNSSocket.SetGetTimeout[dH.socket, dH.timeoutMsec]; }; FillCallBuffer: PROC ~ { cB: ECCallBuffer; dH.b _ XNSSocket.AllocBuffer[dH.socket]; TRUSTED { cB _ LOOPHOLE[dH.b] }; cB.hdr1.type _ exchange; { dH.exchangeID _ dH.exchangeID.SUCC; cB.hdr2 _ [id~FfC[dH.exchangeID], type~XNSExchangeTypes.clearinghouseServiceType] }; { v: HWORD ~ HfC[CrRPCFriends.courierVersionNum]; cB.ecHdr _ [lowVersion~v, highVersion~v] }; { cB.msgHdr _ [msgType~HfC[CrRPCFriends.callMsgType]] }; { cB.callHdr _ [ tID~HfC[0], pgmNum~FfC[remotePgm], pgmVersion~HfC[remotePgmVersion], procNum~HfC[remoteProc]] }; dH.hIndex _ ecCallInitialHIndex; dH.hLength _ XNSBuf.maxBodyBytes / bytesPerWord; putArgs[h]; XNSSocket.SetUserHWords[dH.b, dH.hIndex]; -- Yes, hIndex not hLength! callBuf _ dH.b; dH.b _ NIL; }; GetReply: PROC [ignoreErrors: BOOL] RETURNS [b: XNSBuf.Buffer] ~ { DO b _ XNSSocket.Get[dH.socket]; IF b = NIL THEN RETURN; -- timeout SELECT b.hdr1.type FROM error => IF ignoreErrors THEN { XNSSocket.FreeBuffer[b]; LOOP } ELSE { XNSSocket.FreeBuffer[b]; LOOP }; -- FIX THIS LATER ???? if ERROR remember to free b! ???? exchange => { eB: XNSExchangeBuf.Buffer; TRUSTED { eB _ LOOPHOLE[b] }; <> IF (CfF[eB.hdr2.id] # dH.exchangeID) THEN { XNSSocket.ReturnError[b, XNSErrorTypes.protocolViolationErr]; LOOP }; RETURN }; ENDCASE => { XNSSocket.ReturnError[b, XNSErrorTypes.invalidPacketTypeErr]; LOOP }; ENDLOOP; }; DoBroadcastCall: PROC [net: XNS.Net, rte: XNSRouter.RoutingTableEntry] ~ { remoteAddress: XNS.Address _ XNSSocket.GetRemoteAddress[dH.socket]; remoteAddress.net _ net; remoteAddress.host _ XNS.broadcastHost; XNSSocket.SetRemoteAddress[dH.socket, remoteAddress]; XNSSocketBackdoor.PutCached[callBuf]; DO dH.b _ GetReply[ignoreErrors~TRUE]; IF dH.b = NIL THEN EXIT; ProcessResult[ignoreRejects~TRUE]; ENDLOOP; }; DoDirectedCall: PROC ~ { retries: CARDINAL ~ 8; THROUGH [1..retries] DO XNSSocketBackdoor.FlushCache[dH.socket]; XNSSocketBackdoor.PutCached[callBuf]; dH.b _ GetReply[ignoreErrors~FALSE]; IF dH.b # NIL THEN EXIT; ENDLOOP; IF dH.b = NIL THEN ERROR CrRPC.Error[h, timeout, "no response"]; ProcessResult[ignoreRejects~FALSE]; }; ProcessResult: PROC [ignoreRejects: BOOL] ~ { mType: CARDINAL; dH.hLength _ XNSSocket.GetUserHWords[dH.b]; TRUSTED { mType _ CfH[LOOPHOLE[dH.b,ECReturnBuffer].msgHdr.msgType] }; SELECT mType FROM CrRPCFriends.rejectMsgType => { rejectReason: CARDINAL; IF ignoreRejects THEN RETURN; TRUSTED { rejectReason _ CfH[LOOPHOLE[dH.b, ECRejectBuffer].rejectHdr.rejectReason] }; dH.hIndex _ ecRejectInitialHIndex; ERROR CrRPC.Error[h, (SELECT rejectReason FROM CrRPC.noSuchProgram => rejectedNoSuchProgram, CrRPC.noSuchVersion => rejectedNoSuchVersion, CrRPC.noSuchProcedure => rejectedNoSuchProcedure, CrRPC.invalidArgument => rejectedInvalidArgument, ENDCASE => rejectedUnspecified), "rejected"]; }; CrRPCFriends.returnMsgType => { dH.hIndex _ ecReturnInitialHIndex; getResults[h]; RETURN }; CrRPCFriends.abortMsgType => { dH.hIndex _ ecAbortInitialHIndex; IF getError # NIL THEN TRUSTED { getError[h, CfH[(LOOPHOLE[dH.b, ECAbortBuffer]).abortHdr.errNum]]; -- will probably raise a pgm-specific ERROR }; ERROR CrRPC.Error[h, remoteError, "unexpected remote error"]; }; ENDCASE => { ERROR CrRPC.Error[h, unknown, "protocol error in response"]; }; }; BEGIN ENABLE UNWIND => CleanUp[]; FlushReceiveQueue[]; FillCallBuffer[]; IF dH.doExpandingBroadcast THEN XNSRouter.Enumerate[low~dH.lowHops, high~dH.highHops, proc~DoBroadcastCall] ELSE DoDirectedCall[]; CleanUp[]; END; }; PutB: PROC [h: Handle, byte: BYTE] ~ { dH: DataHandle ~ NARROW[h.data]; IF dH.odd THEN { TRUSTED { dH.b.body.bytes[2*dH.hIndex - 1] _ byte }; dH.odd _ FALSE; } ELSE { IF dH.hIndex >= dH.hLength THEN ERROR CrRPC.Error[h, argsTooLong, "args too long"]; TRUSTED { dH.b.body.bytes[2*dH.hIndex] _ byte }; dH.hIndex _ dH.hIndex.SUCC; dH.odd _ TRUE; }; }; PutH: PROC [h: Handle, hWord: HWORD] ~ { dH: DataHandle ~ NARROW[h.data]; IF dH.hIndex >= dH.hLength THEN ERROR CrRPC.Error[h, argsTooLong, "args too long"]; TRUSTED { dH.b.body.hWords[dH.hIndex] _ hWord }; dH.hIndex _ dH.hIndex.SUCC; dH.odd _ FALSE; }; PutF: PROC [h: Handle, fWord: FWORD] ~ { dH: DataHandle ~ NARROW[h.data]; IF (dH.hIndex+1) >= dH.hLength THEN ERROR CrRPC.Error[h, argsTooLong, "args too long"]; TRUSTED { PrincOpsUtils.LongCopy[ from~@fWord, to~@dH.b.body.hWords + dH.hIndex, nwords~2]; }; dH.hIndex _ dH.hIndex + 2; dH.odd _ FALSE }; UnsafePutBlock: UNSAFE PROC [h: Handle, block: IO.UnsafeBlock] ~ { FOR i: INT IN [block.startIndex .. block.startIndex+block.count) DO TRUSTED { PutB[h, LOOPHOLE[block.base[i]]] }; ENDLOOP; }; PutBulkDataSource: PROC [h: Handle, descriptor: CrRPC.BulkDataSource] ~ { ERROR CrRPC.Error[h, notImplemented, "bulk data not implemented"] }; PutBulkDataSink: PROC [h: Handle, descriptor: CrRPC.BulkDataSink] ~ { ERROR CrRPC.Error[h, notImplemented, "bulk data not implemented"] }; GetB: PROC [h: Handle] RETURNS [byte: BYTE] ~ { dH: DataHandle ~ NARROW[h.data]; IF dH.odd THEN { TRUSTED { byte _ dH.b.body.bytes[2*dH.hIndex - 1] }; dH.odd _ FALSE; } ELSE { IF dH.hIndex >= dH.hLength THEN ERROR CrRPC.Error[h, resultsTooShort, "results too short"]; TRUSTED { byte _ dH.b.body.bytes[2*dH.hIndex] }; dH.hIndex _ dH.hIndex.SUCC; dH.odd _ TRUE; }; }; GetH: PROC [h: Handle] RETURNS [hWord: HWORD] ~ { dH: DataHandle ~ NARROW[h.data]; IF dH.hIndex >= dH.hLength THEN ERROR CrRPC.Error[h, resultsTooShort, "results too short"]; TRUSTED { hWord _ dH.b.body.hWords[dH.hIndex] }; dH.hIndex _ dH.hIndex.SUCC; dH.odd _ FALSE; }; GetF: PROC [h: Handle] RETURNS [fWord: FWORD] ~ { dH: DataHandle ~ NARROW[h.data]; IF (dH.hIndex+1) >= dH.hLength THEN ERROR CrRPC.Error[h, resultsTooShort, "results too short"]; TRUSTED { PrincOpsUtils.LongCopy[ from~@dH.b.body.hWords + dH.hIndex, to~@fWord, nwords~2]; }; dH.hIndex _ dH.hIndex + 2; dH.odd _ FALSE }; UnsafeGetBlock: UNSAFE PROC [h: Handle, block: IO.UnsafeBlock] RETURNS [nBytes: INT _ 0] ~ { FOR i: INT IN [block.startIndex .. block.startIndex+block.count) DO TRUSTED { block.base[i] _ LOOPHOLE[GetB[h]] }; ENDLOOP; }; GetBulkDataSource: PROC [h: Handle] RETURNS [CrRPC.BulkDataSource] ~ { ERROR CrRPC.Error[h, notImplemented, "bulk data not implemented"] }; GetBulkDataSink: PROC [h: Handle] RETURNS [CrRPC.BulkDataSink] ~ { ERROR CrRPC.Error[h, notImplemented, "bulk data not implemented"] }; HAlign: PROC [h: Handle] ~ { dH: DataHandle ~ NARROW[h.data]; dH.odd _ FALSE }; SetRemote: PROC [h: Handle, remote: XNS.Address] RETURNS [Handle] ~ { <> dH: DataHandle ~ NARROW[h.data]; IF remote.socket = XNS.unknownSocket THEN ERROR CrRPC.Error[h, notImplemented, "no well-known socket"]; dH.doExpandingBroadcast _ (remote.net = XNS.broadcastNet); dH.lowHops _ dH.highHops _ 0; XNSSocket.SetRemoteAddress[dH.socket, remote]; RETURN [h] }; defaultTimeoutMsec: INT ~ 200; minTimeoutMsec: INT ~ 50; SetTimeout: PROC [h: Handle, timeoutMsec: INT] RETURNS [Handle] ~ { <> dH: DataHandle ~ NARROW[h.data]; dH.timeoutMsec _ IF timeoutMsec = 0 THEN defaultTimeoutMsec ELSE MAX[timeoutMsec, minTimeoutMsec]; RETURN [h] }; SetHops: PROC[h: Handle, low, high: NAT] RETURNS [Handle] ~ { dH: DataHandle ~ NARROW[h.data]; IF NOT dH.doExpandingBroadcast THEN ERROR CrRPC.Error[h, notImplemented, "not a broadcast handle"]; dH.lowHops _ MIN[low, XNSRouter.maxHops]; dH.highHops _ MIN[high, XNSRouter.maxHops]; RETURN [h] }; DestroyClient: PROC [h: Handle] ~ { dH: DataHandle ~ NARROW[h.data]; XNSSocket.Destroy[dH.socket]; }; CreateClient: CrRPCFriends.CreateClientHandleProc <<[remote: XNS.Address, timeoutMsec: INT] RETURNS[Handle]>> ~ { handle: Handle _ NIL; socket: XNSSocket.Handle _ NIL; dH: DataHandle _ NIL; socket _ XNSSocket.Create[sendBuffers~1, recvBuffers~5]; XNSSocket.SetNoErrors[socket]; dH _ NEW[DataObject _ [ socket~socket, timeoutMsec~, exchangeID~BasicTime.GetClockPulses[] ]]; handle _ NEW[CrRPC.Object _ [class~$EXCHANGE, kind~client, marshallProcs~theMarshallProcs, procs~theClientProcs, data~dH]]; IF SetRemote[handle, remote] # handle THEN ERROR; IF SetTimeout[handle, timeoutMsec] # handle THEN ERROR; RETURN [handle]; }; <<>> CrRPCFriends.RegisterCreateClientHandleProc[$EXCHANGE, CreateClient]; }.