-- RealEventImpl.mesa, an implementation of time-stamped streams of REAL values
-- Last Modified On 21-Jan-82 10:10:13 By Paul Rovner
--
-- Two flavors of stream are provided, both represented as RefAnyStream objects
-- whose data field is an EventDataRec:
--   eventDriven: clients add events with put and remove them with get.
--   timerDriven: a forked TimerWatcher process calls the client's sampler once
--                per pause and queues the result; a client put raises an error.

DIRECTORY
  Process USING[Detach, Pause, SecondsToTicks],
  Real USING[FixI],
  RealEvent USING[Handle, StreamHandle, Object],
  RefAnyStream USING[ObjectProcsHandle, ObjectProcs, Object, Handle, Error],
  Timer USING[Seconds, Handle, Create, Read];

RealEventImpl: MONITOR
  IMPORTS Process, Real, RefAnyStream, Timer
  EXPORTS RealEvent =
{
Seconds: TYPE = Timer.Seconds;

EventDataHandle: TYPE = REF EventDataRec;

-- Per-stream state. queue holds the pending events in delivery order;
-- queueNonEmpty is the condition that DoGet waits on.
EventDataRec: TYPE = RECORD[
  queue: LIST OF RealEvent.Handle ← NIL,
  queueNonEmpty: CONDITION,
  misc: SELECT tag: * FROM
    eventDriven => NULL,
    timerDriven => [interval: Seconds,
                    sampler: PROC[REF ANY] RETURNS[REAL],
                    stateInfo: REF ANY,    -- passed to sampler on every call
                    timer: Timer.Handle,   -- read to time-stamp each sample
                    timerToStop: BOOLEAN ← FALSE]  -- set by Close
    ENDCASE
  ];

-- One record per distinct sampling interval; one TimerWatcher process is
-- forked per record and services every stream on its streamList.
CWRec: TYPE = RECORD[
  interval: Seconds,
  streamList: LIST OF RealEvent.StreamHandle
  ];

-- variables

timerWatchers: LIST OF REF CWRec ← NIL;

eventDrivenStreamProcs: RefAnyStream.ObjectProcsHandle
  = NEW[RefAnyStream.ObjectProcs ← [get: Get,
                                    put: Put,
                                    putBack: PutBack,
                                    flush: Flush,
                                    close: Close,
                                    endOf: NIY,
                                    empty: NIY]];

-- Same as the event driven procs except that put raises an error.
timerDrivenStreamProcs: RefAnyStream.ObjectProcsHandle
  = NEW[RefAnyStream.ObjectProcs ← [get: Get,
                                    put: PutError,
                                    putBack: PutBack,
                                    flush: Flush,
                                    close: Close,
                                    endOf: NIY,
                                    empty: NIY]];

-- PROCEDUREs

-- Creates a stream with an empty queue whose events are supplied by put.
CreateEventDrivenStream: PUBLIC PROC[] RETURNS[RealEvent.StreamHandle] = {
  RETURN[NEW[RefAnyStream.Object
             ← [procs: eventDrivenStreamProcs,
                data: NEW[EventDataRec ← [queueNonEmpty:,
                                          queue:,
                                          misc: eventDriven[]]]]
            ]
        ];
  };

-- Stream get: removes and returns the event at the head of the queue.
Get: PROC[self: RefAnyStream.Handle] RETURNS[re: REF ANY] = {
  RETURN[DoGet[NARROW[self.data]]];
  };

-- Waits on queueNonEmpty until the queue has an element, then dequeues it.
-- The emptiness test is repeated after every wakeup.
DoGet: ENTRY PROC[data: EventDataHandle] RETURNS[re: RealEvent.Handle] = {
  q: LIST OF RealEvent.Handle;
  WHILE (q ← data.queue) = NIL DO WAIT data.queueNonEmpty ENDLOOP;
  re ← q.first;
  data.queue ← q.rest;
  };

-- Stream put: appends event at the tail of the queue.
Put: PROC[self: RefAnyStream.Handle, event: REF ANY] = {
  DoPut[NARROW[self.data], NARROW[event]];
  };

-- Walks to the last cell of the queue, links a new cell after it (or installs
-- it as the whole queue when the queue is empty) and notifies queueNonEmpty.
DoPut: ENTRY PROC[data: EventDataHandle, event: RealEvent.Handle] = {
  prev: LIST OF RealEvent.Handle ← NIL;
  FOR queue: LIST OF RealEvent.Handle ← data.queue, queue.rest UNTIL queue = NIL DO
    prev ← queue
    ENDLOOP;
  IF prev = NIL THEN data.queue ← CONS[event] ELSE prev.rest ← CONS[event];
  NOTIFY data.queueNonEmpty;
  };

-- put for timer driven streams: always raises RefAnyStream.Error.
PutError: PROC[self: RefAnyStream.Handle, event: REF ANY] = {
  ERROR RefAnyStream.Error[NotImplementedForThisStream];
  };

-- Stand in for endOf and empty: always raises RefAnyStream.Error.
NIY: PROC[self: RefAnyStream.Handle] RETURNS[BOOLEAN] = {
  ERROR RefAnyStream.Error[NotImplementedForThisStream];
  };

-- Stream putBack: pushes event onto the head of the queue so that the next
-- get returns it.
PutBack: PROC[self: RefAnyStream.Handle, event: REF ANY] = {
  DoPutBack[NARROW[self.data], NARROW[event]];
  };

DoPutBack: ENTRY PROC[data: EventDataHandle, event: RealEvent.Handle] = {
  data.queue ← CONS[event, data.queue];
  -- The queue is now non-empty, so wake a process waiting in DoGet, exactly
  -- as DoPut does. DoGet rechecks the queue, so an extra wakeup is harmless.
  NOTIFY data.queueNonEmpty;
  };

-- Stream flush: discards every queued event.
Flush: PROC[self: RefAnyStream.Handle] = {
  DoFlush[NARROW[self.data]];
  };

DoFlush: ENTRY PROC[data: EventDataHandle] = {
  data.queue ← NIL;
  };

-- Stream close: a no-op for event driven streams; for timer driven streams
-- it sets timerToStop, which TimerWatcher and CleanCW test.
Close: PROC[self: RefAnyStream.Handle] = {
  DoClose[NARROW[self.data]];
  };

DoClose: ENTRY PROC[data: EventDataHandle] = {
  WITH data SELECT FROM
    rev: REF eventDriven EventDataRec => NULL;
    rclk: REF timerDriven EventDataRec => rclk.timerToStop ← TRUE;
    ENDCASE => ERROR;
  };

-- Creates a stream that is fed by a TimerWatcher process. Each sample is the
-- result of sampler[stateInfo], stamped with a reading of a timer that is
-- created here. The stream is registered with the watcher for its interval.
CreateTimerDrivenStream: PUBLIC PROC[interval: Seconds,
                                     sampler: PROC[REF ANY] RETURNS[REAL],
                                     stateInfo: REF ANY ← NIL]
                         RETURNS[h: RealEvent.StreamHandle] = {
  timer: Timer.Handle ← Timer.Create[];
  h ← NEW[RefAnyStream.Object
          ← [procs: timerDrivenStreamProcs,
             data: NEW[EventDataRec ← [queueNonEmpty:,
                                       queue:,
                                       misc: timerDriven[interval: interval,
                                                         sampler: sampler,
                                                         stateInfo: stateInfo,
                                                         timer: timer]
                                      ]
                      ]
            ]
         ];
  EnterCW[h, interval];
  };

-- Registers h with the watcher record for interval. If no record with that
-- interval is on timerWatchers, a new one is created and a TimerWatcher
-- process is forked (and detached) to service it.
EnterCW: ENTRY PROC[h: RealEvent.StreamHandle, interval: Seconds] = {
  FOR cwl: LIST OF REF CWRec ← timerWatchers, cwl.rest UNTIL cwl = NIL DO
    -- The RETURN must be inside the THEN arm. Without the braces it follows
    -- the IF statement and is executed on the first iteration regardless of
    -- whether the interval matched, so the stream would never be registered.
    IF cwl.first.interval = interval THEN {
      cwl.first.streamList ← CONS[h, cwl.first.streamList];
      RETURN};
    ENDLOOP;
  timerWatchers ← CONS[NEW[CWRec ← [interval: interval, streamList: CONS[h]]],
                       timerWatchers];
  Process.Detach[FORK TimerWatcher[timerWatchers.first]];
  };

-- Unlinks from cw.streamList every stream whose timerToStop flag is set.
-- If that leaves cw with no streams, cw is also unlinked from timerWatchers:
-- its TimerWatcher process exits when it sees the empty list, and a record
-- left on timerWatchers would cause EnterCW to attach later streams to a
-- watcher that no longer exists. Both steps run under the monitor lock, so
-- EnterCW can never add a stream to a record that is being retired.
CleanCW: ENTRY PROC[cw: REF CWRec] = {
  prev: LIST OF RealEvent.StreamHandle ← NIL;
  FOR rel: LIST OF RealEvent.StreamHandle ← cw.streamList, rel.rest UNTIL rel = NIL DO
    data: REF timerDriven EventDataRec = NARROW[rel.first.data];
    IF data.timerToStop
      THEN {IF prev = NIL THEN cw.streamList ← rel.rest ELSE prev.rest ← rel.rest}
      ELSE prev ← rel;
    ENDLOOP;
  IF cw.streamList = NIL THEN {
    prevCW: LIST OF REF CWRec ← NIL;
    FOR cwl: LIST OF REF CWRec ← timerWatchers, cwl.rest UNTIL cwl = NIL DO
      IF cwl.first = cw THEN {
        IF prevCW = NIL THEN timerWatchers ← cwl.rest ELSE prevCW.rest ← cwl.rest;
        EXIT};
      prevCW ← cwl;
      ENDLOOP;
    };
  };

-- this guy is FORKED
-- Runs until cw has no streams. Each pass samples every stream on the list
-- and queues the time-stamped value with Put, then pauses. On meeting a
-- stream that has been closed, it calls CleanCW and restarts the pass at once
-- (GOTO next skips the pause), because CleanCW may have changed the list.
-- NOTE(review): the pause length is Real.FixI[cw.interval] seconds; if that
-- conversion yields 0 for an interval below one second the loop would not
-- pause at all. Confirm against the Real and Process interfaces.
TimerWatcher: PROC[cw: REF CWRec] = {
  UNTIL cw.streamList = NIL DO
    {FOR rel: LIST OF RealEvent.StreamHandle ← cw.streamList, rel.rest UNTIL rel = NIL DO
       data: REF timerDriven EventDataRec = NARROW[rel.first.data];
       IF data.timerToStop
         THEN {CleanCW[cw]; GOTO next}
         ELSE {sampleValue: REAL ← data.sampler[data.stateInfo];
               time: REAL ← data.timer.Read[].time;
               -- Calls the local Put directly; the stream's own put
               -- procedure is PutError.
               Put[rel.first,
                   NEW[RealEvent.Object ← [sampleValue: sampleValue, time: time]]]};
       ENDLOOP;
     Process.Pause[Process.SecondsToTicks[Real.FixI[cw.interval]]];
     EXITS next => NULL};
    ENDLOOP};

}.