-- TCPOpsImpl: monitor module that exports the TCPOps interface.
-- It provides the user-level operations on a TCP handle (open, close, abort, send, receive, urgent data),
-- the TCP checksum and timeout helpers, and the two background processes forked at startup
-- (retransmission checker and datagram receiver).
-- The monitor lock is per handle (LOCKS handle), so ENTRY procedures serialize per connection.

DIRECTORY
  Basics USING [BITNOT],
  BasicTime USING [GetClockPulses, MicrosecondsToPulses, Pulses, PulsesToMicroseconds],
  List USING [Nconc1],
  Process USING [Abort, DisableTimeout, MsecToTicks, Pause, SetTimeout],
  IPDefs USING [Byte, CreateIPHandle, DByte, Datagram, DatagramRec, DestroyIPHandle, Error, Address, InternetHandle, nullAddress, Receive, TCPProtocol],
  IPOps USING [OnesComplementAddBlock],
  TCP USING [Error, Reason, TCPInfo, Timeout],
  TCPOps USING [ConnectionState, recvBufferLength, sendBufferLength, TCPControlSet, TCPHandle, tcpHdrByteLength, TCPHeaderP, TCPSendBuffer],
  TCPLogging USING [PrintStateChange],
  TCPReceiving USING [ProcessRcvdSegment],
  TCPStates USING [Abort, Close, CloseConnection, CopyHandleList, GetInitialSequenceNumber, Open, ValidHandle],
  TCPTransmit USING [Rexmit, rexmitSleepTime, RepacketizeandRexmit, SendSYN, TryToSendData];

TCPOpsImpl: CEDAR MONITOR LOCKS handle USING handle: TCPHandle
  IMPORTS Basics, BasicTime, IPDefs, IPOps, List, Process, TCP, TCPLogging, TCPReceiving, TCPStates, TCPTransmit
  EXPORTS TCPOps
~ BEGIN OPEN TCPOps;

  -- Exported configuration and module state.
  defaultReceiveWindow: PUBLIC INT _ TCPOps.recvBufferLength;
  repacketizing: PUBLIC BOOL _ FALSE;  -- when TRUE, sent data is also copied into handle.sendBuffer and retransmissions are repacketized
  ourLocalAddress: PUBLIC IPDefs.Address;  -- set by StartupTCP from the IP handle
  tcpIPHandle: IPDefs.InternetHandle;  -- IP handle on which all TCP datagrams are received
  retransmitProcess: PROCESS;
  receiverProcess: PROCESS;

  -- Exported packet counters. None of them is modified in this module.
  pktsSent: PUBLIC INT _ 0;
  pktsRcvd: PUBLIC INT _ 0;
  pktsRexmitted: PUBLIC INT _ 0;
  pktsDuplicate: PUBLIC INT _ 0;
  pktsWithNoConnection: PUBLIC INT _ 0;
  pktsFromFuture: PUBLIC INT _ 0;
  pktsFromPast: PUBLIC INT _ 0;
  pktsWithBadChecksum: PUBLIC INT _ 0;

  -- User-level operations.

  Open: PUBLIC PROC [tcpInfo: TCP.TCPInfo] RETURNS [handle: TCPHandle] ~ {
    -- Thin wrapper: delegates to TCPStates.Open and returns its handle.
    RETURN TCPStates.Open[tcpInfo];
    };

  Close: PUBLIC PROC [handle: TCPHandle] ~ {
    -- Thin wrapper: delegates to TCPStates.Close.
    TCPStates.Close[handle];
    };

  Abort: PUBLIC PROC [handle: TCPHandle] ~ {
    -- Thin wrapper: delegates to TCPStates.Abort.
    TCPStates.Abort[handle];
    };

  WaitForListenerOpen: PUBLIC ENTRY PROC [handle: TCPHandle, timeout: INT] ~ {
    -- If the handle is in the listen state, waits on handle.notListening until the state changes
    -- or the elapsed time exceeds handle.dataTimeout (milliseconds, when it is >= 0).
    -- timeout > 0 sets the condition variable's wakeup interval to MIN[timeout, 5000] msec;
    -- otherwise the condition timeout is disabled and the loop only re-checks when notified.
    -- NOTE(review): the overall limit is taken from handle.dataTimeout, not from the timeout
    -- parameter; confirm with callers that this is intended.
    ENABLE UNWIND => NULL;  -- release the monitor lock if the WAIT is unwound, as in the other ENTRY procs
    startTime: BasicTime.Pulses;
    IF handle.state = listen THEN {
      startTime _ BasicTime.GetClockPulses[];
      TRUSTED {
        IF timeout>0
          THEN Process.SetTimeout[@handle.notListening, Process.MsecToTicks[MIN[timeout, 5000]]]
          ELSE Process.DisableTimeout[@handle.notListening]};
      DO
        WAIT handle.notListening;
        IF handle.state # listen THEN EXIT;
        IF handle.dataTimeout >= 0 AND BasicTime.GetClockPulses[]-startTime > BasicTime.MicrosecondsToPulses[handle.dataTimeout*1000] THEN EXIT;
        ENDLOOP;
      };
    };

  SendCurrentDatagram: PUBLIC ENTRY PROC [handle: TCPHandle, push: BOOL] ~ {
    -- Queues handle.currentOutputDatagram for sending (with the push flag as given) and
    -- installs a fresh output datagram.
    -- In the listen state the connection is first made active (SYN sent, state becomes synSent),
    -- which requires both matchForeignPort and matchForeignAddr; otherwise TCP.Error[neverOpen].
    -- In any state other than listen, synSent, synRcvd, established or closeWait,
    -- raises TCP.Error[handle.reason].
    ENABLE UNWIND => NULL;
    control: TCPControlSet;
    GetNewOutputDatagram: PROC ~ {
      -- Allocate a new datagram and reset the fill pointer and limit for it.
      handle.currentOutputDatagram _ NEW[IPDefs.DatagramRec];
      handle.currentOutputPtr _ tcpHdrByteLength;
      handle.currentOutputLimit _ handle.currentOutputPtr+handle.maxSegmentSize;
      };
    TCPStates.ValidHandle[handle];
    control.psh _ push;
    SELECT handle.state FROM
      listen =>
        IF NOT (handle.matchForeignPort AND handle.matchForeignAddr) THEN ERROR TCP.Error[neverOpen]
        ELSE {
          handle.active _ TRUE;
          handle.iss _ TCPStates.GetInitialSequenceNumber[];
          handle.sndUna _ handle.iss;
          TCPLogging.PrintStateChange[handle, synSent];
          handle.state _ synSent;
          TCPTransmit.SendSYN[handle];
          QueueSendSegment[handle, handle.currentOutputDatagram, handle.currentOutputPtr-tcpHdrByteLength, control];
          GetNewOutputDatagram[];
          };
      synSent, synRcvd, established, closeWait => {
        QueueSendSegment[handle, handle.currentOutputDatagram, handle.currentOutputPtr-tcpHdrByteLength, control];
        GetNewOutputDatagram[];
        };
      ENDCASE => ERROR TCP.Error[handle.reason];
    };

  GetNextDatagram: PUBLIC ENTRY PROC [handle: TCPHandle] ~ {
    -- Makes the first buffer of handle.readyToReadQueue the current input buffer,
    -- waiting on handle.dataAvailable while the queue is empty.
    -- While waiting: raises TCP.Error[handle.reason] if the state is no longer a valid receive state,
    -- and raises the resumable SIGNAL TCP.Timeout each time handle.dataTimeout expires
    -- (the timer is restarted if the signal is resumed).
    -- On success the receive window is reopened by the byte count of the dequeued buffer.
    ENABLE UNWIND => NULL;
    TCPStates.ValidHandle[handle];
    handle.currentInputBuffer _ NIL;
    IF handle.readyToReadQueue = NIL THEN {
      timeoutTime: BasicTime.Pulses _ SetTimeout[handle.dataTimeout];
      WHILE handle.readyToReadQueue=NIL DO
        IF NOT ValidRcvState[handle.state] THEN ERROR TCP.Error[handle.reason];
        IF TimedOut[timeoutTime] THEN {
          SIGNAL TCP.Timeout;
          timeoutTime _ SetTimeout[handle.dataTimeout];
          };
        WAIT handle.dataAvailable;
        ENDLOOP;
      };
    handle.currentInputBuffer _ NARROW[handle.readyToReadQueue.first];
    handle.readyToReadQueue _ handle.readyToReadQueue.rest;
    handle.rcvWnd _ handle.rcvWnd + handle.currentInputBuffer.dataByteCount;
    };

  SetUrgent: PUBLIC ENTRY PROC [handle: TCPHandle] ~ {
    -- Marks the connection as sending urgent data, unless it already is.
    -- The urgent pointer is set to sndNxt plus the number of bytes still queued to send.
    ENABLE UNWIND => NULL;
    TCPStates.ValidHandle[handle];
    IF NOT handle.sndUrgent THEN {
      handle.sndUrgent _ TRUE;
      handle.sndUp _ handle.sndNxt + handle.nBytesToSend;
      };
    };

  WaitForUrgentData: PUBLIC ENTRY PROC [handle: TCPHandle] RETURNS [urgentIndex: INT] ~ {
    -- Waits on handle.urgentAvailable until the handle is in urgent mode or leaves the valid
    -- receive states. Clears urgentMode, then returns handle.rcvUp if the state is still a
    -- valid receive state; otherwise raises TCP.Error[handle.reason].
    ENABLE UNWIND => NULL;
    TCPStates.ValidHandle[handle];
    WHILE NOT handle.urgentMode AND ValidRcvState[handle.state] DO
      WAIT handle.urgentAvailable;
      ENDLOOP;
    handle.urgentMode _ FALSE;
    IF ValidRcvState[handle.state]
      THEN RETURN [handle.rcvUp]
      ELSE ERROR TCP.Error[handle.reason];
    };

  -- Checksum and timeout utilities.

  TCPChecksum: PUBLIC PROC [data: IPDefs.Datagram] RETURNS [checksum: IPDefs.DByte] ~ TRUSTED {
    -- Returns the one's complement of the one's-complement sum over the TCP segment
    -- (data.dataLength bytes, zero-padded to a whole word) and a pseudo header built from the
    -- datagram's IP source, destination, protocol and the TCP byte count.
    -- Side effect: when dataLength is odd, writes a zero pad byte at data.data[data.dataLength].
    PseudoHeader: TYPE ~ MACHINE DEPENDENT RECORD [
      sourceAddr (0): IPDefs.Address,
      dstnAddr (2): IPDefs.Address,
      zeroes (4: 0..7): IPDefs.Byte,
      protocol (4: 8..15): IPDefs.Byte,
      tcpTotalByteCount (5): IPDefs.DByte];
    pseudoHeader: PseudoHeader;
    tcpHdrPtr: TCPHeaderP _ LOOPHOLE[@data.data];
    cs: CARDINAL;
    pseudoHeader.sourceAddr _ data.inHdr.source;
    pseudoHeader.dstnAddr _ data.inHdr.destination;
    pseudoHeader.zeroes _ 0;
    pseudoHeader.protocol _ data.inHdr.protocol;
    pseudoHeader.tcpTotalByteCount _ data.dataLength;
    IF data.dataLength MOD 2 # 0 THEN data.data[data.dataLength] _ 0;
    cs _ IPOps.OnesComplementAddBlock[ptr: tcpHdrPtr, count: (data.dataLength+1)/2, initialSum: Basics.BITNOT[tcpHdrPtr.checksum]];
      -- Start with negative of the checksum that's in the header so that we don't have to smash it to zero to compute the real checksum.
    cs _ IPOps.OnesComplementAddBlock[ptr: @pseudoHeader, count: PseudoHeader.SIZE, initialSum: cs];
    RETURN [Basics.BITNOT[cs]];  -- return one's complement of computed sum
    };

  ChecksumsMatch: PUBLIC PROC [c1, c2: IPDefs.DByte] RETURNS [BOOL] ~ {
    -- TRUE if the checksums are equal, treating 0 and 65535 (the two one's-complement zeros) as equal.
    RETURN [c1 = c2 OR ((c1 = 0 OR c1 = 65535) AND (c2 = 0 OR c2 = 65535))];
    };

  ValidRcvState: PROC [state: ConnectionState] RETURNS [BOOL] ~ INLINE {
    -- TRUE iff state lies in the subrange [listen..finWait2].
    RETURN [state IN [listen..finWait2]];
    };

  SetTimeout: PUBLIC PROC [delta: INT, base: BasicTime.Pulses _ BasicTime.GetClockPulses[]] RETURNS [BasicTime.Pulses] ~ {
    -- Returns the pulse time delta milliseconds after base.
    -- A negative delta yields 0, which TimedOut treats as "never times out";
    -- a real deadline is therefore forced to be at least 1.
    RETURN [IF delta>=0 THEN MAX[base+BasicTime.MicrosecondsToPulses[delta*1000], 1] ELSE 0];
    };

  TimedOut: PUBLIC PROC [timeoutTime: BasicTime.Pulses] RETURNS [BOOL] ~ {
    -- TRUE iff timeoutTime is nonzero and the current pulse clock has reached it.
    -- The difference is reinterpreted as a signed INT before the comparison.
    RETURN [timeoutTime#0 AND LOOPHOLE[BasicTime.GetClockPulses[]-timeoutTime, INT] >= 0];
    };

  -- Retransmission checking.

  CheckRexmitQueues: PROC = {
    -- Runs CheckRexmit on every handle in a copy of the handle list.
    -- A TCP.Error raised for one handle is ignored and the scan continues.
    l: LIST OF REF ANY;
    FOR l _ TCPStates.CopyHandleList[], l.rest WHILE l # NIL DO
      handle: TCPHandle _ NARROW[l.first];
      CheckRexmit[handle ! TCP.Error => CONTINUE];
      ENDLOOP;  -- get next connection
    };  -- CheckRexmitQueues

  CheckRexmit: ENTRY PROC [handle: TCPHandle] ~ {
    -- Per-connection timer check:
    --   in timeWait with the time-wait timer expired, closes the connection with handle.reason;
    --   otherwise, looking only at the first buffer on the retransmit queue,
    --   closes the connection with transmissionTimeout if its overall timeout has expired,
    --   or retransmits it (repacketizing first when that option is on) if its rexmit timer has expired.
    ENABLE UNWIND => NULL;
    TCPStates.ValidHandle[handle];
    IF handle.state = timeWait AND TimedOut[handle.timeWaitTime] THEN
      TCPStates.CloseConnection[handle, handle.reason]
    ELSE IF handle.rexmitQueue # NIL THEN {
      tcpSendBufferPtr: REF TCPSendBuffer _ NARROW[handle.rexmitQueue.first];
      IF TimedOut[tcpSendBufferPtr.timeoutTime] THEN {  -- timed out, close connection
        now: BasicTime.Pulses _ BasicTime.GetClockPulses[];  -- NOTE(review): never read; presumably kept for inspection from a debugger
        TCPStates.CloseConnection[handle, transmissionTimeout];
        }
      ELSE IF TimedOut[tcpSendBufferPtr.rexmitTime] THEN
        IF repacketizing THEN  -- optionally repacketize on rexmit
          TCPTransmit.RepacketizeandRexmit[handle, tcpSendBufferPtr]
        ELSE
          TCPTransmit.Rexmit[handle, tcpSendBufferPtr];
      };
    };

  -- NOTE(review): a commented-out helper stood here. Only these fragments of it survive:
  --   <0 => less,
  --   =0 => equal,
  --   >0 => greater,
  --   ... ERROR];
  --   };

  QueueSendSegment: INTERNAL PROC [handle: TCPHandle, sendDatagram: IPDefs.Datagram, dataByteCount: INT, ctl: TCPControlSet] ~ TRUSTED {
    -- Appends sendDatagram (carrying dataByteCount data bytes) to handle.toNetQueue.
    -- Must be called with the handle's monitor lock held (INTERNAL).
    -- Steps:
    --   1. Wait on handle.windowAvailable until the send window has room for dataByteCount more bytes.
    --   2. When repacketizing, copy the data bytes into the circular handle.sendBuffer.
    --   3. Fill in the urg/psh flags, clear ack/rst/syn/fin and the checksum, set the data length.
    --   4. Queue the datagram; in established or closeWait, try to send immediately.
    --   5. If ctl.psh, wait until both toNetQueue and rexmitQueue are empty.
    -- While waiting, raises TCP.Error[handle.reason] if the state leaves
    -- {synSent, synRcvd, established, closeWait}, and raises the resumable SIGNAL TCP.Timeout each
    -- time handle.dataTimeout expires (the timer is restarted if the signal is resumed).
    start: BasicTime.Pulses _ BasicTime.GetClockPulses[];
    timeout: INT = handle.dataTimeout;  -- NOTE(review): never read; presumably kept for inspection from a debugger
    windowSize: INT;
    tcpHdrPtr: TCPHeaderP _ LOOPHOLE[@sendDatagram.data];

    -- When repacketizing, the usable window is also bounded by the size of the send buffer.
    windowSize _ IF repacketizing THEN MIN[sendBufferLength, handle.sndWnd] ELSE handle.sndWnd;
    IF windowSize - handle.nBytesToSend < dataByteCount THEN {
      timeoutTime: BasicTime.Pulses _ SetTimeout[handle.dataTimeout];
      DO
        WAIT handle.windowAvailable;
        SELECT handle.state FROM
          synSent, synRcvd, established, closeWait => NULL;
          ENDCASE => ERROR TCP.Error[handle.reason];  -- explicit ERROR, matching every other raise in this module
        windowSize _ IF repacketizing THEN MIN[sendBufferLength, handle.sndWnd] ELSE handle.sndWnd;
        IF windowSize - handle.nBytesToSend >= dataByteCount THEN EXIT;
        IF TimedOut[timeoutTime] THEN {
          -- now and duration are not read by the code; presumably they are there to be examined
          -- from a debugger when Timeout is raised.
          now: BasicTime.Pulses _ BasicTime.GetClockPulses[];
          duration: LONG CARDINAL = BasicTime.PulsesToMicroseconds[now-start];
          SIGNAL TCP.Timeout;
          timeoutTime _ SetTimeout[handle.dataTimeout];
          };
        ENDLOOP;
      };
    handle.nBytesToSend _ handle.nBytesToSend + dataByteCount;

    IF repacketizing THEN {  -- if repacketizing data
      -- Copy the data bytes that follow the TCP header into the circular send buffer.
      FOR i: INT IN [tcpHdrPtr.dataWordOffset*4..tcpHdrPtr.dataWordOffset*4+dataByteCount) DO
        handle.sendBuffer[handle.fillSlot] _ sendDatagram.data[i];
        handle.fillSlot _ (handle.fillSlot + 1) MOD sendBufferLength;
        ENDLOOP;
      };

    IF ctl.urg THEN {  -- fill in urgent control and pointer
      tcpHdrPtr.urg _ TRUE;
      tcpHdrPtr.urgentPtr _ dataByteCount }
    ELSE {
      tcpHdrPtr.urg _ FALSE;
      tcpHdrPtr.urgentPtr _ 0;
      };
    tcpHdrPtr.psh _ ctl.psh;
    tcpHdrPtr.ack _ FALSE;
    tcpHdrPtr.rst _ FALSE;
    tcpHdrPtr.syn _ FALSE;
    tcpHdrPtr.fin _ FALSE;
    tcpHdrPtr.checksum _ 0;
    sendDatagram.dataLength _ dataByteCount;  -- set data length
    handle.toNetQueue _ List.Nconc1[handle.toNetQueue, sendDatagram];  -- put buffer on queue to send
    IF handle.state = established OR handle.state = closeWait THEN
      TCPTransmit.TryToSendData[handle];  -- send it if window permits

    IF ctl.psh THEN {  -- Wait for what we have sent to get acked
      timeoutTime: BasicTime.Pulses _ SetTimeout[handle.dataTimeout];
      WHILE handle.toNetQueue # NIL OR handle.rexmitQueue # NIL DO
        WAIT handle.windowAvailable;
        SELECT handle.state FROM
          synSent, synRcvd, established, closeWait => NULL;
          ENDCASE => ERROR TCP.Error[handle.reason];  -- explicit ERROR, matching every other raise in this module
        IF TimedOut[timeoutTime] THEN {
          -- now and duration are not read by the code; see the note in the first wait loop.
          now: BasicTime.Pulses _ BasicTime.GetClockPulses[];
          duration: LONG CARDINAL = BasicTime.PulsesToMicroseconds[now-start];
          SIGNAL TCP.Timeout;
          timeoutTime _ SetTimeout[handle.dataTimeout];
          };
        ENDLOOP;
      }
    };  -- QueueSendSegment

  -- Background processes and startup.

  RetransmitProcessProc: PROC ~ {
    -- Forever: sleep for TCPTransmit.rexmitSleepTime milliseconds, then check every connection's timers.
    DO
      Process.Pause[Process.MsecToTicks[TCPTransmit.rexmitSleepTime]];
      CheckRexmitQueues[];
      ENDLOOP;
    };

  ReceiverProcessProc: PROC ~ {
    -- Forever: receive a datagram on tcpIPHandle and hand each non-NIL one to TCPReceiving.
    -- Any IPDefs.Error ends the loop and the procedure returns.
    ENABLE IPDefs.Error => GO TO GiveUp;
    DO
      data: IPDefs.Datagram _ IPDefs.Receive[tcpIPHandle];
      IF data # NIL THEN TCPReceiving.ProcessRcvdSegment[data];
      ENDLOOP;
    EXITS GiveUp => NULL;
    };

  StartupTCP: PROC ~ {
    -- Creates the IP handle (matching on the TCP protocol and on the local address),
    -- records our local address from it, and forks the retransmit and receiver processes.
    tcpIPHandle _ IPDefs.CreateIPHandle[[matchProtocol~TRUE, protocol~IPDefs.TCPProtocol, matchAddr~FALSE, address~IPDefs.nullAddress, matchLocalAddr~TRUE, localAddress~IPDefs.nullAddress]];
    ourLocalAddress _ tcpIPHandle.localAddress;
    TRUSTED {
      retransmitProcess _ FORK RetransmitProcessProc[];
      receiverProcess _ FORK ReceiverProcessProc[];
      };
    };

  ShutdownTCP: PROC ~ {
    -- Aborts both background processes and destroys the IP handle.
    -- Not called anywhere in this module.
    TRUSTED {
      Process.Abort[receiverProcess];
      Process.Abort[retransmitProcess];
      };
    IPDefs.DestroyIPHandle[tcpIPHandle];
    };

  -- Module start code: bring TCP up when the module is started.
  StartupTCP[];

END.