-- WalnutSortDBImpl.mesa
-- Implementation of the WalnutSortDB interface: stores keyword/message set
-- triggers in a DB segment, keeps an in-memory cache of the trigger set, and
-- can dump the triggers to, or reload them from, a text stream.

DIRECTORY
  DB,
  FS USING [StreamOpen],
  IO USING [BreakProc, Close, GetChar, GetInt, GetTokenRope, IDProc, int, Put, PutChar, PutRope, RIS, RopeFromROS, ROS, SkipWhitespace, STREAM],
  Process USING[Detach, Pause, SecondsToTicks],
  Rope USING [Cat, Equal, Fetch, Length, ROPE],
  SymTab USING [Create, Fetch, Insert, Ref, Store],
  UserProfile USING [Token],
  WalnutSortDB,
  WalnutSortMail USING [Error];

WalnutSortDBImpl: CEDAR MONITOR
  IMPORTS DB, FS, IO, Process, Rope, SymTab, UserProfile, WalnutSortMail
  EXPORTS WalnutSortDB
= BEGIN OPEN WalnutSortDB;

  ROPE: TYPE ~ Rope.ROPE;

  -- Schema: one "keyword" domain and one "trigger" relation with the fields
  -- (keyword, msgSet, priority), indexed by (priority, keyword).

  wsdSegment: DB.Segment ~ $WalnutSortDef;
  trigger: DB.Relation;
  kwDomain: DB.Domain;

  -- Indirection so that kwType always reflects the current value of kwDomain,
  -- which is reassigned by ResetSchema.
  kwDomainProc: PROC[] RETURNS[type: DB.TypeCode] ~ {
    type _ DB.TypeForDomain[kwDomain] };
  kwType: DB.TypeSpec = [ indirect[ kwDomainProc ] ];

  -- Field positions within the trigger relation.
  trgKwAttribute: CARDINAL = 0;
  trgMSAttribute: CARDINAL = 1;
  trgPrAttribute: CARDINAL = 2;

  triggerRelationType: DB.FieldSpec = DB.L2FS[LIST[
    [name: "keyword", type: kwType, lengthHint: 50],
    [name: "msgSet", type: DB.String, lengthHint: 50],
    [name: "priority", type: DB.Integer]]];
  triggerIndex: DB.Index;

  -- Transaction state, protected by the monitor lock.
  open: BOOLEAN _ FALSE;  -- TRUE while "transaction" is believed to be open
  activity: BOOL _ FALSE;  -- set by CarefullyApply, cleared by CheckConnection
  transOpened: CONDITION;  -- notified by OpenInternal
  transaction: DB.TransactionHandle;

  -- Cache of the last trigger set read from the database.
  cachedTriggerSet: TriggerSet _ NIL;
  cacheValid: BOOLEAN _ FALSE;

  -- Background loop (forked at module start): every three minutes asks
  -- CheckConnection to close the transaction if it has been idle.
  WatchDBActivity: PROC[] = {
    WHILE TRUE DO
      Process.Pause[Process.SecondsToTicks[3*60]];
      CheckConnection[]
      ENDLOOP
    };

  -- If no CarefullyApply call happened since the previous check, close the
  -- transaction and wait until OpenInternal signals that one was reopened.
  -- Always clears the activity flag for the next period.
  CheckConnection: ENTRY PROC[] = {
    ENABLE UNWIND => NULL;
    IF NOT activity THEN {
      CloseInternal[];
      WAIT transOpened };
    activity _ FALSE
    };

  -- Declares the segment file for this user and opens a transaction on it.
  -- The file path comes from the "WalnutSort.SegmentFile" profile entry; the
  -- default is built from the "Alpine.Server" profile entry and the user name.
  DeclareSegment: PUBLIC ENTRY PROC [user: ROPE] ~ {
    ENABLE UNWIND => NULL;
    DB.Initialize[];
    CloseInternal[];
    DB.DeclareSegment[
      filePath: UserProfile.Token [
        key: "WalnutSort.SegmentFile",
        default: Rope.Cat["[", UserProfile.Token["Alpine.Server", "Luther"], ".Alpine]<", user, ">WalnutSort.Segment"]],
      segment: wsdSegment];
    -- The segment may now refer to different data, so the cache is stale.
    cacheValid _ FALSE;
    CarefullyApply[OpenInternal]
    };

  -- Runs "action" with a transaction open, translating database errors into
  -- WalnutSortMail.Error.
  -- As written: DB.Aborted is retried once (the transaction is aborted and
  -- reopened), and a second DB.Aborted is reported; DB.Error and DB.Failure
  -- are reported on their first occurrence. The message texts are kept
  -- exactly as they were since clients may display them.
  CarefullyApply: INTERNAL PROC [action: PROC] ~ {
    errorMsg: ROPE _ NIL;
    {
    count: INT _ 5;  -- loop budget; clamped to a single retry in OneRetry below
    activity _ TRUE;
    WHILE count > 0 DO {
      OpenInternal[];
      action[ !
        DB.Aborted => IF count#1 THEN GOTO OneRetry ELSE GOTO SignalAborted;
        DB.Error => GOTO SignalError;
        DB.Failure => { errorMsg _ info; GOTO SignalFailure };
        ];
      EXIT
      EXITS
        OneRetry => {
          -- Drop the aborted transaction; OpenInternal reopens it next time round.
          DB.AbortTransaction[transaction ! DB.Failure, DB.Error => CONTINUE];
          open _ FALSE;
          transaction _ NIL;
          count _ MIN[count, 2] - 1;
          };
        -- The remaining exits give up and report the problem to the client.
        SignalError => {
          ERROR WalnutSortMail.Error["Database failure: caught DB.Error after two retries"];
          };
        SignalAborted => {
          ERROR WalnutSortMail.Error["Database failure: caught DB.Aborted after two retries"];
          };
        SignalFailure => {
          ERROR WalnutSortMail.Error[Rope.Cat["Database failure: caught DB.Failure after five retries, info: ", errorMsg]];
          };
      }
      ENDLOOP;
    };
    };

  -- (Re)declares the domain, relation and index used by this module.
  ResetSchema: PROC[] ~ {
    kwDomain _ DB.DeclareDomain["keyword", wsdSegment];
    trigger _ DB.DeclareRelation["trigger", wsdSegment, triggerRelationType];
    triggerIndex _ DB.DeclareIndex[trigger, DB.L2F[LIST[trgPrAttribute, trgKwAttribute]]]
    };

  -- Opens a transaction if none is open, redeclares the schema when the
  -- database reports it invalid, and wakes any waiter on transOpened.
  OpenInternal: INTERNAL PROC ~ {
    -- Should only be called from under CarefullyApply
    schemaInvalid: BOOL _ TRUE;
    IF open THEN RETURN;
    [transaction, schemaInvalid] _ DB.OpenTransaction[wsdSegment !
      DB.Error => IF code=TransactionAlreadyOpen THEN CONTINUE ELSE REJECT];
    IF schemaInvalid THEN ResetSchema[];
    open _ TRUE;
    NOTIFY transOpened
    };

  -- Marks the transaction after a modification; the cached trigger set is
  -- invalidated first because the database contents have changed.
  MarkInternal: INTERNAL PROC ~ {
    cacheValid _ FALSE;
    DB.MarkTransaction[transaction]
    };

  -- Closes the transaction if one is open; database errors are ignored.
  CloseInternal: INTERNAL PROC ~ {
    IF ~open THEN RETURN;
    DB.CloseTransaction[transaction !
      DB.Error, DB.Failure, DB.Aborted => CONTINUE];
    open _ FALSE;
    transaction _ NIL
    };

  -- Public entry point for closing the transaction.
  Close: PUBLIC ENTRY PROC ~ {
    ENABLE UNWIND => NULL;
    CloseInternal[];
    };

  -- Returns the trigger set, from the cache when it is valid and otherwise by
  -- reading every trigger relship. For each keyword only the triggers with
  -- the numerically largest priority are kept. Returns NIL (and clears the
  -- cache) if the database access fails with WalnutSortMail.Error.
  GetTriggerSetFromDB: PUBLIC ENTRY PROC RETURNS [t: TriggerSet] ~ {
    ENABLE UNWIND => NULL;

    MakeTriggerSetNil: PROC ~ {
      cachedTriggerSet _ NIL;
      cacheValid _ FALSE;
      t _ NIL;
      };

    GetTriggerSetFromDBInternal: INTERNAL PROC ~ {
      rSet: DB.RelshipSet ~ DB.RelationSubset[r: trigger, index: triggerIndex, constraint: NIL];
      r: DB.Relship;
      t _ NEW[TriggerSetRep _ [mapping: SymTab.Create[], clientData: NIL]];
      UNTIL (r _ DB.NextRelship[rSet])=NIL DO
        key: ROPE ~ DB.EntityInfo[DB.V2E[DB.GetF[r, trgKwAttribute]]].name;
        msName: ROPE ~ DB.V2S[DB.GetF[r, trgMSAttribute]];
        priority: INT ~ DB.V2I[DB.GetF[r, trgPrAttribute]];
        IF NOT SymTab.Insert[x: t.mapping, key: key, val: NEW[Trigger _ [LIST[msName], priority]]] THEN {
          -- The keyword already has an entry: merge with it or replace it.
          trig: REF Trigger _ NARROW[SymTab.Fetch[t.mapping, key].val];
          SELECT TRUE FROM
            trig.priority = priority => trig.msNames _ CONS[msName, trig.msNames];
            trig.priority < priority => [] _ SymTab.Store[x: t.mapping, key: key, val: NEW[Trigger _ [LIST[msName], priority]]];
            ENDCASE => NULL;  -- ignore items of lower priority than what we have already
          };
        ENDLOOP;
      DB.ReleaseRelshipSet[rSet];
      cachedTriggerSet _ t;
      cacheValid _ TRUE;
      CloseInternal[]
      };

    IF cacheValid THEN RETURN [cachedTriggerSet];
    CarefullyApply[GetTriggerSetFromDBInternal !
      WalnutSortMail.Error => MakeTriggerSetNil[]];
    };

  -- Adds the trigger (keyword, msName) with the given priority, or updates
  -- the priority if that pair already exists. Does not mark the transaction;
  -- callers do that.
  AddTriggerInternal: INTERNAL PROC [msName, keyword: ROPE, priority: INTEGER _ 10] ~ {
    AddTriggerInternalInternal: INTERNAL PROC ~ {
      kw: DB.Entity ~ DB.DeclareEntity[kwDomain, keyword];
      setForKw: DB.RelshipSet ~ DB.RelshipsWithEntityField[trigger, trgKwAttribute, kw];
      FOR next: DB.Relship _ DB.NextRelship[setForKw], DB.NextRelship[setForKw] UNTIL next = NIL DO
        IF Rope.Equal[msName, DB.V2S[DB.GetF[next, trgMSAttribute]]] THEN {
          -- Existing trigger for this pair: just update its priority.
          DB.SetF[next, trgPrAttribute, DB.I2V[priority]];
          DB.ReleaseRelshipSet[setForKw];
          RETURN
          }
        ENDLOOP;
      DB.ReleaseRelshipSet[setForKw];
      [] _ DB.CreateRelship[trigger, DB.L2VS[LIST[DB.E2V[kw], DB.S2V[msName], DB.I2V[priority]]]];
      };
    CarefullyApply[AddTriggerInternalInternal];
    };

  -- Public entry point: adds or updates one trigger and marks the transaction.
  AddTrigger: PUBLIC ENTRY PROC [msName, keyword: ROPE, priority: CARDINAL _ 10] ~ {
    ENABLE UNWIND => NULL;
    AddTriggerInternalInternal: INTERNAL PROC ~ {
      AddTriggerInternal[msName, keyword, priority];
      MarkInternal[]
      };
    CarefullyApply[AddTriggerInternalInternal];
    };

  -- Removes the trigger (keyword, msName) if it exists; otherwise does nothing.
  DeleteTrigger: PUBLIC ENTRY PROC [msName, keyword: ROPE] ~ {
    ENABLE UNWIND => NULL;
    DeleteTriggerInternal: INTERNAL PROC ~ {
      kw: DB.Entity ~ DB.LookupEntity[kwDomain, keyword];
      IF kw = NIL THEN RETURN;
      BEGIN
        setForKw: DB.RelshipSet ~ DB.RelshipsWithEntityField[trigger, trgKwAttribute, kw];
        relshipToDestroy: DB.Relship _ NIL;
        FOR next: DB.Relship _ DB.NextRelship[setForKw], DB.NextRelship[setForKw] UNTIL next = NIL DO
          IF Rope.Equal[msName, DB.V2S[DB.GetF[next, trgMSAttribute]]] THEN {
            relshipToDestroy _ next;
            -- Leave only the loop. A RETURN here would skip both the release of
            -- the relship set and the destruction of the relship found.
            EXIT
            }
          ENDLOOP;
        DB.ReleaseRelshipSet[setForKw];
        IF relshipToDestroy = NIL THEN RETURN;
        DB.DestroyRelship[relshipToDestroy];
        -- MarkInternal also invalidates the cached trigger set, which would
        -- otherwise still contain the deleted trigger.
        MarkInternal[]
      END
      };
    CarefullyApply[DeleteTriggerInternal];
    };

  -- Writes one line per trigger to s, in index order:
  -- priority, tab, message set name, tab, encoded keyword, newline.
  DumpToStream: PUBLIC ENTRY PROC [s: IO.STREAM] ~ {
    ENABLE UNWIND => NULL;
    DumpToStreamInternal: INTERNAL PROC ~ {
      OpenInternal[];
      {
      rels: DB.RelshipSet ~ DB.RelationSubset[r: trigger, index: triggerIndex, constraint: NIL];
      rel: DB.Relship;
      WHILE (rel _ DB.NextRelship[rels]) # NIL DO
        keyword: ROPE ~ DB.EntityInfo[DB.V2E[DB.GetF[rel, trgKwAttribute]]].name;
        IO.Put[s, IO.int[DB.V2I[DB.GetF[rel, trgPrAttribute]]]];
        IO.PutChar[s, '\t];
        IO.PutRope[s, DB.V2S[DB.GetF[rel, trgMSAttribute]]];
        IO.PutChar[s, '\t];
        -- The keyword is escaped so that it always stays on one line.
        FOR index: INT IN [0 .. Rope.Length[keyword]) DO
          PutEncodedChar[s, Rope.Fetch[base: keyword, index: index]];
          ENDLOOP;
        IO.PutChar[s, '\n];
        ENDLOOP;
      DB.ReleaseRelshipSet[rels]
      };
      CloseInternal[];
      };
    CarefullyApply[DumpToStreamInternal];
    };

  -- Creates the named file and dumps all triggers into it.
  DumpToFile: PUBLIC PROC [fileName: ROPE] ~ {
    s: IO.STREAM _ FS.StreamOpen[ fileName: fileName, accessOptions: create ];
    DumpToStream[s];
    IO.Close[s];
    };

  -- Erases the segment and reloads triggers from s, which is expected to be
  -- in the format written by DumpToStream. Reading stops at the first item
  -- that cannot be read (normally the end of the stream).
  LoadFromStream: PUBLIC ENTRY PROC [s: IO.STREAM] ~ {
    ENABLE UNWIND => NULL;
    LoadFromStreamInternal: INTERNAL PROC ~ {
      msgSet, keyword: ROPE;
      priority: INT;
      -- Only a newline ends the keyword token, so keywords may contain blanks.
      LineBreak: IO.BreakProc ~ {RETURN [IF char='\n THEN sepr ELSE other]};
      transaction _ DB.EraseSegment[wsdSegment];
      MarkInternal[];
      ResetSchema[];
      DO
        priority _ IO.GetInt[s ! ANY => EXIT];
        msgSet _ IO.GetTokenRope[s, IO.IDProc ! ANY => EXIT].token;
        [] _ IO.SkipWhitespace[s, FALSE];
        keyword _ IO.GetTokenRope[s, LineBreak ! ANY => EXIT].token;
        AddTriggerInternal[msgSet, DecodeRope[keyword], priority];
        ENDLOOP;
      MarkInternal[];
      };
    CloseInternal[];
    CarefullyApply[LoadFromStreamInternal];
    };

  -- Opens the named file for reading and loads all triggers from it.
  LoadFromFile: PUBLIC PROC [fileName: ROPE] ~ {
    s: IO.STREAM _ FS.StreamOpen[ fileName: fileName, accessOptions: read ];
    LoadFromStream[s];
    IO.Close[s];
    };

  -- Reads one logical character from s, undoing the escaping applied by
  -- PutEncodedChar: backslash followed by n or N is a newline, and two
  -- backslashes are one backslash. Any other escape raises ERROR.
  GetEncodedChar: PROC [s: IO.STREAM] RETURNS [c: CHAR] ~ {
    c _ IO.GetChar[s];
    SELECT c FROM
      '\\ => {
        SELECT IO.GetChar[s] FROM
          'n, 'N => RETURN ['\n];
          '\\ => RETURN ['\\];
          ENDCASE => ERROR;
        };
      ENDCASE => RETURN [c];
    };

  -- Writes c to s, escaping newline and backslash so that the output stays
  -- on a single line and can be read back by GetEncodedChar.
  PutEncodedChar: PROC [s: IO.STREAM, c: CHAR] ~ {
    SELECT c FROM
      -- A backslash followed by the letter n, which is what GetEncodedChar
      -- accepts. Emitting a real newline here would end the dump line early.
      '\n => IO.PutRope[s, "\\n"];
      '\\ => IO.PutRope[s, "\\\\"];
      ENDCASE => IO.PutChar[s, c];
    };

  -- Returns "in" with the escapes of PutEncodedChar undone. Decoding stops at
  -- the end of the rope, or at the first escape that cannot be decoded.
  DecodeRope: PROC [in: ROPE] RETURNS [out: ROPE] ~ {
    outS: IO.STREAM _ IO.ROS[];
    inS: IO.STREAM _ IO.RIS[in];
    DO
      IO.PutChar[outS, GetEncodedChar[inS ! ANY => EXIT]];
      ENDLOOP;
    out _ IO.RopeFromROS[outS];
    };

  -- Forces the next GetTriggerSetFromDB call to read the database again.
  InvalidateCache: PUBLIC ENTRY PROC ~ {
    ENABLE UNWIND => NULL;
    cacheValid _ FALSE
    };

  -- Start the idle watcher when the module is started.
  TRUSTED { Process.Detach[FORK WatchDBActivity[]] };

END.