DIRECTORY Process USING [EnableAborts], IO USING [Error, StreamProcs, EndOfStream, CreateStream, CreateStreamProcs, STREAM, UnsafeBlock], IOClasses, Basics USING [ByteBlt]; IOPipeImpl: CEDAR MONITOR LOCKS data USING data: DataHandle IMPORTS IO, Basics, Process EXPORTS IOClasses = BEGIN STREAM: TYPE = IO.STREAM; Data: TYPE = MONITORED RECORD [ buffer: REF TEXT, pullPos: NAT ¬ 0, pushPos: NAT ¬ 0, pullerClosed: BOOL ¬ FALSE, pusherClosed: BOOL ¬ FALSE, pipeChange: CONDITION ]; DataHandle: TYPE = REF Data; CreatePipe: PUBLIC PROC [bufferSize: NAT] RETURNS [push, pull: STREAM] = { ENABLE UNWIND => NULL; nChars: NAT ¬ SELECT bufferSize FROM < 2 => 2, > LAST[NAT]/2 => LAST[NAT]/2, ENDCASE => bufferSize; common: DataHandle ¬ NEW[Data ¬ [buffer: NEW[TEXT[nChars]]]]; common.buffer.length ¬ nChars; push ¬ IO.CreateStream[pushProcs, common]; pull ¬ IO.CreateStream[pullProcs, common]; TRUSTED {Process.EnableAborts[@common.pipeChange]}; }; PullGetChar: PROC [self: STREAM] RETURNS [c: CHAR ¬ 0C] = { innerGetChar: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; DO IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; IF data.pullPos # data.pushPos THEN EXIT; IF data.pusherClosed THEN RETURN WITH ERROR IO.EndOfStream[self]; WAIT data.pipeChange; ENDLOOP; data.pullPos ¬ data.pullPos + 1; IF data.pullPos = data.buffer.length THEN data.pullPos ¬ 0; c ¬ data.buffer[data.pullPos]; BROADCAST data.pipeChange; }; WITH self.streamData SELECT FROM data: DataHandle => innerGetChar[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; AddNAT: PROC [a, b: NAT] RETURNS [NAT] = INLINE { RETURN [MIN[CARDINAL[a]+CARDINAL[b], NAT.LAST]]; }; PullGetBlock: PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT] RETURNS [nBytesRead: NAT ¬ 0] = { innerGetBlock: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; pullPos: NAT ¬ data.pullPos; WHILE delta > 0 DO DO IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; IF pullPos # data.pushPos THEN EXIT; IF data.pusherClosed THEN RETURN; WAIT data.pipeChange; pullPos ¬ data.pullPos; ENDLOOP; IF pullPos > data.pushPos THEN { nBytes: NAT ~ MIN[data.buffer.maxLength - 1 - pullPos, delta]; FOR i: NAT IN (pullPos..pullPos+nBytes] DO block[startIndex] ¬ data.buffer[i]; startIndex ¬ startIndex + 1; ENDLOOP; pullPos ¬ pullPos + nBytes; nBytesRead ¬ nBytesRead + nBytes; delta ¬ delta - nBytes; }; IF pullPos = data.buffer.maxLength - 1 AND delta > 0 THEN { block[startIndex] ¬ data.buffer[pullPos ¬ 0]; startIndex ¬ startIndex + 1; nBytesRead ¬ nBytesRead + 1; delta ¬ delta - 1; }; IF pullPos < data.pushPos AND delta > 0 THEN { nBytes: NAT ~ MIN[data.pushPos - pullPos, delta]; FOR i: NAT IN (pullPos..pullPos+nBytes] DO block[startIndex] ¬ data.buffer[i]; startIndex ¬ startIndex + 1; ENDLOOP; pullPos ¬ pullPos + nBytes; nBytesRead ¬ nBytesRead + nBytes; delta ¬ delta - nBytes; }; data.pullPos ¬ pullPos; block.length ¬ startIndex; BROADCAST data.pipeChange; ENDLOOP; }; stopIndexPlusOne: NAT ¬ MIN[AddNAT[startIndex, count], block.maxLength]; delta: NAT ¬ IF stopIndexPlusOne > startIndex THEN stopIndexPlusOne - startIndex ELSE 0; WITH self.streamData SELECT FROM data: DataHandle => innerGetBlock[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PullCharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [diff: INT ¬ 0] = { innerCharsAvail: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; DO IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; diff ¬ INT[data.pushPos] - INT[data.pullPos]; IF diff > 0 THEN EXIT; IF diff < 0 THEN {diff ¬ diff + data.buffer.maxLength; EXIT}; IF data.pusherClosed THEN {diff ¬ 1; EXIT}; IF NOT wait THEN {diff ¬ 0; EXIT}; WAIT data.pipeChange; ENDLOOP; }; WITH self.streamData SELECT FROM data: DataHandle => innerCharsAvail[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PullEndOf: PROC [self: STREAM] RETURNS [eof: BOOL ¬ FALSE] = { innerEndOf: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; eof ¬ data.pushPos = data.pullPos AND data.pusherClosed; }; WITH self.streamData SELECT FROM data: DataHandle => innerEndOf[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PullClose: PROC [self: STREAM, abort: BOOL] = { innerClose: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; data.pullerClosed ¬ TRUE; IF ~abort AND data.pushPos # data.pullPos THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; data.pullPos ¬ data.pushPos; BROADCAST data.pipeChange; }; WITH self.streamData SELECT FROM data: DataHandle => innerClose[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushPutChar: PROC [self: STREAM, char: CHAR] = { innerPutChar: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; IF data.pusherClosed OR data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; DO new: NAT ¬ data.pushPos + 1; IF new = data.buffer.length THEN new ¬ 0; IF new # data.pullPos THEN { data.pushPos ¬ new; data.buffer[data.pushPos] ¬ char; BROADCAST data.pipeChange; RETURN; }; WAIT data.pipeChange; ENDLOOP; }; WITH self.streamData SELECT FROM data: DataHandle => innerPutChar[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushPutBlock: PROC [self: STREAM, block: REF READONLY TEXT, startIndex, count: NAT] = { innerPutBlock: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; numChars: INTEGER; WHILE startIndex < stopIndexPlusOne DO new: NAT ¬ data.pushPos + 1; IF data.pusherClosed OR data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; IF new = data.buffer.length THEN new ¬ 0; IF new = data.pullPos THEN {WAIT data.pipeChange; LOOP}; numChars ¬ MIN[stopIndexPlusOne-startIndex, IF new > data.pullPos THEN data.buffer.length-new ELSE data.pullPos-new]; FOR i: NAT IN [new..new+numChars-1] DO data.buffer[i] ¬ block[startIndex]; startIndex ¬ startIndex+1; ENDLOOP; data.pushPos ¬ new+numChars-1; BROADCAST data.pipeChange; ENDLOOP; }; stopIndexPlusOne: NAT ¬ AddNAT[startIndex, count]; stopIndexPlusOne ¬ MIN[stopIndexPlusOne, block.length]; WITH self.streamData SELECT FROM data: DataHandle => innerPutBlock[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushUnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] = { startIndex: NAT ¬ block.startIndex; innerPutBlock: ENTRY PROC [data: DataHandle] = TRUSTED { ENABLE UNWIND => NULL; numChars: INTEGER; bufferBase: LONG POINTER ¬ LOOPHOLE[data.buffer, LONG POINTER] + SIZE[TEXT[0]]; WHILE startIndex < stopIndexPlusOne DO new: NAT ¬ data.pushPos + 1; IF data.pusherClosed OR data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; IF new = data.buffer.length THEN new ¬ 0; IF new = data.pullPos THEN {WAIT data.pipeChange; LOOP}; numChars ¬ Basics.ByteBlt[ to: [bufferBase, new, IF new > data.pullPos THEN data.buffer.length ELSE data.pullPos], from: [block.base, startIndex, stopIndexPlusOne] ]; data.pushPos ¬ new+numChars-1; startIndex ¬ startIndex+numChars; BROADCAST data.pipeChange; ENDLOOP; }; stopIndexPlusOne: NAT ¬ AddNAT[startIndex, block.count]; WITH self.streamData SELECT FROM data: DataHandle => innerPutBlock[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushFlush: PROC [self: STREAM] = { innerFlush: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; WHILE data.pushPos # data.pullPos DO IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; WAIT data.pipeChange; ENDLOOP; }; WITH self.streamData SELECT FROM data: DataHandle => innerFlush[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushReset: PROC [self: STREAM] = { innerReset: ENTRY PROC [data: DataHandle] = INLINE { data.pushPos ¬ data.pullPos ¬ 0; BROADCAST data.pipeChange; }; WITH self.streamData SELECT FROM data: DataHandle => innerReset[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushClose: PROC [self: STREAM, abort: BOOL ¬ FALSE] = { innerClose: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; IF NOT data.pusherClosed THEN { IF ~abort THEN WHILE data.pushPos # data.pullPos AND NOT data.pullerClosed DO WAIT data.pipeChange; ENDLOOP; IF data.pullerClosed AND data.pushPos # data.pullPos THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; data.pusherClosed ¬ TRUE; BROADCAST data.pipeChange; }; }; WITH self.streamData SELECT FROM data: DataHandle => innerClose[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; pushProcs: REF IO.StreamProcs = IO.CreateStreamProcs[ variety: $output, class: $Pipe, putChar: PushPutChar, putBlock: PushPutBlock, unsafePutBlock: PushUnsafePutBlock, flush: PushFlush, reset: PushReset, close: PushClose ]; pullProcs: REF IO.StreamProcs = IO.CreateStreamProcs[ variety: $input, class: $Pipe, getChar: PullGetChar, getBlock: PullGetBlock, endOf: PullEndOf, charsAvail: PullCharsAvail, close: PullClose ]; END. ΐIOPipeImpl.mesa Copyright Σ 1985, 1986, 1990, 1991 by Xerox Corporation. All rights reserved. Russ Atkinson (RRA) January 22, 1986 8:50:55 pm PST Mike Spreitzer February 27, 1987 1:32:36 pm PST Carl Hauser, December 7, 1988 3:19:19 pm PST JKF, September 12, 1990 3:06:01 pm PDT Circular buffer that holds the characters in the middle of the pipe. Position of the puller's (reader's) cursor into the buffer. The puller increments pullPos and then grabs that character. Position of the pusher's (writer's) cursor into the buffer. The pusher increments pushPos and then deposits the character there. The pipe is empty if pullPos = pushPos. TRUE iff the puller (reader) has closed the pipe. TRUE iff the pusher (writer) has closed the pipe. Condition raised if the state of the pipe has changed in any way. Creates a pipe and returns two streams that each access the ends of the pipe. The push stream is used to add data to the pipe; the pull stream is used to remove it. Returns the next character in the pipe, blocking until a character is available. Raises IO.EndOfStream ERROR if the pusher has closed the pipe. Raises IO.StreamClosed ERROR if the stream has already been closed. Reads a block from the pipe, returning the number of characters actually read. What should the approach be? For example, if there are 10 characters in the pipe, and the guy wants 50, do we give him the 10 or wait for the 50? IO.mesa and IODoc.Tioga clearly say to wait for the 50, so that's what we do. The first run of bytes to get lie between pullPos and data.buffer.maxLength-1 The first byte in the buffer should be read. There are bytes to read between pullPos and data.pushPos-1 Returns TRUE iff the pusher has closed his end of the stream. Closes the puller's end of the stream. Raises ERROR IO.Error[StreamClosed ..] if there is something in the pipe (unless abort if true). To avoid problems if/when the pusher closes Pushes a character onto the pipe, waiting until space is available in the buffer to hold it. Raises ERROR IO.Error[StreamClosed..] if the pusher has already closed the stream. Raises the same error if the puller has closed the stream and broken the pipe. There is room to place the character Clears out the pipe. Closes the pusher's end of the pipe. Flushes out the pipe iff ~abort. Raises IO.Error[StreamClosed..] if the puller has closed the stream and there is still something in it. Carl Hauser, November 19, 1987 6:16:54 pm PST Added PushUnsafePutBlock., IOPipeImpl Κ:•NewlineDelimiter –(cedarcode) style˜codešœ™Kšœ ΟeœC™NK™3K™/K™,K™&—K˜šΟk ˜ Kšœžœ˜KšžœžœDžœ˜aK˜ K˜—K˜šΟb œžœž˜Kšžœžœ˜!Kšžœžœ˜Kšžœ ž˜Kšžœžœžœžœ˜šœžœž œžœ˜šœžœžœ˜KšœD™D—šœ žœ˜Kšœx™x—šœ žœ˜Kšœͺ™ͺ—šœžœžœ˜Kšœ1™1—šœžœžœ˜Kšœ1™1—šœ ž ˜KšœA™A—Kšœ˜—Kšœ žœžœ˜—K˜š Οn œžœžœžœžœžœ˜JKšœ₯™₯Kšžœžœžœ˜šœžœžœ ž˜$Kš œ žœžœžœžœžœ˜>—Kšœžœžœžœ ˜=Kšœ˜Kšœžœ!˜*Kšœžœ!˜*Kšžœ,˜3K˜K˜—š   œžœžœžœžœ ˜;KšœP™PKšœƒ™ƒšœžœžœ˜/Kšžœžœžœ˜šž˜Kš žœžœžœžœžœžœ˜IKšžœžœžœ˜)Kš žœžœžœžœžœžœ˜AKšžœ˜Kšžœ˜—K˜ šžœ#ž˜)K˜—K˜Kšž œ˜K˜—šžœžœž˜ Kšœ'˜'Kšžœžœžœ˜)—K˜—K˜š  œžœžœžœžœžœ˜1Kš žœžœžœžœžœžœ˜0K˜K˜—š  œžœžœ žœžœžœ žœžœžœ ˜qKšœN™NKšœΰ™ΰšœžœžœ˜0Kšžœžœžœ˜Kšœ žœ˜šžœ ž˜šž˜Kš žœžœžœžœžœžœ˜IKšžœžœžœ˜$Kšžœžœžœ˜!Kšžœ˜Kšœ˜Kšžœ˜—šžœžœ˜ KšœM™MKšœžœžœ-˜>šžœžœžœž˜*K˜#K˜Kšžœ˜—Kšœ˜Kšœ!˜!Kšœ˜K˜—šžœ%žœ žœ˜;K™,Kšœ-˜-K˜Kšœ˜Kšœ˜K˜—šžœžœ žœ˜.Kšœ:™:Kšœžœžœ ˜1šžœžœžœž˜*K˜#K˜Kšžœ˜—Kšœ˜Kšœ!˜!Kšœ˜Kšœ˜—K˜K˜Kšž œ˜Kšžœ˜—K˜—Kšœžœžœ"Οr œ˜HKš œžœžœžœžœ˜Xšžœžœž˜ Kšœ(˜(Kšžœžœžœ˜)—Kšœ˜K˜—š  œžœžœžœžœžœ ˜Kšœžœžœ˜2Kšžœžœžœ˜šž˜Kš žœžœžœžœžœžœ˜IKšœžœžœ˜-Kšžœ žœžœ˜Kšžœ žœ'žœ˜=Kšžœžœ žœ˜+Kšžœžœžœ žœ˜"Kšžœ˜Kšžœ˜—K˜—šžœžœž˜ Kšœ*˜*Kšžœžœžœ˜)—K˜K˜—š   œžœžœžœžœžœ˜>Kšœ=™=šœ žœžœ˜-Kšžœžœžœ˜Kšœ"žœ˜8K˜—šžœžœž˜ Kšœ%˜%Kšžœžœžœ˜)—K˜K˜—š  œžœžœ žœ˜/Kšœ(™(Kšœ`™`šœ žœžœ˜-Kšžœžœžœ˜Kšœžœ˜šžœžœž˜.Kšžœžœžœžœ˜/—šœ˜Kšœ+™+—Kšž œ˜K˜—šžœžœž˜ Kšœ%˜%Kšžœžœžœ˜)—K˜—K˜š  œžœžœžœ˜0Kšœ\™\Kšœ’™’šœžœžœ˜/Kšžœžœžœ˜šžœžœž˜.Kšžœžœžœžœ˜/—šž˜Kšœžœ˜Kšžœžœ ˜)šžœžœ˜Kšœ$™$Kšœ˜K˜!Kšž œ˜Kšžœ˜Kšœ˜—Kšžœ˜Kšžœ˜—K˜—šžœžœž˜ Kšœ'˜'Kšžœžœžœ˜)—K˜K˜—š  œžœžœ žœžœžœžœ˜Wšœžœžœ˜0Kšžœžœžœ˜Kšœ žœ˜šžœž˜&Kšœžœ˜šžœžœž˜.Kšžœžœžœžœ˜/—Kšžœžœ ˜)Kšžœžœžœžœ˜8šœ žœ˜+Kšžœžœžœ˜I—šžœžœžœž˜&K˜#K˜Kšžœ˜—Kšœ˜Kšž œ˜Kšžœ˜—K˜—Kšœžœ˜2Kšœžœ!˜7šžœžœž˜ Kšœ(˜(Kšžœžœžœ˜)—K˜K˜—š œžœžœ˜BKšœ žœ˜#šœžœžœžœ˜8Kšžœžœžœ˜Kšœ žœ˜Kšœ žœžœžœžœžœžœžœ˜Ošžœž˜&Kšœžœ˜šžœžœž˜.Kšžœžœžœžœ˜/—Kšžœžœ ˜)Kšžœžœžœžœ˜8šœ˜Kšœžœžœžœ˜WKšœ0˜0Kšœ˜—Kšœ˜Kšœ!˜!Kšž œ˜Kšžœ˜—K˜—Kšœžœ#˜8šžœžœž˜ Kšœ(˜(Kšžœžœžœ˜)—K˜K˜K˜—š  œžœžœ˜"šœ žœžœ˜-Kšžœžœžœ˜šžœž˜$Kš žœžœžœžœžœžœ˜IKšžœ˜Kšžœ˜—K˜—šžœžœž˜ Kšœ%˜%Kšžœžœžœ˜)—K˜K˜—š  œžœžœ˜"Kšœ™šœ žœžœžœ˜4K˜ Kšž œ˜K˜—šžœžœž˜ Kšœ%˜%Kšžœžœžœ˜)—K˜K˜—š   œžœžœ žœžœ˜7KšœF™FKšœg™gšœ žœžœ˜-Kšžœžœžœ˜šžœžœžœ˜šžœž˜šžœžœžœž˜>Kšžœ˜Kšžœ˜——šžœžœž˜9Kšžœžœžœžœ˜/—Kšœžœ˜Kšž œ˜K˜—K˜—šžœžœž˜ Kšœ%˜%Kšžœžœžœ˜)—K˜—K˜šœ žœžœžœ˜5K˜Kšœ˜K˜K˜#Kšœ˜Kšœ˜Kšœ˜K˜K˜—šœ žœžœžœ˜5K˜Kšœ˜K˜Kšœ˜Kšœ˜Kšœ˜Kšœ˜—K˜Kšžœ˜K˜™-Kšœ‘ ™%—K™K™—…—":