DIRECTORY DFOperations USING [BringOver, SModel, Verify], DFOperationsQueue USING [NotifierProc, OpTerminationInteraction, Request, RequestRecord], List USING [PutAssoc], Process USING [Detach, Priority, priorityBackground, SetPriority], ProcessProps USING [PushPropList]; OpsQueueImpl: CEDAR MONITOR LOCKS queue.LOCK USING queue: Queue IMPORTS DFOperations, List, Process, ProcessProps EXPORTS DFOperationsQueue = BEGIN OPEN OpsQ: DFOperationsQueue, Ops: DFOperations; Queue: TYPE = REF QueueObject; QueueObject: PUBLIC TYPE = MONITORED RECORD [ pending: LIST OF OpsQ.Request _ NIL, pendingTail: LIST OF OpsQ.Request _ NIL, priority: Process.Priority, abortPending: BOOL _ FALSE, idleNotifier: OpsQ.NotifierProc, clientData: REF ANY ]; Create: PUBLIC PROC [ priority: Process.Priority _ Process.priorityBackground, idleNotifier: OpsQ.NotifierProc _ NIL, clientData: REF ANY _ NIL] RETURNS [Queue] = { RETURN[NEW[QueueObject _ [ priority: priority, idleNotifier: idleNotifier, clientData: clientData ]]]; }; Empty: PUBLIC ENTRY PROC [queue: Queue] RETURNS [BOOL] = { RETURN[queue.pending = NIL] }; Abort: PUBLIC ENTRY PROC [queue: Queue] = { IF queue.pending ~= NIL THEN { queue.abortPending _ TRUE; }; }; Enqueue: PUBLIC ENTRY PROC [queue: Queue, request: OpsQ.Request] = { rL: LIST OF OpsQ.Request = CONS[request, NIL]; IF queue.pending ~= NIL THEN { queue.pendingTail.rest _ rL; queue.pendingTail _ rL; } ELSE { queue.pending _ queue.pendingTail _ rL; TRUSTED{Process.Detach[FORK Worker[queue]]}; }; }; Worker: PROC [queue: Queue] = { remaining: LIST OF OpsQ.Request; Process.SetPriority[queue.priority]; DO request: OpsQ.Request = queue.pending.first; WorkerInner: PROC = { errors, warnings, filesActedUpon: INT _ 0; ReportTermination: PROC = { [] _ request.interact[ NEW[OpsQ.OpTerminationInteraction _ [ op: request.op, dfFile: request.dfFile, filesActedUpon: filesActedUpon, errors: errors, warnings: warnings ]], request.clientData ]; }; WITH request^ SELECT FROM req: bringOver OpsQ.RequestRecord => { [errors: errors, warnings: warnings, filesActedUpon: filesActedUpon] _ Ops.BringOver[ dfFile: req.dfFile, filter: req.filter, action: req.action, interact: req.interact, clientData: req.clientData, log: req.log ]; ReportTermination[]; }; req: sModel OpsQ.RequestRecord => { [errors: errors, warnings: warnings, filesActedUpon: filesActedUpon] _ Ops.SModel[ dfFile: req.dfFile, action: req.action, interact: req.interact, clientData: req.clientData, log: req.log ]; ReportTermination[]; }; req: verify OpsQ.RequestRecord => { [errors: errors, warnings: warnings, filesActedUpon: filesActedUpon] _ Ops.Verify[ dfFile: req.dfFile, interact: req.interact, clientData: req.clientData, log: req.log ]; ReportTermination[]; }; ENDCASE => ERROR; }; ProcessProps.PushPropList[ List.PutAssoc[key: $WorkingDirectory, val: request.wDir, aList: NIL], WorkerInner]; IF ([remaining: remaining] _ Done[queue]).done THEN EXIT; ENDLOOP; queue.idleNotifier[queue, remaining, queue.clientData]; }; Done: ENTRY PROC [queue: Queue] RETURNS [done: BOOL, remaining: LIST OF OpsQ.Request _ NIL] = { IF queue.abortPending THEN { remaining _ queue.pending; queue.pending _ NIL; queue.abortPending _ FALSE; } ELSE queue.pending _ queue.pending.rest; done _ queue.pending = NIL; }; END. OpsQueueImpl.mesa last edited by Levin on December 6, 1983 1:09 pm Exports to DFOperationsQueue We don't try to abort the executing operation, since, in practice, our client will worry about that. Invariant (while outside the monitor): queue.pending = NIL iff no worker process exists Internal Procedures Assert: queue.pending ~= NIL. To preserve the monitor invariant (see Enqueue), the request is left on the queue while it is being executed. Invariant (while outside the monitor): queue.pending = NIL iff no worker process exists Êñ˜Jšœ™Jšœ0™0J˜šÏk ˜ Jšœ œ˜/JšœœB˜YJšœœ ˜Jšœœ5˜BJšœ œ˜"—J˜š œœœœœœ ˜?Jšœ*˜1Jšœ˜—J˜Jš˜J˜Jšœ,˜0J˜Jšœ™J˜Jšœœœ ˜J˜š œ œœ œœ˜-Jšœ œœœ˜$Jšœ œœœ˜(Jšœ˜Jšœœœ˜Jšœ ˜ Jšœ œ˜Jšœ˜—J˜šÏnœœœ˜Jšœ8˜8Jš œ"œœœœ˜AJšœ ˜šœœ˜Jšœ˜Jšœ˜Jšœ˜Jšœ˜—Jšœ˜—J˜š žœœœœœœ˜:Jšœœ˜J˜J™—šžœœœœ˜+šœœœ˜Jšœœ˜Jšœd™dJ˜—J˜—J˜šžœœœœ*˜DJšœX™XJš œœœœ œ˜.šœœœ˜Jšœ˜Jšœ˜J˜—šœ˜Jšœ'˜'Jšœœ˜,J˜—J˜—J˜J™J˜šžœœ˜Jšœ œœ˜ Jšœ$˜$Jšœ™š˜J˜,šž œœ˜Jšœ"œ˜*šžœœ˜šœ˜šœ"˜%Jšœ˜Jšœ˜Jšœ˜Jšœ˜Jšœ˜Jšœ˜—Jšœ˜Jšœ˜—J˜—šœ œ˜˜&šœU˜UJšœ˜Jšœ'˜'Jšœ˜Jšœ˜Jšœ ˜ Jšœ˜—Jšœ˜J˜—˜#šœR˜RJšœ˜Jšœ˜Jšœ˜Jšœ˜Jšœ ˜ Jšœ˜—Jšœ˜J˜—˜#šœR˜RJšœ˜Jšœ˜Jšœ˜Jšœ ˜ Jšœ˜—Jšœ˜J˜—Jšœœ˜—J˜—Jšœm™mšœ˜Jšœ@œ˜S—Jšœ-œœ˜9Jšœ˜—Jšœ7˜7J˜—J˜šžœœœ˜Jš œœ œœœ˜?JšœX™Xšœœ˜J˜Jšœœ˜Jšœœ˜J˜—Jšœ$˜(Jšœœ˜J˜—J˜Jšœ˜J˜J˜—…— Øç