From f412de7e8ec93631b224e7d350907852e4433121 Mon Sep 17 00:00:00 2001
From: David Turner
Date: Thu, 26 Sep 2024 17:32:17 +0100
Subject: [PATCH] Handle rejections in `IncrementalBulkIT` (#113599)

Submitting a bare runnable to a threadpool risks an exception being
thrown if the queue is full. This commit moves to submitting
`AbstractRunnable` instances that won't be rejected.

Closes #113365
---
 muted-tests.yml                         |  3 -
 .../action/bulk/IncrementalBulkIT.java  | 91 ++++++++++++-------
 2 files changed, 60 insertions(+), 34 deletions(-)

diff --git a/muted-tests.yml b/muted-tests.yml
index 5d7474af06d86..7a21957a044c0 100644
--- a/muted-tests.yml
+++ b/muted-tests.yml
@@ -257,9 +257,6 @@ tests:
 - class: org.elasticsearch.smoketest.DocsClientYamlTestSuiteIT
   method: test {yaml=reference/ccr/apis/follow/post-resume-follow/line_84}
   issue: https://github.com/elastic/elasticsearch/issues/113343
-- class: org.elasticsearch.action.bulk.IncrementalBulkIT
-  method: testBulkLevelBulkFailureAfterFirstIncrementalRequest
-  issue: https://github.com/elastic/elasticsearch/issues/113365
 - class: org.elasticsearch.xpack.ml.integration.MlJobIT
   method: testDeleteJob_TimingStatsDocumentIsDeleted
   issue: https://github.com/elastic/elasticsearch/issues/113370
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java
index d7a5d4e2ac973..75f914f76dd77 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java
@@ -15,6 +15,7 @@
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.AbstractRefCounted;
 import org.elasticsearch.core.Releasable;
@@ -37,6 +38,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -214,14 +216,8 @@ public void testGlobalBulkFailure() throws InterruptedException {
         IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, randomNodeName);
         ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, randomNodeName);
 
-        int threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
-        long queueSize = threadPool.info(ThreadPool.Names.WRITE).getQueueSize().singles();
-        blockWritePool(threadCount, threadPool, blockingLatch);
-
-        Runnable runnable = () -> {};
-        for (int i = 0; i < queueSize; i++) {
-            threadPool.executor(ThreadPool.Names.WRITE).execute(runnable);
-        }
+        blockWritePool(threadPool, blockingLatch);
+        fillWriteQueue(threadPool);
 
         IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
         if (randomBoolean()) {
@@ -253,35 +249,32 @@ public void testBulkLevelBulkFailureAfterFirstIncrementalRequest() throws Except
         AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
         PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
 
-        int threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
-        long queueSize = threadPool.info(ThreadPool.Names.WRITE).getQueueSize().singles();
-
         CountDownLatch blockingLatch1 = new CountDownLatch(1);
 
         AtomicBoolean nextRequested = new AtomicBoolean(true);
         AtomicLong hits = new AtomicLong(0);
-        try (Releasable ignored2 = blockingLatch1::countDown;) {
-            blockWritePool(threadCount, threadPool, blockingLatch1);
+        try {
+            blockWritePool(threadPool, blockingLatch1);
             while (nextRequested.get()) {
                 nextRequested.set(false);
                 refCounted.incRef();
                 handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextRequested.set(true));
                 hits.incrementAndGet();
             }
+        } finally {
+            blockingLatch1.countDown();
         }
         assertBusy(() -> assertTrue(nextRequested.get()));
 
         CountDownLatch blockingLatch2 = new CountDownLatch(1);
 
-        try (Releasable ignored3 = blockingLatch2::countDown;) {
-            blockWritePool(threadCount, threadPool, blockingLatch2);
-            Runnable runnable = () -> {};
-            // Fill Queue
-            for (int i = 0; i < queueSize; i++) {
-                threadPool.executor(ThreadPool.Names.WRITE).execute(runnable);
-            }
+        try {
+            blockWritePool(threadPool, blockingLatch2);
+            fillWriteQueue(threadPool);
 
             handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
+        } finally {
+            blockingLatch2.countDown();
         }
 
         // Should not throw because some succeeded
@@ -459,19 +452,55 @@ public void testShortCircuitShardLevelFailureWithIngestNodeHop() throws Exceptio
         }
     }
 
-    private static void blockWritePool(int threadCount, ThreadPool threadPool, CountDownLatch blockingLatch) throws InterruptedException {
-        CountDownLatch startedLatch = new CountDownLatch(threadCount);
+    private static void blockWritePool(ThreadPool threadPool, CountDownLatch finishLatch) {
+        final var threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
+        final var startBarrier = new CyclicBarrier(threadCount + 1);
+        final var blockingTask = new AbstractRunnable() {
+            @Override
+            public void onFailure(Exception e) {
+                fail(e);
+            }
+
+            @Override
+            protected void doRun() {
+                safeAwait(startBarrier);
+                safeAwait(finishLatch);
+            }
+
+            @Override
+            public boolean isForceExecution() {
+                return true;
+            }
+        };
         for (int i = 0; i < threadCount; i++) {
-            threadPool.executor(ThreadPool.Names.WRITE).execute(() -> {
-                startedLatch.countDown();
-                try {
-                    blockingLatch.await();
-                } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
-            });
+            threadPool.executor(ThreadPool.Names.WRITE).execute(blockingTask);
+        }
+        safeAwait(startBarrier);
+    }
+
+    private static void fillWriteQueue(ThreadPool threadPool) {
+        final var queueSize = Math.toIntExact(threadPool.info(ThreadPool.Names.WRITE).getQueueSize().singles());
+        final var queueFilled = new AtomicBoolean(false);
+        final var queueFillingTask = new AbstractRunnable() {
+            @Override
+            public void onFailure(Exception e) {
+                fail(e);
+            }
+
+            @Override
+            protected void doRun() {
+                assertTrue("thread pool not blocked", queueFilled.get());
+            }
+
+            @Override
+            public boolean isForceExecution() {
+                return true;
+            }
+        };
+        for (int i = 0; i < queueSize; i++) {
+            threadPool.executor(ThreadPool.Names.WRITE).execute(queueFillingTask);
         }
-        startedLatch.await();
+        queueFilled.set(true);
     }
 
     private BulkResponse executeBulk(long docs, String index, IncrementalBulkService.Handler handler, ExecutorService executorService) {
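
Note: the pattern the patch relies on is that an `AbstractRunnable` can opt into force execution, so a full WRITE queue accepts it instead of throwing `EsRejectedExecutionException`, and any exception is delivered to `onFailure` rather than escaping the executor. A minimal sketch of that pattern follows; it is not part of the patch, uses only the `ThreadPool` and `AbstractRunnable` APIs visible in the diff above, and the class name and comments are illustrative.

    import org.elasticsearch.common.util.concurrent.AbstractRunnable;
    import org.elasticsearch.threadpool.ThreadPool;

    class ForceExecutionSketch {
        static void submit(ThreadPool threadPool) {
            // A bare Runnable may be rejected with EsRejectedExecutionException once
            // the WRITE queue is full:
            //     threadPool.executor(ThreadPool.Names.WRITE).execute(() -> {});
            //
            // An AbstractRunnable that forces execution is accepted even when the
            // queue is full, and failures are routed to onFailure.
            threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
                @Override
                public void onFailure(Exception e) {
                    // handle or record the failure
                }

                @Override
                protected void doRun() {
                    // task body
                }

                @Override
                public boolean isForceExecution() {
                    return true;
                }
            });
        }
    }

The tests use this so that the pool-blocking and queue-filling tasks cannot themselves be rejected while the WRITE pool is deliberately saturated, which is what the muted test was tripping over.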