Skip to content

Commit

Permalink
Add resource usage trackers and resource usage collector service (ope…
Browse files Browse the repository at this point in the history
…nsearch-project#9890)

---------

Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie authored and deshsidd committed Oct 18, 2023
1 parent afe4c55 commit 27472b9
Show file tree
Hide file tree
Showing 23 changed files with 1,210 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.monitor.os.OsStats;
import org.opensearch.monitor.process.ProcessStats;
import org.opensearch.node.AdaptiveSelectionStats;
import org.opensearch.node.NodesResourceUsageStats;
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
Expand Down Expand Up @@ -142,6 +143,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private SearchPipelineStats searchPipelineStats;

@Nullable
private NodesResourceUsageStats resourceUsageStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -198,6 +202,11 @@ public NodeStats(StreamInput in) throws IOException {
} else {
searchPipelineStats = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport
resourceUsageStats = in.readOptionalWriteable(NodesResourceUsageStats::new);
} else {
resourceUsageStats = null;
}
}

public NodeStats(
Expand All @@ -216,6 +225,7 @@ public NodeStats(
@Nullable DiscoveryStats discoveryStats,
@Nullable IngestStats ingestStats,
@Nullable AdaptiveSelectionStats adaptiveSelectionStats,
@Nullable NodesResourceUsageStats resourceUsageStats,
@Nullable ScriptCacheStats scriptCacheStats,
@Nullable IndexingPressureStats indexingPressureStats,
@Nullable ShardIndexingPressureStats shardIndexingPressureStats,
Expand All @@ -241,6 +251,7 @@ public NodeStats(
this.discoveryStats = discoveryStats;
this.ingestStats = ingestStats;
this.adaptiveSelectionStats = adaptiveSelectionStats;
this.resourceUsageStats = resourceUsageStats;
this.scriptCacheStats = scriptCacheStats;
this.indexingPressureStats = indexingPressureStats;
this.shardIndexingPressureStats = shardIndexingPressureStats;
Expand Down Expand Up @@ -344,6 +355,11 @@ public AdaptiveSelectionStats getAdaptiveSelectionStats() {
return adaptiveSelectionStats;
}

@Nullable
public NodesResourceUsageStats getResourceUsageStats() {
return resourceUsageStats;
}

@Nullable
public ScriptCacheStats getScriptCacheStats() {
return scriptCacheStats;
Expand Down Expand Up @@ -430,6 +446,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
out.writeOptionalWriteable(searchPipelineStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport
out.writeOptionalWriteable(resourceUsageStats);
}
}

@Override
Expand Down Expand Up @@ -520,7 +539,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getSearchPipelineStats() != null) {
getSearchPipelineStats().toXContent(builder, params);
}

if (getResourceUsageStats() != null) {
getResourceUsageStats().toXContent(builder, params);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ public enum Metric {
WEIGHTED_ROUTING_STATS("weighted_routing"),
FILE_CACHE_STATS("file_cache"),
TASK_CANCELLATION("task_cancellation"),
SEARCH_PIPELINE("search_pipeline");
SEARCH_PIPELINE("search_pipeline"),
RESOURCE_USAGE_STATS("resource_usage_stats");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics),
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics)
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics),
NodesStatsRequest.Metric.RESOURCE_USAGE_STATS.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
import org.opensearch.node.Node.DiscoverySettings;
import org.opensearch.node.NodeRoleSettings;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
import org.opensearch.persistent.PersistentTasksClusterService;
import org.opensearch.persistent.decider.EnableAssignmentDecider;
import org.opensearch.plugins.PluginsService;
Expand Down Expand Up @@ -655,6 +656,10 @@ public void apply(Settings value, Settings current, Settings previous) {
SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING,
SegmentReplicationPressureService.MAX_ALLOWED_STALE_SHARDS,

// Settings related to resource trackers
ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING,
ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING,

// Settings related to Searchable Snapshots
Node.NODE_SEARCH_CACHE_SIZE_SETTING,
FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING,
Expand Down
25 changes: 23 additions & 2 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.resource.tracker.NodeResourceUsageTracker;
import org.opensearch.persistent.PersistentTasksClusterService;
import org.opensearch.persistent.PersistentTasksExecutor;
import org.opensearch.persistent.PersistentTasksExecutorRegistry;
Expand Down Expand Up @@ -805,7 +806,6 @@ protected Node(
remoteStoreStatsTrackerFactory,
recoverySettings
);

final AliasValidator aliasValidator = new AliasValidator();

final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);
Expand Down Expand Up @@ -1070,6 +1070,16 @@ protected Node(
transportService.getTaskManager(),
taskCancellationMonitoringSettings
);
final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker(
threadPool,
settings,
clusterService.getClusterSettings()
);
final ResourceUsageCollectorService resourceUsageCollectorService = new ResourceUsageCollectorService(
nodeResourceUsageTracker,
clusterService,
threadPool
);
this.nodeService = new NodeService(
settings,
threadPool,
Expand All @@ -1091,7 +1101,8 @@ protected Node(
searchBackpressureService,
searchPipelineService,
fileCache,
taskCancellationMonitoringService
taskCancellationMonitoringService,
resourceUsageCollectorService
);

final SearchService searchService = newSearchService(
Expand Down Expand Up @@ -1212,6 +1223,8 @@ protected Node(
b.bind(RerouteService.class).toInstance(rerouteService);
b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
b.bind(FsHealthService.class).toInstance(fsHealthService);
b.bind(NodeResourceUsageTracker.class).toInstance(nodeResourceUsageTracker);
b.bind(ResourceUsageCollectorService.class).toInstance(resourceUsageCollectorService);
b.bind(SystemIndices.class).toInstance(systemIndices);
b.bind(IdentityService.class).toInstance(identityService);
b.bind(Tracer.class).toInstance(tracer);
Expand Down Expand Up @@ -1328,6 +1341,8 @@ public Node start() throws NodeValidationException {
injector.getInstance(RepositoriesService.class).start();
injector.getInstance(SearchService.class).start();
injector.getInstance(FsHealthService.class).start();
injector.getInstance(NodeResourceUsageTracker.class).start();
injector.getInstance(ResourceUsageCollectorService.class).start();
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();
nodeService.getTaskCancellationMonitoringService().start();
Expand Down Expand Up @@ -1490,6 +1505,8 @@ private Node stop() {
injector.getInstance(ClusterService.class).stop();
injector.getInstance(NodeConnectionsService.class).stop();
injector.getInstance(FsHealthService.class).stop();
injector.getInstance(NodeResourceUsageTracker.class).stop();
injector.getInstance(ResourceUsageCollectorService.class).stop();
nodeService.getMonitorService().stop();
nodeService.getSearchBackpressureService().stop();
injector.getInstance(GatewayService.class).stop();
Expand Down Expand Up @@ -1553,6 +1570,10 @@ public synchronized void close() throws IOException {
toClose.add(nodeService.getSearchBackpressureService());
toClose.add(() -> stopWatch.stop().start("fsHealth"));
toClose.add(injector.getInstance(FsHealthService.class));
toClose.add(() -> stopWatch.stop().start("resource_usage_tracker"));
toClose.add(injector.getInstance(NodeResourceUsageTracker.class));
toClose.add(() -> stopWatch.stop().start("resource_usage_collector"));
toClose.add(injector.getInstance(ResourceUsageCollectorService.class));
toClose.add(() -> stopWatch.stop().start("gateway"));
toClose.add(injector.getInstance(GatewayService.class));
toClose.add(() -> stopWatch.stop().start("search"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.node;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;
import java.util.Locale;

/**
* This represents the resource usage stats of a node along with the timestamp at which the stats object was created
* in the respective node
*/
public class NodeResourceUsageStats implements Writeable {
final String nodeId;
long timestamp;
double cpuUtilizationPercent;
double memoryUtilizationPercent;

public NodeResourceUsageStats(String nodeId, long timestamp, double memoryUtilizationPercent, double cpuUtilizationPercent) {
this.nodeId = nodeId;
this.timestamp = timestamp;
this.cpuUtilizationPercent = cpuUtilizationPercent;
this.memoryUtilizationPercent = memoryUtilizationPercent;
}

public NodeResourceUsageStats(StreamInput in) throws IOException {
this.nodeId = in.readString();
this.timestamp = in.readLong();
this.cpuUtilizationPercent = in.readDouble();
this.memoryUtilizationPercent = in.readDouble();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(this.nodeId);
out.writeLong(this.timestamp);
out.writeDouble(this.cpuUtilizationPercent);
out.writeDouble(this.memoryUtilizationPercent);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder("NodeResourceUsageStats[");
sb.append(nodeId).append("](");
sb.append("Timestamp: ").append(timestamp);
sb.append(", CPU utilization percent: ").append(String.format(Locale.ROOT, "%.1f", cpuUtilizationPercent));
sb.append(", Memory utilization percent: ").append(String.format(Locale.ROOT, "%.1f", memoryUtilizationPercent));
sb.append(")");
return sb.toString();
}

NodeResourceUsageStats(NodeResourceUsageStats nodeResourceUsageStats) {
this(
nodeResourceUsageStats.nodeId,
nodeResourceUsageStats.timestamp,
nodeResourceUsageStats.memoryUtilizationPercent,
nodeResourceUsageStats.cpuUtilizationPercent
);
}

public double getMemoryUtilizationPercent() {
return memoryUtilizationPercent;
}

public double getCpuUtilizationPercent() {
return cpuUtilizationPercent;
}

public long getTimestamp() {
return timestamp;
}
}
9 changes: 7 additions & 2 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class NodeService implements Closeable {
private final ScriptService scriptService;
private final HttpServerTransport httpServerTransport;
private final ResponseCollectorService responseCollectorService;
private final ResourceUsageCollectorService resourceUsageCollectorService;
private final SearchTransportService searchTransportService;
private final IndexingPressureService indexingPressureService;
private final AggregationUsageService aggregationUsageService;
Expand Down Expand Up @@ -114,7 +115,8 @@ public class NodeService implements Closeable {
SearchBackpressureService searchBackpressureService,
SearchPipelineService searchPipelineService,
FileCache fileCache,
TaskCancellationMonitoringService taskCancellationMonitoringService
TaskCancellationMonitoringService taskCancellationMonitoringService,
ResourceUsageCollectorService resourceUsageCollectorService
) {
this.settings = settings;
this.threadPool = threadPool;
Expand All @@ -137,6 +139,7 @@ public class NodeService implements Closeable {
this.clusterService = clusterService;
this.fileCache = fileCache;
this.taskCancellationMonitoringService = taskCancellationMonitoringService;
this.resourceUsageCollectorService = resourceUsageCollectorService;
clusterService.addStateApplier(ingestService);
clusterService.addStateApplier(searchPipelineService);
}
Expand Down Expand Up @@ -217,7 +220,8 @@ public NodeStats stats(
boolean weightedRoutingStats,
boolean fileCacheStats,
boolean taskCancellation,
boolean searchPipelineStats
boolean searchPipelineStats,
boolean resourceUsageStats
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand All @@ -237,6 +241,7 @@ public NodeStats stats(
discoveryStats ? discovery.stats() : null,
ingest ? ingestService.stats() : null,
adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null,
resourceUsageStats ? resourceUsageCollectorService.stats() : null,
scriptCache ? scriptService.cacheStats() : null,
indexingPressure ? this.indexingPressureService.nodeStats() : null,
shardIndexingPressure ? this.indexingPressureService.shardStats(indices) : null,
Expand Down
Loading

0 comments on commit 27472b9

Please sign in to comment.