Skip to content

Commit

Permalink
Add support for multi-es version scenario and memory optimisation
Browse files Browse the repository at this point in the history
  • Loading branch information
Pranshu-S committed Jun 12, 2024
1 parent fbc7d99 commit e8deaf8
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, Mapping

for (ClusterStatsNodeResponse r : nodeResponses) {

r.getNodeIndexShardStats().indexCountMap.forEach(
r.getNodeIndexShardStats().indexStatsMap.forEach(
(index, indexCountStats) -> countsPerIndex.merge(index, indexCountStats, (v1, v2) -> {
v1.addStatsFrom(v2);
return v1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.admin.cluster.stats;

import org.opensearch.Version;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.indices.stats.ShardStats;
Expand Down Expand Up @@ -65,8 +66,13 @@ public ClusterStatsNodeResponse(StreamInput in) throws IOException {
}
this.nodeInfo = new NodeInfo(in);
this.nodeStats = new NodeStats(in);
shardsStats = in.readArray(ShardStats::new, ShardStats[]::new);
this.nodeIndexShardStats = new NodeIndexShardStats(in);
if (in.getVersion().onOrAfter(Version.V_2_13_0)) {
this.nodeIndexShardStats = new NodeIndexShardStats(in);
shardsStats = null;
} else {
shardsStats = in.readArray(ShardStats::new, ShardStats[]::new);
nodeIndexShardStats = null;
}
}

public ClusterStatsNodeResponse(
Expand Down Expand Up @@ -123,7 +129,10 @@ public void writeTo(StreamOutput out) throws IOException {
}
nodeInfo.writeTo(out);
nodeStats.writeTo(out);
out.writeArray(shardsStats);
nodeIndexShardStats.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_2_13_0)) {
nodeIndexShardStats.writeTo(out);
} else {
out.writeArray(shardsStats);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Nullable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.index.cache.query.QueryCacheStats;
Expand All @@ -26,6 +25,9 @@
import java.util.HashMap;
import java.util.Map;

/**
* Node level statistics used for ClusterStatsIndices for _cluster/stats call.
*/
public class NodeIndexShardStats extends BaseNodeResponse {

DocsStats docs;
Expand All @@ -34,7 +36,7 @@ public class NodeIndexShardStats extends BaseNodeResponse {
QueryCacheStats queryCache;
CompletionStats completion;
SegmentsStats segments;
Map<String, ClusterStatsIndices.ShardStats> indexCountMap;
Map<String, ClusterStatsIndices.ShardStats> indexStatsMap;

protected NodeIndexShardStats(StreamInput in) throws IOException {
super(in);
Expand All @@ -44,7 +46,7 @@ protected NodeIndexShardStats(StreamInput in) throws IOException {
queryCache = in.readOptionalWriteable(QueryCacheStats::new);
completion = in.readOptionalWriteable(CompletionStats::new);
segments = in.readOptionalWriteable(SegmentsStats::new);
indexCountMap = in.readMap(StreamInput::readString, ClusterStatsIndices.ShardStats::new);
indexStatsMap = in.readMap(StreamInput::readString, ClusterStatsIndices.ShardStats::new);
}

protected NodeIndexShardStats(DiscoveryNode node, ShardStats[] indexShardsStats) {
Expand All @@ -56,14 +58,14 @@ protected NodeIndexShardStats(DiscoveryNode node, ShardStats[] indexShardsStats)
this.queryCache = new QueryCacheStats();
this.completion = new CompletionStats();
this.segments = new SegmentsStats();
this.indexCountMap = new HashMap<>();
this.indexStatsMap = new HashMap<>();

// Index Level Stats
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : indexShardsStats) {
ClusterStatsIndices.ShardStats indexShardStats = this.indexCountMap.get(shardStats.getShardRouting().getIndexName());
ClusterStatsIndices.ShardStats indexShardStats = this.indexStatsMap.get(shardStats.getShardRouting().getIndexName());
if (indexShardStats == null) {
indexShardStats = new ClusterStatsIndices.ShardStats();
this.indexCountMap.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
this.indexStatsMap.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
}

indexShardStats.total++;
Expand Down Expand Up @@ -92,6 +94,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(queryCache);
out.writeOptionalWriteable(completion);
out.writeOptionalWriteable(segments);
out.writeMap(indexCountMap, StreamOutput::writeString, (stream, stats) -> stats.writeTo(stream));
out.writeMap(indexStatsMap, StreamOutput::writeString, (stream, stats) -> stats.writeTo(stream));
}
}

0 comments on commit e8deaf8

Please sign in to comment.