OpsQueueImpl.mesa
Copyright © 1985, 1986, 1991 by Xerox Corporation. All rights reserved.
created by Levin
Russ Atkinson (RRA) January 19, 1987 1:38:34 pm PST
DIRECTORY
DFOperations USING [BringOver, SModel, Verify],
DFOperationsQueue USING [NotifierProc, OpTerminationInteraction, Request, RequestRecord],
List USING [PutAssoc],
Process USING [Detach, Priority, priorityBackground, SetPriority],
ProcessProps USING [PushPropList];
OpsQueueImpl: CEDAR MONITOR LOCKS queue.LOCK USING queue: Queue
IMPORTS DFOperations, List, Process, ProcessProps
EXPORTS DFOperationsQueue
= BEGIN
Exports to DFOperationsQueue
-- A Queue is a reference to a monitored record. The ENTRY procedures of this module lock the record of the queue they are given (LOCKS queue.LOCK in the module header).
Queue: TYPE = REF QueueObject;
QueueObject: PUBLIC TYPE = MONITORED RECORD [
-- Requests not yet retired, oldest first. While a worker process exists the head is the request it is executing. NIL iff no worker process exists.
pending: LIST OF DFOperationsQueue.Request ¬ NIL,
-- Last cell of pending, so Enqueue can append without walking the list. Only assigned by Enqueue, so it is meaningful only while pending ~= NIL.
pendingTail: LIST OF DFOperationsQueue.Request ¬ NIL,
-- Priority the worker process assigns to itself (via Process.SetPriority) before it runs any request.
priority: Process.Priority,
-- Set by Abort. Examined and cleared by Done when the worker finishes its current request.
abortPending: BOOL ¬ FALSE,
-- Called by the worker process as it exits. Create defaults this to NIL.
idleNotifier: DFOperationsQueue.NotifierProc,
-- Passed as the last argument to idleNotifier. This module never examines it.
clientData: REF ANY
];
Create: PUBLIC PROC [priority: Process.Priority ¬ Process.priorityBackground, idleNotifier: DFOperationsQueue.NotifierProc ¬ NIL, clientData: REF ANY ¬ NIL] RETURNS [Queue] = {
  -- Allocates a new, empty queue that records the caller's priority, idle notifier and client data.
  -- The remaining fields take their declared defaults: nothing pending, no abort requested.
  -- No worker process is started here. Enqueue forks one when the first request arrives.
  newQueue: Queue = NEW[QueueObject ¬ [priority: priority, idleNotifier: idleNotifier, clientData: clientData]];
  RETURN[newQueue];
  };
Empty: PUBLIC ENTRY PROC [queue: Queue] RETURNS [BOOL] = {
  -- Returns TRUE iff the queue holds no requests at all.
  -- A request that is currently being executed is still on the pending list, so it counts as present.
  nothingPending: BOOL = (queue.pending = NIL);
  RETURN[nothingPending];
  };
Abort: PUBLIC ENTRY PROC [queue: Queue] = {
  -- Asks for the queued requests to be discarded. A no-op when the queue is empty.
  -- Only a flag is set here. Done acts on it when the worker finishes its current request.
  -- We don't try to abort the executing operation, since, in practice, our client will worry about that.
  IF queue.pending = NIL THEN RETURN;
  queue.abortPending ¬ TRUE;
  };
Enqueue: PUBLIC ENTRY PROC [queue: Queue, request: DFOperationsQueue.Request] = {
  -- Appends request to the end of the queue, and starts a worker process if none is running.
  -- Invariant (while outside the monitor): queue.pending = NIL iff no worker process exists.
  cell: LIST OF DFOperationsQueue.Request = CONS[request, NIL];
  IF queue.pending = NIL THEN {
    -- Queue was idle: the new cell is both head and tail, and a detached worker is forked to serve it.
    queue.pendingTail ¬ cell;
    queue.pending ¬ cell;
    TRUSTED{Process.Detach[FORK Worker[queue]]};
    }
  ELSE {
    -- A worker already exists: link the cell after the current tail and advance the tail.
    queue.pendingTail.rest ¬ cell;
    queue.pendingTail ¬ cell;
    };
  };
Internal Procedures
Worker: PROC [queue: Queue] = {
  -- Body of the detached process that Enqueue forks when a request arrives on an idle queue.
  -- Runs the request at the head of queue.pending, then calls Done to retire it, and repeats until Done reports that nothing is left.
  -- On exit it calls the queue's idleNotifier, if there is one, passing the requests that Done handed back because of an abort (NIL otherwise).
  remaining: LIST OF DFOperationsQueue.Request;
  Process.SetPriority[queue.priority];
  -- Assert: queue.pending ~= NIL.
  DO
    -- The head is read without the monitor lock. The other procedures only append at the tail or set abortPending, and only Done (called below) replaces the head.
    request: DFOperationsQueue.Request = queue.pending.first;
    WorkerInner: PROC = {
      -- Dispatches on the request variant, runs the matching DFOperations procedure, then reports the resulting counts to the client.
      errors, warnings, filesActedUpon: INT ¬ 0;
      ReportTermination: PROC = {
        -- Sends an OpTerminationInteraction carrying the final counts through the request's own interact procedure. Its result is ignored.
        [] ¬ request.interact[
          NEW[DFOperationsQueue.OpTerminationInteraction ¬ [
            op: request.op,
            dfFile: request.dfFile,
            filesActedUpon: filesActedUpon,
            errors: errors,
            warnings: warnings
            ]],
          request.clientData
          ];
        };
      WITH request­ SELECT FROM
        req: bringOver DFOperationsQueue.RequestRecord =>
          [errors: errors, warnings: warnings, filesActedUpon: filesActedUpon] ¬ DFOperations.BringOver[
            dfFile: req.dfFile,
            filter: req.filter, action: req.action,
            interact: req.interact,
            clientData: req.clientData,
            log: req.log
            ];
        req: sModel DFOperationsQueue.RequestRecord =>
          [errors: errors, warnings: warnings, filesActedUpon: filesActedUpon] ¬ DFOperations.SModel[
            dfFile: req.dfFile,
            action: req.action,
            interact: req.interact,
            clientData: req.clientData,
            log: req.log
            ];
        req: verify DFOperationsQueue.RequestRecord =>
          [errors: errors, warnings: warnings, filesActedUpon: filesActedUpon] ¬ DFOperations.Verify[
            dfFile: req.dfFile,
            interact: req.interact,
            clientData: req.clientData,
            log: req.log
            ];
        ENDCASE => ERROR;
      -- Reached only after one of the three operations returned normally, since the ENDCASE arm raises ERROR.
      ReportTermination[];
      };
    -- To preserve the monitor invariant (see Enqueue), the request is left on the queue while it is being executed.
    -- WorkerInner runs under a property list that binds $WorkingDirectory to the request's wDir.
    ProcessProps.PushPropList[
      List.PutAssoc[key: $WorkingDirectory, val: request.wDir, aList: NIL], WorkerInner];
    IF ([remaining: remaining] ¬ Done[queue]).done THEN EXIT;
    ENDLOOP;
  -- Create lets idleNotifier default to NIL, so guard the call instead of calling a NIL procedure value.
  IF queue.idleNotifier ~= NIL THEN queue.idleNotifier[queue, remaining, queue.clientData];
  };
Done: ENTRY PROC [queue: Queue] RETURNS [done: BOOL, remaining: LIST OF DFOperationsQueue.Request ¬ NIL] = {
  -- Called by the worker after it finishes the request at the head of the queue.
  -- Normally the head is dropped and remaining stays NIL.
  -- If Abort was called in the meantime, the whole pending list is handed back in remaining, the queue is emptied and the abort flag is cleared.
  -- done is TRUE iff the queue is now empty, in which case the worker must exit.
  -- Invariant (while outside the monitor): queue.pending = NIL iff no worker process exists.
  -- NOTE(review): on abort, remaining still has the just-finished request at its head, because the worker leaves it on the queue while running. Confirm that clients expect this.
  IF ~queue.abortPending THEN queue.pending ¬ queue.pending.rest
  ELSE {
    remaining ¬ queue.pending;
    queue.abortPending ¬ FALSE;
    queue.pending ¬ NIL;
    };
  done ¬ (queue.pending = NIL);
  };
END.