From 59a0f5d6d5596ae8e887ab2b71586df496692529 Mon Sep 17 00:00:00 2001 From: Pranshu Shukla Date: Fri, 14 Jun 2024 08:50:27 +0530 Subject: [PATCH] Optimize NodeIndicesStats output behind flag --- .../org/opensearch/nodestats/NodeStatsIT.java | 54 +++++++ .../common/settings/ClusterSettings.java | 4 +- .../opensearch/indices/IndicesService.java | 21 ++- .../opensearch/indices/NodeIndicesStats.java | 137 ++++++++++++++++-- 4 files changed, 199 insertions(+), 17 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java index f270cb1399072..63dc9523f7fb0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java @@ -10,6 +10,8 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.action.bulk.BulkItemResponse; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; @@ -19,21 +21,26 @@ import org.opensearch.action.index.IndexResponse; import org.opensearch.action.update.UpdateRequest; import org.opensearch.action.update.UpdateResponse; +import org.opensearch.cluster.ClusterState; +import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.engine.DocumentMissingException; import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.index.shard.IndexingStats.Stats.DocStatusStats; +import org.opensearch.indices.IndicesService; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; import org.opensearch.test.OpenSearchIntegTestCase.Scope; import org.hamcrest.MatcherAssert; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import static java.util.Collections.singletonMap; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -243,6 +250,53 @@ public void testNodeIndicesStatsDocStatusStatsCreateDeleteUpdate() { } } + public void testNodeIndicesStatsResponseOptimised() { + internalCluster().startNode(); + ensureGreen(); + String indexName = "test1"; + index(indexName, "type", "1", "f", "f"); + refresh(); + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + + NodesStatsResponse response = client().admin().cluster().prepareNodesStats().get(); + response.getNodes().forEach(nodeStats -> { + assertNotNull(nodeStats.getIndices().getShardStats(clusterState.metadata().index(indexName).getIndex())); + assertNull(nodeStats.getIndices().getIndexStats(clusterState.metadata().index(indexName).getIndex())); + }); + + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(IndicesService.OPTIMIZED_NODES_STATS, true)) + .get() + ); + + response = client().admin().cluster().prepareNodesStats().get(); + response.getNodes().forEach(nodeStats -> { + assertNull(nodeStats.getIndices().getShardStats(clusterState.metadata().index(indexName).getIndex())); + assertNull(nodeStats.getIndices().getIndexStats(clusterState.metadata().index(indexName).getIndex())); + + }); + ArrayList level_indices = new ArrayList<>(); + level_indices.add("indices"); + CommonStatsFlags commonStatsFlags = new CommonStatsFlags().setLevels(level_indices.toArray(new String[0])); + response = client().admin().cluster().prepareNodesStats().setIndices(commonStatsFlags).get(); + response.getNodes().forEach(nodeStats -> { + assertNotNull(nodeStats.getIndices().getIndexStats(clusterState.metadata().index(indexName).getIndex())); + assertNull(nodeStats.getIndices().getShardStats(clusterState.metadata().index(indexName).getIndex())); + }); + + ArrayList level_shards = new ArrayList<>(); + level_shards.add("shards"); + commonStatsFlags = new CommonStatsFlags().setLevels(level_shards.toArray(new String[0])); + response = client().admin().cluster().prepareNodesStats().setIndices(commonStatsFlags).get(); + response.getNodes().forEach(nodeStats -> { + assertNotNull(nodeStats.getIndices().getShardStats(clusterState.metadata().index(indexName).getIndex())); + assertNull(nodeStats.getIndices().getIndexStats(clusterState.metadata().index(indexName).getIndex())); + }); + } + private void assertDocStatusStats() { DocStatusStats docStatusStats = client().admin() .cluster() diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 335615a6affb7..22a6e9e72102f 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -749,7 +749,9 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS, RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA, - SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING + SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING, + + IndicesService.OPTIMIZED_NODES_STATS_SETTING ) ) ); diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 251be8a990055..d928402fcc78c 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -237,6 +237,17 @@ public class IndicesService extends AbstractLifecycleComponent Setting.Property.NodeScope ); + public static final String OPTIMIZED_NODES_STATS = "opensearch.experimental.optimization.nodes_stats.enabled"; + + public static final Setting OPTIMIZED_NODES_STATS_SETTING = Setting.boolSetting( + OPTIMIZED_NODES_STATS, + false, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + public volatile boolean optimizedNodesStatsEnabled; + /** * Used to specify SEGMENT replication type as the default replication strategy for all indices in a cluster. By default, this is false. */ @@ -433,6 +444,8 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon circuitBreakerService.getBreaker(CircuitBreaker.FIELDDATA).addWithoutBreaking(-sizeInBytes); } }); + this.optimizedNodesStatsEnabled = OPTIMIZED_NODES_STATS_SETTING.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(OPTIMIZED_NODES_STATS_SETTING, this::setOptimizedNodesStats); this.cleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings); this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, logger, threadPool, this.cleanInterval); this.metaStateService = metaStateService; @@ -622,7 +635,9 @@ public NodeIndicesStats stats(CommonStatsFlags flags) { break; } } - + if (optimizedNodesStatsEnabled) { + return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats, flags.getLevels()); + } return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats); } @@ -1896,6 +1911,10 @@ private void setIdFieldDataEnabled(boolean value) { this.idFieldDataEnabled = value; } + private void setOptimizedNodesStats(boolean optimizedNodesStatsEnabled) { + this.optimizedNodesStatsEnabled = optimizedNodesStatsEnabled; + } + private void updateDanglingIndicesInfo(Index index) { assert DiscoveryNode.isDataNode(settings) : "dangling indices information should only be persisted on data nodes"; assert nodeWriteDanglingIndicesInfo : "writing dangling indices info is not enabled"; diff --git a/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java b/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java index 35b6fd395ee12..84d297b6e6456 100644 --- a/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java +++ b/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java @@ -32,6 +32,7 @@ package org.opensearch.indices; +import org.opensearch.Version; import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.IndexShardStats; import org.opensearch.action.admin.indices.stats.ShardStats; @@ -63,6 +64,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -75,22 +77,21 @@ @PublicApi(since = "1.0.0") public class NodeIndicesStats implements Writeable, ToXContentFragment { private CommonStats stats; + private Map statsByIndex; private Map> statsByShard; public NodeIndicesStats(StreamInput in) throws IOException { stats = new CommonStats(in); + if (in.getVersion().onOrAfter(Version.V_2_13_0)) { + // contains statsByIndex + if (in.readBoolean()) { + statsByIndex = new HashMap<>(); + readStatsByIndex(in); + } + } if (in.readBoolean()) { - int entries = in.readVInt(); statsByShard = new HashMap<>(); - for (int i = 0; i < entries; i++) { - Index index = new Index(in); - int indexShardListSize = in.readVInt(); - List indexShardStats = new ArrayList<>(indexShardListSize); - for (int j = 0; j < indexShardListSize; j++) { - indexShardStats.add(new IndexShardStats(in)); - } - statsByShard.put(index, indexShardStats); - } + readStatsByShards(in); } } @@ -112,6 +113,57 @@ public NodeIndicesStats(CommonStats oldStats, Map> } } + public NodeIndicesStats( + CommonStats oldStats, + Map> statsByShard, + SearchRequestStats searchRequestStats, + String[] levels + ) { + // make a total common stats from old ones and current ones + this.stats = oldStats; + for (List shardStatsList : statsByShard.values()) { + for (IndexShardStats indexShardStats : shardStatsList) { + for (ShardStats shardStats : indexShardStats.getShards()) { + stats.add(shardStats.getStats()); + } + } + } + + if (this.stats.search != null) { + this.stats.search.setSearchRequestStats(searchRequestStats); + } + + if (levels != null) { + if (Arrays.stream(levels).anyMatch(NodeIndicesStats.levels.indices::equals)) { + this.statsByIndex = createStatsByIndex(statsByShard); + } else if (Arrays.stream(levels).anyMatch(NodeIndicesStats.levels.shards::equals)) { + this.statsByShard = statsByShard; + } + } + } + + private void readStatsByIndex(StreamInput in) throws IOException { + int indexEntries = in.readVInt(); + for (int i = 0; i < indexEntries; i++) { + Index index = new Index(in); + CommonStats commonStats = new CommonStats(in); + statsByIndex.put(index, commonStats); + } + } + + private void readStatsByShards(StreamInput in) throws IOException { + int entries = in.readVInt(); + for (int i = 0; i < entries; i++) { + Index index = new Index(in); + int indexShardListSize = in.readVInt(); + List indexShardStats = new ArrayList<>(indexShardListSize); + for (int j = 0; j < indexShardListSize; j++) { + indexShardStats.add(new IndexShardStats(in)); + } + statsByShard.put(index, indexShardStats); + } + } + @Nullable public StoreStats getStore() { return stats.getStore(); @@ -195,7 +247,31 @@ public RecoveryStats getRecoveryStats() { @Override public void writeTo(StreamOutput out) throws IOException { stats.writeTo(out); + + if (out.getVersion().onOrAfter(Version.V_2_13_0)) { + out.writeBoolean(statsByIndex != null); + if (statsByIndex != null) { + writeStatsByIndex(out); + } + } + out.writeBoolean(statsByShard != null); + if (statsByShard != null) { + writeStatsByShards(out); + } + } + + private void writeStatsByIndex(StreamOutput out) throws IOException { + if (statsByIndex != null) { + out.writeVInt(statsByIndex.size()); + for (Map.Entry entry : statsByIndex.entrySet()) { + entry.getKey().writeTo(out); + entry.getValue().writeTo(out); + } + } + } + + private void writeStatsByShards(StreamOutput out) throws IOException { if (statsByShard != null) { out.writeVInt(statsByShard.size()); for (Map.Entry> entry : statsByShard.entrySet()) { @@ -222,16 +298,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(Fields.INDICES); stats.toXContent(builder, params); - if ("indices".equals(level)) { - Map indexStats = createStatsByIndex(); + if (levels.indices.equals(level)) { builder.startObject(Fields.INDICES); - for (Map.Entry entry : indexStats.entrySet()) { + for (Map.Entry entry : statsByIndex.entrySet()) { builder.startObject(entry.getKey().getName()); entry.getValue().toXContent(builder, params); builder.endObject(); } builder.endObject(); - } else if ("shards".equals(level)) { + } else if (levels.shards.equals(level)) { builder.startObject("shards"); for (Map.Entry> entry : statsByShard.entrySet()) { builder.startArray(entry.getKey().getName()); @@ -251,7 +326,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } - private Map createStatsByIndex() { + private Map createStatsByIndex(Map> statsByShard) { Map statsMap = new HashMap<>(); for (Map.Entry> entry : statsByShard.entrySet()) { if (!statsMap.containsKey(entry.getKey())) { @@ -276,6 +351,14 @@ public List getShardStats(Index index) { } } + public CommonStats getIndexStats(Index index) { + if (statsByIndex == null) { + return null; + } else { + return statsByIndex.get(index); + } + } + /** * Fields used for parsing and toXContent * @@ -284,4 +367,28 @@ public List getShardStats(Index index) { static final class Fields { static final String INDICES = "indices"; } + + /** + * Levels for the NodeIndicesStats + */ + public enum levels { + nodes("nodes"), + indices("indices"), + shards("shards"); + + private final String name; + + levels(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + + public boolean equals(String value) { + return this.name.equals(value); + } + } }