From 4f4e0bcab3dc87c4f4d944a74ecaf6ff84893664 Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Thu, 8 Feb 2024 13:55:11 +0530 Subject: [PATCH 1/8] fetching term information from master using light-weight call before fetch of cluster-state Signed-off-by: Rajiv Kumar Vaidyanathan --- .../org/opensearch/action/ActionModule.java | 3 + .../state/TransportClusterStateAction.java | 4 + .../state/term/GetTermVersionAction.java | 26 ++ .../state/term/GetTermVersionRequest.java | 34 ++ .../state/term/GetTermVersionResponse.java | 77 +++++ .../term/TransportGetTermVersionAction.java | 85 +++++ .../cluster/state/term/package-info.java | 10 + .../TransportClusterManagerNodeAction.java | 165 ++++++--- .../state/term/ClusterTermVersionIT.java | 120 +++++++ .../state/term/ClusterTermVersionTests.java | 33 ++ ...TransportClusterManagerTermCheckTests.java | 315 ++++++++++++++++++ 11 files changed, 830 insertions(+), 42 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionRequest.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/state/term/package-info.java create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionIT.java create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java create mode 100644 server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index b19bf9590f43b..f827b7f3f0097 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -107,6 +107,8 @@ import org.opensearch.action.admin.cluster.snapshots.status.TransportSnapshotsStatusAction; import org.opensearch.action.admin.cluster.state.ClusterStateAction; import org.opensearch.action.admin.cluster.state.TransportClusterStateAction; +import org.opensearch.action.admin.cluster.state.term.GetTermVersionAction; +import org.opensearch.action.admin.cluster.state.term.TransportGetTermVersionAction; import org.opensearch.action.admin.cluster.stats.ClusterStatsAction; import org.opensearch.action.admin.cluster.stats.TransportClusterStatsAction; import org.opensearch.action.admin.cluster.storedscripts.DeleteStoredScriptAction; @@ -614,6 +616,7 @@ public void reg actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class); actions.register(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class); actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class); + actions.register(GetTermVersionAction.INSTANCE, TransportGetTermVersionAction.class); actions.register(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class); actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class); actions.register(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java index 4aaa7f1950823..18a8ddd522023 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -231,4 +231,8 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, fi return new ClusterStateResponse(currentState.getClusterName(), builder.build(), false); } + @Override + protected boolean checkTermVersion() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionAction.java new file mode 100644 index 0000000000000..3344fd549b23f --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionAction.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.state.term; + +import org.opensearch.action.ActionType; + +/** + * Transport action for fetching cluster term and version + * + * @opensearch.internal + */ +public class GetTermVersionAction extends ActionType { + + public static final GetTermVersionAction INSTANCE = new GetTermVersionAction(); + public static final String NAME = "cluster:monitor/term"; + + private GetTermVersionAction() { + super(NAME, GetTermVersionResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionRequest.java new file mode 100644 index 0000000000000..2ea07d7332977 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionRequest.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.state.term; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest; +import org.opensearch.core.common.io.stream.StreamInput; + +import java.io.IOException; + +/** + * Request object to get cluster term + * + * @opensearch.internal + */ +public class GetTermVersionRequest extends ClusterManagerNodeReadRequest { + + public GetTermVersionRequest() {} + + public GetTermVersionRequest(StreamInput in) throws IOException { + super(in); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java new file mode 100644 index 0000000000000..85f2a7133e576 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java @@ -0,0 +1,77 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.state.term; + +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Response object of cluster term + * + * @opensearch.internal + */ +public class GetTermVersionResponse extends ActionResponse { + + private final ClusterName clusterName; + private final String stateUUID; + private final long term; + private final long version; + + public GetTermVersionResponse(ClusterName clusterName, String stateUUID, long term, long version) { + this.clusterName = clusterName; + this.stateUUID = stateUUID; + this.term = term; + this.version = version; + } + + public GetTermVersionResponse(StreamInput in) throws IOException { + super(in); + this.clusterName = new ClusterName(in); + this.stateUUID = in.readString(); + this.term = in.readLong(); + this.version = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + clusterName.writeTo(out); + out.writeString(stateUUID); + out.writeLong(term); + out.writeLong(version); + } + + public long getTerm() { + return term; + } + + public long getVersion() { + return version; + } + + public ClusterName getClusterName() { + return clusterName; + } + + public String getStateUUID() { + return stateUUID; + } + + public boolean matches(ClusterState clusterState) { + return clusterName.equals(clusterState.getClusterName()) + && stateUUID.equals(clusterState.stateUUID()) + && term == clusterState.term() + && version == clusterState.version(); + } + +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java new file mode 100644 index 0000000000000..5f51f16c2e80a --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.state.term; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeReadAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; + +/** + * Transport action for obtaining cluster term and version from cluster-manager + * + * @opensearch.internal + */ +public class TransportGetTermVersionAction extends TransportClusterManagerNodeReadAction { + + private final Logger logger = LogManager.getLogger(getClass()); + + @Inject + public TransportGetTermVersionAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + GetTermVersionAction.NAME, + false, + transportService, + clusterService, + threadPool, + actionFilters, + GetTermVersionRequest::new, + indexNameExpressionResolver + ); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public GetTermVersionResponse read(StreamInput in) throws IOException { + return new GetTermVersionResponse(in); + } + + @Override + protected ClusterBlockException checkBlock(GetTermVersionRequest request, ClusterState state) { + // cluster state term and version needs to be retrieved even on a fully blocked cluster + return null; + } + + @Override + protected void clusterManagerOperation( + GetTermVersionRequest request, + ClusterState state, + ActionListener listener + ) throws Exception { + ActionListener.completeWith(listener, () -> buildResponse(request, state)); + } + + private GetTermVersionResponse buildResponse(GetTermVersionRequest request, ClusterState state) { + logger.trace("Serving cluster term version request using term {} and version {}", state.term(), state.version()); + return new GetTermVersionResponse(state.getClusterName(), state.stateUUID(), state.term(), state.getVersion()); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/package-info.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/package-info.java new file mode 100644 index 0000000000000..0ee559c527d7d --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Cluster Term transport handler. */ +package org.opensearch.action.admin.cluster.state.term; diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index 536ddcdd402e2..329902dc5e397 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -37,6 +37,9 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListenerResponseHandler; import org.opensearch.action.ActionRunnable; +import org.opensearch.action.admin.cluster.state.term.GetTermVersionAction; +import org.opensearch.action.admin.cluster.state.term.GetTermVersionRequest; +import org.opensearch.action.admin.cluster.state.term.GetTermVersionResponse; import org.opensearch.action.bulk.BackoffPolicy; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; @@ -66,11 +69,14 @@ import org.opensearch.transport.ConnectTransportException; import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; import java.io.IOException; import java.util.function.Predicate; +import static org.opensearch.Version.CURRENT; + /** * A base class for operations that needs to be performed on the cluster-manager node. * @@ -252,23 +258,13 @@ protected void doStart(ClusterState clusterState) { }); } } else { - ActionListener delegate = ActionListener.delegateResponse(listener, (delegatedListener, t) -> { - if (t instanceof FailedToCommitClusterStateException || t instanceof NotClusterManagerException) { - logger.debug( - () -> new ParameterizedMessage( - "master could not publish cluster state or " - + "stepped down before publishing action [{}], scheduling a retry", - actionName - ), - t - ); - retryOnMasterChange(clusterState, t); - } else { - delegatedListener.onFailure(t); - } - }); threadPool.executor(executor) - .execute(ActionRunnable.wrap(delegate, l -> clusterManagerOperation(task, request, clusterState, l))); + .execute( + ActionRunnable.wrap( + getDelegateForLocalExecute(clusterState), + l -> clusterManagerOperation(task, request, clusterState, l) + ) + ); } } else { if (nodes.getClusterManagerNode() == null) { @@ -276,32 +272,12 @@ protected void doStart(ClusterState clusterState) { retryOnMasterChange(clusterState, null); } else { DiscoveryNode clusterManagerNode = nodes.getClusterManagerNode(); - final String actionName = getClusterManagerActionName(clusterManagerNode); - transportService.sendRequest( - clusterManagerNode, - actionName, - request, - new ActionListenerResponseHandler(listener, TransportClusterManagerNodeAction.this::read) { - @Override - public void handleException(final TransportException exp) { - Throwable cause = exp.unwrapCause(); - if (cause instanceof ConnectTransportException - || (exp instanceof RemoteTransportException && cause instanceof NodeClosedException)) { - // we want to retry here a bit to see if a new cluster-manager is elected - logger.debug( - "connection exception while trying to forward request with action name [{}] to " - + "master node [{}], scheduling a retry. Error: [{}]", - actionName, - nodes.getClusterManagerNode(), - exp.getDetailedMessage() - ); - retryOnMasterChange(clusterState, cause); - } else { - listener.onFailure(exp); - } - } - } - ); + boolean shouldCheckTerm = clusterManagerNode.getVersion().onOrAfter(CURRENT) && checkTermVersion(); + if (shouldCheckTerm) { + execOnClusterManagerOnTermMismatch(clusterManagerNode, clusterState); + } else { + executeOnClusterManager(clusterManagerNode, clusterState); + } } } } catch (Exception e) { @@ -351,6 +327,101 @@ public void onTimeout(TimeValue timeout) { } }, statePredicate); } + + private ActionListener getDelegateForLocalExecute(ClusterState clusterState) { + return ActionListener.delegateResponse(listener, (delegatedListener, t) -> { + if (t instanceof FailedToCommitClusterStateException || t instanceof NotClusterManagerException) { + logger.debug( + () -> new ParameterizedMessage( + "master could not publish cluster state or " + "stepped down before publishing action [{}], scheduling a retry", + actionName + ), + t + ); + + retryOnMasterChange(clusterState, t); + } else { + delegatedListener.onFailure(t); + } + }); + } + + private void execOnClusterManagerOnTermMismatch(DiscoveryNode clusterManagerNode, ClusterState clusterState) { + transportService.sendRequest( + clusterManagerNode, + GetTermVersionAction.NAME, + new GetTermVersionRequest(), + new TransportResponseHandler() { + @Override + public void handleResponse(GetTermVersionResponse response) { + boolean shouldExecuteOnClusterManger = !response.matches(clusterState); + if (shouldExecuteOnClusterManger) { + executeOnClusterManager(clusterManagerNode, clusterState); + } else { + Runnable runTask = ActionRunnable.wrap( + getDelegateForLocalExecute(clusterState), + l -> clusterManagerOperation(task, request, clusterState, l) + ); + threadPool.executor(executor).execute(runTask); + } + } + + @Override + public void handleException(TransportException exp) { + handleTransportException(clusterManagerNode, clusterState, exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public GetTermVersionResponse read(StreamInput in) throws IOException { + return new GetTermVersionResponse(in); + } + + } + + ); + } + + private void executeOnClusterManager(DiscoveryNode clusterManagerNode, ClusterState clusterState) { + final String actionName = getClusterManagerActionName(clusterManagerNode); + + transportService.sendRequest( + clusterManagerNode, + actionName, + request, + new ActionListenerResponseHandler(listener, TransportClusterManagerNodeAction.this::read) { + @Override + public void handleException(final TransportException exp) { + handleTransportException(clusterManagerNode, clusterState, exp); + } + } + ); + } + + private void handleTransportException(DiscoveryNode clusterManagerNode, ClusterState clusterState, final TransportException exp) { + Throwable cause = exp.unwrapCause(); + if (cause instanceof ConnectTransportException + || (exp instanceof RemoteTransportException && cause instanceof NodeClosedException)) { + // we want to retry here a bit to see if a new cluster-manager is elected + + logger.debug( + "connection exception while trying to forward request with action name [{}] to " + + "master node [{}], scheduling a retry. Error: [{}]", + actionName, + clusterManagerNode, + exp.getDetailedMessage() + ); + + retryOnMasterChange(clusterState, cause); + } else { + listener.onFailure(exp); + } + } + } /** @@ -372,4 +443,14 @@ protected String getMasterActionName(DiscoveryNode node) { return getClusterManagerActionName(node); } + /** + * Determines if transport action needs to check local cluster-state term with manager before + * executing the action on manager. This is generally true for actions that are read-only and can be executed locally + * on node if the term matches with cluster-manager. + * @return - true to perform term check and then execute the action + */ + protected boolean checkTermVersion() { + return false; + } + } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionIT.java b/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionIT.java new file mode 100644 index 0000000000000..8740edcea2398 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionIT.java @@ -0,0 +1,120 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.state.term; + +import org.opensearch.action.admin.cluster.state.ClusterStateAction; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.Matchers.is; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class ClusterTermVersionIT extends OpenSearchIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(MockTransportService.TestPlugin.class); + } + + public void testClusterStateResponseFromDataNode() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNode(); + + ensureClusterSizeConsistency(); + ensureGreen(); + + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.waitForTimeout(TimeValue.timeValueHours(1)); + ClusterStateResponse stateResponse = dataNodeClient().admin().cluster().state(clusterStateRequest).get(); + assertThat(stateResponse.getClusterName().value(), is(internalCluster().getClusterName())); + assertThat(stateResponse.getState().nodes().getSize(), is(internalCluster().getNodeNames().length)); + assertThat(stateResponse.isWaitForTimedOut(), is(false)); + + } + + public void testClusterStateResponseFromClusterManagerNode() throws Exception { + String master = internalCluster().startClusterManagerOnlyNode(); + String data = internalCluster().startDataOnlyNode(); + ensureClusterSizeConsistency(); + ensureGreen(); + Map callCounters = Map.ofEntries( + Map.entry(ClusterStateAction.NAME, new AtomicInteger()), + Map.entry(GetTermVersionAction.NAME, new AtomicInteger()) + ); + + addCallCountInterceptor(master, callCounters); + + ClusterStateResponse stateResponse = dataNodeClient().admin().cluster().state(new ClusterStateRequest()).get(); + + AtomicInteger clusterStateCallsOnMaster = callCounters.get(ClusterStateAction.NAME); + AtomicInteger termCallsOnMaster = callCounters.get(GetTermVersionAction.NAME); + + assertThat(clusterStateCallsOnMaster.get(), is(0)); + assertThat(termCallsOnMaster.get(), is(1)); + + assertThat(stateResponse.getClusterName().value(), is(internalCluster().getClusterName())); + assertThat(stateResponse.getState().nodes().getSize(), is(internalCluster().getNodeNames().length)); + + } + + public void testDatanodeOutOfSync() throws Exception { + String master = internalCluster().startClusterManagerOnlyNode(); + String data = internalCluster().startDataOnlyNode(); + ensureClusterSizeConsistency(); + ensureGreen(); + Map callCounters = Map.ofEntries( + Map.entry(ClusterStateAction.NAME, new AtomicInteger()), + Map.entry(GetTermVersionAction.NAME, new AtomicInteger()) + ); + + stubClusterTermResponse(master); + addCallCountInterceptor(master, callCounters); + + ClusterStateResponse stateResponse = dataNodeClient().admin().cluster().state(new ClusterStateRequest()).get(); + + AtomicInteger clusterStateCallsOnMaster = callCounters.get(ClusterStateAction.NAME); + AtomicInteger termCallsOnMaster = callCounters.get(GetTermVersionAction.NAME); + + assertThat(clusterStateCallsOnMaster.get(), is(1)); + assertThat(termCallsOnMaster.get(), is(1)); + + assertThat(stateResponse.getClusterName().value(), is(internalCluster().getClusterName())); + assertThat(stateResponse.getState().nodes().getSize(), is(internalCluster().getNodeNames().length)); + } + + private void addCallCountInterceptor(String nodeName, Map callCounters) { + MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeName); + for (var ctrEnty : callCounters.entrySet()) { + primaryService.addRequestHandlingBehavior(ctrEnty.getKey(), (handler, request, channel, task) -> { + ctrEnty.getValue().incrementAndGet(); + logger.info("--> {} response redirect", ClusterStateAction.NAME); + handler.messageReceived(request, channel, task); + }); + } + } + + private void stubClusterTermResponse(String master) { + MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master); + primaryService.addRequestHandlingBehavior(GetTermVersionAction.NAME, (handler, request, channel, task) -> { + channel.sendResponse(new GetTermVersionResponse(new ClusterName("test"), "1", -1, -1)); + }); + } + +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java new file mode 100644 index 0000000000000..fba9ef4b2f526 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java @@ -0,0 +1,33 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.state.term; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.Matchers.is; + +public class ClusterTermVersionTests extends OpenSearchSingleNodeTestCase { + + public void testTransportTermResponse() throws ExecutionException, InterruptedException { + GetTermVersionRequest request = new GetTermVersionRequest(); + GetTermVersionResponse resp = client().execute(GetTermVersionAction.INSTANCE, request).get(); + + final ClusterService clusterService = getInstanceFromNode(ClusterService.class); + final ClusterState clusterState = clusterService.state(); + + assertThat(resp.getTerm(), is(clusterState.term())); + assertThat(resp.getVersion(), is(clusterState.version())); + assertThat(resp.getStateUUID(), is(clusterState.stateUUID())); + assertThat(resp.getClusterName().value().startsWith("single-node-cluster"), is(true)); + } +} diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java new file mode 100644 index 0000000000000..f1e672093f177 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java @@ -0,0 +1,315 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.action.support.clustermanager; + +import org.opensearch.Version; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.admin.cluster.state.term.GetTermVersionResponse; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.support.ThreadedActionListener; +import org.opensearch.action.support.replication.ClusterStateCreationUtils; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.tasks.Task; +import org.opensearch.telemetry.tracing.noop.NoopTracer; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.transport.CapturingTransport; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.test.ClusterServiceUtils.createClusterService; +import static org.opensearch.test.ClusterServiceUtils.setState; +import static org.hamcrest.Matchers.equalTo; + +public class TransportClusterManagerTermCheckTests extends OpenSearchTestCase { + private static ThreadPool threadPool; + + private ClusterService clusterService; + private TransportService transportService; + private CapturingTransport transport; + private DiscoveryNode localNode; + private DiscoveryNode remoteNode; + private DiscoveryNode[] allNodes; + + @BeforeClass + public static void beforeClass() { + threadPool = new TestThreadPool("TransportMasterNodeActionTests"); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + transport = new CapturingTransport(); + clusterService = createClusterService(threadPool); + transportService = transport.createTransportService( + clusterService.getSettings(), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + x -> clusterService.localNode(), + null, + Collections.emptySet(), + NoopTracer.INSTANCE + ); + transportService.start(); + transportService.acceptIncomingRequests(); + + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + clusterService.close(); + transportService.close(); + } + + @AfterClass + public static void afterClass() { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; + } + + public static class Request extends ClusterManagerNodeRequest { + Request() {} + + Request(StreamInput in) throws IOException { + super(in); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + } + + class Response extends ActionResponse { + private long identity = randomLong(); + + Response() {} + + Response(StreamInput in) throws IOException { + super(in); + identity = in.readLong(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return identity == response.identity; + } + + @Override + public int hashCode() { + return Objects.hash(identity); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(identity); + } + } + + class Action extends TransportClusterManagerNodeAction { + Action(String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) { + super( + actionName, + transportService, + clusterService, + threadPool, + new ActionFilters(new HashSet<>()), + Request::new, + new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)) + ); + } + + @Override + protected void doExecute(Task task, final Request request, ActionListener listener) { + // remove unneeded threading by wrapping listener with SAME to prevent super.doExecute from wrapping it with LISTENER + super.doExecute(task, request, new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SAME, listener, false)); + } + + @Override + protected String executor() { + // very lightweight operation in memory, no need to fork to a thread + return ThreadPool.Names.SAME; + } + + @Override + protected boolean checkTermVersion() { + return true; + } + + @Override + protected Response read(StreamInput in) throws IOException { + return new Response(in); + } + + @Override + protected void clusterManagerOperation(Request request, ClusterState state, ActionListener listener) throws Exception { + listener.onResponse(new Response()); // default implementation, overridden in specific tests + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return null; // default implementation, overridden in specific tests + } + } + + public void testTermCheckMatchWithClusterManager() throws ExecutionException, InterruptedException { + setUpCluster(Version.CURRENT); + + TransportClusterManagerTermCheckTests.Request request = new TransportClusterManagerTermCheckTests.Request(); + PlainActionFuture listener = new PlainActionFuture<>(); + new TransportClusterManagerTermCheckTests.Action("internal:testAction", transportService, clusterService, threadPool).execute( + request, + listener + ); + + assertThat(transport.capturedRequests().length, equalTo(1)); + CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0]; + assertTrue(capturedRequest.node.isClusterManagerNode()); + assertThat(capturedRequest.action, equalTo("cluster:monitor/term")); + GetTermVersionResponse response = new GetTermVersionResponse( + clusterService.state().getClusterName(), + clusterService.state().stateUUID(), + clusterService.state().term(), + clusterService.state().version() + ); + transport.handleResponse(capturedRequest.requestId, response); + assertTrue(listener.isDone()); + } + + public void testTermCheckNoMatchWithClusterManager() throws ExecutionException, InterruptedException { + setUpCluster(Version.CURRENT); + TransportClusterManagerTermCheckTests.Request request = new TransportClusterManagerTermCheckTests.Request(); + + PlainActionFuture listener = new PlainActionFuture<>(); + new TransportClusterManagerTermCheckTests.Action("internal:testAction", transportService, clusterService, threadPool).execute( + request, + listener + ); + + assertThat(transport.capturedRequests().length, equalTo(1)); + CapturingTransport.CapturedRequest termCheckRequest = transport.capturedRequests()[0]; + assertTrue(termCheckRequest.node.isClusterManagerNode()); + assertThat(termCheckRequest.action, equalTo("cluster:monitor/term")); + GetTermVersionResponse termVersionResponse = new GetTermVersionResponse( + clusterService.state().getClusterName(), + clusterService.state().stateUUID(), + clusterService.state().term(), + clusterService.state().version() - 1 + ); + transport.handleResponse(termCheckRequest.requestId, termVersionResponse); + assertFalse(listener.isDone()); + + assertThat(transport.capturedRequests().length, equalTo(2)); + CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[1]; + assertTrue(capturedRequest.node.isClusterManagerNode()); + assertThat(capturedRequest.request, equalTo(request)); + assertThat(capturedRequest.action, equalTo("internal:testAction")); + + TransportClusterManagerTermCheckTests.Response response = new TransportClusterManagerTermCheckTests.Response(); + transport.handleResponse(capturedRequest.requestId, response); + assertTrue(listener.isDone()); + assertThat(listener.get(), equalTo(response)); + + } + + public void testTermCheckOnOldVersionClusterManager() throws ExecutionException, InterruptedException { + + setUpCluster(Version.V_2_13_0); + TransportClusterManagerTermCheckTests.Request request = new TransportClusterManagerTermCheckTests.Request(); + + PlainActionFuture listener = new PlainActionFuture<>(); + new TransportClusterManagerTermCheckTests.Action("internal:testAction", transportService, clusterService, threadPool).execute( + request, + listener + ); + + assertThat(transport.capturedRequests().length, equalTo(1)); + CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0]; + assertTrue(capturedRequest.node.isClusterManagerNode()); + assertThat(capturedRequest.request, equalTo(request)); + assertThat(capturedRequest.action, equalTo("internal:testAction")); + + TransportClusterManagerTermCheckTests.Response response = new TransportClusterManagerTermCheckTests.Response(); + transport.handleResponse(capturedRequest.requestId, response); + assertTrue(listener.isDone()); + assertThat(listener.get(), equalTo(response)); + + } + + private void setUpCluster(Version clusterManagerVersion) { + localNode = new DiscoveryNode( + "local_node", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ); + remoteNode = new DiscoveryNode( + "remote_node", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), + clusterManagerVersion + ); + allNodes = new DiscoveryNode[] { localNode, remoteNode }; + setState(clusterService, ClusterStateCreationUtils.state(localNode, remoteNode, allNodes)); + + } +} From 96ed4227b3e2a6d44a8cac4a5569293b178473ca Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Wed, 28 Feb 2024 19:04:23 +0530 Subject: [PATCH 2/8] added GetTermVersion action dependency for test Signed-off-by: Rajiv Kumar Vaidyanathan --- .../snapshots/SnapshotResiliencyTests.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 635939e68de71..58315ba031b84 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -66,6 +66,8 @@ import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.action.admin.cluster.state.TransportClusterStateAction; +import org.opensearch.action.admin.cluster.state.term.GetTermVersionAction; +import org.opensearch.action.admin.cluster.state.term.TransportGetTermVersionAction; import org.opensearch.action.admin.indices.create.CreateIndexAction; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexResponse; @@ -2437,6 +2439,18 @@ public void onFailure(final Exception e) { indexNameExpressionResolver ) ); + + actions.put( + GetTermVersionAction.INSTANCE, + new TransportGetTermVersionAction( + transportService, + clusterService, + threadPool, + actionFilters, + indexNameExpressionResolver + ) + ); + DynamicActionRegistry dynamicActionRegistry = new DynamicActionRegistry(); dynamicActionRegistry.registerUnmodifiableActionMap(actions); client.initialize( From a872f9f805aeeaf5a7980f07ff31d892b1d21513 Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Thu, 7 Mar 2024 20:37:12 +0530 Subject: [PATCH 3/8] refactored NodeAction methods to avoid naming on TermCheck verification details Signed-off-by: Rajiv Kumar Vaidyanathan --- .../state/TransportClusterStateAction.java | 2 +- .../state/term/GetTermVersionResponse.java | 16 +-- .../term/TransportGetTermVersionAction.java | 3 +- .../TransportClusterManagerNodeAction.java | 103 +++++++++++------- .../state/term/ClusterTermVersionTests.java | 2 +- ...ransportClusterManagerNodeActionTests.java | 18 --- ...TransportClusterManagerTermCheckTests.java | 4 +- 7 files changed, 74 insertions(+), 74 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java index 18a8ddd522023..9f40fe2e83461 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -232,7 +232,7 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, fi } @Override - protected boolean checkTermVersion() { + protected boolean canUseLocalNodeClusterState() { return true; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java index 85f2a7133e576..91e53e8d2b146 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java @@ -24,13 +24,13 @@ public class GetTermVersionResponse extends ActionResponse { private final ClusterName clusterName; - private final String stateUUID; + private final String clusterUUID; private final long term; private final long version; - public GetTermVersionResponse(ClusterName clusterName, String stateUUID, long term, long version) { + public GetTermVersionResponse(ClusterName clusterName, String clusterUUID, long term, long version) { this.clusterName = clusterName; - this.stateUUID = stateUUID; + this.clusterUUID = clusterUUID; this.term = term; this.version = version; } @@ -38,7 +38,7 @@ public GetTermVersionResponse(ClusterName clusterName, String stateUUID, long te public GetTermVersionResponse(StreamInput in) throws IOException { super(in); this.clusterName = new ClusterName(in); - this.stateUUID = in.readString(); + this.clusterUUID = in.readString(); this.term = in.readLong(); this.version = in.readLong(); } @@ -46,7 +46,7 @@ public GetTermVersionResponse(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { clusterName.writeTo(out); - out.writeString(stateUUID); + out.writeString(clusterUUID); out.writeLong(term); out.writeLong(version); } @@ -63,13 +63,13 @@ public ClusterName getClusterName() { return clusterName; } - public String getStateUUID() { - return stateUUID; + public String getClusterUUID() { + return clusterUUID; } public boolean matches(ClusterState clusterState) { return clusterName.equals(clusterState.getClusterName()) - && stateUUID.equals(clusterState.stateUUID()) + && clusterUUID.equals(clusterState.metadata().clusterUUID()) && term == clusterState.term() && version == clusterState.version(); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java index 5f51f16c2e80a..c043501ad0c4a 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java @@ -79,7 +79,6 @@ protected void clusterManagerOperation( } private GetTermVersionResponse buildResponse(GetTermVersionRequest request, ClusterState state) { - logger.trace("Serving cluster term version request using term {} and version {}", state.term(), state.version()); - return new GetTermVersionResponse(state.getClusterName(), state.stateUUID(), state.term(), state.getVersion()); + return new GetTermVersionResponse(state.getClusterName(), state.metadata().clusterUUID(), state.term(), state.getVersion()); } } diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index 329902dc5e397..57ea8f5382fab 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -73,9 +73,11 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Predicate; -import static org.opensearch.Version.CURRENT; +import static org.opensearch.Version.V_3_0_0; /** * A base class for operations that needs to be performed on the cluster-manager node. @@ -272,9 +274,12 @@ protected void doStart(ClusterState clusterState) { retryOnMasterChange(clusterState, null); } else { DiscoveryNode clusterManagerNode = nodes.getClusterManagerNode(); - boolean shouldCheckTerm = clusterManagerNode.getVersion().onOrAfter(CURRENT) && checkTermVersion(); - if (shouldCheckTerm) { - execOnClusterManagerOnTermMismatch(clusterManagerNode, clusterState); + if (clusterManagerNode.getVersion().onOrAfter(V_3_0_0) && canUseLocalNodeClusterState()) { + BiConsumer executeOnLocalOrClusterManager = clusterStateLatestChecker( + this::executeOnLocalNode, + this::executeOnClusterManager + ); + executeOnLocalOrClusterManager.accept(clusterManagerNode, clusterState); } else { executeOnClusterManager(clusterManagerNode, clusterState); } @@ -333,7 +338,8 @@ private ActionListener getDelegateForLocalExecute(ClusterState cluster if (t instanceof FailedToCommitClusterStateException || t instanceof NotClusterManagerException) { logger.debug( () -> new ParameterizedMessage( - "master could not publish cluster state or " + "stepped down before publishing action [{}], scheduling a retry", + "cluster-manager could not publish cluster state or " + + "stepped down before publishing action [{}], scheduling a retry", actionName ), t @@ -346,44 +352,58 @@ private ActionListener getDelegateForLocalExecute(ClusterState cluster }); } - private void execOnClusterManagerOnTermMismatch(DiscoveryNode clusterManagerNode, ClusterState clusterState) { - transportService.sendRequest( - clusterManagerNode, - GetTermVersionAction.NAME, - new GetTermVersionRequest(), - new TransportResponseHandler() { - @Override - public void handleResponse(GetTermVersionResponse response) { - boolean shouldExecuteOnClusterManger = !response.matches(clusterState); - if (shouldExecuteOnClusterManger) { - executeOnClusterManager(clusterManagerNode, clusterState); - } else { - Runnable runTask = ActionRunnable.wrap( - getDelegateForLocalExecute(clusterState), - l -> clusterManagerOperation(task, request, clusterState, l) + protected BiConsumer clusterStateLatestChecker( + Consumer onLatestWithLocalState, + BiConsumer onMisMatchWithLocalState + ) { + return (clusterManagerNode, clusterState) -> { + transportService.sendRequest( + clusterManagerNode, + GetTermVersionAction.NAME, + new GetTermVersionRequest(), + new TransportResponseHandler() { + @Override + public void handleResponse(GetTermVersionResponse response) { + boolean isLatestClusterStatePresentOnLocalNode = response.matches(clusterState); + logger.trace( + "Received GetTermVersionResponse response : term {}, version {}, latest-on-local {}", + response.getTerm(), + response.getVersion(), + isLatestClusterStatePresentOnLocalNode ); - threadPool.executor(executor).execute(runTask); + if (isLatestClusterStatePresentOnLocalNode) { + onLatestWithLocalState.accept(clusterState); + } else { + onMisMatchWithLocalState.accept(clusterManagerNode, clusterState); + } } - } - @Override - public void handleException(TransportException exp) { - handleTransportException(clusterManagerNode, clusterState, exp); - } + @Override + public void handleException(TransportException exp) { + handleTransportException(clusterManagerNode, clusterState, exp); + } - @Override - public String executor() { - return ThreadPool.Names.SAME; - } + @Override + public String executor() { + return ThreadPool.Names.SAME; + } - @Override - public GetTermVersionResponse read(StreamInput in) throws IOException { - return new GetTermVersionResponse(in); - } + @Override + public GetTermVersionResponse read(StreamInput in) throws IOException { + return new GetTermVersionResponse(in); + } - } + } + ); + }; + } + private void executeOnLocalNode(ClusterState localClusterState) { + Runnable runTask = ActionRunnable.wrap( + getDelegateForLocalExecute(localClusterState), + l -> clusterManagerOperation(task, request, localClusterState, l) ); + threadPool.executor(executor).execute(runTask); } private void executeOnClusterManager(DiscoveryNode clusterManagerNode, ClusterState clusterState) { @@ -421,7 +441,6 @@ private void handleTransportException(DiscoveryNode clusterManagerNode, ClusterS listener.onFailure(exp); } } - } /** @@ -444,13 +463,13 @@ protected String getMasterActionName(DiscoveryNode node) { } /** - * Determines if transport action needs to check local cluster-state term with manager before - * executing the action on manager. This is generally true for actions that are read-only and can be executed locally - * on node if the term matches with cluster-manager. - * @return - true to perform term check and then execute the action + * Override to true if the transport action need NOT be executed always on cluster-manager (example Read-only actions). + * The action is executed locally if this method returns true AND + * the ClusterState on local node is in-sync with ClusterManager. + * + * @return - boolean if the action can be run locally */ - protected boolean checkTermVersion() { + protected boolean canUseLocalNodeClusterState() { return false; } - } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java index fba9ef4b2f526..0d7c5ae3df670 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java @@ -27,7 +27,7 @@ public void testTransportTermResponse() throws ExecutionException, InterruptedEx assertThat(resp.getTerm(), is(clusterState.term())); assertThat(resp.getVersion(), is(clusterState.version())); - assertThat(resp.getStateUUID(), is(clusterState.stateUUID())); + assertThat(resp.getClusterUUID(), is(clusterState.metadata().clusterUUID())); assertThat(resp.getClusterName().value().startsWith("single-node-cluster"), is(true)); } } diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java index 9ae1310a8b15c..538416e1137f5 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java @@ -6,24 +6,6 @@ * compatible open source license. */ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ /* * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java index f1e672093f177..0dce99e6b6255 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java @@ -189,7 +189,7 @@ protected String executor() { } @Override - protected boolean checkTermVersion() { + protected boolean canUseLocalNodeClusterState() { return true; } @@ -225,7 +225,7 @@ public void testTermCheckMatchWithClusterManager() throws ExecutionException, In assertThat(capturedRequest.action, equalTo("cluster:monitor/term")); GetTermVersionResponse response = new GetTermVersionResponse( clusterService.state().getClusterName(), - clusterService.state().stateUUID(), + clusterService.state().metadata().clusterUUID(), clusterService.state().term(), clusterService.state().version() ); From c18fd5e43358fa5b9b62a94bb506860bb1af0e00 Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Fri, 8 Mar 2024 18:51:10 +0530 Subject: [PATCH 4/8] added ClusterStateTermVersion as a seperate class for reuse Signed-off-by: Rajiv Kumar Vaidyanathan --- .../state/term/GetTermVersionResponse.java | 49 +++------ .../term/TransportGetTermVersionAction.java | 5 +- .../TransportClusterManagerNodeAction.java | 5 +- .../coordination/ClusterStateTermVersion.java | 99 +++++++++++++++++++ .../state/term/ClusterTermVersionIT.java | 3 +- .../state/term/ClusterTermVersionTests.java | 9 +- ...TransportClusterManagerTermCheckTests.java | 21 ++-- 7 files changed, 134 insertions(+), 57 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/coordination/ClusterStateTermVersion.java diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java index 91e53e8d2b146..4b0cfce9f717f 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java @@ -8,8 +8,8 @@ package org.opensearch.action.admin.cluster.state.term; -import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.ClusterStateTermVersion; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -23,55 +23,32 @@ */ public class GetTermVersionResponse extends ActionResponse { - private final ClusterName clusterName; - private final String clusterUUID; - private final long term; - private final long version; + private final ClusterStateTermVersion clusterStateTermVersion; - public GetTermVersionResponse(ClusterName clusterName, String clusterUUID, long term, long version) { - this.clusterName = clusterName; - this.clusterUUID = clusterUUID; - this.term = term; - this.version = version; + public GetTermVersionResponse(ClusterStateTermVersion clusterStateTermVersion) { + this.clusterStateTermVersion = clusterStateTermVersion; } public GetTermVersionResponse(StreamInput in) throws IOException { super(in); - this.clusterName = new ClusterName(in); - this.clusterUUID = in.readString(); - this.term = in.readLong(); - this.version = in.readLong(); + this.clusterStateTermVersion = new ClusterStateTermVersion(in); } @Override public void writeTo(StreamOutput out) throws IOException { - clusterName.writeTo(out); - out.writeString(clusterUUID); - out.writeLong(term); - out.writeLong(version); + clusterStateTermVersion.writeTo(out); } - public long getTerm() { - return term; - } - - public long getVersion() { - return version; - } - - public ClusterName getClusterName() { - return clusterName; - } - - public String getClusterUUID() { - return clusterUUID; + public ClusterStateTermVersion getClusterStateTermVersion() { + return clusterStateTermVersion; } public boolean matches(ClusterState clusterState) { - return clusterName.equals(clusterState.getClusterName()) - && clusterUUID.equals(clusterState.metadata().clusterUUID()) - && term == clusterState.term() - && version == clusterState.version(); + return clusterStateTermVersion != null + && clusterStateTermVersion.getClusterName().equals(clusterState.getClusterName()) + && clusterStateTermVersion.getClusterUUID().equals(clusterState.metadata().clusterUUID()) + && clusterStateTermVersion.getTerm() == clusterState.term() + && clusterStateTermVersion.getVersion() == clusterState.version(); } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java index c043501ad0c4a..6c2e20695df5e 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java @@ -14,6 +14,7 @@ import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeReadAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.coordination.ClusterStateTermVersion; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; @@ -79,6 +80,8 @@ protected void clusterManagerOperation( } private GetTermVersionResponse buildResponse(GetTermVersionRequest request, ClusterState state) { - return new GetTermVersionResponse(state.getClusterName(), state.metadata().clusterUUID(), state.term(), state.getVersion()); + return new GetTermVersionResponse( + new ClusterStateTermVersion(state.getClusterName(), state.metadata().clusterUUID(), state.term(), state.getVersion()) + ); } } diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index 57ea8f5382fab..a976dd87ca213 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -366,9 +366,8 @@ protected BiConsumer clusterStateLatestChecker( public void handleResponse(GetTermVersionResponse response) { boolean isLatestClusterStatePresentOnLocalNode = response.matches(clusterState); logger.trace( - "Received GetTermVersionResponse response : term {}, version {}, latest-on-local {}", - response.getTerm(), - response.getVersion(), + "Received GetTermVersionResponse response : ClusterStateTermVersion {}, latest-on-local {}", + response.getClusterStateTermVersion(), isLatestClusterStatePresentOnLocalNode ); if (isLatestClusterStatePresentOnLocalNode) { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateTermVersion.java b/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateTermVersion.java new file mode 100644 index 0000000000000..5c292e85d5301 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateTermVersion.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.coordination; + +import org.opensearch.cluster.ClusterName; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; + +import java.io.IOException; + +public class ClusterStateTermVersion implements Writeable { + + private final ClusterName clusterName; + private final String clusterUUID; + private final long term; + private final long version; + + public ClusterStateTermVersion(ClusterName clusterName, String clusterUUID, long term, long version) { + this.clusterName = clusterName; + this.clusterUUID = clusterUUID; + this.term = term; + this.version = version; + } + + public ClusterStateTermVersion(StreamInput in) throws IOException { + this.clusterName = new ClusterName(in); + this.clusterUUID = in.readString(); + this.term = in.readLong(); + this.version = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + clusterName.writeTo(out); + out.writeString(clusterUUID); + out.writeLong(term); + out.writeLong(version); + } + + public ClusterName getClusterName() { + return clusterName; + } + + public String getClusterUUID() { + return clusterUUID; + } + + public long getTerm() { + return term; + } + + public long getVersion() { + return version; + } + + @Override + public String toString() { + return "ClusterStateTermVersion{" + + "clusterName=" + + clusterName + + ", clusterUUID='" + + clusterUUID + + '\'' + + ", term=" + + term + + ", version=" + + version + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ClusterStateTermVersion that = (ClusterStateTermVersion) o; + + if (term != that.term) return false; + if (version != that.version) return false; + if (!clusterName.equals(that.clusterName)) return false; + return clusterUUID.equals(that.clusterUUID); + } + + @Override + public int hashCode() { + int result = clusterName.hashCode(); + result = 31 * result + clusterUUID.hashCode(); + result = 31 * result + (int) (term ^ (term >>> 32)); + result = 31 * result + (int) (version ^ (version >>> 32)); + return result; + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionIT.java b/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionIT.java index 8740edcea2398..fa2a6121af349 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionIT.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionIT.java @@ -12,6 +12,7 @@ import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.coordination.ClusterStateTermVersion; import org.opensearch.common.unit.TimeValue; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; @@ -113,7 +114,7 @@ private void addCallCountInterceptor(String nodeName, Map private void stubClusterTermResponse(String master) { MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master); primaryService.addRequestHandlingBehavior(GetTermVersionAction.NAME, (handler, request, channel, task) -> { - channel.sendResponse(new GetTermVersionResponse(new ClusterName("test"), "1", -1, -1)); + channel.sendResponse(new GetTermVersionResponse(new ClusterStateTermVersion(new ClusterName("test"), "1", -1, -1))); }); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java index 0d7c5ae3df670..22d9623eebdbe 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/state/term/ClusterTermVersionTests.java @@ -8,14 +8,11 @@ package org.opensearch.action.admin.cluster.state.term; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.service.ClusterService; import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.util.concurrent.ExecutionException; -import static org.hamcrest.Matchers.is; - public class ClusterTermVersionTests extends OpenSearchSingleNodeTestCase { public void testTransportTermResponse() throws ExecutionException, InterruptedException { @@ -23,11 +20,7 @@ public void testTransportTermResponse() throws ExecutionException, InterruptedEx GetTermVersionResponse resp = client().execute(GetTermVersionAction.INSTANCE, request).get(); final ClusterService clusterService = getInstanceFromNode(ClusterService.class); - final ClusterState clusterState = clusterService.state(); - assertThat(resp.getTerm(), is(clusterState.term())); - assertThat(resp.getVersion(), is(clusterState.version())); - assertThat(resp.getClusterUUID(), is(clusterState.metadata().clusterUUID())); - assertThat(resp.getClusterName().value().startsWith("single-node-cluster"), is(true)); + assertTrue(resp.matches(clusterService.state())); } } diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java index 0dce99e6b6255..1890a8d646e35 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java @@ -40,6 +40,7 @@ import org.opensearch.action.support.replication.ClusterStateCreationUtils; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.coordination.ClusterStateTermVersion; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; @@ -224,10 +225,12 @@ public void testTermCheckMatchWithClusterManager() throws ExecutionException, In assertTrue(capturedRequest.node.isClusterManagerNode()); assertThat(capturedRequest.action, equalTo("cluster:monitor/term")); GetTermVersionResponse response = new GetTermVersionResponse( - clusterService.state().getClusterName(), - clusterService.state().metadata().clusterUUID(), - clusterService.state().term(), - clusterService.state().version() + new ClusterStateTermVersion( + clusterService.state().getClusterName(), + clusterService.state().metadata().clusterUUID(), + clusterService.state().term(), + clusterService.state().version() + ) ); transport.handleResponse(capturedRequest.requestId, response); assertTrue(listener.isDone()); @@ -248,10 +251,12 @@ public void testTermCheckNoMatchWithClusterManager() throws ExecutionException, assertTrue(termCheckRequest.node.isClusterManagerNode()); assertThat(termCheckRequest.action, equalTo("cluster:monitor/term")); GetTermVersionResponse termVersionResponse = new GetTermVersionResponse( - clusterService.state().getClusterName(), - clusterService.state().stateUUID(), - clusterService.state().term(), - clusterService.state().version() - 1 + new ClusterStateTermVersion( + clusterService.state().getClusterName(), + clusterService.state().stateUUID(), + clusterService.state().term(), + clusterService.state().version() - 1 + ) ); transport.handleResponse(termCheckRequest.requestId, termVersionResponse); assertFalse(listener.isDone()); From 15769e45af37e3a8a77b3f0941a5f78d049cd068 Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Mon, 11 Mar 2024 20:07:41 +0530 Subject: [PATCH 5/8] TransportClusterState - Changed the assert expectation to include for case where the local node has the latest cluster-state Signed-off-by: Rajiv Kumar Vaidyanathan --- .../admin/cluster/state/TransportClusterStateAction.java | 7 +++++-- .../cluster/coordination/ClusterStateTermVersion.java | 3 +++ .../TransportClusterManagerTermCheckTests.java | 6 +++--- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java index 9f40fe2e83461..fdcb3ee6346b6 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -125,9 +125,12 @@ protected void clusterManagerOperation( ? clusterState -> true : clusterState -> clusterState.metadata().version() >= request.waitForMetadataVersion(); + // action will be executed on local node, if either the request is local only (or) the local node has the same cluster-state as + // ClusterManager final Predicate acceptableClusterStateOrNotMasterPredicate = request.local() - ? acceptableClusterStatePredicate - : acceptableClusterStatePredicate.or(clusterState -> clusterState.nodes().isLocalNodeElectedClusterManager() == false); + || !state.nodes().isLocalNodeElectedClusterManager() + ? acceptableClusterStatePredicate + : acceptableClusterStatePredicate.or(clusterState -> clusterState.nodes().isLocalNodeElectedClusterManager() == false); if (acceptableClusterStatePredicate.test(state)) { ActionListener.completeWith(listener, () -> buildResponse(request, state)); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateTermVersion.java b/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateTermVersion.java index 5c292e85d5301..bc992826c400f 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateTermVersion.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateTermVersion.java @@ -15,6 +15,9 @@ import java.io.IOException; +/** + * Identifies a specific version of ClusterState at a node. + */ public class ClusterStateTermVersion implements Writeable { private final ClusterName clusterName; diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java index 1890a8d646e35..d22d4871d92af 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java @@ -250,15 +250,15 @@ public void testTermCheckNoMatchWithClusterManager() throws ExecutionException, CapturingTransport.CapturedRequest termCheckRequest = transport.capturedRequests()[0]; assertTrue(termCheckRequest.node.isClusterManagerNode()); assertThat(termCheckRequest.action, equalTo("cluster:monitor/term")); - GetTermVersionResponse termVersionResponse = new GetTermVersionResponse( + GetTermVersionResponse noMatchResponse = new GetTermVersionResponse( new ClusterStateTermVersion( clusterService.state().getClusterName(), - clusterService.state().stateUUID(), + clusterService.state().metadata().clusterUUID(), clusterService.state().term(), clusterService.state().version() - 1 ) ); - transport.handleResponse(termCheckRequest.requestId, termVersionResponse); + transport.handleResponse(termCheckRequest.requestId, noMatchResponse); assertFalse(listener.isDone()); assertThat(transport.capturedRequests().length, equalTo(2)); From ea686202d3278b6e48369125845953eaac9b6452 Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Tue, 12 Mar 2024 20:10:31 +0530 Subject: [PATCH 6/8] test to fetch large cluster-state with enabled/disabled term version check Signed-off-by: Rajiv Kumar Vaidyanathan --- .../cluster/state/FetchByTermVersionIT.java | 161 ++++++++++++++++++ 1 file changed, 161 insertions(+) create mode 100644 server/src/internalClusterTest/java/org/opensearch/cluster/state/FetchByTermVersionIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/state/FetchByTermVersionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/state/FetchByTermVersionIT.java new file mode 100644 index 0000000000000..cef184b3fddf9 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/state/FetchByTermVersionIT.java @@ -0,0 +1,161 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.state; + +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.admin.cluster.state.term.GetTermVersionAction; +import org.opensearch.action.admin.cluster.state.term.GetTermVersionResponse; +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.coordination.ClusterStateTermVersion; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.is; + +@SuppressWarnings("unchecked") +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class FetchByTermVersionIT extends OpenSearchIntegTestCase { + + AtomicBoolean isTermVersionCheckEnabled = new AtomicBoolean(); + + protected Collection> nodePlugins() { + return List.of(MockTransportService.TestPlugin.class); + } + + AtomicBoolean forceFetchFromCM = new AtomicBoolean(); + + public void testClusterStateResponseFromDataNode() throws Exception { + String cm = internalCluster().startClusterManagerOnlyNode(); + List dns = internalCluster().startDataOnlyNodes(5); + int numberOfShards = dns.size(); + stubClusterTermResponse(cm); + + ensureClusterSizeConsistency(); + ensureGreen(); + + List indices = new ArrayList<>(); + + // Create a large sized cluster-state by creating field mappings + IntStream.range(0, 20).forEachOrdered(n -> { + String index = "index_" + n; + createIndex( + index, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), Long.MAX_VALUE) + .build() + ); + indices.add(index); + }); + IntStream.range(0, 5).forEachOrdered(n -> { + List mappings = new ArrayList<>(); + for (int i = 0; i < 2000; i++) { + mappings.add("t-123456789-123456789-" + n + "-" + i); + mappings.add("type=keyword"); + } + PutMappingRequest request = new PutMappingRequest().source(mappings.toArray(new String[0])) + .indices(indices.toArray(new String[0])); + internalCluster().dataNodeClient().admin().indices().putMapping(request).actionGet(); + }); + ensureGreen(); + + ClusterStateResponse stateResponseM = internalCluster().clusterManagerClient() + .admin() + .cluster() + .state(new ClusterStateRequest()) + .actionGet(); + + waitUntil(() -> { + ClusterStateResponse stateResponseD = internalCluster().dataNodeClient() + .admin() + .cluster() + .state(new ClusterStateRequest()) + .actionGet(); + return stateResponseD.getState().stateUUID().equals(stateResponseM.getState().stateUUID()); + }); + // cluster state response time with term check enabled on datanode + isTermVersionCheckEnabled.set(true); + { + List latencies = new ArrayList<>(); + IntStream.range(0, 50).forEachOrdered(n1 -> { + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + long start = System.currentTimeMillis(); + ClusterStateResponse stateResponse = dataNodeClient().admin().cluster().state(clusterStateRequest).actionGet(); + latencies.add(System.currentTimeMillis() - start); + assertThat(stateResponse.getClusterName().value(), is(internalCluster().getClusterName())); + assertThat(stateResponse.getState().nodes().getSize(), is(internalCluster().getNodeNames().length)); + assertThat(stateResponse.getState().metadata().indices().size(), is(indices.size())); + Map fieldMappings = (Map) stateResponse.getState() + .metadata() + .index(indices.get(0)) + .mapping() + .sourceAsMap() + .get("properties"); + + assertThat(fieldMappings.size(), is(10000)); + }); + Collections.sort(latencies); + + logger.info("cluster().state() fetch with Term Version enabled took {} milliseconds", (latencies.get(latencies.size() / 2))); + } + // cluster state response time with term check disabled on datanode + isTermVersionCheckEnabled.set(false); + { + List latencies = new ArrayList<>(); + IntStream.range(0, 50).forEachOrdered(n1 -> { + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + long start = System.currentTimeMillis(); + ClusterStateResponse stateResponse = dataNodeClient().admin().cluster().state(clusterStateRequest).actionGet(); + latencies.add(System.currentTimeMillis() - start); + assertThat(stateResponse.getClusterName().value(), is(internalCluster().getClusterName())); + assertThat(stateResponse.getState().nodes().getSize(), is(internalCluster().getNodeNames().length)); + assertThat(stateResponse.getState().metadata().indices().size(), is(indices.size())); + Map typeProperties = (Map) stateResponse.getState() + .metadata() + .index(indices.get(0)) + .mapping() + .sourceAsMap() + .get("properties"); + assertThat(typeProperties.size(), is(10000)); + + }); + Collections.sort(latencies); + logger.info("cluster().state() fetch with Term Version disabled took {} milliseconds", (latencies.get(latencies.size() / 2))); + } + + } + + private void stubClusterTermResponse(String master) { + MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master); + primaryService.addRequestHandlingBehavior(GetTermVersionAction.NAME, (handler, request, channel, task) -> { + if (isTermVersionCheckEnabled.get()) { + handler.messageReceived(request, channel, task); + } else { + // always return response that does not match + channel.sendResponse(new GetTermVersionResponse(new ClusterStateTermVersion(new ClusterName("test"), "1", -1, -1))); + } + }); + } +} From d20d39feaccea5c2521193167761a22135ddd470 Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Fri, 15 Mar 2024 11:59:33 +0530 Subject: [PATCH 7/8] construct clusterstateterm from clusterstate Signed-off-by: Rajiv Kumar Vaidyanathan --- .../admin/cluster/state/term/GetTermVersionResponse.java | 6 +----- .../cluster/state/term/TransportGetTermVersionAction.java | 4 +--- .../cluster/coordination/ClusterStateTermVersion.java | 8 ++++++++ 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java index 4b0cfce9f717f..16b355a80d1f2 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionResponse.java @@ -44,11 +44,7 @@ public ClusterStateTermVersion getClusterStateTermVersion() { } public boolean matches(ClusterState clusterState) { - return clusterStateTermVersion != null - && clusterStateTermVersion.getClusterName().equals(clusterState.getClusterName()) - && clusterStateTermVersion.getClusterUUID().equals(clusterState.metadata().clusterUUID()) - && clusterStateTermVersion.getTerm() == clusterState.term() - && clusterStateTermVersion.getVersion() == clusterState.version(); + return clusterStateTermVersion != null && clusterStateTermVersion.equals(new ClusterStateTermVersion(clusterState)); } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java index 6c2e20695df5e..88305252aa99c 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/TransportGetTermVersionAction.java @@ -80,8 +80,6 @@ protected void clusterManagerOperation( } private GetTermVersionResponse buildResponse(GetTermVersionRequest request, ClusterState state) { - return new GetTermVersionResponse( - new ClusterStateTermVersion(state.getClusterName(), state.metadata().clusterUUID(), state.term(), state.getVersion()) - ); + return new GetTermVersionResponse(new ClusterStateTermVersion(state)); } } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateTermVersion.java b/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateTermVersion.java index bc992826c400f..b317b0d362825 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateTermVersion.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateTermVersion.java @@ -9,6 +9,7 @@ package org.opensearch.cluster.coordination; import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -39,6 +40,13 @@ public ClusterStateTermVersion(StreamInput in) throws IOException { this.version = in.readLong(); } + public ClusterStateTermVersion(ClusterState state) { + this.clusterName = state.getClusterName(); + this.clusterUUID = state.metadata().clusterUUID(); + this.term = state.term(); + this.version = state.version(); + } + @Override public void writeTo(StreamOutput out) throws IOException { clusterName.writeTo(out); From f54184fbac31987a07df7bfbbb3391ff31e88752 Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Tue, 19 Mar 2024 12:00:10 +0530 Subject: [PATCH 8/8] updated changelog and renamed localExecuteSupportedByAction Signed-off-by: Rajiv Kumar Vaidyanathan --- CHANGELOG.md | 1 + .../cluster/state/TransportClusterStateAction.java | 2 +- .../cluster/state/term/GetTermVersionRequest.java | 2 +- .../TransportClusterManagerNodeAction.java | 14 +++++++------- .../TransportClusterManagerTermCheckTests.java | 4 ++-- 5 files changed, 12 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b5a7cc705a79..5fb89180a0887 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -120,6 +120,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583)) - Introduce a new setting `index.check_pending_flush.enabled` to expose the ability to disable the check for pending flushes by write threads ([#12710](https://github.com/opensearch-project/OpenSearch/pull/12710)) - Built-in secure transports support ([#12435](https://github.com/opensearch-project/OpenSearch/pull/12435)) +- Lightweight Transport action to verify local term before fetching cluster-state from remote ([#12252](https://github.com/opensearch-project/OpenSearch/pull/12252/)) ### Dependencies - Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288)) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java index fdcb3ee6346b6..cae465a90446e 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -235,7 +235,7 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, fi } @Override - protected boolean canUseLocalNodeClusterState() { + protected boolean localExecuteSupportedByAction() { return true; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionRequest.java index 2ea07d7332977..b099f8087bd15 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/term/GetTermVersionRequest.java @@ -15,7 +15,7 @@ import java.io.IOException; /** - * Request object to get cluster term + * Request object to get cluster term and version * * @opensearch.internal */ diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index a976dd87ca213..6a081f9dfecde 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -274,7 +274,7 @@ protected void doStart(ClusterState clusterState) { retryOnMasterChange(clusterState, null); } else { DiscoveryNode clusterManagerNode = nodes.getClusterManagerNode(); - if (clusterManagerNode.getVersion().onOrAfter(V_3_0_0) && canUseLocalNodeClusterState()) { + if (clusterManagerNode.getVersion().onOrAfter(V_3_0_0) && localExecuteSupportedByAction()) { BiConsumer executeOnLocalOrClusterManager = clusterStateLatestChecker( this::executeOnLocalNode, this::executeOnClusterManager @@ -353,8 +353,8 @@ private ActionListener getDelegateForLocalExecute(ClusterState cluster } protected BiConsumer clusterStateLatestChecker( - Consumer onLatestWithLocalState, - BiConsumer onMisMatchWithLocalState + Consumer onLatestLocalState, + BiConsumer onStaleLocalState ) { return (clusterManagerNode, clusterState) -> { transportService.sendRequest( @@ -371,9 +371,9 @@ public void handleResponse(GetTermVersionResponse response) { isLatestClusterStatePresentOnLocalNode ); if (isLatestClusterStatePresentOnLocalNode) { - onLatestWithLocalState.accept(clusterState); + onLatestLocalState.accept(clusterState); } else { - onMisMatchWithLocalState.accept(clusterManagerNode, clusterState); + onStaleLocalState.accept(clusterManagerNode, clusterState); } } @@ -462,13 +462,13 @@ protected String getMasterActionName(DiscoveryNode node) { } /** - * Override to true if the transport action need NOT be executed always on cluster-manager (example Read-only actions). + * Override to true if the transport action can be executed locally and need NOT be executed always on cluster-manager (Read actions). * The action is executed locally if this method returns true AND * the ClusterState on local node is in-sync with ClusterManager. * * @return - boolean if the action can be run locally */ - protected boolean canUseLocalNodeClusterState() { + protected boolean localExecuteSupportedByAction() { return false; } } diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java index d22d4871d92af..8c7b7a0940c82 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerTermCheckTests.java @@ -190,7 +190,7 @@ protected String executor() { } @Override - protected boolean canUseLocalNodeClusterState() { + protected boolean localExecuteSupportedByAction() { return true; } @@ -276,7 +276,7 @@ public void testTermCheckNoMatchWithClusterManager() throws ExecutionException, public void testTermCheckOnOldVersionClusterManager() throws ExecutionException, InterruptedException { - setUpCluster(Version.V_2_13_0); + setUpCluster(Version.V_2_12_0); TransportClusterManagerTermCheckTests.Request request = new TransportClusterManagerTermCheckTests.Request(); PlainActionFuture listener = new PlainActionFuture<>();