-- PupByteStreams.mesa
-- Cedar Communication: IO.STREAM layer on top of Pup Packet Streams
-- Andrew Birrell August 25, 1983 3:26 pm
-- MBrown September 17, 1983 8:11 pm

DIRECTORY
  IO USING [CreateStream, CreateStreamProcs, EndOfStream, STREAM, StreamProcs,
    UnsafeBlock, UnsafeGetBlock, UnsafePutBlock],
  PrincOps USING [ByteBltBlock],
  PrincOpsUtils USING [ByteBlt],
  PupPktDefs USING [Get, GetSenderSizeLimit, PktsAvailable, PupPktStream,
    PupPktStreamAbort, PupPktStreamDestroy, PupPktStreamMake, Put, PutMark,
    SendAttention, WaitForAttention],
  PupStream USING [PupOpenMode],
  PupDefs USING [GetFreePupBuffer, ReturnFreePupBuffer, GetPupContentsBytes,
    SetPupContentsBytes, Pair, PupAddress, PupBuffer, PupSocketID, Tocks],
  PupTypes USING [fillInSocketID],
  Rope USING [ROPE];

PupByteStreams: MONITOR
  IMPORTS IO, PrincOpsUtils, PupPktDefs, PupDefs
  EXPORTS PupStream =

BEGIN

-- Per-stream state, stored as the streamData of each IO.STREAM created here.
BSPData: TYPE = REF BSPDataObject;

BSPDataObject: TYPE = MONITORED RECORD[
  pktStream: PupPktDefs.PupPktStream, -- underlying packet stream; set to NIL by Close
  inputBuffer: PupDefs.PupBuffer _ NIL, -- non-NIL while we hold a packet with bytes left to consume
  markBuffer: PupDefs.PupBuffer _ NIL, -- non-NIL while we hold a mark packet waiting for ConsumeMark
  inputFinger: INT _ 0, -- index of the next byte to take from inputBuffer
  inputBufferSize: INT _ 0, -- content byte count of inputBuffer
  outputBuffer: PupDefs.PupBuffer _ NIL, -- partially filled output packet, if any
  outputFinger: INT _ 0, -- index of the next byte to store in outputBuffer
  outputBufferSize: INT _ 0 -- obtained per stream from GetSenderSizeLimit; 0 = not yet obtained
  ];

-- The single procs record shared by every stream made by this module.
MyStreamProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
  variety: $inputOutput,
  class: $Pup,
  getChar: GetChar,
  endOf: EndOf,
  charsAvail: CharsAvail,
  unsafeGetBlock: UnsafeGetBlock,
  putChar: PutChar,
  unsafePutBlock: UnsafePutBlock,
  flush: Flush,
  close: Close
  ];


-- Creation and destruction --

PupByteStreamCreate: PUBLIC SAFE PROC[remote: PupDefs.PupAddress, ticks: PupDefs.Tocks]
    RETURNS [IO.STREAM] = TRUSTED
  BEGIN
  -- Convenience form of PupByteStreamMake: fill-in local socket, sendRfc mode, id [0, 0].
  RETURN[ PupByteStreamMake[PupTypes.fillInSocketID, remote, ticks, sendRfc, [0, 0]] ]
  END;

PupByteStreamMake: PUBLIC SAFE PROC[local: PupDefs.PupSocketID, remote: PupDefs.PupAddress,
    ticks: PupDefs.Tocks, mode: PupStream.PupOpenMode, id: PupDefs.Pair]
    RETURNS [IO.STREAM] = TRUSTED
  BEGIN
  -- Make the packet stream, wrap it in fresh per-stream state, and hand back an IO.STREAM
  -- whose operations are the procedures of this module.
  pkts: PupPktDefs.PupPktStream = PupPktDefs.PupPktStreamMake[local, remote, ticks, mode, id];
  bsp: BSPData = NEW[BSPDataObject _ [pktStream: pkts]];
  RETURN[ IO.CreateStream[streamProcs: MyStreamProcs, streamData: bsp] ]
  END;

PupByteStreamAbort: PUBLIC SAFE PROC[self: IO.STREAM, text: Rope.ROPE] = TRUSTED
  BEGIN
  -- Passes the abort request and its text straight to the packet stream.
  bsp: BSPData = NARROW[self.streamData];
  PupPktDefs.PupPktStreamAbort[bsp.pktStream, text];
  END;

Close: SAFE PROC[self: IO.STREAM, abort: BOOL _ FALSE] = TRUSTED
  BEGIN
  -- Destroys the packet stream and gives back every buffer we are holding.
  -- Each field is cleared as it is released, so a second Close finds nothing to do.
  -- "abort" is not examined; buffered output is returned to the pool, not transmitted.
  bsp: BSPData = NARROW[self.streamData];
  IF bsp.pktStream # NIL THEN
    { PupPktDefs.PupPktStreamDestroy[bsp.pktStream]; bsp.pktStream _ NIL };
  IF bsp.inputBuffer # NIL THEN
    { PupDefs.ReturnFreePupBuffer[bsp.inputBuffer]; bsp.inputBuffer _ NIL };
  IF bsp.markBuffer # NIL THEN
    { PupDefs.ReturnFreePupBuffer[bsp.markBuffer]; bsp.markBuffer _ NIL };
  IF bsp.outputBuffer # NIL THEN
    { PupDefs.ReturnFreePupBuffer[bsp.outputBuffer]; bsp.outputBuffer _ NIL };
  END;


-- Input --

GetChar: SAFE PROC[self: IO.STREAM] RETURNS[CHAR] = TRUSTED
  BEGIN
  -- Delivers the next input character; raises IO.EndOfStream if none can be read.
  bsp: BSPData = NARROW[self.streamData];
  IF bsp.inputBuffer = NIL OR bsp.inputFinger+1 >= bsp.inputBufferSize THEN
    BEGIN
    -- No packet in hand, or only its final byte remains: go through the block path,
    -- which knows how to fetch packets and how to free an exhausted one.
    oneChar: PACKED ARRAY [0..1] OF CHAR;
    got: INT = self.UnsafeGetBlock[[base:@oneChar, startIndex:0, count:1]];
    IF got = 0 THEN ERROR IO.EndOfStream[self];
    RETURN[oneChar[0]]
    END
  ELSE
    BEGIN
    -- Fast path. The "+1" in the test above keeps us from taking the last byte here,
    -- so that freeing the buffer is always left to UnsafeGetBlock.
    bsp.inputFinger _ bsp.inputFinger+1;
    RETURN[bsp.inputBuffer.pupChars[bsp.inputFinger-1]]
    END
  END;

EndOf: SAFE PROC[self: IO.STREAM] RETURNS[BOOL] = TRUSTED
  BEGIN
  -- The stream is at "end" exactly while an unconsumed mark is being held.
  bsp: BSPData = NARROW[self.streamData];
  RETURN[bsp.markBuffer # NIL]
  END;

CharsAvail: SAFE PROC[self: IO.STREAM, wait: BOOL] RETURNS[INT] = TRUSTED
  BEGIN
  -- Answers 1 when input is on hand or a packet is waiting on the packet stream, else 0.
  -- The result is never larger than 1, and "wait" is not examined.
  bsp: BSPData = NARROW[self.streamData];
  IF bsp.inputBuffer # NIL THEN RETURN[1]; -- bytes already in hand
  IF bsp.markBuffer # NIL THEN RETURN[0]; -- a pending mark hides whatever follows it
  RETURN[IF bsp.pktStream.PktsAvailable[] THEN 1 ELSE 0]
  END;

-- Raised (as a resumable signal) by UnsafeGetBlock when the packet stream yields no packet;
-- nextIndex is the index in the caller's block at which the next byte would be stored.
TimeOut: PUBLIC SAFE SIGNAL[nextIndex: INT] = CODE;

UnsafeGetBlock: UNSAFE PROC[self: IO.STREAM, block: IO.UnsafeBlock]
    RETURNS[nBytesRead: INT] = TRUSTED
  BEGIN
  -- Reads up to block.count bytes into block, starting at block.startIndex, and returns the
  -- number actually read. Stops short only when a mark is encountered. If block.base is NIL
  -- the bytes are counted and thrown away rather than copied.
  bsp: BSPData = NARROW[self.streamData];
  limit: INT = block.startIndex + block.count;
  nBytesRead _ 0;
  WHILE block.startIndex < limit DO
    -- Make sure there is an input packet in hand.
    WHILE bsp.inputBuffer = NIL DO
      IF bsp.markBuffer # NIL THEN RETURN; -- implies EndOf[] = TRUE
      bsp.inputBuffer _ bsp.pktStream.Get[];
      IF bsp.inputBuffer = NIL THEN
        SIGNAL TimeOut[block.startIndex] -- if the handler resumes, we simply try again
      ELSE SELECT bsp.inputBuffer.pupType FROM
        mark, aMark =>
          -- set the mark packet aside; the enclosing loop will then return
          { bsp.markBuffer _ bsp.inputBuffer; bsp.inputBuffer _ NIL };
        ENDCASE =>
          { bsp.inputFinger _ 0;
            bsp.inputBufferSize _ PupDefs.GetPupContentsBytes[bsp.inputBuffer] };
      ENDLOOP;
    BEGIN -- transfer bytes from the input buffer
    moved: INT =
      IF block.base = NIL -- means "discard data"
        THEN MIN[limit-block.startIndex, bsp.inputBufferSize-bsp.inputFinger]
        ELSE PrincOpsUtils.ByteBlt[
          to: [blockPointer: block.base,
            startIndex: block.startIndex, stopIndexPlusOne: limit],
          from: [blockPointer: @bsp.inputBuffer.pupChars,
            startIndex: bsp.inputFinger, stopIndexPlusOne: bsp.inputBufferSize]
          ];
    bsp.inputFinger _ bsp.inputFinger + moved;
    block.startIndex _ block.startIndex + moved;
    nBytesRead _ nBytesRead + moved;
    END;
    -- A fully consumed packet goes back to the pool immediately.
    IF bsp.inputFinger = bsp.inputBufferSize THEN
      { PupDefs.ReturnFreePupBuffer[bsp.inputBuffer]; bsp.inputBuffer _ NIL };
    ENDLOOP;
  END;

NoMarkAvailable: PUBLIC ERROR = CODE; -- declared for export; not raised anywhere in this module

ConsumeMark: PUBLIC SAFE PROC[self: IO.STREAM] RETURNS[mark: [0..256)] = TRUSTED
  BEGIN
  -- Discards input up to the next mark, then returns that mark's byte and releases its packet.
  bsp: BSPData = NARROW[self.streamData];
  -- skip to a mark: a NIL base makes UnsafeGetBlock discard what it reads
  UNTIL bsp.markBuffer # NIL DO
    [] _ self.UnsafeGetBlock[ [base: NIL, startIndex: 0, count: LAST[INT]] ]
    ENDLOOP;
  mark _ bsp.markBuffer.pupBytes[0];
  PupDefs.ReturnFreePupBuffer[bsp.markBuffer];
  bsp.markBuffer _ NIL;
  END;

WaitForAttention: PUBLIC SAFE PROC[self: IO.STREAM] = TRUSTED
  BEGIN
  -- Delegates directly to the packet stream.
  bsp: BSPData = NARROW[self.streamData];
  bsp.pktStream.WaitForAttention[];
  END;


-- Output --

PutChar: SAFE PROC[self: IO.STREAM, char: CHAR] = TRUSTED
  BEGIN
  -- Appends one character to the output.
  bsp: BSPData = NARROW[self.streamData];
  IF bsp.outputBuffer = NIL OR bsp.outputFinger+1 >= bsp.outputBufferSize THEN
    BEGIN
    -- No packet in hand, or this character would fill it: go through the block path,
    -- which knows how to obtain a buffer and how to send a full one.
    oneChar: PACKED ARRAY [0..1] OF CHAR;
    oneChar[0] _ char;
    self.UnsafePutBlock[[base:@oneChar, startIndex:0, count:1]];
    END
  ELSE
    -- Fast path. The "+1" in the test above keeps us from storing the last byte here,
    -- so that sending the buffer is always left to UnsafePutBlock.
    { bsp.outputBuffer.pupChars[bsp.outputFinger] _ char;
      bsp.outputFinger _ bsp.outputFinger+1 }
  END;

UnsafePutBlock: SAFE PROC[self: IO.STREAM, block: IO.UnsafeBlock] = TRUSTED
  BEGIN
  -- Appends block.count bytes of block, starting at block.startIndex, to the output,
  -- transmitting each packet as it becomes full. If block.base is NIL nothing is copied,
  -- but the output position still advances by the requested amount.
  bsp: BSPData = NARROW[self.streamData];
  limit: INT = block.startIndex + block.count;
  -- The packet size limit is fetched from the packet stream the first time it is needed.
  IF bsp.outputBufferSize = 0 THEN
    bsp.outputBufferSize _ bsp.pktStream.GetSenderSizeLimit[];
  WHILE block.startIndex < limit DO
    IF bsp.outputBuffer = NIL THEN
      { bsp.outputBuffer _ PupDefs.GetFreePupBuffer[]; bsp.outputFinger _ 0 };
    BEGIN -- transfer bytes into the output buffer
    moved: INT =
      IF block.base = NIL -- means "ignore data"
        THEN MIN[limit-block.startIndex, bsp.outputBufferSize-bsp.outputFinger]
        ELSE PrincOpsUtils.ByteBlt[
          from: [blockPointer: block.base,
            startIndex: block.startIndex, stopIndexPlusOne: limit],
          to: [blockPointer: @bsp.outputBuffer.pupChars,
            startIndex: bsp.outputFinger, stopIndexPlusOne: bsp.outputBufferSize]
          ];
    bsp.outputFinger _ bsp.outputFinger + moved;
    block.startIndex _ block.startIndex + moved;
    END;
    IF bsp.outputFinger = bsp.outputBufferSize THEN Transmit[bsp];
    ENDLOOP;
  END;

Flush: SAFE PROC[self: IO.STREAM] = TRUSTED
  BEGIN
  -- Sends whatever output is buffered as an aData packet.
  bsp: BSPData = NARROW[self.streamData];
  -- Even if we have nothing to send, transmit a buffer to provoke an ack from the other end
  IF bsp.outputBuffer = NIL THEN
    { bsp.outputBuffer _ PupDefs.GetFreePupBuffer[]; bsp.outputFinger _ 0 };
  bsp.outputBuffer.pupType _ aData;
  Transmit[bsp];
  END;

Transmit: SAFE PROC[data: BSPData] = TRUSTED
  BEGIN
  -- Hands the current output packet (if any) to the packet stream, with its content length
  -- set to the number of bytes stored so far.
  pending: PupDefs.PupBuffer = data.outputBuffer;
  data.outputBuffer _ NIL; -- don't leave it dangling in case of StreamClosing
  IF pending = NIL THEN RETURN;
  PupDefs.SetPupContentsBytes[pending, data.outputFinger];
  data.pktStream.Put[pending];
  END;

SendMark: PUBLIC SAFE PROC[self: IO.STREAM, mark: [0..256)] = TRUSTED
  BEGIN
  -- Buffered output is transmitted first, so that it precedes the mark.
  bsp: BSPData = NARROW[self.streamData];
  IF bsp.outputBuffer # NIL THEN Transmit[bsp];
  bsp.pktStream.PutMark[mark];
  END;

SendAttention: PUBLIC SAFE PROC[self: IO.STREAM] = TRUSTED
  BEGIN
  -- Delegates directly to the packet stream.
  bsp: BSPData = NARROW[self.streamData];
  bsp.pktStream.SendAttention[];
  END;

END.