-- aItems go from client to co-fork; bItems go from co-fork to client
RefAnySource: TYPE = POINTER TO PORT[continue: BOOLEAN, aItem: REF ANY] RETURNS[more: BOOLEAN, bItem: REF ANY]; -- used by client
RefAnySink: TYPE = POINTER TO PORT[more: BOOLEAN, bItem: REF ANY] RETURNS[continue: BOOLEAN, aItem: REF ANY]; -- used by co-fork
RefAnyStopper: TYPE = PROCEDURE[continue: BOOLEAN, aItem: REF ANY]; -- used by client
-- The following record type is shared by all stream types so that everything compiles in one module and the locks expression is uniform for all entry procedures (ugh!!!). Note that some fields are unused in each case.
StreamData: TYPE = REF StreamDataBody;
StreamDataBody: TYPE = MONITORED RECORD[
  -- the following fields are used in the client stream
  source: RefAnySource,
  stop: RefAnyStopper,
  -- the following fields are used by the consumer
  itemReady: BOOLEAN,
  item: REF ANY,
  -- the following field is used in the co-forked stream
  sink: RefAnySink];
-- RefAnyStream implementation seen by a co-forked RefAny producer
CoForkedProducerStreamProcs: REF RefAnyStream.ObjectProcs ← NEW[RefAnyStream.ObjectProcs ← [get: CoForkedProducerGet, put: CoForkedProducerPut, putBack: CoForkedProducerPutBack, flush: CoForkedProducerFlush, close: CoForkedProducerClose, endOf: CoForkedProducerEndOf, empty: ErrProc]];
ErrProc: PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
  {ERROR Error[NotImplementedForThisStream]};
CoForkedProducerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
  {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
CoForkedProducerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
  BEGIN ENABLE UNWIND => NULL;
  d: StreamData ← NARROW[self.data];
  IF d.sink # NIL THEN {IF NOT (d.sink[TRUE, event]).continue THEN d.sink ← NIL} ELSE ERROR Error[StreamClosed];
  END;
CoForkedProducerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
  {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
CoForkedProducerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
  {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
CoForkedProducerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
  BEGIN ENABLE UNWIND => NULL;
  d: StreamData ← NARROW[self.data];
  d.sink ← NIL;
  END;
CoForkedProducerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
  BEGIN ENABLE UNWIND => NULL;
  d: StreamData ← NARROW[self.data];
  RETURN[d.sink=NIL];
  END;
-- RefAnyStream implementation seen by a client consumer
ClientConsumerStreamProcs: REF RefAnyStream.ObjectProcs ← NEW[RefAnyStream.ObjectProcs ← [get: ClientConsumerGet, put: ClientConsumerPut, putBack: ClientConsumerPutBack, flush: ClientConsumerFlush, close: ClientConsumerClose, endOf: ClientConsumerEndOf, empty: ErrProc]];
ClientConsumerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
  BEGIN ENABLE UNWIND => NULL;
  d: StreamData ← NARROW[self.data];
  IF d.source = NIL THEN ERROR Error[StreamClosed];
  IF d.itemReady THEN {d.itemReady ← FALSE; RETURN[d.item]};
  [d.itemReady, d.item] ← d.source[TRUE, NIL];
  IF d.itemReady
    THEN {d.itemReady ← FALSE; RETURN[d.item]}
    ELSE {d.stop[FALSE, NIL]; d.source ← NIL; ERROR Error[StreamClosed]};
  END;
ClientConsumerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
  {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
ClientConsumerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
  {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; -- not implemented
ClientConsumerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
  {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
ClientConsumerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
  BEGIN ENABLE UNWIND => NULL;
  d: StreamData ← NARROW[self.data];
  IF d.itemReady THEN RETURN[FALSE];
  IF d.source = NIL THEN RETURN[TRUE];
  [d.itemReady, d.item] ← d.source[TRUE, NIL];
  RETURN[NOT d.itemReady]
  END;
ClientConsumerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
  BEGIN ENABLE UNWIND => NULL;
  d: StreamData ← NARROW[self.data];
  IF d.source # NIL THEN {d.stop[FALSE, NIL]; d.source ← NIL};
  END;
-- co-forked producer client code
MakeCoForkedProducer: PROCEDURE[producer: PROC[REF ANY, RefAnyStream.Handle], data: REF ANY] RETURNS[genItem: RefAnySource, stopItems: RefAnyStopper] =
  BEGIN
  sink: RefAnySink ← CoFork.CoForkMe[];
  d: StreamData ← NEW[StreamDataBody ← [ , NIL, NIL, FALSE, NIL, sink]];
  sinkStream: RefAnyStream.Handle;
  IF NOT (sink[FALSE, NIL]).continue THEN d.sink ← NIL;
  sinkStream ← NEW[RefAnyStream.Object ← [procs: CoForkedProducerStreamProcs, data: d]];
  producer[data, sinkStream];
  END;
CreateCoForkedProducerStream: PUBLIC PROC[producer: PROC[self: REF ANY, sink: RefAnyStream.Handle --for use by the co-forked producer--], data: REF ANY] RETURNS[RefAnyStream.Handle --for use by the client consumer--] =
  BEGIN
  source: RefAnySource;
  stop: RefAnyStopper;
  d: StreamData;
  sourceStream: RefAnyStream.Handle;
  [source, stop] ← MakeCoForkedProducer[producer, data];
  d ← NEW[StreamDataBody ← [ , source, stop, FALSE, NIL, NIL]];
  sourceStream ← NEW[RefAnyStream.Object ← [procs: ClientConsumerStreamProcs, data: d]];
  RETURN[sourceStream];
  END;
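-- Usage sketch only, not part of the original module. TwoItemProducer and DrainExample are
-- hypothetical names. The sketch assumes that stream operations are invoked through the
-- procs field of a RefAnyStream.Handle, as the Object constructors above suggest, and that
-- the client sees end-of-stream once the producer closes its sink and returns. How that
-- hand-off happens depends on CoFork, which is not shown in this file.
--
-- TwoItemProducer: PROC[self: REF ANY, sink: RefAnyStream.Handle] =
--   BEGIN
--   THROUGH [0..2) DO
--     IF sink.procs.endOf[sink] THEN EXIT; -- the consumer asked for termination
--     sink.procs.put[sink, self]; -- hands one item to the client side
--     ENDLOOP;
--   sink.procs.close[sink];
--   END;
--
-- DrainExample: PROC =
--   BEGIN
--   item: REF ANY;
--   s: RefAnyStream.Handle ← CreateCoForkedProducerStream[TwoItemProducer, NIL];
--   UNTIL s.procs.endOf[s] DO item ← s.procs.get[s]; ENDLOOP; -- endOf prefetches, so get returns the buffered item
--   s.procs.close[s];
--   END;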
-- RefAnyStream implementation seen by a client producer
ClientProducerStreamProcs: REF RefAnyStream.ObjectProcs ← NEW[RefAnyStream.ObjectProcs ← [get: ClientProducerGet, put: ClientProducerPut, putBack: ClientProducerPutBack, flush: ClientProducerFlush, close: ClientProducerClose, endOf: ClientProducerEndOf, empty: ErrProc]];
ClientProducerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
  {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
ClientProducerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
  BEGIN ENABLE UNWIND => NULL;
  d: StreamData ← NARROW[self.data];
  IF d.source # NIL THEN {IF NOT (d.source[TRUE, event]).more THEN d.source ← NIL} ELSE ERROR Error[StreamClosed];
  END;
ClientProducerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
  {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
ClientProducerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
  {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
ClientProducerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
  BEGIN ENABLE UNWIND => NULL;
  d: StreamData ← NARROW[self.data];
  IF d.source # NIL THEN {d.stop[FALSE, NIL]; d.source ← NIL};
  END;
ClientProducerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
  BEGIN ENABLE UNWIND => NULL;
  d: StreamData ← NARROW[self.data];
  RETURN[d.source=NIL];
  END;
-- RefAnyStream implementation seen by a co-forked consumer
CoForkedConsumerStreamProcs: REF RefAnyStream.ObjectProcs ← NEW[RefAnyStream.ObjectProcs ← [get: CoForkedConsumerGet, put: CoForkedConsumerPut, putBack: CoForkedConsumerPutBack, flush: CoForkedConsumerFlush, close: CoForkedConsumerClose, endOf: CoForkedConsumerEndOf, empty: ErrProc]];
CoForkedConsumerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
  BEGIN ENABLE UNWIND => NULL;
  d: StreamData ← NARROW[self.data];
  IF d.sink = NIL THEN ERROR Error[StreamClosed];
  IF d.itemReady THEN {d.itemReady ← FALSE; RETURN[d.item]};
  [d.itemReady, d.item] ← d.sink[TRUE, NIL];
  IF d.itemReady
    THEN {d.itemReady ← FALSE; RETURN[d.item]}
    ELSE {d.sink ← NIL; ERROR Error[StreamClosed]};
  END;
CoForkedConsumerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
  {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
CoForkedConsumerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
  {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; -- not implemented
CoForkedConsumerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
  {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
CoForkedConsumerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
  BEGIN ENABLE UNWIND => NULL;
  d: StreamData ← NARROW[self.data];
  IF d.itemReady THEN RETURN[FALSE];
  IF d.sink = NIL THEN RETURN[TRUE];
  [d.itemReady, d.item] ← d.sink[TRUE, NIL]; -- the co-forked side holds only sink; source is NIL in this record
  IF NOT d.itemReady THEN d.sink ← NIL;
  RETURN[NOT d.itemReady]
  END;
CoForkedConsumerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
  BEGIN ENABLE UNWIND => NULL;
  d: StreamData ← NARROW[self.data];
  d.sink ← NIL;
  END;
-- co-forked consumer client code
MakeCoForkedConsumer: PROCEDURE[consumer: PROC[REF ANY, RefAnyStream.Handle], data: REF ANY] RETURNS[genItem: RefAnySource, stopItems: RefAnyStopper] =
  BEGIN
  sink: RefAnySink ← CoFork.CoForkMe[];
  d: StreamData ← NEW[StreamDataBody ← [ , NIL, NIL, FALSE, NIL, sink]];
  sinkStream: RefAnyStream.Handle;
  IF NOT (sink[FALSE, NIL]).continue THEN d.sink ← NIL;
  sinkStream ← NEW[RefAnyStream.Object ← [procs: CoForkedConsumerStreamProcs, data: d]];
  consumer[data, sinkStream];
  END;
CreateCoForkedConsumerStream: PUBLIC PROC[consumer: PROC[self: REF ANY, source: RefAnyStream.Handle --for use by the co-forked consumer--], data: REF ANY] RETURNS[RefAnyStream.Handle --for use by the client producer--] =
  BEGIN
  source: RefAnySource;
  stop: RefAnyStopper;
  d: StreamData;
  sourceStream: RefAnyStream.Handle;
  [source, stop] ← MakeCoForkedConsumer[consumer, data];
  d ← NEW[StreamDataBody ← [ , source, stop, FALSE, NIL, NIL]];
  sourceStream ← NEW[RefAnyStream.Object ← [procs: ClientProducerStreamProcs, data: d]];
  RETURN[sourceStream];
  END;
END.
16-Dec-81 17:26:53: start PortRealEventStreamImpl, begin formulating how it will work
17-Dec-81 16:46:46: basic code compiles. Yet to do: 1) monitor entry procedures, 2) define an interface and export to it, 3) finalization code to arrange to call stop (and nil out the pointer) (or call destroy), 4) opposite direction streams.
21-Dec-81 13:24:26: now exports PortRealEventStream
21-Dec-81 13:42:11: add object monitor. UGH, since the monitor lock expression must be uniform over all entry procedures, and since I did not want to split into more than one file, I have had to combine the two stream object record types, leading to wasted space. UGH
21-Dec-81 14:36:32 made Howard upset about Cedar and its performance. Converted to RefAnyStream. PDR
21-Dec-81 15:39:32: renamed all things to be RefAnyFoo rather than RealEventFoo
21-Dec-81 16:52:59: modified ports to pass shut down signals (out of band) in both directions.
21-Dec-81 17:55:48: renamed module to RefAnyStreamImpl. Renamed XXX to CreateGeneratorDrivenStream, exported to RefAnyStream. Made PortRealEventStream go away. Put in calls on Error in place of ERROR. PDR
22-Dec-81 11:30:41: confusion on shutdown indication, removed continue field, and will use sink=NIL as signal for consumer requested termination.
22-Dec-81 12:24:01: rename generator to be producer, and add consumer driven stream (opposite sense from first version)
22-Dec-81 14:29:53: change nomenclature so as to distinguish producer/consumer, coforked/client, etc issues