Fail indexDocs() on rejection (elastic#111962)
In 9dc59e2 we relaxed the `indexDocs()` test utility to retry on
rejections caused by exceeding the write queue length limit, but then we
massively relaxed this limit in elastic#59559. We should not be seeing such
rejections any more, so we can revert this special handling and
strengthen the tests to assert that the indexing process encounters no
failures at all.
DaveCTurner authored Aug 19, 2024
1 parent d2e6670 commit ca6d41c
Showing 1 changed file with 2 additions and 36 deletions.
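To make the change concrete, here is a minimal, self-contained sketch in plain Java of the fail-fast behaviour the commit description calls for: the listener releases its latch on success, and on any failure it fails the test immediately instead of collecting rejections for a later retry pass. The class name, the Consumer-based fail hook, and the method shapes are illustrative assumptions, not the actual ESIntegTestCase types.

import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

// Hypothetical stand-in for the test listener: on success it only releases the
// latch; on failure it fails the test outright instead of queueing the error
// for a retry-on-rejection pass.
final class FailFastLatchedListener<R> {
    private final CountDownLatch latch;
    private final Consumer<Exception> failTest; // e.g. the test framework's fail(...) hook

    FailFastLatchedListener(CountDownLatch latch, Consumer<Exception> failTest) {
        this.latch = latch;
        this.failTest = failTest;
    }

    public void onResponse(R response) {
        latch.countDown(); // indexing succeeded; just mark the operation complete
    }

    public void onFailure(Exception e) {
        try {
            // Previously rejections were logged and the document re-indexed;
            // here any failure, rejections included, fails the test.
            failTest.accept(e);
        } finally {
            latch.countDown();
        }
    }
}

Wired up this way, a rejected index request surfaces as a test failure at the point it happens, which is the stronger guarantee the removed retry handling used to mask.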
@@ -21,7 +21,6 @@
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.TotalHits;
 import org.apache.lucene.tests.util.LuceneTestCase;
-import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
@@ -101,15 +100,13 @@
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.common.util.MockBigArrays;
-import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.ChunkedToXContent;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.core.Tuple;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.TestEnvironment;
 import org.elasticsearch.gateway.PersistedClusterStateService;
@@ -186,7 +183,6 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
@@ -212,7 +208,6 @@
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
 import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
@@ -1735,7 +1730,6 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma
             }
         }
         Collections.shuffle(builders, random());
-        final CopyOnWriteArrayList<Tuple<IndexRequestBuilder, Exception>> errors = new CopyOnWriteArrayList<>();
         List<CountDownLatch> inFlightAsyncOperations = new ArrayList<>();
         // If you are indexing just a few documents then frequently do it one at a time. If many then frequently in bulk.
         final String[] indicesArray = indices.toArray(new String[] {});
@@ -1744,7 +1738,7 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma
             logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), true, false);
             for (IndexRequestBuilder indexRequestBuilder : builders) {
                 indexRequestBuilder.execute(
-                    new PayloadLatchedActionListener<>(indexRequestBuilder, newLatch(inFlightAsyncOperations), errors)
+                    new LatchedActionListener<DocWriteResponse>(newLatch(inFlightAsyncOperations)).delegateResponse((l, e) -> fail(e))
                 );
                 postIndexAsyncActions(indicesArray, inFlightAsyncOperations, maybeFlush);
             }
@@ -1771,19 +1765,8 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma
             }
         }
         for (CountDownLatch operation : inFlightAsyncOperations) {
-            operation.await();
-        }
-        final List<Exception> actualErrors = new ArrayList<>();
-        for (Tuple<IndexRequestBuilder, Exception> tuple : errors) {
-            Throwable t = ExceptionsHelper.unwrapCause(tuple.v2());
-            if (t instanceof EsRejectedExecutionException) {
-                logger.debug("Error indexing doc: " + t.getMessage() + ", reindexing.");
-                tuple.v1().get(); // re-index if rejected
-            } else {
-                actualErrors.add(tuple.v2());
-            }
+            safeAwait(operation);
         }
-        assertThat(actualErrors, emptyIterable());
         if (bogusIds.isEmpty() == false) {
             // delete the bogus types again - it might trigger merges or at least holes in the segments and enforces deleted docs!
             for (List<String> doc : bogusIds) {
@@ -1957,23 +1940,6 @@ protected void addError(Exception e) {}

     }
 
-    private class PayloadLatchedActionListener<Response, T> extends LatchedActionListener<Response> {
-        private final CopyOnWriteArrayList<Tuple<T, Exception>> errors;
-        private final T builder;
-
-        PayloadLatchedActionListener(T builder, CountDownLatch latch, CopyOnWriteArrayList<Tuple<T, Exception>> errors) {
-            super(latch);
-            this.errors = errors;
-            this.builder = builder;
-        }
-
-        @Override
-        protected void addError(Exception e) {
-            errors.add(new Tuple<>(builder, e));
-        }
-
-    }
-
     /**
      * Clears the given scroll Ids
      */
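For context on the operation.await() → safeAwait(operation) substitution in the hunk at line 1771 above, the sketch below shows the general shape of such a "safe await" helper: it waits on an in-flight operation's latch and converts interruption or a timeout into an immediate assertion failure rather than a checked exception. The helper name, class, and timeout are assumptions for illustration, not the real ESTestCase.safeAwait implementation.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative helper in the spirit of the safeAwait(...) call adopted above.
final class LatchUtils {
    static void awaitOrFail(CountDownLatch latch, long timeoutSeconds) {
        try {
            // Bound the wait so a stuck async operation fails the test rather than hanging it.
            if (latch.await(timeoutSeconds, TimeUnit.SECONDS) == false) {
                throw new AssertionError("timed out waiting for in-flight indexing operation");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("interrupted while waiting for in-flight indexing operation", e);
        }
    }
}

With a bound like this, a hung async indexing operation fails the suite loudly instead of blocking it indefinitely, and callers need no checked-exception handling.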
