-- ThPartySupervisorImpl
-- Monitor module (lock: root, i.e. ThPartyMonitorImpl) exporting to ThPartyPrivate.
-- Visible here: the per-party Supervisor process, the timed notification helper
-- (Timer / IDone / Informer), Supervise, Dissolve, EndParty and the head of GetHistory.
-- NOTE(review): this copy of the source is damaged. Text is missing inside
-- ScanPostOrWait and near the end of GetHistory, and several names listed in the
-- DIRECTORY (for example GetEvent, Select, SLOG, Priorities) are not used by any
-- code that survives below. Recover the missing text from a clean copy before compiling.
DIRECTORY
  Process USING [ Detach, EnableAborts, SecondsToTicks, SetTimeout ],
  Log USING [ ProblemBool, SLOG ],
  Nice,
  RPC USING [ CallFailed ],
  SafeStorage USING [ GetCanonicalType, Type ],
  ThParty USING [ Deregister ],
  ThPartyMonitorImpl,
  ThPartyPrivate USING [ CFRef, ConversationBody, ConversationData, DestroyConversation, Distribute, DistributionProc, GetEvent, PartyBody, PartyData, recordingRAtom, SmartsData, Verify ],
  Thrush USING [ ConversationHandle, ConvEvent, Credentials, Disposition, EventSequence, EventSequenceBody, H, HandleFault, KillHandle, MakeUnique, NB, Priorities, SHHH, StateID, ThHandle ],
  Triples USING [ Any, Erase, Foreach, ForeachProc, Select ]
  ;

ThPartySupervisorImpl: CEDAR MONITOR LOCKS root
  IMPORTS Log, Nice, Process, root: ThPartyMonitorImpl, RPC, SafeStorage, --SpyLog,-- Thrush, ThParty, ThPartyPrivate, Triples
  EXPORTS ThPartyPrivate
  SHARES ThPartyMonitorImpl = {

-- Local abbreviations for types declared in ThPartyPrivate and Thrush.
CFRef: TYPE = ThPartyPrivate.CFRef;
ConversationData: TYPE = ThPartyPrivate.ConversationData;
RTConvType: SafeStorage.Type = SafeStorage.GetCanonicalType[CODE[ThPartyPrivate.ConversationBody]];
ConvEvent: TYPE = Thrush.ConvEvent;
Credentials: TYPE = Thrush.Credentials;
-- H converts a REF to a Thrush.ThHandle by LOOPHOLEing the result of Thrush.H.
H: PROC[r: REF] RETURNS [Thrush.ThHandle] = INLINE { RETURN[LOOPHOLE[Thrush.H[r]]]; };
NB: TYPE = Thrush.NB;
PartyData: TYPE = ThPartyPrivate.PartyData;
RTPartyType: SafeStorage.Type = SafeStorage.GetCanonicalType[CODE[ThPartyPrivate.PartyBody]];
StateID: TYPE = Thrush.StateID;

-- Tunable parameters, held in a REF record (pd).
--   maxSpvrInterval: passed to Process.SecondsToTicks for the timeout on party.actionNeeded.
--   maxNotifyWait:   passed to Process.SecondsToTicks for the timeout on a Timer's doneCond.
--   wdtActive:       when FALSE, Inform always calls Progress directly instead of through
--                    the forked, timed Informer.
PD: TYPE = RECORD [ maxSpvrInterval: NAT _ 10, maxNotifyWait: NAT _ 5, wdtActive: BOOL_TRUE ];
pd: REF PD _ NEW[PD_[]];

-- Rendezvous record shared by IWait (the waiter) and a forked Informer.
--   done:     set TRUE by Informer only after Progress has returned normally.
--   doneCond: notified by IDone; IWait waits on it with a timeout.
--   d:        the disposition returned by Progress (initially pass).
Timer: TYPE = RECORD [ done: BOOL_FALSE, doneCond: CONDITION, d: Thrush.Disposition_pass ];

-- Entry procedure that notifies timer.doneCond under the monitor lock.
IDone: ENTRY PROC[timer: REF Timer] = { NOTIFY timer.doneCond; };

-- Forked by IWait. Calls smarts.interface.Progress and records the result in timer.d;
-- a willAlwaysPassThisRequest result clears smarts.canProgress. On normal return it
-- sets timer.done and notifies the waiter through IDone.
-- On RPC.CallFailed it clears smarts.canProgress, notifies the waiter (timer.done stays
-- FALSE) and deregisters the smarts.
-- NOTE(review): in that failure case IWait returns FALSE, so its caller in Inform also
-- calls ThParty.Deregister for the same smarts. Confirm that a second Deregister is harmless.
Informer: PROC[ smarts: ThPartyPrivate.SmartsData, event: ConvEvent, info, yours, last: BOOL, timer: REF Timer] = TRUSTED {
  ENABLE RPC.CallFailed => { smarts.canProgress_FALSE; IDone[timer]; ThParty.Deregister[smartsID: H[smarts]]; CONTINUE; };
  SELECT (timer.d _ smarts.interface.Progress[shh: smarts.shh, smartsID: H[smarts], informationOnly: info, yourParty: yours, event: event, latestEvent: last ]) FROM
    willAlwaysPassThisRequest => smarts.canProgress_FALSE;
    ENDCASE;
  -- timer.done is written here, outside the monitor, before the ENTRY call to IDone.
  timer.done_TRUE;
  IDone[timer];
  };

-- Body of the process forked by Supervise for one party. It loops on ScanPostOrWait;
-- when there is something to do it hands Inform to ThPartyPrivate.Distribute, otherwise
-- it decides whether to exit. Leaving the loop through EXIT runs EndParty.
Supervisor: PUBLIC PROC[ party: PartyData ] = {
  event: ConvEvent;
  toDo, failing, latest: BOOL_FALSE;
  informationOnly: BOOLEAN_FALSE;
  -- Distribution procedure: reports the current event to one smarts and yields a
  -- disposition in d (smarts and d come from ThPartyPrivate.DistributionProc, whose
  -- declaration is not visible in this file).
  Inform: ThPartyPrivate.DistributionProc = {
    yourParty: BOOL = event.credentials.partyID=H[party];
    d_pass;
    IF ~smarts.canProgress THEN RETURN;
    IF event.keyTable=NIL AND event.intervalSpec=NIL AND (NOT yourParty) AND (event.state#initiating) THEN RETURN; -- don't report uninteresting events
    -- Direct call when the watchdog is disabled or the smarts is not remote.
    IF ~pd.wdtActive OR ~smarts.remote THEN SELECT (d _ smarts.interface.Progress[shh: smarts.shh, smartsID: H[smarts], informationOnly: informationOnly, yourParty: yourParty, event: event, latestEvent: latest ]) FROM
      actedAndPass => informationOnly_TRUE;
      willAlwaysPassThisRequest => smarts.canProgress_FALSE;
      ENDCASE
    ELSE {
      -- Otherwise make the call in a detached process and wait for it with a timeout.
      timer: REF Timer _ NEW[Timer_[]];
      -- Forks Informer and waits once on timer.doneCond, whose timeout is set from
      -- pd.maxNotifyWait. Returns timer.done; if it is still FALSE after the wait,
      -- smarts.canProgress is cleared.
      IWait: ENTRY PROC RETURNS [done: BOOL] = TRUSTED {
        Process.EnableAborts[@timer.doneCond];
        Process.SetTimeout[@timer.doneCond, Process.SecondsToTicks[pd.maxNotifyWait]];
        Process.Detach[FORK Informer[smarts, event, informationOnly, yourParty, latest, timer]];
        WAIT timer.doneCond;
        d_timer.d;
        IF d=actedAndPass THEN informationOnly _ TRUE;
        IF ~timer.done THEN smarts.canProgress _ FALSE;
        RETURN[timer.done];
        };
      IF ~IWait[] THEN ThParty.Deregister[smartsID: H[smarts]];
      };
    };
  -- Configure party.actionNeeded: aborts enabled, timeout from pd.maxSpvrInterval.
  TRUSTED {
    Process.EnableAborts[@party.actionNeeded];
    Process.SetTimeout[@party.actionNeeded, Process.SecondsToTicks[pd.maxSpvrInterval]];
    };
  DO
    -- An ABORTED signal raised in the loop body marks the party as failed.
    ENABLE ABORTED => { party.partyFailed _ TRUE; CONTINUE; };
    [ event, toDo, latest ] _ ScanPostOrWait[party];
    informationOnly _ FALSE;
    IF toDo THEN []_ThPartyPrivate.Distribute[party: party, proc: Inform]
    ELSE IF failing THEN EXIT
    ELSE IF party.partyFailed THEN failing_TRUE -- one more pass to clean up!
    ELSE IF party.supervisor=NIL THEN RETURN; -- vanish to keep local frames to minimum.
    ENDLOOP;
  EndParty[party];
  };

-- INTERNAL: forks and detaches a Supervisor for the party if party.supervisor is NIL,
-- then notifies party.actionNeeded.
Supervise: PUBLIC INTERNAL PROC[party: PartyData] = TRUSTED {
  IF party.supervisor=NIL THEN Process.Detach[party.supervisor _ FORK Supervisor[party]];
  NOTIFY party.actionNeeded; -- Spurious if process just spawned?
  };

-- Called by Supervisor each time round its loop; returns the next event, whether there
-- is work to do, and whether the event is the latest one.
-- NOTE(review): the source is damaged from here on. Text is missing after
-- "IF r.lastNotedID" (the rest of Scanner and of ScanPostOrWait). What follows up to
-- the "ENDLOOP; ENDLOOP; };" is presumably the tail of a separate distribution
-- procedure (it refers to DistOne, priority and d, none of which are declared in the
-- surviving text). The tokens are left exactly as found. Do not trust this region.
ScanPostOrWait: ENTRY PROC[ party: PartyData ] RETURNS [ event: ConvEvent, toDo: BOOL_FALSE, latest: BOOL ] = {
  ENABLE UNWIND => NULL;
  Scanner: INTERNAL Triples.ForeachProc = TRUSTED {
    conv: ConversationData;
    WITH trip.att SELECT FROM
      r: CFRef => {
        cfRef: CFRef_NIL;
        conv _ NARROW[trip.obj];
        IF r.lastNotedID RETURN[FALSE];
        actedAndPass, pass, willAlwaysPassThisRequest => RETURN[TRUE];
        ENDCASE=> RETURN[Log.ProblemBool[ remark: "Invalid distribute return code", where: $System, bool: TRUE]];
        };
      Triples.Foreach[priority, party, Triples.Any, DistOne];
      IF d=actedAndStop THEN RETURN;
      ENDLOOP;
    ENDLOOP;
  };

-- INTERNAL: removes a party from a conversation, but only when the state recorded in
-- cfRef.event is idle. Erases the (cfRef, conv, party) triple, decrements both counts,
-- and destroys the conversation when its last party has gone.
-- For the trunk variant it clears p.outgoing and erases the $RnameForTrunk triples; for
-- the recording variant it calls Thrush.MakeUnique with ThPartyPrivate.recordingRAtom.
Dissolve: INTERNAL PROC[cfRef: CFRef, conv: ConversationData, party: PartyData] = {
  IF cfRef.event.state#idle THEN RETURN;
  Triples.Erase[cfRef, conv, party];
  conv.numParties _ conv.numParties-1;
  party.numConvs _ party.numConvs-1;
  IF conv.numParties=0 THEN ThPartyPrivate.DestroyConversation[conv];
  TRUSTED { WITH p: party SELECT FROM
    trunk => { p.outgoing _ NIL; Triples.Erase[$RnameForTrunk, party, Triples.Any]; };
    recording => Thrush.MakeUnique[$RnameForParty, party, ThPartyPrivate.recordingRAtom];
    ENDCASE; };
  };

-- Entry procedure run when a Supervisor leaves its loop through EXIT: kills the party's
-- handle (a Thrush.HandleFault is ignored), erases every triple whose middle element is
-- the party, and clears party.supervisor so that Supervise can fork a new process later.
EndParty: ENTRY PROC[party: PartyData] = {
  Thrush.KillHandle[H[party], RTPartyType!Thrush.HandleFault=>CONTINUE];
  Triples.Erase[Triples.Any, party, Triples.Any];
  party.supervisor _ NIL;
  };

-- Entry procedure returning a status code and an event sequence for the conversation
-- identified by credentials (checked with ThPartyPrivate.Verify).
-- NOTE(review): the rest of this procedure continues below this point and is truncated
-- in this copy of the source, so how firstState and lastState select events is not
-- documented here.
GetHistory: PUBLIC ENTRY PROC[ shhh: Thrush.SHHH, credentials: Credentials, firstState: StateID, lastState: StateID -- default: get latest
  ] RETURNS [ nb: Thrush.NB, events: Thrush.EventSequence_NIL ] = {
  ENABLE UNWIND => NULL;
  conv: ConversationData;
  numStates: NAT;
  [conv, , , nb] _ ThPartyPrivate.Verify[credentials];
  IF nb#success OR conv=NIL OR
conv.currentStateID=0 THEN RETURN; IF lastState=0 OR lastState>conv.currentStateID THEN lastState_conv.currentStateID; SELECT firstState FROM <1 => firstState_1; >conv.currentStateID => RETURN; ENDCASE; IF lastState> Perform distribution in order of this Party's priorities <> Distribute to all Smarts at this priority level. If Post generates new work to do, stop and do it, then cycle again. Post is probably the wrong word for this activity, now. But the work needs doing. When a trunk party leaves a conversation, it loses its identity (<< something wrong here? >>) When a recording party leaves, it reverts to available. Ê ˜Jšœ™Jšœ8™8J˜šÏk ˜ Jšœœ6˜CJšœœœ˜ J˜Jšœœ˜J˜-Jšœœ˜Jšœ˜šœœ˜Jšœ£˜£—Jš œœ^œ'œœ˜¼Jšœœ-˜:J˜J˜—šœ œœ˜/š˜J˜J˜Jšœ˜J˜Jšœ˜J˜ JšÏc ˜ Jšœ˜J˜Jšœ˜Jšœ˜—Jšœ˜Jšœ˜J˜—J™™Jšœœ˜#šœœ#˜9Jšœ<œ#˜c—Jšœ œ˜#Jšœ œ˜'Jšœœœœœœœœ ˜VJšœœ œ˜šœ œ˜+Jšœ=œ˜]—J˜J˜—™J™šœœœ˜Jšœœ˜Jšœœ˜Jšœ œ˜J˜—Jš œœœœœ˜J˜—™'J™J™)šœœœ˜Jšœœœ œ˜D—Jš Ïnœœœœ œ˜AšŸœœ˜JšœHœ œ œ˜kšœœ˜Jšœœ˜Jšœ ˜ Jšœœ ˜(Jšœ˜ —šœAœ ˜QJšœ(˜(šœ#˜'Jšœ1œ˜7Jšœ˜——Jšœ œ˜Jšœ ˜ J˜—J˜šŸ œ œ˜/Jšœ˜Jšœ œ˜"Jšœœœ˜J˜šŸœ%˜+Jšœœ™šœœ˜Jšœ"œ˜:—Jšœž$˜?J˜J˜—šŸœœœ˜.Jšœœœ œ˜@Jšœœœ˜šœ œœ˜1Jšœœ ™6J˜šœ œ˜šœ ˜ Jšœ œ˜Jšœœ ˜Jšœ#œ ˜3Jšœœœ˜9šœœœ˜Jšœ(˜(Jšœœ˜ šœ˜Jšœ(œ(˜R—Jšœ9˜9Jšœœ˜ Jšœœ˜Jšœ˜—Jšœ˜—Jšœ˜—Jšœ˜—Jšœ:˜:šœœ˜$Jš œœœœœ˜K—Jšœ˜J˜—šœH™HšŸ œœœ;˜RJšœ#˜*J˜Jšœ œœœ˜!Jšœ œ$žœ˜KJšœ8™8š œ!œ$žœœ œ˜‡J™(š œ$œ'œœ˜nJšœ œœ˜-šŸœ˜ Jšœœ ˜#šœ˜ Jšœœ˜Jšœ1 œ˜>šœœ˜!Jšœ@œ˜G——Jšœ˜—Jšœ0™0Jšœ7˜7Jš œœœœœ˜3——J˜—J˜—šŸœœœ<˜SJ™CJ™RJšœœœ˜&J˜"J˜$J˜"Jšœœ*˜CJ™]J™7šœœ œ˜#Jšœœ7˜RJ˜UJšœ˜J˜—Jšœ˜—J˜šŸœœœ˜*Jšœœ)œ˜FJ˜/Jšœœ˜Jšœ˜J˜—šŸ œœœœ˜Jšœ œ˜Jšœ˜J˜Jšœž˜)Jšœœœœ˜AJšœœœ˜J˜Jšœ œ˜Jšœ4˜4Jš œ œœœœœ˜?Jšœ œœ˜Sšœ ˜J˜Jšœœ˜Jšœ˜—Jšœœœ˜$J˜#Jšœœ&˜0Jš œœœœ%œ˜O—J˜J˜—J˜—…—š+