<<>>
<<PeriodicalForkImpl.mesa>>
<<NOTE(review): the original header comment lines (copyright, author, edit history) were lost>>
<<in text extraction; restore them from the repository copy.>>
<<>>
<<Implements periodically repeated calls (PeriodicalFork) and one-shot delayed calls (DelayedFork).>>
<<One background process wakes every minimumTime msec, sweeps the registered jobs and hands them>>
<<to a small pool of worker processes (Envelope) through a ring buffer.>>
<<>>

DIRECTORY
	DelayedFork, FastFork, PeriodicalFork, Process, RuntimeError;

PeriodicalForkImpl: CEDAR MONITOR
IMPORTS FastFork, Process, RuntimeError
EXPORTS DelayedFork, PeriodicalFork
~ BEGIN

-- ForkSoon requests with a delay of at most this many msec bypass the time lists.
forkImmediateTime: INT = 32;
-- Period of the most frequent sweep, in msec (converted to ticks right below).
minimumTime: INT = 64;
minimumTicks: Process.Ticks = Process.MsecToTicks[minimumTime];

-- Head of the chain of time levels; this level is swept on every wakeup of PeriodicalProcess.
timeTop: TimeRef ¬ NEW[TimeRec];

TimeRef: TYPE = REF TimeRec;
TimeRec: TYPE = RECORD [
	toggle: BOOL ¬ FALSE, --not monitored, there is only 1 PeriodicalProcess
	lessFrequent: TimeRef ¬ NIL, -- next level; swept on every second sweep of this level
	periodicJobs: JobList ¬ NIL, -- jobs registered with Register
	thisTime: JobList ¬ NIL, -- one-shot jobs handed out by the next sweep of this level
	nextTime: JobList ¬ NIL -- one-shot jobs handed out by the sweep after that
	];

JobRef: TYPE = LIST OF JobRec;
<<--understanding is: only first list element may be used and constitutes job>>
JobList: TYPE = LIST OF JobRec;
<<--understanding is: all elements may be used>>
JobRec: TYPE = RECORD [
	proc: PROC[REF] ¬ NIL,
	data: REF ¬ NIL,
	priority: CARD32 ¬ 0,
	isForked: ForkState ¬ joined, --prevent forking more than one process
	periodic: BOOL ¬ FALSE
	];
ForkState: TYPE = {joined, enqueued};

-- Recycled list cells for one-shot jobs; only touched from ENTRY or INTERNAL procedures.
freeList: JobList ¬ NIL;

NewJob: ENTRY PROC RETURNS [JobRef] = INLINE {
	<<--monitored wrapper around InternalNewJob>>
	RETURN [InternalNewJob[]];
	};

InternalNewJob: INTERNAL PROC RETURNS [job: JobRef] = INLINE {
	<<--returns a single-element job cell, taken from freeList when possible>>
	IF freeList=NIL
		THEN job ¬ LIST[[]]
		ELSE {
			job ¬ freeList;
			freeList ¬ job.rest;
			job.rest ¬ NIL;
			};
	};

InternalFreeJob: INTERNAL PROC [job: JobRef] = INLINE {
	<<--WARNING: job MUST NOT be in any list since rest field used>>
	job.first.data ¬ NIL; -- drop the reference to client data
	job.rest ¬ freeList;
	freeList ¬ job
	};

Register: PUBLIC PROC [ms: INT, proc: PROC[REF], data: REF, priority: CARD32] = {
	<<--registers proc[data] as a periodic job on the time level selected by FindTime[ms]>>
	p: PROC[REF] ¬ proc; --trap here if local proc
	EntryRegister: ENTRY PROC [ms: INT, proc: PROC[REF], data: REF, priority: CARD32] = {
		ref: TimeRef ¬ FindTime[ms];
		job: JobRef ¬ InternalNewJob[];
		job.first.isForked ¬ joined;
		job.first.proc ¬ proc;
		job.first.data ¬ data;
		job.first.priority ¬ priority;
		job.first.periodic ¬ TRUE;
		-- push onto the front of the periodic list of that level
		job.rest ¬ ref.periodicJobs;
		ref.periodicJobs ¬ job;
		};
	EntryRegister[ms, p, data, priority];
	};

ForkSoon: PUBLIC PROC [ms: INT, proc: PROC[REF], data: REF, priority: CARD32] = {
	<<--one-shot call of proc[data]: right away if ms<=forkImmediateTime, otherwise>>
	<<--through the nextTime list of the time level selected by FindTime[ms]>>
	p: PROC[REF] ¬ proc; --trap here if local proc
	EntryDelayedFork: ENTRY PROC [ms: INT, proc: PROC[REF], data: REF] = {
		-- priority is taken from the enclosing ForkSoon
		ref: TimeRef ¬ FindTime[ms];
		job: JobRef ¬ InternalNewJob[];
		job.first.isForked ¬ joined;
		job.first.proc ¬ proc;
		job.first.data ¬ data;
		job.first.priority ¬ priority;
		job.first.periodic ¬ FALSE;
		job.rest ¬ ref.nextTime;
		ref.nextTime ¬ job;
		};
	IF ms<=forkImmediateTime
		THEN {
			job: JobRef ¬ NewJob[];
			job.first.isForked ¬ joined;
			job.first.proc ¬ proc;
			job.first.data ¬ data;
			job.first.priority ¬ priority;
			job.first.periodic ¬ FALSE;
			job.rest ¬ NIL;
			Enqueue[job];
			-- fork a worker when the caller insists (ms<0) or the hint says none is left
			IF ms<0 OR pCountHint<=0 THEN FastFork.Fork[Envelope, NIL, internalPriority];
			pCountHint ¬ pCountHint-1;
			}
		ELSE EntryDelayedFork[ms, p, data];
	};

-- NOTE(review): the source text between "WHILE minimumTime<ms" in FindTime and
-- "ENABLE UNWIND => NULL" in Unregister was lost in extraction. The loop body of FindTime,
-- the definition of NoOp and the header of Unregister below are reconstructed from how
-- they are used in the rest of this module. Verify all three against the repository copy.

FindTime: INTERNAL PROC [ms: INT] RETURNS [ref: TimeRef ¬ timeTop] = INLINE {
	<<--returns the time level used for a period of ms msec; creates missing levels>>
	-- Reconstructed: each level is swept half as often as the one before it
	-- (see SweepThisAndLargerPeriods), hence ms is halved per step. TODO confirm rounding.
	WHILE minimumTime<ms DO
		IF ref.lessFrequent=NIL THEN ref.lessFrequent ¬ NEW[TimeRec];
		ref ¬ ref.lessFrequent;
		ms ¬ ms/2;
		ENDLOOP;
	};

-- Reconstructed: replacement procedure stored into unregistered jobs by Unregister.
NoOp: PROC [data: REF] = {};

Unregister: PUBLIC ENTRY PROC [proc: PROC[REF], data: REF] = {
	<<--removes every periodic job matching both proc and data from all time levels>>
	-- Reconstructed header; parameters follow the call in Envelope and the use in Match.
	ENABLE UNWIND => NULL;
	Match: INTERNAL PROC [job: JobRef] RETURNS [BOOL] = {
		RETURN [job.first.proc=proc AND job.first.data=data]
		};
	UnregisterTime: INTERNAL PROC [time: TimeRef] = {
		lag: JobList ¬ NIL; -- last cell kept so far; NIL while still at the list head
		list: JobList ¬ time.periodicJobs;
		WHILE list#NIL DO
			IF Match[list]
				THEN {
					<<--don't worry about calling InternalFreeJob; its tricky and occurs rarely>>
					list.first.proc ¬ NoOp; --so won't be called even if already in delay buffer
					IF lag=NIL
						THEN {time.periodicJobs ¬ list.rest; list ¬ time.periodicJobs}
						ELSE {lag.rest ¬ list.rest; list ¬ lag.rest}
					}
				ELSE {lag ¬ list; list ¬ list.rest};
			ENDLOOP
		};
	FOR ref: TimeRef ¬ timeTop, ref.lessFrequent WHILE ref#NIL DO
		UnregisterTime[ref]
		ENDLOOP;
	};

-- Raised by UnregisterSelf; caught in Envelope, which unregisters the running job and resumes.
unregisterSelf: SIGNAL = CODE;

UnregisterSelf: PUBLIC PROC [] = {
	SIGNAL unregisterSelf
	};

<<NOTE(review): the lead-in lines of this comment block were lost in extraction; the unnumbered>>
<<lines are reconstructed wording. The numbered and parenthesized lines are original.>>
<<Jobs are not forked one process per job; they pass through buffers>>
<<1) to increase probability of reusing PROCESS resource >>
<<2) to balance load >>
<<Delay buffer: takes the jobs of the less frequent time levels>>
<<(so large intervals will spread the calls on multiple minor intervals)>>
<<Serialize buffer: takes the jobs which Envelope processes execute>>
<<(usually will be emptied before next minor interval)>>

-- Serialize buffer: ring buffer accessed through the ENTRY procedures Dequeue and EntryEnqueue.
serializeNum: INT = 64;
serializeYoungest: [0..serializeNum) ¬ 0; -- index of the most recently stored job
serializeCnt: INT ¬ 0; -- number of jobs currently stored
serializeBuffer: REF ARRAY [0..serializeNum) OF JobRef ¬ NEW[ARRAY [0..serializeNum) OF JobRef ¬ ALL[NIL]];

<<>>
-- Delay buffer: ring buffer accessed only through the non-ENTRY procedures Delay and ForwardDelayed.
delayNum: INT = 128;
delayYoungest: [0..delayNum) ¬ 0; -- index of the most recently stored job
delayCnt: INT ¬ 0; -- number of jobs currently stored
delayBuffer: REF ARRAY [0..delayNum) OF JobRef ¬ NEW[ARRAY [0..delayNum) OF JobRef ¬ ALL[NIL]];

pCountHint: INT ¬ 0; --not exact since don't know who waits for conditions. Not monitored but periodically reset.
countForOne: INT = 5; -- amount by which one Envelope process changes pCountHint

ForwardDelayed: PROC [n: INT ¬ 1] = {
	<<--moves "n" jobs from delay buffer into serialize buffer>>
	WHILE delayCnt>0 --not empty-- AND n>0 DO
		delayCnt ¬ delayCnt-1;
		n ¬ n - 1;
		BEGIN
		-- with delayCnt already decremented this indexes the oldest stored job
		job: JobRef ¬ delayBuffer[(delayYoungest+delayNum-delayCnt) MOD delayNum];
		Enqueue[job];
		END;
		ENDLOOP;
	};

-- Priority at which Envelope processes are forked and to which they return between jobs.
internalPriority: Process.Priority ~ Process.priorityClient3;

Envelope: PROC [data: REF] = {
	<<--execute jobs as long as some are available in serialize buffer>>
	job: JobRef ¬ NIL;
	pCountHint ¬ pCountHint+countForOne;
	DO
		-- Dequeue also frees the previous job if it was a one-shot job
		job ¬ Dequeue[job];
		IF job=NIL THEN {pCountHint ¬ MAX[pCountHint-countForOne, 0]; RETURN};
		IF job.first.priority#internalPriority THEN Process.SetPriority[job.first.priority];
		job.first.proc[job.first.data !
			UNWIND => {pCountHint ¬ pCountHint+countForOne; CONTINUE};
			unregisterSelf => {Unregister[job.first.proc, job.first.data]; RESUME};
			RuntimeError.UNCAUGHT => {pCountHint ¬ pCountHint-countForOne};
			];
		IF job.first.priority#internalPriority THEN Process.SetPriority[internalPriority];
		job.first.isForked ¬ joined; -- the job may be enqueued again from now on
		ENDLOOP
	};

Dequeue: ENTRY PROC [last: JobRef] RETURNS [job: JobRef ¬ NIL] = INLINE {
	<<--Returns next job to be executed out of serialize buffer>>
	<<--last: may be used to free last job [ughh: this saves a monitor entry/exit]>>
	IF last#NIL AND ~last.first.periodic THEN InternalFreeJob[last];
	IF serializeCnt>0 --not empty-- THEN {
		serializeCnt ¬ serializeCnt-1;
		-- with serializeCnt already decremented this indexes the oldest stored job
		job ¬ serializeBuffer[(serializeYoungest+serializeNum-serializeCnt) MOD serializeNum];
		};
	};

EntryEnqueue: ENTRY PROC [job: JobRef] RETURNS [success: BOOL] = INLINE {
	<<--enqueues job into serialize buffer if space available>>
	IF serializeCnt>=serializeNum --full-- THEN RETURN [FALSE];
	serializeYoungest ¬ (serializeYoungest+1) MOD serializeNum;
	serializeBuffer[serializeYoungest] ¬ job;
	serializeCnt ¬ serializeCnt+1;
	job.first.isForked ¬ enqueued;
	success ¬ TRUE;
	};

EnqueueHard: PROC [job: JobRef] = {
	<<--enqueues job into serialize buffer; insists even if spinning necessary>>
	pauseLength: INT ¬ 1; -- doubled after every failed attempt
	DO
		<<--Fork one more process to empty buffer>>
		FastFork.Fork[Envelope, NIL, internalPriority];
		Process.Pause[pauseLength]; --give it a chance to do some work
		IF EntryEnqueue[job].success THEN RETURN;
		pauseLength ¬ pauseLength*2;
		<<--It's ok to loop and fork more processes: buffer is full>>
		ENDLOOP;
	};

Enqueue: PROC [job: JobRef] = INLINE {
	<<--enqueues job into serialize buffer>>
	IF ~EntryEnqueue[job].success THEN EnqueueHard[job];
	};

Delay: PROC [job: JobRef] = INLINE {
	<<--enqueues job into delay buffer>>
	-- when full, make room first by forwarding up to 3 jobs to the serialize buffer
	IF delayCnt>=delayNum --full-- THEN ForwardDelayed[3];
	delayYoungest ¬ (delayYoungest+1) MOD delayNum;
	delayBuffer[delayYoungest] ¬ job;
	delayCnt ¬ delayCnt+1;
	};

GetTheDelayedForks: ENTRY PROC [time: TimeRef] RETURNS [jobs: JobList] = INLINE {
	<<--returns the thisTime list of the level and shifts nextTime into its place>>
	jobs ¬ time.thisTime;
	time.thisTime ¬ time.nextTime;
	time.nextTime ¬ NIL;
	};

SweepThisPeriod: PROC [time: TimeRef] = INLINE {
	<<--hands out the jobs of one time level: top level jobs go directly into the>>
	<<--serialize buffer, jobs of all other levels go through the delay buffer>>
	<<--periodic forks>>
	<<-- not monitored; all what can happen is missing the first turn>>
	FOR jobs: JobList ¬ time.periodicJobs, jobs.rest WHILE jobs#NIL DO
		IF jobs.first.isForked=joined THEN {
			IF time=timeTop THEN Enqueue[jobs] ELSE Delay[jobs]
			};
		ENDLOOP;
	<<--delayed forks>>
	<<-- monitored; we can't afford missing the only turn>>
	IF time.thisTime#NIL OR time.nextTime#NIL THEN {
		jobs: JobList ¬ GetTheDelayedForks[time];
		WHILE jobs#NIL DO
			next: JobList ¬ jobs.rest; --save the rest field before possible freeing of first job clobbers...
			jobs.rest ¬ NIL; -- detach the cell; InternalFreeJob reuses the rest field
			IF time=timeTop THEN Enqueue[jobs] ELSE Delay[jobs];
			jobs ¬ next
			ENDLOOP;
		};
	};

SweepThisAndLargerPeriods: PROC [time: TimeRef] = INLINE {
	<<--sweeps this level and, on every second call per level, continues with the next level>>
	WHILE time#NIL DO
		SweepThisPeriod[time];
		<<--recurse for double time increment>>
		-- the assignment flips the toggle; only a flip to TRUE continues to the next level
		IF time.toggle ¬ ~time.toggle THEN time ¬ time.lessFrequent ELSE RETURN;
		ENDLOOP;
	};

PeriodicalProcess: PROC [data: REF] = {
	<<--body of the single background process; loops forever with one sweep per minimumTicks>>
	DO
		SweepThisAndLargerPeriods[timeTop];
		-- drain roughly a quarter of the delay buffer per wakeup
		IF delayCnt>0 THEN ForwardDelayed[delayCnt/4+1];
		pCountHint ¬ 0;
		<<--old processes which are still alive will probably not help >>
		<<--freshly created processes because of overflow need not be considered; we want more parallelism in this case >>
		IF serializeCnt>0 THEN FastFork.Fork[Envelope, NIL, internalPriority];
		Process.Pause[minimumTicks];
		ENDLOOP
	};

-- Module start code: launch the one and only PeriodicalProcess.
FastFork.Fork[PeriodicalProcess, NIL, Process.priorityForeground];

END.