-- CrRPCSImpl
-- Implementation of the $SPP class of CrRPC handles (client handles and listeners) over XNSStream.
-- Layout restored to one declaration/statement per line so that end-of-line "--" comments
-- no longer run into the code that follows them.

DIRECTORY
  BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses],
  CrRPC USING [BeginErrorProc, BeginRejectProc, BeginReturnProc, BulkDataCheckAbortProc, BulkDataSink, BulkDataSource, BulkDataXferProc, CallProc, ClientProcs, Error, ErrorReason, GetCARDINAL, Handle, invalidArgument, MarshallProcs, noSuchProcedure, noSuchProgram, noSuchVersion, Object, PutCARDINAL, ServerProc, ServerProcs],
  CrRPCFriends USING [AbortHdr, abortMsgType, BulkDataDescriptorType, CallHdr, --callHdrBytes,-- callMsgType, courierVersionNum, CreateClientHandleProc, CreateListenerProc, LookUpServerProc, MsgHdr, --msgHdrBytes,-- NewClientObject, RegisterCreateClientHandleProc, RegisterCreateListenerProc, RejectHdr, rejectMsgType, ReturnHdr, returnMsgType, SessionHdr, sessionHdrBytes],
  Endian USING [BYTE, bytesPerHWord, CardFromF, CardFromH, FFromCard, FWORD, HFromCard, HWORD],
  IO USING [CharsAvail, Close, EndOfStream, GetBlock, PutBlock, STREAM, UnsafeBlock, UnsafeGetBlock, UnsafePutBlock],
  PrincOpsUtils USING [LongCopy],
  Process USING [Detach, Pause, SecondsToTicks, Ticks],
  RefText USING [New, ObtainScratch],
  Rope USING [ROPE],
  XNS USING [Address, Socket, unknownSocket],
  XNSSPPBuf USING [maxBodyBytes, maxBodyHWords],
  XNSSPPTypes USING [endReplySST, endSST, SubSequenceType],
  XNSStream USING [ConnectionClosed, Create, CreateListener, FlushInput, GetStatus, Listener, ListenerProc, Milliseconds, SendAttention, SendClose, SendCloseReply, SendEndOfMessage, SendNow, SetSSType, SetTimeouts, State, SubSequenceType, Timeout],
  XNSWKS USING [courier];

CrRPCSImpl: CEDAR MONITOR
  IMPORTS BasicTime, CrRPC, CrRPCFriends, Endian, IO, PrincOpsUtils, Process, RefText, XNSStream
~ {

-- Type aliases and byte-order conversion shorthands

CARD: TYPE ~ LONG CARDINAL;
BYTE: TYPE ~ Endian.BYTE;
FWORD: TYPE ~ Endian.FWORD;
HWORD: TYPE ~ Endian.HWORD;
bytesPerWord: CARDINAL ~ Endian.bytesPerHWord;

-- FfC / HfC convert a host CARD / CARDINAL to the Endian FWORD / HWORD representation;
-- CfF / CfH convert back.
FfC: PROC [c: CARD] RETURNS [FWORD] ~ INLINE {
  RETURN [Endian.FFromCard[c]] };
HfC: PROC [c: CARDINAL] RETURNS [HWORD] ~ INLINE {
  RETURN [Endian.HFromCard[c]] };
CfF: PROC [f: FWORD] RETURNS [CARD] ~ INLINE {
  RETURN [Endian.CardFromF[f]] };
CfH: PROC [h: HWORD] RETURNS [CARDINAL] ~ INLINE {
  RETURN [Endian.CardFromH[h]] };

Handle: TYPE ~ CrRPC.Handle;

bufferBodyBytes: CARDINAL ~ XNSSPPBuf.maxBodyBytes;
bufferBodyHWords: CARDINAL ~ XNSSPPBuf.maxBodyHWords;

-- Per-handle data

-- Overlay used by PutH / GetH to index the body of the handle's text buffer as HWORDs.
HWords: TYPE ~ MACHINE DEPENDENT RECORD [
  hWords: ARRAY [0 .. bufferBodyHWords) OF HWORD
  ];

DataHandle: TYPE ~ REF DataObject;
DataObject: TYPE ~ RECORD [
  next: DataHandle _ NIL,
  handle: Handle _ NIL,
  lastUsed: BasicTime.Pulses,
  stream: IO.STREAM _ NIL,
  inputFlushNeeded: BOOL _ FALSE,
  remote: XNS.Address,
  timeoutMsec: INT,
  sendBulkData: CrRPC.BulkDataXferProc _ NIL, -- for client handles
  recvBulkData: CrRPC.BulkDataXferProc _ NIL, -- for client handles
  b: REF TEXT _ NIL,
  hIndex: CARDINAL _ 0, -- index in HWORDs of next data, (in XNS (IDP) packet body)
  hLength: CARDINAL _ 0, -- length in HWORDs of buffer (XNS (IDP) packet body)
  odd: BOOL _ FALSE -- TRUE on odd bytes, for GetB / putB
  ];

-- Procedure records shared by every handle of this class

theMarshallProcs: REF CrRPC.MarshallProcs ~ NEW[ CrRPC.MarshallProcs _ [
  putB~PutB,
  putH~PutH,
  putF~PutF,
  unsafePutBlock~UnsafePutBlock,
  putBulkDataSource~PutBulkDataSource,
  putBulkDataSink~PutBulkDataSink,
  putHAlign~HAlign,
  getB~GetB,
  getH~GetH,
  getF~GetF,
  unsafeGetBlock~UnsafeGetBlock,
  getBulkDataSource~GetBulkDataSource,
  getBulkDataSink~GetBulkDataSink,
  getHAlign~HAlign
  ] ];

theClientProcs: REF CrRPC.ClientProcs ~ NEW[ CrRPC.ClientProcs _ [
  setRemote~SetRemote,
  setTimeout~SetTimeout,
  setHops~SetHops,
  destroy~CheckInToCache,
  finalize~Finalize,
  call~Call
  ] ];

theServerProcs: REF CrRPC.ServerProcs ~ NEW[ CrRPC.ServerProcs _ [getRemote~GetRemote] ];

-- Message layouts: each is the generic message header followed by a type-specific header.

-- Call message
CallMsg: TYPE ~ REF CallMsgObject;
CallMsgObject: TYPE ~ MACHINE DEPENDENT RECORD [
  msgHdr: CrRPCFriends.MsgHdr,
  callHdr: CrRPCFriends.CallHdr
  ];
callMsgBytes: CARDINAL ~ SIZE[CallMsgObject]*bytesPerWord; -- should be BYTES
callMsgHWords: CARDINAL ~ SIZE[CallMsgObject];

-- Reject message
RejectMsg: TYPE ~ REF RejectMsgObject;
RejectMsgObject: TYPE ~ MACHINE DEPENDENT RECORD [
  msgHdr: CrRPCFriends.MsgHdr,
  rejectHdr: CrRPCFriends.RejectHdr
  ];
rejectMsgBytes: CARDINAL ~ SIZE[RejectMsgObject]*bytesPerWord; -- should be BYTES
rejectMsgHWords: CARDINAL ~ SIZE[RejectMsgObject];

-- Return message
ReturnMsg: TYPE ~ REF ReturnMsgObject;
ReturnMsgObject: TYPE ~ MACHINE DEPENDENT RECORD [
  msgHdr: CrRPCFriends.MsgHdr,
  returnHdr: CrRPCFriends.ReturnHdr
  ];
returnMsgBytes: CARDINAL ~ SIZE[ReturnMsgObject]*bytesPerWord; -- should be BYTES
returnMsgHWords: CARDINAL ~ SIZE[ReturnMsgObject];

-- Abort message
AbortMsg: TYPE ~ REF AbortMsgObject;
AbortMsgObject: TYPE ~ MACHINE DEPENDENT RECORD [
  msgHdr: CrRPCFriends.MsgHdr,
  abortHdr: CrRPCFriends.AbortHdr
  ];
abortMsgBytes: CARDINAL ~ SIZE[AbortMsgObject]*bytesPerWord; -- should be BYTES
abortMsgHWords: CARDINAL ~ SIZE[AbortMsgObject];

-- Client side: making a call

-- Call sends a call message (connecting first if the handle has no stream, or reconnecting
-- if the existing stream turns out to be unusable), performs any bulk data transfer that
-- putArgs registered on the handle, then reads and dispatches the result message.
-- Stream failures after the call message has been sent close the stream and are reported
-- as CrRPC.Error[communicationFailure].
Call: CrRPC.CallProc
  <<[h: Handle, remotePgm: CARD, remotePgmVersion: CARDINAL, remoteProc: CARDINAL, putArgs: PutArgsProc, getResults: GetResultsProc, getError: GetErrorProc]>>
  ~ {
  dH: DataHandle ~ NARROW[h.data];

  ConnectToServer: PROC ~ {
    -- Create the stream and exchange session headers (version ranges) with the remote end.
    -- Raises CrRPC.Error[courierVersionMismatch] (after closing the stream) if the reply is
    -- short or its range does not include CrRPCFriends.courierVersionNum.
    vLow: HWORD ~ HfC[CrRPCFriends.courierVersionNum - 1];
    vHigh: HWORD ~ HfC[CrRPCFriends.courierVersionNum];
    nRead: NAT;
    versionError: BOOL;
    sP: LONG POINTER TO CrRPCFriends.SessionHdr;
    IF dH.stream # NIL THEN ERROR;
    dH.stream _ XNSStream.Create[
      remote~dH.remote,
      getTimeout~dH.timeoutMsec,
      putTimeout~dH.timeoutMsec];
    TRUSTED {
      -- The session header is built in place at the start of the text buffer's body.
      sP _ LOOPHOLE[LOOPHOLE[dH.b, LONG POINTER] + SIZE[TEXT[0]]];
      sP^ _ [lowVersion~vLow, highVersion~vHigh] };
    dH.b.length _ CrRPCFriends.sessionHdrBytes;
    [] _ IO.PutBlock[dH.stream, dH.b, 0, CrRPCFriends.sessionHdrBytes];
    XNSStream.SendNow[dH.stream];
    nRead _ IO.GetBlock[dH.stream, dH.b, 0, CrRPCFriends.sessionHdrBytes];
    dH.inputFlushNeeded _ FALSE; -- ???? Is there EOM on version number pair ????
    TRUSTED {
      -- Compare converted CARDINALs against the CARDINAL version constant (as ListenerProc
      -- does) rather than against the HWORD-encoded vHigh.
      versionError _ (nRead # CrRPCFriends.sessionHdrBytes)
        OR (CfH[sP.lowVersion] > CrRPCFriends.courierVersionNum)
        OR (CfH[sP.highVersion] < CrRPCFriends.courierVersionNum) };
    IF versionError THEN {
      SendCloseStream[dH];
      ERROR CrRPC.Error[h, courierVersionMismatch, "version exchange error"];
      };
    };

  SendCallMsg: PROC ~ {
    -- Build the call header in the buffer, let putArgs marshal the arguments after it,
    -- then flush the buffer and mark end of message.
    TRUSTED {
      cP: LONG POINTER TO CallMsgObject ~ LOOPHOLE[LOOPHOLE[dH.b, LONG POINTER] + SIZE[TEXT[0]]];
      cP^ _ [
        -- msgType is an HWORD field like the others, so it is converted with HfC too.
        msgHdr~[msgType~HfC[CrRPCFriends.callMsgType]],
        callHdr~[
          tID~HfC[0],
          pgmNum~FfC[remotePgm],
          pgmVersion~HfC[remotePgmVersion],
          procNum~HfC[remoteProc]
          ]
        ];
      };
    dH.hIndex _ callMsgHWords;
    dH.hLength _ bufferBodyHWords;
    IF putArgs # NIL THEN putArgs[h];
    PutBuf[dH];
    XNSStream.SendEndOfMessage[dH.stream];
    };

  ReadResultMsg: PROC ~ {
    -- Read the response and dispatch on its message type.
    -- Running out of input while unmarshalling is reported as resultsTooShort.
    ENABLE IO.EndOfStream => {
      ERROR CrRPC.Error[h, resultsTooShort, "results too short"] };
    msgType: CARDINAL;
    GetBuf[dH];
    msgType _ CrRPC.GetCARDINAL[h];
    SELECT msgType FROM
      CrRPCFriends.rejectMsgType => {
        rejectReason: CARDINAL;
        [] _ GetH[h]; -- skip the halfword that follows msgType (tID in the headers built in this file)
        rejectReason _ CrRPC.GetCARDINAL[h];
        [] _ AdvanceInput[h~h, wait~FALSE];
        ERROR CrRPC.Error[h,
          (SELECT rejectReason FROM
            CrRPC.noSuchProgram => rejectedNoSuchProgram,
            CrRPC.noSuchVersion => rejectedNoSuchVersion,
            CrRPC.noSuchProcedure => rejectedNoSuchProcedure,
            CrRPC.invalidArgument => rejectedInvalidArgument,
            ENDCASE => rejectedUnspecified),
          "rejected"];
        };
      CrRPCFriends.returnMsgType => {
        [] _ GetH[h];
        IF getResults # NIL THEN getResults[h];
        [] _ AdvanceInput[h~h, wait~FALSE];
        };
      CrRPCFriends.abortMsgType => {
        errNum: CARDINAL;
        [] _ GetH[h];
        errNum _ CrRPC.GetCARDINAL[h];
        IF getError # NIL THEN getError[h, errNum];
        [] _ AdvanceInput[h~h, wait~FALSE];
        -- Reached only if getError is NIL or returns normally.
        ERROR CrRPC.Error[h, remoteError, "unexpected remote error"] };
      ENDCASE => {
        SendCloseStream[dH];
        ERROR CrRPC.Error[h, protocolError, "bad msgType in response"] };
    };

  ClientRdySysRecv: PROC ~ {
    -- Wait until the stream is in the open state with input available, resetting any other
    -- status on the way; the data must be on subsequence type 0.
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    DO
      [state, sst] _ AdvanceInput[h~h, wait~TRUE];
      IF state = open THEN EXIT;
      [] _ XNSStream.GetStatus[self~dH.stream, reset~TRUE];
      ENDLOOP;
    IF sst # 0 THEN ERROR CrRPC.Error[h, protocolError, "sys sst # 0"];
    dH.inputFlushNeeded _ TRUE;
    };

  ClientRdyBDTSend: PROC RETURNS [ok: BOOL] ~ {
    -- ok is TRUE if the stream is open with no input pending, or is in the attention state.
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    [state, sst] _ AdvanceInput[h~h, wait~FALSE];
    SELECT state FROM
      open => {
        ok _ (IO.CharsAvail[self~dH.stream, wait~FALSE] = 0) };
      attention => {
        ok _ TRUE };
      ENDCASE => ERROR;
    };

  ClientRdyBDTRecv: PROC RETURNS [ok: BOOL] ~ {
    -- ok is TRUE if input has arrived on a nonzero subsequence type, or on attention.
    state: XNSStream.State;
    sst: XNSStream.SubSequenceType;
    [state, sst] _ AdvanceInput[h~h, wait~TRUE];
    SELECT state FROM
      open, endOfMessage => {
        ok _ (sst # 0);
        dH.inputFlushNeeded _ ok };
      attention => {
        ok _ TRUE };
      ENDCASE => ERROR;
    };

  CheckAbortRecv: CrRPC.BulkDataCheckAbortProc ~ {
    -- While receiving bulk data: abort iff the stream is in the attention state.
    [] _ IO.CharsAvail[self~dH.stream, wait~TRUE];
    RETURN [ XNSStream.GetStatus[self~dH.stream, reset~FALSE].state = attention ] };

  CheckAbortSend: CrRPC.BulkDataCheckAbortProc ~ {
    -- While sending bulk data: abort if the stream is no longer open or input has arrived.
    RETURN[
      (XNSStream.GetStatus[self~dH.stream, reset~FALSE].state # open)
      OR (IO.CharsAvail[self~dH.stream, wait~FALSE] > 0) ] };

  -- Body of Call
  BEGIN
  dH.sendBulkData _ NIL;
  dH.recvBulkData _ NIL;

  -- Send the call message, trying an existing stream first and falling back to a fresh one.
  BEGIN
  IF dH.stream # NIL THEN {
    ENABLE XNSStream.ConnectionClosed, XNSStream.Timeout, CrRPC.Error => {
      IF dH.stream # NIL THEN { IO.Close[dH.stream]; dH.stream _ NIL };
      CONTINUE };
    [] _ AdvanceInput[h~h, wait~FALSE]; -- check for close
    SendCallMsg[];
    GOTO Done };
  { -- dH.stream = NIL --
    ENABLE XNSStream.ConnectionClosed, XNSStream.Timeout => {
      IF dH.stream # NIL THEN { IO.Close[dH.stream]; dH.stream _ NIL };
      ERROR CrRPC.Error[h, communicationFailure, "communication failure"] };
    ConnectToServer[];
    SendCallMsg[];
    GOTO Done };
  EXITS
    Done => NULL;
  END;

  -- Bulk data transfer (if putArgs registered one) and result
  BEGIN
  ENABLE {
    XNSStream.ConnectionClosed, XNSStream.Timeout => {
      IF dH.stream = NIL THEN ERROR; -- can't happen
      IO.Close[dH.stream];
      dH.stream _ NIL;
      ERROR CrRPC.Error[h, communicationFailure, "communication failure"] };
    };
  SELECT TRUE FROM
    (dH.sendBulkData # NIL) => {
      -- Client is the bulk data source
      IF ClientRdyBDTSend[].ok THEN {
        XNSStream.SetSSType[dH.stream, 1]; -- ???? Define 1 ????
        IF dH.sendBulkData[h, dH.stream, CheckAbortSend].abort
          THEN XNSStream.SendAttention[dH.stream, 1] -- ???? Define 1 ????
          ELSE XNSStream.SendEndOfMessage[dH.stream];
        XNSStream.SetSSType[dH.stream, 0]; -- ???? Define 0 ????
        };
      };
    (dH.recvBulkData # NIL) => {
      -- Client is the bulk data sink
      IF ClientRdyBDTRecv[].ok THEN {
        IF dH.recvBulkData[h, dH.stream, CheckAbortRecv].abort
          THEN XNSStream.SendAttention[dH.stream, 1]; -- ???? Define 1 ????
        };
      };
    ENDCASE => {
      -- No bulk data in this call
      NULL;
      };
  ClientRdySysRecv[];
  ReadResultMsg[];
  -- No trailing AdvanceInput here: ReadResultMsg does it.
  END;

  END;
  }; -- of Call

-- Server side

serverGetTimeout: XNSStream.Milliseconds ~ 90000;
serverPutTimeout: XNSStream.Milliseconds ~ 20000;

-- Wait until the stream is open with input available on subsequence type 0.
ServerRdySysRecv: PROC [h: Handle] ~ {
  dH: DataHandle ~ NARROW[h.data];
  state: XNSStream.State;
  sst: XNSStream.SubSequenceType;
  DO
    [state, sst] _ AdvanceInput[h~h, wait~TRUE];
    IF state = open THEN EXIT;
    [] _ XNSStream.GetStatus[self~dH.stream, reset~TRUE];
    ENDLOOP;
  IF sst # 0 THEN ERROR CrRPC.Error[h, protocolError, "sys sst # 0"];
  dH.inputFlushNeeded _ TRUE;
  };

-- Wait for incoming bulk data; it must arrive on a nonzero subsequence type (or as attention).
ServerRdyBDTRecv: PROC [h: Handle] ~ {
  dH: DataHandle ~ NARROW[h.data];
  state: XNSStream.State;
  sst: XNSStream.SubSequenceType;
  [state, sst] _ AdvanceInput[h~h, wait~TRUE];
  SELECT state FROM
    open, endOfMessage => {
      IF sst = 0 THEN ERROR CrRPC.Error[h, protocolError, "missing bulk data"];
      dH.inputFlushNeeded _ TRUE };
    attention => {
      NULL };
    ENDCASE => ERROR;
  };

-- Check that the client has not sent anything before the server starts sending bulk data.
ServerRdyBDTSend: PROC [h: Handle] ~ {
  dH: DataHandle ~ NARROW[h.data];
  state: XNSStream.State;
  sst: XNSStream.SubSequenceType;
  [state, sst] _ AdvanceInput[h~h, wait~FALSE];
  SELECT state FROM
    open => {
      IF IO.CharsAvail[self~dH.stream, wait~FALSE] # 0
        THEN ERROR CrRPC.Error[h, protocolError, "client send to bdt source"];
      };
    attention => {
      NULL };
    ENDCASE => ERROR;
  };

-- Mark the buffer empty. With hLength = 0 the next Get* refills it and the next Put* flushes first.
ResetBuffer: PROC [dH: DataHandle] ~ INLINE {
  dH.hIndex _ dH.hLength _ 0 };

-- Start a return message: message type followed by a zero transaction ID.
BeginReturn: CrRPC.BeginReturnProc
  <<[h: Handle]>>
  ~ {
  dH: DataHandle ~ NARROW[h.data];
  ResetBuffer[dH];
  PutH[h, HfC[CrRPCFriends.returnMsgType]]; -- MsgHdr.msgType
  PutH[h, HfC[0]]; -- ReturnHdr.tID
  };

-- Start an abort message: message type, zero transaction ID, error number.
BeginError: CrRPC.BeginErrorProc
  <<[h: Handle, errNum: CARDINAL]>>
  ~ {
  dH: DataHandle ~ NARROW[h.data];
  ResetBuffer[dH];
  PutH[h, HfC[CrRPCFriends.abortMsgType]]; -- MsgHdr.msgType
  PutH[h, HfC[0]]; -- AbortHdr.tID
  PutH[h, HfC[errNum]]; -- AbortHdr.errNum
  };

-- Start a reject message: message type, zero transaction ID, reject reason.
BeginReject: CrRPC.BeginRejectProc
  <<[h: Handle, rejectReason: CARDINAL]>>
  ~ {
  dH: DataHandle ~ NARROW[h.data];
  ResetBuffer[dH];
  PutH[h, HfC[CrRPCFriends.rejectMsgType]]; -- MsgHdr.msgType
  PutH[h, HfC[0]]; -- RejectHdr.tID
  PutH[h, HfC[rejectReason]]; -- RejectHdr.rejectReason
  };

-- Worker for one incoming connection: exchanges session headers, builds a server handle,
-- then serves call messages in a loop until the connection fails or a protocol violation
-- is seen. Exits through Out (which closes the stream) or Closed (stream already disposed of).
ListenerProc: XNSStream.ListenerProc
  <<[stream: IO.STREAM, remote: XNS.Address]>>
  ~ {
  sessionHdr: CrRPCFriends.SessionHdr;
  callMsgObject: CallMsgObject;
  rejectMsgObject: RejectMsgObject;
  desiredPgm: CARD;
  desiredPgmVersion: CARDINAL;
  desiredProc: CARDINAL;
  serverProc: CrRPC.ServerProc;
  nBytes: INT;
  h: Handle;
  dH: DataHandle;
  b: REF TEXT;

  BEGIN
  ENABLE XNSStream.ConnectionClosed, CrRPC.Error, XNSStream.Timeout => GOTO Out;

  -- Version exchange: read the client's range, check it, answer with our single version.
  TRUSTED {
    nBytes _ IO.UnsafeGetBlock[stream, [base~LOOPHOLE[LONG[@sessionHdr]], count~CrRPCFriends.sessionHdrBytes]
      ! XNSStream.Timeout => {[] _ XNSStream.SendClose[stream]; GOTO Out }]};
  IF nBytes # CrRPCFriends.sessionHdrBytes THEN GOTO Out;
  IF (CfH[sessionHdr.lowVersion] > CrRPCFriends.courierVersionNum)
    OR (CfH[sessionHdr.highVersion] < CrRPCFriends.courierVersionNum)
    THEN {
      [] _ XNSStream.SendClose[stream];
      GOTO Out };
  sessionHdr.lowVersion _ sessionHdr.highVersion _ HfC[CrRPCFriends.courierVersionNum];
  TRUSTED {
    IO.UnsafePutBlock[stream, [base~LOOPHOLE[LONG[@sessionHdr]], count~CrRPCFriends.sessionHdrBytes]] };
  XNSStream.SendNow[stream];

  -- Build the server handle for this connection.
  b _ RefText.New[nChars~bufferBodyBytes];
  dH _ NEW[DataObject _ [
    next~NIL,
    handle~NIL,
    lastUsed~,
    stream~stream,
    remote~remote,
    timeoutMsec~,
    b~b
    ]];
  h _ NEW[CrRPC.Object _ [class~$SPP, kind~server, marshallProcs~theMarshallProcs, procs~theServerProcs, data~dH, clientData~NIL]];

  -- Serve calls.
  DO
    -- Read and validate the call header.
    ServerRdySysRecv[h];
    TRUSTED {
      nBytes _ IO.UnsafeGetBlock[stream, [base~LOOPHOLE[LONG[@callMsgObject]], count~callMsgBytes]
        ! XNSStream.Timeout => {[] _ XNSStream.SendClose[stream]; GOTO Out}]};
    IF (nBytes # callMsgBytes)
      OR (CfH[callMsgObject.msgHdr.msgType] # CrRPCFriends.callMsgType)
      THEN {
        [] _ XNSStream.SendClose[stream];
        GOTO Out };
    desiredPgm _ CfF[callMsgObject.callHdr.pgmNum];
    desiredPgmVersion _ CfH[callMsgObject.callHdr.pgmVersion];
    desiredProc _ CfH[callMsgObject.callHdr.procNum];
    serverProc _ CrRPCFriends.LookUpServerProc[desiredPgm, desiredPgmVersion];
    SELECT serverProc FROM
      NIL => -- Don't have server, reject the call --
        {
        rejectMsgObject.msgHdr.msgType _ HfC[CrRPCFriends.rejectMsgType];
        rejectMsgObject.rejectHdr.tID _ HfC[0];
        rejectMsgObject.rejectHdr.rejectReason _ HfC[CrRPC.noSuchProgram];
        TRUSTED {
          IO.UnsafePutBlock[stream, [base~LOOPHOLE[LONG[@rejectMsgObject]], count~rejectMsgBytes]] };
        };
      ENDCASE => -- Serve the call --
        {
        ResetBuffer[dH];
        serverProc[h, desiredPgm, desiredPgmVersion, desiredProc, BeginReturn, BeginError, BeginReject];
        IF dH.stream = NIL THEN GOTO Closed; -- client caught error
        PutBuf[dH];
        };
    XNSStream.SendEndOfMessage[stream];
    ENDLOOP;
  END;

  EXITS
    Out => IO.Close[stream];
    Closed => NULL;
  };

-- Listeners created so far, one per socket.
ListenerValue: TYPE ~ REF ListenerValueObject;
ListenerValueObject: TYPE ~ RECORD [
  next: ListenerValue,
  socket: XNS.Socket,
  listener: XNSStream.Listener
  ];

listeners: ListenerValue _ NIL;

-- Create a listener on the given socket (XNSWKS.courier if unknownSocket is passed),
-- unless one is already recorded for that socket.
CreateListener: ENTRY CrRPCFriends.CreateListenerProc
  <<[socket: XNS.Socket]>>
  ~ {
  newListener: XNSStream.Listener;
  IF socket = XNS.unknownSocket THEN socket _ XNSWKS.courier;
  FOR p: ListenerValue _ listeners, p.next WHILE p # NIL DO
    IF p.socket = socket THEN RETURN;
    ENDLOOP;
  newListener _ XNSStream.CreateListener[socket~socket, worker~ListenerProc, getTimeout~serverGetTimeout, putTimeout~serverPutTimeout];
  listeners _ NEW[ListenerValueObject _ [next~listeners, socket~socket, listener~newListener]];
  };

GetRemote: PROC [h: Handle] RETURNS [XNS.Address] ~ {
  dH: DataHandle ~ NARROW[h.data];
  RETURN [dH.remote] };

-- Stream utilities shared by client and server

-- Flush pending input if the handle is flagged for it, then step past stream status changes:
--   * an end / end-reply subsequence type: reply to the close and raise protocolError;
--   * open: return (after waiting for input when wait is TRUE);
--   * attention: return;
--   * endOfMessage / ssTypeChange: reset the status and look again; a second endOfMessage
--     after a break has already been seen is returned to the caller.
AdvanceInput: PROC [h: Handle, wait: BOOL _ FALSE]
  RETURNS [state: XNSStream.State, ssType: XNSStream.SubSequenceType] ~ {
  dH: DataHandle ~ NARROW[h.data];
  sawBreak: BOOL _ FALSE;
  IF dH.inputFlushNeeded THEN {
    [] _ XNSStream.FlushInput[self~dH.stream, wait~TRUE];
    dH.inputFlushNeeded _ FALSE };
  DO
    [state~state, ssType~ssType] _ XNSStream.GetStatus[self~dH.stream, reset~FALSE];
    IF (ssType = XNSSPPTypes.endSST) OR (ssType = XNSSPPTypes.endReplySST) THEN {
      ReplyCloseStream[dH];
      ERROR CrRPC.Error[h, protocolError, "remote close"] };
    SELECT state FROM
      open => {
        IF NOT wait THEN RETURN;
        IF IO.CharsAvail[self~dH.stream, wait~TRUE] > 0 THEN RETURN;
        LOOP };
      attention => RETURN;
      endOfMessage => {
        IF sawBreak THEN RETURN;
        sawBreak _ TRUE };
      ssTypeChange => {
        sawBreak _ TRUE };
      ENDCASE => ERROR;
    [] _ XNSStream.GetStatus[self~dH.stream, reset~TRUE];
    ENDLOOP;
  };

-- Send a close (ignoring connection failures), abort the stream and forget it.
SendCloseStream: PROC [dH: DataHandle] ~ {
  IF dH.stream # NIL THEN {
    [] _ XNSStream.SendClose[dH.stream
      ! XNSStream.ConnectionClosed, XNSStream.Timeout => CONTINUE];
    IO.Close[self~dH.stream, abort~TRUE];
    dH.stream _ NIL };
  };

-- As SendCloseStream, but sends a close reply.
ReplyCloseStream: PROC [dH: DataHandle] ~ {
  IF dH.stream # NIL THEN {
    [] _ XNSStream.SendCloseReply[dH.stream
      ! XNSStream.ConnectionClosed, XNSStream.Timeout => CONTINUE];
    IO.Close[self~dH.stream, abort~TRUE];
    dH.stream _ NIL };
  };

-- Buffered marshalling

-- Write the hIndex halfwords currently in the buffer to the stream and empty the buffer.
PutBuf: PROC [dH: DataHandle] ~ {
  dH.b.length _ 2*dH.hIndex;
  IO.PutBlock[self~dH.stream, block~dH.b];
  dH.hIndex _ 0 };

-- Refill the buffer from the stream; raises IO.EndOfStream if fewer than minBytes arrive.
GetBuf: PROC [dH: DataHandle, minBytes: NAT _ 2] ~ {
  nRead: NAT ~ IO.GetBlock[self~dH.stream, block~dH.b];
  IF nRead < minBytes THEN {
    ERROR IO.EndOfStream[dH.stream] };
  dH.hLength _ nRead / 2;
  dH.hIndex _ 0;
  };

-- Bytes are packed two per halfword; dH.odd is TRUE when the first byte of the halfword
-- just before hIndex has been written and the second is still to come.
PutB: PROC [h: Handle, byte: BYTE] ~ {
  dH: DataHandle ~ NARROW[h.data];
  IF dH.odd
    THEN {
      dH.b[2*dH.hIndex - 1] _ LOOPHOLE[byte];
      dH.odd _ FALSE;
      }
    ELSE {
      IF dH.hIndex >= dH.hLength THEN PutBuf[dH];
      dH.b[2*dH.hIndex] _ LOOPHOLE[byte];
      dH.hIndex _ dH.hIndex.SUCC;
      dH.odd _ TRUE;
      };
  };

PutH: PROC [h: Handle, hWord: HWORD] ~ {
  dH: DataHandle ~ NARROW[h.data];
  IF dH.hIndex >= dH.hLength THEN PutBuf[dH];
  TRUSTED {
    hP: LONG POINTER TO HWords ~ LOOPHOLE[LOOPHOLE[dH.b, LONG POINTER] + SIZE[TEXT[0]]];
    hP.hWords[dH.hIndex] _ hWord };
  dH.hIndex _ dH.hIndex.SUCC;
  dH.odd _ FALSE;
  };

PutF: PROC [h: Handle, fWord: FWORD] ~ {
  dH: DataHandle ~ NARROW[h.data];
  -- Flush first unless both halfwords fit in the buffer.
  IF (dH.hIndex+1) >= dH.hLength THEN PutBuf[dH];
  TRUSTED {
    PrincOpsUtils.LongCopy[
      from~@fWord,
      to~(LOOPHOLE[dH.b, LONG POINTER] + SIZE[TEXT[0]] + dH.hIndex),
      nwords~2];
    };
  dH.hIndex _ dH.hIndex + 2;
  dH.odd _ FALSE };

-- Flush anything buffered, then write the block straight to the stream.
UnsafePutBlock: UNSAFE PROC [h: Handle, block: IO.UnsafeBlock] ~ {
  dH: DataHandle ~ NARROW[h.data];
  IF dH.hIndex > 0 THEN PutBuf[dH];
  TRUSTED { IO.UnsafePutBlock[self~dH.stream, block~block] };
  };

-- Marshal a bulk data source descriptor (null or immediate) and, for a non-NIL source,
-- register it on the handle so Call performs the transfer. Client handles only.
PutBulkDataSource: PROC [h: Handle, bulkDataSource: CrRPC.BulkDataSource] ~ {
  dH: DataHandle ~ NARROW[h.data];
  IF h.kind # client THEN
    ERROR CrRPC.Error[h, notClientHandle, "not client handle"];
  IF bulkDataSource = NIL
    THEN {
      CrRPC.PutCARDINAL[h, ORD[CrRPCFriends.BulkDataDescriptorType.null]] }
    ELSE {
      CrRPC.PutCARDINAL[h, ORD[CrRPCFriends.BulkDataDescriptorType.immediate]];
      TRUSTED { dH.sendBulkData _ bulkDataSource }};
  };

-- As PutBulkDataSource, for a bulk data sink.
PutBulkDataSink: PROC [h: Handle, bulkDataSink: CrRPC.BulkDataSink] ~ {
  dH: DataHandle ~ NARROW[h.data];
  IF h.kind # client THEN
    ERROR CrRPC.Error[h, notClientHandle, "not client handle"];
  IF bulkDataSink = NIL
    THEN {
      CrRPC.PutCARDINAL[h, ORD[CrRPCFriends.BulkDataDescriptorType.null]] }
    ELSE {
      CrRPC.PutCARDINAL[h, ORD[CrRPCFriends.BulkDataDescriptorType.immediate]];
      TRUSTED { dH.recvBulkData _ bulkDataSink }};
  };

GetB: PROC [h: Handle] RETURNS [byte: BYTE] ~ {
  dH: DataHandle ~ NARROW[h.data];
  IF dH.odd
    THEN {
      byte _ LOOPHOLE[dH.b[2*dH.hIndex - 1]];
      dH.odd _ FALSE;
      }
    ELSE {
      IF dH.hIndex >= dH.hLength THEN GetBuf[dH];
      byte _ LOOPHOLE[dH.b[2*dH.hIndex]];
      dH.hIndex _ dH.hIndex.SUCC;
      dH.odd _ TRUE;
      };
  };

GetH: PROC [h: Handle] RETURNS [hWord: HWORD] ~ {
  dH: DataHandle ~ NARROW[h.data];
  IF dH.hIndex >= dH.hLength THEN GetBuf[dH, 2];
  TRUSTED {
    hP: LONG POINTER TO HWords ~ LOOPHOLE[LOOPHOLE[dH.b, LONG POINTER] + SIZE[TEXT[0]]];
    hWord _ hP.hWords[dH.hIndex] };
  dH.hIndex _ dH.hIndex.SUCC;
  dH.odd _ FALSE;
  };

GetF: PROC [h: Handle] RETURNS [fWord: FWORD] ~ {
  dH: DataHandle ~ NARROW[h.data];
  IF (dH.hIndex+1) >= dH.hLength
    THEN {
      -- The two halves may straddle a buffer refill: fetch them one at a time.
      temp: MACHINE DEPENDENT RECORD [a, b: HWORD];
      temp.a _ GetH[h];
      temp.b _ GetH[h];
      TRUSTED { fWord _ LOOPHOLE[temp] } }
    ELSE {
      TRUSTED {
        PrincOpsUtils.LongCopy[
          from~(LOOPHOLE[dH.b, LONG POINTER] + SIZE[TEXT[0]] + dH.hIndex),
          to~@fWord,
          nwords~2];
        };
      dH.hIndex _ dH.hIndex + 2;
      dH.odd _ FALSE;
      };
  };

-- Fill the block first from whatever is still buffered in the handle, then directly from
-- the stream. nBytesRead counts both parts (previously the bytes taken from the buffer were
-- copied into the block but left out of the count).
UnsafeGetBlock: UNSAFE PROC [h: Handle, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT _ 0] ~ {
  dH: DataHandle ~ NARROW[h.data];
  WHILE (dH.hIndex < dH.hLength) OR dH.odd DO
    IF block.count = 0 THEN RETURN;
    TRUSTED { block.base[block.startIndex] _ LOOPHOLE[GetB[h]] };
    block.startIndex _ block.startIndex + 1;
    block.count _ block.count - 1;
    nBytesRead _ nBytesRead + 1;
    ENDLOOP;
  TRUSTED { nBytesRead _ nBytesRead + IO.UnsafeGetBlock[self~dH.stream, block~block] };
  };

-- Unmarshal a bulk data source descriptor on a server handle. Only null and immediate
-- descriptors are supported.
GetBulkDataSource: PROC [h: Handle] RETURNS[bulkDataSource: CrRPC.BulkDataSource] ~ {
  dH: DataHandle ~ NARROW[h.data];
  typeCode: CARDINAL;
  IF h.kind # server THEN
    ERROR CrRPC.Error[h, notServerHandle, "not server handle"];
  typeCode _ CrRPC.GetCARDINAL[h];
  SELECT typeCode FROM
    ORD[CrRPCFriends.BulkDataDescriptorType.null] => {
      bulkDataSource _ NIL };
    ORD[CrRPCFriends.BulkDataDescriptorType.immediate] => TRUSTED {
      bulkDataSource _ ServerRecvBulkData };
    ORD[CrRPCFriends.BulkDataDescriptorType.active],
    ORD[CrRPCFriends.BulkDataDescriptorType.passive] => {
      ERROR CrRPC.Error[h, notImplemented, "no third party bulk data"] };
    ENDCASE => {
      ERROR CrRPC.Error[h, protocolError, "bad bulkDataSource type"] };
  };

-- Copy incoming bulk data from the connection (dH.stream) to the caller's stream until
-- end of message, attention (abort _ TRUE), or checkAbort asks to stop.
-- NOTE(review): the scratch buffer from RefText.ObtainScratch is never handed back;
-- releasing it would need an extra name in the RefText USING list.
ServerRecvBulkData: CrRPC.BulkDataXferProc
  <<[h: Handle, stream: IO.STREAM, checkAbort: BulkDataCheckAbortProc] RETURNS [abort: BOOL]>>
  ~ {
  dH: DataHandle ~ NARROW[h.data];
  bufSize: NAT ~ 1024;
  b: REF TEXT _ RefText.ObtainScratch[bufSize];
  nBytes: INT;
  state: XNSStream.State;
  ssType: XNSSPPTypes.SubSequenceType;
  abort _ FALSE;
  ServerRdyBDTRecv[h];
  DO
    SELECT nBytes _ IO.GetBlock[self~dH.stream, block~b] FROM
      0 => {
        -- Nothing read: find out why from the (reset) stream status.
        [state~state, ssType~ssType] _ XNSStream.GetStatus[dH.stream, TRUE];
        SELECT state FROM
          attention => {
            abort _ TRUE;
            EXIT };
          endOfMessage => {
            dH.inputFlushNeeded _ FALSE;
            EXIT };
          ssTypeChange =>
            SELECT ssType FROM
              0 =>
                ERROR CrRPC.Error[h, protocolError, "bdt no eom"];
              XNSSPPTypes.endSST, XNSSPPTypes.endReplySST =>
                ERROR CrRPC.Error[h, remoteClose, "remote close"];
              ENDCASE => NULL; -- ???? is this an error ????
          open => NULL;
          ENDCASE => ERROR;
        };
      ENDCASE => {
        IO.PutBlock[self~stream, block~b, count~nBytes];
        };
    IF checkAbort[h].abort THEN {
      XNSStream.SendAttention[dH.stream, 1]; -- Define 1 ????
      EXIT };
    ENDLOOP;
  };

-- Unmarshal a bulk data sink descriptor on a server handle. Only null and immediate
-- descriptors are supported.
GetBulkDataSink: PROC [h: Handle] RETURNS[bulkDataSink: CrRPC.BulkDataSink] ~ {
  dH: DataHandle ~ NARROW[h.data];
  typeCode: CARDINAL;
  IF h.kind # server THEN
    ERROR CrRPC.Error[h, notServerHandle, "not server handle"];
  typeCode _ CrRPC.GetCARDINAL[h];
  SELECT typeCode FROM
    ORD[CrRPCFriends.BulkDataDescriptorType.null] => {
      bulkDataSink _ NIL };
    ORD[CrRPCFriends.BulkDataDescriptorType.immediate] => TRUSTED {
      bulkDataSink _ ServerSendBulkData };
    ORD[CrRPCFriends.BulkDataDescriptorType.active],
    ORD[CrRPCFriends.BulkDataDescriptorType.passive] => {
      ERROR CrRPC.Error[h, notImplemented, "no third party bulk data"] };
    ENDCASE => {
      ERROR CrRPC.Error[h, protocolError, "bad bulkDataSink type"] };
  };

-- Copy bulk data from the caller's stream to the connection (dH.stream) on subsequence
-- type 1, finishing with end of message, or with attention if checkAbort asks to stop.
-- abort is set TRUE if the connection leaves the open state during the transfer.
-- NOTE(review): the scratch buffer from RefText.ObtainScratch is never handed back;
-- releasing it would need an extra name in the RefText USING list.
ServerSendBulkData: CrRPC.BulkDataXferProc
  <<[h: Handle, stream: IO.STREAM, checkAbort: BulkDataCheckAbortProc] RETURNS [abort: BOOL]>>
  ~ {
  dH: DataHandle ~ NARROW[h.data];
  bufSize: NAT ~ 1024;
  b: REF TEXT _ RefText.ObtainScratch[bufSize];
  nBytes: INT;
  state: XNSStream.State;
  ssType: XNSSPPTypes.SubSequenceType;
  abort _ FALSE;
  ServerRdyBDTSend[h];
  XNSStream.SetSSType[self~dH.stream, ssType~1]; -- Define 1 ????
  {
  DO
    -- Stop if the other end has done anything to the connection.
    [state~state, ssType~ssType] _ XNSStream.GetStatus[dH.stream, FALSE];
    SELECT state FROM
      open => NULL;
      attention, endOfMessage, ssTypeChange => {
        abort _ TRUE;
        GOTO FinishWithEOM };
      ENDCASE => ERROR;
    -- Stop if the local caller wants to abort.
    IF checkAbort[h].abort THEN {
      GOTO FinishWithAttention };
    -- Move the next chunk; an empty read from the source ends the transfer.
    SELECT nBytes _ IO.GetBlock[self~stream, block~b] FROM
      0 => {
        GOTO FinishWithEOM };
      ENDCASE => {
        IO.PutBlock[self~dH.stream, block~b, count~nBytes];
        };
    ENDLOOP;
  EXITS
    FinishWithAttention =>
      XNSStream.SendAttention[dH.stream, 1]; -- Define 1 ????
    FinishWithEOM =>
      XNSStream.SendEndOfMessage[dH.stream];
  };
  XNSStream.SetSSType[self~dH.stream, ssType~0]; -- Define 0 ????
  };

-- Discard the pending half of a byte pair so the next Get/Put starts on a halfword boundary.
HAlign: PROC [h: Handle] ~ {
  dH: DataHandle ~ NARROW[h.data];
  dH.odd _ FALSE };

-- Client handle cache
-- Destroyed client handles are kept on the list "cache" (linked through DataObject.next)
-- for reuse by CreateClientHandle; CacheSweeper finishes those unused for longer than
-- cachePulseOut.

cache: DataHandle _ NIL;
cachePulseOut: BasicTime.Pulses ~ BasicTime.MicrosecondsToPulses[60 * 1000000];
cacheSweepInterval: Process.Ticks ~ Process.SecondsToTicks[21];

PulsesSince: PROC [then: BasicTime.Pulses] RETURNS [BasicTime.Pulses] ~ {
  RETURN [ BasicTime.GetClockPulses[] - then ] };

-- Remove and return the first cached handle whose remote address equals remote, or NIL.
CheckOutFromCache: ENTRY PROC [remote: XNS.Address] RETURNS [h: Handle _ NIL] ~ {
  dH: DataHandle _ cache;
  prevH: DataHandle _ NIL;
  WHILE dH # NIL DO
    IF dH.remote = remote THEN {
      IF prevH = NIL THEN cache _ dH.next ELSE prevH.next _ dH.next;
      h _ dH.handle;
      dH.handle _ NIL; -- for finalization
      EXIT };
    prevH _ dH;
    dH _ dH.next;
    ENDLOOP;
  };

-- Put a handle at the head of the cache, stamping it with the current time.
CheckInToCache: ENTRY PROC [h: Handle] ~ {
  dH: DataHandle _ NARROW[h.data];
  dH.handle _ h;
  dH.lastUsed _ BasicTime.GetClockPulses[];
  dH.next _ cache;
  cache _ dH };

-- Background loop: every cacheSweepInterval, unlink expired entries under the monitor
-- and then finish them outside it.
CacheSweeper: PROC ~ {

  GetDeadHandles: ENTRY PROC RETURNS [deadList: DataHandle _ NIL] ~ {
    -- Split the cache into a list of survivors (order preserved) and a list of expired entries.
    dH, newCache, newCacheTail: DataHandle _ NIL;
    WHILE cache # NIL DO
      dH _ cache;
      cache _ cache.next;
      IF PulsesSince[dH.lastUsed] <= cachePulseOut
        THEN -- add to new cache --
          {
          dH.next _ NIL;
          IF newCacheTail = NIL
            THEN newCache _ dH
            ELSE newCacheTail.next _ dH;
          newCacheTail _ dH }
        ELSE -- add to dead list --
          {
          dH.next _ deadList;
          deadList _ dH };
      ENDLOOP;
    cache _ newCache;
    };

  DestroyDeadHandles: PROC [deadList: DataHandle] ~ {
    dH: DataHandle;
    WHILE deadList # NIL DO
      dH _ deadList;
      deadList _ dH.next;
      dH.next _ NIL;
      FinishClientHandle[dH.handle];
      ENDLOOP;
    };

  DO
    DestroyDeadHandles[ GetDeadHandles[] ];
    Process.Pause[ cacheSweepInterval ];
    ENDLOOP;
  };

-- Client handle operations

-- Return a handle for the given remote address with the same timeout as h. If h already
-- refers to that address it is returned unchanged; otherwise h goes back to the cache and
-- a handle is obtained from CreateClientHandle.
SetRemote: PROC [h: Handle, remote: XNS.Address] RETURNS [Handle] ~ {
  dH: DataHandle ~ NARROW[h.data];
  timeoutMsec: INT _ dH.timeoutMsec;
  -- The socket is always forced to the courier socket (CreateClientHandle does the same).
  -- A preceding "IF remote.socket = XNS.unknownSocket" test was dead code, since this
  -- assignment overwrote its result, and has been removed.
  remote.socket _ XNSWKS.courier;
  IF dH.remote = remote THEN RETURN[h];
  CheckInToCache[h];
  h _ CreateClientHandle[remote, timeoutMsec];
  RETURN [h] };
defaultTimeoutMsec: INT ~ 15000;
minTimeoutMsec: INT ~ 100;

-- Set the handle's get/put timeout. 0 selects defaultTimeoutMsec; other values are raised
-- to at least minTimeoutMsec. Always returns h itself.
SetTimeout: PROC [h: Handle, timeoutMsec: INT] RETURNS [Handle] ~ {
  dH: DataHandle ~ NARROW[h.data];
  timeoutMsec _ IF timeoutMsec = 0
    THEN defaultTimeoutMsec
    ELSE MAX[timeoutMsec, minTimeoutMsec];
  IF timeoutMsec = dH.timeoutMsec THEN RETURN[h];
  IF dH.stream # NIL THEN {
    XNSStream.SetTimeouts[dH.stream, timeoutMsec, timeoutMsec
      ! XNSStream.ConnectionClosed => {
        -- The connection is gone: close the stream before dropping the reference
        -- (previously the reference was just cleared and the stream was never closed).
        IO.Close[self~dH.stream, abort~TRUE];
        dH.stream _ NIL;
        CONTINUE } ];
    };
  dH.timeoutMsec _ timeoutMsec;
  RETURN [h] };

-- Not supported for this class of handle: always raises CrRPC.Error[notImplemented].
SetHops: PROC [h: Handle, low, high: NAT] RETURNS [Handle] ~ {
  ERROR CrRPC.Error[h, notImplemented, "not a broadcast handle"];
  };

-- Dispose of a client handle: unlink it from its data object and close the stream.
-- Used both as the class's finalize procedure and by CacheSweeper.
FinishClientHandle, Finalize: PROC [h: Handle] ~ {
  dH: DataHandle ~ NARROW[h.data];
  dH.handle _ NIL; -- break cycle for GC
  SendCloseStream[dH];
  };

-- Return a client handle for the courier socket at remote, reusing a cached handle for
-- that address when there is one and creating a new (unconnected) handle otherwise.
CreateClientHandle: CrRPCFriends.CreateClientHandleProc
  <<[remote: XNS.Address, timeoutMsec: INT] RETURNS [Handle]>>
  ~ {
  handle: Handle;
  dH: DataHandle;
  b: REF TEXT;
  remote.socket _ XNSWKS.courier;
  handle _ CheckOutFromCache[remote];
  IF handle = NIL THEN {
    b _ RefText.New[nChars~bufferBodyBytes];
    dH _ NEW[DataObject _ [
      lastUsed~BasicTime.GetClockPulses[],
      remote~remote,
      timeoutMsec~timeoutMsec,
      b~b
      ]];
    handle _ CrRPCFriends.NewClientObject[class~$SPP, marshallProcs~theMarshallProcs, procs~theClientProcs, data~dH];
    };
  -- Normalizes the timeout (and applies it to a reused handle's stream, if any).
  IF SetTimeout[handle, timeoutMsec] # handle THEN ERROR;
  RETURN [handle];
  };

-- Module start code: register this class and start the cache sweeper.
CrRPCFriends.RegisterCreateClientHandleProc[$SPP, CreateClientHandle];
CrRPCFriends.RegisterCreateListenerProc[$SPP, CreateListener];
TRUSTED { Process.Detach[ FORK CacheSweeper[] ] };
}.