From 939173a8c5cf2699fd26df406fe89f9a1e494fa0 Mon Sep 17 00:00:00 2001 From: Dan Farrelly Date: Wed, 17 Jul 2024 18:28:26 -0700 Subject: [PATCH 01/12] Config options builder WIP --- .../com/inngest/testserver/ProcessAlbum.kt | 8 ++ inngest/src/main/kotlin/com/inngest/Comm.kt | 6 +- .../src/main/kotlin/com/inngest/Function.kt | 68 ++++++++------- .../kotlin/com/inngest/InngestFunction.kt | 84 ++++++++++++++++--- 4 files changed, 120 insertions(+), 46 deletions(-) diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt index e19a508c..0f5cd0e8 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt @@ -6,6 +6,14 @@ import java.time.Duration @FunctionConfig(id = "ProcessAlbum", name = "ProcessAlbum") @FunctionEventTrigger(event = "delivery/process.requested") class ProcessAlbum : InngestFunction() { + // override required +// override val id = "ProcessAlbum" + override fun config(builder: Builder): Builder { + return builder + .name("Process Album!") + .batchEvents(30, Duration.ofSeconds(30)) + } + override fun execute( ctx: FunctionContext, step: Step, diff --git a/inngest/src/main/kotlin/com/inngest/Comm.kt b/inngest/src/main/kotlin/com/inngest/Comm.kt index cf94427a..ad48316d 100644 --- a/inngest/src/main/kotlin/com/inngest/Comm.kt +++ b/inngest/src/main/kotlin/com/inngest/Comm.kt @@ -24,7 +24,7 @@ internal data class RegistrationRequestPayload( val appName: String, val deployType: String = "ping", val framework: String, - val functions: List = listOf(), + val functions: List> = listOf(), val sdk: String, val url: String, val v: String, @@ -112,8 +112,8 @@ class CommHandler( return mapper.writeValueAsString(requestBody) } - private fun getFunctionConfigs(): List { - val configs: MutableList = mutableListOf() + private fun getFunctionConfigs(): List> { + val configs: MutableList> = mutableListOf() functions.forEach { entry -> configs.add(entry.value.getFunctionConfig(getServeUrl(), client)) } return configs } diff --git a/inngest/src/main/kotlin/com/inngest/Function.kt b/inngest/src/main/kotlin/com/inngest/Function.kt index b8219eb2..6391e999 100644 --- a/inngest/src/main/kotlin/com/inngest/Function.kt +++ b/inngest/src/main/kotlin/com/inngest/Function.kt @@ -6,11 +6,11 @@ import java.util.function.BiFunction // IDEA: Use data classes internal data class InternalFunctionOptions( val id: String, - val name: String, + val config: Map, val triggers: Array, ) -internal data class InternalFunctionTrigger +data class InternalFunctionTrigger @JvmOverloads constructor( @Json(serializeNull = false) val event: String? = null, @@ -77,7 +77,7 @@ data class StepConfig( val runtime: HashMap = hashMapOf("type" to "http"), ) -internal data class InternalFunctionConfig( +data class InternalFunctionConfig( val id: String, val name: String, val triggers: Array, @@ -116,15 +116,15 @@ internal interface Function { // TODO: make this implement the Function interface internal open class InternalInngestFunction( - val config: InternalFunctionOptions, + val config: Map, val handler: (ctx: FunctionContext, step: Step) -> Any?, ) { - constructor(config: InternalFunctionOptions, handler: BiFunction) : this( + constructor(config: Map, handler: BiFunction) : this( config, handler.toKotlin(), ) - fun id() = config.id + fun id() = config.get("id") // TODO - Validate options and trigger @@ -217,33 +217,37 @@ internal open class InternalInngestFunction( } } - fun getFunctionConfig(serveUrl: String, client: Inngest): InternalFunctionConfig { + fun getFunctionConfig(serveUrl: String, client: Inngest): Map { // TODO use URL objects instead of strings so we can fetch things like scheme val scheme = serveUrl.split("://")[0] - return InternalFunctionConfig( - id = String.format("%s-%s", client.appId, config.id), - name = config.name, - triggers = config.triggers, - steps = - mapOf( - "step" to - StepConfig( - id = "step", - name = "step", - retries = - mapOf( - // TODO - Pull from FunctionOptions - "attempts" to 3, - ), - runtime = - hashMapOf( - "type" to scheme, - // TODO - Create correct URL - "url" to - "$serveUrl?fnId=${config.id}&stepId=step", - ), - ), - ), - ) + + return config; + +// return InternalFunctionConfig( +// id = String.format("%s-%s", client.appId, config.id), +// name = config.config.get("name"), +// triggers = config.triggers, +// +// steps = +// mapOf( +// "step" to +// StepConfig( +// id = "step", +// name = "step", +// retries = +// mapOf( +// // TODO - Pull from FunctionOptions +// "attempts" to 3, +// ), +// runtime = +// hashMapOf( +// "type" to scheme, +// // TODO - Create correct URL +// "url" to +// "$serveUrl?fnId=${config.id}&stepId=step", +// ), +// ), +// ), +// ) } } diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt index a13cb93f..b9715178 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt @@ -1,6 +1,7 @@ package com.inngest import com.beust.klaxon.Json +import java.time.Duration @Target(AnnotationTarget.CLASS) @MustBeDocumented @@ -29,7 +30,51 @@ annotation class FunctionIfTrigger( @Json(serializeNull = false) val `if`: String, ) + abstract class InngestFunction { + +// abstract val id: String; + + // TODO: Throw illegal argument exception + class Builder { + private var name: String? = null; + private var triggers: MutableList = mutableListOf(); + private var batchEvents: BatchEvents? = null; + + fun name(name: String): Builder { + this.name = name + return this + } + + fun trigger(trigger: InngestFunctionTrigger): Builder { + // TODO - Check max triggers + // TODO - Check mutually exclusive opts (cron v. event+if?) + this.triggers.add(trigger) + return this + } + + fun batchEvents(maxSize: Int, timeout: Duration, key: String? = null): Builder { + this.batchEvents = BatchEvents(maxSize, timeout, key) + return this; + } + + fun build(): Map { + return buildMap { + put("triggers", triggers) + if (name != null) { + put("name", name!!) + } + if (batchEvents != null) { + put("batchEvents", batchEvents!!.maxSize) + } + } + } + } + + open fun config(builder: Builder): Builder { + return builder + } + abstract fun execute( ctx: FunctionContext, step: Step, @@ -45,18 +90,21 @@ abstract class InngestFunction { } internal fun toInngestFunction(): InternalInngestFunction { - if (config == null || config !is FunctionConfig) { - throw Exception("FunctionConfig annotation is required to setup an InngestFunction") - } +// if (config == null || config !is FunctionConfig) { +// throw Exception("FunctionConfig annotation is required to setup an InngestFunction") +// } val triggers = buildEventTriggers() + buildCronTriggers() + buildIfTriggers() - val fnConfig = - InternalFunctionOptions( - id = config.id, - name = config.name, - triggers = triggers.toTypedArray(), - ) - - return InternalInngestFunction(fnConfig, this::execute) + val builder = Builder() + val config = this.config(builder).build() + +// val fnConfig = +// InternalFunctionOptions( +// id = id(), +// config = config, +// triggers = triggers.toTypedArray(), +// ) + + return InternalInngestFunction(config, this::execute) } // TODO: DRY this @@ -72,3 +120,17 @@ abstract class InngestFunction { this::class.annotations.filter { it.annotationClass == FunctionIfTrigger::class } .map { InternalFunctionTrigger(event = (it as FunctionIfTrigger).`if`) } } + +data class InngestFunctionTrigger +@JvmOverloads +constructor( + @Json(serializeNull = false) val event: String? = null, + @Json(name = "expression", serializeNull = false) val `if`: String? = null, + @Json(serializeNull = false) val cron: String? = null, +) + +private data class BatchEvents( + val maxSize: Int, + val timeout: Duration, + @Json(serializeNull = false) val key: String? = null +) From 65ed9c0f37d29100d21e1e8e3beafca3d8292022 Mon Sep 17 00:00:00 2001 From: Dan Farrelly Date: Thu, 18 Jul 2024 07:00:55 -0700 Subject: [PATCH 02/12] Fix builder json serialization --- .../com/inngest/testserver/ProcessAlbum.kt | 3 +- inngest/src/main/kotlin/com/inngest/Comm.kt | 22 ++++-- .../src/main/kotlin/com/inngest/Function.kt | 30 +++++--- .../src/main/kotlin/com/inngest/HttpClient.kt | 4 +- .../kotlin/com/inngest/InngestFunction.kt | 72 +++++++++++++------ 5 files changed, 94 insertions(+), 37 deletions(-) diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt index 0f5cd0e8..61280803 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt @@ -8,9 +8,10 @@ import java.time.Duration class ProcessAlbum : InngestFunction() { // override required // override val id = "ProcessAlbum" - override fun config(builder: Builder): Builder { + override fun config(builder: InngestFunction.Builder): InngestFunction.Builder { return builder .name("Process Album!") + .trigger(InngestFunctionTrigger(event = "delivery/process.requested")) .batchEvents(30, Duration.ofSeconds(30)) } diff --git a/inngest/src/main/kotlin/com/inngest/Comm.kt b/inngest/src/main/kotlin/com/inngest/Comm.kt index c3f4a0d8..2773c4b9 100644 --- a/inngest/src/main/kotlin/com/inngest/Comm.kt +++ b/inngest/src/main/kotlin/com/inngest/Comm.kt @@ -20,11 +20,11 @@ data class ExecutionContext( val env: String, ) -internal data class RegistrationRequestPayload( +internal data class RegistrationRequestPayload @JvmOverloads constructor( val appName: String, val deployType: String = "ping", val framework: String, - val functions: List> = listOf(), + val functions: List = listOf(), val sdk: String, val url: String, val v: String, @@ -112,9 +112,21 @@ class CommHandler( return mapper.writeValueAsString(requestBody) } + private fun serializePayload(payload: Any?): String { + try { + return Klaxon() + .fieldConverter(KlaxonDuration::class, durationConverter) + .toJsonString(payload) + } catch (e: Exception) { + println(e); + return """{ "message": "failed serialization" }""" + } + + } + - private fun getFunctionConfigs(origin: String): List> { - val configs: MutableList> = mutableListOf() + private fun getFunctionConfigs(origin: String): List { + val configs: MutableList = mutableListOf() functions.forEach { entry -> configs.add(entry.value.getFunctionConfig(getServeUrl(origin), client)) } return configs } @@ -150,7 +162,7 @@ class CommHandler( fun introspect(origin: String): String { val requestPayload = getRegistrationRequestPayload(origin) - return parseRequestBody(requestPayload) + return serializePayload(requestPayload) } private fun getRegistrationRequestPayload(origin: String): RegistrationRequestPayload { diff --git a/inngest/src/main/kotlin/com/inngest/Function.kt b/inngest/src/main/kotlin/com/inngest/Function.kt index 6391e999..ebd8cdc3 100644 --- a/inngest/src/main/kotlin/com/inngest/Function.kt +++ b/inngest/src/main/kotlin/com/inngest/Function.kt @@ -77,12 +77,18 @@ data class StepConfig( val runtime: HashMap = hashMapOf("type" to "http"), ) -data class InternalFunctionConfig( +class InternalFunctionConfig @JvmOverloads constructor( val id: String, - val name: String, - val triggers: Array, + val name: String?, + val triggers: MutableList, + @Json(serializeNull = false) + val batchEvents: BatchEvents?, val steps: Map, -) +) { +} +// NOTE - This should probably be called serialized or formatted config +// as it's only used to format the config for register requests +//typealias InternalFunctionConfig = Map /** * The context for the current function run @@ -111,20 +117,22 @@ data class SendEventPayload(val event_ids: Array) internal interface Function { fun id(): String - fun config(): InternalFunctionConfig +// fun config(): InternalFunctionConfig } + // TODO: make this implement the Function interface internal open class InternalInngestFunction( - val config: Map, + private val config: InternalFunctionConfig, val handler: (ctx: FunctionContext, step: Step) -> Any?, ) { - constructor(config: Map, handler: BiFunction) : this( + constructor(config: InternalFunctionConfig, handler: BiFunction) : this( config, handler.toKotlin(), ) - fun id() = config.get("id") + // fun id() = config.get("id") + fun id() = config.id // TODO - Validate options and trigger @@ -137,7 +145,7 @@ internal open class InternalInngestFunction( val step = Step(state, client) // DEBUG - println(state) +// println(state) try { val data = handler(ctx, step) @@ -217,10 +225,12 @@ internal open class InternalInngestFunction( } } - fun getFunctionConfig(serveUrl: String, client: Inngest): Map { + fun getFunctionConfig(serveUrl: String, client: Inngest): InternalFunctionConfig { // TODO use URL objects instead of strings so we can fetch things like scheme val scheme = serveUrl.split("://")[0] + println(config) + return config; // return InternalFunctionConfig( diff --git a/inngest/src/main/kotlin/com/inngest/HttpClient.kt b/inngest/src/main/kotlin/com/inngest/HttpClient.kt index bb28fd3f..2e54f7f3 100644 --- a/inngest/src/main/kotlin/com/inngest/HttpClient.kt +++ b/inngest/src/main/kotlin/com/inngest/HttpClient.kt @@ -26,7 +26,9 @@ internal class HttpClient(private val clientConfig: RequestConfig) { payload: Any, config: RequestConfig? = null, ): okhttp3.Request { - val jsonRequestBody = Klaxon().toJsonString(payload) + val jsonRequestBody = Klaxon() + .fieldConverter(KlaxonDuration::class, durationConverter) + .toJsonString(payload) val body = jsonRequestBody.toRequestBody(jsonMediaType) val clientHeaders = clientConfig.headers ?: emptyMap() diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt index b9715178..2a5a1686 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt @@ -1,6 +1,9 @@ package com.inngest +import com.beust.klaxon.Converter import com.beust.klaxon.Json +import com.beust.klaxon.JsonValue +import com.beust.klaxon.KlaxonException import java.time.Duration @Target(AnnotationTarget.CLASS) @@ -58,16 +61,37 @@ abstract class InngestFunction { return this; } - fun build(): Map { - return buildMap { - put("triggers", triggers) - if (name != null) { - put("name", name!!) - } - if (batchEvents != null) { - put("batchEvents", batchEvents!!.maxSize) - } - } + private fun buildSteps(): Map { + return mapOf( + "step" to + StepConfig( + id = "step", + name = "step", + retries = + mapOf( + // TODO - Pull from conf option + "attempts" to 3, + ), + runtime = + hashMapOf( +// "type" to scheme, +// // TODO - Create correct URL +// "url" to +// "$serveUrl?fnId=${config.id}&stepId=step", + ), + ), + ) + } + + fun build(id: String): InternalFunctionConfig { + val config = InternalFunctionConfig( + id, + name, + triggers, + batchEvents, + steps = buildSteps() + ) + return config } } @@ -95,15 +119,7 @@ abstract class InngestFunction { // } val triggers = buildEventTriggers() + buildCronTriggers() + buildIfTriggers() val builder = Builder() - val config = this.config(builder).build() - -// val fnConfig = -// InternalFunctionOptions( -// id = id(), -// config = config, -// triggers = triggers.toTypedArray(), -// ) - + val config = this.config(builder).build(id()) return InternalInngestFunction(config, this::execute) } @@ -129,8 +145,24 @@ constructor( @Json(serializeNull = false) val cron: String? = null, ) -private data class BatchEvents( +@Target(AnnotationTarget.FIELD) +annotation class KlaxonDuration + +val durationConverter = object : Converter { + override fun canConvert(cls: Class<*>): Boolean = cls == Duration::class.java; + + // TODO Implement this - parse 30s into duration of seconds + override fun fromJson(jv: JsonValue): Duration = + throw KlaxonException("Duration parse not implemented: ${jv.string}") + + override fun toJson(value: Any): String = """"${(value as Duration).seconds}s"""" +} + +data class BatchEvents +@JvmOverloads +constructor( val maxSize: Int, + @KlaxonDuration val timeout: Duration, @Json(serializeNull = false) val key: String? = null ) From f6808d502226dfed21c601a28977fc12af53c692 Mon Sep 17 00:00:00 2001 From: Dan Farrelly Date: Thu, 18 Jul 2024 08:05:00 -0700 Subject: [PATCH 03/12] Get serve url and app id passed through to builder --- inngest/src/main/kotlin/com/inngest/Comm.kt | 2 - .../src/main/kotlin/com/inngest/Function.kt | 43 +++---------------- .../kotlin/com/inngest/InngestFunction.kt | 24 +++++------ .../src/main/kotlin/com/inngest/ktor/Route.kt | 2 +- 4 files changed, 19 insertions(+), 52 deletions(-) diff --git a/inngest/src/main/kotlin/com/inngest/Comm.kt b/inngest/src/main/kotlin/com/inngest/Comm.kt index 2773c4b9..8c056540 100644 --- a/inngest/src/main/kotlin/com/inngest/Comm.kt +++ b/inngest/src/main/kotlin/com/inngest/Comm.kt @@ -60,8 +60,6 @@ class CommHandler( functionId: String, requestBody: String, ): CommResponse { - println(requestBody) - try { val payload = Klaxon().parse(requestBody) // TODO - check that payload is not null and throw error diff --git a/inngest/src/main/kotlin/com/inngest/Function.kt b/inngest/src/main/kotlin/com/inngest/Function.kt index ebd8cdc3..86e9e20e 100644 --- a/inngest/src/main/kotlin/com/inngest/Function.kt +++ b/inngest/src/main/kotlin/com/inngest/Function.kt @@ -123,16 +123,16 @@ internal interface Function { // TODO: make this implement the Function interface internal open class InternalInngestFunction( - private val config: InternalFunctionConfig, + private val configBuilder: InngestFunction.Builder, val handler: (ctx: FunctionContext, step: Step) -> Any?, ) { - constructor(config: InternalFunctionConfig, handler: BiFunction) : this( - config, + constructor(configBuilder: InngestFunction.Builder, handler: BiFunction) : this( + configBuilder, handler.toKotlin(), ) // fun id() = config.get("id") - fun id() = config.id + fun id() = configBuilder.id // TODO - Validate options and trigger @@ -226,38 +226,7 @@ internal open class InternalInngestFunction( } fun getFunctionConfig(serveUrl: String, client: Inngest): InternalFunctionConfig { - // TODO use URL objects instead of strings so we can fetch things like scheme - val scheme = serveUrl.split("://")[0] - - println(config) - - return config; - -// return InternalFunctionConfig( -// id = String.format("%s-%s", client.appId, config.id), -// name = config.config.get("name"), -// triggers = config.triggers, -// -// steps = -// mapOf( -// "step" to -// StepConfig( -// id = "step", -// name = "step", -// retries = -// mapOf( -// // TODO - Pull from FunctionOptions -// "attempts" to 3, -// ), -// runtime = -// hashMapOf( -// "type" to scheme, -// // TODO - Create correct URL -// "url" to -// "$serveUrl?fnId=${config.id}&stepId=step", -// ), -// ), -// ), -// ) + // TODO use URL objects for serveUrl instead of strings so we can fetch things like scheme + return configBuilder.build(client.appId, serveUrl) } } diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt index 2a5a1686..57e13852 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt @@ -39,7 +39,7 @@ abstract class InngestFunction { // abstract val id: String; // TODO: Throw illegal argument exception - class Builder { + class Builder(val id: String) { private var name: String? = null; private var triggers: MutableList = mutableListOf(); private var batchEvents: BatchEvents? = null; @@ -61,7 +61,8 @@ abstract class InngestFunction { return this; } - private fun buildSteps(): Map { + private fun buildSteps(serveUrl: String): Map { + val scheme = serveUrl.split("://")[0] return mapOf( "step" to StepConfig( @@ -74,22 +75,21 @@ abstract class InngestFunction { ), runtime = hashMapOf( -// "type" to scheme, -// // TODO - Create correct URL -// "url" to -// "$serveUrl?fnId=${config.id}&stepId=step", + "type" to scheme, + "url" to "$serveUrl?fnId=${id}&stepId=step", ), ), ) } - fun build(id: String): InternalFunctionConfig { + fun build(appId: String, serverUrl: String): InternalFunctionConfig { + val globalId = String.format("%s-%s", appId, id) val config = InternalFunctionConfig( - id, + globalId, name, triggers, batchEvents, - steps = buildSteps() + steps = buildSteps(serverUrl) ) return config } @@ -118,9 +118,9 @@ abstract class InngestFunction { // throw Exception("FunctionConfig annotation is required to setup an InngestFunction") // } val triggers = buildEventTriggers() + buildCronTriggers() + buildIfTriggers() - val builder = Builder() - val config = this.config(builder).build(id()) - return InternalInngestFunction(config, this::execute) + val builder = Builder(id()) + val configBuilder = this.config(builder) + return InternalInngestFunction(configBuilder, this::execute) } // TODO: DRY this diff --git a/inngest/src/main/kotlin/com/inngest/ktor/Route.kt b/inngest/src/main/kotlin/com/inngest/ktor/Route.kt index 23406c34..bfa7ecd6 100644 --- a/inngest/src/main/kotlin/com/inngest/ktor/Route.kt +++ b/inngest/src/main/kotlin/com/inngest/ktor/Route.kt @@ -66,7 +66,7 @@ fun Route.serve( call.response.status( HttpStatusCode(response.statusCode.code, response.statusCode.message), ) - println("response: " + response.body) +// println("response: " + response.body) call.respond(response.body) } catch (e: Exception) { call.respond(HttpStatusCode.InternalServerError, e.toString()) From a602505b3fd456ea9fa87212337c4f82d0fdc79e Mon Sep 17 00:00:00 2001 From: Dan Farrelly Date: Thu, 18 Jul 2024 10:43:35 -0700 Subject: [PATCH 04/12] Add event id, batch demo --- .../com/inngest/testserver/ProcessAlbum.kt | 24 +++++++++++-------- .../inngest/testserver/RestoreFromGlacier.kt | 8 ++++++- inngest/src/main/kotlin/com/inngest/Event.kt | 1 + 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt index 61280803..89ab21f9 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt @@ -12,7 +12,7 @@ class ProcessAlbum : InngestFunction() { return builder .name("Process Album!") .trigger(InngestFunctionTrigger(event = "delivery/process.requested")) - .batchEvents(30, Duration.ofSeconds(30)) + .batchEvents(30, Duration.ofSeconds(10)) } override fun execute( @@ -20,17 +20,21 @@ class ProcessAlbum : InngestFunction() { step: Step, ): LinkedHashMap { - // NOTE - App ID is set on the serve level - val res = step.invoke>( - "restore-album", - "ktor-dev", - "RestoreFromGlacier", - mapOf("some-arg" to "awesome"), - null, - +// val list = ctx.events.map { e -> e.data.get("something") } +// println(list); + + for (evt in ctx.events) { +// println(evt); + // NOTE - App ID is set on the serve level + val res = step.invoke>( + "restore-album-${evt.id}", + "ktor-dev", + "RestoreFromGlacier", + mapOf("something" to evt.data.get("something")), + null, ) + } -// throw NonRetriableError("Could not restore") return linkedMapOf("hello" to true) } diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt index acfadd29..e41e9981 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt @@ -4,8 +4,14 @@ import com.inngest.* import java.time.Duration @FunctionConfig(id = "RestoreFromGlacier", name = "RestoreFromGlacier") -@FunctionEventTrigger(event = "delivery/restore.requested") class RestoreFromGlacier : InngestFunction() { + + override fun config(builder: InngestFunction.Builder): InngestFunction.Builder { + return builder + .name("Restore from Glacier") + .trigger(InngestFunctionTrigger(event = "delivery/restore.requested")) + } + override fun execute( ctx: FunctionContext, step: Step, diff --git a/inngest/src/main/kotlin/com/inngest/Event.kt b/inngest/src/main/kotlin/com/inngest/Event.kt index 84580716..06e5ba0c 100644 --- a/inngest/src/main/kotlin/com/inngest/Event.kt +++ b/inngest/src/main/kotlin/com/inngest/Event.kt @@ -1,6 +1,7 @@ package com.inngest data class Event( + val id: String, val name: String, val data: LinkedHashMap, val user: LinkedHashMap? = null, From c1e61cbcd6bb09916bed50e755c9f9ee6ba54d93 Mon Sep 17 00:00:00 2001 From: Dan Farrelly Date: Thu, 18 Jul 2024 10:47:29 -0700 Subject: [PATCH 05/12] Improve example --- .../main/kotlin/com/inngest/testserver/ProcessAlbum.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt index 89ab21f9..3cafb150 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt @@ -6,8 +6,8 @@ import java.time.Duration @FunctionConfig(id = "ProcessAlbum", name = "ProcessAlbum") @FunctionEventTrigger(event = "delivery/process.requested") class ProcessAlbum : InngestFunction() { - // override required -// override val id = "ProcessAlbum" + + // override val id = "ProcessAlbum" override fun config(builder: InngestFunction.Builder): InngestFunction.Builder { return builder .name("Process Album!") @@ -27,10 +27,10 @@ class ProcessAlbum : InngestFunction() { // println(evt); // NOTE - App ID is set on the serve level val res = step.invoke>( - "restore-album-${evt.id}", + "restore-album-${evt.data.get("albumId")}", "ktor-dev", "RestoreFromGlacier", - mapOf("something" to evt.data.get("something")), + evt.data, null, ) } From 3e7c9658dcd187d4e27754babd896c7e02ebcc65 Mon Sep 17 00:00:00 2001 From: Dan Farrelly Date: Thu, 18 Jul 2024 15:37:29 -0700 Subject: [PATCH 06/12] Refactor builder class --- .../com/inngest/testserver/ProcessAlbum.kt | 10 +- .../inngest/testserver/RestoreFromGlacier.kt | 7 +- .../src/main/kotlin/com/inngest/Function.kt | 8 +- .../kotlin/com/inngest/InngestFunction.kt | 87 +----------- .../inngest/InngestFunctionConfigBuilder.kt | 134 ++++++++++++++++++ .../com/inngest/InngestFunctionTriggers.kt | 35 +++++ 6 files changed, 185 insertions(+), 96 deletions(-) create mode 100644 inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt create mode 100644 inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt index 3cafb150..8ad692d9 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt @@ -3,15 +3,18 @@ package com.inngest.testserver import com.inngest.* import java.time.Duration + @FunctionConfig(id = "ProcessAlbum", name = "ProcessAlbum") @FunctionEventTrigger(event = "delivery/process.requested") class ProcessAlbum : InngestFunction() { - // override val id = "ProcessAlbum" - override fun config(builder: InngestFunction.Builder): InngestFunction.Builder { + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder { return builder .name("Process Album!") - .trigger(InngestFunctionTrigger(event = "delivery/process.requested")) + .triggerEvent("delivery/process.requested") + .triggerCron("5 0 * 8 *") + .trigger( + InngestFunctionTriggers.Cron("5 0 * 8 *")) .batchEvents(30, Duration.ofSeconds(10)) } @@ -23,6 +26,7 @@ class ProcessAlbum : InngestFunction() { // val list = ctx.events.map { e -> e.data.get("something") } // println(list); + for (evt in ctx.events) { // println(evt); // NOTE - App ID is set on the serve level diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt index e41e9981..2fda1400 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt @@ -3,13 +3,14 @@ package com.inngest.testserver import com.inngest.* import java.time.Duration -@FunctionConfig(id = "RestoreFromGlacier", name = "RestoreFromGlacier") +//@FunctionConfig(id = "RestoreFromGlacier", name = "RestoreFromGlacier") class RestoreFromGlacier : InngestFunction() { - override fun config(builder: InngestFunction.Builder): InngestFunction.Builder { + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder { return builder + .id("RestoreFromGlacier") .name("Restore from Glacier") - .trigger(InngestFunctionTrigger(event = "delivery/restore.requested")) + .trigger(InngestFunctionTriggers.Event("delivery/restore.requested")) } override fun execute( diff --git a/inngest/src/main/kotlin/com/inngest/Function.kt b/inngest/src/main/kotlin/com/inngest/Function.kt index 86e9e20e..80bfc31f 100644 --- a/inngest/src/main/kotlin/com/inngest/Function.kt +++ b/inngest/src/main/kotlin/com/inngest/Function.kt @@ -10,7 +10,7 @@ internal data class InternalFunctionOptions( val triggers: Array, ) -data class InternalFunctionTrigger +internal open class InternalFunctionTrigger @JvmOverloads constructor( @Json(serializeNull = false) val event: String? = null, @@ -116,17 +116,15 @@ data class SendEventPayload(val event_ids: Array) internal interface Function { fun id(): String - -// fun config(): InternalFunctionConfig } // TODO: make this implement the Function interface internal open class InternalInngestFunction( - private val configBuilder: InngestFunction.Builder, + private val configBuilder: InngestFunctionConfigBuilder, val handler: (ctx: FunctionContext, step: Step) -> Any?, ) { - constructor(configBuilder: InngestFunction.Builder, handler: BiFunction) : this( + constructor(configBuilder: InngestFunctionConfigBuilder, handler: BiFunction) : this( configBuilder, handler.toKotlin(), ) diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt index 57e13852..4d847a34 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt @@ -38,64 +38,8 @@ abstract class InngestFunction { // abstract val id: String; - // TODO: Throw illegal argument exception - class Builder(val id: String) { - private var name: String? = null; - private var triggers: MutableList = mutableListOf(); - private var batchEvents: BatchEvents? = null; - - fun name(name: String): Builder { - this.name = name - return this - } - - fun trigger(trigger: InngestFunctionTrigger): Builder { - // TODO - Check max triggers - // TODO - Check mutually exclusive opts (cron v. event+if?) - this.triggers.add(trigger) - return this - } - - fun batchEvents(maxSize: Int, timeout: Duration, key: String? = null): Builder { - this.batchEvents = BatchEvents(maxSize, timeout, key) - return this; - } - - private fun buildSteps(serveUrl: String): Map { - val scheme = serveUrl.split("://")[0] - return mapOf( - "step" to - StepConfig( - id = "step", - name = "step", - retries = - mapOf( - // TODO - Pull from conf option - "attempts" to 3, - ), - runtime = - hashMapOf( - "type" to scheme, - "url" to "$serveUrl?fnId=${id}&stepId=step", - ), - ), - ) - } - - fun build(appId: String, serverUrl: String): InternalFunctionConfig { - val globalId = String.format("%s-%s", appId, id) - val config = InternalFunctionConfig( - globalId, - name, - triggers, - batchEvents, - steps = buildSteps(serverUrl) - ) - return config - } - } - open fun config(builder: Builder): Builder { + open fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder { return builder } @@ -118,7 +62,7 @@ abstract class InngestFunction { // throw Exception("FunctionConfig annotation is required to setup an InngestFunction") // } val triggers = buildEventTriggers() + buildCronTriggers() + buildIfTriggers() - val builder = Builder(id()) + val builder = InngestFunctionConfigBuilder() val configBuilder = this.config(builder) return InternalInngestFunction(configBuilder, this::execute) } @@ -137,32 +81,5 @@ abstract class InngestFunction { .map { InternalFunctionTrigger(event = (it as FunctionIfTrigger).`if`) } } -data class InngestFunctionTrigger -@JvmOverloads -constructor( - @Json(serializeNull = false) val event: String? = null, - @Json(name = "expression", serializeNull = false) val `if`: String? = null, - @Json(serializeNull = false) val cron: String? = null, -) - -@Target(AnnotationTarget.FIELD) -annotation class KlaxonDuration -val durationConverter = object : Converter { - override fun canConvert(cls: Class<*>): Boolean = cls == Duration::class.java; - // TODO Implement this - parse 30s into duration of seconds - override fun fromJson(jv: JsonValue): Duration = - throw KlaxonException("Duration parse not implemented: ${jv.string}") - - override fun toJson(value: Any): String = """"${(value as Duration).seconds}s"""" -} - -data class BatchEvents -@JvmOverloads -constructor( - val maxSize: Int, - @KlaxonDuration - val timeout: Duration, - @Json(serializeNull = false) val key: String? = null -) diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt new file mode 100644 index 00000000..3fc94bf4 --- /dev/null +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt @@ -0,0 +1,134 @@ +package com.inngest + +import com.beust.klaxon.Converter +import com.beust.klaxon.Json +import com.beust.klaxon.JsonValue +import com.beust.klaxon.KlaxonException +import java.time.Duration + +// TODO: Throw illegal argument exception +class InngestFunctionConfigBuilder() { + var id: String? = null; + private var name: String? = null; + private var triggers: MutableList = mutableListOf(); + private var batchEvents: BatchEvents? = null; + + /** + * @param id A unique identifier for the function that should not change over time + */ + fun id(id: String): InngestFunctionConfigBuilder { + this.id = id; + return this + } + + /** + * @param name A formatted name for the function, visible in UIs and logs + */ + fun name(name: String): InngestFunctionConfigBuilder { + this.name = name + return this + } + + /** + * Define a function trigger using a given InngestFunctionTrigger nested class constructor + * + * @param trigger An event or cron function trigger + */ + fun trigger(trigger: InngestFunctionTrigger): InngestFunctionConfigBuilder { + // TODO - Check max triggers + // TODO - Check mutually exclusive opts (cron v. event+if?) + this.triggers.add(trigger) + return this + } + + /** + * Define a function trigger for any matching events with a given name. + * Optionally filter matching events with an expression statement. + * + * @param event The name of the event to trigger on + * @param if A CEL expression to filter matching events to trigger on (optional). + * Example: "event.data.appId == '12345'" + */ + fun triggerEvent(event: String, `if`: String? = null): InngestFunctionConfigBuilder { + this.triggers.add(InngestFunctionTriggers.Event(event, `if`)) + return this + } + + /** + * @param cron A crontab expression + */ + fun triggerCron(cron: String): InngestFunctionConfigBuilder { + this.triggers.add(InngestFunctionTriggers.Cron(cron)) + return this + } + + /** + * Configure the function to be executed with batches of events (1 to n). + * Events will be added into a batch until the maxSize has been reached or + * until the timeout has expired. Any events in this batch will be passed + * to the executing function. + * + * @param maxSize The maximum number of events to execute the function with + * @param timeout The maximum duration of time to wait before executing the function + * @param key A CEL expression to group events batches by. Example: "event.data.destinationId" + */ + fun batchEvents(maxSize: Int, timeout: Duration, key: String? = null): InngestFunctionConfigBuilder { + this.batchEvents = BatchEvents(maxSize, timeout, key) + return this; + } + + private fun buildSteps(serveUrl: String): Map { + val scheme = serveUrl.split("://")[0] + return mapOf( + "step" to + StepConfig( + id = "step", + name = "step", + retries = + mapOf( + // TODO - Pull from conf option + "attempts" to 3, + ), + runtime = + hashMapOf( + "type" to scheme, + "url" to "$serveUrl?fnId=${id}&stepId=step", + ), + ), + ) + } + + fun build(appId: String, serverUrl: String): InternalFunctionConfig { + val globalId = String.format("%s-%s", appId, id) + val config = InternalFunctionConfig( + globalId, + name, + triggers, + batchEvents, + steps = buildSteps(serverUrl) + ) + return config + } +} + +@Target(AnnotationTarget.FIELD) +annotation class KlaxonDuration + +val durationConverter = object : Converter { + override fun canConvert(cls: Class<*>): Boolean = cls == Duration::class.java; + + // TODO Implement this - parse 30s into duration of seconds + override fun fromJson(jv: JsonValue): Duration = + throw KlaxonException("Duration parse not implemented: ${jv.string}") + + override fun toJson(value: Any): String = """"${(value as Duration).seconds}s"""" +} + +data class BatchEvents +@JvmOverloads +constructor( + val maxSize: Int, + @KlaxonDuration + val timeout: Duration, + @Json(serializeNull = false) val key: String? = null +) diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt new file mode 100644 index 00000000..61385c41 --- /dev/null +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt @@ -0,0 +1,35 @@ +package com.inngest + +import com.beust.klaxon.Json + +abstract class InngestFunctionTrigger // or interface or data class +@JvmOverloads +constructor( + @Json(serializeNull = false) val event: String? = null, + @Json(serializeNull = false) val `if`: String? = null, + @Json(serializeNull = false) val cron: String? = null, +) + +/** + * A class that contains nested classes to define function triggers + */ +class InngestFunctionTriggers { + + /** + * Define a function trigger for any matching events with a given name. + * Optionally filter matching events with an expression statement. + * + * @param event The name of the event to trigger on + * @param if A CEL expression to filter matching events to trigger on (optional). + * Example: "event.data.appId == '12345'" + */ + class Event(event: String, `if`: String? = null) : InngestFunctionTrigger(event, `if`, null) {} + + /** + * Define a function trigger that will execute on a given crontab schedule. + * + * @param cron A crontab expression. Example: "0 9 * * 1" + */ + class Cron(cron: String) : InngestFunctionTrigger(null, null, cron) {} + +} From e891cb39c7ffa4f5c2aa6bbb1c0b95ad4bd7cc1f Mon Sep 17 00:00:00 2001 From: Dan Farrelly Date: Thu, 18 Jul 2024 16:14:55 -0700 Subject: [PATCH 07/12] Cleanup add tests --- .../com/inngest/testserver/ProcessAlbum.kt | 5 +- inngest/src/main/kotlin/com/inngest/Errors.kt | 3 + .../src/main/kotlin/com/inngest/Function.kt | 10 ++- .../kotlin/com/inngest/InngestFunction.kt | 72 ++++--------------- .../inngest/InngestFunctionConfigBuilder.kt | 3 + .../com/inngest/InngestFunctionTriggers.kt | 4 ++ 6 files changed, 32 insertions(+), 65 deletions(-) create mode 100644 inngest/src/main/kotlin/com/inngest/Errors.kt diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt index 8ad692d9..9fad2fbc 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt @@ -4,8 +4,9 @@ import com.inngest.* import java.time.Duration -@FunctionConfig(id = "ProcessAlbum", name = "ProcessAlbum") -@FunctionEventTrigger(event = "delivery/process.requested") +/** + * A demo function that accepts an event in a batch and invokes a child function + */ class ProcessAlbum : InngestFunction() { override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder { diff --git a/inngest/src/main/kotlin/com/inngest/Errors.kt b/inngest/src/main/kotlin/com/inngest/Errors.kt new file mode 100644 index 00000000..35aa12e2 --- /dev/null +++ b/inngest/src/main/kotlin/com/inngest/Errors.kt @@ -0,0 +1,3 @@ +package com.inngest + +class InngestInvalidConfigurationException(message: String) : Exception(message) diff --git a/inngest/src/main/kotlin/com/inngest/Function.kt b/inngest/src/main/kotlin/com/inngest/Function.kt index 80bfc31f..b2529edf 100644 --- a/inngest/src/main/kotlin/com/inngest/Function.kt +++ b/inngest/src/main/kotlin/com/inngest/Function.kt @@ -120,6 +120,10 @@ internal interface Function { // TODO: make this implement the Function interface +/** + * An internal class that accepts the configuration and a function handler + * and handles the execution and memoization of an InngestFunction + */ internal open class InternalInngestFunction( private val configBuilder: InngestFunctionConfigBuilder, val handler: (ctx: FunctionContext, step: Step) -> Any?, @@ -129,11 +133,8 @@ internal open class InternalInngestFunction( handler.toKotlin(), ) - // fun id() = config.get("id") fun id() = configBuilder.id - // TODO - Validate options and trigger - fun call( ctx: FunctionContext, client: Inngest, @@ -142,9 +143,6 @@ internal open class InternalInngestFunction( val state = State(requestBody) val step = Step(state, client) - // DEBUG -// println(state) - try { val data = handler(ctx, step) return StepResult( diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt index 4d847a34..20a6c327 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt @@ -1,84 +1,42 @@ package com.inngest -import com.beust.klaxon.Converter -import com.beust.klaxon.Json -import com.beust.klaxon.JsonValue -import com.beust.klaxon.KlaxonException -import java.time.Duration - -@Target(AnnotationTarget.CLASS) -@MustBeDocumented -annotation class FunctionConfig( - val id: String, - val name: String, -) - -@Target(AnnotationTarget.CLASS) -@Repeatable -annotation class FunctionEventTrigger( - @Json(serializeNull = false) val event: String, -) - -@Target(AnnotationTarget.CLASS) -@Repeatable -@MustBeDocumented -annotation class FunctionCronTrigger( - @Json(serializeNull = false) val cron: String, -) - -@Target(AnnotationTarget.CLASS) -@Repeatable -@MustBeDocumented -annotation class FunctionIfTrigger( - @Json(serializeNull = false) val `if`: String, -) - - abstract class InngestFunction { -// abstract val id: String; - - open fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder { return builder } + /** + * The function handler that will be run whenever the function is executed. + * + * @param ctx The function context including event(s) that triggered the function + * @param step A class with methods to define steps within the function + */ abstract fun execute( ctx: FunctionContext, step: Step, ): Any? - private val config = this::class.annotations.find { it.annotationClass == FunctionConfig::class } + fun buildConfig(): InngestFunctionConfigBuilder { + val builder = InngestFunctionConfigBuilder() + return this.config(builder); + } fun id(): String { - if (config == null || config !is FunctionConfig) { - throw Exception("InngestFuncConfig annotation is required to setup an InngestFunc") + try { + return buildConfig().id!! + } catch (e: Exception) { + throw InngestInvalidConfigurationException("Function id must be configured via builder") } - return config.id } internal fun toInngestFunction(): InternalInngestFunction { -// if (config == null || config !is FunctionConfig) { -// throw Exception("FunctionConfig annotation is required to setup an InngestFunction") -// } - val triggers = buildEventTriggers() + buildCronTriggers() + buildIfTriggers() val builder = InngestFunctionConfigBuilder() val configBuilder = this.config(builder) return InternalInngestFunction(configBuilder, this::execute) } - // TODO: DRY this - private fun buildEventTriggers(): List = - this::class.annotations.filter { it.annotationClass == FunctionEventTrigger::class } - .map { InternalFunctionTrigger(event = (it as FunctionEventTrigger).event) } - - private fun buildCronTriggers(): List = - this::class.annotations.filter { it.annotationClass == FunctionCronTrigger::class } - .map { InternalFunctionTrigger(cron = (it as FunctionCronTrigger).cron) } - - private fun buildIfTriggers(): List = - this::class.annotations.filter { it.annotationClass == FunctionIfTrigger::class } - .map { InternalFunctionTrigger(event = (it as FunctionIfTrigger).`if`) } + // TODO - Add toFailureHandler method to generate a second function if configured } diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt index 3fc94bf4..45b544d6 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt @@ -99,6 +99,9 @@ class InngestFunctionConfigBuilder() { } fun build(appId: String, serverUrl: String): InternalFunctionConfig { + if (id == null) { + throw InngestInvalidConfigurationException("Function id must be configured via builder") + } val globalId = String.format("%s-%s", appId, id) val config = InternalFunctionConfig( globalId, diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt index 61385c41..a2330127 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt @@ -2,12 +2,16 @@ package com.inngest import com.beust.klaxon.Json +/** + * A generic class for defining and serializing function triggers + */ abstract class InngestFunctionTrigger // or interface or data class @JvmOverloads constructor( @Json(serializeNull = false) val event: String? = null, @Json(serializeNull = false) val `if`: String? = null, @Json(serializeNull = false) val cron: String? = null, + // IDEA - Add timeout and re-use for cancelOn? ) /** From cd29418cd66cb414761dd0a01d207c5d3748cb14 Mon Sep 17 00:00:00 2001 From: Dan Farrelly Date: Mon, 22 Jul 2024 12:36:44 -0400 Subject: [PATCH 08/12] Add concurrency scope and enum converter --- .../inngest/testserver/FollowupFunction.kt | 22 ++++---- .../com/inngest/testserver/ProcessAlbum.kt | 5 +- .../inngest/testserver/RestoreFromGlacier.kt | 1 + .../inngest/testserver/UserSignupFunction.kt | 10 ++-- inngest/src/main/kotlin/com/inngest/Comm.kt | 1 + .../src/main/kotlin/com/inngest/Function.kt | 4 +- .../src/main/kotlin/com/inngest/HttpClient.kt | 1 + .../kotlin/com/inngest/InngestFunction.kt | 4 +- .../inngest/InngestFunctionConfigBuilder.kt | 54 ++++++++++++++++++- 9 files changed, 80 insertions(+), 22 deletions(-) diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/FollowupFunction.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/FollowupFunction.kt index b7e49ae3..e91b3905 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/FollowupFunction.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/FollowupFunction.kt @@ -2,15 +2,13 @@ package com.inngest.testserver import com.inngest.* -@FunctionConfig(id = "fn-follow-up", name = "My follow up function!") -@FunctionEventTrigger(event = "user.signup.completed") -@FunctionEventTrigger(event = "random-event") -class FollowupFunction : InngestFunction() { - override fun execute( - ctx: FunctionContext, - step: Step, - ): LinkedHashMap { - println("-> follow up handler called " + ctx.event.name) - return ctx.event.data - } -} + +//class FollowupFunction : InngestFunction() { +// override fun execute( +// ctx: FunctionContext, +// step: Step, +// ): LinkedHashMap { +// println("-> follow up handler called " + ctx.event.name) +// return ctx.event.data +// } +//} diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt index 9fad2fbc..5a6807af 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt @@ -11,11 +11,10 @@ class ProcessAlbum : InngestFunction() { override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder { return builder + .id("ProcessAlbum") .name("Process Album!") .triggerEvent("delivery/process.requested") - .triggerCron("5 0 * 8 *") - .trigger( - InngestFunctionTriggers.Cron("5 0 * 8 *")) + .trigger(InngestFunctionTriggers.Cron("5 0 * 8 *")) .batchEvents(30, Duration.ofSeconds(10)) } diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt index 2fda1400..d4ae841f 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt @@ -11,6 +11,7 @@ class RestoreFromGlacier : InngestFunction() { .id("RestoreFromGlacier") .name("Restore from Glacier") .trigger(InngestFunctionTriggers.Event("delivery/restore.requested")) + .concurrency(10, null, ConcurrencyScope.ENVIRONMENT) } override fun execute( diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/UserSignupFunction.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/UserSignupFunction.kt index 749db8f6..9ac5a40e 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/UserSignupFunction.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/UserSignupFunction.kt @@ -3,9 +3,13 @@ package com.inngest.testserver import com.inngest.* import java.time.Duration -@FunctionConfig(id = "fn-id-slug", name = "My function!") -@FunctionEventTrigger(event = "user-signup") -class UserSignupFunction : InngestFunction() { +class ProcessUserSignup : InngestFunction() { + + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder { + return builder.id("process-user-signup") + .triggerEvent("user-signup") + } + override fun execute( ctx: FunctionContext, step: Step, diff --git a/inngest/src/main/kotlin/com/inngest/Comm.kt b/inngest/src/main/kotlin/com/inngest/Comm.kt index 8c056540..d07fb1e7 100644 --- a/inngest/src/main/kotlin/com/inngest/Comm.kt +++ b/inngest/src/main/kotlin/com/inngest/Comm.kt @@ -114,6 +114,7 @@ class CommHandler( try { return Klaxon() .fieldConverter(KlaxonDuration::class, durationConverter) + .fieldConverter(KlaxonConcurrencyScope::class, concurrencyScopeConverter) .toJsonString(payload) } catch (e: Exception) { println(e); diff --git a/inngest/src/main/kotlin/com/inngest/Function.kt b/inngest/src/main/kotlin/com/inngest/Function.kt index b2529edf..f5e29a18 100644 --- a/inngest/src/main/kotlin/com/inngest/Function.kt +++ b/inngest/src/main/kotlin/com/inngest/Function.kt @@ -77,11 +77,13 @@ data class StepConfig( val runtime: HashMap = hashMapOf("type" to "http"), ) -class InternalFunctionConfig @JvmOverloads constructor( +internal class InternalFunctionConfig @JvmOverloads constructor( val id: String, val name: String?, val triggers: MutableList, @Json(serializeNull = false) + val concurrency: MutableList?, + @Json(serializeNull = false) val batchEvents: BatchEvents?, val steps: Map, ) { diff --git a/inngest/src/main/kotlin/com/inngest/HttpClient.kt b/inngest/src/main/kotlin/com/inngest/HttpClient.kt index 2e54f7f3..305210b1 100644 --- a/inngest/src/main/kotlin/com/inngest/HttpClient.kt +++ b/inngest/src/main/kotlin/com/inngest/HttpClient.kt @@ -28,6 +28,7 @@ internal class HttpClient(private val clientConfig: RequestConfig) { ): okhttp3.Request { val jsonRequestBody = Klaxon() .fieldConverter(KlaxonDuration::class, durationConverter) + .fieldConverter(KlaxonConcurrencyScope::class, concurrencyScopeConverter) .toJsonString(payload) val body = jsonRequestBody.toRequestBody(jsonMediaType) diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt index 20a6c327..8a084673 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt @@ -26,7 +26,9 @@ abstract class InngestFunction { try { return buildConfig().id!! } catch (e: Exception) { - throw InngestInvalidConfigurationException("Function id must be configured via builder") + throw InngestInvalidConfigurationException( + "Function id must be configured via builder: ${this.javaClass.name}" + ) } } diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt index 45b544d6..2bca05c7 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt @@ -11,6 +11,7 @@ class InngestFunctionConfigBuilder() { var id: String? = null; private var name: String? = null; private var triggers: MutableList = mutableListOf(); + private var concurrency: MutableList? = null; private var batchEvents: BatchEvents? = null; /** @@ -77,6 +78,25 @@ class InngestFunctionConfigBuilder() { return this; } + /** + * Configure step concurrency limit + * + * @param limit Maximum number of concurrent executing steps across function type + * @param key A CEL expression to apply limit using event payload properties. Example: "event.data.destinationId" + * @param scope The scope to apply the limit to. Options + */ + fun concurrency(limit: Int, key: String? = null, scope: ConcurrencyScope? = null): InngestFunctionConfigBuilder { + val c = Concurrency(limit, key, scope) + // TODO - Limit concurrency length to 2 + if (this.concurrency == null) { + this.concurrency = mutableListOf(c) + } else { + this.concurrency?.add(c) + } + println(scope?.value) + return this + } + private fun buildSteps(serveUrl: String): Map { val scheme = serveUrl.split("://")[0] return mapOf( @@ -98,15 +118,18 @@ class InngestFunctionConfigBuilder() { ) } - fun build(appId: String, serverUrl: String): InternalFunctionConfig { + internal fun build(appId: String, serverUrl: String): InternalFunctionConfig { if (id == null) { throw InngestInvalidConfigurationException("Function id must be configured via builder") } val globalId = String.format("%s-%s", appId, id) + println("concurrency") + println(concurrency) val config = InternalFunctionConfig( globalId, name, triggers, + concurrency, batchEvents, steps = buildSteps(serverUrl) ) @@ -127,7 +150,34 @@ val durationConverter = object : Converter { override fun toJson(value: Any): String = """"${(value as Duration).seconds}s"""" } -data class BatchEvents +@Target(AnnotationTarget.FIELD) +annotation class KlaxonConcurrencyScope + +val concurrencyScopeConverter = object : Converter { + override fun canConvert(cls: Class<*>): Boolean = cls.isEnum + override fun fromJson(jv: JsonValue): ConcurrencyScope = enumValueOf(jv.string!!) + override fun toJson(value: Any): String = """"${(value as ConcurrencyScope).value}"""" +} + +// TODO - Convert enum element to value, not name +enum class ConcurrencyScope(val value: String) { + ACCOUNT("account"), + ENVIRONMENT("env"), + FUNCTION("fn"), +} + +internal data class Concurrency +@JvmOverloads +constructor( + val limit: Int, + @Json(serializeNull = false) + val key: String? = null, + @Json(serializeNull = false) + @KlaxonConcurrencyScope + val scope: ConcurrencyScope? = null, +) + +internal data class BatchEvents @JvmOverloads constructor( val maxSize: Int, From 71f1a590497c617156674671f4bcb88888847cf9 Mon Sep 17 00:00:00 2001 From: Dan Farrelly Date: Mon, 22 Jul 2024 12:38:29 -0400 Subject: [PATCH 09/12] Add config builder test --- .../InngestFunctionConfigBuilderTest.kt | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt diff --git a/inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt b/inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt new file mode 100644 index 00000000..94c09279 --- /dev/null +++ b/inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt @@ -0,0 +1,27 @@ +package com.inngest + +import com.inngest.signingkey.InvalidSigningKeyException +import kotlin.test.Test +import kotlin.test.assertEquals + +import org.junit.jupiter.api.Assertions.* +import kotlin.test.assertFailsWith + +class InngestFunctionConfigBuilderTest { + + @Test + fun testGlobalId() { + val config = InngestFunctionConfigBuilder() + .id("test-id") + .build("app-id", "https://mysite.com/api/inngest") + assertEquals("app-id-test-id", config.id) + } + + @Test + fun testMissingId() { + assertFailsWith { + InngestFunctionConfigBuilder() + .build("app-id", "https://mysite.com/api/inngest") + } + } +} From 8697038a46b59bb6871474650037410823363d20 Mon Sep 17 00:00:00 2001 From: Dan Farrelly Date: Mon, 22 Jul 2024 12:58:09 -0400 Subject: [PATCH 10/12] Update to use config builder. --- .../springbootdemo/FollowupFunction.java | 17 +++++++++++------ .../springbootdemo/UserSignupFunction.java | 12 ++++++++++-- .../testfunctions/CustomStepFunction.java | 12 ++++++++++-- .../testfunctions/EmptyStepFunction.java | 13 +++++++++++-- .../NonRetriableErrorFunction.java | 12 ++++++++++-- .../testfunctions/RetriableErrorFunction.java | 13 +++++++++++-- .../testfunctions/SendEventFunction.java | 12 ++++++++++-- .../testfunctions/SleepStepFunction.java | 13 +++++++++++-- .../testfunctions/TwoStepsFunction.java | 13 +++++++++++-- .../testfunctions/WaitForEventFunction.java | 12 ++++++++++-- inngest/src/main/kotlin/com/inngest/Comm.kt | 1 + .../inngest/InngestFunctionConfigBuilder.kt | 19 +++++++++++++------ 12 files changed, 119 insertions(+), 30 deletions(-) diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/FollowupFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/FollowupFunction.java index 7d881957..df3f0823 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/FollowupFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/FollowupFunction.java @@ -5,13 +5,18 @@ import java.util.LinkedHashMap; -@FunctionConfig( - id = "fn-follow-up", - name = "My follow up function!" -) -@FunctionEventTrigger(event = "user.signup.completed") -@FunctionEventTrigger(event = "random-event") public class FollowupFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("fn-follow-up") + .name("My follow up function!") + .triggerEvent("user.signup.completed") + .triggerEvent("random-event"); + } + @Override public LinkedHashMap execute(@NotNull FunctionContext ctx, @NotNull Step step) { System.out.println("-> follow up handler called " + ctx.getEvent().getName()); diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/UserSignupFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/UserSignupFunction.java index b4819504..ef516fca 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/UserSignupFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/UserSignupFunction.java @@ -6,9 +6,17 @@ import java.time.Duration; import java.util.HashMap; -@FunctionConfig(id = "fn-id-slug", name = "My function!") -@FunctionEventTrigger(event = "user-signup") public class UserSignupFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("fn-id-slug") + .name("My Function!") + .triggerEvent("user-signup"); + } + @Override public HashMap execute(@NotNull FunctionContext ctx, @NotNull Step step) { int x = 10; diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CustomStepFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CustomStepFunction.java index b0ba7dac..aaea1ab1 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CustomStepFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CustomStepFunction.java @@ -1,11 +1,19 @@ package com.inngest.springbootdemo.testfunctions; import com.inngest.*; +import org.jetbrains.annotations.NotNull; -@FunctionConfig(id = "custom-result-fn", name = "Custom Result Function") -@FunctionEventTrigger(event = "test/custom.result.step") public class CustomStepFunction extends InngestFunction { + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("custom-result-fn") + .name("Custom Result Function") + .triggerEvent("test/custom.result.step"); + } + private final int count = 0; @Override diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/EmptyStepFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/EmptyStepFunction.java index 807fba34..abc638ef 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/EmptyStepFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/EmptyStepFunction.java @@ -1,10 +1,19 @@ package com.inngest.springbootdemo.testfunctions; import com.inngest.*; +import org.jetbrains.annotations.NotNull; -@FunctionConfig(id = "no-step-fn", name = "No Step Function") -@FunctionEventTrigger(event = "test/no-step") public class EmptyStepFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("no-step-fn") + .name("No Step Function") + .triggerEvent("test/no-step"); + } + @Override public String execute(FunctionContext ctx, Step step) { return "hello world"; diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/NonRetriableErrorFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/NonRetriableErrorFunction.java index 431435a1..b6f9527a 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/NonRetriableErrorFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/NonRetriableErrorFunction.java @@ -1,11 +1,19 @@ package com.inngest.springbootdemo.testfunctions; import com.inngest.*; +import org.jetbrains.annotations.NotNull; -@FunctionConfig(id = "non-retriable-fn", name = "NonRetriable Function") -@FunctionEventTrigger(event = "test/non.retriable") public class NonRetriableErrorFunction extends InngestFunction { + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("non-retriable-fn") + .name("NonRetriable Function") + .triggerEvent("test/non.retriable"); + } + @Override public String execute(FunctionContext ctx, Step step) { step.run("fail-step", () -> { diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/RetriableErrorFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/RetriableErrorFunction.java index 3c5aa82f..8a6f06a2 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/RetriableErrorFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/RetriableErrorFunction.java @@ -1,10 +1,19 @@ package com.inngest.springbootdemo.testfunctions; import com.inngest.*; +import org.jetbrains.annotations.NotNull; -@FunctionConfig(id = "retriable-fn", name = "Retriable Function") -@FunctionEventTrigger(event = "test/retriable") public class RetriableErrorFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("retriable-fn") + .name("Retriable Function") + .triggerEvent("test/retriable"); + } + static int retryCount = 0; @Override diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/SendEventFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/SendEventFunction.java index 3d9a166e..09a9d673 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/SendEventFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/SendEventFunction.java @@ -1,13 +1,21 @@ package com.inngest.springbootdemo.testfunctions; import com.inngest.*; +import org.jetbrains.annotations.NotNull; import java.util.HashMap; -@FunctionConfig(id = "send-fn", name = "Send Function") -@FunctionEventTrigger(event = "test/send") public class SendEventFunction extends InngestFunction { + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("send-fn") + .name("Send Function") + .triggerEvent("test/send"); + } + @Override public SendEventsResponse execute(FunctionContext ctx, Step step) { return step.sendEvent("send-test", new InngestEvent( diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/SleepStepFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/SleepStepFunction.java index 3353344d..5dfdba5b 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/SleepStepFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/SleepStepFunction.java @@ -1,12 +1,21 @@ package com.inngest.springbootdemo.testfunctions; import com.inngest.*; +import org.jetbrains.annotations.NotNull; import java.time.Duration; -@FunctionConfig(id = "sleep-fn", name = "Sleep Function") -@FunctionEventTrigger(event = "test/sleep") public class SleepStepFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("sleep-fn") + .name("Sleep Function") + .triggerEvent("test/sleep"); + } + @Override public Integer execute(FunctionContext ctx, Step step) { int result = step.run("num", () -> 42, Integer.class); diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/TwoStepsFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/TwoStepsFunction.java index 985cc2a8..eb018bd6 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/TwoStepsFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/TwoStepsFunction.java @@ -1,11 +1,20 @@ package com.inngest.springbootdemo.testfunctions; import com.inngest.*; +import org.jetbrains.annotations.NotNull; -@FunctionConfig(id = "two-steps-fn", name = "Two Steps Function") -@FunctionEventTrigger(event = "test/two.steps") public class TwoStepsFunction extends InngestFunction { + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("two-steps-fn") + .name("Two Steps Function") + .triggerEvent("test/two.steps"); + } + + private final int count = 0; @Override diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/WaitForEventFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/WaitForEventFunction.java index 5ddb89b5..725bfab2 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/WaitForEventFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/WaitForEventFunction.java @@ -1,11 +1,19 @@ package com.inngest.springbootdemo.testfunctions; import com.inngest.*; +import org.jetbrains.annotations.NotNull; -@FunctionConfig(id = "wait-for-event-fn", name = "Wait for Event Function") -@FunctionEventTrigger(event = "test/wait-for-event") public class WaitForEventFunction extends InngestFunction { + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("wait-for-event-fn") + .name("Wait for Event Function") + .triggerEvent("test/wait-for-event"); + } + @Override public String execute(FunctionContext ctx, Step step) { Object event = step.waitForEvent("wait-test", diff --git a/inngest/src/main/kotlin/com/inngest/Comm.kt b/inngest/src/main/kotlin/com/inngest/Comm.kt index d07fb1e7..1fe845f8 100644 --- a/inngest/src/main/kotlin/com/inngest/Comm.kt +++ b/inngest/src/main/kotlin/com/inngest/Comm.kt @@ -117,6 +117,7 @@ class CommHandler( .fieldConverter(KlaxonConcurrencyScope::class, concurrencyScopeConverter) .toJsonString(payload) } catch (e: Exception) { + // TODO - Properly log this serialization failure println(e); return """{ "message": "failed serialization" }""" } diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt index 2bca05c7..bc27f5f8 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt @@ -44,13 +44,23 @@ class InngestFunctionConfigBuilder() { /** * Define a function trigger for any matching events with a given name. - * Optionally filter matching events with an expression statement. * * @param event The name of the event to trigger on - * @param if A CEL expression to filter matching events to trigger on (optional). + */ + fun triggerEvent(event: String): InngestFunctionConfigBuilder { + this.triggers.add(InngestFunctionTriggers.Event(event, null)) + return this + } + + /** + * Define a function trigger for any matching events with a given name and filter + * matching events with an expression statement. + * + * @param event The name of the event to trigger on + * @param if A CEL expression to filter matching events to trigger on. * Example: "event.data.appId == '12345'" */ - fun triggerEvent(event: String, `if`: String? = null): InngestFunctionConfigBuilder { + fun triggerEventIf(event: String, `if`: String? = null): InngestFunctionConfigBuilder { this.triggers.add(InngestFunctionTriggers.Event(event, `if`)) return this } @@ -93,7 +103,6 @@ class InngestFunctionConfigBuilder() { } else { this.concurrency?.add(c) } - println(scope?.value) return this } @@ -123,8 +132,6 @@ class InngestFunctionConfigBuilder() { throw InngestInvalidConfigurationException("Function id must be configured via builder") } val globalId = String.format("%s-%s", appId, id) - println("concurrency") - println(concurrency) val config = InternalFunctionConfig( globalId, name, From e3abd6f883caf64edb713c96359a467af06d5c4f Mon Sep 17 00:00:00 2001 From: Dan Farrelly Date: Mon, 22 Jul 2024 14:11:31 -0400 Subject: [PATCH 11/12] Fix linter warnings --- .../main/kotlin/com/inngest/testserver/App.kt | 8 +- .../inngest/testserver/FollowupFunction.kt | 14 -- .../com/inngest/testserver/ProcessAlbum.kt | 36 ++--- ...SignupFunction.kt => ProcessUserSignup.kt} | 21 ++- .../inngest/testserver/RestoreFromGlacier.kt | 28 ++-- .../kotlin/com/inngest/testserver/AppTest.kt | 14 -- inngest/src/main/kotlin/com/inngest/Comm.kt | 46 +++--- .../main/kotlin/com/inngest/Environment.kt | 9 +- inngest/src/main/kotlin/com/inngest/Errors.kt | 3 - inngest/src/main/kotlin/com/inngest/Event.kt | 8 +- .../src/main/kotlin/com/inngest/Function.kt | 112 +++++++------- .../src/main/kotlin/com/inngest/HttpClient.kt | 25 +++- .../src/main/kotlin/com/inngest/Inngest.kt | 2 +- .../src/main/kotlin/com/inngest/InngestEnv.kt | 13 +- .../kotlin/com/inngest/InngestFunction.kt | 14 +- .../inngest/InngestFunctionConfigBuilder.kt | 141 +++++++++++------- .../com/inngest/InngestFunctionTriggers.kt | 25 ++-- .../kotlin/com/inngest/InngestHeaderKey.kt | 5 +- .../kotlin/com/inngest/NonRetriableError.kt | 5 +- .../kotlin/com/inngest/RetryAfterError.kt | 7 +- .../main/kotlin/com/inngest/RetryDecision.kt | 5 +- .../main/kotlin/com/inngest/ServeConfig.kt | 7 +- inngest/src/main/kotlin/com/inngest/State.kt | 6 +- inngest/src/main/kotlin/com/inngest/Step.kt | 81 +++++++--- .../com/inngest/SupportedFrameworkName.kt | 4 +- .../src/main/kotlin/com/inngest/Version.kt | 2 +- .../src/main/kotlin/com/inngest/ktor/Route.kt | 18 ++- .../com/inngest/signingkey/BearerToken.kt | 4 +- .../signingkey/SignatureVerification.kt | 26 ++-- .../InngestFunctionConfigBuilderTest.kt | 11 +- 30 files changed, 369 insertions(+), 331 deletions(-) delete mode 100644 inngest-test-server/src/main/kotlin/com/inngest/testserver/FollowupFunction.kt rename inngest-test-server/src/main/kotlin/com/inngest/testserver/{UserSignupFunction.kt => ProcessUserSignup.kt} (71%) delete mode 100644 inngest-test-server/src/test/kotlin/com/inngest/testserver/AppTest.kt delete mode 100644 inngest/src/main/kotlin/com/inngest/Errors.kt diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt index ba8983ec..14323c7f 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt @@ -15,20 +15,18 @@ data class Result( const val FOLLOW_UP_EVENT_NAME = "user.signup.completed" -data class IngestData(val message: String) - fun Application.module() { val inngest = Inngest(appId = "ktor-dev") routing { - serve("/api/inngest", inngest, listOf(ProcessAlbum(), RestoreFromGlacier())) + serve("/api/inngest", inngest, listOf(ProcessAlbum(), RestoreFromGlacier(), ProcessUserSignup())) } } fun main() { - var port = 8080 + val port = 8080 - println("Test server running on port " + port) + println("Test server running on port $port") embeddedServer( Netty, diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/FollowupFunction.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/FollowupFunction.kt deleted file mode 100644 index e91b3905..00000000 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/FollowupFunction.kt +++ /dev/null @@ -1,14 +0,0 @@ -package com.inngest.testserver - -import com.inngest.* - - -//class FollowupFunction : InngestFunction() { -// override fun execute( -// ctx: FunctionContext, -// step: Step, -// ): LinkedHashMap { -// println("-> follow up handler called " + ctx.event.name) -// return ctx.event.data -// } -//} diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt index 5a6807af..25274cf1 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt @@ -3,53 +3,39 @@ package com.inngest.testserver import com.inngest.* import java.time.Duration - /** * A demo function that accepts an event in a batch and invokes a child function */ class ProcessAlbum : InngestFunction() { - - override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder { - return builder + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = + builder .id("ProcessAlbum") .name("Process Album!") .triggerEvent("delivery/process.requested") .trigger(InngestFunctionTriggers.Cron("5 0 * 8 *")) .batchEvents(30, Duration.ofSeconds(10)) - } override fun execute( ctx: FunctionContext, step: Step, ): LinkedHashMap { - // val list = ctx.events.map { e -> e.data.get("something") } // println(list); - for (evt in ctx.events) { // println(evt); // NOTE - App ID is set on the serve level - val res = step.invoke>( - "restore-album-${evt.data.get("albumId")}", - "ktor-dev", - "RestoreFromGlacier", - evt.data, - null, - ) + val res = + step.invoke>( + "restore-album-${evt.data["albumId"]}", + "ktor-dev", + "RestoreFromGlacier", + evt.data, + null, + ) + println(res["restored"]) } return linkedMapOf("hello" to true) } - - fun isRestoredFromGlacier(temp: Int): Boolean { - if (temp > 2) { - return true - } - return false; - } - - fun restoreFromGlacier(): String { - return "FILES_RESTORED" - } } diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/UserSignupFunction.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessUserSignup.kt similarity index 71% rename from inngest-test-server/src/main/kotlin/com/inngest/testserver/UserSignupFunction.kt rename to inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessUserSignup.kt index 9ac5a40e..46ea2eeb 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/UserSignupFunction.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessUserSignup.kt @@ -4,11 +4,10 @@ import com.inngest.* import java.time.Duration class ProcessUserSignup : InngestFunction() { - - override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder { - return builder.id("process-user-signup") + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = + builder + .id("process-user-signup") .triggerEvent("user-signup") - } override fun execute( ctx: FunctionContext, @@ -19,31 +18,29 @@ class ProcessUserSignup : InngestFunction() { println("-> handler called " + ctx.event.name) val y = - step.run("add-ten") { -> - x + 10 - } + step.run("add-ten") { x + 10 } val res = - step.run("cast-to-type-add-ten") { -> - println("-> running step 1!! " + x) + step.run("cast-to-type-add-ten") { + println("-> running step 1!! $x") // throw Exception("An error!") Result( sum = y + 10, ) } - println("res" + res) + println("res $res") step.waitForEvent("wait-for-hello", "hello", "10m", "event.data.hello == async.data.hello") val add: Int = step.run("add-one-hundred") { - println("-> running step 2 :) " + res?.sum) + println("-> running step 2 :) " + res.sum) res.sum + 100 } step.sleep("wait-one-sec", Duration.ofSeconds(2)) - step.run("last-step") { res.sum.times(add) ?: 0 } + step.run("last-step") { res.sum.times(add) } step.sendEvent("followup-event-id", InngestEvent(FOLLOW_UP_EVENT_NAME, data = hashMapOf("hello" to "world"))) diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt index d4ae841f..7a6b2e1e 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt @@ -3,22 +3,18 @@ package com.inngest.testserver import com.inngest.* import java.time.Duration -//@FunctionConfig(id = "RestoreFromGlacier", name = "RestoreFromGlacier") class RestoreFromGlacier : InngestFunction() { - - override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder { - return builder + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = + builder .id("RestoreFromGlacier") .name("Restore from Glacier") .trigger(InngestFunctionTriggers.Event("delivery/restore.requested")) .concurrency(10, null, ConcurrencyScope.ENVIRONMENT) - } override fun execute( ctx: FunctionContext, step: Step, ): LinkedHashMap { - step.run("restore") { if (!isRestoredFromGlacier(0)) { restoreFromGlacier() @@ -26,9 +22,10 @@ class RestoreFromGlacier : InngestFunction() { } var i = 0 while (i < 6) { - val isRestored = step.run(String.format("check-status-%d", i)) { - isRestoredFromGlacier(i) - } + val isRestored = + step.run(String.format("check-status-%d", i)) { + isRestoredFromGlacier(i) + } if (isRestored) { return linkedMapOf("restored" to true) } @@ -40,14 +37,9 @@ class RestoreFromGlacier : InngestFunction() { return linkedMapOf("restored" to false) } - fun isRestoredFromGlacier(temp: Int): Boolean { - if (temp > 2) { - return true - } - return false; - } + // NOTE - This method is only a stub meant to simulate that Glacier restoration will return false + // the first couple of times in the loop. This is just to show the concept. + private fun isRestoredFromGlacier(temp: Int): Boolean = temp > 2 - fun restoreFromGlacier(): String { - return "FILES_RESTORED" - } + private fun restoreFromGlacier(): String = "FILES_RESTORED" } diff --git a/inngest-test-server/src/test/kotlin/com/inngest/testserver/AppTest.kt b/inngest-test-server/src/test/kotlin/com/inngest/testserver/AppTest.kt deleted file mode 100644 index 3e698e47..00000000 --- a/inngest-test-server/src/test/kotlin/com/inngest/testserver/AppTest.kt +++ /dev/null @@ -1,14 +0,0 @@ -/* - * This Kotlin source file was generated by the Gradle 'init' task. - */ -package com.inngest.testserver - -// import kotlin.test.Test -// import kotlin.test.assertNotNull - -// class AppTest { -// @Test fun appHasAGreeting() { -// val classUnderTest = App() -// assertNotNull(classUnderTest.greeting, "app should have a greeting") -// } -// } diff --git a/inngest/src/main/kotlin/com/inngest/Comm.kt b/inngest/src/main/kotlin/com/inngest/Comm.kt index 1fe845f8..6a379a46 100644 --- a/inngest/src/main/kotlin/com/inngest/Comm.kt +++ b/inngest/src/main/kotlin/com/inngest/Comm.kt @@ -20,19 +20,21 @@ data class ExecutionContext( val env: String, ) -internal data class RegistrationRequestPayload @JvmOverloads constructor( - val appName: String, - val deployType: String = "ping", - val framework: String, - val functions: List = listOf(), - val sdk: String, - val url: String, - val v: String, -) - -enum class InngestSyncResult { - None, -} +internal data class RegistrationRequestPayload + @JvmOverloads + constructor( + val appName: String, + val deployType: String = "ping", + val framework: String, + val functions: List = listOf(), + val sdk: String, + val url: String, + val v: String, + ) + +// enum class InngestSyncResult { +// None, +// } data class CommResponse( val body: String, @@ -44,6 +46,8 @@ data class CommError( val name: String, val message: String?, val stack: String?, + // TODO - Convert to camelCase and use Klaxon property renaming for parsing/serialization + @Suppress("PropertyName") val __serialized: Boolean = true, ) @@ -54,7 +58,7 @@ class CommHandler( private val framework: SupportedFrameworkName, ) { val headers = Environment.inngestHeaders(framework).plus(client.headers) - internal val functions = functions.mapValues { (_, fn) -> fn.toInngestFunction() } + private val functions = functions.mapValues { (_, fn) -> fn.toInngestFunction() } fun callFunction( functionId: String, @@ -118,13 +122,11 @@ class CommHandler( .toJsonString(payload) } catch (e: Exception) { // TODO - Properly log this serialization failure - println(e); + println(e) return """{ "message": "failed serialization" }""" } - } - private fun getFunctionConfigs(origin: String): List { val configs: MutableList = mutableListOf() functions.forEach { entry -> configs.add(entry.value.getFunctionConfig(getServeUrl(origin), client)) } @@ -156,17 +158,16 @@ class CommHandler( return parseRequestBody(body) } - fun sync(): Result { - return Result.success(InngestSyncResult.None) - } + // TODO +// fun sync(): Result = Result.success(InngestSyncResult.None) fun introspect(origin: String): String { val requestPayload = getRegistrationRequestPayload(origin) return serializePayload(requestPayload) } - private fun getRegistrationRequestPayload(origin: String): RegistrationRequestPayload { - return RegistrationRequestPayload( + private fun getRegistrationRequestPayload(origin: String): RegistrationRequestPayload = + RegistrationRequestPayload( appName = config.appId(), framework = framework.toString(), sdk = "inngest-kt", @@ -174,7 +175,6 @@ class CommHandler( v = Version.getVersion(), functions = getFunctionConfigs(origin), ) - } private fun getServeUrl(origin: String): String { // TODO - property from SpringBoot should take preference to env variable? diff --git a/inngest/src/main/kotlin/com/inngest/Environment.kt b/inngest/src/main/kotlin/com/inngest/Environment.kt index b6d68d6e..542e7974 100644 --- a/inngest/src/main/kotlin/com/inngest/Environment.kt +++ b/inngest/src/main/kotlin/com/inngest/Environment.kt @@ -50,7 +50,7 @@ object Environment { "0" -> InngestEnv.Prod "1" -> InngestEnv.Dev else -> { - var other = InngestEnv.Other + val other = InngestEnv.Other other.value = sysDev other } @@ -64,7 +64,7 @@ object Environment { "prod" -> InngestEnv.Prod "production" -> InngestEnv.Prod else -> { - var other = InngestEnv.Other + val other = InngestEnv.Other other.value = env other } @@ -72,8 +72,7 @@ object Environment { } // Read from environment variable - val inngestEnv = System.getenv(InngestSystem.Env.value) - return when (inngestEnv) { + return when (val inngestEnv = System.getenv(InngestSystem.Env.value)) { null -> InngestEnv.Dev "dev" -> InngestEnv.Dev "development" -> InngestEnv.Dev @@ -81,7 +80,7 @@ object Environment { "production" -> InngestEnv.Prod else -> { - var other = InngestEnv.Other + val other = InngestEnv.Other other.value = inngestEnv other } diff --git a/inngest/src/main/kotlin/com/inngest/Errors.kt b/inngest/src/main/kotlin/com/inngest/Errors.kt deleted file mode 100644 index 35aa12e2..00000000 --- a/inngest/src/main/kotlin/com/inngest/Errors.kt +++ /dev/null @@ -1,3 +0,0 @@ -package com.inngest - -class InngestInvalidConfigurationException(message: String) : Exception(message) diff --git a/inngest/src/main/kotlin/com/inngest/Event.kt b/inngest/src/main/kotlin/com/inngest/Event.kt index 06e5ba0c..5a98242f 100644 --- a/inngest/src/main/kotlin/com/inngest/Event.kt +++ b/inngest/src/main/kotlin/com/inngest/Event.kt @@ -9,7 +9,7 @@ data class Event( val v: Any? = null, ) -data class EventAPIResponse( - val ids: Array, - val status: String, -) +// data class EventAPIResponse( +// val ids: Array, +// val status: String, +// ) diff --git a/inngest/src/main/kotlin/com/inngest/Function.kt b/inngest/src/main/kotlin/com/inngest/Function.kt index f5e29a18..a8c5e995 100644 --- a/inngest/src/main/kotlin/com/inngest/Function.kt +++ b/inngest/src/main/kotlin/com/inngest/Function.kt @@ -3,21 +3,6 @@ package com.inngest import com.beust.klaxon.Json import java.util.function.BiFunction -// IDEA: Use data classes -internal data class InternalFunctionOptions( - val id: String, - val config: Map, - val triggers: Array, -) - -internal open class InternalFunctionTrigger -@JvmOverloads -constructor( - @Json(serializeNull = false) val event: String? = null, - @Json(serializeNull = false) val `if`: String? = null, - @Json(serializeNull = false) val cron: String? = null, -) - // TODO - Add an abstraction layer between the Function call response and the comm handler response enum class OpCode { StepRun, @@ -28,10 +13,13 @@ enum class OpCode { InvokeFunction, // FUTURE: - StepNotFound, +// StepNotFound, } -enum class ResultStatusCode(val code: Int, val message: String) { +enum class ResultStatusCode( + val code: Int, + val message: String, +) { StepComplete(206, "Step Complete"), FunctionComplete(200, "Function Complete"), NonRetriableError(400, "Bad Request"), @@ -77,20 +65,22 @@ data class StepConfig( val runtime: HashMap = hashMapOf("type" to "http"), ) -internal class InternalFunctionConfig @JvmOverloads constructor( - val id: String, - val name: String?, - val triggers: MutableList, - @Json(serializeNull = false) - val concurrency: MutableList?, - @Json(serializeNull = false) - val batchEvents: BatchEvents?, - val steps: Map, -) { -} +@Suppress("unused") +internal class InternalFunctionConfig + @JvmOverloads + constructor( + val id: String, + val name: String?, + val triggers: MutableList, + @Json(serializeNull = false) + val concurrency: MutableList? = null, + @Json(serializeNull = false) + val batchEvents: BatchEvents? = null, + val steps: Map, + ) // NOTE - This should probably be called serialized or formatted config // as it's only used to format the config for register requests -//typealias InternalFunctionConfig = Map +// typealias InternalFunctionConfig = Map /** * The context for the current function run @@ -105,23 +95,29 @@ data class FunctionContext( val attempt: Int, ) -// TODO - Determine if we should merge config + trigger +data class SendEventPayload( + // TODO - Change this to camelCase and add Klaxon annotation to parse underscore name via API + @Suppress("PropertyName") + val event_ids: Array, +) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false -/** - * A function that can be called by the Inngest system - * - * @param config The options for the function - * @param handler The function to be called when the function is triggered - */ + other as SendEventPayload + + return event_ids.contentEquals(other.event_ids) + } -data class SendEventPayload(val event_ids: Array) + override fun hashCode(): Int = event_ids.contentHashCode() +} internal interface Function { fun id(): String } - // TODO: make this implement the Function interface + /** * An internal class that accepts the configuration and a function handler * and handles the execution and memoization of an InngestFunction @@ -130,7 +126,11 @@ internal open class InternalInngestFunction( private val configBuilder: InngestFunctionConfigBuilder, val handler: (ctx: FunctionContext, step: Step) -> Any?, ) { - constructor(configBuilder: InngestFunctionConfigBuilder, handler: BiFunction) : this( + @Suppress("unused") + constructor( + configBuilder: InngestFunctionConfigBuilder, + handler: BiFunction, + ) : this( configBuilder, handler.toKotlin(), ) @@ -169,13 +169,13 @@ internal open class InternalInngestFunction( op = OpCode.WaitForEvent, statusCode = ResultStatusCode.StepComplete, opts = - buildMap { - put("event", e.waitEvent) - put("timeout", e.timeout) - if (e.ifExpression != null) { - put("if", e.ifExpression) - } - }, + buildMap { + put("event", e.waitEvent) + put("timeout", e.timeout) + if (e.ifExpression != null) { + put("if", e.ifExpression) + } + }, ) } catch (e: StepInterruptSleepException) { return StepOptions( @@ -192,13 +192,14 @@ internal open class InternalInngestFunction( name = e.id, op = OpCode.InvokeFunction, statusCode = ResultStatusCode.StepComplete, - opts = buildMap { - put("function_id", functionId) - put("payload", mapOf("data" to e.data)) - if (e.timeout != null) { - put("timeout", e.timeout) - } - } + opts = + buildMap { + put("function_id", functionId) + put("payload", mapOf("data" to e.data)) + if (e.timeout != null) { + put("timeout", e.timeout) + } + }, ) } catch (e: StepInterruptException) { // NOTE - Currently this error could be caught in the user's own function @@ -223,7 +224,10 @@ internal open class InternalInngestFunction( } } - fun getFunctionConfig(serveUrl: String, client: Inngest): InternalFunctionConfig { + fun getFunctionConfig( + serveUrl: String, + client: Inngest, + ): InternalFunctionConfig { // TODO use URL objects for serveUrl instead of strings so we can fetch things like scheme return configBuilder.build(client.appId, serveUrl) } diff --git a/inngest/src/main/kotlin/com/inngest/HttpClient.kt b/inngest/src/main/kotlin/com/inngest/HttpClient.kt index 305210b1..37f93e77 100644 --- a/inngest/src/main/kotlin/com/inngest/HttpClient.kt +++ b/inngest/src/main/kotlin/com/inngest/HttpClient.kt @@ -9,33 +9,42 @@ import okhttp3.Response typealias RequestHeaders = Map -data class RequestConfig(val headers: RequestHeaders? = null) +data class RequestConfig( + val headers: RequestHeaders? = null, +) val jsonMediaType = "application/json".toMediaType() -internal class HttpClient(private val clientConfig: RequestConfig) { +internal class HttpClient( + private val clientConfig: RequestConfig, +) { private val client = OkHttpClient() fun send( request: okhttp3.Request, handler: (Response) -> T, - ) = this.client.newCall(request).execute().use(handler) + ) = this.client + .newCall(request) + .execute() + .use(handler) fun build( url: String, payload: Any, config: RequestConfig? = null, ): okhttp3.Request { - val jsonRequestBody = Klaxon() - .fieldConverter(KlaxonDuration::class, durationConverter) - .fieldConverter(KlaxonConcurrencyScope::class, concurrencyScopeConverter) - .toJsonString(payload) + val jsonRequestBody = + Klaxon() + .fieldConverter(KlaxonDuration::class, durationConverter) + .fieldConverter(KlaxonConcurrencyScope::class, concurrencyScopeConverter) + .toJsonString(payload) val body = jsonRequestBody.toRequestBody(jsonMediaType) val clientHeaders = clientConfig.headers ?: emptyMap() val requestHeaders = config?.headers ?: emptyMap() - return okhttp3.Request.Builder() + return okhttp3.Request + .Builder() .url(url) .post(body) .headers(toOkHttpHeaders(clientHeaders + requestHeaders)) diff --git a/inngest/src/main/kotlin/com/inngest/Inngest.kt b/inngest/src/main/kotlin/com/inngest/Inngest.kt index 35a4bf59..0659dc45 100644 --- a/inngest/src/main/kotlin/com/inngest/Inngest.kt +++ b/inngest/src/main/kotlin/com/inngest/Inngest.kt @@ -17,7 +17,7 @@ class Inngest val headers: RequestHeaders = Environment.inngestHeaders() val env = Environment.inngestEnv(env = env, isDev = isDev) val eventKey = Environment.inngestEventKey(eventKey) - val baseUrl = Environment.inngestEventApiBaseUrl(env = this.env, url = baseUrl) + private val baseUrl = Environment.inngestEventApiBaseUrl(env = this.env, url = baseUrl) internal val httpClient = HttpClient(RequestConfig(headers)) diff --git a/inngest/src/main/kotlin/com/inngest/InngestEnv.kt b/inngest/src/main/kotlin/com/inngest/InngestEnv.kt index 21e6811e..cb41750a 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestEnv.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestEnv.kt @@ -1,6 +1,8 @@ package com.inngest -enum class InngestSystem(val value: String) { +enum class InngestSystem( + val value: String, +) { // Critical variables EventKey("INNGEST_EVENT_KEY"), SigningKey("INNGEST_SIGNING_KEY"), @@ -10,14 +12,17 @@ enum class InngestSystem(val value: String) { EventApiBaseUrl("INNGEST_BASE_URL"), ApiBaseUrl("INNGEST_API_BASE_URL"), LogLevel("INNGEST_LOG_LEVEL"), - ApiOrigin("INNGEST_API_ORIGIN"), + + // TODO - Rename this env variable to match other SDKS +// ApiOrigin("INNGEST_API_ORIGIN"), ServeOrigin("INNGEST_SERVE_ORIGIN"), ServePath("INNGEST_SERVE_PATH"), - Streaming("INNGEST_STREAMING"), Dev("INNGEST_DEV"), } -enum class InngestEnv(var value: String) { +enum class InngestEnv( + var value: String, +) { Dev("dev"), Prod("prod"), Other("other"), diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt index 8a084673..bcbc24c6 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt @@ -1,10 +1,7 @@ package com.inngest abstract class InngestFunction { - - open fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder { - return builder - } + open fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = builder /** * The function handler that will be run whenever the function is executed. @@ -17,9 +14,9 @@ abstract class InngestFunction { step: Step, ): Any? - fun buildConfig(): InngestFunctionConfigBuilder { + private fun buildConfig(): InngestFunctionConfigBuilder { val builder = InngestFunctionConfigBuilder() - return this.config(builder); + return this.config(builder) } fun id(): String { @@ -27,7 +24,7 @@ abstract class InngestFunction { return buildConfig().id!! } catch (e: Exception) { throw InngestInvalidConfigurationException( - "Function id must be configured via builder: ${this.javaClass.name}" + "Function id must be configured via builder: ${this.javaClass.name}", ) } } @@ -40,6 +37,3 @@ abstract class InngestFunction { // TODO - Add toFailureHandler method to generate a second function if configured } - - - diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt index bc27f5f8..3832570b 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt @@ -7,18 +7,18 @@ import com.beust.klaxon.KlaxonException import java.time.Duration // TODO: Throw illegal argument exception -class InngestFunctionConfigBuilder() { - var id: String? = null; - private var name: String? = null; - private var triggers: MutableList = mutableListOf(); - private var concurrency: MutableList? = null; - private var batchEvents: BatchEvents? = null; +class InngestFunctionConfigBuilder { + var id: String? = null + private var name: String? = null + private var triggers: MutableList = mutableListOf() + private var concurrency: MutableList? = null + private var batchEvents: BatchEvents? = null /** * @param id A unique identifier for the function that should not change over time */ fun id(id: String): InngestFunctionConfigBuilder { - this.id = id; + this.id = id return this } @@ -60,7 +60,11 @@ class InngestFunctionConfigBuilder() { * @param if A CEL expression to filter matching events to trigger on. * Example: "event.data.appId == '12345'" */ - fun triggerEventIf(event: String, `if`: String? = null): InngestFunctionConfigBuilder { + @Suppress("unused") + fun triggerEventIf( + event: String, + `if`: String? = null, + ): InngestFunctionConfigBuilder { this.triggers.add(InngestFunctionTriggers.Event(event, `if`)) return this } @@ -68,6 +72,7 @@ class InngestFunctionConfigBuilder() { /** * @param cron A crontab expression */ + @Suppress("unused") fun triggerCron(cron: String): InngestFunctionConfigBuilder { this.triggers.add(InngestFunctionTriggers.Cron(cron)) return this @@ -83,9 +88,13 @@ class InngestFunctionConfigBuilder() { * @param timeout The maximum duration of time to wait before executing the function * @param key A CEL expression to group events batches by. Example: "event.data.destinationId" */ - fun batchEvents(maxSize: Int, timeout: Duration, key: String? = null): InngestFunctionConfigBuilder { + fun batchEvents( + maxSize: Int, + timeout: Duration, + key: String? = null, + ): InngestFunctionConfigBuilder { this.batchEvents = BatchEvents(maxSize, timeout, key) - return this; + return this } /** @@ -95,7 +104,11 @@ class InngestFunctionConfigBuilder() { * @param key A CEL expression to apply limit using event payload properties. Example: "event.data.destinationId" * @param scope The scope to apply the limit to. Options */ - fun concurrency(limit: Int, key: String? = null, scope: ConcurrencyScope? = null): InngestFunctionConfigBuilder { + fun concurrency( + limit: Int, + key: String? = null, + scope: ConcurrencyScope? = null, + ): InngestFunctionConfigBuilder { val c = Concurrency(limit, key, scope) // TODO - Limit concurrency length to 2 if (this.concurrency == null) { @@ -114,81 +127,95 @@ class InngestFunctionConfigBuilder() { id = "step", name = "step", retries = - mapOf( - // TODO - Pull from conf option - "attempts" to 3, - ), + mapOf( + // TODO - Pull from conf option + "attempts" to 3, + ), runtime = - hashMapOf( - "type" to scheme, - "url" to "$serveUrl?fnId=${id}&stepId=step", - ), + hashMapOf( + "type" to scheme, + "url" to "$serveUrl?fnId=$id&stepId=step", + ), ), ) } - internal fun build(appId: String, serverUrl: String): InternalFunctionConfig { + internal fun build( + appId: String, + serverUrl: String, + ): InternalFunctionConfig { if (id == null) { throw InngestInvalidConfigurationException("Function id must be configured via builder") } val globalId = String.format("%s-%s", appId, id) - val config = InternalFunctionConfig( - globalId, - name, - triggers, - concurrency, - batchEvents, - steps = buildSteps(serverUrl) - ) + val config = + InternalFunctionConfig( + globalId, + name, + triggers, + concurrency, + batchEvents, + steps = buildSteps(serverUrl), + ) return config } } +class InngestInvalidConfigurationException( + message: String, +) : Exception(message) + @Target(AnnotationTarget.FIELD) annotation class KlaxonDuration -val durationConverter = object : Converter { - override fun canConvert(cls: Class<*>): Boolean = cls == Duration::class.java; +val durationConverter = + object : Converter { + override fun canConvert(cls: Class<*>): Boolean = cls == Duration::class.java - // TODO Implement this - parse 30s into duration of seconds - override fun fromJson(jv: JsonValue): Duration = - throw KlaxonException("Duration parse not implemented: ${jv.string}") + // TODO Implement this - parse 30s into duration of seconds + override fun fromJson(jv: JsonValue): Duration = + throw KlaxonException("Duration parse not implemented: ${jv.string}") - override fun toJson(value: Any): String = """"${(value as Duration).seconds}s"""" -} + override fun toJson(value: Any): String = """"${(value as Duration).seconds}s"""" + } @Target(AnnotationTarget.FIELD) annotation class KlaxonConcurrencyScope -val concurrencyScopeConverter = object : Converter { - override fun canConvert(cls: Class<*>): Boolean = cls.isEnum - override fun fromJson(jv: JsonValue): ConcurrencyScope = enumValueOf(jv.string!!) - override fun toJson(value: Any): String = """"${(value as ConcurrencyScope).value}"""" -} +val concurrencyScopeConverter = + object : Converter { + override fun canConvert(cls: Class<*>): Boolean = cls.isEnum + + override fun fromJson(jv: JsonValue): ConcurrencyScope = enumValueOf(jv.string!!) + + override fun toJson(value: Any): String = """"${(value as ConcurrencyScope).value}"""" + } // TODO - Convert enum element to value, not name -enum class ConcurrencyScope(val value: String) { +enum class ConcurrencyScope( + val value: String, +) { ACCOUNT("account"), ENVIRONMENT("env"), FUNCTION("fn"), } internal data class Concurrency -@JvmOverloads -constructor( - val limit: Int, - @Json(serializeNull = false) - val key: String? = null, - @Json(serializeNull = false) - @KlaxonConcurrencyScope - val scope: ConcurrencyScope? = null, -) + @JvmOverloads + constructor( + val limit: Int, + @Json(serializeNull = false) + val key: String? = null, + @Json(serializeNull = false) + @KlaxonConcurrencyScope + val scope: ConcurrencyScope? = null, + ) internal data class BatchEvents -@JvmOverloads -constructor( - val maxSize: Int, - @KlaxonDuration - val timeout: Duration, - @Json(serializeNull = false) val key: String? = null -) + @JvmOverloads + constructor( + val maxSize: Int, + @KlaxonDuration + val timeout: Duration, + @Json(serializeNull = false) val key: String? = null, + ) diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt index a2330127..665c5f19 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt @@ -6,19 +6,18 @@ import com.beust.klaxon.Json * A generic class for defining and serializing function triggers */ abstract class InngestFunctionTrigger // or interface or data class -@JvmOverloads -constructor( - @Json(serializeNull = false) val event: String? = null, - @Json(serializeNull = false) val `if`: String? = null, - @Json(serializeNull = false) val cron: String? = null, - // IDEA - Add timeout and re-use for cancelOn? -) + @JvmOverloads + constructor( + @Json(serializeNull = false) val event: String? = null, + @Json(serializeNull = false) val `if`: String? = null, + @Json(serializeNull = false) val cron: String? = null, + // IDEA - Add timeout and re-use for cancelOn? + ) /** * A class that contains nested classes to define function triggers */ class InngestFunctionTriggers { - /** * Define a function trigger for any matching events with a given name. * Optionally filter matching events with an expression statement. @@ -27,13 +26,17 @@ class InngestFunctionTriggers { * @param if A CEL expression to filter matching events to trigger on (optional). * Example: "event.data.appId == '12345'" */ - class Event(event: String, `if`: String? = null) : InngestFunctionTrigger(event, `if`, null) {} + class Event( + event: String, + `if`: String? = null, + ) : InngestFunctionTrigger(event, `if`, null) /** * Define a function trigger that will execute on a given crontab schedule. * * @param cron A crontab expression. Example: "0 9 * * 1" */ - class Cron(cron: String) : InngestFunctionTrigger(null, null, cron) {} - + class Cron( + cron: String, + ) : InngestFunctionTrigger(null, null, cron) } diff --git a/inngest/src/main/kotlin/com/inngest/InngestHeaderKey.kt b/inngest/src/main/kotlin/com/inngest/InngestHeaderKey.kt index b4c2ed57..e9277ed6 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestHeaderKey.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestHeaderKey.kt @@ -1,6 +1,9 @@ package com.inngest -enum class InngestHeaderKey(val value: String) { +@Suppress("unused") +enum class InngestHeaderKey( + val value: String, +) { ContentType("content-type"), UserAgent("user-agent"), Sdk("x-inngest-sdk"), diff --git a/inngest/src/main/kotlin/com/inngest/NonRetriableError.kt b/inngest/src/main/kotlin/com/inngest/NonRetriableError.kt index 645a91f5..a59572fe 100644 --- a/inngest/src/main/kotlin/com/inngest/NonRetriableError.kt +++ b/inngest/src/main/kotlin/com/inngest/NonRetriableError.kt @@ -2,4 +2,7 @@ package com.inngest open class NonRetriableError @JvmOverloads - constructor(message: String, cause: Throwable? = null) : RuntimeException(message, cause) + constructor( + message: String, + cause: Throwable? = null, + ) : RuntimeException(message, cause) diff --git a/inngest/src/main/kotlin/com/inngest/RetryAfterError.kt b/inngest/src/main/kotlin/com/inngest/RetryAfterError.kt index 818d2b14..db7c04d5 100644 --- a/inngest/src/main/kotlin/com/inngest/RetryAfterError.kt +++ b/inngest/src/main/kotlin/com/inngest/RetryAfterError.kt @@ -5,8 +5,11 @@ import java.time.format.DateTimeFormatter open class RetryAfterError @JvmOverloads - constructor(message: String, retryAfter: Any, cause: Throwable? = null) : - RuntimeException(message, cause) { + constructor( + message: String, + retryAfter: Any, + cause: Throwable? = null, + ) : RuntimeException(message, cause) { var retryAfter: String = when (retryAfter) { is ZonedDateTime -> retryAfter.format(DateTimeFormatter.ISO_INSTANT) diff --git a/inngest/src/main/kotlin/com/inngest/RetryDecision.kt b/inngest/src/main/kotlin/com/inngest/RetryDecision.kt index 4c73a2b7..4a91323f 100644 --- a/inngest/src/main/kotlin/com/inngest/RetryDecision.kt +++ b/inngest/src/main/kotlin/com/inngest/RetryDecision.kt @@ -1,6 +1,9 @@ package com.inngest -internal data class RetryDecision(val shouldRetry: Boolean, val headers: Map) { +internal data class RetryDecision( + val shouldRetry: Boolean, + val headers: Map, +) { companion object { internal fun fromException(exception: Exception): RetryDecision = when (exception) { diff --git a/inngest/src/main/kotlin/com/inngest/ServeConfig.kt b/inngest/src/main/kotlin/com/inngest/ServeConfig.kt index 8bb9e63f..a9a0352e 100644 --- a/inngest/src/main/kotlin/com/inngest/ServeConfig.kt +++ b/inngest/src/main/kotlin/com/inngest/ServeConfig.kt @@ -23,10 +23,9 @@ class ServeConfig return when (client.env) { InngestEnv.Dev -> "test" else -> { - val signingKey = System.getenv(InngestSystem.SigningKey.value) - if (signingKey == null) { - throw Exception("signing key is required") - } + val signingKey = + System.getenv(InngestSystem.SigningKey.value) + ?: throw Exception("signing key is required") signingKey } } diff --git a/inngest/src/main/kotlin/com/inngest/State.kt b/inngest/src/main/kotlin/com/inngest/State.kt index e0366a14..616bb375 100644 --- a/inngest/src/main/kotlin/com/inngest/State.kt +++ b/inngest/src/main/kotlin/com/inngest/State.kt @@ -4,9 +4,11 @@ import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.ObjectMapper import java.security.MessageDigest -class StateNotFound() : Throwable("State not found for id") +class StateNotFound : Throwable("State not found for id") -class State(val payloadJson: String) { +class State( + private val payloadJson: String, +) { fun getHashFromId(id: String): String { val bytes = id.toByteArray(Charsets.UTF_8) val digest = MessageDigest.getInstance("SHA-1") diff --git a/inngest/src/main/kotlin/com/inngest/Step.kt b/inngest/src/main/kotlin/com/inngest/Step.kt index 79d07b2e..be091708 100644 --- a/inngest/src/main/kotlin/com/inngest/Step.kt +++ b/inngest/src/main/kotlin/com/inngest/Step.kt @@ -5,26 +5,63 @@ import java.time.Duration typealias MemoizedRecord = HashMap typealias MemoizedState = HashMap -data class InngestEvent(val name: String, val data: Any) +data class InngestEvent( + val name: String, + val data: Any, +) -data class SendEventsResponse(val ids: Array) +data class SendEventsResponse( + val ids: Array, +) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false -class StepInvalidStateTypeException(val id: String, val hashedId: String) : Throwable("Step execution interrupted") + other as SendEventsResponse -class StepStateTypeMismatchException(val id: String, val hashedId: String) : Throwable("Step execution interrupted") + return ids.contentEquals(other.ids) + } -open class StepInterruptException(val id: String, val hashedId: String, open val data: kotlin.Any?) : - Throwable("Interrupt $id") + override fun hashCode(): Int = ids.contentHashCode() +} -class StepInterruptSleepException(id: String, hashedId: String, override val data: String) : - StepInterruptException(id, hashedId, data) +class StepInvalidStateTypeException( + val id: String, + val hashedId: String, +) : Throwable("Step execution interrupted") -class StepInterruptSendEventException(id: String, hashedId: String, val eventIds: Array) : - StepInterruptException(id, hashedId, eventIds) +// TODO - Add State type mismatch checks +// class StepStateTypeMismatchException( +// val id: String, +// val hashedId: String, +// ) : Throwable("Step execution interrupted") +open class StepInterruptException( + val id: String, + val hashedId: String, + open val data: Any?, +) : Throwable("Interrupt $id") -class StepInterruptInvokeException(id: String, hashedId: String, val appId: String, val fnId: String, data: kotlin.Any?, val timeout: String?) : - StepInterruptException(id, hashedId, data) +class StepInterruptSleepException( + id: String, + hashedId: String, + override val data: String, +) : StepInterruptException(id, hashedId, data) + +class StepInterruptSendEventException( + id: String, + hashedId: String, + val eventIds: Array, +) : StepInterruptException(id, hashedId, eventIds) + +class StepInterruptInvokeException( + id: String, + hashedId: String, + val appId: String, + val fnId: String, + data: Any?, + val timeout: String?, +) : StepInterruptException(id, hashedId, data) class StepInterruptWaitForEventException( id: String, @@ -32,13 +69,12 @@ class StepInterruptWaitForEventException( val waitEvent: String, val timeout: String, val ifExpression: String?, -) : - StepInterruptException(id, hashedId, null) - -// TODO: Add name, stack, etc. if poss -class StepError(message: String) : Exception(message) +) : StepInterruptException(id, hashedId, null) -class Step(val state: State, val client: Inngest) { +class Step( + private val state: State, + val client: Inngest, +) { /** * Run a function * @@ -77,7 +113,8 @@ class Step(val state: State, val client: Inngest) { * Invoke another Inngest function as a step * * @param id unique step id for memoization - * @param fn ID of the function to invoke + * @param appId ID of the Inngest app which contains the function to invoke (see client) + * @param fnId ID of the function to invoke * @param data the data to pass within `event.data` to the function * @param timeout an optional timeout for the invoked function. If the invoked function does * not finish within this time, the invoked function will be marked as failed. @@ -86,7 +123,7 @@ class Step(val state: State, val client: Inngest) { id: String, appId: String, fnId: String, - data: kotlin.Any?, + data: Any?, timeout: String?, ): T = invoke(id, appId, fnId, data, timeout, T::class.java) @@ -94,7 +131,7 @@ class Step(val state: State, val client: Inngest) { id: String, appId: String, fnId: String, - data: kotlin.Any?, + data: Any?, timeout: String?, type: Class, ): T { @@ -131,7 +168,7 @@ class Step(val state: State, val client: Inngest) { } return } catch (e: StateNotFound) { - val durationInSeconds = duration.getSeconds() + val durationInSeconds = duration.seconds throw StepInterruptSleepException(id, hashedId, "${durationInSeconds}s") } } diff --git a/inngest/src/main/kotlin/com/inngest/SupportedFrameworkName.kt b/inngest/src/main/kotlin/com/inngest/SupportedFrameworkName.kt index f1c3c02b..e8ba8fb1 100644 --- a/inngest/src/main/kotlin/com/inngest/SupportedFrameworkName.kt +++ b/inngest/src/main/kotlin/com/inngest/SupportedFrameworkName.kt @@ -1,6 +1,8 @@ package com.inngest -enum class SupportedFrameworkName(val value: String) { +enum class SupportedFrameworkName( + val value: String, +) { SpringBoot("springboot"), Ktor("ktor"), } diff --git a/inngest/src/main/kotlin/com/inngest/Version.kt b/inngest/src/main/kotlin/com/inngest/Version.kt index 5e791b6e..8bdc7c3d 100644 --- a/inngest/src/main/kotlin/com/inngest/Version.kt +++ b/inngest/src/main/kotlin/com/inngest/Version.kt @@ -1,6 +1,6 @@ package com.inngest -class Version() { +class Version { companion object { private val version: String? = Version::class.java.getPackage().implementationVersion diff --git a/inngest/src/main/kotlin/com/inngest/ktor/Route.kt b/inngest/src/main/kotlin/com/inngest/ktor/Route.kt index bfa7ecd6..d4a4084e 100644 --- a/inngest/src/main/kotlin/com/inngest/ktor/Route.kt +++ b/inngest/src/main/kotlin/com/inngest/ktor/Route.kt @@ -43,10 +43,7 @@ fun Route.serve( route(path) { get("") { - var origin = String.format("%s://%s", call.request.origin.scheme, call.request.origin.serverHost) - if (call.request.origin.serverPort != 80 || call.request.origin.serverPort != 443) { - origin = String.format("%s:%s", origin, call.request.origin.serverPort) - } + val origin = getOrigin(call) val resp = comm.introspect(origin) call.respond(HttpStatusCode.OK, resp) } @@ -77,12 +74,17 @@ fun Route.serve( } put("") { - var origin = String.format("%s://%s", call.request.origin.scheme, call.request.origin.serverHost) - if (call.request.origin.serverPort != 80 || call.request.origin.serverPort != 443) { - origin = String.format("%s:%s", origin, call.request.origin.serverPort) - } + val origin = getOrigin(call) val resp = comm.register(origin) call.respond(HttpStatusCode.OK, resp) } } } + +fun getOrigin(call: ApplicationCall): String { + var origin = String.format("%s://%s", call.request.origin.scheme, call.request.origin.serverHost) + if (call.request.origin.serverPort != 80 || call.request.origin.serverPort != 443) { + origin = String.format("%s:%s", origin, call.request.origin.serverPort) + } + return origin +} diff --git a/inngest/src/main/kotlin/com/inngest/signingkey/BearerToken.kt b/inngest/src/main/kotlin/com/inngest/signingkey/BearerToken.kt index 1cc0a6cb..b0d162e5 100644 --- a/inngest/src/main/kotlin/com/inngest/signingkey/BearerToken.kt +++ b/inngest/src/main/kotlin/com/inngest/signingkey/BearerToken.kt @@ -4,7 +4,7 @@ import com.inngest.RequestHeaders import java.lang.NumberFormatException import java.security.MessageDigest -val SIGNING_KEY_REGEX = Regex("""(?^signkey-[\w]+-)(?.*)""") +val SIGNING_KEY_REGEX = Regex("""(?^signkey-\w+-)(?.*)""") /** * Takes a signing key in the form "signkey--" and returns "signkey--" @@ -18,7 +18,7 @@ val SIGNING_KEY_REGEX = Regex("""(?^signkey-[\w]+-)(?.*)""") private fun hashedSigningKey(signingKey: String): String { val matchResult = SIGNING_KEY_REGEX.matchEntire(signingKey) ?: throw InvalidSigningKeyException() - // We aggressively assert non null here because if `matchEntire` had failed (and thus these capture groups didn't + // We aggressively assert non-null here because if `matchEntire` had failed (and thus these capture groups didn't // exist), we would have already thrown an exception val prefix = matchResult.groups["prefix"]!!.value val key = matchResult.groups["key"]!!.value diff --git a/inngest/src/main/kotlin/com/inngest/signingkey/SignatureVerification.kt b/inngest/src/main/kotlin/com/inngest/signingkey/SignatureVerification.kt index 60299d01..1a67fc00 100644 --- a/inngest/src/main/kotlin/com/inngest/signingkey/SignatureVerification.kt +++ b/inngest/src/main/kotlin/com/inngest/signingkey/SignatureVerification.kt @@ -12,12 +12,11 @@ const val HMAC_SHA256 = "HmacSHA256" // Implementation of this is inspired by the pure Java example from https://www.baeldung.com/java-hmac#hmac-using-jdk-apis @OptIn(ExperimentalStdlibApi::class) private fun computeHMAC( - algorithm: String, data: String, key: String, ): String { - val secretKeySpec = SecretKeySpec(key.toByteArray(Charsets.UTF_8), algorithm) - val mac = Mac.getInstance(algorithm) + val secretKeySpec = SecretKeySpec(key.toByteArray(Charsets.UTF_8), HMAC_SHA256) + val mac = Mac.getInstance(HMAC_SHA256) mac.init(secretKeySpec) return mac.doFinal(data.toByteArray(Charsets.UTF_8)).toHexString() } @@ -26,9 +25,7 @@ internal fun signRequest( requestBody: String, timestamp: Long, signingKey: String, -): String { - return signRequest(requestBody, timestamp.toString(), signingKey) -} +): String = signRequest(requestBody, timestamp.toString(), signingKey) private fun signRequest( requestBody: String, @@ -39,10 +36,12 @@ private fun signRequest( val key = matchResult.groups["key"]!!.value val message = requestBody + timestamp - return computeHMAC(HMAC_SHA256, message, key) + return computeHMAC(message, key) } -class InvalidSignatureHeaderException(message: String) : Throwable(message) +class InvalidSignatureHeaderException( + message: String, +) : Throwable(message) class ExpiredSignatureHeaderException : Throwable("signature header has expired") @@ -55,8 +54,12 @@ internal fun validateSignature( ) { // TODO: Find a way to parse signatureHeader as URL params without constructing a full URL val dummyUrl = "https://test.inngest.com/?$signatureHeader" - val url = dummyUrl.toHttpUrlOrNull() ?: throw InvalidSignatureHeaderException("signature header does not match expected format") - val timestamp = url.queryParameter("t")?.toLongOrNull() ?: throw InvalidSignatureHeaderException("timestamp is invalid") + val url = + dummyUrl.toHttpUrlOrNull() + ?: throw InvalidSignatureHeaderException("signature header does not match expected format") + val timestamp = + url.queryParameter("t")?.toLongOrNull() + ?: throw InvalidSignatureHeaderException("timestamp is invalid") val signature = url.queryParameter("s") ?: throw InvalidSignatureHeaderException("signature is invalid") val fiveMinutesAgo = Instant.now().minusSeconds(FIVE_MINUTES_IN_SECONDS).epochSecond @@ -98,7 +101,8 @@ fun checkHeadersAndValidateSignature( val signingKey = config.signingKey() - signatureHeader ?: throw InvalidSignatureHeaderException("Using cloud inngest but did not receive X-Inngest-Signature") + signatureHeader + ?: throw InvalidSignatureHeaderException("Using cloud inngest but did not receive X-Inngest-Signature") validateSignature(signatureHeader, signingKey, requestBody) } diff --git a/inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt b/inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt index 94c09279..229734e3 100644 --- a/inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt +++ b/inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt @@ -1,19 +1,16 @@ package com.inngest -import com.inngest.signingkey.InvalidSigningKeyException import kotlin.test.Test import kotlin.test.assertEquals - -import org.junit.jupiter.api.Assertions.* import kotlin.test.assertFailsWith class InngestFunctionConfigBuilderTest { - @Test fun testGlobalId() { - val config = InngestFunctionConfigBuilder() - .id("test-id") - .build("app-id", "https://mysite.com/api/inngest") + val config = + InngestFunctionConfigBuilder() + .id("test-id") + .build("app-id", "https://mysite.com/api/inngest") assertEquals("app-id-test-id", config.id) } From ee6a684e363a87a5290b1b517dcdddb133db7cd5 Mon Sep 17 00:00:00 2001 From: Dan Farrelly Date: Mon, 22 Jul 2024 15:12:08 -0400 Subject: [PATCH 12/12] Update README --- README.md | 81 ++++++++++++------- .../main/kotlin/com/inngest/testserver/App.kt | 10 ++- .../com/inngest/testserver/TranscodeVideo.kt | 39 +++++++++ 3 files changed, 100 insertions(+), 30 deletions(-) create mode 100644 inngest-test-server/src/main/kotlin/com/inngest/testserver/TranscodeVideo.kt diff --git a/README.md b/README.md index 6e990b00..2ea745c7 100644 --- a/README.md +++ b/README.md @@ -8,32 +8,37 @@ Kotlin ```kotlin -import com.inngest.InngestFunction -import com.inngest.FunctionOptions -import com.inngest.FunctionTrigger - -val myFunction = InngestFunction( - FunctionOptions( - id = "fn-id-slug", - name = "My function!", - triggers = arrayOf(FunctionTrigger(event = "user.signup")), - ), -) { ctx, step -> - val x = 10 - - val res = - step.run("add-ten") { -> - x + 10 - } - val add: Int = - step.run("multiply-by-100") { - res * 100 - } - step.sleep("wait-one-minute", Duration.ofSeconds(60)) - - step.run("last-step") { res * add } - - hashMapOf("message" to "success") +class TranscodeVideo : InngestFunction() { + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = + builder + .id("process-video") + .name("Process video upload") + .triggerEvent("media/video.uploaded") + .concurrency(10) + + override fun execute( + ctx: FunctionContext, + step: Step, + ): HashMap { + val transcription = + step.run("transcribe-video") { + // Download video, run through transcription model, return output + "Hi there, My name is Jamie..." // dummy example content + } + + val summary = + step.run("summarize") { + // Send t + "Hi there, My name is Jamie..." // dummy example content + } + + step.run("save-results") { + // Save summary, to your database + // database.save(event.data["videoId"], transcription, summary) + } + + return hashMapOf("restored" to false) + } } ``` @@ -43,9 +48,28 @@ val myFunction = InngestFunction( Java (Coming soon) -## Declaring dependencies +## Defining configuration -WIP +Define your function's configuration using the `config` method and the `InngestFunctionConfigBuilder` class. +The `config` method must be overridden and an `id` is required. All options should are discoverable via +the builder class passed as the only argument to the `config` method. + +
+ Kotlin + +```kotlin +class TranscodeVideo : InngestFunction() { + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = + builder + .id("process-video") + .name("Process video upload") + .triggerEvent("media/video.uploaded") + .concurrency(10) + +} +``` + +
## Contributing [WIP] @@ -57,7 +81,6 @@ make dev-ktor This runs a `ktor` web server to test the SDK against the dev server. - To run the `spring-boot` test server: ``` diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt index 14323c7f..f378e066 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt @@ -19,7 +19,15 @@ fun Application.module() { val inngest = Inngest(appId = "ktor-dev") routing { - serve("/api/inngest", inngest, listOf(ProcessAlbum(), RestoreFromGlacier(), ProcessUserSignup())) + serve( + "/api/inngest", inngest, + listOf( + ProcessAlbum(), + RestoreFromGlacier(), + ProcessUserSignup(), + TranscodeVideo(), + ), + ) } } diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/TranscodeVideo.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/TranscodeVideo.kt new file mode 100644 index 00000000..9e96da41 --- /dev/null +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/TranscodeVideo.kt @@ -0,0 +1,39 @@ +package com.inngest.testserver + +import com.inngest.FunctionContext +import com.inngest.InngestFunction +import com.inngest.InngestFunctionConfigBuilder +import com.inngest.Step + +class TranscodeVideo : InngestFunction() { + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = + builder + .id("process-video") + .name("Process video upload") + .triggerEvent("media/video.uploaded") + .concurrency(10) + + override fun execute( + ctx: FunctionContext, + step: Step, + ): HashMap { + val transcription = + step.run("transcribe-video") { + // Download video, run through transcription model, return output + "Hi there, My name is Jamie..." // dummy example content + } + + val summary = + step.run("summarize") { + // Send t + "Hi there, My name is Jamie..." // dummy example content + } + + step.run("save-results") { + // Save summary, to your database + // database.save(event.data["videoId"], transcription, summary) + } + + return hashMapOf("restored" to false) + } +}