-- Transport Mechanism Mail Server - algorithm to process input queue --

-- [Indigo]<Grapevine>MS>ReadInput.mesa

-- Randy Gobbel		19-May-81 23:20:09 --
-- Andrew Birrell	26-Oct-82 14:28:08 --
-- Mike Schroeder       18-Nov-82 11:18:50 --

DIRECTORY
BodyDefs	USING[ maxRNameLength, PackedTime, RName, Timestamp ],
HeapDefs	USING[ HeapReadData, HeapReadRName, HeapEndRead,
		       HeapStartWrite, HeapWriteData, HeapWriteRName,
		       ObjectNumber, ReaderHandle, ReadRList, WriterHandle ],
Inline		USING[ LongCOPY ],
LogDefs		USING[ --DisplayNumber,-- WriteChar, WriteLogEntry ],
MailboxDefs	USING[ MBXWrite, Remail, WaitForUnlocked ],
NameInfoDefs	USING[ Close, Enumerate ],
PolicyDefs	USING[ CheckOperation, EndOperation, ExpressAllowed, Operation,
		       ReadPendingPause, WaitOperation],
PupDefs		USING[ GetFreePupBuffer, PupAddress, PupBuffer,
		       PupRouterSendThis, SetPupContentsWords,
		       UniqueLocalPupAddress ],
Process		USING[ Detach, DisableTimeout, Pause ],
ProtocolDefs	USING[ AppendTimestamp, mailServerPollingSocket ],
RestartDefs	USING[] --EXPORT only--,
ReturnDefs	USING[ BadGroup, BadRecipients, LongTerm ],
ServerDefs	USING[ EnumerateAll, ServerHandle ],
SiteCacheDefs	USING[ FindMBXSite, NeedToRemail, RecipientInfo, RemailInfo,                       SingleFlush ],
SLDefs		USING[ GetCount, SLHeader, SLStartRead, SLEndRead, SLWrite,
		       SLQueue, SLReadHandle, SLTransfer, WaitForNonEmpty ],
String		USING[ AppendChar, AppendDecimal, AppendString,
		       EquivalentStrings ],
Time		USING[ Current, Packed, Unpack];

ReadInput: MONITOR
   IMPORTS HeapDefs, LogDefs, Inline, MailboxDefs, NameInfoDefs, PolicyDefs,
           PupDefs, Process, ProtocolDefs, ReturnDefs, ServerDefs,
           SiteCacheDefs, SLDefs, String, Time
   EXPORTS MailboxDefs, RestartDefs --PROGRAM-- =
BEGIN

-- statistics --
messageTotal: CARDINAL ← 0;
localTotal:   CARDINAL ← 0;
remoteTotal:  CARDINAL ← 0;
badTotal:     CARDINAL ← 0;

-- stats: -- recipientLog: STRING = [150]; -- about 10 recipients --
-- stats: -- 
-- stats: -- NoteRecipient: PROC[name: BodyDefs.RName] =
-- stats: --    BEGIN
-- stats: --    dot: STRING = "..."L;
-- stats: --    IF recipientLog.length + 1 + name.length + 1 + dot.length
-- stats: --          < recipientLog.maxlength
-- stats: --    THEN BEGIN
-- stats: --         String.AppendChar[recipientLog, ' ];
-- stats: --         String.AppendString[recipientLog, name];
-- stats: --         END
-- stats: --    ELSE IF recipientLog.length + 1 + dot.length
-- stats: --               < recipientLog.maxlength
-- stats: --         THEN BEGIN
-- stats: --              String.AppendChar[recipientLog, ' ];
-- stats: --              String.AppendString[recipientLog, dot];
-- stats: --              END;
-- stats: --    END;

-- time-outs --

shortTermLimit: CARDINAL = 2--days--;
longTermLimit:  CARDINAL = 4--days--;

flushSource: PupDefs.PupAddress = PupDefs.UniqueLocalPupAddress[NIL];

TellOtherServer: PROC[received: BodyDefs.Timestamp,
                      name: BodyDefs.RName] =
   BEGIN
   -- name was non-local, so tell the M-Server that gave us the
   -- message to flush it's cache --
   IF received.net # 0 AND received.host # 0
   THEN BEGIN
        b: PupDefs.PupBuffer = PupDefs.GetFreePupBuffer[];
        b.dest ← [ net:[received.net],
                   host:[received.host],
                   socket: ProtocolDefs.mailServerPollingSocket ];
        b.source ← flushSource;
        Inline.LongCOPY[from: name, to: @(b.pupWords),
                        nwords: SIZE[StringBody[name.length]] ];
        PupDefs.SetPupContentsWords[b, SIZE[StringBody[name.length]]];
        b.pupType ← LOOPHOLE[215]; -- to get around difference in Pilot/Alto PupTypes
        PupDefs.PupRouterSendThis[b];
        END
-- ELSE message came to us from a client --;
   END;

Sort: PROCEDURE[handle: SLDefs.SLReadHandle,
                body: HeapDefs.ObjectNumber,
                SL: HeapDefs.ReaderHandle] =
   BEGIN
   -- Despite its name, this procedure has no sorting algorithm as such,
   -- since the delivery sites for a message are determined in a single
   -- pass through its steering list
   pendingSL:    HeapDefs.WriterHandle ← NIL;
   lockedSL:     HeapDefs.WriterHandle ← NIL;
   badList:      HeapDefs.WriterHandle ← NIL;
   slHeader:     SLDefs.SLHeader;
   localCount:   CARDINAL ← 0;
   remoteCount:  CARDINAL ← 0;
   pendingCount: CARDINAL ← 0;
   badCount:     CARDINAL ← 0;
   lockCount:    CARDINAL ← 0;

   MakeBad: PROC[recipient: BodyDefs.RName] RETURNS[done: BOOLEAN] =
      BEGIN
      done ← FALSE;
      IF badList = NIL THEN badList ← HeapDefs.HeapStartWrite[temp];
      HeapDefs.HeapWriteRName[badList, recipient];
      END;

   MakeLocked: PROC[recipient: BodyDefs.RName] =
      BEGIN
      IF lockedSL = NIL
      THEN BEGIN
           lockedSL ← HeapDefs.HeapStartWrite[SLpending];
           slHeader.server ← NIL;
           HeapDefs.HeapWriteData[lockedSL,
                  [@slHeader,SIZE[SLDefs.SLHeader]]];
           END;
      HeapDefs.HeapWriteRName[lockedSL, recipient];
      lockCount ← lockCount + 1;
      END;

   MakePending: PROC[recipient: BodyDefs.RName] =
      BEGIN
      IF pendingSL = NIL
      THEN BEGIN
           pendingSL ← HeapDefs.HeapStartWrite[SLpending];
           slHeader.server ← NIL;
           HeapDefs.HeapWriteData[pendingSL,
                  [@slHeader,SIZE[SLDefs.SLHeader]]];
           END;
      HeapDefs.HeapWriteRName[pendingSL,recipient];
      pendingCount ← pendingCount + 1;
      END;


   HaveIDoneThisOne: SIGNAL[new: BodyDefs.RName] RETURNS[ BOOLEAN ] = CODE;
   -- This signal is used to prevent recursive loops when expanding
   -- recursively defined DL's. --

   Deliver: PROCEDURE[recipient: BodyDefs.RName] RETURNS[done: BOOLEAN] =
      BEGIN -- deliver to one recipient, who may be a group --
      DeliverLocal: ENTRY PROC RETURNS[info: SiteCacheDefs.RecipientInfo] =
         BEGIN
         -- part of the cache-flush synchronization arrangements --
         info ← IF String.EquivalentStrings[recipient, "ExpressMail↑.ms"]
                THEN [notFound[]]
                ELSE SiteCacheDefs.FindMBXSite[recipient];
         WITH info SELECT FROM
           local =>
             IF MailboxDefs.MBXWrite[recipient, body, slHeader.created]
             THEN { LogDefs.WriteChar['L]; localCount ← localCount + 1 }
             ELSE { LogDefs.WriteChar['M]; MakeLocked[recipient] };
           allDown, notFound => NULL;
         ENDCASE => TellOtherServer[slHeader.received, recipient];
         END;
      info: SiteCacheDefs.RecipientInfo = DeliverLocal[];
      done ← FALSE;
      WITH info SELECT FROM
        dl =>
          BEGIN
          oldBadList: HeapDefs.WriterHandle = badList;
          badList ← NIL;
          LogDefs.WriteChar['G];
          IF NOT (SIGNAL HaveIDoneThisOne[recipient])
          THEN NameInfoDefs.Enumerate[members, Deliver !
                  HaveIDoneThisOne =>
                     IF String.EquivalentStrings[recipient, new]
                     THEN RESUME[TRUE]   ];
          IF badList # NIL
          THEN ReturnDefs.BadGroup[badList, body, recipient];
          badList ← oldBadList;
          NameInfoDefs.Close[members];
          END;
        foreign =>
          BEGIN
          LogDefs.WriteChar['X];
          IF NOT (SIGNAL HaveIDoneThisOne[recipient])
          THEN HeapDefs.ReadRList[members, Deliver !
                  HaveIDoneThisOne =>
                     IF String.EquivalentStrings[recipient, new]
                     THEN RESUME[TRUE]   ];
          HeapDefs.HeapEndRead[members];
          END;
        local => NULL -- done by DeliverLocal --;
        found =>
          BEGIN
          LogDefs.WriteChar['F];
          IF server.SL = NIL
          THEN BEGIN
               server.SL ← HeapDefs.HeapStartWrite[SLforward];
               slHeader.server ← server;
               HeapDefs.HeapWriteData[server.SL,
                       [@slHeader,SIZE[SLDefs.SLHeader]]];
               END;
          HeapDefs.HeapWriteRName[server.SL,recipient];
          remoteCount ← remoteCount + 1;
          END;
        allDown =>
          BEGIN
          LogDefs.WriteChar['P];
          MakePending[recipient];
          END;
        notFound =>
          BEGIN
          LogDefs.WriteChar['B];
          [] ← MakeBad[recipient];
          badCount ← badCount + 1;
          END;
      ENDCASE => ERROR;
      -- The loop calling this procedure may use lots of processor time --
      Process.Pause[1];
      END;

   BEGIN
      reject:     BOOLEAN;
      pendingSL ← NIL;
      badList ← NIL;
      localCount ← remoteCount ← pendingCount ← badCount ← lockCount ← 0;
      BEGIN
         BadSL:    ERROR = CODE;
         ended:    BOOLEAN;
         used:     CARDINAL;
         [ended,used] ←
             HeapDefs.HeapReadData[SL, [@slHeader,SIZE[SLDefs.SLHeader]] ];
         IF used # SIZE[SLDefs.SLHeader] THEN ERROR BadSL[];
         reject ← ElapsedDays[slHeader.created.time, longTermLimit]
               OR ElapsedDays[slHeader.received.time, shortTermLimit];
-- stats: -- recipientLog.length ← 0;
-- stats: -- String.AppendString[recipientLog, "RecipientLog "L];
-- stats: -- ProtocolDefs.AppendTimestamp[recipientLog, slHeader.created];
-- stats: -- String.AppendChar[recipientLog, ':];
         UNTIL ended
         DO recipient: BodyDefs.RName = [BodyDefs.maxRNameLength];
            ended ← HeapDefs.HeapReadRName[SL, recipient];
-- stats: -- NoteRecipient[recipient];
            IF reject
            THEN [] ← MakeBad[recipient]
            ELSE [] ← Deliver[recipient !
                         HaveIDoneThisOne => RESUME[FALSE] ];
         ENDLOOP;
      END;
      HeapDefs.HeapEndRead[SL];
      IF reject AND badList # NIL
      THEN BEGIN
           ReturnDefs.LongTerm[badList, body];
           BEGIN
              log: STRING = [128];
              String.AppendString[log, "Long-term timeout on "L];
              ProtocolDefs.AppendTimestamp[log, slHeader.created];
              LogDefs.WriteLogEntry[log];
           END;
           END
      ELSE BEGIN
           IF lockedSL # NIL
           THEN BEGIN
                SLDefs.SLWrite[body, lockedSL, mailbox];
                END;
           IF pendingSL # NIL
           THEN BEGIN
                SLDefs.SLWrite[body, pendingSL, pending];
                END;
           BEGIN
              EndServerSL: PROCEDURE[server: ServerDefs.ServerHandle] =
                 BEGIN
                 -- this is executed inside the ServerInfo monitor --
                 IF server.SL # NIL
                 THEN BEGIN
                      SLDefs.SLWrite[body, server.SL,
                         IF server.type = foreign THEN foreign ELSE forward];
                      server.SL ← NIL;
                      END;
                 END;
              ServerDefs.EnumerateAll[EndServerSL];
           END;
           IF badList # NIL
           THEN ReturnDefs.BadRecipients[badList, body];
           IF pendingCount = 0
           OR localCount # 0 OR remoteCount # 0 OR badCount # 0
           OR lockCount # 0
           THEN BEGIN
                log: STRING = [140];
                called: BOOLEAN ← FALSE;
                Add: PROC[count: CARDINAL, text: STRING] =
                   BEGIN
                   IF count # 0
                   THEN BEGIN
                        String.AppendString[log, ", "L];
                        String.AppendDecimal[log, count];
                        String.AppendString[log, text];
                        called ← TRUE;
                        END;
                   END;
                String.AppendString[log, "Delivered "L];
                ProtocolDefs.AppendTimestamp[log, slHeader.created];
                Add[localCount, " local"L];
                Add[remoteCount, " remote"L];
                Add[pendingCount, " pending"L];
                Add[badCount, " bad"L];
                Add[lockCount, " locked"L];
                IF NOT called
                THEN String.AppendString[log, ", no recipients"L];
                LogDefs.WriteLogEntry[log];
-- stats: --    LogDefs.WriteLogEntry[recipientLog];
                messageTotal ← messageTotal + 1;
                localTotal ← localTotal + localCount;
                remoteTotal ← remoteTotal + remoteCount;
                badTotal ← badTotal + badCount;
                END;
           END;
      SLDefs.SLEndRead[handle];
   END;

   END;

daySecs: LONG CARDINAL = 24 * LONG[60*60];

ElapsedDays: PROC[from: BodyDefs.PackedTime, interval: CARDINAL]
          RETURNS[tooMany: BOOLEAN] =
   -- Determines whether more than "interval" working days have elapsed
   -- since "from".
   BEGIN
   -- For every five working days, we allow seven elapsed days --
   -- For under five working days, must correct for weekends --
   now:           Time.Packed = Time.Current[];
   completeWeeks: CARDINAL = (interval/5)*7;
   partialWeek:   CARDINAL = interval MOD 5;
   limit:         LONG CARDINAL ← from +
                            (completeWeeks+partialWeek) * daySecs;
   IF now < limit
   THEN RETURN[FALSE] -- optimize most calls --
   ELSE BEGIN
        limitDay: CARDINAL = Time.Unpack[LOOPHOLE[limit,Time.Packed]].weekday;
        saturday: CARDINAL = 5;
        sunday:   CARDINAL = 6;
        SELECT limitDay FROM
          saturday =>
            SELECT partialWeek FROM
              0 => NULL; -- any time in the weekend is ok --
            ENDCASE => limit ← limit + 2 * daySecs;
          sunday =>
            SELECT partialWeek FROM
              0 => NULL; -- any time in the weekend is ok --
              1 => -- limit should be set to end of following Monday --
                   limit ← limit + 2 * daySecs;
            ENDCASE => limit ← limit + 2 * daySecs;
        ENDCASE =>
          IF limitDay < partialWeek
          THEN limit ← limit + MIN[partialWeek-limitDay,2] * daySecs;
        RETURN[ now > limit ]
        END
   END;


forever: BOOLEAN ← FALSE;
readPendingRunningTime: LONG CARDINAL ← 10 * 60; --seconds--

SortQueue: PROC[queue: SLDefs.SLQueue] =
   BEGIN
   which: Class = SELECT queue FROM
        input => normal,
        ENDCASE => other;
   operation: PolicyDefs.Operation = SELECT queue FROM
        input => readInput,
        pending => readPending,
        mailbox => readMailbox,
      ENDCASE => ERROR;
   DO exhaustion: LONG CARDINAL;
      IF forever THEN SLDefs.WaitForNonEmpty[queue];
      SELECT queue FROM
        input => NULL;
        pending => PolicyDefs.ReadPendingPause[];
        mailbox => MailboxDefs.WaitForUnlocked[];
      ENDCASE => ERROR;
      ClaimSorter[which];
      exhaustion ← Time.Current[] + readPendingRunningTime;
      THROUGH [1..SLDefs.GetCount[queue]]
      DO SL:     HeapDefs.ReaderHandle;
         handle: SLDefs.SLReadHandle;
         body:   HeapDefs.ObjectNumber;
         IF which # other OR NOT PolicyDefs.CheckOperation[operation]
         THEN BEGIN
              ReleaseSorter[which];
              IF forever
              THEN PolicyDefs.WaitOperation[operation]
              ELSE { IF NOT PolicyDefs.CheckOperation[operation] THEN RETURN };
              ClaimSorter[which];
              END;
         [handle, body, SL] ← SLDefs.SLStartRead[queue];
         Sort[handle, body, SL];
         PolicyDefs.EndOperation[operation];
         IF queue = input THEN ConsiderExpress[];
         IF queue = pending
         AND Time.Current[] > exhaustion
         AND waiters # 0
         THEN EXIT;
      ENDLOOP;
      ReleaseSorter[which];
      IF NOT forever THEN EXIT;
   ENDLOOP;
   END;


otherQueueCount: CARDINAL ← 0; -- give priority to non input queues --
waiters: CARDINAL ← 0; -- number of processes wanting the sorter --
sorterInUse: BOOLEAN ← FALSE; -- mutual exclusion on calls of "Sort" --
sorterCond: CONDITION;
Class: TYPE = { normal, other }; -- class = other has priority --

ClaimSorter: ENTRY PROC[which: Class] =
   BEGIN
   waiters ← waiters+1;
   IF which = other THEN otherQueueCount ← otherQueueCount+1;
   WHILE sorterInUse
   OR ( which # other AND otherQueueCount # 0 )
   DO WAIT sorterCond ENDLOOP;
   sorterInUse ← TRUE;
   waiters ← waiters-1;
   END;

ReleaseSorter: ENTRY PROC[which: Class] =
   BEGIN
   sorterInUse ← FALSE;
   IF which = other THEN otherQueueCount ← otherQueueCount-1;
   BROADCAST sorterCond;
   END;

pony: CONDITION ← [timeout:0];

USPS: PROC =
   BEGIN
   DO handle: SLDefs.SLReadHandle;
      body: HeapDefs.ObjectNumber;
      SL: HeapDefs.ReaderHandle;
      SLDefs.WaitForNonEmpty[express];
      CheckExpress[];
      PolicyDefs.WaitOperation[readExpress];
      IF PolicyDefs.ExpressAllowed[SLDefs.GetCount[input]]
      THEN BEGIN
           [handle, body, SL] ← SLDefs.SLStartRead[express];
           HeapDefs.HeapEndRead[SL];
           SLDefs.SLTransfer[handle, input];
           END;
      PolicyDefs.EndOperation[readExpress];
   ENDLOOP;
   END;

CheckExpress: ENTRY PROC = INLINE
   BEGIN
   UNTIL PolicyDefs.ExpressAllowed[SLDefs.GetCount[input]]
   DO WAIT pony ENDLOOP;
   END;

ConsiderExpress: ENTRY PROC = INLINE
   { NOTIFY pony };

-- The synchronization for sitecache flushing and possible remailing is
-- complicated.  The cause of the complication is that "Deliver" might
-- have found someone to be local but not placed the message in the
-- mailbox, at the time that we flush the cache.  This is prevented by
-- the entry procedures "DeliverLocal" and "FlushCache".

FlushCacheAndRemail: PUBLIC PROC[ who: BodyDefs.RName ] =
   BEGIN
   FlushCache: ENTRY PROC =
      { SiteCacheDefs.SingleFlush[who] };
   info: SiteCacheDefs.RemailInfo;
   FlushCache[];
   info ← SiteCacheDefs.NeedToRemail[who];
   IF info # stillLocal
   THEN MailboxDefs.Remail[who: who, valid: info = remail];
   END;


StartLog: PROCEDURE =
   BEGIN
--   LogDefs.DisplayNumber["Local recipients"L, [short[@localTotal]] ];
--   LogDefs.DisplayNumber["Remote recipients"L, [short[@remoteTotal]] ];
--   LogDefs.DisplayNumber["Bad recipients"L, [short[@badTotal]] ];
   END;

Process.DisableTimeout[@sorterCond];

StartLog[];

-- empty queues once synchronously, to get us off the ground --
SortQueue[queue: mailbox];
SortQueue[queue: input];

forever ← TRUE;

Process.Detach[ FORK SortQueue[queue: input] ];
Process.Detach[ FORK SortQueue[queue: mailbox] ];
Process.Detach[ FORK SortQueue[queue: pending] ];
Process.Detach[ FORK USPS[] ];

END.