Skip to content

Commit

Permalink
feat: parse stages from pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
jenspots committed May 1, 2024
1 parent aa1749a commit f844b43
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 35 deletions.
94 changes: 73 additions & 21 deletions src/main/kotlin/runner/Parser.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,52 @@ private fun QuerySolution.toProcessor(): Class<Processor> {
.let { it as Class<Processor> }
}

private fun QuerySolution.toStage(processors: List<Class<Processor>>): Processor {
val byName = processors.associateBy { it.simpleName }

// Extract the list of arguments.
val name = this["processor"]
.toString()
.substringAfterLast("#")

val values = this["values"].toString().split(";")

val keys = this["keys"]
.toString()
.split(";")
.map { it.substringAfterLast("#") }

val kinds = this["kinds"]
.toString()
.split(";")
.map { it.substringAfterLast("#") }

// Retrieve a class instance of the Processor.
val processor = byName[name] ?: Log.shared.fatal("Processor $name not found")
val args = mutableMapOf<String, Any>()

for (i in keys.indices) {
val key = keys[i]
val value = values[i]
val kind = kinds[i]

Log.shared.debug("$key: $kind = $value")

args[key] = when (kind) {
"integer" -> value.toInt()
"ChannelWriter" -> bridge
"ChannelReader" -> bridge
else -> Log.shared.fatal("Unknown kind $kind")
}
}

val constructor = processor.getConstructor(Map::class.java)
return constructor.newInstance(args)
}

// TODO: Create some sort of a factory.
val bridge = MemoryBridge()

class Parser(file: File) {
private val model = ModelFactory.createDefaultModel()

Expand Down Expand Up @@ -67,6 +113,9 @@ class Parser(file: File) {
imported.add(uri)
}

// Import original onthology into the model.
model.add(onthology)

// Validate using SHACL.
val report = ShaclValidator.get().validate(model.graph, model.graph)

Expand Down Expand Up @@ -108,26 +157,29 @@ class Parser(file: File) {
val processors = getProcessors()
Log.shared.info("Parsing stages")

// Initialize the channel.
val bridge = MemoryBridge()

// Initialize the producer.
val producerClass = processors[0]
val producerArgs: MutableMap<String, Any> = mutableMapOf()
producerArgs["start"] = 0
producerArgs["end"] = 5
producerArgs["step"] = 1
producerArgs["writer"] = bridge
val producerConstructor = producerClass.getConstructor(Map::class.java)
val producer = producerConstructor.newInstance(producerArgs)

// Initialize the consumer.
val consumerClass = processors[1]
val consumerArgs: MutableMap<String, Any> = mutableMapOf()
consumerArgs["reader"] = bridge
val consumerConstructor = consumerClass.getConstructor(Map::class.java)
val consumer = consumerConstructor.newInstance(consumerArgs)

return listOf(producer, consumer)
// Execute the stages query.
val query = this.javaClass.getResource("/queries/stages.sparql")
?.readText()
?.let { QueryFactory.create(it) }

// Execute the query.
val iter = QueryExecutionFactory
.create(query, model)
.execSelect()

if (!iter.hasNext()) {
Log.shared.fatal("No processors found in the configuration")
}

val result = mutableListOf<Processor>()

while (iter.hasNext()) {
val solution = iter.nextSolution()
val stage = solution.toStage(processors)
Log.shared.info("Stage ${stage.javaClass.name} initialised successfully")
result.add(stage)
}

return result
}
}
8 changes: 8 additions & 0 deletions src/main/resources/queries/stages.sparql
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,19 @@ SELECT
?processor
(group_concat(?key;separator=";") as ?keys)
(group_concat(?value;separator=";") as ?values)
(group_concat(?kind;separator=";") as ?kinds)

WHERE {
?processor a jvm:Processor.
?stage a ?processor.
?stage ?key ?value.
?shape sh:property ?property.
?property sh:path ?key.

OPTIONAL { ?property sh:datatype ?type. }
OPTIONAL { ?property sh:class ?class. }
BIND(COALESCE(?type, ?class) as ?kind )

FILTER (?key != "http://www.w3.org/1999/02/22-rdf-syntax-ns#type")
}

Expand Down
6 changes: 3 additions & 3 deletions src/test/resources/pipelines/range_reporter.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
[]
a jvm:Range;
jvm:start "0"^^xsd:integer;
jvm:end "100"^^xsd:integer;
jvm:end "10"^^xsd:integer;
jvm:step "1"^^xsd:integer;
jvm:outgoing <writer>.
jvm:output <writer>.

# Define a reporter.
[]
a jvm:Reporter;
jvm:incoming <reader>.
jvm:input <reader>.
12 changes: 6 additions & 6 deletions src/test/resources/processors/range.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,25 @@ jvm:Range a jvm:Processor;
sh:property [
sh:path jvm:start;
sh:name "start";
sh:datatype xsd:int;
sh:datatype xsd:integer;
sh:minCount 1;
sh:maxCount 1;
], [
sh:path jvm:end;
sh:name "end";
sh:datatype xsd:int;
sh:datatype xsd:integer;
sh:minCount 1;
sh:maxCount 1;
], [
sh:path jvm:step;
sh:name "step";
sh:datatype xsd:int;
sh:datatype xsd:integer;
sh:minCount 1;
sh:maxCount 1;
], [
sh:path jvm:outgoing;
sh:name "outgoing";
sh:class jvm:WriterChannel;
sh:path jvm:output;
sh:name "output";
sh:class jvm:ChannelWriter;
sh:minCount 1;
sh:maxCount 1;
];
Expand Down
6 changes: 3 additions & 3 deletions src/test/resources/processors/reporter.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ jvm:Reporter a jvm:Processor;
[] a sh:NodeShape;
sh:targetClass jvm:Reporter;
sh:property [
sh:path jvm:incoming;
sh:name "incoming";
sh:class jvm:ReaderChannel;;
sh:path jvm:input;
sh:name "input";
sh:class jvm:ChannelReader;
sh:minCount 1;
sh:maxCount 1;
];
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/sources/Range.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public Range(Map<String, Object> args) {
this.step = this.getArgument("step");

// Channels
this.writer = this.getArgument("writer");
this.writer = this.getArgument("output");
}

public void exec() {
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/sources/Reporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public Reporter(Map<String, Object> args) {
super(args);

// Parameters
this.reader = this.getArgument("reader");
this.reader = this.getArgument("input");
}

public void exec() {
Expand Down

0 comments on commit f844b43

Please sign in to comment.