diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIT.java index 085a32593063a..a16060112a867 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIT.java @@ -114,6 +114,7 @@ public void testNodeCounts() { NodeRoleSettings.NODE_ROLES_SETTING.getKey(), roles.stream().map(DiscoveryNodeRole::roleName).collect(Collectors.toList()) ) + .put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean()) .build(); internalCluster().startNode(settings); total++; @@ -146,6 +147,7 @@ public void testNodeCountsWithDeprecatedMasterRole() throws ExecutionException, int total = 1; Settings settings = Settings.builder() .putList(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), Collections.singletonList(DiscoveryNodeRole.MASTER_ROLE.roleName())) + .put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean()) .build(); internalCluster().startNode(settings); waitForNodes(total); @@ -177,7 +179,8 @@ private void assertShardStats(ClusterStatsIndices.ShardStats stats, int indices, } public void testIndicesShardStats() throws ExecutionException, InterruptedException { - internalCluster().startNode(); + Settings settings = Settings.builder().put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean()).build(); + internalCluster().startNode(settings); ensureGreen(); ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get(); assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN)); @@ -222,7 +225,8 @@ public void testIndicesShardStats() throws ExecutionException, InterruptedExcept } public void testValuesSmokeScreen() throws IOException, ExecutionException, InterruptedException { - internalCluster().startNodes(randomIntBetween(1, 3)); + Settings settings = Settings.builder().put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean()).build(); + internalCluster().startNodes(randomIntBetween(1, 3), settings); index("test1", "type", "1", "f", "f"); ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get(); @@ -262,7 +266,12 @@ public void testValuesSmokeScreen() throws IOException, ExecutionException, Inte public void testAllocatedProcessors() throws Exception { // start one node with 7 processors. - internalCluster().startNode(Settings.builder().put(OpenSearchExecutors.NODE_PROCESSORS_SETTING.getKey(), 7).build()); + internalCluster().startNode( + Settings.builder() + .put(OpenSearchExecutors.NODE_PROCESSORS_SETTING.getKey(), 7) + .put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean()) + .build() + ); waitForNodes(1); ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get(); @@ -270,7 +279,12 @@ public void testAllocatedProcessors() throws Exception { } public void testClusterStatusWhenStateNotRecovered() throws Exception { - internalCluster().startClusterManagerOnlyNode(Settings.builder().put("gateway.recover_after_nodes", 2).build()); + internalCluster().startClusterManagerOnlyNode( + Settings.builder() + .put("gateway.recover_after_nodes", 2) + .put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean()) + .build() + ); ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get(); assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED)); @@ -286,7 +300,8 @@ public void testClusterStatusWhenStateNotRecovered() throws Exception { } public void testFieldTypes() { - internalCluster().startNode(); + Settings settings = Settings.builder().put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean()).build(); + internalCluster().startNode(settings); ensureGreen(); ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get(); assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN)); @@ -321,6 +336,7 @@ public void testNodeRolesWithMasterLegacySettings() throws ExecutionException, I .put("node.master", true) .put("node.data", false) .put("node.ingest", false) + .put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean()) .build(); internalCluster().startNodes(legacyMasterSettings); @@ -351,6 +367,7 @@ public void testNodeRolesWithClusterManagerRole() throws ExecutionException, Int DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() ) ) + .put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean()) .build(); internalCluster().startNodes(clusterManagerNodeRoleSettings); @@ -375,6 +392,7 @@ public void testNodeRolesWithSeedDataNodeLegacySettings() throws ExecutionExcept .put("node.master", true) .put("node.data", true) .put("node.ingest", false) + .put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean()) .build(); internalCluster().startNodes(legacySeedDataNodeSettings); @@ -400,6 +418,7 @@ public void testNodeRolesWithDataNodeLegacySettings() throws ExecutionException, .put("node.master", false) .put("node.data", true) .put("node.ingest", false) + .put(TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS, randomBoolean()) .build(); // can't start data-only node without assigning cluster-manager diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java index 26e554f44fca1..43f1496b6e39b 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java @@ -32,8 +32,13 @@ package org.opensearch.action.admin.cluster.stats; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.cache.query.QueryCacheStats; @@ -56,6 +61,8 @@ @PublicApi(since = "1.0.0") public class ClusterStatsIndices implements ToXContentFragment { + private static final Logger log = LogManager.getLogger(ClusterStatsIndices.class); + private int indexCount; private ShardStats shards; private DocsStats docs; @@ -78,26 +85,42 @@ public ClusterStatsIndices(List nodeResponses, Mapping this.segments = new SegmentsStats(); for (ClusterStatsNodeResponse r : nodeResponses) { - for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) { - ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName()); - if (indexShardStats == null) { - indexShardStats = new ShardStats(); - countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats); - } - - indexShardStats.total++; - - CommonStats shardCommonStats = shardStats.getStats(); - - if (shardStats.getShardRouting().primary()) { - indexShardStats.primaries++; - docs.add(shardCommonStats.docs); + if (r.getNodeIndexShardStats() != null) { + r.getNodeIndexShardStats().indexStatsMap.forEach( + (index, indexCountStats) -> countsPerIndex.merge(index, indexCountStats, (v1, v2) -> { + v1.addStatsFrom(v2); + return v1; + }) + ); + + docs.add(r.getNodeIndexShardStats().docs); + store.add(r.getNodeIndexShardStats().store); + fieldData.add(r.getNodeIndexShardStats().fieldData); + queryCache.add(r.getNodeIndexShardStats().queryCache); + completion.add(r.getNodeIndexShardStats().completion); + segments.add(r.getNodeIndexShardStats().segments); + } else { + for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) { + ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName()); + if (indexShardStats == null) { + indexShardStats = new ShardStats(); + countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats); + } + + indexShardStats.total++; + + CommonStats shardCommonStats = shardStats.getStats(); + + if (shardStats.getShardRouting().primary()) { + indexShardStats.primaries++; + docs.add(shardCommonStats.docs); + } + store.add(shardCommonStats.store); + fieldData.add(shardCommonStats.fieldData); + queryCache.add(shardCommonStats.queryCache); + completion.add(shardCommonStats.completion); + segments.add(shardCommonStats.segments); } - store.add(shardCommonStats.store); - fieldData.add(shardCommonStats.fieldData); - queryCache.add(shardCommonStats.queryCache); - completion.add(shardCommonStats.completion); - segments.add(shardCommonStats.segments); } } @@ -185,7 +208,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws * @opensearch.api */ @PublicApi(since = "1.0.0") - public static class ShardStats implements ToXContentFragment { + public static class ShardStats implements ToXContentFragment, Writeable { int indices; int total; @@ -202,6 +225,12 @@ public static class ShardStats implements ToXContentFragment { public ShardStats() {} + public ShardStats(StreamInput in) throws IOException { + indices = in.readVInt(); + total = in.readVInt(); + primaries = in.readVInt(); + } + /** * number of indices in the cluster */ @@ -329,6 +358,19 @@ public void addIndexShardCount(ShardStats indexShardCount) { } } + public void addStatsFrom(ShardStats incomingStats) { + this.total += incomingStats.getTotal(); + this.indices += incomingStats.getIndices(); + this.primaries += incomingStats.getPrimaries(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(indices); + out.writeVInt(total); + out.writeVInt(primaries); + } + /** * Inner Fields used for creating XContent and parsing * diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java index 1b25bf84356d6..b3c629fc48400 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java @@ -32,6 +32,9 @@ package org.opensearch.action.admin.cluster.stats; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; import org.opensearch.action.admin.cluster.node.info.NodeInfo; import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.indices.stats.ShardStats; @@ -50,11 +53,12 @@ * @opensearch.internal */ public class ClusterStatsNodeResponse extends BaseNodeResponse { - + private static final Logger log = LogManager.getLogger(ClusterStatsNodeResponse.class); private final NodeInfo nodeInfo; private final NodeStats nodeStats; - private final ShardStats[] shardsStats; + private ShardStats[] shardsStats; private ClusterHealthStatus clusterStatus; + private NodeIndexShardStats nodeIndexShardStats; public ClusterStatsNodeResponse(StreamInput in) throws IOException { super(in); @@ -64,7 +68,12 @@ public ClusterStatsNodeResponse(StreamInput in) throws IOException { } this.nodeInfo = new NodeInfo(in); this.nodeStats = new NodeStats(in); - shardsStats = in.readArray(ShardStats::new, ShardStats[]::new); + if (in.getVersion().onOrAfter(Version.V_2_13_0)) { + this.shardsStats = in.readOptionalArray(ShardStats::new, ShardStats[]::new); + this.nodeIndexShardStats = in.readOptionalWriteable(NodeIndexShardStats::new); + } else { + this.shardsStats = in.readArray(ShardStats::new, ShardStats[]::new); + } } public ClusterStatsNodeResponse( @@ -77,8 +86,27 @@ public ClusterStatsNodeResponse( super(node); this.nodeInfo = nodeInfo; this.nodeStats = nodeStats; + this.clusterStatus = clusterStatus; this.shardsStats = shardsStats; + } + + public ClusterStatsNodeResponse( + DiscoveryNode node, + @Nullable ClusterHealthStatus clusterStatus, + NodeInfo nodeInfo, + NodeStats nodeStats, + ShardStats[] shardsStats, + boolean optimized + ) { + super(node); + this.nodeInfo = nodeInfo; + this.nodeStats = nodeStats; this.clusterStatus = clusterStatus; + if (optimized) { + log.info(node.getVersion().toString()); + this.nodeIndexShardStats = new NodeIndexShardStats(node, shardsStats); + } + this.shardsStats = shardsStats; } public NodeInfo nodeInfo() { @@ -101,6 +129,10 @@ public ShardStats[] shardsStats() { return this.shardsStats; } + public NodeIndexShardStats getNodeIndexShardStats() { + return nodeIndexShardStats; + } + public static ClusterStatsNodeResponse readNodeResponse(StreamInput in) throws IOException { return new ClusterStatsNodeResponse(in); } @@ -116,6 +148,16 @@ public void writeTo(StreamOutput out) throws IOException { } nodeInfo.writeTo(out); nodeStats.writeTo(out); - out.writeArray(shardsStats); + if (out.getVersion().onOrAfter(Version.V_2_13_0)) { + if (nodeIndexShardStats != null) { + out.writeOptionalArray(null); + out.writeOptionalWriteable(nodeIndexShardStats); + } else { + out.writeOptionalArray(shardsStats); + out.writeOptionalWriteable(null); + } + } else { + out.writeArray(shardsStats); + } } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/NodeIndexShardStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/NodeIndexShardStats.java new file mode 100644 index 0000000000000..2e364a97fa551 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/NodeIndexShardStats.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.stats; + +import org.opensearch.action.admin.indices.stats.CommonStats; +import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.index.cache.query.QueryCacheStats; +import org.opensearch.index.engine.SegmentsStats; +import org.opensearch.index.fielddata.FieldDataStats; +import org.opensearch.index.shard.DocsStats; +import org.opensearch.index.store.StoreStats; +import org.opensearch.search.suggest.completion.CompletionStats; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Node level statistics used for ClusterStatsIndices for _cluster/stats call. + */ +public class NodeIndexShardStats extends BaseNodeResponse { + + DocsStats docs; + StoreStats store; + FieldDataStats fieldData; + QueryCacheStats queryCache; + CompletionStats completion; + SegmentsStats segments; + Map indexStatsMap; + + protected NodeIndexShardStats(StreamInput in) throws IOException { + super(in); + docs = in.readOptionalWriteable(DocsStats::new); + store = in.readOptionalWriteable(StoreStats::new); + fieldData = in.readOptionalWriteable(FieldDataStats::new); + queryCache = in.readOptionalWriteable(QueryCacheStats::new); + completion = in.readOptionalWriteable(CompletionStats::new); + segments = in.readOptionalWriteable(SegmentsStats::new); + indexStatsMap = in.readMap(StreamInput::readString, ClusterStatsIndices.ShardStats::new); + } + + protected NodeIndexShardStats(DiscoveryNode node, ShardStats[] indexShardsStats) { + super(node); + + this.docs = new DocsStats(); + this.store = new StoreStats(); + this.fieldData = new FieldDataStats(); + this.queryCache = new QueryCacheStats(); + this.completion = new CompletionStats(); + this.segments = new SegmentsStats(); + this.indexStatsMap = new HashMap<>(); + + // Index Level Stats + for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : indexShardsStats) { + ClusterStatsIndices.ShardStats indexShardStats = this.indexStatsMap.get(shardStats.getShardRouting().getIndexName()); + if (indexShardStats == null) { + indexShardStats = new ClusterStatsIndices.ShardStats(); + this.indexStatsMap.put(shardStats.getShardRouting().getIndexName(), indexShardStats); + } + + indexShardStats.total++; + + CommonStats shardCommonStats = shardStats.getStats(); + + if (shardStats.getShardRouting().primary()) { + indexShardStats.primaries++; + this.docs.add(shardCommonStats.docs); + } + this.store.add(shardCommonStats.store); + this.fieldData.add(shardCommonStats.fieldData); + this.queryCache.add(shardCommonStats.queryCache); + this.completion.add(shardCommonStats.completion); + this.segments.add(shardCommonStats.segments); + } + + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalWriteable(docs); + out.writeOptionalWriteable(store); + out.writeOptionalWriteable(fieldData); + out.writeOptionalWriteable(queryCache); + out.writeOptionalWriteable(completion); + out.writeOptionalWriteable(segments); + out.writeMap(indexStatsMap, StreamOutput::writeString, (stream, stats) -> stats.writeTo(stream)); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 9c5dcc9e9de3f..ee46bddefb2aa 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -46,6 +46,7 @@ import org.opensearch.cluster.health.ClusterStateHealth; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Setting; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.index.IndexService; @@ -86,6 +87,15 @@ public class TransportClusterStatsAction extends TransportNodesAction< private final NodeService nodeService; private final IndicesService indicesService; + public static final String OPTIMIZED_CLUSTER_STATS = "opensearch.experimental.optimization.cluster_stats.enabled"; + + public static final Setting OPTIMIZED_CLUSTER_STATS_SETTING = Setting.boolSetting( + OPTIMIZED_CLUSTER_STATS, + false, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + public volatile boolean optimizedClusterStatsEnabled; @Inject public TransportClusterStatsAction( @@ -110,6 +120,8 @@ public TransportClusterStatsAction( ); this.nodeService = nodeService; this.indicesService = indicesService; + this.optimizedClusterStatsEnabled = OPTIMIZED_CLUSTER_STATS_SETTING.get(clusterService.getSettings()); + clusterService.getClusterSettings().addSettingsUpdateConsumer(OPTIMIZED_CLUSTER_STATS_SETTING, this::setOptimizedClusterStats); } @Override @@ -211,8 +223,18 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq clusterStatus = new ClusterStateHealth(clusterService.state()).getStatus(); } - return new ClusterStatsNodeResponse(nodeInfo.getNode(), clusterStatus, nodeInfo, nodeStats, shardsStats.toArray(new ShardStats[0])); + return new ClusterStatsNodeResponse( + nodeInfo.getNode(), + clusterStatus, + nodeInfo, + nodeStats, + shardsStats.toArray(new ShardStats[0]), + optimizedClusterStatsEnabled + ); + } + private void setOptimizedClusterStats(boolean optimizedClusterStatsEnabled) { + this.optimizedClusterStatsEnabled = optimizedClusterStatsEnabled; } /** diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index dab0f6bcf1c85..37873462ae32e 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -33,6 +33,7 @@ import org.apache.logging.log4j.LogManager; import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; +import org.opensearch.action.admin.cluster.stats.TransportClusterStatsAction; import org.opensearch.action.admin.indices.close.TransportCloseIndexAction; import org.opensearch.action.search.CreatePitController; import org.opensearch.action.search.SearchRequestSlowLog; @@ -732,7 +733,10 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING, - RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING + RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, + + // Node Stats Optimisation Settings + TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS_SETTING ) ) );