<> <> <> <> <> <> <<>> <<>> <> DIRECTORY Ascii USING [Lower], Atom USING [MakeAtom], IO USING [PutFR, rope, STREAM, RIS], LoganBerry USING [Close, Cursor, DeleteEntry, EndGenerate, Entry, Error, GenerateEntries, NextEntry, nullDB, Open, OpenDB, ReadEntry, WriteEntry], LoganBerryEntry USING [AppendEntries, GetAttr], LoganQuery USING [AttributePatterns, ComplexCursor, FilterEntries, MatchProc, NextEntry, ReadAttributePatterns, SyntaxError], PatternMatch USING [DWIM, Lookup], Process USING [Detach, SecondsToTicks, SetTimeout, Yield], Rope USING [Cat, Equal, Fetch, Length, ROPE, SkipOver, SkipTo, Substr, Translate, TranslatorType], TapMsgQueue USING [CursorFromMsgQueue, Duplicate, Get, Msg, MsgField, MsgFromEntry, MsgQueue], TapFilter; TapFilterImpl: CEDAR MONITOR IMPORTS Ascii, Atom, IO, LoganBerry, LoganBerryEntry, LoganQuery, PatternMatch, Process, Rope, TapMsgQueue EXPORTS TapFilter ~ BEGIN ROPE: TYPE ~ Rope.ROPE; STREAM: TYPE ~ IO.STREAM; Msg: TYPE ~ TapMsgQueue.Msg; MsgQueue: TYPE ~ TapMsgQueue.MsgQueue; Annotation: TYPE ~ TapFilter.Annotation; Query: TYPE ~ TapFilter.Query; Agent: TYPE ~ TapFilter.Agent; MonitorProc: TYPE ~ TapFilter.MonitorProc; <<>> <> <<>> AgentPtr: TYPE ~ REF AgentRec; AgentRec: TYPE ~ RECORD [ filters: Filters _ NIL, feeder: MsgQueue _ NIL, feederClone: MsgQueue _ NIL, user: ROPE _ NIL, filterDBName: ROPE _ NIL, annotationDBName: ROPE _ NIL, annotationDB: LoganBerry.OpenDB _ LoganBerry.nullDB, mp: MonitorProc _ NIL, wakeup: CONDITION, -- used to wakeup agent sleeping: CONDITION, -- used to tell others that agent is about to sleep idle: BOOLEAN _ TRUE, terminated: BOOLEAN _ FALSE ]; agentSleepTimeout: CARDINAL _ 600; -- seconds between automatic wakeups Filters: TYPE ~ LIST OF Filter; Filter: TYPE ~ REF FilterRec; FilterRec: TYPE ~ RECORD [ cursor: LoganQuery.ComplexCursor, feeder: MsgQueue, filterID: ROPE, query: Query, note: Annotation ]; Error: PUBLIC ERROR [ec: ATOM, explanation: ROPE _ NIL] = CODE; <> <> GetAgentInfo: PROC [agent: Agent] RETURNS [ap: AgentPtr] ~ { WITH agent SELECT FROM ap: AgentPtr => RETURN[ap]; ENDCASE => ERROR Error[$BadAgent, "Invalid agent handle."]; }; CreateAgent: PUBLIC PROC [feeder: MsgQueue, filterDB: ROPE, user: ROPE, annotDB: ROPE] RETURNS [agent: Agent] ~ { <> ENABLE LoganBerry.Error => ERROR Error[ec, explanation]; db: LoganBerry.OpenDB; cursor: LoganBerry.Cursor; <> ap: AgentPtr _ NEW[AgentRec]; ap.feeder _ feeder; ap.user _ user; ap.filterDBName _ filterDB; ap.annotationDBName _ annotDB; agent _ ap; <> db _ LoganBerry.Open[dbName: ap.filterDBName]; cursor _ LoganBerry.GenerateEntries[db: db, key: $FID]; -- read complete database ap.feederClone _ ap.feeder; ap.filters _ NIL; FOR entry: LoganBerry.Entry _ LoganBerry.NextEntry[cursor: cursor], LoganBerry.NextEntry[cursor: cursor] WHILE entry # NIL DO filter: Filter _ NEW[FilterRec]; [fid: filter.filterID, query: filter.query, note: filter.note] _ ExtractFilterInfo[entry]; IF filter.query = NIL THEN ERROR Error[$BadFilter, IO.PutFR["No query specified in filter %g.", IO.rope[filter.filterID]]]; [ap.feederClone, filter.feeder] _ TapMsgQueue.Duplicate[queue: ap.feederClone]; filter.cursor _ SetUpQuery[filter.query, filter.feeder]; ap.filters _ CONS[filter, ap.filters]; ENDLOOP; LoganBerry.EndGenerate[cursor: cursor ! LoganBerry.Error => CONTINUE]; LoganBerry.Close[db: db ! LoganBerry.Error => CONTINUE]; <> ap.annotationDB _ LoganBerry.Open[dbName: ap.annotationDBName]; LoganBerry.Close[db: ap.annotationDB ! LoganBerry.Error => CONTINUE]; <> TRUSTED { Process.Detach[FORK FilteringAgent[ap]]; }; }; <<>> SetUpQuery: PROC [query: Query, input: MsgQueue] RETURNS [cursor: LoganQuery.ComplexCursor] ~ { <> patterns: LoganQuery.AttributePatterns; qs: STREAM; <<>> <> qs _ IO.RIS[query]; patterns _ LoganQuery.ReadAttributePatterns[qs ! LoganQuery.SyntaxError => ERROR Error[$BadFilter, IO.PutFR["Bad query specification: %g.", IO.rope[query]]]]; <<>> <> cursor _ TapMsgQueue.CursorFromMsgQueue[input]; FOR p: LoganQuery.AttributePatterns _ patterns, p.rest WHILE p # NIL DO mp: LoganQuery.MatchProc _ PatternMatch.Lookup[IF p.first.ptype=NIL OR Rope.Equal[p.first.ptype, "DWIM", FALSE] THEN PatternMatch.DWIM[p.first.attr.value] ELSE p.first.ptype]; cursor _ LoganQuery.FilterEntries[input: cursor, pattern: p.first.attr.value, filter: mp, atype: p.first.attr.type, stopIfNothingGreater: FALSE]; ENDLOOP; }; WakeupAgent: PUBLIC ENTRY PROC [agent: Agent] RETURNS [] ~ { <> ENABLE UNWIND => NULL; ap: AgentPtr _ GetAgentInfo[agent]; NOTIFY ap.wakeup; }; <<>> IsAgentIdle: PUBLIC PROC [agent: Agent, wait: BOOLEAN _ FALSE] RETURNS [idle: BOOLEAN] ~ { <> ENABLE UNWIND => NULL; WaitTillIdle: ENTRY PROC [ap: AgentPtr] RETURNS [] ~ { WHILE NOT ap.idle DO WAIT ap.sleeping; ENDLOOP; }; ap: AgentPtr _ GetAgentInfo[agent]; IF wait THEN { Process.Yield[]; -- give filtering agent a chance to run before declaring it idle WaitTillIdle[ap]; }; idle _ ap.idle; }; <<>> MonitorAgent: PUBLIC PROC [agent: Agent, proc: MonitorProc] RETURNS [] ~ { <> ap: AgentPtr _ GetAgentInfo[agent]; ap.mp _ proc; }; <<>> TerminateAgent: PUBLIC ENTRY PROC [agent: Agent] RETURNS [] ~ { <> ENABLE UNWIND => NULL; ap: AgentPtr _ GetAgentInfo[agent]; ap.terminated _ TRUE; NOTIFY ap.wakeup; }; <<>> FilteringAgent: PROC [ap: AgentPtr] RETURNS [] ~ { <> WHILE NOT ap.terminated DO gotSomething: BOOLEAN _ FALSE; <> FOR fL: Filters _ ap.filters, fL.rest WHILE fL#NIL DO ENABLE LoganBerry.Error => CONTINUE; FOR msg: LoganBerry.Entry _ LoganQuery.NextEntry[cursor: fL.first.cursor], LoganQuery.NextEntry[cursor: fL.first.cursor] WHILE msg#NIL DO <> msgID: ROPE _ LoganBerryEntry.GetAttr[entry: msg, type: $MsgID]; IF ap.mp = NIL OR ap.mp[msgID, TapMsgQueue.MsgFromEntry[msg], fL.first.filterID, fL.first.note] THEN [] _ AnnotateMsgI[ap.annotationDB, msgID, fL.first.filterID, fL.first.note]; gotSomething _ TRUE; ENDLOOP; ENDLOOP; <> FOR msg: Msg _ TapMsgQueue.Get[queue: ap.feederClone], TapMsgQueue.Get[queue: ap.feederClone] WHILE msg#NIL DO ENDLOOP; <<>> <> IF NOT gotSomething THEN Sleep[ap]; ENDLOOP; }; Sleep: ENTRY PROC[ap: AgentPtr] ~ TRUSTED { <> ENABLE UNWIND => NULL; Process.SetTimeout[condition: @ap.wakeup, ticks: Process.SecondsToTicks[agentSleepTimeout]]; ap.idle _ TRUE; BROADCAST ap.sleeping; WAIT ap.wakeup; ap.idle _ FALSE; }; <> <. An "annotation" is an arbitrary list of type: value pairs. Several entries may exist in the database for each user.>> AddFilter: PUBLIC PROC [filterDB: ROPE, user: ROPE, filterName: ROPE, query: Query, annot: Annotation, agent: Agent _ NIL] RETURNS [filterID: ROPE] ~ { <> ENABLE LoganBerry.Error => ERROR Error[ec, explanation]; db: LoganBerry.OpenDB _ LoganBerry.Open[dbName: filterDB]; entry: LoganBerry.Entry; <> [] _ LoganQuery.ReadAttributePatterns[IO.RIS[query] ! LoganQuery.SyntaxError => ERROR Error[$BadFilter, IO.PutFR["Bad query specification: %g.", IO.rope[query]]]]; <> entry _ annot; entry _ CONS[[$Query, query], entry]; entry _ CONS[[$Name, filterName], entry]; entry _ CONS[[$User, user], entry]; filterID _ Rope.Cat[user, "$", filterName]; entry _ CONS[[$FID, filterID], entry]; LoganBerry.WriteEntry[db: db, entry: entry]; <> IF agent # NIL THEN { ap: AgentPtr _ GetAgentInfo[agent]; filter: Filter _ NEW[FilterRec _ [filterID: filterID, query: query, note: annot]]; [ap.feederClone, filter.feeder] _ TapMsgQueue.Duplicate[queue: ap.feederClone]; filter.cursor _ SetUpQuery[filter.query, filter.feeder]; ap.filters _ CONS[filter, ap.filters]; }; }; <<>> DeleteFilter: PUBLIC PROC [filterDB: ROPE, filterID: ROPE, agent: Agent _ NIL] RETURNS [] ~ { <> ENABLE LoganBerry.Error => ERROR Error[ec, explanation]; newList, val: Filters; db: LoganBerry.OpenDB; newList _ NIL; val _ NIL; db _ LoganBerry.Open[dbName: filterDB]; <> [] _ LookupFilter[filterDB, filterID ! LoganBerry.Error => ERROR Error[$NonExistentFilter, "Cannot delete non-existent filter"] ]; <> [] _ LoganBerry.DeleteEntry[db: db, key: $FID, value: filterID]; <> IF agent # NIL THEN { ap: AgentPtr _ GetAgentInfo[agent]; <> UNTIL ap.filters = NIL DO IF ap.filters.first.filterID ~= filterID THEN {IF val = NIL THEN {val _ CONS[ap.filters.first, NIL]; newList _ val} ELSE {newList.rest _ CONS[ap.filters.first, newList.rest]; newList _ newList.rest} }; ap.filters _ ap.filters.rest; ENDLOOP; ap.filters _ val; }; }; LookupFilter: PUBLIC PROC [filterDB: ROPE, filterID: ROPE] RETURNS [filterName, user: ROPE, query: Query, annot: Annotation] ~ { <> ENABLE LoganBerry.Error => ERROR Error[ec, explanation]; db: LoganBerry.OpenDB; entry: LoganBerry.Entry; fid, name: ROPE; others: BOOLEAN; note: Annotation; <> db _ LoganBerry.Open[dbName: filterDB]; [entry, others] _ LoganBerry.ReadEntry[db: db, key: $FID, value: filterID]; [fid, user, name, query, note] _ ExtractFilterInfo[entry]; IF fid = NIL THEN { -- no filter in database! name _ NIL; }; RETURN[name, user, query, note]; }; LookupAllFilters: PUBLIC PROC [filterDB: ROPE] RETURNS [LIST OF TapFilter.FilterInfo] ~ { ENABLE LoganBerry.Error => ERROR Error[ec, explanation]; db: LoganBerry.OpenDB; cursor: LoganBerry.Cursor; fid, name, user: ROPE; query: Query; note: Annotation; stuff: LIST OF TapFilter.FilterInfo _ NIL; <> db _ LoganBerry.Open[dbName: filterDB]; cursor _ LoganBerry.GenerateEntries[db: db, key: $FID, start: NIL, end: NIL]; FOR entry: LoganBerry.Entry _ LoganBerry.NextEntry[cursor: cursor], LoganBerry.NextEntry[cursor: cursor] WHILE entry # NIL DO [fid, user, name, query, note] _ ExtractFilterInfo[entry]; stuff _ CONS[[name, user, query, note], stuff]; ENDLOOP; LoganBerry.EndGenerate[cursor: cursor ! LoganBerry.Error => CONTINUE]; RETURN[stuff]; }; ExistsFilter: PUBLIC PROC [filterDB: ROPE, filterID: ROPE] RETURNS [BOOLEAN] ~ { db: LoganBerry.OpenDB; entry: LoganBerry.Entry; others: BOOLEAN; <> db _ LoganBerry.Open[dbName: filterDB]; [entry, others] _ LoganBerry.ReadEntry[db: db, key: $FID, value: filterID]; IF entry # NIL THEN RETURN[TRUE] ELSE RETURN[FALSE]; }; <<>> ExtractFilterInfo: PROC [entry: LoganBerry.Entry] RETURNS [fid, user, name, query: ROPE _ NIL, note: Annotation _ NIL] ~ { FOR aL: LoganBerry.Entry _ entry, aL.rest WHILE aL#NIL DO SELECT aL.first.type FROM $FID => fid _ aL.first.value; $User => user _ aL.first.value; $Name => name _ aL.first.value; $Query => { query _ aL.first.value; note _ aL.rest; EXIT; }; ENDCASE => NULL; -- unexpected attribute? ENDLOOP; }; <> <. Any number of annotations may exist for each message. Only a single entry may exist in the database for each msgID.>> <<>> AnnotateMsg: PUBLIC PROC [annotDB: ROPE, msgID: ROPE, filterID: ROPE, annot: Annotation] RETURNS [annotID: ROPE] ~ { <> ENABLE LoganBerry.Error => ERROR Error[ec, explanation]; db: LoganBerry.OpenDB _ LoganBerry.Open[dbName: annotDB]; annotID _ AnnotateMsgI[db, msgID, filterID, annot]; }; <<>> AnnotateMsgI: PROC [db: LoganBerry.OpenDB, msgID: ROPE, filterID: ROPE, annot: Annotation] RETURNS [annotID: ROPE] ~ { <> existing, new: LoganBerry.Entry; existing _ LoganBerry.ReadEntry[db: db, key: $AID, value: msgID ! LoganBerry.Error => {existing _ NIL; CONTINUE}].entry; new _ IF existing = NIL THEN LIST[[$AID, msgID]] ELSE existing; new _ LoganBerryEntry.AppendEntries[new, LIST[[$FID, filterID]]]; new _ LoganBerryEntry.AppendEntries[new, annot]; LoganBerry.WriteEntry[db: db, entry: new, replace: existing # NIL]; annotID _ msgID; }; <<>> GetAnnotations: PUBLIC PROC [annotDB: ROPE, msgID: ROPE] RETURNS [annot: Annotation] ~ { <> ENABLE LoganBerry.Error => ERROR Error[ec, explanation]; db: LoganBerry.OpenDB _ LoganBerry.Open[dbName: annotDB]; annot _ LoganBerry.ReadEntry[db: db, key: $AID, value: msgID].entry; }; <<>> <> ParseMsgIntoFields: PUBLIC PROC [msgtext: ROPE] RETURNS [msg: Msg] ~ { <> AppendToMsg: PROC [f: TapMsgQueue.MsgField] ~ { IF msg = NIL THEN {msg _ LIST[f]; msgTail _ msg} ELSE {msgTail.rest _ LIST[f]; msgTail _ msgTail.rest;}; }; ToLower: PROC [rope: ROPE] RETURNS [low: ROPE] ~ { Lower: Rope.TranslatorType = { <<[old: CHAR] RETURNS [new: CHAR]>> RETURN [Ascii.Lower[old]]; }; low _ Rope.Translate[base: rope, translator: Lower]; }; linestart, textstart, nextcolon, nextcr: INT _ -1; msgtextlength: INT _ Rope.Length[msgtext]; msgTail: LIST OF TapMsgQueue.MsgField _ NIL; msg _ NIL; linestart _ 0; WHILE linestart < msgtextlength DO -- for each line in message leadingWhitespace: BOOLEAN _ Rope.Fetch[base: msgtext, index: linestart] IN [0C .. ' ]; nextcolon _ Rope.SkipTo[s: msgtext, pos: linestart, skip: ":"]; nextcr _ Rope.SkipTo[s: msgtext, pos: linestart, skip: "\n\l\r"]; IF nextcolon >= msgtextlength OR nextcr >= msgtextlength THEN -- hit end of msg text nextcr _ msgtextlength - 1; -- pretend next CR is at msg end IF nextcolon < nextcr AND NOT leadingWhitespace THEN { -- found field valuestart: INT; IF textstart # -1 THEN { -- output previous text segment AppendToMsg[[type: $text, value: Rope.Substr[base: msgtext, start: textstart, len: linestart-textstart]]]; textstart _ -1; }; valuestart _ Rope.SkipOver[s: msgtext, pos: nextcolon+1, skip: " \t"]; -- skip spaces AppendToMsg[[type: Atom.MakeAtom[ToLower[Rope.Substr[base: msgtext, start: linestart, len: nextcolon-linestart]]], value: Rope.Substr[base: msgtext, start: valuestart, len: nextcr-valuestart]]]; } ELSE { -- another line of text IF textstart = -1 THEN textstart _ linestart -- start new text segment ELSE NULL; -- continuation of previous text segment }; linestart _ nextcr + 1; ENDLOOP; IF textstart # -1 THEN { -- output last text segment AppendToMsg[[type: $text, value: Rope.Substr[base: msgtext, start: textstart, len: msgtextlength-textstart]]]; }; }; END.