From 427f84782e5d596eda25959ca3cc9d1b915567c4 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 13 Oct 2023 13:24:06 +0100 Subject: [PATCH] [Transform] Consider task cancelled exceptions as recoverable A task cancelled exception has REST status 400, which makes it irrecoverable as far as transforms is concerned. This means that a transform that suffers such an exception will fail without doing any retries. This is bad, because a search can fail with a task cancelled exception if one of its lower level phases suffers a circuit breaker exception. We want transforms to retry in the event of there temporarily not being sufficient memory for a search. --- .../transforms/TransformFailureHandler.java | 2 +- .../utils/ExceptionRootCauseFinder.java | 42 ++++++++++++------- .../utils/ExceptionRootCauseFinderTests.java | 20 +++++++++ 3 files changed, 48 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandler.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandler.java index cbe309f94aadb..ee00e8fc37b3c 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandler.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandler.java @@ -188,7 +188,7 @@ private void handleBulkIndexingException(BulkIndexingException bulkIndexingExcep * @param numFailureRetries the number of configured retries */ private void handleElasticsearchException(ElasticsearchException elasticsearchException, boolean unattended, int numFailureRetries) { - if (unattended == false && ExceptionRootCauseFinder.IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) { + if (unattended == false && ExceptionRootCauseFinder.isExceptionIrrecoverable(elasticsearchException)) { String message = "task encountered irrecoverable failure: " + elasticsearchException.getDetailedMessage(); fail(message); } else { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java index 4a0d1680b658a..40144bd59b127 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java @@ -10,10 +10,9 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.TaskCancelledException; -import java.util.Arrays; import java.util.Collection; -import java.util.HashSet; import java.util.Set; /** @@ -24,17 +23,15 @@ public final class ExceptionRootCauseFinder { /** * List of rest statuses that we consider irrecoverable */ - public static final Set IRRECOVERABLE_REST_STATUSES = new HashSet<>( - Arrays.asList( - RestStatus.GONE, - RestStatus.NOT_IMPLEMENTED, - RestStatus.NOT_FOUND, - RestStatus.BAD_REQUEST, - RestStatus.UNAUTHORIZED, - RestStatus.FORBIDDEN, - RestStatus.METHOD_NOT_ALLOWED, - RestStatus.NOT_ACCEPTABLE - ) + static final Set IRRECOVERABLE_REST_STATUSES = Set.of( + RestStatus.GONE, + RestStatus.NOT_IMPLEMENTED, + RestStatus.NOT_FOUND, + RestStatus.BAD_REQUEST, + RestStatus.UNAUTHORIZED, + RestStatus.FORBIDDEN, + RestStatus.METHOD_NOT_ALLOWED, + RestStatus.NOT_ACCEPTABLE ); /** @@ -65,7 +62,7 @@ public static Throwable getFirstIrrecoverableExceptionFromBulkResponses(Collecti } if (unwrappedThrowable instanceof ElasticsearchException elasticsearchException) { - if (IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) { + if (isExceptionIrrecoverable(elasticsearchException)) { return elasticsearchException; } } @@ -74,6 +71,21 @@ public static Throwable getFirstIrrecoverableExceptionFromBulkResponses(Collecti return null; } - private ExceptionRootCauseFinder() {} + public static boolean isExceptionIrrecoverable(ElasticsearchException elasticsearchException) { + if (IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) { + + // Even if the status indicates the exception is irrecoverable, some exceptions + // with these status are worth retrying on. + // A TaskCancelledException occurs if a sub-action of a search encounters a circuit + // breaker exception. In this case the overall search task is cancelled. + if (elasticsearchException instanceof TaskCancelledException) { + return false; + } + return true; + } + return false; + } + + private ExceptionRootCauseFinder() {} } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java index fa494d8ba9513..ca19e9157e9e0 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java @@ -11,11 +11,17 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.DocWriteRequest.OpType; import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.DocumentParsingException; +import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.TranslogException; +import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.XContentLocation; @@ -149,6 +155,20 @@ public void testGetFirstIrrecoverableExceptionFromBulkResponses() { assertNull(ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses.values())); } + public void testIsIrrecoverable() { + assertFalse(ExceptionRootCauseFinder.isExceptionIrrecoverable(new MapperException("mappings problem"))); + assertFalse(ExceptionRootCauseFinder.isExceptionIrrecoverable(new TaskCancelledException("cancelled task"))); + assertFalse( + ExceptionRootCauseFinder.isExceptionIrrecoverable( + new CircuitBreakingException("circuit broken", CircuitBreaker.Durability.TRANSIENT) + ) + ); + assertTrue(ExceptionRootCauseFinder.isExceptionIrrecoverable(new IndexClosedException(new Index("index", "1234")))); + assertTrue( + ExceptionRootCauseFinder.isExceptionIrrecoverable(new DocumentParsingException(new XContentLocation(1, 2), "parse error")) + ); + } + private static void assertFirstException(Collection bulkItemResponses, Class expectedClass, String message) { Throwable t = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses); assertNotNull(t);