-- RefAnyStreamImpl.mesa
-- Jlarson, July 10, 1985 4:24:53 pm PDT
--
-- Nomenclature:
--   source, stopper are associated with the client
--   producer puts ref-anys
--   consumer gets ref-anys
--   client signals end by calling stopper.
--
-- This module exports two RefAnyStream procedure tables:
--   ClientConsumerStreamProcs: get/endOf/close are implemented, the rest raise
--     Error[NotImplementedForThisStream].
--   ClientProducerStreamProcs: put/endOf/close are implemented, the rest raise
--     Error[NotImplementedForThisStream].
-- Every entry procedure locks the monitored record found in self.data.

DIRECTORY
  RefAnyStream USING[Handle, ObjectProcs, ErrorCode];

RefAnyStreamImpl: MONITOR LOCKS NARROW[self.data, StreamData] USING self: RefAnyStream.Handle
  EXPORTS RefAnyStream =
  BEGIN

  -- Raised by every stream operation in this module; ec says why.
  Error: PUBLIC ERROR [ec: RefAnyStream.ErrorCode] = CODE;

  -- Port through which items are exchanged with the other party.
  -- Arguments are (continue, aItem); results are (more, bItem).
  RefAnySource: TYPE = POINTER TO PORT[continue: BOOLEAN, aItem: REF ANY] RETURNS[more: BOOLEAN, bItem: REF ANY]; -- used by client
  -- Procedure called with continue = FALSE to tell the other party to stop.
  RefAnyStopper: TYPE = PROCEDURE[continue: BOOLEAN, aItem: REF ANY]; -- used by client

  StreamData: TYPE = REF StreamDataBody;

  -- One record type serves both stream flavours so that a single LOCKS
  -- expression works for every entry procedure in the module.
  StreamDataBody: TYPE = MONITORED RECORD[
    source: RefAnySource,   -- NIL once the stream is closed or exhausted
    stop: RefAnyStopper,    -- called with [FALSE, NIL] when shutting down
    itemReady: BOOLEAN,     -- consumer only: TRUE while item holds a look-ahead value
    item: REF ANY];         -- consumer only: value fetched ahead by EndOf

  -- Shared "not implemented" stub, installed as the empty procedure in both tables.
  ErrProc: PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
    {ERROR Error[NotImplementedForThisStream]};


  -- RefAnyStream implementation seen by a client consumer

  ClientConsumerStreamProcs: REF RefAnyStream.ObjectProcs _ NEW[RefAnyStream.ObjectProcs _
    [get: ClientConsumerGet, put: ClientConsumerPut, putBack: ClientConsumerPutBack,
     flush: ClientConsumerFlush, close: ClientConsumerClose, endOf: ClientConsumerEndOf,
     empty: ErrProc]];

  -- Returns the next item of the stream.
  -- Raises Error[StreamClosed] if the stream is already closed, or if the
  -- source reports that no more items exist; in the latter case the stopper
  -- is called and the stream is marked closed before the error is raised.
  ClientConsumerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
    BEGIN ENABLE UNWIND => NULL;
    d: StreamData _ NARROW[self.data];
    IF d.source = NIL THEN ERROR Error[StreamClosed];
    -- An item fetched ahead by EndOf is delivered first.
    IF d.itemReady THEN {d.itemReady _ FALSE; RETURN[d.item]};
    [d.itemReady, d.item] _ d.source[TRUE, NIL];
    IF d.itemReady
      THEN {d.itemReady _ FALSE; RETURN[d.item]}
      ELSE {d.stop[FALSE, NIL]; d.source _ NIL; ERROR Error[StreamClosed]};
    END;

  -- Not supported on a consumer stream.
  ClientConsumerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};

  ClientConsumerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]}; -- not implemented

  -- Not supported on a consumer stream.
  ClientConsumerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};

  -- Returns TRUE when no further item can be obtained with Get.
  -- May fetch one item ahead from the source; that item is kept in the
  -- stream data and handed out by the next Get.
  ClientConsumerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
    BEGIN ENABLE UNWIND => NULL;
    d: StreamData _ NARROW[self.data];
    -- The closed test comes first, in the same order as in Get, so that a
    -- closed stream never claims to have data that Get would refuse to deliver.
    IF d.source = NIL THEN RETURN[TRUE];
    IF d.itemReady THEN RETURN[FALSE];
    [d.itemReady, d.item] _ d.source[TRUE, NIL];
    -- NOTE(review): when the source answers more = FALSE here, the stopper is
    -- not called and source is left non-NIL, unlike the same case in Get.
    -- Whether the port may be resumed again after that is defined by CoFork;
    -- confirm before changing.
    RETURN[NOT d.itemReady]
    END;

  -- Closes the stream: calls the stopper if the stream is still open and
  -- discards any item fetched ahead by EndOf. Closing twice is harmless.
  ClientConsumerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    BEGIN ENABLE UNWIND => NULL;
    d: StreamData _ NARROW[self.data];
    IF d.source # NIL THEN {d.stop[FALSE, NIL]; d.source _ NIL};
    -- Drop the look-ahead item so a closed stream holds no stale reference.
    d.itemReady _ FALSE;
    d.item _ NIL;
    END;


  -- RefAnyStream implementation seen by a client RefAny producer

  ClientProducerStreamProcs: REF RefAnyStream.ObjectProcs _ NEW[RefAnyStream.ObjectProcs _
    [get: ClientProducerGet, put: ClientProducerPut, putBack: ClientProducerPutBack,
     flush: ClientProducerFlush, close: ClientProducerClose, endOf: ClientProducerEndOf,
     empty: ErrProc]];

  -- Not supported on a producer stream.
  ClientProducerGet: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[REF ANY] =
    {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};

  -- Passes event through the source port.
  -- If the reply says more = FALSE the stream is marked closed, so later
  -- puts raise Error[StreamClosed]. Raises Error[StreamClosed] immediately
  -- if the stream is already closed.
  -- NOTE(review): the stopper is not called on this path; confirm against
  -- the CoFork protocol whether that is intended.
  ClientProducerPut: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    BEGIN ENABLE UNWIND => NULL;
    d: StreamData _ NARROW[self.data];
    IF d.source # NIL
      THEN {IF NOT (d.source[TRUE, event]).more THEN d.source _ NIL}
      ELSE ERROR Error[StreamClosed];
    END;

  -- Not supported on a producer stream.
  ClientProducerPutBack: ENTRY PROCEDURE[self: RefAnyStream.Handle, event: REF ANY] =
    {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};

  -- Not supported on a producer stream.
  ClientProducerFlush: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    {ENABLE UNWIND => NULL; ERROR Error[NotImplementedForThisStream]};

  -- Closes the stream: calls the stopper if the stream is still open.
  -- Closing twice is harmless.
  ClientProducerClose: ENTRY PROCEDURE[self: RefAnyStream.Handle] =
    BEGIN ENABLE UNWIND => NULL;
    d: StreamData _ NARROW[self.data];
    IF d.source # NIL THEN {d.stop[FALSE, NIL]; d.source _ NIL};
    END;

  -- Returns TRUE once the stream has been closed (source is NIL).
  ClientProducerEndOf: ENTRY PROCEDURE[self: RefAnyStream.Handle] RETURNS[BOOLEAN] =
    BEGIN ENABLE UNWIND => NULL;
    d: StreamData _ NARROW[self.data];
    RETURN[d.source=NIL];
    END;

  END.
the producer will put ref-anys to the source errors port declarations, must follow a precise pattern specified by CoFork aItems go from client to co-fork, bItems go from co-fork to client the following record type is used by all stream types so that I can compile all in one module, so that the locks expression is uniform for all entry procedures (ugh!!!) note that some fields are unused in each case. following fields are used in the client stream following fields are used by the consumer RefAnyStream implementation seen by a client consumer RefAnyStream implementation seen by a client RefAny producer Ê1˜šœ™Icode™%—J™J˜šÏk ˜ Jšœ œ!˜3J˜—š œœœœœ˜^Jšœ˜Jš˜J˜šœ ™ Jšœ.™.Jšœ™Jšœ™J˜Jšœ&™&Jšœ,™,J˜——Jšœ™Jšœœœ œ˜8J˜JšœD™D˜JšœB™BJ˜J˜Jšœœœœœ œ œœœœ œœÏc˜Jš Ïn œœ œ œ œœž˜UJ˜JšœÙ™ÙJ˜Jšœ œœ˜&J˜šœœ œœ˜(šœ.™.J˜J˜J˜—šœ)™)Jšœ œ˜Jšœœœ˜J˜šŸœ œœœ˜@Jšœœ%˜+J˜——J˜—Jšœ5™5J˜J˜JšœœœÒ˜J˜š Ÿœœ œœœœ˜PJšœœœœ˜Jšœœ ˜"Jšœ œœœ˜1Jšœ œœœ ˜:Jšœ!œœ˜,šœ ˜Jšœœœ ˜*šœ œœ˜Jšœ œ˜Jšœ˜——Jšœ˜J˜—š Ÿœœ œ#œœ˜OJš œœœœœ%˜BJ˜—š Ÿœœ œ#œœ˜SJš œœœœœ&ž˜UJ˜—šŸœœ œ˜AJš œœœœœ%˜BJ˜—š Ÿœœ œœœ˜RJšœœœœ˜Jšœœ ˜"Jšœ œœœ˜"Jš œ œœœœ˜$Jšœ!œœ˜,Jšœœ ˜Jšœ˜J˜—šŸœœ œ˜AJšœœœœ˜Jšœœ ˜"šœ ˜šœ œœ˜Jšœ œ˜——Jšœ˜J˜J˜——Jšœ<™