PacketStreamInstance.mesa - implementation module for an instance of a SPP packet stream
Copyright © 1985 by Xerox Corporation. All rights reserved.
BLyon on: March 18, 1981 6:18 PM
Levin, August 9, 1983 9:14 am
DIRECTORY
BufferDefs USING [BufferAccessHandle, OisBuffer, Queue, QueueObject, QueueLength, QueueCleanup, QueueInitialize],
CommFlags USING [doDebug, doStats],
CommUtilDefs USING [EnableAborts],
DriverDefs USING [Glitch],
LoadState USING [SelfDestruct],
OISCP USING [DequeueSpp, EnqueueSpp, MaybeGetFreeSendOisBufferFromPool, MaybeGetFreeReceiveOisBufferFromPool, ReturnFreeOisBuffer, unknownConnID],
OISCPTypes USING [ConnectionID, maxBytesPerPkt, WaitTime],
PacketStream,
PrincOpsUtils USING [ByteBlt],
Process USING [MsecToTicks, SetTimeout, Ticks, Yield],
ProcessorFace USING [GetClockPulses, microsecondsPerHundredPulses],
Router USING [XmitStatus],
Socket USING [Abort, ChannelAborted, ChannelHandle, Create, Delete, GetPacket, GetStatus, PutPacket, TimeOut],
SocketInternal USING [GetBufferPool, ChannelHandleToSocketHandle],
NSAddress USING [NetworkAddress],
StatsDefs USING [StatBump, StatIncr];
Time units have been converted from MilliSeconds to Pulses (as defined by ProcessorFace.GetClockPulses); However, all interface still are in units of MilliSeconds (like the timeout passed to the MONITOR). GetClock now returns Pulses. Variable names ending in 'Time' will be in milliseconds (hopefully only initial values), while other variables will be in Pulses (some will end in 'Pulses').
PacketStreamInstance: MONITOR [
localAddr, remoteAddr: NSAddress.NetworkAddress,
localConnectionID, remoteConnectionID: OISCPTypes.ConnectionID,
establish: BOOLEAN, timeout: OISCPTypes.WaitTime ← PacketStream.defaultWaitTime,
classOfService: PacketStream.ClassOfService ← bulk]
RETURNS [
PacketStream.Handle, NSAddress.NetworkAddress, OISCPTypes.ConnectionID]
IMPORTS
BufferDefs, CommUtilDefs, DriverDefs, LoadState, PrincOpsUtils, Process, ProcessorFace,
Socket, SocketInternal, StatsDefs, PacketStream, OISCP
SHARES BufferDefs =
BEGIN OPEN DriverDefs, OISCP, Router, PacketStream, StatsDefs;
Pulses: TYPE = RECORD[LONG CARDINAL];
EXPORTED TYPE(S)
NetworkAddress: PUBLIC TYPE = NSAddress.NetworkAddress;
client interface
ps: Object ←
[NIL, TakeFromUser, GetForUser, WaitForAttention, SetWaitTime, FindAddresses,
GetSenderSizeLimit, ReturnGetSppDataBuffer]; -- pool (NIL) initted later
connection control parameters
connection state
state: PacketStream.State;
whySuspended: PacketStream.SuspendReason;
whyFailed: PacketStream.FailureReason;
stateBeforeSuspension: PacketStream.State;
connectionIsEstablished: CONDITION;
socket interface data
socketCH: Socket.ChannelHandle;
pool: BufferDefs.BufferAccessHandle;
this is the buffer pool from which we get buffers
sequencing, duplicate suppression and flow control information
nextInputSeq, maxInputSeq: CARDINAL;
unackedOutputSeq, nextOutputSeq, maxOutputSeq: CARDINAL;
newAllocation: CONDITION;
sendAnAck: BOOLEAN;
connection control
maxOisPktLength: CARDINAL;
input attention control
newInputAttn: CONDITION;
table to suppress attentions that the client has seen, but we haven't acked.
we permit only a small number of pending attns, the rest are thrown away!
attnSeqTable: ARRAY (0..maxPendingAttns] OF CARDINAL;
attnCount: CARDINAL;
sentBuffer: BufferDefs.OisBuffer; -- hold the head of the sentQueue
input/output pointers and queues
inOrderQueue: BufferDefs.Queue = NEW[BufferDefs.QueueObject];
inAttnQueue: BufferDefs.Queue = NEW[BufferDefs.QueueObject];
outOfOrderQueue: BufferDefs.Queue = NEW[BufferDefs.QueueObject];
tempQueue: BufferDefs.Queue = NEW[BufferDefs.QueueObject];
sentQueue: BufferDefs.Queue = NEW[BufferDefs.QueueObject];
inOrderQueueNotEmpty: CONDITION;
variables and parameters for this packet stream
ackRequestPulse: LONG CARDINAL; -- the time an ack was requested, a Pulse
lastPacketReceivedPulse: LONG CARDINAL;
time any packet was rcv'd for this connection, a Pulse
lastProbePulse: LONG CARDINAL; -- retransmitter's last probe sent time, a Pulse
probeCounter: CARDINAL; -- retransmitter's allocation probe counter
probeRetransmitPulses: LONG CARDINAL; -- in Pulses
dataPacketRetransmitPulses: Pulses; -- in Pulses
waitPulses: LONG CARDINAL;
maximum time (in Pulses) any blocking procedure should block
delayCount: CARDINAL; -- number of data packets used in delay calculation
delaySum: Pulses;
cumulative time (in Pulses) that delayCount packets spent on retransmit queue
lastDelayCalculationPulse: LONG CARDINAL;
a Pulses, last time we updated retransmit time
seqNumWhenDelayCalculated: CARDINAL;
process handles and specific parameters for controlling them
pleaseStop: BOOLEAN;
letClientRun: CONDITION; -- this is also used in DeleteCleanup - do not ENABLE ABORTs
retransmitterFork, receiverFork: PROCESS;
retransmitterWakeupTimer: CONDITION;
temporary stats
normalRetransUpdates: CARDINAL; -- **temp
doubleRetransUpdates: CARDINAL; -- **temp
constants for bulk transfer performance
10 packet buffers at the socket.
3 are for sending, 3 for receiving.
1 for sending a system packet and 1 for receiving a system packet
1 for receiving an attention packet
1 for extra receive packet
bulkSendBufs: CARDINAL = 4;
bulkReceiveBufs: CARDINAL = 6;
bulkInitialAllocation: CARDINAL = 3;
constants for transactional transfer performance
7 packet buffers at the socket.
2 are for sending, 2 for receiving.
1 for sending a system packet and 1 for receiving a system packet
1 for extra receive packet for attention or whatever
transactSendBufs: CARDINAL = 3;
transactReceiveBufs: CARDINAL = 4;
transactInitialAllocation: CARDINAL = 2;
defaultSendAllocation: CARDINAL = 1; -- more will screw simple guys like 860
duplicateWindow: INTEGER = 100;
maxPendingAttns: CARDINAL = 5;
some day these will become variables whose value will be determined heuristically
using some kind of adaptive algorithm
retransmissionsBeforeGiveUp: CARDINAL = 30;
retransmissionsBeforeAskForAck: CARDINAL = 2;
probesBeforeGiveUp: CARDINAL = 8; -- therefore 40 secs or inactivity
emptyInOrderQueue: INTEGER = 0;
minSendAllocationBeforeRequestingAck: INTEGER = 0;
minRcvAllocation: INTEGER = 0;
initialDataPacketRetransmitTime: CARDINAL = 2000; -- msecs
maxDataPacketRetransmitTime: CARDINAL = 20000; -- msecs
maxDataPacketRetransmitPulses: Pulses ← MilliSecondsToPulses[
maxDataPacketRetransmitTime];
initialRetransmitterTime: CARDINAL = 250; -- msecs
probeMultiplier: CARDINAL = 4; -- probe time is this multiple of retrans timeout
delayCalculationTime: CARDINAL = 10000;
10 secs (keep < 1 min to assure that delaySum doesn't overflow)
delayCalculationPulses: LONG CARDINAL ← MilliSecondsToPulses[
delayCalculationTime];
inactiveConnectionTime: CARDINAL = 5000; -- msecs
inactiveConnectionPulses: LONG CARDINAL ← MilliSecondsToPulses[
inactiveConnectionTime];
fortyMsecsOfPulses: LONG CARDINAL ← MilliSecondsToPulses[40];
fiveHundredMsecsOfPulses: LONG CARDINAL ← MilliSecondsToPulses[500];
the following four constants are weights used in UpdateRetransmitTime
wOldAbove: CARDINAL ← 1;
wOldBelow: CARDINAL ← 2;
wNewAbove: CARDINAL ← 50;
wNewBelow: CARDINAL ← 2;
We are using an adaptive scheme for deciding when packets must be retransmitted.
We assume that dataPacketRetransmitPulses must elapse since the last time
ackRequestPulse was updated before retransmitting everything on the sentQueue.
ackRequestPulse is updated whenever a data packet with the sendAck bit set is
transmitted or retransmitted, or when a packet is received that updates the
unackedOutputSeq field. We transmit
a probe whenever no packet has been received for 5 sec. The connection
can become suspended if we send 8 probes without a response (probes being
generated because of allocation requirements or the 5 sec threshold), or if we
retransmit a packet too many times. This stuff is very tricky and need careful
thought for the next Pilot.
various Glitches generated by this module
StreamNotEstablished: ERROR = CODE;
StreamTerminating: ERROR = CODE;
Hot Procedures
This procedure gives a Sequenced Packet Protocol packet to be transmitted over
the specified packet stream. This packet resides in a buffer, and some of the fields
of the packet must be filled in by the caller of this procedure. These fields are
oisPktLength, the attention flag, and subtype. This buffer will be returned to the
free queue once the packet has been acknowledged. SIGNALs generated by this
procedure leave the clean up responsibility for the buffer to the caller.
TakeFromUser: PROCEDURE [b: BufferDefs.OisBuffer] =
BEGIN
TakeFromUserLocked: ENTRY PROCEDURE = INLINE
BEGIN
ENABLE UNWIND => NULL;
DO
SELECT state FROM
unestablished, activeEstablish, waitEstablish =>
IF CommFlags.doDebug THEN Glitch[StreamNotEstablished];
established, open =>
IF LOOPHOLE[nextOutputSeq - maxOutputSeq, INTEGER] <= 0 THEN
BEGIN
b.ois.systemPacket ← FALSE;
we should ask for an acknowledgement to be returned, if on sending this
packet there is space only for minAllocationBeforeRequestingAck, or this is
an attention. We should do this only if, our client hasn't already set the bit.
IF NOT b.ois.sendAck THEN
b.ois.sendAck ← b.ois.attention OR LOOPHOLE[maxOutputSeq - nextOutputSeq,
INTEGER] = minSendAllocationBeforeRequestingAck;
b.ois.unusedType ← 0;
PrepareSequencedPacket[b];
END
ELSE
BEGIN
WAIT newAllocation; -- let ABORTED ERROR propogate
LOOP; -- we start from the beginning in case the state changed
END;
suspended => RETURN WITH ERROR ConnectionSuspended[whySuspended];
terminating => IF CommFlags.doDebug THEN Glitch[StreamTerminating];
ENDCASE;
EXIT;
ENDLOOP;
END; -- TakeFromUserLocked
TakeFromUserLocked[]; -- either returns or generates a SIGNAL.
IF CommFlags.doStats THEN StatIncr[statDataPacketsSent];
IF CommFlags.doStats THEN
StatBump[statDataBytesSent, b.ois.pktLength - bytesPerSequencedPktHeader];
PutPacketOnSocketChannel[b];
END; -- TakeFromUser
This procedure gets the next sequenced, duplicate-suppressed packet from the
packet stream. This procedure hangs till a packet is available. The client is
expected to return the buffer to the free queue. If the wait times expires then
NIL will be returned, and so the client MUST test for this case.
GetForUser: PROCEDURE RETURNS [b: BufferDefs.OisBuffer] =
BEGIN
returnAnAck: BOOLEANFALSE;
GetForUserLocked: ENTRY PROCEDURE = INLINE
BEGIN
ENABLE UNWIND => NULL;
startPulse, now: LONG CARDINAL;
startPulse ← ProcessorFace.GetClockPulses[];
DO
SELECT state FROM
unestablished, activeEstablish, waitEstablish =>
IF CommFlags.doDebug THEN Glitch[StreamNotEstablished];
established, open =>
BEGIN
IF (b ← DequeueSpp[inOrderQueue]) # NIL THEN
BEGIN
removed code to return an ack here
IF sendAnAck AND BufferDefs.QueueLength[inOrderQueue] =
emptyInOrderQueue THEN
BEGIN
sendAnAck ← FALSE; set the global switch to false
returnAnAck ← TRUE; we must send back the ack
END;
IF CommFlags.doStats THEN
BEGIN
StatIncr[statDataPacketsReceived];
StatBump[
statDataBytesReceived,
b.ois.pktLength - bytesPerSequencedPktHeader];
END;
END
ELSE
BEGIN
now ← ProcessorFace.GetClockPulses[];
IF (now - startPulse) >= waitPulses THEN EXIT;
WAIT inOrderQueueNotEmpty; -- let ABORTED ERROR propogate
LOOP; -- we start from the beginning in case the state changed
END;
END;
suspended => RETURN WITH ERROR ConnectionSuspended[whySuspended];
terminating => IF CommFlags.doDebug THEN Glitch[StreamTerminating];
ENDCASE;
EXIT;
ENDLOOP;
END; -- GetForUserLocked
GetForUserLocked[];
we now ack when we receive the packet IF returnAnAck THEN SendSystemPacket[FALSE];
END; -- GetForUser
This procedure waits until a packet with the attention bit arrives, and returns
an advance copy to the client, who must return it to the buffer pool. The client will
not get duplicates, though it may see the advance copies of the attention
packet after it has seen the real copy if it is sluggish.
WaitForAttention: ENTRY PROCEDURE RETURNS [b: BufferDefs.OisBuffer] =
BEGIN
ENABLE UNWIND => NULL;
DO
SELECT state FROM
unestablished, activeEstablish, waitEstablish =>
IF CommFlags.doDebug THEN Glitch[StreamNotEstablished];
established, open =>
BEGIN
IF (b ← DequeueSpp[inAttnQueue]) # NIL THEN EXIT
ELSE
BEGIN
WAIT newInputAttn; -- let ABORTED ERROR propogate
LOOP; -- we start from the beginning in case the state changed
END;
END;
suspended => RETURN WITH ERROR ConnectionSuspended[whySuspended];
terminating => IF CommFlags.doDebug THEN Glitch[StreamTerminating];
ENDCASE;
EXIT;
ENDLOOP;
END; -- WaitForAttention
This procedure returns a processed spp data buffer to its packet stream socket's pool. It also increases the allocation window
ReturnGetSppDataBuffer: PROCEDURE [b: BufferDefs.OisBuffer] =
BEGIN
JustOpenedTheWindow: ENTRY PROCEDURE RETURNS [BOOLEAN] = INLINE
{RETURN[LOOPHOLE[(maxInputSeq ← maxInputSeq + 1) - nextInputSeq, INTEGER] = 1]};
OISCP.ReturnFreeOisBuffer[b];
IF JustOpenedTheWindow[] THEN SendSystemPacket[FALSE]; -- just opened a closed window
END;
This procedure causes the transmission of a system packet, i.e. one that does not
consume any sequence number. The subtype of this packet is irrelavent and so is
made zero. The procedure takes an argument indicating whether the send
acknowledgement flag should be set or not in the outgoing packet.
SendSystemPacket: PROCEDURE [sendAck: BOOLEAN] =
BEGIN
b: BufferDefs.OisBuffer;
SendSystemPacketLocked: ENTRY PROCEDURE = INLINE
BEGIN PrepareSequencedPacket[b]; END; -- SendSystemPacketLocked
IF (b ← OISCP.MaybeGetFreeSendOisBufferFromPool[pool]) # NIL THEN
BEGIN
b.requeueProcedure should ony be set and inspected by level1 and network drivers,
b.ois.pktLength ← bytesPerSequencedPktHeader;
b.ois.systemPacket ← TRUE;
b.ois.sendAck ← sendAck;
b.ois.attention ← FALSE;
b.ois.unusedType ← 0;
b.ois.subtype ← 0;
SendSystemPacketLocked[];
PutPacketOnSocketChannel[b];
END;
END; -- SendSystemPacket
This procedure takes a buffer and prepares it for transmission by filling in the
appropriate fields in the buffer and updating the packet stream state variables.
PrepareSequencedPacket: INTERNAL PROCEDURE [b: BufferDefs.OisBuffer] = INLINE
BEGIN
b.requeueProcedure should ony be set and inspected by level1 and network drivers
b.length filled by router
b.network filled by router
b.iocbChain filled by network driver
b.userPtr not used currently
b.userData not used currently
b.userDataLength not used currently
b.status used by the socket and router code
b.time used to determine round-trip delay. Perhaps someday individual retransmit timeout. Filled when put on retransmit queue
b.tries ← 0;
rest used by queue package, drivers and dispatcher
encapsulation filled by router
b.checksum filled by router
b.ois.pktLength filled by caller of this routine
b.transportControl filled by router
b.ois.transCntlAndPktTp.packetType ← sequencedPacket;
b.ois.destination ← remoteAddr;
b.ois.source filled by socket interface
b.ois.systemPacket filled by caller of this routine
b.ois.sendAck filled by acknowledgement strategy by caller
b.ois.attention filled by caller of this routine
b.ois.unusedType filled by caller of this routine
b.ois.subtype filled by caller of this routine
b.ois.endOfMessage filled by caller of this routine
b.ois.sourceConnectionID ← localConnectionID;
b.ois.destinationConnectionID ← remoteConnectionID;
b.ois.sequenceNumber ← nextOutputSeq;
b.ois.acknowledgeNumber ← nextInputSeq;
b.ois.allocationNumber ← maxInputSeq;
IF NOT b.ois.systemPacket THEN nextOutputSeq ← nextOutputSeq + 1
ELSE
BEGIN
IF CommFlags.doStats THEN StatIncr[statAcksSent];
IF CommFlags.doStats THEN StatIncr[statSystemPacketsSent];
END;
IF b.ois.sendAck AND CommFlags.doStats THEN StatIncr[statAckRequestsSent];
IF b.ois.attention AND CommFlags.doStats THEN StatIncr[statAttentionsSent];
END; -- PrepareSequencedPacket
This procedure puts a packet out on the socket channel, where the packet is
asynchronously sent. On completion the dispatcher enqueues the packet onto
the sentQueue for possible retransmission only if the packet is not a system packet.
The packet stream monitor should not be locked when we call this.
PutPacketOnSocketChannel: PROCEDURE [b: BufferDefs.OisBuffer] =
BEGIN
IF ~b.ois.systemPacket THEN
BEGIN
b.requeueProcedure ← LOOPHOLE[EnqueueTransmittedPacketAppropriately];
remember time for retransmitter
IF b.ois.sendAck THEN ackRequestPulse ← ProcessorFace.GetClockPulses[];
END;
Socket.PutPacket[
socketCH, b !
Socket.ChannelAborted =>
we are about to be deleted
BEGIN b.requeueProcedure[b]; CONTINUE; END];
END; -- PutPacketOnSocketChannel
EnqueueTransmittedPacketAppropriately: ENTRY PROCEDURE [b: BufferDefs.OisBuffer] =
BEGIN
FailConnection: INTERNAL PROCEDURE [why: FailureReason] =
BEGIN
SuspendStream[noRouteToDestination];
whyFailed ← why; -- inform the creator if there is one
END; -- FailConnection
check for failures (this may be the first packet, so look for connection failures)
SELECT LOOPHOLE[b.status, XmitStatus] FROM
pending, goodCompletion, aborted, invalidDestAddr => NULL;
ok or not expected
noRouteToNetwork, hardwareProblem => FailConnection[noRouteToDestination];
noTranslationForDestination => FailConnection[noTranslationForDestination];
noAnswerOrBusy => FailConnection[noAnswerOrBusy];
circuitInUse => FailConnection[circuitInUse];
circuitNotReady => FailConnection[circuitNotReady];
noDialingHardware => FailConnection[noDialingHardware];
dialerHardwareProblem => FailConnection[noRouteToDestination];
ENDCASE => ERROR;
enqueue onto sentQueue since this is not a system packet
EnqueueSpp[sentQueue, b];
IF b.tries = 0 AND b.ois.sendAck THEN b.time ← ProcessorFace.GetClockPulses[];
record time if ackReq packet and first time put there (used for delay calculation)
END; -- EnqueueTransmittedPacketAppropriately
This procedure changes the state of the packet stream to be suspended.
SuspendStream: INTERNAL PROCEDURE [why: SuspendReason] =
BEGIN
stateBeforeSuspension ← state;
state ← suspended;
whySuspended ← why;
END; -- SuspendStream
This procedure changes the state of the packet stream to be suspended.
SuspendStreamLocked: ENTRY PROCEDURE [why: SuspendReason] =
BEGIN SuspendStream[why]; END; -- SuspendStreamLocked
This process retransmits packets that have not been acknowledged in a reasonable
time, and in addition generates probes etc. to test for the connection's livenesss.
The Retransmitter, Receiver and the transmission process all three try to keep the
sentBuffer and sentQueue ordered by sequence number.
Retransmitter: PROCEDURE =
BEGIN
now: LONG CARDINAL;
retransBuffer: BufferDefs.OisBuffer;
some procedures whose scope is just the Retransmitter
TimeToSendAProbe: ENTRY PROCEDURE RETURNS [goAhead: BOOLEAN] = INLINE
BEGIN
goAhead ← FALSE;
IF state = established OR state = open THEN
BEGIN
IF
((LOOPHOLE[nextOutputSeq - maxOutputSeq, INTEGER] > 0 AND
(now - lastProbePulse) > probeRetransmitPulses)) OR
(((now - lastPacketReceivedPulse) >= inactiveConnectionPulses AND
(now - lastProbePulse) >= inactiveConnectionPulses)) THEN
BEGIN
probe for allocation, or just an ack from other end, or for activity.
lastProbePulse ← now;
IF (probeCounter ← probeCounter + 1) > probesBeforeGiveUp THEN
SuspendStream[transmissionTimeout]
ELSE BEGIN goAhead ← TRUE; IF CommFlags.doStats THEN StatIncr[statProbesSent]; END;
END;
END;
END; -- TimeToSendAProbe
RetransmissionCount: ENTRY PROCEDURE RETURNS [CARDINAL] = INLINE
BEGIN
IF state = established OR state = open THEN
BEGIN
IF (now - ackRequestPulse) >= dataPacketRetransmitPulses THEN
RETURN[sentQueue.length + (IF sentBuffer = NIL THEN 0 ELSE 1)]
we must retransmit all packets currently on the sentQueue and sentBuffer.
a count is kept because on retransmission we put packets back on
sentQueue and otherwise we will be in an infinite loop.
ELSE RETURN[0]; -- not yet time to retransmit
END
ELSE RETURN[0]; -- any other state requires no retransmission.
END; -- RetransmissionCount
GetFromRetransmissionQueue: ENTRY PROCEDURE RETURNS [b: BufferDefs.OisBuffer] = INLINE
BEGIN
IF sentBuffer = NIL THEN b ← DequeueSpp[sentQueue]
ELSE BEGIN b ← sentBuffer; sentBuffer ← NIL; END;
IF b = NIL THEN RETURN;
SELECT b.tries FROM
>= retransmissionsBeforeGiveUp =>
BEGIN -- give up trying to send anything on this packet stream.
SuspendStream[transmissionTimeout];
sentBuffer ← b; -- put back at head of sentQueue
b ← NIL;
END;
= retransmissionsBeforeAskForAck => b.ois.sendAck ← TRUE;
need to set ack request
ENDCASE;
END; -- GetFromRetransmissionQueue
WaitForAWhile: ENTRY PROCEDURE = INLINE
BEGIN WAIT retransmitterWakeupTimer; END; -- WaitForAWhile
UpdateRetransmitTime: PROCEDURE = INLINE
BEGIN
avgDelay: Pulses;
weightNewAbove: CARDINAL;
IF ((now - lastDelayCalculationPulse) >= delayCalculationPulses) THEN
BEGIN -- time to do retransmission timeout determination
IF delayCount > 0 THEN
BEGIN
we have had some packets with lossless delay (no retransmissions)
avgDelay ← [delaySum/delayCount];
normalRetransUpdates ← normalRetransUpdates + 1; -- **temp
new retransmit timeout is a function of the old retransmit timeout and the
newly measured delay. If the delay is less than about 500 msecs, the standard
deviation of the measured delay is large compared to transmission time;
otherwise transmission time dominates. So for fast mediums, we multiply the
delay by a large factor. For slow mediums, we go with the measured delay.
The following lines are in flux.
weightNewAbove ← IF avgDelay > fiveHundredMsecsOfPulses THEN 1 ELSE wNewAbove;
dataPacketRetransmitPulses ←
[(dataPacketRetransmitPulses / wOldBelow)* wOldAbove +
(avgDelay / wNewBelow)* weightNewAbove];
dataPacketRetransmitPulses ←
[(dataPacketRetransmitPulses + wNewAbove*avgDelay)/2];
delaySum ← [0];
delayCount ← 0;
END
ELSE
BEGIN
if data flowed, all had retranmissions and we should increase timeout
IF unackedOutputSeq > seqNumWhenDelayCalculated THEN
BEGIN
doubleRetransUpdates ← doubleRetransUpdates + 1; -- **temp
dataPacketRetransmitPulses ← [dataPacketRetransmitPulses*2];
END;
END;
seqNumWhenDelayCalculated ← unackedOutputSeq;
dataPacketRetransmitPulses ← [MIN[
dataPacketRetransmitPulses, maxDataPacketRetransmitPulses]];
probeRetransmitPulses ←
probeMultiplier*dataPacketRetransmitPulses + fortyMsecsOfPulses;
Process.SetTimeout[
@retransmitterWakeupTimer, PulsesToTicks[[dataPacketRetransmitPulses/4]]];
set new retransmitter wakeup
END;
END; -- UpdateRetransmitTime
main body of the procedure
UNTIL pleaseStop DO
checks the value of pleaseStop, race condition OK
now ← ProcessorFace.GetClockPulses[];
THROUGH (0..RetransmissionCount[]] DO
this is an upper bound
state may change for any reason, but we keep sending them out, unless we
ourselves changed the state to be suspended! The Receiver may take packets
off the sentQueue in parallel in which case we will get a NIL and therefore exit.
IF (retransBuffer ← GetFromRetransmissionQueue[]) = NIL THEN EXIT;
IF LOOPHOLE[retransBuffer.ois.sequenceNumber - unackedOutputSeq, INTEGER] < 0
THEN OISCP.ReturnFreeOisBuffer[retransBuffer]
packet has been ACKed
ELSE
BEGIN
its time to send this packet out again
retransBuffer.tries ← retransBuffer.tries + 1;
PutPacketOnSocketChannel[retransBuffer]; -- requeues buffer on sentQueue
retransBuffer ← NIL;
IF CommFlags.doStats THEN StatIncr[statDataPacketsRetransmitted];
END;
ENDLOOP;
IF TimeToSendAProbe[] THEN SendSystemPacket[TRUE];
WaitForAWhile[];
UpdateRetransmitTime[];
ENDLOOP;
END; -- Retransmitter
PulsesToTicks: PROCEDURE [pulses: Pulses] RETURNS [Process.Ticks] =
INLINE
BEGIN
lastCard: LONG CARDINAL = LAST[CARDINAL];
msecs: LONG CARDINAL
(((pulses+50)/100)*LONG[ProcessorFace.microsecondsPerHundredPulses]+500)/1000 + 1;
RETURN[Process.MsecToTicks[IF msecs < lastCard THEN msecs ELSE LAST[CARDINAL]]];
END;
This process waits at the socket channel for a packet to arrive.
This packet is in a buffer that belongs to this socket and should be returned to the
pool associated with the socket channel.
Receiver: PROCEDURE =
BEGIN
b: BufferDefs.OisBuffer;
UNTIL pleaseStop DO
race condition not harmful as we account for it
now wait for something to arrive.
b ← Socket.GetPacket[
socketCH ! Socket.TimeOut => RETRY; Socket.ChannelAborted => EXIT];
pleaseStop got set by someone else
SELECT LOOPHOLE[b.status, XmitStatus] FROM
goodCompletion =>
SELECT b.ois.transCntlAndPktTp.packetType FROM
sequencedPacket => GotSequencedPacket[LOOPHOLE[b, BufferDefs.OisBuffer]];
error => GotErrorPacket[b];
ENDCASE =>
BEGIN
discard, don't want any other protocol type
OISCP.ReturnFreeOisBuffer[b];
IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadType];
END;
ENDCASE => OISCP.ReturnFreeOisBuffer[b];
ENDLOOP;
END; -- Receiver
This procedure examines the sequenced packet that has just arrived.
GotSequencedPacket: PROCEDURE [b: BufferDefs.OisBuffer] =
BEGIN
returnAnAck: BOOLEANFALSE;
we do not return an ack unless we are required to
giveBackToSocket: BOOLEANTRUE; -- unless we put on some other queue
nowPulse: LONG CARDINAL; -- set if we get a packet for our connection
GotSequencedPacketLocked: ENTRY PROCEDURE = INLINE
BEGIN
SELECT state FROM
unestablished, suspended, terminating => NULL; -- not interested.
activeEstablish, waitEstablish =>
[returnAnAck, giveBackToSocket] ← EstablishThisConnection[b];
established, open =>
BEGIN
check that packet is from right remote address and connection ID
IF b.ois.source.host = remoteAddr.host
AND b.ois.source.socket = remoteAddr.socket AND b.ois.sourceConnectionID =
remoteConnectionID AND b.ois.destinationConnectionID = localConnectionID
THEN
BEGIN -- the packet is for this connection
probeCounter ← 0; -- other end is alive
lastPacketReceivedPulse ← nowPulse ← ProcessorFace.GetClockPulses[];
for inactivity et al
SELECT LOOPHOLE[b.ois.sequenceNumber - nextInputSeq, INTEGER] FROM
just the packet we wanted, or packet is early
IN [0..duplicateWindow] => RightOrEarlyPacket[];
old duplicate packet
IN [-duplicateWindow..0) => DuplicatePacket[];
very old duplicate packet
ENDCASE => IF CommFlags.doStats THEN StatIncr[statDataPacketsReceivedVeryLate]
END
ELSE
packet is really not for this connection (or other end not established)
BEGIN
IF CommFlags.doStats AND
(b.ois.source.host # remoteAddr.host OR b.ois.source.socket #
remoteAddr.socket) THEN StatIncr[statPacketsRejectedBadSource];
IF b.ois.sourceConnectionID # remoteConnectionID OR
b.ois.destinationConnectionID # localConnectionID THEN
BEGIN
IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
returnAnAck ← TRUE; -- probably our connection response got lost
END;
END;
END;
ENDCASE;
delayed acking has been abandoned; only allocate is dependent on client
this is delicate and is done to avoid getting stuck in a small window
IF returnAnAck AND BufferDefs.QueueLength[inOrderQueue] # emptyInOrderQueue
THEN
BEGIN
returnAnAck ← FALSE; we don't have to send back an ack
sendAnAck ← TRUE; make another process send back an ack
END;
This wait lets the user process run and suck up all the packets on the
inOrderInputQueue so that we send back a full allocate.
IF LOOPHOLE[maxInputSeq - nextInputSeq, INTEGER] < minRcvAllocation THEN
WAIT letClientRun;
END; -- GotSequencedPacketLocked
This procedure processes both the next expected packet as well as early packets.
Both kinds of packets are processed similarly as far as acks and allocation,
and attentions are concerned.
RightOrEarlyPacket: INTERNAL PROCEDURE = INLINE
BEGIN
IF b.ois.sendAck THEN
BEGIN
returnAnAck ← TRUE;
IF CommFlags.doStats THEN StatIncr[statAckRequestsReceived];
END;
update allocation and ack fields of packetstream.
IF LOOPHOLE[b.ois.allocationNumber - maxOutputSeq, INTEGER] > 0 THEN
BEGIN maxOutputSeq ← b.ois.allocationNumber; NOTIFY newAllocation; END;
IF LOOPHOLE[b.ois.acknowledgeNumber - unackedOutputSeq, INTEGER] > 0 THEN
BEGIN
unackedOutputSeq ← b.ois.acknowledgeNumber;
ackRequestPulse ← nowPulse; -- if we get an ack we move the ack req. time
END;
now remove acked packets, if any, from sentQueue. We assume that
the sentQueue is kept ordered by increasing sequence number.
IF sentBuffer = NIL THEN sentBuffer ← DequeueSpp[sentQueue];
UNTIL sentBuffer = NIL OR
LOOPHOLE[sentBuffer.ois.sequenceNumber - unackedOutputSeq, INTEGER] >= 0 DO
IF sentBuffer.ois.sendAck AND sentBuffer.tries = 0 THEN
BEGIN -- constitutes a measure of lossless delay (no retransmissions)
delaySum ← [delaySum + (nowPulse - sentBuffer.time)];
add time spent on queue to delay stats
delayCount ← delayCount + 1;
END;
OISCP.ReturnFreeOisBuffer[sentBuffer];
sentBuffer ← DequeueSpp[sentQueue];
ENDLOOP;
so far we have only updated state information, now to dispense with the packet
IF b.ois.systemPacket THEN
BEGIN
IF CommFlags.doStats THEN
BEGIN
StatIncr[statSystemPacketsReceived];
StatIncr[statAcksReceived];
END;
END
ELSE
this is not a system packet process it intelligently
BEGIN
if the attention bit is set then we try to process the attention, and only
if we were able to do so do we put the packet on the inOrderQueue.
IF ~b.ois.attention OR (b.ois.attention AND AttentionPacketProcessed[b]) THEN
BEGIN
only now must we decide if it is in order or out of order
IF LOOPHOLE[b.ois.sequenceNumber - nextInputSeq, INTEGER] = 0 THEN
just the packet we wanted
BEGIN
state ← open;
DO
nextInputSeq ← nextInputSeq + 1;
IF attnCount # 0 THEN UpdateAttnSeqTable[];
EnqueueSpp[inOrderQueue, b];
giveBackToSocket ← FALSE;
NOTIFY inOrderQueueNotEmpty;
examine the OutOfOrderQueue to see if anything can be taken off
currently there is never anything on the OutOfOrderQueue.
DO
IF BufferDefs.QueueLength[outOfOrderQueue] = 0 THEN
BEGIN b ← NIL; EXIT; END;
b ← DequeueSpp[outOfOrderQueue];
SELECT LOOPHOLE[b.ois.sequenceNumber - nextInputSeq, INTEGER] FROM
0 => EXIT;
this is the next in order packet, pick it up in outer loop
IN (0..duplicateWindow] => EnqueueSpp[tempQueue, b];
still out of order
ENDCASE => OISCP.ReturnFreeOisBuffer[b];
old duplicate, throw away
ENDLOOP;
put buffers that are on tempQueue back onto outOfOrderQueue
UNTIL (tempQueue.first) = NIL DO
EnqueueSpp[outOfOrderQueue, DequeueSpp[tempQueue]]; ENDLOOP;
IF b = NIL THEN EXIT;
else, loop to process packet from outOfOrderQueue that is now next
ENDLOOP;
END
ELSE IF CommFlags.doStats THEN StatIncr[statDataPacketsReceivedEarly];
early packet
END;
END;
END; -- RightOrEarlyPacket
DuplicatePacket: INTERNAL PROCEDURE = INLINE
BEGIN
IF CommFlags.doStats THEN StatIncr[statDataPacketsReceivedAgain];
send acks only if the other end asked
IF b.ois.sendAck THEN returnAnAck ← TRUE;
END; -- DuplicatePacket
main body of this sprocedure
IF b.ois.pktLength >= bytesPerSequencedPktHeader THEN
BEGIN
GotSequencedPacketLocked[];
IF returnAnAck THEN SendSystemPacket[FALSE]; -- we may have to send the ack
END
ELSE IF CommFlags.doStats THEN StatIncr[statEmptyFunnys];
IF giveBackToSocket THEN OISCP.ReturnFreeOisBuffer[b];
END; -- GotSequencedPacket
This procedure attempts to processes the attention packet, and returns whether it
was successful or not. Success implies that an entry for the attn packet already
exists in the attnSeqTable, or has just been made. This packet is now a candidate
for the inOrderQueue or the outOfOrderQueue. If an entry can not be made because
of space restrictions we pretend we never ever saw this packet and let it be
retransmitted from the source. The attnSeqTable is reasonablly big, and so this
decision should not cause adverse effects. This procedure is only called if the packet
was within the accept window.
AttentionPacketProcessed: INTERNAL PROCEDURE [b: BufferDefs.OisBuffer] RETURNS [BOOLEAN] =
BEGIN
i: CARDINAL ← 1;
alreadyThere: BOOLEANFALSE;
advanceB: BufferDefs.OisBuffer;
nBytes: CARDINAL;
IF attnCount = maxPendingAttns THEN RETURN[FALSE]; -- no space
insert the b.ois.sequenceNumber in the attnSeqTable if not already there
IF attnCount # 0 -- see if an entry is already there
THEN
FOR i IN (0..attnCount] DO
IF attnSeqTable[i] = b.ois.sequenceNumber THEN
BEGIN alreadyThere ← TRUE; EXIT; END;
ENDLOOP;
IF NOT alreadyThere THEN -- because attnCount=0 or because really not there!
BEGIN
IF (advanceB ← MaybeGetFreeReceiveOisBufferFromPool[pool]) = NIL THEN
RETURN[FALSE]; -- cop out
attnSeqTable[i] ← b.ois.sequenceNumber;
attnCount ← attnCount + 1;
make a copy and then enqueue it
nBytes ← PrincOpsUtils.ByteBlt[to: [@advanceB.ois, 0, b.ois.pktLength],
from: [@b.ois, 0, b.ois.pktLength]];
EnqueueSpp[inAttnQueue, advanceB];
NOTIFY newInputAttn;
END;
IF CommFlags.doStats THEN StatIncr[statAttentionsReceived];
RETURN[TRUE];
END; -- AttentionPacketProcessed
This procedure updates the attnSeqTable when the nextInputSeq is updated.
That is to say, we are removing any entries in the attnSeqTable that we plan
to ack, the next time we send out a packet.
UpdateAttnSeqTable: INTERNAL PROCEDURE =
BEGIN
i: CARDINAL ← 1;
DO
IF i > attnCount THEN EXIT;
IF LOOPHOLE[(attnSeqTable[i] - nextInputSeq), INTEGER] < 0 THEN
BEGIN
attnSeqTable[i] ← attnSeqTable[attnCount];
attnSeqTable[attnCount] ← 0;
attnCount ← attnCount - 1;
END
ELSE i ← i + 1;
ENDLOOP;
END; -- UpdateAttnSeqTable
This procedure examines an error protocol packet.
GotErrorPacket: ENTRY PROCEDURE [b: BufferDefs.OisBuffer] =
BEGIN
ENABLE UNWIND => NULL;
IF CommFlags.doStats THEN StatIncr[statErrorPacketsReceived];
SELECT b.ois.errorType FROM
noSocketOisErrorCode =>
SELECT state FROM
unestablished, activeEstablish, waitEstablish =>
{whyFailed ← noServiceAtDestination;
SuspendStream[noRouteToDestination]};
established, open => SuspendStream[remoteServiceDisappeared];
ENDCASE => NULL;
ENDCASE => NULL;
OISCP.ReturnFreeOisBuffer[b];
END; -- GotErrorPacket
Cool Procedures
This procedure sets the wait time.
SetWaitTime: ENTRY PROCEDURE [time: OISCPTypes.WaitTime] =
BEGIN waitPulses ← MilliSecondsToPulses[time]; END; -- SetWaitTime
This procedure returns the local and remote address of this packet stream.
FindAddresses: ENTRY PROCEDURE RETURNS [local, remote: NetworkAddress] =
BEGIN local ← localAddr; remote ← remoteAddr; END; -- FindAddresses
This procedure returns the number of data bytes that can fit into an sppBuffer.
GetSenderSizeLimit: ENTRY PROCEDURE RETURNS [CARDINAL] =
BEGIN RETURN[maxOisPktLength - bytesPerSequencedPktHeader]; END;
GetSenderSizeLimit
This procedure atempts to establish the local end of the connection
to the incoming packet.
EstablishThisConnection: INTERNAL PROCEDURE [b: BufferDefs.OisBuffer]
RETURNS [returnAnAck, giveBackToSocket: BOOLEAN] =
BEGIN
returnAnAck ← FALSE;
giveBackToSocket ← TRUE;
If state=activeEstablish then master/slave response when sockets are completely
specified. If state=waitEstablish then other end initiates and knows all about us.
IF b.ois.source.host = remoteAddr.host AND
b.ois.source.socket = remoteAddr.socket AND b.ois.destinationConnectionID =
localConnectionID AND b.ois.sourceConnectionID # unknownConnID AND
b.ois.sequenceNumber = 0 THEN
BEGIN
remoteConnectionID ← b.ois.sourceConnectionID;
maxOutputSeq ← b.ois.allocationNumber;
state ← established;
IF b.ois.sendAck THEN returnAnAck ← TRUE;
IF NOT b.ois.systemPacket THEN
BEGIN
if the attention bit is set then we try to process the attention, and only
if we were able to do so do we put the packet on the inOrderQueue.
IF ~b.ois.attention OR (b.ois.attention AND AttentionPacketProcessed[b]) THEN
BEGIN
nextInputSeq ← nextInputSeq + 1;
EnqueueSpp[inOrderQueue, b];
giveBackToSocket ← FALSE;
IF attnCount # 0 THEN UpdateAttnSeqTable[];
remove knowledge of old attns
state ← open;
NOTIFY inOrderQueueNotEmpty;
END;
END
ELSE IF CommFlags.doStats THEN StatIncr[statSystemPacketsReceived];
NOTIFY connectionIsEstablished;
END
If state=activeEstablish then simultaneous establishment when sockets are
completely specified. If state=waitEstablish then other end initiates and knows
all about us except our connection ID.
ELSE
IF b.ois.source.host = remoteAddr.host AND
b.ois.source.socket = remoteAddr.socket AND b.ois.destinationConnectionID =
unknownConnID AND b.ois.sourceConnectionID # unknownConnID AND
b.ois.sequenceNumber = 0 THEN
BEGIN
remoteConnectionID ← b.ois.sourceConnectionID;
maxOutputSeq ← b.ois.allocationNumber;
IF state = waitEstablish THEN returnAnAck ← TRUE;
state ← established;
IF b.ois.sendAck THEN returnAnAck ← TRUE;
IF CommFlags.doStats AND b.ois.systemPacket THEN StatIncr[statSystemPacketsReceived];
NOTIFY connectionIsEstablished;
END
If state=activeEstablish then response from well known socket at which
there is a server process
ELSE
IF state = activeEstablish AND b.ois.source.host = remoteAddr.host
AND b.ois.source.socket # remoteAddr.socket
AND b.ois.destinationConnectionID = localConnectionID AND
b.ois.sourceConnectionID # unknownConnID AND b.ois.sequenceNumber = 0 THEN
BEGIN
remoteAddr.socket ← b.ois.source.socket;
remoteConnectionID ← b.ois.sourceConnectionID;
maxOutputSeq ← b.ois.allocationNumber;
state ← established;
IF b.ois.sendAck THEN returnAnAck ← TRUE;
IF NOT b.ois.systemPacket THEN
BEGIN
if the attention bit is set then we try to process the attention, and only
if we were able to do so do we put the packet on the inOrderQueue.
IF ~b.ois.attention OR (b.ois.attention AND AttentionPacketProcessed[b]) THEN
BEGIN
nextInputSeq ← nextInputSeq + 1;
EnqueueSpp[inOrderQueue, b];
giveBackToSocket ← FALSE;
IF attnCount # 0 THEN UpdateAttnSeqTable[];
remove knowledge of old attns
state ← open;
NOTIFY inOrderQueueNotEmpty;
END;
END
ELSE IF CommFlags.doStats THEN StatIncr[statSystemPacketsReceived];
NOTIFY connectionIsEstablished;
END
mismatched IDs
ELSE IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
IF returnAnAck AND CommFlags.doStats THEN StatIncr[statAckRequestsReceived];
END; -- EstablishThisConnection
This procedure causes the active establishement of the connection. A system packet
is transmitted to the remote end, and then this procedure casues the caller to hang
for a certain maximum time in which it is hoped that the connection has become
established owing to the receipt of an appropriate packet from the remote end.
ActivelyEstablish: PROCEDURE =
BEGIN
currentPulse, startPulse: LONG CARDINAL;
WaitUntilActivelyEstablished: ENTRY PROCEDURE RETURNS [BOOLEAN] = INLINE
BEGIN
ENABLE UNWIND => NULL;
WAIT connectionIsEstablished;
IF state = established OR state = open THEN RETURN[TRUE]
ELSE
BEGIN
IF state = suspended THEN RETURN WITH ERROR ConnectionFailed[whyFailed];
currentPulse ← ProcessorFace.GetClockPulses[];
IF (currentPulse - startPulse) >= waitPulses THEN
BEGIN SIGNAL ConnectionFailed[timeout]; startPulse ← ProcessorFace.GetClockPulses[]; END;
RETURN[FALSE];
END;
END; -- WaitUntilActivelyEstablished
startPulse ← ProcessorFace.GetClockPulses[];
DO
SendSystemPacket[TRUE];
waits for 2 seconds to see if the connection is established
IF WaitUntilActivelyEstablished[] -- this returns or raises a SIGNAL -- THEN
RETURN;
ENDLOOP;
END; -- ActivelyEstablish
This procedure waits for the remote end to initiate connection establishement.
WaitUntilEstablished: ENTRY PROCEDURE =
BEGIN
ENABLE UNWIND => NULL;
currentPulse, startPulse: LONG CARDINAL;
startPulse ← ProcessorFace.GetClockPulses[];
DO
waits for 2 seconds each time around
WAIT connectionIsEstablished;
IF state = established OR state = open THEN RETURN
ELSE
BEGIN
currentPulse ← ProcessorFace.GetClockPulses[];
IF (currentPulse - startPulse) >= waitPulses THEN
BEGIN SIGNAL ConnectionFailed[timeout]; startPulse ← ProcessorFace.GetClockPulses[]; END;
END;
ENDLOOP;
END; -- WaitUntilEstablished
This procedure destroys the packet stream. The procedure changes the state of
the packet stream, NOTIFYs the Retransmitter and Receiver to self destruct,
and then waits until this happens. When this condition is satisfied, the socket
channel is deleted and, the packet stream data structures cleaned up.
Delete: PUBLIC PROCEDURE =
BEGIN
DeleteActivate[];
Socket.Abort[socketCH];
JOIN retransmitterFork;
JOIN receiverFork;
DeleteCleanup[];
Socket.Delete[socketCH];
LoadState.SelfDestruct[];
END; -- Delete
DeleteActivate: ENTRY PROCEDURE = INLINE
BEGIN
state ← terminating;
pleaseStop ← TRUE;
NOTIFY retransmitterWakeupTimer;
END; -- DeleteActivate
DeleteCleanup: ENTRY PROCEDURE = INLINE
BEGIN OPEN SocketInternal;
ENABLE UNWIND => NULL;
IF sentBuffer # NIL THEN OISCP.ReturnFreeOisBuffer[sentBuffer];
BufferDefs.QueueCleanup[inOrderQueue];
BufferDefs.QueueCleanup[inAttnQueue];
BufferDefs.QueueCleanup[outOfOrderQueue];
BufferDefs.QueueCleanup[tempQueue];
WHILE sentQueue.length<ChannelHandleToSocketHandle[socketCH].pool.sendInUse DO
waiting for asynchronous puts to complete and be requeue appropriately
WAIT letClientRun; -- wait on any convenient variable - ABORTs must be DISABLED!
ENDLOOP;
BufferDefs.QueueCleanup[sentQueue];
END; -- DeleteCleanup
MilliSecondsToPulses: PROCEDURE [ms: LONG CARDINAL]
RETURNS [pulses: Pulses] =
BEGIN
we must be carefull about multiplication overflow since milliSeconds must be
converted to microSeconds
micro: LONG CARDINAL = MIN[LAST[LONG CARDINAL]/1000,ms] * 1000;
hundreds: LONG CARDINAL = micro / ProcessorFace.microsecondsPerHundredPulses;
pulses ← [MIN[LAST[LONG CARDINAL]/100,hundreds]*100];
END; -- end MilliSecondsToPulses
initialization (Cool)
initialize connection control parameters
This initializes the necessary data structures for a packet stream instance.
It fills in all the necessary parameters and creates two processes that look
after the well being of this packet stream. One is a Retransmitter, that periodically
checks the sentQueue to see if there is anything unacknowledged, and the other
is the Receiver, which sees if there are any incoming packets on the
corresponding socket.
state ← unestablished;
whySuspended ← notSuspended;
stateBeforeSuspension ← unestablished;
Process.SetTimeout[@connectionIsEstablished, Process.MsecToTicks[2000]];
nextInputSeq ← unackedOutputSeq ← nextOutputSeq ← 0;
maxOutputSeq ← nextOutputSeq + defaultSendAllocation - 1;
SELECT classOfService FROM
bulk =>
{socketCH ← Socket.Create[localAddr, bulkSendBufs, bulkReceiveBufs, 0, TRUE];
maxInputSeq ← bulkInitialAllocation};
transactional =>
{socketCH ← Socket.Create[localAddr, transactSendBufs, transactReceiveBufs, 0,
TRUE];
maxInputSeq ← transactInitialAllocation};
ENDCASE => ERROR;
pool ← ps.pool ← SocketInternal.GetBufferPool[
SocketInternal.ChannelHandleToSocketHandle[socketCH]];
localAddr ← Socket.GetStatus[socketCH].localAddr;
IF remoteConnectionID # unknownConnID THEN state ← established;
Process.SetTimeout[@newAllocation, Process.MsecToTicks[1000]];
CommUtilDefs.EnableAborts[@newAllocation];
sendAnAck ← FALSE;
maxOisPktLength ← OISCPTypes.maxBytesPerPkt;
Process.SetTimeout[@newInputAttn, Process.MsecToTicks[1000]];
CommUtilDefs.EnableAborts[@newInputAttn];
FOR attnCount IN (0..maxPendingAttns] DO attnSeqTable[attnCount] ← 0; ENDLOOP;
attnCount ← 0;
sentBuffer ← NIL;
BufferDefs.QueueInitialize[inOrderQueue];
BufferDefs.QueueInitialize[inAttnQueue];
BufferDefs.QueueInitialize[outOfOrderQueue];
BufferDefs.QueueInitialize[tempQueue];
BufferDefs.QueueInitialize[sentQueue];
Process.SetTimeout[@inOrderQueueNotEmpty, Process.MsecToTicks[1000]];
CommUtilDefs.EnableAborts[@inOrderQueueNotEmpty];
ackRequestPulse ← lastProbePulse ← lastDelayCalculationPulse ←
lastPacketReceivedPulse ← ProcessorFace.GetClockPulses[];
probeCounter ← 0;
dataPacketRetransmitPulses ← MilliSecondsToPulses[
initialDataPacketRetransmitTime]; -- msecs converted to Pulses
probeRetransmitPulses ← MilliSecondsToPulses[
probeMultiplier*initialDataPacketRetransmitTime + 40];
msecs converted to Pulses
delaySum ← [0];
delayCount ← 0;
normalRetransUpdates ← doubleRetransUpdates ← 0; -- **temp
waitPulses ← MilliSecondsToPulses[timeout]; -- msecs converted to Pulses
pleaseStop ← FALSE;
Process.SetTimeout[@letClientRun, Process.MsecToTicks[250]]; -- this is also used in DeleteCleanup - do not ENABLE ABORTs
Process.SetTimeout[
@retransmitterWakeupTimer, Process.MsecToTicks[initialRetransmitterTime]];
This baroque code is to ensure that the state is not looked at by any of the other
processes until this initialization code has finished examining this variable without
entering the monitor.
IF state = unestablished THEN
BEGIN
IF establish THEN
BEGIN
state ← activeEstablish;
retransmitterFork ← FORK Retransmitter[];
receiverFork ← FORK Receiver[];
Process.Yield[];
ActivelyEstablish[];
END
ELSE
BEGIN
state ← waitEstablish;
retransmitterFork ← FORK Retransmitter[];
receiverFork ← FORK Receiver[];
Process.Yield[];
WaitUntilEstablished[];
END;
END
ELSE
BEGIN
SendSystemPacket[FALSE]; -- gratuitous ack in case we are a server agent now.
Retransmissions of this gratuitous packet will ocurr in the nature of probes.
retransmitterFork ← FORK Retransmitter[];
receiverFork ← FORK Receiver[];
Process.Yield[];
END;
waitPulses ← MilliSecondsToPulses[PacketStream.infiniteWaitTime]; -- Gets, WaitAttentions default to very long
RETURN[@ps, remoteAddr, remoteConnectionID];
END. -- of PacketStreamInstance module
LOG
Time: January 26, 1981 10:03 AM By: Garlick Action: 1.) added classOfService parameter to reduce buffers in interactive case 2.) added interpretation of Error Protocol packets 3.) increased inactivity timeouts for internet cases 4.) initial data operation timeout set to infinite 5.) acks no longer stimulated by Gets, rather in receiver 6.) send an alocation packet if just opening the window 7.) reinstated slow media adaptive algorithm 8.) enabled Process.Aborting for Gets, Puts, WaitAttn.(635)\23386v14V