-- Creates a new event driven stream.
-- The stream object uses eventDrivenStreamProcs as its procedure vector and
-- carries a freshly allocated EventDataRec of the eventDriven variant whose
-- condition variable and queue take their default values.
CreateEventDrivenStream: PUBLIC PROC[] RETURNS[RealEvent.StreamHandle] =
  {
  eventData: REF EventDataRec ←
    NEW[EventDataRec ← [queueNonEmpty:, queue:, misc: eventDriven[]]];
  stream: RealEvent.StreamHandle ←
    NEW[RefAnyStream.Object ← [procs: eventDrivenStreamProcs, data: eventData]];
  RETURN[stream];
  };
-- Stream Get procedure: narrows the stream's private data to an
-- EventDataHandle and returns the event delivered by DoGet.
-- The NARROW raises its usual error if self.data is not an EventDataHandle.
Get: PROC[self: RefAnyStream.Handle] RETURNS[re: REF ANY] =
  {
  data: EventDataHandle ← NARROW[self.data];
  re ← DoGet[data];
  };
 
-- Removes and returns the event at the head of data.queue.
-- While the queue is empty the caller waits on data.queueNonEmpty; the
-- emptiness test is repeated after every wakeup, so a wakeup that finds
-- the queue still empty simply waits again.
DoGet: ENTRY PROC[data: EventDataHandle] RETURNS[re: RealEvent.Handle] =
  {
  -- WAIT can be left by a signal (for example when the waiting process is
  -- aborted).  Without an UNWIND catch phrase the unwind would leave this
  -- entry procedure with the monitor lock still held, blocking every later
  -- entry call.  No invariant needs restoring here, hence NULL.
  ENABLE UNWIND => NULL;
  q: LIST OF RealEvent.Handle;
  WHILE (q ← data.queue) = NIL DO WAIT data.queueNonEmpty ENDLOOP;
  re ← q.first;
  data.queue ← q.rest;
  };
 
-- Stream Put procedure: narrows the stream data and the event to their
-- concrete types and hands them to DoPut, which appends the event to the
-- end of the queue.
Put: PROC[self: RefAnyStream.Handle, event: REF ANY] =
  {
  data: EventDataHandle ← NARROW[self.data];
  realEvent: RealEvent.Handle ← NARROW[event];
  DoPut[data, realEvent];
  };
 
-- Appends event to the end of data.queue and notifies data.queueNonEmpty.
-- The queue is a singly linked list, so the last cell is found by walking
-- the list from the head.
DoPut: ENTRY PROC[data: EventDataHandle, event: RealEvent.Handle] =
  {
  tail: LIST OF RealEvent.Handle ← NIL;
  cursor: LIST OF RealEvent.Handle ← data.queue;
  -- After the loop, tail is the last cell of the list, or NIL if the list is empty.
  WHILE cursor # NIL DO
    tail ← cursor;
    cursor ← cursor.rest;
    ENDLOOP;
  IF tail # NIL THEN tail.rest ← CONS[event] ELSE data.queue ← CONS[event];
  NOTIFY data.queueNonEmpty;
  };
 
-- Put style procedure that always fails: it ignores both arguments and
-- raises RefAnyStream.Error[NotImplementedForThisStream].
PutError: PROC[self: RefAnyStream.Handle, event: REF ANY] =
  {
  ERROR RefAnyStream.Error[NotImplementedForThisStream];
  };
 
-- Placeholder for an unsupported stream operation that would return a
-- BOOLEAN: it never returns and always raises
-- RefAnyStream.Error[NotImplementedForThisStream].
NIY: PROC[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
  {
  ERROR RefAnyStream.Error[NotImplementedForThisStream];
  };
 
-- Stream PutBack procedure: narrows the stream data and the event to their
-- concrete types and hands them to DoPutBack, which pushes the event onto
-- the front of the queue.
PutBack: PROC[self: RefAnyStream.Handle, event: REF ANY] =
  {
  data: EventDataHandle ← NARROW[self.data];
  realEvent: RealEvent.Handle ← NARROW[event];
  DoPutBack[data, realEvent];
  };
 
-- Pushes event onto the front of data.queue so that it is the next event
-- handed out by DoGet.
DoPutBack: ENTRY PROC[data: EventDataHandle, event: RealEvent.Handle] =
  {
  data.queue ← CONS[event, data.queue];
  -- The queue is now non-empty, so wake a process that may be waiting in
  -- DoGet, exactly as DoPut does.  DoGet rechecks the queue after every
  -- wakeup, so the notification is harmless when nobody is waiting.
  NOTIFY data.queueNonEmpty;
  };
 
-- Stream Flush procedure: narrows the stream data and lets DoFlush discard
-- every queued event.
Flush: PROC[self: RefAnyStream.Handle] =
  {
  data: EventDataHandle ← NARROW[self.data];
  DoFlush[data];
  };
 
-- Empties the queue by dropping the reference to the whole list.
DoFlush: ENTRY PROC[data: EventDataHandle] =
  {
  data.queue ← NIL;
  };
 
-- Stream Close procedure: narrows the stream data and delegates to DoClose.
Close: PROC[self: RefAnyStream.Handle] =
  {
  data: EventDataHandle ← NARROW[self.data];
  DoClose[data];
  };
 
-- Closes the stream data according to its variant:
--   timerDriven: sets timerToStop to TRUE
--     (NOTE(review): presumably read by the timer watcher process; confirm);
--   eventDriven: nothing to do.
-- Anything else (including NIL) raises an unnamed ERROR.
DoClose: ENTRY PROC[data: EventDataHandle] =
  {
  WITH data SELECT FROM
    timed: REF timerDriven EventDataRec => timed.timerToStop ← TRUE;
    plain: REF eventDriven EventDataRec => NULL;
    ENDCASE => ERROR;
  };
 
 
-- Creates a new timer driven stream and registers it with EnterCW.
--   interval:  stored in the stream data and passed to EnterCW.
--   sampler:   procedure stored in the stream data; it takes a REF ANY and returns a REAL.
--   stateInfo: stored in the stream data; defaults to NIL.
--     (NOTE(review): presumably the argument later handed to sampler; confirm.)
-- Returns the new stream, whose procedure vector is timerDrivenStreamProcs.
CreateTimerDrivenStream: PUBLIC PROC[
    interval: Seconds,
    sampler: PROC[REF ANY] RETURNS[REAL],
    stateInfo: REF ANY ← NIL]
  RETURNS[h: RealEvent.StreamHandle] =
  {
  sampleTimer: Timer.Handle ← Timer.Create[];
  timerData: REF EventDataRec ← NEW[EventDataRec ← [
    queueNonEmpty:,
    queue:,
    misc: timerDriven[
      interval: interval,
      sampler: sampler,
      stateInfo: stateInfo,
      timer: sampleTimer]]];
  h ← NEW[RefAnyStream.Object ← [procs: timerDrivenStreamProcs, data: timerData]];
  EnterCW[h, interval];
  };
 
-- Registers stream h with the watcher record for the given interval.
-- If timerWatchers already holds a CWRec with this interval, h is added to
-- the front of that record's streamList.  Otherwise a new CWRec holding only
-- h is pushed onto timerWatchers and a detached TimerWatcher process is
-- forked for it.
EnterCW: ENTRY PROC[h: RealEvent.StreamHandle, interval: Seconds] =
  {
  FOR cwl: LIST OF REF CWRec ← timerWatchers, cwl.rest UNTIL cwl = NIL DO
    -- The assignment and the RETURN must both be governed by the THEN.
    -- Written as "THEN stmt; RETURN" the RETURN is unconditional, the search
    -- stops after the first record, and a stream whose interval does not
    -- match that record is silently dropped.
    IF cwl.first.interval = interval THEN {
      cwl.first.streamList ← CONS[h, cwl.first.streamList];
      RETURN;
      };
    ENDLOOP;
  -- No existing watcher for this interval: create one and start its process.
  timerWatchers ← CONS[NEW[CWRec ← [interval: interval, streamList: CONS[h]]], timerWatchers];
  Process.Detach[FORK TimerWatcher[timerWatchers.first]];
  };
 
}.