-- ArpaTCPMain.mesa
-- Copyright (C) 1983, 1985 by Xerox Corporation. All rights reserved. The following program was created in 1983 but has not been published within the meaning of the copyright law, is furnished under license, and may not be used, copied and/or disclosed except in accordance with the terms of said license.
-- Last Edited by: Nichols, September 1, 1983 4:39 pm
-- Last Edited by: Taft, January 2, 1984 4:18 pm
-- Last Edited by: HGM, April 17, 1984 7:06:25 pm PST
-- Demers, September 7, 1988 1:56:43 pm PDT
-- Weiser, February 18, 1988 11:41:55 am PST
-- Doug Terry, November 16, 1987 10:28:40 am PST
-- Hal Murray May 16, 1985 3:13:52 am PDT
-- John Larson, October 11, 1987 9:40:30 pm PDT
DIRECTORY
Arpa USING [Address],
ArpaExtras USING [MyAddress],
ArpaTCP,
ArpaTCPExtras,
ArpaTCPOps USING [Open, Close, Abort, GetDataTimeout, TCPHandle, TCPRcvBuffer, SetDataTimeout, SetUrgent, WaitForUrgentData, GetNextDatagram, SendCurrentDatagram, WaitForListenerOpen],
IO USING [CreateStream, CreateStreamProcs, EndOfStream, Error, STREAM, StreamProcs, UnsafeBlock, UnsafeGetBlock, UnsafePutBlock],
PrincOpsUtils USING [ByteBlt];
ArpaTCPMain: CEDAR PROGRAM
IMPORTS ArpaExtras, IO, PrincOpsUtils, ArpaTCPOps
EXPORTS ArpaTCP, ArpaTCPExtras =
BEGIN OPEN ArpaTCP, ArpaTCPExtras;
-- Procedure vector handed to IO.CreateStream for every TCP stream made by CreateTCPStream.
-- Each entry names the procedure of this module that implements the corresponding IO stream operation.
TCPStreamProcs: REF IO.StreamProcs = IO.CreateStreamProcs[
  variety: inputOutput,
  class: $TCP,
  -- input side
  getChar: GetChar,
  getBlock: GetBlock,
  unsafeGetBlock: UnsafeGetBlock,
  endOf: EndOf,
  charsAvail: CharsAvail,
  getIndex: GetIndex,
  -- output side
  putChar: PutChar,
  putBlock: PutBlock,
  unsafePutBlock: UnsafePutBlock,
  flush: Flush,
  -- teardown
  close: Close
  ];
-- Exported TCP error; reason identifies the cause.
-- The stream procedures below catch it around ArpaTCPOps calls, save reason in the handle, and raise IO.Error[$Failure, ...] in its place.
Error: PUBLIC ERROR [reason: Reason] = CODE;
-- Exported signal; nothing in the visible part of this module raises it.
Timeout: PUBLIC --INFORMATIONAL-- SIGNAL = CODE;
CreateTCPStream: PUBLIC PROC [tcpInfo: TCPInfo] RETURNS [s: STREAM] ~ {
  -- Opens a connection described by tcpInfo and wraps it in an IO stream.
  -- The connection handle becomes the stream's streamData, which every other procedure here NARROWs back.
  connection: ArpaTCPOps.TCPHandle = ArpaTCPOps.Open[tcpInfo];
  RETURN [IO.CreateStream[streamProcs: TCPStreamProcs, streamData: connection]];
  };
AbortTCPStream: PUBLIC PROC [s: STREAM] ~ {
  -- Aborts the connection behind s. Does nothing when the stream carries no handle.
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  IF connection = NIL THEN RETURN;
  ArpaTCPOps.Abort[connection];
  };
GetTimeout: PUBLIC PROC [s: STREAM] RETURNS [timeout: INT] ~ {
  -- Returns the data timeout currently set on the connection behind s.
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  RETURN [ArpaTCPOps.GetDataTimeout[connection]];
  };
SetTimeout: PUBLIC PROC [s: STREAM, timeout: INT] ~ {
  -- Sets the data timeout of the connection behind s to timeout.
  -- NOTE(review): units of timeout are defined by ArpaTCPOps.SetDataTimeout; confirm there.
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  ArpaTCPOps.SetDataTimeout[connection, timeout];
  };
WaitForListenerOpen: PUBLIC PROC [s: STREAM, timeout: INT ← neverTimeout] ~ {
  -- Stream-level wrapper: forwards to ArpaTCPOps.WaitForListenerOpen on the handle stored in s.
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  ArpaTCPOps.WaitForListenerOpen[connection, timeout];
  };
GetRemoteSocket: PUBLIC PROC [s: STREAM] RETURNS [addr: Arpa.Address, port: Port] ~ {
  -- Reports the foreign address and foreign port recorded in the connection handle of s.
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  RETURN [addr: connection.foreignAddr, port: connection.foreignPort];
  };
GetLocalSocket: PUBLIC PROC [s: STREAM] RETURNS [addr: Arpa.Address, port: Port] ~ {
  -- Reports the local end of the connection behind s.
  -- The address is obtained from ArpaExtras.MyAddress rather than from the handle; only the port is read from the handle.
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  RETURN [addr: ArpaExtras.MyAddress[], port: connection.localPort];
  };
SendNow: PUBLIC PROC [s: STREAM] ~ {
  -- Like IO.Flush[s], except that Flush waits for the packets to be acknowledged and SendNow doesn't.
  -- If the send raises Error, the reason is saved in the handle (readable via ErrorFromStream) and IO.Error[$Failure, s] is raised instead.
  handle: ArpaTCPOps.TCPHandle ← NARROW[s.streamData];
  handle.SendCurrentDatagram[FALSE ! Error => {handle.reason ← reason; GOTO error}];
  EXITS
    error => ERROR IO.Error[$Failure, s];
  };
GetStats: PUBLIC PROC [s: STREAM] RETURNS [stats: Stats] ~ {
  -- Snapshots three counters from the connection handle, in the positional order the Stats record expects: rtt, rexmits, duplicates.
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  stats ← [connection.rtt, connection.rexmits, connection.duplicates];
  };
SetUrgent: PUBLIC PROC [s: STREAM] ~ {
  -- Stream-level wrapper: forwards to ArpaTCPOps.SetUrgent on the handle stored in s.
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  ArpaTCPOps.SetUrgent[connection];
  };
WaitForUrgentData: PUBLIC PROC [s: STREAM] RETURNS [urgentIndex: INT] ~ {
  -- Calls ArpaTCPOps.WaitForUrgentData and rebases its result against the handle's irs field, using the same "minus irs minus 1" conversion as GetIndex.
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  urgentIndex ← ArpaTCPOps.WaitForUrgentData[connection] - connection.irs - 1;
  };
ErrorFromStream: PUBLIC PROC [s: STREAM] RETURNS [reason: Reason] ~ {
  -- Returns the reason most recently stored in the handle.
  -- The stream procedures in this module store it just before raising IO.Error[$Failure, ...].
  connection: ArpaTCPOps.TCPHandle = NARROW[s.streamData];
  reason ← connection.reason;
  };
GetChar: PROC [self: STREAM] RETURNS [c: CHAR] ~ {
  -- Returns the next received byte.
  -- Fast path: take it straight from the current input buffer.
  -- Slow path: when that buffer is missing or empty, go through IO.UnsafeGetBlock for a single byte; zero bytes back means end of stream.
  connection: ArpaTCPOps.TCPHandle = NARROW[self.streamData];
  current: REF ArpaTCPOps.TCPRcvBuffer = connection.currentInputBuffer;
  IF current = NIL OR current.dataByteCount <= 0 THEN TRUSTED {
    oneChar: PACKED ARRAY [0..1] OF CHAR;
    got: INT = IO.UnsafeGetBlock[self, [base: LOOPHOLE[LONG[@oneChar]], startIndex: 0, count: 1]];
    IF got = 0 THEN ERROR IO.EndOfStream[self];
    RETURN [oneChar[0]];
    }
  ELSE {
    c ← LOOPHOLE[current.datagramPtr.body.bytes[current.dataOffset]];
    -- consume the byte just read
    current.dataOffset ← current.dataOffset + 1;
    current.dataByteCount ← current.dataByteCount - 1;
    };
  };
EndOf: PROC [self: STREAM] RETURNS [BOOL] ~ {
  -- TRUE only when all three hold: the connection is in one of the listed closing or closed states, the current input buffer holds no unread bytes, and no datagrams are queued for reading.
  connection: ArpaTCPOps.TCPHandle = NARROW[self.streamData];
  SELECT connection.state FROM
    closed, closing, closeWait, timeWait, lastAck => NULL;
    ENDCASE => RETURN [FALSE];
  IF connection.currentInputBuffer # NIL AND connection.currentInputBuffer.dataByteCount > 0 THEN RETURN [FALSE];
  RETURN [connection.readyToReadQueue = NIL];
  };
CharsAvail: PROC [self: STREAM, wait: BOOL ← FALSE] RETURNS [INT] ~ {
  -- Returns the number of unread bytes in the current input buffer, fetching the next datagram first if that buffer is missing or empty.
  -- With wait = FALSE and nothing queued for reading, returns 0 at once.
  -- If fetching raises Error, returns 1 so that the client's next read hits the error.
  handle: ArpaTCPOps.TCPHandle ← NARROW[self.streamData];
  WHILE handle.currentInputBuffer=NIL OR handle.currentInputBuffer.dataByteCount<=0 DO
    IF handle.readyToReadQueue=NIL AND ~wait THEN RETURN [0];
    handle.GetNextDatagram[ ! Error => GOTO error];
    ENDLOOP;
  RETURN [handle.currentInputBuffer.dataByteCount];
  EXITS
    error => RETURN[1]; -- will cause client to read the stream and get the error
  };
GetBlock: PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT] RETURNS [nBytesRead: NAT] ~ TRUSTED {
  -- Safe front end for the unsafe block read.
  -- Reads at most count bytes into block starting at startIndex, never past block.maxLength, then sets block.length to the end of the data read.
  wanted: INT = MIN[count, block.maxLength-startIndex];
  nBytesRead ← IO.UnsafeGetBlock[self, [
    base: LOOPHOLE[BASE[DESCRIPTOR[block]]],
    startIndex: startIndex,
    count: wanted ] ];
  block.length ← startIndex+nBytesRead;
  };
UnsafeGetBlock: UNSAFE PROC [self: STREAM, block: IO.UnsafeBlock] RETURNS [nBytesRead: INT] ~ UNCHECKED {
  -- Transfers up to block.count received bytes into block, moving on to further datagrams as each buffer drains.
  -- A NIL block.base means discard: bytes are consumed from the input but not copied anywhere.
  -- Returns the number of bytes transferred. The loop stops early only when fetching a datagram raises Error.
  handle: ArpaTCPOps.TCPHandle ← NARROW[self.streamData];
  rb: REF ArpaTCPOps.TCPRcvBuffer ← handle.currentInputBuffer;
  nBytes: INT;
  nBytesRead ← 0;
  WHILE block.count>0 DO
    -- Current buffer missing or drained: fetch the next datagram, remembering the reason if that fails.
    IF rb = NIL OR rb.dataByteCount <= 0 THEN
      handle.GetNextDatagram[ ! Error => {handle.reason ← reason; GOTO error}];
    rb ← handle.currentInputBuffer;
    nBytes ← IF block.base = NIL -- means "discard data"
      THEN MIN[block.count, rb.dataByteCount]
      ELSE PrincOpsUtils.ByteBlt[
        to: [blockPointer: block.base, startIndex: block.startIndex, stopIndexPlusOne: block.startIndex+block.count],
        from: [blockPointer: @rb.datagramPtr.body, startIndex: rb.dataOffset, stopIndexPlusOne: rb.dataOffset+rb.dataByteCount] ];
    -- Advance the destination window and the source buffer by the amount moved.
    block.startIndex ← block.startIndex + nBytes;
    block.count ← block.count - nBytes;
    nBytesRead ← nBytesRead + nBytes;
    rb.dataOffset ← rb.dataOffset + nBytes;
    rb.dataByteCount ← rb.dataByteCount - nBytes;
    REPEAT
      -- Treat localClose, remoteClose or remoteAbort as EndOfStream; that is, just exit the loop and return the number of bytes successfully transferred (which might be zero). Any other reason is reported as IO.Error.
      error => IF handle.reason # localClose AND handle.reason # remoteClose AND handle.reason # remoteAbort THEN ERROR IO.Error[$Failure, self];
    ENDLOOP;
  };
PutChar: PROC [self: STREAM, char: CHAR] ~ {
  -- Appends one byte to the outgoing data.
  -- Fast path: store it directly in the current output datagram when one exists and has room.
  -- Slow path: otherwise hand a one-byte block to IO.UnsafePutBlock.
  connection: ArpaTCPOps.TCPHandle = NARROW[self.streamData];
  IF connection.currentOutputDatagram = NIL OR connection.currentOutputPtr >= connection.currentOutputLimit THEN TRUSTED {
    oneChar: PACKED ARRAY [0..1] OF CHAR;
    oneChar[0] ← char;
    IO.UnsafePutBlock[self, [base: LOOPHOLE[LONG[@oneChar]], startIndex: 0, count: 1]];
    }
  ELSE {
    connection.currentOutputDatagram.body.bytes[connection.currentOutputPtr] ← LOOPHOLE[char];
    connection.currentOutputPtr ← connection.currentOutputPtr + 1;
    };
  };
PutBlock: PROC [self: STREAM, block: REF READONLY TEXT, startIndex: NAT, count: NAT] ~ TRUSTED {
  -- Safe front end for the unsafe block write.
  -- Sends at most count bytes of block starting at startIndex, never past block.length.
  wanted: INT = MIN[count, block.length-startIndex];
  IO.UnsafePutBlock[self, [
    base: LOOPHOLE[BASE[DESCRIPTOR[block]]],
    startIndex: startIndex,
    count: wanted ] ];
  };
UnsafePutBlock: PROC [self: STREAM, block: IO.UnsafeBlock] ~ TRUSTED {
  -- Copies block.count bytes from block into outgoing datagrams, sending the current datagram whenever it is missing or full.
  -- A NIL block.base means ignore data: output space is consumed but nothing is copied into it.
  -- If a send raises Error, the reason is saved in the handle and IO.Error[$Failure, self] is raised.
  connection: ArpaTCPOps.TCPHandle = NARROW[self.streamData];
  moved: INT;
  UNTIL block.count <= 0 DO
    IF connection.currentOutputDatagram = NIL OR connection.currentOutputPtr >= connection.currentOutputLimit THEN
      ArpaTCPOps.SendCurrentDatagram[connection, FALSE ! Error => {connection.reason ← reason; GOTO failed}];
    IF block.base = NIL
      THEN moved ← MIN[block.count, connection.currentOutputLimit - connection.currentOutputPtr]
      ELSE moved ← PrincOpsUtils.ByteBlt[
        to: [blockPointer: @connection.currentOutputDatagram.body, startIndex: connection.currentOutputPtr, stopIndexPlusOne: connection.currentOutputLimit],
        from: [blockPointer: block.base, startIndex: block.startIndex, stopIndexPlusOne: block.startIndex+block.count] ];
    -- Advance the source window and the output position by the amount moved.
    block.startIndex ← block.startIndex + moved;
    block.count ← block.count - moved;
    connection.currentOutputPtr ← connection.currentOutputPtr + moved;
    ENDLOOP;
  EXITS
    failed => ERROR IO.Error[$Failure, self];
  };
Flush: PROC [self: STREAM] ~ {
  -- Sends the current output datagram, passing TRUE where SendNow passes FALSE.
  -- NOTE(review): per the SendNow comment, TRUE presumably means wait for acknowledgement; confirm in ArpaTCPOps.
  -- If the send raises Error, the reason is saved in the handle and IO.Error[$Failure, self] is raised.
  connection: ArpaTCPOps.TCPHandle = NARROW[self.streamData];
  ArpaTCPOps.SendCurrentDatagram[connection, TRUE ! Error => {connection.reason ← reason; GOTO failed}];
  EXITS
    failed => ERROR IO.Error[$Failure, self];
  };
Close: PROC [self: STREAM, abort: BOOL ← FALSE] ~ {
  -- Closes the connection behind self.
  -- NOTE(review): abort is accepted to match the IO close signature but is not consulted here; the connection is always closed with handle.Close. Callers wanting an abort use AbortTCPStream.
  -- If the close raises Error, the reason is saved in the handle and IO.Error[$Failure, self] is raised.
  handle: ArpaTCPOps.TCPHandle ← NARROW[self.streamData];
  handle.Close[ ! Error => {handle.reason ← reason; GOTO error}];
  EXITS
    error => ERROR IO.Error[$Failure, self];
  };
GetIndex: PROC [self: STREAM] RETURNS [INT] ~ {
  -- This gives the input index, not the output index.
  -- Computed from the handle as rcvNxt minus irs minus 1.
  handle: ArpaTCPOps.TCPHandle ← NARROW[self.streamData];
  RETURN [handle.rcvNxt - handle.irs - 1];
  };
END.