-- Cedar Communication: IO.STREAM layer on top of Pup Packet Streams
-- PupByteStreams.mesa
-- Andrew Birrell August 25, 1983 3:26 pm
-- MBrown September 17, 1983 8:11 pm
DIRECTORY
IO USING[ CreateStream, CreateStreamProcs, EndOfStream, STREAM, StreamProcs, UnsafeBlock, UnsafeGetBlock, UnsafePutBlock ],
PrincOps USING [ByteBltBlock],
PrincOpsUtils USING [ByteBlt],
PupPktDefs USING [Get, GetSenderSizeLimit, PktsAvailable, PupPktStream, PupPktStreamAbort, PupPktStreamDestroy, PupPktStreamMake, Put, PutMark, SendAttention, WaitForAttention],
PupStream USING [PupOpenMode],
PupDefs USING [GetFreePupBuffer, ReturnFreePupBuffer, GetPupContentsBytes, SetPupContentsBytes, Pair, PupAddress, PupBuffer, PupSocketID, Tocks],
PupTypes USING [fillInSocketID],
Rope USING [ROPE];
PupByteStreams:
MONITOR
IMPORTS IO, PrincOpsUtils, PupPktDefs, PupDefs
EXPORTS PupStream =
BEGIN
-- Per-stream state for a Pup byte stream. An instance is allocated by PupByteStreamMake
-- and stored as the streamData of the IO.STREAM; every stream procedure in this module
-- recovers it with NARROW[self.streamData].
-- NOTE(review): the record is MONITORED, but no procedure in this module is declared
-- ENTRY, so nothing visible here acquires the lock. Confirm whether callers are expected
-- to serialize access to a stream themselves.
BSPData: TYPE = REF BSPDataObject;
BSPDataObject:
TYPE =
MONITORED
RECORD[
-- underlying packet stream; set to NIL by Close
pktStream: PupPktDefs.PupPktStream,
inputBuffer: PupDefs.PupBuffer ← NIL, -- if we have characters to consume
-- while markBuffer is non-NIL, EndOf returns TRUE and UnsafeGetBlock delivers no more bytes
markBuffer: PupDefs.PupBuffer ← NIL, -- if we have a mark to consume
inputFinger: INT ← 0, -- index of next byte to take from inputBuffer
-- number of content bytes in inputBuffer, set from GetPupContentsBytes when a packet arrives
inputBufferSize: INT ← 0,
outputBuffer: PupDefs.PupBuffer ← NIL, -- partially filled output buffer
outputFinger: INT ← 0, -- index of next byte to store in outputBuffer
-- 0 means not yet known; UnsafePutBlock fills it in from GetSenderSizeLimit on first use
outputBufferSize: INT ← 0 -- negotiated for each stream
];
-- Procedure table shared by every Pup byte stream created in this module.
-- Operations not listed here are left to whatever IO.CreateStreamProcs supplies by default.
MyStreamProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
  variety: $inputOutput,
  class: $Pup,
  -- input side
  getChar: GetChar,
  unsafeGetBlock: UnsafeGetBlock,
  endOf: EndOf,
  charsAvail: CharsAvail,
  -- output side
  putChar: PutChar,
  unsafePutBlock: UnsafePutBlock,
  flush: Flush,
  -- lifetime
  close: Close
  ];
-- Convenience form of PupByteStreamMake: opens a byte stream to "remote" using the
-- fill-in local socket, the sendRfc open mode and a connection id of [0, 0].
-- "ticks" is passed through unchanged.
PupByteStreamCreate: PUBLIC SAFE PROC[remote: PupDefs.PupAddress, ticks: PupDefs.Tocks]
  RETURNS [IO.STREAM] = TRUSTED {
  RETURN[PupByteStreamMake[
    local: PupTypes.fillInSocketID,
    remote: remote,
    ticks: ticks,
    mode: sendRfc,
    id: [0, 0]]]
  };
-- Creates the underlying Pup packet stream with the given parameters and wraps it in an
-- IO.STREAM whose operations are MyStreamProcs. All buffer fields of the per-stream
-- state start at their declared defaults (no buffers, fingers at 0).
PupByteStreamMake: PUBLIC SAFE PROC[
  local: PupDefs.PupSocketID, remote: PupDefs.PupAddress, ticks: PupDefs.Tocks,
  mode: PupStream.PupOpenMode, id: PupDefs.Pair]
  RETURNS [IO.STREAM] = TRUSTED {
  pkts: PupPktDefs.PupPktStream =
    PupPktDefs.PupPktStreamMake[local, remote, ticks, mode, id];
  state: BSPData = NEW[BSPDataObject ← [pktStream: pkts]];
  RETURN[IO.CreateStream[streamProcs: MyStreamProcs, streamData: state]]
  };
-- Aborts the packet stream underneath "self", handing "text" to PupPktStreamAbort.
-- The buffers held in the per-stream state are not touched here.
PupByteStreamAbort: PUBLIC SAFE PROC[self: IO.STREAM, text: Rope.ROPE] = TRUSTED {
  state: BSPData = NARROW[self.streamData];
  PupPktDefs.PupPktStreamAbort[state.pktStream, text];
  };
-- Destroys the packet stream and returns every buffer still held to the free pool.
-- Each field is cleared as it is released, so a second Close finds nothing to do.
-- The "abort" argument is not consulted: a partially filled output buffer is returned
-- to the pool without being transmitted in either case.
Close: SAFE PROC[self: IO.STREAM, abort: BOOL ← FALSE] = TRUSTED {
  state: BSPData = NARROW[self.streamData];
  IF state.pktStream # NIL THEN {
    PupPktDefs.PupPktStreamDestroy[state.pktStream];
    state.pktStream ← NIL };
  IF state.inputBuffer # NIL THEN {
    PupDefs.ReturnFreePupBuffer[state.inputBuffer];
    state.inputBuffer ← NIL };
  IF state.markBuffer # NIL THEN {
    PupDefs.ReturnFreePupBuffer[state.markBuffer];
    state.markBuffer ← NIL };
  IF state.outputBuffer # NIL THEN {
    PupDefs.ReturnFreePupBuffer[state.outputBuffer];
    state.outputBuffer ← NIL };
  };
-- Returns the next data byte of the stream.
-- Fast path: take a byte straight out of the current input buffer, provided it is not the
-- last one. The last byte is always left for UnsafeGetBlock so that it can free the buffer.
-- Slow path: read one byte through UnsafeGetBlock; raises IO.EndOfStream if that
-- delivers nothing.
GetChar: SAFE PROC[self: IO.STREAM] RETURNS[CHAR] = TRUSTED {
  state: BSPData = NARROW[self.streamData];
  IF state.inputBuffer = NIL OR state.inputFinger+1 >= state.inputBufferSize
    THEN {
      oneChar: PACKED ARRAY [0..1] OF CHAR;
      got: INT = self.UnsafeGetBlock[[base: @oneChar, startIndex: 0, count: 1]];
      IF got = 0 THEN ERROR IO.EndOfStream[self];
      RETURN[oneChar[0]]
      }
    ELSE {
      next: CHAR = state.inputBuffer.pupChars[state.inputFinger];
      state.inputFinger ← state.inputFinger+1;
      RETURN[next]
      }
  };
-- TRUE exactly when a mark packet has been received and not yet taken by ConsumeMark.
EndOf: SAFE PROC[self: IO.STREAM] RETURNS[BOOL] = TRUSTED {
  state: BSPData = NARROW[self.streamData];
  markPending: BOOL = (state.markBuffer # NIL);
  RETURN[markPending]
  };
-- Reports whether input can be had right now: 1 if an input buffer is already held, or if
-- no mark is pending and the packet stream says packets are available; otherwise 0.
-- The result is only ever 0 or 1, not a byte count, and the "wait" argument is not consulted.
CharsAvail: SAFE PROC[self: IO.STREAM, wait: BOOL] RETURNS[INT] = TRUSTED {
  state: BSPData = NARROW[self.streamData];
  IF state.inputBuffer # NIL THEN RETURN[1];
  IF state.markBuffer = NIL AND state.pktStream.PktsAvailable[] THEN RETURN[1];
  RETURN[0]
  };
-- Raised as a SIGNAL by UnsafeGetBlock when the packet stream's Get returns NIL.
-- nextIndex is the block index at which the next byte would have been stored.
TimeOut: PUBLIC SAFE SIGNAL[nextIndex: INT] = CODE;
-- Reads up to block.count bytes from the stream into block, starting at block.startIndex,
-- and returns the number of bytes transferred.
-- A block.base of NIL means the bytes are consumed and counted but not stored anywhere.
-- Returns early, with fewer bytes than requested, as soon as a mark is pending.
-- Raises SIGNAL TimeOut when the packet stream's Get returns NIL.
UnsafeGetBlock:
UNSAFE
PROC[self:
IO.
STREAM, block:
IO.UnsafeBlock]
RETURNS[nBytesRead:
INT] =
TRUSTED
BEGIN
data: BSPData = NARROW[self.streamData];
stopIndexPlusOne: INT = block.startIndex + block.count;
nBytesRead ← 0;
-- block.startIndex is advanced as bytes arrive, so it doubles as the loop cursor
WHILE block.startIndex < stopIndexPlusOne
DO
-- refill: loop until we hold a data packet, or find that a mark is pending
WHILE data.inputBuffer =
NIL
DO
IF data.markBuffer # NIL THEN RETURN; -- implies EndOf[] = TRUE
data.inputBuffer ← data.pktStream.Get[];
-- TimeOut is a SIGNAL: if a handler resumes it, inputBuffer is still NIL and the
-- enclosing loop simply calls Get again
IF data.inputBuffer = NIL
THEN SIGNAL TimeOut[block.startIndex]
ELSE
-- a mark packet is set aside in markBuffer; the next iteration then takes the RETURN above
IF data.inputBuffer.pupType = mark
OR data.inputBuffer.pupType = aMark
THEN { data.markBuffer ← data.inputBuffer; data.inputBuffer ← NIL }
ELSE { data.inputFinger ← 0;
data.inputBufferSize ← PupDefs.GetPupContentsBytes[data.inputBuffer] };
ENDLOOP;
BEGIN
-- transfer bytes from the input buffer --
-- NOTE(review): the result of ByteBlt is used as the number of bytes moved, presumably
-- the smaller of the two ranges. Confirm against PrincOpsUtils.
nBytes:
INT =
IF block.base =
NIL
-- means "discard data" --
THEN
MIN[stopIndexPlusOne-block.startIndex,
data.inputBufferSize-data.inputFinger]
ELSE PrincOpsUtils.ByteBlt[
to: [blockPointer: block.base,
startIndex: block.startIndex, stopIndexPlusOne: stopIndexPlusOne],
from: [blockPointer: @data.inputBuffer.pupChars,
startIndex: data.inputFinger, stopIndexPlusOne: data.inputBufferSize] ];
block.startIndex ← block.startIndex + nBytes;
nBytesRead ← nBytesRead + nBytes;
data.inputFinger ← data.inputFinger + nBytes;
END;
-- a fully consumed packet (including one that arrived with zero content bytes) goes back to the pool
IF data.inputFinger = data.inputBufferSize
THEN { PupDefs.ReturnFreePupBuffer[data.inputBuffer]; data.inputBuffer ← NIL };
ENDLOOP;
END;
-- Exported error. NOTE(review): nothing in this module raises it; ConsumeMark instead
-- keeps reading until a mark arrives. Confirm whether clients still depend on it.
NoMarkAvailable: PUBLIC ERROR = CODE;
-- Discards input until a mark packet has been received, then returns the mark's value
-- (the first byte of the mark packet) and releases the packet. Afterwards EndOf is FALSE
-- again and normal reading can continue.
ConsumeMark: PUBLIC SAFE PROC[self: IO.STREAM] RETURNS[mark: [0..256)] = TRUSTED {
  state: BSPData = NARROW[self.streamData];
  -- A NIL base asks UnsafeGetBlock to throw the data away; it returns once a mark is pending.
  UNTIL state.markBuffer # NIL DO
    [] ← self.UnsafeGetBlock[[base: NIL, startIndex: 0, count: LAST[INT]]];
    ENDLOOP;
  mark ← state.markBuffer.pupBytes[0];
  PupDefs.ReturnFreePupBuffer[state.markBuffer];
  state.markBuffer ← NIL;
  };
-- Forwards to WaitForAttention on the packet stream underneath "self".
WaitForAttention: PUBLIC SAFE PROC[self: IO.STREAM] = TRUSTED {
  state: BSPData = NARROW[self.streamData];
  state.pktStream.WaitForAttention[];
  };
-- Appends one byte to the stream.
-- Fast path: store directly into the current output buffer, provided this is not its last
-- free byte. The last byte is always left for UnsafePutBlock so that it can send the buffer.
-- Slow path (no buffer yet, or only the last byte is free): write one byte via UnsafePutBlock.
PutChar: SAFE PROC[self: IO.STREAM, char: CHAR] = TRUSTED {
  state: BSPData = NARROW[self.streamData];
  IF state.outputBuffer = NIL OR state.outputFinger+1 >= state.outputBufferSize
    THEN {
      oneChar: PACKED ARRAY [0..1] OF CHAR;
      oneChar[0] ← char;
      self.UnsafePutBlock[[base: @oneChar, startIndex: 0, count: 1]];
      }
    ELSE {
      state.outputBuffer.pupChars[state.outputFinger] ← char;
      state.outputFinger ← state.outputFinger+1;
      }
  };
-- Appends block.count bytes, starting at block.startIndex of block, to the stream.
-- Bytes accumulate in data.outputBuffer; each time that buffer reaches outputBufferSize
-- it is handed to Transmit and a fresh one is obtained for the remainder.
-- A partially filled buffer is left pending until more output arrives or Flush or SendMark is called.
UnsafePutBlock:
SAFE
PROC[self:
IO.
STREAM, block:
IO.UnsafeBlock] =
TRUSTED
BEGIN
data: BSPData = NARROW[self.streamData];
stopIndexPlusOne: INT = block.startIndex + block.count;
-- the packet size limit is fetched from the packet stream once, on first output
IF data.outputBufferSize = 0 THEN data.outputBufferSize ← data.pktStream.GetSenderSizeLimit[];
-- block.startIndex is advanced as bytes are accepted, so it doubles as the loop cursor
WHILE block.startIndex < stopIndexPlusOne
DO
-- NOTE(review): pupType of a freshly obtained buffer is not set here; only Flush assigns it
-- (aData). Presumably Put or GetFreePupBuffer supplies the data type. Confirm.
IF data.outputBuffer = NIL
THEN { data.outputBuffer ← PupDefs.GetFreePupBuffer[]; data.outputFinger ← 0 };
BEGIN
-- NOTE(review): with a NIL base nothing is copied but outputFinger still advances, so the
-- bytes sent are whatever the buffer already held. Confirm that this is intended.
nBytes:
INT =
IF block.base =
NIL
-- means "ignore data" --
THEN
MIN[stopIndexPlusOne-block.startIndex,
data.outputBufferSize-data.outputFinger]
ELSE PrincOpsUtils.ByteBlt[
from: [blockPointer: block.base,
startIndex: block.startIndex, stopIndexPlusOne: stopIndexPlusOne],
to: [blockPointer: @data.outputBuffer.pupChars,
startIndex: data.outputFinger, stopIndexPlusOne: data.outputBufferSize] ];
block.startIndex ← block.startIndex + nBytes;
data.outputFinger ← data.outputFinger + nBytes;
END;
-- a full buffer is sent at once; Transmit sets outputBuffer to NIL so the next pass gets a new one
IF data.outputFinger = data.outputBufferSize THEN Transmit[data];
ENDLOOP;
END;
-- Sends whatever output is pending as a single packet of type aData.
-- If no output buffer is pending, a fresh buffer with zero content bytes is obtained and
-- sent instead, so every Flush puts exactly one packet on the packet stream.
Flush:
SAFE
PROC[self:
IO.
STREAM] =
TRUSTED
BEGIN
data: BSPData = NARROW[self.streamData];
-- Even if we have nothing to send, transmit a buffer to provoke an ack from the other end
IF data.outputBuffer = NIL
THEN { data.outputBuffer ← PupDefs.GetFreePupBuffer[]; data.outputFinger ← 0 };
-- NOTE(review): aData is presumably the packet type that requests the acknowledgement. Confirm.
data.outputBuffer.pupType ← aData;
Transmit[data];
END;
-- Hands the pending output buffer, if any, to the packet stream, with its content length
-- set to the number of bytes stored so far (outputFinger).
-- The buffer is detached from the stream state before Put is called, so that the state
-- never refers to a buffer the packet stream has taken over, even if Put does not return
-- normally (the StreamClosing case).
Transmit: SAFE PROC[data: BSPData] = TRUSTED {
  pending: PupDefs.PupBuffer = data.outputBuffer;
  data.outputBuffer ← NIL;
  IF pending = NIL THEN RETURN;
  PupDefs.SetPupContentsBytes[pending, data.outputFinger];
  data.pktStream.Put[pending];
  };
-- Sends a mark with the given value. Any buffered output is transmitted first, so the
-- mark follows all data written before this call.
SendMark: PUBLIC SAFE PROC[self: IO.STREAM, mark: [0..256)] = TRUSTED {
  state: BSPData = NARROW[self.streamData];
  IF state.outputBuffer # NIL THEN Transmit[state];
  state.pktStream.PutMark[mark];
  };
-- Forwards to SendAttention on the packet stream underneath "self".
-- Buffered output is neither transmitted nor discarded by this call.
SendAttention: PUBLIC SAFE PROC[self: IO.STREAM] = TRUSTED {
  state: BSPData = NARROW[self.streamData];
  state.pktStream.SendAttention[];
  };
END.