From a6f1e581ea6bb88df5b0b13b951e9b8fea0f01d8 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 2 Oct 2024 15:36:40 -0700 Subject: [PATCH] Nexus: Well know error translation (#2242) Add conversion for common errors --- .../io/temporal/client/WorkflowException.java | 3 +- .../internal/nexus/NexusTaskHandlerImpl.java | 30 ++++- .../nexus/OperationFailureConversionTest.java | 127 ++++++++++++++++++ .../internal/testservice/StateMachines.java | 3 +- 4 files changed, 158 insertions(+), 5 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/nexus/OperationFailureConversionTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowException.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowException.java index 7afd311c5..df089ce2e 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowException.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowException.java @@ -59,7 +59,6 @@ private static String getMessage(WorkflowExecution execution, String workflowTyp + execution.getWorkflowId() + "', runId='" + execution.getRunId() - + (workflowType == null ? "" : "', workflowType='" + workflowType + '\'') - + '}'; + + (workflowType == null ? "" : "', workflowType='" + workflowType + '\''); } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java index 426cc7f68..c0b793d76 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java @@ -30,7 +30,9 @@ import io.temporal.api.common.v1.Payload; import io.temporal.api.nexus.v1.*; import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowException; import io.temporal.common.converter.DataConverter; +import io.temporal.failure.ApplicationFailure; import io.temporal.internal.common.NexusUtil; import io.temporal.internal.worker.NexusTask; import io.temporal.internal.worker.NexusTaskHandler; @@ -177,12 +179,34 @@ private CancelOperationResponse handleCancelledOperation( OperationCancelDetails operationCancelDetails = OperationCancelDetails.newBuilder().setOperationId(task.getOperationId()).build(); - - serviceHandler.cancelOperation(ctx.build(), operationCancelDetails); + try { + serviceHandler.cancelOperation(ctx.build(), operationCancelDetails); + } catch (Throwable failure) { + convertKnownFailures(failure); + } return CancelOperationResponse.newBuilder().build(); } + private void convertKnownFailures(Throwable failure) { + if (failure instanceof ApplicationFailure) { + if (((ApplicationFailure) failure).isNonRetryable()) { + throw new OperationHandlerException( + OperationHandlerException.ErrorType.BAD_REQUEST, failure.getMessage()); + } + throw new OperationHandlerException( + OperationHandlerException.ErrorType.INTERNAL, failure.getMessage()); + } + if (failure instanceof WorkflowException) { + throw new OperationHandlerException( + OperationHandlerException.ErrorType.BAD_REQUEST, failure.getMessage()); + } + if (failure instanceof Error) { + throw (Error) failure; + } + throw new RuntimeException(failure); + } + private StartOperationResponse handleStartOperation( OperationContext.Builder ctx, StartOperationRequest task) throws InvalidProtocolBufferException { @@ -222,6 +246,8 @@ private StartOperationResponse handleStartOperation( .putAllMetadata(e.getFailureInfo().getMetadata()) .build()) .build()); + } catch (Throwable failure) { + convertKnownFailures(failure); } return startResponseBuilder.build(); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/OperationFailureConversionTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/OperationFailureConversionTest.java new file mode 100644 index 000000000..763ae43c9 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/OperationFailureConversionTest.java @@ -0,0 +1,127 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.nexus; + +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.WorkflowExecutionAlreadyStarted; +import io.temporal.client.WorkflowFailedException; +import io.temporal.failure.ApplicationFailure; +import io.temporal.failure.NexusOperationFailure; +import io.temporal.failure.TimeoutFailure; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import io.temporal.workflow.shared.TestNexusServices; +import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class OperationFailureConversionTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestNexus.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Test + public void nexusOperationApplicationFailureNonRetryableFailureConversion() { + TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + WorkflowFailedException exception = + Assert.assertThrows( + WorkflowFailedException.class, + () -> workflowStub.execute("ApplicationFailureNonRetryable")); + Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure); + NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause(); + Assert.assertTrue(nexusFailure.getCause() instanceof ApplicationFailure); + } + + @Test + public void nexusOperationWorkflowExecutionAlreadyStartedFailureConversion() { + TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + WorkflowFailedException exception = + Assert.assertThrows( + WorkflowFailedException.class, + () -> workflowStub.execute("WorkflowExecutionAlreadyStarted")); + Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure); + NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause(); + Assert.assertTrue(nexusFailure.getCause() instanceof ApplicationFailure); + } + + @Test + public void nexusOperationApplicationFailureFailureConversion() { + TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + WorkflowFailedException exception = + Assert.assertThrows( + WorkflowFailedException.class, () -> workflowStub.execute("ApplicationFailure")); + Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure); + NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause(); + Assert.assertTrue(nexusFailure.getCause() instanceof TimeoutFailure); + } + + public static class TestNexus implements TestWorkflow1 { + @Override + public String execute(String testcase) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) + .build(); + + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder().setOperationOptions(options).build(); + TestNexusServices.TestNexusService1 testNexusService = + Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions); + testNexusService.operation(testcase); + return "fail"; + } + } + + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return OperationHandler.sync( + (ctx, details, name) -> { + if (name.equals("ApplicationFailure")) { + throw ApplicationFailure.newFailure("failed to call operation", "TestFailure"); + } else if (name.equals("ApplicationFailureNonRetryable")) { + throw ApplicationFailure.newNonRetryableFailure( + "failed to call operation", "TestFailure"); + } else if (name.equals("WorkflowExecutionAlreadyStarted")) { + throw new WorkflowExecutionAlreadyStarted( + WorkflowExecution.newBuilder().setWorkflowId("id").setRunId("runId").build(), + "TestWorkflow", + new RuntimeException("already started")); + } + Assert.fail(); + return "fail"; + }); + } + } +} diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java index 92175b4c8..6a36716fc 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java @@ -2455,7 +2455,8 @@ static RetryPolicy ensureDefaultFieldsForActivityRetryPolicy(RetryPolicy origina static RetryPolicy defaultNexusRetryPolicy() { return RetryPolicy.newBuilder() .addAllNonRetryableErrorTypes( - Arrays.asList("INVALID_ARGUMENT", "NOT_FOUND", "DEADLINE_EXCEEDED", "CANCELLED")) + Arrays.asList( + "BAD_REQUEST", "INVALID_ARGUMENT", "NOT_FOUND", "DEADLINE_EXCEEDED", "CANCELLED")) .setInitialInterval(Durations.fromSeconds(1)) .setMaximumInterval(Durations.fromHours(1)) .setBackoffCoefficient(2.0)