diff --git a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java index 66cb1fac6bdf..a0117fb5692b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java @@ -65,12 +65,14 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; public class MasterServiceTests extends ESTestCase { @@ -463,6 +465,118 @@ public void onFailure(Exception e) { } } + public void testMultipleSubmissionBatching() throws Exception { + + class Task implements ClusterStateTaskListener { + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + } + + final int executorCount = between(1, 5); + final var executionCountDown = new CountDownLatch(executorCount); + + class Executor implements ClusterStateTaskExecutor { + + final AtomicBoolean executed = new AtomicBoolean(); + + int expectedTaskCount; + + public void addExpectedTaskCount(int taskCount) { + expectedTaskCount += taskCount; + } + + @Override + public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { + assertTrue("Should execute all tasks at once", executed.compareAndSet(false, true)); + assertThat("Should execute all tasks at once", tasks.size(), equalTo(expectedTaskCount)); + executionCountDown.countDown(); + return ClusterTasksResult.builder().successes(tasks).build(currentState); + } + } + + final var executors = new Executor[executorCount]; + for (int i = 0; i < executors.length; i++) { + executors[i] = new Executor(); + } + + try (var masterService = createMasterService(true)) { + + final var executionBarrier = new CyclicBarrier(2); + + masterService.submitStateUpdateTask( + "block", + new Task(), + ClusterStateTaskConfig.build(Priority.NORMAL), + (currentState, tasks) -> { + executionBarrier.await(10, TimeUnit.SECONDS); // notify test thread that the master service is blocked + executionBarrier.await(10, TimeUnit.SECONDS); // wait for test thread to release us + return ClusterTasksResult.builder().successes(tasks).build(currentState); + } + ); + + executionBarrier.await(10, TimeUnit.SECONDS); // wait for the master service to be blocked + + final var submissionLatch = new CountDownLatch(1); + + final var submitThreads = new Thread[between(1, 10)]; + for (int i = 0; i < submitThreads.length; i++) { + final var executor = randomFrom(executors); + final var tasks = randomList(1, 10, Task::new); + executor.addExpectedTaskCount(tasks.size()); + submitThreads[i] = new Thread(() -> { + try { + assertTrue(submissionLatch.await(10, TimeUnit.SECONDS)); + masterService.submitStateUpdateTasks( + Thread.currentThread().getName(), + tasks, + ClusterStateTaskConfig.build(randomFrom(Priority.values())), + executor + ); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + + }, "submit-thread-" + i); + } + + for (var executor : executors) { + if (executor.expectedTaskCount == 0) { + executionCountDown.countDown(); + } + } + + for (var submitThread : submitThreads) { + submitThread.start(); + } + + submissionLatch.countDown(); + + for (var submitThread : submitThreads) { + submitThread.join(); + } + + for (var executor : executors) { + assertFalse(executor.executed.get()); + } + + assertThat(masterService.numberOfPendingTasks(), equalTo(submitThreads.length + 1)); + final var sources = masterService.pendingTasks().stream().map(t -> t.getSource().string()).collect(Collectors.toSet()); + assertThat(sources, hasSize(submitThreads.length + 1)); + assertTrue(sources.contains("block")); + for (int i = 0; i < submitThreads.length; i++) { + assertTrue("submit-thread-" + i, sources.contains("submit-thread-" + i)); + } + + executionBarrier.await(10, TimeUnit.SECONDS); // release block on master service + assertTrue(executionCountDown.await(10, TimeUnit.SECONDS)); + for (var executor : executors) { + assertTrue(executor.executed.get() != (executor.expectedTaskCount == 0)); + } + } + } + public void testClusterStateBatchedUpdates() throws BrokenBarrierException, InterruptedException { AtomicInteger executedTasks = new AtomicInteger();