From cfcd43b8fdeeb7a50620ae712f199794fe94ce73 Mon Sep 17 00:00:00 2001 From: Albert Chae Date: Sat, 24 Feb 2024 23:48:16 -0800 Subject: [PATCH] INN-2306: Implement step.waitForEvent https://github.com/inngest/inngest/blob/main/docs/SDK_SPEC.md#533-wait-for-event https://www.inngest.com/docs/reference/functions/step-wait-for-event#step-wait-for-event-id-options-promise-null-event-payload Currently only supporting the `if` optional condition, will add `match` later (`match` is still sent over the wire as an `if` anyway) --- .../src/main/kotlin/com/inngest/Function.kt | 19 ++++++++- .../src/main/kotlin/com/inngest/Step.kt | 42 +++++++++++++++++++ .../springbootdemo/InngestSingleton.java | 4 +- .../src/main/kotlin/com/inngest/App.kt | 4 +- 4 files changed, 65 insertions(+), 4 deletions(-) diff --git a/inngest-core/src/main/kotlin/com/inngest/Function.kt b/inngest-core/src/main/kotlin/com/inngest/Function.kt index 789bc292..22a3180c 100644 --- a/inngest-core/src/main/kotlin/com/inngest/Function.kt +++ b/inngest-core/src/main/kotlin/com/inngest/Function.kt @@ -21,9 +21,9 @@ enum class OpCode { Sleep, StepStateFailed, // TODO Step, + WaitForEvent, // FUTURE: - WaitForEvent, StepNotFound, } @@ -54,7 +54,7 @@ data class StepOptions( override val name: String, override val op: OpCode, override val statusCode: ResultStatusCode, - val opts: HashMap? = null, + val opts: Map?, ) : StepOp(id, name, op, statusCode) data class StepConfig( @@ -128,6 +128,21 @@ open class InngestFunction( statusCode = ResultStatusCode.StepComplete, data = SendEventPayload(e.eventIds), ) + } catch (e: StepInterruptWaitForEventException) { + return StepOptions( + id = e.hashedId, + name = e.id, + op = OpCode.WaitForEvent, + statusCode = ResultStatusCode.StepComplete, + opts = + buildMap { + put("event", e.waitEvent) + put("timeout", e.timeout) + if (e.ifExpression != null) { + put("if", e.ifExpression) + } + }, + ) } catch (e: StepInterruptSleepException) { return StepOptions( opts = hashMapOf("duration" to e.data), diff --git a/inngest-core/src/main/kotlin/com/inngest/Step.kt b/inngest-core/src/main/kotlin/com/inngest/Step.kt index ed1f1c64..526eac74 100644 --- a/inngest-core/src/main/kotlin/com/inngest/Step.kt +++ b/inngest-core/src/main/kotlin/com/inngest/Step.kt @@ -22,6 +22,15 @@ class StepInterruptSleepException(id: String, hashedId: String, override val dat class StepInterruptSendEventException(id: String, hashedId: String, val eventIds: Array) : StepInterruptException(id, hashedId, eventIds) +class StepInterruptWaitForEventException( + id: String, + hashedId: String, + val waitEvent: String, + val timeout: String, + val ifExpression: String?, +) : + StepInterruptException(id, hashedId, null) + // TODO: Add name, stack, etc. if poss class StepError(message: String) : Exception(message) @@ -125,4 +134,37 @@ class Step(val state: State) { throw StepInterruptSendEventException(id, hashedId, response!!.ids) } } + + /** + * Waits for an event with the name provided in `waitEvent`, optionally check for a condition + * specified in `ifExpression` + * + * @param id Unique step id for memoization. + * @param waitEvent The name of the event we want the function to wait for + * @param timeout The amount of time to wait to receive an event. A time string compatible with https://www.npmjs.com/package/ms + * @param ifExpression An expression on which to conditionally match the original event trigger (`event`) and the wait event (`async`). + * Expressions are defined using the Common Expression Language (CEL) with the events accessible using dot-notation. + * + */ + fun waitForEvent( + id: String, + waitEvent: String, + timeout: String, + ifExpression: String?, + // TODO use better types for timeout and ifExpression that serialize to the relevant strings we send to the inngest server, instead of using raw strings + // TODO support `match` which is a convenience for checking the same expression in `event` and `async`. Also make it a mutually exclusive argument with + // ifExpression, possibly with a sealed class? + ): Any? { + val hashedId = state.getHashFromId(id) + + try { + val stepResult = state.getState(hashedId) + if (stepResult != null) { + return stepResult + } + return null // TODO should this throw an exception? also look into `EventPayload` https://github.com/inngest/inngest-kt/pull/26#discussion_r150176713 + } catch (e: StateNotFound) { + throw StepInterruptWaitForEventException(id, hashedId, waitEvent, timeout, ifExpression) + } + } } diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/InngestSingleton.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/InngestSingleton.java index 5f19a4d5..9ea216c9 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/InngestSingleton.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/InngestSingleton.java @@ -40,8 +40,10 @@ public static synchronized CommHandler getInstance() { System.out.println("-> running step 1!! " + x); return new Result(y + 10); }, Result.class); - System.out.println("res" + res); + + step.waitForEvent("wait-for-hello", "hello", "10m", "event.data.hello == async.data.hello"); + int add = step.run("add-one-hundred", () -> { System.out.println("-> running step 2 :) " + (res != null ? res.sum : "")); return (res != null ? res.sum : 0) + 100; diff --git a/inngest-test-server/src/main/kotlin/com/inngest/App.kt b/inngest-test-server/src/main/kotlin/com/inngest/App.kt index e52bc839..ce19dc3e 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/App.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/App.kt @@ -84,8 +84,10 @@ val fn = sum = y + 10, ) } - println("res" + res) + + step.waitForEvent("wait-for-hello", "hello", "10m", "event.data.hello == async.data.hello") + val add: Int = step.run("add-one-hundred") { println("-> running step 2 :) " + res?.sum)