-- PeriodicalForkImpl.mesa
-- Copyright © 1989, 1991, 1992 by Xerox Corporation. All rights reserved.
-- Created by Christian Jacobi, March 1, 1989 5:17:26 pm PST
-- Christian Jacobi, March 5, 1992 11:42 am PST
DIRECTORY DelayedFork, FastFork, PeriodicalFork, Process, RuntimeError;
PeriodicalForkImpl: CEDAR MONITOR
IMPORTS FastFork, Process, RuntimeError
EXPORTS DelayedFork, PeriodicalFork ~
BEGIN
--ForkSoon enqueues a request right away when its delay in milliseconds does not exceed this value
forkImmediateTime: INT = 32;
--Length in milliseconds of the shortest period; FindTime halves a requested time until it no longer exceeds this value
minimumTime: INT = 64;
--minimumTime converted to ticks; PeriodicalProcess pauses this long between sweeps
minimumTicks: Process.Ticks = Process.MsecToTicks[minimumTime];
--Head of the chain of period records; records further down the lessFrequent chain are swept half as often as their predecessor
timeTop: TimeRef ¬ NEW[TimeRec];
TimeRef: TYPE = REF TimeRec;
--One record per period length; the records form a chain through lessFrequent starting at timeTop
TimeRec: TYPE = RECORD [
toggle: BOOL ¬ FALSE, --not monitored, there is only 1 PeriodicalProcess
--toggle is flipped on each sweep of this record; the next record is swept only when the flip yields TRUE
lessFrequent: TimeRef ¬ NIL, --record for the next longer period; created on demand by FindTime
periodicJobs: JobList ¬ NIL, --jobs entered by Register; offered for execution on every sweep of this record
thisTime: JobList ¬ NIL, --one-shot jobs handed out by the next call of GetTheDelayedForks
nextTime: JobList ¬ NIL --one-shot jobs entered by ForkSoon; becomes thisTime at the next GetTheDelayedForks
];
JobRef: TYPE = LIST OF JobRec;
--understanding is: only first list element may be used and constitutes job
JobList: TYPE = LIST OF JobRec;
--understanding is: all elements may be used
--Description of one call to be made: proc is called with data
JobRec: TYPE = RECORD [
proc: PROC[REF] ¬ NIL, --procedure to call; replaced by NoOp when the job is unregistered
data: REF ¬ NIL, --argument passed to proc
priority: CARD32 ¬ 0, --Envelope switches to this priority while calling proc
isForked: ForkState ¬ joined, --prevent forking more than one process
periodic: BOOL ¬ FALSE --TRUE for jobs entered by Register; such jobs are not recycled by Dequeue
];
--joined: job is not in the serialize buffer; enqueued: EntryEnqueue has put it there and Envelope has not finished it yet
ForkState: TYPE = {joined, enqueued};
--List of recycled job cells; filled by InternalFreeJob, drained by InternalNewJob
freeList: JobList ¬ NIL;
--Monitored wrapper around InternalNewJob for callers that do not hold the monitor lock
--Returns a job cell which is not linked into any list
NewJob: ENTRY PROC RETURNS [JobRef] = INLINE {
RETURN [InternalNewJob[]];
};
InternalNewJob: INTERNAL PROC RETURNS [job: JobRef] = INLINE {
  --Hands out a single job cell whose rest field is NIL.
  --A cell from freeList is reused when one is available; it still carries the
  --field values of its previous use, so callers must set every field they rely on.
  --Otherwise a fresh cell with default field values is allocated.
  IF freeList#NIL
    THEN {
      job ¬ freeList;
      freeList ¬ freeList.rest;
      job.rest ¬ NIL;
      }
    ELSE job ¬ LIST[[]];
  };
InternalFreeJob: INTERNAL PROC [job: JobRef] = INLINE {
  --Puts a job cell at the head of freeList for later reuse by InternalNewJob.
  --WARNING: job MUST NOT be in any list since its rest field is overwritten here.
  job.rest ¬ freeList;
  freeList ¬ job;
  job.first.data ¬ NIL; --presumably so that a recycled cell does not keep client data reachable
  };
Register: PUBLIC PROC [ms: INT, proc: PROC[REF], data: REF, priority: CARD32] = {
  --Enters proc as a periodic job: it will be offered for execution with argument data
  --on every sweep of the period record selected by ms.
  --priority is the process priority used while proc runs.
  p: PROC[REF] ¬ proc; --trap here if local proc
  LinkPeriodic: ENTRY PROC [ms: INT, proc: PROC[REF], data: REF, priority: CARD32] = {
    --Monitored part: builds the job cell and pushes it onto the front of the periodic list
    slot: TimeRef ¬ FindTime[ms];
    cell: JobRef ¬ InternalNewJob[];
    cell.first ¬ [proc: proc, data: data, priority: priority, isForked: joined, periodic: TRUE];
    cell.rest ¬ slot.periodicJobs;
    slot.periodicJobs ¬ cell;
    };
  LinkPeriodic[ms, p, data, priority];
  };
ForkSoon: PUBLIC PROC [ms: INT, proc: PROC[REF], data: REF, priority: CARD32] = {
  --Arranges for a single call proc[data] at the given priority.
  --Delays above forkImmediateTime are parked on the nextTime list of the period record selected by ms.
  --Shorter delays go straight into the serialize buffer.
  p: PROC[REF] ¬ proc; --trap here if local proc
  ScheduleLater: ENTRY PROC [ms: INT, proc: PROC[REF], data: REF] = {
    --Monitored part of the delayed case; priority is taken from the enclosing procedure
    slot: TimeRef ¬ FindTime[ms];
    cell: JobRef ¬ InternalNewJob[];
    cell.first ¬ [proc: proc, data: data, priority: priority, isForked: joined, periodic: FALSE];
    cell.rest ¬ slot.nextTime;
    slot.nextTime ¬ cell;
    };
  IF ms>forkImmediateTime
    THEN ScheduleLater[ms, p, data]
    ELSE {
      cell: JobRef ¬ NewJob[];
      cell.first ¬ [proc: proc, data: data, priority: priority, isForked: joined, periodic: FALSE];
      cell.rest ¬ NIL;
      Enqueue[cell];
      --fork a worker when the caller insists (negative ms) or when the hint says none has capacity left
      IF ms<0 OR pCountHint<=0 THEN FastFork.Fork[Envelope, NIL, internalPriority];
      pCountHint ¬ pCountHint-1; --unmonitored; the hint is reset periodically by PeriodicalProcess
      };
  };
FindTime: INTERNAL PROC [ms: INT] RETURNS [ref: TimeRef ¬ timeTop] = INLINE {
  --Finds the period record for a time of ms milliseconds.
  --Starts at timeTop and moves one record down the lessFrequent chain for every
  --halving of ms needed to bring it down to minimumTime; missing records are created.
  UNTIL ms<=minimumTime DO
    IF ref.lessFrequent=NIL THEN ref.lessFrequent ¬ NEW[TimeRec];
    ref ¬ ref.lessFrequent;
    ms ¬ ms/2;
    ENDLOOP;
  };
--Does nothing; Unregister stores it into the proc field of removed jobs so that a pending call becomes harmless
NoOp: PROC [REF] = {};
Unregister: PUBLIC ENTRY PROC [proc: PROC[REF], data: REF] = {
  --Removes every periodic job whose proc and data both equal the arguments, on all period records.
  ENABLE UNWIND => NULL;
  PurgeLevel: INTERNAL PROC [time: TimeRef] = {
    --Unlinks the matching cells from the periodic list of one period record
    prev: JobList ¬ NIL;
    cur: JobList ¬ time.periodicJobs;
    UNTIL cur=NIL DO
      next: JobList ¬ cur.rest;
      IF cur.first.proc=proc AND cur.first.data=data
        THEN {
          --don't worry about calling InternalFreeJob; its tricky and occurs rarely
          cur.first.proc ¬ NoOp; --so won't be called even if already in delay buffer
          --the rest field of the removed cell is left untouched
          IF prev=NIL THEN time.periodicJobs ¬ next ELSE prev.rest ¬ next;
          }
        ELSE prev ¬ cur;
      cur ¬ next;
      ENDLOOP;
    };
  level: TimeRef ¬ timeTop;
  UNTIL level=NIL DO
    PurgeLevel[level];
    level ¬ level.lessFrequent;
    ENDLOOP;
  };
--Signal raised by UnregisterSelf; Envelope catches it around the call of a job, unregisters that job and resumes
unregisterSelf: SIGNAL = CODE;
--Raises unregisterSelf
--NOTE(review): presumably meant to be called from within a registered proc so that the job removes itself; confirm against the interface
UnregisterSelf: PUBLIC PROC [] = {
SIGNAL unregisterSelf
};
-- Spreading the time of forking serves two purposes:
--   1) to increase the probability of reusing PROCESS resources
--   2) to balance load
-- Two buffers are used:
--   delayBuffer: delays jobs by multiples of the minor time interval
--     (so jobs with large intervals are spread over several minor intervals)
--   serializeBuffer: serializes calls within a minor interval
--     (usually it will be emptied before the next minor interval)
-- Access to the "serialize" variables must be monitored.
--Capacity of the serialize buffer
serializeNum: INT = 64;
--Index of the most recently filled slot of serializeBuffer
serializeYoungest: [0..serializeNum) ¬ 0;
--Number of jobs currently waiting in serializeBuffer
serializeCnt: INT ¬ 0;
--Ring buffer of jobs waiting to be executed; filled by EntryEnqueue, emptied oldest first by Dequeue
serializeBuffer: REF ARRAY [0..serializeNum) OF JobRef ¬
NEW[ARRAY [0..serializeNum) OF JobRef ¬ ALL[NIL]];
-- Access to the "delay" variables is not monitored; only one process uses them.
--Capacity of the delay buffer
delayNum: INT = 128;
--Index of the most recently filled slot of delayBuffer
delayYoungest: [0..delayNum) ¬ 0;
--Number of jobs currently waiting in delayBuffer
delayCnt: INT ¬ 0;
--Ring buffer of jobs waiting to be moved into the serialize buffer; filled by Delay, emptied oldest first by ForwardDelayed
delayBuffer: REF ARRAY [0..delayNum) OF JobRef ¬
NEW[ARRAY [0..delayNum) OF JobRef ¬ ALL[NIL]];
--Hint used by ForkSoon: when it is not positive a new Envelope process is forked
pCountHint: INT ¬ 0; --not exact since don't know who waits for conditions. Not monitored but periodically reset.
--Amount Envelope adds to pCountHint when it starts; ForkSoon subtracts 1 per immediate job
countForOne: INT = 5;
ForwardDelayed: PROC [n: INT ¬ 1] = {
  --Transfers up to n jobs, oldest first, from the delay buffer to the serialize buffer.
  --Stops early when the delay buffer runs empty.
  UNTIL delayCnt<=0 OR n<=0 DO
    n ¬ n-1;
    delayCnt ¬ delayCnt-1;
    --with the count already decremented, the oldest entry sits delayCnt slots behind the youngest
    Enqueue[delayBuffer[(delayYoungest+delayNum-delayCnt) MOD delayNum]];
    ENDLOOP;
  };
--Priority at which Envelope processes are forked and to which they return after each job
internalPriority: Process.Priority ~ Process.priorityClient3;
--Body of a worker process: runs the jobs found in the serialize buffer and returns when the buffer is empty
--The data argument is not used
Envelope: PROC [data: REF] = {
--execute jobs as long as some are available in serialize buffer
job: JobRef ¬ NIL;
pCountHint ¬ pCountHint+countForOne; --credit this worker in the hint
DO
job ¬ Dequeue[job]; --also hands back the previous job so that Dequeue can recycle it if it is not periodic
IF job=NIL THEN {pCountHint ¬ MAX[pCountHint-countForOne, 0]; RETURN}; --buffer empty: withdraw the credit, but never go below 0
IF job.first.priority#internalPriority THEN Process.SetPriority[job.first.priority];
job.first.proc[job.first.data !
UNWIND => {pCountHint ¬ pCountHint+countForOne; CONTINUE}; --NOTE(review): presumably gives back the credit taken by the UNCAUGHT arm below; confirm
unregisterSelf => {Unregister[job.first.proc, job.first.data]; RESUME}; --the job asked to be removed; remove it and let it carry on
RuntimeError.UNCAUGHT => {pCountHint ¬ pCountHint-countForOne}; --NOTE(review): presumably this worker is no longer counted as available while the error is outstanding; confirm
];
IF job.first.priority#internalPriority THEN Process.SetPriority[internalPriority];
job.first.isForked ¬ joined; --SweepThisPeriod offers periodic jobs again only when they are in this state
ENDLOOP
};
Dequeue: ENTRY PROC [last: JobRef] RETURNS [job: JobRef ¬ NIL] = INLINE {
  --Takes the oldest job out of the serialize buffer; the result is NIL when the buffer is empty.
  --last: a job the caller has finished (may be NIL); it is recycled here unless it is periodic.
  --Doing the recycling in this procedure saves a separate monitor entry and exit.
  IF last#NIL AND ~last.first.periodic THEN InternalFreeJob[last];
  IF serializeCnt<=0 THEN RETURN; --nothing waiting
  serializeCnt ¬ serializeCnt-1;
  --with the count already decremented, the oldest entry sits serializeCnt slots behind the youngest
  job ¬ serializeBuffer[(serializeYoungest+serializeNum-serializeCnt) MOD serializeNum];
  };
EntryEnqueue: ENTRY PROC [job: JobRef] RETURNS [success: BOOL] = INLINE {
  --Tries to append job to the serialize buffer.
  --Returns FALSE and changes nothing when the buffer is full.
  --Otherwise stores the job, marks it enqueued and returns TRUE.
  IF serializeCnt<serializeNum THEN {
    job.first.isForked ¬ enqueued;
    serializeYoungest ¬ (serializeYoungest+1) MOD serializeNum;
    serializeBuffer[serializeYoungest] ¬ job;
    serializeCnt ¬ serializeCnt+1;
    RETURN [TRUE];
    };
  RETURN [FALSE];
  };
EnqueueHard: PROC [job: JobRef] = {
  --Appends job to the serialize buffer and does not return before that has succeeded.
  --Each round forks one more worker to drain the buffer, pauses to let it work,
  --then retries; the pause is doubled after every failed round.
  FOR wait: INT ¬ 1, wait*2 DO
    --It's ok to fork more processes on every round: buffer is full
    FastFork.Fork[Envelope, NIL, internalPriority];
    Process.Pause[wait]; --give it a chance to do some work
    IF EntryEnqueue[job].success THEN EXIT;
    ENDLOOP;
  };
Enqueue: PROC [job: JobRef] = INLINE {
  --Appends job to the serialize buffer.
  --The cheap monitored attempt is made first; EnqueueHard takes over only when the buffer is full.
  IF EntryEnqueue[job].success THEN RETURN;
  EnqueueHard[job];
  };
Delay: PROC [job: JobRef] = INLINE {
  --Appends job to the delay buffer.
  --When the buffer is full, up to 3 of its oldest entries are first moved on to the serialize buffer.
  IF delayCnt>=delayNum THEN ForwardDelayed[n: 3];
  delayCnt ¬ delayCnt+1;
  delayYoungest ¬ (delayYoungest+1) MOD delayNum;
  delayBuffer[delayYoungest] ¬ job;
  };
--Advances the one-shot lists of a period record by one step, under the monitor lock
--Returns the list that was in thisTime; nextTime becomes the new thisTime and nextTime is emptied
--So a job entered by ForkSoon into nextTime is returned by the second call after its entry
GetTheDelayedForks: ENTRY PROC [time: TimeRef] RETURNS [jobs: JobList] = INLINE {
jobs ¬ time.thisTime;
time.thisTime ¬ time.nextTime;
time.nextTime ¬ NIL;
};
SweepThisPeriod: PROC [time: TimeRef] = INLINE {
  --Offers all jobs of one period record for execution.
  --Jobs of timeTop go directly into the serialize buffer; jobs of longer periods
  --go through the delay buffer, which spreads them over several minor intervals.
  --time: the period record to sweep; must not be NIL.

  --periodic forks
  -- not monitored; all what can happen is missing the first turn
  FOR jobs: JobList ¬ time.periodicJobs, jobs.rest WHILE jobs#NIL DO
    --skip jobs which are still marked enqueued from an earlier sweep
    IF jobs.first.isForked=joined THEN {
      IF time=timeTop THEN Enqueue[jobs] ELSE Delay[jobs]
      };
    ENDLOOP;
  --delayed forks
  -- monitored; we can't afford missing the only turn
  IF time.thisTime#NIL OR time.nextTime#NIL THEN {
    jobs: JobList ¬ GetTheDelayedForks[time];
    WHILE jobs#NIL DO
      next: JobList ¬ jobs.rest; --save the rest field before possible freeing of first job clobbers...
      jobs.rest ¬ NIL; --detach the cell: a one-shot job is a single cell and gets recycled after it has run
      IF time=timeTop THEN Enqueue[jobs] ELSE Delay[jobs];
      jobs ¬ next
      ENDLOOP;
    };
  };
SweepThisAndLargerPeriods: PROC [time: TimeRef] = INLINE {
  --Sweeps the given period record and then, on every second visit of a record,
  --goes on to the next record of the lessFrequent chain.
  --This way each record is swept half as often as the one before it.
  WHILE time#NIL DO
    SweepThisPeriod[time];
    time.toggle ¬ ~time.toggle;
    IF ~time.toggle THEN RETURN; --flip gave FALSE: longer periods are not due on this visit
    time ¬ time.lessFrequent; --flip gave TRUE: continue with the doubled period
    ENDLOOP;
  };
--Body of the single scheduling process; loops forever, one round per minimumTicks pause
--The data argument is not used
PeriodicalProcess: PROC [data: REF] = {
DO
SweepThisAndLargerPeriods[timeTop]; --offer the jobs of all periods that are due in this round
IF delayCnt>0 THEN ForwardDelayed[delayCnt/4+1]; --move about a quarter of the delayed jobs, at least one, into the serialize buffer
pCountHint ¬ 0;
--old processes which are still alive will probably not help
--freshly created processes because of overflow need not be considered; we want more parallelism in this case
IF serializeCnt>0 THEN FastFork.Fork[Envelope, NIL, internalPriority]; --serializeCnt is read here without the monitor lock
Process.Pause[minimumTicks];
ENDLOOP
};
--Module start: fork the scheduling process
FastFork.Fork[PeriodicalProcess, NIL, Process.priorityForeground];
END.