-- RefAnyStreamImpl
-- Implements two families of RefAnyStream procedures whose far end is client code reached through a PORT:
--   ClientConsumer*: the stream user calls get/endOf, items are pulled from the port.
--   ClientProducer*: the stream user calls put, items are pushed into the port.
-- NOTE(review): the original header and section comments were lost (only empty placeholders remained);
-- the comments below describe what the visible code does and should be checked against the interface.

DIRECTORY
  RefAnyStream USING[Handle, ObjectProcs, ErrorCode];

RefAnyStreamImpl: MONITOR
  LOCKS NARROW[self.data, StreamData] USING self: RefAnyStream.Handle
  EXPORTS RefAnyStream =
BEGIN

-- Every ENTRY procedure below locks the StreamDataBody reached through self.data (see the LOCKS clause).
-- Errors are raised from ENTRY procedures with RETURN WITH ERROR so that the monitor lock is released
-- before the caller's catch phrase runs; a handler may then safely call back into the same stream.

-- Errors

Error: PUBLIC ERROR [ec: RefAnyStream.ErrorCode] = CODE;

-- Types

-- Port through which one item is exchanged per transfer of control.
RefAnySource: TYPE = POINTER TO PORT[continue: BOOLEAN, aItem: REF ANY]
  RETURNS[more: BOOLEAN, bItem: REF ANY]; -- used by client

-- Same argument record as the port, but called as a procedure with no results.
-- It is only ever invoked here as stop[FALSE, NIL] when a stream is shut down.
RefAnyStopper: TYPE = PROCEDURE[continue: BOOLEAN, aItem: REF ANY]; -- used by client

-- Per-stream state; it is also the monitor lock for the stream.
StreamData: TYPE = REF StreamDataBody;
StreamDataBody: TYPE = MONITORED RECORD[
  -- connection to the client; source = NIL means the stream is closed
  source: RefAnySource,
  stop: RefAnyStopper,
  -- one-item look-ahead filled by ClientConsumerEndOf and drained by ClientConsumerGet
  itemReady: BOOLEAN,
  item: REF ANY];

-- Shared body for operations no stream in this module supports. Not an ENTRY procedure, so a plain ERROR is fine.
ErrProc: PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
  {ERROR Error[NotImplementedForThisStream]};

-- Client-consumer streams: get and endOf pull items from the source port.

ClientConsumerStreamProcs: REF RefAnyStream.ObjectProcs _ NEW[RefAnyStream.ObjectProcs _ [
  get: ClientConsumerGet,
  put: ClientConsumerPut,
  putBack: ClientConsumerPutBack,
  flush: ClientConsumerFlush,
  close: ClientConsumerClose,
  endOf: ClientConsumerEndOf,
  empty: ErrProc]];

-- Returns the next item, using the look-ahead item if ClientConsumerEndOf already fetched one.
-- Raises Error[StreamClosed] if the stream is closed or the source reports that no more items exist;
-- in the latter case the source is stopped and the stream is marked closed first.
ClientConsumerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
  BEGIN ENABLE UNWIND => NULL;
  d: StreamData _ NARROW[self.data];
  IF d.source = NIL THEN RETURN WITH ERROR Error[StreamClosed];
  IF d.itemReady THEN {d.itemReady _ FALSE; RETURN[d.item]};
  [d.itemReady, d.item] _ d.source[TRUE, NIL];
  IF d.itemReady THEN {d.itemReady _ FALSE; RETURN[d.item]}
  ELSE {d.stop[FALSE, NIL]; d.source _ NIL; RETURN WITH ERROR Error[StreamClosed]};
  END;

-- Not supported on a consumer stream.
ClientConsumerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
  {ENABLE UNWIND => NULL; RETURN WITH ERROR Error[NotImplementedForThisStream]};

-- Not implemented.
ClientConsumerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
  {ENABLE UNWIND => NULL; RETURN WITH ERROR Error[NotImplementedForThisStream]};

-- Not supported on a consumer stream.
ClientConsumerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
  {ENABLE UNWIND => NULL; RETURN WITH ERROR Error[NotImplementedForThisStream]};

-- Returns TRUE when no further item can be obtained from the stream.
-- May fetch one item from the source and hold it as look-ahead for the next ClientConsumerGet.
ClientConsumerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
  BEGIN ENABLE UNWIND => NULL;
  d: StreamData _ NARROW[self.data];
  -- Test for closed first, in the same order as ClientConsumerGet, so that endOf never
  -- answers FALSE for a stream on which get would raise StreamClosed.
  IF d.source = NIL THEN RETURN[TRUE];
  IF d.itemReady THEN RETURN[FALSE];
  [d.itemReady, d.item] _ d.source[TRUE, NIL];
  IF d.itemReady THEN RETURN[FALSE];
  -- The source reported no more items. Shut it down exactly as ClientConsumerGet does, so that a
  -- repeated endOf or a later get does not resume the exhausted source with continue = TRUE.
  -- NOTE(review): assumes stop[FALSE, NIL] is the required final transfer after more = FALSE,
  -- as ClientConsumerGet implies; confirm against the client side of the port.
  d.stop[FALSE, NIL];
  d.source _ NIL;
  RETURN[TRUE];
  END;

-- Stops the source if the stream is still open and marks the stream closed. Closing twice is a no-op.
ClientConsumerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
  BEGIN ENABLE UNWIND => NULL;
  d: StreamData _ NARROW[self.data];
  IF d.source # NIL THEN {d.stop[FALSE, NIL]; d.source _ NIL};
  END;

-- Client-producer streams: put pushes items into the source port.

ClientProducerStreamProcs: REF RefAnyStream.ObjectProcs _ NEW[RefAnyStream.ObjectProcs _ [
  get: ClientProducerGet,
  put: ClientProducerPut,
  putBack: ClientProducerPutBack,
  flush: ClientProducerFlush,
  close: ClientProducerClose,
  endOf: ClientProducerEndOf,
  empty: ErrProc]];

-- Not supported on a producer stream.
ClientProducerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
  {ENABLE UNWIND => NULL; RETURN WITH ERROR Error[NotImplementedForThisStream]};

-- Hands event to the client through the port. If the client answers more = FALSE the stream is
-- marked closed (stop is not called on this path). Raises Error[StreamClosed] if already closed.
ClientProducerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
  BEGIN ENABLE UNWIND => NULL;
  d: StreamData _ NARROW[self.data];
  IF d.source # NIL
    THEN {IF NOT (d.source[TRUE, event]).more THEN d.source _ NIL}
    ELSE RETURN WITH ERROR Error[StreamClosed];
  END;

-- Not supported on a producer stream.
ClientProducerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
  {ENABLE UNWIND => NULL; RETURN WITH ERROR Error[NotImplementedForThisStream]};

-- Not supported on a producer stream.
ClientProducerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
  {ENABLE UNWIND => NULL; RETURN WITH ERROR Error[NotImplementedForThisStream]};

-- Stops the client if the stream is still open and marks the stream closed. Closing twice is a no-op.
ClientProducerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
  BEGIN ENABLE UNWIND => NULL;
  d: StreamData _ NARROW[self.data];
  IF d.source # NIL THEN {d.stop[FALSE, NIL]; d.source _ NIL};
  END;

-- Returns TRUE exactly when the stream has been closed, by ClientProducerClose or by the client
-- answering more = FALSE to a put.
ClientProducerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
  BEGIN ENABLE UNWIND => NULL;
  d: StreamData _ NARROW[self.data];
  RETURN[d.source = NIL];
  END;

END.