-- ThPartySupervisorImpl.mesa
-- Last modified by D. Swinehart, December 28, 1983 9:30 pm
DIRECTORY
Process USING [ Detach, EnableAborts, SecondsToTicks, SetTimeout ],
Log USING [ ProblemBool, SLOG ],
RPC USING [ CallFailed ],
SafeStorage USING [ GetCanonicalType, Type ],
ThParty USING [ Deregister ],
ThPartyMonitorImpl,
ThPartyPrivate USING [
CFRef, ConversationBody, ConversationData, DestroyConversation, Distribute, DistributionProc, GetEvent, PartyBody, PartyData, recordingRAtom, Verify ],
Thrush USING [ ConversationHandle, ConvEvent, Credentials, Disposition, EventSequence, EventSequenceBody, H, HandleFault, KillHandle, MakeUnique, NB, Priorities, SHHH, StateID, ThHandle ],
Triples USING [ Any, Erase, Foreach, ForeachProc, Select ]
;
-- Party supervision for Thrush. This module exports to ThPartyPrivate; its PUBLIC procedures
-- are Supervisor, Supervise, Distribute and GetHistory.
-- It is a monitor that LOCKS root, where root is the imported instance of ThPartyMonitorImpl,
-- so the ENTRY procedures below acquire that instance's lock rather than a lock of their own.
-- NOTE(review): SHARES ThPartyMonitorImpl presumably exists to reach that module's private
-- monitor lock; confirm against ThPartyMonitorImpl.
ThPartySupervisorImpl: CEDAR MONITOR LOCKS root
IMPORTS
Log,
Process,
root: ThPartyMonitorImpl,
RPC,
SafeStorage,
--SpyLog,--
Thrush,
ThParty,
ThPartyPrivate,
Triples
EXPORTS ThPartyPrivate
SHARES ThPartyMonitorImpl = {
-- Copies, Obligatory Concrete

-- Local short names for types declared in ThPartyPrivate and Thrush.
CFRef: TYPE = ThPartyPrivate.CFRef;
ConversationData: TYPE = ThPartyPrivate.ConversationData;
-- Canonical runtime type code for conversation bodies.
RTConvType: SafeStorage.Type = SafeStorage.GetCanonicalType[CODE[ThPartyPrivate.ConversationBody]];
ConvEvent: TYPE = Thrush.ConvEvent;
Credentials: TYPE = Thrush.Credentials;
-- Handle for an arbitrary REF: the result of Thrush.H, LOOPHOLEd to Thrush.ThHandle.
H: PROC[r: REF] RETURNS [Thrush.ThHandle] = INLINE { RETURN[LOOPHOLE[Thrush.H[r]]]; };
NB: TYPE = Thrush.NB;
PartyData: TYPE = ThPartyPrivate.PartyData;
-- Canonical runtime type code for party bodies; EndParty passes it to Thrush.KillHandle.
RTPartyType: SafeStorage.Type = SafeStorage.GetCanonicalType[CODE[ThPartyPrivate.PartyBody]];
StateID: TYPE = Thrush.StateID;

-- Types

-- Timeout, in seconds, that Supervisor sets on each party's actionNeeded condition.
maxSpvrInterval: NAT ← 10;

-- Supervision Process . . . One per party
-- Body of the supervision process for one party (forked by Supervise).
-- Each pass asks ScanPostOrWait for the next conversation event that has not yet been
-- reported for this party and, if there is one, hands it to the party's Smarts through
-- Distribute, using Inform as the per-Smarts action.
-- The loop ends in one of two ways:
--   party.partyFailed was seen and one further pass found nothing to do: EXIT, then EndParty;
--   party.supervisor is NIL (ScanPostOrWait clears it when the party has no conversations):
--   plain RETURN, without EndParty.
Supervisor: PUBLIC PROC[ party: PartyData ] = {
  event: ConvEvent;
  toDo, failing, latest: BOOL←FALSE;
  -- Set once some Smarts has answered actedAndPass for the current event; passed on to the
  -- Progress call made for each later Smarts.
  informationOnly: BOOLEAN←FALSE;

  Inform: ThPartyPrivate.DistributionProc = {
    -- PROC[smarts: SmartsData ] RETURNS [ d: Thrush.Disposition ];
    -- On an RPC failure the Smarts is marked unable to progress and deregistered; the RETRY
    -- then re-runs this body, which returns pass at the canProgress test below.
    ENABLE RPC.CallFailed => {
      smarts.canProgress←FALSE;
      ThParty.Deregister[smartsID: H[smarts]];
      RETRY; };
    yourParty: BOOL = event.credentials.partyID=H[party];
    d←pass;
    IF ~smarts.canProgress THEN RETURN;
    IF event.keyTable=NIL AND event.intervalSpec=NIL AND
      (NOT yourParty) AND (event.state#initiating) THEN RETURN; -- don't report uninteresting events
    SELECT (d ← smarts.interface.Progress[shh: smarts.shh, smartsID: H[smarts],
      informationOnly: informationOnly, yourParty: yourParty,
      event: event, latestEvent: latest ]) FROM
      actedAndPass => informationOnly←TRUE;
      willAlwaysPassThisRequest => smarts.canProgress←FALSE;
      ENDCASE;
    };

  -- Make waits on actionNeeded abortable and give them a timeout of maxSpvrInterval seconds.
  TRUSTED {
    Process.EnableAborts[@party.actionNeeded];
    Process.SetTimeout[@party.actionNeeded, Process.SecondsToTicks[maxSpvrInterval]];
    };

  DO
    -- An abort of this process is recorded as failure of the party.
    ENABLE ABORTED => { party.partyFailed ← TRUE; CONTINUE; };
    [ event, toDo, latest ] ← ScanPostOrWait[party];
    -- Have discovered a change in state that our Smarts should know about. The Smarts will
    -- now learn about it, even if the state changes again before they can act on the change.
    informationOnly ← FALSE;
    IF toDo THEN []←ThPartyPrivate.Distribute[party: party, proc: Inform]
    ELSE IF failing THEN EXIT
    ELSE IF party.partyFailed THEN failing←TRUE -- one more pass to clean up!
    ELSE IF party.supervisor=NIL THEN RETURN; -- vanish to keep local frames to minimum.
    ENDLOOP;
  EndParty[party];
  };
-- Ensures that party has a running supervision process and then pokes it.
-- INTERNAL: the caller must already hold the monitor lock.
Supervise: PUBLIC INTERNAL PROC[party: PartyData] = TRUSTED {
  IF party.supervisor=NIL THEN {
    -- No supervisor yet: fork one, remember it in the party, and detach it.
    party.supervisor ← FORK Supervisor[party];
    Process.Detach[party.supervisor];
    };
  NOTIFY party.actionNeeded; -- Spurious if process just spawned?
  };
-- Monitor entry used by Supervisor to find the next event to report for party.
-- Enumerates the triples selected by [Any, Any, party]. For the first one whose attribute is
-- a CFRef that lags behind its conversation (lastNotedID < currentStateID), advances that
-- CFRef by one state and returns the corresponding event with toDo = TRUE.
-- A CFRef that is up to date and whose event state is idle is dissolved on the way.
-- If nothing was found and the party has not failed, either clears party.supervisor (no
-- conversations left, so Supervisor will return) or waits on party.actionNeeded.
-- event and latest are assigned only when toDo is TRUE.
ScanPostOrWait: ENTRY PROC[ party: PartyData ]
  RETURNS [ event: ConvEvent, toDo: BOOL←FALSE, latest: BOOL ] = {
  ENABLE UNWIND => NULL;

  Scanner: INTERNAL Triples.ForeachProc = TRUSTED {
    -- PROC[trip: TripleRec] RETURNS [continue: BOOLEAN←TRUE]
    conv: ConversationData;
    WITH trip.att SELECT FROM
      r: CFRef => {
        cfRef: CFRef←NIL;
        conv ← NARROW[trip.obj];
        IF r.lastNotedID<conv.currentStateID THEN cfRef ← r
        ELSE IF r.event.state=idle THEN Dissolve[r, conv, party];
        IF cfRef#NIL THEN {
          -- Report exactly one state per call, the one after the last state noted.
          cfRef.lastNotedID ← cfRef.lastNotedID+1;
          toDo←TRUE;
          latest ←
            cfRef.lastNotedID = conv.currentStateID OR cfRef.lastNotedID = cfRef.lastPostedID;
          event ← ThPartyPrivate.GetEvent[conv, cfRef.lastNotedID];
          Log.SLOG[];
          RETURN[FALSE]; -- stop the enumeration; one event is enough
          };
        };
      ENDCASE;
    };

  Triples.Foreach[Triples.Any, Triples.Any, party, Scanner];
  IF ~toDo AND ~party.partyFailed THEN
    IF party.numConvs=0 THEN party.supervisor←NIL ELSE WAIT party.actionNeeded;
  };
-- Distribution utility. Basic Party distribution function << At last!! >>
-- Applies proc to the Smarts of party, in the order given by the party's $Priorities value.
-- That value is treated as a list of priority levels, each itself a list of ATOMs; for every
-- ATOM, proc is applied to the value of each triple selected by [priority, party, Any].
-- Returns the disposition produced by the last call of proc, or pass if proc was never
-- called (including when party is NIL). Distribution stops as soon as proc answers
-- actedAndStop.
Distribute: PUBLIC PROC[ party: PartyData, proc: ThPartyPrivate.DistributionProc ]
  RETURNS [ d: Thrush.Disposition←pass ] = {
  IF party = NIL THEN RETURN[pass];
  -- Perform distribution in order of this Party's priorities
  FOR priorities: Thrush.Priorities ← NARROW[Triples.Select[$Priorities, party, --priorities--]], priorities.rest
    WHILE priorities#NIL DO
    -- Flatten next priority level, for now
    FOR subpriorities: Thrush.Priorities ← NARROW[priorities.first], subpriorities.rest
      WHILE subpriorities#NIL DO
      priority: ATOM←NARROW[subpriorities.first];
      -- Per-triple action: run proc on one Smarts and tell Foreach whether to go on.
      DistOne: Triples.ForeachProc = {
        d ← proc[smarts: NARROW[trip.val]];
        SELECT d FROM
          actedAndStop => RETURN[FALSE];
          actedAndPass, pass, willAlwaysPassThisRequest => RETURN[TRUE];
          ENDCASE=> RETURN[Log.ProblemBool[
            remark: "Invalid distribute return code", where: $System, bool: TRUE]];
        };
      -- Distribute to all Smarts at this priority level.
      Triples.Foreach[priority, party, Triples.Any, DistOne];
      IF d=actedAndStop THEN RETURN;
      ENDLOOP;
    ENDLOOP;
  };
-- Removes party from conv, provided the event held by cfRef is in state idle; otherwise does
-- nothing. Erases the [cfRef, conv, party] triple, decrements both membership counts, and
-- destroys the conversation when its last party has left.
-- INTERNAL: called from Scanner inside ScanPostOrWait, with the monitor lock held.
Dissolve: INTERNAL PROC[cfRef: CFRef, conv: ConversationData, party: PartyData] = {
  -- If Post generates new work to do, stop and do it, then cycle again.
  -- Post is probably the wrong word for this activity, now. But the work needs doing.
  IF cfRef.event.state#idle THEN RETURN;
  Triples.Erase[cfRef, conv, party];
  conv.numParties ← conv.numParties-1;
  party.numConvs ← party.numConvs-1;
  IF conv.numParties=0 THEN ThPartyPrivate.DestroyConversation[conv];
  -- When a trunk party leaves a conversation, it loses its identity (<< something wrong here? >>)
  -- When a recording party leaves, it reverts to available.
  TRUSTED { WITH p: party SELECT FROM
    trunk => { p.outgoing ← NIL; Triples.Erase[$RnameForTrunk, party, Triples.Any]; };
    recording => Thrush.MakeUnique[$RnameForParty, party, ThPartyPrivate.recordingRAtom];
    ENDCASE;
    };
  };
-- Final cleanup for a party whose supervision loop has exited (called at the end of
-- Supervisor). Kills the party's handle, erases the triples selected by [Any, party, Any],
-- and clears party.supervisor.
EndParty: ENTRY PROC[party: PartyData] = {
  -- Release the monitor lock if a signal unwinds through this entry procedure, as the other
  -- ENTRY procedures in this module do.
  ENABLE UNWIND => NULL;
  -- A handle fault from KillHandle is ignored on purpose.
  Thrush.KillHandle[H[party], RTPartyType!Thrush.HandleFault=>CONTINUE];
  Triples.Erase[Triples.Any, party, Triples.Any];
  party.supervisor ← NIL;
  };
-- Returns the logged events of the conversation identified by credentials for the states
-- firstState through lastState, inclusive.
--   nb: the result of ThPartyPrivate.Verify[credentials].
--   events: NIL when verification fails, when there is no conversation or it has no states
--     yet, when firstState is beyond the current state, or when the clamped range is empty.
-- lastState = 0 or a value beyond the current state means the current state; a firstState
-- below 1 is raised to 1.
GetHistory: PUBLIC ENTRY PROC[
  shhh: Thrush.SHHH,
  credentials: Credentials,
  firstState: StateID,
  lastState: StateID -- default: get latest
  ] RETURNS [ nb: Thrush.NB, events: Thrush.EventSequence←NIL ] = {
  ENABLE UNWIND => NULL;
  conv: ConversationData;
  numStates: NAT;
  [conv, , , nb] ← ThPartyPrivate.Verify[credentials];
  IF nb#success OR conv=NIL OR conv.currentStateID=0 THEN RETURN;
  -- Clamp the requested range to the states that exist.
  IF lastState=0 OR lastState>conv.currentStateID THEN lastState←conv.currentStateID;
  SELECT firstState FROM
    <1 => firstState←1;
    >conv.currentStateID => RETURN;
    ENDCASE;
  IF lastState<firstState THEN RETURN;
  numStates ← lastState-firstState+1;
  events←NEW[Thrush.EventSequenceBody[numStates]];
  -- events[0] receives the log entry for firstState, and so on.
  FOR i: NAT IN [0..numStates) DO events[i] ← conv.log[i+firstState]; ENDLOOP;
  };
}.