-- FileStreamMoreImpl.mesa
-- Copyright (c) 1985 by Xerox Corporation.  All rights reserved.
-- Hagmann on December 6, 1983 4:50 pm
-- Russ Atkinson (RRA) February 4, 1985 3:27:09 pm PST
-- Bob Hagmann June 10, 1986 8:23:39 am PDT
--
-- Please maintain change log at end of file.
--
-- This compilation is logically part of FileStreamImpl, but must be separate since it
-- uses a module monitor (FileStreamImpl uses object monitors) and we export to FS here
-- (there is a type conflict on Close).

DIRECTORY
  BasicTime USING [GMT],
  FileStream USING [ErrorFromStream, OpenFileFromStream, StreamFromOpenFile, StreamFromOpenStream],
  FileStreamPrivate USING [BufferNodeHandle, BufferNode, FileDataHandle, FileData, NodeStatus, ProcessNode],
  FS USING [AccessOptions, ByteCount, Close, Create, Error, ErrorDesc, ExtendFileProc, InitialPosition, Lock, Open, OpenFile, OpenOrCreate, PagesForBytes, StreamBufferParms, StreamOptions],
  IO USING [STREAM],
  Process USING [Detach, Priority, priorityForeground, SetPriority],
  Rope USING [ROPE];

FileStreamMoreImpl: CEDAR MONITOR
  IMPORTS FileStream, FileStreamPrivate, FS, Process
  EXPORTS FileStreamPrivate, FS
  = BEGIN OPEN Rope, IO;

  FileData: TYPE = FileStreamPrivate.FileData;
  BufferNode: TYPE = FileStreamPrivate.BufferNode;
  FileDataHandle: TYPE = FileStreamPrivate.FileDataHandle;
  BufferNodeHandle: TYPE = FileStreamPrivate.BufferNodeHandle;

  -- Number of worker processes forked by Init.
  NumberOfProcesses: NAT = 2;

  -- Single condition variable shared by producers (StartRequest) and consumers
  -- (FindSomethingToDo); both sides BROADCAST on it after changing the queue.
  ModuleCondition: CONDITION;

  -- One queued request: the stream's file data plus the buffer node to process.
  QueueRecord: TYPE = RECORD [
    fileData: FileDataHandle,
    node: BufferNodeHandle
    -- ,nodeStatus: FileStreamPrivate.NodeStatus
    -- ,nodeFirstFileByteInBuffer: INT
    ];

  QUEUESIZE: INT = 20;

  -- Circular queue of requests.  putQueue is the next slot to fill, getQueue is the next
  -- slot to take, and noEntries is the number of occupied slots.
  QueueRec: TYPE = RECORD [
    queue: ARRAY [0..QUEUESIZE) OF QueueRecord,
    putQueue: INT _ 0,
    getQueue: INT _ 0,
    noEntries: INT _ 0
    ];

  -- Allocated by Init.
  Queue: REF QueueRec _ NIL;

  -- Queue up a request.  Normal case is to queue and BROADCAST without waiting.  If the
  -- queue is full, then wait for an entry to free up and then queue the request.  Except
  -- when the queue is full (and thus we are really behind), this returns immediately.
  -- Raises ERROR if node.status is neither needsParallelRead nor needsParallelWrite, or
  -- if the node is already present in the queue.
  StartRequest: PUBLIC ENTRY PROC [fileData: FileDataHandle, node: BufferNodeHandle] = {
    -- Without an UNWIND catch phrase, unwinding one of the ERRORs below would leave the
    -- module monitor locked and block every later caller.  Both ERRORs are raised before
    -- the queue is modified, so no cleanup is needed; the lock just has to be released.
    ENABLE UNWIND => NULL;
    tempCount: INT _ Queue.noEntries;
    ptr: INT _ Queue.getQueue;
    IF node.status # needsParallelRead AND node.status # needsParallelWrite THEN ERROR;
    -- Walk the occupied slots, starting at the oldest, looking for a duplicate request.
    WHILE tempCount > 0 DO
      IF Queue.queue[ptr].node = node THEN ERROR;
      ptr _ IF ptr = QUEUESIZE-1 THEN 0 ELSE ptr+1;
      tempCount _ tempCount - 1;
      ENDLOOP;
    -- The threshold is QUEUESIZE - 1, so at most QUEUESIZE - 1 requests are ever queued.
    WHILE Queue.noEntries >= QUEUESIZE - 1 DO
      WAIT ModuleCondition;
      ENDLOOP;
    Queue.queue[Queue.putQueue] _ [fileData, node --, node.status, node.firstFileByteInBuffer--];
    Queue.putQueue _ IF Queue.putQueue = QUEUESIZE-1 THEN 0 ELSE Queue.putQueue+1;
    Queue.noEntries _ Queue.noEntries + 1;
    BROADCAST ModuleCondition;
    };

  -- This procedure is where the processes all wait for something to do.  It blocks until
  -- the queue is non-empty, removes the oldest request, and returns it.
  FindSomethingToDo: ENTRY PROC [] RETURNS [fileData: FileDataHandle, node: BufferNodeHandle] = {
    WHILE Queue.noEntries = 0 DO
      WAIT ModuleCondition;
      ENDLOOP;
    [fileData, node] _ Queue.queue[Queue.getQueue];
    -- Clear the slot so the queue no longer refers to the request just taken.
    Queue.queue[Queue.getQueue] _ [NIL, NIL];
    Queue.getQueue _ IF Queue.getQueue = QUEUESIZE-1 THEN 0 ELSE Queue.getQueue+1;
    Queue.noEntries _ Queue.noEntries - 1;
    -- Wake any StartRequest callers waiting for room in the queue.
    BROADCAST ModuleCondition;
    };

  -- This procedure is the top of the processes that do all the parallel reads and writes.
  -- It sets its own priority to foreground and then loops forever, taking one request at
  -- a time and handing it to FileStreamPrivate.ProcessNode.
  FileStreamForegroundProcess: PROC [] = {
    Process.SetPriority[Process.priorityForeground];
    DO
      fileData: FileDataHandle;
      node: BufferNodeHandle;
      [fileData, node] _ FindSomethingToDo[];
      FileStreamPrivate.ProcessNode[fileData, node];
      ENDLOOP;
    };

  -- Implementation of the "workstation FS" subclass of FileStream.  This is mostly a
  -- veneer over the generic FileStream; only stream creation is special.

  -- Opens or creates the named file according to accessOptions and wraps it in a stream:
  --   $read   => FS.Open
  --   $create => FS.Create
  --   $append => FS.OpenOrCreate, with the stream positioned at $end
  --   $write  => FS.Open with lock: $write
  -- The stream gets $read access rights for $read and $write rights otherwise.
  StreamOpen: PUBLIC PROC [fileName: ROPE, accessOptions: FS.AccessOptions, streamOptions: FS.StreamOptions, keep: CARDINAL, createByteCount: FS.ByteCount, streamBufferParms: FS.StreamBufferParms, extendFileProc: FS.ExtendFileProc, wantedCreatedTime: BasicTime.GMT, remoteCheck: BOOL, wDir: ROPE] RETURNS [STREAM] = {
    fileHandle: FS.OpenFile = SELECT accessOptions FROM
      $read => FS.Open[name: fileName, wantedCreatedTime: wantedCreatedTime, remoteCheck: remoteCheck, wDir: wDir],
      $create => FS.Create[name: fileName, keep: keep, pages: FS.PagesForBytes[createByteCount], wDir: wDir],
      $append => FS.OpenOrCreate[name: fileName, keep: keep, pages: FS.PagesForBytes[createByteCount], wDir: wDir],
      $write => FS.Open[name: fileName, lock: $write, wantedCreatedTime: wantedCreatedTime, remoteCheck: remoteCheck, wDir: wDir],
      ENDCASE => ERROR;
    -- If stream creation raises FS.Error, close the file opened above.  The catch phrase
    -- has no exit, so the error is then passed on to enclosing handlers.
    RETURN[FileStream.StreamFromOpenFile[
      openFile: fileHandle,
      accessRights: IF accessOptions = $read THEN $read ELSE $write,
      initialPosition: IF accessOptions = $append THEN $end ELSE $start,
      streamOptions: streamOptions,
      streamBufferParms: streamBufferParms,
      extendFileProc: extendFileProc
      ! FS.Error => FS.Close[file: fileHandle]]];
    };

  -- The following procedures simply re-export the ones from FileStream.  However,
  -- directly exporting the procedure values imported from FileStream does not work for
  -- some reason.

  -- Forwards all arguments unchanged to FileStream.StreamFromOpenFile.
  StreamFromOpenFile: PUBLIC PROC [openFile: FS.OpenFile, accessRights: FS.Lock, initialPosition: FS.InitialPosition, streamOptions: FS.StreamOptions, streamBufferParms: FS.StreamBufferParms, extendFileProc: FS.ExtendFileProc] RETURNS [STREAM] = {
    RETURN [FileStream.StreamFromOpenFile[openFile: openFile, accessRights: accessRights, initialPosition: initialPosition, streamOptions: streamOptions, streamBufferParms: streamBufferParms, extendFileProc: extendFileProc]];
    };

  -- Forwards to FileStream.StreamFromOpenStream.
  StreamFromOpenStream: PUBLIC PROC [self: STREAM] RETURNS [STREAM] = {
    RETURN [FileStream.StreamFromOpenStream[self]];
    };

  -- Forwards to FileStream.OpenFileFromStream.
  OpenFileFromStream: PUBLIC PROC [self: STREAM] RETURNS [FS.OpenFile] = {
    RETURN [FileStream.OpenFileFromStream[self]];
    };

  -- Forwards to FileStream.ErrorFromStream.
  ErrorFromStream: PUBLIC PROC [self: STREAM] RETURNS [FS.ErrorDesc] = {
    RETURN [FileStream.ErrorFromStream[self]];
    };

  -- Initialization

  -- Allocates the request queue, then forks and detaches NumberOfProcesses copies of
  -- FileStreamForegroundProcess.
  Init: PROC [] = {
    Queue _ NEW[QueueRec];
    THROUGH [1..NumberOfProcesses] DO
      TRUSTED {Process.Detach[FORK FileStreamForegroundProcess[]]; };
      ENDLOOP;
    };

  Init[];

  END.

CHANGE LOG

Created by Hagmann on December 6, 1983 3:02 pm
From a suggestion by Mark Brown, implement a cache of processes on a package wide basis.

Modified by Hagmann on December 19, 1984 9:49:48 am PST
Reformatted, and changed the queue to be a collectable object to reduce MDS and GFI use.

Bob Hagmann June 10, 1986 8:22:35 am PDT
merged FSStreamImpl and FileStreamProcessCacheImpl to form this program
This procedure is where the processes all wait for something to do.
This procedure is the top of the processes that do all the parallel reads and writes.
Implementation of the "workstation FS" subclass of FileStream. This is mostly a veneer over the generic FileStream; only stream creation is special.
The following procedures simply re-export the ones from FileStream. However, directly exporting the procedure values imported from FileStream does not work for some reason.
Initialization
From a suggestion by Mark Brown, implement a cache of processes on a package wide basis.
Reformatted, and changed the queue to be a collectable object to reduce MDS and GFI use.
Bob Hagmann June 10, 1986 8:22:35 am PDT
merged FSStreamImpl and FileStreamProcessCacheImpl to form this program