FileStreamProcessCacheImpl.mesa
Copyright © 1985 by Xerox Corporation. All rights reserved.
Hagmann on December 6, 1983 4:50 pm
Russ Atkinson (RRA) February 4, 1985 3:27:09 pm PST
Please maintain change log at end of file.
DIRECTORY
FileStreamPrivate USING [ BufferNodeHandle, BufferNode, FileDataHandle, FileData, NodeStatus, ProcessNode ],
Process USING [Detach, Priority, priorityForeground, SetPriority];
FileStreamProcessCacheImpl: CEDAR MONITOR
IMPORTS FileStreamPrivate, Process
EXPORTS FileStreamPrivate = BEGIN
FileData: TYPE = FileStreamPrivate.FileData;
BufferNode: TYPE = FileStreamPrivate.BufferNode;
FileDataHandle: TYPE = FileStreamPrivate.FileDataHandle;
BufferNodeHandle: TYPE = FileStreamPrivate.BufferNodeHandle;
-- Number of background worker processes forked at module start.
NumberOfProcesses: NAT = 2 ;
-- Single condition variable covering all queue state changes; both
-- producers (StartRequest) and consumers (FindSomethingToDo) WAIT and
-- BROADCAST on it.
ModuleCondition: CONDITION;
-- One pending parallel-read/-write request.
QueueRecord: TYPE = RECORD [
fileData: FileDataHandle,
node: BufferNodeHandle
,nodeStatus: FileStreamPrivate.NodeStatus
,nodeFirstFileByteInBuffer: INT
];
-- Capacity of the circular request queue.
QUEUESIZE: INT = 20 ;
-- Circular buffer of pending requests; all fields are protected by the
-- module monitor.
QueueRec: TYPE = RECORD [
queue: ARRAY [0..QUEUESIZE) OF QueueRecord,
putQueue: INT ← 0, -- index of next free slot (where StartRequest inserts)
getQueue: INT ← 0, -- index of oldest pending request (where workers remove)
noEntries: INT ← 0 -- current number of queued requests
];
-- Allocated as a collectable object in Init (see change log: reduces MDS and GFI use).
Queue: REF QueueRec ← NIL;
StartRequest: PUBLIC ENTRY PROC [ fileData: FileDataHandle, node: BufferNodeHandle ] = {
Queue up a request. Normal case is to queue and BROADCAST without waiting. If the queue is full, then wait for an entry to free up and then queue request. Except when the queue is full (and thus we are really behind), this returns immediately.
tempCount: INT ← Queue.noEntries ;
ptr: INT ← Queue.getQueue ;
-- Sanity check: the caller must have marked the node for parallel I/O.
IF node.status # needsParallelRead AND node.status # needsParallelWrite THEN ERROR ;
-- Sanity check: walk the occupied portion of the circular queue; a node
-- must never be queued twice.
WHILE tempCount > 0 DO
IF Queue.queue[ptr].node = node THEN ERROR ;
ptr ← IF ptr = QUEUESIZE-1 THEN 0 ELSE ptr+1 ; -- advance index with wraparound
tempCount ← tempCount - 1 ;
ENDLOOP;
-- Block while the queue is full; a consumer BROADCASTs after removing an entry.
WHILE Queue.noEntries >= QUEUESIZE - 1 DO
WAIT ModuleCondition ;
ENDLOOP;
-- Insert at the put position and advance it with wraparound.
Queue.queue[Queue.putQueue] ← [fileData, node --, node.status, node.firstFileByteInBuffer--];
Queue.putQueue ← IF Queue.putQueue = QUEUESIZE-1 THEN 0 ELSE Queue.putQueue+1 ;
Queue.noEntries ← Queue.noEntries + 1;
-- Wake any worker waiting in FindSomethingToDo (and any other producer
-- waiting for a free slot).
BROADCAST ModuleCondition ;
};
FindSomethingToDo: ENTRY PROC [] RETURNS [fileData: FileDataHandle, node: BufferNodeHandle ] = {
This procedure is where the processes all wait for something to do.
-- Wait until a producer queues a request via StartRequest.
WHILE Queue.noEntries = 0 DO
WAIT ModuleCondition ;
ENDLOOP;
-- Remove the oldest request from the get position.
[fileData, node ] ← Queue.queue[Queue.getQueue] ;
Queue.queue[Queue.getQueue] ← [NIL,NIL]; -- clear the REFs so the entry does not retain storage
Queue.getQueue ← IF Queue.getQueue = QUEUESIZE-1 THEN 0 ELSE Queue.getQueue+1 ; -- advance index with wraparound
Queue.noEntries ← Queue.noEntries - 1;
-- Wake any producer blocked in StartRequest waiting for a free slot.
BROADCAST ModuleCondition ;
};
FileStreamForegroundProcess: PROC [] = {
This procedure is the top of the processes that do all the parallel reads and writes.
Process.SetPriority[Process.priorityForeground];
-- Loop forever: dequeue the next request and perform the actual transfer
-- for it via FileStreamPrivate.ProcessNode.
DO
fileData: FileDataHandle ;
node: BufferNodeHandle;
[fileData, node] ← FindSomethingToDo [];
FileStreamPrivate.ProcessNode[fileData, node] ;
ENDLOOP;
};
Initialization
-- Allocate the request queue and fork the pool of worker processes.
Init: PROC [] = {
Queue ← NEW[QueueRec];
THROUGH [1..NumberOfProcesses] DO
TRUSTED {Process.Detach[FORK FileStreamForegroundProcess[]]; }; -- detached: workers run forever and are never joined
ENDLOOP;
};
Init[]; -- runs once when the module is started
END.
CHANGE LOG
Created by Hagmann on December 6, 1983 3:02 pm
From a suggestion by Mark Brown, implement a cache of processes on a package wide basis.
Modified by Hagmann on December 19, 1984 9:49:48 am PST
Reformatted, and changed the queue to be a collectable object to reduce MDS and GFI use.