-- IOPipeImpl.mesa
-- Last Edited by
--   Nix on September 13, 1983 4:58 pm
--   MBrown on January 13, 1984 2:21 pm
--
-- A pipe is a pair of streams (push = writer end, pull = reader end) that share one
-- circular character buffer.  All procedures below are monitor ENTRY procedures (or
-- INTERNAL procedures called from them); blocked readers and writers wait on the
-- per-pipe condition variable pipeChange, which is broadcast on every state change.

DIRECTORY
  IO USING [Error, StreamProcs, EndOfStream, CreateStream, CreateStreamProcs, STREAM],
  IOClasses;

IOPipeImpl: CEDAR MONITOR
  IMPORTS IO
  EXPORTS IOClasses =
BEGIN

STREAM: TYPE = IO.STREAM;

Data: TYPE = RECORD [
  buffer: REF TEXT,
    -- Circular buffer that holds the characters in the middle of the pipe.
  pullPos: NAT _ 0,
    -- Position of the puller's (reader's) cursor into the buffer.  The puller
    -- increments pullPos and then grabs that character.
  pushPos: NAT _ 0,
    -- Position of the pusher's (writer's) cursor into the buffer.  The pusher
    -- increments pushPos and then deposits the character there.  The pipe is empty
    -- if pullPos = pushPos, so at most buffer.length-1 characters are ever buffered.
  pullerClosed: BOOL _ FALSE,
    -- TRUE iff the puller (reader) has closed the pipe.
  pusherClosed: BOOL _ FALSE,
    -- TRUE iff the pusher (writer) has closed the pipe.
  pipeChange: CONDITION
    -- Condition raised if the state of the pipe has changed in any way.
  ];

DataHandle: TYPE = REF Data;

CreatePipe: PUBLIC ENTRY PROC [bufferSize: NAT] RETURNS [push, pull: STREAM] = {
  -- Creates a pipe and returns two streams that each access the ends of the pipe.
  -- The push stream is used to add data to the pipe; the pull stream is used to
  -- remove it.
  ENABLE UNWIND => NULL;
  -- One slot of the ring is always kept free to tell "full" from "empty", so a
  -- buffer shorter than 2 could never hold a character: size 1 would make every push
  -- block forever and size 0 would index outside the buffer.  Clamp to a usable size.
  size: NAT = MAX[bufferSize, 2];
  common: DataHandle _ NEW[Data _ [buffer: NEW[TEXT[size]]]];
  common.buffer.length _ common.buffer.maxLength;
  push _ IO.CreateStream[ pushProcs, common ];
  pull _ IO.CreateStream[ pullProcs, common ];
  };

PullGetChar: ENTRY PROC [self: STREAM] RETURNS [c: CHAR] = {
  -- Returns the next character in the pipe, blocking until a character is available.
  -- Raises IO.EndOfStream ERROR if the pusher has closed the pipe and it is empty.
  -- Raises IO.Error[StreamClosed ..] if the pull stream has already been closed.
  ENABLE UNWIND => NULL;
  data: DataHandle _ NARROW[self.streamData];
  DO
    IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self];
    IF data.pullPos # data.pushPos THEN EXIT;  -- something to read
    IF data.pusherClosed THEN ERROR IO.EndOfStream[self];
    WAIT data.pipeChange;
    ENDLOOP;
  -- Advance (with wrap-around) and then fetch: pullPos names the slot last consumed.
  data.pullPos _ data.pullPos + 1;
  IF data.pullPos = data.buffer.length THEN data.pullPos _ 0;
  c _ data.buffer[data.pullPos];
  BROADCAST data.pipeChange;  -- a pusher may be waiting for this slot
  };

AddNAT: PROC [a, b: NAT] RETURNS [NAT] = INLINE {
  -- Saturating addition: a+b, clamped to NAT.LAST so the sum cannot overflow NAT.
  RETURN [MIN[CARDINAL[a]+CARDINAL[b], NAT.LAST]];
  };

PullGetBlock: ENTRY PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT]
  RETURNS [nBytesRead: NAT] = {
  -- Reads a block from the pipe, returning the number of characters actually read.
  -- What should the approach be?  For example, if there are 10 characters in the
  -- pipe, and the guy wants 50, do we give him the 10 or wait for the 50?  I've
  -- decided to give him the 10.  Blocks only while the pipe is completely empty.
  -- Raises IO.Error[StreamClosed ..] if the pull stream has been closed, and
  -- IO.EndOfStream if the pipe is empty and the pusher has closed its end.
  -- NOTE(review): the transfer is limited by block.length, not block.maxLength, and
  -- block.length is never updated here -- confirm against the IO.GetBlock contract.
  -- NOTE(review): raising EndOfStream (rather than returning 0) at end of stream
  -- should likewise be confirmed against the IO.GetBlock contract.
  ENABLE UNWIND => NULL;
  data: DataHandle _ NARROW[self.streamData];
  stopIndexPlusOne: NAT _ AddNAT[startIndex, count];
  stopIndexPlusOne _ MIN[ stopIndexPlusOne, block.length ];
  DO
    IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self];
    IF data.pullPos # data.pushPos THEN EXIT;
    IF data.pusherClosed THEN ERROR IO.EndOfStream[self];
    WAIT data.pipeChange;
    ENDLOOP;
  -- Nothing to transfer.  This also keeps the NAT subtractions below from
  -- underflowing when startIndex lies beyond block.length.
  IF startIndex >= stopIndexPlusOne THEN RETURN [0];
  IF data.pullPos < data.pushPos THEN {
    -- Unwrapped: the unread characters occupy (pullPos..pushPos].
    nBytesRead _ MIN[ data.pushPos - data.pullPos, stopIndexPlusOne - startIndex ];
    FOR i: INT IN (data.pullPos..data.pullPos+nBytesRead] DO
      block[startIndex] _ data.buffer[i];
      startIndex _ startIndex + 1;
      ENDLOOP;
    data.pullPos _ data.pullPos + nBytesRead;
    }
  ELSE {
    -- Wrapped: unread characters occupy (pullPos..maxLength-1] and then [0..pushPos].
    -- First take what is available up to the physical end of the buffer.
    nBytesRead _ MIN[ data.buffer.maxLength - 1 - data.pullPos, stopIndexPlusOne - startIndex ];
    FOR i: INT IN (data.pullPos..data.pullPos+nBytesRead] DO
      block[startIndex] _ data.buffer[i];
      startIndex _ startIndex + 1;
      ENDLOOP;
    data.pullPos _ data.pullPos + nBytesRead;
    IF startIndex < stopIndexPlusOne THEN {
      -- The caller still wants more, which means the tail was exhausted; continue
      -- from slot 0.  The new pullPos is the index of the last slot consumed.
      data.pullPos _ MIN[ data.pushPos, stopIndexPlusOne - startIndex - 1];
      nBytesRead _ nBytesRead + data.pullPos + 1;
      FOR i: INT IN [0..data.pullPos] DO
        block[startIndex] _ data.buffer[i];
        startIndex _ startIndex + 1;
        ENDLOOP;
      };
    };
  BROADCAST data.pipeChange;
  };

PullCharsAvail: ENTRY PROC [self: STREAM, wait: BOOL] RETURNS [INT] = {
  -- Returns the number of characters currently buffered in the pipe.  If the pipe is
  -- empty: returns 1 when the pusher has closed its end, returns 0 when wait is
  -- FALSE, and otherwise blocks until the pipe state changes and re-evaluates.
  -- Raises IO.Error[StreamClosed ..] if the pull stream has been closed.
  ENABLE UNWIND => NULL;
  data: DataHandle _ NARROW[self.streamData];
  diff: INT;
  DO
    IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self];
    diff _ LONG[data.pushPos] - LONG[data.pullPos];
    IF diff > 0 THEN RETURN [diff];
    IF diff < 0 THEN RETURN [LONG[data.buffer.maxLength] + diff];  -- wrapped
    IF data.pusherClosed THEN RETURN [1];
    IF NOT wait THEN RETURN [0];
    WAIT data.pipeChange;
    ENDLOOP;
  };

PullEndOf: ENTRY PROC [self: STREAM] RETURNS [eof: BOOL] = {
  -- Returns TRUE iff the pipe is empty and the pusher has closed his end of the
  -- stream (characters still in the buffer remain readable after the pusher closes).
  ENABLE UNWIND => NULL;
  data: DataHandle _ NARROW[self.streamData];
  RETURN [data.pushPos = data.pullPos AND data.pusherClosed];
  };

PullClose: ENTRY PROC [self: STREAM, abort: BOOL] = {
  -- Closes the puller's end of the stream.  Raises ERROR IO.Error[StreamClosed ..]
  -- if there is something in the pipe (unless abort is true).
  ENABLE UNWIND => NULL;
  data: DataHandle _ NARROW[self.streamData];
  data.pullerClosed _ TRUE;
  -- Broadcast before possibly raising the error: a non-empty pipe is exactly the
  -- situation in which a pusher may be blocked waiting for space, and it must be
  -- woken so that it can observe pullerClosed.
  BROADCAST data.pipeChange;
  IF data.pushPos # data.pullPos AND ~abort THEN ERROR IO.Error[StreamClosed, self];
  };

PushPutChar: ENTRY PROC [self: STREAM, char: CHAR] = {
  -- Pushes a character onto the pipe, waiting until space is available in the buffer
  -- to hold it.  Raises ERROR IO.Error[StreamClosed..] if the pusher has already
  -- closed the stream.  Raises the same error if the puller has closed the stream
  -- and broken the pipe.
  ENABLE UNWIND => NULL;
  data: DataHandle _ NARROW[self.streamData];
  DO
    -- The closed flags are re-examined after every wake-up; otherwise a pusher
    -- blocked on a full pipe would wait forever once the puller has gone away.
    IF data.pusherClosed THEN ERROR IO.Error[StreamClosed, self];
    IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self];
    -- Tentatively advance the cursor (with wrap-around) ...
    data.pushPos _ data.pushPos + 1;
    IF data.pushPos = data.buffer.length THEN data.pushPos _ 0;
    IF data.pushPos # data.pullPos THEN EXIT;
    -- ... but if that would make the pipe look empty it is full: undo and wait.
    IF data.pushPos = 0 THEN data.pushPos _ data.buffer.length;
    data.pushPos _ data.pushPos - 1;
    WAIT data.pipeChange;
    ENDLOOP;
  data.buffer[data.pushPos] _ char;
  BROADCAST data.pipeChange;
  };

PushPutBlock: ENTRY PROC [self: STREAM, block: REF READONLY TEXT, startIndex: NAT, count: NAT] = {
  -- Pushes block[startIndex .. MIN[startIndex+count, block.length]) onto the pipe,
  -- waiting for space whenever the buffer is full.  Raises IO.Error[StreamClosed ..]
  -- if either end of the pipe is (or becomes) closed before the block is written.
  ENABLE UNWIND => NULL;
  WaitForSpace: INTERNAL PROC [] = {
    -- Blocks while the buffer is full, i.e. while advancing pushPos by one (with
    -- wrap-around) would make it equal to pullPos.  Gives up with StreamClosed if
    -- either end is closed, so a broken pipe cannot strand the pusher here.
    WHILE data.pushPos + 1 = data.pullPos
      OR (data.pullPos = 0 AND data.pushPos = data.buffer.length - 1) DO
      IF data.pusherClosed THEN ERROR IO.Error[StreamClosed, self];
      IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self];
      WAIT data.pipeChange;
      ENDLOOP;
    };
  data: DataHandle _ NARROW[self.streamData];
  numChars: INT;
  stopIndexPlusOne: NAT _ AddNAT[startIndex, count];
  stopIndexPlusOne _ MIN[ stopIndexPlusOne, block.length ];
  WHILE startIndex < stopIndexPlusOne DO
    IF data.pusherClosed THEN ERROR IO.Error[StreamClosed, self];
    IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self];
    WaitForSpace[];
    -- Step to the first free slot.
    data.pushPos _ data.pushPos + 1;
    IF data.pushPos = data.buffer.length THEN data.pushPos _ 0;
    -- Size of the contiguous free run starting at pushPos: up to the physical end of
    -- the buffer, or up to (but excluding) the puller's cursor.
    IF data.pushPos > data.pullPos THEN
      numChars _ MIN[ data.buffer.length-data.pushPos, stopIndexPlusOne-startIndex ]
    ELSE
      numChars _ MIN[ data.pullPos-data.pushPos, stopIndexPlusOne-startIndex ];
    FOR i: INT IN [data.pushPos..data.pushPos+numChars-1] DO
      data.buffer[i] _ block[startIndex];
      startIndex _ startIndex+1;
      ENDLOOP;
    data.pushPos _ data.pushPos+numChars-1;  -- pushPos names the last slot filled
    -- Announce each chunk as soon as it is deposited.  If this were deferred to the
    -- end of the block, a block larger than the free space would deadlock: the
    -- pusher would wait for space while the puller still waits for data.
    BROADCAST data.pipeChange;
    ENDLOOP;
  BROADCAST data.pipeChange;
  };

PushFlush: ENTRY PROC [self: STREAM] = {
  -- Monitor entry wrapper around PushFlusher.
  ENABLE UNWIND => NULL;
  PushFlusher[self];
  };

PushFlusher: INTERNAL PROC [self: STREAM] = {
  -- Waits until the reader has read all of the characters that are currently in the
  -- pipe.  Raises IO.Error[StreamClosed..] if the puller closes the file while
  -- there's stuff in the pipe.
  data: DataHandle _ NARROW[self.streamData];
  WHILE data.pushPos # data.pullPos DO
    IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self];
    WAIT data.pipeChange;
    ENDLOOP;
  BROADCAST data.pipeChange;
  };

PushReset: ENTRY PROC [self: STREAM] = {
  -- Clears out the pipe.
  ENABLE UNWIND => NULL;
  data: DataHandle _ NARROW[self.streamData];
  data.pushPos _ data.pullPos _ 0;
  BROADCAST data.pipeChange;
  };

PushClose: ENTRY PROC [self: STREAM, abort: BOOL _ FALSE] = {
  -- Closes the pusher's end of the pipe.  Flushes out the pipe iff ~abort.
  -- Raises IO.Error[StreamClosed..] if the puller has closed the stream and there is
  -- still something in it.
  ENABLE UNWIND => NULL;
  data: DataHandle _ NARROW[self.streamData];
  IF data.pullerClosed AND data.pushPos # data.pullPos THEN ERROR IO.Error[StreamClosed, self];
  IF ~abort THEN PushFlusher[self];
  data.pusherClosed _ TRUE;
  BROADCAST data.pipeChange;  -- wake a puller waiting for data so it sees end of stream
  };

-- Stream class for the writer end of a pipe.
pushProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
  variety: $output, class: $Pipe,
  putChar: PushPutChar,
  putBlock: PushPutBlock,
  flush: PushFlush,
  reset: PushReset,
  close: PushClose
  ];

-- Stream class for the reader end of a pipe.
pullProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
  variety: $input, class: $Pipe,
  getChar: PullGetChar,
  getBlock: PullGetBlock,
  endOf: PullEndOf,
  charsAvail: PullCharsAvail,
  close: PullClose
  ];

END.

-- CHANGE LOG
--
-- Created by Nix on September 9, 1983 3:04 pm
--
-- Changed by MBrown on October 25, 1983 4:52 pm
--   Converted to Cedar 5.0; exports IOClasses.CreatePipe instead of Pipe.Open.  Did
--   minimal conversions of block operations; these should be reworked to use count
--   instead of stopIndexPlusOne.  Did substantial conversion of PullerCharsAvail.
--   Bug in PullEndOf: returned true with stuff in buffer.