FileStreamImpl.mesa
Copyright © 1985, 1986 by Xerox Corporation. All rights reserved.
MBrown on September 17, 1983 8:39 pm
Rovner on August 15, 1983 1:02 pm
Levin on September 22, 1983 3:22 pm
Birrell on August 23, 1983 3:14 pm
Schroeder on November 28, 1983 12:36 pm
Russ Atkinson (RRA) February 4, 1985 3:09:10 pm PST
Bob Hagmann October 17, 1988 2:08:24 pm PDT
Doug Wyatt, December 12, 1986 5:19:17 pm PST
Please maintain change log at end of file.
DIRECTORY
Basics USING [ByteBlt, ByteBltBlock, bytesPerWord, LongNumber, BITAND, LowHalf, UnsafeBlock],
BasicTime USING [GMT],
FileStream USING [FinalizationProc, SetLength],
FileStreamPrivate USING[ Data, DoFinalization, FSDataHandle, BufferNodeHandle, BufferNode, FileDataHandle, FileData, NodeStatus, ProcHandle, ProcHandleFromAccessRights, SetupBuffer, StartRequest ],
FS USING [ByteCount, BytesForPages, Close, Error, ErrorDesc, ExtendFileProc, GetInfo, GetName, InitialPosition, Lock, nullOpenFile, OpenFile, Read, SetByteCountAndCreatedTime, SetPageCount, StreamBufferParms, StreamOptions, Write],
IO USING [Close, CreateStream, CreateStreamProcs, EndOfStream, Error, GetChar, GetIndex, GetLength, SetIndex, STREAM, StreamProcs],
IOUtils USING [closedStreamProcs, StoreData],
Process USING [Detach, GetPriority, Pause, Priority, prioritySysForeground, SetPriority],
Rope USING [ROPE],
RuntimeError USING [BoundsFault],
SafeStorage USING [EnableFinalization, EstablishFinalization, FinalizationQueue, FQNext, NewFQ],
VM USING [ AddressForPageNumber, Allocate, BytesForPages, CantAllocate, Free, Interval, PageNumber];
FileStreamImpl: CEDAR MONITOR
LOCKS fileData.lockRecord USING fileData: FileDataHandle
IMPORTS Basics, FileStream, FileStreamPrivate, FS, IO, IOUtils, Process, RuntimeError, SafeStorage, VM
EXPORTS FileStreamPrivate, FileStream = BEGIN OPEN Basics, Rope;
STREAM: TYPE = IO.STREAM;
ByteCount: TYPE = INT;
ByteNumber: TYPE = ByteCount; -- index rather than count
PageNumber: TYPE = VM.PageNumber;
PageCount: TYPE = VM.PageNumber;
bytesPerFilePage: CARDINAL = FS.BytesForPages[1];
maxVMPagesPerBuffer: INT = 65536/VM.BytesForPages[pages: 1] ;
clearLowBits: CARDINAL = CARDINAL.LAST-(bytesPerFilePage-1);
minFileExtend: INT = 10*bytesPerFilePage;
Data: TYPE = FileStreamPrivate.Data;
FSDataHandle: TYPE = FileStreamPrivate.FSDataHandle;
BufferNode: TYPE = FileStreamPrivate.BufferNode;
BufferNodeHandle: TYPE = FileStreamPrivate.BufferNodeHandle;
FileDataHandle: TYPE = FileStreamPrivate.FileDataHandle;
FileData: TYPE = FileStreamPrivate.FileData;
ProcHandle: TYPE = FileStreamPrivate.ProcHandle;
This code does not protect itself from parallel use of a stream by concurrent
processes. It assumes that the processes will synchronize at a higher level.
Parallel use of different streams for the same open file is expected, but the
read/write stream must be opened by StreamOpen or StreamFromOpenFile,
and the read stream by StreamFromOpenStream.
Get and Put
CleanupAfterPut: ENTRY PROC [fileData: FileDataHandle, selfData: FSDataHandle] = INLINE {
Restores dataBytesInBuffer and fileLength if they are messed up by a PutChar or
PutBlock past the end of file.
Same logic in SetLengthUnderMonitor.
only call this routine with a write stream
currentNode: BufferNodeHandle = selfData.currentNode;
IF currentNode.didPut THEN {
currentNode.bufferDirty ← TRUE;
IF selfData.index > currentNode.dataBytesInBuffer THEN {
currentNode.dataBytesInBuffer ← selfData.index;
fileData.fileLength ← currentNode.firstFileByteInBuffer + selfData.index;
};
currentNode.didPut ← FALSE;
};
};
EstablishFileLength: ENTRY PROC[fileData: FileDataHandle ]
RETURNS [fileLength: INT] = INLINE {
Paw through write stream info to find the file length. The new length is
the true length modulo some uncertainty whether a put was done in parallel
during the execution of this routine. The file length returned is
at least as big as the file was when the monitor was acquired. This
is fine because the notion of EOF or file length for a reader of a file that is
in the process of being written is somewhat vague. A higher level protocol
should keep this straight in the client (why are you reading bytes that
might not be there yet?). Fix up dataBytesInBuffer if needed.
(This is mostly intended to allow the read stream to look at the file size seen
by the write stream.)
ENABLE UNWIND => NULL;
writeData: FSDataHandle ← fileData.writeStreamData;
IF writeData = NIL OR writeData.streamIsClosed THEN {
writeData ← NIL ;
RETURN[fileData.fileLength] ;
}
ELSE {
writeNode: BufferNodeHandle = writeData.currentNode ;
writeNode.dataBytesInBuffer ← MAX[ writeData.index, writeNode.dataBytesInBuffer] ;
fileLength ← MAX[
fileData.fileLength,
writeNode.firstFileByteInBuffer + writeNode.dataBytesInBuffer ] ;
fileData.fileLength ← fileLength;
};
writeData ← NIL ;
};
convertFStoIOError: PROC [self: STREAM, error: FS.ErrorDesc] = INLINE {
selfData: FSDataHandle ← NARROW[self.streamData];
IF selfData # NIL THEN {
selfData.FSErrorDesc ← error ;
IF selfData.ConvertFStoIOErrors THEN IO.Error[$Failure, self];
}
ELSE {
IO.Error[$Failure, self];
};
};
GetChar: PUBLIC PROC [self: STREAM] RETURNS [CHAR] = {
ENABLE FS.Error => {
convertFStoIOError [self, error];
};
selfData: FSDataHandle ← NARROW[self.streamData];
node: BufferNodeHandle ← selfData.currentNode ;
c: CHAR;
fileLength: INT ;
IF bufferBad[node.status] THEN IO.Error[$Failure, self];
IF selfData.index >= node.dataBytesInBuffer THEN {
Suspect that end-of-buffer or end-of-file has been reached.
This may be false! However, the test is cheap and usually false.
fileLength ← EstablishFileLength[fileData: selfData.fileData ] ;
File length may be wrong if writer is using the same buffer
as the reader, so get a good file length. This is not cheap: we
have to get a monitor lock and maybe look inside the write stream.
Note that we use the local variable fileLength and not
selfData.fileData.fileLength.
IF fileLength <= selfData.index+node.firstFileByteInBuffer
THEN ERROR IO.EndOfStream[self];
We are not at EOF. If we are at EOB, then get the next buffer.
Not EOF and not EOB can occur if the writer has put some
chars into the buffer and this was not reflected in dataBytesInBuffer
until we did the EstablishFileLength call.
IF selfData.index = node.bufferBytes THEN node ← AdvanceBuffer[selfData] };
TRUSTED{c ← node.buffer[selfData.index]};
selfData.index ← selfData.index + 1;
selfData ← NIL ;
RETURN[c] ;
};
PutChar: PUBLIC PROC [self: STREAM, char: CHAR] = {
ENABLE FS.Error => {
convertFStoIOError [self, error];
};
selfData: FSDataHandle ← NARROW[self.streamData];
node: BufferNodeHandle ← selfData.currentNode ;
IF selfData.index = node.bufferBytes THEN node ← AdvanceBuffer[selfData];
IF bufferBad[node.status] THEN IO.Error[$Failure, self];
TRUSTED{node.buffer[selfData.index] ← char};
selfData.index ← selfData.index + 1;
node.didPut ← TRUE ;
selfData ← NIL ;
};
Change use to IOUtils.AddNAT when the 0+0 gives NAT.LAST bug is fixed
AddNAT: PROC [a, b: NAT] RETURNS [NAT] = INLINE {
RETURN [MIN[CARDINAL[a]+CARDINAL[b], NAT.LAST]];
};
GetBlock: PUBLIC PROC [self: STREAM, block: REF TEXT, startIndex: NAT,
count: NAT]
RETURNS [nBytesRead: NAT] = TRUSTED {
ENABLE FS.Error => {
convertFStoIOError [self, error];
};
selfData: FSDataHandle ← NARROW[self.streamData];
textBlock: Basics.ByteBltBlock;
countRemaining: NAT;
stopIndexPlusOne: NAT = MIN [block.maxLength, AddNAT[startIndex, count]];
IF bufferBad[selfData.currentNode.status] THEN IO.Error[$Failure, self];
textBlock ← [
blockPointer: LOOPHOLE[block, LONG POINTER] + TEXT[0].SIZE,
startIndex: startIndex,
stopIndexPlusOne: stopIndexPlusOne];
countRemaining ←
IF startIndex > stopIndexPlusOne THEN 0 ELSE stopIndexPlusOne-startIndex;
nBytesRead ← 0;
WHILE countRemaining # 0 DO
bufferBlock: Basics.ByteBltBlock ← [
blockPointer: selfData.currentNode.buffer,
startIndex: selfData.index,
stopIndexPlusOne: selfData.currentNode.dataBytesInBuffer];
countTransferred: CARDINAL ← 0;
IF bufferBlock.startIndex < bufferBlock.stopIndexPlusOne THEN
countTransferred ← Basics.ByteBlt[from: bufferBlock, to: textBlock];
selfData.index ← selfData.index + countTransferred;
nBytesRead ← nBytesRead + countTransferred;
IF (countRemaining ← countRemaining - countTransferred) = 0 THEN EXIT;
Bytes may be added concurrently with this get. EstablishFileLength gives
a true file length (which may be different from what it was when we
started this iteration) to see if there is more data to blt.
IF EstablishFileLength[fileData: selfData.fileData ] <=
selfData.index + selfData.currentNode.firstFileByteInBuffer
THEN EXIT;
textBlock.startIndex ← textBlock.startIndex + countTransferred;
The below IF is needed for the same reason we called
EstablishFileLength above.
IF selfData.index = selfData.currentNode.bufferBytes THEN [] ← AdvanceBuffer[selfData];
ENDLOOP;
IF nBytesRead # 0 THEN block.length ← startIndex + nBytesRead;
selfData ← NIL ;
RETURN[nBytesRead] };
PutBlock: PUBLIC PROC [self: STREAM, block: REF READONLY TEXT, startIndex: NAT,
count: NAT] = TRUSTED {
ENABLE FS.Error => {
convertFStoIOError [self, error];
};
selfData: FSDataHandle ← NARROW[self.streamData];
Fail if startIndex<0 or stopIndexPlusOne<0.
textBlock: Basics.ByteBltBlock;
countRemaining: NAT;
stopIndexPlusOne: NAT ← AddNAT[startIndex, count];
IF bufferBad[selfData.currentNode.status] THEN IO.Error[$Failure, self];
IF stopIndexPlusOne > block.maxLength THEN stopIndexPlusOne ← block.length;
textBlock ← [
blockPointer: LOOPHOLE[block, LONG POINTER] + TEXT[0].SIZE,
startIndex: startIndex,
stopIndexPlusOne: stopIndexPlusOne];
countRemaining ←
IF startIndex > stopIndexPlusOne THEN 0 ELSE stopIndexPlusOne-startIndex;
WHILE countRemaining # 0 DO
bufferBlock: Basics.ByteBltBlock ← [
blockPointer: selfData.currentNode.buffer,
startIndex: selfData.index,
stopIndexPlusOne: selfData.currentNode.bufferBytes]; -- allow put past current eof.
countTransferred: CARDINAL ← Basics.ByteBlt[from: textBlock, to: bufferBlock];
selfData.index ← selfData.index + countTransferred;
selfData.currentNode.didPut ← TRUE;
IF (countRemaining ← countRemaining - countTransferred) = 0 THEN EXIT;
textBlock.startIndex ← textBlock.startIndex + countTransferred;
[] ← AdvanceBuffer[selfData];
ENDLOOP;
selfData ← NIL ;
};
maxWordsMoved: INT = (LAST[CARD16] / bytesPerWord) - 1;
maxBytesMoved: INT = maxWordsMoved * bytesPerWord;
maxStopIndexPlusOne: INT = maxBytesMoved ;
all designed to make the max number of bytes transferred an integral number of
words, which is good
UnsafeGetBlock: PUBLIC UNSAFE PROC [self: STREAM, block: UnsafeBlock]
RETURNS [nBytesRead: INT] = UNCHECKED {
ENABLE FS.Error => {
convertFStoIOError [self, error];
};
selfData: FSDataHandle ← NARROW[self.streamData];
textBlock: Basics.ByteBltBlock;
stopIndexPlusOne: INT;
IF bufferBad[selfData.currentNode.status] THEN IO.Error[$Failure, self];
IF block.startIndex < 0 OR block.count < 0 THEN ERROR RuntimeError.BoundsFault;
IF block.count = 0 THEN {
selfData ← NIL ;
RETURN [0];
};
IF block.startIndex > maxBytesMoved THEN {
scale block.startIndex into [0 .. bytesPerWord)
wordOffset: INT = block.startIndex / bytesPerWord;
block.base ← block.base + wordOffset;
block.startIndex ← block.startIndex - wordOffset*bytesPerWord;
};
stopIndexPlusOne ← block.startIndex + block.count;
nBytesRead ← 0;
DO
Transfer at most maxBytesMoved bytes from the stream to block^.
Assert block.startIndex IN [0 .. maxStopIndexPlusOne), < stopIndexPlusOne
countRemaining: CARDINAL;
textBlock ← [
blockPointer: block.base,
startIndex: block.startIndex,
stopIndexPlusOne: MIN[maxStopIndexPlusOne, stopIndexPlusOne]];
countRemaining ← textBlock.stopIndexPlusOne - textBlock.startIndex;
Assert countRemaining > 0
The following loop transfers from the stream to textBlock^ until textBlock^ is full
or end of file is reached.
DO
bufferBlock: Basics.ByteBltBlock ← [
blockPointer: selfData.currentNode.buffer,
startIndex: selfData.index,
stopIndexPlusOne: selfData.currentNode.dataBytesInBuffer];
countTransferred: CARDINAL ← 0;
IF bufferBlock.startIndex < bufferBlock.stopIndexPlusOne THEN
countTransferred ← Basics.ByteBlt[from: bufferBlock, to: textBlock];
selfData.index ← selfData.index + countTransferred;
nBytesRead ← nBytesRead + countTransferred;
IF (countRemaining ← countRemaining - countTransferred) = 0 THEN EXIT;
IF EstablishFileLength[fileData: selfData.fileData ] <=
selfData.index + selfData.currentNode.firstFileByteInBuffer THEN {
selfData ← NIL ;
GOTO return;
};
textBlock.startIndex ← textBlock.startIndex + countTransferred;
IF selfData.index = selfData.currentNode.bufferBytes THEN [] ← AdvanceBuffer[selfData];
ENDLOOP;
IF textBlock.stopIndexPlusOne = stopIndexPlusOne THEN {
selfData ← NIL ;
GOTO return;
};
Assert textBlock.stopIndexPlusOne = maxStopIndexPlusOne
block.base ← block.base + maxWordsMoved;
block.startIndex ← 0;
stopIndexPlusOne ← stopIndexPlusOne - maxBytesMoved;
ENDLOOP;
EXITS
return => RETURN [nBytesRead]
};
UnsafePutBlock: PUBLIC PROC [self: STREAM, block: UnsafeBlock] = TRUSTED {
ENABLE FS.Error => {
convertFStoIOError [self, error];
};
selfData: FSDataHandle ← NARROW[self.streamData];
textBlock: Basics.ByteBltBlock;
stopIndexPlusOne: INT;
IF bufferBad[selfData.currentNode.status] THEN IO.Error[$Failure, self];
IF block.startIndex < 0 OR block.count < 0 THEN ERROR RuntimeError.BoundsFault;
IF block.startIndex > maxBytesMoved THEN {
scale block.startIndex into [0 .. bytesPerWord)
wordOffset: INT = block.startIndex / bytesPerWord;
block.base ← block.base + wordOffset;
block.startIndex ← block.startIndex - wordOffset*bytesPerWord;
};
stopIndexPlusOne ← block.startIndex + block.count;
DO
Transfer at most maxBytesMoved bytes from block^ to the stream.
Assert block.startIndex IN [0 .. maxStopIndexPlusOne), < stopIndexPlusOne
countRemaining: CARDINAL;
textBlock ← [
blockPointer: block.base,
startIndex: block.startIndex,
stopIndexPlusOne: MIN[maxStopIndexPlusOne, stopIndexPlusOne]];
countRemaining ← textBlock.stopIndexPlusOne - textBlock.startIndex;
Assert countRemaining > 0
The following loop transfers textBlock^ to the stream.
DO
bufferBlock: Basics.ByteBltBlock ← [
blockPointer: selfData.currentNode.buffer,
startIndex: selfData.index,
stopIndexPlusOne: selfData.currentNode.bufferBytes]; -- allow put past current eof.
countTransferred: CARDINAL ← Basics.ByteBlt[from: textBlock, to: bufferBlock];
selfData.index ← selfData.index + countTransferred;
selfData.currentNode.didPut ← TRUE;
IF (countRemaining ← countRemaining - countTransferred) = 0 THEN EXIT;
textBlock.startIndex ← textBlock.startIndex + countTransferred;
[] ← AdvanceBuffer[selfData];
ENDLOOP;
IF textBlock.stopIndexPlusOne = stopIndexPlusOne THEN EXIT;
Assert textBlock.stopIndexPlusOne = maxStopIndexPlusOne
block.base ← block.base + maxWordsMoved;
block.startIndex ← 0;
stopIndexPlusOne ← stopIndexPlusOne - maxBytesMoved;
ENDLOOP ;
selfData ← NIL ;
};
AdvanceBuffer: PROC [fsData: FSDataHandle] RETURNS [node: BufferNodeHandle]= {
On entry, index = dataBytesInBuffer = bufferBytes. Exit with same position in
file, but index < dataBytesInBuffer or EOF.
Handles implicit file extension.
Called from GetChar, PutChar, GetBlock, PutBlock, UnsafeGetBlock, UnsafePutBlock.
fileData: FileDataHandle = fsData.fileData;
firstByteOfNextPage: INT = fsData.currentNode.firstFileByteInBuffer +
fsData.currentNode.bufferBytes;
changeSize: BOOL ← FALSE;
IF firstByteOfNextPage = maxLength THEN ERROR IO.Error[$Failure, NIL];
IF fsData.isWriteStream THEN CleanupAfterPut[fileData: fileData, selfData: fsData];
IF firstByteOfNextPage >= fileData.byteSize THEN {
newSize: INT ← 0;
IF fileData.extendFileProc # NIL THEN
newSize ← fileData.extendFileProc[firstByteOfNextPage] ;
fileData.byteSize ← IF newSize # 0 THEN MAX[newSize, firstByteOfNextPage]
ELSE fileData.byteSize + MAX[minFileExtend,
((fileData.byteSize/10)/bytesPerFilePage)*bytesPerFilePage];
SetFileSize[fileData.fileHandle, fileData.byteSize] };
node ← SetupBuffer[fileData: fileData, fsData: fsData, fileByte: firstByteOfNextPage];
fsData.index ← LowHalf[firstByteOfNextPage-fsData.currentNode.firstFileByteInBuffer];
fsData ← NIL ;
};
EndOf: PUBLIC PROC [self: STREAM] RETURNS[BOOL] = {
selfData: FSDataHandle ← NARROW[self.streamData];
node: BufferNodeHandle ← selfData.currentNode ;
do cheap test to see if not at EOF
IF selfData.index >= node.dataBytesInBuffer THEN {
Cheap test inconclusive. Find real file length.
fileLength: INT = EstablishFileLength[fileData: selfData.fileData];
IF fileLength <= selfData.index+node.firstFileByteInBuffer
THEN RETURN[TRUE];
};
selfData ← NIL ;
RETURN[FALSE];
};
CharsAvail: PUBLIC PROC [self: STREAM, wait: BOOL] RETURNS [INT] = {
RETURN[INT.LAST] };
GetIndex: PUBLIC PROC [self: STREAM] RETURNS [index: INT] = {
selfData: FSDataHandle ← NARROW[self.streamData];
index ← selfData.currentNode.firstFileByteInBuffer + selfData.index ;
selfData ← NIL ;
};
SetIndex: PUBLIC PROC [self: STREAM, index: INT] = {
ENABLE FS.Error => {
convertFStoIOError [self, error];
};
fsData: FSDataHandle ← NARROW[self.streamData];
currentNode: BufferNodeHandle ← fsData.currentNode ;
firstBufferByte: INT ← currentNode.firstFileByteInBuffer;
fileData: FileDataHandle = fsData.fileData ;
fileLength: INT ;
IF index < 0 THEN ERROR IO.Error[BadIndex, self];
Make sure dataBytesInBuffer and fileLength are correct by calling
CleanupAfterPut or EstablishFileLength
IF fsData.isWriteStream THEN {
CleanupAfterPut[fileData: fileData, selfData: fsData];
fileLength ← fileData.fileLength; }
ELSE fileLength ← EstablishFileLength[fileData: fileData ];
IF index > fileLength THEN ERROR IO.EndOfStream[self];
ensure that page containing byte "index" is in the buffer
IF index NOT IN [firstBufferByte .. firstBufferByte+currentNode.bufferBytes)
THEN {
firstBufferByte ← index - (index MOD currentNode.bufferBytes);
currentNode ← SetupBuffer[fileData: fileData, fsData: fsData, fileByte: firstBufferByte];
};
fsData.index ← index - firstBufferByte;
fsData ← NIL ;
};
Reset: PUBLIC PROC [self: STREAM] = {
SetIndex[self, GetLength[self]] };
Flush: PUBLIC PROC [self: STREAM] = {
ENABLE FS.Error => {
convertFStoIOError [self, error];
};
fsData: FSDataHandle ← NARROW[self.streamData];
IF fsData.isWriteStream THEN ForceOut[ fsData: fsData ];
fsData ← NIL ;
};
Close: PUBLIC PROC [self: STREAM, abort: BOOL] = {
ENABLE FS.Error => {
convertFStoIOError [self, error];
};
fsData: FSDataHandle ← NARROW[self.streamData];
IF ~abort THEN ForceOut[ fsData: fsData ! UNWIND => {
I would like to do a CONTINUE, but that is a "GOTO" and thus will stop the UNWIND here.
There is some bug that confuses the UNWIND machinery that trashes fsData from the local frame. We recompute it here.
fsData: FSDataHandle ← NARROW[self.streamData];
CloseFileDataForStream[fileData: fsData.fileData, fsData: fsData, forceCleanUp: TRUE];
fsData ← NIL ;
self.streamData ← NIL ;
self.streamProcs ← IOUtils.closedStreamProcs;
};
];
CloseFileDataForStream[fileData: fsData.fileData, fsData: fsData];
fsData ← NIL ;
self.streamData ← NIL ;
self.streamProcs ← IOUtils.closedStreamProcs;
};
Procs that are called via the property list mechanism.
GetLength: PUBLIC PROC [self: STREAM] RETURNS [length: INT] = {
selfData: FSDataHandle ← NARROW[self.streamData];
IF selfData.streamIsClosed THEN ERROR IO.Error[StreamClosed, self];
length ← EstablishFileLength[fileData: selfData.fileData ] ;
selfData ← NIL ;
};
clearHighBits: CARDINAL = (bytesPerFilePage-1);
maxLength: INT = INT.LAST - bytesPerFilePage;
SetLength: PUBLIC PROC [self: STREAM, length: INT] = {
Note: do not reduce the size of a shortened file until stream closed.
ENABLE FS.Error => {
convertFStoIOError [self, error];
};
fsData: FSDataHandle ← NARROW[self.streamData];
IF fsData.streamIsClosed THEN ERROR IO.Error[StreamClosed, self];
IF length NOT IN [0 .. maxLength] THEN ERROR IO.Error[BadIndex, self];
SetLengthUnderMonitor[fileData: fsData.fileData, length: length];
IF fsData.index+fsData.currentNode.firstFileByteInBuffer > length THEN {
If old index was past EOF, then move it to EOF.
We leave the cloned read stream alone: if it does not do a setPosition then
it will get an EOF on its next read.
fsData.index ← 0 ;
SetIndex[self: self, index: length];
};
fsData ← NIL ;
};
RoundUpToPages: PROC [bytes: INT] RETURNS [INT] = INLINE {
bytes ← bytes + (bytesPerFilePage-1);
LOOPHOLE[bytes, LongNumber[pair]].lo ←
BITAND[LOOPHOLE[bytes, LongNumber[pair]].lo, clearLowBits];
RETURN[bytes];
};
PagesForRoundUpBytes: PROC [bytes: INT] RETURNS [INT] = INLINE {
RETURN[RoundUpToPages[bytes]/bytesPerFilePage];
};
SetLengthUnderMonitor: ENTRY PROC [fileData: FileDataHandle, length: INT] = {
ENABLE UNWIND => NULL;
newFileBytes: INT = RoundUpToPages[length];
oldFileLength: INT ;
nowNode: BufferNodeHandle ← fileData.firstBufferNode;
writeData: FSDataHandle ← fileData.writeStreamData ;
IF writeData # NIL AND writeData.currentNode.didPut THEN {
CleanupAfterPut logic is copied here, but we cannot call CleanupAfterPut
because it is an ENTRY
currentNode: BufferNodeHandle = writeData.currentNode;
currentNode.bufferDirty ← TRUE;
currentNode.didPut ← FALSE;
IF writeData.index > currentNode.dataBytesInBuffer THEN {
currentNode.dataBytesInBuffer ← writeData.index;
fileData.fileLength ← currentNode.firstFileByteInBuffer + writeData.index };
currentNode.didPut ← FALSE };
oldFileLength ← fileData.fileLength ;
fileData.fileLength ← length;
IF length < fileData.validBytesOnDisk THEN fileData.validBytesOnDisk ← length ;
grow file if needed
IF length > fileData.byteSize THEN {
fileData.byteSize ← newFileBytes;
SetFileSize[fileData.fileHandle, fileData.byteSize];
};
Look through nodes and adjust those past EOF or with EOF in buffer
UNTIL nowNode = NIL DO
IF (nowNode.status # invalid) THEN {
IF (nowNode.firstFileByteInBuffer+nowNode.bufferBytes > length) THEN {
IF nowNode.firstFileByteInBuffer >= length THEN {
All of the buffer is past EOF.
Set dataBytesInBuffer to 0 so that gets will find themselves at EOF,
and clean the node to avoid redundant write
nowNode.dataBytesInBuffer ← 0;
nowNode.bufferDirty ← FALSE ;
nowNode.didPut ← FALSE ;
}
ELSE {
EOF is in (or just past) this buffer
nowNode.dataBytesInBuffer ← length - nowNode.firstFileByteInBuffer ;
IF nowNode.didPut THEN {
nowNode.didPut ← FALSE ;
nowNode.bufferDirty ← TRUE ;
};
};
}
ELSE {
all of node is in the file
nowNode.dataBytesInBuffer ← nowNode.bufferBytes ;
};
};
nowNode ← nowNode.nextBufferNode ;
ENDLOOP;
writeData ← NIL ;
};
EraseChar: PUBLIC PROC [self: STREAM, char: CHAR] = {
index: INT = GetIndex[self];
IF index = 0 THEN ERROR IO.Error[IllegalBackup, self];
SetIndex[self, index-1];
IF GetChar[self] # char THEN {PutChar[self, '\\]; PutChar[self, char]}
ELSE SetIndex[self, index-1] };
Backup: PUBLIC PROC [self: STREAM, char: CHAR] = {
selfData: FSDataHandle ← NARROW[self.streamData];
index: INT;
IF selfData.streamIsClosed THEN ERROR IO.Error[StreamClosed, self];
index ← GetIndex[self];
IF index = 0 THEN ERROR IO.Error[IllegalBackup, self];
SetIndex[self, index-1];
IF GetChar[self] # char THEN ERROR IO.Error[IllegalBackup, self];
SetIndex[self, index-1];
selfData ← NIL ;
};
CloseFileDataForStream: ENTRY PROC [fileData: FileDataHandle, fsData: FSDataHandle, forceCleanUp: BOOL ← FALSE] = {
Processing for "Close" that must be done under the monitor.
forceCleanUp being TRUE is when Close has caught UNWIND, and we should do all we can to blow away the data structures.
ENABLE UNWIND => BROADCAST fileData.somethingHappened;
needDeleted: INT ← IF fileData.numberOfStreams = 0
THEN INT.LAST
ELSE fileData.streamBufferParms.nBuffers ;
lastNode: BufferNodeHandle ← NIL ;
node: BufferNodeHandle ;
IF (node ← fsData.readAheadNode) # NIL THEN {
make sure the read-ahead has completed
WHILE node.useCount = 1 AND node.status # valid AND
node.status # invalid AND node.status # needsSequentialRead DO
WAIT fileData.somethingHappened ;
ENDLOOP;
node.useCount ← node.useCount-1 ;
};
fsData.readAheadNode ← NIL;
IF ~forceCleanUp AND fsData.isWriteStream AND fileData.accessRights = $write AND
fileData.streamOptions[truncatePagesOnClose] THEN {
SetFileSize[fileData.fileHandle, fileData.fileLength] ;
};
We are now committed to closing the stream. Our last non-internal operation has completed (the SetFileSize).
IF (node ← fsData.currentNode) # NIL THEN node.useCount ← node.useCount-1 ;
fsData.currentNode ← NIL ;
look for up to two buffers to free if another stream is around
else, free all buffers
node ← fileData.firstBufferNode ;
UNTIL node = NIL OR needDeleted = 0 DO
IF node.useCount = 0 THEN {
TRUSTED{VM.Free[node.bufferInterval]};
IF fileData.firstBufferNode = node
THEN fileData.firstBufferNode ← node.nextBufferNode
ELSE lastNode.nextBufferNode ← node.nextBufferNode ;
needDeleted ← needDeleted - 1 ;
}
ELSE {
lastNode ← node ;
};
node ← node.nextBufferNode ;
ENDLOOP;
fsData.streamIsClosed ← TRUE;
IF (fileData.numberOfStreams ← fileData.numberOfStreams - 1) = 0 THEN {
fileDataTemp: FS.OpenFile ← fileData.fileHandle ;
fileData.fileHandle ← FS.nullOpenFile ;
IF fileData.streamOptions[closeFSOpenFileOnClose] THEN {
fileDataTemp.Close[! FS.Error => IF forceCleanUp THEN CONTINUE;];
-- let finalization close the file if we fail.
};
};
BROADCAST fileData.somethingHappened ;
fsData ← NIL ;
};
Ensure that all buffers, except the one corresponding to the currentNode,
have been written to disk.
Normal cases are to return immediately when no writes are outstanding,
or to wait until one finishes.
FinishWrites: ENTRY PROC [fileData: FileDataHandle, fsData: FSDataHandle,
currentNode: BufferNodeHandle, parallelWritesOK: BOOL ← FALSE] = {
ENABLE UNWIND => NULL;
nowNode: BufferNodeHandle ← fileData.firstBufferNode;
IF fileData.accessRights = read THEN RETURN ;
IF currentNode = NIL THEN {   -- this is only true when called via ForceOut
or when SetUpNodes can't get a node
WHILE fileData.writeCount # 0 DO
WAIT fileData.somethingHappened ;
ENDLOOP;
};
UNTIL nowNode = NIL DO
IF nowNode # currentNode THEN {
WHILE (nowNode.status = parallelReadActive) OR (nowNode.status = needsParallelRead) OR ( ~parallelWritesOK AND ((nowNode.status = parallelWriteActive) OR (nowNode.status = needsParallelWrite))) DO
WAIT fileData.somethingHappened;
ENDLOOP;
IF (nowNode.status = needsSequentialWrite OR (currentNode = NIL AND nowNode.status # invalid)) THEN {
What has happened is that an asynchronous write has failed,
or we are trying to flush all dirty pages in ForceOut.
The parallel process has given up, and we are about to re-do
the write under the monitor to get the signal and the stack
correct so that the client sees a correct view of the error.
nowNode.status ← sequentialWriteActive ;
TRUSTED{WriteFilePages[f: fsData.fileData.fileHandle, node: nowNode ! UNWIND => nowNode.status ← valid] };
nowNode.bufferDirty ← FALSE ;
nowNode.status ← valid ;
};
};
nowNode ← nowNode.nextBufferNode ;
ENDLOOP;
fsData ← NIL ;
};
nodeNowAvailable: ENTRY PROC [fileData: FileDataHandle] RETURNS [BOOL]= {
ENABLE UNWIND => NULL;
nowNode: BufferNodeHandle ;
DO
nowNode ← fileData.firstBufferNode;
UNTIL nowNode = NIL DO
IF ( nowNode.status = valid OR nowNode.status = invalid) AND
nowNode.useCount = 0 THEN RETURN [TRUE];
IF nowNode.status = needsSequentialWrite THEN RETURN[FALSE];
nowNode ← nowNode.nextBufferNode ;
ENDLOOP;
WAIT fileData.somethingHappened;
ENDLOOP;
};
FinishRead: ENTRY PROC [fileData: FileDataHandle,
node: BufferNodeHandle, bufferSize: INT] = INLINE {
ENABLE UNWIND => NULL;
node.status ← valid;
node.dataBytesInBuffer ← bufferSize ;
BROADCAST fileData.somethingHappened;
};
FinishBadRead: ENTRY PROC [fileData: FileDataHandle, node: BufferNodeHandle] =
INLINE {
ENABLE UNWIND => NULL;
node.status ← invalid;
node.dataBytesInBuffer ← 0 ;
BROADCAST fileData.somethingHappened;
};
a pre-read has failed. Mark it needsSequentialRead if a stream needs it
to continue, or just ignore it if it was a real preread.    
FinishBadPreRead: ENTRY PROC [fileData: FileDataHandle, node: BufferNodeHandle] =
INLINE {
ENABLE UNWIND => NULL;
IF (fileData.firstReadStream # NIL AND fileData.firstReadStream.currentNode = node) OR
(fileData.writeStreamData # NIL AND fileData.writeStreamData.currentNode = node) THEN {
node.status ← needsSequentialRead;
node.dataBytesInBuffer ← 0 ;
BROADCAST fileData.somethingHappened;
}
ELSE {
IF fileData.firstReadStream # NIL AND fileData.firstReadStream.readAheadNode = node THEN {
fileData.firstReadStream.readAheadNode ← NIL ;
node.useCount ← 0 ;
};
IF fileData.writeStreamData # NIL AND fileData.writeStreamData.readAheadNode = node THEN {
fileData.writeStreamData.readAheadNode ← NIL ;
node.useCount ← 0 ;
};
node.status ← invalid;
BROADCAST fileData.somethingHappened;
};
};
markNodeNotWritten: ENTRY PROC [fileData: FileDataHandle, node: BufferNodeHandle] =
INLINE {
ENABLE UNWIND => NULL;
node.status ← needsSequentialWrite ;
node.bufferDirty ← TRUE ;
fileData.writeCount ← fileData.writeCount - 1;
BROADCAST fileData.somethingHappened;
};
markNodeWritten: ENTRY PROC [fileData: FileDataHandle, node: BufferNodeHandle] =
INLINE {
ENABLE UNWIND => NULL;
node.status ← valid ;
fileData.writeCount ← fileData.writeCount - 1;
BROADCAST fileData.somethingHappened;
};
bumpWriteCount: ENTRY PROC [fileData: FileDataHandle] = INLINE {
ENABLE UNWIND => NULL;
fileData.writeCount ← fileData.writeCount + 1;
};
WaitForOneBufferNotWriting: ENTRY PROC [fileData: FileDataHandle] = INLINE {
ENABLE UNWIND => NULL;
WHILE fileData.writeCount >= fileData.streamBufferParms.nBuffers DO
WAIT fileData.somethingHappened;
ENDLOOP;
};
WaitForParallelWriteToComplete: ENTRY PROC [fileData: FileDataHandle,
node: BufferNodeHandle] = INLINE {
ENABLE UNWIND => NULL;
WHILE node.status = needsParallelWrite OR node.status = parallelWriteActive DO
WAIT fileData.somethingHappened;
ENDLOOP;
};
ProcessNode: PUBLIC PROC [ fileData: FileDataHandle, node: BufferNodeHandle ] = {
IF node.status = needsParallelRead THEN {
node.status ← parallelReadActive ;
ReadAhead [node: node, fileData: fileData];
RETURN ;
};
IF node.status = needsParallelWrite THEN {
node.status ← parallelWriteActive ;
parallelWriteBuffer [ node: node, fileData: fileData ] ;
RETURN ;
};
ERROR ;
};
parallelWriteBuffer: PROC [node: BufferNodeHandle, fileData: FileDataHandle] = {
ENABLE FS.Error => {
By catching and ignoring FS errors, we ensure that the write will
later be done in the process of the client so that signals will
look correct.
markNodeNotWritten[fileData: fileData, node: node];
GOTO done;
};
TRUSTED{WriteFilePages[f: fileData.fileHandle, node: node] };
markNodeWritten[fileData: fileData, node: node];
EXITS
done => RETURN
};
ReadAhead: PROC [node: BufferNodeHandle, fileData: FileDataHandle] = {
ENABLE FS.Error => {
on FS errors, invalidate the pre-read
FinishBadPreRead[fileData: fileData, node: node];
GOTO done;
};
bytesToRead: INT ;
fileByte: INT = node.firstFileByteInBuffer ;
IF (bytesToRead ←
MIN[fileData.fileLength - fileByte, INT[node.bufferBytes]]) > 0 THEN
TRUSTED{ReadFilePages[f: fileData.fileHandle, from: fileByte,
numPages: PagesForRoundUpBytes[bytesToRead], to: node.buffer,
interval: node.bufferInterval]};
FinishRead[fileData: fileData, node: node, bufferSize: bytesToRead];
EXITS
done => RETURN
};
SetupBuffer: PUBLIC PROC [fileData: FileDataHandle,
fsData: FSDataHandle, fileByte: INT] RETURNS [currentNode: BufferNodeHandle] = {
For write streams, didPut = FALSE on entry (someone else called CleanupAfterPut).
Arranges buffer so that fileByte (must be buffer-aligned) is the first byte in it.
If buffer is dirty, writes it to file.
Maintains invariants of dataBytesInBuffer, bufferBytes, and
firstFileByteInBuffer in the face of all this. DOES NOT update index.
Called from AdvanceBuffer, SetIndex, SetLength,
StreamFromOpenStream and StreamFromOpenFile.
node: BufferNodeHandle ← fsData.currentNode ;
readAheadNode: BufferNodeHandle;
currentNodeStatus: FileStreamPrivate.NodeStatus;
success: BOOL ← FALSE ;
IF node = NIL THEN node ← fileData.firstBufferNode ;
write buffer if needed
IF node.bufferDirty AND fsData.isWriteStream THEN {
See if there are buffers that must be written sequentially.
FinishWrites[fileData: fileData, fsData: fsData, currentNode: node, parallelWritesOK: TRUE];
Extend file if we are about to write over it.
IF node.dataBytesInBuffer + node.firstFileByteInBuffer > fileData.byteSize THEN {
fileData.byteSize ← node.dataBytesInBuffer + node.firstFileByteInBuffer ;
SetFileSize[fileData.fileHandle, fileData.byteSize] ;
};
IF fileData.validBytesOnDisk < node.dataBytesInBuffer + node.firstFileByteInBuffer THEN
fileData.validBytesOnDisk ← node.dataBytesInBuffer + node.firstFileByteInBuffer ;
WaitForParallelWriteToComplete[fileData: fileData, node: node];
node.status ← needsParallelWrite ;
node.bufferDirty ← FALSE ;
bumpWriteCount[ fileData: fileData];
FileStreamPrivate.StartRequest [ fileData: fileData, node: node ] ;
WaitForOneBufferNotWriting[fileData: fileData];
};
WHILE success # TRUE DO
[success, currentNode, readAheadNode] ←
SetUpNodes[fileData: fileData, fsData: fsData, fileByte: fileByte];
Copy the status out of the node before the tests. Since the status
can change at any time, it would be possible to have none of the
arms of the SELECT executed when there was a pre-read to do.
(This may not be necessary anymore since I changed the way the
SELECT is done, but it doesn't hurt).
IF success THEN {
currentNodeStatus ← currentNode.status ;
SELECT currentNodeStatus FROM
invalid,needsParallelRead,parallelReadActive,needsSequentialRead,sequentialReadActive=>{
IF readAheadNode = NIL THEN {
makeNodeValid[fileData: fileData, node: currentNode ];
}
ELSE {
myPriority: Process.Priority ;
myPriority ← Process.GetPriority[];
Process.SetPriority[Process.prioritySysForeground];
FileStreamPrivate.StartRequest [ fileData: fileData, node: readAheadNode ];
makeNodeValid[fileData: fileData, node: currentNode ];
Process.SetPriority[myPriority];
};
};
valid,needsParallelWrite,parallelWriteActive,needsSequentialWrite,sequentialWriteActive=>{
IF readAheadNode # NIL THEN {
FileStreamPrivate.StartRequest [ fileData: fileData, node: readAheadNode ];
};
};
ENDCASE ;
fsData ← NIL ;
} ELSE {
wait around and try to get a node that is ok to re-use.
Note that just because one becomes free does not mean that SetUpNodes
will find it next time around the loop (another stream may have grabbed it).
If the buffers get into an error state, then nodeNowAvailable returns FALSE
and we use FinishWrites to clean up.
IF NOT nodeNowAvailable[fileData: fileData] THEN
FinishWrites[fileData: fileData, fsData: fsData, currentNode: NIL];
};
ENDLOOP;
};
makeNodeValid: PROC [fileData: FileDataHandle, node: BufferNodeHandle] = {
bytesToRead: INT ;
WHILE node.status # valid DO
IF doTheRead[fileData: fileData, node: node] THEN {
bytesToRead ← MIN[fileData.fileLength - node.firstFileByteInBuffer, INT[node.bufferBytes]];
IF fileData.validBytesOnDisk <= node.firstFileByteInBuffer THEN {
Avoid read: the data is trash on disk. We are extending the file anyway.
FinishRead[fileData: fileData, node: node, bufferSize: bytesToRead];
}
ELSE {
IF bytesToRead > 0 THEN TRUSTED{
ReadFilePages[f: fileData.fileHandle,
from: node.firstFileByteInBuffer,
numPages: PagesForRoundUpBytes[bytesToRead],
to: node.buffer,
interval: node.bufferInterval
! FS.Error => {
FinishBadRead[ fileData: fileData, node: node];
};
];
};
FinishRead[fileData: fileData, node: node, bufferSize: bytesToRead];
};
};
ENDLOOP;
};
doTheRead: ENTRY PROC [fileData: FileDataHandle, node: BufferNodeHandle]
RETURNS [BOOL] = INLINE {
ENABLE UNWIND => NULL;
IF node.status = invalid OR node.status = needsSequentialRead THEN {
node.status ← sequentialReadActive ;
RETURN [TRUE];
};
IF node.status = valid THEN RETURN [ FALSE ] ELSE WAIT fileData.somethingHappened ;
RETURN [ FALSE ] ;
};
SetUpNodes: ENTRY PROC [fileData: FileDataHandle, fsData: FSDataHandle, fileByte: INT]
RETURNS [success: BOOL ← TRUE, currentNode: BufferNodeHandle ← NIL,
nextNode: BufferNodeHandle ← NIL] = {
This procedure runs under the monitor.
It looks for buffers for the current and next nodes.
The node returned in currentNode is the one to use as current. If it is
marked "active", then the caller must fill it before use.
The node nextNode is returned as NIL if no preread is needed. If non-NIL,
the caller should arrange to preread into this buffer.
ENABLE UNWIND => NULL;
nowNode: BufferNodeHandle ← fileData.firstBufferNode;
availableNode: BufferNodeHandle ← NIL ;
availableNodeLRUCount: INT ← 1000000;
maxLRUCount: INT ← 0 ;
node: BufferNodeHandle ;
oldCurrentNode: BufferNodeHandle ← fsData.currentNode ;
oldFirstByteInBuffer: INT = IF fsData.currentNode = NIL THEN -1
ELSE fsData.currentNode.firstFileByteInBuffer;
bufferBytes: INT = nowNode.bufferBytes ;
IF (node ← fsData.currentNode) # NIL THEN node.useCount ← node.useCount-1 ;
IF (node ← fsData.readAheadNode) # NIL THEN node.useCount ← node.useCount-1 ;
fsData.currentNode ← NIL ;
fsData.readAheadNode ← NIL;
UNTIL nowNode = NIL DO
firstByte: INT ← nowNode.firstFileByteInBuffer;
IF nowNode.LRUCount > maxLRUCount THEN maxLRUCount ← nowNode.LRUCount ;
SELECT TRUE FROM
buffer already has correct position in file
firstByte = fileByte => {
currentNode ← nowNode ;
fsData.currentNode ← nowNode ;
nowNode.useCount ← nowNode.useCount+1 ;
};
buffer is next after current
firstByte = fileByte+bufferBytes => {
IF fileData.streamBufferParms.nBuffers > 1 THEN {
nowNode.useCount ← nowNode.useCount+1 ;
fsData.readAheadNode ← nowNode ;
}
ELSE {
IF( nowNode.status = valid OR nowNode.status = invalid)
AND nowNode.useCount = 0 AND
nowNode.LRUCount <= availableNodeLRUCount THEN {
availableNodeLRUCount ← nowNode.LRUCount ;
availableNode ← nowNode ;
};
};
};
buffer not "near" stream pointer (tested in above two cases)
and it is not active, and it is not near the other stream (if it exists)
( nowNode.status = valid OR nowNode.status = invalid) AND
nowNode.useCount = 0 AND nowNode.LRUCount <= availableNodeLRUCount => {
availableNodeLRUCount ← nowNode.LRUCount ;
availableNode ← nowNode ;
};
ENDCASE; -- SELECT TRUE
nowNode ← nowNode.nextBufferNode ;
ENDLOOP;
IF currentNode = NIL THEN {
IF availableNode = NIL THEN RETURN[FALSE];
currentNode ← availableNode ;
currentNode.LRUCount ← maxLRUCount + 1 ;
currentNode.useCount ← currentNode.useCount+1 ;
currentNode.status ← invalid ;
currentNode.firstFileByteInBuffer ← fileByte ;
availableNode ← NIL ;
fsData.currentNode ← currentNode ;
};
preread if a sequence has been established.
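-- Illustrative worked example (hypothetical numbers; B stands for bufferBytes):
-- suppose this stream's buffer was at byte 0, then at B, and now fileByte = 2*B.
-- On entry fsData.lastFirstByteInBuffer = 0 and oldFirstByteInBuffer = B, so
--   oldFirstByteInBuffer+bufferBytes = B+B = 2*B = fileByte, and
--   fsData.lastFirstByteInBuffer+bufferBytes = 0+B = oldFirstByteInBuffer.
-- Both sequence tests pass; if also nBuffers > 1, fileLength > 3*B, and
-- validBytesOnDisk > B, a preread of the buffer starting at 3*B is arranged,
-- provided a node already positioned there or a free node can be found.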
IF fileData.streamBufferParms.nBuffers > 1 AND
(oldFirstByteInBuffer+bufferBytes = fileByte) AND
(fsData.lastFirstByteInBuffer+bufferBytes = oldFirstByteInBuffer) AND
(fileData.fileLength > fileByte+bufferBytes) AND
(fileData.validBytesOnDisk > fsData.lastFirstByteInBuffer+bufferBytes) THEN {
IF fsData.readAheadNode # NIL THEN {
a node already points to the right place in the file
IF fsData.readAheadNode.status = invalid THEN {
nextNode ← fsData.readAheadNode ;
nextNode.status ← needsParallelRead ;
};
}
ELSE {
IF availableNode = NIL THEN {
availableNodeLRUCount ← 1000000;
nowNode ← fileData.firstBufferNode;
UNTIL nowNode = NIL DO
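IF ( nowNode.status = valid OR nowNode.status = invalid) AND
nowNode.useCount = 0 AND
nowNode.LRUCount <= availableNodeLRUCount THEN {
-- track the smallest LRUCount seen so far, as the main scan above does
availableNodeLRUCount ← nowNode.LRUCount ;
availableNode ← nowNode ;
};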
nowNode ← nowNode.nextBufferNode ;
ENDLOOP;
};
IF availableNode # NIL THEN {
nextNode ← availableNode ;
nextNode.status ← needsParallelRead ;
nextNode.LRUCount ← maxLRUCount + 1 ;
nextNode.useCount ← nextNode.useCount+1 ;
nextNode.firstFileByteInBuffer ← fileByte + bufferBytes ;
fsData.readAheadNode ← nextNode ;
};
};
};
fsData.lastFirstByteInBuffer ← oldFirstByteInBuffer ;
fsData ← NIL ;
};
ForceOut: PROC [fsData: FSDataHandle] = {
Called from Flush for write streams, or Close for any stream
This is the only proc that sets byte length, and only proc that finishes trans.
fileData: FileDataHandle = fsData.fileData;
node: BufferNodeHandle ← fsData.currentNode ;
IF fsData.isWriteStream THEN CleanupAfterPut[fileData: fileData, selfData: fsData];
IF node.dataBytesInBuffer + node.firstFileByteInBuffer > fileData.byteSize THEN {
fileData.byteSize ← node.dataBytesInBuffer + node.firstFileByteInBuffer ;
SetFileSize[fileData.fileHandle, fileData.byteSize] ;
};
IF fileData.validBytesOnDisk < node.dataBytesInBuffer + node.firstFileByteInBuffer THEN
fileData.validBytesOnDisk ← node.dataBytesInBuffer + node.firstFileByteInBuffer ;
This call does the writes under the monitor lock.
For read streams the lock is needed, since you want to stop the writer from
dirtying buffers.
For write streams, you could get by without the lock provided you were
extremely careful about a ForceOut on the read stream or a StreamFromOpenStream
(which will allocate more buffers). The easy way out is to use the monitor.
FinishWrites[ fileData: fileData, fsData: fsData, currentNode: NIL];
IF fsData.isWriteStream AND fileData.accessRights = $write AND
fileData.fileLength # fileData.byteLength THEN {
fileData.byteLength ← fileData.fileLength;
fileData.fileHandle.SetByteCountAndCreatedTime[fileData.byteLength] };
fsData ← NIL ;
};
SaveStreamError: PUBLIC PROCEDURE [self: STREAM, error: FS.ErrorDesc] = {
WITH self.streamData SELECT FROM
fsData: FSDataHandle => fsData.FSErrorDesc ← error ;
ENDCASE => ERROR IO.Error[NotImplementedForThisStream, self];
};
ErrorFromStream: PUBLIC PROCEDURE [self: STREAM] RETURNS [FS.ErrorDesc] = {
WITH self.streamData SELECT FROM
fsData: FSDataHandle => RETURN [fsData.FSErrorDesc];
ENDCASE => ERROR IO.Error[NotImplementedForThisStream, self];
};
SetStreamClassData: PUBLIC PROCEDURE [self: STREAM, data: REF ANY] = {
WITH self.streamData SELECT FROM
fsData: FSDataHandle => fsData.StreamClassData ← data ;
ENDCASE => ERROR IO.Error[NotImplementedForThisStream, self];
};
GetStreamClassData: PUBLIC PROCEDURE [self: STREAM] RETURNS [data: REF ANY] = {
WITH self.streamData SELECT FROM
fsData: FSDataHandle => RETURN [fsData.StreamClassData] ;
ENDCASE => ERROR IO.Error[NotImplementedForThisStream, self];
};
SetFinalizationProc: PUBLIC PROCEDURE [self: STREAM, proc: FileStream.FinalizationProc] = {
WITH self.streamData SELECT FROM
fsData: FSDataHandle => fsData.FinalizationProc ← proc ;
ENDCASE => ERROR IO.Error[NotImplementedForThisStream, self];
};
GetFinalizationProc: PUBLIC PROCEDURE [self: STREAM] RETURNS
[proc: FileStream.FinalizationProc] = {
WITH self.streamData SELECT FROM
fsData: FSDataHandle => RETURN [fsData.FinalizationProc] ;
ENDCASE => ERROR IO.Error[NotImplementedForThisStream, self];
};
bufferBad: PROCEDURE [status: FileStreamPrivate.NodeStatus] RETURNS [bad: BOOL] = INLINE {
SELECT status FROM
valid, needsParallelWrite, parallelWriteActive, needsSequentialWrite, sequentialWriteActive => RETURN[FALSE];
ENDCASE => RETURN[TRUE];
};
Talking to FS
ReadFilePages: PROC [f: FS.OpenFile, from: ByteNumber, numPages: INT, to: LONG POINTER, interval: VM.Interval] = INLINE {
p: PageNumber = from/bytesPerFilePage;
TRUSTED{f.Read[from: p, nPages: numPages, to: to]};
};
WriteFilePages: PROC [f: FS.OpenFile, node: BufferNodeHandle] = {
numPages: INT = PagesForRoundUpBytes[node.dataBytesInBuffer];
interval: VM.Interval = node.bufferInterval;
firstPage: VM.PageNumber ← interval.page ;
lastPage: VM.PageNumber ← interval.page+numPages-1 ;
from: LONG POINTER ;
IF node.firstFileByteInBuffer < 0 THEN RETURN; -- not a valid buffer to write
TRUSTED {from ← VM.AddressForPageNumber[firstPage]} ;
f.Write[from: from, nPages: lastPage-firstPage+1,
to: (node.firstFileByteInBuffer +
VM.BytesForPages[firstPage-interval.page])/bytesPerFilePage];
};
SetFileSize: PROC [f: FS.OpenFile, byteSize: ByteCount] = {
f.SetPageCount[pages: (byteSize+bytesPerFilePage-1)/bytesPerFilePage];
};
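-- Worked example of the round-up above, assuming (for illustration only) that
-- bytesPerFilePage = 512 and that "/" truncates for non-negative operands:
--   byteSize = 0    gives (0+511)/512 = 0 pages
--   byteSize = 1    gives (1+511)/512 = 1 page
--   byteSize = 512  gives (512+511)/512 = 1 page
--   byteSize = 513  gives (513+511)/512 = 2 pages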
GetFileLock: PROC [f: FS.OpenFile] RETURNS [FS.Lock] = {
RETURN [f.GetInfo[].lock]
};
ProcHandleFromAccessRights: PUBLIC PROC [accessRights: FS.Lock]
RETURNS [ procs: FileStreamPrivate.ProcHandle] = {
SELECT accessRights FROM
read => RETURN [nucleusFileIOReadProcs];
write => RETURN [nucleusFileIOAllProcs];
ENDCASE => RETURN[NIL];
};
Stream creation
StreamFromOpenFile: PUBLIC PROC [openFile: FS.OpenFile, accessRights: FS.Lock, initialPosition: FS.InitialPosition, streamOptions: FS.StreamOptions, streamBufferParms: FS.StreamBufferParms, extendFileProc: FS.ExtendFileProc] RETURNS [stream: STREAM] = {
no monitors are needed in this code since, until this proc returns,
no other code can refer to the streams
pageAllocation: PageCount;
byteLength: ByteCount;
fileName: ROPE = openFile.GetName[].fullFName;
fsData: FSDataHandle;
fsDataFile: FileDataHandle ;
node: BufferNodeHandle ;
Index must always be less than 64K, so we have to clip off a page from the max.
IF streamBufferParms.vmPagesPerBuffer = 128 THEN
streamBufferParms.vmPagesPerBuffer ← 127;
IF accessRights = $write AND GetFileLock[openFile] # $write THEN
ERROR FS.Error [[lock, $wrongLock, fileName]];
[pages: pageAllocation, bytes: byteLength] ← openFile.GetInfo[];
fsData ← NEW[Data ← []];
fsDataFile ← NEW[FileData ← [
fileName: fileName,
accessRights: accessRights,
fileLength: byteLength,
fileHandle: openFile,
streamBufferParms: streamBufferParms,
extendFileProc: extendFileProc,
streamOptions: streamOptions,
byteLength: byteLength,
byteSize: pageAllocation*bytesPerFilePage,
validBytesOnDisk: byteLength]
];
IF fsDataFile.byteLength > fsDataFile.byteSize THEN ERROR;
fsData.fileData ← fsDataFile ;
fsDataFile.firstBufferNode ← node ←
CreateBufferSpace[streamBufferParms.vmPagesPerBuffer, accessRights];
FOR i: INT IN [2..streamBufferParms.nBuffers] DO
node.nextBufferNode ←
CreateBufferSpace[streamBufferParms.vmPagesPerBuffer, accessRights];
node ← node.nextBufferNode ;
ENDLOOP;
stream ← IO.CreateStream[FileStreamPrivate.ProcHandleFromAccessRights[accessRights], fsData];
IOUtils.StoreData[self: stream, key: $Name, data: fsDataFile.fileName];
IF accessRights = $write
THEN {
fsDataFile.writeStreamData ← fsData ;
fsData.isWriteStream ← TRUE ;
IF fsDataFile.byteSize = 0 THEN {
fsDataFile.byteSize ← NewByteSize[fsDataFile.byteSize];
SetFileSize[fsDataFile.fileHandle, fsDataFile.byteSize];
};
}
ELSE {
fsDataFile.firstReadStream ← fsData;
};
IF initialPosition = start
THEN {
[] ← FileStreamPrivate.SetupBuffer[fileData: fsDataFile, fsData: fsData, fileByte: 0]
}
ELSE {
initialPosition = end
node ← FileStreamPrivate.SetupBuffer[fileData: fsDataFile,
fsData: fsData, fileByte: PageContainingLastByte[fsDataFile.fileLength]];
fsData.index ← node.dataBytesInBuffer;
};
IF streamOptions[tiogaRead] AND byteLength > 0 THEN {
isTioga: BOOL; len: INT;
[yes: isTioga, len: len] ← IsThisThingATiogaFile[stream];
IF isTioga THEN {
IF accessRights = $read
THEN {
make length look changed by sneaky call to SetLength (not in stream procs).
since stream is opened for read only, this call won't change the length in the file.
FileStream.SetLength[stream, len];
fsDataFile.tiogaReader ← TRUE
}
ELSE {
you can't incrementally update a Tioga file with IO!
stream.Close[];
ERROR FS.Error[[user, $cantUpdateTiogaFile, fileName]];
}
}
};
IF FileStreamPrivate.DoFinalization THEN {
SafeStorage.EnableFinalization[fsData];
};
fsData.ConvertFStoIOErrors ← TRUE;
fsData ← NIL ;
RETURN[stream];
};--StreamFromOpenFile
PageContainingLastByte: PROC [byteLen: INT] RETURNS [INT] = INLINE {
IF byteLen = 0 THEN RETURN[0] ELSE {
byteLen ← byteLen - 1;
LOOPHOLE[byteLen, LongNumber[pair]].lo ←
BITAND[LOOPHOLE[byteLen, LongNumber[pair]].lo, clearLowBits];
RETURN[byteLen]
};
};
OpenFileFromStream: PUBLIC PROC [self: STREAM] RETURNS [FS.OpenFile] = {
WITH self.streamData SELECT FROM
fsData: FSDataHandle => RETURN [fsData.fileData.fileHandle];
ENDCASE => ERROR IO.Error[NotImplementedForThisStream, self];
};
StreamFromOpenStream: PUBLIC PROC [self: STREAM] RETURNS [stream: STREAM] = {
newData: FSDataHandle ;
filePos: INT ;
WITH self.streamData SELECT FROM
selfData: FSDataHandle => {
fileData: FileDataHandle = selfData.fileData;
IF NOT selfData.isWriteStream OR fileData.firstReadStream # NIL THEN
ERROR FS.Error[[client, $notImplemented, "self is not a write stream, or there already is a read stream"]];
newData ← NEW[Data ← [ ] ];
newData.fileData ← fileData ;
stream ← IO.CreateStream[FileStreamPrivate.ProcHandleFromAccessRights[$read],
newData];
IOUtils.StoreData[self: stream, key: $Name, data: fileData.fileName];
fileData.firstReadStream ← newData;
filePos ← SetUpClonedStream[fileData: fileData, fsData: selfData];
[] ← FileStreamPrivate.SetupBuffer[fileData: fileData, fsData: newData,
fileByte: selfData.currentNode.firstFileByteInBuffer] ;
newData.index ← selfData.index ;
IF FileStreamPrivate.DoFinalization THEN {
SafeStorage.EnableFinalization[newData];
};
newData.ConvertFStoIOErrors ← TRUE;
newData ← NIL ;
};
ENDCASE => ERROR IO.Error[NotImplementedForThisStream, self];
};
SetUpClonedStream: ENTRY PROC [fileData: FileDataHandle, fsData: FSDataHandle] RETURNS [filePos: INT]= {
ENABLE UNWIND => NULL;
node: BufferNodeHandle ← fileData.firstBufferNode;
Find last node
UNTIL node.nextBufferNode = NIL DO
node ← node.nextBufferNode;
ENDLOOP ;
Allocate some more nodes.
FOR i:INT IN [1..fileData.streamBufferParms.nBuffers] DO
node.nextBufferNode ←
CreateBufferSpace[fileData.streamBufferParms.vmPagesPerBuffer, read];
node ← node.nextBufferNode;
ENDLOOP;
fileData.numberOfStreams ← fileData.numberOfStreams + 1 ;
filePos ← fsData.index + fsData.currentNode.firstFileByteInBuffer ;
fsData ← NIL ;
};
Buffer management
CreateBufferSpace: PROC [vmPagesPerBuffer: INT [1 .. 128], accessRights: FS.Lock] RETURNS [BufferNodeHandle] = {
vmPages: INT ← MIN[vmPagesPerBuffer, maxVMPagesPerBuffer] ;
newBuffer: BufferNodeHandle ← NEW[BufferNode];
allocateCounter: INT ← 0;
newBuffer.bufferInterval ← VM.Allocate[count: vmPages
! VM.CantAllocate => {
We cannot accept anything but the right size interval.
The program assumes that all buffers are the same size.
Process.Pause[4];
IF (allocateCounter ← allocateCounter + 1) < 100 THEN RETRY;
};
];
TRUSTED {
newBuffer.buffer ← VM.AddressForPageNumber[newBuffer.bufferInterval.page];
IF accessRights = write THEN VM.SwapIn[newBuffer.bufferInterval];
};
newBuffer.bufferBytes ← VM.BytesForPages[pages: vmPages];
RETURN[newBuffer]
};
NewByteSize: PROC [byteCount: ByteCount] RETURNS [ByteCount] = {
RETURN [byteCount+5120];
};
Tioga
IsThisThingATiogaFile: PROC [h: STREAM] RETURNS [yes: BOOL, len: INT] = {
pos, length: INT;
{ -- block so EXITS code can use pos, len, and length.
controlHeaderId: ARRAY [0..fileIdSize) OF CHAR = [235C,312C];
controlTrailerId: ARRAY [0..fileIdSize) OF CHAR = [205C,227C];
commentHeaderId: ARRAY [0..fileIdSize) OF CHAR = [0C,0C];
fileIdSize: NAT = 2;
numTrailerLengths: NAT = 3; -- <file-props-length> <data-length> <file-length>
endSize: NAT = fileIdSize+numTrailerLengths*4; -- trailer plus three lengths
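-- Illustration (the file length is a hypothetical figure): endSize = 2+3*4 = 14, so
-- for a file with length = 1000 the trailer would occupy bytes [986..1000):
-- bytes 986 and 987 hold controlTrailerId, followed by three 4-byte lengths
-- starting at 988 (props), 992 (data), and 996 (file length); controlEnd = 986.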
ReadLen: PROC [h: STREAM] RETURNS [INT] = {
start: PACKED ARRAY [0..3] OF CHARACTER;
start[0] ← h.GetChar[]; start[1] ← h.GetChar[];
start[2] ← h.GetChar[]; start[3] ← h.GetChar[];
RETURN [LOOPHOLE[start]] };
commentStart, commentLen, propsLen, controlLen, controlEnd: INT;
pos ← h.GetIndex[]; -- save position to restore later
length ← h.GetLength[]; -- length including any trailer stuff
controlEnd ← length-endSize; -- where the trailer info starts
IF controlEnd <= 0 THEN GOTO fail; -- too small
h.SetIndex[controlEnd]; -- set up to read the trailer
FOR i:NAT IN [0..fileIdSize) DO -- read the controlTrailerId
IF h.GetChar[] # controlTrailerId[i] THEN GOTO fail;
ENDLOOP;
IF (propsLen ← ReadLen[h]) NOT IN [0..controlEnd) THEN GOTO fail;
IF (commentStart ← ReadLen[h]) NOT IN [0..controlEnd) THEN GOTO fail;
IF ReadLen[h] # length THEN GOTO fail;
IF commentStart > 0
THEN { -- may have padded text with a null
h.SetIndex[commentStart-1];
len ← IF h.GetChar[]=0C THEN commentStart-1 ELSE commentStart }
ELSE h.SetIndex[len ← commentStart];
FOR i:NAT IN [0..fileIdSize) DO -- read the commentHeaderId
IF h.GetChar[] # commentHeaderId[i] THEN GOTO fail;
ENDLOOP;
commentLen ← ReadLen[h]; -- the length of the comment section
IF commentStart+commentLen NOT IN [0..controlEnd) THEN GOTO fail;
h.SetIndex[commentStart+commentLen]; -- go to start of control info
FOR i:NAT IN [0..fileIdSize) DO -- check the controlHeaderId
IF h.GetChar[] # controlHeaderId[i] THEN GOTO fail;
ENDLOOP;
controlLen ← ReadLen[h]; -- the length of the control section
IF commentStart+commentLen+controlLen # length THEN GOTO fail;
GOTO succeed;
EXITS
fail => { h.SetIndex[pos]; RETURN [FALSE, length] };
succeed => { h.SetIndex[pos]; RETURN [TRUE, len] };
};
};
Procedure records (never modified)
nucleusFileIOReadProcs: PUBLIC FileStreamPrivate.ProcHandle = IO.CreateStreamProcs[
variety: $input, class: $File,
getChar: GetChar,
endOf: EndOf,
charsAvail: CharsAvail,
getBlock: GetBlock,
unsafeGetBlock: UnsafeGetBlock,
putChar: NIL, -- not implemented
putBlock: NIL, -- call PutChar
unsafePutBlock: NIL, -- call PutChar
flush: Flush,
reset: Reset,
close: Close,
getIndex: GetIndex,
setIndex: SetIndex,
backup: Backup,
getLength: GetLength
];
nucleusFileIOAllProcs: PUBLIC FileStreamPrivate.ProcHandle = IO.CreateStreamProcs[
variety: $inputOutput, class: $File,
getChar: GetChar,
endOf: EndOf,
charsAvail: CharsAvail,
getBlock: GetBlock,
unsafeGetBlock: UnsafeGetBlock,
putChar: PutChar,
putBlock: PutBlock,
unsafePutBlock: UnsafePutBlock,
flush: Flush,
reset: Reset,
close: Close,
getIndex: GetIndex,
setIndex: SetIndex,
backup: Backup,
getLength: GetLength,
setLength: SetLength,
eraseChar: EraseChar
];
Finalization code
fQ: SafeStorage.FinalizationQueue;
Finalize: PROC = BEGIN
DO
fsData: FSDataHandle ← NARROW [ SafeStorage.FQNext[fQ] ];
streamIsClosed: BOOL ;
forceCleanUp: BOOL ← FALSE;
streamIsClosed ← fsData.streamIsClosed ;
IF NOT fsData.streamIsClosed AND fsData.currentNode # NIL THEN {
ForceOut[ fsData: fsData
! FS.Error => {forceCleanUp ← TRUE; CONTINUE}];
CloseFileDataForStream[fileData: fsData.fileData, fsData: fsData, forceCleanUp: forceCleanUp
! FS.Error => CONTINUE];
};
IF fsData.isWriteStream THEN {
IF fsData.fileData.writeStreamData = fsData THEN
fsData.fileData.writeStreamData ← NIL ;
}
ELSE {
IF fsData.fileData.firstReadStream = fsData THEN
fsData.fileData.firstReadStream ← NIL ;
};
IF fsData.FinalizationProc # NIL THEN {
fsData.FinalizationProc [ openFile: fsData.fileData.fileHandle,
data: fsData.StreamClassData,
closed: streamIsClosed];
};
fsData ← NIL;
ENDLOOP;
END;
Initialization
start code: start up finalization stuff
IF FileStreamPrivate.DoFinalization THEN {
fQ ← SafeStorage.NewFQ[];
SafeStorage.EstablishFinalization[CODE[Data], 1, fQ];
TRUSTED { Process.Detach[FORK Finalize[]] };
};
END.
CHANGE LOG
Created by MBrown on June 22, 1983 10:08 am
By editing FileIOAlpineImpl.
Changed by MBrown on August 19, 1983 2:44 pm
Close FS file when closing stream (this should really be an option).
Changed by Birrell on August 23, 1983 3:14 pm
In SetFileSize: byteSize/bytesPerFilePage -> (byteSize+bytesPerFilePage-1)/bytesPerFilePage.
Changed by MBrown on August 25, 1983 1:18 pm
In SetIndex: fsData.byteSize < firstBufferByte+fsData.dataBytesInBuffer -> fsData.byteSize <= firstBufferByte+fsData.dataBytesInBuffer. Implemented GetFileLock (was stubbed waiting for FS). In StreamFromOpenFile, if stream open for write and file has no pages, extend it.
Changed by MBrown on September 17, 1983 8:41 pm
Conversion to new IO interface.
Changed by Hagmann on November 22, 1983 4:29 pm
Implement multiple-page stream buffer.
Implement coupled read and write streams on same open file.
Changed data structures in FileStreamPrivate, and fixed references in this module
to the new data structures. This meant changes to nearly every routine.
Close file during StreamOpen if an error occurs in StreamFromOpenFile.
Implement streamOptions and streamBufferParms features.
Added finalization.
Changed name from FileIOFSImpl.
Split out create code to make FileStreamCreateImpl smaller since compiler blows up in pass 3
Changed by Hagmann on November 28, 1983 12:00 pm
Fixed EndOf bug for multiple streams.
Added test for DoFinalization to enable FileStream testing without making a boot file
Changed by Hagmann on December 6, 1983 4:52 pm
Added code for process cache
Changed by Hagmann on December 27, 1983 4:52 pm
Fixed bug (reported by Plass, found by Nix) in UnsafeGetBlock and UnsafePutBlock
for blocks > 32K.
Changed by Hagmann on December 29, 1983 12:54 pm
Fixed bug (reported and new code by Plass) in AddNAT
Changed by Hagmann on January 3, 1984 11:34 am
Added conditional conversion for FS to IO Errors in convertFStoIOError.
This makes FS errors that occur during stream open appear as FS errors, not IO errors
(fixes bug reported by Nix).
Changed by Hagmann on January 10, 1984 9:04 am
Fixed stream hung problem reported by Willie-Sue and Stewart. SetupBuffer was treating
buffers that are in some stage of writing as not valid. It would wait for them to become
valid (due to a parallel read), or do sequential reads if the parallel read failed. This caused
buffers that were writing in parallel that had a write error to wait forever (the node never
became valid or had a parallel read error).
Changed by Hagmann on January 20, 1984 5:11 pm
Added logic to use the changed/unchanged VM information for writing to disk
Changed by Hagmann on February 6, 1984 2:19:10 pm PST
Added check for read-only streams to ForceOut. This is to avoid a bug
in the changed/unchanged VM information after a rollback (some pages looked
changed for a read-only file).
Changed by Hagmann on May 9, 1984 9:12:21 am PDT
Added check to ensure that read aheads complete before the buffer is deallocated by VM
during a close. This fixes a problem encountered by Frank Crow while running the TSetter,
and narrowed down to FileStream by Willie-Sue.
Changed by December 18, 1984 5:40:06 pm PST
Added UNWIND in Close's call to ForceOut, and the forceCleanUp argument to CloseFileDataForStream. Reformatted file.
Bob Hagmann January 31, 1985 5:41:22 pm PST
remove use of FSLock
changes to: Finalize
Bob Hagmann May 14, 1985 11:09:42 am PDT
put back in use of FSLock
changes to: Finalize, Initialization
Bob Hagmann September 18, 1985 07:06:15 PDT
WriteFilePages and FinishWrites changes to avoid writing pages improperly after shortening the file
Bob Hagmann October 25, 1985 2:02:44 pm PDT
WriteFilePages to mark interval unchanged after write. Fixed subtle finalization bug
Bob Hagmann June 9, 1986 5:10:02 pm PDT
checked for node invalid on all get/put entries (previous error can get us into trouble)
merged in FileStreamCreateImpl
Change log for FileStreamCreateImpl:
Created by Hagmann on November 22, 1983 4:30 pm
By cutting this out of FSFileIOImpl.
Changed by Hagmann on November 28, 1983 12:01 pm
Added test for DoFinalization to enable FileStream testing without making a boot file
Changed by Hagmann on December 6, 1983 4:52 pm
Removed code for process cache
Changed by Hagmann on January 3, 1984 11:33 am
Added enable for ConvertFStoIOErrors.
Russ Atkinson (RRA) May 9, 1985 1:56:42 pm PDT
Added VM.SwapIn for write buffer pages to avoid bogus page faults
changes to: StreamFromOpenFile, SetUpClonedStream, CreateBufferSpace
Bob Hagmann May 14, 1985 11:10:58 am PDT
put back in use of FSLock
changes to: StreamFromOpenFile, StreamFromOpenStream
Bob Hagmann November 13, 1986 1:02:46 pm PST
modified to allow multiple parallel writes
Doug Wyatt, December 12, 1986 5:18:31 pm PST
changed Basics.LongNumber[num] to Basics.LongNumber[pair]
changes to: RoundUpToPages, PageContainingLastByte