diff --git a/src/main/kotlin/bridge/MemoryReader.kt b/src/main/kotlin/bridge/MemoryReader.kt index db90786..582ed6f 100644 --- a/src/main/kotlin/bridge/MemoryReader.kt +++ b/src/main/kotlin/bridge/MemoryReader.kt @@ -18,7 +18,6 @@ class MemoryReader: Reader { } override fun readSync(): Reader.Result { - Log.shared.debug("Reading bytes") val channel = this.channel ?: Log.shared.fatal("Channel not set") val result = runBlocking { channel.receiveCatching() } @@ -38,7 +37,6 @@ class MemoryReader: Reader { } override suspend fun read(): Reader.Result { - Log.shared.debug("Reading bytes") val channel = this.channel ?: Log.shared.fatal("Channel not set") return try { diff --git a/src/main/kotlin/bridge/MemoryWriter.kt b/src/main/kotlin/bridge/MemoryWriter.kt index a514359..08f2d0f 100644 --- a/src/main/kotlin/bridge/MemoryWriter.kt +++ b/src/main/kotlin/bridge/MemoryWriter.kt @@ -18,10 +18,7 @@ class MemoryWriter: Writer { override fun pushSync(value: ByteArray) { val channel = this.channel ?: Log.shared.fatal("Channel not set") - - Log.shared.debug("Pushing ${value.size} bytes") runBlocking { channel.send(value) } - Log.shared.debug("Done") } override suspend fun push(value: ByteArray) { diff --git a/src/main/kotlin/runner/Pipeline.kt b/src/main/kotlin/runner/Pipeline.kt index 418a2d7..f8f6d66 100644 --- a/src/main/kotlin/runner/Pipeline.kt +++ b/src/main/kotlin/runner/Pipeline.kt @@ -15,18 +15,8 @@ class Pipeline(config: File) { * all are done. */ fun executeSync() { - // Run setup phase. - Log.shared.info("Running setup phase") - runBlocking { - processors.map { - async { it.setup() } - }.map { - it.await() - } - } + Log.shared.info("Executing pipeline.") - // Run execution phase. - Log.shared.info("Running execution phase") processors.map { thread { it.exec() } }.map { diff --git a/src/main/kotlin/runner/Processor.kt b/src/main/kotlin/runner/Processor.kt index 2ec4852..fb1dcf0 100644 --- a/src/main/kotlin/runner/Processor.kt +++ b/src/main/kotlin/runner/Processor.kt @@ -39,7 +39,5 @@ abstract class Processor { return Optional.ofNullable(result) as Optional } - open fun setup() {} - - open fun exec() {} + abstract fun exec() } diff --git a/src/test/resources/pipelines/range_reporter.ttl b/src/test/resources/pipelines/range_reporter.ttl index 4296c9d..4eabb59 100644 --- a/src/test/resources/pipelines/range_reporter.ttl +++ b/src/test/resources/pipelines/range_reporter.ttl @@ -10,17 +10,27 @@ # Include processor definitions. <> owl:imports <../pipeline.ttl>, + <../processors/square.ttl>, <../processors/range.ttl>, <../processors/reporter.ttl>. -# Define a memory bridge. - a jvm:MemoryChannelReader. - a jvm:MemoryChannelWriter. +# Range -> Square + a jvm:MemoryChannelReader. + a jvm:MemoryChannelWriter. - + a jvm:MemoryChannel; - jvm:reader ; - jvm:writer . + jvm:reader ; + jvm:writer . + + a jvm:MemoryChannelReader. + a jvm:MemoryChannelWriter. + +# Square -> Reporter + + a jvm:MemoryChannel; + jvm:reader ; + jvm:writer . # Define a range processor. [] @@ -28,9 +38,16 @@ jvm:start "0"^^xsd:integer; jvm:end "10"^^xsd:integer; jvm:step "1"^^xsd:integer; - jvm:output . + jvm:output . + +# Define a square processor. +[] + a jvm:Square; + jvm:input ; + jvm:output . # Define a reporter. [] a jvm:Reporter; - jvm:input . + jvm:input . + diff --git a/src/test/resources/processors/square.ttl b/src/test/resources/processors/square.ttl new file mode 100644 index 0000000..db52fcc --- /dev/null +++ b/src/test/resources/processors/square.ttl @@ -0,0 +1,32 @@ +@prefix jvm: . +@prefix fno: . +@prefix fnom: . +@prefix xsd: . +@prefix owl: . +@prefix : . +@prefix sh: . +@prefix rdf: . + +<> owl:imports <../pipeline.ttl>. + +jvm:Square a jvm:Processor; + jvm:file <../sources/Square.java>; + jvm:language "Java". + +[] a sh:NodeShape; + sh:targetClass jvm:Square; + sh:property [ + sh:path jvm:input; + sh:name "input"; + sh:class jvm:ChannelReader; + sh:minCount 1; + sh:maxCount 1; + ], [ + sh:path jvm:output; + sh:name "output"; + sh:class jvm:ChannelWriter; + sh:minCount 1; + sh:maxCount 1; + ]; + sh:closed true; + sh:ignoredProperties (rdf:type). diff --git a/src/test/resources/sources/Range.java b/src/test/resources/sources/Range.java index 890d9b1..97c83c2 100644 --- a/src/test/resources/sources/Range.java +++ b/src/test/resources/sources/Range.java @@ -25,14 +25,10 @@ public Range(Map args) { } public void exec() { - log.info("Initializing emitting loop."); - for (int i = start; i < end; i += step) { - log.info("Emitting " + i); + log.info(Integer.toString(i)); writer.pushSync(Integer.toString(i).getBytes()); } - - log.info("Closing outgoing channel."); writer.close(); } } diff --git a/src/test/resources/sources/Reporter.java b/src/test/resources/sources/Reporter.java index ba776e3..b1a475e 100644 --- a/src/test/resources/sources/Reporter.java +++ b/src/test/resources/sources/Reporter.java @@ -21,7 +21,7 @@ public void exec() { break; } - log.info("Received item: " + new String(result.getValue())); + log.info(new String(result.getValue())); } } } diff --git a/src/test/resources/sources/Square.java b/src/test/resources/sources/Square.java new file mode 100644 index 0000000..ca2da01 --- /dev/null +++ b/src/test/resources/sources/Square.java @@ -0,0 +1,39 @@ +import bridge.Reader; +import bridge.Writer; +import java.util.Map; + +import technology.idlab.logging.Log; +import technology.idlab.runner.Processor; + +public class Square extends Processor { + // Channels + private final Reader reader; + private final Writer writer; + + public Square(Map args) { + // Call super constructor. + super(args); + + // Channels + this.reader = this.getArgument("input"); + this.writer = this.getArgument("output"); + } + + public void exec() { + while (true) { + Reader.Result data = reader.readSync(); + + if (data.isClosed()) { + break; + } + + int value = Integer.parseInt(new String(data.getValue())); + int square = value * value; + byte[] result = Integer.toString(square).getBytes(); + + log.info(value + " * " + value + " = " + square); + writer.pushSync(result); + } + writer.close(); + } +}