-- TCPMain: implementation of the TCP interface.
-- Wraps a TCPOps connection handle in an IO.STREAM so that clients can use the
-- ordinary IO operations (GetChar, GetBlock, PutChar, PutBlock, Flush, Close ...)
-- on a TCP connection.  The handle is stored in the stream's streamData field.

DIRECTORY
  IO USING [CreateStream, CreateStreamProcs, EndOfStream, Error, STREAM, StreamProcs, UnsafeBlock, UnsafeGetBlock, UnsafePutBlock],
  IPDefs USING [Address],
  PrincOpsUtils USING [ByteBlt],
  TCP,
  TCPOps USING [Open, Close, Abort, TCPHandle, TCPRcvBuffer, SetUrgent, WaitForUrgentData, GetNextDatagram, SendCurrentDatagram, WaitForListenerOpen];

TCPMain: CEDAR PROGRAM
  IMPORTS IO, PrincOpsUtils, TCPOps
  EXPORTS TCP
  = BEGIN OPEN TCP;

  -- Stream class for TCP connections: every IO operation on a TCP stream is
  -- dispatched to one of the procedures defined below.
  TCPStreamProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
    variety: inputOutput, class: $TCP,
    getChar: GetChar,
    endOf: EndOf,
    charsAvail: CharsAvail,
    getBlock: GetBlock,
    unsafeGetBlock: UnsafeGetBlock,
    putChar: PutChar,
    putBlock: PutBlock,
    unsafePutBlock: UnsafePutBlock,
    flush: Flush,
    close: Close,
    getIndex: GetIndex
    ];

  Error: PUBLIC ERROR [reason: Reason] = CODE;
  Timeout: PUBLIC --INFORMATIONAL-- SIGNAL = CODE;

  -- Exported procedures

  CreateTCPStream: PUBLIC PROC [tcpInfo: TCPInfo] RETURNS [s: STREAM] ~ {
    -- Opens a connection described by tcpInfo and returns a stream whose
    -- streamData is the resulting TCPOps handle.
    handle: TCPOps.TCPHandle _ TCPOps.Open[tcpInfo];
    s _ IO.CreateStream[streamProcs: TCPStreamProcs, streamData: handle];
    };

  AbortTCPStream: PUBLIC PROC [s: STREAM] ~ {
    -- Aborts the connection underlying s (no graceful close).
    handle: TCPOps.TCPHandle _ NARROW[s.streamData];
    TCPOps.Abort[handle];
    };

  WaitForListenerOpen: PUBLIC PROC [s: STREAM, timeout: INT _ neverTimeout] ~ {
    -- Delegates to TCPOps.WaitForListenerOpen for the connection underlying s.
    handle: TCPOps.TCPHandle _ NARROW[s.streamData];
    TCPOps.WaitForListenerOpen[handle, timeout];
    };

  GetRemoteAddress: PUBLIC PROC [s: STREAM] RETURNS [remote: IPDefs.Address] ~ {
    -- Returns the foreign address recorded in the connection handle.
    handle: TCPOps.TCPHandle _ NARROW[s.streamData];
    remote _ handle.foreignAddr;
    };

  SetUrgent: PUBLIC PROC [s: STREAM] ~ {
    -- Delegates to TCPOps.SetUrgent for the connection underlying s.
    handle: TCPOps.TCPHandle _ NARROW[s.streamData];
    handle.SetUrgent[];
    };

  WaitForUrgentData: PUBLIC PROC [s: STREAM] RETURNS [urgentIndex: INT] ~ {
    -- Returns the value of TCPOps.WaitForUrgentData rebased by handle.irs, using
    -- the same rebasing as GetIndex so the two results are comparable.
    -- NOTE(review): irs is presumably the initial receive sequence number; confirm in TCPOps.
    handle: TCPOps.TCPHandle _ NARROW[s.streamData];
    RETURN [handle.WaitForUrgentData[] - handle.irs - 1];
    };

  ErrorFromStream: PUBLIC PROC [s: STREAM] RETURNS [reason: Reason] ~ {
    -- Returns the reason most recently recorded in the connection handle.
    handle: TCPOps.TCPHandle _ NARROW[s.streamData];
    RETURN [handle.reason];
    };

  -- Stream class procedures (input side)

  GetChar: PROC [self: STREAM] RETURNS [c: CHAR] ~ {
    -- Fast path: take one byte straight from the current input buffer.
    -- Slow path: go through UnsafeGetBlock, which fetches the next datagram.
    -- Raises IO.EndOfStream if no byte could be obtained.
    handle: TCPOps.TCPHandle _ NARROW[self.streamData];
    rb: REF TCPOps.TCPRcvBuffer _ handle.currentInputBuffer;
    IF rb # NIL AND rb.dataByteCount > 0
      THEN {
        c _ LOOPHOLE[rb.datagramPtr.data[rb.dataOffset]];
        rb.dataOffset _ rb.dataOffset + 1;
        rb.dataByteCount _ rb.dataByteCount - 1;
        }
      ELSE TRUSTED {
        buff: PACKED ARRAY [0..1] OF CHAR;
        n: INT = IO.UnsafeGetBlock[self, [base: LOOPHOLE[LONG[@buff]], startIndex: 0, count: 1]];
        IF n = 0 THEN ERROR IO.EndOfStream[self];
        RETURN[buff[0]];
        };
    };

  EndOf: PROC [self: STREAM] RETURNS [BOOL] ~ {
    -- TRUE only when the connection is in one of the listed closing/closed states
    -- AND the current input buffer is empty AND nothing is queued for reading.
    handle: TCPOps.TCPHandle _ NARROW[self.streamData];
    RETURN [
      (SELECT handle.state FROM
        closed, closing, closeWait, timeWait, lastAck => TRUE,
        ENDCASE => FALSE)
      AND (handle.currentInputBuffer=NIL OR handle.currentInputBuffer.dataByteCount<=0)
      AND handle.readyToReadQueue=NIL];
    };

  CharsAvail: PROC [self: STREAM, wait: BOOL _ FALSE] RETURNS [INT] ~ {
    -- Returns the number of bytes left in the current input buffer, fetching the
    -- next datagram if the buffer is empty.  With wait = FALSE, returns 0 rather
    -- than calling GetNextDatagram when nothing is queued.
    handle: TCPOps.TCPHandle _ NARROW[self.streamData];
    WHILE handle.currentInputBuffer=NIL OR handle.currentInputBuffer.dataByteCount<=0 DO
      IF handle.readyToReadQueue=NIL AND ~wait THEN RETURN [0];
      -- Record the reason, as every other catch site in this module does, so that
      -- ErrorFromStream reports it.
      handle.GetNextDatagram[ ! Error => {handle.reason _ reason; GOTO error}];
      ENDLOOP;
    RETURN [handle.currentInputBuffer.dataByteCount];
    EXITS
      error => RETURN[1]; -- will cause client to read the stream and get the error
    };

  GetBlock: PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT] RETURNS [nBytesRead: NAT] ~ TRUSTED {
    -- Reads up to count bytes into block starting at startIndex, never writing
    -- past block.maxLength, and sets block.length to startIndex + nBytesRead so
    -- the client can see how much of the TEXT is valid.
    -- NOTE(review): setting length follows the IO.GetBlock contract as I recall it; confirm against IO.mesa.
    IF startIndex > block.maxLength THEN RETURN [0]; -- nothing can be stored; leave block untouched
    nBytesRead _ self.UnsafeGetBlock[[
      base: LOOPHOLE[BASE[DESCRIPTOR[block]]],
      startIndex: startIndex,
      count: MIN[count, block.maxLength-startIndex]
      ]];
    block.length _ startIndex + nBytesRead;
    };

  UnsafeGetBlock: UNSAFE PROC [self: STREAM, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT] ~ UNCHECKED {
    -- Copies bytes from successive input buffers into block until block.count
    -- bytes have been delivered or GetNextDatagram raises Error.
    -- A NIL block.base means the data is consumed but not stored.
    -- Raises IO.Error[$Failure] on any Error other than remoteClose.
    handle: TCPOps.TCPHandle _ NARROW[self.streamData];
    rb: REF TCPOps.TCPRcvBuffer _ handle.currentInputBuffer;
    nBytes: INT;
    nBytesRead _ 0;
    WHILE block.count>0 DO
      IF rb = NIL OR rb.dataByteCount <= 0 THEN
        handle.GetNextDatagram[ ! Error => {handle.reason _ reason; GOTO error}];
      rb _ handle.currentInputBuffer;
      nBytes _ IF block.base = NIL
        -- means "discard data"
        THEN MIN[block.count, rb.dataByteCount]
        ELSE PrincOpsUtils.ByteBlt[
          to: [blockPointer: block.base, startIndex: block.startIndex, stopIndexPlusOne: block.startIndex+block.count],
          from: [blockPointer: @rb.datagramPtr.data, startIndex: rb.dataOffset, stopIndexPlusOne: rb.dataOffset+rb.dataByteCount]
          ];
      -- Advance both the client's block and the receive buffer by what was moved.
      block.startIndex _ block.startIndex + nBytes;
      block.count _ block.count - nBytes;
      nBytesRead _ nBytesRead + nBytes;
      rb.dataOffset _ rb.dataOffset + nBytes;
      rb.dataByteCount _ rb.dataByteCount - nBytes;
      REPEAT
        error => IF handle.reason#remoteClose THEN ERROR IO.Error[$Failure, self];
        -- otherwise (remoteClose) fall out and return the short count read so far
      ENDLOOP;
    };

  -- Stream class procedures (output side)

  PutChar: PROC [self: STREAM, char: CHAR] ~ {
    -- Fast path: store the byte directly in the current output datagram if there
    -- is room.  Slow path: go through UnsafePutBlock, which sends the full
    -- datagram first.
    handle: TCPOps.TCPHandle _ NARROW[self.streamData];
    IF handle.currentOutputDatagram # NIL AND handle.currentOutputPtr < handle.currentOutputLimit
      THEN {
        handle.currentOutputDatagram.data[handle.currentOutputPtr] _ LOOPHOLE[char];
        handle.currentOutputPtr _ handle.currentOutputPtr + 1;
        }
      ELSE TRUSTED {
        buff: PACKED ARRAY [0..1] OF CHAR;
        buff[0] _ char;
        IO.UnsafePutBlock[self, [base: LOOPHOLE[LONG[@buff]], startIndex: 0, count: 1]];
        };
    };

  PutBlock: PROC [self: STREAM, block: REF READONLY TEXT, startIndex: NAT, count: NAT] ~ TRUSTED {
    -- Writes up to count bytes of block starting at startIndex, never reading
    -- past block.length.
    self.UnsafePutBlock[[
      base: LOOPHOLE[BASE[DESCRIPTOR[block]]],
      startIndex: startIndex,
      count: MIN[count, block.length-startIndex]
      ]];
    };

  UnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] ~ TRUSTED {
    -- Copies block into the current output datagram, calling SendCurrentDatagram
    -- each time the datagram is absent or full.  A NIL block.base advances the
    -- output pointer without storing anything.
    -- Raises IO.Error[$Failure] if SendCurrentDatagram raises Error.
    handle: TCPOps.TCPHandle _ NARROW[self.streamData];
    nBytes: INT;
    WHILE block.count>0 DO
      IF handle.currentOutputDatagram = NIL OR handle.currentOutputPtr >= handle.currentOutputLimit THEN
        handle.SendCurrentDatagram[FALSE ! Error => {handle.reason _ reason; GOTO error}];
      nBytes _ IF block.base = NIL
        -- means "ignore data"
        THEN MIN[block.count, handle.currentOutputLimit - handle.currentOutputPtr]
        ELSE PrincOpsUtils.ByteBlt[
          from: [blockPointer: block.base, startIndex: block.startIndex, stopIndexPlusOne: block.startIndex+block.count],
          to: [blockPointer: @handle.currentOutputDatagram.data, startIndex: handle.currentOutputPtr, stopIndexPlusOne: handle.currentOutputLimit]
          ];
      block.startIndex _ block.startIndex + nBytes;
      block.count _ block.count - nBytes;
      handle.currentOutputPtr _ handle.currentOutputPtr + nBytes;
      ENDLOOP;
    EXITS
      error => ERROR IO.Error[$Failure, self];
    };

  Flush: PROC [self: STREAM] ~ {
    -- Calls SendCurrentDatagram with TRUE (the buffered-write paths pass FALSE).
    -- Raises IO.Error[$Failure] if that raises Error.
    handle: TCPOps.TCPHandle _ NARROW[self.streamData];
    handle.SendCurrentDatagram[TRUE ! Error => {handle.reason _ reason; GOTO error}];
    EXITS
      error => ERROR IO.Error[$Failure, self];
    };

  Close: PROC [self: STREAM, abort: BOOL _ FALSE] ~ {
    -- Closes the connection.  With abort = TRUE the connection is aborted via
    -- TCPOps.Abort instead of being closed gracefully.  Calling Close on a stream
    -- that has already been closed is a no-op.
    -- Raises IO.Error[$Failure] if the underlying operation raises Error.
    handle: TCPOps.TCPHandle _ NARROW[self.streamData];
    IF handle = NIL THEN RETURN; -- already closed: streamData was cleared below
    IF abort
      THEN TCPOps.Abort[handle ! Error => {handle.reason _ reason; GOTO error}]
      ELSE handle.Close[ ! Error => {handle.reason _ reason; GOTO error}];
    self.streamData _ NIL; -- Help GC
    EXITS
      error => ERROR IO.Error[$Failure, self];
    };

  GetIndex: PROC [self: STREAM] RETURNS [INT] ~ {
    -- Returns handle.rcvNxt rebased by handle.irs.
    -- NOTE(review): this looks like the count of bytes received by the connection,
    -- which may be ahead of what the client has actually consumed from
    -- currentInputBuffer and readyToReadQueue; confirm against TCPOps.
    handle: TCPOps.TCPHandle _ NARROW[self.streamData];
    RETURN [handle.rcvNxt - handle.irs - 1];
    };

  END.