<> <> <> <> <> DIRECTORY IO USING [CreateStream, CreateStreamProcs, EndOfStream, STREAM, StreamProcs, UnsafeBlock, UnsafeGetBlock, UnsafePutBlock ], PrincOps USING [ByteBltBlock], PrincOpsUtils USING [ByteBlt], PupPktDefs USING [Get, GetSenderSizeLimit, PktsAvailable, PupPktStream, PupPktStreamAbort, PupPktStreamDestroy, PupPktStreamMake, Put, PutMark, SendAttention, WaitForAttention], PupStream USING [PupOpenMode], PupDefs USING [GetFreePupBuffer, ReturnFreePupBuffer, GetPupContentsBytes, SetPupContentsBytes, Pair, PupAddress, PupBuffer, PupSocketID, Tocks], PupTypes USING [fillInSocketID], Rope USING [ROPE]; PupByteStreams: CEDAR MONITOR IMPORTS IO, PrincOpsUtils, PupPktDefs, PupDefs EXPORTS PupStream = { STREAM: TYPE = IO.STREAM; BSPData: TYPE = REF BSPDataObject; BSPDataObject: TYPE = MONITORED RECORD[ pktStream: PupPktDefs.PupPktStream, inputBuffer: PupDefs.PupBuffer _ NIL, -- if we have characters to consume markBuffer: PupDefs.PupBuffer _ NIL, -- if we have a mark to consume inputFinger: INT _ 0, -- index of next byte to take from inputBuffer inputBufferSize: INT _ 0, outputBuffer: PupDefs.PupBuffer _ NIL, -- partially filled output buffer outputFinger: INT _ 0, -- index of next byte to store in outputBuffer outputBufferSize: INT _ 0 -- negotiated for each stream ]; MyStreamProcs: REF IO.StreamProcs = IO.CreateStreamProcs[ variety: $inputOutput, class: $Pup, getChar: GetChar, endOf: EndOf, charsAvail: CharsAvail, unsafeGetBlock: UnsafeGetBlock, putChar: PutChar, unsafePutBlock: UnsafePutBlock, flush: Flush, close: Close ]; PupByteStreamCreate: PUBLIC PROC [remote: PupDefs.PupAddress, ticks: PupDefs.Tocks] RETURNS [STREAM] = { RETURN [PupByteStreamMake[PupTypes.fillInSocketID, remote, ticks, sendRfc, [0, 0]]] }; PupByteStreamMake: PUBLIC PROC [local: PupDefs.PupSocketID, remote: PupDefs.PupAddress, ticks: PupDefs.Tocks, mode: PupStream.PupOpenMode, id: PupDefs.Pair] RETURNS [STREAM] = TRUSTED { data: BSPData = NEW[BSPDataObject _ [ pktStream: PupPktDefs.PupPktStreamMake[local, remote, ticks, mode, id] ] ]; RETURN [IO.CreateStream[streamProcs: MyStreamProcs, streamData: data]] }; PupByteStreamAbort: PUBLIC PROC [self: STREAM, text: Rope.ROPE] = TRUSTED { data: BSPData = NARROW[self.streamData]; PupPktDefs.PupPktStreamAbort[data.pktStream, text]; }; Close: PROC [self: STREAM, abort: BOOL _ FALSE] = TRUSTED { data: BSPData = NARROW[self.streamData]; IF data.pktStream # NIL THEN { PupPktDefs.PupPktStreamDestroy[data.pktStream]; data.pktStream _ NIL; }; IF data.inputBuffer # NIL THEN { PupDefs.ReturnFreePupBuffer[data.inputBuffer]; data.inputBuffer _ NIL; }; IF data.markBuffer # NIL THEN { PupDefs.ReturnFreePupBuffer[data.markBuffer]; data.markBuffer _ NIL; }; IF data.outputBuffer # NIL THEN { PupDefs.ReturnFreePupBuffer[data.outputBuffer]; data.outputBuffer _ NIL; }; }; GetChar: PROC [self: STREAM] RETURNS [CHAR] = TRUSTED { data: BSPData = NARROW[self.streamData]; IF data.inputBuffer # NIL AND data.inputFinger+1 < data.inputBufferSize THEN { <> data.inputFinger _ data.inputFinger+1; RETURN [data.inputBuffer.pupChars[data.inputFinger-1]] } ELSE { buff: PACKED ARRAY [0..1] OF CHAR; n: INT = self.UnsafeGetBlock[[base: LOOPHOLE[LONG[@buff]], startIndex:0, count:1]]; IF n = 0 THEN ERROR IO.EndOfStream[self]; RETURN [buff[0]] } }; EndOf: PROC [self: STREAM] RETURNS [BOOL] = { data: BSPData = NARROW[self.streamData]; GetInputBuffer[data, 0, TRUE]; RETURN [data.markBuffer # NIL] }; CharsAvail: PROC [self: STREAM, wait: BOOL] RETURNS [INT] = { < return # of chars we can guarantee to return without waiting>> < return # of chars we can guarantee to return without waiting,>> < 0 (note: if EndOf[self] then return 1)>> data: BSPData = NARROW[self.streamData]; GetInputBuffer[data, 0, NOT wait]; SELECT TRUE FROM data.inputBuffer # NIL => RETURN [data.inputBufferSize-data.inputFinger]; data.markBuffer # NIL => RETURN [1]; ENDCASE => RETURN [0]; }; TimeOut: PUBLIC SAFE SIGNAL[nextIndex: INT] = CODE; UnsafeGetBlock: UNSAFE PROC [self: STREAM, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT _ 0] = TRUSTED { data: BSPData = NARROW[self.streamData]; stopIndexPlusOne: INT = block.startIndex + block.count; WHILE block.startIndex < stopIndexPlusOne DO GetInputBuffer[data, block.startIndex, FALSE]; SELECT TRUE FROM data.markBuffer # NIL => RETURN; ENDCASE => { <> nBytes: INT = IF block.base = NIL -- means "discard data" -- THEN MIN[stopIndexPlusOne-block.startIndex, data.inputBufferSize-data.inputFinger] ELSE PrincOpsUtils.ByteBlt[ to: [blockPointer: block.base, startIndex: block.startIndex, stopIndexPlusOne: stopIndexPlusOne], from: [blockPointer: @data.inputBuffer.pupChars, startIndex: data.inputFinger, stopIndexPlusOne: data.inputBufferSize] ]; block.startIndex _ block.startIndex + nBytes; nBytesRead _ nBytesRead + nBytes; data.inputFinger _ data.inputFinger + nBytes; }; IF data.inputFinger = data.inputBufferSize THEN { PupDefs.ReturnFreePupBuffer[data.inputBuffer]; data.inputBuffer _ NIL; }; ENDLOOP; }; GetInputBuffer: PROC [data: BSPData, start: INT _ 0, dontWait: BOOL _ TRUE] = TRUSTED { <> <<(data.inputBuffer # NIL AND data.inputFinger # data.inputBufferSize)>> <> <> DO SELECT TRUE FROM data.inputBuffer # NIL => { IF data.inputFinger # data.inputBufferSize THEN RETURN; PupDefs.ReturnFreePupBuffer[data.inputBuffer]; data.inputBuffer _ NIL; }; data.markBuffer # NIL => RETURN; ENDCASE; IF dontWait AND NOT PupPktDefs.PktsAvailable[data.pktStream] THEN RETURN; data.inputBuffer _ data.pktStream.Get[]; SELECT TRUE FROM data.inputBuffer = NIL => SIGNAL TimeOut[start]; <> data.inputBuffer.pupType = mark, data.inputBuffer.pupType = aMark => { <> data.markBuffer _ data.inputBuffer; data.inputBuffer _ NIL; }; ENDCASE => { <> data.inputFinger _ 0; data.inputBufferSize _ PupDefs.GetPupContentsBytes[data.inputBuffer]; }; ENDLOOP; }; NoMarkAvailable: PUBLIC ERROR = CODE; ConsumeMark: PUBLIC PROC [self: STREAM] RETURNS [mark: [0..256)] = TRUSTED { data: BSPData = NARROW[self.streamData]; -- skip to a mark, discarding any data along the way -- WHILE data.markBuffer = NIL DO [] _ self.UnsafeGetBlock[ [base: NIL, startIndex: 0, count: LAST[INT]] ]; ENDLOOP; mark _ data.markBuffer.pupBytes[0]; PupDefs.ReturnFreePupBuffer[data.markBuffer]; data.markBuffer _ NIL; }; WaitAttention: PUBLIC PROC [self: STREAM] = TRUSTED { data: BSPData = NARROW[self.streamData]; data.pktStream.WaitForAttention[]; }; PutChar: SAFE PROC [self: STREAM, char: CHAR] = TRUSTED { data: BSPData = NARROW[self.streamData]; IF data.outputBuffer # NIL AND data.outputFinger+1 < data.outputBufferSize THEN { <> data.outputBuffer.pupChars[data.outputFinger] _ char; data.outputFinger _ data.outputFinger+1; } ELSE { buff: PACKED ARRAY [0..1] OF CHAR; buff[0] _ char; self.UnsafePutBlock[[base: LOOPHOLE[LONG[@buff]], startIndex:0, count:1]]; } }; UnsafePutBlock: --UN-- SAFE PROC [self: STREAM, block: IO.UnsafeBlock] = TRUSTED { data: BSPData = NARROW[self.streamData]; stopIndexPlusOne: INT = block.startIndex + block.count; IF data.outputBufferSize = 0 THEN data.outputBufferSize _ data.pktStream.GetSenderSizeLimit[]; WHILE block.startIndex < stopIndexPlusOne DO IF data.outputBuffer = NIL THEN { data.outputBuffer _ PupDefs.GetFreePupBuffer[]; data.outputFinger _ 0; }; { nBytes: INT = IF block.base = NIL -- means "ignore data" -- THEN MIN[stopIndexPlusOne-block.startIndex, data.outputBufferSize-data.outputFinger] ELSE PrincOpsUtils.ByteBlt[ from: [blockPointer: block.base, startIndex: block.startIndex, stopIndexPlusOne: stopIndexPlusOne], to: [blockPointer: @data.outputBuffer.pupChars, startIndex: data.outputFinger, stopIndexPlusOne: data.outputBufferSize] ]; block.startIndex _ block.startIndex + nBytes; data.outputFinger _ data.outputFinger + nBytes; }; IF data.outputFinger = data.outputBufferSize THEN Transmit[data]; ENDLOOP; }; Flush: PROC [self: STREAM] = TRUSTED { data: BSPData = NARROW[self.streamData]; <> IF data.outputBuffer = NIL THEN { data.outputBuffer _ PupDefs.GetFreePupBuffer[]; data.outputFinger _ 0 }; data.outputBuffer.pupType _ aData; Transmit[data]; }; Transmit: PROC [data: BSPData] = TRUSTED { b: PupDefs.PupBuffer = data.outputBuffer; data.outputBuffer _ NIL; -- don't leave it dangling in case of StreamClosing IF b # NIL THEN { PupDefs.SetPupContentsBytes[b, data.outputFinger]; data.pktStream.Put[b] }; }; SendMark: PUBLIC PROC [self: STREAM, mark: [0..256)] = TRUSTED { data: BSPData = NARROW[self.streamData]; IF data.outputBuffer # NIL THEN Transmit[data]; data.pktStream.PutMark[mark]; }; SendAttention: PUBLIC PROC [self: STREAM] = TRUSTED { data: BSPData = NARROW[self.streamData]; data.pktStream.SendAttention[]; }; }.