FileWriterImpl.Mesa
Written by Paxton. March 1981
Last Edited by: Paxton, August 19, 1983 1:25 pm
DIRECTORY
File    USING [Capability],
FileIO    USING [StreamFromCapability],
FileWriter  USING [BlockList, BlockListBody, blockSize, FileWriterBody, Offset, OfWriter, Ref, WriteChar],
Inline    USING [HighHalf, LowHalf],
IO     USING [Close, Handle, PutBlock, SetIndex, SetLength],
MonitoredQueue USING [Add, Close, Create, EndOfQueue, MQ, Remove, Reset],
Rope    USING [Concat, FromRefText, ROPE],
RopeReader  USING [],
T2FileOps  USING [controlTrailerId, DataPaddedFlag, DataUnpaddedFlag, versionNumber];
FileWriterImpl: CEDAR MONITOR
IMPORTS Inline, IO, FileIO, FileWriter, MonitoredQueue, Rope
EXPORTS FileWriter
SHARES FileWriter = BEGIN
ROPE: TYPE = Rope.ROPE;
---------------------------------------------------------------
PRINCIPAL PUBLIC INTERFACES
Close: PUBLIC PROC [control, data: FileWriter.Ref, textOnly: BOOL, flags: CHAR]
RETURNS [dataLen, count: FileWriter.Offset, output: ROPE] = {
Close the file - i.e. compute the trailer information and write out all the buffers.
controlStart, controlLen, filePropsOffset, filePropsLen: FileWriter.Offset ← 0;
stream: IO.Handle ← data.stream;
len, controlBlocks, dataBlocks: NAT ← 0;
toRope: BOOLEAN ← data.toRope;
controlList, dataList: FileWriter.BlockList;
controlBlock, dataBlock: REF TEXT;
pad:BOOLEANFALSE; -- says if a data pad byte has been output
FinishWriter: PROC [writer: FileWriter.Ref]
RETURNS [list: FileWriter.BlockList, block: REF TEXT] = INLINE {
list ← Reverse[writer.blockList]; block ← writer.block; FreeFileWriter[writer] };
LastBlock: PROC [lst: FileWriter.BlockList] RETURNS [block: REF TEXT] = INLINE {
FOR l: FileWriter.BlockList ← lst, l.next DO IF l.next=NIL THEN RETURN [l.block]; ENDLOOP };
MakeRope: PROC [first, num: NAT] RETURNS [ROPE] = {
GetChars: PROC [num: NAT] RETURNS [chars: REF TEXT] = {
GetIt: PROC [blocks: NAT, list: FileWriter.BlockList, block: REF TEXT] RETURNS [ok: BOOLEAN] = {
IF num NOT IN [0..blocks) THEN RETURN [FALSE];
FOR lst: FileWriter.BlockList ← list, lst.next UNTIL lst=NIL DO
IF num = 0 THEN { chars ← lst.block; RETURN [TRUE] };
num ← num-1; ENDLOOP;
IF num = 0 THEN { chars ← block; RETURN [TRUE] };
ERROR };
IF GetIt[dataBlocks, dataList, dataBlock] THEN RETURN [chars];
num ← num-dataBlocks;
IF GetIt[controlBlocks, controlList, controlBlock] THEN RETURN [chars];
ERROR };
half: NAT;
SELECT num FROM
0 => RETURN [NIL];
1 => RETURN [Rope.FromRefText[GetChars[first]]];
ENDCASE;
half ← num/2;
RETURN [Rope.Concat[MakeRope[first, half], MakeRope[first+half, num-half]]] };
Reverse: PROC [lst: FileWriter.BlockList] RETURNS [prev: FileWriter.BlockList] = INLINE {
Reverse a writers block list
next: FileWriter.BlockList;
UNTIL lst=NIL DO next ← lst.next; lst.next ← prev; prev ← lst; lst ← next; ENDLOOP };
dataBlocks ← data.blockCount;
controlStart ← dataBlocks*LONG[FileWriter.blockSize];
IF (len�ta.block.length) MOD 2 = 1 THEN {
round up the data to an even number of bytes (word boundary) because this makes the copying to file buffers go much faster
FileWriter.WriteChar[0C, data]; -- this constitutes an extra EOF
pad ← TRUE; -- remind ourselves to mark trailer to say data is padded
len ← len+1 };
controlStart ← controlStart+len; -- calculate the size in bytes
dataBlocks ← dataBlocks+1;  -- counts number of full and partial blocks needed
IF control # NIL THEN {
IF ~textOnly THEN {
chars: REF TEXT;
WriteControlInt: PROC [int: INT] = {
32 bits written as four bytes (unencoded)
deltaBytes: PACKED ARRAY [0..3] OF CHARACTER;
deltaBytes ← LOOPHOLE[int];
FOR i:NAT IN [0..3] DO
FileWriter.WriteChar[deltaBytes[i], control];
ENDLOOP };
WriteControlCardinal: PROC [int: INT] = {
16 bits written as two bytes (unencoded)
c: CARDINAL;
deltaBytes: PACKED ARRAY [0..1] OF CHARACTER;
IF (c ←LOOPHOLE[Inline.HighHalf[int]]) # 0 THEN ERROR;
deltaBytes ← LOOPHOLE[Inline.LowHalf[int]];
FOR i:NAT IN [0..1] DO
FileWriter.WriteChar[deltaBytes[i], control];
ENDLOOP };
ControlLen: PROC RETURNS [len: INT] = INLINE {
RETURN [LONG[FileWriter.blockSize]*control.blockCount+control.block.length] };
IF control.kind # control THEN ERROR;
IF control.toRope # toRope THEN ERROR;
IF control.closeStream # data.closeStream THEN ERROR;
IF control.stream # data.stream THEN ERROR;
filePropsOffset ← ControlLen[];
this is the place to write out the file props, if any
write as sequence of {keyRope, valueRope} pairs
filePropsLen ← ControlLen[]-filePropsOffset;
FileWriter.WriteChar[T2FileOps.controlTrailerId, control];
FileWriter.WriteChar[
IF pad THEN T2FileOps.DataPaddedFlag ELSE T2FileOps.DataUnpaddedFlag, control];
FileWriter.WriteChar[T2FileOps.versionNumber, control];
FileWriter.WriteChar[flags, control];
WriteControlCardinal[filePropsLen];
WriteControlInt[controlStart];  -- index of first control byte
controlLen ← ControlLen[];
controlBlocks ← control.blockCount+1; -- counts number of full and partial blocks needed
WriteControlInt[controlStart+controlLen+4]; -- file length (+4 accounts for self)
chars ← IF control.blockList=NIL THEN control.block ELSE LastBlock[control.blockList] };
[controlList, controlBlock] ← FinishWriter[control] }; -- reverse blocklist, free writer structure
IF data.kind # data THEN ERROR; -- checking for impossible error
dataBlock ← data.block;
IF toRope THEN { -- return a balanced rope
dataList ← Reverse[data.blockList]; output ← MakeRope[0, dataBlocks+controlBlocks] }
ELSE {
MonitoredQueue.Close[data.blockQueue];
TRUSTED {JOIN data.consumer};
FreeMQ[data.blockQueue];
IO.PutBlock[stream, dataBlock];
FreeBLOCK[dataBlock];
IF control # NIL THEN {
FOR lst:FileWriter.BlockList ← controlList, lst.next UNTIL lst=NIL DO
IF ~textOnly THEN IO.PutBlock[stream, lst.block];
FreeBLOCK[lst.block];
ENDLOOP;
IF ~textOnly THEN IO.PutBlock[stream, controlBlock];
FreeBLOCK[controlBlock] }};
count ← controlStart+controlLen;
dataLen ← controlStart;
IF data.closeStream THEN IO.Close[stream];
FreeFileWriter[data] };
OpenC: PUBLIC PROC [
capability: File.Capability, start: FileWriter.Offset ← 0, makeControl: BOOLEANTRUE]
RETURNS [control, data: FileWriter.Ref] = {
stream: IO.Handle = FileIO.StreamFromCapability[capability, overwrite];
IO.SetLength[stream, start];
IO.SetIndex[stream, start];
[control, data] ← CreateWriters[stream, FALSE, makeControl, TRUE] };
ToRope: PUBLIC PROC [makeControl: BOOLEANTRUE] RETURNS [control, data: FileWriter.Ref] = {
[control, data] ← CreateWriters[NIL, TRUE, makeControl, FALSE] };
ToStream: PUBLIC PROC [
stream: IO.Handle, makeControl: BOOLEANTRUE] RETURNS [control, data: FileWriter.Ref] = {
[control, data] ← CreateWriters[stream, FALSE, makeControl, FALSE] };
---------------------------------------------------------------
MONITORED QUEUE MANAGEMENT
A two element cache (q1,q2) of Monitored queues is maintained from which new queues are initially allocated. Note that only one queue is needed per file write in progress.
q1, q2: MonitoredQueue.MQ;
FreeMQ: ENTRY PROC [q: MonitoredQueue.MQ] = {
ENABLE UNWIND => NULL;
IF q=NIL THEN RETURN;
MonitoredQueue.Reset[q];
IF q1 = NIL THEN q1 ← q ELSE IF q2 = NIL THEN q2 ← q };
GetMQ: ENTRY PROC RETURNS [q: MonitoredQueue.MQ] = {
ENABLE UNWIND => NULL;
IF q1 # NIL THEN { q ← q1; q1 ← NIL }
ELSE IF q2 # NIL THEN { q ← q2; q2 ← NIL }
ELSE q ← MonitoredQueue.Create[] };
---------------------------------------------------------------
FILE WRITER MANAGEMENT
A two element cache (fw1, fw2) of FileWriterBodys is maintained from which new FileWriterBodys are initially allocated.
fw1, fw2: FileWriter.Ref;
CreateWriters: PROC [
stream: IO.Handle, toRope: BOOLEANFALSE, makeControl, closeStream: BOOLEANTRUE]
RETURNS [control, data: FileWriter.Ref] = {
Make a writer for the data stream and, if required, a writer for the control stream. Note that the 'stream' and 'closeStream' information is identical in both writers
MakeOne: PROC [kind: FileWriter.OfWriter] RETURNS [writer: FileWriter.Ref] = {
writer ← GetFileWriter[];
writer.kind ← kind;
writer.block ← NewBLOCK[];
writer.stream ← stream;
writer.toRope ← toRope;
writer.closeStream ← closeStream };
IF makeControl THEN { -- make control writer
control ← MakeOne[control]; THROUGH [0..1] DO FileWriter.WriteChar[0C, control]; ENDLOOP };
data ← MakeOne[data];
IF ~toRope THEN { -- fork process to write data as it is produced
data.blockQueue ← GetMQ[]; data.consumer ← FORK Consumer[data] }};
FreeFileWriter: ENTRY PROC [fw: FileWriter.Ref] = {
ENABLE UNWIND => NULL;
fw.block ← NIL;
fw.blockList ← NIL;
fw.blockQueue ← NIL;
fw.blockCount ← 0;
fw.stream ← NIL;
fw.kind ← unused;
IF fw1 = NIL THEN fw1 ← fw
ELSE IF fw2 = NIL THEN fw2 ← fw };
GetFileWriter: ENTRY PROC RETURNS [fw: FileWriter.Ref] = {
ENABLE UNWIND => NULL;
IF fw1 # NIL THEN { fw ← fw1; fw1 ← NIL }
ELSE
IF fw2 # NIL THEN { fw ← fw2; fw2 ← NIL }
ELSE fw ← NEW[FileWriter.FileWriterBody];
IF fw.kind # unused THEN ERROR };
---------------------------------------------------------------
BLOCK CACHE MANAGEMENT
"blocks" contains a cache of blocks produced by the writers as they write blocks out.
blks: NAT = 8;
BlockArray: TYPE = ARRAY [0..blks) OF REF TEXT;
blocks: REF BlockArray ← NEW[BlockArray];
freeBlock: CONDITION;
BumpWriter: PUBLIC PROC [writer: FileWriter.Ref] RETURNS [block: REF TEXT] = {
writer.blockCount ← writer.blockCount+1; -- number added to queue/list
IF writer.kind=data AND ~writer.toRope THEN {
MonitoredQueue.Add[writer.block, writer.blockQueue];
writer.block ← block ← GetBLOCK[] } -- get a block from the cache
ELSE {
writer.blockList ← NEW[FileWriter.BlockListBody ← [writer.block, writer.blockList]];
writer.block ← block ← NewBLOCK[] }};
Consumer: PROC [data: FileWriter.Ref] = {
queue: MonitoredQueue.MQ ← data.blockQueue;
stream: IO.Handle ← data.stream;
DO
x: REF ANY ← MonitoredQueue.Remove[queue ! MonitoredQueue.EndOfQueue => EXIT];
WITH x SELECT FROM
xx: REF TEXT => { IO.PutBlock[stream, xx]; FreeBLOCK[xx] };
ENDCASE => ERROR;
ENDLOOP };
FreeBLOCK: ENTRY PROC [b: REF TEXT] = {
ENABLE UNWIND => NULL;
FOR i:NAT IN [0..blks) DO
IF blocks[i] = NIL THEN { blocks[i] ← b; BROADCAST freeBlock; RETURN };
ENDLOOP };
GetBLOCK: ENTRY PROC RETURNS [b: REF TEXT] = {
ENABLE UNWIND => NULL;
WHILE TRUE DO
FOR i:NAT IN [0..blks) DO
IF blocks[i] # NIL THEN { b ← blocks[i]; blocks[i] ← NIL; b.length ← 0; RETURN[b] };
ENDLOOP;
WAIT freeBlock;
ENDLOOP };
NewBLOCK: PROC RETURNS [b: REF TEXT] = INLINE { RETURN[NEW[TEXT[FileWriter.blockSize]]] };
FOR i:CARDINAL IN [0..blks) DO blocks[i] ← NewBLOCK[]; ENDLOOP;
END.