-- PeriodicalForkImpl: monitor module exporting the DelayedFork and PeriodicalFork interfaces.
-- This first part declares the time-period records, the job records with their free list,
-- the registration procedures and the state of the two ring buffers.
DIRECTORY DelayedFork, FastFork, PeriodicalFork, Process, RuntimeError;

PeriodicalForkImpl: CEDAR MONITOR
  IMPORTS FastFork, Process, RuntimeError
  EXPORTS DelayedFork, PeriodicalFork
~ BEGIN

-- ForkSoon requests with ms at or below this value bypass the time lists and are enqueued at once.
forkImmediateTime: INT = 32;
-- Length in milliseconds of the shortest period (converted to ticks just below).
minimumTime: INT = 64;
-- PeriodicalProcess pauses this many ticks between sweeps.
minimumTicks: Process.Ticks = Process.MsecToTicks[minimumTime];

-- Head of the chain of periods; this is the record swept on every turn of PeriodicalProcess.
timeTop: TimeRef ¬ NEW[TimeRec];
TimeRef: TYPE = REF TimeRec;
TimeRec: TYPE = RECORD [
  toggle: BOOL ¬ FALSE, --not monitored, there is only 1 PeriodicalProcess
  lessFrequent: TimeRef ¬ NIL, -- next period in the chain; swept only on every other sweep of this one
  periodicJobs: JobList ¬ NIL, -- jobs added by Register
  thisTime: JobList ¬ NIL, -- delayed forks handed out by the next GetTheDelayedForks
  nextTime: JobList ¬ NIL -- delayed forks added by ForkSoon; moved to thisTime by GetTheDelayedForks
  ];

JobRef: TYPE = LIST OF JobRec;
--understanding is: only first list element may be used and constitutes job
JobList: TYPE = LIST OF JobRec;
--understanding is: all elements may be used
JobRec: TYPE = RECORD [
  proc: PROC[REF] ¬ NIL, -- procedure called by Envelope
  data: REF ¬ NIL, -- argument passed to proc
  priority: CARD32 ¬ 0, -- Envelope switches to this priority around the call when it differs from internalPriority
  isForked: ForkState ¬ joined, --prevent forking more than one process
  periodic: BOOL ¬ FALSE -- TRUE for jobs made by Register; Dequeue frees only non periodic jobs
  ];
-- enqueued is set by EntryEnqueue; joined is set again by Envelope after the call.
ForkState: TYPE = {joined, enqueued};

-- Recycled job cells, linked through their rest fields.
freeList: JobList ¬ NIL;

-- Monitored wrapper around InternalNewJob for callers outside the monitor lock.
NewJob: ENTRY PROC RETURNS [JobRef] = INLINE {
  RETURN [InternalNewJob[]];
  };

-- Returns a single job cell with rest = NIL: a fresh one when the free list is empty,
-- otherwise the head of the free list.
InternalNewJob: INTERNAL PROC RETURNS [job: JobRef] = INLINE {
  IF freeList=NIL THEN job ¬ LIST[[]]
  ELSE {
    job ¬ freeList;
    freeList ¬ job.rest;
    job.rest ¬ NIL;
    };
  };

-- Pushes job onto the free list and drops its data reference.
--WARNING: job MUST NOT be in any list since rest field used
InternalFreeJob: INTERNAL PROC [job: JobRef] = INLINE {
  job.first.data ¬ NIL;
  job.rest ¬ freeList;
  freeList ¬ job
  };

-- Adds a periodic job (proc, data, priority) to the period record that FindTime selects for ms.
Register: PUBLIC PROC [ms: INT, proc: PROC[REF], data: REF, priority: CARD32] = {
  p: PROC[REF] ¬ proc; --trap here if local proc
  EntryRegister: ENTRY PROC [ms: INT, proc: PROC[REF], data: REF, priority: CARD32] = {
    ref: TimeRef ¬ FindTime[ms];
    job: JobRef ¬ InternalNewJob[];
    job.first.isForked ¬ joined;
    job.first.proc ¬ proc;
    job.first.data ¬ data;
    job.first.priority ¬ priority;
    job.first.periodic ¬ TRUE;
    -- push onto the front of the period's list of periodic jobs
    job.rest ¬ ref.periodicJobs;
    ref.periodicJobs ¬ job;
    };
  EntryRegister[ms, p, data, priority];
  };

-- Schedules a single call of proc[data]. Short delays (ms <= forkImmediateTime) are enqueued
-- directly; longer ones are put on the nextTime list of the period that FindTime selects.
ForkSoon: PUBLIC PROC [ms: INT, proc: PROC[REF], data: REF, priority: CARD32] = {
  p: PROC[REF] ¬ proc; --trap here if local proc
  -- priority is taken from the enclosing ForkSoon; it is not a parameter of this local procedure.
  EntryDelayedFork: ENTRY PROC [ms: INT, proc: PROC[REF], data: REF] = {
    ref: TimeRef ¬ FindTime[ms];
    job: JobRef ¬ InternalNewJob[];
    job.first.isForked ¬ joined;
    job.first.proc ¬ proc;
    job.first.data ¬ data;
    job.first.priority ¬ priority;
    job.first.periodic ¬ FALSE;
    job.rest ¬ ref.nextTime;
    ref.nextTime ¬ job;
    };
  IF ms<=forkImmediateTime THEN {
    job: JobRef ¬ NewJob[];
    job.first.isForked ¬ joined;
    job.first.proc ¬ proc;
    job.first.data ¬ data;
    job.first.priority ¬ priority;
    job.first.periodic ¬ FALSE;
    job.rest ¬ NIL;
    Enqueue[job];
    -- fork a worker when the delay is negative or the hint says none is available
    IF ms<0 OR pCountHint<=0 THEN FastFork.Fork[Envelope, NIL, internalPriority];
    pCountHint ¬ pCountHint-1;
    }
  ELSE EntryDelayedFork[ms, p, data];
  };

-- Selects the period record for a delay of ms milliseconds; the result starts out as timeTop.
-- NOTE(review): text appears to be missing between "minimumTime" and "NULL;" below. The rest of
-- FindTime and the header of the procedure enclosing Match and UnregisterTime (presumably
-- Unregister, which Envelope calls) are not present. Restore from the original file. TODO confirm
FindTime: INTERNAL PROC [ms: INT] RETURNS [ref: TimeRef ¬ timeTop] = INLINE {
  WHILE minimumTime NULL;
  -- TRUE when job has the proc and data being unregistered. proc and data are not declared in the
  -- visible text; presumably they are parameters of the enclosing procedure. TODO confirm
  Match: INTERNAL PROC [job: JobRef] RETURNS [BOOL] = {
    RETURN [job.first.proc=proc AND job.first.data=data]
    };
  -- Unlinks every matching job from the periodicJobs list of time.
  UnregisterTime: INTERNAL PROC [time: TimeRef] = {
    lag: JobList ¬ NIL; -- cell preceding list, NIL while list is the head
    list: JobList ¬ time.periodicJobs;
    WHILE list#NIL DO
      IF Match[list] THEN {
        -- don't worry about calling InternalFreeJob; it's tricky and occurs rarely
        list.first.proc ¬ NoOp; --so won't be called even if already in delay buffer
        IF lag=NIL THEN {time.periodicJobs ¬ list.rest; list ¬ time.periodicJobs}
        ELSE {lag.rest ¬ list.rest; list ¬ lag.rest}
        }
      ELSE {lag ¬ list; list ¬ list.rest};
      ENDLOOP
    };
  -- visit every period in the chain
  FOR ref: TimeRef ¬ timeTop, ref.lessFrequent WHILE ref#NIL DO
    UnregisterTime[ref]
    ENDLOOP;
  };

-- Raised by UnregisterSelf; Envelope catches it around the job call, unregisters the job and resumes.
unregisterSelf: SIGNAL = CODE;

UnregisterSelf: PUBLIC PROC [] = {
  SIGNAL unregisterSelf
  };

-- spread the time of forking:
--   1) to increase probability of reusing PROCESS resource
--   2) to balance load
-- Using two buffers
--   delayBuffer: delays jobs multiples of time intervals
--     (so large intervals will spread the calls on multiple minor intervals)
--   serializeBuffer: serializing calls while minor interval
--     (usually will be emptied before next minor interval)

-- access to "serialize" variables must be monitored
serializeNum: INT = 64; -- capacity of serializeBuffer
serializeYoungest: [0..serializeNum) ¬ 0; -- index of the most recently stored job
serializeCnt: INT ¬ 0; -- number of jobs currently in the buffer
serializeBuffer: REF ARRAY [0..serializeNum) OF JobRef ¬ NEW[ARRAY [0..serializeNum) OF JobRef ¬ ALL[NIL]];

-- access to "delay" variables not monitored; only one process
delayNum: INT = 128; -- capacity of delayBuffer
delayYoungest: [0..delayNum) ¬ 0; -- index of the most recently stored job
delayCnt: INT ¬ 0; -- number of jobs currently in the buffer
delayBuffer: REF ARRAY [0..delayNum) OF JobRef ¬ NEW[ARRAY [0..delayNum) OF JobRef ¬ ALL[NIL]];

pCountHint: INT ¬ 0; --not exact since don't know who waits for conditions. Not monitored but periodically reset. 
-- Amount by which one Envelope process changes pCountHint when it starts and when it finds no work.
countForOne: INT = 5;

-- Moves up to "n" jobs, oldest first, from the delay buffer into the serialize buffer.
ForwardDelayed: PROC [n: INT ¬ 1] = {
  WHILE delayCnt>0 --not empty-- AND n>0 DO
    delayCnt ¬ delayCnt-1;
    n ¬ n - 1;
    BEGIN
    -- after the decrement, delayYoungest-delayCnt (modulo delayNum) is the slot of the oldest job
    job: JobRef ¬ delayBuffer[(delayYoungest+delayNum-delayCnt) MOD delayNum];
    Enqueue[job];
    END;
    ENDLOOP;
  };

-- Priority at which Envelope processes are forked and to which they return between jobs.
internalPriority: Process.Priority ~ Process.priorityClient3;

-- Body of a worker process: executes jobs as long as some are available in the serialize buffer.
-- data is not used; every fork in this file passes NIL.
Envelope: PROC [data: REF] = {
  job: JobRef ¬ NIL;
  pCountHint ¬ pCountHint+countForOne;
  DO
    -- passing the previous job lets Dequeue free it within the same monitor entry
    job ¬ Dequeue[job];
    IF job=NIL THEN {pCountHint ¬ MAX[pCountHint-countForOne, 0]; RETURN};
    IF job.first.priority#internalPriority THEN Process.SetPriority[job.first.priority];
    job.first.proc[job.first.data !
      UNWIND => {pCountHint ¬ pCountHint+countForOne; CONTINUE};
      -- the job called UnregisterSelf: remove it from the periodic lists, then let it carry on
      unregisterSelf => {Unregister[job.first.proc, job.first.data]; RESUME};
      RuntimeError.UNCAUGHT => {pCountHint ¬ pCountHint-countForOne};
      ];
    IF job.first.priority#internalPriority THEN Process.SetPriority[internalPriority];
    -- lets SweepThisPeriod schedule a periodic job again
    job.first.isForked ¬ joined;
    ENDLOOP
  };

-- Returns the next job to be executed out of the serialize buffer, or NIL when it is empty.
-- last: may be used to free the last job [ughh: this saves a monitor entry/exit];
-- periodic jobs are never freed here.
Dequeue: ENTRY PROC [last: JobRef] RETURNS [job: JobRef ¬ NIL] = INLINE {
  IF last#NIL AND ~last.first.periodic THEN InternalFreeJob[last];
  IF serializeCnt>0 --not empty-- THEN {
    serializeCnt ¬ serializeCnt-1;
    -- after the decrement this index addresses the oldest job in the ring
    job ¬ serializeBuffer[(serializeYoungest+serializeNum-serializeCnt) MOD serializeNum];
    };
  };

-- Enqueues job into the serialize buffer if space is available and marks it enqueued.
-- Returns FALSE, leaving job untouched, when the buffer is full.
EntryEnqueue: ENTRY PROC [job: JobRef] RETURNS [success: BOOL] = INLINE {
  IF serializeCnt>=serializeNum --full-- THEN RETURN [FALSE];
  serializeYoungest ¬ (serializeYoungest+1) MOD serializeNum;
  serializeBuffer[serializeYoungest] ¬ job;
  serializeCnt ¬ serializeCnt+1;
  job.first.isForked ¬ enqueued;
  success ¬ TRUE;
  };

-- Enqueues job into the serialize buffer; insists even if spinning is necessary.
-- The pause doubles after every failed attempt.
EnqueueHard: PROC [job: JobRef] = {
  pauseLength: INT ¬ 1;
  DO
    -- Fork one more process to empty the buffer
    FastFork.Fork[Envelope, NIL, internalPriority];
    Process.Pause[pauseLength]; --give it a chance to do some work
    IF EntryEnqueue[job].success THEN RETURN;
    pauseLength ¬ pauseLength*2;
    -- It's ok to loop and fork more processes: buffer is full
    ENDLOOP;
  };

-- Enqueues job into the serialize buffer, falling back to EnqueueHard when it is full.
Enqueue: PROC [job: JobRef] = INLINE {
  IF ~EntryEnqueue[job].success THEN EnqueueHard[job];
  };

-- Enqueues job into the delay buffer; when that buffer is full, three of the oldest jobs are
-- forwarded to the serialize buffer first to make room.
Delay: PROC [job: JobRef] = INLINE {
  IF delayCnt>=delayNum --full-- THEN ForwardDelayed[3];
  delayYoungest ¬ (delayYoungest+1) MOD delayNum;
  delayBuffer[delayYoungest] ¬ job;
  delayCnt ¬ delayCnt+1;
  };

-- Returns the delayed forks due in this period of time and promotes the nextTime list to thisTime.
GetTheDelayedForks: ENTRY PROC [time: TimeRef] RETURNS [jobs: JobList] = INLINE {
  jobs ¬ time.thisTime;
  time.thisTime ¬ time.nextTime;
  time.nextTime ¬ NIL;
  };

-- Schedules everything due for one period. Jobs of the shortest period (timeTop) go straight
-- into the serialize buffer; jobs of longer periods go through the delay buffer.
SweepThisPeriod: PROC [time: TimeRef] = INLINE {
  -- periodic forks
  -- not monitored; all that can happen is missing the first turn
  FOR jobs: JobList ¬ time.periodicJobs, jobs.rest WHILE jobs#NIL DO
    -- skip jobs that are still marked enqueued from an earlier sweep
    IF jobs.first.isForked=joined THEN {
      IF time=timeTop THEN Enqueue[jobs] ELSE Delay[jobs]
      };
    ENDLOOP;
  -- delayed forks
  -- monitored; we can't afford missing the only turn
  IF time.thisTime#NIL OR time.nextTime#NIL THEN {
    jobs: JobList ¬ GetTheDelayedForks[time];
    WHILE jobs#NIL DO
      next: JobList ¬ jobs.rest; --save the rest field before possible freeing of first job clobbers...
      -- detach the cell: a job handed to InternalFreeJob must not be in any list
      jobs.rest ¬ NIL;
      IF time=timeTop THEN Enqueue[jobs] ELSE Delay[jobs];
      jobs ¬ next
      ENDLOOP;
    };
  };

-- Sweeps time, then continues with each less frequent period whose toggle flips to TRUE,
-- so every period in the chain is swept half as often as the one before it.
SweepThisAndLargerPeriods: PROC [time: TimeRef] = INLINE {
  WHILE time#NIL DO
    SweepThisPeriod[time];
    -- recurse for double time increment
    IF time.toggle ¬ ~time.toggle THEN time ¬ time.lessFrequent ELSE RETURN;
    ENDLOOP;
  };

-- Body of the single scheduler process: once per minimumTicks it sweeps the periods, forwards
-- about a quarter of the delay buffer, resets pCountHint and forks a worker if work is queued.
-- data is not used; the fork below passes NIL.
PeriodicalProcess: PROC [data: REF] = {
  DO
    SweepThisAndLargerPeriods[timeTop];
    IF delayCnt>0 THEN ForwardDelayed[delayCnt/4+1];
    -- old processes which are still alive will probably not help
    -- freshly created processes because of overflow need not be considered;
    -- we want more parallelism in this case
    pCountHint ¬ 0;
    IF serializeCnt>0 THEN FastFork.Fork[Envelope, NIL, internalPriority];
    Process.Pause[minimumTicks];
    ENDLOOP
  };

-- Module start-up: launch the scheduler process.
FastFork.Fork[PeriodicalProcess, NIL, Process.priorityForeground];

END. 8 PeriodicalForkImpl.mesa Copyright Σ 1989, 1991, 1992 by Xerox Corporation. All rights reserved. 
Created by Christian Jacobi, March 1, 1989 5:17:26 pm PST Christian Jacobi, March 5, 1992 11:42 am PST --understanding is: only first list element may be used and constitutes job --understanding is: all elements may be used --WARNING: job MUST NOT be in any list since rest field used --don't worry about calling InternalFreeJob; its tricky and occurs rarely spread the time of forking: 1) to increase probability of reusing PROCESS resource 2) to balance load Using two buffers delaybuffer: delays jobs multiples of time intervals (so large intervals will spread the calls on multiple minor intervals) serializeBuffer: serializing calls while minor interval (usually will be emptied before next minor interval) access to "serialize" variables must be monitored access to "delay" variables not monitored; only one process --moves "n" jobs from delay buffer into serialize buffer --execute jobs as long as some are available in serialize buffer --Returns next job to be executed out of serialize buffer --last: may be used to free last job [ughh: this saves a monitor entry/exit] --enqueues job into serialize buffer if space available --enqueues job into serialize buffer; insists even if spinning necessary --Fork one more process to empty buffer --It's ok to loop and fork more processes: buffer is full --enqueues job into serialize buffer --enqueues job into delay buffer --periodic forks -- not monitored; all what can happen is missing the first turn --delayed forks -- monitored; we can't afford missing the only turn --recurse for double time increment --old processes which are still alive will probably not help --freshly created processes because of overflow need not be considered; we want more parallelism in this case Κ >•NewlineDelimiter –(cedarcode) style™codešœ™Kšœ Οeœ=™HKšœ9™9Kšœ-™-K™—šΟk œ>˜GK˜—šΟnœžœž˜!Kšžœ ˜'Kšžœ˜&—šž˜K˜—Kšœžœ˜Kšœ žœ˜Kšœ?˜?K˜Kšœžœ ˜ Kšœ žœžœ ˜šœ žœžœ˜KšœžœžœΟc2˜HKšœžœ˜Kšœžœ˜Kšœžœ˜Kšœž˜K˜—K˜šœžœžœžœ˜Kš K™K—šœ žœžœžœ˜Kšœ,™,K˜—šœžœžœ˜Kš œžœžœžœžœ˜Kšœžœž˜Kšœ 
žœ˜Kšœ '˜EKšœ žœž˜K˜—K˜Kšœ žœ˜%K˜Kšœžœ˜K˜š Ÿœžœžœžœ žœ˜.Kšžœ˜Kšœ˜K˜—š Ÿœžœžœžœžœ˜>šžœ žœ˜Kšžœžœ˜šžœ˜Kšœ˜Kšœ˜Kšœ žœ˜K˜——Kšœ˜—K˜šŸœžœžœžœ˜7Kšœ<™Kšžœžœžœ˜š Ÿœžœžœžœžœ˜5Kšžœžœ˜4K˜—šŸœžœžœ˜1Kšœžœ˜Kšœ"˜"šžœžœž˜šžœ ˜šžœ˜JšœI™IKšœ 4˜Lšžœžœ˜ Kšžœ:˜>Kšžœ(˜,—Kšœ˜—Kšžœ!˜%—Kšž˜—K˜—šžœ*žœžœž˜=Kšœ˜Kšžœ˜—K˜—K˜Kšœžœžœ˜K˜šŸœž œ˜"Kšžœ˜Kšœ˜—K˜šœ™Kšœ&žœ ™7Kšœ™—™™5K™F—šœ8™8Kšœ4™4——K˜Kšœ1™1Kšœžœ˜Kšœ*˜*Kšœžœ˜šœžœžœžœ ˜8Kš œžœžœžœ žœžœ˜3—K™K™;Kšœ žœ˜Kšœ"˜"Kšœ žœ˜šœ žœžœžœ ˜0Kš œžœžœžœ žœžœ˜/—K˜Kšœ žœ ]˜rKšœ žœ˜K˜šŸœžœžœ ˜%J™8šžœ   œžœžœ˜*Kšœ˜Kšœ ˜ šž˜Kšœ<žœ ˜JKšœ ˜ Kšžœ˜—Kšžœ˜—K˜—K˜Kšœ=˜=K˜šŸœžœžœ˜Kš 0œ™@Kšœžœ˜Kšœ$˜$šž˜Kšœ˜Kš žœžœžœžœžœ˜FKšžœ%žœ)˜Tšœ ˜ Kšžœ*žœ˜:Kšœ?žœ˜GKšœ žœ*˜?Kšœ˜—Kšžœ%žœ'˜RKšœ˜Kšž˜—K˜K˜—š Ÿœžœžœžœžœžœ˜IJš )œ™9Jš œ F™LKšžœžœžœžœ˜@šžœ  œžœ˜&Kšœ˜KšœDžœ˜VK˜—Kšœ˜—K˜š Ÿ œžœžœžœ žœžœ˜IJšœ7™7Kš žœ œžœžœžœ˜;Kšœ*žœ˜;Kšœ)˜)Kšœ˜Kšœ˜Kšœ žœ˜K˜—K˜šŸ œžœ˜#JšœH™HKšœ žœ˜šžœ˜Jš '™'Kšœžœ˜0Kšœ "˜>Kšžœžœžœ˜)Kšœ˜Jšœ9™9Kšžœ˜—Kšœ˜—K˜šŸœžœžœ˜&Jšœ$™$Kšžœžœ˜4K˜K˜—šŸœžœžœ˜$Jšœ ™ Kšžœ œžœ˜6Kšœ"žœ ˜/Kšœ!˜!Kšœ˜Kšœ˜—K˜š Ÿœžœžœžœžœ˜QKšœ˜Kšœ˜Kšœžœ˜K˜—K˜šŸœžœžœ˜0Kš ™Kš ?™?šžœ.žœžœžœ˜Cšžœžœ˜$Kšžœžœžœ ˜3K˜—Kšžœ˜—Kš ™Kš 3™3š žœžœžœžœžœ˜0Kšœ)˜)šžœžœž˜Kšœ F˜aKšœ žœ˜Kšžœžœžœ ˜4Kšœ ˜ Kšžœ˜—Kšœ˜—K˜K˜—šŸœžœžœ˜:šžœžœžœ˜Kšœ˜Kš #™#Kšžœžœžœžœ˜HKšžœ˜—K˜—K˜šŸœžœžœ˜'šž˜Kšœ#˜#Kšžœ žœ˜0šœ˜Jšœ=™=Jšœo™o—Kšžœžœ0˜FKšœ˜Kšž˜—K˜—K˜Kšœ!žœ˜BKšžœ˜K˜—…—ζ0\