From 0b71746d96acde1397a11a99ea466f1078124eb3 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Thu, 9 May 2024 14:19:40 -0400 Subject: [PATCH] [Transform] Retry Destination IndexNotFoundException (#108394) A Destination Index can be removed from its previous shard in the middle of a Transform run. Ideally, this happens as part of the Delete API, and the Transform has already been stopped, but in the case that it isn't, we want to retry the checkpoint. If the Transform had been stopped, the retry will move the Indexer into a graceful shutdown. If the Transform had not been stopped, the retry will check if the Index exists or recreate the Index if it does not exist. This is currently how unattended Transforms work, and this change will make it so regular Transforms can also auto-recover from this error. Fix #107263 --- docs/changelog/108394.yaml | 6 + .../transforms/ClientTransformIndexer.java | 6 +- .../transforms/TransformContext.java | 9 + .../transforms/TransformIndexer.java | 33 ++- .../utils/ExceptionRootCauseFinder.java | 12 +- .../TransformIndexerFailureHandlingTests.java | 204 +++++++++++++++++- .../utils/ExceptionRootCauseFinderTests.java | 129 +++-------- 7 files changed, 278 insertions(+), 121 deletions(-) create mode 100644 docs/changelog/108394.yaml diff --git a/docs/changelog/108394.yaml b/docs/changelog/108394.yaml new file mode 100644 index 0000000000000..58f48fa548c6e --- /dev/null +++ b/docs/changelog/108394.yaml @@ -0,0 +1,6 @@ +pr: 108394 +summary: Handle `IndexNotFoundException` +area: Transform +type: bug +issues: + - 107263 diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index ed0f721f5f7f0..df8c3f62034e5 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -193,7 +193,11 @@ protected void handleBulkResponse(BulkResponse bulkResponse, ActionListener listener) { }, listener::onFailure); var deducedDestIndexMappings = new SetOnce>(); - var shouldMaybeCreateDestIndexForUnattended = context.getCheckpoint() == 0 - && TransformEffectiveSettings.isUnattended(transformConfig.getSettings()); + + // if the unattended transform had not created the destination index yet, or if the destination index was deleted for any + // type of transform during the last run, then we try to create the destination index. + // This is important to create the destination index explicitly before indexing documents. Otherwise, the destination + // index aliases may be missing. + var shouldMaybeCreateDestIndex = isFirstUnattendedRun() || context.shouldRecreateDestinationIndex(); ActionListener> fieldMappingsListener = ActionListener.wrap(destIndexMappings -> { if (destIndexMappings.isEmpty() == false) { @@ -359,11 +363,12 @@ protected void onStart(long now, ActionListener listener) { // ... otherwise we fall back to index mappings deduced based on source indices this.fieldMappings = deducedDestIndexMappings.get(); } - // Since the unattended transform could not have created the destination index yet, we do it here. - // This is important to create the destination index explicitly before indexing first documents. Otherwise, the destination - // index aliases may be missing. - if (destIndexMappings.isEmpty() && shouldMaybeCreateDestIndexForUnattended) { - doMaybeCreateDestIndex(deducedDestIndexMappings.get(), configurationReadyListener); + + if (destIndexMappings.isEmpty() && shouldMaybeCreateDestIndex) { + doMaybeCreateDestIndex(deducedDestIndexMappings.get(), configurationReadyListener.delegateFailure((delegate, response) -> { + context.setShouldRecreateDestinationIndex(false); + delegate.onResponse(response); + })); } else { configurationReadyListener.onResponse(null); } @@ -380,7 +385,7 @@ protected void onStart(long now, ActionListener listener) { deducedDestIndexMappings.set(validationResponse.getDestIndexMappings()); if (isContinuous()) { transformsConfigManager.getTransformConfiguration(getJobId(), ActionListener.wrap(config -> { - if (transformConfig.equals(config) && fieldMappings != null && shouldMaybeCreateDestIndexForUnattended == false) { + if (transformConfig.equals(config) && fieldMappings != null && shouldMaybeCreateDestIndex == false) { logger.trace("[{}] transform config has not changed.", getJobId()); configurationReadyListener.onResponse(null); } else { @@ -415,7 +420,7 @@ protected void onStart(long now, ActionListener listener) { }, listener::onFailure); Instant instantOfTrigger = Instant.ofEpochMilli(now); - // If we are not on the initial batch checkpoint and its the first pass of whatever continuous checkpoint we are on, + // If we are not on the initial batch checkpoint and it's the first pass of whatever continuous checkpoint we are on, // we should verify if there are local changes based on the sync config. If not, do not proceed further and exit. if (context.getCheckpoint() > 0 && initialRun()) { checkpointProvider.sourceHasChanged(getLastCheckpoint(), ActionListener.wrap(hasChanged -> { @@ -436,8 +441,7 @@ protected void onStart(long now, ActionListener listener) { hasSourceChanged = true; listener.onFailure(failure); })); - } else if (context.getCheckpoint() == 0 && TransformEffectiveSettings.isUnattended(transformConfig.getSettings())) { - // this transform runs in unattended mode and has never run, to go on + } else if (shouldMaybeCreateDestIndex) { validate(changedSourceListener); } else { hasSourceChanged = true; @@ -447,6 +451,13 @@ protected void onStart(long now, ActionListener listener) { } } + /** + * Returns true if this transform runs in unattended mode and has never run. + */ + private boolean isFirstUnattendedRun() { + return context.getCheckpoint() == 0 && TransformEffectiveSettings.isUnattended(transformConfig.getSettings()); + } + protected void initializeFunction() { // create the function function = FunctionFactory.create(getConfig()); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java index 8618b01a0440b..8bf859a020ba4 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java @@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.tasks.TaskCancelledException; @@ -63,7 +64,7 @@ public static Throwable getFirstIrrecoverableExceptionFromBulkResponses(Collecti } if (unwrappedThrowable instanceof ElasticsearchException elasticsearchException) { - if (isExceptionIrrecoverable(elasticsearchException)) { + if (isExceptionIrrecoverable(elasticsearchException) && isNotIndexNotFoundException(elasticsearchException)) { return elasticsearchException; } } @@ -72,6 +73,15 @@ public static Throwable getFirstIrrecoverableExceptionFromBulkResponses(Collecti return null; } + /** + * We can safely recover from IndexNotFoundExceptions on Bulk responses. + * If the transform is running, the next checkpoint will recreate the index. + * If the transform is not running, the next start request will recreate the index. + */ + private static boolean isNotIndexNotFoundException(ElasticsearchException elasticsearchException) { + return elasticsearchException instanceof IndexNotFoundException == false; + } + public static boolean isExceptionIrrecoverable(ElasticsearchException elasticsearchException) { if (IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index fe54847af0404..f39a4329f2bb1 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -10,10 +10,13 @@ import org.apache.lucene.search.TotalHits; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -27,6 +30,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.script.ScriptException; @@ -75,6 +79,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Collections.singletonList; import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig; @@ -85,6 +90,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.matchesRegex; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -101,6 +107,10 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase { private Client client; private ThreadPool threadPool; + private static final Function EMPTY_BULK_RESPONSE = bulkRequest -> new BulkResponse( + new BulkItemResponse[0], + 100 + ); static class MockedTransformIndexer extends ClientTransformIndexer { @@ -110,6 +120,7 @@ static class MockedTransformIndexer extends ClientTransformIndexer { // used for synchronizing with the test private CountDownLatch latch; + private int doProcessCount; MockedTransformIndexer( ThreadPool threadPool, @@ -127,7 +138,8 @@ static class MockedTransformIndexer extends ClientTransformIndexer { TransformContext context, Function searchFunction, Function bulkFunction, - Function deleteByQueryFunction + Function deleteByQueryFunction, + int doProcessCount ) { super( threadPool, @@ -157,6 +169,7 @@ static class MockedTransformIndexer extends ClientTransformIndexer { this.searchFunction = searchFunction; this.bulkFunction = bulkFunction; this.deleteByQueryFunction = deleteByQueryFunction; + this.doProcessCount = doProcessCount; } public void initialize() { @@ -278,6 +291,17 @@ void doGetFieldMappings(ActionListener> fieldMappingsListene protected void persistState(TransformState state, ActionListener listener) { listener.onResponse(null); } + + @Override + protected IterationResult doProcess(SearchResponse searchResponse) { + if (doProcessCount > 0) { + doProcessCount -= 1; + // pretend that we processed 10k documents for each call + getStats().incrementNumDocuments(10_000); + return new IterationResult<>(Stream.of(new IndexRequest()), new TransformIndexerPosition(null, null), false); + } + return super.doProcess(searchResponse); + } } @Before @@ -936,6 +960,152 @@ public void testHandleFailureAuditing() { auditor.assertAllExpectationsMatched(); } + /** + * Given no bulk upload errors + * When we run the indexer + * Then we should not fail or recreate the destination index + */ + public void testHandleBulkResponseWithNoFailures() throws Exception { + var indexer = runIndexer(createMockIndexer(returnHit(), EMPTY_BULK_RESPONSE)); + assertThat(indexer.getStats().getIndexFailures(), is(0L)); + assertFalse(indexer.context.shouldRecreateDestinationIndex()); + assertNull(indexer.context.getLastFailure()); + } + + private static TransformIndexer runIndexer(MockedTransformIndexer indexer) throws Exception { + var latch = indexer.newLatch(1); + indexer.start(); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + latch.countDown(); + assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS); + return indexer; + } + + private MockedTransformIndexer createMockIndexer( + Function searchFunction, + Function bulkFunction + ) { + return createMockIndexer(searchFunction, bulkFunction, mock(TransformContext.Listener.class)); + } + + private static Function returnHit() { + return request -> new SearchResponse( + new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f), + // Simulate completely null aggs + null, + new Suggest(Collections.emptyList()), + false, + false, + new SearchProfileResults(Collections.emptyMap()), + 1, + "", + 1, + 1, + 0, + 0, + ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY + ); + } + + /** + * Given an irrecoverable bulk upload error + * When we run the indexer + * Then we should fail without retries and not recreate the destination index + */ + public void testHandleBulkResponseWithIrrecoverableFailures() throws Exception { + var failCalled = new AtomicBoolean(); + var indexer = runIndexer( + createMockIndexer( + returnHit(), + bulkResponseWithError(new ResourceNotFoundException("resource not found error")), + createContextListener(failCalled, new AtomicReference<>()) + ) + ); + assertThat(indexer.getStats().getIndexFailures(), is(1L)); + assertFalse(indexer.context.shouldRecreateDestinationIndex()); + assertTrue(failCalled.get()); + } + + private MockedTransformIndexer createMockIndexer( + Function searchFunction, + Function bulkFunction, + TransformContext.Listener listener + ) { + return createMockIndexer( + new TransformConfig( + randomAlphaOfLength(10), + randomSourceConfig(), + randomDestConfig(), + null, + null, + null, + randomPivotConfig(), + null, + randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), + new SettingsConfig.Builder().setMaxPageSearchSize(randomBoolean() ? null : randomIntBetween(500, 10_000)).build(), + null, + null, + null, + null + ), + new AtomicReference<>(IndexerState.STOPPED), + searchFunction, + bulkFunction, + null, + threadPool, + ThreadPool.Names.GENERIC, + mock(TransformAuditor.class), + new TransformContext(TransformTaskState.STARTED, "", 0, listener), + 1 + ); + } + + private static Function bulkResponseWithError(Exception e) { + return bulkRequest -> new BulkResponse( + new BulkItemResponse[] { + BulkItemResponse.failure(1, DocWriteRequest.OpType.INDEX, new BulkItemResponse.Failure("the_index", "id", e)) }, + 100 + ); + } + + /** + * Given an IndexNotFound bulk upload error + * When we run the indexer + * Then we should fail with retries and recreate the destination index + */ + public void testHandleBulkResponseWithIndexNotFound() throws Exception { + var indexer = runIndexerWithBulkResponseError(new IndexNotFoundException("Some Error")); + assertThat(indexer.getStats().getIndexFailures(), is(1L)); + assertTrue(indexer.context.shouldRecreateDestinationIndex()); + assertFalse(bulkIndexingException(indexer).isIrrecoverable()); + } + + private TransformIndexer runIndexerWithBulkResponseError(Exception e) throws Exception { + return runIndexer(createMockIndexer(returnHit(), bulkResponseWithError(e))); + } + + private static BulkIndexingException bulkIndexingException(TransformIndexer indexer) { + var lastFailure = indexer.context.getLastFailure(); + assertNotNull(lastFailure); + assertThat(lastFailure, instanceOf(BulkIndexingException.class)); + return (BulkIndexingException) lastFailure; + } + + /** + * Given a recoverable bulk upload error + * When we run the indexer + * Then we should fail with retries and not recreate the destination index + */ + public void testHandleBulkResponseWithNoIrrecoverableFailures() throws Exception { + var indexer = runIndexerWithBulkResponseError(new EsRejectedExecutionException("es rejected execution")); + assertThat(indexer.getStats().getIndexFailures(), is(1L)); + assertFalse(indexer.context.shouldRecreateDestinationIndex()); + assertFalse(bulkIndexingException(indexer).isIrrecoverable()); + } + public void testHandleFailure() { testHandleFailure(0, 5, 0, 0); testHandleFailure(5, 0, 5, 2); @@ -1042,11 +1212,36 @@ private MockedTransformIndexer createMockIndexer( String executorName, TransformAuditor auditor, TransformContext context + ) { + return createMockIndexer( + config, + state, + searchFunction, + bulkFunction, + deleteByQueryFunction, + threadPool, + executorName, + auditor, + context, + 0 + ); + } + + private MockedTransformIndexer createMockIndexer( + TransformConfig config, + AtomicReference state, + Function searchFunction, + Function bulkFunction, + Function deleteByQueryFunction, + ThreadPool threadPool, + String executorName, + TransformAuditor auditor, + TransformContext context, + int doProcessCount ) { IndexBasedTransformConfigManager transformConfigManager = mock(IndexBasedTransformConfigManager.class); doAnswer(invocationOnMock -> { - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + ActionListener listener = invocationOnMock.getArgument(1); listener.onResponse(config); return null; }).when(transformConfigManager).getTransformConfiguration(any(), any()); @@ -1066,7 +1261,8 @@ private MockedTransformIndexer createMockIndexer( context, searchFunction, bulkFunction, - deleteByQueryFunction + deleteByQueryFunction, + doProcessCount ); indexer.initialize(); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java index b71156cad5adf..9a0431d40a972 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.mapper.DocumentParsingException; import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.shard.ShardId; @@ -27,116 +28,27 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.XContentLocation; +import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; public class ExceptionRootCauseFinderTests extends ESTestCase { public void testGetFirstIrrecoverableExceptionFromBulkResponses() { - Map bulkItemResponses = new HashMap<>(); - - int id = 1; - // 1 - bulkItemResponses.put( - id, - BulkItemResponse.failure( - id++, - OpType.INDEX, - new BulkItemResponse.Failure( - "the_index", - "id", - new DocumentParsingException(XContentLocation.UNKNOWN, "document parsing error") - ) - ) - ); - // 2 - bulkItemResponses.put( - id, - BulkItemResponse.failure( - id++, - OpType.INDEX, - new BulkItemResponse.Failure("the_index", "id", new ResourceNotFoundException("resource not found error")) - ) - ); - // 3 - bulkItemResponses.put( - id, - BulkItemResponse.failure( - id++, - OpType.INDEX, - new BulkItemResponse.Failure("the_index", "id", new IllegalArgumentException("illegal argument error")) - ) - ); - // 4 not irrecoverable - bulkItemResponses.put( - id, - BulkItemResponse.failure( - id++, - OpType.INDEX, - new BulkItemResponse.Failure("the_index", "id", new EsRejectedExecutionException("es rejected execution")) - ) - ); - // 5 not irrecoverable - bulkItemResponses.put( - id, - BulkItemResponse.failure( - id++, - OpType.INDEX, - new BulkItemResponse.Failure("the_index", "id", new TranslogException(new ShardId("the_index", "uid", 0), "translog error")) - ) - ); - // 6 - bulkItemResponses.put( - id, - BulkItemResponse.failure( - id++, - OpType.INDEX, - new BulkItemResponse.Failure( - "the_index", - "id", - new ElasticsearchSecurityException("Authentication required", RestStatus.UNAUTHORIZED) - ) - ) - ); - // 7 - bulkItemResponses.put( - id, - BulkItemResponse.failure( - id++, - OpType.INDEX, - new BulkItemResponse.Failure( - "the_index", - "id", - new ElasticsearchSecurityException("current license is non-compliant for [transform]", RestStatus.FORBIDDEN) - ) - ) - ); - // 8 not irrecoverable - bulkItemResponses.put( - id, - BulkItemResponse.failure( - id++, - OpType.INDEX, - new BulkItemResponse.Failure( - "the_index", - "id", - new ElasticsearchSecurityException("overloaded, to many requests", RestStatus.TOO_MANY_REQUESTS) - ) - ) - ); - // 9 not irrecoverable - bulkItemResponses.put( - id, - BulkItemResponse.failure( - id++, - OpType.INDEX, - new BulkItemResponse.Failure( - "the_index", - "id", - new ElasticsearchSecurityException("internal error", RestStatus.INTERNAL_SERVER_ERROR) - ) - ) + Map bulkItemResponses = bulkItemResponses( + new DocumentParsingException(XContentLocation.UNKNOWN, "document parsing error"), + new ResourceNotFoundException("resource not found error"), + new IllegalArgumentException("illegal argument error"), + new EsRejectedExecutionException("es rejected execution"), + new TranslogException(new ShardId("the_index", "uid", 0), "translog error"), + new ElasticsearchSecurityException("Authentication required", RestStatus.UNAUTHORIZED), + new ElasticsearchSecurityException("current license is non-compliant for [transform]", RestStatus.FORBIDDEN), + new ElasticsearchSecurityException("overloaded, to many requests", RestStatus.TOO_MANY_REQUESTS), + new ElasticsearchSecurityException("internal error", RestStatus.INTERNAL_SERVER_ERROR), + new IndexNotFoundException("some missing index") ); assertFirstException(bulkItemResponses.values(), DocumentParsingException.class, "document parsing error"); @@ -157,6 +69,14 @@ public void testGetFirstIrrecoverableExceptionFromBulkResponses() { assertNull(ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses.values())); } + private static Map bulkItemResponses(Exception... exceptions) { + var id = new AtomicInteger(1); + return Arrays.stream(exceptions) + .map(exception -> new BulkItemResponse.Failure("the_index", "id", exception)) + .map(failure -> BulkItemResponse.failure(id.get(), OpType.INDEX, failure)) + .collect(Collectors.toMap(response -> id.getAndIncrement(), Function.identity())); + } + public void testIsIrrecoverable() { assertFalse(ExceptionRootCauseFinder.isExceptionIrrecoverable(new MapperException("mappings problem"))); assertFalse(ExceptionRootCauseFinder.isExceptionIrrecoverable(new TaskCancelledException("cancelled task"))); @@ -174,6 +94,7 @@ public void testIsIrrecoverable() { assertTrue( ExceptionRootCauseFinder.isExceptionIrrecoverable(new DocumentParsingException(new XContentLocation(1, 2), "parse error")) ); + assertTrue(ExceptionRootCauseFinder.isExceptionIrrecoverable(new IndexNotFoundException("some missing index"))); } private static void assertFirstException(Collection bulkItemResponses, Class expectedClass, String message) {