<> <> <> <<>> <> <<>> DIRECTORY Basics USING [Card32FromF, Card16FromH, FFromCard32, FWORD, HFromCard16, HWORD], BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses], CrRPC USING [BeginErrorProc, BeginRejectProc, BeginReturnProc, BulkDataCheckAbortProc, BulkDataSink, BulkDataSource, BulkDataValueProc, BulkDataXferProc, CallContinuationProc, CallProc, ContinuationProc, Error, ErrorReason, GetBulkDataXferProcProc, GetCard16, GetHWord, Handle, invalidArgument, noSuchProcedure, noSuchProgram, noSuchVersion, Object, ProcsObject, PutBulkDataXferProcProc, PutCard16, ReadBulkDataStreamProc, ServerProc, SetContinuationProc, SetParametersProc, WriteBulkDataSegmentProc], CrRPCBackdoor USING [AbortHdr, AbortMsgHdr, abortMsgHdrBytes, abortMsgType, BulkDataDescriptorType, CallHdr, CallMsgHdr, callMsgHdrBytes, callMsgType, courierVersionNum, CreateClientHandleProc, CreateListenerProc, LookUpServerProc, MsgHdr, --msgHdrBytes,-- NewClientObject, RegisterCreateClientHandleProc, RegisterCreateListenerProc, RejectHdr, RejectMsgHdr, rejectMsgHdrBytes, rejectMsgType, RenewClientObject, ReturnHdr, ReturnMsgHdr, returnMsgHdrBytes, returnMsgType, SessionHdr, sessionHdrBytes], IO USING [CharsAvail, Close, EndOfStream, Error, GetBlock, PutBlock, STREAM, UnsafeBlock, UnsafeGetBlock, UnsafePutBlock], Process USING [Detach, Pause, SecondsToTicks, Ticks], RefText USING [ObtainScratch, ReleaseScratch], Rope USING [ROPE], RuntimeError USING [BoundsFault], XNS USING [Address, Socket, unknownSocket], XNSSPPTypes USING [bulkDataSST, defaultSST, endReplySST, endSST, SubSequenceType], XNSStream USING [ConnectionClosed, Create, CreateListener, FlushInput, GetStatus, Listener, ListenerProc, SendAttention, SendClose, SendCloseReply, SendEndOfMessage, SendNow, SetSSType, State, SubSequenceType, Timeout, waitForever], XNSWKS USING [courier]; CrRPCSPPImpl: CEDAR MONITOR IMPORTS Basics, BasicTime, CrRPC, CrRPCBackdoor, IO, Process, RefText, RuntimeError, XNSStream ~ { <> FWORD: TYPE ~ Basics.FWORD; HWORD: TYPE ~ Basics.HWORD; STREAM: TYPE ~ IO.STREAM; FfC: PROC [c: CARD32] RETURNS [FWORD] ~ INLINE { RETURN [Basics.FFromCard32[c]] }; HfC: PROC [c: CARD16] RETURNS [HWORD] ~ INLINE { RETURN [Basics.HFromCard16[c]] }; CfF: PROC [f: FWORD] RETURNS [CARD32] ~ INLINE { RETURN [Basics.Card32FromF[f]] }; CfH: PROC [h: HWORD] RETURNS [CARD16] ~ INLINE { RETURN [Basics.Card16FromH[h]] }; Handle: TYPE ~ CrRPC.Handle; <> DataHandle: TYPE ~ REF DataObject; DataObject: TYPE ~ RECORD [ next: DataHandle _ NIL, handle: Handle _ NIL, lastUsed: BasicTime.Pulses, stream: IO.STREAM _ NIL, inputFlushNeeded: BOOL _ FALSE, remote: XNS.Address, sendBulkData: CrRPC.BulkDataXferProc _ NIL, -- for client handles recvBulkData: CrRPC.BulkDataXferProc _ NIL, -- for client handles continuation: CrRPC.ContinuationProc _ NIL, -- for server handles continuationClientData: REF -- for server handles ]; theClientProcs: REF CrRPC.ProcsObject ~ NEW[ CrRPC.ProcsObject _ [ destroy~CheckInToCache, setParameters~SetParameters, call~Call, getBulkDataSource~ErrorGetBulkDataSource, getBulkDataSink~ErrorGetBulkDataSink, putBulkDataSource~PutBulkDataSource, putBulkDataSink~PutBulkDataSink, readBulkDataStream~ReadBulkDataStream, writeBulkDataSegment~WriteBulkDataSegment, callContinuation~CallContinuation, setContinuation~ErrorSetContinuation, finalize~Finalize ] ]; theServerProcs: REF CrRPC.ProcsObject ~ NEW[ CrRPC.ProcsObject _ [ destroy~CheckInToCache, setParameters~SetParameters, call~ErrorCall, getBulkDataSource~GetBulkDataSource, getBulkDataSink~GetBulkDataSink, putBulkDataSource~ErrorPutBulkDataSource, putBulkDataSink~ErrorPutBulkDataSink, readBulkDataStream~ReadBulkDataStream, writeBulkDataSegment~WriteBulkDataSegment, callContinuation~ErrorCallContinuation, setContinuation~SetContinuation, finalize~ErrorFinalize ] ]; <> <> <<$GetRemote returns a REF to the XNS.Address of the remote site associated with this handle. (Works for either client or server handles).>> SetParameters: CrRPC.SetParametersProc <<[h: Handle, op: ATOM, argument: REF] RETURNS [hNew: Handle, result: REF]>> ~ { dH: DataHandle ~ NARROW[h.data]; hNew _ h; SELECT op FROM $GetRemote => result _ NEW[XNS.Address _ dH.remote]; ENDCASE => ERROR CrRPC.Error[h, unknownOperation, NIL]; }; <> <> <> <> <> ErrorCall: CrRPC.CallProc ~ { ERROR CrRPC.Error[h, notClientHandle, "call using server handle"] }; Call: CrRPC.CallProc <<[h: Handle, remotePgm: CARD32, remotePgmVersion: CARD16, remoteProc: CARD16, putArgs: PutArgsProc, getResults: GetResultsProc, getError: GetErrorProc]>> ~ { dH: DataHandle ~ NARROW[h.data]; ConnectToServer: PROC ~ { <> <> <> sessionHdrOut: CrRPCBackdoor.SessionHdr _ [ lowVersion~HfC[CrRPCBackdoor.courierVersionNum - 1], highVersion~HfC[CrRPCBackdoor.courierVersionNum]]; sessionHdrIn: CrRPCBackdoor.SessionHdr; nRead: NAT; IF dH.stream # NIL THEN ERROR; dH.stream _ XNSStream.Create[remote~dH.remote, getTimeout~XNSStream.waitForever, putTimeout~XNSStream.waitForever]; TRUSTED { IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@sessionHdrOut]], startIndex~0, count~CrRPCBackdoor.sessionHdrBytes]] }; XNSStream.SendNow[dH.stream]; TRUSTED { nRead _ IO.UnsafeGetBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@sessionHdrIn]], startIndex~0, count~CrRPCBackdoor.sessionHdrBytes]] }; dH.inputFlushNeeded _ FALSE; -- ???? Is there EOM on version number pair ???? IF (nRead # CrRPCBackdoor.sessionHdrBytes) OR (CfH[sessionHdrIn.lowVersion] > CrRPCBackdoor.courierVersionNum) OR (CfH[sessionHdrIn.highVersion] < CrRPCBackdoor.courierVersionNum) THEN { SendCloseStream[dH]; ERROR CrRPC.Error[h, courierVersionMismatch, "Courier version error"]; }; }; SendCallMsg: PROC ~ { <> <> callMsgHdr: CrRPCBackdoor.CallMsgHdr _ [ msgHdr~[msgType~CrRPCBackdoor.callMsgType], callHdr~[ tID~[0, 0], pgmNum~FfC[remotePgm], pgmVersion~HfC[remotePgmVersion], procNum~HfC[remoteProc] ] ]; TRUSTED { IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@callMsgHdr]], startIndex~0, count~CrRPCBackdoor.callMsgHdrBytes]] }; IF putArgs # NIL THEN putArgs[h, dH.stream]; XNSStream.SendEndOfMessage[dH.stream]; }; ReadResultMsg: PROC ~ { <> msgType: HWORD _ CrRPC.GetHWord[dH.stream]; SELECT msgType FROM CrRPCBackdoor.rejectMsgType => { rejectReason: CARDINAL; [] _ CrRPC.GetHWord[dH.stream]; rejectReason _ CrRPC.GetCard16[dH.stream]; [] _ AdvanceInput[h~h, wait~FALSE]; ERROR CrRPC.Error[h, (SELECT rejectReason FROM CrRPC.noSuchProgram => rejectedNoSuchProgram, CrRPC.noSuchVersion => rejectedNoSuchVersion, CrRPC.noSuchProcedure => rejectedNoSuchProcedure, CrRPC.invalidArgument => rejectedInvalidArgument, ENDCASE => rejectedUnspecified), "rejected"]; }; CrRPCBackdoor.returnMsgType => { [] _ CrRPC.GetHWord[dH.stream]; IF getResults # NIL THEN getResults[h, dH.stream]; [] _ AdvanceInput[h~h, wait~FALSE]; }; CrRPCBackdoor.abortMsgType => { errNum: CARDINAL; [] _ CrRPC.GetHWord[dH.stream]; errNum _ CrRPC.GetCard16[dH.stream]; IF getError # NIL THEN getError[h, dH.stream, errNum]; [] _ AdvanceInput[h~h, wait~FALSE]; ERROR CrRPC.Error[h, remoteError, "unexpected remote error"] }; ENDCASE => { SendCloseStream[dH]; ERROR CrRPC.Error[h, protocolError, "bad msgType in response"] }; }; ClientRdySysRecv: PROC ~ { <> <> <> state: XNSStream.State; sst: XNSStream.SubSequenceType; DO [state, sst] _ AdvanceInput[h~h, wait~TRUE]; IF state = open THEN EXIT; [] _ XNSStream.GetStatus[self~dH.stream, reset~TRUE]; ENDLOOP; IF sst # 0 THEN { SmashStream[dH]; ERROR CrRPC.Error[h, protocolError, "sys sst # 0"] }; dH.inputFlushNeeded _ TRUE; }; ClientRdyBDTSend: PROC RETURNS [ok: BOOL] ~ { <> <> state: XNSStream.State; sst: XNSStream.SubSequenceType; [state, sst] _ AdvanceInput[h~h, wait~FALSE]; SELECT state FROM open => { ok _ (IO.CharsAvail[self~dH.stream, wait~FALSE] = 0) }; attention => { ok _ TRUE }; ENDCASE => ERROR; }; ClientRdyBDTRecv: PROC RETURNS [ok: BOOL] ~ { <> <> state: XNSStream.State; sst: XNSStream.SubSequenceType; [state, sst] _ AdvanceInput[h~h, wait~TRUE]; SELECT state FROM open, endOfMessage => { ok _ (sst # 0); dH.inputFlushNeeded _ ok }; attention => { ok _ TRUE }; ENDCASE => ERROR; }; CheckAbortRecv: CrRPC.BulkDataCheckAbortProc ~ { [] _ IO.CharsAvail[self~dH.stream, wait~TRUE]; RETURN [ XNSStream.GetStatus[self~dH.stream, reset~FALSE].state = attention ] }; CheckAbortSend: CrRPC.BulkDataCheckAbortProc ~ { RETURN[ (XNSStream.GetStatus[self~dH.stream, reset~FALSE].state # open) OR (IO.CharsAvail[self~dH.stream, wait~FALSE] > 0) ] }; <> BEGIN dH.sendBulkData _ NIL; dH.recvBulkData _ NIL; BEGIN IF dH.stream # NIL THEN { ENABLE XNSStream.ConnectionClosed, CrRPC.Error, IO.Error => { SmashStream[dH]; CONTINUE }; [] _ AdvanceInput[h~h, wait~FALSE]; -- check for close SendCallMsg[]; GOTO Done }; { -- dH.stream = NIL -- ENABLE XNSStream.ConnectionClosed, IO.Error => { SmashStream[dH]; ERROR CrRPC.Error[h, communicationFailure, "communication failure"] }; ConnectToServer[]; SendCallMsg[]; GOTO Done }; EXITS Done => NULL; END; <> BEGIN ENABLE { XNSStream.ConnectionClosed, IO.Error => { SmashStream[dH]; ERROR CrRPC.Error[h, communicationFailure, "communication failure"] }; }; SELECT TRUE FROM (dH.sendBulkData # NIL) => { <> IF ClientRdyBDTSend[].ok THEN { XNSStream.SetSSType[dH.stream, XNSSPPTypes.bulkDataSST]; IF dH.sendBulkData[h, dH.stream, CheckAbortSend].abort THEN XNSStream.SendAttention[dH.stream, 1] -- ???? Define 1 ???? ELSE XNSStream.SendEndOfMessage[dH.stream]; XNSStream.SetSSType[dH.stream, XNSSPPTypes.defaultSST]; }; }; (dH.recvBulkData # NIL) => { <> ENABLE { IO.EndOfStream => { SendCloseStream[dH]; ERROR CrRPC.Error[h, bulkDataError, "end of stream"] }; RuntimeError.BoundsFault => { SendCloseStream[dH]; ERROR CrRPC.Error[h, bulkDataError, "bulk data value bounds fault"] }; }; IF ClientRdyBDTRecv[].ok THEN { <> IF dH.recvBulkData[h, dH.stream, CheckAbortRecv].abort THEN XNSStream.SendAttention[dH.stream, 1]; -- ???? Define 1 ???? }; }; ENDCASE => { <> NULL; }; { ENABLE { IO.EndOfStream => { SendCloseStream[dH]; ERROR CrRPC.Error[h, resultsError, "end of stream"] }; RuntimeError.BoundsFault => { SendCloseStream[dH]; ERROR CrRPC.Error[h, resultsError, "result value bounds fault"] }; }; ClientRdySysRecv[]; ReadResultMsg[]; <<[] _ AdvanceInput[h~h, wait~FALSE]; -- done by ReadResultMsg>> }; END; END; }; -- of Call <> <> <> ServerRdySysRecv: PROC [h: Handle] ~ { <> <> dH: DataHandle ~ NARROW[h.data]; state: XNSStream.State; sst: XNSStream.SubSequenceType; DO [state, sst] _ AdvanceInput[h~h, wait~TRUE]; IF state = open THEN EXIT; [] _ XNSStream.GetStatus[self~dH.stream, reset~TRUE]; ENDLOOP; IF sst # 0 THEN ERROR CrRPC.Error[h, protocolError, "sys sst # 0"]; dH.inputFlushNeeded _ TRUE; }; ServerRdyBDTRecv: PROC [h: Handle] ~ { <> <> dH: DataHandle ~ NARROW[h.data]; state: XNSStream.State; sst: XNSStream.SubSequenceType; [state, sst] _ AdvanceInput[h~h, wait~TRUE]; SELECT state FROM open, endOfMessage => { IF sst = 0 THEN ERROR CrRPC.Error[h, protocolError, "missing bulk data"]; dH.inputFlushNeeded _ TRUE }; attention => { NULL }; ENDCASE => ERROR; }; ServerRdyBDTSend: PROC [h: Handle] ~ { <> <> dH: DataHandle ~ NARROW[h.data]; state: XNSStream.State; sst: XNSStream.SubSequenceType; [state, sst] _ AdvanceInput[h~h, wait~FALSE]; SELECT state FROM open => { IF IO.CharsAvail[self~dH.stream, wait~FALSE] # 0 THEN ERROR CrRPC.Error[h, protocolError, "client send to bdt source"]; }; attention => { NULL }; ENDCASE => ERROR; }; BeginReturn: CrRPC.BeginReturnProc <<[h: Handle]>> ~ { <> dH: DataHandle ~ NARROW[h.data]; returnMsgHdr: CrRPCBackdoor.ReturnMsgHdr _ [ msgHdr~[msgType~CrRPCBackdoor.returnMsgType], returnHdr~[tID~[hi~0, lo~0]] ]; TRUSTED { IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@returnMsgHdr]], startIndex~0, count~CrRPCBackdoor.returnMsgHdrBytes]] }; }; BeginError: CrRPC.BeginErrorProc <<[h: Handle, errNum: CARDINAL]>> ~{ <> dH: DataHandle ~ NARROW[h.data]; abortMsgHdr: CrRPCBackdoor.AbortMsgHdr _ [ msgHdr~[msgType~CrRPCBackdoor.abortMsgType], abortHdr~[tID~[0,0], errNum~HfC[errNum]] ]; TRUSTED { IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@abortMsgHdr]], startIndex~0, count~CrRPCBackdoor.abortMsgHdrBytes]] }; }; BeginReject: CrRPC.BeginRejectProc <<[h: Handle, rejectReason: CARDINAL]>> ~ { <> dH: DataHandle ~ NARROW[h.data]; rejectMsgHdr: CrRPCBackdoor.RejectMsgHdr _ [ msgHdr~[msgType~CrRPCBackdoor.rejectMsgType], rejectHdr~[tID~[0,0], rejectReason~HfC[rejectReason]] ]; TRUSTED { IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@rejectMsgHdr]], startIndex~0, count~CrRPCBackdoor.rejectMsgHdrBytes]] }; }; ListenerProc: XNSStream.ListenerProc <<[stream: IO.STREAM, remote: XNS.Address]>> ~ { sessionHdr: CrRPCBackdoor.SessionHdr; callMsgHdr: CrRPCBackdoor.CallMsgHdr; desiredPgm: CARD32; desiredPgmVersion: CARD16; desiredProc: CARD16; serverProc: CrRPC.ServerProc; nBytes: INT; h: Handle; dH: DataHandle; BEGIN ENABLE { XNSStream.ConnectionClosed => GOTO Out; CrRPC.Error, IO.Error, XNSStream.Timeout, RuntimeError.BoundsFault => GOTO Close; }; <> TRUSTED { nBytes _ IO.UnsafeGetBlock[stream, [base~LOOPHOLE[LONG[@sessionHdr]], count~CrRPCBackdoor.sessionHdrBytes]]}; IF nBytes # CrRPCBackdoor.sessionHdrBytes THEN GOTO Close; IF (CfH[sessionHdr.lowVersion] > CrRPCBackdoor.courierVersionNum) OR (CfH[sessionHdr.highVersion] < CrRPCBackdoor.courierVersionNum) THEN { GOTO Close }; sessionHdr.lowVersion _ sessionHdr.highVersion _ HfC[CrRPCBackdoor.courierVersionNum]; TRUSTED { IO.UnsafePutBlock[stream, [base~LOOPHOLE[LONG[@sessionHdr]], count~CrRPCBackdoor.sessionHdrBytes]] }; XNSStream.SendNow[stream]; <> dH _ NEW[DataObject _ [lastUsed~, stream~stream, remote~remote]]; h _ NEW[CrRPC.Object _ [class~$SPP, kind~server, procs~theServerProcs, data~dH, clientData~NIL]]; <> DO <> ServerRdySysRecv[h]; TRUSTED { nBytes _ IO.UnsafeGetBlock[stream, [base~LOOPHOLE[LONG[@callMsgHdr]], startIndex~0, count~CrRPCBackdoor.callMsgHdrBytes]]}; IF (nBytes # CrRPCBackdoor.callMsgHdrBytes) OR (callMsgHdr.msgHdr.msgType # CrRPCBackdoor.callMsgType) THEN { GOTO Close }; desiredPgm _ CfF[callMsgHdr.callHdr.pgmNum]; desiredPgmVersion _ CfH[callMsgHdr.callHdr.pgmVersion]; desiredProc _ CfH[callMsgHdr.callHdr.procNum]; serverProc _ CrRPCBackdoor.LookUpServerProc[desiredPgm, desiredPgmVersion]; SELECT serverProc FROM NIL => -- Don't have server, reject the call -- { rejectMsgHdr: CrRPCBackdoor.RejectMsgHdr _ [ msgHdr~[msgType~CrRPCBackdoor.rejectMsgType], rejectHdr~[tID~[0,0], rejectReason~HfC[CrRPC.noSuchProgram]] ]; TRUSTED { IO.UnsafePutBlock[stream, [base~LOOPHOLE[LONG[@rejectMsgHdr]], startIndex~0, count~CrRPCBackdoor.rejectMsgHdrBytes]] }; XNSStream.SendEndOfMessage[stream] }; ENDCASE => -- Serve the call -- { serverProc[h, stream, desiredPgm, desiredPgmVersion, desiredProc, BeginReturn, BeginError, BeginReject]; IF dH.stream = NIL THEN GOTO Smashed; -- client caught error XNSStream.SendEndOfMessage[stream]; IF dH.continuation # NIL THEN { newStream: IO.STREAM _ dH.continuation[dH.stream, dH.continuationClientData]; IF newStream = NIL THEN GOTO Closed; IF newStream # dH.stream THEN GOTO Smashed; }; }; ENDLOOP; END; EXITS Close => { [] _ XNSStream.SendClose[stream ! XNSStream.ConnectionClosed => CONTINUE]; IO.Close[self~stream, abort~TRUE]; }; Out => { IO.Close[self~stream, abort~TRUE]; }; Closed, Smashed => NULL; }; ListenerValue: TYPE ~ REF ListenerValueObject; ListenerValueObject: TYPE ~ RECORD [ next: ListenerValue, socket: XNS.Socket, listener: XNSStream.Listener ]; listeners: ListenerValue _ NIL; CreateListener: ENTRY CrRPCBackdoor.CreateListenerProc <<[local: CrRPC.RefAddress]>> ~ { newListener: XNSStream.Listener; socket: XNS.Socket _ XNS.unknownSocket; IF local # NIL THEN WITH local SELECT FROM rA: REF XNS.Address => socket _ rA.socket; rS: REF XNS.Socket => socket _ rS^; ENDCASE => ERROR CrRPC.Error[NIL, addressInappropriateForClass, NIL]; IF socket = XNS.unknownSocket THEN socket _ XNSWKS.courier; FOR p: ListenerValue _ listeners, p.next WHILE p # NIL DO IF p.socket = socket THEN RETURN; ENDLOOP; newListener _ XNSStream.CreateListener[socket~socket, worker~ListenerProc, getTimeout~XNSStream.waitForever, putTimeout~XNSStream.waitForever]; listeners _ NEW[ListenerValueObject _ [next~listeners, socket~socket, listener~newListener]]; }; <> AdvanceInput: PROC [h: Handle, wait: BOOL _ FALSE] RETURNS [state: XNSStream.State, ssType: XNSStream.SubSequenceType] ~ { <> <> dH: DataHandle ~ NARROW[h.data]; sawBreak: BOOL _ FALSE; IF dH.inputFlushNeeded THEN { [] _ XNSStream.FlushInput[self~dH.stream, wait~TRUE]; dH.inputFlushNeeded _ FALSE }; DO [state~state, ssType~ssType] _ XNSStream.GetStatus[self~dH.stream, reset~FALSE]; IF (ssType = XNSSPPTypes.endSST) OR (ssType = XNSSPPTypes.endReplySST) THEN { ReplyCloseStream[dH]; ERROR CrRPC.Error[h, protocolError, "remote close"] }; SELECT state FROM open => { IF NOT wait THEN RETURN; IF IO.CharsAvail[self~dH.stream, wait~TRUE] > 0 THEN RETURN; LOOP }; attention => RETURN; endOfMessage => { IF sawBreak THEN RETURN; sawBreak _ TRUE }; ssTypeChange => { sawBreak _ TRUE }; ENDCASE => ERROR; [] _ XNSStream.GetStatus[self~dH.stream, reset~TRUE]; ENDLOOP; }; SmashStream: PROC [dH: DataHandle] ~ { <> IF dH.stream # NIL THEN { IO.Close[self~dH.stream, abort~TRUE]; dH.stream _ NIL }; }; SendCloseStream: PROC [dH: DataHandle] ~ { <> IF dH.stream # NIL THEN { [] _ XNSStream.SendClose[dH.stream ! XNSStream.ConnectionClosed => CONTINUE]; IO.Close[self~dH.stream, abort~TRUE]; dH.stream _ NIL }; }; ReplyCloseStream: PROC [dH: DataHandle] ~ { <> IF dH.stream # NIL THEN { [] _ XNSStream.SendCloseReply[dH.stream ! XNSStream.ConnectionClosed, XNSStream.Timeout => CONTINUE]; IO.Close[self~dH.stream, abort~TRUE]; dH.stream _ NIL }; }; ErrorPutBulkDataSource: CrRPC.PutBulkDataXferProcProc <<[h: Handle, s: STREAM, proc: BulkDataXferProc]>> ~ { ERROR CrRPC.Error[h, notClientHandle, "putBDTSource on server handle"] }; PutBulkDataSource: CrRPC.PutBulkDataXferProcProc <<[h: Handle, s: STREAM, proc: BulkDataXferProc]>> ~ { dH: DataHandle ~ NARROW[h.data]; IF proc = NIL THEN { CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.null]] } ELSE { CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate]]; TRUSTED { dH.sendBulkData _ proc }}; }; ErrorPutBulkDataSink: CrRPC.PutBulkDataXferProcProc <<[h: Handle, s: STREAM, proc: BulkDataXferProc]>> ~ { ERROR CrRPC.Error[h, notClientHandle, "putBDTSink on server handle"] }; PutBulkDataSink: CrRPC.PutBulkDataXferProcProc <<[h: Handle, s: STREAM, proc: BulkDataXferProc]>> ~ { dH: DataHandle ~ NARROW[h.data]; IF proc = NIL THEN { CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.null]] } ELSE { CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate]]; TRUSTED { dH.recvBulkData _ proc }}; }; ErrorGetBulkDataSource: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSource: CrRPC.BulkDataSource] ~ { ERROR CrRPC.Error[h, notServerHandle, "GetBDTSource on client handle"] }; GetBulkDataSource: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSource: CrRPC.BulkDataSource] ~ { dH: DataHandle ~ NARROW[h.data]; typeCode: CARDINAL; typeCode _ CrRPC.GetCard16[s]; SELECT typeCode FROM ORD[CrRPCBackdoor.BulkDataDescriptorType.null] => { bulkDataSource _ NIL }; ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate] => TRUSTED { bulkDataSource _ ServerRecvBulkData }; ORD[CrRPCBackdoor.BulkDataDescriptorType.active], ORD[CrRPCBackdoor.BulkDataDescriptorType.passive] => { ERROR CrRPC.Error[h, notImplemented, "no third party bulk data"] }; ENDCASE => { ERROR CrRPC.Error[h, protocolError, "bad bulkDataSource type"] }; }; ServerRecvBulkData: CrRPC.BulkDataXferProc <<[h: Handle, s: IO.STREAM, checkAbort: BulkDataCheckAbortProc] RETURNS [abort: BOOL]>> ~ { dH: DataHandle ~ NARROW[h.data]; bufSize: NAT ~ 1024; b: REF TEXT ~ RefText.ObtainScratch[bufSize]; nBytes: INT; state: XNSStream.State; ssType: XNSSPPTypes.SubSequenceType; abort _ FALSE; ServerRdyBDTRecv[h]; DO SELECT nBytes _ IO.GetBlock[self~dH.stream, block~b] FROM 0 => { [state~state, ssType~ssType] _ XNSStream.GetStatus[dH.stream, TRUE]; SELECT state FROM attention => { abort _ TRUE; EXIT }; endOfMessage => { dH.inputFlushNeeded _ FALSE; EXIT }; ssTypeChange => SELECT ssType FROM 0 => ERROR CrRPC.Error[h, protocolError, "bdt no eom"]; XNSSPPTypes.endSST, XNSSPPTypes.endReplySST => ERROR CrRPC.Error[h, remoteClose, "remote close"]; ENDCASE => NULL; -- ???? is this an error ???? open => NULL; ENDCASE => ERROR; }; ENDCASE => { IO.PutBlock[self~s, block~b, count~nBytes]; }; IF checkAbort[h].abort THEN { XNSStream.SendAttention[dH.stream, 1]; -- Define 1 ???? EXIT }; ENDLOOP; RefText.ReleaseScratch[b]; }; ErrorGetBulkDataSink: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSink: CrRPC.BulkDataSink] ~ { ERROR CrRPC.Error[h, notServerHandle, "GetBDTSink on client handle"] }; GetBulkDataSink: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSink: CrRPC.BulkDataSink] ~ { dH: DataHandle ~ NARROW[h.data]; typeCode: CARDINAL; typeCode _ CrRPC.GetCard16[s]; SELECT typeCode FROM ORD[CrRPCBackdoor.BulkDataDescriptorType.null] => { bulkDataSink _ NIL }; ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate] => TRUSTED { bulkDataSink _ ServerSendBulkData }; ORD[CrRPCBackdoor.BulkDataDescriptorType.active], ORD[CrRPCBackdoor.BulkDataDescriptorType.passive] => { ERROR CrRPC.Error[h, notImplemented, "no third party bulk data"] }; ENDCASE => { ERROR CrRPC.Error[h, protocolError, "bad bulkDataSink type"] }; }; ServerSendBulkData: CrRPC.BulkDataXferProc <<[h: Handle, s: IO.STREAM, checkAbort: BulkDataCheckAbortProc] RETURNS [abort: BOOL]>> ~ { dH: DataHandle ~ NARROW[h.data]; bufSize: NAT ~ 1024; b: REF TEXT ~ RefText.ObtainScratch[bufSize]; nBytes: INT; state: XNSStream.State; ssType: XNSSPPTypes.SubSequenceType; abort _ FALSE; ServerRdyBDTSend[h]; XNSStream.SetSSType[self~dH.stream, ssType~XNSSPPTypes.bulkDataSST]; { DO <> [state~state, ssType~ssType] _ XNSStream.GetStatus[dH.stream, FALSE]; SELECT state FROM open => NULL; attention, endOfMessage, ssTypeChange => { abort _ TRUE; GOTO FinishWithEOM }; ENDCASE => ERROR; <> IF checkAbort[h].abort THEN { GOTO FinishWithAttention }; <> SELECT nBytes _ IO.GetBlock[self~s, block~b] FROM 0 => { GOTO FinishWithEOM }; ENDCASE => { IO.PutBlock[self~dH.stream, block~b, count~nBytes]; }; ENDLOOP; EXITS FinishWithAttention => XNSStream.SendAttention[dH.stream, 1]; -- Define 1 ???? FinishWithEOM => XNSStream.SendEndOfMessage[dH.stream]; }; RefText.ReleaseScratch[b]; XNSStream.SetSSType[self~dH.stream, ssType~XNSSPPTypes.defaultSST]; }; <> nextSegment: CARD16 ~ 0; lastSegment: CARD16 ~ 1; ReadBulkDataStream: CrRPC.ReadBulkDataStreamProc <<[h: Handle, s: STREAM, checkAbort: BulkDataCheckAbortProc, readValue: BulkDataValueProc] RETURNS [abort: BOOL]>> ~ { ENABLE { IO.EndOfStream, IO.Error, XNSStream.ConnectionClosed => ERROR CrRPC.Error[NIL, communicationFailure, "communication failure"]; RuntimeError.BoundsFault => ERROR CrRPC.Error[NIL, bulkDataError, "bulk data value bounds fault"]; }; segmentKind: CARD16; len: CARD16; DO IF checkAbort[h] THEN RETURN[FALSE]; segmentKind _ CrRPC.GetCard16[s]; len _ CrRPC.GetCard16[s]; THROUGH [1..len] DO IF checkAbort[h] THEN RETURN[FALSE]; IF readValue[s].abort THEN RETURN[TRUE]; ENDLOOP; IF segmentKind = lastSegment THEN RETURN[FALSE]; ENDLOOP; }; WriteBulkDataSegment: CrRPC.WriteBulkDataSegmentProc <<[h: Handle, s: STREAM, checkAbort: BulkDataCheckAbortProc, writeValue: BulkDataValueProc, n: CARDINAL] RETURNS [abort: BOOL, heAborted: BOOL]>> ~ { ENABLE IO.EndOfStream, IO.Error, XNSStream.ConnectionClosed => ERROR CrRPC.Error[NIL, communicationFailure, "communication failure"]; CrRPC.PutCard16[s, (IF n > 0 THEN nextSegment ELSE lastSegment)]; CrRPC.PutCard16[s, n]; abort _ heAborted _ FALSE; THROUGH [1..n] DO IF checkAbort[h] THEN { heAborted _ TRUE; RETURN }; IF writeValue[s].abort THEN { abort _ TRUE; RETURN }; ENDLOOP; }; <> ErrorCallContinuation: CrRPC.CallContinuationProc ~ { ERROR CrRPC.Error[h, notClientHandle, "CallContinuation on server handle"] }; CallContinuation: CrRPC.CallContinuationProc <<[h: Handle, proc: ContinuationProc, clientData: REF]>> ~ { dH: DataHandle ~ NARROW[h.data]; newStream: IO.STREAM; { ENABLE XNSStream.ConnectionClosed, XNSStream.Timeout, IO.EndOfStream, IO.Error => ERROR CrRPC.Error[NIL, communicationFailure, NIL]; newStream _ (IF proc # NIL THEN proc[dH.stream, clientData] ELSE dH.stream); }; IF (newStream # NIL) AND (newStream # dH.stream) THEN ERROR CrRPC.Error[NIL, notImplemented, NIL]; dH.stream _ newStream; }; ErrorSetContinuation: CrRPC.SetContinuationProc ~ { ERROR CrRPC.Error[h, notServerHandle, "SetContinuation on client handle"] }; SetContinuation: CrRPC.SetContinuationProc <<[h: Handle, proc: ContinuationProc, clientData: REF]>> ~ { dH: DataHandle ~ NARROW[h.data]; dH.continuation _ proc; dH.continuationClientData _ clientData }; <> <> cache: DataHandle _ NIL; cachePulseOut: BasicTime.Pulses ~ BasicTime.MicrosecondsToPulses[60 * 1000000]; cacheSweepInterval: Process.Ticks ~ Process.SecondsToTicks[21]; PulsesSince: PROC [then: BasicTime.Pulses] RETURNS [BasicTime.Pulses] ~ { RETURN [ BasicTime.GetClockPulses[] - then ] }; CheckOutFromCache: ENTRY PROC [remote: XNS.Address] RETURNS [h: Handle _ NIL] ~ { dH: DataHandle _ cache; prevH: DataHandle _ NIL; WHILE dH # NIL DO IF dH.remote = remote THEN { IF prevH = NIL THEN cache _ dH.next ELSE prevH.next _ dH.next; h _ dH.handle; dH.handle _ NIL; -- for finalization dH.lastUsed _ BasicTime.GetClockPulses[]; EXIT }; prevH _ dH; dH _ dH.next; ENDLOOP; }; CheckInToCache: ENTRY PROC [h: Handle] ~ { dH: DataHandle _ NARROW[h.data]; dH.handle _ h; dH.lastUsed _ BasicTime.GetClockPulses[]; dH.next _ cache; cache _ dH }; CacheSweeper: PROC ~ { GetDeadHandles: ENTRY PROC RETURNS [deadList: DataHandle _ NIL] ~ { dH, newCache, newCacheTail: DataHandle _ NIL; WHILE cache # NIL DO dH _ cache; cache _ cache.next; IF PulsesSince[dH.lastUsed] <= cachePulseOut THEN -- add to new cache -- { dH.next _ NIL; IF newCacheTail = NIL THEN newCache _ dH ELSE newCacheTail.next _ dH; newCacheTail _ dH } ELSE -- add to dead list -- { dH.next _ deadList; deadList _ dH }; ENDLOOP; cache _ newCache; }; DestroyDeadHandles: PROC [deadList: DataHandle] ~ { dH: DataHandle; WHILE deadList # NIL DO dH _ deadList; deadList _ dH.next; BuryDeadHandle[dH.handle]; ENDLOOP; }; DO DestroyDeadHandles[ GetDeadHandles[] ]; Process.Pause[ cacheSweepInterval ]; ENDLOOP; }; BuryDeadHandle: PROC [h: Handle] ~ { dH: DataHandle ~ NARROW[h.data]; dH.handle _ NIL; -- break cycle for GC SendCloseStream[dH]; h.data _ NIL }; <> ErrorFinalize: PROC [h: Handle] ~ { ERROR }; Finalize: PROC [h: Handle] ~ { <> dH: DataHandle ~ NARROW[h.data]; IF dH = NIL THEN -- handle is already dead -- RETURN; IF PulsesSince[dH.lastUsed] <= cachePulseOut THEN { CrRPCBackdoor.RenewClientObject[h]; CheckInToCache[h]; RETURN }; BuryDeadHandle[h]; }; CreateClientHandle: CrRPCBackdoor.CreateClientHandleProc <<[remote: CrRPC.RefAddress] RETURNS [Handle]>> ~ { handle: Handle; dH: DataHandle; addr: XNS.Address; IF remote = NIL THEN ERROR CrRPC.Error[NIL, addressInappropriateForClass, "missing remote address"]; WITH remote SELECT FROM rA: REF XNS.Address => addr _ rA^; ENDCASE => ERROR CrRPC.Error[NIL, addressInappropriateForClass, NIL]; IF addr.socket = XNS.unknownSocket THEN addr.socket _ XNSWKS.courier; handle _ CheckOutFromCache[addr]; IF handle = NIL THEN { dH _ NEW[DataObject _ [lastUsed~BasicTime.GetClockPulses[], remote~addr]]; handle _ CrRPCBackdoor.NewClientObject[class~$SPP, procs~theClientProcs, data~dH]; }; RETURN [handle]; }; <<>> CrRPCBackdoor.RegisterCreateClientHandleProc[$SPP, CreateClientHandle]; CrRPCBackdoor.RegisterCreateListenerProc[$SPP, CreateListener]; TRUSTED { Process.Detach[ FORK CacheSweeper[] ] }; }.