-- PupByteStreams.mesa - STREAM layer on top of Pup Packet Streams
-- Copyright © 1985 by Xerox Corporation. All rights reserved.
-- Andrew Birrell August 25, 1983 3:26 pm
-- MBrown September 17, 1983 8:11 pm
-- Russ Atkinson (RRA) September 6, 1985 2:23:15 pm PDT
DIRECTORY
IO USING [CreateStream, CreateStreamProcs, EndOfStream, STREAM, StreamProcs, UnsafeBlock, UnsafeGetBlock, UnsafePutBlock ],
PrincOps USING [ByteBltBlock],
PrincOpsUtils USING [ByteBlt],
PupPktDefs USING [Get, GetSenderSizeLimit, PktsAvailable, PupPktStream, PupPktStreamAbort, PupPktStreamDestroy, PupPktStreamMake, Put, PutMark, SendAttention, WaitForAttention],
PupStream USING [PupOpenMode],
PupDefs USING [GetFreePupBuffer, ReturnFreePupBuffer, GetPupContentsBytes, SetPupContentsBytes, Pair, PupAddress, PupBuffer, PupSocketID, Tocks],
PupTypes USING [fillInSocketID],
Rope USING [ROPE];
PupByteStreams:
CEDAR MONITOR
IMPORTS IO, PrincOpsUtils, PupPktDefs, PupDefs
EXPORTS PupStream = {
-- Local shorthand for the IO stream type.
STREAM: TYPE = IO.STREAM;

-- Per-stream state; PupByteStreamMake stores one of these as the streamData of each STREAM it creates.
BSPData: TYPE = REF BSPDataObject;
BSPDataObject: TYPE = MONITORED RECORD [
  pktStream: PupPktDefs.PupPktStream, -- underlying packet stream; Close sets it to NIL
  inputBuffer: PupDefs.PupBuffer ← NIL, -- if we have characters to consume
  markBuffer: PupDefs.PupBuffer ← NIL, -- if we have a mark to consume
  inputFinger: INT ← 0, -- index of next byte to take from inputBuffer
  inputBufferSize: INT ← 0, -- content byte count of inputBuffer, set when a data packet arrives
  outputBuffer: PupDefs.PupBuffer ← NIL, -- partially filled output buffer
  outputFinger: INT ← 0, -- index of next byte to store in outputBuffer
  outputBufferSize: INT ← 0 -- negotiated for each stream; 0 until the first UnsafePutBlock asks the packet stream
  ];
-- The stream-class procedure record shared by every stream this module creates.
-- Each keyword binds an IO stream operation to the procedure of the same purpose defined in this module.
MyStreamProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
  variety: $inputOutput, class: $Pup,
  getChar: GetChar, endOf: EndOf, charsAvail: CharsAvail, unsafeGetBlock: UnsafeGetBlock,
  putChar: PutChar, unsafePutBlock: UnsafePutBlock, flush: Flush,
  close: Close
  ];
-- PupByteStreamCreate: convenience form of PupByteStreamMake.
-- Supplies PupTypes.fillInSocketID as the local socket, sendRfc as the open mode and [0, 0] as the id.
PupByteStreamCreate: PUBLIC PROC [remote: PupDefs.PupAddress, ticks: PupDefs.Tocks] RETURNS [STREAM] = {
  RETURN [PupByteStreamMake[
    local: PupTypes.fillInSocketID,
    remote: remote,
    ticks: ticks,
    mode: sendRfc,
    id: [0, 0]]]
  };
-- PupByteStreamMake: builds a byte STREAM on top of a freshly made Pup packet stream.
-- All five arguments are passed unchanged to PupPktDefs.PupPktStreamMake; the resulting packet stream
-- is wrapped in a new BSPDataObject (every other field takes its declared default) and becomes the
-- streamData of an IO stream that uses MyStreamProcs.
PupByteStreamMake: PUBLIC PROC [local: PupDefs.PupSocketID, remote: PupDefs.PupAddress, ticks: PupDefs.Tocks, mode: PupStream.PupOpenMode, id: PupDefs.Pair] RETURNS [STREAM] = TRUSTED {
  pkts: PupPktDefs.PupPktStream = PupPktDefs.PupPktStreamMake[local, remote, ticks, mode, id];
  state: BSPData = NEW[BSPDataObject ← [pktStream: pkts]];
  RETURN [IO.CreateStream[streamProcs: MyStreamProcs, streamData: state]]
  };
-- PupByteStreamAbort: forwards an abort request, with its explanatory text, to the underlying packet stream.
-- No buffer held in the byte-stream state is touched here.
PupByteStreamAbort: PUBLIC PROC [self: STREAM, text: Rope.ROPE] = TRUSTED {
  state: BSPData = NARROW[self.streamData];
  PupPktDefs.PupPktStreamAbort[state.pktStream, text];
  };
-- Close: stream-class close procedure.
-- Destroys the packet stream (if still present) and hands any held input, mark and output buffers back
-- to the free pool, clearing each field so that a second Close finds nothing left to release.
-- NOTE(review): abort is never consulted, and a partially filled output buffer is returned to the pool
-- rather than transmitted. Presumably callers are expected to Flush first; confirm.
Close: PROC [self: STREAM, abort: BOOL ← FALSE] = TRUSTED {
  state: BSPData = NARROW[self.streamData];
  -- Gives one buffer back to the pool; a NIL buffer is ignored.
  Release: PROC [buffer: PupDefs.PupBuffer] = TRUSTED {
    IF buffer # NIL THEN PupDefs.ReturnFreePupBuffer[buffer];
    };
  IF state.pktStream # NIL THEN {
    PupPktDefs.PupPktStreamDestroy[state.pktStream];
    state.pktStream ← NIL;
    };
  Release[state.inputBuffer]; state.inputBuffer ← NIL;
  Release[state.markBuffer]; state.markBuffer ← NIL;
  Release[state.outputBuffer]; state.outputBuffer ← NIL;
  };
-- GetChar: stream-class getChar procedure; returns the next data byte.
-- Fast path: takes the byte straight from the current input buffer, but never the last one.
-- Slow path: reads one byte through UnsafeGetBlock, which refills and releases input buffers.
-- Raises IO.EndOfStream[self] when the block read delivers no byte (UnsafeGetBlock returns early once a mark is pending).
GetChar: PROC [self: STREAM] RETURNS [CHAR] = TRUSTED {
  data: BSPData = NARROW[self.streamData];
  IF data.inputBuffer # NIL AND data.inputFinger+1 < data.inputBufferSize
    THEN {
      -- The "+1" is to prevent us taking the last byte, so that GetBlock can free the buffer
      data.inputFinger ← data.inputFinger+1;
      RETURN [data.inputBuffer.pupChars[data.inputFinger-1]] }
    ELSE {
      -- One-byte transfer via the block interface; the local array is the destination.
      buff: PACKED ARRAY [0..1] OF CHAR;
      n: INT = self.UnsafeGetBlock[[base: LOOPHOLE[LONG[@buff]], startIndex:0, count:1]];
      IF n = 0 THEN ERROR IO.EndOfStream[self];
      RETURN [buff[0]]
      }
  };
-- EndOf: stream-class endOf procedure.
-- Polls for input without waiting, then reports TRUE exactly when a mark packet is pending;
-- unread data in the input buffer, or no input at all, yields FALSE.
EndOf: PROC [self: STREAM] RETURNS [BOOL] = {
  state: BSPData = NARROW[self.streamData];
  GetInputBuffer[data: state, start: 0, dontWait: TRUE];
  RETURN [state.markBuffer # NIL]
  };
-- CharsAvail: stream-class charsAvail procedure.
-- Returns the number of unread bytes in the current input buffer, 1 if a mark is pending instead, else 0.
CharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [INT] = {
  -- wait = FALSE => return # of chars we can guarantee to return without waiting
  -- wait = TRUE => return # of chars we can guarantee to return without waiting,
  --   and wait until we can return > 0 (note: if EndOf[self] then return 1)
  data: BSPData = NARROW[self.streamData];
  -- dontWait is the negation of wait: only a waiting caller may block inside the packet stream.
  GetInputBuffer[data, 0, NOT wait];
  SELECT TRUE FROM
    data.inputBuffer # NIL => RETURN [data.inputBufferSize-data.inputFinger];
    data.markBuffer # NIL => RETURN [1];
    ENDCASE => RETURN [0];
  };
-- TimeOut: raised by GetInputBuffer when the packet stream's Get returns NIL.
-- nextIndex carries the start value given to GetInputBuffer; UnsafeGetBlock passes its current block.startIndex.
-- It is a SIGNAL, so a catcher may resume it; GetInputBuffer then goes round its loop and calls Get again.
TimeOut: PUBLIC SAFE SIGNAL[nextIndex: INT] = CODE;
-- UnsafeGetBlock: stream-class unsafeGetBlock procedure.
-- Copies up to block.count bytes into block, starting at block.startIndex, pulling successive data
-- packets from the packet stream as needed. A NIL block.base discards the bytes instead of copying them.
-- Returns the number of bytes consumed; the count is short when a mark is reached, because the
-- procedure returns as soon as a mark is pending.
-- TimeOut, raised inside GetInputBuffer, propagates to the caller with the current block.startIndex.
UnsafeGetBlock: UNSAFE PROC [self: STREAM, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT ← 0] = TRUSTED {
  data: BSPData = NARROW[self.streamData];
  stopIndexPlusOne: INT = block.startIndex + block.count;
  WHILE block.startIndex < stopIndexPlusOne DO
    -- Waits (dontWait = FALSE) until there is unread data or a pending mark.
    GetInputBuffer[data, block.startIndex, FALSE];
    SELECT TRUE FROM
      data.markBuffer # NIL => RETURN;
      ENDCASE => {
        -- transfer bytes from the input buffer
        nBytes: INT =
          IF block.base = NIL
            -- means "discard data" --
            THEN MIN[stopIndexPlusOne-block.startIndex, data.inputBufferSize-data.inputFinger]
            ELSE PrincOpsUtils.ByteBlt[
              to: [blockPointer: block.base, startIndex: block.startIndex, stopIndexPlusOne: stopIndexPlusOne],
              from: [blockPointer: @data.inputBuffer.pupChars, startIndex: data.inputFinger, stopIndexPlusOne: data.inputBufferSize] ];
        block.startIndex ← block.startIndex + nBytes;
        nBytesRead ← nBytesRead + nBytes;
        data.inputFinger ← data.inputFinger + nBytes;
        };
    -- Release the packet as soon as its last byte has been consumed.
    IF data.inputFinger = data.inputBufferSize
      THEN {
        PupDefs.ReturnFreePupBuffer[data.inputBuffer];
        data.inputBuffer ← NIL;
        };
    ENDLOOP;
  };
-- GetInputBuffer: brings the input side of the stream state up to date.
-- data is the stream state, start is the index reported by TimeOut, and dontWait = TRUE forbids
-- calling Get unless PktsAvailable says a packet is already there.
GetInputBuffer: PROC [data: BSPData, start: INT ← 0, dontWait: BOOL ← TRUE] = TRUSTED {
  -- Ensures
  --   (data.inputBuffer # NIL AND data.inputFinger # data.inputBufferSize)
  --   OR data.markBuffer # NIL
  --   OR (dontWait AND NOT PupPktDefs.PktsAvailable[data.pktStream])
  DO
    SELECT TRUE FROM
      data.inputBuffer # NIL => {
        IF data.inputFinger # data.inputBufferSize THEN RETURN;
        -- The current packet is exhausted: release it and look for another.
        PupDefs.ReturnFreePupBuffer[data.inputBuffer];
        data.inputBuffer ← NIL;
        };
      data.markBuffer # NIL => RETURN;
      ENDCASE;
    IF dontWait AND NOT PupPktDefs.PktsAvailable[data.pktStream] THEN RETURN;
    data.inputBuffer ← data.pktStream.Get[];
    SELECT TRUE FROM
      data.inputBuffer = NIL =>
        SIGNAL TimeOut[start];
        -- stream will not deliver data within acceptable time
      data.inputBuffer.pupType = mark, data.inputBuffer.pupType = aMark => {
        -- A mark has arrived, so EndOf[stream] = TRUE
        data.markBuffer ← data.inputBuffer;
        data.inputBuffer ← NIL;
        };
      ENDCASE => {
        -- A data buffer has arrived, so setup the input finger and limit
        data.inputFinger ← 0;
        data.inputBufferSize ← PupDefs.GetPupContentsBytes[data.inputBuffer];
        };
    ENDLOOP;
  };
-- NoMarkAvailable: public error declared here; no procedure in the visible source raises it.
NoMarkAvailable: PUBLIC ERROR = CODE;
-- ConsumeMark: skips forward to the next mark, throwing away any data that precedes it,
-- then removes the mark from the stream and returns the first byte of the mark packet.
ConsumeMark: PUBLIC PROC [self: STREAM] RETURNS [mark: [0..256)] = TRUSTED {
  state: BSPData = NARROW[self.streamData];
  -- A NIL base asks UnsafeGetBlock to drop bytes rather than copy them; it returns once a mark is pending.
  UNTIL state.markBuffer # NIL DO
    [] ← self.UnsafeGetBlock[[base: NIL, startIndex: 0, count: LAST[INT]]];
    ENDLOOP;
  mark ← state.markBuffer.pupBytes[0];
  PupDefs.ReturnFreePupBuffer[state.markBuffer];
  state.markBuffer ← NIL;
  };
-- WaitAttention: delegates to WaitForAttention on the underlying packet stream.
WaitAttention: PUBLIC PROC [self: STREAM] = TRUSTED {
  state: BSPData = NARROW[self.streamData];
  state.pktStream.WaitForAttention[];
  };
-- PutChar: stream-class putChar procedure; appends one byte to the output.
-- Fast path: stores the byte directly into the current output buffer, but never into its last slot.
-- Slow path: writes one byte through UnsafePutBlock, which allocates buffers and transmits full ones.
PutChar: SAFE PROC [self: STREAM, char: CHAR] = TRUSTED {
  data: BSPData = NARROW[self.streamData];
  IF data.outputBuffer # NIL AND data.outputFinger+1 < data.outputBufferSize
    THEN {
      -- The "+1" is to prevent us storing the last byte, so that PutBlock can send the buffer
      data.outputBuffer.pupChars[data.outputFinger] ← char;
      data.outputFinger ← data.outputFinger+1;
      }
    ELSE {
      -- One-byte transfer via the block interface; the local array is the source.
      buff: PACKED ARRAY [0..1] OF CHAR;
      buff[0] ← char;
      self.UnsafePutBlock[[base: LOOPHOLE[LONG[@buff]], startIndex:0, count:1]];
      }
  };
-- UnsafePutBlock: stream-class unsafePutBlock procedure.
-- Moves block.count bytes, starting at block.startIndex, into output packets and hands each packet
-- to Transmit the moment it is full. A partially filled packet stays in outputBuffer for later calls.
-- The packet capacity is fetched from the packet stream on first use, while outputBufferSize is still 0.
UnsafePutBlock: --UN-- SAFE PROC [self: STREAM, block: IO.UnsafeBlock] = TRUSTED {
  state: BSPData = NARROW[self.streamData];
  limit: INT = block.startIndex + block.count;
  IF state.outputBufferSize = 0 THEN state.outputBufferSize ← state.pktStream.GetSenderSizeLimit[];
  UNTIL block.startIndex >= limit DO
    IF state.outputBuffer = NIL THEN {
      -- Start a fresh packet.
      state.outputBuffer ← PupDefs.GetFreePupBuffer[];
      state.outputFinger ← 0;
      };
    {
    -- With a NIL base nothing is copied ("ignore data"); the fingers still advance by what would have fit.
    moved: INT =
      IF block.base # NIL
        THEN PrincOpsUtils.ByteBlt[
          from: [blockPointer: block.base, startIndex: block.startIndex, stopIndexPlusOne: limit],
          to: [blockPointer: @state.outputBuffer.pupChars, startIndex: state.outputFinger, stopIndexPlusOne: state.outputBufferSize] ]
        ELSE MIN[limit-block.startIndex, state.outputBufferSize-state.outputFinger];
    block.startIndex ← block.startIndex + moved;
    state.outputFinger ← state.outputFinger + moved;
    };
    IF state.outputFinger = state.outputBufferSize THEN Transmit[state];
    ENDLOOP;
  };
-- Flush: stream-class flush procedure.
-- Sends whatever output is buffered as a packet of type aData; an empty packet is sent when nothing is buffered.
Flush: PROC [self: STREAM] = TRUSTED {
  data: BSPData = NARROW[self.streamData];
  -- Even if we have nothing to send, transmit a buffer to provoke an ack from the other end
  IF data.outputBuffer = NIL THEN {
    data.outputBuffer ← PupDefs.GetFreePupBuffer[];
    data.outputFinger ← 0 };
  data.outputBuffer.pupType ← aData;
  Transmit[data];
  };
-- Transmit: hands the current output buffer, if any, to the packet stream.
-- The buffer's content length is set to outputFinger before it is sent.
Transmit: PROC [data: BSPData] = TRUSTED {
  pending: PupDefs.PupBuffer = data.outputBuffer;
  data.outputBuffer ← NIL; -- don't leave it dangling in case of StreamClosing
  IF pending = NIL THEN RETURN;
  PupDefs.SetPupContentsBytes[pending, data.outputFinger];
  data.pktStream.Put[pending];
  };
-- SendMark: sends a mark with the given value on the packet stream.
-- Any buffered output is transmitted first, so the mark follows the data written before it.
SendMark: PUBLIC PROC [self: STREAM, mark: [0..256)] = TRUSTED {
  state: BSPData = NARROW[self.streamData];
  IF state.outputBuffer # NIL THEN Transmit[state];
  state.pktStream.PutMark[mark];
  };
-- SendAttention: delegates to SendAttention on the underlying packet stream.
-- Buffered output is left as it is.
SendAttention: PUBLIC PROC [self: STREAM] = TRUSTED {
  state: BSPData = NARROW[self.streamData];
  state.pktStream.SendAttention[];
  };
}.