DIRECTORY FileStream USING [ErrorFromStream, OpenFileFromStream, StreamFromOpenFile, StreamFromOpenStream], FileStreamPrivate USING [ BufferNodeHandle, BufferNode, FileDataHandle, FileData, NodeStatus, ProcessNode ], FS USING [ErrorDesc, ExtendFileProc, InitialPosition, Lock, OpenFile, StreamBufferParms, StreamOptions], IO USING [STREAM], Process USING [Detach, Pause, Priority, prioritySysForeground, SecondsToTicks, SetPriority, Ticks], Rope USING [ROPE]; FileStreamMoreImpl: CEDAR MONITOR IMPORTS FileStream, FileStreamPrivate, Process EXPORTS FileStreamPrivate, FS = BEGIN OPEN Rope, IO; FileData: TYPE = FileStreamPrivate.FileData; BufferNode: TYPE = FileStreamPrivate.BufferNode; FileDataHandle: TYPE = FileStreamPrivate.FileDataHandle; BufferNodeHandle: TYPE = FileStreamPrivate.BufferNodeHandle; MaxNumberOfProcesses: NAT _ 8 ; NumberOfProcesses: NAT = 3 ; nProcesses: CARD _ 0; ModuleCondition: CONDITION; QueueRecord: TYPE = RECORD [ fileData: FileDataHandle, node: BufferNodeHandle ]; QUEUESIZE: INT = 20 ; QueueRec: TYPE = RECORD [ queue: ARRAY [0..QUEUESIZE) OF QueueRecord, putQueue: INT _ 0, getQueue: INT _ 0, noEntries: INT _ 0, queueStuck: BOOL _ FALSE ]; Queue: REF QueueRec _ NIL; StartRequest: PUBLIC ENTRY PROC [ fileData: FileDataHandle, node: BufferNodeHandle ] = { tempCount: INT _ Queue.noEntries ; ptr: INT _ Queue.getQueue ; IF node.status # needsParallelRead AND node.status # needsParallelWrite THEN ERROR ; WHILE tempCount > 0 DO IF Queue.queue[ptr].node = node THEN ERROR ; ptr _ IF ptr = QUEUESIZE-1 THEN 0 ELSE ptr+1 ; tempCount _ tempCount - 1 ; ENDLOOP; WHILE Queue.noEntries >= QUEUESIZE - 1 DO WAIT ModuleCondition ; ENDLOOP; Queue.queue[Queue.putQueue] _ [fileData, node --, node.status, node.firstFileByteInBuffer--]; Queue.putQueue _ IF Queue.putQueue = QUEUESIZE-1 THEN 0 ELSE Queue.putQueue+1 ; Queue.noEntries _ Queue.noEntries + 1; Queue.queueStuck _ FALSE; BROADCAST ModuleCondition ; }; FindSomethingToDo: ENTRY PROC [] RETURNS [fileData: FileDataHandle, node: BufferNodeHandle ] = { WHILE Queue.noEntries = 0 DO WAIT ModuleCondition ; ENDLOOP; [fileData, node ] _ Queue.queue[Queue.getQueue] ; Queue.queue[Queue.getQueue] _ [NIL,NIL]; Queue.getQueue _ IF Queue.getQueue = QUEUESIZE-1 THEN 0 ELSE Queue.getQueue+1 ; Queue.noEntries _ Queue.noEntries - 1; Queue.queueStuck _ FALSE; BROADCAST ModuleCondition ; }; FileStreamForegroundProcess: PROC [] = { Process.SetPriority[Process.prioritySysForeground]; DO fileData: FileDataHandle ; node: BufferNodeHandle; [fileData, node] _ FindSomethingToDo []; FileStreamPrivate.ProcessNode[fileData, node] ; ENDLOOP; }; FileStreamForegroundWatcher: PROC [] = { threeSeconds: Process.Ticks _ Process.SecondsToTicks[3]; Process.SetPriority[Process.prioritySysForeground]; DO IF Queue.noEntries # 0 AND nProcesses < MaxNumberOfProcesses THEN { IF Queue.queueStuck THEN TRUSTED { Process.Detach[FORK FileStreamForegroundProcess[]]; }; Queue.queueStuck _ TRUE; }; Process.Pause[threeSeconds]; ENDLOOP; }; StreamFromOpenFile: PUBLIC PROC [openFile: FS.OpenFile, accessRights: FS.Lock, initialPosition: FS.InitialPosition, streamOptions: FS.StreamOptions, streamBufferParms: FS.StreamBufferParms, extendFileProc: FS.ExtendFileProc] RETURNS [STREAM] = { RETURN [FileStream.StreamFromOpenFile[openFile: openFile, accessRights: accessRights, initialPosition: initialPosition, streamOptions: streamOptions, streamBufferParms: streamBufferParms, extendFileProc: extendFileProc]]; }; StreamFromOpenStream: PUBLIC PROC [self: STREAM] RETURNS [STREAM] = { RETURN [FileStream.StreamFromOpenStream[self]]; }; OpenFileFromStream: PUBLIC PROC [self: STREAM] RETURNS [FS.OpenFile] = { RETURN [FileStream.OpenFileFromStream[self]]; }; ErrorFromStream: PUBLIC PROC [self: STREAM] RETURNS [FS.ErrorDesc] = { RETURN [FileStream.ErrorFromStream[self]]; }; Init: PROC [] = { Queue _ NEW[QueueRec]; THROUGH [1..NumberOfProcesses] DO nProcesses _ nProcesses + 1; TRUSTED {Process.Detach[FORK FileStreamForegroundProcess[]]; }; ENDLOOP; TRUSTED {Process.Detach[FORK FileStreamForegroundWatcher[]]; }; }; Init[]; END. CHANGE LOG Created by Hagmann on December 6, 1983 3:02 pm Modified by Hagmann on December 19, 1984 9:49:48 am PST FileStreamMoreImpl.mesa Copyright Σ 1985, 1986 by Xerox Corporation. All rights reserved. Hagmann on December 6, 1983 4:50 pm Russ Atkinson (RRA) February 4, 1985 3:27:09 pm PST Bob Hagmann October 17, 1988 1:27:53 pm PDT Please maintain change log at end of file. This compilation is logically part of FileStreamImpl, but must be separate since it uses a module monitor (FileStreamImpl uses object monitors) and we export to FS here (there is a type conflict on Close). ,nodeStatus: FileStreamPrivate.NodeStatus ,nodeFirstFileByteInBuffer: INT Queue up a request. Normal case is to queue and BROADCAST without waiting. If the queue is full, then wait for an entry to free up and then queue request. Except when the queue is full (and thus we are really behind), this returns immediately. This procedure is where the processes all wait for something to do. This procedure is the top of the processes that do all the parallel reads and writes. This procedure is the top of the processes that do all the parallel reads and writes. Implementation of the "workstation FS" subclass of FileStream. This is mostly a veneer over the generic FileStream; only stream creation is special. The following procedures simply re-export the ones from FileStream. However, directly exporting the procedure values imported from FileStream does not work for some reason. Initialization From a suggestion by Mark Brown, implement a cache of processes on a package wide basis. Reformatted, and changed the queue be a collectable object to reduce MDS and GFI use. Bob Hagmann June 10, 1986 8:22:35 am PDT merged FSStreamImpl and FileStreamProcessCacheImpl to form this program Bob Hagmann November 12, 1986 4:48:12 pm PST added FileStreamForegroundWatcher - grow the number of foreground processes if needed Κθ˜codešœ™KšœB™BKšΟc#™#J™3K™+—K™Kš*™*K™K™ΞK˜šΟk ˜ Kšœ žœQ˜aKšœžœU˜lKšžœžœ`˜hKšžœžœžœ˜KšœžœV˜cKšœžœžœ˜K˜—šΟnœžœž˜!Kšžœ'˜.Kšžœžœž˜%—˜Kšžœžœ˜K˜Kšœ žœ˜,Kšœ žœ ˜0Kšœžœ$˜8Kšœžœ&˜