From f95c505df6af60302ad533e43433920f35de6f42 Mon Sep 17 00:00:00 2001 From: Swetha Guptha Date: Mon, 16 Sep 2024 14:27:26 +0530 Subject: [PATCH] URI path filtering support in cluster stats API Signed-off-by: Swetha Guptha --- .../cluster/stats/ClusterStatsIndices.java | 114 +++++++++++++----- .../cluster/stats/ClusterStatsNodes.java | 105 ++++++++++------ .../cluster/stats/ClusterStatsRequest.java | 101 ++++++++++++++++ .../cluster/stats/ClusterStatsResponse.java | 55 +++++++-- .../stats/TransportClusterStatsAction.java | 3 +- .../admin/cluster/RestClusterStatsAction.java | 59 ++++++++- 6 files changed, 360 insertions(+), 77 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java index 03a73f45ffe81..e0d929e680393 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java @@ -32,6 +32,7 @@ package org.opensearch.action.admin.cluster.stats; +import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.Metric; import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.xcontent.ToXContentFragment; @@ -47,6 +48,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; /** * Cluster Stats per index @@ -68,14 +71,52 @@ public class ClusterStatsIndices implements ToXContentFragment { private MappingStats mappings; public ClusterStatsIndices(List nodeResponses, MappingStats mappingStats, AnalysisStats analysisStats) { + this(Metric.allIndicesMetrics(), nodeResponses, mappingStats, analysisStats); + + } + + public ClusterStatsIndices(Set requestedMetrics, List nodeResponses, MappingStats mappingStats, AnalysisStats analysisStats) { Map countsPerIndex = new HashMap<>(); - this.docs = new DocsStats(); - this.store = new StoreStats(); - this.fieldData = new FieldDataStats(); - this.queryCache = new QueryCacheStats(); - this.completion = new CompletionStats(); - this.segments = new SegmentsStats(); + Consumer docsStatsConsumer = (docs) -> { + if (Metric.DOCS.containedIn(requestedMetrics)) { + if (this.docs == null) this.docs = new DocsStats(); + this.docs.add(docs); + } + }; + Consumer storeStatsConsumer = (store) -> { + if (Metric.STORE.containedIn(requestedMetrics)) { + if (this.store == null) this.store = new StoreStats(); + this.store.add(store); + } + }; + Consumer fieldDataConsumer = (fieldDataStats) -> { + if (Metric.FIELD_DATA.containedIn(requestedMetrics)) { + if (this.fieldData == null) this.fieldData = new FieldDataStats(); + this.fieldData.add(fieldDataStats); + } + }; + + Consumer queryCacheStatsConsumer = (queryCacheStats) -> { + if (Metric.QUERY_CACHE.containedIn(requestedMetrics)) { + if (this.queryCache == null) this.queryCache = new QueryCacheStats(); + this.queryCache.add(queryCacheStats); + } + }; + + Consumer completionStatsConsumer = (completionStats) -> { + if (Metric.COMPLETION.containedIn(requestedMetrics)) { + if (this.completion == null) this.completion = new CompletionStats(); + this.completion.add(completionStats); + } + }; + + Consumer segmentsStatsConsumer = (segmentsStats) -> { + if (Metric.SEGMENTS.containedIn(requestedMetrics)) { + if (this.segments == null) this.segments = new SegmentsStats(); + this.segments.add(segmentsStats); + } + }; for (ClusterStatsNodeResponse r : nodeResponses) { // Aggregated response from the node @@ -92,12 +133,12 @@ public ClusterStatsIndices(List nodeResponses, Mapping } } - docs.add(r.getAggregatedNodeLevelStats().commonStats.docs); - store.add(r.getAggregatedNodeLevelStats().commonStats.store); - fieldData.add(r.getAggregatedNodeLevelStats().commonStats.fieldData); - queryCache.add(r.getAggregatedNodeLevelStats().commonStats.queryCache); - completion.add(r.getAggregatedNodeLevelStats().commonStats.completion); - segments.add(r.getAggregatedNodeLevelStats().commonStats.segments); + docsStatsConsumer.accept(r.getAggregatedNodeLevelStats().commonStats.docs); + storeStatsConsumer.accept(r.getAggregatedNodeLevelStats().commonStats.store); + fieldDataConsumer.accept(r.getAggregatedNodeLevelStats().commonStats.fieldData); + queryCacheStatsConsumer.accept(r.getAggregatedNodeLevelStats().commonStats.queryCache); + completionStatsConsumer.accept(r.getAggregatedNodeLevelStats().commonStats.completion); + segmentsStatsConsumer.accept(r.getAggregatedNodeLevelStats().commonStats.segments); } else { // Default response from the node for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) { @@ -113,21 +154,24 @@ public ClusterStatsIndices(List nodeResponses, Mapping if (shardStats.getShardRouting().primary()) { indexShardStats.primaries++; - docs.add(shardCommonStats.docs); + docsStatsConsumer.accept(shardCommonStats.docs); } - store.add(shardCommonStats.store); - fieldData.add(shardCommonStats.fieldData); - queryCache.add(shardCommonStats.queryCache); - completion.add(shardCommonStats.completion); - segments.add(shardCommonStats.segments); + storeStatsConsumer.accept(shardCommonStats.store); + fieldDataConsumer.accept(shardCommonStats.fieldData); + queryCacheStatsConsumer.accept(shardCommonStats.queryCache); + completionStatsConsumer.accept(shardCommonStats.completion); + segmentsStatsConsumer.accept(shardCommonStats.segments); } } } - shards = new ShardStats(); + indexCount = countsPerIndex.size(); - for (final ShardStats indexCountsCursor : countsPerIndex.values()) { - shards.addIndexShardCount(indexCountsCursor); + if (Metric.SHARDS.containedIn(requestedMetrics)) { + shards = new ShardStats(); + for (final ShardStats indexCountsCursor : countsPerIndex.values()) { + shards.addIndexShardCount(indexCountsCursor); + } } this.mappings = mappingStats; @@ -186,13 +230,27 @@ static final class Fields { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(Fields.COUNT, indexCount); - shards.toXContent(builder, params); - docs.toXContent(builder, params); - store.toXContent(builder, params); - fieldData.toXContent(builder, params); - queryCache.toXContent(builder, params); - completion.toXContent(builder, params); - segments.toXContent(builder, params); + if (shards != null) { + shards.toXContent(builder, params); + } + if (docs != null) { + docs.toXContent(builder, params); + } + if (store != null) { + store.toXContent(builder, params); + } + if (fieldData != null) { + fieldData.toXContent(builder, params); + } + if (queryCache != null) { + queryCache.toXContent(builder, params); + } + if (completion != null) { + completion.toXContent(builder, params); + } + if (segments != null) { + segments.toXContent(builder, params); + } if (mappings != null) { mappings.toXContent(builder, params); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodes.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodes.java index b44e9cfc5c74a..9856887db9d84 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodes.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodes.java @@ -36,6 +36,7 @@ import org.opensearch.action.admin.cluster.node.info.NodeInfo; import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.Metric; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.common.annotation.PublicApi; @@ -90,37 +91,47 @@ public class ClusterStatsNodes implements ToXContentFragment { private final IngestStats ingestStats; ClusterStatsNodes(List nodeResponses) { + this(Metric.allNodesMetrics(), nodeResponses); + } + + ClusterStatsNodes(Set requestedMetrics, List nodeResponses) { this.versions = new HashSet<>(); - this.fs = new FsInfo.Path(); - this.plugins = new HashSet<>(); + boolean isFSInfoRequested = Metric.FS.containedIn(requestedMetrics); + boolean isPluginsInfoRequested = Metric.PLUGINS.containedIn(requestedMetrics); + this.fs = isFSInfoRequested ? new FsInfo.Path() : null; + this.plugins = isPluginsInfoRequested ? new HashSet<>() : null; Set seenAddresses = new HashSet<>(nodeResponses.size()); List nodeInfos = new ArrayList<>(nodeResponses.size()); - List nodeStats = new ArrayList<>(nodeResponses.size()); + List nodesStats = new ArrayList<>(nodeResponses.size()); for (ClusterStatsNodeResponse nodeResponse : nodeResponses) { - nodeInfos.add(nodeResponse.nodeInfo()); - nodeStats.add(nodeResponse.nodeStats()); - this.versions.add(nodeResponse.nodeInfo().getVersion()); - this.plugins.addAll(nodeResponse.nodeInfo().getInfo(PluginsAndModules.class).getPluginInfos()); + NodeInfo nodeInfo = nodeResponse.nodeInfo(); + NodeStats nodeStats = nodeResponse.nodeStats(); + nodeInfos.add(nodeInfo); + nodesStats.add(nodeStats); + this.versions.add(nodeInfo.getVersion()); + if (isPluginsInfoRequested) { + this.plugins.addAll(nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos()); + } // now do the stats that should be deduped by hardware (implemented by ip deduping) - TransportAddress publishAddress = nodeResponse.nodeInfo().getInfo(TransportInfo.class).address().publishAddress(); + TransportAddress publishAddress = nodeInfo.getInfo(TransportInfo.class).address().publishAddress(); final InetAddress inetAddress = publishAddress.address().getAddress(); if (!seenAddresses.add(inetAddress)) { continue; } - if (nodeResponse.nodeStats().getFs() != null) { - this.fs.add(nodeResponse.nodeStats().getFs().getTotal()); + if (nodeStats.getFs() != null) { + this.fs.add(nodeStats.getFs().getTotal()); } } this.counts = new Counts(nodeInfos); - this.os = new OsStats(nodeInfos, nodeStats); - this.process = new ProcessStats(nodeStats); - this.jvm = new JvmStats(nodeInfos, nodeStats); - this.networkTypes = new NetworkTypes(nodeInfos); - this.discoveryTypes = new DiscoveryTypes(nodeInfos); - this.packagingTypes = new PackagingTypes(nodeInfos); - this.ingestStats = new IngestStats(nodeStats); + this.os = Metric.OS.containedIn(requestedMetrics) ? new OsStats(nodeInfos, nodesStats) : null; + this.process = Metric.PROCESS.containedIn(requestedMetrics) ? new ProcessStats(nodesStats) : null; + this.jvm = Metric.JVM.containedIn(requestedMetrics) ? new JvmStats(nodeInfos, nodesStats) : null; + this.networkTypes = Metric.NETWORK_TYPES.containedIn(requestedMetrics) ? new NetworkTypes(nodeInfos) : null; + this.discoveryTypes = Metric.DISCOVERY_TYPES.containedIn(requestedMetrics) ? new DiscoveryTypes(nodeInfos) : null; + this.packagingTypes = Metric.PACKAGING_TYPES.containedIn(requestedMetrics) ? new PackagingTypes(nodeInfos) : null; + this.ingestStats = Metric.INGEST_STATS.containedIn(requestedMetrics) ? new IngestStats(nodesStats) : null; } public Counts getCounts() { @@ -179,36 +190,54 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.endArray(); - builder.startObject(Fields.OS); - os.toXContent(builder, params); - builder.endObject(); + if (os != null) { + builder.startObject(Fields.OS); + os.toXContent(builder, params); + builder.endObject(); + } - builder.startObject(Fields.PROCESS); - process.toXContent(builder, params); - builder.endObject(); + if (process != null) { + builder.startObject(Fields.PROCESS); + process.toXContent(builder, params); + builder.endObject(); + } - builder.startObject(Fields.JVM); - jvm.toXContent(builder, params); - builder.endObject(); + if (jvm != null) { + builder.startObject(Fields.JVM); + jvm.toXContent(builder, params); + builder.endObject(); + } - builder.field(Fields.FS); - fs.toXContent(builder, params); + if (fs != null) { + builder.field(Fields.FS); + fs.toXContent(builder, params); + } - builder.startArray(Fields.PLUGINS); - for (PluginInfo pluginInfo : plugins) { - pluginInfo.toXContent(builder, params); + if (plugins != null) { + builder.startArray(Fields.PLUGINS); + for (PluginInfo pluginInfo : plugins) { + pluginInfo.toXContent(builder, params); + } + builder.endArray(); } - builder.endArray(); - builder.startObject(Fields.NETWORK_TYPES); - networkTypes.toXContent(builder, params); - builder.endObject(); + if (networkTypes != null) { + builder.startObject(Fields.NETWORK_TYPES); + networkTypes.toXContent(builder, params); + builder.endObject(); + } - discoveryTypes.toXContent(builder, params); + if (discoveryTypes != null) { + discoveryTypes.toXContent(builder, params); + } - packagingTypes.toXContent(builder, params); + if (packagingTypes != null) { + packagingTypes.toXContent(builder, params); + } - ingestStats.toXContent(builder, params); + if (ingestStats != null) { + ingestStats.toXContent(builder, params); + } return builder; } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsRequest.java index b82a9d256a134..e07ce89a2552e 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsRequest.java @@ -39,6 +39,10 @@ import org.opensearch.core.common.io.stream.StreamOutput; import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; /** * A request to get cluster level stats. @@ -48,11 +52,18 @@ @PublicApi(since = "1.0.0") public class ClusterStatsRequest extends BaseNodesRequest { + private final Set requestedMetrics = new HashSet<>(Metric.allMetrics()); + + public ClusterStatsRequest(StreamInput in) throws IOException { super(in); if (in.getVersion().onOrAfter(Version.V_2_16_0)) { useAggregatedNodeLevelResponses = in.readOptionalBoolean(); } + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + requestedMetrics.clear(); + requestedMetrics.addAll(in.readStringList()); + } } private Boolean useAggregatedNodeLevelResponses = false; @@ -73,12 +84,102 @@ public void useAggregatedNodeLevelResponses(boolean useAggregatedNodeLevelRespon this.useAggregatedNodeLevelResponses = useAggregatedNodeLevelResponses; } + /** + * Get the names of requested metrics, excluding indices, which are + * handled separately. + */ + public Set requestedMetrics() { + return new HashSet<>(requestedMetrics); + } + + + /** + * Add metric + */ + public ClusterStatsRequest addMetric(String metric) { + if (ClusterStatsRequest.Metric.allMetrics().contains(metric) == false) { + throw new IllegalStateException("Used an illegal metric: " + metric); + } + requestedMetrics.add(metric); + return this; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); if (out.getVersion().onOrAfter(Version.V_2_16_0)) { out.writeOptionalBoolean(useAggregatedNodeLevelResponses); } + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeStringArray(requestedMetrics.toArray(new String[0])); + } } + + /** + * An enumeration of the "core" sections of metrics that may be requested + * from the cluster stats endpoint. Eventually this list will be pluggable. + */ + public enum Metric { + SHARDS("shards", "indices"), + DOCS("docs", "indices"), + STORE("store", "indices"), + FIELD_DATA("field_data", "indices"), + QUERY_CACHE("query_cache", "indices"), + COMPLETION("completion", "indices"), + SEGMENTS("segments", "indices"), + ANALYSIS("analysis", "indices"), + MAPPINGS("mappings", "indices"), + + OS("os", "nodes"), + PROCESS("process", "nodes"), + JVM("jvm", "nodes"), + FS("fs", "nodes"), + PLUGINS("plugins", "nodes"), + INGEST_STATS("ingest_stats", "nodes"), + NETWORK_TYPES("network_types", "nodes"), + DISCOVERY_TYPES("discovery_types", "nodes"), + PACKAGING_TYPES("packaging_types", "nodes"); + + + private String metricName; + + private String metricType; + + Metric(String name, String type) { + this.metricName = name; + this.metricType = type; + } + + public String metricName() { + return this.metricName; + } + public String metricType() { + return this.metricType; + } + + boolean containedIn(Set metricNames) { + return metricNames.contains(this.metricName()); + } + + static Set allMetrics() { + return Arrays.stream(values()).map(ClusterStatsRequest.Metric::metricName).collect(Collectors.toSet()); + } + + static Set allIndicesMetrics() { + return Arrays.stream(values()).filter(metric -> "indices".equals(metric.metricType())).map(ClusterStatsRequest.Metric::metricName).collect(Collectors.toSet()); + } + + static Set allNodesMetrics() { + return Arrays.stream(values()).filter(metric -> "nodes".equals(metric.metricType())).map(ClusterStatsRequest.Metric::metricName).collect(Collectors.toSet()); + } + + static boolean containsIndexMetric(Set metricNames) { + return allIndicesMetrics().stream().anyMatch(metricNames::contains); + } + + static boolean containsNodeMetric(Set metricNames) { + return allNodesMetrics().stream().anyMatch(metricNames::contains); + } + } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsResponse.java index cc002b689a2a5..acc3d16d0ef54 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsResponse.java @@ -33,6 +33,7 @@ package org.opensearch.action.admin.cluster.stats; import org.opensearch.action.FailedNodeException; +import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.Metric; import org.opensearch.action.support.nodes.BaseNodesResponse; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; @@ -47,6 +48,7 @@ import java.io.IOException; import java.util.List; import java.util.Locale; +import java.util.Set; /** * Transport response for obtaining cluster stats @@ -105,6 +107,34 @@ public ClusterStatsResponse( this.status = status; } + public ClusterStatsResponse( + long timestamp, + String clusterUUID, + ClusterName clusterName, + List nodes, + List failures, + ClusterState state, + ClusterStatsRequest clusterStatsRequest + ) { + super(clusterName, nodes, failures); + this.clusterUUID = clusterUUID; + this.timestamp = timestamp; + Set requestedMetrics = clusterStatsRequest.requestedMetrics(); + MappingStats mappingStats = Metric.MAPPINGS.containedIn(requestedMetrics) ? MappingStats.of(state) : null; + AnalysisStats analysisStats = Metric.ANALYSIS.containedIn(requestedMetrics) ? AnalysisStats.of(state) : null; + nodesStats = Metric.containsNodeMetric(requestedMetrics) ? new ClusterStatsNodes(requestedMetrics, nodes) : null; + indicesStats = Metric.containsIndexMetric(requestedMetrics) ? new ClusterStatsIndices(requestedMetrics, nodes, mappingStats, analysisStats) : null; + ClusterHealthStatus status = null; + for (ClusterStatsNodeResponse response : nodes) { + // only the cluster-manager node populates the status + if (response.clusterStatus() != null) { + status = response.clusterStatus(); + break; + } + } + this.status = status; + } + public String getClusterUUID() { return this.clusterUUID; } @@ -131,8 +161,13 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(timestamp); out.writeOptionalWriteable(status); out.writeOptionalString(clusterUUID); - out.writeOptionalWriteable(indicesStats.getMappings()); - out.writeOptionalWriteable(indicesStats.getAnalysis()); + if (indicesStats != null) { + out.writeOptionalWriteable(indicesStats.getMappings()); + out.writeOptionalWriteable(indicesStats.getAnalysis()); + } else { + out.writeOptionalWriteable(null); + out.writeOptionalWriteable(null); + } } @Override @@ -153,12 +188,16 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (status != null) { builder.field("status", status.name().toLowerCase(Locale.ROOT)); } - builder.startObject("indices"); - indicesStats.toXContent(builder, params); - builder.endObject(); - builder.startObject("nodes"); - nodesStats.toXContent(builder, params); - builder.endObject(); + if (indicesStats != null) { + builder.startObject("indices"); + indicesStats.toXContent(builder, params); + builder.endObject(); + } + if (nodesStats != null) { + builder.startObject("nodes"); + nodesStats.toXContent(builder, params); + builder.endObject(); + } return builder; } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index a49ca2035783c..25a0aa1568bae 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -130,7 +130,8 @@ protected ClusterStatsResponse newResponse( clusterService.getClusterName(), responses, failures, - state + state, + request ); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java index ee33bd18db05d..1f46f1da5bd5f 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java @@ -34,12 +34,20 @@ import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest; import org.opensearch.client.node.NodeClient; +import org.opensearch.core.common.Strings; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.RestRequest; import org.opensearch.rest.action.RestActions.NodesResponseRestListener; import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.function.Consumer; import static java.util.Arrays.asList; import static java.util.Collections.unmodifiableList; @@ -54,7 +62,24 @@ public class RestClusterStatsAction extends BaseRestHandler { @Override public List routes() { - return unmodifiableList(asList(new Route(GET, "/_cluster/stats"), new Route(GET, "/_cluster/stats/nodes/{nodeId}"))); + return unmodifiableList( + asList( + new Route(GET, "/_cluster/stats"), + new Route(GET, "/_cluster/stats/{metric}"), + new Route(GET, "/_cluster/stats/nodes/{nodeId}"), + new Route(GET, "/_cluster/stats/nodes/{nodeId}/{metric}") + ) + ); + } + + static final Map> METRICS; + + static { + Map> map = new HashMap<>(); + for (ClusterStatsRequest.Metric metric : ClusterStatsRequest.Metric.values()) { + map.put(metric.metricName(), request -> request.addMetric(metric.metricName())); + } + METRICS = Collections.unmodifiableMap(map); } @Override @@ -64,9 +89,39 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null)); + String[] nodeIds = request.paramAsStringArray("nodeId", null); + Set metrics = Strings.tokenizeByCommaToSet(request.param("metric", "_all")); + + ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(nodeIds); clusterStatsRequest.timeout(request.param("timeout")); clusterStatsRequest.useAggregatedNodeLevelResponses(true); + if (metrics.size() > 1 & metrics.contains("_all")) { + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "request [%s] contains _all and individual metrics [%s]", + request.path(), + request.param("metric") + ) + ); + } else if (!metrics.contains("_all")) { + clusterStatsRequest.requestedMetrics().clear(); + final Set invalidMetrics = new TreeSet<>(); + for (String metric : metrics) { + Consumer clusterStatsRequestConsumer = METRICS.get(metric); + if (clusterStatsRequestConsumer != null) { + clusterStatsRequestConsumer.accept(clusterStatsRequest); + } else { + invalidMetrics.add(metric); + } + } + + if (!invalidMetrics.isEmpty()) { + throw new IllegalArgumentException(unrecognized(request, invalidMetrics, METRICS.keySet(), "metric")); + } + + } + return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel)); }