-- Transport Mechanism Mail Server - algorithm to process input queue -- -- [Indigo]MS>ReadInput.mesa -- Randy Gobbel 19-May-81 23:20:09 -- -- Andrew Birrell 26-Oct-82 14:28:08 -- -- Mike Schroeder 18-Nov-82 11:18:50 -- DIRECTORY BodyDefs USING[ maxRNameLength, PackedTime, RName, Timestamp ], HeapDefs USING[ HeapReadData, HeapReadRName, HeapEndRead, HeapStartWrite, HeapWriteData, HeapWriteRName, ObjectNumber, ReaderHandle, ReadRList, WriterHandle ], Inline USING[ LongCOPY ], LogDefs USING[ --DisplayNumber,-- WriteChar, WriteLogEntry ], MailboxDefs USING[ MBXWrite, Remail, WaitForUnlocked ], NameInfoDefs USING[ Close, Enumerate ], PolicyDefs USING[ CheckOperation, EndOperation, ExpressAllowed, Operation, ReadPendingPause, WaitOperation], PupDefs USING[ GetFreePupBuffer, PupAddress, PupBuffer, PupRouterSendThis, SetPupContentsWords, UniqueLocalPupAddress ], Process USING[ Detach, DisableTimeout, Pause ], ProtocolDefs USING[ AppendTimestamp, mailServerPollingSocket ], RestartDefs USING[] --EXPORT only--, ReturnDefs USING[ BadGroup, BadRecipients, LongTerm ], ServerDefs USING[ EnumerateAll, ServerHandle ], SiteCacheDefs USING[ FindMBXSite, NeedToRemail, RecipientInfo, RemailInfo, SingleFlush ], SLDefs USING[ GetCount, SLHeader, SLStartRead, SLEndRead, SLWrite, SLQueue, SLReadHandle, SLTransfer, WaitForNonEmpty ], String USING[ AppendChar, AppendDecimal, AppendString, EquivalentStrings ], Time USING[ Current, Packed, Unpack]; ReadInput: MONITOR IMPORTS HeapDefs, LogDefs, Inline, MailboxDefs, NameInfoDefs, PolicyDefs, PupDefs, Process, ProtocolDefs, ReturnDefs, ServerDefs, SiteCacheDefs, SLDefs, String, Time EXPORTS MailboxDefs, RestartDefs --PROGRAM-- = BEGIN -- statistics -- messageTotal: CARDINAL _ 0; localTotal: CARDINAL _ 0; remoteTotal: CARDINAL _ 0; badTotal: CARDINAL _ 0; -- stats: -- recipientLog: STRING = [150]; -- about 10 recipients -- -- stats: -- -- stats: -- NoteRecipient: PROC[name: BodyDefs.RName] = -- stats: -- BEGIN -- stats: -- dot: STRING = "..."L; -- stats: -- IF recipientLog.length + 1 + name.length + 1 + dot.length -- stats: -- < recipientLog.maxlength -- stats: -- THEN BEGIN -- stats: -- String.AppendChar[recipientLog, ' ]; -- stats: -- String.AppendString[recipientLog, name]; -- stats: -- END -- stats: -- ELSE IF recipientLog.length + 1 + dot.length -- stats: -- < recipientLog.maxlength -- stats: -- THEN BEGIN -- stats: -- String.AppendChar[recipientLog, ' ]; -- stats: -- String.AppendString[recipientLog, dot]; -- stats: -- END; -- stats: -- END; -- time-outs -- shortTermLimit: CARDINAL = 2--days--; longTermLimit: CARDINAL = 4--days--; flushSource: PupDefs.PupAddress = PupDefs.UniqueLocalPupAddress[NIL]; TellOtherServer: PROC[received: BodyDefs.Timestamp, name: BodyDefs.RName] = BEGIN -- name was non-local, so tell the M-Server that gave us the -- message to flush it's cache -- IF received.net # 0 AND received.host # 0 THEN BEGIN b: PupDefs.PupBuffer = PupDefs.GetFreePupBuffer[]; b.dest _ [ net:[received.net], host:[received.host], socket: ProtocolDefs.mailServerPollingSocket ]; b.source _ flushSource; Inline.LongCOPY[from: name, to: @(b.pupWords), nwords: SIZE[StringBody[name.length]] ]; PupDefs.SetPupContentsWords[b, SIZE[StringBody[name.length]]]; b.pupType _ LOOPHOLE[215]; -- to get around difference in Pilot/Alto PupTypes PupDefs.PupRouterSendThis[b]; END -- ELSE message came to us from a client --; END; Sort: PROCEDURE[handle: SLDefs.SLReadHandle, body: HeapDefs.ObjectNumber, SL: HeapDefs.ReaderHandle] = BEGIN -- Despite its name, this procedure has no sorting algorithm as such, -- since the delivery sites for a message are determined in a single -- pass through its steering list pendingSL: HeapDefs.WriterHandle _ NIL; lockedSL: HeapDefs.WriterHandle _ NIL; badList: HeapDefs.WriterHandle _ NIL; slHeader: SLDefs.SLHeader; localCount: CARDINAL _ 0; remoteCount: CARDINAL _ 0; pendingCount: CARDINAL _ 0; badCount: CARDINAL _ 0; lockCount: CARDINAL _ 0; MakeBad: PROC[recipient: BodyDefs.RName] RETURNS[done: BOOLEAN] = BEGIN done _ FALSE; IF badList = NIL THEN badList _ HeapDefs.HeapStartWrite[temp]; HeapDefs.HeapWriteRName[badList, recipient]; END; MakeLocked: PROC[recipient: BodyDefs.RName] = BEGIN IF lockedSL = NIL THEN BEGIN lockedSL _ HeapDefs.HeapStartWrite[SLpending]; slHeader.server _ NIL; HeapDefs.HeapWriteData[lockedSL, [@slHeader,SIZE[SLDefs.SLHeader]]]; END; HeapDefs.HeapWriteRName[lockedSL, recipient]; lockCount _ lockCount + 1; END; MakePending: PROC[recipient: BodyDefs.RName] = BEGIN IF pendingSL = NIL THEN BEGIN pendingSL _ HeapDefs.HeapStartWrite[SLpending]; slHeader.server _ NIL; HeapDefs.HeapWriteData[pendingSL, [@slHeader,SIZE[SLDefs.SLHeader]]]; END; HeapDefs.HeapWriteRName[pendingSL,recipient]; pendingCount _ pendingCount + 1; END; HaveIDoneThisOne: SIGNAL[new: BodyDefs.RName] RETURNS[ BOOLEAN ] = CODE; -- This signal is used to prevent recursive loops when expanding -- recursively defined DL's. -- Deliver: PROCEDURE[recipient: BodyDefs.RName] RETURNS[done: BOOLEAN] = BEGIN -- deliver to one recipient, who may be a group -- DeliverLocal: ENTRY PROC RETURNS[info: SiteCacheDefs.RecipientInfo] = BEGIN -- part of the cache-flush synchronization arrangements -- info _ IF String.EquivalentStrings[recipient, "ExpressMail^.ms"] THEN [notFound[]] ELSE SiteCacheDefs.FindMBXSite[recipient]; WITH info SELECT FROM local => IF MailboxDefs.MBXWrite[recipient, body, slHeader.created] THEN { LogDefs.WriteChar['L]; localCount _ localCount + 1 } ELSE { LogDefs.WriteChar['M]; MakeLocked[recipient] }; allDown, notFound => NULL; ENDCASE => TellOtherServer[slHeader.received, recipient]; END; info: SiteCacheDefs.RecipientInfo = DeliverLocal[]; done _ FALSE; WITH info SELECT FROM dl => BEGIN oldBadList: HeapDefs.WriterHandle = badList; badList _ NIL; LogDefs.WriteChar['G]; IF NOT (SIGNAL HaveIDoneThisOne[recipient]) THEN NameInfoDefs.Enumerate[members, Deliver ! HaveIDoneThisOne => IF String.EquivalentStrings[recipient, new] THEN RESUME[TRUE] ]; IF badList # NIL THEN ReturnDefs.BadGroup[badList, body, recipient]; badList _ oldBadList; NameInfoDefs.Close[members]; END; foreign => BEGIN LogDefs.WriteChar['X]; IF NOT (SIGNAL HaveIDoneThisOne[recipient]) THEN HeapDefs.ReadRList[members, Deliver ! HaveIDoneThisOne => IF String.EquivalentStrings[recipient, new] THEN RESUME[TRUE] ]; HeapDefs.HeapEndRead[members]; END; local => NULL -- done by DeliverLocal --; found => BEGIN LogDefs.WriteChar['F]; IF server.SL = NIL THEN BEGIN server.SL _ HeapDefs.HeapStartWrite[SLforward]; slHeader.server _ server; HeapDefs.HeapWriteData[server.SL, [@slHeader,SIZE[SLDefs.SLHeader]]]; END; HeapDefs.HeapWriteRName[server.SL,recipient]; remoteCount _ remoteCount + 1; END; allDown => BEGIN LogDefs.WriteChar['P]; MakePending[recipient]; END; notFound => BEGIN LogDefs.WriteChar['B]; [] _ MakeBad[recipient]; badCount _ badCount + 1; END; ENDCASE => ERROR; -- The loop calling this procedure may use lots of processor time -- Process.Pause[1]; END; BEGIN reject: BOOLEAN; pendingSL _ NIL; badList _ NIL; localCount _ remoteCount _ pendingCount _ badCount _ lockCount _ 0; BEGIN BadSL: ERROR = CODE; ended: BOOLEAN; used: CARDINAL; [ended,used] _ HeapDefs.HeapReadData[SL, [@slHeader,SIZE[SLDefs.SLHeader]] ]; IF used # SIZE[SLDefs.SLHeader] THEN ERROR BadSL[]; reject _ ElapsedDays[slHeader.created.time, longTermLimit] OR ElapsedDays[slHeader.received.time, shortTermLimit]; -- stats: -- recipientLog.length _ 0; -- stats: -- String.AppendString[recipientLog, "RecipientLog "L]; -- stats: -- ProtocolDefs.AppendTimestamp[recipientLog, slHeader.created]; -- stats: -- String.AppendChar[recipientLog, ':]; UNTIL ended DO recipient: BodyDefs.RName = [BodyDefs.maxRNameLength]; ended _ HeapDefs.HeapReadRName[SL, recipient]; -- stats: -- NoteRecipient[recipient]; IF reject THEN [] _ MakeBad[recipient] ELSE [] _ Deliver[recipient ! HaveIDoneThisOne => RESUME[FALSE] ]; ENDLOOP; END; HeapDefs.HeapEndRead[SL]; IF reject AND badList # NIL THEN BEGIN ReturnDefs.LongTerm[badList, body]; BEGIN log: STRING = [128]; String.AppendString[log, "Long-term timeout on "L]; ProtocolDefs.AppendTimestamp[log, slHeader.created]; LogDefs.WriteLogEntry[log]; END; END ELSE BEGIN IF lockedSL # NIL THEN BEGIN SLDefs.SLWrite[body, lockedSL, mailbox]; END; IF pendingSL # NIL THEN BEGIN SLDefs.SLWrite[body, pendingSL, pending]; END; BEGIN EndServerSL: PROCEDURE[server: ServerDefs.ServerHandle] = BEGIN -- this is executed inside the ServerInfo monitor -- IF server.SL # NIL THEN BEGIN SLDefs.SLWrite[body, server.SL, IF server.type = foreign THEN foreign ELSE forward]; server.SL _ NIL; END; END; ServerDefs.EnumerateAll[EndServerSL]; END; IF badList # NIL THEN ReturnDefs.BadRecipients[badList, body]; IF pendingCount = 0 OR localCount # 0 OR remoteCount # 0 OR badCount # 0 OR lockCount # 0 THEN BEGIN log: STRING = [140]; called: BOOLEAN _ FALSE; Add: PROC[count: CARDINAL, text: STRING] = BEGIN IF count # 0 THEN BEGIN String.AppendString[log, ", "L]; String.AppendDecimal[log, count]; String.AppendString[log, text]; called _ TRUE; END; END; String.AppendString[log, "Delivered "L]; ProtocolDefs.AppendTimestamp[log, slHeader.created]; Add[localCount, " local"L]; Add[remoteCount, " remote"L]; Add[pendingCount, " pending"L]; Add[badCount, " bad"L]; Add[lockCount, " locked"L]; IF NOT called THEN String.AppendString[log, ", no recipients"L]; LogDefs.WriteLogEntry[log]; -- stats: -- LogDefs.WriteLogEntry[recipientLog]; messageTotal _ messageTotal + 1; localTotal _ localTotal + localCount; remoteTotal _ remoteTotal + remoteCount; badTotal _ badTotal + badCount; END; END; SLDefs.SLEndRead[handle]; END; END; daySecs: LONG CARDINAL = 24 * LONG[60*60]; ElapsedDays: PROC[from: BodyDefs.PackedTime, interval: CARDINAL] RETURNS[tooMany: BOOLEAN] = -- Determines whether more than "interval" working days have elapsed -- since "from". BEGIN -- For every five working days, we allow seven elapsed days -- -- For under five working days, must correct for weekends -- now: Time.Packed = Time.Current[]; completeWeeks: CARDINAL = (interval/5)*7; partialWeek: CARDINAL = interval MOD 5; limit: LONG CARDINAL _ from + (completeWeeks+partialWeek) * daySecs; IF now < limit THEN RETURN[FALSE] -- optimize most calls -- ELSE BEGIN limitDay: CARDINAL = Time.Unpack[LOOPHOLE[limit,Time.Packed]].weekday; saturday: CARDINAL = 5; sunday: CARDINAL = 6; SELECT limitDay FROM saturday => SELECT partialWeek FROM 0 => NULL; -- any time in the weekend is ok -- ENDCASE => limit _ limit + 2 * daySecs; sunday => SELECT partialWeek FROM 0 => NULL; -- any time in the weekend is ok -- 1 => -- limit should be set to end of following Monday -- limit _ limit + 2 * daySecs; ENDCASE => limit _ limit + 2 * daySecs; ENDCASE => IF limitDay < partialWeek THEN limit _ limit + MIN[partialWeek-limitDay,2] * daySecs; RETURN[ now > limit ] END END; forever: BOOLEAN _ FALSE; readPendingRunningTime: LONG CARDINAL _ 10 * 60; --seconds-- SortQueue: PROC[queue: SLDefs.SLQueue] = BEGIN which: Class = SELECT queue FROM input => normal, ENDCASE => other; operation: PolicyDefs.Operation = SELECT queue FROM input => readInput, pending => readPending, mailbox => readMailbox, ENDCASE => ERROR; DO exhaustion: LONG CARDINAL; IF forever THEN SLDefs.WaitForNonEmpty[queue]; SELECT queue FROM input => NULL; pending => PolicyDefs.ReadPendingPause[]; mailbox => MailboxDefs.WaitForUnlocked[]; ENDCASE => ERROR; ClaimSorter[which]; exhaustion _ Time.Current[] + readPendingRunningTime; THROUGH [1..SLDefs.GetCount[queue]] DO SL: HeapDefs.ReaderHandle; handle: SLDefs.SLReadHandle; body: HeapDefs.ObjectNumber; IF which # other OR NOT PolicyDefs.CheckOperation[operation] THEN BEGIN ReleaseSorter[which]; IF forever THEN PolicyDefs.WaitOperation[operation] ELSE { IF NOT PolicyDefs.CheckOperation[operation] THEN RETURN }; ClaimSorter[which]; END; [handle, body, SL] _ SLDefs.SLStartRead[queue]; Sort[handle, body, SL]; PolicyDefs.EndOperation[operation]; IF queue = input THEN ConsiderExpress[]; IF queue = pending AND Time.Current[] > exhaustion AND waiters # 0 THEN EXIT; ENDLOOP; ReleaseSorter[which]; IF NOT forever THEN EXIT; ENDLOOP; END; otherQueueCount: CARDINAL _ 0; -- give priority to non input queues -- waiters: CARDINAL _ 0; -- number of processes wanting the sorter -- sorterInUse: BOOLEAN _ FALSE; -- mutual exclusion on calls of "Sort" -- sorterCond: CONDITION; Class: TYPE = { normal, other }; -- class = other has priority -- ClaimSorter: ENTRY PROC[which: Class] = BEGIN waiters _ waiters+1; IF which = other THEN otherQueueCount _ otherQueueCount+1; WHILE sorterInUse OR ( which # other AND otherQueueCount # 0 ) DO WAIT sorterCond ENDLOOP; sorterInUse _ TRUE; waiters _ waiters-1; END; ReleaseSorter: ENTRY PROC[which: Class] = BEGIN sorterInUse _ FALSE; IF which = other THEN otherQueueCount _ otherQueueCount-1; BROADCAST sorterCond; END; pony: CONDITION _ [timeout:0]; USPS: PROC = BEGIN DO handle: SLDefs.SLReadHandle; body: HeapDefs.ObjectNumber; SL: HeapDefs.ReaderHandle; SLDefs.WaitForNonEmpty[express]; CheckExpress[]; PolicyDefs.WaitOperation[readExpress]; IF PolicyDefs.ExpressAllowed[SLDefs.GetCount[input]] THEN BEGIN [handle, body, SL] _ SLDefs.SLStartRead[express]; HeapDefs.HeapEndRead[SL]; SLDefs.SLTransfer[handle, input]; END; PolicyDefs.EndOperation[readExpress]; ENDLOOP; END; CheckExpress: ENTRY PROC = INLINE BEGIN UNTIL PolicyDefs.ExpressAllowed[SLDefs.GetCount[input]] DO WAIT pony ENDLOOP; END; ConsiderExpress: ENTRY PROC = INLINE { NOTIFY pony }; -- The synchronization for sitecache flushing and possible remailing is -- complicated. The cause of the complication is that "Deliver" might -- have found someone to be local but not placed the message in the -- mailbox, at the time that we flush the cache. This is prevented by -- the entry procedures "DeliverLocal" and "FlushCache". FlushCacheAndRemail: PUBLIC PROC[ who: BodyDefs.RName ] = BEGIN FlushCache: ENTRY PROC = { SiteCacheDefs.SingleFlush[who] }; info: SiteCacheDefs.RemailInfo; FlushCache[]; info _ SiteCacheDefs.NeedToRemail[who]; IF info # stillLocal THEN MailboxDefs.Remail[who: who, valid: info = remail]; END; StartLog: PROCEDURE = BEGIN -- LogDefs.DisplayNumber["Local recipients"L, [short[@localTotal]] ]; -- LogDefs.DisplayNumber["Remote recipients"L, [short[@remoteTotal]] ]; -- LogDefs.DisplayNumber["Bad recipients"L, [short[@badTotal]] ]; END; Process.DisableTimeout[@sorterCond]; StartLog[]; -- empty queues once synchronously, to get us off the ground -- SortQueue[queue: mailbox]; SortQueue[queue: input]; forever _ TRUE; Process.Detach[ FORK SortQueue[queue: input] ]; Process.Detach[ FORK SortQueue[queue: mailbox] ]; Process.Detach[ FORK SortQueue[queue: pending] ]; Process.Detach[ FORK USPS[] ]; END.