Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Light weight Transport action to verify local term before fetching cluster-state from remote #12252

Merged
merged 8 commits into from
Mar 20, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583))
- Introduce a new setting `index.check_pending_flush.enabled` to expose the ability to disable the check for pending flushes by write threads ([#12710](https://github.com/opensearch-project/OpenSearch/pull/12710))
- Built-in secure transports support ([#12435](https://github.com/opensearch-project/OpenSearch/pull/12435))
- Lightweight Transport action to verify local term before fetching cluster-state from remote ([#12252](https://github.com/opensearch-project/OpenSearch/pull/12252/))

### Dependencies
- Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.state;

import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.cluster.state.term.GetTermVersionAction;
import org.opensearch.action.admin.cluster.state.term.GetTermVersionResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.is;

@SuppressWarnings("unchecked")
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class FetchByTermVersionIT extends OpenSearchIntegTestCase {

AtomicBoolean isTermVersionCheckEnabled = new AtomicBoolean();

protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(MockTransportService.TestPlugin.class);
}

AtomicBoolean forceFetchFromCM = new AtomicBoolean();

public void testClusterStateResponseFromDataNode() throws Exception {
String cm = internalCluster().startClusterManagerOnlyNode();
List<String> dns = internalCluster().startDataOnlyNodes(5);
int numberOfShards = dns.size();
stubClusterTermResponse(cm);

ensureClusterSizeConsistency();
ensureGreen();

List<String> indices = new ArrayList<>();

// Create a large sized cluster-state by creating field mappings
IntStream.range(0, 20).forEachOrdered(n -> {
String index = "index_" + n;
createIndex(
index,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), Long.MAX_VALUE)
.build()
);
indices.add(index);
});
IntStream.range(0, 5).forEachOrdered(n -> {
List<String> mappings = new ArrayList<>();
for (int i = 0; i < 2000; i++) {
mappings.add("t-123456789-123456789-" + n + "-" + i);
mappings.add("type=keyword");
}
PutMappingRequest request = new PutMappingRequest().source(mappings.toArray(new String[0]))
.indices(indices.toArray(new String[0]));
internalCluster().dataNodeClient().admin().indices().putMapping(request).actionGet();
});
ensureGreen();

ClusterStateResponse stateResponseM = internalCluster().clusterManagerClient()
.admin()
.cluster()
.state(new ClusterStateRequest())
.actionGet();

waitUntil(() -> {
ClusterStateResponse stateResponseD = internalCluster().dataNodeClient()
.admin()
.cluster()
.state(new ClusterStateRequest())
.actionGet();
return stateResponseD.getState().stateUUID().equals(stateResponseM.getState().stateUUID());
});
// cluster state response time with term check enabled on datanode
isTermVersionCheckEnabled.set(true);
{
List<Long> latencies = new ArrayList<>();
IntStream.range(0, 50).forEachOrdered(n1 -> {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
long start = System.currentTimeMillis();
ClusterStateResponse stateResponse = dataNodeClient().admin().cluster().state(clusterStateRequest).actionGet();
latencies.add(System.currentTimeMillis() - start);
assertThat(stateResponse.getClusterName().value(), is(internalCluster().getClusterName()));
assertThat(stateResponse.getState().nodes().getSize(), is(internalCluster().getNodeNames().length));
assertThat(stateResponse.getState().metadata().indices().size(), is(indices.size()));
Map<String, Object> fieldMappings = (Map<String, Object>) stateResponse.getState()
.metadata()
.index(indices.get(0))
.mapping()
.sourceAsMap()
.get("properties");

assertThat(fieldMappings.size(), is(10000));
});
Collections.sort(latencies);

logger.info("cluster().state() fetch with Term Version enabled took {} milliseconds", (latencies.get(latencies.size() / 2)));
}
// cluster state response time with term check disabled on datanode
isTermVersionCheckEnabled.set(false);
{
List<Long> latencies = new ArrayList<>();
IntStream.range(0, 50).forEachOrdered(n1 -> {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
long start = System.currentTimeMillis();
ClusterStateResponse stateResponse = dataNodeClient().admin().cluster().state(clusterStateRequest).actionGet();
latencies.add(System.currentTimeMillis() - start);
assertThat(stateResponse.getClusterName().value(), is(internalCluster().getClusterName()));
assertThat(stateResponse.getState().nodes().getSize(), is(internalCluster().getNodeNames().length));
assertThat(stateResponse.getState().metadata().indices().size(), is(indices.size()));
Map<String, Object> typeProperties = (Map<String, Object>) stateResponse.getState()
.metadata()
.index(indices.get(0))
.mapping()
.sourceAsMap()
.get("properties");
assertThat(typeProperties.size(), is(10000));

});
Collections.sort(latencies);
logger.info("cluster().state() fetch with Term Version disabled took {} milliseconds", (latencies.get(latencies.size() / 2)));
}

}

private void stubClusterTermResponse(String master) {
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master);
primaryService.addRequestHandlingBehavior(GetTermVersionAction.NAME, (handler, request, channel, task) -> {
if (isTermVersionCheckEnabled.get()) {
handler.messageReceived(request, channel, task);
} else {
// always return response that does not match
channel.sendResponse(new GetTermVersionResponse(new ClusterStateTermVersion(new ClusterName("test"), "1", -1, -1)));
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@
import org.opensearch.action.admin.cluster.snapshots.status.TransportSnapshotsStatusAction;
import org.opensearch.action.admin.cluster.state.ClusterStateAction;
import org.opensearch.action.admin.cluster.state.TransportClusterStateAction;
import org.opensearch.action.admin.cluster.state.term.GetTermVersionAction;
import org.opensearch.action.admin.cluster.state.term.TransportGetTermVersionAction;
import org.opensearch.action.admin.cluster.stats.ClusterStatsAction;
import org.opensearch.action.admin.cluster.stats.TransportClusterStatsAction;
import org.opensearch.action.admin.cluster.storedscripts.DeleteStoredScriptAction;
Expand Down Expand Up @@ -614,6 +616,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class);
actions.register(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
actions.register(GetTermVersionAction.INSTANCE, TransportGetTermVersionAction.class);
actions.register(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);
actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
actions.register(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,12 @@ protected void clusterManagerOperation(
? clusterState -> true
: clusterState -> clusterState.metadata().version() >= request.waitForMetadataVersion();

// action will be executed on local node, if either the request is local only (or) the local node has the same cluster-state as
// ClusterManager
final Predicate<ClusterState> acceptableClusterStateOrNotMasterPredicate = request.local()
? acceptableClusterStatePredicate
: acceptableClusterStatePredicate.or(clusterState -> clusterState.nodes().isLocalNodeElectedClusterManager() == false);
|| !state.nodes().isLocalNodeElectedClusterManager()
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
? acceptableClusterStatePredicate
: acceptableClusterStatePredicate.or(clusterState -> clusterState.nodes().isLocalNodeElectedClusterManager() == false);

if (acceptableClusterStatePredicate.test(state)) {
ActionListener.completeWith(listener, () -> buildResponse(request, state));
Expand Down Expand Up @@ -231,4 +234,8 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, fi
return new ClusterStateResponse(currentState.getClusterName(), builder.build(), false);
}

@Override
protected boolean localExecuteSupportedByAction() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.state.term;

import org.opensearch.action.ActionType;

/**
* Transport action for fetching cluster term and version
*
* @opensearch.internal
*/
public class GetTermVersionAction extends ActionType<GetTermVersionResponse> {

public static final GetTermVersionAction INSTANCE = new GetTermVersionAction();
public static final String NAME = "cluster:monitor/term";
rajiv-kv marked this conversation as resolved.
Show resolved Hide resolved

private GetTermVersionAction() {
super(NAME, GetTermVersionResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.state.term;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
import org.opensearch.core.common.io.stream.StreamInput;

import java.io.IOException;

/**
* Request object to get cluster term and version
*
rajiv-kv marked this conversation as resolved.
Show resolved Hide resolved
* @opensearch.internal
*/
public class GetTermVersionRequest extends ClusterManagerNodeReadRequest<GetTermVersionRequest> {

public GetTermVersionRequest() {}

public GetTermVersionRequest(StreamInput in) throws IOException {
rajiv-kv marked this conversation as resolved.
Show resolved Hide resolved
super(in);
}

@Override
public ActionRequestValidationException validate() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.state.term;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Response object of cluster term
*
* @opensearch.internal
*/
public class GetTermVersionResponse extends ActionResponse {

private final ClusterStateTermVersion clusterStateTermVersion;

public GetTermVersionResponse(ClusterStateTermVersion clusterStateTermVersion) {
this.clusterStateTermVersion = clusterStateTermVersion;
}

public GetTermVersionResponse(StreamInput in) throws IOException {
super(in);
this.clusterStateTermVersion = new ClusterStateTermVersion(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
clusterStateTermVersion.writeTo(out);
}

public ClusterStateTermVersion getClusterStateTermVersion() {
return clusterStateTermVersion;
}

public boolean matches(ClusterState clusterState) {
return clusterStateTermVersion != null && clusterStateTermVersion.equals(new ClusterStateTermVersion(clusterState));
}

}
Loading
Loading