TapFilterImpl.mesa
Copyright © 1989 by Xerox Corporation. All rights reserved.
Doug Terry, July 3, 1990 3:45:58 pm PDT
Theimer, March 1, 1990 2:10 pm PST
Brian Oki, March 3, 1991 1:19 pm PST
Sabel, August 16, 1990 3:58 pm PDT
Commands for adding filters, etc.
DIRECTORY
Ascii USING [Lower],
Atom USING [MakeAtom],
IO USING [PutFR, rope, STREAM, RIS],
LoganBerry USING [Close, Cursor, DeleteEntry, EndGenerate, Entry, Error, GenerateEntries, NextEntry, nullDB, Open, OpenDB, ReadEntry, WriteEntry],
LoganBerryEntry USING [AppendEntries, GetAttr],
LoganQuery USING [AttributePatterns, ComplexCursor, FilterEntries, MatchProc, NextEntry, ReadAttributePatterns, SyntaxError],
PatternMatch USING [DWIM, Lookup],
Process USING [Detach, SecondsToTicks, SetTimeout, Yield],
Rope USING [Cat, Equal, Fetch, Length, ROPE, SkipOver, SkipTo, Substr, Translate, TranslatorType],
TapMsgQueue USING [CursorFromMsgQueue, Duplicate, Get, Msg, MsgField, MsgFromEntry, MsgQueue],
TapFilter;
TapFilterImpl: CEDAR MONITOR
IMPORTS Ascii, Atom, IO, LoganBerry, LoganBerryEntry, LoganQuery, PatternMatch, Process, Rope, TapMsgQueue
EXPORTS TapFilter
~ BEGIN
-- Short local names for types defined in imported interfaces.
ROPE: TYPE ~ Rope.ROPE;
STREAM: TYPE ~ IO.STREAM;
Msg: TYPE ~ TapMsgQueue.Msg;
MsgQueue: TYPE ~ TapMsgQueue.MsgQueue;
-- Local names for the types declared in the TapFilter interface, which this module exports.
Annotation: TYPE ~ TapFilter.Annotation;
Query: TYPE ~ TapFilter.Query;
Agent: TYPE ~ TapFilter.Agent;
MonitorProc: TYPE ~ TapFilter.MonitorProc;
Private types:
AgentPtr: TYPE ~ REF AgentRec;
-- Private per-agent state. GetAgentInfo narrows an Agent handle to an AgentPtr.
AgentRec: TYPE ~ RECORD [
  filters: Filters ← NIL, -- filters this agent runs on every pass
  feeder: MsgQueue ← NIL, -- message queue handed to CreateAgent
  feederClone: MsgQueue ← NIL, -- duplicate of the feeder that the agent drains after each pass
  user: ROPE ← NIL,
  filterDBName: ROPE ← NIL,
  annotationDBName: ROPE ← NIL,
  annotationDB: LoganBerry.OpenDB ← LoganBerry.nullDB,
  mp: MonitorProc ← NIL, -- optional callback installed by MonitorAgent
  wakeup: CONDITION, -- used to wakeup agent
  sleeping: CONDITION, -- used to tell others that agent is about to sleep
  idle: BOOLEAN ← TRUE,
  terminated: BOOLEAN ← FALSE
  ];
agentSleepTimeout: CARDINAL ← 600; -- seconds between automatic wakeups
-- An agent's filters are kept as a simple list of REFs to FilterRec.
Filters: TYPE ~ LIST OF Filter;
Filter: TYPE ~ REF FilterRec;
FilterRec: TYPE ~ RECORD [
cursor: LoganQuery.ComplexCursor, -- built by SetUpQuery; yields the messages that match the query
feeder: MsgQueue, -- this filter's own duplicate of the agent's message queue
filterID: ROPE, -- value of the $FID attribute in the filter database
query: Query, -- query text, parsed by LoganQuery.ReadAttributePatterns
note: Annotation -- annotation attached to each matching message
];
-- The error raised by this module; LoganBerry errors are re-raised as Error with the same code and explanation.
Error: PUBLIC ERROR [ec: ATOM, explanation: ROPE ← NIL] = CODE;
Agents
Agents process a queue of messages by running them through a set of filters. If a message "passes" a particular filter, then the annotation associated with that filter is attached to the message. Once an agent is created, it exists until it is terminated. When an agent discovers that there are no more messages to be processed, it goes to sleep, i.e. into the idle state. An agent may periodically wake up and look for more messages, or it may be explicitly awakened.
GetAgentInfo: PROC [agent: Agent] RETURNS [ap: AgentPtr] ~ {
  -- Narrows an agent handle to the private AgentPtr representation.
  -- Raises Error[$BadAgent] when the handle is not an AgentPtr.
  WITH agent SELECT FROM
    info: AgentPtr => ap ← info;
    ENDCASE => ERROR Error[$BadAgent, "Invalid agent handle."];
  };
CreateAgent: PUBLIC PROC [feeder: MsgQueue, filterDB: ROPE, user: ROPE, annotDB: ROPE] RETURNS [agent: Agent] ~ {
  -- Creates a filtering agent for the given user. The agent reads messages from the
  -- feeder queue, takes its filters from filterDB, and writes the annotations it
  -- generates to annotDB. LoganBerry errors are re-raised as Error.
  ENABLE LoganBerry.Error => ERROR Error[ec, explanation];
  filterStore: LoganBerry.OpenDB;
  gen: LoganBerry.Cursor;
  -- Allocate and fill in the agent record.
  ap: AgentPtr ← NEW[AgentRec];
  ap.annotationDBName ← annotDB;
  ap.filterDBName ← filterDB;
  ap.user ← user;
  ap.feeder ← feeder;
  agent ← ap;
  -- Build one filter pipeline per entry in the filter database.
  filterStore ← LoganBerry.Open[dbName: ap.filterDBName];
  gen ← LoganBerry.GenerateEntries[db: filterStore, key: $FID]; -- read complete database
  ap.feederClone ← ap.feeder;
  ap.filters ← NIL;
  DO
    f: Filter;
    e: LoganBerry.Entry ← LoganBerry.NextEntry[cursor: gen];
    IF e = NIL THEN EXIT;
    f ← NEW[FilterRec];
    [fid: f.filterID, query: f.query, note: f.note] ← ExtractFilterInfo[e];
    IF f.query = NIL THEN
      ERROR Error[$BadFilter, IO.PutFR["No query specified in filter %g.", IO.rope[f.filterID]]];
    -- Each filter reads from its own duplicate of the message queue.
    [ap.feederClone, f.feeder] ← TapMsgQueue.Duplicate[queue: ap.feederClone];
    f.cursor ← SetUpQuery[f.query, f.feeder];
    ap.filters ← CONS[f, ap.filters];
    ENDLOOP;
  LoganBerry.EndGenerate[cursor: gen ! LoganBerry.Error => CONTINUE];
  LoganBerry.Close[db: filterStore ! LoganBerry.Error => CONTINUE];
  -- Open and close the annotation database to ensure it exists.
  ap.annotationDB ← LoganBerry.Open[dbName: ap.annotationDBName];
  LoganBerry.Close[db: ap.annotationDB ! LoganBerry.Error => CONTINUE];
  -- Start the filtering agent as a detached process.
  TRUSTED {
    Process.Detach[FORK FilteringAgent[ap]];
    };
  };
SetUpQuery: PROC [query: Query, input: MsgQueue] RETURNS [cursor: LoganQuery.ComplexCursor] ~ {
  -- Returns a cursor over input that yields only the msgs matching the query.
  -- Raises Error[$BadFilter] if the query text cannot be parsed.
  specs: LoganQuery.AttributePatterns;
  -- Parse the query text into attribute patterns.
  specs ← LoganQuery.ReadAttributePatterns[IO.RIS[query] ! LoganQuery.SyntaxError => ERROR Error[$BadFilter, IO.PutFR["Bad query specification: %g.", IO.rope[query]]]];
  -- Start from the raw queue and stack one filtering stage per pattern.
  cursor ← TapMsgQueue.CursorFromMsgQueue[input];
  FOR pl: LoganQuery.AttributePatterns ← specs, pl.rest WHILE pl # NIL DO
    -- A missing or "DWIM" pattern type means the matcher is chosen from the pattern value.
    useDWIM: BOOLEAN ~ pl.first.ptype = NIL OR Rope.Equal[pl.first.ptype, "DWIM", FALSE];
    matcher: LoganQuery.MatchProc ← PatternMatch.Lookup[IF useDWIM THEN PatternMatch.DWIM[pl.first.attr.value] ELSE pl.first.ptype];
    cursor ← LoganQuery.FilterEntries[input: cursor, pattern: pl.first.attr.value, filter: matcher, atype: pl.first.attr.type, stopIfNothingGreater: FALSE];
    ENDLOOP;
  };
WakeupAgent: PUBLIC ENTRY PROC [agent: Agent] RETURNS [] ~ {
  -- Prods the given agent into processing messages. Not always necessary, but
  -- worth doing after adding messages to the agent's feeder queue.
  ENABLE UNWIND => NULL;
  target: AgentPtr ← GetAgentInfo[agent];
  NOTIFY target.wakeup;
  };
IsAgentIdle: PUBLIC PROC [agent: Agent, wait: BOOLEAN ← FALSE] RETURNS [idle: BOOLEAN] ~ {
  -- Checks whether the given agent is currently idle. If wait=TRUE the procedure
  -- blocks until it can return idle=TRUE.
  -- Raises Error[$BadAgent] (via GetAgentInfo) for an invalid handle.
  ENABLE UNWIND => NULL;
  WaitTillIdle: ENTRY PROC [ap: AgentPtr] RETURNS [] ~ {
    -- The UNWIND catch belongs in the ENTRY procedure itself, so that the
    -- monitor lock is released if the WAIT below is unwound.
    ENABLE UNWIND => NULL;
    WHILE NOT ap.idle DO
      WAIT ap.sleeping;
      ENDLOOP;
    };
  ap: AgentPtr ← GetAgentInfo[agent];
  IF wait THEN {
    Process.Yield[]; -- give filtering agent a chance to run before declaring it idle
    WaitTillIdle[ap];
    };
  idle ← ap.idle;
  };
MonitorAgent: PUBLIC PROC [agent: Agent, proc: MonitorProc] RETURNS [] ~ {
  -- Installs proc as the agent's monitor procedure. The agent calls it whenever it
  -- is about to add an annotation, passing the message ID, the message, the ID of
  -- the filter that selected it, and the annotation. If the MonitorProc returns
  -- FALSE the annotation is not added; otherwise it proceeds as planned.
  info: AgentPtr ← GetAgentInfo[agent];
  info.mp ← proc;
  };
TerminateAgent: PUBLIC ENTRY PROC [agent: Agent] RETURNS [] ~ {
  -- Halts and destroys the given agent: sets its terminated flag and wakes it
  -- so that a sleeping agent notices the flag.
  ENABLE UNWIND => NULL;
  victim: AgentPtr ← GetAgentInfo[agent];
  victim.terminated ← TRUE;
  NOTIFY victim.wakeup;
  };
FilteringAgent: PROC [ap: AgentPtr] RETURNS [] ~ {
  -- Process to retrieve and annotate messages. Forked by CreateAgent; loops until
  -- ap.terminated is set.
  WHILE NOT ap.terminated DO
    gotSomething: BOOLEAN ← FALSE;
    -- Pass messages through each filter.
    FOR fL: Filters ← ap.filters, fL.rest WHILE fL#NIL DO
      ENABLE LoganBerry.Error => CONTINUE; -- swallow LoganBerry errors raised while running this filter
      FOR msg: LoganBerry.Entry ← LoganQuery.NextEntry[cursor: fL.first.cursor], LoganQuery.NextEntry[cursor: fL.first.cursor] WHILE msg#NIL DO
        -- Add annotations to msg, unless a monitor procedure is installed and vetoes it.
        msgID: ROPE ← LoganBerryEntry.GetAttr[entry: msg, type: $MsgID];
        IF ap.mp = NIL OR ap.mp[msgID, TapMsgQueue.MsgFromEntry[msg], fL.first.filterID, fL.first.note] THEN
          [] ← AnnotateMsgI[ap.annotationDB, msgID, fL.first.filterID, fL.first.note];
        gotSomething ← TRUE; -- set for every matching msg, annotated or not
        ENDLOOP;
      ENDLOOP;
    -- Clean out clone of feeder message queue.
    FOR msg: Msg ← TapMsgQueue.Get[queue: ap.feederClone], TapMsgQueue.Get[queue: ap.feederClone] WHILE msg#NIL DO
      ENDLOOP;
    -- Wait until notified of more work.
    IF NOT gotSomething THEN
      Sleep[ap];
    ENDLOOP;
  };
Sleep: ENTRY PROC[ap: AgentPtr] ~ TRUSTED {
  -- The given agent goes to sleep until told to wake up or until a timeout of
  -- agentSleepTimeout seconds expires.
  ENABLE UNWIND => NULL;
  -- TerminateAgent may have run after the agent last tested ap.terminated but
  -- before it got here; its NOTIFY would then be lost and the agent would sit out
  -- the whole timeout. Re-check the flag under the monitor lock and skip the wait.
  IF ap.terminated THEN RETURN;
  Process.SetTimeout[condition: @ap.wakeup, ticks: Process.SecondsToTicks[agentSleepTimeout]];
  ap.idle ← TRUE;
  BROADCAST ap.sleeping; -- release anyone blocked in IsAgentIdle[wait: TRUE]
  WAIT ap.wakeup;
  ap.idle ← FALSE;
  };
Filters
Filters are associated with particular users and stored in a database. Filters are queries in the form of LoganQuery attribute patterns and their associated annotations. Each entry in the filter database is a tuple of the form <$FID: user$filtername, $User: user, $Name: filtername, $Query: query, annotation>. An "annotation" is an arbitrary list of type: value pairs. Several entries may exist in the database for each user.
AddFilter: PUBLIC PROC [filterDB: ROPE, user: ROPE, filterName: ROPE, query: Query, annot: Annotation, agent: Agent ← NIL] RETURNS [filterID: ROPE] ~ {
  -- Adds a new filter to filterDB and returns its ID, user$filterName.
  -- If agent#NIL then the given agent immediately starts using the new filter.
  -- Raises Error[$BadFilter] for an unparsable query; LoganBerry errors are re-raised as Error.
  ENABLE LoganBerry.Error => ERROR Error[ec, explanation];
  store: LoganBerry.OpenDB ← LoganBerry.Open[dbName: filterDB];
  record: LoganBerry.Entry ← annot;
  -- Check that the query is valid before writing anything.
  [] ← LoganQuery.ReadAttributePatterns[IO.RIS[query] ! LoganQuery.SyntaxError => ERROR Error[$BadFilter, IO.PutFR["Bad query specification: %g.", IO.rope[query]]]];
  -- Build <$FID, $User, $Name, $Query, annotation...> back to front and write it.
  filterID ← Rope.Cat[user, "$", filterName];
  record ← CONS[[$Query, query], record];
  record ← CONS[[$Name, filterName], record];
  record ← CONS[[$User, user], record];
  record ← CONS[[$FID, filterID], record];
  LoganBerry.WriteEntry[db: store, entry: record];
  -- Add the filter to the agent's filter list.
  IF agent # NIL THEN {
    ap: AgentPtr ← GetAgentInfo[agent];
    f: Filter ← NEW[FilterRec];
    f.filterID ← filterID;
    f.query ← query;
    f.note ← annot;
    [ap.feederClone, f.feeder] ← TapMsgQueue.Duplicate[queue: ap.feederClone];
    f.cursor ← SetUpQuery[f.query, f.feeder];
    ap.filters ← CONS[f, ap.filters];
    };
  };
DeleteFilter: PUBLIC PROC [filterDB: ROPE, filterID: ROPE, agent: Agent ← NIL] RETURNS [] ~ {
  -- Effects: Removes the given filter from filterDB. If agent#NIL then the given
  -- agent stops using the specified filter.
  -- Raises Error[$NonExistentFilter] if no entry with this $FID is in the database;
  -- LoganBerry errors are re-raised as Error.
  ENABLE LoganBerry.Error => ERROR Error[ec, explanation];
  db: LoganBerry.OpenDB ← LoganBerry.Open[dbName: filterDB];
  -- First check that the filter is in the database. The existence test reads the
  -- entry directly: LookupFilter reports a missing filter by returning NILs, not by
  -- raising LoganBerry.Error, so a catch phrase on it can never fire.
  IF LoganBerry.ReadEntry[db: db, key: $FID, value: filterID].entry = NIL THEN
    ERROR Error[$NonExistentFilter, "Cannot delete non-existent filter"];
  -- Delete filter from database.
  [] ← LoganBerry.DeleteEntry[db: db, key: $FID, value: filterID];
  -- Delete filter from agent's filter list.
  IF agent # NIL THEN {
    ap: AgentPtr ← GetAgentInfo[agent];
    kept, tail: Filters ← NIL;
    -- Build a new list, in the original order, of every filter with a different ID.
    -- IDs are compared by contents (Rope.Equal); "#" on ROPEs compares references.
    -- The agent's list is walked with a local variable and replaced in a single
    -- assignment, so the agent process never sees a half-consumed list.
    FOR fL: Filters ← ap.filters, fL.rest WHILE fL # NIL DO
      IF NOT Rope.Equal[fL.first.filterID, filterID] THEN {
        cell: Filters ← CONS[fL.first, NIL];
        IF tail = NIL THEN kept ← cell ELSE tail.rest ← cell;
        tail ← cell;
        };
      ENDLOOP;
    ap.filters ← kept;
    };
  };
LookupFilter: PUBLIC PROC [filterDB: ROPE, filterID: ROPE] RETURNS [filterName, user: ROPE, query: Query, annot: Annotation] ~ {
  -- Effects: Returns information about the given filter. If the filter doesn't
  -- exist, filterName is NIL. LoganBerry errors are re-raised as Error.
  ENABLE LoganBerry.Error => ERROR Error[ec, explanation];
  fid: ROPE;
  store: LoganBerry.OpenDB;
  found: LoganBerry.Entry;
  -- Open the filter database and read the entry keyed by filterID.
  store ← LoganBerry.Open[dbName: filterDB];
  found ← LoganBerry.ReadEntry[db: store, key: $FID, value: filterID].entry;
  -- Unpack straight into the named results.
  [fid, user, filterName, query, annot] ← ExtractFilterInfo[found];
  IF fid = NIL THEN filterName ← NIL; -- no filter in database!
  };
LookupAllFilters: PUBLIC PROC [filterDB: ROPE] RETURNS [LIST OF TapFilter.FilterInfo] ~ {
  -- Returns one FilterInfo per entry in filterDB. Entries are consed onto the front
  -- of the result, so the list is in the reverse of generation order.
  -- LoganBerry errors are re-raised as Error.
  ENABLE LoganBerry.Error => ERROR Error[ec, explanation];
  store: LoganBerry.OpenDB;
  gen: LoganBerry.Cursor;
  result: LIST OF TapFilter.FilterInfo ← NIL;
  -- Open the filter database and enumerate every entry by $FID.
  store ← LoganBerry.Open[dbName: filterDB];
  gen ← LoganBerry.GenerateEntries[db: store, key: $FID, start: NIL, end: NIL];
  DO
    who, label: ROPE;
    q: Query;
    ann: Annotation;
    e: LoganBerry.Entry ← LoganBerry.NextEntry[cursor: gen];
    IF e = NIL THEN EXIT;
    [user: who, name: label, query: q, note: ann] ← ExtractFilterInfo[e];
    result ← CONS[[label, who, q, ann], result];
    ENDLOOP;
  LoganBerry.EndGenerate[cursor: gen ! LoganBerry.Error => CONTINUE];
  RETURN[result];
  };
ExistsFilter: PUBLIC PROC [filterDB: ROPE, filterID: ROPE] RETURNS [BOOLEAN] ~ {
  -- TRUE iff filterDB holds an entry whose $FID is filterID.
  store: LoganBerry.OpenDB;
  -- Open the filter database
  store ← LoganBerry.Open[dbName: filterDB];
  RETURN[LoganBerry.ReadEntry[db: store, key: $FID, value: filterID].entry # NIL];
  };
ExtractFilterInfo: PROC [entry: LoganBerry.Entry] RETURNS [fid, user, name, query: ROPE ← NIL, note: Annotation ← NIL] ~ {
  -- Picks the $FID, $User, $Name and $Query values out of a filter database entry.
  -- Everything after the $Query attribute is returned as the annotation; the scan
  -- stops there, so $FID/$User/$Name attributes after $Query are not looked at.
  -- Results for attributes that are not found stay NIL (all NIL for a NIL entry).
  FOR aL: LoganBerry.Entry ← entry, aL.rest WHILE aL#NIL DO
    SELECT aL.first.type FROM
      $FID => fid ← aL.first.value;
      $User => user ← aL.first.value;
      $Name => name ← aL.first.value;
      $Query => {
        query ← aL.first.value;
        note ← aL.rest;
        EXIT;
        };
      ENDCASE => NULL; -- unexpected attribute?
    ENDLOOP;
  };
Annotations
Annotations are associated with particular messages and stored in a database. Each entry is a tuple of the form <$AID: msgID, $FID: filterID, annotation, ..., $FID: filterID, annotation>. Any number of annotations may exist for each message. Only a single entry may exist in the database for each msgID.
AnnotateMsg: PUBLIC PROC [annotDB: ROPE, msgID: ROPE, filterID: ROPE, annot: Annotation] RETURNS [annotID: ROPE] ~ {
  -- Adds an annotation for the given message, creating or updating the msg's entry
  -- in the annotation database. A single database entry exists for each message,
  -- so annotating a message again appends to its earlier annotations.
  -- LoganBerry errors are re-raised as Error.
  ENABLE LoganBerry.Error => ERROR Error[ec, explanation];
  store: LoganBerry.OpenDB ← LoganBerry.Open[dbName: annotDB];
  RETURN[AnnotateMsgI[store, msgID, filterID, annot]];
  };
AnnotateMsgI: PROC [db: LoganBerry.OpenDB, msgID: ROPE, filterID: ROPE, annot: Annotation] RETURNS [annotID: ROPE] ~ {
  -- Like AnnotateMsg but takes an OpenDB handle rather than a database name.
  prior, merged: LoganBerry.Entry;
  -- A failed read is treated the same as "no entry yet".
  prior ← LoganBerry.ReadEntry[db: db, key: $AID, value: msgID ! LoganBerry.Error => {prior ← NIL; CONTINUE}].entry;
  IF prior = NIL
    THEN merged ← LIST[[$AID, msgID]] -- first annotation: start a fresh entry
    ELSE merged ← prior; -- otherwise extend the existing one
  merged ← LoganBerryEntry.AppendEntries[merged, LIST[[$FID, filterID]]];
  merged ← LoganBerryEntry.AppendEntries[merged, annot];
  LoganBerry.WriteEntry[db: db, entry: merged, replace: prior # NIL];
  RETURN[msgID];
  };
GetAnnotations: PUBLIC PROC [annotDB: ROPE, msgID: ROPE] RETURNS [annot: Annotation] ~ {
  -- Returns the annotation database entry for the given message, i.e. all of its
  -- annotations. LoganBerry errors are re-raised as Error.
  ENABLE LoganBerry.Error => ERROR Error[ec, explanation];
  store: LoganBerry.OpenDB ← LoganBerry.Open[dbName: annotDB];
  RETURN[LoganBerry.ReadEntry[db: store, key: $AID, value: msgID].entry];
  };
Utilities
ParseMsgIntoFields: PUBLIC PROC [msgtext: ROPE] RETURNS [msg: Msg] ~ {
  -- Takes a textual message and parses it into a list of fields and values. A field
  -- is a line of the form "field: value" whose first character is not whitespace
  -- (the colon must come before the end of the line). The field name is lower-cased
  -- and made into an atom; the value runs from the first non-blank character after
  -- the colon to the end of the line. Text that cannot be parsed into fields is
  -- returned as a field of type $text.
  -- Note: could, and should, possibly use GVMailParse for some or all of this.
  AppendToMsg: PROC [f: TapMsgQueue.MsgField] ~ {
    -- Appends f to msg, keeping msgTail on the last cell.
    IF msg = NIL
      THEN {msg ← LIST[f]; msgTail ← msg}
      ELSE {msgTail.rest ← LIST[f]; msgTail ← msgTail.rest;};
    };
  ToLower: PROC [rope: ROPE] RETURNS [low: ROPE] ~ {
    Lower: Rope.TranslatorType = {
      -- [old: CHAR] RETURNS [new: CHAR]
      RETURN [Ascii.Lower[old]];
      };
    low ← Rope.Translate[base: rope, translator: Lower];
    };
  linestart, textstart, nextcolon, nextcr: INT ← -1; -- textstart = -1 means no text segment is open
  msgtextlength: INT ← Rope.Length[msgtext];
  msgTail: LIST OF TapMsgQueue.MsgField ← NIL;
  msg ← NIL;
  linestart ← 0;
  WHILE linestart < msgtextlength DO -- for each line in message
    leadingWhitespace: BOOLEAN ← Rope.Fetch[base: msgtext, index: linestart] IN [0C .. ' ];
    nextcolon ← Rope.SkipTo[s: msgtext, pos: linestart, skip: ":"];
    nextcr ← Rope.SkipTo[s: msgtext, pos: linestart, skip: "\n\l\r"];
    IF nextcolon >= msgtextlength OR nextcr >= msgtextlength THEN -- hit end of msg text
      -- Pretend the next CR sits just past the last character. Using
      -- msgtextlength-1 here would cut the final character off the value of a
      -- field on a last line that has no terminating newline.
      nextcr ← msgtextlength;
    IF nextcolon < nextcr AND NOT leadingWhitespace
      THEN { -- found field
        valuestart: INT;
        IF textstart # -1 THEN { -- output previous text segment
          AppendToMsg[[type: $text, value: Rope.Substr[base: msgtext, start: textstart, len: linestart-textstart]]];
          textstart ← -1;
          };
        valuestart ← Rope.SkipOver[s: msgtext, pos: nextcolon+1, skip: " \t"]; -- skip spaces
        AppendToMsg[[type: Atom.MakeAtom[ToLower[Rope.Substr[base: msgtext, start: linestart, len: nextcolon-linestart]]], value: Rope.Substr[base: msgtext, start: valuestart, len: nextcr-valuestart]]];
        }
      ELSE { -- another line of text
        IF textstart = -1
          THEN textstart ← linestart -- start new text segment
          ELSE NULL; -- continuation of previous text segment
        };
    linestart ← nextcr + 1;
    ENDLOOP;
  IF textstart # -1 THEN { -- output last text segment
    AppendToMsg[[type: $text, value: Rope.Substr[base: msgtext, start: textstart, len: msgtextlength-textstart]]];
    };
  };
END.