<<>> <> <> <> <> <> <<>> <> <<>> DIRECTORY Basics USING [Card32FromF, Card16FromH, FFromCard32, FWORD, HFromCard16, HWORD], BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses], CrRPC USING [BeginErrorProc, BeginRejectProc, BeginReturnProc, BulkDataCheckAbortProc, BulkDataSink, BulkDataSource, BulkDataValueProc, BulkDataXferProc, CallContinuationProc, CallProc, ContinuationProc, Error, ErrorReason, GetBulkDataXferProcProc, GetCard16, GetHWord, Handle, invalidArgument, noSuchProcedure, noSuchProgram, noSuchVersion, Object, ProcsObject, PutBulkDataXferProcProc, PutCard16, ReadBulkDataStreamProc, ServerProc, SetContinuationProc, SetParametersProc, WriteBulkDataSegmentProc], CrRPCBackdoor USING [AbortHdr, AbortMsgHdr, abortMsgHdrBytes, abortMsgType, BulkDataDescriptorType, CallHdr, CallMsgHdr, callMsgHdrBytes, callMsgType, courierVersionNum, CreateClientHandleProc, CreateListenerProc, LookUpServerProc, MsgHdr, --msgHdrBytes,-- NewClientObject, RegisterCreateClientHandleProc, RegisterCreateListenerProc, RejectHdr, RejectMsgHdr, rejectMsgHdrBytes, rejectMsgType, RenewClientObject, ReturnHdr, ReturnMsgHdr, returnMsgHdrBytes, returnMsgType, SessionHdr, sessionHdrBytes], IO USING [CharsAvail, Close, EndOf, EndOfStream, Error, GetBlock, PutBlock, PutFR1, STREAM, UnsafeBlock, UnsafeGetBlock, UnsafePutBlock], Process USING [Detach, Pause, SecondsToTicks, Ticks], RefText USING [ObtainScratch, ReleaseScratch], Rope USING [ROPE], RuntimeError USING [BoundsFault], XNS USING [Address, Socket, unknownSocket], XNSSPPTypes USING [bulkDataSST, defaultSST, endReplySST, endSST, SubSequenceType], XNSStream USING [ConnectionClosed, Create, CreateListener, FlushInput, GetStatus, Listener, ListenerProc, SendAttention, SendClose, SendCloseReply, SendEndOfMessage, SendNow, SetSSType, State, SubSequenceType, Timeout, waitForever], XNSWKS USING [courier]; CrRPCSPPImpl: CEDAR MONITOR IMPORTS Basics, BasicTime, CrRPC, CrRPCBackdoor, IO, Process, RefText, RuntimeError, XNSStream ~ { <> 
-- Types and byte-order conversion helpers

FWORD: TYPE ~ Basics.FWORD;
HWORD: TYPE ~ Basics.HWORD;
STREAM: TYPE ~ IO.STREAM;

-- Thin inline wrappers around the Basics conversions between CARD32/CARD16
-- and their FWORD/HWORD representations.
FfC: PROC [c: CARD32] RETURNS [FWORD] ~ INLINE { RETURN [Basics.FFromCard32[c]] };
HfC: PROC [c: CARD16] RETURNS [HWORD] ~ INLINE { RETURN [Basics.HFromCard16[c]] };
CfF: PROC [f: FWORD] RETURNS [CARD32] ~ INLINE { RETURN [Basics.Card32FromF[f]] };
CfH: PROC [h: HWORD] RETURNS [CARD16] ~ INLINE { RETURN [Basics.Card16FromH[h]] };

Handle: TYPE ~ CrRPC.Handle;

-- Like IO.CharsAvail, except that a nonzero answer is turned into 0 when
-- IO.EndOf reports that the stream is at its end.
MyCharsAvail: PROC [self: STREAM, wait: BOOL ¬ FALSE] RETURNS [n: INT] ~ INLINE {
  IF (n ¬ IO.CharsAvail[self, wait]) # 0 THEN IF IO.EndOf[self] THEN n ¬ 0 };

-- Per-handle private data (stored in Handle.data)

DataHandle: TYPE ~ REF DataObject;
DataObject: TYPE ~ RECORD [
  next: DataHandle ¬ NIL,
  handle: Handle ¬ NIL,
  lastUsed: BasicTime.Pulses,
  stream: IO.STREAM ¬ NIL,
  inputFlushNeeded: BOOL ¬ FALSE,
  remote: XNS.Address,
  sendBulkData: CrRPC.BulkDataXferProc ¬ NIL, -- for client handles
  recvBulkData: CrRPC.BulkDataXferProc ¬ NIL, -- for client handles
  continuation: CrRPC.ContinuationProc ¬ NIL, -- for server handles
  continuationClientData: REF -- for server handles
  ];

-- Procedure table installed in client handles; the server-only operations
-- are bound to procedures that raise CrRPC.Error.
theClientProcs: REF CrRPC.ProcsObject ~ NEW[ CrRPC.ProcsObject ¬ [
  destroy~CheckInToCache,
  setParameters~SetParameters,
  call~Call,
  getBulkDataSource~ErrorGetBulkDataSource,
  getBulkDataSink~ErrorGetBulkDataSink,
  putBulkDataSource~PutBulkDataSource,
  putBulkDataSink~PutBulkDataSink,
  readBulkDataStream~ReadBulkDataStream,
  writeBulkDataSegment~WriteBulkDataSegment,
  callContinuation~CallContinuation,
  setContinuation~ErrorSetContinuation,
  finalize~Finalize
  ] ];

-- Procedure table installed in server handles; the client-only operations
-- are bound to procedures that raise an error.
theServerProcs: REF CrRPC.ProcsObject ~ NEW[ CrRPC.ProcsObject ¬ [
  destroy~CheckInToCache,
  setParameters~SetParameters,
  call~ErrorCall,
  getBulkDataSource~GetBulkDataSource,
  getBulkDataSink~GetBulkDataSink,
  putBulkDataSource~ErrorPutBulkDataSource,
  putBulkDataSink~ErrorPutBulkDataSink,
  readBulkDataStream~ReadBulkDataStream,
  writeBulkDataSegment~WriteBulkDataSegment,
  callContinuation~ErrorCallContinuation,
  setContinuation~SetContinuation,
  finalize~ErrorFinalize
  ] ];

-- When TRUE, CreateClientHandle always overwrites the socket of the remote
-- address with the well-known Courier socket.
alwaysSmashTheSocket: BOOL ¬ FALSE;

-- Parameters

<<$GetRemote returns a REF to the XNS.Address of the remote site associated with this handle. (Works for either client or server handles).>>

SetParameters: CrRPC.SetParametersProc
<<[h: Handle, op: ATOM, argument: REF] RETURNS [hNew: Handle, result: REF]>>
~ {
  dH: DataHandle ~ NARROW[h.data];
  hNew ¬ h;
  SELECT op FROM
    $GetRemote => result ¬ NEW[XNS.Address ¬ dH.remote];
    ENDCASE => ERROR CrRPC.Error[h, unknownOperation, NIL];
  };

-- Client side: Call

ErrorCall: CrRPC.CallProc ~ {
  ERROR CrRPC.Error[h, notClientHandle, "call using server handle"] };

-- Performs one remote call on a client handle: sends the call message
-- (opening the connection first if there is no stream, or if the existing
-- stream fails while sending), runs any bulk data transfer registered by
-- putArgs, then reads and dispatches the reply message.
Call: CrRPC.CallProc
<<[h: Handle, remotePgm: CARD32, remotePgmVersion: CARD16, remoteProc: CARD16, putArgs: PutArgsProc, getResults: GetResultsProc, getError: GetErrorProc]>>
~ {
  dH: DataHandle ~ NARROW[h.data];

  ConnectToServer: PROC ~ {
    -- Create the stream and exchange session headers (Courier version
    -- range) with the remote end.  Requires dH.stream = NIL on entry.
    sessionHdrOut: CrRPCBackdoor.SessionHdr ¬ [
      lowVersion~HfC[CrRPCBackdoor.courierVersionNum - 1],
      highVersion~HfC[CrRPCBackdoor.courierVersionNum]];
    sessionHdrIn: CrRPCBackdoor.SessionHdr;
    nRead: NAT;
    IF dH.stream # NIL THEN ERROR;
    dH.stream ¬ XNSStream.Create[remote~dH.remote, getTimeout~XNSStream.waitForever, putTimeout~XNSStream.waitForever];
    TRUSTED {
      IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@sessionHdrOut]], startIndex~0, count~CrRPCBackdoor.sessionHdrBytes]] };
    XNSStream.SendNow[dH.stream];
    TRUSTED {
      nRead ¬ IO.UnsafeGetBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@sessionHdrIn]], startIndex~0, count~CrRPCBackdoor.sessionHdrBytes]] };
    dH.inputFlushNeeded ¬ FALSE; -- ???? Is there EOM on version number pair ????
    -- Reject a short read or a version range that excludes ours.
    IF (nRead # CrRPCBackdoor.sessionHdrBytes)
      OR (CfH[sessionHdrIn.lowVersion] > CrRPCBackdoor.courierVersionNum)
      OR (CfH[sessionHdrIn.highVersion] < CrRPCBackdoor.courierVersionNum)
      THEN {
        SendCloseStream[dH];
        ERROR CrRPC.Error[h, courierVersionMismatch, "Courier version error"];
        };
    };

  SendCallMsg: PROC ~ {
    -- Write the call header, let putArgs marshal the arguments, and end
    -- the message.
    callMsgHdr: CrRPCBackdoor.CallMsgHdr ¬ [
      msgHdr~[msgType~CrRPCBackdoor.callMsgType],
      callHdr~[
        tID~[0, 0],
        pgmNum~FfC[remotePgm],
        pgmVersion~HfC[remotePgmVersion],
        procNum~HfC[remoteProc]
        ]
      ];
    TRUSTED {
      IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@callMsgHdr]], startIndex~0, count~CrRPCBackdoor.callMsgHdrBytes]] };
    IF putArgs # NIL THEN putArgs[h, dH.stream];
    XNSStream.SendEndOfMessage[dH.stream];
    };

  ReadResultMsg: PROC ~ {
    -- Read the reply's message type and dispatch on it.  In each arm the
    -- HWORD following the message type is read and discarded (presumably
    -- the transaction ID; confirm against CrRPCBackdoor).
    msgType: HWORD ¬ CrRPC.GetHWord[dH.stream];
    SELECT msgType FROM
      CrRPCBackdoor.rejectMsgType => {
        rejectReason: CARDINAL;
        [] ¬ CrRPC.GetHWord[dH.stream];
        rejectReason ¬ CrRPC.GetCard16[dH.stream];
        [] ¬ AdvanceInput[h~h, wait~FALSE];
        ERROR CrRPC.Error[h,
          (SELECT rejectReason FROM
            CrRPC.noSuchProgram => rejectedNoSuchProgram,
            CrRPC.noSuchVersion => rejectedNoSuchVersion,
            CrRPC.noSuchProcedure => rejectedNoSuchProcedure,
            CrRPC.invalidArgument => rejectedInvalidArgument,
            ENDCASE => rejectedUnspecified),
          "rejected"];
        };
      CrRPCBackdoor.returnMsgType => {
        [] ¬ CrRPC.GetHWord[dH.stream];
        IF getResults # NIL THEN getResults[h, dH.stream];
        [] ¬ AdvanceInput[h~h, wait~FALSE];
        };
      CrRPCBackdoor.abortMsgType => {
        errNum: CARDINAL;
        [] ¬ CrRPC.GetHWord[dH.stream];
        errNum ¬ CrRPC.GetCard16[dH.stream];
        IF getError # NIL THEN getError[h, dH.stream, errNum];
        [] ¬ AdvanceInput[h~h, wait~FALSE];
        -- Reached only if getError is NIL or returns normally.
        ERROR CrRPC.Error[h, remoteError, "unexpected remote error"] };
      ENDCASE => {
        SendCloseStream[dH];
        ERROR CrRPC.Error[h, protocolError, "bad msgType in response"] };
    };

  ClientRdySysRecv: PROC ~ {
    -- Wait until the stream is in the open state with input available,
    -- resetting any other status along the way.  The subsequence type
    -- must then be 0; otherwise the stream is smashed.
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    DO
      [state, sst] ¬ AdvanceInput[h~h, wait~TRUE];
      IF state = open THEN EXIT;
      [] ¬ XNSStream.GetStatus[self~dH.stream, reset~TRUE];
      ENDLOOP;
    IF sst # 0 THEN {
      SmashStream[dH];
      ERROR CrRPC.Error[h, protocolError, "sys sst # 0"] };
    dH.inputFlushNeeded ¬ TRUE;
    };

  ClientRdyBDTSend: PROC RETURNS [ok: BOOL] ~ {
    -- Without waiting: ok if the stream is open with no input pending, or
    -- if it is in the attention state.
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    [state, sst] ¬ AdvanceInput[h~h, wait~FALSE];
    SELECT state FROM
      open => { ok ¬ (MyCharsAvail[self~dH.stream, wait~FALSE] = 0) };
      attention => { ok ¬ TRUE };
      ENDCASE => ERROR;
    };

  ClientRdyBDTRecv: PROC RETURNS [ok: BOOL] ~ {
    -- Wait for input: ok if the subsequence type is nonzero (in which case
    -- an input flush is scheduled) or the stream is in the attention state.
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    [state, sst] ¬ AdvanceInput[h~h, wait~TRUE];
    SELECT state FROM
      open, endOfMessage => { ok ¬ (sst # 0); dH.inputFlushNeeded ¬ ok };
      attention => { ok ¬ TRUE };
      ENDCASE => ERROR;
    };

  CheckAbortRecv: CrRPC.BulkDataCheckAbortProc ~ {
    [] ¬ IO.CharsAvail[self~dH.stream, wait~TRUE]; -- MyCharsAvail not needed here!
    RETURN [ XNSStream.GetStatus[self~dH.stream, reset~FALSE].state = attention ] };

  CheckAbortSend: CrRPC.BulkDataCheckAbortProc ~ {
    RETURN[
      (XNSStream.GetStatus[self~dH.stream, reset~FALSE].state # open)
      OR (MyCharsAvail[self~dH.stream, wait~FALSE] > 0) ] };

  -- Body of Call
  BEGIN
    dH.sendBulkData ¬ NIL;
    dH.recvBulkData ¬ NIL;

    -- Send the call message.
    BEGIN
      IF dH.stream # NIL THEN {
        -- Try the existing stream first.  On failure, smash it and fall
        -- through to the fresh-connection case below.
        ENABLE XNSStream.ConnectionClosed, CrRPC.Error, IO.Error => { SmashStream[dH]; CONTINUE };
        [] ¬ AdvanceInput[h~h, wait~FALSE]; -- check for close
        SendCallMsg[];
        GOTO Done };
      { -- dH.stream = NIL --
        ENABLE {
          XNSStream.ConnectionClosed => {
            SmashStream[dH];
            ERROR CrRPC.Error[h, communicationFailure, IO.PutFR1["Stream failure: %g", [rope[text]]]] };
          IO.Error => {
            SmashStream[dH];
            ERROR CrRPC.Error[h, communicationFailure, "IO.Error"] };
          };
        ConnectToServer[];
        SendCallMsg[];
        GOTO Done };
      EXITS
        Done => NULL;
    END;

    -- Do any bulk data transfer, then read the reply.
    BEGIN
      ENABLE {
        XNSStream.ConnectionClosed => {
          SmashStream[dH];
          ERROR CrRPC.Error[h, communicationFailure, IO.PutFR1["Stream failure: %g", [rope[text]]]] };
        IO.Error => {
          SmashStream[dH];
          ERROR CrRPC.Error[h, communicationFailure, "IO.Error"] };
        };
      SELECT TRUE FROM
        (dH.sendBulkData # NIL) => {
          -- putArgs registered a bulk data source: we send.
          IF ClientRdyBDTSend[].ok THEN {
            XNSStream.SetSSType[dH.stream, XNSSPPTypes.bulkDataSST];
            IF dH.sendBulkData[h, dH.stream, CheckAbortSend].abort
              THEN XNSStream.SendAttention[dH.stream, 1] -- ???? Define 1 ????
              ELSE XNSStream.SendEndOfMessage[dH.stream];
            XNSStream.SetSSType[dH.stream, XNSSPPTypes.defaultSST];
            };
          };
        (dH.recvBulkData # NIL) => {
          -- putArgs registered a bulk data sink: we receive.
          ENABLE {
            IO.EndOfStream => {
              SendCloseStream[dH];
              ERROR CrRPC.Error[h, bulkDataError, "end of stream"] };
            RuntimeError.BoundsFault => {
              SendCloseStream[dH];
              ERROR CrRPC.Error[h, bulkDataError, "bulk data value bounds fault"] };
            };
          IF ClientRdyBDTRecv[].ok THEN {
            IF dH.recvBulkData[h, dH.stream, CheckAbortRecv].abort
              THEN XNSStream.SendAttention[dH.stream, 1]; -- ???? Define 1 ????
            };
          };
        ENDCASE => {
          -- No bulk data transfer for this call.
          NULL;
          };
      {
        ENABLE {
          IO.EndOfStream => {
            SendCloseStream[dH];
            ERROR CrRPC.Error[h, resultsError, "end of stream"] };
          RuntimeError.BoundsFault => {
            SendCloseStream[dH];
            ERROR CrRPC.Error[h, resultsError, "result value bounds fault"] };
          };
        ClientRdySysRecv[];
        ReadResultMsg[];
        <<[] _ AdvanceInput[h~h, wait~FALSE]; -- done by ReadResultMsg>>
        };
    END;
  END;
  }; -- of Call

-- Server side

ServerRdySysRecv: PROC [h: Handle] ~ {
  -- Wait until the stream is open with input available, resetting any
  -- other status along the way; the subsequence type must then be 0.
  dH: DataHandle ~ NARROW[h.data];
  state: XNSStream.State;
  sst: XNSStream.SubSequenceType;
  DO
    [state, sst] ¬ AdvanceInput[h~h, wait~TRUE];
    IF state = open THEN EXIT;
    [] ¬ XNSStream.GetStatus[self~dH.stream, reset~TRUE];
    ENDLOOP;
  IF sst # 0 THEN ERROR CrRPC.Error[h, protocolError, "sys sst # 0"];
  dH.inputFlushNeeded ¬ TRUE;
  };

ServerRdyBDTRecv: PROC [h: Handle] ~ {
  -- Wait for input before receiving bulk data.  A zero subsequence type
  -- means the client sent no bulk data, which is a protocol error.
  dH: DataHandle ~ NARROW[h.data];
  state: XNSStream.State;
  sst: XNSStream.SubSequenceType;
  [state, sst] ¬ AdvanceInput[h~h, wait~TRUE];
  SELECT state FROM
    open, endOfMessage => {
      IF sst = 0 THEN ERROR CrRPC.Error[h, protocolError, "missing bulk data"];
      dH.inputFlushNeeded ¬ TRUE };
    attention => { NULL };
    ENDCASE => ERROR;
  };

ServerRdyBDTSend: PROC [h: Handle] ~ {
  -- Check, without waiting, that the client is not sending while we are
  -- about to send bulk data.
  dH: DataHandle ~ NARROW[h.data];
  state: XNSStream.State;
  sst: XNSStream.SubSequenceType;
  [state, sst] ¬ AdvanceInput[h~h, wait~FALSE];
  SELECT state FROM
    open => {
      IF MyCharsAvail[self~dH.stream, wait~FALSE] # 0 THEN
        ERROR CrRPC.Error[h, protocolError, "client send to bdt source"];
      };
    attention => { NULL };
    ENDCASE => ERROR;
  };

BeginReturn: CrRPC.BeginReturnProc
<<[h: Handle]>>
~ {
  -- Write the header of a return message to the handle's stream.
  dH: DataHandle ~ NARROW[h.data];
  returnMsgHdr: CrRPCBackdoor.ReturnMsgHdr ¬ [
    msgHdr~[msgType~CrRPCBackdoor.returnMsgType],
    returnHdr~[tID~[hi~0, lo~0]]
    ];
  TRUSTED {
    IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@returnMsgHdr]], startIndex~0, count~CrRPCBackdoor.returnMsgHdrBytes]] };
  };

BeginError: CrRPC.BeginErrorProc
<<[h: Handle, errNum: CARDINAL]>>
~{
  -- Write the header of an abort message carrying errNum.
  dH: DataHandle ~ NARROW[h.data];
  abortMsgHdr: CrRPCBackdoor.AbortMsgHdr ¬ [
    msgHdr~[msgType~CrRPCBackdoor.abortMsgType],
    abortHdr~[tID~[0,0], errNum~HfC[errNum]]
    ];
  TRUSTED {
    IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@abortMsgHdr]], startIndex~0, count~CrRPCBackdoor.abortMsgHdrBytes]] };
  };

BeginReject: CrRPC.BeginRejectProc
<<[h: Handle, rejectReason: CARDINAL]>>
~ {
  -- Write the header of a reject message carrying rejectReason.
  dH: DataHandle ~ NARROW[h.data];
  rejectMsgHdr: CrRPCBackdoor.RejectMsgHdr ¬ [
    msgHdr~[msgType~CrRPCBackdoor.rejectMsgType],
    rejectHdr~[tID~[0,0], rejectReason~HfC[rejectReason]]
    ];
  TRUSTED {
    IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@rejectMsgHdr]], startIndex~0, count~CrRPCBackdoor.rejectMsgHdrBytes]] };
  };

-- Worker for one incoming connection: negotiates the Courier version,
-- builds a server handle, then serves call messages in a loop until the
-- connection closes, an error occurs, or a continuation takes the stream.
ListenerProc: XNSStream.ListenerProc
<<[stream: IO.STREAM, remote: XNS.Address]>>
~ {
  sessionHdr: CrRPCBackdoor.SessionHdr;
  callMsgHdr: CrRPCBackdoor.CallMsgHdr;
  desiredPgm: CARD32;
  desiredPgmVersion: CARD16;
  desiredProc: CARD16;
  serverProc: CrRPC.ServerProc;
  nBytes: INT;
  h: Handle;
  dH: DataHandle;
  BEGIN
    ENABLE {
      XNSStream.ConnectionClosed => GOTO Out;
      CrRPC.Error, IO.Error, XNSStream.Timeout, RuntimeError.BoundsFault => GOTO Close;
      };

    -- Read the client's version range, check it, and answer with ours.
    TRUSTED {
      nBytes ¬ IO.UnsafeGetBlock[stream, [base~LOOPHOLE[LONG[@sessionHdr]], count~CrRPCBackdoor.sessionHdrBytes]]};
    IF nBytes # CrRPCBackdoor.sessionHdrBytes THEN GOTO Close;
    IF (CfH[sessionHdr.lowVersion] > CrRPCBackdoor.courierVersionNum)
      OR (CfH[sessionHdr.highVersion] < CrRPCBackdoor.courierVersionNum)
      THEN { GOTO Close };
    sessionHdr.lowVersion ¬ sessionHdr.highVersion ¬ HfC[CrRPCBackdoor.courierVersionNum];
    TRUSTED {
      IO.UnsafePutBlock[stream, [base~LOOPHOLE[LONG[@sessionHdr]], count~CrRPCBackdoor.sessionHdrBytes]] };
    XNSStream.SendNow[stream];

    -- Build the server handle for this connection.
    dH ¬ NEW[DataObject ¬ [lastUsed~, stream~stream, remote~remote]];
    h ¬ NEW[CrRPC.Object ¬ [class~$SPP, kind~server, procs~theServerProcs, data~dH, clientData~NIL]];

    -- Serve calls.
    DO
      ServerRdySysRecv[h];
      TRUSTED {
        nBytes ¬ IO.UnsafeGetBlock[stream, [base~LOOPHOLE[LONG[@callMsgHdr]], startIndex~0, count~CrRPCBackdoor.callMsgHdrBytes]]};
      IF (nBytes # CrRPCBackdoor.callMsgHdrBytes)
        OR (callMsgHdr.msgHdr.msgType # CrRPCBackdoor.callMsgType)
        THEN { GOTO Close };
      desiredPgm ¬ CfF[callMsgHdr.callHdr.pgmNum];
      desiredPgmVersion ¬ CfH[callMsgHdr.callHdr.pgmVersion];
      desiredProc ¬ CfH[callMsgHdr.callHdr.procNum];
      serverProc ¬ CrRPCBackdoor.LookUpServerProc[desiredPgm, desiredPgmVersion];
      SELECT serverProc FROM
        NIL => -- Don't have server, reject the call --
          {
          rejectMsgHdr: CrRPCBackdoor.RejectMsgHdr ¬ [
            msgHdr~[msgType~CrRPCBackdoor.rejectMsgType],
            rejectHdr~[tID~[0,0], rejectReason~HfC[CrRPC.noSuchProgram]]
            ];
          TRUSTED {
            IO.UnsafePutBlock[stream, [base~LOOPHOLE[LONG[@rejectMsgHdr]], startIndex~0, count~CrRPCBackdoor.rejectMsgHdrBytes]] };
          XNSStream.SendEndOfMessage[stream] };
        ENDCASE => -- Serve the call --
          {
          serverProc[h, stream, desiredPgm, desiredPgmVersion, desiredProc, BeginReturn, BeginError, BeginReject];
          IF dH.stream = NIL THEN GOTO Smashed; -- client caught error
          XNSStream.SendEndOfMessage[stream];
          IF dH.continuation # NIL THEN {
            newStream: IO.STREAM ¬ dH.continuation[dH.stream, dH.continuationClientData];
            IF newStream = NIL THEN GOTO Closed;
            IF newStream # dH.stream THEN GOTO Smashed;
            };
          };
      ENDLOOP;
  END;
  EXITS
    Close => {
      [] ¬ XNSStream.SendClose[stream ! XNSStream.ConnectionClosed => CONTINUE];
      IO.Close[self~stream, abort~TRUE];
      };
    Out => {
      IO.Close[self~stream, abort~TRUE];
      };
    Closed, Smashed => NULL;
  };

-- Listeners created so far, one per socket.
ListenerValue: TYPE ~ REF ListenerValueObject;
ListenerValueObject: TYPE ~ RECORD [
  next: ListenerValue,
  socket: XNS.Socket,
  listener: XNSStream.Listener
  ];

listeners: ListenerValue ¬ NIL;

-- Creates a listener on the socket named by local (a REF XNS.Address or a
-- REF XNS.Socket), or on the well-known Courier socket when local is NIL
-- or names the unknown socket.  Does nothing if a listener for that socket
-- is already recorded.
-- Raises CrRPC.Error[NIL, addressInappropriateForClass, NIL] for any other
-- kind of local.
CreateListener: ENTRY CrRPCBackdoor.CreateListenerProc
<<[local: CrRPC.RefAddress]>>
~ {
  -- Release the monitor lock if an error raised by a callee (e.g.
  -- XNSStream.CreateListener) unwinds through this ENTRY procedure.
  ENABLE UNWIND => NULL;
  newListener: XNSStream.Listener;
  socket: XNS.Socket ¬ XNS.unknownSocket;
  IF local # NIL THEN WITH local SELECT FROM
    rA: REF XNS.Address => socket ¬ rA.socket;
    rS: REF XNS.Socket => socket ¬ rS­;
    -- RETURN WITH ERROR gives up the monitor lock before raising; a plain
    -- ERROR here would be raised with the lock still held.
    ENDCASE => RETURN WITH ERROR CrRPC.Error[NIL, addressInappropriateForClass, NIL];
  IF socket = XNS.unknownSocket THEN socket ¬ XNSWKS.courier;
  FOR p: ListenerValue ¬ listeners, p.next WHILE p # NIL DO
    IF p.socket = socket THEN RETURN;
    ENDLOOP;
  newListener ¬ XNSStream.CreateListener[socket~socket, worker~ListenerProc, getTimeout~XNSStream.waitForever, putTimeout~XNSStream.waitForever];
  listeners ¬ NEW[ListenerValueObject ¬ [next~listeners, socket~socket, listener~newListener]];
  };

-- Stream utilities

AdvanceInput: PROC [h: Handle, wait: BOOL ¬ FALSE] RETURNS [state: XNSStream.State, ssType: XNSStream.SubSequenceType] ~ {
  -- Performs any pending input flush, then examines the stream status.
  -- Returns on attention, on open (immediately if NOT wait, otherwise once
  -- input is available), or on an endOfMessage that follows an earlier
  -- endOfMessage or ssTypeChange seen during this call.
  -- If the subsequence type is endSST or endReplySST, replies to the close
  -- and raises CrRPC.Error[h, protocolError, "remote close"].
  dH: DataHandle ~ NARROW[h.data];
  sawBreak: BOOL ¬ FALSE;
  IF dH.inputFlushNeeded THEN {
    [] ¬ XNSStream.FlushInput[self~dH.stream, wait~TRUE];
    dH.inputFlushNeeded ¬ FALSE };
  DO
    [state~state, ssType~ssType] ¬ XNSStream.GetStatus[self~dH.stream, reset~FALSE];
    IF (ssType = XNSSPPTypes.endSST) OR (ssType = XNSSPPTypes.endReplySST) THEN {
      ReplyCloseStream[dH];
      ERROR CrRPC.Error[h, protocolError, "remote close"] };
    SELECT state FROM
      open => {
        IF NOT wait THEN RETURN;
        IF MyCharsAvail[self~dH.stream, wait~TRUE] > 0 THEN RETURN;
        LOOP };
      attention => RETURN;
      endOfMessage => {
        IF sawBreak THEN RETURN;
        sawBreak ¬ TRUE };
      ssTypeChange => {
        sawBreak ¬ TRUE };
      ENDCASE => ERROR;
    [] ¬ XNSStream.GetStatus[self~dH.stream, reset~TRUE];
    ENDLOOP;
  };

SmashStream: PROC [dH: DataHandle] ~ {
  -- Abort-close the stream, if any, and forget it.
  IF dH.stream # NIL THEN {
    IO.Close[self~dH.stream, abort~TRUE];
    dH.stream ¬ NIL };
  };

SendCloseStream: PROC [dH: DataHandle] ~ {
  -- Send a close (ignoring ConnectionClosed), then abort-close the stream
  -- and forget it.
  IF dH.stream # NIL THEN {
    [] ¬ XNSStream.SendClose[dH.stream
      ! XNSStream.ConnectionClosed => CONTINUE];
    IO.Close[self~dH.stream, abort~TRUE];
    dH.stream ¬ NIL };
  };

ReplyCloseStream: PROC [dH: DataHandle] ~ {
  -- Send a close reply (ignoring ConnectionClosed and Timeout), then
  -- abort-close the stream and forget it.
  IF dH.stream # NIL THEN {
    [] ¬ XNSStream.SendCloseReply[dH.stream
      ! XNSStream.ConnectionClosed, XNSStream.Timeout => CONTINUE];
    IO.Close[self~dH.stream, abort~TRUE];
    dH.stream ¬ NIL };
  };

-- Bulk data descriptors: client side (put)

ErrorPutBulkDataSource: CrRPC.PutBulkDataXferProcProc
<<[h: Handle, s: STREAM, proc: BulkDataXferProc]>>
~ {
  ERROR CrRPC.Error[h, notClientHandle, "putBDTSource on server handle"] };

-- Writes a null or immediate descriptor to s; for a non-NIL proc, records
-- it as the handle's sendBulkData for the call in progress.
PutBulkDataSource: CrRPC.PutBulkDataXferProcProc
<<[h: Handle, s: STREAM, proc: BulkDataXferProc]>>
~ {
  dH: DataHandle ~ NARROW[h.data];
  IF proc = NIL
    THEN {
      CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.null]] }
    ELSE {
      CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate]];
      TRUSTED { dH.sendBulkData ¬ proc }};
  };

ErrorPutBulkDataSink: CrRPC.PutBulkDataXferProcProc
<<[h: Handle, s: STREAM, proc: BulkDataXferProc]>>
~ {
  ERROR CrRPC.Error[h, notClientHandle, "putBDTSink on server handle"] };

-- Writes a null or immediate descriptor to s; for a non-NIL proc, records
-- it as the handle's recvBulkData for the call in progress.
PutBulkDataSink: CrRPC.PutBulkDataXferProcProc
<<[h: Handle, s: STREAM, proc: BulkDataXferProc]>>
~ {
  dH: DataHandle ~ NARROW[h.data];
  IF proc = NIL
    THEN {
      CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.null]] }
    ELSE {
      CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate]];
      TRUSTED { dH.recvBulkData ¬ proc }};
  };

-- Bulk data descriptors: server side (get)

ErrorGetBulkDataSource: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSource: CrRPC.BulkDataSource] ~ {
  ERROR CrRPC.Error[h, notServerHandle, "GetBDTSource on client handle"] };

-- Reads a descriptor type code from s.  null yields NIL, immediate yields
-- ServerRecvBulkData, active and passive are not implemented, and any
-- other code is a protocol error.
GetBulkDataSource: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSource: CrRPC.BulkDataSource] ~ {
  dH: DataHandle ~ NARROW[h.data];
  typeCode: CARDINAL;
  typeCode ¬ CrRPC.GetCard16[s];
  SELECT typeCode FROM
    ORD[CrRPCBackdoor.BulkDataDescriptorType.null] => {
      bulkDataSource ¬ NIL };
    ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate] => TRUSTED {
      bulkDataSource ¬ ServerRecvBulkData };
    ORD[CrRPCBackdoor.BulkDataDescriptorType.active], ORD[CrRPCBackdoor.BulkDataDescriptorType.passive] => {
      ERROR CrRPC.Error[h, notImplemented, "no third party bulk data"] };
    ENDCASE => {
      ERROR CrRPC.Error[h, protocolError, "bad bulkDataSource type"] };
  };

-- Copies bulk data arriving on the handle's stream into s, 1024 bytes at a
-- time, until end of message, a remote attention (abort ¬ TRUE), or
-- checkAbort asks to stop (an attention is then sent to the remote end).
ServerRecvBulkData: CrRPC.BulkDataXferProc
<<[h: Handle, s: IO.STREAM, checkAbort: BulkDataCheckAbortProc] RETURNS [abort: BOOL]>>
~ {
  dH: DataHandle ~ NARROW[h.data];
  bufSize: NAT ~ 1024;
  b: REF TEXT ~ RefText.ObtainScratch[bufSize];
  nBytes: INT;
  state: XNSStream.State;
  ssType: XNSSPPTypes.SubSequenceType;
  abort ¬ FALSE;
  {
    -- Hand the scratch buffer back even if the transfer ends in an error.
    ENABLE UNWIND => RefText.ReleaseScratch[b];
    ServerRdyBDTRecv[h];
    DO
      SELECT nBytes ¬ IO.GetBlock[self~dH.stream, block~b] FROM
        0 => {
          -- Nothing was read: find out why, resetting the status.
          [state~state, ssType~ssType] ¬ XNSStream.GetStatus[dH.stream, TRUE];
          SELECT state FROM
            attention => {
              abort ¬ TRUE; EXIT };
            endOfMessage => {
              dH.inputFlushNeeded ¬ FALSE; EXIT };
            ssTypeChange => SELECT ssType FROM
              0 =>
                ERROR CrRPC.Error[h, protocolError, "bdt no eom"];
              XNSSPPTypes.endSST, XNSSPPTypes.endReplySST =>
                ERROR CrRPC.Error[h, remoteClose, "remote close"];
              ENDCASE => NULL; -- ???? is this an error ????
            open => NULL;
            ENDCASE => ERROR;
          };
        ENDCASE => {
          IO.PutBlock[self~s, block~b, count~nBytes];
          };
      IF checkAbort[h].abort THEN {
        XNSStream.SendAttention[dH.stream, 1]; -- Define 1 ????
        EXIT };
      ENDLOOP;
    };
  RefText.ReleaseScratch[b];
  };

ErrorGetBulkDataSink: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSink: CrRPC.BulkDataSink] ~ {
  ERROR CrRPC.Error[h, notServerHandle, "GetBDTSink on client handle"] };

-- Reads a descriptor type code from s.  null yields NIL, immediate yields
-- ServerSendBulkData, active and passive are not implemented, and any
-- other code is a protocol error.
GetBulkDataSink: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSink: CrRPC.BulkDataSink] ~ {
  dH: DataHandle ~ NARROW[h.data];
  typeCode: CARDINAL;
  typeCode ¬ CrRPC.GetCard16[s];
  SELECT typeCode FROM
    ORD[CrRPCBackdoor.BulkDataDescriptorType.null] => {
      bulkDataSink ¬ NIL };
    ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate] => TRUSTED {
      bulkDataSink ¬ ServerSendBulkData };
    ORD[CrRPCBackdoor.BulkDataDescriptorType.active], ORD[CrRPCBackdoor.BulkDataDescriptorType.passive] => {
      ERROR CrRPC.Error[h, notImplemented, "no third party bulk data"] };
    ENDCASE => {
      ERROR CrRPC.Error[h, protocolError, "bad bulkDataSink type"] };
  };

-- Copies bulk data from s to the handle's stream, 1024 bytes at a time,
-- under the bulk data subsequence type.  Ends with end of message when s
-- is exhausted or the stream leaves the open state (abort ¬ TRUE), or
-- with an attention when checkAbort asks to stop.  The default subsequence
-- type is restored afterwards.
ServerSendBulkData: CrRPC.BulkDataXferProc
<<[h: Handle, s: IO.STREAM, checkAbort: BulkDataCheckAbortProc] RETURNS [abort: BOOL]>>
~ {
  dH: DataHandle ~ NARROW[h.data];
  bufSize: NAT ~ 1024;
  b: REF TEXT ~ RefText.ObtainScratch[bufSize];
  nBytes: INT;
  state: XNSStream.State;
  ssType: XNSSPPTypes.SubSequenceType;
  abort ¬ FALSE;
  {
    -- Hand the scratch buffer back even if the transfer ends in an error.
    ENABLE UNWIND => RefText.ReleaseScratch[b];
    ServerRdyBDTSend[h];
    XNSStream.SetSSType[self~dH.stream, ssType~XNSSPPTypes.bulkDataSST];
    {
      DO
        -- Stop if the remote end has done anything to the stream.
        [state~state, ssType~ssType] ¬ XNSStream.GetStatus[dH.stream, FALSE];
        SELECT state FROM
          open => NULL;
          attention, endOfMessage, ssTypeChange => {
            abort ¬ TRUE; GOTO FinishWithEOM };
          ENDCASE => ERROR;
        -- Stop if the local caller wants to abort.
        IF checkAbort[h].abort THEN {
          GOTO FinishWithAttention };
        -- Move one buffer of data.
        SELECT nBytes ¬ IO.GetBlock[self~s, block~b] FROM
          0 => {
            GOTO FinishWithEOM };
          ENDCASE => {
            IO.PutBlock[self~dH.stream, block~b, count~nBytes];
            };
        ENDLOOP;
      EXITS
        FinishWithAttention =>
          XNSStream.SendAttention[dH.stream, 1]; -- Define 1 ????
        FinishWithEOM =>
          XNSStream.SendEndOfMessage[dH.stream];
      };
    };
  RefText.ReleaseScratch[b];
  XNSStream.SetSSType[self~dH.stream, ssType~XNSSPPTypes.defaultSST];
  };

-- Bulk data stream segments

-- Segment kind codes written and read ahead of each segment's length.
nextSegment: CARD16 ~ 0;
lastSegment: CARD16 ~ 1;

-- Reads segments from s until one of kind lastSegment has been read,
-- calling readValue once per element.  Returns TRUE only if readValue
-- asked to abort, and FALSE when checkAbort reports an abort or the last
-- segment completes.
ReadBulkDataStream: CrRPC.ReadBulkDataStreamProc
<<[h: Handle, s: STREAM, checkAbort: BulkDataCheckAbortProc, readValue: BulkDataValueProc] RETURNS [abort: BOOL]>>
~ {
  ENABLE {
    XNSStream.ConnectionClosed => {
      ERROR CrRPC.Error[NIL, communicationFailure, IO.PutFR1["Stream failure: %g", [rope[text]]]] };
    IO.Error, IO.EndOfStream => {
      ERROR CrRPC.Error[NIL, communicationFailure, "IO.Error"] };
    RuntimeError.BoundsFault =>
      ERROR CrRPC.Error[NIL, bulkDataError, "bulk data value bounds fault"];
    };
  segmentKind: CARD16;
  len: CARD16;
  DO
    IF checkAbort[h] THEN RETURN[FALSE];
    segmentKind ¬ CrRPC.GetCard16[s];
    len ¬ CrRPC.GetCard16[s];
    THROUGH [1..len] DO
      IF checkAbort[h] THEN RETURN[FALSE];
      IF readValue[s].abort THEN RETURN[TRUE];
      ENDLOOP;
    IF segmentKind = lastSegment THEN RETURN[FALSE];
    ENDLOOP;
  };

-- Writes one segment of n elements to s: the segment kind (lastSegment
-- when n = 0, otherwise nextSegment), the count, then one writeValue call
-- per element.  heAborted reports an abort seen by checkAbort; abort
-- reports one requested by writeValue.
WriteBulkDataSegment: CrRPC.WriteBulkDataSegmentProc
<<[h: Handle, s: STREAM, checkAbort: BulkDataCheckAbortProc, writeValue: BulkDataValueProc, n: CARDINAL] RETURNS [abort: BOOL, heAborted: BOOL]>>
~ {
  ENABLE {
    XNSStream.ConnectionClosed => {
      ERROR CrRPC.Error[NIL, communicationFailure, IO.PutFR1["Stream failure: %g", [rope[text]]]] };
    IO.Error, IO.EndOfStream => {
      ERROR CrRPC.Error[NIL, communicationFailure, "IO.Error"] };
    RuntimeError.BoundsFault =>
      ERROR CrRPC.Error[NIL, bulkDataError, "bulk data value bounds fault"];
    };
  CrRPC.PutCard16[s, (IF n > 0 THEN nextSegment ELSE lastSegment)];
  CrRPC.PutCard16[s, n];
  abort ¬ heAborted ¬ FALSE;
  THROUGH [1..n] DO
    IF checkAbort[h] THEN { heAborted ¬ TRUE; RETURN };
    IF writeValue[s].abort THEN { abort ¬ TRUE; RETURN };
    ENDLOOP;
  };

-- Continuations

ErrorCallContinuation: CrRPC.CallContinuationProc ~ {
  ERROR CrRPC.Error[h, notClientHandle, "CallContinuation on server handle"] };

-- Runs proc (if any) on the handle's stream and records the stream it
-- returns.  A returned stream that is neither NIL nor the handle's own
-- stream is not supported.  Errors raised here carry h, so a catcher can
-- tell which handle failed.
CallContinuation: CrRPC.CallContinuationProc
<<[h: Handle, proc: ContinuationProc, clientData: REF]>>
~ {
  dH: DataHandle ~ NARROW[h.data];
  newStream: IO.STREAM;
  {
    ENABLE {
      XNSStream.Timeout => {
        SmashStream[dH];
        ERROR CrRPC.Error[h, communicationFailure, "Stream timeout"] };
      XNSStream.ConnectionClosed => {
        SmashStream[dH];
        ERROR CrRPC.Error[h, communicationFailure, IO.PutFR1["Stream failure: %g", [rope[text]]]] };
      IO.Error, IO.EndOfStream => {
        SmashStream[dH];
        ERROR CrRPC.Error[h, communicationFailure, "IO.Error"] };
      RuntimeError.BoundsFault =>
        ERROR CrRPC.Error[h, bulkDataError, "bulk data value bounds fault"];
      };
    newStream ¬ (IF proc # NIL THEN proc[dH.stream, clientData] ELSE dH.stream);
    };
  IF (newStream # NIL) AND (newStream # dH.stream) THEN
    ERROR CrRPC.Error[h, notImplemented, NIL];
  dH.stream ¬ newStream;
  };

ErrorSetContinuation: CrRPC.SetContinuationProc ~ {
  ERROR CrRPC.Error[h, notServerHandle, "SetContinuation on client handle"] };

-- Records the continuation that ListenerProc runs after serving a call.
SetContinuation: CrRPC.SetContinuationProc
<<[h: Handle, proc: ContinuationProc, clientData: REF]>>
~ {
  dH: DataHandle ~ NARROW[h.data];
  TRUSTED { dH.continuation ¬ proc };
  dH.continuationClientData ¬ clientData };

-- Client handle cache
-- Destroyed handles are kept on a list keyed by remote address and handed
-- back by CreateClientHandle; the sweeper closes those unused for longer
-- than cachePulseOut (8 seconds), looking every cacheSweepInterval
-- (8 seconds).

cache: DataHandle ¬ NIL;
cachePulseOut: BasicTime.Pulses ~ BasicTime.MicrosecondsToPulses[8 * 1000000];
cacheSweepInterval: Process.Ticks ~ Process.SecondsToTicks[8];

PulsesSince: PROC [then: BasicTime.Pulses] RETURNS [BasicTime.Pulses] ~ {
  RETURN [ BasicTime.GetClockPulses[] - then ] };

-- Removes and returns the first cached handle whose remote address equals
-- remote, or returns NIL if there is none.
CheckOutFromCache: ENTRY PROC [remote: XNS.Address] RETURNS [h: Handle ¬ NIL] ~ {
  dH: DataHandle ¬ cache;
  prevH: DataHandle ¬ NIL;
  WHILE dH # NIL DO
    IF dH.remote = remote THEN {
      IF prevH = NIL THEN cache ¬ dH.next ELSE prevH.next ¬ dH.next;
      h ¬ dH.handle;
      dH.handle ¬ NIL; -- for finalization
      dH.lastUsed ¬ BasicTime.GetClockPulses[];
      EXIT };
    prevH ¬ dH;
    dH ¬ dH.next;
    ENDLOOP;
  };

-- Pushes h onto the front of the cache and stamps it with the current time.
CheckInToCache: ENTRY PROC [h: Handle] ~ {
  dH: DataHandle ¬ NARROW[h.data];
  dH.handle ¬ h;
  dH.lastUsed ¬ BasicTime.GetClockPulses[];
  dH.next ¬ cache;
  cache ¬ dH };

-- Never returns: repeatedly splits the cache into live and timed-out
-- entries under the monitor, then buries the timed-out ones outside it.
CacheSweeper: PROC ~ {

  GetDeadHandles: ENTRY PROC RETURNS [deadList: DataHandle ¬ NIL] ~ {
    -- Rebuilds the cache, keeping the live entries in their existing
    -- order; timed-out entries are pushed onto deadList.
    dH, newCache, newCacheTail: DataHandle ¬ NIL;
    WHILE cache # NIL DO
      dH ¬ cache;
      cache ¬ cache.next;
      IF PulsesSince[dH.lastUsed] <= cachePulseOut
        THEN -- add to new cache --
          {
          dH.next ¬ NIL;
          IF newCacheTail = NIL THEN newCache ¬ dH ELSE newCacheTail.next ¬ dH;
          newCacheTail ¬ dH }
        ELSE -- add to dead list --
          {
          dH.next ¬ deadList;
          deadList ¬ dH };
      ENDLOOP;
    cache ¬ newCache;
    };

  DestroyDeadHandles: PROC [deadList: DataHandle] ~ {
    dH: DataHandle;
    WHILE deadList # NIL DO
      dH ¬ deadList;
      deadList ¬ dH.next;
      BuryDeadHandle[dH.handle];
      ENDLOOP;
    };

  DO
    DestroyDeadHandles[ GetDeadHandles[] ];
    Process.Pause[ cacheSweepInterval ];
    ENDLOOP;
  };

-- Closes the handle's stream and detaches its private data.
BuryDeadHandle: PROC [h: Handle] ~ {
  dH: DataHandle ~ NARROW[h.data];
  dH.handle ¬ NIL; -- break cycle for GC
  SendCloseStream[dH];
  h.data ¬ NIL };

-- Finalization

ErrorFinalize: PROC [h: Handle] ~ {
  ERROR };

Finalize: PROC [h: Handle] ~ {
  -- A handle used within cachePulseOut is renewed and returned to the
  -- cache; an older one is buried.
  dH: DataHandle ~ NARROW[h.data];
  IF dH = NIL THEN -- handle is already dead --
    RETURN;
  IF PulsesSince[dH.lastUsed] <= cachePulseOut THEN {
    CrRPCBackdoor.RenewClientObject[h];
    CheckInToCache[h];
    RETURN };
  BuryDeadHandle[h];
  };

-- Returns a client handle for remote (a REF XNS.Address), reusing a cached
-- handle for the same address when there is one.  An unknown socket (or
-- any socket, when alwaysSmashTheSocket is set) is replaced by the
-- well-known Courier socket.
CreateClientHandle: CrRPCBackdoor.CreateClientHandleProc
<<[remote: CrRPC.RefAddress] RETURNS [Handle]>>
~ {
  handle: Handle;
  dH: DataHandle;
  addr: XNS.Address;
  IF remote = NIL THEN
    ERROR CrRPC.Error[NIL, addressInappropriateForClass, "missing remote address"];
  WITH remote SELECT FROM
    rA: REF XNS.Address => addr ¬ rA­;
    ENDCASE => ERROR CrRPC.Error[NIL, addressInappropriateForClass, NIL];
  IF ( alwaysSmashTheSocket ) OR ( addr.socket = XNS.unknownSocket ) THEN
    addr.socket ¬ XNSWKS.courier;
  handle ¬ CheckOutFromCache[addr];
  IF handle = NIL THEN {
    dH ¬ NEW[DataObject ¬ [lastUsed~BasicTime.GetClockPulses[], remote~addr]];
    handle ¬ CrRPCBackdoor.NewClientObject[class~$SPP, procs~theClientProcs, data~dH];
    };
  RETURN [handle];
  };

-- Module start code: register this implementation under the $SPP class.
CrRPCBackdoor.RegisterCreateClientHandleProc[$SPP, CreateClientHandle];
CrRPCBackdoor.RegisterCreateListenerProc[$SPP, CreateListener];
-- Fork CacheSweeper as a detached process; the closing bracket and period end the module.
TRUSTED { Process.Detach[ FORK CacheSweeper[] ] }; }.