-- Copyright (C) 1984, 1985  by Xerox Corporation. All rights reserved. 
-- BSPSinkNoDisk.mesa,HGM, 25-Jun-85  3:14:17

DIRECTORY
  Environment USING [Byte],
  Process USING [Yield],
  Put USING [Line],
  Stream USING [Handle, GetBlock, Delete],
  String USING [AppendChar, AppendString],
  Time USING [AppendCurrent],

  PupDefs USING [PupPackageMake, PupPackageDestroy, AppendHostName, veryLongWait],
  PupStream USING [
    CreatePupByteStreamListener, DestroyPupListener, RejectThisRequest,
    StreamClosing, PupListener, PupAddress],
  PupTypes USING [bspTestSoc],
  Stats USING [StatCounterIndex, StatIncr, StatBump, StatsStringToIndex];

BSPSinkNoDisk: MONITOR
  IMPORTS
    Process, Put, Stream, String, Time,
    PupDefs, PupStream, Stats =
  BEGIN OPEN Stats, PupDefs;
  
  -- Statistics counter indices; both are assigned by Init.
  statBytesReceived: PUBLIC Stats.StatCounterIndex;
  statConnectionsOpened: PUBLIC Stats.StatCounterIndex;

  -- Number of outstanding ListenerOn requests (see ListenerOn/ListenerOff).
  useCount: CARDINAL ← 0;
  -- Set by Stopper and cleared by Starter; polled by each Sink loop.
  pleaseStop: BOOLEAN ← FALSE;
  -- Written by ListenerOn/ListenerOff; not read anywhere in this module.
  running: BOOLEAN ← FALSE;
  -- Not referenced by the code in this module.
  verbose: BOOLEAN ← FALSE;
  -- When TRUE, Announce prints nothing.
  superQuiet: BOOLEAN ← FALSE;
  -- Handle returned by CreatePupByteStreamListener; released in Stopper.
  listener: PupStream.PupListener;

  -- Count of connections accepted by Check and not yet finished in Sink.
  sinks: CARDINAL ← 0;
  -- Check rejects new requests once sinks reaches this limit.
  maxSinks: CARDINAL ← 4;

  -- Look up the two statistics counters by their display strings and
  -- remember the resulting indices for later StatIncr/StatBump calls.
  Init: PROCEDURE =
    BEGIN
    statBytesReceived ←
      Stats.StatsStringToIndex["BSP Sink - Bytes received"];
    statConnectionsOpened ←
      Stats.StatsStringToIndex["BSP Sink - Connections opened"];
    END;

  -- Register one more user of the sink.  Only the transition from zero
  -- users to one user actually starts the listener.
  ListenerOn: PUBLIC PROCEDURE =
    BEGIN
    useCount ← useCount + 1;
    IF useCount # 1 THEN RETURN;  -- somebody else already started it
    running ← TRUE;
    Starter[];
    END;

  -- Bring the service up: clear the stop request, make the Pup package,
  -- and create a byte stream listener on the BSP test socket.  Sink is
  -- passed as the per-connection procedure and Check as the filter.
  Starter: PROCEDURE =
    BEGIN
    pleaseStop ← FALSE;
    [] ← PupDefs.PupPackageMake[];
    listener ←
      PupStream.CreatePupByteStreamListener[
        PupTypes.bspTestSoc, Sink, PupDefs.veryLongWait, Check];
    END;

  -- Admission filter handed to CreatePupByteStreamListener in Starter.
  -- Runs under the monitor lock so the test and increment of sinks are
  -- atomic with respect to the decrement done at the end of Sink.
  --   who: address of the requester (not used here).
  -- Raises PupStream.RejectThisRequest when maxSinks connections are
  -- already active; otherwise counts the new connection and bumps the
  -- "connections opened" statistic.
  Check: ENTRY PROCEDURE [who: PupStream.PupAddress] =
    BEGIN
    -- RETURN WITH ERROR releases the monitor lock before the error is
    -- raised.  Raising it directly from an ENTRY procedure (with no UNWIND
    -- catch phrase) would unwind this frame with the lock still held, so
    -- every later Check and the sinks decrement in Sink would block forever.
    IF sinks >= maxSinks THEN
      RETURN WITH ERROR PupStream.RejectThisRequest["Sorry, I'm full now."L];
    sinks ← sinks + 1;
    Stats.StatIncr[statConnectionsOpened];
    END;

  -- Drop one user of the sink.  A call with no registered users is
  -- ignored; only the transition from one user to zero stops the listener.
  ListenerOff: PUBLIC PROCEDURE =
    BEGIN
    IF useCount = 0 THEN RETURN;  -- unbalanced call, nothing to undo
    useCount ← useCount - 1;
    IF useCount # 0 THEN RETURN;  -- other users remain
    running ← FALSE;
    Stopper[];
    END;

  -- Take the service down: ask every Sink to quit, yield the processor
  -- until the active-connection count has drained to zero, then destroy
  -- the listener and the Pup package.
  Stopper: PROCEDURE =
    BEGIN
    pleaseStop ← TRUE;
    WHILE sinks # 0 DO
      Process.Yield[];
      ENDLOOP;
    PupStream.DestroyPupListener[listener];
    PupDefs.PupPackageDestroy[];
    END;

  -- Per-connection procedure handed to CreatePupByteStreamListener in
  -- Starter.  Reads blocks from the stream and discards them, adding each
  -- block's byte count to the "bytes received" statistic, until a stop is
  -- requested or PupStream.StreamClosing is raised.  It then deletes the
  -- stream and gives back the connection slot counted by Check.
  --   stream: the byte stream for this connection.
  --   who: address of the remote end, used only for the log lines.
  Sink: PROCEDURE [stream: Stream.Handle, who: PupStream.PupAddress] =
    BEGIN
    -- The decrement of sinks is done under the monitor lock.
    ReleaseSlot: ENTRY PROCEDURE =
      BEGIN
      sinks ← sinks - 1;
      END;
    clumpSize: CARDINAL = 512;
    clump: PACKED ARRAY [0..clumpSize) OF Environment.Byte;
    count: CARDINAL;
    Announce[who, "Creating"L];
    BEGIN
    -- StreamClosing simply ends the read loop; cleanup follows below.
    ENABLE PupStream.StreamClosing => CONTINUE;
    WHILE NOT pleaseStop DO
      [count, ] ← Stream.GetBlock[stream, [@clump, 0, clumpSize]];
      Stats.StatBump[statBytesReceived, count];
      ENDLOOP;
    END;
    Stream.Delete[stream];
    Announce[who, "Destroying"L];
    ReleaseSlot[];
    END;

  -- Print one log line of the form
  --   <current time>  BSP: <arg> BSP connection for <host name>.
  -- via Put.Line.  Nothing is printed when superQuiet is set.
  --   who: address whose host name is appended to the line.
  --   arg: verb describing the event (callers pass "Creating"/"Destroying").
  Announce: PROCEDURE [who: PupStream.PupAddress, arg: STRING] =
    BEGIN
    line: STRING = [100];
    IF NOT superQuiet THEN
      BEGIN
      Time.AppendCurrent[line];
      String.AppendString[line, "  BSP: "L];
      String.AppendString[line, arg];
      String.AppendString[line, " BSP connection for "L];
      PupDefs.AppendHostName[line, who];
      String.AppendChar[line, '.];
      Put.Line[NIL, line];
      END;
    END;


  -- Module initialization (main body), run when the module is started.
  -- Init comes first so the statistics indices used by Check and Sink are
  -- assigned before ListenerOn creates the listener that calls them.
  Init[];
  ListenerOn[];
  END.