DIRECTORY Atom, IO, Process, RoseSets, RoseTypes, RoseValueSequences; RoseValueSequencePipesImpl: CEDAR MONITOR LOCKS pd USING pd: PipeData IMPORTS IO, Process EXPORTS RoseValueSequences = { OPEN RoseTypes, RoseSets, RoseValueSequences; PipeData: TYPE = REF PipeDataRep; PipeDataRep: TYPE = MONITORED RECORD [ header: Header _ NIL, curAvail: INT _ -1, curConsumed: INT _ -1, firstGet: BOOL _ TRUE, knownEnd: BOOL _ TRUE, change: CONDITION ]; NumChars: INT = CHAR.LAST.ORD.SUCC - CHAR.FIRST.ORD; outPipeProcs: REF IO.StreamProcs _ IO.CreateStreamProcs[ variety: output, class: $RoseValueSequencePipeOut, putChar: PipePut, flush: PipeFlush, reset: OutPipeReset, close: OutPipeClose, getIndex: OutPipeGetIndex]; inPipeProcs: REF IO.StreamProcs _ IO.CreateStreamProcs[ variety: input, class: $RoseValueSequencePipeIn, getChar: PipeGet, endOf: PipeEndOf, charsAvail: PipeCharsAvail, backup: PipeBackup, reset: InPipeReset, close: InPipeClose, getIndex: InPipeGetIndex]; PipeGet: PROC [self: STREAM] RETURNS [c: CHAR] = { EntryGet: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; IF pd.knownEnd THEN ERROR IO.EndOfStream[self]; IF pd.firstGet THEN pd.firstGet _ FALSE ELSE pd.curConsumed _ pd.curConsumed + 1; BROADCAST pd.change; DO SELECT pd.curAvail - pd.curConsumed FROM 0 => IF pd.knownEnd THEN ERROR IO.EndOfStream[self] ELSE WAIT pd.change; 1 => EXIT; ENDCASE => ERROR IO.Error[Failure, self]; ENDLOOP; c _ 0C + (pd.curAvail MOD NumChars); }; pd: PipeData _ NARROW[self.streamData]; EntryGet[pd]; }; PipePut: PROC [self: STREAM, char: CHAR] = { EntryPut: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; IF pd.knownEnd THEN ERROR IO.Error[StreamClosed, self]; pd.curAvail _ pd.curAvail + 1; BROADCAST pd.change; DO SELECT pd.curAvail - pd.curConsumed FROM 0 => EXIT; 1 => WAIT pd.change; ENDCASE => ERROR IO.Error[Failure, self]; ENDLOOP; }; pd: PipeData _ NARROW[self.streamData]; EntryPut[pd]; }; PipeFlush: PROC [self: STREAM] = { EntryFlush: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; IF pd.knownEnd THEN ERROR IO.Error[StreamClosed, self]; DO SELECT pd.curAvail - pd.curConsumed FROM 0 => EXIT; 1 => WAIT pd.change; ENDCASE => ERROR IO.Error[Failure, self]; ENDLOOP; }; pd: PipeData _ NARROW[self.streamData]; EntryFlush[pd]; }; PipeEndOf: PROC [self: STREAM] RETURNS [atEnd: BOOL] = { EntryEndOf: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; UNTIL pd.curAvail > pd.curConsumed OR pd.knownEnd DO WAIT pd.change ENDLOOP; atEnd _ NOT pd.curAvail > pd.curConsumed; }; pd: PipeData _ NARROW[self.streamData]; EntryEndOf[pd]; }; PipeCharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [n: INT] = { EntryCharsAvail: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; IF wait THEN UNTIL pd.curAvail > pd.curConsumed DO WAIT pd.change ENDLOOP; n _ pd.curAvail - pd.curConsumed; }; pd: PipeData _ NARROW[self.streamData]; EntryCharsAvail[pd]; }; PipeBackup: PROC [self: STREAM, char: CHAR] = {ERROR IO.Error[NotImplementedForThisStream, self]}; OutPipeReset: PROC [self: STREAM] = { EntryReset: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; IF pd.knownEnd THEN ERROR IO.Error[StreamClosed, self]; pd.curAvail _ -1; BROADCAST pd.change; }; pd: PipeData _ NARROW[self.streamData]; EntryReset[pd]; }; InPipeReset: PROC [self: STREAM] = { EntryReset: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; IF pd.knownEnd THEN ERROR IO.Error[StreamClosed, self]; pd.curConsumed _ -1; BROADCAST pd.change; }; pd: PipeData _ NARROW[self.streamData]; EntryReset[pd]; }; OutPipeClose: PROC [self: STREAM, abort: BOOL _ FALSE] = { EntryClose: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; pd.knownEnd _ TRUE; BROADCAST pd.change; }; pd: PipeData _ NARROW[self.streamData]; EntryClose[pd]; }; InPipeClose: PROC [self: STREAM, abort: BOOL _ FALSE] = { EntryClose: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; pd.curConsumed _ pd.curConsumed + 1; BROADCAST pd.change; }; pd: PipeData _ NARROW[self.streamData]; EntryClose[pd]; }; OutPipeGetIndex: PROC [self: STREAM] RETURNS [index: INT] = { EntryGetIndex: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; index _ pd.curAvail; }; pd: PipeData _ NARROW[self.streamData]; EntryGetIndex[pd]; }; InPipeGetIndex: PROC [self: STREAM] RETURNS [index: INT] = { EntryGetIndex: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; index _ pd.curConsumed; }; pd: PipeData _ NARROW[self.streamData]; EntryGetIndex[pd]; }; CreatePipe: PUBLIC PROC RETURNS [p: Producer, c: Consumer] = { pd: PipeData _ NEW [PipeDataRep _ []]; ps: STREAM _ IO.CreateStream[inPipeProcs, pd]; cs: STREAM _ IO.CreateStream[outPipeProcs, pd]; TRUSTED { Process.InitializeCondition[@pd.change, Process.SecondsToTicks[10]]; Process.EnableAborts[@pd.change]; }; p _ NEW [ProducerRep _ [pd, PipeGiveHeader, ps]]; c _ NEW [ConsumerRep _ [pd, PipeTakeHeader, cs]]; }; PipeGiveHeader: PROC [data: REF ANY] RETURNS [header: Header] = { EntryGiveHeader: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; WHILE pd.header = NIL DO WAIT pd.change ENDLOOP; header _ pd.header; }; pd: PipeData _ NARROW[data]; EntryGiveHeader[pd]; }; PipeTakeHeader: PROC [data: REF ANY, header: Header] = { EntryTakeHeader: ENTRY PROC [pd: PipeData] = { ENABLE UNWIND => {}; IF pd.header # NIL THEN ERROR; pd.header _ header; BROADCAST pd.change; }; pd: PipeData _ NARROW[data]; IF header = NIL THEN ERROR; EntryTakeHeader[pd]; }; Copy: PUBLIC PROC [from: Producer, to: Consumer] = { to.takeHeader[to.data, from.giveHeader[from.data]]; DO to.synch.PutChar[from.synch.GetChar[!IO.EndOfStream => EXIT]]; ENDLOOP; }; }. –RoseValueSequencePipesImpl.Mesa Last Edited by: Spreitzer, May 17, 1985 4:39:49 pm PDT Certainly at end if TRUE; might also be at end if FALSE. Κχ– "cedar" style˜Icode™K™6K˜KšΟk œœ3˜EK˜šΠbxœœ˜)Kšœœ ˜Kšœœ ˜Kšœ˜K˜Kšœ˜Kšœ)˜-K˜Kšœ œœ ˜!šœ œ œœ˜&Kšœœ˜Kšœ œ˜Kšœ œ˜Kšœ œœ˜šœ œœ˜K™8—Kšœ ˜K˜—K˜Kšœ œœœœœœœœ˜4K˜šœœœœ˜8K˜Kšœ!˜!K˜K˜K˜K˜K˜—K˜šœ œœœ˜7K˜Kšœ ˜ Kšœ˜K˜K˜K˜K˜K˜K˜—K˜š Οnœœœœœ˜2šŸœœœ˜'Kšœœ˜Kšœ œœœ˜/Kšœ œœœ%˜QKš œ ˜š˜šœ˜(Kš œœ œœœœœ ˜HKšœœ˜ Kšœœœ˜)—Kšœ˜—Kšœœ ˜$K˜—Kšœœ˜'K˜ K˜—K˜šŸœœœœ˜,šŸœœœ˜'Kšœœ˜Kšœ œœœ˜7Kšœ˜Kš œ ˜š˜šœ˜(Kšœœ˜ Kšœœ ˜Kšœœœ˜)—Kšœ˜—K˜—Kšœœ˜'K˜ K˜—K˜šŸ œœœ˜"šŸ œœœ˜)Kšœœ˜Kšœ œœœ˜7š˜šœ˜(Kšœœ˜ Kšœœ ˜Kšœœœ˜)—Kšœ˜—K˜—Kšœœ˜'K˜K˜—K˜š Ÿ œœœœ œ˜8šŸ œœœ˜)Kšœœ˜Kš œœ œœ œ˜LKšœœ˜)K˜—Kšœœ˜'Kšœ˜K˜—K˜š Ÿœœœœœœ˜DšŸœœœ˜.Kšœœ˜Kš œœœœœ œ˜JKšœ!˜!K˜—Kšœœ˜'Kšœ˜K˜—K˜šŸ œœœœ˜-Kšœœœ+˜4—K˜šŸ œœœ˜%šŸ œœœ˜)Kšœœ˜Kšœ œœœ˜7K˜Kš œ ˜K˜—Kšœœ˜'Kšœ˜K˜—K˜šŸ œœœ˜$šŸ œœœ˜)Kšœœ˜Kšœ œœœ˜7Kšœ˜Kš œ ˜K˜—Kšœœ˜'Kšœ˜K˜—K˜š Ÿ œœœ œœ˜:šŸ œœœ˜)Kšœœ˜Kšœœ˜Kš œ ˜K˜—Kšœœ˜'Kšœ˜K˜—K˜š Ÿ œœœ œœ˜9šŸ œœœ˜)Kšœœ˜Kšœ$˜$Kš œ ˜K˜—Kšœœ˜'Kšœ˜Kšœ˜—K˜š Ÿœœœœ œ˜=šŸ œœœ˜,Kšœœ˜K˜K˜—Kšœœ˜'Kšœ˜K˜—K˜š Ÿœœœœ œ˜<šŸ œœœ˜,Kšœœ˜K˜K˜—Kšœœ˜'Kšœ˜K˜—K˜šŸ œœœœ˜>Kšœœ˜&Kšœœœ˜.Kšœœœ ˜/šœ˜ KšœD˜DKšœ!˜!K˜—Kšœœ*˜1Kšœœ*˜1K˜—K˜š Ÿœœœœœ˜AšŸœœœ˜.Kšœœ˜Kš œ œœœ œ˜0K˜K˜—Kšœœ˜K˜K˜—K˜šŸœœœœ˜8šŸœœœ˜.Kšœœ˜Kšœ œœœ˜K˜Kš œ ˜K˜—Kšœœ˜Kšœ œœœ˜Kšœ˜K˜—K˜šŸœœœ#˜4K˜3š˜Kšœ%œœ˜>Kšœ˜—K˜—K˜K˜——…—¨5