FileWriterImpl.Mesa
Written by Paxton. March 1981
Last Edited by: Paxton, August 19, 1983 1:25 pm
DIRECTORY
File USING [Capability],
FileIO USING [StreamFromCapability],
FileWriter USING [BlockList, BlockListBody, blockSize, FileWriterBody, Offset, OfWriter, Ref, WriteChar],
Inline USING [HighHalf, LowHalf],
IO USING [Close, Handle, PutBlock, SetIndex, SetLength],
MonitoredQueue USING [Add, Close, Create, EndOfQueue, MQ, Remove, Reset],
Rope USING [Concat, FromRefText, ROPE],
RopeReader USING [],
T2FileOps USING [controlTrailerId, DataPaddedFlag, DataUnpaddedFlag, versionNumber];
FileWriterImpl:
CEDAR MONITOR
IMPORTS Inline, IO, FileIO, FileWriter, MonitoredQueue, Rope
EXPORTS FileWriter
SHARES FileWriter = BEGIN
ROPE: TYPE = Rope.ROPE;
---------------------------------------------------------------
PRINCIPAL PUBLIC INTERFACES
Close:
PUBLIC
PROC [control, data: FileWriter.Ref, textOnly:
BOOL, flags:
CHAR]
RETURNS [dataLen, count: FileWriter.Offset, output: ROPE] = {
Close the file - i.e. compute the trailer information and write out all the buffers.
controlStart, controlLen, filePropsOffset, filePropsLen: FileWriter.Offset ← 0;
stream: IO.Handle ← data.stream;
len, controlBlocks, dataBlocks: NAT ← 0;
toRope: BOOLEAN ← data.toRope;
controlList, dataList: FileWriter.BlockList;
controlBlock, dataBlock: REF TEXT;
pad:
BOOLEAN ←
FALSE;
-- says if a data pad byte has been output
FinishWriter:
PROC [writer: FileWriter.Ref]
RETURNS [list: FileWriter.BlockList, block: REF TEXT] = INLINE {
list ← Reverse[writer.blockList]; block ← writer.block; FreeFileWriter[writer] };
LastBlock:
PROC [lst: FileWriter.BlockList]
RETURNS [block:
REF
TEXT] =
INLINE {
FOR l: FileWriter.BlockList ← lst, l.next DO IF l.next=NIL THEN RETURN [l.block]; ENDLOOP };
MakeRope:
PROC [first, num:
NAT]
RETURNS [
ROPE] = {
GetChars:
PROC [num:
NAT]
RETURNS [chars:
REF
TEXT] = {
GetIt:
PROC [blocks:
NAT, list: FileWriter.BlockList, block:
REF
TEXT]
RETURNS [ok:
BOOLEAN] = {
IF num NOT IN [0..blocks) THEN RETURN [FALSE];
FOR lst: FileWriter.BlockList ← list, lst.next
UNTIL lst=
NIL
DO
IF num = 0 THEN { chars ← lst.block; RETURN [TRUE] };
num ← num-1; ENDLOOP;
IF num = 0 THEN { chars ← block; RETURN [TRUE] };
ERROR };
IF GetIt[dataBlocks, dataList, dataBlock] THEN RETURN [chars];
num ← num-dataBlocks;
IF GetIt[controlBlocks, controlList, controlBlock] THEN RETURN [chars];
ERROR };
half: NAT;
SELECT num
FROM
0 => RETURN [NIL];
1 => RETURN [Rope.FromRefText[GetChars[first]]];
ENDCASE;
half ← num/2;
RETURN [Rope.Concat[MakeRope[first, half], MakeRope[first+half, num-half]]] };
Reverse:
PROC [lst: FileWriter.BlockList]
RETURNS [prev: FileWriter.BlockList] =
INLINE {
Reverse a writers block list
next: FileWriter.BlockList;
UNTIL lst=NIL DO next ← lst.next; lst.next ← prev; prev ← lst; lst ← next; ENDLOOP };
dataBlocks ← data.blockCount;
controlStart ← dataBlocks*LONG[FileWriter.blockSize];
IF (lenta.block.length)
MOD 2 = 1
THEN {
round up the data to an even number of bytes (word boundary) because this makes the copying to file buffers go much faster
FileWriter.WriteChar[0C, data]; -- this constitutes an extra EOF
pad ← TRUE; -- remind ourselves to mark trailer to say data is padded
len ← len+1 };
controlStart ← controlStart+len; -- calculate the size in bytes
dataBlocks ← dataBlocks+1; -- counts number of full and partial blocks needed
IF control #
NIL
THEN {
IF ~textOnly
THEN {
WriteControlInt:
PROC [int:
INT] = {
32 bits written as four bytes (unencoded)
deltaBytes: PACKED ARRAY [0..3] OF CHARACTER;
deltaBytes ← LOOPHOLE[int];
FOR i:
NAT
IN [0..3]
DO
FileWriter.WriteChar[deltaBytes[i], control];
ENDLOOP };
WriteControlCardinal:
PROC [int:
INT] = {
16 bits written as two bytes (unencoded)
c: CARDINAL;
deltaBytes: PACKED ARRAY [0..1] OF CHARACTER;
IF (c ←LOOPHOLE[Inline.HighHalf[int]]) # 0 THEN ERROR;
deltaBytes ← LOOPHOLE[Inline.LowHalf[int]];
FOR i:
NAT
IN [0..1]
DO
FileWriter.WriteChar[deltaBytes[i], control];
ENDLOOP };
ControlLen:
PROC
RETURNS [len:
INT] =
INLINE {
RETURN [LONG[FileWriter.blockSize]*control.blockCount+control.block.length] };
IF control.kind # control THEN ERROR;
IF control.toRope # toRope THEN ERROR;
IF control.closeStream # data.closeStream THEN ERROR;
IF control.stream # data.stream THEN ERROR;
filePropsOffset ← ControlLen[];
this is the place to write out the file props, if any
write as sequence of {keyRope, valueRope} pairs
filePropsLen ← ControlLen[]-filePropsOffset;
FileWriter.WriteChar[T2FileOps.controlTrailerId, control];
FileWriter.WriteChar[
IF pad THEN T2FileOps.DataPaddedFlag ELSE T2FileOps.DataUnpaddedFlag, control];
FileWriter.WriteChar[T2FileOps.versionNumber, control];
FileWriter.WriteChar[flags, control];
WriteControlCardinal[filePropsLen];
WriteControlInt[controlStart]; -- index of first control byte
controlLen ← ControlLen[];
controlBlocks ← control.blockCount+1; -- counts number of full and partial blocks needed
WriteControlInt[controlStart+controlLen+4]; -- file length (+4 accounts for self)
chars ← IF control.blockList=NIL THEN control.block ELSE LastBlock[control.blockList] };
[controlList, controlBlock] ← FinishWriter[control] }; -- reverse blocklist, free writer structure
IF data.kind # data THEN ERROR; -- checking for impossible error
dataBlock ← data.block;
IF toRope
THEN {
-- return a balanced rope
dataList ← Reverse[data.blockList]; output ← MakeRope[0, dataBlocks+controlBlocks] }
ELSE {
MonitoredQueue.Close[data.blockQueue];
TRUSTED {JOIN data.consumer};
FreeMQ[data.blockQueue];
IO.PutBlock[stream, dataBlock];
FreeBLOCK[dataBlock];
IF control #
NIL
THEN {
FOR lst:FileWriter.BlockList ← controlList, lst.next
UNTIL lst=
NIL
DO
IF ~textOnly THEN IO.PutBlock[stream, lst.block];
FreeBLOCK[lst.block];
ENDLOOP;
IF ~textOnly THEN IO.PutBlock[stream, controlBlock];
FreeBLOCK[controlBlock] }};
count ← controlStart+controlLen;
dataLen ← controlStart;
IF data.closeStream THEN IO.Close[stream];
FreeFileWriter[data] };
OpenC:
PUBLIC
PROC [
capability: File.Capability, start: FileWriter.Offset ← 0, makeControl: BOOLEAN ← TRUE]
RETURNS [control, data: FileWriter.Ref] = {
stream: IO.Handle = FileIO.StreamFromCapability[capability, overwrite];
IO.SetLength[stream, start];
IO.SetIndex[stream, start];
[control, data] ← CreateWriters[stream, FALSE, makeControl, TRUE] };
ToRope:
PUBLIC
PROC [makeControl:
BOOLEAN ←
TRUE]
RETURNS [control, data: FileWriter.Ref] = {
[control, data] ← CreateWriters[NIL, TRUE, makeControl, FALSE] };
ToStream:
PUBLIC
PROC [
stream: IO.Handle, makeControl: BOOLEAN ← TRUE] RETURNS [control, data: FileWriter.Ref] = {
[control, data] ← CreateWriters[stream, FALSE, makeControl, FALSE] };
---------------------------------------------------------------
MONITORED QUEUE MANAGEMENT
A two element cache (q1,q2) of Monitored queues is maintained from which new queues are initially allocated. Note that only one queue is needed per file write in progress.
q1, q2: MonitoredQueue.
MQ;
FreeMQ:
ENTRY
PROC [q: MonitoredQueue.
MQ] = {
ENABLE UNWIND => NULL;
IF q=NIL THEN RETURN;
MonitoredQueue.Reset[q];
IF q1 = NIL THEN q1 ← q ELSE IF q2 = NIL THEN q2 ← q };
GetMQ:
ENTRY
PROC
RETURNS [q: MonitoredQueue.
MQ] = {
ENABLE UNWIND => NULL;
IF q1 # NIL THEN { q ← q1; q1 ← NIL }
ELSE IF q2 # NIL THEN { q ← q2; q2 ← NIL }
ELSE q ← MonitoredQueue.Create[] };
---------------------------------------------------------------
FILE WRITER MANAGEMENT
A two element cache (fw1, fw2) of FileWriterBodys is maintained from which new FileWriterBodys are initially allocated.
fw1, fw2: FileWriter.Ref;
CreateWriters:
PROC [
stream: IO.Handle, toRope: BOOLEAN ← FALSE, makeControl, closeStream: BOOLEAN ← TRUE]
RETURNS [control, data: FileWriter.Ref] = {
Make a writer for the data stream and, if required, a writer for the control stream. Note that the 'stream' and 'closeStream' information is identical in both writers
MakeOne:
PROC [kind: FileWriter.OfWriter]
RETURNS [writer: FileWriter.Ref] = {
writer ← GetFileWriter[];
writer.kind ← kind;
writer.block ← NewBLOCK[];
writer.stream ← stream;
writer.toRope ← toRope;
writer.closeStream ← closeStream };
IF makeControl
THEN {
-- make control writer
control ← MakeOne[control]; THROUGH [0..1] DO FileWriter.WriteChar[0C, control]; ENDLOOP };
data ← MakeOne[data];
IF ~toRope
THEN {
-- fork process to write data as it is produced
data.blockQueue ← GetMQ[]; data.consumer ← FORK Consumer[data] }};
FreeFileWriter:
ENTRY
PROC [fw: FileWriter.Ref] = {
ENABLE UNWIND => NULL;
fw.block ← NIL;
fw.blockList ← NIL;
fw.blockQueue ← NIL;
fw.blockCount ← 0;
fw.stream ← NIL;
fw.kind ← unused;
IF fw1 = NIL THEN fw1 ← fw
ELSE IF fw2 = NIL THEN fw2 ← fw };
GetFileWriter:
ENTRY
PROC
RETURNS [fw: FileWriter.Ref] = {
ENABLE UNWIND => NULL;
IF fw1 # NIL THEN { fw ← fw1; fw1 ← NIL }
ELSE
IF fw2 # NIL THEN { fw ← fw2; fw2 ← NIL }
ELSE fw ← NEW[FileWriter.FileWriterBody];
IF fw.kind # unused THEN ERROR };
---------------------------------------------------------------
BLOCK CACHE MANAGEMENT
"blocks" contains a cache of blocks produced by the writers as they write blocks out.
blks: NAT = 8;
BlockArray: TYPE = ARRAY [0..blks) OF REF TEXT;
blocks: REF BlockArray ← NEW[BlockArray];
BumpWriter:
PUBLIC
PROC [writer: FileWriter.Ref]
RETURNS [block:
REF
TEXT] = {
writer.blockCount ← writer.blockCount+1; -- number added to queue/list
IF writer.kind=data
AND ~writer.toRope
THEN {
MonitoredQueue.Add[writer.block, writer.blockQueue];
writer.block ← block ← GetBLOCK[] } -- get a block from the cache
ELSE {
writer.blockList ← NEW[FileWriter.BlockListBody ← [writer.block, writer.blockList]];
writer.block ← block ← NewBLOCK[] }};
Consumer:
PROC [data: FileWriter.Ref] = {
queue: MonitoredQueue.MQ ← data.blockQueue;
stream: IO.Handle ← data.stream;
DO
x: REF ANY ← MonitoredQueue.Remove[queue ! MonitoredQueue.EndOfQueue => EXIT];
WITH x
SELECT
FROM
xx: REF TEXT => { IO.PutBlock[stream, xx]; FreeBLOCK[xx] };
ENDCASE => ERROR;
ENDLOOP };
FreeBLOCK:
ENTRY
PROC [b:
REF
TEXT] = {
ENABLE UNWIND => NULL;
FOR i:
NAT
IN [0..blks)
DO
IF blocks[i] = NIL THEN { blocks[i] ← b; BROADCAST freeBlock; RETURN };
ENDLOOP };
GetBLOCK:
ENTRY
PROC
RETURNS [b:
REF
TEXT] = {
ENABLE UNWIND => NULL;
WHILE
TRUE
DO
FOR i:
NAT
IN [0..blks)
DO
IF blocks[i] # NIL THEN { b ← blocks[i]; blocks[i] ← NIL; b.length ← 0; RETURN[b] };
ENDLOOP;
WAIT freeBlock;
ENDLOOP };
NewBLOCK: PROC RETURNS [b: REF TEXT] = INLINE { RETURN[NEW[TEXT[FileWriter.blockSize]]] };
FOR i:
CARDINAL
IN [0..blks)
DO blocks[i] ← NewBLOCK[];
ENDLOOP;
END.