-- CoordinatorRemoteCallsImpl.mesa
-- Makes AlpineTransMgr calls using a pool of processes.
-- Enforces a bound on the number of processes devoted to AlpineTransMgr calls.
-- Last edited by
--   MBrown on November 21, 1982 3:40 pm

-- NOTES:
-- We land in debugger if AlpineCoordinatorAndWorker.Prepare or .Finish raises
-- Refused[wrongCoordinator].  Also if RPC.CallFailed[protocolError] is raised.
-- (In this module the remote calls are written as alpineTransMgr.WorkerPrepare and
-- alpineTransMgr.WorkerFinish; the Refused catch phrase there has an empty body, and the
-- runtimeProtocol and stubProtocol call failures are turned into ERROR.)

DIRECTORY
  AlpineEnvironment,
  AlpineImport,
  AlpineTransMgr,
  AlpineTransMgrRpcControl,
  ConversationTable,
  Coordinator,
  CoordinatorInternal,
  Process,
  RPC;

CoordinatorRemoteCallsImpl: MONITOR
  IMPORTS AlpineImport, ConversationTable, CoordinatorInternal, Process, RPC
  EXPORTS CoordinatorInternal =
BEGIN

Conversation: TYPE = AlpineEnvironment.Conversation;
TransID: TYPE = AlpineEnvironment.TransID;

-- Monitor state.  Everything below is read and written only inside the ENTRY procedures
-- PassParms and GetParms.

nProcessesCalling: INT ← 0;
  -- number of processes engaged in remote calls.
  -- Incremented by PassParms when it forks a CallerProcess and by GetParms just before it
  -- returns parms; decremented by GetParms on entry.
maxProcessesCalling: INT = 20;
  -- PassParms forks a new CallerProcess only while nProcessesCalling is below this bound.
nProcessesIdle: INT ← 0;
  -- number of processes waiting on parmsArrived condition.
maxProcessesIdle: INT = 10;
  -- GetParms raises Stop (so the calling process terminates) when the buffer is empty and
  -- this many processes are already idle.

parmsTaken: CONDITION;
  -- A process calling PassParms waits here if parms is occupied on entry.
parmsArrived: CONDITION;
  -- A process calling GetParms waits here if parms is empty on entry.

Parms: TYPE = CoordinatorInternal.Parms;
parms: Parms;
  -- The single-element buffer handed from PassParms to GetParms.
parmsOccupied: BOOL ← FALSE;
  -- If parmsOccupied, then parms is meaningful.

Results: TYPE = Coordinator.Results;

-- Statistics on module usage:
timesCalledPassParms: INT ← 0;
  -- Bumped once per PassParms call, before any waiting.
timesWaitedOnParmsTaken: INT ← 0;
  -- Bumped once per WAIT on parmsTaken inside PassParms.

-- Returns from the procs PassParms and GetParms below are strictly interleaved, due to
-- the single-element parms buffer.
PassParms: PUBLIC ENTRY PROC [p: Parms] = {
  -- Hands one request to the caller pool through the single-element parms buffer.
  -- Blocks on parmsTaken while the buffer is still occupied.  Once p is deposited, either
  -- wakes an idle caller process or, if none is idle and the pool is below
  -- maxProcessesCalling, forks a fresh CallerProcess.  If neither applies, p stays in the
  -- buffer until some caller process next enters GetParms.
  ENABLE UNWIND => NULL;
  timesCalledPassParms ← timesCalledPassParms + 1;
  WHILE parmsOccupied DO
    timesWaitedOnParmsTaken ← timesWaitedOnParmsTaken + 1;
    WAIT parmsTaken;
    ENDLOOP;
  parms ← p;
  parmsOccupied ← TRUE;
  IF nProcessesIdle <= 0 THEN {
    -- Nobody is waiting on parmsArrived; grow the pool if the bound allows.
    IF nProcessesCalling < maxProcessesCalling THEN {
      nProcessesCalling ← nProcessesCalling + 1;
      Process.Detach[FORK CallerProcess[]];
      };
    }
  ELSE NOTIFY parmsArrived;
  };

CallerProcess: PROC [] = {
  -- Body of each pool process.  Runs outside the monitor except while inside GetParms.
  -- Each iteration: fetch a request, look up the conversation and the TransMgr interface
  -- for the first worker of the request, perform the requested remote call, and hand the
  -- outcome to CoordinatorInternal.ReturnResults.  Terminates when GetParms raises Stop.
  request: Parms;
  conv: RPC.Conversation;
  transMgr: AlpineTransMgrRpcControl.InterfaceRecord;
  failure: RPC.CallFailure;
  outcome: Results;
  DO
    GetParms[@request ! Stop => GOTO stop];
    conv ← ConversationTable.Fetch[request.w.first.worker];
    transMgr ← AlpineImport.GetTransMgrInterface[request.w.first.worker];
    IF conv # NIL AND transMgr # NIL THEN {
      ENABLE {
        -- Remember why the call failed and sort it out in the EXITS clause below.
        RPC.CallFailed => CHECKED { failure ← why; GOTO callFailed };
        -- Deliberately empty handler body, as in the original code.
        transMgr.Refused => { };
        };
      SELECT request.proc FROM
        prepare => {
          workerState: AlpineEnvironment.WorkerState ← transMgr.WorkerPrepare[
            conv, request.c.transID, request.newTrans];
          outcome ← [none, prepare[workerState]];
          };
        finish => {
          transMgr.WorkerFinish[conv, request.c.transID, request.requiredOutcome];
          outcome ← [none, finish[]];
          };
        ENDCASE => ERROR;
      EXITS
        callFailed => {
          SELECT failure FROM
            timeout, unbound => {
              -- Report the failed interface to the worker before returning callFailed.
              request.w.first.worker.TransMgrInterfaceCallFailed[transMgr];
              outcome ← [callFailed, none[]] };
            busy => outcome ← [busy, none[]];
            -- Protocol failures are treated as fatal.
            runtimeProtocol, stubProtocol => ERROR;
            ENDCASE => ERROR;
          };
      }
    ELSE {
      -- No conversation or no interface for this worker: report a binding failure.
      outcome ← [bindingFailed, none[]];
      };
    transMgr ← NIL;
    CoordinatorInternal.ReturnResults[request.c, request.w, outcome];
    ENDLOOP;
  EXITS
    stop => RETURN
  };

GetParms: ENTRY PROC [parmsPtr: --RESULT--POINTER TO Parms] = INLINE {
  -- ! Stop -> calling process should die.
  -- Takes the next request out of the parms buffer and stores it through parmsPtr,
  -- waiting on parmsArrived while the buffer is empty.
  -- The result goes through a POINTER TO Parms rather than a return value, to avoid
  -- warnings about a potentially unsafe long REF-containing return record.
  -- The caller is counted as not calling for the duration of this procedure; on UNWIND
  -- the idle count is decremented.
  ENABLE UNWIND => nProcessesIdle ← nProcessesIdle - 1;
  nProcessesCalling ← nProcessesCalling - 1;
  UNTIL parmsOccupied DO
    -- Nothing to do.  If enough processes are idle already, this one terminates.
    IF nProcessesIdle = maxProcessesIdle THEN RETURN WITH ERROR Stop;
    nProcessesIdle ← nProcessesIdle + 1;
    WAIT parmsArrived;
    nProcessesIdle ← nProcessesIdle - 1;
    ENDLOOP;
  nProcessesCalling ← nProcessesCalling + 1;
  parmsPtr↑ ← parms;
  parmsOccupied ← FALSE;
  -- The buffer is free again: release one PassParms caller blocked on it.
  NOTIFY parmsTaken;
  };

-- Raised by GetParms to tell a CallerProcess to terminate.
Stop: ERROR = CODE;

-- Module initialization: both condition variables wait without timeout and allow aborts.
Process.DisableTimeout[@parmsTaken];
Process.DisableTimeout[@parmsArrived];
Process.EnableAborts[@parmsTaken];
Process.EnableAborts[@parmsArrived];

END.