<> <> <> <> <> <> DIRECTORY ArpaQueue USING [Create, Dequeue, ElemProc, Enqueue, Enumerate, GetHead, GetProcessableElement, Inhibit, Inhibition, IsEmpty, MQ, Name, NewElem, QElem, Uninhibit, Value], ArpaSMTPControl USING [defaultInhibitTime], ArpaSMTPDescr USING [AddProcessedRecipient, Descr, DescrRep, Destroy, EnumerateRawRecipients, EnumerateRecipientHosts, GetArpaReversePath, GetExpiryDate, GetFormat, GetGvSender, GetPrecedeMsgText, GetUserHandle, HostProc, MoreRecipients, Print, RawRecipProc, RemoveRawRecipients, RemoveRecipientHost, RetrieveMsgStream, SetArpaReversePath, SetGvSender, SetPrecedeMsgText, StoreItemInfo, Unparse, WrongState], ArpaSMTPGVSend USING [totalGvMsgsSent, totalGvBytesSent, Connection, Failed, Open, SendItem], ArpaSMTPQueue USING [], ArpaSMTPSend USING [totalArpaMsgsSent, totalArpaBytesSent, Close, Connection, Failed, Open, SendItem], ArpaSMTPSupport USING [AuthorizationCheck, CheckHeader, Log, NotifySender], ArpaSMTPSyntax USING [EnumerateGVItems, GVItemProc, HostAndUser, ReceiveRName, ReversePath, UnBlessReturnPath], BasicTime USING [GMT, Now, nullGMT, Period, Unpack, Unpacked, Update], Convert USING [RopeFromInt, RopeFromTime], GVBasics USING [oldestTime, RopeFromTimestamp, Timestamp], GVNames USING [GetMembers, MemberInfo], GVProtocol USING [ReceiveTimestamp], IO USING [Close, int, Put, PutChar, PutF, PutFR, PutRope, rope, STREAM, time], Process USING [Detach, SecondsToTicks], Real USING [InlineRound], Rope USING [Cat, Equal, Find, ROPE]; ArpaSMTPQueueImpl: CEDAR MONITOR IMPORTS ArpaQueue, ArpaSMTPControl, ArpaSMTPDescr, ArpaSMTPGVSend, ArpaSMTPSend, ArpaSMTPSupport, ArpaSMTPSyntax, BasicTime, Convert, GVBasics, GVNames, GVProtocol, IO, Real, Rope, Process EXPORTS ArpaSMTPQueue = BEGIN Descr: TYPE = ArpaSMTPDescr.Descr; ROPE: TYPE = Rope.ROPE; STREAM: TYPE = IO.STREAM; QUEUE: TYPE = ArpaQueue.MQ; <> ItemOnly: TYPE = REF Descr; -- an item for delivery to one or more recipients DeliveryPkg: TYPE = REF DeliveryPkgRep; -- an item ready for 
-- delivery to a clump of recipients at the same host
DeliveryPkgRep: TYPE = RECORD[descr: Descr, recipList: LIST OF ROPE];
RecipBundle: TYPE = REF RecipBundleRep; -- a queue of delivery pkgs for a single host
RecipBundleRep: TYPE = RECORD [ queue: QUEUE, sent, recv: INT _ 0, lastUp: BasicTime.GMT];
<> <>
-- Top-level queues. Each is created once, at module start, with the name shown; FindQueue matches these names ignoring case.
EmptyQueues: QUEUE _ ArpaQueue.Create["Empty"]; -- queue of RecipBundles
GVQueue: QUEUE _ ArpaQueue.Create["GV"]; -- queue of RecipBundles
ExpressQueue: QUEUE _ ArpaQueue.Create["Ex"]; -- queue of RecipBundles
ArpaExpressQueue: QUEUE _ ArpaQueue.Create["ArpaEx"]; -- queue of RecipBundles
HostQueues: QUEUE _ ArpaQueue.Create["ARPA"]; -- queue of RecipBundles
SickQueues: QUEUE _ ArpaQueue.Create["Sick"]; -- queue of RecipBundles
BadItemQueue: QUEUE _ ArpaQueue.Create["Bad"]; -- queue of DeliveryPkgs
tempQueue: QUEUE _ ArpaQueue.Create["Temp"]; -- queue of DeliveryPkgs
<>
gvHostName: ROPE _ "Grapevine"; -- Wired into format of msg saved on disk
expressHostName: ROPE _ "Express";
expressMail: LIST OF ROPE; -- refreshed by GetExpressList
arpaExpressHostName: ROPE _ "ArpaExpress";
arpaExpressMail: LIST OF ROPE; -- refreshed by GetArpaExpressList
notExpressMail: LIST OF ROPE; -- refreshed by GetNotExpressList
-- NOTE(review): DLWithoutUpArrow reads a list named noUpArrowDLs that is not declared anywhere in view; presumably its declaration was lost at the residue below. TODO confirm against the original file.
¬ <>
expressOK: BOOL _ TRUE; -- ConsiderExpressMail does nothing while this is FALSE
debug: BOOL _ FALSE; -- when TRUE, LookAtThis is signalled before a message is returned to its sender

-- Returns the current express list.
ExpressList: PUBLIC PROC RETURNS[list: LIST OF ROPE] = {RETURN[expressMail]};

-- Sets the expressOK switch.
SetExpressOK: PUBLIC PROC [ok: BOOL] = {expressOK _ ok};

-- Returns the first entry of expressMail that matches (ignoring case) one of names.
-- Returns NIL if nothing matches, or as soon as a name that is on notExpressMail is reached.
Express: PROC [names: LIST OF ROPE] RETURNS [name: ROPE] = {
  UNTIL names = NIL DO
    IF NotExpress[names.first] THEN RETURN[NIL];
    FOR list: LIST OF ROPE _ expressMail, list.rest UNTIL list = NIL DO
      IF Rope.Equal[names.first, list.first, FALSE] THEN RETURN[list.first];
      ENDLOOP;
    names _ names.rest;
    ENDLOOP;
  RETURN[NIL];
  };

-- TRUE iff name matches (ignoring case) an entry of notExpressMail.
NotExpress: PROC [name: ROPE] RETURNS [BOOLEAN] = {
  FOR list: LIST OF ROPE _ notExpressMail, list.rest UNTIL list = NIL DO
    IF Rope.Equal[name, list.first, FALSE] THEN RETURN[TRUE];
    ENDLOOP;
  RETURN[FALSE];
  };

-- Returns the first entry of arpaExpressMail that matches (ignoring case) one of names, else NIL.
ArpaExpress: PROC [names: LIST OF ROPE] RETURNS [name: ROPE] = {
  UNTIL names = NIL DO
    FOR list: LIST OF ROPE _ arpaExpressMail, list.rest UNTIL list = NIL DO
      IF
Rope.Equal[names.first, list.first, FALSE] THEN RETURN[list.first];
      ENDLOOP;
    names _ names.rest;
    ENDLOOP;
  RETURN[NIL];
  };

-- TRUE iff dl matches (ignoring case) an entry of noUpArrowDLs.
DLWithoutUpArrow: PUBLIC PROC [dl: ROPE] RETURNS [valid: BOOL] = {
  FOR list: LIST OF ROPE _ noUpArrowDLs, list.rest UNTIL list = NIL DO
    IF Rope.Equal[dl, list.first, FALSE] THEN RETURN[TRUE];
    ENDLOOP;
  RETURN[FALSE];
  };

-- Returns the per-host message queue called name (compared ignoring case).
-- A bundle already sitting in GV, Express, ArpaExpress, Host or Sick is used where it is.
-- A bundle found in EmptyQueues is first moved to home.
-- Otherwise a fresh bundle is created and enqueued on home.
GetQueueForNewMessage: INTERNAL PROC [home: QUEUE, name: ROPE] RETURNS [queue: QUEUE] = {
  Find: ArpaQueue.ElemProc = {
    msgs: RecipBundle _ NARROW[qElem.Value[]];
    IF Rope.Equal[msgs.queue.Name[], name, FALSE] THEN {
      old _ qElem;
      queue _ msgs.queue;
      continue _ FALSE;
      };
    };
  old: ArpaQueue.QElem;
  new: RecipBundle;
  GVQueue.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  ExpressQueue.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  ArpaExpressQueue.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  HostQueues.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  SickQueues.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  -- (original comment text lost in extraction)
  EmptyQueues.Enumerate[Find];
  IF queue # NIL THEN {
    MoveAndLog[from: EmptyQueues, to: home, qElem: old];
    RETURN;
    };
  -- (original comment text lost in extraction)
  new _ NEW[RecipBundleRep _ [
    queue: ArpaQueue.Create[name], sent: 0, recv: 0, lastUp: BasicTime.nullGMT]];
  EnqueueAndLog[queue: home, qElem: ArpaQueue.NewElem[new]];
  RETURN[new.queue];
  };

-- Prints the item whose user handle is item, followed by its inhibition (if any), on out.
-- Prints a one-line complaint if no such item is found.
PrintItem: PUBLIC ENTRY PROC [item: INT, out: STREAM] = {
  ENABLE UNWIND => NULL;
  descr: Descr;
  qElem: ArpaQueue.QElem;
  [qElem, descr] _ DescrFromHandle[item];
  IF descr = NIL THEN {out.PutRope["No item with that handle.\n"]; RETURN};
  descr.Print[out: out, form: short];
  PrintInibition[qElem, out, NIL];
  };

-- Finds the queue element and descriptor whose user handle equals handle.
-- Both results are NIL when no item matches.
-- The results are assigned only on a match, so a miss can no longer hand back the last element that happened to be visited.
DescrFromHandle: INTERNAL PROC [handle: INT] RETURNS [qe: ArpaQueue.QElem, descr: Descr] = {
  found: BOOL _ FALSE;
  Check: ItemProc = {
    candidate: Descr = DescrFromElemVal[qElem.Value[]];
    IF handle = candidate.GetUserHandle[] THEN {
      qe _ qElem;
      descr _ candidate;
      found _ TRUE;
      continue _ FALSE;
      };
    };
  EnumerateItems[Check];
  IF ~found THEN {qe _ NIL; descr _ NIL};
  };

-- Monitor-locked wrapper around PrintQueueInner.
PrintQueue: PUBLIC ENTRY PROC [name: ROPE, out: STREAM] = {
  ENABLE UNWIND => NULL;
  PrintQueueInner[name, out];
  };

PrintQueueInner:
INTERNAL PROC [name: ROPE, out: STREAM] = { queue: QUEUE; ScanForInfo: ArpaQueue.ElemProc = { msgs: RecipBundle _ NARROW[qElem.Value[]]; IF queue # msgs.queue THEN RETURN [TRUE]; out.PutF[ "\tsent: %G, recv: %G, last up: ", IO.int[msgs.sent], IO.int[msgs.recv]]; IF msgs.lastUp = BasicTime.nullGMT THEN out.PutRope["never"] ELSE out.Put[IO.time[msgs.lastUp]]; out.PutRope["\n"]; PrintInibition[qElem, out, "\t"]; RETURN[FALSE]; }; PrintVal: ArpaQueue.ElemProc = { value: REF ANY = qElem.Value[]; SELECT ValTypeOfElem[value] FROM RecipBundleT => { msgs: RecipBundle = NARROW[value]; out.PutF[ "\t%G:\n\t\tsent: %G, recv: %G, last up: ", IO.rope[msgs.queue.Name[]], IO.int[msgs.sent], IO.int[msgs.recv]]; IF msgs.lastUp = BasicTime.nullGMT THEN out.PutRope["never"] ELSE out.Put[IO.time[msgs.lastUp]]; out.PutRope["\n"]; PrintInibition[qElem, out, "\t\t"]; msgs.queue.Enumerate[PrintVal]; }; ENDCASE => { out.PutRope["\t\t"]; PrintElemVal[value, out]; out.PutChar['\n]; }; }; IF name = NIL THEN { PrintQueueInner[EmptyQueues.Name[], out]; PrintQueueInner[GVQueue.Name[], out]; PrintQueueInner[ExpressQueue.Name[], out]; PrintQueueInner[ArpaExpressQueue.Name[], out]; PrintQueueInner[HostQueues.Name[], out]; PrintQueueInner[SickQueues.Name[], out]; RETURN; }; queue _ FindQueue[name]; IF queue = NIL THEN { out.PutRope["Unknown queue: "]; out.PutRope[name]; out.PutRope[".\n"]; RETURN; }; out.PutRope[queue.Name[]]; out.PutRope[":\n"]; EmptyQueues.Enumerate[ScanForInfo]; GVQueue.Enumerate[ScanForInfo]; ExpressQueue.Enumerate[ScanForInfo]; ArpaExpressQueue.Enumerate[ScanForInfo]; HostQueues.Enumerate[ScanForInfo]; SickQueues.Enumerate[ScanForInfo]; queue.Enumerate[PrintVal]; }; FindQueue: INTERNAL PROC [name: ROPE] RETURNS [queue: QUEUE _ NIL] = { bundle: RecipBundle; SELECT TRUE FROM Rope.Equal[name, EmptyQueues.Name[], FALSE] => RETURN[EmptyQueues]; Rope.Equal[name, GVQueue.Name[], FALSE] => RETURN[GVQueue]; Rope.Equal[name, ExpressQueue.Name[], FALSE] => RETURN[ExpressQueue]; 
Rope.Equal[name, ArpaExpressQueue.Name[], FALSE] => RETURN[ArpaExpressQueue]; Rope.Equal[name, HostQueues.Name[], FALSE] => RETURN[HostQueues]; Rope.Equal[name, SickQueues.Name[], FALSE] => RETURN[SickQueues]; Rope.Equal[name, BadItemQueue.Name[], FALSE] => RETURN[BadItemQueue]; Rope.Equal[name, tempQueue.Name[], FALSE] => RETURN[tempQueue]; ENDCASE; bundle _ GetBundle[name, FALSE]; IF bundle = NIL THEN RETURN[NIL]; RETURN[bundle.queue]; }; GetBundle: INTERNAL PROC [name: ROPE, create: BOOL] RETURNS [bundle: RecipBundle _ NIL] = { Find: ArpaQueue.ElemProc = { msgs: RecipBundle _ NARROW[qElem.Value[]]; IF Rope.Equal[msgs.queue.Name[], name, FALSE] THEN { bundle _ msgs; continue _ FALSE; }; }; EmptyQueues.Enumerate[Find]; IF bundle # NIL THEN RETURN; GVQueue.Enumerate[Find]; IF bundle # NIL THEN RETURN; ExpressQueue.Enumerate[Find]; IF bundle # NIL THEN RETURN; ArpaExpressQueue.Enumerate[Find]; IF bundle # NIL THEN RETURN; HostQueues.Enumerate[Find]; IF bundle # NIL THEN RETURN; SickQueues.Enumerate[Find]; IF bundle # NIL THEN RETURN; IF ~create THEN RETURN; bundle _ NEW[RecipBundleRep _ [ queue: ArpaQueue.Create[name], sent: 0, recv: 0, lastUp: BasicTime.nullGMT]]; EnqueueAndLog[queue: EmptyQueues, qElem: ArpaQueue.NewElem[bundle]]; }; PrintElemVal: PROC [value: REF ANY, out: STREAM] = { SELECT ValTypeOfElem[value] FROM DeliveryPkgT, RecipBundleT => out.PutRope[Unparse[value]]; ENDCASE => ERROR UserError["Somehow you have tried to print something that isn't a queue."]; }; PrintInibition: PROC [qElem: ArpaQueue.QElem, out: STREAM, prefix: ROPE] = { for: INT; why: ROPE; [for, why] _ qElem.Inhibition[]; IF for > 0 THEN out.PutF[ "%GInhibited for %G:%02G because: %G.\n", IO.rope[prefix], IO.int[for/60], IO.int[for MOD 60], IO.rope[why]]; }; Unparse: PROC [value: REF ANY] RETURNS [ROPE] = { SELECT TRUE FROM ISTYPE[value, ItemOnly] => RETURN[NARROW[value, ItemOnly]^.Unparse[]]; ISTYPE[value, DeliveryPkg] => { pkg: DeliveryPkg = NARROW[value]; r: ROPE _ 
Rope.Cat[pkg.descr.Unparse[], " for "]; FOR l: LIST OF ROPE _ pkg.recipList, l.rest UNTIL l = NIL DO r _ Rope.Cat[r, IF l = pkg.recipList THEN NIL ELSE ", ", l.first]; ENDLOOP; RETURN[r]; }; ISTYPE[value, RecipBundle] => RETURN[NARROW[value, RecipBundle].queue.Name[]]; ENDCASE => RETURN["(Unparse ENDCASE)"]; }; StartServer: PUBLIC PROC = TRUSTED { CheckLists[]; Process.Detach[FORK SendGVMessages[]]; Process.Detach[FORK SendArpaMessages[]]; Process.Detach[FORK Background[]]; }; newMessageArrived: CONDITION _ [timeout: Process.SecondsToTicks[30]]; StartNewMessage: PUBLIC ENTRY PROC [source: ROPE] = {}; -- Hack to hang on ML before reading message from the net. Otherwise we let duplicate messages through because we don't "accept" them after they get onto the disk. AddNewMessage: PUBLIC ENTRY PROC [descr: Descr, source: ROPE] = { ENABLE UNWIND => NULL; host: RecipBundle _ GetBundle[source, TRUE]; qElem: ArpaQueue.QElem _ ArpaQueue.NewElem[NEW[Descr _ descr]]; value: REF ANY _ qElem.Value[]; CheckLists[]; SELECT descr.GetFormat[] FROM gv => { from: ROPE _ descr.GetGvSender[]; <> <> IF from = NIL THEN { arpaReversePath, gvSender: ROPE; gvID: ROPE; temp: IO.STREAM = descr.RetrieveMsgStream[]; RetrieveReturnTo: ArpaSMTPSyntax.GVItemProc = { IF itemHeader.type = PostMark THEN { timestamp: GVBasics.Timestamp _ GVProtocol.ReceiveTimestamp[itemStream]; gvID _ GVBasics.RopeFromTimestamp[timestamp]; }; IF itemHeader.type = ReturnTo THEN { gvSender _ ArpaSMTPSyntax.ReceiveRName[itemStream]; from _ gvSender; continue _ FALSE; }; }; ArpaSMTPSyntax.EnumerateGVItems[temp, RetrieveReturnTo]; temp.Close[]; ArpaSMTPSupport.Log[verbose, descr.Unparse[], " is ", gvID, "."]; ArpaSMTPDescr.SetGvSender[descr, gvSender]; arpaReversePath _ ArpaSMTPSyntax.ReversePath[gvSender]; descr.SetArpaReversePath[arpaReversePath]; }; ArpaSMTPSupport.Log[verbose, descr.Unparse[], " is from ", from, "."]; IF ~ArpaSMTPSupport.AuthorizationCheck[from] OR ~ArpaSMTPSupport.CheckHeader[from, descr] THEN { BEGIN 
ENABLE ArpaSMTPDescr.WrongState => CONTINUE; ArpaSMTPDescr.RemoveRawRecipients[descr]; END; ConsiderItemDestruction[descr]; RETURN; }; }; arpa => { arpaReversePath: ROPE; arpaReversePath _ ArpaSMTPDescr.GetArpaReversePath[descr]; arpaReversePath _ ArpaSMTPSyntax.UnBlessReturnPath[arpaReversePath]; IF arpaReversePath = NIL THEN { arpaReversePath _ ArpaSMTPDescr.GetGvSender[descr]; arpaReversePath _ ArpaSMTPSyntax.UnBlessReturnPath[arpaReversePath]; arpaReversePath _ ArpaSMTPSyntax.ReversePath[arpaReversePath]; }; descr.SetArpaReversePath[arpaReversePath]; ArpaSMTPSupport.Log[verbose, descr.Unparse[], " is from ", arpaReversePath, "."]; }; ENDCASE => ERROR; BEGIN ENABLE ArpaSMTPDescr.WrongState => CONTINUE; --already processed redistributed: ROPE; ProcessRawRecipient: ArpaSMTPDescr.RawRecipProc = { descr: Descr = NARROW[procData, ItemOnly]^; hostName, userName: ROPE; [hostName, userName] _ ArpaSMTPSyntax.HostAndUser[rawRecipient]; IF hostName = NIL THEN { hostName _ gvHostName; IF Rope.Find[userName, "^."] # -1 OR DLWithoutUpArrow[userName] THEN { IF redistributed = NIL THEN redistributed _ "Redistributed: " ELSE redistributed _ Rope.Cat[redistributed, ", "]; <<-- Show old name if used>> IF Rope.Find[rawRecipient, "Xerox.ARPA", 0, FALSE] # -1 THEN redistributed _ Rope.Cat[redistributed, rawRecipient] ELSE redistributed _ Rope.Cat[redistributed, userName];}; }; descr.AddProcessedRecipient[hostName, userName, TRUE]; }; descr.EnumerateRawRecipients[ProcessRawRecipient, value]; IF redistributed # NIL THEN descr.SetPrecedeMsgText[Rope.Cat[redistributed, "\n", descr.GetPrecedeMsgText[]]]; END; descr.StoreItemInfo[]; -- update info on disk descr.EnumerateRecipientHosts[QueueForDelivery, value]; ConsiderItemDestruction[descr]; -- Empty, Crashed after sending to last host host.recv _ host.recv + 1; host.lastUp _ BasicTime.Now[]; }; QueueForDelivery: INTERNAL ArpaSMTPDescr.HostProc = { descr: Descr = NARROW[procData, ItemOnly]^; mainQueue, hostQueue: QUEUE; queueName: ROPE; 
gv: BOOL = Rope.Equal[hostName, gvHostName]; SELECT TRUE FROM ~gv => { queueName _ hostName; mainQueue _ HostQueues; NOTIFY newArpaMessage; }; (queueName _ Express[userNames]) # NIL => { mainQueue _ ExpressQueue}; (queueName _ ArpaExpress[userNames]) # NIL => { mainQueue _ ArpaExpressQueue}; ENDCASE => { queueName _ gvHostName; mainQueue _ GVQueue; NOTIFY newGvMessage; }; hostQueue _ GetQueueForNewMessage[mainQueue, queueName]; EnqueueAndLog[queue: hostQueue, qElem: ArpaQueue.NewElem[NEW[DeliveryPkgRep _ [descr, userNames]]]]; }; newArpaMessage: CONDITION _ [timeout: Process.SecondsToTicks[30]]; GetProcessableHost: ENTRY PROC RETURNS [host: ArpaQueue.QElem] = { ENABLE UNWIND => NULL; DO host _ HostQueues.GetProcessableElement[]; IF host # NIL THEN { MoveHostToEnd[HostQueues, host]; RETURN; }; WAIT newArpaMessage; ENDLOOP; }; SendArpaMessages: PROC = { DO host: ArpaQueue.QElem _ GetProcessableHost[]; recipients: RecipBundle _ NARROW[host.Value[]]; hostQueue: QUEUE _ recipients.queue; IF ProcessArpaHost[host] THEN LOOP; IF host.Inhibition[].for > 0 THEN LOOP; IF hostQueue.IsEmpty[] THEN LOOP; -- Bogus host name MoveToSick[HostQueues, host, "Didn't make any progress"]; ENDLOOP; }; ProcessArpaHost: PROC [host: ArpaQueue.QElem] RETURNS [progress: BOOL] = { hostStream: ArpaSMTPSend.Connection; recipients: RecipBundle _ NARROW[host.Value[]]; hostQueue: QUEUE _ recipients.queue; startTime: BasicTime.GMT _ BasicTime.Now[]; BEGIN -- so hostStream and hostQueue will be in scope for EXITS msg: ArpaQueue.QElem; progress _ FALSE; IF hostQueue.IsEmpty[] THEN GOTO FlushHostQueue; hostStream _ ArpaSMTPSend.Open[hostQueue.Name[] ! 
ArpaSMTPSend.Failed => { SELECT withItem FROM irrelevant, retryLater => { IF problemWithHost THEN MoveToSick[HostQueues, host, reason]; GOTO Done; }; returnToSender => { -- there must be a problem with this Host (no item was given) <> <> UNTIL hostQueue.IsEmpty[] DO qElem: ArpaQueue.QElem = hostQueue.GetHead[]; pkg: DeliveryPkg = NARROW[qElem.Value[]]; descr: Descr = pkg.descr; IF debug THEN SIGNAL LookAtThis; ArpaSMTPSupport.NotifySender[descr, reason]; FinishedWithMsg[queue: hostQueue, pkg: pkg]; ENDLOOP; IF recipients.recv # 0 THEN GOTO FlushHostQueue; -- Foo@Concord.ms (yetch) FlushThisHost[queue: HostQueues, value: host.Value[]]; GOTO Done; }; ENDCASE => ERROR; }]; -- putOnBadQueue makes no sense for Open UNTIL (msg _ GetProcessableElement[hostQueue]) = NIL DO IF BasicTime.Period[from: startTime, to: BasicTime.Now[]] > 5*60 THEN { ArpaSMTPSupport.Log[important, "Already spent more than 5 minutes sending mail to " , Unparse[host.Value[]], "."]; EXIT; }; IF ~SendArpaMsg[hostStream, host, msg] THEN { hostStream.Close[TRUE]; RETURN; }; recipients.sent _ recipients.sent + 1; recipients.lastUp _ BasicTime.Now[]; progress _ TRUE; ENDLOOP; GOTO FlushHostQueue; EXITS FlushHostQueue => { IF hostQueue.IsEmpty[] THEN MoveToEmpty[from: HostQueues, host: host]; }; Done => NULL; END; IF hostStream # NIL THEN hostStream.Close[FALSE]; }; LookAtThis: SIGNAL = CODE; SendArpaMsg: PROC [stream: ArpaSMTPSend.Connection, host, msg: ArpaQueue.QElem] RETURNS [continue: BOOL _ TRUE] = { recipients: RecipBundle _ NARROW[host.Value[]]; hostQueue: QUEUE _ recipients.queue; pkg: DeliveryPkg = NARROW[msg.Value[]]; descr: Descr = pkg.descr; BEGIN -- so the above vars will be in scope for EXITS ArpaSMTPSend.SendItem[descr, pkg.recipList, stream ! 
ArpaSMTPSend.Failed => { IF problemWithHost THEN { MoveToSick[HostQueues, host, reason]; continue _ FALSE; };-- stop the enumeration of SMTP items (stream may be aborted) SELECT withItem FROM retryLater => { IF ~problemWithHost THEN msg.Inhibit[for: ArpaSMTPControl.defaultInhibitTime, why: reason]; GOTO Done; }; returnToSender => { IF debug THEN SIGNAL LookAtThis; ArpaSMTPSupport.NotifySender[descr, reason]; GOTO RemoveItem; }; putOnBadQueue => { MoveToBad[queue: hostQueue, msg: msg]; GOTO Done; }; ENDCASE => ERROR; -- shouldn't be irrelevant }]; GOTO RemoveItem; EXITS RemoveItem => { FinishedWithMsg[queue: hostQueue, pkg: pkg]; }; Done => NULL; END; }; newGvMessage: CONDITION _ [timeout: Process.SecondsToTicks[30]]; GetGvClump: ENTRY PROC RETURNS [host: ArpaQueue.QElem] = { ENABLE UNWIND => NULL; DO host _ GVQueue.GetProcessableElement[]; IF host # NIL THEN RETURN; WAIT newGvMessage; ENDLOOP; }; SendGVMessages: PROC = { DO host: ArpaQueue.QElem _ GetGvClump[]; IF ProcessGVQueue[host] THEN LOOP; IF host.Inhibition[].for > 0 THEN LOOP; MoveToSick[GVQueue, host, "Didn't make any progress"]; ENDLOOP; }; gvHandle: ArpaSMTPGVSend.Connection _ ArpaSMTPGVSend.Open[]; -- Only need one ProcessGVQueue: PROC [host: ArpaQueue.QElem] RETURNS [progress: BOOL] = { recipients: RecipBundle _ NARROW[host.Value[]]; hostQueue: QUEUE _ recipients.queue; msg: ArpaQueue.QElem; progress _ FALSE; UNTIL (msg _ GetProcessableElement[hostQueue]) = NIL DO IF ~SendGVMsg[host, msg] THEN RETURN; recipients.sent _ recipients.sent + 1; recipients.lastUp _ BasicTime.Now[]; progress _ TRUE; ENDLOOP; IF hostQueue.IsEmpty[] THEN MoveToEmpty[from: GVQueue, host: host]; }; SendGVMsg: PROC [host, msg: ArpaQueue.QElem] RETURNS [progress: BOOL _ TRUE] = { recipients: RecipBundle _ NARROW[host.Value[]]; hostQueue: QUEUE _ recipients.queue; pkg: DeliveryPkg = NARROW[msg.Value[]]; descr: Descr = pkg.descr; BEGIN -- so the above vars will be in scope for EXITS ArpaSMTPGVSend.SendItem[descr, pkg.recipList, 
gvHandle ! ArpaSMTPGVSend.Failed => {
      IF problemWithGV THEN {
        MoveToSick[GVQueue, host, reason];
        progress _ FALSE;
        }; -- stop the enumeration of GV item
      SELECT withItem FROM
        retryLater => {
          IF ~problemWithGV THEN
            msg.Inhibit[for: ArpaSMTPControl.defaultInhibitTime, why: reason];
          GOTO Done;
          };
        returnToSender => {
          IF debug THEN SIGNAL LookAtThis;
          ArpaSMTPSupport.NotifySender[descr, reason];
          GOTO RemoveItem;
          };
        putOnBadQueue => {
          MoveToBad[queue: hostQueue, msg: msg];
          GOTO Done;
          };
        ENDCASE => ERROR; -- shouldn't be "irrelevant"
      }];
    GOTO RemoveItem;
    EXITS
      RemoveItem => {
        FinishedWithMsg[queue: hostQueue, pkg: pkg];
        };
      Done => NULL;
    END;
  };

-- Housekeeping loop, forked by StartServer.
-- Runs under the monitor lock; WAIT on the local condition gives the lock up for about a minute between passes.
-- (The unused local "now" was removed.)
Background: ENTRY PROC = {
  ENABLE UNWIND => NULL;
  snooz: CONDITION _ [timeout: Process.SecondsToTicks[1*60]];
  DO
    WAIT snooz;
    CheckLists[];
    MaybePrintStats[];
    ConsiderExpressMail[];
    ConsiderOldMessages[];
    ConsiderSickHosts[];
    ENDLOOP;
  };

-- State for the periodic refresh of the express lists.
-- NOTE(review): the types and initialisers of the next two declarations were garbled in this copy of the file.
-- They are reconstructed from how the variables are used (BasicTime.Period arguments, GVNames.GetMembers stamps) and from the otherwise unused import GVBasics.oldestTime. Confirm against the original source.
-- The time stamps start one checkInterval in the past.
timeExpressLastChecked, timeArpaExpressLastChecked, timeNotExpressLastChecked: BasicTime.GMT _
  BasicTime.Update[BasicTime.Now[], - checkInterval];
expressStamp, arpaExpressStamp, notExpressStamp: GVBasics.Timestamp _ GVBasics.oldestTime;
checkInterval: INT = 2*60*60; -- 2 hours

-- Refetches each express list whose last check is more than checkInterval seconds old.
CheckLists: PROC = {
  now: BasicTime.GMT _ BasicTime.Now[];
  IF BasicTime.Period[from: timeExpressLastChecked, to: now] > checkInterval THEN
    GetExpressList[];
  IF BasicTime.Period[from: timeArpaExpressLastChecked, to: now] > checkInterval THEN
    GetArpaExpressList[];
  IF BasicTime.Period[from: timeNotExpressLastChecked, to: now] > checkInterval THEN
    GetNotExpressList[];
  };

-- Asks Grapevine for the members of ExpressMail^.MS.
-- The check time is updated on noChange and on group; any other reply leaves the state alone, so the fetch is retried on the next pass.
GetExpressList: PROC = TRUSTED {
  memberInfo: GVNames.MemberInfo _ [noChange[]];
  memberInfo _ GVNames.GetMembers["ExpressMail^.MS", expressStamp];
  WITH m: memberInfo SELECT FROM
    noChange => timeExpressLastChecked _ BasicTime.Now[];
    group => {
      timeExpressLastChecked _ BasicTime.Now[];
      expressMail _ m.members;
      expressStamp _ m.stamp;
      ArpaSMTPSupport.Log[important, "Updated Express List."];
      };
    ENDCASE => NULL;
  };

-- Same as GetExpressList, for ArpaExpressMail^.MS.
GetArpaExpressList: PROC = TRUSTED {
  memberInfo: GVNames.MemberInfo _
[noChange[]];
  memberInfo_ GVNames.GetMembers["ArpaExpressMail^.MS", arpaExpressStamp];
  WITH m: memberInfo SELECT FROM
    noChange => timeArpaExpressLastChecked _ BasicTime.Now[];
    group => {
      timeArpaExpressLastChecked _ BasicTime.Now[];
      arpaExpressMail _ m.members;
      arpaExpressStamp _ m.stamp;
      ArpaSMTPSupport.Log[important, "Updated Arpa Express List."];
      };
    ENDCASE => NULL;
  };

-- Same as GetExpressList, for the list that feeds notExpressMail.
-- NOTE(review): the group queried is NotArpaExpressMail^.MS although the result is consulted by Express (via NotExpress), not by ArpaExpress. Left unchanged; confirm the group name is intended.
GetNotExpressList: PROC = TRUSTED {
  memberInfo: GVNames.MemberInfo _ [noChange[]];
  memberInfo_ GVNames.GetMembers["NotArpaExpressMail^.MS", notExpressStamp];
  WITH m: memberInfo SELECT FROM
    noChange => timeNotExpressLastChecked _ BasicTime.Now[];
    group => {
      timeNotExpressLastChecked _ BasicTime.Now[];
      notExpressMail _ m.members;
      notExpressStamp _ m.stamp;
      ArpaSMTPSupport.Log[important, "Updated Not Express List."];
      };
    ENDCASE => NULL;
  };

-- The fragment that followed here was the residue of a stripped comment holding a fill-in template for further list fetchers.
-- As code it contained an unterminated string literal and dangling WITH arms, so it is restored as a comment.
-- The parts elided with "..." were already missing in this copy of the file:
--   memberInfo: GVNames.MemberInfo _ [noChange[]];
--   memberInfo_ GVNames.GetMembers["...
--   WITH m: memberInfo SELECT FROM
--     noChange => ...
--     group => { ... ArpaSMTPSupport.Log[important, "Updated ...
--     ENDCASE => NULL;
--   };

-- Counter snapshots taken at the previous hourly / daily report; MaybePrintStats turns the differences into rates.
totalGvMsgsSentLastHour: INT _ 0;
totalGvBytesSentLastHour: INT _ 0;
totalArpaMsgsSentLastHour: INT _ 0;
totalArpaBytesSentLastHour: INT _ 0;
totalGvMsgsSentLastDay: INT _ 0;
totalGvBytesSentLastDay: INT _ 0;
totalArpaMsgsSentLastDay: INT _ 0;
totalArpaBytesSentLastDay: INT _ 0;
hourStatsLastChecked: BasicTime.GMT _ BasicTime.Now[];
dayStatsLastChecked: BasicTime.GMT _ BasicTime.Now[];

-- Logs hourly and daily throughput statistics once the corresponding period has elapsed.
MaybePrintStats: INTERNAL PROC = {
  now: BasicTime.GMT _ BasicTime.Now[];
  hourSecs: INT = 3600;
  daySecs: INT = 86400;
  hourPeriod: INT _ BasicTime.Period[from: hourStatsLastChecked, to: now];
  dayPeriod: INT _ BasicTime.Period[from: dayStatsLastChecked, to: now];
  totalGvMsgsSentInHour: INT_0;
  totalArpaMsgsSentInHour: INT_0;
  totalGvMsgsSentInDay: INT_0;
  totalArpaMsgsSentInDay: INT_0;
  totalGvBytesSentInHour: INT_0;
  totalArpaBytesSentInHour: INT_0;
  totalGvBytesSentInDay: INT_0;
  totalArpaBytesSentInDay: INT_0;
  date: ROPE _ Convert.RopeFromTime[from: now, start: years, end: days, includeDayOfWeek:
TRUE]; IF hourPeriod < 3600 THEN RETURN; hourStatsLastChecked _ now; ArpaSMTPSupport.Log[important, "Date: ", date, ", Hourly Statistics: "]; totalGvMsgsSentInHour _ Real.InlineRound[((ArpaSMTPGVSend.totalGvMsgsSent - totalGvMsgsSentLastHour)*1.0*hourSecs)/hourPeriod]; totalGvMsgsSentLastHour _ ArpaSMTPGVSend.totalGvMsgsSent; totalGvBytesSentInHour _ Real.InlineRound[((ArpaSMTPGVSend.totalGvBytesSent - totalGvBytesSentLastHour)*1.0*hourSecs)/hourPeriod]; totalGvBytesSentLastHour _ ArpaSMTPGVSend.totalGvBytesSent; ArpaSMTPSupport.Log[important, " Hourly GV bound rate: ", Convert.RopeFromInt[totalGvMsgsSentInHour], " messages, ", Convert.RopeFromInt[totalGvBytesSentInHour], " bytes."]; totalArpaMsgsSentInHour _ Real.InlineRound[((ArpaSMTPSend.totalArpaMsgsSent - totalArpaMsgsSentLastHour)*1.0*hourSecs)/hourPeriod]; totalArpaMsgsSentLastHour _ ArpaSMTPSend.totalArpaMsgsSent; totalArpaBytesSentInHour _ Real.InlineRound[((ArpaSMTPSend.totalArpaBytesSent - totalArpaBytesSentLastHour)*1.0*hourSecs)/hourPeriod]; totalArpaBytesSentLastHour _ ArpaSMTPSend.totalArpaBytesSent; ArpaSMTPSupport.Log[important, " Hourly Arpa bound rate: ", Convert.RopeFromInt[totalArpaMsgsSentInHour], " messages, ", Convert.RopeFromInt[totalArpaBytesSentInHour], " bytes."]; ArpaSMTPSupport.Log[important, " Arpa queue hosts ", Convert.RopeFromInt[CountQueueInternal["Arpa"]], ", messages ", Convert.RopeFromInt[CountMessagesInternal["Arpa"]]]; ArpaSMTPSupport.Log[important, " Sick queue hosts ", Convert.RopeFromInt[CountQueueInternal["Sick"]], ", messages ", Convert.RopeFromInt[CountMessagesInternal["Sick"]]]; ArpaSMTPSupport.Log[important, " Express queue hosts ", Convert.RopeFromInt[CountQueueInternal["Ex"]], ", messages ", Convert.RopeFromInt[CountMessagesInternal["Ex"]]]; ArpaSMTPSupport.Log[important, " GV queue messages ", Convert.RopeFromInt[CountMessagesInternal["GV"]]]; IF dayPeriod < daySecs THEN RETURN; dayStatsLastChecked _ now; totalGvMsgsSentInDay _ 
Real.InlineRound[((ArpaSMTPGVSend.totalGvMsgsSent - totalGvMsgsSentLastDay)*1.0*daySecs)/dayPeriod]; totalGvMsgsSentLastDay _ ArpaSMTPGVSend.totalGvMsgsSent; totalGvBytesSentInDay _ Real.InlineRound[((ArpaSMTPGVSend.totalGvBytesSent - totalGvBytesSentLastDay)*1.0*daySecs)/dayPeriod]; totalGvBytesSentLastDay _ ArpaSMTPGVSend.totalGvBytesSent; ArpaSMTPSupport.Log[important, " Daily GV bound rate: ", Convert.RopeFromInt[totalGvMsgsSentInDay], " messages, ", Convert.RopeFromInt[totalGvBytesSentInDay], " bytes."]; totalArpaMsgsSentInDay _ Real.InlineRound[((ArpaSMTPSend.totalArpaMsgsSent - totalArpaMsgsSentLastDay)*1.0*daySecs)/dayPeriod]; totalArpaMsgsSentLastDay _ ArpaSMTPSend.totalArpaMsgsSent; totalArpaBytesSentInDay _ Real.InlineRound[((ArpaSMTPSend.totalArpaBytesSent - totalArpaBytesSentLastDay)*1.0*daySecs)/dayPeriod]; totalArpaBytesSentLastDay _ ArpaSMTPSend.totalArpaBytesSent; ArpaSMTPSupport.Log[important, " Daily Arpa bound rate: ", Convert.RopeFromInt[totalArpaMsgsSentInDay], " messages, ", Convert.RopeFromInt[totalArpaBytesSentInDay], " bytes."]; ArpaSMTPSupport.Log[important, " Total Arpa bound since reboot: ", Convert.RopeFromInt[ArpaSMTPSend.totalArpaMsgsSent], " messages, ", Convert.RopeFromInt[ArpaSMTPSend.totalArpaBytesSent], " bytes."]; ArpaSMTPSupport.Log[important, " Total GV bound since reboot: ", Convert.RopeFromInt[ArpaSMTPGVSend.totalGvMsgsSent], " messages, ", Convert.RopeFromInt[ArpaSMTPGVSend.totalGvBytesSent], " bytes."]; ArpaSMTPSupport.Log[important, " Totals since reboot: ", Convert.RopeFromInt[ArpaSMTPSend.totalArpaMsgsSent+ArpaSMTPGVSend.totalGvMsgsSent], " messages, ", Convert.RopeFromInt[ArpaSMTPSend.totalArpaBytesSent+ArpaSMTPGVSend.totalGvBytesSent], " bytes."]; }; expressDelay: INT _ 1; -- 1 minute default delay ConsiderExpressMail: INTERNAL PROC = { now: BasicTime.Unpacked _ BasicTime.Unpack[BasicTime.Now[]]; host: ArpaQueue.QElem; recipients: RecipBundle; hostQueue, from, gv: QUEUE; msg: ArpaQueue.QElem; minutes: INT; IF 
~expressOK THEN RETURN;
  IF ~GVQueue.IsEmpty[] THEN RETURN;
  -- No express mail on weekdays between 08:00 and 20:00.
  IF now.weekday IN [Monday..Friday] AND now.hour IN [8..12+8) THEN RETURN;
  -- Pick a host bundle: Express first, then ArpaExpress. The chosen bundle is rotated to the end of its queue.
  host _ ExpressQueue.GetProcessableElement[];
  IF host # NIL
    THEN {
      from _ ExpressQueue;
      MoveHostToEnd[ExpressQueue, host];
      }
    ELSE {
      host _ ArpaExpressQueue.GetProcessableElement[];
      IF host = NIL THEN RETURN;
      from _ ArpaExpressQueue;
      MoveHostToEnd[ArpaExpressQueue, host];
      };
  recipients _ NARROW[host.Value[]];
  hostQueue _ recipients.queue;
  msg _ hostQueue.GetProcessableElement[];
  -- This procedure is INTERNAL: the monitor lock is already held (it is called from the ENTRY procedure Background).
  -- The original called the ENTRY procedure MoveToEmpty here when the host queue was empty.
  -- That would block forever trying to re-acquire the lock, and the host would then have been moved a second time by the msg = NIL branch.
  -- Only the unlocked variant is used now, exactly once.
  IF msg = NIL OR hostQueue.IsEmpty[] THEN {
    UnlockedMoveToEmpty[from: from, host: host];
    RETURN;
    };
  -- Hand exactly one message over to the Grapevine delivery queue.
  gv _ GetQueueForNewMessage[GVQueue, gvHostName];
  MoveAndLog[from: hostQueue, to: gv, qElem: msg];
  IF hostQueue.IsEmpty[] THEN UnlockedMoveToEmpty[from: from, host: host];
  NOTIFY newGvMessage;
  -- Throttle: wait longer in the first evening hours, and only expressDelay minutes at weekends.
  SELECT now.hour FROM
    12+8 => minutes _ expressDelay + 4; -- 12 messages first hour
    12+9 => minutes _ expressDelay + 3; -- 15 messages next hour
    12+10 => minutes _ expressDelay + 2; -- 20 messages next hour
    ENDCASE => minutes _ expressDelay+1; -- 30 messages/hour => 197 msgs in 8 hrs
  IF ~(now.weekday IN[Monday..Friday]) THEN minutes _ expressDelay;
  DallyAWhile[minutes];
  }; -- (At least 1) Extra minute in main loop

-- Waits on a private condition with a timeout of the given number of minutes; the WAIT releases the monitor lock meanwhile.
DallyAWhile: INTERNAL PROC [minutes: INT] = {
  snooz: CONDITION _ [timeout: Process.SecondsToTicks[minutes*60]];
  WAIT snooz;
  };

busySwitch: INT _ 10; -- ConsiderSickHosts holds back while the ARPA queue has more host bundles than this

-- Moves one processable sick host back to its delivery queue, unless the ARPA queue is busy.
-- The move itself is delegated to MoveSickHostQueue, which contains the same code that was duplicated here and ignores a NIL host.
ConsiderSickHosts: INTERNAL PROC = {
  host: ArpaQueue.QElem _ SickQueues.GetProcessableElement[];
  IF CountQueueInternal["ARPA"] > busySwitch THEN RETURN;
  MoveSickHostQueue[host];
  };

-- Removes the inhibition from every sick host and moves them all back to their delivery queues.
MoveAllSickHosts: PUBLIC ENTRY PROC = {
  ENABLE UNWIND => NULL;
  Uninhibit: ArpaQueue.ElemProc = {
    ArpaQueue.Uninhibit[qElem,
SickQueues];
    };
  host: ArpaQueue.QElem;
  SickQueues.Enumerate[Uninhibit];
  host _ SickQueues.GetProcessableElement[];
  WHILE host # NIL DO
    MoveSickHostQueue[host];
    host _ SickQueues.GetProcessableElement[];
    ENDLOOP;
  };

-- Moves host (an element of SickQueues) back to its delivery queue and wakes the matching sender process.
-- The Grapevine bundle goes to GVQueue, everything else to HostQueues. A NIL host is ignored.
MoveSickHostQueue: INTERNAL PROC[host: ArpaQueue.QElem] = {
  IF host # NIL THEN {
    recipients: RecipBundle _ NARROW[host.Value[]];
    gv: BOOL _ Rope.Equal[recipients.queue.Name[], gvHostName];
    target: QUEUE _ IF gv THEN GVQueue ELSE HostQueues;
    MoveAndLog[from: SickQueues, to: target, qElem: host];
    IF gv THEN NOTIFY newGvMessage ELSE NOTIFY newArpaMessage;
    };
  };

-- Looks up the sick host called host (ignoring case) and removes its inhibition.
-- Returns NIL if host is NIL or not on SickQueues.
FindSickHost: INTERNAL PROC[host: Rope.ROPE] RETURNS[element: ArpaQueue.QElem_NIL]= {
  Find: ArpaQueue.ElemProc = {
    msgs: RecipBundle _ NARROW[qElem.Value[]];
    IF Rope.Equal[msgs.queue.Name[], host, FALSE] THEN {
      ArpaQueue.Uninhibit[qElem, SickQueues];
      element _ qElem;
      continue _ FALSE;
      };
    };
  IF host # NIL THEN SickQueues.Enumerate[Find];
  };

-- Moves the named sick host back into service.
-- The original always moved the bundle to HostQueues, so reviving the Grapevine bundle would have put it on the SMTP host queue.
-- It now goes through MoveSickHostQueue, which picks GVQueue or HostQueues and ignores a NIL host.
MoveSickHost: PUBLIC ENTRY PROC[name: Rope.ROPE_ NIL] = {
  ENABLE UNWIND => NULL;
  MoveSickHostQueue[FindSickHost[name]];
  };

-- Monitor-locked wrapper around CountQueueInternal.
CountQueue: PUBLIC ENTRY PROC[name: Rope.ROPE_NIL] RETURNS[n: INT_0]= {
  ENABLE UNWIND => NULL;
  n _ CountQueueInternal[name];
  };

-- Number of elements directly on the queue called name.
-- Returns -1 when name is NIL or names no known queue (the original dereferenced NIL in the latter case).
CountQueueInternal: INTERNAL PROC[name: Rope.ROPE_NIL] RETURNS[n: INT_0]= {
  Count: ArpaQueue.ElemProc = { n _ n+1};
  queue: QUEUE;
  IF name = NIL THEN RETURN[-1];
  queue _ FindQueue[name];
  IF queue = NIL THEN RETURN[-1];
  queue.Enumerate[Count];
  };

-- Monitor-locked wrapper around CountMessagesInternal.
CountMessages: PUBLIC ENTRY PROC[queueName: Rope.ROPE_NIL] RETURNS[n: INT_0]= {
  ENABLE UNWIND => NULL;
  n _ CountMessagesInternal[queueName];
  };

-- Number of messages reachable from the queue called name.
-- For a queue of host bundles the messages of every bundle are summed; for any other queue the direct element count is returned.
-- ArpaExpressQueue is now treated as a bundle queue like the others; before, its host bundles were counted instead of its messages.
CountMessagesInternal: INTERNAL PROC[name: Rope.ROPE_NIL] RETURNS[n: INT_0]= {
  Count: ArpaQueue.ElemProc = { n _ n+1};
  CountNested: ArpaQueue.ElemProc = {
    msgs: RecipBundle _ NARROW[qElem.Value[]];
    msgs.queue.Enumerate[Count];
    };
  IF name = NIL THEN RETURN[-1];
  SELECT TRUE FROM
    Rope.Equal[name, ArpaExpressQueue.Name[], FALSE] => ArpaExpressQueue.Enumerate[CountNested];
    Rope.Equal[name, HostQueues.Name[], FALSE] => HostQueues.Enumerate[CountNested];
Rope.Equal[name, ExpressQueue.Name[], FALSE] => ExpressQueue.Enumerate[CountNested];
    Rope.Equal[name, SickQueues.Name[], FALSE] => SickQueues.Enumerate[CountNested];
    Rope.Equal[name, GVQueue.Name[], FALSE] => GVQueue.Enumerate[CountNested];
    ENDCASE => n _ CountQueueInternal[name];
  };

-- When both GVQueue and HostQueues are empty, scans the packages of sick hosts and returns the first expired one to its sender.
-- At most one package is handled per call: the enumeration stops after the first expired package, because handling it removes the element from the queue being enumerated.
-- The rejection text is built by SendItBack; the identical rope that used to be built here and thrown away, and the unused locals "host" and "value", were removed.
ConsiderOldMessages: INTERNAL PROC = {
  now: BasicTime.GMT _ BasicTime.Now[];
  ConsiderExpiration: INTERNAL ItemProc = {
    pkg: DeliveryPkg = NARROW[qElem.Value[]];
    expiryDate: BasicTime.GMT _ ArpaSMTPDescr.GetExpiryDate[pkg.descr];
    IF BasicTime.Period[from: now, to: expiryDate] < 0 THEN { -- expiry date lies in the past
      MoveAndLog[from: queue, to: tempQueue, qElem: qElem];
      SendItBack[queue: queue, qElem: qElem];
      RETURN[FALSE];
      };
    };
  IF ~GVQueue.IsEmpty[] THEN RETURN;
  IF ~HostQueues.IsEmpty[] THEN RETURN;
  EnumerateSickPkgs[ConsiderExpiration];
  };

-- Notifies the sender that the package in qElem could not be delivered in time, then retires it.
-- Expects qElem to be on tempQueue: it is moved back to queue before InternalFinishedWithMsg dequeues it from there.
SendItBack: INTERNAL PROC [queue: QUEUE, qElem: ArpaQueue.QElem] = {
  pkg: DeliveryPkg = NARROW[qElem.Value[]];
  reason: ROPE _ "Unable to deliver msg to ";
  FOR loosers: LIST OF ROPE _ pkg.recipList, loosers.rest UNTIL loosers = NIL DO
    -- comma-separate every recipient but the first
    reason _ Rope.Cat[reason, IF loosers = pkg.recipList THEN NIL ELSE ", ", loosers.first];
    ENDLOOP;
  reason _ Rope.Cat[reason, " at ", queue.Name[], " within a reasonable amount of time."];
  ArpaSMTPSupport.NotifySender[pkg.descr, reason];
  MoveAndLog[from: tempQueue, to: queue, qElem: qElem];
  InternalFinishedWithMsg[queue: queue, pkg: pkg];
  };

-- (original section comment lost in extraction)
UserError: PUBLIC ERROR [reason: ROPE] = CODE;
Failed: ERROR [why: ErrorCode, reason: ROPE] = CODE;
ErrorCode: TYPE = {lockNotObtained, notOnQueue};

-- Extracts the descriptor from a queue element value (an ItemOnly or a DeliveryPkg).
DescrFromElemVal: PROC [elemVal: REF ANY] RETURNS [Descr] = {
SELECT ValTypeOfElem[elemVal] FROM
    ItemOnlyT => RETURN[NARROW[elemVal, ItemOnly]^];
    DeliveryPkgT => RETURN[NARROW[elemVal, DeliveryPkg].descr];
    ENDCASE => ERROR;
  };

-- Callback applied to one item: `queue` is the queue holding it, `qElem` the element.
-- Returning FALSE asks the enumerator to stop.
ItemProc: TYPE = PROC [queue: QUEUE, qElem: ArpaQueue.QElem, procData: REF ANY _ NIL] RETURNS [continue: BOOL _ TRUE];

-- Applies `proc` to every element of every recipient queue reachable from SickQueues,
-- stopping as soon as proc returns FALSE.
EnumerateSickPkgs: INTERNAL PROC [proc: ItemProc, data: REF ANY _ NIL] = {
  -- Must start TRUE: ScanHostQueue returns this flag even when the nested queue is empty
  -- and ScanQueue therefore never assigned it.
  continueOverall: BOOL _ TRUE;
  queue: QUEUE;  -- the recipient queue currently being scanned
  ScanQueue: ArpaQueue.ElemProc = {
    continueOverall _ proc[queue, qElem, data];
    RETURN[continueOverall];
    };
  ScanHostQueue: ArpaQueue.ElemProc = {
    queue _ NARROW[qElem.Value[], RecipBundle].queue;
    queue.Enumerate[ScanQueue];
    RETURN[continueOverall];
    };
  SickQueues.Enumerate[ScanHostQueue];
  };

-- Applies `proc` to every item: first the elements of the recipient queues under GVQueue,
-- ExpressQueue, HostQueues and SickQueues (in that order), then the elements held directly
-- on tempQueue and BadItemQueue.  Stops as soon as proc returns FALSE.
-- NOTE(review): EnumerateQueues also visits ArpaExpressQueue; this procedure does not --
-- confirm whether that omission is intended.
EnumerateItems: INTERNAL PROC [proc: ItemProc, data: REF ANY _ NIL] = {
  -- Initialized so that no test below (nor ScanHostQueue on an empty nested queue) ever
  -- reads an unassigned flag.
  continueOverall: BOOL _ TRUE;
  queue: QUEUE;  -- the queue whose elements are currently being handed to proc
  ScanQueue: ArpaQueue.ElemProc = {
    continueOverall _ proc[queue, qElem, data];
    RETURN[continueOverall];
    };
  ScanHostQueue: ArpaQueue.ElemProc = {
    queue _ NARROW[qElem.Value[], RecipBundle].queue;
    queue.Enumerate[ScanQueue];
    RETURN[continueOverall];
    };
  GVQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  ExpressQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  HostQueues.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  SickQueues.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  queue _ tempQueue;
  tempQueue.Enumerate[ScanQueue, data];
  -- Honor a stop request made while scanning tempQueue before moving on.
  IF ~continueOverall THEN RETURN;
  queue _ BadItemQueue;
  BadItemQueue.Enumerate[ScanQueue, data];
  };

-- Callback applied to one queue.  Returning FALSE asks the enumerator to stop.
QueueProc: TYPE = PROC [queue: QUEUE, procData: REF ANY _ NIL] RETURNS [continue: BOOL _ TRUE];

-- Applies `proc` to each recipient queue found under the queues-of-host-queues listed
-- below, and then to tempQueue and BadItemQueue themselves.  Stops as soon as proc
-- returns FALSE.
EnumerateQueues: INTERNAL PROC [proc: QueueProc, data: REF ANY _ NIL] = {
  continueOverall: BOOL _ TRUE;
  ScanHostQueue: ArpaQueue.ElemProc = {
    queue: QUEUE _ NARROW[qElem.Value[], RecipBundle].queue;
    continueOverall _ proc[queue, data];
    RETURN[continueOverall];
    };
  GVQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  ExpressQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
ArpaExpressQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  HostQueues.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  SickQueues.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  IF ~proc[tempQueue, data] THEN RETURN;
  IF ~proc[BadItemQueue, data] THEN RETURN;
  };

-- Monitor entry: asks `queue` for a processable element and, if one is returned, removes
-- it and enqueues it again before handing it to the caller.  Returns NIL when the queue
-- offers nothing.
GetProcessableElement: ENTRY PROC [queue: QUEUE] RETURNS [qElem: ArpaQueue.QElem] = {
  ENABLE UNWIND => NULL;
  qElem _ queue.GetProcessableElement[];
  IF qElem # NIL THEN {
    queue.Dequeue[qElem.Value[]];
    queue.Enqueue[qElem];
    };
  };

-- Removes qElem from `queue` (by its value) and enqueues it again.
MoveHostToEnd: INTERNAL PROC [queue: QUEUE, qElem: ArpaQueue.QElem] = {
  held: REF ANY = qElem.Value[];
  queue.Dequeue[held];
  queue.Enqueue[qElem];
  };

-- Monitor entry: moves `host` from `from` onto EmptyQueues, logging the move.
MoveToEmpty: ENTRY PROC [from: QUEUE, host: ArpaQueue.QElem] = {
  ENABLE UNWIND => NULL;
  MoveAndLog[from, EmptyQueues, host];
  };

-- Same move as MoveToEmpty, for callers that already hold the monitor lock.
UnlockedMoveToEmpty: INTERNAL PROC [from: QUEUE, host: ArpaQueue.QElem] = {
  MoveAndLog[from, EmptyQueues, host];
  };

-- Monitor entry: inhibits `host` for ArpaSMTPControl.defaultInhibitTime, recording
-- `reason`, and moves it from `from` onto SickQueues.
MoveToSick: ENTRY PROC [from: QUEUE, host: ArpaQueue.QElem, reason: ROPE] = {
  ENABLE UNWIND => NULL;
  host.Inhibit[for: ArpaSMTPControl.defaultInhibitTime, why: reason];
  MoveAndLog[from, SickQueues, host];
  };

-- Monitor entry: takes `msg` off `queue` and puts it on BadItemQueue, logging both steps.
MoveToBad: ENTRY PROC [queue: QUEUE, msg: ArpaQueue.QElem] = {
  ENABLE UNWIND => NULL;
  DequeueAndLog[queue, msg.Value[]];
  EnqueueAndLog[BadItemQueue, msg];
  };

-- Monitor entry: removes the element holding `value` from `queue`, logging the removal.
FlushThisHost: ENTRY PROC [queue: QUEUE, value: REF ANY] = {
  ENABLE UNWIND => NULL;
  DequeueAndLog[queue, value];
  };

-- Monitor entry wrapper around InternalFinishedWithMsg.
FinishedWithMsg: ENTRY PROC [queue: QUEUE, pkg: DeliveryPkg] = {
  ENABLE UNWIND => NULL;
  InternalFinishedWithMsg[queue: queue, pkg: pkg];
  };

-- Retires `pkg` from `queue`: removes the queue's name from the descriptor's recipient
-- hosts, dequeues the package, and then lets ConsiderItemDestruction decide what to do
-- with the descriptor.
InternalFinishedWithMsg: INTERNAL PROC [queue: QUEUE, pkg: DeliveryPkg] = {
  hostName: ROPE = queue.Name[];
  pkg.descr.RemoveRecipientHost[hostName];
  DequeueAndLog[queue, pkg];
  ConsiderItemDestruction[pkg.descr];
  };

-- Monitor entry wrapper around MoveAndLog.
LockedMoveAndLog: ENTRY PROC [from, to: QUEUE, qElem: ArpaQueue.QElem] = {
  ENABLE UNWIND => NULL;
  MoveAndLog[from, to, qElem];
  };

-- Moves qElem from one queue to another and writes a log entry naming both queues.
MoveAndLog: INTERNAL PROC [from, to:
QUEUE, qElem: ArpaQueue.QElem] = {
  from.Dequeue[qElem.Value[]];
  to.Enqueue[qElem];
  ArpaSMTPSupport.Log[verbose, Unparse[qElem.Value[]], " moved from ", from.Name[], " to ", to.Name[], "."];
  };

-- Enqueues qElem on `queue` and logs the addition.
EnqueueAndLog: INTERNAL PROC [queue: QUEUE, qElem: ArpaQueue.QElem] = {
  queue.Enqueue[qElem];
  ArpaSMTPSupport.Log[verbose, Unparse[qElem.Value[]], " added to ", queue.Name[], "."];
  };

-- Removes the element holding `value` from `queue` and logs the removal.
DequeueAndLog: INTERNAL PROC [queue: QUEUE, value: REF ANY] = {
  val: ROPE = Unparse[value];  -- rendered before the dequeue, logged after it
  queue.Dequeue[value];
  ArpaSMTPSupport.Log[verbose, val, " removed from ", queue.Name[], "."];
  };

-- Reports which kind of element value `queue` holds.  Any queue not named explicitly
-- (the ENDCASE arm) is treated as holding DeliveryPkgs.
ValTypeOfQueue: PROC [queue: QUEUE] RETURNS [ElemValType] = {
  RETURN[SELECT queue FROM
    tempQueue, BadItemQueue => DeliveryPkgT,
    EmptyQueues, GVQueue, ExpressQueue, HostQueues, SickQueues => RecipBundleT,
    ENDCASE => DeliveryPkgT
    ];
  };

-- Classifies an element value by its referent type; raises ERROR for any other type.
-- This procedure never yields ItemOnlyT.
ValTypeOfElem: PROC [value: REF ANY] RETURNS [ElemValType] = {
  SELECT TRUE FROM
    ISTYPE[value, DeliveryPkg] => RETURN[DeliveryPkgT];
    ISTYPE[value, RecipBundle] => RETURN[RecipBundleT];
    ENDCASE => ERROR;
  };

ElemValType: TYPE = {ItemOnlyT, DeliveryPkgT, RecipBundleT};

-- Printable descriptions of each ElemValType.
-- NOTE(review): the DeliveryPkg text reads "(from , an Host queue, or ...": a queue name
-- appears to be missing between the first two pieces -- confirm.
ValTypeNames: ARRAY ElemValType OF ROPE = [
  "Somebody is confused",
  Rope.Cat[
    "DeliveryPkg (from ", ", an Host queue, or ", BadItemQueue.Name[], ")"],
  Rope.Cat["Host queue (from Empty, GV, Ex, ARPA, or Sick)"]
  ];

-- Decides the fate of an item descriptor once one of its packages has been dealt with.
-- Every queue reported by EnumerateQueues is searched for a reference to the item:
--   still referenced: nothing is done, apart from logging an inconsistency if the
--     descriptor has no recipients left and the reference is not on BadItemQueue;
--   not referenced, recipients remain: a fresh DeliveryPkg for the item is placed on
--     BadItemQueue and the inconsistency is logged;
--   not referenced, no recipients remain: the descriptor is destroyed.
ConsiderItemDestruction: INTERNAL PROC [descr: Descr] = {
  queues: LIST OF QUEUE _ NIL;
  itemPresent: BOOL _ FALSE;
  onQueue: QUEUE;
  CollectQueues: QueueProc = {queues _ CONS[queue, queues]; };
  EnumerateQueues[CollectQueues];
  [itemPresent, onQueue] _ ItemIsOn[descr.GetUserHandle[], queues];
  IF itemPresent THEN {
    IF ~descr.MoreRecipients[] AND onQueue # BadItemQueue THEN -- queue corruption
      ArpaSMTPSupport.Log[important,
        "There are no more recipients in the item descriptor for item",
        descr.Unparse[],
        ", but there is a reference to it on ",
        onQueue.Name[],
        ".\nThis should not have occurred; the demons should discard it."];
    }
  ELSE { -- item not present on any queues
    IF descr.MoreRecipients[] THEN {
      qElem:
ArpaQueue.QElem = ArpaQueue.NewElem[NEW[DeliveryPkgRep _ [descr: descr, recipList: NIL]]];
      EnqueueAndLog[queue: BadItemQueue, qElem: qElem];
      ArpaSMTPSupport.Log[ATTENTION,
        "There are more recipients in the item descriptor for item ",
        descr.Unparse[],
        ", though there is no reference to it on any queue.\n",
        "This should not have occurred. Have placed a deliveryPkg on ",
        BadItemQueue.Name[],
        "."];
      }
    ELSE {
      descr.Destroy[];
      };
    };
  };

-- Searches queueList, in order, for a queue holding the item with user handle `handle`.
-- When itemPresent is TRUE, onQueue is the first queue on which it was found.  When
-- itemPresent is FALSE, onQueue is merely the last queue examined and carries no meaning.
ItemIsOn: INTERNAL PROC [handle: INT, queueList: LIST OF QUEUE] RETURNS [itemPresent: BOOL, onQueue: QUEUE] = {
  itemPresent _ FALSE;
  FOR qList: LIST OF QUEUE _ queueList, qList.rest UNTIL qList = NIL DO
    onQueue _ qList.first;
    IF RetrieveElemVal[onQueue, handle, FALSE] # NIL THEN {
      itemPresent _ TRUE;
      EXIT;
      };
    ENDLOOP;
  };

-- Returns the element value on `queue` whose item descriptor has user handle `handle`.
-- Only queues that ValTypeOfQueue classifies as holding DeliveryPkgs can be searched;
-- asking for a RecipBundle queue raises UserError.  When nothing matches, the result is
-- NIL if errorIfNotFound is FALSE, otherwise UserError is raised.
-- NOTE(review): this is a PUBLIC procedure but not an ENTRY; ItemIsOn (INTERNAL) calls it
-- directly -- confirm how outside callers are expected to synchronize with the monitor.
RetrieveElemVal: PUBLIC PROC [queue: QUEUE, handle: INT, errorIfNotFound: BOOL _ TRUE] RETURNS [value: REF ANY _ NIL] = {
  SELECT ValTypeOfQueue[queue] FROM
    DeliveryPkgT => {
      Find: ArpaQueue.ElemProc = {
        IF handle = ArpaSMTPDescr.GetUserHandle[NARROW[qElem.Value[], DeliveryPkg].descr] THEN
          {value _ qElem.Value[]; continue _ FALSE};
        };
      queue.Enumerate[Find];
      };
    RecipBundleT =>
      ERROR UserError[IO.PutFR["You have asked the queue module to retrieve item #%g from HostQueues, which doesn't directly contain items (possibly a code bug). Use HostQueue.", IO.int[handle]]];
    ENDCASE => ERROR;
  IF value = NIL AND errorIfNotFound THEN {
    reason: ROPE = IO.PutFR[
      "Msg #%g not found on %g.", IO.int[handle], IO.rope[queue.Name[]]];
    ERROR UserError[reason];
    };
  }; -- end RetrieveElemVal

END.