-- CoordinatorRemoteCallsImpl
-- Monitor that performs remote calls to a worker's transaction manager
-- (WorkerPrepare, WorkerFinish) on behalf of clients of PassParms.
-- PassParms deposits one Parms record in a one-slot buffer; a pool of detached
-- CallerProcess processes removes records with GetParms, makes the remote call,
-- and reports the outcome through CoordinatorInternal.ReturnResults.
-- NOTE(review): the text of the original header and change-log comments is missing
-- from this copy of the file; restore it from the archived source if available.

DIRECTORY
  AlpineEnvironment,
  AlpineImport,
  AlpineTransMgr,
  AlpineTransMgrRpcControl,
  ConversationTable,
  Coordinator,
  CoordinatorInternal,
  Process,
  RPC;

CoordinatorRemoteCallsImpl: MONITOR
  IMPORTS AlpineImport, AlpineTransMgrRpcControl, ConversationTable, CoordinatorInternal,
    Process, RPC
  EXPORTS CoordinatorInternal
  = BEGIN

  Conversation: TYPE = AlpineEnvironment.Conversation;
  TransID: TYPE = AlpineEnvironment.TransID;

  nProcessesCalling: INT _ 0;
    -- Count of CallerProcess processes that are not inside GetParms.
    -- Incremented when PassParms forks a process and when GetParms hands out parms;
    -- decremented on entry to GetParms.
  maxProcessesCalling: INT = 20;
    -- PassParms forks a new CallerProcess only while nProcessesCalling is below this.
  nProcessesIdle: INT _ 0;
    -- Count of processes currently blocked in WAIT parmsArrived inside GetParms.
  maxProcessesIdle: INT = 10;
    -- When this many processes are already idle, GetParms raises Stop instead of waiting.
  parmsTaken: CONDITION;
    -- Notified by GetParms after it empties the buffer; PassParms waits on it
    -- while the buffer is occupied.
  parmsArrived: CONDITION;
    -- Notified by PassParms after it fills the buffer (when some process is idle);
    -- GetParms waits on it while the buffer is empty.
  Parms: TYPE = CoordinatorInternal.Parms;
  parms: Parms;
    -- The one-slot buffer; meaningful only while parmsOccupied is TRUE.
  parmsOccupied: BOOL _ FALSE;
    -- TRUE from the time PassParms stores into parms until GetParms copies it out.
  Results: TYPE = Coordinator.Results;
    -- Type of the value CallerProcess passes to CoordinatorInternal.ReturnResults.
  timesCalledPassParms: INT _ 0;
  timesWaitedOnParmsTaken: INT _ 0;
    -- Counters that are only ever incremented in this module: calls of PassParms, and
    -- iterations of its wait loop. Presumably kept for performance inspection.

  -- Deposits p in the one-slot buffer, blocking while a previous record has not yet
  -- been taken, then arranges for some CallerProcess to pick it up.
  PassParms: PUBLIC ENTRY PROC [p: Parms] = {
    -- UNWIND is caught with an empty action; the usual Mesa idiom so that the monitor
    -- lock is released if a signal unwinds this entry procedure (for example out of the
    -- WAIT below, since aborts are enabled on parmsTaken). Confirm against the manual.
    ENABLE UNWIND => NULL;
    timesCalledPassParms _ timesCalledPassParms + 1;
    -- Wait until the previous record has been removed by GetParms.
    WHILE parmsOccupied DO
      timesWaitedOnParmsTaken _ timesWaitedOnParmsTaken + 1;
      WAIT parmsTaken;
      ENDLOOP;
    parms _ p;
    parmsOccupied _ TRUE;
    -- Prefer waking an idle process; otherwise fork a new one if under the limit.
    -- If neither applies the record stays in the buffer until some running
    -- CallerProcess next enters GetParms.
    IF nProcessesIdle > 0 THEN NOTIFY parmsArrived
    ELSE IF nProcessesCalling < maxProcessesCalling THEN {
      -- Counted here, before the new process runs; its first GetParms call
      -- decrements the count again on entry.
      nProcessesCalling _ nProcessesCalling + 1;
      Process.Detach[FORK CallerProcess[]];
      };
    };

  -- Body of each pooled process: repeatedly takes a Parms record, performs the requested
  -- remote call on the worker named by parms.w.first.worker, and returns the result.
  -- Exits when GetParms raises Stop.
  CallerProcess: PROC [] = {
    -- This local parms hides the monitor-level buffer of the same name; GetParms
    -- copies the buffer into it.
    parms: Parms;
    conversation: RPC.Conversation;
    alpineTransMgr: AlpineTransMgrRpcControl.InterfaceRecord;
    whyCallFailed: RPC.CallFailure;
    result: Results;
    DO
      GetParms[@parms ! Stop => GOTO stop];
      conversation _ ConversationTable.Fetch[parms.w.first.worker];
      alpineTransMgr _ AlpineImport.GetTransMgrInterface[parms.w.first.worker];
      IF conversation = NIL OR alpineTransMgr = NIL THEN {
        -- No conversation or no interface for this worker: report without calling.
        result _ [bindingFailed, none[]];
        }
      ELSE {
        ENABLE {
          -- Record the failure reason and handle it in the EXITS clause below.
          RPC.CallFailed => CHECKED { whyCallFailed _ why; GOTO callFailed };
          -- NOTE(review): empty catch body. In Mesa a catch phrase that finishes
          -- without an explicit exit presumably rejects the signal, so Refused
          -- would continue to propagate; confirm that this is intended.
          alpineTransMgr.Refused => { };
          };
        SELECT parms.proc FROM
          prepare => {
            state: AlpineEnvironment.WorkerState _ alpineTransMgr.WorkerPrepare[
              conversation, parms.c.transID, parms.newTrans];
            result _ [none, prepare[state]];
            };
          finish => {
            alpineTransMgr.WorkerFinish[conversation, parms.c.transID, parms.requiredOutcome];
            result _ [none, finish[]];
            };
          ENDCASE => ERROR;
        EXITS
          callFailed => {
            SELECT whyCallFailed FROM
              timeout, unbound => {
                -- Tell the worker object that calls through this interface failed,
                -- then report callFailed.
                parms.w.first.worker.TransMgrInterfaceCallFailed[alpineTransMgr];
                result _ [callFailed, none[]]
                };
              busy => result _ [busy, none[]];
              -- Protocol failures and any other reason are treated as fatal.
              runtimeProtocol, stubProtocol => ERROR;
              ENDCASE => ERROR;
            };
        };
      -- Drop the interface reference before the next GetParms call, which may block
      -- (presumably so an idle process does not keep the interface record alive).
      alpineTransMgr _ NIL;
      CoordinatorInternal.ReturnResults[parms.c, parms.w, result];
      ENDLOOP;
    EXITS
      stop => RETURN
    };

  -- Copies the buffered Parms record into parmsPtr^, waiting until one is available.
  GetParms: ENTRY PROC [parmsPtr: --RESULT--POINTER TO Parms] = INLINE {
    -- ! Stop => calling process should die.
    -- Stop is raised when the buffer is empty and maxProcessesIdle processes are
    -- already waiting.
    -- On unwind, undo the idle count. The only statement visible here that can be
    -- unwound while the count is raised is WAIT parmsArrived (aborts are enabled on
    -- that condition below); presumably that is the case this handler is for.
    ENABLE UNWIND => nProcessesIdle _ nProcessesIdle - 1;
    -- The caller stops counting as a calling process while it is in here.
    nProcessesCalling _ nProcessesCalling - 1;
    WHILE NOT parmsOccupied DO
      IF nProcessesIdle = maxProcessesIdle THEN
        -- Enough idle processes already; this one terminates. nProcessesCalling
        -- stays decremented because the process is going away.
        RETURN WITH ERROR Stop;
      nProcessesIdle _ nProcessesIdle + 1;
      WAIT parmsArrived;
      nProcessesIdle _ nProcessesIdle - 1;
      ENDLOOP;
    parmsPtr^ _ parms;
    parmsOccupied _ FALSE;
    nProcessesCalling _ nProcessesCalling + 1;
    -- Let a PassParms caller blocked on a full buffer proceed.
    NOTIFY parmsTaken;
    };

  -- Raised by GetParms and caught in CallerProcess; never escapes this module in the
  -- code shown here.
  Stop: ERROR = CODE;

  -- Module initialization: calls on the Process interface that, going by their names,
  -- turn off timeouts and turn on aborts for both condition variables, so WAITs end
  -- only by NOTIFY or abort. Confirm against the Process interface.
  Process.DisableTimeout[@parmsTaken];
  Process.EnableAborts[@parmsTaken];
  Process.DisableTimeout[@parmsArrived];
  Process.EnableAborts[@parmsArrived];

  END.

-- NOTE(review): the change-log comments that followed END. have lost their text in
-- this copy of the file.