<<CrRPCCMUXImpl: CrRPC transport for handle class $CMUX, built on the XR_CMUX* runtime entry points declared below.>>
<<NOTE: `--` comments end at end of line, so the line breaks in this file are significant.>>

DIRECTORY
  Basics,
  CrRPC,
  CrRPCBuggy,
  CrRPCBackdoor,
  IO,
  Process,
  ProcStream,
  RefText,
  Rope,
  RuntimeError,
  SimpleFeedback,
  --UnixTypes,
  XNS,
  XNSWKS;

CrRPCCMUXImpl: CEDAR MONITOR
  IMPORTS Basics, CrRPC, CrRPCBackdoor, IO, Process, ProcStream, RefText, RuntimeError, SimpleFeedback
  EXPORTS CrRPCBuggy
  ~ {

  <<Debugging>>

  -- When TRUE, progress and error messages are reported through DebugMsg.
  debugMsgs: BOOL ¬ FALSE;

  -- Turns debug messages on (val # 0) or off (val = 0); returns the previous setting as 1 or 0.
  CRRPCCMUXSetDebugMsgs: PROC [val: INT] RETURNS [oldVal: INT] ~ {
    oldVal ¬ (IF debugMsgs THEN 1 ELSE 0);
    debugMsgs ¬ (val # 0);
    };

  -- Appends r to the $CrRPC feedback stream as a $debugging message.
  DebugMsg: PROC [r: Rope.ROPE] ~ TRUSTED {
    SimpleFeedback.Append[$CrRPC, oneLiner, $debugging, r];
    };

  <<Types, constants and parameters>>

  -- The subrange type BOOL[TRUE..TRUE] only admits TRUE, i.e. this requires BYTES[INT32] = UNITS[INT32].
  byteAddressedOrElse: BOOL[TRUE..TRUE] ¬ (BYTES[INT32] = UNITS[INT32]);

  BytePtr: TYPE ~ LONG POINTER TO BYTE;

  -- When TRUE, CreateClientHandle always overwrites the socket of the remote address with XNSWKS.courier.
  alwaysSmashTheSocket: BOOL ¬ TRUE;

  myHandleClass: CrRPC.HandleClass ¬ $CMUX;

  serverPauseMsec: CARD32 ¬ 2500; -- after bad error
  serverGetTimeoutMsec: CARD32 ¬ 2500; -- controls responsiveness of server shutdown
  waitForever: CARD32 ¬ 0; -- timeout value passed by Call when reading the reply

  FWORD: TYPE ~ Basics.FWORD;
  HWORD: TYPE ~ Basics.HWORD;
  ROPE: TYPE ~ Rope.ROPE;
  STREAM: TYPE ~ IO.STREAM;
  Ticks: TYPE ~ Process.Ticks;
  Handle: TYPE ~ CrRPC.Handle;

  <<Interface to the CMUX runtime (MACHINE CODE procedures)>>

  CProc: TYPE ~ RECORD[CARD32]; -- Opaque C procedure

  XRCProcFromMProc: PROC [PROC] RETURNS [CProc]
    ~ TRUSTED MACHINE CODE { "XR_CProcFromMProc" };

  CMUXDescriptor: TYPE ~ RECORD [ val: INT32 ];
  nullCMUXDescriptor: CMUXDescriptor ~ [-1];

  -- A negative descriptor value is returned as the error code; a non-negative one yields 0.
  ErrCodeFromCMUXDescriptor: PROC [d: CMUXDescriptor] RETURNS [INT] ~ INLINE {
    RETURN [IF d.val >= 0 THEN 0 ELSE d.val]
    };

  -- Error numbers; the XR_CMUX* procedures report them negated (see the uses of -EIO, -ERANGE, -ETIMEDOUT, -EBADMSG).
  EIO: INT ~ 5;
  ERANGE: INT ~ 34;
  ETIMEDOUT: INT ~ 60;
  EBADMSG: INT ~ 76;

  XRCMUXCreate: PROC RETURNS [CMUXDescriptor]
    ~ TRUSTED MACHINE CODE { "XR_CMUXCreate" };

  XRCMUXDestroy: PROC [d: CMUXDescriptor]
    ~ TRUSTED MACHINE CODE { "XR_CMUXDestroy" };

  CMUXAddrType: TYPE ~ MACHINE DEPENDENT {
    defaultAddr(0), defaultName(1), xnsAddr(2), xnsName(3), last(CARD.LAST)
    };

  XRCMUXConnect: PROC [d: CMUXDescriptor, type: CMUXAddrType, addr: BytePtr, addrLen: CARD32] RETURNS [INT]
    ~ TRUSTED MACHINE CODE { "XR_CMUXConnect" };
CmuxAllocaterProc: TYPE ~ CProc; XRCMUXReadMsg: UNSAFE PROC [d: CMUXDescriptor, ignoreAborts: BOOL, timeoutMsec: CARD32, allocaterFunc: CmuxAllocaterProc, allocaterClientData: REF] RETURNS [nBytesRead: INT] ~ TRUSTED MACHINE CODE { "XR_CMUXReadMsg" }; XRCMUXWriteMsg: PROC [d: CMUXDescriptor, hdr: BytePtr, hdrBytes: CARD32, body: BytePtr, bodyBytes: CARD32] RETURNS [INT] ~ TRUSTED MACHINE CODE { "XR_CMUXWriteMsg" }; XRCMUXBDTRead: UNSAFE PROC [d: CMUXDescriptor, buf: BytePtr, bufBytes: CARD32] RETURNS [nBytesRead: INT] ~ TRUSTED MACHINE CODE { "XR_CMUXBDTRead" }; XRCMUXBDTWrite: PROC [d: CMUXDescriptor, buf: BytePtr, bufBytes: CARD32] RETURNS [nBytesWritten: INT] ~ TRUSTED MACHINE CODE { "XR_CMUXBDTWrite" }; XRCMUXFinishBDT: PROC [d: CMUXDescriptor, source: BOOL, sendAbort: BOOL] RETURNS[INT] ~ TRUSTED MACHINE CODE { "XR_CMUXFinishBDT" }; XRCMUXServerAdd: PROC [d: CMUXDescriptor, pgm: CARD32, loVersion: CARD32, hiVersion: CARD32] RETURNS[INT] ~ TRUSTED MACHINE CODE { "XR_CMUXServerAdd" }; XRCMUXServerQuit: PROC [d: CMUXDescriptor] RETURNS[INT] ~ TRUSTED MACHINE CODE { "XR_CMUXServerQuit" }; <> BDTState: TYPE ~ { ok, -- normal eof, -- got eof on input abort, -- got abort from other end localAbort, -- aborted from this side error -- got fatal I/O error }; Phase: TYPE ~ { inactive, msgIn, msgOut, bdtIn, bdtOut }; DataHandle: TYPE ~ REF DataObject; DataObject: TYPE ~ RECORD [ next: DataHandle ¬ NIL, handle: Handle ¬ NIL, descriptor: CMUXDescriptor ¬ nullCMUXDescriptor, lastUsed: Ticks ¬ 0, buf: REF TEXT ¬ NIL, inStream: STREAM ¬ NIL, outStream: STREAM ¬ NIL, stream: STREAM ¬ NIL, bdtState: BDTState ¬ ok, phase: Phase ¬ inactive, tID: HWORD ¬ [0, 0], -- of current call remote: REF XNS.Address ¬ NIL, sendBulkData: CrRPC.BulkDataXferProc ¬ NIL, -- for client handles recvBulkData: CrRPC.BulkDataXferProc ¬ NIL, -- for client handles continuation: CrRPC.ContinuationProc ¬ NIL, -- for server handles continuationClientData: REF -- for server handles ]; theClientProcs: REF 
CrRPC.ProcsObject ~ NEW[ CrRPC.ProcsObject ¬ [ destroy~CheckInToCache, setParameters~SetParameters, call~Call, getBulkDataSource~ErrorGetBulkDataSource, getBulkDataSink~ErrorGetBulkDataSink, putBulkDataSource~PutBulkDataSource, putBulkDataSink~PutBulkDataSink, readBulkDataStream~ReadBulkDataStream, writeBulkDataSegment~WriteBulkDataSegment, callContinuation~CallContinuation, setContinuation~ErrorSetContinuation, finalize~Finalize ] ]; theServerProcs: REF CrRPC.ProcsObject ~ NEW[ CrRPC.ProcsObject ¬ [ destroy~CheckInToCache, setParameters~SetParameters, call~ErrorCall, getBulkDataSource~GetBulkDataSource, getBulkDataSink~GetBulkDataSink, putBulkDataSource~ErrorPutBulkDataSource, putBulkDataSink~ErrorPutBulkDataSink, readBulkDataStream~ReadBulkDataStream, writeBulkDataSegment~WriteBulkDataSegment, callContinuation~ErrorCallContinuation, setContinuation~SetContinuation, finalize~ErrorFinalize ] ]; <> RaiseError: PROC [h: Handle, reason: CrRPC.ErrorReason, errno: INT, msg: ROPE] ~ { IF errno < 0 THEN errno ¬ -errno; IF errno > 0 THEN msg ¬ IO.PutFR["%g (errno %g)", IO.rope[msg], IO.int[errno]]; IF reason = unknown THEN reason ¬ other; IF debugMsgs THEN DebugMsg[IO.PutFR["RaiseError reason %g msg %g\n", IO.int[ORD[reason]], IO.rope[msg]]]; ERROR CrRPC.Error[h, reason, msg]; }; <> <> <<$GetRemote returns a REF to the XNS.Address of the remote site associated with this handle. 
(Works for either client or server handles).>>
  -- NOTE(review): nothing in this module sets dH.remote for server handles (StartServerInstance
  -- creates the DataObject with defaults), so for them $GetRemote raises CrRPC.Error rather than
  -- dereferencing NIL.
  SetParameters: CrRPC.SetParametersProc
    <<[h: Handle, op: ATOM, argument: REF] RETURNS [hNew: Handle, result: REF]>>
    ~ {
    dH: DataHandle ~ NARROW[h.data];
    hNew ¬ h;
    SELECT op FROM
      $GetRemote => {
        IF dH.remote = NIL THEN RaiseError[h, other, 0, "remote address not known for this handle"];
        result ¬ NEW[XNS.Address ¬ (dH.remote)­]; -- hand out a copy, not the handle's own REF
        };
      ENDCASE => ERROR CrRPC.Error[h, unknownOperation, NIL];
    };

  <<Client>>

  -- Creates a CMUX descriptor and connects it to dH.remote.
  -- It is an ERROR to call this while the handle already has a descriptor.
  -- Raises CrRPC.Error (communicationFailure / cantConnectToRemote) on failure; the descriptor is destroyed if the connect fails.
  ConnectToServer: PROC [h: Handle] ~ {
    dH: DataHandle ~ NARROW[h.data];
    d: CMUXDescriptor;
    ans: INT;
    IF dH.descriptor # nullCMUXDescriptor THEN ERROR;
    d ¬ XRCMUXCreate[];
    ans ¬ ErrCodeFromCMUXDescriptor[d];
    IF ans # 0 THEN RaiseError[h, communicationFailure, ans, "can't create CMUX descriptor"];
    ans ¬ XRCMUXConnect[d, xnsAddr, LOOPHOLE[dH.remote], BYTES[XNS.Address]];
    IF ans # 0 THEN {
      XRCMUXDestroy[d];
      RaiseError[h, cantConnectToRemote, ans, "can't set CMUX descriptor address"];
      };
    dH.descriptor ¬ d;
    };

  -- Destroys the handle's CMUX descriptor, if any, and marks the handle as unconnected.
  -- A no-op when there is no descriptor.
  DisconnectFromServer: PROC [h: Handle] ~ {
    dH: DataHandle ~ NARROW[h.data];
    IF dH.descriptor # nullCMUXDescriptor THEN {
      XRCMUXDestroy[dH.descriptor];
      dH.descriptor ¬ nullCMUXDescriptor;
      };
    };

  ErrorCall: CrRPC.CallProc ~ {
    RaiseError[h, notClientHandle, 0, "call using server handle"]
    };

  -- Buffer allocator passed (via XRCProcFromMProc) to XRCMUXReadMsg.
  -- clientData is the DataHandle. Ensures dH.buf can hold startIndex+nBytes bytes, preserving the
  -- first startIndex bytes when the buffer has to grow, and returns the address of byte startIndex.
  GetMsgAllocater: -- CmuxAllocaterProc -- PROC [startIndex: CARD32, nBytes: CARD32, clientData: REF] RETURNS [BytePtr] ~ TRUSTED {
    dH: DataHandle ¬ NARROW[clientData];
    limit: CARD ¬ startIndex+nBytes;
    IF dH.buf = NIL THEN {
      IF startIndex # 0 THEN ERROR; -- should be less extreme ... ajd
      dH.buf ¬ RefText.New[limit];
      };
    IF dH.buf.maxLength < startIndex THEN ERROR; -- should be less extreme ... ajd
    IF dH.buf.maxLength < limit THEN {
      newBuf: REF TEXT ¬ RefText.New[limit];
      IF startIndex > 0 THEN TRUSTED {
        -- copy the bytes already received into the larger buffer
        [] ¬ Basics.ByteBlt[
          from ~ [
            blockPointer ~ LOOPHOLE[dH.buf],
            startIndex ~ BYTES[TEXT[0]],
            stopIndexPlusOne ~ BYTES[TEXT[0]] + startIndex
            ],
          to ~ [
            blockPointer ~ LOOPHOLE[newBuf],
            startIndex ~ BYTES[TEXT[0]],
            stopIndexPlusOne ~ BYTES[TEXT[0]] + startIndex
            ]
          ];
        };
      dH.buf ¬ newBuf;
      };
    RETURN[ LOOPHOLE[dH.buf, BytePtr] + BYTES[TEXT[0]] + startIndex ];
    };

  -- ProcStream get procedure for bulk data; clientData is the DataHandle.
  -- Reads via XRCMUXBDTRead while bdtState = ok; after eof/abort it returns 0 bytes, after error -EIO.
  -- A read result of -EBADMSG is taken as an abort from the other end and reported as 0 bytes.
  -- atEnd is FALSE only when the block was filled completely.
  BDTGetProc: ProcStream.GetProc -- UNSAFE PROC [clientData: REF, offset: CARD, block: IO.UnsafeBlock] RETURNS[nBytesRead: INT, atEnd: BOOL] -- ~ {
    dH: DataHandle ¬ NARROW[clientData];
    IF debugMsgs THEN DebugMsg[IO.PutFR1["BDTGetProc req %g ", IO.int[block.count]]];
    SELECT dH.bdtState FROM
      ok => TRUSTED {
        nBytesRead ¬ XRCMUXBDTRead[dH.descriptor, LOOPHOLE[block.base + block.startIndex], block.count];
        IF debugMsgs THEN DebugMsg["calling CMUXBDRTRead, "];
        };
      eof, abort, localAbort => {
        nBytesRead ¬ 0;
        IF debugMsgs THEN DebugMsg["already eof/abort, "];
        };
      error => {
        nBytesRead ¬ -EIO;
        IF debugMsgs THEN DebugMsg["already error, "];
        };
      ENDCASE => ERROR;
    atEnd ¬ TRUE;
    SELECT nBytesRead FROM
      block.count => atEnd ¬ FALSE;
      (-EBADMSG) => { dH.bdtState ¬ abort; nBytesRead ¬ 0 };
      < 0 => dH.bdtState ¬ error;
      < block.count => IF dH.bdtState = ok THEN dH.bdtState ¬ eof;
      ENDCASE => ERROR;
    IF debugMsgs THEN DebugMsg[IO.PutFR["BDTGetProc ret bytes %g atEnd %g\n", IO.int[nBytesRead], IO.bool[atEnd]]];
    };

  -- ProcStream put procedure for bulk data; clientData is the DataHandle.
  -- Writes via XRCMUXBDTWrite while bdtState = ok. After an abort the data is discarded but reported
  -- as written; after error it returns -EIO. A write result of -ERANGE is taken as an abort from the other end.
  BDTPutProc: ProcStream.PutProc -- PROC [clientData: REF, offset: CARD, block: IO.UnsafeBlock] RETURNS[nBytesWritten: INT] -- ~ {
    dH: DataHandle ¬ NARROW[clientData];
    SELECT dH.bdtState FROM
      ok => TRUSTED {
        nBytesWritten ¬ XRCMUXBDTWrite[dH.descriptor, LOOPHOLE[block.base + block.startIndex], block.count];
        };
      abort, localAbort => {
        nBytesWritten ¬ block.count;
        };
      error => {
        nBytesWritten ¬ -EIO;
        };
      eof => {
        ERROR;
        };
      ENDCASE => ERROR;
    SELECT nBytesWritten FROM
      (-ERANGE) => { dH.bdtState ¬ abort; nBytesWritten ¬ block.count };
      < 0 => dH.bdtState ¬ error;
      ENDCASE => NULL;
    };

  -- Performs one remote call on a client handle:
  --   1. connect if necessary;  2. marshal the arguments with putArgs;  3. send the call message;
  --   4. run the bulk data transfer, if putArgs registered one;  5. read the reply;
  --   6. dispatch on the reply type (return / abort / reject).
  -- I/O signals raised along the way are converted to CrRPC.Error; if steps 3-5 are unwound the
  -- connection is dropped.
  Call: CrRPC.CallProc -- [h: Handle, remotePgm: CARD32, remotePgmVersion: CARD16, remoteProc: CARD16, putArgs: PutArgsProc, getResults: GetResultsProc, getError: GetErrorProc] --
    ~ {
    dH: DataHandle ~ NARROW[h.data];

    BDTCheckAbort: CrRPC.BulkDataCheckAbortProc ~ {
      RETURN[ dH.bdtState = abort ];
      };

    BEGIN
    ENABLE {
      IO.Error -- [ec, stream] -- => {
        msg: ROPE ¬ IO.PutFR1[ "I/O Error, ec %g", IO.card[ORD[ec]] ];
        SELECT dH.phase FROM
          bdtIn, bdtOut => {
            -- the stream is the ProcStream used for the BDT; add its error details
            tCode: INT;
            tMsg: ROPE;
            [code~tCode, msg~tMsg] ¬ ProcStream.GetErrorDetails[stream];
            msg ¬ IO.PutFR["%g (BDT error %g: %g)", IO.rope[msg], IO.int[ABS[tCode]], IO.rope[tMsg]];
            };
          ENDCASE;
        RaiseError[h, communicationFailure, 0, msg];
        };
      IO.EndOfStream -- [stream] -- => {
        msg: ROPE ¬ ( SELECT dH.phase FROM
          msgOut => "I/O EndOfStream, args send (!)",
          msgIn => "I/O EndOfStream, results recv",
          bdtIn => "I/O EndOfStream, BDT read",
          bdtOut => "I/O EndOfStream, BDT write (!)",
          ENDCASE => "I/O EndOfStream, inactive (!)" );
        RaiseError[h, remoteClose, 0, msg];
        };
      RuntimeError.BoundsFault => {
        RaiseError[h, protocolError, 0, "data bounds fault"];
        };
      };

    IF dH.descriptor = nullCMUXDescriptor THEN ConnectToServer[h];
    dH.sendBulkData ¬ NIL;
    dH.recvBulkData ¬ NIL;

    -- Marshal the arguments into a text stream (putArgs may register a BDT proc via PutBulkDataSource/Sink).
    dH.phase ¬ msgOut;
    dH.outStream ¬ IO.TOS[dH.buf, dH.outStream];
    dH.buf ¬ NIL;
    IF putArgs # NIL THEN putArgs[h, dH.outStream];

    -- Recover the marshalled arguments as the message body.
    dH.buf ¬ IO.TextFromTOS[dH.outStream];

    BEGIN
    ENABLE UNWIND => DisconnectFromServer[h];

    -- Send the call message: header followed by the marshalled arguments.
    TRUSTED {
      callMsgHdr: CrRPCBackdoor.CallMsgHdr ¬ [
        msgHdr~[msgType~CrRPCBackdoor.callMsgType],
        callHdr~[
          tID~[0, 0],
          pgmNum~Basics.FFromCard32[remotePgm],
          pgmVersion~Basics.HFromCard16[remotePgmVersion],
          procNum~Basics.HFromCard16[remoteProc]
          ]
        ];
      ans: INT;
      IF debugMsgs THEN DebugMsg[IO.PutFR["call pgm %g proc %g args %g bytes\n", IO.card[remotePgm], IO.card[remoteProc], IO.card[dH.buf.length]]];
      ans ¬ XRCMUXWriteMsg[dH.descriptor, LOOPHOLE[@callMsgHdr], CrRPCBackdoor.callMsgHdrBytes, LOOPHOLE[dH.buf, BytePtr] + BYTES[TEXT[0]], dH.buf.length];
      IF ans < 0 THEN RaiseError[h, communicationFailure, ans, "can't send call msg"];
      };

    -- Bulk data transfer, if any.
    BEGIN
    bdtProc: CrRPC.BulkDataXferProc ¬ NIL;
    localAbort: BOOL;
    ans: INT;
    SELECT TRUE FROM
      (dH.sendBulkData # NIL) => {
        -- this end is the source: data flows out (same convention as ServerSendBulkData)
        TRUSTED { bdtProc ¬ dH.sendBulkData };
        dH.phase ¬ bdtOut;
        };
      (dH.recvBulkData # NIL) => {
        -- this end is the sink: data flows in (same convention as ServerRecvBulkData)
        TRUSTED { bdtProc ¬ dH.recvBulkData };
        dH.phase ¬ bdtIn;
        };
      ENDCASE;
    IF bdtProc # NIL THEN {
      dH.bdtState ¬ ok;
      dH.stream ¬ ProcStream.PIOS[dH, BDTGetProc, BDTPutProc, NIL, NIL, dH.stream, FALSE, FALSE];
      IF debugMsgs THEN DebugMsg[IO.PutFR["BDT xfer stream %x (%g)\n", IO.int[LOOPHOLE[dH.stream]], IO.rope[IF dH.sendBulkData # NIL THEN "source" ELSE "sink"]]];
      localAbort ¬ bdtProc[h, dH.stream, BDTCheckAbort];
      IF (dH.sendBulkData # NIL) AND (NOT localAbort) THEN IO.Flush[dH.stream];
      -- `source` says whether this end produced the data, matching CleanupServerBDT
      -- (FALSE for the reading side, TRUE for the writing side).
      ans ¬ XRCMUXFinishBDT[dH.descriptor, (dH.sendBulkData # NIL), localAbort];
      IF ans < 0 THEN RaiseError[h, bulkDataError, ans, "bulk data transfer failed"];
      IF debugMsgs THEN DebugMsg["BDT xfer done"];
      IF dH.recvBulkData # NIL THEN {
        charsRead, charsUnread: INT;
        charsRead ¬ IO.GetIndex[dH.stream];
        IF debugMsgs THEN DebugMsg[IO.PutFR1[" (%g bytes read, ", IO.int[charsRead]]];
        charsUnread ¬ IO.CharsAvail[dH.stream, FALSE];
        IF charsUnread = INT.LAST
          THEN { IF debugMsgs THEN DebugMsg["end of stream)"] }
          ELSE { IF debugMsgs THEN DebugMsg[IO.PutFR1["%g unread", IO.int[charsUnread]]] };
        };
      IF debugMsgs THEN DebugMsg["\n"];
      };
    END;

    -- Read the reply; GetMsgAllocater supplies dH.buf as the receive buffer.
    TRUSTED {
      ans: INT;
      dH.phase ¬ msgIn;
      ans ¬ XRCMUXReadMsg[dH.descriptor, TRUE, waitForever, XRCProcFromMProc[LOOPHOLE[GetMsgAllocater]], dH];
      IF ans < 0 THEN RaiseError[h, communicationFailure, ans, "can't read reply msg"];
      IF debugMsgs THEN DebugMsg[IO.PutFR["reply pgm %g proc %g reply %g bytes\n", IO.card[remotePgm], IO.card[remoteProc], IO.int[ans]]];
      dH.buf.length ¬ ans;
      };

    END;

    -- Parse the reply. In each case the HWord following the message type is read and discarded.
    BEGIN
    msgType: HWORD;
    dH.inStream ¬ IO.TIS[dH.buf];
    dH.buf ¬ NIL;
    msgType ¬ CrRPC.GetHWord[dH.inStream];
    SELECT msgType FROM
      CrRPCBackdoor.rejectMsgType => {
        rejectReason: CARDINAL;
        [] ¬ CrRPC.GetHWord[dH.inStream];
        rejectReason ¬ CrRPC.GetCard16[dH.inStream];
        RaiseError[h,
          (SELECT rejectReason FROM
            CrRPC.noSuchProgram => rejectedNoSuchProgram,
            CrRPC.noSuchVersion => rejectedNoSuchVersion,
            CrRPC.noSuchProcedure => rejectedNoSuchProcedure,
            CrRPC.invalidArgument => rejectedInvalidArgument,
            ENDCASE => rejectedUnspecified),
          0, "rejected"];
        };
      CrRPCBackdoor.returnMsgType => {
        [] ¬ CrRPC.GetHWord[dH.inStream];
        IF getResults # NIL THEN getResults[h, dH.inStream];
        };
      CrRPCBackdoor.abortMsgType => {
        errNum: CARDINAL;
        [] ¬ CrRPC.GetHWord[dH.inStream];
        errNum ¬ CrRPC.GetCard16[dH.inStream];
        IF getError # NIL THEN getError[h, dH.inStream, errNum];
        -- reached only if getError is NIL or returns normally
        RaiseError[h, remoteError, 0, "unexpected remote error"];
        };
      ENDCASE => {
        DisconnectFromServer[h];
        RaiseError[h, protocolError, 0, IO.PutFR1["bad msgType (%g) in response", IO.card[Basics.Card16FromH[msgType]]]];
        };
    END;

    dH.phase ¬ inactive;
    END;
    }; -- of Call

  <<Server>>

  -- Gives the server handle a CMUX descriptor registered for (pgm, pgmVersion), unless it already has one.
  -- Raises CrRPC.Error (communicationFailure) on failure.
  RefreshServerCMUXDescriptor: PROC [h: Handle, pgm: CARD32, pgmVersion: CARD16] ~ {
    ans: INT;
    dH: DataHandle;
    d: CMUXDescriptor;
    dH ¬ NARROW[h.data];
    IF dH.descriptor # nullCMUXDescriptor THEN RETURN;
    d ¬ XRCMUXCreate[];
    ans ¬ ErrCodeFromCMUXDescriptor[d];
    IF ans # 0 THEN RaiseError[h, communicationFailure, ans, "can't create CMUX descriptor"];
    ans ¬ XRCMUXServerAdd[d, pgm, CARD32[pgmVersion], CARD32[pgmVersion]];
    IF ans # 0 THEN {
      XRCMUXDestroy[d];
      RaiseError[h, communicationFailure, ans, "can't add service to CMUX"];
      };
    dH.descriptor ¬ d;
    };

  -- Withdraws the service and destroys the server handle's descriptor, if it has one.
  SmashServerCMUXDescriptor: PROC [h: Handle] ~ {
    dH: DataHandle;
    d: CMUXDescriptor;
    dH ¬ NARROW[h.data];
    IF (d ¬ dH.descriptor) = nullCMUXDescriptor THEN RETURN;
    [] ¬ XRCMUXServerQuit[d];
    XRCMUXDestroy[d];
    dH.descriptor ¬ nullCMUXDescriptor;
    };

  -- Finishes any server-side BDT left in progress by the server proc, then enters phase msgOut.
  -- Raises CrRPC.Error (bulkDataError) if finishing the transfer fails.
  CleanupServerBDT: PROC [h: Handle, dH: DataHandle] ~ {
    ans: INT ¬ 0;
    SELECT dH.phase FROM
      bdtIn => {
        ans ¬ XRCMUXFinishBDT[dH.descriptor, FALSE, (dH.bdtState = localAbort)];
        };
      bdtOut => {
        IF dH.bdtState = ok THEN IO.Flush[dH.stream];
        ans ¬ XRCMUXFinishBDT[dH.descriptor, TRUE, (dH.bdtState = localAbort)];
        };
      ENDCASE;
    IF ans < 0 THEN RaiseError[h, bulkDataError, ans, "bulk data transfer failed"];
    dH.phase ¬ msgOut;
    };

  -- Starts a return message: message type, then the transaction ID of the current call.
  BeginReturn: CrRPC.BeginReturnProc -- [h: Handle] -- ~ {
    dH: DataHandle ¬ NARROW[h.data];
    s: STREAM;
    CleanupServerBDT[h, dH];
    s ¬ dH.stream ¬ IO.TOS[dH.buf, dH.stream];
    dH.buf ¬ NIL;
    CrRPC.PutHWord[s, CrRPCBackdoor.returnMsgType];
    CrRPC.PutHWord[s, dH.tID];
    };

  -- Starts an abort (remote error) message: message type, transaction ID, error number.
  BeginError: CrRPC.BeginErrorProc -- [h: Handle, errNum: CARDINAL] -- ~ {
    dH: DataHandle ¬ NARROW[h.data];
    s: STREAM;
    CleanupServerBDT[h, dH];
    s ¬ dH.stream ¬ IO.TOS[dH.buf, dH.stream];
    dH.buf ¬ NIL;
    CrRPC.PutHWord[s, CrRPCBackdoor.abortMsgType];
    CrRPC.PutHWord[s, dH.tID];
    CrRPC.PutCard16[s, errNum];
    };

  -- Starts a reject message: message type, transaction ID, reject reason.
  BeginReject: CrRPC.BeginRejectProc -- [h: Handle, rejectReason: CARDINAL] -- ~ {
    dH: DataHandle ¬ NARROW[h.data];
    s: STREAM;
    CleanupServerBDT[h, dH];
    s ¬ dH.stream ¬ IO.TOS[dH.buf, dH.stream];
    dH.buf ¬ NIL;
    CrRPC.PutHWord[s, CrRPCBackdoor.rejectMsgType];
    CrRPC.PutHWord[s, dH.tID];
    CrRPC.PutCard16[s, rejectReason];
    };

  -- Server loop, run in its own process (see StartServerInstance).
  -- Inner loop: read a call message, check it, run serverProc, send the reply it built.
  -- When the inner loop ends (read failure, or an I/O or CrRPC error caught by the ENABLE), the
  -- outer loop smashes the descriptor, pauses serverPauseMsec, and tries to re-register.
  -- The daemon ends when stopServerQueryProc returns TRUE.
  ServerDaemon: PROC [h: Handle, pgm: CARD32, pgmVersion: CARD16, serverProc: CrRPC.ServerProc, stopServerQueryProc: CrRPC.StopServerQueryProc] ~ {
    dH: DataHandle ¬ NARROW[h.data];
    pgmF: FWORD ¬ Basics.FFromCard32[pgm];
    pgmVersionH: HWORD ¬ Basics.HFromCard16[pgmVersion];
    DO
      BEGIN
      ENABLE {
        IO.Error -- [ec, stream] -- => {
          IF debugMsgs THEN {
            DebugMsg[IO.PutFR1[ "I/O Error, ec %g, ", IO.card[ORD[ec]] ]];
            DebugMsg[IO.PutFR1[ "phase %g\n", IO.card[ORD[dH.phase]] ]];
            };
          CONTINUE;
          };
        IO.EndOfStream -- [stream] -- => {
          IF debugMsgs THEN {
            DebugMsg[ "I/O end of stream\n" ];
            };
          CONTINUE;
          };
        CrRPC.Error -- [h, errorReason, text] -- => {
          IF debugMsgs THEN {
            DebugMsg[IO.PutFR[ "CrRPC.Error %g (%g)\n", IO.card[ORD[errorReason]], IO.rope[text] ]];
            };
          CONTINUE;
          };
        };
      DO
        procNum: CARD16;
        IF dH.descriptor = nullCMUXDescriptor THEN EXIT;
        IF stopServerQueryProc # NIL THEN IF stopServerQueryProc[h] THEN GOTO Done;
        dH.phase ¬ msgIn;

        -- Wait for a call message. The read times out every serverGetTimeoutMsec so that
        -- stopServerQueryProc is polled regularly.
        TRUSTED BEGIN
          ans: INT;
          ans ¬ XRCMUXReadMsg[dH.descriptor, TRUE, serverGetTimeoutMsec, XRCProcFromMProc[LOOPHOLE[GetMsgAllocater]], dH];
          IF ans <= 0 THEN {
            IF ans = (-ETIMEDOUT) THEN LOOP;
            IF debugMsgs THEN DebugMsg[IO.PutFR1[ "CMUXReadMsg error %g\n", IO.int[ans] ]];
            EXIT;
            };
          IF debugMsgs THEN DebugMsg[IO.PutFR["got call, pgm %g, %g bytes\n", IO.card[pgm], IO.int[ans]]];
          dH.buf.length ¬ ans;
          END;

        -- Parse and check the call header.
        BEGIN
          msgType: HWORD;
          s: STREAM;
          s ¬ dH.stream ¬ IO.TIS[dH.buf, dH.stream];
          dH.buf ¬ NIL;
          msgType ¬ CrRPC.GetHWord[dH.stream];
          SELECT msgType FROM
            CrRPCBackdoor.callMsgType => {
              dH.tID ¬ CrRPC.GetHWord[s];
              IF CrRPC.GetFWord[s] # pgmF THEN RaiseError[h, other, 0, "CMUX server pgm # botch"];
              IF CrRPC.GetHWord[s] # pgmVersionH THEN RaiseError[h, other, 0, "CMUX server version # botch"];
              procNum ¬ CrRPC.GetCard16[s]
              };
            ENDCASE => {
              IF debugMsgs THEN DebugMsg[IO.PutFR1["CMUX bad msg %g\n", IO.card[Basics.Card16FromH[msgType]]]];
              RaiseError[h, protocolError, 0, "CMUX delivered non-call msg"];
              };
          END;

        -- Run the server proc; it starts the reply via BeginReturn / BeginError / BeginReject.
        BEGIN
          serverProc[h, dH.stream, pgm, pgmVersion, procNum, BeginReturn, BeginError, BeginReject];
          END;

        -- Send the reply accumulated in dH.stream.
        BEGIN
          ans: INT;
          b: REF TEXT;
          IF dH.phase # msgOut THEN {
            IF debugMsgs THEN DebugMsg[IO.PutFR1["bad phase after server proc %g\n", IO.card[ORD[dH.phase]]]];
            };
          b ¬ dH.buf ¬ IO.TextFromTOS[dH.stream];
          IF debugMsgs THEN DebugMsg[IO.PutFR["sending reply, pgm %g proc %g len %g\n", IO.card[pgm], IO.card[procNum], IO.card[b.length]]];
          ans ¬ XRCMUXWriteMsg[dH.descriptor, LOOPHOLE[b, BytePtr] + BYTES[TEXT[0]], b.length, NIL, 0];
          IF ans < 0 THEN RaiseError[h, communicationFailure, ans, "can't send reply msg"];
          END;

        dH.phase ¬ inactive;
        ENDLOOP;
      END;

      -- Something went wrong: start over with a fresh descriptor after a pause.
      SmashServerCMUXDescriptor[h];
      IF debugMsgs THEN DebugMsg["CMUX server pause\n"];
      Process.PauseMsec[serverPauseMsec];
      RefreshServerCMUXDescriptor[h, pgm, pgmVersion ! CrRPC.Error => CONTINUE ];
      REPEAT
        Done => NULL;
      ENDLOOP;
    SmashServerCMUXDescriptor[h];
    };

  -- Creates a server handle for (pgm, pgmVersion), registers it with CMUX, and forks ServerDaemon.
  -- Raises CrRPC.Error if no server proc is registered for the program or registration fails.
  StartServerInstance: CrRPCBackdoor.StartServerInstanceProc
    <<[pgm: CARD32, pgmVersion: CARD16, local: CrRPC.RefAddress]>>
    ~ {
    h: Handle;
    dH: DataHandle;
    serverProc: CrRPC.ServerProc;
    stopServerQueryProc: CrRPC.StopServerQueryProc;
    [serverProc, stopServerQueryProc] ¬ CrRPCBackdoor.LookUpServerProcs[pgm, pgmVersion];
    IF serverProc = NIL THEN RaiseError[NIL, rejectedNoSuchProgram, 0, "no server proc for remote program"];
    dH ¬ NEW[DataObject ¬ []];
    h ¬ NEW[CrRPC.Object ¬ [
      class~myHandleClass,
      kind~server,
      procs~theServerProcs,
      data~dH,
      clientData~NIL
      ]];
    dH.handle ¬ h;
    RefreshServerCMUXDescriptor[h, pgm, pgmVersion];
    TRUSTED {
      Process.Detach[ FORK ServerDaemon[h, pgm, pgmVersion, serverProc, stopServerQueryProc] ];
      };
    };

  <<Bulk data>>

  ErrorPutBulkDataSource: CrRPC.PutBulkDataXferProcProc -- [h: Handle, s: STREAM, proc: BulkDataXferProc]-- ~ {
    RaiseError[h, notClientHandle, 0, "putBDTSource on server handle"];
    };

  -- Marshals a bulk data source descriptor (null or immediate) and remembers proc so that Call
  -- runs it after the call message has been sent.
  PutBulkDataSource: CrRPC.PutBulkDataXferProcProc -- [h: Handle, s: STREAM, proc: BulkDataXferProc] -- ~ {
    dH: DataHandle ~ NARROW[h.data];
    IF proc = NIL
      THEN { CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.null]] }
      ELSE {
        CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate]];
        TRUSTED { dH.sendBulkData ¬ proc }};
    };

  ErrorPutBulkDataSink: CrRPC.PutBulkDataXferProcProc -- [h: Handle, s: STREAM, proc: BulkDataXferProc] -- ~ {
    RaiseError[h, notClientHandle, 0, "putBDTSink on server handle"];
    };

  -- Marshals a bulk data sink descriptor (null or immediate) and remembers proc so that Call
  -- runs it after the call message has been sent.
  PutBulkDataSink: CrRPC.PutBulkDataXferProcProc -- [h: Handle, s: STREAM, proc: BulkDataXferProc] -- ~ {
    dH: DataHandle ~ NARROW[h.data];
    IF proc = NIL
      THEN { CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.null]] }
      ELSE {
        CrRPC.PutCard16[s, ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate]];
        TRUSTED { dH.recvBulkData ¬ proc }};
    };

  -- Exported through CrRPCBuggy. Unlike PutBulkDataSink it writes no descriptor to s and records
  -- proc unconditionally (even NIL).
  -- NOTE(review): presumably for talking to peers that do not expect a sink descriptor — confirm against CrRPCBuggy.
  PutBulkDataSinkBuggy: PUBLIC CrRPC.PutBulkDataXferProcProc -- [h: Handle, s: STREAM, proc: BulkDataXferProc] -- ~ {
    dH: DataHandle ~ NARROW[h.data];
    TRUSTED { dH.recvBulkData ¬ proc };
    };

  ErrorGetBulkDataSource: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSource: CrRPC.BulkDataSource] ~ {
    RaiseError[h, notServerHandle, 0, "GetBDTSource on client handle"];
    };

  -- Unmarshals a bulk data source descriptor: null yields NIL, immediate yields ServerRecvBulkData;
  -- third-party (active/passive) and unknown descriptor types raise CrRPC.Error.
  GetBulkDataSource: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSource: CrRPC.BulkDataSource] ~ {
    dH: DataHandle ~ NARROW[h.data];
    typeCode: CARDINAL;
    typeCode ¬ CrRPC.GetCard16[s];
    SELECT typeCode FROM
      ORD[CrRPCBackdoor.BulkDataDescriptorType.null] => {
        bulkDataSource ¬ NIL };
      ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate] => TRUSTED {
        bulkDataSource ¬ ServerRecvBulkData };
      ORD[CrRPCBackdoor.BulkDataDescriptorType.active], ORD[CrRPCBackdoor.BulkDataDescriptorType.passive] => {
        RaiseError[h, notImplemented, 0, "no third party bulk data"] };
      ENDCASE => {
        RaiseError[h, protocolError, 0, "bad bulkDataSource type"] };
    };

  -- Server side of a source transfer: copies incoming bulk data to s in blocks of bufSize bytes
  -- until the end of the data, or until checkAbort reports an abort.
  -- Raises CrRPC.Error (communicationFailure) on a read failure.
  ServerRecvBulkData: CrRPC.BulkDataXferProc -- [h: Handle, s: IO.STREAM, checkAbort: BulkDataCheckAbortProc] RETURNS [abort: BOOL] -- ~ {
    dH: DataHandle ~ NARROW[h.data];
    bufSize: NAT ~ 2048;
    b: REF TEXT ~ RefText.ObtainScratch[bufSize];
    nReadSoFar: CARD ¬ 0;
    dH.phase ¬ bdtIn;
    dH.bdtState ¬ ok;
    DO
      nBytes: INT;
      atEnd: BOOL;
      TRUSTED {
        [nBytes, atEnd] ¬ BDTGetProc[dH, nReadSoFar, [base~LOOPHOLE[b], startIndex~BYTES[TEXT[0]], count~bufSize]];
        };
      IF nBytes < 0 THEN RaiseError[h, communicationFailure, -nBytes, "BDT read failure"];
      IF (nBytes > 0) THEN {
        nReadSoFar ¬ nReadSoFar + nBytes;
        IO.UnsafePutBlock[s, [base~LOOPHOLE[b], startIndex~BYTES[TEXT[0]], count~nBytes]];
        };
      IF atEnd THEN EXIT;
      IF checkAbort[h].abort THEN {
        dH.bdtState ¬ localAbort;
        EXIT;
        };
      ENDLOOP;
    RefText.ReleaseScratch[b];
    RETURN [dH.bdtState = abort];
    };

  ErrorGetBulkDataSink: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSink: CrRPC.BulkDataSink] ~ {
    RaiseError[h, notServerHandle, 0, "GetBDTSink on client handle"];
    };

  -- Unmarshals a bulk data sink descriptor: null yields NIL, immediate yields ServerSendBulkData;
  -- third-party (active/passive) and unknown descriptor types raise CrRPC.Error.
  GetBulkDataSink: PROC [h: Handle, s: STREAM] RETURNS[bulkDataSink: CrRPC.BulkDataSink] ~ {
    dH: DataHandle ~ NARROW[h.data];
    typeCode: CARDINAL;
    typeCode ¬ CrRPC.GetCard16[s];
    SELECT typeCode FROM
      ORD[CrRPCBackdoor.BulkDataDescriptorType.null] => {
        bulkDataSink ¬ NIL };
      ORD[CrRPCBackdoor.BulkDataDescriptorType.immediate] => TRUSTED {
        bulkDataSink ¬ ServerSendBulkData };
      ORD[CrRPCBackdoor.BulkDataDescriptorType.active], ORD[CrRPCBackdoor.BulkDataDescriptorType.passive] => {
        RaiseError[h, notImplemented, 0, "no third party bulk data"] };
      ENDCASE => {
        RaiseError[h, protocolError, 0, "bad bulkDataSink type"] };
    };

  -- Server side of a sink transfer: copies s to the bulk data channel in blocks of up to bufSize
  -- bytes until s is exhausted, the transfer leaves state ok, or checkAbort reports an abort.
  ServerSendBulkData: CrRPC.BulkDataXferProc -- [h: Handle, s: IO.STREAM, checkAbort: BulkDataCheckAbortProc] RETURNS [abort: BOOL] -- ~ {
    dH: DataHandle ~ NARROW[h.data];
    bufSize: NAT ~ 1024;
    b: REF TEXT ~ RefText.ObtainScratch[bufSize];
    nWrittenSoFar: CARD ¬ 0;
    dH.phase ¬ bdtOut;
    dH.bdtState ¬ ok;
    DO
      nRead, nWritten: INT;
      nRead ¬ IO.GetBlock[s, b];
      IF nRead <= 0 THEN EXIT;
      nWritten ¬ BDTPutProc[dH, nWrittenSoFar, [base~LOOPHOLE[b], startIndex~BYTES[TEXT[0]], count~nRead]];
      IF dH.bdtState # ok THEN EXIT;
      IF checkAbort[h].abort THEN {
        dH.bdtState ¬ localAbort;
        EXIT;
        };
      ENDLOOP;
    RefText.ReleaseScratch[b];
    RETURN[ dH.bdtState = abort ];
    };

  <<Bulk data stream segments: each segment is a kind word, a count word, then that many values>>

  nextSegment: CARD16 ~ 0;
  lastSegment: CARD16 ~ 1;

  -- Reads segments from s, calling readValue once per value, until a segment of kind lastSegment
  -- has been consumed. Returns TRUE only if readValue asked to abort; returns FALSE when
  -- checkAbort reports an abort.
  ReadBulkDataStream: CrRPC.ReadBulkDataStreamProc
    <<[h: Handle, s: STREAM, checkAbort: BulkDataCheckAbortProc, readValue: BulkDataValueProc] RETURNS [abort: BOOL]>>
    ~ {
    segmentKind: CARD16;
    len: CARD16;
    DO
      IF checkAbort[h] THEN RETURN[FALSE];
      segmentKind ¬ CrRPC.GetCard16[s];
      len ¬ CrRPC.GetCard16[s];
      THROUGH [1..len] DO
        IF checkAbort[h] THEN RETURN[FALSE];
        IF readValue[s].abort THEN RETURN[TRUE];
        ENDLOOP;
      IF segmentKind = lastSegment THEN RETURN[FALSE];
      ENDLOOP;
    };

  -- Writes one segment of n values to s, calling writeValue once per value.
  -- A segment with n = 0 is written as the last segment.
  -- heAborted is set when checkAbort reports an abort, abort when writeValue asks to abort.
  WriteBulkDataSegment: CrRPC.WriteBulkDataSegmentProc
    <<[h: Handle, s: STREAM, checkAbort: BulkDataCheckAbortProc, writeValue: BulkDataValueProc, n: CARDINAL] RETURNS [abort: BOOL, heAborted: BOOL]>>
    ~ {
    CrRPC.PutCard16[s, (IF n > 0 THEN nextSegment ELSE lastSegment)];
    CrRPC.PutCard16[s, n];
    abort ¬ heAborted ¬ FALSE;
    THROUGH [1..n] DO
      IF checkAbort[h] THEN { heAborted ¬ TRUE; RETURN };
      IF writeValue[s].abort THEN { abort ¬ TRUE; RETURN };
      ENDLOOP;
    };

  <<Continuations (not implemented)>>

  ErrorCallContinuation: CrRPC.CallContinuationProc ~ {
    RaiseError[h, notClientHandle, 0, "CallContinuation on server handle"]
    };

  CallContinuation: CrRPC.CallContinuationProc
    <<[h: Handle, proc: ContinuationProc, clientData: REF]>>
    ~ {
    dH: DataHandle ~ NARROW[h.data];
    RaiseError[h, notImplemented, 0, "continuations not implemented"];
    };

  ErrorSetContinuation: CrRPC.SetContinuationProc ~ {
    RaiseError[h, notServerHandle, 0, "SetContinuation on client handle"]
    };

  SetContinuation: CrRPC.SetContinuationProc
    <<[h: Handle, proc: ContinuationProc, clientData: REF]>>
    ~ {
    dH: DataHandle ~ NARROW[h.data];
    RaiseError[h, notImplemented, 0, "continuations not implemented"];
    -- Not reached: RaiseError always raises. Kept as the skeleton of the eventual implementation.
    TRUSTED { dH.continuation ¬ proc };
    dH.continuationClientData ¬ clientData
    };

  <<Client handle cache>>
  <<Handles that are checked in (destroyed or finalized) are kept on `cache` and reused by CreateClientHandle for the same remote address; CacheSweeper buries entries idle longer than cacheTimeoutTicks.>>

  cache: DataHandle ¬ NIL;
  cacheTimeoutTicks: Ticks ¬ Process.SecondsToTicks[8];
  cacheSweepIntervalTicks: Ticks ¬ Process.SecondsToTicks[8];

  Now: PROC RETURNS [Ticks]
    ~ TRUSTED MACHINE CODE { "XR_TicksSinceBoot" };

  TicksSince: PROC [then: Ticks] RETURNS [Ticks] ~ {
    RETURN [ Now[] - then ]
    };

  -- Removes and returns the first cached handle whose remote address equals `remote`; NIL if none.
  CheckOutFromCache: ENTRY PROC [remote: XNS.Address] RETURNS [h: Handle ¬ NIL] ~ {
    dH: DataHandle ¬ cache;
    prevH: DataHandle ¬ NIL;
    WHILE dH # NIL DO
      IF (dH.remote)­ = remote THEN {
        IF prevH = NIL THEN cache ¬ dH.next ELSE prevH.next ¬ dH.next;
        h ¬ dH.handle;
        dH.handle ¬ NIL; -- for finalization
        dH.lastUsed ¬ Now[];
        EXIT };
      prevH ¬ dH;
      dH ¬ dH.next;
      ENDLOOP;
    };

  -- Pushes h onto the front of the cache and stamps it with the current time.
  CheckInToCache: ENTRY PROC [h: Handle] ~ {
    dH: DataHandle ¬ NARROW[h.data];
    IF debugMsgs THEN DebugMsg[IO.PutFR1["CMUXCheckIn d=%g\n", IO.int[dH.descriptor]]];
    dH.handle ¬ h;
    dH.lastUsed ¬ Now[];
    dH.next ¬ cache;
    cache ¬ dH
    };

  -- Background process: every cacheSweepIntervalTicks, buries the cached handles that have been
  -- idle for more than cacheTimeoutTicks.
  CacheSweeper: PROC ~ {

    -- Under the monitor lock, splits the cache into the entries to keep (original order preserved)
    -- and the expired entries, which are returned as deadList.
    GetDeadHandles: ENTRY PROC RETURNS [deadList: DataHandle ¬ NIL] ~ {
      dH, newCache, newCacheTail: DataHandle ¬ NIL;
      WHILE cache # NIL DO
        dH ¬ cache;
        cache ¬ cache.next;
        IF TicksSince[dH.lastUsed] <= cacheTimeoutTicks
          THEN -- add to new cache -- {
            dH.next ¬ NIL;
            IF newCacheTail = NIL THEN newCache ¬ dH ELSE newCacheTail.next ¬ dH;
            newCacheTail ¬ dH }
          ELSE -- add to dead list -- {
            dH.next ¬ deadList;
            deadList ¬ dH };
        ENDLOOP;
      cache ¬ newCache;
      };

    -- Buries every handle on deadList; this runs outside the monitor lock.
    DestroyDeadHandles: PROC [deadList: DataHandle] ~ {
      dH: DataHandle;
      IF deadList # NIL THEN {
        IF debugMsgs THEN DebugMsg[IO.PutFR1["CMUXCacheSweeper %g\n", IO.card[LOOPHOLE[deadList]]]];
        };
      WHILE deadList # NIL DO
        dH ¬ deadList;
        deadList ¬ dH.next;
        BuryDeadHandle[dH.handle];
        ENDLOOP;
      };

    DO
      DestroyDeadHandles[ GetDeadHandles[] ];
      Process.Pause[ cacheSweepIntervalTicks ];
      ENDLOOP;
    };

  -- Disconnects h and detaches its data, so that Finalize later sees h.data = NIL.
  BuryDeadHandle: PROC [h: Handle] ~ {
    dH: DataHandle ~ NARROW[h.data];
    dH.handle ¬ NIL; -- break cycle for GC
    dH.stream ¬ NIL; -- break cycle with ProcStream
    IF debugMsgs THEN DebugMsg[IO.PutFR1["BuryDeahHandle d=%g\n", IO.int[dH.descriptor]]];
    DisconnectFromServer[h];
    h.data ¬ NIL;
    };

  <<Finalization>>

  ErrorFinalize: PROC [h: Handle] RETURNS [renewHandle: BOOL] ~ {
    ERROR
    };

  -- Finalization of a client handle. A recently used handle is put back into the cache and
  -- TRUE is returned; otherwise the handle is buried and FALSE is returned.
  Finalize: PROC [h: Handle] RETURNS [renewHandle: BOOL ¬ FALSE] ~ {
    dH: DataHandle ~ NARROW[h.data];
    IF dH = NIL THEN -- handle is already dead -- RETURN;
    IF debugMsgs THEN DebugMsg[IO.PutFR1["Finalizing d=%g\n", IO.int[dH.descriptor]]];
    IF TicksSince[dH.lastUsed] <= cacheTimeoutTicks THEN {
      CheckInToCache[h];
      RETURN [TRUE];
      };
    BuryDeadHandle[h];
    RETURN [FALSE];
    };

  -- Returns a client handle for `remote`, which must be a REF XNS.Address.
  -- A cached handle for the same address is reused when there is one; otherwise a new,
  -- not yet connected, handle is created (Call connects on first use).
  -- Raises CrRPC.Error (addressInappropriateForClass) for a NIL or non-XNS address.
  CreateClientHandle: CrRPCBackdoor.CreateClientHandleProc
    <<[remote: CrRPC.RefAddress] RETURNS [Handle]>>
    ~ {
    handle: Handle;
    dH: DataHandle;
    addr: REF XNS.Address;
    IF remote = NIL THEN RaiseError[NIL, addressInappropriateForClass, 0, "missing remote address"];
    WITH remote SELECT FROM
      rA: REF XNS.Address => addr ¬ NEW[XNS.Address ¬ (rA­)]; -- private copy; the socket may be overwritten below
      ENDCASE => RaiseError[NIL, addressInappropriateForClass, 0, "not XNS address"];
    IF ( alwaysSmashTheSocket ) OR ( addr.socket = XNS.unknownSocket ) THEN addr.socket ¬ XNSWKS.courier;
    handle ¬ CheckOutFromCache[addr­];
    IF handle = NIL THEN {
      dH ¬ NEW[DataObject ¬ [lastUsed~Now[], remote~addr]];
      -- myHandleClass rather than a literal $CMUX, so the class is named in one place only
      handle ¬ CrRPCBackdoor.NewClientObject[class~myHandleClass, procs~theClientProcs, data~dH];
      };
    RETURN [handle];
    };

  <<Mainline: register this class with CrRPC and start the cache sweeper>>

  CrRPCBackdoor.RegisterCreateClientHandleProc[myHandleClass, CreateClientHandle];
  CrRPCBackdoor.RegisterStartServerInstanceProc[myHandleClass, StartServerInstance];
  TRUSTED { Process.Detach[ FORK CacheSweeper[] ] };

  }.