Skip to content

Commit

Permalink
Add input adapter plugin type
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-kraemer committed Sep 30, 2024
1 parent 3f845be commit e47b875
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 1 deletion.
32 changes: 31 additions & 1 deletion src/main/kotlin/agent/LocalAgent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ class LocalAgent(private val vertx: Vertx, val dispatcher: CoroutineDispatcher,
gaugeRetries.labels(exec.serviceId).inc()
}
withTimeout(exec.maxRuntime, "maximum runtime", exec.serviceId) {
execute(exec, processChain.id, runNumber, executor, contextWrapper) { p ->
val modifiedExec = applyInputAdapters(exec, processChain)
execute(modifiedExec, processChain.id, runNumber, executor, contextWrapper) { p ->
val step = 1.0 / processChain.executables.size
setProgress(step * index + step * p)
}
Expand Down Expand Up @@ -223,6 +224,35 @@ class LocalAgent(private val vertx: Vertx, val dispatcher: CoroutineDispatcher,
coroutineContext.cancel()
}

/**
* Applies any configured input adapter plugin to the given [executable].
* Returns the modified executable, or the original one if no matching
* input adapters were found.
*/
private suspend fun applyInputAdapters(executable: Executable, processChain: ProcessChain): Executable {
var changed = false

val newArguments = executable.arguments.flatMap { arg ->
if (arg.type == Argument.Type.INPUT) {
val inputAdapter = pluginRegistry.findInputAdapter(arg.dataType)
if (inputAdapter == null) {
listOf(arg)
} else {
changed = true
inputAdapter.call(arg, executable, processChain, vertx)
}
} else {
listOf(arg)
}
}

return if (changed) {
executable.copy(arguments = newArguments)
} else {
executable
}
}

private suspend fun execute(exec: Executable, processChainId: String,
runNumber: Long, executor: ExecutorService, vertx: VertxContextWrapper,
progressUpdater: ((Double) -> Unit)? = null) {
Expand Down
8 changes: 8 additions & 0 deletions src/main/kotlin/db/PluginRegistry.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package db

import model.plugins.InitializerPlugin
import model.plugins.InputAdapterPlugin
import model.plugins.OutputAdapterPlugin
import model.plugins.Plugin
import model.plugins.ProcessChainAdapterPlugin
Expand All @@ -16,6 +17,8 @@ import model.plugins.SetupAdapterPlugin
class PluginRegistry(private val compiledPlugins: List<Plugin>) {
private val initializers = compiledPlugins.filterIsInstance<InitializerPlugin>()
.toResolved()
private val inputAdapters = compiledPlugins.filterIsInstance<InputAdapterPlugin>()
.associateBy { it.supportedDataType }
private val outputAdapters = compiledPlugins.filterIsInstance<OutputAdapterPlugin>()
.associateBy { it.supportedDataType }
private val processChainAdapters = compiledPlugins
Expand All @@ -42,6 +45,11 @@ class PluginRegistry(private val compiledPlugins: List<Plugin>) {
*/
fun getInitializers() = initializers

/**
* Get an input adapter that supports the given [dataType]
*/
fun findInputAdapter(dataType: String) = inputAdapters[dataType]

/**
* Get an output adapter that supports the given [dataType]
*/
Expand Down
5 changes: 5 additions & 0 deletions src/main/kotlin/db/PluginRegistryFactory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.vertx.core.json.JsonArray
import io.vertx.core.json.JsonObject
import io.vertx.kotlin.coroutines.coAwait
import model.plugins.InitializerPlugin
import model.plugins.InputAdapterPlugin
import model.plugins.OutputAdapterPlugin
import model.plugins.Plugin
import model.plugins.ProcessChainAdapterPlugin
Expand All @@ -16,13 +17,15 @@ import model.plugins.ProgressEstimatorPlugin
import model.plugins.RuntimePlugin
import model.plugins.SetupAdapterPlugin
import model.plugins.initializerPluginTemplate
import model.plugins.inputAdapterPluginTemplate
import model.plugins.outputAdapterPluginTemplate
import model.plugins.processChainAdapterPluginTemplate
import model.plugins.processChainConsistencyCheckerPluginTemplate
import model.plugins.progressEstimatorPluginTemplate
import model.plugins.runtimePluginTemplate
import model.plugins.setupAdapterPluginTemplate
import model.plugins.wrapPluginFunction
import model.processchain.Argument
import model.processchain.ProcessChain
import model.setup.Setup
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -236,6 +239,8 @@ object PluginRegistryFactory {
return when (plugin) {
is InitializerPlugin -> plugin.copy(compiledFunction = wrapPluginFunction(
f as KFunction<Unit>, ::initializerPluginTemplate.parameters))
is InputAdapterPlugin -> plugin.copy(compiledFunction = wrapPluginFunction(
f as KFunction<List<Argument>>, ::inputAdapterPluginTemplate.parameters))
is OutputAdapterPlugin -> plugin.copy(compiledFunction = wrapPluginFunction(
f as KFunction<List<Any>>, ::outputAdapterPluginTemplate.parameters))
is ProcessChainAdapterPlugin -> plugin.copy(compiledFunction = wrapPluginFunction(
Expand Down
65 changes: 65 additions & 0 deletions src/main/kotlin/model/plugins/InputAdapterPlugin.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package model.plugins

import com.fasterxml.jackson.annotation.JsonIgnore
import io.vertx.core.Vertx
import model.processchain.Argument
import model.processchain.Executable
import model.processchain.ProcessChain
import kotlin.reflect.KFunction
import kotlin.reflect.full.callSuspend

/**
* An input adapter plugin is a function that can manipulate the inputs of
* services depending on their data type (see [model.metadata.ServiceParameter.dataType]
* and [model.processchain.Argument.dataType]). The function has the following
* signature:
*
* suspend fun myInputAdapter(output: model.processchain.Argument,
* processChain: model.processchain.ProcessChain,
* vertx: io.vertx.core.Vertx): List<model.processchain.Argument>
*
* It takes an input argument extracted from an executable, the unmodified
* executable itself, the corresponding process chain, and the Vert.x instance.
* It returns a list of new arguments that will replace the input argument (in
* other words: the input argument will be removed from the process chain and
* the returned arguments will be inserted at the same position). This list can
* be empty to just remove the input argument.
*
* In contrast to a [ProcessChainAdapterPlugin], this type of plugin will be
* called on the local system where the service will be executed. This allows
* you to access local resources such as the file system.
*
* If required, the function can be a suspend function.
*/
data class InputAdapterPlugin(
override val name: String,
override val scriptFile: String,
override val version: String? = null,

/**
* The input data type this plugin supports
*/
val supportedDataType: String,

/**
* The compiled plugin
*/
@JsonIgnore
override val compiledFunction: KFunction<List<Argument>> = throwPluginNeedsCompile()
) : Plugin

@Suppress("UNUSED_PARAMETER")
internal fun inputAdapterPluginTemplate(input: Argument, executable: Executable,
processChain: ProcessChain, vertx: Vertx): List<Argument> {
throw NotImplementedError("This is just a template specifying the " +
"function signature of an input adapter plugin")
}

suspend fun InputAdapterPlugin.call(input: Argument, executable: Executable,
processChain: ProcessChain, vertx: Vertx): List<Argument> {
return if (this.compiledFunction.isSuspend) {
this.compiledFunction.callSuspend(input, executable, processChain, vertx)
} else {
this.compiledFunction.call(input, executable, processChain, vertx)
}
}
1 change: 1 addition & 0 deletions src/main/kotlin/model/plugins/Plugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import kotlin.reflect.jvm.javaType
property = "type")
@JsonSubTypes(
JsonSubTypes.Type(value = InitializerPlugin::class, name = "initializer"),
JsonSubTypes.Type(value = InputAdapterPlugin::class, name = "inputAdapter"),
JsonSubTypes.Type(value = OutputAdapterPlugin::class, name = "outputAdapter"),
JsonSubTypes.Type(value = ProcessChainAdapterPlugin::class, name = "processChainAdapter"),
JsonSubTypes.Type(value = ProcessChainConsistencyCheckerPlugin::class, name = "processChainConsistencyChecker"),
Expand Down
55 changes: 55 additions & 0 deletions src/test/kotlin/agent/LocalAgentTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import model.metadata.Service
import model.plugins.InputAdapterPlugin
import model.plugins.ProgressEstimatorPlugin
import model.processchain.Argument
import model.processchain.ArgumentVariable
Expand Down Expand Up @@ -815,4 +816,58 @@ class LocalAgentTest : AgentTest() {
fun maxInactivityRetry(vertx: Vertx, ctx: VertxTestContext) {
doMaxInactivity(vertx, ctx, RetryPolicy(3), 3)
}

/**
* Test if the agent calls an input adapter plugin
*/
@Test
fun inputAdapter(vertx: Vertx, ctx: VertxTestContext) {
val supportedDataType = "foobar"

val customInputAdapter = spyk(object {
@Suppress("UNUSED_PARAMETER")
fun execute(input: Argument, executable: Executable,
processChain: ProcessChain, vertx: Vertx): List<Argument> {
return listOf(input.copy(label = "-a"))
}
})

val pluginRegistry = mockk<PluginRegistry>()
mockkObject(PluginRegistryFactory)
every { PluginRegistryFactory.create() } returns pluginRegistry
every { pluginRegistry.findInputAdapter(supportedDataType) } returns InputAdapterPlugin(
name = "foobar",
scriptFile = "",
supportedDataType = supportedDataType,
compiledFunction = customInputAdapter::execute
)
every { pluginRegistry.findProgressEstimator(any()) } returns null

mockkConstructor(OtherRuntime::class)
every { anyConstructed<OtherRuntime>().execute(any(), any() as OutputCollector) } just Runs

val inputArg = Argument(
variable = ArgumentVariable("id", "myValue"),
type = Argument.Type.INPUT,
dataType = supportedDataType
)
val modifiedArg = inputArg.copy(label = "-a")
val exec = Executable(path = "ls", serviceId = "ls",
arguments = listOf(inputArg))
val modifiedExec = exec.copy(arguments = listOf(modifiedArg))
val processChain = ProcessChain(executables = listOf(exec))

val agent = createAgent(vertx)

CoroutineScope(vertx.dispatcher()).launch {
ctx.coVerify {
agent.execute(processChain, 1)
verify(exactly = 1) {
customInputAdapter.execute(inputArg, exec, processChain, any())
anyConstructed<OtherRuntime>().execute(modifiedExec, any() as OutputCollector)
}
}
ctx.completeNow()
}
}
}
49 changes: 49 additions & 0 deletions src/test/kotlin/db/PluginRegistryTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import io.vertx.kotlin.coroutines.dispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import model.plugins.InitializerPlugin
import model.plugins.InputAdapterPlugin
import model.plugins.OutputAdapterPlugin
import model.plugins.ProcessChainAdapterPlugin
import model.plugins.ProgressEstimatorPlugin
Expand Down Expand Up @@ -112,6 +113,54 @@ class PluginRegistryTest {
assertThat(pr.getInitializers()).isEqualTo(listOf(init2, init3, init1))
}

/**
* Test if a simple input adapter can be compiled and executed
*/
@Test
fun compileDummyInputAdapter(vertx: Vertx, ctx: VertxTestContext) {
CoroutineScope(vertx.dispatcher()).launch {
val config = jsonObjectOf(
ConfigConstants.PLUGINS to "src/**/db/dummyInputAdapter.yaml"
)
PluginRegistryFactory.initialize(vertx, config)

val pr = PluginRegistryFactory.create()
val adapter = pr.findInputAdapter("dummy")
ctx.coVerify {
val inputArg = Argument(
variable = ArgumentVariable("id", "myValue"),
type = Argument.Type.OUTPUT
)
val exec = Executable(
path = "myService",
serviceId = "myService",
arguments = listOf(inputArg)
)

assertThat(adapter).isNotNull
val result = adapter!!.call(inputArg, exec, ProcessChain(), vertx)
assertThat(result).hasSize(2)
assertThat(result[0].label).isEqualTo("-a")
assertThat(result[1].label).isEqualTo("-b")
}

ctx.completeNow()
}
}

/**
* Test if [PluginRegistry.findInputAdapter] works correctly
*/
@Test
fun findInputAdapter() {
val adapter1 = InputAdapterPlugin("a", "file.kts", supportedDataType = "dataType")
val adapter2 = InputAdapterPlugin("b", "file2.kts", supportedDataType = "dataType")
val adapter3 = InputAdapterPlugin("c", "file3.kts", supportedDataType = "custom")
val pr = PluginRegistry(listOf(adapter1, adapter2, adapter3))
assertThat(pr.findInputAdapter("dataType")).isSameAs(adapter2)
assertThat(pr.findInputAdapter("wrongDataType")).isNull()
}

private fun doCompileDummyOutputAdapter(vertx: Vertx, ctx: VertxTestContext, name: String) {
CoroutineScope(vertx.dispatcher()).launch {
val config = jsonObjectOf(
Expand Down
8 changes: 8 additions & 0 deletions src/test/resources/db/dummyInputAdapter.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import io.vertx.core.Vertx
import model.processchain.Argument
import model.processchain.ProcessChain

fun dummyInputAdapter(input: Argument, processChain: ProcessChain,
vertx: Vertx): List<Argument> {
return listOf(input.copy(label = "-a"), input.copy(label = "-b"))
}
5 changes: 5 additions & 0 deletions src/test/resources/db/dummyInputAdapter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# An input adapter that creates a dummy list of files
- name: dummyInputAdapter
type: inputAdapter
scriptFile: db/dummyInputAdapter.kts
supportedDataType: dummy

0 comments on commit e47b875

Please sign in to comment.