diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java index c0b793d76..74245bb7c 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java @@ -37,13 +37,17 @@ import io.temporal.internal.worker.NexusTask; import io.temporal.internal.worker.NexusTaskHandler; import io.temporal.internal.worker.ShutdownManager; +import io.temporal.serviceclient.CheckedExceptionWrapper; import io.temporal.worker.TypeAlreadyRegisteredException; import java.time.Duration; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class NexusTaskHandlerImpl implements NexusTaskHandler { + private static final Logger log = LoggerFactory.getLogger(NexusTaskHandlerImpl.class); private final DataConverter dataConverter; private final String namespace; private final String taskQueue; @@ -140,11 +144,7 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException return new Result( HandlerError.newBuilder() .setErrorType(OperationHandlerException.ErrorType.INTERNAL.toString()) - .setFailure( - Failure.newBuilder() - .setMessage("internal error") - .setDetails(ByteString.copyFromUtf8(e.toString())) - .build()) + .setFailure(Failure.newBuilder().setMessage(e.toString()).build()) .build()); } finally { // If the task timed out, we should not send a response back to the server @@ -173,6 +173,21 @@ private Failure createFailure(FailureInfo failInfo) { return failure.build(); } + private void cancelOperation(OperationContext context, OperationCancelDetails details) { + try { + serviceHandler.cancelOperation(context, details); + } catch (Throwable e) { + Throwable failure = CheckedExceptionWrapper.unwrap(e); + log.warn( + "Nexus cancel operation failure. Service={}, Operation={}", + context.getService(), + context.getOperation(), + failure); + // Re-throw the original exception to handle it in the caller + throw e; + } + } + private CancelOperationResponse handleCancelledOperation( OperationContext.Builder ctx, CancelOperationRequest task) { ctx.setService(task.getService()).setOperation(task.getOperation()); @@ -180,7 +195,7 @@ private CancelOperationResponse handleCancelledOperation( OperationCancelDetails operationCancelDetails = OperationCancelDetails.newBuilder().setOperationId(task.getOperationId()).build(); try { - serviceHandler.cancelOperation(ctx.build(), operationCancelDetails); + cancelOperation(ctx.build(), operationCancelDetails); } catch (Throwable failure) { convertKnownFailures(failure); } @@ -188,7 +203,8 @@ private CancelOperationResponse handleCancelledOperation( return CancelOperationResponse.newBuilder().build(); } - private void convertKnownFailures(Throwable failure) { + private void convertKnownFailures(Throwable e) { + Throwable failure = CheckedExceptionWrapper.unwrap(e); if (failure instanceof ApplicationFailure) { if (((ApplicationFailure) failure).isNonRetryable()) { throw new OperationHandlerException( @@ -207,6 +223,23 @@ private void convertKnownFailures(Throwable failure) { throw new RuntimeException(failure); } + private OperationStartResult startOperation( + OperationContext context, OperationStartDetails details, HandlerInputContent input) + throws OperationUnsuccessfulException { + try { + return serviceHandler.startOperation(context, details, input); + } catch (Throwable e) { + Throwable ex = CheckedExceptionWrapper.unwrap(e); + log.warn( + "Nexus start operation failure. Service={}, Operation={}", + context.getService(), + context.getOperation(), + ex); + // Re-throw the original exception to handle it in the caller + throw e; + } + } + private StartOperationResponse handleStartOperation( OperationContext.Builder ctx, StartOperationRequest task) throws InvalidProtocolBufferException { @@ -224,7 +257,7 @@ private StartOperationResponse handleStartOperation( StartOperationResponse.Builder startResponseBuilder = StartOperationResponse.newBuilder(); try { OperationStartResult result = - serviceHandler.startOperation(ctx.build(), operationStartDetails.build(), input.build()); + startOperation(ctx.build(), operationStartDetails.build(), input.build()); if (result.isSync()) { startResponseBuilder.setSyncSuccess( StartOperationResponse.Sync.newBuilder() @@ -240,11 +273,7 @@ private StartOperationResponse handleStartOperation( startResponseBuilder.setOperationError( UnsuccessfulOperationError.newBuilder() .setOperationState(e.getState().toString().toLowerCase()) - .setFailure( - Failure.newBuilder() - .setMessage(e.getFailureInfo().getMessage()) - .putAllMetadata(e.getFailureInfo().getMetadata()) - .build()) + .setFailure(createFailure(e.getFailureInfo())) .build()); } catch (Throwable failure) { convertKnownFailures(failure);