Handle rejections in IncrementalBulkIT (elastic#113599)
Submitting a bare runnable to a threadpool risks an exception being
thrown if the queue is full. This commit moves to submitting
`AbstractRunnable` instances that won't be rejected.

Closes elastic#113365
DaveCTurner authored Sep 26, 2024
1 parent 1e2c19f commit f412de7
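
For context, a minimal sketch (not part of the commit) of the two submission styles the message contrasts, assuming a `threadPool` obtained as in the test below and the usual `AbstractRunnable`/`fail` test helpers:

// Old style: a bare Runnable. If the WRITE queue is already full, execute()
// throws EsRejectedExecutionException back at the submitting thread.
threadPool.executor(ThreadPool.Names.WRITE).execute(() -> {});

// New style: an AbstractRunnable that declares force execution, so it is
// queued even when the queue is at capacity instead of being rejected, and
// any failure in doRun() is routed to onFailure rather than thrown at the caller.
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
    @Override
    public void onFailure(Exception e) {
        fail(e);
    }

    @Override
    protected void doRun() {
        // the task's work goes here
    }

    @Override
    public boolean isForceExecution() {
        return true;
    }
});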
Showing 2 changed files with 60 additions and 34 deletions.
3 changes: 0 additions & 3 deletions muted-tests.yml
@@ -257,9 +257,6 @@ tests:
- class: org.elasticsearch.smoketest.DocsClientYamlTestSuiteIT
method: test {yaml=reference/ccr/apis/follow/post-resume-follow/line_84}
issue: https://github.com/elastic/elasticsearch/issues/113343
- class: org.elasticsearch.action.bulk.IncrementalBulkIT
method: testBulkLevelBulkFailureAfterFirstIncrementalRequest
issue: https://github.com/elastic/elasticsearch/issues/113365
- class: org.elasticsearch.xpack.ml.integration.MlJobIT
method: testDeleteJob_TimingStatsDocumentIsDeleted
issue: https://github.com/elastic/elasticsearch/issues/113370
91 changes: 60 additions & 31 deletions IncrementalBulkIT.java
@@ -15,6 +15,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Releasable;
@@ -37,6 +38,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -214,14 +216,8 @@ public void testGlobalBulkFailure() throws InterruptedException {
IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, randomNodeName);
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, randomNodeName);

int threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
long queueSize = threadPool.info(ThreadPool.Names.WRITE).getQueueSize().singles();
blockWritePool(threadCount, threadPool, blockingLatch);

Runnable runnable = () -> {};
for (int i = 0; i < queueSize; i++) {
threadPool.executor(ThreadPool.Names.WRITE).execute(runnable);
}
blockWritePool(threadPool, blockingLatch);
fillWriteQueue(threadPool);

IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
if (randomBoolean()) {
@@ -253,35 +249,32 @@ public void testBulkLevelBulkFailureAfterFirstIncrementalRequest() throws Exception {
AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();

int threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
long queueSize = threadPool.info(ThreadPool.Names.WRITE).getQueueSize().singles();

CountDownLatch blockingLatch1 = new CountDownLatch(1);

AtomicBoolean nextRequested = new AtomicBoolean(true);
AtomicLong hits = new AtomicLong(0);
try (Releasable ignored2 = blockingLatch1::countDown;) {
blockWritePool(threadCount, threadPool, blockingLatch1);
try {
blockWritePool(threadPool, blockingLatch1);
while (nextRequested.get()) {
nextRequested.set(false);
refCounted.incRef();
handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextRequested.set(true));
hits.incrementAndGet();
}
} finally {
blockingLatch1.countDown();
}
assertBusy(() -> assertTrue(nextRequested.get()));

CountDownLatch blockingLatch2 = new CountDownLatch(1);

try (Releasable ignored3 = blockingLatch2::countDown;) {
blockWritePool(threadCount, threadPool, blockingLatch2);
Runnable runnable = () -> {};
// Fill Queue
for (int i = 0; i < queueSize; i++) {
threadPool.executor(ThreadPool.Names.WRITE).execute(runnable);
}
try {
blockWritePool(threadPool, blockingLatch2);
fillWriteQueue(threadPool);

handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
} finally {
blockingLatch2.countDown();
}

// Should not throw because some succeeded
@@ -459,19 +452,55 @@ public void testShortCircuitShardLevelFailureWithIngestNodeHop() throws Exception {
}
}

private static void blockWritePool(int threadCount, ThreadPool threadPool, CountDownLatch blockingLatch) throws InterruptedException {
CountDownLatch startedLatch = new CountDownLatch(threadCount);
private static void blockWritePool(ThreadPool threadPool, CountDownLatch finishLatch) {
final var threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
final var startBarrier = new CyclicBarrier(threadCount + 1);
final var blockingTask = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
fail(e);
}

@Override
protected void doRun() {
safeAwait(startBarrier);
safeAwait(finishLatch);
}

@Override
public boolean isForceExecution() {
return true;
}
};
for (int i = 0; i < threadCount; i++) {
threadPool.executor(ThreadPool.Names.WRITE).execute(() -> {
startedLatch.countDown();
try {
blockingLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
threadPool.executor(ThreadPool.Names.WRITE).execute(blockingTask);
}
safeAwait(startBarrier);
}

private static void fillWriteQueue(ThreadPool threadPool) {
final var queueSize = Math.toIntExact(threadPool.info(ThreadPool.Names.WRITE).getQueueSize().singles());
final var queueFilled = new AtomicBoolean(false);
final var queueFillingTask = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
fail(e);
}

@Override
protected void doRun() {
assertTrue("thread pool not blocked", queueFilled.get());
}

@Override
public boolean isForceExecution() {
return true;
}
};
for (int i = 0; i < queueSize; i++) {
threadPool.executor(ThreadPool.Names.WRITE).execute(queueFillingTask);
}
startedLatch.await();
queueFilled.set(true);
}

private BulkResponse executeBulk(long docs, String index, IncrementalBulkService.Handler handler, ExecutorService executorService) {