Skip to content

Commit

Permalink
[controller] Change task list member variable back to local variable …
Browse files Browse the repository at this point in the history
…in AdminConsumptionTask (linkedin#1302)

The current implementation is error prone for multi-thread
access to function "executeMessagesAndCollectResults",
unless we add a synchronized keyword to "executeMessagesAndCollectResults".
  • Loading branch information
huangminchn authored Nov 14, 2024
1 parent a9e775b commit 06bf58c
Showing 1 changed file with 4 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,6 @@ public ExecutorService getExecutorService() {
*/
private final String regionName;

/**
* List of tasks to be executed by the worker threads.
*/
private List<Callable<Void>> tasks;

public AdminConsumptionTask(
String clusterName,
PubSubConsumerAdapter consumer,
Expand Down Expand Up @@ -497,7 +492,7 @@ private void unSubscribe() {
private void executeMessagesAndCollectResults() throws InterruptedException {
lastSucceededExecutionIdMap =
new ConcurrentHashMap<>(executionIdAccessor.getLastSucceededExecutionIdMap(clusterName));
this.tasks = new ArrayList<>();
List<Callable<Void>> tasks = new ArrayList<>();
List<String> stores = new ArrayList<>();
// Create a task for each store that has admin messages pending to be processed.
boolean skipOffsetCommandHasBeenProcessed = false;
Expand Down Expand Up @@ -528,7 +523,7 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
"Adding admin message from store {} with offset {} to the task list",
entry.getKey(),
adminMessageOffset);
this.tasks.add(newTask);
tasks.add(newTask);
stores.add(entry.getKey());
}
}
Expand All @@ -538,13 +533,13 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
}

if (isRunning.get()) {
if (!this.tasks.isEmpty()) {
if (!tasks.isEmpty()) {
int pendingAdminMessagesCount = 0;
int storesWithPendingAdminMessagesCount = 0;
long adminExecutionTasksInvokeTime = System.currentTimeMillis();
// Wait for the worker threads to finish processing the internal admin topics.
List<Future<Void>> results =
executorService.invokeAll(this.tasks, processingCycleTimeoutInMs, TimeUnit.MILLISECONDS);
executorService.invokeAll(tasks, processingCycleTimeoutInMs, TimeUnit.MILLISECONDS);
stats.recordAdminConsumptionCycleDurationMs(System.currentTimeMillis() - adminExecutionTasksInvokeTime);
Map<String, Long> newLastSucceededExecutionIdMap =
executionIdAccessor.getLastSucceededExecutionIdMap(clusterName);
Expand Down

0 comments on commit 06bf58c

Please sign in to comment.