Skip to content

Commit

Permalink
Improve nexus operation failure handling (#2267)
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Oct 15, 2024
1 parent 9ca0b7d commit 1d2e250
Showing 1 changed file with 42 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@
import io.temporal.internal.worker.NexusTask;
import io.temporal.internal.worker.NexusTaskHandler;
import io.temporal.internal.worker.ShutdownManager;
import io.temporal.serviceclient.CheckedExceptionWrapper;
import io.temporal.worker.TypeAlreadyRegisteredException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NexusTaskHandlerImpl implements NexusTaskHandler {
private static final Logger log = LoggerFactory.getLogger(NexusTaskHandlerImpl.class);
private final DataConverter dataConverter;
private final String namespace;
private final String taskQueue;
Expand Down Expand Up @@ -140,11 +144,7 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException
return new Result(
HandlerError.newBuilder()
.setErrorType(OperationHandlerException.ErrorType.INTERNAL.toString())
.setFailure(
Failure.newBuilder()
.setMessage("internal error")
.setDetails(ByteString.copyFromUtf8(e.toString()))
.build())
.setFailure(Failure.newBuilder().setMessage(e.toString()).build())
.build());
} finally {
// If the task timed out, we should not send a response back to the server
Expand Down Expand Up @@ -173,22 +173,38 @@ private Failure createFailure(FailureInfo failInfo) {
return failure.build();
}

private void cancelOperation(OperationContext context, OperationCancelDetails details) {
try {
serviceHandler.cancelOperation(context, details);
} catch (Throwable e) {
Throwable failure = CheckedExceptionWrapper.unwrap(e);
log.warn(
"Nexus cancel operation failure. Service={}, Operation={}",
context.getService(),
context.getOperation(),
failure);
// Re-throw the original exception to handle it in the caller
throw e;
}
}

private CancelOperationResponse handleCancelledOperation(
OperationContext.Builder ctx, CancelOperationRequest task) {
ctx.setService(task.getService()).setOperation(task.getOperation());

OperationCancelDetails operationCancelDetails =
OperationCancelDetails.newBuilder().setOperationId(task.getOperationId()).build();
try {
serviceHandler.cancelOperation(ctx.build(), operationCancelDetails);
cancelOperation(ctx.build(), operationCancelDetails);
} catch (Throwable failure) {
convertKnownFailures(failure);
}

return CancelOperationResponse.newBuilder().build();
}

private void convertKnownFailures(Throwable failure) {
private void convertKnownFailures(Throwable e) {
Throwable failure = CheckedExceptionWrapper.unwrap(e);
if (failure instanceof ApplicationFailure) {
if (((ApplicationFailure) failure).isNonRetryable()) {
throw new OperationHandlerException(
Expand All @@ -207,6 +223,23 @@ private void convertKnownFailures(Throwable failure) {
throw new RuntimeException(failure);
}

private OperationStartResult<HandlerResultContent> startOperation(
OperationContext context, OperationStartDetails details, HandlerInputContent input)
throws OperationUnsuccessfulException {
try {
return serviceHandler.startOperation(context, details, input);
} catch (Throwable e) {
Throwable ex = CheckedExceptionWrapper.unwrap(e);
log.warn(
"Nexus start operation failure. Service={}, Operation={}",
context.getService(),
context.getOperation(),
ex);
// Re-throw the original exception to handle it in the caller
throw e;
}
}

private StartOperationResponse handleStartOperation(
OperationContext.Builder ctx, StartOperationRequest task)
throws InvalidProtocolBufferException {
Expand All @@ -224,7 +257,7 @@ private StartOperationResponse handleStartOperation(
StartOperationResponse.Builder startResponseBuilder = StartOperationResponse.newBuilder();
try {
OperationStartResult<HandlerResultContent> result =
serviceHandler.startOperation(ctx.build(), operationStartDetails.build(), input.build());
startOperation(ctx.build(), operationStartDetails.build(), input.build());
if (result.isSync()) {
startResponseBuilder.setSyncSuccess(
StartOperationResponse.Sync.newBuilder()
Expand All @@ -240,11 +273,7 @@ private StartOperationResponse handleStartOperation(
startResponseBuilder.setOperationError(
UnsuccessfulOperationError.newBuilder()
.setOperationState(e.getState().toString().toLowerCase())
.setFailure(
Failure.newBuilder()
.setMessage(e.getFailureInfo().getMessage())
.putAllMetadata(e.getFailureInfo().getMetadata())
.build())
.setFailure(createFailure(e.getFailureInfo()))
.build());
} catch (Throwable failure) {
convertKnownFailures(failure);
Expand Down

0 comments on commit 1d2e250

Please sign in to comment.