TapFilterImpl.mesa
Copyright © 1989 by Xerox Corporation. All rights reserved.
Doug Terry, July 3, 1990 3:45:58 pm PDT
Theimer, March 1, 1990 2:10 pm PST
Brian Oki, March 3, 1991 1:19 pm PST
Sabel, August 16, 1990 3:58 pm PDT
Commands for adding filters, etc.
DIRECTORY
Ascii USING [Lower],
Atom USING [MakeAtom],
IO USING [PutFR, rope, STREAM, RIS],
LoganBerry USING [Close, Cursor, DeleteEntry, EndGenerate, Entry, Error, GenerateEntries, NextEntry, nullDB, Open, OpenDB, ReadEntry, WriteEntry],
LoganBerryEntry USING [AppendEntries, GetAttr],
LoganQuery USING [AttributePatterns, ComplexCursor, FilterEntries, MatchProc, NextEntry, ReadAttributePatterns, SyntaxError],
PatternMatch USING [DWIM, Lookup],
Process USING [Detach, SecondsToTicks, SetTimeout, Yield],
Rope USING [Cat, Equal, Fetch, Length, ROPE, SkipOver, SkipTo, Substr, Translate, TranslatorType],
TapMsgQueue USING [CursorFromMsgQueue, Duplicate, Get, Msg, MsgField, MsgFromEntry, MsgQueue],
TapFilter;
TapFilterImpl: CEDAR MONITOR
IMPORTS Ascii, Atom, IO, LoganBerry, LoganBerryEntry, LoganQuery, PatternMatch, Process, Rope, TapMsgQueue
EXPORTS TapFilter
~
BEGIN
-- Short local names for types declared in the Rope, IO, TapMsgQueue and TapFilter interfaces.
ROPE: TYPE ~ Rope.ROPE;
STREAM: TYPE ~ IO.STREAM;
Msg: TYPE ~ TapMsgQueue.Msg;
MsgQueue: TYPE ~ TapMsgQueue.MsgQueue;
-- The following four are the client-visible types of the exported TapFilter interface.
Annotation: TYPE ~ TapFilter.Annotation;
Query: TYPE ~ TapFilter.Query;
Agent: TYPE ~ TapFilter.Agent;
MonitorProc: TYPE ~ TapFilter.MonitorProc;
Private types:
-- Private per-agent state. GetAgentInfo narrows a client's Agent handle to an AgentPtr.
AgentPtr: TYPE ~ REF AgentRec;
AgentRec:
TYPE ~
RECORD [
filters: Filters ← NIL, -- filters in use; CreateAgent and AddFilter push new ones on the front
feeder: MsgQueue ← NIL, -- message queue handed to CreateAgent
feederClone: MsgQueue ← NIL, -- queue that is duplicated for each new filter and drained by FilteringAgent
user: ROPE ← NIL, -- user name given to CreateAgent
filterDBName: ROPE ← NIL, -- name of the database the filters were read from
annotationDBName: ROPE ← NIL, -- name of the database annotations are written to
annotationDB: LoganBerry.OpenDB ← LoganBerry.nullDB, -- handle passed to AnnotateMsgI by FilteringAgent
mp: MonitorProc ← NIL, -- optional callback installed by MonitorAgent; consulted before each annotation
wakeup: CONDITION, -- used to wakeup agent
sleeping: CONDITION, -- used to tell others that agent is about to sleep
idle: BOOLEAN ← TRUE, -- set TRUE by Sleep before waiting and FALSE again after waking
terminated: BOOLEAN ← FALSE -- set by TerminateAgent; FilteringAgent's main loop exits when it is TRUE
];
agentSleepTimeout: CARDINAL ← 600; -- seconds between automatic wakeups
-- One FilterRec exists for every filter an agent is running.
Filters: TYPE ~ LIST OF Filter;
Filter: TYPE ~ REF FilterRec;
FilterRec:
TYPE ~
RECORD [
cursor: LoganQuery.ComplexCursor, -- built by SetUpQuery; yields the messages that match the query
feeder: MsgQueue, -- this filter's own duplicate of the agent's message queue
filterID: ROPE, -- value of the filter's $FID attribute (AddFilter builds it as user$filterName)
query: Query, -- query text, parsed by LoganQuery.ReadAttributePatterns
note: Annotation -- annotation attached to every message that passes the filter
];
-- Raised by this module for its own failures ($BadAgent, $BadFilter, $NonExistentFilter);
-- most public procedures also re-raise LoganBerry.Error as this error with the same ec and explanation.
Error: PUBLIC ERROR [ec: ATOM, explanation: ROPE ← NIL] = CODE;
Agents
Agents process a queue of messages by running them through a set of filters. If a message "passes" a particular filter, then the annotation associated with that filter is attached to the message. Once an agent is created, it exists until it is terminated. When an agent discovers that there are no more messages to be processed, it goes to sleep, i.e. into the idle state. An agent may periodically wake up and look for more messages, or it may be explicitly awakened.
GetAgentInfo: PROC [agent: Agent] RETURNS [ap: AgentPtr] ~ {
  -- Recovers the private agent record behind a client's Agent handle.
  -- Raises Error[$BadAgent] when the handle does not refer to an AgentRec.
  WITH agent SELECT FROM
    info: AgentPtr => ap ← info;
    ENDCASE => ERROR Error[$BadAgent, "Invalid agent handle."];
  };
CreateAgent: PUBLIC PROC [feeder: MsgQueue, filterDB: ROPE, user: ROPE, annotDB: ROPE] RETURNS [agent: Agent] ~ {
  -- Builds a filtering agent for the given user and starts its background process.
  -- The agent reads messages from feeder, takes its filters from the database named
  -- filterDB, and writes the annotations it generates to the database named annotDB.
  -- LoganBerry errors are re-raised as Error with the same code and explanation;
  -- a filter entry without a query raises Error[$BadFilter].
  ENABLE LoganBerry.Error => ERROR Error[ec, explanation];
  filterBase: LoganBerry.OpenDB;
  scan: LoganBerry.Cursor;

  -- Allocate and fill in the agent record.
  info: AgentPtr ← NEW[AgentRec];
  info.feeder ← feeder;
  info.user ← user;
  info.filterDBName ← filterDB;
  info.annotationDBName ← annotDB;
  agent ← info;

  -- Build one filter pipeline per entry of the filter database.
  filterBase ← LoganBerry.Open[dbName: info.filterDBName];
  scan ← LoganBerry.GenerateEntries[db: filterBase, key: $FID]; -- enumerate the complete database
  info.feederClone ← info.feeder;
  info.filters ← NIL;
  FOR e: LoganBerry.Entry ← LoganBerry.NextEntry[cursor: scan], LoganBerry.NextEntry[cursor: scan] UNTIL e = NIL DO
    f: Filter ← NEW[FilterRec];
    [fid: f.filterID, query: f.query, note: f.note] ← ExtractFilterInfo[e];
    IF f.query = NIL THEN
      ERROR Error[$BadFilter, IO.PutFR["No query specified in filter %g.", IO.rope[f.filterID]]];
    -- Every filter reads from its own duplicate of the message queue.
    [info.feederClone, f.feeder] ← TapMsgQueue.Duplicate[queue: info.feederClone];
    f.cursor ← SetUpQuery[f.query, f.feeder];
    info.filters ← CONS[f, info.filters];
    ENDLOOP;
  -- Failures while releasing the cursor and database are deliberately ignored.
  LoganBerry.EndGenerate[cursor: scan ! LoganBerry.Error => CONTINUE];
  LoganBerry.Close[db: filterBase ! LoganBerry.Error => CONTINUE];

  -- Open and close the annotation database to ensure it exists.
  info.annotationDB ← LoganBerry.Open[dbName: info.annotationDBName];
  LoganBerry.Close[db: info.annotationDB ! LoganBerry.Error => CONTINUE];

  -- Start the filtering agent as a detached process.
  TRUSTED {
    Process.Detach[FORK FilteringAgent[info]];
    };
  };
SetUpQuery: PROC [query: Query, input: MsgQueue] RETURNS [cursor: LoganQuery.ComplexCursor] ~ {
  -- Returns a cursor over input that yields only the messages matching query.
  -- Raises Error[$BadFilter] if the query text cannot be parsed.
  parsed: LoganQuery.AttributePatterns;

  -- Parse the query text into a list of attribute patterns.
  parsed ← LoganQuery.ReadAttributePatterns[IO.RIS[query] ! LoganQuery.SyntaxError => ERROR Error[$BadFilter, IO.PutFR["Bad query specification: %g.", IO.rope[query]]]];

  -- Start from the raw queue and stack one filtering stage per pattern.
  cursor ← TapMsgQueue.CursorFromMsgQueue[input];
  FOR rest: LoganQuery.AttributePatterns ← parsed, rest.rest UNTIL rest = NIL DO
    -- A missing or "DWIM" pattern type is chosen by PatternMatch.DWIM from the pattern text.
    ptype: ROPE ← rest.first.ptype;
    matcher: LoganQuery.MatchProc;
    IF ptype = NIL OR Rope.Equal[ptype, "DWIM", FALSE] THEN
      ptype ← PatternMatch.DWIM[rest.first.attr.value];
    matcher ← PatternMatch.Lookup[ptype];
    cursor ← LoganQuery.FilterEntries[input: cursor, pattern: rest.first.attr.value, filter: matcher, atype: rest.first.attr.type, stopIfNothingGreater: FALSE];
    ENDLOOP;
  };
WakeupAgent: PUBLIC ENTRY PROC [agent: Agent] RETURNS [] ~ {
  -- Prods the given agent so that it starts processing messages. Not always
  -- required, but worth doing after adding messages to the agent's feeder queue.
  -- Raises Error[$BadAgent] for an invalid handle.
  ENABLE UNWIND => NULL;
  info: AgentPtr ~ GetAgentInfo[agent];
  NOTIFY info.wakeup;
  };
IsAgentIdle: PUBLIC PROC [agent: Agent, wait: BOOLEAN ← FALSE] RETURNS [idle: BOOLEAN] ~ {
  -- Reports whether the given agent is currently idle. With wait=TRUE the call
  -- blocks until the agent has gone idle, so that it can return idle=TRUE.
  ENABLE UNWIND => NULL;

  BlockUntilIdle: ENTRY PROC [info: AgentPtr] RETURNS [] ~ {
    -- Waits on the agent's "sleeping" condition until its idle flag is set.
    UNTIL info.idle DO
      WAIT info.sleeping;
      ENDLOOP;
    };

  info: AgentPtr ~ GetAgentInfo[agent];
  IF wait THEN {
    Process.Yield[]; -- let the filtering agent run before declaring it idle
    BlockUntilIdle[info];
    };
  RETURN[info.idle];
  };
MonitorAgent: PUBLIC PROC [agent: Agent, proc: MonitorProc] RETURNS [] ~ {
  -- Installs proc as the agent's monitor. The agent calls it whenever it is about
  -- to add an annotation, passing the message ID, the message, the ID of the filter
  -- that selected it, and the annotation. If the monitor returns FALSE the
  -- annotation is not added; otherwise it is written as planned.
  info: AgentPtr ~ GetAgentInfo[agent];
  info.mp ← proc;
  };
TerminateAgent: PUBLIC ENTRY PROC [agent: Agent] RETURNS [] ~ {
  -- Halts and destroys the given agent: marks it terminated, then signals its
  -- wakeup condition so that a sleeping agent notices the flag.
  ENABLE UNWIND => NULL;
  info: AgentPtr ~ GetAgentInfo[agent];
  info.terminated ← TRUE;
  NOTIFY info.wakeup;
  };
FilteringAgent: PROC [ap: AgentPtr] RETURNS [] ~ {
  -- Body of the agent process: repeatedly pulls matching messages out of every
  -- filter and annotates them, until the agent is terminated.
  UNTIL ap.terminated DO
    sawMatch: BOOLEAN ← FALSE;

    -- Pass messages through each filter.
    FOR rest: Filters ← ap.filters, rest.rest UNTIL rest = NIL DO
      ENABLE LoganBerry.Error => CONTINUE; -- a database error abandons this filter for this round
      f: Filter ~ rest.first;
      FOR hit: LoganBerry.Entry ← LoganQuery.NextEntry[cursor: f.cursor], LoganQuery.NextEntry[cursor: f.cursor] UNTIL hit = NIL DO
        -- Annotate the message unless an installed monitor procedure vetoes it.
        id: ROPE ← LoganBerryEntry.GetAttr[entry: hit, type: $MsgID];
        IF ap.mp = NIL OR ap.mp[id, TapMsgQueue.MsgFromEntry[hit], f.filterID, f.note] THEN
          [] ← AnnotateMsgI[ap.annotationDB, id, f.filterID, f.note];
        sawMatch ← TRUE; -- counts as work even when the monitor declined the annotation
        ENDLOOP;
      ENDLOOP;

    -- Clean out the clone of the feeder message queue.
    UNTIL TapMsgQueue.Get[queue: ap.feederClone] = NIL DO
      ENDLOOP;

    -- Nothing matched this round: wait until notified of more work.
    IF NOT sawMatch THEN Sleep[ap];
    ENDLOOP;
  };
Sleep: ENTRY PROC [ap: AgentPtr] ~ TRUSTED {
  -- The given agent goes to sleep until told to wake up or until a timeout of
  -- agentSleepTimeout seconds expires. While asleep the agent is marked idle, and
  -- anyone waiting on ap.sleeping (see IsAgentIdle) is told so.
  --
  -- Termination is checked under the monitor lock, which TerminateAgent also holds:
  --  * if the agent was terminated before it got here, the NOTIFY has already been
  --    issued and would be lost, so the WAIT is skipped rather than sleeping for a
  --    full timeout period before noticing;
  --  * a terminated agent is left marked idle, so that IsAgentIdle[wait: TRUE] on it
  --    returns instead of waiting for a sleep that will never happen again.
  ENABLE UNWIND => NULL;
  Process.SetTimeout[condition: @ap.wakeup, ticks: Process.SecondsToTicks[agentSleepTimeout]];
  ap.idle ← TRUE;
  BROADCAST ap.sleeping;
  IF NOT ap.terminated THEN WAIT ap.wakeup;
  IF NOT ap.terminated THEN ap.idle ← FALSE;
  };
Filters
Filters are associated with particular users and stored in a database. Filters are queries in the form of LoganQuery attribute patterns and their associated annotations. Each entry in the filter database is a tuple of the form <$FID: user$filtername, $User: user, $Name: filtername, $Query: query, annotation>. An "annotation" is an arbitrary list of type: value pairs. Several entries may exist in the database for each user.
AddFilter: PUBLIC PROC [filterDB: ROPE, user: ROPE, filterName: ROPE, query: Query, annot: Annotation, agent: Agent ← NIL] RETURNS [filterID: ROPE] ~ {
  -- Stores a new filter in the database named filterDB and returns its ID, which is
  -- user, "$" and filterName concatenated. If agent#NIL the given agent starts
  -- using the new filter immediately.
  -- Raises Error[$BadFilter] for an unparsable query; LoganBerry errors are
  -- re-raised as Error with the same code and explanation.
  ENABLE LoganBerry.Error => ERROR Error[ec, explanation];
  filterBase: LoganBerry.OpenDB ← LoganBerry.Open[dbName: filterDB];
  tuple: LoganBerry.Entry;

  -- Check that the query parses; the parsed patterns themselves are discarded.
  [] ← LoganQuery.ReadAttributePatterns[IO.RIS[query] ! LoganQuery.SyntaxError => ERROR Error[$BadFilter, IO.PutFR["Bad query specification: %g.", IO.rope[query]]]];

  -- Write <$FID, $User, $Name, $Query, annotation...> to the database.
  filterID ← Rope.Cat[user, "$", filterName];
  tuple ← CONS[[$FID, filterID], CONS[[$User, user], CONS[[$Name, filterName], CONS[[$Query, query], annot]]]];
  LoganBerry.WriteEntry[db: filterBase, entry: tuple];

  -- Push the filter onto the running agent's filter list.
  IF agent # NIL THEN {
    info: AgentPtr ← GetAgentInfo[agent];
    f: Filter ← NEW[FilterRec ← [filterID: filterID, query: query, note: annot]];
    [info.feederClone, f.feeder] ← TapMsgQueue.Duplicate[queue: info.feederClone];
    f.cursor ← SetUpQuery[f.query, f.feeder];
    info.filters ← CONS[f, info.filters];
    };
  };
DeleteFilter: PUBLIC PROC [filterDB: ROPE, filterID: ROPE, agent: Agent ← NIL] RETURNS [] ~ {
  -- Effects: Removes the given filter from the database named filterDB. If
  -- agent#NIL then the given agent also stops using the specified filter.
  -- LoganBerry errors are re-raised as Error with the same code and explanation.
  ENABLE LoganBerry.Error => ERROR Error[ec, explanation];
  db: LoganBerry.OpenDB;
  db ← LoganBerry.Open[dbName: filterDB];

  -- First look the filter up in the database.
  -- NOTE(review): LookupFilter converts LoganBerry.Error to Error itself and reports a
  -- missing filter by returning filterName=NIL, so this handler is not expected to
  -- fire; confirm whether a missing filter should raise $NonExistentFilter here.
  [] ← LookupFilter[filterDB, filterID !
    LoganBerry.Error => ERROR Error[$NonExistentFilter, "Cannot delete non-existent filter"]
    ];

  -- Delete the filter from the database.
  [] ← LoganBerry.DeleteEntry[db: db, key: $FID, value: filterID];

  -- Delete the filter from the agent's filter list.
  IF agent # NIL THEN {
    ap: AgentPtr ← GetAgentInfo[agent];
    kept, tail: Filters ← NIL;
    -- Build a fresh list, in the original order, of every filter whose ID differs.
    -- IDs are compared by content with Rope.Equal: comparing the ROPE references
    -- themselves would fail to match an equal ID held in a different rope object.
    -- The agent's list is walked through a local variable and replaced in a single
    -- assignment, so the agent process never sees a partially consumed list.
    FOR fL: Filters ← ap.filters, fL.rest WHILE fL # NIL DO
      IF NOT Rope.Equal[fL.first.filterID, filterID] THEN {
        cell: Filters ← LIST[fL.first];
        IF tail = NIL THEN kept ← cell ELSE tail.rest ← cell;
        tail ← cell;
        };
      ENDLOOP;
    ap.filters ← kept;
    };
  };
LookupFilter: PUBLIC PROC [filterDB: ROPE, filterID: ROPE] RETURNS [filterName, user: ROPE, query: Query, annot: Annotation] ~ {
  -- Effects: Returns the stored information about the given filter. If the filter
  -- doesn't exist, filterName is NIL.
  -- LoganBerry errors are re-raised as Error with the same code and explanation.
  ENABLE LoganBerry.Error => ERROR Error[ec, explanation];
  fid: ROPE;
  found: LoganBerry.Entry;

  -- Open the filter database and fetch the entry keyed by the filter ID.
  found ← LoganBerry.ReadEntry[db: LoganBerry.Open[dbName: filterDB], key: $FID, value: filterID].entry;
  [fid, user, filterName, query, annot] ← ExtractFilterInfo[found];

  -- No $FID attribute means there is no such filter in the database.
  IF fid = NIL THEN filterName ← NIL;
  };
LookupAllFilters: PUBLIC PROC [filterDB: ROPE] RETURNS [LIST OF TapFilter.FilterInfo] ~ {
  -- Returns a FilterInfo record for every entry in the database named filterDB.
  -- Each record is pushed on the front of the result, so the list is in the
  -- reverse of the order in which the database was enumerated.
  -- LoganBerry errors are re-raised as Error with the same code and explanation.
  ENABLE LoganBerry.Error => ERROR Error[ec, explanation];
  filterBase: LoganBerry.OpenDB;
  scan: LoganBerry.Cursor;
  found: LIST OF TapFilter.FilterInfo ← NIL;

  -- Open the filter database and enumerate it by filter ID.
  filterBase ← LoganBerry.Open[dbName: filterDB];
  scan ← LoganBerry.GenerateEntries[db: filterBase, key: $FID, start: NIL, end: NIL];
  FOR e: LoganBerry.Entry ← LoganBerry.NextEntry[cursor: scan], LoganBerry.NextEntry[cursor: scan] UNTIL e = NIL DO
    name, user: ROPE;
    query: Query;
    note: Annotation;
    [user: user, name: name, query: query, note: note] ← ExtractFilterInfo[e];
    found ← CONS[[name, user, query, note], found];
    ENDLOOP;
  -- A failure while releasing the cursor is deliberately ignored.
  LoganBerry.EndGenerate[cursor: scan ! LoganBerry.Error => CONTINUE];
  RETURN[found];
  };
ExistsFilter: PUBLIC PROC [filterDB: ROPE, filterID: ROPE] RETURNS [BOOLEAN] ~ {
  -- Returns TRUE iff the database named filterDB has an entry whose $FID is filterID.
  -- Unlike its neighbours this procedure has no handler for LoganBerry.Error, so
  -- such errors reach the caller unchanged.
  filterBase: LoganBerry.OpenDB;
  -- Open the filter database.
  filterBase ← LoganBerry.Open[dbName: filterDB];
  RETURN[LoganBerry.ReadEntry[db: filterBase, key: $FID, value: filterID].entry # NIL];
  };
ExtractFilterInfo: PROC [entry: LoganBerry.Entry] RETURNS [fid, user, name, query: ROPE ← NIL, note: Annotation ← NIL] ~ {
  -- Picks the filter's fields out of a filter-database entry. Attributes are
  -- scanned in order up to $Query; everything after $Query is taken to be the
  -- annotation. Fields that are not found (or a NIL entry) are returned as NIL.
  FOR rest: LoganBerry.Entry ← entry, rest.rest UNTIL rest = NIL DO
    val: ROPE ~ rest.first.value;
    SELECT rest.first.type FROM
      $Query => {
        -- The query ends the fixed part of the tuple; the tail is the annotation.
        query ← val;
        note ← rest.rest;
        EXIT;
        };
      $Name => name ← val;
      $User => user ← val;
      $FID => fid ← val;
      ENDCASE => NULL; -- unexpected attribute: ignored
    ENDLOOP;
  };
Annotations
Annotations are associated with particular messages and stored in a database. Each entry is a tuple of the form <$AID: msgID, $FID: filterID, annotation, ..., $FID: filterID, annotation>. Any number of annotations may exist for each message. Only a single entry may exist in the database for each msgID.
AnnotateMsg: PUBLIC PROC [annotDB: ROPE, msgID: ROPE, filterID: ROPE, annot: Annotation] RETURNS [annotID: ROPE] ~ {
  -- Adds an annotation for the given message, creating or updating the message's
  -- entry in the annotation database. A single entry exists per message, so
  -- annotating a message again appends to what is already recorded.
  -- LoganBerry errors are re-raised as Error with the same code and explanation.
  ENABLE LoganBerry.Error => ERROR Error[ec, explanation];
  annotBase: LoganBerry.OpenDB ← LoganBerry.Open[dbName: annotDB];
  RETURN[AnnotateMsgI[annotBase, msgID, filterID, annot]];
  };
AnnotateMsgI: PROC [db: LoganBerry.OpenDB, msgID: ROPE, filterID: ROPE, annot: Annotation] RETURNS [annotID: ROPE] ~ {
  -- Like AnnotateMsg but takes an OpenDB handle rather than a database name.
  -- The returned annotation ID is the message ID itself.
  prior, updated: LoganBerry.Entry;

  -- Fetch the message's current entry; a read error is treated as "no entry yet".
  prior ← LoganBerry.ReadEntry[db: db, key: $AID, value: msgID ! LoganBerry.Error => {prior ← NIL; CONTINUE}].entry;

  -- Start from the existing entry (or a fresh <$AID: msgID>), then append the
  -- filter ID followed by the annotation's attributes.
  updated ← IF prior # NIL THEN prior ELSE LIST[[$AID, msgID]];
  updated ← LoganBerryEntry.AppendEntries[updated, LIST[[$FID, filterID]]];
  updated ← LoganBerryEntry.AppendEntries[updated, annot];

  -- Replace the entry when one already existed, otherwise write a new one.
  LoganBerry.WriteEntry[db: db, entry: updated, replace: prior # NIL];
  RETURN[msgID];
  };
GetAnnotations: PUBLIC PROC [annotDB: ROPE, msgID: ROPE] RETURNS [annot: Annotation] ~ {
  -- Returns the message's whole entry from the annotation database, i.e. all of
  -- the annotations recorded for the given message.
  -- LoganBerry errors are re-raised as Error with the same code and explanation.
  ENABLE LoganBerry.Error => ERROR Error[ec, explanation];
  annotBase: LoganBerry.OpenDB ← LoganBerry.Open[dbName: annotDB];
  RETURN[LoganBerry.ReadEntry[db: annotBase, key: $AID, value: msgID].entry];
  };
Utilities
ParseMsgIntoFields: PUBLIC PROC [msgtext: ROPE] RETURNS [msg: Msg] ~ {
  -- Takes a textual message and parses it into a list of fields and values. A
  -- field is a line of the form "field: value" whose name starts at the beginning
  -- of the line; the name is lower-cased and turned into an atom, and the value has
  -- its leading spaces and tabs removed. Runs of lines that cannot be parsed as
  -- fields are returned as a single field of type $text, line breaks included.
  -- Fields appear in msg in the order they occur in msgtext.
  -- Note: could, and should, possibly use GVMailParse for some or all of this.

  AppendToMsg: PROC [f: TapMsgQueue.MsgField] ~ {
    -- Adds f at the end of msg, keeping msgTail on the last list cell.
    IF msg = NIL
      THEN {msg ← LIST[f]; msgTail ← msg}
      ELSE {msgTail.rest ← LIST[f]; msgTail ← msgTail.rest};
    };

  ToLower: PROC [rope: ROPE] RETURNS [low: ROPE] ~ {
    -- Returns rope with every character mapped through Ascii.Lower.
    Lower: Rope.TranslatorType = {
      -- [old: CHAR] RETURNS [new: CHAR]
      RETURN [Ascii.Lower[old]];
      };
    low ← Rope.Translate[base: rope, translator: Lower];
    };

  -- textstart is the start of the pending text segment, or -1 when there is none.
  linestart, textstart, nextcolon, nextcr: INT ← -1;
  msgtextlength: INT ← Rope.Length[msgtext];
  msgTail: LIST OF TapMsgQueue.MsgField ← NIL;

  msg ← NIL;
  linestart ← 0;
  WHILE linestart < msgtextlength DO
    -- for each line in message
    leadingWhitespace: BOOLEAN ← Rope.Fetch[base: msgtext, index: linestart] IN [0C .. ' ];
    nextcolon ← Rope.SkipTo[s: msgtext, pos: linestart, skip: ":"];
    nextcr ← Rope.SkipTo[s: msgtext, pos: linestart, skip: "\n\l\r"];
    IF nextcolon >= msgtextlength OR nextcr >= msgtextlength THEN
      -- No colon or no line break remains, so everything left is handled as one
      -- final line. Pretend its terminator sits just past the last character: using
      -- msgtextlength-1 here would cut the last character off a final field's value
      -- and would misclassify a trailing "name:" as text.
      nextcr ← msgtextlength;
    IF nextcolon < nextcr AND NOT leadingWhitespace
      THEN {
        -- found field
        valuestart: INT;
        IF textstart # -1 THEN {
          -- output previous text segment
          AppendToMsg[[type: $text, value: Rope.Substr[base: msgtext, start: textstart, len: linestart-textstart]]];
          textstart ← -1;
          };
        valuestart ← Rope.SkipOver[s: msgtext, pos: nextcolon+1, skip: " \t"]; -- skip spaces
        AppendToMsg[[type: Atom.MakeAtom[ToLower[Rope.Substr[base: msgtext, start: linestart, len: nextcolon-linestart]]], value: Rope.Substr[base: msgtext, start: valuestart, len: nextcr-valuestart]]];
        }
      ELSE {
        -- another line of text: start a new segment unless one is already pending
        IF textstart = -1 THEN textstart ← linestart;
        };
    linestart ← nextcr + 1;
    ENDLOOP;
  IF textstart # -1 THEN {
    -- output last text segment
    AppendToMsg[[type: $text, value: Rope.Substr[base: msgtext, start: textstart, len: msgtextlength-textstart]]];
    };
  };
END.