-- FileStreamMoreImpl.mesa
-- Copyright © 1985, 1986 by Xerox Corporation. All rights reserved.
-- Hagmann on December 6, 1983 4:50 pm
-- Russ Atkinson (RRA) February 4, 1985 3:27:09 pm PST
-- Bob Hagmann October 17, 1988 1:27:53 pm PDT
-- Please maintain change log at end of file.
-- This compilation is logically part of FileStreamImpl, but must be separate since it uses a module monitor (FileStreamImpl uses object monitors) and we export to FS here (there is a type conflict on Close).
DIRECTORY
FileStream USING [ErrorFromStream, OpenFileFromStream, StreamFromOpenFile, StreamFromOpenStream],
FileStreamPrivate USING [ BufferNodeHandle, BufferNode, FileDataHandle, FileData, NodeStatus, ProcessNode ],
FS USING [ErrorDesc, ExtendFileProc, InitialPosition, Lock, OpenFile, StreamBufferParms, StreamOptions],
IO USING [STREAM],
Process USING [Detach, Pause, Priority, prioritySysForeground, SecondsToTicks, SetPriority, Ticks],
Rope USING [ROPE];
FileStreamMoreImpl: CEDAR MONITOR
IMPORTS FileStream, FileStreamPrivate, Process
EXPORTS FileStreamPrivate, FS = BEGIN
OPEN Rope, IO;
FileData: TYPE = FileStreamPrivate.FileData;
BufferNode: TYPE = FileStreamPrivate.BufferNode;
FileDataHandle: TYPE = FileStreamPrivate.FileDataHandle;
BufferNodeHandle: TYPE = FileStreamPrivate.BufferNodeHandle;
MaxNumberOfProcesses: NAT ← 8 ;
NumberOfProcesses: NAT = 3 ;
nProcesses: CARD ← 0;
ModuleCondition: CONDITION;
QueueRecord: TYPE = RECORD [
fileData: FileDataHandle,
node: BufferNodeHandle
-- ,nodeStatus: FileStreamPrivate.NodeStatus
-- ,nodeFirstFileByteInBuffer: INT
];
QUEUESIZE: INT = 20 ;
QueueRec: TYPE = RECORD [
queue: ARRAY [0..QUEUESIZE) OF QueueRecord,
putQueue: INT ← 0,
getQueue: INT ← 0,
noEntries: INT ← 0,
queueStuck: BOOL ← FALSE
];
Queue: REF QueueRec ← NIL;
StartRequest: PUBLIC ENTRY PROC [ fileData: FileDataHandle, node: BufferNodeHandle ] = {
-- Queue up a request. The normal case is to queue the request and BROADCAST without waiting. If the queue is full (and thus we are really behind), wait for an entry to free up and then queue the request; otherwise this returns immediately.
tempCount: INT ← Queue.noEntries ;
ptr: INT ← Queue.getQueue ;
IF node.status # needsParallelRead AND node.status # needsParallelWrite THEN ERROR ;
WHILE tempCount > 0 DO
IF Queue.queue[ptr].node = node THEN ERROR ;
ptr ← IF ptr = QUEUESIZE-1 THEN 0 ELSE ptr+1 ;
tempCount ← tempCount - 1 ;
ENDLOOP;
WHILE Queue.noEntries >= QUEUESIZE - 1 DO
WAIT ModuleCondition ;
ENDLOOP;
Queue.queue[Queue.putQueue] ← [fileData, node --, node.status, node.firstFileByteInBuffer--];
Queue.putQueue ← IF Queue.putQueue = QUEUESIZE-1 THEN 0 ELSE Queue.putQueue+1 ;
Queue.noEntries ← Queue.noEntries + 1;
Queue.queueStuck ← FALSE;
BROADCAST ModuleCondition ;
};
FindSomethingToDo: ENTRY PROC [] RETURNS [fileData: FileDataHandle, node: BufferNodeHandle ] = {
-- This procedure is where the processes all wait for something to do.
WHILE Queue.noEntries = 0 DO
WAIT ModuleCondition ;
ENDLOOP;
[fileData, node ] ← Queue.queue[Queue.getQueue] ;
Queue.queue[Queue.getQueue] ← [NIL,NIL];
Queue.getQueue ← IF Queue.getQueue = QUEUESIZE-1 THEN 0 ELSE Queue.getQueue+1 ;
Queue.noEntries ← Queue.noEntries - 1;
Queue.queueStuck ← FALSE;
BROADCAST ModuleCondition ;
};
FileStreamForegroundProcess: PROC [] = {
-- This procedure is the top of the processes that do all the parallel reads and writes.
Process.SetPriority[Process.prioritySysForeground];
DO
fileData: FileDataHandle ;
node: BufferNodeHandle;
[fileData, node] ← FindSomethingToDo [];
FileStreamPrivate.ProcessNode[fileData, node] ;
ENDLOOP;
};
FileStreamForegroundWatcher: PROC [] = {
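-- This procedure is the top of the watcher process. Every three seconds it checks whether the queue is non-empty and has made no progress since the previous check (queueStuck is still TRUE); if so, it forks another foreground process, up to MaxNumberOfProcesses.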
threeSeconds: Process.Ticks ← Process.SecondsToTicks[3];
Process.SetPriority[Process.prioritySysForeground];
DO
IF Queue.noEntries # 0 AND nProcesses < MaxNumberOfProcesses THEN {
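IF Queue.queueStuck THEN TRUSTED {
nProcesses ← nProcesses + 1; -- count the new process so the MaxNumberOfProcesses limit can take effect
Process.Detach[FORK FileStreamForegroundProcess[]];
};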
Queue.queueStuck ← TRUE;
};
Process.Pause[threeSeconds];
ENDLOOP;
};
-- Implementation of the "workstation FS" subclass of FileStream. This is mostly a veneer over the generic FileStream; only stream creation is special.
-- The following procedures simply re-export the ones from FileStream. However, directly exporting the procedure values imported from FileStream does not work for some reason.
StreamFromOpenFile: PUBLIC PROC [openFile: FS.OpenFile, accessRights: FS.Lock, initialPosition: FS.InitialPosition, streamOptions: FS.StreamOptions, streamBufferParms: FS.StreamBufferParms, extendFileProc: FS.ExtendFileProc] RETURNS [STREAM] = {
RETURN [FileStream.StreamFromOpenFile[openFile: openFile, accessRights: accessRights, initialPosition: initialPosition, streamOptions: streamOptions, streamBufferParms: streamBufferParms, extendFileProc: extendFileProc]];
};
StreamFromOpenStream: PUBLIC PROC [self: STREAM] RETURNS [STREAM] = {
RETURN [FileStream.StreamFromOpenStream[self]];
};
OpenFileFromStream: PUBLIC PROC [self: STREAM] RETURNS [FS.OpenFile] = {
RETURN [FileStream.OpenFileFromStream[self]];
};
ErrorFromStream: PUBLIC PROC [self: STREAM] RETURNS [FS.ErrorDesc] = {
RETURN [FileStream.ErrorFromStream[self]];
};
-- Initialization
Init: PROC [] = {
Queue ← NEW[QueueRec];
THROUGH [1..NumberOfProcesses] DO
nProcesses ← nProcesses + 1;
TRUSTED {Process.Detach[FORK FileStreamForegroundProcess[]]; };
ENDLOOP;
TRUSTED {Process.Detach[FORK FileStreamForegroundWatcher[]]; };
};
Init[];
END.
-- CHANGE LOG
-- Created by Hagmann on December 6, 1983 3:02 pm
-- From a suggestion by Mark Brown, implement a cache of processes on a package wide basis.
-- Modified by Hagmann on December 19, 1984 9:49:48 am PST
-- Reformatted, and changed the queue to be a collectable object to reduce MDS and GFI use.
-- Bob Hagmann June 10, 1986 8:22:35 am PDT
-- merged FSStreamImpl and FileStreamProcessCacheImpl to form this program
-- Bob Hagmann November 12, 1986 4:48:12 pm PST
-- added FileStreamForegroundWatcher - grow the number of foreground processes if needed