-- RefAnyStreamImpl.mesa
-- Last Modified On 22-Dec-81 14:30:06 By Sturgis
-- Last Modified On 21-Jan-82 12:02:44 By Paul Rovner

DIRECTORY
    CoFork USING[CoForkMe],
    RefAnyStream USING[Handle, Object, ObjectProcs, ErrorCode];
    
RefAnyStreamImpl: MONITOR  LOCKS NARROW[self.data, StreamData] USING self: RefAnyStream.Handle
  IMPORTS CoFork
  EXPORTS RefAnyStream =
 BEGIN
 
 -- nomenclature
 	-- sink is associated with the co-forked procedure
	-- source, stopper are associated with the client
	
	-- producer puts ref-anys
	-- consumer gets ref-anys
	
	-- co-fork refers to the "co-forked" call stack, this side signals end by returning
	-- client refers to the non "co-forked" code, this side signals end by calling stopper.
	
	-- if the producer is co-forked, then the producer will put ref-anys to the sink
	-- it the producer is the client, then the producer will put ref-anys to the source
 
-- errors
Error: PUBLIC ERROR [ec: RefAnyStream.ErrorCode] = CODE;

-- port declarations, must follow a precise pattern specified by CoFork

	-- aItems go from client to co-fork, bItems go from co-fork to client


 RefAnySource: TYPE = POINTER TO PORT[continue: BOOLEAN, aItem: REF ANY] RETURNS[more: BOOLEAN, bItem: REF ANY]; -- used by client
 RefAnySink: TYPE = POINTER TO PORT[more: BOOLEAN, bItem: REF ANY] RETURNS[continue: BOOLEAN, aItem: REF ANY]; -- used by co-fork
 RefAnyStopper: TYPE = PROCEDURE[continue: BOOLEAN, aItem: REF ANY]; -- used by client
 
 -- the following record type is used by all stream types so that I can compile all in one module, so that the locks expression is uniform for all entry procedures  (ugh!!!)  note that some fields are unused in each case.
 
 StreamData: TYPE = REF StreamDataBody;
 
 StreamDataBody: TYPE = MONITORED RECORD[
  -- following fields are used in the client stream
    source: RefAnySource,
    stop: RefAnyStopper,
 
  -- following fields are used by the consumer
    itemReady: BOOLEAN,
    item: REF ANY,
    
  -- following fields are used in the co-forked stream
    sink: RefAnySink];
 
 
 
 -- RefAnyStream implementation seen by a co-forked RefAny producer
 
 
 CoForkedProducerStreamProcs: REF RefAnyStream.ObjectProcs ← NEW[RefAnyStream.ObjectProcs ← [get: CoForkedProducerGet, put: CoForkedProducerPut, putBack:  CoForkedProducerPutBack, flush: CoForkedProducerFlush, close: CoForkedProducerClose, endOf: CoForkedProducerEndOf, empty: ErrProc]];
 
 ErrProc: PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
    {ERROR Error[NotImplementedForThisStream]};
 
 CoForkedProducerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
    {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
 
 CoForkedProducerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    BEGIN ENABLE UNWIND => NULL; 
    d: StreamData ← NARROW[self.data];
    IF d.sink # NIL THEN {IF NOT (d.sink[TRUE, event]).continue THEN d.sink ← NIL} ELSE ERROR Error[StreamClosed];
    END;
 
 CoForkedProducerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
 
 CoForkedProducerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
 
 CoForkedProducerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    BEGIN ENABLE UNWIND => NULL; 
    d: StreamData ← NARROW[self.data];
    d.sink ← NIL;
    END;
    
 CoForkedProducerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
    BEGIN ENABLE UNWIND => NULL; 
    d: StreamData ← NARROW[self.data];
    RETURN[d.sink=NIL];
    END;
    
    
    
 --  RefAnyStream implementation seen by a client consumer
 
  
 ClientConsumerStreamProcs: REF RefAnyStream.ObjectProcs ← NEW[RefAnyStream.ObjectProcs ← [get: ClientConsumerGet, put: ClientConsumerPut, putBack: ClientConsumerPutBack, flush: ClientConsumerFlush, close: ClientConsumerClose, endOf: ClientConsumerEndOf, empty: ErrProc]];
 
 ClientConsumerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
    BEGIN ENABLE UNWIND => NULL; 
    d: StreamData ← NARROW[self.data];
    IF d.source = NIL THEN ERROR Error[StreamClosed];
    IF d.itemReady THEN {d.itemReady ← FALSE; RETURN[d.item]};
    [d.itemReady, d.item] ← d.source[TRUE, NIL];
    IF d.itemReady
     THEN {d.itemReady ← FALSE; RETURN[d.item]}
     ELSE {d.stop[FALSE, NIL];
           d.source ← NIL;
           ERROR Error[StreamClosed]};
    END;
 
 ClientConsumerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
 
 ClientConsumerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; -- not implemented
 
 ClientConsumerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
    
 ClientConsumerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
       BEGIN ENABLE UNWIND => NULL; 
       d: StreamData ← NARROW[self.data];
       IF d.itemReady THEN RETURN[FALSE];
       IF d.source = NIL THEN RETURN[TRUE];
       [d.itemReady, d.item] ← d.source[TRUE, NIL];
       RETURN[NOT d.itemReady]
       END;
 
 ClientConsumerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    BEGIN ENABLE UNWIND => NULL; 
    d: StreamData ← NARROW[self.data];
    IF d.source # NIL
     THEN {d.stop[FALSE, NIL];
	   d.source ← NIL};
    END;
    
   
 
 
 -- co-forked producer client code
  
 MakeCoForkedProducer: PROCEDURE[producer: PROC[REF ANY, RefAnyStream.Handle], data: REF ANY] RETURNS[genItem: RefAnySource, stopItems: RefAnyStopper] =
    BEGIN
    sink: RefAnySink ← CoFork.CoForkMe[];
    d: StreamData ← NEW[StreamDataBody ← [ , NIL, NIL, FALSE, NIL, sink]];
    sinkStream: RefAnyStream.Handle;
    IF NOT (sink[FALSE, NIL]).continue THEN d.sink ← NIL;
    sinkStream ← NEW[RefAnyStream.Object ← [procs: CoForkedProducerStreamProcs, data: d]];
    producer[data, sinkStream];
    END;
 
 CreateCoForkedProducerStream: PUBLIC PROC[producer: PROC[self: REF ANY, sink: RefAnyStream.Handle--for use by the co-forked producer--], data: REF ANY]
         RETURNS[RefAnyStream.Handle--for use by the client consumer--] =
    BEGIN
    source: RefAnySource;
    stop: RefAnyStopper;
    d: StreamData;
    sourceStream: RefAnyStream.Handle;
    [source, stop] ← MakeCoForkedProducer[producer, data];
    d ← NEW[StreamDataBody ← [ , source, stop, FALSE, NIL, NIL]];
    sourceStream ← NEW[RefAnyStream.Object ← [procs: ClientConsumerStreamProcs, data: d]];
    RETURN[sourceStream];
    END;
    
    
 
-- RefAnyStream implementation seen by a client RefAny producer
 
 
 ClientProducerStreamProcs: REF RefAnyStream.ObjectProcs ← NEW[RefAnyStream.ObjectProcs ← [get: ClientProducerGet, put: ClientProducerPut, putBack:  ClientProducerPutBack, flush: ClientProducerFlush, close: ClientProducerClose, endOf: ClientProducerEndOf, empty: ErrProc]];
 
  
 ClientProducerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
    {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
 
 ClientProducerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    BEGIN ENABLE UNWIND => NULL; 
    d: StreamData ← NARROW[self.data];
    IF d.source # NIL THEN {IF NOT (d.source[TRUE, event]).more THEN d.source ← NIL} ELSE ERROR Error[StreamClosed];
    END;
 
 ClientProducerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
 
 ClientProducerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
 
 ClientProducerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    BEGIN ENABLE UNWIND => NULL; 
    d: StreamData ← NARROW[self.data];
    IF d.source # NIL
     THEN {d.stop[FALSE, NIL];
	   d.source ← NIL};
    END;
    
 ClientProducerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
    BEGIN ENABLE UNWIND => NULL; 
    d: StreamData ← NARROW[self.data];
    RETURN[d.source=NIL];
    END;
    
    
    
 --  RefAnyStream implementation seen by a co-forked consumer
 
  
 CoForkedConsumerStreamProcs: REF RefAnyStream.ObjectProcs ← NEW[RefAnyStream.ObjectProcs ← [get: CoForkedConsumerGet, put: CoForkedConsumerPut, putBack: CoForkedConsumerPutBack, flush: CoForkedConsumerFlush, close: CoForkedConsumerClose, endOf: CoForkedConsumerEndOf, empty: ErrProc]];
 
 CoForkedConsumerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
    BEGIN ENABLE UNWIND => NULL; 
    d: StreamData ← NARROW[self.data];
    IF d.sink = NIL THEN ERROR Error[StreamClosed];
    IF d.itemReady THEN {d.itemReady ← FALSE; RETURN[d.item]};
    [d.itemReady, d.item] ← d.sink[TRUE, NIL];
    IF d.itemReady
     THEN {d.itemReady ← FALSE; RETURN[d.item]}
     ELSE {d.sink ← NIL; ERROR Error[StreamClosed]};
    END;
 
 CoForkedConsumerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
 
 CoForkedConsumerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; -- not implemented
 
 CoForkedConsumerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
    
 CoForkedConsumerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
       BEGIN ENABLE UNWIND => NULL; 
       d: StreamData ← NARROW[self.data];
       IF d.itemReady THEN RETURN[FALSE];
       IF d.sink = NIL THEN RETURN[TRUE];
       [d.itemReady, d.item] ← d.source[TRUE, NIL];
       IF NOT d.itemReady THEN d.sink ← NIL;
       RETURN[NOT d.itemReady]
       END;
 
 CoForkedConsumerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    BEGIN ENABLE UNWIND => NULL; 
    d: StreamData ← NARROW[self.data];
    d.sink ← NIL;
    END;
    
   
  -- co-forked consumer client code
  
 MakeCoForkedConsumer: PROCEDURE[consumer: PROC[REF ANY, RefAnyStream.Handle], data: REF ANY] RETURNS[genItem: RefAnySource, stopItems: RefAnyStopper] =
    BEGIN
    sink: RefAnySink ← CoFork.CoForkMe[];
    d: StreamData ← NEW[StreamDataBody ← [ , NIL, NIL, FALSE, NIL, sink]];
    sinkStream: RefAnyStream.Handle;
    IF NOT (sink[FALSE, NIL]).continue THEN d.sink ← NIL;
    sinkStream ← NEW[RefAnyStream.Object ← [procs: CoForkedConsumerStreamProcs, data: d]];
    consumer[data, sinkStream];
    END;
 
 CreateCoForkedConsumerStream: PUBLIC PROC[consumer: PROC[self: REF ANY, source: RefAnyStream.Handle--for use by the co-forked consumer--], data: REF ANY]
         RETURNS[RefAnyStream.Handle--for use by the client producer--] =
    BEGIN
    source: RefAnySource;
    stop: RefAnyStopper;
    d: StreamData;
    sourceStream: RefAnyStream.Handle;
    [source, stop] ← MakeCoForkedConsumer[consumer, data];
    d ← NEW[StreamDataBody ← [ , source, stop, FALSE, NIL, NIL]];
    sourceStream ← NEW[RefAnyStream.Object ← [procs: ClientProducerStreamProcs, data: d]];
    RETURN[sourceStream];
    END;
    

 
       
 END.
 
 -- 16-Dec-81 17:26:53: start PortRealEventStreamImpl, begin formulating how it will work
 -- 17-Dec-81 16:46:46: basic code compiles.  Yet to do: 1) monitor entry procedures, 2) define an interface and export to it, 3) finalization code to arrange to call stop (and nill out the pointer) (or call destroy) 4) opposite direction streams.
 -- 21-Dec-81 13:24:26: now exports PortRealEventStream
 -- 21-Dec-81 13:42:11: add object monitor.  UGH, since monitor lock expression must be uniform over all entry procedures, and since I did not want to split into more than one file, I have had to combine the two type stream object records, leading to waste space.  UGH
 -- 21-Dec-81 14:36:32 made Howard upset about Cedar and its performance. Converted to RefAnyStream. PDR
 -- 21-Dec-81 15:39:32: renamed all things to be RefAnyFoo rather than RealEventFoo
 -- 21-Dec-81 16:52:59: modified ports to pass shut down signals (out of band) in both directions.
 -- 21-Dec-81 17:55:48: renamed module to RefAnyStreamImpl. Renamed XXX to CreateGeneratorDrivenStream, exported to RefAnyStream. Made PortRealEventStream go away. Put in calls on Error in place of ERROR. PDR
 -- 22-Dec-81 11:30:41: confusion on shutdown indication, removed continue field, and will use sink=NIL as signal for consumer requested termination.
 -- 22-Dec-81 12:24:01: rename generator to be producer, and add consumer driven stream (opposite sense from first version)
 -- 22-Dec-81 14:29:53: change nomenclature so as to distinguish producer/consumer, coforked/client, etc issues