-- RealEventImpl: monitor implementing the RealEvent stream interface.
-- Two kinds of stream are provided:
--   * event driven: clients Put events and Get them back in FIFO order;
--   * timer driven: a background process samples a client procedure at a
--     fixed interval and queues one RealEvent.Object per sample.
-- NOTE(review): the original header comments of this file were lost in
-- extraction; the comments here describe only what the code below does.

DIRECTORY
  Process USING[Detach, Pause, SecondsToTicks],
  Real USING[FixI],
  RealEvent USING[Handle, StreamHandle, Object],
  RefAnyStream USING[ObjectProcsHandle, ObjectProcs, Object, Handle, Error],
  Timer USING[Seconds, Handle, Create, Read];

RealEventImpl: MONITOR
  IMPORTS Process, Real, RefAnyStream, Timer
  EXPORTS RealEvent = {

  Seconds: TYPE = Timer.Seconds;

  -- Per-stream state, stored in the data field of the RefAnyStream.Object.
  EventDataHandle: TYPE = REF EventDataRec;
  EventDataRec: TYPE = RECORD[
    queue: LIST OF RealEvent.Handle _ NIL,  -- pending events, head is next to Get
    queueNonEmpty: CONDITION,  -- DoGet waits on this while queue = NIL
    misc: SELECT tag: * FROM
      eventDriven => NULL,
      timerDriven => [
        interval: Seconds,
        sampler: PROC[REF ANY] RETURNS[REAL],  -- called once per sample
        stateInfo: REF ANY,  -- passed to sampler on every call
        timer: Timer.Handle,  -- read to time-stamp each sample
        timerToStop: BOOLEAN _ FALSE]  -- set by DoClose, acted on by the watcher
      ENDCASE
    ];

  -- One record per distinct sampling interval; a single TimerWatcher process
  -- services every stream on streamList.
  CWRec: TYPE = RECORD[
    interval: Seconds,
    streamList: LIST OF RealEvent.StreamHandle
    ];

  -- Module state: all watcher records that currently have a running process.
  timerWatchers: LIST OF REF CWRec _ NIL;

  eventDrivenStreamProcs: RefAnyStream.ObjectProcsHandle =
    NEW[RefAnyStream.ObjectProcs _ [get: Get, put: Put, putBack: PutBack,
      flush: Flush, close: Close, endOf: NIY, empty: NIY]];

  -- Timer driven streams reject client Puts (PutError); only the watcher
  -- process enqueues, and it calls Put directly.
  timerDrivenStreamProcs: RefAnyStream.ObjectProcsHandle =
    NEW[RefAnyStream.ObjectProcs _ [get: Get, put: PutError, putBack: PutBack,
      flush: Flush, close: Close, endOf: NIY, empty: NIY]];

  -- Event driven streams

  -- Creates a stream whose events are supplied by clients through Put.
  CreateEventDrivenStream: PUBLIC PROC[] RETURNS[RealEvent.StreamHandle] = {
    RETURN[NEW[RefAnyStream.Object _ [
      procs: eventDrivenStreamProcs,
      data: NEW[EventDataRec _ [queueNonEmpty:, queue:, misc: eventDriven[]]]
      ]]];
    };

  -- Stream get procedure: blocks until an event is available, then returns it.
  Get: PROC[self: RefAnyStream.Handle] RETURNS[re: REF ANY] = {
    RETURN[DoGet[NARROW[self.data]]];
    };

  -- Removes and returns the head of the queue, waiting while it is empty.
  DoGet: ENTRY PROC[data: EventDataHandle] RETURNS[re: RealEvent.Handle] = {
    q: LIST OF RealEvent.Handle;
    -- Re-test after every wakeup: another process may have emptied the queue.
    WHILE (q _ data.queue) = NIL DO WAIT data.queueNonEmpty ENDLOOP;
    re _ q.first;
    data.queue _ q.rest;
    };

  -- Stream put procedure: appends event to the tail of the queue.
  Put: PROC[self: RefAnyStream.Handle, event: REF ANY] = {
    DoPut[NARROW[self.data], NARROW[event]];
    };

  -- Walks to the last cell of the queue and links a new cell after it.
  DoPut: ENTRY PROC[data: EventDataHandle, event: RealEvent.Handle] = {
    prev: LIST OF RealEvent.Handle _ NIL;
    FOR queue: LIST OF RealEvent.Handle _ data.queue, queue.rest
      UNTIL queue = NIL DO
      prev _ queue
      ENDLOOP;
    IF prev = NIL THEN data.queue _ CONS[event] ELSE prev.rest _ CONS[event];
    NOTIFY data.queueNonEmpty;
    };

  -- Put procedure installed on timer driven streams: always raises
  -- RefAnyStream.Error[NotImplementedForThisStream].
  PutError: PROC[self: RefAnyStream.Handle, event: REF ANY] = {
    ERROR RefAnyStream.Error[NotImplementedForThisStream];
    };

  -- Placeholder for the endOf and empty stream procedures: always raises
  -- RefAnyStream.Error[NotImplementedForThisStream].
  NIY: PROC[self: RefAnyStream.Handle] RETURNS[BOOLEAN] = {
    ERROR RefAnyStream.Error[NotImplementedForThisStream];
    };

  -- Stream putBack procedure: pushes event on the front of the queue so that
  -- it is the next one returned by Get.
  PutBack: PROC[self: RefAnyStream.Handle, event: REF ANY] = {
    DoPutBack[NARROW[self.data], NARROW[event]];
    };

  DoPutBack: ENTRY PROC[data: EventDataHandle, event: RealEvent.Handle] = {
    data.queue _ CONS[event, data.queue];
    -- The queue is now non-empty; wake a process blocked in DoGet, exactly
    -- as DoPut does.
    NOTIFY data.queueNonEmpty;
    };

  -- Stream flush procedure: discards every queued event.
  Flush: PROC[self: RefAnyStream.Handle] = {
    DoFlush[NARROW[self.data]];
    };

  DoFlush: ENTRY PROC[data: EventDataHandle] = {
    data.queue _ NIL;
    };

  -- Stream close procedure. A no-op for event driven streams; for timer
  -- driven streams it asks the watcher process to stop sampling.
  Close: PROC[self: RefAnyStream.Handle] = {
    DoClose[NARROW[self.data]];
    };

  DoClose: ENTRY PROC[data: EventDataHandle] = {
    WITH data SELECT FROM
      rev: REF eventDriven EventDataRec => NULL;
      -- Only the flag is set here; TimerWatcher notices it on its next pass
      -- and unlinks the stream through CleanCW.
      rclk: REF timerDriven EventDataRec => rclk.timerToStop _ TRUE;
      ENDCASE => ERROR;
    };

  -- Timer driven streams

  -- Creates a stream on which a background process queues a RealEvent.Object
  -- holding sampler[stateInfo] and the current timer reading once per interval.
  CreateTimerDrivenStream: PUBLIC PROC[interval: Seconds,
      sampler: PROC[REF ANY] RETURNS[REAL], stateInfo: REF ANY _ NIL]
      RETURNS[h: RealEvent.StreamHandle] = {
    timer: Timer.Handle _ Timer.Create[];
    h _ NEW[RefAnyStream.Object _ [
      procs: timerDrivenStreamProcs,
      data: NEW[EventDataRec _ [queueNonEmpty:, queue:,
        misc: timerDriven[interval: interval, sampler: sampler,
          stateInfo: stateInfo, timer: timer]]]
      ]];
    EnterCW[h, interval];
    };

  -- Registers h with the watcher for its interval, forking a new watcher
  -- process when none exists for that interval.
  EnterCW: ENTRY PROC[h: RealEvent.StreamHandle, interval: Seconds] = {
    FOR cwl: LIST OF REF CWRec _ timerWatchers, cwl.rest UNTIL cwl = NIL DO
      -- The RETURN must be inside the THEN arm. Otherwise the search gives up
      -- after the first record and h is never registered when that record
      -- has a different interval.
      IF cwl.first.interval = interval THEN {
        cwl.first.streamList _ CONS[h, cwl.first.streamList];
        RETURN};
      ENDLOOP;
    timerWatchers _ CONS[
      NEW[CWRec _ [interval: interval, streamList: CONS[h]]], timerWatchers];
    Process.Detach[FORK TimerWatcher[timerWatchers.first]];
    };

  -- Unlinks every stream of cw whose timerToStop flag is set. If that leaves
  -- cw with no streams, cw itself is removed from timerWatchers.
  CleanCW: ENTRY PROC[cw: REF CWRec] = {
    prev: LIST OF RealEvent.StreamHandle _ NIL;
    FOR rel: LIST OF RealEvent.StreamHandle _ cw.streamList, rel.rest
      UNTIL rel = NIL DO
      data: REF timerDriven EventDataRec = NARROW[rel.first.data];
      IF data.timerToStop
        THEN {IF prev = NIL THEN cw.streamList _ rel.rest ELSE prev.rest _ rel.rest}
        ELSE prev _ rel;
      ENDLOOP;
    IF cw.streamList = NIL THEN {
      -- TimerWatcher exits once cw.streamList is NIL. Drop the record in the
      -- same monitor entry so that a later EnterCW with this interval forks a
      -- fresh watcher instead of adding a stream to a record nobody services.
      prevCW: LIST OF REF CWRec _ NIL;
      FOR cwl: LIST OF REF CWRec _ timerWatchers, cwl.rest UNTIL cwl = NIL DO
        IF cwl.first = cw THEN {
          IF prevCW = NIL THEN timerWatchers _ cwl.rest ELSE prevCW.rest _ cwl.rest;
          EXIT};
        prevCW _ cwl;
        ENDLOOP;
      };
    };

  -- Watcher process

  -- Body of the detached process forked by EnterCW. Repeatedly samples every
  -- stream on cw.streamList, then pauses; terminates when the list is empty.
  TimerWatcher: PROC[cw: REF CWRec] = {
    UNTIL cw.streamList = NIL DO
      {
      FOR rel: LIST OF RealEvent.StreamHandle _ cw.streamList, rel.rest
        UNTIL rel = NIL DO
        data: REF timerDriven EventDataRec = NARROW[rel.first.data];
        IF data.timerToStop
          -- CleanCW edits the list being traversed, so abandon this pass and
          -- restart from the head (skipping the pause).
          THEN {CleanCW[cw]; GOTO next}
          ELSE {
            sampleValue: REAL _ data.sampler[data.stateInfo];
            time: REAL _ data.timer.Read[].time;
            Put[rel.first,
              NEW[RealEvent.Object _ [sampleValue: sampleValue, time: time]]]};
        ENDLOOP;
      -- NOTE(review): the interval is converted with Real.FixI before the
      -- pause; presumably an interval below one second yields zero ticks and
      -- this loop then runs without pausing. TODO confirm against Real.FixI.
      Process.Pause[Process.SecondsToTicks[Real.FixI[cw.interval]]];
      EXITS next => NULL
      };
      ENDLOOP
    };

  }.