From e8dde1e217e9ab0ae302e963de6f6ec576e3dd3b Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Wed, 11 Jan 2023 12:30:09 +0530 Subject: [PATCH 1/5] Use ConcurrentHashMap for batching tasks per executor in TaskBatcher (#5099) Signed-off-by: Aman Khare --- .../cluster/service/TaskBatcher.java | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java b/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java index b5710bab41172..463e59e877884 100644 --- a/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java +++ b/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java @@ -46,6 +46,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; @@ -61,7 +62,7 @@ public abstract class TaskBatcher { private final Logger logger; private final PrioritizedOpenSearchThreadPoolExecutor threadExecutor; // package visible for tests - final Map> tasksPerBatchingKey = new HashMap<>(); + final Map> tasksPerBatchingKey = new ConcurrentHashMap<>(); private final TaskBatcherListener taskBatcherListener; public TaskBatcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor, TaskBatcherListener taskBatcherListener) { @@ -94,11 +95,12 @@ public void submitTasks(List tasks, @Nullable TimeValue t ) ); - synchronized (tasksPerBatchingKey) { - LinkedHashSet existingTasks = tasksPerBatchingKey.computeIfAbsent( - firstTask.batchingKey, - k -> new LinkedHashSet<>(tasks.size()) - ); + LinkedHashSet existingTasks = tasksPerBatchingKey.computeIfAbsent( + firstTask.batchingKey, + k -> new LinkedHashSet<>(tasks.size()) + ); + // Locking on LinkedHashSet is necessary as it is being modified in concurrent manner. + synchronized (existingTasks) { for (BatchedTask existing : existingTasks) { // check that there won't be two tasks with the same identity for the same batching key BatchedTask duplicateTask = tasksIdentity.get(existing.getTask()); @@ -114,6 +116,7 @@ public void submitTasks(List tasks, @Nullable TimeValue t } existingTasks.addAll(tasks); } + } catch (Exception e) { taskBatcherListener.onSubmitFailure(tasks); throw e; @@ -139,13 +142,13 @@ private void onTimeoutInternal(List tasks, TimeValue time Object batchingKey = firstTask.batchingKey; assert tasks.stream().allMatch(t -> t.batchingKey == batchingKey) : "tasks submitted in a batch should share the same batching key: " + tasks; - synchronized (tasksPerBatchingKey) { - LinkedHashSet existingTasks = tasksPerBatchingKey.get(batchingKey); - if (existingTasks != null) { + LinkedHashSet existingTasks = tasksPerBatchingKey.get(batchingKey); + if (existingTasks != null) { + synchronized (existingTasks) { existingTasks.removeAll(toRemove); - if (existingTasks.isEmpty()) { - tasksPerBatchingKey.remove(batchingKey); - } + } + if (existingTasks.isEmpty()) { + tasksPerBatchingKey.remove(batchingKey); } } taskBatcherListener.onTimeout(toRemove); @@ -165,9 +168,9 @@ void runIfNotProcessed(BatchedTask updateTask) { if (updateTask.processed.get() == false) { final List toExecute = new ArrayList<>(); final Map> processTasksBySource = new HashMap<>(); - synchronized (tasksPerBatchingKey) { - LinkedHashSet pending = tasksPerBatchingKey.remove(updateTask.batchingKey); - if (pending != null) { + LinkedHashSet pending = tasksPerBatchingKey.remove(updateTask.batchingKey); + if (pending != null) { + synchronized (pending) { for (BatchedTask task : pending) { if (task.processed.getAndSet(true) == false) { logger.trace("will process {}", task); From 91d31822c726634a4f924c3b6ff3621b1f101a2b Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Wed, 11 Jan 2023 13:04:20 +0530 Subject: [PATCH 2/5] Update changelog Signed-off-by: Aman Khare --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 37f9fc821a5c0..273f6e0d8518d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -69,6 +69,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Changed http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773)) - Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283)) - Pre conditions check before updating weighted routing metadata ([#4955](https://github.com/opensearch-project/OpenSearch/pull/4955)) +- Use ConcurrentHashMap for batching tasks per executor in TaskBatcher ([#5827](https://github.com/opensearch-project/OpenSearch/pull/5827)) ### Deprecated From fa73fda90821f2cb1896eb5da073aa2dc5b3a732 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Tue, 31 Jan 2023 15:59:11 +0530 Subject: [PATCH 3/5] Use ConcurrentHashMap internal methods instead of taking lock on LinkedHashSet object Signed-off-by: Aman Khare --- .../cluster/service/TaskBatcher.java | 43 ++++++++----------- 1 file changed, 17 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java b/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java index 463e59e877884..686169f81e837 100644 --- a/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java +++ b/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java @@ -94,13 +94,8 @@ public void submitTasks(List tasks, @Nullable TimeValue t IdentityHashMap::new ) ); - - LinkedHashSet existingTasks = tasksPerBatchingKey.computeIfAbsent( - firstTask.batchingKey, - k -> new LinkedHashSet<>(tasks.size()) - ); - // Locking on LinkedHashSet is necessary as it is being modified in concurrent manner. - synchronized (existingTasks) { + LinkedHashSet newTasks = new LinkedHashSet<>(tasks); + tasksPerBatchingKey.merge(firstTask.batchingKey, newTasks, (existingTasks, updatedTasks) -> { for (BatchedTask existing : existingTasks) { // check that there won't be two tasks with the same identity for the same batching key BatchedTask duplicateTask = tasksIdentity.get(existing.getTask()); @@ -114,9 +109,9 @@ public void submitTasks(List tasks, @Nullable TimeValue t ); } } - existingTasks.addAll(tasks); - } - + existingTasks.addAll(updatedTasks); + return existingTasks; + }); } catch (Exception e) { taskBatcherListener.onSubmitFailure(tasks); throw e; @@ -142,15 +137,13 @@ private void onTimeoutInternal(List tasks, TimeValue time Object batchingKey = firstTask.batchingKey; assert tasks.stream().allMatch(t -> t.batchingKey == batchingKey) : "tasks submitted in a batch should share the same batching key: " + tasks; - LinkedHashSet existingTasks = tasksPerBatchingKey.get(batchingKey); - if (existingTasks != null) { - synchronized (existingTasks) { - existingTasks.removeAll(toRemove); - } + tasksPerBatchingKey.computeIfPresent(batchingKey, (tasksKey, existingTasks) -> { + existingTasks.removeAll(toRemove); if (existingTasks.isEmpty()) { - tasksPerBatchingKey.remove(batchingKey); + return null; } - } + return existingTasks; + }); taskBatcherListener.onTimeout(toRemove); onTimeout(toRemove, timeout); } @@ -170,15 +163,13 @@ void runIfNotProcessed(BatchedTask updateTask) { final Map> processTasksBySource = new HashMap<>(); LinkedHashSet pending = tasksPerBatchingKey.remove(updateTask.batchingKey); if (pending != null) { - synchronized (pending) { - for (BatchedTask task : pending) { - if (task.processed.getAndSet(true) == false) { - logger.trace("will process {}", task); - toExecute.add(task); - processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task); - } else { - logger.trace("skipping {}, already processed", task); - } + for (BatchedTask task : pending) { + if (task.processed.getAndSet(true) == false) { + logger.trace("will process {}", task); + toExecute.add(task); + processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task); + } else { + logger.trace("skipping {}, already processed", task); } } } From d771fd4c68c647d7ff2aab8deb92cbb1b81ea030 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Fri, 3 Feb 2023 16:20:46 +0530 Subject: [PATCH 4/5] Add UT for submitting tasks per batchingKey in parallel Signed-off-by: Aman Khare --- .../cluster/service/TaskBatcherTests.java | 81 +++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/server/src/test/java/org/opensearch/cluster/service/TaskBatcherTests.java b/server/src/test/java/org/opensearch/cluster/service/TaskBatcherTests.java index 31018d4cef029..b59b70ca60ef8 100644 --- a/server/src/test/java/org/opensearch/cluster/service/TaskBatcherTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/TaskBatcherTests.java @@ -279,6 +279,87 @@ public void processed(String source) { } } + public void testNoTasksAreDroppedInParallelSubmission() throws BrokenBarrierException, InterruptedException { + int numberOfThreads = randomIntBetween(2, 8); + TaskExecutor[] executors = new TaskExecutor[numberOfThreads]; + for (int i = 0; i < numberOfThreads; i++) { + executors[i] = new TaskExecutor(); + } + + int tasksSubmittedPerThread = randomIntBetween(2, 1024); + + CopyOnWriteArrayList> failures = new CopyOnWriteArrayList<>(); + CountDownLatch updateLatch = new CountDownLatch(numberOfThreads * tasksSubmittedPerThread); + + final TestListener listener = new TestListener() { + @Override + public void onFailure(String source, Exception e) { + logger.error(() -> new ParameterizedMessage("unexpected failure: [{}]", source), e); + failures.add(new Tuple<>(source, e)); + updateLatch.countDown(); + } + + @Override + public void processed(String source) { + updateLatch.countDown(); + } + }; + + CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); + + for (int i = 0; i < numberOfThreads; i++) { + final int index = i; + Thread thread = new Thread(() -> { + try { + barrier.await(); + CyclicBarrier tasksBarrier = new CyclicBarrier(1 + tasksSubmittedPerThread); + for (int j = 0; j < tasksSubmittedPerThread; j++) { + int taskNumber = j; + Thread taskThread = new Thread(() -> { + try { + tasksBarrier.await(); + submitTask( + "[" + index + "][" + taskNumber + "]", + taskNumber, + ClusterStateTaskConfig.build(randomFrom(Priority.values())), + executors[index], + listener + ); + tasksBarrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new AssertionError(e); + } + }); + // submit tasks per batchingKey in parallel + taskThread.start(); + } + // wait for all task threads to be ready + tasksBarrier.await(); + // wait for all task threads to finish + tasksBarrier.await(); + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new AssertionError(e); + } + }); + thread.start(); + } + + // wait for all executor threads to be ready + barrier.await(); + // wait for all executor threads to finish + barrier.await(); + + updateLatch.await(); + + assertThat(failures, empty()); + + for (int i = 0; i < numberOfThreads; i++) { + // assert that total executed tasks is same for every executor as we initiated + assertEquals(tasksSubmittedPerThread, executors[i].tasks.size()); + } + } + public void testSingleBatchSubmission() throws InterruptedException { Map tasks = new HashMap<>(); final int numOfTasks = randomInt(10); From af0aeb58f784e3c495968cb86afd51e0509ebd7d Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Mon, 6 Mar 2023 15:23:16 +0530 Subject: [PATCH 5/5] Remove change log entry for performance improvement Signed-off-by: Aman Khare --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bde6bb4de747a..a09e2ac686613 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,7 +46,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Changed http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773)) - Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283)) - Require MediaType in Strings.toString API ([#6009](https://github.com/opensearch-project/OpenSearch/pull/6009)) -- Use ConcurrentHashMap for batching tasks per executor in TaskBatcher ([#5827](https://github.com/opensearch-project/OpenSearch/pull/5827)) ### Deprecated