-- RefAnyStreamImpl.mesa -- Last Modified On 22-Dec-81 14:30:06 By Sturgis -- Last Modified On 21-Jan-82 12:02:44 By Paul Rovner DIRECTORY CoFork USING[CoForkMe], RefAnyStream USING[Handle, Object, ObjectProcs, ErrorCode]; RefAnyStreamImpl: MONITOR LOCKS NARROW[self.data, StreamData] USING self: RefAnyStream.Handle IMPORTS CoFork EXPORTS RefAnyStream = BEGIN -- nomenclature -- sink is associated with the co-forked procedure -- source, stopper are associated with the client -- producer puts ref-anys -- consumer gets ref-anys -- co-fork refers to the "co-forked" call stack, this side signals end by returning -- client refers to the non "co-forked" code, this side signals end by calling stopper. -- if the producer is co-forked, then the producer will put ref-anys to the sink -- it the producer is the client, then the producer will put ref-anys to the source -- errors Error: PUBLIC ERROR [ec: RefAnyStream.ErrorCode] = CODE; -- port declarations, must follow a precise pattern specified by CoFork -- aItems go from client to co-fork, bItems go from co-fork to client RefAnySource: TYPE = POINTER TO PORT[continue: BOOLEAN, aItem: REF ANY] RETURNS[more: BOOLEAN, bItem: REF ANY]; -- used by client RefAnySink: TYPE = POINTER TO PORT[more: BOOLEAN, bItem: REF ANY] RETURNS[continue: BOOLEAN, aItem: REF ANY]; -- used by co-fork RefAnyStopper: TYPE = PROCEDURE[continue: BOOLEAN, aItem: REF ANY]; -- used by client -- the following record type is used by all stream types so that I can compile all in one module, so that the locks expression is uniform for all entry procedures (ugh!!!) note that some fields are unused in each case. StreamData: TYPE = REF StreamDataBody; StreamDataBody: TYPE = MONITORED RECORD[ -- following fields are used in the client stream source: RefAnySource, stop: RefAnyStopper, -- following fields are used by the consumer itemReady: BOOLEAN, item: REF ANY, -- following fields are used in the co-forked stream sink: RefAnySink]; -- RefAnyStream implementation seen by a co-forked RefAny producer CoForkedProducerStreamProcs: REF RefAnyStream.ObjectProcs _ NEW[RefAnyStream.ObjectProcs _ [get: CoForkedProducerGet, put: CoForkedProducerPut, putBack: CoForkedProducerPutBack, flush: CoForkedProducerFlush, close: CoForkedProducerClose, endOf: CoForkedProducerEndOf, empty: ErrProc]]; ErrProc: PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] = {ERROR Error[NotImplementedForThisStream]}; CoForkedProducerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] = {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; CoForkedProducerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] = BEGIN ENABLE UNWIND => NULL; d: StreamData _ NARROW[self.data]; IF d.sink # NIL THEN {IF NOT (d.sink[TRUE, event]).continue THEN d.sink _ NIL} ELSE ERROR Error[StreamClosed]; END; CoForkedProducerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] = {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; CoForkedProducerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] = {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; CoForkedProducerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] = BEGIN ENABLE UNWIND => NULL; d: StreamData _ NARROW[self.data]; d.sink _ NIL; END; CoForkedProducerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] = BEGIN ENABLE UNWIND => NULL; d: StreamData _ NARROW[self.data]; RETURN[d.sink=NIL]; END; -- RefAnyStream implementation seen by a client consumer ClientConsumerStreamProcs: REF RefAnyStream.ObjectProcs _ NEW[RefAnyStream.ObjectProcs _ [get: ClientConsumerGet, put: ClientConsumerPut, putBack: ClientConsumerPutBack, flush: ClientConsumerFlush, close: ClientConsumerClose, endOf: ClientConsumerEndOf, empty: ErrProc]]; ClientConsumerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] = BEGIN ENABLE UNWIND => NULL; d: StreamData _ NARROW[self.data]; IF d.source = NIL THEN ERROR Error[StreamClosed]; IF d.itemReady THEN {d.itemReady _ FALSE; RETURN[d.item]}; [d.itemReady, d.item] _ d.source[TRUE, NIL]; IF d.itemReady THEN {d.itemReady _ FALSE; RETURN[d.item]} ELSE {d.stop[FALSE, NIL]; d.source _ NIL; ERROR Error[StreamClosed]}; END; ClientConsumerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] = {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; ClientConsumerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] = {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; -- not implemented ClientConsumerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] = {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; ClientConsumerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] = BEGIN ENABLE UNWIND => NULL; d: StreamData _ NARROW[self.data]; IF d.itemReady THEN RETURN[FALSE]; IF d.source = NIL THEN RETURN[TRUE]; [d.itemReady, d.item] _ d.source[TRUE, NIL]; RETURN[NOT d.itemReady] END; ClientConsumerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] = BEGIN ENABLE UNWIND => NULL; d: StreamData _ NARROW[self.data]; IF d.source # NIL THEN {d.stop[FALSE, NIL]; d.source _ NIL}; END; -- co-forked producer client code MakeCoForkedProducer: PROCEDURE[producer: PROC[REF ANY, RefAnyStream.Handle], data: REF ANY] RETURNS[genItem: RefAnySource, stopItems: RefAnyStopper] = BEGIN sink: RefAnySink _ CoFork.CoForkMe[]; d: StreamData _ NEW[StreamDataBody _ [ , NIL, NIL, FALSE, NIL, sink]]; sinkStream: RefAnyStream.Handle; IF NOT (sink[FALSE, NIL]).continue THEN d.sink _ NIL; sinkStream _ NEW[RefAnyStream.Object _ [procs: CoForkedProducerStreamProcs, data: d]]; producer[data, sinkStream]; END; CreateCoForkedProducerStream: PUBLIC PROC[producer: PROC[self: REF ANY, sink: RefAnyStream.Handle--for use by the co-forked producer--], data: REF ANY] RETURNS[RefAnyStream.Handle--for use by the client consumer--] = BEGIN source: RefAnySource; stop: RefAnyStopper; d: StreamData; sourceStream: RefAnyStream.Handle; [source, stop] _ MakeCoForkedProducer[producer, data]; d _ NEW[StreamDataBody _ [ , source, stop, FALSE, NIL, NIL]]; sourceStream _ NEW[RefAnyStream.Object _ [procs: ClientConsumerStreamProcs, data: d]]; RETURN[sourceStream]; END; -- RefAnyStream implementation seen by a client RefAny producer ClientProducerStreamProcs: REF RefAnyStream.ObjectProcs _ NEW[RefAnyStream.ObjectProcs _ [get: ClientProducerGet, put: ClientProducerPut, putBack: ClientProducerPutBack, flush: ClientProducerFlush, close: ClientProducerClose, endOf: ClientProducerEndOf, empty: ErrProc]]; ClientProducerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] = {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; ClientProducerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] = BEGIN ENABLE UNWIND => NULL; d: StreamData _ NARROW[self.data]; IF d.source # NIL THEN {IF NOT (d.source[TRUE, event]).more THEN d.source _ NIL} ELSE ERROR Error[StreamClosed]; END; ClientProducerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] = {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; ClientProducerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] = {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; ClientProducerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] = BEGIN ENABLE UNWIND => NULL; d: StreamData _ NARROW[self.data]; IF d.source # NIL THEN {d.stop[FALSE, NIL]; d.source _ NIL}; END; ClientProducerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] = BEGIN ENABLE UNWIND => NULL; d: StreamData _ NARROW[self.data]; RETURN[d.source=NIL]; END; -- RefAnyStream implementation seen by a co-forked consumer CoForkedConsumerStreamProcs: REF RefAnyStream.ObjectProcs _ NEW[RefAnyStream.ObjectProcs _ [get: CoForkedConsumerGet, put: CoForkedConsumerPut, putBack: CoForkedConsumerPutBack, flush: CoForkedConsumerFlush, close: CoForkedConsumerClose, endOf: CoForkedConsumerEndOf, empty: ErrProc]]; CoForkedConsumerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] = BEGIN ENABLE UNWIND => NULL; d: StreamData _ NARROW[self.data]; IF d.sink = NIL THEN ERROR Error[StreamClosed]; IF d.itemReady THEN {d.itemReady _ FALSE; RETURN[d.item]}; [d.itemReady, d.item] _ d.sink[TRUE, NIL]; IF d.itemReady THEN {d.itemReady _ FALSE; RETURN[d.item]} ELSE {d.sink _ NIL; ERROR Error[StreamClosed]}; END; CoForkedConsumerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] = {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; CoForkedConsumerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] = {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; -- not implemented CoForkedConsumerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] = {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; CoForkedConsumerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] = BEGIN ENABLE UNWIND => NULL; d: StreamData _ NARROW[self.data]; IF d.itemReady THEN RETURN[FALSE]; IF d.sink = NIL THEN RETURN[TRUE]; [d.itemReady, d.item] _ d.source[TRUE, NIL]; IF NOT d.itemReady THEN d.sink _ NIL; RETURN[NOT d.itemReady] END; CoForkedConsumerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] = BEGIN ENABLE UNWIND => NULL; d: StreamData _ NARROW[self.data]; d.sink _ NIL; END; -- co-forked consumer client code MakeCoForkedConsumer: PROCEDURE[consumer: PROC[REF ANY, RefAnyStream.Handle], data: REF ANY] RETURNS[genItem: RefAnySource, stopItems: RefAnyStopper] = BEGIN sink: RefAnySink _ CoFork.CoForkMe[]; d: StreamData _ NEW[StreamDataBody _ [ , NIL, NIL, FALSE, NIL, sink]]; sinkStream: RefAnyStream.Handle; IF NOT (sink[FALSE, NIL]).continue THEN d.sink _ NIL; sinkStream _ NEW[RefAnyStream.Object _ [procs: CoForkedConsumerStreamProcs, data: d]]; consumer[data, sinkStream]; END; CreateCoForkedConsumerStream: PUBLIC PROC[consumer: PROC[self: REF ANY, source: RefAnyStream.Handle--for use by the co-forked consumer--], data: REF ANY] RETURNS[RefAnyStream.Handle--for use by the client producer--] = BEGIN source: RefAnySource; stop: RefAnyStopper; d: StreamData; sourceStream: RefAnyStream.Handle; [source, stop] _ MakeCoForkedConsumer[consumer, data]; d _ NEW[StreamDataBody _ [ , source, stop, FALSE, NIL, NIL]]; sourceStream _ NEW[RefAnyStream.Object _ [procs: ClientProducerStreamProcs, data: d]]; RETURN[sourceStream]; END; END. -- 16-Dec-81 17:26:53: start PortRealEventStreamImpl, begin formulating how it will work -- 17-Dec-81 16:46:46: basic code compiles. Yet to do: 1) monitor entry procedures, 2) define an interface and export to it, 3) finalization code to arrange to call stop (and nill out the pointer) (or call destroy) 4) opposite direction streams. -- 21-Dec-81 13:24:26: now exports PortRealEventStream -- 21-Dec-81 13:42:11: add object monitor. UGH, since monitor lock expression must be uniform over all entry procedures, and since I did not want to split into more than one file, I have had to combine the two type stream object records, leading to waste space. UGH -- 21-Dec-81 14:36:32 made Howard upset about Cedar and its performance. Converted to RefAnyStream. PDR -- 21-Dec-81 15:39:32: renamed all things to be RefAnyFoo rather than RealEventFoo -- 21-Dec-81 16:52:59: modified ports to pass shut down signals (out of band) in both directions. -- 21-Dec-81 17:55:48: renamed module to RefAnyStreamImpl. Renamed XXX to CreateGeneratorDrivenStream, exported to RefAnyStream. Made PortRealEventStream go away. Put in calls on Error in place of ERROR. PDR -- 22-Dec-81 11:30:41: confusion on shutdown indication, removed continue field, and will use sink=NIL as signal for consumer requested termination. -- 22-Dec-81 12:24:01: rename generator to be producer, and add consumer driven stream (opposite sense from first version) -- 22-Dec-81 14:29:53: change nomenclature so as to distinguish producer/consumer, coforked/client, etc issues