ArpaSMTPQueueImpl:
CEDAR
MONITOR
IMPORTS ArpaQueue, ArpaSMTPControl, ArpaSMTPDescr, ArpaSMTPGVSend, ArpaSMTPSend, ArpaSMTPSupport, ArpaSMTPSyntax, BasicTime, Convert, GVBasics, GVNames, GVProtocol, IO, Real, Rope, Process
EXPORTS ArpaSMTPQueue =
BEGIN
-- Short local names for types defined by the imported interfaces.
Descr: TYPE = ArpaSMTPDescr.Descr;
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
QUEUE: TYPE = ArpaQueue.MQ; -- type of every queue declared in this module
-- Queue elements (which must be REFs) are DeliveryPkgs or RecipBundles.
-- An item for delivery to one or more recipients.
ItemOnly: TYPE = REF Descr;

-- An item ready for delivery to a clump of recipients at the same host.
DeliveryPkg: TYPE = REF DeliveryPkgRep;
DeliveryPkgRep: TYPE = RECORD [
  descr: Descr,
  recipList: LIST OF ROPE
  ];

-- A queue of delivery pkgs for a single host, plus its traffic counters.
RecipBundle: TYPE = REF RecipBundleRep;
RecipBundleRep: TYPE = RECORD [
  queue: QUEUE, -- the per-host sub-queue; its name identifies the host
  sent: INT ← 0, -- bumped by the send loops after each item handed off
  recv: INT ← 0, -- bumped by AddNewMessage for the source of each new item
  lastUp: BasicTime.GMT -- time of the most recent send or receive; nullGMT means never
  ];
-- The Queues (Names are in ValTypeNames too)
-- Note: The names of all sub-queues must be unique
-- Top-level queues, created once when the module is initialized.
-- The string passed to ArpaQueue.Create is the queue name that FindQueue matches against.
EmptyQueues: QUEUE ← ArpaQueue.Create["Empty"]; -- queue of RecipBundles
GVQueue: QUEUE ← ArpaQueue.Create["GV"]; -- queue of RecipBundles
ExpressQueue: QUEUE ← ArpaQueue.Create["Ex"]; -- queue of RecipBundles
ArpaExpressQueue: QUEUE ← ArpaQueue.Create["ArpaEx"]; -- queue of RecipBundles
HostQueues: QUEUE ← ArpaQueue.Create["ARPA"]; -- queue of RecipBundles
SickQueues: QUEUE ← ArpaQueue.Create["Sick"]; -- queue of RecipBundles
BadItemQueue: QUEUE ← ArpaQueue.Create["Bad"]; -- queue of DeliveryPkgs
tempQueue: QUEUE ← ArpaQueue.Create["Temp"]; -- queue of DeliveryPkgs
-- Special Host Names and such
gvHostName: ROPE ← "Grapevine"; -- Wired into format of msg saved on disk
expressHostName: ROPE ← "Express";
expressMail: LIST OF ROPE; -- names matched by Express
arpaExpressHostName: ROPE ← "ArpaExpress";
arpaExpressMail: LIST OF ROPE; -- names matched by ArpaExpress
notExpressMail: LIST OF ROPE; -- names matched by NotExpress
-- The identifier on the next line was mis-encoded in the source text.
-- It is restored from the reference to noUpArrowDLs in DLWithoutUpArrow.
noUpArrowDLs: PUBLIC LIST OF ROPE; -- names matched by DLWithoutUpArrow
-- Policy controls
expressOK: BOOL ← TRUE; -- when FALSE, ConsiderExpressMail returns without moving anything; set via SetExpressOK
debug: BOOL ← FALSE; -- when TRUE, the send code raises SIGNAL LookAtThis before notifying a sender
-- Returns the module's current expressMail list.
-- Not an ENTRY procedure, so the list is read without taking the monitor lock.
ExpressList: PUBLIC PROC RETURNS [list: LIST OF ROPE] = {
  list ← expressMail;
  };
-- Sets the expressOK policy flag that ConsiderExpressMail tests on every pass.
SetExpressOK: PUBLIC PROC [ok: BOOL] = {
  expressOK ← ok;
  };
-- Scans names in order and returns the expressMail entry equal to the first name that matches
-- (Rope.Equal with case = FALSE). Returns NIL as soon as a name is on the not-express list,
-- or when the scan finishes without a match.
Express: PROC [names: LIST OF ROPE] RETURNS [name: ROPE] = {
  FOR candidates: LIST OF ROPE ← names, candidates.rest UNTIL candidates = NIL DO
    recipient: ROPE = candidates.first;
    IF NotExpress[recipient] THEN RETURN[NIL];
    FOR entry: LIST OF ROPE ← expressMail, entry.rest UNTIL entry = NIL DO
      -- Return the list's own spelling, not the recipient's.
      IF Rope.Equal[recipient, entry.first, FALSE] THEN RETURN[entry.first];
      ENDLOOP;
    ENDLOOP;
  RETURN[NIL];
  };
-- TRUE iff name equals some entry of notExpressMail (Rope.Equal with case = FALSE).
NotExpress: PROC [name: ROPE] RETURNS [BOOLEAN] = {
  remaining: LIST OF ROPE ← notExpressMail;
  WHILE remaining # NIL DO
    IF Rope.Equal[name, remaining.first, FALSE] THEN RETURN[TRUE];
    remaining ← remaining.rest;
    ENDLOOP;
  RETURN[FALSE];
  };
-- Scans names in order and returns the arpaExpressMail entry equal to the first name that
-- matches (Rope.Equal with case = FALSE), or NIL when no name matches.
-- Unlike Express, this does not consult the not-express list.
ArpaExpress: PROC [names: LIST OF ROPE] RETURNS [name: ROPE] = {
  FOR candidates: LIST OF ROPE ← names, candidates.rest UNTIL candidates = NIL DO
    recipient: ROPE = candidates.first;
    FOR entry: LIST OF ROPE ← arpaExpressMail, entry.rest UNTIL entry = NIL DO
      IF Rope.Equal[recipient, entry.first, FALSE] THEN RETURN[entry.first];
      ENDLOOP;
    ENDLOOP;
  RETURN[NIL];
  };
-- valid is TRUE iff dl equals some entry of noUpArrowDLs (Rope.Equal with case = FALSE).
-- Not an ENTRY procedure: AddNewMessage calls it while already holding the monitor lock.
DLWithoutUpArrow: PUBLIC PROC [dl: ROPE] RETURNS [valid: BOOL] = {
  valid ← FALSE;
  FOR entry: LIST OF ROPE ← noUpArrowDLs, entry.rest UNTIL entry = NIL DO
    IF Rope.Equal[dl, entry.first, FALSE] THEN { valid ← TRUE; EXIT };
    ENDLOOP;
  };
-- Returns the per-host sub-queue called name, creating it if necessary.
--   home: the top-level queue that should hold the bundle when it has to be revived or created.
--   name: sub-queue name, compared with Rope.Equal and case = FALSE.
-- A bundle already in GVQueue, ExpressQueue, ArpaExpressQueue, HostQueues or SickQueues is
-- returned where it is. One found in EmptyQueues is first moved to home. Otherwise a new
-- bundle is created and enqueued on home.
-- (Fix: the two in-body remarks had lost their comment markers and would not compile.)
GetQueueForNewMessage: INTERNAL PROC [home: QUEUE, name: ROPE] RETURNS [queue: QUEUE] = {
  -- Enumeration callback: on a name match, record the element and its queue, then stop.
  Find: ArpaQueue.ElemProc = {
    msgs: RecipBundle ← NARROW[qElem.Value[]];
    IF Rope.Equal[msgs.queue.Name[], name, FALSE] THEN {
      old ← qElem;
      queue ← msgs.queue;
      continue ← FALSE; }; };
  old: ArpaQueue.QElem; -- matching element; only used for the EmptyQueues case
  new: RecipBundle;
  GVQueue.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  ExpressQueue.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  ArpaExpressQueue.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  HostQueues.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  SickQueues.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  -- Not currently in use yet. Try for an old one.
  EmptyQueues.Enumerate[Find];
  IF queue # NIL THEN {
    MoveAndLog[from: EmptyQueues, to: home, qElem: old];
    RETURN; };
  -- Doesn't exist yet. Make it.
  new ← NEW[RecipBundleRep ← [
    queue: ArpaQueue.Create[name], sent: 0, recv: 0, lastUp: BasicTime.nullGMT]];
  EnqueueAndLog[queue: home, qElem: ArpaQueue.NewElem[new]];
  RETURN[new.queue]; };
-- Prints the short form of the item whose user handle is item, followed by any inhibition
-- on its queue element. Writes "No item with that handle." when there is no such item.
PrintItem: PUBLIC ENTRY PROC [item: INT, out: STREAM] = {
  ENABLE UNWIND => NULL;
  elem: ArpaQueue.QElem;
  found: Descr;
  [elem, found] ← DescrFromHandle[item];
  IF found # NIL THEN {
    found.Print[out: out, form: short];
    PrintInibition[elem, out, NIL];
    }
  ELSE out.PutRope["No item with that handle.\n"];
  };
-- Finds the queued item whose user handle equals handle.
-- Returns its queue element and descriptor, or [NIL, NIL] when no item has that handle.
-- (Fix: a miss used to clear descr only and still returned the last element enumerated in qe.)
DescrFromHandle: INTERNAL PROC [handle: INT]
  RETURNS [qe: ArpaQueue.QElem, descr: Descr] = {
  -- Enumeration callback: results are written only on a match, then enumeration stops.
  Check: ItemProc = {
    candidate: Descr = DescrFromElemVal[qElem.Value[]];
    IF handle = candidate.GetUserHandle[] THEN {
      qe ← qElem;
      descr ← candidate;
      continue ← FALSE; }; };
  qe ← NIL;
  descr ← NIL;
  EnumerateItems[Check];
  };
-- Monitor-entry wrapper: takes the lock and delegates to PrintQueueInner.
-- name = NIL prints the six top-level bundle queues.
PrintQueue: PUBLIC ENTRY PROC [name: ROPE, out: STREAM] = {
  ENABLE UNWIND => NULL;
  PrintQueueInner[name, out];
  };
-- Prints the queue called name on out.
--   name = NIL: prints Empty, GV, Ex, ArpaEx, ARPA and Sick in turn, by recursion.
--   unknown name: prints "Unknown queue: <name>." and returns.
-- For a per-host sub-queue, the owning bundle's counters are printed first, then every
-- element; elements that are bundles have their own sub-queue printed recursively.
PrintQueueInner: INTERNAL PROC [name: ROPE, out: STREAM] = {
  queue: QUEUE;

  -- Prints the counters of the bundle that owns queue; other bundles are skipped.
  ScanForInfo: ArpaQueue.ElemProc = {
    bundle: RecipBundle ← NARROW[qElem.Value[]];
    IF queue # bundle.queue THEN RETURN [TRUE];
    out.PutF[
      "\tsent: %G, recv: %G, last up: ", IO.int[bundle.sent], IO.int[bundle.recv]];
    IF bundle.lastUp # BasicTime.nullGMT THEN out.Put[IO.time[bundle.lastUp]]
    ELSE out.PutRope["never"];
    out.PutRope["\n"];
    PrintInibition[qElem, out, "\t"];
    RETURN[FALSE]; };

  -- Prints one element. A bundle gets a header and its sub-queue; anything else one line.
  PrintVal: ArpaQueue.ElemProc = {
    value: REF ANY = qElem.Value[];
    SELECT ValTypeOfElem[value] FROM
      RecipBundleT => {
        bundle: RecipBundle = NARROW[value];
        out.PutF[
          "\t%G:\n\t\tsent: %G, recv: %G, last up: ",
          IO.rope[bundle.queue.Name[]], IO.int[bundle.sent], IO.int[bundle.recv]];
        IF bundle.lastUp # BasicTime.nullGMT THEN out.Put[IO.time[bundle.lastUp]]
        ELSE out.PutRope["never"];
        out.PutRope["\n"];
        PrintInibition[qElem, out, "\t\t"];
        bundle.queue.Enumerate[PrintVal]; };
      ENDCASE => {
        out.PutRope["\t\t"];
        PrintElemVal[value, out];
        out.PutChar['\n]; }; };

  IF name = NIL THEN {
    PrintQueueInner[EmptyQueues.Name[], out];
    PrintQueueInner[GVQueue.Name[], out];
    PrintQueueInner[ExpressQueue.Name[], out];
    PrintQueueInner[ArpaExpressQueue.Name[], out];
    PrintQueueInner[HostQueues.Name[], out];
    PrintQueueInner[SickQueues.Name[], out];
    RETURN; };

  queue ← FindQueue[name];
  IF queue = NIL THEN {
    out.PutRope[Rope.Cat["Unknown queue: ", name, ".\n"]];
    RETURN; };

  out.PutRope[Rope.Cat[queue.Name[], ":\n"]];
  -- The owning bundle can sit in any of the six bundle queues, so scan them all.
  EmptyQueues.Enumerate[ScanForInfo];
  GVQueue.Enumerate[ScanForInfo];
  ExpressQueue.Enumerate[ScanForInfo];
  ArpaExpressQueue.Enumerate[ScanForInfo];
  HostQueues.Enumerate[ScanForInfo];
  SickQueues.Enumerate[ScanForInfo];
  queue.Enumerate[PrintVal];
  };
-- Maps a queue name to a queue. The eight top-level queues are tried first, in declaration
-- order; failing that, the per-host bundle of that name. Returns NIL when nothing matches.
-- Names are compared with Rope.Equal and case = FALSE. No bundle is ever created here.
FindQueue: INTERNAL PROC [name: ROPE] RETURNS [queue: QUEUE ← NIL] = {
  Named: PROC [q: QUEUE] RETURNS [BOOL] = {
    RETURN[Rope.Equal[name, q.Name[], FALSE]] };
  bundle: RecipBundle;
  IF Named[EmptyQueues] THEN RETURN[EmptyQueues];
  IF Named[GVQueue] THEN RETURN[GVQueue];
  IF Named[ExpressQueue] THEN RETURN[ExpressQueue];
  IF Named[ArpaExpressQueue] THEN RETURN[ArpaExpressQueue];
  IF Named[HostQueues] THEN RETURN[HostQueues];
  IF Named[SickQueues] THEN RETURN[SickQueues];
  IF Named[BadItemQueue] THEN RETURN[BadItemQueue];
  IF Named[tempQueue] THEN RETURN[tempQueue];
  bundle ← GetBundle[name, FALSE];
  IF bundle # NIL THEN queue ← bundle.queue;
  };
-- Looks up the RecipBundle whose sub-queue is called name (Rope.Equal with case = FALSE).
-- The bundle queues are searched in the order Empty, GV, Ex, ArpaEx, ARPA, Sick.
-- On a miss: returns NIL if create is FALSE, otherwise makes a fresh bundle with zeroed
-- counters and enqueues it on EmptyQueues.
GetBundle: INTERNAL PROC [name: ROPE, create: BOOL] RETURNS [bundle: RecipBundle ← NIL] = {
  -- Enumeration callback: remember the first bundle whose queue name matches, then stop.
  Find: ArpaQueue.ElemProc = {
    candidate: RecipBundle ← NARROW[qElem.Value[]];
    IF Rope.Equal[candidate.queue.Name[], name, FALSE] THEN {
      bundle ← candidate;
      continue ← FALSE; }; };

  EmptyQueues.Enumerate[Find];
  IF bundle # NIL THEN RETURN;
  GVQueue.Enumerate[Find];
  IF bundle # NIL THEN RETURN;
  ExpressQueue.Enumerate[Find];
  IF bundle # NIL THEN RETURN;
  ArpaExpressQueue.Enumerate[Find];
  IF bundle # NIL THEN RETURN;
  HostQueues.Enumerate[Find];
  IF bundle # NIL THEN RETURN;
  SickQueues.Enumerate[Find];
  IF bundle # NIL THEN RETURN;

  IF create THEN {
    bundle ← NEW[RecipBundleRep ← [
      queue: ArpaQueue.Create[name], sent: 0, recv: 0, lastUp: BasicTime.nullGMT]];
    EnqueueAndLog[queue: EmptyQueues, qElem: ArpaQueue.NewElem[bundle]];
    };
  };
-- Writes the Unparse text of a queue element value to out.
-- Raises ERROR UserError for any value that is neither a DeliveryPkg nor a RecipBundle.
PrintElemVal: PROC [value: REF ANY, out: STREAM] = {
  SELECT ValTypeOfElem[value] FROM
    DeliveryPkgT, RecipBundleT => {
      text: ROPE = Unparse[value];
      out.PutRope[text]; };
    ENDCASE =>
      ERROR UserError["Somehow you have tried to print something that isn't a queue."];
  };
-- If qElem is currently inhibited (remaining time above zero), writes one line giving
-- prefix, the remaining time as quotient:remainder of division by 60, and the reason.
-- Writes nothing otherwise.
PrintInibition: PROC [qElem: ArpaQueue.QElem, out: STREAM, prefix: ROPE] = {
  remaining: INT;
  reason: ROPE;
  [remaining, reason] ← qElem.Inhibition[];
  IF remaining <= 0 THEN RETURN;
  out.PutF[
    "%GInhibited for %G:%02G because: %G.\n",
    IO.rope[prefix], IO.int[remaining/60], IO.int[remaining MOD 60], IO.rope[reason]];
  };
-- Renders a queue element value as text:
--   ItemOnly: the descriptor's own Unparse text;
--   DeliveryPkg: "<descr> for <recip>, <recip>, ...";
--   RecipBundle: the name of its sub-queue;
--   anything else: the literal "(Unparse ENDCASE)".
Unparse: PROC [value: REF ANY] RETURNS [ROPE] = {
  SELECT TRUE FROM
    ISTYPE[value, ItemOnly] => RETURN[NARROW[value, ItemOnly]^.Unparse[]];
    ISTYPE[value, DeliveryPkg] => {
      pkg: DeliveryPkg = NARROW[value];
      text: ROPE ← Rope.Cat[pkg.descr.Unparse[], " for "];
      separator: ROPE ← NIL; -- nothing before the first recipient, ", " before the rest
      FOR who: LIST OF ROPE ← pkg.recipList, who.rest UNTIL who = NIL DO
        text ← Rope.Cat[text, separator, who.first];
        separator ← ", ";
        ENDLOOP;
      RETURN[text]; };
    ISTYPE[value, RecipBundle] =>
      RETURN[NARROW[value, RecipBundle].queue.Name[]];
    ENDCASE => RETURN["(Unparse ENDCASE)"];
  };
-- Starts the server: refreshes the name lists once, then forks and detaches the three
-- long-running processes (Grapevine sender, Arpa sender, background housekeeping).
StartServer: PUBLIC PROC = TRUSTED {
  CheckLists[];
  Process.Detach[FORK SendGVMessages[]];
  Process.Detach[FORK SendArpaMessages[]];
  Process.Detach[FORK Background[]];
  };
-- Condition variable with a 30 second timeout. No WAIT or NOTIFY on it appears in the visible part of this module.
newMessageArrived: CONDITION ← [timeout: Process.SecondsToTicks[30]];
-- Deliberately empty ENTRY procedure: calling it only acquires and releases the monitor lock.
-- Hack to hang on the lock before reading a message from the net. Otherwise duplicate
-- messages get through, because they are not "accepted" until after they are on the disk.
StartNewMessage: PUBLIC ENTRY PROC [source: ROPE] = {
  };
-- Accepts a newly received item and queues it for delivery.
--   descr: the item's descriptor.
--   source: name of the host the item came from; its bundle is found or created.
-- Steps: work out the sender / reverse path according to the item's format; for gv items
-- run the authorization and header checks, dropping the item if either fails; convert raw
-- recipients to processed (host, user) pairs; store the item info; queue one delivery
-- package per recipient host; finally bump the source bundle's recv counter.
-- Raises ERROR for an item whose format is neither gv nor arpa.
-- (Fix: the two remarks in the gv arm had lost their comment markers and would not compile.)
AddNewMessage: PUBLIC ENTRY PROC [descr: Descr, source: ROPE] = {
  ENABLE UNWIND => NULL;
  host: RecipBundle ← GetBundle[source, TRUE];
  qElem: ArpaQueue.QElem ← ArpaQueue.NewElem[NEW[Descr ← descr]];
  value: REF ANY ← qElem.Value[]; -- REF Descr handed to the enumeration callbacks as procData
  CheckLists[];
  SELECT descr.GetFormat[] FROM
    gv => {
      from: ROPE ← descr.GetGvSender[];
      -- Fish Sender out of body of msg, but not if ReturnToSender has already set one up.
      -- Beware of loops if you can't return a message to the original sender.
      IF from = NIL THEN {
        arpaReversePath, gvSender: ROPE;
        gvID: ROPE;
        temp: IO.STREAM = descr.RetrieveMsgStream[];
        -- Picks up the postmark as an id for the log and stops at the ReturnTo item.
        RetrieveReturnTo: ArpaSMTPSyntax.GVItemProc = {
          IF itemHeader.type = PostMark THEN {
            timestamp: GVBasics.Timestamp ← GVProtocol.ReceiveTimestamp[itemStream];
            gvID ← GVBasics.RopeFromTimestamp[timestamp]; };
          IF itemHeader.type = ReturnTo THEN {
            gvSender ← ArpaSMTPSyntax.ReceiveRName[itemStream];
            from ← gvSender;
            continue ← FALSE; }; };
        ArpaSMTPSyntax.EnumerateGVItems[temp, RetrieveReturnTo];
        temp.Close[];
        ArpaSMTPSupport.Log[verbose, descr.Unparse[], " is ", gvID, "."];
        ArpaSMTPDescr.SetGvSender[descr, gvSender];
        arpaReversePath ← ArpaSMTPSyntax.ReversePath[gvSender];
        descr.SetArpaReversePath[arpaReversePath]; };
      ArpaSMTPSupport.Log[verbose, descr.Unparse[], " is from ", from, "."];
      IF ~ArpaSMTPSupport.AuthorizationCheck[from]
        OR ~ArpaSMTPSupport.CheckHeader[from, descr] THEN {
        -- Rejected: discard the recipients and let the item be destroyed.
        BEGIN ENABLE ArpaSMTPDescr.WrongState => CONTINUE;
        ArpaSMTPDescr.RemoveRawRecipients[descr];
        END;
        ConsiderItemDestruction[descr];
        RETURN; }; };
    arpa => {
      arpaReversePath: ROPE;
      arpaReversePath ← ArpaSMTPDescr.GetArpaReversePath[descr];
      arpaReversePath ← ArpaSMTPSyntax.UnBlessReturnPath[arpaReversePath];
      IF arpaReversePath = NIL THEN {
        -- No usable reverse path: derive one from the recorded GV sender.
        arpaReversePath ← ArpaSMTPDescr.GetGvSender[descr];
        arpaReversePath ← ArpaSMTPSyntax.UnBlessReturnPath[arpaReversePath];
        arpaReversePath ← ArpaSMTPSyntax.ReversePath[arpaReversePath]; };
      descr.SetArpaReversePath[arpaReversePath];
      ArpaSMTPSupport.Log[verbose, descr.Unparse[], " is from ", arpaReversePath, "."]; };
    ENDCASE => ERROR;
  BEGIN ENABLE ArpaSMTPDescr.WrongState => CONTINUE; --already processed
  redistributed: ROPE; -- built up as "Redistributed: a, b, ..." when any recipient qualifies
  ProcessRawRecipient: ArpaSMTPDescr.RawRecipProc = {
    descr: Descr = NARROW[procData, ItemOnly]^;
    hostName, userName: ROPE;
    [hostName, userName] ← ArpaSMTPSyntax.HostAndUser[rawRecipient];
    IF hostName = NIL THEN {
      -- No host part: the recipient is delivered through Grapevine.
      hostName ← gvHostName;
      IF Rope.Find[userName, "^."] # -1 OR DLWithoutUpArrow[userName] THEN {
        IF redistributed = NIL THEN redistributed ← "Redistributed: "
        ELSE redistributed ← Rope.Cat[redistributed, ", "];
        -- Show old name if used
        IF Rope.Find[rawRecipient, "Xerox.ARPA", 0, FALSE] # -1 THEN
          redistributed ← Rope.Cat[redistributed, rawRecipient]
        ELSE
          redistributed ← Rope.Cat[redistributed, userName]; }; };
    descr.AddProcessedRecipient[hostName, userName, TRUE]; };
  descr.EnumerateRawRecipients[ProcessRawRecipient, value];
  IF redistributed # NIL THEN
    descr.SetPrecedeMsgText[Rope.Cat[redistributed, "\n", descr.GetPrecedeMsgText[]]];
  END;
  descr.StoreItemInfo[]; -- update info on disk
  descr.EnumerateRecipientHosts[QueueForDelivery, value];
  ConsiderItemDestruction[descr]; -- Empty, Crashed after sending to last host
  host.recv ← host.recv + 1;
  host.lastUp ← BasicTime.Now[]; };
-- Host enumeration callback: queues one DeliveryPkg for the recipients at hostName.
--   not Grapevine: sub-queue named after the host, under HostQueues;
--   Grapevine with an Express match: sub-queue named after the match, under ExpressQueue;
--   Grapevine with an ArpaExpress match: likewise under ArpaExpressQueue;
--   otherwise: the Grapevine sub-queue under GVQueue.
-- The matching sender condition is notified for the HostQueues and GVQueue cases.
QueueForDelivery: INTERNAL ArpaSMTPDescr.HostProc = {
  descr: Descr = NARROW[procData, ItemOnly]^;
  mainQueue, hostQueue: QUEUE;
  queueName: ROPE;
  gv: BOOL = Rope.Equal[hostName, gvHostName];
  IF ~gv THEN {
    queueName ← hostName;
    mainQueue ← HostQueues;
    NOTIFY newArpaMessage;
    }
  ELSE {
    queueName ← Express[userNames];
    IF queueName # NIL THEN mainQueue ← ExpressQueue
    ELSE {
      queueName ← ArpaExpress[userNames];
      IF queueName # NIL THEN mainQueue ← ArpaExpressQueue
      ELSE {
        queueName ← gvHostName;
        mainQueue ← GVQueue;
        NOTIFY newGvMessage;
        };
      };
    };
  hostQueue ← GetQueueForNewMessage[mainQueue, queueName];
  EnqueueAndLog[
    queue: hostQueue,
    qElem: ArpaQueue.NewElem[NEW[DeliveryPkgRep ← [descr, userNames]]]];
  };
-- Notified when work is added to HostQueues; GetProcessableHost waits on it. The 30 second timeout makes the waiter re-poll.
newArpaMessage: CONDITION ← [timeout: Process.SecondsToTicks[30]];
-- Blocks until HostQueues has a processable element, moves that element to the end of
-- HostQueues and returns it. Waits on newArpaMessage between polls.
GetProcessableHost: ENTRY PROC RETURNS [host: ArpaQueue.QElem] = {
  ENABLE UNWIND => NULL;
  UNTIL (host ← HostQueues.GetProcessableElement[]) # NIL DO
    WAIT newArpaMessage;
    ENDLOOP;
  MoveHostToEnd[HostQueues, host];
  };
-- Endless loop of the Arpa sender process: take the next processable host and try to send
-- its mail. A host is moved to the sick queue only when the attempt made no progress, the
-- host is not inhibited, and its queue still holds items.
SendArpaMessages: PROC = {
  DO
    host: ArpaQueue.QElem ← GetProcessableHost[];
    bundle: RecipBundle ← NARROW[host.Value[]];
    hostQueue: QUEUE ← bundle.queue;
    -- An empty queue after no progress means a bogus host name, which is not a sick host.
    IF ~ProcessArpaHost[host]
      AND ~(host.Inhibition[].for > 0)
      AND ~hostQueue.IsEmpty[] THEN
      MoveToSick[HostQueues, host, "Didn't make any progress"];
    ENDLOOP;
  };
-- Opens a connection to one host and sends its processable items, for at most about five
-- minutes per call. Returns progress = TRUE iff at least one item was handed off.
-- If Open fails with returnToSender, every queued item is returned to its sender.
-- If SendArpaMsg asks to stop, the connection is closed with Close[TRUE] and the procedure
-- returns immediately; on every other path an open connection gets Close[FALSE].
-- (Fix: two remarks in the Open failure handler had lost their comment markers.)
ProcessArpaHost: PROC [host: ArpaQueue.QElem] RETURNS [progress: BOOL] = {
  hostStream: ArpaSMTPSend.Connection;
  recipients: RecipBundle ← NARROW[host.Value[]];
  hostQueue: QUEUE ← recipients.queue;
  startTime: BasicTime.GMT ← BasicTime.Now[];
  BEGIN -- so hostStream and hostQueue will be in scope for EXITS
  msg: ArpaQueue.QElem;
  progress ← FALSE;
  IF hostQueue.IsEmpty[] THEN GOTO FlushHostQueue;
  hostStream ← ArpaSMTPSend.Open[hostQueue.Name[] ! ArpaSMTPSend.Failed => {
    SELECT withItem FROM
      irrelevant, retryLater => {
        IF problemWithHost THEN MoveToSick[HostQueues, host, reason];
        GOTO Done; };
      returnToSender => {
        -- there must be a problem with this Host (no item was given)
        -- on Open probably means the host name is unknown; all items should be returned
        -- Reject the whole list...
        UNTIL hostQueue.IsEmpty[] DO
          qElem: ArpaQueue.QElem = hostQueue.GetHead[];
          pkg: DeliveryPkg = NARROW[qElem.Value[]];
          descr: Descr = pkg.descr;
          IF debug THEN SIGNAL LookAtThis;
          ArpaSMTPSupport.NotifySender[descr, reason];
          FinishedWithMsg[queue: hostQueue, pkg: pkg];
          ENDLOOP;
        IF recipients.recv # 0 THEN GOTO FlushHostQueue; -- Foo@Concord.ms (yetch)
        FlushThisHost[queue: HostQueues, value: host.Value[]];
        GOTO Done; };
      ENDCASE => ERROR; }]; -- putOnBadQueue makes no sense for Open
  UNTIL (msg ← GetProcessableElement[hostQueue]) = NIL DO
    -- Time box: do not let one host monopolize the sender.
    IF BasicTime.Period[from: startTime, to: BasicTime.Now[]] > 5*60 THEN {
      ArpaSMTPSupport.Log[important,
        "Already spent more than 5 minutes sending mail to " , Unparse[host.Value[]], "."];
      EXIT; };
    IF ~SendArpaMsg[hostStream, host, msg] THEN { hostStream.Close[TRUE]; RETURN; };
    recipients.sent ← recipients.sent + 1;
    recipients.lastUp ← BasicTime.Now[];
    progress ← TRUE;
    ENDLOOP;
  GOTO FlushHostQueue;
  EXITS
    FlushHostQueue => {
      IF hostQueue.IsEmpty[] THEN MoveToEmpty[from: HostQueues, host: host]; };
    Done => NULL;
  END;
  IF hostStream # NIL THEN hostStream.Close[FALSE]; };
-- Raised, only when debug is TRUE, just before an item is returned to its sender.
LookAtThis: SIGNAL = CODE;
-- Sends one delivery package over an open connection and disposes of it by outcome.
--   stream: connection to the host; host: the host's element in HostQueues;
--   msg: the element of the host's sub-queue to send.
-- Returns continue = FALSE only when the failure reported a problem with the host,
-- in which case the host has already been moved to the sick queue.
-- Success or returnToSender: the item is finished and removed. retryLater: the item stays
-- queued, inhibited unless the host itself is at fault. putOnBadQueue: moved to the bad queue.
SendArpaMsg:
PROC [stream: ArpaSMTPSend.Connection, host, msg: ArpaQueue.QElem]
RETURNS [continue:
BOOL ←
TRUE] = {
recipients: RecipBundle ← NARROW[host.Value[]];
hostQueue: QUEUE ← recipients.queue;
pkg: DeliveryPkg = NARROW[msg.Value[]];
descr: Descr = pkg.descr;
BEGIN
-- so the above vars will be in scope for EXITS
ArpaSMTPSend.SendItem[descr, pkg.recipList, stream ! ArpaSMTPSend.Failed => {
-- The host-level verdict is recorded first; the item-level verdict then picks the exit.
IF problemWithHost
THEN {
MoveToSick[HostQueues, host, reason];
continue ← FALSE; };-- stop the enumeration of SMTP items (stream may be aborted)
SELECT withItem
FROM
retryLater => {
IF ~problemWithHost
THEN
msg.Inhibit[for: ArpaSMTPControl.defaultInhibitTime, why: reason];
GOTO Done; };
returnToSender => {
IF debug THEN SIGNAL LookAtThis;
ArpaSMTPSupport.NotifySender[descr, reason];
GOTO RemoveItem; };
putOnBadQueue => {
MoveToBad[queue: hostQueue, msg: msg];
GOTO Done; };
ENDCASE => ERROR; -- shouldn't be irrelevant
}];
-- Reached only when SendItem returned without raising Failed.
GOTO RemoveItem;
EXITS
RemoveItem => {
FinishedWithMsg[queue: hostQueue, pkg: pkg]; };
Done => NULL;
END; };
-- Notified when work is added to GVQueue; GetGvClump waits on it. The 30 second timeout makes the waiter re-poll.
newGvMessage: CONDITION ← [timeout: Process.SecondsToTicks[30]];
-- Blocks until GVQueue has a processable element and returns it.
-- Waits on newGvMessage between polls.
GetGvClump: ENTRY PROC RETURNS [host: ArpaQueue.QElem] = {
  ENABLE UNWIND => NULL;
  UNTIL (host ← GVQueue.GetProcessableElement[]) # NIL DO
    WAIT newGvMessage;
    ENDLOOP;
  };
-- Endless loop of the Grapevine sender process. After an attempt that made no progress,
-- the bundle is moved to the sick queue unless it is currently inhibited.
SendGVMessages: PROC = {
  DO
    host: ArpaQueue.QElem ← GetGvClump[];
    IF ~ProcessGVQueue[host] AND ~(host.Inhibition[].for > 0) THEN
      MoveToSick[GVQueue, host, "Didn't make any progress"];
    ENDLOOP;
  };
-- Single Grapevine connection, opened when the module is initialized and passed to every ArpaSMTPGVSend.SendItem call.
gvHandle: ArpaSMTPGVSend.Connection ← ArpaSMTPGVSend.Open[]; -- Only need one
-- Sends every processable item of one Grapevine bundle.
-- Returns progress = TRUE iff at least one item was handed off. Returns early, without the
-- empty-queue check, as soon as SendGVMsg asks to stop.
ProcessGVQueue: PROC [host: ArpaQueue.QElem] RETURNS [progress: BOOL] = {
  bundle: RecipBundle ← NARROW[host.Value[]];
  hostQueue: QUEUE ← bundle.queue;
  msg: ArpaQueue.QElem;
  progress ← FALSE;
  DO
    msg ← GetProcessableElement[hostQueue];
    IF msg = NIL THEN EXIT;
    IF ~SendGVMsg[host, msg] THEN RETURN;
    bundle.sent ← bundle.sent + 1;
    bundle.lastUp ← BasicTime.Now[];
    progress ← TRUE;
    ENDLOOP;
  IF hostQueue.IsEmpty[] THEN MoveToEmpty[from: GVQueue, host: host];
  };
-- Sends one delivery package through the shared Grapevine connection and disposes of it.
--   host: the bundle's element in GVQueue; msg: the element of its sub-queue to send.
-- Returns progress = FALSE only when the failure reported a problem with Grapevine,
-- in which case the bundle has already been moved to the sick queue.
-- Success or returnToSender: the item is finished and removed. retryLater: the item stays
-- queued, inhibited unless Grapevine itself is at fault. putOnBadQueue: moved to the bad queue.
SendGVMsg:
PROC [host, msg: ArpaQueue.QElem]
RETURNS [progress:
BOOL ←
TRUE] = {
recipients: RecipBundle ← NARROW[host.Value[]];
hostQueue: QUEUE ← recipients.queue;
pkg: DeliveryPkg = NARROW[msg.Value[]];
descr: Descr = pkg.descr;
BEGIN
-- so the above vars will be in scope for EXITS
ArpaSMTPGVSend.SendItem[descr, pkg.recipList, gvHandle
! ArpaSMTPGVSend.Failed => {
-- The Grapevine-level verdict is recorded first; the item-level verdict then picks the exit.
IF problemWithGV
THEN {
MoveToSick[GVQueue, host, reason];
progress ← FALSE; }; -- stop the enumeration of GV item
SELECT withItem
FROM
retryLater => {
IF ~problemWithGV
THEN
msg.Inhibit[for: ArpaSMTPControl.defaultInhibitTime, why: reason];
GOTO Done; };
returnToSender => {
IF debug THEN SIGNAL LookAtThis;
ArpaSMTPSupport.NotifySender[descr, reason];
GOTO RemoveItem; };
putOnBadQueue => {
MoveToBad[queue: hostQueue, msg: msg];
GOTO Done; };
ENDCASE => ERROR; -- shouldn't be "irrelevant"
}];
-- Reached only when SendItem returned without raising Failed.
GOTO RemoveItem;
EXITS
RemoveItem => {
FinishedWithMsg[queue: hostQueue, pkg: pkg]; };
Done => NULL;
END; };
-- Housekeeping process body, forked by StartServer; never returns.
-- About once a minute it refreshes the name lists, maybe logs statistics, and considers
-- express mail, old messages and sick hosts. It holds the monitor lock except while
-- inside a WAIT.
-- (Fix: dropped the local "now", which was initialized but never read.)
Background: ENTRY PROC = {
  ENABLE UNWIND => NULL;
  snooz: CONDITION ← [timeout: Process.SecondsToTicks[1*60]];
  DO
    WAIT snooz; -- nothing notifies this condition, so the wait lasts for the timeout
    CheckLists[];
    MaybePrintStats[];
    ConsiderExpressMail[];
    ConsiderOldMessages[];
    ConsiderSickHosts[];
    ENDLOOP;
  };
-- Info for lists
-- Each list is re-read at most once per checkInterval. The "last checked" times start one
-- interval in the past so that the first CheckLists call fetches every list.
-- NOTE(review): several identifiers in this section were mis-encoded in the source text.
-- noUpArrowDLs is confirmed by its use in DLWithoutUpArrow. The names
-- timeNoUpArrowDLLastChecked, noUpArrowStamp and GetNoUpArrowDList, the group name
-- "NoUpArrowDLs^.MS" and the log text "Updated No Up Arrow DLs." are reconstructions.
-- Confirm them against an uncorrupted copy, and check for uses later in the file.
timeExpressLastChecked, timeArpaExpressLastChecked, timeNotExpressLastChecked, timeNoUpArrowDLLastChecked: BasicTime.GMT ← BasicTime.Update[BasicTime.Now[], - checkInterval];
expressStamp, arpaExpressStamp, notExpressStamp, noUpArrowStamp: GVBasics.Timestamp ← GVBasics.oldestTime;
checkInterval: INT = 2*60*60; -- 2 hours

-- Refreshes each name list whose last check is more than checkInterval seconds old.
CheckLists: PROC = {
  now: BasicTime.GMT ← BasicTime.Now[];
  IF BasicTime.Period[from: timeExpressLastChecked, to: now] > checkInterval THEN GetExpressList[];
  IF BasicTime.Period[from: timeArpaExpressLastChecked, to: now] > checkInterval THEN GetArpaExpressList[];
  IF BasicTime.Period[from: timeNotExpressLastChecked, to: now] > checkInterval THEN GetNotExpressList[];
  IF BasicTime.Period[from: timeNoUpArrowDLLastChecked, to: now] > checkInterval THEN GetNoUpArrowDList[];
  };

-- Re-reads the members of "ExpressMail^.MS" into expressMail.
-- noChange only records the check time; group also replaces the list and stamp and logs it.
-- Any other reply leaves everything untouched, so the next CheckLists call tries again.
GetExpressList: PROC = TRUSTED {
  memberInfo: GVNames.MemberInfo ← [noChange[]];
  memberInfo ← GVNames.GetMembers["ExpressMail^.MS", expressStamp];
  WITH m: memberInfo SELECT FROM
    noChange => timeExpressLastChecked ← BasicTime.Now[];
    group => {
      timeExpressLastChecked ← BasicTime.Now[];
      expressMail ← m.members;
      expressStamp ← m.stamp;
      ArpaSMTPSupport.Log[important, "Updated Express List."]; };
    ENDCASE => NULL; };

-- Same as GetExpressList, for "ArpaExpressMail^.MS" and arpaExpressMail.
GetArpaExpressList: PROC = TRUSTED {
  memberInfo: GVNames.MemberInfo ← [noChange[]];
  memberInfo ← GVNames.GetMembers["ArpaExpressMail^.MS", arpaExpressStamp];
  WITH m: memberInfo SELECT FROM
    noChange => timeArpaExpressLastChecked ← BasicTime.Now[];
    group => {
      timeArpaExpressLastChecked ← BasicTime.Now[];
      arpaExpressMail ← m.members;
      arpaExpressStamp ← m.stamp;
      ArpaSMTPSupport.Log[important, "Updated Arpa Express List."]; };
    ENDCASE => NULL; };

-- Same as GetExpressList, for "NotArpaExpressMail^.MS" and notExpressMail.
GetNotExpressList: PROC = TRUSTED {
  memberInfo: GVNames.MemberInfo ← [noChange[]];
  memberInfo ← GVNames.GetMembers["NotArpaExpressMail^.MS", notExpressStamp];
  WITH m: memberInfo SELECT FROM
    noChange => timeNotExpressLastChecked ← BasicTime.Now[];
    group => {
      timeNotExpressLastChecked ← BasicTime.Now[];
      notExpressMail ← m.members;
      notExpressStamp ← m.stamp;
      ArpaSMTPSupport.Log[important, "Updated Not Express List."]; };
    ENDCASE => NULL; };

-- Same as GetExpressList, for the list consulted by DLWithoutUpArrow.
-- The group name and the log text below are reconstructed; see the note at the top.
GetNoUpArrowDList: PROC = TRUSTED {
  memberInfo: GVNames.MemberInfo ← [noChange[]];
  memberInfo ← GVNames.GetMembers["NoUpArrowDLs^.MS", noUpArrowStamp];
  WITH m: memberInfo SELECT FROM
    noChange => timeNoUpArrowDLLastChecked ← BasicTime.Now[];
    group => {
      timeNoUpArrowDLLastChecked ← BasicTime.Now[];
      noUpArrowDLs ← m.members;
      noUpArrowStamp ← m.stamp;
      ArpaSMTPSupport.Log[important, "Updated No Up Arrow DLs."]; };
    ENDCASE => NULL; };
-- Snapshots of the senders' running totals, taken by MaybePrintStats each time it reports.
-- The next report computes its rate from the difference against these values.
totalGvMsgsSentLastHour: INT ← 0;
totalGvBytesSentLastHour: INT ← 0;
totalArpaMsgsSentLastHour: INT ← 0;
totalArpaBytesSentLastHour: INT ← 0;
totalGvMsgsSentLastDay: INT ← 0;
totalGvBytesSentLastDay: INT ← 0;
totalArpaMsgsSentLastDay: INT ← 0;
totalArpaBytesSentLastDay: INT ← 0;
-- Times of the most recent hourly and daily reports; both start at module initialization.
hourStatsLastChecked: BasicTime.GMT ← BasicTime.Now[];
dayStatsLastChecked: BasicTime.GMT ← BasicTime.Now[];
-- Logs traffic statistics: an hourly block once at least an hour has passed since the last
-- one, and a daily block (plus totals since reboot) once at least a day has passed.
-- Rates are scaled to exactly one hour or one day from the actual elapsed time.
-- Fixes: eight local initializers were mis-encoded and are restored as "INT ← 0"; the hour
-- guard now uses hourSecs instead of a second literal 3600; the date rope is only built
-- when a report is written; the repeated rate formula lives in the local Rate procedure.
MaybePrintStats: INTERNAL PROC = {
  hourSecs: INT = 3600;
  daySecs: INT = 86400;
  -- Scales a count accumulated over elapsedSecs to a period of unitSecs, rounded.
  Rate: PROC [delta: INT, unitSecs: INT, elapsedSecs: INT] RETURNS [INT] = {
    RETURN[Real.InlineRound[(delta*1.0*unitSecs)/elapsedSecs]]; };
  now: BasicTime.GMT ← BasicTime.Now[];
  hourPeriod: INT ← BasicTime.Period[from: hourStatsLastChecked, to: now];
  dayPeriod: INT ← BasicTime.Period[from: dayStatsLastChecked, to: now];
  totalGvMsgsSentInHour: INT ← 0;
  totalArpaMsgsSentInHour: INT ← 0;
  totalGvMsgsSentInDay: INT ← 0;
  totalArpaMsgsSentInDay: INT ← 0;
  totalGvBytesSentInHour: INT ← 0;
  totalArpaBytesSentInHour: INT ← 0;
  totalGvBytesSentInDay: INT ← 0;
  totalArpaBytesSentInDay: INT ← 0;

  IF hourPeriod < hourSecs THEN RETURN;
  hourStatsLastChecked ← now;
  ArpaSMTPSupport.Log[important, "Date: ",
    Convert.RopeFromTime[from: now, start: years, end: days, includeDayOfWeek: TRUE],
    ", Hourly Statistics: "];

  totalGvMsgsSentInHour ← Rate[ArpaSMTPGVSend.totalGvMsgsSent - totalGvMsgsSentLastHour, hourSecs, hourPeriod];
  totalGvMsgsSentLastHour ← ArpaSMTPGVSend.totalGvMsgsSent;
  totalGvBytesSentInHour ← Rate[ArpaSMTPGVSend.totalGvBytesSent - totalGvBytesSentLastHour, hourSecs, hourPeriod];
  totalGvBytesSentLastHour ← ArpaSMTPGVSend.totalGvBytesSent;
  ArpaSMTPSupport.Log[important, " Hourly GV bound rate: ", Convert.RopeFromInt[totalGvMsgsSentInHour], " messages, ", Convert.RopeFromInt[totalGvBytesSentInHour], " bytes."];

  totalArpaMsgsSentInHour ← Rate[ArpaSMTPSend.totalArpaMsgsSent - totalArpaMsgsSentLastHour, hourSecs, hourPeriod];
  totalArpaMsgsSentLastHour ← ArpaSMTPSend.totalArpaMsgsSent;
  totalArpaBytesSentInHour ← Rate[ArpaSMTPSend.totalArpaBytesSent - totalArpaBytesSentLastHour, hourSecs, hourPeriod];
  totalArpaBytesSentLastHour ← ArpaSMTPSend.totalArpaBytesSent;
  ArpaSMTPSupport.Log[important, " Hourly Arpa bound rate: ", Convert.RopeFromInt[totalArpaMsgsSentInHour], " messages, ", Convert.RopeFromInt[totalArpaBytesSentInHour], " bytes."];

  -- Current queue sizes, looked up by queue name.
  ArpaSMTPSupport.Log[important, " Arpa queue hosts ", Convert.RopeFromInt[CountQueueInternal["Arpa"]], ", messages ", Convert.RopeFromInt[CountMessagesInternal["Arpa"]]];
  ArpaSMTPSupport.Log[important, " Sick queue hosts ", Convert.RopeFromInt[CountQueueInternal["Sick"]], ", messages ", Convert.RopeFromInt[CountMessagesInternal["Sick"]]];
  ArpaSMTPSupport.Log[important, " Express queue hosts ", Convert.RopeFromInt[CountQueueInternal["Ex"]], ", messages ", Convert.RopeFromInt[CountMessagesInternal["Ex"]]];
  ArpaSMTPSupport.Log[important, " GV queue messages ", Convert.RopeFromInt[CountMessagesInternal["GV"]]];

  IF dayPeriod < daySecs THEN RETURN;
  dayStatsLastChecked ← now;

  totalGvMsgsSentInDay ← Rate[ArpaSMTPGVSend.totalGvMsgsSent - totalGvMsgsSentLastDay, daySecs, dayPeriod];
  totalGvMsgsSentLastDay ← ArpaSMTPGVSend.totalGvMsgsSent;
  totalGvBytesSentInDay ← Rate[ArpaSMTPGVSend.totalGvBytesSent - totalGvBytesSentLastDay, daySecs, dayPeriod];
  totalGvBytesSentLastDay ← ArpaSMTPGVSend.totalGvBytesSent;
  ArpaSMTPSupport.Log[important, " Daily GV bound rate: ", Convert.RopeFromInt[totalGvMsgsSentInDay], " messages, ", Convert.RopeFromInt[totalGvBytesSentInDay], " bytes."];

  totalArpaMsgsSentInDay ← Rate[ArpaSMTPSend.totalArpaMsgsSent - totalArpaMsgsSentLastDay, daySecs, dayPeriod];
  totalArpaMsgsSentLastDay ← ArpaSMTPSend.totalArpaMsgsSent;
  totalArpaBytesSentInDay ← Rate[ArpaSMTPSend.totalArpaBytesSent - totalArpaBytesSentLastDay, daySecs, dayPeriod];
  totalArpaBytesSentLastDay ← ArpaSMTPSend.totalArpaBytesSent;
  ArpaSMTPSupport.Log[important, " Daily Arpa bound rate: ", Convert.RopeFromInt[totalArpaMsgsSentInDay], " messages, ", Convert.RopeFromInt[totalArpaBytesSentInDay], " bytes."];

  ArpaSMTPSupport.Log[important, " Total Arpa bound since reboot: ", Convert.RopeFromInt[ArpaSMTPSend.totalArpaMsgsSent], " messages, ", Convert.RopeFromInt[ArpaSMTPSend.totalArpaBytesSent], " bytes."];
  ArpaSMTPSupport.Log[important, " Total GV bound since reboot: ", Convert.RopeFromInt[ArpaSMTPGVSend.totalGvMsgsSent], " messages, ", Convert.RopeFromInt[ArpaSMTPGVSend.totalGvBytesSent], " bytes."];
  ArpaSMTPSupport.Log[important, " Totals since reboot: ", Convert.RopeFromInt[ArpaSMTPSend.totalArpaMsgsSent+ArpaSMTPGVSend.totalGvMsgsSent], " messages, ", Convert.RopeFromInt[ArpaSMTPSend.totalArpaBytesSent+ArpaSMTPGVSend.totalGvBytesSent], " bytes."];
  };
-- Base number of minutes ConsiderExpressMail dallies after releasing one express item; it adds more depending on the hour.
expressDelay: INT ← 1; -- 1 minute default delay
-- Releases at most one held-back express item into the ordinary Grapevine queue, then
-- dallies for a few minutes so that Grapevine is not swamped.
-- Does nothing when express handling is switched off, when GVQueue still has work,
-- on Monday to Friday between 08:00 and 19:59, or during hour 23 on any day.
-- ExpressQueue is tried before ArpaExpressQueue.
-- (Fix: the remark before the delay heuristic had lost its comment marker.)
ConsiderExpressMail: INTERNAL PROC = {
  now: BasicTime.Unpacked ← BasicTime.Unpack[BasicTime.Now[]];
  host: ArpaQueue.QElem;
  recipients: RecipBundle;
  hostQueue, from, gv: QUEUE;
  msg: ArpaQueue.QElem;
  minutes: INT;
  IF ~expressOK THEN RETURN;
  IF ~GVQueue.IsEmpty[] THEN RETURN;
  IF now.weekday IN [Monday..Friday] AND now.hour IN [8..12+8) THEN RETURN;
  IF now.hour = 12+11 THEN RETURN; -- Archiver and RegPurger
  host ← ExpressQueue.GetProcessableElement[];
  IF host # NIL THEN {
    from ← ExpressQueue;
    MoveHostToEnd[ExpressQueue, host]; }
  ELSE {
    host ← ArpaExpressQueue.GetProcessableElement[];
    IF host = NIL THEN RETURN;
    from ← ArpaExpressQueue;
    MoveHostToEnd[ArpaExpressQueue, host]; };
  recipients ← NARROW[host.Value[]];
  hostQueue ← recipients.queue;
  msg ← hostQueue.GetProcessableElement[];
  -- NOTE(review): MoveToEmpty is not visible in this part of the file. The senders call it
  -- without holding the monitor, while this procedure is INTERNAL. If MoveToEmpty is an
  -- ENTRY procedure, this call would try to take the lock again. The msg = NIL test just
  -- below already handles the empty case through UnlockedMoveToEmpty. Confirm, and drop
  -- this line if it is redundant.
  IF hostQueue.IsEmpty[] THEN MoveToEmpty[from: from, host: host];
  IF msg = NIL THEN { UnlockedMoveToEmpty[from: from, host: host]; RETURN; };
  gv ← GetQueueForNewMessage[GVQueue, gvHostName];
  MoveAndLog[from: hostQueue, to: gv, qElem: msg];
  IF hostQueue.IsEmpty[] THEN UnlockedMoveToEmpty[from: from, host: host];
  NOTIFY newGvMessage;
  -- This heuristic is a hack. For easy messages it's too slow. For nasty ones, GV still gets swamped.
  SELECT now.hour FROM
    12+8 => minutes ← expressDelay + 4; -- 12 messages first hour
    12+9 => minutes ← expressDelay + 3; -- 15 messages next hour
    12+10 => minutes ← expressDelay + 2; -- 20 messages next hour
    ENDCASE => minutes ← expressDelay+1; -- 30 messages/hour => 197 msgs in 8 hrs
  IF ~(now.weekday IN[Monday..Friday]) THEN minutes ← expressDelay;
  DallyAWhile[minutes]; }; -- (At least 1) Extra minute in main loop
-- DallyAWhile: wait on a private condition variable whose timeout is the given number of minutes.
-- Nothing in this procedure NOTIFYs the condition.
DallyAWhile: INTERNAL PROC [minutes: INT] = {
  seconds: INT = minutes*60;
  snooze: CONDITION ← [timeout: Process.SecondsToTicks[seconds]];
  WAIT snooze;
  };
-- ConsiderSickHosts: when HostQueues is empty, take one processable bundle off SickQueues and
-- put it back on its working queue (GVQueue for the Grapevine bundle, HostQueues otherwise),
-- waking the corresponding sender.
ConsiderSickHosts: INTERNAL PROC = {
  host: ArpaQueue.QElem ← SickQueues.GetProcessableElement[];
  IF ~HostQueues.IsEmpty[] OR host = NIL THEN RETURN;
  IF Rope.Equal[NARROW[host.Value[], RecipBundle].queue.Name[], gvHostName]
    THEN {
      MoveAndLog[from: SickQueues, to: GVQueue, qElem: host];
      NOTIFY newGvMessage; }
    ELSE {
      MoveAndLog[from: SickQueues, to: HostQueues, qElem: host];
      NOTIFY newArpaMessage; };
  };
-- MoveAllSickHosts: uninhibit every bundle on SickQueues, then move each processable bundle
-- back to its working queue via MoveSickHostQueue until none remains.
MoveAllSickHosts: PUBLIC ENTRY PROC = {
  ENABLE UNWIND => NULL;
  Uninhibit: ArpaQueue.ElemProc = {
    ArpaQueue.Uninhibit[qElem, SickQueues];
    };
  host: ArpaQueue.QElem;
  SickQueues.Enumerate[Uninhibit];
  host ← SickQueues.GetProcessableElement[];
  WHILE host # NIL DO
    MoveSickHostQueue[host]; -- removes host from SickQueues, so the loop makes progress
    host ← SickQueues.GetProcessableElement[];
    ENDLOOP;
  }; -- closes the procedure body (the terminator was missing)
-- MoveSickHostQueue: move one bundle from SickQueues to GVQueue (if it is the Grapevine
-- bundle) or to HostQueues (otherwise) and wake the matching sender.  A NIL host is ignored.
MoveSickHostQueue: INTERNAL PROC [host: ArpaQueue.QElem] = {
  IF host = NIL THEN RETURN;
  IF Rope.Equal[NARROW[host.Value[], RecipBundle].queue.Name[], gvHostName]
    THEN {
      MoveAndLog[from: SickQueues, to: GVQueue, qElem: host];
      NOTIFY newGvMessage; }
    ELSE {
      MoveAndLog[from: SickQueues, to: HostQueues, qElem: host];
      NOTIFY newArpaMessage; };
  };
-- FindSickHost: look on SickQueues for the bundle whose queue name matches host
-- (Rope.Equal with case: FALSE).  The match, if any, is uninhibited and returned;
-- the result is NIL when host is NIL or no bundle matches.
FindSickHost: INTERNAL PROC [host: Rope.ROPE] RETURNS [element: ArpaQueue.QElem ← NIL] = {
  Find: ArpaQueue.ElemProc = {
    msgs: RecipBundle ← NARROW[qElem.Value[]];
    IF Rope.Equal[msgs.queue.Name[], host, FALSE]
      THEN {
        ArpaQueue.Uninhibit[qElem, SickQueues];
        element ← qElem;
        continue ← FALSE; }; }; -- stop at the first match
  IF host # NIL THEN SickQueues.Enumerate[Find];
  }; -- closes the procedure body (the terminator was missing)
-- MoveSickHost: find the named bundle on SickQueues and move it to HostQueues, waking the
-- Arpa sender.  Nothing happens if no bundle of that name is found.
-- NOTE(review): unlike MoveSickHostQueue this always targets HostQueues, even when the name
-- is the Grapevine host name -- confirm that is intended.
MoveSickHost: PUBLIC ENTRY PROC [name: Rope.ROPE ← NIL] = {
  ENABLE UNWIND => NULL;
  host: ArpaQueue.QElem ← FindSickHost[name];
  IF host = NIL THEN RETURN;
  MoveAndLog[from: SickQueues, to: HostQueues, qElem: host];
  NOTIFY newArpaMessage;
  };
-- CountQueue: monitor-locked wrapper around CountQueueInternal.
-- The result declaration previously read `INT' followed by a mis-encoded character;
-- it is restored here to the initializer `← 0'.
CountQueue: PUBLIC ENTRY PROC [name: Rope.ROPE ← NIL] RETURNS [n: INT ← 0] = {
  ENABLE UNWIND => NULL;
  n ← CountQueueInternal[name];
  };
-- CountQueueInternal: number of elements directly on the queue that FindQueue returns for
-- name.  Returns -1 when name is NIL.
-- The result declaration previously read `INT' followed by a mis-encoded character;
-- it is restored here to the initializer `← 0', which the counting below relies on.
CountQueueInternal: INTERNAL PROC [name: Rope.ROPE ← NIL] RETURNS [n: INT ← 0] = {
  Count: ArpaQueue.ElemProc = {
    n ← n+1};
  IF name = NIL THEN RETURN[-1];
  FindQueue[name].Enumerate[Count];
  };
-- CountMessages: monitor-locked wrapper around CountMessagesInternal.
-- The result declaration previously read `INT' followed by a mis-encoded character;
-- it is restored here to the initializer `← 0'.
CountMessages: PUBLIC ENTRY PROC [queueName: Rope.ROPE ← NIL] RETURNS [n: INT ← 0] = {
  ENABLE UNWIND => NULL;
  n ← CountMessagesInternal[queueName];
  };
-- CountMessagesInternal: number of messages reachable from the named queue.
-- For the queues of RecipBundles handled below, the elements of every bundle's inner queue
-- are counted; any other name is handed to CountQueueInternal.  Returns -1 when name is NIL.
-- Names are compared with Rope.Equal, case: FALSE.
-- The result declaration previously read `INT' followed by a mis-encoded character;
-- it is restored here to the initializer `← 0', which the counting below relies on.
CountMessagesInternal: INTERNAL PROC [name: Rope.ROPE ← NIL] RETURNS [n: INT ← 0] = {
  Count: ArpaQueue.ElemProc = {
    n ← n+1};
  CountNested: ArpaQueue.ElemProc = {
    msgs: RecipBundle ← NARROW[qElem.Value[]];
    msgs.queue.Enumerate[Count];
    };
  IF name = NIL THEN RETURN[-1];
  SELECT TRUE FROM
    Rope.Equal[name, HostQueues.Name[], FALSE] => HostQueues.Enumerate[CountNested];
    Rope.Equal[name, ExpressQueue.Name[], FALSE] => ExpressQueue.Enumerate[CountNested];
    -- ArpaExpressQueue also holds RecipBundles; without this arm its hosts were counted
    -- instead of its messages.
    Rope.Equal[name, ArpaExpressQueue.Name[], FALSE] => ArpaExpressQueue.Enumerate[CountNested];
    Rope.Equal[name, SickQueues.Name[], FALSE] => SickQueues.Enumerate[CountNested];
    Rope.Equal[name, GVQueue.Name[], FALSE] => GVQueue.Enumerate[CountNested];
    ENDCASE => n ← CountQueueInternal[name];
  };
-- ConsiderOldMessages: when both GVQueue and HostQueues are empty, scan the packages held on
-- sick host queues and return the first one found whose expiry date has passed to its sender.
ConsiderOldMessages: INTERNAL PROC = {
  now: BasicTime.GMT ← BasicTime.Now[];
  -- NOTE(review): host is never used below; the fetch is kept because any side effect of
  -- GetProcessableElement is not visible from here -- confirm and delete if it has none.
  host: ArpaQueue.QElem ← SickQueues.GetProcessableElement[];
  ConsiderExpiration: INTERNAL ItemProc = {
    pkg: DeliveryPkg = NARROW[qElem.Value[]];
    expiryDate: BasicTime.GMT ← ArpaSMTPDescr.GetExpiryDate[pkg.descr];
    IF BasicTime.Period[from: now, to: expiryDate] < 0
      THEN {
        -- Expired.  SendItBack composes the explanation for the sender itself, so no text
        -- is built here (the copy formerly assembled at this point was never used).
        MoveAndLog[from: queue, to: tempQueue, qElem: qElem];
        SendItBack[queue: queue, qElem: qElem];
        RETURN[FALSE]; }; }; -- stop the enumeration after returning one message
  IF ~GVQueue.IsEmpty[] THEN RETURN;
  IF ~HostQueues.IsEmpty[] THEN RETURN;
  EnumerateSickPkgs[ConsiderExpiration]; };
-- SendItBack: notify the sender that the package could not be delivered to its recipients at
-- the host named by queue, then move the element from tempQueue back to queue and finish
-- with it.  The caller is expected to have placed qElem on tempQueue.
SendItBack: INTERNAL PROC [queue: QUEUE, qElem: ArpaQueue.QElem] = {
  pkg: DeliveryPkg = NARROW[qElem.Value[]];
  reason: ROPE ← "Unable to deliver msg to ";
  separator: ROPE ← NIL; -- nothing before the first recipient, ", " before each later one
  FOR tail: LIST OF ROPE ← pkg.recipList, tail.rest WHILE tail # NIL DO
    reason ← Rope.Cat[reason, separator, tail.first];
    separator ← ", ";
    ENDLOOP;
  reason ← Rope.Cat[reason, " at ", queue.Name[], " within a reasonable amount of time."];
  ArpaSMTPSupport.NotifySender[pkg.descr, reason];
  MoveAndLog[from: tempQueue, to: queue, qElem: qElem];
  InternalFinishedWithMsg[queue: queue, pkg: pkg];
  };
-- Miscellaneous
UserError: PUBLIC ERROR [reason: ROPE] = CODE; -- exported error; reason carries the explanatory text
Failed: ERROR [why: ErrorCode, reason: ROPE] = CODE; -- module-private error: a code plus explanatory text
ErrorCode: TYPE = {lockNotObtained, notOnQueue}; -- the values that Failed.why can take
-- DescrFromElemVal: extract the item descriptor from a queue element value.
-- Classification is done by ValTypeOfElem; any kind other than ItemOnlyT or DeliveryPkgT
-- raises ERROR.
DescrFromElemVal: PROC [elemVal: REF ANY] RETURNS [Descr] = {
  kind: ElemValType = ValTypeOfElem[elemVal];
  IF kind = ItemOnlyT THEN RETURN[NARROW[elemVal, ItemOnly]^];
  IF kind = DeliveryPkgT THEN RETURN[NARROW[elemVal, DeliveryPkg].descr];
  ERROR;
  };
-- ItemProc: callback applied to one package element (qElem) together with the queue it is on.
-- procData is the caller's uninterpreted data; returning continue = FALSE stops the enumeration.
ItemProc: TYPE = PROC [queue: QUEUE, qElem: ArpaQueue.QElem, procData: REF ANY ← NIL] RETURNS [continue: BOOL ← TRUE];
-- EnumerateSickPkgs: apply proc to every package on every host queue held in SickQueues,
-- stopping as soon as proc returns FALSE.
EnumerateSickPkgs: INTERNAL PROC [proc: ItemProc, data: REF ANY ← NIL] = {
  -- Must start TRUE: ScanHostQueue returns this flag even when the host queue is empty and
  -- ScanQueue therefore never assigns it (it was previously uninitialized).
  continueOverall: BOOL ← TRUE;
  queue: QUEUE; -- the host queue currently being scanned, handed to proc
  ScanQueue: ArpaQueue.ElemProc = {
    continueOverall ← proc[queue, qElem, data];
    RETURN[continueOverall]; };
  ScanHostQueue: ArpaQueue.ElemProc = {
    queue ← NARROW[qElem.Value[], RecipBundle].queue;
    queue.Enumerate[ScanQueue];
    RETURN[continueOverall]; };
  SickQueues.Enumerate[ScanHostQueue]; };
-- EnumerateItems: apply proc to every package in the system -- those on the host queues
-- under GVQueue, ExpressQueue, ArpaExpressQueue, HostQueues and SickQueues, then those
-- directly on tempQueue and BadItemQueue -- stopping as soon as proc returns FALSE.
EnumerateItems: INTERNAL PROC [proc: ItemProc, data: REF ANY ← NIL] = {
  -- Must start TRUE.  It was previously uninitialized and tested before any assignment, so
  -- the whole enumeration could be skipped or cut short arbitrarily.
  continueOverall: BOOL ← TRUE;
  queue: QUEUE; -- the queue currently being scanned, handed to proc
  ScanQueue: ArpaQueue.ElemProc = {
    continueOverall ← proc[queue, qElem, data];
    RETURN[continueOverall]; };
  ScanHostQueue: ArpaQueue.ElemProc = {
    queue ← NARROW[qElem.Value[], RecipBundle].queue;
    queue.Enumerate[ScanQueue];
    RETURN[continueOverall]; };
  GVQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  ExpressQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  -- ArpaExpressQueue holds RecipBundles too and is visited by EnumerateQueues; it was
  -- missing from this enumeration.
  ArpaExpressQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  HostQueues.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  SickQueues.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  queue ← tempQueue;
  tempQueue.Enumerate[ScanQueue, data];
  IF ~continueOverall THEN RETURN; -- honour a stop request made while scanning tempQueue
  queue ← BadItemQueue;
  BadItemQueue.Enumerate[ScanQueue, data]; };
-- QueueProc: callback applied to one package-holding queue.
-- procData is the caller's uninterpreted data; returning continue = FALSE stops the enumeration.
QueueProc: TYPE = PROC [queue: QUEUE, procData: REF ANY ← NIL] RETURNS [continue: BOOL ← TRUE];
-- EnumerateQueues: apply proc to every queue that holds packages: each host queue found
-- under GVQueue, ExpressQueue, ArpaExpressQueue, HostQueues and SickQueues (in that order),
-- then tempQueue and BadItemQueue.  Stops as soon as proc returns FALSE.
EnumerateQueues: INTERNAL PROC [proc: QueueProc, data: REF ANY ← NIL] = {
  continueOverall: BOOL ← TRUE;
  ScanHostQueue: ArpaQueue.ElemProc = {
    bundle: RecipBundle = NARROW[qElem.Value[]];
    continueOverall ← proc[bundle.queue, data];
    RETURN[continueOverall]; };
  FOR bundleQueues: LIST OF QUEUE ← LIST[GVQueue, ExpressQueue, ArpaExpressQueue, HostQueues, SickQueues], bundleQueues.rest WHILE bundleQueues # NIL DO
    bundleQueues.first.Enumerate[ScanHostQueue];
    IF ~continueOverall THEN RETURN;
    ENDLOOP;
  IF ~proc[tempQueue, data] THEN RETURN;
  [] ← proc[BadItemQueue, data]; -- last queue: nothing left to stop, so the answer is ignored
  };
-- GetProcessableElement: under the monitor lock, fetch a processable element from queue and
-- re-queue it (dequeue by value, then enqueue).  Returns NIL, leaving the queue untouched,
-- when ArpaQueue offers no element.
GetProcessableElement: ENTRY PROC [queue: QUEUE] RETURNS [qElem: ArpaQueue.QElem] = {
  ENABLE UNWIND => NULL;
  qElem ← queue.GetProcessableElement[];
  IF qElem # NIL THEN {
    queue.Dequeue[qElem.Value[]];
    queue.Enqueue[qElem]; };
  };
-- MoveHostToEnd: take qElem off queue (by value) and enqueue it again on the same queue.
MoveHostToEnd: INTERNAL PROC [queue: QUEUE, qElem: ArpaQueue.QElem] = {
  value: REF ANY = qElem.Value[];
  queue.Dequeue[value];
  queue.Enqueue[qElem];
  };
-- MoveToEmpty: monitor-locked move of a host bundle from `from' to EmptyQueues, with logging.
-- Code that already holds the monitor lock should use UnlockedMoveToEmpty instead.
MoveToEmpty: ENTRY PROC [from: QUEUE, host: ArpaQueue.QElem] = {
  ENABLE UNWIND => NULL;
  MoveAndLog[from, EmptyQueues, host];
  };
-- UnlockedMoveToEmpty: INTERNAL counterpart of MoveToEmpty; moves a host bundle from `from'
-- to EmptyQueues, with logging.
UnlockedMoveToEmpty: INTERNAL PROC [from: QUEUE, host: ArpaQueue.QElem] = {
  MoveAndLog[from, EmptyQueues, host];
  };
-- MoveToSick: inhibit the host bundle for ArpaSMTPControl.defaultInhibitTime, recording
-- reason, and then move it from `from' to SickQueues, with logging.
MoveToSick: ENTRY PROC [from: QUEUE, host: ArpaQueue.QElem, reason: ROPE] = {
  ENABLE UNWIND => NULL;
  host.Inhibit[for: ArpaSMTPControl.defaultInhibitTime, why: reason];
  MoveAndLog[from, SickQueues, host];
  };
-- MoveToBad: take msg off queue and put it on BadItemQueue, logging the removal and the
-- addition as two separate events.
MoveToBad: ENTRY PROC [queue: QUEUE, msg: ArpaQueue.QElem] = {
  ENABLE UNWIND => NULL;
  DequeueAndLog[queue, msg.Value[]];
  EnqueueAndLog[BadItemQueue, msg];
  };
-- FlushThisHost: monitor-locked removal of the element holding value from queue, with logging.
FlushThisHost: ENTRY PROC [queue: QUEUE, value: REF ANY] = {
  ENABLE UNWIND => NULL;
  DequeueAndLog[queue, value];
  };
-- FinishedWithMsg: monitor-locked wrapper around InternalFinishedWithMsg.
FinishedWithMsg: ENTRY PROC [queue: QUEUE, pkg: DeliveryPkg] = {
  ENABLE UNWIND => NULL;
  InternalFinishedWithMsg[queue: queue, pkg: pkg];
  };
-- InternalFinishedWithMsg: record that delivery of pkg to the host named by queue is over:
-- drop that host from the item's descriptor, remove pkg from queue (logged), and then let
-- ConsiderItemDestruction decide what happens to the item.
InternalFinishedWithMsg: INTERNAL PROC [queue: QUEUE, pkg: DeliveryPkg] = {
  hostName: ROPE = queue.Name[];
  pkg.descr.RemoveRecipientHost[hostName];
  DequeueAndLog[queue, pkg];
  ConsiderItemDestruction[pkg.descr];
  };
-- LockedMoveAndLog: monitor-locked wrapper around MoveAndLog.
LockedMoveAndLog: ENTRY PROC [from, to: QUEUE, qElem: ArpaQueue.QElem] = {
  ENABLE UNWIND => NULL;
  MoveAndLog[from, to, qElem];
  };
-- MoveAndLog: dequeue qElem (by value) from `from', enqueue it on `to', and write a
-- verbose log entry describing the move.
MoveAndLog: INTERNAL PROC [from, to: QUEUE, qElem: ArpaQueue.QElem] = {
  from.Dequeue[qElem.Value[]];
  to.Enqueue[qElem];
  {
  description: ROPE = Unparse[qElem.Value[]];
  ArpaSMTPSupport.Log[verbose, description, " moved from ", from.Name[], " to ", to.Name[], "."];
  };
  };
-- EnqueueAndLog: enqueue qElem on queue and write a verbose log entry.
-- Will happily queue wrong value type on a queue.
EnqueueAndLog: INTERNAL PROC [queue: QUEUE, qElem: ArpaQueue.QElem] = {
  queue.Enqueue[qElem];
  {
  description: ROPE = Unparse[qElem.Value[]];
  ArpaSMTPSupport.Log[verbose, description, " added to ", queue.Name[], "."];
  };
  };
-- DequeueAndLog: remove the element holding value from queue and write a verbose log entry.
-- The description of the value is taken before the element is dequeued.
DequeueAndLog: INTERNAL PROC [queue: QUEUE, value: REF ANY] = {
  description: ROPE = Unparse[value];
  queue.Dequeue[value];
  ArpaSMTPSupport.Log[verbose, description, " removed from ", queue.Name[], "."];
  };
-- ValTypeOfQueue: the kind of value held by the elements of queue.
-- The top-level queues of host bundles give RecipBundleT; tempQueue, BadItemQueue and any
-- other queue (i.e. an individual host queue) give DeliveryPkgT.
ValTypeOfQueue: PROC [queue: QUEUE] RETURNS [ElemValType] = {
  RETURN[
    SELECT queue FROM
      tempQueue, BadItemQueue => DeliveryPkgT,
      -- ArpaExpressQueue is a queue of RecipBundles like its neighbours; it used to fall
      -- through to ENDCASE and be reported as DeliveryPkgT.
      EmptyQueues, GVQueue, ExpressQueue, ArpaExpressQueue, HostQueues, SickQueues => RecipBundleT,
      ENDCASE => DeliveryPkgT ]; };
-- ValTypeOfElem: classify a queue element value by its dynamic type.
-- Only DeliveryPkg and RecipBundle are recognized; anything else raises ERROR.
ValTypeOfElem: PROC [value: REF ANY] RETURNS [ElemValType] = {
  IF ISTYPE[value, DeliveryPkg] THEN RETURN[DeliveryPkgT];
  IF ISTYPE[value, RecipBundle] THEN RETURN[RecipBundleT];
  ERROR;
  };
-- ElemValType: the kinds of value a queue element can hold.
ElemValType: TYPE = {ItemOnlyT, DeliveryPkgT, RecipBundleT};
-- ValTypeNames: human-readable description of each ElemValType, naming the queues on which
-- values of that kind are found.
ValTypeNames: ARRAY ElemValType OF ROPE = [
  "Somebody is confused",
  -- The first queue name had dropped out, leaving "(from , an Host queue, ...";
  -- tempQueue is the other queue that ValTypeOfQueue classifies as DeliveryPkgT.
  Rope.Cat[
    "DeliveryPkg (from ", tempQueue.Name[], ", an Host queue, or ", BadItemQueue.Name[], ")"],
  -- ArpaEx (ArpaExpressQueue) is a queue of host bundles too.
  Rope.Cat["Host queue (from Empty, GV, Ex, ArpaEx, ARPA, or Sick)"] ];
-- ConsiderItemDestruction: decide whether the item described by descr can be destroyed.
-- Check all item queues (excluding InSystemQueue) to determine if descr is on any of them. If so, the item should be retained, else destroyed unless there are further recipients in the descriptor (in which case there is some error so log an ATTENTION msg and place on BadItemQueue). Also log an error if the descr appears on some queue but has no further recipients.
-- The alternative to this (and the way I first implemented it) is to check only if there are no further recipients in the recipient list field of the Descr, and assume that if there aren't then the item isn't on any pkg queue. If all of the code was right, and users could never confuse the queue system, this would be the better choice because all of the queues wouldn't have to be searched. However, we don't want pkgs on queues pointing to a destroyed item, and this is a server so we must try to act intelligently in the face of corruptions...
ConsiderItemDestruction: INTERNAL PROC [descr: Descr] = {
  queues: LIST OF QUEUE ← NIL;
  itemPresent: BOOL ← FALSE;
  onQueue: QUEUE;
  CollectQueues: QueueProc = {queues ← CONS[queue, queues]; };
  EnumerateQueues[CollectQueues];
  [itemPresent, onQueue] ← ItemIsOn[descr.GetUserHandle[], queues];
  IF itemPresent
    THEN {
      IF ~descr.MoreRecipients[] AND onQueue # BadItemQueue
        THEN
          -- queue corruption
          -- (the text now ends in "item " so the descriptor no longer runs into the word,
          -- matching the companion message below)
          ArpaSMTPSupport.Log[important,
            "There are no more recipients in the item descriptor for item ",
            descr.Unparse[],
            ", but there is a reference to it on ", onQueue.Name[],
            ".\nThis should not have occurred; the demons should discard it."]; }
    ELSE {
      -- item not present on any queues
      IF descr.MoreRecipients[]
        THEN {
          -- Recipients remain but nothing refers to the item: park it on BadItemQueue.
          qElem: ArpaQueue.QElem = ArpaQueue.NewElem[NEW[DeliveryPkgRep ←
            [descr: descr, recipList: NIL]]];
          EnqueueAndLog[queue: BadItemQueue, qElem: qElem];
          ArpaSMTPSupport.Log[ATTENTION,
            "There are more recipients in the item descriptor for item ",
            descr.Unparse[],
            ", though there is no reference to it on any queue.\n",
            "This should not have occurred. Have placed a deliveryPkg on ",
            BadItemQueue.Name[], "."]; }
        ELSE { descr.Destroy[]; }; }; };
-- ItemIsOn: search the queues in queueList, in order, for an element whose item has the
-- given user handle.  On success returns TRUE and the queue it was found on.  Otherwise
-- itemPresent is FALSE and onQueue is simply the last queue examined (unassigned when
-- queueList is empty), so callers should consult onQueue only when itemPresent is TRUE.
ItemIsOn: INTERNAL PROC [handle: INT, queueList: LIST OF QUEUE] RETURNS [itemPresent: BOOL, onQueue: QUEUE] = {
  itemPresent ← FALSE;
  FOR remaining: LIST OF QUEUE ← queueList, remaining.rest WHILE remaining # NIL DO
    onQueue ← remaining.first;
    IF RetrieveElemVal[queue: onQueue, handle: handle, errorIfNotFound: FALSE] # NIL THEN {
      itemPresent ← TRUE;
      RETURN; };
    ENDLOOP;
  };
-- RetrieveElemVal: find the value of the element on queue with the given user handle.
-- Value is NIL if handle is not found on queue and errorIfNotFound is FALSE; with
-- errorIfNotFound TRUE a missing handle raises UserError.  Asking about a queue of host
-- bundles (RecipBundleT) always raises UserError.
RetrieveElemVal: PUBLIC PROC [queue: QUEUE, handle: INT, errorIfNotFound: BOOL ← TRUE] RETURNS [value: REF ANY ← NIL] = {
  kind: ElemValType = ValTypeOfQueue[queue];
  Find: ArpaQueue.ElemProc = {
    pkg: DeliveryPkg = NARROW[qElem.Value[]];
    IF ArpaSMTPDescr.GetUserHandle[pkg.descr] = handle THEN {
      value ← pkg;
      continue ← FALSE; }; }; -- first match wins
  IF kind = RecipBundleT THEN
    ERROR UserError[IO.PutFR["You have asked the queue module to retrieve item #%g from HostQueues, which doesn't directly contain items (possibly a code bug). Use HostQueue.", IO.int[handle]]];
  IF kind # DeliveryPkgT THEN ERROR;
  queue.Enumerate[Find];
  IF value # NIL OR ~errorIfNotFound THEN RETURN;
  ERROR UserError[IO.PutFR["Msg #%g not found on %g.", IO.int[handle], IO.rope[queue.Name[]]]];
  }; -- end RetrieveElemVal
END.