diff --git a/src/main/scala/commands/PackageCommand.scala b/src/main/scala/commands/PackageCommand.scala
index ef88668..9cd8945 100644
--- a/src/main/scala/commands/PackageCommand.scala
+++ b/src/main/scala/commands/PackageCommand.scala
@@ -80,14 +80,19 @@ object PackageCommand extends Command:
         .zipWithIndex
         .buffer(32, OverflowStrategy.backpressure)
       val dsBroad = builder.add(Broadcast[(DatasetGraph, Long)](6))
+      val flatSerialize = packageFlatSerializeFlow(metadata)
+      val flatSerializeBroad = builder.add(Broadcast[ByteString](2))
+      val statsZip = builder.add(Zip[ByteString, (DatasetGraph, Long)]())
       val checksMerge = builder.add(Merge[Unit](2))
 
       in ~> inBroad
       inBroad ~> checkRdf4jFlow(metadata).async ~> checksMerge ~> sChecks
       inBroad ~> parseJenaBuffered.async ~> dsBroad ~> checkStructureFlow(metadata).async ~> checksMerge
-      dsBroad ~> statsFlow(stats).async ~> sStats
+      dsBroad ~> flatSerialize ~> flatSerializeBroad ~> sFlatPackage
+      flatSerializeBroad ~> statsZip.in0
+      dsBroad ~> statsZip.in1
+      statsZip.out ~> statsFlow(stats).async ~> sStats
       dsBroad ~> sStreamPackage
-      dsBroad ~> sFlatPackage
       dsBroad ~> sJellyPackage
       dsBroad ~> sSampleStream
 
@@ -204,17 +209,18 @@ object PackageCommand extends Command:
    * @return a flow that computes statistics
    */
   private def statsFlow(stats: StatCounterSuite):
-  Flow[(DatasetGraph, Long), (Long, StatCounterSuite.Result), NotUsed] =
-    Flow[(DatasetGraph, Long)]
-      .wireTap((_, num) => if (num + 1) % 100_000 == 0 then println(s"Stats stream at: ${num + 1}"))
-      .splitAfter((_, num) =>
-        val shouldSplit = Constants.packageSizes.contains(num + 1)
-        if shouldSplit then println(s"Splitting stats stream at ${num + 1}")
+  Flow[(ByteString, (DatasetGraph, Long)), (Long, StatCounterSuite.Result), NotUsed] =
+    Flow[(ByteString, (DatasetGraph, Long))]
+      .wireTap((_, y) => if (y._2 + 1) % 100_000 == 0 then println(s"Stats stream at: ${y._2 + 1}"))
+      .splitAfter((_, y) =>
+        val shouldSplit = Constants.packageSizes.contains(y._2 + 1)
+        if shouldSplit then println(s"Splitting stats stream at ${y._2 + 1}")
         shouldSplit
       )
       .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
-      .map((ds, num) => {
-        stats.add(ds)
+      .map((bytes, y) => {
+        val (ds, num) = y
+        stats.add(ds, bytes)
         num + 1
       })
       .reduce((a, b) => a.max(b))
@@ -318,23 +324,7 @@ object PackageCommand extends Command:
       }
     StreamUtil.broadcastSink(sinks)
 
-  /**
-   * Creates a sink that writes the data to flat files
-   * @param metadata the metadata of the dataset
-   * @param outDir the directory to write the files to
-   * @param packages the packages to write (size, name)
-   * @return the sink
-   */
-  private def packageFlatSink(metadata: MetadataInfo, outDir: Path, packages: Seq[(Long, String)]):
-  Sink[(DatasetGraph, Long), Seq[Future[SaveResult]]] =
-    val fileExtension = if metadata.streamTypes.exists(_.elementType == ElementType.Triple) then "nt" else "nq"
-
-    val sinks = packages.map { case (size, name) =>
-      Flow[ByteString]
-        .take(size)
-        .toMat(FileHelper.writeCompressed(outDir.resolve(s"flat_$name.$fileExtension.gz")))(Keep.right)
-    }
-
+  private def packageFlatSerializeFlow(metadata: MetadataInfo): Flow[(DatasetGraph, Long), ByteString, NotUsed] =
     Flow[(DatasetGraph, Long)]
       .map((ds, _) => ds.find().asScala.toSeq.sorted)
       .map(quads => {
@@ -350,6 +340,23 @@ object PackageCommand extends Command:
         os.toByteArray
       })
       .map(ByteString.fromArrayUnsafe)
+
+  /**
+   * Creates a sink that writes the data to flat files
+   * @param metadata the metadata of the dataset
+   * @param outDir the directory to write the files to
+   * @param packages the packages to write (size, name)
+   * @return the sink
+   */
+  private def packageFlatSink(metadata: MetadataInfo, outDir: Path, packages: Seq[(Long, String)]):
+  Sink[ByteString, Seq[Future[SaveResult]]] =
+    val fileExtension = if metadata.streamTypes.exists(_.elementType == ElementType.Triple) then "nt" else "nq"
+    val sinks = packages.map { case (size, name) =>
+      Flow[ByteString]
+        .take(size)
+        .toMat(FileHelper.writeCompressed(outDir.resolve(s"flat_$name.$fileExtension.gz")))(Keep.right)
+    }
+    Flow[ByteString]
       .toMat(StreamUtil.broadcastSink(sinks))(Keep.right)
 
   /**
diff --git a/src/main/scala/util/StatCounter.scala b/src/main/scala/util/StatCounter.scala
index fca56c3..446efb1 100644
--- a/src/main/scala/util/StatCounter.scala
+++ b/src/main/scala/util/StatCounter.scala
@@ -7,19 +7,61 @@ import org.apache.jena.rdf.model.Resource
 //noinspection UnstableApiUsage
 object StatCounter:
-  case class Result(sum: Long, mean: Double, stDev: Double, min: Long, max: Long,
-                    uniqueCount: Option[Long], uniqueLowerBound: Option[Long], uniqueUpperBound: Option[Long]):
+  case class Result[TStat <: Double | Long]
+  (
+    sum: Option[TStat], mean: Double, stDev: Double, min: TStat, max: TStat, uniqueCount: Option[Long],
+    uniqueLowerBound: Option[Long], uniqueUpperBound: Option[Long]
+  ):
     def addToRdf(statRes: Resource): Unit =
-      statRes.addProperty(RdfUtil.sum, sum.toString, XSDinteger)
+      val statDt = min match
+        case _: Long => XSDinteger
+        case _: Double => XSDdecimal
+      sum.foreach(c => statRes.addProperty(RdfUtil.sum, c.toString, statDt))
       statRes.addProperty(RdfUtil.mean, mean.toString, XSDdecimal)
       statRes.addProperty(RdfUtil.stDev, stDev.toString, XSDdecimal)
-      statRes.addProperty(RdfUtil.minimum, min.toString, XSDinteger)
-      statRes.addProperty(RdfUtil.maximum, max.toString, XSDinteger)
+      statRes.addProperty(RdfUtil.minimum, min.toString, statDt)
+      statRes.addProperty(RdfUtil.maximum, max.toString, statDt)
       uniqueCount.foreach(c => statRes.addProperty(RdfUtil.uniqueCount, c.toString, XSDinteger))
       uniqueLowerBound.foreach(c => statRes.addProperty(RdfUtil.uniqueCountLowerBound, c.toString, XSDinteger))
       uniqueUpperBound.foreach(c => statRes.addProperty(RdfUtil.uniqueCountUpperBound, c.toString, XSDinteger))
 
-class LightStatCounter[T]:
+
+trait StatCounter[T, TStat <: Double | Long]:
+  def add(values: Seq[T]): Unit
+  def addUnique(values: Iterable[T]): Unit
+  def result: StatCounter.Result[TStat]
+
+
+class UncountableStatCounter extends StatCounter[Double, Double]:
+  import StatCounter.*
+
+  private var count: Long = 0
+  private var sum: BigDecimal = 0
+  private var sumSq: BigDecimal = 0
+  private var min: Double = Double.MaxValue
+  private var max: Double = Double.MinValue
+
+  override def add(values: Seq[Double]): Unit = addUnique(values)
+
+  override def addUnique(values: Iterable[Double]): Unit =
+    values.foreach(addOne)
+
+  def addOne(value: Double): Unit = this.synchronized {
+    count += 1
+    sum += value
+    sumSq += value * value
+    if value < min then min = value
+    if value > max then max = value
+  }
+
+  override def result: Result[Double] = this.synchronized {
+    val mean = sum.toDouble / count
+    val stDev = Math.sqrt(sumSq.toDouble / count - mean * mean)
+    Result(None, mean, stDev, min, max, None, None, None)
+  }
+
+
+class LightStatCounter[T] extends StatCounter[T, Long]:
   import StatCounter.*
 
   private var count: Long = 0
@@ -42,13 +84,14 @@ class LightStatCounter[T]:
     if c > max then max = c
   }
 
-  def result: Result = this.synchronized {
+  def result: Result[Long] = this.synchronized {
    val mean = sum.toDouble / count
     val stDev = Math.sqrt(sumSq.toDouble / count - mean * mean)
-    Result(sum, mean, stDev, min, max, None, None, None)
+    Result(Some(sum), mean, stDev, min, max, None, None, None)
   }
 
-class StatCounter extends LightStatCounter[String]:
+
+class SketchStatCounter extends LightStatCounter[String]:
   import StatCounter.*
 
   private val sketch = HllSketch(16)
@@ -65,7 +108,7 @@ class StatCounter extends LightStatCounter[String]:
     }
     lightAdd(values.size)
 
-  override def result: Result = sketch.synchronized {
+  override def result: Result[Long] = sketch.synchronized {
     super.result.copy(
       uniqueCount = Some(sketch.getEstimate.toLong),
       uniqueLowerBound = Some(sketch.getLowerBound(2).toLong),
@@ -73,6 +116,7 @@ class StatCounter extends LightStatCounter[String]:
     )
   }
 
+
 // uses sets instead of bloom filters
 class PreciseStatCounter[T] extends LightStatCounter[T]:
   private val set: scala.collection.mutable.HashSet[T] = scala.collection.mutable.HashSet.empty
@@ -85,5 +129,5 @@ class PreciseStatCounter[T] extends LightStatCounter[T]:
     set ++= values
     lightAdd(values.size)
 
-  override def result: StatCounter.Result =
+  override def result: StatCounter.Result[Long] =
     super.result.copy(uniqueCount = Some(set.size.toLong))
diff --git a/src/main/scala/util/StatCounterSuite.scala b/src/main/scala/util/StatCounterSuite.scala
index 56c2ab6..2c55683 100644
--- a/src/main/scala/util/StatCounterSuite.scala
+++ b/src/main/scala/util/StatCounterSuite.scala
@@ -6,18 +6,20 @@ import org.apache.jena.graph.{Node, NodeFactory, Node_URI, Triple}
 import org.apache.jena.rdf.model.{Model, Resource}
 import org.apache.jena.sparql.core.DatasetGraph
 import org.apache.jena.vocabulary.RDF
+import org.apache.pekko.util.ByteString
 
 import scala.collection.mutable
 import scala.jdk.CollectionConverters.*
 
 object StatCounterSuite:
-  case class Result(iris: StatCounter.Result, blankNodes: StatCounter.Result, literals: StatCounter.Result,
-                    plainLiterals: StatCounter.Result, dtLiterals: StatCounter.Result,
-                    langLiterals: StatCounter.Result, datatypes: StatCounter.Result,
-                    controlChars: StatCounter.Result, quotedTriples: StatCounter.Result,
-                    subjects: StatCounter.Result, predicates: StatCounter.Result,
-                    objects: StatCounter.Result, graphs: StatCounter.Result,
-                    statements: StatCounter.Result):
+  case class Result(iris: StatCounter.Result[Long], blankNodes: StatCounter.Result[Long],
+                    literals: StatCounter.Result[Long],
+                    plainLiterals: StatCounter.Result[Long], dtLiterals: StatCounter.Result[Long],
+                    langLiterals: StatCounter.Result[Long], datatypes: StatCounter.Result[Long],
+                    controlChars: StatCounter.Result[Long], quotedTriples: StatCounter.Result[Long],
+                    subjects: StatCounter.Result[Long], predicates: StatCounter.Result[Long],
+                    objects: StatCounter.Result[Long], graphs: StatCounter.Result[Long],
+                    statements: StatCounter.Result[Long], byteDensity: StatCounter.Result[Double]):
 
     def addToRdf(m: Model, size: Long, totalSize: Long): Resource =
       val toAdd = Seq(
@@ -34,7 +36,8 @@ object StatCounterSuite:
         "PredicateCountStatistics" -> predicates,
         "ObjectCountStatistics" -> objects,
         "GraphCountStatistics" -> graphs,
-        "StatementCountStatistics" -> statements
+        "StatementCountStatistics" -> statements,
+        "ByteDensityStatistics" -> byteDensity
       )
       val sizeName = Constants.packageSizeToHuman(size, true)
       val mainStatRes = m.createResource(RdfUtil.tempDataset.getURI + "#statistics-" + sizeName.toLowerCase)
@@ -67,27 +70,27 @@ class StatCounterSuite(val size: Long):
   // Hack. Jena usually represents the default graph node as null, which is not great for us here.
   private val DEFAULT_GRAPH = NodeFactory.createBlankNode("DEFAULT GRAPH")
-
-  // A bad heuristic: 10x the size of the stream is assumed to be the number of elements in the bloom filters
-  private val cIris = new StatCounter()
-  private val cLiterals = new StatCounter()
-  private val cPlainLiterals = new StatCounter()
-  private val cDtLiterals = new StatCounter()
-  private val cLangLiterals = new StatCounter()
+
+  private val cIris = new SketchStatCounter()
+  private val cLiterals = new SketchStatCounter()
+  private val cPlainLiterals = new SketchStatCounter()
+  private val cDtLiterals = new SketchStatCounter()
+  private val cLangLiterals = new SketchStatCounter()
   private val cDatatypes = new PreciseStatCounter[String]
   private val cAsciiControlChars = LightStatCounter[Char]()
   private val cBlankNodes = new LightStatCounter[String]()
   private val cQuotedTriples = new LightStatCounter[String]()
-  private val cSubjects = new StatCounter()
-  private val cPredicates = new StatCounter()
-  private val cObjects = new StatCounter()
-  private val cGraphs = new StatCounter()
-
+  private val cSubjects = new SketchStatCounter()
+  private val cPredicates = new SketchStatCounter()
+  private val cObjects = new SketchStatCounter()
+  private val cGraphs = new SketchStatCounter()
   private val cStatements = new LightStatCounter[String]()
 
-  def add(ds: DatasetGraph): Unit =
+  private val cByteDensity = new UncountableStatCounter()
+
+  def add(ds: DatasetGraph, bytesInFlat: ByteString): Unit =
     if ds.getDefaultGraph.isEmpty then
       cGraphs.add(ds.listGraphNodes().asScala.map(_.toString()).toSeq)
     else
@@ -163,6 +166,9 @@ class StatCounterSuite(val size: Long):
     cPredicates.addUnique(predicates.map(_.toString()))
     cObjects.addUnique(objects.map(_.toString()))
     cStatements.lightAdd(stCount)
+
+    // Note: the byte count includes exactly stCount newlines.
+    cByteDensity.addOne(bytesInFlat.length.toDouble / stCount)
 
   private def countAsciiControlChars(s: String): Int =
     // 0x00–0x1F are disallowed except 0x09 (HT, tab), 0x0A (LF), 0x0D (CR)
@@ -171,4 +177,4 @@
   def result: StatCounterSuite.Result =
     StatCounterSuite.Result(cIris.result, cBlankNodes.result, cLiterals.result, cPlainLiterals.result,
       cDtLiterals.result, cLangLiterals.result, cDatatypes.result, cAsciiControlChars.result, cQuotedTriples.result,
-      cSubjects.result, cPredicates.result, cObjects.result, cGraphs.result, cStatements.result)
+      cSubjects.result, cPredicates.result, cObjects.result, cGraphs.result, cStatements.result, cByteDensity.result)
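The byte-density statistic wired in above reduces to a running mean and standard deviation of bytes-per-statement, accumulated from a count, a sum and a sum of squares (see UncountableStatCounter and the cByteDensity.addOne call). The sketch below shows only that arithmetic, detached from the Pekko/Jena plumbing; ByteDensitySketch, RunningStats and the sample chunk figures are illustrative names and numbers, not part of this patch.

// Minimal, self-contained sketch of the byte-density bookkeeping; it mirrors the
// sum/sumSq accumulation of UncountableStatCounter, without the min/max tracking
// or synchronization of the real class.
object ByteDensitySketch:

  final class RunningStats:
    private var count: Long = 0
    private var sum: BigDecimal = 0
    private var sumSq: BigDecimal = 0

    def add(value: Double): Unit =
      count += 1
      sum += value
      sumSq += value * value

    def mean: Double = sum.toDouble / count
    def stDev: Double = math.sqrt(sumSq.toDouble / count - mean * mean)

  def main(args: Array[String]): Unit =
    val density = RunningStats()
    // Each pair stands for one serialized chunk: (flat serialization bytes, statement count).
    val chunks = Seq((1200L, 10L), (800L, 8L), (2048L, 16L))
    for (bytes, statements) <- chunks do
      // Same formula as cByteDensity.addOne(bytesInFlat.length.toDouble / stCount) above.
      density.add(bytes.toDouble / statements)
    println(f"bytes per statement: mean=${density.mean}%.2f stDev=${density.stDev}%.2f")

Broadcasting the serialized ByteString to both sFlatPackage and statsZip keeps the flat serialization to one pass per chunk: the same bytes feed the flat file sinks and, zipped with the originating DatasetGraph, the byte-density counter.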