Skip to content

Commit

Permalink
Making _cat/allocation API use indexLevelStats
Browse files Browse the repository at this point in the history
Signed-off-by: Harsh Garg <[email protected]>
  • Loading branch information
Harsh Garg committed Sep 4, 2024
1 parent a60b668 commit e091736
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 1 deletion.
46 changes: 45 additions & 1 deletion server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -695,7 +696,10 @@ public NodeIndicesStats stats(CommonStatsFlags flags) {
}
if (flags.getIncludeIndicesStatsByLevel()) {
NodeIndicesStats.StatsLevel statsLevel = NodeIndicesStats.getAcceptedLevel(flags.getLevels());
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats, statsLevel);
if (!NodeIndicesStats.StatsLevel.SHARDS.equals(statsLevel)) {
return new NodeIndicesStats(commonStats, null, searchRequestStats, statsLevel, statsByIndex(this, flags));
}
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats, statsLevel, null);
} else {
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats);
}
Expand Down Expand Up @@ -728,6 +732,46 @@ Map<Index, List<IndexShardStats>> statsByShard(final IndicesService indicesServi
return statsByShard;
}

private Map<Index, CommonStats> statsByIndex(final IndicesService indicesService, final CommonStatsFlags flags) {
final Map<Index, CommonStats> statsByIndex = new HashMap<>();
for (final IndexService indexService : indicesService) {
for (final IndexShard indexShard : indexService) {
IndexShardStats indexShardStats = getIndexShardStats(indicesService, indexShard, flags);
updateStatsForIndex(statsByIndex, indexShard, indexShardStats);
}
}
return statsByIndex;
}

private IndexShardStats getIndexShardStats(
final IndicesService indicesService,
final IndexShard indexShard,
final CommonStatsFlags flags
) {
try {
return indicesService.indexShardStats(indicesService, indexShard, flags);
} catch (IllegalIndexShardStateException | AlreadyClosedException e) {
// we can safely ignore illegal state on ones that are closing for example
logger.trace(() -> new ParameterizedMessage("{} ignoring shard stats", indexShard.shardId()), e);
}
return null;
}

private void updateStatsForIndex(
final Map<Index, CommonStats> statsByIndex,
final IndexShard indexShard,
final IndexShardStats indexShardStats
) {
if (indexShardStats == null) {
return;
}
if (!statsByIndex.containsKey(indexShard.shardId().getIndex())) {
statsByIndex.put(indexShard.shardId().getIndex(), new CommonStats());
}
Arrays.stream(indexShardStats.getShards())
.forEach(shardStats -> statsByIndex.get(indexShard.shardId().getIndex()).add(shardStats.getStats()));
}

IndexShardStats indexShardStats(final IndicesService indicesService, final IndexShard indexShard, final CommonStatsFlags flags) {
if (indexShard.routingEntry() == null) {
return null;
Expand Down
45 changes: 45 additions & 0 deletions server/src/main/java/org/opensearch/indices/NodeIndicesStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
Expand Down Expand Up @@ -154,6 +155,50 @@ public NodeIndicesStats(
}
}

/**
* This constructor makes use of the fact that, if only statsByIndex are required, instead of
* computing them using statsByShard and maintaining both the objects, just pass on the one
* directly which is required and avoid maintaining both.
*/
public NodeIndicesStats(
CommonStats oldStats,
Map<Index, List<IndexShardStats>> statsByShard,
SearchRequestStats searchRequestStats,
StatsLevel level,
Map<Index, CommonStats> statsByIndex
) {
// make a total common stats from old ones and current ones
this.stats = oldStats;
assert Objects.nonNull(statsByIndex) || Objects.nonNull(statsByShard)
: "both statsByIndex and statsByShard should not be null at the same point";
if (statsByIndex != null) {
statsByIndex.values().forEach(commonStats -> stats.add(commonStats));
} else {
for (List<IndexShardStats> shardStatsList : statsByShard.values()) {
for (IndexShardStats indexShardStats : shardStatsList) {
for (ShardStats shardStats : indexShardStats.getShards()) {
stats.add(shardStats.getStats());
}
}
}
}

if (this.stats.search != null) {
this.stats.search.setSearchRequestStats(searchRequestStats);
}

if (level != null) {
switch (level) {
case INDICES:
this.statsByIndex = statsByIndex;
break;
case SHARDS:
this.statsByShard = statsByShard;
break;
}
}
}

/**
* By default, the levels passed from the transport action will be a list of strings, since NodeIndicesStats can
* only aggregate on one level, we pick the first accepted level else we ignore if no known level is passed. Level is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public void processResponse(final ClusterStateResponse state) {
statsRequest.clear()
.addMetric(NodesStatsRequest.Metric.FS.metricName())
.indices(new CommonStatsFlags(CommonStatsFlags.Flag.Store));
statsRequest.indices().setIncludeIndicesStatsByLevel(true);

client.admin().cluster().nodesStats(statsRequest, new RestResponseListener<NodesStatsResponse>(channel) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,16 @@ public MockNodeIndicesStats(
super(oldStats, statsByShard, searchRequestStats, level);
}

public MockNodeIndicesStats(
CommonStats oldStats,
Map<Index, List<IndexShardStats>> statsByShard,
SearchRequestStats searchRequestStats,
StatsLevel level,
Map<Index, CommonStats> statsByIndex
) {
super(oldStats, statsByShard, searchRequestStats, level, statsByIndex);
}

public CommonStats getStats() {
return this.stats;
}
Expand Down Expand Up @@ -1345,6 +1355,97 @@ public void testNodeIndicesStatsWithAndWithoutAggregations() throws IOException
});
}

public void testNodeIndicesStatsWithAndWithoutAggregationsUsingLevelsToDecideAggregation() throws IOException {

CommonStatsFlags commonStatsFlags = new CommonStatsFlags(
CommonStatsFlags.Flag.Docs,
CommonStatsFlags.Flag.Store,
CommonStatsFlags.Flag.Indexing,
CommonStatsFlags.Flag.Completion,
CommonStatsFlags.Flag.Flush,
CommonStatsFlags.Flag.FieldData,
CommonStatsFlags.Flag.QueryCache,
CommonStatsFlags.Flag.Segments
);

int numberOfIndexes = randomIntBetween(1, 3);
List<Index> indexList = new ArrayList<>();
for (int i = 0; i < numberOfIndexes; i++) {
Index index = new Index("test-index-" + i, "_na_");
indexList.add(index);
}

ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
HashMap<Index, List<IndexShardStats>> statsByShards = createRandomShardByStats(indexList);
Map<Index, CommonStats> statsByIndex = new HashMap<>();
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShards.entrySet()) {
if (!statsByIndex.containsKey(entry.getKey())) {
statsByIndex.put(entry.getKey(), new CommonStats());
}

for (IndexShardStats indexShardStats : entry.getValue()) {
for (ShardStats shardStats : indexShardStats.getShards()) {
statsByIndex.get(entry.getKey()).add(shardStats.getStats());
}
}
}

final MockNodeIndicesStats nonAggregatedNodeIndicesStats = new MockNodeIndicesStats(
new CommonStats(commonStatsFlags),
statsByShards,
new SearchRequestStats(clusterSettings)
);

commonStatsFlags.setIncludeIndicesStatsByLevel(true);

Arrays.stream(NodeIndicesStats.StatsLevel.values()).forEach(level -> {
MockNodeIndicesStats aggregatedNodeIndicesStats;
if (!NodeIndicesStats.StatsLevel.SHARDS.equals(level)) {
aggregatedNodeIndicesStats = new MockNodeIndicesStats(
new CommonStats(commonStatsFlags),
null,
new SearchRequestStats(clusterSettings),
level,
statsByIndex
);
} else {
aggregatedNodeIndicesStats = new MockNodeIndicesStats(
new CommonStats(commonStatsFlags),
statsByShards,
new SearchRequestStats(clusterSettings),
level,
null
);
}

XContentBuilder nonAggregatedBuilder = null;
XContentBuilder aggregatedBuilder = null;
try {
nonAggregatedBuilder = XContentFactory.jsonBuilder();
nonAggregatedBuilder.startObject();
nonAggregatedBuilder = nonAggregatedNodeIndicesStats.toXContent(
nonAggregatedBuilder,
new ToXContent.MapParams(Collections.singletonMap("level", level.getRestName()))
);
nonAggregatedBuilder.endObject();
Map<String, Object> nonAggregatedContentMap = xContentBuilderToMap(nonAggregatedBuilder);

aggregatedBuilder = XContentFactory.jsonBuilder();
aggregatedBuilder.startObject();
aggregatedBuilder = aggregatedNodeIndicesStats.toXContent(
aggregatedBuilder,
new ToXContent.MapParams(Collections.singletonMap("level", level.getRestName()))
);
aggregatedBuilder.endObject();
Map<String, Object> aggregatedContentMap = xContentBuilderToMap(aggregatedBuilder);

assertEquals(aggregatedContentMap, nonAggregatedContentMap);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

private CommonStats createRandomCommonStats() {
CommonStats commonStats = new CommonStats(CommonStatsFlags.NONE);
commonStats.docs = new DocsStats(randomLongBetween(0, 10000), randomLongBetween(0, 100), randomLongBetween(0, 1000));
Expand Down

0 comments on commit e091736

Please sign in to comment.