Skip to content

Commit

Permalink
Optimize NodeIndicesStats output behind flag
Browse files Browse the repository at this point in the history
  • Loading branch information
Pranshu-S committed Jun 14, 2024
1 parent 93d507a commit 3557f10
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,9 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA,
SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING
SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,

IndicesService.OPTIMIZED_NODES_STATS_SETTING
)
)
);
Expand Down
21 changes: 20 additions & 1 deletion server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,17 @@ public class IndicesService extends AbstractLifecycleComponent
Setting.Property.NodeScope
);

public static final String OPTIMIZED_NODES_STATS = "opensearch.experimental.optimization.nodes_stats.enabled";

public static final Setting<Boolean> OPTIMIZED_NODES_STATS_SETTING = Setting.boolSetting(
OPTIMIZED_NODES_STATS,
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public volatile boolean optimizedNodesStatsEnabled;

/**
* Used to specify SEGMENT replication type as the default replication strategy for all indices in a cluster. By default, this is false.
*/
Expand Down Expand Up @@ -433,6 +444,8 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
circuitBreakerService.getBreaker(CircuitBreaker.FIELDDATA).addWithoutBreaking(-sizeInBytes);
}
});
this.optimizedNodesStatsEnabled = OPTIMIZED_NODES_STATS_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(OPTIMIZED_NODES_STATS_SETTING, this::setOptimizedNodesStats);
this.cleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings);
this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, logger, threadPool, this.cleanInterval);
this.metaStateService = metaStateService;
Expand Down Expand Up @@ -622,7 +635,9 @@ public NodeIndicesStats stats(CommonStatsFlags flags) {
break;
}
}

if (optimizedNodesStatsEnabled) {
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats, flags.getLevels());
}
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats);
}

Expand Down Expand Up @@ -1896,6 +1911,10 @@ private void setIdFieldDataEnabled(boolean value) {
this.idFieldDataEnabled = value;
}

private void setOptimizedNodesStats(boolean optimizedNodesStatsEnabled) {
this.optimizedNodesStatsEnabled = optimizedNodesStatsEnabled;
}

private void updateDanglingIndicesInfo(Index index) {
assert DiscoveryNode.isDataNode(settings) : "dangling indices information should only be persisted on data nodes";
assert nodeWriteDanglingIndicesInfo : "writing dangling indices info is not enabled";
Expand Down
128 changes: 112 additions & 16 deletions server/src/main/java/org/opensearch/indices/NodeIndicesStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.indices;

import org.opensearch.Version;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.action.admin.indices.stats.IndexShardStats;
import org.opensearch.action.admin.indices.stats.ShardStats;
Expand Down Expand Up @@ -66,6 +67,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Arrays;

/**
* Global information on indices stats running on a specific node.
Expand All @@ -75,22 +77,24 @@
@PublicApi(since = "1.0.0")
public class NodeIndicesStats implements Writeable, ToXContentFragment {
private CommonStats stats;
private Map<Index, CommonStats> statsByIndex;
private Map<Index, List<IndexShardStats>> statsByShard;
private boolean levelMode;

public NodeIndicesStats(StreamInput in) throws IOException {
stats = new CommonStats(in);
// levelMode is enabled.
if (in.getVersion().onOrAfter(Version.V_2_13_0)) {
// contains statsByIndex
if (in.readBoolean()) {
statsByIndex = new HashMap<>();
readStatsByIndex(in);
}
}
// fall-back to normal mode.
if (in.readBoolean()) {
int entries = in.readVInt();
statsByShard = new HashMap<>();
for (int i = 0; i < entries; i++) {
Index index = new Index(in);
int indexShardListSize = in.readVInt();
List<IndexShardStats> indexShardStats = new ArrayList<>(indexShardListSize);
for (int j = 0; j < indexShardListSize; j++) {
indexShardStats.add(new IndexShardStats(in));
}
statsByShard.put(index, indexShardStats);
}
readStatsByShards(in);
}
}

Expand All @@ -110,6 +114,54 @@ public NodeIndicesStats(CommonStats oldStats, Map<Index, List<IndexShardStats>>
if (this.stats.search != null) {
this.stats.search.setSearchRequestStats(searchRequestStats);
}

this.levelMode = false;
}

public NodeIndicesStats(CommonStats oldStats, Map<Index, List<IndexShardStats>> statsByShard, SearchRequestStats searchRequestStats, String[] levels) {
// make a total common stats from old ones and current ones
this.stats = oldStats;
for (List<IndexShardStats> shardStatsList : statsByShard.values()) {
for (IndexShardStats indexShardStats : shardStatsList) {
for (ShardStats shardStats : indexShardStats.getShards()) {
stats.add(shardStats.getStats());
}
}
}

if (this.stats.search != null) {
this.stats.search.setSearchRequestStats(searchRequestStats);
}

if (Arrays.stream(levels).anyMatch(NodeIndicesStats.levels.indices.name()::equals)) {
this.statsByIndex = createStatsByIndex(statsByShard);
} else if (Arrays.stream(levels).anyMatch(NodeIndicesStats.levels.shards.name()::equals)) {
this.statsByShard = statsByShard;
}

this.levelMode = true;
}

private void readStatsByIndex(StreamInput in) throws IOException {
int indexEntries = in.readVInt();
for (int i = 0; i<indexEntries; i++) {
Index index = new Index(in);
CommonStats commonStats = new CommonStats(in);
statsByIndex.put(index, commonStats);
}
}

private void readStatsByShards(StreamInput in) throws IOException {
int entries = in.readVInt();
for (int i = 0; i < entries; i++) {
Index index = new Index(in);
int indexShardListSize = in.readVInt();
List<IndexShardStats> indexShardStats = new ArrayList<>(indexShardListSize);
for (int j = 0; j < indexShardListSize; j++) {
indexShardStats.add(new IndexShardStats(in));
}
statsByShard.put(index, indexShardStats);
}
}

@Nullable
Expand Down Expand Up @@ -195,7 +247,32 @@ public RecoveryStats getRecoveryStats() {
@Override
public void writeTo(StreamOutput out) throws IOException {
stats.writeTo(out);
out.writeBoolean(statsByShard != null);

if (out.getVersion().onOrAfter(Version.V_2_13_0)) {
out.writeBoolean(statsByIndex == null);
if (statsByIndex!=null) {
writeStatsByIndex(out);
}
out.writeBoolean(statsByShard == null);
if (statsByShard!=null) {
writeStatsByShards(out);
}
} else {
writeStatsByShards(out);
}
}

private void writeStatsByIndex(StreamOutput out) throws IOException {
if (statsByIndex != null) {
out.writeVInt(statsByIndex.size());
for (Map.Entry<Index, CommonStats> entry : statsByIndex.entrySet()) {
entry.getKey().writeTo(out);
entry.getValue().writeTo(out);
}
}
}

private void writeStatsByShards(StreamOutput out) throws IOException {
if (statsByShard != null) {
out.writeVInt(statsByShard.size());
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
Expand All @@ -218,20 +295,20 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
throw new IllegalArgumentException("level parameter must be one of [indices] or [node] or [shards] but was [" + level + "]");
}


// "node" level
builder.startObject(Fields.INDICES);
stats.toXContent(builder, params);

if ("indices".equals(level)) {
Map<Index, CommonStats> indexStats = createStatsByIndex();
if (levels.indices.equals(level)) {
builder.startObject(Fields.INDICES);
for (Map.Entry<Index, CommonStats> entry : indexStats.entrySet()) {
for (Map.Entry<Index, CommonStats> entry : statsByIndex.entrySet()) {
builder.startObject(entry.getKey().getName());
entry.getValue().toXContent(builder, params);
builder.endObject();
}
builder.endObject();
} else if ("shards".equals(level)) {
} else if (levels.shards.equals(level)) {
builder.startObject("shards");
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
builder.startArray(entry.getKey().getName());
Expand All @@ -251,7 +328,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

private Map<Index, CommonStats> createStatsByIndex() {
private Map<Index, CommonStats> createStatsByIndex(Map<Index, List<IndexShardStats>> statsByShard) {
Map<Index, CommonStats> statsMap = new HashMap<>();
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
if (!statsMap.containsKey(entry.getKey())) {
Expand Down Expand Up @@ -284,4 +361,23 @@ public List<IndexShardStats> getShardStats(Index index) {
static final class Fields {
static final String INDICES = "indices";
}

public enum levels {
nodes("nodes"),
indices("indices"),
shards("shards");
private final String name;

levels(String name) {
this.name = name;
}
@Override
public String toString() {
return name;
}

public boolean equals(String value) {
return this.name.equals(value);
}
}
}

0 comments on commit 3557f10

Please sign in to comment.