Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Optimized ClusterStatsIndices to precompute shard stats (#14426) #14910

Merged
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
- Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273))
- Add rest, transport layer changes for hot to warm tiering - dedicated setup ([#13980](https://github.com/opensearch-project/OpenSearch/pull/13980))
- Optimize Cluster Stats Indices to precompute node level stats ([#14426](https://github.com/opensearch-project/OpenSearch/pull/14426))

### Dependencies
- Update to Apache Lucene 9.11.1 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042), [#14576](https://github.com/opensearch-project/OpenSearch/pull/14576))
Original file line number Diff line number Diff line change
@@ -88,7 +88,11 @@ public void testNodeCounts() {
Map<String, Integer> expectedCounts = getExpectedCounts(1, 1, 1, 1, 1, 0, 0);
int numNodes = randomIntBetween(1, 5);

ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertCounts(response.getNodesStats().getCounts(), total, expectedCounts);

for (int i = 0; i < numNodes; i++) {
@@ -153,7 +157,11 @@ public void testNodeCountsWithDeprecatedMasterRole() throws ExecutionException,
Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 0, 0, 0);

Client client = client();
ClusterStatsResponse response = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client.admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertCounts(response.getNodesStats().getCounts(), total, expectedCounts);

Set<String> expectedRoles = Set.of(DiscoveryNodeRole.MASTER_ROLE.roleName());
@@ -176,15 +184,60 @@ private void assertShardStats(ClusterStatsIndices.ShardStats stats, int indices,
assertThat(stats.getReplication(), Matchers.equalTo(replicationFactor));
}

public void testIndicesShardStats() throws ExecutionException, InterruptedException {
/**
 * Checks index and shard statistics of the cluster stats API while nodes answer with their default,
 * per-shard responses (node level aggregation explicitly disabled), as indices and nodes are added.
 */
public void testIndicesShardStatsWithoutNodeLevelAggregations() {
    // Single node, no indices yet.
    internalCluster().startNode();
    ensureGreen();
    ClusterStatsResponse stats = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(false).get();
    assertThat(stats.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));

    // One index with 2 primaries and 1 replica each; the status is expected to be YELLOW on a single node.
    prepareCreate("test1").setSettings(Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 1)).get();

    stats = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(false).get();
    assertThat(stats.getStatus(), Matchers.equalTo(ClusterHealthStatus.YELLOW));
    assertThat(stats.indicesStats.getDocs().getCount(), Matchers.equalTo(0L));
    assertThat(stats.indicesStats.getIndexCount(), Matchers.equalTo(1));
    assertShardStats(stats.getIndicesStats().getShards(), 1, 2, 2, 0.0);

    // add another node, replicas should get assigned
    internalCluster().startNode();
    ensureGreen();
    index("test1", "type", "1", "f", "f");
    refresh(); // make the doc visible
    stats = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(false).get();
    assertThat(stats.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
    assertThat(stats.indicesStats.getDocs().getCount(), Matchers.equalTo(1L));
    assertShardStats(stats.getIndicesStats().getShards(), 1, 4, 2, 1.0);

    // Second index with 3 primaries and no replicas.
    prepareCreate("test2").setSettings(Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0)).get();
    ensureGreen();
    stats = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(false).get();
    assertThat(stats.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
    assertThat(stats.indicesStats.getIndexCount(), Matchers.equalTo(2));
    assertShardStats(stats.getIndicesStats().getShards(), 2, 7, 5, 2.0 / 5);

    // Per-index min / avg / max figures across the two indices.
    ClusterStatsIndices.ShardStats shards = stats.getIndicesStats().getShards();

    assertThat(shards.getAvgIndexPrimaryShards(), Matchers.equalTo(2.5));
    assertThat(shards.getMinIndexPrimaryShards(), Matchers.equalTo(2));
    assertThat(shards.getMaxIndexPrimaryShards(), Matchers.equalTo(3));

    assertThat(shards.getAvgIndexShards(), Matchers.equalTo(3.5));
    assertThat(shards.getMinIndexShards(), Matchers.equalTo(3));
    assertThat(shards.getMaxIndexShards(), Matchers.equalTo(4));

    assertThat(shards.getAvgIndexReplication(), Matchers.equalTo(0.5));
    assertThat(shards.getMinIndexReplication(), Matchers.equalTo(0.0));
    assertThat(shards.getMaxIndexReplication(), Matchers.equalTo(1.0));
}

public void testIndicesShardStatsWithNodeLevelAggregations() {
internalCluster().startNode();
ensureGreen();
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(true).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));

prepareCreate("test1").setSettings(Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 1)).get();

response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(true).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.YELLOW));
assertThat(response.indicesStats.getDocs().getCount(), Matchers.equalTo(0L));
assertThat(response.indicesStats.getIndexCount(), Matchers.equalTo(1));
@@ -195,14 +248,14 @@ public void testIndicesShardStats() throws ExecutionException, InterruptedExcept
ensureGreen();
index("test1", "type", "1", "f", "f");
refresh(); // make the doc visible
response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(true).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
assertThat(response.indicesStats.getDocs().getCount(), Matchers.equalTo(1L));
assertShardStats(response.getIndicesStats().getShards(), 1, 4, 2, 1.0);

prepareCreate("test2").setSettings(Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0)).get();
ensureGreen();
response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(true).get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
assertThat(response.indicesStats.getIndexCount(), Matchers.equalTo(2));
assertShardStats(response.getIndicesStats().getShards(), 2, 7, 5, 2.0 / 5);
@@ -225,7 +278,11 @@ public void testValuesSmokeScreen() throws IOException, ExecutionException, Inte
internalCluster().startNodes(randomIntBetween(1, 3));
index("test1", "type", "1", "f", "f");

ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
String msg = response.toString();
assertThat(msg, response.getTimestamp(), Matchers.greaterThan(946681200000L)); // 1 Jan 2000
assertThat(msg, response.indicesStats.getStore().getSizeInBytes(), Matchers.greaterThan(0L));
@@ -265,13 +322,21 @@ public void testAllocatedProcessors() throws Exception {
internalCluster().startNode(Settings.builder().put(OpenSearchExecutors.NODE_PROCESSORS_SETTING.getKey(), 7).build());
waitForNodes(1);

ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertThat(response.getNodesStats().getOs().getAllocatedProcessors(), equalTo(7));
}

public void testClusterStatusWhenStateNotRecovered() throws Exception {
internalCluster().startClusterManagerOnlyNode(Settings.builder().put("gateway.recover_after_nodes", 2).build());
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED));

if (randomBoolean()) {
@@ -281,14 +346,18 @@ public void testClusterStatusWhenStateNotRecovered() throws Exception {
}
// wait for the cluster status to settle
ensureGreen();
response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(randomBoolean()).get();
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.GREEN));
}

public void testFieldTypes() {
internalCluster().startNode();
ensureGreen();
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
assertTrue(response.getIndicesStats().getMappings().getFieldTypeStats().isEmpty());

@@ -301,7 +370,7 @@ public void testFieldTypes() {
+ "\"eggplant\":{\"type\":\"integer\"}}}}}"
)
.get();
response = client().admin().cluster().prepareClusterStats().get();
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(randomBoolean()).get();
assertThat(response.getIndicesStats().getMappings().getFieldTypeStats().size(), equalTo(3));
Set<IndexFeatureStats> stats = response.getIndicesStats().getMappings().getFieldTypeStats();
for (IndexFeatureStats stat : stats) {
@@ -329,7 +398,11 @@ public void testNodeRolesWithMasterLegacySettings() throws ExecutionException, I
Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 1, 0, 0);

Client client = client();
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse clusterStatsResponse = client.admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedCounts);

Set<String> expectedRoles = Set.of(
@@ -359,7 +432,11 @@ public void testNodeRolesWithClusterManagerRole() throws ExecutionException, Int
Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 1, 0, 0);

Client client = client();
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse clusterStatsResponse = client.admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedCounts);

Set<String> expectedRoles = Set.of(
@@ -383,7 +460,11 @@ public void testNodeRolesWithSeedDataNodeLegacySettings() throws ExecutionExcept
Map<String, Integer> expectedRoleCounts = getExpectedCounts(1, 1, 1, 0, 1, 0, 0);

Client client = client();
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse clusterStatsResponse = client.admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedRoleCounts);

Set<String> expectedRoles = Set.of(
@@ -410,7 +491,11 @@ public void testNodeRolesWithDataNodeLegacySettings() throws ExecutionException,
Map<String, Integer> expectedRoleCounts = getExpectedCounts(1, 1, 1, 0, 1, 0, 0);

Client client = client();
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
ClusterStatsResponse clusterStatsResponse = client.admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedRoleCounts);

Set<Set<String>> expectedNodesRoles = Set.of(
Original file line number Diff line number Diff line change
@@ -78,26 +78,49 @@ public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, Mapping
this.segments = new SegmentsStats();

for (ClusterStatsNodeResponse r : nodeResponses) {
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName());
if (indexShardStats == null) {
indexShardStats = new ShardStats();
countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
// Aggregated response from the node
if (r.getAggregatedNodeLevelStats() != null) {

for (Map.Entry<String, ClusterStatsNodeResponse.AggregatedIndexStats> entry : r.getAggregatedNodeLevelStats().indexStatsMap
.entrySet()) {
ShardStats indexShardStats = countsPerIndex.get(entry.getKey());
if (indexShardStats == null) {
indexShardStats = new ShardStats(entry.getValue());
countsPerIndex.put(entry.getKey(), indexShardStats);
} else {
indexShardStats.addStatsFrom(entry.getValue());
}
}

indexShardStats.total++;

CommonStats shardCommonStats = shardStats.getStats();

if (shardStats.getShardRouting().primary()) {
indexShardStats.primaries++;
docs.add(shardCommonStats.docs);
docs.add(r.getAggregatedNodeLevelStats().commonStats.docs);
store.add(r.getAggregatedNodeLevelStats().commonStats.store);
fieldData.add(r.getAggregatedNodeLevelStats().commonStats.fieldData);
queryCache.add(r.getAggregatedNodeLevelStats().commonStats.queryCache);
completion.add(r.getAggregatedNodeLevelStats().commonStats.completion);
segments.add(r.getAggregatedNodeLevelStats().commonStats.segments);
} else {
// Default response from the node
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName());
if (indexShardStats == null) {
indexShardStats = new ShardStats();
countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
}

indexShardStats.total++;

CommonStats shardCommonStats = shardStats.getStats();

if (shardStats.getShardRouting().primary()) {
indexShardStats.primaries++;
docs.add(shardCommonStats.docs);
}
store.add(shardCommonStats.store);
fieldData.add(shardCommonStats.fieldData);
queryCache.add(shardCommonStats.queryCache);
completion.add(shardCommonStats.completion);
segments.add(shardCommonStats.segments);
}
store.add(shardCommonStats.store);
fieldData.add(shardCommonStats.fieldData);
queryCache.add(shardCommonStats.queryCache);
completion.add(shardCommonStats.completion);
segments.add(shardCommonStats.segments);
}
}

@@ -202,6 +225,11 @@ public static class ShardStats implements ToXContentFragment {

public ShardStats() {}

/**
 * Creates the shard counters of one index, seeded with the counts a node has already aggregated for it.
 *
 * @param aggregatedIndexStats total and primary shard counts reported by a node for the index
 */
public ShardStats(ClusterStatsNodeResponse.AggregatedIndexStats aggregatedIndexStats) {
    primaries = aggregatedIndexStats.primaries;
    total = aggregatedIndexStats.total;
}

/**
* number of indices in the cluster
*/
@@ -329,6 +357,11 @@ public void addIndexShardCount(ShardStats indexShardCount) {
}
}

/**
 * Adds the total and primary shard counts aggregated by another node to these counters.
 *
 * @param incomingStats counts reported by a node for the same index
 */
public void addStatsFrom(ClusterStatsNodeResponse.AggregatedIndexStats incomingStats) {
    primaries = primaries + incomingStats.primaries;
    total = total + incomingStats.total;
}

/**
* Inner Fields used for creating XContent and parsing
*
Original file line number Diff line number Diff line change
@@ -32,17 +32,29 @@

package org.opensearch.action.admin.cluster.stats;

import org.opensearch.Version;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.index.cache.query.QueryCacheStats;
import org.opensearch.index.engine.SegmentsStats;
import org.opensearch.index.fielddata.FieldDataStats;
import org.opensearch.index.shard.DocsStats;
import org.opensearch.index.store.StoreStats;
import org.opensearch.search.suggest.completion.CompletionStats;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
* Transport action for obtaining cluster stats from node level
@@ -55,6 +67,7 @@ public class ClusterStatsNodeResponse extends BaseNodeResponse {
private final NodeStats nodeStats;
private final ShardStats[] shardsStats;
private ClusterHealthStatus clusterStatus;
private AggregatedNodeLevelStats aggregatedNodeLevelStats;

public ClusterStatsNodeResponse(StreamInput in) throws IOException {
super(in);
@@ -64,7 +77,12 @@ public ClusterStatsNodeResponse(StreamInput in) throws IOException {
}
this.nodeInfo = new NodeInfo(in);
this.nodeStats = new NodeStats(in);
shardsStats = in.readArray(ShardStats::new, ShardStats[]::new);
if (in.getVersion().onOrAfter(Version.V_2_16_0)) {
this.shardsStats = in.readOptionalArray(ShardStats::new, ShardStats[]::new);
this.aggregatedNodeLevelStats = in.readOptionalWriteable(AggregatedNodeLevelStats::new);
} else {
this.shardsStats = in.readArray(ShardStats::new, ShardStats[]::new);
}
}

public ClusterStatsNodeResponse(
@@ -81,6 +99,24 @@ public ClusterStatsNodeResponse(
this.clusterStatus = clusterStatus;
}

/**
 * Builds the node response, optionally pre-aggregating the shard statistics on the node.
 *
 * @param node                            the node this response originates from
 * @param clusterStatus                   cluster health as seen by the node, may be {@code null}
 * @param nodeInfo                        node info of the node
 * @param nodeStats                       node stats of the node
 * @param shardsStats                     statistics of the shards; always retained on this instance
 * @param useAggregatedNodeLevelResponses when {@code true}, {@code shardsStats} is additionally folded into an
 *                                        {@link AggregatedNodeLevelStats}; otherwise no aggregate is created
 */
public ClusterStatsNodeResponse(
    DiscoveryNode node,
    @Nullable ClusterHealthStatus clusterStatus,
    NodeInfo nodeInfo,
    NodeStats nodeStats,
    ShardStats[] shardsStats,
    boolean useAggregatedNodeLevelResponses
) {
    super(node);
    this.clusterStatus = clusterStatus;
    this.nodeInfo = nodeInfo;
    this.nodeStats = nodeStats;
    this.shardsStats = shardsStats;
    this.aggregatedNodeLevelStats = useAggregatedNodeLevelResponses ? new AggregatedNodeLevelStats(node, shardsStats) : null;
}

public NodeInfo nodeInfo() {
return this.nodeInfo;
}
@@ -101,6 +137,10 @@ public ShardStats[] shardsStats() {
return this.shardsStats;
}

/**
 * @return the statistics pre-aggregated on the node, or {@code null} when this response carries no aggregate
 */
public AggregatedNodeLevelStats getAggregatedNodeLevelStats() {
    return this.aggregatedNodeLevelStats;
}

public static ClusterStatsNodeResponse readNodeResponse(StreamInput in) throws IOException {
return new ClusterStatsNodeResponse(in);
}
@@ -116,6 +156,95 @@ public void writeTo(StreamOutput out) throws IOException {
}
nodeInfo.writeTo(out);
nodeStats.writeTo(out);
out.writeArray(shardsStats);
if (out.getVersion().onOrAfter(Version.V_2_16_0)) {
if (aggregatedNodeLevelStats != null) {
out.writeOptionalArray(null);
out.writeOptionalWriteable(aggregatedNodeLevelStats);
} else {
out.writeOptionalArray(shardsStats);
out.writeOptionalWriteable(null);
}
} else {
out.writeArray(shardsStats);
}
}

/**
* Node level statistics used for ClusterStatsIndices for _cluster/stats call.
*/
public class AggregatedNodeLevelStats extends BaseNodeResponse {

CommonStats commonStats;
Map<String, AggregatedIndexStats> indexStatsMap;

protected AggregatedNodeLevelStats(StreamInput in) throws IOException {
super(in);
commonStats = in.readOptionalWriteable(CommonStats::new);
indexStatsMap = in.readMap(StreamInput::readString, AggregatedIndexStats::new);
}

protected AggregatedNodeLevelStats(DiscoveryNode node, ShardStats[] indexShardsStats) {
super(node);
this.commonStats = new CommonStats();
this.commonStats.docs = new DocsStats();
this.commonStats.store = new StoreStats();
this.commonStats.fieldData = new FieldDataStats();
this.commonStats.queryCache = new QueryCacheStats();
this.commonStats.completion = new CompletionStats();
this.commonStats.segments = new SegmentsStats();
this.indexStatsMap = new HashMap<>();

// Index Level Stats
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : indexShardsStats) {
AggregatedIndexStats indexShardStats = this.indexStatsMap.get(shardStats.getShardRouting().getIndexName());
if (indexShardStats == null) {
indexShardStats = new AggregatedIndexStats();
this.indexStatsMap.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
}

indexShardStats.total++;

CommonStats shardCommonStats = shardStats.getStats();

if (shardStats.getShardRouting().primary()) {
indexShardStats.primaries++;
this.commonStats.docs.add(shardCommonStats.docs);
}
this.commonStats.store.add(shardCommonStats.store);
this.commonStats.fieldData.add(shardCommonStats.fieldData);
this.commonStats.queryCache.add(shardCommonStats.queryCache);
this.commonStats.completion.add(shardCommonStats.completion);
this.commonStats.segments.add(shardCommonStats.segments);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalWriteable(commonStats);
out.writeMap(indexStatsMap, StreamOutput::writeString, (stream, stats) -> stats.writeTo(stream));
}
}

/**
 * Shard counts of a single index as aggregated on one node for the _cluster/stats call:
 * the number of shards and how many of them are primaries.
 */
@PublicApi(since = "2.16.0")
public static class AggregatedIndexStats implements Writeable {
    /** Number of shards counted for the index. */
    public int total;
    /** Number of primary shards counted for the index. */
    public int primaries;

    /** Creates empty counters, both starting at zero. */
    public AggregatedIndexStats() {}

    /**
     * Reads the counters in the order they are written by {@link #writeTo(StreamOutput)}.
     *
     * @param in stream to read from
     * @throws IOException if reading from the stream fails
     */
    public AggregatedIndexStats(StreamInput in) throws IOException {
        this.total = in.readVInt();
        this.primaries = in.readVInt();
    }

    /**
     * Writes {@code total} followed by {@code primaries} as variable-length ints.
     *
     * @param out stream to write to
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeVInt(this.total);
        out.writeVInt(this.primaries);
    }
}
}
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@

package org.opensearch.action.admin.cluster.stats;

import org.opensearch.Version;
import org.opensearch.action.support.nodes.BaseNodesRequest;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
@@ -49,8 +50,13 @@ public class ClusterStatsRequest extends BaseNodesRequest<ClusterStatsRequest> {

public ClusterStatsRequest(StreamInput in) throws IOException {
super(in);
if (in.getVersion().onOrAfter(Version.V_2_16_0)) {
useAggregatedNodeLevelResponses = in.readOptionalBoolean();
}
}

private Boolean useAggregatedNodeLevelResponses = false;

/**
* Get stats from nodes based on the nodes ids specified. If none are passed, stats
* based on all nodes will be returned.
@@ -59,9 +65,20 @@ public ClusterStatsRequest(String... nodesIds) {
super(nodesIds);
}

/**
 * Tells whether nodes should answer with statistics pre-aggregated at node level.
 * <p>
 * The backing field is a {@code Boolean} populated from an optional boolean on the wire, so it is compared
 * against {@link Boolean#TRUE} instead of being auto-unboxed; an absent value is treated as {@code false}
 * rather than causing a {@link NullPointerException}.
 *
 * @return {@code true} only when aggregation was explicitly enabled
 */
public boolean useAggregatedNodeLevelResponses() {
    return Boolean.TRUE.equals(useAggregatedNodeLevelResponses);
}

/**
 * Sets whether nodes should answer with statistics pre-aggregated at node level.
 *
 * @param useAggregatedNodeLevelResponses {@code true} to request node level aggregation
 */
public void useAggregatedNodeLevelResponses(final boolean useAggregatedNodeLevelResponses) {
    this.useAggregatedNodeLevelResponses = useAggregatedNodeLevelResponses;
}

/**
 * Writes the request to the wire. The node level aggregation flag is only written when the target stream
 * version is 2.16.0 or later.
 *
 * @param out stream to write to
 * @throws IOException if writing to the stream fails
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
    super.writeTo(out);
    final boolean receiverReadsAggregationFlag = out.getVersion().onOrAfter(Version.V_2_16_0);
    if (receiverReadsAggregationFlag) {
        out.writeOptionalBoolean(this.useAggregatedNodeLevelResponses);
    }
}

}
Original file line number Diff line number Diff line change
@@ -50,4 +50,9 @@ public class ClusterStatsRequestBuilder extends NodesOperationRequestBuilder<
/**
 * Creates a builder around a fresh {@link ClusterStatsRequest}.
 *
 * @param client client used to execute the request
 * @param action the cluster stats action to invoke
 */
public ClusterStatsRequestBuilder(OpenSearchClient client, ClusterStatsAction action) {
    super(client, action, new ClusterStatsRequest());
}

/**
 * Sets on the underlying request whether nodes should answer with node level aggregated statistics.
 *
 * @param useAggregatedNodeLevelResponses {@code true} to request node level aggregation
 * @return this builder, to allow call chaining
 */
public final ClusterStatsRequestBuilder useAggregatedNodeLevelResponses(final boolean useAggregatedNodeLevelResponses) {
    this.request.useAggregatedNodeLevelResponses(useAggregatedNodeLevelResponses);
    return this;
}
}
Original file line number Diff line number Diff line change
@@ -217,9 +217,9 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
clusterStatus,
nodeInfo,
nodeStats,
shardsStats.toArray(new ShardStats[shardsStats.size()])
shardsStats.toArray(new ShardStats[0]),
nodeRequest.request.useAggregatedNodeLevelResponses()
);

}

/**
Original file line number Diff line number Diff line change
@@ -67,6 +67,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null));
clusterStatsRequest.timeout(request.param("timeout"));
clusterStatsRequest.setIncludeDiscoveryNodes(false);
clusterStatsRequest.useAggregatedNodeLevelResponses(true);
return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel));
}

Original file line number Diff line number Diff line change
@@ -32,16 +32,38 @@

package org.opensearch.action.admin.cluster.stats;

import org.opensearch.Build;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodeStatsTests;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.TestShardRouting;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.index.cache.query.QueryCacheStats;
import org.opensearch.index.engine.SegmentsStats;
import org.opensearch.index.fielddata.FieldDataStats;
import org.opensearch.index.flush.FlushStats;
import org.opensearch.index.shard.DocsStats;
import org.opensearch.index.shard.IndexingStats;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.StoreStats;
import org.opensearch.search.suggest.completion.CompletionStats;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -158,6 +180,253 @@ public void testIngestStats() throws Exception {
);
}

/**
 * Builds, for the same shard statistics, one set of default node responses and one set of node level
 * aggregated responses, and checks that both produce equal {@link ClusterStatsIndices}.
 */
public void testMultiVersionScenarioWithAggregatedNodeLevelStats() {
    // The default (non-aggregated) response stands in for what a node running a version without
    // aggregated node level information would return.
    final int numberOfNodes = randomIntBetween(1, 4);
    final Index testIndex = new Index("test-index", "_na_");

    final List<ClusterStatsNodeResponse> defaultResponses = new ArrayList<>();
    final List<ClusterStatsNodeResponse> aggregatedResponses = new ArrayList<>();

    for (int nodeOrdinal = 0; nodeOrdinal < numberOfNodes; nodeOrdinal++) {
        DiscoveryNode node = new DiscoveryNode("node-" + nodeOrdinal, buildNewFakeTransportAddress(), Version.CURRENT);
        CommonStats commonStats = createRandomCommonStats();
        ShardStats[] shardStats = createshardStats(node, testIndex, commonStats);

        // Same shard stats, once as default response and once as aggregated response.
        defaultResponses.add(createClusterStatsNodeResponse(node, shardStats, testIndex, true, false));
        aggregatedResponses.add(createClusterStatsNodeResponse(node, shardStats, testIndex, false, true));
    }

    ClusterStatsIndices fromDefaultResponses = new ClusterStatsIndices(defaultResponses, null, null);
    ClusterStatsIndices fromAggregatedResponses = new ClusterStatsIndices(aggregatedResponses, null, null);

    assertClusterStatsIndicesEqual(fromDefaultResponses, fromAggregatedResponses);
}

/**
 * Asserts that two {@link ClusterStatsIndices} instances report the same index count and the same shard,
 * docs, store, query cache, completion and segment statistics.
 *
 * @param first  the expected statistics
 * @param second the statistics to compare against {@code first}
 */
public void assertClusterStatsIndicesEqual(ClusterStatsIndices first, ClusterStatsIndices second) {
    assertEquals(first.getIndexCount(), second.getIndexCount());

    // Integer valued shard counts. Minimum is compared with minimum and maximum with maximum.
    assertEquals(first.getShards().getIndices(), second.getShards().getIndices());
    assertEquals(first.getShards().getTotal(), second.getShards().getTotal());
    assertEquals(first.getShards().getPrimaries(), second.getShards().getPrimaries());
    assertEquals(first.getShards().getMinIndexShards(), second.getShards().getMinIndexShards());
    assertEquals(first.getShards().getMaxIndexShards(), second.getShards().getMaxIndexShards());
    assertEquals(first.getShards().getMinIndexPrimaryShards(), second.getShards().getMinIndexPrimaryShards());
    assertEquals(first.getShards().getMaxIndexPrimaryShards(), second.getShards().getMaxIndexPrimaryShards());

    // Floating-point values: use the delta overload of assertEquals with a zero delta, which requires
    // exact equality and reports both values on failure.
    assertEquals(first.getShards().getReplication(), second.getShards().getReplication(), 0.0);
    assertEquals(first.getShards().getAvgIndexShards(), second.getShards().getAvgIndexShards(), 0.0);
    assertEquals(first.getShards().getAvgIndexPrimaryShards(), second.getShards().getAvgIndexPrimaryShards(), 0.0);
    assertEquals(first.getShards().getMinIndexReplication(), second.getShards().getMinIndexReplication(), 0.0);
    assertEquals(first.getShards().getAvgIndexReplication(), second.getShards().getAvgIndexReplication(), 0.0);
    assertEquals(first.getShards().getMaxIndexReplication(), second.getShards().getMaxIndexReplication(), 0.0);

    // Docs stats
    assertEquals(first.getDocs().getAverageSizeInBytes(), second.getDocs().getAverageSizeInBytes());
    assertEquals(first.getDocs().getDeleted(), second.getDocs().getDeleted());
    assertEquals(first.getDocs().getCount(), second.getDocs().getCount());
    assertEquals(first.getDocs().getTotalSizeInBytes(), second.getDocs().getTotalSizeInBytes());

    // Store Stats
    assertEquals(first.getStore().getSizeInBytes(), second.getStore().getSizeInBytes());
    assertEquals(first.getStore().getSize(), second.getStore().getSize());
    assertEquals(first.getStore().getReservedSize(), second.getStore().getReservedSize());

    // Query Cache
    assertEquals(first.getQueryCache().getCacheCount(), second.getQueryCache().getCacheCount());
    assertEquals(first.getQueryCache().getCacheSize(), second.getQueryCache().getCacheSize());
    assertEquals(first.getQueryCache().getEvictions(), second.getQueryCache().getEvictions());
    assertEquals(first.getQueryCache().getHitCount(), second.getQueryCache().getHitCount());
    assertEquals(first.getQueryCache().getTotalCount(), second.getQueryCache().getTotalCount());
    assertEquals(first.getQueryCache().getMissCount(), second.getQueryCache().getMissCount());
    assertEquals(first.getQueryCache().getMemorySize(), second.getQueryCache().getMemorySize());
    assertEquals(first.getQueryCache().getMemorySizeInBytes(), second.getQueryCache().getMemorySizeInBytes());

    // Completion Stats
    assertEquals(first.getCompletion().getSizeInBytes(), second.getCompletion().getSizeInBytes());
    assertEquals(first.getCompletion().getSize(), second.getCompletion().getSize());

    // Segment Stats
    assertEquals(first.getSegments().getBitsetMemory(), second.getSegments().getBitsetMemory());
    assertEquals(first.getSegments().getCount(), second.getSegments().getCount());
    assertEquals(first.getSegments().getBitsetMemoryInBytes(), second.getSegments().getBitsetMemoryInBytes());
    assertEquals(first.getSegments().getFileSizes(), second.getSegments().getFileSizes());
    assertEquals(first.getSegments().getIndexWriterMemoryInBytes(), second.getSegments().getIndexWriterMemoryInBytes());
    assertEquals(first.getSegments().getVersionMapMemory(), second.getSegments().getVersionMapMemory());
    assertEquals(first.getSegments().getVersionMapMemoryInBytes(), second.getSegments().getVersionMapMemoryInBytes());
}

/**
 * Round-trips a {@link ClusterStatsNodeResponse} built with node-level aggregation enabled
 * through a {@link BytesStreamOutput}/{@link StreamInput} pair and verifies that the
 * {@link ClusterStatsIndices} computed from the deserialized response matches the one
 * computed from the original response.
 *
 * @throws IOException if writing to or reading from the in-memory stream fails
 */
public void testNodeIndexShardStatsSuccessfulSerializationDeserialization() throws IOException {
    final Index index = new Index("test-index", "_na_");
    final DiscoveryNode localNode = new DiscoveryNode("node", buildNewFakeTransportAddress(), Version.CURRENT);
    final CommonStats randomStats = createRandomCommonStats();
    final ShardStats[] perShardStats = createshardStats(localNode, index, randomStats);

    // defaultBehavior = false, aggregateNodeLevelStats = true: use the aggregating constructor.
    final ClusterStatsNodeResponse original = createClusterStatsNodeResponse(localNode, perShardStats, index, false, true);

    // Serialize the response into an in-memory buffer, then read it back.
    final BytesStreamOutput buffer = new BytesStreamOutput();
    original.writeTo(buffer);
    final StreamInput serialized = buffer.bytes().streamInput();
    final ClusterStatsNodeResponse roundTripped = new ClusterStatsNodeResponse(serialized);

    final ClusterStatsIndices expected = new ClusterStatsIndices(List.of(original), null, null);
    final ClusterStatsIndices actual = new ClusterStatsIndices(List.of(roundTripped), null, null);

    assertClusterStatsIndicesEqual(expected, actual);
}

/**
 * Builds a {@link ClusterStatsNodeResponse} for {@code node} carrying the given shard stats.
 * The accompanying {@link NodeInfo} and {@link NodeStats} are populated only with the node
 * (plus version/build and a random non-negative timestamp respectively); every other
 * constructor argument is {@code null}.
 *
 * @param node                    node the response is attributed to
 * @param shardStats              shard-level stats to embed in the response
 * @param index                   not used by this helper
 * @param defaultBehavior         when {@code true}, the five-argument constructor is used and
 *                                {@code aggregateNodeLevelStats} is ignored
 * @param aggregateNodeLevelStats forwarded to the six-argument constructor when
 *                                {@code defaultBehavior} is {@code false}
 * @return the newly constructed response
 */
private ClusterStatsNodeResponse createClusterStatsNodeResponse(
    DiscoveryNode node,
    ShardStats[] shardStats,
    Index index,
    boolean defaultBehavior,
    boolean aggregateNodeLevelStats
) {
    // Version, build and node, followed by 12 null placeholders.
    final NodeInfo info = new NodeInfo(
        Version.CURRENT, Build.CURRENT, node,
        null, null, null, null,
        null, null, null, null,
        null, null, null, null
    );

    // Node and timestamp, followed by 27 null placeholders.
    final NodeStats stats = new NodeStats(
        node, randomNonNegativeLong(),
        null, null, null, null, null, null, null, null, null,
        null, null, null, null, null, null, null, null, null,
        null, null, null, null, null, null, null, null, null
    );

    if (defaultBehavior) {
        return new ClusterStatsNodeResponse(node, null, info, stats, shardStats);
    }
    return new ClusterStatsNodeResponse(node, null, info, stats, shardStats, aggregateNodeLevelStats);
}

/**
 * Creates a {@link CommonStats} instance (initialised with {@link CommonStatsFlags#NONE})
 * whose docs, store, flush, field-data and query-cache sections hold randomly generated
 * values, and whose indexing, completion and segments sections are default-constructed.
 *
 * @return the populated stats object
 */
private CommonStats createRandomCommonStats() {
    final CommonStats stats = new CommonStats(CommonStatsFlags.NONE);

    // Build each section first; the construction order matches the order in which
    // random values are drawn, so results stay reproducible for a given test seed.
    final DocsStats docs = new DocsStats(randomLongBetween(0, 10000), randomLongBetween(0, 100), randomLongBetween(0, 1000));
    final StoreStats store = new StoreStats(randomLongBetween(0, 100), randomLongBetween(0, 1000));
    final IndexingStats indexing = new IndexingStats();
    final CompletionStats completion = new CompletionStats();
    final FlushStats flush = new FlushStats(randomLongBetween(0, 100), randomLongBetween(0, 100), randomLongBetween(0, 100));
    final FieldDataStats fieldData = new FieldDataStats(randomLongBetween(0, 100), randomLongBetween(0, 100), null);
    final QueryCacheStats queryCache = new QueryCacheStats(
        randomLongBetween(0, 100),
        randomLongBetween(0, 100),
        randomLongBetween(0, 100),
        randomLongBetween(0, 100),
        randomLongBetween(0, 100)
    );
    final SegmentsStats segments = new SegmentsStats();

    stats.docs = docs;
    stats.store = store;
    stats.indexing = indexing;
    stats.completion = completion;
    stats.flush = flush;
    stats.fieldData = fieldData;
    stats.queryCache = queryCache;
    stats.segments = segments;
    return stats;
}

/**
 * Creates stats for two shards (ids 0 and 1) of {@code index} routed to {@code localNode}.
 * Each shard gets a randomly chosen routing state and primary flag, a shard path under a
 * fresh temporary directory, and shares the same {@code commonStats} instance.
 *
 * @param localNode   node the shards are assigned to
 * @param index       index the shards belong to (only its name is used for routing)
 * @param commonStats stats object attached to every generated shard
 * @return an array with one {@link ShardStats} entry per shard
 */
private ShardStats[] createshardStats(DiscoveryNode localNode, Index index, CommonStats commonStats) {
    final int shardCount = 2;
    final ShardStats[] result = new ShardStats[shardCount];

    for (int shardNum = 0; shardNum < shardCount; shardNum++) {
        // State byte is 2 or 3 — presumably INITIALIZING/STARTED; confirm against ShardRoutingState.
        final byte stateValue = (byte) randomIntBetween(2, 3);
        final ShardRoutingState state = ShardRoutingState.fromValue(stateValue);
        final ShardRouting routing = TestShardRouting.newShardRouting(
            index.getName(),
            shardNum,
            localNode.getId(),
            randomBoolean(),
            state
        );

        // <tmp>/indices/<index uuid>/<shard id>
        final Path shardDir = createTempDir().resolve("indices")
            .resolve(routing.shardId().getIndex().getUUID())
            .resolve(String.valueOf(routing.shardId().id()));
        final ShardPath shardPath = new ShardPath(false, shardDir, shardDir, routing.shardId());

        result[shardNum] = new ShardStats(routing, shardPath, commonStats, null, null, null);
    }

    return result;
}

/**
 * Test-only extension of {@link ClusterStatsIndices.ShardStats} that adds a field-by-field
 * comparison against another shard-stats instance.
 */
private class MockShardStats extends ClusterStatsIndices.ShardStats {
    /**
     * Compares every getter-exposed value of this instance with {@code shardStats} using
     * {@code ==}.
     * <p>
     * NOTE(review): the parameter type is {@code ClusterStatsIndices.ShardStats}, so this is
     * an overload and does not override {@link Object#equals(Object)}.
     *
     * @param shardStats the instance to compare against; must not be {@code null}
     * @return {@code true} only if all compared values are equal
     */
    public boolean equals(ClusterStatsIndices.ShardStats shardStats) {
        // Totals
        if (this.getIndices() != shardStats.getIndices()) {
            return false;
        }
        if (this.getTotal() != shardStats.getTotal()) {
            return false;
        }
        if (this.getPrimaries() != shardStats.getPrimaries()) {
            return false;
        }
        if (this.getReplication() != shardStats.getReplication()) {
            return false;
        }

        // Per-index shard counts
        if (this.getMaxIndexShards() != shardStats.getMaxIndexShards()) {
            return false;
        }
        if (this.getMinIndexShards() != shardStats.getMinIndexShards()) {
            return false;
        }
        if (this.getAvgIndexShards() != shardStats.getAvgIndexShards()) {
            return false;
        }

        // Per-index primary shard counts
        if (this.getMaxIndexPrimaryShards() != shardStats.getMaxIndexPrimaryShards()) {
            return false;
        }
        if (this.getMinIndexPrimaryShards() != shardStats.getMinIndexPrimaryShards()) {
            return false;
        }
        if (this.getAvgIndexPrimaryShards() != shardStats.getAvgIndexPrimaryShards()) {
            return false;
        }

        // Per-index replication
        if (this.getMinIndexReplication() != shardStats.getMinIndexReplication()) {
            return false;
        }
        if (this.getAvgIndexReplication() != shardStats.getAvgIndexReplication()) {
            return false;
        }
        return this.getMaxIndexReplication() == shardStats.getMaxIndexReplication();
    }
}

private static NodeInfo createNodeInfo(String nodeId, String transportType, String httpType) {
Settings.Builder settings = Settings.builder();
if (transportType != null) {