DIRECTORY FileStreamPrivate USING [ BufferNodeHandle, BufferNode, FileDataHandle, FileData, NodeStatus, ProcessNode ], FS, IO, Process, Rope; FileStreamProcessCacheImpl: CEDAR MONITOR IMPORTS FileStreamPrivate, Process EXPORTS FileStreamPrivate = BEGIN ROPE: TYPE = Rope.ROPE; STREAM: TYPE = IO.STREAM; FileData: TYPE = FileStreamPrivate.FileData; BufferNode: TYPE = FileStreamPrivate.BufferNode; FileDataHandle: TYPE = FileStreamPrivate.FileDataHandle; BufferNodeHandle: TYPE = FileStreamPrivate.BufferNodeHandle; NumberOfProcesses: INT = 2 ; ModuleCondition: CONDITION; QueueRecord: TYPE = RECORD [ fileData: FileDataHandle, node: BufferNodeHandle ]; QUEUESIZE: INT = 20 ; QueueRec: TYPE = RECORD [ queue: ARRAY [0..QUEUESIZE) OF QueueRecord, putQueue: INT _ 0, getQueue: INT _ 0, noEntries: INT _ 0 ]; Queue: REF QueueRec _ NIL; StartRequest: PUBLIC ENTRY PROC [ fileData: FileDataHandle, node: BufferNodeHandle ] = { tempCount: INT _ Queue.noEntries ; ptr: INT _ Queue.getQueue ; IF node.status # needsParallelRead AND node.status # needsParallelWrite THEN ERROR ; WHILE tempCount > 0 DO IF Queue.queue[ptr].node = node THEN ERROR ; ptr _ IF ptr = QUEUESIZE-1 THEN 0 ELSE ptr+1 ; tempCount _ tempCount - 1 ; ENDLOOP; WHILE Queue.noEntries >= QUEUESIZE - 1 DO WAIT ModuleCondition ; ENDLOOP; Queue.queue[Queue.putQueue] _ [fileData, node --, node.status, node.firstFileByteInBuffer--]; Queue.putQueue _ IF Queue.putQueue = QUEUESIZE-1 THEN 0 ELSE Queue.putQueue+1 ; Queue.noEntries _ Queue.noEntries + 1; BROADCAST ModuleCondition ; }; FindSomethingToDo: ENTRY PROC [] RETURNS [fileData: FileDataHandle, node: BufferNodeHandle ] = { WHILE Queue.noEntries = 0 DO WAIT ModuleCondition ; ENDLOOP; [fileData, node ] _ Queue.queue[Queue.getQueue] ; Queue.queue[Queue.getQueue] _ [NIL,NIL]; Queue.getQueue _ IF Queue.getQueue = QUEUESIZE-1 THEN 0 ELSE Queue.getQueue+1 ; Queue.noEntries _ Queue.noEntries - 1; BROADCAST ModuleCondition ; }; FileStreamForegroundProcess: PROC [] = { DO fileData: FileDataHandle ; node: BufferNodeHandle; [fileData, node] _ FindSomethingToDo []; FileStreamPrivate.ProcessNode[fileData, node] ; ENDLOOP; }; Init: PROC [] = { myPriority: Process.Priority ; myPriority _ Process.GetPriority[]; Process.SetPriority[Process.priorityForeground]; Queue _ NEW[QueueRec]; FOR I:INT IN [1..NumberOfProcesses] DO process: PROCESS; process _ FORK FileStreamForegroundProcess[]; TRUSTED {Process.Detach[process]; }; ENDLOOP; Process.SetPriority[myPriority]; }; Init[]; END. CHANGE LOG Created by Hagmann on December 6, 1983 3:02 pm Modified by Hagmann on December 19, 1984 9:49:48 am PST FileStreamProcessCacheImpl.mesa Last Edited by Hagmann on December 6, 1983 4:50 pm Please maintain change log at end of file. ,nodeStatus: FileStreamPrivate.NodeStatus ,nodeFirstFileByteInBuffer: INT Queue up a request. Normal case is to queue and BROADCAST without waiting. If the queue is full, then wait for an entry to free up and then queue request. Except when the queue is full (and thus we are really behind), this returns immediately. This procedure is where the processes all wait for something to do. This procedure is the top of the processes that do all the parallel reads and writes. Initialization From a suggestion by Mark Brown, implement a cache of processes on a package wide basis. Reformatted, and changed the queue be a collectable object to reduce MDS and GFI use. Κ<˜headšœ™šΟc™Jš$™$—Jš*™*—Icode0˜šΟk ˜ LšœžœU˜lLšžœ˜Lšžœ˜L˜Lšœ˜L˜—šœžœž˜)šž˜L˜L˜—šž˜Lšœ˜——Lšœ˜Lšœž˜˜Lšžœžœžœ˜Lšžœžœžœžœ˜Lšœ žœ˜,Lšœ žœ ˜0Lšœžœ$˜8Lšœžœ&˜