From a8faebbda8a3810ff2899eb8f5aec0bbc270586e Mon Sep 17 00:00:00 2001 From: Dan Rubinstein Date: Thu, 12 Dec 2024 10:38:03 -0500 Subject: [PATCH] Retry on ClusterBlockException on transform destination index (#118194) * Retry on ClusterBlockException on transform destination index * Update docs/changelog/118194.yaml * Cleaning up tests * Fixing tests --------- Co-authored-by: Elastic Machine --- docs/changelog/118194.yaml | 5 + .../transforms/TransformFailureHandler.java | 45 +++- .../TransformFailureHandlerTests.java | 231 +++++++++++++----- 3 files changed, 220 insertions(+), 61 deletions(-) create mode 100644 docs/changelog/118194.yaml diff --git a/docs/changelog/118194.yaml b/docs/changelog/118194.yaml new file mode 100644 index 0000000000000..0e5eca55d597c --- /dev/null +++ b/docs/changelog/118194.yaml @@ -0,0 +1,5 @@ +pr: 118194 +summary: Retry on `ClusterBlockException` on transform destination index +area: Machine Learning +type: enhancement +issues: [] diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandler.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandler.java index 337d3c5820c07..24586e5f36337 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandler.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandler.java @@ -169,7 +169,14 @@ private void handleScriptException(ScriptException scriptException, boolean unat * @param numFailureRetries the number of configured retries */ private void handleBulkIndexingException(BulkIndexingException bulkIndexingException, boolean unattended, int numFailureRetries) { - if (unattended == false && bulkIndexingException.isIrrecoverable()) { + if (bulkIndexingException.getCause() instanceof ClusterBlockException) { + retryWithoutIncrementingFailureCount( + bulkIndexingException, + bulkIndexingException.getDetailedMessage(), + unattended, + numFailureRetries + ); + } else if (unattended == false && bulkIndexingException.isIrrecoverable()) { String message = TransformMessages.getMessage( TransformMessages.LOG_TRANSFORM_PIVOT_IRRECOVERABLE_BULK_INDEXING_ERROR, bulkIndexingException.getDetailedMessage() @@ -232,12 +239,46 @@ private void retry(Throwable unwrappedException, String message, boolean unatten && unwrappedException.getClass().equals(context.getLastFailure().getClass()); final int failureCount = context.incrementAndGetFailureCount(unwrappedException); - if (unattended == false && numFailureRetries != -1 && failureCount > numFailureRetries) { fail(unwrappedException, "task encountered more than " + numFailureRetries + " failures; latest failure: " + message); return; } + logRetry(unwrappedException, message, unattended, numFailureRetries, failureCount, repeatedFailure); + } + + /** + * Terminate failure handling without incrementing the retries used + *

+ * This is used when there is an ongoing recoverable issue and we want to retain + * retries for any issues that may occur after the issue is resolved + * + * @param unwrappedException The exception caught + * @param message error message to log/audit + * @param unattended whether the transform runs in unattended mode + * @param numFailureRetries the number of configured retries + */ + private void retryWithoutIncrementingFailureCount( + Throwable unwrappedException, + String message, + boolean unattended, + int numFailureRetries + ) { + // group failures to decide whether to report it below + final boolean repeatedFailure = context.getLastFailure() != null + && unwrappedException.getClass().equals(context.getLastFailure().getClass()); + + logRetry(unwrappedException, message, unattended, numFailureRetries, context.getFailureCount(), repeatedFailure); + } + + private void logRetry( + Throwable unwrappedException, + String message, + boolean unattended, + int numFailureRetries, + int failureCount, + boolean repeatedFailure + ) { // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one // and if the number of retries is about to exceed diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandlerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandlerTests.java index 84c8d4e140408..3894ff3043ccd 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandlerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandlerTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; +import java.util.List; import java.util.Map; import java.util.Set; @@ -63,9 +64,121 @@ public int getFailureCountChangedCounter() { } } - public void testUnattended() { + public void testHandleIndexerFailure_CircuitBreakingExceptionNewPageSizeLessThanMinimumPageSize() { + var e = new CircuitBreakingException(randomAlphaOfLength(10), 1, 0, randomFrom(CircuitBreaker.Durability.values())); + assertRetryIfUnattendedOtherwiseFail(e); + } + + public void testHandleIndexerFailure_CircuitBreakingExceptionNewPageSizeNotLessThanMinimumPageSize() { + var e = new CircuitBreakingException(randomAlphaOfLength(10), 1, 1, randomFrom(CircuitBreaker.Durability.values())); + + List.of(true, false).forEach((unattended) -> { assertNoFailureAndContextPageSizeSet(e, unattended, 365); }); + } + + public void testHandleIndexerFailure_ScriptException() { + var e = new ScriptException( + randomAlphaOfLength(10), + new ArithmeticException(randomAlphaOfLength(10)), + singletonList(randomAlphaOfLength(10)), + randomAlphaOfLength(10), + randomAlphaOfLength(10) + ); + assertRetryIfUnattendedOtherwiseFail(e); + } + + public void testHandleIndexerFailure_BulkIndexExceptionWrappingClusterBlockException() { + final BulkIndexingException bulkIndexingException = new BulkIndexingException( + randomAlphaOfLength(10), + new ClusterBlockException(Map.of("test-index", Set.of(MetadataIndexStateService.INDEX_CLOSED_BLOCK))), + randomBoolean() + ); + + List.of(true, false).forEach((unattended) -> { assertRetryFailureCountNotIncremented(bulkIndexingException, unattended); }); + } + + public void testHandleIndexerFailure_IrrecoverableBulkIndexException() { + final BulkIndexingException e = new BulkIndexingException( + randomAlphaOfLength(10), + new ElasticsearchStatusException(randomAlphaOfLength(10), RestStatus.INTERNAL_SERVER_ERROR), + true + ); + assertRetryIfUnattendedOtherwiseFail(e); + } + + public void testHandleIndexerFailure_RecoverableBulkIndexException() { + final BulkIndexingException bulkIndexingException = new BulkIndexingException( + randomAlphaOfLength(10), + new ElasticsearchStatusException(randomAlphaOfLength(10), RestStatus.INTERNAL_SERVER_ERROR), + false + ); + + List.of(true, false).forEach((unattended) -> { assertRetry(bulkIndexingException, unattended); }); + } + + public void testHandleIndexerFailure_ClusterBlockException() { + List.of(true, false).forEach((unattended) -> { + assertRetry( + new ClusterBlockException(Map.of(randomAlphaOfLength(10), Set.of(MetadataIndexStateService.INDEX_CLOSED_BLOCK))), + unattended + ); + }); + } + + public void testHandleIndexerFailure_SearchPhaseExecutionExceptionWithNoShardSearchFailures() { + List.of(true, false).forEach((unattended) -> { + assertRetry( + new SearchPhaseExecutionException(randomAlphaOfLength(10), randomAlphaOfLength(10), ShardSearchFailure.EMPTY_ARRAY), + unattended + ); + }); + } + + public void testHandleIndexerFailure_SearchPhaseExecutionExceptionWithShardSearchFailures() { + List.of(true, false).forEach((unattended) -> { + assertRetry( + new SearchPhaseExecutionException( + randomAlphaOfLength(10), + randomAlphaOfLength(10), + new ShardSearchFailure[] { new ShardSearchFailure(new Exception()) } + ), + unattended + ); + }); + } + + public void testHandleIndexerFailure_RecoverableElasticsearchException() { + List.of(true, false).forEach((unattended) -> { + assertRetry(new ElasticsearchStatusException(randomAlphaOfLength(10), RestStatus.INTERNAL_SERVER_ERROR), unattended); + }); + } + + public void testHandleIndexerFailure_IrrecoverableElasticsearchException() { + var e = new ElasticsearchStatusException(randomAlphaOfLength(10), RestStatus.NOT_FOUND); + assertRetryIfUnattendedOtherwiseFail(e); + } + + public void testHandleIndexerFailure_IllegalArgumentException() { + var e = new IllegalArgumentException(randomAlphaOfLength(10)); + assertRetryIfUnattendedOtherwiseFail(e); + } + + public void testHandleIndexerFailure_UnexpectedException() { + List.of(true, false).forEach((unattended) -> { assertRetry(new Exception(), unattended); }); + } + + private void assertRetryIfUnattendedOtherwiseFail(Exception e) { + List.of(true, false).forEach((unattended) -> { + if (unattended) { + assertRetry(e, unattended); + } else { + assertFailure(e); + } + }); + } + + private void assertRetry(Exception e, boolean unattended) { String transformId = randomAlphaOfLength(10); - SettingsConfig settings = new SettingsConfig.Builder().setUnattended(true).build(); + SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).setUnattended(unattended).build(); MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor(); MockTransformContextListener contextListener = new MockTransformContextListener(); @@ -74,51 +187,33 @@ public void testUnattended() { TransformFailureHandler handler = new TransformFailureHandler(auditor, context, transformId); - handler.handleIndexerFailure( - new SearchPhaseExecutionException( - "query", - "Partial shards failure", - new ShardSearchFailure[] { - new ShardSearchFailure(new CircuitBreakingException("to much memory", 110, 100, CircuitBreaker.Durability.TRANSIENT)) } - ), - settings - ); + assertNoFailure(handler, e, contextListener, settings, true); + assertNoFailure(handler, e, contextListener, settings, true); + if (unattended) { + assertNoFailure(handler, e, contextListener, settings, true); + } else { + // fail after max retry attempts reached + assertFailure(handler, e, contextListener, settings, true); + } + } - // CBE isn't a failure, but it only affects page size(which we don't test here) - assertFalse(contextListener.getFailed()); - assertEquals(0, contextListener.getFailureCountChangedCounter()); + private void assertRetryFailureCountNotIncremented(Exception e, boolean unattended) { + String transformId = randomAlphaOfLength(10); + SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).setUnattended(unattended).build(); - assertNoFailure( - handler, - new SearchPhaseExecutionException( - "query", - "Partial shards failure", - new ShardSearchFailure[] { - new ShardSearchFailure( - new ScriptException( - "runtime error", - new ArithmeticException("/ by zero"), - singletonList("stack"), - "test", - "painless" - ) - ) } - ), - contextListener, - settings - ); - assertNoFailure( - handler, - new ElasticsearchStatusException("something really bad happened", RestStatus.INTERNAL_SERVER_ERROR), - contextListener, - settings - ); - assertNoFailure(handler, new IllegalArgumentException("expected apples not oranges"), contextListener, settings); - assertNoFailure(handler, new RuntimeException("the s*** hit the fan"), contextListener, settings); - assertNoFailure(handler, new NullPointerException("NPE"), contextListener, settings); + MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor(); + MockTransformContextListener contextListener = new MockTransformContextListener(); + TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener); + context.setPageSize(500); + + TransformFailureHandler handler = new TransformFailureHandler(auditor, context, transformId); + + assertNoFailure(handler, e, contextListener, settings, false); + assertNoFailure(handler, e, contextListener, settings, false); + assertNoFailure(handler, e, contextListener, settings, false); } - public void testClusterBlock() { + private void assertFailure(Exception e) { String transformId = randomAlphaOfLength(10); SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).build(); @@ -129,32 +224,50 @@ public void testClusterBlock() { TransformFailureHandler handler = new TransformFailureHandler(auditor, context, transformId); - final ClusterBlockException clusterBlock = new ClusterBlockException( - Map.of("test-index", Set.of(MetadataIndexStateService.INDEX_CLOSED_BLOCK)) - ); + assertFailure(handler, e, contextListener, settings, false); + } - handler.handleIndexerFailure(clusterBlock, settings); - assertFalse(contextListener.getFailed()); - assertEquals(1, contextListener.getFailureCountChangedCounter()); + private void assertNoFailure( + TransformFailureHandler handler, + Exception e, + MockTransformContextListener mockTransformContextListener, + SettingsConfig settings, + boolean failureCountIncremented + ) { + handler.handleIndexerFailure(e, settings); + assertFalse(mockTransformContextListener.getFailed()); + assertEquals(failureCountIncremented ? 1 : 0, mockTransformContextListener.getFailureCountChangedCounter()); + mockTransformContextListener.reset(); + } - handler.handleIndexerFailure(clusterBlock, settings); - assertFalse(contextListener.getFailed()); - assertEquals(2, contextListener.getFailureCountChangedCounter()); + private void assertNoFailureAndContextPageSizeSet(Exception e, boolean unattended, int newPageSize) { + String transformId = randomAlphaOfLength(10); + SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).setUnattended(unattended).build(); - handler.handleIndexerFailure(clusterBlock, settings); - assertTrue(contextListener.getFailed()); - assertEquals(3, contextListener.getFailureCountChangedCounter()); + MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor(); + MockTransformContextListener contextListener = new MockTransformContextListener(); + TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener); + context.setPageSize(500); + + TransformFailureHandler handler = new TransformFailureHandler(auditor, context, transformId); + + handler.handleIndexerFailure(e, settings); + assertFalse(contextListener.getFailed()); + assertEquals(0, contextListener.getFailureCountChangedCounter()); + assertEquals(newPageSize, context.getPageSize()); + contextListener.reset(); } - private void assertNoFailure( + private void assertFailure( TransformFailureHandler handler, Exception e, MockTransformContextListener mockTransformContextListener, - SettingsConfig settings + SettingsConfig settings, + boolean failureCountChanged ) { handler.handleIndexerFailure(e, settings); - assertFalse(mockTransformContextListener.getFailed()); - assertEquals(1, mockTransformContextListener.getFailureCountChangedCounter()); + assertTrue(mockTransformContextListener.getFailed()); + assertEquals(failureCountChanged ? 1 : 0, mockTransformContextListener.getFailureCountChangedCounter()); mockTransformContextListener.reset(); }