--ForkOpsImpl.mesa
--Copyright © 1989, 1990, 1991, 1992, 1993 by Xerox Corporation. All rights reserved.
--Created by Christian Jacobi, February 27, 1989 12:18:47 pm PST
--Christian Jacobi, May 24, 1993 11:58 am PDT
DIRECTORY ForkOps, ForkOpsFriends, ForkOpsMonitor, Process;
ForkOpsImpl: CEDAR MONITOR
IMPORTS Process
EXPORTS ForkOps, ForkOpsFriends, ForkOpsMonitor =
BEGIN
--Fork implementation idea:
--There is a small ready-pool.
--Every process in the ready-pool can jump immediately when a job is ready.
--Time delay structure implementation idea:
--When an element is in the down list and the level is ticked, we will subtract the timeConst and find a new place further down.
--When an element is in the up list and the level is ticked, we will decide whether to move it up further or put it into the down list.
--Idea for next improvement:
--Before calling ForkJob, test whether the period is slow and enough processes are already running; if so, put the job on a queue and only fork when something returns.
--Module state. All variables below are protected by the monitor lock of ForkOpsImpl.
--Compile time switch enabling the consistency checks of the form IF debugging AND ... THEN ERROR
debugging: BOOL = FALSE;
Ticks: TYPE = Process.Ticks;
--Head of the list of idle processes, linked through ProcessRec.next
readlyList: ProcessRef ¬ NIL;
--invariants:
--Every entry on readlyList has a process waiting for its reCheck.
--While entry is on the readlyList its job is NIL.
readyCnt: INT ¬ 0; --number of entries on readlyList
softReadyLimit: INT ~ 8; --number of threads kept in ready list in a quiescent system
softReadyLimitH: INT ~ softReadyLimit-2; --used with softReadyLimit for hysteresis
hardReadyLimit: INT ~ 24; --maximum number of threads kept in ready list ever
dieJob: Job ~ NEW[JobRec ¬ [proc: NoOp]]; --marker job: a waiting process which is handed dieJob terminates instead of calling it
hasTimouter: BOOL ¬ FALSE; --TRUE while some process is inside the loop of LimitSleepers
timeOut: CONDITION; --waited for in LimitSleepers, notified in GetFromReadyQueue
failedCnt: CARD ¬ 0; --number of TryEnqueue calls which found no ready process
realForksCount: CARD ¬ 0; --number of FirstJob calls, that is of processes really forked
realReturnsCount: CARD ¬ 0; --number of times WaitForNextJob told a process to terminate
--Descriptor of one piece of forked work. Records are recycled through freeJobs (see NewJob and FreeJob).
Job: TYPE = REF JobRec;
JobRec: TYPE = RECORD [
--Fields for client specified data
proc: PROC [REF], --procedure to call; overwritten with Warned by Stop and with NoOp by StopInDelayStructure
data: REF ¬ NIL, --argument handed to proc
priority: CARDINAL ¬ 2, --priority the executing process is set to before proc is called
period: Ticks ¬ 0, --0 means not periodical
--Fields for implementing the delay network
restDelay: Ticks ¬ 0, --ticks still to wait; maintained by StartDelaying, HandleUp and HandleDown
next: Job ¬ NIL, --link for the free list, the anchor lists and the fork list returned by Tick
--Fields for implementing abort
pr: ProcessRef ¬ NIL, --process currently assigned to this job; NIL while not executing
wasProc: PROC [REF] ¬ NIL, --original proc, saved by Stop when it replaces proc by Warned
--While active, reserved for doubly linked circular list on activeSentinell
nextActive: Job ¬ NIL,
prevActive: Job ¬ NIL
];
--Descriptor of one base process (see ForkBaseProcess); allocated in ForkJob when no ready process is available
ProcessRef: TYPE = REF ProcessRec;
ProcessRec: TYPE = RECORD [
next: ProcessRef ¬ NIL, --good while ready
job: Job, --monitored updates only
process: PROCESS ¬ NIL, --once initialized will never change
reCheck: CONDITION --(time-out only for case when notification goes lost)
--reCheck is notified when a job is assigned to the process, and by FirstJob once process has been set
];
freeJobs: Job ¬ NIL; --free list of recycled JobRecs, linked through next
activeSentinell: Job; --doubly linked circular list of active Jobs, never NIL
--While a job is in this list, proc and data are defined (but absolutely not constant)
--The sentinel itself is created by InitActiveList at module start.
InitActiveList: PROC [] = {
  --Creates the sentinel of the active list as a ring of one element:
  --both links and the data field of the sentinel refer to the sentinel itself.
  sentinel: Job ~ NEW[JobRec];
  sentinel.prevActive ¬ sentinel;
  sentinel.nextActive ¬ sentinel;
  sentinel.data ¬ sentinel;
  activeSentinell ¬ sentinel;
  };
NilProcedure: ERROR = CODE; --raised by NewJob when handed a NIL procedure
NewJob: ENTRY PROC [proc: PROC [REF], data: REF, priority: CARD32] RETURNS [job: Job] = {
  --Returns a job record filled with the client data and spliced into the active list.
  --The record is taken from the free list when possible, freshly allocated otherwise.
  --Caller must have tested proc to be global.
  IF proc=NIL THEN RETURN WITH ERROR NilProcedure;
  IF freeJobs=NIL
    THEN job ¬ NEW[JobRec]
    ELSE {
      --pop the first record off the free list
      job ¬ freeJobs;
      freeJobs ¬ job.next;
      job.next ¬ NIL;
      };
  job.proc ¬ proc;
  job.data ¬ data;
  job.priority ¬ priority;
  SpliceInActiveList[job];
  };
FreeJob: INTERNAL PROC [job: Job] = {
  --Retires job: takes it out of the active list, drops its reference to the
  --client data and pushes the record onto the free list for reuse by NewJob.
  IF debugging THEN {
    IF job.next#NIL THEN ERROR; --job must not be linked into any other list
    };
  UnSpliceFromActiveList[job];
  job.next ¬ freeJobs;
  freeJobs ¬ job;
  job.data ¬ NIL;
  };
SpliceInActiveList: INTERNAL PROC [job: Job] = INLINE {
  --Inserts job into the active ring directly behind the sentinel.
  --Trusts that the job and data fields have been initialized
  oldFirst: Job ~ activeSentinell.nextActive;
  job.nextActive ¬ oldFirst;
  job.prevActive ¬ activeSentinell;
  oldFirst.prevActive ¬ job;
  activeSentinell.nextActive ¬ job;
  };
UnSpliceFromActiveList: INTERNAL PROC [job: Job] = INLINE {
  --Takes job out of the active ring and clears both of its ring links.
  --The job must currently be in the ring: its links are dereferenced without a NIL test.
  before: Job ~ job.prevActive;
  after: Job ~ job.nextActive;
  after.prevActive ¬ before;
  before.nextActive ¬ after;
  job.prevActive ¬ NIL;
  job.nextActive ¬ NIL;
  };
GetFromReadyQueue: INTERNAL PROC [] RETURNS [pr: ProcessRef] = INLINE {
  --Pops the first process off the ready queue; the queue must not be empty.
  --When the count (before this removal) is already below softReadyLimitH,
  --a pending LimitSleepers loop is woken so it can notice and stop.
  IF hasTimouter AND readyCnt<softReadyLimitH THEN NOTIFY timeOut;
  readyCnt ¬ readyCnt-1;
  pr ¬ readlyList;
  readlyList ¬ pr.next;
  pr.next ¬ NIL;
  };
PutOnReadyQueue: INTERNAL PROC [pr: ProcessRef] = INLINE {
  --Pushes pr onto the front of the ready queue.
  --If the queue has already reached softReadyLimit and nobody is trimming it,
  --this process first trims it itself (LimitSleepers may wait, releasing the monitor).
  IF ~hasTimouter AND readyCnt>=softReadyLimit THEN LimitSleepers[];
  pr.next ¬ readlyList;
  readlyList ¬ pr;
  readyCnt ¬ readyCnt+1;
  };
Fork: PUBLIC PROC [proc: PROC[REF], data: REF ¬ NIL, priority: CARD32 ¬ 2] = {
  --Schedules proc[data] for immediate execution at the given priority.
  --NewJob raises NilProcedure when proc is NIL.
  copy: PROC[REF] ← proc; --test for SafeStorage.UnsafeProcAssignment
  ForkJob[NewJob[copy, data, priority]];
  };
LimitSleepers: INTERNAL PROC [] = {
--Make sure ready queue is not permanently staying too large by timing out some
--Using this within PutOnReadyQueue should assert that we are allocating a thread only when the ready queue is large; reducing the total number of threads used.
--The caller has not yet been put on the ready queue while this procedure runs.
--The WAIT below releases the monitor, so ready queue and counters may change between iterations.
Kill1Sleeper: INTERNAL PROC [] = {
--Takes one process off the ready queue and hands it dieJob, which makes it terminate (see WaitForNextJob)
pr: ProcessRef ¬ GetFromReadyQueue[];
pr.job ¬ dieJob;
NOTIFY pr.reCheck;
};
--The test repeats the test made by PutOnReadyQueue
IF readyCnt>=softReadyLimit AND ~hasTimouter THEN {
--hasTimouter keeps any other process from entering this loop concurrently
hasTimouter ¬ TRUE;
--Hysteresis: started at softReadyLimit, continued until the count is below softReadyLimitH
WHILE readyCnt>=softReadyLimitH DO
Kill1Sleeper[];
--Paces the killing: one process per time-out of timeOut, or earlier when GetFromReadyQueue notifies
WAIT timeOut;
ENDLOOP;
hasTimouter ¬ FALSE;
};
};
ForkJob: PROC [job: Job] = {
  --Gets job executed: hands it to an idle process of the ready queue if there
  --is one; otherwise yields once and retries; only then forks a new base process.
  TryEnqueue: ENTRY PROC [job: Job] RETURNS [done: BOOL] = {
    --Assigns job to a ready process if any; counts the failure otherwise
    done ¬ readlyList#NIL;
    IF done
      THEN {
        idle: ProcessRef ~ GetFromReadyQueue[]; --invariant: process is waiting on idle.reCheck
        idle.job ¬ job;
        NOTIFY idle.reCheck;
        }
      ELSE failedCnt ¬ failedCnt+1;
    };
  IF TryEnqueue[job].done THEN RETURN;
  Process.Yield[]; --Hope this might help a running process to finish (so we can avoid the fork). Sorry, this will make breadth-first forking benchmarks look bad, but, real applications will behave good.
  IF TryEnqueue[job].done THEN RETURN;
  --It has been experimentally verified, that this is quite rare
  TRUSTED {
    pr: ProcessRef ~ NEW[ProcessRec];
    pr.job ¬ job;
    Process.DisableTimeout[@pr.reCheck];
    Process.Detach[FORK ForkBaseProcess[pr]]
    };
  };
ForkBaseProcess: PROC [pr: ProcessRef] = {
--Base of forked proces, executing jobs as long as there is work
--pr.job holds the first job on entry (set by ForkJob before the FORK).
--Returns, ending the process, as soon as WaitForNextJob leaves pr.job NIL.
gjob: Job; --copy of the current job, declared outside the ENABLE scope so the EXITS handler can see it
pr.process ¬ Process.GetCurrent[];
--FirstJob registers pr with its job and notifies pr.reCheck, on which Stop may be waiting for pr.process
FirstJob[pr];
DO
--Outer loop: re-establishes the ENABLE after an abort has been handled
BEGIN
ENABLE ABORTED => GOTO Ooops; --amortize the ENABLE over many forks
DO
--Inner loop: one job per iteration
job: Job ¬ gjob ¬ pr.job;
IF job=NIL THEN RETURN;
IF Process.GetPriority[]#job.priority THEN {
Process.SetPriority[job.priority]; --Todays version of Process.SetPriority does a yield which I want to avoid here
};
job.proc[job.data];
<<Process.SetPriority[Process.priorityRealTime];>> -- put it back when it wouldn't yield anymore. Actually I don't mind if this loop gets one yield. I'm afraid of YieldSecondBest and of two yields. ChJ December 22, 1992 12:49:47 pm PST
--Retires or re-delays job, then blocks until the next job is assigned or the process is told to terminate
WaitForNextJob[pr, job];
ENDLOOP;
EXITS Ooops => {
--ABORTED was raised inside the inner loop: call the registered abort notifier if any, then continue like after a normal return
NoteAborted[gjob];
WaitForNextJob[pr, gjob];
};
END;
ENDLOOP;
};
--Does nothing. Stop stores Warned into the proc field of a job to mark it as stopped; the original procedure is kept in wasProc.
Warned: PROC [x: REF] = {};
Stop: PUBLIC ENTRY PROC [proc: PROC [REF], data: REF, abort: BOOL] = {
--Stops all jobs which were forked with this proc and data.
--Does aborts if necessary but does not remove jobs from active list
--proc, data: identify the jobs; abort: if TRUE, processes currently assigned to such a job are aborted.
--Raises ABORTED in the caller if the caller itself is executing one of the jobs and abort is TRUE.
abortSelf: BOOL ¬ FALSE;
failedList: LIST OF ProcessRef ¬ NIL; --processes whose process field was not yet set in the first loop
lag: Job;
StopInDelayStructure[proc, data];
--Jobs is no more in delay structure, but we don't know whether it is already started or not
--Regular loop to find all active jobs
lag ¬ activeSentinell;
DO
this: Job ¬ lag.nextActive;
IF this=activeSentinell THEN EXIT;
--Matches jobs still carrying proc as well as jobs already marked by an earlier Stop
IF this.data=data AND (this.proc=proc OR (this.wasProc=proc AND this.proc=Warned)) THEN {
pr: ProcessRef ¬ this.pr;
--Disable pending calls
this.period ¬ 0; --stops periodic reforking tail loop
this.wasProc ¬ proc;
this.proc ¬ Warned;
--Abort ongoing calls
IF abort AND pr#NIL AND pr.job=this THEN {
SELECT TRUE FROM
--The forked process has not yet stored its identity (see ForkBaseProcess); handle it in the second loop
pr.process=NIL => failedList ¬ CONS[pr, failedList];
--Never abort the caller while it still holds the monitor; remember it and raise ABORTED on return
pr.process=Process.GetCurrent[] => abortSelf ¬ TRUE;
--Errors raised by Process.Abort are ignored
ENDCASE => Process.Abort[pr.process ! ANY => CONTINUE];
};
};
lag ¬ this;
ENDLOOP;
--Special loop to find all processes which were at a critical phase in previous loop and failed
FOR l: LIST OF ProcessRef ¬ failedList, l.rest WHILE l#NIL DO
pr: ProcessRef ¬ l.first;
--FirstJob notifies pr.reCheck after ForkBaseProcess has set pr.process
WHILE pr.process=NIL DO
WAIT pr.reCheck
ENDLOOP;
--Left the monitor. Need full invariant checking
BEGIN
job: Job ¬ pr.job;
--Abort only if the process still executes a job which was marked for this proc and data
IF job#NIL AND job.proc=Warned AND job.wasProc=proc AND job.data=data THEN {
SELECT TRUE FROM
pr.process=Process.GetCurrent[] => abortSelf ¬ TRUE;
ENDCASE => Process.Abort[pr.process ! ANY => CONTINUE];
};
END;
ENDLOOP;
--Special care when aborting self
IF abortSelf THEN RETURN WITH ERROR ABORTED
};
FirstJob: ENTRY PROC [pr: ProcessRef] = {
  --Bookkeeping for a freshly forked base process; corresponds to the
  --"execute new job" part of WaitForNextJob.
  job: Job ~ pr.job;
  --Generic initialization
  realForksCount ¬ realForksCount+1;
  NOTIFY pr.reCheck; --Stop may be waiting on reCheck for pr.process to get set
  --Execute new job
  IF debugging AND job.pr#NIL THEN ERROR;
  job.pr ¬ pr;
  };
WaitForNextJob: ENTRY PROC [pr: ProcessRef, lastJob: Job] = {
--Disable last and enable new job. Funny 1/2 loop unrolling to reduce monitors.
--pr: descriptor of the calling base process; lastJob: the job it has just finished.
--On return pr.job is either the next job to execute or NIL, which tells the caller (ForkBaseProcess) to terminate.
--Disable last job
IF debugging AND pr.job#lastJob THEN ERROR;
pr.job ¬ NIL;
IF debugging AND lastJob.pr=NIL THEN ERROR;
lastJob.pr ¬ NIL;
--Non periodic jobs are recycled; periodic jobs go back into the delay network for another period
IF lastJob.period=0
THEN FreeJob[lastJob]
ELSE StartDelaying[lastJob, lastJob.period];
--Enough idle processes already: let this one terminate
IF readyCnt>=hardReadyLimit THEN {
pr.job ¬ NIL;
realReturnsCount ¬ realReturnsCount+1;
RETURN
};
--Search a new job
--Enter ready queue
PutOnReadyQueue[pr];
--Woken by ForkJob (real job) or by LimitSleepers (dieJob); the loop guards against wake ups without assignment
WHILE pr.job=NIL DO WAIT pr.reCheck ENDLOOP;
--LimitSleepers decided that this process is superfluous
IF pr.job=dieJob THEN {
pr.job ¬ NIL;
realReturnsCount ¬ realReturnsCount+1;
RETURN
};
--Note that we have left the monitor and have to re-establish invariants
--NOTE(review): presumably discards an abort request which arrived while idle, so it cannot hit the new job; confirm against the Process interface
Process.CancelAbort[pr.process];
--Execute new job
IF debugging AND pr.job.pr#NIL THEN ERROR;
pr.job.pr ¬ pr;
};
--One level of the delay network. Levels form a doubly linked chain built by InitDelayStructure.
Anchor: TYPE = REF AnchorRec;
AnchorRec: TYPE = RECORD [
timeConst: Ticks, --delay of this level in ticks: 0 for stopper, 1 for delayRoot, then doubling up to LAST[Ticks]
carry: BOOL ¬ FALSE, --toggled by Tick; determines how many levels a tick reaches
moreFrequent: Anchor ¬ NIL, --neighbour with the smaller timeConst
lessFrequent: Anchor ¬ NIL, --neighbour with the larger timeConst; the last level refers to itself
upList: Job ¬ NIL, --recently included elements, walking further into the delay structure
downList: Job ¬ NIL --walking slowly out of the delay structure
];
stopper: Anchor; --artifical stop for the down moving
--Jobs arriving on stopper.downList are due; Tick returns them for forking
delayRoot: Anchor; --most frequent Anchor; data structure built at initialization time.
InitDelayStructure: PROC [] = {
  --Builds the chain stopper, delayRoot, and BITS[Ticks] further levels.
  --The delay-structure has all pointers set and is preallocated,
  --so the algorithms never need to do NIL tests while walking it.
  span: Ticks ¬ 1;
  tail: Anchor;
  delayRoot ¬ NEW[AnchorRec];
  delayRoot.timeConst ¬ span;
  stopper ¬ NEW[AnchorRec];
  stopper.timeConst ¬ 0;
  stopper.moreFrequent ¬ NIL; --nothing lies beyond the stopper
  stopper.lessFrequent ¬ delayRoot;
  delayRoot.moreFrequent ¬ stopper;
  tail ¬ delayRoot;
  FOR i: INT IN [0..BITS[Ticks]) DO
    level: Anchor ~ NEW[AnchorRec];
    --double the time constant, saturating at LAST[Ticks]
    span ¬ IF span<=LAST[Ticks]/2 THEN span*2 ELSE LAST[Ticks];
    level.timeConst ¬ span;
    level.moreFrequent ¬ tail;
    tail.lessFrequent ¬ level;
    tail ¬ level;
    ENDLOOP;
  tail.lessFrequent ¬ tail; --the slowest level is its own successor
  };
EntryStartDelaying: ENTRY PROC [job: Job, restTime: Ticks] = {
--Monitor entry version of StartDelaying, for callers which do not hold the monitor lock
StartDelaying[job, restTime];
};
StartDelaying: INTERNAL PROC [job: Job, restTime: Ticks] = INLINE {
  --Enters job into the delay network at delayRoot with restTime ticks to wait.
  --A job without any delay goes directly onto the down list of delayRoot.
  IF debugging AND job.next#NIL THEN ERROR;
  job.restDelay ¬ restTime;
  IF restTime<=0
    THEN {job.next ¬ delayRoot.downList; delayRoot.downList ¬ job}
    ELSE {job.next ¬ delayRoot.upList; delayRoot.upList ¬ job};
  };
DownAsFarAsItGoes: PROC [anchor: Anchor, job: Job] = INLINE {
  --Puts job onto the down list of the first level below anchor whose
  --timeConst does not exceed the remaining delay of the job.
  target: Anchor ¬ anchor.moreFrequent;
  remaining: Ticks ~ job.restDelay;
  UNTIL remaining>=target.timeConst DO --stopper has a timeConst of zero
    target ¬ target.moreFrequent;
    ENDLOOP;
  IF debugging AND job.next#NIL THEN ERROR;
  job.next ¬ target.downList;
  target.downList ¬ job;
  };
HandleDown: PROC [anchor: Anchor] = {
  --Ticks the down list of anchor: every job on it has waited anchor.timeConst
  --more ticks and is moved to the appropriate level further down.
  --Called from Tick only, with the monitor lock held.
  job: Job ¬ anchor.downList;
  anchor.downList ¬ NIL;
  WHILE job#NIL DO
    next: Job ~ job.next;
    job.next ¬ NIL;
    --Saturating subtraction. StartDelaying puts jobs with restDelay 0 onto
    --delayRoot.downList, where timeConst is 1; a plain subtraction would
    --underflow the unsigned Ticks value. A job which has no delay left
    --ends up at the stopper and is forked by the current tick.
    job.restDelay ¬ IF job.restDelay>anchor.timeConst
      THEN job.restDelay-anchor.timeConst
      ELSE 0;
    DownAsFarAsItGoes[anchor, job];
    job ¬ next;
    ENDLOOP
  };
HandleUp: PROC [anchor: Anchor] = {
--Ticks the up list of anchor: every job on it is either moved to the up list of the next less frequent level,
--or starts its way out by being put on a down list.
--Called from Tick only, with the monitor lock held.
UpOne: PROC [anchor: Anchor, job: Job, thisTime, nextTime: Ticks] = INLINE {
--Handles a single job; thisTime and nextTime are the time constants of anchor and of anchor.lessFrequent
restTime: Ticks ¬ job.restDelay-thisTime; --remaining delay after waiting one period of this level
IF debugging AND job.next#NIL THEN ERROR;
IF debugging AND job.restDelay<thisTime THEN ERROR;
SELECT TRUE FROM
--Enough delay left to spend a period on the next level: move up
restTime>=nextTime => {
IF anchor.carry
THEN job.restDelay ¬ restTime
ELSE {
--compensate for non-full cycle
--job.restDelay ¬ job.restDelay-thisTime+(nextTime-thisTime); ...is identity
--NOTE(review): the identity presumes nextTime = 2*thisTime, which InitDelayStructure establishes except where timeConst saturates
};
--including above requires outer loop stepping in downwards direction
job.next ¬ anchor.lessFrequent.upList; anchor.lessFrequent.upList ¬ job;
};
--Not enough for the next level, but at least one more period of this level: turn around here
restTime>=thisTime => {
job.restDelay ¬ restTime;
job.next ¬ anchor.downList; anchor.downList ¬ job;
};
ENDCASE => {
--I think this doesn't happen but I'm too lazy to prove it
job.restDelay ¬ restTime;
DownAsFarAsItGoes[anchor, job];
};
};
job: Job ¬ anchor.upList;
thisTime: Ticks ~ anchor.timeConst;
nextTime: Ticks ~ anchor.lessFrequent.timeConst;
--Detach the whole list first; UpOne relinks every job into some other list
anchor.upList ¬ NIL;
WHILE job#NIL DO
next: Job ¬ job.next; job.next ¬ NIL;
UpOne[anchor, job, thisTime, nextTime];
job ¬ next;
ENDLOOP;
};
Tick: ENTRY PROC [] RETURNS [forkList: Job] = {
  --Ticks clock for delay network; returns jobs to fork
  --The returned jobs are linked through their next field; they are the jobs
  --which reached the down list of the stopper during this tick.
  afterLimit: Anchor;
  limit: Anchor ¬ delayRoot;
  --Toggle the carry flags level by level, starting at delayRoot, until a flag
  --toggles to FALSE. limit is that level, afterLimit its less frequent neighbour.
  DO
    limit.carry ¬ ~limit.carry;
    IF ~limit.carry THEN {afterLimit ¬ limit.lessFrequent; EXIT};
    limit ¬ limit.lessFrequent;
    ENDLOOP;
  --Handle the downward moves first, upward moves afterwards
  --  Prevents double clicking jobs included in the downList by the upward moves
  BEGIN --Downward moves
    --Looping in upwards direction
    --  Prevents double clicking jobs moved down
    --  Loop from delayRoot up to, but excluding, afterLimit
    stepping: Anchor ¬ delayRoot;
    WHILE stepping#afterLimit DO
      HandleDown[stepping];
      stepping ¬ stepping.lessFrequent;
      ENDLOOP;
    END;
  BEGIN --Upward moves
    --Looping in downwards direction
    --  Prevents double clicking jobs moved upwards
    --  Loop from limit down to delayRoot, stopping at the stopper
    stepping: Anchor ¬ limit;
    WHILE stepping#stopper DO
      HandleUp[stepping];
      stepping ¬ stepping.moreFrequent;
      ENDLOOP;
    END;
  forkList ¬ stopper.downList; stopper.downList ¬ NIL
  };
TickingProcess: PROC [] = {
  --Body of the clock process of the delay network: once per tick, advances
  --the network and forks all jobs which became due. Never returns.
  Process.SetPriority[Process.prioritySwatWatcher];
  DO --forever
    DoTheForks[Tick[]];
    Process.Pause[1];
    ENDLOOP;
  };
DoTheForks: PROC [forkList: Job] = INLINE {
  --Forks every job of a list linked through the next field.
  --Each job is detached from the list before it is handed to ForkJob.
  UNTIL forkList=NIL DO
    current: Job ~ forkList;
    forkList ¬ current.next;
    current.next ¬ NIL;
    ForkJob[current];
    ENDLOOP;
  };
ForkDelayed: PUBLIC PROC [ms: INT ¬ 0, proc: PROC[REF], data: REF ¬ NIL, priority: CARD32 ¬ 2] = {
  --Schedules proc[data] to be executed once, after a delay of ms milliseconds.
  --NewJob raises NilProcedure when proc is NIL.
  copy: PROC[REF] ← proc; --test for SafeStorage.UnsafeProcAssignment
  job: Job ~ NewJob[copy, data, priority];
  delay: Ticks ~ Process.MsecToTicks[ms];
  EntryStartDelaying[job, delay];
  };
ForkPeriodically: PUBLIC PROC [ms: INT, proc: PROC[REF], data: REF ¬ NIL, priority: CARD32 ¬ 2] = {
  --Schedules proc[data] to be executed repeatedly, every ms milliseconds,
  --with the first execution after one period.
  --Raises ERROR when ms is not positive; NewJob raises NilProcedure when proc is NIL.
  job: Job;
  copy: PROC[REF] ← proc; --test for SafeStorage.UnsafeProcAssignment
  IF ms<=0 THEN ERROR;
  --Hand on the checked copy, like Fork and ForkDelayed do
  job ¬ NewJob[copy, data, priority];
  --The job is not yet in the delay network, so nobody else looks at period here
  --NOTE(review): a period of 0 ticks would make the job non periodic; confirm that Process.MsecToTicks never returns 0 for ms>0
  job.period ¬ Process.MsecToTicks[ms];
  EntryStartDelaying[job, job.period];
  };
--Does nothing. Used as proc of dieJob, and stored by StopInDelayStructure into jobs which must not be called anymore.
NoOp: PROC [x: REF] = {
};
StopInDelayStructure: INTERNAL PROC [proc: PROC[REF], data: REF, ms: INT ¬ LAST[INT]] = {
  --Removes all jobs with this proc and data which are still waiting in the delay network.
  --Only levels whose timeConst does not exceed ms (converted to ticks) are searched;
  --the default of LAST[INT] searches all levels.
  --The definition says that job is expensive
  anchor: Anchor ¬ stopper;
  time: Ticks ¬ IF ms=LAST[INT] THEN LAST[Ticks] ELSE Process.MsecToTicks[ms];
  RemFromList: INTERNAL PROC [list: Job, proc: PROC[REF], data: REF] RETURNS [head: Job] = {
    --Unlinks every matching job from list and returns the new head of the list.
    --Every match must really leave the list, including the head and including
    --matches which directly follow another match: a matching job is unspliced
    --from the active list here, so it must never be executed and handed to
    --FreeJob later, which would unsplice it a second time.
    lag: Job ¬ NIL; --last job known to remain in the list
    head ¬ list;
    WHILE list#NIL DO
      next: Job ~ list.next;
      IF list.proc=proc AND list.data=data
        THEN {
          list.proc ¬ NoOp; --Now proc won't be called anymore
          IF lag=NIL THEN head ¬ next ELSE lag.next ¬ next;
          list.next ¬ NIL;
          UnSpliceFromActiveList[list]; --Safe: NOT executing while in the delay structure
          --The record is not put on the free list. Rely on garbage collection.
          }
        ELSE lag ¬ list;
      list ¬ next;
      ENDLOOP;
    };
  WHILE anchor.timeConst<=time DO
    anchor.downList ¬ RemFromList[anchor.downList, proc, data];
    anchor.upList ¬ RemFromList[anchor.upList, proc, data];
    IF anchor=anchor.lessFrequent THEN RETURN; --the slowest level is its own successor
    anchor ¬ anchor.lessFrequent;
    ENDLOOP;
  };
Pending: PUBLIC ENTRY PROC [] RETURNS [cnt: CARD ¬ 0] = {
  --Returns the number of jobs in the active list (the sentinel is not counted)
  FOR job: Job ¬ activeSentinell.nextActive, job.nextActive UNTIL job=activeSentinell DO
    cnt ¬ cnt+1;
    ENDLOOP;
  };
RealForks: PUBLIC PROC [] RETURNS [CARD] = {
--Returns realForksCount, which FirstJob increments once per forked base process.
--Reads the counter without entering the monitor.
RETURN [realForksCount];
};
RealReturns: PUBLIC PROC [] RETURNS [CARD] = {
--Returns realReturnsCount, which WaitForNextJob increments whenever it tells a base process to terminate.
--Reads the counter without entering the monitor.
RETURN [realReturnsCount];
};
YieldCount: PUBLIC ENTRY PROC [] RETURNS [CARD] = {
--Returns half of failedCnt, the number of TryEnqueue calls which found no ready process.
--NOTE(review): ForkJob fails once before its yield and possibly a second time after it, so this is presumably only an estimate of the number of yields
RETURN [failedCnt/2]
};
regList: LIST OF PROC [REF] ¬ NIL; --first is a forkProc, rest.first is an abortNotifier, etc.
MyRegisterAbortNotifier: ENTRY PROC [forkProc: PROC [REF], abortNotifier: PROC [REF]] = {
  --Associates abortNotifier with forkProc. An existing association is
  --overwritten (also with NIL); a new one is created only for a non NIL notifier.
  pair: LIST OF PROC [REF] ¬ regList;
  UNTIL pair=NIL DO
    IF pair.first=forkProc THEN {pair.rest.first ¬ abortNotifier; RETURN};
    pair ¬ pair.rest.rest; --step over forkProc and its notifier
    ENDLOOP;
  IF abortNotifier=NIL THEN RETURN;
  regList ¬ CONS[forkProc, CONS[abortNotifier, regList]];
  };
RegisterAbortNotifier: PUBLIC PROC [forkProc: PROC [REF], abortNotifier: PROC [REF]] = {
  --Registers abortNotifier to be called when a job forked with forkProc gets aborted.
  --Both procedures are first assigned to locals outside the monitor.
  --NOTE(review): presumably the same test for SafeStorage.UnsafeProcAssignment as in Fork
  checkedFork: PROC [REF] ~ forkProc;
  checkedNotifier: PROC [REF] ~ abortNotifier;
  MyRegisterAbortNotifier[checkedFork, checkedNotifier];
  };
FetchAbortNotifier: ENTRY PROC [forkProc: PROC [REF]] RETURNS [abortNotifier: PROC [REF]] = TRUSTED {
  --Returns the notifier registered for forkProc.
  --When none is registered the result variable is returned as initialized.
  pair: LIST OF PROC [REF] ¬ regList;
  WHILE pair#NIL DO
    IF pair.first=forkProc THEN RETURN [pair.rest.first];
    pair ¬ pair.rest.rest; --step over forkProc and its notifier
    ENDLOOP;
  };
NoteAborted: PROC [lastJob: Job] = {
  --Calls the abort notifier registered for the procedure of lastJob, if any.
  --An ABORTED raised by the notifier itself is swallowed.
  seen: PROC [REF] ~ lastJob.proc; --read once; Stop may change the field concurrently
  --A job marked by Stop carries its original procedure in wasProc
  target: PROC [REF] ~ IF seen=Warned THEN lastJob.wasProc ELSE seen;
  notifier: PROC [REF] ~ FetchAbortNotifier[target];
  IF notifier#NIL THEN notifier[lastJob.data ! ABORTED => CONTINUE];
  };
--Module initialization
--Both data structures must exist before the ticking process is started
InitActiveList[];
InitDelayStructure[];
TRUSTED {
--Bounds each WAIT timeOut in LimitSleepers to 300 milliseconds
Process.SetTimeout[@timeOut, Process.MsecToTicks[300]];
--The clock process of the delay network runs detached and never returns
Process.Detach[FORK TickingProcess[]];
};
END.