RefAnyStreamImpl.mesa
Last Modified On 22-Dec-81 14:30:06 By Sturgis
Last Modified On 21-Jan-82 12:02:44 By Paul Rovner
DIRECTORY
CoFork USING[CoForkMe],
RefAnyStream USING[Handle, Object, ObjectProcs, ErrorCode];
RefAnyStreamImpl: MONITOR LOCKS NARROW[self.data, StreamData] USING self: RefAnyStream.Handle
IMPORTS CoFork
EXPORTS RefAnyStream =
BEGIN
nomenclature
sink is associated with the co-forked procedure
source, stopper are associated with the client
producer puts ref-anys
consumer gets ref-anys
co-fork refers to the "co-forked" call stack, this side signals end by returning
client refers to the non "co-forked" code, this side signals end by calling stopper.
if the producer is co-forked, then the producer will put ref-anys to the sink
it the producer is the client, then the producer will put ref-anys to the source
errors
Error: PUBLIC ERROR [ec: RefAnyStream.ErrorCode] = CODE;
port declarations, must follow a precise pattern specified by CoFork
aItems go from client to co-fork, bItems go from co-fork to client
RefAnySource: TYPE = POINTER TO PORT[continue: BOOLEAN, aItem: REF ANY] RETURNS[more: BOOLEAN, bItem: REF ANY]; -- used by client
RefAnySink: TYPE = POINTER TO PORT[more: BOOLEAN, bItem: REF ANY] RETURNS[continue: BOOLEAN, aItem: REF ANY]; -- used by co-fork
RefAnyStopper: TYPE = PROCEDURE[continue: BOOLEAN, aItem: REF ANY]; -- used by client
the following record type is used by all stream types so that I can compile all in one module, so that the locks expression is uniform for all entry procedures (ugh!!!) note that some fields are unused in each case.
StreamData: TYPE = REF StreamDataBody;
StreamDataBody: TYPE = MONITORED RECORD[
following fields are used in the client stream
source: RefAnySource,
stop: RefAnyStopper,
following fields are used by the consumer
itemReady: BOOLEAN,
item: REF ANY,
following fields are used in the co-forked stream
sink: RefAnySink];
RefAnyStream implementation seen by a co-forked RefAny producer
CoForkedProducerStreamProcs: REF RefAnyStream.ObjectProcs ← NEW[RefAnyStream.ObjectProcs ← [get: CoForkedProducerGet, put: CoForkedProducerPut, putBack: CoForkedProducerPutBack, flush: CoForkedProducerFlush, close: CoForkedProducerClose, endOf: CoForkedProducerEndOf, empty: ErrProc]];
ErrProc: PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
{ERROR Error[NotImplementedForThisStream]};
CoForkedProducerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
{ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
CoForkedProducerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
BEGIN ENABLE UNWIND => NULL;
d: StreamData ← NARROW[self.data];
IF d.sink # NIL THEN {IF NOT (d.sink[TRUE, event]).continue THEN d.sink ← NIL} ELSE ERROR Error[StreamClosed];
END;
CoForkedProducerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
{ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
CoForkedProducerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
{ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
CoForkedProducerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
BEGIN ENABLE UNWIND => NULL;
d: StreamData ← NARROW[self.data];
d.sink ← NIL;
END;
CoForkedProducerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
BEGIN ENABLE UNWIND => NULL;
d: StreamData ← NARROW[self.data];
RETURN[d.sink=NIL];
END;
RefAnyStream implementation seen by a client consumer
ClientConsumerStreamProcs: REF RefAnyStream.ObjectProcs ← NEW[RefAnyStream.ObjectProcs ← [get: ClientConsumerGet, put: ClientConsumerPut, putBack: ClientConsumerPutBack, flush: ClientConsumerFlush, close: ClientConsumerClose, endOf: ClientConsumerEndOf, empty: ErrProc]];
ClientConsumerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
BEGIN ENABLE UNWIND => NULL;
d: StreamData ← NARROW[self.data];
IF d.source = NIL THEN ERROR Error[StreamClosed];
IF d.itemReady THEN {d.itemReady ← FALSE; RETURN[d.item]};
[d.itemReady, d.item] ← d.source[TRUE, NIL];
IF d.itemReady
THEN {d.itemReady ← FALSE; RETURN[d.item]}
ELSE {d.stop[FALSE, NIL];
d.source ← NIL;
ERROR Error[StreamClosed]};
END;
ClientConsumerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
{ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
ClientConsumerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
{ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; -- not implemented
ClientConsumerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
{ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
ClientConsumerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
BEGIN ENABLE UNWIND => NULL;
d: StreamData ← NARROW[self.data];
IF d.itemReady THEN RETURN[FALSE];
IF d.source = NIL THEN RETURN[TRUE];
[d.itemReady, d.item] ← d.source[TRUE, NIL];
RETURN[NOT d.itemReady]
END;
ClientConsumerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
BEGIN ENABLE UNWIND => NULL;
d: StreamData ← NARROW[self.data];
IF d.source # NIL
THEN {d.stop[FALSE, NIL];
d.source ← NIL};
END;
co-forked producer client code
MakeCoForkedProducer: PROCEDURE[producer: PROC[REF ANY, RefAnyStream.Handle], data: REF ANY] RETURNS[genItem: RefAnySource, stopItems: RefAnyStopper] =
BEGIN
sink: RefAnySink ← CoFork.CoForkMe[];
d: StreamData ← NEW[StreamDataBody ← [ , NIL, NIL, FALSE, NIL, sink]];
sinkStream: RefAnyStream.Handle;
IF NOT (sink[FALSE, NIL]).continue THEN d.sink ← NIL;
sinkStream ← NEW[RefAnyStream.Object ← [procs: CoForkedProducerStreamProcs, data: d]];
producer[data, sinkStream];
END;
CreateCoForkedProducerStream: PUBLIC PROC[producer: PROC[self: REF ANY, sink: RefAnyStream.Handle--for use by the co-forked producer--], data: REF ANY]
RETURNS[RefAnyStream.Handle--for use by the client consumer--] =
BEGIN
source: RefAnySource;
stop: RefAnyStopper;
d: StreamData;
sourceStream: RefAnyStream.Handle;
[source, stop] ← MakeCoForkedProducer[producer, data];
d ← NEW[StreamDataBody ← [ , source, stop, FALSE, NIL, NIL]];
sourceStream ← NEW[RefAnyStream.Object ← [procs: ClientConsumerStreamProcs, data: d]];
RETURN[sourceStream];
END;
RefAnyStream implementation seen by a client RefAny producer
ClientProducerStreamProcs: REF RefAnyStream.ObjectProcs ← NEW[RefAnyStream.ObjectProcs ← [get: ClientProducerGet, put: ClientProducerPut, putBack: ClientProducerPutBack, flush: ClientProducerFlush, close: ClientProducerClose, endOf: ClientProducerEndOf, empty: ErrProc]];
ClientProducerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
{ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
ClientProducerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
BEGIN ENABLE UNWIND => NULL;
d: StreamData ← NARROW[self.data];
IF d.source # NIL THEN {IF NOT (d.source[TRUE, event]).more THEN d.source ← NIL} ELSE ERROR Error[StreamClosed];
END;
ClientProducerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
{ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
ClientProducerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
{ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
ClientProducerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
BEGIN ENABLE UNWIND => NULL;
d: StreamData ← NARROW[self.data];
IF d.source # NIL
THEN {d.stop[FALSE, NIL];
d.source ← NIL};
END;
ClientProducerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
BEGIN ENABLE UNWIND => NULL;
d: StreamData ← NARROW[self.data];
RETURN[d.source=NIL];
END;
RefAnyStream implementation seen by a co-forked consumer
CoForkedConsumerStreamProcs: REF RefAnyStream.ObjectProcs ← NEW[RefAnyStream.ObjectProcs ← [get: CoForkedConsumerGet, put: CoForkedConsumerPut, putBack: CoForkedConsumerPutBack, flush: CoForkedConsumerFlush, close: CoForkedConsumerClose, endOf: CoForkedConsumerEndOf, empty: ErrProc]];
CoForkedConsumerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
BEGIN ENABLE UNWIND => NULL;
d: StreamData ← NARROW[self.data];
IF d.sink = NIL THEN ERROR Error[StreamClosed];
IF d.itemReady THEN {d.itemReady ← FALSE; RETURN[d.item]};
[d.itemReady, d.item] ← d.sink[TRUE, NIL];
IF d.itemReady
THEN {d.itemReady ← FALSE; RETURN[d.item]}
ELSE {d.sink ← NIL; ERROR Error[StreamClosed]};
END;
CoForkedConsumerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
{ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
CoForkedConsumerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
{ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; -- not implemented
CoForkedConsumerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
{ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
CoForkedConsumerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
BEGIN ENABLE UNWIND => NULL;
d: StreamData ← NARROW[self.data];
IF d.itemReady THEN RETURN[FALSE];
IF d.sink = NIL THEN RETURN[TRUE];
[d.itemReady, d.item] ← d.source[TRUE, NIL];
IF NOT d.itemReady THEN d.sink ← NIL;
RETURN[NOT d.itemReady]
END;
CoForkedConsumerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
BEGIN ENABLE UNWIND => NULL;
d: StreamData ← NARROW[self.data];
d.sink ← NIL;
END;
co-forked consumer client code
MakeCoForkedConsumer: PROCEDURE[consumer: PROC[REF ANY, RefAnyStream.Handle], data: REF ANY] RETURNS[genItem: RefAnySource, stopItems: RefAnyStopper] =
BEGIN
sink: RefAnySink ← CoFork.CoForkMe[];
d: StreamData ← NEW[StreamDataBody ← [ , NIL, NIL, FALSE, NIL, sink]];
sinkStream: RefAnyStream.Handle;
IF NOT (sink[FALSE, NIL]).continue THEN d.sink ← NIL;
sinkStream ← NEW[RefAnyStream.Object ← [procs: CoForkedConsumerStreamProcs, data: d]];
consumer[data, sinkStream];
END;
CreateCoForkedConsumerStream: PUBLIC PROC[consumer: PROC[self: REF ANY, source: RefAnyStream.Handle--for use by the co-forked consumer--], data: REF ANY]
RETURNS[RefAnyStream.Handle--for use by the client producer--] =
BEGIN
source: RefAnySource;
stop: RefAnyStopper;
d: StreamData;
sourceStream: RefAnyStream.Handle;
[source, stop] ← MakeCoForkedConsumer[consumer, data];
d ← NEW[StreamDataBody ← [ , source, stop, FALSE, NIL, NIL]];
sourceStream ← NEW[RefAnyStream.Object ← [procs: ClientProducerStreamProcs, data: d]];
RETURN[sourceStream];
END;
END.
16-Dec-81 17:26:53: start PortRealEventStreamImpl, begin formulating how it will work
17-Dec-81 16:46:46: basic code compiles. Yet to do: 1) monitor entry procedures, 2) define an interface and export to it, 3) finalization code to arrange to call stop (and nill out the pointer) (or call destroy) 4) opposite direction streams.
21-Dec-81 13:24:26: now exports PortRealEventStream
21-Dec-81 13:42:11: add object monitor. UGH, since monitor lock expression must be uniform over all entry procedures, and since I did not want to split into more than one file, I have had to combine the two type stream object records, leading to waste space. UGH
21-Dec-81 14:36:32 made Howard upset about Cedar and its performance. Converted to RefAnyStream. PDR
21-Dec-81 15:39:32: renamed all things to be RefAnyFoo rather than RealEventFoo
21-Dec-81 16:52:59: modified ports to pass shut down signals (out of band) in both directions.
21-Dec-81 17:55:48: renamed module to RefAnyStreamImpl. Renamed XXX to CreateGeneratorDrivenStream, exported to RefAnyStream. Made PortRealEventStream go away. Put in calls on Error in place of ERROR. PDR
22-Dec-81 11:30:41: confusion on shutdown indication, removed continue field, and will use sink=NIL as signal for consumer requested termination.
22-Dec-81 12:24:01: rename generator to be producer, and add consumer driven stream (opposite sense from first version)
22-Dec-81 14:29:53: change nomenclature so as to distinguish producer/consumer, coforked/client, etc issues