Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add resource usage trackers and resource usage collector service #10695

Merged
merged 3 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- [Admission control] Add Resource usage collector service and resource usage tracker ([#10695](https://github.com/opensearch-project/OpenSearch/pull/10695))
- [Admission control] Add enhancements to FS stats to include read/write time, queue size and IO time ([#10696](https://github.com/opensearch-project/OpenSearch/pull/10696))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.monitor.os.OsStats;
import org.opensearch.monitor.process.ProcessStats;
import org.opensearch.node.AdaptiveSelectionStats;
import org.opensearch.node.NodesResourceUsageStats;
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
Expand Down Expand Up @@ -143,6 +144,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private SearchPipelineStats searchPipelineStats;

@Nullable
private NodesResourceUsageStats resourceUsageStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -211,6 +215,11 @@ public NodeStats(StreamInput in) throws IOException {
} else {
searchPipelineStats = null;
}
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
resourceUsageStats = in.readOptionalWriteable(NodesResourceUsageStats::new);
} else {
resourceUsageStats = null;
}
}

public NodeStats(
Expand All @@ -229,6 +238,7 @@ public NodeStats(
@Nullable DiscoveryStats discoveryStats,
@Nullable IngestStats ingestStats,
@Nullable AdaptiveSelectionStats adaptiveSelectionStats,
@Nullable NodesResourceUsageStats resourceUsageStats,
@Nullable ScriptCacheStats scriptCacheStats,
@Nullable IndexingPressureStats indexingPressureStats,
@Nullable ShardIndexingPressureStats shardIndexingPressureStats,
Expand All @@ -254,6 +264,7 @@ public NodeStats(
this.discoveryStats = discoveryStats;
this.ingestStats = ingestStats;
this.adaptiveSelectionStats = adaptiveSelectionStats;
this.resourceUsageStats = resourceUsageStats;
this.scriptCacheStats = scriptCacheStats;
this.indexingPressureStats = indexingPressureStats;
this.shardIndexingPressureStats = shardIndexingPressureStats;
Expand Down Expand Up @@ -357,6 +368,11 @@ public AdaptiveSelectionStats getAdaptiveSelectionStats() {
return adaptiveSelectionStats;
}

@Nullable
public NodesResourceUsageStats getResourceUsageStats() {
return resourceUsageStats;
}

@Nullable
public ScriptCacheStats getScriptCacheStats() {
return scriptCacheStats;
Expand Down Expand Up @@ -449,6 +465,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
out.writeOptionalWriteable(searchPipelineStats);
}
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeOptionalWriteable(resourceUsageStats);
}
}

@Override
Expand Down Expand Up @@ -539,7 +558,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getSearchPipelineStats() != null) {
getSearchPipelineStats().toXContent(builder, params);
}

if (getResourceUsageStats() != null) {
getResourceUsageStats().toXContent(builder, params);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ public enum Metric {
WEIGHTED_ROUTING_STATS("weighted_routing"),
FILE_CACHE_STATS("file_cache"),
TASK_CANCELLATION("task_cancellation"),
SEARCH_PIPELINE("search_pipeline");
SEARCH_PIPELINE("search_pipeline"),
RESOURCE_USAGE_STATS("resource_usage_stats");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics),
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics)
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics),
NodesStatsRequest.Metric.RESOURCE_USAGE_STATS.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import org.opensearch.node.Node.DiscoverySettings;
import org.opensearch.node.NodeRoleSettings;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
import org.opensearch.persistent.PersistentTasksClusterService;
import org.opensearch.persistent.decider.EnableAssignmentDecider;
import org.opensearch.plugins.PluginsService;
Expand Down Expand Up @@ -656,6 +657,10 @@ public void apply(Settings value, Settings current, Settings previous) {
SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING,
SegmentReplicationPressureService.MAX_ALLOWED_STALE_SHARDS,

// Settings related to resource trackers
ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING,
ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING,

// Settings related to Searchable Snapshots
Node.NODE_SEARCH_CACHE_SIZE_SETTING,
FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING,
Expand Down
25 changes: 23 additions & 2 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.resource.tracker.NodeResourceUsageTracker;
import org.opensearch.persistent.PersistentTasksClusterService;
import org.opensearch.persistent.PersistentTasksExecutor;
import org.opensearch.persistent.PersistentTasksExecutorRegistry;
Expand Down Expand Up @@ -818,7 +819,6 @@ protected Node(
remoteStoreStatsTrackerFactory,
recoverySettings
);

final AliasValidator aliasValidator = new AliasValidator();

final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);
Expand Down Expand Up @@ -1082,6 +1082,16 @@ protected Node(
transportService.getTaskManager(),
taskCancellationMonitoringSettings
);
final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker(
threadPool,
settings,
clusterService.getClusterSettings()
);
final ResourceUsageCollectorService resourceUsageCollectorService = new ResourceUsageCollectorService(
nodeResourceUsageTracker,
clusterService,
threadPool
);
this.nodeService = new NodeService(
settings,
threadPool,
Expand All @@ -1103,7 +1113,8 @@ protected Node(
searchBackpressureService,
searchPipelineService,
fileCache,
taskCancellationMonitoringService
taskCancellationMonitoringService,
resourceUsageCollectorService
);

final SearchService searchService = newSearchService(
Expand Down Expand Up @@ -1224,6 +1235,8 @@ protected Node(
b.bind(RerouteService.class).toInstance(rerouteService);
b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
b.bind(FsHealthService.class).toInstance(fsHealthService);
b.bind(NodeResourceUsageTracker.class).toInstance(nodeResourceUsageTracker);
b.bind(ResourceUsageCollectorService.class).toInstance(resourceUsageCollectorService);
b.bind(SystemIndices.class).toInstance(systemIndices);
b.bind(IdentityService.class).toInstance(identityService);
b.bind(Tracer.class).toInstance(tracer);
Expand Down Expand Up @@ -1340,6 +1353,8 @@ public Node start() throws NodeValidationException {
injector.getInstance(RepositoriesService.class).start();
injector.getInstance(SearchService.class).start();
injector.getInstance(FsHealthService.class).start();
injector.getInstance(NodeResourceUsageTracker.class).start();
injector.getInstance(ResourceUsageCollectorService.class).start();
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();
nodeService.getTaskCancellationMonitoringService().start();
Expand Down Expand Up @@ -1502,6 +1517,8 @@ private Node stop() {
injector.getInstance(ClusterService.class).stop();
injector.getInstance(NodeConnectionsService.class).stop();
injector.getInstance(FsHealthService.class).stop();
injector.getInstance(NodeResourceUsageTracker.class).stop();
injector.getInstance(ResourceUsageCollectorService.class).stop();
nodeService.getMonitorService().stop();
nodeService.getSearchBackpressureService().stop();
injector.getInstance(GatewayService.class).stop();
Expand Down Expand Up @@ -1565,6 +1582,10 @@ public synchronized void close() throws IOException {
toClose.add(nodeService.getSearchBackpressureService());
toClose.add(() -> stopWatch.stop().start("fsHealth"));
toClose.add(injector.getInstance(FsHealthService.class));
toClose.add(() -> stopWatch.stop().start("resource_usage_tracker"));
toClose.add(injector.getInstance(NodeResourceUsageTracker.class));
toClose.add(() -> stopWatch.stop().start("resource_usage_collector"));
toClose.add(injector.getInstance(ResourceUsageCollectorService.class));
toClose.add(() -> stopWatch.stop().start("gateway"));
toClose.add(injector.getInstance(GatewayService.class));
toClose.add(() -> stopWatch.stop().start("search"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.node;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;
import java.util.Locale;

/**
* This represents the resource usage stats of a node along with the timestamp at which the stats object was created
* in the respective node
*/
public class NodeResourceUsageStats implements Writeable {
final String nodeId;
long timestamp;
double cpuUtilizationPercent;
double memoryUtilizationPercent;

public NodeResourceUsageStats(String nodeId, long timestamp, double memoryUtilizationPercent, double cpuUtilizationPercent) {
this.nodeId = nodeId;
this.timestamp = timestamp;
this.cpuUtilizationPercent = cpuUtilizationPercent;
this.memoryUtilizationPercent = memoryUtilizationPercent;
}

public NodeResourceUsageStats(StreamInput in) throws IOException {
this.nodeId = in.readString();
this.timestamp = in.readLong();
this.cpuUtilizationPercent = in.readDouble();
this.memoryUtilizationPercent = in.readDouble();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(this.nodeId);
out.writeLong(this.timestamp);
out.writeDouble(this.cpuUtilizationPercent);
out.writeDouble(this.memoryUtilizationPercent);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder("NodeResourceUsageStats[");
sb.append(nodeId).append("](");
sb.append("Timestamp: ").append(timestamp);
sb.append(", CPU utilization percent: ").append(String.format(Locale.ROOT, "%.1f", cpuUtilizationPercent));
sb.append(", Memory utilization percent: ").append(String.format(Locale.ROOT, "%.1f", memoryUtilizationPercent));
sb.append(")");
return sb.toString();
}

NodeResourceUsageStats(NodeResourceUsageStats nodeResourceUsageStats) {
this(
nodeResourceUsageStats.nodeId,
nodeResourceUsageStats.timestamp,
nodeResourceUsageStats.memoryUtilizationPercent,
nodeResourceUsageStats.cpuUtilizationPercent
);
}

public double getMemoryUtilizationPercent() {
return memoryUtilizationPercent;
}

public double getCpuUtilizationPercent() {
return cpuUtilizationPercent;
}

public long getTimestamp() {
return timestamp;
}
}
9 changes: 7 additions & 2 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class NodeService implements Closeable {
private final ScriptService scriptService;
private final HttpServerTransport httpServerTransport;
private final ResponseCollectorService responseCollectorService;
private final ResourceUsageCollectorService resourceUsageCollectorService;
private final SearchTransportService searchTransportService;
private final IndexingPressureService indexingPressureService;
private final AggregationUsageService aggregationUsageService;
Expand Down Expand Up @@ -114,7 +115,8 @@ public class NodeService implements Closeable {
SearchBackpressureService searchBackpressureService,
SearchPipelineService searchPipelineService,
FileCache fileCache,
TaskCancellationMonitoringService taskCancellationMonitoringService
TaskCancellationMonitoringService taskCancellationMonitoringService,
ResourceUsageCollectorService resourceUsageCollectorService
) {
this.settings = settings;
this.threadPool = threadPool;
Expand All @@ -137,6 +139,7 @@ public class NodeService implements Closeable {
this.clusterService = clusterService;
this.fileCache = fileCache;
this.taskCancellationMonitoringService = taskCancellationMonitoringService;
this.resourceUsageCollectorService = resourceUsageCollectorService;
clusterService.addStateApplier(ingestService);
clusterService.addStateApplier(searchPipelineService);
}
Expand Down Expand Up @@ -217,7 +220,8 @@ public NodeStats stats(
boolean weightedRoutingStats,
boolean fileCacheStats,
boolean taskCancellation,
boolean searchPipelineStats
boolean searchPipelineStats,
boolean resourceUsageStats
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand All @@ -237,6 +241,7 @@ public NodeStats stats(
discoveryStats ? discovery.stats() : null,
ingest ? ingestService.stats() : null,
adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null,
resourceUsageStats ? resourceUsageCollectorService.stats() : null,
scriptCache ? scriptService.cacheStats() : null,
indexingPressure ? this.indexingPressureService.nodeStats() : null,
shardIndexingPressure ? this.indexingPressureService.shardStats(indices) : null,
Expand Down
Loading
Loading