// PupBSPProt.bcpl -- Byte Stream Protocol // companion files are PupBSPStreams.bcpl and PupBSPa.asm // Copyright Xerox Corporation 1979, 1981 // Last modified November 21, 1981 10:49 AM by Taft get "Pup.decl" get "PupRTPInternal.decl" external [ //outgoing procedures BSPPupProc; BSPTimerProc; SendAck; TransmitStrategy; SetTimeout; SearchTQ //incoming procedures BSPHandleUncommonPup; RTPFilter; RTPFSM ReleasePBI; GetPBI; CompletePup; SetPupID Zero; MultEq; DoubleDifference; DoubleIncrement SetTimer; TimerHasExpired Enqueue; Dequeue; Unqueue; InsertAfter; FlushQueue EnableInterrupts; DisableInterrupts; Min; Max //statics maxPupDataBytes; overlapDataWithAck ] // If overlapDataWithAck is true, TransmitStrategy will generate // an AData before allocation is completely exhausted, thereby permitting // continued sending during the AData/Ack round-trip time. // This improves performance when the round-trip time is large. // Unfortunately, it has disasterous effects on performance in certain // situations because of lost packets caused by Ethernet interface // turnaround latency in Altos and Novas. We therefore do not enable // this feature at the present time. static [ overlapDataWithAck = false ] //---------------------------------------------------------------------------- let BSPPupProc(pbi) be //---------------------------------------------------------------------------- // Incoming Pups that are not RTP come here. pbi is always disposed of, // either here or by higher-level handler that we call. [ let soc = pbi>>PBI.socket // ignore pbi if socket's state is not Open, EndIn, or EndOut if soc>>BSPSoc.state ls stateOpen % soc>>BSPSoc.state gr stateEndOut then [ ReleasePBI(pbi); return ] switchon pbi>>PBI.pup.type into [ case typeAMark: case typeAData: soc>>BSPSoc.ackPending = true case typeData: case typeMark: [ // discard pbi unless all the following conditions are met: // from correct source port? unless RTPFilter(pbi, true, false) & // socket allocation sufficient? soc>>BSPSoc.numTPBI ne 0 & soc>>BSPSoc.numIPBI ne 0 & // pbi non-empty? pbi>>PBI.pup.length gr pupOvBytes & // all of pbi's bytes in allocation window? DoubleDifference(lv pbi>>PBI.pup.id, lv soc>>BSPSoc.rcvByteID) + pbi>>PBI.pup.length - (pupOvBytes+1) uls (soc>>BSPSoc.maxIPBI-1)*maxPupDataBytes & // non-duplicate? InsertInSequence(lv soc>>BSPSoc.bspIQ, pbi) do [ ReleasePBI(pbi); endcase ] // discard while pbi ne 0 & MultEq(lv soc>>BSPSoc.rcvByteID, lv pbi>>PBI.pup.id) do [ // Filled in first hole, advance rcvByteID DoubleIncrement(lv soc>>BSPSoc.rcvByteID, pbi>>PBI.pup.length-pupOvBytes) soc>>BSPSoc.unReadPups = soc>>BSPSoc.unReadPups+1 pbi = pbi!0 ] SetTimer(lv soc>>BSPSoc.inactivityTimer, inactivityTimeout) endcase ] case typeAck: [ unless RTPFilter(pbi, true, false) & DoubleDifference(lv pbi>>PBI.pup.id, lv soc>>BSPSoc.lastAckID) ge 0 do [ ReleasePBI(pbi); endcase ] // wrong source or out of order if soc>>BSPSoc.ackRequested then [ // Update the adaptive timeout = 2 times the average round-trip // delay, exponentially aged over the last 8 samples. let t = nil; SetTimer(lv t, 0) compiletest alto ifso [ t = (t-soc>>BSPSoc.lastADataTime) lshift 3 +4 ] ifnot [ t = (t-soc>>BSPSoc.lastADataTime) lshift 1 +1 ] soc>>BSPSoc.aDataTimeout = (7*soc>>BSPSoc.aDataTimeout+Max(5, Min(1000, t))) rshift 3 ] soc>>BSPSoc.lastAckID^1 = pbi>>PBI.pup.id^1 soc>>BSPSoc.lastAckID^2 = pbi>>PBI.pup.id^2 // BSPPupProc (cont'd) // move all PBIs from bspTQ to a private queue let tQ = vec 1 DisableInterrupts() tQ!0 = soc>>BSPSoc.bspTQ.head; tQ!1 = soc>>BSPSoc.bspTQ.tail soc>>BSPSoc.bspTQ.head = 0 EnableInterrupts() // decide whether to release, keep, or retransmit each pbi let rTQ = vec 1; rTQ!0 = 0 // q of pbis to retransmit [ // repeat let npbi = Dequeue(tQ); if npbi eq 0 break if npbi>>PBI.pup.type eq typeInterrupt then [ CompletePup(npbi); loop ] // Retransmit interrupts test DoubleDifference(lv soc>>BSPSoc.lastAckID, lv npbi>>PBI.pup.id) ge npbi>>PBI.pup.length-pupOvBytes ifso [ // This pbi has been acked, release it soc>>BSPSoc.unAckedPups = soc>>BSPSoc.unAckedPups-1 soc>>BSPSoc.unAckedBytes = soc>>BSPSoc.unAckedBytes- (npbi>>PBI.pup.length-pupOvBytes) ReleasePBI(npbi) soc>>BSPSoc.pupAllocCount = soc>>BSPSoc.pupAllocCount+1 ] ifnot [ // pbi not yet acked, decide whether to retransmit test soc>>BSPSoc.aDataCount-npbi>>PBI.timer gr 0 & soc>>BSPSoc.ackRequested ifso // Yes, retain for retransmission [ TransmitStrategy(npbi, false) InsertInSequence(rTQ, npbi) // can't be duplicate ] ifnot // No, just put back on bspTQ Enqueue(lv soc>>BSPSoc.bspTQ, npbi) ] ] repeat // Update allocations soc>>BSPSoc.ackRequested = false if soc>>BSPSoc.pupAllocCount ge pupAllocHysteresis then [ // Time to attempt to increase maxPupAlloc soc>>BSPSoc.pupAllocCount = 0 soc>>BSPSoc.maxPupAlloc = Min(soc>>BSPSoc.maxPupAlloc+1, soc>>BSPSoc.maxOPBI) ] soc>>BSPSoc.bytesPerPup = Min(pbi>>PBI.pup.words^1, maxPupDataBytes) soc>>BSPSoc.pupAlloc = Min(soc>>BSPSoc.maxPupAlloc, pbi>>PBI.pup.words^2 - soc>>BSPSoc.unAckedPups) soc>>BSPSoc.byteAlloc = pbi>>PBI.pup.words^3 - soc>>BSPSoc.unAckedBytes ReleasePBI(pbi) // if we have any PBIs to retransmit, do so now if rTQ!0 ne 0 then [ TransmitStrategy(rTQ!1) // Maybe turn tail into AData while rTQ!0 ne 0 do CompletePup(Dequeue(rTQ)) ] SetTimeout(soc) SetTimer(lv soc>>BSPSoc.inactivityTimer, inactivityTimeout) endcase ] // BSPPupProc (cont'd) case typeInterrupt: case typeInterruptReply: case typeEnd: BSPHandleUncommonPup(soc, pbi) // it always disposes of pbi endcase case typeError: if pbi>>PBI.pup.words^11 eq 3 % // port IQ overflow pbi>>PBI.pup.words^11 eq #1007 then // gateway OQ overflow [ // Got a congestion error. Attempt to throttle back output // by decreasing the maximum outgoing Pup allocation. soc>>BSPSoc.maxPupAlloc = Max(soc>>BSPSoc.maxPupAlloc-1, 1) soc>>BSPSoc.pupAllocCount = 0 ] // let higher-level handler look at it also default: // non-BSP Pups get passed to higher-level handler, which always // disposes of pbi (soc>>BSPSoc.bspOtherPupProc)(pbi) ] if soc>>BSPSoc.ackPending then SendAck(soc) ] //---------------------------------------------------------------------------- and InsertInSequence(queue, pbi) = valof //---------------------------------------------------------------------------- // Inserts pbi into the proper place in queue to maintain ascending order of // Pup IDs. Queue must already be in order. // Returns true normally; false if the pbi's ID duplicates one already in // the queue. [ // Test first for the most common case of the new pbi going on the end of // the queue. If it doesn't belong there, then initiate a search from the // front of the queue for the right place to insert it. test queue!0 eq 0 % DoubleDifference(lv pbi>>PBI.pup.id, lv (queue!1)>>PBI.pup.id) gr 0 ifso Enqueue(queue, pbi) ifnot [ // Search queue for right place to insert new pbi let pred = queue let d = nil // assert: the queue is non-empty and pbi's ID <= the tail item's ID. // Therefore this loop is guaranteed to terminate. [ // repeat d = DoubleDifference(lv pbi>>PBI.pup.id, lv (pred!0)>>PBI.pup.id) if d le 0 then break pred = pred!0 ] repeat if d eq 0 then resultis false // duplicate InsertAfter(queue, pred, pbi) ] resultis true ] //---------------------------------------------------------------------------- and BSPTimerProc(soc) be //---------------------------------------------------------------------------- // Called whenever the socket bspTimer expires [ // do nothing if socket's state is not Open, EndIn, or EndOut unless soc>>BSPSoc.state ls stateOpen % soc>>BSPSoc.state gr stateEndOut do [ unless soc>>BSPSoc.noInactivityTimeout do if TimerHasExpired(lv soc>>BSPSoc.inactivityTimer) then [ RTPFSM(soc, CLST, 0); return ] // Connection has died, abort it if soc>>BSPSoc.ackPending then SendAck(soc) if soc>>BSPSoc.interruptOut then [ let pbi = SearchTQ(soc, true) // Search for outstanding interrupt if pbi ne 0 then CompletePup(pbi) // Retransmit if found one ] // Generate AData probe unconditionally. If we have no data to move, // probe once every 15 seconds to ensure that the connection is alive. let pbi = GetPBI(soc, true) test pbi ne 0 ifso [ // Probe with zero-length AData if can get pbi unless soc>>BSPSoc.ackRequested do SetTimer(lv soc>>BSPSoc.lastADataTime, 0) soc>>BSPSoc.ackRequested = true soc>>BSPSoc.aDataCount = soc>>BSPSoc.aDataCount+1 SetPupID(pbi, lv soc>>BSPSoc.xmitByteID) CompletePup(pbi, typeAData, pupOvBytes) ] ifnot [ // Probe with a pbi on the TQ otherwise pbi = SearchTQ(soc, false) // Look for a non-interrupt if pbi ne 0 then [ TransmitStrategy(pbi, true); CompletePup(pbi) ] ] ] SetTimeout(soc) ] //---------------------------------------------------------------------------- and SendAck(soc) be //---------------------------------------------------------------------------- [ let pbi = GetPBI(soc, true); if pbi ne 0 then [ soc>>BSPSoc.ackPending = false SetPupID(pbi, lv soc>>BSPSoc.rcvByteID) pbi>>PBI.pup.words^1 = maxPupDataBytes // Bytes per pup pbi>>PBI.pup.words^2 = // Pup allocation soc>>BSPSoc.maxIPBI-1-soc>>BSPSoc.unReadPups pbi>>PBI.pup.words^3 = // Byte allocation maxPupDataBytes*pbi>>PBI.pup.words^2 soc>>BSPSoc.sentZeroAlloc = pbi>>PBI.pup.words^2 eq 0 CompletePup(pbi, typeAck, pupOvBytes+6) // Flush received data packets we aren't acknowledging, since // we know they will be retransmitted and we don't want the // retransmissions to overflow our socket IQ if we are slow // in processing them. while soc>>BSPSoc.bspIQ.head ne 0 do [ pbi = soc>>BSPSoc.bspIQ.tail if DoubleDifference(lv pbi>>PBI.pup.id, lv soc>>BSPSoc.rcvByteID) ls 0 then break Unqueue(lv soc>>BSPSoc.bspIQ, pbi) ReleasePBI(pbi) ] ] ] //---------------------------------------------------------------------------- and SearchTQ(soc, lookForInterrupt) = valof //---------------------------------------------------------------------------- // Unqueues and returns the last pbi on the BSP TQ, looking for // an Interrupt if lookForInterrupt is true, anything else if false. // We return the last pbi (rather than the first) so that retransmitting it // leaves the order of pbis on the TQ unchanged. Returns zero if none found. [ let rpbi = 0 let pbi = soc>>BSPSoc.bspTQ.head while pbi ne 0 do [ if (pbi>>PBI.pup.type eq typeInterrupt) eqv lookForInterrupt then [ rpbi = pbi; break ] pbi = pbi!0 ] if rpbi ne 0 then Unqueue(lv soc>>BSPSoc.bspTQ, rpbi) resultis rpbi ] //---------------------------------------------------------------------------- and TransmitStrategy(pbi, makeA; numargs na) be //---------------------------------------------------------------------------- // Diddles type depending on makeA or allocation remaining. // Current algorithm is that we send AData when allocation // falls below 33% of that given in the last received Ack. // This may be expressed as: // pupAlloc le k * (pupAlloc + unAckedPups) // where k = 0.33 and it is assumed that pupAlloc+unAckedPups // gives the original Pup allocation. This is equivalent to // (1-k)/k * pupAlloc le unAckedPups. // For k = 0.33, (1-k)/k = 2. This is the derivation of the // comparison "pupAlloc lshift 1 le soc>>BSPSoc.unAckedPups" below. [ let soc = pbi>>PBI.socket unless na gr 1 & makeA do test soc>>BSPSoc.ackRequested ifso makeA = false ifnot [ let pupAlloc = Min(Min(Min(soc>>BSPSoc.pupAlloc, soc>>BSPSoc.byteAlloc/soc>>BSPSoc.bytesPerPup), soc>>BSPSoc.numOPBI-1), soc>>BSPSoc.numTPBI-1) makeA = (overlapDataWithAck? pupAlloc lshift 1 le soc>>BSPSoc.unAckedPups, pupAlloc le 0) ] if makeA then [ // Unless an AData is already outstanding, start measuring // the round-trip delay for computing the adaptive timeout. unless soc>>BSPSoc.ackRequested do SetTimer(lv soc>>BSPSoc.lastADataTime, 0) soc>>BSPSoc.ackRequested = true soc>>BSPSoc.aDataCount = soc>>BSPSoc.aDataCount+1 ] pbi>>PBI.pup.type = selecton pbi>>PBI.pup.type into [ case typeData: case typeAData: makeA? typeAData, typeData case typeMark: case typeAMark: makeA? typeAMark, typeMark ] ] //---------------------------------------------------------------------------- and SetTimeout(soc) be //---------------------------------------------------------------------------- SetTimer(lv soc>>BSPSoc.bspTimer, (soc>>BSPSoc.interruptOut % soc>>BSPSoc.unAckedPups gr 0 % soc>>BSPSoc.pupAlloc le 0 % soc>>BSPSoc.byteAlloc le 0 % soc>>BSPSoc.bytesPerPup le 0) ? (soc>>BSPSoc.ackRequested? soc>>BSPSoc.aDataTimeout, Max(soc>>BSPSoc.aDataTimeout, outstandingDataTimeout)), idleTimeout)