-- ArpaSMTPQueueImpl
-- Monitor that owns the mail delivery queues: per-host bundles of delivery packages
-- grouped under the Empty, GV, Express, ArpaExpress, ARPA (host) and Sick queues, plus
-- flat Bad and Temp queues of delivery packages.  Three detached processes (started by
-- StartServer) send Grapevine mail, send Arpa mail, and run periodic housekeeping.
-- ENTRY procedures acquire the monitor lock; INTERNAL procedures must be called with
-- the lock already held and must never call an ENTRY procedure.

DIRECTORY
  ArpaQueue USING [Create, Dequeue, ElemProc, Enqueue, Enumerate, GetHead, GetProcessableElement, Inhibit, Inhibition, IsEmpty, MQ, Name, NewElem, QElem, Uninhibit, Value],
  ArpaSMTPControl USING [defaultInhibitTime],
  ArpaSMTPDescr USING [AddProcessedRecipient, Descr, DescrRep, Destroy, EnumerateRawRecipients, EnumerateRecipientHosts, GetArpaReversePath, GetExpiryDate, GetFormat, GetGvSender, GetPrecedeMsgText, GetUserHandle, HostProc, MoreRecipients, Print, RawRecipProc, RemoveRawRecipients, RemoveRecipientHost, RetrieveMsgStream, SetArpaReversePath, SetGvSender, SetPrecedeMsgText, StoreItemInfo, Unparse, WrongState],
  ArpaSMTPGVSend USING [totalGvMsgsSent, totalGvBytesSent, Connection, Failed, Open, SendItem],
  ArpaSMTPQueue USING [],
  ArpaSMTPSend USING [totalArpaMsgsSent, totalArpaBytesSent, Close, Connection, Failed, Open, SendItem],
  ArpaSMTPSupport USING [AuthorizationCheck, CheckHeader, Log, NotifySender],
  ArpaSMTPSyntax USING [EnumerateGVItems, GVItemProc, HostAndUser, ReceiveRName, ReversePath, UnBlessReturnPath],
  BasicTime USING [GMT, Now, nullGMT, Period, Unpack, Unpacked, Update],
  Convert USING [RopeFromInt, RopeFromTime],
  GVBasics USING [oldestTime, RopeFromTimestamp, Timestamp],
  GVNames USING [GetMembers, MemberInfo],
  GVProtocol USING [ReceiveTimestamp],
  IO USING [Close, int, Put, PutChar, PutF, PutFR, PutRope, rope, STREAM, time],
  Process USING [Detach, SecondsToTicks],
  Real USING [InlineRound],
  Rope USING [Cat, Equal, Find, ROPE];

ArpaSMTPQueueImpl: CEDAR MONITOR
  IMPORTS ArpaQueue, ArpaSMTPControl, ArpaSMTPDescr, ArpaSMTPGVSend, ArpaSMTPSend, ArpaSMTPSupport, ArpaSMTPSyntax, BasicTime, Convert, GVBasics, GVNames, GVProtocol, IO, Real, Rope, Process
  EXPORTS ArpaSMTPQueue =
BEGIN

-- Type abbreviations

Descr: TYPE = ArpaSMTPDescr.Descr;
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
QUEUE: TYPE = ArpaQueue.MQ;

ItemOnly: TYPE = REF Descr; -- an item for delivery to one or more recipients
DeliveryPkg: TYPE = REF DeliveryPkgRep; -- an item ready for delivery to a clump of recipients at the same host
DeliveryPkgRep: TYPE = RECORD[descr: Descr, recipList: LIST OF ROPE];
RecipBundle: TYPE = REF RecipBundleRep; -- a queue of delivery pkgs for a single host
RecipBundleRep: TYPE = RECORD [
  queue: QUEUE,
  sent, recv: INT _ 0,
  lastUp: BasicTime.GMT];

-- The queues

EmptyQueues: QUEUE _ ArpaQueue.Create["Empty"]; -- queue of RecipBundles
GVQueue: QUEUE _ ArpaQueue.Create["GV"]; -- queue of RecipBundles
ExpressQueue: QUEUE _ ArpaQueue.Create["Ex"]; -- queue of RecipBundles
ArpaExpressQueue: QUEUE _ ArpaQueue.Create["ArpaEx"]; -- queue of RecipBundles
HostQueues: QUEUE _ ArpaQueue.Create["ARPA"]; -- queue of RecipBundles
SickQueues: QUEUE _ ArpaQueue.Create["Sick"]; -- queue of RecipBundles
BadItemQueue: QUEUE _ ArpaQueue.Create["Bad"]; -- queue of DeliveryPkgs
tempQueue: QUEUE _ ArpaQueue.Create["Temp"]; -- queue of DeliveryPkgs

gvHostName: ROPE _ "Grapevine"; -- Wired into format of msg saved on disk
expressHostName: ROPE _ "Express";
expressMail: LIST OF ROPE;
arpaExpressHostName: ROPE _ "ArpaExpress";
arpaExpressMail: LIST OF ROPE;
notExpressMail: LIST OF ROPE;
noUpArrowDLs: PUBLIC LIST OF ROPE;
expressOK: BOOL _ TRUE;
debug: BOOL _ FALSE;

-- Express-list lookups (all name comparisons are case-insensitive)

ExpressList: PUBLIC PROC RETURNS[list: LIST OF ROPE] = {RETURN[expressMail]};

SetExpressOK: PUBLIC PROC [ok: BOOL] = {expressOK _ ok};

Express: PROC [names: LIST OF ROPE] RETURNS [name: ROPE] = {
  -- Returns the first entry of expressMail that matches one of names, or NIL.
  -- Returns NIL as soon as a name on the notExpressMail list is encountered.
  UNTIL names = NIL DO
    IF NotExpress[names.first] THEN RETURN[NIL];
    FOR list: LIST OF ROPE _ expressMail, list.rest UNTIL list = NIL DO
      IF Rope.Equal[names.first, list.first, FALSE] THEN RETURN[list.first];
      ENDLOOP;
    names _ names.rest;
    ENDLOOP;
  RETURN[NIL];
  };

NotExpress: PROC [name: ROPE] RETURNS [BOOLEAN] = {
  -- TRUE iff name appears on the notExpressMail list.
  FOR list: LIST OF ROPE _ notExpressMail, list.rest UNTIL list = NIL DO
    IF Rope.Equal[name, list.first, FALSE] THEN RETURN[TRUE];
    ENDLOOP;
  RETURN[FALSE];
  };

ArpaExpress: PROC [names: LIST OF ROPE] RETURNS [name: ROPE] = {
  -- Returns the first entry of arpaExpressMail that matches one of names, or NIL.
  UNTIL names = NIL DO
    FOR list: LIST OF ROPE _ arpaExpressMail, list.rest UNTIL list = NIL DO
      IF Rope.Equal[names.first, list.first, FALSE] THEN RETURN[list.first];
      ENDLOOP;
    names _ names.rest;
    ENDLOOP;
  RETURN[NIL];
  };

DLWithoutUpArrow: PUBLIC PROC [dl: ROPE] RETURNS [valid: BOOL] = {
  -- TRUE iff dl appears on the noUpArrowDLs list.
  FOR list: LIST OF ROPE _ noUpArrowDLs, list.rest UNTIL list = NIL DO
    IF Rope.Equal[dl, list.first, FALSE] THEN RETURN[TRUE];
    ENDLOOP;
  RETURN[FALSE];
  };

GetQueueForNewMessage: INTERNAL PROC [home: QUEUE, name: ROPE] RETURNS [queue: QUEUE] = {
  -- Finds the per-host queue called name, wherever its bundle currently lives.
  -- A bundle found on EmptyQueues is moved to home; if no bundle exists a new one
  -- is created and enqueued on home.
  Find: ArpaQueue.ElemProc = {
    msgs: RecipBundle _ NARROW[qElem.Value[]];
    IF Rope.Equal[msgs.queue.Name[], name, FALSE] THEN {
      old _ qElem;
      queue _ msgs.queue;
      continue _ FALSE;
      };
    };
  old: ArpaQueue.QElem;
  new: RecipBundle;
  GVQueue.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  ExpressQueue.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  ArpaExpressQueue.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  HostQueues.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  SickQueues.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  EmptyQueues.Enumerate[Find];
  IF queue # NIL THEN {
    MoveAndLog[from: EmptyQueues, to: home, qElem: old];
    RETURN;
    };
  new _ NEW[RecipBundleRep _ [
    queue: ArpaQueue.Create[name],
    sent: 0,
    recv: 0,
    lastUp: BasicTime.nullGMT]];
  EnqueueAndLog[queue: home, qElem: ArpaQueue.NewElem[new]];
  RETURN[new.queue];
  };

-- Printing

PrintItem: PUBLIC ENTRY PROC [item: INT, out: STREAM] = {
  -- Prints the short form of the item whose user handle is item, plus its inhibition.
  ENABLE UNWIND => NULL;
  descr: Descr;
  qElem: ArpaQueue.QElem;
  [qElem, descr] _ DescrFromHandle[item];
  IF descr = NIL THEN {out.PutRope["No item with that handle.\n"]; RETURN};
  descr.Print[out: out, form: short];
  PrintInibition[qElem, out, NIL];
  };

DescrFromHandle: INTERNAL PROC [handle: INT] RETURNS [qe: ArpaQueue.QElem, descr: Descr] = {
  -- Looks up the queued item whose user handle equals handle.
  -- Returns [NIL, NIL] when there is none.  The results are assigned only on a
  -- match, so a failed search can no longer hand back the last element visited.
  Check: ItemProc = {
    candidate: Descr = DescrFromElemVal[qElem.Value[]];
    IF handle = candidate.GetUserHandle[] THEN {
      qe _ qElem;
      descr _ candidate;
      continue _ FALSE;
      };
    };
  qe _ NIL;
  descr _ NIL;
  EnumerateItems[Check];
  };

PrintQueue: PUBLIC ENTRY PROC [name: ROPE, out: STREAM] = {
  ENABLE UNWIND => NULL;
  PrintQueueInner[name, out];
  };

PrintQueueInner: INTERNAL PROC [name: ROPE, out: STREAM] = {
  -- Prints the queue called name.  name = NIL prints the six bundle-holding queues
  -- (Empty, GV, Ex, ArpaEx, ARPA, Sick) in turn.
  queue: QUEUE;
  ScanForInfo: ArpaQueue.ElemProc = {
    -- If the queue being printed is itself a per-host queue, print its bundle's statistics.
    msgs: RecipBundle _ NARROW[qElem.Value[]];
    IF queue # msgs.queue THEN RETURN [TRUE];
    out.PutF[
      "\tsent: %G, recv: %G, last up: ",
      IO.int[msgs.sent], IO.int[msgs.recv]];
    IF msgs.lastUp = BasicTime.nullGMT THEN out.PutRope["never"]
    ELSE out.Put[IO.time[msgs.lastUp]];
    out.PutRope["\n"];
    PrintInibition[qElem, out, "\t"];
    RETURN[FALSE];
    };
  PrintVal: ArpaQueue.ElemProc = {
    -- Bundles are printed with their statistics and then recursively; anything else is unparsed.
    value: REF ANY = qElem.Value[];
    SELECT ValTypeOfElem[value] FROM
      RecipBundleT => {
        msgs: RecipBundle = NARROW[value];
        out.PutF[
          "\t%G:\n\t\tsent: %G, recv: %G, last up: ",
          IO.rope[msgs.queue.Name[]], IO.int[msgs.sent], IO.int[msgs.recv]];
        IF msgs.lastUp = BasicTime.nullGMT THEN out.PutRope["never"]
        ELSE out.Put[IO.time[msgs.lastUp]];
        out.PutRope["\n"];
        PrintInibition[qElem, out, "\t\t"];
        msgs.queue.Enumerate[PrintVal];
        };
      ENDCASE => {
        out.PutRope["\t\t"];
        PrintElemVal[value, out];
        out.PutChar['\n];
        };
    };
  IF name = NIL THEN {
    PrintQueueInner[EmptyQueues.Name[], out];
    PrintQueueInner[GVQueue.Name[], out];
    PrintQueueInner[ExpressQueue.Name[], out];
    PrintQueueInner[ArpaExpressQueue.Name[], out];
    PrintQueueInner[HostQueues.Name[], out];
    PrintQueueInner[SickQueues.Name[], out];
    RETURN;
    };
  queue _ FindQueue[name];
  IF queue = NIL THEN {
    out.PutRope["Unknown queue: "];
    out.PutRope[name];
    out.PutRope[".\n"];
    RETURN;
    };
  out.PutRope[queue.Name[]];
  out.PutRope[":\n"];
  EmptyQueues.Enumerate[ScanForInfo];
  GVQueue.Enumerate[ScanForInfo];
  ExpressQueue.Enumerate[ScanForInfo];
  ArpaExpressQueue.Enumerate[ScanForInfo];
  HostQueues.Enumerate[ScanForInfo];
  SickQueues.Enumerate[ScanForInfo];
  queue.Enumerate[PrintVal];
  };

FindQueue: INTERNAL PROC [name: ROPE] RETURNS [queue: QUEUE _ NIL] = {
  -- Maps a name (case-insensitive) to one of the eight top-level queues, or failing
  -- that to an existing per-host queue.  Returns NIL when nothing matches.
  bundle: RecipBundle;
  SELECT TRUE FROM
    Rope.Equal[name, EmptyQueues.Name[], FALSE] => RETURN[EmptyQueues];
    Rope.Equal[name, GVQueue.Name[], FALSE] => RETURN[GVQueue];
    Rope.Equal[name, ExpressQueue.Name[], FALSE] => RETURN[ExpressQueue];
    Rope.Equal[name, ArpaExpressQueue.Name[], FALSE] => RETURN[ArpaExpressQueue];
    Rope.Equal[name, HostQueues.Name[], FALSE] => RETURN[HostQueues];
    Rope.Equal[name, SickQueues.Name[], FALSE] => RETURN[SickQueues];
    Rope.Equal[name, BadItemQueue.Name[], FALSE] => RETURN[BadItemQueue];
    Rope.Equal[name, tempQueue.Name[], FALSE] => RETURN[tempQueue];
    ENDCASE;
  bundle _ GetBundle[name, FALSE];
  IF bundle = NIL THEN RETURN[NIL];
  RETURN[bundle.queue];
  };

GetBundle: INTERNAL PROC [name: ROPE, create: BOOL] RETURNS [bundle: RecipBundle _ NIL] = {
  -- Finds the bundle whose queue is called name.  If there is none and create is
  -- TRUE, a fresh bundle is made and placed on EmptyQueues; otherwise NIL is returned.
  Find: ArpaQueue.ElemProc = {
    msgs: RecipBundle _ NARROW[qElem.Value[]];
    IF Rope.Equal[msgs.queue.Name[], name, FALSE] THEN {
      bundle _ msgs;
      continue _ FALSE;
      };
    };
  EmptyQueues.Enumerate[Find];
  IF bundle # NIL THEN RETURN;
  GVQueue.Enumerate[Find];
  IF bundle # NIL THEN RETURN;
  ExpressQueue.Enumerate[Find];
  IF bundle # NIL THEN RETURN;
  ArpaExpressQueue.Enumerate[Find];
  IF bundle # NIL THEN RETURN;
  HostQueues.Enumerate[Find];
  IF bundle # NIL THEN RETURN;
  SickQueues.Enumerate[Find];
  IF bundle # NIL THEN RETURN;
  IF ~create THEN RETURN;
  bundle _ NEW[RecipBundleRep _ [
    queue: ArpaQueue.Create[name],
    sent: 0,
    recv: 0,
    lastUp: BasicTime.nullGMT]];
  EnqueueAndLog[queue: EmptyQueues, qElem: ArpaQueue.NewElem[bundle]];
  };

PrintElemVal: PROC [value: REF ANY, out: STREAM] = {
  -- Prints a delivery package or bundle; raises UserError for any other kind of value.
  SELECT ValTypeOfElem[value] FROM
    DeliveryPkgT, RecipBundleT => out.PutRope[Unparse[value]];
    ENDCASE => ERROR UserError["Somehow you have tried to print something that isn't a queue."];
  };

PrintInibition: PROC [qElem: ArpaQueue.QElem, out: STREAM, prefix: ROPE] = {
  -- Prints the remaining inhibition of qElem, if any, as for/60 ":" for MOD 60.
  for: INT;
  why: ROPE;
  [for, why] _ qElem.Inhibition[];
  IF for > 0 THEN out.PutF[
    "%GInhibited for %G:%02G because: %G.\n",
    IO.rope[prefix], IO.int[for/60], IO.int[for MOD 60], IO.rope[why]];
  };

Unparse: PROC [value: REF ANY] RETURNS [ROPE] = {
  -- Short printable form of an item, a delivery package (with recipients) or a bundle.
  SELECT TRUE FROM
    ISTYPE[value, ItemOnly] => RETURN[NARROW[value, ItemOnly]^.Unparse[]];
    ISTYPE[value, DeliveryPkg] => {
      pkg: DeliveryPkg = NARROW[value];
      r: ROPE _ Rope.Cat[pkg.descr.Unparse[], " for "];
      FOR l: LIST OF ROPE _ pkg.recipList, l.rest UNTIL l = NIL DO
        r _ Rope.Cat[r, IF l = pkg.recipList THEN NIL ELSE ", ", l.first];
        ENDLOOP;
      RETURN[r];
      };
    ISTYPE[value, RecipBundle] => RETURN[NARROW[value, RecipBundle].queue.Name[]];
    ENDCASE => RETURN["(Unparse ENDCASE)"];
  };

-- Server start-up and message arrival

StartServer: PUBLIC PROC = TRUSTED {
  -- Refreshes the Grapevine lists and forks the three detached server processes.
  CheckLists[];
  Process.Detach[FORK SendGVMessages[]];
  Process.Detach[FORK SendArpaMessages[]];
  Process.Detach[FORK Background[]];
  };

newMessageArrived: CONDITION _ [timeout: Process.SecondsToTicks[30]];

StartNewMessage: PUBLIC ENTRY PROC [source: ROPE] = {};
  -- Hack to hang on ML before reading message from the net.  Otherwise we let
  -- duplicate messages through because we don't "accept" them after they get onto the disk.

AddNewMessage: PUBLIC ENTRY PROC [descr: Descr, source: ROPE] = {
  -- Accepts a newly arrived item: establishes its sender / reverse path, checks
  -- authorization for gv-format items, converts raw recipients to processed ones,
  -- stores the item info, and queues one delivery package per recipient host.
  ENABLE UNWIND => NULL;
  host: RecipBundle _ GetBundle[source, TRUE];
  qElem: ArpaQueue.QElem _ ArpaQueue.NewElem[NEW[Descr _ descr]];
  value: REF ANY _ qElem.Value[];
  CheckLists[];
  SELECT descr.GetFormat[] FROM
    gv => {
      from: ROPE _ descr.GetGvSender[];
      IF from = NIL THEN {
        -- Sender not recorded yet: dig it out of the stored message's ReturnTo item.
        arpaReversePath, gvSender: ROPE;
        gvID: ROPE;
        temp: IO.STREAM = descr.RetrieveMsgStream[];
        RetrieveReturnTo: ArpaSMTPSyntax.GVItemProc = {
          IF itemHeader.type = PostMark THEN {
            timestamp: GVBasics.Timestamp _ GVProtocol.ReceiveTimestamp[itemStream];
            gvID _ GVBasics.RopeFromTimestamp[timestamp];
            };
          IF itemHeader.type = ReturnTo THEN {
            gvSender _ ArpaSMTPSyntax.ReceiveRName[itemStream];
            from _ gvSender;
            continue _ FALSE;
            };
          };
        ArpaSMTPSyntax.EnumerateGVItems[temp, RetrieveReturnTo];
        temp.Close[];
        ArpaSMTPSupport.Log[verbose, descr.Unparse[], " is ", gvID, "."];
        ArpaSMTPDescr.SetGvSender[descr, gvSender];
        arpaReversePath _ ArpaSMTPSyntax.ReversePath[gvSender];
        descr.SetArpaReversePath[arpaReversePath];
        };
      ArpaSMTPSupport.Log[verbose, descr.Unparse[], " is from ", from, "."];
      IF ~ArpaSMTPSupport.AuthorizationCheck[from] OR ~ArpaSMTPSupport.CheckHeader[from, descr] THEN {
        -- Rejected: drop the recipients and let the item be destroyed.
        BEGIN ENABLE ArpaSMTPDescr.WrongState => CONTINUE;
          ArpaSMTPDescr.RemoveRawRecipients[descr];
        END;
        ConsiderItemDestruction[descr];
        RETURN;
        };
      };
    arpa => {
      arpaReversePath: ROPE;
      arpaReversePath _ ArpaSMTPDescr.GetArpaReversePath[descr];
      arpaReversePath _ ArpaSMTPSyntax.UnBlessReturnPath[arpaReversePath];
      IF arpaReversePath = NIL THEN {
        -- Fall back on the recorded Grapevine sender.
        arpaReversePath _ ArpaSMTPDescr.GetGvSender[descr];
        arpaReversePath _ ArpaSMTPSyntax.UnBlessReturnPath[arpaReversePath];
        arpaReversePath _ ArpaSMTPSyntax.ReversePath[arpaReversePath];
        };
      descr.SetArpaReversePath[arpaReversePath];
      ArpaSMTPSupport.Log[verbose, descr.Unparse[], " is from ", arpaReversePath, "."];
      };
    ENDCASE => ERROR;
  BEGIN ENABLE ArpaSMTPDescr.WrongState => CONTINUE; --already processed
    redistributed: ROPE;
    ProcessRawRecipient: ArpaSMTPDescr.RawRecipProc = {
      descr: Descr = NARROW[procData, ItemOnly]^;
      hostName, userName: ROPE;
      [hostName, userName] _ ArpaSMTPSyntax.HostAndUser[rawRecipient];
      IF hostName = NIL THEN {
        -- No host part: deliver through Grapevine.
        hostName _ gvHostName;
        IF Rope.Find[userName, "^."] # -1 OR DLWithoutUpArrow[userName] THEN {
          -- Distribution list: accumulate it into the "Redistributed: " line.
          IF redistributed = NIL THEN redistributed _ "Redistributed: "
          ELSE redistributed _ Rope.Cat[redistributed, ", "];
          IF Rope.Find[rawRecipient, "Xerox.ARPA", 0, FALSE] # -1 THEN
            redistributed _ Rope.Cat[redistributed, rawRecipient]
          ELSE redistributed _ Rope.Cat[redistributed, userName];
          };
        };
      descr.AddProcessedRecipient[hostName, userName, TRUE];
      };
    descr.EnumerateRawRecipients[ProcessRawRecipient, value];
    IF redistributed # NIL THEN
      descr.SetPrecedeMsgText[Rope.Cat[redistributed, "\n", descr.GetPrecedeMsgText[]]];
  END;
  descr.StoreItemInfo[]; -- update info on disk
  descr.EnumerateRecipientHosts[QueueForDelivery, value];
  ConsiderItemDestruction[descr]; -- Empty, Crashed after sending to last host
  host.recv _ host.recv + 1;
  host.lastUp _ BasicTime.Now[];
  };

QueueForDelivery: INTERNAL ArpaSMTPDescr.HostProc = {
  -- Queues one delivery package for the recipients at hostName.  Non-Grapevine hosts
  -- go under HostQueues; Grapevine recipients go under the Express or ArpaExpress
  -- queue when one of them is on the corresponding list, otherwise under GVQueue.
  descr: Descr = NARROW[procData, ItemOnly]^;
  mainQueue, hostQueue: QUEUE;
  queueName: ROPE;
  gv: BOOL = Rope.Equal[hostName, gvHostName];
  SELECT TRUE FROM
    ~gv => {
      queueName _ hostName;
      mainQueue _ HostQueues;
      NOTIFY newArpaMessage;
      };
    (queueName _ Express[userNames]) # NIL => {
      mainQueue _ ExpressQueue};
    (queueName _ ArpaExpress[userNames]) # NIL => {
      mainQueue _ ArpaExpressQueue};
    ENDCASE => {
      queueName _ gvHostName;
      mainQueue _ GVQueue;
      NOTIFY newGvMessage;
      };
  hostQueue _ GetQueueForNewMessage[mainQueue, queueName];
  EnqueueAndLog[queue: hostQueue, qElem: ArpaQueue.NewElem[NEW[DeliveryPkgRep _ [descr, userNames]]]];
  };

-- Arpa sender process

newArpaMessage: CONDITION _ [timeout: Process.SecondsToTicks[30]];

GetProcessableHost: ENTRY PROC RETURNS [host: ArpaQueue.QElem] = {
  -- Blocks until HostQueues has a processable host, rotates it to the end and returns it.
  ENABLE UNWIND => NULL;
  DO
    host _ HostQueues.GetProcessableElement[];
    IF host # NIL THEN {
      MoveHostToEnd[HostQueues, host];
      RETURN;
      };
    WAIT newArpaMessage;
    ENDLOOP;
  };

SendArpaMessages: PROC = {
  -- Body of the Arpa sender process.  A host that made no progress, is not
  -- inhibited and still has mail queued is moved to the Sick queue.
  DO
    host: ArpaQueue.QElem _ GetProcessableHost[];
    recipients: RecipBundle _ NARROW[host.Value[]];
    hostQueue: QUEUE _ recipients.queue;
    IF ProcessArpaHost[host] THEN LOOP;
    IF host.Inhibition[].for > 0 THEN LOOP;
    IF hostQueue.IsEmpty[] THEN LOOP; -- Bogus host name
    MoveToSick[HostQueues, host, "Didn't make any progress"];
    ENDLOOP;
  };

ProcessArpaHost: PROC [host: ArpaQueue.QElem] RETURNS [progress: BOOL] = {
  -- Opens a connection to the host and sends its queued packages for at most about
  -- five minutes.  progress is TRUE iff at least one package was sent.
  hostStream: ArpaSMTPSend.Connection;
  recipients: RecipBundle _ NARROW[host.Value[]];
  hostQueue: QUEUE _ recipients.queue;
  startTime: BasicTime.GMT _ BasicTime.Now[];
  BEGIN -- so hostStream and hostQueue will be in scope for EXITS
    msg: ArpaQueue.QElem;
    progress _ FALSE;
    IF hostQueue.IsEmpty[] THEN GOTO FlushHostQueue;
    hostStream _ ArpaSMTPSend.Open[hostQueue.Name[] !
      ArpaSMTPSend.Failed => {
        SELECT withItem FROM
          irrelevant, retryLater => {
            IF problemWithHost THEN MoveToSick[HostQueues, host, reason];
            GOTO Done;
            };
          returnToSender => { -- there must be a problem with this Host (no item was given)
            UNTIL hostQueue.IsEmpty[] DO
              qElem: ArpaQueue.QElem = hostQueue.GetHead[];
              pkg: DeliveryPkg = NARROW[qElem.Value[]];
              descr: Descr = pkg.descr;
              IF debug THEN SIGNAL LookAtThis;
              ArpaSMTPSupport.NotifySender[descr, reason];
              FinishedWithMsg[queue: hostQueue, pkg: pkg];
              ENDLOOP;
            IF recipients.recv # 0 THEN GOTO FlushHostQueue; -- Foo@Concord.ms (yetch)
            FlushThisHost[queue: HostQueues, value: host.Value[]];
            GOTO Done;
            };
          ENDCASE => ERROR;
        }]; -- putOnBadQueue makes no sense for Open
    UNTIL (msg _ GetProcessableElement[hostQueue]) = NIL DO
      IF BasicTime.Period[from: startTime, to: BasicTime.Now[]] > 5*60 THEN {
        ArpaSMTPSupport.Log[important,
          "Already spent more than 5 minutes sending mail to " , Unparse[host.Value[]], "."];
        EXIT;
        };
      IF ~SendArpaMsg[hostStream, host, msg] THEN {
        hostStream.Close[TRUE];
        RETURN;
        };
      recipients.sent _ recipients.sent + 1;
      recipients.lastUp _ BasicTime.Now[];
      progress _ TRUE;
      ENDLOOP;
    GOTO FlushHostQueue;
    EXITS
      FlushHostQueue => {
        IF hostQueue.IsEmpty[] THEN MoveToEmpty[from: HostQueues, host: host];
        };
      Done => NULL;
  END;
  IF hostStream # NIL THEN hostStream.Close[FALSE];
  };

LookAtThis: SIGNAL = CODE;

SendArpaMsg: PROC [stream: ArpaSMTPSend.Connection, host, msg: ArpaQueue.QElem] RETURNS [continue: BOOL _ TRUE] = {
  -- Sends one package over stream.  continue is FALSE when the failure was a
  -- problem with the host, in which case the host has been moved to the Sick queue.
  recipients: RecipBundle _ NARROW[host.Value[]];
  hostQueue: QUEUE _ recipients.queue;
  pkg: DeliveryPkg = NARROW[msg.Value[]];
  descr: Descr = pkg.descr;
  BEGIN -- so the above vars will be in scope for EXITS
    ArpaSMTPSend.SendItem[descr, pkg.recipList, stream !
      ArpaSMTPSend.Failed => {
        IF problemWithHost THEN {
          MoveToSick[HostQueues, host, reason];
          continue _ FALSE;
          }; -- stop the enumeration of SMTP items (stream may be aborted)
        SELECT withItem FROM
          retryLater => {
            IF ~problemWithHost THEN
              msg.Inhibit[for: ArpaSMTPControl.defaultInhibitTime, why: reason];
            GOTO Done;
            };
          returnToSender => {
            IF debug THEN SIGNAL LookAtThis;
            ArpaSMTPSupport.NotifySender[descr, reason];
            GOTO RemoveItem;
            };
          putOnBadQueue => {
            MoveToBad[queue: hostQueue, msg: msg];
            GOTO Done;
            };
          ENDCASE => ERROR; -- shouldn't be irrelevant
        }];
    GOTO RemoveItem;
    EXITS
      RemoveItem => {
        FinishedWithMsg[queue: hostQueue, pkg: pkg];
        };
      Done => NULL;
  END;
  };

-- Grapevine sender process

newGvMessage: CONDITION _ [timeout: Process.SecondsToTicks[30]];

GetGvClump: ENTRY PROC RETURNS [host: ArpaQueue.QElem] = {
  -- Blocks until GVQueue has a processable element and returns it.
  ENABLE UNWIND => NULL;
  DO
    host _ GVQueue.GetProcessableElement[];
    IF host # NIL THEN RETURN;
    WAIT newGvMessage;
    ENDLOOP;
  };

SendGVMessages: PROC = {
  -- Body of the Grapevine sender process.
  DO
    host: ArpaQueue.QElem _ GetGvClump[];
    IF ProcessGVQueue[host] THEN LOOP;
    IF host.Inhibition[].for > 0 THEN LOOP;
    MoveToSick[GVQueue, host, "Didn't make any progress"];
    ENDLOOP;
  };

gvHandle: ArpaSMTPGVSend.Connection _ ArpaSMTPGVSend.Open[]; -- Only need one

ProcessGVQueue: PROC [host: ArpaQueue.QElem] RETURNS [progress: BOOL] = {
  -- Sends every processable package of the bundle; progress is TRUE iff one was sent.
  recipients: RecipBundle _ NARROW[host.Value[]];
  hostQueue: QUEUE _ recipients.queue;
  msg: ArpaQueue.QElem;
  progress _ FALSE;
  UNTIL (msg _ GetProcessableElement[hostQueue]) = NIL DO
    IF ~SendGVMsg[host, msg] THEN RETURN;
    recipients.sent _ recipients.sent + 1;
    recipients.lastUp _ BasicTime.Now[];
    progress _ TRUE;
    ENDLOOP;
  IF hostQueue.IsEmpty[] THEN MoveToEmpty[from: GVQueue, host: host];
  };

SendGVMsg: PROC [host, msg: ArpaQueue.QElem] RETURNS [progress: BOOL _ TRUE] = {
  -- Sends one package via gvHandle.  progress is FALSE when the failure was a
  -- problem with Grapevine, in which case the bundle has been moved to the Sick queue.
  recipients: RecipBundle _ NARROW[host.Value[]];
  hostQueue: QUEUE _ recipients.queue;
  pkg: DeliveryPkg = NARROW[msg.Value[]];
  descr: Descr = pkg.descr;
  BEGIN -- so the above vars will be in scope for EXITS
    ArpaSMTPGVSend.SendItem[descr, pkg.recipList, gvHandle !
      ArpaSMTPGVSend.Failed => {
        IF problemWithGV THEN {
          MoveToSick[GVQueue, host, reason];
          progress _ FALSE;
          }; -- stop the enumeration of GV item
        SELECT withItem FROM
          retryLater => {
            IF ~problemWithGV THEN
              msg.Inhibit[for: ArpaSMTPControl.defaultInhibitTime, why: reason];
            GOTO Done;
            };
          returnToSender => {
            IF debug THEN SIGNAL LookAtThis;
            ArpaSMTPSupport.NotifySender[descr, reason];
            GOTO RemoveItem;
            };
          putOnBadQueue => {
            MoveToBad[queue: hostQueue, msg: msg];
            GOTO Done;
            };
          ENDCASE => ERROR; -- shouldn't be "irrelevant"
        }];
    GOTO RemoveItem;
    EXITS
      RemoveItem => {
        FinishedWithMsg[queue: hostQueue, pkg: pkg];
        };
      Done => NULL;
  END;
  };

-- Background housekeeping process

Background: ENTRY PROC = {
  -- Runs forever.  Each WAIT releases the monitor lock for up to a minute; the
  -- housekeeping steps then run with the lock held.
  ENABLE UNWIND => NULL;
  snooz: CONDITION _ [timeout: Process.SecondsToTicks[1*60]];
  DO
    WAIT snooz;
    CheckLists[];
    MaybePrintStats[];
    ConsiderExpressMail[];
    ConsiderOldMessages[];
    ConsiderSickHosts[];
    ENDLOOP;
  };

timeExpressLastChecked, timeArpaExpressLastChecked, timeNotExpressLastChecked, timeNoUpArrowLastChecked: BasicTime.GMT _ BasicTime.Update[BasicTime.Now[], - checkInterval];
expressStamp, arpaExpressStamp, notExpressStamp, noUpArrowStamp: GVBasics.Timestamp _ GVBasics.oldestTime;
checkInterval: INT = 2*60*60; -- 2 hours

CheckLists: PROC = {
  -- Re-reads each Grapevine-maintained list that was last checked more than checkInterval ago.
  now: BasicTime.GMT _ BasicTime.Now[];
  IF BasicTime.Period[from: timeExpressLastChecked, to: now] > checkInterval THEN GetExpressList[];
  IF BasicTime.Period[from: timeArpaExpressLastChecked, to: now] > checkInterval THEN GetArpaExpressList[];
  IF BasicTime.Period[from: timeNotExpressLastChecked, to: now] > checkInterval THEN GetNotExpressList[];
  IF BasicTime.Period[from: timeNoUpArrowLastChecked, to: now] > checkInterval THEN GetNoUpArrowList[];
  };

-- Each Get...List below asks GVNames for the group's members, passing the stamp of
-- the copy already held.  The "last checked" time is advanced only for the noChange
-- and group outcomes; any other outcome leaves it alone so the next CheckLists retries.

GetExpressList: PROC = TRUSTED {
  memberInfo: GVNames.MemberInfo _ [noChange[]];
  memberInfo _ GVNames.GetMembers["ExpressMail^.MS", expressStamp];
  WITH m: memberInfo SELECT FROM
    noChange => timeExpressLastChecked _ BasicTime.Now[];
    group => {
      timeExpressLastChecked _ BasicTime.Now[];
      expressMail _ m.members;
      expressStamp _ m.stamp;
      ArpaSMTPSupport.Log[important, "Updated Express List."];
      };
    ENDCASE => NULL;
  };

GetArpaExpressList: PROC = TRUSTED {
  memberInfo: GVNames.MemberInfo _ [noChange[]];
  memberInfo_ GVNames.GetMembers["ArpaExpressMail^.MS", arpaExpressStamp];
  WITH m: memberInfo SELECT FROM
    noChange => timeArpaExpressLastChecked _ BasicTime.Now[];
    group => {
      timeArpaExpressLastChecked _ BasicTime.Now[];
      arpaExpressMail _ m.members;
      arpaExpressStamp _ m.stamp;
      ArpaSMTPSupport.Log[important, "Updated Arpa Express List."];
      };
    ENDCASE => NULL;
  };

GetNotExpressList: PROC = TRUSTED {
  memberInfo: GVNames.MemberInfo _ [noChange[]];
  memberInfo_ GVNames.GetMembers["NotArpaExpressMail^.MS", notExpressStamp];
  WITH m: memberInfo SELECT FROM
    noChange => timeNotExpressLastChecked _ BasicTime.Now[];
    group => {
      timeNotExpressLastChecked _ BasicTime.Now[];
      notExpressMail _ m.members;
      notExpressStamp _ m.stamp;
      ArpaSMTPSupport.Log[important, "Updated Not Express List."];
      };
    ENDCASE => NULL;
  };

GetNoUpArrowList: PROC = TRUSTED {
  memberInfo: GVNames.MemberInfo _ [noChange[]];
  memberInfo_ GVNames.GetMembers["NoUpArrowDLs^.MS", noUpArrowStamp];
  WITH m: memberInfo SELECT FROM
    noChange => timeNoUpArrowLastChecked _ BasicTime.Now[];
    group => {
      timeNoUpArrowLastChecked _ BasicTime.Now[];
      noUpArrowDLs _ m.members;
      noUpArrowStamp _ m.stamp;
      ArpaSMTPSupport.Log[important, "Updated No Up Arrow DLs."];
      };
    ENDCASE => NULL;
  };

-- Statistics

totalGvMsgsSentLastHour: INT _ 0;
totalGvBytesSentLastHour: INT _ 0;
totalArpaMsgsSentLastHour: INT _ 0;
totalArpaBytesSentLastHour: INT _ 0;
totalGvMsgsSentLastDay: INT _ 0;
totalGvBytesSentLastDay: INT _ 0;
totalArpaMsgsSentLastDay: INT _ 0;
totalArpaBytesSentLastDay: INT _ 0;
hourStatsLastChecked: BasicTime.GMT _ BasicTime.Now[];
dayStatsLastChecked: BasicTime.GMT _ BasicTime.Now[];

MaybePrintStats: INTERNAL PROC = {
  -- Logs hourly statistics once at least an hour has passed since the last hourly
  -- report, and daily statistics once at least a day has passed since the last daily
  -- report.  Rates are the counter deltas scaled to exactly one hour / one day.
  now: BasicTime.GMT _ BasicTime.Now[];
  hourSecs: INT = 3600;
  daySecs: INT = 86400;
  hourPeriod: INT _ BasicTime.Period[from: hourStatsLastChecked, to: now];
  dayPeriod: INT _ BasicTime.Period[from: dayStatsLastChecked, to: now];
  totalGvMsgsSentInHour: INT_0;
  totalArpaMsgsSentInHour: INT_0;
  totalGvMsgsSentInDay: INT_0;
  totalArpaMsgsSentInDay: INT_0;
  totalGvBytesSentInHour: INT_0;
  totalArpaBytesSentInHour: INT_0;
  totalGvBytesSentInDay: INT_0;
  totalArpaBytesSentInDay: INT_0;
  date: ROPE _ Convert.RopeFromTime[from: now, start: years, end: days, includeDayOfWeek: TRUE];

  IF hourPeriod < hourSecs THEN RETURN;
  hourStatsLastChecked _ now;
  ArpaSMTPSupport.Log[important, "Date: ", date, ", Hourly Statistics: "];
  totalGvMsgsSentInHour _ Real.InlineRound[((ArpaSMTPGVSend.totalGvMsgsSent - totalGvMsgsSentLastHour)*1.0*hourSecs)/hourPeriod];
  totalGvMsgsSentLastHour _ ArpaSMTPGVSend.totalGvMsgsSent;
  totalGvBytesSentInHour _ Real.InlineRound[((ArpaSMTPGVSend.totalGvBytesSent - totalGvBytesSentLastHour)*1.0*hourSecs)/hourPeriod];
  totalGvBytesSentLastHour _ ArpaSMTPGVSend.totalGvBytesSent;
  ArpaSMTPSupport.Log[important, " Hourly GV bound rate: ", Convert.RopeFromInt[totalGvMsgsSentInHour], " messages, ", Convert.RopeFromInt[totalGvBytesSentInHour], " bytes."];
  totalArpaMsgsSentInHour _ Real.InlineRound[((ArpaSMTPSend.totalArpaMsgsSent - totalArpaMsgsSentLastHour)*1.0*hourSecs)/hourPeriod];
  totalArpaMsgsSentLastHour _ ArpaSMTPSend.totalArpaMsgsSent;
  totalArpaBytesSentInHour _ Real.InlineRound[((ArpaSMTPSend.totalArpaBytesSent - totalArpaBytesSentLastHour)*1.0*hourSecs)/hourPeriod];
  totalArpaBytesSentLastHour _ ArpaSMTPSend.totalArpaBytesSent;
  ArpaSMTPSupport.Log[important, " Hourly Arpa bound rate: ", Convert.RopeFromInt[totalArpaMsgsSentInHour], " messages, ", Convert.RopeFromInt[totalArpaBytesSentInHour], " bytes."];
  ArpaSMTPSupport.Log[important, " Arpa queue hosts ", Convert.RopeFromInt[CountQueueInternal["Arpa"]], ", messages ", Convert.RopeFromInt[CountMessagesInternal["Arpa"]]];
  ArpaSMTPSupport.Log[important, " Sick queue hosts ", Convert.RopeFromInt[CountQueueInternal["Sick"]], ", messages ", Convert.RopeFromInt[CountMessagesInternal["Sick"]]];
  ArpaSMTPSupport.Log[important, " Express queue hosts ", Convert.RopeFromInt[CountQueueInternal["Ex"]], ", messages ", Convert.RopeFromInt[CountMessagesInternal["Ex"]]];
  ArpaSMTPSupport.Log[important, " GV queue messages ", Convert.RopeFromInt[CountMessagesInternal["GV"]]];

  IF dayPeriod < daySecs THEN RETURN;
  dayStatsLastChecked _ now;
  totalGvMsgsSentInDay _ Real.InlineRound[((ArpaSMTPGVSend.totalGvMsgsSent - totalGvMsgsSentLastDay)*1.0*daySecs)/dayPeriod];
  totalGvMsgsSentLastDay _ ArpaSMTPGVSend.totalGvMsgsSent;
  totalGvBytesSentInDay _ Real.InlineRound[((ArpaSMTPGVSend.totalGvBytesSent - totalGvBytesSentLastDay)*1.0*daySecs)/dayPeriod];
  totalGvBytesSentLastDay _ ArpaSMTPGVSend.totalGvBytesSent;
  ArpaSMTPSupport.Log[important, " Daily GV bound rate: ", Convert.RopeFromInt[totalGvMsgsSentInDay], " messages, ", Convert.RopeFromInt[totalGvBytesSentInDay], " bytes."];
  totalArpaMsgsSentInDay _ Real.InlineRound[((ArpaSMTPSend.totalArpaMsgsSent - totalArpaMsgsSentLastDay)*1.0*daySecs)/dayPeriod];
  totalArpaMsgsSentLastDay _ ArpaSMTPSend.totalArpaMsgsSent;
  totalArpaBytesSentInDay _ Real.InlineRound[((ArpaSMTPSend.totalArpaBytesSent - totalArpaBytesSentLastDay)*1.0*daySecs)/dayPeriod];
  totalArpaBytesSentLastDay _ ArpaSMTPSend.totalArpaBytesSent;
  ArpaSMTPSupport.Log[important, " Daily Arpa bound rate: ", Convert.RopeFromInt[totalArpaMsgsSentInDay], " messages, ", Convert.RopeFromInt[totalArpaBytesSentInDay], " bytes."];
  ArpaSMTPSupport.Log[important, " Total Arpa bound since reboot: ", Convert.RopeFromInt[ArpaSMTPSend.totalArpaMsgsSent], " messages, ", Convert.RopeFromInt[ArpaSMTPSend.totalArpaBytesSent], " bytes."];
  ArpaSMTPSupport.Log[important, " Total GV bound since reboot: ", Convert.RopeFromInt[ArpaSMTPGVSend.totalGvMsgsSent], " messages, ", Convert.RopeFromInt[ArpaSMTPGVSend.totalGvBytesSent], " bytes."];
  ArpaSMTPSupport.Log[important, " Totals since reboot: ", Convert.RopeFromInt[ArpaSMTPSend.totalArpaMsgsSent+ArpaSMTPGVSend.totalGvMsgsSent], " messages, ", Convert.RopeFromInt[ArpaSMTPSend.totalArpaBytesSent+ArpaSMTPGVSend.totalGvBytesSent], " bytes."];
  };

-- Express mail trickle

expressDelay: INT _ 1; -- 1 minute default delay

ConsiderExpressMail: INTERNAL PROC = {
  -- When express delivery is enabled, the GV queue is idle, and it is not a weekday
  -- between 8:00 and 20:00, moves one package from an Express (or else ArpaExpress)
  -- bundle onto the Grapevine delivery queue and then dallies for a while.
  now: BasicTime.Unpacked _ BasicTime.Unpack[BasicTime.Now[]];
  host: ArpaQueue.QElem;
  recipients: RecipBundle;
  hostQueue, from, gv: QUEUE;
  msg: ArpaQueue.QElem;
  minutes: INT;
  IF ~expressOK THEN RETURN;
  IF ~GVQueue.IsEmpty[] THEN RETURN;
  IF now.weekday IN [Monday..Friday] AND now.hour IN [8..12+8) THEN RETURN;
  host _ ExpressQueue.GetProcessableElement[];
  IF host # NIL THEN {
    from _ ExpressQueue;
    MoveHostToEnd[ExpressQueue, host];
    }
  ELSE {
    host _ ArpaExpressQueue.GetProcessableElement[];
    IF host = NIL THEN RETURN;
    from _ ArpaExpressQueue;
    MoveHostToEnd[ArpaExpressQueue, host];
    };
  recipients _ NARROW[host.Value[]];
  hostQueue _ recipients.queue;
  msg _ hostQueue.GetProcessableElement[];
  -- This procedure runs with the monitor lock held, so it must not call the ENTRY
  -- procedure MoveToEmpty (that would wait forever for the lock it already holds).
  -- An empty hostQueue yields msg = NIL and is handled just below.
  IF msg = NIL THEN {
    UnlockedMoveToEmpty[from: from, host: host];
    RETURN;
    };
  gv _ GetQueueForNewMessage[GVQueue, gvHostName];
  MoveAndLog[from: hostQueue, to: gv, qElem: msg];
  IF hostQueue.IsEmpty[] THEN UnlockedMoveToEmpty[from: from, host: host];
  NOTIFY newGvMessage;
  SELECT now.hour FROM
    12+8 => minutes _ expressDelay + 4; -- 12 messages first hour
    12+9 => minutes _ expressDelay + 3; -- 15 messages next hour
    12+10 => minutes _ expressDelay + 2; -- 20 messages next hour
    ENDCASE => minutes _ expressDelay+1; -- 30 messages/hour => 197 msgs in 8 hrs
  IF ~(now.weekday IN[Monday..Friday]) THEN minutes _ expressDelay;
  DallyAWhile[minutes];
  }; -- (At least 1) Extra minute in main loop

DallyAWhile: INTERNAL PROC [minutes: INT] = {
  -- Waits on a private condition whose timeout is minutes*60 seconds.
  snooz: CONDITION _ [timeout: Process.SecondsToTicks[minutes*60]];
  WAIT snooz;
  };

-- Sick hosts

ConsiderSickHosts: INTERNAL PROC = {
  -- Puts at most one processable sick bundle back on its delivery queue.
  -- MoveSickHostQueue ignores a NIL element.
  MoveSickHostQueue[SickQueues.GetProcessableElement[]];
  };

MoveAllSickHosts: PUBLIC ENTRY PROC = {
  -- Uninhibits every sick bundle and moves each one back to its delivery queue.
  ENABLE UNWIND => NULL;
  Uninhibit: ArpaQueue.ElemProc = {
    ArpaQueue.Uninhibit[qElem, SickQueues];
    };
  host: ArpaQueue.QElem;
  SickQueues.Enumerate[Uninhibit];
  host _ SickQueues.GetProcessableElement[];
  WHILE host # NIL DO
    MoveSickHostQueue[host];
    host _ SickQueues.GetProcessableElement[];
    ENDLOOP;
  };

MoveSickHostQueue: INTERNAL PROC[host: ArpaQueue.QElem] = {
  -- Moves a sick bundle back for delivery: the Grapevine bundle to GVQueue, any
  -- other to HostQueues, and notifies the corresponding sender.  No-op for NIL.
  IF host # NIL THEN {
    recipients: RecipBundle _ NARROW[host.Value[]];
    gv: BOOL _ Rope.Equal[recipients.queue.Name[], gvHostName];
    target: QUEUE _ IF gv THEN GVQueue ELSE HostQueues;
    MoveAndLog[from: SickQueues, to: target, qElem: host];
    IF gv THEN NOTIFY newGvMessage ELSE NOTIFY newArpaMessage;
    };
  };

FindSickHost: INTERNAL PROC[host: Rope.ROPE] RETURNS[element: ArpaQueue.QElem_NIL]= {
  -- Finds the sick bundle called host (case-insensitive) and uninhibits it.
  -- Returns NIL when host is NIL or no such bundle is sick.
  Find: ArpaQueue.ElemProc = {
    msgs: RecipBundle _ NARROW[qElem.Value[]];
    IF Rope.Equal[msgs.queue.Name[], host, FALSE] THEN {
      ArpaQueue.Uninhibit[qElem, SickQueues];
      element _ qElem;
      continue _ FALSE;
      };
    };
  IF host # NIL THEN SickQueues.Enumerate[Find];
  };

MoveSickHost: PUBLIC ENTRY PROC[name: Rope.ROPE_ NIL] = {
  -- Revives the named sick bundle.  The target queue is chosen by MoveSickHostQueue,
  -- so the Grapevine bundle returns to GVQueue rather than being filed under the
  -- Arpa host queues.
  ENABLE UNWIND => NULL;
  MoveSickHostQueue[FindSickHost[name]];
  };

-- Counting

CountQueue: PUBLIC ENTRY PROC[name: Rope.ROPE_NIL] RETURNS[n: INT_0]= {
  ENABLE UNWIND => NULL;
  n _ CountQueueInternal[name];
  };

CountQueueInternal: INTERNAL PROC[name: Rope.ROPE_NIL] RETURNS[n: INT_0]= {
  -- Number of elements on the queue called name; -1 if name is NIL or unknown.
  Count: ArpaQueue.ElemProc = { n _ n+1};
  queue: QUEUE;
  IF name = NIL THEN RETURN[-1];
  queue _ FindQueue[name];
  IF queue = NIL THEN RETURN[-1]; -- FindQueue found nothing; do not enumerate NIL
  queue.Enumerate[Count];
  };

CountMessages: PUBLIC ENTRY PROC[queueName: Rope.ROPE_NIL] RETURNS[n: INT_0]= {
  ENABLE UNWIND => NULL;
  n _ CountMessagesInternal[queueName];
  };

CountMessagesInternal: INTERNAL PROC[name: Rope.ROPE_NIL] RETURNS[n: INT_0]= {
  -- For the ARPA, Ex, ArpaEx, Sick and GV queues: total number of packages in all
  -- of the queue's bundles.  For any other name: same as CountQueueInternal.
  Count: ArpaQueue.ElemProc = { n _ n+1};
  CountNested: ArpaQueue.ElemProc = {
    msgs: RecipBundle _ NARROW[qElem.Value[]];
    msgs.queue.Enumerate[Count];
    };
  IF name = NIL THEN RETURN[-1];
  SELECT TRUE FROM
    Rope.Equal[name, HostQueues.Name[], FALSE] => HostQueues.Enumerate[CountNested];
    Rope.Equal[name, ExpressQueue.Name[], FALSE] => ExpressQueue.Enumerate[CountNested];
    Rope.Equal[name, ArpaExpressQueue.Name[], FALSE] => ArpaExpressQueue.Enumerate[CountNested];
    Rope.Equal[name, SickQueues.Name[], FALSE] => SickQueues.Enumerate[CountNested];
    Rope.Equal[name, GVQueue.Name[], FALSE] => GVQueue.Enumerate[CountNested];
    ENDCASE => n _ CountQueueInternal[name];
  };

-- Expiry of old messages

ConsiderOldMessages: INTERNAL PROC = {
  -- When both the GV and ARPA queues are idle, returns to its sender the first
  -- package on a sick host whose expiry date has passed (at most one per call,
  -- because the enumeration is stopped once the queue has been modified).
  now: BasicTime.GMT _ BasicTime.Now[];
  -- NOTE(review): host is never used; the call is kept in case
  -- ArpaQueue.GetProcessableElement has side effects -- confirm and delete.
  host: ArpaQueue.QElem _ SickQueues.GetProcessableElement[];
  ConsiderExpiration: INTERNAL ItemProc = {
    pkg: DeliveryPkg = NARROW[qElem.Value[]];
    expiryDate: BasicTime.GMT _ ArpaSMTPDescr.GetExpiryDate[pkg.descr];
    IF BasicTime.Period[from: now, to: expiryDate] < 0 THEN {
      -- SendItBack composes the explanation itself and expects the element on tempQueue.
      MoveAndLog[from: queue, to: tempQueue, qElem: qElem];
      SendItBack[queue: queue, qElem: qElem];
      RETURN[FALSE];
      };
    };
  IF ~GVQueue.IsEmpty[] THEN RETURN;
  IF ~HostQueues.IsEmpty[] THEN RETURN;
  EnumerateSickPkgs[ConsiderExpiration];
  };

SendItBack: INTERNAL PROC [queue: QUEUE, qElem: ArpaQueue.QElem] = {
  -- Notifies the sender that the package could not be delivered in time, moves the
  -- element from tempQueue back to queue, and finishes with the package.
  pkg: DeliveryPkg = NARROW[qElem.Value[]];
  reason: ROPE _ "Unable to deliver msg to ";
  FOR loosers: LIST OF ROPE _ pkg.recipList, loosers.rest UNTIL loosers = NIL DO
    reason _ Rope.Cat[reason, IF loosers = pkg.recipList THEN NIL ELSE ", ", loosers.first];
    ENDLOOP;
  reason _ Rope.Cat[reason, " at ", queue.Name[], " within a reasonable amount of time."];
  ArpaSMTPSupport.NotifySender[pkg.descr, reason];
  MoveAndLog[from: tempQueue, to: queue, qElem: qElem];
  InternalFinishedWithMsg[queue: queue, pkg: pkg];
  };

-- Errors

UserError: PUBLIC ERROR [reason: ROPE] = CODE;
Failed: ERROR [why: ErrorCode, reason: ROPE] = CODE;
ErrorCode: TYPE = {lockNotObtained, notOnQueue};

-- Enumeration helpers

DescrFromElemVal: PROC [elemVal: REF ANY] RETURNS [Descr] = {
  -- Extracts the descriptor from an ItemOnly or DeliveryPkg element value.
  SELECT ValTypeOfElem[elemVal] FROM
    ItemOnlyT => RETURN[NARROW[elemVal, ItemOnly]^];
    DeliveryPkgT => RETURN[NARROW[elemVal, DeliveryPkg].descr];
    ENDCASE => ERROR;
  };

ItemProc: TYPE = PROC [queue: QUEUE, qElem: ArpaQueue.QElem, procData: REF ANY _ NIL] RETURNS [continue: BOOL _ TRUE];

EnumerateSickPkgs: INTERNAL PROC [proc: ItemProc, data: REF ANY _ NIL] = {
  -- Calls proc for every package of every sick host until proc returns FALSE.
  continueOverall: BOOL _ TRUE; -- must start TRUE: an empty host queue never assigns it
  queue: QUEUE;
  ScanQueue: ArpaQueue.ElemProc = {
    continueOverall _ proc[queue, qElem, data];
    RETURN[continueOverall];
    };
  ScanHostQueue: ArpaQueue.ElemProc = {
    queue _ NARROW[qElem.Value[], RecipBundle].queue;
    queue.Enumerate[ScanQueue];
    RETURN[continueOverall];
    };
  SickQueues.Enumerate[ScanHostQueue];
  };

EnumerateItems: INTERNAL PROC [proc: ItemProc, data: REF ANY _ NIL] = {
  -- Calls proc for every package in the GV, Express, ArpaExpress, ARPA and Sick
  -- bundles and then in the Temp and Bad queues, stopping as soon as proc returns FALSE.
  continueOverall: BOOL _ TRUE; -- must start TRUE: an empty host queue never assigns it
  queue: QUEUE;
  ScanQueue: ArpaQueue.ElemProc = {
    continueOverall _ proc[queue, qElem, data];
    RETURN[continueOverall];
    };
  ScanHostQueue: ArpaQueue.ElemProc = {
    queue _ NARROW[qElem.Value[], RecipBundle].queue;
    queue.Enumerate[ScanQueue];
    RETURN[continueOverall];
    };
  GVQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  ExpressQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  ArpaExpressQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  HostQueues.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  SickQueues.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  queue _ tempQueue;
  tempQueue.Enumerate[ScanQueue, data];
  IF ~continueOverall THEN RETURN; -- honour a stop request made inside tempQueue
  queue _ BadItemQueue;
  BadItemQueue.Enumerate[ScanQueue, data];
  };

QueueProc: TYPE = PROC [queue: QUEUE, procData: REF ANY _ NIL] RETURNS [continue: BOOL _ TRUE];

EnumerateQueues: INTERNAL PROC [proc: QueueProc, data: REF ANY _ NIL] = {
  -- Calls proc for every per-host queue in the GV, Express, ArpaExpress, ARPA and
  -- Sick queues and then for the Temp and Bad queues, until proc returns FALSE.
  continueOverall: BOOL _ TRUE;
  ScanHostQueue: ArpaQueue.ElemProc = {
    queue: QUEUE _ NARROW[qElem.Value[], RecipBundle].queue;
    continueOverall _ proc[queue, data];
    RETURN[continueOverall];
    };
  GVQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  ExpressQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  ArpaExpressQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  HostQueues.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  SickQueues.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  IF ~proc[tempQueue, data] THEN RETURN;
  IF ~proc[BadItemQueue, data] THEN RETURN;
  };

-- Queue manipulation.  The ENTRY versions are for the sender processes, which run
-- outside the monitor; code already inside the monitor must use the INTERNAL ones.

GetProcessableElement: ENTRY PROC [queue: QUEUE] RETURNS [qElem: ArpaQueue.QElem] = {
  -- Returns the queue's next processable element (NIL if none) after rotating it to the end.
  ENABLE UNWIND => NULL;
  qElem _ queue.GetProcessableElement[];
  IF qElem = NIL THEN RETURN;
  queue.Dequeue[qElem.Value[]];
  queue.Enqueue[qElem];
  };

MoveHostToEnd: INTERNAL PROC [queue: QUEUE, qElem: ArpaQueue.QElem] = {
  -- Rotates qElem to the end of queue by dequeuing and re-enqueuing it.
  queue.Dequeue[qElem.Value[]];
  queue.Enqueue[qElem];
  };

MoveToEmpty: ENTRY PROC [from: QUEUE, host: ArpaQueue.QElem] = {
  ENABLE UNWIND => NULL;
  MoveAndLog[from: from, to: EmptyQueues, qElem: host];
  };

UnlockedMoveToEmpty: INTERNAL PROC [from: QUEUE, host: ArpaQueue.QElem] = {
  -- Same as MoveToEmpty, for callers that already hold the monitor lock.
  MoveAndLog[from: from, to: EmptyQueues, qElem: host];
  };

MoveToSick: ENTRY PROC [from: QUEUE, host: ArpaQueue.QElem, reason: ROPE] = {
  -- Inhibits host for the default inhibit time, recording reason, and files it under SickQueues.
  ENABLE UNWIND => NULL;
  host.Inhibit[for: ArpaSMTPControl.defaultInhibitTime, why: reason];
  MoveAndLog[from: from, to: SickQueues, qElem: host];
  };

MoveToBad: ENTRY PROC [queue: QUEUE, msg: ArpaQueue.QElem] = {
  -- Takes msg off queue and puts it on BadItemQueue.
  ENABLE UNWIND => NULL;
  DequeueAndLog[queue: queue, value: msg.Value[]];
  EnqueueAndLog[queue: BadItemQueue, qElem: msg];
  };

FlushThisHost: ENTRY PROC [queue: QUEUE, value: REF ANY] = {
  -- Removes the element holding value from queue.
  ENABLE UNWIND => NULL;
  DequeueAndLog[queue: queue, value: value];
  };

FinishedWithMsg: ENTRY PROC [queue: QUEUE, pkg: DeliveryPkg] = {
  ENABLE UNWIND => NULL;
  InternalFinishedWithMsg[queue, pkg];
  };
PROC [proc: QueueProc, data: REF ANY _ NIL] = { continueOverall: BOOL _ TRUE; ScanHostQueue: ArpaQueue.ElemProc = { queue: QUEUE _ NARROW[qElem.Value[], RecipBundle].queue; continueOverall _ proc[queue, data]; RETURN[continueOverall]; }; GVQueue.Enumerate[ScanHostQueue]; IF ~continueOverall THEN RETURN; ExpressQueue.Enumerate[ScanHostQueue]; IF ~continueOverall THEN RETURN; ArpaExpressQueue.Enumerate[ScanHostQueue]; IF ~continueOverall THEN RETURN; HostQueues.Enumerate[ScanHostQueue]; IF ~continueOverall THEN RETURN; SickQueues.Enumerate[ScanHostQueue]; IF ~continueOverall THEN RETURN; IF ~proc[tempQueue, data] THEN RETURN; IF ~proc[BadItemQueue, data] THEN RETURN; }; GetProcessableElement: ENTRY PROC [queue: QUEUE] RETURNS [qElem: ArpaQueue.QElem] = { ENABLE UNWIND => NULL; qElem _ queue.GetProcessableElement[]; IF qElem = NIL THEN RETURN; queue.Dequeue[qElem.Value[]]; queue.Enqueue[qElem]; }; MoveHostToEnd: INTERNAL PROC [queue: QUEUE, qElem: ArpaQueue.QElem] = { queue.Dequeue[qElem.Value[]]; queue.Enqueue[qElem]; }; MoveToEmpty: ENTRY PROC [from: QUEUE, host: ArpaQueue.QElem] = { ENABLE UNWIND => NULL; MoveAndLog[from: from, to: EmptyQueues, qElem: host]; }; UnlockedMoveToEmpty: INTERNAL PROC [from: QUEUE, host: ArpaQueue.QElem] = { MoveAndLog[from: from, to: EmptyQueues, qElem: host]; }; MoveToSick: ENTRY PROC [from: QUEUE, host: ArpaQueue.QElem, reason: ROPE] = { ENABLE UNWIND => NULL; host.Inhibit[for: ArpaSMTPControl.defaultInhibitTime, why: reason]; MoveAndLog[from: from, to: SickQueues, qElem: host]; }; MoveToBad: ENTRY PROC [queue: QUEUE, msg: ArpaQueue.QElem] = { ENABLE UNWIND => NULL; DequeueAndLog[queue: queue, value: msg.Value[]]; EnqueueAndLog[queue: BadItemQueue, qElem: msg]; }; FlushThisHost: ENTRY PROC [queue: QUEUE, value: REF ANY] = { ENABLE UNWIND => NULL; DequeueAndLog[queue: queue, value: value]; }; FinishedWithMsg: ENTRY PROC [queue: QUEUE, pkg: DeliveryPkg] = { ENABLE UNWIND => NULL; InternalFinishedWithMsg[queue, pkg]; }; 
-- Records that delivery of pkg via queue is over: removes queue's host from
-- the descriptor's recipient hosts, dequeues the package, and then decides
-- whether the item itself can be destroyed.
InternalFinishedWithMsg: INTERNAL PROC [queue: QUEUE, pkg: DeliveryPkg] = {
  pkg.descr.RemoveRecipientHost[queue.Name[]];
  DequeueAndLog[queue: queue, value: pkg];
  ConsiderItemDestruction[descr: pkg.descr];
  };

-- Monitor-locked wrapper around MoveAndLog.
LockedMoveAndLog: ENTRY PROC [from, to: QUEUE, qElem: ArpaQueue.QElem] = {
  ENABLE UNWIND => NULL;
  MoveAndLog[from: from, to: to, qElem: qElem];
  };

-- Dequeues qElem from one queue, enqueues it on the other, and logs the move
-- at the verbose level.
MoveAndLog: INTERNAL PROC [from, to: QUEUE, qElem: ArpaQueue.QElem] = {
  from.Dequeue[qElem.Value[]];
  to.Enqueue[qElem];
  ArpaSMTPSupport.Log[verbose, Unparse[qElem.Value[]], " moved from ", from.Name[], " to ", to.Name[], "."];
  };

-- Enqueues qElem on queue and logs the addition at the verbose level.
EnqueueAndLog: INTERNAL PROC [queue: QUEUE, qElem: ArpaQueue.QElem] = {
  queue.Enqueue[qElem];
  ArpaSMTPSupport.Log[verbose, Unparse[qElem.Value[]], " added to ", queue.Name[], "."];
  };

-- Dequeues value from queue and logs the removal at the verbose level.
DequeueAndLog: INTERNAL PROC [queue: QUEUE, value: REF ANY] = {
  val: ROPE = Unparse[value]; -- unparsed before the element is removed
  queue.Dequeue[value];
  ArpaSMTPSupport.Log[verbose, val, " removed from ", queue.Name[], "."];
  };

-- Tells which kind of value the elements of queue carry. The queues of host
-- bundles are listed explicitly; every other queue (tempQueue, BadItemQueue
-- and whatever falls into ENDCASE) is reported as holding DeliveryPkgs.
ValTypeOfQueue: PROC [queue: QUEUE] RETURNS [ElemValType] = {
  RETURN[SELECT queue FROM
    tempQueue, BadItemQueue => DeliveryPkgT,
    -- ArpaExpressQueue added: EnumerateQueues narrows its elements to
    -- RecipBundle, so it must not fall into the DeliveryPkgT default.
    EmptyQueues, GVQueue, ExpressQueue, ArpaExpressQueue, HostQueues, SickQueues => RecipBundleT,
    ENDCASE => DeliveryPkgT
    ];
  };

-- Classifies an element value by its dynamic type; raises ERROR for anything
-- that is neither a DeliveryPkg nor a RecipBundle.
ValTypeOfElem: PROC [value: REF ANY] RETURNS [ElemValType] = {
  SELECT TRUE FROM
    ISTYPE[value, DeliveryPkg] => RETURN[DeliveryPkgT];
    ISTYPE[value, RecipBundle] => RETURN[RecipBundleT];
    ENDCASE => ERROR;
  };

ElemValType: TYPE = {ItemOnlyT, DeliveryPkgT, RecipBundleT};

-- Human-readable description of each ElemValType.
-- NOTE(review): the DeliveryPkgT text reads "(from , an Host queue, ..." -- a
-- queue name seems to be missing after "from "; left untouched because the
-- intended name cannot be determined from this file section.
ValTypeNames: ARRAY ElemValType OF ROPE = [
  "Somebody is confused",
  Rope.Cat[ "DeliveryPkg (from ", ", an Host queue, or ", BadItemQueue.Name[], ")"],
  Rope.Cat["Host queue (from Empty, GV, Ex, ARPA, or Sick)"]
  ];

-- Decides what to do with an item once a package for it has been finished.
-- All queues visited by EnumerateQueues are searched for the item's handle:
--   * still on a queue: it is kept; if the descriptor has no recipients left
--     and the queue is not BadItemQueue, the inconsistency is logged;
--   * on no queue but recipients remain: a DeliveryPkg with an empty recipient
--     list is put on BadItemQueue and an ATTENTION message is logged;
--   * on no queue and no recipients remain: the descriptor is destroyed.
ConsiderItemDestruction: INTERNAL PROC [descr: Descr] = {
  queues: LIST OF QUEUE _ NIL;
  itemPresent: BOOL _ FALSE;
  onQueue: QUEUE;
  CollectQueues: QueueProc = {queues _ CONS[queue, queues]; };
  EnumerateQueues[CollectQueues];
  [itemPresent, onQueue] _ ItemIsOn[descr.GetUserHandle[], queues];
  IF itemPresent THEN {
    IF ~descr.MoreRecipients[] AND onQueue # BadItemQueue THEN -- queue corruption
      ArpaSMTPSupport.Log[important,
        "There are no more recipients in the item descriptor for item ",
        descr.Unparse[],
        ", but there is a reference to it on ",
        onQueue.Name[],
        ".\nThis should not have occurred; the demons should discard it."];
    }
  ELSE { -- item not present on any queues
    IF descr.MoreRecipients[] THEN {
      qElem: ArpaQueue.QElem = ArpaQueue.NewElem[NEW[DeliveryPkgRep _ [descr: descr, recipList: NIL]]];
      EnqueueAndLog[queue: BadItemQueue, qElem: qElem];
      ArpaSMTPSupport.Log[ATTENTION,
        "There are more recipients in the item descriptor for item ",
        descr.Unparse[],
        ", though there is no reference to it on any queue.\n",
        "This should not have occurred. Have placed a deliveryPkg on ",
        BadItemQueue.Name[],
        "."];
      }
    ELSE {
      descr.Destroy[];
      };
    };
  };

-- Searches queueList, in order, for an element with the given user handle.
-- On a hit, itemPresent is TRUE and onQueue is the queue holding it. On a
-- miss, itemPresent is FALSE and onQueue is left unassigned.
ItemIsOn: INTERNAL PROC [handle: INT, queueList: LIST OF QUEUE] RETURNS [itemPresent: BOOL, onQueue: QUEUE] = {
  itemPresent _ FALSE;
  FOR qList: LIST OF QUEUE _ queueList, qList.rest UNTIL qList = NIL DO
    IF RetrieveElemVal[qList.first, handle, FALSE] # NIL THEN {
      itemPresent _ TRUE;
      onQueue _ qList.first;
      EXIT;
      };
    ENDLOOP;
  };

-- Returns the value of the element of queue whose descriptor has the given
-- user handle, or NIL when there is none and errorIfNotFound is FALSE.
-- Raises UserError when the handle is not found and errorIfNotFound is TRUE,
-- and also when queue is one of the queues of host bundles (those do not hold
-- items directly).
RetrieveElemVal: PUBLIC PROC [queue: QUEUE, handle: INT, errorIfNotFound: BOOL _ TRUE] RETURNS [value: REF ANY _ NIL] = {
  SELECT ValTypeOfQueue[queue] FROM
    DeliveryPkgT => {
      Find: ArpaQueue.ElemProc = {
        IF handle = ArpaSMTPDescr.GetUserHandle[NARROW[qElem.Value[], DeliveryPkg].descr] THEN
          {value _ qElem.Value[]; continue _ FALSE};
        };
      queue.Enumerate[Find];
      };
    RecipBundleT =>
      ERROR UserError[IO.PutFR["You have asked the queue module to retrieve item #%g from HostQueues, which doesn't directly contain items (possibly a code bug). Use HostQueue.", IO.int[handle]]];
    ENDCASE => ERROR;
  IF value = NIL AND errorIfNotFound THEN {
    reason: ROPE = IO.PutFR[ "Msg #%g not found on %g.", IO.int[handle], IO.rope[queue.Name[]]];
    ERROR UserError[reason];
    };
  }; -- end RetrieveElemVal

END.
6ArpaSMTPQueueImpl.mesa Copyright c 1985 by Xerox Corporation. 
All rights reserved. DCraft, December 21, 1983 9:02 pm Taft, February 4, 1984 4:30:19 pm PST Hal Murray May 29, 1985 2:17:17 am PDT John Larson, February 29, 1988 0:22:08 am PST Queue elements (which must be REFs) are DeliveryPkgs or RecipBundles. The Queues (Names are in ValTypeNames too) Note: The names of all sub-queues must be unique Special Host Names and such Policy controls Not currently in use yet. Try for an old one. Doesn't exist yet. Make it. Fish Sender out of body of msg, but not if ReturnToSender has already set one up. Beware of loops if you can't return a message to the original sender. -- Show old name if used on Open probably means the host name is unknown; all items should be returned Reject the whole list... Info for lists IF now.hour = 12+11 THEN RETURN; -- Archiver and RegPurger This heuristic is a hack. For easy messages it's too slow. For nasty ones, GV still gets swamped. IF ~HostQueues.IsEmpty[] THEN RETURN; Miscellaneous Will happily queue wrong value type on a queue. Check all item queues (excluding InSystemQueue) to determine if descr is on any of them. If so, the item should be retained, else destroyed unless there are further recipients in the descriptor (in which case there is some error so log an ATTENTION msg and place on BadItemQueue). Also log an error if the descr appears on some queue but has no further recipients. The alternative to this (and the way I first implemented it) is to check only if there are no further recipients in the recipient list field of the Descr, and assume that if there aren't then the item isn't on any pkg queue. If all of the code was right, and users could never confuse the queue system, this would be the better choice because all of the queues wouldn't have to be searched. However, we don't want pkgs on queues pointing to a destroyed item, and this is a server so we must try to act intelligently in the face of corruptions... Find the value of the element on queue with the given user handle. 
Value is NIL if handle not found on queue. Κ-`– "cedar" style˜headšœ™Icodešœ Οmœ1™Mšœ5˜5—šž˜Mšœ8˜8———Mšœ0žœ˜9—Mšœ9˜9šžœžœž˜MšœR˜R—Mšžœ˜Mšœ ˜-Mšœ7˜7Mšœ  ,˜LM˜Mšœ!˜!—š‘œžœ˜5Mšœžœ˜+Mšœžœ˜Mšœ žœ˜Mšœžœ$˜,šžœžœž˜šœ˜Mšœ˜Mšœ˜Mšžœ˜—šœ#žœ˜+Mšœ˜—šœ'žœ˜/Mšœ˜—šžœ˜ Mšœ˜Mšœ˜Mšžœ˜——Mšœ8˜8Mšœ:žœ+˜h—Mšœž œ)˜Bš‘œžœžœžœ˜BMšžœžœžœ˜šž˜Mšœ*˜*Mšžœžœžœ$žœ˜@Mšžœ˜Mšžœ˜ ——š‘œžœ˜šž˜Mšœ-˜-Mšœžœ˜/Mšœ žœ˜$Mšžœžœžœ˜#Mšžœžœžœ˜'Mšžœžœžœ ˜4Mšœ9˜9Mšžœ˜ ——š‘œžœžœ žœ˜JMšœ$˜$Mšœžœ˜/Mšœ žœ˜$Mšœžœ˜+Mšžœ 9˜?Mšœ˜Mšœ žœ˜Mšžœžœžœ˜0šœJ˜Jšžœ ž˜šœ˜šžœž˜Mšœ%˜%—Mšžœ ˜ —šœ =˜QM™MM™šžœž˜Mšœ-˜-Mšœžœ˜)Mšœ˜Mšžœžœžœ ˜ Mšœ,˜,Mšœ,˜,Mšžœ˜—Mšžœžœžœ ˜JMšœ6˜6Mšžœ ˜ —Mšžœžœ (˜>——šžœ,žœž˜7šžœ?žœ˜GMšœs˜sMšžœ˜—Mšžœ%žœžœžœ˜PMšœ&˜&Mšœ$˜$Mšœ žœ˜Mšžœ˜—Mšžœ˜šž˜šœ˜Mšžœžœ.˜I—Mšœžœ˜ —Mšžœ˜Mšžœžœžœžœ˜4—Mš‘ œžœžœ˜š ‘ œžœ?žœ žœžœ˜sMšœžœ˜/Mšœ žœ˜$Mšœžœ˜'Mšœ˜šžœ /˜5šœM˜Mšžœžœ˜Mšœ%˜%Mšœ žœ =˜Q—šžœ ž˜˜šžœž˜MšœB˜B—Mšžœ ˜ —˜Mšžœžœžœ ˜ Mšœ,˜,Mšžœ˜—˜Mšœ&˜&Mšžœ ˜ —Mšžœžœ ˜,—Mšœ˜—Mšžœ ˜šž˜šœ˜Mšœ/˜/—Mšœžœ˜ —Mšžœ˜——Mšœž œ)˜@š‘ œžœžœžœ˜:Mšžœžœžœ˜šž˜Mšœ'˜'Mšžœžœžœžœ˜Mšžœ˜Mšžœ˜ ——š‘œžœ˜šž˜Mšœ%˜%Mšžœžœžœ˜"Mšžœžœžœ˜'Mšœ6˜6Mšžœ˜ ——Mšœ= ˜Mš‘œžœžœ žœ˜IMšœžœ˜/Mšœ žœ˜$Mšœ˜Mšœ žœ˜šžœ,žœž˜7Mšžœžœžœ˜%Mšœ&˜&Mšœ$˜$Mšœ žœ˜Mšžœ˜—Mšžœžœ,˜G—š ‘ œžœžœ žœžœ˜PMšœžœ˜/Mšœ žœ˜$Mšœžœ˜'Mšœ˜šžœ /˜5šœT˜Tšžœžœ˜Mšœ"˜"Mšœ žœ "˜7—šžœ ž˜˜šžœž˜MšœB˜BMšžœ ˜ ——˜Mšžœžœžœ ˜ Mšœ,˜,Mšžœ˜—˜Mšœ&˜&Mšžœ ˜ —Mšžœžœ ˜.—Mšœ˜—Mšžœ ˜šž˜šœ˜Mšœ/˜/—Mšœžœ˜ —Mšžœ˜——š‘ œžœžœ˜Mšžœžœžœ˜Mšœžœ˜%Mšœž œ+˜;šž˜Mšžœ˜ M˜ M˜M˜M˜M˜Mšžœ˜ —M˜—M˜šœ™MšœO£œ žœ6˜¬Mšœ1£œ+˜jMšœžœ   ˜(—š‘ œžœ˜Mšœžœ˜%MšžœIžœ˜aMšžœMžœ˜jMšžœLžœ˜hMšΡkmt£K€£˜fMšœ˜—š‘œžœžœ˜ Mšœ.˜.MšœA˜Ašžœžœž˜Mšœ5˜5˜ Mšœ)˜)M˜M˜Jšœ;˜;—Mšžœžœ˜——š‘œžœžœ˜$Mšœ.˜.MšœH˜Hšžœžœž˜Mšœ9˜9˜ Mšœ-˜-Mšœ˜Mšœ˜Jšœ@˜@—Mšžœžœ˜M˜——š‘œžœžœ˜#Mšœ.˜.MšœJ˜Jšžœžœž˜Mšœ8˜8˜ Mšœ,˜,Mšœ˜Mšœ˜Jšœ?˜?—Mšžœžœ˜M˜M˜——šΡmntœžœžœ˜"Mšœ.˜.Mšœ £œ£œ˜Cšžœžœž˜Mšœ £œ˜7˜ Mš£œ˜+Mš£ œ ˜Mš£œ ˜Jšœ(£œ˜>—Mšžœžœ˜M˜M˜——Jšœžœ˜!Jšœžœ˜"Jšœžœ˜#Jšœžœ˜$J˜Jšœžœ˜ Jšœžœ˜!Jšœžœ˜"Jšœžœ˜#J˜Mšœ žœ˜6Mšœžœ˜5M˜š‘œž œ˜#Mšœžœ˜%Mšœ žœ˜Mšœ žœ ˜Mšœ žœ9˜HMšœ žœ8˜FMšœžœ˜Mšœžœ˜Mšœžœ˜Mšœžœ˜Mšœžœ˜Mšœžœ˜ 
Mšœžœ˜Mšœžœ˜M˜MšœžœNžœ˜^Mšžœžœžœ˜!Mšœ˜M˜JšœK˜KM˜Mšœ˜Mšœ9˜9M˜Mšœ‚˜‚Mšœ;˜;M˜JšœΈ˜ΈJ˜Mšœƒ˜ƒMšœ;˜;M˜Mšœ†˜†Mšœ=˜=J˜JšœΎ˜ΎJ˜Jšœ²˜²J˜Jšœ²˜²J˜Jšœ±˜±J˜Jšœl˜lJ˜J˜Mšžœžœžœ˜#Mšœ˜J˜J˜Mšœ{˜{Mšœ8˜8M˜Mšœ~˜~Mšœ:˜:M˜Jšœ΅˜΅M˜Mšœ˜Mšœ:˜:M˜Mšœ‚˜‚Mšœ<˜Mšžœžœžœ˜Mšœ0˜0Mšœ2˜2—š ‘ œžœžœ žœ žœžœ˜šžœžœž˜Mšžœžœ˜3Mšžœžœ˜3Mšžœžœ˜——Mšœ žœ+˜<š‘ œžœ žœžœ˜+Mšœ˜šœ ˜ MšœH˜H—Mšœ=˜=—š‘œžœžœ˜9M™μM™‘Mš œžœžœžœžœ˜Mšœ žœžœ˜Mšœ žœ˜M˜Mš‘ œžœ˜