<> <> <> <> <> DIRECTORY Basics, FS, IO, IODefs, NStripeDefs, PartitionDefs, Rope; Partition: CEDAR PROGRAM IMPORTS FS, IO, IODefs EXPORTS PartitionDefs = BEGIN OPEN PartitionDefs; partitionFile: IO.STREAM _ NIL; firstAvailableIndex: INT _ 0; -- >=0 means writing, <0 means reading PartitionRec: TYPE = RECORD [ firstIndex, futureIndex: INT _ -1 , buf: REF TEXT _ NIL ]; Partition: TYPE = REF PartitionRec _ NIL; StripeArrayRec: TYPE = ARRAY stripeNumber OF Partition; StripeArray: TYPE = REF StripeArrayRec _ NIL; LayerArrayRec: TYPE = ARRAY layerNumber OF StripeArray; layerArray: REF LayerArrayRec; partitionLength: NAT = 512; headerLength: NAT = Basics.bytesPerWord*SIZE[INT]; bufLength: NAT = partitionLength-headerLength; InitPartitions: PUBLIC PROC [FileName: Rope.ROPE] = BEGIN partitionFile _ FS.StreamOpen[fileName: FileName, accessOptions: $create, createByteCount: 10000000 -- large initial extent -- ]; firstAvailableIndex _ 0; layerArray _ NEW[LayerArrayRec]; END; DestroyPartitions: PUBLIC PROC RETURNS [fileLength: INT] = BEGIN layerArray _ NIL; IF partitionFile # NIL THEN BEGIN fileLength _ partitionFile.GetLength[]; partitionFile.Close[]; partitionFile _ NIL; END ELSE fileLength _ 0; END; SendObjectToPartition: PUBLIC PROC [Stripe: stripeNumber, Layer: layerNumber, object: Basics.UnsafeBlock] = BEGIN size: BYTE; s: StripeArray; p: Partition; WHILE (s _ layerArray[Layer]) = NIL DO layerArray[Layer] _ NEW[StripeArrayRec]; ENDLOOP; WHILE (p _ s[Stripe]) = NIL DO s[Stripe] _ NEW[PartitionRec _ [buf: NEW[TEXT[bufLength]]]]; s[Stripe].buf.length _ 0; ENDLOOP; size _ object.count; TRUSTED {PartitionPutChar[p, LOOPHOLE[size, CHAR]]}; PartitionUnsafePutBlock[p, object]; END; PartitionPutChar: PROC [p: Partition, c: CHAR] = BEGIN IF p.buf.length>=p.buf.maxLength THEN EmptyFilledBuffer[p]; p.buf[p.buf.length] _ c; p.buf.length _ p.buf.length+1; END; PartitionUnsafePutBlock: PROC [ p: Partition, block: Basics.UnsafeBlock ] = BEGIN FOR i: INT IN [block.startIndex..block.startIndex+block.count) DO base: LONG POINTER TO PACKED ARRAY [0..0) OF BYTE _ LOOPHOLE [block.base]; TRUSTED { PartitionPutChar[p, LOOPHOLE [base[i], CHAR]] }; ENDLOOP; END; EmptyFilledBuffer: PROC [p: Partition] = BEGIN IF p.buf.length # p.buf.maxLength THEN ERROR; IF p.firstIndex<0 THEN p.firstIndex _ p.futureIndex _ AllocateIndex[]; partitionFile.SetIndex[p.futureIndex ! IO.EndOfStream => {LengthenFile[partitionFile]; RETRY}]; p.futureIndex _ AllocateIndex[]; PutInt[partitionFile, p.futureIndex]; partitionFile.PutBlock[p.buf, 0, p.buf.length]; p.buf.length _ 0; END; LengthenFile: PROC [ self: IO.STREAM, delta: INT _ 2000000 ] = { self.SetLength[self.GetLength+delta] }; FinishWritingPartitions: PROC = BEGIN IF firstAvailableIndex>=0 THEN BEGIN -- stop writing and set up for reading sa: StripeArray; p: Partition; FOR l: layerNumber IN layerNumber DO IF (sa _ layerArray[l]) # NIL THEN FOR s: stripeNumber IN stripeNumber DO IF (p _ sa[s]) # NIL THEN BEGIN IODefs.PostIt[IO.PutFR["Finishing partition layer %g, stripe %g...", IO.int[l], IO.int[s]]]; FinishPartition[p]; END; ENDLOOP; ENDLOOP; firstAvailableIndex _ -1; END; END; FinishPartition: PROC [p: Partition] = BEGIN IF p.buf.length=0 THEN BEGIN IF p.firstIndex # -1 THEN ERROR; END ELSE BEGIN IF p.firstIndex<0 THEN p.firstIndex _ p.futureIndex _ AllocateIndex[p.buf.length]; partitionFile.SetIndex[p.futureIndex ! IO.EndOfStream => {LengthenFile[partitionFile]; RETRY}]; PutInt[partitionFile, -p.buf.length]; partitionFile.PutBlock[p.buf, 0, p.buf.length]; END; p.buf _ NIL; END; AllocateIndex: PROC [dataLength: NAT _ bufLength] RETURNS [index: INT] = BEGIN index _ firstAvailableIndex; firstAvailableIndex _ firstAvailableIndex+headerLength+dataLength; END; PartitionStream: TYPE = REF PartitionStreamRec _ NIL; PartitionStreamRec: TYPE = RECORD [ futureIndex: INT _ -1, bytesLeftInPartition: NAT _ 0 ]; readPartitionStreamProcs: REF IO.StreamProcs _ IO.CreateStreamProcs[ variety: input, class: $MEBESPartition, unsafeGetBlock: PartitionUnsafeGetBlock]; ReadPartitionCreate: PUBLIC PROC [Stripe: stripeNumber, Layer: layerNumber] RETURNS [ s: IO.STREAM ] = BEGIN sa: StripeArray; p: Partition; FinishWritingPartitions[]; -- in case this is the first call on ReadPartitionCreate... IF (sa _ layerArray[Layer]) = NIL THEN RETURN[NIL]; IF (p _ sa[Stripe]) = NIL THEN RETURN[NIL]; s _ IO.CreateStream[ streamProcs: readPartitionStreamProcs, streamData: NEW[PartitionStreamRec _ [futureIndex: p.firstIndex]]]; END; smallestObject: NAT _ Basics.bytesPerWord*MIN[ SIZE[NStripeDefs.Rectangle], SIZE[NStripeDefs.BasicQuadrilateral], SIZE[NStripeDefs.Trapezoid3]]; largestObject: NAT _ Basics.bytesPerWord*MAX[ SIZE[NStripeDefs.Rectangle], SIZE[NStripeDefs.BasicQuadrilateral], SIZE[NStripeDefs.Trapezoid3]]; ReadObject: PUBLIC PROC [ s: IO.STREAM, object: REF TEXT ] = BEGIN length: BYTE; TRUSTED {length _ LOOPHOLE[s.GetChar[]]}; IF NOT (length IN [smallestObject..largestObject]) THEN ERROR; IF s.GetBlock[block: object, startIndex: 0, count: length] # length THEN ERROR; END; PartitionUnsafeGetBlock: PROC [ self: IO.STREAM, block: Basics.UnsafeBlock ] RETURNS [ nBytesRead: INT ] = BEGIN p: PartitionStream = NARROW[self.streamData]; startIndex: INT _ block.startIndex; nBytesRead _ 0; WHILE nBytesRead0 THEN EXIT ELSE IO.Error[Overflow, self]}; p.bytesLeftInPartition _ p.bytesLeftInPartition-subCount; startIndex _ startIndex+subCount; nBytesRead _ nBytesRead+subCount; ENDLOOP; END; PutInt: PROC [ self: IO.STREAM, n: INT ] = TRUSTED BEGIN self.UnsafePutBlock[ [LOOPHOLE[LONG[@n]], 0, Basics.bytesPerWord*SIZE[INT]]]; END; GetInt: PROC [ self: IO.STREAM ] RETURNS [ n: INT ] = TRUSTED BEGIN IF self.UnsafeGetBlock[ [LOOPHOLE[LONG[@n]], 0, Basics.bytesPerWord*SIZE[INT]]] # Basics.bytesPerWord*SIZE[INT] THEN ERROR IO.Error[Overflow, self]; END; END.