<> <> <> <> <> DIRECTORY BufferDefs USING [PupBuffer], DebuggerSwap USING [CallDebugger], PrincOps USING [PsbIndex, PsbNull], PrincOpsUtils USING [BITAND, BITXOR, IsBound, LongCopy, PsbHandleToIndex, ReadPSB], Process USING [Detach], PupDefs USING [AnyLocalPupAddress, GetFreePupBuffer, PupRouterSendThis, ReturnFreePupBuffer], PupTypes USING [PupAddress, PupHostID, PupNetID, PupSocketID], RPC USING [CallFailed, Conversation, unencrypted], RPCInternal USING [ConversationObject, DecryptPkt, EncryptPkt, exportTable, firstConversation, GetConnectionState, ImportInstance], RPCLupine USING [DataLength, Dispatcher, GetRPCPkt, Header, maxDataLength, maxPupWords, pktOverhead, RPCPkt], RPCPkt USING [CallCount, ConnectionID, DispatcherDetails, EnqueueAgain, Header, IdleReceive, Machine, noDispatcher, Outcome, PktExchange, pktLengthOverhead, SetupResponse], RPCPrivate USING [ReturnBuffer, rpcSocket], SafeStorage USING [CantEstablishFinalization, EnableFinalization, EstablishFinalization, FinalizationQueue, FQNext, NewFQ, ReEstablishFinalization], VM USING [AddressForPageNumber, Interval, nullInterval, PagesForWords, Pin, SimpleAllocate, Unpin]; RPCPktStreams: MONITOR IMPORTS DebuggerSwap, PrincOpsUtils, Process, PupDefs, RPC, RPCLupine, RPCInternal, RPCPkt, RPCPrivate, SafeStorage, VM EXPORTS RPC, --Header,ConversationObject RPCInternal, --DoSignal, ServerMain RPCLupine -- others SHARES BufferDefs, RPCLupine = { Header: PUBLIC TYPE = RPCPkt.Header; HeaderPtr: TYPE = LONG POINTER TO Header; LupineHeader: TYPE = RPCLupine.Header; LupineHeaderPtr: TYPE = LONG POINTER TO LupineHeader; ConcreteHeader: PROC [abstract: LupineHeaderPtr] RETURNS [HeaderPtr] = INLINE { RETURN [abstract]; }; myHost: RPCPkt.Machine; GiveBackBuffer: PROC [b: BufferDefs.PupBuffer] = <> IF PrincOpsUtils.IsBound[LOOPHOLE[RPCPrivate.ReturnBuffer]] THEN RPCPrivate.ReturnBuffer ELSE PupDefs.ReturnFreePupBuffer; <<******** Caller ********>> <> CallDestHint: TYPE = ARRAY PrincOps.PsbIndex OF PrincOps.PsbIndex; lastCallDest: REF CallDestHint = NEW[CallDestHint _ ALL[PrincOps.PsbNull]]; RecordCallDest: ENTRY PROC [header: HeaderPtr] = INLINE { lastCallDest[header.destPSB--myPSB--] _ header.srcePSB; }; ImportInstance: PUBLIC TYPE = RPCInternal.ImportInstance; <> ConversationObject: PUBLIC TYPE = RPCInternal.ConversationObject; Conversation: TYPE = REF ConversationObject; MisusedConversation: ERROR = CODE; StartCall: PUBLIC ENTRY PROC [callPkt: RPCLupine.RPCPkt, interface: REF ImportInstance, localConversation: Conversation _ RPC.unencrypted] = { ENABLE UNWIND => NULL; myPSB: PrincOps.PsbIndex = PrincOpsUtils.PsbHandleToIndex[PrincOpsUtils.ReadPSB[]]; header: HeaderPtr = @callPkt.header; header.destHost _ interface.host; header.destSoc _ RPCPrivate.rpcSocket; header.destPSB _ lastCallDest[myPSB]; callPkt.convHandle _ localConversation; IF localConversation = RPC.unencrypted THEN header.conv _ RPCInternal.firstConversation ELSE { <> header.conv _ [localConversation.id.count.ls, caller, localConversation.id.count.ms]; IF localConversation.id.originator # myHost THEN { IF header.destHost # localConversation.id.originator THEN ERROR MisusedConversation[]; header.conv.originator _ callee; }; }; header.pktID.activity _ myPSB; <> header.pktID.pktSeq _ 0; -- => new call -- header.dispatcher _ interface.dispatcher; }; Call: PUBLIC PROC [pkt: RPCLupine.RPCPkt, callLength: RPCLupine.DataLength, maxReturnLength: RPCLupine.DataLength, signalHandler: RPCLupine.Dispatcher _ NIL] RETURNS [ returnLength: RPCLupine.DataLength, lastPkt: BOOL] = { recvdHeader: HeaderPtr = @pkt.header; returnLength _ RPCPkt.PktExchange[pkt, callLength, maxReturnLength, call, signalHandler ].newLength; RecordCallDest[recvdHeader]; SELECT recvdHeader.outcome FROM result => NULL; unbound => ERROR RPC.CallFailed[unbound]; protocol => ERROR RPC.CallFailed[runtimeProtocol]; signal => ERROR -- handled inside RPCPkt.PktExchange --; unwind => { <> RPCPkt.SetupResponse[recvdHeader]; ERROR UnwindRequested[]; }; ENDCASE => <> ERROR RPC.CallFailed[runtimeProtocol]; RPCPkt.SetupResponse[recvdHeader]; RETURN[ returnLength, recvdHeader.type.eom = end ] }; <<******** Protocol implementation: multi-packet case ******** -->> SendPrelimPkt: PUBLIC PROC [pkt: RPCLupine.RPCPkt, length: RPCLupine.DataLength] = { [] _ RPCPkt.PktExchange[pkt, length, 0, sending]; }; ReceiveExtraPkt: PUBLIC PROC [pkt: RPCLupine.RPCPkt] RETURNS [length: RPCLupine.DataLength, lastPkt: BOOL] = { recvdHeader: HeaderPtr; length _ RPCPkt.PktExchange[pkt, 0, RPCLupine.maxDataLength, receiving].newLength; recvdHeader _ @pkt.header; RPCPkt.SetupResponse[recvdHeader]; RETURN[ length, recvdHeader.type.eom = end ] }; <<******** Protocol implementation: callee and packets-while-notWanting ******** -->> idlerAckCount: CARDINAL _ 0; idlerRequeueCount: CARDINAL _ 0; GenerateIdlerResponse: PROC [recvd: RPCLupine.RPCPkt] = { <> ackPkt: BufferDefs.PupBuffer = PupDefs.GetFreePupBuffer[]; header: HeaderPtr = LOOPHOLE[@ackPkt.pupLength]; recvdHeader: HeaderPtr = @recvd.header; workerPSB: PrincOps.PsbIndex = recvdHeader.destPSB; -- as adjusted by FindCallee -- idlerAckCount _ idlerAckCount+1; RPCPkt.SetupResponse[recvdHeader]; header^ _ recvdHeader^; header.length _ recvdHeader.length; header.oddByte _ no; header.type _ [0,rpc,end,dontAck,ack]; header.srceHost _ myHost; header.srceSoc _ RPCPrivate.rpcSocket; header.srcePSB _ workerPSB; PupDefs.PupRouterSendThis[ackPkt]; }; EnqueueForNewPSB: PROC [recvd: RPCLupine.RPCPkt] = { <> pupPkt: BufferDefs.PupBuffer = PupDefs.GetFreePupBuffer[]; header: HeaderPtr = LOOPHOLE[@pupPkt.pupLength]; recvdHeader: HeaderPtr = @recvd.header; idlerRequeueCount _ idlerRequeueCount+1; PrincOpsUtils.LongCopy[from: recvdHeader, to: header, nwords: recvdHeader.length]; RPCPkt.EnqueueAgain[pupPkt]; }; <> CalleeState: TYPE = REF CalleeStateRec; CalleeStateRec: TYPE = RECORD [ entered: BOOL _ FALSE, free: BOOL _ FALSE, next: CalleeState _ NIL, callee: PrincOps.PsbIndex _ PrincOps.PsbNull, state: HeaderPtr _ NIL, space: VM.Interval _ VM.nullInterval]; callees: CalleeState _ NIL; calleesCount: INT _ 0; freeCallees: CalleeState _ NIL; freeCount: INT _ 0; worldSwap: BOOL _ FALSE; <> RPCInternalFailure: ERROR = CODE; Crash: PROC = { IF worldSwap THEN DebuggerSwap.CallDebugger["RPC internal failure!"L] ELSE ERROR RPCInternalFailure; }; AllocCalleeState: ENTRY PROC RETURNS [CalleeState] = { myPSB: PrincOps.PsbIndex = PrincOpsUtils.PsbHandleToIndex[PrincOpsUtils.ReadPSB[]]; state: CalleeState _ freeCallees; space: VM.Interval _ VM.nullInterval; IF freeCallees # NIL THEN { freeCallees _ state.next; freeCount _ freeCount - 1; space _ state.space; } ELSE { state _ NEW[CalleeStateRec]; }; IF space.count = 0 THEN space _ VM.SimpleAllocate[ VM.PagesForWords[serverDataLength+RPCLupine.pktOverhead]]; VM.Pin[space]; state^ _ [free: FALSE, entered: FALSE, next: NIL, callee: myPSB, state: NIL, space: space]; SafeStorage.EnableFinalization[state]; RETURN [state]; }; FreeCalleeState: ENTRY PROC [state: CalleeState] = { <> IF state.free THEN RETURN; IF state.entered THEN RemoveCalleeInternal[state]; VM.Unpin[state.space]; state.free _ TRUE; state.next _ freeCallees; freeCallees _ state; freeCount _ freeCount + 1; }; AddCallee: ENTRY PROC [state: CalleeState] = INLINE { AddCalleeInternal[state]; }; AddCalleeInternal: INTERNAL PROC [state: CalleeState] = { IF state.entered THEN Crash[]; state.entered _ TRUE; state.next _ callees; callees _ state; calleesCount _ calleesCount + 1; }; RemoveCallee: ENTRY PROC [state: CalleeState] = INLINE { RemoveCalleeInternal[state]; }; RemoveCalleeInternal: INTERNAL PROC [state: CalleeState] = { lag: CalleeState _ NIL; IF state # NIL AND state.entered THEN FOR p: CalleeState _ callees, p.next WHILE p # NIL DO SELECT p FROM state => { IF lag = NIL THEN callees _ p.next ELSE lag.next _ p.next; state.entered _ FALSE; calleesCount _ calleesCount - 1; RETURN; }; NIL => Crash[]; ENDCASE; lag _ p; ENDLOOP; Crash[]; <> <> }; FindCallee: ENTRY PROC [given: HeaderPtr] RETURNS [BOOL] = { <> IF given # NIL THEN FOR p: CalleeState _ callees, p.next WHILE p # NIL DO state: HeaderPtr _ p.state; SELECT TRUE FROM state = NIL => {}; state.conv = given.conv --AND same originator .... -- AND state.pktID.activity = given.pktID.activity AND state.pktID.callSeq = given.pktID.callSeq => { given^.destPSB _ p.callee; RETURN [TRUE]; }; ENDCASE; ENDLOOP; RETURN [FALSE]; }; FinalizeCalleeBlocks: PROC [fq: SafeStorage.FinalizationQueue] = { DO WITH SafeStorage.FQNext[fq] SELECT FROM state: CalleeState => FreeCalleeState[state]; ENDCASE; ENDLOOP; }; <> HashKey: TYPE = [0..127]; ConnectionData: TYPE = RECORD[next: Connection, id: RPCPkt.ConnectionID, call: RPCPkt.CallCount, conv: RPC.Conversation -- NB: opaque type --]; Connection: TYPE = REF ConnectionData; connections: REF ARRAY HashKey OF Connection = NEW[ARRAY HashKey OF Connection _ ALL[NIL]]; ForgetConnections: INTERNAL PROC = { <> FOR hash: HashKey IN HashKey DO connections[hash] _ NIL ENDLOOP; }; <> serverDataLength: RPCLupine.DataLength = RPCLupine.maxDataLength; ServerMain: PUBLIC PROC = { myStateBlock: CalleeState _ AllocCalleeState[]; pktSpace: VM.Interval = myStateBlock.space; myPkt: RPCLupine.RPCPkt = RPCLupine.GetRPCPkt[VM.AddressForPageNumber[pktSpace.page]]; recvdHeader: HeaderPtr _ myStateBlock.state _ @myPkt.header; newPkt: BOOL _ FALSE; -- Whether packet is valid -- decrypted: BOOL _ FALSE; -- if "newPkt", whether it's been decrypted -- newLength: RPCLupine.DataLength; -- iff "newPkt" and "decrypted", pkt's length -- connection: Connection; LookupCaller: ENTRY PROC [id: RPCPkt.ConnectionID] RETURNS [{new, old, phoney, unknown}] = { <> <> <> <> <> <> <> <> <> <> <> ENABLE UNWIND => Crash[]; connection _ connections[ PrincOpsUtils.BITAND[ PrincOpsUtils.BITXOR[LOOPHOLE[id.caller, WORD], id.activity], LAST[HashKey]]]; DO SELECT TRUE FROM connection = NIL => { IF recvdHeader.type.class # call THEN RETURN[old]; RETURN[unknown]; }; id.conv = connection.id.conv AND id.caller = connection.id.caller AND recvdHeader.srcePSB = connection.id.activity => { myPkt.convHandle _ connection.conv; IF NOT decrypted THEN { IF connection.conv # RPC.unencrypted THEN { ok: BOOL; [ok, newLength] _ RPCInternal.DecryptPkt[recvdHeader, myPkt.convHandle]; decrypted _ TRUE; IF NOT ok THEN RETURN[phoney]; } ELSE { newLength _ recvdHeader.length - RPCPkt.pktLengthOverhead; decrypted _ TRUE; }; }; IF recvdHeader.pktID.activity # recvdHeader.srcePSB THEN RETURN[phoney]; IF recvdHeader.type.class # call THEN RETURN[old]; IF recvdHeader.pktID.callSeq > connection.call THEN { IF recvdHeader.pktID.pktSeq # 1 THEN RETURN[phoney]; connection.call _ recvdHeader.pktID.callSeq; AddCalleeInternal[myStateBlock]; RETURN[new] } ELSE RETURN[old] }; ENDCASE => connection _ connection.next; ENDLOOP; }; NoteConnection: ENTRY PROC [id: RPCPkt.ConnectionID, call: RPCPkt.CallCount, conv: RPC.Conversation] = { prev: Connection _ NIL; hash: HashKey = PrincOpsUtils.BITAND[PrincOpsUtils.BITXOR[LOOPHOLE[id.caller, WORD], id.activity], LAST[HashKey]]; connection _ connections[hash]; DO SELECT TRUE FROM connection = NIL => { connection _ NEW[ConnectionData _ [next: NIL, id: id, call: call-1, conv: conv] ]; IF prev = NIL THEN connections[hash] _ connection ELSE prev.next _ connection; EXIT }; id.conv = connection.id.conv AND id.caller = connection.id.caller AND id.activity = connection.id.activity => -- already there! -- EXIT; ENDCASE => { prev _ connection; connection _ connection.next }; ENDLOOP; }; <> DO ENABLE { ABORTED => EXIT; UNWIND => FreeCalleeState[myStateBlock] }; IF NOT newPkt THEN { RPCPkt.IdleReceive[myPkt, RPCLupine.maxPupWords]; newPkt _ TRUE; decrypted _ FALSE; }; SELECT LookupCaller[[recvdHeader.conv, recvdHeader.srceHost, recvdHeader.srcePSB]] FROM new => { <> target: RPCPkt.DispatcherDetails = recvdHeader.dispatcher; resultLength: RPCLupine.DataLength; RPCPkt.SetupResponse[recvdHeader]; IF target.dispatcherHint >= RPCInternal.exportTable.used OR target.dispatcherID = RPCPkt.noDispatcher OR target.dispatcherID # RPCInternal.exportTable[target.dispatcherHint].id THEN { Reject[myPkt, unbound]; resultLength _ 0 } ELSE resultLength _ RPCInternal.exportTable[target.dispatcherHint].dispatcher[ myPkt, newLength, recvdHeader.type.eom = end, connection.conv ! RPC.CallFailed => TRUSTED { newPkt _ FALSE; RemoveCallee[myStateBlock]; LOOP; }; UnwindRequested => { <> <> resultLength _ 0; CONTINUE; }; RejectUnbound => { <> Reject[myPkt, unbound]; resultLength _ 0; CONTINUE; }; RejectProtocol => { <> Reject[myPkt, protocol]; resultLength _ 0; CONTINUE; }; ]; RemoveCallee[myStateBlock]; [newPkt, newLength] _ RPCPkt.PktExchange[myPkt, resultLength, serverDataLength, endCall ! RPC.CallFailed => TRUSTED {newPkt _ FALSE; CONTINUE}]; IF newPkt THEN decrypted _ TRUE; <> }; unknown => { <> ok: BOOL; id: RPCPkt.ConnectionID; call: RPCPkt.CallCount; conv: RPC.Conversation; l: RPCLupine.DataLength; [ok, id, call, conv, l] _ RPCInternal.GetConnectionState[decrypted, myPkt ! RPC.CallFailed => TRUSTED {newPkt_FALSE; LOOP}]; IF ok THEN { IF NOT newPkt THEN ERROR; IF NOT decrypted THEN {decrypted _ TRUE; newLength _ l}; NoteConnection[id, call, conv]; } ELSE newPkt _ FALSE; }; phoney => <> newPkt _ FALSE; old => { <> oldDest: PrincOps.PsbIndex = recvdHeader.destPSB; knownCallee: BOOL = decrypted AND FindCallee[recvdHeader]--may alter destPSB--; IF knownCallee AND recvdHeader.destPSB # oldDest THEN { <> IF decrypted THEN recvdHeader.length _ IF myPkt.convHandle = RPC.unencrypted THEN RPCPkt.pktLengthOverhead + newLength ELSE RPCInternal.EncryptPkt[myPkt, newLength]; EnqueueForNewPSB[myPkt]; } ELSE { <> IF recvdHeader.type.ack = pleaseAck AND recvdHeader.type.eom = end AND( recvdHeader.type.class = data OR knownCallee ) THEN { recvdHeader.length _ IF NOT decrypted OR myPkt.convHandle = RPC.unencrypted THEN RPCPkt.pktLengthOverhead ELSE RPCInternal.EncryptPkt[myPkt, 0]; GenerateIdlerResponse[myPkt]; }; }; newPkt _ FALSE; }; ENDCASE => ERROR; ENDLOOP; FreeCalleeState[myStateBlock]; }; <<******** Remote signalling ********>> StartSignal: PUBLIC PROC [signalPkt: RPCLupine.RPCPkt] = { ConcreteHeader[@signalPkt.header].outcome _ signal; }; UnwindRequested: ERROR = CODE; -- internal: remote machine is unwinding a signal PktData: TYPE = REF PktSpace; PktSpace: TYPE = ARRAY [1..serverDataLength + RPCLupine.pktOverhead] OF WORD; DoSignal: PUBLIC PROC [b: BufferDefs.PupBuffer, pktLength: RPCLupine.DataLength, signalHandler: RPCLupine.Dispatcher, convHandle: RPC.Conversation] RETURNS [resumePkt: RPCLupine.RPCPkt _ NIL, resumeLength: RPCLupine.DataLength _ 0, myLocalFrame: POINTER _ NIL] = { IF pktLength > serverDataLength THEN { GiveBackBuffer[b]; ERROR RPC.CallFailed[runtimeProtocol] } ELSE { myStateBlock: CalleeState _ AllocCalleeState[]; pktSpace: VM.Interval = myStateBlock.space; pkt: RPCLupine.RPCPkt = RPCLupine.GetRPCPkt[VM.AddressForPageNumber[pktSpace.page]]; recvdHeader: HeaderPtr _ myStateBlock.state _ @pkt.header; <> PrincOpsUtils.LongCopy[from: @b.pupLength, to: recvdHeader, nwords: pktLength+SIZE[Header]]; { ENABLE UNWIND => FreeCalleeState[myStateBlock]; handlerFailed: BOOL _ FALSE; -- CallFailed raised inside signalHandler! -- pkt.convHandle _ convHandle; GiveBackBuffer[b]; AddCallee[myStateBlock]; RPCPkt.SetupResponse[recvdHeader]; IF signalHandler = NIL THEN { Reject[pkt, unbound]; resumeLength _ 0 } ELSE resumeLength _ signalHandler[ pkt, pktLength, recvdHeader.type.eom = end, convHandle ! RPC.CallFailed => TRUSTED {handlerFailed _ TRUE}; UNWIND => IF NOT handlerFailed THEN { recvdHeader.outcome _ unwind; resumeLength _ RPCPkt.PktExchange[pkt, 0, serverDataLength, call, signalHandler].newLength; SELECT recvdHeader.outcome FROM result => NULL -- let our UNWIND propagate --; signal => ERROR -- handled inside RPCPkt.PktExchange--; ENDCASE => <> ERROR RPC.CallFailed[runtimeProtocol]; RPCPkt.SetupResponse[recvdHeader]; }; <> UnwindRequested => { <> resumeLength _ 0; CONTINUE; }; RejectUnbound => { Reject[pkt, unbound]; resumeLength _ 0; CONTINUE }; RejectProtocol => { Reject[pkt, protocol]; resumeLength _ 0; CONTINUE }; ]; }; FreeCalleeState[myStateBlock]; }; }; <<******** Remote call failure ********>> RejectUnbound: PUBLIC ERROR = CODE; RejectProtocol: PUBLIC ERROR = CODE; Reject: PROC [pkt: RPCLupine.RPCPkt, rejection: RPCPkt.Outcome] = { header: HeaderPtr = @pkt.header; UNTIL header.type.eom = end DO [,] _ ReceiveExtraPkt[pkt ! RPC.CallFailed => TRUSTED {rejection _ protocol; EXIT}] ENDLOOP; header.outcome _ rejection; }; <<******** Initialization ********>> Initialize: ENTRY PROC = { myAddr: PupTypes.PupAddress = PupDefs.AnyLocalPupAddress[RPCPrivate.rpcSocket]; fq: SafeStorage.FinalizationQueue _ SafeStorage.NewFQ[48]; SafeStorage.EstablishFinalization[CODE[CalleeStateRec], 0, fq ! SafeStorage.CantEstablishFinalization => { SafeStorage.ReEstablishFinalization[CODE[CalleeStateRec], 0, fq] }]; myHost _ [net: myAddr.net, host: myAddr.host]; Process.Detach[FORK FinalizeCalleeBlocks[fq]]; }; Restart: ENTRY PROC = { ForgetConnections[] }; Initialize[]; DO STOP; Restart[]; ENDLOOP; }. <> <> <> <> <<>>