<<>> <> <> <> <> DIRECTORY ForkOps, Process, WorkerThreads, WorkerThreadsPrivate; WorkerThreadsImpl: CEDAR MONITOR IMPORTS ForkOps, Process EXPORTS WorkerThreads = BEGIN OPEN WorkerThreads; PoolRec: TYPE = WorkerThreadsPrivate.PoolRec; ActivityRec: TYPE = WorkerThreadsPrivate.ActivityRec; PoolRep: PUBLIC TYPE = PoolRec; ActivityRep: PUBLIC TYPE = ActivityRec; IsPool: PUBLIC PROC [x: REF] RETURNS [BOOL] = { RETURN [ISTYPE[x, REF PoolRec]]; }; NarrowPool: PUBLIC PROC [x: REF] RETURNS [Pool] = { RETURN [NARROW[x, REF PoolRec]]; }; IsActivity: PUBLIC PROC [x: REF] RETURNS [BOOL] = { RETURN [ISTYPE[x, REF ActivityRec]]; }; NarrowActivity: PUBLIC PROC [x: REF] RETURNS [Activity] = { RETURN [NARROW[x, REF ActivityRec]]; }; wedged: ERROR = CODE; --only direct dependencies are detected defaultPool: Pool ¬ CreatePool[8, 2]; CreatePool: PUBLIC PROC [percentage: INT ¬ 4, initialPriority: CARD32 ¬ 2] RETURNS [Pool] = { p: REF PoolRec ¬ NEW[PoolRec]; percentage ¬ MAX[percentage, 2]; percentage ¬ MIN[percentage, 20]; p.initialPriority ¬ initialPriority; p.limitCnt ¬ percentage; --simplistic assumption of about 100 available threads RETURN [p] }; Enqueue: ENTRY PROC [p: REF PoolRec, a: REF ActivityRec, promiseToPerform: BOOL] RETURNS [mustFork: BOOL ¬ FALSE] = { <> <> IF p.tail#NIL THEN p.tail.next ¬ a ELSE p.head ¬ a; p.tail ¬ a; IF p.executingCnt> AND promiseToPerform THEN RETURN [FALSE]; p.executingCnt ¬ p.executingCnt+1; mustFork ¬ TRUE }; }; Dequeue: ENTRY PROC [p: REF PoolRec] RETURNS [a: REF ActivityRec ¬ NIL, gotit: BOOL ¬ FALSE] = { <> <> <> IF (a ¬ p.head)#NIL THEN { IF (p.head ¬ a.next) = NIL THEN p.tail ¬ NIL; a.next ¬ NIL; IF gotit ¬ (a.state=queued) THEN a.state ¬ busy; } ELSE p.executingCnt ¬ p.executingCnt-1; }; GetIt: ENTRY PROC [a: REF ActivityRec] RETURNS [gotit: BOOL] = { <> <> IF gotit ¬ (a.state=queued) THEN a.state ¬ busy; }; Fork: PUBLIC PROC [pool: Pool, proc: PROC[REF], data: REF ¬ NIL, promiseToPerform: BOOL] RETURNS [Activity] = { p: REF PoolRec ~ NARROW[IF pool=NIL THEN defaultPool ELSE pool]; a: REF ActivityRec ~ NEW[ActivityRec ¬ [proc: proc, data: data]]; IF proc=NIL THEN ERROR; IF Enqueue[p, a, promiseToPerform].mustFork THEN { <> <> IF p.executingCnt<=2 THEN ForkOps.Fork[ForkedHandler, p, p.initialPriority] ELSE ForkOps.ForkDelayed[0, ForkedHandler, p, p.initialPriority] --use ForkDelayed instead of yield to put the slack on the forkee but not the forker }; RETURN [a]; }; <<>> Handle1: PROC [a: REF ActivityRec] = { Change: ENTRY PROC [a: REF ActivityRec] = INLINE { SELECT a.state FROM busy => a.state ¬ finished; aborting => a.state ¬ aborted; ENDCASE => {}; BROADCAST a.recheck }; a.process ¬ Process.GetCurrent[]; IF a.state=busy THEN { a.proc[a.data ! ABORTED => {a.state ¬ aborted; CONTINUE}]; }; a.data ¬ NIL; Change[a]; Process.CancelAbort[a.process]; }; ForkedHandler: PROC [pool: REF] = { a: REF ActivityRec; gotit: BOOL; p: REF PoolRec ~ NARROW[pool]; DO [a, gotit] ¬ Dequeue[p]; IF a=NIL THEN RETURN; IF gotit THEN Handle1[a]; ENDLOOP }; Perform: PUBLIC PROC [activity: Activity] = { WITH activity SELECT FROM a: REF ActivityRec => { IF a.state=queued AND GetIt[a].gotit THEN Handle1[a]; }; ENDCASE => {}; }; Wait: PUBLIC PROC [activities: LIST OF Activity, perform: BOOL ¬ TRUE] = { self: PROCESS ¬ NIL; Wait1: ENTRY PROC [a: REF ActivityRec] = INLINE { DO SELECT a.state FROM queued, busy => WAIT a.recheck; ENDCASE => RETURN; ENDLOOP; }; IF perform THEN FOR l: LIST OF Activity ¬ activities, l.rest WHILE l#NIL DO Perform[l.first]; ENDLOOP; FOR l: LIST OF Activity ¬ activities, l.rest WHILE l#NIL DO WITH l.first SELECT FROM a: REF ActivityRec => SELECT a.state FROM queued, busy => { IF self=NIL THEN self ¬ Process.GetCurrent[]; IF a.process=self THEN ERROR wedged; Wait1[a]; }; ENDCASE => {}; ENDCASE => {}; ENDLOOP; }; AvailableCnt: PUBLIC PROC [pool: Pool] RETURNS [INT] = { p: REF PoolRec ~ NARROW[pool]; RETURN [MAX[p.limitCnt-p.executingCnt, 0]]; }; State: PUBLIC PROC [activity: Activity] RETURNS [ActivityState] = { a: REF ActivityRec ~ NARROW[activity]; RETURN [a.state]; }; Discard: PUBLIC PROC [activity: Activity] = { a: REF ActivityRec ~ NARROW[activity]; Protected: ENTRY PROC [a: REF ActivityRec] = INLINE { IF a.state=queued THEN {a.state ¬ discarded; BROADCAST a.recheck} }; IF a#NIL THEN Protected[a]; }; <<>> Abort: PUBLIC PROC [activity: Activity] = { a: REF ActivityRec ~ NARROW[activity]; Protected: ENTRY PROC [a: REF ActivityRec] = INLINE { ENABLE UNWIND => NULL; --for the case a.process being the running process SELECT a.state FROM queued => {a.state ¬ discarded; BROADCAST a.recheck}; busy => {a.state ¬ aborting; Process.Abort[a.process]; BROADCAST a.recheck}; ENDCASE => {}; }; IF a#NIL THEN Protected[a]; Process.CheckForAbort[]; }; <<>> PeekActivity: PUBLIC PROC [pool: Pool] RETURNS [activity: Activity ¬ NIL] = { p: REF PoolRec ~ NARROW[pool]; Peek: ENTRY PROC [p: REF PoolRec] RETURNS [a: REF ActivityRec ¬ NIL] = INLINE { a ¬ p.head; WHILE a#NIL AND (a.state>=finished) DO a ¬ p.head ¬ a.next; ENDLOOP; IF a=NIL THEN p.tail ¬ NIL ELSE WHILE a#NIL AND a.state#queued DO a ¬ a.next ENDLOOP; }; IF p=NIL THEN ERROR; activity ¬ Peek[p]; }; END.