-- RefAnyStreamImpl.mesa
-- Last Modified On 22-Dec-81 14:30:06 By Sturgis
-- Last Modified On 21-Jan-82 12:02:44 By Paul Rovner

-- Implements the two stream constructors exported to RefAnyStream:
--   CreateCoForkedProducerStream: a co-forked producer feeds a client consumer
--   CreateCoForkedConsumerStream: a client producer feeds a co-forked consumer
-- Each constructor yields a pair of stream objects (one per side) that talk
-- to each other through a CoFork port.

DIRECTORY
  CoFork USING[CoForkMe],
  RefAnyStream USING[Handle, Object, ObjectProcs, ErrorCode];

RefAnyStreamImpl: MONITOR LOCKS NARROW[self.data, StreamData] USING self: RefAnyStream.Handle
  IMPORTS CoFork
  EXPORTS RefAnyStream =
  BEGIN

  -- nomenclature
  --   sink is associated with the co-forked procedure
  --   source, stopper are associated with the client
  --
  --   producer puts ref-anys
  --   consumer gets ref-anys
  --
  --   co-fork refers to the "co-forked" call stack, this side signals end by returning
  --   client refers to the non "co-forked" code, this side signals end by calling stopper.
  --
  --   if the producer is co-forked, then the producer will put ref-anys to the sink
  --   if the producer is the client, then the producer will put ref-anys to the source

  -- errors

  Error: PUBLIC ERROR [ec: RefAnyStream.ErrorCode] = CODE;

  -- port declarations, must follow a precise pattern specified by CoFork
  -- aItems go from client to co-fork, bItems go from co-fork to client

  RefAnySource: TYPE = POINTER TO PORT[continue: BOOLEAN, aItem: REF ANY] RETURNS[more: BOOLEAN, bItem: REF ANY]; -- used by client
  RefAnySink: TYPE = POINTER TO PORT[more: BOOLEAN, bItem: REF ANY] RETURNS[continue: BOOLEAN, aItem: REF ANY]; -- used by co-fork
  RefAnyStopper: TYPE = PROCEDURE[continue: BOOLEAN, aItem: REF ANY]; -- used by client

  -- the following record type is used by all stream types so that I can compile
  -- all in one module, so that the locks expression is uniform for all entry
  -- procedures (ugh!!!)
  -- note that some fields are unused in each case.

  StreamData: TYPE = REF StreamDataBody;

  StreamDataBody: TYPE = MONITORED RECORD[
    -- following fields are used in the client stream
    source: RefAnySource,
    stop: RefAnyStopper,
    -- following fields are used by the consumer
    itemReady: BOOLEAN, -- TRUE while item holds a value fetched ahead by an EndOf call
    item: REF ANY,
    -- following fields are used in the co-forked stream
    sink: RefAnySink];


  -- RefAnyStream implementation seen by a co-forked RefAny producer
  -- (put-only stream; d.sink = NIL marks the stream as closed)

  CoForkedProducerStreamProcs: REF RefAnyStream.ObjectProcs _ NEW[RefAnyStream.ObjectProcs _ [
    get: CoForkedProducerGet,
    put: CoForkedProducerPut,
    putBack: CoForkedProducerPutBack,
    flush: CoForkedProducerFlush,
    close: CoForkedProducerClose,
    endOf: CoForkedProducerEndOf,
    empty: ErrProc]];

  -- Shared "empty" operation for every procs record in this module: always
  -- raises Error[NotImplementedForThisStream].
  ErrProc: PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
    {ERROR Error[NotImplementedForThisStream]};

  -- not implemented: a producer-side stream cannot be read
  CoForkedProducerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
    {ENABLE UNWIND => NULL;
    ERROR Error[NotImplementedForThisStream]};

  -- Hands event to the client through the sink port. If the client answers
  -- continue = FALSE the sink is dropped, so later puts raise Error[StreamClosed].
  CoForkedProducerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    BEGIN ENABLE UNWIND => NULL;
    d: StreamData _ NARROW[self.data];
    IF d.sink # NIL
      THEN {IF NOT (d.sink[TRUE, event]).continue THEN d.sink _ NIL}
      ELSE ERROR Error[StreamClosed];
    END;

  -- not implemented
  CoForkedProducerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    {ENABLE UNWIND => NULL;
    ERROR Error[NotImplementedForThisStream]};

  -- not implemented
  CoForkedProducerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    {ENABLE UNWIND => NULL;
    ERROR Error[NotImplementedForThisStream]};

  -- Marks the stream closed by dropping the sink; no port call is made.
  CoForkedProducerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    BEGIN ENABLE UNWIND => NULL;
    d: StreamData _ NARROW[self.data];
    d.sink _ NIL;
    END;

  -- TRUE once the sink has been dropped (by Close or by a refused Put).
  CoForkedProducerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
    BEGIN ENABLE UNWIND => NULL;
    d: StreamData _ NARROW[self.data];
    RETURN[d.sink=NIL];
    END;


  -- RefAnyStream implementation seen by a client consumer
  -- (get-only stream; d.source = NIL marks the stream as closed)

  ClientConsumerStreamProcs: REF RefAnyStream.ObjectProcs _ NEW[RefAnyStream.ObjectProcs _ [
    get: ClientConsumerGet,
    put: ClientConsumerPut,
    putBack: ClientConsumerPutBack,
    flush: ClientConsumerFlush,
    close: ClientConsumerClose,
    endOf: ClientConsumerEndOf,
    empty: ErrProc]];

  -- Returns the next item: first any item buffered by ClientConsumerEndOf,
  -- otherwise one freshly requested through the source port. When the port
  -- reports no more items, calls the stopper, closes the stream and raises
  -- Error[StreamClosed]; a stream that is already closed raises it at once.
  ClientConsumerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
    BEGIN ENABLE UNWIND => NULL;
    d: StreamData _ NARROW[self.data];
    IF d.source = NIL THEN ERROR Error[StreamClosed];
    IF d.itemReady THEN {d.itemReady _ FALSE; RETURN[d.item]};
    [d.itemReady, d.item] _ d.source[TRUE, NIL];
    IF d.itemReady
      THEN {d.itemReady _ FALSE; RETURN[d.item]}
      ELSE {d.stop[FALSE, NIL]; d.source _ NIL; ERROR Error[StreamClosed]};
    END;

  -- not implemented: a consumer-side stream cannot be written
  ClientConsumerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    {ENABLE UNWIND => NULL;
    ERROR Error[NotImplementedForThisStream]};

  ClientConsumerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    {ENABLE UNWIND => NULL;
    ERROR Error[NotImplementedForThisStream]}; -- not implemented

  -- not implemented
  ClientConsumerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    {ENABLE UNWIND => NULL;
    ERROR Error[NotImplementedForThisStream]};

  -- Reports whether the stream is exhausted. To find out it may fetch one item
  -- ahead through the source port; that item stays in d.item (with d.itemReady
  -- set) and is handed out by the next ClientConsumerGet.
  -- NOTE(review): unlike ClientConsumerGet, reaching the end here neither calls
  -- d.stop nor clears d.source, so a later Get or EndOf calls the port again.
  -- Confirm against the CoFork protocol whether that is intended.
  ClientConsumerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
    BEGIN ENABLE UNWIND => NULL;
    d: StreamData _ NARROW[self.data];
    IF d.itemReady THEN RETURN[FALSE];
    IF d.source = NIL THEN RETURN[TRUE];
    [d.itemReady, d.item] _ d.source[TRUE, NIL];
    RETURN[NOT d.itemReady]
    END;

  -- Closes the stream: calls the stopper and drops the source. A second Close
  -- is a no-op because the source is already NIL.
  ClientConsumerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    BEGIN ENABLE UNWIND => NULL;
    d: StreamData _ NARROW[self.data];
    IF d.source # NIL
      THEN {d.stop[FALSE, NIL]; d.source _ NIL};
    END;


  -- co-forked producer

  -- Obtains the sink port from CoFork.CoForkMe, wraps it in a put-only stream
  -- and runs producer[data, sinkStream]. The first sink[FALSE, NIL] call is the
  -- initial handshake; if it answers continue = FALSE the producer is started on
  -- an already closed stream.
  -- genItem and stopItems are never assigned in this body; presumably CoForkMe
  -- supplies them to the caller (confirm against the CoFork interface).
  MakeCoForkedProducer: PROCEDURE[producer: PROC[REF ANY, RefAnyStream.Handle], data: REF ANY]
    RETURNS[genItem: RefAnySource, stopItems: RefAnyStopper] =
    BEGIN
    sink: RefAnySink _ CoFork.CoForkMe[];
    d: StreamData _ NEW[StreamDataBody _ [ , NIL, NIL, FALSE, NIL, sink]];
    sinkStream: RefAnyStream.Handle;
    IF NOT (sink[FALSE, NIL]).continue THEN d.sink _ NIL;
    sinkStream _ NEW[RefAnyStream.Object _ [procs: CoForkedProducerStreamProcs, data: d]];
    producer[data, sinkStream];
    END;

  -- client code

  -- Starts producer as a co-forked procedure (it receives data and a put-only
  -- sink stream) and returns the get-only stream on which the client reads
  -- what the producer puts.
  CreateCoForkedProducerStream: PUBLIC PROC[producer: PROC[self: REF ANY, sink: RefAnyStream.Handle--for use by the co-forked producer--], data: REF ANY]
    RETURNS[RefAnyStream.Handle--for use by the client consumer--] =
    BEGIN
    source: RefAnySource;
    stop: RefAnyStopper;
    d: StreamData;
    sourceStream: RefAnyStream.Handle;
    [source, stop] _ MakeCoForkedProducer[producer, data];
    d _ NEW[StreamDataBody _ [ , source, stop, FALSE, NIL, NIL]];
    sourceStream _ NEW[RefAnyStream.Object _ [procs: ClientConsumerStreamProcs, data: d]];
    RETURN[sourceStream];
    END;


  -- RefAnyStream implementation seen by a client RefAny producer
  -- (put-only stream; d.source = NIL marks the stream as closed)

  ClientProducerStreamProcs: REF RefAnyStream.ObjectProcs _ NEW[RefAnyStream.ObjectProcs _ [
    get: ClientProducerGet,
    put: ClientProducerPut,
    putBack: ClientProducerPutBack,
    flush: ClientProducerFlush,
    close: ClientProducerClose,
    endOf: ClientProducerEndOf,
    empty: ErrProc]];

  -- not implemented: a producer-side stream cannot be read
  ClientProducerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
    {ENABLE UNWIND => NULL;
    ERROR Error[NotImplementedForThisStream]};

  -- Hands event to the co-forked consumer through the source port. If the
  -- port answers more = FALSE the source is dropped, so later puts raise
  -- Error[StreamClosed].
  -- NOTE(review): the stopper is not called on this path, and because the
  -- source is cleared ClientProducerClose will not call it either. Confirm
  -- whether that is intended.
  ClientProducerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    BEGIN ENABLE UNWIND => NULL;
    d: StreamData _ NARROW[self.data];
    IF d.source # NIL
      THEN {IF NOT (d.source[TRUE, event]).more THEN d.source _ NIL}
      ELSE ERROR Error[StreamClosed];
    END;

  -- not implemented
  ClientProducerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    {ENABLE UNWIND => NULL;
    ERROR Error[NotImplementedForThisStream]};

  -- not implemented
  ClientProducerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    {ENABLE UNWIND => NULL;
    ERROR Error[NotImplementedForThisStream]};

  -- Closes the stream: calls the stopper and drops the source. A second Close
  -- is a no-op because the source is already NIL.
  ClientProducerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    BEGIN ENABLE UNWIND => NULL;
    d: StreamData _ NARROW[self.data];
    IF d.source # NIL
      THEN {d.stop[FALSE, NIL]; d.source _ NIL};
    END;

  -- TRUE once the source has been dropped (by Close or by a refused Put).
  ClientProducerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
    BEGIN ENABLE UNWIND => NULL;
    d: StreamData _ NARROW[self.data];
    RETURN[d.source=NIL];
    END;


  -- RefAnyStream implementation seen by a co-forked consumer
  -- (get-only stream; d.sink = NIL marks the stream as closed)

  CoForkedConsumerStreamProcs: REF RefAnyStream.ObjectProcs _ NEW[RefAnyStream.ObjectProcs _ [
    get: CoForkedConsumerGet,
    put: CoForkedConsumerPut,
    putBack: CoForkedConsumerPutBack,
    flush: CoForkedConsumerFlush,
    close: CoForkedConsumerClose,
    endOf: CoForkedConsumerEndOf,
    empty: ErrProc]];

  -- Returns the next item: first any item buffered by CoForkedConsumerEndOf,
  -- otherwise one freshly requested through the sink port. When the port
  -- reports that the client has stopped, drops the sink and raises
  -- Error[StreamClosed]; a stream that is already closed raises it at once.
  CoForkedConsumerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
    BEGIN ENABLE UNWIND => NULL;
    d: StreamData _ NARROW[self.data];
    IF d.sink = NIL THEN ERROR Error[StreamClosed];
    IF d.itemReady THEN {d.itemReady _ FALSE; RETURN[d.item]};
    [d.itemReady, d.item] _ d.sink[TRUE, NIL];
    IF d.itemReady
      THEN {d.itemReady _ FALSE; RETURN[d.item]}
      ELSE {d.sink _ NIL; ERROR Error[StreamClosed]};
    END;

  -- not implemented: a consumer-side stream cannot be written
  CoForkedConsumerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    {ENABLE UNWIND => NULL;
    ERROR Error[NotImplementedForThisStream]};

  CoForkedConsumerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    {ENABLE UNWIND => NULL;
    ERROR Error[NotImplementedForThisStream]}; -- not implemented

  -- not implemented
  CoForkedConsumerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    {ENABLE UNWIND => NULL;
    ERROR Error[NotImplementedForThisStream]};

  -- Reports whether the stream is exhausted. To find out it may fetch one item
  -- ahead through the sink port; that item stays in d.item (with d.itemReady
  -- set) and is handed out by the next CoForkedConsumerGet. Reaching the end
  -- drops the sink, so the port is not called again.
  CoForkedConsumerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
    BEGIN ENABLE UNWIND => NULL;
    d: StreamData _ NARROW[self.data];
    IF d.itemReady THEN RETURN[FALSE];
    IF d.sink = NIL THEN RETURN[TRUE];
    -- Must read through d.sink, exactly as CoForkedConsumerGet does: this
    -- stream's record is built by MakeCoForkedConsumer with source = NIL, so
    -- the former d.source[TRUE, NIL] call went through a NIL port.
    [d.itemReady, d.item] _ d.sink[TRUE, NIL];
    IF NOT d.itemReady THEN d.sink _ NIL;
    RETURN[NOT d.itemReady]
    END;

  -- Marks the stream closed by dropping the sink; no port call is made.
  CoForkedConsumerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    BEGIN ENABLE UNWIND => NULL;
    d: StreamData _ NARROW[self.data];
    d.sink _ NIL;
    END;


  -- co-forked consumer

  -- Obtains the sink port from CoFork.CoForkMe, wraps it in a get-only stream
  -- and runs consumer[data, sinkStream]. The first sink[FALSE, NIL] call is the
  -- initial handshake; if it answers continue = FALSE the consumer is started on
  -- an already closed stream.
  -- genItem and stopItems are never assigned in this body; presumably CoForkMe
  -- supplies them to the caller (confirm against the CoFork interface).
  MakeCoForkedConsumer: PROCEDURE[consumer: PROC[REF ANY, RefAnyStream.Handle], data: REF ANY]
    RETURNS[genItem: RefAnySource, stopItems: RefAnyStopper] =
    BEGIN
    sink: RefAnySink _ CoFork.CoForkMe[];
    d: StreamData _ NEW[StreamDataBody _ [ , NIL, NIL, FALSE, NIL, sink]];
    sinkStream: RefAnyStream.Handle;
    IF NOT (sink[FALSE, NIL]).continue THEN d.sink _ NIL;
    sinkStream _ NEW[RefAnyStream.Object _ [procs: CoForkedConsumerStreamProcs, data: d]];
    consumer[data, sinkStream];
    END;

  -- client code

  -- Starts consumer as a co-forked procedure (it receives data and a get-only
  -- source stream) and returns the put-only stream on which the client writes
  -- what the consumer will get.
  CreateCoForkedConsumerStream: PUBLIC PROC[consumer: PROC[self: REF ANY, source: RefAnyStream.Handle--for use by the co-forked consumer--], data: REF ANY]
    RETURNS[RefAnyStream.Handle--for use by the client producer--] =
    BEGIN
    source: RefAnySource;
    stop: RefAnyStopper;
    d: StreamData;
    sourceStream: RefAnyStream.Handle;
    [source, stop] _ MakeCoForkedConsumer[consumer, data];
    d _ NEW[StreamDataBody _ [ , source, stop, FALSE, NIL, NIL]];
    sourceStream _ NEW[RefAnyStream.Object _ [procs: ClientProducerStreamProcs, data: d]];
    RETURN[sourceStream];
    END;

  END.
 ZRefAnyStreamImpl.mesa Last Modified On 22-Dec-81 14:30:06 By Sturgis Last Modified On 21-Jan-82 12:02:44 By Paul Rovner nomenclature sink is associated with the co-forked procedure source, stopper are associated with the client producer puts ref-anys consumer gets ref-anys co-fork refers to the "co-forked" call stack, this side signals end by returning client refers to the non "co-forked" code, this side signals end by calling stopper. if the producer is co-forked, then the producer will put ref-anys to the sink it the producer is the client, then the producer will put ref-anys to the source errors port declarations, must follow a precise pattern specified by CoFork aItems go from client to co-fork, bItems go from co-fork to client the following record type is used by all stream types so that I can compile all in one module, so that the locks expression is uniform for all entry procedures (ugh!!!) note that some fields are unused in each case. 
following fields are used in the client stream following fields are used by the consumer following fields are used in the co-forked stream RefAnyStream implementation seen by a co-forked RefAny producer RefAnyStream implementation seen by a client consumer co-forked producer client code RefAnyStream implementation seen by a client RefAny producer RefAnyStream implementation seen by a co-forked consumer co-forked consumer client code 16-Dec-81 17:26:53: start PortRealEventStreamImpl, begin formulating how it will work 17-Dec-81 16:46:46: basic code compiles. Yet to do: 1) monitor entry procedures, 2) define an interface and export to it, 3) finalization code to arrange to call stop (and nil out the pointer) (or call destroy) 4) opposite direction streams. 21-Dec-81 13:24:26: now exports PortRealEventStream 21-Dec-81 13:42:11: add object monitor. UGH, since monitor lock expression must be uniform over all entry procedures, and since I did not want to split into more than one file, I have had to combine the two types of stream object records, leading to wasted space. UGH 21-Dec-81 14:36:32 made Howard upset about Cedar and its performance. Converted to RefAnyStream. PDR 21-Dec-81 15:39:32: renamed all things to be RefAnyFoo rather than RealEventFoo 21-Dec-81 16:52:59: modified ports to pass shut down signals (out of band) in both directions. 21-Dec-81 17:55:48: renamed module to RefAnyStreamImpl. Renamed XXX to CreateGeneratorDrivenStream, exported to RefAnyStream. Made PortRealEventStream go away. Put in calls on Error in place of ERROR. PDR 22-Dec-81 11:30:41: confusion on shutdown indication, removed continue field, and will use sink=NIL as signal for consumer requested termination. 
22-Dec-81 12:24:01: rename generator to be producer, and add consumer driven stream (opposite sense from first version) 22-Dec-81 14:29:53: change nomenclature so as to distinguish producer/consumer, coforked/client, etc issues Ê e˜Jšœ™Jšœ.™.Jšœ2™2J˜šÏk ˜ Jšœœ ˜Jšœ œ)˜;J˜—š œœœœœ˜^Jšœ˜Jšœ˜Jš˜J˜šœ ™ Jšœ/™/Jšœ.™.J˜Jšœ™Jšœ™J˜JšœP™PJšœT™TJ˜JšœM™MJšœP™PJ˜——Jšœ™Jšœœœ œ˜8J˜JšœD™D˜JšœB™BJ˜J˜Jšœœœœœ œ œœœœ œœÏc˜Jšœ œœœœœ œœœ œ œœž˜€Jš Ïn œœ œ œ œœž˜UJ˜JšœÙ™ÙJ˜Jšœ œœ˜&J˜šœœ œœ˜(šœ.™.J˜J˜J˜—šœ)™)Jšœ œ˜Jšœœœ˜J˜—šœ1™1J˜J˜J˜J˜——Jšœ?™?J˜J˜Jšœœœß˜žJ˜šŸœ œœœ˜@Jšœœ%˜+J˜—š Ÿœœ œœœœ˜RJš œœœœœ%˜BJ˜—š Ÿœœ œ#œœ˜QJšœœœœ˜Jšœœ ˜"Jšœ œœœœ œœ œœœ˜nJšœ˜J˜—š Ÿœœ œ#œœ˜UJš œœœœœ%˜BJ˜—šŸœœ œ˜CJš œœœœœ%˜BJ˜—šŸœœ œ˜CJšœœœœ˜Jšœœ ˜"Jšœ œ˜ Jšœ˜J˜—š Ÿœœ œœœ˜TJšœœœœ˜Jšœœ ˜"Jšœœ˜Jšœ˜J˜J˜J˜—Jšœ5™5J˜J˜JšœœœÒ˜J˜š Ÿœœ œœœœ˜PJšœœœœ˜Jšœœ ˜"Jšœ œœœ˜1Jšœ œœœ ˜:Jšœ!œœ˜,šœ ˜Jšœœœ ˜*šœ œœ˜Jšœ œ˜Jšœ˜——Jšœ˜J˜—š Ÿœœ œ#œœ˜OJš œœœœœ%˜BJ˜—š Ÿœœ œ#œœ˜SJš œœœœœ&ž˜UJ˜—šŸœœ œ˜AJš œœœœœ%˜BJ˜—š Ÿœœ œœœ˜RJšœœœœ˜Jšœœ ˜"Jšœ œœœ˜"Jš œ œœœœ˜$Jšœ!œœ˜,Jšœœ ˜Jšœ˜J˜—šŸœœ œ˜AJšœœœœ˜Jšœœ ˜"šœ ˜šœ œœ˜Jšœ œ˜——Jšœ˜J˜J˜J˜J˜—Jšœ™J˜šŸœ œ œœœœœœ3˜—Jš˜J˜%Jš œœœœœœ ˜FJ˜ Jš œœœœ œ œ˜5Jšœ œF˜VJ˜Jšœ˜J˜—šŸœœœ œœœž%œ œœ˜—Jšœž"œ˜@Jš˜J˜J˜J˜J˜"J˜6Jš œœ$œœœ˜=JšœœD˜VJšœ˜Jšœ˜J˜J˜J˜——Jšœ<™