-- ArpaTCPMain.mesa
-- Copyright (C) 1983, 1985 by Xerox Corporation. All rights reserved. The following program was created in 1983 but has not been published within the meaning of the copyright law, is furnished under license, and may not be used, copied and/or disclosed except in accordance with the terms of said license.
-- Last Edited by: Nichols, September 1, 1983 4:39 pm
-- Last Edited by: Taft, January 2, 1984 4:18 pm
-- Last Edited by: HGM, April 17, 1984 7:06:25 pm PST
-- Demers, September 7, 1988 1:56:43 pm PDT
-- Weiser, February 18, 1988 11:41:55 am PST
-- Doug Terry, November 16, 1987 10:28:40 am PST
-- Hal Murray May 16, 1985 3:13:52 am PDT
-- John Larson, October 11, 1987 9:40:30 pm PDT
DIRECTORY
Arpa USING [Address],
ArpaExtras USING [MyAddress],
ArpaTCP,
ArpaTCPExtras,
ArpaTCPOps USING [Open, Close, Abort, GetDataTimeout, TCPHandle, TCPRcvBuffer, SetDataTimeout, SetUrgent, WaitForUrgentData, GetNextDatagram, SendCurrentDatagram, WaitForListenerOpen],
IO USING [CreateStream, CreateStreamProcs, EndOfStream, Error, STREAM, StreamProcs, UnsafeBlock, UnsafeGetBlock, UnsafePutBlock],
PrincOpsUtils USING [ByteBlt];
ArpaTCPMain:
CEDAR
PROGRAM
IMPORTS ArpaExtras, IO, PrincOpsUtils, ArpaTCPOps
EXPORTS ArpaTCP, ArpaTCPExtras =
BEGIN OPEN ArpaTCP, ArpaTCPExtras;
-- Procedure vector handed to IO.CreateStream for every TCP stream created by this module.
-- Each entry names the local procedure that implements the corresponding stream operation.
TCPStreamProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
  variety: inputOutput,
  class: $TCP,
  -- input side
  getChar: GetChar,
  getBlock: GetBlock,
  unsafeGetBlock: UnsafeGetBlock,
  endOf: EndOf,
  charsAvail: CharsAvail,
  getIndex: GetIndex,
  -- output side
  putChar: PutChar,
  putBlock: PutBlock,
  unsafePutBlock: UnsafePutBlock,
  flush: Flush,
  -- teardown
  close: Close
  ];
-- Error carries the Reason for a connection failure. Stream procedures in this module catch it around ArpaTCPOps calls and, in most cases, record the reason in the handle and surface it to the client as IO.Error[$Failure, ...].
Error: PUBLIC ERROR [reason: Reason] = CODE;
-- Timeout is exported for clients; it is not raised anywhere in this module (presumably raised by the TCP implementation when a data timeout expires; TODO confirm).
Timeout: PUBLIC --INFORMATIONAL-- SIGNAL = CODE;
CreateTCPStream: PUBLIC PROC [tcpInfo: TCPInfo] RETURNS [s: STREAM] ~ {
  -- Opens a connection as described by tcpInfo and returns an IO stream whose
  -- streamData is the resulting TCP handle and whose operations are TCPStreamProcs.
  connection: ArpaTCPOps.TCPHandle = ArpaTCPOps.Open[tcpInfo];
  RETURN [IO.CreateStream[streamProcs: TCPStreamProcs, streamData: connection]];
  };
AbortTCPStream: PUBLIC PROC [s: STREAM] ~ {
  -- Aborts the connection behind s. A stream whose streamData is NIL is left alone.
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  IF connection = NIL THEN RETURN;
  ArpaTCPOps.Abort[connection];
  };
GetTimeout: PUBLIC PROC [s: STREAM] RETURNS [timeout: INT] ~ {
  -- Returns the data timeout reported by ArpaTCPOps for the connection behind s.
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  RETURN [ArpaTCPOps.GetDataTimeout[connection]];
  };
SetTimeout: PUBLIC PROC [s: STREAM, timeout: INT] ~ {
  -- Installs timeout as the data timeout of the connection behind s
  -- (units are whatever ArpaTCPOps.SetDataTimeout expects; not visible here).
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  ArpaTCPOps.SetDataTimeout[connection, timeout];
  };
WaitForListenerOpen: PUBLIC PROC [s: STREAM, timeout: INT ← neverTimeout] ~ {
  -- Forwards to ArpaTCPOps.WaitForListenerOpen for the connection behind s.
  -- timeout defaults to neverTimeout.
  listener: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  ArpaTCPOps.WaitForListenerOpen[listener, timeout];
  };
GetRemoteSocket: PUBLIC PROC [s: STREAM] RETURNS [addr: Arpa.Address, port: Port] ~ {
  -- Returns the foreign address and foreign port stored in the handle behind s.
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  RETURN [addr: connection.foreignAddr, port: connection.foreignPort];
  };
GetLocalSocket: PUBLIC PROC [s: STREAM] RETURNS [addr: Arpa.Address, port: Port] ~ {
  -- Returns this machine's address (from ArpaExtras.MyAddress) together with the
  -- local port stored in the handle behind s.
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  RETURN [addr: ArpaExtras.MyAddress[], port: connection.localPort];
  };
SendNow: PUBLIC PROC [s: STREAM] ~ {
  -- Like IO.Flush[s], except that Flush waits for the packets to be acknowledged and SendNow doesn't.
  -- A TCP Error is recorded in the handle and reported to the caller as IO.Error[$Failure, s].
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  ArpaTCPOps.SendCurrentDatagram[connection, FALSE ! Error => {connection.reason ← reason; GOTO failed}];
  EXITS
    failed => ERROR IO.Error[$Failure, s];
  };
GetStats: PUBLIC PROC [s: STREAM] RETURNS [stats: Stats] ~ {
  -- Builds a Stats record from the handle's rtt, rexmits and duplicates fields, in that order.
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  stats ← [connection.rtt, connection.rexmits, connection.duplicates];
  };
SetUrgent: PUBLIC PROC [s: STREAM] ~ {
  -- Forwards to ArpaTCPOps.SetUrgent for the connection behind s.
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  ArpaTCPOps.SetUrgent[connection];
  };
WaitForUrgentData: PUBLIC PROC [s: STREAM] RETURNS [urgentIndex: INT] ~ {
  -- Calls ArpaTCPOps.WaitForUrgentData and converts its result to a stream index by
  -- subtracting irs and 1, the same conversion GetIndex applies to rcvNxt.
  -- Presumably irs is the initial receive sequence number and the 1 skips the SYN (TODO confirm).
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  urgentIndex ← ArpaTCPOps.WaitForUrgentData[connection] - connection.irs - 1;
  };
ErrorFromStream: PUBLIC PROC [s: STREAM] RETURNS [reason: Reason] ~ {
  -- Returns the reason currently stored in the handle behind s, i.e. the value the
  -- stream procedures save when they catch a TCP Error.
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  reason ← connection.reason;
  };
GetChar: PROC [self: STREAM] RETURNS [c: CHAR] ~ {
  -- Returns the next input character.
  -- Fast path: take one byte straight out of the current input buffer.
  -- Slow path (no buffer, or buffer empty): read a single byte through IO.UnsafeGetBlock,
  -- raising IO.EndOfStream if that read delivers nothing.
  connection: ArpaTCPOps.TCPHandle = NARROW[self.streamData];
  input: REF ArpaTCPOps.TCPRcvBuffer = connection.currentInputBuffer;
  IF input = NIL OR input.dataByteCount <= 0 THEN TRUSTED {
    oneChar: PACKED ARRAY [0..1] OF CHAR;
    got: INT = IO.UnsafeGetBlock[self, [base: LOOPHOLE[LONG[@oneChar]], startIndex: 0, count: 1]];
    IF got = 0 THEN ERROR IO.EndOfStream[self];
    RETURN [oneChar[0]];
    };
  c ← LOOPHOLE[input.datagramPtr.body.bytes[input.dataOffset]];
  input.dataOffset ← input.dataOffset + 1;
  input.dataByteCount ← input.dataByteCount - 1;
  };
EndOf: PROC [self: STREAM] RETURNS [BOOL] ~ {
  -- TRUE only when all three hold: the connection state is one of closed, closing,
  -- closeWait, timeWait or lastAck; the current input buffer is absent or empty;
  -- and the ready-to-read queue is empty.
  connection: ArpaTCPOps.TCPHandle = NARROW[self.streamData];
  shutDown: BOOL = (SELECT connection.state FROM
    closed, closing, closeWait, timeWait, lastAck => TRUE,
    ENDCASE => FALSE);
  IF NOT shutDown THEN RETURN [FALSE];
  IF connection.currentInputBuffer # NIL AND connection.currentInputBuffer.dataByteCount > 0 THEN RETURN [FALSE];
  RETURN [connection.readyToReadQueue = NIL];
  };
CharsAvail: PROC [self: STREAM, wait: BOOL ← FALSE] RETURNS [INT] ~ {
  -- Returns the number of bytes available in the current input buffer.
  -- While the current buffer is absent or empty, the next datagram is fetched.
  -- If nothing is queued and wait is FALSE, 0 is returned instead of fetching.
  -- If fetching raises a TCP Error, the reason is saved in the handle (as every other
  -- stream procedure here does, so ErrorFromStream is accurate immediately) and 1 is
  -- returned so that the client's next read reports the failure.
  handle: ArpaTCPOps.TCPHandle ← NARROW[self.streamData];
  WHILE handle.currentInputBuffer = NIL OR handle.currentInputBuffer.dataByteCount <= 0 DO
    IF handle.readyToReadQueue = NIL AND ~wait THEN RETURN [0];
    handle.GetNextDatagram[ ! Error => {handle.reason ← reason; GOTO error}];
    ENDLOOP;
  RETURN [handle.currentInputBuffer.dataByteCount];
  EXITS
    error => RETURN [1]; -- will cause client to read the stream and get the error
  };
GetBlock: PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT] RETURNS [nBytesRead: NAT] ~ TRUSTED {
  -- Reads up to count bytes into block starting at startIndex, never asking for more
  -- than block.maxLength-startIndex, by describing the text as an IO.UnsafeBlock and
  -- calling IO.UnsafeGetBlock. Afterwards block.length is startIndex+nBytesRead.
  destination: IO.UnsafeBlock = [
    base: LOOPHOLE[BASE[DESCRIPTOR[block]]],
    startIndex: startIndex,
    count: MIN[count, block.maxLength-startIndex] ];
  nBytesRead ← IO.UnsafeGetBlock[self, destination];
  block.length ← startIndex+nBytesRead;
  };
UnsafeGetBlock: UNSAFE PROC [self: STREAM, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT] ~ UNCHECKED {
  -- Copies received bytes into block until block.count bytes have been delivered or the
  -- stream ends, fetching further datagrams as each input buffer is used up.
  -- A NIL block.base means the data is consumed but not stored.
  -- Returns the number of bytes consumed, which may be fewer than requested (even zero)
  -- if the connection closed. Any other TCP failure raises IO.Error[$Failure, self].
  connection: ArpaTCPOps.TCPHandle = NARROW[self.streamData];
  input: REF ArpaTCPOps.TCPRcvBuffer ← connection.currentInputBuffer;
  moved: INT;
  nBytesRead ← 0;
  WHILE block.count > 0 DO
    -- Current buffer absent or exhausted: advance to the next received datagram.
    IF input = NIL OR input.dataByteCount <= 0 THEN
      ArpaTCPOps.GetNextDatagram[connection ! Error => {connection.reason ← reason; GOTO error}];
    input ← connection.currentInputBuffer;
    IF block.base = NIL
      -- NIL base means "discard data": just account for the bytes skipped.
      THEN moved ← MIN[block.count, input.dataByteCount]
      ELSE moved ← PrincOpsUtils.ByteBlt[
        to: [blockPointer: block.base, startIndex: block.startIndex, stopIndexPlusOne: block.startIndex+block.count],
        from: [blockPointer: @input.datagramPtr.body, startIndex: input.dataOffset, stopIndexPlusOne: input.dataOffset+input.dataByteCount] ];
    -- Advance the source buffer, the destination block and the running total by the same amount.
    input.dataOffset ← input.dataOffset + moved;
    input.dataByteCount ← input.dataByteCount - moved;
    block.startIndex ← block.startIndex + moved;
    block.count ← block.count - moved;
    nBytesRead ← nBytesRead + moved;
    REPEAT
      error =>
        -- Treat localClose, remoteClose and remoteAbort as EndOfStream; that is, just leave the loop
        -- and return the number of bytes successfully transferred (which might be zero).
        SELECT connection.reason FROM
          localClose, remoteClose, remoteAbort => NULL;
          ENDCASE => ERROR IO.Error[$Failure, self];
    ENDLOOP;
  };
PutChar: PROC [self: STREAM, char: CHAR] ~ {
  -- Appends one character to the output.
  -- Fast path: store it directly into the current output datagram when there is room.
  -- Slow path (no datagram, or datagram full): write a single byte through IO.UnsafePutBlock.
  connection: ArpaTCPOps.TCPHandle = NARROW[self.streamData];
  IF connection.currentOutputDatagram = NIL OR connection.currentOutputPtr >= connection.currentOutputLimit THEN TRUSTED {
    oneChar: PACKED ARRAY [0..1] OF CHAR;
    oneChar[0] ← char;
    IO.UnsafePutBlock[self, [base: LOOPHOLE[LONG[@oneChar]], startIndex: 0, count: 1]];
    RETURN;
    };
  connection.currentOutputDatagram.body.bytes[connection.currentOutputPtr] ← LOOPHOLE[char];
  connection.currentOutputPtr ← connection.currentOutputPtr + 1;
  };
PutBlock: PROC [self: STREAM, block: REF READONLY TEXT, startIndex: NAT, count: NAT] ~ TRUSTED {
  -- Writes up to count bytes of block starting at startIndex, never more than
  -- block.length-startIndex, by describing the text as an IO.UnsafeBlock and
  -- calling IO.UnsafePutBlock.
  source: IO.UnsafeBlock = [
    base: LOOPHOLE[BASE[DESCRIPTOR[block]]],
    startIndex: startIndex,
    count: MIN[count, block.length-startIndex] ];
  IO.UnsafePutBlock[self, source];
  };
UnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] ~ TRUSTED {
  -- Copies block.count bytes from block into outgoing datagrams, filling the current
  -- output datagram and handing it to ArpaTCPOps.SendCurrentDatagram whenever it is
  -- absent or full. A NIL block.base means output space is consumed without copying.
  -- A TCP Error is recorded in the handle and reported as IO.Error[$Failure, self].
  connection: ArpaTCPOps.TCPHandle = NARROW[self.streamData];
  taken: INT;
  WHILE block.count > 0 DO
    -- No datagram to fill, or it is full: send it (without waiting) before copying.
    IF connection.currentOutputDatagram = NIL OR connection.currentOutputPtr >= connection.currentOutputLimit THEN
      ArpaTCPOps.SendCurrentDatagram[connection, FALSE ! Error => {connection.reason ← reason; GOTO error}];
    IF block.base = NIL
      -- NIL base means "ignore data": just account for the space used.
      THEN taken ← MIN[block.count, connection.currentOutputLimit - connection.currentOutputPtr]
      ELSE taken ← PrincOpsUtils.ByteBlt[
        from: [blockPointer: block.base, startIndex: block.startIndex, stopIndexPlusOne: block.startIndex+block.count],
        to: [blockPointer: @connection.currentOutputDatagram.body, startIndex: connection.currentOutputPtr, stopIndexPlusOne: connection.currentOutputLimit] ];
    connection.currentOutputPtr ← connection.currentOutputPtr + taken;
    block.startIndex ← block.startIndex + taken;
    block.count ← block.count - taken;
    ENDLOOP;
  EXITS
    error => ERROR IO.Error[$Failure, self];
  };
Flush: PROC [self: STREAM] ~ {
  -- Sends the current output datagram, passing TRUE where SendNow passes FALSE; per the
  -- note in SendNow this is what makes Flush wait for the packets to be acknowledged.
  -- A TCP Error is recorded in the handle and reported as IO.Error[$Failure, self].
  connection: ArpaTCPOps.TCPHandle = NARROW[self.streamData];
  ArpaTCPOps.SendCurrentDatagram[connection, TRUE ! Error => {connection.reason ← reason; GOTO failed}];
  EXITS
    failed => ERROR IO.Error[$Failure, self];
  };
Close: PROC [self: STREAM, abort: BOOL ← FALSE] ~ {
  -- Closes the connection behind self.
  -- abort = FALSE (the default): orderly close through ArpaTCPOps.Close.
  -- abort = TRUE: the connection is aborted through ArpaTCPOps.Abort, the same operation
  -- AbortTCPStream uses. Previously the abort argument was accepted but silently ignored.
  -- In either case a TCP Error is recorded in the handle and reported as IO.Error[$Failure, self].
  handle: ArpaTCPOps.TCPHandle ← NARROW[self.streamData];
  IF abort
    THEN ArpaTCPOps.Abort[handle ! Error => {handle.reason ← reason; GOTO error}]
    ELSE ArpaTCPOps.Close[handle ! Error => {handle.reason ← reason; GOTO error}];
  EXITS
    error => ERROR IO.Error[$Failure, self];
  };
GetIndex: PROC [self: STREAM] RETURNS [INT] ~ {
  -- This gives the input index, not the output index.
  -- Computed as rcvNxt - irs - 1 from the handle. Presumably irs is the initial receive
  -- sequence number and the 1 skips the SYN (TODO confirm).
  connection: ArpaTCPOps.TCPHandle = NARROW[self.streamData];
  RETURN [connection.rcvNxt - connection.irs - 1];
  };
END.