Skip to content

Commit

Permalink
Handle backward compatibility.
Browse files Browse the repository at this point in the history
Signed-off-by: Swetha Guptha <[email protected]>
  • Loading branch information
Swetha Guptha committed Sep 25, 2024
1 parent b094110 commit 08bbf14
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ public void testClusterStatsMetricFiltering() {
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.requestMetrics(Set.of(Metric.FS.metricName(), Metric.JVM.metricName(), Metric.PLUGINS.metricName(), Metric.OS.metricName()))
.applyMetricFiltering(true)
.get();
assertNotNull(response.getNodesStats());
assertNotNull(response.getNodesStats().getJvm());
Expand All @@ -531,6 +532,8 @@ public void testClusterStatsMetricFiltering() {
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.requestMetrics(Set.of(Metric.INDICES.metricName()))
.indexMetrics(IndexMetrics.allIndicesMetrics())
.applyMetricFiltering(true)
.get();
assertNotNull(response.getIndicesStats());
assertNotNull(response.getIndicesStats().getMappings());
Expand Down Expand Up @@ -565,6 +568,7 @@ public void testClusterStatsMetricFiltering() {
IndexMetrics.ANALYSIS.metricName()
)
)
.applyMetricFiltering(true)
.get();
assertNotNull(response.getIndicesStats());
assertNotNull(response.getIndicesStats().getShards());
Expand All @@ -583,6 +587,7 @@ public void testClusterStatsMetricFiltering() {
.useAggregatedNodeLevelResponses(randomBoolean())
.requestMetrics(Set.of(Metric.OS.metricName(), Metric.PROCESS.metricName(), Metric.INDICES.metricName()))
.indexMetrics(Set.of(IndexMetrics.SHARDS.metricName(), IndexMetrics.MAPPINGS.metricName()))
.applyMetricFiltering(true)
.get();
assertNotNull(response.getNodesStats());
assertNotNull(response.getNodesStats().getOs());
Expand All @@ -602,6 +607,7 @@ public void testClusterStatsMetricFiltering() {
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.requestMetrics(Set.of("random_metric"))
.applyMetricFiltering(true)
.get()
);

Expand All @@ -613,6 +619,7 @@ public void testClusterStatsMetricFiltering() {
.useAggregatedNodeLevelResponses(randomBoolean())
.requestMetrics(Metric.allMetrics())
.indexMetrics(Set.of("random_metric"))
.applyMetricFiltering(true)
.get()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,52 +90,60 @@ public class ClusterStatsNodes implements ToXContentFragment {
private final PackagingTypes packagingTypes;
private final IngestStats ingestStats;

public static final Set<String> NODE_STATS_METRICS = Set.of(
Metric.OS.metricName(),
Metric.PROCESS.metricName(),
Metric.JVM.metricName(),
Metric.FS.metricName(),
Metric.PLUGINS.metricName(),
Metric.INGEST.metricName(),
Metric.NETWORK_TYPES.metricName(),
Metric.DISCOVERY_TYPES.metricName(),
Metric.PACKAGING_TYPES.metricName()
);

ClusterStatsNodes(List<ClusterStatsNodeResponse> nodeResponses) {
this(Metric.allMetrics(), nodeResponses);
}

ClusterStatsNodes(Set<String> requestedMetrics, List<ClusterStatsNodeResponse> nodeResponses) {
this.versions = new HashSet<>();
boolean isFSInfoRequested = ClusterStatsRequest.Metric.FS.containedIn(requestedMetrics);
boolean isPluginsInfoRequested = ClusterStatsRequest.Metric.PLUGINS.containedIn(requestedMetrics);
this.fs = isFSInfoRequested ? new FsInfo.Path() : null;
this.plugins = isPluginsInfoRequested ? new HashSet<>() : null;
this.fs = ClusterStatsRequest.Metric.FS.containedIn(requestedMetrics) ? new FsInfo.Path() : null;
this.plugins = ClusterStatsRequest.Metric.PLUGINS.containedIn(requestedMetrics) ? new HashSet<>() : null;

Set<InetAddress> seenAddresses = new HashSet<>(nodeResponses.size());
List<NodeInfo> nodeInfos = new ArrayList<>(nodeResponses.size());
List<NodeStats> nodesStats = new ArrayList<>(nodeResponses.size());
List<NodeStats> nodeStats = new ArrayList<>(nodeResponses.size());
for (ClusterStatsNodeResponse nodeResponse : nodeResponses) {
NodeInfo nodeInfo = nodeResponse.nodeInfo();
NodeStats nodeStats = nodeResponse.nodeStats();
nodeInfos.add(nodeInfo);
nodesStats.add(nodeStats);
this.versions.add(nodeInfo.getVersion());
if (isPluginsInfoRequested) {
this.plugins.addAll(nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos());
nodeInfos.add(nodeResponse.nodeInfo());
nodeStats.add(nodeResponse.nodeStats());
this.versions.add(nodeResponse.nodeInfo().getVersion());
if (this.plugins != null) {
this.plugins.addAll(nodeResponse.nodeInfo().getInfo(PluginsAndModules.class).getPluginInfos());
}

// now do the stats that should be deduped by hardware (implemented by ip deduping)
TransportAddress publishAddress = nodeInfo.getInfo(TransportInfo.class).address().publishAddress();
TransportAddress publishAddress = nodeResponse.nodeInfo().getInfo(TransportInfo.class).address().publishAddress();
final InetAddress inetAddress = publishAddress.address().getAddress();
if (!seenAddresses.add(inetAddress)) {
continue;
}
if (isFSInfoRequested && nodeStats.getFs() != null) {
this.fs.add(nodeStats.getFs().getTotal());
if (this.fs != null && nodeResponse.nodeStats().getFs() != null) {
this.fs.add(nodeResponse.nodeStats().getFs().getTotal());
}
}
this.counts = new Counts(nodeInfos);
this.os = ClusterStatsRequest.Metric.OS.containedIn(requestedMetrics) ? new OsStats(nodeInfos, nodesStats) : null;
this.process = ClusterStatsRequest.Metric.PROCESS.containedIn(requestedMetrics) ? new ProcessStats(nodesStats) : null;
this.jvm = ClusterStatsRequest.Metric.JVM.containedIn(requestedMetrics) ? new JvmStats(nodeInfos, nodesStats) : null;
this.os = ClusterStatsRequest.Metric.OS.containedIn(requestedMetrics) ? new OsStats(nodeInfos, nodeStats) : null;
this.process = ClusterStatsRequest.Metric.PROCESS.containedIn(requestedMetrics) ? new ProcessStats(nodeStats) : null;
this.jvm = ClusterStatsRequest.Metric.JVM.containedIn(requestedMetrics) ? new JvmStats(nodeInfos, nodeStats) : null;
this.networkTypes = ClusterStatsRequest.Metric.NETWORK_TYPES.containedIn(requestedMetrics) ? new NetworkTypes(nodeInfos) : null;
this.discoveryTypes = ClusterStatsRequest.Metric.DISCOVERY_TYPES.containedIn(requestedMetrics)
? new DiscoveryTypes(nodeInfos)
: null;
this.packagingTypes = ClusterStatsRequest.Metric.PACKAGING_TYPES.containedIn(requestedMetrics)
? new PackagingTypes(nodeInfos)
: null;
this.ingestStats = ClusterStatsRequest.Metric.INGEST.containedIn(requestedMetrics) ? new IngestStats(nodesStats) : null;
this.ingestStats = ClusterStatsRequest.Metric.INGEST.containedIn(requestedMetrics) ? new IngestStats(nodeStats) : null;
}

public Counts getCounts() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,19 @@
@PublicApi(since = "1.0.0")
public class ClusterStatsRequest extends BaseNodesRequest<ClusterStatsRequest> {

private final Set<String> requestedMetrics = new HashSet<>(Metric.allMetrics());
private final Set<String> indicesMetrics = new HashSet<>(IndexMetrics.allIndicesMetrics());
private final Set<String> requestedMetrics = new HashSet<>();
private final Set<String> indexMetricsRequested = new HashSet<>();
private Boolean applyMetricFiltering = false;

public ClusterStatsRequest(StreamInput in) throws IOException {
super(in);
if (in.getVersion().onOrAfter(Version.V_2_16_0)) {
useAggregatedNodeLevelResponses = in.readOptionalBoolean();
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
requestedMetrics.clear();
indicesMetrics.clear();
applyMetricFiltering = in.readOptionalBoolean();
requestedMetrics.addAll(in.readStringList());
indicesMetrics.addAll(in.readStringList());
indexMetricsRequested.addAll(in.readStringList());
}
}

Expand All @@ -86,6 +86,14 @@ public void useAggregatedNodeLevelResponses(boolean useAggregatedNodeLevelRespon
this.useAggregatedNodeLevelResponses = useAggregatedNodeLevelResponses;
}

public boolean applyMetricFiltering() {
return applyMetricFiltering;
}

public void applyMetricFiltering(boolean honourMetricFiltering) {
this.applyMetricFiltering = honourMetricFiltering;
}

/**
* Add Metric
*/
Expand All @@ -111,20 +119,20 @@ public ClusterStatsRequest addIndexMetric(String indexMetric) {
if (IndexMetrics.allIndicesMetrics().contains(indexMetric) == false) {
throw new IllegalStateException("Used an illegal index metric: " + indexMetric);
}
indicesMetrics.add(indexMetric);
indexMetricsRequested.add(indexMetric);
return this;
}

public Set<String> indicesMetrics() {
return new HashSet<>(indicesMetrics);
return new HashSet<>(indexMetricsRequested);
}

public void clearRequestedMetrics() {
requestedMetrics.clear();
}

public void clearIndicesMetrics() {
indicesMetrics.clear();
indexMetricsRequested.clear();
}

@Override
Expand All @@ -134,8 +142,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalBoolean(useAggregatedNodeLevelResponses);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalBoolean(applyMetricFiltering);
out.writeStringArray(requestedMetrics.toArray(new String[0]));
out.writeStringArray(indicesMetrics.toArray(new String[0]));
out.writeStringArray(indexMetricsRequested.toArray(new String[0]));
}
}

Expand Down Expand Up @@ -180,6 +189,7 @@ public boolean containedIn(Set<String> metricNames) {
* An enumeration of the "core" sections of indices metrics that may be requested
* from the cluster stats endpoint.
*/
@PublicApi(since = "3.0.0")
public enum IndexMetrics {
SHARDS("shards"),
DOCS("docs"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,17 @@ public final ClusterStatsRequestBuilder useAggregatedNodeLevelResponses(boolean
return this;
}

public final ClusterStatsRequestBuilder applyMetricFiltering(boolean applyMetricFiltering) {
request.applyMetricFiltering(applyMetricFiltering);
return this;
}

public final ClusterStatsRequestBuilder requestMetrics(Set<String> requestMetrics) {
request.clearRequestedMetrics();
requestMetrics.forEach(request::addMetric);
return this;
}

public final ClusterStatsRequestBuilder indexMetrics(Set<String> indexMetrics) {
request.clearIndicesMetrics();
indexMetrics.forEach(request::addIndexMetric);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public ClusterStatsResponse(
super(clusterName, nodes, failures);
this.clusterUUID = clusterUUID;
this.timestamp = timestamp;
nodesStats = !requestedMetrics.isEmpty() && !requestedMetrics.equals(Set.of(Metric.INDICES.metricName()))
nodesStats = requestedMetrics.stream().anyMatch(ClusterStatsNodes.NODE_STATS_METRICS::contains)
? new ClusterStatsNodes(requestedMetrics, nodes)
: null;
MappingStats mappingStats = ClusterStatsRequest.IndexMetrics.MAPPINGS.containedIn(indicesMetrics) ? MappingStats.of(state) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
Expand All @@ -78,13 +79,20 @@ public class TransportClusterStatsAction extends TransportNodesAction<
TransportClusterStatsAction.ClusterStatsNodeRequest,
ClusterStatsNodeResponse> {

private static final CommonStatsFlags SHARD_STATS_FLAGS = new CommonStatsFlags(
private static final Map<String, CommonStatsFlags.Flag> INDEX_METRIC_TO_SHARDS_STATS_FLAG_MAP = Map.of(
ClusterStatsRequest.IndexMetrics.DOCS.metricName(),
CommonStatsFlags.Flag.Docs,
ClusterStatsRequest.IndexMetrics.STORE.metricName(),
CommonStatsFlags.Flag.Store,
ClusterStatsRequest.IndexMetrics.FIELDDATA.metricName(),
CommonStatsFlags.Flag.FieldData,
ClusterStatsRequest.IndexMetrics.QUERY_CACHE.metricName(),
CommonStatsFlags.Flag.QueryCache,
ClusterStatsRequest.IndexMetrics.COMPLETION.metricName(),
CommonStatsFlags.Flag.Completion,
ClusterStatsRequest.IndexMetrics.SEGMENTS.metricName(),
CommonStatsFlags.Flag.Segments

);

private final NodeService nodeService;
Expand Down Expand Up @@ -126,16 +134,27 @@ protected ClusterStatsResponse newResponse(
+ " the cluster state that are too slow for a transport thread"
);
ClusterState state = clusterService.state();
return new ClusterStatsResponse(
System.currentTimeMillis(),
state.metadata().clusterUUID(),
clusterService.getClusterName(),
responses,
failures,
state,
request.requestedMetrics(),
request.indicesMetrics()
);
if (request.applyMetricFiltering()) {
return new ClusterStatsResponse(
System.currentTimeMillis(),
state.metadata().clusterUUID(),
clusterService.getClusterName(),
responses,
failures,
state,
request.requestedMetrics(),
request.indicesMetrics()
);
} else {
return new ClusterStatsResponse(
System.currentTimeMillis(),
state.metadata().clusterUUID(),
clusterService.getClusterName(),
responses,
failures,
state
);
}
}

@Override
Expand All @@ -151,20 +170,21 @@ protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOExce
@Override
protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) {
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false, false, false);
boolean applyMetricFiltering = nodeRequest.request.applyMetricFiltering();
Set<String> requestedMetrics = nodeRequest.request.requestedMetrics();
NodeStats nodeStats = nodeService.stats(
CommonStatsFlags.NONE,
Metric.OS.containedIn(requestedMetrics),
Metric.PROCESS.containedIn(requestedMetrics),
Metric.JVM.containedIn(requestedMetrics),
!applyMetricFiltering || Metric.OS.containedIn(requestedMetrics),
!applyMetricFiltering || Metric.PROCESS.containedIn(requestedMetrics),
!applyMetricFiltering || Metric.JVM.containedIn(requestedMetrics),
false,
Metric.FS.containedIn(requestedMetrics),
!applyMetricFiltering || Metric.FS.containedIn(requestedMetrics),
false,
false,
false,
false,
false,
Metric.INGEST.containedIn(requestedMetrics),
!applyMetricFiltering || Metric.INGEST.containedIn(requestedMetrics),
false,
false,
false,
Expand All @@ -182,7 +202,13 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false
);
List<ShardStats> shardsStats = new ArrayList<>();
if (Metric.INDICES.containedIn(requestedMetrics)) {
if (!applyMetricFiltering || Metric.INDICES.containedIn(requestedMetrics)) {
CommonStatsFlags commonStatsFlags = new CommonStatsFlags();
for (String metric : nodeRequest.request.indicesMetrics()) {
if (INDEX_METRIC_TO_SHARDS_STATS_FLAG_MAP.containsKey(metric)) {
commonStatsFlags.set(INDEX_METRIC_TO_SHARDS_STATS_FLAG_MAP.get(metric), true);
}
}
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
Expand All @@ -204,7 +230,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
new ShardStats(
indexShard.routingEntry(),
indexShard.shardPath(),
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS),
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, commonStatsFlags),
commitStats,
seqNoStats,
retentionLeaseStats
Expand Down
Loading

0 comments on commit 08bbf14

Please sign in to comment.