WorkerThreadsImpl.mesa
Copyright © 1993 by Xerox Corporation. All rights reserved.
Created by Christian Jacobi, April 2, 1993
Christian Jacobi, April 16, 1993 7:33 pm PDT
DIRECTORY
ForkOps,
Process,
WorkerThreads,
WorkerThreadsPrivate;
WorkerThreadsImpl: CEDAR MONITOR
IMPORTS ForkOps, Process
EXPORTS WorkerThreads =
BEGIN OPEN WorkerThreads;
PoolRec: TYPE = WorkerThreadsPrivate.PoolRec;
ActivityRec: TYPE = WorkerThreadsPrivate.ActivityRec;
PoolRep: PUBLIC TYPE = PoolRec;
ActivityRep: PUBLIC TYPE = ActivityRec;
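--PoolRep and ActivityRep bind the opaque types of the WorkerThreads interface to the private record types; the Is/Narrow procedures below support run-time type tests and narrowing from an untyped REF.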
IsPool: PUBLIC PROC [x: REF] RETURNS [BOOL] = {
RETURN [ISTYPE[x, REF PoolRec]];
};
NarrowPool: PUBLIC PROC [x: REF] RETURNS [Pool] = {
RETURN [NARROW[x, REF PoolRec]];
};
IsActivity: PUBLIC PROC [x: REF] RETURNS [BOOL] = {
RETURN [ISTYPE[x, REF ActivityRec]];
};
NarrowActivity: PUBLIC PROC [x: REF] RETURNS [Activity] = {
RETURN [NARROW[x, REF ActivityRec]];
};
wedged: ERROR = CODE; --only direct dependencies are detected
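--Shared default pool, used by Fork when pool=NIL is passed.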
defaultPool: Pool ¬ CreatePool[8, 2];
CreatePool: PUBLIC PROC [percentage: INT ¬ 4, initialPriority: CARD32 ¬ 2] RETURNS [Pool] = {
p: REF PoolRec ¬ NEW[PoolRec];
percentage ¬ MAX[percentage, 2];
percentage ¬ MIN[percentage, 20];
p.initialPriority ¬ initialPriority;
p.limitCnt ¬ percentage; --simplistic assumption of about 100 available threads
RETURN [p]
};
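--The pool queue is a singly linked list of activities delimited by p.head and p.tail; p.executingCnt counts the worker threads currently serving the pool, bounded by p.limitCnt. The ENTRY procedures below serialize access to this queue and to activity states under the module monitor.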
Enqueue: ENTRY PROC [p: REF PoolRec, a: REF ActivityRec, promiseToPerform: BOOL] RETURNS [mustFork: BOOL ¬ FALSE] = {
--Enqueue a for future execution.
--Caller must fork a new thread if mustFork=TRUE is returned.
IF p.tail#NIL THEN p.tail.next ¬ a ELSE p.head ¬ a;
p.tail ¬ a;
IF p.executingCnt<p.limitCnt
THEN {
IF p.head=p.tail <<single element>> AND promiseToPerform THEN RETURN [FALSE];
p.executingCnt ¬ p.executingCnt+1;
mustFork ¬ TRUE
};
};
Dequeue: ENTRY PROC [p: REF PoolRec] RETURNS [a: REF ActivityRec ¬ NIL, gotit: BOOL ¬ FALSE] = {
--Remove the front element.
--Caller must "execute" a, if gotit=TRUE.
--Caller must stop its process if a=NIL is returned.
IF (a ¬ p.head)#NIL
THEN {
IF (p.head ¬ a.next) = NIL THEN p.tail ¬ NIL;
a.next ¬ NIL;
IF gotit ¬ (a.state=queued) THEN a.state ¬ busy;
}
ELSE p.executingCnt ¬ p.executingCnt-1;
};
GetIt: ENTRY PROC [a: REF ActivityRec] RETURNS [gotit: BOOL] = {
--Remove an arbitrary element.
--Caller must "execute" a, if gotit=TRUE.
IF gotit ¬ (a.state=queued) THEN a.state ¬ busy;
};
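--Queues proc[data] as a new activity on pool (defaultPool when pool=NIL) and forks a worker thread when the pool is below its concurrency limit. With promiseToPerform=TRUE a single queued item does not force a fork, the caller presumably intending to Perform or Wait on it itself.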
Fork: PUBLIC PROC [pool: Pool, proc: PROC [REF], data: REF ¬ NIL, promiseToPerform: BOOL] RETURNS [Activity] = {
p: REF PoolRec ~ NARROW[IF pool=NIL THEN defaultPool ELSE pool];
a: REF ActivityRec ~ NEW[ActivityRec ¬ [proc: proc, data: data]];
IF proc=NIL THEN ERROR;
IF Enqueue[p, a, promiseToPerform].mustFork
THEN {
--We are told that we have to fork, so we will do it. But nobody told us to hurry...
--We introduce some slack; this won't reduce the dynamic number of fork calls made, but it might reduce the number of concurrent threads active at any particular time.
IF p.executingCnt<=2
THEN ForkOps.Fork[ForkedHandler, p, p.initialPriority]
ELSE ForkOps.ForkDelayed[0, ForkedHandler, p, p.initialPriority] --use ForkDelayed instead of yield to put the slack on the forkee but not the forker
};
RETURN [a];
};
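--Executes one activity on the current thread: records the executing process, runs the client proc with ABORTED caught, then moves the state to finished or aborted and broadcasts recheck so that waiters in Wait can proceed.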
Handle1: PROC [a: REF ActivityRec] = {
Change: ENTRY PROC [a: REF ActivityRec] = INLINE {
SELECT a.state FROM
busy => a.state ¬ finished;
aborting => a.state ¬ aborted;
ENDCASE => {};
BROADCAST a.recheck
};
a.process ¬ Process.GetCurrent[];
IF a.state=busy
THEN {
a.proc[a.data ! ABORTED => {a.state ¬ aborted; CONTINUE}];
};
a.data ¬ NIL;
Change[a];
Process.CancelAbort[a.process];
};
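--Body of a forked worker thread: repeatedly dequeues and executes activities until the pool queue is empty; Dequeue decrements executingCnt when it returns NIL and the thread simply returns.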
ForkedHandler: PROC [pool: REF] = {
a: REF ActivityRec; gotit: BOOL;
p: REF PoolRec ~ NARROW[pool];
DO
[a, gotit] ¬ Dequeue[p];
IF a=NIL THEN RETURN;
IF gotit THEN Handle1[a];
ENDLOOP
};
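--Executes the activity on the caller's thread if it is still queued; a no-op once a worker thread has claimed it or it has been discarded.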
Perform: PUBLIC PROC [activity: Activity] = {
WITH activity SELECT FROM
a: REF ActivityRec => {
IF a.state=queued AND GetIt[a].gotit THEN Handle1[a];
};
ENDCASE => {};
};
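--Blocks until every listed activity has left the queued and busy states; with perform=TRUE, still-queued activities are first executed on the caller's thread. Raises wedged if the caller would wait for an activity running on its own thread (only direct dependencies are detected).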
Wait: PUBLIC PROC [activities: LIST OF Activity, perform: BOOL ¬ TRUE] = {
self: PROCESS ¬ NIL;
Wait1: ENTRY PROC [a: REF ActivityRec] = INLINE {
DO
SELECT a.state FROM
queued, busy => WAIT a.recheck;
ENDCASE => RETURN;
ENDLOOP;
};
IF perform
THEN
FOR l: LIST OF Activity ¬ activities, l.rest WHILE l#NIL DO
Perform[l.first];
ENDLOOP;
FOR l: LIST OF Activity ¬ activities, l.rest WHILE l#NIL DO
WITH l.first SELECT FROM
a: REF ActivityRec =>
SELECT a.state FROM
queued, busy => {
IF self=NIL THEN self ¬ Process.GetCurrent[];
IF a.process=self THEN ERROR wedged;
Wait1[a];
};
ENDCASE => {};
ENDCASE => {};
ENDLOOP;
};
AvailableCnt: PUBLIC PROC [pool: Pool] RETURNS [INT] = {
p: REF PoolRec ~ NARROW[pool];
RETURN [MAX[p.limitCnt-p.executingCnt, 0]];
};
State: PUBLIC PROC [activity: Activity] RETURNS [ActivityState] = {
a: REF ActivityRec ~ NARROW[activity];
RETURN [a.state];
};
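--Prevents execution of an activity that is still queued; has no effect once it is busy or done.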
Discard: PUBLIC PROC [activity: Activity] = {
a: REF ActivityRec ~ NARROW[activity];
Protected: ENTRY PROC [a: REF ActivityRec] = INLINE {
IF a.state=queued THEN {a.state ¬ discarded; BROADCAST a.recheck}
};
IF a#NIL THEN Protected[a];
};
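--Discards a queued activity, or requests abortion of a busy one by aborting its worker process; the final CheckForAbort makes an abort of the caller's own activity take effect promptly.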
Abort: PUBLIC PROC [activity: Activity] = {
a: REF ActivityRec ~ NARROW[activity];
Protected: ENTRY PROC [a: REF ActivityRec] = INLINE {
ENABLE UNWIND => NULL; --for the case where a.process is the running process
SELECT a.state FROM
queued => {a.state ¬ discarded; BROADCAST a.recheck};
busy => {a.state ¬ aborting; Process.Abort[a.process]; BROADCAST a.recheck};
ENDCASE => {};
};
IF a#NIL THEN Protected[a];
Process.CheckForAbort[];
};
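--Returns the first still-queued activity of the pool without claiming it; as a side effect, entries whose state is finished or beyond are unlinked from the head of the queue.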
PeekActivity: PUBLIC PROC [pool: Pool] RETURNS [activity: Activity ¬ NIL] = {
p: REF PoolRec ~ NARROW[pool];
Peek: ENTRY PROC [p: REF PoolRec] RETURNS [a: REF ActivityRec ¬ NIL] = INLINE {
a ¬ p.head;
WHILE a#NIL AND (a.state>=finished) DO
a ¬ p.head ¬ a.next;
ENDLOOP;
IF a=NIL
THEN p.tail ¬ NIL
ELSE WHILE a#NIL AND a.state#queued DO a ¬ a.next ENDLOOP;
};
IF p=NIL THEN ERROR;
activity ¬ Peek[p];
};
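--A minimal client sketch (hypothetical, not part of this module), assuming the WorkerThreads interface exported here and a client work procedure DoWork: PROC [data: REF]:
--  a1: Activity ¬ Fork[NIL, DoWork, data1, TRUE];
--  a2: Activity ¬ Fork[NIL, DoWork, data2, TRUE];
--  Wait[LIST[a1, a2]]; --performs still-queued work on the caller's thread, then blocks until both activities are done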
END.