-- OpsQueueImpl.mesa
-- Copyright © 1985 by Xerox Corporation. All rights reserved.
-- Levin on December 6, 1983 1:09 pm
-- Russ Atkinson (RRA) February 13, 1985 11:50:51 am PST
DIRECTORY
DFOperations USING [BringOver, SModel, Verify],
DFOperationsQueue USING [NotifierProc, OpTerminationInteraction, Request, RequestRecord],
List USING [PutAssoc],
Process USING [Detach, Priority, priorityBackground, SetPriority],
ProcessProps USING [PushPropList];
OpsQueueImpl:
CEDAR
MONITOR
LOCKS queue.
LOCK
USING queue: Queue
IMPORTS DFOperations, List, Process, ProcessProps
EXPORTS DFOperationsQueue
= BEGIN
-- Exports to DFOperationsQueue
-- A Queue is a reference to the monitored record that holds one request queue's state.
Queue: TYPE = REF QueueObject;

-- Representation of a queue. The record is MONITORED, so each queue carries its own lock;
-- the ENTRY procedures of this module run under that lock.
QueueObject: PUBLIC TYPE = MONITORED RECORD [
  -- Outstanding requests, oldest first. The head stays on the list while it is being executed.
  pending: LIST OF DFOperationsQueue.Request ← NIL,
  -- Last cell of pending; only consulted by Enqueue when pending is non-NIL.
  pendingTail: LIST OF DFOperationsQueue.Request ← NIL,
  -- Priority the worker process assigns to itself when it starts.
  priority: Process.Priority,
  -- Raised by Abort, consumed (and reset) by Done.
  abortPending: BOOL ← FALSE,
  -- Invoked by the worker once it has run out of requests to execute.
  idleNotifier: DFOperationsQueue.NotifierProc,
  -- Passed back, uninterpreted, as the last argument of idleNotifier.
  clientData: REF ANY
  ];
-- Builds a new queue with no pending requests.
--   priority: priority at which the queue's worker process will run.
--   idleNotifier: procedure the worker calls when it stops; defaults to NIL.
--   clientData: value handed back to idleNotifier; defaults to NIL.
-- Returns the newly allocated queue.
Create: PUBLIC PROC [
  priority: Process.Priority ← Process.priorityBackground,
  idleNotifier: DFOperationsQueue.NotifierProc ← NIL,
  clientData: REF ANY ← NIL
  ] RETURNS [Queue] = {
  -- The remaining fields (pending, pendingTail, abortPending) take their declared defaults.
  newQueue: Queue = NEW[QueueObject ← [
    priority: priority,
    idleNotifier: idleNotifier,
    clientData: clientData
    ]];
  RETURN[newQueue];
  };
-- Reports whether the queue currently holds no requests. Because the request being executed
-- remains on the pending list, TRUE also means that nothing is executing.
Empty: PUBLIC ENTRY PROC [queue: Queue] RETURNS [BOOL] = {
  nothingPending: BOOL = (queue.pending = NIL);
  RETURN[nothingPending];
  };
-- Asks for the outstanding requests to be discarded. This only raises a flag; the pending list
-- itself is dropped by Done, the next time the worker finishes a request.
Abort: PUBLIC ENTRY PROC [queue: Queue] = {
  -- With nothing pending there is nothing to cancel.
  IF queue.pending = NIL THEN RETURN;
  -- We don't try to abort the executing operation, since, in practice, our client will
  -- worry about that.
  queue.abortPending ← TRUE;
  };
-- Appends request to the queue, starting a detached worker process if the queue was empty.
Enqueue: PUBLIC ENTRY PROC [queue: Queue, request: DFOperationsQueue.Request] = {
  -- Invariant (while outside the monitor): queue.pending = NIL iff no worker process exists
  cell: LIST OF DFOperationsQueue.Request = CONS[request, NIL];
  IF queue.pending = NIL THEN {
    -- First request: it becomes both head and tail, and a worker must be created to serve it.
    queue.pendingTail ← cell;
    queue.pending ← cell;
    TRUSTED {Process.Detach[FORK Worker[queue]]};
    }
  ELSE {
    -- A worker already exists; just link the new cell after the current tail.
    queue.pendingTail.rest ← cell;
    queue.pendingTail ← cell;
    };
  };
-- Internal Procedures
-- Body of the process forked by Enqueue. It executes the request at the head of queue.pending,
-- asks Done for the next step, and repeats until Done reports that nothing is left. It then
-- calls the queue's idle notifier, if the client supplied one.
Worker: PROC [queue: Queue] = {
  -- Requests handed back by Done; non-NIL only when an abort emptied the queue.
  remaining: LIST OF DFOperationsQueue.Request;
  Process.SetPriority[queue.priority];
  -- Assert: queue.pending ~= NIL.
  DO
    request: DFOperationsQueue.Request = queue.pending.first;
    -- Executes request and reports its outcome; run under the property list pushed below.
    WorkerInner: PROC = {
      errors, warnings, filesActedUpon: INT ← 0;
      -- Hands the operation's final counts to the request's interaction procedure.
      ReportTermination: PROC = {
        [] ← request.interact[
          NEW[DFOperationsQueue.OpTerminationInteraction ← [
            op: request.op,
            dfFile: request.dfFile,
            filesActedUpon: filesActedUpon,
            errors: errors,
            warnings: warnings
            ]],
          request.clientData
          ];
        };
      -- Dispatch on the request variant to the matching DFOperations procedure.
      WITH request^ SELECT FROM
        req: bringOver DFOperationsQueue.RequestRecord => {
          [errors: errors, warnings: warnings, filesActedUpon: filesActedUpon] ← DFOperations.BringOver[
            dfFile: req.dfFile,
            filter: req.filter, action: req.action,
            interact: req.interact,
            clientData: req.clientData,
            log: req.log
            ];
          };
        req: sModel DFOperationsQueue.RequestRecord => {
          [errors: errors, warnings: warnings, filesActedUpon: filesActedUpon] ← DFOperations.SModel[
            dfFile: req.dfFile,
            action: req.action,
            interact: req.interact,
            clientData: req.clientData,
            log: req.log
            ];
          };
        req: verify DFOperationsQueue.RequestRecord => {
          [errors: errors, warnings: warnings, filesActedUpon: filesActedUpon] ← DFOperations.Verify[
            dfFile: req.dfFile,
            interact: req.interact,
            clientData: req.clientData,
            log: req.log
            ];
          };
        ENDCASE => ERROR;
      -- Reached only for a recognized variant, since ENDCASE raises ERROR.
      ReportTermination[];
      };
    -- To preserve the monitor invariant (see Enqueue), the request is left on the queue while
    -- it is being executed.
    ProcessProps.PushPropList[
      List.PutAssoc[key: $WorkingDirectory, val: request.wDir, aList: NIL], WorkerInner];
    IF ([remaining: remaining] ← Done[queue]).done THEN EXIT;
    ENDLOOP;
  -- Create defaults idleNotifier to NIL, so only call it when the client supplied one.
  IF queue.idleNotifier ~= NIL THEN queue.idleNotifier[queue, remaining, queue.clientData];
  };
-- Called by the worker after it has finished executing the head of queue.pending.
-- Returns:
--   done: TRUE when no requests are left, i.e. the worker should stop.
--   remaining: NIL normally. If an abort was requested, the entire former pending list,
--     whose first element is the request that has just been executed.
Done: ENTRY PROC [queue: Queue]
  RETURNS [done: BOOL, remaining: LIST OF DFOperationsQueue.Request ← NIL] = {
  -- Invariant (while outside the monitor): queue.pending = NIL iff no worker process exists
  IF queue.abortPending THEN {
    -- Hand every queued request back to the caller and acknowledge the abort.
    remaining ← queue.pending;
    queue.pending ← NIL;
    queue.abortPending ← FALSE;
    }
  ELSE queue.pending ← queue.pending.rest;
  done ← queue.pending = NIL;
  -- Once the queue is empty, drop the tail pointer as well so it does not keep the last
  -- list cell (and its request) reachable. Enqueue sets it again on the next request.
  IF done THEN queue.pendingTail ← NIL;
  };
END.