PeriodicalForkImpl.mesa
Copyright Ó 1989, 1991, 1992 by Xerox Corporation. All rights reserved.
Created by Christian Jacobi, March 1, 1989 5:17:26 pm PST
Christian Jacobi, March 5, 1992 11:42 am PST
DIRECTORY DelayedFork, FastFork, PeriodicalFork, Process, RuntimeError;
PeriodicalForkImpl:
CEDAR
MONITOR
IMPORTS FastFork, Process, RuntimeError
EXPORTS DelayedFork, PeriodicalFork ~
forkImmediateTime: INT = 32;
minimumTime: INT = 64;
minimumTicks: Process.Ticks = Process.MsecToTicks[minimumTime];
timeTop: TimeRef ¬ NEW[TimeRec];
TimeRef: TYPE = REF TimeRec;
TimeRec:
TYPE =
RECORD [
toggle: BOOL ¬ FALSE, --not monitored, there is only 1 PeriodicalProcess
lessFrequent: TimeRef ¬ NIL,
periodicJobs: JobList ¬ NIL,
thisTime: JobList ¬ NIL,
nextTime: JobList ¬ NIL
];
JobRef:
TYPE =
LIST
OF JobRec;
--understanding is: only first list element may be used and constitutes job
JobList:
TYPE =
LIST
OF JobRec;
--understanding is: all elements may be used
JobRec:
TYPE =
RECORD [
proc: PROC[REF] ¬ NIL,
data: REF ¬ NIL,
priority: CARD32 ¬ 0,
isForked: ForkState ¬ joined, --prevent forking more than one process
periodic: BOOL ¬ FALSE
];
ForkState: TYPE = {joined, enqueued};
freeList: JobList ¬ NIL;
NewJob:
ENTRY
PROC
RETURNS [JobRef] =
INLINE {
RETURN [InternalNewJob[]];
};
InternalNewJob:
INTERNAL
PROC
RETURNS [job: JobRef] =
INLINE {
IF freeList=
NIL
THEN job ¬ LIST[[]]
ELSE {
job ¬ freeList;
freeList ¬ job.rest;
job.rest ¬ NIL;
};
};
InternalFreeJob:
INTERNAL
PROC [job: JobRef] =
INLINE {
--WARNING: job MUST NOT be in any list since rest field used
job.first.data ¬ NIL;
job.rest ¬ freeList;
freeList ¬ job
};
Register:
PUBLIC
PROC [ms:
INT, proc:
PROC[
REF], data:
REF, priority:
CARD32] = {
p: PROC[REF] ¬ proc; --trap here if local proc
EntryRegister:
ENTRY
PROC [ms:
INT, proc:
PROC[
REF], data:
REF, priority:
CARD32] = {
ref: TimeRef ¬ FindTime[ms];
job: JobRef ¬ InternalNewJob[];
job.first.isForked ¬ joined;
job.first.proc ¬ proc;
job.first.data ¬ data;
job.first.priority ¬ priority;
job.first.periodic ¬ TRUE;
job.rest ¬ ref.periodicJobs;
ref.periodicJobs ¬ job;
};
EntryRegister[ms, p, data, priority];
};
ForkSoon:
PUBLIC
PROC [ms:
INT, proc:
PROC[
REF], data:
REF, priority:
CARD32] = {
p: PROC[REF] ¬ proc; --trap here if local proc
EntryDelayedFork:
ENTRY
PROC [ms:
INT, proc:
PROC[
REF], data:
REF] = {
ref: TimeRef ¬ FindTime[ms];
job: JobRef ¬ InternalNewJob[];
job.first.isForked ¬ joined;
job.first.proc ¬ proc;
job.first.data ¬ data;
job.first.priority ¬ priority;
job.first.periodic ¬ FALSE;
job.rest ¬ ref.nextTime;
ref.nextTime ¬ job;
};
IF ms<=forkImmediateTime
THEN {
job: JobRef ¬ NewJob[];
job.first.isForked ¬ joined;
job.first.proc ¬ proc;
job.first.data ¬ data;
job.first.priority ¬ priority;
job.first.periodic ¬ FALSE;
job.rest ¬ NIL;
Enqueue[job];
IF ms<0 OR pCountHint<=0 THEN FastFork.Fork[Envelope, NIL, internalPriority];
pCountHint ¬ pCountHint-1;
}
ELSE EntryDelayedFork[ms, p, data];
};
FindTime:
INTERNAL
PROC [ms:
INT]
RETURNS [ref: TimeRef ¬ timeTop] =
INLINE {
WHILE minimumTime<ms
DO
IF ref.lessFrequent=NIL THEN ref.lessFrequent ¬ NEW[TimeRec];
ref ¬ ref.lessFrequent;
ms ¬ ms/2;
ENDLOOP;
};
NoOp: PROC [REF] = {};
Unregister:
PUBLIC
ENTRY
PROC [proc:
PROC[
REF], data:
REF] = {
ENABLE UNWIND => NULL;
Match:
INTERNAL
PROC [job: JobRef]
RETURNS [
BOOL] = {
RETURN [job.first.proc=proc AND job.first.data=data]
};
UnregisterTime:
INTERNAL
PROC [time: TimeRef] = {
lag: JobList ¬ NIL;
list: JobList ¬ time.periodicJobs;
WHILE list#
NIL
DO
IF Match[list]
THEN {
--don't worry about calling InternalFreeJob; its tricky and occurs rarely
list.first.proc ¬ NoOp; --so won't be called even if already in delay buffer
IF lag=
NIL
THEN {time.periodicJobs ¬ list.rest; list ¬ time.periodicJobs}
ELSE {lag.rest ¬ list.rest; list ¬ lag.rest}
}
ELSE {lag ¬ list; list ¬ list.rest};
ENDLOOP
};
FOR ref: TimeRef ¬ timeTop, ref.lessFrequent
WHILE ref#
NIL
DO
UnregisterTime[ref]
ENDLOOP;
};
unregisterSelf: SIGNAL = CODE;
UnregisterSelf:
PUBLIC PROC [] = {
SIGNAL unregisterSelf
};
spread the time of forking:
1) to increase probability of reusing PROCESS resource
2) to balance load
Using two buffers
delaybuffer: delays jobs multiples of time intervals
(so large intervals will spread the calls on multiple minor intervals)
serializeBuffer: serializing calls while minor interval
(usually will be emptied before next minor interval)
access to "serialize" variables must be monitored
serializeNum: INT = 64;
serializeYoungest: [0..serializeNum) ¬ 0;
serializeCnt: INT ¬ 0;
serializeBuffer:
REF
ARRAY [0..serializeNum)
OF JobRef ¬
NEW[ARRAY [0..serializeNum) OF JobRef ¬ ALL[NIL]];
access to "delay" variables not monitored; only one process
delayNum: INT = 128;
delayYoungest: [0..delayNum) ¬ 0;
delayCnt: INT ¬ 0;
delayBuffer:
REF
ARRAY [0..delayNum)
OF JobRef ¬
NEW[ARRAY [0..delayNum) OF JobRef ¬ ALL[NIL]];
pCountHint: INT ¬ 0; --not exact since don't know who waits for conditions. Not monitored but periodically reset.
countForOne: INT = 5;
ForwardDelayed:
PROC [n:
INT ¬ 1] = {
--moves "n" jobs from delay buffer into serialize buffer
WHILE delayCnt>0
--not empty--
AND n>0
DO
delayCnt ¬ delayCnt-1;
n ¬ n - 1;
BEGIN
job: JobRef ¬ delayBuffer[(delayYoungest+delayNum-delayCnt) MOD delayNum];
Enqueue[job];
END;
ENDLOOP;
};
internalPriority: Process.Priority ~ Process.priorityClient3;
Envelope:
PROC [data:
REF] = {
--execute jobs as long as some are available in serialize buffer
job: JobRef ¬ NIL;
pCountHint ¬ pCountHint+countForOne;
DO
job ¬ Dequeue[job];
IF job=NIL THEN {pCountHint ¬ MAX[pCountHint-countForOne, 0]; RETURN};
IF job.first.priority#internalPriority THEN Process.SetPriority[job.first.priority];
job.first.proc[job.first.data !
UNWIND => {pCountHint ¬ pCountHint+countForOne; CONTINUE};
unregisterSelf => {Unregister[job.first.proc, job.first.data]; RESUME};
RuntimeError.UNCAUGHT => {pCountHint ¬ pCountHint-countForOne};
];
IF job.first.priority#internalPriority THEN Process.SetPriority[internalPriority];
job.first.isForked ¬ joined;
ENDLOOP
};
Dequeue:
ENTRY
PROC [last: JobRef]
RETURNS [job: JobRef ¬
NIL] =
INLINE {
--Returns next job to be executed out of serialize buffer
--last: may be used to free last job [ughh: this saves a monitor entry/exit]
IF last#NIL AND ~last.first.periodic THEN InternalFreeJob[last];
IF serializeCnt>0
--not empty--
THEN {
serializeCnt ¬ serializeCnt-1;
job ¬ serializeBuffer[(serializeYoungest+serializeNum-serializeCnt) MOD serializeNum];
};
};
EntryEnqueue:
ENTRY
PROC [job: JobRef]
RETURNS [success:
BOOL] =
INLINE {
--enqueues job into serialize buffer if space available
IF serializeCnt>=serializeNum --full-- THEN RETURN [FALSE];
serializeYoungest ¬ (serializeYoungest+1) MOD serializeNum;
serializeBuffer[serializeYoungest] ¬ job;
serializeCnt ¬ serializeCnt+1;
job.first.isForked ¬ enqueued;
success ¬ TRUE;
};
EnqueueHard:
PROC [job: JobRef] = {
--enqueues job into serialize buffer; insists even if spinning necessary
pauseLength: INT ¬ 1;
DO
--Fork one more process to empty buffer
FastFork.Fork[Envelope, NIL, internalPriority];
Process.Pause[pauseLength]; --give it a chance to do some work
IF EntryEnqueue[job].success THEN RETURN;
pauseLength ¬ pauseLength*2;
--It's ok to loop and fork more processes: buffer is full
ENDLOOP;
};
Enqueue:
PROC [job: JobRef] =
INLINE {
--enqueues job into serialize buffer
IF ~EntryEnqueue[job].success THEN EnqueueHard[job];
};
Delay:
PROC [job: JobRef] =
INLINE {
--enqueues job into delay buffer
IF delayCnt>=delayNum --full-- THEN ForwardDelayed[3];
delayYoungest ¬ (delayYoungest+1) MOD delayNum;
delayBuffer[delayYoungest] ¬ job;
delayCnt ¬ delayCnt+1;
};
GetTheDelayedForks:
ENTRY
PROC [time: TimeRef]
RETURNS [jobs: JobList] =
INLINE {
jobs ¬ time.thisTime;
time.thisTime ¬ time.nextTime;
time.nextTime ¬ NIL;
};
SweepThisPeriod:
PROC [time: TimeRef] =
INLINE {
--periodic forks
-- not monitored; all what can happen is missing the first turn
FOR jobs: JobList ¬ time.periodicJobs, jobs.rest
WHILE jobs#
NIL
DO
IF jobs.first.isForked=joined
THEN {
IF time=timeTop THEN Enqueue[jobs] ELSE Delay[jobs]
};
ENDLOOP;
--delayed forks
-- monitored; we can't afford missing the only turn
IF time.thisTime#
NIL
OR time.nextTime#
NIL
THEN {
jobs: JobList ¬ GetTheDelayedForks[time];
WHILE jobs#
NIL
DO
next: JobList ¬ jobs.rest; --save the rest field before possible freeing of first job clobbers...
jobs.rest ← NIL;
IF time=timeTop THEN Enqueue[jobs] ELSE Delay[jobs];
jobs ¬ next
ENDLOOP;
};
};
SweepThisAndLargerPeriods:
PROC [time: TimeRef] =
INLINE {
WHILE time#
NIL
DO
SweepThisPeriod[time];
--recurse for double time increment
IF time.toggle ¬ ~time.toggle THEN time ¬ time.lessFrequent ELSE RETURN;
ENDLOOP;
};
PeriodicalProcess:
PROC [data:
REF] = {
DO
SweepThisAndLargerPeriods[timeTop];
IF delayCnt>0 THEN ForwardDelayed[delayCnt/4+1];
pCountHint ¬ 0;
--old processes which are still alive will probably not help
--freshly created processes because of overflow need not be considered; we want more parallelism in this case
IF serializeCnt>0 THEN FastFork.Fork[Envelope, NIL, internalPriority];
Process.Pause[minimumTicks];
ENDLOOP
};
FastFork.Fork[PeriodicalProcess, NIL, Process.priorityForeground];
END.