-- Copyright (C) 1981, 1982, 1984, 1985  by Xerox Corporation. All rights reserved. 
-- ReadInput.mesa, Transport Mechanism Mail Server - algorithm to process input queue --

-- HGM, 17-Oct-85 21:58:07
-- Randy Gobbel		19-May-81 23:20:09 --
-- Andrew Birrell	30-Dec-81 15:25:29 --
-- Mark Johnson 	 7-Jan-82 15:28:38 --
-- Ted Wobber	 	 2-Nov-82 11:08:23
-- Brenda Hankins 	15-Aug-84 16:19:11

DIRECTORY
  BodyDefs USING [maxRNameLength, PackedTime, RName, Timestamp],
  Buffer USING [AccessHandle, GetBuffer, MakePool],
  Heap USING [systemZone],
  HeapDefs USING [
    HeapReadData, HeapReadRName, HeapEndRead, HeapStartWrite, HeapWriteData,
    HeapWriteRName, ObjectNumber, ReaderHandle, ReadRList, WriterHandle],
  Inline USING [LongCOPY],
  LogDefs USING [DisplayNumber, WriteChar, WriteLogEntry],
  LogPrivateDefs USING [startUpTime],
  MailboxDefs USING [MBXWrite, Remail, WaitForUnlocked],
  NameInfoDefs USING [Close, Enumerate],
  PolicyDefs USING [
    CheckOperation, EndOperation, Operation, ReadPendingPause, Wait, WaitOperation],
  PupDefs USING [
    PupAddress, PupBuffer, PupRouterSendThis, SetPupContentsWords,
    UniqueLocalPupAddress],
  Process USING [Detach, DisableTimeout, Pause],
  ProtocolDefs USING [AppendTimestamp, mailServerPollingSocket],
  RestartDefs USING [] --EXPORT -- ,
  ReturnDefs USING [BadGroup, BadRecipients, LongTerm],
  ServerDefs USING [EnumerateAll, ServerHandle],
  SiteCacheDefs USING [
    FindMBXSite, NeedToRemail, RecipientInfo, RemailInfo, SingleFlush],
  SLDefs USING [
    GetCount, SLHeader, SLStartRead, SLEndRead, SLWrite, SLQueue, SLReadHandle,
    SLTransfer, WaitForNonEmpty],
  String USING [AppendChar, AppendDecimal, AppendString, AppendLongDecimal, EquivalentStrings],
  Time USING [Current, Packed, Unpack];

ReadInput: MONITOR
  IMPORTS
    Buffer, Heap, HeapDefs, LogDefs, LogPrivateDefs, Inline, MailboxDefs, NameInfoDefs,
    PolicyDefs, PupDefs, Process, ProtocolDefs, ReturnDefs, ServerDefs,
    SiteCacheDefs, SLDefs, String, Time
  EXPORTS MailboxDefs, RestartDefs =
  BEGIN

  BadSL: ERROR = CODE;
  
  -- statistics --
  messageTotal, localTotal, remoteTotal, badTotal: LONG CARDINAL ← 0;
  foreignTotal, dlTotal, skippedTotal: LONG CARDINAL ← 0;

  recipientLog: LONG STRING ← Heap.systemZone.NEW[StringBody[150]];  -- about 10 recipients --
  NoteRecipient: PROC [name: BodyDefs.RName] =
    BEGIN
    dot: STRING = "..."L;
    IF recipientLog.length + 1 + name.length + 1 + dot.length < recipientLog.maxlength THEN
      BEGIN
      String.AppendChar[recipientLog, ' ];
      String.AppendString[recipientLog, name];
      END
    ELSE
      IF recipientLog.length + 1 + dot.length < recipientLog.maxlength THEN
        BEGIN
        String.AppendChar[recipientLog, ' ];
        String.AppendString[recipientLog, dot];
        END;
    END;

  -- time-outs --

  shortTermLimit: CARDINAL = 2 --days-- ;
  longTermLimit: CARDINAL = 4 --days-- ;
  timeoutDisabled: BOOLEAN ← TRUE;

  bufferPool: Buffer.AccessHandle;

  flushSource: PupDefs.PupAddress = PupDefs.UniqueLocalPupAddress[NIL];

  TellOtherServer: PROC [received: BodyDefs.Timestamp, name: BodyDefs.RName] =
    BEGIN
    -- non-local name, tell the M-Server that gave us the msg to flush it's cache
    IF received.net # 0 AND received.host # 0 THEN
      BEGIN
      b: PupDefs.PupBuffer = Buffer.GetBuffer[
        type: pup, aH: bufferPool, function: send];
      b.pup.dest ← [
        net: [received.net], host: [received.host],
        socket: ProtocolDefs.mailServerPollingSocket];
      b.pup.source ← flushSource;
      Inline.LongCOPY[
        from: name, to: @(b.pup.pupWords),
        nwords: SIZE[StringBody [name.length]]];
      PupDefs.SetPupContentsWords[b, SIZE[StringBody [name.length]]];
      b.pup.pupType ← LOOPHOLE[215B];
      PupDefs.PupRouterSendThis[b];
      END
    -- ELSE message came to us from a client -- ;
    END;

  Sort: PROCEDURE [
    handle: SLDefs.SLReadHandle, body: HeapDefs.ObjectNumber,
    SL: HeapDefs.ReaderHandle] =
    BEGIN
    -- Despite its name, this procedure has no sorting algorithm as such,
    -- since the delivery sites for a message are determined in a single
    -- pass through its steering list
    pendingSL: HeapDefs.WriterHandle ← NIL;
    lockedSL: HeapDefs.WriterHandle ← NIL;
    badList: HeapDefs.WriterHandle ← NIL;
    slHeader: SLDefs.SLHeader;
    localCount, remoteCount, pendingCount, badCount, lockCount: CARDINAL ← 0;
    foreignCount, dlCount, skippedCount: CARDINAL ← 0;

    MakeBad: PROC [recipient: BodyDefs.RName] =
      BEGIN
      IF badList = NIL THEN badList ← HeapDefs.HeapStartWrite[temp];
      HeapDefs.HeapWriteRName[badList, recipient];
      END;

    MakeLocked: PROC [recipient: BodyDefs.RName] =
      BEGIN
      IF lockedSL = NIL THEN
        BEGIN
        lockedSL ← HeapDefs.HeapStartWrite[SLpending];
        slHeader.server ← NIL;
        HeapDefs.HeapWriteData[lockedSL, [@slHeader, SIZE[SLDefs.SLHeader]]];
        END;
      HeapDefs.HeapWriteRName[lockedSL, recipient];
      lockCount ← lockCount + 1;
      END;

    MakePending: PROC [recipient: BodyDefs.RName] =
      BEGIN
      IF pendingSL = NIL THEN
        BEGIN
        pendingSL ← HeapDefs.HeapStartWrite[SLpending];
        slHeader.server ← NIL;
        HeapDefs.HeapWriteData[pendingSL, [@slHeader, SIZE[SLDefs.SLHeader]]];
        END;
      HeapDefs.HeapWriteRName[pendingSL, recipient];
      pendingCount ← pendingCount + 1;
      END;

    HaveIDoneThisOne: SIGNAL [new: BodyDefs.RName] RETURNS [BOOLEAN] = CODE;
    -- This signal is used to prevent recursive loops when expanding
    -- recursively defined DL's. --

    Deliver: PROCEDURE [recipient: BodyDefs.RName] RETURNS [done: BOOLEAN] =
      BEGIN  -- deliver to one recipient, who may be a group --
      DeliverLocal: ENTRY PROC RETURNS [info: SiteCacheDefs.RecipientInfo] =
        BEGIN  -- part of the cache-flush synchronization arrangements --
        info ←
          IF String.EquivalentStrings[recipient, "ExpressMail↑.ms"] THEN [
          notFound[]] ELSE SiteCacheDefs.FindMBXSite[recipient];
        WITH info SELECT FROM
          local =>
            IF MailboxDefs.MBXWrite[recipient, body, slHeader.created] THEN {
              LogDefs.WriteChar['L]; localCount ← localCount + 1}
            ELSE {LogDefs.WriteChar['M]; MakeLocked[recipient]};
          allDown, notFound => NULL;
          ENDCASE => TellOtherServer[slHeader.received, recipient];
        END;
      info: SiteCacheDefs.RecipientInfo = DeliverLocal[];
      done ← FALSE;
      WITH info SELECT FROM
        dl =>
          BEGIN
          oldBadList: HeapDefs.WriterHandle = badList;
          badList ← NIL;
          LogDefs.WriteChar['G];
	  dlCount ← dlCount + 1;
          IF ~(SIGNAL HaveIDoneThisOne[recipient]) THEN
	    BEGIN
            NameInfoDefs.Enumerate[
              members, Deliver !
              HaveIDoneThisOne =>
                IF String.EquivalentStrings[recipient, new] THEN RESUME [TRUE]];
	    END
	  ELSE skippedCount ← skippedCount + 1;
          IF badList # NIL THEN ReturnDefs.BadGroup[badList, body, recipient];
          badList ← oldBadList;
          NameInfoDefs.Close[members];
          END;
        foreign =>
          BEGIN
          LogDefs.WriteChar['X];
          IF ~(SIGNAL HaveIDoneThisOne[recipient]) THEN
	    BEGIN
            HeapDefs.ReadRList[
              members, Deliver !
              HaveIDoneThisOne =>
                IF String.EquivalentStrings[recipient, new] THEN RESUME [TRUE]];
	    END
	  ELSE skippedCount ← skippedCount + 1;
          HeapDefs.HeapEndRead[members];
          foreignCount ← foreignCount + 1;
          END;
        local => NULL -- done by DeliverLocal -- ;
        found =>
          BEGIN
          LogDefs.WriteChar['F];
          IF server.SL = NIL THEN
            BEGIN
            server.SL ← HeapDefs.HeapStartWrite[SLforward];
            slHeader.server ← server;
            HeapDefs.HeapWriteData[server.SL, [@slHeader, SIZE[SLDefs.SLHeader]]];
            END;
          HeapDefs.HeapWriteRName[server.SL, recipient];
          remoteCount ← remoteCount + 1;
          END;
        allDown => {LogDefs.WriteChar['P]; MakePending[recipient]};
        notFound =>
          BEGIN
          LogDefs.WriteChar['B];
          MakeBad[recipient];
          badCount ← badCount + 1;
          END;
        ENDCASE => ERROR;
      -- The loop calling this procedure may use lots of processor time:
      Process.Pause[1];
      END;

    BEGIN
    startTime: Time.Packed ← Time.Current[];
    reject: BOOLEAN;
    pendingSL ← NIL;
    badList ← NIL;
    BEGIN
    ended: BOOLEAN;
    used: CARDINAL;
    [ended, used] ← HeapDefs.HeapReadData[SL, [@slHeader, SIZE[SLDefs.SLHeader]]];
    IF used # SIZE[SLDefs.SLHeader] THEN ERROR BadSL[];
    reject ← ElapsedDays[LOOPHOLE[slHeader.created.time], longTermLimit]
      OR ElapsedDays[LOOPHOLE[slHeader.received.time], shortTermLimit];
    recipientLog.length ← 0;
    String.AppendString[recipientLog, "RecipientLog "L];
    ProtocolDefs.AppendTimestamp[recipientLog, slHeader.created];
    String.AppendChar[recipientLog, ':];
    UNTIL ended DO
      recipient: BodyDefs.RName = [BodyDefs.maxRNameLength];
      ended ← HeapDefs.HeapReadRName[SL, recipient];
      NoteRecipient[recipient];
      IF reject THEN MakeBad[recipient]
      ELSE [] ← Deliver[recipient ! HaveIDoneThisOne => RESUME [FALSE]];
      ENDLOOP;
    END;
    HeapDefs.HeapEndRead[SL];
    IF reject AND badList # NIL THEN
      BEGIN
      log: LONG STRING ← Heap.systemZone.NEW[StringBody[128]];
      ReturnDefs.LongTerm[badList, body];
      String.AppendString[log, "Long-term timeout on "L];
      ProtocolDefs.AppendTimestamp[log, slHeader.created];
      LogDefs.WriteLogEntry[log];
      Heap.systemZone.FREE[@log];
      END
    ELSE
      BEGIN
      EndServerSL: PROCEDURE [server: ServerDefs.ServerHandle] =
        BEGIN  -- this is executed inside the ServerInfo monitor --
        IF server.SL # NIL THEN
          BEGIN
          SLDefs.SLWrite[body, server.SL, forward];
          server.SL ← NIL;
          END;
        END;
      IF lockedSL # NIL THEN SLDefs.SLWrite[body, lockedSL, mailbox];
      IF pendingSL # NIL THEN SLDefs.SLWrite[body, pendingSL, pending];
      ServerDefs.EnumerateAll[EndServerSL];
      IF badList # NIL THEN ReturnDefs.BadRecipients[badList, body];
      IF pendingCount # 0 OR localCount # 0 OR remoteCount # 0 OR badCount # 0
        OR lockCount # 0 THEN
        BEGIN
        log: LONG STRING ← Heap.systemZone.NEW[StringBody[200]];
        called: BOOLEAN ← FALSE;
        Add: PROC [count: CARDINAL, text: STRING] =
          BEGIN
          IF count # 0 THEN
            BEGIN
            String.AppendString[log, ", "L];
            String.AppendDecimal[log, count];
            String.AppendString[log, text];
            called ← TRUE;
            END;
          END;
        String.AppendString[log, "Delivered "L];
        ProtocolDefs.AppendTimestamp[log, slHeader.created];
        Add[localCount, " local"L];
        Add[remoteCount, " remote"L];
        Add[pendingCount, " pending"L];
        Add[badCount, " bad"L];
        Add[lockCount, " locked"L];
        Add[foreignCount, " foreign"L];
        Add[dlCount, " DLs"L];
        Add[skippedCount, " DLs skipped"L];
        IF ~called THEN String.AppendString[log, ", no recipients"L];
        LogDefs.WriteLogEntry[log];
        Heap.systemZone.FREE[@log];
        LogDefs.WriteLogEntry[recipientLog];
        messageTotal ← messageTotal + 1;
        localTotal ← localTotal + localCount;
        remoteTotal ← remoteTotal + remoteCount;
        badTotal ← badTotal + badCount;
        foreignTotal ← foreignTotal + foreignCount;
        dlTotal ← dlTotal + dlCount;
        skippedTotal ← skippedTotal + skippedCount;
        END;
      END;
    SLDefs.SLEndRead[handle];
    LogFinished[slHeader.created, startTime];
    END;
    END;  -- Sort

  LogFinished: PROC [stamp: BodyDefs.Timestamp, startTime: Time.Packed] =
    BEGIN
    log: LONG STRING;
    seconds: LONG CARDINAL ← Time.Current[]-startTime;
    IF seconds < 30 THEN RETURN;
    log ← Heap.systemZone.NEW[StringBody[128]];
    String.AppendString[log, "Processing "L];
    ProtocolDefs.AppendTimestamp[log, stamp];
    String.AppendString[log, " took "L];
    String.AppendLongDecimal[log, seconds];
    String.AppendString[log, " seconds"L];
    LogDefs.WriteLogEntry[log];
    Heap.systemZone.FREE[@log];
    END;

  daySecs: LONG CARDINAL = 24 * LONG[60 * 60];

  ElapsedDays: PROC [from: BodyDefs.PackedTime, interval: CARDINAL]
    RETURNS [tooMany: BOOLEAN] =
    -- Determines whether more than "interval" working days have elapsed
    -- since "from".
    BEGIN
    -- For every five working days, we allow seven elapsed days --
    -- For under five working days, must correct for weekends --
    now: Time.Packed = Time.Current[];
    completeWeeks: CARDINAL = (interval / 5) * 7;
    partialWeek: CARDINAL = interval MOD 5;
    limit: LONG CARDINAL ← from + (completeWeeks + partialWeek) * daySecs;
    IF timeoutDisabled OR now < limit THEN RETURN[FALSE]  -- optimize most calls 
    ELSE
      BEGIN
      limitDay: CARDINAL = Time.Unpack[LOOPHOLE[limit, Time.Packed]].weekday;
      saturday: CARDINAL = 5;
      sunday: CARDINAL = 6;
      SELECT limitDay FROM
        saturday =>
          SELECT partialWeek FROM
            0 => NULL;  -- any time in the weekend is ok --
            ENDCASE => limit ← limit + 2 * daySecs;
        sunday =>
          SELECT partialWeek FROM
            0 => NULL;  -- any time in the weekend is ok --
            1 =>  -- limit should be set to end of following Monday --
              limit ← limit + 2 * daySecs;
            ENDCASE => limit ← limit + 2 * daySecs;
        ENDCASE =>
          IF limitDay < partialWeek THEN
            limit ← limit + MIN[partialWeek - limitDay, 2] * daySecs;
      RETURN[now > limit]
      END
    END;

  SeeIfTimeToStartTimeouts: PROCEDURE =
    BEGIN  -- check uptime to see if greater than 6 hours.
    timeoutDisabled ← Time.Current[] - LogPrivateDefs.startUpTime < 6 * 60 * 60
    END;

  forever: BOOLEAN ← FALSE;

  SortQueue: PROC [queue: SLDefs.SLQueue] =
    BEGIN
    which: Class = SELECT queue FROM input => normal, ENDCASE => other;
    operation: PolicyDefs.Operation =
      SELECT queue FROM
        input => readInput,
        pending => readPending,
        mailbox => readMailbox,
        ENDCASE => ERROR;
    DO
      IF timeoutDisabled THEN SeeIfTimeToStartTimeouts[];
      IF forever THEN SLDefs.WaitForNonEmpty[queue];
      SELECT queue FROM
        input => NULL;
        pending => PolicyDefs.ReadPendingPause[];
        mailbox => MailboxDefs.WaitForUnlocked[];
        ENDCASE => ERROR;
      ClaimSorter[which];
      THROUGH [0..SLDefs.GetCount[queue]) DO
        SL: HeapDefs.ReaderHandle;
        handle: SLDefs.SLReadHandle;
        body: HeapDefs.ObjectNumber;
        IF queue = input OR ~PolicyDefs.CheckOperation[operation] THEN
          BEGIN
          ReleaseSorter[which];
          IF forever THEN PolicyDefs.WaitOperation[operation]
          ELSE {IF ~PolicyDefs.CheckOperation[operation] THEN RETURN};
          ClaimSorter[which];
          END;
        [handle, body, SL] ← SLDefs.SLStartRead[queue];
        Sort[handle, body, SL];
        PolicyDefs.EndOperation[operation];
        ENDLOOP;
      ReleaseSorter[which];
      IF ~forever THEN EXIT;
      ENDLOOP;
    END;


  otherQueueCount: CARDINAL ← 0;  -- give priority to non input queues --
  waiters: CARDINAL ← 0;  -- number of processes wanting the sorter --
  sorterInUse: BOOLEAN ← FALSE;  -- mutual exclusion on calls of "Sort" --
  sorterCond: CONDITION;
  Class: TYPE = {normal, other};  -- class = other has priority --
  ClaimSorter: ENTRY PROC [which: Class] =
    BEGIN
    waiters ← waiters + 1;
    IF which = other THEN otherQueueCount ← otherQueueCount + 1;
    WHILE sorterInUse OR (which # other AND otherQueueCount # 0) DO
      WAIT sorterCond ENDLOOP;
    sorterInUse ← TRUE;
    waiters ← waiters - 1;
    END;

  ReleaseSorter: ENTRY PROC [which: Class] =
    BEGIN
    sorterInUse ← FALSE;
    IF which = other THEN otherQueueCount ← otherQueueCount - 1;
    BROADCAST sorterCond;
    END;

  USPS: PROC =
    BEGIN
    DO
      used: CARDINAL;
      handle: SLDefs.SLReadHandle;
      body: HeapDefs.ObjectNumber;
      SL: HeapDefs.ReaderHandle;
      slHeader: SLDefs.SLHeader;
      [handle, body, SL] ← SLDefs.SLStartRead[express];
      [, used] ← HeapDefs.HeapReadData[SL, [@slHeader, SIZE[SLDefs.SLHeader]]];
      IF used # SIZE[SLDefs.SLHeader] THEN ERROR BadSL[];
      HeapDefs.HeapEndRead[SL];
      UNTIL (Time.Current[]-slHeader.created.time) > 60*60 DO
        -- Delay all Express mail at least an hour
        PolicyDefs.Wait[mins: 1]; ENDLOOP;
      PolicyDefs.WaitOperation[readExpress];
      SLDefs.SLTransfer[handle, input];
      PolicyDefs.EndOperation[readExpress];
      LogExpressMotion[slHeader.created];
      ENDLOOP;
    END;

  LogExpressMotion: PROC [stamp: BodyDefs.Timestamp] =
    BEGIN
    log: LONG STRING ← Heap.systemZone.NEW[StringBody[128]];
    ProtocolDefs.AppendTimestamp[log, stamp];
    String.AppendString[log, " moved from Express to Input Queue"L];
    LogDefs.WriteLogEntry[log];
    Heap.systemZone.FREE[@log];
    END;

 -- The synchronization for sitecache flushing and possible remailing is
  -- complicated.  The cause of the complication is that "Deliver" might
  -- have found someone to be local but not placed the message in the
  -- mailbox, at the time that we flush the cache.  This is prevented by
  -- the entry procedures "DeliverLocal" and "FlushCache".

  FlushCacheAndRemail: PUBLIC PROC [who: BodyDefs.RName] =
    BEGIN
    FlushCache: ENTRY PROC = {SiteCacheDefs.SingleFlush[who]};
    info: SiteCacheDefs.RemailInfo;
    FlushCache[];
    info ← SiteCacheDefs.NeedToRemail[who];
    IF info # stillLocal THEN MailboxDefs.Remail[who: who, valid: info = remail];
    END;

  ReadInput1: PUBLIC PROCEDURE =
    BEGIN
    Process.DisableTimeout[@sorterCond];
    bufferPool ← Buffer.MakePool[send: 3, receive: 0];
    LogDefs.DisplayNumber["Local recipients"L, [long[@localTotal]] ];
    LogDefs.DisplayNumber["Remote recipients"L, [long[@remoteTotal]] ];
    LogDefs.DisplayNumber["Bad recipients"L, [long[@badTotal]] ];
    LogDefs.DisplayNumber["Foreign recipients"L, [long[@foreignTotal]] ];
    LogDefs.DisplayNumber["DLs expanded"L, [long[@dlTotal]] ];
    LogDefs.DisplayNumber["Nested DLs skipped"L, [long[@skippedTotal]] ];
    END;


  ReadInput2: PUBLIC PROCEDURE =
    BEGIN
    -- empty queues once synchronously, to get us off the ground
    SortQueue[queue: mailbox];
    SortQueue[queue: input];
    forever ← TRUE;
    Process.Detach[FORK SortQueue[queue: input]];
    Process.Detach[FORK SortQueue[queue: mailbox]];
    Process.Detach[FORK SortQueue[queue: pending]];
    Process.Detach[FORK USPS[]];
    END;

  END.

log:
August 84 - blh:	Klamath update
15-Aug-84 10:20:13 - blh:	disable timeouts for first 6 hours after startup.