Cedar Communication: IO.STREAM layer on top of Pup Packet Streams
PupByteStreams.mesa
Andrew Birrell June 22, 1983 5:40 pm
DIRECTORY
IO USING[ CreateProcsStream, CreateRefStreamProcs, EndOfStream, Flush, STREAM, StreamProcs, UnsafeBlock, UnsafeGetBlock, UnsafePutBlock ],
PrincOps USING [ByteBltBlock],
PrincOpsUtils USING [ByteBlt],
PupPktDefs USING [Get, GetSenderSizeLimit, PktsAvailable, PupPktStream, PupPktStreamAbort, PupPktStreamDestroy, PupPktStreamMake, Put, PutMark, SendAttention, WaitForAttention],
PupStream USING [PupOpenMode],
PupDefs USING [GetFreePupBuffer, ReturnFreePupBuffer, GetPupContentsBytes, SetPupContentsBytes, Pair, PupAddress, PupBuffer, PupSocketID, Tocks],
PupTypes USING [fillInSocketID],
Rope USING [ROPE];
PupByteStreams: MONITOR
IMPORTS IO, PrincOpsUtils, PupPktDefs, PupDefs
EXPORTS PupStream =
BEGIN
BSPData: TYPE = REF BSPDataObject;
BSPDataObject: TYPE = MONITORED RECORD[
pktStream: PupPktDefs.PupPktStream,
inputBuffer: PupDefs.PupBuffer ← NIL, -- if we have characters to consume
markBuffer: PupDefs.PupBuffer ← NIL, -- if we have a mark to consume
inputFinger: INT ← 0, -- index of next byte to take from inputBuffer
inputBufferSize: INT ← 0,
outputBuffer: PupDefs.PupBuffer ← NIL, -- partially filled output buffer
outputFinger: INT ← 0, -- index of next byte to store in outputBuffer
outputBufferSize: INT ← 0 -- negotiated for each stream
];
MyStreamProcs: REF IO.StreamProcs = IO.CreateRefStreamProcs[
getChar: GetChar,
endOf: EndOf,
charsAvail: CharsAvail,
getBlock: GetBlock,
unsafeGetBlock: UnsafeGetBlock,
putChar: PutChar,
putBlock: PutBlock,
unsafePutBlock: UnsafePutBlock,
flush: Flush,
close: Close
];
PupByteStreamCreate: PUBLIC SAFE PROC[remote: PupDefs.PupAddress, ticks: PupDefs.Tocks] RETURNS [IO.STREAM] = TRUSTED
BEGIN
RETURN[PupByteStreamMake[PupTypes.fillInSocketID, remote, ticks, sendRfc, [0, 0]]]
END;
PupByteStreamMake: PUBLIC SAFE PROC[local: PupDefs.PupSocketID, remote: PupDefs.PupAddress, ticks: PupDefs.Tocks, mode: PupStream.PupOpenMode, id: PupDefs.Pair] RETURNS [IO.STREAM] = TRUSTED
BEGIN
data: BSPData = NEW[BSPDataObject ← [pktStream:
PupPktDefs.PupPktStreamMake[local, remote, ticks, mode, id] ] ];
RETURN[IO.CreateProcsStream[streamProcs: MyStreamProcs, streamData: data]]
END;
PupByteStreamAbort: PUBLIC SAFE PROC[self: IO.STREAM, text: Rope.ROPE] = TRUSTED
BEGIN
data: BSPData = NARROW[self.streamData];
PupPktDefs.PupPktStreamAbort[data.pktStream, text];
END;
Close: SAFE PROC[self: IO.STREAM, abort: BOOLFALSE] = TRUSTED
BEGIN
data: BSPData = NARROW[self.streamData];
IF data.pktStream # NIL THEN PupPktDefs.PupPktStreamDestroy[data.pktStream];
data.pktStream ← NIL;
IF data.inputBuffer # NIL THEN PupDefs.ReturnFreePupBuffer[data.inputBuffer];
data.inputBuffer ← NIL;
IF data.markBuffer # NIL THEN PupDefs.ReturnFreePupBuffer[data.markBuffer];
data.markBuffer ← NIL;
IF data.outputBuffer # NIL THEN PupDefs.ReturnFreePupBuffer[data.outputBuffer];
data.outputBuffer ← NIL;
END;
GetChar: SAFE PROC[self: IO.STREAM] RETURNS[CHAR] = TRUSTED
BEGIN
data: BSPData = NARROW[self.streamData];
IF data.inputBuffer # NIL AND data.inputFinger+1 < data.inputBufferSize
THEN -- The "+1" is to prevent us taking the last byte, so that GetBlock can free the buffer
{ data.inputFinger ← data.inputFinger+1; RETURN[data.inputBuffer.pupChars[data.inputFinger-1]] }
ELSE BEGIN
buff: PACKED ARRAY [0..1] OF CHAR;
n: INT = self.UnsafeGetBlock[[base:@buff, startIndex:0, stopIndexPlusOne:1]];
IF n = 0 THEN ERROR IO.EndOfStream[self];
RETURN[buff[0]]
END
END;
EndOf: SAFE PROC[self: IO.STREAM] RETURNS[BOOL] = TRUSTED
BEGIN
data: BSPData = NARROW[self.streamData];
RETURN[data.markBuffer # NIL]
END;
CharsAvail: SAFE PROC[self: IO.STREAM] RETURNS[BOOL] = TRUSTED
BEGIN
data: BSPData = NARROW[self.streamData];
RETURN[data.inputBuffer#NIL OR (data.markBuffer=NIL AND data.pktStream.PktsAvailable[])]
END;
GetBlock: SAFE PROC[self: IO.STREAM, block: REF TEXT, startIndex: NAT, stopIndexPlusOne: NAT] RETURNS[nBytesRead: NAT] = TRUSTED
BEGIN
RETURN[ self.UnsafeGetBlock[[
base: LOOPHOLE[block,LONG POINTER]+SIZE[TEXT[0]],
startIndex: startIndex,
stopIndexPlusOne: MIN[stopIndexPlusOne, block.maxLength] ] ] ]
END;
TimeOut: PUBLIC SAFE SIGNAL[nextIndex: INT] = CODE;
UnsafeGetBlock: UNSAFE PROC[self: IO.STREAM, block: IO.UnsafeBlock] RETURNS[nBytesRead: INT] = TRUSTED
BEGIN
data: BSPData = NARROW[self.streamData];
nBytesRead ← 0;
WHILE block.startIndex < block.stopIndexPlusOne DO
WHILE data.inputBuffer = NIL DO
IF data.markBuffer # NIL THEN RETURN; -- implies EndOf[] = TRUE
data.inputBuffer ← data.pktStream.Get[];
IF data.inputBuffer = NIL
THEN SIGNAL TimeOut[block.startIndex]
ELSE IF data.inputBuffer.pupType = mark OR data.inputBuffer.pupType = aMark
THEN { data.markBuffer ← data.inputBuffer; data.inputBuffer ← NIL }
ELSE { data.inputFinger ← 0;
data.inputBufferSize ← PupDefs.GetPupContentsBytes[data.inputBuffer] };
ENDLOOP;
BEGIN
-- transfer bytes from the input buffer --
nBytes: INT = IF block.base = NIL -- means "discard data" --
THEN MIN[block.stopIndexPlusOne-block.startIndex,
data.inputBufferSize-data.inputFinger]
ELSE PrincOpsUtils.ByteBlt[
to: [blockPointer: block.base,
startIndex: block.startIndex, stopIndexPlusOne: block.stopIndexPlusOne],
from: [blockPointer: @data.inputBuffer.pupChars,
startIndex: data.inputFinger, stopIndexPlusOne: data.inputBufferSize] ];
block.startIndex ← block.startIndex + nBytes;
nBytesRead ← nBytesRead + nBytes;
data.inputFinger ← data.inputFinger + nBytes;
END;
IF data.inputFinger = data.inputBufferSize
THEN { PupDefs.ReturnFreePupBuffer[data.inputBuffer]; data.inputBuffer ← NIL };
ENDLOOP;
END;
NoMarkAvailable: PUBLIC ERROR = CODE;
ConsumeMark: PUBLIC SAFE PROC[self: IO.STREAM] RETURNS[mark: [0..256)] = TRUSTED
BEGIN
data: BSPData = NARROW[self.streamData];
-- skip to a mark --
WHILE data.markBuffer = NIL
DO [] ← self.UnsafeGetBlock[ [base: NIL, startIndex: 0, stopIndexPlusOne: LAST[INT]] ] ENDLOOP;
mark ← data.markBuffer.pupBytes[0];
PupDefs.ReturnFreePupBuffer[data.markBuffer];
data.markBuffer ← NIL;
END;
WaitForAttention: PUBLIC SAFE PROC[self: IO.STREAM] = TRUSTED
BEGIN
data: BSPData = NARROW[self.streamData];
data.pktStream.WaitForAttention[];
END;
PutChar: SAFE PROC[self: IO.STREAM, char: CHAR] = TRUSTED
BEGIN
data: BSPData = NARROW[self.streamData];
IF data.outputBuffer # NIL AND data.outputFinger+1 < data.outputBufferSize
THEN -- The "+1" is to prevent us storing the last byte, so that PutBlock can send the buffer
{ data.outputBuffer.pupChars[data.outputFinger] ← char;
data.outputFinger ← data.outputFinger+1 }
ELSE BEGIN
buff: PACKED ARRAY [0..1] OF CHAR;
buff[0] ← char;
self.UnsafePutBlock[[base:@buff, startIndex:0, stopIndexPlusOne:1]];
END
END;
PutBlock: SAFE PROC[self: IO.STREAM, block: REF READONLY TEXT, startIndex: NAT, stopIndexPlusOne: NAT] = TRUSTED
BEGIN
data: BSPData = NARROW[self.streamData];
self.UnsafePutBlock[[
base: LOOPHOLE[block,LONG POINTER]+SIZE[TEXT[0]],
startIndex: startIndex,
stopIndexPlusOne: MIN[stopIndexPlusOne, block.length] ] ];
END;
UnsafePutBlock: SAFE PROC[self: IO.STREAM, block: IO.UnsafeBlock] = TRUSTED
BEGIN
data: BSPData = NARROW[self.streamData];
IF data.outputBufferSize = 0 THEN data.outputBufferSize ← data.pktStream.GetSenderSizeLimit[];
WHILE block.startIndex < block.stopIndexPlusOne DO
IF data.outputBuffer = NIL
THEN { data.outputBuffer ← PupDefs.GetFreePupBuffer[]; data.outputFinger ← 0 };
BEGIN
nBytes: INT = IF block.base = NIL -- means "ignore data" --
THEN MIN[block.stopIndexPlusOne-block.startIndex,
data.outputBufferSize-data.outputFinger]
ELSE PrincOpsUtils.ByteBlt[
from: [blockPointer: block.base,
startIndex: block.startIndex, stopIndexPlusOne: block.stopIndexPlusOne],
to: [blockPointer: @data.outputBuffer.pupChars,
startIndex: data.outputFinger, stopIndexPlusOne: data.outputBufferSize] ];
block.startIndex ← block.startIndex + nBytes;
data.outputFinger ← data.outputFinger + nBytes;
END;
IF data.outputFinger = data.outputBufferSize THEN self.Flush[];
ENDLOOP;
END;
Flush: SAFE PROC[self: IO.STREAM] = TRUSTED
BEGIN
data: BSPData = NARROW[self.streamData];
b: PupDefs.PupBuffer = data.outputBuffer;
data.outputBuffer ← NIL; -- don't leave it dangling in case of StreamClosing
IF b # NIL THEN { PupDefs.SetPupContentsBytes[b, data.outputFinger]; data.pktStream.Put[b] };
END;
SendMark: PUBLIC SAFE PROC[self: IO.STREAM, mark: [0..256)] = TRUSTED
BEGIN
data: BSPData = NARROW[self.streamData];
IF data.outputBuffer # NIL THEN self.Flush[];
data.pktStream.PutMark[mark];
END;
SendAttention: PUBLIC SAFE PROC[self: IO.STREAM] = TRUSTED
BEGIN
data: BSPData = NARROW[self.streamData];
data.pktStream.SendAttention[];
END;
END.