-- FileStreamMoreImpl
-- Monitor that queues (fileData, node) requests and services them with a small
-- pool of detached processes, each of which hands a request to
-- FileStreamPrivate.ProcessNode.  It also exports to FS four stream procedures
-- that simply forward to the FileStream interface.
-- NOTE(review): the text of the original header comments was not preserved in
-- this copy of the file; see the CHANGE LOG at the end for the surviving history.

DIRECTORY
  FileStream USING [ErrorFromStream, OpenFileFromStream, StreamFromOpenFile, StreamFromOpenStream],
  FileStreamPrivate USING [ BufferNodeHandle, BufferNode, FileDataHandle, FileData, NodeStatus, ProcessNode ],
  FS USING [ErrorDesc, ExtendFileProc, InitialPosition, Lock, OpenFile, StreamBufferParms, StreamOptions],
  IO USING [STREAM],
  Process USING [Detach, Pause, Priority, prioritySysForeground, SecondsToTicks, SetPriority, Ticks],
  Rope USING [ROPE];

FileStreamMoreImpl: CEDAR MONITOR
  IMPORTS FileStream, FileStreamPrivate, Process
  EXPORTS FileStreamPrivate, FS
  = BEGIN OPEN Rope, IO;

  FileData: TYPE = FileStreamPrivate.FileData;
  BufferNode: TYPE = FileStreamPrivate.BufferNode;
  FileDataHandle: TYPE = FileStreamPrivate.FileDataHandle;
  BufferNodeHandle: TYPE = FileStreamPrivate.BufferNodeHandle;

  -- Upper bound on the number of worker processes the watcher may bring the pool to.
  MaxNumberOfProcesses: NAT _ 8 ;
  -- Number of worker processes forked by Init.
  NumberOfProcesses: NAT = 3 ;
  -- Count of worker processes forked so far.  Written by Init and by the watcher.
  nProcesses: CARD _ 0;

  -- Signalled (BROADCAST) whenever an entry is added to or removed from the queue.
  ModuleCondition: CONDITION;

  -- One queued request.
  QueueRecord: TYPE = RECORD [
    fileData: FileDataHandle,
    node: BufferNodeHandle
    <<,nodeStatus: FileStreamPrivate.NodeStatus>>
    <<,nodeFirstFileByteInBuffer: INT>>
    ];

  QUEUESIZE: INT = 20 ;

  -- Circular buffer of requests.  putQueue is the next slot to fill, getQueue the
  -- next slot to drain, noEntries the number of occupied slots.  queueStuck is set
  -- by the watcher and cleared by every enqueue and dequeue.
  QueueRec: TYPE = RECORD [
    queue: ARRAY [0..QUEUESIZE) OF QueueRecord,
    putQueue: INT _ 0,
    getQueue: INT _ 0,
    noEntries: INT _ 0,
    queueStuck: BOOL _ FALSE
    ];

  Queue: REF QueueRec _ NIL;

  StartRequest: PUBLIC ENTRY PROC [ fileData: FileDataHandle, node: BufferNodeHandle ] = {
    -- Enqueue a request for node.
    -- Raises ERROR if node.status is neither needsParallelRead nor needsParallelWrite,
    -- or if node is already present in the queue.
    -- Blocks on ModuleCondition while the queue holds QUEUESIZE-1 or more entries,
    -- so one slot of the array is always left unused.
    -- NOTE(review): the ERRORs below are raised with the monitor lock held and there
    -- is no ENABLE UNWIND clause in this procedure; confirm that is intended.
    tempCount: INT _ Queue.noEntries ;
    ptr: INT _ Queue.getQueue ;
    IF node.status # needsParallelRead AND node.status # needsParallelWrite THEN ERROR ;
    -- Walk the occupied slots, oldest first, looking for a duplicate of node.
    WHILE tempCount > 0 DO
      IF Queue.queue[ptr].node = node THEN ERROR ;
      ptr _ IF ptr = QUEUESIZE-1 THEN 0 ELSE ptr+1 ;
      tempCount _ tempCount - 1 ;
      ENDLOOP;
    WHILE Queue.noEntries >= QUEUESIZE - 1 DO
      WAIT ModuleCondition ;
      ENDLOOP;
    Queue.queue[Queue.putQueue] _ [fileData, node --, node.status, node.firstFileByteInBuffer--];
    Queue.putQueue _ IF Queue.putQueue = QUEUESIZE-1 THEN 0 ELSE Queue.putQueue+1 ;
    Queue.noEntries _ Queue.noEntries + 1;
    Queue.queueStuck _ FALSE;
    BROADCAST ModuleCondition ;
    };

  FindSomethingToDo: ENTRY PROC [] RETURNS [fileData: FileDataHandle, node: BufferNodeHandle ] = {
    -- Dequeue the oldest request, blocking on ModuleCondition while the queue is empty.
    -- The vacated slot is overwritten with NILs so the queue does not retain the REFs.
    WHILE Queue.noEntries = 0 DO
      WAIT ModuleCondition ;
      ENDLOOP;
    [fileData, node ] _ Queue.queue[Queue.getQueue] ;
    Queue.queue[Queue.getQueue] _ [NIL,NIL];
    Queue.getQueue _ IF Queue.getQueue = QUEUESIZE-1 THEN 0 ELSE Queue.getQueue+1 ;
    Queue.noEntries _ Queue.noEntries - 1;
    Queue.queueStuck _ FALSE;
    BROADCAST ModuleCondition ;
    };

  FileStreamForegroundProcess: PROC [] = {
    -- Body of a worker process.  Runs at prioritySysForeground and loops forever,
    -- taking one request at a time and passing it to FileStreamPrivate.ProcessNode.
    Process.SetPriority[Process.prioritySysForeground];
    DO
      fileData: FileDataHandle ;
      node: BufferNodeHandle;
      [fileData, node] _ FindSomethingToDo [];
      FileStreamPrivate.ProcessNode[fileData, node] ;
      ENDLOOP;
    };

  FileStreamForegroundWatcher: PROC [] = {
    -- Body of the watcher process.  Every three seconds, if the queue is non-empty
    -- and the pool is below MaxNumberOfProcesses, it checks queueStuck.  The flag is
    -- cleared by every enqueue and dequeue, so finding it still TRUE means there was
    -- no queue activity since the previous check; in that case one more worker is
    -- forked.  The flag is then set again for the next check.
    -- The Queue fields are read and written here without holding the monitor lock
    -- (this is not an ENTRY procedure).
    threeSeconds: Process.Ticks _ Process.SecondsToTicks[3];
    Process.SetPriority[Process.prioritySysForeground];
    DO
      IF Queue.noEntries # 0 AND nProcesses < MaxNumberOfProcesses THEN {
        IF Queue.queueStuck THEN {
          -- Count the new worker, as Init does.  Without this the test against
          -- MaxNumberOfProcesses above could never fail and the pool was unbounded.
          nProcesses _ nProcesses + 1;
          TRUSTED { Process.Detach[FORK FileStreamForegroundProcess[]]; };
          };
        Queue.queueStuck _ TRUE;
        };
      Process.Pause[threeSeconds];
      ENDLOOP;
    };

  -- Procedures exported to FS.  Each one forwards its arguments unchanged to the
  -- procedure of the same name in FileStream.

  StreamFromOpenFile: PUBLIC PROC [openFile: FS.OpenFile, accessRights: FS.Lock,
    initialPosition: FS.InitialPosition, streamOptions: FS.StreamOptions,
    streamBufferParms: FS.StreamBufferParms, extendFileProc: FS.ExtendFileProc]
    RETURNS [STREAM] = {
    RETURN [FileStream.StreamFromOpenFile[openFile: openFile, accessRights: accessRights,
      initialPosition: initialPosition, streamOptions: streamOptions,
      streamBufferParms: streamBufferParms, extendFileProc: extendFileProc]];
    };

  StreamFromOpenStream: PUBLIC PROC [self: STREAM] RETURNS [STREAM] = {
    RETURN [FileStream.StreamFromOpenStream[self]];
    };

  OpenFileFromStream: PUBLIC PROC [self: STREAM] RETURNS [FS.OpenFile] = {
    RETURN [FileStream.OpenFileFromStream[self]];
    };

  ErrorFromStream: PUBLIC PROC [self: STREAM] RETURNS [FS.ErrorDesc] = {
    RETURN [FileStream.ErrorFromStream[self]];
    };

  -- Initialization

  Init: PROC [] = {
    -- Allocate the queue, fork NumberOfProcesses workers (counting each one in
    -- nProcesses), then fork the watcher.  All forked processes are detached.
    Queue _ NEW[QueueRec];
    THROUGH [1..NumberOfProcesses] DO
      nProcesses _ nProcesses + 1;
      TRUSTED {Process.Detach[FORK FileStreamForegroundProcess[]]; };
      ENDLOOP;
    TRUSTED {Process.Detach[FORK FileStreamForegroundWatcher[]]; };
    };

  Init[];

  END.

CHANGE LOG

Created by Hagmann on December 6, 1983 3:02 pm

Modified by Hagmann on December 19, 1984 9:49:48 am PST

(The descriptive text of the change log entries was not preserved in this copy of the file.)