Skip to content

Commit

Permalink
[bug fix] fix incorrect coordinator node search resource usages
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Jul 9, 2024
1 parent 0684342 commit d1a6038
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ public void onRequestFailure(final SearchPhaseContext context, final SearchReque
private void constructSearchQueryRecord(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) {
SearchTask searchTask = context.getTask();
List<TaskResourceInfo> tasksResourceUsages = searchRequestContext.getPhaseResourceUsage();
if (clusterService.getTaskResourceTrackingService() != null) {
clusterService.getTaskResourceTrackingService().refreshResourceStats(searchTask);

Check warning on line 149 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java#L149

Added line #L149 was not covered by tests
}
tasksResourceUsages.add(
new TaskResourceInfo(
searchTask.getAction(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.node.Node;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -92,6 +93,7 @@ public class ClusterService extends AbstractLifecycleComponent {
private RerouteService rerouteService;

private IndexingPressureService indexingPressureService;
private TaskResourceTrackingService taskResourceTrackingService;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this(settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE));
Expand Down Expand Up @@ -265,6 +267,24 @@ public IndexingPressureService getIndexingPressureService() {
return indexingPressureService;
}

/**
* Getter for {@link TaskResourceTrackingService}, This method exposes task level resource usage for other components to use.
*
* @return TaskResourceTrackingService
*/
public TaskResourceTrackingService getTaskResourceTrackingService() {
return taskResourceTrackingService;
}

/**
* Setter for {@link TaskResourceTrackingService}
*
* @param taskResourceTrackingService taskResourceTrackingService
*/
public void setTaskResourceTrackingService(TaskResourceTrackingService taskResourceTrackingService) {
this.taskResourceTrackingService = taskResourceTrackingService;
}

public ClusterApplierService getClusterApplierService() {
return clusterApplierService;
}
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,7 @@ protected Node(
clusterService.getClusterSettings(),
threadPool
);
clusterService.setTaskResourceTrackingService(taskResourceTrackingService);

final SearchBackpressureSettings searchBackpressureSettings = new SearchBackpressureSettings(
settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
Expand Down Expand Up @@ -51,6 +52,7 @@
/**
* Service that helps track resource usage of tasks running on a node.
*/
@PublicApi(since = "2.15.0")
@SuppressForbidden(reason = "ThreadMXBean#getThreadAllocatedBytes")
public class TaskResourceTrackingService implements RunnableTaskExecutionListener {

Expand Down Expand Up @@ -357,6 +359,7 @@ public TaskResourceInfo getTaskResourceUsageFromThreadContext() {
/**
* Listener that gets invoked when a task execution completes.
*/
@PublicApi(since = "2.15.0")
public interface TaskCompletionListener {
void onTaskCompleted(Task task);
}
Expand Down

0 comments on commit d1a6038

Please sign in to comment.