AbortablePipes.mesa
-- Last Edited by
-- Nix on September 13, 1983 4:58 pm
-- MBrown on January 13, 1984 2:21 pm
Last Edited by: Spreitzer, March 13, 1985 9:31:25 pm PST
DIRECTORY
IO USING [Error, StreamProcs, EndOfStream, CreateStream, CreateStreamProcs, STREAM],
IOClasses,
Process;
AbortablePipes: CEDAR MONITOR
IMPORTS IO, Process
EXPORTS IOClasses =
BEGIN
STREAM: TYPE = IO.STREAM;
Data: TYPE = RECORD [
buffer: REF TEXT,
Circular buffer that holds the characters in the middle of the pipe.
pullPos: NAT ← 0,
Position of the puller's (reader's) cursor into the buffer. The puller increments pullPos and then grabs that character.
pushPos: NAT ← 0,
Position of the pusher's (writer's) cursor into the buffer. The pusher increments pushPos and then deposits the character there. The pipe is empty if pullPos = pushPos.
pullerClosed: BOOLFALSE,
TRUE iff the puller (reader) has closed the pipe.
pusherClosed: BOOLFALSE,
TRUE iff the pusher (writer) has closed the pipe.
pipeChange: CONDITION
Condition raised if the state of the pipe has changed in any way.
];
DataHandle: TYPE = REF Data;
CreatePipe: PUBLIC ENTRY PROC [bufferSize: NAT] RETURNS [push, pull: STREAM] = {
Creates a pipe and returns two streams that each access the ends of the pipe. The push stream is used to add data to the pipe; the pull stream is used to remove it.
ENABLE UNWIND => NULL;
common: DataHandle ← NEW[Data ← [buffer: NEW[TEXT[bufferSize]]]];
common.buffer.length ← common.buffer.maxLength;
push ← IO.CreateStream[ pushProcs, common ];
pull ← IO.CreateStream[ pullProcs, common ];
TRUSTED {Process.EnableAborts[@common.pipeChange]};
};
PullGetChar: ENTRY PROC [self: STREAM] RETURNS [c: CHAR] = {
Returns the next character in the pipe, blocking until a character is available.
Raises IO.EndOfStream ERROR if the pusher has closed the pipe. Raises IO.StreamClosed ERROR if the stream has already been closed.
ENABLE UNWIND => NULL;
data: DataHandle ← NARROW[self.streamData];
DO
IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self];
IF data.pullPos # data.pushPos THEN EXIT;
IF data.pusherClosed THEN ERROR IO.EndOfStream[self];
WAIT data.pipeChange;
ENDLOOP;
data.pullPos ← data.pullPos + 1;
IF data.pullPos = data.buffer.length THEN
data.pullPos ← 0;
c ← data.buffer[data.pullPos];
BROADCAST data.pipeChange;
};
AddNAT: PROC [a, b: NAT] RETURNS [NAT] = INLINE {
RETURN [MIN[CARDINAL[a]+CARDINAL[b], NAT.LAST]];
};
PullGetBlock: ENTRY PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT]
RETURNS [nBytesRead: NAT] = {
Reads a block from the pipe, returning the number of characters actually read.
What should the approach be? For example, if there are 10 characters in the pipe, and the guy wants 50, do we give him the 10 or wait for the 50? I've decided to give him the 10.
ENABLE UNWIND => NULL;
data: DataHandle ← NARROW[self.streamData];
stopIndexPlusOne: NAT ← AddNAT[startIndex, count];
stopIndexPlusOne ← MIN[ stopIndexPlusOne, block.length ];
DO
IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self];
IF data.pullPos # data.pushPos THEN EXIT;
IF data.pusherClosed THEN ERROR IO.EndOfStream[self];
WAIT data.pipeChange;
ENDLOOP;
IF data.pullPos < data.pushPos THEN {
nBytesRead ← MIN[ data.pushPos - data.pullPos, stopIndexPlusOne - startIndex ];
FOR i: INT IN (data.pullPos..data.pullPos+nBytesRead] DO
block[startIndex] ← data.buffer[i];
startIndex ← startIndex + 1;
ENDLOOP;
data.pullPos ← data.pullPos + nBytesRead;
}
ELSE {
nBytesRead ← MIN[ data.buffer.maxLength - 1 - data.pullPos, stopIndexPlusOne - startIndex ];
FOR i: INT IN (data.pullPos..data.pullPos+nBytesRead] DO
block[startIndex] ← data.buffer[i];
startIndex ← startIndex + 1;
ENDLOOP;
data.pullPos ← data.pullPos + nBytesRead;
IF startIndex < stopIndexPlusOne THEN {
data.pullPos ← MIN[ data.pushPos, stopIndexPlusOne - startIndex - 1];
nBytesRead ← nBytesRead + data.pullPos + 1;
FOR i: INT IN [0..data.pullPos] DO
block[startIndex] ← data.buffer[i];
startIndex ← startIndex + 1;
ENDLOOP;
};
};
BROADCAST data.pipeChange;
};
PullCharsAvail: ENTRY PROC [self: STREAM, wait: BOOL] RETURNS [INT] = {
ENABLE UNWIND => NULL;
data: DataHandle ← NARROW[self.streamData];
diff: INT;
DO
IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self];
diff ← LONG[data.pushPos] - LONG[data.pullPos];
IF diff > 0 THEN RETURN [diff];
IF diff < 0 THEN RETURN [LONG[data.buffer.maxLength] + diff];
IF data.pusherClosed THEN RETURN [1];
IF NOT wait THEN RETURN [0];
WAIT data.pipeChange;
ENDLOOP;
};
PullEndOf: ENTRY PROC [self: STREAM] RETURNS [eof: BOOL] = {
Returns TRUE iff the pusher has closed his end of the stream.
ENABLE UNWIND => NULL;
data: DataHandle ← NARROW[self.streamData];
RETURN [data.pushPos = data.pullPos AND data.pusherClosed];
};
PullClose: ENTRY PROC [self: STREAM, abort: BOOL] = {
Closes the puller's end of the stream.
Raises ERROR IO.Error[StreamClosed ..] if there is something in the pipe (unless abort if true).
ENABLE UNWIND => NULL;
data: DataHandle ← NARROW[self.streamData];
data.pullerClosed ← TRUE;
IF data.pushPos # data.pullPos AND ~abort THEN ERROR IO.Error[StreamClosed, self];
BROADCAST data.pipeChange;
};
PushPutChar: ENTRY PROC [self: STREAM, char: CHAR] = {
Pushes a character onto the pipe, waiting until space is available in the buffer to hold it.
Raises ERROR IO.Error[StreamClosed..] if the pusher has already closed the stream. Raises the same error if the puller has closed the stream and broken the pipe.
ENABLE UNWIND => NULL;
data: DataHandle ← NARROW[self.streamData];
IF data.pusherClosed THEN ERROR IO.Error[StreamClosed, self];
IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self];
WHILE TRUE DO
data.pushPos ← data.pushPos + 1;
IF data.pushPos = data.buffer.length THEN data.pushPos ← 0;
IF data.pushPos # data.pullPos THEN EXIT;
IF data.pushPos = 0 THEN data.pushPos ← data.buffer.length;
data.pushPos ← data.pushPos - 1;
WAIT data.pipeChange;
ENDLOOP;
data.buffer[data.pushPos] ← char;
BROADCAST data.pipeChange;
};
PushPutBlock: ENTRY PROC[self: STREAM, block: REF READONLY TEXT, startIndex: NAT, count: NAT] = {
ENABLE UNWIND => NULL;
WaitForSpace: INTERNAL PROC [] = {
WHILE data.pushPos + 1 = data.pullPos OR (data.pullPos = 0 AND data.pushPos = data.buffer.length - 1) DO
WAIT data.pipeChange;
ENDLOOP;
};
data: DataHandle ← NARROW[self.streamData];
numChars: INT;
stopIndexPlusOne: NAT ← AddNAT[startIndex, count];
stopIndexPlusOne ← MIN[ stopIndexPlusOne, block.length ];
WHILE startIndex < stopIndexPlusOne DO
IF data.pusherClosed THEN ERROR IO.Error[StreamClosed, self];
IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self];
WaitForSpace[];
data.pushPos ← data.pushPos + 1;
IF data.pushPos = data.buffer.length THEN data.pushPos ← 0;
IF data.pushPos > data.pullPos THEN
numChars ← MIN[ data.buffer.length-data.pushPos, stopIndexPlusOne-startIndex ]
ELSE
numChars ← MIN[ data.pullPos-data.pushPos, stopIndexPlusOne-startIndex ];
FOR i: INT IN [data.pushPos..data.pushPos+numChars-1] DO
data.buffer[i] ← block[startIndex];
startIndex ← startIndex+1;
ENDLOOP;
data.pushPos ← data.pushPos+numChars-1;
ENDLOOP;
BROADCAST data.pipeChange;
};
PushFlush: ENTRY PROC [self: STREAM] = {
ENABLE UNWIND => NULL;
PushFlusher[self];
};
PushFlusher: INTERNAL PROC [self: STREAM] = {
Waits until the reader has read all of the characters that are currently in the pipe.
Raises IO.Error[StreamClosed..] if the puller closes the file while there's stuff in the pipe.
data: DataHandle ← NARROW[self.streamData];
WHILE data.pushPos # data.pullPos DO
IF data.pullerClosed THEN ERROR IO.Error[StreamClosed, self];
WAIT data.pipeChange;
ENDLOOP;
BROADCAST data.pipeChange;
};
PushReset: ENTRY PROC [self: STREAM] = {
Clears out the pipe.
ENABLE UNWIND => NULL;
data: DataHandle ← NARROW[self.streamData];
data.pushPos ← data.pullPos ← 0;
BROADCAST data.pipeChange;
};
PushClose: ENTRY PROC [self: STREAM, abort: BOOLFALSE] = {
Closes the pusher's end of the pipe. Flushes out the pipe iff ~abort.
Raises IO.Error[StreamClosed..] if the puller has closed the stream and there is still something in it.
ENABLE UNWIND => NULL;
data: DataHandle ← NARROW[self.streamData];
IF data.pullerClosed AND data.pushPos # data.pullPos THEN ERROR IO.Error[StreamClosed, self];
IF ~abort THEN PushFlusher[self];
data.pusherClosed ← TRUE;
BROADCAST data.pipeChange;
};
pushProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
variety: $output, class: $Pipe,
putChar: PushPutChar,
putBlock: PushPutBlock,
flush: PushFlush,
reset: PushReset,
close: PushClose
];
pullProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
variety: $input, class: $Pipe,
getChar: PullGetChar,
getBlock: PullGetBlock,
endOf: PullEndOf,
charsAvail: PullCharsAvail,
close: PullClose
];
END.
CHANGE LOG
Created by Nix on September 9, 1983 3:04 pm
Changed by MBrown on October 25, 1983 4:52 pm
Converted to Cedar 5.0; exports IOClasses.CreatePipe instead of Pipe.Open. Did minimal conversions of block operations; these should be reworked to use count instead of stopIndexPlusOne. Did substantial conversion of PullerCharsAvail. Bug in PullEndOf: returned true with stuff in buffer.
Edited on March 13, 1985 9:21:35 pm PST, by Spreitzer
Copied from [Indigo]<Cedar5.1>IO>IOPipeImpl.mesa!1 of January 13, 1984 2:22:51 pm PST.
Enabled aborts on conditions
changes to: DIRECTORY 
Edited on March 13, 1985 9:29:25 pm PST, by Spreitzer
general comments
changes to: