Add stats about bytes per statement
Ostrzyciel committed Sep 19, 2024
1 parent a5bacbf commit 95fa89e
Showing 3 changed files with 117 additions and 60 deletions.
61 changes: 34 additions & 27 deletions src/main/scala/commands/PackageCommand.scala
@@ -80,14 +80,19 @@ object PackageCommand extends Command:
.zipWithIndex
.buffer(32, OverflowStrategy.backpressure)
val dsBroad = builder.add(Broadcast[(DatasetGraph, Long)](6))
val flatSerialize = packageFlatSerializeFlow(metadata)
val flatSerializeBroad = builder.add(Broadcast[ByteString](2))
val statsZip = builder.add(Zip[ByteString, (DatasetGraph, Long)]())
val checksMerge = builder.add(Merge[Unit](2))

in ~> inBroad
inBroad ~> checkRdf4jFlow(metadata).async ~> checksMerge ~> sChecks
inBroad ~> parseJenaBuffered.async ~> dsBroad ~> checkStructureFlow(metadata).async ~> checksMerge
dsBroad ~> statsFlow(stats).async ~> sStats
dsBroad ~> flatSerialize ~> flatSerializeBroad ~> sFlatPackage
flatSerializeBroad ~> statsZip.in0
dsBroad ~> statsZip.in1
statsZip.out ~> statsFlow(stats).async ~> sStats
dsBroad ~> sStreamPackage
dsBroad ~> sFlatPackage
dsBroad ~> sJellyPackage
dsBroad ~> sSampleStream

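The wiring above broadcasts each parsed dataset, serializes one copy to flat bytes, and zips those bytes back with the originating dataset, so the stats stage receives both in lockstep. A minimal sketch of that Broadcast + Zip pattern (illustrative only, assuming Pekko Streams; not the project's code):

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.ClosedShape
import org.apache.pekko.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source, Zip}

@main def zipSketch(): Unit =
  implicit val system: ActorSystem = ActorSystem("zip-sketch")
  RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits.*
    val source    = Source(1 to 3)
    val broad     = b.add(Broadcast[Int](2))
    val zip       = b.add(Zip[String, Int]())
    val serialize = Flow[Int].map(i => s"payload-$i")

    source ~> broad
    broad  ~> serialize ~> zip.in0   // serialized copy of the element
    broad  ~> zip.in1                // the original element
    zip.out ~> Sink.foreach(println) // prints (payload-1,1), (payload-2,2), (payload-3,3)
    ClosedShape
  }).run()
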
@@ -204,17 +209,18 @@ object PackageCommand extends Command:
* @return a flow that computes statistics
*/
private def statsFlow(stats: StatCounterSuite):
Flow[(DatasetGraph, Long), (Long, StatCounterSuite.Result), NotUsed] =
Flow[(DatasetGraph, Long)]
.wireTap((_, num) => if (num + 1) % 100_000 == 0 then println(s"Stats stream at: ${num + 1}"))
.splitAfter((_, num) =>
val shouldSplit = Constants.packageSizes.contains(num + 1)
if shouldSplit then println(s"Splitting stats stream at ${num + 1}")
Flow[(ByteString, (DatasetGraph, Long)), (Long, StatCounterSuite.Result), NotUsed] =
Flow[(ByteString, (DatasetGraph, Long))]
.wireTap((_, y) => if (y._2 + 1) % 100_000 == 0 then println(s"Stats stream at: ${y._2 + 1}"))
.splitAfter((_, y) =>
val shouldSplit = Constants.packageSizes.contains(y._2 + 1)
if shouldSplit then println(s"Splitting stats stream at ${y._2 + 1}")
shouldSplit
)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.map((ds, num) => {
stats.add(ds)
.map((bytes, y) => {
val (ds, num) = y
stats.add(ds, bytes)
num + 1
})
.reduce((a, b) => a.max(b))
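
The splitAfter/reduce combination above closes a substream at each configured package size and emits that substream's highest 1-based element index once the stats have been folded in. A self-contained analogue with illustrative boundaries (not Constants.packageSizes):

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Sink, Source}

@main def splitAfterSketch(): Unit =
  implicit val system: ActorSystem = ActorSystem("split-sketch")
  val boundaries = Set(3L, 5L)
  Source(0L until 6L)
    .splitAfter(num => boundaries.contains(num + 1)) // close a substream after elements 3 and 5
    .map(_ + 1)                                      // switch to 1-based indices
    .reduce((a, b) => a.max(b))                      // one element per substream: its highest index
    .mergeSubstreams
    .runWith(Sink.foreach(println))                  // prints 3, 5, 6
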
@@ -318,23 +324,7 @@ object PackageCommand extends Command:
}
StreamUtil.broadcastSink(sinks)

/**
* Creates a sink that writes the data to flat files
* @param metadata the metadata of the dataset
* @param outDir the directory to write the files to
* @param packages the packages to write (size, name)
* @return the sink
*/
private def packageFlatSink(metadata: MetadataInfo, outDir: Path, packages: Seq[(Long, String)]):
Sink[(DatasetGraph, Long), Seq[Future[SaveResult]]] =
val fileExtension = if metadata.streamTypes.exists(_.elementType == ElementType.Triple) then "nt" else "nq"

val sinks = packages.map { case (size, name) =>
Flow[ByteString]
.take(size)
.toMat(FileHelper.writeCompressed(outDir.resolve(s"flat_$name.$fileExtension.gz")))(Keep.right)
}

private def packageFlatSerializeFlow(metadata: MetadataInfo): Flow[(DatasetGraph, Long), ByteString, NotUsed] =
Flow[(DatasetGraph, Long)]
.map((ds, _) => ds.find().asScala.toSeq.sorted)
.map(quads => {
@@ -350,6 +340,23 @@
os.toByteArray
})
.map(ByteString.fromArrayUnsafe)

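packageFlatSerializeFlow turns each dataset into one ByteString of flat N-Triples/N-Quads; the exact writer call is elided by the collapsed diff region above. A hedged sketch of the general approach using Jena RIOT (without the quad sorting the flow performs):

import java.io.ByteArrayOutputStream
import org.apache.jena.riot.{Lang, RDFDataMgr}
import org.apache.jena.sparql.core.DatasetGraph

// Illustrative helper, not the project's code: write a dataset as N-Quads into a byte array.
def toFlatBytes(ds: DatasetGraph): Array[Byte] =
  val os = new ByteArrayOutputStream()
  RDFDataMgr.write(os, ds, Lang.NQUADS)
  os.toByteArray
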
/**
* Creates a sink that writes the data to flat files
* @param metadata the metadata of the dataset
* @param outDir the directory to write the files to
* @param packages the packages to write (size, name)
* @return the sink
*/
private def packageFlatSink(metadata: MetadataInfo, outDir: Path, packages: Seq[(Long, String)]):
Sink[ByteString, Seq[Future[SaveResult]]] =
val fileExtension = if metadata.streamTypes.exists(_.elementType == ElementType.Triple) then "nt" else "nq"
val sinks = packages.map { case (size, name) =>
Flow[ByteString]
.take(size)
.toMat(FileHelper.writeCompressed(outDir.resolve(s"flat_$name.$fileExtension.gz")))(Keep.right)
}
Flow[ByteString]
.toMat(StreamUtil.broadcastSink(sinks))(Keep.right)

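packageFlatSink now consumes the pre-serialized ByteStrings and fans them out to one compressed file per package size. FileHelper.writeCompressed is the project's own helper, not shown in this diff; a plain-Pekko stand-in (hypothetical, for illustration only) could look like this:

import java.nio.file.Path
import scala.concurrent.Future
import org.apache.pekko.stream.IOResult
import org.apache.pekko.stream.scaladsl.{Compression, FileIO, Flow, Keep, Sink}
import org.apache.pekko.util.ByteString

// Hypothetical equivalent of FileHelper.writeCompressed: gzip the bytes and write them to disk.
def writeGzipped(path: Path): Sink[ByteString, Future[IOResult]] =
  Flow[ByteString]
    .via(Compression.gzip)
    .toMat(FileIO.toPath(path))(Keep.right)
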
/**
66 changes: 55 additions & 11 deletions src/main/scala/util/StatCounter.scala
@@ -7,19 +7,61 @@ import org.apache.jena.rdf.model.Resource

//noinspection UnstableApiUsage
object StatCounter:
case class Result(sum: Long, mean: Double, stDev: Double, min: Long, max: Long,
uniqueCount: Option[Long], uniqueLowerBound: Option[Long], uniqueUpperBound: Option[Long]):
case class Result[TStat <: Double | Long]
(
sum: Option[TStat], mean: Double, stDev: Double, min: TStat, max: TStat, uniqueCount: Option[Long],
uniqueLowerBound: Option[Long], uniqueUpperBound: Option[Long]
):
def addToRdf(statRes: Resource): Unit =
statRes.addProperty(RdfUtil.sum, sum.toString, XSDinteger)
val statDt = min match
case _: Long => XSDinteger
case _: Double => XSDdecimal
sum.foreach(c => statRes.addProperty(RdfUtil.sum, c.toString, statDt))
statRes.addProperty(RdfUtil.mean, mean.toString, XSDdecimal)
statRes.addProperty(RdfUtil.stDev, stDev.toString, XSDdecimal)
statRes.addProperty(RdfUtil.minimum, min.toString, XSDinteger)
statRes.addProperty(RdfUtil.maximum, max.toString, XSDinteger)
statRes.addProperty(RdfUtil.minimum, min.toString, statDt)
statRes.addProperty(RdfUtil.maximum, max.toString, statDt)
uniqueCount.foreach(c => statRes.addProperty(RdfUtil.uniqueCount, c.toString, XSDinteger))
uniqueLowerBound.foreach(c => statRes.addProperty(RdfUtil.uniqueCountLowerBound, c.toString, XSDinteger))
uniqueUpperBound.foreach(c => statRes.addProperty(RdfUtil.uniqueCountUpperBound, c.toString, XSDinteger))

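Result is now generic in the statistic type, and addToRdf picks the RDF datatype from it: xsd:integer for Long statistics, xsd:decimal for Double ones, with the sum omitted when it is not meaningful. A hedged usage sketch (RdfUtil is the project's own vocabulary, and the resource URIs here are made up):

import org.apache.jena.rdf.model.ModelFactory

@main def resultSketch(): Unit =
  val m = ModelFactory.createDefaultModel()
  val statementStats = StatCounter.Result[Long](Some(1000L), 12.5, 3.2, 1L, 50L, None, None, None)
  val densityStats   = StatCounter.Result[Double](None, 87.4, 11.0, 40.2, 150.9, None, None, None)
  statementStats.addToRdf(m.createResource("urn:example:statementStats")) // sum/min/max as xsd:integer
  densityStats.addToRdf(m.createResource("urn:example:byteDensityStats")) // min/max as xsd:decimal, no sum
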
class LightStatCounter[T]:

trait StatCounter[T, TStat <: Double | Long]:
def add(values: Seq[T]): Unit
def addUnique(values: Iterable[T]): Unit
def result: StatCounter.Result[TStat]


class UncountableStatCounter extends StatCounter[Double, Double]:
import StatCounter.*

private var count: Long = 0
private var sum: BigDecimal = 0
private var sumSq: BigDecimal = 0
private var min: Double = Double.MaxValue
private var max: Double = Double.MinValue

override def add(values: Seq[Double]): Unit = addUnique(values)

override def addUnique(values: Iterable[Double]): Unit =
values.foreach(addOne)

def addOne(value: Double): Unit = this.synchronized {
count += 1
sum += value
sumSq += value * value
if value < min then min = value
if value > max then max = value
}

override def result: Result[Double] = this.synchronized {
val mean = sum.toDouble / count
val stDev = Math.sqrt(sumSq.toDouble / count - mean * mean)
Result(None, mean, stDev, min, max, None, None, None)
}

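UncountableStatCounter tracks distributions where summing makes no sense (such as bytes per statement), so its result leaves sum as None. A worked example with illustrative numbers:

@main def densitySketch(): Unit =
  val c = new UncountableStatCounter()
  Seq(10.0, 20.0, 30.0).foreach(c.addOne)
  val r = c.result
  println(r) // mean 20.0, min 10.0, max 30.0, stDev ≈ 8.16, sum None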

class LightStatCounter[T] extends StatCounter[T, Long]:
import StatCounter.*

private var count: Long = 0
@@ -42,13 +84,14 @@ class LightStatCounter[T]:
if c > max then max = c
}

def result: Result = this.synchronized {
def result: Result[Long] = this.synchronized {
val mean = sum.toDouble / count
val stDev = Math.sqrt(sumSq.toDouble / count - mean * mean)
Result(sum, mean, stDev, min, max, None, None, None)
Result(Some(sum), mean, stDev, min, max, None, None, None)
}

class StatCounter extends LightStatCounter[String]:

class SketchStatCounter extends LightStatCounter[String]:
import StatCounter.*

private val sketch = HllSketch(16)
@@ -65,14 +108,15 @@ class StatCounter extends LightStatCounter[String]:
}
lightAdd(values.size)

override def result: Result = sketch.synchronized {
override def result: Result[Long] = sketch.synchronized {
super.result.copy(
uniqueCount = Some(sketch.getEstimate.toLong),
uniqueLowerBound = Some(sketch.getLowerBound(2).toLong),
uniqueUpperBound = Some(sketch.getUpperBound(2).toLong)
)
}

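SketchStatCounter (previously just StatCounter) keeps the exact per-chunk counts from LightStatCounter and estimates the number of unique values with an HLL sketch. A hedged sketch of the underlying Apache DataSketches API (the import path can differ between DataSketches versions; lgConfigK = 16 matches the constructor call above):

import org.apache.datasketches.hll.HllSketch

@main def hllSketchDemo(): Unit =
  val hll = new HllSketch(16)
  (1 to 100_000).foreach(i => hll.update(s"value-$i"))
  println(hll.getEstimate)      // ≈ 100000
  println(hll.getLowerBound(2)) // lower bound at 2 standard deviations
  println(hll.getUpperBound(2)) // upper bound at 2 standard deviations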

// uses sets instead of bloom filters
class PreciseStatCounter[T] extends LightStatCounter[T]:
private val set: scala.collection.mutable.HashSet[T] = scala.collection.mutable.HashSet.empty
@@ -85,5 +129,5 @@ class PreciseStatCounter[T] extends LightStatCounter[T]:
set ++= values
lightAdd(values.size)

override def result: StatCounter.Result =
override def result: StatCounter.Result[Long] =
super.result.copy(uniqueCount = Some(set.size.toLong))
50 changes: 28 additions & 22 deletions src/main/scala/util/StatCounterSuite.scala
@@ -6,18 +6,20 @@ import org.apache.jena.graph.{Node, NodeFactory, Node_URI, Triple}
import org.apache.jena.rdf.model.{Model, Resource}
import org.apache.jena.sparql.core.DatasetGraph
import org.apache.jena.vocabulary.RDF
import org.apache.pekko.util.ByteString

import scala.collection.mutable
import scala.jdk.CollectionConverters.*

object StatCounterSuite:
case class Result(iris: StatCounter.Result, blankNodes: StatCounter.Result, literals: StatCounter.Result,
plainLiterals: StatCounter.Result, dtLiterals: StatCounter.Result,
langLiterals: StatCounter.Result, datatypes: StatCounter.Result,
controlChars: StatCounter.Result, quotedTriples: StatCounter.Result,
subjects: StatCounter.Result, predicates: StatCounter.Result,
objects: StatCounter.Result, graphs: StatCounter.Result,
statements: StatCounter.Result):
case class Result(iris: StatCounter.Result[Long], blankNodes: StatCounter.Result[Long],
literals: StatCounter.Result[Long],
plainLiterals: StatCounter.Result[Long], dtLiterals: StatCounter.Result[Long],
langLiterals: StatCounter.Result[Long], datatypes: StatCounter.Result[Long],
controlChars: StatCounter.Result[Long], quotedTriples: StatCounter.Result[Long],
subjects: StatCounter.Result[Long], predicates: StatCounter.Result[Long],
objects: StatCounter.Result[Long], graphs: StatCounter.Result[Long],
statements: StatCounter.Result[Long], byteDensity: StatCounter.Result[Double]):

def addToRdf(m: Model, size: Long, totalSize: Long): Resource =
val toAdd = Seq(
@@ -34,7 +36,8 @@ object StatCounterSuite:
"PredicateCountStatistics" -> predicates,
"ObjectCountStatistics" -> objects,
"GraphCountStatistics" -> graphs,
"StatementCountStatistics" -> statements
"StatementCountStatistics" -> statements,
"ByteDensityStatistics" -> byteDensity
)
val sizeName = Constants.packageSizeToHuman(size, true)
val mainStatRes = m.createResource(RdfUtil.tempDataset.getURI + "#statistics-" + sizeName.toLowerCase)
@@ -67,27 +70,27 @@ class StatCounterSuite(val size: Long):

// Hack. Jena usually represents the default graph node as null, which is not great for us here.
private val DEFAULT_GRAPH = NodeFactory.createBlankNode("DEFAULT GRAPH")

// A bad heuristic: 10x the size of the stream is assumed to be the number of elements in the bloom filters
private val cIris = new StatCounter()
private val cLiterals = new StatCounter()
private val cPlainLiterals = new StatCounter()
private val cDtLiterals = new StatCounter()
private val cLangLiterals = new StatCounter()

private val cIris = new SketchStatCounter()
private val cLiterals = new SketchStatCounter()
private val cPlainLiterals = new SketchStatCounter()
private val cDtLiterals = new SketchStatCounter()
private val cLangLiterals = new SketchStatCounter()
private val cDatatypes = new PreciseStatCounter[String]

private val cAsciiControlChars = LightStatCounter[Char]()
private val cBlankNodes = new LightStatCounter[String]()
private val cQuotedTriples = new LightStatCounter[String]()

private val cSubjects = new StatCounter()
private val cPredicates = new StatCounter()
private val cObjects = new StatCounter()
private val cGraphs = new StatCounter()

private val cSubjects = new SketchStatCounter()
private val cPredicates = new SketchStatCounter()
private val cObjects = new SketchStatCounter()
private val cGraphs = new SketchStatCounter()
private val cStatements = new LightStatCounter[String]()

def add(ds: DatasetGraph): Unit =
private val cByteDensity = new UncountableStatCounter()

def add(ds: DatasetGraph, bytesInFlat: ByteString): Unit =
if ds.getDefaultGraph.isEmpty then
cGraphs.add(ds.listGraphNodes().asScala.map(_.toString()).toSeq)
else
@@ -163,6 +166,9 @@ class StatCounterSuite(val size: Long):
cPredicates.addUnique(predicates.map(_.toString()))
cObjects.addUnique(objects.map(_.toString()))
cStatements.lightAdd(stCount)

// Note: the byte count includes exactly stCount newlines.
cByteDensity.addOne(bytesInFlat.length.toDouble / stCount)
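// Worked example with illustrative numbers (not part of this commit): an element whose flat
// serialization is 2048 bytes and which holds 16 statements contributes 2048.0 / 16 = 128.0
// bytes per statement to cByteDensity.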

private def countAsciiControlChars(s: String): Int =
// 0x00–0x1F are disallowed except 0x09 (HT, tab), 0x0A (LF), 0x0D (CR)
@@ -171,4 +177,4 @@ class StatCounterSuite(val size: Long):
def result: StatCounterSuite.Result =
StatCounterSuite.Result(cIris.result, cBlankNodes.result, cLiterals.result, cPlainLiterals.result,
cDtLiterals.result, cLangLiterals.result, cDatatypes.result, cAsciiControlChars.result, cQuotedTriples.result,
cSubjects.result, cPredicates.result, cObjects.result, cGraphs.result, cStatements.result)
cSubjects.result, cPredicates.result, cObjects.result, cGraphs.result, cStatements.result, cByteDensity.result)
