-- Copyright (C) 1981, 1982, 1984, 1985 by Xerox Corporation. All rights reserved. -- ReadInput.mesa, Transport Mechanism Mail Server - algorithm to process input queue -- -- HGM, 21-May-85 23:57:55 -- Randy Gobbel 19-May-81 23:20:09 -- -- Andrew Birrell 30-Dec-81 15:25:29 -- -- Mark Johnson 7-Jan-82 15:28:38 -- -- Ted Wobber 2-Nov-82 11:08:23 -- Brenda Hankins 15-Aug-84 16:19:11 DIRECTORY BodyDefs USING [maxRNameLength, PackedTime, RName, Timestamp], Buffer USING [AccessHandle, GetBuffer, MakePool], HeapDefs USING [ HeapReadData, HeapReadRName, HeapEndRead, HeapStartWrite, HeapWriteData, HeapWriteRName, ObjectNumber, ReaderHandle, ReadRList, WriterHandle], Inline USING [LongCOPY], LogDefs USING [DisplayNumber, WriteChar, WriteLogEntry], LogPrivateDefs USING [startUpTime], MailboxDefs USING [MBXWrite, Remail, WaitForUnlocked], NameInfoDefs USING [Close, Enumerate], PolicyDefs USING [ CheckOperation, EndOperation, Operation, ReadPendingPause, Wait, WaitOperation], PupDefs USING [ PupAddress, PupBuffer, PupRouterSendThis, SetPupContentsWords, UniqueLocalPupAddress], Process USING [Detach, DisableTimeout, Pause], ProtocolDefs USING [AppendTimestamp, mailServerPollingSocket], RestartDefs USING [] --EXPORT -- , ReturnDefs USING [BadGroup, BadRecipients, LongTerm], ServerDefs USING [EnumerateAll, ServerHandle], SiteCacheDefs USING [ FindMBXSite, NeedToRemail, RecipientInfo, RemailInfo, SingleFlush], SLDefs USING [ GetCount, SLHeader, SLStartRead, SLEndRead, SLWrite, SLQueue, SLReadHandle, SLTransfer, WaitForNonEmpty], String USING [AppendChar, AppendDecimal, AppendString, AppendLongDecimal, EquivalentStrings], Time USING [Current, Packed, Unpack]; ReadInput: MONITOR IMPORTS Buffer, HeapDefs, LogDefs, LogPrivateDefs, Inline, MailboxDefs, NameInfoDefs, PolicyDefs, PupDefs, Process, ProtocolDefs, ReturnDefs, ServerDefs, SiteCacheDefs, SLDefs, String, Time EXPORTS MailboxDefs, RestartDefs = BEGIN BadSL: ERROR = CODE; -- statistics -- messageTotal, localTotal, remoteTotal, badTotal: LONG CARDINAL ← 0; foreignTotal, dlTotal, skippedTotal: LONG CARDINAL ← 0; recipientLog: STRING = [150]; -- about 10 recipients -- NoteRecipient: PROC [name: BodyDefs.RName] = BEGIN dot: STRING = "..."L; IF recipientLog.length + 1 + name.length + 1 + dot.length < recipientLog.maxlength THEN BEGIN String.AppendChar[recipientLog, ' ]; String.AppendString[recipientLog, name]; END ELSE IF recipientLog.length + 1 + dot.length < recipientLog.maxlength THEN BEGIN String.AppendChar[recipientLog, ' ]; String.AppendString[recipientLog, dot]; END; END; -- time-outs -- shortTermLimit: CARDINAL = 2 --days-- ; longTermLimit: CARDINAL = 4 --days-- ; timeoutDisabled: BOOLEAN ← TRUE; bufferPool: Buffer.AccessHandle; flushSource: PupDefs.PupAddress = PupDefs.UniqueLocalPupAddress[NIL]; TellOtherServer: PROC [received: BodyDefs.Timestamp, name: BodyDefs.RName] = BEGIN -- non-local name, tell the M-Server that gave us the msg to flush it's cache IF received.net # 0 AND received.host # 0 THEN BEGIN b: PupDefs.PupBuffer = Buffer.GetBuffer[ type: pup, aH: bufferPool, function: send]; b.pup.dest ← [ net: [received.net], host: [received.host], socket: ProtocolDefs.mailServerPollingSocket]; b.pup.source ← flushSource; Inline.LongCOPY[ from: name, to: @(b.pup.pupWords), nwords: SIZE[StringBody [name.length]]]; PupDefs.SetPupContentsWords[b, SIZE[StringBody [name.length]]]; b.pup.pupType ← LOOPHOLE[215B]; PupDefs.PupRouterSendThis[b]; END -- ELSE message came to us from a client -- ; END; Sort: PROCEDURE [ handle: SLDefs.SLReadHandle, body: HeapDefs.ObjectNumber, SL: HeapDefs.ReaderHandle] = BEGIN -- Despite its name, this procedure has no sorting algorithm as such, -- since the delivery sites for a message are determined in a single -- pass through its steering list pendingSL: HeapDefs.WriterHandle ← NIL; lockedSL: HeapDefs.WriterHandle ← NIL; badList: HeapDefs.WriterHandle ← NIL; slHeader: SLDefs.SLHeader; localCount, remoteCount, pendingCount, badCount, lockCount: CARDINAL ← 0; foreignCount, dlCount, skippedCount: CARDINAL ← 0; MakeBad: PROC [recipient: BodyDefs.RName] = BEGIN IF badList = NIL THEN badList ← HeapDefs.HeapStartWrite[temp]; HeapDefs.HeapWriteRName[badList, recipient]; END; MakeLocked: PROC [recipient: BodyDefs.RName] = BEGIN IF lockedSL = NIL THEN BEGIN lockedSL ← HeapDefs.HeapStartWrite[SLpending]; slHeader.server ← NIL; HeapDefs.HeapWriteData[lockedSL, [@slHeader, SIZE[SLDefs.SLHeader]]]; END; HeapDefs.HeapWriteRName[lockedSL, recipient]; lockCount ← lockCount + 1; END; MakePending: PROC [recipient: BodyDefs.RName] = BEGIN IF pendingSL = NIL THEN BEGIN pendingSL ← HeapDefs.HeapStartWrite[SLpending]; slHeader.server ← NIL; HeapDefs.HeapWriteData[pendingSL, [@slHeader, SIZE[SLDefs.SLHeader]]]; END; HeapDefs.HeapWriteRName[pendingSL, recipient]; pendingCount ← pendingCount + 1; END; HaveIDoneThisOne: SIGNAL [new: BodyDefs.RName] RETURNS [BOOLEAN] = CODE; -- This signal is used to prevent recursive loops when expanding -- recursively defined DL's. -- Deliver: PROCEDURE [recipient: BodyDefs.RName] RETURNS [done: BOOLEAN] = BEGIN -- deliver to one recipient, who may be a group -- DeliverLocal: ENTRY PROC RETURNS [info: SiteCacheDefs.RecipientInfo] = BEGIN -- part of the cache-flush synchronization arrangements -- info ← IF String.EquivalentStrings[recipient, "ExpressMail↑.ms"] THEN [ notFound[]] ELSE SiteCacheDefs.FindMBXSite[recipient]; WITH info SELECT FROM local => IF MailboxDefs.MBXWrite[recipient, body, slHeader.created] THEN { LogDefs.WriteChar['L]; localCount ← localCount + 1} ELSE {LogDefs.WriteChar['M]; MakeLocked[recipient]}; allDown, notFound => NULL; ENDCASE => TellOtherServer[slHeader.received, recipient]; END; info: SiteCacheDefs.RecipientInfo = DeliverLocal[]; done ← FALSE; WITH info SELECT FROM dl => BEGIN oldBadList: HeapDefs.WriterHandle = badList; badList ← NIL; LogDefs.WriteChar['G]; dlCount ← dlCount + 1; IF ~(SIGNAL HaveIDoneThisOne[recipient]) THEN BEGIN NameInfoDefs.Enumerate[ members, Deliver ! HaveIDoneThisOne => IF String.EquivalentStrings[recipient, new] THEN RESUME [TRUE]]; END ELSE skippedCount ← skippedCount + 1; IF badList # NIL THEN ReturnDefs.BadGroup[badList, body, recipient]; badList ← oldBadList; NameInfoDefs.Close[members]; END; foreign => BEGIN LogDefs.WriteChar['X]; IF ~(SIGNAL HaveIDoneThisOne[recipient]) THEN BEGIN HeapDefs.ReadRList[ members, Deliver ! HaveIDoneThisOne => IF String.EquivalentStrings[recipient, new] THEN RESUME [TRUE]]; END ELSE skippedCount ← skippedCount + 1; HeapDefs.HeapEndRead[members]; foreignCount ← foreignCount + 1; END; local => NULL -- done by DeliverLocal -- ; found => BEGIN LogDefs.WriteChar['F]; IF server.SL = NIL THEN BEGIN server.SL ← HeapDefs.HeapStartWrite[SLforward]; slHeader.server ← server; HeapDefs.HeapWriteData[server.SL, [@slHeader, SIZE[SLDefs.SLHeader]]]; END; HeapDefs.HeapWriteRName[server.SL, recipient]; remoteCount ← remoteCount + 1; END; allDown => {LogDefs.WriteChar['P]; MakePending[recipient]}; notFound => BEGIN LogDefs.WriteChar['B]; MakeBad[recipient]; badCount ← badCount + 1; END; ENDCASE => ERROR; -- The loop calling this procedure may use lots of processor time: Process.Pause[1]; END; BEGIN startTime: Time.Packed ← Time.Current[]; reject: BOOLEAN; pendingSL ← NIL; badList ← NIL; BEGIN ended: BOOLEAN; used: CARDINAL; [ended, used] ← HeapDefs.HeapReadData[SL, [@slHeader, SIZE[SLDefs.SLHeader]]]; IF used # SIZE[SLDefs.SLHeader] THEN ERROR BadSL[]; reject ← ElapsedDays[LOOPHOLE[slHeader.created.time], longTermLimit] OR ElapsedDays[LOOPHOLE[slHeader.received.time], shortTermLimit]; recipientLog.length ← 0; String.AppendString[recipientLog, "RecipientLog "L]; ProtocolDefs.AppendTimestamp[recipientLog, slHeader.created]; String.AppendChar[recipientLog, ':]; UNTIL ended DO recipient: BodyDefs.RName = [BodyDefs.maxRNameLength]; ended ← HeapDefs.HeapReadRName[SL, recipient]; NoteRecipient[recipient]; IF reject THEN MakeBad[recipient] ELSE [] ← Deliver[recipient ! HaveIDoneThisOne => RESUME [FALSE]]; ENDLOOP; END; HeapDefs.HeapEndRead[SL]; IF reject AND badList # NIL THEN BEGIN log: STRING = [128]; ReturnDefs.LongTerm[badList, body]; String.AppendString[log, "Long-term timeout on "L]; ProtocolDefs.AppendTimestamp[log, slHeader.created]; LogDefs.WriteLogEntry[log]; END ELSE BEGIN EndServerSL: PROCEDURE [server: ServerDefs.ServerHandle] = BEGIN -- this is executed inside the ServerInfo monitor -- IF server.SL # NIL THEN BEGIN SLDefs.SLWrite[body, server.SL, forward]; server.SL ← NIL; END; END; IF lockedSL # NIL THEN SLDefs.SLWrite[body, lockedSL, mailbox]; IF pendingSL # NIL THEN SLDefs.SLWrite[body, pendingSL, pending]; ServerDefs.EnumerateAll[EndServerSL]; IF badList # NIL THEN ReturnDefs.BadRecipients[badList, body]; IF pendingCount = 0 OR localCount # 0 OR remoteCount # 0 OR badCount # 0 OR lockCount # 0 THEN BEGIN log: STRING = [180]; called: BOOLEAN ← FALSE; Add: PROC [count: CARDINAL, text: STRING] = BEGIN IF count # 0 THEN BEGIN String.AppendString[log, ", "L]; String.AppendDecimal[log, count]; String.AppendString[log, text]; called ← TRUE; END; END; String.AppendString[log, "Delivered "L]; ProtocolDefs.AppendTimestamp[log, slHeader.created]; Add[localCount, " local"L]; Add[remoteCount, " remote"L]; Add[pendingCount, " pending"L]; Add[badCount, " bad"L]; Add[lockCount, " locked"L]; Add[foreignCount, " foreign"L]; Add[dlCount, " DLs"L]; Add[skippedCount, " DLs skipped"L]; IF ~called THEN String.AppendString[log, ", no recipients"L]; LogDefs.WriteLogEntry[log]; LogDefs.WriteLogEntry[recipientLog]; messageTotal ← messageTotal + 1; localTotal ← localTotal + localCount; remoteTotal ← remoteTotal + remoteCount; badTotal ← badTotal + badCount; foreignTotal ← foreignTotal + foreignCount; dlTotal ← dlTotal + dlCount; skippedTotal ← skippedTotal + skippedCount; END; END; SLDefs.SLEndRead[handle]; LogFinished[slHeader.created, startTime]; END; END; -- Sort LogFinished: PROC [stamp: BodyDefs.Timestamp, startTime: Time.Packed] = BEGIN seconds: LONG CARDINAL ← Time.Current[]-startTime; log: STRING = [128]; IF seconds < 30 THEN RETURN; String.AppendString[log, "Processing "L]; ProtocolDefs.AppendTimestamp[log, stamp]; String.AppendString[log, " took "L]; String.AppendLongDecimal[log, seconds]; String.AppendString[log, " seconds"L]; LogDefs.WriteLogEntry[log]; END; daySecs: LONG CARDINAL = 24 * LONG[60 * 60]; ElapsedDays: PROC [from: BodyDefs.PackedTime, interval: CARDINAL] RETURNS [tooMany: BOOLEAN] = -- Determines whether more than "interval" working days have elapsed -- since "from". BEGIN -- For every five working days, we allow seven elapsed days -- -- For under five working days, must correct for weekends -- now: Time.Packed = Time.Current[]; completeWeeks: CARDINAL = (interval / 5) * 7; partialWeek: CARDINAL = interval MOD 5; limit: LONG CARDINAL ← from + (completeWeeks + partialWeek) * daySecs; IF timeoutDisabled OR now < limit THEN RETURN[FALSE] -- optimize most calls ELSE BEGIN limitDay: CARDINAL = Time.Unpack[LOOPHOLE[limit, Time.Packed]].weekday; saturday: CARDINAL = 5; sunday: CARDINAL = 6; SELECT limitDay FROM saturday => SELECT partialWeek FROM 0 => NULL; -- any time in the weekend is ok -- ENDCASE => limit ← limit + 2 * daySecs; sunday => SELECT partialWeek FROM 0 => NULL; -- any time in the weekend is ok -- 1 => -- limit should be set to end of following Monday -- limit ← limit + 2 * daySecs; ENDCASE => limit ← limit + 2 * daySecs; ENDCASE => IF limitDay < partialWeek THEN limit ← limit + MIN[partialWeek - limitDay, 2] * daySecs; RETURN[now > limit] END END; SeeIfTimeToStartTimeouts: PROCEDURE = BEGIN -- check uptime to see if greater than 6 hours. timeoutDisabled ← Time.Current[] - LogPrivateDefs.startUpTime < 6 * 60 * 60 END; forever: BOOLEAN ← FALSE; SortQueue: PROC [queue: SLDefs.SLQueue] = BEGIN which: Class = SELECT queue FROM input => normal, ENDCASE => other; operation: PolicyDefs.Operation = SELECT queue FROM input => readInput, pending => readPending, mailbox => readMailbox, ENDCASE => ERROR; DO IF timeoutDisabled THEN SeeIfTimeToStartTimeouts[]; IF forever THEN SLDefs.WaitForNonEmpty[queue]; SELECT queue FROM input => NULL; pending => PolicyDefs.ReadPendingPause[]; mailbox => MailboxDefs.WaitForUnlocked[]; ENDCASE => ERROR; ClaimSorter[which]; THROUGH [0..SLDefs.GetCount[queue]) DO SL: HeapDefs.ReaderHandle; handle: SLDefs.SLReadHandle; body: HeapDefs.ObjectNumber; IF queue = input OR ~PolicyDefs.CheckOperation[operation] THEN BEGIN ReleaseSorter[which]; IF forever THEN PolicyDefs.WaitOperation[operation] ELSE {IF ~PolicyDefs.CheckOperation[operation] THEN RETURN}; ClaimSorter[which]; END; [handle, body, SL] ← SLDefs.SLStartRead[queue]; Sort[handle, body, SL]; PolicyDefs.EndOperation[operation]; ENDLOOP; ReleaseSorter[which]; IF ~forever THEN EXIT; ENDLOOP; END; otherQueueCount: CARDINAL ← 0; -- give priority to non input queues -- waiters: CARDINAL ← 0; -- number of processes wanting the sorter -- sorterInUse: BOOLEAN ← FALSE; -- mutual exclusion on calls of "Sort" -- sorterCond: CONDITION; Class: TYPE = {normal, other}; -- class = other has priority -- ClaimSorter: ENTRY PROC [which: Class] = BEGIN waiters ← waiters + 1; IF which = other THEN otherQueueCount ← otherQueueCount + 1; WHILE sorterInUse OR (which # other AND otherQueueCount # 0) DO WAIT sorterCond ENDLOOP; sorterInUse ← TRUE; waiters ← waiters - 1; END; ReleaseSorter: ENTRY PROC [which: Class] = BEGIN sorterInUse ← FALSE; IF which = other THEN otherQueueCount ← otherQueueCount - 1; BROADCAST sorterCond; END; USPS: PROC = BEGIN DO used: CARDINAL; handle: SLDefs.SLReadHandle; body: HeapDefs.ObjectNumber; SL: HeapDefs.ReaderHandle; slHeader: SLDefs.SLHeader; [handle, body, SL] ← SLDefs.SLStartRead[express]; [, used] ← HeapDefs.HeapReadData[SL, [@slHeader, SIZE[SLDefs.SLHeader]]]; IF used # SIZE[SLDefs.SLHeader] THEN ERROR BadSL[]; HeapDefs.HeapEndRead[SL]; UNTIL (Time.Current[]-slHeader.created.time) > 60*60 DO -- Delay all Express mail at least an hour PolicyDefs.Wait[mins: 1]; ENDLOOP; PolicyDefs.WaitOperation[readExpress]; SLDefs.SLTransfer[handle, input]; PolicyDefs.EndOperation[readExpress]; LogExpressMotion[slHeader.created]; ENDLOOP; END; LogExpressMotion: PROC [stamp: BodyDefs.Timestamp] = BEGIN log: STRING = [128]; ProtocolDefs.AppendTimestamp[log, stamp]; String.AppendString[log, " moved from Express to Input Queue"L]; LogDefs.WriteLogEntry[log]; END; -- The synchronization for sitecache flushing and possible remailing is -- complicated. The cause of the complication is that "Deliver" might -- have found someone to be local but not placed the message in the -- mailbox, at the time that we flush the cache. This is prevented by -- the entry procedures "DeliverLocal" and "FlushCache". FlushCacheAndRemail: PUBLIC PROC [who: BodyDefs.RName] = BEGIN FlushCache: ENTRY PROC = {SiteCacheDefs.SingleFlush[who]}; info: SiteCacheDefs.RemailInfo; FlushCache[]; info ← SiteCacheDefs.NeedToRemail[who]; IF info # stillLocal THEN MailboxDefs.Remail[who: who, valid: info = remail]; END; ReadInput1: PUBLIC PROCEDURE = BEGIN Process.DisableTimeout[@sorterCond]; bufferPool ← Buffer.MakePool[send: 3, receive: 0]; LogDefs.DisplayNumber["Local recipients"L, [long[@localTotal]] ]; LogDefs.DisplayNumber["Remote recipients"L, [long[@remoteTotal]] ]; LogDefs.DisplayNumber["Bad recipients"L, [long[@badTotal]] ]; LogDefs.DisplayNumber["Foreign recipients"L, [long[@foreignTotal]] ]; LogDefs.DisplayNumber["DLs expanded"L, [long[@dlTotal]] ]; LogDefs.DisplayNumber["Nested DLs skipped"L, [long[@skippedTotal]] ]; END; ReadInput2: PUBLIC PROCEDURE = BEGIN -- empty queues once synchronously, to get us off the ground SortQueue[queue: mailbox]; SortQueue[queue: input]; forever ← TRUE; Process.Detach[FORK SortQueue[queue: input]]; Process.Detach[FORK SortQueue[queue: mailbox]]; Process.Detach[FORK SortQueue[queue: pending]]; Process.Detach[FORK USPS[]]; END; END. log: August 84 - blh: Klamath update 15-Aug-84 10:20:13 - blh: disable timeouts for first 6 hours after startup.