-- File: TcpImpl.mesa - last edit: -- AOF 3-Mar-88 14:05:23 -- JAV 19-Nov-87 13:59:40 -- sma 9-Feb-87 19:35:36 -- Copyright (C) 1985, 1986, 1987, 1988 by Xerox Corporation. All rights reserved. DIRECTORY ArpaBuffer USING [ AccessHandle, Body, Buffer, DataBytesPerRawBuffer, GetBuffer, ReturnBuffer, To, From], ArpaFlags USING [doStats], ArpaPortInternal USING [AddrMismatch, BuildMasks, GetSubnetMask], ArpaPort USING [GetPacket, Handle, minIPHeaderBytes, PutPacket], ArpaRouter USING [InternetAddress, GetAddress, unknownInternetAddress, Port], ArpaStats USING [Incr], ArpaTypes USING [Cardinal32, InternetAddress, Port], ByteBlt USING [ByteBlt], CommHeap USING [zone], CommPriorities USING [receiver], CommUtil USING [PulsesToTicks], Driver USING [Glitch], Environment USING [Block, Byte, bytesPerWord], Mopcodes USING [zEXCH], Process USING [ Abort, CancelAbort, DisableTimeout, GetCurrent, Pause, SetPriority, SetTimeout], Space USING [Interval, Unmap], System USING [GetClockPulses, MicrosecondsToPulses], TcpOps USING [PSProc, PSProcess, StartTcpStreamProc, StopTcpStreamProc], TcpPort USING [DeleteTcpPort, SpecifyRemote], TcpStream USING [Closed, CompletionCode, Failed, FailureReason, ListenTimeout, Object, Precedence, Security, Suspended, SuspendReason, WaitTime], TcpStreamInternal USING [Connection, DataPair, DataList, GetDataLength, Greater, GreaterOrEqual, Less, LessOrEqual, LowHalf, Max, MaxSegment, MaxSegmentObj, maxTcpDataBytes, Min, minTcpHeaderBytes, PairObject, PrecedenceMatch, Rcvr, SecurityMatch, State, Xmtr, Rexmtr, InitStream, SetHeaderFields]; TcpImpl: MONITOR IMPORTS ArpaBuffer, ArpaPortInternal, ArpaPort, ArpaRouter, ArpaStats, ByteBlt, CommHeap, CommUtil, Driver, Process, Space, System, TcpPort, TcpStream, TcpStreamInternal EXPORTS TcpOps, ArpaRouter = BEGIN OPEN TcpStreamInternal; --debugging the @#$%~&* receive list. 
--NOTE(review): every use of sanityChecking visible in this part of the file is
--commented out.
sanityChecking: BOOLEAN = TRUE;
bpw: NATURAL = Environment.bytesPerWord;

--Stream start and stop procedures, made PUBLIC with the TcpOps procedure types.
start: PUBLIC TcpOps.StartTcpStreamProc ← StartTcpStream;
stop: PUBLIC TcpOps.StopTcpStreamProc ← StopTcpStream;

--for calculations on 32 bit sequence numbers
--Each conversion is the single opcode zEXCH; presumably it swaps the two 16 bit
--halves of the value (TODO confirm against ArpaTypes.Cardinal32).
SeqToCard: PROC [ArpaTypes.Cardinal32] RETURNS [LONG CARDINAL] =
  MACHINE CODE {Mopcodes.zEXCH};
CardToSeq: PROC [LONG CARDINAL] RETURNS [ArpaTypes.Cardinal32] =
  MACHINE CODE {Mopcodes.zEXCH};

<<ListSanityCheck: INTERNAL PROC =
  BEGIN
  --Compares the length of the list to the number of nodes in hopes of
  --glitching just after the code that made it bad.
  SELECT rcvr.inuse.length FROM
    0 => IF rcvr.inuse.head = NIL THEN RETURN ELSE Driver.Glitch[ListGarbled];
    ENDCASE =>
      BEGIN
      p: TcpStreamInternal.DataPair ← NIL;
      n: CARDINAL ← 0;
      FOR p ← rcvr.inuse.head, p.next UNTIL p = NIL DO n ← n + 1; ENDLOOP;
      IF n # rcvr.inuse.length THEN Driver.Glitch[ListGarbled];
      END;
  END; --ListSanityCheck
>>

--Types exported to ArpaRouter.
Port: PUBLIC --ArpaRouter-- TYPE = ArpaTypes.Port;
InternetAddress: PUBLIC --ArpaRouter-- TYPE = ArpaTypes.InternetAddress;

maxMsecToPulses: LONG CARDINAL = LAST[LONG CARDINAL] / 1000; --max sender can have outstanding.
--NOTE(review): the trailing comment above does not obviously describe
--maxMsecToPulses; it may belong to a declaration that is no longer here.
advertiseSize: BOOLEAN = TRUE; --send max size option on conn request.

--State records shared by the procedures of this module.
xmtr: PUBLIC TcpStreamInternal.Xmtr;
rcvr: PUBLIC TcpStreamInternal.Rcvr;
rexmtr: PUBLIC TcpStreamInternal.Rexmtr;
connection: PUBLIC TcpStreamInternal.Connection;

--for managing blocks of client output.
output: PUBLIC RECORD [
  b: ArpaBuffer.Buffer, --current output buffer in use (or NIL).
  finger: CARDINAL, --bytes consumed from b.
  bufferSize: CARDINAL]; --max bytes permitted in output.

tcpStreamObject: TcpStream.Object;

--Glitches for debugging, some may be eventually subject to a "doDebug" switch.
IllegalState: ERROR = CODE; --connection.state is not legal for the operation.
TrashInRexmtr: ERROR = CODE; --empty or duplicate buffer on the rexmtr list.
DataAfterFin: ERROR = CODE; --receive sequence has run past a recorded fin.
ListGarbled: ERROR = CODE; --receive list length disagrees with its nodes.
PushNotCleared: ERROR = CODE; --a signalled push lies before the pending data.

--Used by procs that notify|wait without state checking required
--Both are ENTRY procs, i.e. they acquire the monitor lock themselves.
Notify: ENTRY PROC [c: LONG POINTER TO CONDITION] = {NOTIFY c↑};
Wait: ENTRY PROC [c: LONG POINTER TO CONDITION] =
  {ENABLE UNWIND => NULL; WAIT c↑};

--PROCEDURES

Close: PUBLIC PROC =
  <<
  Starts the local side of the close handshake.  From established or closeWait
  the pending output is forced out with fin set and the state moves to finWait1
  or lastAck respectively.  Raises TcpStream.Suspended when the stream is
  suspended and TcpStream.Closed when a close is already under way; any other
  state glitches with IllegalState.
  >>
  BEGIN
  ENABLE UNWIND => NULL;
  SELECT connection.state FROM
    established =>
      {ForceOut[setPush: FALSE, setFin: TRUE, setUrg: FALSE];
      connection.state ← finWait1};
    closeWait =>
      {ForceOut[setPush: FALSE, setFin: TRUE, setUrg: FALSE];
      connection.state ← lastAck};
    suspended => RETURN WITH ERROR TcpStream.Suspended[connection.whySuspended];
    finWait1, finWait2, closing, lastAck, timeWait => ERROR TcpStream.Closed;
    ENDCASE => --closed, listen, synSent, synReceived.
      Driver.Glitch[IllegalState]; --How did we get so far?
  END; --Close

ConnReply: PROC [b: ArpaBuffer.Buffer] =
  <<
  PROCESS: RECEIVER
  In synSent state, if the incoming packet is a SYN with an ACK to the SYN
  that we sent, advance to established state.
  Packets without an ack and duplicate acks are ignored; an ack outside
  (iss..nextSeq] is answered with a reset unless the packet is itself a reset.
  Raises TcpStream.Failed[remoteReject] when an acceptable ack carries rst.
  >>
  BEGIN
  body: ArpaBuffer.Body = b.arpa;
  SELECT TRUE FROM
    (~body.tcp.ack) => RETURN; --reply syn must always carry ack
    --Must be a stray - he can't ack something that we haven't even sent.
    LessOrEqual[SeqToCard[body.tcp.acknowledgement], xmtr.iss],
    Greater[SeqToCard[body.tcp.acknowledgement], xmtr.nextSeq] =>
      {IF ~body.tcp.rst THEN SendRst[b]; RETURN};
    --duplicate
    Less[SeqToCard[body.tcp.acknowledgement], xmtr.unackedSeq] => RETURN;
    ENDCASE;
  --carries an acceptable ack - check for other error conditions.
  SELECT TRUE FROM
    body.tcp.rst => ERROR TcpStream.Failed[remoteReject];
    ~SecurityMatch[body] =>
      {IF ArpaFlags.doStats THEN ArpaStats.Incr[rejectedSecurityMismatch];
      SendRst[b]; RETURN};
    ~PrecedenceMatch[body] =>
      {IF ArpaFlags.doStats THEN ArpaStats.Incr[rejectedPrecedencedMismatch];
      SendRst[b]; RETURN};
    --a data offset above 5 is taken to mean that options are present.
    (body.tcp.dataOffset > 5) => ProcessOptions[body];
    ENDCASE;
  --only good guys get here.
  InitRcvr[body.tcp.sequence];
  ProcessTcpState[body];
  [] ← CheckForData[body]; --may have data for client.
  SendAck[]; --acknowledge the remote's syn (SendAck sends a plain ack).
  END; --ConnReply

ConnRequest: PUBLIC PROC [b: ArpaBuffer.Buffer] =
  BEGIN
  <<
  PROCESS: RECEIVER
  In listen state, if the incoming packet is a syn, advance to synReceived
  state and send a syn/ack.  Also process precedence, security and options
  arbitration.
  Resets are only counted, acks and security/precedence mismatches are
  answered with a reset, and anything that is not a syn is counted as junk.
  >>
  body: ArpaBuffer.Body = b.arpa;
  SELECT TRUE FROM
    (body.tcp.rst) => IF ArpaFlags.doStats THEN ArpaStats.Incr[droppedBadRst];
    (body.tcp.ack) =>
      {IF ArpaFlags.doStats THEN ArpaStats.Incr[rejectedBadAck]; SendRst[b]};
    (body.tcp.syn) =>
      BEGIN
      SELECT TRUE FROM
        ~SecurityMatch[body] =>
          BEGIN
          IF ArpaFlags.doStats THEN ArpaStats.Incr[rejectedSecurityMismatch];
          SendRst[b];
          END;
        ~PrecedenceMatch[body] =>
          BEGIN
          IF ArpaFlags.doStats THEN ArpaStats.Incr[rejectedPrecedencedMismatch];
          SendRst[b];
          END;
        ENDCASE => --OK, we like this one.
          BEGIN
          mySubnetMask: ArpaRouter.InternetAddress ←
            ArpaPortInternal.GetSubnetMask[];
          myInternetAddress: ArpaRouter.InternetAddress ←
            ArpaRouter.GetAddress[];
          connection.remoteAddr ← body.ipHeader.source;
          connection.remotePort ← body.tcp.sourcePort;
          --Compare our address with the remote's under the subnet mask, or
          --under the mask built from our own address when no subnet mask is
          --known.  The mask is fetched once (the local used to be ignored and
          --GetSubnetMask called twice more).
          --presumably offNet = TRUE means the remote is not on our net; verify.
          connection.offNet ← ArpaPortInternal.AddrMismatch[
            IF mySubnetMask # ArpaRouter.unknownInternetAddress THEN mySubnetMask
            ELSE ArpaPortInternal.BuildMasks[myInternetAddress].netMask,
            myInternetAddress, body.ipHeader.source];
          TcpPort.SpecifyRemote[
            connection.port, connection.remoteAddr, connection.remotePort];
          IF body.tcp.dataOffset > 5 THEN ProcessOptions[body];
          InitXmtr[];
          InitRcvr[body.tcp.sequence];
          connection.state ← synReceived;
          SendSyn[ack: TRUE]; --send a syn in reply
          IF CheckForData[body] THEN SendAck[]; --may have data for client.
END;
      END; --tcp.syn
    ENDCASE => IF ArpaFlags.doStats THEN ArpaStats.Incr[droppedJunkRequest];
  END; --ConnRequest

ConsumeAcked: PUBLIC ENTRY PROC =
  <<
  PROCESS: RETRANSMITTER
  Cleans house on the retransmission list.  No possibility of signals.
  Buffers are released from the head of the list for as long as
  xmtr.unackedSeq lies beyond their last sequence number.  Afterwards the
  retransmitter is notified if buffers remain, otherwise any sender waiting
  on xmtr.newAllocation.
  >>
  BEGIN
  UNTIL rexmtr.list.head = NIL DO
    b: ArpaBuffer.Buffer ← rexmtr.list.head;
    body: ArpaBuffer.Body = b.arpa;
    dataLen: CARDINAL ← GetDataLength[body].l;
    IF (dataLen = 0) AND ~body.tcp.fin AND ~body.tcp.syn THEN
      Driver.Glitch[TrashInRexmtr]; --how did an empty buffer get in here?
    --fins and syn consume a sequence number.
    IF body.tcp.fin OR body.tcp.syn THEN dataLen ← SUCC[dataLen];
    IF (LessOrEqual[
      xmtr.unackedSeq, SeqToCard[body.tcp.sequence] + PRED[dataLen]]) THEN
      EXIT; --not acked.
    <<
    This buffer has been acked, take it out of the list and collect data
    for retransmission interval calculation.  Delay is time since buffer was
    first sent.
    NOTE(review): the Retransmitter rewrites b.fo.time on every resend, so
    for a retransmitted buffer this is the time since the last send.
    >>
    rexmtr.delay ← rexmtr.delay + (System.GetClockPulses[] - b.fo.time);
    <<
    Why is the "(CARDINAL[b.fo.tries - 1] * rexmtr.interval)" factor in here?
    I'm probably missing something.  AOF 2-Jun-87 17:31:39
    rexmtr.delay ← rexmtr.delay + (CARDINAL[b.fo.tries - 1] * rexmtr.interval) +
      (System.GetClockPulses[] - b.fo.time);
    >>
    rexmtr.count ← SUCC[rexmtr.count]; --and the number of participants
    rexmtr.list.head ← ArpaBuffer.From[b.fo.next];
    rexmtr.list.length ← PRED[rexmtr.list.length];
    ArpaBuffer.ReturnBuffer[b];
    ENDLOOP;
  IF rexmtr.list.length > 0 THEN NOTIFY rexmtr.condition --on to next
  ELSE NOTIFY xmtr.newAllocation;
  END; --ConsumeAcked

--Returns the local and remote address and port recorded in connection.
FindAddresses: PROC RETURNS [
  localAddr, remoteAddr: InternetAddress,
  localPort, remotePort: ArpaRouter.Port] =
  {RETURN[connection.localAddr, connection.remoteAddr,
    connection.localPort, connection.remotePort]}; --FindAddresses

FlushDataList: PROC[list: LONG POINTER TO TcpStreamInternal.DataList] =
  BEGIN --Frees list of sequence number pairs.
  UNTIL list.head = NIL DO
    this: TcpStreamInternal.DataPair ← list.head; --pick up list head
    list.head ← this.next; --record next entry in list head
    CommHeap.zone.FREE[@this]; --free this entry
    ENDLOOP;
  --Keep the count in step with the now empty list.  New trusts a non-zero
  --length to mean that head is usable, so a stale count must not survive.
  list.length ← 0;
  END; --FlushDataList

FlushRexmtrList: PROC =
  BEGIN --Returns all buffers on the rexmtr list.
  UNTIL rexmtr.list.head = NIL DO
    b: ArpaBuffer.Buffer ← rexmtr.list.head; --pick first element off
    rexmtr.list.head ← ArpaBuffer.From[b.fo.next]; --copy link to head
    ArpaBuffer.ReturnBuffer[b]; --free this buffer
    rexmtr.list.length ← PRED[rexmtr.list.length]; --decrement the count
    ENDLOOP;
  END; --FlushRexmtrList

ForceOut: PROC [setPush, setFin, setUrg: BOOLEAN] =
  BEGIN
  <<
  PROCESS: CLIENT SENDING
  Assigns the sequence number and then either sends the packet or waits for
  allocation from remote end.
  setPush, setFin and setUrg are copied into the outgoing tcp header.
  >>
  b: ArpaBuffer.Buffer;
  dataLen: CARDINAL ← 0;
  --Takes the current output buffer and its byte count under the monitor.
  CopyOutBuffer: ENTRY PROC = INLINE
    BEGIN
    b ← output.b; output.b ← NIL;
    dataLen ← output.finger; output.finger ← 0;
    END; --CopyOutBuffer
  --Stamps b with the next sequence number and waits, under the monitor, until
  --that number is within xmtr.maxSeq or the connection state forbids sending.
  AssignSeqAndAssertSend: ENTRY PROC =
    BEGIN
    ENABLE UNWIND => NULL;
    --assign the next sequence number to this buffer.
    sequenceNumber: LONG CARDINAL ← xmtr.nextSeq;
    b.arpa.tcp.sequence ← CardToSeq[xmtr.nextSeq];
    --fin is set before any wait because SendAllocProbe reads it from
    --xmtr.blocked.
    b.arpa.tcp.fin ← setFin;
    DO --EXITs (success) or ERRORs (failure)
      SELECT connection.state FROM
        established, closeWait =>
          BEGIN
          IF LessOrEqual[sequenceNumber, xmtr.maxSeq] THEN EXIT;
          <<
          Copy b so that the allocation probing code in the rexmtr can get
          a hold of the oldest byte of data.
          >>
          xmtr.blocked ← [b, dataLen];
          WAIT xmtr.newAllocation; --wait for something to happen
          xmtr.blocked ← [NIL, 0]; --clear state info
          END;
        suspended =>
          RETURN WITH ERROR TcpStream.Suspended[connection.whySuspended];
        finWait1, finWait2, closing, lastAck, timeWait =>
          RETURN WITH ERROR TcpStream.Closed; --no send after closing.
        --listen, synSent, synReceived
        ENDCASE => Driver.Glitch[IllegalState]; --How did we get so far?
      ENDLOOP;
    --This is the only proc that updates xmtr.nextSeq.
xmtr.nextSeq ← xmtr.nextSeq + dataLen;
    --and fins consume a sequence number.
    IF (b.arpa.tcp.fin ← setFin) THEN xmtr.nextSeq ← SUCC[xmtr.nextSeq];
    END; --AssignSeqAndAssertSend

  --start of procedure ForceOut.
  CopyOutBuffer[]; --get the state copied out safely
  IF b = NIL THEN b ← GetOutputBuffer[];
  AssignSeqAndAssertSend[! UNWIND => xmtr.blocked ← [NIL, 0]];
  SetHeaderFields[b, dataLen, 0];
  b.arpa.tcp.psh ← setPush; --copy client's request
  IF setUrg THEN --dataLen should always be > 0, so PRED[dataLen] is safe.
    {b.arpa.tcp.urg ← TRUE; b.arpa.tcp.urgentPointer ← LOOPHOLE[PRED[dataLen]]};
  SendPacket[b]; --on its way out
  END; --ForceOut

GetBlock: PUBLIC PROC[block: Environment.Block]
  RETURNS [byteCount: CARDINAL, completionCode: TcpStream.CompletionCode] =
  <<
  PROCESS: CLIENT RECEIVING
  Copies received in-sequence data from the input space into block until the
  block is full or Locked asks to stop.  byteCount is the number of bytes
  moved.  completionCode is normal unless a push or urgent mark was reached
  (pushed, endUrgent), the connection is closing (closing) or the wait ran
  out (timeout).  Raises TcpStream.Suspended if the stream is suspended.
  The unused local remoteClosed has been removed.
  >>
  BEGIN
  ENABLE UNWIND => NULL;
  moved: CARDINAL; --amount transferred.
  start, end: CARDINAL; --starting and ending bytes of data, space relative.
  count: CARDINAL; --minimum of requested and available bytes.
  ack, out: BOOLEAN;
  --One step of the transfer, run under the monitor.  exit asks GetBlock to
  --stop looping; needsAck is whatever UpdateInputSpace returned.
  Locked: ENTRY PROC RETURNS[exit, needsAck: BOOLEAN ← FALSE] =
    BEGIN
    ENABLE UNWIND => NULL;
    SELECT TRUE FROM
      --did we find a push previous time thru loop?
      (completionCode = pushed) => {exit ← TRUE; RETURN}; --yes
      --is there data pending?
      (rcvr.inuse.head # NIL) AND
        (rcvr.inuse.head.start = SUCC[rcvr.lastConsumed]) => --yes
        BEGIN
        rcvrStart: LONG CARDINAL = rcvr.inuse.head.start;
        rcvrStartMinus1: LONG CARDINAL = PRED[rcvrStart];
        start ← LowHalf[rcvrStart MOD rcvr.inputSpace.size];
        count ← LowHalf[Min[ --min of data available & data wanted.
          rcvr.inuse.head.end - rcvrStartMinus1,
          block.stopIndexPlusOne - block.startIndex]];
        --is the pending data within client request AND pushed?
        SELECT TRUE FROM
          ~rcvr.pushSig => NULL; --no push
          GreaterOrEqual[rcvr.push, rcvrStart] =>
            IF LessOrEqual[rcvr.push, rcvrStartMinus1 + count] THEN
              BEGIN
              <<count ← LowHalf[Min[count, rcvr.push - rcvrStartMinus1]];>>
              completionCode ← pushed;
              rcvr.pushSig ← FALSE;
              END;
          ENDCASE => Driver.Glitch[PushNotCleared];
        --is there an urgent within client requested data?
        SELECT TRUE FROM
          ~rcvr.urgSig => NULL; --no urgent
          GreaterOrEqual[rcvr.urg, rcvrStart] =>
            IF LessOrEqual[rcvr.urg, rcvrStartMinus1 + count] THEN
              BEGIN
              --stop the transfer at the urgent mark.
              count ← LowHalf[Min[count, rcvr.urg - rcvrStartMinus1]];
              completionCode ← endUrgent; --even if it is also pushed.
              rcvr.urgSig ← FALSE;
              END;
          ENDCASE;
        end ← start + count - 1;
        IF end >= rcvr.inputSpace.size THEN --retrieving wrapped data?
          BEGIN
          --first the part up to the end of the space, then the rest from
          --its beginning.
          moved ← ByteBlt.ByteBlt[block,
            [rcvr.inputSpace.space.pointer, start, rcvr.inputSpace.size]];
          moved ← moved + ByteBlt.ByteBlt[
            [block.blockPointer, block.startIndex + moved,
              block.stopIndexPlusOne],
            [rcvr.inputSpace.space.pointer, 0, count - moved]]
          END
        ELSE moved ← ByteBlt.ByteBlt[block,
          [rcvr.inputSpace.space.pointer, start, SUCC[end]]];
        block.startIndex ← block.startIndex + moved;
        byteCount ← byteCount + moved;
        needsAck ← UpdateInputSpace[moved];
        rcvr.maxSeq ← rcvr.nextSeq + rcvr.inputSpace.size -
          (rcvr.nextSeq - rcvr.lastConsumed);
        END;
      --connection is closing, implying a push at the end of the data.
      (connection.state = closed), (connection.state = closeWait),
      (connection.state = closing), (connection.state = lastAck),
      (connection.state = timeWait) => {completionCode ← closing; exit ← TRUE};
      rcvr.waitTime = 0 => {completionCode ← timeout; exit ← TRUE};
      (System.GetClockPulses[] - rcvr.timeout) > rcvr.interval =>
        {completionCode ← timeout; exit ← TRUE};
      --Wait for the rest of data.
      (connection.state = established), (connection.state = finWait1),
      (connection.state = finWait2) =>
        IF (rcvr.inuse.head = NIL) OR
          (rcvr.inuse.head.start # SUCC[rcvr.lastConsumed]) THEN
          WAIT rcvr.newInput;
      (connection.state = suspended) =>
        RETURN WITH ERROR TcpStream.Suspended[connection.whySuspended];
      ENDCASE => --listen, synSent, synReceived.
        Driver.Glitch[IllegalState]; --How on earth did we get this far?
    END; --Locked

  --start of GetBlock.
  byteCount ← 0; completionCode ← normal;
  rcvr.timeout ← System.GetClockPulses[]; --start of the client's wait.
  WHILE block.startIndex < block.stopIndexPlusOne DO
    [out, ack] ← Locked[]; --can't call SendAck while holding monitor.
    IF ack THEN SendAck[]; --but we can right afterwards
    IF out THEN EXIT; --and maybe we can get out of here too
    ENDLOOP;
  END; --GetBlock

GetOutputBuffer: PUBLIC PROC RETURNS [b: ArpaBuffer.Buffer] =
  <<
  Gets a send buffer for client data and presets its tcp header defaults.
  Pauses while all but one of the pool's send buffers are in use.  Raises
  TcpStream.Suspended if the stream is, or becomes, suspended.
  >>
  BEGIN
  body: ArpaBuffer.Body;
  --Forgets the recorded client process; if the stream is suspended the
  --pending abort on that process is cancelled first.
  CleanupClientProcess: ENTRY PROC =
    BEGIN
    SELECT TRUE FROM
      (xmtr.clientProcess = NIL) => NULL; --multiple clients in BufferMgr??!!
      (connection.state = suspended) => --we're suspended, we did the abort.
        {Process.CancelAbort[xmtr.clientProcess]; xmtr.clientProcess ← NIL};
      ENDCASE => xmtr.clientProcess ← NIL;
    END; --CleanupClientProcess
  IF (connection.state = suspended) THEN GOTO returnSuspended;
  xmtr.clientProcess ← Process.GetCurrent[]; --capture client process
  BEGIN
  <<
  Don't let greedy client use all the send buffers, else we won't be able
  to probe for allocation if we need to...  Doing a Pause is slow, but at
  this point, the remote is so slow anyway, anything we do is in the noise.
  >>
  ENABLE
    BEGIN
    UNWIND => xmtr.clientProcess ← NIL;
    ABORTED => IF connection.state = suspended THEN GOTO suspended;
    END;
  WHILE connection.pool.sendInUse = (PRED[connection.pool.send]) DO
    Process.Pause[CommUtil.PulsesToTicks[[rexmtr.interval]]];
    ENDLOOP;
  b ← ArpaBuffer.GetBuffer[
    connection.pool, send, TRUE, connection.family.maxBufferSize];
  EXITS suspended => {xmtr.clientProcess ← NIL; GOTO returnSuspended};
  END; --protected code.
CleanupClientProcess[];
  IF connection.state = suspended THEN
    {ArpaBuffer.ReturnBuffer[b]; GOTO returnSuspended};
  --now that we (finally) have a buffer, set defaults for fields.
  body ← b.arpa;
  body.tcp.urg ← body.tcp.psh ← body.tcp.rst ← body.tcp.syn ← body.tcp.fin ← FALSE;
  body.tcp.ack ← TRUE;
  body.tcp.urgentPointer ← LOOPHOLE[0];
  b.requeueProcedure ← RequeueProc; --put buffer in retransmit list.
  EXITS
    returnSuspended =>
      RETURN WITH ERROR TcpStream.Suspended[connection.whySuspended];
  END; --GetOutputBuffer

GetSystemBuffer: PROC[setAck: BOOLEAN] RETURNS [b: ArpaBuffer.Buffer] =
  BEGIN --gets small buffer for sending packets that will not be retransmitted.
  --The buffer is sized for the minimum ip plus tcp headers; all flags are
  --cleared except ack, which is set from setAck, and the sequence field is
  --preset to xmtr.nextSeq.
  body: ArpaBuffer.Body;
  n: NATURAL = ArpaPort.minIPHeaderBytes + minTcpHeaderBytes;
  body ← (b ← ArpaBuffer.GetBuffer[connection.pool, send, TRUE, n]).arpa;
  body.tcp.urg ← body.tcp.psh ← body.tcp.rst ← body.tcp.syn ← body.tcp.fin ← FALSE;
  body.tcp.ack ← setAck;
  body.tcp.urgentPointer ← LOOPHOLE[0];
  body.tcp.sequence ← CardToSeq[xmtr.nextSeq];
  END; --GetSystemBuffer

--Initializes the receive sequence variables from the remote's initial
--sequence number.
InitRcvr: ENTRY PROC [sequence: ArpaTypes.Cardinal32] =
  BEGIN
  rcvr.irs ← SeqToCard[sequence];
  rcvr.lastConsumed ← rcvr.push ← rcvr.urg ← rcvr.fin ← rcvr.irs;
  --just to get things started.
  rcvr.nextSeq ← SUCC[rcvr.irs];
  rcvr.maxSeq ← rcvr.nextSeq + PRED[rcvr.inputSpace.size];
  END; --InitRcvr

--Picks the initial send sequence number from the clock, initializes the
--transmit sequence variables and forks the retransmitter process.
InitXmtr: PROC =
  BEGIN
  InitXmtrLocked: ENTRY PROC =
    BEGIN
    xmtr.iss ← System.GetClockPulses[];
    xmtr.nextSeq ← SUCC[xmtr.iss];
    xmtr.unackedSeq ← xmtr.iss;
    xmtr.maxSeq ← xmtr.unackedSeq + xmtr.maxAlloc;
    END; --InitXmtrLocked
  InitXmtrLocked[];
  rexmtr.process ← FORK Retransmitter[];
  END; --InitXmtr

InsertRexmt: PUBLIC ENTRY PROC [b: ArpaBuffer.Buffer] =
  <<
  PROCESS: RETRANSMITTER
  Puts the buffer in the retransmission list.  The list is ordered.
  A buffer that is already completely acknowledged is returned to the pool
  instead.  Glitches with TrashInRexmtr if a buffer with the same sequence
  number is already listed.
  >>
  BEGIN
  body: ArpaBuffer.Body ← b.arpa;
  prev, bList: ArpaBuffer.Buffer ← NIL;
  dataLen: CARDINAL ← GetDataLength[body].l;
  --has this one already been acked?  Syns and fins may carry no data,
  --but still must be put in the retransmission list.
  SELECT dataLen FROM
    0 =>
      IF Greater[xmtr.unackedSeq, SeqToCard[body.tcp.sequence]] THEN
        {ArpaBuffer.ReturnBuffer[b]; RETURN};
    ENDCASE =>
      BEGIN
      --A fin or syn occupies one sequence number in addition to the data,
      --exactly as ConsumeAcked counts it.  Without this a fin that rides on
      --data would be dropped as soon as the data alone had been acked, and
      --the fin would never be retransmitted.
      last: LONG CARDINAL ← SeqToCard[body.tcp.sequence] + dataLen - 1;
      IF body.tcp.fin OR body.tcp.syn THEN last ← SUCC[last];
      IF Greater[xmtr.unackedSeq, last] THEN
        {ArpaBuffer.ReturnBuffer[b]; RETURN};
      END;
  b.fo.next ← NIL;
  rexmtr.list.length ← SUCC[rexmtr.list.length];
  IF (rexmtr.list.head = NIL) THEN {rexmtr.list.head ← b}
  ELSE
    FOR bList ← rexmtr.list.head, ArpaBuffer.From[bList.fo.next]
      UNTIL bList = NIL DO
      SELECT TRUE FROM
        --insert in front of the first buffer with a larger sequence number.
        (Less[
          SeqToCard[body.tcp.sequence],
          SeqToCard[bList.arpa.tcp.sequence]]) =>
          BEGIN
          IF prev = NIL THEN
            BEGIN
            b.fo.next ← ArpaBuffer.To[rexmtr.list.head];
            rexmtr.list.head ← b;
            END
          ELSE
            BEGIN
            b.fo.next ← prev.fo.next;
            prev.fo.next ← ArpaBuffer.To[b];
            END;
          EXIT;
          END;
        (body.tcp.sequence = bList.arpa.tcp.sequence) =>
          Driver.Glitch[TrashInRexmtr];
        ENDCASE;
      prev ← bList;
      REPEAT
        FINISHED => --larger than everything listed, append at the end.
          BEGIN
          b.fo.next ← prev.fo.next;
          prev.fo.next ← ArpaBuffer.To[b];
          END;
      ENDLOOP;
  END; --InsertRexmt

New: INTERNAL PROC [
  link: TcpStreamInternal.DataPair, startSeq, endSeq: LONG CARDINAL]
  RETURNS [new: TcpStreamInternal.DataPair] =
  BEGIN
  <<
  Try to allocate a DataObject from the available list.  If that fails, then
  allocate one from the heap.  Once we get running, they should all come from
  the avail list rather than the heap.  Initially it is expected that both
  the inuse and avail lists are empty.
>>
  IF rcvr.avail.length # 0 THEN
    BEGIN
    new ← rcvr.avail.head;
    rcvr.avail.head ← new.next;
    rcvr.avail.length ← PRED[rcvr.avail.length];
    END
  ELSE new ← CommHeap.zone.NEW[TcpStreamInternal.PairObject];
  new↑ ← [startSeq, endSeq, NIL];
  --IF sanityChecking THEN ListSanityCheck[];
  --link = NIL puts the node at the head of the inuse list, otherwise the
  --node goes right after link.
  IF link = NIL THEN {new.next ← rcvr.inuse.head; rcvr.inuse.head ← new}
  ELSE {new.next ← link.next; link.next ← new};
  rcvr.inuse.length ← SUCC[rcvr.inuse.length];
  --IF sanityChecking THEN ListSanityCheck[];
  END; --New

CheckForData: ENTRY PROC [body: ArpaBuffer.Body] RETURNS [BOOLEAN] =
  BEGIN
  <<
  Puts the data into the input space and updates the list of data pointers
  into that space.  Also does fin processing.
  Returns TRUE when data from this packet was stored, FALSE otherwise.
  >>
  CheckForClose: INTERNAL PROC =
    BEGIN
    <<
    If we have received a fin, check if we can consume it and modify
    the state of the close handshake.
    >>
    SELECT TRUE FROM
      (~rcvr.finSig) => NULL; --no fin received
      (rcvr.fin = rcvr.nextSeq) =>
        BEGIN --we have all the data before it.
        rcvr.nextSeq ← SUCC[rcvr.fin]; --consume fin's sequence number.
        SELECT connection.state FROM
          established => connection.state ← closeWait;
          finWait1 => connection.state ← closing;
          finWait2 =>
            BEGIN
            xmtr.timeWaitStart ← System.GetClockPulses[];
            connection.state ← timeWait;
            END;
          ENDCASE;
        NOTIFY rcvr.newInput;
        END;
      (Less[rcvr.fin, PRED[rcvr.nextSeq]]) => Driver.Glitch[DataAfterFin];
      ENDCASE; --fin was out of sequence - wait for other data to arrive.
    END; --CheckForClose

  --start of CheckForData
  dataLen: CARDINAL;
  data: LONG POINTER;
  startSeq, endSeq: LONG CARDINAL; --first and last seq of new data.
  --room left in the input space beyond the data received but not consumed.
  spaceAvail: LONG CARDINAL =
    rcvr.inputSpace.size - (rcvr.nextSeq - 1 - rcvr.lastConsumed);
  [dataLen, data] ← GetDataLength[body];
  IF dataLen = 0 THEN
    {CheckForClose[]; RETURN[FALSE]}; --empty packet may carry fin.
  --set initial values.
  startSeq ← SeqToCard[body.tcp.sequence];
  endSeq ← startSeq + dataLen - 1;
  <<
  We assume size is greater than the greatest dataLen.  The amount of data
  we will move is the minimum of the amount we want to move and the amount
  we have space to move.
  >>
  startSeq ← Max[startSeq, rcvr.nextSeq];
  --A segment that ends before nextSeq holds nothing new.  Stop here, since
  --the length computed below would wrap around to a large value and stale
  --bytes would be stored as if they were new data.
  IF Less[endSeq, startSeq] THEN RETURN[FALSE];
  dataLen ← LowHalf[endSeq - startSeq + 1]; --adjust for new start.
  dataLen ← LowHalf[Min[dataLen, spaceAvail]]; --again for space avail
  endSeq ← startSeq + dataLen - 1; --and use the result to set new endpoint.
  IF dataLen = 0 THEN RETURN[FALSE]; --no data, no more to do
  --ack will be generated when client consumes at least 1/2 of data.
  IF dataLen >= spaceAvail THEN rcvr.remoteBlocked ← TRUE;
  InsertData[body, data, startSeq, endSeq];
  CheckForClose[];
  RETURN[TRUE];
  END; --CheckForData

InsertData: INTERNAL PROC [
  body: ArpaBuffer.Body, data: LONG POINTER,
  startSeq, endSeq: LONG CARDINAL] =
  BEGIN
  <<
  This is very fragile code.  You mess with it without knowing exactly what
  is going on, you deserve whatever you get!  It's also probably more
  complex than it needs to be - IP reassembly algorithms are simpler, and
  may be usable here.
  >>
  moved, dataLen: CARDINAL;
  start, end: CARDINAL; --space relative.
  p, q, prev, tmp: TcpStreamInternal.DataPair ← NIL;
  bStart: CARDINAL; --starting point of data in the buffer to be moved.
  --set the ends to reflect the data we can actually move.
  dataLen ← LowHalf[endSeq - startSeq] + 1;
  start ← LowHalf[startSeq MOD rcvr.inputSpace.size];
  end ← start + dataLen - 1;
  --Record the starting point in the buffer of the data to be moved.
  bStart ← LowHalf[startSeq - SeqToCard[body.tcp.sequence]];
  IF end >= rcvr.inputSpace.size THEN --is data going to wrap?
BEGIN
    --copy up to the end of the input space, then the rest to its beginning.
    moved ← ByteBlt.ByteBlt[
      to: [rcvr.inputSpace.space.pointer, start, rcvr.inputSpace.size],
      from: [data, bStart, rcvr.inputSpace.size]];
    moved ← moved + ByteBlt.ByteBlt[
      to: [rcvr.inputSpace.space.pointer, 0, dataLen - moved],
      from: [data, bStart + moved, rcvr.inputSpace.size]];
    END
  ELSE moved ← ByteBlt.ByteBlt[
    to: [rcvr.inputSpace.space.pointer, start, SUCC[end]],
    from: [data, bStart, rcvr.inputSpace.size]];
  --NOTE(review): the from blocks use inputSpace.size as their upper bound, so
  --the destination block presumably limits each move; confirm with ByteBlt.

  --Update the sequence number list.
  FOR p ← rcvr.inuse.head, p.next UNTIL p = NIL DO --look for startSeq.
    SELECT TRUE FROM
      --is the starting seq within (or right on either end of) this node?
      (GreaterOrEqual[startSeq, p.start-1] AND LessOrEqual[startSeq, p.end+1]) =>
        {p.end ← Max[endSeq, p.end]; p.start ← Min[startSeq, p.start]; EXIT};
      --is starting seq going to create a disjoint node before this node?
      (Less[startSeq, p.start]) => {p ← New[prev, startSeq, endSeq]; EXIT};
      ENDCASE; -- (startSeq > p.end) means keep searching
    prev ← p;
    REPEAT
      FINISHED => --must be disjoint node at the end of the list.
        p ← New[prev, startSeq, endSeq];
    ENDLOOP;
  --p is now the node that holds the new data.
  --now compact the list so that there are no adjacent or overlapping segments.
  FOR q ← p.next, q.next UNTIL q = NIL DO
    IF Less[endSeq, q.start-1] THEN EXIT;
    p.end ← Max[p.end, q.end];
    --IF sanityChecking THEN ListSanityCheck[];
    --q is reset to p so that the loop step moves on to the new p.next.
    tmp ← q; p.next ← q.next; q ← p; --pull element from middle
    rcvr.inuse.length ← PRED[rcvr.inuse.length];
    tmp.next ← rcvr.avail.head; rcvr.avail.head ← tmp; --put in avail list
    rcvr.avail.length ← SUCC[rcvr.avail.length];
    --IF sanityChecking THEN ListSanityCheck[];
    ENDLOOP;
  --if the first node now covers nextSeq, advance nextSeq past it and wake
  --anybody waiting for input.
  IF GreaterOrEqual[rcvr.nextSeq, rcvr.inuse.head.start]
    AND LessOrEqual[rcvr.nextSeq, rcvr.inuse.head.end] THEN
    {rcvr.nextSeq ← SUCC[rcvr.inuse.head.end]; NOTIFY rcvr.newInput};
  END; --InsertData

--Reads the first tcp option of the packet; if it is of type 2 its maxSize
--becomes xmtr.maxTcpBytes.  Only that first option is examined.
ProcessOptions: PROC [body: ArpaBuffer.Body] =
  BEGIN
  m: TcpStreamInternal.MaxSegment = LOOPHOLE[@body.tcp.options];
  --**the option type should be a constant in TcpStreamInternal.
IF m.type = 2 THEN xmtr.maxTcpBytes ← m.maxSize;
  END; --ProcessOptions

ProcessTcpState: PUBLIC PROC [body: ArpaBuffer.Body] =
  <<
  PROCESS: RECEIVER
  Applies the acknowledgement and window carried by body.  An ack beyond
  xmtr.unackedSeq may advance the connection state, becomes the new
  unackedSeq and triggers ConsumeAcked.  xmtr.maxSeq is then recomputed from
  the offered window, and the retransmitter or a waiting sender is notified
  when the window test below passes.
  >>
  BEGIN
  ack: LONG CARDINAL ← SeqToCard[body.tcp.acknowledgement];
  IF Greater[ack, xmtr.unackedSeq] THEN --process the acknowledgement.
    BEGIN --process the ack and do the necessary state changes.
    SELECT connection.state FROM
      synReceived, synSent =>
        IF Greater[ack, xmtr.iss] THEN --notify any waiters
          {connection.state ← established; Notify[@connection.isEstablished]};
      finWait1 =>
        IF (ack = xmtr.nextSeq) AND (~body.tcp.fin) THEN
          connection.state ← finWait2;
      closing =>
        IF (ack = xmtr.nextSeq) THEN
          {xmtr.timeWaitStart ← System.GetClockPulses[];
          connection.state ← timeWait};
      lastAck => IF (ack = xmtr.nextSeq) THEN connection.state ← closed;
      ENDCASE;
    xmtr.unackedSeq ← ack; --This is the only place unackedSeq gets updated.
    --(apart from its initialization in InitXmtr.)
    ConsumeAcked[];
    END;
  xmtr.maxSeq ← xmtr.unackedSeq + body.tcp.window;
  --offered window minus outstanding data is usable window.
  --NOTE(review): if the outstanding data exceeds the offered window this
  --unsigned subtraction presumably wraps and the test passes; confirm that
  --the extra notify is harmless.
  IF (body.tcp.window - (xmtr.nextSeq - xmtr.unackedSeq)) >
    body.tcp.window / 4 THEN
    BEGIN
    IF rexmtr.list.head # NIL THEN Notify[@rexmtr.condition]
    ELSE Notify[@xmtr.newAllocation];
    END;
  END; --ProcessTcpState

RcvdTcpPkt: PROC [b: ArpaBuffer.Buffer] =
  <<
  PROCESS: RECEIVER
  Parses incoming packets, no mean feat.  There is some redundancy in
  ConnReq and ConnRep that should be looked at.
  Exactly one arm of the SELECT below handles the packet; the first test
  that holds wins.  The buffer is returned at the end, or by the UNWIND
  handler.
  >>
  BEGIN
  --so clients don't lose data when causing unwind after Close.
  ENABLE UNWIND =>
    {IF CheckForData[b.arpa] THEN SendAck[]; ArpaBuffer.ReturnBuffer[b]};
  body: ArpaBuffer.Body = b.arpa;
  len: CARDINAL ← GetDataLength[body].l;
  SELECT TRUE FROM
    connection.state = closed, --client aborted by calling delete
    connection.state = suspended => NULL; --or stream has been suspended.
    body.tcp.syn => --syn packet
      BEGIN
      IF ArpaFlags.doStats THEN ArpaStats.Incr[synsRcvd];
      SELECT connection.state FROM
        listen => ConnRequest[b];
        synSent => ConnReply[b];
        ENDCASE => --should not get a syn in already synchronized states.
          BEGIN
          IF ~ValidSeq[body, len] THEN SendAck[]
          ELSE
            {SendRst[b]; SuspendStream[reset, remoteReject];
            IF ArpaFlags.doStats THEN ArpaStats.Incr[badSyn]};
          END;
      END;
    (body.tcp.sourcePort # connection.remotePort) =>
      --Stray packet arrived on a listening conn (remotePort not yet estab.)
      {SendRst[b]; IF ArpaFlags.doStats THEN ArpaStats.Incr[badSourcePort]};
    ~ValidSeq[body, len] =>
      {SELECT TRUE FROM
        body.tcp.rst => NULL; --stray reset.
        (connection.state = listen) => SendRst[b]; --stray packet (not syn).
        (connection.state = synSent), (connection.state = synReceived) =>
          --stray packet, reset sender.
          IF ~ValidAck[b] THEN SendRst[b];
        (len = 0) AND (~body.tcp.fin) => NULL; --don't ack acks.
        (connection.state = timeWait) =>
          {SendAck[]; xmtr.timeWaitStart ← System.GetClockPulses[]};
        ENDCASE => SendAck[];
      IF ArpaFlags.doStats THEN ArpaStats.Incr[seqOutOfRange]};
    body.tcp.rst => --valid seq with rst.
      {SuspendStream[reset, remoteReject];
      IF ArpaFlags.doStats THEN ArpaStats.Incr[resetsReceived]};
    ~SecurityMatch[body] =>
      {SendRst[b];
      SELECT connection.state FROM
        established, finWait1, finWait2, closeWait, closing, lastAck,
        timeWait => SuspendStream[securityMismatch, securityMismatch];
        ENDCASE; --closed, listen, synSent, synReceived;
      IF ArpaFlags.doStats THEN ArpaStats.Incr[badSecurity]};
    ~PrecedenceMatch[body] =>
      {SendRst[b];
      SELECT connection.state FROM
        established, finWait1, finWait2, closeWait, closing, lastAck,
        timeWait => SuspendStream[precedenceMismatch, precedenceMismatch];
        ENDCASE; --closed, listen, synSent, synReceived;
      IF ArpaFlags.doStats THEN ArpaStats.Incr[badPrecedence]};
    ~body.tcp.ack => --every packet except original syn should carry ack.
      IF ArpaFlags.doStats THEN ArpaStats.Incr[badAck];
    body.tcp.fin =>
      SELECT connection.state FROM
        established, finWait1, finWait2 =>
          BEGIN
          --record the fin's sequence number, which follows the data.
          rcvr.finSig ← TRUE;
          rcvr.fin ← SeqToCard[body.tcp.sequence] + len;
          [] ← CheckForData[body];
          SendAck[];
          END;
        ENDCASE =>
          IF ArpaFlags.doStats THEN ArpaStats.Incr[badFin]; --**Reset here?
    body.tcp.urg => --mark urgent pointer, earlier urgents are superseded
      BEGIN
      rcvr.urg ← Max[
        rcvr.urg,
        SeqToCard[body.tcp.sequence] +
          LOOPHOLE[body.tcp.urgentPointer, CARDINAL]];
      IF ~rcvr.urgSig THEN --multiple urgents are merged.
        {rcvr.urgSig ← TRUE; Notify[@rcvr.urgArrived]};
      [] ← CheckForData[body];
      SendAck[];
      END;
    body.tcp.psh => --mark pushed data, earlier pushes are superseded.
      BEGIN
      lastSeq: LONG CARDINAL ← SeqToCard[body.tcp.sequence];
      IF len # 0 THEN lastSeq ← lastSeq + len - 1;
      rcvr.push ← Max[lastSeq, rcvr.push];
      rcvr.pushSig ← TRUE;
      [] ← CheckForData[body];
      SendAck[];
      END;
    ENDCASE => --ah, the vanilla data packet!
      IF CheckForData[body] THEN SendAck[];
  ArpaBuffer.ReturnBuffer[b]; --This is where receive buffers get returned.
  END; --RcvdTcpPkt

Receiver: PUBLIC PROC RETURNS [TcpOps.PSProc] =
  <<
  PROCESS: THIS IS THE RECEIVER
  Takes packets from connection.port until connection.pleaseStop is set or
  the process is aborted.  Tcp packets go to RcvdTcpPkt after length and
  source checks; an icmp unreachable suspends the stream; every other packet
  is counted and returned.
  >>
  BEGIN
  b: ArpaBuffer.Buffer;
  body: ArpaBuffer.Body;
  Process.SetPriority[CommPriorities.receiver];
  UNTIL connection.pleaseStop DO
    ENABLE ABORTED => EXIT;
    --get next packet from socket queue
    body ← (b ← ArpaPort.GetPacket[connection.port]).arpa;
    --is it worth it?
    SELECT TRUE FROM
      (body.ipHeader.protocol = tcp) =>
        BEGIN
        IF ArpaFlags.doStats THEN ArpaStats.Incr[tcpsRcvd];
        SELECT TRUE FROM
          (body.ipHeader.length <
            TcpStreamInternal.minTcpHeaderBytes + ArpaPort.minIPHeaderBytes) =>
            {IF ArpaFlags.doStats THEN ArpaStats.Incr[pktTooShort];
            ArpaBuffer.ReturnBuffer[b]};
          (connection.remoteAddr = ArpaRouter.unknownInternetAddress) =>
            RcvdTcpPkt[b]; --this could be the first syn.
ArpaPortInternal.AddrMismatch[ ArpaPortInternal.BuildMasks[body.ipHeader.destination].hostMask, body.ipHeader.source, connection.remoteAddr] => {IF ArpaFlags.doStats THEN ArpaStats.Incr[badSource]; ArpaBuffer.ReturnBuffer[b]}; ENDCASE => RcvdTcpPkt[b]; END; --protocol = tcp (body.ipHeader.protocol = icmp) => BEGIN SELECT body.icmp.type FROM unreachable => {SuspendStream[noRouteToDestination, noRouteToDestination]; IF ArpaFlags.doStats THEN ArpaStats.Incr[icmpUnreachable]}; ENDCASE => NULL; ArpaBuffer.ReturnBuffer[b]; END; ENDCASE => BEGIN IF ArpaFlags.doStats THEN ArpaStats.Incr[badProtocol]; --****icmp error packet here? ArpaBuffer.ReturnBuffer[b]; END; ENDLOOP; RETURN[LOOPHOLE[Receiver]]; END; --Receiver RequeueProc: PUBLIC PROC [b: ArpaBuffer.Buffer] = << PROCESS: BUFFER MANAGER Puts the buffer in the retransmission list. >> BEGIN SELECT TRUE FROM --ARP should find this for us on next try. (b.fo.status = invalidDestAddr) => {InsertRexmt[b]; ConsumeAcked[]}; (b.fo.status ~IN[pending..aborted]) => BEGIN SuspendStream[noRouteToDestination, SELECT b.fo.status FROM noTranslationForDestination => noTranslationForDestination, noAnswerOrBusy => noAnswerOrBusy, circuitInUse => circuitInUse, circuitNotReady => circuitNotReady, dialerHardwareProblem => circuitNotReady, noDialingHardware => noDialingHardware, --noRouteToNetwork, hardwareProblem => noRouteToDestination, ENDCASE => noRouteToDestination]; ArpaBuffer.ReturnBuffer[b]; --don't keep this one - we're dead END; --put it on the retransmission list. ENDCASE => {InsertRexmt[b]; ConsumeAcked[]}; --poke the rexmt list cleaner. END; --RequeueProc Retransmitter: PUBLIC PROC RETURNS [TcpOps.PSProc] = --PROCESS: THIS IS THE RETRANSMITTER BEGIN time: LONG CARDINAL; b: ArpaBuffer.Buffer; ExtractHead: ENTRY PROC RETURNS [BOOLEAN ← TRUE] = BEGIN --stops access to the head of the list (which is being rexmittd). IF rexmtr.list.head = NIL THEN RETURN[FALSE]; --make sure it's still there. 
rexmtr.list.head ← ArpaBuffer.From[rexmtr.list.head.fo.next];
    rexmtr.list.length ← PRED[rexmtr.list.length];
    END; --ExtractHead

  UNTIL connection.pleaseStop DO
    ENABLE ABORTED => EXIT; --an abort of this process ends the loop.
    BEGIN
    time ← System.GetClockPulses[];
    SELECT TRUE FROM
      (connection.state = suspended) => GOTO suspended;
      --anything to retransmit?
      ((b ← rexmtr.list.head) # NIL) =>
        SELECT TRUE FROM
          --wait tries * interval, capped at rexmtr.ceiling, since b.fo.time.
          ((time - b.fo.time) <
            MIN[(rexmtr.interval * CARDINAL[b.fo.tries]), rexmtr.ceiling]) =>
            NULL; --too soon
          --the try count is incremented as a side effect of this test.
          ((b.fo.tries ← SUCC[b.fo.tries]) > rexmtr.giveUp) => GOTO suspend;
          ENDCASE => --data retransmission.
            IF ExtractHead[] THEN --ConsumeAcked may try to return it.
              BEGIN
              b.fo.time ← System.GetClockPulses[]; --record packet send time
              SendPacket[b]; --retransmit the packet
              IF ArpaFlags.doStats THEN ArpaStats.Incr[retransmitted];
              END;
      <<
      Remote has closed window or we have missed his window update - is it
      time to probe for allocation?
      >>
      (Greater[xmtr.nextSeq, xmtr.maxSeq]) AND
        (System.GetClockPulses[] - xmtr.lastSent) > (rexmtr.interval * 2) =>
        SendAllocProbe[];
      ENDCASE;
    --time to recompute retransmission interval?
IF ((time - rexmtr.calculation) > rexmtr.calculationInterval) THEN
      BEGIN
      rexmtr.calculation ← time; --reset start of interval
      --retransmission interval recalculation
      IF (rexmtr.count # 0) THEN
        BEGIN --recalculate retransmission interval
        --REUSES LOCAL VARIABLE 'time'
        time ← rexmtr.delay / rexmtr.count; --delay per counted sample
        time ← time + (time / 2); --don't get too close
        rexmtr.delay ← 0;
        rexmtr.count ← 0;
        --weight old interval more than new (3 to 1)
        time ← ((rexmtr.interval * 3) + time) / 4;
        --interval is limited both low and high
        rexmtr.interval ← MAX[rexmtr.floor, MIN[rexmtr.ceiling, time]];
        END;
      END; --recompute interval
    EXITS
      suspended => NULL; --already suspended: just fall through to the wait.
      suspend => SuspendStream[transmissionTimeout, timeout]; --gave up on a packet.
    END; --transmitter not blocked
    Wait[@rexmtr.condition];
    ENDLOOP;
  RETURN[LOOPHOLE[Retransmitter]];
  END; --Retransmitter

SendAck: PROC [] =
  --Sends a packet carrying no data (SetHeaderFields is called with 0, 0),
  --unless something is queued for retransmission; in that case the
  --retransmitter is notified instead and nothing is sent from here.
  BEGIN
  b: ArpaBuffer.Buffer;
  --why send an empty ack when we have something real to send?
  IF rexmtr.list.head # NIL THEN {Notify[@rexmtr.condition]; RETURN};
  b ← GetSystemBuffer[TRUE]; --he sets b.arpa.tcp.sequence
  SetHeaderFields[b, 0, 0];
  SendPacket[b]; --fills in the acknowledgement and window fields.
  --**IF ArpaFlags.doStats THEN ArpaStats.Incr[acksSent];
  END; --SendAck

SendAllocProbe: ENTRY PROC =
  BEGIN
  <<
  Sends a packet probing the remote to open his allocation window for us.
  This is the only case where a data packet is sent using a buffer obtained
  from GetSystemBuffer instead of GetOutputBuffer, as we want to always be
  able to obtain a buffer for a probe, and we don't want it put in the
  rexmtr table.
  The probe reuses the sequence number of the blocked packet and carries
  either its first data byte or its fin flag.
  >>
  b: ArpaBuffer.Buffer = GetSystemBuffer[TRUE];
  body: ArpaBuffer.Body = b.arpa;
  --NOTE(review): assumes xmtr.blocked.b is not NIL whenever this is called.
  blocked: ArpaBuffer.Body = xmtr.blocked.b.arpa;
  body.tcp.sequence ← blocked.tcp.sequence;
  SELECT TRUE FROM
    (xmtr.blocked.dataLen > 0) =>
      BEGIN
      body.tcp.psh ← TRUE; --maybe this will make him answer.
      body.tcp.bytes[0] ← blocked.tcp.bytes[0]; --one byte of the blocked data.
      SetHeaderFields[b, 1, 0];
      END;
    (blocked.tcp.fin) =>
      BEGIN body.tcp.fin ← TRUE; SetHeaderFields[b, 0, 0]; END;
    ENDCASE => --heaven knows what this is.
BEGIN ArpaBuffer.ReturnBuffer[b]; RETURN; END; --nothing to probe with.
  SendPacket[b];
  END; --SendAllocProbe

SendPacket: PUBLIC PROC [b: ArpaBuffer.Buffer] =
  BEGIN
  <<
  PROCESS: RETRANSMITTER, CLIENT SENDING
  Sets the current state information, and transmits. (Or retransmits.)
  Every packet sent through here gets the current acknowledgement and window
  written into its header, and xmtr.lastSent is set to the time of the send.
  >>
  --never send a packet without current state information
  body: ArpaBuffer.Body = b.arpa;
  body.tcp.acknowledgement ← CardToSeq[rcvr.nextSeq];
  --ValidSeq accepts sequence numbers up to and including rcvr.maxSeq, hence
  --the + 1 when turning the range into a byte count.
  body.tcp.window ← LowHalf[rcvr.maxSeq - rcvr.nextSeq] + 1;
  ArpaPort.PutPacket[connection.port, b];
  xmtr.lastSent ← System.GetClockPulses[]; --this is sufficient for any ack.
  END; --SendPacket

SendRst: PUBLIC PROC [offender: ArpaBuffer.Buffer] =
  BEGIN
  <<
  Sends a reset packet in response to the offending packet. If offender is
  nil, then the reset is a user-initiated abort.
  Sequence and acknowledgement fields are chosen as follows:
    offender = NIL       sequence is xmtr.nextSeq, acknowledgement is
                         rcvr.nextSeq.
    offender has ack set sequence is the offender's acknowledgement field and
                         the ack flag is cleared.
    otherwise            sequence is zero and the acknowledgement is the
                         offender's sequence plus its data length.
  The packet goes straight to ArpaPort.PutPacket, not through SendPacket.
  >>
  off: ArpaBuffer.Body;
  b: ArpaBuffer.Buffer ← GetSystemBuffer[TRUE];
  body: ArpaBuffer.Body = b.arpa;
  --reads xmtr.nextSeq under the monitor lock.
  AssignSeq: ENTRY PROC = {body.tcp.sequence ← CardToSeq[xmtr.nextSeq]};
  SetHeaderFields[b, 0, 0];
  body.tcp.rst ← TRUE; --must be why we're here
  body.tcp.window ← 0; --This is probably silly.
  SELECT TRUE FROM
    (offender = NIL) =>
      {AssignSeq[]; body.tcp.acknowledgement ← CardToSeq[rcvr.nextSeq]};
    --off is assigned as a side effect of this test and reused in ENDCASE.
    ((off ← offender.arpa).tcp.ack) =>
      {body.tcp.sequence ← off.tcp.acknowledgement; body.tcp.ack ← FALSE}
    ENDCASE =>
      BEGIN
      ack: LONG CARDINAL ←
        SeqToCard[off.tcp.sequence] + TcpStreamInternal.GetDataLength[off].l;
      body.tcp.sequence ← [0, 0];
      body.tcp.acknowledgement ← CardToSeq[ack];
      END;
  ArpaPort.PutPacket[connection.port, b];
  IF ArpaFlags.doStats THEN ArpaStats.Incr[resetsSent];
  END; --SendRst

SendSyn: PUBLIC PROC [ack: BOOLEAN] =
  BEGIN
  <<
  Send a syn packet, either connection request or response. The only
  difference is the acknowledge fields and the maxSize option.
  ack = TRUE fills in the acknowledgement field from rcvr.nextSeq; ack = FALSE
  clears the ack flag.
  >>
  optionLen: CARDINAL ← 0;
  b: ArpaBuffer.Buffer = GetOutputBuffer[]; --sets ack field by default.
body: ArpaBuffer.Body = b.arpa;
  IF ack THEN body.tcp.acknowledgement ← CardToSeq[rcvr.nextSeq]
  ELSE body.tcp.ack ← FALSE;
  IF advertiseSize THEN
    BEGIN --ends on a 32-bit boundary, so no zero padding.
    --n is the raw buffer size less the minimum IP and TCP headers and 18
    --bytes of encapsulation.
    n: CARDINAL ← (ArpaBuffer.DataBytesPerRawBuffer[b])
      - ArpaPort.minIPHeaderBytes - TcpStreamInternal.minTcpHeaderBytes
      - 18; -- 18 is ethernet encap.
    m: TcpStreamInternal.MaxSegment ← LOOPHOLE[@body.tcp.options];
    --NOTE(review): 2 and 4 match the kind and length octets of the TCP
    --maximum segment size option in RFC 793, with n as the advertised size.
    --The field order of MaxSegmentObj is not visible here; confirm.
    m↑ ← [2, 4, n]; --and what the hell is this?
    optionLen ← SIZE[TcpStreamInternal.MaxSegmentObj] * bpw; --words to bytes.
    rcvr.maxTcpBytes ←
      IF connection.offNet THEN MIN[n, TcpStreamInternal.maxTcpDataBytes]
      ELSE n;
    END;
  body.tcp.syn ← TRUE; --this is why we're here
  SetHeaderFields[b, 0, optionLen];
  body.tcp.sequence ← CardToSeq[xmtr.iss];
  body.tcp.window ← rcvr.inputSpace.size;
  ArpaPort.PutPacket[connection.port, b];
  IF ArpaFlags.doStats THEN ArpaStats.Incr[synsSent];
  END; --SendSyn

SetWaitTime: ENTRY PROC [timeout: TcpStream.WaitTime] =
  --Records the client's wait time and converts it to clock pulses in
  --rcvr.interval.  The conversion multiplies timeout by 1000 before calling
  --System.MicrosecondsToPulses.  Values above maxMsecToPulses
  --(LAST[LONG CARDINAL] / 1000) would overflow that multiplication, so they
  --are treated as "wait forever" and the timeout on rcvr.newInput is
  --disabled; otherwise the condition timeout is set from the interval.
  BEGIN --no possibility of signals
  --time IN[maxMsecToPulses..INFINITY] => silly client NEVER wake up!
  rcvr.waitTime ← timeout;
  rcvr.interval ←
    IF timeout > maxMsecToPulses THEN LAST[LONG CARDINAL]
    ELSE System.MicrosecondsToPulses[timeout*1000];
  IF rcvr.interval = LAST[LONG CARDINAL] THEN
    Process.DisableTimeout[@rcvr.newInput]
  ELSE Process.SetTimeout[
    @rcvr.newInput, CommUtil.PulsesToTicks[[rcvr.interval]]];
  END; --SetWaitTime

StartTcpStream: PUBLIC TcpOps.StartTcpStreamProc =
  <<
  PROC[
    local, remote: ArpaRouter.InternetAddress,
    localPort, remotePort: ArpaRouter.Port,
    timeout: TcpStream.WaitTime,
    precedence: TcpStream.Precedence,
    security: TcpStream.Security,
    options: Environment.Block,
    establish: BOOLEAN,
    gf: LONG POINTER --TO FRAME[TcpImpl]--]
    RETURNS [
      TcpStream.Handle, ArpaRouter.InternetAddress, ArpaRouter.Port];
  Process is the client's.
  Initializes the stream object and the connection (InitStream), then, with
  connection.state = closed, either opens actively (establish TRUE: forks the
  receiver, sends a syn and waits) or becomes a listener (establish FALSE).
  Any other starting state glitches with IllegalState.
  Raises TcpStream.Failed if the stream is suspended while waiting or, for an
  active open, when rcvr.interval elapses; a listener raises the signal
  TcpStream.ListenTimeout instead.
  Returns the stream handle and the connection's remote address and port.
  >>
  BEGIN
  time: LONG CARDINAL ← System.GetClockPulses[];
  --Initialize stream object.
tcpStreamObject ← [
    destroy: NIL, --done from the outside(Mgr)
    put: PutBlock, get: GetBlock, waitForUrgent: WaitForUrgent, close: Close,
    setWaitTime: SetWaitTime, findAddresses: FindAddresses];
  InitStream[
    local: local, remote: remote, localPort: localPort,
    remotePort: remotePort, timeout: timeout, precedence: precedence,
    security: security, options: options];
  --no client output buffer is in use yet.
  output.b ← NIL;
  output.bufferSize ← 0;
  output.finger ← 0;
  SetWaitTime[timeout];
  --and now create the stream (or listener).
  time ← System.GetClockPulses[]; --start of the establishment timer.
  SELECT TRUE FROM
    (connection.state = closed) AND establish => --active create
      BEGIN
      rcvr.process ← FORK Receiver[];
      InitXmtr[];
      connection.state ← synSent;
      SendSyn[ack: FALSE];
      DO --UNTIL failed | established.
        Wait[@connection.isEstablished];
        SELECT TRUE FROM
          (connection.state = suspended) =>
            ERROR TcpStream.Failed[connection.whyFailed];
          (connection.state = established),
          (connection.state = closeWait),
          (connection.state = closing),
          (connection.state = lastAck),
          (connection.state = timeWait) => EXIT; --stream is (or was) estab.
          --streams in listen, synSent, and synReceived can still fail.
          ((System.GetClockPulses[] - time) > rcvr.interval) =>
            BEGIN
            time ← System.GetClockPulses[];
            ERROR TcpStream.Failed[timeout];
            END;
          ENDCASE; --not established yet and not timed out: wait again.
        ENDLOOP;
      END;
    connection.state = closed => --create listener
      BEGIN
      connection.state ← listen;
      rcvr.process ← FORK Receiver[];
      --tell the caller, if it asked, that the listener is now in place.
      IF notifyListenerStarted # NIL THEN notifyListenerStarted[];
      DO --UNTIL failed | established.
        Wait[@connection.isEstablished];
        SELECT TRUE FROM
          (connection.state = suspended) =>
            ERROR TcpStream.Failed[connection.whyFailed];
          (connection.state = established),
          (connection.state = closeWait),
          (connection.state = closing),
          (connection.state = lastAck),
          (connection.state = timeWait) => EXIT; --stream is (or was) estab.
          --streams in listen, synSent, and synReceived can still fail.
((System.GetClockPulses[] - time) > rcvr.interval) =>
            BEGIN
            --the timer is restarted first, so if the signal is resumed the
            --listener waits for another full interval.
            time ← System.GetClockPulses[];
            SIGNAL TcpStream.ListenTimeout;
            END;
          ENDCASE;
        ENDLOOP;
      END;
    ENDCASE => Driver.Glitch[IllegalState]; --How did we get here?!
  RETURN[@tcpStreamObject, connection.remoteAddr, connection.remotePort];
  END; --StartTcpStream

StopTcpStream: PUBLIC TcpOps.StopTcpStreamProc =
  --PROCESS: CLIENT STREAM DELETION.
  --Lets an in-progress close finish where possible (or sends a reset), then
  --stops and joins the receiver and retransmitter processes, releases the
  --queued data and buffers, unmaps the input space and deletes the port.
  BEGIN
  --allowed for last ack to get to remote.
  timeWaitInterval: LONG CARDINAL ← rcvr.ackInterval * 2;

  --Pauses until timeWaitInterval has passed since xmtr.timeWaitStart, or
  --until the stream becomes suspended.
  TimeWaitLoop: PROC =
    BEGIN
    UNTIL (System.GetClockPulses[] - xmtr.timeWaitStart) > timeWaitInterval DO
      Process.Pause[1];
      IF (connection.state = suspended) THEN EXIT; --got a reset and suspended.
      ENDLOOP;
    END; --TimeWaitLoop

  --Give the transmitter a kick in case he is waiting on allocation.
  Notify[@xmtr.newAllocation];
  --try to let close protocol complete.
  SELECT connection.state FROM
    closed => NULL;
    lastAck =>
      WHILE (xmtr.nextSeq # xmtr.unackedSeq) AND --until we get that last ack.
        (connection.state # suspended) DO --or we get a reset and suspend.
        Process.Pause[2];
        ENDLOOP;
    closing =>
      BEGIN
      WHILE (xmtr.nextSeq # xmtr.unackedSeq) AND --until we get that last ack
        (connection.state # suspended) DO --or we get a reset and suspend.
        Process.Pause[2];
        ENDLOOP;
      --timer is reset if a retransmitted fin is received.
--Enter time-wait only if the close really completed.  If the wait above
      --ended because the stream was suspended (a reset arrived), keep the
      --suspended state and skip the wait, just as TimeWaitLoop itself gives up
      --on suspension.
      IF connection.state # suspended THEN
        BEGIN
        xmtr.timeWaitStart ← System.GetClockPulses[];
        connection.state ← timeWait;
        TimeWaitLoop[];
        END;
      END;
    timeWait => TimeWaitLoop[];
    suspended, listen => NULL;
    ENDCASE => SendRst[NIL]; --any other state: abort with a reset.
  --stop the internal processes and wait for each of them to finish.
  connection.pleaseStop ← TRUE;
  IF rcvr.process # NIL THEN
    {Process.Abort[rcvr.process]; [] ← JOIN rcvr.process; rcvr.process ← NIL};
  IF rexmtr.process # NIL THEN
    {Process.Abort[rexmtr.process];
    [] ← JOIN rexmtr.process;
    rexmtr.process ← NIL};
  connection.state ← closed;
  FlushDataList[@rcvr.inuse];
  FlushDataList[@rcvr.avail];
  FlushRexmtrList[];
  --forget the partly filled output buffer once it has been given back, so no
  --stale pointer to it is left behind.
  IF output.b # NIL THEN {ArpaBuffer.ReturnBuffer[output.b]; output.b ← NIL};
  [] ← Space.Unmap[rcvr.inputSpace.space.pointer];
  TcpPort.DeleteTcpPort[connection.port];
  END; --StopTcpStream

SuspendStream: PUBLIC ENTRY PROC [
  suspend: TcpStream.SuspendReason, fail: TcpStream.FailureReason] =
  --Marks the connection suspended, remembering the previous state and both
  --reasons.  A stream that is already suspended keeps its original reasons.
  --In either case the waiters are poked and any client process recorded in
  --xmtr.clientProcess is aborted.
  BEGIN --PROCESS: RECEIVER OR RETRANSMITTER
  SELECT connection.state FROM
    suspended => NULL;
    ENDCASE =>
      BEGIN
      connection.stateBeforeSuspension ← connection.state;
      connection.state ← suspended;
      connection.whySuspended ← suspend;
      connection.whyFailed ← fail;
      END;
  <<
  Notify the internal processes, so they will loop again and discover the
  stream has been suspended.
  >>
  NOTIFY rcvr.newInput;
  NOTIFY xmtr.newAllocation;
  <<
  Abort any client waiting for a buffer. This will be translated into
  Suspended by the GetBuffer code.
  >>
  IF xmtr.clientProcess # NIL THEN Process.Abort[xmtr.clientProcess];
  END; --SuspendStream

PutBlock: PUBLIC PROC [block: Environment.Block, push, urgent: BOOLEAN] =
  --PROCESS: CLIENT SENDING
  --Copies the client's block into output buffers, sending each buffer as it
  --fills.  An empty block with push or urgent set flushes whatever is
  --already pending in the current output buffer.
  BEGIN --monitored so ForceOut can check for output data pending.
  AssignBuffer: ENTRY PROC = INLINE {output.b ← b};
  moved, n: CARDINAL ← 0;
  oBlock: Environment.Block;
  b: ArpaBuffer.Buffer ← NIL;
  --a push with no new data.
  IF (block.startIndex = block.stopIndexPlusOne) AND (push OR urgent) THEN
    {IF (output.b # NIL) THEN --is there any old data to be pushed?
--setUrg follows the caller's urgent flag, as in the other ForceOut calls
      --below; a push-only flush must not mark the pending data as urgent.
      ForceOut[setPush: TRUE, setFin: FALSE, setUrg: urgent]}
  ELSE
    WHILE block.startIndex < block.stopIndexPlusOne DO
      IF output.b = NIL THEN --need a new buffer.
        BEGIN
        b ← GetOutputBuffer[];
        --n is the raw buffer size less the minimum IP and TCP headers and 18
        --bytes of encapsulation.
        n ← (ArpaBuffer.DataBytesPerRawBuffer[b])
          - ArpaPort.minIPHeaderBytes - TcpStreamInternal.minTcpHeaderBytes
          - 18; -- 18 ethernet encap
        AssignBuffer[]; --publishes b as output.b under the monitor lock.
        output.finger ← 0;
        --never exceed xmtr.maxTcpBytes, the physical room n, or (when
        --connection.offNet) maxTcpDataBytes.
        output.bufferSize ← MIN[
          IF connection.offNet THEN
            MIN[TcpStreamInternal.maxTcpDataBytes, xmtr.maxTcpBytes]
          ELSE xmtr.maxTcpBytes,
          n];
        END;
      --copy as much of the client's block as fits in what is left.
      oBlock ← [
        blockPointer: LOOPHOLE[@output.b.arpa.tcp.bytes],
        startIndex: output.finger, stopIndexPlusOne: output.bufferSize];
      moved ← ByteBlt.ByteBlt[oBlock, block];
      block.startIndex ← block.startIndex + moved;
      --Is packet full?
      IF (output.finger ← output.finger + moved) = output.bufferSize THEN
        --Push bit may get set on last packet, and this may be last packet (if
        --packet and client data end at the same time).
        ForceOut[
          setPush: (block.startIndex >= block.stopIndexPlusOne) AND push,
          setFin: FALSE, setUrg: urgent];
      REPEAT
        FINISHED => --pushed data indicates push on last packet.
          IF push AND (output.b # NIL) THEN
            ForceOut[setPush: TRUE, setFin: FALSE, setUrg: urgent];
      ENDLOOP;
  END; --PutBlock

UpdateInputSpace: INTERNAL PROC [consumed: CARDINAL]
  RETURNS [needsAck: BOOLEAN ← FALSE] =
  BEGIN
  <<
  Updates the head of list based on how much data the client just consumed.
  consumed is the number of bytes taken from the head DataPair of rcvr.inuse.
  needsAck is set only when the remote was marked blocked and less than half
  of the input space is now in use.
  NOTE(review): needsAck is evaluated only while the head pair still holds
  data; when the pair is used up the result stays FALSE.  Confirm that the
  caller expects this.
  >>
  tmp: TcpStreamInternal.DataPair;
  rcvr.inuse.head.start ← rcvr.inuse.head.start + consumed;
  rcvr.lastConsumed ← PRED[rcvr.inuse.head.start];
  IF TcpStreamInternal.Greater[rcvr.inuse.head.start, rcvr.inuse.head.end] THEN
    BEGIN
    <<
    Client consumed all the available (contiguous) data. So pull the
    DataPair out of the list and put it in the avail list.
    >>
    --IF sanityChecking THEN ListSanityCheck[];
    tmp ← rcvr.inuse.head;
    rcvr.inuse.head ← tmp.next;
    rcvr.inuse.length ← PRED[rcvr.inuse.length];
    tmp.next ← rcvr.avail.head;
    rcvr.avail.head ← tmp;
    rcvr.avail.length ← SUCC[rcvr.avail.length];
    --IF sanityChecking THEN ListSanityCheck[];
    END
  ELSE
    <<
    rcvr.inputSpace.size => total space available
    CARDINAL[rcvr.nextSeq - rcvr.lastConsumed] => amount of space in use
    So, if he was blocked and we now have more than half of the data space
    available, prod him for some more data. This assumes that the input
    space size is at least twice as large as the packets being used.
    >>
    IF rcvr.remoteBlocked AND CARDINAL[
      rcvr.nextSeq - rcvr.lastConsumed] < (rcvr.inputSpace.size / 2) THEN
      {rcvr.remoteBlocked ← FALSE; needsAck ← TRUE};
  END; --UpdateInputSpace

ValidAck: PROC [b: ArpaBuffer.Buffer] RETURNS [BOOLEAN] =
  --TRUE when the packet's acknowledgement lies between xmtr.iss and
  --xmtr.nextSeq inclusive, as judged by the sequence comparison helpers.
  BEGIN
  ack: LONG CARDINAL = SeqToCard[b.arpa.tcp.acknowledgement];
  RETURN[GreaterOrEqual[ack, xmtr.iss] AND LessOrEqual[ack, xmtr.nextSeq]];
  END; --ValidAck

ValidSeq: PROC [body: ArpaBuffer.Body, len: CARDINAL]
  RETURNS [accept: BOOLEAN ← FALSE] =
  --Decides whether a segment with len data bytes falls in the receive window
  --[rcvr.nextSeq..rcvr.maxSeq].  ProcessTcpState is called for every segment
  --that is accepted, and also for an empty segment at exactly rcvr.nextSeq
  --while the window is closed (which is not accepted).
  BEGIN
  inSeq: LONG CARDINAL ← SeqToCard[body.tcp.sequence];

  --Window membership via the same comparison helpers that ValidAck and the
  --closed-window test use, rather than a plain IN range.  A plain range is
  --empty whenever rcvr.maxSeq has wrapped past the top of the 32 bit
  --sequence space.
  --NOTE(review): presumably the helpers compare modulo 2^32; confirm in
  --TcpStreamInternal.
  InWindow: PROC [seq: LONG CARDINAL] RETURNS [BOOLEAN] =
    {RETURN[
      GreaterOrEqual[seq, rcvr.nextSeq] AND LessOrEqual[seq, rcvr.maxSeq]]};

  SELECT len FROM
    > 0 => --most common case of carrying at least one data byte.
      BEGIN
      lastByte: LONG CARDINAL ← inSeq + len - 1;
      SELECT TRUE FROM
        InWindow[inSeq], --first byte in window.
        InWindow[lastByte], --last byte in window.
        Greater[rcvr.nextSeq, inSeq] AND Less[rcvr.maxSeq, lastByte] =>
          --some middle bytes in window.
          {ProcessTcpState[body]; accept ← TRUE};
        ENDCASE;
      END;
    0 => --zero bytes of data, but need to check for state.
      IF Greater[rcvr.nextSeq, rcvr.maxSeq] THEN
        --window closed, seq must equal receiver's next.
        {IF (inSeq = rcvr.nextSeq) THEN ProcessTcpState[body];
        --can't accept, but process state anyway.
        }
      ELSE --window not closed.
        IF InWindow[inSeq] THEN {ProcessTcpState[body]; accept ← TRUE};
    ENDCASE; --length < 0?!
END; --ValidSeq

WaitForUrgent: ENTRY PROC [block: Environment.Block] =
  --Blocks the calling process on rcvr.urgArrived, the condition the receive
  --path notifies when urgent data is first seen.  The block argument is not
  --referenced.
  BEGIN
  --standard entry-procedure idiom, so that a signal unwinding out of the
  --WAIT leaves the monitor cleanly.
  ENABLE UNWIND => NULL;
  WAIT rcvr.urgArrived;
  END; --WaitForUrgent

END...

LOG
15-May-85 12:43:27 SMA Created file.
 6-Jan-86 11:58:28 SMA More twiddles to CheckForData and InsertData
 7-Jan-86 10:35:02 SMA Maximum packet size from 100 to 576.
23-Mar-86 16:19:40 SMA Months and months of debugging.
14-May-86 11:27:27 SMA ENABLE UNWIND in WaitForUrgent.
16-May-86  8:33:42 SMA Problems with out-of-seq one byte packets.
16-May-86  8:35:13 SMA Urgent passed to ForceOut
19-May-86 12:52:36 SMA Check incoming urgent before push.
 4-Jun-86 11:49:15 SMA Set urgent pointer correctly.
 5-Jun-86 12:45:33 SMA Fix push processing for empty packets.
13-Jun-86 11:10:22 SMA Check for suspended (from reset) in StopTcpStream.
18-Jun-86 14:28:24 SMA Reset b.time on retransmission.
19-Jun-86 14:49:08 SMA Integer arithmetic when calculating usable window.
20-Jun-86 16:17:27 SMA Don't consume fin prematurely.
20-Jun-86 17:13:29 SMA Receiver more generous with acks.
 ?                 ALD/ISI Fix ceiling calc. in rexmtr, ICMP bug, etc.
 8-Jul-86 14:42:02 JAV Fixed loop when pushed received in GetBlock
17-Dec-86 12:19:33 SMA Ack not generated on every GetBlock (use remoteBlocked).
 7-Jan-87 15:27:53 SMA Initialize "stream markers" to initial receive seq. #.
 2-Feb-87 11:42:15 SMA Added probing for allocation code.
 9-Feb-87 19:35:06 SMA Receiver less generous with acks (sigh).
11-Mar-87 12:17:37 AOF Funston buffer management, et al.
 2-Jun-87 10:55:52 AOF Caching of DataObject entries.
 2-Jun-87 10:56:10 AOF Recoding of time to send ack computation.
 3-Mar-88 13:47:16 AOF Use global variable for start/stop procs.