DIRECTORY ForkOps, Process, WorkerThreads, WorkerThreadsPrivate; WorkerThreadsImpl: CEDAR MONITOR IMPORTS ForkOps, Process EXPORTS WorkerThreads = BEGIN OPEN WorkerThreads; PoolRec: TYPE = WorkerThreadsPrivate.PoolRec; ActivityRec: TYPE = WorkerThreadsPrivate.ActivityRec; PoolRep: PUBLIC TYPE = PoolRec; ActivityRep: PUBLIC TYPE = ActivityRec; IsPool: PUBLIC PROC [x: REF] RETURNS [BOOL] = { RETURN [ISTYPE[x, REF PoolRec]]; }; NarrowPool: PUBLIC PROC [x: REF] RETURNS [Pool] = { RETURN [NARROW[x, REF PoolRec]]; }; IsActivity: PUBLIC PROC [x: REF] RETURNS [BOOL] = { RETURN [ISTYPE[x, REF ActivityRec]]; }; NarrowActivity: PUBLIC PROC [x: REF] RETURNS [Activity] = { RETURN [NARROW[x, REF ActivityRec]]; }; wedged: ERROR = CODE; --only direct dependencies are detected defaultPool: Pool ¬ CreatePool[8, 2]; CreatePool: PUBLIC PROC [percentage: INT ¬ 4, initialPriority: CARD32 ¬ 2] RETURNS [Pool] = { p: REF PoolRec ¬ NEW[PoolRec]; percentage ¬ MAX[percentage, 2]; percentage ¬ MIN[percentage, 20]; p.initialPriority ¬ initialPriority; p.limitCnt ¬ percentage; --simplistic assumption of about 100 available threads RETURN [p] }; Enqueue: ENTRY PROC [p: REF PoolRec, a: REF ActivityRec, promiseToPerform: BOOL] RETURNS [mustFork: BOOL ¬ FALSE] = { IF p.tail#NIL THEN p.tail.next ¬ a ELSE p.head ¬ a; p.tail ¬ a; IF p.executingCnt> AND promiseToPerform THEN RETURN [FALSE]; p.executingCnt ¬ p.executingCnt+1; mustFork ¬ TRUE }; }; Dequeue: ENTRY PROC [p: REF PoolRec] RETURNS [a: REF ActivityRec ¬ NIL, gotit: BOOL ¬ FALSE] = { IF (a ¬ p.head)#NIL THEN { IF (p.head ¬ a.next) = NIL THEN p.tail ¬ NIL; a.next ¬ NIL; IF gotit ¬ (a.state=queued) THEN a.state ¬ busy; } ELSE p.executingCnt ¬ p.executingCnt-1; }; GetIt: ENTRY PROC [a: REF ActivityRec] RETURNS [gotit: BOOL] = { IF gotit ¬ (a.state=queued) THEN a.state ¬ busy; }; Fork: PUBLIC PROC [pool: Pool, proc: PROC[REF], data: REF ¬ NIL, promiseToPerform: BOOL] RETURNS [Activity] = { p: REF PoolRec ~ NARROW[IF pool=NIL THEN defaultPool ELSE pool]; a: REF ActivityRec ~ NEW[ActivityRec ¬ [proc: proc, data: data]]; IF proc=NIL THEN ERROR; IF Enqueue[p, a, promiseToPerform].mustFork THEN { IF p.executingCnt<=2 THEN ForkOps.Fork[ForkedHandler, p, p.initialPriority] ELSE ForkOps.ForkDelayed[0, ForkedHandler, p, p.initialPriority] --use ForkDelayed instead of yield to put the slack on the forkee but not the forker }; RETURN [a]; }; Handle1: PROC [a: REF ActivityRec] = { Change: ENTRY PROC [a: REF ActivityRec] = INLINE { SELECT a.state FROM busy => a.state ¬ finished; aborting => a.state ¬ aborted; ENDCASE => {}; BROADCAST a.recheck }; a.process ¬ Process.GetCurrent[]; IF a.state=busy THEN { a.proc[a.data ! ABORTED => {a.state ¬ aborted; CONTINUE}]; }; a.data ¬ NIL; Change[a]; Process.CancelAbort[a.process]; }; ForkedHandler: PROC [pool: REF] = { a: REF ActivityRec; gotit: BOOL; p: REF PoolRec ~ NARROW[pool]; DO [a, gotit] ¬ Dequeue[p]; IF a=NIL THEN RETURN; IF gotit THEN Handle1[a]; ENDLOOP }; Perform: PUBLIC PROC [activity: Activity] = { WITH activity SELECT FROM a: REF ActivityRec => { IF a.state=queued AND GetIt[a].gotit THEN Handle1[a]; }; ENDCASE => {}; }; Wait: PUBLIC PROC [activities: LIST OF Activity, perform: BOOL ¬ TRUE] = { self: PROCESS ¬ NIL; Wait1: ENTRY PROC [a: REF ActivityRec] = INLINE { DO SELECT a.state FROM queued, busy => WAIT a.recheck; ENDCASE => RETURN; ENDLOOP; }; IF perform THEN FOR l: LIST OF Activity ¬ activities, l.rest WHILE l#NIL DO Perform[l.first]; ENDLOOP; FOR l: LIST OF Activity ¬ activities, l.rest WHILE l#NIL DO WITH l.first SELECT FROM a: REF ActivityRec => SELECT a.state FROM queued, busy => { IF self=NIL THEN self ¬ Process.GetCurrent[]; IF a.process=self THEN ERROR wedged; Wait1[a]; }; ENDCASE => {}; ENDCASE => {}; ENDLOOP; }; AvailableCnt: PUBLIC PROC [pool: Pool] RETURNS [INT] = { p: REF PoolRec ~ NARROW[pool]; RETURN [MAX[p.limitCnt-p.executingCnt, 0]]; }; State: PUBLIC PROC [activity: Activity] RETURNS [ActivityState] = { a: REF ActivityRec ~ NARROW[activity]; RETURN [a.state]; }; Discard: PUBLIC PROC [activity: Activity] = { a: REF ActivityRec ~ NARROW[activity]; Protected: ENTRY PROC [a: REF ActivityRec] = INLINE { IF a.state=queued THEN {a.state ¬ discarded; BROADCAST a.recheck} }; IF a#NIL THEN Protected[a]; }; Abort: PUBLIC PROC [activity: Activity] = { a: REF ActivityRec ~ NARROW[activity]; Protected: ENTRY PROC [a: REF ActivityRec] = INLINE { ENABLE UNWIND => NULL; --for the case a.process being the running process SELECT a.state FROM queued => {a.state ¬ discarded; BROADCAST a.recheck}; busy => {a.state ¬ aborting; Process.Abort[a.process]; BROADCAST a.recheck}; ENDCASE => {}; }; IF a#NIL THEN Protected[a]; Process.CheckForAbort[]; }; PeekActivity: PUBLIC PROC [pool: Pool] RETURNS [activity: Activity ¬ NIL] = { p: REF PoolRec ~ NARROW[pool]; Peek: ENTRY PROC [p: REF PoolRec] RETURNS [a: REF ActivityRec ¬ NIL] = INLINE { a ¬ p.head; WHILE a#NIL AND (a.state>=finished) DO a ¬ p.head ¬ a.next; ENDLOOP; IF a=NIL THEN p.tail ¬ NIL ELSE WHILE a#NIL AND a.state#queued DO a ¬ a.next ENDLOOP; }; IF p=NIL THEN ERROR; activity ¬ Peek[p]; }; END. Δ WorkerThreadsImpl.mesa Copyright Σ 1993 by Xerox Corporation. All rights reserved. Created by Christian Jacobi, April 2, 1993 Christian Jacobi, April 16, 1993 7:33 pm PDT Enqueue a for future execution. Caller must fork new thread if mustFork=TRUE is returned. Remove front element Caller must "execute" a, if gotit=true. Caller must stop process if a=NIL is returned. Remove arbitrary element. Caller must "execute" a, if gotit=true. We are told that we have to fork, so we will do it. But, nobody told us to hurry... We are introducing some slack, this won't reduce the dynamic number of fork calls made, but, it might reduce the number of concurrent threads active at any particular time. Κι•NewlineDelimiter ™™Icodešœ Οeœ1™