-- FileWriterImpl: implementation of the FileWriter interface.
-- Writers accumulate output in text blocks of FileWriter.blockSize characters.
-- Close delivers the data blocks, followed by optional control information and a
-- trailer, either to an IO stream or as a balanced ROPE.

DIRECTORY
  File USING [Capability],
  FileIO USING [StreamFromCapability],
  FileWriter USING [BlockList, BlockListBody, blockSize, FileWriterBody, Offset, OfWriter, Ref, WriteChar],
  Inline USING [HighHalf, LowHalf],
  IO USING [Close, Handle, PutBlock, SetIndex, SetLength],
  MonitoredQueue USING [Add, Close, Create, EndOfQueue, MQ, Remove, Reset],
  Rope USING [Concat, FromRefText, ROPE],
  RopeReader USING [],
  T2FileOps USING [controlTrailerId, DataPaddedFlag, DataUnpaddedFlag, versionNumber];

FileWriterImpl: CEDAR MONITOR
  IMPORTS Inline, IO, FileIO, FileWriter, MonitoredQueue, Rope
  EXPORTS FileWriter
  SHARES FileWriter =
BEGIN

ROPE: TYPE = Rope.ROPE;

<<--------------------------------------------------------------->>
-- Closing and opening writers

Close: PUBLIC PROC [control, data: FileWriter.Ref, textOnly: BOOL, flags: CHAR]
  RETURNS [dataLen, count: FileWriter.Offset, output: ROPE] = {
  -- Finishes a pair of writers, emits their contents and releases both writers.
  --   control: the control writer, or NIL if none was created.
  --   data: the data writer; ERROR is raised if its kind is not data.
  --   textOnly: if TRUE no trailer is appended and the control blocks are dropped unwritten.
  --   flags: one byte stored in the trailer.
  -- Returns:
  --   dataLen: number of data bytes, including the pad byte if one was added.
  --   count: dataLen plus the control length measured before the final four-byte file length.
  --   output: the assembled rope when data.toRope is TRUE; otherwise left unassigned.
  -- NOTE(review): count does not include the final four bytes whereas the file length
  -- stored in the trailer does. Confirm with callers which value count is meant to be.
  controlStart, controlLen, filePropsOffset, filePropsLen: FileWriter.Offset _ 0;
  stream: IO.Handle _ data.stream;
  len, controlBlocks, dataBlocks: NAT _ 0;
  toRope: BOOLEAN _ data.toRope;
  controlList, dataList: FileWriter.BlockList;
  controlBlock, dataBlock: REF TEXT;
  pad: BOOLEAN _ FALSE; -- says if a data pad byte has been output

  FinishWriter: PROC [writer: FileWriter.Ref]
    RETURNS [list: FileWriter.BlockList, block: REF TEXT] = INLINE {
    -- Captures the block list (put into oldest-first order) and the current block
    -- of the writer, then returns the writer record to the free cache.
    list _ Reverse[writer.blockList];
    block _ writer.block;
    FreeFileWriter[writer] };

  LastBlock: PROC [lst: FileWriter.BlockList] RETURNS [block: REF TEXT] = INLINE {
    -- Returns the block held by the final element of lst; lst must not be NIL.
    FOR l: FileWriter.BlockList _ lst, l.next DO
      IF l.next=NIL THEN RETURN [l.block];
      ENDLOOP };

  MakeRope: PROC [first, num: NAT] RETURNS [ROPE] = {
    -- Builds a balanced rope from num consecutive blocks starting at index first.
    -- Indices [0..dataBlocks) select data blocks; the following controlBlocks
    -- indices select control blocks.
    GetChars: PROC [num: NAT] RETURNS [chars: REF TEXT] = {
      -- Returns the block with overall index num; ERROR if there is no such block.
      GetIt: PROC [blocks: NAT, list: FileWriter.BlockList, block: REF TEXT]
        RETURNS [ok: BOOLEAN] = {
        -- Looks for index num within one writer: list holds the completed blocks and
        -- block is the current (last) one. Sets chars and returns TRUE on success.
        IF num NOT IN [0..blocks) THEN RETURN [FALSE];
        FOR lst: FileWriter.BlockList _ list, lst.next UNTIL lst=NIL DO
          IF num = 0 THEN { chars _ lst.block; RETURN [TRUE] };
          num _ num-1;
          ENDLOOP;
        IF num = 0 THEN { chars _ block; RETURN [TRUE] };
        ERROR };
      IF GetIt[dataBlocks, dataList, dataBlock] THEN RETURN [chars];
      num _ num-dataBlocks; -- make the index relative to the control blocks
      IF GetIt[controlBlocks, controlList, controlBlock] THEN RETURN [chars];
      ERROR };
    half: NAT;
    SELECT num FROM
      0 => RETURN [NIL];
      1 => RETURN [Rope.FromRefText[GetChars[first]]];
      ENDCASE;
    half _ num/2;
    RETURN [Rope.Concat[MakeRope[first, half], MakeRope[first+half, num-half]]] };

  Reverse: PROC [lst: FileWriter.BlockList] RETURNS [prev: FileWriter.BlockList] = INLINE {
    -- Reverses lst in place by relinking its next fields and returns the new head.
    next: FileWriter.BlockList;
    UNTIL lst=NIL DO
      next _ lst.next;
      lst.next _ prev;
      prev _ lst;
      lst _ next;
      ENDLOOP };

  dataBlocks _ data.blockCount;
  controlStart _ dataBlocks*LONG[FileWriter.blockSize];
  IF (len_data.block.length) MOD 2 = 1 THEN {
    -- odd number of characters in the current data block: add one pad character
    FileWriter.WriteChar[0C, data]; -- this constitutes an extra EOF
    pad _ TRUE; -- remind ourselves to mark trailer to say data is padded
    len _ len+1 };
  controlStart _ controlStart+len; -- calculate the size in bytes
  dataBlocks _ dataBlocks+1; -- counts number of full and partial blocks needed
  IF control # NIL THEN {
    IF ~textOnly THEN {
      chars: REF TEXT;
      WriteControlInt: PROC [int: INT] = {
        <<32 bits written as four bytes (unencoded)>>
        deltaBytes: PACKED ARRAY [0..3] OF CHARACTER;
        deltaBytes _ LOOPHOLE[int];
        FOR i:NAT IN [0..3] DO
          FileWriter.WriteChar[deltaBytes[i], control];
          ENDLOOP };
      WriteControlCardinal: PROC [int: INT] = {
        <<16 bits written as two bytes (unencoded)>>
        -- ERROR is raised if int does not fit in 16 bits.
        c: CARDINAL;
        deltaBytes: PACKED ARRAY [0..1] OF CHARACTER;
        IF (c _LOOPHOLE[Inline.HighHalf[int]]) # 0 THEN ERROR;
        deltaBytes _ LOOPHOLE[Inline.LowHalf[int]];
        FOR i:NAT IN [0..1] DO
          FileWriter.WriteChar[deltaBytes[i], control];
          ENDLOOP };
      ControlLen: PROC RETURNS [len: INT] = INLINE {
        -- Number of control bytes written so far.
        RETURN [LONG[FileWriter.blockSize]*control.blockCount+control.block.length] };
      -- the two writers must have been created as a pair
      IF control.kind # control THEN ERROR;
      IF control.toRope # toRope THEN ERROR;
      IF control.closeStream # data.closeStream THEN ERROR;
      IF control.stream # data.stream THEN ERROR;
      filePropsOffset _ ControlLen[];
      -- nothing is written between these two measurements here, so filePropsLen is 0
      filePropsLen _ ControlLen[]-filePropsOffset;
      -- trailer: id, pad flag, version, flags, props length, control start, file length
      FileWriter.WriteChar[T2FileOps.controlTrailerId, control];
      FileWriter.WriteChar[
        IF pad THEN T2FileOps.DataPaddedFlag ELSE T2FileOps.DataUnpaddedFlag, control];
      FileWriter.WriteChar[T2FileOps.versionNumber, control];
      FileWriter.WriteChar[flags, control];
      WriteControlCardinal[filePropsLen];
      WriteControlInt[controlStart]; -- index of first control byte
      controlLen _ ControlLen[];
      WriteControlInt[controlStart+controlLen+4]; -- file length (+4 accounts for self)
      -- Count the control blocks only now that the last trailer byte is written.
      -- The final four bytes can spill into a fresh block; a count taken before
      -- them would then be one short and MakeRope would omit the last block.
      controlBlocks _ control.blockCount+1; -- counts number of full and partial blocks needed
      -- NOTE(review): chars is assigned but not read in the visible code. It refers to
      -- the oldest control block (blockList is newest-first at this point), which
      -- holds the two bytes reserved by CreateWriters. Confirm nothing is missing here.
      chars _ IF control.blockList=NIL THEN control.block
        ELSE LastBlock[control.blockList] };
    [controlList, controlBlock] _ FinishWriter[control] }; -- reverse blocklist, free writer structure
  IF data.kind # data THEN ERROR; -- checking for impossible error
  dataBlock _ data.block;
  IF toRope THEN { -- return a balanced rope
    dataList _ Reverse[data.blockList];
    output _ MakeRope[0, dataBlocks+controlBlocks] }
  ELSE {
    -- let the consumer process drain the queued data blocks, then write the rest here
    MonitoredQueue.Close[data.blockQueue];
    TRUSTED {JOIN data.consumer};
    FreeMQ[data.blockQueue];
    IO.PutBlock[stream, dataBlock];
    FreeBLOCK[dataBlock];
    IF control # NIL THEN {
      FOR lst:FileWriter.BlockList _ controlList, lst.next UNTIL lst=NIL DO
        IF ~textOnly THEN IO.PutBlock[stream, lst.block];
        FreeBLOCK[lst.block];
        ENDLOOP;
      IF ~textOnly THEN IO.PutBlock[stream, controlBlock];
      FreeBLOCK[controlBlock] }};
  count _ controlStart+controlLen;
  dataLen _ controlStart;
  IF data.closeStream THEN IO.Close[stream];
  FreeFileWriter[data] };

OpenC: PUBLIC PROC [
  capability: File.Capability, start: FileWriter.Offset _ 0, makeControl: BOOLEAN _ TRUE]
  RETURNS [control, data: FileWriter.Ref] = {
  -- Creates writers on a stream opened from capability. The stream is set to length
  -- start and positioned at start, and it is closed by Close.
  stream: IO.Handle = FileIO.StreamFromCapability[capability, overwrite];
  IO.SetLength[stream, start];
  IO.SetIndex[stream, start];
  [control, data] _ CreateWriters[stream, FALSE, makeControl, TRUE] };

ToRope: PUBLIC PROC [makeControl: BOOLEAN _ TRUE]
  RETURNS [control, data: FileWriter.Ref] = {
  -- Creates writers with no stream; Close returns their contents as a rope.
  [control, data] _ CreateWriters[NIL, TRUE, makeControl, FALSE] };

ToStream: PUBLIC PROC [
  stream: IO.Handle, makeControl: BOOLEAN _ TRUE]
  RETURNS [control, data: FileWriter.Ref] = {
  -- Creates writers on a caller-supplied stream; Close leaves that stream open.
  [control, data] _ CreateWriters[stream, FALSE, makeControl, FALSE] };

<<--------------------------------------------------------------->>
-- Cache of monitored queues
-- q1 and q2 hold up to two reset queues for reuse.

q1, q2: MonitoredQueue.MQ;

FreeMQ: ENTRY PROC [q: MonitoredQueue.MQ] = { ENABLE UNWIND => NULL;
  -- Resets q and keeps it in a free slot; if both slots are taken q is not retained.
  IF q=NIL THEN RETURN;
  MonitoredQueue.Reset[q];
  IF q1 = NIL THEN q1 _ q
  ELSE IF q2 = NIL THEN q2 _ q };

GetMQ: ENTRY PROC RETURNS [q: MonitoredQueue.MQ] = { ENABLE UNWIND => NULL;
  -- Returns a cached queue if there is one, otherwise creates a new queue.
  IF q1 # NIL THEN { q _ q1; q1 _ NIL }
  ELSE IF q2 # NIL THEN { q _ q2; q2 _ NIL }
  ELSE q _ MonitoredQueue.Create[] };

<<--------------------------------------------------------------->>
-- Cache of writer records
-- fw1 and fw2 hold up to two cleared writer records for reuse.

fw1, fw2: FileWriter.Ref;

CreateWriters: PROC [
  stream: IO.Handle, toRope: BOOLEAN _ FALSE, makeControl, closeStream: BOOLEAN _ TRUE]
  RETURNS [control, data: FileWriter.Ref] = {
  -- Makes the data writer and, if makeControl, the control writer. The control writer
  -- starts with two 0C characters already written. Unless toRope, a consumer process
  -- is forked to write data blocks as they are queued.
  MakeOne: PROC [kind: FileWriter.OfWriter] RETURNS [writer: FileWriter.Ref] = {
    -- Obtains a writer record and initializes it with a fresh block.
    writer _ GetFileWriter[];
    writer.kind _ kind;
    writer.block _ NewBLOCK[];
    writer.stream _ stream;
    writer.toRope _ toRope;
    writer.closeStream _ closeStream };
  IF makeControl THEN { -- make control writer
    control _ MakeOne[control];
    THROUGH [0..1] DO FileWriter.WriteChar[0C, control]; ENDLOOP };
  data _ MakeOne[data];
  IF ~toRope THEN { -- fork process to write data as it is produced
    data.blockQueue _ GetMQ[];
    data.consumer _ FORK Consumer[data] }};

FreeFileWriter: ENTRY PROC [fw: FileWriter.Ref] = { ENABLE UNWIND => NULL;
  -- Clears fw, marks it unused and keeps it in a free slot; if both slots are
  -- taken fw is not retained.
  fw.block _ NIL;
  fw.blockList _ NIL;
  fw.blockQueue _ NIL;
  fw.blockCount _ 0;
  fw.stream _ NIL;
  fw.kind _ unused;
  IF fw1 = NIL THEN fw1 _ fw
  ELSE IF fw2 = NIL THEN fw2 _ fw };

GetFileWriter: ENTRY PROC RETURNS [fw: FileWriter.Ref] = { ENABLE UNWIND => NULL;
  -- Returns a cached writer record if there is one, otherwise allocates a new one.
  -- ERROR is raised if the record is not marked unused.
  IF fw1 # NIL THEN { fw _ fw1; fw1 _ NIL }
  ELSE IF fw2 # NIL THEN { fw _ fw2; fw2 _ NIL }
  ELSE fw _ NEW[FileWriter.FileWriterBody];
  IF fw.kind # unused THEN ERROR };
<<--------------------------------------------------------------->>
-- Block cache
-- "blocks" is a fixed pool of text blocks. GetBLOCK takes a block out of the pool,
-- waiting if the pool is empty; FreeBLOCK puts a block back when a slot is free.

blks: NAT = 8;
BlockArray: TYPE = ARRAY [0..blks) OF REF TEXT;
blocks: REF BlockArray _ NEW[BlockArray];
freeBlock: CONDITION;

BumpWriter: PUBLIC PROC [writer: FileWriter.Ref] RETURNS [block: REF TEXT] = {
  -- Retires the current block of writer and installs a replacement, which is
  -- also returned. A data writer that is not building a rope hands the retired
  -- block to its queue and takes the replacement from the pool; any other writer
  -- pushes the retired block on its block list and allocates the replacement.
  retired: REF TEXT = writer.block;
  queued: BOOLEAN = (writer.kind=data AND ~writer.toRope);
  writer.blockCount _ writer.blockCount+1; -- number added to queue/list
  IF queued THEN {
    MonitoredQueue.Add[retired, writer.blockQueue];
    block _ GetBLOCK[] } -- get a block from the cache
  ELSE {
    writer.blockList _ NEW[FileWriter.BlockListBody _ [retired, writer.blockList]];
    block _ NewBLOCK[] };
  writer.block _ block };

Consumer: PROC [data: FileWriter.Ref] = {
  -- Body of the forked process: removes blocks from the queue of data, writes
  -- each to the stream of data and returns it to the pool. Finishes when the
  -- queue signals EndOfQueue; ERROR if a queued item is not a REF TEXT.
  out: IO.Handle _ data.stream;
  pending: MonitoredQueue.MQ _ data.blockQueue;
  DO
    item: REF ANY _ MonitoredQueue.Remove[pending ! MonitoredQueue.EndOfQueue => EXIT];
    WITH item SELECT FROM
      text: REF TEXT => {
        IO.PutBlock[out, text];
        FreeBLOCK[text] };
      ENDCASE => ERROR;
    ENDLOOP };

FreeBLOCK: ENTRY PROC [b: REF TEXT] = {
  -- Stores b in the first empty pool slot and wakes every waiter in GetBLOCK.
  -- If no slot is empty, b is simply not retained.
  ENABLE UNWIND => NULL;
  FOR slot: NAT IN [0..blks) DO
    IF blocks[slot] # NIL THEN LOOP; -- occupied, try the next slot
    blocks[slot] _ b;
    BROADCAST freeBlock;
    RETURN;
    ENDLOOP };

GetBLOCK: ENTRY PROC RETURNS [b: REF TEXT] = {
  -- Takes the first available block out of the pool and sets its length to 0.
  -- Waits on freeBlock and rescans for as long as the pool is empty.
  ENABLE UNWIND => NULL;
  DO
    FOR slot: NAT IN [0..blks) DO
      IF blocks[slot] = NIL THEN LOOP; -- empty, try the next slot
      b _ blocks[slot];
      blocks[slot] _ NIL;
      b.length _ 0;
      RETURN [b];
      ENDLOOP;
    WAIT freeBlock;
    ENDLOOP };

NewBLOCK: PROC RETURNS [b: REF TEXT] = INLINE {
  -- Allocates a fresh text block able to hold FileWriter.blockSize characters.
  b _ NEW[TEXT[FileWriter.blockSize]] };

-- module initialization: fill every pool slot with a fresh block
FOR slot: CARDINAL IN [0..blks) DO
  blocks[slot] _ NewBLOCK[];
  ENDLOOP;

END.