DIRECTORY Process USING[Detach, Pause, SecondsToTicks], Real USING[FixI], RealEvent USING[Handle, StreamHandle, Object], RefAnyStream USING[ObjectProcsHandle, ObjectProcs, Object, Handle, Error], Timer USING[Seconds, Handle, Create, Read]; RealEventImpl: MONITOR IMPORTS Process, Real, RefAnyStream, Timer EXPORTS RealEvent = { Seconds: TYPE = Timer.Seconds; EventDataHandle: TYPE = REF EventDataRec; EventDataRec: TYPE = RECORD[ queue: LIST OF RealEvent.Handle _ NIL, queueNonEmpty: CONDITION, misc: SELECT tag: * FROM eventDriven => NULL, timerDriven => [interval: Seconds, sampler: PROC[REF ANY] RETURNS[REAL], stateInfo: REF ANY, timer: Timer.Handle, timerToStop: BOOLEAN _ FALSE] ENDCASE ]; CWRec: TYPE = RECORD[ interval: Seconds, streamList: LIST OF RealEvent.StreamHandle ]; timerWatchers: LIST OF REF CWRec _ NIL; eventDrivenStreamProcs: RefAnyStream.ObjectProcsHandle = NEW[RefAnyStream.ObjectProcs _ [get: Get, put: Put, putBack: PutBack, flush: Flush, close: Close, endOf: NIY, empty: NIY]]; timerDrivenStreamProcs: RefAnyStream.ObjectProcsHandle = NEW[RefAnyStream.ObjectProcs _ [get: Get, put: PutError, putBack: PutBack, flush: Flush, close: Close, endOf: NIY, empty: NIY]]; CreateEventDrivenStream: PUBLIC PROC[] RETURNS[RealEvent.StreamHandle] = { RETURN[NEW[RefAnyStream.Object _ [procs: eventDrivenStreamProcs, data: NEW[EventDataRec _ [queueNonEmpty:, queue:, misc: eventDriven[]]]] ] ]; }; Get: PROC[self: RefAnyStream.Handle] RETURNS[re: REF ANY] = { RETURN[DoGet[NARROW[self.data]]]; }; DoGet: ENTRY PROC[data: EventDataHandle] RETURNS[re: RealEvent.Handle] = { q: LIST OF RealEvent.Handle; WHILE (q _ data.queue) = NIL DO WAIT data.queueNonEmpty ENDLOOP; re _ q.first; data.queue _ q.rest; }; Put: PROC[self: RefAnyStream.Handle, event: REF ANY] = { DoPut[NARROW[self.data], NARROW[event]]; }; DoPut: ENTRY PROC[data: EventDataHandle, event: RealEvent.Handle] = { prev: LIST OF RealEvent.Handle _ NIL; FOR queue: LIST OF RealEvent.Handle _ data.queue, queue.rest UNTIL queue = NIL DO prev _ queue ENDLOOP; IF prev = NIL THEN data.queue _ CONS[event] ELSE prev.rest _ CONS[event]; NOTIFY data.queueNonEmpty; }; PutError: PROC[self: RefAnyStream.Handle, event: REF ANY] = { ERROR RefAnyStream.Error[NotImplementedForThisStream]; }; NIY: PROC[self: RefAnyStream.Handle] RETURNS[BOOLEAN] = { ERROR RefAnyStream.Error[NotImplementedForThisStream]; }; PutBack: PROC[self: RefAnyStream.Handle, event: REF ANY] = { DoPutBack[NARROW[self.data], NARROW[event]]; }; DoPutBack: ENTRY PROC[data: EventDataHandle, event: RealEvent.Handle] = { data.queue _ CONS[event, data.queue]; }; Flush: PROC[self: RefAnyStream.Handle] = {DoFlush[NARROW[self.data]]; }; DoFlush: ENTRY PROC[data: EventDataHandle] = {data.queue _ NIL; }; Close: PROC[self: RefAnyStream.Handle] = {DoClose[NARROW[self.data]]; }; DoClose: ENTRY PROC[data: EventDataHandle] = {WITH data SELECT FROM rev: REF eventDriven EventDataRec => NULL; rclk: REF timerDriven EventDataRec => rclk.timerToStop _ TRUE; ENDCASE => ERROR; }; CreateTimerDrivenStream: PUBLIC PROC[interval: Seconds, sampler: PROC[REF ANY] RETURNS[REAL], stateInfo: REF ANY _ NIL] RETURNS[h: RealEvent.StreamHandle] = { timer: Timer.Handle _ Timer.Create[]; h _ NEW[RefAnyStream.Object _ [procs: timerDrivenStreamProcs, data: NEW[EventDataRec _ [queueNonEmpty:, queue:, misc: timerDriven[interval: interval, sampler: sampler, stateInfo: stateInfo, timer: timer] ] ] ] ]; EnterCW[h, interval]; }; EnterCW: ENTRY PROC[h: RealEvent.StreamHandle, interval: Seconds] = { FOR cwl: LIST OF REF CWRec _ timerWatchers, cwl.rest UNTIL cwl = NIL DO IF cwl.first.interval = interval THEN cwl.first.streamList _ CONS[h, cwl.first.streamList]; RETURN ENDLOOP; timerWatchers _ CONS[NEW[CWRec _ [interval: interval, streamList: CONS[h]]], timerWatchers]; Process.Detach[FORK TimerWatcher[timerWatchers.first]]; }; CleanCW: ENTRY PROC[cw: REF CWRec] = {prev: LIST OF RealEvent.StreamHandle _ NIL; FOR rel: LIST OF RealEvent.StreamHandle _ cw.streamList, rel.rest UNTIL rel = NIL DO data: REF timerDriven EventDataRec = NARROW[rel.first.data]; IF data.timerToStop THEN {IF prev = NIL THEN cw.streamList _ rel.rest ELSE prev.rest _ rel.rest} ELSE prev _ rel; ENDLOOP; }; TimerWatcher: PROC[cw: REF CWRec] = {UNTIL cw.streamList = NIL DO {FOR rel: LIST OF RealEvent.StreamHandle _ cw.streamList, rel.rest UNTIL rel = NIL DO data: REF timerDriven EventDataRec = NARROW[rel.first.data]; IF data.timerToStop THEN {CleanCW[cw]; GOTO next} ELSE {sampleValue: REAL _ data.sampler[data.stateInfo]; time: REAL _ data.timer.Read[].time; Put[rel.first, NEW[RealEvent.Object _ [sampleValue: sampleValue, time: time]]]}; ENDLOOP; Process.Pause[Process.SecondsToTicks[Real.FixI[cw.interval]]]; EXITS next => NULL}; ENDLOOP}; }. ®RealEventImpl.mesa, an implementation of time-stamped streams of REAL values Last Modified On 21-Jan-82 10:10:13 By Paul Rovner variables PROCEDUREs this guy is FORKED ʘJšœL™LJšœ2™2J˜šÏk ˜ Jšœœ ˜-Jšœœ˜Jšœ œ˜.Jšœ œ8˜JJšœœ ˜+J˜—šœ˜Jšœ#˜*Jšœ ˜—˜Jšœ œ˜J˜Jšœœœ˜)J˜šœœœ˜Jšœœœœ˜&Jšœ œ˜šœœ˜Jšœœ˜˜"Jš œ œœœœœ˜%Jšœ œœ˜J˜Jšœ œœ˜—Jš˜—J˜J˜—šœœœ˜J˜Jšœ œœ˜*J˜J˜J˜——Jšœ ™ J˜Jš œœœœ œ˜'J˜˜6šœœ˜˜ J˜ J˜J˜ J˜ Jšœœ˜ Jšœœ˜ J˜———˜6šœœ˜˜ J˜J˜J˜ J˜ Jšœœ˜ Jšœœ˜ J˜———Jšœ ™ ˜šÏnœœœ˜&Jšœ˜!šœœœ˜ ˜"Jšœœ?˜H—J˜J˜—J˜J˜š žœœœœœ˜;Jšœœœ˜#J˜J˜—šžœœœœ˜Hšœœœ˜Jš œœœœœ˜@J˜ J˜—J˜J˜J˜—šžœœ#œœ˜6Jšœœ œ ˜*J˜J˜—šžœœœ2˜Cšœœœœ˜'š œœœ+œ ˜NJšœœ˜—Jš œœœœœ œ˜IJšœ˜—J˜J˜—šžœœ#œœ˜;Jšœœ1˜8J˜J˜—šœœœœ˜7Jšœœ1˜8J˜J˜—šžœœ#œœ˜:Jšœ œ œ ˜.J˜J˜—šž œœœ2˜GJšœœ˜'J˜J˜—šžœœ˜(Jšœ œ ˜J˜J˜—šžœœœ˜,Jšœœ˜J˜J˜—šžœœ˜(Jšœ œ ˜J˜J˜—šžœœœ˜,šœœœ˜Jšœœœ˜*Jšœœ0œ˜>Jšœœ˜—J˜J˜——šžœœœœœœœœœœœ˜wJšœ˜$˜'šœœ˜˜"šœœ ˜)J˜˜%J˜J˜J˜ —J˜J˜—J˜—J˜—J˜—J˜J˜J˜—šžœœœ0˜Cš œœœœœ!œ˜Fšœœœœ˜_Jš˜—Jšœ˜J˜Jšœœœ*œ˜\Jšœœ$˜7—J˜J˜—šžœœœœ ˜$šœœœœ˜,š œœœ2œ˜Qšœœœ˜?šœ˜šœœ˜Jšœ˜Jšœ˜—Jšœ ˜——Jšœ˜——J˜˜Jšœ™——šž œœœ ˜#šœœ˜š˜š œœœœ2œ˜Ršœœœ˜?šœ˜Jšœœ˜šœœ ˜7Jšœœ˜$Jšœœ>˜P———Jšœ˜J˜>Jšœ œ˜——Jšœ˜ J˜——J˜J˜——…—¦s