From ca6d41ce2093989dad829ddfe053a1194e0d0b7a Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 19 Aug 2024 12:04:55 +0100 Subject: [PATCH] Fail `indexDocs()` on rejection (#111962) In 9dc59e29 we relaxed the `indexDocs()` test utility to retry on rejections caused by exceeding the write queue length limit, but then we massively relaxed this limit in #59559. We should not be seeing such rejections any more, so we can revert this special handling and strengthen the tests to assert that the indexing process encounters no failures at all. --- .../elasticsearch/test/ESIntegTestCase.java | 38 +------------------ 1 file changed, 2 insertions(+), 36 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index aad3dcc457241..fa686a0bc753a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -21,7 +21,6 @@ import org.apache.lucene.search.Sort; import org.apache.lucene.search.TotalHits; import org.apache.lucene.tests.util.LuceneTestCase; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -101,7 +100,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.MockBigArrays; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.common.xcontent.XContentHelper; @@ -109,7 +107,6 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.core.Tuple; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.gateway.PersistedClusterStateService; @@ -186,7 +183,6 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -212,7 +208,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; @@ -1735,7 +1730,6 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma } } Collections.shuffle(builders, random()); - final CopyOnWriteArrayList> errors = new CopyOnWriteArrayList<>(); List inFlightAsyncOperations = new ArrayList<>(); // If you are indexing just a few documents then frequently do it one at a time. If many then frequently in bulk. final String[] indicesArray = indices.toArray(new String[] {}); @@ -1744,7 +1738,7 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), true, false); for (IndexRequestBuilder indexRequestBuilder : builders) { indexRequestBuilder.execute( - new PayloadLatchedActionListener<>(indexRequestBuilder, newLatch(inFlightAsyncOperations), errors) + new LatchedActionListener(newLatch(inFlightAsyncOperations)).delegateResponse((l, e) -> fail(e)) ); postIndexAsyncActions(indicesArray, inFlightAsyncOperations, maybeFlush); } @@ -1771,19 +1765,8 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma } } for (CountDownLatch operation : inFlightAsyncOperations) { - operation.await(); - } - final List actualErrors = new ArrayList<>(); - for (Tuple tuple : errors) { - Throwable t = ExceptionsHelper.unwrapCause(tuple.v2()); - if (t instanceof EsRejectedExecutionException) { - logger.debug("Error indexing doc: " + t.getMessage() + ", reindexing."); - tuple.v1().get(); // re-index if rejected - } else { - actualErrors.add(tuple.v2()); - } + safeAwait(operation); } - assertThat(actualErrors, emptyIterable()); if (bogusIds.isEmpty() == false) { // delete the bogus types again - it might trigger merges or at least holes in the segments and enforces deleted docs! for (List doc : bogusIds) { @@ -1957,23 +1940,6 @@ protected void addError(Exception e) {} } - private class PayloadLatchedActionListener extends LatchedActionListener { - private final CopyOnWriteArrayList> errors; - private final T builder; - - PayloadLatchedActionListener(T builder, CountDownLatch latch, CopyOnWriteArrayList> errors) { - super(latch); - this.errors = errors; - this.builder = builder; - } - - @Override - protected void addError(Exception e) { - errors.add(new Tuple<>(builder, e)); - } - - } - /** * Clears the given scroll Ids */