IOPipeImpl.mesa
Copyright © 1985, 1986, 1990, 1991 by Xerox Corporation. All rights reserved.
Russ Atkinson (RRA) January 22, 1986 8:50:55 pm PST
Mike Spreitzer February 27, 1987 1:32:36 pm PST
Carl Hauser, December 7, 1988 3:19:19 pm PST
JKF, September 12, 1990 3:06:01 pm PDT
DIRECTORY
Process USING [EnableAborts],
IO USING [Error, StreamProcs, EndOfStream, CreateStream, CreateStreamProcs, STREAM, UnsafeBlock],
IOClasses,
Basics USING [ByteBlt];
IOPipeImpl: CEDAR MONITOR
LOCKS data USING data: DataHandle
IMPORTS IO, Basics, Process
EXPORTS IOClasses = BEGIN
-- Local shorthand for the stream type used by every procedure in this module.
STREAM: TYPE = IO.STREAM;
-- Data is the state shared by the two ends of one pipe.  It is a MONITORED RECORD: the ENTRY
-- procedures in this module lock the particular Data instance they are handed (see the module's
-- LOCKS clause), so separate pipes do not contend with each other.
-- Ring invariant, as implemented by the procedures below: both cursors are pre-incremented
-- (advance, then touch the slot), the pipe is empty when pullPos = pushPos, and the writers treat
-- the pipe as full when advancing pushPos would make it equal to pullPos.  One slot therefore
-- always stays unused.
Data: TYPE = MONITORED RECORD [
buffer: REF TEXT,
Circular buffer that holds the characters in the middle of the pipe.
pullPos: NAT ¬ 0,
Position of the puller's (reader's) cursor into the buffer. The puller increments pullPos and then grabs that character.
pushPos: NAT ¬ 0,
Position of the pusher's (writer's) cursor into the buffer. The pusher increments pushPos and then deposits the character there. The pipe is empty if pullPos = pushPos.
pullerClosed: BOOL ¬ FALSE,
TRUE iff the puller (reader) has closed the pipe.
pusherClosed: BOOL ¬ FALSE,
TRUE iff the pusher (writer) has closed the pipe.
pipeChange: CONDITION
Condition raised if the state of the pipe has changed in any way.
];
-- Reference to the shared pipe state; CreatePipe hands the same DataHandle to both streams as
-- their stream data.
DataHandle: TYPE = REF Data;
CreatePipe: PUBLIC PROC [bufferSize: NAT] RETURNS [push, pull: STREAM] = {
  -- Builds a pipe and returns its two ends: characters written to push can be read from pull.
  -- The requested bufferSize is clamped into [2 .. LAST[NAT]/2] before the ring is allocated.
  -- Both streams share one Data record; aborts are enabled on its condition variable so that a
  -- process blocked inside the pipe can be aborted.
  ENABLE UNWIND => NULL;
  ringSize: NAT ¬ MIN[MAX[bufferSize, 2], LAST[NAT]/2];
  ring: REF TEXT ¬ NEW[TEXT[ringSize]];
  shared: DataHandle ¬ NEW[Data ¬ [buffer: ring]];
  -- The whole ring is usable, so its length is set to its full capacity.
  ring.length ¬ ringSize;
  push ¬ IO.CreateStream[pushProcs, shared];
  pull ¬ IO.CreateStream[pullProcs, shared];
  TRUSTED {Process.EnableAborts[@shared.pipeChange]};
  };
PullGetChar: PROC [self: STREAM] RETURNS [c: CHAR ¬ 0C] = {
  -- Returns the next character in the pipe, blocking until one is available.
  -- Raises IO.Error[StreamClosed] if the puller has already closed this stream.
  -- Raises IO.EndOfStream if the pipe is empty and the pusher has closed its end.
  -- Raises IO.Error[Failure] if the stream data is not a pipe DataHandle.
  takeOne: ENTRY PROC [data: DataHandle] = {
    ENABLE UNWIND => NULL;
    DO
      -- The puller-closed test comes first, so it wins even when characters are buffered.
      IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self];
      IF data.pullPos = data.pushPos
        THEN {
          -- Pipe is empty: either no more will ever arrive, or we wait for a change.
          IF data.pusherClosed THEN RETURN WITH ERROR IO.EndOfStream[self];
          WAIT data.pipeChange;
          }
        ELSE EXIT;
      ENDLOOP;
    -- Pre-increment the read cursor (wrapping at the end of the ring), then fetch.
    data.pullPos ¬ IF data.pullPos + 1 = data.buffer.length THEN 0 ELSE data.pullPos + 1;
    c ¬ data.buffer[data.pullPos];
    -- A slot was freed; let any waiting pusher know.
    BROADCAST data.pipeChange;
    };
  WITH self.streamData SELECT FROM
    pipe: DataHandle => takeOne[pipe];
    ENDCASE => ERROR IO.Error[Failure, self];
  };
AddNAT: PROC [a, b: NAT] RETURNS [NAT] = INLINE {
  -- Saturating addition: returns a+b, or NAT.LAST when the true sum would exceed NAT.LAST.
  -- The sum is formed in CARDINAL so that it cannot overflow before it is clipped.
  sum: CARDINAL ~ CARDINAL[a] + CARDINAL[b];
  RETURN [IF sum > NAT.LAST THEN NAT.LAST ELSE sum];
  };
-- PullGetBlock copies characters from the pipe into block, beginning at block[startIndex].
-- The request is clipped so that nothing is stored at or beyond block.maxLength.
-- It keeps waiting until the whole (clipped) request has been satisfied, or until the pipe is
-- empty and the pusher has closed; in the latter case it returns what it has so far.
-- Returns nBytesRead, the number of characters stored.  After each copying pass block.length is
-- set to one past the last character stored.
-- Raises IO.Error[StreamClosed] if the puller has closed this stream, and IO.Error[Failure] if
-- the stream data is not a pipe DataHandle.
PullGetBlock: PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT] RETURNS [nBytesRead: NAT ¬ 0] = {
Reads a block from the pipe, returning the number of characters actually read.
What should the approach be? For example, if there are 10 characters in the pipe, and the guy wants 50, do we give him the 10 or wait for the 50? IO.mesa and IODoc.Tioga clearly say to wait for the 50, so that's what we do.
innerGetBlock: ENTRY PROC [data: DataHandle] = {
ENABLE UNWIND => NULL;
-- pullPos is a working copy of the read cursor; it is written back to data.pullPos at the end
-- of every pass and re-read after every WAIT.
pullPos: NAT ¬ data.pullPos;
-- delta (declared below, after this procedure) is the number of characters still wanted.
WHILE delta > 0 DO
DO
IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self];
IF pullPos # data.pushPos THEN EXIT;
-- Pipe is empty: give up with a short count if the pusher will never write again.
IF data.pusherClosed THEN RETURN;
WAIT data.pipeChange;
pullPos ¬ data.pullPos;
ENDLOOP;
-- Phase 1: the unread data wraps around the end of the ring.  Because the cursor is
-- pre-incremented, the readable slots here are pullPos+1 .. maxLength-1.
IF pullPos > data.pushPos THEN {
The first run of bytes to get lie between pullPos and data.buffer.maxLength-1
nBytes: NAT ~ MIN[data.buffer.maxLength - 1 - pullPos, delta];
-- Half-open interval: slot pullPos itself has already been consumed.
FOR i: NAT IN (pullPos..pullPos+nBytes] DO
block[startIndex] ¬ data.buffer[i];
startIndex ¬ startIndex + 1;
ENDLOOP;
pullPos ¬ pullPos + nBytes;
nBytesRead ¬ nBytesRead + nBytes;
delta ¬ delta - nBytes;
};
-- Phase 2: the cursor sits on the last slot, so the next unread character is slot 0.
IF pullPos = data.buffer.maxLength - 1 AND delta > 0 THEN {
The first byte in the buffer should be read.
block[startIndex] ¬ data.buffer[pullPos ¬ 0];
startIndex ¬ startIndex + 1;
nBytesRead ¬ nBytesRead + 1;
delta ¬ delta - 1;
};
-- Phase 3: a contiguous run below the write cursor; the readable slots are
-- pullPos+1 .. data.pushPos.
IF pullPos < data.pushPos AND delta > 0 THEN {
There are bytes to read between pullPos and data.pushPos-1
nBytes: NAT ~ MIN[data.pushPos - pullPos, delta];
FOR i: NAT IN (pullPos..pullPos+nBytes] DO
block[startIndex] ¬ data.buffer[i];
startIndex ¬ startIndex + 1;
ENDLOOP;
pullPos ¬ pullPos + nBytes;
nBytesRead ¬ nBytesRead + nBytes;
delta ¬ delta - nBytes;
};
-- Publish the new cursor and wake any pusher that was waiting for room.
data.pullPos ¬ pullPos;
block.length ¬ startIndex;
BROADCAST data.pipeChange;
ENDLOOP;
};
-- Clip the request to the block's capacity; AddNAT saturates instead of overflowing.
stopIndexPlusOne: NAT ¬ MIN[AddNAT[startIndex, count], block.maxLength];
-- Number of characters to transfer; 0 when startIndex is already at or past the clipped end.
delta: NAT ¬ IF stopIndexPlusOne > startIndex THEN stopIndexPlusOne - startIndex ELSE 0;
WITH self.streamData SELECT FROM
data: DataHandle => innerGetBlock[data];
ENDCASE => ERROR IO.Error[Failure, self];
};
PullCharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [diff: INT ¬ 0] = {
  -- Reports how many characters are currently buffered in the pipe.
  -- If the pipe is empty: returns 1 when the pusher has closed its end, returns 0 when wait is
  -- FALSE, and otherwise blocks until the pipe changes and looks again.
  -- Raises IO.Error[StreamClosed] if the puller has closed this stream, and IO.Error[Failure]
  -- if the stream data is not a pipe DataHandle.
  countBuffered: ENTRY PROC [data: DataHandle] = {
    ENABLE UNWIND => NULL;
    DO
      IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self];
      diff ¬ INT[data.pushPos] - INT[data.pullPos];
      -- A negative difference means the unread data wraps around the end of the ring.
      IF diff < 0 THEN diff ¬ diff + data.buffer.maxLength;
      IF diff # 0 THEN RETURN;
      -- From here on the pipe is empty and diff is 0.
      IF data.pusherClosed THEN {diff ¬ 1; RETURN};
      IF NOT wait THEN RETURN;
      WAIT data.pipeChange;
      ENDLOOP;
    };
  WITH self.streamData SELECT FROM
    pipe: DataHandle => countBuffered[pipe];
    ENDCASE => ERROR IO.Error[Failure, self];
  };
PullEndOf: PROC [self: STREAM] RETURNS [eof: BOOL ¬ FALSE] = {
  -- Returns TRUE iff the pusher has closed its end AND every buffered character has been read.
  -- Raises IO.Error[Failure] if the stream data is not a pipe DataHandle.
  atEnd: ENTRY PROC [data: DataHandle] = {
    ENABLE UNWIND => NULL;
    eof ¬ data.pusherClosed AND data.pullPos = data.pushPos;
    };
  WITH self.streamData SELECT FROM
    pipe: DataHandle => atEnd[pipe];
    ENDCASE => ERROR IO.Error[Failure, self];
  };
PullClose: PROC [self: STREAM, abort: BOOL] = {
  -- Closes the puller's (reader's) end of the pipe.
  -- With abort = TRUE any unread characters are discarded.  With abort = FALSE, if unread
  -- characters remain, the puller is still marked closed and IO.Error[StreamClosed] is raised.
  -- Raises IO.Error[Failure] if the stream data is not a pipe DataHandle.
  innerClose: ENTRY PROC [data: DataHandle] = {
    ENABLE UNWIND => NULL;
    data.pullerClosed ¬ TRUE;
    -- Notify waiters on EVERY exit path.  Previously the error return below skipped the
    -- BROADCAST, so a pusher blocked in PushClose, PushFlush or a put operation was never woken
    -- to observe pullerClosed.  Woken processes cannot run until this procedure releases the
    -- monitor, so they also see the cursor update made below.
    BROADCAST data.pipeChange;
    IF ~abort AND data.pushPos # data.pullPos THEN
      RETURN WITH ERROR IO.Error[StreamClosed, self];
    -- Make the pipe look empty, to avoid problems if/when the pusher closes.
    data.pullPos ¬ data.pushPos;
    };
  WITH self.streamData SELECT FROM
    data: DataHandle => innerClose[data];
    ENDCASE => ERROR IO.Error[Failure, self];
  };
PushPutChar: PROC [self: STREAM, char: CHAR] = {
  -- Pushes one character onto the pipe, waiting until the ring has room for it.
  -- Raises IO.Error[StreamClosed] if either end of the pipe is closed, whether that is already
  -- the case on entry or happens while this call is waiting for room.
  -- Raises IO.Error[Failure] if the stream data is not a pipe DataHandle.
  innerPutChar: ENTRY PROC [data: DataHandle] = {
    ENABLE UNWIND => NULL;
    DO
      new: NAT ¬ data.pushPos + 1;
      -- Re-test the closed flags on every pass, as PushPutBlock does.  Testing only once before
      -- the loop let a pusher that had been waiting deposit into a pipe whose puller had since
      -- closed, or keep waiting for room that would never appear.
      IF data.pusherClosed OR data.pullerClosed THEN
        RETURN WITH ERROR IO.Error[StreamClosed, self];
      IF new = data.buffer.length THEN new ¬ 0;
      IF new # data.pullPos THEN {
        -- There is room: advance the write cursor, deposit, and wake the reader.
        data.pushPos ¬ new;
        data.buffer[data.pushPos] ¬ char;
        BROADCAST data.pipeChange;
        RETURN;
        };
      -- Ring is full (advancing would collide with the read cursor); wait for a change.
      WAIT data.pipeChange;
      ENDLOOP;
    };
  WITH self.streamData SELECT FROM
    data: DataHandle => innerPutChar[data];
    ENDCASE => ERROR IO.Error[Failure, self];
  };
PushPutBlock: PROC [self: STREAM, block: REF READONLY TEXT, startIndex, count: NAT] = {
  -- Pushes block[startIndex .. startIndex+count) onto the pipe, clipped to block.length.
  -- Characters are copied in contiguous runs; whenever the ring is full the call waits for the
  -- puller to make room.
  -- Raises IO.Error[StreamClosed] as soon as either end of the pipe is found closed, and
  -- IO.Error[Failure] if the stream data is not a pipe DataHandle.
  limit: NAT ~ MIN[AddNAT[startIndex, count], block.length];
  copyRuns: ENTRY PROC [data: DataHandle] = {
    ENABLE UNWIND => NULL;
    WHILE startIndex < limit DO
      slot: NAT ¬ data.pushPos + 1;  -- first slot to write (before wrapping)
      room: NAT;  -- contiguous free slots starting at slot
      run: NAT;  -- characters copied on this pass
      IF data.pusherClosed OR data.pullerClosed THEN
        RETURN WITH ERROR IO.Error[StreamClosed, self];
      IF slot = data.buffer.length THEN slot ¬ 0;
      -- Writing into the read cursor's slot would make a full pipe look empty, so wait.
      IF slot = data.pullPos THEN {WAIT data.pipeChange; LOOP};
      -- Free space runs either to the end of the ring or up to, but excluding, the read cursor.
      room ¬ IF slot > data.pullPos THEN data.buffer.length - slot ELSE data.pullPos - slot;
      run ¬ MIN[limit - startIndex, room];
      FOR k: NAT IN [0..run) DO
        data.buffer[slot + k] ¬ block[startIndex + k];
        ENDLOOP;
      startIndex ¬ startIndex + run;
      -- The write cursor rests on the last slot written.
      data.pushPos ¬ slot + run - 1;
      BROADCAST data.pipeChange;
      ENDLOOP;
    };
  WITH self.streamData SELECT FROM
    pipe: DataHandle => copyRuns[pipe];
    ENDCASE => ERROR IO.Error[Failure, self];
  };
-- PushUnsafePutBlock pushes the bytes described by an IO.UnsafeBlock (base, startIndex, count)
-- onto the pipe.  It follows the same run-at-a-time scheme as PushPutBlock, but moves each run
-- with Basics.ByteBlt instead of a character loop.
-- Raises IO.Error[StreamClosed] as soon as either end of the pipe is found closed, and
-- IO.Error[Failure] if the stream data is not a pipe DataHandle.
PushUnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] = {
-- Index of the next source byte; advanced as runs are copied.
startIndex: NAT ¬ block.startIndex;
innerPutBlock: ENTRY PROC [data: DataHandle] = TRUSTED {
ENABLE UNWIND => NULL;
numChars: INTEGER;
-- Skips SIZE[TEXT[0]] past the start of the TEXT object; presumably this is the address of the
-- first character of the ring -- TODO confirm against the TEXT representation.
bufferBase: LONG POINTER ¬ LOOPHOLE[data.buffer, LONG POINTER] + SIZE[TEXT[0]];
WHILE startIndex < stopIndexPlusOne DO
new: NAT ¬ data.pushPos + 1;
IF data.pusherClosed OR data.pullerClosed THEN
RETURN WITH ERROR IO.Error[StreamClosed, self];
IF new = data.buffer.length THEN new ¬ 0;
-- Ring is full: wait for the puller, then re-evaluate everything from the top of the loop.
IF new = data.pullPos THEN {WAIT data.pipeChange; LOOP};
-- The destination run starts at new and stops at the end of the ring or just before the read
-- cursor.  The result is used below as the number of bytes actually moved.
numChars ¬ Basics.ByteBlt[
to: [bufferBase, new, IF new > data.pullPos THEN data.buffer.length ELSE data.pullPos],
from: [block.base, startIndex, stopIndexPlusOne]
];
-- The write cursor rests on the last slot written.
data.pushPos ¬ new+numChars-1;
startIndex ¬ startIndex+numChars;
BROADCAST data.pipeChange;
ENDLOOP;
};
-- One past the last source byte; AddNAT saturates instead of overflowing.
stopIndexPlusOne: NAT ¬ AddNAT[startIndex, block.count];
WITH self.streamData SELECT FROM
data: DataHandle => innerPutBlock[data];
ENDCASE => ERROR IO.Error[Failure, self];
};
PushFlush: PROC [self: STREAM] = {
  -- Blocks until the puller has consumed everything currently in the pipe.
  -- Raises IO.Error[StreamClosed] if the puller is found closed while characters remain, and
  -- IO.Error[Failure] if the stream data is not a pipe DataHandle.
  awaitDrain: ENTRY PROC [data: DataHandle] = {
    ENABLE UNWIND => NULL;
    UNTIL data.pushPos = data.pullPos DO
      IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self];
      WAIT data.pipeChange;
      ENDLOOP;
    };
  WITH self.streamData SELECT FROM
    pipe: DataHandle => awaitDrain[pipe];
    ENDCASE => ERROR IO.Error[Failure, self];
  };
PushReset: PROC [self: STREAM] = {
  -- Clears out the pipe: both cursors go back to slot 0, which leaves the pipe empty, and any
  -- waiting process is woken.
  -- Raises IO.Error[Failure] if the stream data is not a pipe DataHandle.
  rewind: ENTRY PROC [data: DataHandle] = INLINE {
    data.pullPos ¬ 0;
    data.pushPos ¬ 0;
    BROADCAST data.pipeChange;
    };
  WITH self.streamData SELECT FROM
    pipe: DataHandle => rewind[pipe];
    ENDCASE => ERROR IO.Error[Failure, self];
  };
PushClose: PROC [self: STREAM, abort: BOOL ¬ FALSE] = {
  -- Closes the pusher's (writer's) end of the pipe.  Closing an already-closed pusher does
  -- nothing.  Unless abort is TRUE, the call first waits until the pipe is empty or the puller
  -- has closed.
  -- Raises IO.Error[StreamClosed] if the puller has closed and characters are still in the pipe;
  -- in that case the pusher is not marked closed.
  -- Raises IO.Error[Failure] if the stream data is not a pipe DataHandle.
  shut: ENTRY PROC [data: DataHandle] = {
    ENABLE UNWIND => NULL;
    IF data.pusherClosed THEN RETURN;
    IF NOT abort THEN
      UNTIL data.pushPos = data.pullPos OR data.pullerClosed DO
        WAIT data.pipeChange;
        ENDLOOP;
    IF data.pushPos # data.pullPos AND data.pullerClosed THEN
      RETURN WITH ERROR IO.Error[StreamClosed, self];
    data.pusherClosed ¬ TRUE;
    -- Wake the puller so that it can observe end-of-stream.
    BROADCAST data.pipeChange;
    };
  WITH self.streamData SELECT FROM
    pipe: DataHandle => shut[pipe];
    ENDCASE => ERROR IO.Error[Failure, self];
  };
-- Procedure table for the writing (push) end of a pipe: an $output stream of class $Pipe whose
-- operations are the Push* procedures above.  CreatePipe passes it to IO.CreateStream.
pushProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
variety: $output, class: $Pipe,
putChar: PushPutChar,
putBlock: PushPutBlock,
unsafePutBlock: PushUnsafePutBlock,
flush: PushFlush,
reset: PushReset,
close: PushClose
];
-- Procedure table for the reading (pull) end of a pipe: an $input stream of class $Pipe whose
-- operations are the Pull* procedures above.  CreatePipe passes it to IO.CreateStream.
pullProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
variety: $input, class: $Pipe,
getChar: PullGetChar,
getBlock: PullGetBlock,
endOf: PullEndOf,
charsAvail: PullCharsAvail,
close: PullClose
];
END.
Carl Hauser, November 19, 1987 6:16:54 pm PST
Added PushUnsafePutBlock., IOPipeImpl