-- File: PupPktCool.mesa,  Last Edit: Levin   9-Mar-82 10:49:53

DIRECTORY
  Heap USING [systemMDSZone],
  String USING [AppendChar],
  System USING [Pulses, GetClockPulses],
  StatsDefs USING [StatIncr],
  CommFlags USING [doDebug, doStats],
  PupRouterDefs USING [NextPupConnectionID],
  PupPktOps,
  DriverDefs USING [MaybeGetFreePupBuffer, Glitch],
  PupStream USING [CloseReason, StreamClosing, PupOpenMode],
  PupPktDefs USING [PupPktStream],
  PupDefs USING [
    Pair, defaultPupsToAllocate, DequeuePup, GetLocalPupAddress,
    ReturnFreePupBuffer, PupAddress, PupBuffer,
    PupSocketID, PupRouterSendThis, SetPupContentsBytes, Tocks, veryLongWait,
    PupSocketMake],
  PupTypes USING [PupType, fillInSocketID];

PupPktCool: MONITOR LOCKS him.lock USING him: PupPktOps.Instance
  IMPORTS
    Heap, String, System, StatsDefs, PupStream,
    PupRouterDefs, DriverDefs, PupPktOps, PupDefs
  EXPORTS PupPktOps, PupPktDefs =
  BEGIN OPEN StatsDefs, PupPktOps, DriverDefs, PupPktDefs, PupDefs;

  myPing: BOOLEAN ← TRUE;
  myMaxAllocate, myPathMaxAllocate: CARDINAL ← defaultPupsToAllocate;
  myMaxBufferSize: CARDINAL ← 0;

  NoBufferToSend: PUBLIC ERROR = CODE;
  StreamAlreadyOpen: PUBLIC ERROR = CODE;

  PupPktStreamAbort: PUBLIC PROCEDURE [ps: PupPktStream, e: STRING] =
    BEGIN
    krock: Instance = NIL;
    offset: INTEGER = @krock.me - LOOPHOLE[krock, POINTER];
    him: Instance ← LOOPHOLE[ps - offset];
    SendAbortWithText[him, e];
    END;

  GetSenderSizeLimit: PUBLIC ENTRY PROCEDURE [him: Instance] RETURNS [CARDINAL] =
    BEGIN OPEN him;
    ENABLE UNWIND => NULL;
    UNTIL outEnd # 0 DO
      WAIT stateChange;
      IF state = closed THEN ERROR PupStream.StreamClosing[whyClosed, text];
      ENDLOOP;
    RETURN[outEnd];
    END;

  GetLocalAddress: PUBLIC PROCEDURE [him: Instance] RETURNS [PupAddress] =
    BEGIN OPEN him; RETURN[local]; END;

  GotOther: PUBLIC PROCEDURE [Instance, PupBuffer] = GotOtherInternal;
  GotOtherInternal: INTERNAL PROCEDURE [him: Instance, b: PupBuffer] =
    BEGIN OPEN him;
    SELECT b.pupType FROM
      rfc => GotRfc[him];
      end => GotEnd[him];
      endRep => GotEndReply[him];
      abort => GotAbort[him];
      error => GotError[him];
      int => GotInt[him];
      intRep => GotIntReply[him];
      ENDCASE => IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadType];
    END;

  GotRfc: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    IF state # idle AND c.pupID # connectionID THEN
      BEGIN
      IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
      RETURN;
      END;
    SELECT state FROM
      idle =>
	BEGIN
	state ← open;
	remote ← c.address;
	OpenInit[him, c.pupID];
	SendRfcInternal[him];
	MySendAck[him];
	BROADCAST stateChange;
	END;
      halfOpen =>
	BEGIN
	state ← open;
	remote ← c.address;
	MySendAck[him];
	BROADCAST stateChange;
	END;
      open, talking => IF mode # sendRfc THEN SendRfcInternal[him];
      ENDCASE --end, closed, finishing-- => SendAbortInternal[him];
    END;

  GotEnd: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    IF c.pupID # connectionID THEN
      BEGIN
      IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
      RETURN;
      END;
    SELECT state FROM
      talking, finishing => BEGIN state ← finishing; whyClosed ← remoteClose; END;
      ENDCASE =>
	BEGIN
	SmashClosedInternal[him, remoteClose];
	SendEndReplyInternal[him];
	END;
    END;

  GotEndReply: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    IF c.pupID # connectionID THEN
      BEGIN
      IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
      RETURN;
      END;
    SELECT state FROM
      open, talking, halfOpen, finishing => SendAbortInternal[him];
      ENDCASE --idle, end, closed-- => state ← closed;
    BROADCAST stateChange;
    NOTIFY inputReady;
    END;

  GotError: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    i, len: CARDINAL;
    IF CommFlags.doStats THEN StatIncr[statErrorPacketsReceived];
    SELECT c.errorCode FROM
      noProcessPupErrorCode =>
	BEGIN -- like an abort
	IF c.source.socket # remote.socket THEN RETURN;
	SELECT state FROM
	  idle, closed => RETURN;
	  ENDCASE => SmashClosedInternal[him, remoteReject];
	IF text = NIL THEN
	  BEGIN
	  len ← c.pupLength - bytesPerPupHeader - 2*(10 + 1 + 1);
	  len ← MIN[len, 100];
	  text ← Heap.systemMDSZone.NEW[StringBody[len]];
	  FOR i IN [0..len) DO String.AppendChar[text, c.errorText[i]]; ENDLOOP;
	  END;
	END;
      gatewayResourceLimitsPupErrorCode =>
	BEGIN
	throttle ← throttle + 1;
	IF pathMaxAllocate = 1 THEN
	  -- Beware: We may have gone unstable
	  retransmitPulses ←
	    [MIN[System.Pulses[2*retransmitPulses], maxRetransmitPulses]];
	END;
      ENDCASE;
    END;

  GotAbort: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    i, len: CARDINAL;
    IF c.pupID # connectionID THEN
      BEGIN
      IF CommFlags.doStats THEN StatIncr[statPacketsRejectedBadID];
      RETURN;
      END;
    IF state = idle OR state = closed THEN RETURN;
    IF text = NIL THEN
      BEGIN
      len ← c.pupLength - bytesPerPupHeader - 2*(1);
      len ← MIN[len, 100];
      text ← Heap.systemMDSZone.NEW[StringBody[len]];
      FOR i IN [0..len) DO String.AppendChar[text, c.abortText[i]]; ENDLOOP;
      END;
    SELECT state FROM
      halfOpen => BEGIN SmashClosedInternal[him, remoteReject]; END;
      ENDCASE => SendAbortInternal[him];
    END;

  GotInt: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    int: LONG INTEGER ← Flip[c.pupID];
    SELECT TRUE FROM
      (int = inIntSeq - 1) => SendIntReply[him]; -- retransmission

      (int = inIntSeq) =>
	BEGIN
	inIntSeq ← inIntSeq + 1;
	NOTIFY waitingForInterrupt;
	SendIntReply[him];
	END;
      ENDCASE => RETURN; -- very old duplicate

    END;

  GotIntReply: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    int: LONG INTEGER ← Flip[c.pupID];
    IF int = outIntSeq THEN
      BEGIN outIntPending ← FALSE; outIntSeq ← outIntSeq + 1; END;
    BROADCAST stateChange;
    END;

  SendEnd: PUBLIC PROCEDURE [him: Instance] = SendEndInternal;
  SendEndInternal: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    timer ← System.GetClockPulses[];
    IF c = NIL AND (c ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN;
    Send[him, end, connectionID, 0];
    -- 15*2 is 30 sec
    IF (probeCounter ← probeCounter + 1) > 15 THEN
      SmashClosed[him, transmissionTimeout];
    END;

  SendEndReplyInternal: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him; Send[him, endRep, connectionID, 0]; END;

  SendAbort: PUBLIC PROCEDURE [him: Instance] = SendAbortInternal;
  SendAbortInternal: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    SmashClosedInternal[him, remoteReject];
    IF c = NIL AND (c ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN;
    c.abortCode ← 1;
    Send[him, abort, connectionID, 2];
    END;

  SendAbortWithText: ENTRY PROCEDURE [him: Instance, e: STRING] =
    BEGIN OPEN him;
    charsPerAbortHeader: CARDINAL = 2;
    chars: CARDINAL = MIN[
      IF e = NIL THEN 0 ELSE e.length, dataBytesPerPup - charsPerAbortHeader];
    SmashClosedInternal[him, localAbort];
    IF c = NIL AND (c ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN;
    c.abortCode ← 1;
    FOR i: CARDINAL IN [0..chars) DO c.abortText[i] ← e[i]; ENDLOOP;
    Send[him, abort, connectionID, charsPerAbortHeader + chars];
    END;

  SendInt: PUBLIC PROCEDURE [him: Instance] = SendIntInternal;
  SendIntInternal: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    outIntTime ← System.GetClockPulses[];
    IF c = NIL AND (c ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN;
    Send[him, int, Flop[outIntSeq], 0];
    END;

  SendIntReply: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him; Send[him, intRep, Flop[inIntSeq - 1], 0]; END;

  SendRfc: PUBLIC PROCEDURE [him: Instance] = SendRfcInternal;
  SendRfcInternal: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    IF c = NIL THEN c ← MaybeGetFreePupBuffer[];
    IF c = NIL THEN RETURN;
    timer ← System.GetClockPulses[];
    c.address ← local;
    Send[him, rfc, connectionID, dataBytesPerRFC];
    END;

  AnswerRfc: INTERNAL PROCEDURE [him: Instance, listener: PupSocketID] =
    BEGIN OPEN him;
    b: PupBuffer ← MaybeGetFreePupBuffer[];
    IF b = NIL THEN RETURN;
    b.address ← local;
    b.pupType ← rfc;
    b.pupID ← connectionID;
    SetPupContentsBytes[b, dataBytesPerRFC];
    b.source ← local;
    b.source.socket ← listener;
    b.dest ← remote;
    PupRouterSendThis[b];
    END;

  -- send a control message using the current buffer

  Send: INTERNAL PROCEDURE [
    him: Instance, thisType: PupTypes.PupType, thisID: Pair, thisLen: CARDINAL] =
    BEGIN OPEN him;
    b: PupBuffer ← c;
    IF CommFlags.doDebug AND b = NIL THEN Glitch[NoBufferToSend];
    c ← NIL;
    b.pupType ← thisType;
    b.pupID ← thisID;
    SetPupContentsBytes[b, thisLen];
    b.source ← local;
    b.dest ← remote;
    PupRouterSendThis[b];
    END;

  CloseReason: TYPE = PupStream.CloseReason;

  SmashClosed: PUBLIC PROCEDURE [Instance, CloseReason] = SmashClosedInternal;
  SmashClosedInternal: INTERNAL PROCEDURE [him: Instance, why: CloseReason] =
    BEGIN OPEN him;
    state ← closed;
    IF whyClosed = localClose THEN whyClosed ← why; -- don't clobber first reason
    BROADCAST stateChange;
    NOTIFY inputReady;
    END;

  StreamDied: PUBLIC PROCEDURE [him: Instance, b: PupBuffer] =
    BEGIN OPEN him;
    IF b # NIL THEN ReturnFreePupBuffer[b];
    ERROR PupStream.StreamClosing[whyClosed, text];
    END;

  Open: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    SELECT state FROM
      idle =>
	BEGIN
	state ← halfOpen;
	OpenInit[him, PupRouterDefs.NextPupConnectionID[]];
	SendRfcInternal[him];
	THROUGH [0..60) WHILE state # open DO
	  -- 1 min total
	  SELECT state FROM
	    open => RETURN;
	    closed => ERROR PupStream.StreamClosing[whyClosed, text];
	    ENDCASE => WAIT stateChange; -- 1 sec

	  ENDLOOP;
	IF state = open THEN RETURN; -- it gets opened fast if local
	state ← closed;
	whyClosed ← transmissionTimeout;
	ERROR PupStream.StreamClosing[whyClosed, text];
	END;
      ENDCASE => Glitch[StreamAlreadyOpen];
    END;

  OpenInit: INTERNAL PROCEDURE [him: Instance, newID: Pair] =
    BEGIN OPEN him;
    sameNet ← remote.net = local.net;
    probeCounter ← initialRetransmissions; -- also allows first (gratuitous) ack
    connectionID ← newID;
    nextInputID ← nextOutputID ← ackedID ← allocatedID ← allocationID ←
      maxOutputID ← outIntSeq ← inIntSeq ← seenIntSeq ← Flip[newID];
    END;

  MakeLocal: PUBLIC ENTRY PROCEDURE [
    him: Instance, l: PupSocketID, r: PupAddress, m: PupStream.PupOpenMode,
    id: Pair] =
    BEGIN OPEN him;
    ENABLE UNWIND => NULL;
    kludge: PupSocketID ←
      IF m # alreadyOpened THEN l ELSE PupTypes.fillInSocketID;
    local ← GetLocalPupAddress[kludge, @r];
    remote ← r;
    mode ← m;
    connectionID ← id;
    socket ← PupSocketMake[local.socket, remote, veryLongWait, id];
    retransmitterFork ← FORK retransmitter[];
    slurpFork ← FORK slurp[];
    SELECT mode FROM
      sendRfc => Open[him];
      alreadyOpened =>
	BEGIN
	state ← open;
	OpenInit[him, id];
	AnswerRfc[him, l];
	MySendAck[him];
	END;
      wait => NULL;
      ENDCASE => ERROR;
    END;

  -- Copied (with only slight edits) from PupPktHot

  MySendAck: INTERNAL PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    b: PupBuffer;
    IF c # NIL THEN BEGIN b ← c; c ← NIL; END
    ELSE IF (b ← MaybeGetFreePupBuffer[]) = NIL THEN RETURN;
    b.pupBody ← ack[
      dataBytesPerPup, MAX[0, INTEGER[myMaxAllocate - inputQueue.length]],
      byteAllocate];
    allocatedID ← nextInputID + byteAllocate;
    b.pupType ← ack;
    b.pupID ← Flop[nextInputID];
    b.pupLength ← bytesPerAck;
    b.source ← local;
    b.dest ← remote;
    PupRouterSendThis[b];
    IF CommFlags.doStats THEN StatIncr[statAcksSent];
    -- MAXC doesn't send free ACK, so set clock ahead
    timer ←
      [System.GetClockPulses[] + ctlRetransmitPulses/16 - ctlRetransmitPulses];
    END;

  DestroyLocalLocked: PUBLIC ENTRY PROCEDURE [him: Instance] =
    BEGIN OPEN him;
    THROUGH [0..100) WHILE outIntPending DO
      IF state = closed THEN EXIT; -- probably smashed closed, don't hang
      WAIT stateChange;
      ENDLOOP;
    DO
      SELECT state FROM
	open =>
	  BEGIN state ← end; SendEndInternal[him]; probeCounter ← 0; EXIT; END;
	talking, finishing => WAIT stateChange;
	halfOpen => BEGIN SendAbortInternal[him]; EXIT; END;
	ENDCASE --closed, end, idle-- => EXIT;
      ENDLOOP;
    THROUGH [0..10) UNTIL state = idle OR state = closed DO
      -- extra layer for debugging
      UNTIL state = idle OR state = closed DO
	IF c # NIL THEN ReturnFreePupBuffer[c]; -- probe for allocate
	UNTIL (c ← DequeuePup[@inputQueue]) = NIL DO
	  ReturnFreePupBuffer[c]; ENDLOOP;
	WAIT stateChange;
	ENDLOOP;
      ENDLOOP;
    state ← closed;
    NOTIFY retransmitterReady;
    UNTIL unackedPups = 0 DO WAIT stateChange; ENDLOOP; -- retransmitter gets them
    pleaseDie ← TRUE;
    NOTIFY retransmitterReady;
    END;

  END.