From d1a60387fd635778e01dd17cac7af970de058993 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Mon, 8 Jul 2024 18:33:50 -0700 Subject: [PATCH] [bug fix] fix incorrect coordinator node search resource usages Signed-off-by: Chenyang Ji --- .../core/listener/QueryInsightsListener.java | 3 +++ .../cluster/service/ClusterService.java | 20 +++++++++++++++++++ .../main/java/org/opensearch/node/Node.java | 1 + .../tasks/TaskResourceTrackingService.java | 3 +++ 4 files changed, 27 insertions(+) diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index a1f810ad5987c..63ed7a51da896 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -145,6 +145,9 @@ public void onRequestFailure(final SearchPhaseContext context, final SearchReque private void constructSearchQueryRecord(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) { SearchTask searchTask = context.getTask(); List tasksResourceUsages = searchRequestContext.getPhaseResourceUsage(); + if (clusterService.getTaskResourceTrackingService() != null) { + clusterService.getTaskResourceTrackingService().refreshResourceStats(searchTask); + } tasksResourceUsages.add( new TaskResourceInfo( searchTask.getAction(), diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index c3c48dd8b87ef..4ece885a55b70 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -54,6 +54,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexingPressureService; import org.opensearch.node.Node; +import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.threadpool.ThreadPool; @@ -92,6 +93,7 @@ public class ClusterService extends AbstractLifecycleComponent { private RerouteService rerouteService; private IndexingPressureService indexingPressureService; + private TaskResourceTrackingService taskResourceTrackingService; public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { this(settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)); @@ -265,6 +267,24 @@ public IndexingPressureService getIndexingPressureService() { return indexingPressureService; } + /** + * Getter for {@link TaskResourceTrackingService}, This method exposes task level resource usage for other components to use. + * + * @return TaskResourceTrackingService + */ + public TaskResourceTrackingService getTaskResourceTrackingService() { + return taskResourceTrackingService; + } + + /** + * Setter for {@link TaskResourceTrackingService} + * + * @param taskResourceTrackingService taskResourceTrackingService + */ + public void setTaskResourceTrackingService(TaskResourceTrackingService taskResourceTrackingService) { + this.taskResourceTrackingService = taskResourceTrackingService; + } + public ClusterApplierService getClusterApplierService() { return clusterApplierService; } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 85ef547e27787..ed058ac5c36d4 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1109,6 +1109,7 @@ protected Node( clusterService.getClusterSettings(), threadPool ); + clusterService.setTaskResourceTrackingService(taskResourceTrackingService); final SearchBackpressureSettings searchBackpressureSettings = new SearchBackpressureSettings( settings, diff --git a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java index ca1957cdb1633..ea62093d0c893 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java +++ b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java @@ -16,6 +16,7 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.action.search.SearchShardTask; import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; @@ -51,6 +52,7 @@ /** * Service that helps track resource usage of tasks running on a node. */ +@PublicApi(since = "2.15.0") @SuppressForbidden(reason = "ThreadMXBean#getThreadAllocatedBytes") public class TaskResourceTrackingService implements RunnableTaskExecutionListener { @@ -357,6 +359,7 @@ public TaskResourceInfo getTaskResourceUsageFromThreadContext() { /** * Listener that gets invoked when a task execution completes. */ + @PublicApi(since = "2.15.0") public interface TaskCompletionListener { void onTaskCompleted(Task task); }