From 0431d8502c58b9a07cb986cb9011fea0d09cac7a Mon Sep 17 00:00:00 2001 From: Ievgen Degtiarenko Date: Mon, 31 Jan 2022 10:06:26 +0100 Subject: [PATCH] restrict generic parameter type in ClusterStateTaskExecutor (#83024) Currently, submitStateUpdateTask only accept tasks that implement ClusterStateTaskListener. This pr adjusts the ClusterStateTaskExecutor to have similar restriction. --- .../cluster/ClusterStateTaskExecutor.java | 8 ++-- .../cluster/service/MasterService.java | 47 +++++++++++-------- .../ClusterStateTaskExecutorTests.java | 30 +++++------- .../indices/cluster/ClusterStateChanges.java | 23 +++++---- 4 files changed, 57 insertions(+), 51 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java index 835665a4b396..b9ceef4c8e98 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java @@ -14,7 +14,7 @@ import java.util.List; import java.util.Map; -public interface ClusterStateTaskExecutor { +public interface ClusterStateTaskExecutor { /** * Update the cluster state based on the current state and the given tasks. Return the *same instance* if no state * should be changed. @@ -63,16 +63,16 @@ default String describeTasks(List tasks) { * * @param the type of the cluster state update task */ - record ClusterTasksResult ( + record ClusterTasksResult ( @Nullable ClusterState resultingState, // the resulting cluster state Map executionResults // the correspondence between tasks and their outcome ) { - public static Builder builder() { + public static Builder builder() { return new Builder<>(); } - public static class Builder { + public static class Builder { private final Map executionResults = new IdentityHashMap<>(); public Builder success(T task) { diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index f664b0514a0c..7213ed81648e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -159,9 +159,9 @@ protected void onTimeout(List tasks, TimeValue timeout) { @Override protected void run(Object batchingKey, List tasks, String tasksSummary) { - ClusterStateTaskExecutor taskExecutor = (ClusterStateTaskExecutor) batchingKey; - List updateTasks = (List) tasks; - runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary)); + runTasks( + new TaskInputs((ClusterStateTaskExecutor) batchingKey, (List) tasks, tasksSummary) + ); } class UpdateTask extends BatchedTask { @@ -180,10 +180,15 @@ class UpdateTask extends BatchedTask { @Override public String describeTasks(List tasks) { - return ((ClusterStateTaskExecutor) batchingKey).describeTasks( - tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList()) + return ((ClusterStateTaskExecutor) batchingKey).describeTasks( + tasks.stream().map(task -> (ClusterStateTaskListener) task.task).toList() ); } + + @Override + public ClusterStateTaskListener getTask() { + return (ClusterStateTaskListener) task; + } } } @@ -389,7 +394,7 @@ private void handleException(String summary, long startTimeMillis, ClusterState } private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState) { - ClusterTasksResult clusterTasksResult = executeTasks(taskInputs, previousClusterState); + ClusterTasksResult clusterTasksResult = executeTasks(taskInputs, previousClusterState); ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult); return new TaskOutputs( taskInputs, @@ -474,14 +479,14 @@ class TaskOutputs { final ClusterState previousClusterState; final ClusterState newClusterState; final List nonFailedTasks; - final Map executionResults; + final Map executionResults; TaskOutputs( TaskInputs taskInputs, ClusterState previousClusterState, ClusterState newClusterState, List nonFailedTasks, - Map executionResults + Map executionResults ) { this.taskInputs = taskInputs; this.previousClusterState = previousClusterState; @@ -806,10 +811,10 @@ public void onTimeout() { } } - private ClusterTasksResult executeTasks(TaskInputs taskInputs, ClusterState previousClusterState) { - ClusterTasksResult clusterTasksResult; + private ClusterTasksResult executeTasks(TaskInputs taskInputs, ClusterState previousClusterState) { + ClusterTasksResult clusterTasksResult; try { - List inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList()); + List inputs = taskInputs.updateTasks.stream().map(Batcher.UpdateTask::getTask).toList(); clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs); if (previousClusterState != clusterTasksResult.resultingState() && previousClusterState.nodes().isLocalNodeElectedMaster() @@ -829,8 +834,8 @@ private ClusterTasksResult executeTasks(TaskInputs taskInputs, ClusterSt ), // may be expensive => construct message lazily e ); - clusterTasksResult = ClusterTasksResult.builder() - .failures(taskInputs.updateTasks.stream().map(updateTask -> updateTask.task)::iterator, e) + clusterTasksResult = ClusterTasksResult.builder() + .failures(taskInputs.updateTasks.stream().map(Batcher.UpdateTask::getTask)::iterator, e) .build(previousClusterState); } @@ -844,7 +849,7 @@ private ClusterTasksResult executeTasks(TaskInputs taskInputs, ClusterSt clusterTasksResult.executionResults().size() ); if (Assertions.ENABLED) { - ClusterTasksResult finalClusterTasksResult = clusterTasksResult; + ClusterTasksResult finalClusterTasksResult = clusterTasksResult; taskInputs.updateTasks.forEach( updateTask -> { assert finalClusterTasksResult.executionResults().containsKey(updateTask.task) @@ -856,11 +861,13 @@ private ClusterTasksResult executeTasks(TaskInputs taskInputs, ClusterSt return clusterTasksResult; } - private List getNonFailedTasks(TaskInputs taskInputs, ClusterTasksResult clusterTasksResult) { + private List getNonFailedTasks( + TaskInputs taskInputs, + ClusterTasksResult clusterTasksResult + ) { return taskInputs.updateTasks.stream().filter(updateTask -> { - assert clusterTasksResult.executionResults().containsKey(updateTask.task) : "missing " + updateTask; - final ClusterStateTaskExecutor.TaskResult taskResult = clusterTasksResult.executionResults().get(updateTask.task); - return taskResult.isSuccess(); + assert clusterTasksResult.executionResults().containsKey(updateTask.getTask()) : "missing " + updateTask; + return clusterTasksResult.executionResults().get(updateTask.getTask()).isSuccess(); }).collect(Collectors.toList()); } @@ -870,9 +877,9 @@ private List getNonFailedTasks(TaskInputs taskInputs, Cluste private class TaskInputs { final String summary; final List updateTasks; - final ClusterStateTaskExecutor executor; + final ClusterStateTaskExecutor executor; - TaskInputs(ClusterStateTaskExecutor executor, List updateTasks, String summary) { + TaskInputs(ClusterStateTaskExecutor executor, List updateTasks, String summary) { this.summary = summary; this.executor = executor; this.updateTasks = updateTasks; diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTaskExecutorTests.java index 714380241189..349ca02d8534 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTaskExecutorTests.java @@ -9,20 +9,24 @@ import org.elasticsearch.test.ESTestCase; -import java.util.Arrays; -import java.util.Collections; +import java.util.List; import static org.hamcrest.Matchers.equalTo; public class ClusterStateTaskExecutorTests extends ESTestCase { - private class TestTask { + private class TestTask implements ClusterStateTaskListener { private final String description; TestTask(String description) { this.description = description; } + @Override + public void onFailure(Exception e) { + throw new AssertionError("Should not fail in test", e); + } + @Override public String toString() { return description == null ? "" : "Task{" + description + "}"; @@ -32,28 +36,18 @@ public String toString() { public void testDescribeTasks() { final ClusterStateTaskExecutor executor = (currentState, tasks) -> { throw new AssertionError("should not be called"); }; - assertThat("describes an empty list", executor.describeTasks(Collections.emptyList()), equalTo("")); - assertThat( - "describes a singleton list", - executor.describeTasks(Collections.singletonList(new TestTask("a task"))), - equalTo("Task{a task}") - ); + assertThat("describes an empty list", executor.describeTasks(List.of()), equalTo("")); + assertThat("describes a singleton list", executor.describeTasks(List.of(new TestTask("a task"))), equalTo("Task{a task}")); assertThat( "describes a list of two tasks", - executor.describeTasks(Arrays.asList(new TestTask("a task"), new TestTask("another task"))), + executor.describeTasks(List.of(new TestTask("a task"), new TestTask("another task"))), equalTo("Task{a task}, Task{another task}") ); - assertThat( - "skips the only item if it has no description", - executor.describeTasks(Collections.singletonList(new TestTask(null))), - equalTo("") - ); + assertThat("skips the only item if it has no description", executor.describeTasks(List.of(new TestTask(null))), equalTo("")); assertThat( "skips an item if it has no description", - executor.describeTasks( - Arrays.asList(new TestTask("a task"), new TestTask(null), new TestTask("another task"), new TestTask(null)) - ), + executor.describeTasks(List.of(new TestTask("a task"), new TestTask(null), new TestTask("another task"), new TestTask(null))), equalTo("Task{a task}, Task{another task}") ); } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 66efd2a708e5..d4deb426e9b0 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -39,6 +39,7 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult; import org.elasticsearch.cluster.ClusterStateTaskExecutor.TaskResult; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.EmptyClusterInfoService; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -101,9 +102,9 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; -import java.util.stream.Collectors; import static com.carrotsearch.randomizedtesting.RandomizedTest.getRandom; +import static java.util.stream.Collectors.toMap; import static org.elasticsearch.env.Environment.PATH_HOME_SETTING; import static org.elasticsearch.test.CheckedFunctionUtils.anyCheckedFunction; import static org.hamcrest.Matchers.notNullValue; @@ -325,7 +326,7 @@ public ClusterState closeIndices(ClusterState state, CloseIndexRequest request) newState = MetadataIndexStateServiceUtils.closeRoutingTable( newState, blockedIndices, - blockedIndices.keySet().stream().collect(Collectors.toMap(Function.identity(), CloseIndexResponse.IndexResult::new)) + blockedIndices.keySet().stream().collect(toMap(Function.identity(), CloseIndexResponse.IndexResult::new)) ); return allocationService.reroute(newState, "indices closed"); } @@ -358,7 +359,7 @@ public ClusterState addNodes(ClusterState clusterState, List node ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); }) ) ) - .collect(Collectors.toList()) + .toList() ); } @@ -375,7 +376,7 @@ public ClusterState joinNodesAndBecomeMaster(ClusterState clusterState, List { throw new AssertionError("should not complete publication"); }) ) ) - .collect(Collectors.toList()) + .toList() ); return runTasks(joinTaskExecutor, clusterState, joinNodes); @@ -385,7 +386,7 @@ public ClusterState removeNodes(ClusterState clusterState, List n return runTasks( nodeRemovalExecutor, clusterState, - nodes.stream().map(n -> new NodeRemovalClusterStateTaskExecutor.Task(n, "dummy reason", () -> {})).collect(Collectors.toList()) + nodes.stream().map(n -> new NodeRemovalClusterStateTaskExecutor.Task(n, "dummy reason", () -> {})).toList() ); } @@ -404,12 +405,12 @@ public ClusterState applyFailedShards(ClusterState clusterState, List startedShards) { - final Map entries = startedShards.stream().collect(Collectors.toMap(Function.identity(), startedShard -> { + final Map entries = startedShards.stream().collect(toMap(Function.identity(), startedShard -> { final IndexMetadata indexMetadata = clusterState.metadata().index(startedShard.shardId().getIndex()); return indexMetadata != null ? indexMetadata.primaryTerm(startedShard.shardId().id()) : 0L; })); @@ -434,11 +435,15 @@ public ClusterState applyStartedShards(ClusterState clusterState, Map ClusterState runTasks(ClusterStateTaskExecutor executor, ClusterState clusterState, List entries) { + private ClusterState runTasks( + ClusterStateTaskExecutor executor, + ClusterState clusterState, + List entries + ) { try { ClusterTasksResult result = executor.execute(clusterState, entries); for (TaskResult taskResult : result.executionResults().values()) {