Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improves the retry behavior of functions by returning retry headers #39

Merged
merged 6 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions inngest-core/src/main/kotlin/com/inngest/Comm.kt
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ class CommHandler(
headers = headers,
)
} catch (e: Exception) {
val retryDecision = RetryDecision.fromException(e)
val statusCode = if (retryDecision.shouldRetry) ResultStatusCode.RetriableError else ResultStatusCode.NonRetriableError

val err =
CommError(
name = e.toString(),
Expand All @@ -95,15 +98,15 @@ class CommHandler(
)
return CommResponse(
body = Klaxon().toJsonString(err),
statusCode = ResultStatusCode.Error,
headers = headers,
statusCode = statusCode,
headers = headers.plus(retryDecision.headers),
)
}
}

private fun getFunctionConfigs(): List<FunctionConfig> {
val configs: MutableList<FunctionConfig> = mutableListOf()
functions.forEach { entry -> configs.add(entry.value.getConfig()) }
functions.forEach { entry -> configs.add(entry.value.getFunctionConfig()) }
return configs
}

Expand Down
7 changes: 4 additions & 3 deletions inngest-core/src/main/kotlin/com/inngest/Function.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ enum class OpCode {
enum class ResultStatusCode(val code: Int, val message: String) {
StepComplete(206, "Step Complete"),
FunctionComplete(200, "Function Complete"),
Error(500, "Function Error"),
NonRetriableError(400, "Bad Request"),
RetriableError(500, "Function Error"),
}

abstract class StepOp(
Expand Down Expand Up @@ -187,12 +188,12 @@ open class InngestFunction(
id = e.hashedId,
name = e.id,
op = OpCode.StepStateFailed,
statusCode = ResultStatusCode.Error,
statusCode = ResultStatusCode.RetriableError,
)
}
}

fun getConfig(): FunctionConfig {
fun getFunctionConfig(): FunctionConfig {
return FunctionConfig(
id = config.id,
name = config.name,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.inngest

open class NonRetriableError
@JvmOverloads
constructor(message: String, cause: Throwable? = null) : RuntimeException(message, cause)
21 changes: 21 additions & 0 deletions inngest-core/src/main/kotlin/com/inngest/RetryAfterError.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.inngest

import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter

open class RetryAfterError
@JvmOverloads
constructor(message: String, retryAfter: Any, cause: Throwable? = null) :
RuntimeException(message, cause) {
var retryAfter: String =
when (retryAfter) {
is ZonedDateTime -> retryAfter.format(DateTimeFormatter.ISO_INSTANT)

is Int -> (retryAfter / 1000).toString()

// TODO: Add ms parsing: https://github.com/vercel/ms
is String -> (retryAfter.toInt() / 1000).toString()

else -> throw IllegalArgumentException("Invalid retryAfter type: ${retryAfter::class.simpleName}")
}
}
21 changes: 21 additions & 0 deletions inngest-core/src/main/kotlin/com/inngest/RetryDecision.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.inngest

internal data class RetryDecision(val shouldRetry: Boolean, val headers: Map<String, String>) {
companion object {
internal fun fromException(exception: Exception): RetryDecision =
when (exception) {
is RetryAfterError ->
RetryDecision(
true,
mapOf(InngestHeaderKey.RetryAfter.value to exception.retryAfter, noRetryFalse),
)

is NonRetriableError -> RetryDecision(false, mapOf(InngestHeaderKey.NoRetry.value to "true"))

// Any other error should have the default retry behavior.
else -> RetryDecision(true, mapOf(noRetryFalse))
}
}
}

private val noRetryFalse = InngestHeaderKey.NoRetry.value to "false"
28 changes: 28 additions & 0 deletions inngest-core/src/test/kotlin/com/inngest/RetryAfterErrorTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.inngest

import java.time.ZonedDateTime
import kotlin.test.Test
import kotlin.test.assertEquals

internal class RetryAfterErrorTest {
@Test
fun `should return retryAfter in seconds when an integer is passed`() {
val retryAfter = 5000
val retryAfterError = RetryAfterError("Error", retryAfter)
assertEquals("5", retryAfterError.retryAfter)
}

@Test
fun `should return retryAfter in seconds when an string is passed`() {
val retryAfter = "5000"
val retryAfterError = RetryAfterError("Error", retryAfter)
assertEquals("5", retryAfterError.retryAfter)
}

@Test
fun `should return retryAfter as an ISO string when a ZonedDateTime is passed`() {
val retryAfter = ZonedDateTime.parse("2021-08-25T00:00:00Z")
val retryAfterError = RetryAfterError("Error", retryAfter)
assertEquals("2021-08-25T00:00:00Z", retryAfterError.retryAfter)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ public ResponseEntity<String> handleRequest(
try {
CommResponse response = commHandler.callFunction(functionId, body);

return ResponseEntity.status(response.getStatusCode().getCode()).headers(getHeaders())
HttpHeaders headers = new HttpHeaders();
response.getHeaders().forEach(headers::add);

return ResponseEntity.status(response.getStatusCode().getCode()).headers(headers)
.body(response.getBody());
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,23 @@ public class DemoTestConfiguration extends InngestConfiguration {
@Override
protected HashMap<String, InngestFunction> functions() {
HashMap<String, InngestFunction> functions = new HashMap<>();
functions.put("no-step-fn", InngestFunctionTestHelpers.emptyStepFunction());
functions.put("sleep-fn", InngestFunctionTestHelpers.sleepStepFunction());
functions.put("two-steps-fn", InngestFunctionTestHelpers.twoStepsFunction());
functions.put("wait-for-event-fn", InngestFunctionTestHelpers.waitForEventFunction());
functions.put("send-fn", InngestFunctionTestHelpers.sendEventFunction());
addInngestFunction(functions, InngestFunctionTestHelpers.emptyStepFunction());
addInngestFunction(functions, InngestFunctionTestHelpers.sleepStepFunction());
addInngestFunction(functions, InngestFunctionTestHelpers.twoStepsFunction());
addInngestFunction(functions, InngestFunctionTestHelpers.waitForEventFunction());
addInngestFunction(functions, InngestFunctionTestHelpers.sendEventFunction());
addInngestFunction(functions, InngestFunctionTestHelpers.nonRetriableErrorFunction());
addInngestFunction(functions, InngestFunctionTestHelpers.retriableErrorFunction());

return functions;
}

private static void addInngestFunction(
HashMap<String, InngestFunction> functions,
InngestFunction function) {
functions.put(function.getConfig().getId(), function);
}

@Override
protected Inngest inngestClient() {
return new Inngest("spring_test_demo");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.inngest.springbootdemo;

import com.inngest.CommHandler;
import com.inngest.Inngest;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.ArrayList;
import java.util.LinkedHashMap;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

@IntegrationTest
@Execution(ExecutionMode.CONCURRENT)
class ErrorsInStepsIntegrationTest {
@BeforeAll
static void setup(@Autowired CommHandler handler) {
handler.register();
}

@Autowired
private DevServerComponent devServer;

static int sleepTime = 10000;

@Autowired
private Inngest client;

@Test
void testNonRetriableShouldFail() throws Exception {
String eventId = InngestFunctionTestHelpers.sendEvent(client, "test/non.retriable").first();

Thread.sleep(sleepTime);

RunEntry<Object> run = devServer.runsByEvent(eventId).first();
LinkedHashMap<String, String> output = (LinkedHashMap<String, String>) run.getOutput();

assertEquals(run.getStatus(), "Failed");
assertNotNull(run.getEnded_at());
assert output.get("name").contains("NonRetriableError");
assert output.get("stack").contains("lambda$nonRetriableErrorFunction");
assertEquals(output.get("message"), "something fatally went wrong");
}


@Test
void testRetriableShouldSucceedAfterFirstAttempt() throws Exception {
String eventId = InngestFunctionTestHelpers.sendEvent(client, "test/retriable").first();

Thread.sleep(5000);

RunEntry<Object> run1 = devServer.runsByEvent(eventId).first();

assertEquals(run1.getStatus(), "Running");

// The second attempt should succeed, so we wait for the second run to finish.
Thread.sleep(15000);

RunEntry<Object> run2 = devServer.runsByEvent(eventId).first();

assertEquals(run2.getStatus(), "Completed");
assertNotNull(run2.getEnded_at(), "Completed");
assertNotNull(run2.getOutput(), "Success");
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,43 @@ static InngestFunction sendEventFunction() {
}


static InngestFunction nonRetriableErrorFunction() {
FunctionTrigger fnTrigger = new FunctionTrigger("test/non.retriable");
FunctionTrigger[] triggers = {fnTrigger};
FunctionOptions fnConfig = new FunctionOptions("non-retriable-fn", "NonRetriable Function", triggers);

BiFunction<FunctionContext, Step, String> handler = (ctx, step) -> {
step.run("fail-step", () -> {
throw new NonRetriableError("something fatally went wrong");
}, String.class);

return "Success";
};

return new InngestFunction(fnConfig, handler);
}

static int retryCount = 0;

static InngestFunction retriableErrorFunction() {
FunctionTrigger fnTrigger = new FunctionTrigger("test/retriable");
FunctionTrigger[] triggers = {fnTrigger};
FunctionOptions fnConfig = new FunctionOptions("retriable-fn", "Retriable Function", triggers);

BiFunction<FunctionContext, Step, String> handler = (ctx, step) -> {
retryCount++;
step.run("retriable-step", () -> {
if (retryCount < 2) {
throw new RetryAfterError("something went wrong", 10000);
}
return "Success";
}, String.class);

return "Success";
};

return new InngestFunction(fnConfig, handler);
}


}
Loading