PeriodicalForkImpl.mesa
Copyright © 1989, 1991, 1992 by Xerox Corporation.  All rights reserved.
Created by Christian Jacobi, March 1, 1989 5:17:26 pm PST
Christian Jacobi, March 5, 1992 11:42 am PST 
 
DIRECTORY DelayedFork, FastFork, PeriodicalFork, Process, RuntimeError;
 
PeriodicalForkImpl: 
CEDAR 
MONITOR
IMPORTS FastFork, Process, RuntimeError
EXPORTS DelayedFork, PeriodicalFork ~ 
 
--ForkSoon requests whose ms is no larger than this go straight into the serialize buffer
forkImmediateTime: INT = 32;
--Base period in ms: FindTime halves ms until it is no larger than this
minimumTime: INT = 64;
--Pause of PeriodicalProcess between two sweeps
minimumTicks: Process.Ticks = Process.MsecToTicks[minimumTime];
--Head of the chain of time levels; the only level whose jobs bypass the delay buffer
timeTop: TimeRef ¬ NEW[TimeRec];
TimeRef: TYPE = REF TimeRec;
--One level of the period hierarchy; the level behind lessFrequent is swept on every second sweep of this level
TimeRec: TYPE = RECORD [
  toggle: BOOL ¬ FALSE, --not monitored, there is only 1 PeriodicalProcess
  lessFrequent: TimeRef ¬ NIL, --next level; created on demand by FindTime
  periodicJobs: JobList ¬ NIL, --jobs entered by Register
  thisTime: JobList ¬ NIL, --one shot jobs handed out at the next sweep of this level
  nextTime: JobList ¬ NIL --one shot jobs entered by ForkSoon; moved to thisTime at the next sweep
  ];
JobRef: TYPE = LIST OF JobRec;
--understanding is: only first list element may be used and constitutes job
JobList: TYPE = LIST OF JobRec;
--understanding is: all elements may be used
--Description of one piece of work: proc is called with data by Envelope
JobRec: TYPE = RECORD [
  proc: PROC[REF] ¬ NIL,
  data: REF ¬ NIL,
  priority: CARD32 ¬ 0, --Envelope switches to this priority while calling proc
  isForked: ForkState ¬ joined, --prevent forking more than one process
  periodic: BOOL ¬ FALSE --TRUE for jobs entered by Register; such jobs are never recycled by Dequeue
  ];
--enqueued: set by EntryEnqueue; joined: set again by Envelope once proc has returned
ForkState: TYPE = {joined, enqueued};
--Recycled job cells; pushed by InternalFreeJob, popped by InternalNewJob (both INTERNAL, so used under the monitor lock)
freeList: JobList ¬ NIL;
NewJob: ENTRY PROC RETURNS [job: JobRef] = INLINE {
  --Monitored wrapper around InternalNewJob for callers which do not hold the monitor lock
  job ¬ InternalNewJob[];
};
 
InternalNewJob: INTERNAL PROC RETURNS [job: JobRef] = INLINE {
  --Returns a single job cell: recycled from freeList when possible, freshly allocated otherwise
  --A recycled cell keeps its old field values except data; callers overwrite every field
  IF freeList#NIL THEN {
    job ¬ freeList;
    freeList ¬ freeList.rest;
    job.rest ¬ NIL; --unlink the cell from the free list
  }
  ELSE job ¬ LIST[[]];
};
 
InternalFreeJob: INTERNAL PROC [job: JobRef] = INLINE {
  --Puts the job cell in front of freeList for later reuse by InternalNewJob
  --WARNING: job MUST NOT be in any list since rest field used
  job.rest ¬ freeList;
  freeList ¬ job;
  job.first.data ¬ NIL; --drop the reference to the client data
};
 
Register: PUBLIC PROC [ms: INT, proc: PROC[REF], data: REF, priority: CARD32] = {
  --Enters proc and data as a periodic job into the time level which FindTime selects for ms
  checked: PROC[REF] ¬ proc; --trap here if local proc
  Install: ENTRY PROC [period: INT, action: PROC[REF], clientData: REF, prio: CARD32] = {
    level: TimeRef ¬ FindTime[period];
    cell: JobRef ¬ InternalNewJob[];
    cell.first ¬ [proc: action, data: clientData, priority: prio, isForked: joined, periodic: TRUE];
    --push in front of the periodic jobs of this level
    cell.rest ¬ level.periodicJobs;
    level.periodicJobs ¬ cell;
  };
  Install[ms, checked, data, priority];
};
 
ForkSoon: PUBLIC PROC [ms: INT, proc: PROC[REF], data: REF, priority: CARD32] = {
  --Arranges for one call of proc with data
  --ms larger than forkImmediateTime: the job waits in the nextTime list of the level selected by FindTime
  --otherwise: the job goes into the serialize buffer right away
  checked: PROC[REF] ¬ proc; --trap here if local proc
  Postpone: ENTRY PROC [delay: INT, action: PROC[REF], clientData: REF] = {
    level: TimeRef ¬ FindTime[delay];
    cell: JobRef ¬ InternalNewJob[];
    cell.first ¬ [proc: action, data: clientData, priority: priority, isForked: joined, periodic: FALSE];
    cell.rest ¬ level.nextTime;
    level.nextTime ¬ cell;
  };
  IF ms>forkImmediateTime THEN Postpone[ms, checked, data]
  ELSE {
    cell: JobRef ¬ NewJob[];
    cell.first ¬ [proc: proc, data: data, priority: priority, isForked: joined, periodic: FALSE];
    cell.rest ¬ NIL;
    Enqueue[cell];
    --a negative ms always forks a process; otherwise only when the hint counts no available process
    IF ms<0 OR pCountHint<=0 THEN FastFork.Fork[Envelope, NIL, internalPriority];
    pCountHint ¬ pCountHint-1;
  };
};
 
FindTime: INTERNAL PROC [ms: INT] RETURNS [ref: TimeRef ¬ timeTop] = INLINE {
  --Returns the time level for ms: one step along lessFrequent for each halving of ms needed to get down to minimumTime
  --Levels which do not exist yet are created on the way
  UNTIL ms<=minimumTime DO
    IF ref.lessFrequent=NIL THEN ref.lessFrequent ¬ NEW[TimeRec];
    ref ¬ ref.lessFrequent;
    ms ¬ ms/2;
  ENDLOOP;
};
 
--Does nothing; Unregister stores it as the proc of removed jobs so that an already buffered job becomes harmless
NoOp: PROC [REF] = {};
Unregister: PUBLIC ENTRY PROC [proc: PROC[REF], data: REF] = {
  --Removes every periodic job with this proc and this data from all time levels
  ENABLE UNWIND => NULL;
  Purge: INTERNAL PROC [level: TimeRef] = {
    prev: JobList ¬ NIL; --last cell kept so far; NIL while still at the head of the list
    cur: JobList ¬ level.periodicJobs;
    UNTIL cur=NIL DO
      next: JobList ¬ cur.rest;
      IF cur.first.proc=proc AND cur.first.data=data THEN {
        --don't worry about calling InternalFreeJob; its tricky and occurs rarely
        cur.first.proc ¬ NoOp; --so won't be called even if already in delay buffer
        IF prev=NIL THEN level.periodicJobs ¬ next ELSE prev.rest ¬ next;
      }
      ELSE prev ¬ cur;
      cur ¬ next;
    ENDLOOP;
  };
  level: TimeRef ¬ timeTop;
  WHILE level#NIL DO
    Purge[level];
    level ¬ level.lessFrequent;
  ENDLOOP;
};
 
--Raised by UnregisterSelf; Envelope catches it around the call of a job, unregisters that job and resumes
unregisterSelf: SIGNAL = CODE;
UnregisterSelf: 
PUBLIC PROC [] = {
--Raises unregisterSelf; within this module only the handler in Envelope catches it
SIGNAL unregisterSelf
};
 
Spread the time of forking: 
1) to increase the probability of reusing a PROCESS resource 
2) to balance the load 
 
Two buffers are used:
delayBuffer: delays jobs belonging to multiples of the minor time interval 
(so the calls of a large interval are spread over multiple minor intervals)
 
serializeBuffer: serializes the calls within a minor interval 
(usually will be emptied before the next minor interval)
 
 
access to "serialize" variables must be monitored
--Ring buffer of jobs waiting to be executed by an Envelope process
serializeNum: INT = 64; --capacity
serializeYoungest: [0..serializeNum) ¬ 0; --slot of the most recently enqueued job
serializeCnt: INT ¬ 0; --number of jobs currently waiting
serializeBuffer: REF ARRAY [0..serializeNum) OF JobRef ¬ NEW[ARRAY [0..serializeNum) OF JobRef ¬ ALL[NIL]];
access to "delay" variables is not monitored; only one process (PeriodicalProcess) uses them
--Ring buffer of jobs from the less frequent time levels, forwarded in portions by ForwardDelayed
delayNum: INT = 128; --capacity
delayYoungest: [0..delayNum) ¬ 0; --slot of the most recently delayed job
delayCnt: INT ¬ 0; --number of jobs currently held back
delayBuffer: REF ARRAY [0..delayNum) OF JobRef ¬ NEW[ARRAY [0..delayNum) OF JobRef ¬ ALL[NIL]];
--ForkSoon forks a new Envelope process when this hint is not positive
pCountHint: INT ¬ 0; --not exact since don't know who waits for conditions.  Not monitored but periodically reset.
--Amount which one Envelope process adds to pCountHint when it starts
countForOne: INT = 5;
ForwardDelayed: PROC [n: INT ¬ 1] = {
  --moves "n" jobs from delay buffer into serialize buffer, oldest first; stops early when the delay buffer runs empty
  UNTIL delayCnt<=0 OR n<=0 DO
    oldest: JobRef;
    delayCnt ¬ delayCnt-1;
    n ¬ n-1;
    --with the count already decremented this index designates the oldest job
    oldest ¬ delayBuffer[(delayYoungest+delayNum-delayCnt) MOD delayNum];
    Enqueue[oldest];
  ENDLOOP;
};
 
--Priority at which Envelope processes are forked and to which they return after each job
internalPriority: Process.Priority ~ Process.priorityClient3;
Envelope: 
PROC [data: 
REF] = {
--Body of a worker process; the data parameter is not used
--execute jobs as long as some are available in serialize buffer
job: JobRef ¬ NIL;
--announce this process in the hint which ForkSoon consults before forking
pCountHint ¬ pCountHint+countForOne;
DO
--passing the previous job lets Dequeue recycle it if it is not periodic
job ¬ Dequeue[job];
--buffer empty: withdraw the contribution to the hint, never going below 0, and finish
IF job=NIL THEN {pCountHint ¬ MAX[pCountHint-countForOne, 0]; RETURN};
IF job.first.priority#internalPriority THEN Process.SetPriority[job.first.priority];
--NOTE(review): CONTINUE inside an UNWIND catch is unusual; confirm whether ABORTED was intended here
--unregisterSelf: remove the running job from the periodic lists, then let the job carry on
--UNCAUGHT: adjusts the hint only; the handler neither continues nor resumes
job.first.proc[job.first.data ! 
UNWIND => {pCountHint ¬ pCountHint+countForOne; CONTINUE};
unregisterSelf => {Unregister[job.first.proc, job.first.data]; RESUME};
RuntimeError.UNCAUGHT => {pCountHint ¬ pCountHint-countForOne};
];
IF job.first.priority#internalPriority THEN Process.SetPriority[internalPriority];
--allow SweepThisPeriod to enqueue this job again
job.first.isForked ¬ joined;
ENDLOOP
 
};
 
Dequeue: ENTRY PROC [last: JobRef] RETURNS [job: JobRef ¬ NIL] = INLINE {
  --Returns next job to be executed out of serialize buffer; NIL if the buffer is empty
  --last: may be used to free last job [ughh: this saves a monitor entry/exit]
  IF last#NIL AND ~last.first.periodic THEN InternalFreeJob[last];
  IF serializeCnt<=0 THEN RETURN; --empty
  serializeCnt ¬ serializeCnt-1;
  --with the count already decremented this index designates the oldest job
  job ¬ serializeBuffer[(serializeYoungest+serializeNum-serializeCnt) MOD serializeNum];
};
 
EntryEnqueue: ENTRY PROC [job: JobRef] RETURNS [success: BOOL] = INLINE {
  --enqueues job into serialize buffer if space available; success tells whether it was enqueued
  success ¬ serializeCnt<serializeNum; --FALSE when full
  IF success THEN {
    serializeYoungest ¬ (serializeYoungest+1) MOD serializeNum;
    serializeBuffer[serializeYoungest] ¬ job;
    serializeCnt ¬ serializeCnt+1;
    job.first.isForked ¬ enqueued;
  };
};
 
EnqueueHard: PROC [job: JobRef] = {
  --enqueues job into serialize buffer; insists even if spinning necessary
  --the pause doubles after every failed attempt
  wait: INT ¬ 1;
  DO
    --Fork one more process to empty buffer
    FastFork.Fork[Envelope, NIL, internalPriority];
    Process.Pause[wait]; --give it a chance to do some work
    IF EntryEnqueue[job].success THEN EXIT;
    --It's ok to loop and fork more processes: buffer is full
    wait ¬ wait*2;
  ENDLOOP;
};
 
Enqueue: PROC [job: JobRef] = INLINE {
  --enqueues job into serialize buffer; falls back to EnqueueHard when the buffer is full
  IF EntryEnqueue[job].success THEN RETURN;
  EnqueueHard[job];
};
 
Delay: PROC [job: JobRef] = INLINE {
  --enqueues job into delay buffer
  --a full buffer is relieved first by forwarding 3 jobs to the serialize buffer
  IF ~(delayCnt<delayNum) THEN ForwardDelayed[3];
  delayCnt ¬ delayCnt+1;
  delayYoungest ¬ (delayYoungest+1) MOD delayNum;
  delayBuffer[delayYoungest] ¬ job;
};
 
GetTheDelayedForks: ENTRY PROC [time: TimeRef] RETURNS [jobs: JobList] = INLINE {
  --Returns the one shot jobs which are due now and promotes the recently entered ones
  --A job entered by ForkSoon is therefore handed out by the second call following its entry
  upcoming: JobList ¬ time.nextTime;
  jobs ¬ time.thisTime;
  time.nextTime ¬ NIL;
  time.thisTime ¬ upcoming;
};
 
SweepThisPeriod: PROC [time: TimeRef] = INLINE {
  --Hands all jobs of this time level which are due to a buffer:
  --the serialize buffer for the top level, the delay buffer for every other level

  --periodic forks
  -- not monitored; all what can happen is missing the first turn
  FOR jobs: JobList ¬ time.periodicJobs, jobs.rest WHILE jobs#NIL DO
    --a job marked enqueued has not yet finished its previous turn and is skipped
    IF jobs.first.isForked=joined THEN {
      IF time=timeTop THEN Enqueue[jobs] ELSE Delay[jobs]
    };
  ENDLOOP;

  --delayed forks
  -- monitored; we can't afford missing the only turn
  IF time.thisTime#NIL OR time.nextTime#NIL THEN {
    jobs: JobList ¬ GetTheDelayedForks[time];
    WHILE jobs#NIL DO
      next: JobList ¬ jobs.rest; --save the rest field before possible freeing of first job clobbers...
      --detach the cell so that the job handed on is a single element
      --written with the same assignment arrow as every other assignment of this file
      jobs.rest ¬ NIL;
      IF time=timeTop THEN Enqueue[jobs] ELSE Delay[jobs];
      jobs ¬ next
    ENDLOOP;
  };
};
 
SweepThisAndLargerPeriods: PROC [time: TimeRef] = INLINE {
  --Sweeps the given level and continues with the less frequent levels
  --Each level flips its toggle per sweep and passes control on only when the toggle became TRUE,
  --so every level is swept half as often as the level before it
  FOR level: TimeRef ¬ time, level.lessFrequent WHILE level#NIL DO
    SweepThisPeriod[level];
    level.toggle ¬ ~level.toggle;
    IF ~level.toggle THEN RETURN;
  ENDLOOP;
};
 
PeriodicalProcess: PROC [data: REF] = {
  --Body of the single scheduling process; loops forever, one sweep per minimumTicks pause
  --The data parameter is not used
  DO
    SweepThisAndLargerPeriods[timeTop];
    --forward roughly a quarter of the delayed jobs, at least one
    IF delayCnt>0 THEN ForwardDelayed[delayCnt/4+1];
    --old processes which are still alive will probably not help
    --freshly created processes because of overflow need not be considered; we want more parallelism in this case
    pCountHint ¬ 0;
    IF serializeCnt>0 THEN FastFork.Fork[Envelope, NIL, internalPriority];
    Process.Pause[minimumTicks];
  ENDLOOP;
};
 
--Module start code: launches the one and only PeriodicalProcess
FastFork.Fork[PeriodicalProcess, NIL, Process.priorityForeground];
END.