RefAnyStreamImpl.mesa
Jlarson, July 10, 1985 4:24:53 pm PDT
DIRECTORY
RefAnyStream USING[Handle, ObjectProcs, ErrorCode];
RefAnyStreamImpl: MONITOR LOCKS NARROW[self.data, StreamData] USING self: RefAnyStream.Handle
EXPORTS RefAnyStream =
BEGIN
nomenclature
source, stopper are associated with the client
producer puts ref-anys
consumer gets ref-anys
client signals end by calling stopper.
the producer will put ref-anys to the source
errors
Error: PUBLIC ERROR [ec: RefAnyStream.ErrorCode] = CODE;
port declarations, must follow a precise pattern specified by CoFork
aItems go from client to co-fork, bItems go from co-fork to client
RefAnySource: TYPE = POINTER TO PORT[continue: BOOLEAN, aItem: REF ANY] RETURNS[more: BOOLEAN, bItem: REF ANY]; -- used by client
RefAnyStopper: TYPE = PROCEDURE[continue: BOOLEAN, aItem: REF ANY]; -- used by client
the following record type is used by all stream types so that I can compile all in one module, so that the locks expression is uniform for all entry procedures (ugh!!!) note that some fields are unused in each case.
StreamData: TYPE = REF StreamDataBody;
StreamDataBody: TYPE = MONITORED RECORD[
following fields are used in the client stream
source: RefAnySource,
stop: RefAnyStopper,
following fields are used by the consumer
itemReady: BOOLEAN,
item: REF ANY];
ErrProc: PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
{ERROR Error[NotImplementedForThisStream]};
RefAnyStream implementation seen by a client consumer
ClientConsumerStreamProcs: REF RefAnyStream.ObjectProcs ← NEW[RefAnyStream.ObjectProcs ← [get: ClientConsumerGet, put: ClientConsumerPut, putBack: ClientConsumerPutBack, flush: ClientConsumerFlush, close: ClientConsumerClose, endOf: ClientConsumerEndOf, empty: ErrProc]];
ClientConsumerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
BEGIN ENABLE UNWIND => NULL;
d: StreamData ← NARROW[self.data];
IF d.source = NIL THEN ERROR Error[StreamClosed];
IF d.itemReady THEN {d.itemReady ← FALSE; RETURN[d.item]};
[d.itemReady, d.item] ← d.source[TRUE, NIL];
IF d.itemReady
THEN {d.itemReady ← FALSE; RETURN[d.item]}
ELSE {d.stop[FALSE, NIL];
d.source ← NIL;
ERROR Error[StreamClosed]};
END;
ClientConsumerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
{ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
ClientConsumerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
{ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; -- not implemented
ClientConsumerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
{ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
ClientConsumerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
BEGIN ENABLE UNWIND => NULL;
d: StreamData ← NARROW[self.data];
IF d.itemReady THEN RETURN[FALSE];
IF d.source = NIL THEN RETURN[TRUE];
[d.itemReady, d.item] ← d.source[TRUE, NIL];
RETURN[NOT d.itemReady]
END;
ClientConsumerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
BEGIN ENABLE UNWIND => NULL;
d: StreamData ← NARROW[self.data];
IF d.source # NIL
THEN {d.stop[FALSE, NIL];
d.source ← NIL};
END;
RefAnyStream implementation seen by a client RefAny producer
ClientProducerStreamProcs: REF RefAnyStream.ObjectProcs ← NEW[RefAnyStream.ObjectProcs ← [get: ClientProducerGet, put: ClientProducerPut, putBack: ClientProducerPutBack, flush: ClientProducerFlush, close: ClientProducerClose, endOf: ClientProducerEndOf, empty: ErrProc]];
ClientProducerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
{ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
ClientProducerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
BEGIN ENABLE UNWIND => NULL;
d: StreamData ← NARROW[self.data];
IF d.source # NIL THEN {IF NOT (d.source[TRUE, event]).more THEN d.source ← NIL} ELSE ERROR Error[StreamClosed];
END;
ClientProducerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
{ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
ClientProducerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
{ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};
ClientProducerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
BEGIN ENABLE UNWIND => NULL;
d: StreamData ← NARROW[self.data];
IF d.source # NIL
THEN {d.stop[FALSE, NIL];
d.source ← NIL};
END;
ClientProducerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
BEGIN ENABLE UNWIND => NULL;
d: StreamData ← NARROW[self.data];
RETURN[d.source=NIL];
END;
END.