DIRECTORY
BufferDefs USING [PupBuffer],
DebuggerSwap USING [CallDebugger],
PrincOps USING [PsbIndex, PsbNull],
PrincOpsUtils USING [BITAND, BITXOR, IsBound, LongCopy, PsbHandleToIndex, ReadPSB],
Process USING [Detach],
PupDefs USING [AnyLocalPupAddress, GetFreePupBuffer, PupRouterSendThis, ReturnFreePupBuffer],
PupTypes USING [PupAddress, PupHostID, PupNetID, PupSocketID],
RPC USING [CallFailed, Conversation, unencrypted],
RPCInternal USING [ConversationObject, DecryptPkt, EncryptPkt, exportTable, firstConversation, GetConnectionState, ImportInstance],
RPCLupine USING [DataLength, Dispatcher, GetRPCPkt, Header, maxDataLength, maxPupWords, pktOverhead, RPCPkt],
RPCPkt USING [CallCount, ConnectionID, DispatcherDetails, EnqueueAgain, Header, IdleReceive, Machine, noDispatcher, Outcome, PktExchange, pktLengthOverhead, SetupResponse],
RPCPrivate USING [ReturnBuffer, rpcSocket],
SafeStorage USING [CantEstablishFinalization, EnableFinalization, EstablishFinalization, FinalizationQueue, FQNext, NewFQ, ReEstablishFinalization],
VM USING [AddressForPageNumber, Interval, nullInterval, PagesForWords, Pin, SimpleAllocate, Unpin];
RPCPktStreams:
MONITOR
IMPORTS DebuggerSwap, PrincOpsUtils, Process, PupDefs, RPC, RPCLupine, RPCInternal, RPCPkt, RPCPrivate, SafeStorage, VM
EXPORTS
RPC, --Header,ConversationObject
RPCInternal, --DoSignal, ServerMain
RPCLupine -- others
SHARES BufferDefs, RPCLupine = {
-- ******** Caller ********
-- For each PSB that initiates a call, record the last callee PSB and use it as the destPSB hint for the next call, to obtain an implicit ack of the last result packet. The destPSB will be wrong if we next talk to a different server host, but that is only a slight pessimization.
CallDestHint: TYPE = ARRAY PrincOps.PsbIndex OF PrincOps.PsbIndex;
lastCallDest:
REF CallDestHint =
NEW[CallDestHint ←
ALL[PrincOps.PsbNull]];
RecordCallDest:
ENTRY
PROC [header: HeaderPtr] =
INLINE {
lastCallDest[header.destPSB--myPSB--] ← header.srcePSB;
};
ImportInstance: PUBLIC TYPE = RPCInternal.ImportInstance;
-- During a call, a single packet is used for buffering all data sent and received. Whenever the client of RPCLupine has possession of the buffer (after StartCall), the buffer is set up correctly for transmitting, i.e. buffer.header.dest = the remote machine. This holds on exit from StartCall, and on entry to and exit from SendPrelimPkt, ReceiveExtraPkt, Call, and the dispatchers. It costs an extra call of SetupResponse in Call when there will be no subsequent call of ReceiveExtraPkt, but it preserves my sanity.
ConversationObject: PUBLIC TYPE = RPCInternal.ConversationObject;
Conversation: TYPE = REF ConversationObject;
MisusedConversation: ERROR = CODE;
StartCall:
PUBLIC
ENTRY
PROC [callPkt: RPCLupine.RPCPkt, interface:
REF ImportInstance, localConversation: Conversation ←
RPC.unencrypted] = {
ENABLE UNWIND => NULL;
myPSB: PrincOps.PsbIndex = PrincOpsUtils.PsbHandleToIndex[PrincOpsUtils.ReadPSB[]];
header: HeaderPtr = @callPkt.header;
header.destHost ← interface.host;
header.destSoc ← RPCPrivate.rpcSocket;
header.destPSB ← lastCallDest[myPSB];
callPkt.convHandle ← localConversation;
IF localConversation =
RPC.unencrypted
THEN header.conv ← RPCInternal.firstConversation
ELSE {
-- ?? header.conv ← RPCInternal.GetPktConversation[localConversation]
header.conv ← [localConversation.id.count.ls, caller, localConversation.id.count.ms];
IF localConversation.id.originator # myHost
THEN {
IF header.destHost # localConversation.id.originator
THEN
ERROR MisusedConversation[];
header.conv.originator ← callee;
};
};
header.pktID.activity ← myPSB;
-- header.pktID.callSeq gets filled in by PktExchange --
header.pktID.pktSeq ← 0; -- => new call --
header.dispatcher ← interface.dispatcher;
};
Call:
PUBLIC
PROC [pkt: RPCLupine.RPCPkt, callLength: RPCLupine.DataLength, maxReturnLength: RPCLupine.DataLength, signalHandler: RPCLupine.Dispatcher ←
NIL]
RETURNS [ returnLength: RPCLupine.DataLength, lastPkt:
BOOL] = {
recvdHeader: HeaderPtr = @pkt.header;
returnLength ← RPCPkt.PktExchange[pkt, callLength,
maxReturnLength, call, signalHandler ].newLength;
RecordCallDest[recvdHeader];
SELECT recvdHeader.outcome
FROM
result => NULL;
unbound => ERROR RPC.CallFailed[unbound];
protocol => ERROR RPC.CallFailed[runtimeProtocol];
signal => ERROR -- handled inside RPCPkt.PktExchange --;
unwind => {
-- This is legal only if we were called to raise a remote signal; UnwindRequested should be caught where we called the dispatcher that noticed the signal.
RPCPkt.SetupResponse[recvdHeader];
ERROR UnwindRequested[];
};
ENDCASE =>
-- unwind, garbage --
ERROR RPC.CallFailed[runtimeProtocol];
RPCPkt.SetupResponse[recvdHeader];
RETURN[ returnLength, recvdHeader.type.eom = end ]
};
-- ******** Protocol implementation: callee and packets-while-notWanting ******** --
idlerAckCount: CARDINAL ← 0;
idlerRequeueCount: CARDINAL ← 0;
GenerateIdlerResponse:
PROC [recvd: RPCLupine.RPCPkt] = {
-- packet is encrypted!
ackPkt: BufferDefs.PupBuffer = PupDefs.GetFreePupBuffer[];
header: HeaderPtr = LOOPHOLE[@ackPkt.pupLength];
recvdHeader: HeaderPtr = @recvd.header;
workerPSB: PrincOps.PsbIndex = recvdHeader.destPSB; -- as adjusted by FindCallee --
idlerAckCount ← idlerAckCount+1;
RPCPkt.SetupResponse[recvdHeader];
header^ ← recvdHeader^;
header.length ← recvdHeader.length;
header.oddByte ← no;
header.type ← [0,rpc,end,dontAck,ack];
header.srceHost ← myHost;
header.srceSoc ← RPCPrivate.rpcSocket;
header.srcePSB ← workerPSB;
PupDefs.PupRouterSendThis[ackPkt];
};
EnqueueForNewPSB:
PROC [recvd: RPCLupine.RPCPkt] = {
-- packet is encrypted!
pupPkt: BufferDefs.PupBuffer = PupDefs.GetFreePupBuffer[];
header: HeaderPtr = LOOPHOLE[@pupPkt.pupLength];
recvdHeader: HeaderPtr = @recvd.header;
idlerRequeueCount ← idlerRequeueCount+1;
PrincOpsUtils.LongCopy[from: recvdHeader, to: header, nwords: recvdHeader.length];
RPCPkt.EnqueueAgain[pupPkt];
};
-- We must maintain globally accessible state indicating current calls in the callee, so that the callee can respond to pings.
CalleeState: TYPE = REF CalleeStateRec;
CalleeStateRec:
TYPE =
RECORD [
entered: BOOL ← FALSE,
free: BOOL ← FALSE,
next: CalleeState ← NIL,
callee: PrincOps.PsbIndex ← PrincOps.PsbNull,
state: HeaderPtr ← NIL,
space: VM.Interval ← VM.nullInterval];
callees: CalleeState ← NIL;
calleesCount: INT ← 0;
freeCallees: CalleeState ← NIL;
freeCount: INT ← 0;
worldSwap:
BOOL ←
FALSE;
-- This flag can be used to force world-swap debugging (teledebugging) for really delicate errors. Most bugs can be found simply from RPCInternalFailure being raised.
RPCInternalFailure: ERROR = CODE;
Crash:
PROC = {
IF worldSwap
THEN DebuggerSwap.CallDebugger["RPC internal failure!"L]
ELSE ERROR RPCInternalFailure;
};
AllocCalleeState:
ENTRY
PROC
RETURNS [CalleeState] = {
myPSB: PrincOps.PsbIndex = PrincOpsUtils.PsbHandleToIndex[PrincOpsUtils.ReadPSB[]];
state: CalleeState ← freeCallees;
space: VM.Interval ← VM.nullInterval;
IF freeCallees #
NIL
THEN {
freeCallees ← state.next;
freeCount ← freeCount - 1;
space ← state.space;
}
ELSE {
state ← NEW[CalleeStateRec];
};
IF space.count = 0
THEN
space ←
VM.SimpleAllocate[
VM.PagesForWords[serverDataLength+RPCLupine.pktOverhead]];
VM.Pin[space];
state^ ← [free: FALSE, entered: FALSE, next: NIL, callee: myPSB, state: NIL, space: space];
SafeStorage.EnableFinalization[state];
RETURN [state];
};
FreeCalleeState:
ENTRY
PROC [state: CalleeState] = {
-- This one can be called multiple times, since it is involved in cleanup activity.
IF state.free THEN RETURN;
IF state.entered THEN RemoveCalleeInternal[state];
VM.Unpin[state.space];
state.free ← TRUE;
state.next ← freeCallees;
freeCallees ← state;
freeCount ← freeCount + 1;
};
AddCallee:
ENTRY
PROC [state: CalleeState] =
INLINE {
AddCalleeInternal[state];
};
AddCalleeInternal:
INTERNAL
PROC [state: CalleeState] = {
IF state.entered THEN Crash[];
state.entered ← TRUE;
state.next ← callees;
callees ← state;
calleesCount ← calleesCount + 1;
};
RemoveCallee:
ENTRY
PROC [state: CalleeState] =
INLINE {
RemoveCalleeInternal[state];
};
RemoveCalleeInternal:
INTERNAL
PROC [state: CalleeState] = {
lag: CalleeState ← NIL;
IF state #
NIL
AND state.entered
THEN
FOR p: CalleeState ← callees, p.next
WHILE p #
NIL
DO
SELECT p
FROM
state => {
IF lag = NIL THEN callees ← p.next ELSE lag.next ← p.next;
state.entered ← FALSE;
calleesCount ← calleesCount - 1;
RETURN;
};
NIL => Crash[];
ENDCASE;
lag ← p;
ENDLOOP;
Crash[];
-- Remove has been called for something that is not on the callee chain.
-- This should not happen!
};
FindCallee:
ENTRY
PROC [given: HeaderPtr]
RETURNS [
BOOL] = {
-- Returns TRUE iff there is a current callee for this call, even if the callee's pktSeq differs. If the result is TRUE, updates "given"'s destPSB to match the callee's. Assumes pkt has been decrypted.
IF given #
NIL
THEN
FOR p: CalleeState ← callees, p.next
WHILE p #
NIL
DO
state: HeaderPtr ← p.state;
SELECT
TRUE
FROM
state = NIL => {};
state.conv = given.conv
--AND same originator .... --
AND state.pktID.activity = given.pktID.activity
AND state.pktID.callSeq = given.pktID.callSeq => {
given^.destPSB ← p.callee;
RETURN [TRUE];
};
ENDCASE;
ENDLOOP;
RETURN [FALSE];
};
FinalizeCalleeBlocks:
PROC [fq: SafeStorage.FinalizationQueue] = {
DO
WITH SafeStorage.FQNext[fq]
SELECT
FROM
state: CalleeState => FreeCalleeState[state];
ENDCASE;
ENDLOOP;
};
-- For each calling RPCPkt.ConnectionID we must maintain a sequence number, being the last call initiated on that conversation, so that we can eliminate duplicate call request packets. This information is maintained as a hash table with linked overflow. The hash function is (connection.caller XOR connection.activity) MOD 128. The hash table is altered by LookupCaller and EndConnection, which are nested inside ServerMain for sordid efficiency reasons, and by NoteCaller.
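-- Worked example of the hash, using made-up operand values chosen only for illustration (they are not taken from any real packet):
-- if LOOPHOLE[id.caller, WORD] = 0123H and id.activity = 0045H, then BITXOR yields 0166H,
-- and BITAND with LAST[HashKey] = 007FH yields 0066H, i.e. bucket 102.
-- Masking with 127 gives the same result as MOD 128 here because the operands are treated as unsigned words.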
HashKey: TYPE = [0..127];
ConnectionData:
TYPE =
RECORD[next: Connection,
id: RPCPkt.ConnectionID,
call: RPCPkt.CallCount,
conv: RPC.Conversation -- NB: opaque type --];
Connection: TYPE = REF ConnectionData;
connections:
REF
ARRAY HashKey
OF Connection =
NEW[
ARRAY HashKey
OF Connection ←
ALL[
NIL]];
ForgetConnections:
INTERNAL
PROC = {
-- Forget connection state, so that subsequent calls will cause an RFA.
FOR hash: HashKey IN HashKey DO connections[hash] ← NIL ENDLOOP;
};
-- Received packets are dispatched to "ServerMain" processes (through IdleReceive) if the addressed process does not want to receive any packets at the time, or if the destPSB is PsbNull. Thus ServerMain serves both as the listener waiting for RFCs on a conventional rendezvous protocol, and as the process listening to the incoming per-connection socket in more heavyweight protocols. There are several cases. The packet may be the first packet of a new call; in this case, this process will handle the call. The packet may be an old duplicate packet from a dead call; in this case the packet can be ignored. The packet may be a retransmission in a current call; in this case an ack may be required. Remember that packets can arrive here in both the caller and callee hosts!
serverDataLength: RPCLupine.DataLength = RPCLupine.maxDataLength;
ServerMain:
PUBLIC
PROC = {
myStateBlock: CalleeState ← AllocCalleeState[];
pktSpace: VM.Interval = myStateBlock.space;
myPkt: RPCLupine.RPCPkt = RPCLupine.GetRPCPkt[VM.AddressForPageNumber[pktSpace.page]];
recvdHeader: HeaderPtr ← myStateBlock.state ← @myPkt.header;
newPkt: BOOL ← FALSE; -- Whether packet is valid --
decrypted: BOOL ← FALSE; -- if "newPkt", whether it's been decrypted --
newLength: RPCLupine.DataLength; -- iff "newPkt" and "decrypted", pkt's length --
connection: Connection;
LookupCaller:
ENTRY
PROC [id: RPCPkt.ConnectionID]
RETURNS [{new, old, phoney, unknown}] = {
-- Implicitly, recvdHeader is a parameter of LookupCaller.
-- If pkt starts call and ConnectionID is unknown, returns "unknown";
-- If pkt starts call and isn't duplicate, adds us as callee, returns "new";
-- If pkt is part of some previously initiated call, returns "old";
-- If pkt is part of some call with unknown ConnectionID, returns "phoney";
-- If decrypted pkt is inconsistent, returns "phoney".
-- Otherwise, returns "old".
-- On entry, packet has previously been decrypted iff "decrypted".
-- On exit, if result is "new", pkt is decrypted.
-- On exit, if "decrypted", then myPkt.convHandle is set.
-- Note that if result is "old", pkt may or may not be decrypted.
ENABLE UNWIND => Crash[];
connection ← connections[
PrincOpsUtils.BITAND[
PrincOpsUtils.BITXOR[LOOPHOLE[id.caller, WORD], id.activity],
LAST[HashKey]]];
DO
SELECT
TRUE
FROM
connection =
NIL => {
IF recvdHeader.type.class # call THEN RETURN[old];
RETURN[unknown];
};
id.conv = connection.id.conv
AND id.caller = connection.id.caller
AND recvdHeader.srcePSB = connection.id.activity => {
myPkt.convHandle ← connection.conv;
IF
NOT decrypted
THEN {
IF connection.conv #
RPC.unencrypted
THEN {
ok: BOOL;
[ok, newLength] ← RPCInternal.DecryptPkt[recvdHeader, myPkt.convHandle];
decrypted ← TRUE;
IF NOT ok THEN RETURN[phoney];
}
ELSE {
newLength ← recvdHeader.length - RPCPkt.pktLengthOverhead;
decrypted ← TRUE;
};
};
IF recvdHeader.pktID.activity # recvdHeader.srcePSB THEN RETURN[phoney];
IF recvdHeader.type.class # call THEN RETURN[old];
IF recvdHeader.pktID.callSeq > connection.call
THEN {
IF recvdHeader.pktID.pktSeq # 1 THEN RETURN[phoney];
connection.call ← recvdHeader.pktID.callSeq;
AddCalleeInternal[myStateBlock];
RETURN[new]
}
ELSE RETURN[old]
};
ENDCASE => connection ← connection.next;
ENDLOOP;
};
NoteConnection:
ENTRY
PROC [id: RPCPkt.ConnectionID, call: RPCPkt.CallCount, conv:
RPC.Conversation] = {
prev: Connection ← NIL;
hash: HashKey = PrincOpsUtils.BITAND[PrincOpsUtils.BITXOR[LOOPHOLE[id.caller, WORD], id.activity], LAST[HashKey]];
connection ← connections[hash];
DO
SELECT
TRUE
FROM
connection =
NIL => {
connection ← NEW[ConnectionData ← [next: NIL, id: id, call: call-1, conv: conv] ];
IF prev =
NIL
THEN connections[hash] ← connection
ELSE prev.next ← connection;
EXIT
};
id.conv = connection.id.conv
AND id.caller = connection.id.caller
AND id.activity = connection.id.activity => -- already there! -- EXIT;
ENDCASE => { prev ← connection; connection ← connection.next };
ENDLOOP;
};
-- newPkt = TRUE at top of loop iff we already have the first pkt of the next call. At top of loop, myPkt is decrypted if newPkt = TRUE.
DO
ENABLE { ABORTED => EXIT; UNWIND => FreeCalleeState[myStateBlock] };
IF
NOT newPkt
THEN {
RPCPkt.IdleReceive[myPkt, RPCLupine.maxPupWords];
newPkt ← TRUE;
decrypted ← FALSE;
};
SELECT LookupCaller[[recvdHeader.conv, recvdHeader.srceHost, recvdHeader.srcePSB]]
FROM
new => {
-- start of new call
target: RPCPkt.DispatcherDetails = recvdHeader.dispatcher;
resultLength: RPCLupine.DataLength;
RPCPkt.SetupResponse[recvdHeader];
IF target.dispatcherHint >= RPCInternal.exportTable.used
OR target.dispatcherID = RPCPkt.noDispatcher
OR target.dispatcherID # RPCInternal.exportTable[target.dispatcherHint].id
THEN { Reject[myPkt, unbound]; resultLength ← 0 }
ELSE resultLength ← RPCInternal.exportTable[target.dispatcherHint].dispatcher[
myPkt, newLength, recvdHeader.type.eom = end, connection.conv !
RPC.CallFailed =>
TRUSTED {
newPkt ← FALSE;
RemoveCallee[myStateBlock];
LOOP;
};
UnwindRequested => {
-- The dispatcher raised a remote signal which the remote machine is unwinding.
resultLength ← 0;
CONTINUE;
};
RejectUnbound => {
-- The dispatcher wants caller to get CallFailed[unbound]
Reject[myPkt, unbound];
resultLength ← 0;
CONTINUE;
};
RejectProtocol => {
-- The dispatcher wants caller to get CallFailed[runtimeProtocol]
Reject[myPkt, protocol];
resultLength ← 0;
CONTINUE;
};
];
RemoveCallee[myStateBlock];
[newPkt, newLength] ← RPCPkt.PktExchange[myPkt, resultLength,
serverDataLength, endCall
! RPC.CallFailed => TRUSTED {newPkt ← FALSE; CONTINUE}];
IF newPkt THEN decrypted ← TRUE;
-- now newPkt = FALSE, or myPkt is decrypted and contains start of new call
};
unknown => {
-- need to ask other end for connection state
ok: BOOL;
id: RPCPkt.ConnectionID;
call: RPCPkt.CallCount;
conv: RPC.Conversation;
l: RPCLupine.DataLength;
[ok, id, call, conv, l] ←
RPCInternal.GetConnectionState[decrypted, myPkt !
RPC.CallFailed => TRUSTED {newPkt←FALSE; LOOP}];
IF ok
THEN {
IF NOT newPkt THEN ERROR;
IF NOT decrypted THEN {decrypted ← TRUE; newLength ← l};
NoteConnection[id, call, conv];
}
ELSE newPkt ← FALSE;
};
phoney =>
-- ignorable packet
newPkt ← FALSE;
old => {
-- Pkt may or may not have been decrypted. If the packet came to us because it had an incorrect destPSB, we should try correcting it and giving it to the correct process. This ensures that destPSB is only a hint. Also, because of the restrictions on generating acks (described below), there are cases where an ack is required but only the correct worker process is allowed to generate it.
oldDest: PrincOps.PsbIndex = recvdHeader.destPSB;
knownCallee: BOOL = decrypted AND FindCallee[recvdHeader]--may alter destPSB--;
IF knownCallee
AND recvdHeader.destPSB # oldDest
THEN {
-- his destPSB was wrong: requeue pkt for the correct process. Note that if the correct process doesn't want the packet right now, it may come back to an idler process, but it will have the correct destPSB.
IF decrypted
THEN
recvdHeader.length ←
IF myPkt.convHandle =
RPC.unencrypted
THEN RPCPkt.pktLengthOverhead + newLength
ELSE RPCInternal.EncryptPkt[myPkt, newLength];
EnqueueForNewPSB[myPkt];
}
ELSE {
-- We're here because the packet doesn't start a new call. We should respond if the packet is a retransmission or a ping. We generate an ack only if the packet has eom = end. Therefore, the last packet in any direction may only be sent when the worker process has generated the ack for the preceding packet in that direction. Therefore, the last packet in any direction comes to an idler process only after the worker process has received a previous transmission of that packet (because of the way "wanting" is set in PktExchange). We assume that class = data isn't used for pings. If we're still working on the call, we generate an ack containing the worker process's PsbIndex. Beware when caller and callee are on the same host!
IF recvdHeader.type.ack = pleaseAck
AND recvdHeader.type.eom = end
AND( recvdHeader.type.class = data
OR knownCallee )
THEN {
recvdHeader.length ←
IF NOT decrypted OR myPkt.convHandle = RPC.unencrypted
THEN RPCPkt.pktLengthOverhead
ELSE RPCInternal.EncryptPkt[myPkt, 0];
GenerateIdlerResponse[myPkt];
};
};
newPkt ← FALSE;
};
ENDCASE => ERROR;
ENDLOOP;
FreeCalleeState[myStateBlock];
};
}.