DIRECTORY
ForkOps,
Process,
WorkerThreads,
WorkerThreadsPrivate;
WorkerThreadsImpl: CEDAR MONITOR
IMPORTS ForkOps, Process
EXPORTS WorkerThreads =
BEGIN OPEN WorkerThreads;
-- Local shorthands for the record types declared in WorkerThreadsPrivate.
PoolRec: TYPE = WorkerThreadsPrivate.PoolRec;
ActivityRec: TYPE = WorkerThreadsPrivate.ActivityRec;
-- Concrete representations bound to the PUBLIC types PoolRep and ActivityRep.
-- NOTE(review): presumably these are the opaque types behind Pool and Activity in the WorkerThreads interface; confirm against the interface.
PoolRep: PUBLIC TYPE = PoolRec;
ActivityRep: PUBLIC TYPE = ActivityRec;
IsPool: PUBLIC PROC [x: REF] RETURNS [BOOL] = {
  -- Type test: reports whether x is a REF PoolRec according to ISTYPE.
  isPoolRef: BOOL ~ ISTYPE[x, REF PoolRec];
  RETURN [isPoolRef]
  };
NarrowPool: PUBLIC PROC [x: REF] RETURNS [Pool] = {
  -- Checked conversion of an untyped REF to a pool; the NARROW performs the type check.
  asPool: REF PoolRec ~ NARROW[x];
  RETURN [asPool]
  };
IsActivity: PUBLIC PROC [x: REF] RETURNS [BOOL] = {
  -- Type test: reports whether x is a REF ActivityRec according to ISTYPE.
  isActivityRef: BOOL ~ ISTYPE[x, REF ActivityRec];
  RETURN [isActivityRef]
  };
NarrowActivity: PUBLIC PROC [x: REF] RETURNS [Activity] = {
  -- Checked conversion of an untyped REF to an activity; the NARROW performs the type check.
  asActivity: REF ActivityRec ~ NARROW[x];
  RETURN [asActivity]
  };
-- Raised by Wait when the calling process is the one recorded as executing the activity it is about to wait for.
wedged: ERROR = CODE; --only direct dependencies are detected
-- Pool that Fork uses when the caller passes pool=NIL.
defaultPool: Pool ¬ CreatePool[8, 2];
CreatePool: PUBLIC PROC [percentage: INT ¬ 4, initialPriority: CARD32 ¬ 2] RETURNS [Pool] = {
  -- Allocates a new pool record.
  -- percentage is clamped to the range 2..20 and stored as the pool's limit count.
  -- initialPriority is stored unchanged; Fork later passes it to ForkOps.
  pool: REF PoolRec ¬ NEW[PoolRec];
  pool.limitCnt ¬ MIN[MAX[percentage, 2], 20]; --simplistic assumption of about 100 available threads
  pool.initialPriority ¬ initialPriority;
  RETURN [pool]
  };
Enqueue: ENTRY PROC [p: REF PoolRec, a: REF ActivityRec, promiseToPerform: BOOL] RETURNS [mustFork: BOOL ¬ FALSE] = {
  -- Appends activity a to the tail of pool p's queue, under the module monitor.
  -- Returns mustFork=TRUE when the caller has to fork a new handler thread; in that case the pool's executing count has already been incremented on behalf of that thread.
  -- NOTE(review): the condition below was reconstructed from a corrupted source text.
  -- The outer capacity test against p.limitCnt matches how AvailableCnt relates limitCnt and executingCnt.
  -- The inner threshold of 0 (some handler is already running and the caller promised to perform the activity itself) is an assumption; confirm against the original WorkerThreadsImpl.mesa.
  IF p.tail#NIL THEN p.tail.next ¬ a ELSE p.head ¬ a;
  p.tail ¬ a;
  IF p.executingCnt<p.limitCnt THEN {
    -- No extra thread is needed when one is already draining the queue and the caller will run a itself.
    IF p.executingCnt>0 AND promiseToPerform THEN RETURN [FALSE];
    p.executingCnt ¬ p.executingCnt+1;
    mustFork ¬ TRUE
    };
  };
Dequeue: ENTRY PROC [p: REF PoolRec] RETURNS [a: REF ActivityRec ¬ NIL, gotit: BOOL ¬ FALSE] = {
  -- Unlinks the front activity of pool p, under the module monitor.
  -- a=NIL means the queue was empty; the executing count is decremented and the calling handler is expected to stop.
  -- gotit=TRUE means a was still queued and is now marked busy, so the caller must execute it.
  a ¬ p.head;
  IF a=NIL THEN {
    p.executingCnt ¬ p.executingCnt-1;
    RETURN
    };
  p.head ¬ a.next;
  IF p.head=NIL THEN p.tail ¬ NIL;
  a.next ¬ NIL;
  gotit ¬ (a.state=queued);
  IF gotit THEN a.state ¬ busy;
  };
GetIt: ENTRY PROC [a: REF ActivityRec] RETURNS [gotit: BOOL] = {
  -- Claims activity a for execution without touching the pool queue.
  -- gotit=TRUE means a was still queued and is now marked busy, so the caller must execute it.
  gotit ¬ (a.state=queued);
  IF gotit THEN a.state ¬ busy;
  };
Fork: PUBLIC PROC [pool: Pool, proc: PROC[REF], data: REF ¬ NIL, promiseToPerform: BOOL] RETURNS [Activity] = {
  -- Creates an activity for proc and data, queues it on pool (defaultPool when pool=NIL) and returns it.
  -- A handler thread is forked only when Enqueue asks for one.
  -- Raises ERROR when proc=NIL.
  target: REF PoolRec ~ NARROW[IF pool=NIL THEN defaultPool ELSE pool];
  act: REF ActivityRec ~ NEW[ActivityRec ¬ [proc: proc, data: data]];
  IF proc=NIL THEN ERROR;
  IF NOT Enqueue[target, act, promiseToPerform].mustFork THEN RETURN [act];
  IF target.executingCnt>2
    THEN ForkOps.ForkDelayed[0, ForkedHandler, target, target.initialPriority] --use ForkDelayed instead of yield to put the slack on the forkee but not the forker
    ELSE ForkOps.Fork[ForkedHandler, target, target.initialPriority];
  RETURN [act]
  };
Handle1: PROC [a: REF ActivityRec] = {
-- Executes activity a on the current process and records the outcome.
-- Callers in this module invoke it only after Dequeue or GetIt reported gotit=TRUE, which marks a as busy.
Change: ENTRY PROC [a: REF ActivityRec] = INLINE {
-- Under the module monitor: moves busy to finished and aborting to aborted, leaves other states alone, then wakes everybody waiting on a.recheck.
SELECT a.state FROM
busy => a.state ¬ finished;
aborting => a.state ¬ aborted;
ENDCASE => {};
BROADCAST a.recheck
};
-- Record the executing process; Abort and Wait read this field.
-- NOTE(review): the state became busy earlier (in Dequeue or GetIt), so Abort can observe busy before this assignment has happened; confirm that Process.Abort tolerates the not yet assigned value.
a.process ¬ Process.GetCurrent[];
IF a.state=busy THEN {
-- NOTE(review): the ABORTED handler writes a.state without holding the monitor.
a.proc[a.data ! ABORTED => {a.state ¬ aborted; CONTINUE}];
};
-- Drop the reference to the client data once the activity is done.
a.data ¬ NIL;
Change[a];
-- Clear any abort request still pending on this process before it goes on to other work.
Process.CancelAbort[a.process];
};
ForkedHandler: PROC [pool: REF] = {
  -- Body of a forked handler thread: keeps taking activities from the pool and running the ones it managed to claim.
  -- It ends as soon as Dequeue returns NIL, that is when the queue is empty.
  workerPool: REF PoolRec ~ NARROW[pool];
  next: REF ActivityRec;
  claimed: BOOL;
  DO
    [next, claimed] ¬ Dequeue[workerPool];
    IF next=NIL THEN EXIT;
    IF claimed THEN Handle1[next];
    ENDLOOP
  };
Perform: PUBLIC PROC [activity: Activity] = {
  -- Runs activity on the calling process if it is still queued and can be claimed; otherwise does nothing.
  WITH activity SELECT FROM
    act: REF ActivityRec =>
      -- Cheap unmonitored look first; GetIt repeats the test under the monitor.
      IF act.state=queued THEN {
        IF GetIt[act].gotit THEN Handle1[act];
        };
    ENDCASE => NULL;
  };
Wait: PUBLIC PROC [activities: LIST OF Activity, perform: BOOL ¬ TRUE] = {
-- Blocks until none of the listed activities is queued or busy any more.
-- With perform=TRUE the caller first tries to execute each activity itself via Perform.
-- Raises wedged when an activity that still has to be waited for is recorded as running on the calling process.
self: PROCESS ¬ NIL;
Wait1: ENTRY PROC [a: REF ActivityRec] = INLINE {
-- Under the module monitor: waits on a.recheck until a is neither queued nor busy.
DO
SELECT a.state FROM
queued, busy => WAIT a.recheck;
ENDCASE => RETURN;
ENDLOOP;
};
IF perform THEN
FOR l: LIST OF Activity ¬ activities, l.rest WHILE l#NIL DO
Perform[l.first];
ENDLOOP;
FOR l: LIST OF Activity ¬ activities, l.rest WHILE l#NIL DO
WITH l.first SELECT FROM
a: REF ActivityRec =>
-- Unmonitored pre-check; Wait1 re-examines the state under the monitor.
SELECT a.state FROM
queued, busy => {
-- The current process is looked up lazily, only when a wait is actually needed.
IF self=NIL THEN self ¬ Process.GetCurrent[];
IF a.process=self THEN ERROR wedged;
Wait1[a];
};
ENDCASE => {};
ENDCASE => {};
ENDLOOP;
};
AvailableCnt: PUBLIC PROC [pool: Pool] RETURNS [INT] = {
  -- Number of additional handlers the pool could still start: limit count minus executing count, never below 0.
  -- The counters are read without entering the monitor.
  poolRec: REF PoolRec ~ NARROW[pool];
  RETURN [MAX[poolRec.limitCnt-poolRec.executingCnt, 0]]
  };
State: PUBLIC PROC [activity: Activity] RETURNS [ActivityState] = {
  -- Returns the state currently stored in the activity; the field is read without entering the monitor.
  RETURN [NARROW[activity, REF ActivityRec].state]
  };
Discard: PUBLIC PROC [activity: Activity] = {
  -- Marks a still queued activity as discarded and wakes its waiters.
  -- Does nothing for NIL or for an activity in any other state.
  act: REF ActivityRec ~ NARROW[activity];
  MarkDiscarded: ENTRY PROC [a: REF ActivityRec] = INLINE {
    IF a.state=queued THEN {
      a.state ¬ discarded;
      BROADCAST a.recheck
      }
    };
  IF act=NIL THEN RETURN;
  MarkDiscarded[act]
  };
Abort: PUBLIC PROC [activity: Activity] = {
-- Cancels an activity: a queued one is marked discarded, a busy one is marked aborting and its executing process is sent an abort request.
-- Does nothing for NIL or for an activity in any other state.
a: REF ActivityRec ~ NARROW[activity];
Protected: ENTRY PROC [a: REF ActivityRec] = INLINE {
-- Runs under the module monitor; waiters on a.recheck are woken after a state change.
ENABLE UNWIND => NULL; --for the case a.process being the running process
SELECT a.state FROM
queued => {a.state ¬ discarded; BROADCAST a.recheck};
busy => {a.state ¬ aborting; Process.Abort[a.process]; BROADCAST a.recheck};
ENDCASE => {};
};
IF a#NIL THEN Protected[a];
-- NOTE(review): presumably this lets an abort that the caller just requested on itself take effect here, outside the monitor; confirm.
Process.CheckForAbort[];
};
PeekActivity: PUBLIC PROC [pool: Pool] RETURNS [activity: Activity ¬ NIL] = {
-- Returns the first activity in the pool's queue that is still queued, or NIL when there is none.
-- The returned activity stays in the queue.
-- Raises ERROR when pool is NIL.
p: REF PoolRec ~ NARROW[pool];
Peek: ENTRY PROC [p: REF PoolRec] RETURNS [a: REF ActivityRec ¬ NIL] = INLINE {
-- Runs under the module monitor.
a ¬ p.head;
-- Unlink leading entries whose state compares at or above finished.
-- NOTE(review): this relies on the declaration order of ActivityState; presumably every state from finished onward is terminal; confirm against the interface.
WHILE a#NIL AND (a.state>=finished) DO
a ¬ p.head ¬ a.next;
ENDLOOP;
IF a=NIL
-- The queue was emptied by the trimming above, so the tail must be cleared as well.
THEN p.tail ¬ NIL
-- Otherwise scan forward, without unlinking anything, to the first entry that is still queued.
ELSE WHILE a#NIL AND a.state#queued DO a ¬ a.next ENDLOOP;
};
IF p=NIL THEN ERROR;
activity ¬ Peek[p];
};
END.
Δ
WorkerThreadsImpl.mesa
Copyright © 1993 by Xerox Corporation. All rights reserved.
Created by Christian Jacobi, April 2, 1993
Christian Jacobi, April 16, 1993 7:33 pm PDT
Enqueue a for future execution.
Caller must fork new thread if mustFork=TRUE is returned.
Remove front element
Caller must "execute" a, if gotit=true.
Caller must stop process if a=NIL is returned.
Remove arbitrary element.
Caller must "execute" a, if gotit=true.
We are told that we have to fork, so we will do it. But nobody told us to hurry...
We are introducing some slack. This won't reduce the dynamic number of fork calls made, but it might reduce the number of concurrent threads active at any particular time.
Κι NewlineDelimiter
Icode
Οe1