<> <> DIRECTORY Process USING [ Detach, EnableAborts, SecondsToTicks, SetTimeout ], Log USING [ ProblemBool, SLOG ], RPC USING [ CallFailed ], SafeStorage USING [ GetCanonicalType, Type ], ThParty USING [ Deregister ], ThPartyMonitorImpl, ThPartyPrivate USING [ CFRef, ConversationBody, ConversationData, DestroyConversation, Distribute, DistributionProc, GetEvent, PartyBody, PartyData, recordingRAtom, Verify ], Thrush USING [ ConversationHandle, ConvEvent, Credentials, Disposition, EventSequence, EventSequenceBody, H, HandleFault, KillHandle, MakeUnique, NB, Priorities, SHHH, StateID, ThHandle ], Triples USING [ Any, Erase, Foreach, ForeachProc, Select ] ; ThPartySupervisorImpl: CEDAR MONITOR LOCKS root IMPORTS Log, Process, root: ThPartyMonitorImpl, RPC, SafeStorage, --SpyLog,-- Thrush, ThParty, ThPartyPrivate, Triples EXPORTS ThPartyPrivate SHARES ThPartyMonitorImpl = { <> <<>> CFRef: TYPE = ThPartyPrivate.CFRef; ConversationData: TYPE = ThPartyPrivate.ConversationData; RTConvType: SafeStorage.Type = SafeStorage.GetCanonicalType[CODE[ThPartyPrivate.ConversationBody]]; ConvEvent: TYPE = Thrush.ConvEvent; Credentials: TYPE = Thrush.Credentials; H: PROC[r: REF] RETURNS [Thrush.ThHandle] = INLINE { RETURN[LOOPHOLE[Thrush.H[r]]]; }; NB: TYPE = Thrush.NB; PartyData: TYPE = ThPartyPrivate.PartyData; RTPartyType: SafeStorage.Type = SafeStorage.GetCanonicalType[CODE[ThPartyPrivate.PartyBody]]; StateID: TYPE = Thrush.StateID; <> <<>> maxSpvrInterval: NAT _ 10; <> <<>> Supervisor: PUBLIC PROC[ party: PartyData ] = { event: ConvEvent; toDo, failing, latest: BOOL_FALSE; informationOnly: BOOLEAN_FALSE; Inform: ThPartyPrivate.DistributionProc = { <> ENABLE RPC.CallFailed => { smarts.canProgress_FALSE; ThParty.Deregister[smartsID: H[smarts]]; RETRY; }; yourParty: BOOL = event.credentials.partyID=H[party]; d_pass; IF ~smarts.canProgress THEN RETURN; IF event.keyTable=NIL AND event.intervalSpec=NIL AND (NOT yourParty) AND (event.state#initiating) THEN RETURN; -- don't report uninteresting events SELECT (d _ smarts.interface.Progress[shh: smarts.shh, smartsID: H[smarts], informationOnly: informationOnly, yourParty: yourParty, event: event, latestEvent: latest ]) FROM actedAndPass => informationOnly_TRUE; willAlwaysPassThisRequest => smarts.canProgress_FALSE; ENDCASE; }; TRUSTED { Process.EnableAborts[@party.actionNeeded]; Process.SetTimeout[@party.actionNeeded, Process.SecondsToTicks[maxSpvrInterval]]; }; DO ENABLE ABORTED => { party.partyFailed _ TRUE; CONTINUE; }; [ event, toDo, latest ] _ ScanPostOrWait[party]; <> informationOnly _ FALSE; IF toDo THEN []_ThPartyPrivate.Distribute[party: party, proc: Inform] ELSE IF failing THEN EXIT ELSE IF party.partyFailed THEN failing_TRUE -- one more pass to clean up! ELSE IF party.supervisor=NIL THEN RETURN; -- vanish to keep local frames to minimum. ENDLOOP; EndParty[party]; }; Supervise: PUBLIC INTERNAL PROC[party: PartyData] = TRUSTED { IF party.supervisor=NIL THEN Process.Detach[party.supervisor _ FORK Supervisor[party]]; NOTIFY party.actionNeeded; -- Spurious if process just spawned? }; ScanPostOrWait: ENTRY PROC[ party: PartyData ] RETURNS [ event: ConvEvent, toDo: BOOL_FALSE, latest: BOOL ] = { ENABLE UNWIND => NULL; Scanner: INTERNAL Triples.ForeachProc = TRUSTED { <> conv: ConversationData; WITH trip.att SELECT FROM r: CFRef => { cfRef: CFRef_NIL; conv _ NARROW[trip.obj]; IF r.lastNotedID>>> Distribute: PUBLIC PROC[ party: PartyData, proc: ThPartyPrivate.DistributionProc ] RETURNS [ d: Thrush.Disposition_pass ] = { priorities: Thrush.Priorities; IF party = NIL THEN RETURN[pass]; priorities _ NARROW[Triples.Select[$Priorities, party, -- priorities -- ]]; <> FOR priorities: Thrush.Priorities _ NARROW[Triples.Select[$Priorities, party, --priorities--]], priorities.rest WHILE priorities#NIL DO <<<>>> FOR subpriorities: Thrush.Priorities _ NARROW[priorities.first], subpriorities.rest WHILE subpriorities#NIL DO priority: ATOM _ NARROW[subpriorities.first]; DistOne: Triples.ForeachProc = { d _ proc[smarts: NARROW[trip.val]]; SELECT d FROM actedAndStop => RETURN[FALSE]; actedAndPass, pass, willAlwaysPassThisRequest => RETURN[TRUE]; ENDCASE=> RETURN[Log.ProblemBool[ remark: "Invalid distribute return code", where: $System, bool: TRUE]]; }; <> Triples.Foreach[priority, party, Triples.Any, DistOne]; IF d=actedAndStop THEN RETURN; ENDLOOP; ENDLOOP; }; Dissolve: INTERNAL PROC[cfRef: CFRef, conv: ConversationData, party: PartyData] = { <> <> IF cfRef.event.state#idle THEN RETURN; Triples.Erase[cfRef, conv, party]; conv.numParties _ conv.numParties-1; party.numConvs _ party.numConvs-1; IF conv.numParties=0 THEN ThPartyPrivate.DestroyConversation[conv]; <>)>> <> TRUSTED { WITH p: party SELECT FROM trunk => { p.outgoing _ NIL; Triples.Erase[$RnameForTrunk, party, Triples.Any]; }; recording => Thrush.MakeUnique[$RnameForParty, party, ThPartyPrivate.recordingRAtom]; ENDCASE; }; }; EndParty: ENTRY PROC[party: PartyData] = { Thrush.KillHandle[H[party], RTPartyType!Thrush.HandleFault=>CONTINUE]; Triples.Erase[Triples.Any, party, Triples.Any]; party.supervisor _ NIL; }; GetHistory: PUBLIC ENTRY PROC[ shhh: Thrush.SHHH, credentials: Credentials, firstState: StateID, lastState: StateID -- default: get latest ] RETURNS [ nb: Thrush.NB, events: Thrush.EventSequence_NIL ] = { ENABLE UNWIND => NULL; conv: ConversationData; numStates: NAT; [conv, , , nb] _ ThPartyPrivate.Verify[credentials]; IF nb#success OR conv=NIL OR conv.currentStateID=0 THEN RETURN; IF lastState=0 OR lastState>conv.currentStateID THEN lastState_conv.currentStateID; SELECT firstState FROM <1 => firstState_1; >conv.currentStateID => RETURN; ENDCASE; IF lastState