-- File: TcpImpl.mesa - last edit:
-- AOF 3-Mar-88 14:05:23
-- JAV 19-Nov-87 13:59:40
-- sma 9-Feb-87 19:35:36
-- Copyright (C) 1985, 1986, 1987, 1988 by Xerox Corporation. All rights reserved.
DIRECTORY
ArpaBuffer USING [
AccessHandle, Body, Buffer, DataBytesPerRawBuffer, GetBuffer, ReturnBuffer,
To, From],
ArpaFlags USING [doStats],
ArpaPortInternal USING [AddrMismatch, BuildMasks, GetSubnetMask],
ArpaPort USING [GetPacket, Handle, minIPHeaderBytes, PutPacket],
ArpaRouter USING [InternetAddress, GetAddress, unknownInternetAddress, Port],
ArpaStats USING [Incr],
ArpaTypes USING [Cardinal32, InternetAddress, Port],
ByteBlt USING [ByteBlt],
CommHeap USING [zone],
CommPriorities USING [receiver],
CommUtil USING [PulsesToTicks],
Driver USING [Glitch],
Environment USING [Block, Byte, bytesPerWord],
Mopcodes USING [zEXCH],
Process USING [
Abort, CancelAbort, DisableTimeout, GetCurrent, Pause,
SetPriority, SetTimeout],
Space USING [Interval, Unmap],
System USING [GetClockPulses, MicrosecondsToPulses],
TcpOps USING [PSProc, PSProcess, StartTcpStreamProc, StopTcpStreamProc],
TcpPort USING [DeleteTcpPort, SpecifyRemote],
TcpStream USING [Closed, CompletionCode, Failed, FailureReason, ListenTimeout,
Object, Precedence, Security, Suspended, SuspendReason, WaitTime],
TcpStreamInternal USING [Connection, DataPair, DataList,
GetDataLength, Greater, GreaterOrEqual, Less, LessOrEqual, LowHalf, Max,
MaxSegment, MaxSegmentObj, maxTcpDataBytes, Min, minTcpHeaderBytes, PairObject,
PrecedenceMatch, Rcvr, SecurityMatch, State, Xmtr, Rexmtr, InitStream,
SetHeaderFields];
TcpImpl: MONITOR
IMPORTS
ArpaBuffer, ArpaPortInternal, ArpaPort, ArpaRouter, ArpaStats,
ByteBlt, CommHeap, CommUtil, Driver, Process, Space, System, TcpPort,
TcpStream, TcpStreamInternal
EXPORTS TcpOps, ArpaRouter =
BEGIN OPEN TcpStreamInternal;
--Compile-time switch for debugging the receive list. Within this part of the
--file it is mentioned only in commented-out calls to ListSanityCheck.
sanityChecking: BOOLEAN = TRUE;
--Bytes per machine word, taken from Environment.
bpw: NATURAL = Environment.bytesPerWord;
--Procedure variables exported to TcpOps. They are initialized to StartTcpStream
--and StopTcpStream, which are defined outside this part of the file.
start: PUBLIC TcpOps.StartTcpStreamProc ← StartTcpStream;
stop: PUBLIC TcpOps.StopTcpStreamProc ← StopTcpStream;
--for calculations on 32 bit sequence numbers
--Both conversions compile to the single opcode zEXCH. Presumably that exchanges
--the two words of the value, converting between the packet representation
--(Cardinal32) and a LONG CARDINAL. TODO confirm against Mopcodes.
SeqToCard: PROC [ArpaTypes.Cardinal32] RETURNS [LONG CARDINAL] =
MACHINE CODE {Mopcodes.zEXCH};
CardToSeq: PROC [LONG CARDINAL] RETURNS [ArpaTypes.Cardinal32] =
MACHINE CODE {Mopcodes.zEXCH};
<<ListSanityCheck: INTERNAL PROC =
BEGIN
--Compares the length of the list to the number of nodes in hopes of
--glitching just after the code that made it bad.
SELECT rcvr.inuse.length FROM
0 =>
IF rcvr.inuse.head = NIL THEN RETURN ELSE Driver.Glitch[ListGarbled];
ENDCASE =>
BEGIN
p: TcpStreamInternal.DataPair ← NIL;
n: CARDINAL ← 0;
FOR p ← rcvr.inuse.head, p.next UNTIL p = NIL DO
n ← n + 1;
ENDLOOP;
IF n # rcvr.inuse.length THEN Driver.Glitch[ListGarbled];
END;
END; --ListSanityCheck >>
--Concrete definitions of the types this module exports to ArpaRouter.
Port: PUBLIC --ArpaRouter-- TYPE = ArpaTypes.Port;
InternetAddress: PUBLIC --ArpaRouter-- TYPE = ArpaTypes.InternetAddress;
--Presumably the largest millisecond count that can be multiplied by 1000
--without overflowing a LONG CARDINAL. TODO confirm at the point of use.
maxMsecToPulses: LONG CARDINAL = LAST[LONG CARDINAL] / 1000;
--NOTE(review): the comment "max sender can have outstanding." stood here with
--no declaration attached to it; it looks like a leftover. Verify.
advertiseSize: BOOLEAN = TRUE; --send max size option on conn request.
--Per stream state records. Their layouts are declared in TcpStreamInternal.
xmtr: PUBLIC TcpStreamInternal.Xmtr;
rcvr: PUBLIC TcpStreamInternal.Rcvr;
rexmtr: PUBLIC TcpStreamInternal.Rexmtr;
connection: PUBLIC TcpStreamInternal.Connection;
--for managing blocks of client output.
output: PUBLIC RECORD [
b: ArpaBuffer.Buffer, --current output buffer in use (or NIL).
finger: CARDINAL, --bytes consumed from b.
bufferSize: CARDINAL]; --max bytes permitted in output.
tcpStreamObject: TcpStream.Object;
--Glitches for debugging, some may be eventually subject to a "doDebug" switch.
--Each is passed to Driver.Glitch when an internal consistency check fails.
IllegalState: ERROR = CODE;
TrashInRexmtr: ERROR = CODE;
DataAfterFin: ERROR = CODE;
ListGarbled: ERROR = CODE;
PushNotCleared: ERROR = CODE;
--Used by procs that notify|wait without state checking required
Notify: ENTRY PROC [c: LONG POINTER TO CONDITION] =
  BEGIN
  --Takes the monitor lock and wakes a process waiting on the condition that
  --c points at. No state is examined.
  NOTIFY c↑;
  END; --Notify
Wait: ENTRY PROC [c: LONG POINTER TO CONDITION] =
  BEGIN
  --Takes the monitor lock and waits on the condition that c points at.
  --The UNWIND catch lets the lock be released if the wait is unwound.
  ENABLE UNWIND => NULL;
  WAIT c↑;
  END; --Wait
--PROCEDURES
Close: PUBLIC PROC =
  BEGIN
  <<
  Starts the local side of the close handshake: flushes pending output with
  the fin bit set and moves the connection to the matching closing state.
  Raises TcpStream.Suspended if the stream is suspended and TcpStream.Closed
  if a close is already under way. Any other state is a glitch.
  >>
  ENABLE UNWIND => NULL;
  SELECT connection.state FROM
    suspended =>
      RETURN WITH ERROR TcpStream.Suspended[connection.whySuspended];
    finWait1, finWait2, closing, lastAck, timeWait =>
      ERROR TcpStream.Closed;
    closeWait =>
      BEGIN --remote has already closed, our fin finishes the job.
      ForceOut[setPush: FALSE, setFin: TRUE, setUrg: FALSE];
      connection.state ← lastAck;
      END;
    established =>
      BEGIN --we are the first to close.
      ForceOut[setPush: FALSE, setFin: TRUE, setUrg: FALSE];
      connection.state ← finWait1;
      END;
    ENDCASE => --closed, listen, synSent, synReceived.
      Driver.Glitch[IllegalState]; --How did we get so far?
  END; --Close
ConnReply: PROC [b: ArpaBuffer.Buffer] =
<<
PROCESS: RECEIVER
In synSent state, if the incoming packet is a SYN with an ACK to the
SYN that we sent, advance to established state.
This procedure never returns b to the buffer pool; it only reads it and may
pass it to SendRst.
Raises TcpStream.Failed[remoteReject] when an acceptable ack carries rst.
>>
BEGIN
body: ArpaBuffer.Body = b.arpa;
--First filter: is the acknowledgement number acceptable at all?
SELECT TRUE FROM
(~body.tcp.ack) => RETURN; --reply syn must always carry ack
--Must be a stray - he can't ack something that we haven't even sent.
LessOrEqual[SeqToCard[body.tcp.acknowledgement], xmtr.iss],
Greater[SeqToCard[body.tcp.acknowledgement], xmtr.nextSeq] =>
{IF ~body.tcp.rst THEN SendRst[b]; RETURN};
--duplicate
Less[SeqToCard[body.tcp.acknowledgement], xmtr.unackedSeq] => RETURN;
ENDCASE;
--carries an acceptable ack - check for other error conditions.
--Only the first matching arm runs. The options arm does not return, so a
--packet with options falls through to the code after this SELECT.
SELECT TRUE FROM
body.tcp.rst => ERROR TcpStream.Failed[remoteReject];
~SecurityMatch[body] =>
{IF ArpaFlags.doStats THEN ArpaStats.Incr[rejectedSecurityMismatch];
SendRst[b]; RETURN};
~PrecedenceMatch[body] =>
{IF ArpaFlags.doStats THEN ArpaStats.Incr[rejectedPrecedencedMismatch];
SendRst[b]; RETURN};
(body.tcp.dataOffset > 5) => ProcessOptions[body];
ENDCASE;
--only good guys get here.
InitRcvr[body.tcp.sequence]; --receive sequence space starts at the remote's syn.
ProcessTcpState[body]; --consumes the ack; this is what moves us to established.
[] ← CheckForData[body]; --may have data for client.
SendAck[]; --acknowledge the remote's syn (SendAck sends a plain ack, no syn bit).
END; --ConnReply
ConnRequest: PUBLIC PROC [b: ArpaBuffer.Buffer] =
BEGIN
<<
PROCESS: RECEIVER
In listen state, if the incoming packet is a syn, advance to synReceived
state and send a syn/ack. Also does precedence and security arbitration and
processes options.
A packet with rst is counted and dropped, a packet with ack is answered with
a reset, and anything that is not a syn is counted as junk.
This procedure does not return b to the buffer pool.
>>
body: ArpaBuffer.Body = b.arpa;
SELECT TRUE FROM
(body.tcp.rst) =>
IF ArpaFlags.doStats THEN ArpaStats.Incr[droppedBadRst];
(body.tcp.ack) =>
{IF ArpaFlags.doStats THEN ArpaStats.Incr[rejectedBadAck]; SendRst[b]};
(body.tcp.syn) =>
BEGIN
SELECT TRUE FROM
~SecurityMatch[body] =>
BEGIN
IF ArpaFlags.doStats THEN ArpaStats.Incr[rejectedSecurityMismatch];
SendRst[b];
END;
~PrecedenceMatch[body] =>
BEGIN
IF ArpaFlags.doStats THEN ArpaStats.Incr[rejectedPrecedencedMismatch];
SendRst[b];
END;
ENDCASE => --OK, we like this one.
BEGIN
--Fetch the subnet mask once and reuse it. The original fetched it into this
--local and then ignored the local, calling GetSubnetMask twice more.
mySubnetMask: ArpaRouter.InternetAddress ← ArpaPortInternal.GetSubnetMask[];
myInternetAddress: ArpaRouter.InternetAddress ← ArpaRouter.GetAddress[];
connection.remoteAddr ← body.ipHeader.source;
connection.remotePort ← body.tcp.sourcePort;
--The remote is off net when its address does not match ours under the
--configured subnet mask, or under the class derived net mask when no subnet
--mask is known.
connection.offNet ← ArpaPortInternal.AddrMismatch[
IF mySubnetMask # ArpaRouter.unknownInternetAddress THEN mySubnetMask
ELSE ArpaPortInternal.BuildMasks[myInternetAddress].netMask,
myInternetAddress,
body.ipHeader.source];
TcpPort.SpecifyRemote[connection.port, connection.remoteAddr,
connection.remotePort];
IF body.tcp.dataOffset > 5 THEN ProcessOptions[body];
InitXmtr[];
InitRcvr[body.tcp.sequence];
connection.state ← synReceived;
SendSyn[ack: TRUE]; --send a syn in reply
IF CheckForData[body] THEN SendAck[]; --may have data for client.
END;
END; --tcp.syn
ENDCASE => IF ArpaFlags.doStats THEN ArpaStats.Incr[droppedJunkRequest];
END; --ConnRequest
ConsumeAcked: PUBLIC ENTRY PROC =
<<
PROCESS: RETRANSMITTER
Cleans house on the retransmission list.
No possibility of signals.
Walks the list from the head, returning every buffer whose whole sequence
range lies below xmtr.unackedSeq, and stops at the first one that does not.
Within this file it is called from ProcessTcpState and from RequeueProc.
>>
BEGIN
UNTIL rexmtr.list.head = NIL DO
b: ArpaBuffer.Buffer ← rexmtr.list.head;
body: ArpaBuffer.Body = b.arpa;
dataLen: CARDINAL ← GetDataLength[body].l;
IF (dataLen = 0) AND ~body.tcp.fin AND ~body.tcp.syn THEN
Driver.Glitch[TrashInRexmtr]; --how did an empty buffer get in here?
--fins and syn consume a sequence number.
IF body.tcp.fin OR body.tcp.syn THEN dataLen ← SUCC[dataLen];
--dataLen is at least 1 here, so PRED[dataLen] gives the offset of the last
--sequence number this buffer occupies.
IF (LessOrEqual[xmtr.unackedSeq,
SeqToCard[body.tcp.sequence] + PRED[dataLen]]) THEN EXIT; --not acked.
<<
This buffer has been acked, take it out of the list and collect data for
retransmission interval calculation. Delay is time since buffer was first
sent.
>>
rexmtr.delay ← rexmtr.delay + (System.GetClockPulses[] - b.fo.time);
<<
Why is the "(CARDINAL[b.fo.tries - 1] * rexmtr.interval)" factor in
here? I'm probably missing something. AOF 2-Jun-87 17:31:39
rexmtr.delay ← rexmtr.delay + (CARDINAL[b.fo.tries - 1] * rexmtr.interval)
+ (System.GetClockPulses[] - b.fo.time);
>>
rexmtr.count ← SUCC[rexmtr.count]; --and the number of participants
rexmtr.list.head ← ArpaBuffer.From[b.fo.next];
rexmtr.list.length ← PRED[rexmtr.list.length];
ArpaBuffer.ReturnBuffer[b];
ENDLOOP;
--Something is still outstanding: poke the retransmitter. Otherwise wake any
--sender waiting on xmtr.newAllocation.
IF rexmtr.list.length > 0 THEN NOTIFY rexmtr.condition --on to next
ELSE NOTIFY xmtr.newAllocation;
END; --ConsumeAcked
FindAddresses: PROC RETURNS [
  localAddr, remoteAddr: InternetAddress,
  localPort, remotePort: ArpaRouter.Port] =
  BEGIN
  --Reports both endpoints of the connection as currently recorded in the
  --connection record.
  localAddr ← connection.localAddr;
  remoteAddr ← connection.remoteAddr;
  localPort ← connection.localPort;
  remotePort ← connection.remotePort;
  END; --FindAddresses
FlushDataList: PROC[list: LONG POINTER TO TcpStreamInternal.DataList] =
BEGIN
<<
Frees every sequence number pair on the list back to CommHeap.zone and
leaves the list empty: head NIL and length zero.
The length must be cleared as well as the head, because New decides whether
to take a node from rcvr.avail by testing the length alone; a stale non-zero
length on an emptied list would make it follow a NIL head.
>>
UNTIL list.head = NIL DO
this: TcpStreamInternal.DataPair ← list.head; --pick up list head
list.head ← this.next; --record next entry in list head
CommHeap.zone.FREE[@this]; --free this entry
ENDLOOP;
list.length ← 0; --keep the count in step with the now empty list
END; --FlushDataList
FlushRexmtrList: PROC =
  BEGIN
  --Empties the retransmission list, handing each buffer back to the buffer
  --pool and keeping the list length in step.
  WHILE rexmtr.list.head # NIL DO
    first: ArpaBuffer.Buffer = rexmtr.list.head;
    rexmtr.list.head ← ArpaBuffer.From[first.fo.next]; --unlink before freeing
    ArpaBuffer.ReturnBuffer[first];
    rexmtr.list.length ← PRED[rexmtr.list.length];
    ENDLOOP;
  END; --FlushRexmtrList
ForceOut: PROC [setPush, setFin, setUrg: BOOLEAN] =
BEGIN
<<
PROCESS: CLIENT SENDING
Assigns the sequence number and then either sends the packet or waits for
allocation from remote end.
setPush, setFin and setUrg are copied into the corresponding header bits.
Raises TcpStream.Suspended or TcpStream.Closed (from the nested entry
procedure) when the connection state no longer permits sending.
NOTE(review): when the nested procedure raises, nothing in this procedure
returns b to the buffer pool. Confirm that it is reclaimed elsewhere.
>>
b: ArpaBuffer.Buffer;
dataLen: CARDINAL ← 0;
--Takes the pending output buffer and its byte count out of the shared output
--record under the monitor lock, leaving the record empty.
CopyOutBuffer: ENTRY PROC = INLINE
BEGIN
b ← output.b;
output.b ← NIL;
dataLen ← output.finger;
output.finger ← 0;
END; --CopyOutBuffer
--Stamps b with the next sequence number, waits until that number is inside
--the remote's window, then advances xmtr.nextSeq past the data and any fin.
AssignSeqAndAssertSend: ENTRY PROC =
BEGIN
ENABLE UNWIND => NULL;
--assign the next sequence number to this buffer.
sequenceNumber: LONG CARDINAL ← xmtr.nextSeq;
b.arpa.tcp.sequence ← CardToSeq[xmtr.nextSeq];
--The fin bit is set before the wait loop because SendAllocProbe looks at the
--fin bit of xmtr.blocked.b while we are blocked.
b.arpa.tcp.fin ← setFin;
DO --EXITs (success) or ERRORs (failure)
SELECT connection.state FROM
established, closeWait =>
BEGIN
--Only the first sequence number of the segment is tested against the window.
IF LessOrEqual[sequenceNumber, xmtr.maxSeq] THEN EXIT;
<<
Copy b so that the allocation probing code in the rexmtr can
get a hold of the oldest byte of data.
>>
xmtr.blocked ← [b, dataLen];
WAIT xmtr.newAllocation; --wait for something to happen
xmtr.blocked ← [NIL, 0]; --clear state info
END;
suspended =>
RETURN WITH ERROR TcpStream.Suspended[connection.whySuspended];
finWait1, finWait2, closing, lastAck, timeWait =>
RETURN WITH ERROR TcpStream.Closed; --no send after closing.
--listen, synSent, synReceived
ENDCASE => Driver.Glitch[IllegalState]; --How did we get so far?
ENDLOOP;
--This is the only proc that updates xmtr.nextSeq.
xmtr.nextSeq ← xmtr.nextSeq + dataLen;
--and fins consume a sequence number.
IF (b.arpa.tcp.fin ← setFin) THEN xmtr.nextSeq ← SUCC[xmtr.nextSeq];
END; --AssignSeqAndAssertSend
--start of procedure ForceOut.
CopyOutBuffer[]; --get the state copied out safely
--Nothing buffered (for example a bare fin): get an empty buffer to carry the flags.
IF b = NIL THEN b ← GetOutputBuffer[];
--If the wait is unwound, xmtr.blocked may still name b, so clear it here.
AssignSeqAndAssertSend[! UNWIND => xmtr.blocked ← [NIL, 0]];
SetHeaderFields[b, dataLen, 0];
b.arpa.tcp.psh ← setPush; --copy client's request
IF setUrg THEN --dataLen should always be > 0, so PRED[dataLen] is safe.
{b.arpa.tcp.urg ← TRUE; b.arpa.tcp.urgentPointer ← LOOPHOLE[PRED[dataLen]]};
SendPacket[b]; --on its way out
END; --ForceOut
GetBlock: PUBLIC PROC[block: Environment.Block]
RETURNS [byteCount: CARDINAL, completionCode: TcpStream.CompletionCode] =
--PROCESS: CLIENT RECEIVING
<<
Copies received, in-sequence data from the circular input space into block
until block is full or a terminating condition is seen.
byteCount is the number of bytes placed in block. completionCode is normal
unless a push, an urgent boundary, a close or a timeout was seen.
Raises TcpStream.Suspended if the stream is suspended.
The work is done in repeated calls of the entry procedure Locked so that
SendAck can be called between calls, outside the monitor.
NOTE(review): a completionCode of endUrgent does not stop the outer loop
(only pushed, closing and timeout set exit), so a later pass may copy data
beyond the urgent boundary or overwrite the code. Confirm that is intended.
>>
BEGIN
ENABLE UNWIND => NULL;
moved: CARDINAL; --amount transferred.
remoteClosed: BOOLEAN ← FALSE; --implies a push of data.
start, end: CARDINAL; --starting and ending bytes of data, space relative.
count: CARDINAL; --minimum of requested and available bytes.
ack, out: BOOLEAN;
--One pass under the monitor. exit tells the caller to stop looping; needsAck
--is whatever UpdateInputSpace reported for the bytes just consumed.
Locked: ENTRY PROC RETURNS[exit, needsAck: BOOLEAN ← FALSE] =
BEGIN ENABLE UNWIND => NULL;
--Only the first arm whose test is TRUE is executed.
SELECT TRUE FROM
--did we find a push previous time thru loop?
(completionCode = pushed) => {exit ← TRUE; RETURN}; --yes
--is there data pending?
(rcvr.inuse.head # NIL) AND
(rcvr.inuse.head.start = SUCC[rcvr.lastConsumed]) => --yes
BEGIN
rcvrStart: LONG CARDINAL = rcvr.inuse.head.start;
rcvrStartMinus1: LONG CARDINAL = PRED[rcvrStart];
--Sequence numbers map into the input space modulo its size.
start ← LowHalf[rcvrStart MOD rcvr.inputSpace.size];
count ← LowHalf[Min[ --min of data available & data wanted.
rcvr.inuse.head.end - rcvrStartMinus1,
block.stopIndexPlusOne - block.startIndex]];
--is the pending data within client request AND pushed?
SELECT TRUE FROM
~rcvr.pushSig => NULL; --no push
GreaterOrEqual[rcvr.push, rcvrStart] =>
IF LessOrEqual[rcvr.push, rcvrStartMinus1 + count] THEN
BEGIN
<<count ← LowHalf[Min[count, rcvr.push - rcvrStartMinus1]];>>
completionCode ← pushed;
rcvr.pushSig ← FALSE;
END;
--A push mark behind the data we are about to deliver should have been cleared.
ENDCASE => Driver.Glitch[PushNotCleared];
--is there an urgent within client requested data?
SELECT TRUE FROM
~rcvr.urgSig => NULL; --no urgent
GreaterOrEqual[rcvr.urg, rcvrStart] =>
IF LessOrEqual[rcvr.urg, rcvrStartMinus1 + count] THEN
BEGIN
--Deliver only up to and including the urgent byte.
count ← LowHalf[Min[count, rcvr.urg - rcvrStartMinus1]];
completionCode ← endUrgent; --even if it is also pushed.
rcvr.urgSig ← FALSE;
END;
ENDCASE;
end ← start + count - 1;
IF end >= rcvr.inputSpace.size THEN --retrieving wrapped data?
BEGIN
--First piece: from start to the end of the input space.
moved ← ByteBlt.ByteBlt[block,
[rcvr.inputSpace.space.pointer, start, rcvr.inputSpace.size]];
--Second piece: the remainder, from the beginning of the input space.
moved ← moved + ByteBlt.ByteBlt[
[block.blockPointer, block.startIndex + moved,
block.stopIndexPlusOne],
[rcvr.inputSpace.space.pointer, 0, count - moved]]
END
ELSE moved ← ByteBlt.ByteBlt[block,
[rcvr.inputSpace.space.pointer, start, SUCC[end]]];
--block is a by-value parameter; advancing startIndex tracks what is left to fill.
block.startIndex ← block.startIndex + moved;
byteCount ← byteCount + moved;
needsAck ← UpdateInputSpace[moved];
--Recompute the highest sequence number we can accept from the space now free.
rcvr.maxSeq ← rcvr.nextSeq +
rcvr.inputSpace.size - (rcvr.nextSeq - rcvr.lastConsumed);
END;
--connection is closing, implying a push at the end of the data.
(connection.state = closed),
(connection.state = closeWait),
(connection.state = closing),
(connection.state = lastAck),
(connection.state = timeWait) => {completionCode ← closing; exit ← TRUE};
rcvr.waitTime = 0 => {completionCode ← timeout; exit ← TRUE};
--rcvr.timeout holds the clock reading taken when GetBlock started.
(System.GetClockPulses[] - rcvr.timeout) > rcvr.interval =>
{completionCode ← timeout; exit ← TRUE};
--Wait for the rest of data.
(connection.state = established),
(connection.state = finWait1),
(connection.state = finWait2) =>
IF (rcvr.inuse.head = NIL) OR
(rcvr.inuse.head.start # SUCC[rcvr.lastConsumed]) THEN
WAIT rcvr.newInput;
(connection.state = suspended) =>
RETURN WITH ERROR TcpStream.Suspended[connection.whySuspended];
ENDCASE => --listen, synSent, synReceived.
Driver.Glitch[IllegalState]; --How on earth did we get this far?
END; --Locked
--start of GetBlock.
byteCount ← 0;
completionCode ← normal;
rcvr.timeout ← System.GetClockPulses[];
WHILE block.startIndex < block.stopIndexPlusOne DO
[out, ack] ← Locked[]; --can't call SendAck while holding monitor.
IF ack THEN SendAck[]; --but we can right afterwards
IF out THEN EXIT; --and maybe we can get out of here too
ENDLOOP;
END; --GetBlock
GetOutputBuffer: PUBLIC PROC RETURNS [b: ArpaBuffer.Buffer] =
<<
Gets a full size send buffer for client data and sets its header defaults:
all flags off except ack, urgent pointer zero, and RequeueProc installed as
the requeue procedure.
Raises TcpStream.Suspended if the stream is, or becomes, suspended.
While the client may be blocked, its process handle is kept in
xmtr.clientProcess; presumably whoever suspends the stream uses it to abort
the client. Verify against SuspendStream.
>>
BEGIN
body: ArpaBuffer.Body;
--Forgets the captured client process. If the stream was suspended in the
--meantime, an abort may have been posted against this process, so cancel it.
CleanupClientProcess: ENTRY PROC =
BEGIN
SELECT TRUE FROM
(xmtr.clientProcess = NIL) => NULL; --multiple clients in BufferMgr??!!
(connection.state = suspended) => --we're suspended, we did the abort.
{Process.CancelAbort[xmtr.clientProcess]; xmtr.clientProcess ← NIL};
ENDCASE => xmtr.clientProcess ← NIL;
END; --CleanupClientProcess
IF (connection.state = suspended) THEN GOTO returnSuspended;
xmtr.clientProcess ← Process.GetCurrent[]; --capture client process
BEGIN
<<
Don't let greedy client use all the send buffers, else we won't be
able to probe for allocation if we need to... Doing a Pause is slow,
but at this point, the remote is so slow anyway, anything we do is in
the noise.
>>
ENABLE
BEGIN
UNWIND => xmtr.clientProcess ← NIL;
--An ABORTED while suspended is treated as the suspension; otherwise the
--handler falls through and ABORTED keeps propagating.
ABORTED => IF connection.state = suspended THEN GOTO suspended;
END;
--Always leave one send buffer free; pause one retransmission interval per try.
WHILE connection.pool.sendInUse = (PRED[connection.pool.send]) DO
Process.Pause[CommUtil.PulsesToTicks[[rexmtr.interval]]]; ENDLOOP;
b ← ArpaBuffer.GetBuffer[
connection.pool, send, TRUE, connection.family.maxBufferSize];
EXITS suspended => {xmtr.clientProcess ← NIL; GOTO returnSuspended};
END; --protected code.
CleanupClientProcess[];
--The state may have changed while we were getting the buffer; give it back.
IF connection.state = suspended THEN
{ArpaBuffer.ReturnBuffer[b]; GOTO returnSuspended};
--now that we (finally) have a buffer, set defaults for fields.
body ← b.arpa;
body.tcp.urg ← body.tcp.psh ← body.tcp.rst ← body.tcp.syn ←
body.tcp.fin ← FALSE;
body.tcp.ack ← TRUE;
body.tcp.urgentPointer ← LOOPHOLE[0];
b.requeueProcedure ← RequeueProc; --put buffer in retransmit list.
EXITS
returnSuspended =>
RETURN WITH ERROR TcpStream.Suspended[connection.whySuspended];
END; --GetOutputBuffer
GetSystemBuffer: PROC[setAck: BOOLEAN] RETURNS [b: ArpaBuffer.Buffer] =
  BEGIN
  <<
  Gets a header-only send buffer for packets that will not be retransmitted.
  Every flag is cleared except ack, which is set from setAck; the urgent
  pointer is zeroed and the sequence field is set to xmtr.nextSeq.
  >>
  headerBytes: NATURAL = ArpaPort.minIPHeaderBytes + minTcpHeaderBytes;
  header: ArpaBuffer.Body;
  b ← ArpaBuffer.GetBuffer[connection.pool, send, TRUE, headerBytes];
  header ← b.arpa;
  header.tcp.fin ← FALSE;
  header.tcp.syn ← FALSE;
  header.tcp.rst ← FALSE;
  header.tcp.psh ← FALSE;
  header.tcp.urg ← FALSE;
  header.tcp.ack ← setAck;
  header.tcp.urgentPointer ← LOOPHOLE[0];
  header.tcp.sequence ← CardToSeq[xmtr.nextSeq];
  END; --GetSystemBuffer
InitRcvr: ENTRY PROC [sequence: ArpaTypes.Cardinal32] =
  BEGIN
  <<
  Starts the receive sequence space from the remote's initial sequence
  number: all the markers sit on that number, the next expected byte is the
  one after it, and the window is one input space wide.
  >>
  initial: LONG CARDINAL = SeqToCard[sequence];
  rcvr.irs ← initial;
  --just to get things started.
  rcvr.fin ← initial;
  rcvr.urg ← initial;
  rcvr.push ← initial;
  rcvr.lastConsumed ← initial;
  rcvr.nextSeq ← SUCC[initial];
  rcvr.maxSeq ← rcvr.nextSeq + PRED[rcvr.inputSpace.size];
  END; --InitRcvr
InitXmtr: PROC =
  BEGIN
  <<
  Picks the initial send sequence number from the clock, sets up the send
  sequence variables under the monitor, then forks the retransmitter process.
  >>
  SeedSequenceSpace: ENTRY PROC =
    BEGIN
    seed: LONG CARDINAL = System.GetClockPulses[];
    xmtr.iss ← seed;
    xmtr.unackedSeq ← seed; --nothing acked yet
    xmtr.nextSeq ← SUCC[seed]; --the syn takes the first number
    xmtr.maxSeq ← seed + xmtr.maxAlloc;
    END; --SeedSequenceSpace
  SeedSequenceSpace[];
  rexmtr.process ← FORK Retransmitter[];
  END; --InitXmtr
InsertRexmt: PUBLIC ENTRY PROC [b: ArpaBuffer.Buffer] =
<<
PROCESS: RETRANSMITTER
Puts the buffer in the retransmission list. The list is ordered by sequence
number. A buffer whose whole sequence range has already been acknowledged is
returned to the pool instead of being queued.
Finding a buffer with the same sequence number already queued is a glitch.
>>
BEGIN
body: ArpaBuffer.Body ← b.arpa;
prev, bList: ArpaBuffer.Buffer ← NIL;
--seqLen is the amount of sequence space the buffer occupies: its data bytes
--plus one for a fin or syn. This is the same accounting ConsumeAcked uses.
--Counting data alone would discard a data+fin buffer as soon as its data was
--acked, and the fin would then never be retransmitted.
seqLen: CARDINAL ← GetDataLength[body].l;
IF body.tcp.fin OR body.tcp.syn THEN seqLen ← SUCC[seqLen];
--has this one already been acked?
SELECT seqLen FROM
0 =>
IF Greater[xmtr.unackedSeq, SeqToCard[body.tcp.sequence]] THEN
{ArpaBuffer.ReturnBuffer[b]; RETURN};
ENDCASE =>
IF (Greater[xmtr.unackedSeq, SeqToCard[body.tcp.sequence] +
seqLen - 1]) THEN {ArpaBuffer.ReturnBuffer[b]; RETURN};
b.fo.next ← NIL;
rexmtr.list.length ← SUCC[rexmtr.list.length];
IF (rexmtr.list.head = NIL) THEN {rexmtr.list.head ← b}
ELSE
--Walk the list looking for the first entry with a later sequence number.
FOR bList ← rexmtr.list.head, ArpaBuffer.From[bList.fo.next]
UNTIL bList = NIL DO
SELECT TRUE FROM
(Less[SeqToCard[body.tcp.sequence],
SeqToCard[bList.arpa.tcp.sequence]]) =>
BEGIN
IF prev = NIL THEN
BEGIN --new head of the list
b.fo.next ← ArpaBuffer.To[rexmtr.list.head];
rexmtr.list.head ← b;
END
ELSE
BEGIN --splice in between prev and bList
b.fo.next ← prev.fo.next;
prev.fo.next ← ArpaBuffer.To[b];
END;
EXIT;
END;
(body.tcp.sequence = bList.arpa.tcp.sequence) =>
Driver.Glitch[TrashInRexmtr];
ENDCASE;
prev ← bList;
REPEAT FINISHED =>
BEGIN --later than everything queued, so append at the tail
b.fo.next ← prev.fo.next;
prev.fo.next ← ArpaBuffer.To[b];
END;
ENDLOOP;
END; --InsertRexmt
New: INTERNAL PROC [
  link: TcpStreamInternal.DataPair, startSeq, endSeq: LONG CARDINAL]
  RETURNS [new: TcpStreamInternal.DataPair] =
  BEGIN
  <<
  Produces a node describing [startSeq..endSeq] and links it into the inuse
  list after link, or at the head of the list when link is NIL.
  The node is recycled from the avail list when that list's length is not
  zero; otherwise it is allocated from CommHeap.zone.
  >>
  IF rcvr.avail.length = 0 THEN
    new ← CommHeap.zone.NEW[TcpStreamInternal.PairObject]
  ELSE
    BEGIN --pop the first recycled node
    new ← rcvr.avail.head;
    rcvr.avail.head ← new.next;
    rcvr.avail.length ← PRED[rcvr.avail.length];
    END;
  new↑ ← [startSeq, endSeq, NIL];
  --IF sanityChecking THEN ListSanityCheck[];
  IF link # NIL THEN
    BEGIN --insert after link
    new.next ← link.next;
    link.next ← new;
    END
  ELSE
    BEGIN --insert at the front
    new.next ← rcvr.inuse.head;
    rcvr.inuse.head ← new;
    END;
  rcvr.inuse.length ← SUCC[rcvr.inuse.length];
  --IF sanityChecking THEN ListSanityCheck[];
  END; --New
CheckForData: ENTRY PROC [body: ArpaBuffer.Body] RETURNS [BOOLEAN] =
BEGIN
<<
Puts the data into the input space and updates the list of data pointers
into that space. Also does fin processing.
Returns TRUE when at least one new byte was stored, FALSE when the packet
carried no data, only data already received, or no space was available.
>>
CheckForClose: INTERNAL PROC =
BEGIN
<<
If we have received a fin, check if we can consume it and modify the
state of the close handshake.
>>
SELECT TRUE FROM
(~rcvr.finSig) => NULL; --no fin received
(rcvr.fin = rcvr.nextSeq) =>
BEGIN
--we have all the data before it.
rcvr.nextSeq ← SUCC[rcvr.fin]; --consume fin's sequence number.
SELECT connection.state FROM
established => connection.state ← closeWait;
finWait1 => connection.state ← closing;
finWait2 =>
BEGIN
xmtr.timeWaitStart ← System.GetClockPulses[];
connection.state ← timeWait;
END;
ENDCASE;
NOTIFY rcvr.newInput;
END;
(Less[rcvr.fin, PRED[rcvr.nextSeq]]) => Driver.Glitch[DataAfterFin];
ENDCASE; --fin was out of sequence - wait for other data to arrive.
END; --CheckForClose
--start of CheckForData
dataLen: CARDINAL;
data: LONG POINTER;
startSeq, endSeq: LONG CARDINAL; --first and last seq of new data.
--Free bytes in the input space: its size less what is held but not yet consumed.
spaceAvail: LONG CARDINAL =
rcvr.inputSpace.size - (rcvr.nextSeq - 1 - rcvr.lastConsumed);
[dataLen, data] ← GetDataLength[body];
IF dataLen = 0 THEN {CheckForClose[]; RETURN[FALSE]};
--empty packet may carry fin.
--set initial values.
startSeq ← SeqToCard[body.tcp.sequence];
endSeq ← startSeq + dataLen - 1;
<<
We assume size is greater than the greatest dataLen. The amount of data
we will move is the minimum of the amount we want to move and the amount
we have space to move.
>>
startSeq ← Max[startSeq, rcvr.nextSeq]; --skip anything already received
--A segment that lies wholly below rcvr.nextSeq leaves endSeq behind startSeq.
--The unsigned subtraction below would then wrap and LowHalf would hand back a
--bogus length, so stop here. When endSeq is exactly startSeq - 1 the original
--arithmetic already produced zero and returned FALSE, as this does.
IF Less[endSeq, startSeq] THEN RETURN[FALSE];
dataLen ← LowHalf[endSeq - startSeq + 1]; --adjust for new start.
dataLen ← LowHalf[Min[dataLen, spaceAvail]]; --again for space avail
endSeq ← startSeq + dataLen - 1; --and use the result to set new endpoint.
IF dataLen = 0 THEN RETURN[FALSE]; --no data, no more to do
--ack will be generated when client consumes at least 1/2 of data.
IF dataLen >= spaceAvail THEN rcvr.remoteBlocked ← TRUE;
InsertData[body, data, startSeq, endSeq];
CheckForClose[];
RETURN[TRUE];
END; --CheckForData
InsertData: INTERNAL PROC [
body: ArpaBuffer.Body, data: LONG POINTER,
startSeq, endSeq: LONG CARDINAL] =
BEGIN
<<
This is very fragile code. You mess with it without knowing exactly what
is going on, you deserve whatever you get! It's also probably more
complex than it needs to be - IP reassembly algorithms are simpler, and may
be usable here.
Copies the bytes for sequence numbers [startSeq..endSeq] from the packet's
data area into the circular input space, records that range in the ordered
inuse list (merging with neighbours), and advances rcvr.nextSeq when the
head of the list now covers it.
The caller has already trimmed the range to new data that fits.
>>
moved, dataLen: CARDINAL;
start, end: CARDINAL; --space relative.
p, q, prev, tmp: TcpStreamInternal.DataPair ← NIL;
bStart: CARDINAL; --starting point of data in the buffer to be moved.
--set the ends to reflect the data we can actually move.
dataLen ← LowHalf[endSeq - startSeq] + 1;
start ← LowHalf[startSeq MOD rcvr.inputSpace.size];
end ← start + dataLen - 1;
--Record the starting point in the buffer of the data to be moved.
bStart ← LowHalf[startSeq - SeqToCard[body.tcp.sequence]];
--In each ByteBlt below the source block's stop index is simply the input
--space size; the destination block is what is meant to limit the copy.
--NOTE(review): in the wrapped case that relies on bStart <= start for the
--first copy. Confirm.
IF end >= rcvr.inputSpace.size THEN --is data going to wrap?
BEGIN
--first piece fills the input space up to its end
moved ← ByteBlt.ByteBlt[
to: [rcvr.inputSpace.space.pointer, start, rcvr.inputSpace.size],
from: [data, bStart, rcvr.inputSpace.size]];
--second piece continues from the beginning of the input space
moved ← moved + ByteBlt.ByteBlt[
to: [rcvr.inputSpace.space.pointer, 0, dataLen - moved],
from: [data, bStart + moved, rcvr.inputSpace.size]];
END
ELSE
moved ← ByteBlt.ByteBlt[
to: [rcvr.inputSpace.space.pointer, start, SUCC[end]],
from: [data, bStart, rcvr.inputSpace.size]];
--Update the sequence number list.
--On leaving this loop p is the node that now contains the new range.
FOR p ← rcvr.inuse.head, p.next UNTIL p = NIL DO --look for startSeq.
SELECT TRUE FROM
--is the starting seq within (or right on either end of) this node?
(GreaterOrEqual[startSeq, p.start-1] AND
LessOrEqual[startSeq, p.end+1]) =>
{p.end ← Max[endSeq, p.end]; p.start ← Min[startSeq, p.start]; EXIT};
--is starting seq going to create a disjoint node before this node?
(Less[startSeq, p.start]) =>
{p ← New[prev, startSeq, endSeq]; EXIT};
ENDCASE; -- (startSeq > p.end) means keep searching
prev ← p;
REPEAT FINISHED =>
--must be disjoint node at the end of the list.
p ← New[prev, startSeq, endSeq];
ENDLOOP;
--now compact the list so that there are no adjacent or overlapping seqments.
--Each following node that touches or overlaps the new range is absorbed into
--p and moved to the avail list. Setting q to p makes the loop step (q.next)
--look at p's new successor.
FOR q ← p.next, q.next UNTIL q = NIL DO
IF Less[endSeq, q.start-1] THEN EXIT;
p.end ← Max[p.end, q.end];
--IF sanityChecking THEN ListSanityCheck[];
tmp ← q; p.next ← q.next; q ← p; --pull element from middle
rcvr.inuse.length ← PRED[rcvr.inuse.length];
tmp.next ← rcvr.avail.head; rcvr.avail.head ← tmp; --put in avail list
rcvr.avail.length ← SUCC[rcvr.avail.length];
--IF sanityChecking THEN ListSanityCheck[];
ENDLOOP;
--If the first range now contains the next expected byte, everything up to the
--end of that range is in sequence: advance nextSeq and wake the reader.
IF GreaterOrEqual[rcvr.nextSeq, rcvr.inuse.head.start] AND
LessOrEqual[rcvr.nextSeq, rcvr.inuse.head.end] THEN
{rcvr.nextSeq ← SUCC[rcvr.inuse.head.end]; NOTIFY rcvr.newInput};
END; --InsertData
ProcessOptions: PROC [body: ArpaBuffer.Body] =
  BEGIN
  <<
  Looks at the first option in the tcp header only. If its type is 2, its
  maxSize field becomes xmtr.maxTcpBytes. Any other option is ignored.
  >>
  firstOption: TcpStreamInternal.MaxSegment = LOOPHOLE[@body.tcp.options];
  --**the option type should be a constant in TcpStreamInternal.
  SELECT firstOption.type FROM
    2 => xmtr.maxTcpBytes ← firstOption.maxSize;
    ENDCASE;
  END; --ProcessOptions
ProcessTcpState: PUBLIC PROC [body: ArpaBuffer.Body] =
--PROCESS: RECEIVER
<<
Applies the acknowledgement number and window of an incoming packet: advances
the connection state where the ack completes a handshake step, records the
new xmtr.unackedSeq, cleans the retransmission list, and recomputes the
highest sequence number we may send.
This procedure is not an ENTRY procedure; it takes the monitor lock only
through the Notify and ConsumeAcked calls it makes.
>>
BEGIN
ack: LONG CARDINAL ← SeqToCard[body.tcp.acknowledgement];
IF Greater[ack, xmtr.unackedSeq] THEN --process the acknowledgement.
BEGIN --process the ack and do the necessary state changes.
SELECT connection.state FROM
synReceived, synSent =>
IF Greater[ack, xmtr.iss] THEN --notify any waiters
{connection.state ← established; Notify[@connection.isEstablished]};
finWait1 =>
--our fin is acked and the remote has not sent a fin in this same packet
IF (ack = xmtr.nextSeq) AND (~body.tcp.fin) THEN
connection.state ← finWait2;
closing =>
IF (ack = xmtr.nextSeq) THEN
{xmtr.timeWaitStart ← System.GetClockPulses[];
connection.state ← timeWait};
lastAck => IF (ack = xmtr.nextSeq) THEN connection.state ← closed;
ENDCASE;
xmtr.unackedSeq ← ack; --This is the only place unackedSeq gets updated.
ConsumeAcked[];
END;
--The window is applied on every packet, whether or not the ack advanced.
xmtr.maxSeq ← xmtr.unackedSeq + body.tcp.window;
--offered window minus outstanding data is usable window.
--NOTE(review): if the outstanding data exceeds the offered window, this
--subtraction looks like it can wrap and pass the test. Confirm the operand
--types.
IF (body.tcp.window - (xmtr.nextSeq - xmtr.unackedSeq)) >
body.tcp.window / 4 THEN
BEGIN
IF rexmtr.list.head # NIL THEN Notify[@rexmtr.condition]
ELSE Notify[@xmtr.newAllocation];
END;
END; --ProcessTcpState
RcvdTcpPkt: PROC [b: ArpaBuffer.Buffer] =
<<
PROCESS: RECEIVER
Parses incoming packets, no mean feat. There is some redundancy in ConnReq
and ConnRep that should be looked at.
The checks below form one SELECT TRUE, so only the first arm whose test holds
is executed; their order is the order of precedence.
The buffer is always returned to the pool here, either at the end or in the
UNWIND handler.
>>
BEGIN
--so clients don't lose data when causing unwind after Close.
ENABLE UNWIND =>
{IF CheckForData[b.arpa] THEN SendAck[]; ArpaBuffer.ReturnBuffer[b]};
body: ArpaBuffer.Body = b.arpa;
len: CARDINAL ← GetDataLength[body].l;
SELECT TRUE FROM
connection.state = closed, --client aborted by calling delete
connection.state = suspended => NULL; --or stream has been suspended.
body.tcp.syn => --syn packet
BEGIN
IF ArpaFlags.doStats THEN ArpaStats.Incr[synsRcvd];
SELECT connection.state FROM
listen => ConnRequest[b];
synSent => ConnReply[b];
ENDCASE => --should not get a syn in already synchronized states.
BEGIN
--An out of window syn is just acked; one inside the window resets and
--suspends the stream.
IF ~ValidSeq[body, len] THEN SendAck[]
ELSE
{SendRst[b]; SuspendStream[reset, remoteReject];
IF ArpaFlags.doStats THEN ArpaStats.Incr[badSyn]};
END;
END;
(body.tcp.sourcePort # connection.remotePort) =>
--Stray packet arrived on a listening conn (remotePort not yet estab.)
{SendRst[b];
IF ArpaFlags.doStats THEN ArpaStats.Incr[badSourcePort]};
~ValidSeq[body, len] =>
{SELECT TRUE FROM
body.tcp.rst => NULL; --stray reset.
(connection.state = listen) => SendRst[b]; --stray packet (not syn).
(connection.state = synSent),
(connection.state = synReceived) => --stray packet, reset sender.
IF ~ValidAck[b] THEN SendRst[b];
(len = 0) AND (~body.tcp.fin) => NULL; --don't ack acks.
(connection.state = timeWait) =>
--acking in timeWait also restarts the timeWait timer
{SendAck[]; xmtr.timeWaitStart ← System.GetClockPulses[]};
ENDCASE => SendAck[];
IF ArpaFlags.doStats THEN ArpaStats.Incr[seqOutOfRange]};
body.tcp.rst => --valid seq with rst.
{SuspendStream[reset, remoteReject];
IF ArpaFlags.doStats THEN ArpaStats.Incr[resetsReceived]};
~SecurityMatch[body] =>
{SendRst[b];
SELECT connection.state FROM
established, finWait1, finWait2, closeWait, closing, lastAck,
timeWait => SuspendStream[securityMismatch, securityMismatch];
ENDCASE; --closed, listen, synSent, synReceived;
IF ArpaFlags.doStats THEN ArpaStats.Incr[badSecurity]};
~PrecedenceMatch[body] =>
{SendRst[b];
SELECT connection.state FROM
established, finWait1, finWait2, closeWait, closing, lastAck,
timeWait => SuspendStream[precedenceMismatch, precedenceMismatch];
ENDCASE; --closed, listen, synSent, synReceived;
IF ArpaFlags.doStats THEN ArpaStats.Incr[badPrecedence]};
~body.tcp.ack => --every packet except original syn should carry ack.
IF ArpaFlags.doStats THEN ArpaStats.Incr[badAck];
--NOTE(review): none of the arms from here on calls ProcessTcpState directly.
--Presumably acknowledgements are consumed elsewhere (ValidSeq or ValidAck?).
--Verify.
body.tcp.fin =>
SELECT connection.state FROM
established,
finWait1,
finWait2 =>
BEGIN
rcvr.finSig ← TRUE;
--the fin occupies the sequence number just after the packet's data
rcvr.fin ← SeqToCard[body.tcp.sequence] + len;
[] ← CheckForData[body];
SendAck[];
END;
ENDCASE =>
IF ArpaFlags.doStats THEN ArpaStats.Incr[badFin]; --**Reset here?
body.tcp.urg => --mark urgent pointer, earlier urgents are superceded
BEGIN
rcvr.urg ← Max[
rcvr.urg, SeqToCard[body.tcp.sequence] +
LOOPHOLE[body.tcp.urgentPointer, CARDINAL]];
IF ~rcvr.urgSig THEN --multiple urgents are merged.
{rcvr.urgSig ← TRUE; Notify[@rcvr.urgArrived]};
[] ← CheckForData[body];
SendAck[];
END;
body.tcp.psh => --mark pushed data, earlier pushes are superceded.
BEGIN
--the push mark is the sequence number of the last data byte in the packet
lastSeq: LONG CARDINAL ← SeqToCard[body.tcp.sequence];
IF len # 0 THEN lastSeq ← lastSeq + len - 1;
rcvr.push ← Max[lastSeq, rcvr.push];
rcvr.pushSig ← TRUE;
[] ← CheckForData[body];
SendAck[];
END;
ENDCASE => --ah, the vanilla data packet!
IF CheckForData[body] THEN SendAck[];
ArpaBuffer.ReturnBuffer[b]; --This is where receive buffers get returned.
END; --RcvdTcpPkt
Receiver: PUBLIC PROC RETURNS [TcpOps.PSProc] =
--PROCESS: THIS IS THE RECEIVER
<<
Body of the receiver process. Runs at CommPriorities.receiver, taking packets
from the connection's port until connection.pleaseStop is set or the process
is aborted, and dispatching them by IP protocol.
Tcp packets go to RcvdTcpPkt, which takes over the buffer; every other path
returns the buffer here.
The result is this procedure itself, loopholed to TcpOps.PSProc. How the
caller uses that value is not visible in this part of the file.
>>
BEGIN
b: ArpaBuffer.Buffer;
body: ArpaBuffer.Body;
Process.SetPriority[CommPriorities.receiver];
UNTIL connection.pleaseStop DO
ENABLE ABORTED => EXIT;
--get next packet from socket queue
body ← (b ← ArpaPort.GetPacket[connection.port]).arpa;
--is it worth it?
SELECT TRUE FROM
(body.ipHeader.protocol = tcp) =>
BEGIN
IF ArpaFlags.doStats THEN ArpaStats.Incr[tcpsRcvd];
SELECT TRUE FROM
--too short to hold minimal IP and tcp headers
(body.ipHeader.length < TcpStreamInternal.minTcpHeaderBytes +
ArpaPort.minIPHeaderBytes) =>
{IF ArpaFlags.doStats THEN ArpaStats.Incr[pktTooShort];
ArpaBuffer.ReturnBuffer[b]};
(connection.remoteAddr = ArpaRouter.unknownInternetAddress) =>
RcvdTcpPkt[b]; --this could be the first syn.
--once the remote is known, drop packets from any other source host
ArpaPortInternal.AddrMismatch[
ArpaPortInternal.BuildMasks[body.ipHeader.destination].hostMask,
body.ipHeader.source, connection.remoteAddr] =>
{IF ArpaFlags.doStats THEN ArpaStats.Incr[badSource];
ArpaBuffer.ReturnBuffer[b]};
ENDCASE => RcvdTcpPkt[b];
END; --protocol = tcp
(body.ipHeader.protocol = icmp) =>
BEGIN
--only the unreachable message is acted on; it suspends the stream
SELECT body.icmp.type FROM
unreachable =>
{SuspendStream[noRouteToDestination, noRouteToDestination];
IF ArpaFlags.doStats THEN ArpaStats.Incr[icmpUnreachable]};
ENDCASE => NULL;
ArpaBuffer.ReturnBuffer[b];
END;
ENDCASE =>
BEGIN
IF ArpaFlags.doStats THEN ArpaStats.Incr[badProtocol];
--****icmp error packet here?
ArpaBuffer.ReturnBuffer[b];
END;
ENDLOOP;
RETURN[LOOPHOLE[Receiver]];
END; --Receiver
RequeueProc: PUBLIC PROC [b: ArpaBuffer.Buffer] =
  <<
  PROCESS: BUFFER MANAGER
  Decides what happens to a send buffer once the lower layers are done with
  it. A buffer whose status is invalidDestAddr (ARP should find the address
  for us on the next try) or lies in [pending..aborted] goes on the
  retransmission list. Any other status suspends the stream with a matching
  reason and the buffer is returned.
  >>
  BEGIN
  IF (b.fo.status = invalidDestAddr) OR (b.fo.status IN [pending..aborted]) THEN
    BEGIN
    InsertRexmt[b]; --put it on the retransmission list.
    ConsumeAcked[]; --poke the rexmt list cleaner.
    END
  ELSE
    BEGIN
    SuspendStream[noRouteToDestination, SELECT b.fo.status FROM
      noTranslationForDestination => noTranslationForDestination,
      noAnswerOrBusy => noAnswerOrBusy,
      circuitInUse => circuitInUse,
      circuitNotReady => circuitNotReady,
      dialerHardwareProblem => circuitNotReady,
      noDialingHardware => noDialingHardware,
      --noRouteToNetwork, hardwareProblem => noRouteToDestination,
      ENDCASE => noRouteToDestination];
    ArpaBuffer.ReturnBuffer[b]; --don't keep this one - we're dead
    END;
  END; --RequeueProc
Retransmitter: PUBLIC PROC RETURNS [TcpOps.PSProc] =
--PROCESS: THIS IS THE RETRANSMITTER
<<
Body of the retransmitter process (forked by InitXmtr). Each pass it either
retransmits the buffer at the head of the retransmission list, probes a
closed window, or does nothing; it may also recompute the retransmission
interval. It then waits on rexmtr.condition. The loop ends when
connection.pleaseStop is set or the process is aborted.
The result is this procedure itself, loopholed to TcpOps.PSProc.
>>
BEGIN
time: LONG CARDINAL;
b: ArpaBuffer.Buffer;
--Unlinks the head of the list under the monitor. Returns FALSE if the list
--turned out to be empty, TRUE (the default result) otherwise.
ExtractHead: ENTRY PROC RETURNS [BOOLEAN ← TRUE] =
BEGIN
--stops access to the head of the list (which is being rexmittd).
IF rexmtr.list.head = NIL THEN RETURN[FALSE]; --make sure it's still there.
rexmtr.list.head ← ArpaBuffer.From[rexmtr.list.head.fo.next];
rexmtr.list.length ← PRED[rexmtr.list.length];
END; --ExtractHead
UNTIL connection.pleaseStop DO
ENABLE ABORTED => EXIT;
BEGIN
time ← System.GetClockPulses[];
SELECT TRUE FROM
(connection.state = suspended) => GOTO suspended;
--anything to retransmit?
--NOTE(review): the head is read here without the monitor lock; ExtractHead
--only rechecks that the list is not empty, not that b is still the head.
((b ← rexmtr.list.head) # NIL) =>
SELECT TRUE FROM
--the wait grows with the number of tries, up to rexmtr.ceiling
((time - b.fo.time) < MIN[(rexmtr.interval * CARDINAL[b.fo.tries]),
rexmtr.ceiling]) => NULL; --too soon
--this test also counts the try as a side effect
((b.fo.tries ← SUCC[b.fo.tries]) > rexmtr.giveUp) => GOTO suspend;
ENDCASE => --data retransmission.
IF ExtractHead[] THEN --ConsumeAcked may try to return it.
BEGIN
b.fo.time ← System.GetClockPulses[]; --record packet send time
--Presumably the buffer's requeue procedure puts it back on the list once it
--has been sent. Verify.
SendPacket[b]; --retransmit the packet
IF ArpaFlags.doStats THEN ArpaStats.Incr[retransmitted];
END;
<<
Remote has closed window or we have missed his window update - is
it time to probe for allocation?
>>
(Greater[xmtr.nextSeq, xmtr.maxSeq]) AND
(System.GetClockPulses[] - xmtr.lastSent) > (rexmtr.interval * 2) =>
SendAllocProbe[];
ENDCASE;
--time to recompute retransmission interval?
IF ((time - rexmtr.calculation) > rexmtr.calculationInterval) THEN
BEGIN
rexmtr.calculation ← time; --reset start of interval
--retransmission interval recalculation
IF (rexmtr.count # 0) THEN
BEGIN
--recalculate retransmission interval
--REUSES LOCAL VARIABLE 'time'
time ← rexmtr.delay / rexmtr.count; --mean delay of the buffers acked
time ← time + (time / 2); --don't get too close
rexmtr.delay ← 0; rexmtr.count ← 0;
--weight old interval more than new (3 to 1)
time ← ((rexmtr.interval * 3) + time) / 4;
--interval is limited both low and high
rexmtr.interval ← MAX[rexmtr.floor, MIN[rexmtr.ceiling, time]];
END;
END; --recompute interval
EXITS
suspended => NULL;
suspend => SuspendStream[transmissionTimeout, timeout];
END; --transmitter not blocked
Wait[@rexmtr.condition];
ENDLOOP;
RETURN[LOOPHOLE[Retransmitter]];
END; --Retransmitter
SendAck: PROC [] =
  BEGIN
  <<
  Gets the current acknowledgement to the remote. If anything is waiting on
  the retransmission list the retransmitter is poked instead, since whatever
  it sends carries the ack; otherwise a header-only ack packet is sent.
  >>
  IF rexmtr.list.head = NIL THEN
    BEGIN
    ackOnly: ArpaBuffer.Buffer = GetSystemBuffer[TRUE]; --he sets the sequence
    SetHeaderFields[ackOnly, 0, 0];
    SendPacket[ackOnly];
    --**IF ArpaFlags.doStats THEN ArpaStats.Incr[acksSent];
    END
  ELSE --why send an empty ack when we have something real to send?
    Notify[@rexmtr.condition];
  END; --SendAck
SendAllocProbe: ENTRY PROC =
BEGIN
<<
Sends a packet probing the remote to open his allocation window for us.
This is the only case where a data packet is sent using a buffer obtained
from GetSystemBuffer instead of GetOutputBuffer, as we want to always be
able to obtain a buffer for a probe, and we don't want it put in the
rexmtr table.
The probe carries the first data byte of the blocked client buffer, or its
fin if it has no data. If no client is blocked (xmtr.blocked.b is NIL) there
is nothing to probe with and the procedure returns without taking a buffer.
That can happen because the retransmitter calls this whenever xmtr.nextSeq
has passed xmtr.maxSeq, which does not require a sender to be waiting.
>>
b: ArpaBuffer.Buffer;
body, blocked: ArpaBuffer.Body;
--Guard first: dereferencing a NIL blocked buffer would fault, and it would do
--so after a system buffer had already been taken.
IF xmtr.blocked.b = NIL THEN RETURN;
blocked ← xmtr.blocked.b.arpa;
b ← GetSystemBuffer[TRUE];
body ← b.arpa;
body.tcp.sequence ← blocked.tcp.sequence;
SELECT TRUE FROM
(xmtr.blocked.dataLen > 0) =>
BEGIN
body.tcp.psh ← TRUE; --maybe this will make him answer.
body.tcp.bytes[0] ← blocked.tcp.bytes[0];
SetHeaderFields[b, 1, 0];
END;
(blocked.tcp.fin) =>
BEGIN
body.tcp.fin ← TRUE;
SetHeaderFields[b, 0, 0];
END;
ENDCASE => --heaven knows what this is.
BEGIN
ArpaBuffer.ReturnBuffer[b];
RETURN;
END;
SendPacket[b];
END; --SendAllocProbe
SendPacket: PUBLIC PROC [b: ArpaBuffer.Buffer] =
  --PROCESS: RETRANSMITTER, CLIENT SENDING
  --Stamps the packet with the receiver's current acknowledgement and window,
  --hands it to the port, and records the time of the send. Used for first
  --transmissions and for retransmissions alike.
  BEGIN
  hdr: ArpaBuffer.Body = b.arpa;
  --never send a packet without current state information.
  --window is the byte count from rcvr.nextSeq through rcvr.maxSeq inclusive.
  hdr.tcp.window ← LowHalf[rcvr.maxSeq - rcvr.nextSeq] + 1;
  hdr.tcp.acknowledgement ← CardToSeq[rcvr.nextSeq];
  ArpaPort.PutPacket[connection.port, b];
  --any packet carries the ack, so this timestamp is sufficient for acks too.
  xmtr.lastSent ← System.GetClockPulses[];
  END; --SendPacket
SendRst: PUBLIC PROC [offender: ArpaBuffer.Buffer] =
  --Sends a reset packet in response to the offending packet. If offender is
  --nil, then the reset is a user-initiated abort.
  --Sequence and acknowledgement fields are chosen as follows:
  --  offender = NIL:        seq = xmtr.nextSeq, ack = rcvr.nextSeq
  --  offender has ack set:  seq = offender's acknowledgement, ack bit cleared
  --  otherwise:             seq = 0, ack = offender's sequence + data length
  BEGIN
  rst: ArpaBuffer.Buffer = GetSystemBuffer[TRUE];
  hdr: ArpaBuffer.Body = rst.arpa;
  --xmtr.nextSeq is read under the monitor lock.
  StampNextSeq: ENTRY PROC =
    {hdr.tcp.sequence ← CardToSeq[xmtr.nextSeq]};
  SetHeaderFields[rst, 0, 0];
  hdr.tcp.rst ← TRUE; --must be why we're here
  hdr.tcp.window ← 0; --This is probably silly.
  IF offender = NIL THEN
    BEGIN --user abort
    StampNextSeq[];
    hdr.tcp.acknowledgement ← CardToSeq[rcvr.nextSeq];
    END
  ELSE
    BEGIN
    bad: ArpaBuffer.Body = offender.arpa;
    IF bad.tcp.ack THEN
      BEGIN --answer at the sequence number he says he expects.
      hdr.tcp.sequence ← bad.tcp.acknowledgement;
      hdr.tcp.ack ← FALSE;
      END
    ELSE
      BEGIN --no ack to go by, so acknowledge what he sent us.
      ackNo: LONG CARDINAL =
        SeqToCard[bad.tcp.sequence] + TcpStreamInternal.GetDataLength[bad].l;
      hdr.tcp.sequence ← [0, 0];
      hdr.tcp.acknowledgement ← CardToSeq[ackNo];
      END;
    END;
  ArpaPort.PutPacket[connection.port, rst];
  IF ArpaFlags.doStats THEN ArpaStats.Incr[resetsSent];
  END; --SendRst
SendSyn: PUBLIC PROC [ack: BOOLEAN] =
  --Send a syn packet, either connection request or response. The only
  --difference is the acknowledge fields and the maxSize option.
  --ack: TRUE for a response (acknowledgement set to rcvr.nextSeq), FALSE for
  --a request (ack bit cleared).
  BEGIN
  etherEncapBytes: NATURAL = 18; --ethernet encapsulation allowance
  syn: ArpaBuffer.Buffer = GetOutputBuffer[]; --sets ack field by default.
  hdr: ArpaBuffer.Body = syn.arpa;
  optionBytes: CARDINAL ← 0;
  IF NOT ack THEN hdr.tcp.ack ← FALSE
  ELSE hdr.tcp.acknowledgement ← CardToSeq[rcvr.nextSeq];
  IF advertiseSize THEN
    BEGIN
    --largest data load that fits one raw buffer after the IP and TCP headers.
    maxData: CARDINAL = (ArpaBuffer.DataBytesPerRawBuffer[syn]) -
      ArpaPort.minIPHeaderBytes - TcpStreamInternal.minTcpHeaderBytes -
      etherEncapBytes;
    opt: TcpStreamInternal.MaxSegment = LOOPHOLE[@hdr.tcp.options];
    --maximum segment size option: kind 2, length 4, then the size itself.
    --ends on a 32-bit boundary, so no zero padding.
    opt↑ ← [2, 4, maxData];
    optionBytes ← SIZE[TcpStreamInternal.MaxSegmentObj] * bpw;
    rcvr.maxTcpBytes ←
      IF connection.offNet THEN
        MIN[maxData, TcpStreamInternal.maxTcpDataBytes]
      ELSE maxData;
    END;
  hdr.tcp.syn ← TRUE; --this is why we're here
  SetHeaderFields[syn, 0, optionBytes];
  hdr.tcp.sequence ← CardToSeq[xmtr.iss];
  hdr.tcp.window ← rcvr.inputSpace.size;
  ArpaPort.PutPacket[connection.port, syn];
  IF ArpaFlags.doStats THEN ArpaStats.Incr[synsSent];
  END; --SendSyn
SetWaitTime: ENTRY PROC [timeout: TcpStream.WaitTime] =
  --Records the client's wait time and arms (or disarms) the timeout on the
  --rcvr.newInput condition to match.
  --timeout is scaled by 1000 before conversion from microseconds to pulses.
  --A timeout above maxMsecToPulses means the wait never times out.
  BEGIN --no possibility of signals
  never: LONG CARDINAL = LAST[LONG CARDINAL];
  rcvr.waitTime ← timeout;
  --time IN[maxMsecToPulses..INFINITY] => silly client NEVER wake up!
  IF timeout > maxMsecToPulses THEN rcvr.interval ← never
  ELSE rcvr.interval ← System.MicrosecondsToPulses[timeout*1000];
  IF rcvr.interval # never THEN
    Process.SetTimeout[
      @rcvr.newInput, CommUtil.PulsesToTicks[[rcvr.interval]]]
  ELSE Process.DisableTimeout[@rcvr.newInput];
  END; --SetWaitTime
StartTcpStream: PUBLIC TcpOps.StartTcpStreamProc =
  --PROC[
  --  local, remote: ArpaRouter.InternetAddress,
  --  localPort, remotePort: ArpaRouter.Port,
  --  timeout: TcpStream.WaitTime, precedence: TcpStream.Precedence,
  --  security: TcpStream.Security, options: Environment.Block,
  --  establish: BOOLEAN, gf: LONG POINTER (to FRAME[TcpImpl])] RETURNS [
  --  TcpStream.Handle, ArpaRouter.InternetAddress, ArpaRouter.Port];
  --Process is the client's.
  --Initializes the stream object and connection state, then either sends a
  --syn (establish = TRUE) or listens, and waits until the connection is, or
  --has been, established.
  --Returns the stream object plus the connection's remote address and port.
  --Raises:
  --  ERROR TcpStream.Failed[connection.whyFailed] if the stream is suspended
  --    while waiting.
  --  ERROR TcpStream.Failed[timeout] if an active open has waited longer
  --    than rcvr.interval.
  --  SIGNAL TcpStream.ListenTimeout each time a listener has waited longer
  --    than rcvr.interval; if resumed, the wait continues with a fresh timer.
  BEGIN
  time: LONG CARDINAL; --start of the current establishment timing interval
  --Waits on connection.isEstablished until the stream fails or is (or was)
  --established. Shared by the active and the listening open; they differ
  --only in what happens when the interval runs out.
  AwaitEstablished: PROC [listening: BOOLEAN] =
    BEGIN
    DO --UNTIL failed | established.
      Wait[@connection.isEstablished];
      SELECT TRUE FROM
        (connection.state = suspended) =>
          ERROR TcpStream.Failed[connection.whyFailed];
        (connection.state = established),
        (connection.state = closeWait),
        (connection.state = closing),
        (connection.state = lastAck),
        (connection.state = timeWait) => EXIT; --stream is (or was) estab.
        --streams in listen, synSent, and synReceived can still fail.
        ((System.GetClockPulses[] - time) > rcvr.interval) =>
          IF listening THEN
            BEGIN
            --restart the timer first; the signal may be resumed.
            time ← System.GetClockPulses[];
            SIGNAL TcpStream.ListenTimeout;
            END
          ELSE ERROR TcpStream.Failed[timeout];
        ENDCASE;
    ENDLOOP;
    END; --AwaitEstablished
  --Initialize stream object.
  tcpStreamObject ← [
    destroy: NIL, --done from the outside(Mgr)
    put: PutBlock,
    get: GetBlock,
    waitForUrgent: WaitForUrgent,
    close: Close,
    setWaitTime: SetWaitTime,
    findAddresses: FindAddresses];
  InitStream[
    local: local, remote: remote, localPort: localPort,
    remotePort: remotePort, timeout: timeout, precedence: precedence,
    security: security, options: options];
  output.b ← NIL;
  output.bufferSize ← 0;
  output.finger ← 0;
  SetWaitTime[timeout];
  --and now create the stream (or listener).
  time ← System.GetClockPulses[];
  SELECT TRUE FROM
    (connection.state = closed) AND establish => --active create
      BEGIN
      rcvr.process ← FORK Receiver[];
      InitXmtr[];
      connection.state ← synSent;
      SendSyn[ack: FALSE];
      AwaitEstablished[listening: FALSE];
      END;
    connection.state = closed => --create listener
      BEGIN
      connection.state ← listen;
      rcvr.process ← FORK Receiver[];
      IF notifyListenerStarted # NIL THEN notifyListenerStarted[];
      AwaitEstablished[listening: TRUE];
      END;
    ENDCASE => Driver.Glitch[IllegalState]; --How did we get here?!
  RETURN[@tcpStreamObject, connection.remoteAddr, connection.remotePort];
  END; --StartTcpStream
StopTcpStream: PUBLIC TcpOps.StopTcpStreamProc =
  --PROCESS: CLIENT STREAM DELETION.
  --Lets the close protocol finish where it can (or sends a reset where it
  --cannot), then stops the receiver and retransmitter processes and releases
  --the stream's data lists, pending output buffer, input space and port.
  BEGIN
  --allowed for last ack to get to remote.
  timeWaitInterval: LONG CARDINAL ← rcvr.ackInterval * 2;
  --Polls until our last sequence number has been acknowledged, or the stream
  --is suspended (we got a reset).
  AwaitFinalAck: PROC =
    BEGIN
    WHILE (xmtr.nextSeq # xmtr.unackedSeq) AND --until we get that last ack
      (connection.state # suspended) DO --or we get a reset and suspend.
      Process.Pause[2];
    ENDLOOP;
    END; --AwaitFinalAck
  --Polls until timeWaitInterval has elapsed since xmtr.timeWaitStart, or the
  --stream is suspended.
  TimeWaitLoop: PROC =
    BEGIN
    UNTIL (System.GetClockPulses[] - xmtr.timeWaitStart) > timeWaitInterval DO
      Process.Pause[1];
      IF (connection.state = suspended) THEN EXIT; --got a reset and suspended.
    ENDLOOP;
    END; --TimeWaitLoop
  --Give the transmitter a kick in case he is waiting on allocation.
  Notify[@xmtr.newAllocation];
  --try to let close protocol complete.
  SELECT connection.state FROM
    closed => NULL;
    lastAck => AwaitFinalAck[];
    closing =>
      BEGIN
      AwaitFinalAck[];
      --If the wait ended because of a reset, leave the state alone. Forcing
      --it to timeWait would hide the suspension from TimeWaitLoop and make
      --us sit out the whole interval for a connection that is already dead.
      IF connection.state # suspended THEN
        BEGIN
        --timer is reset if a retransmitted fin is received.
        xmtr.timeWaitStart ← System.GetClockPulses[];
        connection.state ← timeWait;
        TimeWaitLoop[];
        END;
      END;
    timeWait => TimeWaitLoop[];
    suspended, listen => NULL;
    ENDCASE => SendRst[NIL]; --no orderly close possible; abort the connection
  connection.pleaseStop ← TRUE;
  IF rcvr.process # NIL THEN
    {Process.Abort[rcvr.process]; [] ← JOIN rcvr.process;
    rcvr.process ← NIL};
  IF rexmtr.process # NIL THEN
    {Process.Abort[rexmtr.process]; [] ← JOIN rexmtr.process;
    rexmtr.process ← NIL};
  connection.state ← closed;
  FlushDataList[@rcvr.inuse];
  FlushDataList[@rcvr.avail];
  FlushRexmtrList[];
  --don't leave a pointer to a buffer we no longer own.
  IF output.b # NIL THEN
    {ArpaBuffer.ReturnBuffer[output.b]; output.b ← NIL};
  [] ← Space.Unmap[rcvr.inputSpace.space.pointer];
  TcpPort.DeleteTcpPort[connection.port];
  END; --StopTcpStream
SuspendStream: PUBLIC ENTRY PROC [
  suspend: TcpStream.SuspendReason, fail: TcpStream.FailureReason] =
  --PROCESS: RECEIVER OR RETRANSMITTER
  --Moves the connection to the suspended state, remembering the state it
  --was in and the reasons given. A stream that is already suspended keeps
  --its original reasons. The wakeups below are issued in either case.
  BEGIN
  IF connection.state # suspended THEN
    BEGIN
    connection.stateBeforeSuspension ← connection.state;
    connection.state ← suspended;
    connection.whySuspended ← suspend;
    connection.whyFailed ← fail;
    END;
  --Notify the internal processes, so they will loop again and discover the
  --stream has been suspended.
  NOTIFY rcvr.newInput;
  NOTIFY xmtr.newAllocation;
  --Abort any client waiting for a buffer. This will be translated into
  --Suspended by the GetBuffer code.
  IF xmtr.clientProcess # NIL THEN Process.Abort[xmtr.clientProcess];
  END; --SuspendStream
PutBlock: PUBLIC PROC [block: Environment.Block, push, urgent: BOOLEAN] =
  --PROCESS: CLIENT SENDING
  --Copies the client's bytes [block.startIndex..block.stopIndexPlusOne) into
  --the pending output buffer, handing the buffer to ForceOut each time it
  --fills.
  --push: after the last byte is copied, whatever is pending is forced out
  --  with the push bit set.
  --urgent: passed to ForceOut as setUrg for the packets forced out here.
  --An empty block with push or urgent set just forces out any pending data.
  BEGIN
  --monitored so ForceOut can check for output data pending.
  AssignBuffer: ENTRY PROC = INLINE {output.b ← b};
  moved, n: CARDINAL ← 0;
  oBlock: Environment.Block;
  b: ArpaBuffer.Buffer ← NIL;
  --a push with no new data.
  IF (block.startIndex = block.stopIndexPlusOne) AND (push OR urgent) THEN
    {IF (output.b # NIL) THEN --is there any old data to be pushed?
      --urgent is set only if the client asked for it; a plain push must not
      --mark the pending data urgent.
      ForceOut[setPush: TRUE, setFin: FALSE, setUrg: urgent]}
  ELSE
    WHILE block.startIndex < block.stopIndexPlusOne DO
      IF output.b = NIL THEN --need a new buffer.
        BEGIN
        b ← GetOutputBuffer[];
        --data room in one raw buffer after the IP and TCP headers.
        n ← (ArpaBuffer.DataBytesPerRawBuffer[b]) -
          ArpaPort.minIPHeaderBytes - TcpStreamInternal.minTcpHeaderBytes - 18;
        -- 18 ethernet encap
        AssignBuffer[];
        output.finger ← 0;
        --never exceed xmtr.maxTcpBytes, and off the local net never exceed
        --maxTcpDataBytes either.
        output.bufferSize ← MIN[
          IF connection.offNet THEN
            MIN[TcpStreamInternal.maxTcpDataBytes, xmtr.maxTcpBytes]
          ELSE xmtr.maxTcpBytes,
          n];
        END;
      --the unfilled part of the output packet is the destination.
      oBlock ← [
        blockPointer: LOOPHOLE[@output.b.arpa.tcp.bytes],
        startIndex: output.finger, stopIndexPlusOne: output.bufferSize];
      moved ← ByteBlt.ByteBlt[oBlock, block];
      block.startIndex ← block.startIndex + moved;
      --Is packet full?
      IF (output.finger ← output.finger + moved) = output.bufferSize THEN
        --Push bit may get set on last packet, and this may be last packet (if
        --packet and client data end at the same time).
        ForceOut[
          setPush: (block.startIndex >= block.stopIndexPlusOne) AND push,
          setFin: FALSE, setUrg: urgent];
      REPEAT
        FINISHED => --pushed data indicates push on last packet.
          IF push AND (output.b # NIL) THEN
            ForceOut[setPush: TRUE, setFin: FALSE, setUrg: urgent];
      ENDLOOP;
  END; --PutBlock
UpdateInputSpace: INTERNAL PROC [consumed: CARDINAL]
  RETURNS [needsAck: BOOLEAN ← FALSE] =
  --Updates the head of the in-use list based on how much data the client
  --just consumed.
  --consumed: number of bytes the client took from the head entry.
  --needsAck: TRUE only when the remote was marked blocked and less than half
  --of the input space is now in use; the blocked mark is cleared then.
  BEGIN
  spent: TcpStreamInternal.DataPair;
  rcvr.inuse.head.start ← rcvr.inuse.head.start + consumed;
  rcvr.lastConsumed ← PRED[rcvr.inuse.head.start];
  IF NOT TcpStreamInternal.Greater[
    rcvr.inuse.head.start, rcvr.inuse.head.end] THEN
    BEGIN
    --rcvr.inputSpace.size => total space available
    --CARDINAL[rcvr.nextSeq - rcvr.lastConsumed] => amount of space in use
    --So, if he was blocked and we now have more than half of the data space
    --available, prod him for some more data.
    --This assumes that the input space size is at least twice as large as
    --the packets being used.
    IF rcvr.remoteBlocked AND
      CARDINAL[rcvr.nextSeq - rcvr.lastConsumed] <
        (rcvr.inputSpace.size / 2) THEN
      BEGIN
      rcvr.remoteBlocked ← FALSE;
      needsAck ← TRUE;
      END;
    END
  ELSE
    BEGIN
    --Client consumed all the available (contiguous) data. So unlink the
    --head entry from the in-use list and push it on the avail list.
    --IF sanityChecking THEN ListSanityCheck[];
    spent ← rcvr.inuse.head;
    rcvr.inuse.head ← spent.next;
    rcvr.inuse.length ← PRED[rcvr.inuse.length];
    spent.next ← rcvr.avail.head;
    rcvr.avail.head ← spent;
    rcvr.avail.length ← SUCC[rcvr.avail.length];
    --IF sanityChecking THEN ListSanityCheck[];
    END;
  END; --UpdateInputSpace
ValidAck: PROC [b: ArpaBuffer.Buffer] RETURNS [BOOLEAN] =
  --TRUE when the packet's acknowledgement lies between our initial send
  --sequence number (xmtr.iss) and the next one we will send (xmtr.nextSeq),
  --both ends included.
  BEGIN
  ackNo: LONG CARDINAL = SeqToCard[b.arpa.tcp.acknowledgement];
  RETURN[GreaterOrEqual[ackNo, xmtr.iss] AND LessOrEqual[ackNo, xmtr.nextSeq]];
  END; --ValidAck
ValidSeq: PROC [body: ArpaBuffer.Body, len: CARDINAL]
  RETURNS [accept: BOOLEAN ← FALSE] =
  --Decides whether an incoming packet falls in the receive window
  --[rcvr.nextSeq..rcvr.maxSeq] and, when appropriate, runs ProcessTcpState
  --on it.
  --body: the incoming packet.
  --len: number of data bytes it carries.
  --accept: TRUE if the packet's data or state is acceptable. A zero length
  --  packet at rcvr.nextSeq with the window closed has its state processed
  --  but is still not accepted.
  --Window membership is tested with GreaterOrEqual/LessOrEqual rather than a
  --plain IN interval, so that a window that straddles sequence number
  --wraparound is not treated as empty.
  --NOTE(review): this assumes the TcpStreamInternal comparison procedures
  --are wraparound aware; confirm against their definitions.
  BEGIN
  inSeq: LONG CARDINAL ← SeqToCard[body.tcp.sequence];
  SELECT len FROM
    > 0 => --most common case of carrying at least one data byte.
      BEGIN
      lastByte: LONG CARDINAL ← inSeq + len - 1;
      SELECT TRUE FROM
        --first byte in window.
        (GreaterOrEqual[inSeq, rcvr.nextSeq] AND
          LessOrEqual[inSeq, rcvr.maxSeq]),
        --last byte in window.
        (GreaterOrEqual[lastByte, rcvr.nextSeq] AND
          LessOrEqual[lastByte, rcvr.maxSeq]),
        --some middle bytes in window.
        (Greater[rcvr.nextSeq, inSeq] AND
          Less[rcvr.maxSeq, lastByte]) =>
          {ProcessTcpState[body]; accept ← TRUE};
        ENDCASE;
      END;
    0 => --zero bytes of data, but need to check for state.
      IF Greater[rcvr.nextSeq, rcvr.maxSeq] THEN
        --window closed, seq must equal receiver's next.
        {IF (inSeq = rcvr.nextSeq) THEN
          ProcessTcpState[body]; --can't accept, but process state anyway.
        }
      ELSE --window not closed.
        IF GreaterOrEqual[inSeq, rcvr.nextSeq] AND
          LessOrEqual[inSeq, rcvr.maxSeq] THEN
          {ProcessTcpState[body]; accept ← TRUE};
    ENDCASE; --length < 0?!
  END; --ValidSeq
WaitForUrgent: ENTRY PROC [block: Environment.Block] =
  --Suspends the caller on the rcvr.urgArrived condition. The block argument
  --is not examined here.
  --The UNWIND catch phrase does nothing itself; presumably it is there so
  --the monitor is left cleanly if the wait is unwound. Confirm against the
  --language manual.
  {ENABLE UNWIND => NULL;
  WAIT rcvr.urgArrived;
  }; --WaitForUrgent
END...
LOG
15-May-85 12:43:27 SMA Created file.
6-Jan-86 11:58:28 SMA More twiddles to CheckForData and InsertData
7-Jan-86 10:35:02 SMA Maximum packet size from 100 to 576.
23-Mar-86 16:19:40 SMA Months and months of debugging.
14-May-86 11:27:27 SMA ENABLE UNWIND in WaitForUrgent.
16-May-86 8:33:42 SMA Problems with out-of-seq one byte packets.
16-May-86 8:35:13 SMA Urgent passed to ForceOut
19-May-86 12:52:36 SMA Check incoming urgent before push.
4-Jun-86 11:49:15 SMA Set urgent pointer correctly.
5-Jun-86 12:45:33 SMA Fix push processing for empty packets.
13-Jun-86 11:10:22 SMA Check for suspended (from reset) in StopTcpStream.
18-Jun-86 14:28:24 SMA Reset b.time on retransmission.
19-Jun-86 14:49:08 SMA Integer arithmetic when calculating usable window.
20-Jun-86 16:17:27 SMA Don't consume fin prematurely.
20-Jun-86 17:13:29 SMA Receiver more generous with acks.
? ALD/ISI Fix ceiling calc. in rexmtr, ICMP bug, etc.
8-Jul-86 14:42:02 JAV Fixed loop when push received in GetBlock
17-Dec-86 12:19:33 SMA Ack not generated on every GetBlock (use remoteBlocked).
7-Jan-87 15:27:53 SMA Initialize "stream markers" to initial receive seq. #.
2-Feb-87 11:42:15 SMA Added probing for allocation code.
9-Feb-87 19:35:06 SMA Receiver less generous with acks (sigh).
11-Mar-87 12:17:37 AOF Funston buffer management, et al.
2-Jun-87 10:55:52 AOF Caching of DataObject entries.
2-Jun-87 10:56:10 AOF Recoding of time to send ack computation.
3-Mar-88 13:47:16 AOF Use global variable for start/stop procs.