Partition.mesa
Updated: April 7, 1983 10:51 AM
Updated: March 16, 1981 1:09 PM by TRS
Last Edited by: McCreight, July 2, 1985 10:16:32 am PDT
Bertrand Serlet May 21, 1987 4:24:13 pm PDT
DIRECTORY
Basics, FS, IO, IODefs, NStripeDefs, PartitionDefs, Rope;
Partition: CEDAR PROGRAM IMPORTS FS, IO, IODefs EXPORTS PartitionDefs =
BEGIN OPEN PartitionDefs;
partitionFile: IO.STREAMNIL;
firstAvailableIndex: INT ← 0; -- >=0 means writing, <0 means reading
PartitionRec: TYPE = RECORD [ firstIndex, futureIndex: INT ← -1 , buf: REF TEXTNIL ];
Partition: TYPE = REF PartitionRec ← NIL;
StripeArrayRec: TYPE = ARRAY stripeNumber OF Partition;
StripeArray: TYPE = REF StripeArrayRec ← NIL;
LayerArrayRec: TYPE = ARRAY layerNumber OF StripeArray;
layerArray: REF LayerArrayRec;
partitionLength: NAT = 512;
headerLength: NAT = Basics.bytesPerWord*SIZE[INT];
bufLength: NAT = partitionLength-headerLength;
InitPartitions: PUBLIC PROC [FileName: Rope.ROPE] =
BEGIN
partitionFile ← FS.StreamOpen[fileName: FileName, accessOptions: $create, createByteCount: 10000000 -- large initial extent -- ];
firstAvailableIndex ← 0;
layerArray ← NEW[LayerArrayRec];
END;
DestroyPartitions: PUBLIC PROC RETURNS [fileLength: INT] =
BEGIN
layerArray ← NIL;
IF partitionFile # NIL THEN
BEGIN
fileLength ← partitionFile.GetLength[];
partitionFile.Close[];
partitionFile ← NIL;
END
ELSE fileLength ← 0;
END;
SendObjectToPartition: PUBLIC PROC [Stripe: stripeNumber, Layer: layerNumber,
object: Basics.UnsafeBlock] =
BEGIN
size: BYTE;
s: StripeArray;
p: Partition;
WHILE (s ← layerArray[Layer]) = NIL DO
layerArray[Layer] ← NEW[StripeArrayRec];
ENDLOOP;
WHILE (p ← s[Stripe]) = NIL DO
s[Stripe] ← NEW[PartitionRec ← [buf: NEW[TEXT[bufLength]]]];
s[Stripe].buf.length ← 0;
ENDLOOP;
size ← object.count;
TRUSTED {PartitionPutChar[p, LOOPHOLE[size, CHAR]]};
PartitionUnsafePutBlock[p, object];
END;
PartitionPutChar: PROC [p: Partition, c: CHAR] =
BEGIN
IF p.buf.length>=p.buf.maxLength THEN EmptyFilledBuffer[p];
p.buf[p.buf.length] ← c;
p.buf.length ← p.buf.length+1;
END;
PartitionUnsafePutBlock: PROC [ p: Partition, block: Basics.UnsafeBlock ] =
BEGIN
FOR i: INT IN [block.startIndex..block.startIndex+block.count) DO
base: LONG POINTER TO PACKED ARRAY [0..0) OF BYTELOOPHOLE [block.base];
TRUSTED { PartitionPutChar[p, LOOPHOLE [base[i], CHAR]] };
ENDLOOP;
END;
EmptyFilledBuffer: PROC [p: Partition] =
BEGIN
IF p.buf.length # p.buf.maxLength THEN ERROR;
IF p.firstIndex<0 THEN p.firstIndex ← p.futureIndex ← AllocateIndex[];
partitionFile.SetIndex[p.futureIndex
! IO.EndOfStream => {LengthenFile[partitionFile]; RETRY}];
p.futureIndex ← AllocateIndex[];
PutInt[partitionFile, p.futureIndex];
partitionFile.PutBlock[p.buf, 0, p.buf.length];
p.buf.length ← 0;
END;
LengthenFile: PROC [ self: IO.STREAM, delta: INT ← 2000000 ] =
{ self.SetLength[self.GetLength+delta] };
FinishWritingPartitions: PROC =
BEGIN
IF firstAvailableIndex>=0 THEN
BEGIN -- stop writing and set up for reading
sa: StripeArray;
p: Partition;
FOR l: layerNumber IN layerNumber DO
IF (sa ← layerArray[l]) # NIL THEN
FOR s: stripeNumber IN stripeNumber DO
IF (p ← sa[s]) # NIL THEN
BEGIN
IODefs.PostIt[IO.PutFR["Finishing partition layer %g, stripe %g...",
IO.int[l], IO.int[s]]];
FinishPartition[p];
END;
ENDLOOP;
ENDLOOP;
firstAvailableIndex ← -1;
END;
END;
FinishPartition: PROC [p: Partition] =
BEGIN
IF p.buf.length=0 THEN
BEGIN
IF p.firstIndex # -1 THEN ERROR;
END
ELSE
BEGIN
IF p.firstIndex<0 THEN p.firstIndex ← p.futureIndex ← AllocateIndex[p.buf.length];
partitionFile.SetIndex[p.futureIndex
! IO.EndOfStream => {LengthenFile[partitionFile]; RETRY}];
PutInt[partitionFile, -p.buf.length];
partitionFile.PutBlock[p.buf, 0, p.buf.length];
END;
p.buf ← NIL;
END;
AllocateIndex: PROC [dataLength: NAT ← bufLength] RETURNS [index: INT] =
BEGIN
index ← firstAvailableIndex;
firstAvailableIndex ← firstAvailableIndex+headerLength+dataLength;
END;
PartitionStream: TYPE = REF PartitionStreamRec ← NIL;
PartitionStreamRec: TYPE = RECORD [
futureIndex: INT ← -1,
bytesLeftInPartition: NAT ← 0
];
readPartitionStreamProcs: REF IO.StreamProcs ← IO.CreateStreamProcs[
variety: input,
class: $MEBESPartition,
unsafeGetBlock: PartitionUnsafeGetBlock];
ReadPartitionCreate: PUBLIC PROC [Stripe: stripeNumber, Layer: layerNumber]
RETURNS [ s: IO.STREAM ] =
BEGIN
sa: StripeArray;
p: Partition;
FinishWritingPartitions[]; -- in case this is the first call on ReadPartitionCreate...
IF (sa ← layerArray[Layer]) = NIL THEN RETURN[NIL];
IF (p ← sa[Stripe]) = NIL THEN RETURN[NIL];
s ← IO.CreateStream[
streamProcs: readPartitionStreamProcs,
streamData: NEW[PartitionStreamRec ← [futureIndex: p.firstIndex]]];
END;
smallestObject: NAT ← Basics.bytesPerWord*MIN[
SIZE[NStripeDefs.Rectangle],
SIZE[NStripeDefs.BasicQuadrilateral],
SIZE[NStripeDefs.Trapezoid3]];
largestObject: NAT ← Basics.bytesPerWord*MAX[
SIZE[NStripeDefs.Rectangle],
SIZE[NStripeDefs.BasicQuadrilateral],
SIZE[NStripeDefs.Trapezoid3]];
ReadObject: PUBLIC PROC [ s: IO.STREAM, object: REF TEXT ] =
BEGIN
length: BYTE;
TRUSTED {length ← LOOPHOLE[s.GetChar[]]};
IF NOT (length IN [smallestObject..largestObject]) THEN ERROR;
IF s.GetBlock[block: object, startIndex: 0, count: length] # length THEN ERROR;
END;
PartitionUnsafeGetBlock: PROC [ self: IO.STREAM, block: Basics.UnsafeBlock ] RETURNS [ nBytesRead: INT ] =
BEGIN
p: PartitionStream = NARROW[self.streamData];
startIndex: INT ← block.startIndex;
nBytesRead ← 0;
WHILE nBytesRead<block.count DO
subCount: NAT;
IF p.bytesLeftInPartition=0 THEN
BEGIN
IF p.futureIndex<0 THEN RETURN; -- not allowed to return EndOfStream
partitionFile.SetIndex[p.futureIndex];
p.futureIndex ← GetInt[partitionFile];
p.bytesLeftInPartition ← IF p.futureIndex<0 THEN -p.futureIndex ELSE bufLength;
END;
TRUSTED {subCount ← partitionFile.UnsafeGetBlock[[block.base, startIndex, MIN[p.bytesLeftInPartition, block.count-nBytesRead]]]};
IF subCount = 0 THEN
{IF nBytesRead>0 THEN EXIT ELSE IO.Error[Overflow, self]};
p.bytesLeftInPartition ← p.bytesLeftInPartition-subCount;
startIndex ← startIndex+subCount;
nBytesRead ← nBytesRead+subCount;
ENDLOOP;
END;
PutInt: PROC [ self: IO.STREAM, n: INT ] =
TRUSTED BEGIN
self.UnsafePutBlock[
[LOOPHOLE[LONG[@n]], 0, Basics.bytesPerWord*SIZE[INT]]];
END;
GetInt: PROC [ self: IO.STREAM ] RETURNS [ n: INT ] =
TRUSTED BEGIN
IF self.UnsafeGetBlock[
[LOOPHOLE[LONG[@n]], 0, Basics.bytesPerWord*SIZE[INT]]] # Basics.bytesPerWord*SIZE[INT] THEN ERROR IO.Error[Overflow, self];
END;
END.