Skip to content

Commit

Permalink
Remove ccs_telemetry feature flag (elastic#113825)
Browse files Browse the repository at this point in the history
This removes `ccs_telemetry` feature flag, and instead introduces an
undocumented, true by default setting: - `search.ccs.collect_telemetry`
- enables CCS search telemetry collection and
`_cluster/stats?include_remote=true`. Can be disabled if this is causing
any problems.
  • Loading branch information
smalyshev authored Oct 9, 2024
1 parent 27faf59 commit 510a56b
Show file tree
Hide file tree
Showing 12 changed files with 40 additions and 55 deletions.
1 change: 0 additions & 1 deletion docs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ testClusters.matching { it.name == "yamlRestTest"}.configureEach {

requiresFeature 'es.index_mode_feature_flag_registered', Version.fromString("8.0.0")
requiresFeature 'es.failure_store_feature_flag_enabled', Version.fromString("8.12.0")
requiresFeature 'es.ccs_telemetry_feature_flag_enabled', Version.fromString("8.16.0")

// TODO Rene: clean up this kind of cross project file references
extraConfigFile 'op-jwks.json', project(':x-pack:test:idp-fixture').file("src/main/resources/oidc/op-jwks.json")
Expand Down
12 changes: 12 additions & 0 deletions docs/changelog/113825.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
pr: 113825
summary: Cross-cluster search telemetry
area: Search
type: feature
issues: []
highlight:
title: Cross-cluster search telemetry
body: |-
The cross-cluster search telemetry is collected when cross-cluster searches
are performed, and is returned as "ccs" field in `_cluster/stats` output.
It also add a new parameter `include_remotes=true` to the `_cluster/stats` API
which will collect data from connected remote clusters.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.FeatureFlag;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand All @@ -23,7 +22,6 @@
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster;
import org.junit.Assert;
import org.junit.BeforeClass;

import java.util.Collection;
import java.util.List;
Expand All @@ -46,7 +44,6 @@ public class ClusterStatsRemoteIT extends AbstractMultiClustersTestCase {
private static final String REMOTE2 = "cluster-b";

private static final String INDEX_NAME = "demo";
private static final FeatureFlag CCS_TELEMETRY_FEATURE_FLAG = new FeatureFlag("ccs_telemetry");

@Override
protected boolean reuseClusters() {
Expand All @@ -63,11 +60,6 @@ protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
return Map.of(REMOTE1, false, REMOTE2, true);
}

@BeforeClass
protected static void skipIfTelemetryDisabled() {
assumeTrue("Skipping test as CCS_TELEMETRY_FEATURE_FLAG is disabled", CCS_TELEMETRY_FEATURE_FLAG.isEnabled());
}

public void testRemoteClusterStats() throws ExecutionException, InterruptedException {
setupClusters();
final Client client = client(LOCAL_CLUSTER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.FeatureFlag;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.plugins.Plugin;
Expand All @@ -40,7 +39,6 @@
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.usage.UsageService;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
Expand Down Expand Up @@ -73,7 +71,6 @@ public class CCSUsageTelemetryIT extends AbstractMultiClustersTestCase {
private static final Logger LOGGER = LogManager.getLogger(CCSUsageTelemetryIT.class);
private static final String REMOTE1 = "cluster-a";
private static final String REMOTE2 = "cluster-b";
private static final FeatureFlag CCS_TELEMETRY_FEATURE_FLAG = new FeatureFlag("ccs_telemetry");

@Override
protected boolean reuseClusters() {
Expand All @@ -88,11 +85,6 @@ protected Collection<String> remoteClusterAlias() {
@Rule
public SkipUnavailableRule skipOverride = new SkipUnavailableRule(REMOTE1, REMOTE2);

@BeforeClass
protected static void skipIfTelemetryDisabled() {
assumeTrue("Skipping test as CCS_TELEMETRY_FEATURE_FLAG is disabled", CCS_TELEMETRY_FEATURE_FLAG.isEnabled());
}

@Override
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
var map = skipOverride.getMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.action.search.TransportSearchAction.CCS_TELEMETRY_FEATURE_FLAG;

public class ClusterStatsResponse extends BaseNodesResponse<ClusterStatsNodeResponse> implements ToXContentFragment {

final ClusterStatsNodes nodesStats;
Expand Down Expand Up @@ -145,14 +143,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("repositories");
repositoryUsageStats.toXContent(builder, params);

if (CCS_TELEMETRY_FEATURE_FLAG.isEnabled()) {
builder.startObject("ccs");
if (remoteClustersStats != null) {
builder.field("clusters", remoteClustersStats);
}
ccsMetrics.toXContent(builder, params);
builder.endObject();
builder.startObject("ccs");
if (remoteClustersStats != null) {
builder.field("clusters", remoteClustersStats);
}
ccsMetrics.toXContent(builder, params);
builder.endObject();

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
Expand Down Expand Up @@ -431,8 +432,8 @@ public Map<String, RemoteClusterStats> getRemoteStats() {
}
}

private static boolean doRemotes(ClusterStatsRequest request) {
return request.doRemotes();
private boolean doRemotes(ClusterStatsRequest request) {
return SearchService.CCS_COLLECT_TELEMETRY.get(settings) && request.doRemotes();
}

private class RemoteStatsFanout extends CancellableFanOut<String, RemoteClusterStatsResponse, Map<String, RemoteClusterStats>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.ArrayUtils;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.FeatureFlag;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.EsExecutors;
Expand Down Expand Up @@ -128,8 +128,6 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
public static final String FROZEN_INDICES_DEPRECATION_MESSAGE = "Searching frozen indices [{}] is deprecated."
+ " Consider cold or frozen tiers in place of frozen indices. The frozen feature will be removed in a feature release.";

public static final FeatureFlag CCS_TELEMETRY_FEATURE_FLAG = new FeatureFlag("ccs_telemetry");

/** The maximum number of shards for a single search request. */
public static final Setting<Long> SHARD_COUNT_LIMIT_SETTING = Setting.longSetting(
"action.search.shard_count.limit",
Expand Down Expand Up @@ -162,6 +160,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final SearchResponseMetrics searchResponseMetrics;
private final Client client;
private final UsageService usageService;
private final Settings settings;

@Inject
public TransportSearchAction(
Expand Down Expand Up @@ -194,8 +193,9 @@ public TransportSearchAction(
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.namedWriteableRegistry = namedWriteableRegistry;
this.executorSelector = executorSelector;
this.defaultPreFilterShardSize = DEFAULT_PRE_FILTER_SHARD_SIZE.get(clusterService.getSettings());
this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(clusterService.getSettings());
this.settings = clusterService.getSettings();
this.defaultPreFilterShardSize = DEFAULT_PRE_FILTER_SHARD_SIZE.get(settings);
this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(settings);
this.searchResponseMetrics = searchResponseMetrics;
this.client = client;
this.usageService = usageService;
Expand Down Expand Up @@ -372,7 +372,7 @@ void executeRequest(
searchPhaseProvider.apply(delegate)
);
} else {
if ((listener instanceof TelemetryListener tl) && CCS_TELEMETRY_FEATURE_FLAG.isEnabled()) {
if (listener instanceof TelemetryListener tl) {
tl.setRemotes(resolvedIndices.getRemoteClusterIndices().size());
if (task.isAsync()) {
tl.setFeature(CCSUsageTelemetry.ASYNC_FEATURE);
Expand All @@ -398,7 +398,7 @@ void executeRequest(
}
final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).taskId();
if (shouldMinimizeRoundtrips(rewritten)) {
if ((listener instanceof TelemetryListener tl) && CCS_TELEMETRY_FEATURE_FLAG.isEnabled()) {
if (listener instanceof TelemetryListener tl) {
tl.setFeature(CCSUsageTelemetry.MRT_FEATURE);
}
final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null
Expand Down Expand Up @@ -1868,7 +1868,7 @@ private class SearchResponseActionListener implements ActionListener<SearchRespo
* Should we collect telemetry for this search?
*/
private boolean collectTelemetry() {
return CCS_TELEMETRY_FEATURE_FLAG.isEnabled() && usageBuilder.getRemotesCount() > 0;
return SearchService.CCS_COLLECT_TELEMETRY.get(settings) && usageBuilder.getRemotesCount() > 0;
}

public void setRemotes(int count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.MAX_KEEPALIVE_SETTING,
SearchService.ALLOW_EXPENSIVE_QUERIES,
SearchService.CCS_VERSION_CHECK_SETTING,
SearchService.CCS_COLLECT_TELEMETRY,
MultiBucketConsumerService.MAX_BUCKET_SETTING,
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
SearchService.MAX_OPEN_SCROLL_CONTEXT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.util.FeatureFlag;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope;
Expand All @@ -33,10 +31,9 @@ public class RestClusterStatsAction extends BaseRestHandler {

private static final Set<String> SUPPORTED_CAPABILITIES = Set.of(
"human-readable-total-docs-size",
"verbose-dense-vector-mapping-stats"
"verbose-dense-vector-mapping-stats",
"ccs-stats"
);
private static final Set<String> SUPPORTED_CAPABILITIES_CCS_STATS = Set.copyOf(Sets.union(SUPPORTED_CAPABILITIES, Set.of("ccs-stats")));
public static final FeatureFlag CCS_TELEMETRY_FEATURE_FLAG = new FeatureFlag("ccs_telemetry");
private static final Set<String> SUPPORTED_QUERY_PARAMETERS = Set.of("include_remotes", "nodeId", REST_TIMEOUT_PARAM);

@Override
Expand Down Expand Up @@ -73,6 +70,6 @@ public boolean canTripCircuitBreaker() {

@Override
public Set<String> supportedCapabilities() {
return CCS_TELEMETRY_FEATURE_FLAG.isEnabled() ? SUPPORTED_CAPABILITIES_CCS_STATS : SUPPORTED_CAPABILITIES;
return SUPPORTED_CAPABILITIES;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.NodeScope
);

public static final Setting<Boolean> CCS_COLLECT_TELEMETRY = Setting.boolSetting(
"search.ccs.collect_telemetry",
true,
Property.Dynamic,
Property.NodeScope
);

public static final int DEFAULT_SIZE = 10;
public static final int DEFAULT_FROM = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.FeatureFlag;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.plugins.Plugin;
Expand All @@ -34,7 +33,6 @@
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.BeforeClass;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -55,12 +53,6 @@
public class CCSUsageTelemetryAsyncSearchIT extends AbstractMultiClustersTestCase {
private static final String REMOTE1 = "cluster-a";
private static final String REMOTE2 = "cluster-b";
private static final FeatureFlag CCS_TELEMETRY_FEATURE_FLAG = new FeatureFlag("ccs_telemetry");

@BeforeClass
protected static void skipIfTelemetryDisabled() {
assumeTrue("Skipping test as CCS_TELEMETRY_FEATURE_FLAG is disabled", CCS_TELEMETRY_FEATURE_FLAG.isEnabled());
}

@Override
protected boolean reuseClusters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.action.search.TransportSearchAction.CCS_TELEMETRY_FEATURE_FLAG;
import static org.elasticsearch.common.xcontent.XContentHelper.stripWhitespace;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -847,10 +846,7 @@ public void testToXContent() throws IOException {
}
}
}""";
assertEquals(
stripWhitespace(Strings.format(expectedJson + (CCS_TELEMETRY_FEATURE_FLAG.isEnabled() ? ccsOutput : "") + suffixJson, args)),
xContent.utf8ToString()
);
assertEquals(stripWhitespace(Strings.format(expectedJson + ccsOutput + suffixJson, args)), xContent.utf8ToString());
}

private DiscoveryNode masterNode() {
Expand Down

0 comments on commit 510a56b

Please sign in to comment.