<> <> <> DIRECTORY BufferDefs USING[ PupBuffer ], PrincOpsUtils USING[ BITAND, BITXOR, GetReturnLink, IsBound, LongCOPY, MyLocalFrame, PsbHandleToIndex, ReadPSB ], PrincOps USING[ PsbIndex, PsbNull ], PupDefs USING[ AnyLocalPupAddress, GetFreePupBuffer, PupRouterSendThis, ReturnFreePupBuffer], PupTypes USING[ PupAddress, PupHostID, PupNetID, PupSocketID ], RPC USING[ CallFailed, Conversation, unencrypted ], RPCInternal USING[ ConversationObject, DecryptPkt, EncryptPkt, exportTable, firstConversation, GetConnectionState, ImportInstance ], RPCLupine USING[ DataLength, Dispatcher, GetRPCPkt, Header, maxDataLength, maxPupWords, pktOverhead, RPCPkt ], RPCPkt USING[ CallCount, ConnectionID, DispatcherDetails, EnqueueAgain, IdleReceive, Header, Machine, noDispatcher, Outcome, PktExchange, pktLengthOverhead, SetupResponse ], RPCPrivate USING[ rpcSocket, ReturnBuffer ], VM USING[ AddressForPageNumber, Allocate, Free, Interval, PagesForWords, Pin ]; RPCPktStreams: MONITOR IMPORTS PrincOpsUtils, PupDefs, RPC, RPCLupine, RPCInternal, RPCPkt, RPCPrivate, VM EXPORTS RPC--Header,ConversationObject--, RPCInternal--DoSignal, ServerMain--, RPCLupine--lots of things-- SHARES BufferDefs, RPCLupine = BEGIN Header: PUBLIC TYPE = RPCPkt.Header; ConcreteHeader: PROC[abstract: LONG POINTER TO RPCLupine.Header] RETURNS[LONG POINTER TO Header] = INLINE { RETURN[ abstract ] }; myHost: RPCPkt.Machine; GiveBackBuffer: PROC[b: BufferDefs.PupBuffer] = <> IF PrincOpsUtils.IsBound[RPCPrivate.ReturnBuffer] THEN RPCPrivate.ReturnBuffer ELSE PupDefs.ReturnFreePupBuffer; -- ******** Caller ******** -- <> CallDestHint: TYPE = ARRAY PrincOps.PsbIndex OF PrincOps.PsbIndex; lastCallDest: REF CallDestHint = NEW[CallDestHint_ALL[PrincOps.PsbNull]]; RecordCallDest: ENTRY PROC[header: LONG POINTER TO Header] = INLINE { lastCallDest[header.destPSB--myPSB--] _ header.srcePSB }; ImportInstance: PUBLIC TYPE = RPCInternal.ImportInstance; <> ConversationObject: PUBLIC TYPE = RPCInternal.ConversationObject; Conversation: TYPE = REF ConversationObject; MisusedConversation: ERROR = CODE; StartCall: PUBLIC ENTRY PROC[callPkt: RPCLupine.RPCPkt, interface: REF ImportInstance, localConversation: Conversation _ RPC.unencrypted] = BEGIN myPSB: PrincOps.PsbIndex = PrincOpsUtils.PsbHandleToIndex[PrincOpsUtils.ReadPSB[]]; header: LONG POINTER TO Header = @callPkt.header; header.destHost _ interface.host; header.destSoc _ RPCPrivate.rpcSocket; header.destPSB _ lastCallDest[myPSB]; callPkt.convHandle _ localConversation; IF localConversation = RPC.unencrypted THEN header.conv _ RPCInternal.firstConversation ELSE BEGIN -- ?? header.conv _ RPCInternal.GetPktConversation[localConversation] -- header.conv _ [localConversation.id.count.ls, caller, localConversation.id.count.ms]; IF localConversation.id.originator # myHost THEN BEGIN IF header.destHost # localConversation.id.originator THEN ERROR MisusedConversation[!UNWIND => NULL]; header.conv.originator _ callee; END; END; header.pktID.activity _ myPSB; -- header.pktID.callSeq gets filled in by PktExchange -- header.pktID.pktSeq _ 0; -- => new call -- header.dispatcher _ interface.dispatcher; END; Call: PUBLIC PROC[pkt: RPCLupine.RPCPkt, callLength: RPCLupine.DataLength, maxReturnLength: RPCLupine.DataLength, signalHandler: RPCLupine.Dispatcher _ NIL] RETURNS[ returnLength: RPCLupine.DataLength, lastPkt: BOOLEAN] = BEGIN recvdHeader: LONG POINTER TO Header = @pkt.header; returnLength _ RPCPkt.PktExchange[pkt, callLength, maxReturnLength, call, signalHandler ].newLength; RecordCallDest[recvdHeader]; SELECT recvdHeader.outcome FROM result => NULL; unbound => ERROR RPC.CallFailed[unbound]; protocol => ERROR RPC.CallFailed[runtimeProtocol]; signal => ERROR -- handled inside RPCPkt.PktExchange --; unwind => -- This is legal only if we were called to raise a remote signal; UnwindRequested should be caught where we called the dispatcher << that noticed the signal>> { RPCPkt.SetupResponse[recvdHeader]; ERROR UnwindRequested[] }; ENDCASE => --unwind,garbage-- ERROR RPC.CallFailed[runtimeProtocol]; RPCPkt.SetupResponse[recvdHeader]; RETURN[ returnLength, recvdHeader.type.eom = end ] END; -- ******** Protocol implementation: multi-packet case ******** -- SendPrelimPkt: PUBLIC PROC[pkt: RPCLupine.RPCPkt, length: RPCLupine.DataLength] = { [] _ RPCPkt.PktExchange[pkt, length, 0, sending] }; ReceiveExtraPkt: PUBLIC PROC[pkt: RPCLupine.RPCPkt] RETURNS[ length: RPCLupine.DataLength, lastPkt: BOOLEAN] = BEGIN recvdHeader: LONG POINTER TO Header; length _ RPCPkt.PktExchange[pkt, 0, RPCLupine.maxDataLength, receiving].newLength; recvdHeader _ @pkt.header; RPCPkt.SetupResponse[recvdHeader]; RETURN[ length, recvdHeader.type.eom = end ] END; -- ******** Protocol implementation: callee and packets-while-notWanting ******** -- idlerAckCount: CARDINAL _ 0; idlerRequeueCount: CARDINAL _ 0; GenerateIdlerResponse: PROC[recvd: RPCLupine.RPCPkt] = BEGIN -- packet is encrypted! ackPkt: BufferDefs.PupBuffer = PupDefs.GetFreePupBuffer[]; header: LONG POINTER TO Header = LOOPHOLE[@ackPkt.pupLength]; recvdHeader: LONG POINTER TO Header = @recvd.header; workerPSB: PrincOps.PsbIndex = recvdHeader.destPSB; -- as adjusted by FindCallee -- idlerAckCount _ idlerAckCount+1; RPCPkt.SetupResponse[recvdHeader]; header^ _ recvdHeader^; header.length _ recvdHeader.length; header.oddByte _ no; header.type _ [0,rpc,end,dontAck,ack]; header.srceHost _ myHost; header.srceSoc _ RPCPrivate.rpcSocket; header.srcePSB _ workerPSB; PupDefs.PupRouterSendThis[ackPkt]; END; EnqueueForNewPSB: PROC[recvd: RPCLupine.RPCPkt] = BEGIN -- packet is encrypted! pupPkt: BufferDefs.PupBuffer = PupDefs.GetFreePupBuffer[]; header: LONG POINTER TO Header = LOOPHOLE[@pupPkt.pupLength]; recvdHeader: LONG POINTER TO Header = @recvd.header; idlerRequeueCount _ idlerRequeueCount+1; PrincOpsUtils.LongCOPY[from: recvdHeader, to: header, nwords: recvdHeader.length]; RPCPkt.EnqueueAgain[pupPkt]; END; <> CalleeState: TYPE = RECORD[ next: POINTER TO CalleeState, callee: PrincOps.PsbIndex, state: LONG POINTER TO Header]; callees: POINTER TO CalleeState _ NIL; EntryAddCallee: ENTRY PROC[stateBlock: POINTER TO CalleeState] = INLINE { AddCallee[stateBlock] }; AddCallee: INTERNAL PROC[stateBlock: POINTER TO CalleeState] = INLINE { stateBlock^.next _ callees; callees _ stateBlock }; RemoveCallee: ENTRY PROC[stateBlock: POINTER TO CalleeState] = BEGIN FOR p: POINTER TO POINTER TO CalleeState _ @callees, @(p^.next) DO SELECT TRUE FROM p^ = stateBlock => { p^ _ p^.next; RETURN }; p^ = NIL => ERROR; ENDCASE => NULL; ENDLOOP; END; FindCallee: ENTRY PROC[given: LONG POINTER TO Header] RETURNS[BOOLEAN] = BEGIN <> FOR p: POINTER TO CalleeState _ callees, p.next DO SELECT TRUE FROM p = NIL => RETURN[FALSE]; p.state.conv = given.conv --AND same originator .... -- AND p.state.pktID.activity = given.pktID.activity AND p.state.pktID.callSeq = given.pktID.callSeq => { given^.destPSB _ p.callee; RETURN[TRUE] }; ENDCASE => NULL; ENDLOOP; END; <> HashKey: TYPE = [0..127]; ConnectionData: TYPE = RECORD[next: Connection, id: RPCPkt.ConnectionID, call: RPCPkt.CallCount, conv: RPC.Conversation -- NB: opaque type --]; Connection: TYPE = REF ConnectionData; connections: REF ARRAY HashKey OF Connection = NEW[ARRAY HashKey OF Connection _ ALL[NIL]]; ForgetConnections: INTERNAL PROC = -- Forget connection state, so that subsequent calls will cause an RFA BEGIN FOR hash: HashKey IN HashKey DO connections[hash] _ NIL ENDLOOP; END; <> serverDataLength: RPCLupine.DataLength = RPCLupine.maxDataLength; ServerMain: PUBLIC PROC = BEGIN myPSB: PrincOps.PsbIndex = PrincOpsUtils.PsbHandleToIndex[PrincOpsUtils.ReadPSB[]]; pktSpace: VM.Interval = VM.Allocate[VM.PagesForWords[serverDataLength+RPCLupine.pktOverhead]]; myPkt: RPCLupine.RPCPkt = RPCLupine.GetRPCPkt[VM.AddressForPageNumber[pktSpace.page]]; recvdHeader: LONG POINTER TO Header = @myPkt.header; myStateBlock: CalleeState _ [NIL, myPSB, recvdHeader]; newPkt: BOOLEAN _ FALSE; -- Whether packet is valid -- decrypted: BOOLEAN _ FALSE; -- if "newPkt", whether it's been decrypted -- newLength: RPCLupine.DataLength; -- iff "newPkt" and "decrypted", pkt's length -- connection: Connection; Cleanup: PROC = { VM.Free[pktSpace] }; LookupCaller: ENTRY PROC[id: RPCPkt.ConnectionID] RETURNS[{new, old, phoney, unknown}] = INLINE <> <> <> <> <> <> <> <> <> <> <> BEGIN connection _ connections[ PrincOpsUtils.BITAND[PrincOpsUtils.BITXOR[id.caller,id.activity],LAST[HashKey]]]; DO SELECT TRUE FROM connection = NIL => BEGIN IF recvdHeader.type.class # call THEN RETURN[old]; RETURN[unknown]; END; id.conv = connection.id.conv AND id.caller = connection.id.caller AND recvdHeader.srcePSB = connection.id.activity => BEGIN myPkt.convHandle _ connection.conv; IF NOT decrypted THEN BEGIN IF connection.conv # RPC.unencrypted THEN BEGIN ok: BOOLEAN; [ok, newLength] _ RPCInternal.DecryptPkt[recvdHeader, myPkt.convHandle]; decrypted _ TRUE; IF NOT ok THEN RETURN[phoney]; END ELSE BEGIN newLength _ recvdHeader.length - RPCPkt.pktLengthOverhead; decrypted _ TRUE; END; END; IF recvdHeader.pktID.activity # recvdHeader.srcePSB THEN RETURN[phoney]; IF recvdHeader.type.class # call THEN RETURN[old]; IF recvdHeader.pktID.callSeq > connection.call THEN BEGIN IF recvdHeader.pktID.pktSeq # 1 THEN RETURN[phoney]; connection.call _ recvdHeader.pktID.callSeq; AddCallee[@myStateBlock]; RETURN[new] END ELSE RETURN[old] END; ENDCASE => connection _ connection.next; ENDLOOP; END; NoteConnection: ENTRY PROC[id: RPCPkt.ConnectionID, call: RPCPkt.CallCount, conv: RPC.Conversation] = BEGIN prev: Connection _ NIL; hash: HashKey = PrincOpsUtils.BITAND[PrincOpsUtils.BITXOR[id.caller,id.activity],LAST[HashKey]]; connection _ connections[hash]; DO SELECT TRUE FROM connection = NIL => BEGIN connection _ NEW[ConnectionData _ [next: NIL, id: id, call: call-1, conv: conv] ]; IF prev = NIL THEN connections[hash] _ connection ELSE prev.next _ connection; EXIT END; id.conv = connection.id.conv AND id.caller = connection.id.caller AND id.activity = connection.id.activity => -- already there! -- EXIT; ENDCASE => { prev _ connection; connection _ connection.next }; ENDLOOP; END; VM.Pin[pktSpace]; <> DO ENABLE { ABORTED => EXIT; UNWIND => Cleanup[] }; IF NOT newPkt THEN { RPCPkt.IdleReceive[myPkt, RPCLupine.maxPupWords]; newPkt _ TRUE; decrypted _ FALSE }; SELECT LookupCaller[ id: [recvdHeader.conv, recvdHeader.srceHost, recvdHeader.srcePSB] ] FROM new => -- start of new call -- BEGIN target: RPCPkt.DispatcherDetails = recvdHeader.dispatcher; resultLength: RPCLupine.DataLength; RPCPkt.SetupResponse[recvdHeader]; IF target.dispatcherHint >= RPCInternal.exportTable.used OR target.dispatcherID = RPCPkt.noDispatcher OR target.dispatcherID # RPCInternal.exportTable[target.dispatcherHint].id THEN { Reject[myPkt, unbound]; resultLength _ 0 } ELSE resultLength _ RPCInternal.exportTable[target.dispatcherHint].dispatcher[ myPkt, newLength, recvdHeader.type.eom = end, connection.conv ! RPC.CallFailed => TRUSTED{newPkt _ FALSE; RemoveCallee[@myStateBlock]; LOOP}; UnwindRequested => <> <> TRUSTED{ resultLength _ 0; CONTINUE }; RejectUnbound => -- The dispatcher wants caller to get CallFailed[unbound] -- TRUSTED{ Reject[myPkt, unbound]; resultLength _ 0; CONTINUE }; RejectProtocol => -- The dispatcher wants caller to get CallFailed[badProtocol] -- TRUSTED{ Reject[myPkt, protocol]; resultLength _ 0; CONTINUE }; ABORTED--which also causes an EXIT--, UNWIND => RemoveCallee[@myStateBlock] ]; RemoveCallee[@myStateBlock]; [newPkt, newLength] _ RPCPkt.PktExchange[myPkt, resultLength, serverDataLength, endCall ! RPC.CallFailed => TRUSTED{ newPkt _ FALSE; CONTINUE }]; IF newPkt THEN decrypted _ TRUE; -- now newPkt=FALSE or myPkt is decrypted and contains start of new call END; unknown => -- need to ask other end for connection state -- BEGIN ok: BOOLEAN; id: RPCPkt.ConnectionID; call: RPCPkt.CallCount; conv: RPC.Conversation; l: RPCLupine.DataLength; [ok, id, call, conv, l] _ RPCInternal.GetConnectionState[decrypted, myPkt ! RPC.CallFailed => TRUSTED{ newPkt_FALSE; LOOP } ]; IF ok THEN BEGIN IF NOT newPkt THEN ERROR; IF NOT decrypted THEN { decrypted _ TRUE; newLength _ l }; NoteConnection[id, call, conv]; END ELSE newPkt _ FALSE; END; phoney => -- ignorable packet -- newPkt _ FALSE; old => BEGIN <> oldDest: PrincOps.PsbIndex = recvdHeader.destPSB; knownCallee: BOOL = decrypted AND FindCallee[recvdHeader]--may alter destPSB--; IF knownCallee AND recvdHeader.destPSB # oldDest THEN -- destPSB his was wrong: requeue pkt for correct process -- <> BEGIN IF decrypted THEN recvdHeader.length _ IF myPkt.convHandle = RPC.unencrypted THEN RPCPkt.pktLengthOverhead + newLength ELSE RPCInternal.EncryptPkt[myPkt, newLength]; EnqueueForNewPSB[myPkt]; END ELSE BEGIN <> IF recvdHeader.type.ack = pleaseAck AND recvdHeader.type.eom = end AND( recvdHeader.type.class = data OR knownCallee ) THEN BEGIN recvdHeader.length _ IF NOT decrypted OR myPkt.convHandle = RPC.unencrypted THEN RPCPkt.pktLengthOverhead ELSE RPCInternal.EncryptPkt[myPkt, 0]; GenerateIdlerResponse[myPkt]; END; END; newPkt _ FALSE; END; ENDCASE => ERROR; ENDLOOP; Cleanup[]; END; -- ******** Remote signalling ******** -- StartSignal: PUBLIC PROC[signalPkt: RPCLupine.RPCPkt] = { ConcreteHeader[@signalPkt.header].outcome _ signal }; UnwindRequested: ERROR = CODE; -- internal: remote machine is unwinding a signal -- DoSignal: PUBLIC PROC[b: BufferDefs.PupBuffer, pktLength: RPCLupine.DataLength, signalHandler: RPCLupine.Dispatcher, convHandle: RPC.Conversation] RETURNS[resumePkt: RPCLupine.RPCPkt, resumeLength: RPCLupine.DataLength, myLocalFrame: POINTER] = BEGIN myPktSpace: ARRAY [1..serverDataLength + RPCLupine.pktOverhead] OF WORD; pkt: RPCLupine.RPCPkt = RPCLupine.GetRPCPkt[@myPktSpace]; recvdHeader: LONG POINTER TO Header = @pkt.header; myPSB: PrincOps.PsbIndex = PrincOpsUtils.PsbHandleToIndex[PrincOpsUtils.ReadPSB[]]; <> myStateBlock: CalleeState _ [NIL, myPSB, recvdHeader]; BEGIN -- copy from the Pup buffer into our frame IF pktLength > serverDataLength THEN ERROR RPC.CallFailed[runtimeProtocol ! UNWIND => GiveBackBuffer[b]]; PrincOpsUtils.LongCOPY[from: @b.pupLength, to: recvdHeader, nwords: pktLength+SIZE[Header]]; GiveBackBuffer[b]; END; pkt.convHandle _ convHandle; EntryAddCallee[@myStateBlock]; BEGIN ENABLE UNWIND => RemoveCallee[@myStateBlock]; handlerFailed: BOOLEAN _ FALSE; -- CallFailed raised inside signalHandler! -- RPCPkt.SetupResponse[recvdHeader]; IF signalHandler = NIL THEN { Reject[pkt, unbound]; resumeLength _ 0 } ELSE resumeLength _ signalHandler[ pkt, pktLength, recvdHeader.type.eom = end, convHandle ! RPC.CallFailed => TRUSTED{ handlerFailed _ TRUE }; UNWIND => IF NOT handlerFailed THEN BEGIN recvdHeader.outcome _ unwind; resumeLength _ RPCPkt.PktExchange[pkt, 0, serverDataLength, call, signalHandler].newLength; SELECT recvdHeader.outcome FROM result => NULL -- let our UNWIND propagate --; signal => ERROR -- handled inside RPCPkt.PktExchange--; ENDCASE => --unbound,protocol,unwind,garbage-- ERROR RPC.CallFailed[runtimeProtocol]; RPCPkt.SetupResponse[recvdHeader]; END; <> UnwindRequested => <> { resumeLength _ 0; CONTINUE }; RejectUnbound => { Reject[pkt, unbound]; resumeLength _ 0; CONTINUE }; RejectProtocol => { Reject[pkt, protocol]; resumeLength _ 0; CONTINUE }; ]; END; RemoveCallee[@myStateBlock]; -- Magic to return to my caller without freeing my local frame (LOOPHOLE[PrincOpsUtils.GetReturnLink[], PROC[RPCLupine.RPCPkt, RPCLupine.DataLength, POINTER]]) [pkt, resumeLength, LOOPHOLE[PrincOpsUtils.MyLocalFrame[]]]; END; -- ******** Remote call failure ******** RejectUnbound: PUBLIC ERROR = CODE; RejectProtocol: PUBLIC ERROR = CODE; Reject: PROC[pkt: RPCLupine.RPCPkt, rejection: RPCPkt.Outcome] = BEGIN header: LONG POINTER TO Header = @pkt.header; UNTIL header.type.eom = end DO [,] _ ReceiveExtraPkt[pkt ! RPC.CallFailed => TRUSTED{ rejection _ protocol; EXIT }] ENDLOOP; header.outcome _ rejection; END; -- ******** Initialization ******** Initialize: ENTRY PROC = BEGIN myAddr: PupTypes.PupAddress = PupDefs.AnyLocalPupAddress[RPCPrivate.rpcSocket]; myHost _ [net: myAddr.net, host: myAddr.host]; END; Restart: ENTRY PROC = BEGIN ForgetConnections[] END; Initialize[]; DO STOP; Restart[]; ENDLOOP; END.