Skip to content

Commit

Permalink
Update deps, stabilize flat format's order
Browse files Browse the repository at this point in the history
  • Loading branch information
Ostrzyciel committed Sep 17, 2024
1 parent 8944bef commit d598489
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 28 deletions.
11 changes: 6 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ resolvers +=
"Sonatype OSS Snapshots" at "https://s01.oss.sonatype.org/content/repositories/snapshots"

lazy val circeV = "0.14.7"
lazy val jellyV = "0.14.1+12-0d137338-SNAPSHOT"
lazy val jenaV = "4.10.0"
lazy val pekkoV = "1.0.2"
lazy val jellyV = "2.0.0"
lazy val jenaV = "5.1.0"
lazy val pekkoV = "1.1.0"
lazy val pekkoHttpV = "1.0.1"
lazy val pekkoConnV = "1.0.2"
lazy val rdf4jV = "4.3.11"
lazy val rdf4jV = "5.0.2"
lazy val icu4jV = "74.2"

lazy val root = (project in file("."))
Expand All @@ -20,7 +20,7 @@ lazy val root = (project in file("."))

// Scala 3 or not Scala at all
libraryDependencies ++= Seq(
"com.google.guava" % "guava" % "33.2.0-jre",
"com.google.guava" % "guava" % "33.2.1-jre",
"com.ibm.icu" % "icu4j" % icu4jV,
"eu.ostrzyciel.jelly" %% "jelly-stream" % jellyV,
"eu.ostrzyciel.jelly" %% "jelly-jena" % jellyV,
Expand Down Expand Up @@ -62,5 +62,6 @@ lazy val root = (project in file("."))
// emit deprecated warnings
scalacOptions ++= Seq(
"-deprecation",
"-Werror",
),
)
32 changes: 19 additions & 13 deletions src/main/scala/commands/PackageCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import util.io.*
import eu.ostrzyciel.jelly.core.proto.v1.PhysicalStreamType
import eu.ostrzyciel.jelly.core.{JellyOptions, LogicalStreamTypeFactory}
import eu.ostrzyciel.jelly.stream.{EncoderFlow, JellyIo}
import org.apache.jena.graph.GraphMemFactory
import org.apache.jena.rdf.model.{ModelFactory, Resource}
import org.apache.jena.riot.{Lang, RDFParser, RDFWriter}
import org.apache.jena.riot.lang.LabelToNode
import org.apache.jena.riot.system.{ErrorHandlerFactory, FactoryRDFStd}
import org.apache.jena.riot.system.{ErrorHandlerFactory, FactoryRDFStd, StreamRDFWriter}
import org.apache.jena.sparql.core.{DatasetGraph, DatasetGraphFactory}
import org.apache.pekko.stream.*
import org.apache.pekko.stream.connectors.file.TarArchiveMetadata
Expand All @@ -20,14 +21,15 @@ import org.apache.pekko.{Done, NotUsed}
import org.eclipse.rdf4j.model.vocabulary.XSD
import org.eclipse.rdf4j.rio

import java.io.{ByteArrayInputStream, FileOutputStream}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileOutputStream}
import java.nio.file.{FileSystems, Path}
import java.util.UUID
import scala.concurrent.Future
import scala.jdk.CollectionConverters.*

object PackageCommand extends Command:
import eu.ostrzyciel.jelly.convert.jena.{*, given}
import RdfOrdering.given

sealed trait DistType(val weight: Int)
object DistType:
Expand All @@ -48,6 +50,8 @@ object PackageCommand extends Command:
val repoDir = FileSystems.getDefault.getPath(args(1))
val sourceArchiveFile = FileSystems.getDefault.getPath(args(2))
val outDir = FileSystems.getDefault.getPath(args(3))

GraphMemFactory.setDftGraphSameTerm(false)

val metadata = MetadataReader.read(repoDir)
val stats = new StatCounterSuite(metadata.elementCount)
Expand Down Expand Up @@ -206,11 +210,12 @@ object PackageCommand extends Command:
Flow[(DatasetGraph, Long), (Long, StatCounterSuite.Result), NotUsed] =
Flow[(DatasetGraph, Long)]
.wireTap((_, num) => if (num + 1) % 100_000 == 0 then println(s"Stats stream at: ${num + 1}"))
.splitAfter(SubstreamCancelStrategy.propagate)((_, num) =>
.splitAfter((_, num) =>
val shouldSplit = Constants.packageSizes.contains(num + 1)
if shouldSplit then println(s"Splitting stats stream at ${num + 1}")
shouldSplit
)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.map((ds, num) => {
stats.add(ds)
num + 1
Expand Down Expand Up @@ -334,19 +339,20 @@ object PackageCommand extends Command:
}

Flow[(DatasetGraph, Long)]
.map((ds, _) => {
.map((ds, _) => ds.find().asScala.toSeq.sorted)
.map(quads => {
val os = ByteArrayOutputStream()
if metadata.streamTypes.exists(_.elementType == ElementType.Triple) then
RDFWriter.create()
.lang(Lang.NTRIPLES)
.source(ds.getDefaultGraph)
.asString()
val writer = StreamRDFWriter.getWriterStream(os, Lang.NTRIPLES)
quads.foreach(q => writer.triple(q.asTriple))
writer.finish()
else
RDFWriter.create()
.lang(Lang.NQUADS)
.source(ds)
.asString()
val writer = StreamRDFWriter.getWriterStream(os, Lang.NQUADS)
quads.foreach(q => writer.quad(q))
writer.finish()
os.toByteArray
})
.map(ByteString.fromString)
.map(ByteString.fromArrayUnsafe)
.toMat(StreamUtil.broadcastSink(sinks))(Keep.right)

/**
Expand Down
8 changes: 4 additions & 4 deletions src/main/scala/commands/PackageSchemaCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ object PackageSchemaCommand extends Command:
for name <- toProcessNames do
val inPath = repoDir.resolve(s"src/$name.ttl")
val model = RDFDataMgr.loadModel(inPath.toString)
model.listSubjectsWithProperty(RDF.`type`, OWL.Ontology).asScala.toList match
model.listSubjectsWithProperty(RDF.`type`, OWL2.Ontology).asScala.toList match
case List(ontology) =>
// Update version IRI
model.removeAll(ontology, OWL2.versionIRI, null)
model.add(ontology, OWL2.versionIRI, model.createResource(s"${ontology.getURI}/$version"))

// Update imports
ontology.listProperties(OWL.imports).asScala
ontology.listProperties(OWL2.imports).asScala
.filter(_.getObject.isURIResource)
.map(_.getObject.asResource)
.filter(_.getURI.startsWith(schemaBase))
Expand All @@ -57,8 +57,8 @@ object PackageSchemaCommand extends Command:
.toSeq
.foreach { (oldRes, nameParts) =>
val newRes = model.createResource(s"$schemaBase${nameParts(0)}/$version")
model.remove(ontology, OWL.imports, oldRes)
model.add(ontology, OWL.imports, newRes)
model.remove(ontology, OWL2.imports, oldRes)
model.add(ontology, OWL2.imports, newRes)
}
case _ => ()

Expand Down
19 changes: 19 additions & 0 deletions src/main/scala/util/RdfOrdering.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.github.riverbench.ci_worker
package util

import org.apache.jena.sparql.core.Quad

/** Provides a given [[Ordering]] instance for Jena [[Quad]]s. */
object RdfOrdering:

  /**
   * Orders quads lexicographically by the `toString` forms of their graph, subject,
   * predicate and object nodes, in that sequence. The first component whose string
   * forms differ decides the result; quads agreeing on all four compare as equal.
   */
  given Ordering[Quad] with
    def compare(x: Quad, y: Quad): Int =
      // Component extractors, listed in order of comparison priority.
      val components = List[Quad => String](
        _.getGraph.toString,
        _.getSubject.toString,
        _.getPredicate.toString,
        _.getObject.toString,
      )
      // The iterator is lazy, so a later component is only stringified
      // when every earlier one compared as equal.
      components.iterator
        .map(render => render(x).compareTo(render(y)))
        .find(_ != 0)
        .getOrElse(0)
12 changes: 6 additions & 6 deletions src/main/scala/util/StatCounterSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ class StatCounterSuite(val size: Long):

def add(ds: DatasetGraph): Unit =
if ds.getDefaultGraph.isEmpty then
cGraphs.add(ds.listGraphNodes().asScala.map(_.toString(true)).toSeq)
cGraphs.add(ds.listGraphNodes().asScala.map(_.toString()).toSeq)
else
cGraphs.add((ds.listGraphNodes().asScala.toSeq :+ DEFAULT_GRAPH).map(_.toString(true)))
cGraphs.add((ds.listGraphNodes().asScala.toSeq :+ DEFAULT_GRAPH).map(_.toString()))

val subjects = mutable.Set[Node]()
val predicates = mutable.Set[Node]()
Expand Down Expand Up @@ -131,7 +131,7 @@ class StatCounterSuite(val size: Long):
else if n.isBlank then
blankNodes += n.getBlankNodeLabel
else if n.isLiteral then
val lit = n.toString(false)
val lit = n.toString()
controlCharCount += countAsciiControlChars(lit)
literals += lit
if n.getLiteralLanguage != "" then
Expand Down Expand Up @@ -159,9 +159,9 @@ class StatCounterSuite(val size: Long):

cQuotedTriples.lightAdd(quotedTripleCount)

cSubjects.addUnique(subjects.map(_.toString(true)))
cPredicates.addUnique(predicates.map(_.toString(true)))
cObjects.addUnique(objects.map(_.toString(true)))
cSubjects.addUnique(subjects.map(_.toString()))
cPredicates.addUnique(predicates.map(_.toString()))
cObjects.addUnique(objects.map(_.toString()))
cStatements.lightAdd(stCount)

private def countAsciiControlChars(s: String): Int =
Expand Down

0 comments on commit d598489

Please sign in to comment.