DIRECTORY Process USING [EnableAborts], IO USING [Error, StreamProcs, EndOfStream, CreateStream, CreateStreamProcs, STREAM], IOClasses; IOPipeImpl: CEDAR MONITOR LOCKS data USING data: DataHandle IMPORTS IO, Process EXPORTS IOClasses = BEGIN STREAM: TYPE = IO.STREAM; Data: TYPE = MONITORED RECORD [ buffer: REF TEXT, pullPos: NAT _ 0, pushPos: NAT _ 0, pullerClosed: BOOL _ FALSE, pusherClosed: BOOL _ FALSE, pipeChange: CONDITION ]; DataHandle: TYPE = REF Data; CreatePipe: PUBLIC PROC [bufferSize: NAT] RETURNS [push, pull: STREAM] = { ENABLE UNWIND => NULL; nChars: NAT _ SELECT bufferSize FROM < 2 => 2, > LAST[NAT]/2 => LAST[NAT]/2, ENDCASE => bufferSize; common: DataHandle _ NEW[Data _ [buffer: NEW[TEXT[nChars]]]]; common.buffer.length _ nChars; push _ IO.CreateStream[pushProcs, common]; pull _ IO.CreateStream[pullProcs, common]; TRUSTED {Process.EnableAborts[@common.pipeChange]}; }; PullGetChar: PROC [self: STREAM] RETURNS [c: CHAR _ 0C] = { innerGetChar: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; DO IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; IF data.pullPos # data.pushPos THEN EXIT; IF data.pusherClosed THEN RETURN WITH ERROR IO.EndOfStream[self]; WAIT data.pipeChange; ENDLOOP; data.pullPos _ data.pullPos + 1; IF data.pullPos = data.buffer.length THEN data.pullPos _ 0; c _ data.buffer[data.pullPos]; BROADCAST data.pipeChange; }; WITH self.streamData SELECT FROM data: DataHandle => innerGetChar[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; AddNAT: PROC [a, b: NAT] RETURNS [NAT] = INLINE { RETURN [MIN[CARDINAL[a]+CARDINAL[b], NAT.LAST]]; }; PullGetBlock: PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT] RETURNS [nBytesRead: NAT _ 0] = { innerGetBlock: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; pullPos: NAT _ data.pullPos; DO IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; IF pullPos # data.pushPos THEN EXIT; IF data.pusherClosed THEN RETURN WITH ERROR IO.EndOfStream[self]; WAIT data.pipeChange; pullPos _ data.pullPos; ENDLOOP; IF pullPos > data.pushPos AND delta > 0 THEN { nBytesRead _ MIN[data.buffer.maxLength - 1 - pullPos, delta]; FOR i: NAT IN (pullPos..pullPos+nBytesRead] DO block[startIndex] _ data.buffer[i]; startIndex _ startIndex + 1; ENDLOOP; pullPos _ pullPos + nBytesRead; delta _ delta - nBytesRead; IF pullPos = data.buffer.maxLength THEN pullPos _ 0; }; IF pullPos < data.pushPos AND delta > 0 THEN { nBytesRead _ nBytesRead + MIN[data.pushPos - pullPos, delta]; FOR i: NAT IN (pullPos..pullPos+nBytesRead] DO block[startIndex] _ data.buffer[i]; startIndex _ startIndex + 1; ENDLOOP; pullPos _ pullPos + nBytesRead; }; data.pullPos _ pullPos; block.length _ startIndex; BROADCAST data.pipeChange; }; stopIndexPlusOne: NAT _ MIN[AddNAT[startIndex, count], block.maxLength]; delta: NAT _ IF stopIndexPlusOne > startIndex THEN stopIndexPlusOne - startIndex ELSE 0; WITH self.streamData SELECT FROM data: DataHandle => innerGetBlock[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PullCharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [diff: INT _ 0] = { innerCharsAvail: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; DO IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; diff _ LONG[data.pushPos] - LONG[data.pullPos]; IF diff > 0 THEN EXIT; IF diff < 0 THEN {diff _ diff + data.buffer.maxLength; EXIT}; IF data.pusherClosed THEN {diff _ 1; EXIT}; IF NOT wait THEN {diff _ 0; EXIT}; WAIT data.pipeChange; ENDLOOP; }; WITH self.streamData SELECT FROM data: DataHandle => innerCharsAvail[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PullEndOf: PROC [self: STREAM] RETURNS [eof: BOOL _ FALSE] = { innerEndOf: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; eof _ data.pushPos = data.pullPos AND data.pusherClosed; }; WITH self.streamData SELECT FROM data: DataHandle => innerEndOf[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PullClose: PROC [self: STREAM, abort: BOOL] = { innerClose: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; data.pullerClosed _ TRUE; IF ~abort AND data.pushPos # data.pullPos THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; data.pullPos _ data.pushPos; BROADCAST data.pipeChange; }; WITH self.streamData SELECT FROM data: DataHandle => innerClose[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushPutChar: PROC [self: STREAM, char: CHAR] = { innerPutChar: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; IF data.pusherClosed OR data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; DO new: NAT _ data.pushPos + 1; IF new = data.buffer.length THEN new _ 0; IF new # data.pullPos THEN { data.pushPos _ new; data.buffer[data.pushPos] _ char; BROADCAST data.pipeChange; RETURN; }; WAIT data.pipeChange; ENDLOOP; }; WITH self.streamData SELECT FROM data: DataHandle => innerPutChar[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushPutBlock: PROC [self: STREAM, block: REF READONLY TEXT, startIndex, count: NAT] = { innerPutBlock: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; numChars: INTEGER; WHILE startIndex < stopIndexPlusOne DO new: NAT _ data.pushPos + 1; IF data.pusherClosed OR data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; IF new = data.buffer.length THEN new _ 0; IF new = data.pullPos THEN {WAIT data.pipeChange; LOOP}; numChars _ MIN[stopIndexPlusOne-startIndex, IF new > data.pullPos THEN data.buffer.length-new ELSE data.pullPos-new]; FOR i: NAT IN [new..new+numChars-1] DO data.buffer[i] _ block[startIndex]; startIndex _ startIndex+1; ENDLOOP; data.pushPos _ new+numChars-1; BROADCAST data.pipeChange; ENDLOOP; }; stopIndexPlusOne: NAT _ AddNAT[startIndex, count]; stopIndexPlusOne _ MIN[stopIndexPlusOne, block.length]; WITH self.streamData SELECT FROM data: DataHandle => innerPutBlock[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushFlush: PROC [self: STREAM] = { innerFlush: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; WHILE data.pushPos # data.pullPos DO IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; WAIT data.pipeChange; ENDLOOP; }; WITH self.streamData SELECT FROM data: DataHandle => innerFlush[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushReset: PROC [self: STREAM] = { innerReset: ENTRY PROC [data: DataHandle] = INLINE { data.pushPos _ data.pullPos _ 0; BROADCAST data.pipeChange; }; WITH self.streamData SELECT FROM data: DataHandle => innerReset[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushClose: PROC [self: STREAM, abort: BOOL _ FALSE] = { innerClose: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; IF NOT data.pusherClosed THEN { IF ~abort THEN WHILE data.pushPos # data.pullPos AND NOT data.pullerClosed DO WAIT data.pipeChange; ENDLOOP; IF data.pullerClosed AND data.pushPos # data.pullPos THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; data.pusherClosed _ TRUE; BROADCAST data.pipeChange; }; }; WITH self.streamData SELECT FROM data: DataHandle => innerClose[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; pushProcs: REF IO.StreamProcs = IO.CreateStreamProcs[ variety: $output, class: $Pipe, putChar: PushPutChar, putBlock: PushPutBlock, flush: PushFlush, reset: PushReset, close: PushClose ]; pullProcs: REF IO.StreamProcs = IO.CreateStreamProcs[ variety: $input, class: $Pipe, getChar: PullGetChar, getBlock: PullGetBlock, endOf: PullEndOf, charsAvail: PullCharsAvail, close: PullClose ]; END. zIOPipeImpl.mesa Copyright c 1985 by Xerox Corporation. All rights reserved. Russ Atkinson (RRA) January 22, 1986 8:50:55 pm PST Circular buffer that holds the characters in the middle of the pipe. Position of the puller's (reader's) cursor into the buffer. The puller increments pullPos and then grabs that character. Position of the pusher's (writer's) cursor into the buffer. The pusher increments pushPos and then deposits the character there. The pipe is empty if pullPos = pushPos. TRUE iff the puller (reader) has closed the pipe. TRUE iff the pusher (writer) has closed the pipe. Condition raised if the state of the pipe has changed in any way. Creates a pipe and returns two streams that each access the ends of the pipe. The push stream is used to add data to the pipe; the pull stream is used to remove it. Returns the next character in the pipe, blocking until a character is available. Raises IO.EndOfStream ERROR if the pusher has closed the pipe. Raises IO.StreamClosed ERROR if the stream has already been closed. Reads a block from the pipe, returning the number of characters actually read. What should the approach be? For example, if there are 10 characters in the pipe, and the guy wants 50, do we give him the 10 or wait for the 50? I've decided to give him the 10. The first run of bytes to get lie between pullPos and data.buffer.maxLength-1 There are bytes to read between pullPos and data.pushPos-1 Returns TRUE iff the pusher has closed his end of the stream. Closes the puller's end of the stream. Raises ERROR IO.Error[StreamClosed ..] if there is something in the pipe (unless abort if true). To avoid problems if/when the pusher closes Pushes a character onto the pipe, waiting until space is available in the buffer to hold it. Raises ERROR IO.Error[StreamClosed..] if the pusher has already closed the stream. Raises the same error if the puller has closed the stream and broken the pipe. There is room to place the character Clears out the pipe. Closes the pusher's end of the pipe. Flushes out the pipe iff ~abort. Raises IO.Error[StreamClosed..] if the puller has closed the stream and there is still something in it. Κ P˜codešœ™Kšœ Οmœ1™—Kšœžœžœžœ ˜=Kšœ˜Kšœžœ!˜*Kšœžœ!˜*Jšžœ,˜3K˜K˜—š   œžœžœžœžœ ˜;KšœP™PKšœƒ™ƒšœžœžœ˜/Kšžœžœžœ˜šž˜Kš žœžœžœžœžœžœ˜IKšžœžœžœ˜)Kš žœžœžœžœžœžœ˜AKšžœ˜Kšžœ˜—K˜ šžœ#ž˜)K˜—K˜Kšž œ˜K˜—šžœžœž˜ Kšœ'˜'Kšžœžœžœ˜)—K˜—K˜š  œžœžœžœžœžœ˜1Kš žœžœžœžœžœžœ˜0K˜K˜—š  œžœžœ žœžœžœ žœžœžœ ˜qKšœN™NKšœ³™³šœžœžœ˜0Kšžœžœžœ˜Kšœ žœ˜šž˜Kš žœžœžœžœžœžœ˜IKšžœžœžœ˜$Kš žœžœžœžœžœžœ˜AKšžœ˜Kšœ˜Kšžœ˜—šžœžœ žœ˜.KšœM™MKšœ žœ-˜=šžœžœžœž˜.K˜#K˜Kšžœ˜—K˜Kšœ˜Kšžœ!žœ ˜4K˜—šžœžœ žœ˜.Kšœ:™:Kšœžœ ˜=šžœžœžœž˜.K˜#K˜Kšžœ˜—K˜Kšœ˜—K˜K˜Kšž œ˜K˜—Kšœžœžœ"Οr œ˜HKš œžœžœžœžœ˜Xšžœžœž˜ Kšœ(˜(Kšžœžœžœ˜)—Kšœ˜K˜—š  œžœžœžœžœžœ ˜Kšœžœžœ˜2Kšžœžœžœ˜šž˜Kš žœžœžœžœžœžœ˜IKšœžœžœ˜/Kšžœ žœžœ˜Kšžœ žœ'žœ˜=Kšžœžœ žœ˜+Kšžœžœžœ žœ˜"Kšžœ˜Kšžœ˜—K˜—šžœžœž˜ Kšœ*˜*Kšžœžœžœ˜)—K˜K˜—š   œžœžœžœžœžœ˜>Kšœ=™=šœ žœžœ˜-Kšžœžœžœ˜Kšœ"žœ˜8K˜—šžœžœž˜ Kšœ%˜%Kšžœžœžœ˜)—K˜K˜—š  œžœžœ žœ˜/Kšœ(™(Kšœ`™`šœ žœžœ˜-Kšžœžœžœ˜Kšœžœ˜šžœžœž˜.Kšžœžœžœžœ˜/—šœ˜Kšœ+™+—Kšž œ˜K˜—šžœžœž˜ Kšœ%˜%Kšžœžœžœ˜)—K˜—K˜š  œžœžœžœ˜0Kšœ\™\Kšœ’™’šœžœžœ˜/Kšžœžœžœ˜šžœžœž˜.Kšžœžœžœžœ˜/—šž˜Kšœžœ˜Kšžœžœ ˜)šžœžœ˜Kšœ$™$Kšœ˜K˜!Kšž œ˜Kšžœ˜Kšœ˜—Kšžœ˜Kšžœ˜—K˜—šžœžœž˜ Kšœ'˜'Kšžœžœžœ˜)—K˜K˜—š  œžœžœ žœžœžœžœ˜Wšœžœžœ˜0Kšžœžœžœ˜Kšœ žœ˜šžœž˜&Kšœžœ˜šžœžœž˜.Kšžœžœžœžœ˜/—Kšžœžœ ˜)Kšžœžœžœžœ˜8šœ žœ˜+Kšžœžœžœ˜I—šžœžœžœž˜&K˜#K˜Kšžœ˜—Kšœ˜Kšž œ˜Kšžœ˜—K˜—Kšœžœ˜2Kšœžœ!˜7šžœžœž˜ Kšœ(˜(Kšžœžœžœ˜)—K˜K˜—š  œžœžœ˜"šœ žœžœ˜-Kšžœžœžœ˜šžœž˜$Kš žœžœžœžœžœžœ˜IKšžœ˜Kšžœ˜—K˜—šžœžœž˜ Kšœ%˜%Kšžœžœžœ˜)—K˜K˜—š  œžœžœ˜"Kšœ™šœ žœžœžœ˜4K˜ Kšž œ˜K˜—šžœžœž˜ Kšœ%˜%Kšžœžœžœ˜)—K˜K˜—š   œžœžœ žœžœ˜7KšœF™FKšœg™gšœ žœžœ˜-Kšžœžœžœ˜šžœžœžœ˜šžœž˜šžœžœžœž˜>Kšžœ˜Kšžœ˜——šžœžœž˜9Kšžœžœžœžœ˜/—Kšœžœ˜Kšž œ˜K˜—K˜—šžœžœž˜ Kšœ%˜%Kšžœžœžœ˜)—K˜—K˜šœ žœžœžœ˜5K˜Kšœ˜K˜Kšœ˜Kšœ˜Kšœ˜K˜K˜—šœ žœžœžœ˜5K˜Kšœ˜K˜Kšœ˜Kšœ˜Kšœ˜Kšœ˜—K˜Kšžœ˜K˜K˜—…—21ό