-- Copyright (C) 1983  by Xerox Corporation. All rights reserved. 
-- File: PupPktHot.mesa,  Last Edit: AOF 17-Jan-85 15:05:24
-- Tim Diebert,  7-Oct-85 10:27:19

DIRECTORY
  Buffer USING [GetBuffer, ReturnBuffer],
  Inline USING [LowHalf],
  Process USING [EnableAborts, SetTimeout, MsecToTicks],
  System USING [Pulses, GetClockPulses],
  Stats USING [StatBump, StatIncr],
  CommFlags USING [doDebug, doStats],
  Driver USING [Glitch],
  PupStream USING [StreamClosing],
  PupPktDefs,
  PupPktOps,
  PupDefs USING [Byte, PupBuffer, DequeuePup, EnqueuePup, PupRouterSendThis],
  PupTypes USING [PupAddress, maxDataBytesPerGatewayPup];

PupPktHot: MONITOR RETURNS [POINTER TO PupPktOps.InstanceData]LOCKS mine.lock
  IMPORTS
    Buffer, Inline, Process, System, Stats, PupStream, Driver,
    PupPktOps, PupDefs
  EXPORTS PupPktOps =
  BEGIN OPEN PupPktOps, PupDefs;

  -- State record for this module instance.  The module header names mine.lock
  -- as the monitor lock, and the main body returns a pointer to this record.
  mine: PupPktOps.InstanceData;

  -- Passed to Driver.Glitch by Get when a dequeued packet is not data or mark.
  NeitherDataNorMark: PUBLIC ERROR = CODE;
  -- Passed to Driver.Glitch by Put (debug builds) when the caller's buffer does
  -- not still carry Buffer.ReturnBuffer as its requeue procedure.
  BufferAlreadyRequeued: PUBLIC ERROR = CODE;
  -- Passed to Driver.Glitch when a send is attempted in state idle or halfOpen.
  StreamNotOpen: PUBLIC ERROR = CODE;

  -- Only called by InputPacket.
  -- Returns a - b squeezed into an INTEGER: differences beyond +/- 77777B are
  -- pinned to that limit, anything else is returned as its low-order word.
  Diff: PROCEDURE [a, b: LONG INTEGER] RETURNS [INTEGER] =
    BEGIN
    limit: INTEGER = 77777B;
    delta: LONG INTEGER ← a - b;
    IF delta > limit THEN RETURN[limit];
    IF delta < -limit THEN RETURN[-limit];
    -- In range, so the low half carries the whole value.
    RETURN[Inline.LowHalf[delta]];
    END;

  Retransmitter: ENTRY PROCEDURE =
    -- Retransmit things which have not been acknowledged in a reasonable time.
    -- Such things include RFCs and ENDs as well as data.
    -- Loops inside the monitor until pleaseDie is set.  Each pass reads the
    -- clock, does the work appropriate to the current state, and then WAITs on
    -- retransmitterReady.  After pleaseDie it keeps returning sent buffers until
    -- unackedPups has dropped to zero.
    BEGIN OPEN mine;
    ENABLE UNWIND => NULL;
    now: System.Pulses;
    UNTIL pleaseDie DO
      now ← System.GetClockPulses[];
      SELECT state FROM
        open =>
          BEGIN
          -- Quiet for longer than pingPulses with pinging enabled: preload the
          -- probe counter and zero the allocation so the test below can fire.
          IF now - timer > pingPulses AND ping THEN
            BEGIN
            probeCounter ← pingRetransmissions;
            allocatedPups ← 0;  -- will start probing
            END;
          -- Probe when the timer has run out and we have either never learned
          -- outEnd or have no allocation to send with.
          IF now - timer > ctlRetransmitPulses
            AND (outEnd = 0 OR allocatedPups = 0) THEN
	      BEGIN
	      ProbeForAck[];
	      now ← System.GetClockPulses[];
	      END;
          END;
        talking, finishing =>
          BEGIN
          DO
            -- recycle things that have timed out
            -- sentBuffer caches the head of sentQueue; refill it when empty.
            IF sentBuffer = NIL THEN sentBuffer ← DequeuePup[@sentQueue];
            IF sentBuffer = NIL THEN EXIT;
            IF ackedID - Flip[sentBuffer.pup.pupID] > 0 THEN
              BEGIN  -- this packet has been ACKed already
              IF (unackedPups ← unackedPups - 1) = 0 THEN
                BEGIN
                SELECT state FROM  -- last packet has been ACKed

                  talking => state ← open;
                  finishing => state ← end;
                  ENDCASE;
                BROADCAST stateChange;
                END;
              Buffer.ReturnBuffer[sentBuffer];
              sentBuffer ← NIL;
              END
            ELSE EXIT;  -- head of the queue is still unacknowledged
            ENDLOOP;
          IF sentBuffer # NIL AND now - timer > retransmitPulses THEN
            BEGIN
            ProbeForAck[];
	    now ← System.GetClockPulses[];
            -- ProbeForAck restarts timer only when it obtained a buffer, so a
            -- still expired timer means no probe went out.
            IF now - timer > retransmitPulses THEN
              BEGIN  -- couldn't get buffer, use one of ours
              IF sentBuffer.pup.pupType = data THEN sentBuffer.pup.pupType ← aData;
              timer ← System.GetClockPulses[];
              PupRouterSendThis[sentBuffer];
              IF CommFlags.doStats THEN Stats.StatIncr[statDataPacketsRetransmitted];
              -- The buffer just sent is out of our hands; move on to the next.
              sentBuffer ← DequeuePup[@sentQueue];
              END;
            END;
          END;
        halfOpen => IF now - timer > ctlRetransmitPulses THEN SendRfc[@mine];
        end => IF now - timer > ctlRetransmitPulses THEN SendEnd[@mine];
        closed =>
          BEGIN
          DO
            -- flush anything left on the sentQueue
            IF sentBuffer = NIL THEN sentBuffer ← DequeuePup[@sentQueue];
            IF sentBuffer = NIL THEN EXIT;
            unackedPups ← unackedPups - 1;
            Buffer.ReturnBuffer[sentBuffer];
            sentBuffer ← NIL;
            ENDLOOP;
          END;
        ENDCASE;
      -- Independently of state, repeat a pending interrupt that has timed out.
      IF outIntPending AND now - outIntTime > ctlRetransmitPulses THEN
        SendInt[@mine];
      WAIT retransmitterReady;
      ENDLOOP;
    -- Shutting down: give back every buffer still counted in unackedPups.
    UNTIL unackedPups = 0 DO
      DO
        -- flush anything left on the sentQueue
        IF sentBuffer = NIL THEN sentBuffer ← DequeuePup[@sentQueue];
        IF sentBuffer = NIL THEN EXIT;
        unackedPups ← unackedPups - 1;
        Buffer.ReturnBuffer[sentBuffer];
        sentBuffer ← NIL;
        ENDLOOP;
      -- If a buffer is on a device output queue, wait until it comes back.
      IF unackedPups # 0 THEN WAIT retransmitterReady;
      ENDLOOP;
    END;


  ProbeForAck: INTERNAL PROCEDURE =
    -- Send a header-only aData packet carrying the next output ID.
    -- Does nothing at all if no buffer can be had without waiting.  Each probe
    -- sent bumps probeCounter: past retransmitionsBeforeAbort the connection is
    -- smashed closed, and past probesBeforePanic the retransmit interval is
    -- doubled, capped at maxRetransmitPulses.
    BEGIN OPEN mine;
    probe: PupBuffer ← Buffer.GetBuffer[
      pup, PupPktOps.pupBuffers, send, fullBuffer, FALSE];
    IF probe = NIL THEN RETURN;
    -- Fill in the header; no data bytes follow it.
    probe.pup.source ← local;
    probe.pup.dest ← remote;
    probe.pup.pupID ← Flop[nextOutputID];
    probe.pup.pupType ← aData;
    probe.pup.pupLength ← bytesPerPupHeader;
    -- Restart the timing state before the packet leaves.
    clumpsSinceBump ← 0;
    aDataOut ← FALSE;
    timer ← System.GetClockPulses[];
    PupRouterSendThis[probe];
    IF CommFlags.doStats THEN Stats.StatIncr[statProbesSent];
    probeCounter ← probeCounter + 1;
    IF probeCounter > retransmitionsBeforeAbort THEN
      SmashClosed[@mine, transmissionTimeout];
    IF probeCounter > probesBeforePanic THEN
      retransmitPulses ← [
        MIN[System.Pulses[2*retransmitPulses], maxRetransmitPulses]];
    END;

  ThrottleForward: INTERNAL PROCEDURE =
    -- Open the send window by one packet (never beyond myMaxAllocate) and
    -- stretch retransmitPulses by the same ratio.  Always resets
    -- clumpsSinceBump; changes nothing else once retransmitPulses has reached
    -- maxRetransmitPulses.
    -- Declared INTERNAL because it rewrites monitor data and its only caller,
    -- InputPacket, is an ENTRY procedure that already holds the lock; this also
    -- matches ThrottleBack.
    BEGIN OPEN mine;
    old: CARDINAL ← pathMaxAllocate;  -- window size before the bump
    clumpsSinceBump ← 0;
    -- We can actually get one packet ahead at this point.
    IF retransmitPulses = maxRetransmitPulses THEN RETURN;
    pathMaxAllocate ← MIN[pathMaxAllocate + 1, myMaxAllocate];
    -- Scale the timeout by new window / old window.
    retransmitPulses ← [(retransmitPulses*pathMaxAllocate)/old];
    END;
    
  ThrottleBack: INTERNAL PROCEDURE =
    -- Shrink the send window in response to accumulated throttle requests.
    -- With the window already at one packet it instead pauses on a private
    -- condition with a timeout of maxRetransmitTime milliseconds.  Otherwise
    -- pathMaxAllocate is lowered once per outstanding throttle request, stopping
    -- early at a window of 1 or when retransmitPulses is at its minimum.
    -- Finishes by clearing both throttle and clumpsSinceBump.
    BEGIN OPEN mine;
    IF pathMaxAllocate = 1 THEN
      BEGIN  -- This is a desperate attempt to avoid an instability
      -- It is (or was?) also a nasty bug under some strange case that Andrew found
      pause: CONDITION;
      Process.SetTimeout[@pause, Process.MsecToTicks[maxRetransmitTime]];
      Process.EnableAborts[@pause];
      -- Nothing in this module notifies pause; the WAIT ends by timeout or abort.
      WAIT pause;
      END;
    UNTIL throttle = 0 OR retransmitPulses = minRetransmitPulses
      OR pathMaxAllocate = 1 DO
      pathMaxAllocate ← pathMaxAllocate - 1;
      -- Beware of rounding down, assume return ack takes as long as a send packet
      -- This goes unstable if much of the time is due to somebody else's packets
      -- Disabled formula, where old was pathMaxAllocate before the decrement:
      --    retransmitPulses ← ((retransmitPulses+old)*old)/(old+1);
      throttle ← throttle - 1;
      ENDLOOP;
    clumpsSinceBump ← throttle ← 0;
    END;

  SendAck: INTERNAL PROCEDURE =
    -- Build and send an ack advertising our current receive allocation.
    -- Reuses the buffer parked in c when there is one; otherwise tries for a
    -- fresh buffer without waiting and gives up quietly if none is free.
    BEGIN OPEN mine;
    reply: PupBuffer ← c;
    IF reply # NIL THEN c ← NIL  -- take over the parked buffer
    ELSE
      BEGIN
      reply ← Buffer.GetBuffer[
        pup, PupPktOps.pupBuffers, send, fullBuffer, FALSE];
      IF reply = NIL THEN RETURN;
      END;
    reply.pup.pupBody ← ack[
      dataBytesPerPup, MAX[0, INTEGER[myMaxAllocate - inputQueue.length]],
      byteAllocate];
    allocatedID ← nextInputID + byteAllocate;
    reply.pup.pupLength ← bytesPerAck;
    reply.pup.pupID ← Flop[nextInputID];
    reply.pup.pupType ← ack;
    reply.pup.dest ← remote;
    reply.pup.source ← local;
    PupRouterSendThis[reply];
    IF CommFlags.doStats THEN Stats.StatIncr[statAcksSent];
    IF state = open AND outEnd # 0 AND allocatedPups # 0 THEN
      timer ← System.GetClockPulses[];  -- avoid pinging
    sendAck ← FALSE;
    END;

  Get: ENTRY PROCEDURE RETURNS [b: PupBuffer] =
    -- Hand the client the next packet from inputQueue.
    -- With nothing queued: raises PupStream.StreamClosing if the stream is
    -- closed, returns NIL at once when dontWait is set, otherwise waits once on
    -- inputReady.  May still return NIL if that wait ends with nothing queued.
    BEGIN OPEN mine;
    ENABLE UNWIND => NULL;
    IF inputQueue.length = 0 THEN
      BEGIN
      IF state = closed THEN ERROR PupStream.StreamClosing[whyClosed, text];
      IF dontWait THEN RETURN[NIL];
      WAIT inputReady;
      END;
    IF inputQueue.length = 0 THEN b ← NIL ELSE b ← DequeuePup[@inputQueue];
    IF b = NIL THEN
      BEGIN
      -- Woke up empty handed; report a close that happened meanwhile.
      IF state = closed THEN ERROR PupStream.StreamClosing[whyClosed, text];
      RETURN;
      END;
    IF CommFlags.doStats THEN
      SELECT b.pup.pupType FROM
        mark, aMark => Stats.StatIncr[statMarksReceived];
        data, aData =>
          BEGIN
          Stats.StatIncr[statDataPacketsReceived];
          Stats.StatBump[statDataBytesReceived, b.pup.pupLength - bytesPerPupHeader];
          END;
        ENDCASE => Driver.Glitch[NeitherDataNorMark];
    -- A deferred ack goes out once the client has drained the queue.
    IF sendAck AND inputQueue.length = 0 THEN SendAck[];
    END;

  Put: PROCEDURE [b: PupBuffer] =
    -- Client entry for sending a packet.  Counts it in the statistics and
    -- passes it to SendPacket while the stream is open or talking; glitches
    -- when the stream was never opened and calls StreamDied in every other state.
    BEGIN OPEN mine;
    IF CommFlags.doDebug THEN
      IF b.requeueProcedure # Buffer.ReturnBuffer THEN
        Driver.Glitch[BufferAlreadyRequeued];
    SELECT state FROM
      idle, halfOpen => Driver.Glitch[StreamNotOpen];
      open, talking =>
        BEGIN
        IF CommFlags.doStats THEN
          BEGIN
          Stats.StatIncr[statDataPacketsSent];
          Stats.StatBump[statDataBytesSent, b.pup.pupLength - bytesPerPupHeader];
          END;
        SendPacket[b];
        END;
      ENDCASE --end, closed, finishing-- => StreamDied[@mine, b];
    END;

  PutMark: PROCEDURE [byte: Byte] =
    -- Send a one-byte aMark packet whose single data byte is the given mark.
    BEGIN OPEN mine;
    markPup: PupBuffer ← Buffer.GetBuffer[
      pup, PupPktOps.pupBuffers, send, smallBuffer];
    markPup.pup.pupType ← aMark;
    markPup.pup.pupLength ← bytesPerPupHeader + 1;  -- header plus the mark byte
    markPup.pup.pupBytes[0] ← byte;
    SendPacket[markPup];
    IF CommFlags.doStats THEN Stats.StatIncr[statMarksSent];
    END;

  SendPacket: ENTRY PROCEDURE [b: PupBuffer] =
    -- Stamp b with the next output ID and our addresses and give it to the
    -- router.  A packet carrying data is counted in unackedPups and gets
    -- PutOnSentQueue as its requeue procedure so it can be retransmitted.
    -- A data packet is promoted to aData when the allocation is used up, and
    -- after any aData or aMark this call blocks in WaitToSend until more may be
    -- sent.  Glitches in idle or halfOpen; calls StreamDied in any other state.
    BEGIN OPEN mine;
    ENABLE UNWIND => NULL;
    -- Latched before the send: b must not be read after PupRouterSendThis.
    -- An empty packet keeps whatever requeue procedure it arrived with
    -- (presumably Buffer.ReturnBuffer, as Put checks in debug builds), so its
    -- buffer may already be back in the pool by the time we would look at it.
    mustWait: BOOLEAN ← FALSE;
    SELECT state FROM
      open, talking =>
        BEGIN
        b.pup.pupID ← Flop[nextOutputID];
        IF b.pup.pupLength = bytesPerPupHeader THEN probeCounter ← 1
        ELSE
          BEGIN
          b.requeueProcedure ← LOOPHOLE[PutOnSentQueue];
          state ← talking;
          unackedPups ← unackedPups + 1;
          END;
        b.pup.source ← local;
        b.pup.dest ← remote;
        -- IDs count bytes, so advance by the number of data bytes in b.
        nextOutputID ← nextOutputID + (b.pup.pupLength - bytesPerPupHeader);
        IF b.pup.pupType = data  -- we don't use mark, only aMark
          AND ~((maxOutputID - nextOutputID) > 0 AND allocatedPups > unackedPups)
          THEN b.pup.pupType ← aData;
        IF b.pup.pupType # data THEN  -- aMark or aData
          BEGIN
          timer ← System.GetClockPulses[];
          aDataOut ← TRUE;
          mustWait ← TRUE;
          END;
        PupRouterSendThis[b];
        IF mustWait THEN WaitToSend[ ! UNWIND => NULL];
        END;
      idle, halfOpen => Driver.Glitch[StreamNotOpen];
      ENDCASE --end, closed, finishing-- => StreamDied[@mine, b ! UNWIND => NULL];
    END;

  WaitToSend: INTERNAL PROCEDURE =
    -- Block the sender until everything outstanding has been acknowledged and
    -- the allocation allows another packet, then apply any pending throttle.
    -- Glitches in idle or halfOpen and calls StreamDied in any state other than
    -- open or talking, both on entry and whenever a wait finds such a state.
    -- NOTE(review): the ENDCASE arms inside the loops only make sense if
    -- StreamDied does not return normally; confirm against its definition.
    BEGIN OPEN mine;
    SELECT state FROM
      open, talking =>
        BEGIN
        -- Wait until all our packets have been acked so we don't shoot them down.
        DO
          SELECT state FROM
            open => EXIT;
            talking => WAIT stateChange;
            ENDCASE => StreamDied[@mine, NIL];
          ENDLOOP;
        -- now wait for allocate
        -- Same test SendPacket uses to decide whether a data packet may go out
        -- without asking for an ack.
        UNTIL (maxOutputID - nextOutputID) > 0 AND allocatedPups > unackedPups DO
          SELECT state FROM
            -- not really, but .....

            open, talking => WAIT stateChange;
            ENDCASE => StreamDied[@mine, NIL];
          ENDLOOP;
        IF throttle > 0 THEN ThrottleBack[];
        END;
      idle, halfOpen => Driver.Glitch[StreamNotOpen];
      ENDCASE --end, closed, finishing-- => StreamDied[@mine, NIL];
    END;

  PutOnSentQueue: ENTRY PROCEDURE [b: PupBuffer] =
    -- Requeue procedure that SendPacket installs on buffers carrying data.
    -- A buffer whose ID is already behind ackedID is retired on the spot
    -- (possibly completing the talking to open or finishing to end transition);
    -- any other buffer is kept for retransmission, in sentBuffer when that slot
    -- is free and on sentQueue otherwise.
    BEGIN OPEN mine;  -- better not SIGNAL from here
    -- Compare by difference, as Retransmitter does, so the test keeps working
    -- when the wrapping sequence numbers cross the sign boundary.
    IF ackedID - Flip[b.pup.pupID] > 0 THEN
      BEGIN
      IF (unackedPups ← unackedPups - 1) = 0 THEN
        BEGIN
        SELECT state FROM  -- last packet has been ACKed

          talking => state ← open;
          finishing => state ← end;
          ENDCASE => SendAbort[@mine];
        BROADCAST stateChange;
        END;
      Buffer.ReturnBuffer[b];
      END
    ELSE IF sentBuffer = NIL THEN sentBuffer ← b ELSE EnqueuePup[@sentQueue, b];
    END;

  Slurp: PROCEDURE =
    -- Receive loop: pull packets from the socket and feed each one to
    -- InputPacket until pleaseDie is set.  A NIL from the socket is skipped.
    BEGIN OPEN mine;
    incoming: PupBuffer;
    UNTIL pleaseDie DO
      incoming ← socket.get[];
      IF incoming = NIL THEN LOOP;
      InputPacket[incoming];
      ENDLOOP;
    END;

  InputPacket: ENTRY PROCEDURE [b: PupBuffer] =
    -- Process one packet received for this connection.
    -- The buffer is parked in c for the duration: whoever consumes it sets c to
    -- NIL, and whatever is still in c at the end is returned to the pool.
    --   * Anything but rfc or error from a socket other than remote.socket is
    --     counted and dropped.
    --   * data, aData, mark, aMark: the in-sequence packet is queued for Get,
    --     others are dropped; acks are sent when requested and the queue is empty.
    --   * ack: updates allocation and timing, retires acknowledged buffers and
    --     resends whatever remains on the sent queue.
    --   * every other type is passed to GotOther.
    -- NOTE(review): GotOther receives c; whether it clears c when it keeps the
    -- buffer is not visible here, so confirm it cannot be returned twice.
    BEGIN OPEN mine;
    thisID: LONG INTEGER ← Flip[b.pup.pupID];
    c ← b;

    IF ~(b.pup.pupType = rfc OR b.pup.pupType = error)
      AND (b.pup.source.socket # remote.socket)
      THEN
      BEGIN
      IF CommFlags.doStats THEN Stats.StatIncr[statPacketsRejectedBadSource];
      Buffer.ReturnBuffer[b];
      c ← NIL;
      RETURN;
      END;

    SELECT b.pup.pupType FROM
      data, aData, mark, aMark =>
        SELECT state FROM
          open, talking, end =>
            BEGIN
            -- The "a" variants ask for an acknowledgement.
            SELECT b.pup.pupType FROM aData, aMark => sendAck ← TRUE; ENDCASE;
            IF b.pup.pupLength > bytesPerPupHeader THEN
              BEGIN
              offset: INTEGER ← Diff[thisID, nextInputID];
              SELECT offset FROM
                0 =>
                  BEGIN  -- nice - just what we wanted
                  nextInputID ← nextInputID +
                    (c.pup.pupLength - bytesPerPupHeader);
                  EnqueuePup[@inputQueue, c];
                  NOTIFY inputReady;
                  c ← NIL;
                  END;
                ENDCASE =>
                  BEGIN
                  -- Out of sequence: classify for the statistics, then drop.
                  IF CommFlags.doStats THEN
                    SELECT offset FROM
                      IN (0..duplicateWindow] =>
                        Stats.StatIncr[statDataPacketsReceivedEarly];
                      IN (-duplicateWindow..0) =>
                        Stats.StatIncr[statDataPacketsReceivedAgain];
                      ENDCASE => Stats.StatIncr[statDataPacketsReceivedVeryLate];
                  Buffer.ReturnBuffer[c];
                  c ← NIL;
                  END;
              END
            ELSE  -- funny length from way back there
              BEGIN
                -- A header-only aData is a probe; SendAck will reuse c for the reply.
                IF (b.pup.pupLength = bytesPerPupHeader)
                  AND (b.pup.pupType = aData) THEN
                  BEGIN
                  IF CommFlags.doStats THEN Stats.StatIncr[statProbesReceived];
                  SendAck[];
                  END
                ELSE IF CommFlags.doStats THEN Stats.StatIncr[statEmptyFunnys];
              END;
            -- answer probes immediately
            IF sendAck AND inputQueue.length = 0 THEN SendAck[];
            END;
          halfOpen => SendRfc[@mine];
          ENDCASE --idle, closed, finishing-- => SendAbort[@mine];

      ack =>
        BEGIN
        IF b.pup.pupLength < bytesPerAck THEN
          BEGIN
          IF CommFlags.doStats THEN Stats.StatIncr[statMouseTrap];
          GOTO SkipThisAck;
          END;
        IF CommFlags.doStats THEN Stats.StatIncr[statAcksReceived];
        -- Try to avoid the funny stable SLOW case
        IF (thisID - ackedID) < 0 OR (thisID = ackedID AND probeCounter = 0) THEN
          BEGIN
          IF CommFlags.doStats THEN Stats.StatIncr[statDuplicateAcks];
          GOTO SkipThisAck;
          END;
        probeCounter ← 0;
        IF aDataOut THEN
          BEGIN
          -- This ack answers a timed packet: fold the measured response time
          -- into the retransmit interval.
          myRetrTime, responseTime: System.Pulses;
          responseTime ← [System.GetClockPulses[] - timer];
          responseTime ← [MIN[responseTime, maxRetransmitPulses]];
          -- retransmitPulses ← 2*Smooth[responseTime]
          -- retransmitPulses ← 2*((7/8)*(retransmitPulses/2)+(1/8)*responseTime)
          -- retransmitPulses ← (7*retransmitPulses+2*responseTime)/8
          myRetrTime ← retransmitPulses;
          myRetrTime ← [(6*myRetrTime + myRetrTime + 2*responseTime)/8];
          myRetrTime ← [
            MAX[minRetransmitPulses, MIN[myRetrTime, maxRetransmitPulses]]];
          retransmitPulses ← myRetrTime;
          aDataOut ← FALSE;
          clumpsSinceBump ← clumpsSinceBump + 1;
          IF clumpsSinceBump > clumpsBeforeBump THEN ThrottleForward[];
          END;
        IF allocatedPups = 0 THEN BROADCAST stateChange;
        hisMaxAllocate ← b.pup.numberOfPupsAhead;
        IF hisMaxAllocate = 0 THEN
          BEGIN
          probeCounter ← 1;
          IF CommFlags.doStats THEN Stats.StatIncr[statEmptyAlloc];
          END;
        allocatedPups ← MIN[hisMaxAllocate, pathMaxAllocate];
        IF outEnd = 0 THEN BROADCAST stateChange;  -- in case first time
        outEnd ← MIN[b.pup.maximumBytesPerPup, dataBytesPerPup];
        IF ~sameNet THEN outEnd ← MIN[outEnd, PupTypes.maxDataBytesPerGatewayPup];
        -- Only ever move the send limit forward.
        IF (thisID - maxOutputID) + b.pup.numberOfBytesAhead > 0 THEN
          maxOutputID ← b.pup.numberOfBytesAhead + thisID;
        ackedID ← thisID;
        IF sentBuffer = NIL THEN sentBuffer ← DequeuePup[@sentQueue];
        -- Retire every buffer this ack covers.  The test is written as a
        -- difference, like the other ID tests in this procedure, so that it
        -- survives the sequence numbers wrapping past the sign boundary.
        WHILE sentBuffer # NIL AND (thisID - Flip[sentBuffer.pup.pupID]) > 0 DO
          IF (unackedPups ← unackedPups - 1) = 0 THEN
            BEGIN
            SELECT state FROM  -- last packet has been ACKed
              talking => state ← open;
              finishing => state ← end;
              ENDCASE => SendAbort[@mine];
            BROADCAST stateChange;
            END;
          Buffer.ReturnBuffer[sentBuffer];
          sentBuffer ← DequeuePup[@sentQueue];
          ENDLOOP;
        -- Whatever is left was sent but is not covered by this ack: send it again.
        UNTIL sentBuffer = NIL DO
          PupRouterSendThis[sentBuffer];
          IF CommFlags.doStats THEN Stats.StatIncr[statDataPacketsRetransmitted];
          sentBuffer ← DequeuePup[@sentQueue];
          ENDLOOP;
        EXITS SkipThisAck => NULL;
        END;
      ENDCASE => GotOther[@mine, c];

    IF c # NIL THEN BEGIN Buffer.ReturnBuffer[c]; c ← NIL; END;
    END;

  MyGetLocalAddress: PROCEDURE RETURNS [PupTypes.PupAddress] =
    -- Forwards to GetLocalAddress, supplying this instance's data.
    BEGIN
    RETURN[GetLocalAddress[@mine]];
    END;

  MyGetSenderSizeLimit: PROCEDURE RETURNS [CARDINAL] =
    -- Forwards to GetSenderSizeLimit, supplying this instance's data.
    BEGIN
    RETURN[GetSenderSizeLimit[@mine]];
    END;

  MySendAttention: ENTRY PROCEDURE =
    -- Send an interrupt to the other end.  Only one may be outstanding, so
    -- wait on stateChange until any earlier one is no longer pending.
    BEGIN OPEN mine;
    ENABLE UNWIND => NULL;
    UNTIL ~outIntPending DO
      WAIT stateChange;
      ENDLOOP;
    outIntPending ← TRUE;
    SendInt[@mine];
    END;

  MyWaitForAttention: ENTRY PROCEDURE =
    -- Block until an interrupt has arrived that the client has not yet
    -- consumed, then consume exactly one by advancing seenIntSeq.
    BEGIN OPEN mine;
    ENABLE UNWIND => NULL;
    UNTIL seenIntSeq # inIntSeq DO
      WAIT waitingForInterrupt;
      ENDLOOP;
    seenIntSeq ← seenIntSeq + 1;
    END;

  -- initialization
  -- The cold setup is done here to avoid recompilation hassles.
  -- Processes run on behalf of this instance:
  mine.slurp ← Slurp;
  mine.retransmitter ← Retransmitter;
  -- Client-visible operations:
  mine.me.get ← Get;
  mine.me.put ← Put;
  mine.me.putMark ← PutMark;
  mine.me.sendAttention ← MySendAttention;
  mine.me.waitForAttention ← MyWaitForAttention;
  mine.me.getLocalAddress ← MyGetLocalAddress;
  mine.me.getSenderSizeLimit ← MyGetSenderSizeLimit;
  RETURN[@mine];
  END.
  
LOG

19-May-83 16:31:21  By: SMA  Action: Converted to new BufferMgr.
24-May-83 11:08:44  By: SMA  Action: Get some buffers without wait.
 6-Jun-83 15:32:51  By: SMA  Action: Enable aborts on condition variables.
10-Dec-84 17:16:19  By: SMA  Action: Put in HGM's fix for PupPktHot.
14-Dec-84 14:55:09  By: SMA  Action: Added SetWaitTime and GetWaitTime.