Skip to content

Commit

Permalink
Serialise metrics/index_metrics using bit mask.
Browse files Browse the repository at this point in the history
Signed-off-by: Swetha Guptha <[email protected]>
  • Loading branch information
Swetha Guptha committed Sep 29, 2024
1 parent a8a1de5 commit d9744e7
Show file tree
Hide file tree
Showing 10 changed files with 383 additions and 236 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

package org.opensearch.action.admin.cluster.stats;

import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.IndexMetrics;
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.IndexMetric;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.xcontent.ToXContentFragment;
Expand Down Expand Up @@ -71,52 +71,52 @@ public class ClusterStatsIndices implements ToXContentFragment {
private MappingStats mappings;

public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, MappingStats mappingStats, AnalysisStats analysisStats) {
this(IndexMetrics.allIndicesMetrics(), nodeResponses, mappingStats, analysisStats);
this(Set.of(IndexMetric.values()), nodeResponses, mappingStats, analysisStats);

}

public ClusterStatsIndices(
Set<String> indicesMetrics,
Set<IndexMetric> indicesMetrics,
List<ClusterStatsNodeResponse> nodeResponses,
MappingStats mappingStats,
AnalysisStats analysisStats
) {
Map<String, ShardStats> countsPerIndex = new HashMap<>();
Consumer<DocsStats> docsStatsConsumer = (docs) -> {
if (IndexMetrics.DOCS.containedIn(indicesMetrics)) {
if (indicesMetrics.contains(IndexMetric.DOCS)) {
if (this.docs == null) this.docs = new DocsStats();
this.docs.add(docs);
}
};
Consumer<StoreStats> storeStatsConsumer = (store) -> {
if (IndexMetrics.STORE.containedIn(indicesMetrics)) {
if (indicesMetrics.contains(IndexMetric.STORE)) {
if (this.store == null) this.store = new StoreStats();
this.store.add(store);
}
};
Consumer<FieldDataStats> fieldDataConsumer = (fieldDataStats) -> {
if (IndexMetrics.FIELDDATA.containedIn(indicesMetrics)) {
if (indicesMetrics.contains(IndexMetric.FIELDDATA)) {
if (this.fieldData == null) this.fieldData = new FieldDataStats();
this.fieldData.add(fieldDataStats);
}
};

Consumer<QueryCacheStats> queryCacheStatsConsumer = (queryCacheStats) -> {
if (IndexMetrics.QUERY_CACHE.containedIn(indicesMetrics)) {
if (indicesMetrics.contains(IndexMetric.QUERY_CACHE)) {
if (this.queryCache == null) this.queryCache = new QueryCacheStats();
this.queryCache.add(queryCacheStats);
}
};

Consumer<CompletionStats> completionStatsConsumer = (completionStats) -> {
if (IndexMetrics.COMPLETION.containedIn(indicesMetrics)) {
if (indicesMetrics.contains(IndexMetric.COMPLETION)) {
if (this.completion == null) this.completion = new CompletionStats();
this.completion.add(completionStats);
}
};

Consumer<SegmentsStats> segmentsStatsConsumer = (segmentsStats) -> {
if (IndexMetrics.SEGMENTS.containedIn(indicesMetrics)) {
if (indicesMetrics.contains(IndexMetric.SEGMENTS)) {
if (this.segments == null) this.segments = new SegmentsStats();
this.segments.add(segmentsStats);
}
Expand Down Expand Up @@ -170,7 +170,7 @@ public ClusterStatsIndices(
}

indexCount = countsPerIndex.size();
if (IndexMetrics.SHARDS.containedIn(indicesMetrics)) {
if (indicesMetrics.contains(IndexMetric.SHARDS)) {
shards = new ShardStats();
for (final ShardStats indexCountsCursor : countsPerIndex.values()) {
shards.addIndexShardCount(indexCountsCursor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,26 +90,29 @@ public class ClusterStatsNodes implements ToXContentFragment {
private final PackagingTypes packagingTypes;
private final IngestStats ingestStats;

public static final Set<String> NODE_STATS_METRICS = Set.of(
Metric.OS.metricName(),
Metric.PROCESS.metricName(),
Metric.JVM.metricName(),
Metric.FS.metricName(),
Metric.PLUGINS.metricName(),
Metric.INGEST.metricName(),
Metric.NETWORK_TYPES.metricName(),
Metric.DISCOVERY_TYPES.metricName(),
Metric.PACKAGING_TYPES.metricName()
public static final Set<Metric> NODE_STATS_METRICS = Set.of(
// Stats computed from node info and node stat
Metric.OS,
Metric.JVM,
// Stats computed from node stat
Metric.FS,
Metric.PROCESS,
Metric.INGEST,
// Stats computed from node info
Metric.PLUGINS,
Metric.NETWORK_TYPES,
Metric.DISCOVERY_TYPES,
Metric.PACKAGING_TYPES
);

ClusterStatsNodes(List<ClusterStatsNodeResponse> nodeResponses) {
this(Metric.allMetrics(), nodeResponses);
this(Set.of(Metric.values()), nodeResponses);
}

ClusterStatsNodes(Set<String> requestedMetrics, List<ClusterStatsNodeResponse> nodeResponses) {
ClusterStatsNodes(Set<Metric> requestedMetrics, List<ClusterStatsNodeResponse> nodeResponses) {
this.versions = new HashSet<>();
this.fs = ClusterStatsRequest.Metric.FS.containedIn(requestedMetrics) ? new FsInfo.Path() : null;
this.plugins = ClusterStatsRequest.Metric.PLUGINS.containedIn(requestedMetrics) ? new HashSet<>() : null;
this.fs = requestedMetrics.contains(ClusterStatsRequest.Metric.FS) ? new FsInfo.Path() : null;
this.plugins = requestedMetrics.contains(ClusterStatsRequest.Metric.PLUGINS) ? new HashSet<>() : null;

Set<InetAddress> seenAddresses = new HashSet<>(nodeResponses.size());
List<NodeInfo> nodeInfos = new ArrayList<>(nodeResponses.size());
Expand All @@ -132,18 +135,15 @@ public class ClusterStatsNodes implements ToXContentFragment {
this.fs.add(nodeResponse.nodeStats().getFs().getTotal());
}
}

this.counts = new Counts(nodeInfos);
this.os = ClusterStatsRequest.Metric.OS.containedIn(requestedMetrics) ? new OsStats(nodeInfos, nodeStats) : null;
this.process = ClusterStatsRequest.Metric.PROCESS.containedIn(requestedMetrics) ? new ProcessStats(nodeStats) : null;
this.jvm = ClusterStatsRequest.Metric.JVM.containedIn(requestedMetrics) ? new JvmStats(nodeInfos, nodeStats) : null;
this.networkTypes = ClusterStatsRequest.Metric.NETWORK_TYPES.containedIn(requestedMetrics) ? new NetworkTypes(nodeInfos) : null;
this.discoveryTypes = ClusterStatsRequest.Metric.DISCOVERY_TYPES.containedIn(requestedMetrics)
? new DiscoveryTypes(nodeInfos)
: null;
this.packagingTypes = ClusterStatsRequest.Metric.PACKAGING_TYPES.containedIn(requestedMetrics)
? new PackagingTypes(nodeInfos)
: null;
this.ingestStats = ClusterStatsRequest.Metric.INGEST.containedIn(requestedMetrics) ? new IngestStats(nodeStats) : null;
this.networkTypes = requestedMetrics.contains(ClusterStatsRequest.Metric.NETWORK_TYPES) ? new NetworkTypes(nodeInfos) : null;
this.discoveryTypes = requestedMetrics.contains(ClusterStatsRequest.Metric.DISCOVERY_TYPES) ? new DiscoveryTypes(nodeInfos) : null;
this.packagingTypes = requestedMetrics.contains(ClusterStatsRequest.Metric.PACKAGING_TYPES) ? new PackagingTypes(nodeInfos) : null;
this.ingestStats = requestedMetrics.contains(ClusterStatsRequest.Metric.INGEST) ? new IngestStats(nodeStats) : null;
this.process = requestedMetrics.contains(ClusterStatsRequest.Metric.PROCESS) ? new ProcessStats(nodeStats) : null;
this.os = requestedMetrics.contains(ClusterStatsRequest.Metric.OS) ? new OsStats(nodeInfos, nodeStats) : null;
this.jvm = requestedMetrics.contains(ClusterStatsRequest.Metric.JVM) ? new JvmStats(nodeInfos, nodeStats) : null;
}

public Counts getCounts() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

/**
* A request to get cluster level stats.
Expand All @@ -52,8 +50,8 @@
@PublicApi(since = "1.0.0")
public class ClusterStatsRequest extends BaseNodesRequest<ClusterStatsRequest> {

private final Set<String> requestedMetrics = new HashSet<>();
private final Set<String> indexMetricsRequested = new HashSet<>();
private final Set<Metric> requestedMetrics = new HashSet<>();
private final Set<IndexMetric> indexMetricsRequested = new HashSet<>();
private Boolean applyMetricFiltering = false;

public ClusterStatsRequest(StreamInput in) throws IOException {
Expand All @@ -63,8 +61,18 @@ public ClusterStatsRequest(StreamInput in) throws IOException {
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
applyMetricFiltering = in.readOptionalBoolean();
requestedMetrics.addAll(in.readStringList());
indexMetricsRequested.addAll(in.readStringList());
final long longMetricsFlags = in.readLong();
for (Metric metric : Metric.values()) {
if ((longMetricsFlags & (1 << metric.getIndex())) != 0) {
requestedMetrics.add(metric);
}
}
final long longIndexMetricFlags = in.readLong();
for (IndexMetric indexMetric : IndexMetric.values()) {
if ((longIndexMetricFlags & (1 << indexMetric.getIndex())) != 0) {
indexMetricsRequested.add(indexMetric);
}
}
}
}

Expand Down Expand Up @@ -97,44 +105,30 @@ public void applyMetricFiltering(boolean honourMetricFiltering) {
/**
* Add Metric
*/
public ClusterStatsRequest addMetric(String metric) {
if (Metric.allMetrics().contains(metric) == false) {
throw new IllegalStateException("Used an illegal Metric: " + metric);
}
public ClusterStatsRequest addMetric(Metric metric) {
requestedMetrics.add(metric);
return this;
}

/**
* Get the names of requested metrics
*/
public Set<String> requestedMetrics() {
public Set<Metric> requestedMetrics() {
return new HashSet<>(requestedMetrics);
}

/**
* Add IndexMetric
*/
public ClusterStatsRequest addIndexMetric(String indexMetric) {
if (IndexMetrics.allIndicesMetrics().contains(indexMetric) == false) {
throw new IllegalStateException("Used an illegal index metric: " + indexMetric);
}
public ClusterStatsRequest addIndexMetric(IndexMetric indexMetric) {
indexMetricsRequested.add(indexMetric);
return this;
}

public Set<String> indicesMetrics() {
public Set<IndexMetric> indicesMetrics() {
return new HashSet<>(indexMetricsRequested);
}

public void clearRequestedMetrics() {
requestedMetrics.clear();
}

public void clearIndicesMetrics() {
indexMetricsRequested.clear();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand All @@ -143,8 +137,16 @@ public void writeTo(StreamOutput out) throws IOException {
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalBoolean(applyMetricFiltering);
out.writeStringArray(requestedMetrics.toArray(new String[0]));
out.writeStringArray(indexMetricsRequested.toArray(new String[0]));
long longMetricFlags = 0;
for (Metric metric : requestedMetrics) {
longMetricFlags |= (1 << metric.getIndex());
}
out.writeLong(longMetricFlags);
long longIndexMetricFlags = 0;
for (IndexMetric indexMetric : indexMetricsRequested) {
longIndexMetricFlags |= (1 << indexMetric.getIndex());
}
out.writeLong(longIndexMetricFlags);
}
}

Expand All @@ -154,33 +156,32 @@ public void writeTo(StreamOutput out) throws IOException {
*/
@PublicApi(since = "3.0.0")
public enum Metric {
OS("os"),
PROCESS("process"),
JVM("jvm"),
FS("fs"),
PLUGINS("plugins"),
INGEST("ingest"),
NETWORK_TYPES("network_types"),
DISCOVERY_TYPES("discovery_types"),
PACKAGING_TYPES("packaging_types"),
INDICES("indices");
OS("os", 0),
JVM("jvm", 1),
FS("fs", 2),
PROCESS("process", 3),
INGEST("ingest", 4),
PLUGINS("plugins", 5),
NETWORK_TYPES("network_types", 6),
DISCOVERY_TYPES("discovery_types", 7),
PACKAGING_TYPES("packaging_types", 8),
INDICES("indices", 9);

private String metricName;

Metric(String name) {
private int index;

Metric(String name, int index) {
this.metricName = name;
this.index = index;
}

public String metricName() {
return this.metricName;
}

public static Set<String> allMetrics() {
return Arrays.stream(values()).map(Metric::metricName).collect(Collectors.toSet());
}

public boolean containedIn(Set<String> metricNames) {
return metricNames.contains(this.metricName());
public int getIndex() {
return index;
}

}
Expand All @@ -190,33 +191,32 @@ public boolean containedIn(Set<String> metricNames) {
* from the cluster stats endpoint.
*/
@PublicApi(since = "3.0.0")
public enum IndexMetrics {
SHARDS("shards"),
DOCS("docs"),
STORE("store"),
FIELDDATA("fielddata"),
QUERY_CACHE("query_cache"),
COMPLETION("completion"),
SEGMENTS("segments"),
ANALYSIS("analysis"),
MAPPINGS("mappings");
public enum IndexMetric {
SHARDS("shards", 0),
DOCS("docs", 1),
STORE("store", 2),
FIELDDATA("fielddata", 3),
QUERY_CACHE("query_cache", 4),
COMPLETION("completion", 5),
SEGMENTS("segments", 6),
ANALYSIS("analysis", 7),
MAPPINGS("mappings", 8);

private String metricName;

IndexMetrics(String name) {
private int index;

IndexMetric(String name, int index) {
this.metricName = name;
this.index = index;
}

public String metricName() {
return this.metricName;
}

public boolean containedIn(Set<String> metricNames) {
return metricNames.contains(this.metricName());
}

public static Set<String> allIndicesMetrics() {
return Arrays.stream(values()).map(IndexMetrics::metricName).collect(Collectors.toSet());
public int getIndex() {
return this.index;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@

import java.util.Set;

import static org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.IndexMetric;
import static org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.Metric;

/**
* Transport request builder for obtaining cluster stats
*
Expand All @@ -63,12 +66,12 @@ public final ClusterStatsRequestBuilder applyMetricFiltering(boolean applyMetric
return this;
}

public final ClusterStatsRequestBuilder requestMetrics(Set<String> requestMetrics) {
public final ClusterStatsRequestBuilder requestMetrics(Set<Metric> requestMetrics) {
requestMetrics.forEach(request::addMetric);
return this;
}

public final ClusterStatsRequestBuilder indexMetrics(Set<String> indexMetrics) {
public final ClusterStatsRequestBuilder indexMetrics(Set<IndexMetric> indexMetrics) {
indexMetrics.forEach(request::addIndexMetric);
return this;
}
Expand Down
Loading

0 comments on commit d9744e7

Please sign in to comment.