CoordinatorRemoteCallsImpl:
MONITOR
IMPORTS
AlpineImport,
AlpineTransMgrRpcControl,
ConversationTable,
CoordinatorInternal,
Process,
RPC
EXPORTS
CoordinatorInternal
= BEGIN
-- Local abbreviations for types declared in other interfaces.
Conversation: TYPE = AlpineEnvironment.Conversation;
TransID: TYPE = AlpineEnvironment.TransID;

-- Process accounting.  All of these are read and written only by the monitor
-- procedures PassParms and GetParms.
nProcessesCalling: INT ← 0;
  -- number of processes engaged in remote calls.
maxProcessesCalling: INT = 20;
  -- PassParms forks a new CallerProcess only while nProcessesCalling is below this value.
nProcessesIdle: INT ← 0;
  -- number of processes waiting on parmsArrived condition.
maxProcessesIdle: INT = 10;
  -- GetParms raises Stop rather than waiting once this many processes are already idle.

parmsTaken: CONDITION;
  -- A process calling PassParms waits here if parms is occupied on entry.
parmsArrived: CONDITION;
  -- A process calling GetParms waits here if parms is empty on entry.

-- The single-element buffer through which PassParms hands work to GetParms.
Parms: TYPE = CoordinatorInternal.Parms;
parms: Parms;
parmsOccupied: BOOL ← FALSE;
  -- If parmsOccupied, then parms is meaningful.

Results: TYPE = Coordinator.Results;

-- Statistics on module usage:
timesCalledPassParms: INT ← 0;
  -- incremented once on every entry to PassParms.
timesWaitedOnParmsTaken: INT ← 0;
  -- incremented each time PassParms is about to WAIT on parmsTaken.
-- Returns from the procs PassParms and GetParms below are strictly interleaved, due to
-- the single-element parms buffer.
PassParms: PUBLIC ENTRY PROC [p: Parms] = {
  -- Deposits p in the one-slot parms buffer for some caller process to pick up.
  -- Blocks on parmsTaken for as long as the slot still holds an earlier request.
  ENABLE UNWIND => NULL;
  timesCalledPassParms ← timesCalledPassParms + 1;
  WHILE parmsOccupied DO
    timesWaitedOnParmsTaken ← timesWaitedOnParmsTaken + 1;
    WAIT parmsTaken;
    ENDLOOP;
  -- The slot is free: fill it.
  parms ← p;
  parmsOccupied ← TRUE;
  -- Prefer waking an idle caller process over creating a new one.
  IF nProcessesIdle > 0 THEN { NOTIFY parmsArrived; RETURN };
  -- Nobody is idle.  If the process limit has been reached, leave the request in
  -- the slot; it is taken by the next process to enter GetParms.
  IF nProcessesCalling >= maxProcessesCalling THEN RETURN;
  -- Count the new process as calling before it starts running; its first GetParms
  -- begins by decrementing this count again.
  nProcessesCalling ← nProcessesCalling + 1;
  Process.Detach[FORK CallerProcess[]];
  };
CallerProcess: PROC [] = {
  -- Body of each process forked by PassParms.  Loops forever: takes one Parms record
  -- from GetParms, makes the remote call it describes, and reports the outcome through
  -- CoordinatorInternal.ReturnResults.  Returns (ending the process) when GetParms
  -- raises Stop.
  -- A caller process is outside the monitor except while calling GetParms.
  parms: Parms;  -- private copy filled in by GetParms; hides the module-level parms buffer
  conversation: RPC.Conversation;
  alpineTransMgr: AlpineTransMgrRpcControl.InterfaceRecord;
  whyCallFailed: RPC.CallFailure;  -- set by the RPC.CallFailed catch phrase for the callFailed exit
  result: Results;
  DO
    GetParms[@parms ! Stop => GOTO stop];
    conversation ← ConversationTable.Fetch[parms.w.first.worker];
    alpineTransMgr ← AlpineImport.GetTransMgrInterface[parms.w.first.worker];
    IF conversation = NIL OR alpineTransMgr = NIL THEN {
      -- One of the two lookups came back NIL, so no call is attempted.
      result ← [bindingFailed, none[]];
      }
    ELSE {
      ENABLE {
        RPC.CallFailed => CHECKED { whyCallFailed ← why; GOTO callFailed };
        alpineTransMgr.Refused => { };
          -- NOTE(review): this catch phrase is empty, so Refused is presumably
          -- not handled here and propagates out of this process; confirm intent.
        };
      SELECT parms.proc FROM
        prepare => {
          state: AlpineEnvironment.WorkerState ← alpineTransMgr.WorkerPrepare[
            conversation, parms.c.transID, parms.newTrans];
          result ← [none, prepare[state]];
          };
        finish => {
          alpineTransMgr.WorkerFinish[conversation, parms.c.transID, parms.requiredOutcome];
          result ← [none, finish[]];
          };
        ENDCASE => ERROR;
      EXITS
        callFailed => {
          SELECT whyCallFailed FROM
            timeout, unbound => {
              -- Tell the worker object that this interface failed, then report callFailed.
              parms.w.first.worker.TransMgrInterfaceCallFailed[alpineTransMgr];
              result ← [callFailed, none[]] };
            busy => result ← [busy, none[]];
            runtimeProtocol, stubProtocol => ERROR;
            ENDCASE => ERROR;
          };
      };
    alpineTransMgr ← NIL;  -- drop the interface reference before the next iteration
    CoordinatorInternal.ReturnResults[parms.c, parms.w, result];
    REPEAT
      -- Target of the GOTO in the GetParms catch phrase above.  The visible source
      -- declared no such label; leaving the loop here lets the process return.
      stop => NULL;
    ENDLOOP;
  };
GetParms: ENTRY PROC [parmsPtr: --RESULT-- POINTER TO Parms] = INLINE {
  -- ! Stop -> calling process should die.
  -- Copies the pending request out of the one-slot parms buffer into parmsPtr^,
  -- waiting on parmsArrived while the slot is empty.
  -- Pass a POINTER TO Parms instead of returning Parms, to avoid warnings about
  -- potentially unsafe long REF-containing return record.
  ENABLE UNWIND => nProcessesIdle ← nProcessesIdle - 1;
    -- Presumably reached only when the WAIT below is aborted (the module start code
    -- enables aborts on parmsArrived), at which point this process is counted as idle.
  -- The caller is between remote calls, so it no longer counts as calling.
  nProcessesCalling ← nProcessesCalling - 1;
  WHILE NOT parmsOccupied DO
    -- Written as >= rather than = so that the limit still holds if the counter were
    -- ever to drift past it; identical to = while the counter stays in range.
    IF nProcessesIdle >= maxProcessesIdle THEN
      -- no call to make and already enough idle processes, so terminate process.
      RETURN WITH ERROR Stop;
    nProcessesIdle ← nProcessesIdle + 1;
    WAIT parmsArrived;
    nProcessesIdle ← nProcessesIdle - 1;
    ENDLOOP;
  -- Take the request, free the slot, and let one waiting PassParms refill it.
  parmsPtr^ ← parms;
  parmsOccupied ← FALSE;
  nProcessesCalling ← nProcessesCalling + 1;
  NOTIFY parmsTaken;
  };
Stop: ERROR = CODE;
  -- Raised by GetParms; CallerProcess catches it and leaves its loop.

-- Module start code.  Both condition variables are configured the same way:
-- Process.DisableTimeout and Process.EnableAborts are applied to each.
Process.DisableTimeout[@parmsArrived];
Process.DisableTimeout[@parmsTaken];
Process.EnableAborts[@parmsArrived];
Process.EnableAborts[@parmsTaken];
END.