<> <> <> <> <> <> DIRECTORY Process USING [EnableAborts], IO USING [Error, StreamProcs, EndOfStream, CreateStream, CreateStreamProcs, STREAM, UnsafeBlock], IOClasses, Basics USING [ByteBlt]; IOPipeImpl: CEDAR MONITOR LOCKS data USING data: DataHandle IMPORTS IO, Basics, Process EXPORTS IOClasses = BEGIN STREAM: TYPE = IO.STREAM; Data: TYPE = MONITORED RECORD [ buffer: REF TEXT, <> pullPos: NAT ¬ 0, <> pushPos: NAT ¬ 0, <> pullerClosed: BOOL ¬ FALSE, <> pusherClosed: BOOL ¬ FALSE, <> pipeChange: CONDITION <> ]; DataHandle: TYPE = REF Data; CreatePipe: PUBLIC PROC [bufferSize: NAT] RETURNS [push, pull: STREAM] = { <> ENABLE UNWIND => NULL; nChars: NAT ¬ SELECT bufferSize FROM < 2 => 2, > LAST[NAT]/2 => LAST[NAT]/2, ENDCASE => bufferSize; common: DataHandle ¬ NEW[Data ¬ [buffer: NEW[TEXT[nChars]]]]; common.buffer.length ¬ nChars; push ¬ IO.CreateStream[pushProcs, common]; pull ¬ IO.CreateStream[pullProcs, common]; TRUSTED {Process.EnableAborts[@common.pipeChange]}; }; PullGetChar: PROC [self: STREAM] RETURNS [c: CHAR ¬ 0C] = { <> <> innerGetChar: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; DO IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; IF data.pullPos # data.pushPos THEN EXIT; IF data.pusherClosed THEN RETURN WITH ERROR IO.EndOfStream[self]; WAIT data.pipeChange; ENDLOOP; data.pullPos ¬ data.pullPos + 1; IF data.pullPos = data.buffer.length THEN data.pullPos ¬ 0; c ¬ data.buffer[data.pullPos]; BROADCAST data.pipeChange; }; WITH self.streamData SELECT FROM data: DataHandle => innerGetChar[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; AddNAT: PROC [a, b: NAT] RETURNS [NAT] = INLINE { RETURN [MIN[CARDINAL[a]+CARDINAL[b], NAT.LAST]]; }; PullGetBlock: PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT] RETURNS [nBytesRead: NAT ¬ 0] = { <> <> innerGetBlock: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; pullPos: NAT ¬ data.pullPos; WHILE delta > 0 DO DO IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; IF pullPos # data.pushPos THEN EXIT; IF data.pusherClosed THEN RETURN; WAIT data.pipeChange; pullPos ¬ data.pullPos; ENDLOOP; IF pullPos > data.pushPos THEN { <> nBytes: NAT ~ MIN[data.buffer.maxLength - 1 - pullPos, delta]; FOR i: NAT IN (pullPos..pullPos+nBytes] DO block[startIndex] ¬ data.buffer[i]; startIndex ¬ startIndex + 1; ENDLOOP; pullPos ¬ pullPos + nBytes; nBytesRead ¬ nBytesRead + nBytes; delta ¬ delta - nBytes; }; IF pullPos = data.buffer.maxLength - 1 AND delta > 0 THEN { <> block[startIndex] ¬ data.buffer[pullPos ¬ 0]; startIndex ¬ startIndex + 1; nBytesRead ¬ nBytesRead + 1; delta ¬ delta - 1; }; IF pullPos < data.pushPos AND delta > 0 THEN { <> nBytes: NAT ~ MIN[data.pushPos - pullPos, delta]; FOR i: NAT IN (pullPos..pullPos+nBytes] DO block[startIndex] ¬ data.buffer[i]; startIndex ¬ startIndex + 1; ENDLOOP; pullPos ¬ pullPos + nBytes; nBytesRead ¬ nBytesRead + nBytes; delta ¬ delta - nBytes; }; data.pullPos ¬ pullPos; block.length ¬ startIndex; BROADCAST data.pipeChange; ENDLOOP; }; stopIndexPlusOne: NAT ¬ MIN[AddNAT[startIndex, count], block.maxLength]; delta: NAT ¬ IF stopIndexPlusOne > startIndex THEN stopIndexPlusOne - startIndex ELSE 0; WITH self.streamData SELECT FROM data: DataHandle => innerGetBlock[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PullCharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [diff: INT ¬ 0] = { innerCharsAvail: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; DO IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; diff ¬ INT[data.pushPos] - INT[data.pullPos]; IF diff > 0 THEN EXIT; IF diff < 0 THEN {diff ¬ diff + data.buffer.maxLength; EXIT}; IF data.pusherClosed THEN {diff ¬ 1; EXIT}; IF NOT wait THEN {diff ¬ 0; EXIT}; WAIT data.pipeChange; ENDLOOP; }; WITH self.streamData SELECT FROM data: DataHandle => innerCharsAvail[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PullEndOf: PROC [self: STREAM] RETURNS [eof: BOOL ¬ FALSE] = { <> innerEndOf: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; eof ¬ data.pushPos = data.pullPos AND data.pusherClosed; }; WITH self.streamData SELECT FROM data: DataHandle => innerEndOf[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PullClose: PROC [self: STREAM, abort: BOOL] = { <> <> innerClose: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; data.pullerClosed ¬ TRUE; IF ~abort AND data.pushPos # data.pullPos THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; data.pullPos ¬ data.pushPos; <> BROADCAST data.pipeChange; }; WITH self.streamData SELECT FROM data: DataHandle => innerClose[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushPutChar: PROC [self: STREAM, char: CHAR] = { <> <> innerPutChar: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; IF data.pusherClosed OR data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; DO new: NAT ¬ data.pushPos + 1; IF new = data.buffer.length THEN new ¬ 0; IF new # data.pullPos THEN { <> data.pushPos ¬ new; data.buffer[data.pushPos] ¬ char; BROADCAST data.pipeChange; RETURN; }; WAIT data.pipeChange; ENDLOOP; }; WITH self.streamData SELECT FROM data: DataHandle => innerPutChar[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushPutBlock: PROC [self: STREAM, block: REF READONLY TEXT, startIndex, count: NAT] = { innerPutBlock: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; numChars: INTEGER; WHILE startIndex < stopIndexPlusOne DO new: NAT ¬ data.pushPos + 1; IF data.pusherClosed OR data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; IF new = data.buffer.length THEN new ¬ 0; IF new = data.pullPos THEN {WAIT data.pipeChange; LOOP}; numChars ¬ MIN[stopIndexPlusOne-startIndex, IF new > data.pullPos THEN data.buffer.length-new ELSE data.pullPos-new]; FOR i: NAT IN [new..new+numChars-1] DO data.buffer[i] ¬ block[startIndex]; startIndex ¬ startIndex+1; ENDLOOP; data.pushPos ¬ new+numChars-1; BROADCAST data.pipeChange; ENDLOOP; }; stopIndexPlusOne: NAT ¬ AddNAT[startIndex, count]; stopIndexPlusOne ¬ MIN[stopIndexPlusOne, block.length]; WITH self.streamData SELECT FROM data: DataHandle => innerPutBlock[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushUnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] = { startIndex: NAT ¬ block.startIndex; innerPutBlock: ENTRY PROC [data: DataHandle] = TRUSTED { ENABLE UNWIND => NULL; numChars: INTEGER; bufferBase: LONG POINTER ¬ LOOPHOLE[data.buffer, LONG POINTER] + SIZE[TEXT[0]]; WHILE startIndex < stopIndexPlusOne DO new: NAT ¬ data.pushPos + 1; IF data.pusherClosed OR data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; IF new = data.buffer.length THEN new ¬ 0; IF new = data.pullPos THEN {WAIT data.pipeChange; LOOP}; numChars ¬ Basics.ByteBlt[ to: [bufferBase, new, IF new > data.pullPos THEN data.buffer.length ELSE data.pullPos], from: [block.base, startIndex, stopIndexPlusOne] ]; data.pushPos ¬ new+numChars-1; startIndex ¬ startIndex+numChars; BROADCAST data.pipeChange; ENDLOOP; }; stopIndexPlusOne: NAT ¬ AddNAT[startIndex, block.count]; WITH self.streamData SELECT FROM data: DataHandle => innerPutBlock[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushFlush: PROC [self: STREAM] = { innerFlush: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; WHILE data.pushPos # data.pullPos DO IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; WAIT data.pipeChange; ENDLOOP; }; WITH self.streamData SELECT FROM data: DataHandle => innerFlush[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushReset: PROC [self: STREAM] = { <> innerReset: ENTRY PROC [data: DataHandle] = INLINE { data.pushPos ¬ data.pullPos ¬ 0; BROADCAST data.pipeChange; }; WITH self.streamData SELECT FROM data: DataHandle => innerReset[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; PushClose: PROC [self: STREAM, abort: BOOL ¬ FALSE] = { <> <> innerClose: ENTRY PROC [data: DataHandle] = { ENABLE UNWIND => NULL; IF NOT data.pusherClosed THEN { IF ~abort THEN WHILE data.pushPos # data.pullPos AND NOT data.pullerClosed DO WAIT data.pipeChange; ENDLOOP; IF data.pullerClosed AND data.pushPos # data.pullPos THEN RETURN WITH ERROR IO.Error[StreamClosed, self]; data.pusherClosed ¬ TRUE; BROADCAST data.pipeChange; }; }; WITH self.streamData SELECT FROM data: DataHandle => innerClose[data]; ENDCASE => ERROR IO.Error[Failure, self]; }; pushProcs: REF IO.StreamProcs = IO.CreateStreamProcs[ variety: $output, class: $Pipe, putChar: PushPutChar, putBlock: PushPutBlock, unsafePutBlock: PushUnsafePutBlock, flush: PushFlush, reset: PushReset, close: PushClose ]; pullProcs: REF IO.StreamProcs = IO.CreateStreamProcs[ variety: $input, class: $Pipe, getChar: PullGetChar, getBlock: PullGetBlock, endOf: PullEndOf, charsAvail: PullCharsAvail, close: PullClose ]; END. <> <> <<>> <<>>