-- PacketStreamInstance: implementation module for one instance of a Sequenced Packet Protocol (SPP) packet stream.
-- Each instantiation of this MONITOR owns one socket channel, one retransmitter process and one receiver process.
-- Time values held in variables are in clock Pulses (ProcessorFace.GetClockPulses); interface values are in milliseconds.
DIRECTORY
  BufferDefs USING [BufferAccessHandle, OisBuffer, Queue, QueueObject, QueueLength, QueueCleanup, QueueInitialize],
  CommFlags USING [doDebug, doStats],
  CommUtilDefs USING [EnableAborts],
  DriverDefs USING [Glitch],
  LoadState USING [SelfDestruct],
  OISCP USING [DequeueSpp, EnqueueSpp, MaybeGetFreeSendOisBufferFromPool, MaybeGetFreeReceiveOisBufferFromPool, ReturnFreeOisBuffer, unknownConnID],
  OISCPTypes USING [ConnectionID, maxBytesPerPkt, WaitTime],
  PacketStream,
  PrincOpsUtils USING [ByteBlt],
  Process USING [MsecToTicks, SetTimeout, Ticks, Yield],
  ProcessorFace USING [GetClockPulses, microsecondsPerHundredPulses],
  Router USING [XmitStatus],
  Socket USING [Abort, ChannelAborted, ChannelHandle, Create, Delete, GetPacket, GetStatus, PutPacket, TimeOut],
  SocketInternal USING [GetBufferPool, ChannelHandleToSocketHandle],
  NSAddress USING [NetworkAddress],
  StatsDefs USING [StatBump, StatIncr];

-- Module parameters: the two endpoint addresses and connection IDs, whether to actively establish,
-- the establishment timeout (msecs) and the class of service (selects buffer counts and initial allocation).
-- Returns the stream handle plus the remote address and remote connection ID as finally known.
PacketStreamInstance: MONITOR [
  localAddr, remoteAddr: NSAddress.NetworkAddress,
  localConnectionID, remoteConnectionID: OISCPTypes.ConnectionID,
  establish: BOOLEAN,
  timeout: OISCPTypes.WaitTime _ PacketStream.defaultWaitTime,
  classOfService: PacketStream.ClassOfService _ bulk]
  RETURNS [
    PacketStream.Handle, NSAddress.NetworkAddress, OISCPTypes.ConnectionID]
  IMPORTS BufferDefs, CommUtilDefs, DriverDefs, LoadState, PrincOpsUtils, Process, ProcessorFace, Socket, SocketInternal, StatsDefs, PacketStream, OISCP
  SHARES BufferDefs =
  BEGIN OPEN DriverDefs, OISCP, Router, PacketStream, StatsDefs;

  Pulses: TYPE = RECORD[LONG CARDINAL];

  -- exported type(s)
  NetworkAddress: PUBLIC TYPE = NSAddress.NetworkAddress;

  -- client interface: the procedure record handed back to the creator
  ps: Object _ [NIL, TakeFromUser, GetForUser, WaitForAttention, SetWaitTime, FindAddresses, GetSenderSizeLimit, ReturnGetSppDataBuffer]; -- pool (NIL) initted later

  -- connection state
  state: PacketStream.State;
  whySuspended: PacketStream.SuspendReason;
  whyFailed: PacketStream.FailureReason;
  stateBeforeSuspension: PacketStream.State;
  connectionIsEstablished: CONDITION;

  -- socket interface data
  socketCH: Socket.ChannelHandle;
  pool: BufferDefs.BufferAccessHandle; -- buffer pool of the socket; assigned in the main-line code

  -- sequencing, duplicate suppression and flow control information
  nextInputSeq, maxInputSeq: CARDINAL;
  unackedOutputSeq, nextOutputSeq, maxOutputSeq: CARDINAL;
  newAllocation: CONDITION;
  sendAnAck: BOOLEAN;
  maxOisPktLength: CARDINAL;

  -- input attention control; entries are indexed 1..attnCount
  newInputAttn: CONDITION;
  attnSeqTable: ARRAY (0..maxPendingAttns] OF CARDINAL;
  attnCount: CARDINAL;

  -- input/output pointers and queues
  sentBuffer: BufferDefs.OisBuffer; -- hold the head of the sentQueue
  inOrderQueue: BufferDefs.Queue = NEW[BufferDefs.QueueObject];
  inAttnQueue: BufferDefs.Queue = NEW[BufferDefs.QueueObject];
  outOfOrderQueue: BufferDefs.Queue = NEW[BufferDefs.QueueObject];
  tempQueue: BufferDefs.Queue = NEW[BufferDefs.QueueObject];
  sentQueue: BufferDefs.Queue = NEW[BufferDefs.QueueObject];
  inOrderQueueNotEmpty: CONDITION;

  -- timing variables for this packet stream
  ackRequestPulse: LONG CARDINAL; -- the time an ack was requested, a Pulse
  lastPacketReceivedPulse: LONG CARDINAL; -- set in GotSequencedPacketLocked when a packet for this connection arrives
  lastProbePulse: LONG CARDINAL; -- retransmitter's last probe sent time, a Pulse
  probeCounter: CARDINAL; -- retransmitter's allocation probe counter
  probeRetransmitPulses: LONG CARDINAL; -- in Pulses
  dataPacketRetransmitPulses: Pulses; -- in Pulses
  waitPulses: LONG CARDINAL; -- how long (in Pulses) GetForUser and the establish waits keep waiting
  delayCount: CARDINAL; -- number of data packets used in delay calculation
  delaySum: Pulses; -- accumulated (ack time - send time) over delayCount packets
  lastDelayCalculationPulse: LONG CARDINAL;
  seqNumWhenDelayCalculated: CARDINAL;

  -- process handles and the variables that control them
  pleaseStop: BOOLEAN;
  letClientRun: CONDITION; -- this is also used in DeleteCleanup - do not ENABLE ABORTs
  retransmitterFork, receiverFork: PROCESS;
  retransmitterWakeupTimer: CONDITION;

  -- temporary stats
  normalRetransUpdates: CARDINAL; -- **temp
  doubleRetransUpdates: CARDINAL; -- **temp

  -- constants for bulk class of service (passed to Socket.Create in the main-line code)
  bulkSendBufs: CARDINAL = 4;
  bulkReceiveBufs: CARDINAL = 6;
  bulkInitialAllocation: CARDINAL = 3;

  -- constants for transactional class of service
  transactSendBufs: CARDINAL = 3;
  transactReceiveBufs: CARDINAL = 4;
  transactInitialAllocation: CARDINAL = 2;

  defaultSendAllocation: CARDINAL = 1; -- more will screw simple guys like 860
  duplicateWindow: INTEGER = 100;
  maxPendingAttns: CARDINAL = 5;
  retransmissionsBeforeGiveUp: CARDINAL = 30;
  retransmissionsBeforeAskForAck: CARDINAL = 2;
  probesBeforeGiveUp: CARDINAL = 8; -- therefore 40 secs of inactivity
  emptyInOrderQueue: INTEGER = 0;
  minSendAllocationBeforeRequestingAck: INTEGER = 0;
  minRcvAllocation: INTEGER = 0;
  initialDataPacketRetransmitTime: CARDINAL = 2000; -- msecs
  maxDataPacketRetransmitTime: CARDINAL = 20000; -- msecs
  maxDataPacketRetransmitPulses: Pulses _ MilliSecondsToPulses[
    maxDataPacketRetransmitTime];
  initialRetransmitterTime: CARDINAL = 250; -- msecs
  probeMultiplier: CARDINAL = 4; -- probe time is this multiple of retrans timeout
  delayCalculationTime: CARDINAL = 10000; -- compared (as Pulses) against elapsed time in UpdateRetransmitTime
  delayCalculationPulses: LONG CARDINAL _ MilliSecondsToPulses[
    delayCalculationTime];
  inactiveConnectionTime: CARDINAL = 5000; -- msecs
  inactiveConnectionPulses: LONG CARDINAL _ MilliSecondsToPulses[
    inactiveConnectionTime];
  fortyMsecsOfPulses: LONG CARDINAL _ MilliSecondsToPulses[40];
  fiveHundredMsecsOfPulses: LONG CARDINAL _ MilliSecondsToPulses[500];

  -- weights used by UpdateRetransmitTime: new = old*(wOldAbove/wOldBelow) + avg*(weightNewAbove/wNewBelow)
  wOldAbove: CARDINAL _ 1;
  wOldBelow: CARDINAL _ 2;
  wNewAbove: CARDINAL _ 50;
  wNewBelow: CARDINAL _ 2;

  -- Glitches raised by this module (only when CommFlags.doDebug)
  StreamNotEstablished: ERROR = CODE;
  StreamTerminating: ERROR = CODE;

  -- TakeFromUser: client send operation (installed in ps).
  -- Under the monitor lock, waits on newAllocation until nextOutputSeq is within maxOutputSeq,
  -- marks the packet as a data packet and fills in the sequencing header; then, outside the lock,
  -- bumps the send statistics and hands the buffer to the socket channel.
  -- Raises ConnectionSuspended[whySuspended] when the stream is suspended.
  TakeFromUser: PROCEDURE [b: BufferDefs.OisBuffer] =
    BEGIN
    TakeFromUserLocked: ENTRY PROCEDURE = INLINE
      BEGIN
      ENABLE UNWIND => NULL;
      DO
        SELECT state FROM
          unestablished, activeEstablish, waitEstablish =>
            IF CommFlags.doDebug THEN Glitch[StreamNotEstablished];
          established, open =>
            IF LOOPHOLE[nextOutputSeq - maxOutputSeq, INTEGER] <= 0 THEN
              BEGIN
              b.ois.systemPacket _ FALSE;
              -- request an ack if the client did not, when this is an attention or the send window is about to close
              IF NOT b.ois.sendAck THEN
                b.ois.sendAck _ b.ois.attention OR
                  LOOPHOLE[maxOutputSeq - nextOutputSeq, INTEGER] = minSendAllocationBeforeRequestingAck;
              b.ois.unusedType _ 0;
              PrepareSequencedPacket[b];
              END
            ELSE
              BEGIN
              WAIT newAllocation; -- let ABORTED ERROR propagate
              LOOP; -- we start from the beginning in case the state changed
              END;
          suspended => RETURN WITH ERROR ConnectionSuspended[whySuspended];
          terminating => IF CommFlags.doDebug THEN Glitch[StreamTerminating];
          ENDCASE;
        EXIT;
        ENDLOOP;
      END; -- TakeFromUserLocked
    TakeFromUserLocked[]; -- either returns or generates a SIGNAL.
    IF CommFlags.doStats THEN StatIncr[statDataPacketsSent];
    IF CommFlags.doStats THEN StatBump[statDataBytesSent, b.ois.pktLength - bytesPerSequencedPktHeader];
    PutPacketOnSocketChannel[b];
    END; -- TakeFromUser

  -- GetForUser: client receive operation (installed in ps).
  -- Dequeues the next buffer from inOrderQueue, waiting on inOrderQueueNotEmpty while it is empty.
  -- The loop exits without a dequeued buffer once waitPulses have elapsed since entry, so the caller
  -- must test the result for NIL. Raises ConnectionSuspended[whySuspended] when suspended.
  GetForUser: PROCEDURE RETURNS [b: BufferDefs.OisBuffer] =
    BEGIN
    returnAnAck: BOOLEAN _ FALSE; -- NOTE(review): not referenced anywhere in this procedure as shown
    GetForUserLocked: ENTRY PROCEDURE = INLINE
      BEGIN
      ENABLE UNWIND => NULL;
      startPulse, now: LONG CARDINAL;
      startPulse _ ProcessorFace.GetClockPulses[];
      DO
        SELECT state FROM
          unestablished, activeEstablish, waitEstablish =>
            IF CommFlags.doDebug THEN Glitch[StreamNotEstablished];
          established, open =>
            BEGIN
            IF (b _ DequeueSpp[inOrderQueue]) # NIL THEN
              BEGIN
              IF CommFlags.doStats THEN
                BEGIN
                StatIncr[statDataPacketsReceived];
                StatBump[
                  statDataBytesReceived, b.ois.pktLength - bytesPerSequencedPktHeader];
                END;
              END
            ELSE
              BEGIN
              now _ ProcessorFace.GetClockPulses[];
              IF (now - startPulse) >= waitPulses THEN EXIT; -- timed out; b is NIL here
              WAIT inOrderQueueNotEmpty; -- let ABORTED ERROR propagate
              LOOP; -- we start from the beginning in case the state changed
              END;
            END;
          suspended => RETURN WITH ERROR ConnectionSuspended[whySuspended];
          terminating => IF CommFlags.doDebug THEN Glitch[StreamTerminating];
          ENDCASE;
        EXIT;
        ENDLOOP;
      END; -- GetForUserLocked
    GetForUserLocked[];
    END; -- GetForUser

  -- WaitForAttention: client operation (installed in ps).
  -- Waits on newInputAttn until a buffer can be dequeued from inAttnQueue and returns it.
  -- Raises ConnectionSuspended[whySuspended] when suspended.
  WaitForAttention: ENTRY PROCEDURE RETURNS [b: BufferDefs.OisBuffer] =
    BEGIN
    ENABLE UNWIND => NULL;
    DO
      SELECT state FROM
        unestablished, activeEstablish, waitEstablish =>
          IF CommFlags.doDebug THEN Glitch[StreamNotEstablished];
        established, open =>
          BEGIN
          IF (b _ DequeueSpp[inAttnQueue]) # NIL THEN EXIT
          ELSE
            BEGIN
            WAIT newInputAttn; -- let ABORTED ERROR propagate
            LOOP; -- we start from the beginning in case the state changed
            END;
          END;
        suspended => RETURN WITH ERROR ConnectionSuspended[whySuspended];
        terminating => IF CommFlags.doDebug THEN Glitch[StreamTerminating];
        ENDCASE;
      EXIT;
      ENDLOOP;
    END; -- WaitForAttention

  -- ReturnGetSppDataBuffer: client operation (installed in ps).
  -- Returns a consumed data buffer to the free pool and advances maxInputSeq by one.
  -- If that advance takes (maxInputSeq - nextInputSeq) to exactly 1, a system packet is sent
  -- so the other end sees the new allocation.
  ReturnGetSppDataBuffer: PROCEDURE [b: BufferDefs.OisBuffer] =
    BEGIN
    JustOpenedTheWindow: ENTRY PROCEDURE RETURNS [BOOLEAN] = INLINE
      {RETURN[LOOPHOLE[(maxInputSeq _ maxInputSeq + 1) - nextInputSeq, INTEGER] = 1]};
    OISCP.ReturnFreeOisBuffer[b];
    IF JustOpenedTheWindow[] THEN SendSystemPacket[FALSE]; -- just opened a closed window
    END;

  -- SendSystemPacket: sends a header-only system packet (consumes no sequence number).
  -- sendAck selects whether the outgoing packet asks the other end for an acknowledgement.
  -- Does nothing when no send buffer is available from the pool.
  SendSystemPacket: PROCEDURE [sendAck: BOOLEAN] =
    BEGIN
    b: BufferDefs.OisBuffer;
    SendSystemPacketLocked: ENTRY PROCEDURE = INLINE
      BEGIN
      PrepareSequencedPacket[b];
      END; -- SendSystemPacketLocked
    IF (b _ OISCP.MaybeGetFreeSendOisBufferFromPool[pool]) # NIL THEN
      BEGIN
      b.ois.pktLength _ bytesPerSequencedPktHeader;
      b.ois.systemPacket _ TRUE;
      b.ois.sendAck _ sendAck;
      b.ois.attention _ FALSE;
      b.ois.unusedType _ 0;
      b.ois.subtype _ 0;
      SendSystemPacketLocked[];
      PutPacketOnSocketChannel[b];
      END;
    END; -- SendSystemPacket

  -- PrepareSequencedPacket: fills in the SPP header fields of b from the monitor state
  -- (addresses, connection IDs, sequence/acknowledge/allocation numbers) and resets b.tries.
  -- Data packets advance nextOutputSeq; system packets only bump statistics.
  -- Caller must already have set systemPacket, sendAck and attention.
  PrepareSequencedPacket: INTERNAL PROCEDURE [b: BufferDefs.OisBuffer] = INLINE
    BEGIN
    b.tries _ 0;
    b.ois.transCntlAndPktTp.packetType _ sequencedPacket;
    b.ois.destination _ remoteAddr;
    b.ois.sourceConnectionID _ localConnectionID;
    b.ois.destinationConnectionID _ remoteConnectionID;
    b.ois.sequenceNumber _ nextOutputSeq;
    b.ois.acknowledgeNumber _ nextInputSeq;
    b.ois.allocationNumber _ maxInputSeq;
    IF NOT b.ois.systemPacket THEN nextOutputSeq _ nextOutputSeq + 1
    ELSE
      BEGIN
      IF CommFlags.doStats THEN StatIncr[statAcksSent];
      IF CommFlags.doStats THEN StatIncr[statSystemPacketsSent];
      END;
    IF b.ois.sendAck AND CommFlags.doStats THEN StatIncr[statAckRequestsSent];
    IF b.ois.attention AND CommFlags.doStats THEN StatIncr[statAttentionsSent];
    END; -- PrepareSequencedPacket

  -- PutPacketOnSocketChannel: hands b to the socket for transmission.
  -- For data packets it installs EnqueueTransmittedPacketAppropriately as the requeue procedure
  -- and, when an ack is requested, records the current time in ackRequestPulse.
  -- If the channel has been aborted the requeue procedure is called directly.
  -- NOTE(review): for a system packet b.requeueProcedure is not set here, so the ChannelAborted
  -- handler calls whatever value the buffer already carries; confirm against the buffer pool.
  PutPacketOnSocketChannel: PROCEDURE [b: BufferDefs.OisBuffer] =
    BEGIN
    IF ~b.ois.systemPacket THEN
      BEGIN
      b.requeueProcedure _ LOOPHOLE[EnqueueTransmittedPacketAppropriately];
      IF b.ois.sendAck THEN ackRequestPulse _ ProcessorFace.GetClockPulses[];
      END;
    Socket.PutPacket[
      socketCH, b !
      Socket.ChannelAborted => BEGIN b.requeueProcedure[b]; CONTINUE; END];
    END; -- PutPacketOnSocketChannel

  -- EnqueueTransmittedPacketAppropriately: requeue procedure for transmitted data packets.
  -- Inspects the transmit status, suspends the stream on the failure statuses listed below,
  -- then puts b on sentQueue. A first transmission that requested an ack is time-stamped in b.time.
  EnqueueTransmittedPacketAppropriately: ENTRY PROCEDURE [b: BufferDefs.OisBuffer] =
    BEGIN
    -- NOTE(review): the suspend reason is always noRouteToDestination; only whyFailed carries why.
    FailConnection: INTERNAL PROCEDURE [why: FailureReason] =
      BEGIN
      SuspendStream[noRouteToDestination];
      whyFailed _ why; -- inform the creator if there is one
      END; -- FailConnection
    SELECT LOOPHOLE[b.status, XmitStatus] FROM
      pending, goodCompletion, aborted, invalidDestAddr => NULL;
      noRouteToNetwork, hardwareProblem => FailConnection[noRouteToDestination];
      noTranslationForDestination => FailConnection[noTranslationForDestination];
      noAnswerOrBusy => FailConnection[noAnswerOrBusy];
      circuitInUse => FailConnection[circuitInUse];
      circuitNotReady => FailConnection[circuitNotReady];
      noDialingHardware => FailConnection[noDialingHardware];
      dialerHardwareProblem => FailConnection[noRouteToDestination];
      ENDCASE => ERROR;
    EnqueueSpp[sentQueue, b];
    IF b.tries = 0 AND b.ois.sendAck THEN b.time _ ProcessorFace.GetClockPulses[];
    END; -- EnqueueTransmittedPacketAppropriately

  -- SuspendStream: records the previous state and the reason, and moves to the suspended state.
  -- NOTE(review): no condition variable is notified here; waiters notice on their next timeout.
  SuspendStream: INTERNAL PROCEDURE [why: SuspendReason] =
    BEGIN
    stateBeforeSuspension _ state;
    state _ suspended;
    whySuspended _ why;
    END; -- SuspendStream

  -- SuspendStreamLocked: entry-procedure wrapper around SuspendStream.
  SuspendStreamLocked: ENTRY PROCEDURE [why: SuspendReason] =
    BEGIN SuspendStream[why]; END; -- SuspendStreamLocked

  -- Retransmitter: body of the forked retransmission process; loops until pleaseStop.
  -- Each pass: retransmits what is on the sent queue if the retransmit interval has elapsed,
  -- sends a probe when TimeToSendAProbe says so, sleeps on retransmitterWakeupTimer,
  -- then re-evaluates the retransmit interval.
  Retransmitter: PROCEDURE =
    BEGIN
    now: LONG CARDINAL; -- sampled once at the top of each pass
    retransBuffer: BufferDefs.OisBuffer;

    -- TRUE when a probe should be sent now: either the send window is closed and the probe
    -- interval has elapsed, or nothing has been received and no probe sent for the inactivity interval.
    -- Suspends the stream once probeCounter exceeds probesBeforeGiveUp.
    TimeToSendAProbe: ENTRY PROCEDURE RETURNS [goAhead: BOOLEAN] = INLINE
      BEGIN
      goAhead _ FALSE;
      IF state = established OR state = open THEN
        BEGIN
        IF ((LOOPHOLE[nextOutputSeq - maxOutputSeq, INTEGER] > 0 AND
          (now - lastProbePulse) > probeRetransmitPulses)) OR
          (((now - lastPacketReceivedPulse) >= inactiveConnectionPulses AND
          (now - lastProbePulse) >= inactiveConnectionPulses)) THEN
          BEGIN
          lastProbePulse _ now;
          IF (probeCounter _ probeCounter + 1) > probesBeforeGiveUp THEN
            SuspendStream[transmissionTimeout]
          ELSE
            BEGIN
            goAhead _ TRUE;
            IF CommFlags.doStats THEN StatIncr[statProbesSent];
            END;
          END;
        END;
      END; -- TimeToSendAProbe

    -- Number of buffers to retransmit on this pass: everything on sentQueue plus sentBuffer
    -- once dataPacketRetransmitPulses have elapsed since ackRequestPulse, otherwise 0.
    RetransmissionCount: ENTRY PROCEDURE RETURNS [CARDINAL] = INLINE
      BEGIN
      IF state = established OR state = open THEN
        BEGIN
        IF (now - ackRequestPulse) >= dataPacketRetransmitPulses THEN
          RETURN[sentQueue.length + (IF sentBuffer = NIL THEN 0 ELSE 1)]
        ELSE RETURN[0]; -- not yet time to retransmit
        END
      ELSE RETURN[0]; -- any other state requires no retransmission.
      END; -- RetransmissionCount

    -- Takes the next buffer to retransmit (sentBuffer first, then sentQueue); NIL if none.
    -- A buffer that has reached retransmissionsBeforeGiveUp suspends the stream and is kept in sentBuffer.
    GetFromRetransmissionQueue: ENTRY PROCEDURE RETURNS [b: BufferDefs.OisBuffer] = INLINE
      BEGIN
      IF sentBuffer = NIL THEN b _ DequeueSpp[sentQueue]
      ELSE BEGIN b _ sentBuffer; sentBuffer _ NIL; END;
      IF b = NIL THEN RETURN;
      SELECT b.tries FROM
        >= retransmissionsBeforeGiveUp =>
          BEGIN -- give up trying to send anything on this packet stream.
          SuspendStream[transmissionTimeout];
          sentBuffer _ b; -- put back at head of sentQueue
          b _ NIL;
          END;
        = retransmissionsBeforeAskForAck => b.ois.sendAck _ TRUE;
        ENDCASE;
      END; -- GetFromRetransmissionQueue

    -- Sleeps until retransmitterWakeupTimer times out or is notified (DeleteActivate notifies it).
    WaitForAWhile: ENTRY PROCEDURE = INLINE
      BEGIN WAIT retransmitterWakeupTimer; END; -- WaitForAWhile

    -- Adapts dataPacketRetransmitPulses from the measured average delay, or doubles it when
    -- no delay sample exists but acks have advanced; clamps to the maximum and derives the
    -- probe interval and the retransmitter wakeup timeout from the result.
    -- NOTE(review): runs outside the monitor lock and lastDelayCalculationPulse is not updated
    -- in the code shown, so after the first interval this appears to run on every pass; confirm.
    UpdateRetransmitTime: PROCEDURE = INLINE
      BEGIN
      avgDelay: Pulses;
      weightNewAbove: CARDINAL;
      IF ((now - lastDelayCalculationPulse) >= delayCalculationPulses) THEN
        BEGIN -- time to do retransmission timeout determination
        IF delayCount > 0 THEN
          BEGIN
          avgDelay _ [delaySum/delayCount];
          normalRetransUpdates _ normalRetransUpdates + 1; -- **temp
          weightNewAbove _ IF avgDelay > fiveHundredMsecsOfPulses THEN 1 ELSE wNewAbove;
          dataPacketRetransmitPulses _ [(dataPacketRetransmitPulses / wOldBelow)* wOldAbove + (avgDelay / wNewBelow)* weightNewAbove];
          delaySum _ [0];
          delayCount _ 0;
          END
        ELSE
          BEGIN
          IF unackedOutputSeq > seqNumWhenDelayCalculated THEN
            BEGIN
            doubleRetransUpdates _ doubleRetransUpdates + 1; -- **temp
            dataPacketRetransmitPulses _ [dataPacketRetransmitPulses*2];
            END;
          END;
        seqNumWhenDelayCalculated _ unackedOutputSeq;
        dataPacketRetransmitPulses _ [MIN[
          dataPacketRetransmitPulses, maxDataPacketRetransmitPulses]];
        probeRetransmitPulses _ probeMultiplier*dataPacketRetransmitPulses + fortyMsecsOfPulses;
        Process.SetTimeout[
          @retransmitterWakeupTimer, PulsesToTicks[[dataPacketRetransmitPulses/4]]];
        END;
      END; -- UpdateRetransmitTime

    UNTIL pleaseStop DO
      now _ ProcessorFace.GetClockPulses[];
      THROUGH (0..RetransmissionCount[]] DO
        IF (retransBuffer _ GetFromRetransmissionQueue[]) = NIL THEN EXIT;
        -- buffers already acknowledged are freed instead of being resent
        IF LOOPHOLE[retransBuffer.ois.sequenceNumber - unackedOutputSeq, INTEGER] < 0 THEN
          OISCP.ReturnFreeOisBuffer[retransBuffer]
        ELSE
          BEGIN
          retransBuffer.tries _ retransBuffer.tries + 1;
          PutPacketOnSocketChannel[retransBuffer]; -- requeues buffer on sentQueue
          retransBuffer _ NIL;
          IF CommFlags.doStats THEN StatIncr[statDataPacketsRetransmitted];
          END;
        ENDLOOP;
      IF TimeToSendAProbe[] THEN SendSystemPacket[TRUE];
      WaitForAWhile[];
      UpdateRetransmitTime[];
      ENDLOOP;
    END; -- Retransmitter

  -- PulsesToTicks: converts Pulses to Process.Ticks by way of rounded milliseconds (plus one),
  -- saturating at LAST[CARDINAL] milliseconds.
  PulsesToTicks: PROCEDURE [pulses: Pulses] RETURNS [Process.Ticks] = INLINE
    BEGIN
    lastCard: LONG CARDINAL = LAST[CARDINAL];
    msecs: LONG CARDINAL _ (((pulses+50)/100)*LONG[ProcessorFace.microsecondsPerHundredPulses]+500)/1000 + 1;
    RETURN[Process.MsecToTicks[IF msecs < lastCard THEN msecs ELSE LAST[CARDINAL]]];
    END;

  -- Receiver: body of the forked receive process; loops until pleaseStop or the channel is aborted.
  -- Successfully received sequenced packets go to GotSequencedPacket, error packets to GotErrorPacket;
  -- anything else is returned to the free pool.
  Receiver: PROCEDURE =
    BEGIN
    b: BufferDefs.OisBuffer;
    UNTIL pleaseStop DO
      b _ Socket.GetPacket[
        socketCH !
        Socket.TimeOut => RETRY; Socket.ChannelAborted => EXIT];
      SELECT LOOPHOLE[b.status, XmitStatus] FROM
        goodCompletion =>
          SELECT b.ois.transCntlAndPktTp.packetType FROM
            sequencedPacket => GotSequencedPacket[LOOPHOLE[b, BufferDefs.OisBuffer]];
            error => GotErrorPacket[b];
            ENDCASE =>
              BEGIN
              OISCP.ReturnFreeOisBuffer[b];
              IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadType];
              END;
        ENDCASE => OISCP.ReturnFreeOisBuffer[b];
      ENDLOOP;
    END; -- Receiver

  -- GotSequencedPacket: processes one received sequenced packet.
  -- Packets shorter than the sequenced header are counted and dropped. Otherwise the packet is
  -- examined under the monitor lock; afterwards an ack is sent if one is owed and the buffer is
  -- returned to the pool unless it was placed on a queue.
  GotSequencedPacket: PROCEDURE [b: BufferDefs.OisBuffer] =
    BEGIN
    returnAnAck: BOOLEAN _ FALSE;
    giveBackToSocket: BOOLEAN _ TRUE; -- unless we put on some other queue
    nowPulse: LONG CARDINAL; -- set if we get a packet for our connection

    -- Dispatches on connection state; for established/open streams verifies that the packet
    -- belongs to this connection and classifies it by sequence number relative to nextInputSeq.
    GotSequencedPacketLocked: ENTRY PROCEDURE = INLINE
      BEGIN
      SELECT state FROM
        unestablished, suspended, terminating => NULL; -- not interested.
        activeEstablish, waitEstablish =>
          [returnAnAck, giveBackToSocket] _ EstablishThisConnection[b];
        established, open =>
          BEGIN
          IF b.ois.source.host = remoteAddr.host AND
            b.ois.source.socket = remoteAddr.socket AND
            b.ois.sourceConnectionID = remoteConnectionID AND
            b.ois.destinationConnectionID = localConnectionID THEN
            BEGIN -- the packet is for this connection
            probeCounter _ 0; -- other end is alive
            lastPacketReceivedPulse _ nowPulse _ ProcessorFace.GetClockPulses[];
            SELECT LOOPHOLE[b.ois.sequenceNumber - nextInputSeq, INTEGER] FROM
              IN [0..duplicateWindow] => RightOrEarlyPacket[];
              IN [-duplicateWindow..0) => DuplicatePacket[];
              ENDCASE =>
                IF CommFlags.doStats THEN StatIncr[statDataPacketsReceivedVeryLate]
            END
          ELSE
            BEGIN
            IF CommFlags.doStats AND
              (b.ois.source.host # remoteAddr.host OR
              b.ois.source.socket # remoteAddr.socket) THEN
              StatIncr[statPacketsRejectedBadSource];
            IF b.ois.sourceConnectionID # remoteConnectionID OR
              b.ois.destinationConnectionID # localConnectionID THEN
              BEGIN
              IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
              returnAnAck _ TRUE; -- probably our connection response got lost
              END;
            END;
          END;
        ENDCASE;
      -- with minRcvAllocation = 0 this waits only when maxInputSeq is behind nextInputSeq
      IF LOOPHOLE[maxInputSeq - nextInputSeq, INTEGER] < minRcvAllocation THEN
        WAIT letClientRun;
      END; -- GotSequencedPacketLocked

    -- Handles a packet whose sequence number is at or ahead of nextInputSeq:
    -- absorbs its allocation and acknowledge numbers, frees acknowledged buffers from the sent
    -- queue (collecting delay samples), and delivers the packet if it is the next one expected.
    RightOrEarlyPacket: INTERNAL PROCEDURE = INLINE
      BEGIN
      IF b.ois.sendAck THEN
        BEGIN
        returnAnAck _ TRUE;
        IF CommFlags.doStats THEN StatIncr[statAckRequestsReceived];
        END;
      IF LOOPHOLE[b.ois.allocationNumber - maxOutputSeq, INTEGER] > 0 THEN
        BEGIN
        maxOutputSeq _ b.ois.allocationNumber;
        NOTIFY newAllocation;
        END;
      IF LOOPHOLE[b.ois.acknowledgeNumber - unackedOutputSeq, INTEGER] > 0 THEN
        BEGIN
        unackedOutputSeq _ b.ois.acknowledgeNumber;
        ackRequestPulse _ nowPulse; -- if we get an ack we move the ack req. time
        END;
      -- release every sent buffer whose sequence number is below unackedOutputSeq
      IF sentBuffer = NIL THEN sentBuffer _ DequeueSpp[sentQueue];
      UNTIL sentBuffer = NIL OR
        LOOPHOLE[sentBuffer.ois.sequenceNumber - unackedOutputSeq, INTEGER] >= 0 DO
        IF sentBuffer.ois.sendAck AND sentBuffer.tries = 0 THEN
          BEGIN -- constitutes a measure of lossless delay (no retransmissions)
          delaySum _ [delaySum + (nowPulse - sentBuffer.time)];
          delayCount _ delayCount + 1;
          END;
        OISCP.ReturnFreeOisBuffer[sentBuffer];
        sentBuffer _ DequeueSpp[sentQueue];
        ENDLOOP;
      IF b.ois.systemPacket THEN
        BEGIN
        IF CommFlags.doStats THEN
          BEGIN
          StatIncr[statSystemPacketsReceived];
          StatIncr[statAcksReceived];
          END;
        END
      ELSE
        BEGIN
        IF ~b.ois.attention OR (b.ois.attention AND AttentionPacketProcessed[b]) THEN
          BEGIN
          IF LOOPHOLE[b.ois.sequenceNumber - nextInputSeq, INTEGER] = 0 THEN
            BEGIN
            state _ open;
            -- deliver b, then keep delivering any queued packet that has become the next expected
            DO
              nextInputSeq _ nextInputSeq + 1;
              IF attnCount # 0 THEN UpdateAttnSeqTable[];
              EnqueueSpp[inOrderQueue, b];
              giveBackToSocket _ FALSE;
              NOTIFY inOrderQueueNotEmpty;
              -- scan outOfOrderQueue: stop at the next expected packet, keep later ones, free the rest
              DO
                IF BufferDefs.QueueLength[outOfOrderQueue] = 0 THEN
                  BEGIN b _ NIL; EXIT; END;
                b _ DequeueSpp[outOfOrderQueue];
                SELECT LOOPHOLE[b.ois.sequenceNumber - nextInputSeq, INTEGER] FROM
                  0 => EXIT;
                  IN (0..duplicateWindow] => EnqueueSpp[tempQueue, b];
                  ENDCASE => OISCP.ReturnFreeOisBuffer[b];
                ENDLOOP;
              UNTIL (tempQueue.first) = NIL DO
                EnqueueSpp[outOfOrderQueue, DequeueSpp[tempQueue]];
                ENDLOOP;
              IF b = NIL THEN EXIT;
              ENDLOOP;
            END
          -- NOTE(review): an early packet is only counted here; nothing in the code shown puts it on outOfOrderQueue
          ELSE IF CommFlags.doStats THEN StatIncr[statDataPacketsReceivedEarly];
          END;
        END;
      END; -- RightOrEarlyPacket

    -- Handles a packet whose sequence number is behind nextInputSeq: count it and ack if asked.
    DuplicatePacket: INTERNAL PROCEDURE = INLINE
      BEGIN
      IF CommFlags.doStats THEN StatIncr[statDataPacketsReceivedAgain];
      IF b.ois.sendAck THEN returnAnAck _ TRUE;
      END; -- DuplicatePacket

    IF b.ois.pktLength >= bytesPerSequencedPktHeader THEN
      BEGIN
      GotSequencedPacketLocked[];
      IF returnAnAck THEN SendSystemPacket[FALSE]; -- we may have to send the ack
      END
    ELSE IF CommFlags.doStats THEN StatIncr[statEmptyFunnys];
    IF giveBackToSocket THEN OISCP.ReturnFreeOisBuffer[b];
    END; -- GotSequencedPacket

  -- AttentionPacketProcessed: records an incoming attention packet and queues an advance copy.
  -- Returns FALSE when the attention table is full or no receive buffer is available for the copy;
  -- returns TRUE when the sequence number was already recorded or has now been recorded.
  -- NOTE(review): when the FOR loop finishes without a match, the later store uses i as the next
  -- free slot; this relies on the loop variable's value after normal termination. Confirm.
  AttentionPacketProcessed: INTERNAL PROCEDURE [b: BufferDefs.OisBuffer]
    RETURNS [BOOLEAN] =
    BEGIN
    i: CARDINAL _ 1;
    alreadyThere: BOOLEAN _ FALSE;
    advanceB: BufferDefs.OisBuffer;
    nBytes: CARDINAL;
    IF attnCount = maxPendingAttns THEN RETURN[FALSE]; -- no space
    IF attnCount # 0 -- see if an entry is already there
      THEN
      FOR i IN (0..attnCount] DO
        IF attnSeqTable[i] = b.ois.sequenceNumber THEN
          BEGIN alreadyThere _ TRUE; EXIT; END;
        ENDLOOP;
    IF NOT alreadyThere THEN -- because attnCount=0 or because really not there!
      BEGIN
      IF (advanceB _ MaybeGetFreeReceiveOisBufferFromPool[pool]) = NIL THEN
        RETURN[FALSE]; -- cop out
      attnSeqTable[i] _ b.ois.sequenceNumber;
      attnCount _ attnCount + 1;
      -- copy the whole packet (pktLength bytes) into the advance buffer
      nBytes _ PrincOpsUtils.ByteBlt[to: [@advanceB.ois, 0, b.ois.pktLength], from: [@b.ois, 0, b.ois.pktLength]];
      EnqueueSpp[inAttnQueue, advanceB];
      NOTIFY newInputAttn;
      END;
    IF CommFlags.doStats THEN StatIncr[statAttentionsReceived];
    RETURN[TRUE];
    END; -- AttentionPacketProcessed

  -- UpdateAttnSeqTable: drops table entries whose sequence number is now behind nextInputSeq,
  -- filling each hole with the last entry (so i is not advanced after a removal).
  UpdateAttnSeqTable: INTERNAL PROCEDURE =
    BEGIN
    i: CARDINAL _ 1;
    DO
      IF i > attnCount THEN EXIT;
      IF LOOPHOLE[(attnSeqTable[i] - nextInputSeq), INTEGER] < 0 THEN
        BEGIN
        attnSeqTable[i] _ attnSeqTable[attnCount];
        attnSeqTable[attnCount] _ 0;
        attnCount _ attnCount - 1;
        END
      ELSE i _ i + 1;
      ENDLOOP;
    END; -- UpdateAttnSeqTable

  -- GotErrorPacket: handles an Error Protocol packet. Only noSocketOisErrorCode is acted on:
  -- it suspends the stream, with a reason that depends on whether the connection was established.
  -- The buffer is always returned to the free pool.
  GotErrorPacket: ENTRY PROCEDURE [b: BufferDefs.OisBuffer] =
    BEGIN
    ENABLE UNWIND => NULL;
    IF CommFlags.doStats THEN StatIncr[statErrorPacketsReceived];
    SELECT b.ois.errorType FROM
      noSocketOisErrorCode =>
        SELECT state FROM
          unestablished, activeEstablish, waitEstablish =>
            {whyFailed _ noServiceAtDestination; SuspendStream[noRouteToDestination]};
          established, open => SuspendStream[remoteServiceDisappeared];
          ENDCASE => NULL;
      ENDCASE => NULL;
    OISCP.ReturnFreeOisBuffer[b];
    END; -- GotErrorPacket

  -- SetWaitTime: client operation; sets the blocking limit from a time in milliseconds.
  SetWaitTime: ENTRY PROCEDURE [time: OISCPTypes.WaitTime] =
    BEGIN waitPulses _ MilliSecondsToPulses[time]; END; -- SetWaitTime

  -- FindAddresses: client operation; returns the local and remote addresses of the stream.
  FindAddresses: ENTRY PROCEDURE RETURNS [local, remote: NetworkAddress] =
    BEGIN local _ localAddr; remote _ remoteAddr; END; -- FindAddresses

  -- GetSenderSizeLimit: client operation; maximum packet length minus the sequenced packet header.
  GetSenderSizeLimit: ENTRY PROCEDURE RETURNS [CARDINAL] =
    BEGIN RETURN[maxOisPktLength - bytesPerSequencedPktHeader]; END;

  -- EstablishThisConnection: examines a packet received while in activeEstablish or waitEstablish.
  -- Three acceptable shapes, all requiring sequence number 0 and a known source connection ID:
  --   1. same host and socket, addressed to our connection ID;
  --   2. same host and socket, addressed to unknownConnID;
  --   3. (activeEstablish only) same host but a different socket, addressed to our connection ID;
  --      the remote socket is then adopted from the packet.
  -- On acceptance the remote connection ID and allocation are adopted and the state advances.
  -- Returns whether an ack is owed and whether the caller still owns the buffer.
  EstablishThisConnection: INTERNAL PROCEDURE [b: BufferDefs.OisBuffer]
    RETURNS [returnAnAck, giveBackToSocket: BOOLEAN] =
    BEGIN
    returnAnAck _ FALSE;
    giveBackToSocket _ TRUE;
    IF b.ois.source.host = remoteAddr.host AND
      b.ois.source.socket = remoteAddr.socket AND
      b.ois.destinationConnectionID = localConnectionID AND
      b.ois.sourceConnectionID # unknownConnID AND
      b.ois.sequenceNumber = 0 THEN
      BEGIN
      remoteConnectionID _ b.ois.sourceConnectionID;
      maxOutputSeq _ b.ois.allocationNumber;
      state _ established;
      IF b.ois.sendAck THEN returnAnAck _ TRUE;
      IF NOT b.ois.systemPacket THEN
        BEGIN
        -- the establishing packet carries data: deliver it and move straight to open
        IF ~b.ois.attention OR (b.ois.attention AND AttentionPacketProcessed[b]) THEN
          BEGIN
          nextInputSeq _ nextInputSeq + 1;
          EnqueueSpp[inOrderQueue, b];
          giveBackToSocket _ FALSE;
          IF attnCount # 0 THEN UpdateAttnSeqTable[];
          state _ open;
          NOTIFY inOrderQueueNotEmpty;
          END;
        END
      ELSE IF CommFlags.doStats THEN StatIncr[statSystemPacketsReceived];
      NOTIFY connectionIsEstablished;
      END
    ELSE
      IF b.ois.source.host = remoteAddr.host AND
        b.ois.source.socket = remoteAddr.socket AND
        b.ois.destinationConnectionID = unknownConnID AND
        b.ois.sourceConnectionID # unknownConnID AND
        b.ois.sequenceNumber = 0 THEN
        BEGIN
        remoteConnectionID _ b.ois.sourceConnectionID;
        maxOutputSeq _ b.ois.allocationNumber;
        IF state = waitEstablish THEN returnAnAck _ TRUE;
        state _ established;
        IF b.ois.sendAck THEN returnAnAck _ TRUE;
        IF CommFlags.doStats AND b.ois.systemPacket THEN
          StatIncr[statSystemPacketsReceived];
        NOTIFY connectionIsEstablished;
        END
      ELSE
        IF state = activeEstablish AND
          b.ois.source.host = remoteAddr.host AND
          b.ois.source.socket # remoteAddr.socket AND
          b.ois.destinationConnectionID = localConnectionID AND
          b.ois.sourceConnectionID # unknownConnID AND
          b.ois.sequenceNumber = 0 THEN
          BEGIN
          remoteAddr.socket _ b.ois.source.socket;
          remoteConnectionID _ b.ois.sourceConnectionID;
          maxOutputSeq _ b.ois.allocationNumber;
          state _ established;
          IF b.ois.sendAck THEN returnAnAck _ TRUE;
          IF NOT b.ois.systemPacket THEN
            BEGIN
            IF ~b.ois.attention OR (b.ois.attention AND AttentionPacketProcessed[b]) THEN
              BEGIN
              nextInputSeq _ nextInputSeq + 1;
              EnqueueSpp[inOrderQueue, b];
              giveBackToSocket _ FALSE;
              IF attnCount # 0 THEN UpdateAttnSeqTable[];
              state _ open;
              NOTIFY inOrderQueueNotEmpty;
              END;
            END
          ELSE IF CommFlags.doStats THEN StatIncr[statSystemPacketsReceived];
          NOTIFY connectionIsEstablished;
          END
        ELSE IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
    IF returnAnAck AND CommFlags.doStats THEN StatIncr[statAckRequestsReceived];
    END; -- EstablishThisConnection

  -- ActivelyEstablish: repeatedly sends a system packet requesting an ack and waits on
  -- connectionIsEstablished until the state becomes established or open.
  -- Raises ConnectionFailed[whyFailed] if the stream is suspended meanwhile, and signals
  -- ConnectionFailed[timeout] each time waitPulses elapse; if that signal is resumed the timer restarts.
  ActivelyEstablish: PROCEDURE =
    BEGIN
    currentPulse, startPulse: LONG CARDINAL;
    WaitUntilActivelyEstablished: ENTRY PROCEDURE RETURNS [BOOLEAN] = INLINE
      BEGIN
      ENABLE UNWIND => NULL;
      WAIT connectionIsEstablished;
      IF state = established OR state = open THEN RETURN[TRUE]
      ELSE
        BEGIN
        IF state = suspended THEN RETURN WITH ERROR ConnectionFailed[whyFailed];
        currentPulse _ ProcessorFace.GetClockPulses[];
        IF (currentPulse - startPulse) >= waitPulses THEN
          BEGIN
          SIGNAL ConnectionFailed[timeout];
          startPulse _ ProcessorFace.GetClockPulses[];
          END;
        RETURN[FALSE];
        END;
      END; -- WaitUntilActivelyEstablished
    startPulse _ ProcessorFace.GetClockPulses[];
    DO
      SendSystemPacket[TRUE];
      IF WaitUntilActivelyEstablished[] -- this returns or raises a SIGNAL -- THEN RETURN;
      ENDLOOP;
    END; -- ActivelyEstablish

  -- WaitUntilEstablished: passive counterpart of ActivelyEstablish. Waits on
  -- connectionIsEstablished until the state becomes established or open, signalling
  -- ConnectionFailed[timeout] each time waitPulses elapse.
  -- NOTE(review): unlike ActivelyEstablish, the suspended state is not checked here.
  WaitUntilEstablished: ENTRY PROCEDURE =
    BEGIN
    ENABLE UNWIND => NULL;
    currentPulse, startPulse: LONG CARDINAL;
    startPulse _ ProcessorFace.GetClockPulses[];
    DO
      WAIT connectionIsEstablished;
      IF state = established OR state = open THEN RETURN
      ELSE
        BEGIN
        currentPulse _ ProcessorFace.GetClockPulses[];
        IF (currentPulse - startPulse) >= waitPulses THEN
          BEGIN
          SIGNAL ConnectionFailed[timeout];
          startPulse _ ProcessorFace.GetClockPulses[];
          END;
        END;
      ENDLOOP;
    END; -- WaitUntilEstablished

  -- Delete: tears the instance down. Marks the stream terminating, aborts the socket channel,
  -- joins both forked processes, releases buffers, deletes the socket and destroys this module instance.
  Delete: PUBLIC PROCEDURE =
    BEGIN
    DeleteActivate[];
    Socket.Abort[socketCH];
    JOIN retransmitterFork;
    JOIN receiverFork;
    DeleteCleanup[];
    Socket.Delete[socketCH];
    LoadState.SelfDestruct[];
    END; -- Delete

  -- DeleteActivate: moves to terminating, asks the forked processes to stop and wakes the retransmitter.
  DeleteActivate: ENTRY PROCEDURE = INLINE
    BEGIN
    state _ terminating;
    pleaseStop _ TRUE;
    NOTIFY retransmitterWakeupTimer;
    END; -- DeleteActivate

  -- DeleteCleanup: returns sentBuffer to the pool and cleans up the queues.
  DeleteCleanup: ENTRY PROCEDURE = INLINE
    BEGIN OPEN SocketInternal;
    ENABLE UNWIND => NULL;
    IF sentBuffer # NIL THEN OISCP.ReturnFreeOisBuffer[sentBuffer];
    BufferDefs.QueueCleanup[inOrderQueue];
    BufferDefs.QueueCleanup[inAttnQueue];
    BufferDefs.QueueCleanup[outOfOrderQueue];
    BufferDefs.QueueCleanup[tempQueue];
    -- NOTE(review): the source text appears to be truncated at this point. The remainder of
    -- DeleteCleanup, the definition of MilliSecondsToPulses and the head of the main-line
    -- SELECT on classOfService (presumably the "bulk =>" arm) are missing between the next
    -- two lines. Recover them from an intact copy of the file before compiling.
    WHILE sentQueue.length
    {socketCH _ Socket.Create[localAddr, bulkSendBufs, bulkReceiveBufs, 0, TRUE];
      maxInputSeq _ bulkInitialAllocation};
    transactional =>
      {socketCH _ Socket.Create[localAddr, transactSendBufs, transactReceiveBufs, 0, TRUE];
      maxInputSeq _ transactInitialAllocation};
    ENDCASE => ERROR;

  -- main-line code: initializes the instance, forks the two processes and establishes the connection
  pool _ ps.pool _ SocketInternal.GetBufferPool[
    SocketInternal.ChannelHandleToSocketHandle[socketCH]];
  localAddr _ Socket.GetStatus[socketCH].localAddr;
  -- a known remote connection ID means the connection already exists
  IF remoteConnectionID # unknownConnID THEN state _ established;
  Process.SetTimeout[@newAllocation, Process.MsecToTicks[1000]];
  CommUtilDefs.EnableAborts[@newAllocation];
  sendAnAck _ FALSE;
  maxOisPktLength _ OISCPTypes.maxBytesPerPkt;
  Process.SetTimeout[@newInputAttn, Process.MsecToTicks[1000]];
  CommUtilDefs.EnableAborts[@newInputAttn];
  FOR attnCount IN (0..maxPendingAttns] DO attnSeqTable[attnCount] _ 0; ENDLOOP;
  attnCount _ 0;
  sentBuffer _ NIL;
  BufferDefs.QueueInitialize[inOrderQueue];
  BufferDefs.QueueInitialize[inAttnQueue];
  BufferDefs.QueueInitialize[outOfOrderQueue];
  BufferDefs.QueueInitialize[tempQueue];
  BufferDefs.QueueInitialize[sentQueue];
  Process.SetTimeout[@inOrderQueueNotEmpty, Process.MsecToTicks[1000]];
  CommUtilDefs.EnableAborts[@inOrderQueueNotEmpty];
  ackRequestPulse _ lastProbePulse _ lastDelayCalculationPulse _ lastPacketReceivedPulse _ ProcessorFace.GetClockPulses[];
  probeCounter _ 0;
  dataPacketRetransmitPulses _ MilliSecondsToPulses[
    initialDataPacketRetransmitTime]; -- msecs converted to Pulses
  probeRetransmitPulses _ MilliSecondsToPulses[
    probeMultiplier*initialDataPacketRetransmitTime + 40];
  delaySum _ [0];
  delayCount _ 0;
  normalRetransUpdates _ doubleRetransUpdates _ 0; -- **temp
  waitPulses _ MilliSecondsToPulses[timeout]; -- msecs converted to Pulses
  pleaseStop _ FALSE;
  Process.SetTimeout[@letClientRun, Process.MsecToTicks[250]]; -- this is also used in DeleteCleanup - do not ENABLE ABORTs
  Process.SetTimeout[
    @retransmitterWakeupTimer, Process.MsecToTicks[initialRetransmitterTime]];
  -- NOTE(review): no assignment of unestablished to state is visible; it may be in the missing text above
  IF state = unestablished THEN
    BEGIN
    IF establish THEN
      BEGIN
      state _ activeEstablish;
      retransmitterFork _ FORK Retransmitter[];
      receiverFork _ FORK Receiver[];
      Process.Yield[];
      ActivelyEstablish[];
      END
    ELSE
      BEGIN
      state _ waitEstablish;
      retransmitterFork _ FORK Retransmitter[];
      receiverFork _ FORK Receiver[];
      Process.Yield[];
      WaitUntilEstablished[];
      END;
    END
  ELSE
    BEGIN
    SendSystemPacket[FALSE]; -- gratuitous ack in case we are a server agent now.
    retransmitterFork _ FORK Retransmitter[];
    receiverFork _ FORK Receiver[];
    Process.Yield[];
    END;
  waitPulses _ MilliSecondsToPulses[PacketStream.infiniteWaitTime]; -- Gets, WaitAttentions default to very long
  RETURN[@ps, remoteAddr, remoteConnectionID];
  END. -- of PacketStreamInstance module LOG Time: January 26, 1981 10:03 AM By: Garlick Action: 1.) added classOfService parameter to reduce buffers in interactive case 2.) added interpretation of Error Protocol packets 3.) increased inactivity timeouts for internet cases 4.) initial data operation timeout set to infinite 5.) acks no longer stimulated by Gets, rather in receiver 6.) send an alocation packet if just opening the window 7.) reinstated slow media adaptive algorithm 8.) enabled Process.Aborting for Gets, Puts, WaitAttn.(635)\23386v14V :HPacketStreamInstance.mesa - implementation module for an instance of a SPP packet stream Copyright c 1985 by Xerox Corporation. All rights reserved. BLyon on: March 18, 1981 6:18 PM Levin, August 9, 1983 9:14 am Time units have been converted from MilliSeconds to Pulses (as defined by ProcessorFace.GetClockPulses); However, all interface still are in units of MilliSeconds (like the timeout passed to the MONITOR). GetClock now returns Pulses. Variable names ending in 'Time' will be in milliseconds (hopefully only initial values), while other variables will be in Pulses (some will end in 'Pulses'). 
EXPORTED TYPE(S) client interface connection control parameters connection state socket interface data this is the buffer pool from which we get buffers sequencing, duplicate suppression and flow control information connection control input attention control table to suppress attentions that the client has seen, but we haven't acked. we permit only a small number of pending attns, the rest are thrown away! input/output pointers and queues variables and parameters for this packet stream time any packet was rcv'd for this connection, a Pulse maximum time (in Pulses) any blocking procedure should block cumulative time (in Pulses) that delayCount packets spent on retransmit queue a Pulses, last time we updated retransmit time process handles and specific parameters for controlling them temporary stats constants for bulk transfer performance 10 packet buffers at the socket. 3 are for sending, 3 for receiving. 1 for sending a system packet and 1 for receiving a system packet 1 for receiving an attention packet 1 for extra receive packet constants for transactional transfer performance 7 packet buffers at the socket. 2 are for sending, 2 for receiving. 1 for sending a system packet and 1 for receiving a system packet 1 for extra receive packet for attention or whatever some day these will become variables whose value will be determined heuristically using some kind of adaptive algorithm 10 secs (keep < 1 min to assure that delaySum doesn't overflow) the following four constants are weights used in UpdateRetransmitTime We are using an adaptive scheme for deciding when packets must be retransmitted. We assume that dataPacketRetransmitPulses must elapse since the last time ackRequestPulse was updated before retransmitting everything on the sentQueue. ackRequestPulse is updated whenever a data packet with the sendAck bit set is transmitted or retransmitted, or when a packet is received that updates the unackedOutputSeq field. 
We transmit a probe whenever no packet has been received for 5 sec. The connection can become suspended if we send 8 probes without a response (probes being generated because of allocation requirements or the 5 sec threshold), or if we retransmit a packet too many times. This stuff is very tricky and needs careful thought for the next Pilot. various Glitches generated by this module Hot Procedures This procedure gives a Sequenced Packet Protocol packet to be transmitted over the specified packet stream. This packet resides in a buffer, and some of the fields of the packet must be filled in by the caller of this procedure. These fields are oisPktLength, the attention flag, and subtype. This buffer will be returned to the free queue once the packet has been acknowledged. SIGNALs generated by this procedure leave the clean up responsibility for the buffer to the caller. we should ask for an acknowledgement to be returned, if on sending this packet there is space only for minAllocationBeforeRequestingAck, or this is an attention. We should do this only if our client hasn't already set the bit. This procedure gets the next sequenced, duplicate-suppressed packet from the packet stream. This procedure hangs till a packet is available. The client is expected to return the buffer to the free queue. If the wait time expires then NIL will be returned, and so the client MUST test for this case. removed code to return an ack here IF sendAnAck AND BufferDefs.QueueLength[inOrderQueue] = emptyInOrderQueue THEN BEGIN sendAnAck _ FALSE; set the global switch to false returnAnAck _ TRUE; we must send back the ack END; we now ack when we receive the packet IF returnAnAck THEN SendSystemPacket[FALSE]; This procedure waits until a packet with the attention bit arrives, and returns an advance copy to the client, who must return it to the buffer pool. 
The client will not get duplicates, though it may see the advance copies of the attention packet after it has seen the real copy if it is sluggish. This procedure returns a processed spp data buffer to its packet stream socket's pool. It also increases the allocation window. This procedure causes the transmission of a system packet, i.e. one that does not consume any sequence number. The subtype of this packet is irrelevant and so is made zero. The procedure takes an argument indicating whether the send acknowledgement flag should be set or not in the outgoing packet. b.requeueProcedure should only be set and inspected by level1 and network drivers. This procedure takes a buffer and prepares it for transmission by filling in the appropriate fields in the buffer and updating the packet stream state variables. b.requeueProcedure should only be set and inspected by level1 and network drivers b.length filled by router b.network filled by router b.iocbChain filled by network driver b.userPtr not used currently b.userData not used currently b.userDataLength not used currently b.status used by the socket and router code b.time used to determine round-trip delay. Perhaps someday individual retransmit timeout. Filled when put on retransmit queue rest used by queue package, drivers and dispatcher encapsulation filled by router b.checksum filled by router b.ois.pktLength filled by caller of this routine b.transportControl filled by router b.ois.source filled by socket interface b.ois.systemPacket filled by caller of this routine b.ois.sendAck filled by acknowledgement strategy by caller b.ois.attention filled by caller of this routine b.ois.unusedType filled by caller of this routine b.ois.subtype filled by caller of this routine b.ois.endOfMessage filled by caller of this routine This procedure puts a packet out on the socket channel, where the packet is asynchronously sent. 
On completion the dispatcher enqueues the packet onto the sentQueue for possible retransmission only if the packet is not a system packet. The packet stream monitor should not be locked when we call this. remember time for retransmitter we are about to be deleted check for failures (this may be the first packet, so look for connection failures) ok or not expected enqueue onto sentQueue since this is not a system packet record time if ackReq packet and first time put there (used for delay calculation) This procedure changes the state of the packet stream to be suspended. This procedure changes the state of the packet stream to be suspended. This process retransmits packets that have not been acknowledged in a reasonable time, and in addition generates probes etc. to test for the connection's liveness. The Retransmitter, Receiver and the transmission process all three try to keep the sentBuffer and sentQueue ordered by sequence number. some procedures whose scope is just the Retransmitter probe for allocation, or just an ack from other end, or for activity. we must retransmit all packets currently on the sentQueue and sentBuffer. a count is kept because on retransmission we put packets back on sentQueue and otherwise we will be in an infinite loop. need to set ack request we have had some packets with lossless delay (no retransmissions) new retransmit timeout is a function of the old retransmit timeout and the newly measured delay. If the delay is less than about 500 msecs, the standard deviation of the measured delay is large compared to transmission time; otherwise transmission time dominates. So for fast mediums, we multiply the delay by a large factor. For slow mediums, we go with the measured delay. The following lines are in flux. 
dataPacketRetransmitPulses _ [(dataPacketRetransmitPulses + wNewAbove*avgDelay)/2]; if data flowed, all had retransmissions and we should increase timeout set new retransmitter wakeup main body of the procedure checks the value of pleaseStop, race condition OK this is an upper bound state may change for any reason, but we keep sending them out, unless we ourselves changed the state to be suspended! The Receiver may take packets off the sentQueue in parallel in which case we will get a NIL and therefore exit. packet has been ACKed it's time to send this packet out again This process waits at the socket channel for a packet to arrive. This packet is in a buffer that belongs to this socket and should be returned to the pool associated with the socket channel. race condition not harmful as we account for it now wait for something to arrive. pleaseStop got set by someone else discard, don't want any other protocol type This procedure examines the sequenced packet that has just arrived. we do not return an ack unless we are required to check that packet is from right remote address and connection ID for inactivity et al just the packet we wanted, or packet is early old duplicate packet very old duplicate packet packet is really not for this connection (or other end not established) delayed acking has been abandoned; only allocate is dependent on client this is delicate and is done to avoid getting stuck in a small window IF returnAnAck AND BufferDefs.QueueLength[inOrderQueue] # emptyInOrderQueue THEN BEGIN returnAnAck _ FALSE; we don't have to send back an ack sendAnAck _ TRUE; make another process send back an ack END; This wait lets the user process run and suck up all the packets on the inOrderInputQueue so that we send back a full allocate. This procedure processes both the next expected packet as well as early packets. Both kinds of packets are processed similarly as far as acks and allocation, and attentions are concerned. 
update allocation and ack fields of packetstream. now remove acked packets, if any, from sentQueue. We assume that the sentQueue is kept ordered by increasing sequence number. add time spent on queue to delay stats so far we have only updated state information, now to dispense with the packet this is not a system packet process it intelligently if the attention bit is set then we try to process the attention, and only if we were able to do so do we put the packet on the inOrderQueue. only now must we decide if it is in order or out of order just the packet we wanted examine the OutOfOrderQueue to see if anything can be taken off currently there is never anything on the OutOfOrderQueue. this is the next in order packet, pick it up in outer loop still out of order old duplicate, throw away put buffers that are on tempQueue back onto outOfOrderQueue else, loop to process packet from outOfOrderQueue that is now next early packet send acks only if the other end asked main body of this procedure This procedure attempts to process the attention packet, and returns whether it was successful or not. Success implies that an entry for the attn packet already exists in the attnSeqTable, or has just been made. This packet is now a candidate for the inOrderQueue or the outOfOrderQueue. If an entry can not be made because of space restrictions we pretend we never ever saw this packet and let it be retransmitted from the source. The attnSeqTable is reasonably big, and so this decision should not cause adverse effects. This procedure is only called if the packet was within the accept window. insert the b.ois.sequenceNumber in the attnSeqTable if not already there make a copy and then enqueue it This procedure updates the attnSeqTable when the nextInputSeq is updated. That is to say, we are removing any entries in the attnSeqTable that we plan to ack, the next time we send out a packet. This procedure examines an error protocol packet. 
Cool Procedures This procedure sets the wait time. This procedure returns the local and remote address of this packet stream. This procedure returns the number of data bytes that can fit into an sppBuffer. GetSenderSizeLimit This procedure attempts to establish the local end of the connection to the incoming packet. If state=activeEstablish then master/slave response when sockets are completely specified. If state=waitEstablish then other end initiates and knows all about us. if the attention bit is set then we try to process the attention, and only if we were able to do so do we put the packet on the inOrderQueue. remove knowledge of old attns If state=activeEstablish then simultaneous establishment when sockets are completely specified. If state=waitEstablish then other end initiates and knows all about us except our connection ID. If state=activeEstablish then response from well known socket at which there is a server process if the attention bit is set then we try to process the attention, and only if we were able to do so do we put the packet on the inOrderQueue. remove knowledge of old attns mismatched IDs This procedure causes the active establishment of the connection. A system packet is transmitted to the remote end, and then this procedure causes the caller to hang for a certain maximum time in which it is hoped that the connection has become established owing to the receipt of an appropriate packet from the remote end. waits for 2 seconds to see if the connection is established This procedure waits for the remote end to initiate connection establishment. waits for 2 seconds each time around This procedure destroys the packet stream. The procedure changes the state of the packet stream, NOTIFYs the Retransmitter and Receiver to self destruct, and then waits until this happens. When this condition is satisfied, the socket channel is deleted and the packet stream data structures are cleaned up. 
waiting for asynchronous puts to complete and be requeue appropriately we must be carefull about multiplication overflow since milliSeconds must be converted to microSeconds initialization (Cool) initialize connection control parameters This initializes the necessary data structures for a packet stream instance. It fills in all the necessary parameters and creates two processes that look after the well being of this packet stream. One is a Retransmitter, that periodically checks the sentQueue to see if there is anything unacknowledged, and the other is the Receiver, which sees if there are any incoming packets on the corresponding socket. msecs converted to Pulses This baroque code is to ensure that the state is not looked at by any of the other processes until this initialization code has finished examining this variable without entering the monitor. Retransmissions of this gratuitous packet will ocurr in the nature of probes. Ê(ÿ˜codešœX™XKšœ Ïmœ1™˜PK˜3šžœ˜ K˜GK˜—šž˜K˜WKšœ1ž˜6—Kšžœ ˜Kšžœžœ žœ"˜>K˜Kš œžœžœžœžœ˜%K˜Kšœ™Kšœžœžœ˜7K˜Kšœ™˜ šœžœI˜MKšœ-Ïc˜HK˜——Kšœ™Kšœ™K˜K˜)K˜&K˜*Kšœž œ˜#Kšœ™K˜K˜$Kšœ1™1Kšœ>™>Kšœžœ˜$Kšœ/žœ˜8Kšœž œ˜Kšœ žœ˜Kšœ™Kšœžœ˜Kšœ™Kšœž œ˜KšœL™LKšœI™IKšœžœžœžœ˜5Kšœ žœ˜Kšœ"Ÿ!˜CKšœ ™ Kšœ!žœ˜=Kšœ žœ˜Kšžœ˜——Kšœ žœžœžœ#˜AKšœžœžœ˜CKšžœ˜—Kšžœ˜Kšžœ˜——KšžœŸ˜K˜KšœŸ(˜>Kšžœžœ˜8šžœž˜K˜J—K˜KšžœŸ˜K˜—KšœL™LKšœO™OKšœP™PKšœ@™@K˜š  œž œžœ˜9Kšž˜Kšœ žœžœ˜K˜š œžœž œž˜*Kšž˜Kšžœžœžœ˜Kšœžœžœ˜K˜,šž˜šžœž˜˜0Kšžœžœ˜7—˜Kšž˜šžœ"žœž˜,Kšž˜Kšœ"™"šœ7™7šœ™Kšœ™Kšœ2™2Kšœ.™.K˜Kšœ™——šžœž˜Kšž˜K˜"˜ K˜K˜.—Kšžœ˜—Kšž˜—šž˜Kšž˜K˜%Kšžœ"žœžœ˜.KšžœŸ˜9KšžœŸ9˜?Kšžœ˜—Kšžœ˜—Kšœ žœžœžœ#˜AKšœžœžœ˜CKšžœ˜—Kšžœ˜Kšžœ˜—KšžœŸ˜K˜—K˜KšœR™RKšžœŸ ˜K˜—KšœO™OKšœV™VKšœI™IKšœ9™9K˜š œžœž œžœ˜EKšž˜Kšžœžœžœ˜šž˜šžœž˜˜0Kšžœžœ˜7—˜Kšž˜Kšžœ!žœžœž˜0šž˜Kšž˜KšžœŸ˜1KšžœŸ9˜?Kšžœ˜—Kšžœ˜—Kšœ žœžœžœ#˜AKšœžœžœ˜CKšžœ˜—Kšžœ˜Kšžœ˜—KšžœŸ˜K˜—Kšœ™K˜š œž œ˜=Kšž˜K˜š  œžœž œžœžœž˜?Kšœžœžœ1žœ˜PK˜—Kšžœ˜KšžœžœžœŸ˜UKšžœ˜K˜—KšœQ™QKšœP™PKšœG™GKšœA™AK˜š œž œ žœ˜0Kšž˜K˜K˜š œžœž 
œž˜0KšžœžœŸ˜?K˜Kšžœžœ,žœž˜AKšž˜KšœQ™QK˜-Kšœžœ˜K˜Kšœžœ˜K˜K˜K˜K˜Kšžœ˜—KšžœŸ˜K˜—KšœP™PKšœP™PK˜š œžœž œž˜MKšž˜KšœP™PKšœ™Kšœ™Kšœ$™$Kšœ™Kšœ™Kšœ#™#Kšœ+™+Kšœ™K˜ Kšœ2™2Kšœ™Kšœ™Kšœ0™0Kšœ#™#K˜5K˜Kšœ'™'Kšœ3™3Kšœ:™:Kšœ0™0Kšœ1™1Kšœ.™.Kšœ3™3K˜-K˜3K˜%K˜'K˜%Kšžœžœžœ"˜@šž˜Kšž˜Kšžœžœ˜1Kšžœžœ!˜:Kšžœ˜—Kšžœžœžœ˜JKšžœžœžœ˜KKšžœŸ˜K˜—KšœK™KKšœK™KKšœT™TKšœA™AK˜š œž œ˜?Kšž˜šžœž˜Kšž˜Kšœžœ(˜EKšœ™Kšžœžœ2˜GKšžœ˜—˜K˜ ˜Kšœ™Kšžœžœžœ˜,——KšžœŸ˜ K˜—š %œžœž œ˜RKšž˜K˜š œžœž œ˜9Kšž˜K˜$KšœŸ%˜6K˜KšžœŸ˜K˜—KšœR™Ršžœžœž˜*Kšœ5žœ˜:Kšœ™K˜K˜JK˜KK˜1K˜-K˜3K˜7K˜>Kšžœžœ˜—Kšœ8™8K˜Kšžœ žœžœ)˜NKšœR™RK˜KšžœŸ(˜-K˜—KšœF™FK˜š  œžœž œ˜8Kšž˜K˜K˜K˜KšžœŸ˜K˜—KšœF™FK˜š œžœž œ˜;KšžœžœŸ˜5K˜—KšœP™PKšœS™SKšœR™RKšœ4™4K˜š  œž œ˜Kšž˜Kšœžœžœ˜K˜$K˜Kšœ5™5K˜š  œžœž œžœ žœž˜EKšž˜Kšœ žœ˜šžœžœž˜+Kšž˜šž˜šœžœžœž˜9Kšœ1ž˜3—šœ>ž˜AKšœ5ž˜9—Kšž˜KšœE™EK˜šžœ8ž˜>K˜"—Kš žœžœ žœžœžœžœ˜SKšžœ˜—Kšžœ˜—KšžœŸ˜K˜—š  œžœž œžœžœž˜@Kšž˜šžœžœž˜+Kšž˜šžœ7ž˜=Kš žœžœžœžœžœ˜>KšœI™IKšœ@™@Kšœ7™7K˜—KšžœžœŸ˜-K˜Kšž˜—KšžœžœŸ.˜>K˜KšžœŸ˜K˜—š  œžœž œžœž˜VKšž˜Kšžœžœžœ˜2Kšžœžœžœžœ˜1Kšžœžœžœžœ˜šžœ ž˜˜!KšžœŸ9˜?K˜#KšœŸ ˜0Kšœžœ˜Kšžœ˜—Kšœ4žœ˜9Kšœ™K˜Kšžœ˜—KšžœŸ˜"K˜—š  œžœž œž˜'KšžœžœžœŸ˜:K˜—š œž œž˜(Kšž˜K˜Kšœžœ˜šžœ?ž˜EKšžœŸ2˜8šžœž˜Kšž˜KšœA™AK˜!Kšœ1Ÿ ˜;KšœJ™JKšœN™NKšœG™GKšœL™LKšœJ™JKšœ ™ Kšœžœ%žœžœ ˜N˜K˜6K˜(—šœ™Kšœ6™6—K˜K˜Kšž˜—šž˜Kšž˜KšœE™Ešžœ.ž˜4Kšž˜Kšœ1Ÿ ˜;K˜KšœH™HšžœŸ#˜4Kšž˜šžœžœž˜šžœ(ž˜.Kšžœžœžœžœ˜%—Kšžœ˜——šžœžœžœŸ3˜LKšž˜šžœ;žœž˜EKšžœžœŸ ˜—K˜'K˜Kšœ™˜GK˜$—K˜"Kšžœ˜Kšžœ˜—Kšžœžœ"˜;Kšžœžœ˜ KšžœŸ˜ K˜—KšœI™IKšœL™LKšœ+™+K˜š œžœž œ˜(Kšž˜Kšœžœ˜šž˜Kšžœžœžœ˜šžœžœ#žœž˜?Kšž˜K˜*K˜K˜Kšž˜—Kšžœ ˜Kšžœ˜—KšžœŸ˜K˜—Kšœ1™1K˜š œžœž œ˜;Kšž˜Kšžœžœžœ˜Kšžœžœ$˜=šžœž˜˜šžœž˜˜0K˜$K˜%—K˜=Kšžœžœ˜——Kšžœžœ˜—Kšžœ˜KšžœŸ˜K˜—Kšœ™K˜Kšœ"™"K˜š  œžœž œ˜:Kšžœ*žœŸ˜BK˜—KšœJ™JK˜š  œžœž œžœ"˜HKšžœ)žœŸ˜CK˜—KšœO™OK˜š  œžœž œžœžœ˜8Kšžœžœ0žœ˜@—Kšœ™K˜K˜K˜KšœC™CKšœ™K˜š œžœž œ˜EKšžœ!žœ˜2Kšž˜Kšœžœ˜Kšœžœ˜KšœO™OKšœS™Sšžœ%ž˜*Kšœ(žœ ˜KKšœžœ*ž˜BKšœž˜Kšž˜K˜.K˜&K˜Kšžœžœžœ˜)šžœžœž˜Kšž˜KšœJ™JKšœB™Bšžœžœžœž˜MKšž˜K˜ K˜Kšœžœ˜Kšžœžœ˜+Kšœ™K˜ Kšžœ˜Kšžœ˜—Kšž˜—Kšžœžœžœ%˜CKšžœ˜Kšž˜KšœI™IKšœP™PKšœ&™&K˜—šž˜šžœ%ž˜*Kšœ(žœ 
˜KKšœžœ*ž˜>Kšœž˜Kšž˜K˜.K˜&Kšžœžœžœ˜1K˜Kšžœžœžœ˜)Kšžœžœžœ%˜UKšžœ˜Kšž˜KšœF™FKšœ™K˜—šž˜Kšžœžœ$˜B—šžœ(˜+Kšžœ3ž˜9Kšœ)žœž˜JKšž˜K˜(K˜.K˜&K˜Kšžœžœžœ˜)šžœžœž˜Kšž˜KšœJ™JKšœB™Bšžœžœžœž˜MKšž˜K˜ K˜Kšœžœ˜Kšžœžœ˜+Kšœ™K˜ Kšžœ˜Kšžœ˜—Kšž˜—Kšžœžœžœ%˜CKšžœ˜Kšž˜Kšœ™K˜Kšžœžœžœ$˜B——Kšžœ žœžœ#˜LKšžœŸ˜K˜—KšœS™SKšœS™SKšœN™NKšœN™NK˜š œž œ˜Kšž˜Kšœžœžœ˜(K˜š  œžœž œžœžœž˜HKšž˜Kšžœžœžœ˜Kšžœ˜Kš žœžœžœžœžœ˜8šž˜Kšž˜Kš žœžœžœžœžœ˜HK˜.šžœ+ž˜1KšžœžœIžœ˜Y—Kšžœžœ˜Kšžœ˜—KšžœŸ˜$K˜—K˜,šž˜Kšœžœ˜Kšœ;™;šžœ Ÿ%œž˜LKšžœ˜—Kšžœ˜—KšžœŸ˜K˜—KšœN™NK˜š œžœž œ˜'Kšž˜Kšžœžœžœ˜Kšœžœžœ˜(K˜,šž˜Kšœ$™$Kšžœ˜Kšžœžœžœž˜2šž˜Kšž˜K˜.šžœ+ž˜1KšžœžœIžœ˜Y—Kšžœ˜—Kšžœ˜—KšžœŸ˜K˜—KšœO™OKšœK™KKšœP™PKšœE™EK˜š œžœž œ˜Kšž˜K˜K˜Kšžœ˜Kšžœ˜K˜K˜K˜KšžœŸ ˜K˜—š œžœž œž˜(Kšž˜K˜Kšœ žœ˜Kšžœ˜ KšžœŸ˜K˜—š  œžœž œž˜'Kšžœžœ˜Kšžœžœžœ˜Kšžœžœžœžœ!˜?K˜&K˜%K˜)K˜#šžœGž˜NKšœF™FKšžœŸ>˜RKšžœ˜—K˜#KšžœŸ˜K˜—š œž œžœžœ˜3Kšžœ˜Kšž˜KšœL™LKšœ™Kš œžœžœžœžœžœžœ˜?Kšœ žœžœ7˜NKš œ žœžœžœžœ˜5KšžœŸ˜ K˜K˜—Kšœ™Kšœ(™(K˜KšœL™LKšœL™LKšœV™VKšœN™NKšœD™DKšœ™K˜K˜K˜K˜&K˜HK˜4K˜9šžœž˜˜KšœGžœ˜MK˜%—˜˜NKšžœ˜—K˜)—Kšžœžœ˜—˜.K˜6—K˜1Kšžœ$žœ˜?K˜>K˜*Kšœ žœ˜K˜,K˜=K˜)Kšžœ žœžœžœ˜NK˜Kšœ žœ˜K˜)K˜(K˜,K˜&K˜&K˜EK˜1˜>K˜9—K˜˜2Kšœ"Ÿ˜>—˜-K˜6—Kšœ™K˜K˜Kšœ1Ÿ ˜;Kšœ,Ÿ˜HKšœ žœ˜Kšœ>Ÿ<˜z˜K˜J—KšœR™RKšœU™UKšœ™šžœž˜Kšž˜šžœ ž˜Kšž˜K˜Kšœžœ˜)Kšœžœ ˜K˜K˜Kšž˜—šž˜Kšž˜K˜Kšœžœ˜)Kšœžœ ˜K˜K˜Kšžœ˜—Kšž˜—šž˜Kšž˜KšœžœŸ4˜MKšœM™MKšœžœ˜)Kšœžœ ˜K˜Kšžœ˜—KšœBŸ,˜nKšžœ&˜,KšžœŸ!˜'K˜—Kšžœ˜K˜Kšœžœá˜K˜—…—vÞÚ%