feat: pipelines and packages
jenspots committed Jul 8, 2024
1 parent fe26234 commit 719e5b1
Showing 34 changed files with 540 additions and 264 deletions.
12 changes: 9 additions & 3 deletions processors/shacl-validator-ts/index.ttl
@@ -3,13 +3,19 @@
 @prefix sh: <http://www.w3.org/ns/shacl#> .
 @prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
 
-<Validator>
-  a rdfc:Processor ;
-  rdfc:target rdfc:node ;
+[]
+  a rdfc:Package ;
+  rdfc:version "0.0.1" ;
+  rdfc:author "Jens Pots" ;
+  rdfc:description "A SHACL Validator processor, written in TypeScript." ;
+  rdfc:repo "https://github.com/rdf-connect/orchestrator.git" ;
+  rdfc:license "MIT" ;
+  rdfc:prepare "npm run build" ;
+  rdfc:processors <Validator> .
 
 <Validator>
   a rdfc:Processor ;
   rdfc:target rdfc:node ;
   rdfc:entrypoint "build/index.js" .
 
 []
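The processor is no longer declared on its own: it is now listed by an rdfc:Package, which bundles the metadata needed to resolve and prepare it (repository, license, build command) together with the processors it provides. As a rough illustration, the package above could map onto an intermediate representation along the following lines; the field names are inferred from the Turtle properties and are not necessarily those of the repository's actual IRPackage and IRProcessor classes.

data class ProcessorSketch(
    val uri: String, // e.g. <Validator>
    val target: String, // rdfc:target, the runtime hosting the processor (rdfc:node here)
    val entrypoint: String, // rdfc:entrypoint, e.g. "build/index.js"
)

data class PackageSketch(
    val version: String, // rdfc:version
    val author: String, // rdfc:author
    val description: String, // rdfc:description
    val repo: String, // rdfc:repo
    val license: String, // rdfc:license
    val prepare: String, // rdfc:prepare, a build command such as "npm run build"
    val processors: List<ProcessorSketch>, // rdfc:processors
)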
32 changes: 18 additions & 14 deletions src/main/kotlin/Main.kt
@@ -6,22 +6,26 @@ import kotlinx.coroutines.runBlocking
 import technology.idlab.parser.Parser
 
 fun main(args: Array<String>) = runBlocking {
-  /**
-   * At the moment, the only argument that the runtime accepts is the path to the pipeline
-   * declaration file.
-   */
+  // Retrieve the pipeline configuration path from the CLI arguments.
   if (args.size != 1) {
-    println("Usage: jvm-runner <config>")
+    println("Usage: rdfc <config>")
     exitProcess(0)
   }
 
-  /**
-   * We start off by parsing the configuration file. This file contains the list of processors and
-   * stages that the runtime should prepare, as well as channel declarations.
-   */
-  val configPath = args[0]
-  val config = File(configPath)
-  val parser = Parser.create(config)
-  val processors = parser.processors()
-  val stages = parser.stages()
+  // Load the configuration file.
+  val path = args[0]
+  val file = File(path)
+
+  // Parse said config to a IRPipeline.
+  val parser = Parser(file)
+
+  // Parse the pipeline out of the configuration file.
+  val pipeline = parser.pipelines[0]
+
+  // From all packages, retrieve all processors.
+  val processors = parser.processors
+
+  // Start the orchestrator.
+  val orchestrator = Orchestrator(pipeline, processors)
+  orchestrator.exec()
 }
16 changes: 10 additions & 6 deletions src/main/kotlin/Orchestrator.kt
@@ -9,32 +9,36 @@ import kotlinx.coroutines.withTimeout
 import runner.Runner
 import runner.impl.NodeRunner
 import runner.jvm.JVMRunner
+import technology.idlab.parser.intermediate.IRPipeline
+import technology.idlab.parser.intermediate.IRProcessor
 import technology.idlab.parser.intermediate.IRStage
 import technology.idlab.util.Log
 
-class Orchestrator(stages: Set<IRStage>) {
+class Orchestrator(private val pipeline: IRPipeline, processors: List<IRProcessor>) {
   /** An exhaustive list of all runners. */
   private val channel = Channel<Runner.Payload>()
   private val jvmRunner = JVMRunner(channel)
   private val nodeRunner = NodeRunner(channel, 5000)
   private val runners = listOf(nodeRunner, jvmRunner)
 
+  private val processors = processors.associateBy { it.uri }
+
   /** A map of all channel URIs and their readers. */
   private val readers = mutableMapOf<String, Runner>()
 
   init {
     /** Initialize the processors and stages in the runtimes. */
-    runBlocking { stages.forEach { stage -> prepare(stage) } }
+    runBlocking { pipeline.stages.forEach { prepare(it) } }
   }
 
   /** Prepare a stage inside of it's corresponding runtime. */
   private suspend fun prepare(stage: IRStage) {
     // Get the corresponding runner.
-    val runner = getRuntime(stage.processor.target)
-    runner.load(stage)
+    val processor = this.processors[stage.processorURI]!!
+    val runner = getRuntime(processor.target)
+    runner.load(processor, stage)
 
     // Find all the readers in the stage.
-    stage.getReaders().forEach { this.readers[it] = runner }
+    stage.getReaders(processor).forEach { this.readers[it] = runner }
   }
 
   /** Execute all stages in all the runtimes. */
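A stage no longer carries its processor directly. The orchestrator now receives the pipeline and the list of known processors separately, indexes the processors by URI with associateBy, and resolves each stage's processorURI against that map before loading it into the matching runner. A self-contained illustration of that lookup, using simplified stand-in types rather than the repository's IR classes:

data class ProcessorStub(val uri: String, val target: String)

data class StageStub(val uri: String, val processorURI: String)

fun main() {
  // Index the processors by URI, as the new Orchestrator constructor does.
  val processors = listOf(ProcessorStub("<Validator>", "rdfc:node")).associateBy { it.uri }

  // A stage only references its processor by URI; resolve it before loading.
  val stage = StageStub("<ValidatorStage>", "<Validator>")
  val processor = processors[stage.processorURI] ?: error("Unknown processor: ${stage.processorURI}")

  println("Stage ${stage.uri} runs on target ${processor.target}")
}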
33 changes: 1 addition & 32 deletions src/main/kotlin/extensions/File.kt
@@ -4,46 +4,15 @@ import java.io.File
 import org.apache.jena.ontology.OntModelSpec
 import org.apache.jena.rdf.model.Model
 import org.apache.jena.rdf.model.ModelFactory
-import org.apache.jena.rdf.model.Resource
-import org.apache.jena.vocabulary.OWL
-import technology.idlab.util.Log
 
 /**
  * Read a model from a file and recursively import all referenced ontologies based on <owl:import>
  * statements.
  */
 internal fun File.readModelRecursively(): Model {
-  val result = ModelFactory.createDefaultModel()
 
   val onthology = ModelFactory.createOntologyModel(OntModelSpec.OWL_MEM)
-  Log.shared.info("Importing file://${this.absolutePath}")
   onthology.read(this.toURI().toString(), "TURTLE")
 
-  // Import any referenced ontologies.
-  val imported: MutableSet<String> = mutableSetOf()
-  val iter = onthology.listStatements(null, OWL.imports, null as Resource?)
-  while (iter.hasNext()) {
-    val statement = iter.nextStatement()
-    val uri = statement.getObject().toString()
-
-    // Check if we still need to import the referenced ontology.
-    if (imported.contains(uri)) {
-      continue
-    }
-
-    // Attempt importing the dataset.
-    Log.shared.info("Importing $uri")
-    try {
-      result.read(uri)
-    } catch (e: Exception) {
-      Log.shared.fatal(e)
-    }
-
-    imported.add(uri)
-  }
-
-  // Import original onthology into the model.
-  result.add(onthology)
-
-  return result
+  return onthology
 }
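The hand-rolled owl:imports loop is gone: the function now simply reads the file into a Jena ontology model and returns it. This presumably leans on the fact that an OntModel built with OntModelSpec.OWL_MEM resolves owl:imports through its OntDocumentManager while reading, which is enabled by default. A standalone sketch of that behaviour, with a toggle for import processing (the function name and path handling are illustrative only):

import java.io.File
import org.apache.jena.ontology.OntDocumentManager
import org.apache.jena.ontology.OntModelSpec
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory

fun readTurtleOntology(file: File, followImports: Boolean = true): Model {
  // The global document manager controls whether owl:imports are pulled in while reading.
  OntDocumentManager.getInstance().setProcessImports(followImports)

  val model = ModelFactory.createOntologyModel(OntModelSpec.OWL_MEM)
  model.read(file.toURI().toString(), "TURTLE")
  return model
}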
93 changes: 72 additions & 21 deletions src/main/kotlin/parser/Parser.kt
@@ -1,30 +1,81 @@
 package technology.idlab.parser
 
 import java.io.File
-import technology.idlab.parser.impl.RDFParser
+import org.apache.jena.ontology.OntModelSpec
+import org.apache.jena.rdf.model.Model
+import org.apache.jena.rdf.model.ModelFactory
+import org.apache.jena.rdf.model.Resource
+import technology.idlab.extensions.validate
+import technology.idlab.parser.impl.parseDependencies
+import technology.idlab.parser.impl.parsePackages
+import technology.idlab.parser.impl.parsePipelines
+import technology.idlab.parser.intermediate.IRDependency
+import technology.idlab.parser.intermediate.IRPackage
+import technology.idlab.parser.intermediate.IRPipeline
 import technology.idlab.parser.intermediate.IRProcessor
-import technology.idlab.parser.intermediate.IRStage
-import technology.idlab.util.Log
+import technology.idlab.resolver.Resolver
 
-/*
- * The Parser class provides a generic way to construct all the components of a pipeline. In theory,
- * this would allow us to extend the configuration possibilities with new formats, such as JSON or
- * YAML.
+/**
+ * Parse an RDF file into an intermediate representation, and validate it against the ontology and
+ * SHACL shapes.
  */
-abstract class Parser {
-  /* Retrieve all the declared processors in the file. */
-  abstract fun processors(): List<IRProcessor>
-
-  /* Retrieve all the declared stages in the file. */
-  abstract fun stages(): List<IRStage>
-
-  companion object {
-    /* Create a parser based on the file extension. */
-    fun create(file: File): Parser {
-      return when (file.extension) {
-        "ttl" -> RDFParser(file)
-        else -> Log.shared.fatal("Unsupported file extension: ${file.extension}")
-      }
+class Parser(file: File) {
+  /** The Apache Jena model. */
+  private val model: Model = ModelFactory.createOntologyModel(OntModelSpec.OWL_MEM)
+
+  /** The pipelines in the current configuration. */
+  val pipelines: List<IRPipeline>
+
+  /** The packages in the current configuration. */
+  val packages: List<IRPackage>
+
+  /** List of all known processors. */
+  val processors: List<IRProcessor>
+
+  init {
+    // Load the RDF-Connect ontology.
+    val resource = this::class.java.getResource("/pipeline.ttl")
+    val config = resource!!.path!!
+    this.load(config)
+
+    // Load the pipeline file into the parser.
+    this.load(file.path)
+
+    // Retrieve dependencies.
+    val dependencies = this.dependencies()
+
+    // Resolve all dependencies.
+    dependencies.forEach {
+      val path = Resolver.resolve(it)
+      this.load(path.toString())
     }
+
+    // Since we updated the model, we will once again check if the SHACL shapes are valid.
+    this.model.validate()
+
+    // Parse the file.
+    this.pipelines = this.pipelines()
+    this.packages = this.packages()
+    this.processors = this.packages.map { it.processors }.flatten()
   }
+
+  /** Parse the file as a list of pipelines, returning its containing stages and dependencies. */
+  private fun pipelines(): List<IRPipeline> {
+    return model.parsePipelines()
+  }
+
+  /** Parse the model as a list of packages, returning the provided processors inside. */
+  private fun packages(): List<IRPackage> {
+    return model.parsePackages()
+  }
+
+  /** Retrieve all dependencies in a given file. */
+  private fun dependencies(): List<IRDependency> {
+    return model.parseDependencies(null as Resource?)
+  }
+
+  /** Load an additional file into the parser. */
+  private fun load(path: String) {
+    this.model.read(path, "TURTLE")
+  }
 }
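The parser is now eager: constructing it loads the RDF-Connect ontology, the pipeline file, and all resolved dependencies, validates the combined model against the SHACL shapes, and exposes the result as plain properties. Consuming it therefore mirrors the new Main.kt above; a small illustrative helper follows, where the default path and function name are made up and pipeline.stages is referenced the way the Orchestrator uses it.

import java.io.File
import technology.idlab.parser.Parser

// Illustrative only: error handling is omitted and the path is hypothetical.
fun describeConfiguration(path: String = "pipeline.ttl") {
  val parser = Parser(File(path))
  val pipeline = parser.pipelines.first() // a configuration may declare more than one pipeline
  val processors = parser.processors // flattened out of every resolved package
  println("First pipeline has ${pipeline.stages.size} stage(s); ${processors.size} processor(s) are known.")
}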