FileStreamImpl.mesa
Copyright Ó 1985, 1986 by Xerox Corporation. All rights reserved.
MBrown on September 17, 1983 8:39 pm
Rovner on August 15, 1983 1:02 pm
Levin on September 22, 1983 3:22 pm
Birrell on August 23, 1983 3:14 pm
Schroeder on November 28, 1983 12:36 pm
Russ Atkinson (RRA) February 4, 1985 3:09:10 pm PST
Bob Hagmann October 17, 1988 2:08:24 pm PDT
Doug Wyatt, December 12, 1986 5:19:17 pm PST
Please maintain change log at end of file.
DIRECTORY
Basics USING [ByteBlt, ByteBltBlock, bytesPerWord, LongNumber, BITAND, LowHalf, UnsafeBlock],
BasicTime USING [GMT],
FileStream USING [FinalizationProc, SetLength],
FileStreamPrivate USING[ Data, DoFinalization, FSDataHandle, BufferNodeHandle, BufferNode, FileDataHandle, FileData, NodeStatus, ProcHandle, ProcHandleFromAccessRights, SetupBuffer, StartRequest ],
FS USING [ByteCount, BytesForPages, Close, Error, ErrorDesc, ExtendFileProc, GetInfo, GetName, InitialPosition, Lock, nullOpenFile, OpenFile, Read, SetByteCountAndCreatedTime, SetPageCount, StreamBufferParms, StreamOptions, Write],
IO USING [Close, CreateStream, CreateStreamProcs, EndOfStream, Error, GetChar, GetIndex, GetLength, SetIndex, STREAM, StreamProcs],
IOUtils USING [closedStreamProcs, StoreData],
Process USING [Detach, GetPriority, Pause, Priority, prioritySysForeground, SetPriority],
Rope USING [ROPE],
RuntimeError USING [BoundsFault],
SafeStorage USING [EnableFinalization, EstablishFinalization, FinalizationQueue, FQNext, NewFQ],
VM USING [ AddressForPageNumber, Allocate, BytesForPages, CantAllocate, Free, Interval, PageNumber];
FileStreamImpl:
CEDAR
MONITOR
LOCKS fileData.lockRecord USING fileData: FileDataHandle
IMPORTS Basics, FileStream, FileStreamPrivate, FS, IO, IOUtils, Process, RuntimeError, SafeStorage, VM
EXPORTS FileStreamPrivate, FileStream = BEGIN OPEN Basics, Rope;
STREAM: TYPE = IO.STREAM;
ByteCount: TYPE = INT;
ByteNumber: TYPE = ByteCount; -- index rather than count
PageNumber: TYPE = VM.PageNumber;
PageCount: TYPE = VM.PageNumber;
bytesPerFilePage: CARDINAL = FS.BytesForPages[1];
maxVMPagesPerBuffer: INT = 65536/VM.BytesForPages[pages: 1] ;
clearLowBits: CARDINAL = CARDINAL.LAST-(bytesPerFilePage-1);
minFileExtend: INT = 10*bytesPerFilePage;
Data: TYPE = FileStreamPrivate.Data;
FSDataHandle: TYPE = FileStreamPrivate.FSDataHandle;
BufferNode: TYPE = FileStreamPrivate.BufferNode;
BufferNodeHandle: TYPE = FileStreamPrivate.BufferNodeHandle;
FileDataHandle: TYPE = FileStreamPrivate.FileDataHandle;
FileData: TYPE = FileStreamPrivate.FileData;
ProcHandle: TYPE = FileStreamPrivate.ProcHandle;
This code does not protect itself from parallel use of a stream by concurrent
processes. It assumes that the processes will synchronize at a higher level.
Parallel use of different streams for the same open file is expected, but the
read/write stream must be opened by StreamOpen or StreamFromOpenFile,
and read stream by StreamFromOpenStream.
Get and Put
CleanupAfterPut:
ENTRY
PROC [fileData: FileDataHandle, selfData: FSDataHandle] =
INLINE {
Restores dataBytesInBuffer and fileLength if they are messed up by a PutChar or
PutBlock past the end of file.
Same logic in SetLengthUnderMonitor.
only call this routine with a write stream
currentNode: BufferNodeHandle = selfData.currentNode;
IF currentNode.didPut
THEN {
currentNode.bufferDirty ← TRUE;
IF selfData.index > currentNode.dataBytesInBuffer
THEN {
currentNode.dataBytesInBuffer ← selfData.index;
fileData.fileLength ← currentNode.firstFileByteInBuffer + selfData.index;
};
currentNode.didPut ← FALSE;
};
EstablishFileLength:
ENTRY
PROC[fileData: FileDataHandle ]
RETURNS [fileLength:
INT] =
INLINE {
Paw through write stream info to find the file length. The new length is
the true length modulo some uncertainity whether a put was done in parallel
during the execution of this routine. The file length returned is
as least as big as the file was when the monitor was acquired. This
is fine because the notion of EOF or file length for a reader of a file that is
in the process of being written is somewhat vague. A higher level protocol
should keep this straight in the client (why are you reading bytes that
might not be there yet?). Fix up dataBytesInBuffer if needed.
(This is mostly intended to allow the read stream to look at the file size seen
by the write stream.)
ENABLE UNWIND => NULL;
writeData: FSDataHandle ← fileData.writeStreamData;
IF writeData =
NIL
OR writeData.streamIsClosed
THEN {
writeData ← NIL ;
RETURN[fileData.fileLength] ;
}
ELSE {
writeNode: BufferNodeHandle = writeData.currentNode ;
writeNode.dataBytesInBuffer ← MAX[ writeData.index, writeNode.dataBytesInBuffer] ;
fileLength ←
MAX[
fileData.fileLength,
writeNode.firstFileByteInBuffer + writeNode.dataBytesInBuffer ] ;
fileData.fileLength ← fileLength;
};
writeData ← NIL ;
};
convertFStoIOError:
PROC [self:
STREAM, error:
FS.ErrorDesc] =
INLINE {
selfData: FSDataHandle ← NARROW[self.streamData];
IF selfData #
NIL
THEN {
selfData.FSErrorDesc ← error ;
IF selfData.ConvertFStoIOErrors THEN IO.Error[$Failure, self];
}
ELSE {
IO.Error[$Failure, self];
};
};
GetChar:
PUBLIC
PROC [self:
STREAM]
RETURNS [
CHAR] = {
ENABLE
FS.Error => {
convertFStoIOError [self, error];
};
selfData: FSDataHandle ← NARROW[self.streamData];
node: BufferNodeHandle ← selfData.currentNode ;
c: CHAR;
fileLength: INT ;
IF bufferBad[node.status] THEN IO.Error[$Failure, self];
IF selfData.index >= node.dataBytesInBuffer
THEN {
Suspect that end-of-buffer or end-of-file has been reached.
This may be false! However, the test is cheap and usually false.
fileLength ← EstablishFileLength[fileData: selfData.fileData ] ;
File length may be wrong if writer is using the same buffer
as the reader, so get a good file length. This is not cheap: we
have to get a monitor lock and maybe look inside the write stream.
Note that we use the local variable fileLength and not
selfData.fileData.fileLength.
IF fileLength <= selfData.index+node.firstFileByteInBuffer
THEN
ERROR
IO.EndOfStream[self];
We are not at EOF. If we are at EOB, then get the next buffer.
Not EOF and not EOB can occur if the writer has put some
char's into the buffer and this was not reflected in dataBytesInBuffer
until we did the EstablishFileLength call.
IF selfData.index = node.bufferBytes THEN node ← AdvanceBuffer[selfData] };
TRUSTED{c ← node.buffer[selfData.index]};
selfData.index ← selfData.index + 1;
selfData ← NIL ;
RETURN[c] ;
};
PutChar:
PUBLIC
PROC [self:
STREAM, char:
CHAR] = {
ENABLE
FS.Error => {
convertFStoIOError [self, error];
};
selfData: FSDataHandle ← NARROW[self.streamData];
node: BufferNodeHandle ← selfData.currentNode ;
IF selfData.index = node.bufferBytes THEN node ← AdvanceBuffer[selfData];
IF bufferBad[node.status] THEN IO.Error[$Failure, self];
TRUSTED{node.buffer[selfData.index] ← char};
selfData.index ← selfData.index + 1;
node.didPut ← TRUE ;
selfData ← NIL ;
};
Change use to IOUtils.AddNAT when the 0+0 gives NAT.LAST bug is fixed
AddNAT:
PROC [a, b:
NAT]
RETURNS [
NAT] =
INLINE {
RETURN [MIN[CARDINAL[a]+CARDINAL[b], NAT.LAST]];
};
GetBlock:
PUBLIC
PROC [self:
STREAM, block:
REF
TEXT, startIndex:
NAT,
count: NAT]
RETURNS [nBytesRead: NAT] = TRUSTED {
ENABLE
FS.Error => {
convertFStoIOError [self, error];
};
selfData: FSDataHandle ← NARROW[self.streamData];
textBlock: Basics.ByteBltBlock;
countRemaining: NAT;
stopIndexPlusOne: NAT = MIN [block.maxLength, AddNAT[startIndex, count]];
IF bufferBad[selfData.currentNode.status] THEN IO.Error[$Failure, self];
textBlock ← [
blockPointer: LOOPHOLE[block, LONG POINTER] + TEXT[0].SIZE,
startIndex: startIndex,
stopIndexPlusOne: stopIndexPlusOne];
countRemaining ←
IF startIndex > stopIndexPlusOne THEN 0 ELSE stopIndexPlusOne-startIndex;
nBytesRead ← 0;
WHILE countRemaining # 0
DO
bufferBlock: Basics.ByteBltBlock ← [
blockPointer: selfData.currentNode.buffer,
startIndex: selfData.index,
stopIndexPlusOne: selfData.currentNode.dataBytesInBuffer];
countTransferred: CARDINAL ← 0;
IF bufferBlock.startIndex < bufferBlock.stopIndexPlusOne
THEN
countTransferred ← Basics.ByteBlt[from: bufferBlock, to: textBlock];
selfData.index ← selfData.index + countTransferred;
nBytesRead ← nBytesRead + countTransferred;
IF (countRemaining ← countRemaining - countTransferred) = 0
THEN
EXIT;
Bytes may be added concurrently with this get. EstablishFileLength gives
a true file length (which may be different from what it was when we
started this iteration) to see if there is more data to blt.
IF EstablishFileLength[fileData: selfData.fileData ] <=
selfData.index + selfData.currentNode.firstFileByteInBuffer
THEN EXIT;
textBlock.startIndex ← textBlock.startIndex + countTransferred;
The below IF is needed for the same reason we called
EstablishFileLength above.
IF selfData.index = selfData.currentNode.bufferBytes THEN [] ← AdvanceBuffer[selfData];
ENDLOOP;
IF nBytesRead # 0 THEN block.length ← startIndex + nBytesRead;
selfData ← NIL ;
RETURN[nBytesRead] };
PutBlock:
PUBLIC
PROC [self:
STREAM, block:
REF
READONLY
TEXT, startIndex:
NAT,
count: NAT] = TRUSTED {
ENABLE
FS.Error => {
convertFStoIOError [self, error];
};
selfData: FSDataHandle ← NARROW[self.streamData];
Fail if startIndex<0 or stopIndexPlusOne<0.
textBlock: Basics.ByteBltBlock;
countRemaining: NAT;
stopIndexPlusOne: NAT ← AddNAT[startIndex, count];
IF bufferBad[selfData.currentNode.status] THEN IO.Error[$Failure, self];
IF stopIndexPlusOne > block.maxLength THEN stopIndexPlusOne ← block.length;
textBlock ← [
blockPointer: LOOPHOLE[block, LONG POINTER] + TEXT[0].SIZE,
startIndex: startIndex,
stopIndexPlusOne: stopIndexPlusOne];
countRemaining ←
IF startIndex > stopIndexPlusOne THEN 0 ELSE stopIndexPlusOne-startIndex;
WHILE countRemaining # 0
DO
bufferBlock: Basics.ByteBltBlock ← [
blockPointer: selfData.currentNode.buffer,
startIndex: selfData.index,
stopIndexPlusOne: selfData.currentNode.bufferBytes]; -- allow put past current eof.
countTransferred: CARDINAL ← Basics.ByteBlt[from: textBlock, to: bufferBlock];
selfData.index ← selfData.index + countTransferred;
selfData.currentNode.didPut ← TRUE;
IF (countRemaining ← countRemaining - countTransferred) = 0 THEN EXIT;
textBlock.startIndex ← textBlock.startIndex + countTransferred;
[] ← AdvanceBuffer[selfData];
ENDLOOP;
selfData ← NIL ;
};
maxWordsMoved: INT = (LAST[CARD16] / bytesPerWord) - 1;
maxBytesMoved: INT = maxWordsMoved * bytesPerWord;
maxStopIndexPlusOne:
INT = maxBytesMoved ;
all designed to make the max number of bytes transferred an integral number of
words, which is good
UnsafeGetBlock:
PUBLIC
UNSAFE
PROC [self:
STREAM, block: UnsafeBlock]
RETURNS [nBytesRead: INT] = UNCHECKED {
ENABLE
FS.Error => {
convertFStoIOError [self, error];
};
selfData: FSDataHandle ← NARROW[self.streamData];
textBlock: Basics.ByteBltBlock;
stopIndexPlusOne: INT;
IF bufferBad[selfData.currentNode.status] THEN IO.Error[$Failure, self];
IF block.startIndex < 0 OR block.count < 0 THEN ERROR RuntimeError.BoundsFault;
IF block.count = 0
THEN {
selfData ← NIL ;
RETURN [0];
};
IF block.startIndex > maxBytesMoved
THEN {
scale block.startIndex into [0 .. bytesPerWord)
wordOffset: INT = block.startIndex / bytesPerWord;
block.base ← block.base + wordOffset;
block.startIndex ← block.startIndex - wordOffset*bytesPerWord;
};
stopIndexPlusOne ← block.startIndex + block.count;
nBytesRead ← 0;
DO
Transfer at most maxBytesMoved bytes from the stream to block^.
Assert block.startIndex IN [0 .. maxStopIndexPlusOne), < stopIndexPlusOne
countRemaining: CARDINAL;
textBlock ← [
blockPointer: block.base,
startIndex: block.startIndex,
stopIndexPlusOne: MIN[maxStopIndexPlusOne, stopIndexPlusOne]];
countRemaining ← textBlock.stopIndexPlusOne - textBlock.startIndex;
Assert countRemaining > 0
The following loop transfers from the stream to textBlock^ until textBlock^ is full
or end of file is reached.
DO
bufferBlock: Basics.ByteBltBlock ← [
blockPointer: selfData.currentNode.buffer,
startIndex: selfData.index,
stopIndexPlusOne: selfData.currentNode.dataBytesInBuffer];
countTransferred: CARDINAL ← 0;
IF bufferBlock.startIndex < bufferBlock.stopIndexPlusOne
THEN
countTransferred ← Basics.ByteBlt[from: bufferBlock, to: textBlock];
selfData.index ← selfData.index + countTransferred;
nBytesRead ← nBytesRead + countTransferred;
IF (countRemaining ← countRemaining - countTransferred) = 0 THEN EXIT;
IF EstablishFileLength[fileData: selfData.fileData ] <=
selfData.index + selfData.currentNode.firstFileByteInBuffer
THEN {
selfData ← NIL ;
GOTO return;
};
textBlock.startIndex ← textBlock.startIndex + countTransferred;
IF selfData.index = selfData.currentNode.bufferBytes THEN [] ← AdvanceBuffer[selfData];
ENDLOOP;
IF textBlock.stopIndexPlusOne = stopIndexPlusOne
THEN {
selfData ← NIL ;
GOTO return;
};
Assert textBlock.stopIndexPlusOne = maxStopIndexPlusOne
block.base ← block.base + maxWordsMoved;
block.startIndex ← 0;
stopIndexPlusOne ← stopIndexPlusOne - maxBytesMoved;
ENDLOOP;
EXITS
return => RETURN [nBytesRead]
};
UnsafePutBlock:
PUBLIC
PROC [self:
STREAM, block: UnsafeBlock] =
TRUSTED {
ENABLE
FS.Error => {
convertFStoIOError [self, error];
};
selfData: FSDataHandle ← NARROW[self.streamData];
textBlock: Basics.ByteBltBlock;
stopIndexPlusOne: INT;
IF bufferBad[selfData.currentNode.status] THEN IO.Error[$Failure, self];
IF block.startIndex < 0 OR block.count < 0 THEN ERROR RuntimeError.BoundsFault;
IF block.startIndex > maxBytesMoved
THEN {
scale block.startIndex into [0 .. bytesPerWord)
wordOffset: INT = block.startIndex / bytesPerWord;
block.base ← block.base + wordOffset;
block.startIndex ← block.startIndex - wordOffset*bytesPerWord;
};
stopIndexPlusOne ← block.startIndex + block.count;
DO
Transfer at most maxBytesMoved bytes from block^ to the stream.
Assert block.startIndex IN [0 .. maxStopIndexPlusOne), < stopIndexPlusOne
countRemaining: CARDINAL;
textBlock ← [
blockPointer: block.base,
startIndex: block.startIndex,
stopIndexPlusOne: MIN[maxStopIndexPlusOne, stopIndexPlusOne]];
countRemaining ← textBlock.stopIndexPlusOne - textBlock.startIndex;
Assert countRemaining > 0
The following loop transfers textBlock^ to the stream.
DO
bufferBlock: Basics.ByteBltBlock ← [
blockPointer: selfData.currentNode.buffer,
startIndex: selfData.index,
stopIndexPlusOne: selfData.currentNode.bufferBytes]; -- allow put past current eof.
countTransferred: CARDINAL ← Basics.ByteBlt[from: textBlock, to: bufferBlock];
selfData.index ← selfData.index + countTransferred;
selfData.currentNode.didPut ← TRUE;
IF (countRemaining ← countRemaining - countTransferred) = 0 THEN EXIT;
textBlock.startIndex ← textBlock.startIndex + countTransferred;
[] vanceBuffer[selfData];
ENDLOOP;
IF textBlock.stopIndexPlusOne = stopIndexPlusOne THEN EXIT;
Assert textBlock.stopIndexPlusOne = maxStopIndexPlusOne
block.base ← block.base + maxWordsMoved;
block.startIndex ← 0;
stopIndexPlusOne ← stopIndexPlusOne - maxBytesMoved;
ENDLOOP ;
selfData ← NIL ;
};
AdvanceBuffer:
PROC [fsData: FSDataHandle]
RETURNS [node: BufferNodeHandle]= {
On entry, index = dataBytesInBuffer = bufferBytes. Exit with same position in
file, but index < dataBytesInBuffer or EOF.
Handles implicit file extension.
Called from GetChar, PutChar, GetBlock, PutBlock, UnsafeGetBlock, UnsafePutBlock.
fileData: FileDataHandle = fsData.fileData;
firstByteOfNextPage:
INT = fsData.currentNode.firstFileByteInBuffer +
fsData.currentNode.bufferBytes;
changeSize: BOOL ← FALSE;
IF firstByteOfNextPage = maxLength THEN ERROR IO.Error[$Failure, NIL];
IF fsData.isWriteStream THEN CleanupAfterPut[fileData: fileData, selfData: fsData];
IF firstByteOfNextPage >= fileData.byteSize
THEN {
newSize: INT ← 0;
IF fileData.extendFileProc #
NIL
THEN
newSize ← fileData.extendFileProc[firstByteOfNextPage] ;
fileData.byteSize ←
IF newSize # 0
THEN
MAX[newSize, firstByteOfNextPage]
ELSE fileData.byteSize +
MAX[minFileExtend,
((fileData.byteSize/10)/bytesPerFilePage)*bytesPerFilePage];
SetFileSize[fileData.fileHandle, fileData.byteSize] };
node ← SetupBuffer[fileData: fileData, fsData: fsData, fileByte: firstByteOfNextPage];
fsData.index ← LowHalf[firstByteOfNextPage-fsData.currentNode.firstFileByteInBuffer];
fsData ← NIL ;
};
EndOf:
PUBLIC
PROC [self:
STREAM]
RETURNS[
BOOL] = {
selfData: FSDataHandle ← NARROW[self.streamData];
node: BufferNodeHandle ← selfData.currentNode ;
do cheap test to see if not at EOF
IF selfData.index >= node.dataBytesInBuffer
THEN {
Cheap test inconclusive. Find real file length.
fileLength: INT = EstablishFileLength[fileData: selfData.fileData];
IF fileLength <= selfData.index+node.firstFileByteInBuffer
THEN RETURN[TRUE];
};
selfData ← NIL ;
RETURN[FALSE];
};
CharsAvail:
PUBLIC
PROC [self:
STREAM, wait:
BOOL]
RETURNS [
INT] = {
RETURN[INT.LAST] };
GetIndex:
PUBLIC
PROC [self:
STREAM]
RETURNS [index:
INT] = {
selfData: FSDataHandle ← NARROW[self.streamData];
index ← selfData.currentNode.firstFileByteInBuffer + selfData.index ;
selfData ← NIL ;
};
SetIndex:
PUBLIC
PROC [self:
STREAM, index:
INT] = {
ENABLE
FS.Error => {
convertFStoIOError [self, error];
};
fsData: FSDataHandle ← NARROW[self.streamData];
currentNode: BufferNodeHandle ← fsData.currentNode ;
firstBufferByte: INT ← currentNode.firstFileByteInBuffer;
fileData: FileDataHandle = fsData.fileData ;
fileLength: INT ;
IF index < 0
THEN
ERROR
IO.Error[BadIndex, self];
Make sure dataBytesInBuffer and fileLength are correct by calling
CleanupAfterPut or EstablishFileLength
IF fsData.isWriteStream
THEN {
CleanupAfterPut[fileData: fileData, selfData: fsData];
fileLength ← fileData.fileLength; }
ELSE fileLength ← EstablishFileLength[fileData: fileData ];
IF index > fileLength THEN ERROR IO.EndOfStream[self];
ensure that page containing byte "index" is in the buffer
IF index
NOT
IN [firstBufferByte .. firstBufferByte+currentNode.bufferBytes)
THEN {
firstBufferByte ← index - (index MOD currentNode.bufferBytes);
currentNode ← SetupBuffer[fileData: fileData, fsData: fsData, fileByte: firstBufferByte];
};
fsData.index ← index - firstBufferByte;
fsData ← NIL ;
};
Reset:
PUBLIC
PROC [self:
STREAM] = {
SetIndex[self, GetLength[self]] };
Flush:
PUBLIC
PROC [self:
STREAM] = {
ENABLE
FS.Error => {
convertFStoIOError [self, error];
};
fsData: FSDataHandle ← NARROW[self.streamData];
IF fsData.isWriteStream THEN ForceOut[ fsData: fsData ];
fsData ← NIL ;
};
Close:
PUBLIC
PROC [self:
STREAM, abort:
BOOL] = {
ENABLE
FS.Error => {
convertFStoIOError [self, error];
};
fsData: FSDataHandle ← NARROW[self.streamData];
IF ~abort
THEN ForceOut[ fsData: fsData !
UNWIND => {
I would like to do a CONTINUE, but that is a "GOTO" and thus will stop the UNWIND here.
There is some bug that confuses the UNWIND machinery that trashes fsData from the local frame. We recompute it here.
fsData: FSDataHandle ← NARROW[self.streamData];
CloseFileDataForStream[fileData: fsData.fileData, fsData: fsData, forceCleanUp: TRUE];
fsData ← NIL ;
self.streamData ← NIL ;
self.streamProcs ← IOUtils.closedStreamProcs;
};
];
CloseFileDataForStream[fileData: fsData.fileData, fsData: fsData];
fsData ← NIL ;
self.streamData ← NIL ;
self.streamProcs ← IOUtils.closedStreamProcs;
};
Procs that are called via the property list mechanism.
GetLength:
PUBLIC
PROC [self:
STREAM]
RETURNS [length:
INT] = {
selfData: FSDataHandle ← NARROW[self.streamData];
IF selfData.streamIsClosed THEN ERROR IO.Error[StreamClosed, self];
length ← EstablishFileLength[fileData: selfData.fileData ] ;
selfData ← NIL ;
};
clearHighBits: CARDINAL = (bytesPerFilePage-1);
maxLength:
INT =
INT.
LAST - bytesPerFilePage;
SetLength:
PUBLIC
PROC [self:
STREAM, length:
INT] = {
Note: do not reduce the size of a shortened file until stream closed.
ENABLE
FS.Error => {
convertFStoIOError [self, error];
};
fsData: FSDataHandle ← NARROW[self.streamData];
IF fsData.streamIsClosed THEN ERROR IO.Error[StreamClosed, self];
IF length NOT IN [0 .. maxLength] THEN ERROR IO.Error[BadIndex, self];
SetLengthUnderMonitor[fileData: fsData.fileData, length: length];
IF fsData.index+fsData.currentNode.firstFileByteInBuffer > length
THEN {
If old index was past EOF, then move it to EOF.
We leave the cloned read stream alone: if it does not do a setPosition then
it will get an EOF on its next read.
fsData.index ← 0 ;
SetIndex[self: self, index: length];
};
fsData ← NIL ;
};
RoundUpToPages:
PROC [bytes:
INT]
RETURNS [
INT] =
INLINE {
bytes ← bytes + (bytesPerFilePage-1);
LOOPHOLE[bytes, LongNumber[pair]].lo ←
BITAND[LOOPHOLE[bytes, LongNumber[pair]].lo, clearLowBits];
RETURN[bytes];
};
PagesForRoundUpBytes:
PROC [bytes:
INT]
RETURNS [
INT] =
INLINE {
RETURN[RoundUpToPages[bytes]/bytesPerFilePage];
};
SetLengthUnderMonitor:
ENTRY
PROC [fileData: FileDataHandle, length:
INT] = {
ENABLE UNWIND => NULL;
newFileBytes: INT = RoundUpToPages[length];
oldFileLength: INT ;
nowNode: BufferNodeHandle ← fileData.firstBufferNode;
writeData: FSDataHandle ← fileData.writeStreamData ;
IF writeData #
NIL
AND writeData.currentNode.didPut
THEN {
CleanupAfterPut logic is copied here, but we cannot call CleanupAfterPut
because it is an ENTRY
currentNode: BufferNodeHandle = writeData.currentNode;
currentNode.bufferDirty ← TRUE;
currentNode.didPut ← FALSE;
IF writeData.index > currentNode.dataBytesInBuffer
THEN {
currentNode.dataBytesInBuffer ← writeData.index;
fileData.fileLength ← currentNode.firstFileByteInBuffer + writeData.index };
currentNode.didPut ← FALSE };
oldFileLength ← fileData.fileLength ;
fileData.fileLength ← length;
IF length < fileData.validBytesOnDisk
THEN fileData.validBytesOnDisk ← length ;
grow file if needed
IF length > fileData.byteSize
THEN {
fileData.byteSize ← newFileBytes;
SetFileSize[fileData.fileHandle, fileData.byteSize];
};
Look through nodes and adjust those past EOF or with EOF in buffer
UNTIL nowNode =
NIL
DO
IF (nowNode.status # invalid)
THEN {
IF (nowNode.firstFileByteInBuffer+nowNode.bufferBytes > length)
THEN {
IF nowNode.firstFileByteInBuffer >= length
THEN {
All of the buffer is past EOF.
Set dataBytesInBuffer to 0 so that gets will find themselves at EOF,
and clean the node to avoid redundant write
nowNode.dataBytesInBuffer ← 0;
nowNode.bufferDirty ← FALSE ;
nowNode.didPut ← FALSE ;
}
ELSE {
EOF is in (or just past) this buffer
nowNode.dataBytesInBuffer ← length - nowNode.firstFileByteInBuffer ;
IF nowNode.didPut
THEN {
nowNode.didPut ← FALSE ;
nowNode.bufferDirty ← TRUE ;
};
};
}
ELSE {
all of node is in the file
nowNode.dataBytesInBuffer ← nowNode.bufferBytes ;
};
};
nowNode ← nowNode.nextBufferNode ;
ENDLOOP;
writeData ← NIL ;
};
EraseChar:
PUBLIC
PROC [self:
STREAM, char:
CHAR] = {
index: INT = GetIndex[self];
IF index = 0 THEN ERROR IO.Error[IllegalBackup, self];
SetIndex[self, index-1];
IF GetChar[self] # char THEN {PutChar[self, '\\]; PutChar[self, char]}
ELSE SetIndex[self, index-1] };
Backup:
PUBLIC
PROC [self:
STREAM, char:
CHAR] = {
selfData: FSDataHandle ← NARROW[self.streamData];
index: INT;
IF selfData.streamIsClosed THEN ERROR IO.Error[StreamClosed, self];
index ← GetIndex[self];
IF index = 0 THEN ERROR IO.Error[IllegalBackup, self];
SetIndex[self, index-1];
IF GetChar[self] # char THEN ERROR IO.Error[IllegalBackup, self];
SetIndex[self, index-1];
selfData ← NIL ;
};
CloseFileDataForStream:
ENTRY
PROC [fileData: FileDataHandle, fsData: FSDataHandle, forceCleanUp:
BOOL ←
FALSE] = {
Processing for "Close" that must be done under the monitor.
forceCleanUp being TRUE is when Close has caught UNWIND, and we should do all we can to blow away the data structures.
ENABLE UNWIND => BROADCAST fileData.somethingHappened;
needDeleted:
INT ←
IF fileData.numberOfStreams = 0
THEN INT.LAST
ELSE fileData.streamBufferParms.nBuffers ;
lastNode: BufferNodeHandle ← NIL ;
node: BufferNodeHandle ;
IF (node ← fsData.readAheadNode) #
NIL
THEN {
make sure the read-ahead has completed
WHILE node.useCount = 1
AND node.status # valid
AND
node.status # invalid
AND node.status # needsSequentialRead
DO
WAIT fileData.somethingHappened ;
ENDLOOP;
node.useCount ← node.useCount-1 ;
};
fsData.readAheadNode ← NIL;
IF ~forceCleanUp
AND fsData.isWriteStream
AND fileData.accessRights = $write
AND
fileData.streamOptions[truncatePagesOnClose]
THEN {
SetFileSize[fileData.fileHandle, fileData.fileLength] ;
};
We are now committed to closing the stream. Our last non-internal operation has completed (the SetFileSize).
IF (node ← fsData.currentNode) # NIL THEN node.useCount ← node.useCount-1 ;
fsData.currentNode ← NIL ;
look for up to two buffers to free if another stream is around
else, free all buffers
node ← fileData.firstBufferNode ;
UNTIL node =
NIL
OR needDeleted = 0
DO
IF node.useCount = 0
THEN {
TRUSTED{VM.Free[node.bufferInterval]};
IF fileData.firstBufferNode = node
THEN fileData.firstBufferNode ← node.nextBufferNode
ELSE lastNode.nextBufferNode ← node.nextBufferNode ;
needDeleted ← needDeleted - 1 ;
}
ELSE {
lastNode ← node ;
};
node ← node.nextBufferNode ;
ENDLOOP;
fsData.streamIsClosed ← TRUE;
IF (fileData.numberOfStreams ← fileData.numberOfStreams - 1) = 0
THEN {
fileDataTemp: FS.OpenFile ← fileData.fileHandle ;
fileData.fileHandle ←
FS.nullOpenFile ;
IF fileData.streamOptions[closeFSOpenFileOnClose]
THEN {
fileDataTemp.Close[!
FS.Error =>
IF forceCleanUp
THEN
CONTINUE;];
-- let finialization close the file is we fail.
};
};
BROADCAST fileData.somethingHappened ;
fsData ← NIL ;
};
Insure that all buffers, except the one corresponding to the currentNode,
have been written to disk.
Normal cases are too return immediately when no writes are outstanding,
or to wait until one finishes.
FinishWrites:
ENTRY
PROC [fileData: FileDataHandle, fsData: FSDataHandle,
currentNode: BufferNodeHandle, parallelWritesOK:
BOOL ←
FALSE] = {
ENABLE UNWIND => NULL;
nowNode: BufferNodeHandle ← fileData.firstBufferNode;
IF fileData.accessRights = read THEN RETURN ;
IF currentNode =
NIL
THEN {
-- this is only true when called via ForceOut
or when SetUpNodes can't get a node
WHILE fileData.writeCount # 0
DO
WAIT fileData.somethingHappened ;
ENDLOOP;
};
UNTIL nowNode =
NIL
DO
IF nowNode # currentNode
THEN {
WHILE (nowNode.status = parallelReadActive)
OR (nowNode.status = needsParallelRead)
OR ( ~parallelWritesOK
AND ((nowNode.status = parallelWriteActive)
OR (nowNode.status = needsParallelWrite)))
DO
WAIT fileData.somethingHappened;
ENDLOOP;
IF (nowNode.status = needsSequentialWrite
OR (currentNode =
NIL
AND nowNode.status # invalid))
THEN {
What has happened is that an asynchronous write has failed,
or we are trying to flush all dirty pages in ForceOut.
The parallel process has given up, and we are about to re-do
the write under the monitor to get the signal and the stack
correct so that the client sees a correct view of the error.
nowNode.status ← sequentialWriteActive ;
TRUSTED{WriteFilePages[f: fsData.fileData.fileHandle, node: nowNode ! UNWIND => nowNode.status ← valid] };
nowNode.bufferDirty ← FALSE ;
nowNode.status ← valid ;
};
};
nowNode ← nowNode.nextBufferNode ;
ENDLOOP;
fsData ← NIL ;
};
nodeNowAvailable:
ENTRY
PROC [fileData: FileDataHandle]
RETURNS [
BOOL]= {
ENABLE UNWIND => NULL;
nowNode: BufferNodeHandle ;
DO
nowNode ← fileData.firstBufferNode;
UNTIL nowNode =
NIL
DO
IF ( nowNode.status = valid
OR nowNode.status = invalid)
AND
nowNode.useCount = 0 THEN RETURN [TRUE];
IF nowNode.status = needsSequentialWrite THEN RETURN[FALSE];
nowNode ← nowNode.nextBufferNode ;
ENDLOOP;
WAIT fileData.somethingHappened;
ENDLOOP;
};
FinishRead:
ENTRY
PROC [fileData: FileDataHandle,
node: BufferNodeHandle, bufferSize:
INT] =
INLINE {
ENABLE UNWIND => NULL;
node.status ← valid;
node.dataBytesInBuffer ← bufferSize ;
BROADCAST fileData.somethingHappened;
};
FinishBadRead:
ENTRY
PROC [fileData: FileDataHandle, node: BufferNodeHandle] =
INLINE {
ENABLE UNWIND => NULL;
node.status ← invalid;
node.dataBytesInBuffer ← 0 ;
BROADCAST fileData.somethingHappened;
};
a pre-read has failed. Mark it needsSequentialRead if a stream needs it
to continue, or just ignore it if it was a real preread.
FinishBadPreRead:
ENTRY
PROC [fileData: FileDataHandle, node: BufferNodeHandle] =
INLINE {
ENABLE UNWIND => NULL;
IF (fileData.firstReadStream #
NIL
AND fileData.firstReadStream.currentNode = node)
OR
(fileData.writeStreamData #
NIL
AND fileData.writeStreamData.currentNode = node)
THEN {
node.status ← needsSequentialRead;
node.dataBytesInBuffer ← 0 ;
BROADCAST fileData.somethingHappened;
}
ELSE {
IF fileData.firstReadStream #
NIL
AND fileData.firstReadStream.readAheadNode = node
THEN {
fileData.firstReadStream.readAheadNode ← NIL ;
node.useCount ← 0 ;
};
IF fileData.writeStreamData #
NIL
AND fileData.writeStreamData.readAheadNode = node
THEN {
fileData.writeStreamData.readAheadNode ← NIL ;
node.useCount ← 0 ;
};
node.status ← invalid;
BROADCAST fileData.somethingHappened;
};
};
markNodeNotWritten:
ENTRY
PROC [fileData: FileDataHandle, node: BufferNodeHandle] =
INLINE {
ENABLE UNWIND => NULL;
node.status ← needsSequentialWrite ;
node.bufferDirty ← TRUE ;
fileData.writeCount ← fileData.writeCount - 1;
BROADCAST fileData.somethingHappened;
};
markNodeWritten:
ENTRY
PROC [fileData: FileDataHandle, node: BufferNodeHandle] =
INLINE {
ENABLE UNWIND => NULL;
node.status ← valid ;
fileData.writeCount ← fileData.writeCount - 1;
BROADCAST fileData.somethingHappened;
};
bumpWriteCount:
ENTRY
PROC [fileData: FileDataHandle] =
INLINE {
ENABLE UNWIND => NULL;
fileData.writeCount ← fileData.writeCount + 1;
};
WaitForOneBufferNotWriting:
ENTRY
PROC [fileData: FileDataHandle] =
INLINE {
ENABLE UNWIND => NULL;
WHILE fileData.writeCount >= fileData.streamBufferParms.nBuffers
DO
WAIT fileData.somethingHappened;
ENDLOOP;
};
WaitForParallelWriteToComplete:
ENTRY
PROC [fileData: FileDataHandle,
node: BufferNodeHandle] =
INLINE {
ENABLE UNWIND => NULL;
WHILE node.status = needsParallelWrite
OR node.status = parallelWriteActive
DO
WAIT fileData.somethingHappened;
ENDLOOP;
};
ProcessNode:
PUBLIC
PROC [ fileData: FileDataHandle, node: BufferNodeHandle ] = {
IF node.status = needsParallelRead
THEN {
node.status ← parallelReadActive ;
ReadAhead [node: node, fileData: fileData];
RETURN ;
};
IF node.status = needsParallelWrite
THEN {
node.status ← parallelWriteActive ;
parallelWriteBuffer [ node: node, fileData: fileData ] ;
RETURN ;
};
ERROR ;
};
parallelWriteBuffer:
PROC [node: BufferNodeHandle, fileData: FileDataHandle] = {
ENABLE
FS.Error => {
By catching and ignoring FS errors, we insure that the write will
later be done in the process of the client so that signals will
look correct.
markNodeNotWritten[fileData: fileData, node: node];
GOTO done;
};
TRUSTED{WriteFilePages[f: fileData.fileHandle, node: node] };
markNodeWritten[fileData: fileData, node: node];
ReadAhead:
PROC [node: BufferNodeHandle, fileData: FileDataHandle] = {
ENABLE
FS.Error => {
on FS errors, invalidate the pre-read
FinishBadPreRead[fileData: fileData, node: node];
GOTO done;
};
bytesToRead: INT ;
fileByte: INT = node.firstFileByteInBuffer ;
IF (bytesToRead ←
MIN[fileData.fileLength - fileByte, INT[node.bufferBytes]]) > 0 THEN
TRUSTED{ReadFilePages[f: fileData.fileHandle, from: fileByte,
numPages: PagesForRoundUpBytes[bytesToRead], to: node.buffer,
interval: node.bufferInterval]};
FinishRead[fileData: fileData, node: node, bufferSize: bytesToRead];
SetupBuffer:
PUBLIC
PROC [fileData: FileDataHandle,
fsData: FSDataHandle, fileByte:
INT]
RETURNS [currentNode: BufferNodeHandle] = {
For write streams, didPut = FALSE if on entry (someone else called CleanupAfterPut).
Arranges buffer so that fileByte (must be buffer-aligned) is the first byte in it.
If buffer is dirty, writes it to file.
Maintains invariants of dataBytesInBuffer, bufferBytes, and
firstFileByteInBuffer in the face of all this. DOES NOT update index.
Called from AdvanceBuffer, SetIndex, SetLength,
StreamFromOpenStream and StreamFromOpenFile.
node: BufferNodeHandle ← fsData.currentNode ;
readAheadNode: BufferNodeHandle;
currentNodeStatus: FileStreamPrivate.NodeStatus;
success: BOOL ← FALSE ;
IF node = NIL THEN node ← fileData.firstBufferNode ;
IF node.bufferDirty
AND fsData.isWriteStream
THEN {
See if there are buffers that must be written sequentially.
FinishWrites[fileData: fileData, fsData: fsData, currentNode: node, parallelWritesOK:
TRUE];
Extend file if we are about to write over it.
IF node.dataBytesInBuffer + node.firstFileByteInBuffer > fileData.byteSize
THEN {
fileData.byteSize ← node.dataBytesInBuffer + node.firstFileByteInBuffer ;
SetFileSize[fileData.fileHandle, fileData.byteSize] ;
};
IF fileData.validBytesOnDisk < node.dataBytesInBuffer + node.firstFileByteInBuffer
THEN
fileData.validBytesOnDisk ← node.dataBytesInBuffer + node.firstFileByteInBuffer ;
WaitForParallelWriteToComplete[fileData: fileData, node: node];
node.status ← needsParallelWrite ;
node.bufferDirty ← FALSE ;
bumpWriteCount[ fileData: fileData];
FileStreamPrivate.StartRequest [ fileData: fileData, node: node ] ;
WaitForOneBufferNotWriting[fileData: fileData];
};
WHILE success #
TRUE
DO
[success, currentNode, readAheadNode] ←
SetUpNodes[fileData: fileData, fsData: fsData, fileByte: fileByte];
Copy the status out of the node before the tests. Since the status
can change at any time, it would be possible to have none of the
arms of the SELECT executed when there was a pre-read to do.
(This may not be necessary anymore since I changed the way the
SELECT is done, but it doesn't hurt).
IF success
THEN {
currentNodeStatus ← currentNode.status ;
SELECT currentNodeStatus FROM
invalid,needsParallelRead,parallelReadActive,needsSequentialRead,sequentialReadActive=>{
IF readAheadNode =
NIL
THEN {
makeNodeValid[fileData: fileData, node: currentNode ];
}
ELSE {
myPriority: Process.Priority ;
myPriority ← Process.GetPriority[];
Process.SetPriority[Process.prioritySysForeground];
FileStreamPrivate.StartRequest [ fileData: fileData, node: readAheadNode ];
makeNodeValid[fileData: fileData, node: currentNode ];
Process.SetPriority[myPriority];
};
};
valid,needsParallelWrite,parallelWriteActive,needsSequentialWrite,sequentialWriteActive=>{
IF readAheadNode #
NIL
THEN {
FileStreamPrivate.StartRequest [ fileData: fileData, node: readAheadNode ];
};
};
ENDCASE ;
fsData ← NIL ;
}
ELSE {
wait around and try to get a node that is ok to re-use.
Note that just because one becomes free does not mean that SetUpNodes
will find it next time around the loop (another stream may have grabbed it).
If the buffers get into an error state, then nodeNowAvailable returns FALSE
and we use FinishWrites to clean up.
IF
NOT nodeNowAvailable[fileData: fileData]
THEN
FinishWrites[fileData: fileData, fsData: fsData, currentNode: NIL];
};
makeNodeValid:
PROC [fileData: FileDataHandle, node: BufferNodeHandle] = {
bytesToRead: INT ;
WHILE node.status # valid
DO
IF doTheRead[fileData: fileData, node: node]
THEN {
bytesToRead ← MIN[fileData.fileLength - node.firstFileByteInBuffer, INT[node.bufferBytes]];
IF fileData.validBytesOnDisk <= node.firstFileByteInBuffer
THEN {
Avoid read: the data is trash on disk. We are extending the file anyway.
FinishRead[fileData: fileData, node: node, bufferSize: bytesToRead];
}
ELSE {
IF bytesToRead > 0
THEN
TRUSTED{
ReadFilePages[f: fileData.fileHandle,
from: node.firstFileByteInBuffer,
numPages: PagesForRoundUpBytes[bytesToRead],
to: node.buffer,
interval: node.bufferInterval
!
FS.Error => {
FinishBadRead[ fileData: fileData, node: node];
};
];
};
FinishRead[fileData: fileData, node: node, bufferSize: bytesToRead];
};
};
ENDLOOP;
};
doTheRead:
ENTRY
PROC [fileData: FileDataHandle, node: BufferNodeHandle]
RETURNS [
BOOL] =
INLINE {
ENABLE UNWIND => NULL;
IF node.status = invalid
OR node.status = needsSequentialRead
THEN {
node.status ← sequentialReadActive ;
RETURN [TRUE];
};
IF node.status = valid THEN RETURN [ FALSE ] ELSE WAIT fileData.somethingHappened ;
RETURN [ FALSE ] ;
};
SetUpNodes:
ENTRY
PROC [fileData: FileDataHandle, fsData: FSDataHandle, fileByte:
INT]
RETURNS [success:
BOOL ←
TRUE, currentNode: BufferNodeHandle ←
NIL,
nextNode: BufferNodeHandle ←
NIL] = {
This procedure runs under the monitor.
It looks for buffers for the current and next nodes.
The node returned in currentNode is the one to use as current. If it is
marked "active", then the caller must fill it before use.
The node nextNode is returned as NIL if no preread is needed. If non-NIL,
the caller should arrange to preread into this buffer.
ENABLE UNWIND => NULL;
nowNode: BufferNodeHandle ← fileData.firstBufferNode;
availableNode: BufferNodeHandle ← NIL ;
availableNodeLRUCount: INT ← 1000000;
maxLRUCount: INT ← 0 ;
node: BufferNodeHandle ;
oldCurrentNode: BufferNodeHandle ← fsData.currentNode ;
oldFirstByteInBuffer:
INT =
IF fsData.currentNode =
NIL
THEN -1
ELSE fsData.currentNode.firstFileByteInBuffer;
bufferBytes: INT = nowNode.bufferBytes ;
IF (node ← fsData.currentNode) # NIL THEN node.useCount ← node.useCount-1 ;
IF (node ← fsData.readAheadNode) # NIL THEN node.useCount ← node.useCount-1 ;
fsData.currentNode ← NIL ;
fsData.readAheadNode ← NIL;
UNTIL nowNode =
NIL
DO
firstByte: INT ← nowNode.firstFileByteInBuffer;
IF nowNode.LRUCount > maxLRUCount THEN maxLRUCount ← nowNode.LRUCount ;
SELECT
TRUE
FROM
buffer already has correct position in file
firstByte = fileByte => {
currentNode ← nowNode ;
fsData.currentNode ← nowNode ;
nowNode.useCount ← nowNode.useCount+1 ;
};
buffer is next after current
firstByte = fileByte+bufferBytes => {
IF fileData.streamBufferParms.nBuffers > 1
THEN {
nowNode.useCount ← nowNode.useCount+1 ;
fsData.readAheadNode ← nowNode ;
}
ELSE {
IF( nowNode.status = valid
OR nowNode.status = invalid)
AND nowNode.useCount = 0 AND
nowNode.LRUCount <= availableNodeLRUCount
THEN {
availableNodeLRUCount ← nowNode.LRUCount ;
availableNode ← nowNode ;
};
};
};
buffer not "near" stream pointer (tested in above two cases)
and it is not active, and it is not near the other stream (if it exits)
( nowNode.status = valid
OR nowNode.status = invalid)
AND
nowNode.useCount = 0
AND nowNode.LRUCount <= availableNodeLRUCount => {
availableNodeLRUCount ← nowNode.LRUCount ;
availableNode ← nowNode ;
};
ENDCASE; -- SELECT TRUE
nowNode ← nowNode.nextBufferNode ;
ENDLOOP;
IF currentNode =
NIL
THEN {
IF availableNode = NIL THEN RETURN[FALSE];
currentNode ← availableNode ;
currentNode.LRUCount ← maxLRUCount + 1 ;
currentNode.useCount ← currentNode.useCount+1 ;
currentNode.status ← invalid ;
currentNode.firstFileByteInBuffer ← fileByte ;
availableNode ← NIL ;
fsData.currentNode ← currentNode ;
};
preread if a sequence has been established.
IF fileData.streamBufferParms.nBuffers > 1
AND
(oldFirstByteInBuffer+bufferBytes = fileByte) AND
(fsData.lastFirstByteInBuffer+bufferBytes = oldFirstByteInBuffer) AND
(fileData.fileLength > fileByte+bufferBytes) AND
(fileData.validBytesOnDisk > fsData.lastFirstByteInBuffer+bufferBytes)
THEN {
IF fsData.readAheadNode #
NIL
THEN {
a node already points to the right place in the file
IF fsData.readAheadNode.status = invalid
THEN {
nextNode ← fsData.readAheadNode ;
nextNode.status ← needsParallelRead ;
};
}
ELSE {
IF availableNode =
NIL
THEN {
availableNodeLRUCount ← 1000000;
nowNode ← fileData.firstBufferNode;
UNTIL nowNode =
NIL
DO
IF ( nowNode.status = valid
OR nowNode.status = invalid)
AND
nowNode.useCount = 0 AND
nowNode.LRUCount <= availableNodeLRUCount
THEN
availableNode ← nowNode ;
nowNode ← nowNode.nextBufferNode ;
ENDLOOP;
};
IF availableNode #
NIL
THEN {
nextNode ← availableNode ;
nextNode.status ← needsParallelRead ;
nextNode.LRUCount ← maxLRUCount + 1 ;
nextNode.useCount ← nextNode.useCount+1 ;
nextNode.firstFileByteInBuffer ← fileByte + bufferBytes ;
fsData.readAheadNode ← nextNode ;
};
};
};
fsData.lastFirstByteInBuffer ← oldFirstByteInBuffer ;
fsData ← NIL ;
};
ForceOut:
PROC [fsData: FSDataHandle] = {
Called from Flush for write streams, or Close for any stream
This is the only proc that sets byte length, and only proc that finishes trans.
fileData: FileDataHandle = fsData.fileData;
node: BufferNodeHandle ← fsData.currentNode ;
IF fsData.isWriteStream THEN CleanupAfterPut[fileData: fileData, selfData: fsData];
IF node.dataBytesInBuffer + node.firstFileByteInBuffer > fileData.byteSize
THEN {
fileData.byteSize ← node.dataBytesInBuffer + node.firstFileByteInBuffer ;
SetFileSize[fileData.fileHandle, fileData.byteSize] ;
};
IF fileData.validBytesOnDisk < node.dataBytesInBuffer + node.firstFileByteInBuffer
THEN
fileData.validBytesOnDisk ← node.dataBytesInBuffer + node.firstFileByteInBuffer ;
This call does the writes under the monitor lock.
This should be true for read streams since you want to stop the writer from
dirting buffers.
For write streams, you could get by without the lock provided you were
extremely careful about a ForceOut on the read stream, a StreamFromOpenStream
(it will allocate more buffers). The easy way out is to use the monitor.
FinishWrites[ fileData: fileData, fsData: fsData, currentNode: NIL];
IF fsData.isWriteStream
AND fileData.accessRights = $write
AND
fileData.fileLength # fileData.byteLength
THEN {
fileData.byteLength ← fileData.fileLength;
fileData.fileHandle.SetByteCountAndCreatedTime[fileData.byteLength] };
fsData ← NIL ;
};
SaveStreamError:
PUBLIC
PROCEDURE [self:
STREAM, error:
FS.ErrorDesc] = {
WITH self.streamData
SELECT
FROM
fsData: FSDataHandle => fsData.FSErrorDesc ← error ;
ENDCASE => ERROR IO.Error[NotImplementedForThisStream, self];
ErrorFromStream:
PUBLIC
PROCEDURE [self:
STREAM]
RETURNS [
FS.ErrorDesc] = {
WITH self.streamData
SELECT
FROM
fsData: FSDataHandle => RETURN [fsData.FSErrorDesc];
ENDCASE => ERROR IO.Error[NotImplementedForThisStream, self];
SetStreamClassData:
PUBLIC
PROCEDURE [self:
STREAM, data:
REF
ANY] = {
WITH self.streamData
SELECT
FROM
fsData: FSDataHandle => fsData.StreamClassData ← data ;
ENDCASE => ERROR IO.Error[NotImplementedForThisStream, self];
GetStreamClassData:
PUBLIC
PROCEDURE [self:
STREAM]
RETURNS [data:
REF
ANY] = {
WITH self.streamData
SELECT
FROM
fsData: FSDataHandle => RETURN [fsData.StreamClassData] ;
ENDCASE => ERROR IO.Error[NotImplementedForThisStream, self];
SetFinalizationProc:
PUBLIC
PROCEDURE [self:
STREAM, proc: FileStream.FinalizationProc] = {
WITH self.streamData
SELECT
FROM
fsData: FSDataHandle => fsData.FinalizationProc ← proc ;
ENDCASE => ERROR IO.Error[NotImplementedForThisStream, self];
GetFinalizationProc:
PUBLIC
PROCEDURE [self:
STREAM]
RETURNS
[proc: FileStream.FinalizationProc] = {
WITH self.streamData
SELECT
FROM
fsData: FSDataHandle => RETURN [fsData.FinalizationProc] ;
ENDCASE => ERROR IO.Error[NotImplementedForThisStream, self];
};
bufferBad:
PROCEDURE [status: FileStreamPrivate.NodeStatus]
RETURNS [bad:
BOOL] =
INLINE {
SELECT status
FROM
valid, needsParallelWrite, parallelWriteActive, needsSequentialWrite, sequentialWriteActive => RETURN[FALSE];
ENDCASE => RETURN[TRUE];
};
Talking to FS
ReadFilePages:
PROC [f:
FS.OpenFile, from: ByteNumber, numPages:
INT, to:
LONG
POINTER, interval:
VM.Interval] =
INLINE {
p: PageNumber = from/bytesPerFilePage;
TRUSTED{f.Read[from: p, nPages: numPages, to: to]};
};
WriteFilePages:
PROC [f:
FS.OpenFile, node: BufferNodeHandle] = {
numPages: INT = PagesForRoundUpBytes[node.dataBytesInBuffer];
interval: VM.Interval = node.bufferInterval;
firstPage: VM.PageNumber ← interval.page ;
lastPage: VM.PageNumber ← interval.page+numPages-1 ;
from: LONG POINTER ;
IF node.firstFileByteInBuffer < 0 THEN RETURN; -- not a valid buffer to write
TRUSTED {from ← VM.AddressForPageNumber[firstPage]} ;
f.Write[from: from, nPages: lastPage-firstPage+1,
to: (node.firstFileByteInBuffer +
VM.BytesForPages[firstPage-interval.page])/bytesPerFilePage];
};
SetFileSize:
PROC [f:
FS.OpenFile, byteSize: ByteCount] = {
f.SetPageCount[pages: (byteSize+bytesPerFilePage-1)/bytesPerFilePage];
};
GetFileLock:
PROC [f:
FS.OpenFile]
RETURNS [
FS.Lock] = {
RETURN [f.GetInfo[].lock]
};
ProcHandleFromAccessRights:
PUBLIC
PROC [accessRights:
FS.Lock]
RETURNS [ procs: FileStreamPrivate.ProcHandle] = {
SELECT accessRights
FROM
read => RETURN [nucleusFileIOReadProcs];
write => RETURN [nucleusFileIOAllProcs];
ENDCASE => RETURN[NIL];
Stream creation
StreamFromOpenFile:
PUBLIC
PROC [openFile:
FS.OpenFile, accessRights:
FS.Lock, initialPosition:
FS.InitialPosition, streamOptions:
FS.StreamOptions, streamBufferParms:
FS.StreamBufferParms, extendFileProc:
FS.ExtendFileProc]
RETURNS [stream:
STREAM] = {
no monitors are needed in this code since, until this proc returns,
no other code can refer to the streams
pageAllocation: PageCount;
byteLength: ByteCount;
fileName: ROPE = openFile.GetName[].fullFName;
fsData: FSDataHandle;
fsDataFile: FileDataHandle ;
node: BufferNodeHandle ;
Index must always be less than 64K, so we have to clip off a page from the max.
IF streamBufferParms.vmPagesPerBuffer = 128
THEN
streamBufferParms.vmPagesPerBuffer ← 127;
IF accessRights = $write
AND GetFileLock[openFile] # $write
THEN
ERROR FS.Error [[lock, $wrongLock, fileName]];
[pages: pageAllocation, bytes: byteLength] ← openFile.GetInfo[];
fsData ← NEW[Data ← []];
fsDataFile ←
NEW[FileData ← [
fileName: fileName,
accessRights: accessRights,
fileLength: byteLength,
fileHandle: openFile,
streamBufferParms: streamBufferParms,
extendFileProc: extendFileProc,
streamOptions: streamOptions,
byteLength: byteLength,
byteSize: pageAllocation*bytesPerFilePage,
validBytesOnDisk: byteLength]
];
IF fsDataFile.byteLength > fsDataFile.byteSize THEN ERROR;
fsData.fileData ← fsDataFile ;
fsDataFile.firstBufferNode ← node ←
CreateBufferSpace[streamBufferParms.vmPagesPerBuffer, accessRights];
FOR i:
INT
IN [2..streamBufferParms.nBuffers]
DO
node.nextBufferNode ←
CreateBufferSpace[streamBufferParms.vmPagesPerBuffer, accessRights];
node ← node.nextBufferNode ;
ENDLOOP;
stream ← IO.CreateStream[FileStreamPrivate.ProcHandleFromAccessRights[accessRights], fsData];
IOUtils.StoreData[self: stream, key: $Name, data: fsDataFile.fileName];
IF accessRights = $write
THEN {
fsDataFile.writeStreamData ← fsData ;
fsData.isWriteStream ← TRUE ;
IF fsDataFile.byteSize = 0
THEN {
fsDataFile.byteSize ← NewByteSize[fsDataFile.byteSize];
SetFileSize[fsDataFile.fileHandle, fsDataFile.byteSize];
};
}
ELSE {
fsDataFile.firstReadStream ← fsData;
};
IF initialPosition = start
THEN {
[] ← FileStreamPrivate.SetupBuffer[fileData: fsDataFile, fsData: fsData, fileByte: 0]
}
ELSE {
initialPosition = end
node ← FileStreamPrivate.SetupBuffer[fileData: fsDataFile,
fsData: fsData, fileByte: PageContainingLastByte[fsDataFile.fileLength]];
fsData.index ← node.dataBytesInBuffer;
};
IF streamOptions[tiogaRead]
AND byteLength > 0
THEN {
isTioga: BOOL; len: INT;
[yes: isTioga, len: len] ← IsThisThingATiogaFile[stream];
IF isTioga
THEN {
IF accessRights = $read
THEN {
make length look changed by sneaky call to SetLength (not in stream procs).
since stream is opened for read only, this call won't change the length in the file.
FileStream.SetLength[stream, len];
fsDataFile.tiogaReader ← TRUE
}
ELSE {
you can't incrementally update a Tioga file with IO!
stream.Close[];
ERROR FS.Error[[user, $cantUpdateTiogaFile, fileName]];
}
}
};
IF FileStreamPrivate.DoFinalization
THEN {
SafeStorage.EnableFinalization[fsData];
};
fsData.ConvertFStoIOErrors ← TRUE;
fsData ← NIL ;
RETURN[stream];
};--StreamFromOpenFile
PageContainingLastByte:
PROC [byteLen:
INT]
RETURNS [
INT] =
INLINE {
IF byteLen = 0
THEN
RETURN[0]
ELSE {
byteLen ← byteLen - 1;
LOOPHOLE[byteLen, LongNumber[pair]].lo ←
BITAND[LOOPHOLE[byteLen, LongNumber[pair]].lo, clearLowBits];
RETURN[byteLen]
};
OpenFileFromStream:
PUBLIC
PROC [self:
STREAM]
RETURNS [
FS.OpenFile] = {
WITH self.streamData
SELECT
FROM
fsData: FSDataHandle => RETURN [fsData.fileData.fileHandle];
ENDCASE => ERROR IO.Error[NotImplementedForThisStream, self];
};
StreamFromOpenStream:
PUBLIC
PROC [self:
STREAM]
RETURNS [stream:
STREAM] = {
newData: FSDataHandle ;
filePos: INT ;
WITH self.streamData
SELECT
FROM
selfData: FSDataHandle => {
fileData: FileDataHandle = selfData.fileData;
IF
NOT selfData.isWriteStream
OR fileData.firstReadStream #
NIL
THEN
ERROR FS.Error[[client, $notImplemented, "self is not a write stream, or there already is a read stream"]];
newData ← NEW[Data ← [ ] ];
newData.fileData ← fileData ;
stream ←
IO.CreateStream[FileStreamPrivate.ProcHandleFromAccessRights[$read],
newData];
IOUtils.StoreData[self: stream, key: $Name, data: fileData.fileName];
fileData.firstReadStream ← newData;
filePos ← SetUpClonedStream[fileData: fileData, fsData: selfData];
[] ← FileStreamPrivate.SetupBuffer[fileData: fileData, fsData: newData,
fileByte: selfData.currentNode.firstFileByteInBuffer] ;
newData.index ← selfData.index ;
IF FileStreamPrivate.DoFinalization
THEN {
SafeStorage.EnableFinalization[newData];
};
newData.ConvertFStoIOErrors ← TRUE;
newData ← NIL ;
};
ENDCASE => ERROR IO.Error[NotImplementedForThisStream, self];
};
SetUpClonedStream:
ENTRY
PROC [fileData: FileDataHandle, fsData: FSDataHandle]
RETURNS [filePos:
INT]= {
ENABLE UNWIND => NULL;
node: BufferNodeHandle ← fileData.firstBufferNode;
Find last node
UNTIL node.nextBufferNode =
NIL
DO
node ← node.nextBufferNode;
ENDLOOP ;
Allocate some more nodes.
FOR i:
INT
IN [1..fileData.streamBufferParms.nBuffers]
DO
node.nextBufferNode ←
CreateBufferSpace[fileData.streamBufferParms.vmPagesPerBuffer, read];
node ← node.nextBufferNode;
ENDLOOP;
fileData.numberOfStreams ← fileData.numberOfStreams + 1 ;
filePos ← fsData.index + fsData.currentNode.firstFileByteInBuffer ;
fsData ← NIL ;
};
Buffer management
CreateBufferSpace:
PROC [vmPagesPerBuffer:
INT [1 .. 128], accessRights:
FS.Lock]
RETURNS [BufferNodeHandle] = {
vmPages: INT ← MIN[vmPagesPerBuffer, maxVMPagesPerBuffer] ;
newBuffer: BufferNodeHandle ← NEW[BufferNode];
allocateCounter: INT ← 0;
newBuffer.bufferInterval ←
VM.Allocate[count: vmPages
!
VM.CantAllocate => {
We cannot accept anything but the right size interval.
The program assumes that all buffers are the same size.
Process.Pause[4];
IF (allocateCounter ← allocateCounter + 1) < 100 THEN RETRY;
};
];
TRUSTED {
newBuffer.buffer ← VM.AddressForPageNumber[newBuffer.bufferInterval.page];
IF accessRights = write THEN VM.SwapIn[newBuffer.bufferInterval];
};
newBuffer.bufferBytes ← VM.BytesForPages[pages: vmPages];
RETURN[newBuffer]
};
NewByteSize:
PROC [byteCount: ByteCount]
RETURNS [ByteCount] = {
RETURN [byteCount+5120];
};
Tioga
IsThisThingATiogaFile:
PROC [h:
STREAM]
RETURNS [yes:
BOOL, len:
INT] = {
pos, length: INT;
{
-- block so EXITS code can use pos, len, and length.
controlHeaderId: ARRAY [0..fileIdSize) OF CHAR = [235C,312C];
controlTrailerId: ARRAY [0..fileIdSize) OF CHAR = [205C,227C];
commentHeaderId: ARRAY [0..fileIdSize) OF CHAR = [0C,0C];
fileIdSize: NAT = 2;
numTrailerLengths: NAT = 3; -- <file-props-length> <data-length> <file-length>
endSize: NAT = fileIdSize+numTrailerLengths*4; -- trailer plus three lengths
ReadLen:
PROC [h:
STREAM]
RETURNS [
INT] = {
start: PACKED ARRAY [0..3] OF CHARACTER;
start[0] ← h.GetChar[]; start[1] ← h.GetChar[];
start[2] ← h.GetChar[]; start[3] ← h.GetChar[];
RETURN [LOOPHOLE[start]] };
commentStart, commentLen, propsLen, controlLen, controlEnd: INT;
pos ← h.GetIndex[]; -- save position to restore later
length ← h.GetLength[]; -- length including any trailer stuff
controlEnd ← length-endSize; -- where the trailer info starts
IF controlEnd <= 0 THEN GOTO fail; -- too small
h.SetIndex[controlEnd]; -- set up to read the trailer
FOR i:
NAT
IN [0..fileIdSize)
DO
-- read the controlTrailerId
IF h.GetChar[] # controlTrailerId[i] THEN GOTO fail;
ENDLOOP;
IF (propsLen ← ReadLen[h]) NOT IN [0..controlEnd) THEN GOTO fail;
IF (commentStart ← ReadLen[h]) NOT IN [0..controlEnd) THEN GOTO fail;
IF ReadLen[h] # length THEN GOTO fail;
IF commentStart > 0
THEN {
-- may have padded text with a null
h.SetIndex[commentStart-1];
len ← IF h.GetChar[]=0C THEN commentStart-1 ELSE commentStart }
ELSE h.SetIndex[len ← commentStart];
FOR i:
NAT
IN [0..fileIdSize)
DO
-- read the commentHeaderId
IF h.GetChar[] # commentHeaderId[i] THEN GOTO fail;
ENDLOOP;
commentLen ← ReadLen[h]; -- the length of the comment section
IF commentStart+commentLen NOT IN [0..controlEnd) THEN GOTO fail;
h.SetIndex[commentStart+commentLen]; -- go to start of control info
FOR i:
NAT
IN [0..fileIdSize)
DO
-- check the controlHeaderId
IF h.GetChar[] # controlHeaderId[i] THEN GOTO fail;
ENDLOOP;
controlLen ← ReadLen[h]; -- the length of the control section
IF commentStart+commentLen+controlLen # length THEN GOTO fail;
GOTO succeed;
EXITS
fail => { h.SetIndex[pos]; RETURN [FALSE, length] };
succeed => { h.SetIndex[pos]; RETURN [TRUE, len] };
};
Procedure records (never modified)
nucleusFileIOReadProcs:
PUBLIC FileStreamPrivate.ProcHandle =
IO.CreateStreamProcs[
variety: $input, class: $File,
getChar: GetChar,
endOf: EndOf,
charsAvail: CharsAvail,
getBlock: GetBlock,
unsafeGetBlock: UnsafeGetBlock,
putChar: NIL, -- not implemented
putBlock: NIL, -- call PutChar
unsafePutBlock: NIL, -- call PutChar
flush: Flush,
reset: Reset,
close: Close,
getIndex: GetIndex,
setIndex: SetIndex,
backup: Backup,
getLength: GetLength
];
nucleusFileIOAllProcs:
PUBLIC FileStreamPrivate.ProcHandle =
IO.CreateStreamProcs[
variety: $inputOutput, class: $File,
getChar: GetChar,
endOf: EndOf,
charsAvail: CharsAvail,
getBlock: GetBlock,
unsafeGetBlock: UnsafeGetBlock,
putChar: PutChar,
putBlock: PutBlock,
unsafePutBlock: UnsafePutBlock,
flush: Flush,
reset: Reset,
close: Close,
getIndex: GetIndex,
setIndex: SetIndex,
backup: Backup,
getLength: GetLength,
setLength: SetLength,
eraseChar: EraseChar
];
Finalization code
fQ: SafeStorage.FinalizationQueue;
Finalize:
PROC =
BEGIN
DO
fsData: FSDataHandle ← NARROW [ SafeStorage.FQNext[fQ] ];
streamIsClosed: BOOL ;
forceCleanUp: BOOL ← FALSE;
streamIsClosed ← fsData.streamIsClosed ;
IF
NOT fsData.streamIsClosed
AND fsData.currentNode #
NIL
THEN {
ForceOut[ fsData: fsData
! FS.Error => {forceCleanUp ← TRUE; CONTINUE}];
CloseFileDataForStream[fileData: fsData.fileData, fsData: fsData, forceCleanUp: forceCleanUp
! FS.Error => CONTINUE];
};
IF fsData.isWriteStream
THEN {
IF fsData.fileData.writeStreamData = fsData
THEN
fsData.fileData.writeStreamData ← NIL ;
}
ELSE {
IF fsData.fileData.firstReadStream = fsData
THEN
fsData.fileData.firstReadStream ← NIL ;
};
IF fsData.FinalizationProc #
NIL
THEN {
fsData.FinalizationProc [ openFile: fsData.fileData.fileHandle,
data: fsData.StreamClassData,
closed: streamIsClosed];
};
fsData ← NIL;
ENDLOOP;
END;
Initialization
start code: start up finalization stuff
IF FileStreamPrivate.DoFinalization
THEN {
fQ ← SafeStorage.NewFQ[];
SafeStorage.EstablishFinalization[CODE[Data], 1, fQ];
TRUSTED { Process.Detach[FORK Finalize[]] };
};
END.
CHANGE
LOG
Created by MBrown on June 22, 1983 10:08 am
By editing FileIOAlpineImpl.
Changed by MBrown on August 19, 1983 2:44 pm
Close FS file when closing stream (this should really be an option).
Changed by Birrell on August 23, 1983 3:14 pm
In SetFileSize: byteSize/bytesPerFilePage -> (byteSize+bytesPerFilePage-1)/bytesPerFilePage.
Changed by MBrown on August 25, 1983 1:18 pm
In SetIndex: fsData.byteSize < firstBufferByte+fsData.dataBytesInBuffer -> fsData.byteSize <= firstBufferByte+fsData.dataBytesInBuffer. Implemented GetFileLock (was stubbed waiting for FS). In StreamFromOpenFile, if stream open for write and file has no pages, extend it.
Changed by MBrown on September 17, 1983 8:41 pm
Conversion to new IO interface.
Changed by Hagmann on November 22, 1983 4:29 pm
Implement multiple-page stream buffer.
Implement coupled read and write streams on same open file.
Changed data structures in FileStreamPrivate, and fixed references in this module
to the new data structures. This meant changes to nearly every routine.
Close file during StreamOpen if an error occurs in StreamFromOpenFile.
Implement streamOptions and streamBufferParms features.
Added finalization.
Changed name from FileIOFSImpl.
Split out create code to make FileStreamCreateImpl smaller since compiler blows up in pass 3
Changed by Hagmann on November 28, 1983 12:00 pm
Fixed EndOf bug for multiple streams.
Added test for DoFinalization to enable FileStream testing without making a boot file
Changed by Hagmann on December 6, 1983 4:52 pm
Added code for process cache
Changed by Hagmann on December 27, 1983 4:52 pm
Fixed bug (reported by Plass, found by Nix) in unsafegetblock and unsafeputblock
for blocks > 32K.
Changed by Hagmann on December 29, 1983 12:54 pm
Fixed bug (reported and new code by Plass) in AddNAT
Changed by Hagmann on January 3, 1984 11:34 am
Added conditional conversion for FS to IO Errors in convertFStoIOError.
This makes FS errors that occur during stream open appear as FS errors, not IO errors
(fixes bug reported by Nix).
Changed by Hagmann on January 10, 1984 9:04 am
Fixed stream hung problem reported by Willie-Sue and Stewart. SetupBuffer was treating
buffers that are in some stage of writing was not valid. It would wait for them to become
valid (due to a parallel read), or do sequential reads if the parallel read failed. This caused
buffers that were writing in parallel that had a write error to wait forever (the node never
became valid or had a parallel read error).
Changed by Hagmann on January 20, 1984 5:11 pm
Added logic to use the changed/unchanged VM information for writing to disk
Changed by Hagmann on February 6, 1984 2:19:10 pm
PST
Added check for read-only streams to ForceOut. This is to avoid a bug
is the changed/unchanged VM information after a rollback (some pages looked
changed for a read-only file).
Changed by Hagmann on May 9, 1984 9:12:21 am
PDT
Added check to ensure that read aheads complete before the buffer is deallocated by VM
during a close. This fixes a problem encountered by Frank Crow while running the TSetter,
and narrowed down to FileStream by Willie-Sue.
Changed by December 18, 1984 5:40:06 pm PST
Added UNWIND in Close's call to ForceOut, and the forceCleanUp argument to CloseFileDataForStream. Reformatted file.
Bob Hagmann January 31, 1985 5:41:22 pm PST
remove use of FSLock
changes to: Finalize
Bob Hagmann May 14, 1985 11:09:42 am PDT
put back in use of FSLock
changes to: Finalize, Initialization
Bob Hagmann September 18, 1985 07:06:15 PDT
WriteFilePages and FinishWrites changes to avoid writing pages improperly after shortening the file
Bob Hagmann October 25, 1985 2:02:44 pm PDT
WriteFilePages to mark interval unchanged after write. Fixed subtile finalization bug
Bob Hagmann June 9, 1986 5:10:02 pm PDT
checked for node invalid on all get/put entries (previous error can get us into trouble)
merged in FileStreamCreateImpl
Change log for FileStreamCreateImpl:
Created by Hagmann on November 22, 1983 4:30 pm
By cutting this out of FSFileIOImpl.
Changed by Hagmann on November 28, 1983 12:01 pm
Added test for DoFinalization to enable FileStream testing without making a boot file
Changed by Hagmann on December 6, 1983 4:52 pm
Removed code for process cache
Changed by Hagmann on January 3, 1984 11:33 am
Added enable for ConvertFStoIOErrors.
Russ Atkinson (RRA) May 9, 1985 1:56:42 pm PDT
Added VM.SwapIn for write buffer pages to avoid bogus page faults
changes to: StreamFromOpenFile, SetUpClonedStream, CreateBufferSpace
Bob Hagmann May 14, 1985 11:10:58 am PDT
put back in use of FSLock
changes to: StreamFromOpenFile, StreamFromOpenStream
Bob Hagmann November 13, 1986 1:02:46 pm PST
modified to allow multiple parallel writes
Doug Wyatt, December 12, 1986 5:18:31 pm PST
changed Basics.LongNumber[num] to Basics.LongNumber[pair]
changes to: RoundUpToPages, PageContainingLastByte