diff --git a/src/main/kotlin/agent/LocalAgent.kt b/src/main/kotlin/agent/LocalAgent.kt index 5d852027..ae258cfc 100644 --- a/src/main/kotlin/agent/LocalAgent.kt +++ b/src/main/kotlin/agent/LocalAgent.kt @@ -175,7 +175,8 @@ class LocalAgent(private val vertx: Vertx, val dispatcher: CoroutineDispatcher, gaugeRetries.labels(exec.serviceId).inc() } withTimeout(exec.maxRuntime, "maximum runtime", exec.serviceId) { - execute(exec, processChain.id, runNumber, executor, contextWrapper) { p -> + val modifiedExec = applyInputAdapters(exec, processChain) + execute(modifiedExec, processChain.id, runNumber, executor, contextWrapper) { p -> val step = 1.0 / processChain.executables.size setProgress(step * index + step * p) } @@ -223,6 +224,35 @@ class LocalAgent(private val vertx: Vertx, val dispatcher: CoroutineDispatcher, coroutineContext.cancel() } + /** + * Applies any configured input adapter plugin to the given [executable]. + * Returns the modified executable, or the original one if no matching + * input adapters were found. + */ + private suspend fun applyInputAdapters(executable: Executable, processChain: ProcessChain): Executable { + var changed = false + + val newArguments = executable.arguments.flatMap { arg -> + if (arg.type == Argument.Type.INPUT) { + val inputAdapter = pluginRegistry.findInputAdapter(arg.dataType) + if (inputAdapter == null) { + listOf(arg) + } else { + changed = true + inputAdapter.call(arg, executable, processChain, vertx) + } + } else { + listOf(arg) + } + } + + return if (changed) { + executable.copy(arguments = newArguments) + } else { + executable + } + } + private suspend fun execute(exec: Executable, processChainId: String, runNumber: Long, executor: ExecutorService, vertx: VertxContextWrapper, progressUpdater: ((Double) -> Unit)? = null) { diff --git a/src/main/kotlin/db/PluginRegistry.kt b/src/main/kotlin/db/PluginRegistry.kt index 6d21a8c2..32fe4b29 100644 --- a/src/main/kotlin/db/PluginRegistry.kt +++ b/src/main/kotlin/db/PluginRegistry.kt @@ -1,6 +1,7 @@ package db import model.plugins.InitializerPlugin +import model.plugins.InputAdapterPlugin import model.plugins.OutputAdapterPlugin import model.plugins.Plugin import model.plugins.ProcessChainAdapterPlugin @@ -16,6 +17,8 @@ import model.plugins.SetupAdapterPlugin class PluginRegistry(private val compiledPlugins: List) { private val initializers = compiledPlugins.filterIsInstance() .toResolved() + private val inputAdapters = compiledPlugins.filterIsInstance() + .associateBy { it.supportedDataType } private val outputAdapters = compiledPlugins.filterIsInstance() .associateBy { it.supportedDataType } private val processChainAdapters = compiledPlugins @@ -42,6 +45,11 @@ class PluginRegistry(private val compiledPlugins: List) { */ fun getInitializers() = initializers + /** + * Get an input adapter that supports the given [dataType] + */ + fun findInputAdapter(dataType: String) = inputAdapters[dataType] + /** * Get an output adapter that supports the given [dataType] */ diff --git a/src/main/kotlin/db/PluginRegistryFactory.kt b/src/main/kotlin/db/PluginRegistryFactory.kt index 9ff46871..85622b50 100644 --- a/src/main/kotlin/db/PluginRegistryFactory.kt +++ b/src/main/kotlin/db/PluginRegistryFactory.kt @@ -8,6 +8,7 @@ import io.vertx.core.json.JsonArray import io.vertx.core.json.JsonObject import io.vertx.kotlin.coroutines.coAwait import model.plugins.InitializerPlugin +import model.plugins.InputAdapterPlugin import model.plugins.OutputAdapterPlugin import model.plugins.Plugin import model.plugins.ProcessChainAdapterPlugin @@ -16,6 +17,7 @@ import model.plugins.ProgressEstimatorPlugin import model.plugins.RuntimePlugin import model.plugins.SetupAdapterPlugin import model.plugins.initializerPluginTemplate +import model.plugins.inputAdapterPluginTemplate import model.plugins.outputAdapterPluginTemplate import model.plugins.processChainAdapterPluginTemplate import model.plugins.processChainConsistencyCheckerPluginTemplate @@ -23,6 +25,7 @@ import model.plugins.progressEstimatorPluginTemplate import model.plugins.runtimePluginTemplate import model.plugins.setupAdapterPluginTemplate import model.plugins.wrapPluginFunction +import model.processchain.Argument import model.processchain.ProcessChain import model.setup.Setup import org.slf4j.LoggerFactory @@ -236,6 +239,8 @@ object PluginRegistryFactory { return when (plugin) { is InitializerPlugin -> plugin.copy(compiledFunction = wrapPluginFunction( f as KFunction, ::initializerPluginTemplate.parameters)) + is InputAdapterPlugin -> plugin.copy(compiledFunction = wrapPluginFunction( + f as KFunction>, ::inputAdapterPluginTemplate.parameters)) is OutputAdapterPlugin -> plugin.copy(compiledFunction = wrapPluginFunction( f as KFunction>, ::outputAdapterPluginTemplate.parameters)) is ProcessChainAdapterPlugin -> plugin.copy(compiledFunction = wrapPluginFunction( diff --git a/src/main/kotlin/model/plugins/InputAdapterPlugin.kt b/src/main/kotlin/model/plugins/InputAdapterPlugin.kt new file mode 100644 index 00000000..aae829a8 --- /dev/null +++ b/src/main/kotlin/model/plugins/InputAdapterPlugin.kt @@ -0,0 +1,65 @@ +package model.plugins + +import com.fasterxml.jackson.annotation.JsonIgnore +import io.vertx.core.Vertx +import model.processchain.Argument +import model.processchain.Executable +import model.processchain.ProcessChain +import kotlin.reflect.KFunction +import kotlin.reflect.full.callSuspend + +/** + * An input adapter plugin is a function that can manipulate the inputs of + * services depending on their data type (see [model.metadata.ServiceParameter.dataType] + * and [model.processchain.Argument.dataType]). The function has the following + * signature: + * + * suspend fun myInputAdapter(output: model.processchain.Argument, + * processChain: model.processchain.ProcessChain, + * vertx: io.vertx.core.Vertx): List + * + * It takes an input argument extracted from an executable, the unmodified + * executable itself, the corresponding process chain, and the Vert.x instance. + * It returns a list of new arguments that will replace the input argument (in + * other words: the input argument will be removed from the process chain and + * the returned arguments will be inserted at the same position). This list can + * be empty to just remove the input argument. + * + * In contrast to a [ProcessChainAdapterPlugin], this type of plugin will be + * called on the local system where the service will be executed. This allows + * you to access local resources such as the file system. + * + * If required, the function can be a suspend function. + */ +data class InputAdapterPlugin( + override val name: String, + override val scriptFile: String, + override val version: String? = null, + + /** + * The input data type this plugin supports + */ + val supportedDataType: String, + + /** + * The compiled plugin + */ + @JsonIgnore + override val compiledFunction: KFunction> = throwPluginNeedsCompile() +) : Plugin + +@Suppress("UNUSED_PARAMETER") +internal fun inputAdapterPluginTemplate(input: Argument, executable: Executable, + processChain: ProcessChain, vertx: Vertx): List { + throw NotImplementedError("This is just a template specifying the " + + "function signature of an input adapter plugin") +} + +suspend fun InputAdapterPlugin.call(input: Argument, executable: Executable, + processChain: ProcessChain, vertx: Vertx): List { + return if (this.compiledFunction.isSuspend) { + this.compiledFunction.callSuspend(input, executable, processChain, vertx) + } else { + this.compiledFunction.call(input, executable, processChain, vertx) + } +} diff --git a/src/main/kotlin/model/plugins/Plugin.kt b/src/main/kotlin/model/plugins/Plugin.kt index cf708814..0d8d02d6 100644 --- a/src/main/kotlin/model/plugins/Plugin.kt +++ b/src/main/kotlin/model/plugins/Plugin.kt @@ -19,6 +19,7 @@ import kotlin.reflect.jvm.javaType property = "type") @JsonSubTypes( JsonSubTypes.Type(value = InitializerPlugin::class, name = "initializer"), + JsonSubTypes.Type(value = InputAdapterPlugin::class, name = "inputAdapter"), JsonSubTypes.Type(value = OutputAdapterPlugin::class, name = "outputAdapter"), JsonSubTypes.Type(value = ProcessChainAdapterPlugin::class, name = "processChainAdapter"), JsonSubTypes.Type(value = ProcessChainConsistencyCheckerPlugin::class, name = "processChainConsistencyChecker"), diff --git a/src/test/kotlin/agent/LocalAgentTest.kt b/src/test/kotlin/agent/LocalAgentTest.kt index 9ef8926b..e8fb3b77 100644 --- a/src/test/kotlin/agent/LocalAgentTest.kt +++ b/src/test/kotlin/agent/LocalAgentTest.kt @@ -33,6 +33,7 @@ import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import model.metadata.Service +import model.plugins.InputAdapterPlugin import model.plugins.ProgressEstimatorPlugin import model.processchain.Argument import model.processchain.ArgumentVariable @@ -815,4 +816,58 @@ class LocalAgentTest : AgentTest() { fun maxInactivityRetry(vertx: Vertx, ctx: VertxTestContext) { doMaxInactivity(vertx, ctx, RetryPolicy(3), 3) } + + /** + * Test if the agent calls an input adapter plugin + */ + @Test + fun inputAdapter(vertx: Vertx, ctx: VertxTestContext) { + val supportedDataType = "foobar" + + val customInputAdapter = spyk(object { + @Suppress("UNUSED_PARAMETER") + fun execute(input: Argument, executable: Executable, + processChain: ProcessChain, vertx: Vertx): List { + return listOf(input.copy(label = "-a")) + } + }) + + val pluginRegistry = mockk() + mockkObject(PluginRegistryFactory) + every { PluginRegistryFactory.create() } returns pluginRegistry + every { pluginRegistry.findInputAdapter(supportedDataType) } returns InputAdapterPlugin( + name = "foobar", + scriptFile = "", + supportedDataType = supportedDataType, + compiledFunction = customInputAdapter::execute + ) + every { pluginRegistry.findProgressEstimator(any()) } returns null + + mockkConstructor(OtherRuntime::class) + every { anyConstructed().execute(any(), any() as OutputCollector) } just Runs + + val inputArg = Argument( + variable = ArgumentVariable("id", "myValue"), + type = Argument.Type.INPUT, + dataType = supportedDataType + ) + val modifiedArg = inputArg.copy(label = "-a") + val exec = Executable(path = "ls", serviceId = "ls", + arguments = listOf(inputArg)) + val modifiedExec = exec.copy(arguments = listOf(modifiedArg)) + val processChain = ProcessChain(executables = listOf(exec)) + + val agent = createAgent(vertx) + + CoroutineScope(vertx.dispatcher()).launch { + ctx.coVerify { + agent.execute(processChain, 1) + verify(exactly = 1) { + customInputAdapter.execute(inputArg, exec, processChain, any()) + anyConstructed().execute(modifiedExec, any() as OutputCollector) + } + } + ctx.completeNow() + } + } } diff --git a/src/test/kotlin/db/PluginRegistryTest.kt b/src/test/kotlin/db/PluginRegistryTest.kt index fdca24dd..8657bb02 100644 --- a/src/test/kotlin/db/PluginRegistryTest.kt +++ b/src/test/kotlin/db/PluginRegistryTest.kt @@ -15,6 +15,7 @@ import io.vertx.kotlin.coroutines.dispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import model.plugins.InitializerPlugin +import model.plugins.InputAdapterPlugin import model.plugins.OutputAdapterPlugin import model.plugins.ProcessChainAdapterPlugin import model.plugins.ProgressEstimatorPlugin @@ -112,6 +113,54 @@ class PluginRegistryTest { assertThat(pr.getInitializers()).isEqualTo(listOf(init2, init3, init1)) } + /** + * Test if a simple input adapter can be compiled and executed + */ + @Test + fun compileDummyInputAdapter(vertx: Vertx, ctx: VertxTestContext) { + CoroutineScope(vertx.dispatcher()).launch { + val config = jsonObjectOf( + ConfigConstants.PLUGINS to "src/**/db/dummyInputAdapter.yaml" + ) + PluginRegistryFactory.initialize(vertx, config) + + val pr = PluginRegistryFactory.create() + val adapter = pr.findInputAdapter("dummy") + ctx.coVerify { + val inputArg = Argument( + variable = ArgumentVariable("id", "myValue"), + type = Argument.Type.OUTPUT + ) + val exec = Executable( + path = "myService", + serviceId = "myService", + arguments = listOf(inputArg) + ) + + assertThat(adapter).isNotNull + val result = adapter!!.call(inputArg, exec, ProcessChain(), vertx) + assertThat(result).hasSize(2) + assertThat(result[0].label).isEqualTo("-a") + assertThat(result[1].label).isEqualTo("-b") + } + + ctx.completeNow() + } + } + + /** + * Test if [PluginRegistry.findInputAdapter] works correctly + */ + @Test + fun findInputAdapter() { + val adapter1 = InputAdapterPlugin("a", "file.kts", supportedDataType = "dataType") + val adapter2 = InputAdapterPlugin("b", "file2.kts", supportedDataType = "dataType") + val adapter3 = InputAdapterPlugin("c", "file3.kts", supportedDataType = "custom") + val pr = PluginRegistry(listOf(adapter1, adapter2, adapter3)) + assertThat(pr.findInputAdapter("dataType")).isSameAs(adapter2) + assertThat(pr.findInputAdapter("wrongDataType")).isNull() + } + private fun doCompileDummyOutputAdapter(vertx: Vertx, ctx: VertxTestContext, name: String) { CoroutineScope(vertx.dispatcher()).launch { val config = jsonObjectOf( diff --git a/src/test/resources/db/dummyInputAdapter.kts b/src/test/resources/db/dummyInputAdapter.kts new file mode 100644 index 00000000..86eb7790 --- /dev/null +++ b/src/test/resources/db/dummyInputAdapter.kts @@ -0,0 +1,8 @@ +import io.vertx.core.Vertx +import model.processchain.Argument +import model.processchain.ProcessChain + +fun dummyInputAdapter(input: Argument, processChain: ProcessChain, + vertx: Vertx): List { + return listOf(input.copy(label = "-a"), input.copy(label = "-b")) +} diff --git a/src/test/resources/db/dummyInputAdapter.yaml b/src/test/resources/db/dummyInputAdapter.yaml new file mode 100644 index 00000000..c6a69554 --- /dev/null +++ b/src/test/resources/db/dummyInputAdapter.yaml @@ -0,0 +1,5 @@ +# An input adapter that creates a dummy list of files +- name: dummyInputAdapter + type: inputAdapter + scriptFile: db/dummyInputAdapter.kts + supportedDataType: dummy