DIRECTORY
Basics, FS, IO, IODefs, NStripeDefs, PartitionDefs, Rope;
Partition:
CEDAR
PROGRAM
IMPORTS
FS,
IO, IODefs
EXPORTS PartitionDefs =
BEGIN OPEN PartitionDefs;
-- Module state and on-disk layout constants for the partition file.
-- The single stream backing every partition; NIL while no file is open.
partitionFile: IO.STREAM ← NIL;
-- Next unallocated byte offset in partitionFile; advanced by AllocateIndex and set to -1 by FinishWritingPartitions.
firstAvailableIndex: INT ← 0; -- >=0 means writing, <0 means reading
-- Write-side state of one partition: firstIndex is the file offset of its first chunk,
-- futureIndex the offset reserved for the next chunk to be written, buf the chunk being filled in memory.
-- Both indices stay at -1 until a chunk has been allocated.
PartitionRec: TYPE = RECORD [ firstIndex, futureIndex: INT ← -1 , buf: REF TEXT ← NIL ];
Partition: TYPE = REF PartitionRec ← NIL;
-- Partitions of one layer, indexed by stripe; entries are created on first use by SendObjectToPartition.
StripeArrayRec: TYPE = ARRAY stripeNumber OF Partition;
StripeArray: TYPE = REF StripeArrayRec ← NIL;
-- One stripe table per layer; entries are created on first use by SendObjectToPartition.
LayerArrayRec: TYPE = ARRAY layerNumber OF StripeArray;
layerArray: REF LayerArrayRec;
-- Size in bytes of one full chunk in the file (header plus data).
partitionLength: NAT = 512;
-- Size in bytes of the INT header that PutInt writes at the start of each chunk.
headerLength: NAT = Basics.bytesPerWord*SIZE[INT];
-- Data bytes per full chunk, so that header plus data equals partitionLength.
bufLength: NAT = partitionLength-headerLength;
-- Creates the partition file named FileName and resets the module for writing:
-- the allocator restarts at offset 0 and a fresh, empty layer table is installed.
-- If a partition file is still open from an earlier call that was never matched by
-- DestroyPartitions, that stream is closed first so it is not leaked.
InitPartitions: PUBLIC PROC [FileName: Rope.ROPE] = {
  IF partitionFile # NIL THEN {
    partitionFile.Close[];
    partitionFile ← NIL;
    };
  partitionFile ← FS.StreamOpen[fileName: FileName, accessOptions: $create, createByteCount: 10000000 -- large initial extent -- ];
  firstAvailableIndex ← 0;
  layerArray ← NEW[LayerArrayRec];
  };
-- Drops the in-memory partition tables and closes the partition file.
-- Returns the length of the file as reported just before closing, or 0 when no file was open.
DestroyPartitions: PUBLIC PROC RETURNS [fileLength: INT] = {
  layerArray ← NIL;
  IF partitionFile = NIL THEN RETURN [0]; -- nothing open, nothing to measure
  fileLength ← partitionFile.GetLength[];
  partitionFile.Close[];
  partitionFile ← NIL;
  };
-- Appends one object to the partition selected by Layer and Stripe.
-- The object is stored as a single length byte followed by the bytes described by the block.
-- The layer's stripe table and the partition record are allocated the first time they are needed.
SendObjectToPartition: PUBLIC PROC [Stripe: stripeNumber, Layer: layerNumber, object: Basics.UnsafeBlock] = {
  byteCount: BYTE;
  stripes: StripeArray ← layerArray[Layer];
  part: Partition;
  IF stripes = NIL THEN {
    stripes ← NEW[StripeArrayRec];
    layerArray[Layer] ← stripes;
    };
  part ← stripes[Stripe];
  IF part = NIL THEN {
    part ← NEW[PartitionRec ← [buf: NEW[TEXT[bufLength]]]];
    part.buf.length ← 0;
    stripes[Stripe] ← part;
    };
  byteCount ← object.count; -- the length prefix is a single BYTE
  TRUSTED {PartitionPutChar[part, LOOPHOLE[byteCount, CHAR]]};
  PartitionUnsafePutBlock[part, object];
  };
-- Appends one character to the partition's in-memory chunk.
-- A chunk that is already full is written to the file (EmptyFilledBuffer) before the character is stored.
PartitionPutChar: PROC [p: Partition, c: CHAR] = {
  chunk: REF TEXT = p.buf;
  IF chunk.length >= chunk.maxLength THEN EmptyFilledBuffer[p];
  chunk[chunk.length] ← c;
  chunk.length ← chunk.length + 1;
  };
-- Appends the bytes block.base[startIndex .. startIndex+count) to the partition, one character at a time.
PartitionUnsafePutBlock: PROC [p: Partition, block: Basics.UnsafeBlock] = {
  -- view the raw block as an open-ended packed byte array so it can be indexed directly
  bytes: LONG POINTER TO PACKED ARRAY [0..0) OF BYTE ← LOOPHOLE[block.base];
  limit: INT = block.startIndex + block.count;
  FOR i: INT ← block.startIndex, i+1 WHILE i < limit DO
    TRUSTED {PartitionPutChar[p, LOOPHOLE[bytes[i], CHAR]]};
    ENDLOOP;
  };
-- Writes a completely full in-memory chunk to the partition file and empties it.
-- The chunk goes to the offset reserved in p.futureIndex (allocated now if this is the partition's first chunk);
-- its header is the offset newly reserved for the following chunk, which links the chunks into a chain.
-- Raises ERROR if the chunk is not exactly full.
EmptyFilledBuffer: PROC [p: Partition] = {
  full: REF TEXT = p.buf;
  IF full.length # full.maxLength THEN ERROR;
  IF p.firstIndex < 0 THEN {
    -- first chunk of this partition: reserve its slot and remember where the chain starts
    p.futureIndex ← AllocateIndex[];
    p.firstIndex ← p.futureIndex;
    };
  partitionFile.SetIndex[p.futureIndex ! IO.EndOfStream => {LengthenFile[partitionFile]; RETRY}];
  p.futureIndex ← AllocateIndex[]; -- slot for the chunk after this one
  PutInt[partitionFile, p.futureIndex];
  partitionFile.PutBlock[full, 0, full.length];
  full.length ← 0;
  };
-- Extends the stream's length by delta bytes (2000000 by default).
LengthenFile: PROC [self: IO.STREAM, delta: INT ← 2000000] = {
  grownLength: INT = self.GetLength[] + delta;
  self.SetLength[grownLength];
  };
-- Switches the module from writing to reading.
-- Every existing partition is finished (FinishPartition), with a progress message posted for each,
-- and firstAvailableIndex is set negative so later calls do nothing.
FinishWritingPartitions: PROC = {
  IF firstAvailableIndex < 0 THEN RETURN; -- already set up for reading
  -- stop writing and set up for reading
  FOR layer: layerNumber IN layerNumber DO
    stripes: StripeArray = layerArray[layer];
    IF stripes # NIL THEN
      FOR stripe: stripeNumber IN stripeNumber DO
        part: Partition = stripes[stripe];
        IF part # NIL THEN {
          IODefs.PostIt[IO.PutFR["Finishing partition layer %g, stripe %g...", IO.int[layer], IO.int[stripe]]];
          FinishPartition[part];
          };
        ENDLOOP;
    ENDLOOP;
  firstAvailableIndex ← -1;
  };
-- Writes the partition's last, partially filled chunk and releases its buffer.
-- The final chunk's header is the negated count of its data bytes, which is how the reader recognizes the end of the chain.
-- An empty buffer is only legal for a partition that never had a chunk allocated; otherwise ERROR is raised.
FinishPartition: PROC [p: Partition] = {
  pending: NAT = p.buf.length;
  IF pending # 0 THEN {
    -- a partition whose only chunk is the last one reserves just the space it needs
    IF p.firstIndex < 0 THEN p.firstIndex ← p.futureIndex ← AllocateIndex[pending];
    partitionFile.SetIndex[p.futureIndex ! IO.EndOfStream => {LengthenFile[partitionFile]; RETRY}];
    PutInt[partitionFile, -pending];
    partitionFile.PutBlock[p.buf, 0, pending];
    }
  ELSE IF p.firstIndex # -1 THEN ERROR;
  p.buf ← NIL;
  };
-- Reserves room in the partition file for one chunk (header plus dataLength data bytes).
-- Returns the offset of the reserved chunk and advances firstAvailableIndex past it.
AllocateIndex: PROC [dataLength: NAT ← bufLength] RETURNS [index: INT] = {
  index ← firstAvailableIndex;
  firstAvailableIndex ← index + headerLength + dataLength;
  };
-- Read-side state kept in the streamData of each stream made by ReadPartitionCreate.
PartitionStream: TYPE = REF PartitionStreamRec ← NIL;
PartitionStreamRec:
TYPE =
RECORD [
-- file offset of the next chunk to read; a negative value means there is no further chunk
futureIndex: INT ← -1,
-- data bytes of the current chunk that have not been handed out yet
bytesLeftInPartition: NAT ← 0
];
-- Stream procedures shared by all partition read streams; only unsafeGetBlock is supplied explicitly here.
readPartitionStreamProcs:
REF
IO.StreamProcs ←
IO.CreateStreamProcs[
variety: input,
class: $MEBESPartition,
unsafeGetBlock: PartitionUnsafeGetBlock];
-- Returns an input stream over the objects written to the partition selected by Layer and Stripe,
-- or NIL when nothing was ever sent to that partition.
-- The first call also ends the writing phase (FinishWritingPartitions).
ReadPartitionCreate: PUBLIC PROC [Stripe: stripeNumber, Layer: layerNumber] RETURNS [s: IO.STREAM] = {
  stripes: StripeArray;
  part: Partition;
  FinishWritingPartitions[]; -- in case this is the first call on ReadPartitionCreate...
  stripes ← layerArray[Layer];
  IF stripes = NIL THEN RETURN [NIL];
  part ← stripes[Stripe];
  IF part = NIL THEN RETURN [NIL];
  -- the stream starts at the partition's first chunk
  RETURN [IO.CreateStream[
    streamProcs: readPartitionStreamProcs,
    streamData: NEW[PartitionStreamRec ← [futureIndex: part.firstIndex]]]];
  };
-- Byte sizes of the smallest and largest object record kinds that may appear in a partition.
-- ReadObject rejects any length prefix outside this range.
-- Declared as constants (=), like partitionLength, headerLength and bufLength, since nothing in this module assigns to them.
smallestObject: NAT = Basics.bytesPerWord*
  MIN[
    SIZE[NStripeDefs.Rectangle],
    SIZE[NStripeDefs.BasicQuadrilateral],
    SIZE[NStripeDefs.Trapezoid3]];
largestObject: NAT = Basics.bytesPerWord*
  MAX[
    SIZE[NStripeDefs.Rectangle],
    SIZE[NStripeDefs.BasicQuadrilateral],
    SIZE[NStripeDefs.Trapezoid3]];
-- Reads the next object from partition stream s into object.
-- The stream holds a one-byte length followed by that many bytes.
-- Raises ERROR when the length is outside [smallestObject..largestObject] or fewer bytes than announced can be read.
ReadObject: PUBLIC PROC [s: IO.STREAM, object: REF TEXT] = {
  objectBytes: BYTE;
  TRUSTED {objectBytes ← LOOPHOLE[s.GetChar[]]};
  IF objectBytes < smallestObject OR objectBytes > largestObject THEN ERROR;
  IF s.GetBlock[block: object, startIndex: 0, count: objectBytes] # objectBytes THEN ERROR;
  };
-- unsafeGetBlock procedure for partition read streams.
-- Fills block with up to block.count bytes of partition data, following the chain of chunks in partitionFile,
-- and returns the number of bytes delivered (fewer than requested once the chain is exhausted).
-- NOTE(review): partitionFile's position is shared by all partition streams and SetIndex is issued only when a
-- new chunk is started, so reads from two streams interleaved in mid-chunk would presumably continue at the
-- wrong file position. Confirm that callers drain one stream at a time.
PartitionUnsafeGetBlock:
PROC [ self:
IO.
STREAM, block: Basics.UnsafeBlock ]
RETURNS [ nBytesRead:
INT ] =
BEGIN
p: PartitionStream = NARROW[self.streamData];
-- next position to fill in the caller's block
startIndex: INT ← block.startIndex;
nBytesRead ← 0;
WHILE nBytesRead<block.count
DO
subCount: NAT;
IF p.bytesLeftInPartition=0
THEN
BEGIN
-- current chunk is used up: move to the next one, if any
IF p.futureIndex<0 THEN RETURN; -- not allowed to return EndOfStream
partitionFile.SetIndex[p.futureIndex];
-- the chunk header is either the offset of the following chunk or, for the last chunk, minus its data length
p.futureIndex ← GetInt[partitionFile];
p.bytesLeftInPartition ← IF p.futureIndex<0 THEN -p.futureIndex ELSE bufLength;
END;
-- never read past the end of the current chunk or past what the caller asked for
TRUSTED {subCount ← partitionFile.UnsafeGetBlock[[block.base, startIndex, MIN[p.bytesLeftInPartition, block.count-nBytesRead]]]};
IF subCount = 0
THEN
-- the file ended inside a chunk: give back what was read so far, or invoke IO.Error if nothing was read
-- NOTE(review): GetInt raises the same error as ERROR IO.Error[...]; the bare call here is presumably equivalent. Confirm.
{IF nBytesRead>0 THEN EXIT ELSE IO.Error[Overflow, self]};
p.bytesLeftInPartition ← p.bytesLeftInPartition-subCount;
startIndex ← startIndex+subCount;
nBytesRead ← nBytesRead+subCount;
ENDLOOP;
END;
-- Writes the in-memory representation of n (Basics.bytesPerWord*SIZE[INT] bytes) to the stream at its current position.
PutInt: PROC [self: IO.STREAM, n: INT] = TRUSTED {
  self.UnsafePutBlock[[
    base: LOOPHOLE[LONG[@n]],
    startIndex: 0,
    count: Basics.bytesPerWord*SIZE[INT]]];
  };
-- Reads one INT, in the in-memory representation written by PutInt, from the stream's current position.
-- Raises IO.Error[Overflow, self] when fewer bytes than a whole INT are available.
GetInt: PROC [self: IO.STREAM] RETURNS [n: INT] = TRUSTED {
  wanted: INT = Basics.bytesPerWord*SIZE[INT];
  got: INT = self.UnsafeGetBlock[[LOOPHOLE[LONG[@n]], 0, wanted]];
  IF got # wanted THEN ERROR IO.Error[Overflow, self];
  };
END.