TCPMain.mesa
Last Edited by: Nichols, September 1, 1983 4:39 pm
Last Edited by: Taft, January 2, 1984 4:18 pm
Last Edited by: HGM, April 17, 1984 7:06:25 pm PST
Hal Murray May 16, 1985 3:13:52 am PDT
John Larson, April 14, 1986 11:22:18 pm PST
DIRECTORY
IO USING [CreateStream, CreateStreamProcs, EndOfStream, Error, STREAM, StreamProcs, UnsafeBlock, UnsafeGetBlock, UnsafePutBlock],
IPDefs USING [Address],
PrincOpsUtils USING [ByteBlt],
TCP,
TCPOps USING [Open, Close, Abort, TCPHandle, TCPRcvBuffer, SetUrgent, WaitForUrgentData, GetNextDatagram, SendCurrentDatagram, WaitForListenerOpen];
TCPMain:
CEDAR
PROGRAM
IMPORTS IO, PrincOpsUtils, TCPOps
EXPORTS TCP =
BEGIN OPEN TCP;
-- The procedure vector shared by every TCP stream created in this module.
-- Streams are bidirectional (variety inputOutput) and carry the class atom $TCP.
TCPStreamProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
  variety: inputOutput,
  class: $TCP,
  -- input side
  getChar: GetChar,
  getBlock: GetBlock,
  unsafeGetBlock: UnsafeGetBlock,
  endOf: EndOf,
  charsAvail: CharsAvail,
  getIndex: GetIndex,
  -- output side
  putChar: PutChar,
  putBlock: PutBlock,
  unsafePutBlock: UnsafePutBlock,
  flush: Flush,
  -- shared
  close: Close
  ];
-- Exported TCP error; it carries a Reason value. Within this module it is caught
-- around calls into TCPOps (GetNextDatagram, SendCurrentDatagram, Close); most of
-- those handlers record the reason in the connection handle before reporting failure.
Error: PUBLIC ERROR [reason: Reason] = CODE;
-- Exported signal, marked informational by its author. Nothing in this module
-- raises or catches it.
Timeout: PUBLIC --INFORMATIONAL-- SIGNAL = CODE;
CreateTCPStream: PUBLIC PROC [tcpInfo: TCPInfo] RETURNS [s: STREAM] ~ {
  -- Calls TCPOps.Open with tcpInfo and wraps the resulting handle in an IO stream
  -- whose class procedures are TCPStreamProcs. The handle is stored as the stream's
  -- streamData, which is where every other procedure in this module finds it.
  connection: TCPOps.TCPHandle = TCPOps.Open[tcpInfo];
  RETURN [IO.CreateStream[streamProcs: TCPStreamProcs, streamData: connection]];
  };
AbortTCPStream: PUBLIC PROC [s: STREAM] ~ {
  -- Hands the connection behind stream s to TCPOps.Abort.
  -- The stream's streamData is left untouched.
  connection: TCPOps.TCPHandle = NARROW[s.streamData];
  connection.Abort[];
  };
WaitForListenerOpen: PUBLIC PROC [s: STREAM, timeout: INT ← neverTimeout] ~ {
  -- Forwards to TCPOps.WaitForListenerOpen for the connection behind stream s,
  -- passing the caller's timeout through unchanged.
  connection: TCPOps.TCPHandle = NARROW[s.streamData];
  connection.WaitForListenerOpen[timeout];
  };
GetRemoteAddress: PUBLIC PROC [s: STREAM] RETURNS [remote: IPDefs.Address] ~ {
  -- Returns the foreignAddr field recorded in the connection handle of stream s.
  connection: TCPOps.TCPHandle = NARROW[s.streamData];
  RETURN [connection.foreignAddr];
  };
SetUrgent: PUBLIC PROC [s: STREAM] ~ {
  -- Forwards to TCPOps.SetUrgent for the connection behind stream s.
  connection: TCPOps.TCPHandle = NARROW[s.streamData];
  TCPOps.SetUrgent[connection];
  };
WaitForUrgentData: PUBLIC PROC [s: STREAM] RETURNS [urgentIndex: INT] ~ {
  -- Calls TCPOps.WaitForUrgentData and rebases its result by subtracting
  -- handle.irs + 1, the same offset GetIndex applies to rcvNxt, so the two
  -- values are directly comparable.
  connection: TCPOps.TCPHandle = NARROW[s.streamData];
  urgentIndex ← TCPOps.WaitForUrgentData[connection] - connection.irs - 1;
  };
ErrorFromStream: PUBLIC PROC [s: STREAM] RETURNS [reason: Reason] ~ {
  -- Returns the reason stored in the connection handle. The stream procedures
  -- in this module store it there just before they raise IO.Error[$Failure].
  connection: TCPOps.TCPHandle = NARROW[s.streamData];
  reason ← connection.reason;
  };
GetChar: PROC [self: STREAM] RETURNS [c: CHAR] ~ {
  -- Reads one character from the stream.
  -- Fast path: take the next byte straight out of the current receive buffer.
  -- Slow path: no buffered data, so do a one byte IO.UnsafeGetBlock, which goes
  -- through the general block reader; zero bytes back raises IO.EndOfStream.
  handle: TCPOps.TCPHandle = NARROW[self.streamData];
  current: REF TCPOps.TCPRcvBuffer = handle.currentInputBuffer;
  IF current = NIL OR current.dataByteCount <= 0
    THEN TRUSTED {
      oneChar: PACKED ARRAY [0..1] OF CHAR;
      got: INT = IO.UnsafeGetBlock[self, [base: LOOPHOLE[LONG[@oneChar]], startIndex: 0, count: 1]];
      IF got = 0 THEN ERROR IO.EndOfStream[self];
      c ← oneChar[0];
      }
    ELSE {
      c ← LOOPHOLE[current.datagramPtr.data[current.dataOffset]];
      -- consume the byte just read
      current.dataOffset ← current.dataOffset + 1;
      current.dataByteCount ← current.dataByteCount - 1;
      };
  };
EndOf: PROC [self: STREAM] RETURNS [BOOL] ~ {
  -- TRUE only when all three hold: the connection is in one of the states
  -- closed, closing, closeWait, timeWait or lastAck; the current receive buffer
  -- is absent or empty; and the ready to read queue is empty.
  handle: TCPOps.TCPHandle = NARROW[self.streamData];
  SELECT handle.state FROM
    closed, closing, closeWait, timeWait, lastAck => NULL;
    ENDCASE => RETURN [FALSE];
  IF handle.currentInputBuffer # NIL AND handle.currentInputBuffer.dataByteCount > 0 THEN
    RETURN [FALSE];
  RETURN [handle.readyToReadQueue = NIL];
  };
CharsAvail: PROC [self: STREAM, wait: BOOL ← FALSE] RETURNS [INT] ~ {
  -- Reports how many bytes remain in the current receive buffer, calling
  -- GetNextDatagram for another buffer whenever the current one is absent or empty.
  -- If nothing is queued and wait is FALSE the answer is 0 and no call is made.
  handle: TCPOps.TCPHandle = NARROW[self.streamData];
  DO
    current: REF TCPOps.TCPRcvBuffer = handle.currentInputBuffer;
    IF current # NIL AND current.dataByteCount > 0 THEN RETURN [current.dataByteCount];
    IF handle.readyToReadQueue = NIL AND ~wait THEN RETURN [0];
    handle.GetNextDatagram[ ! Error => GOTO error];
    ENDLOOP;
  EXITS
    -- Answering 1 makes the client read the stream, and the read reports the error.
    error => RETURN [1];
  };
GetBlock: PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT] RETURNS [nBytesRead: NAT] ~ TRUSTED {
  -- Safe block read: fills block starting at startIndex with at most count bytes,
  -- never going beyond block.maxLength, by delegating to the stream's UnsafeGetBlock.
  -- Returns the number of bytes stored; this can be less than requested (including
  -- zero) when the remote end has closed. Errors from UnsafeGetBlock propagate.
  -- A startIndex beyond block.maxLength is a caller error and still faults on the
  -- NAT subtraction, exactly as before.
  nBytesRead ← self.UnsafeGetBlock[[
    base: LOOPHOLE[BASE[DESCRIPTOR[block]]],
    startIndex: startIndex,
    count: MIN[count, block.maxLength-startIndex]]];
  -- Record how much of the text is now valid. Without this the usual copy loop
  -- (GetBlock followed by PutBlock, which clips to block.length) sees a stale length.
  block.length ← startIndex + nBytesRead;
  };
UnsafeGetBlock: UNSAFE PROC [self: STREAM, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT] ~ UNCHECKED {
  -- Copies up to block.count bytes from received datagrams into the caller's
  -- memory, fetching further datagrams as each one is used up. A NIL block.base
  -- means the data is to be discarded: bytes are consumed but not copied.
  -- Returns the number of bytes consumed. If GetNextDatagram raises Error, the
  -- reason is saved in handle.reason; anything except remoteClose is reported
  -- as IO.Error[$Failure].
  handle: TCPOps.TCPHandle = NARROW[self.streamData];
  current: REF TCPOps.TCPRcvBuffer ← handle.currentInputBuffer;
  moved: INT;
  nBytesRead ← 0;
  UNTIL block.count <= 0 DO
    IF current = NIL OR current.dataByteCount <= 0 THEN
      handle.GetNextDatagram[ ! Error => {handle.reason ← reason; GOTO error}];
    -- Reload unconditionally: GetNextDatagram may have installed a new buffer.
    current ← handle.currentInputBuffer;
    moved ← IF block.base = NIL
      THEN MIN[block.count, current.dataByteCount]
      ELSE PrincOpsUtils.ByteBlt[
        from: [blockPointer: @current.datagramPtr.data, startIndex: current.dataOffset, stopIndexPlusOne: current.dataOffset+current.dataByteCount],
        to: [blockPointer: block.base, startIndex: block.startIndex, stopIndexPlusOne: block.startIndex+block.count]];
    -- Advance the source buffer, then the destination block, by the same amount.
    current.dataOffset ← current.dataOffset + moved;
    current.dataByteCount ← current.dataByteCount - moved;
    block.startIndex ← block.startIndex + moved;
    block.count ← block.count - moved;
    nBytesRead ← nBytesRead + moved;
    REPEAT
      -- remoteClose is treated as end of stream: leave the loop quietly and return
      -- the bytes transferred so far, which may be zero.
      error => IF handle.reason # remoteClose THEN ERROR IO.Error[$Failure, self];
    ENDLOOP;
  };
PutChar: PROC [self: STREAM, char: CHAR] ~ {
  -- Writes one character to the stream.
  -- Fast path: store it directly in the current output datagram when there is room.
  -- Slow path: no datagram or no room, so do a one byte IO.UnsafePutBlock, which
  -- goes through the general block writer.
  handle: TCPOps.TCPHandle = NARROW[self.streamData];
  IF handle.currentOutputDatagram = NIL OR handle.currentOutputPtr >= handle.currentOutputLimit
    THEN TRUSTED {
      oneChar: PACKED ARRAY [0..1] OF CHAR;
      oneChar[0] ← char;
      IO.UnsafePutBlock[self, [base: LOOPHOLE[LONG[@oneChar]], startIndex: 0, count: 1]];
      }
    ELSE {
      handle.currentOutputDatagram.data[handle.currentOutputPtr] ← LOOPHOLE[char];
      handle.currentOutputPtr ← handle.currentOutputPtr + 1;
      };
  };
PutBlock: PROC [self: STREAM, block: REF READONLY TEXT, startIndex: NAT, count: NAT] ~ TRUSTED {
  -- Safe block write: sends at most count bytes of block beginning at startIndex,
  -- clipped to block.length, by delegating to the stream's UnsafePutBlock.
  nChars: NAT = MIN[count, block.length-startIndex];
  IO.UnsafePutBlock[self, [
    base: LOOPHOLE[BASE[DESCRIPTOR[block]]],
    startIndex: startIndex,
    count: nChars]];
  };
UnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] ~ TRUSTED {
  -- Copies block.count bytes from the caller's memory into output datagrams.
  -- Whenever there is no current datagram or it is full, SendCurrentDatagram[FALSE]
  -- is called before copying (presumably it also supplies a fresh datagram; confirm
  -- in TCPOps). A NIL block.base means the data is to be ignored: space is
  -- consumed but nothing is copied.
  -- If SendCurrentDatagram raises Error, the reason is saved in handle.reason and
  -- IO.Error[$Failure] is raised.
  handle: TCPOps.TCPHandle = NARROW[self.streamData];
  moved: INT;
  UNTIL block.count <= 0 DO
    IF handle.currentOutputDatagram = NIL OR handle.currentOutputPtr >= handle.currentOutputLimit THEN
      handle.SendCurrentDatagram[FALSE ! Error => {handle.reason ← reason; GOTO error}];
    moved ← IF block.base = NIL
      THEN MIN[block.count, handle.currentOutputLimit - handle.currentOutputPtr]
      ELSE PrincOpsUtils.ByteBlt[
        to: [blockPointer: @handle.currentOutputDatagram.data, startIndex: handle.currentOutputPtr, stopIndexPlusOne: handle.currentOutputLimit],
        from: [blockPointer: block.base, startIndex: block.startIndex, stopIndexPlusOne: block.startIndex+block.count]];
    -- Advance the datagram fill pointer, then the source block, by the same amount.
    handle.currentOutputPtr ← handle.currentOutputPtr + moved;
    block.count ← block.count - moved;
    block.startIndex ← block.startIndex + moved;
    ENDLOOP;
  EXITS
    error => ERROR IO.Error[$Failure, self];
  };
Flush: PROC [self: STREAM] ~ {
  -- Calls SendCurrentDatagram with TRUE for this connection (the block writer
  -- passes FALSE when it merely needs more room).
  -- If that raises Error, the reason is saved in the handle and
  -- IO.Error[$Failure] is raised.
  connection: TCPOps.TCPHandle = NARROW[self.streamData];
  TCPOps.SendCurrentDatagram[connection, TRUE ! Error => {connection.reason ← reason; GOTO error}];
  EXITS
    error => ERROR IO.Error[$Failure, self];
  };
Close: PROC [self: STREAM, abort: BOOL ← FALSE] ~ {
  -- Closes the connection behind the stream.
  -- abort = FALSE: orderly close through TCPOps.Close. If that raises Error, the
  --   reason is saved in the handle, streamData is kept so ErrorFromStream still
  --   works, and IO.Error[$Failure] is raised.
  -- abort = TRUE: the connection is dropped through TCPOps.Abort, the same call
  --   AbortTCPStream makes. Previously this flag was silently ignored.
  -- Closing a stream whose streamData is already NIL does nothing.
  handle: TCPOps.TCPHandle ← NARROW[self.streamData];
  IF handle = NIL THEN RETURN; -- already closed
  IF abort
    THEN handle.Abort[]
    ELSE handle.Close[ ! Error => {handle.reason ← reason; GOTO error}];
  self.streamData ← NIL; -- Help GC
  EXITS
    error => ERROR IO.Error[$Failure, self];
  };
GetIndex: PROC [self: STREAM] RETURNS [INT] ~ {
  -- Reports the input index, not the output index. It is computed from the
  -- handle's rcvNxt relative to irs + 1, the same offset WaitForUrgentData
  -- applies to its result.
  connection: TCPOps.TCPHandle = NARROW[self.streamData];
  RETURN [connection.rcvNxt - connection.irs - 1];
  };
END.