-- PacketStreamInstance.mesa (last edited by: BLyon on: March 18, 1981 6:18 PM) -- Function: The implementation module for an instance of a SPP packet stream. DIRECTORY BufferDefs USING [ BufferAccessHandle, OisBuffer, QueueObject, QueueLength, QueueCleanup, QueueInitialize], ByteBlt USING [ByteBlt], CommFlags USING [doDebug, doStats], CommUtilDefs USING [EnableAborts], DriverDefs USING [Glitch], Environment USING [Block], Inline USING [LowHalf], OISCP USING [ DequeueSpp, EnqueueSpp, MaybeGetFreeSendOisBufferFromPool, MaybeGetFreeReceiveOisBufferFromPool, ReturnFreeOisBuffer, unknownConnID], OISCPTypes USING [ConnectionID, maxBytesPerPkt, WaitTime], PacketStream: FROM "PacketStream", Process USING [MsecToTicks, SetTimeout, Ticks, Yield], Router USING [XmitStatus], Runtime USING [SelfDestruct], Socket USING [ Abort, ChannelAborted, ChannelHandle, Create, Delete, GetPacket, GetStatus, PutPacket, TimeOut], SocketInternal USING [GetBufferPool, ChannelHandleToSocketHandle], SpecialSystem USING [NetworkAddress], StatsDefs USING [StatBump, StatIncr], System USING [ GetClockPulses, MicrosecondsToPulses, Pulses, PulsesToMicroseconds]; -- Time units have been converted from MilliSeconds to Pulses; However, all interface -- still are in units of MilliSeconds (like the timeout passed to the MONITOR). -- GetClock now returns Pulses. Variable names ending in 'Time' will be in milliseconds -- (hopefully only initial values), while other variables will be in Pulses (some will end in -- 'Pulses'). 
PacketStreamInstance: MONITOR [ localAddr, remoteAddr: SpecialSystem.NetworkAddress, localConnectionID, remoteConnectionID: OISCPTypes.ConnectionID, establish: BOOLEAN, timeout: OISCPTypes.WaitTime _ PacketStream.defaultWaitTime, classOfService: PacketStream.ClassOfService _ bulk] RETURNS [ PacketStream.Handle, SpecialSystem.NetworkAddress, OISCPTypes.ConnectionID] IMPORTS BufferDefs, ByteBlt, CommUtilDefs, DriverDefs, Inline, Process, Runtime, Socket, SocketInternal, StatsDefs, System, PacketStream, OISCP EXPORTS System SHARES BufferDefs = BEGIN OPEN DriverDefs, OISCP, Router, PacketStream, StatsDefs; -- EXPORTED TYPE(S) NetworkAddress: PUBLIC TYPE = SpecialSystem.NetworkAddress; -- client interface ps: Object _ [NIL, TakeFromUser, GetForUser, WaitForAttention, SetWaitTime, FindAddresses, GetSenderSizeLimit, ReturnGetSppDataBuffer]; -- pool (NIL) initted later -- connection control parameters -- connection state state: PacketStream.State; whySuspended: PacketStream.SuspendReason; whyFailed: PacketStream.FailureReason; stateBeforeSuspension: PacketStream.State; connectionIsEstablished: CONDITION; -- socket interface data socketCH: Socket.ChannelHandle; pool: BufferDefs.BufferAccessHandle; -- this is the buffer pool from which we get buffers -- sequencing, duplicate suppression and flow control information nextInputSeq, maxInputSeq: CARDINAL; unackedOutputSeq, nextOutputSeq, maxOutputSeq: CARDINAL; newAllocation: CONDITION; sendAnAck: BOOLEAN; -- connection control maxOisPktLength: CARDINAL; -- input attention control newInputAttn: CONDITION; -- table to suppress attentions that the client has seen, but we haven't acked. -- we permit only a small number of pending attns, the rest are thrown away! 
attnSeqTable: ARRAY (0..maxPendingAttns] OF CARDINAL; attnCount: CARDINAL; sentBuffer: BufferDefs.OisBuffer; -- hold the head of the sentQueue -- input/output pointers and queues inOrderQueue, inAttnQueue, outOfOrderQueue, tempQueue, sentQueue: BufferDefs.QueueObject; inOrderQueueNotEmpty: CONDITION; -- variables and parameters for this packet stream ackRequestPulse: LONG CARDINAL; -- the time an ack was requested, a Pulse lastPacketReceivedPulse: LONG CARDINAL; -- time any packet was rcv'd for this connection, a Pulse lastProbePulse: LONG CARDINAL; -- retransmitter's last probe sent time, a Pulse probeCounter: CARDINAL; -- retransmitter's allocation probe counter probeRetransmitPulses: LONG CARDINAL; -- in Pulses dataPacketRetransmitPulses: System.Pulses; -- in Pulses waitPulses: LONG CARDINAL; -- maximum time (in Pulses) any blocking procedure should block delayCount: CARDINAL; -- number of data packets used in delay calculation delaySum: System.Pulses; -- cumulative time (in Pulses) that delayCount packets spent on retransmit queue lastDelayCalculationPulse: LONG CARDINAL; -- a Pulses, last time we updated retransmit time seqNumWhenDelayCalculated: CARDINAL; -- process handles and specific parameters for controlling them pleaseStop: BOOLEAN; letClientRun: CONDITION; -- this is also used in DeleteCleanup - do not ENABLE ABORTs retransmitterFork, receiverFork: PROCESS; retransmitterWakeupTimer: CONDITION; -- temporary stats normalRetransUpdates: CARDINAL; -- **temp doubleRetransUpdates: CARDINAL; -- **temp -- constants for bulk transfer performance -- 10 packet buffers at the socket. -- 3 are for sending, 3 for receiving. -- 1 for sending a system packet and 1 for receiving a system packet -- 1 for receiving an attention packet -- 1 for extra receive packet bulkSendBufs: CARDINAL = 4; bulkReceiveBufs: CARDINAL = 6; bulkInitialAllocation: CARDINAL = 3; -- constants for transactional transfer performance -- 7 packet buffers at the socket. 
-- 2 are for sending, 2 for receiving. -- 1 for sending a system packet and 1 for receiving a system packet -- 1 for extra receive packet for attention or whatever transactSendBufs: CARDINAL = 3; transactReceiveBufs: CARDINAL = 4; transactInitialAllocation: CARDINAL = 2; defaultSendAllocation: CARDINAL = 1; -- more will screw simple guys like 860 duplicateWindow: INTEGER = 100; maxPendingAttns: CARDINAL = 5; -- some day these will become variables whose value will be determined heuristically -- using some kind of adaptive algorithm retransmissionsBeforeGiveUp: CARDINAL = 30; retransmissionsBeforeAskForAck: CARDINAL = 2; probesBeforeGiveUp: CARDINAL = 8; -- therefore 40 secs or inactivity emptyInOrderQueue: INTEGER = 0; minSendAllocationBeforeRequestingAck: INTEGER = 0; minRcvAllocation: INTEGER = 0; initialDataPacketRetransmitTime: CARDINAL = 2000; -- msecs maxDataPacketRetransmitTime: CARDINAL = 20000; -- msecs maxDataPacketRetransmitPulses: System.Pulses _ MilliSecondsToPulses[ maxDataPacketRetransmitTime]; initialRetransmitterTime: CARDINAL = 250; -- msecs probeMultiplier: CARDINAL = 4; -- probe time is this multiple of retrans timeout delayCalculationTime: CARDINAL = 10000; -- 10 secs (keep < 1 min to assure that delaySum doesn't overflow) delayCalculationPulses: LONG CARDINAL _ MilliSecondsToPulses[ delayCalculationTime]; inactiveConnectionTime: CARDINAL = 5000; -- msecs inactiveConnectionPulses: LONG CARDINAL _ MilliSecondsToPulses[ inactiveConnectionTime]; fortyMsecsOfPulses: LONG CARDINAL _ MilliSecondsToPulses[40]; fiveHundredMsecsOfPulses: LONG CARDINAL _ MilliSecondsToPulses[500]; -- the following four constants are weights used in UpdateRetransmitTime wOldAbove: CARDINAL _ 1; wOldBelow: CARDINAL _ 2; wNewAbove: CARDINAL _ 50; wNewBelow: CARDINAL _ 2; -- We are using an adaptive scheme for deciding when packets must be retransmitted. 
-- We assume that dataPacketRetransmitPulses must elapse since the last time
-- ackRequestPulse was updated before retransmitting everything on the sentQueue.
-- ackRequestPulse is updated whenever a data packet with the sendAck bit set is
-- transmitted or retransmitted, or when a packet is received that updates the
-- unackedOutputSeq field.  We transmit a probe whenever no packet has been
-- received for 5 sec.  The connection can become suspended if we send 8 probes
-- without a response (probes being generated because of allocation requirements
-- or the 5 sec threshold), or if we retransmit a packet too many times.
-- This stuff is very tricky and needs careful thought for the next Pilot.

-- various Glitches generated by this module
StreamNotEstablished: ERROR = CODE;
StreamTerminating: ERROR = CODE;

-- Hot Procedures

-- TakeFromUser accepts a Sequenced Packet Protocol packet for transmission on this
-- packet stream.  The caller owns the buffer and must already have filled in
-- oisPktLength, the attention flag and the subtype.  The buffer goes back to the
-- free queue once the packet has been acknowledged.  If a SIGNAL escapes from this
-- procedure, cleaning up the buffer remains the caller's responsibility.
TakeFromUser: PROCEDURE [b: BufferDefs.OisBuffer] =
  BEGIN

  -- Runs under the monitor lock: waits for send allocation, then stamps b with the
  -- connection's sequencing information.
  StampUnderLock: ENTRY PROCEDURE = INLINE
    BEGIN
    ENABLE UNWIND => NULL;
    DO
      SELECT state FROM
        established, open =>
          BEGIN
          IF LOOPHOLE[nextOutputSeq - maxOutputSeq, INTEGER] > 0 THEN
            BEGIN
            -- the window is closed; an ABORTED ERROR raised by the WAIT propagates
            WAIT newAllocation;
            LOOP; -- re-examine the state from the top, it may have changed
            END;
          b.ois.systemPacket _ FALSE;
          -- Unless the client already set the bit, request an acknowledgement when
          -- this is an attention packet, or when the allocation remaining is exactly
          -- minSendAllocationBeforeRequestingAck.
          IF NOT b.ois.sendAck THEN
            b.ois.sendAck _
              b.ois.attention OR
                (LOOPHOLE[maxOutputSeq - nextOutputSeq, INTEGER] =
                  minSendAllocationBeforeRequestingAck);
          b.ois.unusedType _ 0;
          PrepareSequencedPacket[b];
          END;
        suspended => RETURN WITH ERROR ConnectionSuspended[whySuspended];
        unestablished, activeEstablish, waitEstablish =>
          IF CommFlags.doDebug THEN Glitch[StreamNotEstablished];
        terminating => IF CommFlags.doDebug THEN Glitch[StreamTerminating];
        ENDCASE;
      EXIT;
      ENDLOOP;
    END; -- StampUnderLock

  StampUnderLock[]; -- either returns or generates a SIGNAL.
  IF CommFlags.doStats THEN
    BEGIN
    StatIncr[statDataPacketsSent];
    StatBump[statDataBytesSent, b.ois.pktLength - bytesPerSequencedPktHeader];
    END;
  PutPacketOnSocketChannel[b];
  END; -- TakeFromUser

-- This procedure gets the next sequenced, duplicate-suppressed packet from the
-- packet stream.  This procedure hangs till a packet is available.  The client is
-- expected to return the buffer to the free queue.  If the wait time expires then
-- NIL will be returned, and so the client MUST test for this case.
GetForUser: PROCEDURE RETURNS [b: BufferDefs.OisBuffer] =
  -- Returns the next buffer from inOrderQueue, or NIL when none is delivered.
  -- Raises ConnectionSuspended[whySuspended] if the stream is suspended.  An
  -- ABORTED ERROR raised by the WAIT is allowed to propagate.
  BEGIN
  -- returnAnAck is never set now that the ack code below is commented out; it is
  -- kept so that the commented-out logic can be revived unchanged.
  returnAnAck: BOOLEAN _ FALSE;

  GetForUserLocked: ENTRY PROCEDURE = INLINE
    BEGIN
    ENABLE UNWIND => NULL;
    startPulse, now: LONG CARDINAL;
    startPulse _ System.GetClockPulses[];
    DO
      SELECT state FROM
        unestablished, activeEstablish, waitEstablish =>
          IF CommFlags.doDebug THEN Glitch[StreamNotEstablished];
        established, open =>
          BEGIN
          IF (b _ DequeueSpp[@inOrderQueue]) # NIL THEN
            BEGIN
            -- removed code to return an ack here
            -- IF sendAnAck AND BufferDefs.QueueLength[@inOrderQueue] =
            -- emptyInOrderQueue THEN
            -- BEGIN
            -- sendAnAck _ FALSE; set the global switch to false
            -- returnAnAck _ TRUE; we must send back the ack
            -- END;
            IF CommFlags.doStats THEN
              BEGIN
              StatIncr[statDataPacketsReceived];
              StatBump[
                statDataBytesReceived,
                b.ois.pktLength - bytesPerSequencedPktHeader];
              END;
            END
          ELSE
            BEGIN
            -- nothing queued (b is NIL here): give up once waitPulses have elapsed
            -- since entry, otherwise wait for the Receiver to queue something.
            now _ System.GetClockPulses[];
            IF (now - startPulse) >= waitPulses THEN EXIT;
            WAIT inOrderQueueNotEmpty; -- let ABORTED ERROR propagate
            LOOP; -- we start from the beginning in case the state changed
            END;
          END;
        suspended => RETURN WITH ERROR ConnectionSuspended[whySuspended];
        terminating => IF CommFlags.doDebug THEN Glitch[StreamTerminating];
        ENDCASE;
      EXIT;
      ENDLOOP;
    END; -- GetForUserLocked

  -- Give the result a defined value before taking the lock.  In the not-yet-
  -- established and terminating states (when CommFlags.doDebug is off) the locked
  -- procedure exits without ever assigning b, and the caller is told to test for NIL.
  b _ NIL;
  GetForUserLocked[];
  -- we now ack when we receive the packet
  IF returnAnAck THEN SendSystemPacket[FALSE];
  END; -- GetForUser

-- This procedure waits until a packet with the attention bit arrives, and returns
-- an advance copy to the client, who must return it to the buffer pool.  The client will
-- not get duplicates, though it may see the advance copies of the attention
-- packet after it has seen the real copy if it is sluggish.
WaitForAttention: ENTRY PROCEDURE RETURNS [b: BufferDefs.OisBuffer] =
  -- Blocks until a buffer can be taken from inAttnQueue and returns it.
  -- Raises ConnectionSuspended[whySuspended] if the stream is suspended.  An
  -- ABORTED ERROR raised by the WAIT is allowed to propagate.
  BEGIN
  ENABLE UNWIND => NULL;
  -- Give the result a defined value: in the not-yet-established and terminating
  -- states (when CommFlags.doDebug is off) the loop exits without assigning b.
  b _ NIL;
  DO
    SELECT state FROM
      unestablished, activeEstablish, waitEstablish =>
        IF CommFlags.doDebug THEN Glitch[StreamNotEstablished];
      established, open =>
        BEGIN
        IF (b _ DequeueSpp[@inAttnQueue]) # NIL THEN EXIT
        ELSE
          BEGIN
          WAIT newInputAttn; -- let ABORTED ERROR propagate
          LOOP; -- we start from the beginning in case the state changed
          END;
        END;
      suspended => RETURN WITH ERROR ConnectionSuspended[whySuspended];
      terminating => IF CommFlags.doDebug THEN Glitch[StreamTerminating];
      ENDCASE;
    EXIT;
    ENDLOOP;
  END; -- WaitForAttention

-- This procedure returns a processed spp data buffer to its packet stream socket's
-- pool.  It also increases the allocation window.
ReturnGetSppDataBuffer: PROCEDURE [b: BufferDefs.OisBuffer] =
  BEGIN
  -- Advances maxInputSeq by one under the monitor lock and reports whether the
  -- window size (maxInputSeq - nextInputSeq) has just become exactly 1.
  JustOpenedTheWindow: ENTRY PROCEDURE RETURNS [BOOLEAN] = INLINE
    {RETURN[LOOPHOLE[(maxInputSeq _ maxInputSeq + 1) - nextInputSeq, INTEGER] = 1]};
  OISCP.ReturnFreeOisBuffer[b];
  IF JustOpenedTheWindow[] THEN
    SendSystemPacket[FALSE]; -- just opened a closed window
  END;

-- This procedure causes the transmission of a system packet, i.e. one that does not
-- consume any sequence number.  The subtype of this packet is irrelevant and so is
-- made zero.  The procedure takes an argument indicating whether the send
-- acknowledgement flag should be set or not in the outgoing packet.
SendSystemPacket: PROCEDURE [sendAck: BOOLEAN] =
  -- Best effort: if no send buffer can be obtained from the pool, nothing is sent.
  BEGIN
  b: BufferDefs.OisBuffer;
  SendSystemPacketLocked: ENTRY PROCEDURE = INLINE
    BEGIN PrepareSequencedPacket[b]; END; -- SendSystemPacketLocked
  IF (b _ OISCP.MaybeGetFreeSendOisBufferFromPool[pool]) # NIL THEN
    BEGIN
    -- b.requeueProcedure should only be set and inspected by level1 and network drivers,
    b.ois.pktLength _ bytesPerSequencedPktHeader;
    b.ois.systemPacket _ TRUE;
    b.ois.sendAck _ sendAck;
    b.ois.attention _ FALSE;
    b.ois.unusedType _ 0;
    b.ois.subtype _ 0;
    SendSystemPacketLocked[];
    PutPacketOnSocketChannel[b];
    END;
  END; -- SendSystemPacket

-- This procedure takes a buffer and prepares it for transmission by filling in the
-- appropriate fields in the buffer and updating the packet stream state variables.
-- Only a non-system packet consumes a sequence number (advances nextOutputSeq).
PrepareSequencedPacket: INTERNAL PROCEDURE [b: BufferDefs.OisBuffer] = INLINE
  BEGIN
  -- b.requeueProcedure should only be set and inspected by level1 and network drivers
  -- b.length filled by router
  -- b.network filled by router
  -- b.iocbChain filled by network driver
  -- b.userPtr not used currently
  -- b.userData not used currently
  -- b.userDataLength not used currently
  -- b.status used by the socket and router code
  -- b.time used to determine round-trip delay.  Perhaps someday individual
  --   retransmit timeout.  Filled when put on retransmit queue
  b.tries _ 0;
  -- rest used by queue package, drivers and dispatcher
  -- encapsulation filled by router
  -- b.checksum filled by router
  -- b.ois.pktLength filled by caller of this routine
  -- b.transportControl filled by router
  b.ois.transCntlAndPktTp.packetType _ sequencedPacket;
  b.ois.destination _ remoteAddr;
  -- b.ois.source filled by socket interface
  -- b.ois.systemPacket filled by caller of this routine
  -- b.ois.sendAck filled by acknowledgement strategy by caller
  -- b.ois.attention filled by caller of this routine
  -- b.ois.unusedType filled by caller of this routine
  -- b.ois.subtype filled by caller of this routine
  -- b.ois.endOfMessage filled by caller of this routine
  b.ois.sourceConnectionID _ localConnectionID;
  b.ois.destinationConnectionID _ remoteConnectionID;
  b.ois.sequenceNumber _ nextOutputSeq;
  b.ois.acknowledgeNumber _ nextInputSeq;
  b.ois.allocationNumber _ maxInputSeq;
  IF NOT b.ois.systemPacket THEN nextOutputSeq _ nextOutputSeq + 1
  ELSE
    BEGIN
    IF CommFlags.doStats THEN StatIncr[statAcksSent];
    IF CommFlags.doStats THEN StatIncr[statSystemPacketsSent];
    END;
  IF b.ois.sendAck AND CommFlags.doStats THEN StatIncr[statAckRequestsSent];
  IF b.ois.attention AND CommFlags.doStats THEN StatIncr[statAttentionsSent];
  END; -- PrepareSequencedPacket

-- This procedure puts a packet out on the socket channel, where the packet is
-- asynchronously sent.  On completion the dispatcher enqueues the packet onto
-- the sentQueue for possible retransmission only if the packet is not a system packet.
-- The packet stream monitor should not be locked when we call this.
PutPacketOnSocketChannel: PROCEDURE [b: BufferDefs.OisBuffer] =
  BEGIN
  IF ~b.ois.systemPacket THEN
    BEGIN
    b.requeueProcedure _ LOOPHOLE[EnqueueTransmittedPacketAppropriately];
    -- remember time for retransmitter
    IF b.ois.sendAck THEN ackRequestPulse _ System.GetClockPulses[];
    END;
  Socket.PutPacket[
    socketCH, b !
    Socket.ChannelAborted => -- we are about to be deleted
      BEGIN b.requeueProcedure[b]; CONTINUE; END];
  END; -- PutPacketOnSocketChannel

-- Installed as b.requeueProcedure for non-system packets by PutPacketOnSocketChannel.
-- Inspects the transmit status, suspending the stream on the failure statuses listed
-- below, and then puts the buffer on the sentQueue.
EnqueueTransmittedPacketAppropriately: ENTRY PROCEDURE [b: BufferDefs.OisBuffer] =
  BEGIN
  FailConnection: INTERNAL PROCEDURE [why: FailureReason] =
    BEGIN
    SuspendStream[noRouteToDestination];
    whyFailed _ why; -- inform the creator if there is one
    END; -- FailConnection
  -- check for failures (this may be the first packet, so look for connection failures)
  SELECT LOOPHOLE[b.status, XmitStatus] FROM
    pending, goodCompletion, aborted, invalidDestAddr => NULL; -- ok or not expected
    noRouteToNetwork, hardwareProblem => FailConnection[noRouteToDestination];
    noTranslationForDestination => FailConnection[noTranslationForDestination];
    noAnswerOrBusy => FailConnection[noAnswerOrBusy];
    circuitInUse => FailConnection[circuitInUse];
    circuitNotReady => FailConnection[circuitNotReady];
    noDialingHardware => FailConnection[noDialingHardware];
    dialerHardwareProblem => FailConnection[noRouteToDestination];
    ENDCASE => ERROR;
  -- enqueue onto sentQueue since this is not a system packet
  EnqueueSpp[@sentQueue, b];
  -- record time if ackReq packet and first time put there (used for delay calculation)
  IF b.tries = 0 AND b.ois.sendAck THEN b.time _ System.GetClockPulses[];
  END; -- EnqueueTransmittedPacketAppropriately

-- This procedure changes the state of the packet stream to be suspended, remembering
-- the previous state and the reason.  The caller must already hold the monitor lock.
SuspendStream: INTERNAL PROCEDURE [why: SuspendReason] =
  BEGIN
  stateBeforeSuspension _ state;
  state _ suspended;
  whySuspended _ why;
  END; -- SuspendStream

-- Entry-procedure wrapper around SuspendStream for callers that do not hold the
-- monitor lock.
SuspendStreamLocked: ENTRY PROCEDURE [why: SuspendReason] =
  BEGIN SuspendStream[why]; END; -- SuspendStreamLocked

-- This process retransmits packets that have not been acknowledged in a reasonable
-- time, and in addition generates probes etc. to test for the connection's liveness.
-- The Retransmitter, Receiver and the transmission process all three try to keep the
-- sentBuffer and sentQueue ordered by sequence number.
Retransmitter: PROCEDURE =
  BEGIN
  now: LONG CARDINAL; -- clock reading taken at the top of each pass of the main loop
  retransBuffer: BufferDefs.OisBuffer;

  -- some procedures whose scope is just the Retransmitter

  TimeToSendAProbe: ENTRY PROCEDURE RETURNS [goAhead: BOOLEAN] = INLINE
    BEGIN
    goAhead _ FALSE;
    IF state = established OR state = open THEN
      BEGIN
      IF ((LOOPHOLE[nextOutputSeq - maxOutputSeq, INTEGER] > 0 AND
        (now - lastProbePulse) > probeRetransmitPulses)) OR
        (((now - lastPacketReceivedPulse) >= inactiveConnectionPulses AND
          (now - lastProbePulse) >= inactiveConnectionPulses)) THEN
        BEGIN
        -- probe for allocation, or just an ack from other end, or for activity.
        lastProbePulse _ now;
        IF (probeCounter _ probeCounter + 1) > probesBeforeGiveUp THEN
          SuspendStream[transmissionTimeout]
        ELSE
          BEGIN
          goAhead _ TRUE;
          IF CommFlags.doStats THEN StatIncr[statProbesSent];
          END;
        END;
      END;
    END; -- TimeToSendAProbe

  RetransmissionCount: ENTRY PROCEDURE RETURNS [CARDINAL] = INLINE
    BEGIN
    IF state = established OR state = open THEN
      BEGIN
      IF (now - ackRequestPulse) >= dataPacketRetransmitPulses THEN
        RETURN[sentQueue.length + (IF sentBuffer = NIL THEN 0 ELSE 1)]
        -- we must retransmit all packets currently on the sentQueue and sentBuffer.
        -- a count is kept because on retransmission we put packets back on
        -- sentQueue and otherwise we will be in an infinite loop.
      ELSE RETURN[0]; -- not yet time to retransmit
      END
    ELSE RETURN[0]; -- any other state requires no retransmission.
    END; -- RetransmissionCount

  GetFromRetransmissionQueue: ENTRY PROCEDURE RETURNS [b: BufferDefs.OisBuffer] =
    INLINE
    BEGIN
    IF sentBuffer = NIL THEN b _ DequeueSpp[@sentQueue]
    ELSE BEGIN b _ sentBuffer; sentBuffer _ NIL; END;
    IF b = NIL THEN RETURN;
    SELECT b.tries FROM
      >= retransmissionsBeforeGiveUp =>
        BEGIN
        -- give up trying to send anything on this packet stream.
        SuspendStream[transmissionTimeout];
        sentBuffer _ b; -- put back at head of sentQueue
        b _ NIL;
        END;
      = retransmissionsBeforeAskForAck =>
        b.ois.sendAck _ TRUE; -- need to set ack request
      ENDCASE;
    END; -- GetFromRetransmissionQueue

  WaitForAWhile: ENTRY PROCEDURE = INLINE
    BEGIN WAIT retransmitterWakeupTimer; END; -- WaitForAWhile

  -- Once per delayCalculationPulses, recomputes dataPacketRetransmitPulses,
  -- probeRetransmitPulses and the retransmitter's wakeup timeout.
  -- NOTE(review): this is not an ENTRY procedure, yet it reads and resets delaySum
  -- and delayCount, which GotSequencedPacket updates under the monitor lock.
  UpdateRetransmitTime: PROCEDURE = INLINE
    BEGIN
    avgDelay: System.Pulses;
    weightNewAbove: CARDINAL;
    IF ((now - lastDelayCalculationPulse) >= delayCalculationPulses) THEN
      BEGIN -- time to do retransmission timeout determination
      IF delayCount > 0 THEN
        BEGIN -- we have had some packets with lossless delay (no retransmissions)
        avgDelay _ [delaySum/delayCount];
        normalRetransUpdates _ normalRetransUpdates + 1; -- **temp
        -- new retransmit timeout is a function of the old retransmit timeout and the
        -- newly measured delay.  If the delay is less than about 500 msecs, the standard
        -- deviation of the measured delay is large compared to transmission time;
        -- otherwise transmission time dominates.  So for fast mediums, we multiply the
        -- delay by a large factor.  For slow mediums, we go with the measured delay.
        -- The following lines are in flux.
        weightNewAbove _
          IF avgDelay > fiveHundredMsecsOfPulses THEN 1 ELSE wNewAbove;
        dataPacketRetransmitPulses _
          [(dataPacketRetransmitPulses / wOldBelow)* wOldAbove +
            (avgDelay / wNewBelow)* weightNewAbove];
        -- dataPacketRetransmitPulses _
        -- [(dataPacketRetransmitPulses + wNewAbove*avgDelay)/2];
        delaySum _ [0];
        delayCount _ 0;
        END
      ELSE
        BEGIN
        -- if data flowed, all had retransmissions and we should increase timeout.
        -- Sequence numbers wrap, so compare them the way the rest of the module
        -- does (signed difference) rather than with an unsigned ">".
        IF LOOPHOLE[unackedOutputSeq - seqNumWhenDelayCalculated, INTEGER] > 0 THEN
          BEGIN
          doubleRetransUpdates _ doubleRetransUpdates + 1; -- **temp
          dataPacketRetransmitPulses _ [dataPacketRetransmitPulses*2];
          END;
        END;
      seqNumWhenDelayCalculated _ unackedOutputSeq;
      -- Start the next measurement interval.  Without this the test at the top stays
      -- true on every pass once the first interval has elapsed.
      -- NOTE(review): no other assignment to lastDelayCalculationPulse is visible in
      -- this part of the file; confirm none exists elsewhere.
      lastDelayCalculationPulse _ now;
      dataPacketRetransmitPulses _
        [MIN[dataPacketRetransmitPulses, maxDataPacketRetransmitPulses]];
      probeRetransmitPulses _
        probeMultiplier*dataPacketRetransmitPulses + fortyMsecsOfPulses;
      Process.SetTimeout[
        @retransmitterWakeupTimer,
        PulsesToTicks[[dataPacketRetransmitPulses/4]]]; -- set new retransmitter wakeup
      END;
    END; -- UpdateRetransmitTime

  -- main body of the procedure
  UNTIL pleaseStop DO -- checks the value of pleaseStop, race condition OK
    now _ System.GetClockPulses[];
    THROUGH (0..RetransmissionCount[]] DO -- this is an upper bound
      -- state may change for any reason, but we keep sending them out, unless we
      -- ourselves changed the state to be suspended!  The Receiver may take packets
      -- off the sentQueue in parallel in which case we will get a NIL and therefore exit.
      IF (retransBuffer _ GetFromRetransmissionQueue[]) = NIL THEN EXIT;
      IF LOOPHOLE[retransBuffer.ois.sequenceNumber - unackedOutputSeq, INTEGER] < 0
        THEN OISCP.ReturnFreeOisBuffer[retransBuffer] -- packet has been ACKed
      ELSE
        BEGIN -- its time to send this packet out again
        retransBuffer.tries _ retransBuffer.tries + 1;
        PutPacketOnSocketChannel[retransBuffer]; -- requeues buffer on sentQueue
        retransBuffer _ NIL;
        IF CommFlags.doStats THEN StatIncr[statDataPacketsRetransmitted];
        END;
      ENDLOOP;
    IF TimeToSendAProbe[] THEN SendSystemPacket[TRUE];
    WaitForAWhile[];
    UpdateRetransmitTime[];
    ENDLOOP;
  END; -- Retransmitter

-- Converts a Pulses interval to Process.Ticks by way of milliseconds.  The
-- millisecond count has 1 added after the division and is clamped to LAST[CARDINAL].
PulsesToTicks: PROCEDURE [pulses: System.Pulses] RETURNS [Process.Ticks] = INLINE
  BEGIN
  lastCard: LONG CARDINAL = LAST[CARDINAL];
  msecs: LONG CARDINAL _ System.PulsesToMicroseconds[pulses]/1000 + 1;
  RETURN[
    Process.MsecToTicks[
      IF msecs > lastCard THEN LAST[CARDINAL] ELSE Inline.LowHalf[msecs]]];
  END;

-- This process waits at the socket channel for a packet to arrive.
-- This packet is in a buffer that belongs to this socket and should be returned to the
-- pool associated with the socket channel.
Receiver: PROCEDURE =
  BEGIN
  b: BufferDefs.OisBuffer;
  UNTIL pleaseStop DO -- race condition not harmful as we account for it
    -- now wait for something to arrive.
    b _ Socket.GetPacket[
      socketCH ! Socket.TimeOut => RETRY;
      Socket.ChannelAborted => EXIT]; -- pleaseStop got set by someone else
    SELECT LOOPHOLE[b.status, XmitStatus] FROM
      goodCompletion =>
        SELECT b.ois.transCntlAndPktTp.packetType FROM
          sequencedPacket =>
            GotSequencedPacket[LOOPHOLE[b, BufferDefs.OisBuffer]];
          error => GotErrorPacket[b];
          ENDCASE =>
            BEGIN -- discard, don't want any other protocol type
            OISCP.ReturnFreeOisBuffer[b];
            IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadType];
            END;
      ENDCASE => OISCP.ReturnFreeOisBuffer[b];
    ENDLOOP;
  END; -- Receiver

-- This procedure examines the sequenced packet that has just arrived.
GotSequencedPacket: PROCEDURE [b: BufferDefs.OisBuffer] =
  -- Processes one received sequenced packet.  The state-dependent work is done under
  -- the monitor lock by GotSequencedPacketLocked; afterwards, outside the lock, an
  -- ack is sent if one was requested and the buffer is returned unless it was put
  -- on a queue.  Packets shorter than bytesPerSequencedPktHeader are only counted.
  -- Note that the nested procedures below read and assign the parameter b directly.
  BEGIN
  returnAnAck: BOOLEAN _ FALSE; -- we do not return an ack unless we are required to
  giveBackToSocket: BOOLEAN _ TRUE; -- unless we put on some other queue
  nowPulse: System.Pulses; -- set if we get a packet for our connection

  GotSequencedPacketLocked: ENTRY PROCEDURE = INLINE
    BEGIN
    SELECT state FROM
      unestablished, suspended, terminating => NULL; -- not interested.
      activeEstablish, waitEstablish =>
        [returnAnAck, giveBackToSocket] _ EstablishThisConnection[b];
      established, open =>
        BEGIN
        -- check that packet is from right remote address and connection ID
        IF b.ois.source.host = remoteAddr.host AND
          b.ois.source.socket = remoteAddr.socket AND
          b.ois.sourceConnectionID = remoteConnectionID AND
          b.ois.destinationConnectionID = localConnectionID THEN
          BEGIN -- the packet is for this connection
          probeCounter _ 0; -- other end is alive
          lastPacketReceivedPulse _ nowPulse _
            System.GetClockPulses[]; -- for inactivity et al
          -- classify by signed distance from the next expected sequence number
          SELECT LOOPHOLE[b.ois.sequenceNumber - nextInputSeq, INTEGER] FROM
            -- just the packet we wanted, or packet is early
            IN [0..duplicateWindow] => RightOrEarlyPacket[];
            -- old duplicate packet
            IN [-duplicateWindow..0) => DuplicatePacket[];
            -- very old duplicate packet
            ENDCASE =>
              IF CommFlags.doStats THEN StatIncr[statDataPacketsReceivedVeryLate]
          END
        ELSE -- packet is really not for this connection (or other end not established)
          BEGIN
          IF CommFlags.doStats AND
            (b.ois.source.host # remoteAddr.host OR
              b.ois.source.socket # remoteAddr.socket) THEN
            StatIncr[statPacketsRejectedBadSource];
          IF b.ois.sourceConnectionID # remoteConnectionID OR
            b.ois.destinationConnectionID # localConnectionID THEN
            BEGIN
            IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
            returnAnAck _ TRUE; -- probably our connection response got lost
            END;
          END;
        END;
      ENDCASE;
    -- delayed acking has been abandoned; only allocate is dependent on client
    -- this is delicate and is done to avoid getting stuck in a small window
    -- IF returnAnAck AND BufferDefs.QueueLength[@inOrderQueue] # emptyInOrderQueue
    -- THEN
    -- BEGIN
    -- returnAnAck _ FALSE; we don't have to send back an ack
    -- sendAnAck _ TRUE; make another process send back an ack
    -- END;
    -- This wait lets the user process run and suck up all the packets on the
    -- inOrderInputQueue so that we send back a full allocate.
    IF LOOPHOLE[maxInputSeq - nextInputSeq, INTEGER] < minRcvAllocation THEN
      WAIT letClientRun;
    END; -- GotSequencedPacketLocked

  -- This procedure processes both the next expected packet as well as early packets.
  -- Both kinds of packets are processed similarly as far as acks and allocation,
  -- and attentions are concerned.
  RightOrEarlyPacket: INTERNAL PROCEDURE = INLINE
    BEGIN
    IF b.ois.sendAck THEN
      BEGIN
      returnAnAck _ TRUE;
      IF CommFlags.doStats THEN StatIncr[statAckRequestsReceived];
      END;
    -- update allocation and ack fields of packetstream.
    IF LOOPHOLE[b.ois.allocationNumber - maxOutputSeq, INTEGER] > 0 THEN
      BEGIN maxOutputSeq _ b.ois.allocationNumber; NOTIFY newAllocation; END;
    IF LOOPHOLE[b.ois.acknowledgeNumber - unackedOutputSeq, INTEGER] > 0 THEN
      BEGIN
      unackedOutputSeq _ b.ois.acknowledgeNumber;
      ackRequestPulse _ nowPulse; -- if we get an ack we move the ack req. time
      END;
    -- now remove acked packets, if any, from sentQueue.  We assume that
    -- the sentQueue is kept ordered by increasing sequence number.
    -- sentBuffer holds the head of the sentQueue; refill it if it is empty.
    IF sentBuffer = NIL THEN sentBuffer _ DequeueSpp[@sentQueue];
    UNTIL sentBuffer = NIL OR
      LOOPHOLE[sentBuffer.ois.sequenceNumber - unackedOutputSeq, INTEGER] >= 0 DO
      IF sentBuffer.ois.sendAck AND sentBuffer.tries = 0 THEN
        BEGIN -- constitutes a measure of lossless delay (no retransmissions)
        delaySum _
          [delaySum + (nowPulse - sentBuffer.time)]; -- add time spent on queue to delay stats
        delayCount _ delayCount + 1;
        END;
      OISCP.ReturnFreeOisBuffer[sentBuffer];
      sentBuffer _ DequeueSpp[@sentQueue];
      ENDLOOP;
    -- so far we have only updated state information, now to dispense with the packet
    IF b.ois.systemPacket THEN
      BEGIN
      IF CommFlags.doStats THEN
        BEGIN
        StatIncr[statSystemPacketsReceived];
        StatIncr[statAcksReceived];
        END;
      END
    ELSE -- this is not a system packet process it intelligently
      BEGIN
      -- if the attention bit is set then we try to process the attention, and only
      -- if we were able to do so do we put the packet on the inOrderQueue.
      IF ~b.ois.attention OR (b.ois.attention AND AttentionPacketProcessed[b]) THEN
        BEGIN
        -- only now must we decide if it is in order or out of order
        IF LOOPHOLE[b.ois.sequenceNumber - nextInputSeq, INTEGER] = 0 THEN
          -- just the packet we wanted
          BEGIN
          state _ open;
          -- each pass of this loop delivers b, then looks for the new next-in-order
          -- packet on the outOfOrderQueue; b is NIL when none is found.
          DO
            nextInputSeq _ nextInputSeq + 1;
            IF attnCount # 0 THEN UpdateAttnSeqTable[];
            EnqueueSpp[@inOrderQueue, b];
            giveBackToSocket _ FALSE;
            NOTIFY inOrderQueueNotEmpty;
            -- examine the OutOfOrderQueue to see if anything can be taken off
            -- currently there is never anything on the OutOfOrderQueue.
            DO
              IF BufferDefs.QueueLength[@outOfOrderQueue] = 0 THEN
                BEGIN b _ NIL; EXIT; END;
              b _ DequeueSpp[@outOfOrderQueue];
              SELECT LOOPHOLE[b.ois.sequenceNumber - nextInputSeq, INTEGER] FROM
                0 => EXIT; -- this is the next in order packet, pick it up in outer loop
                IN (0..duplicateWindow] =>
                  EnqueueSpp[@tempQueue, b]; -- still out of order
                ENDCASE =>
                  OISCP.ReturnFreeOisBuffer[b]; -- old duplicate, throw away
              ENDLOOP;
            -- put buffers that are on tempQueue back onto outOfOrderQueue
            UNTIL (tempQueue.first) = NIL DO
              EnqueueSpp[@outOfOrderQueue, DequeueSpp[@tempQueue]]; ENDLOOP;
            IF b = NIL THEN EXIT;
            -- else, loop to process packet from outOfOrderQueue that is now next
            ENDLOOP;
          END
        ELSE
          -- early packet: only counted here; giveBackToSocket is left unchanged
          IF CommFlags.doStats THEN
            StatIncr[statDataPacketsReceivedEarly]; -- early packet
        END;
      END;
    END; -- RightOrEarlyPacket

  DuplicatePacket: INTERNAL PROCEDURE = INLINE
    BEGIN
    IF CommFlags.doStats THEN StatIncr[statDataPacketsReceivedAgain];
    -- send acks only if the other end asked
    IF b.ois.sendAck THEN returnAnAck _ TRUE;
    END; -- DuplicatePacket

  -- main body of this procedure
  IF b.ois.pktLength >= bytesPerSequencedPktHeader THEN
    BEGIN
    GotSequencedPacketLocked[];
    IF returnAnAck THEN SendSystemPacket[FALSE]; -- we may have to send the ack
    END
  ELSE IF CommFlags.doStats THEN StatIncr[statEmptyFunnys];
  IF giveBackToSocket THEN OISCP.ReturnFreeOisBuffer[b];
  END; -- GotSequencedPacket

-- This procedure attempts to process the attention packet, and returns whether it
-- was successful or not.  Success implies that an entry for the attn packet already
-- exists in the attnSeqTable, or has just been made.  This packet is now a candidate
-- for the inOrderQueue or the outOfOrderQueue.  If an entry can not be made because
-- of space restrictions we pretend we never ever saw this packet and let it be
-- retransmitted from the source.  The attnSeqTable is reasonably big, and so this
-- decision should not cause adverse effects.  This procedure is only called if the packet
-- was within the accept window.
AttentionPacketProcessed: INTERNAL PROCEDURE [b: BufferDefs.OisBuffer]
  RETURNS [BOOLEAN] =
  -- Returns TRUE if b's sequence number is already in attnSeqTable or has just been
  -- recorded there.  When it is newly recorded, a copy of the packet is put on
  -- inAttnQueue and newInputAttn is notified.  Returns FALSE, leaving all state
  -- untouched, when a new entry is needed but the table is full or no receive
  -- buffer is available for the copy.
  BEGIN
  i: CARDINAL;
  alreadyThere: BOOLEAN _ FALSE;
  advanceB: BufferDefs.OisBuffer;
  to, from: Environment.Block;
  nBytes: CARDINAL;
  -- First see whether this attention has been recorded before.  This must come
  -- before the space check: a retransmission of an attention that is already in
  -- the table needs no new slot and must still be reported as processed.
  IF attnCount # 0 THEN
    FOR i IN (0..attnCount] DO
      IF attnSeqTable[i] = b.ois.sequenceNumber THEN
        BEGIN alreadyThere _ TRUE; EXIT; END;
      ENDLOOP;
  IF NOT alreadyThere THEN -- because attnCount=0 or because really not there!
    BEGIN
    IF attnCount = maxPendingAttns THEN RETURN[FALSE]; -- no space
    IF (advanceB _ MaybeGetFreeReceiveOisBufferFromPool[pool]) = NIL THEN
      RETURN[FALSE]; -- cop out
    -- Append at the first free slot.  It is indexed from attnCount rather than from
    -- the loop variable, so the store does not depend on what value a FOR control
    -- variable holds after its range is exhausted.
    attnCount _ attnCount + 1;
    attnSeqTable[attnCount] _ b.ois.sequenceNumber;
    -- make a copy and then enqueue it
    to _ [@advanceB.ois, 0, b.ois.pktLength];
    from _ [@b.ois, 0, b.ois.pktLength];
    nBytes _ ByteBlt.ByteBlt[to: to, from: from];
    EnqueueSpp[@inAttnQueue, advanceB];
    NOTIFY newInputAttn;
    END;
  IF CommFlags.doStats THEN StatIncr[statAttentionsReceived];
  RETURN[TRUE];
  END; -- AttentionPacketProcessed

-- This procedure updates the attnSeqTable when the nextInputSeq is updated.
-- That is to say, we are removing any entries in the attnSeqTable that we plan
-- to ack, the next time we send out a packet.
UpdateAttnSeqTable: INTERNAL PROCEDURE =
  BEGIN
  i: CARDINAL _ 1;
  DO
    IF i > attnCount THEN EXIT;
    IF LOOPHOLE[(attnSeqTable[i] - nextInputSeq), INTEGER] < 0 THEN
      BEGIN
      -- entry is before nextInputSeq: overwrite it with the last entry and shrink
      -- the table; i is not advanced so the moved entry is examined next.
      attnSeqTable[i] _ attnSeqTable[attnCount];
      attnSeqTable[attnCount] _ 0;
      attnCount _ attnCount - 1;
      END
    ELSE i _ i + 1;
    ENDLOOP;
  END; -- UpdateAttnSeqTable

-- This procedure examines an error protocol packet.
GotErrorPacket: ENTRY PROCEDURE [b: BufferDefs.OisBuffer] =
  BEGIN
  ENABLE UNWIND => NULL;
  IF CommFlags.doStats THEN StatIncr[statErrorPacketsReceived];
  -- only the "no socket" error is acted upon; every other error type is ignored
  SELECT b.ois.errorType FROM
    noSocketOisErrorCode =>
      SELECT state FROM
        unestablished, activeEstablish, waitEstablish =>
          BEGIN
          -- still establishing: record why, then suspend
          whyFailed _ noServiceAtDestination;
          SuspendStream[noRouteToDestination];
          END;
        established, open => SuspendStream[remoteServiceDisappeared];
        ENDCASE => NULL;
    ENDCASE => NULL;
  -- the buffer is always handed back, whatever the error type was
  OISCP.ReturnFreeOisBuffer[b];
  END; -- GotErrorPacket

-- Cool Procedures

-- This procedure sets the wait time (given in milliseconds, stored in Pulses).
SetWaitTime: ENTRY PROCEDURE [time: OISCPTypes.WaitTime] =
  BEGIN
  waitPulses _ MilliSecondsToPulses[time];
  END; -- SetWaitTime

-- This procedure returns the local and remote address of this packet stream.
FindAddresses: ENTRY PROCEDURE RETURNS [local, remote: NetworkAddress] =
  BEGIN
  RETURN[localAddr, remoteAddr];
  END; -- FindAddresses

-- This procedure returns the number of data bytes that can fit into an sppBuffer.
GetSenderSizeLimit: ENTRY PROCEDURE RETURNS [CARDINAL] =
  BEGIN
  RETURN[maxOisPktLength - bytesPerSequencedPktHeader];
  END; -- GetSenderSizeLimit

-- This procedure attempts to establish the local end of the connection
-- to the incoming packet.  returnAnAck tells the caller that an ack should be
-- sent; giveBackToSocket is FALSE only when b was placed on the inOrderQueue.
EstablishThisConnection: INTERNAL PROCEDURE [b: BufferDefs.OisBuffer]
  RETURNS [returnAnAck, giveBackToSocket: BOOLEAN] =
  BEGIN
  socketMatches: BOOLEAN = (b.ois.source.socket = remoteAddr.socket);
  -- conditions shared by every acceptable opening packet: it comes from the
  -- expected host, names its own connection ID and carries sequence number 0
  openerFromPeerHost: BOOLEAN =
    (b.ois.source.host = remoteAddr.host
      AND b.ois.sourceConnectionID # unknownConnID
      AND b.ois.sequenceNumber = 0);

  -- Common tail of the two cases in which the packet is addressed to our own
  -- connection ID: adopt the remote parameters, mark the stream established
  -- and, for an acceptable data packet, queue it and move on to open.
  AcceptOpeningPacket: INTERNAL PROCEDURE =
    BEGIN
    remoteConnectionID _ b.ois.sourceConnectionID;
    maxOutputSeq _ b.ois.allocationNumber;
    state _ established;
    IF b.ois.sendAck THEN returnAnAck _ TRUE;
    IF b.ois.systemPacket THEN
      BEGIN
      IF CommFlags.doStats THEN StatIncr[statSystemPacketsReceived];
      END
    -- if the attention bit is set then we try to process the attention, and only
    -- if we were able to do so do we put the packet on the inOrderQueue.
    ELSE IF ~b.ois.attention OR AttentionPacketProcessed[b] THEN
      BEGIN
      nextInputSeq _ nextInputSeq + 1;
      EnqueueSpp[@inOrderQueue, b];
      giveBackToSocket _ FALSE;
      IF attnCount # 0 THEN UpdateAttnSeqTable[]; -- remove knowledge of old attns
      state _ open;
      NOTIFY inOrderQueueNotEmpty;
      END;
    NOTIFY connectionIsEstablished;
    END; -- AcceptOpeningPacket

  returnAnAck _ FALSE;
  giveBackToSocket _ TRUE;
  -- If state=activeEstablish then master/slave response when sockets are completely
  -- specified.  If state=waitEstablish then other end initiates and knows all about us.
  IF openerFromPeerHost AND socketMatches
    AND b.ois.destinationConnectionID = localConnectionID THEN
    AcceptOpeningPacket[]
  -- If state=activeEstablish then simultaneous establishment when sockets are
  -- completely specified.  If state=waitEstablish then other end initiates and knows
  -- all about us except our connection ID.
  ELSE IF openerFromPeerHost AND socketMatches
    AND b.ois.destinationConnectionID = unknownConnID THEN
    BEGIN
    remoteConnectionID _ b.ois.sourceConnectionID;
    maxOutputSeq _ b.ois.allocationNumber;
    IF state = waitEstablish THEN returnAnAck _ TRUE;
    state _ established;
    IF b.ois.sendAck THEN returnAnAck _ TRUE;
    IF CommFlags.doStats AND b.ois.systemPacket THEN
      StatIncr[statSystemPacketsReceived];
    NOTIFY connectionIsEstablished;
    END
  -- If state=activeEstablish then response from well known socket at which
  -- there is a server process
  ELSE IF state = activeEstablish AND openerFromPeerHost AND ~socketMatches
    AND b.ois.destinationConnectionID = localConnectionID THEN
    BEGIN
    -- from now on talk to the socket the reply actually came from
    remoteAddr.socket _ b.ois.source.socket;
    AcceptOpeningPacket[];
    END
  -- mismatched IDs
  ELSE IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
  IF returnAnAck AND CommFlags.doStats THEN StatIncr[statAckRequestsReceived];
  END; -- EstablishThisConnection

-- This procedure causes the active establishement of the connection.
-- A system packet
-- is transmitted to the remote end, and then this procedure causes the caller to hang
-- for a certain maximum time in which it is hoped that the connection has become
-- established owing to the receipt of an appropriate packet from the remote end.
-- Raises ERROR ConnectionFailed[whyFailed] if the stream becomes suspended, and
-- SIGNAL ConnectionFailed[timeout] each time waitPulses elapse without success.
ActivelyEstablish: PROCEDURE =
  BEGIN
  currentPulse, startPulse: LONG CARDINAL;

  -- Returns TRUE once the stream is established or open, FALSE when the caller
  -- should send another system packet and try again.
  WaitUntilActivelyEstablished: ENTRY PROCEDURE RETURNS [BOOLEAN] = INLINE
    BEGIN
    ENABLE UNWIND => NULL;
    -- Test the state before waiting.  The Receiver may already have changed it
    -- and issued its NOTIFY before this process entered the monitor; that NOTIFY
    -- is not remembered, so an unconditional WAIT would sleep for the whole
    -- timeout of the condition although the outcome is already known.
    IF state # established AND state # open AND state # suspended THEN
      WAIT connectionIsEstablished;
    IF state = established OR state = open THEN RETURN[TRUE];
    IF state = suspended THEN RETURN WITH ERROR ConnectionFailed[whyFailed];
    currentPulse _ System.GetClockPulses[];
    IF (currentPulse - startPulse) >= waitPulses THEN
      BEGIN
      SIGNAL ConnectionFailed[timeout];
      -- the signal was resumed: start a fresh timeout interval
      startPulse _ System.GetClockPulses[];
      END;
    RETURN[FALSE];
    END; -- WaitUntilActivelyEstablished

  startPulse _ System.GetClockPulses[];
  DO
    SendSystemPacket[TRUE];
    -- waits for 2 seconds to see if the connection is established
    IF WaitUntilActivelyEstablished[] THEN RETURN; -- this returns or raises a SIGNAL
    ENDLOOP;
  END; -- ActivelyEstablish

-- This procedure waits for the remote end to initiate connection establishment.
-- Raises SIGNAL ConnectionFailed[timeout] each time waitPulses elapse without the
-- stream becoming established; if the signal is resumed the wait starts over.
-- NOTE(review): unlike ActivelyEstablish this does not look for state = suspended;
-- confirm whether that state can be reached while in waitEstablish.
WaitUntilEstablished: ENTRY PROCEDURE =
  BEGIN
  ENABLE UNWIND => NULL;
  currentPulse, startPulse: LONG CARDINAL;
  startPulse _ System.GetClockPulses[];
  DO
    -- Test the state before waiting: a NOTIFY issued before this process reached
    -- the WAIT is not remembered, and waiting anyway would cost a full timeout.
    IF state = established OR state = open THEN RETURN;
    -- waits for 2 seconds each time around
    WAIT connectionIsEstablished;
    IF state = established OR state = open THEN RETURN;
    currentPulse _ System.GetClockPulses[];
    IF (currentPulse - startPulse) >= waitPulses THEN
      BEGIN
      SIGNAL ConnectionFailed[timeout];
      startPulse _ System.GetClockPulses[];
      END;
    ENDLOOP;
  END; -- WaitUntilEstablished

-- This procedure destroys the packet stream.  The procedure changes the state of
-- the packet stream, NOTIFYs the Retransmitter and Receiver to self destruct,
-- and then waits until this happens.  When this condition is satisfied, the socket
-- channel is deleted and, the packet stream data structures cleaned up.
-- Delete runs the shutdown sequence in this order: mark the stream as
-- terminating, abort the socket channel, JOIN the two forked processes, release
-- the queued buffers, delete the socket channel and finally destroy this module
-- instance.
-- NOTE(review): Socket.Abort is presumably what unblocks the Receiver so that its
-- JOIN can complete; confirm against the Socket interface.
Delete: PUBLIC PROCEDURE =
  BEGIN
  DeleteActivate[];
  Socket.Abort[socketCH];
  JOIN retransmitterFork;
  JOIN receiverFork;
  DeleteCleanup[];
  Socket.Delete[socketCH];
  Runtime.SelfDestruct[];
  END; -- Delete

-- Sets the terminating state and the pleaseStop flag under the monitor lock, then
-- notifies retransmitterWakeupTimer.
DeleteActivate: ENTRY PROCEDURE = INLINE
  BEGIN
  state _ terminating;
  pleaseStop _ TRUE;
  NOTIFY retransmitterWakeupTimer;
  END; -- DeleteActivate

-- Returns sentBuffer (if any) and cleans up the four receive-side queues.
DeleteCleanup: ENTRY PROCEDURE = INLINE
  BEGIN
  OPEN SocketInternal;
  ENABLE UNWIND => NULL;
  IF sentBuffer # NIL THEN OISCP.ReturnFreeOisBuffer[sentBuffer];
  BufferDefs.QueueCleanup[@inOrderQueue];
  BufferDefs.QueueCleanup[@inAttnQueue];
  BufferDefs.QueueCleanup[@outOfOrderQueue];
  BufferDefs.QueueCleanup[@tempQueue];
  -- NOTE(review): source text has been lost on the next line, between
  -- "sentQueue.length" and "= LAST[LONG CARDINAL]/1000".  The remainder of
  -- DeleteCleanup and the heading of MilliSecondsToPulses are missing and must be
  -- restored from a clean copy of the file; what follows is reproduced verbatim.
  WHILE sentQueue.length= LAST[LONG CARDINAL]/1000 -- multiplication overflow condition
  THEN pulses _ System.MicrosecondsToPulses[LAST[LONG CARDINAL]]
  ELSE pulses _ System.MicrosecondsToPulses[1000*ms];
  END; -- end MilliSecondsToPulses

-- initialization (Cool)

-- initialize connection control parameters
-- This initializes the necessary data structures for a packet stream instance.
-- It fills in all the necessary parameters and creates two processes that look
-- after the well being of this packet stream.  One is a Retransmitter, that periodically
-- checks the sentQueue to see if there is anything unacknowledged, and the other
-- is the Receiver, which sees if there are any incoming packets on the
-- corresponding socket.
-- Main body of the module.  It initializes the connection state, creates the
-- socket channel, sets up the condition variables, queues and timers, forks the
-- Retransmitter and the Receiver, establishes the connection where necessary,
-- and returns the stream handle together with the remote address and
-- connection ID.

-- connection state
state _ unestablished;
whySuspended _ notSuspended;
stateBeforeSuspension _ unestablished;
-- a WAIT on connectionIsEstablished times out after 2000 msec
Process.SetTimeout[@connectionIsEstablished, Process.MsecToTicks[2000]];

-- sequencing and flow control
nextInputSeq _ unackedOutputSeq _ nextOutputSeq _ 0;
maxOutputSeq _ nextOutputSeq + defaultSendAllocation - 1;

-- the class of service selects the socket buffer counts and the initial
-- receive allocation
SELECT classOfService FROM
  bulk =>
    {socketCH _ Socket.Create[localAddr, bulkSendBufs, bulkReceiveBufs, 0, TRUE];
    maxInputSeq _ bulkInitialAllocation};
  transactional =>
    {socketCH _ Socket.Create[localAddr, transactSendBufs, transactReceiveBufs, 0, TRUE];
    maxInputSeq _ transactInitialAllocation};
  ENDCASE => ERROR;
-- the buffer pool of the socket is used directly and also published in the
-- client object ps
pool _ ps.pool _ SocketInternal.GetBufferPool[
  SocketInternal.ChannelHandleToSocketHandle[socketCH]];
-- replace the requested local address by the one the socket reports
localAddr _ Socket.GetStatus[socketCH].localAddr;
-- a known remote connection ID means there is nothing left to establish
IF remoteConnectionID # unknownConnID THEN state _ established;
Process.SetTimeout[@newAllocation, Process.MsecToTicks[1000]];
CommUtilDefs.EnableAborts[@newAllocation];
sendAnAck _ FALSE;
maxOisPktLength _ OISCPTypes.maxBytesPerPkt;

-- attention handling: entries 1..maxPendingAttns are cleared; attnCount doubles
-- as the loop variable and is reset to 0 afterwards
Process.SetTimeout[@newInputAttn, Process.MsecToTicks[1000]];
CommUtilDefs.EnableAborts[@newInputAttn];
FOR attnCount IN (0..maxPendingAttns] DO attnSeqTable[attnCount] _ 0; ENDLOOP;
attnCount _ 0;

-- buffers and queues
sentBuffer _ NIL;
BufferDefs.QueueInitialize[@inOrderQueue];
BufferDefs.QueueInitialize[@inAttnQueue];
BufferDefs.QueueInitialize[@outOfOrderQueue];
BufferDefs.QueueInitialize[@tempQueue];
BufferDefs.QueueInitialize[@sentQueue];
Process.SetTimeout[@inOrderQueueNotEmpty, Process.MsecToTicks[1000]];
CommUtilDefs.EnableAborts[@inOrderQueueNotEmpty];

-- all time stamps start at the current clock reading
ackRequestPulse _ lastProbePulse _ lastDelayCalculationPulse _
  lastPacketReceivedPulse _ System.GetClockPulses[];
probeCounter _ 0;
dataPacketRetransmitPulses _ MilliSecondsToPulses[
  initialDataPacketRetransmitTime]; -- msecs converted to Pulses
probeRetransmitPulses _ MilliSecondsToPulses[
  probeMultiplier*initialDataPacketRetransmitTime + 40]; -- msecs converted to Pulses
delaySum _ [0];
delayCount _ 0;
normalRetransUpdates _ doubleRetransUpdates _ 0; -- **temp
-- during establishment the wait time is the timeout given by the creator
waitPulses _ MilliSecondsToPulses[timeout]; -- msecs converted to Pulses
pleaseStop _ FALSE;
Process.SetTimeout[@letClientRun, Process.MsecToTicks[250]]; -- this is also used in DeleteCleanup - do not ENABLE ABORTs
Process.SetTimeout[
  @retransmitterWakeupTimer, Process.MsecToTicks[initialRetransmitterTime]];

-- This baroque code is to ensure that the state is not looked at by any of the other
-- processes until this initialization code has finished examining this variable without
-- entering the monitor.
IF state = unestablished THEN
  BEGIN
  IF establish THEN
    BEGIN
    -- we initiate: ActivelyEstablish sends system packets until the connection
    -- is established or it raises ConnectionFailed
    state _ activeEstablish;
    retransmitterFork _ FORK Retransmitter[];
    receiverFork _ FORK Receiver[];
    Process.Yield[];
    ActivelyEstablish[];
    END
  ELSE
    BEGIN
    -- we listen: wait for the remote end to initiate
    state _ waitEstablish;
    retransmitterFork _ FORK Retransmitter[];
    receiverFork _ FORK Receiver[];
    Process.Yield[];
    WaitUntilEstablished[];
    END;
  END
ELSE
  BEGIN
  SendSystemPacket[FALSE]; -- gratuitous ack in case we are a server agent now.
  -- Retransmissions of this gratuitous packet will occur in the nature of probes.
  retransmitterFork _ FORK Retransmitter[];
  receiverFork _ FORK Receiver[];
  Process.Yield[];
  END;
waitPulses _ MilliSecondsToPulses[PacketStream.infiniteWaitTime]; -- Gets, WaitAttentions default to very long
RETURN[@ps, remoteAddr, remoteConnectionID];
END. -- of PacketStreamInstance module

LOG

Time: January 26, 1981 10:03 AM By: Garlick Action: 1.) added classOfService parameter to reduce buffers in interactive case 2.) added interpretation of Error Protocol packets 3.) increased inactivity timeouts for internet cases 4.) initial data operation timeout set to infinite 5.) acks no longer stimulated by Gets, rather in receiver 6.) send an alocation packet if just opening the window 7.) reinstated slow media adaptive algorithm 8.) enabled Process.Aborting for Gets, Puts, WaitAttn.(635)\23386v14V