Skip to content

Commit

Permalink
Add a simplified mode to degree dist command
Browse files Browse the repository at this point in the history
  • Loading branch information
Ostrzyciel committed Jun 5, 2024
1 parent b7d8899 commit 0469d2f
Showing 1 changed file with 27 additions and 17 deletions.
44 changes: 27 additions & 17 deletions src/main/scala/commands/batch/DegreeDistributionsCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,18 @@ object DegreeDistributionsCommand extends Command:
|As output it provides cardinality estimates for the number of unique triples in which subject, predicate,
|object, and graph node appears. The estimates are provided with lower and upper bounds with 95% confidence.
|
|Args: <profile-name> <version> <distribution-size> <output-nodes (0/1)> <out-dir>
|Args: <profile-name> <version> <distribution-size> <output-nodes (0/1)> <detailed (0/1)> <out-dir>
|""".stripMargin

override def validateArgs(args: Array[String]): Boolean = args.length == 6
// Accepts exactly seven entries: the six arguments listed in the help text
// (<profile-name> <version> <distribution-size> <output-nodes> <detailed> <out-dir>),
// which run() reads from args(1) through args(6), plus args(0), which run() never reads
// (presumably the command name -- confirm against the command dispatcher).
// Only the count is checked here; the 0/1 flags are parsed later with .toInt in run().
override def validateArgs(args: Array[String]): Boolean = args.length == 7

override def run(args: Array[String]): Future[Unit] = Future {
val profileName = args(1)
val version = args(2)
val distribution = args(3)
val outputNodes = args(4).toInt
val outDir = Path.of(args(5))
val detailed = args(5).toInt
val outDir = Path.of(args(6))

println(f"Fetching profile $profileName version $version...")
val profileIri = PurlMaker.profile(profileName, version)
Expand All @@ -65,11 +66,11 @@ object DegreeDistributionsCommand extends Command:
Future.successful(())
else
datasetOutDir.toFile.mkdirs()
Await.ready(processDataset(dataset, distribution, datasetOutDir, outputNodes), 10.hours)
Await.ready(processDataset(dataset, distribution, datasetOutDir, outputNodes, detailed), 10.hours)
}


private def processDataset(datasetIri: Resource, distributionSize: String, outDir: Path, outputNodes: Int):
private def processDataset(datasetIri: Resource, distributionSize: String, outDir: Path, outputNodes: Int, detailed: Int):
Future[Unit] =
val datasetM = RDFDataMgr.loadModel(datasetIri.getURI, Lang.RDFXML)
val distribution = datasetM.listSubjectsWithProperty(RdfUtil.dctermsIdentifier, f"jelly-$distributionSize")
Expand All @@ -85,16 +86,17 @@ object DegreeDistributionsCommand extends Command:
val pMap = new mutable.HashMap[Node, (CpcSketch, LongMutable)]()
val oMap = new mutable.HashMap[Node, (CpcSketch, LongMutable)]()
val gMap = new mutable.HashMap[Node, (CpcSketch, LongMutable)]()
val stMap = new mutable.HashMap[Node, (CpcSketch, LongMutable)]()

val maps = Seq(
val maps = if detailed == 1 then Seq(
"subject" -> sMap,
"predicate" -> pMap,
"object" -> oMap,
"graph" -> gMap
)
) else Seq("statement" -> stMap)

def writeToMap(map: mutable.HashMap[Node, (CpcSketch, LongMutable)], node: Node, hashes: Array[Int]): Unit =
val (sketch, counter) = map.getOrElseUpdate(node, (CpcSketch(11), LongMutable(0L)))
/** Records one statement occurrence for `key` in `map`.
 *
 * If `key` has no entry yet, a fresh `(CpcSketch(11), LongMutable(0L))` pair is inserted first.
 * The statement's component hashes are then fed into the key's sketch and the key's
 * occurrence counter is incremented by one.
 *
 * @param map    per-key accumulator holding a sketch and an occurrence counter
 * @param key    the key to account the statement to (generic, so the map may be keyed by any type)
 * @param hashes hash codes of the statement's terms, passed to the sketch as a single item
 * @tparam T     key type of the map
 */
def writeToMap[T](map: mutable.HashMap[T, (CpcSketch, LongMutable)], key: T, hashes: Array[Int]): Unit =
// NOTE(review): 11 is presumably the sketch's lgK (size/accuracy) parameter -- confirm against the DataSketches API.
val (sketch, counter) = map.getOrElseUpdate(key, (CpcSketch(11), LongMutable(0L)))
// The whole hash array is one update, so the sketch estimates the number of distinct statements seen for this key.
sketch.update(hashes)
// The counter is the raw (non-deduplicated) number of calls for this key.
counter.value += 1

Expand All @@ -111,19 +113,27 @@ object DegreeDistributionsCommand extends Command:
val p = t.getPredicate
val o = t.getObject
val hashes = Array(s.hashCode, p.hashCode, o.hashCode)
writeToMap(sMap, s, hashes)
writeToMap(pMap, p, hashes)
writeToMap(oMap, o, hashes)
if detailed == 1 then
writeToMap(sMap, s, hashes)
writeToMap(pMap, p, hashes)
writeToMap(oMap, o, hashes)
else
for n <- Seq(s, p, o) do
writeToMap(stMap, n, hashes)
case q: Quad =>
val s = q.getSubject
val p = q.getPredicate
val o = q.getObject
val g = q.getGraph
val hashes = Array(s.hashCode, p.hashCode, o.hashCode, g.hashCode)
writeToMap(sMap, s, hashes)
writeToMap(pMap, p, hashes)
writeToMap(oMap, o, hashes)
writeToMap(gMap, g, hashes)
if detailed == 1 then
writeToMap(sMap, s, hashes)
writeToMap(pMap, p, hashes)
writeToMap(oMap, o, hashes)
writeToMap(gMap, g, hashes)
else
for n <- Seq(s, p, o, g) do
writeToMap(stMap, n, hashes)
}

streamFuture map { _ =>
Expand All @@ -141,7 +151,7 @@ object DegreeDistributionsCommand extends Command:
val data = f"${counter.value}\t" +
f"${sketch.getLowerBound(2)}\t${sketch.getEstimate}\t${sketch.getUpperBound(2)}\n"
if outputNodes == 1 then
os.write(f"${node.toString(true)}\t$data")
os.write(f"${node.toString}\t$data")
else
os.write(data)
os.close()
Expand Down

0 comments on commit 0469d2f

Please sign in to comment.