CreateEventDrivenStream:
PUBLIC
PROC[]
RETURNS[RealEvent.StreamHandle] =
{
RETURN[
NEW[RefAnyStream.Object
← [procs: eventDrivenStreamProcs,
data: NEW[EventDataRec ← [queueNonEmpty:, queue:, misc: eventDriven[]]]]
]
];
};
Get:
PROC[self: RefAnyStream.Handle]
RETURNS[re:
REF
ANY] =
{ RETURN[DoGet[NARROW[self.data]]];
};
DoGet:
ENTRY
PROC[data: EventDataHandle]
RETURNS[re: RealEvent.Handle] =
{ q:
LIST
OF RealEvent.Handle;
WHILE (q ← data.queue) = NIL DO WAIT data.queueNonEmpty ENDLOOP;
re ← q.first;
data.queue ← q.rest;
};
Put:
PROC[self: RefAnyStream.Handle, event:
REF
ANY] =
{ DoPut[NARROW[self.data], NARROW[event]];
};
DoPut:
ENTRY
PROC[data: EventDataHandle, event: RealEvent.Handle] =
{ prev:
LIST
OF RealEvent.Handle ←
NIL;
FOR queue:
LIST
OF RealEvent.Handle ← data.queue, queue.rest
UNTIL queue =
NIL
DO prev ← queue ENDLOOP;
IF prev = NIL THEN data.queue ← CONS[event] ELSE prev.rest ← CONS[event];
NOTIFY data.queueNonEmpty;
};
PutError:
PROC[self: RefAnyStream.Handle, event:
REF
ANY] =
{ ERROR RefAnyStream.Error[NotImplementedForThisStream];
};
NIY:
PROC[self: RefAnyStream.Handle]
RETURNS[
BOOLEAN] =
{ ERROR RefAnyStream.Error[NotImplementedForThisStream];
};
PutBack:
PROC[self: RefAnyStream.Handle, event:
REF
ANY] =
{ DoPutBack[NARROW[self.data], NARROW[event]];
};
DoPutBack:
ENTRY
PROC[data: EventDataHandle, event: RealEvent.Handle] =
{ data.queue ← CONS[event, data.queue];
};
Flush:
PROC[self: RefAnyStream.Handle] =
{DoFlush[NARROW[self.data]];
};
DoFlush:
ENTRY
PROC[data: EventDataHandle] =
{data.queue ← NIL;
};
Close:
PROC[self: RefAnyStream.Handle] =
{DoClose[NARROW[self.data]];
};
DoClose:
ENTRY
PROC[data: EventDataHandle] =
{
WITH data
SELECT
FROM
rev: REF eventDriven EventDataRec => NULL;
rclk: REF timerDriven EventDataRec => rclk.timerToStop ← TRUE;
ENDCASE => ERROR;
};
CreateTimerDrivenStream:
PUBLIC
PROC[interval: Seconds, sampler:
PROC[
REF
ANY]
RETURNS[
REAL], stateInfo:
REF
ANY ←
NIL]
RETURNS[h: RealEvent.StreamHandle] =
{ timer: Timer.Handle ← Timer.Create[];
h ←
NEW[RefAnyStream.Object
← [procs: timerDrivenStreamProcs,
data:
NEW[EventDataRec ← [queueNonEmpty:,
queue:,
misc: timerDriven[interval: interval,
sampler: sampler,
stateInfo: stateInfo,
timer: timer]
]
]
]
];
EnterCW[h, interval];
};
EnterCW:
ENTRY
PROC[h: RealEvent.StreamHandle, interval: Seconds] =
{
FOR cwl:
LIST
OF
REF CWRec ← timerWatchers, cwl.rest
UNTIL cwl =
NIL
DO
IF cwl.first.interval = interval
THEN cwl.first.streamList ←
CONS[h, cwl.first.streamList];
RETURN
ENDLOOP;
timerWatchers ← CONS[NEW[CWRec ← [interval: interval, streamList: CONS[h]]], timerWatchers];
Process.Detach[FORK TimerWatcher[timerWatchers.first]];
};
}.