Skip to content

Commit

Permalink
Optimized ClusterStatsIndices to precomute shard stats
Browse files Browse the repository at this point in the history
  • Loading branch information
Pranshu-S committed Jun 13, 2024
1 parent b4692c8 commit 62035d1
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@

package org.opensearch.action.admin.cluster.stats;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.cache.query.QueryCacheStats;
Expand All @@ -56,6 +61,8 @@
@PublicApi(since = "1.0.0")
public class ClusterStatsIndices implements ToXContentFragment {

private static final Logger log = LogManager.getLogger(ClusterStatsIndices.class);

private int indexCount;
private ShardStats shards;
private DocsStats docs;
Expand All @@ -78,26 +85,42 @@ public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, Mapping
this.segments = new SegmentsStats();

for (ClusterStatsNodeResponse r : nodeResponses) {
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName());
if (indexShardStats == null) {
indexShardStats = new ShardStats();
countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
}

indexShardStats.total++;

CommonStats shardCommonStats = shardStats.getStats();

if (shardStats.getShardRouting().primary()) {
indexShardStats.primaries++;
docs.add(shardCommonStats.docs);
if (r.getNodeIndexShardStats() != null) {
r.getNodeIndexShardStats().indexStatsMap.forEach(
(index, indexCountStats) -> countsPerIndex.merge(index, indexCountStats, (v1, v2) -> {
v1.addStatsFrom(v2);
return v1;
})
);

docs.add(r.getNodeIndexShardStats().docs);
store.add(r.getNodeIndexShardStats().store);
fieldData.add(r.getNodeIndexShardStats().fieldData);
queryCache.add(r.getNodeIndexShardStats().queryCache);
completion.add(r.getNodeIndexShardStats().completion);
segments.add(r.getNodeIndexShardStats().segments);
} else {
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName());
if (indexShardStats == null) {
indexShardStats = new ShardStats();
countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
}

indexShardStats.total++;

CommonStats shardCommonStats = shardStats.getStats();

if (shardStats.getShardRouting().primary()) {
indexShardStats.primaries++;
docs.add(shardCommonStats.docs);
}
store.add(shardCommonStats.store);
fieldData.add(shardCommonStats.fieldData);
queryCache.add(shardCommonStats.queryCache);
completion.add(shardCommonStats.completion);
segments.add(shardCommonStats.segments);
}
store.add(shardCommonStats.store);
fieldData.add(shardCommonStats.fieldData);
queryCache.add(shardCommonStats.queryCache);
completion.add(shardCommonStats.completion);
segments.add(shardCommonStats.segments);
}
}

Expand Down Expand Up @@ -185,7 +208,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public static class ShardStats implements ToXContentFragment {
public static class ShardStats implements ToXContentFragment, Writeable {

int indices;
int total;
Expand All @@ -202,6 +225,12 @@ public static class ShardStats implements ToXContentFragment {

public ShardStats() {}

public ShardStats(StreamInput in) throws IOException {
indices = in.readVInt();
total = in.readVInt();
primaries = in.readVInt();
}

/**
* number of indices in the cluster
*/
Expand Down Expand Up @@ -329,6 +358,19 @@ public void addIndexShardCount(ShardStats indexShardCount) {
}
}

public void addStatsFrom(ShardStats incomingStats){
this.total += incomingStats.getTotal();
this.indices += incomingStats.getIndices();
this.primaries += incomingStats.getPrimaries();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(indices);
out.writeVInt(total);
out.writeVInt(primaries);
}

/**
* Inner Fields used for creating XContent and parsing
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,18 @@

package org.opensearch.action.admin.cluster.stats;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

Expand All @@ -50,11 +55,12 @@
* @opensearch.internal
*/
public class ClusterStatsNodeResponse extends BaseNodeResponse {

private static final Logger log = LogManager.getLogger(ClusterStatsNodeResponse.class);
private final NodeInfo nodeInfo;
private final NodeStats nodeStats;
private final ShardStats[] shardsStats;
private ShardStats[] shardsStats;
private ClusterHealthStatus clusterStatus;
private NodeIndexShardStats nodeIndexShardStats;

public ClusterStatsNodeResponse(StreamInput in) throws IOException {
super(in);
Expand All @@ -64,7 +70,12 @@ public ClusterStatsNodeResponse(StreamInput in) throws IOException {
}
this.nodeInfo = new NodeInfo(in);
this.nodeStats = new NodeStats(in);
shardsStats = in.readArray(ShardStats::new, ShardStats[]::new);
if (in.getVersion().onOrAfter(Version.V_2_13_0)) {
this.shardsStats = in.readOptionalArray(ShardStats::new, ShardStats[]::new);
this.nodeIndexShardStats = in.readOptionalWriteable(NodeIndexShardStats::new);
} else {
this.shardsStats = in.readArray(ShardStats::new, ShardStats[]::new);
}
}

public ClusterStatsNodeResponse(
Expand All @@ -77,8 +88,27 @@ public ClusterStatsNodeResponse(
super(node);
this.nodeInfo = nodeInfo;
this.nodeStats = nodeStats;
this.clusterStatus = clusterStatus;
this.shardsStats = shardsStats;
}

public ClusterStatsNodeResponse(
DiscoveryNode node,
@Nullable ClusterHealthStatus clusterStatus,
NodeInfo nodeInfo,
NodeStats nodeStats,
ShardStats[] shardsStats,
boolean optimized
) {
super(node);
this.nodeInfo = nodeInfo;
this.nodeStats = nodeStats;
this.clusterStatus = clusterStatus;
if (optimized) {
log.info(node.getVersion().toString());
this.nodeIndexShardStats = new NodeIndexShardStats(node, shardsStats);
}
this.shardsStats = shardsStats;
}

public NodeInfo nodeInfo() {
Expand All @@ -101,6 +131,10 @@ public ShardStats[] shardsStats() {
return this.shardsStats;
}

public NodeIndexShardStats getNodeIndexShardStats() {
return nodeIndexShardStats;
}

public static ClusterStatsNodeResponse readNodeResponse(StreamInput in) throws IOException {
return new ClusterStatsNodeResponse(in);
}
Expand All @@ -116,6 +150,16 @@ public void writeTo(StreamOutput out) throws IOException {
}
nodeInfo.writeTo(out);
nodeStats.writeTo(out);
out.writeArray(shardsStats);
if (out.getVersion().onOrAfter(Version.V_2_13_0)) {
if (nodeIndexShardStats!=null) {
out.writeOptionalArray(null);
out.writeOptionalWriteable(nodeIndexShardStats);
} else {
out.writeOptionalArray(shardsStats);
out.writeOptionalWriteable(null);
}
} else {
out.writeArray(shardsStats);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.stats;

import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.index.cache.query.QueryCacheStats;
import org.opensearch.index.engine.SegmentsStats;
import org.opensearch.index.fielddata.FieldDataStats;
import org.opensearch.index.shard.DocsStats;
import org.opensearch.index.store.StoreStats;
import org.opensearch.search.suggest.completion.CompletionStats;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
* Node level statistics used for ClusterStatsIndices for _cluster/stats call.
*/
public class NodeIndexShardStats extends BaseNodeResponse {

DocsStats docs;
StoreStats store;
FieldDataStats fieldData;
QueryCacheStats queryCache;
CompletionStats completion;
SegmentsStats segments;
Map<String, ClusterStatsIndices.ShardStats> indexStatsMap;

protected NodeIndexShardStats(StreamInput in) throws IOException {
super(in);
docs = in.readOptionalWriteable(DocsStats::new);
store = in.readOptionalWriteable(StoreStats::new);
fieldData = in.readOptionalWriteable(FieldDataStats::new);
queryCache = in.readOptionalWriteable(QueryCacheStats::new);
completion = in.readOptionalWriteable(CompletionStats::new);
segments = in.readOptionalWriteable(SegmentsStats::new);
indexStatsMap = in.readMap(StreamInput::readString, ClusterStatsIndices.ShardStats::new);
}

protected NodeIndexShardStats(DiscoveryNode node, ShardStats[] indexShardsStats) {
super(node);

this.docs = new DocsStats();
this.store = new StoreStats();
this.fieldData = new FieldDataStats();
this.queryCache = new QueryCacheStats();
this.completion = new CompletionStats();
this.segments = new SegmentsStats();
this.indexStatsMap = new HashMap<>();

// Index Level Stats
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : indexShardsStats) {
ClusterStatsIndices.ShardStats indexShardStats = this.indexStatsMap.get(shardStats.getShardRouting().getIndexName());
if (indexShardStats == null) {
indexShardStats = new ClusterStatsIndices.ShardStats();
this.indexStatsMap.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
}

indexShardStats.total++;

CommonStats shardCommonStats = shardStats.getStats();

if (shardStats.getShardRouting().primary()) {
indexShardStats.primaries++;
this.docs.add(shardCommonStats.docs);
}
this.store.add(shardCommonStats.store);
this.fieldData.add(shardCommonStats.fieldData);
this.queryCache.add(shardCommonStats.queryCache);
this.completion.add(shardCommonStats.completion);
this.segments.add(shardCommonStats.segments);
}

}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalWriteable(docs);
out.writeOptionalWriteable(store);
out.writeOptionalWriteable(fieldData);
out.writeOptionalWriteable(queryCache);
out.writeOptionalWriteable(completion);
out.writeOptionalWriteable(segments);
out.writeMap(indexStatsMap, StreamOutput::writeString, (stream, stats) -> stats.writeTo(stream));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.cluster.health.ClusterStateHealth;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Setting;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.index.IndexService;
Expand Down Expand Up @@ -86,6 +87,15 @@ public class TransportClusterStatsAction extends TransportNodesAction<

private final NodeService nodeService;
private final IndicesService indicesService;
public static final String OPTIMIZED_CLUSTER_STATS = "opensearch.experimental.optimization.cluster_stats.enabled";

public static final Setting<Boolean> OPTIMIZED_CLUSTER_STATS_SETTING = Setting.boolSetting(
OPTIMIZED_CLUSTER_STATS,
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
public volatile boolean optimizedClusterStatsEnabled;

@Inject
public TransportClusterStatsAction(
Expand All @@ -110,6 +120,8 @@ public TransportClusterStatsAction(
);
this.nodeService = nodeService;
this.indicesService = indicesService;
this.optimizedClusterStatsEnabled = OPTIMIZED_CLUSTER_STATS_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(OPTIMIZED_CLUSTER_STATS_SETTING, this::setOptimizedClusterStats);
}

@Override
Expand Down Expand Up @@ -211,8 +223,11 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
clusterStatus = new ClusterStateHealth(clusterService.state()).getStatus();
}

return new ClusterStatsNodeResponse(nodeInfo.getNode(), clusterStatus, nodeInfo, nodeStats, shardsStats.toArray(new ShardStats[0]));
return new ClusterStatsNodeResponse(nodeInfo.getNode(), clusterStatus, nodeInfo, nodeStats, shardsStats.toArray(new ShardStats[0]), optimizedClusterStatsEnabled);
}

private void setOptimizedClusterStats(boolean optimizedClusterStatsEnabled) {
this.optimizedClusterStatsEnabled = optimizedClusterStatsEnabled;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import org.apache.logging.log4j.LogManager;
import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.stats.TransportClusterStatsAction;
import org.opensearch.action.admin.indices.close.TransportCloseIndexAction;
import org.opensearch.action.search.CreatePitController;
import org.opensearch.action.search.SearchRequestSlowLog;
Expand Down Expand Up @@ -732,7 +733,10 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,

// Node Stats Optimisation Settings
TransportClusterStatsAction.OPTIMIZED_CLUSTER_STATS_SETTING
)
)
);
Expand Down

0 comments on commit 62035d1

Please sign in to comment.