-- File: PupPktHot.mesa,  Last Edit:
  -- HGM  October 13, 1980  8:08 PM
  -- MAS  August 20, 1980  12:57 PM

-- Copyright  Xerox Corporation 1979, 1980

DIRECTORY
  InlineDefs: FROM "InlineDefs" USING [LowHalf],
  StatsDefs: FROM "StatsDefs" USING [StatBump, StatIncr],
  CommUtilDefs: FROM "CommUtilDefs" USING [
    GetTicks, SetTimeout, MsecToTicks],
  DriverDefs: FROM "DriverDefs" USING [
    MaybeGetFreePupBuffer, doDebug, doStats, Glitch],
  PupStream: FROM "PupStream" USING [StreamClosing],
  PupPktPrivateDefs: FROM "PupPktPrivateDefs",
  PupPktDefs: FROM "PupPktDefs",
  PupDefs: FROM "PupDefs" USING [
    Byte, PupBuffer, DequeuePup, EnqueuePup,
    GetFreePupBuffer, ReturnFreePupBuffer, PupRouterSendThis],
  BufferDefs: FROM "BufferDefs" USING [ReturnFreeBuffer],
  PupTypes: FROM "PupTypes" USING [PupAddress, maxDataBytesPerGatewayPup];

PupPktHot: MONITOR
    RETURNS [POINTER TO PupPktPrivateDefs.InstanceData]
 LOCKS mine.lock
  IMPORTS
    InlineDefs,
    StatsDefs, CommUtilDefs, PupStream, DriverDefs,
    PupPktPrivateDefs, PupDefs, BufferDefs
  EXPORTS PupPktPrivateDefs =
BEGIN OPEN StatsDefs, DriverDefs, PupPktPrivateDefs, PupDefs;

mine: PupPktPrivateDefs.InstanceData;

NeitherDataNorMark: PUBLIC ERROR = CODE;
BufferAlreadyRequeued: PUBLIC ERROR = CODE;
StreamNotOpen: PUBLIC ERROR = CODE;

-- Only called by InputPacket
Diff: PROCEDURE [a, b: LONG INTEGER] RETURNS [INTEGER] =
  BEGIN
  maxInteger: INTEGER = 77777B;
  temp: LONG INTEGER ← a-b;
  SELECT TRUE FROM
    temp>maxInteger => RETURN [maxInteger];
    temp<-maxInteger => RETURN [-maxInteger];
    ENDCASE => RETURN[InlineDefs.LowHalf[temp]];
  END;

Retransmitter: ENTRY PROCEDURE =
  -- Retransmit things which have not been acknowledged in a reasonable time.
  -- Such things include RFCs and ENDs as well as data.
  BEGIN OPEN mine;
  now:  CARDINAL;
  UNTIL pleaseDie DO
    now ← CommUtilDefs.GetTicks[];
    SELECT state FROM
      open =>
        BEGIN
        IF now-timer>pingTicks AND ping THEN
          BEGIN
          probeCounter ← pingRetransmissions;
          allocatedPups ← 0;  -- will start probing
          END;
        IF now-timer>ctlRetransmitTicks AND (outEnd=0 OR allocatedPups=0) THEN
          ProbeForAck[];
        END;
      talking, finishing =>
        BEGIN
        DO  -- recycle things that have timed out
          IF sentBuffer=NIL THEN sentBuffer ← DequeuePup[@sentQueue];
          IF sentBuffer=NIL THEN EXIT;
          IF ackedID-Flip[sentBuffer.pupID]>0 THEN
            BEGIN -- this packet has been ACKed already
            IF (unackedPups←unackedPups-1)=0 THEN
              BEGIN
              SELECT state FROM  -- last packet has been ACKed
                talking => state ← open;
                finishing => state ← end;
                ENDCASE;
              BROADCAST stateChange;
              END;
            ReturnFreePupBuffer[sentBuffer]; sentBuffer←NIL;
            END
            ELSE EXIT;
          ENDLOOP;
        IF sentBuffer#NIL AND now-timer>retransmitTicks THEN
          BEGIN
          ProbeForAck[];
          IF now-timer>retransmitTicks THEN
            BEGIN  -- couldn't get buffer, use one of ours
            IF sentBuffer.pupType=data THEN sentBuffer.pupType←aData;
            timer ← CommUtilDefs.GetTicks[];
            PupRouterSendThis[sentBuffer];
            IF doStats THEN StatIncr[statDataPacketsRetransmitted];
            sentBuffer ← DequeuePup[@sentQueue];
            END;
          END;
        END;
      halfOpen => IF now-timer>ctlRetransmitTicks THEN SendRfc[@mine];
      end => IF now-timer>ctlRetransmitTicks THEN SendEnd[@mine];
      closed =>
        BEGIN
        DO  -- flush anything left on the sentQueue
          IF sentBuffer=NIL THEN sentBuffer ← DequeuePup[@sentQueue];
          IF sentBuffer=NIL THEN EXIT;
          unackedPups ← unackedPups-1;
          ReturnFreePupBuffer[sentBuffer];
          sentBuffer ← NIL;
          ENDLOOP;
        END;
      ENDCASE;
    IF outIntPending AND now-outIntTime>ctlRetransmitTicks THEN SendInt[@mine];
    WAIT retransmitterReady;
    ENDLOOP;
  UNTIL unackedPups=0 DO
    DO  -- flush anything left on the sentQueue
      IF sentBuffer=NIL THEN sentBuffer ← DequeuePup[@sentQueue];
      IF sentBuffer=NIL THEN EXIT;
      unackedPups ← unackedPups-1;
      ReturnFreePupBuffer[sentBuffer];
      sentBuffer ← NIL;
      ENDLOOP;
    -- If a buffer is on a device output queue, wait until it comes back.
    IF unackedPups#0 THEN WAIT retransmitterReady;
    ENDLOOP;
  END;


ProbeForAck: INTERNAL PROCEDURE =
  BEGIN OPEN mine;
  b: PupBuffer;
  IF (b←MaybeGetFreePupBuffer[])=NIL THEN RETURN;
  b.pupLength ← bytesPerPupHeader;
  b.pupType ← aData;
  b.pupID ← Flop[nextOutputID];
  b.source.socket ← local.socket;
  b.dest ← remote;
  timer ← CommUtilDefs.GetTicks[];
  aDataOut ← FALSE;
  clumpsSinceBump ← 0;
  PupRouterSendThis[b];
  IF doStats THEN StatIncr[statProbesSent];
  IF (probeCounter←probeCounter+1)>retransmitionsBeforeAbort THEN
    SmashClosed[@mine,transmissionTimeout];
  IF probeCounter>probesBeforePanic THEN
    retransmitTicks ← MIN[2*retransmitTicks,maxRetransmitTicks];
  END;

ThrottleForward: PROCEDURE =
  BEGIN OPEN mine;
  old: CARDINAL ← pathMaxAllocate;
  clumpsSinceBump ← 0;
  -- We can actually get one packet ahead at this point.
  IF retransmitTicks=maxRetransmitTicks THEN RETURN;
  pathMaxAllocate ← MIN[pathMaxAllocate+1,myMaxAllocate];
  retransmitTicks ← (retransmitTicks*pathMaxAllocate)/old;
  END;

ThrottleBack: INTERNAL PROCEDURE =
  BEGIN OPEN mine;
  old: CARDINAL;
  IF pathMaxAllocate=1 THEN
    BEGIN  -- This is a desperate attempt to avoid an instability
    -- It is/(was?) also a nasty bug under some strange case that Andrew found
    pause: CONDITION;
CommUtilDefs.SetTimeout[@pause,CommUtilDefs.MsecToTicks[maxRetransmitTime]];
    WAIT pause;
    END;
  UNTIL throttle=0 OR retransmitTicks=minRetransmitTicks OR pathMaxAllocate=1 DO
    old ← pathMaxAllocate;
    pathMaxAllocate ← pathMaxAllocate-1;
    -- Beware of rounding down, assume return ack takes as long as a send packet
-- This goes unstable if much of the time is due to somebody else's packets
--    retransmitTicks ← ((retransmitTicks+old)*old)/(old+1);
    throttle ← throttle-1;
    ENDLOOP;
  clumpsSinceBump ← throttle ← 0;
  END;

SendAck: INTERNAL PROCEDURE =
  BEGIN OPEN mine;
  b: PupBuffer;
  IF c#NIL THEN BEGIN b←c; c←NIL; END
  ELSE IF (b←MaybeGetFreePupBuffer[])=NIL THEN RETURN;
  b.pupBody ← ack [
    dataBytesPerPup,
    MAX[0,INTEGER[myMaxAllocate-inputQueue.length]],
    byteAllocate];
  allocatedID ← nextInputID+byteAllocate;
  b.pupType ← ack;
  b.pupID ← Flop[nextInputID];
  b.pupLength ← bytesPerAck;
  b.source.socket ← local.socket;
  b.dest ← remote;
  PupRouterSendThis[b];
  IF doStats THEN StatIncr[statAcksSent];
  IF state=open AND outEnd#0 AND allocatedPups#0 THEN
    timer ← CommUtilDefs.GetTicks[];  -- avoid pinging
  sendAck ← FALSE;
  END;

Get: ENTRY PROCEDURE RETURNS [b: PupBuffer] =
  BEGIN OPEN mine; ENABLE UNWIND => NULL;
  IF inputQueue.length=0 THEN
    SELECT TRUE FROM
      (state=closed) => ERROR PupStream.StreamClosing[whyClosed,text];
      dontWait => RETURN[NIL];
      ENDCASE => WAIT inputReady;
  IF inputQueue.length#0 THEN b ← DequeuePup[@inputQueue]
  ELSE b ← NIL;
  IF b=NIL THEN
    BEGIN
    IF state=closed THEN ERROR PupStream.StreamClosing[whyClosed,text];
    RETURN;
    END;
  IF doStats THEN SELECT b.pupType FROM
    data, aData =>
      BEGIN
      StatIncr[statDataPacketsReceived];
      StatBump[statDataBytesReceived,b.pupLength-bytesPerPupHeader];
      END;
    mark, aMark => StatIncr[statMarksReceived];
    ENDCASE => Glitch[NeitherDataNorMark];
  IF inputQueue.length=0 AND sendAck THEN SendAck[];
  END;

Put: PROCEDURE [b: PupBuffer]=
  BEGIN OPEN mine;
  IF doDebug AND b.requeueProcedure#BufferDefs.ReturnFreeBuffer THEN
    Glitch[BufferAlreadyRequeued];
  SELECT state FROM
    open, talking  =>
      BEGIN
        IF doStats THEN
          BEGIN
          StatIncr[statDataPacketsSent];
          StatBump[statDataBytesSent,b.pupLength-bytesPerPupHeader];
          END;
        SendPacket[b];
        END;
    idle, halfOpen => Glitch[StreamNotOpen];
    ENDCASE --end, closed, finishing-- => StreamDied[@mine,b];
  END;

PutMark: PROCEDURE [byte: Byte] =
  BEGIN OPEN mine;
  b: PupBuffer;
  b ← GetFreePupBuffer[];
  b.pupBytes[0] ← byte;
  b.pupLength ← bytesPerPupHeader+1;
  b.pupType ← aMark;
  SendPacket[b];
  IF doStats THEN StatIncr[statMarksSent];
  END;

SendPacket: ENTRY PROCEDURE [b: PupBuffer] =
  BEGIN OPEN mine;
  SELECT state FROM
    open, talking =>
      BEGIN
      b.pupID ← Flop[nextOutputID];
      IF b.pupLength=bytesPerPupHeader THEN probeCounter ← 1
      ELSE
        BEGIN
        b.requeueProcedure ← LOOPHOLE[PutOnSentQueue];
        state ← talking;
        unackedPups ← unackedPups+1;
        END;
      b.source.socket ← local.socket;
      b.dest ← remote;
      nextOutputID ← nextOutputID+(b.pupLength-bytesPerPupHeader);
      IF b.pupType=data  -- we don't use mark, only aMark
      AND ~((maxOutputID-nextOutputID)>0 AND allocatedPups>unackedPups) THEN
        b.pupType ← aData;
      IF b.pupType#data THEN  -- aMark or aData
        BEGIN
        timer ← CommUtilDefs.GetTicks[];
        aDataOut ← TRUE;
        END;
      PupRouterSendThis[b];
      IF b.pupType#data THEN WaitToSend[ ! UNWIND => NULL];
      END;
    idle, halfOpen => Glitch[StreamNotOpen];
    ENDCASE --end, closed, finishing-- => StreamDied[@mine, b ! UNWIND => NULL];
  END;

WaitToSend: INTERNAL PROCEDURE =
  BEGIN OPEN mine;
  SELECT state FROM
    open, talking =>
      BEGIN
      -- Wait until all our packets have been acked so we don't shoot them down.
      DO
        SELECT state FROM
          open => EXIT;
          talking => WAIT stateChange;
          ENDCASE => StreamDied[@mine,NIL];
        ENDLOOP;
      -- now wait for allocate
      UNTIL (maxOutputID-nextOutputID)>0 AND allocatedPups>unackedPups DO
        SELECT state FROM
          -- not really, but .....
          open, talking => WAIT stateChange;
          ENDCASE => StreamDied[@mine,NIL];
        ENDLOOP;
      IF throttle>0 THEN ThrottleBack[];
      END;
    idle, halfOpen => Glitch[StreamNotOpen];
    ENDCASE --end, closed, finishing-- => StreamDied[@mine,NIL];
  END;

PutOnSentQueue: ENTRY PROCEDURE [b: PupBuffer] =
  BEGIN OPEN mine;  -- better not SIGNAL from here
  IF ackedID-Flip[b.pupID]>0 THEN
    BEGIN
    IF (unackedPups←unackedPups-1)=0 THEN
      BEGIN
      SELECT state FROM  -- last packet has been ACKed
        talking => state ← open;
        finishing => state ← end;
        ENDCASE => SendAbort[@mine];
      BROADCAST stateChange;
      END;
    ReturnFreePupBuffer[b];
    END
  ELSE IF sentBuffer=NIL THEN sentBuffer ← b ELSE EnqueuePup[@sentQueue,b];
  END;

Slurp: PROCEDURE =
  BEGIN OPEN mine;
  b: PupBuffer;
  UNTIL pleaseDie DO
    b ← socket.get[];
    IF b#NIL THEN InputPacket[b];
    ENDLOOP;
  END;

InputPacket: ENTRY PROCEDURE [b: PupBuffer] =
  BEGIN OPEN mine;
  thisID: LONG INTEGER ← Flip[b.pupID];
  c ← b;

  IF ~(b.pupType=rfc OR b.pupType=error)
  AND b.source.socket#remote.socket THEN
    BEGIN
    IF doStats THEN StatIncr[statPacketsRejectedBadSource];
    ReturnFreePupBuffer[b];
    c ← NIL;
    END;

  SELECT b.pupType FROM
    data, aData, mark, aMark =>
    SELECT state FROM
    open, talking, end =>
      BEGIN
      SELECT b.pupType FROM
        aData, aMark => sendAck ← TRUE;
        ENDCASE;
      IF b.pupLength>bytesPerPupHeader THEN
        BEGIN
        offset: INTEGER ← Diff[thisID,nextInputID];
        SELECT offset FROM
          0 =>
            BEGIN  -- nice - just what we wanted
            nextInputID ← nextInputID+(c.pupLength-bytesPerPupHeader);
            EnqueuePup[@inputQueue,c];
            NOTIFY inputReady;
            c ← NIL;
            END;
          ENDCASE =>
            BEGIN
            IF doStats THEN SELECT offset FROM
              IN (0..duplicateWindow] => StatIncr[statDataPacketsReceivedEarly];
              IN (-duplicateWindow..0) => StatIncr[statDataPacketsReceivedAgain];
              ENDCASE => StatIncr[statDataPacketsReceivedVeryLate];
            ReturnFreePupBuffer[c];
            c ← NIL;
            END;
        END
      ELSE  -- funny length from way back there
        BEGIN
        IF doStats THEN
          IF b.pupLength=bytesPerPupHeader AND b.pupType=aData
          THEN StatIncr[statProbesReceived]
          ELSE StatIncr[statEmptyFunnys];
        END;
      -- answer probes immediately
      IF sendAck AND inputQueue.length=0 THEN SendAck[];
      END;
    halfOpen => SendRfc[@mine];
    ENDCASE --idle, closed, finishing-- => SendAbort[@mine];
   
  ack =>
    BEGIN
    IF b.pupLength<bytesPerAck THEN
      BEGIN IF doStats THEN StatIncr[statMouseTrap]; GOTO SkipThisAck; END;
    IF doStats THEN StatIncr[statAcksReceived];
    -- Try to avoid the funny stable SLOW case
    IF (thisID-ackedID)<0 OR (thisID=ackedID AND probeCounter=0) THEN
      BEGIN IF doStats THEN StatIncr[statDuplicateAcks]; GOTO SkipThisAck; END;
    probeCounter ← 0;
    IF aDataOut THEN
      BEGIN
      responseTime, myRetrTime: CARDINAL;
      responseTime ← CommUtilDefs.GetTicks[]-timer;
      responseTime ← MIN[responseTime,maxRetransmitTicks];
      -- retransmitTicks ← 2*Smooth[responseTime]
      -- retransmitTicks ← 2*((7/8)*(retransmitTicks/2)+(1/8)*responseTime)
      -- retransmitTicks ← (7*retransmitTicks+2*responseTime)/8
      myRetrTime ← retransmitTicks;
      myRetrTime ← (6*myRetrTime+myRetrTime+2*responseTime)/8;
      myRetrTime ← MAX[minRetransmitTicks,MIN[myRetrTime,maxRetransmitTicks]];
      retransmitTicks ← myRetrTime;
      aDataOut ← FALSE;
      clumpsSinceBump ← clumpsSinceBump+1;
      IF clumpsSinceBump>clumpsBeforeBump THEN ThrottleForward[];
      END;
    IF allocatedPups=0 THEN BROADCAST stateChange;
    hisMaxAllocate ← b.numberOfPupsAhead;
    IF hisMaxAllocate=0 THEN
      BEGIN
      probeCounter ← 1;
      IF doStats THEN StatIncr[statEmptyAlloc];
      END;
    allocatedPups ← MIN[hisMaxAllocate,pathMaxAllocate];
    IF outEnd=0 THEN BROADCAST stateChange;  -- in case first time
    outEnd ← MIN[b.maximumBytesPerPup,dataBytesPerPup];
    IF ~sameNet THEN outEnd←MIN[outEnd,PupTypes.maxDataBytesPerGatewayPup];
    IF (thisID-maxOutputID)+b.numberOfBytesAhead>0
    THEN maxOutputID ← b.numberOfBytesAhead+thisID;
    ackedID ← thisID;
    IF sentBuffer=NIL THEN sentBuffer ← DequeuePup[@sentQueue];
    WHILE sentBuffer#NIL AND thisID>Flip[sentBuffer.pupID] DO
      IF (unackedPups←unackedPups-1)=0 THEN
        BEGIN
        SELECT state FROM  -- last packet has been ACKed
          talking => state ← open;
          finishing => state ← end;
          ENDCASE => SendAbort[@mine];
        BROADCAST stateChange;
        END;
      ReturnFreePupBuffer[sentBuffer];
      sentBuffer ← DequeuePup[@sentQueue];
      ENDLOOP;
    UNTIL sentBuffer=NIL DO
      PupRouterSendThis[sentBuffer];
      IF doStats THEN StatIncr[statDataPacketsRetransmitted];
      sentBuffer ← DequeuePup[@sentQueue];
      ENDLOOP;
    EXITS SkipThisAck => NULL;
    END;
   
  ENDCASE => GotOther[@mine,c];

IF c#NIL THEN BEGIN ReturnFreePupBuffer[c]; c←NIL; END;
END;

MyGetLocalAddress: PROCEDURE RETURNS [PupTypes.PupAddress] =
  BEGIN
  RETURN[GetLocalAddress[@mine]];
  END;

MyGetSenderSizeLimit: PROCEDURE RETURNS [CARDINAL] =
  BEGIN
  RETURN[GetSenderSizeLimit[@mine]];
  END;

MySendAttention: ENTRY PROCEDURE =
  BEGIN OPEN mine; ENABLE UNWIND => NULL;
  WHILE outIntPending DO WAIT stateChange; ENDLOOP;
  outIntPending ← TRUE;
  SendInt[@mine];
  END;

MyWaitForAttention: ENTRY PROCEDURE =
  BEGIN OPEN mine; ENABLE UNWIND => NULL;
  WHILE seenIntSeq=inIntSeq DO WAIT waitingForInterrupt; ENDLOOP;
  seenIntSeq ← seenIntSeq+1;
  END;

-- initialization
mine.me.get ← Get;  -- do cold stuff here to avoid recompilation hassels
mine.me.put ← Put;
mine.me.putMark ← PutMark;
mine.me.getSenderSizeLimit ← MyGetSenderSizeLimit;
mine.me.sendAttention ← MySendAttention;
mine.me.waitForAttention ← MyWaitForAttention;
mine.me.getLocalAddress ← MyGetLocalAddress;
mine.slurp ← Slurp;
mine.retransmitter ← Retransmitter;
RETURN[@mine];
END.  -- PupPktHot