Skip to content

Commit

Permalink
Nexus: Well know error translation (#2242)
Browse files Browse the repository at this point in the history
Add conversion for common errors
  • Loading branch information
Quinn-With-Two-Ns committed Oct 3, 2024
1 parent fe27eff commit aabc7b8
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ private static String getMessage(WorkflowExecution execution, String workflowTyp
+ execution.getWorkflowId()
+ "', runId='"
+ execution.getRunId()
+ (workflowType == null ? "" : "', workflowType='" + workflowType + '\'')
+ '}';
+ (workflowType == null ? "" : "', workflowType='" + workflowType + '\'');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import io.temporal.api.common.v1.Payload;
import io.temporal.api.nexus.v1.*;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowException;
import io.temporal.common.converter.DataConverter;
import io.temporal.failure.ApplicationFailure;
import io.temporal.internal.common.NexusUtil;
import io.temporal.internal.worker.NexusTask;
import io.temporal.internal.worker.NexusTaskHandler;
Expand Down Expand Up @@ -177,12 +179,34 @@ private CancelOperationResponse handleCancelledOperation(

OperationCancelDetails operationCancelDetails =
OperationCancelDetails.newBuilder().setOperationId(task.getOperationId()).build();

serviceHandler.cancelOperation(ctx.build(), operationCancelDetails);
try {
serviceHandler.cancelOperation(ctx.build(), operationCancelDetails);
} catch (Throwable failure) {
convertKnownFailures(failure);
}

return CancelOperationResponse.newBuilder().build();
}

private void convertKnownFailures(Throwable failure) {
if (failure instanceof ApplicationFailure) {
if (((ApplicationFailure) failure).isNonRetryable()) {
throw new OperationHandlerException(
OperationHandlerException.ErrorType.BAD_REQUEST, failure.getMessage());
}
throw new OperationHandlerException(
OperationHandlerException.ErrorType.INTERNAL, failure.getMessage());
}
if (failure instanceof WorkflowException) {
throw new OperationHandlerException(
OperationHandlerException.ErrorType.BAD_REQUEST, failure.getMessage());
}
if (failure instanceof Error) {
throw (Error) failure;
}
throw new RuntimeException(failure);
}

private StartOperationResponse handleStartOperation(
OperationContext.Builder ctx, StartOperationRequest task)
throws InvalidProtocolBufferException {
Expand Down Expand Up @@ -222,6 +246,8 @@ private StartOperationResponse handleStartOperation(
.putAllMetadata(e.getFailureInfo().getMetadata())
.build())
.build());
} catch (Throwable failure) {
convertKnownFailures(failure);
}
return startResponseBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow.nexus;

import io.nexusrpc.handler.OperationHandler;
import io.nexusrpc.handler.OperationImpl;
import io.nexusrpc.handler.ServiceImpl;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.WorkflowExecutionAlreadyStarted;
import io.temporal.client.WorkflowFailedException;
import io.temporal.failure.ApplicationFailure;
import io.temporal.failure.NexusOperationFailure;
import io.temporal.failure.TimeoutFailure;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.*;
import io.temporal.workflow.shared.TestNexusServices;
import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1;
import java.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class OperationFailureConversionTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestNexus.class)
.setNexusServiceImplementation(new TestNexusServiceImpl())
.build();

@Test
public void nexusOperationApplicationFailureNonRetryableFailureConversion() {
TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
WorkflowFailedException exception =
Assert.assertThrows(
WorkflowFailedException.class,
() -> workflowStub.execute("ApplicationFailureNonRetryable"));
Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure);
NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause();
Assert.assertTrue(nexusFailure.getCause() instanceof ApplicationFailure);
}

@Test
public void nexusOperationWorkflowExecutionAlreadyStartedFailureConversion() {
TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
WorkflowFailedException exception =
Assert.assertThrows(
WorkflowFailedException.class,
() -> workflowStub.execute("WorkflowExecutionAlreadyStarted"));
Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure);
NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause();
Assert.assertTrue(nexusFailure.getCause() instanceof ApplicationFailure);
}

@Test
public void nexusOperationApplicationFailureFailureConversion() {
TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
WorkflowFailedException exception =
Assert.assertThrows(
WorkflowFailedException.class, () -> workflowStub.execute("ApplicationFailure"));
Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure);
NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause();
Assert.assertTrue(nexusFailure.getCause() instanceof TimeoutFailure);
}

public static class TestNexus implements TestWorkflow1 {
@Override
public String execute(String testcase) {
NexusOperationOptions options =
NexusOperationOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(5))
.build();

NexusServiceOptions serviceOptions =
NexusServiceOptions.newBuilder().setOperationOptions(options).build();
TestNexusServices.TestNexusService1 testNexusService =
Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions);
testNexusService.operation(testcase);
return "fail";
}
}

@ServiceImpl(service = TestNexusServices.TestNexusService1.class)
public class TestNexusServiceImpl {
@OperationImpl
public OperationHandler<String, String> operation() {
return OperationHandler.sync(
(ctx, details, name) -> {
if (name.equals("ApplicationFailure")) {
throw ApplicationFailure.newFailure("failed to call operation", "TestFailure");
} else if (name.equals("ApplicationFailureNonRetryable")) {
throw ApplicationFailure.newNonRetryableFailure(
"failed to call operation", "TestFailure");
} else if (name.equals("WorkflowExecutionAlreadyStarted")) {
throw new WorkflowExecutionAlreadyStarted(
WorkflowExecution.newBuilder().setWorkflowId("id").setRunId("runId").build(),
"TestWorkflow",
new RuntimeException("already started"));
}
Assert.fail();
return "fail";
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2419,7 +2419,8 @@ static RetryPolicy ensureDefaultFieldsForActivityRetryPolicy(RetryPolicy origina
static RetryPolicy defaultNexusRetryPolicy() {
return RetryPolicy.newBuilder()
.addAllNonRetryableErrorTypes(
Arrays.asList("INVALID_ARGUMENT", "NOT_FOUND", "DEADLINE_EXCEEDED", "CANCELLED"))
Arrays.asList(
"BAD_REQUEST", "INVALID_ARGUMENT", "NOT_FOUND", "DEADLINE_EXCEEDED", "CANCELLED"))
.setInitialInterval(Durations.fromSeconds(1))
.setMaximumInterval(Durations.fromHours(1))
.setBackoffCoefficient(2.0)
Expand Down

0 comments on commit aabc7b8

Please sign in to comment.