Use ConcurrentHashMap for batching tasks per executor in TaskBatcher … #5827
Conversation
Flaky test issue link: SimpleBlocksIT - #2472
The reviewed change replaces the synchronized block over the whole map:

synchronized (tasksPerBatchingKey) {
    LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.get(batchingKey);
    if (existingTasks != null) {

with a plain lookup:

LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.get(batchingKey);
I'm concerned about a potential race with the following scenario:
Thread 1: executes line 98 and adds an empty LinkedHashSet to the map
Thread 2: executes lines 145-152, which will find an empty LinkedHashSet and remove it from the map
Thread 1: executes lines 103-118, which adds tasks to the LinkedHashSet but the set has been removed from the map
Is that scenario possible?
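For illustration, here is a minimal standalone sketch of that interleaving; the class, method names, and String task type are hypothetical simplifications, not the actual TaskBatcher code:

import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

class RaceSketch {
    private final ConcurrentHashMap<Object, LinkedHashSet<String>> tasksPerBatchingKey = new ConcurrentHashMap<>();

    void submit(Object batchingKey, List<String> tasks) {
        // "line 98" analogue: may publish a brand-new empty set into the map
        LinkedHashSet<String> existingTasks = tasksPerBatchingKey.computeIfAbsent(batchingKey, k -> new LinkedHashSet<>());
        // "lines 103-118" analogue: another thread can remove the empty set
        // from the map right here, so these additions land in a set that is
        // no longer reachable from the map
        synchronized (existingTasks) {
            existingTasks.addAll(tasks);
        }
    }

    void onTimeout(Object batchingKey) {
        // "lines 145-152" analogue: finds the still-empty set and removes it
        LinkedHashSet<String> existingTasks = tasksPerBatchingKey.get(batchingKey);
        if (existingTasks != null) {
            synchronized (existingTasks) {
                if (existingTasks.isEmpty()) {
                    tasksPerBatchingKey.remove(batchingKey);
                }
            }
        }
    }
}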
+1, the race can possibly occur. The synchronized block on the existingTasks object is not sufficient to prevent operations on that key in the main tasksPerBatchingKey map.
@amkhar it is better to use compute, or preferably merge (as it also provides the capability to supply a default value), on tasksPerBatchingKey, and perform the duplicate check, task addition, etc. (lines 104-117) in the remappingFunction argument, which will prevent any update to the same key from any other code path executing in a different thread. The other code paths, such as removing the key, should also use merge.
@andrross @shwetathareja
Sorry for the delayed response.
Yes, a race condition is possible, though a little different from the one Andrew explained.
Threads 1 and 2 are both working on the same batchingKey; note that the timeout method can only be called once a thread has already executed line 126 (only after adding the tasks).
Thread 1: executes up to line 126, so it has already added a list as the value for batchingKey.
Thread 2: executes line 98 and gets a value from the map containing a list of one task.
Thread 1's async timeout callback: executes lines 147-151 and removes the entry from the map.
Thread 2: reaches line 103 and waits.
Thread 2: executes lines 104-117 and adds another entry to the LinkedHashSet, but the set has already been removed by the earlier callback.
So yes, a fix is needed for this condition. I've found a way to handle it: it requires adding one line after line 117,
tasksPerBatchingKey.putIfAbsent(firstTask.batchingKey, existingTasks);
to make sure that if the key-value pair was removed, we put it back while adding new tasks to the queue. We also need a check at line 116 to remove already-processed tasks, like below, because the runIfNotProcessed method can also update and process some tasks in another thread.
if (existing.processed.get()) {
    tasksIterator.remove();
}
Something like this:

synchronized (existingTasks) {
    Iterator<BatchedTask> tasksIterator = existingTasks.iterator();
    while (tasksIterator.hasNext()) {
        BatchedTask existing = tasksIterator.next();
        // check that there won't be two tasks with the same identity for the same batching key
        BatchedTask duplicateTask = tasksIdentity.get(existing.getTask());
        if (duplicateTask != null) {
            throw new IllegalStateException(
                "task ["
                    + duplicateTask.describeTasks(Collections.singletonList(existing))
                    + "] with source ["
                    + duplicateTask.source
                    + "] is already queued"
            );
        }
        if (existing.processed.get()) {
            tasksIterator.remove();
        }
    }
    existingTasks.addAll(tasks);
    tasksPerBatchingKey.putIfAbsent(firstTask.batchingKey, existingTasks);
}
But thanks Shweta for suggesting merge; let me try that as well. It looks similar to what we're trying to achieve.
Thanks @amkhar. I have one concern with the above proposed fix. The synchronized block is on existingTasks, and putIfAbsent will add the key back again (in case it was removed during the timeout code execution from line 151). But let's say another thread 3 comes along while there is no entry in the map: it would take a lock on a different existingTasks object altogether. So now two threads are executing the code in the synchronized block from line 103.
@andrross @shwetathareja Agreed, that was another case. I realized that taking separate locks on the LinkedHashSet is problematic. If we just rely on the CHM bucket-level lock, by using its library methods, the code will be clean and correct.
This is how the code will look using merge or computeIfPresent (showing only the relevant code blocks).
submitTasks: create a new list or update the existing one, then update the map.
LinkedHashSet<BatchedTask> newTasks = new LinkedHashSet<>(tasks);
tasksPerBatchingKey.merge(firstTask.batchingKey, newTasks, (existingTasks, updatedTasks) -> {
    for (BatchedTask existing : existingTasks) {
        // check that there won't be two tasks with the same identity for the same batching key
        BatchedTask duplicateTask = tasksIdentity.get(existing.getTask());
        if (duplicateTask != null) {
            throw new IllegalStateException(
                "task ["
                    + duplicateTask.describeTasks(Collections.singletonList(existing))
                    + "] with source ["
                    + duplicateTask.source
                    + "] is already queued"
            );
        }
    }
    existingTasks.addAll(updatedTasks);
    return existingTasks;
});
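One note on why this works: ConcurrentHashMap documents that the remapping function passed to merge is invoked atomically for the key, so the duplicate check and addAll cannot interleave with a concurrent remove or computeIfPresent on the same batchingKey. That per-key atomicity is exactly the guarantee the earlier synchronized(existingTasks) approach could not provide.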
And runIfNotProcessed: here no lock is needed for iteration, because no other flow will access the same object after it has been removed from the map.
LinkedHashSet<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
if (pending != null) {
    for (BatchedTask task : pending) {
        if (task.processed.getAndSet(true) == false) {
            logger.trace("will process {}", task);
            toExecute.add(task);
            processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
        } else {
            logger.trace("skipping {}, already processed", task);
        }
    }
}
onTimeoutInterval: re-compute the value once timed-out tasks are removed, and remove the entry if nothing is left in the LinkedHashSet (returning null from the remapping function removes the mapping).
tasksPerBatchingKey.computeIfPresent(batchingKey, (tasksKey, currentTasks) -> {
    currentTasks.removeAll(toRemove);
    if (currentTasks.isEmpty()) {
        return null;
    }
    return currentTasks;
});
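Taken together, all three paths (submitTasks, runIfNotProcessed, onTimeoutInterval) now mutate tasksPerBatchingKey only through atomic per-key operations (merge, remove, computeIfPresent), so a LinkedHashSet is never modified outside the bucket lock while it is still reachable from the map.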
Changes LGTM. @amkhar can you please ensure we have a test that submits tasks from multiple threads; else, please add one.
Hey @shwetathareja, there are no tests which invoke parallel threads on a single batchingKey. This validation is already happening in the existing test. Feel free to suggest other assertions which I may be missing :)
@amkhar right, we need to write a test to ensure none of the tasks are lost due to any race condition in the code when tasks are submitted in parallel.
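For illustration, a rough sketch of the shape such a test could take (this models the map directly with hypothetical types rather than the real TaskBatcher API; names and counts are assumptions):

import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelSubmitSketch {
    public static void main(String[] args) throws InterruptedException {
        final int threads = 8;
        final int tasksPerThread = 1000;
        ConcurrentHashMap<String, LinkedHashSet<Integer>> tasksPerBatchingKey = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            final int offset = t * tasksPerThread;
            pool.submit(() -> {
                for (int i = 0; i < tasksPerThread; i++) {
                    LinkedHashSet<Integer> single = new LinkedHashSet<>(Set.of(offset + i));
                    // merge performs the per-key update atomically, so no task is lost
                    tasksPerBatchingKey.merge("batchingKey", single, (existing, incoming) -> {
                        existing.addAll(incoming);
                        return existing;
                    });
                }
                done.countDown();
            });
        }
        done.await();
        pool.shutdown();
        // all submitted tasks must be present: none lost to a race
        System.out.println("size = " + tasksPerBatchingKey.get("batchingKey").size()
            + " (expected " + threads * tasksPerThread + ")");
    }
}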
Please rebase the latest from main, which should fix up the unrelated test failures.
CHANGELOG.md
@@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Changed http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773))
- Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283))
- Require MediaType in Strings.toString API ([#6009](https://github.com/opensearch-project/OpenSearch/pull/6009))
- Use ConcurrentHashMap for batching tasks per executor in TaskBatcher ([#5827](https://github.com/opensearch-project/OpenSearch/pull/5827))
I think we can probably omit this from the changelog because it is a targeted performance improvement that doesn't change any user-facing behavior. What do you think? (We haven't been super consistent about this, by the way.)
If this does stay in the changelog, it should be in the [Unreleased 2.x] section, because this will be backported and released in the next 2.x release.
I think if we are keeping the changelog strictly to user-facing behavior, then I agree this should be removed.
Opened an issue for the flaky test failure: #6565
Backported via #6566: Use ConcurrentHashMap for batching tasks per executor in TaskBatcher (#5099) (cherry picked from commit 30e4e5e).
Description
Master task execution is batched by batchingKey. The batches were stored in a HashMap, and every time a task was enqueued we took a lock on the whole code block where the map is used. This change is an optimization: use a ConcurrentHashMap to improve the overall enqueue time, which helps when the master is overloaded with pending tasks. We use a synchronized block on LinkedHashSet operations, as that structure is not naturally concurrent, and we remove the synchronized block on the overall map.
Update: we now use ConcurrentHashMap's own methods (merge and computeIfPresent) instead of taking a lock on the LinkedHashSet, as more race conditions arise if the CHM bucket is not locked while operating on the LinkedHashSet. This is a much cleaner and more correct implementation.
While execution time stays the same for this unit test, enqueue time has improved by more than 43%.
Final comparison based on load run
After changing to CHM and using a synchronized block on all LHS operations, we see tasks being enqueued and executed faster than before.
Load setup
Ran OpenSearch/server/src/test/java/org/opensearch/cluster/service/TaskBatcherTests.java (line 214 at commit 082f059).
In the second run, switched executors to 2000 and tasks to 5000.
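A minimal sketch of the kind of enqueue timing measurement described; the executor and task counts come from the second run above, but everything else (class name, key/task types, using the map directly instead of the real test harness) is a hypothetical stand-in:

import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class EnqueueTimingSketch {
    public static void main(String[] args) {
        final int executors = 2000; // batching keys, as in the second run
        final int tasks = 5000;     // tasks per key (this interpretation is an assumption)
        ConcurrentHashMap<Integer, LinkedHashSet<Integer>> tasksPerBatchingKey = new ConcurrentHashMap<>();
        long start = System.nanoTime();
        for (int key = 0; key < executors; key++) {
            for (int i = 0; i < tasks; i++) {
                final int task = i;
                // enqueue path under test: atomic per-key merge into the batch set
                tasksPerBatchingKey.merge(key, new LinkedHashSet<>(Set.of(task)), (existing, incoming) -> {
                    existing.addAll(incoming);
                    return existing;
                });
            }
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("enqueued " + (long) executors * tasks + " tasks in " + elapsedMs + " ms");
    }
}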
Issues Resolved
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.