-- Copyright (C) 1981, 1982, 1984, 1985 by Xerox Corporation. All rights reserved.
-- ReadInput.mesa, Transport Mechanism Mail Server - algorithm to process input queue --
-- HGM, 17-Oct-85 21:58:07
-- Randy Gobbel 19-May-81 23:20:09 --
-- Andrew Birrell 30-Dec-81 15:25:29 --
-- Mark Johnson 7-Jan-82 15:28:38 --
-- Ted Wobber 2-Nov-82 11:08:23
-- Brenda Hankins 15-Aug-84 16:19:11
DIRECTORY
BodyDefs USING [maxRNameLength, PackedTime, RName, Timestamp],
Buffer USING [AccessHandle, GetBuffer, MakePool],
Heap USING [systemZone],
HeapDefs USING [
HeapReadData, HeapReadRName, HeapEndRead, HeapStartWrite, HeapWriteData,
HeapWriteRName, ObjectNumber, ReaderHandle, ReadRList, WriterHandle],
Inline USING [LongCOPY],
LogDefs USING [DisplayNumber, WriteChar, WriteLogEntry],
LogPrivateDefs USING [startUpTime],
MailboxDefs USING [MBXWrite, Remail, WaitForUnlocked],
NameInfoDefs USING [Close, Enumerate],
PolicyDefs USING [
CheckOperation, EndOperation, Operation, ReadPendingPause, Wait, WaitOperation],
PupDefs USING [
PupAddress, PupBuffer, PupRouterSendThis, SetPupContentsWords,
UniqueLocalPupAddress],
Process USING [Detach, DisableTimeout, Pause],
ProtocolDefs USING [AppendTimestamp, mailServerPollingSocket],
RestartDefs USING [] --EXPORT -- ,
ReturnDefs USING [BadGroup, BadRecipients, LongTerm],
ServerDefs USING [EnumerateAll, ServerHandle],
SiteCacheDefs USING [
FindMBXSite, NeedToRemail, RecipientInfo, RemailInfo, SingleFlush],
SLDefs USING [
GetCount, SLHeader, SLStartRead, SLEndRead, SLWrite, SLQueue, SLReadHandle,
SLTransfer, WaitForNonEmpty],
String USING [AppendChar, AppendDecimal, AppendString, AppendLongDecimal, EquivalentStrings],
Time USING [Current, Packed, Unpack];
ReadInput: MONITOR
IMPORTS
Buffer, Heap, HeapDefs, LogDefs, LogPrivateDefs, Inline, MailboxDefs, NameInfoDefs,
PolicyDefs, PupDefs, Process, ProtocolDefs, ReturnDefs, ServerDefs,
SiteCacheDefs, SLDefs, String, Time
EXPORTS MailboxDefs, RestartDefs =
BEGIN
-- Raised (by Sort and USPS) when a steering list does not start with a
-- complete SLHeader.
BadSL: ERROR = CODE;
-- statistics --
-- Running totals.  Sort adds its per-message counts to these after writing
-- the "Delivered" log entry; ReadInput1 registers all of them except
-- messageTotal with LogDefs.DisplayNumber.
messageTotal, localTotal, remoteTotal, badTotal: LONG CARDINAL ← 0;
foreignTotal, dlTotal, skippedTotal: LONG CARDINAL ← 0;
-- Scratch string for the per-message recipient log line.  Sort resets its
-- length to 0 for each message and NoteRecipient appends to it.
recipientLog: LONG STRING ← Heap.systemZone.NEW[StringBody[150]]; -- about 10 recipients --
NoteRecipient: PROC [name: BodyDefs.RName] =
  BEGIN
  -- Appends " name" to recipientLog as long as it fits while still leaving
  -- room for a trailing " ..." marker.  The first recipient that does not
  -- fit is replaced by a single " ..." marker (if that fits), and once the
  -- marker is present nothing further is appended.  This keeps the marker
  -- unique and last; previously every later oversized name could add
  -- another " ..." and short names could still be appended after it.
  -- NOTE(review): a log that happens to end in three periods (for example
  -- a recipient name ending in "...") is treated as already truncated.
  dot: STRING = "..."L;
  used: CARDINAL = recipientLog.length;
  room: CARDINAL = recipientLog.maxlength;
  truncated: BOOLEAN =
    used >= dot.length AND recipientLog[used - 1] = '. AND
    recipientLog[used - 2] = '. AND recipientLog[used - 3] = '.;
  IF truncated THEN RETURN;
  IF used + 1 + name.length + 1 + dot.length < room THEN
    BEGIN
    String.AppendChar[recipientLog, ' ];
    String.AppendString[recipientLog, name];
    END
  ELSE
    IF used + 1 + dot.length < room THEN
      BEGIN
      String.AppendChar[recipientLog, ' ];
      String.AppendString[recipientLog, dot];
      END;
  END;
-- time-outs --
-- Limits that Sort passes to ElapsedDays: shortTermLimit is applied to the
-- header's "received" time and longTermLimit to its "created" time.
shortTermLimit: CARDINAL = 2 --days-- ;
longTermLimit: CARDINAL = 4 --days-- ;
-- While TRUE, ElapsedDays always answers FALSE.  SeeIfTimeToStartTimeouts
-- clears it once the time since LogPrivateDefs.startUpTime reaches six hours.
timeoutDisabled: BOOLEAN ← TRUE;
-- Pool from which TellOtherServer takes its send buffers; created by ReadInput1.
bufferPool: Buffer.AccessHandle;
-- Source address placed in the packets sent by TellOtherServer.
flushSource: PupDefs.PupAddress = PupDefs.UniqueLocalPupAddress[NIL];
TellOtherServer: PROC [received: BodyDefs.Timestamp, name: BodyDefs.RName] =
  BEGIN
  -- "name" is not local: tell the M-Server that gave us the message to
  -- flush its cache.  The packet is addressed to the net and host found in
  -- "received" and its contents are the string body of "name".
  packet: PupDefs.PupBuffer;
  nameWords: CARDINAL;
  -- A zero net or host means the message came to us from a client, so
  -- there is no server to tell.
  IF received.net = 0 OR received.host = 0 THEN RETURN;
  packet ← Buffer.GetBuffer[type: pup, aH: bufferPool, function: send];
  nameWords ← SIZE[StringBody [name.length]];
  packet.pup.dest ← [
    net: [received.net], host: [received.host],
    socket: ProtocolDefs.mailServerPollingSocket];
  packet.pup.source ← flushSource;
  Inline.LongCOPY[from: name, to: @(packet.pup.pupWords), nwords: nameWords];
  PupDefs.SetPupContentsWords[packet, nameWords];
  packet.pup.pupType ← LOOPHOLE[215B];
  PupDefs.PupRouterSendThis[packet];
  END;
Sort: PROCEDURE [
handle: SLDefs.SLReadHandle, body: HeapDefs.ObjectNumber,
SL: HeapDefs.ReaderHandle] =
BEGIN
-- Despite its name, this procedure has no sorting algorithm as such,
-- since the delivery sites for a message are determined in a single
-- pass through its steering list
-- Outline: reads the SLHeader and then each recipient name from SL.  If
-- the message has timed out (see "reject" below) every recipient is put
-- on badList and the message is given to ReturnDefs.LongTerm.  Otherwise
-- each recipient is given to Deliver, the lists built up are written with
-- SLDefs.SLWrite, bad recipients are reported, and log entries are
-- written.  Always finishes with SLDefs.SLEndRead[handle] and LogFinished.
-- Writer handles for the lists built during this call; NIL until first used.
pendingSL: HeapDefs.WriterHandle ← NIL;
lockedSL: HeapDefs.WriterHandle ← NIL;
badList: HeapDefs.WriterHandle ← NIL;
-- Header read from SL.  Its "server" field is overwritten each time a new
-- list is started, just before the header is copied into that list.
slHeader: SLDefs.SLHeader;
-- Per-message counters, reported in the "Delivered" log entry.
localCount, remoteCount, pendingCount, badCount, lockCount: CARDINAL ← 0;
foreignCount, dlCount, skippedCount: CARDINAL ← 0;
-- Appends recipient to badList, starting the list on first use.
-- Does not change badCount; Deliver counts bad recipients itself.
MakeBad: PROC [recipient: BodyDefs.RName] =
BEGIN
IF badList = NIL THEN badList ← HeapDefs.HeapStartWrite[temp];
HeapDefs.HeapWriteRName[badList, recipient];
END;
-- Appends recipient to lockedSL and counts it.  On first use the list is
-- started with a copy of slHeader whose server field is NIL.
MakeLocked: PROC [recipient: BodyDefs.RName] =
BEGIN
IF lockedSL = NIL THEN
BEGIN
lockedSL ← HeapDefs.HeapStartWrite[SLpending];
slHeader.server ← NIL;
HeapDefs.HeapWriteData[lockedSL, [@slHeader, SIZE[SLDefs.SLHeader]]];
END;
HeapDefs.HeapWriteRName[lockedSL, recipient];
lockCount ← lockCount + 1;
END;
-- Appends recipient to pendingSL and counts it.  On first use the list is
-- started with a copy of slHeader whose server field is NIL.
MakePending: PROC [recipient: BodyDefs.RName] =
BEGIN
IF pendingSL = NIL THEN
BEGIN
pendingSL ← HeapDefs.HeapStartWrite[SLpending];
slHeader.server ← NIL;
HeapDefs.HeapWriteData[pendingSL, [@slHeader, SIZE[SLDefs.SLHeader]]];
END;
HeapDefs.HeapWriteRName[pendingSL, recipient];
pendingCount ← pendingCount + 1;
END;
HaveIDoneThisOne: SIGNAL [new: BodyDefs.RName] RETURNS [BOOLEAN] = CODE;
-- This signal is used to prevent recursive loops when expanding
-- recursively defined DL's. --
-- Each Deliver activation that is expanding a group answers TRUE when
-- "new" is equivalent to the name it is expanding; the catch phrase in the
-- main loop below answers FALSE.
Deliver: PROCEDURE [recipient: BodyDefs.RName] RETURNS [done: BOOLEAN] =
BEGIN -- deliver to one recipient, who may be a group --
-- Always returns done = FALSE.  It is also used as the per-member
-- procedure given to NameInfoDefs.Enumerate and HeapDefs.ReadRList.
DeliverLocal: ENTRY PROC RETURNS [info: SiteCacheDefs.RecipientInfo] =
BEGIN -- part of the cache-flush synchronization arrangements --
-- Classifies the recipient ("ExpressMail↑.ms" is treated as notFound
-- without consulting the site cache) and, inside the monitor, performs
-- the mailbox write for a local recipient.
info ←
IF String.EquivalentStrings[recipient, "ExpressMail↑.ms"] THEN [
notFound[]] ELSE SiteCacheDefs.FindMBXSite[recipient];
WITH info SELECT FROM
local =>
-- MBXWrite returning FALSE puts the recipient on the locked list
-- (presumably because the mailbox is locked; confirm in MailboxDefs).
IF MailboxDefs.MBXWrite[recipient, body, slHeader.created] THEN {
LogDefs.WriteChar['L]; localCount ← localCount + 1}
ELSE {LogDefs.WriteChar['M]; MakeLocked[recipient]};
allDown, notFound => NULL;
-- every other case: the name is not local, so notify the sending server
ENDCASE => TellOtherServer[slHeader.received, recipient];
END;
info: SiteCacheDefs.RecipientInfo = DeliverLocal[];
done ← FALSE;
WITH info SELECT FROM
dl =>
BEGIN
-- Set the current badList aside so that bad members of this group
-- are collected separately and reported through ReturnDefs.BadGroup.
oldBadList: HeapDefs.WriterHandle = badList;
badList ← NIL;
LogDefs.WriteChar['G];
dlCount ← dlCount + 1;
-- Ask the enclosing activations whether this group is already being
-- expanded; if so it is skipped instead of being expanded again.
IF ~(SIGNAL HaveIDoneThisOne[recipient]) THEN
BEGIN
NameInfoDefs.Enumerate[
members, Deliver !
HaveIDoneThisOne =>
IF String.EquivalentStrings[recipient, new] THEN RESUME [TRUE]];
END
ELSE skippedCount ← skippedCount + 1;
IF badList # NIL THEN ReturnDefs.BadGroup[badList, body, recipient];
badList ← oldBadList;
NameInfoDefs.Close[members];
END;
foreign =>
BEGIN
-- Same loop protection as for dl, but the members are read with
-- HeapDefs.ReadRList and badList is not set aside.
LogDefs.WriteChar['X];
IF ~(SIGNAL HaveIDoneThisOne[recipient]) THEN
BEGIN
HeapDefs.ReadRList[
members, Deliver !
HaveIDoneThisOne =>
IF String.EquivalentStrings[recipient, new] THEN RESUME [TRUE]];
END
ELSE skippedCount ← skippedCount + 1;
HeapDefs.HeapEndRead[members];
foreignCount ← foreignCount + 1;
END;
local => NULL -- done by DeliverLocal -- ;
found =>
BEGIN
-- Add the recipient to the list held in server.SL, starting that
-- list with a copy of slHeader naming this server if necessary.
LogDefs.WriteChar['F];
IF server.SL = NIL THEN
BEGIN
server.SL ← HeapDefs.HeapStartWrite[SLforward];
slHeader.server ← server;
HeapDefs.HeapWriteData[server.SL, [@slHeader, SIZE[SLDefs.SLHeader]]];
END;
HeapDefs.HeapWriteRName[server.SL, recipient];
remoteCount ← remoteCount + 1;
END;
allDown => {LogDefs.WriteChar['P]; MakePending[recipient]};
notFound =>
BEGIN
LogDefs.WriteChar['B];
MakeBad[recipient];
badCount ← badCount + 1;
END;
ENDCASE => ERROR;
-- The loop calling this procedure may use lots of processor time:
Process.Pause[1];
END;
BEGIN
-- startTime is used only by LogFinished to report slow messages.
startTime: Time.Packed ← Time.Current[];
-- TRUE when the message has exceeded either time limit.
reject: BOOLEAN;
-- (both are already NIL from their declarations above)
pendingSL ← NIL;
badList ← NIL;
BEGIN
ended: BOOLEAN;
used: CARDINAL;
[ended, used] ← HeapDefs.HeapReadData[SL, [@slHeader, SIZE[SLDefs.SLHeader]]];
IF used # SIZE[SLDefs.SLHeader] THEN ERROR BadSL[];
reject ← ElapsedDays[LOOPHOLE[slHeader.created.time], longTermLimit]
OR ElapsedDays[LOOPHOLE[slHeader.received.time], shortTermLimit];
-- Start a fresh recipient log line for this message.
recipientLog.length ← 0;
String.AppendString[recipientLog, "RecipientLog "L];
ProtocolDefs.AppendTimestamp[recipientLog, slHeader.created];
String.AppendChar[recipientLog, ':];
UNTIL ended DO
recipient: BodyDefs.RName = [BodyDefs.maxRNameLength];
ended ← HeapDefs.HeapReadRName[SL, recipient];
NoteRecipient[recipient];
-- At this outermost level nothing has been expanded yet, so the
-- HaveIDoneThisOne question is answered FALSE.
IF reject THEN MakeBad[recipient]
ELSE [] ← Deliver[recipient ! HaveIDoneThisOne => RESUME [FALSE]];
ENDLOOP;
END;
HeapDefs.HeapEndRead[SL];
IF reject AND badList # NIL THEN
BEGIN
-- Timed-out message: hand all of its recipients to ReturnDefs.LongTerm.
log: LONG STRING ← Heap.systemZone.NEW[StringBody[128]];
ReturnDefs.LongTerm[badList, body];
String.AppendString[log, "Long-term timeout on "L];
ProtocolDefs.AppendTimestamp[log, slHeader.created];
LogDefs.WriteLogEntry[log];
Heap.systemZone.FREE[@log];
END
ELSE
BEGIN
-- Writes out and clears the forwarding list built for one server.
EndServerSL: PROCEDURE [server: ServerDefs.ServerHandle] =
BEGIN -- this is executed inside the ServerInfo monitor --
IF server.SL # NIL THEN
BEGIN
SLDefs.SLWrite[body, server.SL, forward];
server.SL ← NIL;
END;
END;
IF lockedSL # NIL THEN SLDefs.SLWrite[body, lockedSL, mailbox];
IF pendingSL # NIL THEN SLDefs.SLWrite[body, pendingSL, pending];
ServerDefs.EnumerateAll[EndServerSL];
IF badList # NIL THEN ReturnDefs.BadRecipients[badList, body];
-- NOTE(review): foreignCount, dlCount and skippedCount are not part of
-- this condition, so a message that produced only those counts is not
-- logged and not added to the totals.  Whenever the condition holds at
-- least one Add call below sets "called", so the "no recipients" text
-- cannot be appended.
IF pendingCount # 0 OR localCount # 0 OR remoteCount # 0 OR badCount # 0
OR lockCount # 0 THEN
BEGIN
log: LONG STRING ← Heap.systemZone.NEW[StringBody[200]];
called: BOOLEAN ← FALSE;
-- Appends ", <count><text>" to log when count is non-zero.
Add: PROC [count: CARDINAL, text: STRING] =
BEGIN
IF count # 0 THEN
BEGIN
String.AppendString[log, ", "L];
String.AppendDecimal[log, count];
String.AppendString[log, text];
called ← TRUE;
END;
END;
String.AppendString[log, "Delivered "L];
ProtocolDefs.AppendTimestamp[log, slHeader.created];
Add[localCount, " local"L];
Add[remoteCount, " remote"L];
Add[pendingCount, " pending"L];
Add[badCount, " bad"L];
Add[lockCount, " locked"L];
Add[foreignCount, " foreign"L];
Add[dlCount, " DLs"L];
Add[skippedCount, " DLs skipped"L];
IF ~called THEN String.AppendString[log, ", no recipients"L];
LogDefs.WriteLogEntry[log];
Heap.systemZone.FREE[@log];
LogDefs.WriteLogEntry[recipientLog];
-- Fold this message's counts into the running totals.
messageTotal ← messageTotal + 1;
localTotal ← localTotal + localCount;
remoteTotal ← remoteTotal + remoteCount;
badTotal ← badTotal + badCount;
foreignTotal ← foreignTotal + foreignCount;
dlTotal ← dlTotal + dlCount;
skippedTotal ← skippedTotal + skippedCount;
END;
END;
SLDefs.SLEndRead[handle];
LogFinished[slHeader.created, startTime];
END;
END; -- Sort
LogFinished: PROC [stamp: BodyDefs.Timestamp, startTime: Time.Packed] =
  BEGIN
  -- Writes a "Processing <stamp> took <n> seconds" log entry when at least
  -- 30 seconds have passed since "startTime"; otherwise does nothing.
  elapsed: LONG CARDINAL = Time.Current[] - startTime;
  IF elapsed >= 30 THEN
    BEGIN
    entry: LONG STRING ← Heap.systemZone.NEW[StringBody[128]];
    String.AppendString[entry, "Processing "L];
    ProtocolDefs.AppendTimestamp[entry, stamp];
    String.AppendString[entry, " took "L];
    String.AppendLongDecimal[entry, elapsed];
    String.AppendString[entry, " seconds"L];
    LogDefs.WriteLogEntry[entry];
    Heap.systemZone.FREE[@entry];
    END;
  END;
daySecs: LONG CARDINAL = 24 * LONG[60 * 60];
ElapsedDays: PROC [from: BodyDefs.PackedTime, interval: CARDINAL]
  RETURNS [tooMany: BOOLEAN] =
  BEGIN
  -- Answers whether more than "interval" working days have gone by since
  -- "from".  Always FALSE while timeoutDisabled is set.
  -- Every five working days are allowed seven elapsed days; the remaining
  -- (fewer than five) working days are then corrected for weekends.
  saturday: CARDINAL = 5;
  sunday: CARDINAL = 6;
  now: Time.Packed = Time.Current[];
  wholeWeekDays: CARDINAL = (interval / 5) * 7;
  oddDays: CARDINAL = interval MOD 5;
  deadline: LONG CARDINAL ← from + (wholeWeekDays + oddDays) * daySecs;
  deadlineDay: CARDINAL;
  -- Cheap test first: this settles most calls without unpacking a time.
  IF timeoutDisabled OR now < deadline THEN RETURN[FALSE];
  deadlineDay ← Time.Unpack[LOOPHOLE[deadline, Time.Packed]].weekday;
  IF deadlineDay = saturday OR deadlineDay = sunday THEN
    -- With no odd days any time in the weekend is acceptable; otherwise
    -- the deadline is pushed on by two days.
    {IF oddDays # 0 THEN deadline ← deadline + 2 * daySecs}
  ELSE
    -- The odd days reach back across a weekend: add the days lost to it,
    -- two at the most.
    IF deadlineDay < oddDays THEN
      deadline ← deadline + MIN[oddDays - deadlineDay, 2] * daySecs;
  RETURN[now > deadline]
  END;
SeeIfTimeToStartTimeouts: PROCEDURE =
  {
  -- Timeouts stay disabled until six hours have passed since
  -- LogPrivateDefs.startUpTime.
  sixHours: CARDINAL = 6 * 60 * 60;
  timeoutDisabled ← Time.Current[] - LogPrivateDefs.startUpTime < sixHours
  };
-- FALSE while ReadInput2 makes its synchronous start-up passes; ReadInput2
-- sets it TRUE before forking the long-running SortQueue processes.
forever: BOOLEAN ← FALSE;
SortQueue: PROC [queue: SLDefs.SLQueue] =
BEGIN
-- Takes steering lists from "queue" one at a time and gives each to Sort.
-- While "forever" is FALSE it makes a single pass over the entries counted
-- at the start and returns (early if the policy check refuses); once
-- "forever" is TRUE it loops indefinitely.  Only input, pending and mailbox
-- are accepted; any other queue raises ERROR.
-- The input queue claims the sorter as class normal, the others as class
-- other, which ClaimSorter lets go first.
which: Class = SELECT queue FROM input => normal, ENDCASE => other;
operation: PolicyDefs.Operation =
SELECT queue FROM
input => readInput,
pending => readPending,
mailbox => readMailbox,
ENDCASE => ERROR;
DO
IF timeoutDisabled THEN SeeIfTimeToStartTimeouts[];
IF forever THEN SLDefs.WaitForNonEmpty[queue];
-- queue-specific wait before claiming the sorter
SELECT queue FROM
input => NULL;
pending => PolicyDefs.ReadPendingPause[];
mailbox => MailboxDefs.WaitForUnlocked[];
ENDCASE => ERROR;
ClaimSorter[which];
-- The count is taken once, on entry to this loop.
THROUGH [0..SLDefs.GetCount[queue]) DO
SL: HeapDefs.ReaderHandle;
handle: SLDefs.SLReadHandle;
body: HeapDefs.ObjectNumber;
-- The input queue gives up the sorter before every message; the other
-- queues do so only when CheckOperation refuses the operation.
-- NOTE(review): presumably a successful CheckOperation or WaitOperation
-- is what the EndOperation below balances; confirm in PolicyDefs.
IF queue = input OR ~PolicyDefs.CheckOperation[operation] THEN
BEGIN
ReleaseSorter[which];
-- One-shot mode does not wait: it returns (sorter already released)
-- if the operation is still refused.
IF forever THEN PolicyDefs.WaitOperation[operation]
ELSE {IF ~PolicyDefs.CheckOperation[operation] THEN RETURN};
ClaimSorter[which];
END;
[handle, body, SL] ← SLDefs.SLStartRead[queue];
Sort[handle, body, SL];
PolicyDefs.EndOperation[operation];
ENDLOOP;
ReleaseSorter[which];
IF ~forever THEN EXIT;
ENDLOOP;
END;
-- Monitor data guarding the sorter; changed only inside the ENTRY
-- procedures ClaimSorter and ReleaseSorter.
-- otherQueueCount is raised by ClaimSorter and lowered by ReleaseSorter for
-- callers of class other, so it covers both waiting and holding callers.
otherQueueCount: CARDINAL ← 0; -- give priority to non input queues --
waiters: CARDINAL ← 0; -- number of processes wanting the sorter --
sorterInUse: BOOLEAN ← FALSE; -- mutual exclusion on calls of "Sort" --
-- Waited on in ClaimSorter, broadcast by ReleaseSorter; ReadInput1 disables
-- its timeout.
sorterCond: CONDITION;
Class: TYPE = {normal, other}; -- class = other has priority --
ClaimSorter: ENTRY PROC [which: Class] =
  BEGIN
  -- Blocks until the caller may use the sorter, then marks it in use.
  -- A caller of class other needs only a free sorter; a caller of class
  -- normal must also find otherQueueCount at zero.
  favoured: BOOLEAN = (which = other);
  waiters ← waiters + 1;
  IF favoured THEN otherQueueCount ← otherQueueCount + 1;
  UNTIL ~sorterInUse AND (favoured OR otherQueueCount = 0) DO
    WAIT sorterCond;
    ENDLOOP;
  sorterInUse ← TRUE;
  waiters ← waiters - 1;
  END;
ReleaseSorter: ENTRY PROC [which: Class] =
  {
  -- Gives the sorter back and wakes every process waiting in ClaimSorter.
  -- A caller of class other also withdraws its priority claim.
  IF which = other THEN otherQueueCount ← otherQueueCount - 1;
  sorterInUse ← FALSE;
  BROADCAST sorterCond;
  };
USPS: PROC =
  BEGIN
  -- Endless loop that moves messages from the express queue to the input
  -- queue, holding each one until more than an hour has passed since the
  -- time in its "created" stamp.
  hour: CARDINAL = 60*60;
  DO
    expressHandle: SLDefs.SLReadHandle;
    message: HeapDefs.ObjectNumber;
    list: HeapDefs.ReaderHandle;
    header: SLDefs.SLHeader;
    wordsRead: CARDINAL;
    [expressHandle, message, list] ← SLDefs.SLStartRead[express];
    -- Only the header is needed; the recipients are left unread.
    [, wordsRead] ← HeapDefs.HeapReadData[list, [@header, SIZE[SLDefs.SLHeader]]];
    IF wordsRead # SIZE[SLDefs.SLHeader] THEN ERROR BadSL[];
    HeapDefs.HeapEndRead[list];
    -- Delay all Express mail at least an hour
    WHILE (Time.Current[]-header.created.time) <= hour DO
      PolicyDefs.Wait[mins: 1];
      ENDLOOP;
    PolicyDefs.WaitOperation[readExpress];
    SLDefs.SLTransfer[expressHandle, input];
    PolicyDefs.EndOperation[readExpress];
    LogExpressMotion[header.created];
    ENDLOOP;
  END;
LogExpressMotion: PROC [stamp: BodyDefs.Timestamp] =
  {
  -- Logs that the message stamped "stamp" has been moved from the express
  -- queue to the input queue.
  entry: LONG STRING ← Heap.systemZone.NEW[StringBody[128]];
  ProtocolDefs.AppendTimestamp[entry, stamp];
  String.AppendString[entry, " moved from Express to Input Queue"L];
  LogDefs.WriteLogEntry[entry];
  Heap.systemZone.FREE[@entry];
  };
-- The synchronization for sitecache flushing and possible remailing is
-- complicated. The cause of the complication is that "Deliver" might
-- have found someone to be local but not placed the message in the
-- mailbox, at the time that we flush the cache. This is prevented by
-- the entry procedures "DeliverLocal" and "FlushCache".
FlushCacheAndRemail: PUBLIC PROC [who: BodyDefs.RName] =
  BEGIN
  -- Flushes the site cache entry for "who" under the monitor lock, then
  -- asks SiteCacheDefs.NeedToRemail what to do.  Unless the answer is
  -- stillLocal, MailboxDefs.Remail is called, with "valid" TRUE exactly
  -- when the answer is remail.
  FlushCache: ENTRY PROC = BEGIN SiteCacheDefs.SingleFlush[who] END;
  outcome: SiteCacheDefs.RemailInfo;
  FlushCache[];
  outcome ← SiteCacheDefs.NeedToRemail[who];
  IF outcome = stillLocal THEN RETURN;
  MailboxDefs.Remail[who: who, valid: outcome = remail];
  END;
ReadInput1: PUBLIC PROCEDURE =
BEGIN
-- First start-up step: sets up the state that the queue-reading processes
-- rely on.  No processes are started here (see ReadInput2).
-- Processes waiting on sorterCond are woken by the BROADCAST in
-- ReleaseSorter, so the condition's timeout is turned off.
Process.DisableTimeout[@sorterCond];
-- Send-only buffer pool used by TellOtherServer.
bufferPool ← Buffer.MakePool[send: 3, receive: 0];
-- Register the running totals for display (messageTotal is not registered).
LogDefs.DisplayNumber["Local recipients"L, [long[@localTotal]] ];
LogDefs.DisplayNumber["Remote recipients"L, [long[@remoteTotal]] ];
LogDefs.DisplayNumber["Bad recipients"L, [long[@badTotal]] ];
LogDefs.DisplayNumber["Foreign recipients"L, [long[@foreignTotal]] ];
LogDefs.DisplayNumber["DLs expanded"L, [long[@dlTotal]] ];
LogDefs.DisplayNumber["Nested DLs skipped"L, [long[@skippedTotal]] ];
END;
ReadInput2: PUBLIC PROCEDURE =
BEGIN
-- Second start-up step: drains the mailbox and input queues once in the
-- calling process, then starts the permanent background processes.
-- empty queues once synchronously, to get us off the ground
-- ("forever" is still FALSE here, so each call makes one pass and returns)
SortQueue[queue: mailbox];
SortQueue[queue: input];
-- From now on SortQueue loops indefinitely.
forever ← TRUE;
-- One detached process per queue, plus one for the express queue (USPS).
Process.Detach[FORK SortQueue[queue: input]];
Process.Detach[FORK SortQueue[queue: mailbox]];
Process.Detach[FORK SortQueue[queue: pending]];
Process.Detach[FORK USPS[]];
END;
END.
log:
August 84 - blh: Klamath update
15-Aug-84 10:20:13 - blh: disable timeouts for first 6 hours after startup.