-- OpsQueueImpl.mesa
-- last edited by Levin on December 6, 1983 1:09 pm
DIRECTORY
DFOperations USING [BringOver, SModel, Verify],
DFOperationsQueue USING [NotifierProc, OpTerminationInteraction, Request, RequestRecord],
List USING [PutAssoc],
Process USING [Detach, Priority, priorityBackground, SetPriority],
ProcessProps USING [PushPropList];
OpsQueueImpl: CEDAR MONITOR LOCKS queue.LOCK USING queue: Queue
IMPORTS DFOperations, List, Process, ProcessProps
EXPORTS DFOperationsQueue =
BEGIN
OPEN OpsQ: DFOperationsQueue, Ops: DFOperations;
-- Exports to DFOperationsQueue
Queue: TYPE = REF QueueObject;

-- Monitored state for one operations queue.  All fields are protected by the record's own lock
-- (the module is declared LOCKS queue.LOCK).
QueueObject: PUBLIC TYPE = MONITORED RECORD [
  -- Requests not yet completed, in arrival order.  The head is the request the worker is
  -- currently executing; it stays on the list until Done removes it.
  pending: LIST OF OpsQ.Request ← NIL,
  -- Last cell of pending, used by Enqueue for constant-time append.  Only consulted when pending ~= NIL.
  pendingTail: LIST OF OpsQ.Request ← NIL,
  -- Priority the worker process assigns to itself when it starts.
  priority: Process.Priority,
  -- Set by Abort; consumed and cleared by Done the next time the worker finishes a request.
  abortPending: BOOL ← FALSE,
  -- Called by the worker just before it exits; may be NIL (the default supplied by Create).
  idleNotifier: OpsQ.NotifierProc,
  -- Opaque value passed back to idleNotifier.
  clientData: REF ANY
  ];
-- Allocates a new, empty queue.  No worker process is started here; Enqueue forks one
-- when the first request arrives.
--   priority: priority the worker will run at (defaults to background).
--   idleNotifier: procedure the worker calls when it is about to exit; may be NIL.
--   clientData: uninterpreted value handed to idleNotifier.
Create: PUBLIC PROC [
  priority: Process.Priority ← Process.priorityBackground,
  idleNotifier: OpsQ.NotifierProc ← NIL, clientData: REF ANY ← NIL]
  RETURNS [Queue] = {
  -- pending, pendingTail and abortPending take their declared defaults (NIL, NIL, FALSE).
  newQueue: Queue = NEW[QueueObject ← [
    priority: priority,
    idleNotifier: idleNotifier,
    clientData: clientData
    ]];
  RETURN[newQueue];
  };
-- Returns TRUE iff the queue holds no requests.  Because the request being executed is left
-- at the head of the pending list until it finishes, this is FALSE while an operation is running.
Empty: PUBLIC ENTRY PROC [queue: Queue] RETURNS [BOOL] = {
  nothingPending: BOOL = (queue.pending = NIL);
  RETURN[nothingPending]
  };
-- Requests that the queue be flushed.  If anything is pending, the abort flag is raised; the
-- worker notices it in Done after the current request finishes and discards the pending list.
-- On an empty queue this is a no-op.
Abort: PUBLIC ENTRY PROC [queue: Queue] = {
  IF queue.pending = NIL THEN RETURN;
  -- We don't try to abort the executing operation, since, in practice, our client will worry about that.
  queue.abortPending ← TRUE;
  };
-- Appends request to the queue, forking a detached worker process if the queue was empty.
-- Note: if Abort has been called and the worker has not yet reached Done, a request added
-- here joins the pending list and is swept away with it.
Enqueue: PUBLIC ENTRY PROC [queue: Queue, request: OpsQ.Request] = {
  -- Invariant (while outside the monitor): queue.pending = NIL iff no worker process exists
  rL: LIST OF OpsQ.Request = CONS[request, NIL];
  IF queue.pending ~= NIL THEN {
    -- A worker is already running; link the new cell after the current tail.
    queue.pendingTail.rest ← rL;
    queue.pendingTail ← rL;
    }
  ELSE {
    -- Queue was idle: the new cell is both head and tail, and a worker must be started.
    queue.pending ← queue.pendingTail ← rL;
    TRUSTED{Process.Detach[FORK Worker[queue]]};
    };
  };
-- Internal Procedures
-- Body of the worker process forked by Enqueue.  Executes the request at the head of
-- queue.pending, reports its outcome through the request's interact procedure, and repeats
-- until Done says the queue is empty (or has been flushed by Abort).  Just before exiting it
-- calls the queue's idleNotifier, if one was supplied, with whatever requests Done returned
-- as remaining.
Worker: PROC [queue: Queue] = {
  remaining: LIST OF OpsQ.Request;
  Process.SetPriority[queue.priority];
  -- Assert: queue.pending ~= NIL.
  DO
    request: OpsQ.Request = queue.pending.first;
    -- Runs one request and reports its termination.  Invoked via PushPropList so that the
    -- $WorkingDirectory property is in effect for the duration of the operation.
    WorkerInner: PROC = {
      errors, warnings, filesActedUpon: INT ← 0;
      WITH request^ SELECT FROM
        req: bringOver OpsQ.RequestRecord => {
          [errors: errors, warnings: warnings, filesActedUpon: filesActedUpon] ← Ops.BringOver[
            dfFile: req.dfFile,
            filter: req.filter, action: req.action,
            interact: req.interact,
            clientData: req.clientData,
            log: req.log
            ];
          };
        req: sModel OpsQ.RequestRecord => {
          [errors: errors, warnings: warnings, filesActedUpon: filesActedUpon] ← Ops.SModel[
            dfFile: req.dfFile,
            action: req.action,
            interact: req.interact,
            clientData: req.clientData,
            log: req.log
            ];
          };
        req: verify OpsQ.RequestRecord => {
          [errors: errors, warnings: warnings, filesActedUpon: filesActedUpon] ← Ops.Verify[
            dfFile: req.dfFile,
            interact: req.interact,
            clientData: req.clientData,
            log: req.log
            ];
          };
        ENDCASE => ERROR;
      -- Every recognized request kind reports termination the same way; the ENDCASE arm
      -- raises ERROR and so never reaches this point.
      [] ← request.interact[
        NEW[OpsQ.OpTerminationInteraction ← [
          op: request.op,
          dfFile: request.dfFile,
          filesActedUpon: filesActedUpon,
          errors: errors,
          warnings: warnings
          ]],
        request.clientData
        ];
      };
    -- To preserve the monitor invariant (see Enqueue), the request is left on the queue while it is being executed.
    ProcessProps.PushPropList[
      List.PutAssoc[key: $WorkingDirectory, val: request.wDir, aList: NIL], WorkerInner];
    IF ([remaining: remaining] ← Done[queue]).done THEN EXIT;
    ENDLOOP;
  -- Create defaults idleNotifier to NIL, so it must be checked before being called.
  IF queue.idleNotifier ~= NIL THEN queue.idleNotifier[queue, remaining, queue.clientData];
  };
-- Called by the worker after each request completes.  Normally removes the head of the
-- pending list.  If Abort was called in the meantime, instead hands the whole pending list
-- back as remaining, empties the queue and clears the abort flag.
--   done: TRUE iff the queue is now empty, i.e. the worker should exit.
--   remaining: the flushed requests when an abort was pending, otherwise NIL.
-- NOTE(review): on abort, remaining still begins with the request that just finished
-- executing; confirm against the NotifierProc contract that this is intended.
Done: ENTRY PROC [queue: Queue]
  RETURNS [done: BOOL, remaining: LIST OF OpsQ.Request ← NIL] = {
  -- Invariant (while outside the monitor): queue.pending = NIL iff no worker process exists
  IF queue.abortPending THEN {
    remaining ← queue.pending;
    queue.pending ← NIL;
    queue.abortPending ← FALSE;
    }
  ELSE queue.pending ← queue.pending.rest;
  done ← queue.pending = NIL;
  -- Drop the stale tail pointer so an idle queue does not keep the last list cell (and its
  -- request) reachable.  Enqueue re-establishes pendingTail whenever pending is NIL.
  IF done THEN queue.pendingTail ← NIL;
  };
END.