-- Transport Mechanism Mail Server - algorithm to process input queue --
-- [Indigo]<Grapevine>MS>ReadInput.mesa
-- Randy Gobbel 19-May-81 23:20:09 --
-- Andrew Birrell 26-Oct-82 14:28:08 --
-- Mike Schroeder 18-Nov-82 11:18:50 --
DIRECTORY
BodyDefs USING[ maxRNameLength, PackedTime, RName, Timestamp ],
HeapDefs USING[ HeapReadData, HeapReadRName, HeapEndRead,
HeapStartWrite, HeapWriteData, HeapWriteRName,
ObjectNumber, ReaderHandle, ReadRList, WriterHandle ],
Inline USING[ LongCOPY ],
LogDefs USING[ --DisplayNumber,-- WriteChar, WriteLogEntry ],
MailboxDefs USING[ MBXWrite, Remail, WaitForUnlocked ],
NameInfoDefs USING[ Close, Enumerate ],
PolicyDefs USING[ CheckOperation, EndOperation, ExpressAllowed, Operation,
ReadPendingPause, WaitOperation],
PupDefs USING[ GetFreePupBuffer, PupAddress, PupBuffer,
PupRouterSendThis, SetPupContentsWords,
UniqueLocalPupAddress ],
Process USING[ Detach, DisableTimeout, Pause ],
ProtocolDefs USING[ AppendTimestamp, mailServerPollingSocket ],
RestartDefs USING[] --EXPORT only--,
ReturnDefs USING[ BadGroup, BadRecipients, LongTerm ],
ServerDefs USING[ EnumerateAll, ServerHandle ],
SiteCacheDefs USING[ FindMBXSite, NeedToRemail, RecipientInfo, RemailInfo, SingleFlush ],
SLDefs USING[ GetCount, SLHeader, SLStartRead, SLEndRead, SLWrite,
SLQueue, SLReadHandle, SLTransfer, WaitForNonEmpty ],
String USING[ AppendChar, AppendDecimal, AppendString,
EquivalentStrings ],
Time USING[ Current, Packed, Unpack];
ReadInput: MONITOR
IMPORTS HeapDefs, LogDefs, Inline, MailboxDefs, NameInfoDefs, PolicyDefs,
PupDefs, Process, ProtocolDefs, ReturnDefs, ServerDefs,
SiteCacheDefs, SLDefs, String, Time
EXPORTS MailboxDefs, RestartDefs --PROGRAM-- =
BEGIN
-- statistics --
-- Running totals. They are only updated by "Sort", at the point where it --
-- writes a "Delivered" log entry for a message. --
-- number of messages for which such a log entry was written --
messageTotal: CARDINAL ← 0;
-- sum of the per message localCount values (successful MBXWrite calls) --
localTotal: CARDINAL ← 0;
-- sum of the per message remoteCount values (names put on a server's list) --
remoteTotal: CARDINAL ← 0;
-- sum of the per message badCount values (names whose lookup gave notFound) --
badTotal: CARDINAL ← 0;
-- stats: -- recipientLog: STRING = [150]; -- about 10 recipients --
-- stats: --
-- stats: -- NoteRecipient: PROC[name: BodyDefs.RName] =
-- stats: --   -- Adds " name" to recipientLog while there is still room for it --
-- stats: --   -- plus a later " ..."; failing that adds " ..." if that fits; --
-- stats: --   -- otherwise leaves recipientLog unchanged. --
-- stats: --   BEGIN
-- stats: --   ellipsis: STRING = "..."L;
-- stats: --   afterBlank: CARDINAL = recipientLog.length + 1;
-- stats: --   tail: STRING ← NIL; -- what to append, if anything --
-- stats: --   IF afterBlank + name.length + 1 + ellipsis.length
-- stats: --        < recipientLog.maxlength
-- stats: --   THEN tail ← name
-- stats: --   ELSE IF afterBlank + ellipsis.length < recipientLog.maxlength
-- stats: --        THEN tail ← ellipsis;
-- stats: --   IF tail # NIL
-- stats: --   THEN BEGIN
-- stats: --        String.AppendChar[recipientLog, ' ];
-- stats: --        String.AppendString[recipientLog, tail];
-- stats: --        END;
-- stats: --   END;
-- time-outs --
-- Both limits are counted in working days by "ElapsedDays". --
-- "Sort" applies shortTermLimit to the time in slHeader.received --
shortTermLimit: CARDINAL = 2--days--;
-- "Sort" applies longTermLimit to the time in slHeader.created --
longTermLimit: CARDINAL = 4--days--;
-- source address placed in the Pup that "TellOtherServer" sends --
flushSource: PupDefs.PupAddress = PupDefs.UniqueLocalPupAddress[NIL];
TellOtherServer: PROC[received: BodyDefs.Timestamp,
                      name: BodyDefs.RName] =
  -- "name" was non-local, so tell the M-Server that gave us the message --
  -- to flush its cache. The request is a single Pup, addressed to the --
  -- net and host found in "received", whose contents are the name. --
  BEGIN
  -- a zero net or host: the message came to us from a client --
  IF received.net = 0 OR received.host = 0 THEN RETURN;
  BEGIN
  pup: PupDefs.PupBuffer = PupDefs.GetFreePupBuffer[];
  -- words occupied by the string body of "name" --
  words: CARDINAL = SIZE[StringBody[name.length]];
  pup.dest ← [ net:[received.net],
               host:[received.host],
               socket: ProtocolDefs.mailServerPollingSocket ];
  pup.source ← flushSource;
  Inline.LongCOPY[from: name, to: @(pup.pupWords), nwords: words];
  PupDefs.SetPupContentsWords[pup, words];
  pup.pupType ← LOOPHOLE[215]; -- to get around difference in Pilot/Alto PupTypes
  PupDefs.PupRouterSendThis[pup];
  END;
  END;
-- Sort: process one steering list taken from a queue. --
-- handle: read handle of the queue entry; given to SLDefs.SLEndRead at the end. --
-- body: object number of the message body; passed to every mailbox write, --
-- ReturnDefs call and SLDefs.SLWrite made here. --
-- SL: reader for the steering list, which is read as an SLHeader followed --
-- by recipient names; closed here with HeapDefs.HeapEndRead. --
Sort: PROCEDURE[handle: SLDefs.SLReadHandle,
body: HeapDefs.ObjectNumber,
SL: HeapDefs.ReaderHandle] =
BEGIN
-- Despite its name, this procedure has no sorting algorithm as such,
-- since the delivery sites for a message are determined in a single
-- pass through its steering list
-- The three lists below are each started lazily, the first time a --
-- recipient needs them. --
-- recipients whose lookup gave allDown; written to the pending queue --
pendingSL: HeapDefs.WriterHandle ← NIL;
-- recipients for which MBXWrite returned FALSE; written to the mailbox queue --
lockedSL: HeapDefs.WriterHandle ← NIL;
-- names handed to ReturnDefs (BadGroup, BadRecipients or LongTerm) --
badList: HeapDefs.WriterHandle ← NIL;
-- header read from SL; also written, with its server field changed, at --
-- the front of each new steering list --
slHeader: SLDefs.SLHeader;
-- per message counters, reported in the log entry and added to the totals --
localCount: CARDINAL ← 0;
remoteCount: CARDINAL ← 0;
pendingCount: CARDINAL ← 0;
badCount: CARDINAL ← 0;
lockCount: CARDINAL ← 0;
-- Appends recipient to badList, starting the list if necessary. --
-- Always returns FALSE. Does not touch badCount. --
MakeBad: PROC[recipient: BodyDefs.RName] RETURNS[done: BOOLEAN] =
BEGIN
done ← FALSE;
IF badList = NIL THEN badList ← HeapDefs.HeapStartWrite[temp];
HeapDefs.HeapWriteRName[badList, recipient];
END;
-- Appends recipient to lockedSL and counts it. A new list begins with a --
-- copy of slHeader whose server field has been set to NIL. --
MakeLocked: PROC[recipient: BodyDefs.RName] =
BEGIN
IF lockedSL = NIL
THEN BEGIN
lockedSL ← HeapDefs.HeapStartWrite[SLpending];
slHeader.server ← NIL;
HeapDefs.HeapWriteData[lockedSL,
[@slHeader,SIZE[SLDefs.SLHeader]]];
END;
HeapDefs.HeapWriteRName[lockedSL, recipient];
lockCount ← lockCount + 1;
END;
-- Appends recipient to pendingSL and counts it. A new list begins with a --
-- copy of slHeader whose server field has been set to NIL. --
MakePending: PROC[recipient: BodyDefs.RName] =
BEGIN
IF pendingSL = NIL
THEN BEGIN
pendingSL ← HeapDefs.HeapStartWrite[SLpending];
slHeader.server ← NIL;
HeapDefs.HeapWriteData[pendingSL,
[@slHeader,SIZE[SLDefs.SLHeader]]];
END;
HeapDefs.HeapWriteRName[pendingSL,recipient];
pendingCount ← pendingCount + 1;
END;
HaveIDoneThisOne: SIGNAL[new: BodyDefs.RName] RETURNS[ BOOLEAN ] = CODE;
-- This signal is used to prevent recursive loops when expanding
-- recursively defined DL's. --
-- Protocol as used below: before expanding a name, Deliver raises the --
-- signal with that name. An enclosing expansion of an equivalent name --
-- resumes it with TRUE; the handler in the main loop resumes it with FALSE. --
-- Deliver: look one recipient up and act on the result. For the dl and --
-- foreign results it is applied in turn to each member. Always returns FALSE. --
Deliver: PROCEDURE[recipient: BodyDefs.RName] RETURNS[done: BOOLEAN] =
BEGIN -- deliver to one recipient, who may be a group --
-- ENTRY procedure: the lookup and, for a local recipient, the mailbox --
-- write are both done while holding the monitor lock. --
DeliverLocal: ENTRY PROC RETURNS[info: SiteCacheDefs.RecipientInfo] =
BEGIN
-- part of the cache-flush synchronization arrangements --
-- the name "ExpressMail↑.ms" is not looked up; it is treated as notFound --
info ← IF String.EquivalentStrings[recipient, "ExpressMail↑.ms"]
THEN [notFound[]]
ELSE SiteCacheDefs.FindMBXSite[recipient];
WITH info SELECT FROM
local =>
-- log character L for a successful write, M when the name goes on lockedSL --
IF MailboxDefs.MBXWrite[recipient, body, slHeader.created]
THEN { LogDefs.WriteChar['L]; localCount ← localCount + 1 }
ELSE { LogDefs.WriteChar['M]; MakeLocked[recipient] };
allDown, notFound => NULL;
-- every other result: tell the server named in slHeader.received --
ENDCASE => TellOtherServer[slHeader.received, recipient];
END;
info: SiteCacheDefs.RecipientInfo = DeliverLocal[];
done ← FALSE;
WITH info SELECT FROM
dl =>
BEGIN
-- Bad members of this group are collected on a fresh badList and handed --
-- to ReturnDefs.BadGroup; the previous badList is restored afterwards. --
oldBadList: HeapDefs.WriterHandle = badList;
badList ← NIL;
LogDefs.WriteChar['G];
-- Only enumerate the members if no enclosing expansion claims this name. --
-- During the enumeration, claim our own name for nested calls. --
IF NOT (SIGNAL HaveIDoneThisOne[recipient])
THEN NameInfoDefs.Enumerate[members, Deliver !
HaveIDoneThisOne =>
IF String.EquivalentStrings[recipient, new]
THEN RESUME[TRUE] ];
IF badList # NIL
THEN ReturnDefs.BadGroup[badList, body, recipient];
badList ← oldBadList;
NameInfoDefs.Close[members];
END;
foreign =>
BEGIN
-- same loop protection as for dl, but the members come from an RList --
-- reader and badList is not switched --
LogDefs.WriteChar['X];
IF NOT (SIGNAL HaveIDoneThisOne[recipient])
THEN HeapDefs.ReadRList[members, Deliver !
HaveIDoneThisOne =>
IF String.EquivalentStrings[recipient, new]
THEN RESUME[TRUE] ];
HeapDefs.HeapEndRead[members];
END;
local => NULL -- done by DeliverLocal --;
found =>
BEGIN
-- add the name to the steering list kept in the server record, starting --
-- that list (with slHeader.server set to this server) if necessary; --
-- EndServerSL below writes these lists out --
LogDefs.WriteChar['F];
IF server.SL = NIL
THEN BEGIN
server.SL ← HeapDefs.HeapStartWrite[SLforward];
slHeader.server ← server;
HeapDefs.HeapWriteData[server.SL,
[@slHeader,SIZE[SLDefs.SLHeader]]];
END;
HeapDefs.HeapWriteRName[server.SL,recipient];
remoteCount ← remoteCount + 1;
END;
allDown =>
BEGIN
LogDefs.WriteChar['P];
MakePending[recipient];
END;
notFound =>
BEGIN
LogDefs.WriteChar['B];
[] ← MakeBad[recipient];
badCount ← badCount + 1;
END;
ENDCASE => ERROR;
-- The loop calling this procedure may use lots of processor time --
Process.Pause[1];
END;
BEGIN
-- TRUE when the whole message has timed out --
reject: BOOLEAN;
-- reset the per message state (lockedSL keeps its declared value, NIL) --
pendingSL ← NIL;
badList ← NIL;
localCount ← remoteCount ← pendingCount ← badCount ← lockCount ← 0;
BEGIN
BadSL: ERROR = CODE;
ended: BOOLEAN;
used: CARDINAL;
-- read the header; anything other than a complete SLHeader is an error --
[ended,used] ←
HeapDefs.HeapReadData[SL, [@slHeader,SIZE[SLDefs.SLHeader]] ];
IF used # SIZE[SLDefs.SLHeader] THEN ERROR BadSL[];
-- reject if created more than longTermLimit, or received more than --
-- shortTermLimit, working days ago --
reject ← ElapsedDays[slHeader.created.time, longTermLimit]
OR ElapsedDays[slHeader.received.time, shortTermLimit];
-- stats: -- recipientLog.length ← 0;
-- stats: -- String.AppendString[recipientLog, "RecipientLog "L];
-- stats: -- ProtocolDefs.AppendTimestamp[recipientLog, slHeader.created];
-- stats: -- String.AppendChar[recipientLog, ':];
-- one recipient name per iteration, until the reader reports the end --
UNTIL ended
DO recipient: BodyDefs.RName = [BodyDefs.maxRNameLength];
ended ← HeapDefs.HeapReadRName[SL, recipient];
-- stats: -- NoteRecipient[recipient];
-- a rejected message puts every name on badList; otherwise deliver, --
-- answering FALSE to HaveIDoneThisOne if no expansion has claimed the name --
IF reject
THEN [] ← MakeBad[recipient]
ELSE [] ← Deliver[recipient !
HaveIDoneThisOne => RESUME[FALSE] ];
ENDLOOP;
END;
HeapDefs.HeapEndRead[SL];
IF reject AND badList # NIL
THEN BEGIN
-- timed out: give all the names to ReturnDefs.LongTerm and log it --
ReturnDefs.LongTerm[badList, body];
BEGIN
log: STRING = [128];
String.AppendString[log, "Long-term timeout on "L];
ProtocolDefs.AppendTimestamp[log, slHeader.created];
LogDefs.WriteLogEntry[log];
END;
END
ELSE BEGIN
-- write out whichever steering lists were started --
IF lockedSL # NIL
THEN BEGIN
SLDefs.SLWrite[body, lockedSL, mailbox];
END;
IF pendingSL # NIL
THEN BEGIN
SLDefs.SLWrite[body, pendingSL, pending];
END;
BEGIN
-- called by ServerDefs.EnumerateAll for each server: writes the server's --
-- list, if any, to the foreign or forward queue and clears server.SL --
EndServerSL: PROCEDURE[server: ServerDefs.ServerHandle] =
BEGIN
-- this is executed inside the ServerInfo monitor --
IF server.SL # NIL
THEN BEGIN
SLDefs.SLWrite[body, server.SL,
IF server.type = foreign THEN foreign ELSE forward];
server.SL ← NIL;
END;
END;
ServerDefs.EnumerateAll[EndServerSL];
END;
IF badList # NIL
THEN ReturnDefs.BadRecipients[badList, body];
-- Log and count the message, except when pendingCount is non-zero and --
-- every other counter is zero. --
IF pendingCount = 0
OR localCount # 0 OR remoteCount # 0 OR badCount # 0
OR lockCount # 0
THEN BEGIN
log: STRING = [140];
-- set once any counter has been added to the entry --
called: BOOLEAN ← FALSE;
-- appends ", " then count then text to the entry, if count is non-zero --
Add: PROC[count: CARDINAL, text: STRING] =
BEGIN
IF count # 0
THEN BEGIN
String.AppendString[log, ", "L];
String.AppendDecimal[log, count];
String.AppendString[log, text];
called ← TRUE;
END;
END;
String.AppendString[log, "Delivered "L];
ProtocolDefs.AppendTimestamp[log, slHeader.created];
Add[localCount, " local"L];
Add[remoteCount, " remote"L];
Add[pendingCount, " pending"L];
Add[badCount, " bad"L];
Add[lockCount, " locked"L];
IF NOT called
THEN String.AppendString[log, ", no recipients"L];
LogDefs.WriteLogEntry[log];
-- stats: -- LogDefs.WriteLogEntry[recipientLog];
messageTotal ← messageTotal + 1;
localTotal ← localTotal + localCount;
remoteTotal ← remoteTotal + remoteCount;
badTotal ← badTotal + badCount;
END;
END;
-- finished with the queue entry, on both the reject and the normal path --
SLDefs.SLEndRead[handle];
END;
END;
daySecs: LONG CARDINAL = 24 * LONG[60*60]; -- seconds in one day --

ElapsedDays: PROC[from: BodyDefs.PackedTime, interval: CARDINAL]
  RETURNS[tooMany: BOOLEAN] =
  -- TRUE if more than "interval" working days have gone by since "from". --
  -- Every full five working days are allowed seven elapsed days; what is --
  -- left over (under five days) is corrected for weekends below. --
  BEGIN
  saturday: CARDINAL = 5;
  sunday: CARDINAL = 6;
  now: Time.Packed = Time.Current[];
  wholeWeekDays: CARDINAL = (interval/5)*7;
  oddDays: CARDINAL = interval MOD 5;
  -- deadline before any weekend correction --
  deadline: LONG CARDINAL ← from + (wholeWeekDays+oddDays) * daySecs;
  IF now < deadline THEN RETURN[FALSE]; -- optimize most calls --
  BEGIN
  deadlineDay: CARDINAL = Time.Unpack[LOOPHOLE[deadline,Time.Packed]].weekday;
  IF deadlineDay = saturday OR deadlineDay = sunday
  THEN BEGIN
       -- with no odd days any time in the weekend is ok; otherwise the --
       -- deadline is put back by two days --
       IF oddDays # 0 THEN deadline ← deadline + 2 * daySecs;
       END
  ELSE BEGIN
       -- other days: extend by the shortfall, but by two days at most --
       IF deadlineDay < oddDays
       THEN deadline ← deadline + MIN[oddDays-deadlineDay,2] * daySecs;
       END;
  END;
  RETURN[ now > deadline ]
  END;
-- FALSE while the start code makes its synchronous passes over the queues; --
-- the start code sets it TRUE before forking the queue processes, after --
-- which SortQueue never returns. --
forever: BOOLEAN ← FALSE;
-- a pass over the pending queue that has run for longer than this gives up --
-- the sorter if another process is waiting for it --
readPendingRunningTime: LONG CARDINAL ← 10 * 60; --seconds--
-- SortQueue: feed the entries of one queue (input, pending or mailbox) to --
-- "Sort", holding the sorter while doing so. Each pass handles at most the --
-- number of entries counted when the pass begins. With forever = FALSE one --
-- pass is made, and the procedure returns early if CheckOperation refuses; --
-- with forever = TRUE passes repeat, each preceded by WaitForNonEmpty. --
SortQueue: PROC[queue: SLDefs.SLQueue] =
BEGIN
-- the input queue is class normal, any other queue is class other --
which: Class = SELECT queue FROM
input => normal,
ENDCASE => other;
-- the PolicyDefs operation that brackets each call of "Sort" --
operation: PolicyDefs.Operation = SELECT queue FROM
input => readInput,
pending => readPending,
mailbox => readMailbox,
ENDCASE => ERROR;
-- time after which a pass over the pending queue may be cut short --
DO exhaustion: LONG CARDINAL;
IF forever THEN SLDefs.WaitForNonEmpty[queue];
-- queue specific wait before claiming the sorter --
SELECT queue FROM
input => NULL;
pending => PolicyDefs.ReadPendingPause[];
mailbox => MailboxDefs.WaitForUnlocked[];
ENDCASE => ERROR;
ClaimSorter[which];
exhaustion ← Time.Current[] + readPendingRunningTime;
THROUGH [1..SLDefs.GetCount[queue]]
DO SL: HeapDefs.ReaderHandle;
handle: SLDefs.SLReadHandle;
body: HeapDefs.ObjectNumber;
-- Class normal always gives up the sorter before each entry, without --
-- trying CheckOperation here. Class other gives it up only when --
-- CheckOperation returns FALSE. The sorter is not held during the wait. --
IF which # other OR NOT PolicyDefs.CheckOperation[operation]
THEN BEGIN
ReleaseSorter[which];
IF forever
THEN PolicyDefs.WaitOperation[operation]
ELSE { IF NOT PolicyDefs.CheckOperation[operation] THEN RETURN };
ClaimSorter[which];
END;
[handle, body, SL] ← SLDefs.SLStartRead[queue];
Sort[handle, body, SL];
PolicyDefs.EndOperation[operation];
-- after an input entry, wake the process waiting in CheckExpress --
IF queue = input THEN ConsiderExpress[];
-- cut a long pass over the pending queue short if someone is waiting --
IF queue = pending
AND Time.Current[] > exhaustion
AND waiters # 0
THEN EXIT;
ENDLOOP;
ReleaseSorter[which];
IF NOT forever THEN EXIT;
ENDLOOP;
END;
-- State used by ClaimSorter and ReleaseSorter. It is only changed inside --
-- those ENTRY procedures; SortQueue also reads "waiters" directly. --
otherQueueCount: CARDINAL ← 0; -- give priority to non input queues --
waiters: CARDINAL ← 0; -- number of processes wanting the sorter --
sorterInUse: BOOLEAN ← FALSE; -- mutual exclusion on calls of "Sort" --
-- waited on in ClaimSorter, broadcast in ReleaseSorter; the start code --
-- calls Process.DisableTimeout on it --
sorterCond: CONDITION;
Class: TYPE = { normal, other }; -- class = other has priority --
ClaimSorter: ENTRY PROC[which: Class] =
  -- Waits until the caller may take the sorter, then marks it in use. --
  -- A claimant of class "other" is counted in otherQueueCount from now --
  -- until its ReleaseSorter; a "normal" claimant waits while that count --
  -- is non-zero. --
  BEGIN
  waiters ← waiters+1;
  IF which = other THEN otherQueueCount ← otherQueueCount+1;
  UNTIL NOT sorterInUse
        AND ( which = other OR otherQueueCount = 0 )
    DO WAIT sorterCond ENDLOOP;
  sorterInUse ← TRUE;
  waiters ← waiters-1;
  END;
ReleaseSorter: ENTRY PROC[which: Class] =
  -- Gives the sorter up and wakes every process waiting in ClaimSorter. --
  -- For class "other" this also withdraws the claim counted by ClaimSorter. --
  BEGIN
  sorterInUse ← FALSE;
  SELECT which FROM
    other => otherQueueCount ← otherQueueCount-1;
    ENDCASE => NULL;
  BROADCAST sorterCond;
  END;
-- waited on by CheckExpress, notified by ConsiderExpress --
pony: CONDITION ← [timeout:0];

USPS: PROC =
  -- Endless loop moving entries from the express queue to the input queue, --
  -- at most one per iteration, and only while PolicyDefs.ExpressAllowed --
  -- accepts the current length of the input queue. --
  BEGIN
  DO
    SLDefs.WaitForNonEmpty[express];
    CheckExpress[];
    PolicyDefs.WaitOperation[readExpress];
    -- test again: CheckExpress was satisfied before the operation was granted --
    IF PolicyDefs.ExpressAllowed[SLDefs.GetCount[input]]
    THEN BEGIN
         entry: SLDefs.SLReadHandle;
         message: HeapDefs.ObjectNumber;
         reader: HeapDefs.ReaderHandle;
         [entry, message, reader] ← SLDefs.SLStartRead[express];
         -- the steering list itself is not read here --
         HeapDefs.HeapEndRead[reader];
         SLDefs.SLTransfer[entry, input];
         END;
    PolicyDefs.EndOperation[readExpress];
    ENDLOOP;
  END;
CheckExpress: ENTRY PROC = INLINE
  -- Returns once PolicyDefs.ExpressAllowed accepts the current length of --
  -- the input queue; until then waits on "pony". --
  BEGIN
  WHILE NOT PolicyDefs.ExpressAllowed[SLDefs.GetCount[input]]
    DO WAIT pony ENDLOOP;
  END;
ConsiderExpress: ENTRY PROC = INLINE
  -- Wakes a process waiting in CheckExpress so that it tests its condition again. --
  BEGIN
  NOTIFY pony;
  END;
-- The synchronization for sitecache flushing and possible remailing is
-- complicated. The cause of the complication is that "Deliver" might
-- have found someone to be local but not placed the message in the
-- mailbox, at the time that we flush the cache. This is prevented by
-- the entry procedures "DeliverLocal" and "FlushCache".
FlushCacheAndRemail: PUBLIC PROC[ who: BodyDefs.RName ] =
  -- Flushes the site cache entry for "who" and then, unless the site cache --
  -- reports the name as still local, has the mailbox contents remailed. --
  BEGIN
  -- ENTRY procedure, so the flush is done while holding the monitor lock --
  FlushCache: ENTRY PROC =
    BEGIN
    SiteCacheDefs.SingleFlush[who];
    END;
  FlushCache[];
  BEGIN
  verdict: SiteCacheDefs.RemailInfo = SiteCacheDefs.NeedToRemail[who];
  IF verdict # stillLocal
  THEN MailboxDefs.Remail[who: who, valid: verdict = remail];
  END;
  END;
-- StartLog: called once by the start code. The body is currently empty: --
-- the three DisplayNumber calls below are commented out, as is --
-- DisplayNumber in the LogDefs entry of the DIRECTORY. --
StartLog: PROCEDURE =
BEGIN
-- LogDefs.DisplayNumber["Local recipients"L, [short[@localTotal]] ];
-- LogDefs.DisplayNumber["Remote recipients"L, [short[@remoteTotal]] ];
-- LogDefs.DisplayNumber["Bad recipients"L, [short[@badTotal]] ];
END;
-- Module start code. --
-- waits on sorterCond are to have no timeout --
Process.DisableTimeout[@sorterCond];
StartLog[];
-- empty queues once synchronously, to get us off the ground --
-- ("forever" is still FALSE, so each call makes a single pass and returns) --
SortQueue[queue: mailbox];
SortQueue[queue: input];
-- from here on SortQueue loops indefinitely --
forever ← TRUE;
-- one detached process for each queue, plus USPS for the express queue --
Process.Detach[ FORK SortQueue[queue: input] ];
Process.Detach[ FORK SortQueue[queue: mailbox] ];
Process.Detach[ FORK SortQueue[queue: pending] ];
Process.Detach[ FORK USPS[] ];
END.