-- File: CoreStreams.mesa
-- Edited by Levin, 22-Oct-80 8:46:04
-- Edited by Schroeder, January 13, 1981 11:39 AM
-- Edited by Brotz, January 28, 1983 2:06 PM
DIRECTORY
ByteBltDefs USING [ByteBlt],
Core USING [Close, Open, OpenMode],
csD: FROM "CoreStreamDefs" USING [Position, StreamMode, StreamType],
exD: FROM "ExceptionDefs" USING [SysBug],
Inline USING [LongDivMod, LowHalf],
Process USING [Yield],
Storage USING [Free, Node],
Stream USING [Block],
tfD: FROM "TempFileDefs" USING [AllocateTempFile, FreeTempFile],
VMDefs USING [AllocatePage, FileHandle, GetFileLength, GetOpenFileParameters,
MarkStart, Page, PageAddress, PageNumber, Position, ReadPage, Release, RemapPage,
SetFileLength, UsePage, WaitFile];
CoreStreams: PROGRAM
IMPORTS ByteBltDefs, Core, exD, Inline, Process, Storage, tfD, VMDefs
EXPORTS csD =
BEGIN
OPEN csD;
-- Clients refer to a stream through a pointer to its state record.
StreamHandle: TYPE = POINTER TO CStream;
-- Per-stream state. Every position kept in this record is a byte position, even for
-- word streams; the public procedures convert to and from item positions.
CStream: PUBLIC TYPE = RECORD
[fh: VMDefs.FileHandle, -- backing file; NIL while the data is still buffered only in core
fileType: {csOpened, userOpened, temp}, -- csOpened: opened by OpenFromName and closed by Destroy;
-- userOpened: handle supplied by the client and left open by Destroy;
-- temp: no handle supplied, a temporary file is allocated on demand and freed by Destroy
type: StreamType, -- item size of the stream (byte or word)
mode: StreamMode, -- access mode given at Open time
bufferState: {empty, loaded, dirty}, -- empty: no page held; loaded: page held and unmodified;
-- dirty: page held and modified since it was last started to the file
buffer: VMDefs.Page, -- the page currently held; meaningful only when bufferState # empty
filePage: VMDefs.PageNumber, -- page number within the stream that "buffer" corresponds to
positionByte: CARDINAL, -- byte offset within filePage of the next item; may equal bytesPerPage,
-- meaning the next item is the first one of the following page
bytesInBuffer: CARDINAL, -- number of bytes of the held page that lie within the stream
newEOF: Position, -- current stream end; brought up to date lazily by UpdateNewEOF
checkPointEOF: Position, -- stream end remembered by Open or Checkpoint for use by Reset
checkPointPosition: Position]; -- position remembered by Open or Checkpoint for use by Reset
-- Page geometry used by all of the position arithmetic in this module.
bytesPerPage: CARDINAL = 512;
wordsPerPage: CARDINAL = bytesPerPage/2;
-- Raised by Read when the current position is at or beyond the stream end.
EndOfStream: PUBLIC ERROR = CODE;
OpenFromName: PUBLIC PROCEDURE [name: STRING, type: StreamType, mode: StreamMode]
  RETURNS [sh: StreamHandle] =
  -- Opens the file called "name" and builds a stream of the given type and mode on it.
  -- A "read" stream opens the file for reading; every other mode opens it for update.
  -- A file that does not exist is created, except in "read" mode, where
  -- VMDefs.CantOpen[notFound] is raised instead.
  -- The stream is marked csOpened so that Destroy will close the file again.
  BEGIN
  fileMode: Core.OpenMode ← update;
  IF mode = read THEN fileMode ← read;
  sh ← Open[Core.Open[name, fileMode], type, mode];
  sh.fileType ← csOpened;
  END; -- of OpenFromName --
Open: PUBLIC PROCEDURE [fh: VMDefs.FileHandle, type: StreamType, mode: StreamMode]
  RETURNS [sh: StreamHandle] =
  -- Creates a stream on the already open file "fh".
  -- "fh" may be NIL: the data is then buffered in core until a backing file is needed,
  -- at which point a temporary file is obtained from the TempFileManager. A stream with
  -- no backing file starts with its stream end at 0.
  -- Asking for any mode other than "read" on a file opened oldReadOnly is a SysBug.
  BEGIN
  fileEnd: VMDefs.Position;
  IF fh # NIL AND mode # read THEN
    IF VMDefs.GetOpenFileParameters[fh].options = oldReadOnly THEN Oops[];
  sh ← Storage.Node[SIZE[CStream]];
  -- Start from a fully defined record, then fill in what the caller asked for.
  sh↑ ← [fh: NIL, fileType: userOpened, type: byte, mode: read,
    bufferState: empty, buffer: NIL, filePage: 0, positionByte: 0, bytesInBuffer: 0,
    newEOF: 0, checkPointEOF: 0, checkPointPosition: 0];
  sh.fh ← fh;
  sh.type ← type;
  sh.mode ← mode;
  IF fh = NIL THEN {sh.fileType ← temp; RETURN};
  -- An overwrite stream ignores the existing contents: stream end stays at 0.
  IF mode = overwrite THEN RETURN;
  fileEnd ← VMDefs.GetFileLength[fh];
  sh.checkPointEOF ← MapPageByteToPosition[fileEnd.page, fileEnd.byte];
  sh.newEOF ← sh.checkPointEOF;
  IF mode = append THEN
    BEGIN -- an append stream starts out positioned at the existing end of file
    sh.checkPointPosition ← sh.checkPointEOF;
    sh.filePage ← fileEnd.page;
    sh.positionByte ← fileEnd.byte;
    END;
  END; -- of Open --
Checkpoint: PUBLIC PROCEDURE [sh: StreamHandle] =
  -- Records the current position and stream end so that a later Reset can restore them.
  -- Does nothing for a "read" stream.
  -- It is a SysBug for the current position to lie beyond the stream end, i.e. to have
  -- done a SetPosition past the end without writing anything afterwards.
  -- When there is a backing file, a dirty buffer is flushed to it and the file is
  -- shortened if it is longer than the stream.
  BEGIN
  here: Position;
  IF sh.mode = read THEN RETURN;
  UpdateNewEOF[sh];
  here ← MapPageByteToPosition[sh.filePage, sh.positionByte];
  IF here > sh.newEOF THEN Oops[];
  IF sh.fh # NIL THEN
    BEGIN
    fileEnd: VMDefs.Position;
    WriteBufferIfDirty[sh];
    VMDefs.WaitFile[sh.fh];
    fileEnd ← VMDefs.GetFileLength[sh.fh];
    IF sh.newEOF < MapPageByteToPosition[fileEnd.page, fileEnd.byte] THEN
      BEGIN -- the file is longer than the stream, so cut it back to the stream end
      endPage: VMDefs.PageNumber;
      endByte: CARDINAL;
      [endPage, endByte] ← MapPositionToPageByte[sh.newEOF];
      VMDefs.SetFileLength[sh.fh, VMDefs.Position[endPage, endByte]];
      END;
    END;
  -- Only now, with everything above having worked, is the checkpoint state replaced.
  -- If anything above signalled, the previous checkpoint state is still in force.
  sh.checkPointEOF ← sh.newEOF;
  sh.checkPointPosition ← here;
  END; -- of Checkpoint --
Destroy: PUBLIC PROCEDURE [sh: StreamHandle] =
  -- Gets rid of the stream without doing what Checkpoint does, so a dirty buffer is
  -- released rather than written. "sh" is invalid afterwards and must not be reused.
  -- Destroy cannot raise ERRORs.
  BEGIN
  MakeBufferEmpty[sh];
  -- A file opened by OpenFromName is closed, a temporary file (if one was ever
  -- allocated) is given back, and a file supplied by the client is left alone.
  IF sh.fileType = csOpened THEN Core.Close[sh.fh]
  ELSE IF sh.fileType = temp AND sh.fh # NIL THEN tfD.FreeTempFile[sh.fh];
  Storage.Free[sh];
  END; -- of Destroy --
Reset: PUBLIC PROCEDURE [sh: StreamHandle] =
  -- Puts the position and the stream end back to what they were just after the stream
  -- was opened or last checkpointed. Items inside the restored stream end that were
  -- written since the last checkpoint keep their new values.
  BEGIN
  endPage: VMDefs.PageNumber;
  endByte: CARDINAL;
  sh.newEOF ← sh.checkPointEOF;
  [endPage, endByte] ← MapPositionToPageByte[sh.newEOF];
  IF endPage < sh.filePage THEN
    -- The whole buffer lies beyond the restored stream end: drop it.
    MakeBufferEmpty[sh]
  ELSE IF endPage = sh.filePage THEN
    BEGIN -- the restored stream end falls inside the buffer: trim the buffer to it
    IF sh.bytesInBuffer # 0 THEN sh.bytesInBuffer ← endByte;
    IF sh.bytesInBuffer = 0 AND sh.bufferState # empty THEN sh.bufferState ← loaded;
    END;
  -- Otherwise the restored stream end is beyond the buffer and the buffer is untouched.
  Reposition[sh, sh.checkPointPosition];
  END; -- of Reset --
GetType: PUBLIC PROCEDURE [sh: StreamHandle] RETURNS [StreamType] =
  -- Reports the item type the stream was opened with.
  BEGIN
  RETURN[sh.type];
  END; -- of GetType --
Read: PUBLIC PROCEDURE [sh: StreamHandle] RETURNS [item: UNSPECIFIED] =
  -- Returns the item at the current position and steps past it.
  -- Raises EndOfStream if the current position is at or beyond the stream end.
  BEGIN
  at: CARDINAL;
  IF sh.bufferState = empty OR sh.positionByte = bytesPerPage THEN PrepareBuffer[sh];
  -- PrepareBuffer may have moved to the next page, so read the position only now.
  at ← sh.positionByte;
  IF at >= sh.bytesInBuffer THEN ERROR EndOfStream;
  SELECT sh.type FROM
    byte => {item ← sh.buffer.bytes[at]; sh.positionByte ← at + 1};
    ENDCASE => {item ← sh.buffer.words[at / 2]; sh.positionByte ← at + 2};
  END; -- of Read --
Write: PUBLIC PROCEDURE [sh: StreamHandle, item: UNSPECIFIED] =
  -- Stores "item" at the current position and steps past it. The buffer becomes dirty
  -- and grows if the write went past the bytes it previously held.
  -- Writing on a "read" stream is a SysBug.
  BEGIN
  at: CARDINAL;
  IF sh.mode = read THEN Oops[];
  IF sh.bufferState = empty OR sh.positionByte = bytesPerPage THEN PrepareBuffer[sh];
  -- PrepareBuffer may have moved to the next page, so read the position only now.
  at ← sh.positionByte;
  SELECT sh.type FROM
    byte => {sh.buffer.bytes[at] ← item; sh.positionByte ← at + 1};
    ENDCASE => {sh.buffer.words[at / 2] ← item; sh.positionByte ← at + 2};
  IF sh.positionByte > sh.bytesInBuffer THEN sh.bytesInBuffer ← sh.positionByte;
  sh.bufferState ← dirty;
  END; -- of Write --
ReadBlock: PUBLIC PROCEDURE [sh: StreamHandle, to: POINTER, start, nItems: CARDINAL]
  RETURNS [n: CARDINAL] =
  -- Copies up to "nItems" items into the client's storage block, beginning at "@to[start]".
  -- "n" is the number of items actually copied, which may be < "nItems" if the read
  -- reaches the stream end. "n" is 0 if the current position is at or beyond the stream
  -- end. Will not raise EndOfStream.
  BEGIN
  sink, source: Stream.Block;
  bytesCopied: CARDINAL;
  sink ← MakeStreamBlock[sh, to, start, nItems];
  n ← sink.startIndex; -- remember where we started, to compute the count at the end
  UNTIL sink.startIndex = sink.stopIndexPlusOne DO
    PrepareBuffer[sh];
    source.blockPointer ← LONG[sh.buffer];
    source.startIndex ← sh.positionByte;
    source.stopIndexPlusOne ← sh.bytesInBuffer;
    -- Use ">=" here, as Read does. After a SetPosition beyond the stream end (legal on
    -- a stream that is not "read" mode) positionByte exceeds bytesInBuffer; testing only
    -- for equality would then hand ByteBlt a block whose start is past its stop.
    IF source.startIndex >= source.stopIndexPlusOne THEN EXIT;
    bytesCopied ← ByteBltDefs.ByteBlt[to: sink, from: source];
    sink.startIndex ← sink.startIndex + bytesCopied;
    sh.positionByte ← sh.positionByte + bytesCopied;
    ENDLOOP;
  n ← sink.startIndex - n; -- bytes copied
  IF sh.type = word THEN n ← (n + 1) / 2; -- convert to words, rounding up
  END; -- of ReadBlock --
WriteBlock: PUBLIC PROCEDURE
  [sh: StreamHandle, from: POINTER, start, nItems: CARDINAL] =
  -- Writes "nItems" items, taken from the client's storage block beginning at
  -- "@from[start]", onto "sh" at the current position.
  -- Writing on a "read" stream is a SysBug.
  BEGIN
  source: Stream.Block;
  IF sh.mode = read THEN Oops[];
  source ← MakeStreamBlock[sh, from, start, nItems];
  CopyBlockToCStream[source, sh];
  END; -- of WriteBlock --
StreamCopy: PUBLIC PROCEDURE [from, to: StreamHandle, fromItems: LONG CARDINAL] =
  -- Copies "fromItems" items of the "from" stream onto the "to" stream.
  -- exD.SysBug if "to" is a "read" stream, or if
  -- (from.type = byte) AND (to.type = word) AND (fromItems MOD 2 = 1).
  -- Copying between two streams on the same file works as long as the "to" (byte)
  -- position is <= the "from" (byte) position, i.e. as long as you are copying down.
  -- Copying up on the same file is not recommended. Will not raise EndOfStream.
  BEGIN
  Deposit: PROCEDURE [piece: Stream.Block] = {CopyBlockToCStream[piece, to]};
  oddBytesIntoWords: BOOLEAN =
    (from.type = byte AND to.type = word AND fromItems MOD 2 = 1);
  IF to.mode = read OR oddBytesIntoWords THEN Oops[];
  ReadStream[from, fromItems, Deposit]
  END; -- of StreamCopy --
ReadStream: PUBLIC PROCEDURE [sh: StreamHandle, nItems: LONG CARDINAL,
  AcceptBlock: PROCEDURE [Stream.Block]] =
  -- Calls "AcceptBlock" for each block of "sh" from the current position to that plus
  -- "nItems", or to the stream end, whichever comes first. "startIndex" in the
  -- Stream.Block will be even unless "sh" is a byte stream and the current position is
  -- odd. "stopIndexPlusOne" will be even except sometimes for the last block. A block is
  -- never larger than one page. The last call of "AcceptBlock" always passes an empty
  -- block (startIndex = stopIndexPlusOne). Will not raise EndOfStream.
  BEGIN
  source: Stream.Block;
  available: CARDINAL;
  items: LONG CARDINAL ← IF sh.type = word THEN 2 * nItems ELSE nItems;
  DO
    copyCount: CARDINAL;
    PrepareBuffer[sh];
    source.blockPointer ← LONG[sh.buffer];
    -- The position can legally be beyond the stream end (SetPosition on a stream that
    -- is not "read" mode), in which case positionByte exceeds bytesInBuffer. CARDINAL
    -- subtraction would wrap around then, so treat that case as nothing available.
    available ←
      IF sh.bytesInBuffer > sh.positionByte THEN sh.bytesInBuffer - sh.positionByte ELSE 0;
    copyCount ← Inline.LowHalf[MIN[available, items]];
    source.startIndex ← sh.positionByte;
    source.stopIndexPlusOne ← source.startIndex + copyCount;
    AcceptBlock[source];
    IF copyCount = 0 THEN EXIT;
    Process.Yield[]; -- give other processes a chance between blocks
    sh.positionByte ← sh.positionByte + copyCount;
    items ← items - copyCount;
    ENDLOOP;
  END; -- of ReadStream --
GetLength: PUBLIC PROCEDURE [sh: StreamHandle] RETURNS [Position] =
  -- Returns the current stream end, counted in items of the stream's type.
  BEGIN
  UpdateNewEOF[sh]; -- take account of anything written into the buffer so far
  IF sh.type = word THEN RETURN[sh.newEOF / 2];
  RETURN[sh.newEOF];
  END; -- of GetLength --
GetPosition: PUBLIC PROCEDURE [sh: StreamHandle] RETURNS [Position] =
  -- Returns the item position of the next item to be read or written on "sh".
  BEGIN
  IF sh.type = byte THEN RETURN[MapPageByteToPosition[sh.filePage, sh.positionByte]];
  RETURN[MapPageWordToPosition[sh.filePage, sh.positionByte / 2]];
  END; -- of GetPosition --
SetPosition: PUBLIC PROCEDURE [sh: StreamHandle, position: Position] =
  -- Arranges for the next item read or written to be the one at "position".
  -- Except on a "read" stream, positioning beyond the stream end is legal, but the
  -- stream end does not advance until something is written. Positioning beyond the
  -- stream end on a "read" stream causes an exD.SysBug.
  BEGIN
  target: Position ← position;
  IF sh.type = word THEN target ← target * 2; -- internal positions are in bytes
  IF sh.mode = read AND target > sh.newEOF THEN Oops[];
  Reposition[sh, target];
  END; -- of SetPosition --
MapPageWordToPosition: PUBLIC PROCEDURE [page: CARDINAL, word: CARDINAL]
  RETURNS [Position] =
  -- Converts a (page, word within page) pair to a word position.
  BEGIN
  RETURN[LONG[page] * wordsPerPage + word];
  END; -- of MapPageWordToPosition --
MapPositionToPageWord: PUBLIC PROCEDURE [position: Position]
  RETURNS [page: CARDINAL, word: [0 .. wordsPerPage)] =
  -- Splits a word position into a page number and a word offset within that page.
  BEGIN
  [page, word] ← Inline.LongDivMod[position, wordsPerPage];
  END; -- of MapPositionToPageWord --
MapPageByteToPosition: PUBLIC PROCEDURE [page: CARDINAL, byte: CARDINAL]
  RETURNS [Position] =
  -- Converts a (page, byte within page) pair to a byte position.
  BEGIN
  RETURN[LONG[page] * bytesPerPage + byte];
  END; -- of MapPageByteToPosition --
MapPositionToPageByte: PUBLIC PROCEDURE [position: Position]
  RETURNS [page: CARDINAL, byte: [0 .. bytesPerPage)] =
  -- Splits a byte position into a page number and a byte offset within that page.
  BEGIN
  [page, byte] ← Inline.LongDivMod[position, bytesPerPage];
  END; -- of MapPositionToPageByte --
-- Internal procedures --
Reposition: PROCEDURE [sh: StreamHandle, bp: Position] =
  -- Moves the stream to byte position "bp". If that is on a different page from the one
  -- currently buffered, the stream end is brought up to date, a dirty buffer is written
  -- and the buffer is given up; the new page is loaded lazily by PrepareBuffer.
  BEGIN
  newPage: VMDefs.PageNumber;
  newByte: CARDINAL;
  [newPage, newByte] ← MapPositionToPageByte[bp];
  IF newPage # sh.filePage THEN
    {UpdateNewEOF[sh]; WriteBufferIfDirty[sh]; MakeBufferEmpty[sh]; sh.filePage ← newPage};
  sh.positionByte ← newByte;
  END; -- of Reposition --
MakeStreamBlock: PROCEDURE [sh: StreamHandle, p: POINTER, s, n: CARDINAL]
  RETURNS [b: Stream.Block] =
  -- Describes "n" items starting at item "s" of the storage at "p" as a Stream.Block.
  -- Stream.Block indexes are in bytes, so item counts are doubled for a word stream.
  BEGIN
  bytesPerItem: CARDINAL = (IF sh.type = byte THEN 1 ELSE 2);
  b.blockPointer ← LONG[p];
  b.startIndex ← s * bytesPerItem;
  b.stopIndexPlusOne ← b.startIndex + n * bytesPerItem;
  END; -- of MakeStreamBlock --
CopyBlockToCStream: PROCEDURE [source: Stream.Block, sh: StreamHandle] =
  -- Writes all the bytes described by "source" onto "sh" at its current position,
  -- one buffer page at a time, marking each buffer touched as dirty.
  BEGIN
  page: Stream.Block;
  moved: CARDINAL;
  page.stopIndexPlusOne ← bytesPerPage; -- a write may always run to the end of the page
  WHILE source.startIndex # source.stopIndexPlusOne DO
    PrepareBuffer[sh];
    page.blockPointer ← LONG[sh.buffer];
    page.startIndex ← sh.positionByte;
    moved ← ByteBltDefs.ByteBlt[to: page, from: source];
    source.startIndex ← source.startIndex + moved;
    sh.positionByte ← sh.positionByte + moved;
    IF sh.positionByte > sh.bytesInBuffer THEN sh.bytesInBuffer ← sh.positionByte;
    sh.bufferState ← dirty;
    ENDLOOP;
  END; -- of CopyBlockToCStream --
PrepareBuffer: PROCEDURE [sh: StreamHandle] =
  -- Makes sure a buffer page is held for the current position. If the position has run
  -- off the end of the current page, the stream first moves on to the next page,
  -- writing the old buffer if it is dirty.
  BEGIN
  IF sh.positionByte = bytesPerPage THEN
    BEGIN -- step on to the start of the following page
    UpdateNewEOF[sh];
    WriteBufferIfDirty[sh];
    MakeBufferEmpty[sh];
    sh.filePage ← sh.filePage + 1;
    sh.positionByte ← 0;
    END;
  IF sh.bufferState # empty THEN RETURN;
  BEGIN
  pageStart: LONG CARDINAL = MapPageByteToPosition[sh.filePage, 0];
  -- The bytes of this page that lie inside the stream: none if the stream ends at or
  -- before the start of the page, and never more than a full page.
  sh.bytesInBuffer
    ← Inline.LowHalf[MIN[bytesPerPage, MAX[sh.newEOF, pageStart] - pageStart]];
  -- Read the page if it holds stream data; otherwise just claim a page, from the
  -- backing file if there is one and from core if there is not.
  IF sh.bytesInBuffer > 0 THEN sh.buffer ← VMDefs.ReadPage[[sh.fh, sh.filePage], 2]
  ELSE IF sh.fh # NIL THEN sh.buffer ← VMDefs.UsePage[[sh.fh, sh.filePage]]
  ELSE sh.buffer ← VMDefs.AllocatePage[];
  sh.bufferState ← loaded;
  END;
  END; -- of PrepareBuffer --
WriteBufferIfDirty: PROCEDURE [sh: StreamHandle] =
  -- Starts the buffer page on its way to the backing file if it has been modified.
  -- A stream with no backing file acquires a temporary one at this point.
  BEGIN
  IF sh.bufferState # dirty THEN RETURN;
  IF sh.fh = NIL THEN
    BEGIN -- first page to leave core: get a temporary file and attach the page to it
    sh.fh ← tfD.AllocateTempFile[];
    VMDefs.RemapPage[sh.buffer, VMDefs.PageAddress[sh.fh, sh.filePage]];
    END;
  -- If the file currently ends in this page, extend it to cover the whole page.
  -- Should that fail, the buffer is given up while the signal unwinds.
  IF VMDefs.GetFileLength[sh.fh].page = sh.filePage THEN
    VMDefs.SetFileLength[sh.fh, [sh.filePage + 1, 0] ! UNWIND => MakeBufferEmpty[sh]];
  VMDefs.MarkStart[sh.buffer];
  sh.bufferState ← loaded;
  END; -- of WriteBufferIfDirty --
MakeBufferEmpty: PROCEDURE [sh: StreamHandle] =
  -- Gives up the buffer page, if one is held, without writing it, and records that the
  -- buffer now holds no bytes.
  BEGIN
  IF sh.bufferState # empty THEN {VMDefs.Release[sh.buffer]; sh.bufferState ← empty};
  sh.bytesInBuffer ← 0;
  END; -- of MakeBufferEmpty --
UpdateNewEOF: PROCEDURE [sh: StreamHandle] =
  -- Advances the stream end to the end of the data in the buffer, if the buffer has
  -- been written into and reaches further than the stream end recorded so far.
  BEGIN
  bufferEnd: Position;
  IF sh.bufferState # dirty THEN RETURN;
  bufferEnd ← MapPageByteToPosition[sh.filePage, sh.bytesInBuffer];
  IF bufferEnd > sh.newEOF THEN sh.newEOF ← bufferEnd;
  END; -- of UpdateNewEOF --
Oops: PROCEDURE =
  -- Reports a misuse of the stream package through exD.SysBug.
  BEGIN
  exD.SysBug[];
  END; -- of Oops --
END. -- of CoreStreams --