-- CrRPCSPPImpl: Courier runtime support for SPP transport.
--
-- This module implements the $SPP transport class for CrRPC: the client side (Call and the
-- client bulk-data hooks), the server side (ListenerProc and the server bulk-data hooks),
-- the bulk data stream marshalling helpers, and a cache of recently used client handles.

DIRECTORY
  Basics USING [Card32FromF, Card16FromH, FFromCard32, FWORD, HFromCard16, HWORD],
  BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses],
  CrRPC USING [BeginErrorProc, BeginRejectProc, BeginReturnProc, BulkDataCheckAbortProc, BulkDataSink, BulkDataSource, BulkDataValueProc, BulkDataXferProc, CallContinuationProc, CallProc, ContinuationProc, Error, ErrorReason, GetBulkDataXferProcProc, GetCard16, GetHWord, Handle, invalidArgument, noSuchProcedure, noSuchProgram, noSuchVersion, Object, ProcsObject, PutBulkDataXferProcProc, PutCard16, ReadBulkDataStreamProc, ServerProc, SetContinuationProc, SetParametersProc, WriteBulkDataSegmentProc],
  CrRPCBackdoor USING [AbortHdr, AbortMsgHdr, abortMsgHdrBytes, abortMsgType, BulkDataDescriptorType, CallHdr, CallMsgHdr, callMsgHdrBytes, callMsgType, courierVersionNum, CreateClientHandleProc, CreateListenerProc, LookUpServerProc, MsgHdr, --msgHdrBytes,-- NewClientObject, RegisterCreateClientHandleProc, RegisterCreateListenerProc, RejectHdr, RejectMsgHdr, rejectMsgHdrBytes, rejectMsgType, RenewClientObject, ReturnHdr, ReturnMsgHdr, returnMsgHdrBytes, returnMsgType, SessionHdr, sessionHdrBytes],
  IO USING [CharsAvail, Close, EndOfStream, Error, GetBlock, PutBlock, STREAM, UnsafeBlock, UnsafeGetBlock, UnsafePutBlock],
  Process USING [Detach, Pause, SecondsToTicks, Ticks],
  RefText USING [ObtainScratch, ReleaseScratch],
  Rope USING [ROPE],
  RuntimeError USING [BoundsFault],
  XNS USING [Address, Socket, unknownSocket],
  XNSSPPTypes USING [bulkDataSST, defaultSST, endReplySST, endSST, SubSequenceType],
  XNSStream USING [ConnectionClosed, Create, CreateListener, FlushInput, GetStatus, Listener, ListenerProc, SendAttention, SendClose, SendCloseReply, SendEndOfMessage, SendNow, SetSSType, State, SubSequenceType, Timeout, waitForever],
  XNSWKS USING [courier];

CrRPCSPPImpl: CEDAR MONITOR
  IMPORTS Basics, BasicTime, CrRPC, CrRPCBackdoor, IO, Process, RefText, RuntimeError, XNSStream
  ~ {

  -- Copied Types and Constants

  FWORD: TYPE ~ Basics.FWORD;
  HWORD: TYPE ~ Basics.HWORD;
  STREAM: TYPE ~ IO.STREAM;

  -- Short names for the Basics conversions between CARD32/CARD16 and FWORD/HWORD.
  FfC: PROC [c: CARD32] RETURNS [FWORD] ~ INLINE { RETURN [Basics.FFromCard32[c]] };
  HfC: PROC [c: CARD16] RETURNS [HWORD] ~ INLINE { RETURN [Basics.HFromCard16[c]] };
  CfF: PROC [f: FWORD] RETURNS [CARD32] ~ INLINE { RETURN [Basics.Card32FromF[f]] };
  CfH: PROC [h: HWORD] RETURNS [CARD16] ~ INLINE { RETURN [Basics.Card16FromH[h]] };

  -- Handle Objects

  Handle: TYPE ~ CrRPC.Handle;

  -- Per-handle transport state, stored in the data field of a CrRPC handle.
  DataHandle: TYPE ~ REF DataObject;
  DataObject: TYPE ~ RECORD [
    next: DataHandle _ NIL,
    handle: Handle _ NIL,
    lastUsed: BasicTime.Pulses,
    stream: IO.STREAM _ NIL,
    inputFlushNeeded: BOOL _ FALSE,
    remote: XNS.Address,
    sendBulkData: CrRPC.BulkDataXferProc _ NIL, -- for client handles
    recvBulkData: CrRPC.BulkDataXferProc _ NIL, -- for client handles
    continuation: CrRPC.ContinuationProc _ NIL, -- for server handles
    continuationClientData: REF -- for server handles
    ];

  -- Operation table for client handles; server-only operations are bound to Error* procs.
  theClientProcs: REF CrRPC.ProcsObject ~ NEW[ CrRPC.ProcsObject _ [
    destroy~CheckInToCache,
    setParameters~SetParameters,
    call~Call,
    getBulkDataSource~ErrorGetBulkDataSource,
    getBulkDataSink~ErrorGetBulkDataSink,
    putBulkDataSource~PutBulkDataSource,
    putBulkDataSink~PutBulkDataSink,
    readBulkDataStream~ReadBulkDataStream,
    writeBulkDataSegment~WriteBulkDataSegment,
    callContinuation~CallContinuation,
    setContinuation~ErrorSetContinuation,
    finalize~Finalize
    ] ];

  -- Operation table for server handles; client-only operations are bound to Error* procs.
  theServerProcs: REF CrRPC.ProcsObject ~ NEW[ CrRPC.ProcsObject _ [
    destroy~CheckInToCache,
    setParameters~SetParameters,
    call~ErrorCall,
    getBulkDataSource~GetBulkDataSource,
    getBulkDataSink~GetBulkDataSink,
    putBulkDataSource~ErrorPutBulkDataSource,
    putBulkDataSink~ErrorPutBulkDataSink,
    readBulkDataStream~ReadBulkDataStream,
    writeBulkDataSegment~WriteBulkDataSegment,
    callContinuation~ErrorCallContinuation,
    setContinuation~SetContinuation,
    finalize~ErrorFinalize
    ] ];

  -- Parameters

  SetParameters: CrRPC.SetParametersProc ~ {
    -- [h: Handle, op: ATOM, argument: REF] RETURNS [hNew: Handle, result: REF]
    -- Class-specific operations:
    --   $GetRemote returns a REF to the XNS.Address of the remote site associated with
    --   this handle.  (Works for either client or server handles).
    -- Any other op raises CrRPC.Error[h, unknownOperation, NIL].
    dH: DataHandle ~ NARROW[h.data];
    hNew _ h;
    SELECT op FROM
      $GetRemote => result _ NEW[XNS.Address _ dH.remote];
      ENDCASE => ERROR CrRPC.Error[h, unknownOperation, NIL];
    };

  -- Client Call

  -- Notes:
  --   Before raising CrRPC.Error[h, ...] require that the stream NARROW[h.data].stream be in
  --   a reasonable state (possibly NIL).
  --   The ONLY (expected) error that may be passed to clients is CrRPC.Error; stream
  --   class-specific errors MUST be caught here.
  --   We catch RuntimeError.BoundsFault on read operations, because that could be raised by
  --   any getResults or getBulkData proc, and catching it here enables us to fix up the state
  --   of the transport stream.

  ErrorCall: CrRPC.CallProc ~ {
    ERROR CrRPC.Error[h, notClientHandle, "call using server handle"] };

  Call: CrRPC.CallProc ~ {
    -- [h: Handle, remotePgm: CARD32, remotePgmVersion: CARD16, remoteProc: CARD16,
    --  putArgs: PutArgsProc, getResults: GetResultsProc, getError: GetErrorProc]
    -- Sends a call message (connecting first if the handle has no stream, or if the cached
    -- stream turns out to be unusable), performs at most one bulk data transfer requested by
    -- putArgs, then reads the reply message.

    dH: DataHandle ~ NARROW[h.data];

    ConnectToServer: PROC ~ {
      -- Connect to the remote server.  If the connect fails, dH.stream will be left NIL
      -- (the enclosing ENABLE in Call smashes the stream on stream errors).
      -- The version-number pair is deliberately set to include more than one version so the
      -- server will be forced to return his version number pair immediately.
      -- ERRORS: XNSStream.ConnectionClosed, CrRPC.Error, IO.Error.
      sessionHdrOut: CrRPCBackdoor.SessionHdr _ [
        lowVersion~HfC[CrRPCBackdoor.courierVersionNum - 1],
        highVersion~HfC[CrRPCBackdoor.courierVersionNum]];
      sessionHdrIn: CrRPCBackdoor.SessionHdr;
      nRead: NAT;
      IF dH.stream # NIL THEN ERROR;
      dH.stream _ XNSStream.Create[remote~dH.remote, getTimeout~XNSStream.waitForever, putTimeout~XNSStream.waitForever];
      TRUSTED { IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@sessionHdrOut]], startIndex~0, count~CrRPCBackdoor.sessionHdrBytes]] };
      XNSStream.SendNow[dH.stream];
      TRUSTED { nRead _ IO.UnsafeGetBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@sessionHdrIn]], startIndex~0, count~CrRPCBackdoor.sessionHdrBytes]] };
      dH.inputFlushNeeded _ FALSE; -- ???? Is there EOM on version number pair ????
      -- Reject a short read or a version range that does not include our version.
      IF (nRead # CrRPCBackdoor.sessionHdrBytes)
        OR (CfH[sessionHdrIn.lowVersion] > CrRPCBackdoor.courierVersionNum)
        OR (CfH[sessionHdrIn.highVersion] < CrRPCBackdoor.courierVersionNum) THEN {
        SendCloseStream[dH];
        ERROR CrRPC.Error[h, courierVersionMismatch, "Courier version error"];
        };
      };

    SendCallMsg: PROC ~ {
      -- Send the call message: header, then the client's marshalled arguments, then EOM.
      -- ERRORS: XNSStream.ConnectionClosed, IO.Error
      callMsgHdr: CrRPCBackdoor.CallMsgHdr _ [
        msgHdr~[msgType~CrRPCBackdoor.callMsgType],
        callHdr~[
          tID~[0, 0],
          pgmNum~FfC[remotePgm],
          pgmVersion~HfC[remotePgmVersion],
          procNum~HfC[remoteProc]
          ]
        ];
      TRUSTED { IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@callMsgHdr]], startIndex~0, count~CrRPCBackdoor.callMsgHdrBytes]] };
      IF putArgs # NIL THEN putArgs[h, dH.stream];
      XNSStream.SendEndOfMessage[dH.stream];
      };

    ReadResultMsg: PROC ~ {
      -- Read the reply and dispatch on its message type (reject, return, or abort).
      -- In each arm one HWORD following the message type is read and discarded.
      -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed, IO.Error, IO.EndOfStream
      msgType: HWORD _ CrRPC.GetHWord[dH.stream];
      SELECT msgType FROM
        CrRPCBackdoor.rejectMsgType => {
          rejectReason: CARDINAL;
          [] _ CrRPC.GetHWord[dH.stream];
          rejectReason _ CrRPC.GetCard16[dH.stream];
          [] _ AdvanceInput[h~h, wait~FALSE];
          ERROR CrRPC.Error[h,
            (SELECT rejectReason FROM
              CrRPC.noSuchProgram => rejectedNoSuchProgram,
              CrRPC.noSuchVersion => rejectedNoSuchVersion,
              CrRPC.noSuchProcedure => rejectedNoSuchProcedure,
              CrRPC.invalidArgument => rejectedInvalidArgument,
              ENDCASE => rejectedUnspecified),
            "rejected"];
          };
        CrRPCBackdoor.returnMsgType => {
          [] _ CrRPC.GetHWord[dH.stream];
          IF getResults # NIL THEN getResults[h, dH.stream];
          [] _ AdvanceInput[h~h, wait~FALSE];
          };
        CrRPCBackdoor.abortMsgType => {
          errNum: CARDINAL;
          [] _ CrRPC.GetHWord[dH.stream];
          errNum _ CrRPC.GetCard16[dH.stream];
          IF getError # NIL THEN getError[h, dH.stream, errNum];
          [] _ AdvanceInput[h~h, wait~FALSE];
          -- Reached only if getError is NIL or returns normally.
          ERROR CrRPC.Error[h, remoteError, "unexpected remote error"] };
        ENDCASE => {
          SendCloseStream[dH];
          ERROR CrRPC.Error[h, protocolError, "bad msgType in response"] };
      };

    ClientRdySysRecv: PROC ~ {
      -- Prepare to receive a system message.  Leave dH.inputFlushNeeded = TRUE.
      -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed (from AdvanceInput, GetStatus)
      -- ???? SAME AS ServerRdySysRecv! ????  (this version also smashes the stream on bad sst)
      state: XNSStream.State;
      sst: XNSStream.SubSequenceType;
      DO
        [state, sst] _ AdvanceInput[h~h, wait~TRUE];
        IF state = open THEN EXIT;
        [] _ XNSStream.GetStatus[self~dH.stream, reset~TRUE];
        ENDLOOP;
      IF sst # 0 THEN {
        SmashStream[dH];
        ERROR CrRPC.Error[h, protocolError, "sys sst # 0"] };
      dH.inputFlushNeeded _ TRUE;
      };

    ClientRdyBDTSend: PROC RETURNS [ok: BOOL] ~ {
      -- Prepare to send bulk data: (try to) check whether the other side has sent us a
      -- reject / abort message, closed the stream, etc.
      -- In the normal case nothing arrives on the stream.
      -- If data arrives, it's a reject message from the other end, and the bulk data transfer
      -- should not be attempted.
      -- If an attention arrives, it's an abort request from the other end, so we let the bulk
      -- data transfer proceed (only to be aborted immediately).
      -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed (from AdvanceInput, CharsAvail)
      state: XNSStream.State;
      sst: XNSStream.SubSequenceType;
      [state, sst] _ AdvanceInput[h~h, wait~FALSE];
      SELECT state FROM
        open => { ok _ (IO.CharsAvail[self~dH.stream, wait~FALSE] = 0) };
        attention => { ok _ TRUE };
        ENDCASE => ERROR;
      };

    ClientRdyBDTRecv: PROC RETURNS [ok: BOOL] ~ {
      -- Prepare to receive bulk data: (try to) check whether the other side has sent us a
      -- reject / abort message, closed the stream, etc.
      -- In the normal case bulk data arrives on the stream.
      -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed (from AdvanceInput)
      state: XNSStream.State;
      sst: XNSStream.SubSequenceType;
      [state, sst] _ AdvanceInput[h~h, wait~TRUE];
      SELECT state FROM
        open, endOfMessage => { ok _ (sst # 0); dH.inputFlushNeeded _ ok };
        attention => { ok _ TRUE };
        ENDCASE => ERROR;
      };

    CheckAbortRecv: CrRPC.BulkDataCheckAbortProc ~ {
      -- While receiving: waits for input, then reports whether the stream is in attention state.
      [] _ IO.CharsAvail[self~dH.stream, wait~TRUE];
      RETURN [ XNSStream.GetStatus[self~dH.stream, reset~FALSE].state = attention ] };

    CheckAbortSend: CrRPC.BulkDataCheckAbortProc ~ {
      -- While sending: any non-open state, or any input at all from the receiver, means abort.
      RETURN[ (XNSStream.GetStatus[self~dH.stream, reset~FALSE].state # open)
        OR (IO.CharsAvail[self~dH.stream, wait~FALSE] > 0) ] };

    -- Logic of Courier call begins here:
    BEGIN
    dH.sendBulkData _ NIL;
    dH.recvBulkData _ NIL;

    BEGIN
      IF dH.stream # NIL THEN {
        -- Try the existing stream first; on any failure discard it and fall through to
        -- the reconnect block below.
        ENABLE XNSStream.ConnectionClosed, CrRPC.Error, IO.Error => {
          SmashStream[dH]; CONTINUE };
        [] _ AdvanceInput[h~h, wait~FALSE]; -- check for close
        SendCallMsg[];
        GOTO Done };
      { -- dH.stream = NIL --
        ENABLE XNSStream.ConnectionClosed, IO.Error => {
          SmashStream[dH];
          ERROR CrRPC.Error[h, communicationFailure, "communication failure"] };
        ConnectToServer[];
        SendCallMsg[];
        GOTO Done };
      EXITS
        Done => NULL;
      END;

    -- At this point we have an open connection to the server and a call message has been
    -- sent successfully.
    BEGIN
      ENABLE {
        XNSStream.ConnectionClosed, IO.Error => {
          SmashStream[dH];
          ERROR CrRPC.Error[h, communicationFailure, "communication failure"] };
        };
      SELECT TRUE FROM
        (dH.sendBulkData # NIL) => {
          -- Bulk data transfer from client to server.
          IF ClientRdyBDTSend[].ok THEN {
            XNSStream.SetSSType[dH.stream, XNSSPPTypes.bulkDataSST];
            IF dH.sendBulkData[h, dH.stream, CheckAbortSend].abort
              THEN XNSStream.SendAttention[dH.stream, 1] -- ???? Define 1 ????
              ELSE XNSStream.SendEndOfMessage[dH.stream];
            XNSStream.SetSSType[dH.stream, XNSSPPTypes.defaultSST];
            };
          };
        (dH.recvBulkData # NIL) => {
          -- Bulk data transfer from server to client.
          ENABLE {
            IO.EndOfStream => {
              SendCloseStream[dH];
              ERROR CrRPC.Error[h, bulkDataError, "end of stream"] };
            RuntimeError.BoundsFault => {
              SendCloseStream[dH];
              ERROR CrRPC.Error[h, bulkDataError, "bulk data value bounds fault"] };
            };
          IF ClientRdyBDTRecv[].ok THEN {
            -- There's bulk data (or an attention) in the stream.
            IF dH.recvBulkData[h, dH.stream, CheckAbortRecv].abort THEN XNSStream.SendAttention[dH.stream, 1]; -- ???? Define 1 ????
            };
          };
        ENDCASE => {
          -- No bulk data transfer in either direction.
          NULL;
          };
      {
        ENABLE {
          IO.EndOfStream => {
            SendCloseStream[dH];
            ERROR CrRPC.Error[h, resultsError, "end of stream"] };
          RuntimeError.BoundsFault => {
            SendCloseStream[dH];
            ERROR CrRPC.Error[h, resultsError, "result value bounds fault"] };
          };
        ClientRdySysRecv[];
        ReadResultMsg[];
        };
      END;
    END;
    }; -- of Call

  -- Server Process

  -- Notes:
  --   Any ERROR (CrRPC.Error, XNSStream.ConnectionClosed, IO.Error, IO.EndOfStream,
  --   RuntimeError.BoundsFault) eventually causes the transport to be discarded without
  --   attempting to serve any more calls on it.

  ServerRdySysRecv: PROC [h: Handle] ~ {
    -- Prepare to receive a system message.  Leave dH.inputFlushNeeded = TRUE.
    -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed
    dH: DataHandle ~ NARROW[h.data];
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    DO
      [state, sst] _ AdvanceInput[h~h, wait~TRUE];
      IF state = open THEN EXIT;
      [] _ XNSStream.GetStatus[self~dH.stream, reset~TRUE];
      ENDLOOP;
    IF sst # 0 THEN ERROR CrRPC.Error[h, protocolError, "sys sst # 0"];
    dH.inputFlushNeeded _ TRUE;
    };

  ServerRdyBDTRecv: PROC [h: Handle] ~ {
    -- Prepare to receive bulk data: (try to) check whether the other side has sent us
    -- anything besides bulk data, closed the stream, etc.
    -- In the normal case bulk data arrives on the stream.
    -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed
    dH: DataHandle ~ NARROW[h.data];
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    [state, sst] _ AdvanceInput[h~h, wait~TRUE];
    SELECT state FROM
      open, endOfMessage => {
        IF sst = 0 THEN ERROR CrRPC.Error[h, protocolError, "missing bulk data"];
        dH.inputFlushNeeded _ TRUE };
      attention => { NULL };
      ENDCASE => ERROR;
    };

  ServerRdyBDTSend: PROC [h: Handle] ~ {
    -- Prepare to send bulk data: (try to) check whether the other side has closed the
    -- stream or aborted the transfer.
    -- In the normal case nothing arrives on the stream.
    -- If an attention arrives, it's an abort request from the other end, so we let the bulk
    -- data transfer proceed (only to be aborted immediately).
    -- ERRORS: CrRPC.Error, XNSStream.ConnectionClosed
    dH: DataHandle ~ NARROW[h.data];
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    [state, sst] _ AdvanceInput[h~h, wait~FALSE];
    SELECT state FROM
      open => {
        IF IO.CharsAvail[self~dH.stream, wait~FALSE] # 0 THEN
          ERROR CrRPC.Error[h, protocolError, "client send to bdt source"];
        };
      attention => { NULL };
      ENDCASE => ERROR;
    };

  BeginReturn: CrRPC.BeginReturnProc ~ {
    -- [h: Handle]
    -- Writes the return message header on the handle's stream.
    -- ERRORS: XNSStream.ConnectionClosed, IO.Error
    dH: DataHandle ~ NARROW[h.data];
    returnMsgHdr: CrRPCBackdoor.ReturnMsgHdr _ [
      msgHdr~[msgType~CrRPCBackdoor.returnMsgType],
      returnHdr~[tID~[hi~0, lo~0]]
      ];
    TRUSTED { IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@returnMsgHdr]], startIndex~0, count~CrRPCBackdoor.returnMsgHdrBytes]] };
    };

  BeginError: CrRPC.BeginErrorProc ~{
    -- [h: Handle, errNum: CARDINAL]
    -- Writes the abort message header (carrying errNum) on the handle's stream.
    -- ERRORS: XNSStream.ConnectionClosed, IO.Error
    dH: DataHandle ~ NARROW[h.data];
    abortMsgHdr: CrRPCBackdoor.AbortMsgHdr _ [
      msgHdr~[msgType~CrRPCBackdoor.abortMsgType],
      abortHdr~[tID~[0,0], errNum~HfC[errNum]]
      ];
    TRUSTED { IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@abortMsgHdr]], startIndex~0, count~CrRPCBackdoor.abortMsgHdrBytes]] };
    };

  BeginReject: CrRPC.BeginRejectProc ~ {
    -- [h: Handle, rejectReason: CARDINAL]
    -- Writes the reject message header (carrying rejectReason) on the handle's stream.
    -- ERRORS: XNSStream.ConnectionClosed, IO.Error
    dH: DataHandle ~ NARROW[h.data];
    rejectMsgHdr: CrRPCBackdoor.RejectMsgHdr _ [
      msgHdr~[msgType~CrRPCBackdoor.rejectMsgType],
      rejectHdr~[tID~[0,0], rejectReason~HfC[rejectReason]]
      ];
    TRUSTED { IO.UnsafePutBlock[self~dH.stream, block~[base~LOOPHOLE[LONG[@rejectMsgHdr]], startIndex~0, count~CrRPCBackdoor.rejectMsgHdrBytes]] };
    };

  ListenerProc: XNSStream.ListenerProc ~ {
    -- [stream: IO.STREAM, remote: XNS.Address]
    -- Worker for one incoming connection: exchanges Courier version numbers, creates a
    -- server handle, then serves calls in a loop until an error or close ends the connection.
    sessionHdr: CrRPCBackdoor.SessionHdr;
    callMsgHdr: CrRPCBackdoor.CallMsgHdr;
    desiredPgm: CARD32;
    desiredPgmVersion: CARD16;
    desiredProc: CARD16;
    serverProc: CrRPC.ServerProc;
    nBytes: INT;
    h: Handle;
    dH: DataHandle;

    BEGIN
      ENABLE {
        XNSStream.ConnectionClosed => GOTO Out;
        CrRPC.Error, IO.Error, XNSStream.Timeout, RuntimeError.BoundsFault => GOTO Close;
        };

      -- Exchange version numbers
      TRUSTED { nBytes _ IO.UnsafeGetBlock[stream, [base~LOOPHOLE[LONG[@sessionHdr]], count~CrRPCBackdoor.sessionHdrBytes]]};
      IF nBytes # CrRPCBackdoor.sessionHdrBytes THEN GOTO Close;
      IF (CfH[sessionHdr.lowVersion] > CrRPCBackdoor.courierVersionNum)
        OR (CfH[sessionHdr.highVersion] < CrRPCBackdoor.courierVersionNum) THEN {
        GOTO Close };
      sessionHdr.lowVersion _ sessionHdr.highVersion _ HfC[CrRPCBackdoor.courierVersionNum];
      TRUSTED { IO.UnsafePutBlock[stream, [base~LOOPHOLE[LONG[@sessionHdr]], count~CrRPCBackdoor.sessionHdrBytes]] };
      XNSStream.SendNow[stream];

      -- Create server handle
      dH _ NEW[DataObject _ [lastUsed~, stream~stream, remote~remote]];
      h _ NEW[CrRPC.Object _ [class~$SPP, kind~server, procs~theServerProcs, data~dH, clientData~NIL]];

      -- Process calls until timeout or unrecoverable error
      DO
        -- Get message; make sure it's a call, get desired program, version and proc number
        ServerRdySysRecv[h];
        TRUSTED { nBytes _ IO.UnsafeGetBlock[stream, [base~LOOPHOLE[LONG[@callMsgHdr]], startIndex~0, count~CrRPCBackdoor.callMsgHdrBytes]]};
        IF (nBytes # CrRPCBackdoor.callMsgHdrBytes)
          OR (callMsgHdr.msgHdr.msgType # CrRPCBackdoor.callMsgType) THEN {
          GOTO Close };
        desiredPgm _ CfF[callMsgHdr.callHdr.pgmNum];
        desiredPgmVersion _ CfH[callMsgHdr.callHdr.pgmVersion];
        desiredProc _ CfH[callMsgHdr.callHdr.procNum];
        serverProc _ CrRPCBackdoor.LookUpServerProc[desiredPgm, desiredPgmVersion];
        SELECT serverProc FROM
          NIL => -- Don't have server, reject the call -- {
            rejectMsgHdr: CrRPCBackdoor.RejectMsgHdr _ [
              msgHdr~[msgType~CrRPCBackdoor.rejectMsgType],
              rejectHdr~[tID~[0,0], rejectReason~HfC[CrRPC.noSuchProgram]]
              ];
            TRUSTED { IO.UnsafePutBlock[stream, [base~LOOPHOLE[LONG[@rejectMsgHdr]], startIndex~0, count~CrRPCBackdoor.rejectMsgHdrBytes]] };
            XNSStream.SendEndOfMessage[stream] };
          ENDCASE => -- Serve the call -- {
            serverProc[h, stream, desiredPgm, desiredPgmVersion, desiredProc, BeginReturn, BeginError, BeginReject];
            IF dH.stream = NIL THEN GOTO Smashed; -- client caught error
            XNSStream.SendEndOfMessage[stream];
            -- NOTE(review): dH.continuation is not cleared here, so once set it is invoked
            -- after every later call on this connection unless SetContinuation changes it.
            -- Confirm that this is intended.
            IF dH.continuation # NIL THEN {
              newStream: IO.STREAM _ dH.continuation[dH.stream, dH.continuationClientData];
              IF newStream = NIL THEN GOTO Closed;
              IF newStream # dH.stream THEN GOTO Smashed;
              };
            };
        ENDLOOP;
      END;

    EXITS
      -- NOTE(review): Close is also reached after AdvanceInput has already closed the stream
      -- via ReplyCloseStream; presumably SendClose / IO.Close tolerate that.  TODO confirm.
      Close => {
        [] _ XNSStream.SendClose[stream ! XNSStream.ConnectionClosed => CONTINUE];
        IO.Close[self~stream, abort~TRUE];
        };
      Out => {
        IO.Close[self~stream, abort~TRUE];
        };
      Closed, Smashed => NULL;
    };

  -- List of the listeners created so far, one per socket; protected by the monitor lock.
  ListenerValue: TYPE ~ REF ListenerValueObject;
  ListenerValueObject: TYPE ~ RECORD [
    next: ListenerValue,
    socket: XNS.Socket,
    listener: XNSStream.Listener
    ];

  listeners: ListenerValue _ NIL;

  CreateListener: ENTRY CrRPCBackdoor.CreateListenerProc ~ {
    -- [local: CrRPC.RefAddress]
    -- Creates an SPP listener on the socket named by local (a REF XNS.Address or a
    -- REF XNS.Socket), defaulting to the Courier well-known socket.  Does nothing if a
    -- listener for that socket is already on the listeners list.
    -- Raises CrRPC.Error[NIL, addressInappropriateForClass, NIL] for any other REF type.

    -- This is an ENTRY procedure that can raise (the ERROR below, or anything
    -- XNSStream.CreateListener raises).  Without this enable the monitor lock would stay
    -- held when the error propagates, blocking every later entry call in this module.
    ENABLE UNWIND => NULL;
    newListener: XNSStream.Listener;
    socket: XNS.Socket _ XNS.unknownSocket;
    IF local # NIL THEN WITH local SELECT FROM
      rA: REF XNS.Address => socket _ rA.socket;
      rS: REF XNS.Socket => socket _ rS^;
      ENDCASE => ERROR CrRPC.Error[NIL, addressInappropriateForClass, NIL];
    IF socket = XNS.unknownSocket THEN socket _ XNSWKS.courier;
    FOR p: ListenerValue _ listeners, p.next WHILE p # NIL DO
      IF p.socket = socket THEN RETURN;
      ENDLOOP;
    newListener _ XNSStream.CreateListener[socket~socket, worker~ListenerProc, getTimeout~XNSStream.waitForever, putTimeout~XNSStream.waitForever];
    listeners _ NEW[ListenerValueObject _ [next~listeners, socket~socket, listener~newListener]];
    };

  -- I/O and Marshalling

  AdvanceInput: PROC [h: Handle, wait: BOOL _ FALSE] RETURNS [state: XNSStream.State, ssType: XNSStream.SubSequenceType] ~ {
    -- Advance input stream to next significant piece of input, checking for remote close and
    -- flushing unread input if needed.
    -- With wait = FALSE an open stream is reported immediately; with wait = TRUE the proc
    -- waits until characters are available.  On a remote close (endSST / endReplySST) the
    -- stream is closed via ReplyCloseStream and CrRPC.Error[h, protocolError, ...] is raised.
    -- ERRORS: XNSStream.ConnectionClosed, CrRPC.Error
    dH: DataHandle ~ NARROW[h.data];
    sawBreak: BOOL _ FALSE;
    IF dH.inputFlushNeeded THEN {
      [] _ XNSStream.FlushInput[self~dH.stream, wait~TRUE];
      dH.inputFlushNeeded _ FALSE };
    DO
      [state~state, ssType~ssType] _ XNSStream.GetStatus[self~dH.stream, reset~FALSE];
      IF (ssType = XNSSPPTypes.endSST) OR (ssType = XNSSPPTypes.endReplySST) THEN {
        ReplyCloseStream[dH];
        ERROR CrRPC.Error[h, protocolError, "remote close"] };
      SELECT state FROM
        open => {
          IF NOT wait THEN RETURN;
          IF IO.CharsAvail[self~dH.stream, wait~TRUE] > 0 THEN RETURN;
          LOOP };
        attention => RETURN;
        endOfMessage => {
          -- The first break is consumed (status reset below); a second one is reported.
          IF sawBreak THEN RETURN;
          sawBreak _ TRUE };
        ssTypeChange => {
          sawBreak _ TRUE };
        ENDCASE => ERROR;
      [] _ XNSStream.GetStatus[self~dH.stream, reset~TRUE];
      ENDLOOP;
    };

  SmashStream: PROC [dH: DataHandle] ~ {
    -- Abort-close the stream (no close handshake) and forget it.  No-op if already NIL.
    -- ERRORS: none.
    IF dH.stream # NIL THEN {
      IO.Close[self~dH.stream, abort~TRUE];
      dH.stream _ NIL };
    };

  SendCloseStream: PROC [dH: DataHandle] ~ {
    -- Send a close to the other side, then close and forget the stream.  No-op if already NIL.
    -- ERRORS: none.
    IF dH.stream # NIL THEN {
      [] _ XNSStream.SendClose[dH.stream ! XNSStream.ConnectionClosed => CONTINUE];
      IO.Close[self~dH.stream, abort~TRUE];
      dH.stream _ NIL };
    };

  ReplyCloseStream: PROC [dH: DataHandle] ~ {
    -- Reply to a close from the other side, then close and forget the stream.
    -- No-op if already NIL.
    -- ERRORS: none.
    IF dH.stream # NIL THEN {
      [] _ XNSStream.SendCloseReply[dH.stream ! XNSStream.ConnectionClosed, XNSStream.Timeout => CONTINUE];
      IO.Close[self~dH.stream, abort~TRUE];
      dH.stream _ NIL };
    };

  ErrorPutBulkDataSource: CrRPC.PutBulkDataXferProcProc ~ {
    -- [h: Handle, s: STREAM, proc: BulkDataXferProc]
    ERROR CrRPC.Error[h, notClientHandle, "putBDTSource on server handle"] };

  PutBulkDataSource: CrRPC.PutBulkDataXferProcProc ~ {
    -- [h: Handle, s: STREAM, proc: BulkDataXferProc]
    -- Marshals a bulk data descriptor (null or immediate) onto s; for a non-NIL proc,
    -- records it in dH.sendBulkData so Call runs it after the call message is sent.
    dH: DataHandle ~ NARROW[h.data];
    IF proc = NIL
      THEN {
        CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.null]] }
      ELSE {
        CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate]];
        TRUSTED { dH.sendBulkData _ proc }};
    };

  ErrorPutBulkDataSink: CrRPC.PutBulkDataXferProcProc ~ {
    -- [h: Handle, s: STREAM, proc: BulkDataXferProc]
    ERROR CrRPC.Error[h, notClientHandle, "putBDTSink on server handle"] };

  PutBulkDataSink: CrRPC.PutBulkDataXferProcProc ~ {
    -- [h: Handle, s: STREAM, proc: BulkDataXferProc]
    -- Marshals a bulk data descriptor (null or immediate) onto s; for a non-NIL proc,
    -- records it in dH.recvBulkData so Call runs it after the call message is sent.
    dH: DataHandle ~ NARROW[h.data];
    IF proc = NIL
      THEN {
        CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.null]] }
      ELSE {
        CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate]];
        TRUSTED { dH.recvBulkData _ proc }};
    };

  ErrorGetBulkDataSource: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSource: CrRPC.BulkDataSource] ~ {
    ERROR CrRPC.Error[h, notServerHandle, "GetBDTSource on client handle"] };

  GetBulkDataSource: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSource: CrRPC.BulkDataSource] ~ {
    -- Unmarshals a bulk data descriptor from s.  Returns NIL for a null descriptor and
    -- ServerRecvBulkData for an immediate one; third-party (active / passive) descriptors
    -- raise CrRPC.Error[h, notImplemented, ...], anything else protocolError.
    dH: DataHandle ~ NARROW[h.data];
    typeCode: CARDINAL;
    typeCode _ CrRPC.GetCard16[s];
    SELECT typeCode FROM
      ORD[CrRPCBackdoor.BulkDataDescriptorType.null] => {
        bulkDataSource _ NIL };
      ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate] => TRUSTED {
        bulkDataSource _ ServerRecvBulkData };
      ORD[CrRPCBackdoor.BulkDataDescriptorType.active], ORD[CrRPCBackdoor.BulkDataDescriptorType.passive] => {
        ERROR CrRPC.Error[h, notImplemented, "no third party bulk data"] };
      ENDCASE => {
        ERROR CrRPC.Error[h, protocolError, "bad bulkDataSource type"] };
    };

  ServerRecvBulkData: CrRPC.BulkDataXferProc ~ {
    -- [h: Handle, s: IO.STREAM, checkAbort: BulkDataCheckAbortProc] RETURNS [abort: BOOL]
    -- Copies bulk data arriving on the transport stream (dH.stream) to s, block by block,
    -- until end of message, an attention from the sender (abort _ TRUE), or a local abort
    -- reported by checkAbort (an attention is then sent to the other side).
    dH: DataHandle ~ NARROW[h.data];
    bufSize: NAT ~ 1024;
    b: REF TEXT ~ RefText.ObtainScratch[bufSize];
    nBytes: INT;
    state: XNSStream.State;
    ssType: XNSSPPTypes.SubSequenceType;

    abort _ FALSE;
    {
      -- Give the scratch buffer back even when an ERROR propagates out of the transfer.
      ENABLE UNWIND => RefText.ReleaseScratch[b];
      ServerRdyBDTRecv[h];
      DO
        SELECT nBytes _ IO.GetBlock[self~dH.stream, block~b] FROM
          0 => {
            -- Nothing read: find out why, resetting the stream status.
            [state~state, ssType~ssType] _ XNSStream.GetStatus[dH.stream, TRUE];
            SELECT state FROM
              attention => { abort _ TRUE; EXIT };
              endOfMessage => { dH.inputFlushNeeded _ FALSE; EXIT };
              ssTypeChange => SELECT ssType FROM
                0 => ERROR CrRPC.Error[h, protocolError, "bdt no eom"];
                XNSSPPTypes.endSST, XNSSPPTypes.endReplySST =>
                  ERROR CrRPC.Error[h, remoteClose, "remote close"];
                ENDCASE => NULL; -- ???? is this an error ????
              open => NULL;
              ENDCASE => ERROR;
            };
          ENDCASE => {
            IO.PutBlock[self~s, block~b, count~nBytes];
            };
        IF checkAbort[h].abort THEN {
          XNSStream.SendAttention[dH.stream, 1]; -- Define 1 ????
          EXIT };
        ENDLOOP;
      };
    RefText.ReleaseScratch[b];
    };

  ErrorGetBulkDataSink: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSink: CrRPC.BulkDataSink] ~ {
    ERROR CrRPC.Error[h, notServerHandle, "GetBDTSink on client handle"] };

  GetBulkDataSink: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSink: CrRPC.BulkDataSink] ~ {
    -- Unmarshals a bulk data descriptor from s.  Returns NIL for a null descriptor and
    -- ServerSendBulkData for an immediate one; third-party (active / passive) descriptors
    -- raise CrRPC.Error[h, notImplemented, ...], anything else protocolError.
    dH: DataHandle ~ NARROW[h.data];
    typeCode: CARDINAL;
    typeCode _ CrRPC.GetCard16[s];
    SELECT typeCode FROM
      ORD[CrRPCBackdoor.BulkDataDescriptorType.null] => {
        bulkDataSink _ NIL };
      ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate] => TRUSTED {
        bulkDataSink _ ServerSendBulkData };
      ORD[CrRPCBackdoor.BulkDataDescriptorType.active], ORD[CrRPCBackdoor.BulkDataDescriptorType.passive] => {
        ERROR CrRPC.Error[h, notImplemented, "no third party bulk data"] };
      ENDCASE => {
        ERROR CrRPC.Error[h, protocolError, "bad bulkDataSink type"] };
    };

  ServerSendBulkData: CrRPC.BulkDataXferProc ~ {
    -- [h: Handle, s: IO.STREAM, checkAbort: BulkDataCheckAbortProc] RETURNS [abort: BOOL]
    -- Copies bulk data from s to the transport stream (dH.stream) under the bulk data
    -- subsequence type, finishing with end-of-message, or with an attention if checkAbort
    -- reports a local abort.  The default subsequence type is restored afterwards.
    dH: DataHandle ~ NARROW[h.data];
    bufSize: NAT ~ 1024;
    b: REF TEXT ~ RefText.ObtainScratch[bufSize];
    nBytes: INT;
    state: XNSStream.State;
    ssType: XNSSPPTypes.SubSequenceType;

    abort _ FALSE;
    {
      -- Give the scratch buffer back even when an ERROR propagates out of the transfer
      -- (including one raised by the EXITS actions of the inner block).
      ENABLE UNWIND => RefText.ReleaseScratch[b];
      ServerRdyBDTSend[h];
      XNSStream.SetSSType[self~dH.stream, ssType~XNSSPPTypes.bulkDataSST];
      {
        DO
          -- Check for abort from other side
          [state~state, ssType~ssType] _ XNSStream.GetStatus[dH.stream, FALSE];
          SELECT state FROM
            open => NULL;
            attention, endOfMessage, ssTypeChange => {
              abort _ TRUE;
              GOTO FinishWithEOM };
            ENDCASE => ERROR;
          -- Check for abort from this side
          IF checkAbort[h].abort THEN {
            GOTO FinishWithAttention };
          -- Copy a block of data
          SELECT nBytes _ IO.GetBlock[self~s, block~b] FROM
            0 => {
              GOTO FinishWithEOM };
            ENDCASE => {
              IO.PutBlock[self~dH.stream, block~b, count~nBytes];
              };
          ENDLOOP;
        EXITS
          FinishWithAttention =>
            XNSStream.SendAttention[dH.stream, 1]; -- Define 1 ????
          FinishWithEOM =>
            XNSStream.SendEndOfMessage[dH.stream];
        };
      };
    RefText.ReleaseScratch[b];
    XNSStream.SetSSType[self~dH.stream, ssType~XNSSPPTypes.defaultSST];
    };

  -- Bulk Data Streams

  -- Segment kind codes written ahead of each segment's element count.
  nextSegment: CARD16 ~ 0;
  lastSegment: CARD16 ~ 1;

  ReadBulkDataStream: CrRPC.ReadBulkDataStreamProc ~ {
    -- [h: Handle, s: STREAM, checkAbort: BulkDataCheckAbortProc, readValue: BulkDataValueProc]
    --   RETURNS [abort: BOOL]
    -- Reads segments (kind, length, then length values via readValue) until the last segment.
    -- Returns TRUE only if readValue asked to abort; returns FALSE both on normal completion
    -- and when checkAbort reports an abort.
    ENABLE {
      IO.EndOfStream, IO.Error, XNSStream.ConnectionClosed =>
        ERROR CrRPC.Error[NIL, communicationFailure, "communication failure"];
      RuntimeError.BoundsFault =>
        ERROR CrRPC.Error[NIL, bulkDataError, "bulk data value bounds fault"];
      };
    segmentKind: CARD16;
    len: CARD16;
    DO
      IF checkAbort[h] THEN RETURN[FALSE];
      segmentKind _ CrRPC.GetCard16[s];
      len _ CrRPC.GetCard16[s];
      THROUGH [1..len] DO
        IF checkAbort[h] THEN RETURN[FALSE];
        IF readValue[s].abort THEN RETURN[TRUE];
        ENDLOOP;
      IF segmentKind = lastSegment THEN RETURN[FALSE];
      ENDLOOP;
    };

  WriteBulkDataSegment: CrRPC.WriteBulkDataSegmentProc ~ {
    -- [h: Handle, s: STREAM, checkAbort: BulkDataCheckAbortProc, writeValue: BulkDataValueProc,
    --  n: CARDINAL] RETURNS [abort: BOOL, heAborted: BOOL]
    -- Writes one segment header (kind, n) followed by n values via writeValue.
    -- A segment with n = 0 is written as the last segment.
    ENABLE IO.EndOfStream, IO.Error, XNSStream.ConnectionClosed =>
      ERROR CrRPC.Error[NIL, communicationFailure, "communication failure"];
    CrRPC.PutCard16[s, (IF n > 0 THEN nextSegment ELSE lastSegment)];
    CrRPC.PutCard16[s, n];
    abort _ heAborted _ FALSE;
    THROUGH [1..n] DO
      IF checkAbort[h] THEN { heAborted _ TRUE; RETURN };
      IF writeValue[s].abort THEN { abort _ TRUE; RETURN };
      ENDLOOP;
    };

  -- Continuations

  ErrorCallContinuation: CrRPC.CallContinuationProc ~ {
    ERROR CrRPC.Error[h, notClientHandle, "CallContinuation on server handle"] };

  CallContinuation: CrRPC.CallContinuationProc ~ {
    -- [h: Handle, proc: ContinuationProc, clientData: REF]
    -- Runs proc on the handle's stream and stores the stream it returns in dH.stream.
    -- Returning a different non-NIL stream is not implemented.
    dH: DataHandle ~ NARROW[h.data];
    newStream: IO.STREAM;
    {
      ENABLE XNSStream.ConnectionClosed, XNSStream.Timeout, IO.EndOfStream, IO.Error =>
        ERROR CrRPC.Error[NIL, communicationFailure, NIL];
      newStream _ (IF proc # NIL THEN proc[dH.stream, clientData] ELSE dH.stream);
      };
    IF (newStream # NIL) AND (newStream # dH.stream) THEN ERROR CrRPC.Error[NIL, notImplemented, NIL];
    dH.stream _ newStream;
    };

  ErrorSetContinuation: CrRPC.SetContinuationProc ~ {
    ERROR CrRPC.Error[h, notServerHandle, "SetContinuation on client handle"] };

  SetContinuation: CrRPC.SetContinuationProc ~ {
    -- [h: Handle, proc: ContinuationProc, clientData: REF]
    -- Records the continuation that ListenerProc invokes after the reply has been sent.
    dH: DataHandle ~ NARROW[h.data];
    dH.continuation _ proc;
    dH.continuationClientData _ clientData };

  -- Cache of Handles

  -- We maintain a cache of recently used client handles with their transport intact.  If a
  -- handle isn't used for awhile, it times out, the transport is closed and the handle
  -- destroyed.

  cache: DataHandle _ NIL;
  cachePulseOut: BasicTime.Pulses ~ BasicTime.MicrosecondsToPulses[60 * 1000000];
  cacheSweepInterval: Process.Ticks ~ Process.SecondsToTicks[21];

  PulsesSince: PROC [then: BasicTime.Pulses] RETURNS [BasicTime.Pulses] ~ {
    -- Clock pulses elapsed between then and now.
    RETURN [ BasicTime.GetClockPulses[] - then ] };

  CheckOutFromCache: ENTRY PROC [remote: XNS.Address] RETURNS [h: Handle _ NIL] ~ {
    -- Removes and returns the first cached handle whose remote address equals remote,
    -- or returns NIL if there is none.
    dH: DataHandle _ cache;
    prevH: DataHandle _ NIL;
    WHILE dH # NIL DO
      IF dH.remote = remote THEN {
        IF prevH = NIL THEN cache _ dH.next ELSE prevH.next _ dH.next;
        h _ dH.handle;
        dH.handle _ NIL; -- for finalization
        dH.lastUsed _ BasicTime.GetClockPulses[];
        EXIT };
      prevH _ dH;
      dH _ dH.next;
      ENDLOOP;
    };

  CheckInToCache: ENTRY PROC [h: Handle] ~ {
    -- Pushes h's data object on the front of the cache and stamps it with the current time.
    dH: DataHandle _ NARROW[h.data];
    dH.handle _ h;
    dH.lastUsed _ BasicTime.GetClockPulses[];
    dH.next _ cache;
    cache _ dH };

  CacheSweeper: PROC ~ {
    -- Background process body: every cacheSweepInterval, removes the cache entries that have
    -- been idle longer than cachePulseOut and destroys them.  Never returns.

    GetDeadHandles: ENTRY PROC RETURNS [deadList: DataHandle _ NIL] ~ {
      -- Splits the cache into live entries (kept, in their original order) and timed-out
      -- entries (returned as deadList).
      dH, newCache, newCacheTail: DataHandle _ NIL;
      WHILE cache # NIL DO
        dH _ cache;
        cache _ cache.next;
        IF PulsesSince[dH.lastUsed] <= cachePulseOut
          THEN -- add to new cache -- {
            dH.next _ NIL;
            IF newCacheTail = NIL THEN newCache _ dH ELSE newCacheTail.next _ dH;
            newCacheTail _ dH }
          ELSE -- add to dead list -- {
            dH.next _ deadList;
            deadList _ dH };
        ENDLOOP;
      cache _ newCache;
      };

    DestroyDeadHandles: PROC [deadList: DataHandle] ~ {
      -- Runs outside the monitor, so closing the streams does not hold the cache lock.
      dH: DataHandle;
      WHILE deadList # NIL DO
        dH _ deadList;
        deadList _ dH.next;
        BuryDeadHandle[dH.handle];
        ENDLOOP;
      };

    DO
      DestroyDeadHandles[ GetDeadHandles[] ];
      Process.Pause[ cacheSweepInterval ];
      ENDLOOP;
    };

  BuryDeadHandle: PROC [h: Handle] ~ {
    -- Closes h's transport and detaches its data object.
    dH: DataHandle ~ NARROW[h.data];
    dH.handle _ NIL; -- break cycle for GC
    SendCloseStream[dH];
    h.data _ NIL };

  -- Handle Creation and Destruction

  ErrorFinalize: PROC [h: Handle] ~ { ERROR };

  Finalize: PROC [h: Handle] ~ {
    -- A client has dropped the handle h, which is guaranteed not to be in the cache.
    -- A recently used handle is renewed and put back in the cache; otherwise it is buried.
    dH: DataHandle ~ NARROW[h.data];
    IF dH = NIL THEN -- handle is already dead -- RETURN;
    IF PulsesSince[dH.lastUsed] <= cachePulseOut THEN {
      CrRPCBackdoor.RenewClientObject[h];
      CheckInToCache[h];
      RETURN };
    BuryDeadHandle[h];
    };

  CreateClientHandle: CrRPCBackdoor.CreateClientHandleProc ~ {
    -- [remote: CrRPC.RefAddress] RETURNS [Handle]
    -- Returns a client handle for remote (which must be a REF XNS.Address), reusing a cached
    -- handle for the same address when there is one.  An unknown socket defaults to the
    -- Courier well-known socket.  A new handle has no stream; Call connects on first use.
    handle: Handle;
    dH: DataHandle;
    addr: XNS.Address;
    IF remote = NIL THEN
      ERROR CrRPC.Error[NIL, addressInappropriateForClass, "missing remote address"];
    WITH remote SELECT FROM
      rA: REF XNS.Address => addr _ rA^;
      ENDCASE => ERROR CrRPC.Error[NIL, addressInappropriateForClass, NIL];
    IF addr.socket = XNS.unknownSocket THEN addr.socket _ XNSWKS.courier;
    handle _ CheckOutFromCache[addr];
    IF handle = NIL THEN {
      dH _ NEW[DataObject _ [lastUsed~BasicTime.GetClockPulses[], remote~addr]];
      handle _ CrRPCBackdoor.NewClientObject[class~$SPP, procs~theClientProcs, data~dH];
      };
    RETURN [handle];
    };

  -- Module start-up: register the $SPP class and start the cache sweeper process.
  CrRPCBackdoor.RegisterCreateClientHandleProc[$SPP, CreateClientHandle];
  CrRPCBackdoor.RegisterCreateListenerProc[$SPP, CreateListener];
  TRUSTED { Process.Detach[ FORK CacheSweeper[] ] };

  }.
CrRPCSPPImpl.mesa Copyright c 1986 by Xerox Corporation. All rights reserved. Demers, January 8, 1987 3:26:27 pm PST Courier runtime support for SPP transport. Copied Types and Constants Handle Objects Parameters Class-specific operations: $GetRemote returns a REF to the XNS.Address of the remote site associated with this handle. (Works for either client or server handles). [h: Handle, op: ATOM, argument: REF] RETURNS [hNew: Handle, result: REF] Client Call Notes: Before raising CrRPC.Error[h, ...] require that the stream NARROW[h.data].stream be in a reasonable state (possibly NIL). The ONLY (expected) error that may be passed to clients is CrRPC.Error — stream class-specific errors MUST be caught here. We catch RuntimeError.BoundsFault on read operations, because that could be raised by any getResults or getBulkData proc, and catching it here enables us to fix up the state of the transport stream. 
[h: Handle, remotePgm: CARD32, remotePgmVersion: CARD16, remoteProc: CARD16, putArgs: PutArgsProc, getResults: GetResultsProc, getError: GetErrorProc] Connect to the remote server. If the connect fails, dH.stream will be left NIL. The version-number pair is deliberately set to include more than one version so the server will be forced to return his version number pair immediately. ERRORS: XNSStream.ConnectionClosed, CrRPC.Error, IO.Error. Send the call message. ERRORS: XNSStream.ConnectionClosed, IO.Error ERRORS: CrRPC.Error, XNSStream.ConnectionClosed, IO.Error, IO.EndOfStream Prepare to receive a system message. Leave dH.inputFlushNeeded = TRUE. ERRORS: CrRPC.Error, XNSStream.ConnectionClosed (from AdvanceInput, GetStatus) ???? SAME AS ServerRdySysRecv! ???? Prepare to send bulk data — (try to) check whether the other side has sent us a reject / abort message, closed the stream, etc. In the normal case nothing arrives on the stream. If data arrives, it's a reject message from the other end, and the bulk data transfer should not be attempted. If an attention arrives, it's an abort request from the other end, so we let the bulk data transfer proceed (only to be aborted immediately). ERRORS: CrRPC.Error, XNSStream.ConnectionClosed (from AdvanceInput, CharsAvail) Prepare to receive bulk data — (try to) check whether the other side has sent us a reject / abort message, closed the stream, etc. In the normal case bulk data arrives on the stream. ERRORS: CrRPC.Error, XNSStream.ConnectionClosed (from AdvanceInput) Logic of Courier call begins here: At this point we have an open connection to the server and a call message has been sent successfully. Bulk data transfer from client to server. Bulk data transfer from server to client. There's bulk data (or an attention) in the stream. No bulk data transfer in either direction. 
[] _ AdvanceInput[h~h, wait~FALSE]; -- done by ReadResultMsg Server Process Notes: Any ERROR — CrRPC.Error, XNSStream.ConnectionClosed, IO.Error, IO.EndOfStream, RuntimeError.BoundsFault — eventually causes the transport to be discarded without attempting to serve any more calls on it. Prepare to receive a system message. Leave dH.inputFlushNeeded = TRUE. ERRORS: CrRPC.Error, XNSStream.ConnectionClosed Prepare to receive bulk data — (try to) check whether the other side has sent us anything besides bulk data — closed the stream, etc. In the normal case bulk data arrives on the stream. ERRORS: CrRPC.Error, XNSStream.ConnectionClosed Prepare to send bulk data — (try to) check whether the other side has closed the stream or aborted the transfer. In the normal case nothing arrives on the stream. If an attention arrives, it's an abort request from the other end, so we let the bulk data transfer proceed (only to be aborted immediately). ERRORS: CrRPC.Error, XNSStream.ConnectionClosed [h: Handle] ERRORS: XNSStream.ConnectionClosed, IO.Error [h: Handle, errNum: CARDINAL] ERRORS: XNSStream.ConnectionClosed, IO.Error [h: Handle, rejectReason: CARDINAL] ERRORS: XNSStream.ConnectionClosed, IO.Error [stream: IO.STREAM, remote: XNS.Address] Exchange version numbers Create server handle Process calls until timeout or unrecoverable error Get message; make sure it's a call, get desired program, version and proc number [local: CrRPC.RefAddress] I/O and Marshalling Advance input stream to next significant piece of input, checking for remote close and flushing unread input if needed. ERRORS: XNSStream.ConnectionClosed, CrRPC.Error ERRORS: none. ERRORS: none. ERRORS: none. 
[h: Handle, s: STREAM, proc: BulkDataXferProc] [h: Handle, s: STREAM, proc: BulkDataXferProc] [h: Handle, s: STREAM, proc: BulkDataXferProc] [h: Handle, s: STREAM, proc: BulkDataXferProc] [h: Handle, s: IO.STREAM, checkAbort: BulkDataCheckAbortProc] RETURNS [abort: BOOL] [h: Handle, s: IO.STREAM, checkAbort: BulkDataCheckAbortProc] RETURNS [abort: BOOL] Check for abort from other side Check for abort from this side Copy a block of data Bulk Data Streams [h: Handle, s: STREAM, checkAbort: BulkDataCheckAbortProc, readValue: BulkDataValueProc] RETURNS [abort: BOOL] [h: Handle, s: STREAM, checkAbort: BulkDataCheckAbortProc, writeValue: BulkDataValueProc, n: CARDINAL] RETURNS [abort: BOOL, heAborted: BOOL] Continuations [h: Handle, proc: ContinuationProc, clientData: REF] [h: Handle, proc: ContinuationProc, clientData: REF] Cache of Handles We maintain a cache of recently used client handles with their transport intact. If a handle isn't used for awhile, it times out, the transport is closed and the handle destroyed. Handle Creation and Destruction A client has dropped the handle h, which is guaranteed not to be in the cache. 
[remote: CrRPC.RefAddress] RETURNS [Handle] ΚΛ˜šœ™Icodešœ Οmœ1™˜C—šžœ˜ Kšžœ<˜A——Kšœ˜K˜—š œ˜*Kš œžœžœ&žœ žœ™SK˜Kšœžœ ˜ Kšœ žœ˜Kšœžœžœ"˜-Kšœžœ˜ K˜Kšœ$˜$Kšœžœ˜K˜šž˜šžœ žœ#ž˜9˜Kšœ>žœ˜Dšžœž˜Kšœžœžœ˜$Kšœ(žœžœ˜6šœžœž˜"Kšœžœ-˜7šœ.˜.Kšžœ-˜2—Kšžœžœ‘˜.—Kšœžœ˜ Kšžœžœ˜—K˜—šžœ˜ Kšžœ)˜+K˜——šžœžœ˜Kšœ'‘˜7Kšžœ˜—Kšžœ˜—K˜Kšœ˜K˜—š œžœžœžœ&˜_KšžœB˜GK˜—š œžœžœžœ&˜ZKšœžœ ˜ Kšœ žœ˜Kšœ˜šžœ ž˜šžœ0˜3Kšœžœ˜—šžœ4žœ˜@Kšœ$˜$—šžœ/žœ3˜hKšžœ>˜C—šžœ˜ Kšžœ:˜?——Kšœ˜K˜—š œ˜*Kš œžœžœ&žœ žœ™SK˜Kšœžœ ˜ Kšœ žœ˜Kšœžœžœ"˜-Kšœžœ˜ K˜Kšœ$˜$Kšœžœ˜K˜KšœD˜D˜šž˜™Kšœ>žœ˜Ešžœž˜Kšœžœ˜ šœ*˜*Kšœžœ˜ Kšžœ˜—Kšžœžœ˜——™šžœžœ˜Kšžœ˜——™šžœ žœž˜1˜Kšžœ˜—šžœ˜ Kšžœ1˜3K˜———Kšžœ˜—šž˜˜Kšœ'‘˜7—˜Kšœ&˜&——K˜—K˜KšœC˜CKšœ˜K˜——™Kšœ žœ˜Kšœ žœ˜K˜š œ˜0KšœžœDžœ žœ™nK˜šžœž˜šžœžœ%˜7Kšžœ žœ1˜F—˜Kšžœ žœ1˜F—K˜—Kšœ žœ˜Kšœžœ˜ šž˜Kšžœžœžœžœ˜$Kšœ!˜!Kšœ˜šžœ ž˜Kšžœžœžœžœ˜$Kšžœžœžœžœ˜(Kšžœ˜—Kšžœžœžœžœ˜0Kšžœ˜—K˜—K˜š œ ˜4Kš œžœHžœžœ žœ žœ™K˜šžœžœžœ%˜>Kšžœ žœ1˜F—Kšœžœžœ žœ˜AKšœ˜Kšœžœ˜šžœž˜Kšžœžœžœžœ˜3Kšžœžœ žœžœ˜5Kšžœ˜—Kšœ˜——™ š œ ˜5KšžœH˜MK˜—š œ˜,Kšœ0žœ™4K˜Kšœžœ ˜ Kšœ žœžœ˜š œžœ0žœžœ žœ žœžœ˜†Kš œ žœžœžœžœ ˜LK˜—Kšžœžœžœžœžœ žœžœ˜bK˜K˜K˜—š œ˜3KšžœG˜LK˜—š œ˜*Kšœ0žœ™4K˜Kšœžœ ˜ K˜K˜)——™K™΄K˜Kšœžœ˜K˜OK˜?K˜š  œžœžœ˜IKšžœ)˜/—K˜š  œžœžœ žœ žœžœ˜QKšœ˜Kšœžœ˜šžœžœž˜šžœžœ˜Kšžœ žœžœžœ˜>K˜Kšœ žœ‘˜$K˜)Kšžœ˜—K˜Kšžœ˜—K˜K˜—š œžœžœ˜*Kšœžœ ˜ K˜K˜)K˜Kšœ ˜ K˜—š  œžœ˜š  œžœžœžœžœ˜CKšœ)žœ˜-šžœ žœž˜Kšœ ˜ šžœ*˜,šžœ‘œ˜Jšœ žœ˜Jšžœžœžœžœ˜EJšœ˜—šžœ‘œ˜Jšœ˜Jšœ˜——Jšžœ˜—J˜K˜—š œžœ˜3K˜šžœ žœž˜Kšœ#˜#K˜Kšžœ˜—K˜—šž˜K˜'K˜$Kšžœ˜—K˜K˜—š œžœ˜$Kšœžœ ˜ Kšœ žœ‘˜&Jšœ˜Kšœ žœ˜——™Kš  œžœžœ˜,K˜š œžœ˜K™NKšœžœ ˜ Kš žœžœžœ‘œžœ˜5šžœ+žœ˜3Jšœ#˜#Jšœ˜Jšžœ˜ —Jšœ˜K˜K˜—š œ&˜8Kšœžœ™+K˜Kšœ˜Kšœ˜Kšœžœ ˜šžœ žœž˜Kšžœ žœ:˜O—šžœžœž˜Kšœžœžœ˜"Kšžœžœ žœ žœ˜E—Kšžœžœžœžœ ˜EKšœ!˜!šžœ žœžœ˜KšœžœB˜JKšœR˜RK˜—Kšžœ ˜K˜——K™KšœG˜GJšœ?˜?Kšžœžœ˜2K˜—J˜J˜—…—f6œ