diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/FollowupFunction.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/FollowupFunction.kt index b7e49ae3..e91b3905 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/FollowupFunction.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/FollowupFunction.kt @@ -2,15 +2,13 @@ package com.inngest.testserver import com.inngest.* -@FunctionConfig(id = "fn-follow-up", name = "My follow up function!") -@FunctionEventTrigger(event = "user.signup.completed") -@FunctionEventTrigger(event = "random-event") -class FollowupFunction : InngestFunction() { - override fun execute( - ctx: FunctionContext, - step: Step, - ): LinkedHashMap { - println("-> follow up handler called " + ctx.event.name) - return ctx.event.data - } -} + +//class FollowupFunction : InngestFunction() { +// override fun execute( +// ctx: FunctionContext, +// step: Step, +// ): LinkedHashMap { +// println("-> follow up handler called " + ctx.event.name) +// return ctx.event.data +// } +//} diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt index 9fad2fbc..5a6807af 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt @@ -11,11 +11,10 @@ class ProcessAlbum : InngestFunction() { override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder { return builder + .id("ProcessAlbum") .name("Process Album!") .triggerEvent("delivery/process.requested") - .triggerCron("5 0 * 8 *") - .trigger( - InngestFunctionTriggers.Cron("5 0 * 8 *")) + .trigger(InngestFunctionTriggers.Cron("5 0 * 8 *")) .batchEvents(30, Duration.ofSeconds(10)) } diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt index 2fda1400..d4ae841f 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt @@ -11,6 +11,7 @@ class RestoreFromGlacier : InngestFunction() { .id("RestoreFromGlacier") .name("Restore from Glacier") .trigger(InngestFunctionTriggers.Event("delivery/restore.requested")) + .concurrency(10, null, ConcurrencyScope.ENVIRONMENT) } override fun execute( diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/UserSignupFunction.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/UserSignupFunction.kt index 749db8f6..9ac5a40e 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/UserSignupFunction.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/UserSignupFunction.kt @@ -3,9 +3,13 @@ package com.inngest.testserver import com.inngest.* import java.time.Duration -@FunctionConfig(id = "fn-id-slug", name = "My function!") -@FunctionEventTrigger(event = "user-signup") -class UserSignupFunction : InngestFunction() { +class ProcessUserSignup : InngestFunction() { + + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder { + return builder.id("process-user-signup") + .triggerEvent("user-signup") + } + override fun execute( ctx: FunctionContext, step: Step, diff --git a/inngest/src/main/kotlin/com/inngest/Comm.kt b/inngest/src/main/kotlin/com/inngest/Comm.kt index 8c056540..d07fb1e7 100644 --- a/inngest/src/main/kotlin/com/inngest/Comm.kt +++ b/inngest/src/main/kotlin/com/inngest/Comm.kt @@ -114,6 +114,7 @@ class CommHandler( try { return Klaxon() .fieldConverter(KlaxonDuration::class, durationConverter) + .fieldConverter(KlaxonConcurrencyScope::class, concurrencyScopeConverter) .toJsonString(payload) } catch (e: Exception) { println(e); diff --git a/inngest/src/main/kotlin/com/inngest/Function.kt b/inngest/src/main/kotlin/com/inngest/Function.kt index b2529edf..f5e29a18 100644 --- a/inngest/src/main/kotlin/com/inngest/Function.kt +++ b/inngest/src/main/kotlin/com/inngest/Function.kt @@ -77,11 +77,13 @@ data class StepConfig( val runtime: HashMap = hashMapOf("type" to "http"), ) -class InternalFunctionConfig @JvmOverloads constructor( +internal class InternalFunctionConfig @JvmOverloads constructor( val id: String, val name: String?, val triggers: MutableList, @Json(serializeNull = false) + val concurrency: MutableList?, + @Json(serializeNull = false) val batchEvents: BatchEvents?, val steps: Map, ) { diff --git a/inngest/src/main/kotlin/com/inngest/HttpClient.kt b/inngest/src/main/kotlin/com/inngest/HttpClient.kt index 2e54f7f3..305210b1 100644 --- a/inngest/src/main/kotlin/com/inngest/HttpClient.kt +++ b/inngest/src/main/kotlin/com/inngest/HttpClient.kt @@ -28,6 +28,7 @@ internal class HttpClient(private val clientConfig: RequestConfig) { ): okhttp3.Request { val jsonRequestBody = Klaxon() .fieldConverter(KlaxonDuration::class, durationConverter) + .fieldConverter(KlaxonConcurrencyScope::class, concurrencyScopeConverter) .toJsonString(payload) val body = jsonRequestBody.toRequestBody(jsonMediaType) diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt index 20a6c327..8a084673 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt @@ -26,7 +26,9 @@ abstract class InngestFunction { try { return buildConfig().id!! } catch (e: Exception) { - throw InngestInvalidConfigurationException("Function id must be configured via builder") + throw InngestInvalidConfigurationException( + "Function id must be configured via builder: ${this.javaClass.name}" + ) } } diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt index 45b544d6..2bca05c7 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt @@ -11,6 +11,7 @@ class InngestFunctionConfigBuilder() { var id: String? = null; private var name: String? = null; private var triggers: MutableList = mutableListOf(); + private var concurrency: MutableList? = null; private var batchEvents: BatchEvents? = null; /** @@ -77,6 +78,25 @@ class InngestFunctionConfigBuilder() { return this; } + /** + * Configure step concurrency limit + * + * @param limit Maximum number of concurrent executing steps across function type + * @param key A CEL expression to apply limit using event payload properties. Example: "event.data.destinationId" + * @param scope The scope to apply the limit to. Options + */ + fun concurrency(limit: Int, key: String? = null, scope: ConcurrencyScope? = null): InngestFunctionConfigBuilder { + val c = Concurrency(limit, key, scope) + // TODO - Limit concurrency length to 2 + if (this.concurrency == null) { + this.concurrency = mutableListOf(c) + } else { + this.concurrency?.add(c) + } + println(scope?.value) + return this + } + private fun buildSteps(serveUrl: String): Map { val scheme = serveUrl.split("://")[0] return mapOf( @@ -98,15 +118,18 @@ class InngestFunctionConfigBuilder() { ) } - fun build(appId: String, serverUrl: String): InternalFunctionConfig { + internal fun build(appId: String, serverUrl: String): InternalFunctionConfig { if (id == null) { throw InngestInvalidConfigurationException("Function id must be configured via builder") } val globalId = String.format("%s-%s", appId, id) + println("concurrency") + println(concurrency) val config = InternalFunctionConfig( globalId, name, triggers, + concurrency, batchEvents, steps = buildSteps(serverUrl) ) @@ -127,7 +150,34 @@ val durationConverter = object : Converter { override fun toJson(value: Any): String = """"${(value as Duration).seconds}s"""" } -data class BatchEvents +@Target(AnnotationTarget.FIELD) +annotation class KlaxonConcurrencyScope + +val concurrencyScopeConverter = object : Converter { + override fun canConvert(cls: Class<*>): Boolean = cls.isEnum + override fun fromJson(jv: JsonValue): ConcurrencyScope = enumValueOf(jv.string!!) + override fun toJson(value: Any): String = """"${(value as ConcurrencyScope).value}"""" +} + +// TODO - Convert enum element to value, not name +enum class ConcurrencyScope(val value: String) { + ACCOUNT("account"), + ENVIRONMENT("env"), + FUNCTION("fn"), +} + +internal data class Concurrency +@JvmOverloads +constructor( + val limit: Int, + @Json(serializeNull = false) + val key: String? = null, + @Json(serializeNull = false) + @KlaxonConcurrencyScope + val scope: ConcurrencyScope? = null, +) + +internal data class BatchEvents @JvmOverloads constructor( val maxSize: Int,