<> <> <> <> <> <> <> <> <> <> <> DIRECTORY Basics USING [bytesPerWord, LongNumber, BITAND, LowHalf, UnsafeBlock], BasicTime USING [GMT], FileStream USING [FinalizationProc], FileStreamPrivate USING[ Data, DoFinalization, FSDataHandle, BufferNodeHandle, BufferNode, FileDataHandle, FileData, NodeStatus, ProcHandle, StartRequest ], FS USING [ByteCount, BytesForPages, Close, Error, ErrorDesc, GetInfo, Lock, nullOpenFile, OpenFile, Read, SetByteCountAndCreatedTime, SetPageCount, Write], FSLock USING [RemoveREF ], IO USING [CreateStreamProcs, EndOfStream, Error, STREAM], IOUtils USING [closedStreamProcs], PrincOps USING [ByteBltBlock], PrincOpsUtils USING [ByteBlt], Process USING [Detach, GetPriority, Priority, priorityForeground, SetPriority], Rope USING [ROPE], RuntimeError USING [BoundsFault], SafeStorage USING [EstablishFinalization, FinalizationQueue, FQNext, NewFQ], VM USING [ AddressForPageNumber, BytesForPages, Free, Interval, MakeChanged, MakeUnchanged, PageNumber, State]; FileStreamImpl: CEDAR MONITOR LOCKS fileData.lockRecord USING fileData: FileDataHandle IMPORTS Basics, FileStreamPrivate, FS, FSLock, PrincOpsUtils, IO, IOUtils, Process, RuntimeError, SafeStorage, VM EXPORTS FileStreamPrivate, FileStream = BEGIN OPEN Basics, Rope; STREAM: TYPE = IO.STREAM; ByteCount: TYPE = INT; ByteNumber: TYPE = ByteCount; -- index rather than count PageNumber: TYPE = VM.PageNumber; bytesPerFilePage: CARDINAL = FS.BytesForPages[1]; minFileExtend: INT = 10*bytesPerFilePage; Data: TYPE = FileStreamPrivate.Data; FSDataHandle: TYPE = FileStreamPrivate.FSDataHandle; BufferNode: TYPE = FileStreamPrivate.BufferNode; BufferNodeHandle: TYPE = FileStreamPrivate.BufferNodeHandle; FileDataHandle: TYPE = FileStreamPrivate.FileDataHandle; FileData: TYPE = FileStreamPrivate.FileData; ProcHandle: TYPE = FileStreamPrivate.ProcHandle; <> <> <> <> <> <> CleanupAfterPut: ENTRY PROC [fileData: FileDataHandle, selfData: FSDataHandle] = INLINE { <> <> <> <> currentNode: BufferNodeHandle = selfData.currentNode; IF currentNode.didPut THEN { currentNode.bufferDirty _ TRUE; IF selfData.index > currentNode.dataBytesInBuffer THEN { currentNode.dataBytesInBuffer _ selfData.index; fileData.fileLength _ currentNode.firstFileByteInBuffer + selfData.index; }; currentNode.didPut _ FALSE; }; }; EstablishFileLength: ENTRY PROC[fileData: FileDataHandle ] RETURNS [fileLength: INT] = INLINE { <> <> <> <> <> <> <> <> <<(This is mostly intended to allow the read stream to look at the file size seen>> <> ENABLE UNWIND => NULL; writeData: FSDataHandle _ fileData.writeStreamData; IF writeData = NIL OR writeData.streamIsClosed THEN { writeData _ NIL ; RETURN[fileData.fileLength] ; } ELSE { writeNode: BufferNodeHandle = writeData.currentNode ; writeNode.dataBytesInBuffer _ MAX[ writeData.index, writeNode.dataBytesInBuffer] ; fileLength _ MAX[ fileData.fileLength, writeNode.firstFileByteInBuffer + writeNode.dataBytesInBuffer ] ; fileData.fileLength _ fileLength; }; writeData _ NIL ; }; convertFStoIOError: PROC [self: STREAM, error: FS.ErrorDesc] = INLINE { selfData: FSDataHandle _ NARROW[self.streamData]; IF selfData # NIL THEN { selfData.FSErrorDesc _ error ; IF selfData.ConvertFStoIOErrors THEN IO.Error[$Failure, self]; } ELSE { IO.Error[$Failure, self]; }; }; GetChar: PUBLIC PROC [self: STREAM] RETURNS [CHAR] = { ENABLE FS.Error => { convertFStoIOError [self, error]; }; selfData: FSDataHandle _ NARROW[self.streamData]; node: BufferNodeHandle _ selfData.currentNode ; c: CHAR; fileLength: INT ; IF selfData.index >= node.dataBytesInBuffer THEN { <> <> fileLength _ EstablishFileLength[fileData: selfData.fileData ] ; <> <> <> <> <> IF fileLength <= selfData.index+node.firstFileByteInBuffer THEN ERROR IO.EndOfStream[self]; <> <> <> <> IF selfData.index = node.bufferBytes THEN node _ AdvanceBuffer[selfData] }; TRUSTED{c _ node.buffer[selfData.index]}; selfData.index _ selfData.index + 1; selfData _ NIL ; RETURN[c] ; }; PutChar: PUBLIC PROC [self: STREAM, char: CHAR] = { ENABLE FS.Error => { convertFStoIOError [self, error]; }; selfData: FSDataHandle _ NARROW[self.streamData]; node: BufferNodeHandle _ selfData.currentNode ; IF selfData.index = node.bufferBytes THEN node _ AdvanceBuffer[selfData]; TRUSTED{node.buffer[selfData.index] _ char}; selfData.index _ selfData.index + 1; node.didPut _ TRUE ; selfData _ NIL ; }; <> AddNAT: PROC [a, b: NAT] RETURNS [NAT] = INLINE { RETURN [MIN[CARDINAL[a]+CARDINAL[b], NAT.LAST]]; }; GetBlock: PUBLIC PROC [self: STREAM, block: REF TEXT, startIndex: NAT, count: NAT] RETURNS [nBytesRead: NAT] = TRUSTED { ENABLE FS.Error => { convertFStoIOError [self, error]; }; selfData: FSDataHandle _ NARROW[self.streamData]; textBlock: PrincOps.ByteBltBlock; countRemaining: NAT; stopIndexPlusOne: NAT = MIN [block.maxLength, AddNAT[startIndex, count]]; textBlock _ [ blockPointer: LOOPHOLE[block, LONG POINTER] + TEXT[0].SIZE, startIndex: startIndex, stopIndexPlusOne: stopIndexPlusOne]; countRemaining _ IF startIndex > stopIndexPlusOne THEN 0 ELSE stopIndexPlusOne-startIndex; nBytesRead _ 0; WHILE countRemaining # 0 DO bufferBlock: PrincOps.ByteBltBlock _ [ blockPointer: selfData.currentNode.buffer, startIndex: selfData.index, stopIndexPlusOne: selfData.currentNode.dataBytesInBuffer]; countTransferred: CARDINAL _ 0; IF bufferBlock.startIndex < bufferBlock.stopIndexPlusOne THEN countTransferred _ PrincOpsUtils.ByteBlt[from: bufferBlock, to: textBlock]; selfData.index _ selfData.index + countTransferred; nBytesRead _ nBytesRead + countTransferred; IF (countRemaining _ countRemaining - countTransferred) = 0 THEN EXIT; <> <> <> IF EstablishFileLength[fileData: selfData.fileData ] <= selfData.index + selfData.currentNode.firstFileByteInBuffer THEN EXIT; textBlock.startIndex _ textBlock.startIndex + countTransferred; <> <> IF selfData.index = selfData.currentNode.bufferBytes THEN [] _ AdvanceBuffer[selfData]; ENDLOOP; IF nBytesRead # 0 THEN block.length _ startIndex + nBytesRead; selfData _ NIL ; RETURN[nBytesRead] }; PutBlock: PUBLIC PROC [self: STREAM, block: REF READONLY TEXT, startIndex: NAT, count: NAT] = TRUSTED { ENABLE FS.Error => { convertFStoIOError [self, error]; }; selfData: FSDataHandle _ NARROW[self.streamData]; <> textBlock: PrincOps.ByteBltBlock; countRemaining: NAT; stopIndexPlusOne: NAT _ AddNAT[startIndex, count]; IF stopIndexPlusOne > block.maxLength THEN stopIndexPlusOne _ block.length; textBlock _ [ blockPointer: LOOPHOLE[block, LONG POINTER] + TEXT[0].SIZE, startIndex: startIndex, stopIndexPlusOne: stopIndexPlusOne]; countRemaining _ IF startIndex > stopIndexPlusOne THEN 0 ELSE stopIndexPlusOne-startIndex; WHILE countRemaining # 0 DO bufferBlock: PrincOps.ByteBltBlock _ [ blockPointer: selfData.currentNode.buffer, startIndex: selfData.index, stopIndexPlusOne: selfData.currentNode.bufferBytes]; -- allow put past current eof. countTransferred: CARDINAL _ PrincOpsUtils.ByteBlt[from: textBlock, to: bufferBlock]; selfData.index _ selfData.index + countTransferred; selfData.currentNode.didPut _ TRUE; IF (countRemaining _ countRemaining - countTransferred) = 0 THEN EXIT; textBlock.startIndex _ textBlock.startIndex + countTransferred; [] _ AdvanceBuffer[selfData]; ENDLOOP; selfData _ NIL ; }; maxWordsMoved: INT = (LAST[CARDINAL] / bytesPerWord) - 1; maxBytesMoved: INT = maxWordsMoved * bytesPerWord; maxStopIndexPlusOne: INT = maxBytesMoved ; <> <> UnsafeGetBlock: PUBLIC UNSAFE PROC [self: STREAM, block: UnsafeBlock] RETURNS [nBytesRead: INT] = UNCHECKED { ENABLE FS.Error => { convertFStoIOError [self, error]; }; selfData: FSDataHandle _ NARROW[self.streamData]; textBlock: PrincOps.ByteBltBlock; stopIndexPlusOne: INT; IF block.startIndex < 0 OR block.count < 0 THEN ERROR RuntimeError.BoundsFault; IF block.count = 0 THEN { selfData _ NIL ; RETURN [0]; }; IF block.startIndex > maxBytesMoved THEN { <> wordOffset: INT = block.startIndex / bytesPerWord; block.base _ block.base + wordOffset; block.startIndex _ block.startIndex - wordOffset*bytesPerWord; }; stopIndexPlusOne _ block.startIndex + block.count; nBytesRead _ 0; DO <> <> countRemaining: CARDINAL; textBlock _ [ blockPointer: block.base, startIndex: block.startIndex, stopIndexPlusOne: MIN[maxStopIndexPlusOne, stopIndexPlusOne]]; countRemaining _ textBlock.stopIndexPlusOne - textBlock.startIndex; < 0>> <> <> DO bufferBlock: PrincOps.ByteBltBlock _ [ blockPointer: selfData.currentNode.buffer, startIndex: selfData.index, stopIndexPlusOne: selfData.currentNode.dataBytesInBuffer]; countTransferred: CARDINAL _ 0; IF bufferBlock.startIndex < bufferBlock.stopIndexPlusOne THEN countTransferred _ PrincOpsUtils.ByteBlt[from: bufferBlock, to: textBlock]; selfData.index _ selfData.index + countTransferred; nBytesRead _ nBytesRead + countTransferred; IF (countRemaining _ countRemaining - countTransferred) = 0 THEN EXIT; IF EstablishFileLength[fileData: selfData.fileData ] <= selfData.index + selfData.currentNode.firstFileByteInBuffer THEN { selfData _ NIL ; GOTO return; }; textBlock.startIndex _ textBlock.startIndex + countTransferred; IF selfData.index = selfData.currentNode.bufferBytes THEN [] _ AdvanceBuffer[selfData]; ENDLOOP; IF textBlock.stopIndexPlusOne = stopIndexPlusOne THEN { selfData _ NIL ; GOTO return; }; <> block.base _ block.base + maxWordsMoved; block.startIndex _ 0; stopIndexPlusOne _ stopIndexPlusOne - maxBytesMoved; ENDLOOP; EXITS return => RETURN [nBytesRead] }; UnsafePutBlock: PUBLIC PROC [self: STREAM, block: UnsafeBlock] = TRUSTED { ENABLE FS.Error => { convertFStoIOError [self, error]; }; selfData: FSDataHandle _ NARROW[self.streamData]; textBlock: PrincOps.ByteBltBlock; stopIndexPlusOne: INT; IF block.startIndex < 0 OR block.count < 0 THEN ERROR RuntimeError.BoundsFault; IF block.startIndex > maxBytesMoved THEN { <> wordOffset: INT = block.startIndex / bytesPerWord; block.base _ block.base + wordOffset; block.startIndex _ block.startIndex - wordOffset*bytesPerWord; }; stopIndexPlusOne _ block.startIndex + block.count; DO <> <> countRemaining: CARDINAL; textBlock _ [ blockPointer: block.base, startIndex: block.startIndex, stopIndexPlusOne: MIN[maxStopIndexPlusOne, stopIndexPlusOne]]; countRemaining _ textBlock.stopIndexPlusOne - textBlock.startIndex; < 0>> <> DO bufferBlock: PrincOps.ByteBltBlock _ [ blockPointer: selfData.currentNode.buffer, startIndex: selfData.index, stopIndexPlusOne: selfData.currentNode.bufferBytes]; -- allow put past current eof. countTransferred: CARDINAL _ PrincOpsUtils.ByteBlt[from: textBlock, to: bufferBlock]; selfData.index _ selfData.index + countTransferred; selfData.currentNode.didPut _ TRUE; IF (countRemaining _ countRemaining - countTransferred) = 0 THEN EXIT; textBlock.startIndex _ textBlock.startIndex + countTransferred; [] _AdvanceBuffer[selfData]; ENDLOOP; IF textBlock.stopIndexPlusOne = stopIndexPlusOne THEN EXIT; <> block.base _ block.base + maxWordsMoved; block.startIndex _ 0; stopIndexPlusOne _ stopIndexPlusOne - maxBytesMoved; ENDLOOP ; selfData _ NIL ; }; AdvanceBuffer: PROC [fsData: FSDataHandle] RETURNS [node: BufferNodeHandle]= { <> <> <> <> fileData: FileDataHandle = fsData.fileData; firstByteOfNextPage: INT = fsData.currentNode.firstFileByteInBuffer + fsData.currentNode.bufferBytes; changeSize: BOOL _ FALSE; IF firstByteOfNextPage = maxLength THEN ERROR IO.Error[$Failure, NIL]; IF fsData.isWriteStream THEN CleanupAfterPut[fileData: fileData, selfData: fsData]; IF firstByteOfNextPage >= fileData.byteSize THEN { newSize: INT _ 0; IF fileData.extendFileProc # NIL THEN newSize _ fileData.extendFileProc[firstByteOfNextPage] ; fileData.byteSize _ IF newSize # 0 THEN MAX[newSize, firstByteOfNextPage] ELSE fileData.byteSize + MAX[minFileExtend, ((fileData.byteSize/10)/bytesPerFilePage)*bytesPerFilePage]; SetFileSize[fileData.fileHandle, fileData.byteSize] }; node _ SetupBuffer[fileData: fileData, fsData: fsData, fileByte: firstByteOfNextPage]; fsData.index _ LowHalf[firstByteOfNextPage-fsData.currentNode.firstFileByteInBuffer]; fsData _ NIL ; }; EndOf: PUBLIC PROC [self: STREAM] RETURNS[BOOL] = { selfData: FSDataHandle _ NARROW[self.streamData]; node: BufferNodeHandle _ selfData.currentNode ; <> IF selfData.index >= node.dataBytesInBuffer THEN { <> fileLength: INT = EstablishFileLength[fileData: selfData.fileData]; IF fileLength <= selfData.index+node.firstFileByteInBuffer THEN RETURN[TRUE]; }; selfData _ NIL ; RETURN[FALSE]; }; CharsAvail: PUBLIC PROC [self: STREAM, wait: BOOL] RETURNS [INT] = { RETURN[INT.LAST] }; GetIndex: PUBLIC PROC [self: STREAM] RETURNS [index: INT] = { selfData: FSDataHandle _ NARROW[self.streamData]; index _ selfData.currentNode.firstFileByteInBuffer + selfData.index ; selfData _ NIL ; }; SetIndex: PUBLIC PROC [self: STREAM, index: INT] = { ENABLE FS.Error => { convertFStoIOError [self, error]; }; fsData: FSDataHandle _ NARROW[self.streamData]; currentNode: BufferNodeHandle _ fsData.currentNode ; firstBufferByte: INT _ currentNode.firstFileByteInBuffer; fileData: FileDataHandle = fsData.fileData ; fileLength: INT ; IF index < 0 THEN ERROR IO.Error[BadIndex, self]; <> <> IF fsData.isWriteStream THEN { CleanupAfterPut[fileData: fileData, selfData: fsData]; fileLength _ fileData.fileLength; } ELSE fileLength _ EstablishFileLength[fileData: fileData ]; IF index > fileLength THEN ERROR IO.EndOfStream[self]; <> IF index NOT IN [firstBufferByte .. firstBufferByte+currentNode.bufferBytes) THEN { firstBufferByte _ index - (index MOD currentNode.bufferBytes); currentNode _ SetupBuffer[fileData: fileData, fsData: fsData, fileByte: firstBufferByte]; }; fsData.index _ index - firstBufferByte; fsData _ NIL ; }; Reset: PUBLIC PROC [self: STREAM] = { SetIndex[self, GetLength[self]] }; Flush: PUBLIC PROC [self: STREAM] = { ENABLE FS.Error => { convertFStoIOError [self, error]; }; fsData: FSDataHandle _ NARROW[self.streamData]; IF fsData.isWriteStream THEN ForceOut[ fsData: fsData ]; fsData _ NIL ; }; Close: PUBLIC PROC [self: STREAM, abort: BOOL] = { ENABLE FS.Error => { convertFStoIOError [self, error]; }; fsData: FSDataHandle _ NARROW[self.streamData]; IF ~abort THEN ForceOut[ fsData: fsData ! UNWIND => { <> <> fsData: FSDataHandle _ NARROW[self.streamData]; CloseFileDataForStream[fileData: fsData.fileData, fsData: fsData, forceCleanUp: TRUE]; fsData _ NIL ; self.streamData _ NIL ; self.streamProcs _ IOUtils.closedStreamProcs; }; ]; CloseFileDataForStream[fileData: fsData.fileData, fsData: fsData]; fsData _ NIL ; self.streamData _ NIL ; self.streamProcs _ IOUtils.closedStreamProcs; }; <> GetLength: PUBLIC PROC [self: STREAM] RETURNS [length: INT] = { selfData: FSDataHandle _ NARROW[self.streamData]; IF selfData.streamIsClosed THEN ERROR IO.Error[StreamClosed, self]; length _ EstablishFileLength[fileData: selfData.fileData ] ; selfData _ NIL ; }; clearLowBits: CARDINAL = CARDINAL.LAST-(bytesPerFilePage-1); clearHighBits: CARDINAL = (bytesPerFilePage-1); maxLength: INT = INT.LAST - bytesPerFilePage; SetLength: PUBLIC PROC [self: STREAM, length: INT] = { <> ENABLE FS.Error => { convertFStoIOError [self, error]; }; fsData: FSDataHandle _ NARROW[self.streamData]; IF fsData.streamIsClosed THEN ERROR IO.Error[StreamClosed, self]; IF length NOT IN [0 .. maxLength] THEN ERROR IO.Error[BadIndex, self]; SetLengthUnderMonitor[fileData: fsData.fileData, length: length]; IF fsData.index+fsData.currentNode.firstFileByteInBuffer > length THEN { <> <> <> fsData.index _ 0 ; SetIndex[self: self, index: length]; }; fsData _ NIL ; }; RoundUpToPages: PROC [bytes: INT] RETURNS [INT] = INLINE { bytes _ bytes + (bytesPerFilePage-1); LOOPHOLE[bytes, LongNumber[num]].lowbits _ BITAND[LOOPHOLE[bytes, LongNumber[num]].lowbits, clearLowBits]; RETURN[bytes]; }; PagesForRoundUpBytes: PROC [bytes: INT] RETURNS [INT] = INLINE { RETURN[RoundUpToPages[bytes]/bytesPerFilePage]; }; SetLengthUnderMonitor: ENTRY PROC [fileData: FileDataHandle, length: INT] = { ENABLE UNWIND => NULL; newFileBytes: INT = RoundUpToPages[length]; oldFileLength: INT ; nowNode: BufferNodeHandle _ fileData.firstBufferNode; writeData: FSDataHandle _ fileData.writeStreamData ; IF writeData # NIL AND writeData.currentNode.didPut THEN { <> <> currentNode: BufferNodeHandle = writeData.currentNode; currentNode.bufferDirty _ TRUE; currentNode.didPut _ FALSE; IF writeData.index > currentNode.dataBytesInBuffer THEN { currentNode.dataBytesInBuffer _ writeData.index; fileData.fileLength _ currentNode.firstFileByteInBuffer + writeData.index }; currentNode.didPut _ FALSE }; oldFileLength _ fileData.fileLength ; fileData.fileLength _ length; IF length < fileData.validBytesOnDisk THEN fileData.validBytesOnDisk _ length ; <> IF length > fileData.byteSize THEN { fileData.byteSize _ newFileBytes; SetFileSize[fileData.fileHandle, fileData.byteSize]; }; <> UNTIL nowNode = NIL DO IF (nowNode.status # invalid) THEN { IF (nowNode.firstFileByteInBuffer+nowNode.bufferBytes > length) THEN { IF nowNode.firstFileByteInBuffer >= length THEN { <> <> <> nowNode.dataBytesInBuffer _ 0; nowNode.bufferDirty _ FALSE ; nowNode.didPut _ FALSE ; } ELSE { <> nowNode.dataBytesInBuffer _ length - nowNode.firstFileByteInBuffer ; IF nowNode.didPut THEN { nowNode.didPut _ FALSE ; nowNode.bufferDirty _ TRUE ; }; }; } ELSE { <> nowNode.dataBytesInBuffer _ nowNode.bufferBytes ; }; }; nowNode _ nowNode.nextBufferNode ; ENDLOOP; writeData _ NIL ; }; EraseChar: PUBLIC PROC [self: STREAM, char: CHAR] = { index: INT = GetIndex[self]; IF index = 0 THEN ERROR IO.Error[IllegalBackup, self]; SetIndex[self, index-1]; IF GetChar[self] # char THEN {PutChar[self, '\\]; PutChar[self, char]} ELSE SetIndex[self, index-1] }; Backup: PUBLIC PROC [self: STREAM, char: CHAR] = { selfData: FSDataHandle _ NARROW[self.streamData]; index: INT; IF selfData.streamIsClosed THEN ERROR IO.Error[StreamClosed, self]; index _ GetIndex[self]; IF index = 0 THEN ERROR IO.Error[IllegalBackup, self]; SetIndex[self, index-1]; IF GetChar[self] # char THEN ERROR IO.Error[IllegalBackup, self]; SetIndex[self, index-1]; selfData _ NIL ; }; CloseFileDataForStream: ENTRY PROC [fileData: FileDataHandle, fsData: FSDataHandle, forceCleanUp: BOOL _ FALSE] = { <> <> ENABLE UNWIND => BROADCAST fileData.somethingHappened; needDeleted: INT _ IF fileData.numberOfStreams = 0 THEN INT.LAST ELSE fileData.streamBufferParms.nBuffers ; lastNode: BufferNodeHandle _ NIL ; node: BufferNodeHandle ; IF (node _ fsData.readAheadNode) # NIL THEN { <> WHILE node.useCount = 1 AND node.status # valid AND node.status # invalid AND node.status # needsSequentialRead DO WAIT fileData.somethingHappened ; ENDLOOP; node.useCount _ node.useCount-1 ; }; fsData.readAheadNode _ NIL; IF (node _ fsData.currentNode) # NIL THEN node.useCount _ node.useCount-1 ; fsData.currentNode _ NIL ; IF ~forceCleanUp AND fsData.isWriteStream AND fileData.accessRights = $write AND fileData.streamOptions[truncatePagesOnClose] THEN { SetFileSize[fileData.fileHandle, fileData.fileLength] ; }; <> <> node _ fileData.firstBufferNode ; UNTIL node = NIL OR needDeleted = 0 DO IF node.useCount = 0 THEN { TRUSTED{VM.Free[node.bufferInterval]}; IF fileData.firstBufferNode = node THEN fileData.firstBufferNode _ node.nextBufferNode ELSE lastNode.nextBufferNode _ node.nextBufferNode ; needDeleted _ needDeleted - 1 ; } ELSE { lastNode _ node ; }; node _ node.nextBufferNode ; ENDLOOP; fsData.streamIsClosed _ TRUE; IF (fileData.numberOfStreams _ fileData.numberOfStreams - 1) = 0 THEN { fileDataTemp: FS.OpenFile _ fileData.fileHandle ; fileData.fileHandle _ FS.nullOpenFile ; IF fileData.streamOptions[closeFSOpenFileOnClose] THEN { fileDataTemp.Close[! FS.Error => IF forceCleanUp THEN CONTINUE;]; -- let finialization close the file is we fail. }; }; BROADCAST fileData.somethingHappened ; fsData _ NIL ; }; <> <> <> <> FinishWrites: ENTRY PROC [fileData: FileDataHandle, fsData: FSDataHandle, currentNode: BufferNodeHandle] = { ENABLE UNWIND => NULL; nowNode: BufferNodeHandle _ fileData.firstBufferNode; IF fileData.accessRights = read THEN RETURN ; IF currentNode = NIL THEN { -- this is only true when called via ForceOut <> WHILE fileData.writeCount # 0 DO WAIT fileData.somethingHappened ; ENDLOOP; }; UNTIL nowNode = NIL DO IF nowNode # currentNode THEN { WHILE (nowNode.status = parallelReadActive) OR (nowNode.status = needsParallelRead) OR (nowNode.status = parallelWriteActive) OR (nowNode.status = needsParallelWrite) DO WAIT fileData.somethingHappened; ENDLOOP; IF (nowNode.status = needsSequentialWrite OR (currentNode = NIL AND nowNode.status # invalid)) THEN { <> <> <> <> <> nowNode.status _ sequentialWriteActive ; TRUSTED{WriteFilePages[f: fsData.fileData.fileHandle, node: nowNode ! UNWIND => nowNode.status _ valid] }; nowNode.bufferDirty _ FALSE ; nowNode.status _ valid ; }; }; nowNode _ nowNode.nextBufferNode ; ENDLOOP; fsData _ NIL ; }; nodeNowAvailable: ENTRY PROC [fileData: FileDataHandle] RETURNS [BOOL]= { ENABLE UNWIND => NULL; nowNode: BufferNodeHandle ; DO nowNode _ fileData.firstBufferNode; UNTIL nowNode = NIL DO IF ( nowNode.status = valid OR nowNode.status = invalid) AND nowNode.useCount = 0 THEN RETURN [TRUE]; IF nowNode.status = needsSequentialWrite THEN RETURN[FALSE]; nowNode _ nowNode.nextBufferNode ; ENDLOOP; WAIT fileData.somethingHappened; ENDLOOP; }; FinishRead: ENTRY PROC [fileData: FileDataHandle, node: BufferNodeHandle, bufferSize: INT] = INLINE { ENABLE UNWIND => NULL; node.status _ valid; node.dataBytesInBuffer _ bufferSize ; BROADCAST fileData.somethingHappened; }; FinishBadRead: ENTRY PROC [fileData: FileDataHandle, node: BufferNodeHandle] = INLINE { ENABLE UNWIND => NULL; node.status _ invalid; node.dataBytesInBuffer _ 0 ; BROADCAST fileData.somethingHappened; }; <> <> FinishBadPreRead: ENTRY PROC [fileData: FileDataHandle, node: BufferNodeHandle] = INLINE { ENABLE UNWIND => NULL; IF (fileData.firstReadStream # NIL AND fileData.firstReadStream.currentNode = node) OR (fileData.writeStreamData # NIL AND fileData.writeStreamData.currentNode = node) THEN { node.status _ needsSequentialRead; node.dataBytesInBuffer _ 0 ; BROADCAST fileData.somethingHappened; } ELSE { IF fileData.firstReadStream # NIL AND fileData.firstReadStream.readAheadNode = node THEN { fileData.firstReadStream.readAheadNode _ NIL ; node.useCount _ 0 ; }; IF fileData.writeStreamData # NIL AND fileData.writeStreamData.readAheadNode = node THEN { fileData.writeStreamData.readAheadNode _ NIL ; node.useCount _ 0 ; }; node.status _ invalid; BROADCAST fileData.somethingHappened; }; }; markNodeNotWritten: ENTRY PROC [fileData: FileDataHandle, node: BufferNodeHandle] = INLINE { ENABLE UNWIND => NULL; node.status _ needsSequentialWrite ; node.bufferDirty _ TRUE ; fileData.writeCount _ fileData.writeCount - 1; BROADCAST fileData.somethingHappened; }; markNodeWritten: ENTRY PROC [fileData: FileDataHandle, node: BufferNodeHandle] = INLINE { ENABLE UNWIND => NULL; node.status _ valid ; fileData.writeCount _ fileData.writeCount - 1; BROADCAST fileData.somethingHappened; }; bumpWriteCount: ENTRY PROC [fileData: FileDataHandle] = INLINE { ENABLE UNWIND => NULL; fileData.writeCount _ fileData.writeCount + 1; }; WaitForOneBufferNotWriting: ENTRY PROC [fileData: FileDataHandle] = INLINE { ENABLE UNWIND => NULL; WHILE fileData.writeCount >= fileData.streamBufferParms.nBuffers DO WAIT fileData.somethingHappened; ENDLOOP; }; WaitForParallelWriteToComplete: ENTRY PROC [fileData: FileDataHandle, node: BufferNodeHandle] = INLINE { ENABLE UNWIND => NULL; WHILE node.status = needsParallelWrite OR node.status = parallelWriteActive DO WAIT fileData.somethingHappened; ENDLOOP; }; ProcessNode: PUBLIC PROC [ fileData: FileDataHandle, node: BufferNodeHandle ] = { IF node.status = needsParallelRead THEN { node.status _ parallelReadActive ; ReadAhead [node: node, fileData: fileData]; RETURN ; }; IF node.status = needsParallelWrite THEN { node.status _ parallelWriteActive ; parallelWriteBuffer [ node: node, fileData: fileData ] ; RETURN ; }; ERROR ; }; parallelWriteBuffer: PROC [node: BufferNodeHandle, fileData: FileDataHandle] = { ENABLE FS.Error => { <> <> <> markNodeNotWritten[fileData: fileData, node: node]; GOTO done; }; TRUSTED{WriteFilePages[f: fileData.fileHandle, node: node] }; markNodeWritten[fileData: fileData, node: node]; EXITS done => RETURN }; ReadAhead: PROC [node: BufferNodeHandle, fileData: FileDataHandle] = { ENABLE FS.Error => { <> FinishBadPreRead[fileData: fileData, node: node]; GOTO done; }; bytesToRead: INT ; fileByte: INT = node.firstFileByteInBuffer ; IF (bytesToRead _ MIN[fileData.fileLength - fileByte, node.bufferBytes]) > 0 THEN TRUSTED{ReadFilePages[f: fileData.fileHandle, from: fileByte, numPages: PagesForRoundUpBytes[bytesToRead], to: node.buffer, interval: node.bufferInterval]}; FinishRead[fileData: fileData, node: node, bufferSize: bytesToRead]; EXITS done => RETURN }; SetupBuffer: PUBLIC PROC [fileData: FileDataHandle, fsData: FSDataHandle, fileByte: INT] RETURNS [currentNode: BufferNodeHandle] = { <> <> <> <> <> <> <> node: BufferNodeHandle _ fsData.currentNode ; readAheadNode: BufferNodeHandle; currentNodeStatus: FileStreamPrivate.NodeStatus; success: BOOL _ FALSE ; IF node = NIL THEN node _ fileData.firstBufferNode ; <> IF node.bufferDirty AND fsData.isWriteStream THEN { <> FinishWrites[fileData: fileData, fsData: fsData, currentNode: node]; <> IF node.dataBytesInBuffer + node.firstFileByteInBuffer > fileData.byteSize THEN { fileData.byteSize _ node.dataBytesInBuffer + node.firstFileByteInBuffer ; SetFileSize[fileData.fileHandle, fileData.byteSize] ; }; IF fileData.validBytesOnDisk < node.dataBytesInBuffer + node.firstFileByteInBuffer THEN fileData.validBytesOnDisk _ node.dataBytesInBuffer + node.firstFileByteInBuffer ; WaitForParallelWriteToComplete[fileData: fileData, node: node]; node.status _ needsParallelWrite ; node.bufferDirty _ FALSE ; bumpWriteCount[ fileData: fileData]; FileStreamPrivate.StartRequest [ fileData: fileData, node: node ] ; WaitForOneBufferNotWriting[fileData: fileData]; }; WHILE success # TRUE DO [success, currentNode, readAheadNode] _ SetUpNodes[fileData: fileData, fsData: fsData, fileByte: fileByte]; <> <> <> <<(This may not be necessary anymore since I changed the way the>> <