Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INN-2306: Implement step.waitForEvent #26

Merged
merged 1 commit into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions inngest-core/src/main/kotlin/com/inngest/Function.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ enum class OpCode {
Sleep,
StepStateFailed, // TODO
Step,
WaitForEvent,

// FUTURE:
WaitForEvent,
StepNotFound,
}

Expand Down Expand Up @@ -54,7 +54,7 @@ data class StepOptions(
override val name: String,
override val op: OpCode,
override val statusCode: ResultStatusCode,
val opts: HashMap<String, String>? = null,
val opts: Map<String, String>?,
) : StepOp(id, name, op, statusCode)

data class StepConfig(
Expand Down Expand Up @@ -128,6 +128,21 @@ open class InngestFunction(
statusCode = ResultStatusCode.StepComplete,
data = SendEventPayload(e.eventIds),
)
} catch (e: StepInterruptWaitForEventException) {
return StepOptions(
id = e.hashedId,
name = e.id,
op = OpCode.WaitForEvent,
statusCode = ResultStatusCode.StepComplete,
opts =
buildMap {
put("event", e.waitEvent)
put("timeout", e.timeout)
if (e.ifExpression != null) {
put("if", e.ifExpression)
}
},
)
} catch (e: StepInterruptSleepException) {
return StepOptions(
opts = hashMapOf("duration" to e.data),
Expand Down
42 changes: 42 additions & 0 deletions inngest-core/src/main/kotlin/com/inngest/Step.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ class StepInterruptSleepException(id: String, hashedId: String, override val dat
class StepInterruptSendEventException(id: String, hashedId: String, val eventIds: Array<String>) :
StepInterruptException(id, hashedId, eventIds)

class StepInterruptWaitForEventException(
id: String,
hashedId: String,
val waitEvent: String,
val timeout: String,
val ifExpression: String?,
) :
StepInterruptException(id, hashedId, null)

// TODO: Add name, stack, etc. if poss
class StepError(message: String) : Exception(message)

Expand Down Expand Up @@ -125,4 +134,37 @@ class Step(val state: State) {
throw StepInterruptSendEventException(id, hashedId, response!!.ids)
}
}

/**
* Waits for an event with the name provided in `waitEvent`, optionally check for a condition
* specified in `ifExpression`
*
* @param id Unique step id for memoization.
* @param waitEvent The name of the event we want the function to wait for
* @param timeout The amount of time to wait to receive an event. A time string compatible with https://www.npmjs.com/package/ms
* @param ifExpression An expression on which to conditionally match the original event trigger (`event`) and the wait event (`async`).
* Expressions are defined using the Common Expression Language (CEL) with the events accessible using dot-notation.
*
*/
fun waitForEvent(
id: String,
waitEvent: String,
timeout: String,
ifExpression: String?,
// TODO use better types for timeout and ifExpression that serialize to the relevant strings we send to the inngest server, instead of using raw strings
// TODO support `match` which is a convenience for checking the same expression in `event` and `async`. Also make it a mutually exclusive argument with
// ifExpression, possibly with a sealed class?
): Any? {
val hashedId = state.getHashFromId(id)

try {
val stepResult = state.getState<Any?>(hashedId)
if (stepResult != null) {
return stepResult
}
return null // TODO should this throw an exception? also look into `EventPayload` https://github.com/inngest/inngest-kt/pull/26#discussion_r150176713

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [ktlint] standard:max-line-length reported by reviewdog 🐶
Exceeded max line length (120)

} catch (e: StateNotFound) {
throw StepInterruptWaitForEventException(id, hashedId, waitEvent, timeout, ifExpression)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ public static synchronized CommHandler getInstance() {
System.out.println("-> running step 1!! " + x);
return new Result(y + 10);
}, Result.class);

System.out.println("res" + res);

step.waitForEvent("wait-for-hello", "hello", "10m", "event.data.hello == async.data.hello");

int add = step.run("add-one-hundred", () -> {
System.out.println("-> running step 2 :) " + (res != null ? res.sum : ""));
return (res != null ? res.sum : 0) + 100;
Expand Down
4 changes: 3 additions & 1 deletion inngest-test-server/src/main/kotlin/com/inngest/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ val fn =
sum = y + 10,
)
}

println("res" + res)

step.waitForEvent("wait-for-hello", "hello", "10m", "event.data.hello == async.data.hello")

val add: Int =
step.run<Int>("add-one-hundred") {
println("-> running step 2 :) " + res?.sum)
Expand Down
Loading