DIRECTORY BasicTime USING [GMT, Now, nullGMT, Period, Unpack, Unpacked, Update], Convert USING [RopeFromInt, RopeFromTime], GVBasics USING [oldestTime, RopeFromTimestamp, Timestamp], GVNames USING [GetMembers, MemberInfo], GVProtocol USING [ReceiveTimestamp], IO USING [Close, int, Put, PutChar, PutF, PutFR, PutRope, rope, STREAM, time], Process USING [Detach, SecondsToTicks], Real USING [InlineRound], Rope USING [Cat, Equal, Find, ROPE], SafeStorage USING [ReclaimCollectibleObjects], VM USING [], VMStatistics USING [VirtualAllocation], Queue USING [Create, Dequeue, ElemProc, Enqueue, Enumerate, GetHead, GetProcessableElement, Inhibit, Inhibition, IsEmpty, MQ, Name, NewElem, QElem, Uninhibit, Value], SMTPControl USING [defaultInhibitTime], SMTPDescr USING [AddProcessedRecipient, Descr, DescrRep, Destroy, EnumerateRawRecipients, EnumerateRecipientHosts, GetArpaReversePath, GetExpiryDate, GetFormat, GetGvSender, GetPrecedeMsgText, GetUserHandle, HostProc, MoreRecipients, Print, RawRecipProc, RemoveRawRecipients, RemoveRecipientHost, RetrieveMsgStream, SetArpaReversePath, SetGvSender, SetPrecedeMsgText, StoreItemInfo, Unparse, WrongState], SMTPGVSend USING [totalGvMsgsSent, totalGvBytesSent, Connection, Failed, Open, SendItem], SMTPQueue USING [], SMTPSend USING [totalArpaMsgsSent, totalArpaBytesSent, Close, Connection, Failed, Open, SendItem], SMTPSupport USING [AuthorizationCheck, CheckHeader, Log, NotifySender], SMTPSyntax USING [EnumerateGVItems, GVItemProc, HostAndUser, ReceiveRName, ReversePath, UnBlessReturnPath]; SMTPQueueImpl: CEDAR MONITOR IMPORTS BasicTime, Convert, GVBasics, GVNames, GVProtocol, IO, Process, Queue, Real, Rope, SafeStorage, VMStatistics, SMTPControl, SMTPDescr, SMTPGVSend, SMTPSend, SMTPSupport, SMTPSyntax EXPORTS SMTPQueue = BEGIN Descr: TYPE = SMTPDescr.Descr; ROPE: TYPE = Rope.ROPE; STREAM: TYPE = IO.STREAM; QUEUE: TYPE = Queue.MQ; ItemOnly: TYPE = REF Descr; -- an item for delivery to one or more recipients DeliveryPkg: TYPE = REF DeliveryPkgRep; -- an item ready for delivery to a clump of recipients at the same host DeliveryPkgRep: TYPE = RECORD[descr: Descr, recipList: LIST OF ROPE]; RecipBundle: TYPE = REF RecipBundleRep; -- a queue of delivery pkgs for a single host RecipBundleRep: TYPE = RECORD [ queue: QUEUE, sent, recv: INT _ 0, lastUp: BasicTime.GMT]; EmptyQueues: QUEUE _ Queue.Create["Empty"]; -- queue of RecipBundles GVQueue: QUEUE _ Queue.Create["GV"]; -- queue of RecipBundles ExpressQueue: QUEUE _ Queue.Create["Ex"]; -- queue of RecipBundles ArpaExpressQueue: QUEUE _ Queue.Create["ArpaEx"]; -- queue of RecipBundles HostQueues: QUEUE _ Queue.Create["ARPA"]; -- queue of RecipBundles SickQueues: QUEUE _ Queue.Create["Sick"]; -- queue of RecipBundles BadItemQueue: QUEUE _ Queue.Create["Bad"]; -- queue of DeliveryPkgs tempQueue: QUEUE _ Queue.Create["Temp"]; -- queue of DeliveryPkgs gvHostName: ROPE _ "Grapevine"; -- Wired into format of msg saved on disk expressHostName: ROPE _ "Express"; expressMail: LIST OF ROPE; arpaExpressHostName: ROPE _ "ArpaExpress"; arpaExpressMail: LIST OF ROPE; notExpressMail: LIST OF ROPE; noUpArrowDLs: PUBLIC LIST OF ROPE; expressOK: BOOL _ TRUE; debug: BOOL _ FALSE; ExpressList: PUBLIC PROC RETURNS[list: LIST OF ROPE] = {RETURN[expressMail]}; SetExpressOK: PUBLIC PROC [ok: BOOL] = {expressOK _ ok}; Express: PROC [names: LIST OF ROPE] RETURNS [name: ROPE] = { UNTIL names = NIL DO IF NotExpress[names.first] THEN RETURN[NIL]; FOR list: LIST OF ROPE _ expressMail, list.rest UNTIL list = NIL DO IF Rope.Equal[names.first, list.first, FALSE] THEN RETURN[list.first]; ENDLOOP; names _ names.rest; ENDLOOP; RETURN[NIL]; }; NotExpress: PROC [name: ROPE] RETURNS [BOOLEAN] = { FOR list: LIST OF ROPE _ notExpressMail, list.rest UNTIL list = NIL DO IF Rope.Equal[name, list.first, FALSE] THEN RETURN[TRUE]; ENDLOOP; RETURN[FALSE]; }; ArpaExpress: PROC [names: LIST OF ROPE] RETURNS [name: ROPE] = { UNTIL names = NIL DO FOR list: LIST OF ROPE _ arpaExpressMail, list.rest UNTIL list = NIL DO IF Rope.Equal[names.first, list.first, FALSE] THEN RETURN[list.first]; ENDLOOP; names _ names.rest; ENDLOOP; RETURN[NIL]; }; DLWithoutUpArrow: PUBLIC PROC [dl: ROPE] RETURNS [valid: BOOL] = { FOR list: LIST OF ROPE _ noUpArrowDLs, list.rest UNTIL list = NIL DO IF Rope.Equal[dl, list.first, FALSE] THEN RETURN[TRUE]; ENDLOOP; RETURN[FALSE]; }; GetQueueForNewMessage: INTERNAL PROC [home: QUEUE, name: ROPE] RETURNS [queue: QUEUE] = { Find: Queue.ElemProc = { msgs: RecipBundle _ NARROW[qElem.Value[]]; IF Rope.Equal[msgs.queue.Name[], name, FALSE] THEN { old _ qElem; queue _ msgs.queue; continue _ FALSE; }; }; old: Queue.QElem; new: RecipBundle; GVQueue.Enumerate[Find]; IF queue # NIL THEN RETURN; ExpressQueue.Enumerate[Find]; IF queue # NIL THEN RETURN; ArpaExpressQueue.Enumerate[Find]; IF queue # NIL THEN RETURN; HostQueues.Enumerate[Find]; IF queue # NIL THEN RETURN; SickQueues.Enumerate[Find]; IF queue # NIL THEN RETURN; EmptyQueues.Enumerate[Find]; IF queue # NIL THEN { MoveAndLog[from: EmptyQueues, to: home, qElem: old]; RETURN; }; new _ NEW[RecipBundleRep _ [ queue: Queue.Create[name], sent: 0, recv: 0, lastUp: BasicTime.nullGMT]]; EnqueueAndLog[queue: home, qElem: Queue.NewElem[new]]; RETURN[new.queue]; }; PrintItem: PUBLIC ENTRY PROC [item: INT, out: STREAM] = { descr: Descr; qElem: Queue.QElem; [qElem, descr] _ DescrFromHandle[item]; IF descr = NIL THEN {out.PutRope["No item with that handle.\n"]; RETURN}; descr.Print[out: out, form: short]; PrintInibition[qElem, out, NIL]; }; DescrFromHandle: INTERNAL PROC [handle: INT] RETURNS [qe: Queue.QElem, descr: Descr] = { found: BOOL _ FALSE; Check: ItemProc = { qe _ qElem; descr _ DescrFromElemVal[qElem.Value[]]; IF handle = descr.GetUserHandle[] THEN {found _ TRUE; continue _ FALSE}; }; EnumerateItems[Check]; IF ~found THEN descr _ NIL; }; PrintQueue: PUBLIC ENTRY PROC [name: ROPE, out: STREAM] = { PrintQueueInner[name, out]; }; PrintQueueInner: INTERNAL PROC [name: ROPE, out: STREAM] = { queue: QUEUE; ScanForInfo: Queue.ElemProc = { msgs: RecipBundle _ NARROW[qElem.Value[]]; IF queue # msgs.queue THEN RETURN [TRUE]; out.PutF[ "\tsent: %G, recv: %G, last up: ", IO.int[msgs.sent], IO.int[msgs.recv]]; IF msgs.lastUp = BasicTime.nullGMT THEN out.PutRope["never"] ELSE out.Put[IO.time[msgs.lastUp]]; out.PutRope["\n"]; PrintInibition[qElem, out, "\t"]; RETURN[FALSE]; }; PrintVal: Queue.ElemProc = { value: REF ANY = qElem.Value[]; SELECT ValTypeOfElem[value] FROM RecipBundleT => { msgs: RecipBundle = NARROW[value]; out.PutF[ "\t%G:\n\t\tsent: %G, recv: %G, last up: ", IO.rope[msgs.queue.Name[]], IO.int[msgs.sent], IO.int[msgs.recv]]; IF msgs.lastUp = BasicTime.nullGMT THEN out.PutRope["never"] ELSE out.Put[IO.time[msgs.lastUp]]; out.PutRope["\n"]; PrintInibition[qElem, out, "\t\t"]; msgs.queue.Enumerate[PrintVal]; }; ENDCASE => { out.PutRope["\t\t"]; PrintElemVal[value, out]; out.PutChar['\n]; }; }; IF name = NIL THEN { PrintQueueInner[EmptyQueues.Name[], out]; PrintQueueInner[GVQueue.Name[], out]; PrintQueueInner[ExpressQueue.Name[], out]; PrintQueueInner[ArpaExpressQueue.Name[], out]; PrintQueueInner[HostQueues.Name[], out]; PrintQueueInner[SickQueues.Name[], out]; RETURN; }; queue _ FindQueue[name]; IF queue = NIL THEN { out.PutRope["Unknown queue: "]; out.PutRope[name]; out.PutRope[".\n"]; RETURN; }; out.PutRope[queue.Name[]]; out.PutRope[":\n"]; EmptyQueues.Enumerate[ScanForInfo]; GVQueue.Enumerate[ScanForInfo]; ExpressQueue.Enumerate[ScanForInfo]; ArpaExpressQueue.Enumerate[ScanForInfo]; HostQueues.Enumerate[ScanForInfo]; SickQueues.Enumerate[ScanForInfo]; queue.Enumerate[PrintVal]; }; FindQueue: INTERNAL PROC [name: ROPE] RETURNS [queue: QUEUE _ NIL] = { bundle: RecipBundle; SELECT TRUE FROM Rope.Equal[name, EmptyQueues.Name[], FALSE] => RETURN[EmptyQueues]; Rope.Equal[name, GVQueue.Name[], FALSE] => RETURN[GVQueue]; Rope.Equal[name, ExpressQueue.Name[], FALSE] => RETURN[ExpressQueue]; Rope.Equal[name, ArpaExpressQueue.Name[], FALSE] => RETURN[ArpaExpressQueue]; Rope.Equal[name, HostQueues.Name[], FALSE] => RETURN[HostQueues]; Rope.Equal[name, SickQueues.Name[], FALSE] => RETURN[SickQueues]; Rope.Equal[name, BadItemQueue.Name[], FALSE] => RETURN[BadItemQueue]; Rope.Equal[name, tempQueue.Name[], FALSE] => RETURN[tempQueue]; ENDCASE; bundle _ GetBundle[name, FALSE]; IF bundle = NIL THEN RETURN[NIL]; RETURN[bundle.queue]; }; GetBundle: INTERNAL PROC [name: ROPE, create: BOOL] RETURNS [bundle: RecipBundle _ NIL] = { Find: Queue.ElemProc = { msgs: RecipBundle _ NARROW[qElem.Value[]]; IF Rope.Equal[msgs.queue.Name[], name, FALSE] THEN { bundle _ msgs; continue _ FALSE; }; }; EmptyQueues.Enumerate[Find]; IF bundle # NIL THEN RETURN; GVQueue.Enumerate[Find]; IF bundle # NIL THEN RETURN; ExpressQueue.Enumerate[Find]; IF bundle # NIL THEN RETURN; ArpaExpressQueue.Enumerate[Find]; IF bundle # NIL THEN RETURN; HostQueues.Enumerate[Find]; IF bundle # NIL THEN RETURN; SickQueues.Enumerate[Find]; IF bundle # NIL THEN RETURN; IF ~create THEN RETURN; bundle _ NEW[RecipBundleRep _ [ queue: Queue.Create[name], sent: 0, recv: 0, lastUp: BasicTime.nullGMT]]; EnqueueAndLog[queue: EmptyQueues, qElem: Queue.NewElem[bundle]]; }; PrintElemVal: PROC [value: REF ANY, out: STREAM] = { SELECT ValTypeOfElem[value] FROM DeliveryPkgT, RecipBundleT => out.PutRope[Unparse[value]]; ENDCASE => ERROR UserError["Somehow you have tried to print something that isn't a queue."]; }; PrintInibition: PROC [qElem: Queue.QElem, out: STREAM, prefix: ROPE] = { for: INT; why: ROPE; [for, why] _ qElem.Inhibition[]; IF for > 0 THEN out.PutF[ "%GInhibited for %G:%02G because: %G.\n", IO.rope[prefix], IO.int[for/60], IO.int[for MOD 60], IO.rope[why]]; }; Unparse: PROC [value: REF ANY] RETURNS [ROPE] = { SELECT TRUE FROM ISTYPE[value, ItemOnly] => RETURN[NARROW[value, ItemOnly]^.Unparse[]]; ISTYPE[value, DeliveryPkg] => { pkg: DeliveryPkg = NARROW[value]; r: ROPE _ Rope.Cat[pkg.descr.Unparse[], " for "]; FOR l: LIST OF ROPE _ pkg.recipList, l.rest UNTIL l = NIL DO r _ Rope.Cat[r, IF l = pkg.recipList THEN NIL ELSE ", ", l.first]; ENDLOOP; RETURN[r]; }; ISTYPE[value, RecipBundle] => RETURN[NARROW[value, RecipBundle].queue.Name[]]; ENDCASE => RETURN["(Unparse ENDCASE)"]; }; StartServer: PUBLIC PROC = TRUSTED { CheckLists[]; Process.Detach[FORK SendGVMessages[]]; Process.Detach[FORK SendArpaMessages[]]; Process.Detach[FORK Background[]]; }; newMessageArrived: CONDITION _ [timeout: Process.SecondsToTicks[30]]; StartNewMessage: PUBLIC ENTRY PROC [source: ROPE] = {}; -- Hack to hang on ML before reading message from the net. Otherwise we let duplicate messages through because we don't "accept" them after they get onto the disk. AddNewMessage: PUBLIC ENTRY PROC [descr: Descr, source: ROPE] = { host: RecipBundle _ GetBundle[source, TRUE]; qElem: Queue.QElem _ Queue.NewElem[NEW[Descr _ descr]]; value: REF ANY _ qElem.Value[]; CheckLists[]; SELECT descr.GetFormat[] FROM gv => { from: ROPE _ descr.GetGvSender[]; IF from = NIL THEN { arpaReversePath, gvSender: ROPE; gvID: ROPE; temp: IO.STREAM = descr.RetrieveMsgStream[]; RetrieveReturnTo: SMTPSyntax.GVItemProc = { IF itemHeader.type = PostMark THEN { timestamp: GVBasics.Timestamp _ GVProtocol.ReceiveTimestamp[itemStream]; gvID _ GVBasics.RopeFromTimestamp[timestamp]; }; IF itemHeader.type = ReturnTo THEN { gvSender _ SMTPSyntax.ReceiveRName[itemStream]; from _ gvSender; continue _ FALSE; }; }; SMTPSyntax.EnumerateGVItems[temp, RetrieveReturnTo]; temp.Close[]; SMTPSupport.Log[verbose, descr.Unparse[], " is ", gvID, "."]; SMTPDescr.SetGvSender[descr, gvSender]; arpaReversePath _ SMTPSyntax.ReversePath[gvSender]; descr.SetArpaReversePath[arpaReversePath]; }; SMTPSupport.Log[verbose, descr.Unparse[], " is from ", from, "."]; IF ~SMTPSupport.AuthorizationCheck[from] OR ~SMTPSupport.CheckHeader[from, descr] THEN { BEGIN ENABLE SMTPDescr.WrongState => CONTINUE; SMTPDescr.RemoveRawRecipients[descr]; END; ConsiderItemDestruction[descr]; RETURN; }; }; arpa => { arpaReversePath: ROPE; arpaReversePath _ SMTPDescr.GetArpaReversePath[descr]; arpaReversePath _ SMTPSyntax.UnBlessReturnPath[arpaReversePath]; IF arpaReversePath = NIL THEN { arpaReversePath _ SMTPDescr.GetGvSender[descr]; arpaReversePath _ SMTPSyntax.UnBlessReturnPath[arpaReversePath]; arpaReversePath _ SMTPSyntax.ReversePath[arpaReversePath]; }; descr.SetArpaReversePath[arpaReversePath]; SMTPSupport.Log[verbose, descr.Unparse[], " is from ", arpaReversePath, "."]; }; ENDCASE => ERROR; BEGIN ENABLE SMTPDescr.WrongState => CONTINUE; --already processed redistributed: ROPE; ProcessRawRecipient: SMTPDescr.RawRecipProc = { descr: Descr = NARROW[procData, ItemOnly]^; hostName, userName: ROPE; [hostName, userName] _ SMTPSyntax.HostAndUser[rawRecipient]; IF hostName = NIL THEN { hostName _ gvHostName; IF Rope.Find[userName, "^."] # -1 OR DLWithoutUpArrow[userName] THEN { IF redistributed = NIL THEN redistributed _ "Redistributed: " ELSE redistributed _ Rope.Cat[redistributed, ", "]; IF Rope.Find[rawRecipient, "Xerox.ARPA", 0, FALSE] # -1 THEN redistributed _ Rope.Cat[redistributed, rawRecipient] ELSE redistributed _ Rope.Cat[redistributed, userName];}; }; descr.AddProcessedRecipient[hostName, userName, TRUE]; }; descr.EnumerateRawRecipients[ProcessRawRecipient, value]; IF redistributed # NIL THEN descr.SetPrecedeMsgText[Rope.Cat[redistributed, "\n", descr.GetPrecedeMsgText[]]]; END; descr.StoreItemInfo[]; -- update info on disk descr.EnumerateRecipientHosts[QueueForDelivery, value]; ConsiderItemDestruction[descr]; -- Empty, Crashed after sending to last host host.recv _ host.recv + 1; host.lastUp _ BasicTime.Now[]; }; QueueForDelivery: INTERNAL SMTPDescr.HostProc = { descr: Descr = NARROW[procData, ItemOnly]^; mainQueue, hostQueue: QUEUE; queueName: ROPE; gv: BOOL = Rope.Equal[hostName, gvHostName]; SELECT TRUE FROM ~gv => { queueName _ hostName; mainQueue _ HostQueues; NOTIFY newArpaMessage; }; (queueName _ Express[userNames]) # NIL => { mainQueue _ ExpressQueue}; (queueName _ ArpaExpress[userNames]) # NIL => { mainQueue _ ArpaExpressQueue}; ENDCASE => { queueName _ gvHostName; mainQueue _ GVQueue; NOTIFY newGvMessage; }; hostQueue _ GetQueueForNewMessage[mainQueue, queueName]; EnqueueAndLog[queue: hostQueue, qElem: Queue.NewElem[NEW[DeliveryPkgRep _ [descr, userNames]]]]; }; newArpaMessage: CONDITION _ [timeout: Process.SecondsToTicks[30]]; GetProcessableHost: ENTRY PROC RETURNS [host: Queue.QElem] = { DO host _ HostQueues.GetProcessableElement[]; IF host # NIL THEN { MoveHostToEnd[HostQueues, host]; RETURN; }; WAIT newArpaMessage; ENDLOOP; }; SendArpaMessages: PROC = { DO host: Queue.QElem _ GetProcessableHost[]; recipients: RecipBundle _ NARROW[host.Value[]]; hostQueue: QUEUE _ recipients.queue; IF ProcessArpaHost[host] THEN LOOP; IF host.Inhibition[].for > 0 THEN LOOP; IF hostQueue.IsEmpty[] THEN LOOP; -- Bogus host name MoveToSick[HostQueues, host, "Didn't make any progress"]; ENDLOOP; }; ProcessArpaHost: PROC [host: Queue.QElem] RETURNS [progress: BOOL] = { hostStream: SMTPSend.Connection; recipients: RecipBundle _ NARROW[host.Value[]]; hostQueue: QUEUE _ recipients.queue; startTime: BasicTime.GMT _ BasicTime.Now[]; BEGIN -- so hostStream and hostQueue will be in scope for EXITS msg: Queue.QElem; progress _ FALSE; IF hostQueue.IsEmpty[] THEN GOTO FlushHostQueue; hostStream _ SMTPSend.Open[hostQueue.Name[] ! SMTPSend.Failed => { SELECT withItem FROM irrelevant, retryLater => { IF problemWithHost THEN MoveToSick[HostQueues, host, reason]; GOTO Done; }; returnToSender => { -- there must be a problem with this Host (no item was given) UNTIL hostQueue.IsEmpty[] DO qElem: Queue.QElem = hostQueue.GetHead[]; pkg: DeliveryPkg = NARROW[qElem.Value[]]; descr: Descr = pkg.descr; IF debug THEN SIGNAL LookAtThis; SMTPSupport.NotifySender[descr, reason]; FinishedWithMsg[queue: hostQueue, pkg: pkg]; ENDLOOP; IF recipients.recv # 0 THEN GOTO FlushHostQueue; -- Foo@Concord.ms (yetch) FlushThisHost[queue: HostQueues, value: host.Value[]]; GOTO Done; }; ENDCASE => ERROR; }]; -- putOnBadQueue makes no sense for Open UNTIL (msg _ GetProcessableElement[hostQueue]) = NIL DO IF BasicTime.Period[from: startTime, to: BasicTime.Now[]] > 5*60 THEN { SMTPSupport.Log[important, "Already spent more than 5 minutes sending mail to " , Unparse[host.Value[]], "."]; EXIT; }; IF ~SendArpaMsg[hostStream, host, msg] THEN { hostStream.Close[TRUE]; RETURN; }; recipients.sent _ recipients.sent + 1; recipients.lastUp _ BasicTime.Now[]; progress _ TRUE; ENDLOOP; GOTO FlushHostQueue; EXITS FlushHostQueue => { IF hostQueue.IsEmpty[] THEN MoveToEmpty[from: HostQueues, host: host]; }; Done => NULL; END; IF hostStream # NIL THEN hostStream.Close[FALSE]; }; LookAtThis: SIGNAL = CODE; SendArpaMsg: PROC [stream: SMTPSend.Connection, host, msg: Queue.QElem] RETURNS [continue: BOOL _ TRUE] = { recipients: RecipBundle _ NARROW[host.Value[]]; hostQueue: QUEUE _ recipients.queue; pkg: DeliveryPkg = NARROW[msg.Value[]]; descr: Descr = pkg.descr; BEGIN -- so the above vars will be in scope for EXITS SMTPSend.SendItem[descr, pkg.recipList, stream ! SMTPSend.Failed => { IF problemWithHost THEN { MoveToSick[HostQueues, host, reason]; continue _ FALSE; };-- stop the enumeration of SMTP items (stream may be aborted) SELECT withItem FROM retryLater => { IF ~problemWithHost THEN msg.Inhibit[for: SMTPControl.defaultInhibitTime, why: reason]; GOTO Done; }; returnToSender => { IF debug THEN SIGNAL LookAtThis; SMTPSupport.NotifySender[descr, reason]; GOTO RemoveItem; }; putOnBadQueue => { MoveToBad[queue: hostQueue, msg: msg]; GOTO Done; }; ENDCASE => ERROR; -- shouldn't be irrelevant }]; GOTO RemoveItem; EXITS RemoveItem => { FinishedWithMsg[queue: hostQueue, pkg: pkg]; }; Done => NULL; END; }; newGvMessage: CONDITION _ [timeout: Process.SecondsToTicks[30]]; GetGvClump: ENTRY PROC RETURNS [host: Queue.QElem] = { DO host _ GVQueue.GetProcessableElement[]; IF host # NIL THEN RETURN; WAIT newGvMessage; ENDLOOP; }; SendGVMessages: PROC = { DO host: Queue.QElem _ GetGvClump[]; IF ProcessGVQueue[host] THEN LOOP; IF host.Inhibition[].for > 0 THEN LOOP; MoveToSick[GVQueue, host, "Didn't make any progress"]; ENDLOOP; }; gvHandle: SMTPGVSend.Connection _ SMTPGVSend.Open[]; -- Only need one ProcessGVQueue: PROC [host: Queue.QElem] RETURNS [progress: BOOL] = { recipients: RecipBundle _ NARROW[host.Value[]]; hostQueue: QUEUE _ recipients.queue; msg: Queue.QElem; progress _ FALSE; UNTIL (msg _ GetProcessableElement[hostQueue]) = NIL DO IF ~SendGVMsg[host, msg] THEN RETURN; recipients.sent _ recipients.sent + 1; recipients.lastUp _ BasicTime.Now[]; progress _ TRUE; ENDLOOP; IF hostQueue.IsEmpty[] THEN MoveToEmpty[from: GVQueue, host: host]; }; SendGVMsg: PROC [host, msg: Queue.QElem] RETURNS [progress: BOOL _ TRUE] = { recipients: RecipBundle _ NARROW[host.Value[]]; hostQueue: QUEUE _ recipients.queue; pkg: DeliveryPkg = NARROW[msg.Value[]]; descr: Descr = pkg.descr; BEGIN -- so the above vars will be in scope for EXITS SMTPGVSend.SendItem[descr, pkg.recipList, gvHandle ! SMTPGVSend.Failed => { IF problemWithGV THEN { MoveToSick[GVQueue, host, reason]; progress _ FALSE; }; -- stop the enumeration of GV item SELECT withItem FROM retryLater => { IF ~problemWithGV THEN msg.Inhibit[for: SMTPControl.defaultInhibitTime, why: reason]; GOTO Done; }; returnToSender => { IF debug THEN SIGNAL LookAtThis; SMTPSupport.NotifySender[descr, reason]; GOTO RemoveItem; }; putOnBadQueue => { MoveToBad[queue: hostQueue, msg: msg]; GOTO Done; }; ENDCASE => ERROR; -- shouldn't be "irrelevant" }]; GOTO RemoveItem; EXITS RemoveItem => { FinishedWithMsg[queue: hostQueue, pkg: pkg]; }; Done => NULL; END; }; Background: ENTRY PROC = { now: BasicTime.GMT _ BasicTime.Now[]; snooz: CONDITION _ [timeout: Process.SecondsToTicks[1*60]]; DO WAIT snooz; CheckLists[]; CheckVM[]; MaybePrintStats[]; ConsiderExpressMail[]; ConsiderOldMessages[]; ConsiderSickHosts[]; ENDLOOP; }; timeVMLastChecked: BasicTime.GMT _ BasicTime.Now[]; timeExpressLastChecked, timeArpaExpressLastChecked, timeNotExpressLastChecked, timeNoUpArrowLastChecked: BasicTime.GMT _ BasicTime.Update[BasicTime.Now[], - checkInterval]; expressStamp, arpaExpressStamp, notExpressStamp, noUpArrowStamp: GVBasics.Timestamp _ GVBasics.oldestTime; checkInterval: INT = 2*60*60; -- 2 hours CheckLists: PROC = { now: BasicTime.GMT _ BasicTime.Now[]; IF BasicTime.Period[from: timeExpressLastChecked, to: now] > checkInterval THEN GetExpressList[]; IF BasicTime.Period[from: timeArpaExpressLastChecked, to: now] > checkInterval THEN GetArpaExpressList[]; IF BasicTime.Period[from: timeNotExpressLastChecked, to: now] > checkInterval THEN GetNotExpressList[]; IF BasicTime.Period[from: timeNoUpArrowLastChecked, to: now] > checkInterval THEN GetNoUpArrowList[]; }; GetExpressList: PROC = TRUSTED { memberInfo: GVNames.MemberInfo _ [noChange[]]; memberInfo _ GVNames.GetMembers["ExpressMail^.MS", expressStamp]; WITH m: memberInfo SELECT FROM noChange => timeExpressLastChecked _ BasicTime.Now[]; group => { timeExpressLastChecked _ BasicTime.Now[]; expressMail _ m.members; expressStamp _ m.stamp; SMTPSupport.Log[important, "Updated Express List."]; }; ENDCASE => NULL; }; GetArpaExpressList: PROC = TRUSTED { memberInfo: GVNames.MemberInfo _ [noChange[]]; memberInfo_ GVNames.GetMembers["ArpaExpressMail^.MS", arpaExpressStamp]; WITH m: memberInfo SELECT FROM noChange => timeArpaExpressLastChecked _ BasicTime.Now[]; group => { timeArpaExpressLastChecked _ BasicTime.Now[]; arpaExpressMail _ m.members; arpaExpressStamp _ m.stamp; SMTPSupport.Log[important, "Updated Arpa Express List."]; }; ENDCASE => NULL; }; GetNotExpressList: PROC = TRUSTED { memberInfo: GVNames.MemberInfo _ [noChange[]]; memberInfo_ GVNames.GetMembers["NotArpaExpressMail^.MS", notExpressStamp]; WITH m: memberInfo SELECT FROM noChange => timeNotExpressLastChecked _ BasicTime.Now[]; group => { timeNotExpressLastChecked _ BasicTime.Now[]; notExpressMail _ m.members; notExpressStamp _ m.stamp; SMTPSupport.Log[important, "Updated Not Express List."]; }; ENDCASE => NULL; }; GetNoUpArrowList: PROC = TRUSTED { memberInfo: GVNames.MemberInfo _ [noChange[]]; memberInfo_ GVNames.GetMembers["NoUpArrowDLs^.MS", noUpArrowStamp]; WITH m: memberInfo SELECT FROM noChange => timeNoUpArrowLastChecked _ BasicTime.Now[]; group => { timeNoUpArrowLastChecked _ BasicTime.Now[]; noUpArrowDLs _ m.members; noUpArrowStamp _ m.stamp; SMTPSupport.Log[important, "Updated No Up Arrow DLs."]; }; ENDCASE => NULL; }; CheckVM: PROC = TRUSTED { now: BasicTime.GMT _ BasicTime.Now[]; date: ROPE _ Convert.RopeFromTime[from: now, start: years, end: days, includeDayOfWeek: TRUE]; pagesAllocated, pagesFreed, pagesInPartition, freeVM: INT; IF BasicTime.Period[from: timeVMLastChecked, to: now] < 15*60 THEN RETURN; timeVMLastChecked _ now; [pagesAllocated, pagesFreed, pagesInPartition] _ VMStatistics.VirtualAllocation[normalVM]; freeVM _ MAX[0, pagesInPartition - pagesAllocated + pagesFreed]; SMTPSupport.Log[important, "Date: ", date, " Free VM is ", Convert.RopeFromInt[freeVM], " pages."]; IF freeVM > 2000 THEN RETURN; SMTPSupport.Log[important, "\n\n\n**** VM getting low. Here goes a TraceAndSweep.\n\n\n"]; SafeStorage.ReclaimCollectibleObjects[TRUE, TRUE]; [pagesAllocated, pagesFreed, pagesInPartition] _ VMStatistics.VirtualAllocation[normalVM]; freeVM _ MAX[0, pagesInPartition - pagesAllocated + pagesFreed]; SMTPSupport.Log[important, "Free VM is ", Convert.RopeFromInt[freeVM], " pages."]; }; totalGvMsgsSentLastHour: INT _ 0; totalGvBytesSentLastHour: INT _ 0; totalArpaMsgsSentLastHour: INT _ 0; totalArpaBytesSentLastHour: INT _ 0; totalGvMsgsSentLastDay: INT _ 0; totalGvBytesSentLastDay: INT _ 0; totalArpaMsgsSentLastDay: INT _ 0; totalArpaBytesSentLastDay: INT _ 0; hourStatsLastChecked: BasicTime.GMT _ BasicTime.Now[]; dayStatsLastChecked: BasicTime.GMT _ BasicTime.Now[]; MaybePrintStats: INTERNAL PROC = { now: BasicTime.GMT _ BasicTime.Now[]; hourSecs: INT = 3600; daySecs: INT = 86400; hourPeriod: INT _ BasicTime.Period[from: hourStatsLastChecked, to: now]; dayPeriod: INT _ BasicTime.Period[from: dayStatsLastChecked, to: now]; totalGvMsgsSentInHour: INT_0; totalArpaMsgsSentInHour: INT_0; totalGvMsgsSentInDay: INT_0; totalArpaMsgsSentInDay: INT_0; totalGvBytesSentInHour: INT_0; totalArpaBytesSentInHour: INT_0; totalGvBytesSentInDay: INT_0; totalArpaBytesSentInDay: INT_0; date: ROPE _ Convert.RopeFromTime[from: now, start: years, end: days, includeDayOfWeek: TRUE]; IF hourPeriod < 3600 THEN RETURN; hourStatsLastChecked _ now; SMTPSupport.Log[important, "Date: ", date, ", Hourly Statistics: "]; totalGvMsgsSentInHour _ Real.InlineRound[((SMTPGVSend.totalGvMsgsSent - totalGvMsgsSentLastHour)*1.0*hourSecs)/hourPeriod]; totalGvMsgsSentLastHour _ SMTPGVSend.totalGvMsgsSent; totalGvBytesSentInHour _ Real.InlineRound[((SMTPGVSend.totalGvBytesSent - totalGvBytesSentLastHour)*1.0*hourSecs)/hourPeriod]; totalGvBytesSentLastHour _ SMTPGVSend.totalGvBytesSent; SMTPSupport.Log[important, " Hourly GV bound rate: ", Convert.RopeFromInt[totalGvMsgsSentInHour], " messages, ", Convert.RopeFromInt[totalGvBytesSentInHour], " bytes."]; totalArpaMsgsSentInHour _ Real.InlineRound[((SMTPSend.totalArpaMsgsSent - totalArpaMsgsSentLastHour)*1.0*hourSecs)/hourPeriod]; totalArpaMsgsSentLastHour _ SMTPSend.totalArpaMsgsSent; totalArpaBytesSentInHour _ Real.InlineRound[((SMTPSend.totalArpaBytesSent - totalArpaBytesSentLastHour)*1.0*hourSecs)/hourPeriod]; totalArpaBytesSentLastHour _ SMTPSend.totalArpaBytesSent; SMTPSupport.Log[important, " Hourly Arpa bound rate: ", Convert.RopeFromInt[totalArpaMsgsSentInHour], " messages, ", Convert.RopeFromInt[totalArpaBytesSentInHour], " bytes."]; SMTPSupport.Log[important, " Arpa queue hosts ", Convert.RopeFromInt[CountQueueInternal["Arpa"]], ", messages ", Convert.RopeFromInt[CountMessagesInternal["Arpa"]]]; SMTPSupport.Log[important, " Sick queue hosts ", Convert.RopeFromInt[CountQueueInternal["Sick"]], ", messages ", Convert.RopeFromInt[CountMessagesInternal["Sick"]]]; SMTPSupport.Log[important, " Express queue hosts ", Convert.RopeFromInt[CountQueueInternal["Ex"]], ", messages ", Convert.RopeFromInt[CountMessagesInternal["Ex"]]]; SMTPSupport.Log[important, " GV queue messages ", Convert.RopeFromInt[CountMessagesInternal["GV"]]]; IF dayPeriod < daySecs THEN RETURN; dayStatsLastChecked _ now; totalGvMsgsSentInDay _ Real.InlineRound[((SMTPGVSend.totalGvMsgsSent - totalGvMsgsSentLastDay)*1.0*daySecs)/dayPeriod]; totalGvMsgsSentLastDay _ SMTPGVSend.totalGvMsgsSent; totalGvBytesSentInDay _ Real.InlineRound[((SMTPGVSend.totalGvBytesSent - totalGvBytesSentLastDay)*1.0*daySecs)/dayPeriod]; totalGvBytesSentLastDay _ SMTPGVSend.totalGvBytesSent; SMTPSupport.Log[important, " Daily GV bound rate: ", Convert.RopeFromInt[totalGvMsgsSentInDay], " messages, ", Convert.RopeFromInt[totalGvBytesSentInDay], " bytes."]; totalArpaMsgsSentInDay _ Real.InlineRound[((SMTPSend.totalArpaMsgsSent - totalArpaMsgsSentLastDay)*1.0*daySecs)/dayPeriod]; totalArpaMsgsSentLastDay _ SMTPSend.totalArpaMsgsSent; totalArpaBytesSentInDay _ Real.InlineRound[((SMTPSend.totalArpaBytesSent - totalArpaBytesSentLastDay)*1.0*daySecs)/dayPeriod]; totalArpaBytesSentLastDay _ SMTPSend.totalArpaBytesSent; SMTPSupport.Log[important, " Daily Arpa bound rate: ", Convert.RopeFromInt[totalArpaMsgsSentInDay], " messages, ", Convert.RopeFromInt[totalArpaBytesSentInDay], " bytes."]; SMTPSupport.Log[important, " Total Arpa bound since reboot: ", Convert.RopeFromInt[SMTPSend.totalArpaMsgsSent], " messages, ", Convert.RopeFromInt[SMTPSend.totalArpaBytesSent], " bytes."]; SMTPSupport.Log[important, " Total GV bound since reboot: ", Convert.RopeFromInt[SMTPGVSend.totalGvMsgsSent], " messages, ", Convert.RopeFromInt[SMTPGVSend.totalGvBytesSent], " bytes."]; SMTPSupport.Log[important, " Totals since reboot: ", Convert.RopeFromInt[SMTPSend.totalArpaMsgsSent+SMTPGVSend.totalGvMsgsSent], " messages, ", Convert.RopeFromInt[SMTPSend.totalArpaBytesSent+SMTPGVSend.totalGvBytesSent], " bytes."]; }; expressDelay: INT _ 1; -- 1 minute default delay ConsiderExpressMail: INTERNAL PROC = { now: BasicTime.Unpacked _ BasicTime.Unpack[BasicTime.Now[]]; host: Queue.QElem; recipients: RecipBundle; hostQueue, from, gv: QUEUE; msg: Queue.QElem; minutes: INT; IF ~expressOK THEN RETURN; IF ~GVQueue.IsEmpty[] THEN RETURN; IF now.weekday IN [Monday..Friday] AND now.hour IN [8..12+8) THEN RETURN; host _ ExpressQueue.GetProcessableElement[]; IF host # NIL THEN { from _ ExpressQueue; MoveHostToEnd[ExpressQueue, host]; } ELSE { host _ ArpaExpressQueue.GetProcessableElement[]; IF host = NIL THEN RETURN; from _ ArpaExpressQueue; MoveHostToEnd[ArpaExpressQueue, host]; }; recipients _ NARROW[host.Value[]]; hostQueue _ recipients.queue; msg _ hostQueue.GetProcessableElement[]; IF hostQueue.IsEmpty[] THEN MoveToEmpty[from: from, host: host]; IF msg = NIL THEN { UnlockedMoveToEmpty[from: from, host: host]; RETURN; }; gv _ GetQueueForNewMessage[GVQueue, gvHostName]; MoveAndLog[from: hostQueue, to: gv, qElem: msg]; IF hostQueue.IsEmpty[] THEN UnlockedMoveToEmpty[from: from, host: host]; NOTIFY newGvMessage; SELECT now.hour FROM 12+8 => minutes _ expressDelay + 4; -- 12 messages first hour 12+9 => minutes _ expressDelay + 3; -- 15 messages next hour 12+10 => minutes _ expressDelay + 2; -- 20 messages next hour ENDCASE => minutes _ expressDelay+1; -- 30 messages/hour => 197 msgs in 8 hrs IF ~(now.weekday IN[Monday..Friday]) THEN minutes _ expressDelay; DallyAWhile[minutes]; }; -- (At least 1) Extra minute in main loop DallyAWhile: INTERNAL PROC [minutes: INT] = { snooz: CONDITION _ [timeout: Process.SecondsToTicks[minutes*60]]; WAIT snooz; }; ConsiderSickHosts: INTERNAL PROC = { host: Queue.QElem _ SickQueues.GetProcessableElement[]; IF host # NIL THEN { recipients: RecipBundle _ NARROW[host.Value[]]; gv: BOOL _ Rope.Equal[recipients.queue.Name[], gvHostName]; target: QUEUE _ IF gv THEN GVQueue ELSE HostQueues; MoveAndLog[from: SickQueues, to: target, qElem: host]; IF gv THEN NOTIFY newGvMessage ELSE NOTIFY newArpaMessage; }; }; MoveAllSickHosts: PUBLIC ENTRY PROC = { Uninhibit: Queue.ElemProc = { Queue.Uninhibit[qElem, SickQueues]; }; host: Queue.QElem; SickQueues.Enumerate[Uninhibit]; host _ SickQueues.GetProcessableElement[]; WHILE host # NIL DO MoveSickHostQueue[host]; host _ SickQueues.GetProcessableElement[]; ENDLOOP; }; MoveSickHostQueue: INTERNAL PROC[host: Queue.QElem] = { IF host # NIL THEN { recipients: RecipBundle _ NARROW[host.Value[]]; gv: BOOL _ Rope.Equal[recipients.queue.Name[], gvHostName]; target: QUEUE _ IF gv THEN GVQueue ELSE HostQueues; MoveAndLog[from: SickQueues, to: target, qElem: host]; IF gv THEN NOTIFY newGvMessage ELSE NOTIFY newArpaMessage; }; }; FindSickHost: INTERNAL PROC[host: Rope.ROPE] RETURNS[element: Queue.QElem_NIL]= { Find: Queue.ElemProc = { msgs: RecipBundle _ NARROW[qElem.Value[]]; IF Rope.Equal[msgs.queue.Name[], host, FALSE] THEN { Queue.Uninhibit[qElem, SickQueues]; element _ qElem; continue _ FALSE; }; }; IF host # NIL THEN SickQueues.Enumerate[Find]; }; MoveSickHost: PUBLIC ENTRY PROC[name: Rope.ROPE_ NIL] = { host: Queue.QElem _ FindSickHost[name]; IF host # NIL THEN { target: QUEUE _ HostQueues; MoveAndLog[from: SickQueues, to: target, qElem: host]; NOTIFY newArpaMessage; }; }; CountQueue: PUBLIC ENTRY PROC[name: Rope.ROPE_NIL] RETURNS[n: INT_0]= { n _ CountQueueInternal[name]; }; CountQueueInternal: INTERNAL PROC[name: Rope.ROPE_NIL] RETURNS[n: INT_0]= { Count: Queue.ElemProc = { n _ n+1}; IF name = NIL THEN RETURN[-1]; FindQueue[name].Enumerate[Count]; }; CountMessages: PUBLIC ENTRY PROC[queueName: Rope.ROPE_NIL] RETURNS[n: INT_0]= { n _ CountMessagesInternal[queueName]; }; CountMessagesInternal: INTERNAL PROC[name: Rope.ROPE_NIL] RETURNS[n: INT_0]= { Count: Queue.ElemProc = { n _ n+1}; CountNested: Queue.ElemProc = { msgs: RecipBundle _ NARROW[qElem.Value[]]; msgs.queue.Enumerate[Count]; }; IF name = NIL THEN RETURN[-1]; SELECT TRUE FROM Rope.Equal[name, HostQueues.Name[], FALSE] => HostQueues.Enumerate[CountNested]; Rope.Equal[name, ExpressQueue.Name[], FALSE] => ExpressQueue.Enumerate[CountNested]; Rope.Equal[name, SickQueues.Name[], FALSE] => SickQueues.Enumerate[CountNested]; Rope.Equal[name, GVQueue.Name[], FALSE] => GVQueue.Enumerate[CountNested]; ENDCASE => n _ CountQueueInternal[name]; }; ConsiderOldMessages: INTERNAL PROC = { now: BasicTime.GMT _ BasicTime.Now[]; host: Queue.QElem _ SickQueues.GetProcessableElement[]; ConsiderExpiration: INTERNAL ItemProc = { value: REF ANY = qElem.Value[]; pkg: DeliveryPkg = NARROW[qElem.Value[]]; expiryDate: BasicTime.GMT _ SMTPDescr.GetExpiryDate[pkg.descr]; IF BasicTime.Period[from: now, to: expiryDate] < 0 THEN { reason: ROPE _ "Unable to deliver msg to "; FOR loosers: LIST OF ROPE _ pkg.recipList, loosers.rest UNTIL loosers = NIL DO reason _ Rope.Cat[reason, IF loosers = pkg.recipList THEN NIL ELSE ", ", loosers.first]; ENDLOOP; reason _ Rope.Cat[reason, " at ", queue.Name[], " within a reasonable amount of time."]; MoveAndLog[from: queue, to: tempQueue, qElem: qElem]; SendItBack[queue: queue, qElem: qElem]; RETURN[FALSE]; }; }; IF ~GVQueue.IsEmpty[] THEN RETURN; IF ~HostQueues.IsEmpty[] THEN RETURN; EnumerateSickPkgs[ConsiderExpiration]; }; SendItBack: INTERNAL PROC [queue: QUEUE, qElem: Queue.QElem] = { pkg: DeliveryPkg = NARROW[qElem.Value[]]; reason: ROPE _ "Unable to deliver msg to "; FOR loosers: LIST OF ROPE _ pkg.recipList, loosers.rest UNTIL loosers = NIL DO reason _ Rope.Cat[reason, IF loosers = pkg.recipList THEN NIL ELSE ", ", loosers.first]; ENDLOOP; reason _ Rope.Cat[reason, " at ", queue.Name[], " within a reasonable amount of time."]; SMTPSupport.NotifySender[pkg.descr, reason]; MoveAndLog[from: tempQueue, to: queue, qElem: qElem]; InternalFinishedWithMsg[queue: queue, pkg: pkg]; }; UserError: PUBLIC ERROR [reason: ROPE] = CODE; Failed: ERROR [why: ErrorCode, reason: ROPE] = CODE; ErrorCode: TYPE = {lockNotObtained, notOnQueue}; DescrFromElemVal: PROC [elemVal: REF ANY] RETURNS [Descr] = { SELECT ValTypeOfElem[elemVal] FROM ItemOnlyT => RETURN[NARROW[elemVal, ItemOnly]^]; DeliveryPkgT => RETURN[NARROW[elemVal, DeliveryPkg].descr]; ENDCASE => ERROR; }; ItemProc: TYPE = PROC [queue: QUEUE, qElem: Queue.QElem, procData: REF ANY _ NIL] RETURNS [continue: BOOL _ TRUE]; EnumerateSickPkgs: INTERNAL PROC [proc: ItemProc, data: REF ANY _ NIL] = { continueOverall: BOOL; queue: QUEUE; ScanQueue: Queue.ElemProc = { continueOverall _ proc[queue, qElem, data]; RETURN[continueOverall]; }; ScanHostQueue: Queue.ElemProc = { queue _ NARROW[qElem.Value[], RecipBundle].queue; queue.Enumerate[ScanQueue]; RETURN[continueOverall]; }; SickQueues.Enumerate[ScanHostQueue]; }; EnumerateItems: INTERNAL PROC [proc: ItemProc, data: REF ANY _ NIL] = { continueOverall: BOOL; queue: QUEUE; ScanQueue: Queue.ElemProc = { continueOverall _ proc[queue, qElem, data]; RETURN[continueOverall]; }; ScanHostQueue: Queue.ElemProc = { queue _ NARROW[qElem.Value[], RecipBundle].queue; queue.Enumerate[ScanQueue]; RETURN[continueOverall]; }; IF ~continueOverall THEN RETURN; GVQueue.Enumerate[ScanHostQueue]; IF ~continueOverall THEN RETURN; ExpressQueue.Enumerate[ScanHostQueue]; IF ~continueOverall THEN RETURN; HostQueues.Enumerate[ScanHostQueue]; IF ~continueOverall THEN RETURN; SickQueues.Enumerate[ScanHostQueue]; IF ~continueOverall THEN RETURN; queue _ tempQueue; tempQueue.Enumerate[ScanQueue, data]; queue _ BadItemQueue; BadItemQueue.Enumerate[ScanQueue, data]; }; QueueProc: TYPE = PROC [queue: QUEUE, procData: REF ANY _ NIL] RETURNS [continue: BOOL _ TRUE]; EnumerateQueues: INTERNAL PROC [proc: QueueProc, data: REF ANY _ NIL] = { continueOverall: BOOL _ TRUE; ScanHostQueue: Queue.ElemProc = { queue: QUEUE _ NARROW[qElem.Value[], RecipBundle].queue; continueOverall _ proc[queue, data]; RETURN[continueOverall]; }; GVQueue.Enumerate[ScanHostQueue]; IF ~continueOverall THEN RETURN; ExpressQueue.Enumerate[ScanHostQueue]; IF ~continueOverall THEN RETURN; ArpaExpressQueue.Enumerate[ScanHostQueue]; IF ~continueOverall THEN RETURN; HostQueues.Enumerate[ScanHostQueue]; IF ~continueOverall THEN RETURN; SickQueues.Enumerate[ScanHostQueue]; IF ~continueOverall THEN RETURN; IF ~proc[tempQueue, data] THEN RETURN; IF ~proc[BadItemQueue, data] THEN RETURN; }; GetProcessableElement: ENTRY PROC [queue: QUEUE] RETURNS [qElem: Queue.QElem] = { qElem _ queue.GetProcessableElement[]; IF qElem = NIL THEN RETURN; queue.Dequeue[qElem.Value[]]; queue.Enqueue[qElem]; }; MoveHostToEnd: INTERNAL PROC [queue: QUEUE, qElem: Queue.QElem] = { queue.Dequeue[qElem.Value[]]; queue.Enqueue[qElem]; }; MoveToEmpty: ENTRY PROC [from: QUEUE, host: Queue.QElem] = { MoveAndLog[from: from, to: EmptyQueues, qElem: host]; }; UnlockedMoveToEmpty: INTERNAL PROC [from: QUEUE, host: Queue.QElem] = { MoveAndLog[from: from, to: EmptyQueues, qElem: host]; }; MoveToSick: ENTRY PROC [from: QUEUE, host: Queue.QElem, reason: ROPE] = { host.Inhibit[for: SMTPControl.defaultInhibitTime, why: reason]; MoveAndLog[from: from, to: SickQueues, qElem: host]; }; MoveToBad: ENTRY PROC [queue: QUEUE, msg: Queue.QElem] = { DequeueAndLog[queue: queue, value: msg.Value[]]; EnqueueAndLog[queue: BadItemQueue, qElem: msg]; }; FlushThisHost: ENTRY PROC [queue: QUEUE, value: REF ANY] = { DequeueAndLog[queue: queue, value: value]; }; FinishedWithMsg: ENTRY PROC [queue: QUEUE, pkg: DeliveryPkg] = { InternalFinishedWithMsg[queue, pkg]; }; InternalFinishedWithMsg: INTERNAL PROC [queue: QUEUE, pkg: DeliveryPkg] = { pkg.descr.RemoveRecipientHost[queue.Name[]]; DequeueAndLog[queue: queue, value: pkg]; ConsiderItemDestruction[descr: pkg.descr]; }; LockedMoveAndLog: ENTRY PROC [from, to: QUEUE, qElem: Queue.QElem] = { MoveAndLog[from: from, to: to, qElem: qElem]; }; MoveAndLog: INTERNAL PROC [from, to: QUEUE, qElem: Queue.QElem] = { from.Dequeue[qElem.Value[]]; to.Enqueue[qElem]; SMTPSupport.Log[verbose, Unparse[qElem.Value[]], " moved from ", from.Name[], " to ", to.Name[], "."]; }; EnqueueAndLog: INTERNAL PROC [queue: QUEUE, qElem: Queue.QElem] = { queue.Enqueue[qElem]; SMTPSupport.Log[verbose, Unparse[qElem.Value[]], " added to ", queue.Name[], "."]; }; DequeueAndLog: INTERNAL PROC [queue: QUEUE, value: REF ANY] = { val: ROPE = Unparse[value]; queue.Dequeue[value]; SMTPSupport.Log[verbose, val, " removed from ", queue.Name[], "."]; }; ValTypeOfQueue: PROC [queue: QUEUE] RETURNS [ElemValType] = { RETURN[SELECT queue FROM tempQueue, BadItemQueue => DeliveryPkgT, EmptyQueues, GVQueue, ExpressQueue, HostQueues, SickQueues => RecipBundleT, ENDCASE => DeliveryPkgT ]; }; ValTypeOfElem: PROC [value: REF ANY] RETURNS [ElemValType] = { SELECT TRUE FROM ISTYPE[value, DeliveryPkg] => RETURN[DeliveryPkgT]; ISTYPE[value, RecipBundle] => RETURN[RecipBundleT]; ENDCASE => ERROR; }; ElemValType: TYPE = {ItemOnlyT, DeliveryPkgT, RecipBundleT}; ValTypeNames: ARRAY ElemValType OF ROPE = [ "Somebody is confused", Rope.Cat[ "DeliveryPkg (from ", ", an Host queue, or ", BadItemQueue.Name[], ")"], Rope.Cat["Host queue (from Empty, GV, Ex, ARPA, or Sick)"] ]; ConsiderItemDestruction: INTERNAL PROC [descr: Descr] = { queues: LIST OF QUEUE _ NIL; itemPresent: BOOL _ FALSE; onQueue: QUEUE; CollectQueues: QueueProc = {queues _ CONS[queue, queues]; }; EnumerateQueues[CollectQueues]; [itemPresent, onQueue] _ ItemIsOn[descr.GetUserHandle[], queues]; IF itemPresent THEN { IF ~descr.MoreRecipients[] AND onQueue # BadItemQueue THEN -- queue corruption SMTPSupport.Log[important, "There are no more recipients in the item descriptor for item", descr.Unparse[], ", but there is a reference to it on ", onQueue.Name[], ".\nThis should not have occurred; the demons should discard it."]; } ELSE { -- item not present on any queues IF descr.MoreRecipients[] THEN { qElem: Queue.QElem = Queue.NewElem[NEW[DeliveryPkgRep _ [descr: descr, recipList: NIL]]]; EnqueueAndLog[queue: BadItemQueue, qElem: qElem]; SMTPSupport.Log[ATTENTION, "There are more recipients in the item descriptor for item ", descr.Unparse[], ", though there is no reference to it on any queue.\n", "This should not have occurred. Have placed a deliveryPkg on ", BadItemQueue.Name[], "."]; } ELSE { descr.Destroy[]; }; }; }; ItemIsOn: INTERNAL PROC [handle: INT, queueList: LIST OF QUEUE] RETURNS [itemPresent: BOOL, onQueue: QUEUE] = { itemPresent _ FALSE; FOR qList: LIST OF QUEUE _ queueList, qList.rest UNTIL qList = NIL DO onQueue _ qList.first; IF RetrieveElemVal[onQueue, handle, FALSE] # NIL THEN { itemPresent _ TRUE; EXIT; }; ENDLOOP; }; RetrieveElemVal: PUBLIC PROC [queue: QUEUE, handle: INT, errorIfNotFound: BOOL _ TRUE] RETURNS [value: REF ANY _ NIL] = { SELECT ValTypeOfQueue[queue] FROM DeliveryPkgT => { Find: Queue.ElemProc = { IF handle = SMTPDescr.GetUserHandle[NARROW[qElem.Value[], DeliveryPkg].descr] THEN {value _ qElem.Value[]; continue _ FALSE}; }; queue.Enumerate[Find]; }; RecipBundleT => ERROR UserError[IO.PutFR["You have asked the queue module to retrieve item #%g from HostQueues, which doesn't directly contain items (possibly a code bug). Use HostQueue.", IO.int[handle]]]; ENDCASE => ERROR; IF value = NIL AND errorIfNotFound THEN { reason: ROPE = IO.PutFR[ "Msg #%g not found on %g.", IO.int[handle], IO.rope[queue.Name[]]]; ERROR UserError[reason]; }; }; -- end RetrieveElemVal END. .SMTPQueueImpl.mesa Copyright c 1985 by Xerox Corporation. All rights reserved. DCraft, December 21, 1983 9:02 pm Taft, February 4, 1984 4:30:19 pm PST Hal Murray May 29, 1985 2:17:17 am PDT John Larson, August 2, 1987 4:20:36 pm PDT Queue elements (which must be REFs) are DeliveryPkgs or RecipBundles. The Queues (Names are in ValTypeNames too) Note: The names of all sub-queues must be unique Special Host Names and such Policy controls Not currently in use yet. Try for an old one. Doesn't exist yet. Make it. Fish Sender out of body of msg, but not if ReturnToSender has already set one up. Beware of loops if you can't return a message to the original sender. -- Show old name if used on Open probably means the host name is unknown; all items should be returned Reject the whole list... Info for lists IF now.hour = 12+11 THEN RETURN; -- Archiver and RegPurger This heuristic is a hack. For easy messages it's too slow. For nasty ones, GV still gets swamped. IF ~HostQueues.IsEmpty[] THEN RETURN; Miscellaneous Will happily queue wrong value type on a queue. Check all item queues (excluding InSystemQueue) to determine if descr is on any of them. If so, the item should be retained, else destroyed unless there are further recipients in the descriptor (in which case there is some error so log an ATTENTION msg and place on BadItemQueue). Also log an error if the descr appears on some queue but has no further recipients. The alternative to this (and the way I first implemented it) is to check only if there are no further recipients in the recipient list field of the Descr, and assume that if there aren't then the item isn't on any pkg queue. If all of the code was right, and users could never confuse the queue system, this would be the better choice because all of the queues wouldn't have to be searched. However, we don't want pkgs on queues pointing to a destroyed item, and this is a server so we must try to act intelligently in the face of corruptions... Find the value of the element on queue with the given user handle. Value is NIL if handle not found on queue. Ê-– "cedar" style˜headšœ™Icodešœ Ïmœ1™Mšœ5˜5—šž˜Mšœ8˜8———Mšœ0žœ˜9—Mšœ9˜9šžœžœž˜MšœR˜R—Mšžœ˜Mšœ ˜-Mšœ7˜7Mšœ  ,˜LM˜Mšœ!˜!—š¡œžœ˜1Mšœžœ˜+Mšœžœ˜Mšœ žœ˜Mšœžœ$˜,šžœžœž˜šœ˜Mšœ˜Mšœ˜Mšžœ˜—šœ#žœ˜+Mšœ˜—šœ'žœ˜/Mšœ˜—šžœ˜ Mšœ˜Mšœ˜Mšžœ˜——Mšœ8˜8Mšœ6žœ+˜d—Mšœž œ)˜Bš¡œžœžœžœ˜>šž˜Mšœ*˜*Mšžœžœžœ$žœ˜@Mšžœ˜Mšžœ˜ ——š¡œžœ˜šž˜Mšœ)˜)Mšœžœ˜/Mšœ žœ˜$Mšžœžœžœ˜#Mšžœžœžœ˜'Mšžœžœžœ ˜4Mšœ9˜9Mšžœ˜ ——š¡œžœžœ žœ˜FMšœ ˜ Mšœžœ˜/Mšœ žœ˜$Mšœžœ˜+Mšžœ 9˜?Mšœ˜Mšœ žœ˜Mšžœžœžœ˜0šœB˜Bšžœ ž˜šœ˜šžœž˜Mšœ%˜%—Mšžœ ˜ —šœ =˜QM™MM™šžœž˜Mšœ)˜)Mšœžœ˜)Mšœ˜Mšžœžœžœ ˜ Mšœ(˜(Mšœ,˜,Mšžœ˜—Mšžœžœžœ ˜JMšœ6˜6Mšžœ ˜ —Mšžœžœ (˜>——šžœ,žœž˜7šžœ?žœ˜GMšœo˜oMšžœ˜—Mšžœ%žœžœžœ˜PMšœ&˜&Mšœ$˜$Mšœ žœ˜Mšžœ˜—Mšžœ˜šž˜šœ˜Mšžœžœ.˜I—Mšœžœ˜ —Mšžœ˜Mšžœžœžœžœ˜4—Mš¡ œžœžœ˜š ¡ œžœ7žœ žœžœ˜kMšœžœ˜/Mšœ žœ˜$Mšœžœ˜'Mšœ˜šžœ /˜5šœE˜Ešžœžœ˜Mšœ%˜%Mšœ žœ =˜Q—šžœ ž˜˜šžœž˜Mšœ>˜>—Mšžœ ˜ —˜Mšžœžœžœ ˜ M˜(Mšžœ˜—˜Mšœ&˜&Mšžœ ˜ —Mšžœžœ ˜,—Mšœ˜—Mšžœ ˜šž˜šœ˜Mšœ/˜/—Mšœžœ˜ —Mšžœ˜——Mšœž œ)˜@š¡ œžœžœžœ˜6šž˜Mšœ'˜'Mšžœžœžœžœ˜Mšžœ˜Mšžœ˜ ——š¡œžœ˜šž˜Mšœ!˜!Mšžœžœžœ˜"Mšžœžœžœ˜'Mšœ6˜6Mšžœ˜ ——Mšœ5 ˜Eš¡œžœžœ žœ˜EMšœžœ˜/Mšœ žœ˜$Mšœ˜Mšœ žœ˜šžœ,žœž˜7Mšžœžœžœ˜%Mšœ&˜&Mšœ$˜$Mšœ žœ˜Mšžœ˜—Mšžœžœ,˜G—š ¡ œžœžœ žœžœ˜LMšœžœ˜/Mšœ žœ˜$Mšœžœ˜'Mšœ˜šžœ /˜5šœL˜Lšžœžœ˜Mšœ"˜"Mšœ žœ "˜7—šžœ ž˜˜šžœž˜Mšœ>˜>Mšžœ ˜ ——˜Mšžœžœžœ ˜ Mšœ(˜(Mšžœ˜—˜Mšœ&˜&Mšžœ ˜ —Mšžœžœ ˜.—Mšœ˜—Mšžœ ˜šž˜šœ˜Mšœ/˜/—Mšœžœ˜ —Mšžœ˜——š¡ œžœžœ˜Mšœžœ˜%Mšœž œ+˜;šž˜Mšžœ˜ M˜ M˜ M˜M˜M˜M˜Mšžœ˜ —M˜—Mšœžœ˜3M˜šœ™MšœO£œ žœ6˜¬Mšœ1£œ+˜jMšœžœ   ˜(—š¡ œžœ˜Mšœžœ˜%MšžœIžœ˜aMšžœMžœ˜jMšžœLžœ˜hMšÑkmt£K¤£˜fMšœ˜—š¡œžœžœ˜ Mšœ.˜.MšœA˜Ašžœžœž˜Mšœ5˜5˜ Mšœ)˜)M˜M˜Jšœ7˜7—Mšžœžœ˜——š¡œžœžœ˜$Mšœ.˜.MšœH˜Hšžœžœž˜Mšœ9˜9˜ Mšœ-˜-Mšœ˜Mšœ˜Jšœ<˜<—Mšžœžœ˜M˜——š¡œžœžœ˜#Mšœ.˜.MšœJ˜Jšžœžœž˜Mšœ8˜8˜ Mšœ,˜,Mšœ˜Mšœ˜Jšœ;˜;—Mšžœžœ˜M˜M˜——šÑmntœžœžœ˜"Mšœ.˜.Mšœ £œ£œ˜Cšžœžœž˜Mšœ £œ˜7˜ Mš£œ˜+Mš£ œ ˜Mš£œ ˜Jšœ$£œ˜:—Mšžœžœ˜M˜M˜——š¡œžœžœ˜Mšœžœ˜%MšœžœNžœ˜^Mšœ6žœ˜:Mšžœ<žœžœ˜JMšœ˜JšœZ˜ZJšœ žœ4˜@Jšœc˜cJšžœžœžœ˜JšœZ˜ZJšœ&žœžœ˜2JšœZ˜ZJšœ žœ4˜@JšœU˜UJ˜J˜—Jšœžœ˜!Jšœžœ˜"Jšœžœ˜#Jšœžœ˜$J˜Jšœžœ˜ Jšœžœ˜!Jšœžœ˜"Jšœžœ˜#J˜Mšœ žœ˜6Mšœžœ˜5M˜š¡œž œ˜#Mšœžœ˜%Mšœ žœ˜Mšœ žœ ˜Mšœ žœ9˜HMšœ žœ8˜FMšœžœ˜Mšœžœ˜Mšœžœ˜Mšœžœ˜Mšœžœ˜Mšœžœ˜ Mšœžœ˜Mšœžœ˜M˜MšœžœNžœ˜^Mšžœžœžœ˜!Mšœ˜M˜JšœG˜GM˜Mšœ{˜{Mšœ5˜5M˜Mšœ~˜~Mšœ7˜7M˜Jšœ´˜´J˜Mšœ˜Mšœ7˜7M˜Mšœ‚˜‚Mšœ9˜9J˜Jšœº˜ºJ˜Jšœ®˜®J˜Jšœ®˜®J˜Jšœ­˜­J˜Jšœh˜hJ˜J˜Mšžœžœžœ˜#Mšœ˜J˜J˜Mšœw˜wMšœ4˜4M˜Mšœz˜zMšœ6˜6M˜Jšœ±˜±M˜Mšœ{˜{Mšœ6˜6M˜Mšœ~˜~Mšœ8˜8J˜Jšœ·˜·J˜J˜JšœÇ˜ÇJ˜JšœÅ˜ÅJ˜Jšœô˜ôJ˜J˜Jšœ˜J˜J˜J˜J˜J˜J˜—Jšœžœ ˜0š¡œžœžœ˜&Mšœ<˜šžœžœž˜Mšžœžœ˜3Mšžœžœ˜3Mšžœžœ˜——Mšœ žœ+˜<š¡ œžœ žœžœ˜+Mšœ˜šœ ˜ MšœH˜H—Mšœ=˜=—š¡œžœžœ˜9M™ìM™¡Mš œžœžœžœžœ˜Mšœ žœžœ˜Mšœ žœ˜M˜Mš¡ œžœ˜