Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement onFailure handler. #63

Merged
merged 7 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ EventRunsResponse<Object> runsByEvent(String eventId) throws Exception {
});
}

EventsResponse listEvents() throws Exception {
Request request = new Request.Builder()
.url(String.format("%s/v1/events", baseUrl))
.build();
return makeRequest(request, new TypeReference<EventsResponse>() {
});
}

<T> RunResponse<T> runById(String eventId, Class<T> outputType) throws Exception {
Request request = new Request.Builder()
.url(String.format("%s/v1/runs/%S", baseUrl, eventId))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.inngest.springbootdemo;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
class EventsResponse {
EventEntry[] data;
}

@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
class EventEntry {
String id;
String name;

String internal_id;

EventEntryData data;
}

@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
class EventEntryData{
EventData event;
}

@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
class EventData {
String name;
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.inngest.springbootdemo.testfunctions;

import com.inngest.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public class WithOnFailureFunction extends InngestFunction {
@NotNull
@Override
public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) {
return builder
.id("with-on-failure-function")
.name("With On Failure Function")
.triggerEvent("test/with-on-failure");
}

@Override
public String execute(FunctionContext ctx, Step step) {
step.run("fail-step", () -> {
throw new NonRetriableError("something fatally went wrong");
}, String.class);

return "Success";
}

@Nullable
@Override
public String onFailure(@NotNull FunctionContext ctx, @NotNull Step step) {
return "On Failure Success";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ protected HashMap<String, InngestFunction> functions() {
addInngestFunction(functions, new IdempotentFunction());
addInngestFunction(functions, new Scale2DObjectFunction());
addInngestFunction(functions, new MultiplyMatrixFunction());
addInngestFunction(functions, new WithOnFailureFunction());

return functions;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.inngest.springbootdemo;

import com.inngest.CommHandler;
import com.inngest.Inngest;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

@IntegrationTest
@Execution(ExecutionMode.CONCURRENT)
class WithOnFailureIntegrationTest {
@Autowired
private DevServerComponent devServer;

static int sleepTime = 5000;

@Autowired
private Inngest client;

@Test
void testWithOnFailureShouldCallOnFailure() throws Exception {
String eventName = "test/with-on-failure";
String eventId = InngestFunctionTestHelpers.sendEvent(client, eventName).getIds()[0];

Thread.sleep(sleepTime);

// Check that the original function failed
RunEntry<Object> run = devServer.runsByEvent(eventId).first();
LinkedHashMap<String, String> output = (LinkedHashMap<String, String>) run.getOutput();

assertEquals("Failed", run.getStatus());
assertNotNull(run.getEnded_at());
assert output.get("name").contains("NonRetriableError");

// Check that the onFailure function was called
Optional<EventEntry> event = Arrays.stream(devServer.listEvents().getData())
.filter(e -> "inngest/function.failed".equals(e.getName()) && eventName.equals(e.getData().getEvent().getName()))
.findFirst();

assert event.isPresent();

RunEntry<Object> onFailureRun = devServer.runsByEvent(event.get().getInternal_id()).first();

assertEquals("Completed", onFailureRun.getStatus());
assertEquals("On Failure Success", (String) onFailureRun.getOutput());
Comment on lines +46 to +55
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice 👍

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ fun Application.module() {
TranscodeVideo(),
ImageFromPrompt(),
PushToSlackChannel(),
SlackFailureReport(),
WeeklyCleanup(),
),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.inngest.testserver

import com.inngest.*

class SlackFailureReport : InngestFunction() {
override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder =
builder
.id("always-fail-fn")
.name("Always Fail Function")
.triggerEvent("always-fail-fn")

override fun execute(
ctx: FunctionContext,
step: Step,
): String {
step.run("throw exception") {
throw RuntimeException("This function always fails")
"Step result"
}

return "Success"
}

override fun onFailure(
ctx: FunctionContext,
step: Step,
): String {
step.run("send slack message") {
"Sending a message to Slack"
}

return "onFailure Success"
}
}
13 changes: 12 additions & 1 deletion inngest/src/main/kotlin/com/inngest/Comm.kt
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,25 @@ data class CommError(

private val stepTerminalStatusCodes = setOf(ResultStatusCode.StepComplete, ResultStatusCode.StepError)

private fun generateFailureFunctions(
functions: Map<String, InngestFunction>,
client: Inngest,
): Map<String, InternalInngestFunction> =
functions
.mapNotNull { (_, fn) ->
fn.toFailureHandler(client.appId)?.let { it.id()!! to it }
}.toMap()

class CommHandler(
functions: Map<String, InngestFunction>,
val client: Inngest,
val config: ServeConfig,
private val framework: SupportedFrameworkName,
) {
val headers = Environment.inngestHeaders(framework).plus(client.headers)
private val functions = functions.mapValues { (_, fn) -> fn.toInngestFunction() }

private val failureFunctions = generateFailureFunctions(functions, client)
private val functions = functions.mapValues { (_, fn) -> fn.toInngestFunction() }.plus(failureFunctions)

fun callFunction(
functionId: String,
Expand Down
42 changes: 38 additions & 4 deletions inngest/src/main/kotlin/com/inngest/InngestFunction.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.inngest

const val FUNCTION_FAILED = "inngest/function.failed"

abstract class InngestFunction {
open fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = builder

Expand All @@ -19,15 +21,47 @@ abstract class InngestFunction {
return this.config(builder)
}

fun id(): String {
return buildConfig().id!!
}
fun id(): String = buildConfig().id!!

internal fun toInngestFunction(): InternalInngestFunction {
val builder = InngestFunctionConfigBuilder()
val configBuilder = this.config(builder)
return InternalInngestFunction(configBuilder, this::execute)
}

// TODO - Add toFailureHandler method to generate a second function if configured
internal fun toFailureHandler(appId: String): InternalInngestFunction? {
val onFailureMethod = this.javaClass.getMethod("onFailure", FunctionContext::class.java, Step::class.java)

// Only generate the failure handler if the onFailure method was overridden
if (onFailureMethod.declaringClass != InngestFunction::class.java) {
val fnConfig = buildConfig()
val fullyQualifiedId = "$appId-${fnConfig.id}"
val fnName = fnConfig.name ?: fnConfig.id

val configBuilder =
InngestFunctionConfigBuilder()
.id("${fnConfig.id}-failure")
.name("$fnName (failure)")
.triggerEventIf(FUNCTION_FAILED, "event.data.function_id == '$fullyQualifiedId'")

return InternalInngestFunction(configBuilder, this::onFailure)
}
return null
}

/**
* Provide a function to be called if your function fails, meaning
* that it ran out of retries and was unable to complete successfully.
*
* This is useful for sending warning notifications or cleaning up
* after a failure and supports all the same functionality as a
* regular handler.
*
* @param ctx The function context including event(s) that triggered the function
* @param step A class with methods to define steps within the function
*/
open fun onFailure(
ctx: FunctionContext,
step: Step,
): Any? = null
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.time.Duration
// TODO: Throw illegal argument exception
class InngestFunctionConfigBuilder {
var id: String? = null
private var name: String? = null
internal var name: String? = null
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is because you use it to build the function name for the failure function, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exactly yeah

private var triggers: MutableList<InngestFunctionTrigger> = mutableListOf()
private var concurrency: MutableList<Concurrency>? = null
private var retries = 3
Expand Down Expand Up @@ -115,8 +115,14 @@ class InngestFunctionConfigBuilder {
scope: ConcurrencyScope? = null,
): InngestFunctionConfigBuilder {
when (scope) {
ConcurrencyScope.ENVIRONMENT -> if (key == null) throw InngestInvalidConfigurationException("Concurrency key required with environment scope")
ConcurrencyScope.ACCOUNT -> if (key == null) throw InngestInvalidConfigurationException("Concurrency key required with account scope")
ConcurrencyScope.ENVIRONMENT ->
if (key == null) {
throw InngestInvalidConfigurationException("Concurrency key required with environment scope")
}
ConcurrencyScope.ACCOUNT ->
if (key == null) {
throw InngestInvalidConfigurationException("Concurrency key required with account scope")
}
ConcurrencyScope.FUNCTION -> {}
null -> {}
}
Expand Down