<> <> DIRECTORY IO USING [Error, StreamProcs, EndOfStream, CreateStream, CreateStreamProcs, STREAM], IOClasses, Process; AbortablePipes: CEDAR MONITOR IMPORTS IO, Process EXPORTS IOClasses = BEGIN STREAM: TYPE = IO.STREAM; Data: TYPE = RECORD [ buffer: REF TEXT, <> pullPos: NAT _ 0, <> pushPos: NAT _ 0, <> pullerClosed: BOOL _ FALSE, <> pusherClosed: BOOL _ FALSE, <> pipeChange: CONDITION <> ]; DataHandle: TYPE = REF Data; CreatePipe: PUBLIC ENTRY PROC [bufferSize: NAT] RETURNS [push, pull: STREAM] = { <> ENABLE UNWIND => NULL; common: DataHandle _ NEW[Data _ [buffer: NEW[TEXT[bufferSize]]]]; common.buffer.length _ common.buffer.maxLength; push _ IO.CreateStream[ pushProcs, common ]; pull _ IO.CreateStream[ pullProcs, common ]; TRUSTED {Process.EnableAborts[@common.pipeChange]}; }; PullGetChar: ENTRY PROC [self: STREAM] RETURNS [c: CHAR] = { <> <> ENABLE UNWIND => NULL; data: DataHandle _ NARROW[self.streamData]; DO IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self]; IF data.pullPos # data.pushPos THEN EXIT; IF data.pusherClosed THEN ERROR IO.EndOfStream[self]; WAIT data.pipeChange; ENDLOOP; data.pullPos _ data.pullPos + 1; IF data.pullPos = data.buffer.length THEN data.pullPos _ 0; c _ data.buffer[data.pullPos]; BROADCAST data.pipeChange; }; AddNAT: PROC [a, b: NAT] RETURNS [NAT] = INLINE { RETURN [MIN[CARDINAL[a]+CARDINAL[b], NAT.LAST]]; }; PullGetBlock: ENTRY PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT] RETURNS [nBytesRead: NAT] = { <> <> ENABLE UNWIND => NULL; data: DataHandle _ NARROW[self.streamData]; stopIndexPlusOne: NAT _ AddNAT[startIndex, count]; stopIndexPlusOne _ MIN[ stopIndexPlusOne, block.length ]; DO IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self]; IF data.pullPos # data.pushPos THEN EXIT; IF data.pusherClosed THEN ERROR IO.EndOfStream[self]; WAIT data.pipeChange; ENDLOOP; IF data.pullPos < data.pushPos THEN { nBytesRead _ MIN[ data.pushPos - data.pullPos, stopIndexPlusOne - startIndex ]; FOR i: INT IN (data.pullPos..data.pullPos+nBytesRead] DO block[startIndex] _ data.buffer[i]; startIndex _ startIndex + 1; ENDLOOP; data.pullPos _ data.pullPos + nBytesRead; } ELSE { nBytesRead _ MIN[ data.buffer.maxLength - 1 - data.pullPos, stopIndexPlusOne - startIndex ]; FOR i: INT IN (data.pullPos..data.pullPos+nBytesRead] DO block[startIndex] _ data.buffer[i]; startIndex _ startIndex + 1; ENDLOOP; data.pullPos _ data.pullPos + nBytesRead; IF startIndex < stopIndexPlusOne THEN { data.pullPos _ MIN[ data.pushPos, stopIndexPlusOne - startIndex - 1]; nBytesRead _ nBytesRead + data.pullPos + 1; FOR i: INT IN [0..data.pullPos] DO block[startIndex] _ data.buffer[i]; startIndex _ startIndex + 1; ENDLOOP; }; }; BROADCAST data.pipeChange; }; PullCharsAvail: ENTRY PROC [self: STREAM, wait: BOOL] RETURNS [INT] = { ENABLE UNWIND => NULL; data: DataHandle _ NARROW[self.streamData]; diff: INT; DO IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self]; diff _ LONG[data.pushPos] - LONG[data.pullPos]; IF diff > 0 THEN RETURN [diff]; IF diff < 0 THEN RETURN [LONG[data.buffer.maxLength] + diff]; IF data.pusherClosed THEN RETURN [1]; IF NOT wait THEN RETURN [0]; WAIT data.pipeChange; ENDLOOP; }; PullEndOf: ENTRY PROC [self: STREAM] RETURNS [eof: BOOL] = { <> ENABLE UNWIND => NULL; data: DataHandle _ NARROW[self.streamData]; RETURN [data.pushPos = data.pullPos AND data.pusherClosed]; }; PullClose: ENTRY PROC [self: STREAM, abort: BOOL] = { <> <> ENABLE UNWIND => NULL; data: DataHandle _ NARROW[self.streamData]; data.pullerClosed _ TRUE; IF data.pushPos # data.pullPos AND ~abort THEN ERROR IO.Error[StreamClosed, self]; BROADCAST data.pipeChange; }; PushPutChar: ENTRY PROC [self: STREAM, char: CHAR] = { <> <> ENABLE UNWIND => NULL; data: DataHandle _ NARROW[self.streamData]; IF data.pusherClosed THEN ERROR IO.Error[StreamClosed, self]; IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self]; WHILE TRUE DO data.pushPos _ data.pushPos + 1; IF data.pushPos = data.buffer.length THEN data.pushPos _ 0; IF data.pushPos # data.pullPos THEN EXIT; IF data.pushPos = 0 THEN data.pushPos _ data.buffer.length; data.pushPos _ data.pushPos - 1; WAIT data.pipeChange; ENDLOOP; data.buffer[data.pushPos] _ char; BROADCAST data.pipeChange; }; PushPutBlock: ENTRY PROC[self: STREAM, block: REF READONLY TEXT, startIndex: NAT, count: NAT] = { ENABLE UNWIND => NULL; WaitForSpace: INTERNAL PROC [] = { WHILE data.pushPos + 1 = data.pullPos OR (data.pullPos = 0 AND data.pushPos = data.buffer.length - 1) DO WAIT data.pipeChange; ENDLOOP; }; data: DataHandle _ NARROW[self.streamData]; numChars: INT; stopIndexPlusOne: NAT _ AddNAT[startIndex, count]; stopIndexPlusOne _ MIN[ stopIndexPlusOne, block.length ]; WHILE startIndex < stopIndexPlusOne DO IF data.pusherClosed THEN ERROR IO.Error[StreamClosed, self]; IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self]; WaitForSpace[]; data.pushPos _ data.pushPos + 1; IF data.pushPos = data.buffer.length THEN data.pushPos _ 0; IF data.pushPos > data.pullPos THEN numChars _ MIN[ data.buffer.length-data.pushPos, stopIndexPlusOne-startIndex ] ELSE numChars _ MIN[ data.pullPos-data.pushPos, stopIndexPlusOne-startIndex ]; FOR i: INT IN [data.pushPos..data.pushPos+numChars-1] DO data.buffer[i] _ block[startIndex]; startIndex _ startIndex+1; ENDLOOP; data.pushPos _ data.pushPos+numChars-1; ENDLOOP; BROADCAST data.pipeChange; }; PushFlush: ENTRY PROC [self: STREAM] = { ENABLE UNWIND => NULL; PushFlusher[self]; }; PushFlusher: INTERNAL PROC [self: STREAM] = { <> <> data: DataHandle _ NARROW[self.streamData]; WHILE data.pushPos # data.pullPos DO IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self]; WAIT data.pipeChange; ENDLOOP; BROADCAST data.pipeChange; }; PushReset: ENTRY PROC [self: STREAM] = { <> ENABLE UNWIND => NULL; data: DataHandle _ NARROW[self.streamData]; data.pushPos _ data.pullPos _ 0; BROADCAST data.pipeChange; }; PushClose: ENTRY PROC [self: STREAM, abort: BOOL _ FALSE] = { <> <> ENABLE UNWIND => NULL; data: DataHandle _ NARROW[self.streamData]; IF data.pullerClosed AND data.pushPos # data.pullPos THEN ERROR IO.Error[StreamClosed, self]; IF ~abort THEN PushFlusher[self]; data.pusherClosed _ TRUE; BROADCAST data.pipeChange; }; pushProcs: REF IO.StreamProcs = IO.CreateStreamProcs[ variety: $output, class: $Pipe, putChar: PushPutChar, putBlock: PushPutBlock, flush: PushFlush, reset: PushReset, close: PushClose ]; pullProcs: REF IO.StreamProcs = IO.CreateStreamProcs[ variety: $input, class: $Pipe, getChar: PullGetChar, getBlock: PullGetBlock, endOf: PullEndOf, charsAvail: PullCharsAvail, close: PullClose ]; END. CHANGE LOG Created by Nix on September 9, 1983 3:04 pm Changed by MBrown on October 25, 1983 4:52 pm Converted to Cedar 5.0; exports IOClasses.CreatePipe instead of Pipe.Open. Did minimal conversions of block operations; these should be reworked to use count instead of stopIndexPlusOne. Did substantial conversion of PullerCharsAvail. Bug in PullEndOf: returned true with stuff in buffer. <<>> <> <IO>IOPipeImpl.mesa!1 of January 13, 1984 2:22:51 pm PST.>> <> <> <<>> <> <<general comments>> <> <<>>