<> <> <> DIRECTORY Process USING [EnableAborts], IO USING [Error, StreamProcs, EndOfStream, CreateStream, CreateStreamProcs, STREAM], IOClasses; IOPipeImpl: CEDAR MONITOR LOCKS data USING data: DataHandle IMPORTS IO, Process EXPORTS IOClasses = BEGIN STREAM: TYPE = IO.STREAM; Data: TYPE = MONITORED RECORD [ buffer: REF TEXT, <> pullPos: NAT _ 0, <> pushPos: NAT _ 0, <> pullerClosed: BOOL _ FALSE, <> pusherClosed: BOOL _ FALSE, <> pipeChange: CONDITION <> ]; DataHandle: TYPE = REF Data; CreatePipe: PUBLIC PROC [bufferSize: NAT] RETURNS [push, pull: STREAM] = { <> ENABLE UNWIND => NULL; nChars: NAT _ SELECT bufferSize FROM < 2 => 2, > LAST[NAT]/2 => LAST[NAT]/2, ENDCASE => bufferSize; common: DataHandle _ NEW[Data _ [buffer: NEW[TEXT[nChars]]]]; common.buffer.length _ nChars; push _ IO.CreateStream[pushProcs, common]; pull _ IO.CreateStream[pullProcs, common]; TRUSTED {Process.EnableAborts[@common.pipeChange]}; }; PullGetChar: PROC [self: STREAM] RETURNS [c: CHAR _ 0C] = { <> <> innerGetChar: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; DO IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; IF data.pullPos # data.pushPos THEN EXIT; IF data.pusherClosed THEN RETURN WITH ERROR IO.EndOfStream[self]; WAIT data.pipeChange; ENDLOOP; data.pullPos _ data.pullPos + 1; IF data.pullPos = data.buffer.length THEN data.pullPos _ 0; c _ data.buffer[data.pullPos]; BROADCAST data.pipeChange; }; WITH self.streamData SELECT FROM data: DataHandle => innerGetChar[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; AddNAT: PROC [a, b: NAT] RETURNS [NAT] = INLINE { RETURN [MIN[CARDINAL[a]+CARDINAL[b], NAT.LAST]]; }; PullGetBlock: PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT] RETURNS [nBytesRead: NAT _ 0] = { <> <> innerGetBlock: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; pullPos: NAT _ data.pullPos; DO IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; IF pullPos # data.pushPos THEN EXIT; IF data.pusherClosed THEN RETURN WITH ERROR IO.EndOfStream[self]; WAIT data.pipeChange; pullPos _ data.pullPos; ENDLOOP; IF pullPos > data.pushPos AND delta > 0 THEN { <> nBytesRead _ MIN[data.buffer.maxLength - 1 - pullPos, delta]; FOR i: NAT IN (pullPos..pullPos+nBytesRead] DO block[startIndex] _ data.buffer[i]; startIndex _ startIndex + 1; ENDLOOP; pullPos _ pullPos + nBytesRead; delta _ delta - nBytesRead; IF pullPos = data.buffer.maxLength THEN pullPos _ 0; }; IF pullPos < data.pushPos AND delta > 0 THEN { <> nBytesRead _ nBytesRead + MIN[data.pushPos - pullPos, delta]; FOR i: NAT IN (pullPos..pullPos+nBytesRead] DO block[startIndex] _ data.buffer[i]; startIndex _ startIndex + 1; ENDLOOP; pullPos _ pullPos + nBytesRead; }; data.pullPos _ pullPos; block.length _ startIndex; BROADCAST data.pipeChange; }; stopIndexPlusOne: NAT _ MIN[AddNAT[startIndex, count], block.maxLength]; delta: NAT _ IF stopIndexPlusOne > startIndex THEN stopIndexPlusOne - startIndex ELSE 0; WITH self.streamData SELECT FROM data: DataHandle => innerGetBlock[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PullCharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [diff: INT _ 0] = { innerCharsAvail: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; DO IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; diff _ LONG[data.pushPos] - LONG[data.pullPos]; IF diff > 0 THEN EXIT; IF diff < 0 THEN {diff _ diff + data.buffer.maxLength; EXIT}; IF data.pusherClosed THEN {diff _ 1; EXIT}; IF NOT wait THEN {diff _ 0; EXIT}; WAIT data.pipeChange; ENDLOOP; }; WITH self.streamData SELECT FROM data: DataHandle => innerCharsAvail[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PullEndOf: PROC [self: STREAM] RETURNS [eof: BOOL _ FALSE] = { <> innerEndOf: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; eof _ data.pushPos = data.pullPos AND data.pusherClosed; }; WITH self.streamData SELECT FROM data: DataHandle => innerEndOf[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PullClose: PROC [self: STREAM, abort: BOOL] = { <> <> innerClose: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; data.pullerClosed _ TRUE; IF ~abort AND data.pushPos # data.pullPos THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; data.pullPos _ data.pushPos; <> BROADCAST data.pipeChange; }; WITH self.streamData SELECT FROM data: DataHandle => innerClose[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushPutChar: PROC [self: STREAM, char: CHAR] = { <> <> innerPutChar: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; IF data.pusherClosed OR data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; DO new: NAT _ data.pushPos + 1; IF new = data.buffer.length THEN new _ 0; IF new # data.pullPos THEN { <> data.pushPos _ new; data.buffer[data.pushPos] _ char; BROADCAST data.pipeChange; RETURN; }; WAIT data.pipeChange; ENDLOOP; }; WITH self.streamData SELECT FROM data: DataHandle => innerPutChar[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushPutBlock: PROC [self: STREAM, block: REF READONLY TEXT, startIndex, count: NAT] = { innerPutBlock: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; numChars: INTEGER; WHILE startIndex < stopIndexPlusOne DO new: NAT _ data.pushPos + 1; IF data.pusherClosed OR data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; IF new = data.buffer.length THEN new _ 0; IF new = data.pullPos THEN {WAIT data.pipeChange; LOOP}; numChars _ MIN[stopIndexPlusOne-startIndex, IF new > data.pullPos THEN data.buffer.length-new ELSE data.pullPos-new]; FOR i: NAT IN [new..new+numChars-1] DO data.buffer[i] _ block[startIndex]; startIndex _ startIndex+1; ENDLOOP; data.pushPos _ new+numChars-1; BROADCAST data.pipeChange; ENDLOOP; }; stopIndexPlusOne: NAT _ AddNAT[startIndex, count]; stopIndexPlusOne _ MIN[stopIndexPlusOne, block.length]; WITH self.streamData SELECT FROM data: DataHandle => innerPutBlock[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushFlush: PROC [self: STREAM] = { innerFlush: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; WHILE data.pushPos # data.pullPos DO IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; WAIT data.pipeChange; ENDLOOP; }; WITH self.streamData SELECT FROM data: DataHandle => innerFlush[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushReset: PROC [self: STREAM] = { <> innerReset: ENTRY PROC [data: DataHandle] = INLINE { data.pushPos _ data.pullPos _ 0; BROADCAST data.pipeChange; }; WITH self.streamData SELECT FROM data: DataHandle => innerReset[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushClose: PROC [self: STREAM, abort: BOOL _ FALSE] = { <> <> innerClose: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; IF NOT data.pusherClosed THEN { IF ~abort THEN WHILE data.pushPos # data.pullPos AND NOT data.pullerClosed DO WAIT data.pipeChange; ENDLOOP; IF data.pullerClosed AND data.pushPos # data.pullPos THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; data.pusherClosed _ TRUE; BROADCAST data.pipeChange; }; }; WITH self.streamData SELECT FROM data: DataHandle => innerClose[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; pushProcs: REF IO.StreamProcs = IO.CreateStreamProcs[ variety: $output, class: $Pipe, putChar: PushPutChar, putBlock: PushPutBlock, flush: PushFlush, reset: PushReset, close: PushClose ]; pullProcs: REF IO.StreamProcs = IO.CreateStreamProcs[ variety: $input, class: $Pipe, getChar: PullGetChar, getBlock: PullGetBlock, endOf: PullEndOf, charsAvail: PullCharsAvail, close: PullClose ]; END.