-- File: TcpImpl.mesa - last edit:
-- AOF                  3-Mar-88 14:05:23
-- JAV                 19-Nov-87 13:59:40
-- sma                  9-Feb-87 19:35:36
-- Copyright (C) 1985, 1986, 1987, 1988 by Xerox Corporation. All rights reserved.

DIRECTORY
  ArpaBuffer USING [
    AccessHandle, Body, Buffer, DataBytesPerRawBuffer, GetBuffer, ReturnBuffer,
    To, From],
  ArpaFlags USING [doStats],
  ArpaPortInternal USING [AddrMismatch, BuildMasks, GetSubnetMask],
  ArpaPort USING [GetPacket, Handle, minIPHeaderBytes, PutPacket],
  ArpaRouter USING [InternetAddress, GetAddress, unknownInternetAddress, Port],
  ArpaStats USING [Incr],
  ArpaTypes USING [Cardinal32, InternetAddress, Port],
  ByteBlt USING [ByteBlt],
  CommHeap USING [zone],
  CommPriorities USING [receiver],
  CommUtil USING [PulsesToTicks],
  Driver USING [Glitch],
  Environment USING [Block, Byte, bytesPerWord],
  Mopcodes USING [zEXCH],
  Process USING [
    Abort, CancelAbort, DisableTimeout, GetCurrent, Pause,
    SetPriority, SetTimeout],
  Space USING [Interval, Unmap],
  System USING [GetClockPulses, MicrosecondsToPulses],
  TcpOps USING [PSProc, PSProcess, StartTcpStreamProc, StopTcpStreamProc],
  TcpPort USING [DeleteTcpPort, SpecifyRemote],
  TcpStream USING [Closed, CompletionCode, Failed, FailureReason, ListenTimeout,
    Object, Precedence, Security, Suspended, SuspendReason, WaitTime],
  TcpStreamInternal USING [Connection, DataPair, DataList, 
    GetDataLength, Greater, GreaterOrEqual, Less, LessOrEqual, LowHalf, Max,
    MaxSegment, MaxSegmentObj, maxTcpDataBytes, Min, minTcpHeaderBytes, PairObject,
    PrecedenceMatch, Rcvr, SecurityMatch, State, Xmtr, Rexmtr, InitStream,
    SetHeaderFields];
  
TcpImpl: MONITOR
  IMPORTS
    ArpaBuffer, ArpaPortInternal, ArpaPort, ArpaRouter, ArpaStats,
    ByteBlt, CommHeap, CommUtil, Driver, Process, Space, System, TcpPort,
    TcpStream, TcpStreamInternal
  EXPORTS TcpOps, ArpaRouter =
  BEGIN OPEN TcpStreamInternal;
  
  --debugging the @#$%~&* receive list.
  sanityChecking: BOOLEAN = TRUE;
  bpw: NATURAL = Environment.bytesPerWord;
  start: PUBLIC TcpOps.StartTcpStreamProc ← StartTcpStream;
  stop: PUBLIC TcpOps.StopTcpStreamProc ← StopTcpStream;
  
  --for calculations on 32 bit sequence numbers  
  SeqToCard: PROC [ArpaTypes.Cardinal32] RETURNS [LONG CARDINAL] =
    MACHINE CODE {Mopcodes.zEXCH};
    
  CardToSeq: PROC [LONG CARDINAL] RETURNS [ArpaTypes.Cardinal32] =
    MACHINE CODE {Mopcodes.zEXCH};
  
  <<ListSanityCheck: INTERNAL PROC =
    BEGIN
    --Compares the length of the list to the number of nodes in hopes of
    --glitching just after the code that made it bad.
    SELECT rcvr.inuse.length FROM
      0 =>
        IF rcvr.inuse.head = NIL THEN RETURN ELSE Driver.Glitch[ListGarbled];
      ENDCASE =>
        BEGIN
	p: TcpStreamInternal.DataPair ← NIL;
	n: CARDINAL ← 0;
	FOR p ← rcvr.inuse.head, p.next UNTIL p = NIL DO
	  n ← n + 1;
	  ENDLOOP;
	IF n # rcvr.inuse.length THEN Driver.Glitch[ListGarbled];
	END;
    END;  --ListSanityCheck >>
    
    
  Port: PUBLIC --ArpaRouter-- TYPE = ArpaTypes.Port;
  InternetAddress: PUBLIC --ArpaRouter-- TYPE = ArpaTypes.InternetAddress;
  
  maxMsecToPulses: LONG CARDINAL = LAST[LONG CARDINAL] / 1000;

  --max sender can have outstanding.
  advertiseSize: BOOLEAN = TRUE;  --send max size option on conn request.
    
  xmtr: PUBLIC TcpStreamInternal.Xmtr;
  rcvr: PUBLIC TcpStreamInternal.Rcvr;
  rexmtr: PUBLIC TcpStreamInternal.Rexmtr;
  connection: PUBLIC TcpStreamInternal.Connection;

  --for managing blocks of client output.
  output: PUBLIC RECORD [
    b: ArpaBuffer.Buffer,  --current output buffer in use (or NIL).
    finger: CARDINAL,  --bytes consumed from b.
    bufferSize: CARDINAL];  --max bytes permitted in output.
    
  tcpStreamObject: TcpStream.Object;
  
  --Glitches for debugging, some may be eventually subject to a "doDebug" switch.
  IllegalState: ERROR = CODE;
  TrashInRexmtr: ERROR = CODE;
  DataAfterFin: ERROR = CODE;
  ListGarbled: ERROR = CODE;
  PushNotCleared: ERROR = CODE;
     
  
  --Used by procs that notify|wait without state checking required
  Notify: ENTRY PROC [c: LONG POINTER TO CONDITION] = {NOTIFY c↑};
  Wait: ENTRY PROC [c: LONG POINTER TO CONDITION] =
    {ENABLE UNWIND => NULL; WAIT c↑};
    
  --PROCEDURES
  
  Close: PUBLIC PROC =
    BEGIN
    ENABLE UNWIND => NULL;
    SELECT connection.state FROM
      established =>
        {ForceOut[setPush: FALSE, setFin: TRUE, setUrg: FALSE];
	connection.state ← finWait1};
      closeWait =>
        {ForceOut[setPush: FALSE, setFin: TRUE, setUrg: FALSE];
	connection.state ← lastAck};
      suspended => RETURN WITH ERROR TcpStream.Suspended[connection.whySuspended];
      finWait1, finWait2, closing, lastAck, timeWait => ERROR TcpStream.Closed;
      ENDCASE =>  --closed, listen, synSent, synReceived.
        Driver.Glitch[IllegalState];  --How did we get so far?
    END;  --Close
    
    
  ConnReply: PROC [b: ArpaBuffer.Buffer] =
     <<
     PROCESS: RECEIVER
     In synSent state, if the incoming packet is a SYN with an ACK to the
     SYN that we sent, advance to established state.
     >>
     BEGIN
     body: ArpaBuffer.Body = b.arpa;

     SELECT TRUE FROM
       (~body.tcp.ack) => RETURN;  --reply syn must always carry ack
       --Must be a stray - he can't ack something that we haven't even sent.
       LessOrEqual[SeqToCard[body.tcp.acknowledgement], xmtr.iss],        
       Greater[SeqToCard[body.tcp.acknowledgement], xmtr.nextSeq] =>
	 {IF ~body.tcp.rst THEN SendRst[b]; RETURN};
       --duplicate
       Less[SeqToCard[body.tcp.acknowledgement], xmtr.unackedSeq] => RETURN;
       ENDCASE;
     
     --carries an acceptable ack - check for other error conditions.
     SELECT TRUE FROM
       body.tcp.rst => ERROR TcpStream.Failed[remoteReject];
       ~SecurityMatch[body] =>
	 {IF ArpaFlags.doStats THEN ArpaStats.Incr[rejectedSecurityMismatch];
         SendRst[b]; RETURN};
       ~PrecedenceMatch[body] =>
         {IF ArpaFlags.doStats THEN ArpaStats.Incr[rejectedPrecedencedMismatch];
	 SendRst[b]; RETURN};
       (body.tcp.dataOffset > 5) => ProcessOptions[body];
       ENDCASE;
       
     --only good guys get here.
     InitRcvr[body.tcp.sequence];
     ProcessTcpState[body];
     [] ← CheckForData[body];  --may have data for client. 
     SendAck[];  -- send a SYNACK
     END;  --ConnReply
     
      
  ConnRequest: PUBLIC PROC [b: ArpaBuffer.Buffer] =
    BEGIN
    <<
    PROCESS: RECEIVER
    In listen state, if the incoming packet is a syn, advance to synReceived
    state and send a syn/ack.  Also process precedence, security and options. 
    arbitration.
    >>
    body: ArpaBuffer.Body = b.arpa;
    SELECT TRUE FROM
      (body.tcp.rst) =>
        IF ArpaFlags.doStats THEN ArpaStats.Incr[droppedBadRst];
      (body.tcp.ack) =>
        {IF ArpaFlags.doStats THEN ArpaStats.Incr[rejectedBadAck]; SendRst[b]};
      (body.tcp.syn) =>
        BEGIN
        SELECT TRUE FROM
	  ~SecurityMatch[body] =>
            BEGIN
	    IF ArpaFlags.doStats THEN ArpaStats.Incr[rejectedSecurityMismatch];
	    SendRst[b];
	    END;
	  ~PrecedenceMatch[body] =>
	    BEGIN
	    IF ArpaFlags.doStats THEN ArpaStats.Incr[rejectedPrecedencedMismatch];
	    SendRst[b];
	    END;
	  ENDCASE =>  --OK, we like this one.
	    BEGIN
	    mySubnetMask: ArpaRouter.InternetAddress ← ArpaPortInternal.GetSubnetMask[];
	    myInternetAddress: ArpaRouter.InternetAddress ← ArpaRouter.GetAddress[];
	    
	    connection.remoteAddr ← body.ipHeader.source;
	    connection.remotePort ← body.tcp.sourcePort;
	    IF ArpaPortInternal.AddrMismatch[
	      IF ArpaPortInternal.GetSubnetMask[] # ArpaRouter.unknownInternetAddress THEN 
	        ArpaPortInternal.GetSubnetMask[] 
	      ELSE 
	        ArpaPortInternal.BuildMasks[myInternetAddress].netMask, 
	      myInternetAddress, 
	      body.ipHeader.source] 
	      THEN
	        connection.offNet ← TRUE ELSE connection.offNet ← FALSE;
	    TcpPort.SpecifyRemote[connection.port, connection.remoteAddr,
	      connection.remotePort];
	    IF body.tcp.dataOffset > 5 THEN ProcessOptions[body];
	    InitXmtr[];
	    InitRcvr[body.tcp.sequence];
	    connection.state ← synReceived;
	    SendSyn[ack: TRUE];  --send a syn in reply
	    IF CheckForData[body] THEN SendAck[];  --may have data for client.
	    END;
	  END;  --tcp.syn
       ENDCASE => IF ArpaFlags.doStats THEN ArpaStats.Incr[droppedJunkRequest];
     END;  --ConnRequest
     
       
  ConsumeAcked: PUBLIC ENTRY PROC =
    <<
    PROCESS: RETRANSMITTER
    Cleans house on the retransmission list.
    No possibility of signals.
    >>
    BEGIN
    
    UNTIL rexmtr.list.head = NIL DO
      b: ArpaBuffer.Buffer ← rexmtr.list.head;
      body: ArpaBuffer.Body = b.arpa;
      dataLen: CARDINAL ← GetDataLength[body].l;
      IF (dataLen = 0) AND ~body.tcp.fin AND ~body.tcp.syn THEN
        Driver.Glitch[TrashInRexmtr];  --how did an empty buffer get in here? 
      --fins and syn consume a sequence number.
      IF body.tcp.fin OR body.tcp.syn THEN dataLen ← SUCC[dataLen];
      
      IF (LessOrEqual[xmtr.unackedSeq, 
	  SeqToCard[body.tcp.sequence] + PRED[dataLen]]) THEN EXIT;  --not acked.
		    
      <<
      This buffer has been acked, take it out of the list and collect data for
      retransmission interval calculation. Delay is time since buffer was first
      sent.
      >>
      rexmtr.delay ← rexmtr.delay + (System.GetClockPulses[] - b.fo.time);  
      <<
      Why is the "(CARDINAL[b.fo.tries - 1] * rexmtr.interval)" factor in
      here? I'm probably missing something. AOF  2-Jun-87 17:31:39

      rexmtr.delay ← rexmtr.delay + (CARDINAL[b.fo.tries - 1] * rexmtr.interval) 
      	+ (System.GetClockPulses[] - b.fo.time);
      >>
      rexmtr.count ← SUCC[rexmtr.count];  --and the number of participants
      rexmtr.list.head ← ArpaBuffer.From[b.fo.next];
      rexmtr.list.length ← PRED[rexmtr.list.length];
      ArpaBuffer.ReturnBuffer[b];
      ENDLOOP;
    IF rexmtr.list.length > 0 THEN NOTIFY rexmtr.condition  --on to next
    ELSE NOTIFY xmtr.newAllocation;
    END;  --ConsumeAcked
    
    
  FindAddresses: PROC RETURNS [
    localAddr, remoteAddr: InternetAddress,
    localPort, remotePort: ArpaRouter.Port] =
    {RETURN[connection.localAddr, connection.remoteAddr, connection.localPort,
      connection.remotePort]};  --FindAddresses
    
  FlushDataList: PROC[list: LONG POINTER TO TcpStreamInternal.DataList] =
    BEGIN  --Frees list of sequence number pairs.
    UNTIL list.head = NIL DO
      this: TcpStreamInternal.DataPair ← list.head;  --pick up list head
      list.head ← this.next;  --record next entry in list head
      CommHeap.zone.FREE[@this];  --free this enry
      ENDLOOP;
    END;  --FlushDataList
    
    
  FlushRexmtrList: PROC =
    BEGIN  --Returns all buffers on the rexmtr list.
    UNTIL rexmtr.list.head = NIL DO
      b: ArpaBuffer.Buffer ← rexmtr.list.head;  --pick first element off
      rexmtr.list.head ← ArpaBuffer.From[b.fo.next];  --copy link to head
      ArpaBuffer.ReturnBuffer[b];  --free this buffer
      rexmtr.list.length ← PRED[rexmtr.list.length];  --decrement the count
      ENDLOOP;
    END;  --FlushRexmtrList
    
     
  ForceOut: PROC [setPush, setFin, setUrg: BOOLEAN] =
    BEGIN
    <<
    PROCESS: CLIENT SENDING
    Assigns the sequence number and then either sends the packet or waits for
    allocation from remote end.
    >>
    b: ArpaBuffer.Buffer;
    dataLen: CARDINAL ← 0;
       
    CopyOutBuffer: ENTRY PROC = INLINE
      BEGIN
      b ← output.b;
      output.b ← NIL;
      dataLen ← output.finger;
      output.finger ← 0;
      END;  --CopyOutBuffer
    
    AssignSeqAndAssertSend: ENTRY PROC =
      BEGIN
      ENABLE UNWIND => NULL;
      
      --assign the next sequence number to this buffer.
      sequenceNumber: LONG CARDINAL ← xmtr.nextSeq;
      b.arpa.tcp.sequence ← CardToSeq[xmtr.nextSeq];
      
      b.arpa.tcp.fin ← setFin;
	
      DO  --EXITs (success) or ERRORs (failure)
	SELECT connection.state FROM
	  established, closeWait =>
	    BEGIN
	    IF LessOrEqual[sequenceNumber, xmtr.maxSeq] THEN EXIT;
	    <<
	    Copy b so that the allocation probing code in the rexmtr can
	    get a hold of the oldest byte of data.
	    >>
	    xmtr.blocked ← [b, dataLen];   
	    WAIT xmtr.newAllocation;  --wait for something to happen
	    xmtr.blocked ← [NIL, 0];  --clear state info
	    END;
	  suspended =>
	    RETURN WITH ERROR TcpStream.Suspended[connection.whySuspended];
	  finWait1, finWait2, closing, lastAck, timeWait =>
	    RETURN WITH ERROR TcpStream.Closed;  --no send after closing.
	  --listen, synSent, synReceived
	  ENDCASE =>  Driver.Glitch[IllegalState];  --How did we get so far?
        ENDLOOP;
      
      --This is the only proc that updates xmtr.nextSeq.
      xmtr.nextSeq ← xmtr.nextSeq + dataLen;
      --and fins consume a sequence number.
      IF (b.arpa.tcp.fin ← setFin) THEN xmtr.nextSeq ← SUCC[xmtr.nextSeq];
      END;  --AssignSeqAndAssertSend
    
    --start of procedure ForceOut.
    CopyOutBuffer[];  --get the state copied out safely
    IF b = NIL THEN b ← GetOutputBuffer[];
    
    AssignSeqAndAssertSend[! UNWIND => xmtr.blocked ← [NIL, 0]];
    SetHeaderFields[b, dataLen, 0];
    b.arpa.tcp.psh ← setPush;  --copy client's request
    IF setUrg THEN  --dataLen should always be > 0, so PRED[dataLen] is safe.
      {b.arpa.tcp.urg ← TRUE; b.arpa.tcp.urgentPointer ← LOOPHOLE[PRED[dataLen]]};
    SendPacket[b];  --on its way out
    END;  --ForceOut

  
  GetBlock: PUBLIC PROC[block: Environment.Block]
    RETURNS [byteCount: CARDINAL, completionCode: TcpStream.CompletionCode] =
    --PROCESS: CLIENT RECEIVING
    BEGIN
    ENABLE UNWIND => NULL;
    moved: CARDINAL;  --amount transferred.
    remoteClosed: BOOLEAN ← FALSE;  --implies a push of data.
    start, end: CARDINAL;  --starting and ending bytes of data, space relative.
    count: CARDINAL;  --minimum of requested and available bytes.
    ack, out: BOOLEAN;
    
    Locked: ENTRY PROC RETURNS[exit, needsAck: BOOLEAN ← FALSE] =
      BEGIN ENABLE UNWIND => NULL;
      SELECT TRUE FROM
        --did we find a push previous time thru loop?
	(completionCode = pushed) => {exit ← TRUE; RETURN};  --yes
	--is there data pending?
	(rcvr.inuse.head # NIL) AND
	  (rcvr.inuse.head.start = SUCC[rcvr.lastConsumed]) =>  --yes
	  BEGIN
	  rcvrStart: LONG CARDINAL = rcvr.inuse.head.start;
	  rcvrStartMinus1: LONG CARDINAL = PRED[rcvrStart];
	  start ← LowHalf[rcvrStart MOD rcvr.inputSpace.size];
	  count ← LowHalf[Min[  --min of data available & data wanted.
	    rcvr.inuse.head.end - rcvrStartMinus1,
	    block.stopIndexPlusOne - block.startIndex]];
		  
	  --is the pending data within client request AND pushed?
	  SELECT TRUE FROM
	    ~rcvr.pushSig => NULL;     --no push
	    GreaterOrEqual[rcvr.push, rcvrStart] =>
	      IF LessOrEqual[rcvr.push, rcvrStartMinus1 + count] THEN
		BEGIN
		<<count ← LowHalf[Min[count, rcvr.push - rcvrStartMinus1]];>>
		completionCode ← pushed;
		rcvr.pushSig ← FALSE;
		END;
	    ENDCASE => Driver.Glitch[PushNotCleared];
	      
	  --is there an urgent within client requested data?
	  SELECT TRUE FROM
	    ~rcvr.urgSig => NULL;  --no urgent
	    GreaterOrEqual[rcvr.urg, rcvrStart] =>
	      IF LessOrEqual[rcvr.urg, rcvrStartMinus1 + count] THEN
		BEGIN
		count ← LowHalf[Min[count, rcvr.urg - rcvrStartMinus1]];
		completionCode ← endUrgent;  --even if it is also pushed.
		rcvr.urgSig ← FALSE;
		END;
	    ENDCASE;
	      
	  end ← start + count - 1;
	    
	  IF end >= rcvr.inputSpace.size THEN  --retrieving wrapped data?
	    BEGIN
	    moved ← ByteBlt.ByteBlt[block,
	      [rcvr.inputSpace.space.pointer, start, rcvr.inputSpace.size]];
	    moved ← moved + ByteBlt.ByteBlt[
	      [block.blockPointer, block.startIndex + moved,
	      block.stopIndexPlusOne],
	      [rcvr.inputSpace.space.pointer, 0, count - moved]] 
	    END
	  ELSE moved ← ByteBlt.ByteBlt[block,
	    [rcvr.inputSpace.space.pointer, start, SUCC[end]]];
	  block.startIndex ← block.startIndex + moved;
	  byteCount ← byteCount + moved;
	  needsAck ← UpdateInputSpace[moved];
	  rcvr.maxSeq ← rcvr.nextSeq +
	    rcvr.inputSpace.size - (rcvr.nextSeq - rcvr.lastConsumed);
	  END;
	  
	--connection is closing, implying a push at the end of the data.
	(connection.state = closed),
	(connection.state = closeWait),
	(connection.state = closing),
	(connection.state = lastAck),
	(connection.state = timeWait) => {completionCode ← closing; exit ← TRUE};
	
	rcvr.waitTime = 0 => {completionCode ← timeout; exit ← TRUE};
 
	(System.GetClockPulses[] - rcvr.timeout) > rcvr.interval =>
	   {completionCode ← timeout; exit ← TRUE};
	
	--Wait for the rest of data.
	(connection.state = established),
	(connection.state = finWait1),
	(connection.state = finWait2) =>
	  IF (rcvr.inuse.head = NIL) OR
	    (rcvr.inuse.head.start # SUCC[rcvr.lastConsumed]) THEN
	    WAIT rcvr.newInput;
	    
	(connection.state = suspended) =>   
	   RETURN WITH ERROR TcpStream.Suspended[connection.whySuspended];
	 ENDCASE =>  --listen, synSent, synReceived.
	   Driver.Glitch[IllegalState];  --How on earth did we get this far?
      END;  --Locked
    
    --start of GetBlock.
    byteCount ← 0;
    completionCode ← normal;
    rcvr.timeout ← System.GetClockPulses[];
    WHILE block.startIndex < block.stopIndexPlusOne DO  
      [out, ack] ← Locked[];  --can't call SendAck while holding monitor.
      IF ack THEN SendAck[];  --but we can right afterwards
      IF out THEN EXIT;  --and maybe we can get out of here too
      ENDLOOP;
    END;  --GetBlock
    
    
  GetOutputBuffer: PUBLIC PROC RETURNS [b: ArpaBuffer.Buffer] =
    BEGIN
    body: ArpaBuffer.Body;    
    CleanupClientProcess: ENTRY PROC =
      BEGIN
      SELECT TRUE FROM
        (xmtr.clientProcess = NIL) => NULL;  --multiple clients in BufferMgr??!!
	(connection.state = suspended) =>  --we're suspended, we did the abort.
	{Process.CancelAbort[xmtr.clientProcess]; xmtr.clientProcess ← NIL};
	ENDCASE => xmtr.clientProcess ← NIL; 
      END;  --CleanupClientProcess
      
    IF (connection.state = suspended) THEN GOTO returnSuspended;
      xmtr.clientProcess ← Process.GetCurrent[];  --capture client process
      
    BEGIN
    <<
    Don't let greedy client use all the send buffers, else we won't be
    able to probe for allocation if we need to... Doing a Pause is slow,
    but at this point, the remote is so slow anyway, anything we do is in
    the noise.
    >>
    ENABLE
      BEGIN
      UNWIND => xmtr.clientProcess ← NIL;
      ABORTED => IF connection.state = suspended THEN GOTO suspended;
      END;
    WHILE connection.pool.sendInUse = (PRED[connection.pool.send]) DO
      Process.Pause[CommUtil.PulsesToTicks[[rexmtr.interval]]]; ENDLOOP;
    b ← ArpaBuffer.GetBuffer[
      connection.pool, send, TRUE, connection.family.maxBufferSize];
    EXITS suspended => {xmtr.clientProcess ← NIL; GOTO returnSuspended};
    END;  --protected code.
    
    CleanupClientProcess[];
    IF connection.state = suspended THEN
      {ArpaBuffer.ReturnBuffer[b]; GOTO returnSuspended};
      
    --now that we (finally) have a buffer, set defaults for fields.
    body ← b.arpa;
    body.tcp.urg ← body.tcp.psh ← body.tcp.rst ← body.tcp.syn ←
      body.tcp.fin ← FALSE;
    body.tcp.ack ← TRUE;
    body.tcp.urgentPointer ← LOOPHOLE[0];
    b.requeueProcedure ← RequeueProc;  --put buffer in retransmit list.
    EXITS
      returnSuspended =>
        RETURN WITH ERROR TcpStream.Suspended[connection.whySuspended];
    END;  --GetOutputBuffer
    
    
  GetSystemBuffer: PROC[setAck: BOOLEAN] RETURNS [b: ArpaBuffer.Buffer] =
    BEGIN
    --gets small buffer for sending packets that will not be retransmitted.
    body: ArpaBuffer.Body;
    n: NATURAL = ArpaPort.minIPHeaderBytes + minTcpHeaderBytes;
    body ← (b ← ArpaBuffer.GetBuffer[connection.pool, send, TRUE, n]).arpa;
    body.tcp.urg ← body.tcp.psh ← body.tcp.rst ← body.tcp.syn ←
      body.tcp.fin ← FALSE;
    body.tcp.ack ← setAck;
    body.tcp.urgentPointer ← LOOPHOLE[0];
    body.tcp.sequence ← CardToSeq[xmtr.nextSeq];
    END;  --GetSystemBuffer
    
    
  InitRcvr: ENTRY PROC [sequence: ArpaTypes.Cardinal32] =
    BEGIN
    rcvr.irs ← SeqToCard[sequence];
    rcvr.lastConsumed ← rcvr.push ←
      rcvr.urg ← rcvr.fin ← rcvr.irs;  --just to get things started.
    rcvr.nextSeq ← SUCC[rcvr.irs];
    rcvr.maxSeq ← rcvr.nextSeq + PRED[rcvr.inputSpace.size];
    END;  --InitRcvr
    
  
  InitXmtr: PROC =
    BEGIN
    
    InitXmtrLocked: ENTRY PROC =
      BEGIN
      xmtr.iss ← System.GetClockPulses[];  
      xmtr.nextSeq ← SUCC[xmtr.iss];
      xmtr.unackedSeq ← xmtr.iss;
      xmtr.maxSeq ← xmtr.unackedSeq + xmtr.maxAlloc;
      END;  --InitXmtrLocked
    
    InitXmtrLocked[];
    rexmtr.process ← FORK Retransmitter[];
    END;  --InitXmtr

 
  InsertRexmt: PUBLIC ENTRY PROC [b: ArpaBuffer.Buffer] =
    <<
    PROCESS: RETRANSMITTER
    Puts the buffer in the retransmission list.  The list is ordered.
    >>
    BEGIN
    body: ArpaBuffer.Body ← b.arpa;
    prev, bList: ArpaBuffer.Buffer ← NIL;
    dataLen: CARDINAL ← GetDataLength[body].l;
    
    --has this one already been acked?  Syns and fins may carry no data,
    --but still must be put in the retransmission list.
    SELECT dataLen FROM
      0 =>
        IF Greater[xmtr.unackedSeq, SeqToCard[body.tcp.sequence]] THEN
	  {ArpaBuffer.ReturnBuffer[b]; RETURN};
      ENDCASE =>
        IF (Greater[xmtr.unackedSeq, SeqToCard[body.tcp.sequence] + 
          dataLen - 1]) THEN {ArpaBuffer.ReturnBuffer[b]; RETURN};
      
    b.fo.next ← NIL;
    rexmtr.list.length ← SUCC[rexmtr.list.length];
    IF (rexmtr.list.head = NIL) THEN {rexmtr.list.head ← b}
    ELSE
      FOR bList ← rexmtr.list.head, ArpaBuffer.From[bList.fo.next]
        UNTIL bList = NIL DO
	SELECT TRUE FROM 
	  (Less[SeqToCard[body.tcp.sequence],
	    SeqToCard[bList.arpa.tcp.sequence]]) =>
	    BEGIN
	    IF prev = NIL THEN
	      BEGIN
	      b.fo.next ← ArpaBuffer.To[rexmtr.list.head];
	      rexmtr.list.head ← b;
	      END
	    ELSE
	      BEGIN
	      b.fo.next ← prev.fo.next;
	      prev.fo.next ← ArpaBuffer.To[b];
	      END;
	    EXIT;
	    END;
	  (body.tcp.sequence = bList.arpa.tcp.sequence) =>
	    Driver.Glitch[TrashInRexmtr];
	  ENDCASE;
	prev ← bList;
	REPEAT FINISHED =>
	  BEGIN
	  b.fo.next ← prev.fo.next;
	  prev.fo.next ← ArpaBuffer.To[b];
	  END;
	ENDLOOP;
    END;  --InsertRexmt
    
    
  New: INTERNAL PROC [
    link: TcpStreamInternal.DataPair, startSeq, endSeq: LONG CARDINAL]
    RETURNS [new: TcpStreamInternal.DataPair] =
    BEGIN
    <<
    Try to allocate a DataObject from the available list. If that fails,
    then allocate one from the heap. Once we get running, they should all
    come from the the avail list rather then the heap. Initially it is
    expected that both the inuse and avail lists are empty.
    >>
    IF rcvr.avail.length # 0 THEN
      BEGIN
      new ← rcvr.avail.head;
      rcvr.avail.head ← new.next;
      rcvr.avail.length ← PRED[rcvr.avail.length];
      END
    ELSE new ← CommHeap.zone.NEW[TcpStreamInternal.PairObject];
    new↑ ← [startSeq, endSeq, NIL];
    --IF sanityChecking THEN ListSanityCheck[];
    IF link = NIL THEN {new.next ← rcvr.inuse.head; rcvr.inuse.head ← new}
    ELSE {new.next ← link.next; link.next ← new};
    rcvr.inuse.length ← SUCC[rcvr.inuse.length];
    --IF sanityChecking THEN ListSanityCheck[];
    END;  --New
      
      
  CheckForData: ENTRY PROC [body: ArpaBuffer.Body] RETURNS [BOOLEAN] =
    BEGIN
    <<
    Puts the data into the input space and updates the list of data pointers
    into that space.  Also does fin processing.
    >>
      
    CheckForClose: INTERNAL PROC =
      BEGIN
      <<
      If we have received a fin, check if we can consume it and modify the
      state of the close handshake.
      >>
      SELECT TRUE FROM
	(~rcvr.finSig) => NULL;  --no fin received
	(rcvr.fin = rcvr.nextSeq) =>
	  BEGIN
	  --we have all the data before it.
	  rcvr.nextSeq ← SUCC[rcvr.fin];  --consume fin's sequence number.
	  SELECT connection.state FROM
	    established => connection.state ← closeWait;
	    finWait1 => connection.state ← closing;
	    finWait2 =>
	      BEGIN
	      xmtr.timeWaitStart ← System.GetClockPulses[];
	      connection.state ← timeWait;
	      END;
	    ENDCASE;
	  NOTIFY rcvr.newInput;
	  END;
	(Less[rcvr.fin, PRED[rcvr.nextSeq]]) => Driver.Glitch[DataAfterFin];
	ENDCASE;  --fin was out of sequence - wait for other data to arrive.
      END;  --CheckForClose
    
    --start of CheckForData
    dataLen: CARDINAL;
    data: LONG POINTER;
    startSeq, endSeq: LONG CARDINAL;  --first and last seq of new data.
    spaceAvail: LONG CARDINAL =
      rcvr.inputSpace.size - (rcvr.nextSeq - 1 - rcvr.lastConsumed);
    [dataLen, data] ← GetDataLength[body];
    IF dataLen = 0 THEN {CheckForClose[]; RETURN[FALSE]};
    --empty packet may carry fin.
    
    --set initial values.
    startSeq ← SeqToCard[body.tcp.sequence];
    endSeq ← startSeq + dataLen - 1;
    
    <<
    We assume size is greater than the greatest dataLen.  The amount of data
    we will move is the maximum of the amount we want to move and the amount
    we have space to move.
    >>
      
    startSeq ← Max[startSeq, rcvr.nextSeq];
    dataLen ← LowHalf[endSeq - startSeq + 1];  --adjust for new start.
    dataLen ← LowHalf[Min[dataLen, spaceAvail]];  --again for space avail
    endSeq ← startSeq + dataLen - 1;  --and use the result to set new endpoint.
    
    IF dataLen = 0 THEN RETURN[FALSE];  --no data, no more to do
    --ack will be generated when client consumes at least 1/2 of data.
    IF dataLen >= spaceAvail THEN rcvr.remoteBlocked ← TRUE;  
    InsertData[body, data, startSeq, endSeq]; 
    CheckForClose[];
    RETURN[TRUE]; 
    END;  --CheckForData
    
    
  InsertData: INTERNAL PROC [
    body: ArpaBuffer.Body, data: LONG POINTER,
    startSeq, endSeq: LONG CARDINAL] =
    BEGIN
    <<
    This is very fragile code.  You mess with it without knowing exactly what
    is going on, you deserve whatever you get!  It's also probably more
    complex than it needs to be - IP reassembly algorithms are simpler, and may
    be usable here.
    >>
       
    moved, dataLen: CARDINAL;
    start, end: CARDINAL;  --space relative.
    p, q, prev, tmp: TcpStreamInternal.DataPair ← NIL;
    bStart: CARDINAL;  --starting point of data in the buffer to be moved.
    
    --set the ends to reflect the data we can actually move.
    dataLen ← LowHalf[endSeq - startSeq] + 1;
    start ← LowHalf[startSeq MOD rcvr.inputSpace.size];
    end ← start + dataLen - 1;
    
    --Record the starting point in the buffer of the data to be moved.
    bStart ← LowHalf[startSeq - SeqToCard[body.tcp.sequence]];
    
    IF end >= rcvr.inputSpace.size THEN  --is data going to wrap?
      BEGIN
      moved ← ByteBlt.ByteBlt[
        to: [rcvr.inputSpace.space.pointer, start, rcvr.inputSpace.size],
	from: [data, bStart, rcvr.inputSpace.size]];
      moved ← moved + ByteBlt.ByteBlt[
        to: [rcvr.inputSpace.space.pointer, 0, dataLen - moved],
	from: [data, bStart + moved, rcvr.inputSpace.size]];
      END
    ELSE
      moved ← ByteBlt.ByteBlt[
        to: [rcvr.inputSpace.space.pointer, start, SUCC[end]],
        from: [data, bStart, rcvr.inputSpace.size]];
      
    --Update the sequence number list.
    FOR p ← rcvr.inuse.head, p.next UNTIL p = NIL DO  --look for startSeq.
      SELECT TRUE FROM
        --is the starting seq within (or right on either end of) this node?
	(GreaterOrEqual[startSeq, p.start-1] AND
	  LessOrEqual[startSeq, p.end+1]) =>
	  {p.end ← Max[endSeq, p.end]; p.start ← Min[startSeq, p.start]; EXIT};
	  
	--is starting seq going to create a disjoint node before this node?
	(Less[startSeq, p.start]) =>
	  {p ← New[prev, startSeq, endSeq]; EXIT};
	  
	ENDCASE;  -- (startSeq > p.end) means keep searching
      prev ← p;
      REPEAT FINISHED =>
        --must be disjoint node at the end of the list.
	p ← New[prev, startSeq, endSeq];
      ENDLOOP;
      
    --now compact the list so that there are no adjacent or overlapping seqments.
    FOR q ← p.next, q.next UNTIL q = NIL DO
      IF Less[endSeq, q.start-1] THEN EXIT;
      p.end ← Max[p.end, q.end];
      --IF sanityChecking THEN ListSanityCheck[];
      tmp ← q; p.next ← q.next; q ← p;  --pull element from middle
      rcvr.inuse.length ← PRED[rcvr.inuse.length];
      tmp.next ← rcvr.avail.head; rcvr.avail.head ← tmp;  --put in avail list
      rcvr.avail.length ← SUCC[rcvr.avail.length];
      --IF sanityChecking THEN ListSanityCheck[];
      ENDLOOP;
      
    IF GreaterOrEqual[rcvr.nextSeq, rcvr.inuse.head.start] AND
      LessOrEqual[rcvr.nextSeq, rcvr.inuse.head.end] THEN
	{rcvr.nextSeq ← SUCC[rcvr.inuse.head.end]; NOTIFY rcvr.newInput};
    END;  --InsertData
    

  ProcessOptions: PROC [body: ArpaBuffer.Body] =
    BEGIN
    m: TcpStreamInternal.MaxSegment = LOOPHOLE[@body.tcp.options];
    --**the option type should be a constant in TcpStreamInternal.
    IF m.type = 2 THEN xmtr.maxTcpBytes ← m.maxSize;
    END;  --ProcessOptions
    
    
  ProcessTcpState: PUBLIC PROC [body: ArpaBuffer.Body] =
  --PROCESS: RECEIVER
    BEGIN
    ack: LONG CARDINAL ← SeqToCard[body.tcp.acknowledgement];
    
    IF Greater[ack, xmtr.unackedSeq] THEN  --process the acknowledgement.
      BEGIN  --process the ack and do the necessary state changes.
      SELECT connection.state FROM
        synReceived, synSent =>
          IF Greater[ack, xmtr.iss] THEN  --notify any waiters
	    {connection.state ← established; Notify[@connection.isEstablished]};
	finWait1 =>
	  IF (ack = xmtr.nextSeq) AND (~body.tcp.fin) THEN
	    connection.state ← finWait2;
	closing =>
	  IF (ack = xmtr.nextSeq) THEN
	    {xmtr.timeWaitStart ← System.GetClockPulses[];
	    connection.state ← timeWait};
	lastAck => IF (ack = xmtr.nextSeq) THEN connection.state ← closed;
	ENDCASE;
      xmtr.unackedSeq ← ack;  --This is the only place unackedSeq gets updated.
      ConsumeAcked[];
      END;
    
    xmtr.maxSeq ← xmtr.unackedSeq + body.tcp.window; 
    --offered window minus outstanding data is usable window.
    IF (body.tcp.window - (xmtr.nextSeq - xmtr.unackedSeq)) >
      body.tcp.window / 4 THEN
      BEGIN
      IF rexmtr.list.head # NIL THEN Notify[@rexmtr.condition]
      ELSE Notify[@xmtr.newAllocation];
      END;
    END;  --ProcessTcpState
    
    
  RcvdTcpPkt: PROC [b: ArpaBuffer.Buffer] =
    <<
    PROCESS: RECEIVER
    Parses incoming packets, no mean feat.  There is some redundancy in ConnReq
    and ConnRep that should be looked at.
    >>
    BEGIN
    --so clients don't lose data when causing unwind after Close.
    ENABLE UNWIND =>
      {IF CheckForData[b.arpa] THEN SendAck[]; ArpaBuffer.ReturnBuffer[b]};
    body: ArpaBuffer.Body = b.arpa;
    len: CARDINAL ← GetDataLength[body].l;
    
    SELECT TRUE FROM
      connection.state = closed,             --client aborted by calling delete
      connection.state = suspended => NULL;  --or stream has been suspended.
      body.tcp.syn =>  --syn packet
        BEGIN
	IF ArpaFlags.doStats THEN ArpaStats.Incr[synsRcvd];
	SELECT connection.state FROM
	  listen => ConnRequest[b];
	  synSent => ConnReply[b];
	  ENDCASE =>  --should not get a syn in already synchronized states.
	    BEGIN
	    IF ~ValidSeq[body, len] THEN SendAck[]
	    ELSE
	      {SendRst[b]; SuspendStream[reset, remoteReject];
	      IF ArpaFlags.doStats THEN ArpaStats.Incr[badSyn]};
	    END;
	END;
      (body.tcp.sourcePort # connection.remotePort) =>
        --Stray packet arrived on a listening conn (remotePort not yet estab.)
	{SendRst[b];
	IF ArpaFlags.doStats THEN ArpaStats.Incr[badSourcePort]};
      ~ValidSeq[body, len] =>
	{SELECT TRUE FROM
	  body.tcp.rst => NULL;  --stray reset.
	  (connection.state = listen) => SendRst[b];  --stray packet (not syn).
	  (connection.state = synSent),
	  (connection.state = synReceived) =>  --stray packet, reset sender.
	    IF ~ValidAck[b] THEN SendRst[b];
	  (len = 0) AND (~body.tcp.fin) => NULL;  --don't ack acks.
	  (connection.state = timeWait) =>
	    {SendAck[]; xmtr.timeWaitStart ← System.GetClockPulses[]};
	  ENDCASE => SendAck[];
	IF ArpaFlags.doStats THEN ArpaStats.Incr[seqOutOfRange]};
      body.tcp.rst =>  --valid seq with rst.
	{SuspendStream[reset, remoteReject];
	IF ArpaFlags.doStats THEN ArpaStats.Incr[resetsReceived]};
      ~SecurityMatch[body] =>
        {SendRst[b];
	SELECT connection.state FROM
	  established, finWait1, finWait2, closeWait, closing, lastAck,
	    timeWait => SuspendStream[securityMismatch, securityMismatch];
	  ENDCASE;  --closed, listen, synSent, synReceived;
	IF ArpaFlags.doStats THEN ArpaStats.Incr[badSecurity]};
      ~PrecedenceMatch[body] =>
	{SendRst[b];
	SELECT connection.state FROM
	  established, finWait1, finWait2, closeWait, closing, lastAck,
	    timeWait => SuspendStream[precedenceMismatch, precedenceMismatch];
	  ENDCASE;  --closed, listen, synSent, synReceived;
	IF ArpaFlags.doStats THEN ArpaStats.Incr[badPrecedence]};
      ~body.tcp.ack =>  --every packet except original syn should carry ack.
	IF ArpaFlags.doStats THEN ArpaStats.Incr[badAck];
      body.tcp.fin =>
	SELECT connection.state FROM
	  established,
	  finWait1,
	  finWait2 => 
	    BEGIN
	    rcvr.finSig ← TRUE;
	    rcvr.fin ← SeqToCard[body.tcp.sequence] + len;
	    [] ← CheckForData[body];
	    SendAck[];
	    END;
	  ENDCASE =>
	    IF ArpaFlags.doStats THEN ArpaStats.Incr[badFin];  --**Reset here?
	    
      body.tcp.urg =>  --mark urgent pointer, earlier urgents are superceded
        BEGIN
	rcvr.urg ← Max[
	  rcvr.urg, SeqToCard[body.tcp.sequence] +
	  LOOPHOLE[body.tcp.urgentPointer, CARDINAL]];
	IF ~rcvr.urgSig THEN  --multiple urgents are merged.
          {rcvr.urgSig ← TRUE; Notify[@rcvr.urgArrived]};
	[] ← CheckForData[body];
	SendAck[];
	END;
	
      body.tcp.psh =>  --mark pushed data, earlier pushes are superceded.
        BEGIN
	lastSeq: LONG CARDINAL ← SeqToCard[body.tcp.sequence];
	IF len # 0 THEN lastSeq ← lastSeq + len - 1;
	rcvr.push ← Max[lastSeq, rcvr.push];
	rcvr.pushSig ← TRUE;
        [] ← CheckForData[body];
	SendAck[];
        END;
      ENDCASE =>  --ah, the vanilla data packet!
	IF CheckForData[body] THEN SendAck[];
	
    ArpaBuffer.ReturnBuffer[b];  --This is where receive buffers get returned.
    END;  --RcvdTcpPkt
    
    
  Receiver: PUBLIC PROC RETURNS [TcpOps.PSProc] =
  --PROCESS:  THIS IS THE RECEIVER
    BEGIN
    b: ArpaBuffer.Buffer;
    body: ArpaBuffer.Body;
    Process.SetPriority[CommPriorities.receiver];
    UNTIL connection.pleaseStop DO
      ENABLE ABORTED => EXIT;
      --get next packet from socket queue
      body ← (b ← ArpaPort.GetPacket[connection.port]).arpa;
      --is it worth it?
      
      SELECT TRUE FROM
        (body.ipHeader.protocol = tcp) =>
	  BEGIN
	  IF ArpaFlags.doStats THEN ArpaStats.Incr[tcpsRcvd];
	  SELECT TRUE FROM
	    (body.ipHeader.length < TcpStreamInternal.minTcpHeaderBytes +
	      ArpaPort.minIPHeaderBytes) =>
	      {IF ArpaFlags.doStats THEN ArpaStats.Incr[pktTooShort];
	      ArpaBuffer.ReturnBuffer[b]};
	    (connection.remoteAddr = ArpaRouter.unknownInternetAddress) =>
	      RcvdTcpPkt[b];  --this could be the first syn.
	    ArpaPortInternal.AddrMismatch[
	      ArpaPortInternal.BuildMasks[body.ipHeader.destination].hostMask,
	      body.ipHeader.source, connection.remoteAddr] =>
	      {IF ArpaFlags.doStats THEN ArpaStats.Incr[badSource];
	      ArpaBuffer.ReturnBuffer[b]};
	    ENDCASE => RcvdTcpPkt[b];
	  END;  --protocol = tcp
	(body.ipHeader.protocol = icmp) =>
	  BEGIN 
	  SELECT body.icmp.type FROM
	    unreachable =>
	      {SuspendStream[noRouteToDestination, noRouteToDestination];
	      IF ArpaFlags.doStats THEN ArpaStats.Incr[icmpUnreachable]};
	    ENDCASE => NULL;
	  ArpaBuffer.ReturnBuffer[b];
	  END;
	ENDCASE =>
	  BEGIN
          IF ArpaFlags.doStats THEN ArpaStats.Incr[badProtocol];
	  --****icmp error packet here?
	  ArpaBuffer.ReturnBuffer[b];
	  END;
      
      ENDLOOP;
    RETURN[LOOPHOLE[Receiver]];
    END;  --Receiver
    
    
  RequeueProc: PUBLIC PROC [b: ArpaBuffer.Buffer] =
    <<
    PROCESS: BUFFER MANAGER
    Puts the buffer in the retransmission list.
    >>
    BEGIN
    SELECT TRUE FROM
      --ARP should find this for us on next try.
      (b.fo.status = invalidDestAddr) => {InsertRexmt[b]; ConsumeAcked[]};
      (b.fo.status ~IN[pending..aborted]) =>
	BEGIN
	SuspendStream[noRouteToDestination, SELECT b.fo.status FROM
	  noTranslationForDestination => noTranslationForDestination,
	  noAnswerOrBusy => noAnswerOrBusy,
	  circuitInUse => circuitInUse,
	  circuitNotReady => circuitNotReady,
	  dialerHardwareProblem => circuitNotReady,
	  noDialingHardware => noDialingHardware,
	  --noRouteToNetwork, hardwareProblem => noRouteToDestination,
	  ENDCASE => noRouteToDestination];
	ArpaBuffer.ReturnBuffer[b];  --don't keep this one - we're dead
	END;
      --put it on the retransmission list.
      ENDCASE => {InsertRexmt[b]; ConsumeAcked[]}; --poke the rexmt list cleaner.
    END;  --RequeueProc
    
      
  Retransmitter: PUBLIC PROC RETURNS [TcpOps.PSProc] =
  --PROCESS: THIS IS THE RETRANSMITTER
    BEGIN
    time: LONG CARDINAL;
    b: ArpaBuffer.Buffer;
    
    ExtractHead: ENTRY PROC RETURNS [BOOLEAN ← TRUE] =
      BEGIN
      --stops access to the head of the list (which is being rexmittd).
      IF rexmtr.list.head = NIL THEN RETURN[FALSE];  --make sure it's still there.
      rexmtr.list.head ← ArpaBuffer.From[rexmtr.list.head.fo.next];
      rexmtr.list.length ← PRED[rexmtr.list.length];
      END;  --ExtractHead

    UNTIL connection.pleaseStop DO
      ENABLE ABORTED => EXIT;
      BEGIN
      time ← System.GetClockPulses[];
      
      SELECT TRUE FROM
	(connection.state = suspended) => GOTO suspended;
	
	--anything to retransmit?
	((b ← rexmtr.list.head) # NIL) =>
	  SELECT TRUE FROM
	    ((time - b.fo.time) < MIN[(rexmtr.interval * CARDINAL[b.fo.tries]),
	      rexmtr.ceiling]) => NULL; --too soon
	    ((b.fo.tries ← SUCC[b.fo.tries]) > rexmtr.giveUp) => GOTO suspend;
	    ENDCASE =>  --data retransmission.
	      IF ExtractHead[] THEN --ConsumeAcked may try to return it.
		BEGIN
		b.fo.time ← System.GetClockPulses[];  --record packet send time
		SendPacket[b];  --retransmit the packet
		IF ArpaFlags.doStats THEN ArpaStats.Incr[retransmitted];
		END;
	  <<
	  Remote has closed window or we have missed his window update - is
	  it time to probe for allocation?
	  >>
	  (Greater[xmtr.nextSeq, xmtr.maxSeq]) AND
	  (System.GetClockPulses[] - xmtr.lastSent) > (rexmtr.interval * 2) =>
	    SendAllocProbe[];
	 ENDCASE;
	
      --time to recompute retransmission interval?
      IF ((time - rexmtr.calculation) > rexmtr.calculationInterval) THEN
	BEGIN
	rexmtr.calculation ← time;  --reset start of interval
	--retransmission interval recalculation
	IF (rexmtr.count # 0) THEN
	  BEGIN
	  --recalculate retransmission interval
	  --REUSES LOCAL VARIABLE 'time'
	  time ← rexmtr.delay / rexmtr.count;
	  time ← time + (time / 2);  --don't get too close
	  rexmtr.delay ← 0; rexmtr.count ← 0;
	  --weight old interval more than new (3 to 1)
	  time ← ((rexmtr.interval * 3) + time) / 4;
	  --interval is limited both low and high
	  rexmtr.interval ← MAX[rexmtr.floor, MIN[rexmtr.ceiling, time]];
	  END;
	END;  --recompute interval
	
      EXITS
	suspended => NULL;
	suspend => SuspendStream[transmissionTimeout, timeout];
      END;  --transmitter not blocked
	
      Wait[@rexmtr.condition];
      ENDLOOP;
    RETURN[LOOPHOLE[Retransmitter]];
    END;  --Retransmitter 
    
    
  SendAck: PROC [] =
    BEGIN 
    b: ArpaBuffer.Buffer;
    --why send an empty ack when we have something real to send?
    IF rexmtr.list.head # NIL THEN {Notify[@rexmtr.condition]; RETURN};
    b ← GetSystemBuffer[TRUE];  --he sets b.arpa.tcp.sequence
    SetHeaderFields[b, 0, 0];
    SendPacket[b];
    --**IF ArpaFlags.doStats THEN ArpaStats.Incr[acksSent];
    END;  --SendAck
    
  SendAllocProbe: ENTRY PROC =
    BEGIN
    <<
    Sends a packet probing the remote to open his allocation window for us.
    This is the only case where a data packet is sent using a buffer obtained
    from GetSystemBuffer instead of GetOutputBuffer, as we want to always be
    able to obtain a buffer for a probe, and we don't want it put in the
    rexmtr table.
    >> 
       
    b: ArpaBuffer.Buffer = GetSystemBuffer[TRUE];
    body: ArpaBuffer.Body = b.arpa;
    blocked: ArpaBuffer.Body = xmtr.blocked.b.arpa;
    body.tcp.sequence ← blocked.tcp.sequence;
    SELECT TRUE FROM
      (xmtr.blocked.dataLen > 0) =>
        BEGIN
	body.tcp.psh ← TRUE;  --maybe this will make him answer.
	body.tcp.bytes[0] ← blocked.tcp.bytes[0];
        SetHeaderFields[b, 1, 0];
	END;
      (blocked.tcp.fin) =>
        BEGIN
	body.tcp.fin ← TRUE;
	SetHeaderFields[b, 0, 0];
	END;
      ENDCASE =>  --heaven knows what this is.
        BEGIN
	ArpaBuffer.ReturnBuffer[b];
	RETURN;
	END;
    
    SendPacket[b];
    END;  --SendAllocProbe
    
    
  SendPacket: PUBLIC PROC [b: ArpaBuffer.Buffer] =
    BEGIN
    <<
    PROCESS: RETRANSMITTER, CLIENT SENDING
    Sets the current state information, and transmits.  (Or retransmits.)
    >>
    --never send a packet without current state information
    body: ArpaBuffer.Body = b.arpa;
    body.tcp.acknowledgement ← CardToSeq[rcvr.nextSeq];
    body.tcp.window ← LowHalf[rcvr.maxSeq - rcvr.nextSeq] + 1;
    ArpaPort.PutPacket[connection.port, b];
    xmtr.lastSent ← System.GetClockPulses[];  --this is sufficient for any ack.
    END;  --SendPacket
    
    
  SendRst: PUBLIC PROC [offender: ArpaBuffer.Buffer] =
    BEGIN
    <<
    Sends a reset packet in response to the offending packet.  If
    offender is nil, then the reset is a user-initiated abort.
    >>
    off: ArpaBuffer.Body;
    b: ArpaBuffer.Buffer ← GetSystemBuffer[TRUE];
    body: ArpaBuffer.Body = b.arpa;
    
    AssignSeq: ENTRY PROC =
      {body.tcp.sequence ← CardToSeq[xmtr.nextSeq]};
      
    SetHeaderFields[b, 0, 0];
    body.tcp.rst ← TRUE;  --must be why we're here
    body.tcp.window ← 0;  --This is probably silly.
    SELECT TRUE FROM
      (offender = NIL) =>
        {AssignSeq[]; body.tcp.acknowledgement ← CardToSeq[rcvr.nextSeq]};
      ((off ← offender.arpa).tcp.ack) =>
	{body.tcp.sequence ← off.tcp.acknowledgement; body.tcp.ack ← FALSE}
      ENDCASE =>
	BEGIN
	ack: LONG CARDINAL ←
	  SeqToCard[off.tcp.sequence] + TcpStreamInternal.GetDataLength[off].l;
	body.tcp.sequence ← [0, 0];
	body.tcp.acknowledgement ← CardToSeq[ack];
	END;
    ArpaPort.PutPacket[connection.port, b];
    IF ArpaFlags.doStats THEN ArpaStats.Incr[resetsSent];
    END;  --SendRst
    
    
  SendSyn: PUBLIC PROC [ack: BOOLEAN] =
    BEGIN 
    <<
    Send a syn packet, either connection request or response.  The only
    difference is the acknowledge fields and the maxSize option.
    >>
    optionLen: CARDINAL ← 0;
    b: ArpaBuffer.Buffer = GetOutputBuffer[];  --sets ack field by default.
    body: ArpaBuffer.Body = b.arpa;
    IF ack THEN body.tcp.acknowledgement ← CardToSeq[rcvr.nextSeq]
    ELSE body.tcp.ack ← FALSE;
    IF advertiseSize THEN
      BEGIN
      --ends on a 32-bit boundry, so no zero padding.
      n: CARDINAL ← (ArpaBuffer.DataBytesPerRawBuffer[b]) -
        ArpaPort.minIPHeaderBytes - TcpStreamInternal.minTcpHeaderBytes - 18;
		-- 18 is ethernet encap.
      m: TcpStreamInternal.MaxSegment ← LOOPHOLE[@body.tcp.options];
      m↑ ← [2, 4, n];  --and what the hell is this?
      optionLen ← SIZE[TcpStreamInternal.MaxSegmentObj] * bpw;
      rcvr.maxTcpBytes ← IF connection.offNet THEN MIN[n, TcpStreamInternal.maxTcpDataBytes] ELSE n;
      END;
    body.tcp.syn ← TRUE;  --this is why we're here
    SetHeaderFields[b, 0, optionLen];
    body.tcp.sequence ← CardToSeq[xmtr.iss];
    body.tcp.window ← rcvr.inputSpace.size;
    ArpaPort.PutPacket[connection.port, b];
    IF ArpaFlags.doStats THEN ArpaStats.Incr[synsSent];
    END;  --SendSyn

  SetWaitTime: ENTRY PROC [timeout: TcpStream.WaitTime] =
    BEGIN  --no possibility of signals
    --time IN[maxMsecToPulses..INFINITY] => silly client NEVER wake up!
    rcvr.waitTime ← timeout;
    rcvr.interval ← IF timeout > maxMsecToPulses THEN LAST[LONG CARDINAL]
      ELSE System.MicrosecondsToPulses[timeout*1000];
    IF rcvr.interval = LAST[LONG CARDINAL] THEN
      Process.DisableTimeout[@rcvr.newInput]
    ELSE Process.SetTimeout[
      @rcvr.newInput, CommUtil.PulsesToTicks[[rcvr.interval]]];
    END;  --SetWaitTime

  StartTcpStream: PUBLIC TcpOps.StartTcpStreamProc =
    <<
    PROC[
      local, remote: ArpaRouter.InternetAddress,
      localPort, remotePort: ArpaRouter.Port,
      timeout: TcpStream.WaitTime, precedence: TcpStream.Precedence,
      security: TcpStream.Security, options: Environment.Block,
      establish: BOOLEAN, gf: LONG POINTER --TO FRAME[TcpImpl]--] RETURNS [
      TcpStream.Handle, ArpaRouter.InternetAddress, ArpaRouter.Port];
      Process is the client's.
    >>
    BEGIN
    time: LONG CARDINAL ← System.GetClockPulses[];
    
    --Initialize stream object.
    tcpStreamObject ← [
      destroy: NIL,  --done from the outside(Mgr)
      put: PutBlock,
      get: GetBlock,
      waitForUrgent: WaitForUrgent,
      close: Close,
      setWaitTime: SetWaitTime,
      findAddresses: FindAddresses];
      
    InitStream[
      local: local, remote: remote, localPort: localPort,
      remotePort: remotePort, timeout: timeout, precedence: precedence,
      security: security, options: options];
     
    output.b ← NIL;
    output.bufferSize ← 0;
    output.finger ← 0; 
    SetWaitTime[timeout];
    
    --and now create the stream (or listener).
    time ← System.GetClockPulses[];
    SELECT TRUE FROM
    
      (connection.state = closed) AND establish  =>  --active create
        BEGIN
        rcvr.process ← FORK Receiver[];
	InitXmtr[];
	connection.state ← synSent;
	SendSyn[ack: FALSE];
	DO  --UNTIL failed | established.
	  Wait[@connection.isEstablished];
	  SELECT TRUE FROM
	    (connection.state = suspended) =>
	      ERROR TcpStream.Failed[connection.whyFailed];
	    (connection.state = established),
	    (connection.state = closeWait),
	    (connection.state = closing),
	    (connection.state = lastAck),
	    (connection.state = timeWait) => EXIT;  --stream is (or was) estab.
	    --streams in listen, synSent, and synReceived can still fail.
	    ((System.GetClockPulses[] - time) > rcvr.interval) =>
	      BEGIN
	      time ← System.GetClockPulses[];
	      ERROR TcpStream.Failed[timeout];
	      END;
	    ENDCASE;
	  ENDLOOP;
	END;
	
      connection.state = closed =>    --create listener
        BEGIN
        connection.state ← listen;
        rcvr.process ← FORK Receiver[];
	IF notifyListenerStarted # NIL THEN notifyListenerStarted[];
	DO  --UNTIL failed | established.
	  Wait[@connection.isEstablished];
	  SELECT TRUE FROM
	    (connection.state = suspended) =>
	      ERROR TcpStream.Failed[connection.whyFailed];
	    (connection.state = established),
	    (connection.state = closeWait),
	    (connection.state = closing),
	    (connection.state = lastAck),
	    (connection.state = timeWait) => EXIT;  --stream is (or was) estab.
	    --streams in listen, synSent, and synReceived can still fail.
	    ((System.GetClockPulses[] - time) > rcvr.interval) =>
	      BEGIN
	      time ← System.GetClockPulses[];
	      SIGNAL TcpStream.ListenTimeout;
	      END;
	    ENDCASE;
	  ENDLOOP;
	END;
      ENDCASE => Driver.Glitch[IllegalState];  --How did we get here?!

    RETURN[@tcpStreamObject, connection.remoteAddr, connection.remotePort];
    END;  --StartTcpStream
    
  StopTcpStream: PUBLIC TcpOps.StopTcpStreamProc =
    --PROCESS: CLIENT STREAM DELETION.
    BEGIN
    --allowed for last ack to get to remote.
    timeWaitInterval: LONG CARDINAL ← rcvr.ackInterval * 2;
       
    TimeWaitLoop: PROC =
      BEGIN
      UNTIL (System.GetClockPulses[] - xmtr.timeWaitStart) > timeWaitInterval DO
        Process.Pause[1];
	IF (connection.state = suspended) THEN EXIT;  --got a reset and suspended.
      ENDLOOP;
      END;  --TimeWaitLoop
      
    --Give the transmitter a kick in case he is waiting on allocation.
    Notify[@xmtr.newAllocation];
    
    --try to let close protocol complete.
    SELECT connection.state FROM
      closed => NULL;
      lastAck =>
        WHILE (xmtr.nextSeq # xmtr.unackedSeq) AND --until we get that last ack.
          (connection.state # suspended) DO  --or we get a reset and suspend.
          Process.Pause[2];
	  ENDLOOP;
      closing =>
        BEGIN
        WHILE (xmtr.nextSeq # xmtr.unackedSeq) AND  --until we get that last ack
	  (connection.state # suspended) DO  --or we get a reset and suspend.
	  Process.Pause[2];
	  ENDLOOP;
	--timer is reset if a retransmitted fin is received.
        xmtr.timeWaitStart ← System.GetClockPulses[];
        connection.state ← timeWait;
	TimeWaitLoop[];
	END;
      timeWait => TimeWaitLoop[];
      suspended, listen => NULL;
      ENDCASE => SendRst[NIL];

    connection.pleaseStop ← TRUE;
    IF rcvr.process # NIL THEN
      {Process.Abort[rcvr.process]; [] ← JOIN rcvr.process;
      rcvr.process ← NIL};
    IF rexmtr.process # NIL THEN
      {Process.Abort[rexmtr.process]; [] ← JOIN rexmtr.process;
      rexmtr.process ← NIL};
    connection.state ← closed;
    FlushDataList[@rcvr.inuse];
    FlushDataList[@rcvr.avail];
    FlushRexmtrList[];
    IF output.b # NIL THEN ArpaBuffer.ReturnBuffer[output.b];
    [] ← Space.Unmap[rcvr.inputSpace.space.pointer];
    TcpPort.DeleteTcpPort[connection.port];
    END;  --StopTcpStream
    
    
  SuspendStream: PUBLIC ENTRY PROC [
    suspend: TcpStream.SuspendReason, fail: TcpStream.FailureReason] =
    BEGIN
    --PROCESS: RECEIVER OR RETRANSMITTER
    SELECT connection.state FROM
      suspended => NULL;
      ENDCASE =>
        BEGIN
	connection.stateBeforeSuspension ← connection.state;
	connection.state ← suspended;
	connection.whySuspended ← suspend;
	connection.whyFailed ← fail;
	END;
    <<
    Notify the internal processes, so they will loop again and discover the
    stream has been suspended.
    >>
    NOTIFY rcvr.newInput;
    NOTIFY xmtr.newAllocation;
    <<
    Abort any client waiting for a buffer.  This will be translated into
    Suspended by the GetBuffer code.
    >>
    IF xmtr.clientProcess # NIL THEN Process.Abort[xmtr.clientProcess];
    END;  --SuspendStream

    
  PutBlock: PUBLIC PROC [block: Environment.Block, push, urgent: BOOLEAN] =
  --PROCESS: CLIENT SENDING
    BEGIN
    
    --monitored so ForceOut can check for output data pending.
    AssignBuffer: ENTRY PROC = INLINE {output.b ← b};
      
    moved, n: CARDINAL ← 0;
    oBlock: Environment.Block;
    b: ArpaBuffer.Buffer ← NIL;
    --a push with no new data.
    IF (block.startIndex = block.stopIndexPlusOne) AND (push OR urgent) THEN
      {IF (output.b # NIL) THEN  --is there any old data to be pushed?
      ForceOut[setPush: TRUE, setFin: FALSE, setUrg: TRUE]}
    ELSE
      WHILE block.startIndex < block.stopIndexPlusOne DO
        IF output.b = NIL THEN  --need a new buffer.
	  BEGIN
          b ← GetOutputBuffer[];
	  n ← (ArpaBuffer.DataBytesPerRawBuffer[b]) -
	    ArpaPort.minIPHeaderBytes - TcpStreamInternal.minTcpHeaderBytes - 18;
	    	-- 18 ethernet encap
	  AssignBuffer[];
	  output.finger ← 0;
	  output.bufferSize ← MIN[IF connection.offNet THEN MIN[TcpStreamInternal.maxTcpDataBytes, xmtr.maxTcpBytes] ELSE xmtr.maxTcpBytes, n];
	  END;
	oBlock ← [
	  blockPointer: LOOPHOLE[@output.b.arpa.tcp.bytes],
	  startIndex: output.finger, stopIndexPlusOne: output.bufferSize];
	moved ← ByteBlt.ByteBlt[oBlock, block];
	block.startIndex ← block.startIndex + moved;
	--Is packet full?
	IF (output.finger ← output.finger + moved) = output.bufferSize THEN
	  --Push bit may get set on last packet, and this may be last packet (if
	  --packet and client data end at the same time).
	  ForceOut[
	    setPush: (block.startIndex >= block.stopIndexPlusOne) AND push,
	    setFin: FALSE, setUrg: urgent];
	REPEAT
	  FINISHED =>  --pushed data indicates push on last packet.
	    IF push AND (output.b # NIL) THEN
	      ForceOut[setPush: TRUE, setFin: FALSE, setUrg: urgent];
        ENDLOOP;
    
    END;  --PutBlock
    
    
  UpdateInputSpace: INTERNAL PROC [consumed: CARDINAL]
    RETURNS [needsAck: BOOLEAN ← FALSE] =
    BEGIN
    <<
    Updates the head of list based on how much data the client just consumed.
    >>
    tmp: TcpStreamInternal.DataPair;
    rcvr.inuse.head.start ← rcvr.inuse.head.start + consumed;
    rcvr.lastConsumed ← PRED[rcvr.inuse.head.start];
    
    IF TcpStreamInternal.Greater[rcvr.inuse.head.start, rcvr.inuse.head.end] THEN
      BEGIN
      <<
      Client consumed all the available (contiguous) data. So pull the
      DataPair out of the list and put it in the avail list.
      >>
      --IF sanityChecking THEN ListSanityCheck[];
      tmp ← rcvr.inuse.head; rcvr.inuse.head ← tmp.next;
      rcvr.inuse.length ← PRED[rcvr.inuse.length];

      tmp.next ← rcvr.avail.head; rcvr.avail.head ← tmp;
      rcvr.avail.length ← SUCC[rcvr.avail.length];
      --IF sanityChecking THEN ListSanityCheck[];
      END
    ELSE
      <<
      rcvr.inputSpace.size => total space available
      CARDINAL[rcvr.nextSeq - rcvr.lastConsumed] => amount of space in use
      So, if he was blocked and we now have more than half of the data space
      available, prod him for some more data.
      This assumes that the input space size is at least twice as large as
      the packets being used.
      >>
      IF rcvr.remoteBlocked AND
        CARDINAL[
	  rcvr.nextSeq - rcvr.lastConsumed] < (rcvr.inputSpace.size / 2) THEN
	 {rcvr.remoteBlocked ← FALSE; needsAck ← TRUE};
    END;  --UpdateInputSpace
    
    
  ValidAck: PROC [b: ArpaBuffer.Buffer] RETURNS [BOOLEAN] =
    BEGIN
    IF GreaterOrEqual[SeqToCard[b.arpa.tcp.acknowledgement], xmtr.iss] AND        
      LessOrEqual[SeqToCard[b.arpa.tcp.acknowledgement], xmtr.nextSeq] THEN
      RETURN[TRUE]
    ELSE RETURN[FALSE];
    END;  --ValidAck
    
  
  ValidSeq: PROC [body: ArpaBuffer.Body, len: CARDINAL]
    RETURNS [accept: BOOLEAN ← FALSE] =
    BEGIN
    inSeq: LONG CARDINAL ← SeqToCard[body.tcp.sequence];
    SELECT len FROM
      > 0 =>  --most common case of carrying at least one data byte.
	BEGIN
	lastByte: LONG CARDINAL ← inSeq + len - 1;
	SELECT TRUE FROM
	  inSeq IN [rcvr.nextSeq..rcvr.maxSeq],  --first byte in window.
	  lastByte IN [rcvr.nextSeq..rcvr.maxSeq],  --last byte in window.
	  Greater[rcvr.nextSeq, inSeq] AND
	  Less[rcvr.maxSeq, lastByte] =>  --some middle bytes in window.
	    {ProcessTcpState[body]; accept ← TRUE};
	  ENDCASE;
	END;
      0 =>  --zero bytes of data, but need to check for state.
	IF Greater[rcvr.nextSeq, rcvr.maxSeq] THEN
	   --window closed, seq must equal receiver's next.
	   {IF (inSeq = rcvr.nextSeq) THEN
	      ProcessTcpState[body];  --can't accept, but process state anyway.
	   }
	ELSE --window not closed.
	   IF inSeq IN [rcvr.nextSeq..rcvr.maxSeq] THEN
	      {ProcessTcpState[body]; accept ← TRUE};
      ENDCASE;  --length < 0?!
    END;  --ValidSeq
    
    
  WaitForUrgent: ENTRY PROC [block: Environment.Block] =
    BEGIN ENABLE UNWIND => NULL;
    WAIT rcvr.urgArrived;
    END;  --WaitForUrgent
  
  END...
  
LOG

15-May-85 12:43:27  SMA  Created file.
 6-Jan-86 11:58:28  SMA  More twiddles to CheckForData and InsertData 
 7-Jan-86 10:35:02  SMA  Maximum packet size from 100 to 576.
23-Mar-86 16:19:40  SMA  Months and months of debugging.
14-May-86 11:27:27  SMA  ENABLE UNWIND in WaitForUrgent.
16-May-86  8:33:42  SMA  Problems with out-of-seq one byte packets.
16-May-86  8:35:13  SMA  Urgent passed to ForceOut
19-May-86 12:52:36  SMA  Check incoming urgent before push.
 4-Jun-86 11:49:15  SMA  Set urgent pointer correctly.
 5-Jun-86 12:45:33  SMA  Fix push processing for empty packets.
13-Jun-86 11:10:22  SMA  Check for suspended (from reset) in StopTcpStream.
18-Jun-86 14:28:24  SMA  Reset b.time on retransmission.
19-Jun-86 14:49:08  SMA  Integer arithmetic when calsulating usable window.
20-Jun-86 16:17:27  SMA  Don't consume fin prematurely.
20-Jun-86 17:13:29  SMA  Receiver more generous with acks.
 ?                  ALD/ISI Fix ceiling calc. in rexmtr, ICMP bug, etc.
 8-Jul-86 14:42:02  JAV  Fixed loop when pushed recieved in GetBlock
17-Dec-86 12:19:33  SMA  Ack not generated on every GetBlock (use remoteBlocked).
 7-Jan-87 15:27:53  SMA  Initialize "stream markers" to initial receive seq. #.
 2-Feb-87 11:42:15  SMA  Added probing for allocation code. 
 9-Feb-87 19:35:06  SMA  Receiver less generous with acks (sigh).
11-Mar-87 12:17:37  AOF  Funston buffer management, et al.
 2-Jun-87 10:55:52  AOF  Caching of DataObject entries.
 2-Jun-87 10:56:10  AOF  Recoding of time to send ack computation.
 3-Mar-88 13:47:16  AOF  Use global variable for start/stop procs.