-- ArpaSMTPQueueImpl.mesa
-- Copyright © 1985 by Xerox Corporation. All rights reserved.
-- DCraft, December 21, 1983 9:02 pm
-- Taft, February 4, 1984 4:30:19 pm PST
-- Hal Murray May 29, 1985 2:17:17 am PDT
-- John Larson, February 29, 1988 0:22:08 am PST
DIRECTORY
ArpaQueue USING [Create, Dequeue, ElemProc, Enqueue, Enumerate, GetHead, GetProcessableElement, Inhibit, Inhibition, IsEmpty, MQ, Name, NewElem, QElem, Uninhibit, Value],
ArpaSMTPControl USING [defaultInhibitTime],
ArpaSMTPDescr USING [AddProcessedRecipient, Descr, DescrRep, Destroy, EnumerateRawRecipients, EnumerateRecipientHosts, GetArpaReversePath, GetExpiryDate, GetFormat, GetGvSender, GetPrecedeMsgText, GetUserHandle, HostProc, MoreRecipients, Print, RawRecipProc, RemoveRawRecipients, RemoveRecipientHost, RetrieveMsgStream, SetArpaReversePath, SetGvSender, SetPrecedeMsgText, StoreItemInfo, Unparse, WrongState],
ArpaSMTPGVSend USING [totalGvMsgsSent, totalGvBytesSent, Connection, Failed, Open, SendItem],
ArpaSMTPQueue USING [],
ArpaSMTPSend USING [totalArpaMsgsSent, totalArpaBytesSent, Close, Connection, Failed, Open, SendItem],
ArpaSMTPSupport USING [AuthorizationCheck, CheckHeader, Log, NotifySender],
ArpaSMTPSyntax USING [EnumerateGVItems, GVItemProc, HostAndUser, ReceiveRName, ReversePath, UnBlessReturnPath],
BasicTime USING [GMT, Now, nullGMT, Period, Unpack, Unpacked, Update],
Convert USING [RopeFromInt, RopeFromTime],
GVBasics USING [oldestTime, RopeFromTimestamp, Timestamp],
GVNames USING [GetMembers, MemberInfo],
GVProtocol USING [ReceiveTimestamp],
IO USING [Close, int, Put, PutChar, PutF, PutFR, PutRope, rope, STREAM, time],
Process USING [Detach, SecondsToTicks],
Real USING [InlineRound],
Rope USING [Cat, Equal, Find, ROPE];
ArpaSMTPQueueImpl: CEDAR MONITOR
IMPORTS ArpaQueue, ArpaSMTPControl, ArpaSMTPDescr, ArpaSMTPGVSend, ArpaSMTPSend, ArpaSMTPSupport, ArpaSMTPSyntax, BasicTime, Convert, GVBasics, GVNames, GVProtocol, IO, Real, Rope, Process
EXPORTS ArpaSMTPQueue =
BEGIN
-- Local abbreviations for types from other interfaces that are used throughout this module.
Descr: TYPE = ArpaSMTPDescr.Descr;
ROPE: TYPE = Rope.ROPE;
STREAM: TYPE = IO.STREAM;
QUEUE: TYPE = ArpaQueue.MQ;
-- Queue elements (which must be REFs) are DeliveryPkgs or RecipBundles.
-- Representations of the values stored in queue elements.
ItemOnly: TYPE = REF Descr; -- an item for delivery to one or more recipients
DeliveryPkg: TYPE = REF DeliveryPkgRep; -- an item ready for delivery to a clump of recipients at the same host
DeliveryPkgRep: TYPE = RECORD[descr: Descr, recipList: LIST OF ROPE];
RecipBundle: TYPE = REF RecipBundleRep; -- a queue of delivery pkgs for a single host
-- sent is bumped after each successful send from this bundle, recv is bumped by AddNewMessage for the
-- bundle named after the message source, and lastUp records the time of the most recent such event
-- (nullGMT until the first one).
RecipBundleRep: TYPE = RECORD [
queue: QUEUE,
sent, recv: INT ← 0,
lastUp: BasicTime.GMT];
-- The Queues (Names are in ValTypeNames too)
-- Note: The names of all sub-queues must be unique
-- Top-level queues. Each one is created once, when the module is initialized, under the name that
-- FindQueue later matches (ignoring case).
EmptyQueues: QUEUE ← ArpaQueue.Create["Empty"]; -- queue of RecipBundles
GVQueue: QUEUE ← ArpaQueue.Create["GV"]; -- queue of RecipBundles
ExpressQueue: QUEUE ← ArpaQueue.Create["Ex"]; -- queue of RecipBundles
ArpaExpressQueue: QUEUE ← ArpaQueue.Create["ArpaEx"]; -- queue of RecipBundles
HostQueues: QUEUE ← ArpaQueue.Create["ARPA"]; -- queue of RecipBundles
SickQueues: QUEUE ← ArpaQueue.Create["Sick"]; -- queue of RecipBundles
BadItemQueue: QUEUE ← ArpaQueue.Create["Bad"]; -- queue of DeliveryPkgs
tempQueue: QUEUE ← ArpaQueue.Create["Temp"]; -- queue of DeliveryPkgs
-- Special Host Names and such
-- gvHostName is the pseudo host under which recipients without an explicit host are filed
-- (see ProcessRawRecipient inside AddNewMessage).
gvHostName: ROPE ← "Grapevine"; -- Wired into format of msg saved on disk
expressHostName: ROPE ← "Express";
-- Member lists fetched from Grapevine by GetExpressList, GetArpaExpressList and GetNotExpressList.
expressMail: LIST OF ROPE;
arpaExpressHostName: ROPE ← "ArpaExpress";
arpaExpressMail: LIST OF ROPE;
notExpressMail: LIST OF ROPE;
-- Names that DLWithoutUpArrow accepts (compared ignoring case).
-- The identifier was garbled in extraction and is restored from the reference in DLWithoutUpArrow.
noUpArrowDLs: PUBLIC LIST OF ROPE;
-- Policy controls
-- When FALSE, ConsiderExpressMail returns immediately. Changed through SetExpressOK.
expressOK: BOOL ← TRUE;
-- When TRUE, SIGNAL LookAtThis is raised just before a message is returned to its sender.
debug: BOOL ← FALSE;
-- Returns the current express-mail member list (the list itself, not a copy).
ExpressList: PUBLIC PROC RETURNS[list: LIST OF ROPE] = {
  list ← expressMail; };
-- Sets the expressOK policy flag, which ConsiderExpressMail tests before doing any work.
SetExpressOK: PUBLIC PROC [ok: BOOL] = {expressOK ← ok};
-- Scans names in order. Returns NIL as soon as a name is on the not-express list; otherwise returns
-- the matching expressMail entry for the first name that is on the express list (comparison ignores
-- case). Returns NIL when no name matches.
Express: PROC [names: LIST OF ROPE] RETURNS [name: ROPE] = {
  FOR candidates: LIST OF ROPE ← names, candidates.rest UNTIL candidates = NIL DO
    recipient: ROPE = candidates.first;
    IF NotExpress[recipient] THEN RETURN[NIL];
    FOR known: LIST OF ROPE ← expressMail, known.rest UNTIL known = NIL DO
      IF Rope.Equal[recipient, known.first, FALSE] THEN RETURN[known.first];
      ENDLOOP;
    ENDLOOP;
  RETURN[NIL]; };
-- TRUE iff name appears on notExpressMail (comparison ignores case).
NotExpress: PROC [name: ROPE] RETURNS [BOOLEAN] = {
  excluded: LIST OF ROPE ← notExpressMail;
  WHILE excluded # NIL DO
    IF Rope.Equal[name, excluded.first, FALSE] THEN RETURN[TRUE];
    excluded ← excluded.rest;
    ENDLOOP;
  RETURN[FALSE]; };
-- Returns the arpaExpressMail entry matching the first of names that is on that list
-- (comparison ignores case), or NIL when none of them is.
ArpaExpress: PROC [names: LIST OF ROPE] RETURNS [name: ROPE] = {
  FOR candidates: LIST OF ROPE ← names, candidates.rest UNTIL candidates = NIL DO
    recipient: ROPE = candidates.first;
    FOR known: LIST OF ROPE ← arpaExpressMail, known.rest UNTIL known = NIL DO
      IF Rope.Equal[recipient, known.first, FALSE] THEN RETURN[known.first];
      ENDLOOP;
    ENDLOOP;
  RETURN[NIL]; };
-- TRUE iff dl appears on noUpArrowDLs (comparison ignores case).
DLWithoutUpArrow: PUBLIC PROC [dl: ROPE] RETURNS [valid: BOOL] = {
  valid ← FALSE;
  FOR known: LIST OF ROPE ← noUpArrowDLs, known.rest UNTIL known = NIL OR valid DO
    valid ← Rope.Equal[dl, known.first, FALSE];
    ENDLOOP; };
-- Returns the sub-queue called name (compared ignoring case) into which a new message should go.
-- Lookup order: the active top-level queues (GV, Express, ArpaExpress, Host, Sick); then EmptyQueues,
-- in which case the bundle is first moved from EmptyQueues to home; otherwise a fresh bundle is
-- created and enqueued on home.
-- Must be called with the monitor lock held.
GetQueueForNewMessage: INTERNAL PROC [home: QUEUE, name: ROPE] RETURNS [queue: QUEUE] = {
  -- Enumeration callback: remembers the matching element and its queue, then stops the enumeration.
  Find: ArpaQueue.ElemProc = {
    msgs: RecipBundle ← NARROW[qElem.Value[]];
    IF Rope.Equal[msgs.queue.Name[], name, FALSE] THEN {
      old ← qElem;
      queue ← msgs.queue;
      continue ← FALSE; }; };
  old: ArpaQueue.QElem;
  new: RecipBundle;
  GVQueue.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  ExpressQueue.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  ArpaExpressQueue.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  HostQueues.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  SickQueues.Enumerate[Find];
  IF queue # NIL THEN RETURN;
  -- Not currently in use yet. Try for an old one.
  EmptyQueues.Enumerate[Find];
  IF queue # NIL THEN {
    MoveAndLog[from: EmptyQueues, to: home, qElem: old];
    RETURN; };
  -- Doesn't exist yet. Make it.
  new ← NEW[RecipBundleRep ← [
    queue: ArpaQueue.Create[name], sent: 0, recv: 0, lastUp: BasicTime.nullGMT]];
  EnqueueAndLog[queue: home, qElem: ArpaQueue.NewElem[new]];
  RETURN[new.queue]; };
-- Prints the short form of the item whose user handle is item, followed by its inhibition status,
-- or a "No item" message when no queued item has that handle.
PrintItem: PUBLIC ENTRY PROC [item: INT, out: STREAM] = {
  ENABLE UNWIND => NULL;
  elem: ArpaQueue.QElem;
  found: Descr;
  [elem, found] ← DescrFromHandle[item];
  IF found # NIL THEN {
    found.Print[out: out, form: short];
    PrintInibition[elem, out, NIL]; }
  ELSE out.PutRope["No item with that handle.\n"]; };
-- Finds the queued item whose user handle equals handle.
-- Returns its queue element and descriptor, or [NIL, NIL] when no item matches. The results are
-- assigned only on a match, so qe is never left pointing at an unrelated element.
-- Must be called with the monitor lock held.
DescrFromHandle: INTERNAL PROC [handle: INT] RETURNS [qe: ArpaQueue.QElem, descr: Descr] = {
  -- Enumeration callback: stops at the first element whose descriptor carries the handle.
  Check: ItemProc = {
    candidate: Descr = DescrFromElemVal[qElem.Value[]];
    IF handle = candidate.GetUserHandle[] THEN {
      qe ← qElem;
      descr ← candidate;
      continue ← FALSE}; };
  qe ← NIL;
  descr ← NIL;
  EnumerateItems[Check]; };
-- Monitor entry point for printing a queue; all of the work is done by PrintQueueInner.
-- name selects the queue; NIL makes PrintQueueInner print the six top-level bundle queues.
PrintQueue: PUBLIC ENTRY PROC [name: ROPE, out: STREAM] = {
ENABLE UNWIND => NULL;
PrintQueueInner[name, out]; };
-- Prints the queue called name on out. When name is NIL it prints, in turn, the Empty, GV, Express,
-- ArpaExpress, Host and Sick queues by calling itself with each of their names.
-- For a named queue it prints the name, then the statistics of the bundle that owns that queue
-- (if it is a per-host sub-queue), then every element of the queue.
-- Must be called with the monitor lock held.
PrintQueueInner: INTERNAL PROC [name: ROPE, out: STREAM] = {
queue: QUEUE;
-- Enumeration callback over a top-level queue: prints the counters and inhibition of the bundle
-- whose sub-queue is the one being printed, then stops; skips every other bundle.
ScanForInfo: ArpaQueue.ElemProc = {
msgs: RecipBundle ← NARROW[qElem.Value[]];
IF queue # msgs.queue THEN RETURN [TRUE];
out.PutF[
"\tsent: %G, recv: %G, last up: ", IO.int[msgs.sent], IO.int[msgs.recv]];
IF msgs.lastUp = BasicTime.nullGMT THEN out.PutRope["never"]
ELSE out.Put[IO.time[msgs.lastUp]];
out.PutRope["\n"];
PrintInibition[qElem, out, "\t"];
RETURN[FALSE]; };
-- Enumeration callback that prints one element. A bundle is printed with its counters and is then
-- enumerated recursively; anything else is printed through PrintElemVal.
PrintVal: ArpaQueue.ElemProc = {
value: REF ANY = qElem.Value[];
SELECT ValTypeOfElem[value] FROM
RecipBundleT => {
msgs: RecipBundle = NARROW[value];
out.PutF[
"\t%G:\n\t\tsent: %G, recv: %G, last up: ",
IO.rope[msgs.queue.Name[]], IO.int[msgs.sent], IO.int[msgs.recv]];
IF msgs.lastUp = BasicTime.nullGMT THEN out.PutRope["never"]
ELSE out.Put[IO.time[msgs.lastUp]];
out.PutRope["\n"];
PrintInibition[qElem, out, "\t\t"];
msgs.queue.Enumerate[PrintVal]; };
ENDCASE => {
out.PutRope["\t\t"];
PrintElemVal[value, out];
out.PutChar['\n]; }; };
IF name = NIL THEN {
PrintQueueInner[EmptyQueues.Name[], out];
PrintQueueInner[GVQueue.Name[], out];
PrintQueueInner[ExpressQueue.Name[], out];
PrintQueueInner[ArpaExpressQueue.Name[], out];
PrintQueueInner[HostQueues.Name[], out];
PrintQueueInner[SickQueues.Name[], out];
RETURN; };
queue ← FindQueue[name];
IF queue = NIL THEN {
out.PutRope["Unknown queue: "]; out.PutRope[name]; out.PutRope[".\n"]; RETURN; };
out.PutRope[queue.Name[]];
out.PutRope[":\n"];
-- Look through every top-level bundle queue for the bundle that owns this queue.
EmptyQueues.Enumerate[ScanForInfo];
GVQueue.Enumerate[ScanForInfo];
ExpressQueue.Enumerate[ScanForInfo];
ArpaExpressQueue.Enumerate[ScanForInfo];
HostQueues.Enumerate[ScanForInfo];
SickQueues.Enumerate[ScanForInfo];
queue.Enumerate[PrintVal]; };
-- Maps a queue name (compared ignoring case) to a queue: first the eight top-level queues, then the
-- per-host sub-queue of any existing bundle. Returns NIL when there is no such queue; never creates one.
-- Must be called with the monitor lock held.
FindQueue: INTERNAL PROC [name: ROPE] RETURNS [queue: QUEUE ← NIL] = {
  bundle: RecipBundle;
  SELECT TRUE FROM
    Rope.Equal[name, EmptyQueues.Name[], FALSE] => RETURN[EmptyQueues];
    Rope.Equal[name, GVQueue.Name[], FALSE] => RETURN[GVQueue];
    Rope.Equal[name, ExpressQueue.Name[], FALSE] => RETURN[ExpressQueue];
    Rope.Equal[name, ArpaExpressQueue.Name[], FALSE] => RETURN[ArpaExpressQueue];
    Rope.Equal[name, HostQueues.Name[], FALSE] => RETURN[HostQueues];
    Rope.Equal[name, SickQueues.Name[], FALSE] => RETURN[SickQueues];
    Rope.Equal[name, BadItemQueue.Name[], FALSE] => RETURN[BadItemQueue];
    Rope.Equal[name, tempQueue.Name[], FALSE] => RETURN[tempQueue];
    ENDCASE;
  -- Not a top-level queue: look for a bundle of that name without creating one.
  bundle ← GetBundle[name, FALSE];
  IF bundle = NIL THEN RETURN[NIL];
  RETURN[bundle.queue]; };
-- Returns the bundle whose sub-queue is called name (compared ignoring case), searching the Empty,
-- GV, Express, ArpaExpress, Host and Sick queues in that order. When none exists, returns NIL unless
-- create is TRUE, in which case a new bundle is made and enqueued on EmptyQueues.
-- Must be called with the monitor lock held.
GetBundle: INTERNAL PROC [name: ROPE, create: BOOL] RETURNS [bundle: RecipBundle ← NIL] = {
  -- Enumeration callback: records the first bundle with a matching name and stops.
  Find: ArpaQueue.ElemProc = {
    candidate: RecipBundle ← NARROW[qElem.Value[]];
    IF Rope.Equal[candidate.queue.Name[], name, FALSE] THEN {
      bundle ← candidate;
      continue ← FALSE; }; };
  -- Each later queue is searched only while nothing has been found.
  EmptyQueues.Enumerate[Find];
  IF bundle = NIL THEN GVQueue.Enumerate[Find];
  IF bundle = NIL THEN ExpressQueue.Enumerate[Find];
  IF bundle = NIL THEN ArpaExpressQueue.Enumerate[Find];
  IF bundle = NIL THEN HostQueues.Enumerate[Find];
  IF bundle = NIL THEN SickQueues.Enumerate[Find];
  IF bundle = NIL AND create THEN {
    bundle ← NEW[RecipBundleRep ← [
      queue: ArpaQueue.Create[name], sent: 0, recv: 0, lastUp: BasicTime.nullGMT]];
    EnqueueAndLog[queue: EmptyQueues, qElem: ArpaQueue.NewElem[bundle]]; }; };
-- Writes the Unparse text of a delivery package or bundle to out.
-- Raises UserError for any other kind of element value.
PrintElemVal: PROC [value: REF ANY, out: STREAM] = {
SELECT ValTypeOfElem[value] FROM
DeliveryPkgT, RecipBundleT => out.PutRope[Unparse[value]];
ENDCASE =>
ERROR UserError["Somehow you have tried to print something that isn't a queue."]; };
-- When qElem is currently inhibited (remaining time > 0), writes one line to out, starting with
-- prefix, that gives the remaining time as quotient:remainder of 60 and the reason.
-- Writes nothing otherwise.
PrintInibition: PROC [qElem: ArpaQueue.QElem, out: STREAM, prefix: ROPE] = {
  remaining: INT;
  reason: ROPE;
  [remaining, reason] ← qElem.Inhibition[];
  IF remaining <= 0 THEN RETURN;
  out.PutF[
    "%GInhibited for %G:%02G because: %G.\n",
    IO.rope[prefix], IO.int[remaining/60], IO.int[remaining MOD 60], IO.rope[reason]]; };
-- Produces a printable description of a queue element value:
-- an ItemOnly gives the descriptor's own Unparse text; a DeliveryPkg gives that text followed by
-- " for " and the comma-separated recipient list; a RecipBundle gives the name of its sub-queue;
-- anything else gives a fixed placeholder.
Unparse: PROC [value: REF ANY] RETURNS [ROPE] = {
  IF ISTYPE[value, ItemOnly] THEN RETURN[NARROW[value, ItemOnly]^.Unparse[]];
  IF ISTYPE[value, DeliveryPkg] THEN {
    pkg: DeliveryPkg = NARROW[value];
    text: ROPE ← Rope.Cat[pkg.descr.Unparse[], " for "];
    separator: ROPE ← NIL; -- nothing before the first recipient, ", " before each later one
    FOR recips: LIST OF ROPE ← pkg.recipList, recips.rest UNTIL recips = NIL DO
      text ← Rope.Cat[text, separator, recips.first];
      separator ← ", ";
      ENDLOOP;
    RETURN[text]; };
  IF ISTYPE[value, RecipBundle] THEN
    RETURN[NARROW[value, RecipBundle].queue.Name[]];
  RETURN["(Unparse ENDCASE)"]; };
-- Refreshes the Grapevine lists if they are due, then forks and detaches the three processes of this
-- module: the GV sender, the Arpa sender, and the periodic Background housekeeping loop.
StartServer: PUBLIC PROC = TRUSTED {
CheckLists[];
Process.Detach[FORK SendGVMessages[]];
Process.Detach[FORK SendArpaMessages[]];
Process.Detach[FORK Background[]]; };
-- Condition variable with a 30 second timeout. Nothing in the visible part of this module waits on
-- it or notifies it.
newMessageArrived: CONDITION ← [timeout: Process.SecondsToTicks[30]];
-- Empty ENTRY procedure: calling it only acquires and releases the monitor lock, so the caller
-- blocks while another process is inside the monitor. source is not used.
StartNewMessage: PUBLIC ENTRY PROC [source: ROPE] = {}; -- Hack to hang on ML before reading message from the net. Otherwise we let duplicate messages through because we don't "accept" them after they get onto the disk.
-- Accepts a newly received message and queues it for delivery.
-- descr: the message descriptor; source: name of the bundle credited with the reception
--   (the bundle is created on EmptyQueues if it does not exist yet).
-- Steps: establish the sender and reverse path according to the message format; turn every raw
-- recipient into a processed (host, user) recipient; save the descriptor to disk; enqueue one
-- delivery package per recipient host; finally update the reception statistics of the source bundle.
-- A gv message that fails the authorization or header check has its raw recipients removed and is
-- handed to ConsiderItemDestruction instead of being queued.
-- NOTE(review): that early return skips the recv/lastUp update at the end; confirm this is intended.
AddNewMessage: PUBLIC ENTRY PROC [descr: Descr, source: ROPE] = {
  ENABLE UNWIND => NULL;
  host: RecipBundle ← GetBundle[source, TRUE];
  qElem: ArpaQueue.QElem ← ArpaQueue.NewElem[NEW[Descr ← descr]];
  value: REF ANY ← qElem.Value[]; -- an ItemOnly, passed as procData to the enumeration callbacks
  CheckLists[];
  SELECT descr.GetFormat[] FROM
    gv => {
      from: ROPE ← descr.GetGvSender[];
      -- Fish Sender out of body of msg, but not if ReturnToSender has already set one up.
      -- Beware of loops if you can't return a message to the original sender.
      IF from = NIL THEN {
        arpaReversePath, gvSender: ROPE;
        gvID: ROPE;
        temp: IO.STREAM = descr.RetrieveMsgStream[];
        -- Item callback: notes the postmark timestamp for the log and stops at the ReturnTo item.
        RetrieveReturnTo: ArpaSMTPSyntax.GVItemProc = {
          IF itemHeader.type = PostMark THEN {
            timestamp: GVBasics.Timestamp ← GVProtocol.ReceiveTimestamp[itemStream];
            gvID ← GVBasics.RopeFromTimestamp[timestamp]; };
          IF itemHeader.type = ReturnTo THEN {
            gvSender ← ArpaSMTPSyntax.ReceiveRName[itemStream];
            from ← gvSender;
            continue ← FALSE; }; };
        ArpaSMTPSyntax.EnumerateGVItems[temp, RetrieveReturnTo];
        temp.Close[];
        ArpaSMTPSupport.Log[verbose, descr.Unparse[], " is ", gvID, "."];
        ArpaSMTPDescr.SetGvSender[descr, gvSender];
        arpaReversePath ← ArpaSMTPSyntax.ReversePath[gvSender];
        descr.SetArpaReversePath[arpaReversePath]; };
      ArpaSMTPSupport.Log[verbose, descr.Unparse[], " is from ", from, "."];
      IF ~ArpaSMTPSupport.AuthorizationCheck[from] OR ~ArpaSMTPSupport.CheckHeader[from, descr] THEN {
        BEGIN ENABLE ArpaSMTPDescr.WrongState => CONTINUE;
          ArpaSMTPDescr.RemoveRawRecipients[descr];
          END;
        ConsiderItemDestruction[descr];
        RETURN; }; };
    arpa => {
      arpaReversePath: ROPE;
      arpaReversePath ← ArpaSMTPDescr.GetArpaReversePath[descr];
      arpaReversePath ← ArpaSMTPSyntax.UnBlessReturnPath[arpaReversePath];
      -- No usable reverse path: derive one from the GV sender instead.
      IF arpaReversePath = NIL THEN {
        arpaReversePath ← ArpaSMTPDescr.GetGvSender[descr];
        arpaReversePath ← ArpaSMTPSyntax.UnBlessReturnPath[arpaReversePath];
        arpaReversePath ← ArpaSMTPSyntax.ReversePath[arpaReversePath]; };
      descr.SetArpaReversePath[arpaReversePath];
      ArpaSMTPSupport.Log[verbose, descr.Unparse[], " is from ", arpaReversePath, "."]; };
    ENDCASE => ERROR;
  BEGIN ENABLE ArpaSMTPDescr.WrongState => CONTINUE; --already processed
    redistributed: ROPE; -- accumulates a "Redistributed: a, b" line, NIL until first needed
    -- Raw-recipient callback: recipients without a host are filed under gvHostName; those whose
    -- user name contains "^." or is on the no-up-arrow list are also added to the redistributed line.
    ProcessRawRecipient: ArpaSMTPDescr.RawRecipProc = {
      descr: Descr = NARROW[procData, ItemOnly]^;
      hostName, userName: ROPE;
      [hostName, userName] ← ArpaSMTPSyntax.HostAndUser[rawRecipient];
      IF hostName = NIL THEN {
        hostName ← gvHostName;
        IF Rope.Find[userName, "^."] # -1 OR DLWithoutUpArrow[userName] THEN {
          IF redistributed = NIL THEN redistributed ← "Redistributed: "
          ELSE redistributed ← Rope.Cat[redistributed, ", "];
          -- Show old name if used
          IF Rope.Find[rawRecipient, "Xerox.ARPA", 0, FALSE] # -1 THEN
            redistributed ← Rope.Cat[redistributed, rawRecipient]
          ELSE
            redistributed ← Rope.Cat[redistributed, userName];}; };
      descr.AddProcessedRecipient[hostName, userName, TRUE]; };
    descr.EnumerateRawRecipients[ProcessRawRecipient, value];
    IF redistributed # NIL THEN
      descr.SetPrecedeMsgText[Rope.Cat[redistributed, "\n", descr.GetPrecedeMsgText[]]];
    END;
  descr.StoreItemInfo[]; -- update info on disk
  descr.EnumerateRecipientHosts[QueueForDelivery, value];
  ConsiderItemDestruction[descr]; -- Empty, Crashed after sending to last host
  host.recv ← host.recv + 1;
  host.lastUp ← BasicTime.Now[]; };
-- Host callback used by AddNewMessage: enqueues one delivery package (descriptor plus userNames)
-- on the sub-queue chosen for hostName.
-- The arms of the SELECT are tried in order and their guards assign queueName as a side effect:
-- a host other than gvHostName goes under HostQueues; otherwise a recipient on the express list goes
-- under ExpressQueue, one on the arpa express list under ArpaExpressQueue, and everything else under
-- GVQueue. Only the first and last arms notify a sender process.
QueueForDelivery: INTERNAL ArpaSMTPDescr.HostProc = {
descr: Descr = NARROW[procData, ItemOnly]^;
mainQueue, hostQueue: QUEUE;
queueName: ROPE;
gv: BOOL = Rope.Equal[hostName, gvHostName];
SELECT TRUE FROM
~gv => {
queueName ← hostName;
mainQueue ← HostQueues;
NOTIFY newArpaMessage; };
(queueName ← Express[userNames]) # NIL => {
mainQueue ← ExpressQueue};
(queueName ← ArpaExpress[userNames]) # NIL => {
mainQueue ← ArpaExpressQueue};
ENDCASE => {
queueName ← gvHostName;
mainQueue ← GVQueue;
NOTIFY newGvMessage; };
hostQueue ← GetQueueForNewMessage[mainQueue, queueName];
EnqueueAndLog[queue: hostQueue,
qElem: ArpaQueue.NewElem[NEW[DeliveryPkgRep ← [descr, userNames]]]]; };
-- Notified when a bundle is filed under, or moved back to, HostQueues; the 30 second timeout makes
-- waiters poll even without a notification.
newArpaMessage: CONDITION ← [timeout: Process.SecondsToTicks[30]];
-- Blocks until HostQueues has a processable element, moves that element to the end of HostQueues,
-- and returns it.
GetProcessableHost: ENTRY PROC RETURNS [host: ArpaQueue.QElem] = {
  ENABLE UNWIND => NULL;
  UNTIL (host ← HostQueues.GetProcessableElement[]) # NIL DO
    WAIT newArpaMessage;
    ENDLOOP;
  MoveHostToEnd[HostQueues, host]; };
-- Body of the Arpa sender process; never returns. Repeatedly takes the next processable host and
-- tries to deliver its mail. A host is moved to the sick queue only when the attempt made no
-- progress, the host is not inhibited, and its queue is not empty.
SendArpaMessages: PROC = {
  DO
    host: ArpaQueue.QElem ← GetProcessableHost[];
    bundle: RecipBundle ← NARROW[host.Value[]];
    pending: QUEUE ← bundle.queue; -- captured before the delivery attempt, as the host may move
    -- AND is evaluated left to right and stops at the first FALSE operand, so the checks run in order.
    IF ~ProcessArpaHost[host]
      AND host.Inhibition[].for <= 0
      AND ~pending.IsEmpty[] -- an empty queue here means a bogus host name
      THEN MoveToSick[HostQueues, host, "Didn't make any progress"];
    ENDLOOP; };
-- Opens a connection to the host whose bundle is in host and sends its processable messages.
-- Returns progress = TRUE iff at least one message was sent without stopping the enumeration.
-- If Open fails with irrelevant or retryLater, the host is moved to the sick queue when the failure
-- is a problem with the host. If Open fails with returnToSender, every queued message is returned to
-- its sender. Sending stops after five minutes on one host, or as soon as SendArpaMsg says to stop
-- (the connection is then closed with TRUE and the procedure returns at once).
ProcessArpaHost: PROC [host: ArpaQueue.QElem] RETURNS [progress: BOOL] = {
  hostStream: ArpaSMTPSend.Connection;
  recipients: RecipBundle ← NARROW[host.Value[]];
  hostQueue: QUEUE ← recipients.queue;
  startTime: BasicTime.GMT ← BasicTime.Now[];
  BEGIN -- so hostStream and hostQueue will be in scope for EXITS
    msg: ArpaQueue.QElem;
    progress ← FALSE;
    IF hostQueue.IsEmpty[] THEN GOTO FlushHostQueue;
    hostStream ← ArpaSMTPSend.Open[hostQueue.Name[] ! ArpaSMTPSend.Failed => {
      SELECT withItem FROM
        irrelevant, retryLater => {
          IF problemWithHost THEN
            MoveToSick[HostQueues, host, reason];
          GOTO Done; };
        returnToSender => { -- there must be a problem with this Host (no item was given)
          -- on Open probably means the host name is unknown; all items should be returned
          -- Reject the whole list...
          UNTIL hostQueue.IsEmpty[] DO
            qElem: ArpaQueue.QElem = hostQueue.GetHead[];
            pkg: DeliveryPkg = NARROW[qElem.Value[]];
            descr: Descr = pkg.descr;
            IF debug THEN SIGNAL LookAtThis;
            ArpaSMTPSupport.NotifySender[descr, reason];
            FinishedWithMsg[queue: hostQueue, pkg: pkg];
            ENDLOOP;
          IF recipients.recv # 0 THEN GOTO FlushHostQueue; -- Foo@Concord.ms (yetch)
          FlushThisHost[queue: HostQueues, value: host.Value[]];
          GOTO Done; };
        ENDCASE => ERROR; }]; -- putOnBadQueue makes no sense for Open
    UNTIL (msg ← GetProcessableElement[hostQueue]) = NIL DO
      IF BasicTime.Period[from: startTime, to: BasicTime.Now[]] > 5*60 THEN {
        ArpaSMTPSupport.Log[important,
          "Already spent more than 5 minutes sending mail to " , Unparse[host.Value[]], "."];
        EXIT; };
      IF ~SendArpaMsg[hostStream, host, msg] THEN { hostStream.Close[TRUE]; RETURN; };
      recipients.sent ← recipients.sent + 1;
      recipients.lastUp ← BasicTime.Now[];
      progress ← TRUE;
      ENDLOOP;
    GOTO FlushHostQueue;
    EXITS
      FlushHostQueue => {
        IF hostQueue.IsEmpty[] THEN MoveToEmpty[from: HostQueues, host: host]; };
      Done => NULL;
    END;
  IF hostStream # NIL THEN hostStream.Close[FALSE]; };
-- Raised, only when debug is TRUE, just before a message is returned to its sender.
LookAtThis: SIGNAL = CODE;
-- Sends the delivery package in msg over stream and disposes of it according to the outcome.
-- Returns continue = FALSE only when the failure is a problem with the host; the host has then been
-- moved to the sick queue and the caller must stop using stream.
-- Outcome handling: success or returnToSender removes the message (the sender is notified in the
-- latter case); retryLater leaves it queued, inhibited for the default time unless the host itself
-- is the problem; putOnBadQueue moves it to the bad-item queue.
SendArpaMsg: PROC [stream: ArpaSMTPSend.Connection, host, msg: ArpaQueue.QElem] RETURNS [continue: BOOL ← TRUE] = {
  recipients: RecipBundle ← NARROW[host.Value[]];
  hostQueue: QUEUE ← recipients.queue;
  pkg: DeliveryPkg = NARROW[msg.Value[]];
  descr: Descr = pkg.descr;
  BEGIN -- so the above vars will be in scope for EXITS
    ArpaSMTPSend.SendItem[descr, pkg.recipList, stream ! ArpaSMTPSend.Failed => {
      IF problemWithHost THEN {
        MoveToSick[HostQueues, host, reason];
        continue ← FALSE; };-- stop the enumeration of SMTP items (stream may be aborted)
      SELECT withItem FROM
        retryLater => {
          IF ~problemWithHost THEN
            msg.Inhibit[for: ArpaSMTPControl.defaultInhibitTime, why: reason];
          GOTO Done; };
        returnToSender => {
          IF debug THEN SIGNAL LookAtThis;
          ArpaSMTPSupport.NotifySender[descr, reason];
          GOTO RemoveItem; };
        putOnBadQueue => {
          MoveToBad[queue: hostQueue, msg: msg];
          GOTO Done; };
        ENDCASE => ERROR; -- shouldn't be irrelevant
      }];
    GOTO RemoveItem;
    EXITS
      RemoveItem => {
        FinishedWithMsg[queue: hostQueue, pkg: pkg]; };
      Done => NULL;
    END; };
-- Notified when mail is filed under, or moved to, GVQueue; the 30 second timeout makes waiters
-- poll even without a notification.
newGvMessage: CONDITION ← [timeout: Process.SecondsToTicks[30]];
-- Blocks until GVQueue has a processable element and returns it.
GetGvClump: ENTRY PROC RETURNS [host: ArpaQueue.QElem] = {
  ENABLE UNWIND => NULL;
  UNTIL (host ← GVQueue.GetProcessableElement[]) # NIL DO
    WAIT newGvMessage;
    ENDLOOP; };
-- Body of the GV sender process; never returns. Repeatedly takes the next processable GV bundle and
-- tries to deliver it. The bundle is moved to the sick queue only when the attempt made no progress
-- and the bundle is not inhibited.
SendGVMessages: PROC = {
  DO
    clump: ArpaQueue.QElem ← GetGvClump[];
    -- AND stops at the first FALSE operand, so Inhibition is consulted only after a failed attempt.
    IF ~ProcessGVQueue[clump] AND clump.Inhibition[].for <= 0 THEN
      MoveToSick[GVQueue, clump, "Didn't make any progress"];
    ENDLOOP; };
-- The single GV connection, opened when the module is initialized and passed to every
-- ArpaSMTPGVSend.SendItem call in SendGVMsg.
gvHandle: ArpaSMTPGVSend.Connection ← ArpaSMTPGVSend.Open[]; -- Only need one
-- Sends every processable message of the bundle in host through SendGVMsg.
-- Returns progress = TRUE iff at least one message was sent without SendGVMsg asking to stop.
-- When SendGVMsg asks to stop, returns immediately with the progress made so far; otherwise the
-- bundle is moved to the empty queue if its sub-queue has drained.
ProcessGVQueue: PROC [host: ArpaQueue.QElem] RETURNS [progress: BOOL] = {
  bundle: RecipBundle ← NARROW[host.Value[]];
  pending: QUEUE ← bundle.queue;
  progress ← FALSE;
  DO
    next: ArpaQueue.QElem = GetProcessableElement[pending];
    IF next = NIL THEN EXIT;
    IF ~SendGVMsg[host, next] THEN RETURN;
    bundle.sent ← bundle.sent + 1;
    bundle.lastUp ← BasicTime.Now[];
    progress ← TRUE;
    ENDLOOP;
  IF pending.IsEmpty[] THEN MoveToEmpty[from: GVQueue, host: host]; };
-- Sends the delivery package in msg over gvHandle and disposes of it according to the outcome.
-- Returns progress = FALSE only when the failure is a problem with GV; the bundle has then been
-- moved to the sick queue and the caller must stop enumerating.
-- Outcome handling: success or returnToSender removes the message (the sender is notified in the
-- latter case); retryLater leaves it queued, inhibited for the default time unless GV itself is the
-- problem; putOnBadQueue moves it to the bad-item queue.
SendGVMsg: PROC [host, msg: ArpaQueue.QElem] RETURNS [progress: BOOL ← TRUE] = {
  recipients: RecipBundle ← NARROW[host.Value[]];
  hostQueue: QUEUE ← recipients.queue;
  pkg: DeliveryPkg = NARROW[msg.Value[]];
  descr: Descr = pkg.descr;
  BEGIN -- so the above vars will be in scope for EXITS
    ArpaSMTPGVSend.SendItem[descr, pkg.recipList, gvHandle
      ! ArpaSMTPGVSend.Failed => {
        IF problemWithGV THEN {
          MoveToSick[GVQueue, host, reason];
          progress ← FALSE; }; -- stop the enumeration of GV item
        SELECT withItem FROM
          retryLater => {
            IF ~problemWithGV THEN
              msg.Inhibit[for: ArpaSMTPControl.defaultInhibitTime, why: reason];
            GOTO Done; };
          returnToSender => {
            IF debug THEN SIGNAL LookAtThis;
            ArpaSMTPSupport.NotifySender[descr, reason];
            GOTO RemoveItem; };
          putOnBadQueue => {
            MoveToBad[queue: hostQueue, msg: msg];
            GOTO Done; };
          ENDCASE => ERROR; -- shouldn't be "irrelevant"
        }];
    GOTO RemoveItem;
    EXITS
      RemoveItem => {
        FinishedWithMsg[queue: hostQueue, pkg: pkg]; };
      Done => NULL;
    END; };
-- Body of the housekeeping process; never returns. After each wait on a private condition with a
-- one minute timeout it refreshes the Grapevine lists, logs statistics when due, releases one
-- express message when allowed, and runs the old-message and sick-host checks.
-- It is an ENTRY procedure, so the monitor lock is held except while waiting.
-- (The unused local "now" of the earlier version has been removed.)
Background: ENTRY PROC = {
  ENABLE UNWIND => NULL;
  snooz: CONDITION ← [timeout: Process.SecondsToTicks[1*60]];
  DO
    WAIT snooz;
    CheckLists[];
    MaybePrintStats[];
    ConsiderExpressMail[];
    ConsiderOldMessages[];
    ConsiderSickHosts[];
    ENDLOOP; };
-- Info for lists
-- NOTE(review): the no-up-arrow names in this section (timeNoUpArrowLastChecked, noUpArrowStamp,
-- GetNoUpArrowList), the fourth test in CheckLists, and two string literals in GetNoUpArrowList were
-- unreadable in the extracted source and have been reconstructed by analogy with the three sibling
-- lists. Confirm them against the original Tioga file.

-- Time at which each list was last fetched. Initialised to one check interval in the past.
timeExpressLastChecked, timeArpaExpressLastChecked, timeNotExpressLastChecked, timeNoUpArrowLastChecked: BasicTime.GMT ← BasicTime.Update[BasicTime.Now[], - checkInterval];
-- Grapevine timestamp of the version of each list currently held; passed to GetMembers.
expressStamp, arpaExpressStamp, notExpressStamp, noUpArrowStamp: GVBasics.Timestamp ← GVBasics.oldestTime;
checkInterval: INT = 2*60*60; -- 2 hours

-- Refetches each list whose last check is more than checkInterval seconds old.
CheckLists: PROC = {
  now: BasicTime.GMT ← BasicTime.Now[];
  IF BasicTime.Period[from: timeExpressLastChecked, to: now] > checkInterval THEN GetExpressList[];
  IF BasicTime.Period[from: timeArpaExpressLastChecked, to: now] > checkInterval THEN GetArpaExpressList[];
  IF BasicTime.Period[from: timeNotExpressLastChecked, to: now] > checkInterval THEN GetNotExpressList[];
  IF BasicTime.Period[from: timeNoUpArrowLastChecked, to: now] > checkInterval THEN GetNoUpArrowList[];
  };

-- Asks Grapevine for the members of "ExpressMail^.MS". On noChange only the check time is updated;
-- on group the list and stamp are replaced too. Any other reply leaves everything as it was, so the
-- fetch is attempted again on the next CheckLists.
GetExpressList: PROC = TRUSTED {
  memberInfo: GVNames.MemberInfo ← [noChange[]];
  memberInfo ← GVNames.GetMembers["ExpressMail^.MS", expressStamp];
  WITH m: memberInfo SELECT FROM
    noChange => timeExpressLastChecked ← BasicTime.Now[];
    group => {
      timeExpressLastChecked ← BasicTime.Now[];
      expressMail ← m.members;
      expressStamp ← m.stamp;
      ArpaSMTPSupport.Log[important, "Updated Express List."]; };
    ENDCASE => NULL; };

-- Same as GetExpressList, for "ArpaExpressMail^.MS" and arpaExpressMail.
GetArpaExpressList: PROC = TRUSTED {
  memberInfo: GVNames.MemberInfo ← [noChange[]];
  memberInfo← GVNames.GetMembers["ArpaExpressMail^.MS", arpaExpressStamp];
  WITH m: memberInfo SELECT FROM
    noChange => timeArpaExpressLastChecked ← BasicTime.Now[];
    group => {
      timeArpaExpressLastChecked ← BasicTime.Now[];
      arpaExpressMail ← m.members;
      arpaExpressStamp ← m.stamp;
      ArpaSMTPSupport.Log[important, "Updated Arpa Express List."]; };
    ENDCASE => NULL; };

-- Same as GetExpressList, for "NotArpaExpressMail^.MS" and notExpressMail.
GetNotExpressList: PROC = TRUSTED {
  memberInfo: GVNames.MemberInfo ← [noChange[]];
  memberInfo← GVNames.GetMembers["NotArpaExpressMail^.MS", notExpressStamp];
  WITH m: memberInfo SELECT FROM
    noChange => timeNotExpressLastChecked ← BasicTime.Now[];
    group => {
      timeNotExpressLastChecked ← BasicTime.Now[];
      notExpressMail ← m.members;
      notExpressStamp ← m.stamp;
      ArpaSMTPSupport.Log[important, "Updated Not Express List."]; };
    ENDCASE => NULL; };

-- Same as GetExpressList, for the list held in noUpArrowDLs.
-- NOTE(review): the group name and the log text below are reconstructed; verify both.
GetNoUpArrowList: PROC = TRUSTED {
  memberInfo: GVNames.MemberInfo ← [noChange[]];
  memberInfo← GVNames.GetMembers["NoUpArrowDLs^.MS", noUpArrowStamp];
  WITH m: memberInfo SELECT FROM
    noChange => timeNoUpArrowLastChecked ← BasicTime.Now[];
    group => {
      timeNoUpArrowLastChecked ← BasicTime.Now[];
      noUpArrowDLs ← m.members;
      noUpArrowStamp ← m.stamp;
      ArpaSMTPSupport.Log[important, "Updated No Up Arrow DLs."]; };
    ENDCASE => NULL; };
-- State for MaybePrintStats. The ...LastHour and ...LastDay variables hold the values that the
-- running totals exported by ArpaSMTPGVSend and ArpaSMTPSend had when the hourly or daily report
-- was last logged.
totalGvMsgsSentLastHour: INT ← 0;
totalGvBytesSentLastHour: INT ← 0;
totalArpaMsgsSentLastHour: INT ← 0;
totalArpaBytesSentLastHour: INT ← 0;
totalGvMsgsSentLastDay: INT ← 0;
totalGvBytesSentLastDay: INT ← 0;
totalArpaMsgsSentLastDay: INT ← 0;
totalArpaBytesSentLastDay: INT ← 0;
-- Times of the last hourly and daily report, initially the time the module was initialized.
hourStatsLastChecked: BasicTime.GMT ← BasicTime.Now[];
dayStatsLastChecked: BasicTime.GMT ← BasicTime.Now[];
-- Logs traffic statistics. Does nothing until at least an hour has passed since the last hourly
-- report. The hourly report gives GV and Arpa message and byte counts, scaled to a rate per hour
-- over the time actually elapsed, plus the current sizes of the Arpa, Sick, Express and GV queues.
-- Once at least a day has passed since the last daily report it also logs the per-day rates and
-- the totals since reboot.
-- Must be called with the monitor lock held.
MaybePrintStats: INTERNAL PROC = {
  now: BasicTime.GMT ← BasicTime.Now[];
  hourSecs: INT = 3600;
  daySecs: INT = 86400;
  hourPeriod: INT ← BasicTime.Period[from: hourStatsLastChecked, to: now];
  dayPeriod: INT ← BasicTime.Period[from: dayStatsLastChecked, to: now];
  totalGvMsgsSentInHour: INT ← 0;
  totalArpaMsgsSentInHour: INT ← 0;
  totalGvMsgsSentInDay: INT ← 0;
  totalArpaMsgsSentInDay: INT ← 0;
  totalGvBytesSentInHour: INT ← 0;
  totalArpaBytesSentInHour: INT ← 0;
  totalGvBytesSentInDay: INT ← 0;
  totalArpaBytesSentInDay: INT ← 0;
  date: ROPE ← Convert.RopeFromTime[from: now, start: years, end: days, includeDayOfWeek: TRUE];
  -- The divisions below are safe because hourPeriod >= hourSecs (and later dayPeriod >= daySecs).
  IF hourPeriod < hourSecs THEN RETURN;
  hourStatsLastChecked ← now;
  ArpaSMTPSupport.Log[important, "Date: ", date, ", Hourly Statistics: "];
  totalGvMsgsSentInHour ← Real.InlineRound[((ArpaSMTPGVSend.totalGvMsgsSent - totalGvMsgsSentLastHour)*1.0*hourSecs)/hourPeriod];
  totalGvMsgsSentLastHour ← ArpaSMTPGVSend.totalGvMsgsSent;
  totalGvBytesSentInHour ← Real.InlineRound[((ArpaSMTPGVSend.totalGvBytesSent - totalGvBytesSentLastHour)*1.0*hourSecs)/hourPeriod];
  totalGvBytesSentLastHour ← ArpaSMTPGVSend.totalGvBytesSent;
  ArpaSMTPSupport.Log[important, " Hourly GV bound rate: ", Convert.RopeFromInt[totalGvMsgsSentInHour], " messages, ", Convert.RopeFromInt[totalGvBytesSentInHour], " bytes."];
  totalArpaMsgsSentInHour ← Real.InlineRound[((ArpaSMTPSend.totalArpaMsgsSent - totalArpaMsgsSentLastHour)*1.0*hourSecs)/hourPeriod];
  totalArpaMsgsSentLastHour ← ArpaSMTPSend.totalArpaMsgsSent;
  totalArpaBytesSentInHour ← Real.InlineRound[((ArpaSMTPSend.totalArpaBytesSent - totalArpaBytesSentLastHour)*1.0*hourSecs)/hourPeriod];
  totalArpaBytesSentLastHour ← ArpaSMTPSend.totalArpaBytesSent;
  ArpaSMTPSupport.Log[important, " Hourly Arpa bound rate: ", Convert.RopeFromInt[totalArpaMsgsSentInHour], " messages, ", Convert.RopeFromInt[totalArpaBytesSentInHour], " bytes."];
  ArpaSMTPSupport.Log[important, " Arpa queue hosts ", Convert.RopeFromInt[CountQueueInternal["Arpa"]], ", messages ", Convert.RopeFromInt[CountMessagesInternal["Arpa"]]];
  ArpaSMTPSupport.Log[important, " Sick queue hosts ", Convert.RopeFromInt[CountQueueInternal["Sick"]], ", messages ", Convert.RopeFromInt[CountMessagesInternal["Sick"]]];
  ArpaSMTPSupport.Log[important, " Express queue hosts ", Convert.RopeFromInt[CountQueueInternal["Ex"]], ", messages ", Convert.RopeFromInt[CountMessagesInternal["Ex"]]];
  ArpaSMTPSupport.Log[important, " GV queue messages ", Convert.RopeFromInt[CountMessagesInternal["GV"]]];
  IF dayPeriod < daySecs THEN RETURN;
  dayStatsLastChecked ← now;
  totalGvMsgsSentInDay ← Real.InlineRound[((ArpaSMTPGVSend.totalGvMsgsSent - totalGvMsgsSentLastDay)*1.0*daySecs)/dayPeriod];
  totalGvMsgsSentLastDay ← ArpaSMTPGVSend.totalGvMsgsSent;
  totalGvBytesSentInDay ← Real.InlineRound[((ArpaSMTPGVSend.totalGvBytesSent - totalGvBytesSentLastDay)*1.0*daySecs)/dayPeriod];
  totalGvBytesSentLastDay ← ArpaSMTPGVSend.totalGvBytesSent;
  ArpaSMTPSupport.Log[important, " Daily GV bound rate: ", Convert.RopeFromInt[totalGvMsgsSentInDay], " messages, ", Convert.RopeFromInt[totalGvBytesSentInDay], " bytes."];
  totalArpaMsgsSentInDay ← Real.InlineRound[((ArpaSMTPSend.totalArpaMsgsSent - totalArpaMsgsSentLastDay)*1.0*daySecs)/dayPeriod];
  totalArpaMsgsSentLastDay ← ArpaSMTPSend.totalArpaMsgsSent;
  totalArpaBytesSentInDay ← Real.InlineRound[((ArpaSMTPSend.totalArpaBytesSent - totalArpaBytesSentLastDay)*1.0*daySecs)/dayPeriod];
  totalArpaBytesSentLastDay ← ArpaSMTPSend.totalArpaBytesSent;
  ArpaSMTPSupport.Log[important, " Daily Arpa bound rate: ", Convert.RopeFromInt[totalArpaMsgsSentInDay], " messages, ", Convert.RopeFromInt[totalArpaBytesSentInDay], " bytes."];
  ArpaSMTPSupport.Log[important, " Total Arpa bound since reboot: ", Convert.RopeFromInt[ArpaSMTPSend.totalArpaMsgsSent], " messages, ", Convert.RopeFromInt[ArpaSMTPSend.totalArpaBytesSent], " bytes."];
  ArpaSMTPSupport.Log[important, " Total GV bound since reboot: ", Convert.RopeFromInt[ArpaSMTPGVSend.totalGvMsgsSent], " messages, ", Convert.RopeFromInt[ArpaSMTPGVSend.totalGvBytesSent], " bytes."];
  ArpaSMTPSupport.Log[important, " Totals since reboot: ", Convert.RopeFromInt[ArpaSMTPSend.totalArpaMsgsSent+ArpaSMTPGVSend.totalGvMsgsSent], " messages, ", Convert.RopeFromInt[ArpaSMTPSend.totalArpaBytesSent+ArpaSMTPGVSend.totalGvBytesSent], " bytes."];
  };
expressDelay: INT ← 1; -- 1 minute default delay
-- Releases at most one express message into the GV queue, then waits for some minutes.
-- Does nothing when express mail is disabled, when GVQueue is not empty, on weekdays between hour 8
-- and hour 20, or during hour 23. Takes the next processable bundle from ExpressQueue, or failing
-- that from ArpaExpressQueue, rotates it to the end of its queue, moves its next processable message
-- to the "Grapevine" sub-queue and notifies the GV sender.
-- Must be called with the monitor lock held; the lock is released only while waiting in DallyAWhile.
ConsiderExpressMail: INTERNAL PROC = {
  now: BasicTime.Unpacked ← BasicTime.Unpack[BasicTime.Now[]];
  host: ArpaQueue.QElem;
  recipients: RecipBundle;
  hostQueue, from, gv: QUEUE;
  msg: ArpaQueue.QElem;
  minutes: INT;
  IF ~expressOK THEN RETURN;
  IF ~GVQueue.IsEmpty[] THEN RETURN;
  IF now.weekday IN [Monday..Friday] AND now.hour IN [8..12+8) THEN RETURN;
  IF now.hour = 12+11 THEN RETURN; -- Archiver and RegPurger
  host ← ExpressQueue.GetProcessableElement[];
  IF host # NIL THEN {
    from ← ExpressQueue;
    MoveHostToEnd[ExpressQueue, host]; }
  ELSE {
    host ← ArpaExpressQueue.GetProcessableElement[];
    IF host = NIL THEN RETURN;
    from ← ArpaExpressQueue;
    MoveHostToEnd[ArpaExpressQueue, host]; };
  recipients ← NARROW[host.Value[]];
  hostQueue ← recipients.queue;
  msg ← hostQueue.GetProcessableElement[];
  -- NOTE(review): this runs inside the monitor, and the next line then calls UnlockedMoveToEmpty for
  -- the same host. MoveToEmpty is not visible here; if it is an ENTRY procedure this call would
  -- deadlock, and either way the host seems to be moved twice. Confirm and probably delete this line.
  IF hostQueue.IsEmpty[] THEN MoveToEmpty[from: from, host: host];
  IF msg = NIL THEN { UnlockedMoveToEmpty[from: from, host: host]; RETURN; };
  gv ← GetQueueForNewMessage[GVQueue, gvHostName];
  MoveAndLog[from: hostQueue, to: gv, qElem: msg];
  IF hostQueue.IsEmpty[] THEN UnlockedMoveToEmpty[from: from, host: host];
  NOTIFY newGvMessage;
  -- This heuristic is a hack. For easy messages it's too slow. For nasty ones, GV still gets swamped.
  SELECT now.hour FROM
    12+8 => minutes ← expressDelay + 4; -- 12 messages first hour
    12+9 => minutes ← expressDelay + 3; -- 15 messages next hour
    12+10 => minutes ← expressDelay + 2; -- 20 messages next hour
    ENDCASE => minutes ← expressDelay+1; -- 30 messages/hour => 197 msgs in 8 hrs
  IF ~(now.weekday IN[Monday..Friday]) THEN minutes ← expressDelay;
  DallyAWhile[minutes]; }; -- (At least 1) Extra minute in main loop
-- Pauses the calling process by waiting on a private, local condition variable whose timeout is set to minutes*60 seconds.
-- Nothing else can name the condition, so no NOTIFY is issued on it anywhere in this procedure.
DallyAWhile: INTERNAL PROC [minutes: INT] = {
snooz: CONDITION ← [timeout: Process.SecondsToTicks[minutes*60]];
WAIT snooz; };
ConsiderSickHosts: INTERNAL PROC = {
  -- When HostQueues is empty, gives one processable sick host another chance by moving it back to its delivery queue.
  -- The routing (GVQueue for the GV host, HostQueues otherwise) and the matching NOTIFY are done by MoveSickHostQueue, which also ignores a NIL host.
  host: ArpaQueue.QElem ← SickQueues.GetProcessableElement[];
  IF ~HostQueues.IsEmpty[] THEN RETURN;
  MoveSickHostQueue[host]; };
MoveAllSickHosts: PUBLIC ENTRY PROC = {
  -- Lifts the inhibition on every element of SickQueues, then moves each processable sick host back to its delivery queue via MoveSickHostQueue until none remain.
  ENABLE UNWIND => NULL;
  ClearInhibition: ArpaQueue.ElemProc = {
    ArpaQueue.Uninhibit[qElem, SickQueues]; };
  SickQueues.Enumerate[ClearInhibition];
  FOR host: ArpaQueue.QElem ← SickQueues.GetProcessableElement[], SickQueues.GetProcessableElement[] UNTIL host = NIL DO
    MoveSickHostQueue[host];
  ENDLOOP;
  };
MoveSickHostQueue: INTERNAL PROC[host: ArpaQueue.QElem] = {
  -- Moves one host element from SickQueues back to the queue it is delivered from, and wakes the matching sender.
  -- host: element of SickQueues whose value is a RecipBundle; NIL is ignored.
  IF host # NIL THEN {
    recipients: RecipBundle ← NARROW[host.Value[]];
    -- The GV host is recognised by the name of its per-host queue.
    gv: BOOL ← Rope.Equal[recipients.queue.Name[], gvHostName];
    target: QUEUE ← IF gv THEN GVQueue ELSE HostQueues;
    MoveAndLog[from: SickQueues, to: target, qElem: host];
    IF gv THEN NOTIFY newGvMessage ELSE NOTIFY newArpaMessage; }; };
FindSickHost: INTERNAL PROC[host: Rope.ROPE] RETURNS[element: ArpaQueue.QElem←NIL]= {
  -- Looks on SickQueues for the host whose per-host queue is named `host` (case is ignored).
  -- On a match the element's inhibition is lifted and the element is returned; otherwise, or when host is NIL, the result is NIL.
  MatchHost: ArpaQueue.ElemProc = {
    bundle: RecipBundle = NARROW[qElem.Value[]];
    IF ~Rope.Equal[bundle.queue.Name[], host, FALSE] THEN RETURN;
    ArpaQueue.Uninhibit[qElem, SickQueues];
    element ← qElem;
    continue ← FALSE; };
  IF host = NIL THEN RETURN;
  SickQueues.Enumerate[MatchHost];
  };
MoveSickHost: PUBLIC ENTRY PROC[name: Rope.ROPE ← NIL] = {
  -- Moves the sick host called `name` back to its delivery queue; a NIL or unknown name does nothing.
  -- Routing is shared with the other sick-host movers: MoveSickHostQueue sends the GV host to GVQueue (NOTIFY newGvMessage) and every other host to HostQueues (NOTIFY newArpaMessage), and ignores NIL.
  ENABLE UNWIND => NULL;
  MoveSickHostQueue[FindSickHost[name]]; };
CountQueue: PUBLIC ENTRY PROC[name: Rope.ROPE ← NIL] RETURNS[n: INT ← 0]= {
  -- Monitor-locked wrapper around CountQueueInternal: number of elements on the queue called `name`, or -1 when name is NIL.
  ENABLE UNWIND => NULL;
  n ← CountQueueInternal[name];
  };
CountQueueInternal: INTERNAL PROC[name: Rope.ROPE ← NIL] RETURNS[n: INT ← 0]= {
  -- Counts the elements of the queue that FindQueue returns for `name`.
  -- Returns -1 when name is NIL.
  -- NOTE(review): FindQueue's result for an unknown name is not visible here; confirm it cannot be NIL.
  Count: ArpaQueue.ElemProc = {
    n ← n+1};
  IF name = NIL THEN RETURN[-1];
  FindQueue[name].Enumerate[Count];
  };
CountMessages: PUBLIC ENTRY PROC[queueName: Rope.ROPE ← NIL] RETURNS[n: INT ← 0]= {
  -- Monitor-locked wrapper around CountMessagesInternal: number of messages reachable from the queue called queueName, or -1 when queueName is NIL.
  ENABLE UNWIND => NULL;
  n ← CountMessagesInternal[queueName];
  };
CountMessagesInternal: INTERNAL PROC[name: Rope.ROPE ← NIL] RETURNS[n: INT ← 0]= {
  -- Counts messages rather than hosts.
  -- When `name` (case ignored) is one of the queues of host bundles, the elements of every nested per-host queue are summed; any other name is counted directly by CountQueueInternal.
  -- Returns -1 when name is NIL.
  Count: ArpaQueue.ElemProc = {
    n ← n+1};
  CountNested: ArpaQueue.ElemProc = {
    msgs: RecipBundle ← NARROW[qElem.Value[]];
    msgs.queue.Enumerate[Count];
    };
  IF name = NIL THEN RETURN[-1];
  SELECT TRUE FROM
    Rope.Equal[name, HostQueues.Name[], FALSE] => HostQueues.Enumerate[CountNested];
    Rope.Equal[name, ExpressQueue.Name[], FALSE] => ExpressQueue.Enumerate[CountNested];
    -- ArpaExpressQueue also holds host bundles, so it needs the nested count as well.
    Rope.Equal[name, ArpaExpressQueue.Name[], FALSE] => ArpaExpressQueue.Enumerate[CountNested];
    Rope.Equal[name, SickQueues.Name[], FALSE] => SickQueues.Enumerate[CountNested];
    Rope.Equal[name, GVQueue.Name[], FALSE] => GVQueue.Enumerate[CountNested];
    ENDCASE => n ← CountQueueInternal[name];
  };
ConsiderOldMessages: INTERNAL PROC = {
  -- When both GVQueue and HostQueues are empty, scans the packages of sick hosts and returns the first one whose expiry date has passed to its sender.
  -- The scan stops after that first package (the callback returns FALSE), so at most one message is bounced per call.
  now: BasicTime.GMT ← BasicTime.Now[];
  -- NOTE(review): `host` is never read; the call is kept only in case GetProcessableElement has side effects. Confirm against ArpaQueue and delete.
  host: ArpaQueue.QElem ← SickQueues.GetProcessableElement[];
  ConsiderExpiration: INTERNAL ItemProc = {
    pkg: DeliveryPkg = NARROW[qElem.Value[]];
    expiryDate: BasicTime.GMT ← ArpaSMTPDescr.GetExpiryDate[pkg.descr];
    -- A negative period from now to the expiry date means the date is already behind us.
    IF BasicTime.Period[from: now, to: expiryDate] < 0 THEN {
      -- Park the package on tempQueue; SendItBack composes the explanation, notifies the sender, and moves the package back before discarding it.
      MoveAndLog[from: queue, to: tempQueue, qElem: qElem];
      SendItBack[queue: queue, qElem: qElem];
      RETURN[FALSE]; }; };
  IF ~GVQueue.IsEmpty[] THEN RETURN;
  IF ~HostQueues.IsEmpty[] THEN RETURN;
  EnumerateSickPkgs[ConsiderExpiration]; };
SendItBack: INTERNAL PROC [queue: QUEUE, qElem: ArpaQueue.QElem] = {
  -- Tells the sender that the package held by qElem could not be delivered to its recipients at `queue`, then moves the element from tempQueue back to `queue` and finishes with the message.
  pkg: DeliveryPkg = NARROW[qElem.Value[]];
  names: ROPE ← NIL;
  -- Comma-separated recipient list; the separator is skipped in front of the first name.
  FOR rest: LIST OF ROPE ← pkg.recipList, rest.rest UNTIL rest = NIL DO
    IF rest # pkg.recipList THEN names ← Rope.Cat[names, ", "];
    names ← Rope.Cat[names, rest.first];
  ENDLOOP;
  ArpaSMTPSupport.NotifySender[pkg.descr, Rope.Cat["Unable to deliver msg to ", names, " at ", queue.Name[], " within a reasonable amount of time."]];
  MoveAndLog[from: tempQueue, to: queue, qElem: qElem];
  InternalFinishedWithMsg[queue: queue, pkg: pkg]; };
Miscellaneous
-- Exported error carrying a human-readable explanation; RetrieveElemVal raises it for a missing item or a request against a queue of host bundles.
UserError: PUBLIC ERROR [reason: ROPE] = CODE;
-- Module-private error with a machine-readable code plus explanatory text.
Failed: ERROR [why: ErrorCode, reason: ROPE] = CODE;
-- Codes reported through Failed.
ErrorCode: TYPE = {lockNotObtained, notOnQueue};
DescrFromElemVal: PROC [elemVal: REF ANY] RETURNS [Descr] = {
  -- Extracts the item descriptor from a queue element value: an ItemOnly is dereferenced, a DeliveryPkg yields its descr field, and any other kind raises ERROR.
  kind: ElemValType = ValTypeOfElem[elemVal];
  IF kind = ItemOnlyT THEN RETURN[NARROW[elemVal, ItemOnly]^];
  IF kind = DeliveryPkgT THEN RETURN[NARROW[elemVal, DeliveryPkg].descr];
  ERROR; };
-- Callback applied to each item during an item enumeration: `queue` is the queue holding qElem, procData is the caller's pass-through datum, and returning FALSE stops the enumeration.
ItemProc: TYPE = PROC [queue: QUEUE, qElem: ArpaQueue.QElem, procData: REF ANY ← NIL] RETURNS [continue: BOOL ← TRUE];
EnumerateSickPkgs: INTERNAL PROC [proc: ItemProc, data: REF ANY ← NIL] = {
  -- Applies proc to every element of every per-host queue found on SickQueues, stopping as soon as proc returns FALSE.
  -- continueOverall starts TRUE so that a host whose queue is empty (ScanQueue never runs) does not end the scan with an undefined answer.
  continueOverall: BOOL ← TRUE;
  queue: QUEUE;
  ScanQueue: ArpaQueue.ElemProc = {
    continueOverall ← proc[queue, qElem, data];
    RETURN[continueOverall]; };
  ScanHostQueue: ArpaQueue.ElemProc = {
    -- Remember which per-host queue is being scanned so ScanQueue can pass it to proc.
    queue ← NARROW[qElem.Value[], RecipBundle].queue;
    queue.Enumerate[ScanQueue];
    RETURN[continueOverall]; };
  SickQueues.Enumerate[ScanHostQueue]; };
EnumerateItems: INTERNAL PROC [proc: ItemProc, data: REF ANY ← NIL] = {
  -- Applies proc to every item known to this module, stopping as soon as proc returns FALSE.
  -- Order: the per-host queues under GVQueue, ExpressQueue, ArpaExpressQueue, HostQueues and SickQueues, then the flat tempQueue and BadItemQueue.
  -- continueOverall starts TRUE so the stop checks are well defined even when a queue is empty and proc is never called.
  continueOverall: BOOL ← TRUE;
  queue: QUEUE;
  ScanQueue: ArpaQueue.ElemProc = {
    continueOverall ← proc[queue, qElem, data];
    RETURN[continueOverall]; };
  ScanHostQueue: ArpaQueue.ElemProc = {
    -- Remember which per-host queue is being scanned so ScanQueue can pass it to proc.
    queue ← NARROW[qElem.Value[], RecipBundle].queue;
    queue.Enumerate[ScanQueue];
    RETURN[continueOverall]; };
  GVQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  ExpressQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  ArpaExpressQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  HostQueues.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  SickQueues.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  -- The last two queues hold items directly, so they are scanned without the host level.
  queue ← tempQueue;
  tempQueue.Enumerate[ScanQueue, data];
  IF ~continueOverall THEN RETURN;
  queue ← BadItemQueue;
  BadItemQueue.Enumerate[ScanQueue, data]; };
-- Callback applied to each queue during a queue enumeration: procData is the caller's pass-through datum, and returning FALSE stops the enumeration.
QueueProc: TYPE = PROC [queue: QUEUE, procData: REF ANY ← NIL] RETURNS [continue: BOOL ← TRUE];
EnumerateQueues: INTERNAL PROC [proc: QueueProc, data: REF ANY ← NIL] = {
  -- Applies proc to every queue that holds items directly: each per-host queue under GVQueue, ExpressQueue, ArpaExpressQueue, HostQueues and SickQueues, then tempQueue and BadItemQueue.
  -- Stops as soon as proc returns FALSE.
  continueOverall: BOOL ← TRUE;
  ScanHostQueue: ArpaQueue.ElemProc = {
    queue: QUEUE ← NARROW[qElem.Value[], RecipBundle].queue;
    continueOverall ← proc[queue, data];
    RETURN[continueOverall]; };
  GVQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  ExpressQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  ArpaExpressQueue.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  HostQueues.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  SickQueues.Enumerate[ScanHostQueue];
  IF ~continueOverall THEN RETURN;
  IF ~proc[tempQueue, data] THEN RETURN;
  IF ~proc[BadItemQueue, data] THEN RETURN; };
GetProcessableElement: ENTRY PROC [queue: QUEUE] RETURNS [qElem: ArpaQueue.QElem] = {
  -- Under the monitor lock, picks a processable element of `queue` and rotates it to the end of that queue; returns NIL when there is none.
  ENABLE UNWIND => NULL;
  qElem ← queue.GetProcessableElement[];
  IF qElem # NIL THEN MoveHostToEnd[queue, qElem]; };
-- Rotates qElem within `queue` by removing it (looked up by its value) and enqueueing it again.
-- No log entry is written for this move.
MoveHostToEnd: INTERNAL PROC [queue: QUEUE, qElem: ArpaQueue.QElem] = {
queue.Dequeue[qElem.Value[]];
queue.Enqueue[qElem]; };
MoveToEmpty: ENTRY PROC [from: QUEUE, host: ArpaQueue.QElem] = {
  -- Monitor-locked form of UnlockedMoveToEmpty: moves `host` from `from` to EmptyQueues and logs the move.
  -- For callers that do not already hold the monitor lock.
  ENABLE UNWIND => NULL;
  UnlockedMoveToEmpty[from: from, host: host]; };
-- Moves `host` from `from` to EmptyQueues and logs the move.
-- INTERNAL: the caller must already hold the monitor lock ("Unlocked" means this procedure does not take it itself).
UnlockedMoveToEmpty: INTERNAL PROC [from: QUEUE, host: ArpaQueue.QElem] = {
MoveAndLog[from: from, to: EmptyQueues, qElem: host]; };
-- Under the monitor lock, inhibits `host` for ArpaSMTPControl.defaultInhibitTime with `reason` recorded as the cause, then moves it from `from` to SickQueues and logs the move.
MoveToSick: ENTRY PROC [from: QUEUE, host: ArpaQueue.QElem, reason: ROPE] = {
ENABLE UNWIND => NULL;
host.Inhibit[for: ArpaSMTPControl.defaultInhibitTime, why: reason];
MoveAndLog[from: from, to: SickQueues, qElem: host]; };
MoveToBad: ENTRY PROC [queue: QUEUE, msg: ArpaQueue.QElem] = {
  -- Under the monitor lock, takes msg off `queue` and puts it on BadItemQueue, logging the removal and the addition separately.
  ENABLE UNWIND => NULL;
  value: REF ANY = msg.Value[];
  DequeueAndLog[queue, value];
  EnqueueAndLog[BadItemQueue, msg]; };
-- Under the monitor lock, removes the element whose value is `value` from `queue` and logs the removal.
FlushThisHost: ENTRY PROC [queue: QUEUE, value: REF ANY] = {
ENABLE UNWIND => NULL;
DequeueAndLog[queue: queue, value: value]; };
FinishedWithMsg: ENTRY PROC [queue: QUEUE, pkg: DeliveryPkg] = {
  -- Monitor-locked form of InternalFinishedWithMsg, for callers that do not already hold the lock.
  ENABLE UNWIND => NULL;
  InternalFinishedWithMsg[queue: queue, pkg: pkg]; };
InternalFinishedWithMsg: INTERNAL PROC [queue: QUEUE, pkg: DeliveryPkg] = {
  -- Retires one delivery package: removes the host named after `queue` from the item's recipient hosts, takes the package off `queue`, then lets ConsiderItemDestruction decide whether the item itself can be destroyed.
  descr: Descr = pkg.descr;
  descr.RemoveRecipientHost[queue.Name[]];
  DequeueAndLog[queue, pkg];
  ConsiderItemDestruction[descr]; };
-- Monitor-locked form of MoveAndLog, for callers that do not already hold the lock.
LockedMoveAndLog: ENTRY PROC [from, to: QUEUE, qElem: ArpaQueue.QElem] = {
ENABLE UNWIND => NULL;
MoveAndLog[from: from, to: to, qElem: qElem]; };
MoveAndLog: INTERNAL PROC [from, to: QUEUE, qElem: ArpaQueue.QElem] = {
  -- Transfers qElem from queue `from` to queue `to` and writes a verbose log line naming both queues.
  value: REF ANY = qElem.Value[];
  from.Dequeue[value];
  to.Enqueue[qElem];
  -- The description is produced after the move, as the log line reports a completed transfer.
  ArpaSMTPSupport.Log[verbose, Unparse[value], " moved from ", from.Name[], " to ", to.Name[], "."]; };
EnqueueAndLog: INTERNAL PROC [queue: QUEUE, qElem: ArpaQueue.QElem] = {
  -- Appends qElem to `queue` and writes a verbose log line.
  -- No type check is made: this will happily put a value of the wrong type on a queue.
  queueName: ROPE = queue.Name[];
  queue.Enqueue[qElem];
  ArpaSMTPSupport.Log[verbose, Unparse[qElem.Value[]], " added to ", queueName, "."]; };
DequeueAndLog: INTERNAL PROC [queue: QUEUE, value: REF ANY] = {
  -- Removes the element carrying `value` from `queue` and writes a verbose log line.
  -- The description is captured before the removal takes place.
  description: ROPE = Unparse[value];
  queue.Dequeue[value];
  ArpaSMTPSupport.Log[verbose, description, " removed from ", queue.Name[], "."]; };
ValTypeOfQueue: PROC [queue: QUEUE] RETURNS [ElemValType] = {
  -- Says what kind of value the elements of `queue` carry.
  -- The top-level queues of hosts carry RecipBundles; tempQueue, BadItemQueue and every other queue (the per-host queues) carry DeliveryPkgs.
  -- ArpaExpressQueue is listed with the host-level queues because its element values are narrowed to RecipBundle wherever it is scanned.
  RETURN[SELECT queue FROM
    tempQueue, BadItemQueue => DeliveryPkgT,
    EmptyQueues, GVQueue, ExpressQueue, ArpaExpressQueue, HostQueues, SickQueues => RecipBundleT,
    ENDCASE => DeliveryPkgT ]; };
ValTypeOfElem: PROC [value: REF ANY] RETURNS [ElemValType] = {
  -- Classifies an element value by its dynamic type: DeliveryPkg or RecipBundle. Anything else raises ERROR.
  IF ISTYPE[value, DeliveryPkg] THEN RETURN[DeliveryPkgT];
  IF ISTYPE[value, RecipBundle] THEN RETURN[RecipBundleT];
  ERROR; };
-- Kinds of value a queue element can carry. ValTypeNames is indexed by this type, so the order of the literals must match the order of its strings.
ElemValType: TYPE = {ItemOnlyT, DeliveryPkgT, RecipBundleT};
-- Human-readable description of each ElemValType, in declaration order (ItemOnlyT, DeliveryPkgT, RecipBundleT).
-- The DeliveryPkg text names the two flat item queues; the first of them is tempQueue, matching ValTypeOfQueue.
ValTypeNames: ARRAY ElemValType OF ROPE = [
  "Somebody is confused",
  Rope.Cat[
    "DeliveryPkg (from ", tempQueue.Name[], ", an Host queue, or ", BadItemQueue.Name[], ")"],
  Rope.Cat["Host queue (from Empty, GV, Ex, ARPA, or Sick)"] ];
ConsiderItemDestruction: INTERNAL PROC [descr: Descr] = {
  -- Check all item queues (excluding InSystemQueue) to determine if descr is on any of them.
  -- If so, the item should be retained, else destroyed unless there are further recipients in the descriptor (in which case there is some error, so log an ATTENTION msg and place on BadItemQueue).
  -- Also log an error if the descr appears on some queue but has no further recipients.
  -- The alternative (and the first implementation) is to check only whether the recipient list field of the Descr is empty, and assume that if so the item isn't on any pkg queue.
  -- If all of the code was right, and users could never confuse the queue system, that would be the better choice because all of the queues wouldn't have to be searched.
  -- However, we don't want pkgs on queues pointing to a destroyed item, and this is a server so we must try to act intelligently in the face of corruptions.
  queues: LIST OF QUEUE ← NIL;
  itemPresent: BOOL ← FALSE;
  onQueue: QUEUE;
  CollectQueues: QueueProc = {queues ← CONS[queue, queues]; };
  EnumerateQueues[CollectQueues];
  [itemPresent, onQueue] ← ItemIsOn[descr.GetUserHandle[], queues];
  IF itemPresent THEN {
    IF ~descr.MoreRecipients[] AND onQueue # BadItemQueue THEN -- queue corruption
      ArpaSMTPSupport.Log[important,
        "There are no more recipients in the item descriptor for item ",
        descr.Unparse[],
        ", but there is a reference to it on ", onQueue.Name[],
        ".\nThis should not have occurred; the demons should discard it."]; }
  ELSE { -- item not present on any queues
    IF descr.MoreRecipients[] THEN {
      -- Keep the item reachable by parking a package with an empty recipient list on BadItemQueue.
      qElem: ArpaQueue.QElem = ArpaQueue.NewElem[NEW[DeliveryPkgRep ←
        [descr: descr, recipList: NIL]]];
      EnqueueAndLog[queue: BadItemQueue, qElem: qElem];
      ArpaSMTPSupport.Log[ATTENTION,
        "There are more recipients in the item descriptor for item ",
        descr.Unparse[],
        ", though there is no reference to it on any queue.\n",
        "This should not have occurred. Have placed a deliveryPkg on ",
        BadItemQueue.Name[], "."]; }
    ELSE { descr.Destroy[]; }; }; };
ItemIsOn: INTERNAL PROC [handle: INT, queueList: LIST OF QUEUE] RETURNS [itemPresent: BOOL, onQueue: QUEUE] = {
  -- Searches the queues of queueList, in order, for an element whose item has user handle `handle`.
  -- On success returns TRUE and the queue that holds it. On failure itemPresent is FALSE and onQueue is merely the last queue examined, so callers should ignore it.
  itemPresent ← FALSE;
  FOR rest: LIST OF QUEUE ← queueList, rest.rest WHILE rest # NIL DO
    onQueue ← rest.first;
    IF RetrieveElemVal[onQueue, handle, FALSE] # NIL THEN RETURN[TRUE, onQueue];
  ENDLOOP; };
RetrieveElemVal: PUBLIC PROC [queue: QUEUE, handle: INT, errorIfNotFound: BOOL ← TRUE] RETURNS [value: REF ANY ← NIL] = {
  -- Find the value of the element on queue with the given user handle. Value is NIL if handle not found on queue.
  -- queue: must be a queue whose elements carry DeliveryPkgs; a queue of host bundles raises UserError.
  -- errorIfNotFound: when TRUE, a missing handle raises UserError instead of returning NIL.
  SELECT ValTypeOfQueue[queue] FROM
    DeliveryPkgT => {
      Find: ArpaQueue.ElemProc = {
        -- Stop at the first package whose descriptor carries the requested handle.
        IF handle = ArpaSMTPDescr.GetUserHandle[NARROW[qElem.Value[], DeliveryPkg].descr] THEN {value ← qElem.Value[]; continue ← FALSE}; };
      queue.Enumerate[Find]; };
    RecipBundleT =>
      ERROR UserError[IO.PutFR["You have asked the queue module to retrieve item #%g from HostQueues, which doesn't directly contain items (possibly a code bug). Use HostQueue.", IO.int[handle]]];
    ENDCASE => ERROR;
  IF value = NIL AND errorIfNotFound THEN {
    reason: ROPE = IO.PutFR[
      "Msg #%g not found on %g.", IO.int[handle], IO.rope[queue.Name[]]];
    ERROR UserError[reason]; };
  }; -- end RetrieveElemVal
END.