<> <> DIRECTORY Atom, IO, Process, RoseSets, RoseTypes, RoseValueSequences; RoseValueSequencePipesImpl: CEDAR MONITOR LOCKS pd USING pd: PipeData IMPORTS IO, Process EXPORTS RoseValueSequences = { OPEN RoseTypes, RoseSets, RoseValueSequences; PipeData: TYPE = REF PipeDataRep; PipeDataRep: TYPE = MONITORED RECORD [ header: Header _ NIL, curAvail: INT _ -1, curConsumed: INT _ -1, firstGet: BOOL _ TRUE, knownEnd: BOOL _ TRUE, <> change: CONDITION ]; NumChars: INT = CHAR.LAST.ORD.SUCC - CHAR.FIRST.ORD; outPipeProcs: REF IO.StreamProcs _ IO.CreateStreamProcs[ variety: output, class: $RoseValueSequencePipeOut, putChar: PipePut, flush: PipeFlush, reset: OutPipeReset, close: OutPipeClose, getIndex: OutPipeGetIndex]; inPipeProcs: REF IO.StreamProcs _ IO.CreateStreamProcs[ variety: input, class: $RoseValueSequencePipeIn, getChar: PipeGet, endOf: PipeEndOf, charsAvail: PipeCharsAvail, backup: PipeBackup, reset: InPipeReset, close: InPipeClose, getIndex: InPipeGetIndex]; PipeGet: PROC [self: STREAM] RETURNS [c: CHAR] = { EntryGet: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; IF pd.knownEnd THEN ERROR IO.EndOfStream[self]; IF pd.firstGet THEN pd.firstGet _ FALSE ELSE pd.curConsumed _ pd.curConsumed + 1; BROADCAST pd.change; DO SELECT pd.curAvail - pd.curConsumed FROM 0 => IF pd.knownEnd THEN ERROR IO.EndOfStream[self] ELSE WAIT pd.change; 1 => EXIT; ENDCASE => ERROR IO.Error[Failure, self]; ENDLOOP; c _ 0C + (pd.curAvail MOD NumChars); }; pd: PipeData _ NARROW[self.streamData]; EntryGet[pd]; }; PipePut: PROC [self: STREAM, char: CHAR] = { EntryPut: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; IF pd.knownEnd THEN ERROR IO.Error[StreamClosed, self]; pd.curAvail _ pd.curAvail + 1; BROADCAST pd.change; DO SELECT pd.curAvail - pd.curConsumed FROM 0 => EXIT; 1 => WAIT pd.change; ENDCASE => ERROR IO.Error[Failure, self]; ENDLOOP; }; pd: PipeData _ NARROW[self.streamData]; EntryPut[pd]; }; PipeFlush: PROC [self: STREAM] = { EntryFlush: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; IF pd.knownEnd THEN ERROR IO.Error[StreamClosed, self]; DO SELECT pd.curAvail - pd.curConsumed FROM 0 => EXIT; 1 => WAIT pd.change; ENDCASE => ERROR IO.Error[Failure, self]; ENDLOOP; }; pd: PipeData _ NARROW[self.streamData]; EntryFlush[pd]; }; PipeEndOf: PROC [self: STREAM] RETURNS [atEnd: BOOL] = { EntryEndOf: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; UNTIL pd.curAvail > pd.curConsumed OR pd.knownEnd DO WAIT pd.change ENDLOOP; atEnd _ NOT pd.curAvail > pd.curConsumed; }; pd: PipeData _ NARROW[self.streamData]; EntryEndOf[pd]; }; PipeCharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [n: INT] = { EntryCharsAvail: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; IF wait THEN UNTIL pd.curAvail > pd.curConsumed DO WAIT pd.change ENDLOOP; n _ pd.curAvail - pd.curConsumed; }; pd: PipeData _ NARROW[self.streamData]; EntryCharsAvail[pd]; }; PipeBackup: PROC [self: STREAM, char: CHAR] = {ERROR IO.Error[NotImplementedForThisStream, self]}; OutPipeReset: PROC [self: STREAM] = { EntryReset: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; IF pd.knownEnd THEN ERROR IO.Error[StreamClosed, self]; pd.curAvail _ -1; BROADCAST pd.change; }; pd: PipeData _ NARROW[self.streamData]; EntryReset[pd]; }; InPipeReset: PROC [self: STREAM] = { EntryReset: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; IF pd.knownEnd THEN ERROR IO.Error[StreamClosed, self]; pd.curConsumed _ -1; BROADCAST pd.change; }; pd: PipeData _ NARROW[self.streamData]; EntryReset[pd]; }; OutPipeClose: PROC [self: STREAM, abort: BOOL _ FALSE] = { EntryClose: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; pd.knownEnd _ TRUE; BROADCAST pd.change; }; pd: PipeData _ NARROW[self.streamData]; EntryClose[pd]; }; InPipeClose: PROC [self: STREAM, abort: BOOL _ FALSE] = { EntryClose: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; pd.curConsumed _ pd.curConsumed + 1; BROADCAST pd.change; }; pd: PipeData _ NARROW[self.streamData]; EntryClose[pd]; }; OutPipeGetIndex: PROC [self: STREAM] RETURNS [index: INT] = { EntryGetIndex: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; index _ pd.curAvail; }; pd: PipeData _ NARROW[self.streamData]; EntryGetIndex[pd]; }; InPipeGetIndex: PROC [self: STREAM] RETURNS [index: INT] = { EntryGetIndex: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; index _ pd.curConsumed; }; pd: PipeData _ NARROW[self.streamData]; EntryGetIndex[pd]; }; CreatePipe: PUBLIC PROC RETURNS [p: Producer, c: Consumer] = { pd: PipeData _ NEW [PipeDataRep _ []]; ps: STREAM _ IO.CreateStream[inPipeProcs, pd]; cs: STREAM _ IO.CreateStream[outPipeProcs, pd]; TRUSTED { Process.InitializeCondition[@pd.change, Process.SecondsToTicks[10]]; Process.EnableAborts[@pd.change]; }; p _ NEW [ProducerRep _ [pd, PipeGiveHeader, ps]]; c _ NEW [ConsumerRep _ [pd, PipeTakeHeader, cs]]; }; PipeGiveHeader: PROC [data: REF ANY] RETURNS [header: Header] = { EntryGiveHeader: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; WHILE pd.header = NIL DO WAIT pd.change ENDLOOP; header _ pd.header; }; pd: PipeData _ NARROW[data]; EntryGiveHeader[pd]; }; PipeTakeHeader: PROC [data: REF ANY, header: Header] = { EntryTakeHeader: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; IF pd.header # NIL THEN ERROR; pd.header _ header; BROADCAST pd.change; }; pd: PipeData _ NARROW[data]; IF header = NIL THEN ERROR; EntryTakeHeader[pd]; }; Copy: PUBLIC PROC [from: Producer, to: Consumer] = { to.takeHeader[to.data, from.giveHeader[from.data]]; DO to.synch.PutChar[from.synch.GetChar[!IO.EndOfStream => EXIT]]; ENDLOOP; }; }.