diff --git a/README.md b/README.md index 6e990b00..2ea745c7 100644 --- a/README.md +++ b/README.md @@ -8,32 +8,37 @@ Kotlin ```kotlin -import com.inngest.InngestFunction -import com.inngest.FunctionOptions -import com.inngest.FunctionTrigger - -val myFunction = InngestFunction( - FunctionOptions( - id = "fn-id-slug", - name = "My function!", - triggers = arrayOf(FunctionTrigger(event = "user.signup")), - ), -) { ctx, step -> - val x = 10 - - val res = - step.run("add-ten") { -> - x + 10 - } - val add: Int = - step.run("multiply-by-100") { - res * 100 - } - step.sleep("wait-one-minute", Duration.ofSeconds(60)) - - step.run("last-step") { res * add } - - hashMapOf("message" to "success") +class TranscodeVideo : InngestFunction() { + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = + builder + .id("process-video") + .name("Process video upload") + .triggerEvent("media/video.uploaded") + .concurrency(10) + + override fun execute( + ctx: FunctionContext, + step: Step, + ): HashMap { + val transcription = + step.run("transcribe-video") { + // Download video, run through transcription model, return output + "Hi there, My name is Jamie..." // dummy example content + } + + val summary = + step.run("summarize") { + // Send t + "Hi there, My name is Jamie..." // dummy example content + } + + step.run("save-results") { + // Save summary, to your database + // database.save(event.data["videoId"], transcription, summary) + } + + return hashMapOf("restored" to false) + } } ``` @@ -43,9 +48,28 @@ val myFunction = InngestFunction( Java (Coming soon) -## Declaring dependencies +## Defining configuration -WIP +Define your function's configuration using the `config` method and the `InngestFunctionConfigBuilder` class. +The `config` method must be overridden and an `id` is required. All options should are discoverable via +the builder class passed as the only argument to the `config` method. + +
+ Kotlin + +```kotlin +class TranscodeVideo : InngestFunction() { + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = + builder + .id("process-video") + .name("Process video upload") + .triggerEvent("media/video.uploaded") + .concurrency(10) + +} +``` + +
## Contributing [WIP] @@ -57,7 +81,6 @@ make dev-ktor This runs a `ktor` web server to test the SDK against the dev server. - To run the `spring-boot` test server: ``` diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/FollowupFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/FollowupFunction.java index 7d881957..df3f0823 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/FollowupFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/FollowupFunction.java @@ -5,13 +5,18 @@ import java.util.LinkedHashMap; -@FunctionConfig( - id = "fn-follow-up", - name = "My follow up function!" -) -@FunctionEventTrigger(event = "user.signup.completed") -@FunctionEventTrigger(event = "random-event") public class FollowupFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("fn-follow-up") + .name("My follow up function!") + .triggerEvent("user.signup.completed") + .triggerEvent("random-event"); + } + @Override public LinkedHashMap execute(@NotNull FunctionContext ctx, @NotNull Step step) { System.out.println("-> follow up handler called " + ctx.getEvent().getName()); diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/UserSignupFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/UserSignupFunction.java index b4819504..ef516fca 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/UserSignupFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/UserSignupFunction.java @@ -6,9 +6,17 @@ import java.time.Duration; import java.util.HashMap; -@FunctionConfig(id = "fn-id-slug", name = "My function!") -@FunctionEventTrigger(event = "user-signup") public class UserSignupFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("fn-id-slug") + .name("My Function!") + .triggerEvent("user-signup"); + } + @Override public HashMap execute(@NotNull FunctionContext ctx, @NotNull Step step) { int x = 10; diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CustomStepFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CustomStepFunction.java index b0ba7dac..aaea1ab1 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CustomStepFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CustomStepFunction.java @@ -1,11 +1,19 @@ package com.inngest.springbootdemo.testfunctions; import com.inngest.*; +import org.jetbrains.annotations.NotNull; -@FunctionConfig(id = "custom-result-fn", name = "Custom Result Function") -@FunctionEventTrigger(event = "test/custom.result.step") public class CustomStepFunction extends InngestFunction { + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("custom-result-fn") + .name("Custom Result Function") + .triggerEvent("test/custom.result.step"); + } + private final int count = 0; @Override diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/EmptyStepFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/EmptyStepFunction.java index 807fba34..abc638ef 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/EmptyStepFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/EmptyStepFunction.java @@ -1,10 +1,19 @@ package com.inngest.springbootdemo.testfunctions; import com.inngest.*; +import org.jetbrains.annotations.NotNull; -@FunctionConfig(id = "no-step-fn", name = "No Step Function") -@FunctionEventTrigger(event = "test/no-step") public class EmptyStepFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("no-step-fn") + .name("No Step Function") + .triggerEvent("test/no-step"); + } + @Override public String execute(FunctionContext ctx, Step step) { return "hello world"; diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/NonRetriableErrorFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/NonRetriableErrorFunction.java index 431435a1..b6f9527a 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/NonRetriableErrorFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/NonRetriableErrorFunction.java @@ -1,11 +1,19 @@ package com.inngest.springbootdemo.testfunctions; import com.inngest.*; +import org.jetbrains.annotations.NotNull; -@FunctionConfig(id = "non-retriable-fn", name = "NonRetriable Function") -@FunctionEventTrigger(event = "test/non.retriable") public class NonRetriableErrorFunction extends InngestFunction { + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("non-retriable-fn") + .name("NonRetriable Function") + .triggerEvent("test/non.retriable"); + } + @Override public String execute(FunctionContext ctx, Step step) { step.run("fail-step", () -> { diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/RetriableErrorFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/RetriableErrorFunction.java index 3c5aa82f..8a6f06a2 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/RetriableErrorFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/RetriableErrorFunction.java @@ -1,10 +1,19 @@ package com.inngest.springbootdemo.testfunctions; import com.inngest.*; +import org.jetbrains.annotations.NotNull; -@FunctionConfig(id = "retriable-fn", name = "Retriable Function") -@FunctionEventTrigger(event = "test/retriable") public class RetriableErrorFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("retriable-fn") + .name("Retriable Function") + .triggerEvent("test/retriable"); + } + static int retryCount = 0; @Override diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/SendEventFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/SendEventFunction.java index 3d9a166e..09a9d673 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/SendEventFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/SendEventFunction.java @@ -1,13 +1,21 @@ package com.inngest.springbootdemo.testfunctions; import com.inngest.*; +import org.jetbrains.annotations.NotNull; import java.util.HashMap; -@FunctionConfig(id = "send-fn", name = "Send Function") -@FunctionEventTrigger(event = "test/send") public class SendEventFunction extends InngestFunction { + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("send-fn") + .name("Send Function") + .triggerEvent("test/send"); + } + @Override public SendEventsResponse execute(FunctionContext ctx, Step step) { return step.sendEvent("send-test", new InngestEvent( diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/SleepStepFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/SleepStepFunction.java index 3353344d..5dfdba5b 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/SleepStepFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/SleepStepFunction.java @@ -1,12 +1,21 @@ package com.inngest.springbootdemo.testfunctions; import com.inngest.*; +import org.jetbrains.annotations.NotNull; import java.time.Duration; -@FunctionConfig(id = "sleep-fn", name = "Sleep Function") -@FunctionEventTrigger(event = "test/sleep") public class SleepStepFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("sleep-fn") + .name("Sleep Function") + .triggerEvent("test/sleep"); + } + @Override public Integer execute(FunctionContext ctx, Step step) { int result = step.run("num", () -> 42, Integer.class); diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/TwoStepsFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/TwoStepsFunction.java index 985cc2a8..eb018bd6 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/TwoStepsFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/TwoStepsFunction.java @@ -1,11 +1,20 @@ package com.inngest.springbootdemo.testfunctions; import com.inngest.*; +import org.jetbrains.annotations.NotNull; -@FunctionConfig(id = "two-steps-fn", name = "Two Steps Function") -@FunctionEventTrigger(event = "test/two.steps") public class TwoStepsFunction extends InngestFunction { + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("two-steps-fn") + .name("Two Steps Function") + .triggerEvent("test/two.steps"); + } + + private final int count = 0; @Override diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/WaitForEventFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/WaitForEventFunction.java index 5ddb89b5..725bfab2 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/WaitForEventFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/WaitForEventFunction.java @@ -1,11 +1,19 @@ package com.inngest.springbootdemo.testfunctions; import com.inngest.*; +import org.jetbrains.annotations.NotNull; -@FunctionConfig(id = "wait-for-event-fn", name = "Wait for Event Function") -@FunctionEventTrigger(event = "test/wait-for-event") public class WaitForEventFunction extends InngestFunction { + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("wait-for-event-fn") + .name("Wait for Event Function") + .triggerEvent("test/wait-for-event"); + } + @Override public String execute(FunctionContext ctx, Step step) { Object event = step.waitForEvent("wait-test", diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt index ba8983ec..f378e066 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt @@ -15,20 +15,26 @@ data class Result( const val FOLLOW_UP_EVENT_NAME = "user.signup.completed" -data class IngestData(val message: String) - fun Application.module() { val inngest = Inngest(appId = "ktor-dev") routing { - serve("/api/inngest", inngest, listOf(ProcessAlbum(), RestoreFromGlacier())) + serve( + "/api/inngest", inngest, + listOf( + ProcessAlbum(), + RestoreFromGlacier(), + ProcessUserSignup(), + TranscodeVideo(), + ), + ) } } fun main() { - var port = 8080 + val port = 8080 - println("Test server running on port " + port) + println("Test server running on port $port") embeddedServer( Netty, diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/FollowupFunction.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/FollowupFunction.kt deleted file mode 100644 index b7e49ae3..00000000 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/FollowupFunction.kt +++ /dev/null @@ -1,16 +0,0 @@ -package com.inngest.testserver - -import com.inngest.* - -@FunctionConfig(id = "fn-follow-up", name = "My follow up function!") -@FunctionEventTrigger(event = "user.signup.completed") -@FunctionEventTrigger(event = "random-event") -class FollowupFunction : InngestFunction() { - override fun execute( - ctx: FunctionContext, - step: Step, - ): LinkedHashMap { - println("-> follow up handler called " + ctx.event.name) - return ctx.event.data - } -} diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt index e19a508c..25274cf1 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessAlbum.kt @@ -3,36 +3,39 @@ package com.inngest.testserver import com.inngest.* import java.time.Duration -@FunctionConfig(id = "ProcessAlbum", name = "ProcessAlbum") -@FunctionEventTrigger(event = "delivery/process.requested") +/** + * A demo function that accepts an event in a batch and invokes a child function + */ class ProcessAlbum : InngestFunction() { + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = + builder + .id("ProcessAlbum") + .name("Process Album!") + .triggerEvent("delivery/process.requested") + .trigger(InngestFunctionTriggers.Cron("5 0 * 8 *")) + .batchEvents(30, Duration.ofSeconds(10)) + override fun execute( ctx: FunctionContext, step: Step, ): LinkedHashMap { +// val list = ctx.events.map { e -> e.data.get("something") } +// println(list); - // NOTE - App ID is set on the serve level - val res = step.invoke>( - "restore-album", - "ktor-dev", - "RestoreFromGlacier", - mapOf("some-arg" to "awesome"), - null, - - ) - -// throw NonRetriableError("Could not restore") - return linkedMapOf("hello" to true) - } - - fun isRestoredFromGlacier(temp: Int): Boolean { - if (temp > 2) { - return true + for (evt in ctx.events) { +// println(evt); + // NOTE - App ID is set on the serve level + val res = + step.invoke>( + "restore-album-${evt.data["albumId"]}", + "ktor-dev", + "RestoreFromGlacier", + evt.data, + null, + ) + println(res["restored"]) } - return false; - } - fun restoreFromGlacier(): String { - return "FILES_RESTORED" + return linkedMapOf("hello" to true) } } diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/UserSignupFunction.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessUserSignup.kt similarity index 63% rename from inngest-test-server/src/main/kotlin/com/inngest/testserver/UserSignupFunction.kt rename to inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessUserSignup.kt index 749db8f6..46ea2eeb 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/UserSignupFunction.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/ProcessUserSignup.kt @@ -3,9 +3,12 @@ package com.inngest.testserver import com.inngest.* import java.time.Duration -@FunctionConfig(id = "fn-id-slug", name = "My function!") -@FunctionEventTrigger(event = "user-signup") -class UserSignupFunction : InngestFunction() { +class ProcessUserSignup : InngestFunction() { + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = + builder + .id("process-user-signup") + .triggerEvent("user-signup") + override fun execute( ctx: FunctionContext, step: Step, @@ -15,31 +18,29 @@ class UserSignupFunction : InngestFunction() { println("-> handler called " + ctx.event.name) val y = - step.run("add-ten") { -> - x + 10 - } + step.run("add-ten") { x + 10 } val res = - step.run("cast-to-type-add-ten") { -> - println("-> running step 1!! " + x) + step.run("cast-to-type-add-ten") { + println("-> running step 1!! $x") // throw Exception("An error!") Result( sum = y + 10, ) } - println("res" + res) + println("res $res") step.waitForEvent("wait-for-hello", "hello", "10m", "event.data.hello == async.data.hello") val add: Int = step.run("add-one-hundred") { - println("-> running step 2 :) " + res?.sum) + println("-> running step 2 :) " + res.sum) res.sum + 100 } step.sleep("wait-one-sec", Duration.ofSeconds(2)) - step.run("last-step") { res.sum.times(add) ?: 0 } + step.run("last-step") { res.sum.times(add) } step.sendEvent("followup-event-id", InngestEvent(FOLLOW_UP_EVENT_NAME, data = hashMapOf("hello" to "world"))) diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt index acfadd29..7a6b2e1e 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/RestoreFromGlacier.kt @@ -3,14 +3,18 @@ package com.inngest.testserver import com.inngest.* import java.time.Duration -@FunctionConfig(id = "RestoreFromGlacier", name = "RestoreFromGlacier") -@FunctionEventTrigger(event = "delivery/restore.requested") class RestoreFromGlacier : InngestFunction() { + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = + builder + .id("RestoreFromGlacier") + .name("Restore from Glacier") + .trigger(InngestFunctionTriggers.Event("delivery/restore.requested")) + .concurrency(10, null, ConcurrencyScope.ENVIRONMENT) + override fun execute( ctx: FunctionContext, step: Step, ): LinkedHashMap { - step.run("restore") { if (!isRestoredFromGlacier(0)) { restoreFromGlacier() @@ -18,9 +22,10 @@ class RestoreFromGlacier : InngestFunction() { } var i = 0 while (i < 6) { - val isRestored = step.run(String.format("check-status-%d", i)) { - isRestoredFromGlacier(i) - } + val isRestored = + step.run(String.format("check-status-%d", i)) { + isRestoredFromGlacier(i) + } if (isRestored) { return linkedMapOf("restored" to true) } @@ -32,14 +37,9 @@ class RestoreFromGlacier : InngestFunction() { return linkedMapOf("restored" to false) } - fun isRestoredFromGlacier(temp: Int): Boolean { - if (temp > 2) { - return true - } - return false; - } + // NOTE - This method is only a stub meant to simulate that Glacier restoration will return false + // the first couple of times in the loop. This is just to show the concept. + private fun isRestoredFromGlacier(temp: Int): Boolean = temp > 2 - fun restoreFromGlacier(): String { - return "FILES_RESTORED" - } + private fun restoreFromGlacier(): String = "FILES_RESTORED" } diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/TranscodeVideo.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/TranscodeVideo.kt new file mode 100644 index 00000000..9e96da41 --- /dev/null +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/TranscodeVideo.kt @@ -0,0 +1,39 @@ +package com.inngest.testserver + +import com.inngest.FunctionContext +import com.inngest.InngestFunction +import com.inngest.InngestFunctionConfigBuilder +import com.inngest.Step + +class TranscodeVideo : InngestFunction() { + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = + builder + .id("process-video") + .name("Process video upload") + .triggerEvent("media/video.uploaded") + .concurrency(10) + + override fun execute( + ctx: FunctionContext, + step: Step, + ): HashMap { + val transcription = + step.run("transcribe-video") { + // Download video, run through transcription model, return output + "Hi there, My name is Jamie..." // dummy example content + } + + val summary = + step.run("summarize") { + // Send t + "Hi there, My name is Jamie..." // dummy example content + } + + step.run("save-results") { + // Save summary, to your database + // database.save(event.data["videoId"], transcription, summary) + } + + return hashMapOf("restored" to false) + } +} diff --git a/inngest-test-server/src/test/kotlin/com/inngest/testserver/AppTest.kt b/inngest-test-server/src/test/kotlin/com/inngest/testserver/AppTest.kt deleted file mode 100644 index 3e698e47..00000000 --- a/inngest-test-server/src/test/kotlin/com/inngest/testserver/AppTest.kt +++ /dev/null @@ -1,14 +0,0 @@ -/* - * This Kotlin source file was generated by the Gradle 'init' task. - */ -package com.inngest.testserver - -// import kotlin.test.Test -// import kotlin.test.assertNotNull - -// class AppTest { -// @Test fun appHasAGreeting() { -// val classUnderTest = App() -// assertNotNull(classUnderTest.greeting, "app should have a greeting") -// } -// } diff --git a/inngest/src/main/kotlin/com/inngest/Comm.kt b/inngest/src/main/kotlin/com/inngest/Comm.kt index ca873e13..6a379a46 100644 --- a/inngest/src/main/kotlin/com/inngest/Comm.kt +++ b/inngest/src/main/kotlin/com/inngest/Comm.kt @@ -20,19 +20,21 @@ data class ExecutionContext( val env: String, ) -internal data class RegistrationRequestPayload( - val appName: String, - val deployType: String = "ping", - val framework: String, - val functions: List = listOf(), - val sdk: String, - val url: String, - val v: String, -) - -enum class InngestSyncResult { - None, -} +internal data class RegistrationRequestPayload + @JvmOverloads + constructor( + val appName: String, + val deployType: String = "ping", + val framework: String, + val functions: List = listOf(), + val sdk: String, + val url: String, + val v: String, + ) + +// enum class InngestSyncResult { +// None, +// } data class CommResponse( val body: String, @@ -44,6 +46,8 @@ data class CommError( val name: String, val message: String?, val stack: String?, + // TODO - Convert to camelCase and use Klaxon property renaming for parsing/serialization + @Suppress("PropertyName") val __serialized: Boolean = true, ) @@ -54,14 +58,12 @@ class CommHandler( private val framework: SupportedFrameworkName, ) { val headers = Environment.inngestHeaders(framework).plus(client.headers) - internal val functions = functions.mapValues { (_, fn) -> fn.toInngestFunction() } + private val functions = functions.mapValues { (_, fn) -> fn.toInngestFunction() } fun callFunction( functionId: String, requestBody: String, ): CommResponse { - println(requestBody) - try { val payload = Klaxon().parse(requestBody) // TODO - check that payload is not null and throw error @@ -112,6 +114,19 @@ class CommHandler( return mapper.writeValueAsString(requestBody) } + private fun serializePayload(payload: Any?): String { + try { + return Klaxon() + .fieldConverter(KlaxonDuration::class, durationConverter) + .fieldConverter(KlaxonConcurrencyScope::class, concurrencyScopeConverter) + .toJsonString(payload) + } catch (e: Exception) { + // TODO - Properly log this serialization failure + println(e) + return """{ "message": "failed serialization" }""" + } + } + private fun getFunctionConfigs(origin: String): List { val configs: MutableList = mutableListOf() functions.forEach { entry -> configs.add(entry.value.getFunctionConfig(getServeUrl(origin), client)) } @@ -143,17 +158,16 @@ class CommHandler( return parseRequestBody(body) } - fun sync(): Result { - return Result.success(InngestSyncResult.None) - } + // TODO +// fun sync(): Result = Result.success(InngestSyncResult.None) fun introspect(origin: String): String { val requestPayload = getRegistrationRequestPayload(origin) - return parseRequestBody(requestPayload) + return serializePayload(requestPayload) } - private fun getRegistrationRequestPayload(origin: String): RegistrationRequestPayload { - return RegistrationRequestPayload( + private fun getRegistrationRequestPayload(origin: String): RegistrationRequestPayload = + RegistrationRequestPayload( appName = config.appId(), framework = framework.toString(), sdk = "inngest-kt", @@ -161,7 +175,6 @@ class CommHandler( v = Version.getVersion(), functions = getFunctionConfigs(origin), ) - } private fun getServeUrl(origin: String): String { // TODO - property from SpringBoot should take preference to env variable? diff --git a/inngest/src/main/kotlin/com/inngest/Environment.kt b/inngest/src/main/kotlin/com/inngest/Environment.kt index b6d68d6e..542e7974 100644 --- a/inngest/src/main/kotlin/com/inngest/Environment.kt +++ b/inngest/src/main/kotlin/com/inngest/Environment.kt @@ -50,7 +50,7 @@ object Environment { "0" -> InngestEnv.Prod "1" -> InngestEnv.Dev else -> { - var other = InngestEnv.Other + val other = InngestEnv.Other other.value = sysDev other } @@ -64,7 +64,7 @@ object Environment { "prod" -> InngestEnv.Prod "production" -> InngestEnv.Prod else -> { - var other = InngestEnv.Other + val other = InngestEnv.Other other.value = env other } @@ -72,8 +72,7 @@ object Environment { } // Read from environment variable - val inngestEnv = System.getenv(InngestSystem.Env.value) - return when (inngestEnv) { + return when (val inngestEnv = System.getenv(InngestSystem.Env.value)) { null -> InngestEnv.Dev "dev" -> InngestEnv.Dev "development" -> InngestEnv.Dev @@ -81,7 +80,7 @@ object Environment { "production" -> InngestEnv.Prod else -> { - var other = InngestEnv.Other + val other = InngestEnv.Other other.value = inngestEnv other } diff --git a/inngest/src/main/kotlin/com/inngest/Event.kt b/inngest/src/main/kotlin/com/inngest/Event.kt index 84580716..5a98242f 100644 --- a/inngest/src/main/kotlin/com/inngest/Event.kt +++ b/inngest/src/main/kotlin/com/inngest/Event.kt @@ -1,6 +1,7 @@ package com.inngest data class Event( + val id: String, val name: String, val data: LinkedHashMap, val user: LinkedHashMap? = null, @@ -8,7 +9,7 @@ data class Event( val v: Any? = null, ) -data class EventAPIResponse( - val ids: Array, - val status: String, -) +// data class EventAPIResponse( +// val ids: Array, +// val status: String, +// ) diff --git a/inngest/src/main/kotlin/com/inngest/Function.kt b/inngest/src/main/kotlin/com/inngest/Function.kt index b8219eb2..a8c5e995 100644 --- a/inngest/src/main/kotlin/com/inngest/Function.kt +++ b/inngest/src/main/kotlin/com/inngest/Function.kt @@ -3,21 +3,6 @@ package com.inngest import com.beust.klaxon.Json import java.util.function.BiFunction -// IDEA: Use data classes -internal data class InternalFunctionOptions( - val id: String, - val name: String, - val triggers: Array, -) - -internal data class InternalFunctionTrigger -@JvmOverloads -constructor( - @Json(serializeNull = false) val event: String? = null, - @Json(serializeNull = false) val `if`: String? = null, - @Json(serializeNull = false) val cron: String? = null, -) - // TODO - Add an abstraction layer between the Function call response and the comm handler response enum class OpCode { StepRun, @@ -28,10 +13,13 @@ enum class OpCode { InvokeFunction, // FUTURE: - StepNotFound, +// StepNotFound, } -enum class ResultStatusCode(val code: Int, val message: String) { +enum class ResultStatusCode( + val code: Int, + val message: String, +) { StepComplete(206, "Step Complete"), FunctionComplete(200, "Function Complete"), NonRetriableError(400, "Bad Request"), @@ -77,12 +65,22 @@ data class StepConfig( val runtime: HashMap = hashMapOf("type" to "http"), ) -internal data class InternalFunctionConfig( - val id: String, - val name: String, - val triggers: Array, - val steps: Map, -) +@Suppress("unused") +internal class InternalFunctionConfig + @JvmOverloads + constructor( + val id: String, + val name: String?, + val triggers: MutableList, + @Json(serializeNull = false) + val concurrency: MutableList? = null, + @Json(serializeNull = false) + val batchEvents: BatchEvents? = null, + val steps: Map, + ) +// NOTE - This should probably be called serialized or formatted config +// as it's only used to format the config for register requests +// typealias InternalFunctionConfig = Map /** * The context for the current function run @@ -97,36 +95,47 @@ data class FunctionContext( val attempt: Int, ) -// TODO - Determine if we should merge config + trigger +data class SendEventPayload( + // TODO - Change this to camelCase and add Klaxon annotation to parse underscore name via API + @Suppress("PropertyName") + val event_ids: Array, +) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false -/** - * A function that can be called by the Inngest system - * - * @param config The options for the function - * @param handler The function to be called when the function is triggered - */ + other as SendEventPayload + + return event_ids.contentEquals(other.event_ids) + } -data class SendEventPayload(val event_ids: Array) + override fun hashCode(): Int = event_ids.contentHashCode() +} internal interface Function { fun id(): String - - fun config(): InternalFunctionConfig } // TODO: make this implement the Function interface + +/** + * An internal class that accepts the configuration and a function handler + * and handles the execution and memoization of an InngestFunction + */ internal open class InternalInngestFunction( - val config: InternalFunctionOptions, + private val configBuilder: InngestFunctionConfigBuilder, val handler: (ctx: FunctionContext, step: Step) -> Any?, ) { - constructor(config: InternalFunctionOptions, handler: BiFunction) : this( - config, + @Suppress("unused") + constructor( + configBuilder: InngestFunctionConfigBuilder, + handler: BiFunction, + ) : this( + configBuilder, handler.toKotlin(), ) - fun id() = config.id - - // TODO - Validate options and trigger + fun id() = configBuilder.id fun call( ctx: FunctionContext, @@ -136,9 +145,6 @@ internal open class InternalInngestFunction( val state = State(requestBody) val step = Step(state, client) - // DEBUG - println(state) - try { val data = handler(ctx, step) return StepResult( @@ -163,13 +169,13 @@ internal open class InternalInngestFunction( op = OpCode.WaitForEvent, statusCode = ResultStatusCode.StepComplete, opts = - buildMap { - put("event", e.waitEvent) - put("timeout", e.timeout) - if (e.ifExpression != null) { - put("if", e.ifExpression) - } - }, + buildMap { + put("event", e.waitEvent) + put("timeout", e.timeout) + if (e.ifExpression != null) { + put("if", e.ifExpression) + } + }, ) } catch (e: StepInterruptSleepException) { return StepOptions( @@ -186,13 +192,14 @@ internal open class InternalInngestFunction( name = e.id, op = OpCode.InvokeFunction, statusCode = ResultStatusCode.StepComplete, - opts = buildMap { - put("function_id", functionId) - put("payload", mapOf("data" to e.data)) - if (e.timeout != null) { - put("timeout", e.timeout) - } - } + opts = + buildMap { + put("function_id", functionId) + put("payload", mapOf("data" to e.data)) + if (e.timeout != null) { + put("timeout", e.timeout) + } + }, ) } catch (e: StepInterruptException) { // NOTE - Currently this error could be caught in the user's own function @@ -217,33 +224,11 @@ internal open class InternalInngestFunction( } } - fun getFunctionConfig(serveUrl: String, client: Inngest): InternalFunctionConfig { - // TODO use URL objects instead of strings so we can fetch things like scheme - val scheme = serveUrl.split("://")[0] - return InternalFunctionConfig( - id = String.format("%s-%s", client.appId, config.id), - name = config.name, - triggers = config.triggers, - steps = - mapOf( - "step" to - StepConfig( - id = "step", - name = "step", - retries = - mapOf( - // TODO - Pull from FunctionOptions - "attempts" to 3, - ), - runtime = - hashMapOf( - "type" to scheme, - // TODO - Create correct URL - "url" to - "$serveUrl?fnId=${config.id}&stepId=step", - ), - ), - ), - ) + fun getFunctionConfig( + serveUrl: String, + client: Inngest, + ): InternalFunctionConfig { + // TODO use URL objects for serveUrl instead of strings so we can fetch things like scheme + return configBuilder.build(client.appId, serveUrl) } } diff --git a/inngest/src/main/kotlin/com/inngest/HttpClient.kt b/inngest/src/main/kotlin/com/inngest/HttpClient.kt index bb28fd3f..37f93e77 100644 --- a/inngest/src/main/kotlin/com/inngest/HttpClient.kt +++ b/inngest/src/main/kotlin/com/inngest/HttpClient.kt @@ -9,30 +9,42 @@ import okhttp3.Response typealias RequestHeaders = Map -data class RequestConfig(val headers: RequestHeaders? = null) +data class RequestConfig( + val headers: RequestHeaders? = null, +) val jsonMediaType = "application/json".toMediaType() -internal class HttpClient(private val clientConfig: RequestConfig) { +internal class HttpClient( + private val clientConfig: RequestConfig, +) { private val client = OkHttpClient() fun send( request: okhttp3.Request, handler: (Response) -> T, - ) = this.client.newCall(request).execute().use(handler) + ) = this.client + .newCall(request) + .execute() + .use(handler) fun build( url: String, payload: Any, config: RequestConfig? = null, ): okhttp3.Request { - val jsonRequestBody = Klaxon().toJsonString(payload) + val jsonRequestBody = + Klaxon() + .fieldConverter(KlaxonDuration::class, durationConverter) + .fieldConverter(KlaxonConcurrencyScope::class, concurrencyScopeConverter) + .toJsonString(payload) val body = jsonRequestBody.toRequestBody(jsonMediaType) val clientHeaders = clientConfig.headers ?: emptyMap() val requestHeaders = config?.headers ?: emptyMap() - return okhttp3.Request.Builder() + return okhttp3.Request + .Builder() .url(url) .post(body) .headers(toOkHttpHeaders(clientHeaders + requestHeaders)) diff --git a/inngest/src/main/kotlin/com/inngest/Inngest.kt b/inngest/src/main/kotlin/com/inngest/Inngest.kt index 35a4bf59..0659dc45 100644 --- a/inngest/src/main/kotlin/com/inngest/Inngest.kt +++ b/inngest/src/main/kotlin/com/inngest/Inngest.kt @@ -17,7 +17,7 @@ class Inngest val headers: RequestHeaders = Environment.inngestHeaders() val env = Environment.inngestEnv(env = env, isDev = isDev) val eventKey = Environment.inngestEventKey(eventKey) - val baseUrl = Environment.inngestEventApiBaseUrl(env = this.env, url = baseUrl) + private val baseUrl = Environment.inngestEventApiBaseUrl(env = this.env, url = baseUrl) internal val httpClient = HttpClient(RequestConfig(headers)) diff --git a/inngest/src/main/kotlin/com/inngest/InngestEnv.kt b/inngest/src/main/kotlin/com/inngest/InngestEnv.kt index 21e6811e..cb41750a 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestEnv.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestEnv.kt @@ -1,6 +1,8 @@ package com.inngest -enum class InngestSystem(val value: String) { +enum class InngestSystem( + val value: String, +) { // Critical variables EventKey("INNGEST_EVENT_KEY"), SigningKey("INNGEST_SIGNING_KEY"), @@ -10,14 +12,17 @@ enum class InngestSystem(val value: String) { EventApiBaseUrl("INNGEST_BASE_URL"), ApiBaseUrl("INNGEST_API_BASE_URL"), LogLevel("INNGEST_LOG_LEVEL"), - ApiOrigin("INNGEST_API_ORIGIN"), + + // TODO - Rename this env variable to match other SDKS +// ApiOrigin("INNGEST_API_ORIGIN"), ServeOrigin("INNGEST_SERVE_ORIGIN"), ServePath("INNGEST_SERVE_PATH"), - Streaming("INNGEST_STREAMING"), Dev("INNGEST_DEV"), } -enum class InngestEnv(var value: String) { +enum class InngestEnv( + var value: String, +) { Dev("dev"), Prod("prod"), Other("other"), diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt index a13cb93f..bcbc24c6 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt @@ -1,74 +1,39 @@ package com.inngest -import com.beust.klaxon.Json - -@Target(AnnotationTarget.CLASS) -@MustBeDocumented -annotation class FunctionConfig( - val id: String, - val name: String, -) - -@Target(AnnotationTarget.CLASS) -@Repeatable -annotation class FunctionEventTrigger( - @Json(serializeNull = false) val event: String, -) - -@Target(AnnotationTarget.CLASS) -@Repeatable -@MustBeDocumented -annotation class FunctionCronTrigger( - @Json(serializeNull = false) val cron: String, -) - -@Target(AnnotationTarget.CLASS) -@Repeatable -@MustBeDocumented -annotation class FunctionIfTrigger( - @Json(serializeNull = false) val `if`: String, -) - abstract class InngestFunction { + open fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = builder + + /** + * The function handler that will be run whenever the function is executed. + * + * @param ctx The function context including event(s) that triggered the function + * @param step A class with methods to define steps within the function + */ abstract fun execute( ctx: FunctionContext, step: Step, ): Any? - private val config = this::class.annotations.find { it.annotationClass == FunctionConfig::class } + private fun buildConfig(): InngestFunctionConfigBuilder { + val builder = InngestFunctionConfigBuilder() + return this.config(builder) + } fun id(): String { - if (config == null || config !is FunctionConfig) { - throw Exception("InngestFuncConfig annotation is required to setup an InngestFunc") + try { + return buildConfig().id!! + } catch (e: Exception) { + throw InngestInvalidConfigurationException( + "Function id must be configured via builder: ${this.javaClass.name}", + ) } - return config.id } internal fun toInngestFunction(): InternalInngestFunction { - if (config == null || config !is FunctionConfig) { - throw Exception("FunctionConfig annotation is required to setup an InngestFunction") - } - val triggers = buildEventTriggers() + buildCronTriggers() + buildIfTriggers() - val fnConfig = - InternalFunctionOptions( - id = config.id, - name = config.name, - triggers = triggers.toTypedArray(), - ) - - return InternalInngestFunction(fnConfig, this::execute) + val builder = InngestFunctionConfigBuilder() + val configBuilder = this.config(builder) + return InternalInngestFunction(configBuilder, this::execute) } - // TODO: DRY this - private fun buildEventTriggers(): List = - this::class.annotations.filter { it.annotationClass == FunctionEventTrigger::class } - .map { InternalFunctionTrigger(event = (it as FunctionEventTrigger).event) } - - private fun buildCronTriggers(): List = - this::class.annotations.filter { it.annotationClass == FunctionCronTrigger::class } - .map { InternalFunctionTrigger(cron = (it as FunctionCronTrigger).cron) } - - private fun buildIfTriggers(): List = - this::class.annotations.filter { it.annotationClass == FunctionIfTrigger::class } - .map { InternalFunctionTrigger(event = (it as FunctionIfTrigger).`if`) } + // TODO - Add toFailureHandler method to generate a second function if configured } diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt new file mode 100644 index 00000000..3832570b --- /dev/null +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt @@ -0,0 +1,221 @@ +package com.inngest + +import com.beust.klaxon.Converter +import com.beust.klaxon.Json +import com.beust.klaxon.JsonValue +import com.beust.klaxon.KlaxonException +import java.time.Duration + +// TODO: Throw illegal argument exception +class InngestFunctionConfigBuilder { + var id: String? = null + private var name: String? = null + private var triggers: MutableList = mutableListOf() + private var concurrency: MutableList? = null + private var batchEvents: BatchEvents? = null + + /** + * @param id A unique identifier for the function that should not change over time + */ + fun id(id: String): InngestFunctionConfigBuilder { + this.id = id + return this + } + + /** + * @param name A formatted name for the function, visible in UIs and logs + */ + fun name(name: String): InngestFunctionConfigBuilder { + this.name = name + return this + } + + /** + * Define a function trigger using a given InngestFunctionTrigger nested class constructor + * + * @param trigger An event or cron function trigger + */ + fun trigger(trigger: InngestFunctionTrigger): InngestFunctionConfigBuilder { + // TODO - Check max triggers + // TODO - Check mutually exclusive opts (cron v. event+if?) + this.triggers.add(trigger) + return this + } + + /** + * Define a function trigger for any matching events with a given name. + * + * @param event The name of the event to trigger on + */ + fun triggerEvent(event: String): InngestFunctionConfigBuilder { + this.triggers.add(InngestFunctionTriggers.Event(event, null)) + return this + } + + /** + * Define a function trigger for any matching events with a given name and filter + * matching events with an expression statement. + * + * @param event The name of the event to trigger on + * @param if A CEL expression to filter matching events to trigger on. + * Example: "event.data.appId == '12345'" + */ + @Suppress("unused") + fun triggerEventIf( + event: String, + `if`: String? = null, + ): InngestFunctionConfigBuilder { + this.triggers.add(InngestFunctionTriggers.Event(event, `if`)) + return this + } + + /** + * @param cron A crontab expression + */ + @Suppress("unused") + fun triggerCron(cron: String): InngestFunctionConfigBuilder { + this.triggers.add(InngestFunctionTriggers.Cron(cron)) + return this + } + + /** + * Configure the function to be executed with batches of events (1 to n). + * Events will be added into a batch until the maxSize has been reached or + * until the timeout has expired. Any events in this batch will be passed + * to the executing function. + * + * @param maxSize The maximum number of events to execute the function with + * @param timeout The maximum duration of time to wait before executing the function + * @param key A CEL expression to group events batches by. Example: "event.data.destinationId" + */ + fun batchEvents( + maxSize: Int, + timeout: Duration, + key: String? = null, + ): InngestFunctionConfigBuilder { + this.batchEvents = BatchEvents(maxSize, timeout, key) + return this + } + + /** + * Configure step concurrency limit + * + * @param limit Maximum number of concurrent executing steps across function type + * @param key A CEL expression to apply limit using event payload properties. Example: "event.data.destinationId" + * @param scope The scope to apply the limit to. Options + */ + fun concurrency( + limit: Int, + key: String? = null, + scope: ConcurrencyScope? = null, + ): InngestFunctionConfigBuilder { + val c = Concurrency(limit, key, scope) + // TODO - Limit concurrency length to 2 + if (this.concurrency == null) { + this.concurrency = mutableListOf(c) + } else { + this.concurrency?.add(c) + } + return this + } + + private fun buildSteps(serveUrl: String): Map { + val scheme = serveUrl.split("://")[0] + return mapOf( + "step" to + StepConfig( + id = "step", + name = "step", + retries = + mapOf( + // TODO - Pull from conf option + "attempts" to 3, + ), + runtime = + hashMapOf( + "type" to scheme, + "url" to "$serveUrl?fnId=$id&stepId=step", + ), + ), + ) + } + + internal fun build( + appId: String, + serverUrl: String, + ): InternalFunctionConfig { + if (id == null) { + throw InngestInvalidConfigurationException("Function id must be configured via builder") + } + val globalId = String.format("%s-%s", appId, id) + val config = + InternalFunctionConfig( + globalId, + name, + triggers, + concurrency, + batchEvents, + steps = buildSteps(serverUrl), + ) + return config + } +} + +class InngestInvalidConfigurationException( + message: String, +) : Exception(message) + +@Target(AnnotationTarget.FIELD) +annotation class KlaxonDuration + +val durationConverter = + object : Converter { + override fun canConvert(cls: Class<*>): Boolean = cls == Duration::class.java + + // TODO Implement this - parse 30s into duration of seconds + override fun fromJson(jv: JsonValue): Duration = + throw KlaxonException("Duration parse not implemented: ${jv.string}") + + override fun toJson(value: Any): String = """"${(value as Duration).seconds}s"""" + } + +@Target(AnnotationTarget.FIELD) +annotation class KlaxonConcurrencyScope + +val concurrencyScopeConverter = + object : Converter { + override fun canConvert(cls: Class<*>): Boolean = cls.isEnum + + override fun fromJson(jv: JsonValue): ConcurrencyScope = enumValueOf(jv.string!!) + + override fun toJson(value: Any): String = """"${(value as ConcurrencyScope).value}"""" + } + +// TODO - Convert enum element to value, not name +enum class ConcurrencyScope( + val value: String, +) { + ACCOUNT("account"), + ENVIRONMENT("env"), + FUNCTION("fn"), +} + +internal data class Concurrency + @JvmOverloads + constructor( + val limit: Int, + @Json(serializeNull = false) + val key: String? = null, + @Json(serializeNull = false) + @KlaxonConcurrencyScope + val scope: ConcurrencyScope? = null, + ) + +internal data class BatchEvents + @JvmOverloads + constructor( + val maxSize: Int, + @KlaxonDuration + val timeout: Duration, + @Json(serializeNull = false) val key: String? = null, + ) diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt new file mode 100644 index 00000000..665c5f19 --- /dev/null +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt @@ -0,0 +1,42 @@ +package com.inngest + +import com.beust.klaxon.Json + +/** + * A generic class for defining and serializing function triggers + */ +abstract class InngestFunctionTrigger // or interface or data class + @JvmOverloads + constructor( + @Json(serializeNull = false) val event: String? = null, + @Json(serializeNull = false) val `if`: String? = null, + @Json(serializeNull = false) val cron: String? = null, + // IDEA - Add timeout and re-use for cancelOn? + ) + +/** + * A class that contains nested classes to define function triggers + */ +class InngestFunctionTriggers { + /** + * Define a function trigger for any matching events with a given name. + * Optionally filter matching events with an expression statement. + * + * @param event The name of the event to trigger on + * @param if A CEL expression to filter matching events to trigger on (optional). + * Example: "event.data.appId == '12345'" + */ + class Event( + event: String, + `if`: String? = null, + ) : InngestFunctionTrigger(event, `if`, null) + + /** + * Define a function trigger that will execute on a given crontab schedule. + * + * @param cron A crontab expression. Example: "0 9 * * 1" + */ + class Cron( + cron: String, + ) : InngestFunctionTrigger(null, null, cron) +} diff --git a/inngest/src/main/kotlin/com/inngest/InngestHeaderKey.kt b/inngest/src/main/kotlin/com/inngest/InngestHeaderKey.kt index b4c2ed57..e9277ed6 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestHeaderKey.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestHeaderKey.kt @@ -1,6 +1,9 @@ package com.inngest -enum class InngestHeaderKey(val value: String) { +@Suppress("unused") +enum class InngestHeaderKey( + val value: String, +) { ContentType("content-type"), UserAgent("user-agent"), Sdk("x-inngest-sdk"), diff --git a/inngest/src/main/kotlin/com/inngest/NonRetriableError.kt b/inngest/src/main/kotlin/com/inngest/NonRetriableError.kt index 645a91f5..a59572fe 100644 --- a/inngest/src/main/kotlin/com/inngest/NonRetriableError.kt +++ b/inngest/src/main/kotlin/com/inngest/NonRetriableError.kt @@ -2,4 +2,7 @@ package com.inngest open class NonRetriableError @JvmOverloads - constructor(message: String, cause: Throwable? = null) : RuntimeException(message, cause) + constructor( + message: String, + cause: Throwable? = null, + ) : RuntimeException(message, cause) diff --git a/inngest/src/main/kotlin/com/inngest/RetryAfterError.kt b/inngest/src/main/kotlin/com/inngest/RetryAfterError.kt index 818d2b14..db7c04d5 100644 --- a/inngest/src/main/kotlin/com/inngest/RetryAfterError.kt +++ b/inngest/src/main/kotlin/com/inngest/RetryAfterError.kt @@ -5,8 +5,11 @@ import java.time.format.DateTimeFormatter open class RetryAfterError @JvmOverloads - constructor(message: String, retryAfter: Any, cause: Throwable? = null) : - RuntimeException(message, cause) { + constructor( + message: String, + retryAfter: Any, + cause: Throwable? = null, + ) : RuntimeException(message, cause) { var retryAfter: String = when (retryAfter) { is ZonedDateTime -> retryAfter.format(DateTimeFormatter.ISO_INSTANT) diff --git a/inngest/src/main/kotlin/com/inngest/RetryDecision.kt b/inngest/src/main/kotlin/com/inngest/RetryDecision.kt index 4c73a2b7..4a91323f 100644 --- a/inngest/src/main/kotlin/com/inngest/RetryDecision.kt +++ b/inngest/src/main/kotlin/com/inngest/RetryDecision.kt @@ -1,6 +1,9 @@ package com.inngest -internal data class RetryDecision(val shouldRetry: Boolean, val headers: Map) { +internal data class RetryDecision( + val shouldRetry: Boolean, + val headers: Map, +) { companion object { internal fun fromException(exception: Exception): RetryDecision = when (exception) { diff --git a/inngest/src/main/kotlin/com/inngest/ServeConfig.kt b/inngest/src/main/kotlin/com/inngest/ServeConfig.kt index 8bb9e63f..a9a0352e 100644 --- a/inngest/src/main/kotlin/com/inngest/ServeConfig.kt +++ b/inngest/src/main/kotlin/com/inngest/ServeConfig.kt @@ -23,10 +23,9 @@ class ServeConfig return when (client.env) { InngestEnv.Dev -> "test" else -> { - val signingKey = System.getenv(InngestSystem.SigningKey.value) - if (signingKey == null) { - throw Exception("signing key is required") - } + val signingKey = + System.getenv(InngestSystem.SigningKey.value) + ?: throw Exception("signing key is required") signingKey } } diff --git a/inngest/src/main/kotlin/com/inngest/State.kt b/inngest/src/main/kotlin/com/inngest/State.kt index e0366a14..616bb375 100644 --- a/inngest/src/main/kotlin/com/inngest/State.kt +++ b/inngest/src/main/kotlin/com/inngest/State.kt @@ -4,9 +4,11 @@ import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.ObjectMapper import java.security.MessageDigest -class StateNotFound() : Throwable("State not found for id") +class StateNotFound : Throwable("State not found for id") -class State(val payloadJson: String) { +class State( + private val payloadJson: String, +) { fun getHashFromId(id: String): String { val bytes = id.toByteArray(Charsets.UTF_8) val digest = MessageDigest.getInstance("SHA-1") diff --git a/inngest/src/main/kotlin/com/inngest/Step.kt b/inngest/src/main/kotlin/com/inngest/Step.kt index 79d07b2e..be091708 100644 --- a/inngest/src/main/kotlin/com/inngest/Step.kt +++ b/inngest/src/main/kotlin/com/inngest/Step.kt @@ -5,26 +5,63 @@ import java.time.Duration typealias MemoizedRecord = HashMap typealias MemoizedState = HashMap -data class InngestEvent(val name: String, val data: Any) +data class InngestEvent( + val name: String, + val data: Any, +) -data class SendEventsResponse(val ids: Array) +data class SendEventsResponse( + val ids: Array, +) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false -class StepInvalidStateTypeException(val id: String, val hashedId: String) : Throwable("Step execution interrupted") + other as SendEventsResponse -class StepStateTypeMismatchException(val id: String, val hashedId: String) : Throwable("Step execution interrupted") + return ids.contentEquals(other.ids) + } -open class StepInterruptException(val id: String, val hashedId: String, open val data: kotlin.Any?) : - Throwable("Interrupt $id") + override fun hashCode(): Int = ids.contentHashCode() +} -class StepInterruptSleepException(id: String, hashedId: String, override val data: String) : - StepInterruptException(id, hashedId, data) +class StepInvalidStateTypeException( + val id: String, + val hashedId: String, +) : Throwable("Step execution interrupted") -class StepInterruptSendEventException(id: String, hashedId: String, val eventIds: Array) : - StepInterruptException(id, hashedId, eventIds) +// TODO - Add State type mismatch checks +// class StepStateTypeMismatchException( +// val id: String, +// val hashedId: String, +// ) : Throwable("Step execution interrupted") +open class StepInterruptException( + val id: String, + val hashedId: String, + open val data: Any?, +) : Throwable("Interrupt $id") -class StepInterruptInvokeException(id: String, hashedId: String, val appId: String, val fnId: String, data: kotlin.Any?, val timeout: String?) : - StepInterruptException(id, hashedId, data) +class StepInterruptSleepException( + id: String, + hashedId: String, + override val data: String, +) : StepInterruptException(id, hashedId, data) + +class StepInterruptSendEventException( + id: String, + hashedId: String, + val eventIds: Array, +) : StepInterruptException(id, hashedId, eventIds) + +class StepInterruptInvokeException( + id: String, + hashedId: String, + val appId: String, + val fnId: String, + data: Any?, + val timeout: String?, +) : StepInterruptException(id, hashedId, data) class StepInterruptWaitForEventException( id: String, @@ -32,13 +69,12 @@ class StepInterruptWaitForEventException( val waitEvent: String, val timeout: String, val ifExpression: String?, -) : - StepInterruptException(id, hashedId, null) - -// TODO: Add name, stack, etc. if poss -class StepError(message: String) : Exception(message) +) : StepInterruptException(id, hashedId, null) -class Step(val state: State, val client: Inngest) { +class Step( + private val state: State, + val client: Inngest, +) { /** * Run a function * @@ -77,7 +113,8 @@ class Step(val state: State, val client: Inngest) { * Invoke another Inngest function as a step * * @param id unique step id for memoization - * @param fn ID of the function to invoke + * @param appId ID of the Inngest app which contains the function to invoke (see client) + * @param fnId ID of the function to invoke * @param data the data to pass within `event.data` to the function * @param timeout an optional timeout for the invoked function. If the invoked function does * not finish within this time, the invoked function will be marked as failed. @@ -86,7 +123,7 @@ class Step(val state: State, val client: Inngest) { id: String, appId: String, fnId: String, - data: kotlin.Any?, + data: Any?, timeout: String?, ): T = invoke(id, appId, fnId, data, timeout, T::class.java) @@ -94,7 +131,7 @@ class Step(val state: State, val client: Inngest) { id: String, appId: String, fnId: String, - data: kotlin.Any?, + data: Any?, timeout: String?, type: Class, ): T { @@ -131,7 +168,7 @@ class Step(val state: State, val client: Inngest) { } return } catch (e: StateNotFound) { - val durationInSeconds = duration.getSeconds() + val durationInSeconds = duration.seconds throw StepInterruptSleepException(id, hashedId, "${durationInSeconds}s") } } diff --git a/inngest/src/main/kotlin/com/inngest/SupportedFrameworkName.kt b/inngest/src/main/kotlin/com/inngest/SupportedFrameworkName.kt index f1c3c02b..e8ba8fb1 100644 --- a/inngest/src/main/kotlin/com/inngest/SupportedFrameworkName.kt +++ b/inngest/src/main/kotlin/com/inngest/SupportedFrameworkName.kt @@ -1,6 +1,8 @@ package com.inngest -enum class SupportedFrameworkName(val value: String) { +enum class SupportedFrameworkName( + val value: String, +) { SpringBoot("springboot"), Ktor("ktor"), } diff --git a/inngest/src/main/kotlin/com/inngest/Version.kt b/inngest/src/main/kotlin/com/inngest/Version.kt index 5e791b6e..8bdc7c3d 100644 --- a/inngest/src/main/kotlin/com/inngest/Version.kt +++ b/inngest/src/main/kotlin/com/inngest/Version.kt @@ -1,6 +1,6 @@ package com.inngest -class Version() { +class Version { companion object { private val version: String? = Version::class.java.getPackage().implementationVersion diff --git a/inngest/src/main/kotlin/com/inngest/ktor/Route.kt b/inngest/src/main/kotlin/com/inngest/ktor/Route.kt index 23406c34..d4a4084e 100644 --- a/inngest/src/main/kotlin/com/inngest/ktor/Route.kt +++ b/inngest/src/main/kotlin/com/inngest/ktor/Route.kt @@ -43,10 +43,7 @@ fun Route.serve( route(path) { get("") { - var origin = String.format("%s://%s", call.request.origin.scheme, call.request.origin.serverHost) - if (call.request.origin.serverPort != 80 || call.request.origin.serverPort != 443) { - origin = String.format("%s:%s", origin, call.request.origin.serverPort) - } + val origin = getOrigin(call) val resp = comm.introspect(origin) call.respond(HttpStatusCode.OK, resp) } @@ -66,7 +63,7 @@ fun Route.serve( call.response.status( HttpStatusCode(response.statusCode.code, response.statusCode.message), ) - println("response: " + response.body) +// println("response: " + response.body) call.respond(response.body) } catch (e: Exception) { call.respond(HttpStatusCode.InternalServerError, e.toString()) @@ -77,12 +74,17 @@ fun Route.serve( } put("") { - var origin = String.format("%s://%s", call.request.origin.scheme, call.request.origin.serverHost) - if (call.request.origin.serverPort != 80 || call.request.origin.serverPort != 443) { - origin = String.format("%s:%s", origin, call.request.origin.serverPort) - } + val origin = getOrigin(call) val resp = comm.register(origin) call.respond(HttpStatusCode.OK, resp) } } } + +fun getOrigin(call: ApplicationCall): String { + var origin = String.format("%s://%s", call.request.origin.scheme, call.request.origin.serverHost) + if (call.request.origin.serverPort != 80 || call.request.origin.serverPort != 443) { + origin = String.format("%s:%s", origin, call.request.origin.serverPort) + } + return origin +} diff --git a/inngest/src/main/kotlin/com/inngest/signingkey/BearerToken.kt b/inngest/src/main/kotlin/com/inngest/signingkey/BearerToken.kt index 1cc0a6cb..b0d162e5 100644 --- a/inngest/src/main/kotlin/com/inngest/signingkey/BearerToken.kt +++ b/inngest/src/main/kotlin/com/inngest/signingkey/BearerToken.kt @@ -4,7 +4,7 @@ import com.inngest.RequestHeaders import java.lang.NumberFormatException import java.security.MessageDigest -val SIGNING_KEY_REGEX = Regex("""(?^signkey-[\w]+-)(?.*)""") +val SIGNING_KEY_REGEX = Regex("""(?^signkey-\w+-)(?.*)""") /** * Takes a signing key in the form "signkey--" and returns "signkey--" @@ -18,7 +18,7 @@ val SIGNING_KEY_REGEX = Regex("""(?^signkey-[\w]+-)(?.*)""") private fun hashedSigningKey(signingKey: String): String { val matchResult = SIGNING_KEY_REGEX.matchEntire(signingKey) ?: throw InvalidSigningKeyException() - // We aggressively assert non null here because if `matchEntire` had failed (and thus these capture groups didn't + // We aggressively assert non-null here because if `matchEntire` had failed (and thus these capture groups didn't // exist), we would have already thrown an exception val prefix = matchResult.groups["prefix"]!!.value val key = matchResult.groups["key"]!!.value diff --git a/inngest/src/main/kotlin/com/inngest/signingkey/SignatureVerification.kt b/inngest/src/main/kotlin/com/inngest/signingkey/SignatureVerification.kt index 60299d01..1a67fc00 100644 --- a/inngest/src/main/kotlin/com/inngest/signingkey/SignatureVerification.kt +++ b/inngest/src/main/kotlin/com/inngest/signingkey/SignatureVerification.kt @@ -12,12 +12,11 @@ const val HMAC_SHA256 = "HmacSHA256" // Implementation of this is inspired by the pure Java example from https://www.baeldung.com/java-hmac#hmac-using-jdk-apis @OptIn(ExperimentalStdlibApi::class) private fun computeHMAC( - algorithm: String, data: String, key: String, ): String { - val secretKeySpec = SecretKeySpec(key.toByteArray(Charsets.UTF_8), algorithm) - val mac = Mac.getInstance(algorithm) + val secretKeySpec = SecretKeySpec(key.toByteArray(Charsets.UTF_8), HMAC_SHA256) + val mac = Mac.getInstance(HMAC_SHA256) mac.init(secretKeySpec) return mac.doFinal(data.toByteArray(Charsets.UTF_8)).toHexString() } @@ -26,9 +25,7 @@ internal fun signRequest( requestBody: String, timestamp: Long, signingKey: String, -): String { - return signRequest(requestBody, timestamp.toString(), signingKey) -} +): String = signRequest(requestBody, timestamp.toString(), signingKey) private fun signRequest( requestBody: String, @@ -39,10 +36,12 @@ private fun signRequest( val key = matchResult.groups["key"]!!.value val message = requestBody + timestamp - return computeHMAC(HMAC_SHA256, message, key) + return computeHMAC(message, key) } -class InvalidSignatureHeaderException(message: String) : Throwable(message) +class InvalidSignatureHeaderException( + message: String, +) : Throwable(message) class ExpiredSignatureHeaderException : Throwable("signature header has expired") @@ -55,8 +54,12 @@ internal fun validateSignature( ) { // TODO: Find a way to parse signatureHeader as URL params without constructing a full URL val dummyUrl = "https://test.inngest.com/?$signatureHeader" - val url = dummyUrl.toHttpUrlOrNull() ?: throw InvalidSignatureHeaderException("signature header does not match expected format") - val timestamp = url.queryParameter("t")?.toLongOrNull() ?: throw InvalidSignatureHeaderException("timestamp is invalid") + val url = + dummyUrl.toHttpUrlOrNull() + ?: throw InvalidSignatureHeaderException("signature header does not match expected format") + val timestamp = + url.queryParameter("t")?.toLongOrNull() + ?: throw InvalidSignatureHeaderException("timestamp is invalid") val signature = url.queryParameter("s") ?: throw InvalidSignatureHeaderException("signature is invalid") val fiveMinutesAgo = Instant.now().minusSeconds(FIVE_MINUTES_IN_SECONDS).epochSecond @@ -98,7 +101,8 @@ fun checkHeadersAndValidateSignature( val signingKey = config.signingKey() - signatureHeader ?: throw InvalidSignatureHeaderException("Using cloud inngest but did not receive X-Inngest-Signature") + signatureHeader + ?: throw InvalidSignatureHeaderException("Using cloud inngest but did not receive X-Inngest-Signature") validateSignature(signatureHeader, signingKey, requestBody) } diff --git a/inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt b/inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt new file mode 100644 index 00000000..229734e3 --- /dev/null +++ b/inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt @@ -0,0 +1,24 @@ +package com.inngest + +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith + +class InngestFunctionConfigBuilderTest { + @Test + fun testGlobalId() { + val config = + InngestFunctionConfigBuilder() + .id("test-id") + .build("app-id", "https://mysite.com/api/inngest") + assertEquals("app-id-test-id", config.id) + } + + @Test + fun testMissingId() { + assertFailsWith { + InngestFunctionConfigBuilder() + .build("app-id", "https://mysite.com/api/inngest") + } + } +}