Skip to content

Commit

Permalink
Deprecate class 'MasterService' and create alternative class 'Cluster…
Browse files Browse the repository at this point in the history
…ManagerService' (#4022) (#4050)

To support inclusive language, the `master` terminology is going to be replaced by `cluster manager` in the code base.

- Deprecate class `MasterService` and create alternative class `ClusterManagerService`.
- Add a unit test to validate the method `ClusterService.getMasterService()` can still return an object in type `MasterService`.
- Rename all the existing references of `MasterService` to `ClusterManagerService`, and rename the local variable names.
- Deprecate public methods `ClusterServiceUtils.createMasterService(...)` and create alternative methods `createClusterManagerService(...)`

Note:
The class `ClusterManagerService` is a subclass of `MasterService`, the inheritance relationship is opposite from most of the other classes with `Master` in the name (that covered by issue #1684).
The reason is:
There is a public method that has return value in type `MasterService`,
https://github.com/opensearch-project/OpenSearch/blob/388c80ad94529b1d9aad0a735c4740dce2932a32/server/src/main/java/org/opensearch/cluster/service/ClusterService.java#L221
And in the code for Performance Analyzer plugin, there is a local variable in type `MasterService`:
https://github.com/opensearch-project/performance-analyzer/blob/5ee4809ac1cda6517ed871aeb12c6635203e7f1d/src/main/java/org/opensearch/performanceanalyzer/collectors/MasterServiceEventMetrics.java#L219
If making the old class `MasterService` a subclass of the new class `ClusterManagerService`, the above usage will be broken.
Reversing the inheritance relationship, I'm able to keep the backwards compatibility of the method `getMasterService()` while deprecating the class `MasterService` and encourage using a new class `ClusterManagerService`.

Signed-off-by: Tianli Feng <[email protected]>
(cherry picked from commit 740f75d)
  • Loading branch information
opensearch-trigger-bot[bot] authored Jul 30, 2022
1 parent e423ec4 commit 3f47160
Show file tree
Hide file tree
Showing 28 changed files with 248 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ public void testDiscoveryStats() throws Exception {
ensureGreen(); // ensures that all events are processed (in particular state recovery fully completed)
assertBusy(
() -> assertThat(
internalCluster().clusterService(internalCluster().getClusterManagerName()).getMasterService().numberOfPendingTasks(),
internalCluster().clusterService(internalCluster().getClusterManagerName())
.getClusterManagerService()
.numberOfPendingTasks(),
equalTo(0)
)
); // see https://github.com/elastic/elasticsearch/issues/24388
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS

// The tasks can be re-ordered, so we need to check out-of-order
Set<String> controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"));
List<PendingClusterTask> pendingClusterTasks = clusterService.getMasterService().pendingTasks();
List<PendingClusterTask> pendingClusterTasks = clusterService.getClusterManagerService().pendingTasks();
assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(10));
assertThat(pendingClusterTasks.get(0).getSource().string(), equalTo("1"));
assertThat(pendingClusterTasks.get(0).isExecuting(), equalTo(true));
Expand All @@ -413,7 +413,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
invoked2.await();

// whenever we test for no tasks, we need to wait since this is a live node
assertBusy(() -> assertTrue("Pending tasks not empty", clusterService.getMasterService().pendingTasks().isEmpty()));
assertBusy(() -> assertTrue("Pending tasks not empty", clusterService.getClusterManagerService().pendingTasks().isEmpty()));
waitNoPendingTasksOnAll();

final CountDownLatch block2 = new CountDownLatch(1);
Expand Down Expand Up @@ -453,7 +453,7 @@ public void onFailure(String source, Exception e) {
}
Thread.sleep(100);

pendingClusterTasks = clusterService.getMasterService().pendingTasks();
pendingClusterTasks = clusterService.getClusterManagerService().pendingTasks();
assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5));
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
for (PendingClusterTask task : pendingClusterTasks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,9 @@ private boolean validateRequest(final ClusterHealthRequest request, ClusterState
ClusterHealthResponse response = clusterHealth(
request,
clusterState,
clusterService.getMasterService().numberOfPendingTasks(),
clusterService.getClusterManagerService().numberOfPendingTasks(),
allocationService.getNumberOfInFlightFetches(),
clusterService.getMasterService().getMaxTaskWaitTime()
clusterService.getClusterManagerService().getMaxTaskWaitTime()
);
return prepareResponse(request, response, clusterState, indexNameExpressionResolver) == waitCount;
}
Expand All @@ -341,9 +341,9 @@ private ClusterHealthResponse getResponse(
ClusterHealthResponse response = clusterHealth(
request,
clusterState,
clusterService.getMasterService().numberOfPendingTasks(),
clusterService.getClusterManagerService().numberOfPendingTasks(),
allocationService.getNumberOfInFlightFetches(),
clusterService.getMasterService().getMaxTaskWaitTime()
clusterService.getClusterManagerService().getMaxTaskWaitTime()
);
int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
boolean valid = (readyCounter == waitFor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ protected void clusterManagerOperation(
ActionListener<PendingClusterTasksResponse> listener
) {
logger.trace("fetching pending tasks from cluster service");
final List<PendingClusterTask> pendingTasks = clusterService.getMasterService().pendingTasks();
final List<PendingClusterTask> pendingTasks = clusterService.getClusterManagerService().pendingTasks();
logger.trace("done fetching pending tasks from cluster service");
listener.onResponse(new PendingClusterTasksResponse(pendingTasks));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

package org.opensearch.cluster;

import org.opensearch.cluster.service.MasterService;
import org.opensearch.cluster.service.ClusterManagerService;

import java.util.List;

Expand All @@ -49,15 +49,15 @@ public interface ClusterStateTaskListener {

/**
* called when the task was rejected because the local node is no longer cluster-manager.
* Used only for tasks submitted to {@link MasterService}.
* Used only for tasks submitted to {@link ClusterManagerService}.
*/
default void onNoLongerClusterManager(String source) {
onFailure(source, new NotClusterManagerException("no longer cluster-manager. source: [" + source + "]"));
}

/**
* called when the task was rejected because the local node is no longer cluster-manager.
* Used only for tasks submitted to {@link MasterService}.
* Used only for tasks submitted to {@link ClusterManagerService}.
*
* @deprecated As of 2.1, because supporting inclusive language, replaced by {@link #onNoLongerClusterManager(String)}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterApplier;
import org.opensearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.opensearch.cluster.service.MasterService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.common.Booleans;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
Expand Down Expand Up @@ -141,7 +141,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final boolean singleNodeDiscovery;
private final ElectionStrategy electionStrategy;
private final TransportService transportService;
private final MasterService masterService;
private final ClusterManagerService clusterManagerService;
private final AllocationService allocationService;
private final JoinHelper joinHelper;
private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
Expand Down Expand Up @@ -191,7 +191,7 @@ public Coordinator(
TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry,
AllocationService allocationService,
MasterService masterService,
ClusterManagerService clusterManagerService,
Supplier<CoordinationState.PersistedState> persistedStateSupplier,
SeedHostsProvider seedHostsProvider,
ClusterApplier clusterApplier,
Expand All @@ -203,15 +203,15 @@ public Coordinator(
) {
this.settings = settings;
this.transportService = transportService;
this.masterService = masterService;
this.clusterManagerService = clusterManagerService;
this.allocationService = allocationService;
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.singleNodeDiscovery = DiscoveryModule.isSingleNodeDiscovery(settings);
this.electionStrategy = electionStrategy;
this.joinHelper = new JoinHelper(
settings,
allocationService,
masterService,
clusterManagerService,
transportService,
this::getCurrentTerm,
this::getStateForClusterManagerService,
Expand Down Expand Up @@ -260,7 +260,7 @@ public Coordinator(
);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
masterService.setClusterStateSupplier(this::getStateForClusterManagerService);
clusterManagerService.setClusterStateSupplier(this::getStateForClusterManagerService);
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
this.clusterBootstrapService = new ClusterBootstrapService(
settings,
Expand Down Expand Up @@ -310,7 +310,7 @@ private void onLeaderFailure(Exception e) {
private void removeNode(DiscoveryNode discoveryNode, String reason) {
synchronized (mutex) {
if (mode == Mode.LEADER) {
masterService.submitStateUpdateTask(
clusterManagerService.submitStateUpdateTask(
"node-left",
new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, reason),
ClusterStateTaskConfig.build(Priority.IMMEDIATE),
Expand Down Expand Up @@ -757,7 +757,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
}

private void cleanClusterManagerService() {
masterService.submitStateUpdateTask("clean-up after stepping down as cluster-manager", new LocalClusterUpdateTask() {
clusterManagerService.submitStateUpdateTask("clean-up after stepping down as cluster-manager", new LocalClusterUpdateTask() {
@Override
public void onFailure(String source, Exception e) {
// ignore
Expand Down Expand Up @@ -1129,7 +1129,7 @@ private void scheduleReconfigurationIfNeeded() {
final ClusterState state = getLastAcceptedState();
if (improveConfiguration(state) != state && reconfigurationTaskScheduled.compareAndSet(false, true)) {
logger.trace("scheduling reconfiguration");
masterService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask(Priority.URGENT) {
clusterManagerService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
reconfigurationTaskScheduled.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.MasterService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.common.Priority;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -106,7 +106,7 @@ public class JoinHelper {
Setting.Property.Deprecated
);

private final MasterService masterService;
private final ClusterManagerService clusterManagerService;
private final TransportService transportService;
private volatile JoinTaskExecutor joinTaskExecutor;

Expand All @@ -122,7 +122,7 @@ public class JoinHelper {
JoinHelper(
Settings settings,
AllocationService allocationService,
MasterService masterService,
ClusterManagerService clusterManagerService,
TransportService transportService,
LongSupplier currentTermSupplier,
Supplier<ClusterState> currentStateSupplier,
Expand All @@ -132,7 +132,7 @@ public class JoinHelper {
RerouteService rerouteService,
NodeHealthService nodeHealthService
) {
this.masterService = masterService;
this.clusterManagerService = clusterManagerService;
this.transportService = transportService;
this.nodeHealthService = nodeHealthService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
Expand Down Expand Up @@ -458,7 +458,7 @@ class LeaderJoinAccumulator implements JoinAccumulator {
public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
final JoinTaskExecutor.Task task = new JoinTaskExecutor.Task(sender, "join existing leader");
assert joinTaskExecutor != null;
masterService.submitStateUpdateTask(
clusterManagerService.submitStateUpdateTask(
"node-join",
task,
ClusterStateTaskConfig.build(Priority.URGENT),
Expand Down Expand Up @@ -543,7 +543,7 @@ public void close(Mode newMode) {
pendingAsTasks.put(JoinTaskExecutor.newBecomeClusterManagerTask(), (source, e) -> {});
pendingAsTasks.put(JoinTaskExecutor.newFinishElectionTask(), (source, e) -> {});
joinTaskExecutor = joinTaskExecutorGenerator.get();
masterService.submitStateUpdateTasks(
clusterManagerService.submitStateUpdateTasks(
stateUpdateSource,
pendingAsTasks,
ClusterStateTaskConfig.build(Priority.URGENT),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;

/**
* Main Cluster Manager Node Service
*
* @opensearch.internal
*/
public class ClusterManagerService extends MasterService {
public ClusterManagerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings, clusterSettings, threadPool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
* @opensearch.internal
*/
public class ClusterService extends AbstractLifecycleComponent {
private final MasterService masterService;
private final ClusterManagerService clusterManagerService;

private final ClusterApplierService clusterApplierService;

Expand Down Expand Up @@ -93,20 +93,20 @@ public ClusterService(Settings settings, ClusterSettings clusterSettings, Thread
this(
settings,
clusterSettings,
new MasterService(settings, clusterSettings, threadPool),
new ClusterManagerService(settings, clusterSettings, threadPool),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool)
);
}

public ClusterService(
Settings settings,
ClusterSettings clusterSettings,
MasterService masterService,
ClusterManagerService clusterManagerService,
ClusterApplierService clusterApplierService
) {
this.settings = settings;
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
this.masterService = masterService;
this.clusterManagerService = clusterManagerService;
this.operationRouting = new OperationRouting(settings, clusterSettings);
this.clusterSettings = clusterSettings;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
Expand All @@ -132,18 +132,18 @@ public RerouteService getRerouteService() {
@Override
protected synchronized void doStart() {
clusterApplierService.start();
masterService.start();
clusterManagerService.start();
}

@Override
protected synchronized void doStop() {
masterService.stop();
clusterManagerService.stop();
clusterApplierService.stop();
}

@Override
protected synchronized void doClose() {
masterService.close();
clusterManagerService.close();
clusterApplierService.close();
}

Expand Down Expand Up @@ -228,8 +228,14 @@ public void addLocalNodeMasterListener(LocalNodeMasterListener listener) {
addLocalNodeClusterManagerListener(listener);
}

public ClusterManagerService getClusterManagerService() {
return clusterManagerService;
}

/** @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #getClusterManagerService()} */
@Deprecated
public MasterService getMasterService() {
return masterService;
return clusterManagerService;
}

/**
Expand All @@ -252,7 +258,7 @@ public ClusterApplierService getClusterApplierService() {

public static boolean assertClusterOrClusterManagerStateThread() {
assert Thread.currentThread().getName().contains(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME)
|| Thread.currentThread().getName().contains(MasterService.CLUSTER_MANAGER_UPDATE_THREAD_NAME)
|| Thread.currentThread().getName().contains(ClusterManagerService.CLUSTER_MANAGER_UPDATE_THREAD_NAME)
: "not called from the master/cluster state update thread";
return true;
}
Expand Down Expand Up @@ -349,6 +355,6 @@ public <T> void submitStateUpdateTasks(
final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor
) {
masterService.submitStateUpdateTasks(source, tasks, config, executor);
clusterManagerService.submitStateUpdateTasks(source, tasks, config, executor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@
* Main Master Node Service
*
* @opensearch.internal
* @deprecated As of 2.2, because supporting inclusive language, replaced by {@link ClusterManagerService}.
*/
@Deprecated
public class MasterService extends AbstractLifecycleComponent {
private static final Logger logger = LogManager.getLogger(MasterService.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.cluster.service.MasterService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.network.NetworkService;
Expand Down Expand Up @@ -335,8 +335,8 @@ public void apply(Settings value, Settings current, Settings previous) {
IndexModule.NODE_STORE_ALLOW_MMAP,
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
ClusterService.USER_DEFINED_METADATA,
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, // deprecated
MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, // deprecated
ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
package org.opensearch.common.util.concurrent;

import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.MasterService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.common.Nullable;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Transports;
Expand Down Expand Up @@ -109,7 +109,7 @@ protected boolean blockingAllowed() {
return Transports.assertNotTransportThread(BLOCKING_OP_REASON)
&& ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON)
&& ClusterApplierService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON)
&& MasterService.assertNotClusterManagerUpdateThread(BLOCKING_OP_REASON);
&& ClusterManagerService.assertNotClusterManagerUpdateThread(BLOCKING_OP_REASON);
}

@Override
Expand Down
Loading

0 comments on commit 3f47160

Please sign in to comment.