Skip to content

Commit

Permalink
Implement onFailure handler. (#63)
Browse files Browse the repository at this point in the history
* Implement an interface for functions that need to handle failures.

I took a more OOP approach for this feature which is slightly different
than the other SDKs but we can discuss this further in the PR review.

The `WithFailureHandler` interface has a single method `onFailure` that
is called when the function fails. An example of how to use this:

```kotlin
class MyFunction : InngestFunction(), WithFailureHandler {
    override fun execute(ctx: FunctionContext, step: Step): Any? {
        // Do something
    }

    override fun onFailure(ctx: FunctionContext, step: Step): Any? {
        // Handle failure
    }
}
```

* Add a function with an onFailure handler and a test for it

* Add a Kotlin function example that has a failure handler

* Make name in InngestFunctionConfigBuilder internal

* Run formatting

* Switch to an overridable `onFailure` method in `InngestFunction`

This removes the `WithFailureHandler` interface and instead provides an
overridable `onFailure` method in `InngestFunction` that can be used to
define a function to be called when the function fails.

It's used in the same way but without the need to implement an interface

```kotlin

class MyFunction : InngestFunction() {
    override fun execute(ctx: FunctionContext, step: Step): Any? {
        // Do something
    }

    override fun onFailure(ctx: FunctionContext, step: Step): Any? {
        // Handle failure
    }
}

```

* Address PR comments related to code clarity
  • Loading branch information
KiKoS0 authored Sep 11, 2024
1 parent 0d4e9c0 commit 31add6b
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ EventRunsResponse<Object> runsByEvent(String eventId) throws Exception {
});
}

EventsResponse listEvents() throws Exception {
Request request = new Request.Builder()
.url(String.format("%s/v1/events", baseUrl))
.build();
return makeRequest(request, new TypeReference<EventsResponse>() {
});
}

<T> RunResponse<T> runById(String eventId, Class<T> outputType) throws Exception {
Request request = new Request.Builder()
.url(String.format("%s/v1/runs/%S", baseUrl, eventId))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.inngest.springbootdemo;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
class EventsResponse {
EventEntry[] data;
}

@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
class EventEntry {
String id;
String name;

String internal_id;

EventEntryData data;
}

@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
class EventEntryData{
EventData event;
}

@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
class EventData {
String name;
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.inngest.springbootdemo.testfunctions;

import com.inngest.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public class WithOnFailureFunction extends InngestFunction {
@NotNull
@Override
public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) {
return builder
.id("with-on-failure-function")
.name("With On Failure Function")
.triggerEvent("test/with-on-failure");
}

@Override
public String execute(FunctionContext ctx, Step step) {
step.run("fail-step", () -> {
throw new NonRetriableError("something fatally went wrong");
}, String.class);

return "Success";
}

@Nullable
@Override
public String onFailure(@NotNull FunctionContext ctx, @NotNull Step step) {
return "On Failure Success";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ protected HashMap<String, InngestFunction> functions() {
addInngestFunction(functions, new IdempotentFunction());
addInngestFunction(functions, new Scale2DObjectFunction());
addInngestFunction(functions, new MultiplyMatrixFunction());
addInngestFunction(functions, new WithOnFailureFunction());

return functions;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.inngest.springbootdemo;

import com.inngest.CommHandler;
import com.inngest.Inngest;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

@IntegrationTest
@Execution(ExecutionMode.CONCURRENT)
class WithOnFailureIntegrationTest {
@Autowired
private DevServerComponent devServer;

static int sleepTime = 5000;

@Autowired
private Inngest client;

@Test
void testWithOnFailureShouldCallOnFailure() throws Exception {
String eventName = "test/with-on-failure";
String eventId = InngestFunctionTestHelpers.sendEvent(client, eventName).getIds()[0];

Thread.sleep(sleepTime);

// Check that the original function failed
RunEntry<Object> run = devServer.runsByEvent(eventId).first();
LinkedHashMap<String, String> output = (LinkedHashMap<String, String>) run.getOutput();

assertEquals("Failed", run.getStatus());
assertNotNull(run.getEnded_at());
assert output.get("name").contains("NonRetriableError");

// Check that the onFailure function was called
Optional<EventEntry> event = Arrays.stream(devServer.listEvents().getData())
.filter(e -> "inngest/function.failed".equals(e.getName()) && eventName.equals(e.getData().getEvent().getName()))
.findFirst();

assert event.isPresent();

RunEntry<Object> onFailureRun = devServer.runsByEvent(event.get().getInternal_id()).first();

assertEquals("Completed", onFailureRun.getStatus());
assertEquals("On Failure Success", (String) onFailureRun.getOutput());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ fun Application.module() {
TranscodeVideo(),
ImageFromPrompt(),
PushToSlackChannel(),
SlackFailureReport(),
WeeklyCleanup(),
),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.inngest.testserver

import com.inngest.*

class SlackFailureReport : InngestFunction() {
override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder =
builder
.id("always-fail-fn")
.name("Always Fail Function")
.triggerEvent("always-fail-fn")

override fun execute(
ctx: FunctionContext,
step: Step,
): String {
step.run("throw exception") {
throw RuntimeException("This function always fails")
"Step result"
}

return "Success"
}

override fun onFailure(
ctx: FunctionContext,
step: Step,
): String {
step.run("send slack message") {
"Sending a message to Slack"
}

return "onFailure Success"
}
}
13 changes: 12 additions & 1 deletion inngest/src/main/kotlin/com/inngest/Comm.kt
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,25 @@ data class CommError(

private val stepTerminalStatusCodes = setOf(ResultStatusCode.StepComplete, ResultStatusCode.StepError)

private fun generateFailureFunctions(
functions: Map<String, InngestFunction>,
client: Inngest,
): Map<String, InternalInngestFunction> =
functions
.mapNotNull { (_, fn) ->
fn.toFailureHandler(client.appId)?.let { it.id()!! to it }
}.toMap()

class CommHandler(
functions: Map<String, InngestFunction>,
val client: Inngest,
val config: ServeConfig,
private val framework: SupportedFrameworkName,
) {
val headers = Environment.inngestHeaders(framework).plus(client.headers)
private val functions = functions.mapValues { (_, fn) -> fn.toInngestFunction() }

private val failureFunctions = generateFailureFunctions(functions, client)
private val functions = functions.mapValues { (_, fn) -> fn.toInngestFunction() }.plus(failureFunctions)

fun callFunction(
functionId: String,
Expand Down
42 changes: 38 additions & 4 deletions inngest/src/main/kotlin/com/inngest/InngestFunction.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.inngest

const val FUNCTION_FAILED = "inngest/function.failed"

abstract class InngestFunction {
open fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = builder

Expand All @@ -19,15 +21,47 @@ abstract class InngestFunction {
return this.config(builder)
}

fun id(): String {
return buildConfig().id!!
}
fun id(): String = buildConfig().id!!

internal fun toInngestFunction(): InternalInngestFunction {
val builder = InngestFunctionConfigBuilder()
val configBuilder = this.config(builder)
return InternalInngestFunction(configBuilder, this::execute)
}

// TODO - Add toFailureHandler method to generate a second function if configured
internal fun toFailureHandler(appId: String): InternalInngestFunction? {
val onFailureMethod = this.javaClass.getMethod("onFailure", FunctionContext::class.java, Step::class.java)

// Only generate the failure handler if the onFailure method was overridden
if (onFailureMethod.declaringClass != InngestFunction::class.java) {
val fnConfig = buildConfig()
val fullyQualifiedId = "$appId-${fnConfig.id}"
val fnName = fnConfig.name ?: fnConfig.id

val configBuilder =
InngestFunctionConfigBuilder()
.id("${fnConfig.id}-failure")
.name("$fnName (failure)")
.triggerEventIf(FUNCTION_FAILED, "event.data.function_id == '$fullyQualifiedId'")

return InternalInngestFunction(configBuilder, this::onFailure)
}
return null
}

/**
* Provide a function to be called if your function fails, meaning
* that it ran out of retries and was unable to complete successfully.
*
* This is useful for sending warning notifications or cleaning up
* after a failure and supports all the same functionality as a
* regular handler.
*
* @param ctx The function context including event(s) that triggered the function
* @param step A class with methods to define steps within the function
*/
open fun onFailure(
ctx: FunctionContext,
step: Step,
): Any? = null
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.time.Duration
// TODO: Throw illegal argument exception
class InngestFunctionConfigBuilder {
var id: String? = null
private var name: String? = null
internal var name: String? = null
private var triggers: MutableList<InngestFunctionTrigger> = mutableListOf()
private var concurrency: MutableList<Concurrency>? = null
private var retries = 3
Expand Down Expand Up @@ -115,8 +115,14 @@ class InngestFunctionConfigBuilder {
scope: ConcurrencyScope? = null,
): InngestFunctionConfigBuilder {
when (scope) {
ConcurrencyScope.ENVIRONMENT -> if (key == null) throw InngestInvalidConfigurationException("Concurrency key required with environment scope")
ConcurrencyScope.ACCOUNT -> if (key == null) throw InngestInvalidConfigurationException("Concurrency key required with account scope")
ConcurrencyScope.ENVIRONMENT ->
if (key == null) {
throw InngestInvalidConfigurationException("Concurrency key required with environment scope")
}
ConcurrencyScope.ACCOUNT ->
if (key == null) {
throw InngestInvalidConfigurationException("Concurrency key required with account scope")
}
ConcurrencyScope.FUNCTION -> {}
null -> {}
}
Expand Down

0 comments on commit 31add6b

Please sign in to comment.