Skip to content

Commit

Permalink
Optimized ClusterStatsIndices to precomute shard stats
Browse files Browse the repository at this point in the history
Signed-off-by: Pranshu Shukla <[email protected]>
  • Loading branch information
Pranshu-S committed Jun 18, 2024
1 parent b4692c8 commit 49d43de
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public void testNodeCounts() {
NodeRoleSettings.NODE_ROLES_SETTING.getKey(),
roles.stream().map(DiscoveryNodeRole::roleName).collect(Collectors.toList())
)
.put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean())
.build();
internalCluster().startNode(settings);
total++;
Expand Down Expand Up @@ -146,6 +147,7 @@ public void testNodeCountsWithDeprecatedMasterRole() throws ExecutionException,
int total = 1;
Settings settings = Settings.builder()
.putList(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), Collections.singletonList(DiscoveryNodeRole.MASTER_ROLE.roleName()))
.put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean())
.build();
internalCluster().startNode(settings);
waitForNodes(total);
Expand Down Expand Up @@ -177,7 +179,8 @@ private void assertShardStats(ClusterStatsIndices.ShardStats stats, int indices,
}

public void testIndicesShardStats() throws ExecutionException, InterruptedException {
internalCluster().startNode();
Settings settings = Settings.builder().put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean()).build();
internalCluster().startNode(settings);
ensureGreen();
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
Expand Down Expand Up @@ -222,7 +225,8 @@ public void testIndicesShardStats() throws ExecutionException, InterruptedExcept
}

public void testValuesSmokeScreen() throws IOException, ExecutionException, InterruptedException {
internalCluster().startNodes(randomIntBetween(1, 3));
Settings settings = Settings.builder().put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean()).build();
internalCluster().startNodes(randomIntBetween(1, 3), settings);
index("test1", "type", "1", "f", "f");

ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
Expand Down Expand Up @@ -262,15 +266,25 @@ public void testValuesSmokeScreen() throws IOException, ExecutionException, Inte

public void testAllocatedProcessors() throws Exception {
// start one node with 7 processors.
internalCluster().startNode(Settings.builder().put(OpenSearchExecutors.NODE_PROCESSORS_SETTING.getKey(), 7).build());
internalCluster().startNode(
Settings.builder()
.put(OpenSearchExecutors.NODE_PROCESSORS_SETTING.getKey(), 7)
.put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean())
.build()
);
waitForNodes(1);

ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
assertThat(response.getNodesStats().getOs().getAllocatedProcessors(), equalTo(7));
}

public void testClusterStatusWhenStateNotRecovered() throws Exception {
internalCluster().startClusterManagerOnlyNode(Settings.builder().put("gateway.recover_after_nodes", 2).build());
internalCluster().startClusterManagerOnlyNode(
Settings.builder()
.put("gateway.recover_after_nodes", 2)
.put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean())
.build()
);
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED));

Expand All @@ -286,7 +300,8 @@ public void testClusterStatusWhenStateNotRecovered() throws Exception {
}

public void testFieldTypes() {
internalCluster().startNode();
Settings settings = Settings.builder().put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean()).build();
internalCluster().startNode(settings);
ensureGreen();
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
Expand Down Expand Up @@ -321,6 +336,7 @@ public void testNodeRolesWithMasterLegacySettings() throws ExecutionException, I
.put("node.master", true)
.put("node.data", false)
.put("node.ingest", false)
.put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean())
.build();

internalCluster().startNodes(legacyMasterSettings);
Expand Down Expand Up @@ -351,6 +367,7 @@ public void testNodeRolesWithClusterManagerRole() throws ExecutionException, Int
DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName()
)
)
.put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean())
.build();

internalCluster().startNodes(clusterManagerNodeRoleSettings);
Expand All @@ -375,6 +392,7 @@ public void testNodeRolesWithSeedDataNodeLegacySettings() throws ExecutionExcept
.put("node.master", true)
.put("node.data", true)
.put("node.ingest", false)
.put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean())
.build();

internalCluster().startNodes(legacySeedDataNodeSettings);
Expand All @@ -400,6 +418,7 @@ public void testNodeRolesWithDataNodeLegacySettings() throws ExecutionException,
.put("node.master", false)
.put("node.data", true)
.put("node.ingest", false)
.put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean())
.build();

// can't start data-only node without assigning cluster-manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@

package org.opensearch.action.admin.cluster.stats;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.cache.query.QueryCacheStats;
Expand All @@ -56,6 +61,8 @@
@PublicApi(since = "1.0.0")
public class ClusterStatsIndices implements ToXContentFragment {

private static final Logger log = LogManager.getLogger(ClusterStatsIndices.class);

private int indexCount;
private ShardStats shards;
private DocsStats docs;
Expand All @@ -78,26 +85,42 @@ public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, Mapping
this.segments = new SegmentsStats();

for (ClusterStatsNodeResponse r : nodeResponses) {
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName());
if (indexShardStats == null) {
indexShardStats = new ShardStats();
countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
}

indexShardStats.total++;

CommonStats shardCommonStats = shardStats.getStats();

if (shardStats.getShardRouting().primary()) {
indexShardStats.primaries++;
docs.add(shardCommonStats.docs);
if (r.getNodeIndexShardStats() != null) {
r.getNodeIndexShardStats().indexStatsMap.forEach(
(index, indexCountStats) -> countsPerIndex.merge(index, indexCountStats, (v1, v2) -> {
v1.addStatsFrom(v2);
return v1;
})
);

docs.add(r.getNodeIndexShardStats().docs);
store.add(r.getNodeIndexShardStats().store);
fieldData.add(r.getNodeIndexShardStats().fieldData);
queryCache.add(r.getNodeIndexShardStats().queryCache);
completion.add(r.getNodeIndexShardStats().completion);
segments.add(r.getNodeIndexShardStats().segments);
} else {
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName());
if (indexShardStats == null) {
indexShardStats = new ShardStats();
countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
}

indexShardStats.total++;

CommonStats shardCommonStats = shardStats.getStats();

if (shardStats.getShardRouting().primary()) {
indexShardStats.primaries++;
docs.add(shardCommonStats.docs);
}
store.add(shardCommonStats.store);
fieldData.add(shardCommonStats.fieldData);
queryCache.add(shardCommonStats.queryCache);
completion.add(shardCommonStats.completion);
segments.add(shardCommonStats.segments);
}
store.add(shardCommonStats.store);
fieldData.add(shardCommonStats.fieldData);
queryCache.add(shardCommonStats.queryCache);
completion.add(shardCommonStats.completion);
segments.add(shardCommonStats.segments);
}
}

Expand Down Expand Up @@ -185,7 +208,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public static class ShardStats implements ToXContentFragment {
public static class ShardStats implements ToXContentFragment, Writeable {

int indices;
int total;
Expand All @@ -202,6 +225,12 @@ public static class ShardStats implements ToXContentFragment {

public ShardStats() {}

public ShardStats(StreamInput in) throws IOException {
indices = in.readVInt();
total = in.readVInt();
primaries = in.readVInt();
}

/**
* number of indices in the cluster
*/
Expand Down Expand Up @@ -329,6 +358,19 @@ public void addIndexShardCount(ShardStats indexShardCount) {
}
}

public void addStatsFrom(ShardStats incomingStats) {
this.total += incomingStats.getTotal();
this.indices += incomingStats.getIndices();
this.primaries += incomingStats.getPrimaries();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(indices);
out.writeVInt(total);
out.writeVInt(primaries);
}

/**
* Inner Fields used for creating XContent and parsing
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@

package org.opensearch.action.admin.cluster.stats;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.indices.stats.ShardStats;
Expand All @@ -50,11 +53,12 @@
* @opensearch.internal
*/
public class ClusterStatsNodeResponse extends BaseNodeResponse {

private static final Logger log = LogManager.getLogger(ClusterStatsNodeResponse.class);
private final NodeInfo nodeInfo;
private final NodeStats nodeStats;
private final ShardStats[] shardsStats;
private ShardStats[] shardsStats;
private ClusterHealthStatus clusterStatus;
private NodeIndexShardStats nodeIndexShardStats;

public ClusterStatsNodeResponse(StreamInput in) throws IOException {
super(in);
Expand All @@ -64,7 +68,12 @@ public ClusterStatsNodeResponse(StreamInput in) throws IOException {
}
this.nodeInfo = new NodeInfo(in);
this.nodeStats = new NodeStats(in);
shardsStats = in.readArray(ShardStats::new, ShardStats[]::new);
if (in.getVersion().onOrAfter(Version.V_2_13_0)) {
this.shardsStats = in.readOptionalArray(ShardStats::new, ShardStats[]::new);
this.nodeIndexShardStats = in.readOptionalWriteable(NodeIndexShardStats::new);
} else {
this.shardsStats = in.readArray(ShardStats::new, ShardStats[]::new);
}
}

public ClusterStatsNodeResponse(
Expand All @@ -77,8 +86,27 @@ public ClusterStatsNodeResponse(
super(node);
this.nodeInfo = nodeInfo;
this.nodeStats = nodeStats;
this.clusterStatus = clusterStatus;
this.shardsStats = shardsStats;
}

public ClusterStatsNodeResponse(
DiscoveryNode node,
@Nullable ClusterHealthStatus clusterStatus,
NodeInfo nodeInfo,
NodeStats nodeStats,
ShardStats[] shardsStats,
boolean optimized
) {
super(node);
this.nodeInfo = nodeInfo;
this.nodeStats = nodeStats;
this.clusterStatus = clusterStatus;
if (optimized) {
log.info(node.getVersion().toString());
this.nodeIndexShardStats = new NodeIndexShardStats(node, shardsStats);
}
this.shardsStats = shardsStats;
}

public NodeInfo nodeInfo() {
Expand All @@ -101,6 +129,10 @@ public ShardStats[] shardsStats() {
return this.shardsStats;
}

public NodeIndexShardStats getNodeIndexShardStats() {
return nodeIndexShardStats;
}

public static ClusterStatsNodeResponse readNodeResponse(StreamInput in) throws IOException {
return new ClusterStatsNodeResponse(in);
}
Expand All @@ -116,6 +148,16 @@ public void writeTo(StreamOutput out) throws IOException {
}
nodeInfo.writeTo(out);
nodeStats.writeTo(out);
out.writeArray(shardsStats);
if (out.getVersion().onOrAfter(Version.V_2_13_0)) {
if (nodeIndexShardStats != null) {
out.writeOptionalArray(null);
out.writeOptionalWriteable(nodeIndexShardStats);
} else {
out.writeOptionalArray(shardsStats);
out.writeOptionalWriteable(null);
}
} else {
out.writeArray(shardsStats);
}
}
}
Loading

0 comments on commit 49d43de

Please sign in to comment.