Skip to content

Commit

Permalink
Update the signature of the submitStateUpdateTask across the codeba…
Browse files Browse the repository at this point in the history
…se. (elastic#82942)

This change updates submitStateUpdateTask signature to enforce task implements
ClusterStateTaskListener and remove extra listener argument. This reduces the
number of arguments and simplifies the call sites as all tasks are listeners already.
  • Loading branch information
idegtiarenko authored Jan 25, 2022
1 parent f416b67 commit 65d27ad
Show file tree
Hide file tree
Showing 15 changed files with 51 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ protected void masterOperation(
}
}, finalListener::onFailure);
CreateIndexTask clusterTask = new CreateIndexTask(request, listener, indexNameRef);
clusterService.submitStateUpdateTask("auto create [" + request.index() + "]", clusterTask, clusterTask, executor, clusterTask);
clusterService.submitStateUpdateTask("auto create [" + request.index() + "]", clusterTask, clusterTask, executor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ protected void masterOperation(
String source = "rollover_index source [" + trialRolloverIndexName + "] to target [" + trialRolloverIndexName + "]";
RolloverTask rolloverTask = new RolloverTask(rolloverRequest, statsResponse, trialRolloverResponse, listener);
ClusterStateTaskConfig config = ClusterStateTaskConfig.build(Priority.NORMAL, rolloverRequest.masterNodeTimeout());
clusterService.submitStateUpdateTask(source, rolloverTask, config, rolloverTaskExecutor, rolloverTask);
clusterService.submitStateUpdateTask(source, rolloverTask, config, rolloverTaskExecutor);
} else {
// conditions not met
listener.onResponse(trialRolloverResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ private Builder<T> result(T task, TaskResult executionResult) {
public ClusterTasksResult<T> build(ClusterState resultingState) {
return new ClusterTasksResult<>(resultingState, executionResults);
}

ClusterTasksResult<T> build(ClusterTasksResult<T> result, ClusterState previousState) {
return new ClusterTasksResult<>(result.resultingState == null ? previousState : result.resultingState, executionResults);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ public ClusterTasksResult<LocalMasterServiceTask> execute(ClusterState currentSt
LocalMasterServiceTask.this.execute(currentState);
return ClusterTasksResult.<LocalMasterServiceTask>builder().successes(tasks).build(currentState);
}
},
this
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,7 @@ public void messageReceived(FailedShardEntry request, TransportChannel channel,
TASK_SOURCE,
update,
ClusterStateTaskConfig.build(Priority.HIGH),
shardFailedClusterStateTaskExecutor,
update
shardFailedClusterStateTaskExecutor
);
}
}
Expand Down Expand Up @@ -607,8 +606,7 @@ public void messageReceived(StartedShardEntry request, TransportChannel channel,
"shard-started " + request,
update,
ClusterStateTaskConfig.build(Priority.URGENT),
shardStartedClusterStateTaskExecutor,
update
shardStartedClusterStateTaskExecutor
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,7 @@ private void removeNode(DiscoveryNode discoveryNode, String reason) {
"node-left",
task,
ClusterStateTaskConfig.build(Priority.IMMEDIATE),
nodeRemovalExecutor,
task
nodeRemovalExecutor
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,12 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -403,7 +401,7 @@ public void handleJoinRequest(DiscoveryNode sender, ActionListener<Void> joinLis
joinListener
);
assert joinTaskExecutor != null;
masterService.submitStateUpdateTask("node-join", task, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor, task);
masterService.submitStateUpdateTask("node-join", task, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
}

@Override
Expand Down Expand Up @@ -456,18 +454,17 @@ public void close(Mode newMode) {
assert closed == false : "CandidateJoinAccumulator closed";
closed = true;
if (newMode == Mode.LEADER) {
final Map<JoinTaskExecutor.Task, ClusterStateTaskListener> pendingAsTasks = new LinkedHashMap<>();
final Consumer<JoinTaskExecutor.Task> pendingTaskAdder = task -> pendingAsTasks.put(task, task);
final List<JoinTaskExecutor.Task> pendingAsTasks = new ArrayList<>();
joinRequestAccumulator.forEach(
(node, listener) -> pendingTaskAdder.accept(
(node, listener) -> pendingAsTasks.add(
new JoinTaskExecutor.Task(node, joinReasonService.getJoinReason(node, Mode.CANDIDATE), listener)
)
);

final String stateUpdateSource = "elected-as-master ([" + pendingAsTasks.size() + "] nodes joined)";

pendingTaskAdder.accept(JoinTaskExecutor.newBecomeMasterTask());
pendingTaskAdder.accept(JoinTaskExecutor.newFinishElectionTask());
pendingAsTasks.add(JoinTaskExecutor.newBecomeMasterTask());
pendingAsTasks.add(JoinTaskExecutor.newFinishElectionTask());
joinTaskExecutor = joinTaskExecutorGenerator.get();
masterService.submitStateUpdateTasks(
stateUpdateSource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,7 @@ public void putMapping(final PutMappingClusterStateUpdateRequest request, final
"put-mapping " + Strings.arrayToCommaDelimitedString(request.indices()),
task,
ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()),
putMappingExecutor,
task
putMappingExecutor
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collections;
import java.util.List;

public class ClusterService extends AbstractLifecycleComponent {
private final MasterService masterService;
Expand Down Expand Up @@ -223,8 +223,7 @@ public final String getNodeName() {
}

/**
* Submits a cluster state update task; unlike {@link #submitStateUpdateTask(String, Object, ClusterStateTaskConfig,
* ClusterStateTaskExecutor, ClusterStateTaskListener)}.
* Submits a cluster state update task
* @param source the source of the cluster state update task
* @param updateTask the full context for the cluster state update
* @param executor the executor to use for the submitted task.
Expand All @@ -234,7 +233,7 @@ public <T extends ClusterStateTaskConfig & ClusterStateTaskListener> void submit
T updateTask,
ClusterStateTaskExecutor<T> executor
) {
submitStateUpdateTask(source, updateTask, updateTask, executor, updateTask);
submitStateUpdateTask(source, updateTask, updateTask, executor);
}

/**
Expand All @@ -246,24 +245,21 @@ public <T extends ClusterStateTaskConfig & ClusterStateTaskListener> void submit
* tasks will all be executed on the executor in a single batch
*
* @param source the source of the cluster state update task
* @param task the state needed for the cluster state update task
* @param task the state and the callback needed for the cluster state update task
* @param config the cluster state update task configuration
* @param executor the cluster state update task executor; tasks
* that share the same executor will be executed
* batches on this executor
* @param listener callback after the cluster state update task
* completes
* @param <T> the type of the cluster state update task state
*
*/
public <T> void submitStateUpdateTask(
public <T extends ClusterStateTaskListener> void submitStateUpdateTask(
String source,
T task,
ClusterStateTaskConfig config,
ClusterStateTaskExecutor<T> executor,
ClusterStateTaskListener listener
ClusterStateTaskExecutor<T> executor
) {
masterService.submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
masterService.submitStateUpdateTasks(source, List.of(task), config, executor);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collections;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -426,9 +426,8 @@ public Builder incrementVersion(ClusterState clusterState) {
}

/**
* Submits a cluster state update task; unlike {@link #submitStateUpdateTask(String, Object, ClusterStateTaskConfig,
* ClusterStateTaskExecutor, ClusterStateTaskListener)}, submitted updates will not be batched.
* @param source the source of the cluster state update task
* Submits a cluster state update task
* @param source the source of the cluster state update task
* @param updateTask the full context for the cluster state update
* @param executor
*
Expand All @@ -438,7 +437,7 @@ public <T extends ClusterStateTaskConfig & ClusterStateTaskListener> void submit
T updateTask,
ClusterStateTaskExecutor<T> executor
) {
submitStateUpdateTask(source, updateTask, updateTask, executor, updateTask);
submitStateUpdateTask(source, updateTask, updateTask, executor);
}

/**
Expand All @@ -450,24 +449,21 @@ public <T extends ClusterStateTaskConfig & ClusterStateTaskListener> void submit
* tasks will all be executed on the executor in a single batch
*
* @param source the source of the cluster state update task
* @param task the state needed for the cluster state update task
* @param task the state and the callback needed for the cluster state update task
* @param config the cluster state update task configuration
* @param executor the cluster state update task executor; tasks
* that share the same executor will be executed
* batches on this executor
* @param listener callback after the cluster state update task
* completes
* @param <T> the type of the cluster state update task state
*
*/
public <T> void submitStateUpdateTask(
public <T extends ClusterStateTaskListener> void submitStateUpdateTask(
String source,
T task,
ClusterStateTaskConfig config,
ClusterStateTaskExecutor<T> executor,
ClusterStateTaskListener listener
ClusterStateTaskExecutor<T> executor
) {
submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
submitStateUpdateTasks(source, List.of(task), config, executor);
}

/**
Expand Down Expand Up @@ -896,17 +892,17 @@ void onNoLongerMaster() {
* potentially with more tasks of the same executor.
*
* @param source the source of the cluster state update task
* @param tasks a map of update tasks and their corresponding listeners
* @param tasks a collection of update tasks and their corresponding listeners
* @param config the cluster state update task configuration
* @param executor the cluster state update task executor; tasks
* that share the same executor will be executed
* batches on this executor
* @param <T> the type of the cluster state update task state
*
*/
public <T> void submitStateUpdateTasks(
public <T extends ClusterStateTaskListener> void submitStateUpdateTasks(
final String source,
final Map<T, ClusterStateTaskListener> tasks,
final Collection<T> tasks,
final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor
) {
Expand All @@ -918,10 +914,9 @@ public <T> void submitStateUpdateTasks(
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.markAsSystemContext();

List<Batcher.UpdateTask> safeTasks = tasks.entrySet()
.stream()
.map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), supplier), executor))
.collect(Collectors.toList());
List<Batcher.UpdateTask> safeTasks = tasks.stream()
.map(e -> taskBatcher.new UpdateTask(config.priority(), source, e, safe(e, supplier), executor))
.toList();
taskBatcher.submitTasks(safeTasks, config.timeout());
} catch (EsRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3418,8 +3418,7 @@ private void innerUpdateSnapshotState(
"update snapshot state",
update,
ClusterStateTaskConfig.build(Priority.NORMAL),
SHARD_STATE_EXECUTOR,
update
SHARD_STATE_EXECUTOR
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.toMap;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -275,20 +273,22 @@ public void onFailure(Exception e) {}
"testClusterStateTaskListenerThrowingExceptionIsOkay",
update,
ClusterStateTaskConfig.build(Priority.NORMAL),
new ClusterStateTaskExecutor<Object>() {
new ClusterStateTaskExecutor<>() {
@Override
public ClusterTasksResult<Object> execute(ClusterState currentState, List<Object> tasks) {
public ClusterTasksResult<ClusterStateTaskListener> execute(
ClusterState currentState,
List<ClusterStateTaskListener> tasks
) {
ClusterState newClusterState = ClusterState.builder(currentState).build();
return ClusterTasksResult.builder().successes(tasks).build(newClusterState);
return ClusterTasksResult.<ClusterStateTaskListener>builder().successes(tasks).build(newClusterState);
}

@Override
public void clusterStatePublished(ClusterStatePublicationEvent clusterStatePublicationEvent) {
published.set(true);
latch.countDown();
}
},
update
}
);

latch.await();
Expand Down Expand Up @@ -602,18 +602,16 @@ public void clusterStatePublished(ClusterStatePublicationEvent clusterPublicatio
var executor = assignment.v1();
submittedTasks.addAndGet(tasks.size());
if (tasks.size() == 1) {
var update = tasks.iterator().next();
masterService.submitStateUpdateTask(
threadName,
update,
tasks.iterator().next(),
ClusterStateTaskConfig.build(randomFrom(Priority.values())),
executor,
update
executor
);
} else {
masterService.submitStateUpdateTasks(
threadName,
tasks.stream().collect(toMap(Function.<Task>identity(), Function.<ClusterStateTaskListener>identity())),
tasks,
ClusterStateTaskConfig.build(randomFrom(Priority.values())),
executor
);
Expand Down Expand Up @@ -685,8 +683,7 @@ public void onFailure(Exception e) {}
(currentState, tasks) -> {
ClusterState newClusterState = ClusterState.builder(currentState).build();
return ClusterTasksResult.<ClusterStateTaskListener>builder().successes(tasks).build(newClusterState);
},
update
}
);

latch.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ private void submitUnlessAlreadyQueued(String source, IndexLifecycleClusterState
busyIndices.remove(dedupKey);
assert removed : "tried to unregister unknown task [" + task + "]";
}));
clusterService.submitStateUpdateTask(source, task, ILM_TASK_CONFIG, ILM_TASK_EXECUTOR, task);
clusterService.submitStateUpdateTask(source, task, ILM_TASK_CONFIG, ILM_TASK_EXECUTOR);
} else {
logger.trace("skipped redundant execution of [{}]", source);
}
Expand Down
Loading

0 comments on commit 65d27ad

Please sign in to comment.