-- SlackProcessImpl
-- Implementation of the SlackProcess, SlackProcessExtras and SlackProcessTypes interfaces.
-- A SlackHandle owns a circular queue of client actions plus a circular log.  Actions are
-- queued by client processes and executed, one at a time, by a forked "actor" process that
-- exists only while there is work to do (handle.slackProcess = NIL means no actor is running).
-- All shared state is protected by the monitor lock of the handle (LOCKS handle).

DIRECTORY
  Process, Rope, SlackProcess, SlackProcessConcreteTypes, SlackProcessExtras,
  SlackProcessTypes, ViewerAbort, ViewerClasses;

SlackProcessImpl: CEDAR MONITOR LOCKS handle USING handle: SlackHandle
  IMPORTS Process, ViewerAbort
  EXPORTS SlackProcess, SlackProcessExtras, SlackProcessTypes = BEGIN

  -- Opaque types exported to clients, and local abbreviations for the concrete record types.

  SlackHandle: TYPE = REF SlackHandleObj;
  SlackHandleObj: PUBLIC TYPE = SlackProcessConcreteTypes.SlackHandleObj;
  QueueEntryGenerator: TYPE = REF QueueEntryGeneratorObj;
  QueueEntryGeneratorObj: PUBLIC TYPE = SlackProcessConcreteTypes.QueueEntryGeneratorObj;

  ClientDatas: TYPE = REF ClientDatasData;
  ClientDatasData: TYPE = SlackProcessConcreteTypes.ClientDatasData;
  OptimizeHints: TYPE = REF OptimizeHintsData;
  OptimizeHintsData: TYPE = SlackProcessConcreteTypes.OptimizeHintsData;
  Actions: TYPE = REF ActionsData;
  ActionsData: TYPE = SlackProcessConcreteTypes.ActionsData;
  ActionProcs: TYPE = REF ActionProcsData;
  ActionProcsData: TYPE = SlackProcessConcreteTypes.ActionProcsData;
  Log: TYPE = REF LogData;
  LogData: TYPE = SlackProcessConcreteTypes.LogData;
  Queue: TYPE = REF QueueData;
  QueueData: TYPE = SlackProcessConcreteTypes.QueueData;
  Abort: TYPE = REF AbortData;
  AbortData: TYPE = SlackProcessConcreteTypes.AbortData;
  Bashed: TYPE = REF BashedData;
  BashedData: TYPE = SlackProcessConcreteTypes.BashedData;
  Events: TYPE = REF EventsData;
  EventsData: TYPE = SlackProcessConcreteTypes.EventsData;
  Received: TYPE = REF ReceivedData;
  ReceivedData: TYPE = SlackProcessConcreteTypes.ReceivedData;
  QueueEvents: TYPE = REF QueueEventsData;
  QueueEventsData: TYPE = SlackProcessConcreteTypes.QueueEventsData;

  ActionProc: TYPE = SlackProcess.ActionProc;
  AbortProc: TYPE = SlackProcess.AbortProc;
  LoggingProc: TYPE = SlackProcess.LoggingProc;
  OptimizeProc: TYPE = SlackProcess.OptimizeProc;
  Viewer: TYPE = ViewerClasses.Viewer;

  Problem: PUBLIC ERROR[msg: Rope.ROPE] = CODE;

  NoOptimization: OptimizeProc = {
    -- Default optimizer: do all of the actions on the queue (skip none).
    skipActions _ 0;
    };

  -- Creation and configuration

  Create: PUBLIC PROC [queueSize: NAT _ 50, logSize: NAT _ 50, optimizeProc: OptimizeProc, loggingProc: LoggingProc _ NIL, abortProc: AbortProc _ NIL, abortData: REF ANY _ NIL, abortViewer: ViewerClasses.Viewer _ NIL, priority: Process.Priority _ Process.priorityNormal] RETURNS [handle: SlackHandle] = {
    -- Builds a new handle with a queue of queueSize slots and a log of logSize slots.
    -- No actor process is forked here; one is started when the first action is queued.
    -- One queue slot is always kept empty to tell "full" from "empty", so at most
    -- queueSize-1 actions can be pending at a time.
    -- A NIL optimizeProc selects NoOptimization.  Aborts start out disabled.
    handle _ NEW[SlackHandleObj _ [
      slackProcess: NIL,
      queue: NEW[QueueData _ [] ],
      qeGen: NEW[QueueEntryGeneratorObj],
      log: NEW[LogData _ [] ],
      abort: NEW[AbortData _ [] ],
      priority: priority
      ]];

    handle.queue.clientDatas _ NEW[ClientDatasData[queueSize]];
    handle.queue.optimizeHints _ NEW[OptimizeHintsData[queueSize]];
    handle.queue.actions _ NEW[ActionsData[queueSize]];
    handle.queue.actionProcs _ NEW[ActionProcsData[queueSize]];
    handle.queue.events _ NEW[QueueEventsData[queueSize]];
    handle.queue.size _ queueSize;

    IF optimizeProc = NIL THEN handle.optimizeProc _ NoOptimization
    ELSE handle.optimizeProc _ optimizeProc;

    handle.log.received _ NEW[ReceivedData[logSize]];
    handle.log.bashed _ NEW[BashedData[logSize]];
    handle.log.events _ NEW[EventsData[logSize]];
    handle.log.size _ logSize;
    handle.log.logger _ loggingProc;

    handle.abort^ _ [enabled: FALSE, viewer: abortViewer, proc: abortProc, data: abortData];
    };

  ChangePriority: PUBLIC ENTRY PROC [handle: SlackHandle, priority: Process.Priority] = {
    -- Sets the default priority, used for actions queued with priority = LAST[CARD32].
    handle.priority _ priority;
    };

  -- Queueing actions

  QueueFull: INTERNAL PROC [queue: Queue] RETURNS [BOOL] = {
    -- TRUE when advancing tail would make it collide with head (one slot is kept free).
    RETURN[(queue.tail+1) MOD queue.size = queue.head];
    };

  StoreEntry: INTERNAL PROC [queue: Queue, qIndex: NAT, callBack: ActionProc, inputAction: REF, clientData: REF, optimizeHint: REF, priority: Process.Priority, readOnly: BOOL] = {
    -- Fills slot qIndex of the queue.  Does not move head or tail; the caller does that.
    queue.clientDatas[qIndex] _ clientData;
    queue.optimizeHints[qIndex] _ optimizeHint;
    queue.actions[qIndex] _ inputAction;
    queue.actionProcs[qIndex] _ callBack;
    queue.events[qIndex].priority _ priority;
    queue.events[qIndex].readOnly _ readOnly;
    };

  StartActorIfIdle: INTERNAL PROC [handle: SlackHandle] = {
    -- Forks an actor process unless one is already running or a pause has been requested.
    IF handle.slackProcess = NIL AND (NOT handle.pauseRequest) THEN
      Process.Detach[handle.slackProcess _ FORK Actor[handle]];
    };

  QueueOrTimeout: PUBLIC ENTRY PROC [handle: SlackHandle, callBack: ActionProc, inputAction: REF, clientData: REF, optimizeHint: REF, timeoutTicks: Process.Ticks, priority: Process.Priority _ LAST[CARD32]] RETURNS [success: BOOL _ TRUE] = {
    -- Like QueueAction, but gives up if the queue is still full after one timed wait.
    -- timeoutTicks = LAST[CARD32] means wait indefinitely.
    -- Returns FALSE (nothing queued) on timeout, TRUE otherwise.
    ENABLE UNWIND => NULL;
    queue: Queue _ handle.queue;
    qIndex: NAT _ 0;

    IF timeoutTicks = LAST[CARD32] THEN {
      WHILE QueueFull[queue] DO WAIT queue.notFull ENDLOOP;
      }
    ELSE {
      TRUSTED { Process.SetTimeout[@queue.timedNotFull, timeoutTicks] };
      -- Wait at most once.  Looping here would only ever exit when the queue had room,
      -- so the timeout could never be reported to the caller.
      IF QueueFull[queue] THEN WAIT queue.timedNotFull;
      };
    IF QueueFull[queue] THEN RETURN[FALSE]; -- timed out

    qIndex _ queue.tail;
    StoreEntry[queue, qIndex, callBack, inputAction, clientData, optimizeHint, priority, FALSE];
    queue.tail _ (qIndex + 1) MOD queue.size;
    LogReceived[log: handle.log, inputAction: inputAction, bash: FALSE];
    StartActorIfIdle[handle];
    };

  QueueAction: PUBLIC ENTRY PROC [handle: SlackHandle, callBack: ActionProc, inputAction: REF, clientData: REF, optimizeHint: REF, priority: Process.Priority _ LAST[CARD32]] = {
    -- Appends an action at the tail of the queue, blocking while the queue is full.
    -- Starts an actor process if none is running and no pause is requested.
    ENABLE UNWIND => NULL;
    queue: Queue _ handle.queue;
    qIndex: NAT _ 0;

    WHILE QueueFull[queue] DO WAIT queue.notFull ENDLOOP;
    qIndex _ queue.tail;
    StoreEntry[queue, qIndex, callBack, inputAction, clientData, optimizeHint, priority, FALSE];
    queue.tail _ (qIndex + 1) MOD queue.size;
    LogReceived[log: handle.log, inputAction: inputAction, bash: FALSE];
    StartActorIfIdle[handle];
    };

  QueueAtHead: PUBLIC ENTRY PROC [handle: SlackHandle, callBack: ActionProc, inputAction: REF, clientData: REF, optimizeHint: REF, priority: Process.Priority _ LAST[CARD32]] = {
    -- Inserts an action in front of everything already queued, blocking while the queue is full.
    ENABLE UNWIND => NULL;
    queue: Queue _ handle.queue;
    qIndex: NAT _ 0;

    WHILE QueueFull[queue] DO WAIT queue.notFull ENDLOOP;
    qIndex _ (queue.head + queue.size - 1) MOD queue.size; -- the slot just before head
    StoreEntry[queue, qIndex, callBack, inputAction, clientData, optimizeHint, priority, FALSE];
    -- The new entry becomes the head.  (Moving tail instead would set tail = old head,
    -- i.e. make the queue look empty and lose every pending action.)
    queue.head _ qIndex;
    LogReceived[log: handle.log, inputAction: inputAction, bash: FALSE];
    StartActorIfIdle[handle];
    };

  QueueReadOnly: PUBLIC ENTRY PROC [handle: SlackHandle, callBack: ActionProc, inputAction: REF, clientData: REF, optimizeHint: REF, priority: Process.Priority _ LAST[CARD32]] = {
    -- Same as QueueAction, except that the entry is marked readOnly
    -- (see AllQueuedAreReadOnly).
    ENABLE UNWIND => NULL;
    queue: Queue _ handle.queue;
    qIndex: NAT _ 0;

    WHILE QueueFull[queue] DO WAIT queue.notFull ENDLOOP;
    qIndex _ queue.tail;
    StoreEntry[queue, qIndex, callBack, inputAction, clientData, optimizeHint, priority, TRUE];
    queue.tail _ (qIndex + 1) MOD queue.size;
    LogReceived[log: handle.log, inputAction: inputAction, bash: FALSE];
    StartActorIfIdle[handle];
    };

  -- Pausing and waiting

  Pause: PUBLIC ENTRY PROC [handle: SlackHandle] = {
    -- Requests a pause.  The actor notices the request the next time it asks for an action
    -- (DeQueueAction returns nothing while pauseRequest is set) and then terminates.
    handle.pauseRequest _ TRUE;
    };

  Continue: PUBLIC ENTRY PROC [handle: SlackHandle] = {
    -- Cancels a pause request and restarts the actor if work is pending and none is running.
    handle.pauseRequest _ FALSE;
    IF NOT EmptyQ[handle.queue] AND handle.slackProcess = NIL THEN
      Process.Detach[handle.slackProcess _ FORK Actor[handle]];
    };

  PauseRequested: PUBLIC ENTRY PROC [handle: SlackHandle] RETURNS [BOOL] = {
    -- TRUE if Pause has been called and not yet cancelled by Continue.
    RETURN[handle.pauseRequest];
    };

  ProcessIsBusy: PUBLIC ENTRY PROC [handle: SlackHandle] RETURNS [BOOL] = {
    -- TRUE while an actor process is recorded in the handle.
    RETURN[handle.slackProcess # NIL];
    };

  WaitUntilPaused: PUBLIC PROC [handle: SlackHandle, timeoutTicks: Process.Ticks _ LAST[CARD32]] RETURNS [success: BOOL _ TRUE] = {
    -- Waits until a pause is requested and no actor is running.
    -- timeoutTicks = LAST[CARD32] means wait indefinitely (always returns TRUE);
    -- otherwise returns whether the paused state had been reached after one timed wait.
    IF timeoutTicks = LAST[CARD32] THEN {
      WaitForPause[handle];
      success _ TRUE;
      }
    ELSE {
      success _ WaitForPauseOrTimeout[handle, timeoutTicks];
      };
    };

  WaitUntilPausedOrIdle: PUBLIC PROC [handle: SlackHandle, timeoutTicks: Process.Ticks _ LAST[CARD32]] RETURNS [state: SlackProcessExtras.ProcessState] = {
    -- Waits until no actor is running, then reports paused or idle;
    -- with a finite timeoutTicks, may instead report timeout.
    IF timeoutTicks = LAST[CARD32] THEN {
      state _ WaitForPauseOrIdle[handle];
      }
    ELSE {
      state _ WaitForPauseOrIdleOrTimeout[handle, timeoutTicks];
      };
    };

  WaitForPauseOrTimeout: ENTRY PROC [handle: SlackHandle, timeoutTicks: Process.Ticks] RETURNS [success: BOOL] = {
    ENABLE UNWIND => NULL;
    TRUSTED { Process.SetTimeout[@handle.timedPaused, timeoutTicks] };
    -- Only wait if we are not already paused; otherwise the caller would sit out the
    -- whole timeout for a state that already holds.
    IF NOT (handle.slackProcess = NIL AND handle.pauseRequest) THEN WAIT handle.timedPaused;
    RETURN[handle.slackProcess = NIL AND handle.pauseRequest]; -- not quite right, since the process going idle will cause this routine to fail, even if the timeout hasn't occurred.
    };

  WaitForPauseOrIdleOrTimeout: ENTRY PROC [handle: SlackHandle, timeoutTicks: Process.Ticks] RETURNS [state: SlackProcessExtras.ProcessState _ timeout] = {
    ENABLE UNWIND => NULL;
    TRUSTED { Process.SetTimeout[@handle.timedPaused, timeoutTicks] };
    -- Only wait if an actor is still running (see WaitForPauseOrTimeout).
    IF handle.slackProcess # NIL THEN WAIT handle.timedPaused;
    IF handle.slackProcess # NIL THEN RETURN[timeout]
    ELSE {
      RETURN[IF handle.pauseRequest THEN paused ELSE idle];
      };
    };

  WaitForPause: ENTRY PROC [handle: SlackHandle] = {
    ENABLE UNWIND => NULL;
    UNTIL handle.slackProcess = NIL AND handle.pauseRequest DO WAIT handle.paused ENDLOOP;
    };

  WaitForPauseOrIdle: ENTRY PROC [handle: SlackHandle] RETURNS [state: SlackProcessExtras.ProcessState] = {
    ENABLE UNWIND => NULL;
    UNTIL handle.slackProcess = NIL DO WAIT handle.paused ENDLOOP;
    state _ IF handle.pauseRequest THEN paused ELSE idle;
    };

  -- Queue inspection and maintenance

  AllQueuedAreReadOnly: PUBLIC ENTRY PROC [handle: SlackHandle] RETURNS [BOOL] = {
    -- TRUE if every pending entry was queued with QueueReadOnly (vacuously TRUE when empty).
    ENABLE UNWIND => NULL;
    queue: Queue _ handle.queue;
    FOR i: NAT _ queue.head, (i+1) MOD queue.size UNTIL i = queue.tail DO
      IF NOT queue.events[i].readOnly THEN RETURN[FALSE];
      ENDLOOP;
    RETURN[TRUE];
    };

  Restart: PUBLIC ENTRY PROC [handle: SlackHandle] = TRUSTED {
    -- Unconditionally forks a new actor and records it in the handle.
    -- NOTE(review): no check is made for an actor that is still running -- presumably
    -- meant for use after the previous actor has died; confirm against the interface.
    ENABLE UNWIND => NULL;
    Process.Detach[handle.slackProcess _ FORK Actor[handle] ];
    };

  QueueIsBusy: PUBLIC ENTRY PROC [handle: SlackHandle] RETURNS [BOOL] = {
    -- TRUE if actions are pending or an actor is running.
    RETURN[(NOT EmptyQ[handle.queue]) OR handle.slackProcess # NIL];
    };

  FlushQueue: PUBLIC ENTRY PROC [handle: SlackHandle] = {
    -- Discards all pending actions.
    ENABLE UNWIND => NULL;
    FlushQueueInternal[handle];
    };

  FlushQueueInternal: INTERNAL PROC [handle: SlackHandle] = {
    -- Clears every slot (dropping the REFs), empties the queue and wakes any
    -- producers blocked on a full queue.
    queueSize: NAT _ handle.queue.size;
    FOR i: NAT IN [0..queueSize) DO -- half-open interval: safe even when queueSize = 0
      handle.queue.clientDatas[i] _ NIL;
      handle.queue.optimizeHints[i] _ NIL;
      handle.queue.actions[i] _ NIL;
      handle.queue.actionProcs[i] _ NIL;
      ENDLOOP;
    handle.queue.head _ 0;
    handle.queue.tail _ 0;
    BROADCAST handle.queue.notFull;
    BROADCAST handle.queue.timedNotFull;
    };

  -- The actor process

  Actor: PROC [handle: SlackHandle] = { -- runs in autonomous process(es) to service queue(s)
    -- When aborts are enabled the work is run under ViewerAbort, with CallMe registered
    -- as the callback; otherwise the work is run directly.
    IF handle.abort.enabled THEN {
      ActorWithAbort: PROC = {
        DoActorsJob[handle];
        };
      ViewerAbort.CallWithAbortAndCallback[handle.abort.viewer, ActorWithAbort, CallMe, handle];
      }
    ELSE DoActorsJob[handle];
    };

  CallMe: PROC [data: REF] = { -- IMPORTANT: called at high priority.
    -- Callback handed to ViewerAbort by Actor; data is the SlackHandle.
    handle: SlackHandle _ NARROW[data];
    NillAndKillIt[handle];
    };

  NillAndKillIt: PRIVATE ENTRY PROC [handle: SlackHandle] = {
    -- Marks the actor as gone, discards all pending actions and notifies the client.
    ENABLE UNWIND => NULL;
    handle.slackProcess _ NIL;
    FlushQueueInternal[handle]; -- must be in an ENTRY proc
    IF handle.abort.proc#NIL THEN handle.abort.proc[handle.abort.data]; -- notify the user that abort happened.  Hold the lock.
    };

  DoActorsJob: PROC [handle: SlackHandle] = { -- autonomous process(es) to service queue(s)
    -- Repeatedly fetches the next action and performs it OUTSIDE the monitor, at the
    -- action's priority, until NextAction reports that there is nothing to do
    -- (inputAction = NIL).
    ENABLE UNWIND => NULL;
    inputAction: REF;
    clientData: REF ANY;
    actionProc: ActionProc;
    priority: Process.Priority;

    [actionProc, inputAction, clientData, priority] _ NextAction[handle];
    WHILE inputAction # NIL DO
      Process.SetPriority[priority];
      actionProc[clientData, inputAction]; -- actually do the client's action
      [actionProc, inputAction, clientData, priority] _ NextAction[handle];
      ENDLOOP;
    };

  EmptyQ: INTERNAL PROC [queue: Queue] RETURNS [BOOL] = {
    RETURN[queue.tail = queue.head];
    };

  NextAction: ENTRY PROC [handle: SlackHandle] RETURNS [actionProc: ActionProc, inputAction: REF, clientData: REF, priority: Process.Priority] = {
    -- Asks the optimizeProc how many leading entries to skip, then dequeues the next action.
    -- If nothing is returned (queue drained or pause requested), records that the actor is
    -- terminating and wakes the WaitUntilPaused... callers.  Always wakes blocked producers.
    -- A Problem raised by the optimizeProc is caught and re-raised via RETURN WITH ERROR.
    ENABLE UNWIND => NULL;
    queue: Queue _ handle.queue;
    success: BOOL _ TRUE;
    errorMsg: Rope.ROPE;
    skipActions: NAT;
    qeGen: QueueEntryGenerator;

    qeGen _ PrepareGenerator[handle];
    skipActions _ handle.optimizeProc[qeGen, qeGen.actionsInQueue
      ! Problem => {success _ FALSE; errorMsg _ msg; CONTINUE}];
    IF NOT success THEN RETURN WITH ERROR Problem[msg: errorMsg];

    [actionProc, inputAction, clientData, priority] _ DeQueueAction[handle, skipActions];
    IF inputAction = NIL THEN {
      handle.slackProcess _ NIL; -- no mouse action.  Let process die.  Caller will terminate.
      BROADCAST handle.paused;
      BROADCAST handle.timedPaused;
      };
    -- LAST[CARD32] is the "use the handle's default priority" marker.
    priority _ IF priority = LAST[CARD32] THEN handle.priority ELSE priority;
    BROADCAST queue.notFull;
    BROADCAST queue.timedNotFull;
    };

  InspectQueue: PUBLIC ENTRY PROC [handle: SlackHandle, inspectProc: SlackProcessExtras.InspectProc] = {
    -- Calls inspectProc, with the monitor lock held, on a generator over the pending entries.
    ENABLE UNWIND => NULL; -- release the lock if the client's inspectProc unwinds through us
    qeGen: QueueEntryGenerator _ PrepareGenerator[handle];
    inspectProc[qeGen, qeGen.actionsInQueue];
    };

  QueueSize: INTERNAL PROC [queue: Queue] RETURNS [count: NAT] = {
    -- Number of pending entries in the circular queue.
    count _ (queue.tail - queue.head + queue.size) MOD queue.size;
    };

  PrepareGenerator: INTERNAL PROC [handle: SlackHandle] RETURNS [qeGen: QueueEntryGenerator] = {
    -- Re-initializes the handle's single, reused generator to describe the current queue.
    qeGen _ handle.qeGen;
    qeGen.queue _ handle.queue;
    qeGen.actionsInQueue _ QueueSize[handle.queue]
    };

  GetQueueEntry: PUBLIC PROC [qeGen: QueueEntryGenerator, index: NAT] RETURNS [clientData: REF, optimizeHint: REF, inputAction: REF, priority: Process.Priority] = {
    -- Returns the index-th pending entry, counting from the head (index 0 = next to run).
    -- Raises Problem if index is not less than qeGen.actionsInQueue.
    -- Not an ENTRY proc: within this module, generators are only handed out while the
    -- monitor lock is held (NextAction, InspectQueue).
    queue: Queue _ qeGen.queue;
    head: NAT _ queue.head;
    realIndex: NAT;

    IF index + 1 > qeGen.actionsInQueue THEN
      ERROR Problem[msg: "GetQueueEntry called with index too high"];
    realIndex _ (head + index) MOD queue.size;
    clientData _ queue.clientDatas[realIndex];
    optimizeHint _ queue.optimizeHints[realIndex];
    inputAction _ queue.actions[realIndex];
    priority _ queue.events[realIndex].priority;
    };

  DeQueueAction: INTERNAL PROC [handle: SlackHandle, skipActions: NAT] RETURNS [actionProc: ActionProc, inputAction: REF, clientData: REF, priority: Process.Priority] = {
    -- Discards the first skipActions entries and removes and returns the one after them.
    -- Returns all-NIL when a pause is requested or when nothing remains after skipping.
    -- Raises ERROR if asked to skip more entries than are queued.
    ENABLE UNWIND => NULL;
    queue: Queue _ handle.queue;
    queueSize, where: NAT;

    IF handle.pauseRequest THEN RETURN[NIL, NIL, NIL, 0]; -- don't do anything more for now
    queueSize _ QueueSize[handle.queue];
    IF skipActions > queueSize THEN ERROR;
    IF skipActions = queueSize THEN RETURN[NIL, NIL, NIL, 0]; -- there is nothing left to do

    queue.head _ (queue.head + skipActions) MOD queue.size;
    where _ queue.head;
    actionProc _ queue.actionProcs[where];
    inputAction _ queue.actions[where];
    clientData _ queue.clientDatas[where];
    priority _ queue.events[where].priority;
    queue.head _ (queue.head + 1) MOD queue.size;
    LogActed[handle.log, inputAction, clientData];
    };

  -- Logging

  LogReceived: INTERNAL PROC [log: Log, inputAction: REF, bash: BOOL] = {
    -- Records in the circular log that inputAction was received (queued).
    where: NAT _ log.head;
    log.received[where] _ TRUE;
    log.bashed[where] _ bash;
    log.events[where] _ inputAction;
    log.head _ (log.head + 1) MOD log.size;
    };

  LogActed: INTERNAL PROC [log: Log, inputAction: REF, clientData: REF] = {
    -- Records in the circular log that inputAction was dequeued for execution and,
    -- if session logging is enabled, passes it to the registered logger.
    where: NAT _ log.head;
    log.received[where] _ FALSE;
    log.bashed[where] _ FALSE;
    log.events[where] _ inputAction;
    log.head _ (log.head + 1) MOD log.size;
    -- Logging can be enabled before any logger is registered (Create defaults it to NIL).
    IF log.loggerEnabled AND log.logger # NIL THEN log.logger[clientData, inputAction];
    };

  EnableSessionLogging: PUBLIC ENTRY PROC [handle: SlackHandle] RETURNS [alreadyEnabled: BOOL _ FALSE] = {
    ENABLE UNWIND => NULL;
    IF handle.log.loggerEnabled THEN alreadyEnabled _ TRUE;
    handle.log.loggerEnabled _ TRUE;
    };

  DisableSessionLogging: PUBLIC ENTRY PROC [handle: SlackHandle] RETURNS [alreadyDisabled: BOOL _ FALSE] = {
    ENABLE UNWIND => NULL;
    IF NOT handle.log.loggerEnabled THEN alreadyDisabled _ TRUE;
    handle.log.loggerEnabled _ FALSE;
    };

  RegisterLogger: PUBLIC ENTRY PROC [handle: SlackHandle, loggingProc: LoggingProc] RETURNS [alreadyRegistered: BOOL _ FALSE] = {
    -- Installs loggingProc, replacing any previous logger; reports whether there was one.
    ENABLE UNWIND => NULL;
    IF handle.log.logger#NIL THEN alreadyRegistered _ TRUE;
    handle.log.logger _ loggingProc;
    };

  -- Aborts

  EnableAborts: PUBLIC ENTRY PROC [handle: SlackHandle] RETURNS [alreadyEnabled: BOOL _ FALSE] = {
    -- Takes effect for actors forked afterwards (Actor reads abort.enabled when it starts).
    ENABLE UNWIND => NULL;
    IF handle.abort.enabled THEN alreadyEnabled _ TRUE;
    handle.abort.enabled _ TRUE;
    };

  DisableAborts: PUBLIC ENTRY PROC [handle: SlackHandle] RETURNS [alreadyDisabled: BOOL _ FALSE] = {
    ENABLE UNWIND => NULL;
    IF NOT handle.abort.enabled THEN alreadyDisabled _ TRUE;
    handle.abort.enabled _ FALSE;
    };

  RegisterAbortProc: PUBLIC ENTRY PROC [handle: SlackHandle, abortProc: AbortProc _ NIL, abortData: REF ANY _ NIL, abortViewer: ViewerClasses.Viewer _ NIL] RETURNS [alreadyRegistered: BOOL _ FALSE] = {
    -- Installs the abort proc, its data and the viewer, replacing any previous
    -- registration; reports whether an abort proc was already registered.
    ENABLE UNWIND => NULL;
    IF handle.abort.proc#NIL THEN alreadyRegistered _ TRUE;
    handle.abort.proc _ abortProc;
    handle.abort.data _ abortData;
    handle.abort.viewer _ abortViewer;
    };

  END.