From d1a60387fd635778e01dd17cac7af970de058993 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Mon, 8 Jul 2024 18:33:50 -0700 Subject: [PATCH 1/4] [bug fix] fix incorrect coordinator node search resource usages Signed-off-by: Chenyang Ji --- .../core/listener/QueryInsightsListener.java | 3 +++ .../cluster/service/ClusterService.java | 20 +++++++++++++++++++ .../main/java/org/opensearch/node/Node.java | 1 + .../tasks/TaskResourceTrackingService.java | 3 +++ 4 files changed, 27 insertions(+) diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index a1f810ad5987c..63ed7a51da896 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -145,6 +145,9 @@ public void onRequestFailure(final SearchPhaseContext context, final SearchReque private void constructSearchQueryRecord(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) { SearchTask searchTask = context.getTask(); List tasksResourceUsages = searchRequestContext.getPhaseResourceUsage(); + if (clusterService.getTaskResourceTrackingService() != null) { + clusterService.getTaskResourceTrackingService().refreshResourceStats(searchTask); + } tasksResourceUsages.add( new TaskResourceInfo( searchTask.getAction(), diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index c3c48dd8b87ef..4ece885a55b70 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -54,6 +54,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexingPressureService; import org.opensearch.node.Node; +import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.threadpool.ThreadPool; @@ -92,6 +93,7 @@ public class ClusterService extends AbstractLifecycleComponent { private RerouteService rerouteService; private IndexingPressureService indexingPressureService; + private TaskResourceTrackingService taskResourceTrackingService; public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { this(settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)); @@ -265,6 +267,24 @@ public IndexingPressureService getIndexingPressureService() { return indexingPressureService; } + /** + * Getter for {@link TaskResourceTrackingService}, This method exposes task level resource usage for other components to use. + * + * @return TaskResourceTrackingService + */ + public TaskResourceTrackingService getTaskResourceTrackingService() { + return taskResourceTrackingService; + } + + /** + * Setter for {@link TaskResourceTrackingService} + * + * @param taskResourceTrackingService taskResourceTrackingService + */ + public void setTaskResourceTrackingService(TaskResourceTrackingService taskResourceTrackingService) { + this.taskResourceTrackingService = taskResourceTrackingService; + } + public ClusterApplierService getClusterApplierService() { return clusterApplierService; } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 85ef547e27787..ed058ac5c36d4 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1109,6 +1109,7 @@ protected Node( clusterService.getClusterSettings(), threadPool ); + clusterService.setTaskResourceTrackingService(taskResourceTrackingService); final SearchBackpressureSettings searchBackpressureSettings = new SearchBackpressureSettings( settings, diff --git a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java index ca1957cdb1633..ea62093d0c893 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java +++ b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java @@ -16,6 +16,7 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.action.search.SearchShardTask; import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; @@ -51,6 +52,7 @@ /** * Service that helps track resource usage of tasks running on a node. */ +@PublicApi(since = "2.15.0") @SuppressForbidden(reason = "ThreadMXBean#getThreadAllocatedBytes") public class TaskResourceTrackingService implements RunnableTaskExecutionListener { @@ -357,6 +359,7 @@ public TaskResourceInfo getTaskResourceUsageFromThreadContext() { /** * Listener that gets invoked when a task execution completes. */ + @PublicApi(since = "2.15.0") public interface TaskCompletionListener { void onTaskCompleted(Task task); } From d00b2dfbd08f878a493bb172ed63486e33622b3f Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Tue, 9 Jul 2024 00:22:51 -0700 Subject: [PATCH 2/4] fix bug on serialization when passing task resource usage to coordinator Signed-off-by: Chenyang Ji --- .../insights/rules/model/Attribute.java | 70 +++++++++++++++++++ .../rules/model/SearchQueryRecord.java | 8 ++- .../tasks/TaskResourceTrackingService.java | 4 +- 3 files changed, 78 insertions(+), 4 deletions(-) diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java index dcdb085fdc6fa..80c80fb6b6937 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java @@ -8,11 +8,18 @@ package org.opensearch.plugin.insights.rules.model; +import org.apache.lucene.util.ArrayUtil; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Locale; +import java.util.Map; /** * Valid attributes for a search query record @@ -75,6 +82,69 @@ static void writeTo(final StreamOutput out, final Attribute attribute) throws IO out.writeString(attribute.toString()); } + /** + * Write Attribute value to a StreamOutput + * @param out the StreamOutput to write + * @param attributeValue the Attribute value to write + */ + @SuppressWarnings("unchecked") + public static void writeValueTo(StreamOutput out, Object attributeValue) throws IOException { + if (attributeValue instanceof List) { + out.writeList((List) attributeValue); + } else { + out.writeGenericValue(attributeValue); + } + } + + /** + * Read attribute value from the input stream given the Attribute type + * + * @param in the {@link StreamInput} input to read + * @param attribute attribute type to differentiate between Source and others + * @return parse value + * @throws IOException IOException + */ + public static Object readAttributeValue(StreamInput in, Attribute attribute) throws IOException { + if (attribute == Attribute.TASK_RESOURCE_USAGES) { + return in.readList(TaskResourceInfo::readFromStream); + } else { + return in.readGenericValue(); + } + } + + /** + * Read attribute map from the input stream + * + * @param in the {@link StreamInput} to read + * @return parsed attribute map + * @throws IOException IOException + */ + public static Map readAttributeMap(StreamInput in) throws IOException { + int size = readArraySize(in); + if (size == 0) { + return Collections.emptyMap(); + } + Map map = new HashMap<>(size); + + for (int i = 0; i < size; i++) { + Attribute key = readFromStream(in); + Object value = readAttributeValue(in, key); + map.put(key, value); + } + return map; + } + + private static int readArraySize(StreamInput in) throws IOException { + final int arraySize = in.readVInt(); + if (arraySize > ArrayUtil.MAX_ARRAY_LENGTH) { + throw new IllegalStateException("array length must be <= to " + ArrayUtil.MAX_ARRAY_LENGTH + " but was: " + arraySize); + } + if (arraySize < 0) { + throw new NegativeArraySizeException("array size must be positive but was: " + arraySize); + } + return arraySize; + } + @Override public String toString() { return this.name().toLowerCase(Locale.ROOT); diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java index fec00a680ae58..a6e6b4a9051f0 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -45,7 +45,7 @@ public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastExce measurements = new HashMap<>(); in.readMap(MetricType::readFromStream, StreamInput::readGenericValue) .forEach(((metricType, o) -> measurements.put(metricType, metricType.parseValue(o)))); - this.attributes = in.readMap(Attribute::readFromStream, StreamInput::readGenericValue); + this.attributes = Attribute.readAttributeMap(in); } /** @@ -134,7 +134,11 @@ public XContentBuilder toXContent(final XContentBuilder builder, final ToXConten public void writeTo(final StreamOutput out) throws IOException { out.writeLong(timestamp); out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue); - out.writeMap(attributes, (stream, attribute) -> Attribute.writeTo(out, attribute), StreamOutput::writeGenericValue); + out.writeMap( + attributes, + (stream, attribute) -> Attribute.writeTo(out, attribute), + (stream, attributeValue) -> Attribute.writeValueTo(out, attributeValue) + ); } /** diff --git a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java index ea62093d0c893..80c9e2227e9fe 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java +++ b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java @@ -52,7 +52,7 @@ /** * Service that helps track resource usage of tasks running on a node. */ -@PublicApi(since = "2.15.0") +@PublicApi(since = "2.16.0") @SuppressForbidden(reason = "ThreadMXBean#getThreadAllocatedBytes") public class TaskResourceTrackingService implements RunnableTaskExecutionListener { @@ -359,7 +359,7 @@ public TaskResourceInfo getTaskResourceUsageFromThreadContext() { /** * Listener that gets invoked when a task execution completes. */ - @PublicApi(since = "2.15.0") + @PublicApi(since = "2.16.0") public interface TaskCompletionListener { void onTaskCompleted(Task task); } From 720cefb1bcf9670bfe3c6bd740ecc74bb04caba2 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Tue, 9 Jul 2024 12:35:39 -0700 Subject: [PATCH 3/4] add more unit tests Signed-off-by: Chenyang Ji --- .../insights/QueryInsightsTestUtils.java | 21 +++++++++++++++++++ .../listener/QueryInsightsListenerTests.java | 5 +++++ 2 files changed, 26 insertions(+) diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java index 7fa4e9841c20e..54cafdc97ea74 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -12,6 +12,8 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.util.Maps; +import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; +import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries; @@ -80,6 +82,25 @@ public static List generateQueryInsightRecords(int lower, int attributes.put(Attribute.TOTAL_SHARDS, randomIntBetween(1, 100)); attributes.put(Attribute.INDICES, randomArray(1, 3, Object[]::new, () -> randomAlphaOfLengthBetween(5, 10))); attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap); + attributes.put( + Attribute.TASK_RESOURCE_USAGES, + List.of( + new TaskResourceInfo( + randomAlphaOfLengthBetween(5, 10), + randomLongBetween(1, 1000), + randomLongBetween(1, 1000), + randomAlphaOfLengthBetween(5, 10), + new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000)) + ), + new TaskResourceInfo( + randomAlphaOfLengthBetween(5, 10), + randomLongBetween(1, 1000), + randomLongBetween(1, 1000), + randomAlphaOfLengthBetween(5, 10), + new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000)) + ) + ) + ); records.add(new SearchQueryRecord(timestamp, measurements, attributes)); timestamp += interval; diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java index 86de44c680188..051a7105a0dc0 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -32,6 +32,7 @@ import org.opensearch.search.aggregations.support.ValueType; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -65,6 +66,7 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase { private final SearchRequest searchRequest = mock(SearchRequest.class); private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class); private final TopQueriesService topQueriesService = mock(TopQueriesService.class); + private final TaskResourceTrackingService taskResourceTrackingService = mock(TaskResourceTrackingService.class); private final ThreadPool threadPool = new TestThreadPool("QueryInsightsThreadPool"); private ClusterService clusterService; @@ -77,6 +79,7 @@ public void setup() { ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary("test", true, 1 + randomInt(3), randomInt(2)); clusterService = ClusterServiceUtils.createClusterService(threadPool, state.getNodes().getLocalNode(), clusterSettings); ClusterServiceUtils.setState(clusterService, state); + clusterService.setTaskResourceTrackingService(taskResourceTrackingService); when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService); @@ -139,6 +142,7 @@ public void testOnRequestEnd() throws InterruptedException { assertEquals(searchSourceBuilder.toString(), generatedRecord.getAttributes().get(Attribute.SOURCE)); Map labels = (Map) generatedRecord.getAttributes().get(Attribute.LABELS); assertEquals("userLabel", labels.get(Task.X_OPAQUE_ID)); + verify(taskResourceTrackingService, times(1)).refreshResourceStats(task); } public void testConcurrentOnRequestEnd() throws InterruptedException { @@ -200,6 +204,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { countDownLatch.await(); verify(queryInsightsService, times(numRequests)).addRecord(any()); + verify(taskResourceTrackingService, times(numRequests)).refreshResourceStats(task); } public void testSetEnabled() { From 14ff852355f9b41c9bcfc4b4d2d88676b1b65c72 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Fri, 12 Jul 2024 12:48:28 -0700 Subject: [PATCH 4/4] remove query insights plugin related code Signed-off-by: Chenyang Ji --- .../core/listener/QueryInsightsListener.java | 3 - .../insights/rules/model/Attribute.java | 70 ------------------- .../rules/model/SearchQueryRecord.java | 8 +-- .../insights/QueryInsightsTestUtils.java | 21 ------ .../listener/QueryInsightsListenerTests.java | 5 -- 5 files changed, 2 insertions(+), 105 deletions(-) diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index 63ed7a51da896..a1f810ad5987c 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -145,9 +145,6 @@ public void onRequestFailure(final SearchPhaseContext context, final SearchReque private void constructSearchQueryRecord(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) { SearchTask searchTask = context.getTask(); List tasksResourceUsages = searchRequestContext.getPhaseResourceUsage(); - if (clusterService.getTaskResourceTrackingService() != null) { - clusterService.getTaskResourceTrackingService().refreshResourceStats(searchTask); - } tasksResourceUsages.add( new TaskResourceInfo( searchTask.getAction(), diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java index 80c80fb6b6937..dcdb085fdc6fa 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java @@ -8,18 +8,11 @@ package org.opensearch.plugin.insights.rules.model; -import org.apache.lucene.util.ArrayUtil; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; import java.util.Locale; -import java.util.Map; /** * Valid attributes for a search query record @@ -82,69 +75,6 @@ static void writeTo(final StreamOutput out, final Attribute attribute) throws IO out.writeString(attribute.toString()); } - /** - * Write Attribute value to a StreamOutput - * @param out the StreamOutput to write - * @param attributeValue the Attribute value to write - */ - @SuppressWarnings("unchecked") - public static void writeValueTo(StreamOutput out, Object attributeValue) throws IOException { - if (attributeValue instanceof List) { - out.writeList((List) attributeValue); - } else { - out.writeGenericValue(attributeValue); - } - } - - /** - * Read attribute value from the input stream given the Attribute type - * - * @param in the {@link StreamInput} input to read - * @param attribute attribute type to differentiate between Source and others - * @return parse value - * @throws IOException IOException - */ - public static Object readAttributeValue(StreamInput in, Attribute attribute) throws IOException { - if (attribute == Attribute.TASK_RESOURCE_USAGES) { - return in.readList(TaskResourceInfo::readFromStream); - } else { - return in.readGenericValue(); - } - } - - /** - * Read attribute map from the input stream - * - * @param in the {@link StreamInput} to read - * @return parsed attribute map - * @throws IOException IOException - */ - public static Map readAttributeMap(StreamInput in) throws IOException { - int size = readArraySize(in); - if (size == 0) { - return Collections.emptyMap(); - } - Map map = new HashMap<>(size); - - for (int i = 0; i < size; i++) { - Attribute key = readFromStream(in); - Object value = readAttributeValue(in, key); - map.put(key, value); - } - return map; - } - - private static int readArraySize(StreamInput in) throws IOException { - final int arraySize = in.readVInt(); - if (arraySize > ArrayUtil.MAX_ARRAY_LENGTH) { - throw new IllegalStateException("array length must be <= to " + ArrayUtil.MAX_ARRAY_LENGTH + " but was: " + arraySize); - } - if (arraySize < 0) { - throw new NegativeArraySizeException("array size must be positive but was: " + arraySize); - } - return arraySize; - } - @Override public String toString() { return this.name().toLowerCase(Locale.ROOT); diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java index a6e6b4a9051f0..fec00a680ae58 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -45,7 +45,7 @@ public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastExce measurements = new HashMap<>(); in.readMap(MetricType::readFromStream, StreamInput::readGenericValue) .forEach(((metricType, o) -> measurements.put(metricType, metricType.parseValue(o)))); - this.attributes = Attribute.readAttributeMap(in); + this.attributes = in.readMap(Attribute::readFromStream, StreamInput::readGenericValue); } /** @@ -134,11 +134,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final ToXConten public void writeTo(final StreamOutput out) throws IOException { out.writeLong(timestamp); out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue); - out.writeMap( - attributes, - (stream, attribute) -> Attribute.writeTo(out, attribute), - (stream, attributeValue) -> Attribute.writeValueTo(out, attributeValue) - ); + out.writeMap(attributes, (stream, attribute) -> Attribute.writeTo(out, attribute), StreamOutput::writeGenericValue); } /** diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java index 54cafdc97ea74..7fa4e9841c20e 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -12,8 +12,6 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.util.Maps; -import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; -import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries; @@ -82,25 +80,6 @@ public static List generateQueryInsightRecords(int lower, int attributes.put(Attribute.TOTAL_SHARDS, randomIntBetween(1, 100)); attributes.put(Attribute.INDICES, randomArray(1, 3, Object[]::new, () -> randomAlphaOfLengthBetween(5, 10))); attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap); - attributes.put( - Attribute.TASK_RESOURCE_USAGES, - List.of( - new TaskResourceInfo( - randomAlphaOfLengthBetween(5, 10), - randomLongBetween(1, 1000), - randomLongBetween(1, 1000), - randomAlphaOfLengthBetween(5, 10), - new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000)) - ), - new TaskResourceInfo( - randomAlphaOfLengthBetween(5, 10), - randomLongBetween(1, 1000), - randomLongBetween(1, 1000), - randomAlphaOfLengthBetween(5, 10), - new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000)) - ) - ) - ); records.add(new SearchQueryRecord(timestamp, measurements, attributes)); timestamp += interval; diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java index 051a7105a0dc0..86de44c680188 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -32,7 +32,6 @@ import org.opensearch.search.aggregations.support.ValueType; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.tasks.Task; -import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -66,7 +65,6 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase { private final SearchRequest searchRequest = mock(SearchRequest.class); private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class); private final TopQueriesService topQueriesService = mock(TopQueriesService.class); - private final TaskResourceTrackingService taskResourceTrackingService = mock(TaskResourceTrackingService.class); private final ThreadPool threadPool = new TestThreadPool("QueryInsightsThreadPool"); private ClusterService clusterService; @@ -79,7 +77,6 @@ public void setup() { ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary("test", true, 1 + randomInt(3), randomInt(2)); clusterService = ClusterServiceUtils.createClusterService(threadPool, state.getNodes().getLocalNode(), clusterSettings); ClusterServiceUtils.setState(clusterService, state); - clusterService.setTaskResourceTrackingService(taskResourceTrackingService); when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService); @@ -142,7 +139,6 @@ public void testOnRequestEnd() throws InterruptedException { assertEquals(searchSourceBuilder.toString(), generatedRecord.getAttributes().get(Attribute.SOURCE)); Map labels = (Map) generatedRecord.getAttributes().get(Attribute.LABELS); assertEquals("userLabel", labels.get(Task.X_OPAQUE_ID)); - verify(taskResourceTrackingService, times(1)).refreshResourceStats(task); } public void testConcurrentOnRequestEnd() throws InterruptedException { @@ -204,7 +200,6 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { countDownLatch.await(); verify(queryInsightsService, times(numRequests)).addRecord(any()); - verify(taskResourceTrackingService, times(numRequests)).refreshResourceStats(task); } public void testSetEnabled() {