OpsQueueImpl.mesa
last edited by Levin on December 6, 1983 1:09 pm
DIRECTORY
DFOperations USING [BringOver, SModel, Verify],
DFOperationsQueue USING [NotifierProc, OpTerminationInteraction, Request, RequestRecord],
List USING [PutAssoc],
Process USING [Detach, Priority, priorityBackground, SetPriority],
ProcessProps USING [PushPropList];
OpsQueueImpl:
CEDAR
MONITOR
LOCKS queue.
LOCK
USING queue: Queue
IMPORTS DFOperations, List, Process, ProcessProps
EXPORTS DFOperationsQueue =
BEGIN
OPEN OpsQ: DFOperationsQueue, Ops: DFOperations;
Exports to DFOperationsQueue
Queue: TYPE = REF QueueObject;
QueueObject:
PUBLIC
TYPE =
MONITORED
RECORD [
pending: LIST OF OpsQ.Request ← NIL,
pendingTail: LIST OF OpsQ.Request ← NIL,
priority: Process.Priority,
abortPending: BOOL ← FALSE,
idleNotifier: OpsQ.NotifierProc,
clientData: REF ANY
];
Create:
PUBLIC
PROC [
priority: Process.Priority ← Process.priorityBackground,
idleNotifier: OpsQ.NotifierProc ← NIL, clientData: REF ANY ← NIL]
RETURNS [Queue] = {
RETURN[
NEW[QueueObject ← [
priority: priority,
idleNotifier: idleNotifier,
clientData: clientData
]]];
};
Empty:
PUBLIC
ENTRY
PROC [queue: Queue]
RETURNS [
BOOL] = {
RETURN[queue.pending = NIL]
};
Abort:
PUBLIC
ENTRY
PROC [queue: Queue] = {
IF queue.pending ~=
NIL
THEN {
queue.abortPending ← TRUE;
We don't try to abort the executing operation, since, in practice, our client will worry about that.
};
};
Enqueue:
PUBLIC
ENTRY
PROC [queue: Queue, request: OpsQ.Request] = {
Invariant (while outside the monitor): queue.pending = NIL iff no worker process exists
rL: LIST OF OpsQ.Request = CONS[request, NIL];
IF queue.pending ~=
NIL
THEN {
queue.pendingTail.rest ← rL;
queue.pendingTail ← rL;
}
ELSE {
queue.pending ← queue.pendingTail ← rL;
TRUSTED{Process.Detach[FORK Worker[queue]]};
};
};
Internal Procedures
Worker:
PROC [queue: Queue] = {
remaining: LIST OF OpsQ.Request;
Process.SetPriority[queue.priority];
Assert: queue.pending ~= NIL.
DO
request: OpsQ.Request = queue.pending.first;
WorkerInner:
PROC = {
errors, warnings, filesActedUpon: INT ← 0;
ReportTermination:
PROC = {
[] ← request.interact[
NEW[OpsQ.OpTerminationInteraction ← [
op: request.op,
dfFile: request.dfFile,
filesActedUpon: filesActedUpon,
errors: errors,
warnings: warnings
]],
request.clientData
];
};
WITH request^
SELECT
FROM
req: bringOver OpsQ.RequestRecord => {
[errors: errors, warnings: warnings, filesActedUpon: filesActedUpon] ← Ops.BringOver[
dfFile: req.dfFile,
filter: req.filter, action: req.action,
interact: req.interact,
clientData: req.clientData,
log: req.log
];
ReportTermination[];
};
req: sModel OpsQ.RequestRecord => {
[errors: errors, warnings: warnings, filesActedUpon: filesActedUpon] ← Ops.SModel[
dfFile: req.dfFile,
action: req.action,
interact: req.interact,
clientData: req.clientData,
log: req.log
];
ReportTermination[];
};
req: verify OpsQ.RequestRecord => {
[errors: errors, warnings: warnings, filesActedUpon: filesActedUpon] ← Ops.Verify[
dfFile: req.dfFile,
interact: req.interact,
clientData: req.clientData,
log: req.log
];
ReportTermination[];
};
ENDCASE => ERROR;
};
To preserve the monitor invariant (see Enqueue), the request is left on the queue while it is being executed.
ProcessProps.PushPropList[
List.PutAssoc[key: $WorkingDirectory, val: request.wDir, aList: NIL], WorkerInner];
IF ([remaining: remaining] ← Done[queue]).done THEN EXIT;
ENDLOOP;
queue.idleNotifier[queue, remaining, queue.clientData];
};
Done:
ENTRY
PROC [queue: Queue]
RETURNS [done: BOOL, remaining: LIST OF OpsQ.Request ← NIL] = {
Invariant (while outside the monitor): queue.pending = NIL iff no worker process exists
IF queue.abortPending
THEN {
remaining ← queue.pending;
queue.pending ← NIL;
queue.abortPending ← FALSE;
}
ELSE queue.pending ← queue.pending.rest;
done ← queue.pending = NIL;
};
END.