<> <> <> <> <> <<>> <> <<>> <> DIRECTORY BasicTime USING [GMT], FileStream USING [ErrorFromStream, OpenFileFromStream, StreamFromOpenFile, StreamFromOpenStream], FileStreamPrivate USING [ BufferNodeHandle, BufferNode, FileDataHandle, FileData, NodeStatus, ProcessNode ], FS USING [AccessOptions, ByteCount, Close, Create, Error, ErrorDesc, ExtendFileProc, InitialPosition, Lock, Open, OpenFile, OpenOrCreate, PagesForBytes, StreamBufferParms, StreamOptions], IO USING [STREAM], Process USING [Detach, Priority, priorityForeground, SetPriority], Rope USING [ROPE]; FileStreamMoreImpl: CEDAR MONITOR IMPORTS FileStream, FileStreamPrivate, FS, Process EXPORTS FileStreamPrivate, FS = BEGIN OPEN Rope, IO; FileData: TYPE = FileStreamPrivate.FileData; BufferNode: TYPE = FileStreamPrivate.BufferNode; FileDataHandle: TYPE = FileStreamPrivate.FileDataHandle; BufferNodeHandle: TYPE = FileStreamPrivate.BufferNodeHandle; NumberOfProcesses: NAT = 2 ; ModuleCondition: CONDITION; QueueRecord: TYPE = RECORD [ fileData: FileDataHandle, node: BufferNodeHandle <<,nodeStatus: FileStreamPrivate.NodeStatus>> <<,nodeFirstFileByteInBuffer: INT>> ]; QUEUESIZE: INT = 20 ; QueueRec: TYPE = RECORD [ queue: ARRAY [0..QUEUESIZE) OF QueueRecord, putQueue: INT _ 0, getQueue: INT _ 0, noEntries: INT _ 0 ]; Queue: REF QueueRec _ NIL; StartRequest: PUBLIC ENTRY PROC [ fileData: FileDataHandle, node: BufferNodeHandle ] = { <> tempCount: INT _ Queue.noEntries ; ptr: INT _ Queue.getQueue ; IF node.status # needsParallelRead AND node.status # needsParallelWrite THEN ERROR ; WHILE tempCount > 0 DO IF Queue.queue[ptr].node = node THEN ERROR ; ptr _ IF ptr = QUEUESIZE-1 THEN 0 ELSE ptr+1 ; tempCount _ tempCount - 1 ; ENDLOOP; WHILE Queue.noEntries >= QUEUESIZE - 1 DO WAIT ModuleCondition ; ENDLOOP; Queue.queue[Queue.putQueue] _ [fileData, node --, node.status, node.firstFileByteInBuffer--]; Queue.putQueue _ IF Queue.putQueue = QUEUESIZE-1 THEN 0 ELSE Queue.putQueue+1 ; Queue.noEntries _ Queue.noEntries + 1; BROADCAST ModuleCondition ; }; FindSomethingToDo: ENTRY PROC [] RETURNS [fileData: FileDataHandle, node: BufferNodeHandle ] = { <> WHILE Queue.noEntries = 0 DO WAIT ModuleCondition ; ENDLOOP; [fileData, node ] _ Queue.queue[Queue.getQueue] ; Queue.queue[Queue.getQueue] _ [NIL,NIL]; Queue.getQueue _ IF Queue.getQueue = QUEUESIZE-1 THEN 0 ELSE Queue.getQueue+1 ; Queue.noEntries _ Queue.noEntries - 1; BROADCAST ModuleCondition ; }; FileStreamForegroundProcess: PROC [] = { <> Process.SetPriority[Process.priorityForeground]; DO fileData: FileDataHandle ; node: BufferNodeHandle; [fileData, node] _ FindSomethingToDo []; FileStreamPrivate.ProcessNode[fileData, node] ; ENDLOOP; }; <> <<>> StreamOpen: PUBLIC PROC [fileName: ROPE, accessOptions: FS.AccessOptions, streamOptions: FS.StreamOptions, keep: CARDINAL, createByteCount: FS.ByteCount, streamBufferParms: FS.StreamBufferParms, extendFileProc: FS.ExtendFileProc, wantedCreatedTime: BasicTime.GMT, remoteCheck: BOOL, wDir: ROPE] RETURNS [STREAM] = { fileHandle: FS.OpenFile = SELECT accessOptions FROM $read => FS.Open[name: fileName, wantedCreatedTime: wantedCreatedTime, remoteCheck: remoteCheck, wDir: wDir], $create => FS.Create[name: fileName, keep: keep, pages: FS.PagesForBytes[createByteCount], wDir: wDir], $append => FS.OpenOrCreate[name: fileName, keep: keep, pages: FS.PagesForBytes[createByteCount], wDir: wDir], $write => FS.Open[name: fileName, lock: $write, wantedCreatedTime: wantedCreatedTime, remoteCheck: remoteCheck, wDir: wDir], ENDCASE => ERROR; RETURN[FileStream.StreamFromOpenFile[openFile: fileHandle, accessRights: IF accessOptions = $read THEN $read ELSE $write, initialPosition: IF accessOptions = $append THEN $end ELSE $start, streamOptions: streamOptions, streamBufferParms: streamBufferParms, extendFileProc: extendFileProc ! FS.Error => FS.Close[file: fileHandle]]]; }; <> <<>> StreamFromOpenFile: PUBLIC PROC [openFile: FS.OpenFile, accessRights: FS.Lock, initialPosition: FS.InitialPosition, streamOptions: FS.StreamOptions, streamBufferParms: FS.StreamBufferParms, extendFileProc: FS.ExtendFileProc] RETURNS [STREAM] = { RETURN [FileStream.StreamFromOpenFile[openFile: openFile, accessRights: accessRights, initialPosition: initialPosition, streamOptions: streamOptions, streamBufferParms: streamBufferParms, extendFileProc: extendFileProc]]; }; StreamFromOpenStream: PUBLIC PROC [self: STREAM] RETURNS [STREAM] = { RETURN [FileStream.StreamFromOpenStream[self]]; }; OpenFileFromStream: PUBLIC PROC [self: STREAM] RETURNS [FS.OpenFile] = { RETURN [FileStream.OpenFileFromStream[self]]; }; ErrorFromStream: PUBLIC PROC [self: STREAM] RETURNS [FS.ErrorDesc] = { RETURN [FileStream.ErrorFromStream[self]]; }; <<>> <> Init: PROC [] = { Queue _ NEW[QueueRec]; THROUGH [1..NumberOfProcesses] DO TRUSTED {Process.Detach[FORK FileStreamForegroundProcess[]]; }; ENDLOOP; }; Init[]; END. CHANGE LOG Created by Hagmann on December 6, 1983 3:02 pm <> Modified by Hagmann on December 19, 1984 9:49:48 am PST <> <<>> <> <> <<>> <<>>