-- File:  SequinImplB.mesa
-- Last edited by Levin:   6-Jul-81 16:15:00
-- Loosely derived (after extensive discussions with Wobber) from Butterfield's
--    Sequin.mesa of August 27, 1979  2:49 PM.

DIRECTORY
  BufferDefs USING [Buffer, Enqueue, PupBufferObject, ReturnFreeBuffer],
  Frame USING [GetReturnLink, MyLocalFrame],
  PupDefs USING [
    DequeuePup, EnqueuePup, GetFreePupBuffer, GetPupContentsBytes, PupBuffer,
    ReturnFreePupBuffer, SetPupContentsBytes],
  PupTypes USING [PupType],
  Sequin USING [Buffer, noBuffer],
  SequinPrivate USING [Handle, maxAllocate, maxBytes, maxPings, RequeueClosure,
    Seconds, SequenceNumber, SequinID, SequinControl],
  Time USING [Current];

SequinImplB: MONITOR LOCKS sequin.LOCK USING sequin: SequinPrivate.Handle
  IMPORTS BufferDefs, Frame, PupDefs, SequinPrivate, Time
  EXPORTS Sequin, SequinPrivate =

  BEGIN OPEN PupDefs, Sequin, SequinPrivate;


  -- Types --

  Handle: PUBLIC TYPE = SequinPrivate.Handle;


  -- Miscellaneous Declarations --

  BogusBuffer: ERROR = CODE;
  ServerIsSpeakingGreek: ERROR = CODE;
  WhereDidIPutThatBuffer: ERROR = CODE;

  idleLatency: Seconds = 10;


  -- Procedures and Signals Exported to Sequin --

  Broken: PUBLIC ERROR = CODE;

  Get: PUBLIC ENTRY PROCEDURE [sequin: Handle] RETURNS [Buffer] =
    BEGIN
    packet: PupBuffer;
    DO
      IF sequin.broken THEN RETURN WITH ERROR Broken;
      IF (packet ← DequeuePup[@sequin.getQueue]) ~= NIL THEN EXIT;
      WAIT sequin.goAhead;
      ENDLOOP;
    RETURN[PupToSequinBuffer[packet]]
    END;

  GetIfAvailable: PUBLIC ENTRY PROCEDURE [sequin: Handle] RETURNS [Buffer] =
    BEGIN
    packet: PupBuffer;
    IF sequin.broken THEN RETURN WITH ERROR Broken;
    IF (packet ← DequeuePup[@sequin.getQueue]) = NIL THEN RETURN[noBuffer];
    RETURN[PupToSequinBuffer[packet]]
    END;

  Put: PUBLIC PROCEDURE [sequin: Handle, buffer: Buffer] =
    BEGIN
    packet: PupBuffer = SequinToPupBuffer[buffer];
    SetPupContentsBytes[packet, buffer.nBytes];
    Send[sequin, packet, data];
    END;

  GetEmptyBuffer: PUBLIC PROCEDURE RETURNS [Buffer] =
    BEGIN
    packet: PupBuffer = GetFreePupBuffer[];
    SetPupContentsBytes[packet, 0];
    RETURN[PupToSequinBuffer[packet]]
    END;

  ReleaseBuffer: PUBLIC PROCEDURE [buffer: Buffer] =
    BEGIN
    IF buffer = noBuffer THEN RETURN;
    ReturnFreePupBuffer[SequinToPupBuffer[buffer]];
    END;


  -- Procedures exported to SequinPrivate --

  Send: PUBLIC ENTRY PROCEDURE [
    sequin: Handle, packet: PupBuffer, control: SequinControl] =
    BEGIN
    UNTIL Compare[sequin.retransmitSequence + MIN[sequin.allocate, maxAllocate],
		  sequin.id.sendSequence] = ahead OR sequin.broken DO
      WAIT sequin.goAhead;
      ENDLOOP;
    IF sequin.broken THEN {ReturnFreePupBuffer[packet]; RETURN WITH ERROR Broken};
    IF control = data AND sequin.state = init THEN {control ← open; sequin.state ← open};
    sequin.id.control ← control;
    LOOPHOLE[@packet.pupID, LONG POINTER TO SequinID]↑ ← sequin.id;
    IF GetPupContentsBytes[packet] ~= 0 THEN
      sequin.id.sendSequence ← sequin.id.sendSequence + 1;
    packet.pupType ← sequin.pupType;
    SendRequeueable[sequin, packet];
    END;

  SocketWarmer: PUBLIC PROCEDURE [sequin: Handle] =
    -- This procedure is forked as a separate process which tries to keep the socket
    -- as "clean" as possible.  It absorbs incoming packets and initiates
    -- retransmissions as necessary.  Good packets are moved to the getQueue.
    BEGIN
    packet: PupBuffer;
    packetID: LONG POINTER TO SequinID;
    alive: BOOLEAN ← TRUE;

    ProcessPacket: PROCEDURE [sequin: Handle] RETURNS [alive: BOOLEAN] = INLINE
      BEGIN
      alive ← TRUE;
      IF packet = NIL THEN
	BEGIN
	SELECT CheckForPing[sequin] FROM
	  no => NULL;
	  retransmit => Retransmit[sequin];
	  check => RespondWithNewPacket[check];
	  dead => RETURN[FALSE];
	  ENDCASE;
	RETURN
	END;
      IF GetPupContentsBytes[packet] > maxBytes THEN RETURN;
      SELECT packet.pupType FROM
	error =>
	  BEGIN OPEN PupTypes;
	  SELECT packet.errorCode FROM
	    noProcessPupErrorCode, cantGetTherePupErrorCode,
	    hostDownPupErrorCode, eightHopsPupErrorCode => RETURN[FALSE];
	    ENDCASE => RETURN;
	  END;
	sequin.pupType => NULL;
	ENDCASE => RETURN;
      packetID ← LOOPHOLE[@packet.pupID];
      IF packetID.control = broken THEN RETURN[FALSE];
      SELECT Compare[packetID.sendSequence, sequin.id.receiveSequence] FROM
	duplicate => RETURN;
	ahead =>
	  BEGIN
	  DiscardAckedPackets[sequin];
	  IF packetID.control = restart THEN Retransmit[sequin];
	  IF ~sequin.restartRequested THEN
	    BEGIN
	    sequin.restartRequested ← TRUE;
	    RespondWithCurrentPacket[sequin, restart];
	    END;
	  RETURN
	  END;
	ENDCASE;
      -- we've seen everything from the server
      ResetPinging[];
      DiscardAckedPackets[sequin];
      SELECT packetID.control FROM
	data => {EnqueueArrival[sequin]; RespondWithNewPacket[ack]};
	dallying => {RespondWithCurrentPacket[sequin, quit]; RETURN[FALSE]};
	restart => Retransmit[sequin];
	check => RespondWithCurrentPacket[sequin, ack];
	ack, nop => NULL;
	ENDCASE => ERROR ServerIsSpeakingGreek;
      END;

    CheckForPing: ENTRY PROCEDURE [sequin: Handle]
      RETURNS [{no, retransmit, check, dead}] = INLINE
      BEGIN
      SELECT sequin.pings FROM
	0 =>
	  IF Time.Current[] - sequin.lastPacketTime <= idleLatency AND
	    sequin.retransmitQueue.length = 0 THEN RETURN[no];
	maxPings => RETURN[dead];
	ENDCASE;
      IF sequin.state = init THEN RETURN[no];  -- don't ping until first real packet
      sequin.pings ← sequin.pings + 1;
      sequin.recentRestart ← sequin.restartRequested ← FALSE;
      RETURN[
	IF sequin.retransmitQueue.length = 1 AND sequin.pings = 1 THEN retransmit
	ELSE check]
      END;

    ResetPinging: PROCEDURE = INLINE
      {sequin.pings ← 0; sequin.lastPacketTime ← Time.Current[]};

    DiscardAckedPackets: ENTRY PROCEDURE [sequin: Handle] =
      BEGIN
      sequin.allocate ← packetID.allocate;
      IF Compare[packetID.receiveSequence, sequin.retransmitSequence] = ahead THEN
	BEGIN
	skipped: CARDINAL ← 0;
	UNTIL packetID.receiveSequence = sequin.retransmitSequence OR
	  skipped = sequin.retransmitQueue.length DO
	  b: PupBuffer ← DequeuePup[@sequin.retransmitQueue];
	  IF b = NIL THEN EXIT;  -- buffer hasn't made it to queue yet
	  IF sequin.retransmitSequence ~= LOOPHOLE[b.pupID, SequinID].sendSequence THEN
	    {EnqueuePup[@sequin.retransmitQueue, b]; skipped ← skipped + 1; LOOP};
	  ReturnFreePupBuffer[b];
	  sequin.retransmitSequence ← sequin.retransmitSequence + 1;
	  skipped ← 0;
	  ENDLOOP;
	BROADCAST sequin.goAhead;
	END;
      END;

    EnqueueArrival: ENTRY PROCEDURE [sequin: Handle] = INLINE
      BEGIN
      EnqueuePup[@sequin.getQueue, packet];
      sequin.id.receiveSequence ← sequin.id.receiveSequence + 1;
      sequin.recentRestart ← sequin.restartRequested ← FALSE;
      NOTIFY sequin.goAhead;
      END;

    RespondWithNewPacket: PROCEDURE [control: SequinControl] =
      BEGIN
      packet ← GetFreePupBuffer[];
      packetID ← LOOPHOLE[@packet.pupID];
      RespondWithCurrentPacket[sequin, control];
      END;

    RespondWithCurrentPacket: ENTRY PROCEDURE [sequin: Handle, control: SequinControl] =
      BEGIN
      sequin.id.control ← control;
      packetID↑ ← sequin.id;
      packet.pupType ← sequin.pupType;
      SetPupContentsBytes[packet, 0];
      sequin.socket.put[packet];
      packet ← NIL;
      END;

    MarkDead: ENTRY PROCEDURE [sequin: Handle] =
      BEGIN
      sequin.broken ← TRUE;
      sequin.state ← destroyed;
      BROADCAST sequin.goAhead;
      END;

    Retransmit: ENTRY PROCEDURE [sequin: Handle] =
      BEGIN
      skipped: CARDINAL ← 0;
      seq: SequenceNumber ← sequin.retransmitSequence;
      IF sequin.recentRestart THEN RETURN;
      UNTIL skipped = sequin.retransmitQueue.length DO
	buffer: PupBuffer = DequeuePup[@sequin.retransmitQueue];
	bufferID: LONG POINTER TO SequinID;
	IF buffer = NIL THEN ERROR WhereDidIPutThatBuffer;
	IF (bufferID ← LOOPHOLE[@buffer.pupID]).sendSequence ~= seq THEN
          {EnqueuePup[@sequin.retransmitQueue, buffer]; skipped ← skipped + 1; LOOP};
	bufferID.inPart ← sequin.id.inPart;
	SendRequeueable[sequin, buffer];
	sequin.recentRestart ← TRUE;
	skipped ← 0;
	seq ← IF seq = LAST[SequenceNumber] THEN 0 ELSE seq + 1;
	ENDLOOP;
      END;

    ResetPinging[];
    WHILE alive DO
      packet ← sequin.socket.get[];
      IF ~(alive ← ProcessPacket[sequin]) THEN MarkDead[sequin];
      IF packet ~= NIL THEN ReturnFreePupBuffer[packet];
      ENDLOOP;
    END;

  MakeRequeueClosure: PUBLIC PROCEDURE [sequin: Handle]
    RETURNS [closure: RequeueClosure] =
    BEGIN OPEN Frame;
    Return: PROCEDURE [RequeueClosure] ← LOOPHOLE[GetReturnLink[]];
    Requeue: PROCEDURE [buffer: BufferDefs.Buffer] =
      BEGIN
      DoRequeue: ENTRY PROCEDURE [sequin: Handle] = INLINE
	BEGIN
	sequin.buffersToRequeue ← sequin.buffersToRequeue - 1;
	IF sequin.state = destroyed THEN
	  BEGIN
	  BufferDefs.ReturnFreeBuffer[buffer];
	  IF sequin.buffersToRequeue = 0 THEN {NOTIFY sequin.goAhead; RETURN};
	  END
	ELSE BufferDefs.Enqueue[@sequin.retransmitQueue, buffer];
	END;

      DoRequeue[sequin];
      END;

    sequin.requeuer ← Requeue;
    Return[MyLocalFrame[]];
    -- never gets here; see SequinImplA.Destroy
    END;


  -- Internal Procedures --

  SendRequeueable: INTERNAL PROCEDURE [sequin: Handle, buffer: PupBuffer] =
    BEGIN
    buffer.requeueProcedure ← sequin.requeuer;
    sequin.buffersToRequeue ← sequin.buffersToRequeue + 1;
    sequin.socket.put[buffer];
    END;

  Compare: PROCEDURE [a, b: SequenceNumber] RETURNS [{equal, duplicate, ahead}] =
    BEGIN
    maxGetAhead: CARDINAL = 128;
    RETURN[
      SELECT TRUE FROM
        a = b => equal,
	a > b => IF a <= b + maxGetAhead THEN ahead ELSE duplicate,
	ENDCASE => IF b <= a + maxGetAhead THEN duplicate ELSE ahead]
    END;

  PupToSequinBuffer: PROCEDURE [b: PupBuffer] RETURNS [Buffer] = INLINE
    {RETURN[[data: LOOPHOLE[@b.pupBody], nBytes: GetPupContentsBytes[b],
	      maxBytes: maxBytes]]};

  SequinToPupBuffer: PROCEDURE [b: Buffer] RETURNS [PupBuffer] =
    BEGIN
    IF b.data = NIL THEN ERROR BogusBuffer;
    RETURN[LOOPHOLE[b.data-SIZE[pupBytes BufferDefs.PupBufferObject]]]
    END;


  END.