-- RealEventImpl.mesa, an implementation of time-stamped streams of REAL values
-- Last Modified On 21-Jan-82 10:10:13 By Paul Rovner

DIRECTORY
    Process USING[Detach, Pause, SecondsToTicks],
    Real USING[FixI],
    RealEvent USING[Handle, StreamHandle, Object],
    RefAnyStream USING[ObjectProcsHandle, ObjectProcs, Object, Handle, Error],
    Timer USING[Seconds, Handle, Create, Read];
    
RealEventImpl: MONITOR
  IMPORTS Process, Real, RefAnyStream, Timer
  EXPORTS RealEvent
= {
   Seconds: TYPE = Timer.Seconds;

   EventDataHandle: TYPE = REF EventDataRec;

   EventDataRec: TYPE = RECORD[
	queue: LIST OF RealEvent.Handle ← NIL,
	queueNonEmpty: CONDITION,
	misc: SELECT tag: * FROM
	        eventDriven => NULL,
		timerDriven => [interval: Seconds,
		                sampler: PROC[REF ANY] RETURNS[REAL],
			        stateInfo: REF ANY,
			        timer: Timer.Handle,
			        timerToStop: BOOLEAN ← FALSE]
	       ENDCASE
	];
     
   CWRec: TYPE = RECORD[
	interval: Seconds,
	streamList: LIST OF RealEvent.StreamHandle
	];
     

-- variables

timerWatchers: LIST OF REF CWRec ← NIL;

eventDrivenStreamProcs: RefAnyStream.ObjectProcsHandle
                                = NEW[RefAnyStream.ObjectProcs
                                                ← [get: Get,
		                                   put: Put,
		                                   putBack: PutBack,
		                                   flush: Flush,
		                                   close: Close,
						   endOf: NIY,
						   empty: NIY]];

timerDrivenStreamProcs: RefAnyStream.ObjectProcsHandle
                                = NEW[RefAnyStream.ObjectProcs
                                                ← [get: Get,
		                                   put: PutError,
		                                   putBack: PutBack,
		                                   flush: Flush,
		                                   close: Close,
						   endOf: NIY,
						   empty: NIY]];

-- PROCEDUREs

   CreateEventDrivenStream: PUBLIC PROC[]
         RETURNS[RealEvent.StreamHandle] =
     { RETURN[NEW[RefAnyStream.Object
                    ←  [procs: eventDrivenStreamProcs,
		        data: NEW[EventDataRec ← [queueNonEmpty:, queue:, misc: eventDriven[]]]]
		 ]
	     ];
     };
   
    Get: PROC[self: RefAnyStream.Handle] RETURNS[re: REF ANY] =
      { RETURN[DoGet[NARROW[self.data]]];
      };

    DoGet: ENTRY PROC[data: EventDataHandle] RETURNS[re: RealEvent.Handle] =
      { q: LIST OF RealEvent.Handle;
        WHILE (q ← data.queue) = NIL DO WAIT data.queueNonEmpty ENDLOOP;
        re ← q.first;
	data.queue ← q.rest;
      };
      
      
    Put: PROC[self: RefAnyStream.Handle, event: REF ANY] =
      { DoPut[NARROW[self.data], NARROW[event]];
      };
      
    DoPut: ENTRY PROC[data: EventDataHandle, event: RealEvent.Handle] =
      { prev: LIST OF RealEvent.Handle ← NIL;
        FOR queue: LIST OF RealEvent.Handle ← data.queue, queue.rest UNTIL queue = NIL
          DO prev ← queue ENDLOOP;
	IF prev = NIL THEN data.queue ← CONS[event] ELSE prev.rest ← CONS[event];
	NOTIFY data.queueNonEmpty;
      };
      
    PutError: PROC[self: RefAnyStream.Handle, event: REF ANY] =
      { ERROR RefAnyStream.Error[NotImplementedForThisStream];
      };
      
    NIY: PROC[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
      { ERROR RefAnyStream.Error[NotImplementedForThisStream];
      };
      
    PutBack: PROC[self: RefAnyStream.Handle, event: REF ANY] =
      { DoPutBack[NARROW[self.data], NARROW[event]];
      };
      
    DoPutBack: ENTRY PROC[data: EventDataHandle, event: RealEvent.Handle] =
      { data.queue ← CONS[event, data.queue];
      };
      
    Flush: PROC[self: RefAnyStream.Handle] =
      {DoFlush[NARROW[self.data]];
      };
      
    DoFlush: ENTRY PROC[data: EventDataHandle] =
      {data.queue ← NIL;
      };
      
    Close: PROC[self: RefAnyStream.Handle] =
      {DoClose[NARROW[self.data]];
      };

    DoClose: ENTRY PROC[data: EventDataHandle] =
      {WITH data SELECT FROM
         rev: REF eventDriven EventDataRec => NULL;
	 rclk: REF timerDriven EventDataRec => rclk.timerToStop ← TRUE;
	ENDCASE => ERROR;
      };

   CreateTimerDrivenStream: PUBLIC PROC[interval: Seconds, sampler: PROC[REF ANY] RETURNS[REAL], stateInfo: REF ANY ← NIL]
         RETURNS[h: RealEvent.StreamHandle] =
     { timer: Timer.Handle ← Timer.Create[];
       h ← NEW[RefAnyStream.Object
                    ←  [procs: timerDrivenStreamProcs,
		        data: NEW[EventDataRec ← [queueNonEmpty:,
			                          queue:,
			                          misc: timerDriven[interval: interval,
						                    sampler: sampler,
								    stateInfo: stateInfo,
								    timer: timer]
						 ]
				 ]
	               ]
	     ];
       EnterCW[h, interval];
     };
   

   EnterCW: ENTRY PROC[h: RealEvent.StreamHandle, interval: Seconds] =
     { FOR cwl: LIST OF REF CWRec ← timerWatchers, cwl.rest UNTIL cwl = NIL
         DO  IF cwl.first.interval = interval THEN cwl.first.streamList ← CONS[h, cwl.first.streamList];
	     RETURN
	ENDLOOP;
      
       timerWatchers ← CONS[NEW[CWRec ← [interval: interval, streamList: CONS[h]]], timerWatchers];
       Process.Detach[FORK TimerWatcher[timerWatchers.first]];
     };
     
   CleanCW: ENTRY PROC[cw: REF CWRec] =
     {prev: LIST OF RealEvent.StreamHandle ← NIL;
      FOR rel: LIST OF RealEvent.StreamHandle ← cw.streamList, rel.rest UNTIL rel = NIL
	   DO data: REF timerDriven EventDataRec = NARROW[rel.first.data];
	      IF data.timerToStop
	       THEN {IF prev = NIL
		       THEN cw.streamList ← rel.rest
		       ELSE prev.rest ← rel.rest}
	       ELSE prev ← rel;
       ENDLOOP;
     };
     
      -- this guy is FORKED
   TimerWatcher: PROC[cw: REF CWRec] =
     {UNTIL cw.streamList = NIL
       DO
         {FOR rel: LIST OF RealEvent.StreamHandle ← cw.streamList, rel.rest UNTIL rel = NIL
	    DO data: REF timerDriven EventDataRec = NARROW[rel.first.data];
	       IF data.timerToStop
	        THEN {CleanCW[cw]; GOTO next}
	        ELSE {sampleValue: REAL ← data.sampler[data.stateInfo];
		      time: REAL ← data.timer.Read[].time;
	              Put[rel.first, NEW[RealEvent.Object ← [sampleValue: sampleValue, time: time]]]};
	   ENDLOOP;
	  Process.Pause[Process.SecondsToTicks[Real.FixI[cw.interval]]];
	  EXITS next => NULL};
      ENDLOOP};

 }.