-- Copyright (C) 1981, 1984  by Xerox Corporation. All rights reserved. 
-- File:  SequinImplB.mesa

-- HGM: 10-Dec-84 21:24:46
-- last edited by Hankins:   6-Aug-84 13:38:30	
--    Klamath update (pup changes, sequinClosure change)
-- Last edited by Levin:   6-Jul-81 16:15:00
-- Loosely derived (after extensive discussions with Wobber) 
--   from Butterfield's Sequin.mesa of August 27, 1979  2:49 PM.

DIRECTORY
  Buffer USING [Buffer, BufferObject, Enqueue, GetBuffer, ReturnBuffer],
  PupDefs USING [
    DequeuePup, EnqueuePup, GetPupContentsBytes, PupBuffer, SetPupContentsBytes],
  PupTypes USING [BufferBody, PupType],
  Sequin USING [Buffer, noBuffer],
  SequinPrivate USING [
    bufferPool, Handle, LongHandle, maxAllocate, maxBytes, maxPings, Seconds,
    SequenceNumber, SequinID, SequinControl],
  Time USING [Current];

SequinImplB: MONITOR LOCKS sequin.LOCK USING sequin: SequinPrivate.Handle
  IMPORTS Buffer, PupDefs, SequinPrivate, Time EXPORTS Sequin, SequinPrivate =

  BEGIN

  -- Types --

  -- Exports Handle, defined here as SequinPrivate.Handle.
  Handle: PUBLIC TYPE = SequinPrivate.Handle;

  -- Miscellaneous Declarations --

  -- Raised by SequinToPupBuffer when the buffer's data pointer is NIL.
  BogusBuffer: ERROR = CODE;
  -- Raised inside SocketWarmer when an in-sequence packet carries a control
  -- value that ProcessPacket does not handle.
  ServerIsSpeakingGreek: ERROR = CODE;
  -- Raised by Retransmit (inside SocketWarmer) when the retransmit queue
  -- yields NIL although its length says more buffers remain.
  WhereDidIPutThatBuffer: ERROR = CODE;

  -- Longest gap since the last in-sequence packet that CheckForPing tolerates
  -- (with an empty retransmit queue) before it starts pinging.  Presumably in
  -- seconds, per its type name and the comparison against Time.Current[].
  idleLatency: SequinPrivate.Seconds = 10;

  -- Procedures and Signals Exported to Sequin --

  -- Raised (via RETURN WITH ERROR) by Get, GetIfAvailable and Send when the
  -- sequin has been marked broken.
  Broken: PUBLIC ERROR = CODE;

  Get: PUBLIC ENTRY PROCEDURE [sequin: Handle] RETURNS [Sequin.Buffer] =
    -- Blocks until a packet can be taken from sequin.getQueue and returns it
    -- as a Sequin.Buffer.  Raises Broken (releasing the monitor) if the sequin
    -- is found broken, either on entry or after any wakeup.
    {
    arrival: PupDefs.PupBuffer ← NIL;
    DO
      -- re-test both conditions after every wakeup on goAhead
      IF sequin.broken THEN RETURN WITH ERROR Broken;
      arrival ← PupDefs.DequeuePup[@sequin.getQueue];
      IF arrival # NIL THEN RETURN[PupToSequinBuffer[arrival]];
      WAIT sequin.goAhead;
      ENDLOOP;
    };

  GetIfAvailable: PUBLIC ENTRY PROC [sequin: Handle] RETURNS [Sequin.Buffer] =
    -- Non-blocking variant of Get: returns the next packet from
    -- sequin.getQueue, or Sequin.noBuffer when the queue is empty.
    -- Raises Broken if the sequin is broken.
    {
    arrival: PupDefs.PupBuffer;
    IF sequin.broken THEN RETURN WITH ERROR Broken;
    arrival ← PupDefs.DequeuePup[@sequin.getQueue];
    RETURN[
      IF arrival = NIL THEN Sequin.noBuffer ELSE PupToSequinBuffer[arrival]]
    };

  Put: PUBLIC PROCEDURE [sequin: Handle, buffer: Sequin.Buffer] =
    -- Sends the client's buffer on the sequin as a data packet.  The pup
    -- content length is taken from buffer.nBytes.  Send may raise Broken.
    {
    outgoing: PupDefs.PupBuffer = SequinToPupBuffer[buffer];
    PupDefs.SetPupContentsBytes[outgoing, buffer.nBytes];
    Send[sequin: sequin, packet: outgoing, control: data];
    };

  GetEmptyBuffer: PUBLIC PROCEDURE RETURNS [Sequin.Buffer] =
    -- Obtains a pup buffer from SequinPrivate.bufferPool, marks it as having
    -- zero content bytes, and returns it as a Sequin.Buffer.
    {
    -- NOTE(review): an earlier editor questioned whether `send` is the right
    -- function code here ("is send correct?"); still unresolved.
    fresh: PupDefs.PupBuffer = Buffer.GetBuffer[
      type: pup, aH: SequinPrivate.bufferPool, function: send];
    PupDefs.SetPupContentsBytes[fresh, 0];
    RETURN[PupToSequinBuffer[fresh]]
    };

  ReleaseBuffer: PUBLIC PROCEDURE [buffer: Sequin.Buffer] =
    -- Returns the pup buffer underlying `buffer` via Buffer.ReturnBuffer.
    -- Sequin.noBuffer is accepted and ignored.
    {
    IF buffer # Sequin.noBuffer THEN
      Buffer.ReturnBuffer[SequinToPupBuffer[buffer]];
    };


  -- Procedures exported to SequinPrivate --

  Send: PUBLIC ENTRY PROCEDURE [
    sequin: Handle, packet: PupDefs.PupBuffer,
    control: SequinPrivate.SequinControl] =
    -- Stamps `packet` with the sequin's current ID and the given control value
    -- and hands it to SendRequeueable.  Waits on goAhead until
    -- retransmitSequence plus the allocation (capped at maxAllocate) compares
    -- `ahead` of sendSequence.  If the sequin is broken, the packet is
    -- returned to the buffer pool and Broken is raised.
    {
    DO
      IF sequin.broken THEN {
        Buffer.ReturnBuffer[packet]; RETURN WITH ERROR Broken};
      IF Compare[
        sequin.retransmitSequence
          + MIN[sequin.allocate, SequinPrivate.maxAllocate],
        sequin.id.sendSequence] = ahead THEN EXIT;
      WAIT sequin.goAhead;
      ENDLOOP;
    -- the first data packet on a sequin still in `init` goes out as `open`
    IF sequin.state = init AND control = data THEN {
      sequin.state ← open; control ← open};
    sequin.id.control ← control;
    -- overlay the sequin ID on the pup ID field of the packet
    LOOPHOLE[
      @packet.pup.pupID, LONG POINTER TO SequinPrivate.SequinID]↑ ← sequin.id;
    -- only packets with non-zero contents consume a sequence number
    IF PupDefs.GetPupContentsBytes[packet] # 0 THEN
      sequin.id.sendSequence ← sequin.id.sendSequence + 1;
    packet.pup.pupType ← sequin.pupType;
    SendRequeueable[sequin, packet];
    };

  SocketWarmer: PUBLIC PROCEDURE [sequin: Handle] =
    -- This procedure is forked as a separate process which tries to keep the
    -- socket as "clean" as possible.  It absorbs incoming packets and initiates
    -- retransmissions as necessary.  Good packets are moved to the getQueue.
    --
    -- The main loop runs until ProcessPacket reports the connection dead, at
    -- which point MarkDead flags the sequin broken and wakes every waiter.
    -- `packet` and `packetID` are shared with the nested procedures: they
    -- describe the packet currently being processed (or the reply being
    -- built), and `packet` is set to NIL once it has been handed to the socket
    -- so that the main loop does not return it to the pool a second time.
    BEGIN
    packet: PupDefs.PupBuffer;
    packetID: LONG POINTER TO SequinPrivate.SequinID;
    alive: BOOLEAN ← TRUE;

    ProcessPacket: PROC [sequin: Handle] RETURNS [alive: BOOLEAN] = INLINE
      -- Handles one result of socket.get (possibly NIL).  Returns FALSE when
      -- the connection should be declared dead, TRUE otherwise.
      BEGIN
      alive ← TRUE;
      IF packet = NIL THEN
        BEGIN
        -- nothing arrived: decide whether to ping, retransmit, or give up
        SELECT CheckForPing[sequin] FROM
          no => NULL;
          retransmit => Retransmit[sequin];
          check => RespondWithNewPacket[check];
          dead => RETURN[FALSE];
          ENDCASE;
        RETURN
        END;
      -- ignore packets whose contents exceed maxBytes
      IF PupDefs.GetPupContentsBytes[packet] > SequinPrivate.maxBytes THEN RETURN;
      SELECT packet.pup.pupType FROM
        error =>
          BEGIN OPEN PupTypes;
          -- these error codes are fatal; every other error pup is ignored
          SELECT packet.pup.errorCode FROM
            noProcessPupErrorCode, cantGetTherePupErrorCode, hostDownPupErrorCode,
              eightHopsPupErrorCode => RETURN[FALSE];
            ENDCASE => RETURN;
          END;
        sequin.pupType => NULL;
        ENDCASE => RETURN;  -- not a pup type this sequin handles
      packetID ← LOOPHOLE[@packet.pup.pupID];
      IF packetID.control = broken THEN RETURN[FALSE];
      SELECT Compare[packetID.sendSequence, sequin.id.receiveSequence] FROM
        duplicate => RETURN;
        ahead =>
          BEGIN
          -- the packet is beyond the one we expect: honour its acks, then
          -- ask for a restart (at most once until the flag is cleared)
          DiscardAckedPackets[sequin];
          IF packetID.control = restart THEN Retransmit[sequin];
          IF ~sequin.restartRequested THEN
            BEGIN
            sequin.restartRequested ← TRUE;
            RespondWithCurrentPacket[sequin, restart];
            END;
          RETURN
          END;
        ENDCASE;
      -- we've seen everything from the server
      ResetPinging[];
      DiscardAckedPackets[sequin];
      SELECT packetID.control FROM
        data => {EnqueueArrival[sequin]; RespondWithNewPacket[ack]};
        dallying => {RespondWithCurrentPacket[sequin, quit]; RETURN[FALSE]};
        restart => Retransmit[sequin];
        check => RespondWithCurrentPacket[sequin, ack];
        ack, nop => NULL;
        ENDCASE => ERROR ServerIsSpeakingGreek;
      END;

    CheckForPing: ENTRY PROCEDURE [sequin: Handle]
      RETURNS [{no, retransmit, check, dead}] = INLINE
      -- Called when nothing arrived.  Returns `no` when no action is needed,
      -- `dead` once maxPings pings have been issued, and otherwise counts a
      -- ping and selects `retransmit` or `check`.
      BEGIN
      SELECT sequin.pings FROM
        0 =>
          IF Time.Current[] - sequin.lastPacketTime <= idleLatency
            AND sequin.retransmitQueue.length = 0 THEN RETURN[no];
        SequinPrivate.maxPings => RETURN[dead];
        ENDCASE;
      IF sequin.state = init THEN RETURN[no];  -- don't ping until first real packet
      sequin.pings ← sequin.pings + 1;
      sequin.recentRestart ← sequin.restartRequested ← FALSE;
      -- on the first ping with exactly one queued buffer, just resend it
      RETURN[
        IF sequin.retransmitQueue.length = 1 AND sequin.pings = 1 THEN retransmit
        ELSE check]
      END;

    ResetPinging: PROCEDURE = INLINE {
      -- clears the ping count and records the current time as lastPacketTime
      sequin.pings ← 0; sequin.lastPacketTime ← Time.Current[]};

    DiscardAckedPackets: ENTRY PROCEDURE [sequin: Handle] =
      -- Records the allocation carried by the current packet and, if its
      -- receiveSequence is ahead of retransmitSequence, releases the buffers
      -- in retransmitQueue that it acknowledges.
      BEGIN
      sequin.allocate ← packetID.allocate;
      IF Compare[packetID.receiveSequence, sequin.retransmitSequence] = ahead THEN
        BEGIN
        -- `skipped` counts consecutive buffers put back because they were not
        -- the next one in sequence; a full pass without a match ends the loop.
        skipped: CARDINAL ← 0;
        UNTIL packetID.receiveSequence = sequin.retransmitSequence
          OR skipped = sequin.retransmitQueue.length DO
          b: PupDefs.PupBuffer ← PupDefs.DequeuePup[@sequin.retransmitQueue];
          IF b = NIL THEN EXIT;  -- buffer hasn't made it to queue yet
          IF sequin.retransmitSequence ~= LOOPHOLE[b.pup.pupID,
            SequinPrivate.SequinID].sendSequence THEN {
            PupDefs.EnqueuePup[@sequin.retransmitQueue, b];
            skipped ← skipped + 1;
            LOOP};
          Buffer.ReturnBuffer[b];
          sequin.retransmitSequence ← sequin.retransmitSequence + 1;
          skipped ← 0;
          ENDLOOP;
        BROADCAST sequin.goAhead;
        END;
      END;

    EnqueueArrival: ENTRY PROCEDURE [sequin: Handle] = INLINE
      -- Moves the current packet onto getQueue, advances receiveSequence and
      -- clears the restart flags.
      BEGIN
      PupDefs.EnqueuePup[@sequin.getQueue, packet];
      sequin.id.receiveSequence ← sequin.id.receiveSequence + 1;
      sequin.recentRestart ← sequin.restartRequested ← FALSE;
      -- goAhead is shared by Get (waiting for data) and Send (waiting for
      -- window space).  A single NOTIFY could be consumed by a Send waiter
      -- whose condition is still false, leaving a Get waiter asleep with data
      -- queued.  Get and Send re-test their conditions in a loop, so
      -- BROADCAST is safe for them.
      BROADCAST sequin.goAhead;
      END;

    RespondWithNewPacket: PROCEDURE [control: SequinPrivate.SequinControl] =
      -- Replaces `packet` with a fresh buffer from the pool and sends it as a
      -- reply with the given control value.
      BEGIN
      packet ← Buffer.GetBuffer[
        type: pup, aH: SequinPrivate.bufferPool, function: send];
      --is send correct?
      packetID ← LOOPHOLE[@packet.pup.pupID];
      RespondWithCurrentPacket[sequin, control];
      END;

    RespondWithCurrentPacket: ENTRY PROCEDURE [
      sequin: Handle, control: SequinPrivate.SequinControl] =
      -- Reuses the current `packet` as an empty (zero content bytes) reply
      -- carrying the sequin's ID and the given control value.
      BEGIN
      sequin.id.control ← control;
      packetID↑ ← sequin.id;
      packet.pup.pupType ← sequin.pupType;
      PupDefs.SetPupContentsBytes[packet, 0];
      sequin.socket.put[packet];
      packet ← NIL;  -- handed to the socket; main loop must not return it
      END;

    MarkDead: ENTRY PROCEDURE [sequin: Handle] =
      -- Marks the sequin broken and destroyed and wakes every waiter.
      BEGIN
      sequin.broken ← TRUE;
      sequin.state ← destroyed;
      BROADCAST sequin.goAhead;
      END;

    Retransmit: ENTRY PROCEDURE [sequin: Handle] =
      -- Resends buffers from retransmitQueue in sequence order, starting at
      -- retransmitSequence.  Does nothing if recentRestart is already set.
      BEGIN
      skipped: CARDINAL ← 0;
      seq: SequinPrivate.SequenceNumber ← sequin.retransmitSequence;
      IF sequin.recentRestart THEN RETURN;
      -- rotate through the queue; a full pass that finds no buffer numbered
      -- `seq` ends the loop
      UNTIL skipped = sequin.retransmitQueue.length DO
        buffer: PupDefs.PupBuffer = PupDefs.DequeuePup[@sequin.retransmitQueue];
        bufferID: LONG POINTER TO SequinPrivate.SequinID;
        IF buffer = NIL THEN ERROR WhereDidIPutThatBuffer;
        IF (bufferID ← LOOPHOLE[@buffer.pup.pupID]).sendSequence ~= seq THEN {
          PupDefs.EnqueuePup[@sequin.retransmitQueue, buffer];
          skipped ← skipped + 1;
          LOOP};
        -- refresh the inPart field from the sequin's current ID before resending
        bufferID.inPart ← sequin.id.inPart;
        SendRequeueable[sequin, buffer];
        sequin.recentRestart ← TRUE;
        skipped ← 0;
        seq ← IF seq = LAST[SequinPrivate.SequenceNumber] THEN 0 ELSE seq + 1;
        ENDLOOP;
      END;

    ResetPinging[];
    WHILE alive DO
      packet ← sequin.socket.get[];
      IF ~(alive ← ProcessPacket[sequin]) THEN MarkDead[sequin];
      -- anything not queued or sent by ProcessPacket goes back to the pool
      IF packet ~= NIL THEN Buffer.ReturnBuffer[packet];
      ENDLOOP;
    END;

  Requeue: PROCEDURE [buffer: Buffer.Buffer] =
    -- Installed by SendRequeueable as the buffer's requeueProcedure.  Recovers
    -- the owning sequin from buffer.requeueData and, under that sequin's
    -- monitor, either puts the buffer on retransmitQueue or, if the sequin has
    -- been destroyed, returns it to the pool.
    {
    owner: SequinPrivate.LongHandle ← LOOPHOLE[buffer.requeueData];
    DoRequeue: ENTRY PROCEDURE [sequin: Handle] = INLINE
      {
      sequin.buffersToRequeue ← sequin.buffersToRequeue - 1;
      IF sequin.state # destroyed THEN {
        Buffer.Enqueue[@sequin.retransmitQueue, buffer]; RETURN};
      Buffer.ReturnBuffer[buffer];
      -- last outstanding buffer of a destroyed sequin: signal goAhead
      -- (presumably for whoever is tearing the sequin down; verify)
      IF sequin.buffersToRequeue = 0 THEN NOTIFY sequin.goAhead;
      };
    DoRequeue[owner.shortHandle];
    };


  -- Internal Procedures --

  SendRequeueable: INTERNAL PROC [sequin: Handle, buffer: PupDefs.PupBuffer] =
    -- Hands `buffer` to the sequin's socket after arranging for Requeue to be
    -- called on it, and counts it in sequin.buffersToRequeue.
    -- INTERNAL: the caller must already hold the sequin's monitor lock.
    {
    backPointer: SequinPrivate.LongHandle ← [sequin, 0];
    sequin.buffersToRequeue ← sequin.buffersToRequeue + 1;
    -- Requeue recovers the sequin from requeueData
    buffer.requeueData ← LOOPHOLE[backPointer];
    buffer.requeueProcedure ← Requeue;
    sequin.socket.put[buffer];
    };

  Compare: PROCEDURE [a, b: SequinPrivate.SequenceNumber]
    RETURNS [{equal, duplicate, ahead}] =
    -- Classifies sequence number `a` relative to `b`, allowing for
    -- wrap-around: `a` is `ahead` when it lies within maxGetAhead above `b`
    -- (directly or after wrapping), `duplicate` when it lies behind, and
    -- `equal` when the two match.
    {
    maxGetAhead: CARDINAL = 128;
    IF a = b THEN RETURN[equal];
    IF a > b THEN
      RETURN[IF a <= b + maxGetAhead THEN ahead ELSE duplicate];
    -- here a < b
    RETURN[IF b <= a + maxGetAhead THEN duplicate ELSE ahead]
    };

  PupToSequinBuffer: PROC [b: PupDefs.PupBuffer] RETURNS [Sequin.Buffer] = INLINE
    -- Builds the client's view of a pup buffer: a pointer to the pup body, the
    -- current content length, and the fixed capacity SequinPrivate.maxBytes.
    BEGIN
    RETURN[[
      data: LOOPHOLE[@b.pup.pupBody],
      nBytes: PupDefs.GetPupContentsBytes[b],
      maxBytes: SequinPrivate.maxBytes]]
    END;

  -- Offset of the pup body from the start of a PupDefs.PupBuffer; subtracted
  -- from a Sequin.Buffer data pointer to recover the enclosing pup buffer.
  positionInBuffer: CARDINAL = 42;

  SequinToPupBuffer: PROC [b: Sequin.Buffer] RETURNS [PupDefs.PupBuffer] =
    -- Inverse of PupToSequinBuffer: recovers the pup buffer from the client's
    -- data pointer.  Raises BogusBuffer if that pointer is NIL.
    {
    -- Formerly: RETURN[LOOPHOLE[b.data-SIZE[pupBytes Buffer.PupBufferObject]]]
    -- **** WARNING: the definitions of PupDefs.PupBuffer and
    -- PupTypes.BufferBody can change in a way that makes positionInBuffer
    -- wrong even though the size check in the mainline code still passes.
    IF b.data # NIL THEN RETURN[LOOPHOLE[b.data - positionInBuffer]];
    ERROR BogusBuffer
    };

  -- mainline code:

  -- Start-up sanity check.  If the combined size of these two types no longer
  -- matches the value recorded here, their definitions have changed and
  -- positionInBuffer must be re-derived.
  expectedCombinedSize: CARDINAL = 67B + 12B;

  IF SIZE[pupBytes PupTypes.BufferBody] + SIZE[Buffer.BufferObject]
    # expectedCombinedSize THEN ERROR;

  END.