diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java
index be16d4ea184fa..493c036a1cef0 100644
--- a/server/src/main/java/org/opensearch/indices/IndicesService.java
+++ b/server/src/main/java/org/opensearch/indices/IndicesService.java
@@ -172,6 +172,7 @@
 import java.io.UncheckedIOException;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -695,7 +696,10 @@ public NodeIndicesStats stats(CommonStatsFlags flags) {
         }
         if (flags.getIncludeIndicesStatsByLevel()) {
             NodeIndicesStats.StatsLevel statsLevel = NodeIndicesStats.getAcceptedLevel(flags.getLevels());
-            return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats, statsLevel);
+            if (!NodeIndicesStats.StatsLevel.SHARDS.equals(statsLevel)) {
+                return new NodeIndicesStats(commonStats, null, searchRequestStats, statsLevel, statsByIndex(this, flags));
+            }
+            return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats, statsLevel, null);
         } else {
             return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats);
         }
@@ -728,6 +732,46 @@ Map<Index, List<IndexShardStats>> statsByShard(final IndicesService indicesServi
         return statsByShard;
     }
 
+    private Map<Index, CommonStats> statsByIndex(final IndicesService indicesService, final CommonStatsFlags flags) {
+        final Map<Index, CommonStats> statsByIndex = new HashMap<>();
+        for (final IndexService indexService : indicesService) {
+            for (final IndexShard indexShard : indexService) {
+                IndexShardStats indexShardStats = getIndexShardStats(indicesService, indexShard, flags);
+                updateStatsForIndex(statsByIndex, indexShard, indexShardStats);
+            }
+        }
+        return statsByIndex;
+    }
+
+    private IndexShardStats getIndexShardStats(
+        final IndicesService indicesService,
+        final IndexShard indexShard,
+        final CommonStatsFlags flags
+    ) {
+        try {
+            return indicesService.indexShardStats(indicesService, indexShard, flags);
+        } catch (IllegalIndexShardStateException | AlreadyClosedException e) {
+            // we can safely ignore illegal state on shards that are closing, for example
+            logger.trace(() -> new ParameterizedMessage("{} ignoring shard stats", indexShard.shardId()), e);
+        }
+        return null;
+    }
+
+    private void updateStatsForIndex(
+        final Map<Index, CommonStats> statsByIndex,
+        final IndexShard indexShard,
+        final IndexShardStats indexShardStats
+    ) {
+        if (indexShardStats == null) {
+            return;
+        }
+        if (!statsByIndex.containsKey(indexShard.shardId().getIndex())) {
+            statsByIndex.put(indexShard.shardId().getIndex(), new CommonStats());
+        }
+        Arrays.stream(indexShardStats.getShards())
+            .forEach(shardStats -> statsByIndex.get(indexShard.shardId().getIndex()).add(shardStats.getStats()));
+    }
+
     IndexShardStats indexShardStats(final IndicesService indicesService, final IndexShard indexShard, final CommonStatsFlags flags) {
         if (indexShard.routingEntry() == null) {
             return null;
diff --git a/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java b/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java
index 83a759cdb71c5..1d834a5b527ac 100644
--- a/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java
+++ b/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java
@@ -68,6 +68,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
 /**
@@ -154,6 +155,50 @@ public NodeIndicesStats(
         }
     }
 
+    /**
+     * When only statsByIndex is required, this constructor accepts it directly instead of
+     * deriving it from statsByShard, so callers do not need to build and hold both
+     * representations at the same time.
+     */
+    public NodeIndicesStats(
+        CommonStats oldStats,
+        Map<Index, List<IndexShardStats>> statsByShard,
+        SearchRequestStats searchRequestStats,
+        StatsLevel level,
+        Map<Index, CommonStats> statsByIndex
+    ) {
+        // build the total common stats from the old stats and the newly supplied ones
+        this.stats = oldStats;
+        assert Objects.nonNull(statsByIndex) || Objects.nonNull(statsByShard)
+            : "statsByIndex and statsByShard must not both be null at the same time";
+        if (statsByIndex != null) {
+            statsByIndex.values().forEach(commonStats -> stats.add(commonStats));
+        } else {
+            for (List<IndexShardStats> shardStatsList : statsByShard.values()) {
+                for (IndexShardStats indexShardStats : shardStatsList) {
+                    for (ShardStats shardStats : indexShardStats.getShards()) {
+                        stats.add(shardStats.getStats());
+                    }
+                }
+            }
+        }
+
+        if (this.stats.search != null) {
+            this.stats.search.setSearchRequestStats(searchRequestStats);
+        }
+
+        if (level != null) {
+            switch (level) {
+                case INDICES:
+                    this.statsByIndex = statsByIndex;
+                    break;
+                case SHARDS:
+                    this.statsByShard = statsByShard;
+                    break;
+            }
+        }
+    }
+
     /**
      * By default, the levels passed from the transport action will be a list of strings, since NodeIndicesStats can
      * only aggregate on one level, we pick the first accepted level else we ignore if no known level is passed. Level is
diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java
index 07b0fbbe4a911..912e5173f8e01 100644
--- a/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java
+++ b/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java
@@ -101,6 +101,7 @@ public void processResponse(final ClusterStateResponse state) {
                 statsRequest.clear()
                     .addMetric(NodesStatsRequest.Metric.FS.metricName())
                     .indices(new CommonStatsFlags(CommonStatsFlags.Flag.Store));
+                statsRequest.indices().setIncludeIndicesStatsByLevel(true);
 
                 client.admin().cluster().nodesStats(statsRequest, new RestResponseListener<NodesStatsResponse>(channel) {
                     @Override
diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java
index 11902728eed07..03f6c3f295ce5 100644
--- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java
+++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java
@@ -1112,6 +1112,16 @@ public MockNodeIndicesStats(
             super(oldStats, statsByShard, searchRequestStats, level);
         }
 
+        public MockNodeIndicesStats(
+            CommonStats oldStats,
+            Map<Index, List<IndexShardStats>> statsByShard,
+            SearchRequestStats searchRequestStats,
+            StatsLevel level,
+            Map<Index, CommonStats> statsByIndex
+        ) {
+            super(oldStats, statsByShard, searchRequestStats, level, statsByIndex);
+        }
+
         public CommonStats getStats() {
             return this.stats;
         }
@@ -1345,6 +1355,97 @@ public void testNodeIndicesStatsWithAndWithoutAggregations() throws IOException
         });
     }
 
+    public void testNodeIndicesStatsWithAndWithoutAggregationsUsingLevelsToDecideAggregation() throws IOException {
+
+        CommonStatsFlags commonStatsFlags = new CommonStatsFlags(
+            CommonStatsFlags.Flag.Docs,
+            CommonStatsFlags.Flag.Store,
+            CommonStatsFlags.Flag.Indexing,
+            CommonStatsFlags.Flag.Completion,
+            CommonStatsFlags.Flag.Flush,
+            CommonStatsFlags.Flag.FieldData,
+            CommonStatsFlags.Flag.QueryCache,
+            CommonStatsFlags.Flag.Segments
+        );
+
+        int numberOfIndexes = randomIntBetween(1, 3);
+        List<Index> indexList = new ArrayList<>();
+        for (int i = 0; i < numberOfIndexes; i++) {
+            Index index = new Index("test-index-" + i, "_na_");
+            indexList.add(index);
+        }
+
+        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        HashMap<Index, List<IndexShardStats>> statsByShards = createRandomShardByStats(indexList);
+        Map<Index, CommonStats> statsByIndex = new HashMap<>();
+        for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShards.entrySet()) {
+            if (!statsByIndex.containsKey(entry.getKey())) {
+                statsByIndex.put(entry.getKey(), new CommonStats());
+            }
+
+            for (IndexShardStats indexShardStats : entry.getValue()) {
+                for (ShardStats shardStats : indexShardStats.getShards()) {
+                    statsByIndex.get(entry.getKey()).add(shardStats.getStats());
+                }
+            }
+        }
+
+        final MockNodeIndicesStats nonAggregatedNodeIndicesStats = new MockNodeIndicesStats(
+            new CommonStats(commonStatsFlags),
+            statsByShards,
+            new SearchRequestStats(clusterSettings)
+        );
+
+        commonStatsFlags.setIncludeIndicesStatsByLevel(true);
+
+        Arrays.stream(NodeIndicesStats.StatsLevel.values()).forEach(level -> {
+            MockNodeIndicesStats aggregatedNodeIndicesStats;
+            if (!NodeIndicesStats.StatsLevel.SHARDS.equals(level)) {
+                aggregatedNodeIndicesStats = new MockNodeIndicesStats(
+                    new CommonStats(commonStatsFlags),
+                    null,
+                    new SearchRequestStats(clusterSettings),
+                    level,
+                    statsByIndex
+                );
+            } else {
+                aggregatedNodeIndicesStats = new MockNodeIndicesStats(
+                    new CommonStats(commonStatsFlags),
+                    statsByShards,
+                    new SearchRequestStats(clusterSettings),
+                    level,
+                    null
+                );
+            }
+
+            XContentBuilder nonAggregatedBuilder = null;
+            XContentBuilder aggregatedBuilder = null;
+            try {
+                nonAggregatedBuilder = XContentFactory.jsonBuilder();
+                nonAggregatedBuilder.startObject();
+                nonAggregatedBuilder = nonAggregatedNodeIndicesStats.toXContent(
+                    nonAggregatedBuilder,
+                    new ToXContent.MapParams(Collections.singletonMap("level", level.getRestName()))
+                );
+                nonAggregatedBuilder.endObject();
+                Map<String, Object> nonAggregatedContentMap = xContentBuilderToMap(nonAggregatedBuilder);
+
+                aggregatedBuilder = XContentFactory.jsonBuilder();
+                aggregatedBuilder.startObject();
+                aggregatedBuilder = aggregatedNodeIndicesStats.toXContent(
+                    aggregatedBuilder,
+                    new ToXContent.MapParams(Collections.singletonMap("level", level.getRestName()))
+                );
+                aggregatedBuilder.endObject();
+                Map<String, Object> aggregatedContentMap = xContentBuilderToMap(aggregatedBuilder);
+
+                assertEquals(aggregatedContentMap, nonAggregatedContentMap);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
     private CommonStats createRandomCommonStats() {
         CommonStats commonStats = new CommonStats(CommonStatsFlags.NONE);
         commonStats.docs = new DocsStats(randomLongBetween(0, 10000), randomLongBetween(0, 100), randomLongBetween(0, 1000));