Skip to content

Commit

Permalink
Improves the retry behavior of functions (#39)
Browse files Browse the repository at this point in the history
* Rename clashing function with getConfig getter

* Add Retriable and NonRetriable exceptions

* Return retry headers with error responses

* Add tests for retriable/non-retriable function errors

* Rename enum BadRequest -> NonRetriableError

* Rename enum Error -> RetriableError
  • Loading branch information
KiKoS0 authored Feb 28, 2024
1 parent 741ca45 commit cf9e627
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 12 deletions.
9 changes: 6 additions & 3 deletions inngest-core/src/main/kotlin/com/inngest/Comm.kt
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ class CommHandler(
headers = headers,
)
} catch (e: Exception) {
val retryDecision = RetryDecision.fromException(e)
val statusCode = if (retryDecision.shouldRetry) ResultStatusCode.RetriableError else ResultStatusCode.NonRetriableError

val err =
CommError(
name = e.toString(),
Expand All @@ -95,15 +98,15 @@ class CommHandler(
)
return CommResponse(
body = Klaxon().toJsonString(err),
statusCode = ResultStatusCode.Error,
headers = headers,
statusCode = statusCode,
headers = headers.plus(retryDecision.headers),
)
}
}

private fun getFunctionConfigs(): List<FunctionConfig> {
val configs: MutableList<FunctionConfig> = mutableListOf()
functions.forEach { entry -> configs.add(entry.value.getConfig()) }
functions.forEach { entry -> configs.add(entry.value.getFunctionConfig()) }
return configs
}

Expand Down
7 changes: 4 additions & 3 deletions inngest-core/src/main/kotlin/com/inngest/Function.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ enum class OpCode {
enum class ResultStatusCode(val code: Int, val message: String) {
StepComplete(206, "Step Complete"),
FunctionComplete(200, "Function Complete"),
Error(500, "Function Error"),
NonRetriableError(400, "Bad Request"),
RetriableError(500, "Function Error"),
}

abstract class StepOp(
Expand Down Expand Up @@ -187,12 +188,12 @@ open class InngestFunction(
id = e.hashedId,
name = e.id,
op = OpCode.StepStateFailed,
statusCode = ResultStatusCode.Error,
statusCode = ResultStatusCode.RetriableError,
)
}
}

fun getConfig(): FunctionConfig {
fun getFunctionConfig(): FunctionConfig {
return FunctionConfig(
id = config.id,
name = config.name,
Expand Down
5 changes: 5 additions & 0 deletions inngest-core/src/main/kotlin/com/inngest/NonRetriableError.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.inngest

open class NonRetriableError
@JvmOverloads
constructor(message: String, cause: Throwable? = null) : RuntimeException(message, cause)
21 changes: 21 additions & 0 deletions inngest-core/src/main/kotlin/com/inngest/RetryAfterError.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.inngest

import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter

open class RetryAfterError
@JvmOverloads
constructor(message: String, retryAfter: Any, cause: Throwable? = null) :
RuntimeException(message, cause) {
var retryAfter: String =
when (retryAfter) {
is ZonedDateTime -> retryAfter.format(DateTimeFormatter.ISO_INSTANT)

is Int -> (retryAfter / 1000).toString()

// TODO: Add ms parsing: https://github.com/vercel/ms
is String -> (retryAfter.toInt() / 1000).toString()

else -> throw IllegalArgumentException("Invalid retryAfter type: ${retryAfter::class.simpleName}")
}
}
21 changes: 21 additions & 0 deletions inngest-core/src/main/kotlin/com/inngest/RetryDecision.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.inngest

internal data class RetryDecision(val shouldRetry: Boolean, val headers: Map<String, String>) {
companion object {
internal fun fromException(exception: Exception): RetryDecision =
when (exception) {
is RetryAfterError ->
RetryDecision(
true,
mapOf(InngestHeaderKey.RetryAfter.value to exception.retryAfter, noRetryFalse),
)

is NonRetriableError -> RetryDecision(false, mapOf(InngestHeaderKey.NoRetry.value to "true"))

// Any other error should have the default retry behavior.
else -> RetryDecision(true, mapOf(noRetryFalse))
}
}
}

private val noRetryFalse = InngestHeaderKey.NoRetry.value to "false"
28 changes: 28 additions & 0 deletions inngest-core/src/test/kotlin/com/inngest/RetryAfterErrorTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.inngest

import java.time.ZonedDateTime
import kotlin.test.Test
import kotlin.test.assertEquals

internal class RetryAfterErrorTest {
@Test
fun `should return retryAfter in seconds when an integer is passed`() {
val retryAfter = 5000
val retryAfterError = RetryAfterError("Error", retryAfter)
assertEquals("5", retryAfterError.retryAfter)
}

@Test
fun `should return retryAfter in seconds when an string is passed`() {
val retryAfter = "5000"
val retryAfterError = RetryAfterError("Error", retryAfter)
assertEquals("5", retryAfterError.retryAfter)
}

@Test
fun `should return retryAfter as an ISO string when a ZonedDateTime is passed`() {
val retryAfter = ZonedDateTime.parse("2021-08-25T00:00:00Z")
val retryAfterError = RetryAfterError("Error", retryAfter)
assertEquals("2021-08-25T00:00:00Z", retryAfterError.retryAfter)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ public ResponseEntity<String> handleRequest(
try {
CommResponse response = commHandler.callFunction(functionId, body);

return ResponseEntity.status(response.getStatusCode().getCode()).headers(getHeaders())
HttpHeaders headers = new HttpHeaders();
response.getHeaders().forEach(headers::add);

return ResponseEntity.status(response.getStatusCode().getCode()).headers(headers)
.body(response.getBody());
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,23 @@ public class DemoTestConfiguration extends InngestConfiguration {
@Override
protected HashMap<String, InngestFunction> functions() {
HashMap<String, InngestFunction> functions = new HashMap<>();
functions.put("no-step-fn", InngestFunctionTestHelpers.emptyStepFunction());
functions.put("sleep-fn", InngestFunctionTestHelpers.sleepStepFunction());
functions.put("two-steps-fn", InngestFunctionTestHelpers.twoStepsFunction());
functions.put("wait-for-event-fn", InngestFunctionTestHelpers.waitForEventFunction());
functions.put("send-fn", InngestFunctionTestHelpers.sendEventFunction());
addInngestFunction(functions, InngestFunctionTestHelpers.emptyStepFunction());
addInngestFunction(functions, InngestFunctionTestHelpers.sleepStepFunction());
addInngestFunction(functions, InngestFunctionTestHelpers.twoStepsFunction());
addInngestFunction(functions, InngestFunctionTestHelpers.waitForEventFunction());
addInngestFunction(functions, InngestFunctionTestHelpers.sendEventFunction());
addInngestFunction(functions, InngestFunctionTestHelpers.nonRetriableErrorFunction());
addInngestFunction(functions, InngestFunctionTestHelpers.retriableErrorFunction());

return functions;
}

private static void addInngestFunction(
HashMap<String, InngestFunction> functions,
InngestFunction function) {
functions.put(function.getConfig().getId(), function);
}

@Override
protected Inngest inngestClient() {
return new Inngest("spring_test_demo");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.inngest.springbootdemo;

import com.inngest.CommHandler;
import com.inngest.Inngest;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.ArrayList;
import java.util.LinkedHashMap;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

@IntegrationTest
@Execution(ExecutionMode.CONCURRENT)
class ErrorsInStepsIntegrationTest {
@BeforeAll
static void setup(@Autowired CommHandler handler) {
handler.register();
}

@Autowired
private DevServerComponent devServer;

static int sleepTime = 10000;

@Autowired
private Inngest client;

@Test
void testNonRetriableShouldFail() throws Exception {
String eventId = InngestFunctionTestHelpers.sendEvent(client, "test/non.retriable").first();

Thread.sleep(sleepTime);

RunEntry<Object> run = devServer.runsByEvent(eventId).first();
LinkedHashMap<String, String> output = (LinkedHashMap<String, String>) run.getOutput();

assertEquals(run.getStatus(), "Failed");
assertNotNull(run.getEnded_at());
assert output.get("name").contains("NonRetriableError");
assert output.get("stack").contains("lambda$nonRetriableErrorFunction");
assertEquals(output.get("message"), "something fatally went wrong");
}


@Test
void testRetriableShouldSucceedAfterFirstAttempt() throws Exception {
String eventId = InngestFunctionTestHelpers.sendEvent(client, "test/retriable").first();

Thread.sleep(5000);

RunEntry<Object> run1 = devServer.runsByEvent(eventId).first();

assertEquals(run1.getStatus(), "Running");

// The second attempt should succeed, so we wait for the second run to finish.
Thread.sleep(15000);

RunEntry<Object> run2 = devServer.runsByEvent(eventId).first();

assertEquals(run2.getStatus(), "Completed");
assertNotNull(run2.getEnded_at(), "Completed");
assertNotNull(run2.getOutput(), "Success");
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,43 @@ static InngestFunction sendEventFunction() {
}


static InngestFunction nonRetriableErrorFunction() {
FunctionTrigger fnTrigger = new FunctionTrigger("test/non.retriable");
FunctionTrigger[] triggers = {fnTrigger};
FunctionOptions fnConfig = new FunctionOptions("non-retriable-fn", "NonRetriable Function", triggers);

BiFunction<FunctionContext, Step, String> handler = (ctx, step) -> {
step.run("fail-step", () -> {
throw new NonRetriableError("something fatally went wrong");
}, String.class);

return "Success";
};

return new InngestFunction(fnConfig, handler);
}

static int retryCount = 0;

static InngestFunction retriableErrorFunction() {
FunctionTrigger fnTrigger = new FunctionTrigger("test/retriable");
FunctionTrigger[] triggers = {fnTrigger};
FunctionOptions fnConfig = new FunctionOptions("retriable-fn", "Retriable Function", triggers);

BiFunction<FunctionContext, Step, String> handler = (ctx, step) -> {
retryCount++;
step.run("retriable-step", () -> {
if (retryCount < 2) {
throw new RetryAfterError("something went wrong", 10000);
}
return "Success";
}, String.class);

return "Success";
};

return new InngestFunction(fnConfig, handler);
}


}

0 comments on commit cf9e627

Please sign in to comment.