Skip to content

Commit

Permalink
Use ConcurrentHashMap internal methods instead of taking lock on Link…
Browse files Browse the repository at this point in the history
…edHashSet object

Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Jan 31, 2023
1 parent 57b96f6 commit e79b69f
Showing 1 changed file with 18 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,8 @@ public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue t
IdentityHashMap::new
)
);

LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.computeIfAbsent(
firstTask.batchingKey,
k -> new LinkedHashSet<>(tasks.size())
);
// Locking on LinkedHashSet is necessary as it is being modified in concurrent manner.
synchronized (existingTasks) {
LinkedHashSet<BatchedTask> newTasks = new LinkedHashSet<>(tasks);
tasksPerBatchingKey.merge(firstTask.batchingKey, newTasks, (existingTasks, updatedTasks) -> {
for (BatchedTask existing : existingTasks) {
// check that there won't be two tasks with the same identity for the same batching key
BatchedTask duplicateTask = tasksIdentity.get(existing.getTask());
Expand All @@ -114,9 +109,9 @@ public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue t
);
}
}
existingTasks.addAll(tasks);
}

existingTasks.addAll(updatedTasks);
return existingTasks;
});
} catch (Exception e) {
taskBatcherListener.onSubmitFailure(tasks);
throw e;
Expand All @@ -142,15 +137,13 @@ private void onTimeoutInternal(List<? extends BatchedTask> tasks, TimeValue time
Object batchingKey = firstTask.batchingKey;
assert tasks.stream().allMatch(t -> t.batchingKey == batchingKey)
: "tasks submitted in a batch should share the same batching key: " + tasks;
LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.get(batchingKey);
if (existingTasks != null) {
synchronized (existingTasks) {
existingTasks.removeAll(toRemove);
tasksPerBatchingKey.computeIfPresent(batchingKey, (tasksKey, currentTasks) -> {
currentTasks.removeAll(toRemove);
if (currentTasks.isEmpty()) {
return null;
}
if (existingTasks.isEmpty()) {
tasksPerBatchingKey.remove(batchingKey);
}
}
return currentTasks;
});
taskBatcherListener.onTimeout(toRemove);
onTimeout(toRemove, timeout);
}
Expand All @@ -170,15 +163,13 @@ void runIfNotProcessed(BatchedTask updateTask) {
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
LinkedHashSet<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
if (pending != null) {
synchronized (pending) {
for (BatchedTask task : pending) {
if (task.processed.getAndSet(true) == false) {
logger.trace("will process {}", task);
toExecute.add(task);
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
} else {
logger.trace("skipping {}, already processed", task);
}
for (BatchedTask task : pending) {
if (task.processed.getAndSet(true) == false) {
logger.trace("will process {}", task);
toExecute.add(task);
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
} else {
logger.trace("skipping {}, already processed", task);
}
}
}
Expand Down

0 comments on commit e79b69f

Please sign in to comment.