-- RealEventImpl.mesa, an implementation of time-stamped streams of REAL values
-- Last Modified On 21-Jan-82 10:10:13 By Paul Rovner
DIRECTORY
Process USING[Detach, Pause, SecondsToTicks],
Real USING[FixI],
RealEvent USING[Handle, StreamHandle, Object],
RefAnyStream USING[ObjectProcsHandle, ObjectProcs, Object, Handle, Error],
Timer USING[Seconds, Handle, Create, Read];
RealEventImpl: MONITOR
IMPORTS Process, Real, RefAnyStream, Timer
EXPORTS RealEvent
= {
Seconds: TYPE = Timer.Seconds;

EventDataHandle: TYPE = REF EventDataRec;

-- Per-stream state, stored in the data field of the stream object.
-- The common part is a queue of pending events plus the condition that DoGet
-- waits on; the variant part distinguishes event-driven streams from
-- timer-driven ones.
EventDataRec: TYPE = RECORD[
  -- pending events: DoGet removes from the head, DoPut appends at the tail
  queue: LIST OF RealEvent.Handle ← NIL,
  -- notified by DoPut, awaited by DoGet
  queueNonEmpty: CONDITION,
  misc: SELECT tag: * FROM
    eventDriven => NULL,
    timerDriven => [interval: Seconds,
                    sampler: PROC[REF ANY] RETURNS[REAL],
                    stateInfo: REF ANY,
                    timer: Timer.Handle,
                    -- set by DoClose; must default to FALSE because
                    -- CreateTimerDrivenStream omits it from its constructor
                    timerToStop: BOOLEAN ← FALSE]
    ENDCASE
  ];
-- Bookkeeping record handed to one forked TimerWatcher process.
-- EnterCW creates one record per distinct interval and adds streams to it;
-- CleanCW removes streams whose timerToStop flag is set.
CWRec: TYPE = RECORD[
-- sampling period shared by every stream in streamList
interval: Seconds,
-- timer-driven streams sampled by this watcher
streamList: LIST OF RealEvent.StreamHandle
];
-- variables
-- All watcher records created so far; EnterCW searches and extends this list.
timerWatchers: LIST OF REF CWRec ← NIL;
-- Procedure table for event-driven streams: clients may both put and get.
-- endOf and empty are not implemented (NIY raises RefAnyStream.Error).
eventDrivenStreamProcs: RefAnyStream.ObjectProcsHandle
= NEW[RefAnyStream.ObjectProcs
← [get: Get,
put: Put,
putBack: PutBack,
flush: Flush,
close: Close,
endOf: NIY,
empty: NIY]];
-- Procedure table for timer-driven streams: identical to the table above
-- except that put is PutError, so a put through this table raises
-- RefAnyStream.Error. TimerWatcher calls Put directly instead.
timerDrivenStreamProcs: RefAnyStream.ObjectProcsHandle
= NEW[RefAnyStream.ObjectProcs
← [get: Get,
put: PutError,
putBack: PutBack,
flush: Flush,
close: Close,
endOf: NIY,
empty: NIY]];
-- PROCEDUREs
CreateEventDrivenStream: PUBLIC PROC[]
  RETURNS[RealEvent.StreamHandle] =
  -- Builds a stream with the event-driven procedure table and a fresh
  -- eventDriven state record; queue and condition take their defaults.
  { state: EventDataHandle =
      NEW[EventDataRec ← [queueNonEmpty:, queue:, misc: eventDriven[]]];
    RETURN[NEW[RefAnyStream.Object
                 ← [procs: eventDrivenStreamProcs, data: state]]];
  };
Get: PROC[self: RefAnyStream.Handle] RETURNS[re: REF ANY] =
  -- Stream get operation: narrows the stream data to the event state and
  -- hands it to the monitored DoGet, which supplies the result.
  { state: EventDataHandle = NARROW[self.data];
    re ← DoGet[state];
  };
DoGet: ENTRY PROC[data: EventDataHandle] RETURNS[re: RealEvent.Handle] =
  -- Waits on queueNonEmpty until the queue has an element, then removes
  -- and returns the head of the queue.
  { UNTIL data.queue # NIL DO WAIT data.queueNonEmpty ENDLOOP;
    re ← data.queue.first;
    data.queue ← data.queue.rest;
  };
Put: PROC[self: RefAnyStream.Handle, event: REF ANY] =
  -- Stream put operation: narrows both arguments and passes them to the
  -- monitored DoPut.
  { state: EventDataHandle = NARROW[self.data];
    item: RealEvent.Handle = NARROW[event];
    DoPut[state, item];
  };
DoPut: ENTRY PROC[data: EventDataHandle, event: RealEvent.Handle] =
  -- Appends event at the tail of the queue and notifies queueNonEmpty.
  { cell: LIST OF RealEvent.Handle = CONS[event];
    IF data.queue = NIL
      THEN data.queue ← cell
      ELSE { tail: LIST OF RealEvent.Handle ← data.queue;
             -- walk to the last cell so the new one goes at the end
             UNTIL tail.rest = NIL DO tail ← tail.rest ENDLOOP;
             tail.rest ← cell };
    NOTIFY data.queueNonEmpty;
  };
PutError: PROC[self: RefAnyStream.Handle, event: REF ANY] =
  -- Put entry of the timer-driven procedure table: always raises
  -- RefAnyStream.Error; both arguments are ignored.
  {
    ERROR RefAnyStream.Error[NotImplementedForThisStream];
  };
NIY: PROC[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
  -- Placeholder used for the endOf and empty entries of both procedure
  -- tables: always raises RefAnyStream.Error and never returns a value.
  {
    ERROR RefAnyStream.Error[NotImplementedForThisStream];
  };
PutBack: PROC[self: RefAnyStream.Handle, event: REF ANY] =
  -- Stream putBack operation: narrows both arguments and passes them to
  -- the monitored DoPutBack.
  { state: EventDataHandle = NARROW[self.data];
    item: RealEvent.Handle = NARROW[event];
    DoPutBack[state, item];
  };
DoPutBack: ENTRY PROC[data: EventDataHandle, event: RealEvent.Handle] =
  -- Pushes event onto the head of the queue, so it is the next element
  -- DoGet removes. No notification is issued.
  { pushed: LIST OF RealEvent.Handle = CONS[event, data.queue];
    data.queue ← pushed;
  };
Flush: PROC[self: RefAnyStream.Handle] =
  -- Stream flush operation: narrows the stream data and passes it to the
  -- monitored DoFlush.
  { state: EventDataHandle = NARROW[self.data];
    DoFlush[state];
  };
DoFlush: ENTRY PROC[data: EventDataHandle] =
  -- Discards every queued event by resetting the queue to NIL.
  {
    data.queue ← NIL;
  };
Close: PROC[self: RefAnyStream.Handle] =
  -- Stream close operation: narrows the stream data and passes it to the
  -- monitored DoClose.
  { state: EventDataHandle = NARROW[self.data];
    DoClose[state];
  };
DoClose: ENTRY PROC[data: EventDataHandle] =
{WITH data SELECT FROM
rev: REF eventDriven EventDataRec => NULL;
rclk: REF timerDriven EventDataRec => rclk.timerToStop ← TRUE;
ENDCASE => ERROR;
};
CreateTimerDrivenStream: PUBLIC PROC[interval: Seconds, sampler: PROC[REF ANY] RETURNS[REAL], stateInfo: REF ANY ← NIL]
  RETURNS[h: RealEvent.StreamHandle] =
  -- Creates a stream whose events are produced by a forked TimerWatcher.
  --   interval:  sampling period; streams with equal intervals share a watcher
  --   sampler:   called by the watcher with stateInfo to get each sample value
  --   stateInfo: opaque client state passed to sampler; defaults to NIL
  -- The new stream gets its own timer and is registered with EnterCW before
  -- being returned. timerToStop is omitted from the constructor and so
  -- takes its declared default.
  { timer: Timer.Handle ← Timer.Create[];
    h ← NEW[RefAnyStream.Object
            ← [procs: timerDrivenStreamProcs,
               data: NEW[EventDataRec ← [queueNonEmpty:,
                                          queue:,
                                          misc: timerDriven[interval: interval,
                                                            sampler: sampler,
                                                            stateInfo: stateInfo,
                                                            timer: timer]
                                         ]
                        ]
              ]
           ];
    EnterCW[h, interval];
  };
EnterCW: ENTRY PROC[h: RealEvent.StreamHandle, interval: Seconds] =
  -- Registers stream h with the watcher for the given interval, creating
  -- the watcher record and forking a TimerWatcher for it if none exists.
  --
  -- A record whose streamList is NIL belongs to a TimerWatcher that has
  -- exited or is about to, since its loop ends when the list is NIL.
  -- Such records are unlinked here and never reused; attaching h to one
  -- would leave h unsampled.
  { prev: LIST OF REF CWRec ← NIL;  -- last live cell kept in timerWatchers
    FOR cwl: LIST OF REF CWRec ← timerWatchers, cwl.rest UNTIL cwl = NIL
      DO
        IF cwl.first.streamList = NIL
          THEN {IF prev = NIL
                  THEN timerWatchers ← cwl.rest
                  ELSE prev.rest ← cwl.rest}
          ELSE {-- the RETURN must be inside the THEN arm; otherwise the
                -- loop returns after inspecting only the first record
                IF cwl.first.interval = interval
                  THEN {cwl.first.streamList ← CONS[h, cwl.first.streamList];
                        RETURN};
                prev ← cwl};
      ENDLOOP;
    -- no live watcher for this interval: make one and start its process
    timerWatchers ← CONS[NEW[CWRec ← [interval: interval, streamList: CONS[h]]], timerWatchers];
    Process.Detach[FORK TimerWatcher[timerWatchers.first]];
  };
CleanCW: ENTRY PROC[cw: REF CWRec] =
  -- Unlinks from cw.streamList every stream whose timerToStop flag is set.
  { lastKept: LIST OF RealEvent.StreamHandle ← NIL;  -- most recent surviving cell
    cell: LIST OF RealEvent.StreamHandle ← cw.streamList;
    WHILE cell # NIL
      DO state: REF timerDriven EventDataRec = NARROW[cell.first.data];
         IF NOT state.timerToStop
           THEN lastKept ← cell
           ELSE {IF lastKept = NIL
                   THEN cw.streamList ← cell.rest
                   ELSE lastKept.rest ← cell.rest};
         -- the removed cell keeps its rest link, so stepping is still valid
         cell ← cell.rest;
      ENDLOOP;
  };
-- this guy is FORKED
-- Body of the watcher process forked by EnterCW for one CWRec.
-- Repeatedly sweeps cw.streamList, sampling each stream and queueing a
-- RealEvent.Object on it, then pauses before the next sweep.
-- The process ends when cw.streamList is NIL.
-- This is not an ENTRY procedure: the list is traversed and the sampler is
-- called without holding the monitor lock; only CleanCW and Put (via DoPut)
-- enter the monitor.
TimerWatcher: PROC[cw: REF CWRec] =
{UNTIL cw.streamList = NIL
DO
{FOR rel: LIST OF RealEvent.StreamHandle ← cw.streamList, rel.rest UNTIL rel = NIL
DO data: REF timerDriven EventDataRec = NARROW[rel.first.data];
IF data.timerToStop
-- a closed stream was found: prune the list, then abandon this sweep.
-- GOTO next skips the Pause below, so the next sweep starts at once.
THEN {CleanCW[cw]; GOTO next}
ELSE {sampleValue: REAL ← data.sampler[data.stateInfo];
time: REAL ← data.timer.Read[].time;
-- Put is called directly; the put entry of the stream's own procedure
-- table is PutError.
Put[rel.first, NEW[RealEvent.Object ← [sampleValue: sampleValue, time: time]]]};
ENDLOOP;
-- NOTE(review): Real.FixI presumably converts the interval to an integer
-- number of seconds, which would discard any fractional part. Confirm
-- against the Real interface.
Process.Pause[Process.SecondsToTicks[Real.FixI[cw.interval]]];
EXITS next => NULL};
ENDLOOP};
}.