-- IOPipeImpl.mesa
-- Copyright © 1985, 1986, 1990, 1991 by Xerox Corporation. All rights reserved.
-- Russ Atkinson (RRA) January 22, 1986 8:50:55 pm PST
-- Mike Spreitzer February 27, 1987 1:32:36 pm PST
-- Carl Hauser, December 7, 1988 3:19:19 pm PST
-- JKF, September 12, 1990 3:06:01 pm PDT
DIRECTORY
Process USING [EnableAborts],
IO USING [Error, StreamProcs, EndOfStream, CreateStream, CreateStreamProcs, STREAM, UnsafeBlock],
IOClasses,
Basics USING [ByteBlt];
IOPipeImpl:
CEDAR
MONITOR
LOCKS data USING data: DataHandle
IMPORTS IO, Basics, Process
EXPORTS IOClasses = BEGIN
-- Local shorthand for the IO stream type used by every procedure in this module.
STREAM: TYPE = IO.STREAM;
-- State shared by the two ends of one pipe. The record is MONITORED, so the
-- ENTRY procedures below serialize all access to it.
Data: TYPE = MONITORED RECORD [
  buffer: REF TEXT,
    -- Circular buffer holding the characters currently in transit.
    -- The push procedures treat the pipe as full when advancing pushPos would
    -- make it equal to pullPos, so one slot always stays unused.
  pullPos: NAT ¬ 0,
    -- Reader's cursor. The puller first advances pullPos (wrapping to 0 at the
    -- end of the buffer) and then takes the character at the new position.
  pushPos: NAT ¬ 0,
    -- Writer's cursor. The pusher first advances pushPos (wrapping likewise)
    -- and then stores the character at the new position.
    -- The pipe is empty exactly when pullPos = pushPos.
  pullerClosed: BOOL ¬ FALSE,
    -- TRUE iff the puller (reader) has closed its end of the pipe.
  pusherClosed: BOOL ¬ FALSE,
    -- TRUE iff the pusher (writer) has closed its end of the pipe.
  pipeChange: CONDITION
    -- Broadcast whenever the state of the pipe has changed in any way.
  ];
-- Reference to the shared pipe state. The module's LOCKS clause
-- (LOCKS data USING data: DataHandle) names this type, which is why every
-- ENTRY procedure below takes a parameter called data of this type.
DataHandle: TYPE = REF Data;
CreatePipe: PUBLIC PROC [bufferSize: NAT] RETURNS [push, pull: STREAM] = {
  -- Builds a pipe and returns the two streams attached to its ends.
  -- push is the output stream used to add data to the pipe; pull is the input
  -- stream used to remove it. Both streams share a single Data record.
  -- The requested bufferSize is clamped to the range [2 .. NAT.LAST/2].
  ENABLE UNWIND => NULL;
  size: NAT ¬ MIN[MAX[bufferSize, 2], NAT.LAST/2];
  shared: DataHandle ¬ NEW[Data ¬ [buffer: NEW[TEXT[size]]]];
  -- The cursor arithmetic in the stream procs wraps at buffer.length, so the
  -- whole allocation is made addressable up front.
  shared.buffer.length ¬ size;
  push ¬ IO.CreateStream[pushProcs, shared];
  pull ¬ IO.CreateStream[pullProcs, shared];
  TRUSTED {Process.EnableAborts[@shared.pipeChange]};
  };
PullGetChar: PROC [self: STREAM] RETURNS [c: CHAR ¬ 0C] = {
  -- Stream-class getChar for the pull (reader) end: removes and returns the
  -- next character, blocking while the pipe is empty and the pusher is open.
  -- Raises IO.Error[StreamClosed, self] if the puller has closed its end.
  -- Raises IO.EndOfStream[self] if the pipe is empty and the pusher has closed.
  -- Raises IO.Error[Failure, self] if self.streamData is not a DataHandle.
  takeOne: ENTRY PROC [data: DataHandle] = {
    ENABLE UNWIND => NULL;
    DO
      -- The closed check comes first, so it applies even when data is available.
      IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self];
      IF data.pullPos = data.pushPos
        THEN {
          IF data.pusherClosed THEN RETURN WITH ERROR IO.EndOfStream[self];
          WAIT data.pipeChange;
          }
        ELSE EXIT;
      ENDLOOP;
    -- Advance the cursor with wrap-around, then take the character found there.
    data.pullPos ¬ IF data.pullPos + 1 = data.buffer.length THEN 0 ELSE data.pullPos + 1;
    c ¬ data.buffer[data.pullPos];
    -- A slot was freed; wake any pusher waiting for room.
    BROADCAST data.pipeChange;
    };
  WITH self.streamData SELECT FROM
    data: DataHandle => takeOne[data];
    ENDCASE => ERROR IO.Error[Failure, self];
  };
AddNAT: PROC [a, b: NAT] RETURNS [NAT] = INLINE {
  -- Saturating addition: returns a + b, or NAT.LAST if the sum would exceed it.
  -- The sum is formed in CARDINAL so the addition itself is done outside NAT.
  sum: CARDINAL ~ CARDINAL[a] + CARDINAL[b];
  RETURN [MIN[sum, NAT.LAST]];
  };
-- Stream-class getBlock for the pull (reader) end of a pipe.
-- Copies characters from the pipe into block, beginning at block[startIndex].
-- The request is clipped so that no index at or beyond block.maxLength is written.
-- Returns nBytesRead, the number of characters copied.
-- Raises IO.Error[StreamClosed, self] if the puller has closed its end, and
-- IO.Error[Failure, self] if self.streamData is not a DataHandle.
PullGetBlock:
PROC [self:
STREAM, block:
REF
TEXT, startIndex:
NAT, count:
NAT]
RETURNS [nBytesRead:
NAT ¬ 0] = {
-- Reads a block from the pipe, returning the number of characters actually read.
-- What should the approach be? For example, if there are 10 characters in the pipe, and the guy wants 50, do we give him the 10 or wait for the 50? IO.mesa and IODoc.Tioga clearly say to wait for the 50, so that's what we do.
-- The only short return is when the pipe is empty and the pusher has closed.
innerGetBlock:
ENTRY
PROC [data: DataHandle] = {
ENABLE UNWIND => NULL;
-- Working copy of the read cursor; written back to data.pullPos at the end of each pass.
pullPos: NAT ¬ data.pullPos;
-- delta (declared below) is the number of characters still wanted.
WHILE delta > 0
DO
-- Wait until at least one character is available, or give up.
DO
IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self];
IF pullPos # data.pushPos THEN EXIT;
-- Pipe is empty and nothing more will arrive: return what has been read so far.
IF data.pusherClosed THEN RETURN;
WAIT data.pipeChange;
-- Re-read the shared cursor after waiting (PullClose can change it).
pullPos ¬ data.pullPos;
ENDLOOP;
-- Phase 1: the available data wraps around; take the part up to the end of the buffer.
IF pullPos > data.pushPos
THEN {
-- The first run of bytes to get lie between pullPos and data.buffer.maxLength-1
nBytes: NAT ~ MIN[data.buffer.maxLength - 1 - pullPos, delta];
-- Half-open on the left: the character at pullPos itself has already been consumed.
FOR i:
NAT
IN (pullPos..pullPos+nBytes]
DO
block[startIndex] ¬ data.buffer[i];
startIndex ¬ startIndex + 1;
ENDLOOP;
pullPos ¬ pullPos + nBytes;
nBytesRead ¬ nBytesRead + nBytes;
delta ¬ delta - nBytes;
};
-- Phase 2: the cursor sits on the last slot; wrap it to 0 and take that character.
IF pullPos = data.buffer.maxLength - 1
AND delta > 0
THEN {
-- The first byte in the buffer should be read.
block[startIndex] ¬ data.buffer[pullPos ¬ 0];
startIndex ¬ startIndex + 1;
nBytesRead ¬ nBytesRead + 1;
delta ¬ delta - 1;
};
-- Phase 3: contiguous data ahead of the cursor, up to and including pushPos.
IF pullPos < data.pushPos
AND delta > 0
THEN {
-- There are bytes to read between pullPos+1 and data.pushPos
nBytes: NAT ~ MIN[data.pushPos - pullPos, delta];
FOR i:
NAT
IN (pullPos..pullPos+nBytes]
DO
block[startIndex] ¬ data.buffer[i];
startIndex ¬ startIndex + 1;
ENDLOOP;
pullPos ¬ pullPos + nBytes;
nBytesRead ¬ nBytesRead + nBytes;
delta ¬ delta - nBytes;
};
-- Publish the progress of this pass and wake any pusher waiting for room.
data.pullPos ¬ pullPos;
-- block.length is only updated here, i.e. after a pass that ran the copy phases.
block.length ¬ startIndex;
BROADCAST data.pipeChange;
ENDLOOP;
};
-- One past the last index of block that may be written: startIndex + count
-- (saturating), clipped to block.maxLength.
stopIndexPlusOne: NAT ¬ MIN[AddNAT[startIndex, count], block.maxLength];
-- Number of characters still to be read; 0 if startIndex is already at or past the limit.
delta: NAT ¬ IF stopIndexPlusOne > startIndex THEN stopIndexPlusOne - startIndex ELSE 0;
WITH self.streamData
SELECT
FROM
data: DataHandle => innerGetBlock[data];
ENDCASE => ERROR IO.Error[Failure, self];
};
PullCharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [diff: INT ¬ 0] = {
  -- Stream-class charsAvail for the pull (reader) end.
  -- Returns the number of characters currently in the pipe when that is nonzero.
  -- When the pipe is empty: returns 1 if the pusher has closed, returns 0 if
  -- wait is FALSE, and otherwise blocks until the pipe state changes.
  -- NOTE(review): the 1 for an empty, closed pipe presumably prompts the
  -- client to read and hit end of stream; confirm against the IO interface.
  -- Raises IO.Error[StreamClosed, self] if the puller has closed its end, and
  -- IO.Error[Failure, self] if self.streamData is not a DataHandle.
  countAvail: ENTRY PROC [data: DataHandle] = {
    ENABLE UNWIND => NULL;
    DO
      IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self];
      diff ¬ INT[data.pushPos] - INT[data.pullPos];
      IF diff # 0 THEN {
        -- A negative difference means the data wraps around the buffer end.
        IF diff < 0 THEN diff ¬ diff + data.buffer.maxLength;
        EXIT;
        };
      -- From here on the pipe is empty and diff = 0.
      IF data.pusherClosed THEN {diff ¬ 1; EXIT};
      IF ~wait THEN EXIT;
      WAIT data.pipeChange;
      ENDLOOP;
    };
  WITH self.streamData SELECT FROM
    data: DataHandle => countAvail[data];
    ENDCASE => ERROR IO.Error[Failure, self];
  };
PullEndOf: PROC [self: STREAM] RETURNS [eof: BOOL ¬ FALSE] = {
  -- Stream-class endOf for the pull (reader) end.
  -- Returns TRUE iff the pusher has closed its end and the pipe is empty, i.e.
  -- no further character can ever be read.
  -- Raises IO.Error[Failure, self] if self.streamData is not a DataHandle.
  checkEnd: ENTRY PROC [data: DataHandle] = {
    ENABLE UNWIND => NULL;
    eof ¬ data.pusherClosed AND data.pullPos = data.pushPos;
    };
  WITH self.streamData SELECT FROM
    data: DataHandle => checkEnd[data];
    ENDCASE => ERROR IO.Error[Failure, self];
  };
PullClose: PROC [self: STREAM, abort: BOOL] = {
  -- Stream-class close for the pull (reader) end of the pipe.
  -- Marks the puller as closed. If abort is TRUE, or the pipe is empty, any
  -- remaining contents are discarded and the procedure returns normally.
  -- Raises IO.Error[StreamClosed, self] if there is something in the pipe and
  -- abort is FALSE; the puller is still marked closed in that case.
  -- Raises IO.Error[Failure, self] if self.streamData is not a DataHandle.
  innerClose: ENTRY PROC [data: DataHandle] = {
    ENABLE UNWIND => NULL;
    data.pullerClosed ¬ TRUE;
    IF ~abort AND data.pushPos # data.pullPos THEN {
      -- pullerClosed has just changed, so waiters must be told before the
      -- error is raised. Otherwise a pusher blocked in WAIT (waiting for room,
      -- flushing, or closing) would never notice that the pipe is broken.
      BROADCAST data.pipeChange;
      RETURN WITH ERROR IO.Error[StreamClosed, self];
      };
    -- Empty the pipe, to avoid problems if/when the pusher closes.
    data.pullPos ¬ data.pushPos;
    BROADCAST data.pipeChange;
    };
  WITH self.streamData SELECT FROM
    data: DataHandle => innerClose[data];
    ENDCASE => ERROR IO.Error[Failure, self];
  };
PushPutChar: PROC [self: STREAM, char: CHAR] = {
  -- Stream-class putChar for the push (writer) end: appends one character to
  -- the pipe, waiting until there is room in the buffer to hold it.
  -- Raises IO.Error[StreamClosed, self] if the pusher has already closed the
  -- stream, or if the puller has closed the stream and broken the pipe. This
  -- includes the case where either end is closed while this call is waiting.
  -- Raises IO.Error[Failure, self] if self.streamData is not a DataHandle.
  innerPutChar: ENTRY PROC [data: DataHandle] = {
    ENABLE UNWIND => NULL;
    DO
      new: NAT ¬ data.pushPos + 1;
      -- Checked on every pass, as PushPutBlock does: the monitor is released
      -- during WAIT, so either end may have been closed in the meantime.
      -- Checking only once before the loop let a woken pusher deposit a
      -- character into a pipe whose reader was already gone.
      IF data.pusherClosed OR data.pullerClosed THEN
        RETURN WITH ERROR IO.Error[StreamClosed, self];
      IF new = data.buffer.length THEN new ¬ 0;
      IF new # data.pullPos THEN {
        -- There is room to place the character.
        data.pushPos ¬ new;
        data.buffer[data.pushPos] ¬ char;
        BROADCAST data.pipeChange;
        RETURN;
        };
      -- Buffer full: wait for the puller to consume something or for a close.
      WAIT data.pipeChange;
      ENDLOOP;
    };
  WITH self.streamData SELECT FROM
    data: DataHandle => innerPutChar[data];
    ENDCASE => ERROR IO.Error[Failure, self];
  };
PushPutBlock: PROC [self: STREAM, block: REF READONLY TEXT, startIndex, count: NAT] = {
  -- Stream-class putBlock for the push (writer) end.
  -- Appends block[startIndex .. limit) to the pipe, where limit is
  -- startIndex + count (saturating) clipped to block.length. Waits whenever
  -- the buffer is full, and copies in contiguous runs as room becomes available.
  -- Raises IO.Error[StreamClosed, self] if either end is found closed at the
  -- start of a pass, and IO.Error[Failure, self] if self.streamData is not a
  -- DataHandle.
  limit: NAT ~ MIN[AddNAT[startIndex, count], block.length];
  copyRuns: ENTRY PROC [data: DataHandle] = {
    ENABLE UNWIND => NULL;
    run: INTEGER;
    WHILE startIndex < limit DO
      slot: NAT ¬ data.pushPos + 1;
      room: NAT;
      IF data.pusherClosed OR data.pullerClosed THEN
        RETURN WITH ERROR IO.Error[StreamClosed, self];
      IF slot = data.buffer.length THEN slot ¬ 0;
      -- Advancing onto pullPos would make a full pipe look empty, so wait.
      IF slot = data.pullPos THEN {WAIT data.pipeChange; LOOP};
      -- Free contiguous space starting at slot: up to the end of the buffer
      -- when the reader is behind us, otherwise up to just before the reader.
      room ¬ IF slot > data.pullPos
        THEN data.buffer.length - slot
        ELSE data.pullPos - slot;
      run ¬ MIN[limit - startIndex, room];
      FOR k: NAT IN [0..run) DO
        data.buffer[slot + k] ¬ block[startIndex + k];
        ENDLOOP;
      startIndex ¬ startIndex + run;
      -- pushPos names the last slot written.
      data.pushPos ¬ slot + run - 1;
      BROADCAST data.pipeChange;
      ENDLOOP;
    };
  WITH self.streamData SELECT FROM
    data: DataHandle => copyRuns[data];
    ENDCASE => ERROR IO.Error[Failure, self];
  };
-- Stream-class unsafePutBlock for the push (writer) end of a pipe.
-- Appends the bytes described by block (base, startIndex, count) to the pipe,
-- waiting whenever the buffer is full. It follows the same loop structure as
-- PushPutBlock but moves each run with Basics.ByteBlt instead of a FOR loop.
-- Raises IO.Error[StreamClosed, self] if either end is found closed at the
-- start of a pass, and IO.Error[Failure, self] if self.streamData is not a DataHandle.
PushUnsafePutBlock:
PROC [self:
STREAM, block: IO.UnsafeBlock] = {
-- Local, mutable copy of the starting index; advanced as bytes are pushed.
startIndex: NAT ¬ block.startIndex;
innerPutBlock:
ENTRY
PROC [data: DataHandle] =
TRUSTED {
ENABLE UNWIND => NULL;
numChars: INTEGER;
-- NOTE(review): presumably the address of the first character of data.buffer,
-- obtained by skipping the header of the TEXT record; confirm the TEXT layout.
bufferBase: LONG POINTER ¬ LOOPHOLE[data.buffer, LONG POINTER] + SIZE[TEXT[0]];
WHILE startIndex < stopIndexPlusOne
DO
new: NAT ¬ data.pushPos + 1;
IF data.pusherClosed
OR data.pullerClosed
THEN
RETURN WITH ERROR IO.Error[StreamClosed, self];
IF new = data.buffer.length THEN new ¬ 0;
-- Buffer full: wait for a change in the pipe and start the pass over.
IF new = data.pullPos THEN {WAIT data.pipeChange; LOOP};
-- The destination runs from new up to (not including) either the end of the
-- buffer or the reader's cursor, whichever comes first.
-- NOTE(review): ByteBlt is assumed to copy as many bytes as fit in both
-- ranges and to return that number; confirm against the Basics interface.
numChars ¬ Basics.ByteBlt[
to: [bufferBase, new, IF new > data.pullPos THEN data.buffer.length ELSE data.pullPos],
from: [block.base, startIndex, stopIndexPlusOne]
];
-- pushPos names the last slot written.
data.pushPos ¬ new+numChars-1;
startIndex ¬ startIndex+numChars;
BROADCAST data.pipeChange;
ENDLOOP;
};
-- One past the last source index: startIndex + block.count (saturating).
-- Unlike PushPutBlock there is no clipping against a length here.
stopIndexPlusOne: NAT ¬ AddNAT[startIndex, block.count];
WITH self.streamData
SELECT
FROM
data: DataHandle => innerPutBlock[data];
ENDCASE => ERROR IO.Error[Failure, self];
};
PushFlush: PROC [self: STREAM] = {
  -- Stream-class flush for the push (writer) end.
  -- Blocks until the puller has drained the pipe (pushPos = pullPos).
  -- Raises IO.Error[StreamClosed, self] if the puller is closed while data is
  -- still in the pipe, and IO.Error[Failure, self] if self.streamData is not
  -- a DataHandle.
  drain: ENTRY PROC [data: DataHandle] = {
    ENABLE UNWIND => NULL;
    UNTIL data.pushPos = data.pullPos DO
      IF data.pullerClosed THEN RETURN WITH ERROR IO.Error[StreamClosed, self];
      WAIT data.pipeChange;
      ENDLOOP;
    };
  WITH self.streamData SELECT FROM
    data: DataHandle => drain[data];
    ENDCASE => ERROR IO.Error[Failure, self];
  };
PushReset: PROC [self: STREAM] = {
  -- Stream-class reset for the push (writer) end.
  -- Clears out the pipe by setting both cursors back to 0, which discards
  -- anything not yet read, then notifies all waiters.
  -- Raises IO.Error[Failure, self] if self.streamData is not a DataHandle.
  clear: ENTRY PROC [data: DataHandle] = INLINE {
    data.pullPos ¬ 0;
    data.pushPos ¬ 0;
    BROADCAST data.pipeChange;
    };
  WITH self.streamData SELECT FROM
    data: DataHandle => clear[data];
    ENDCASE => ERROR IO.Error[Failure, self];
  };
PushClose: PROC [self: STREAM, abort: BOOL ¬ FALSE] = {
  -- Stream-class close for the push (writer) end of the pipe.
  -- Unless abort is TRUE, first waits for the puller to drain the pipe (or to
  -- close its end). Closing a pusher that is already closed is a no-op.
  -- Raises IO.Error[StreamClosed, self] if the puller has closed the stream
  -- and there is still something in it; the pusher is not marked closed then.
  -- Raises IO.Error[Failure, self] if self.streamData is not a DataHandle.
  shut: ENTRY PROC [data: DataHandle] = {
    ENABLE UNWIND => NULL;
    IF data.pusherClosed THEN RETURN;
    IF NOT abort THEN {
      UNTIL data.pushPos = data.pullPos OR data.pullerClosed DO
        WAIT data.pipeChange;
        ENDLOOP;
      };
    IF data.pullerClosed AND data.pushPos # data.pullPos THEN
      RETURN WITH ERROR IO.Error[StreamClosed, self];
    data.pusherClosed ¬ TRUE;
    -- Wake a puller that may be waiting for data that will now never arrive.
    BROADCAST data.pipeChange;
    };
  WITH self.streamData SELECT FROM
    data: DataHandle => shut[data];
    ENDCASE => ERROR IO.Error[Failure, self];
  };
-- Procedure table for the push (writer) end of a pipe: an output stream of
-- class $Pipe. CreatePipe passes it to IO.CreateStream for the push stream.
pushProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
  variety: $output,
  class: $Pipe,
  putChar: PushPutChar,
  putBlock: PushPutBlock,
  unsafePutBlock: PushUnsafePutBlock,
  flush: PushFlush,
  reset: PushReset,
  close: PushClose
  ];
-- Procedure table for the pull (reader) end of a pipe: an input stream of
-- class $Pipe. CreatePipe passes it to IO.CreateStream for the pull stream.
pullProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
  variety: $input,
  class: $Pipe,
  getChar: PullGetChar,
  getBlock: PullGetBlock,
  endOf: PullEndOf,
  charsAvail: PullCharsAvail,
  close: PullClose
  ];
END.
Carl Hauser, November 19, 1987 6:16:54 pm PST
Added PushUnsafePutBlock., IOPipeImpl