diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index 63ed7a51da896..a1f810ad5987c 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -145,9 +145,6 @@ public void onRequestFailure(final SearchPhaseContext context, final SearchReque private void constructSearchQueryRecord(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) { SearchTask searchTask = context.getTask(); List tasksResourceUsages = searchRequestContext.getPhaseResourceUsage(); - if (clusterService.getTaskResourceTrackingService() != null) { - clusterService.getTaskResourceTrackingService().refreshResourceStats(searchTask); - } tasksResourceUsages.add( new TaskResourceInfo( searchTask.getAction(), diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java index 80c80fb6b6937..dcdb085fdc6fa 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java @@ -8,18 +8,11 @@ package org.opensearch.plugin.insights.rules.model; -import org.apache.lucene.util.ArrayUtil; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; import java.util.Locale; -import java.util.Map; /** * Valid attributes for a search query record @@ -82,69 +75,6 @@ static void writeTo(final StreamOutput out, final Attribute attribute) throws IO out.writeString(attribute.toString()); } - /** - * Write Attribute value to a StreamOutput - * @param out the StreamOutput to write - * @param attributeValue the Attribute value to write - */ - @SuppressWarnings("unchecked") - public static void writeValueTo(StreamOutput out, Object attributeValue) throws IOException { - if (attributeValue instanceof List) { - out.writeList((List) attributeValue); - } else { - out.writeGenericValue(attributeValue); - } - } - - /** - * Read attribute value from the input stream given the Attribute type - * - * @param in the {@link StreamInput} input to read - * @param attribute attribute type to differentiate between Source and others - * @return parse value - * @throws IOException IOException - */ - public static Object readAttributeValue(StreamInput in, Attribute attribute) throws IOException { - if (attribute == Attribute.TASK_RESOURCE_USAGES) { - return in.readList(TaskResourceInfo::readFromStream); - } else { - return in.readGenericValue(); - } - } - - /** - * Read attribute map from the input stream - * - * @param in the {@link StreamInput} to read - * @return parsed attribute map - * @throws IOException IOException - */ - public static Map readAttributeMap(StreamInput in) throws IOException { - int size = readArraySize(in); - if (size == 0) { - return Collections.emptyMap(); - } - Map map = new HashMap<>(size); - - for (int i = 0; i < size; i++) { - Attribute key = readFromStream(in); - Object value = readAttributeValue(in, key); - map.put(key, value); - } - return map; - } - - private static int readArraySize(StreamInput in) throws IOException { - final int arraySize = in.readVInt(); - if (arraySize > ArrayUtil.MAX_ARRAY_LENGTH) { - throw new IllegalStateException("array length must be <= to " + ArrayUtil.MAX_ARRAY_LENGTH + " but was: " + arraySize); - } - if (arraySize < 0) { - throw new NegativeArraySizeException("array size must be positive but was: " + arraySize); - } - return arraySize; - } - @Override public String toString() { return this.name().toLowerCase(Locale.ROOT); diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java index a6e6b4a9051f0..fec00a680ae58 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -45,7 +45,7 @@ public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastExce measurements = new HashMap<>(); in.readMap(MetricType::readFromStream, StreamInput::readGenericValue) .forEach(((metricType, o) -> measurements.put(metricType, metricType.parseValue(o)))); - this.attributes = Attribute.readAttributeMap(in); + this.attributes = in.readMap(Attribute::readFromStream, StreamInput::readGenericValue); } /** @@ -134,11 +134,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final ToXConten public void writeTo(final StreamOutput out) throws IOException { out.writeLong(timestamp); out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue); - out.writeMap( - attributes, - (stream, attribute) -> Attribute.writeTo(out, attribute), - (stream, attributeValue) -> Attribute.writeValueTo(out, attributeValue) - ); + out.writeMap(attributes, (stream, attribute) -> Attribute.writeTo(out, attribute), StreamOutput::writeGenericValue); } /** diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java index 54cafdc97ea74..7fa4e9841c20e 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -12,8 +12,6 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.util.Maps; -import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; -import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries; @@ -82,25 +80,6 @@ public static List generateQueryInsightRecords(int lower, int attributes.put(Attribute.TOTAL_SHARDS, randomIntBetween(1, 100)); attributes.put(Attribute.INDICES, randomArray(1, 3, Object[]::new, () -> randomAlphaOfLengthBetween(5, 10))); attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap); - attributes.put( - Attribute.TASK_RESOURCE_USAGES, - List.of( - new TaskResourceInfo( - randomAlphaOfLengthBetween(5, 10), - randomLongBetween(1, 1000), - randomLongBetween(1, 1000), - randomAlphaOfLengthBetween(5, 10), - new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000)) - ), - new TaskResourceInfo( - randomAlphaOfLengthBetween(5, 10), - randomLongBetween(1, 1000), - randomLongBetween(1, 1000), - randomAlphaOfLengthBetween(5, 10), - new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000)) - ) - ) - ); records.add(new SearchQueryRecord(timestamp, measurements, attributes)); timestamp += interval; diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java index 051a7105a0dc0..86de44c680188 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -32,7 +32,6 @@ import org.opensearch.search.aggregations.support.ValueType; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.tasks.Task; -import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -66,7 +65,6 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase { private final SearchRequest searchRequest = mock(SearchRequest.class); private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class); private final TopQueriesService topQueriesService = mock(TopQueriesService.class); - private final TaskResourceTrackingService taskResourceTrackingService = mock(TaskResourceTrackingService.class); private final ThreadPool threadPool = new TestThreadPool("QueryInsightsThreadPool"); private ClusterService clusterService; @@ -79,7 +77,6 @@ public void setup() { ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary("test", true, 1 + randomInt(3), randomInt(2)); clusterService = ClusterServiceUtils.createClusterService(threadPool, state.getNodes().getLocalNode(), clusterSettings); ClusterServiceUtils.setState(clusterService, state); - clusterService.setTaskResourceTrackingService(taskResourceTrackingService); when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService); @@ -142,7 +139,6 @@ public void testOnRequestEnd() throws InterruptedException { assertEquals(searchSourceBuilder.toString(), generatedRecord.getAttributes().get(Attribute.SOURCE)); Map labels = (Map) generatedRecord.getAttributes().get(Attribute.LABELS); assertEquals("userLabel", labels.get(Task.X_OPAQUE_ID)); - verify(taskResourceTrackingService, times(1)).refreshResourceStats(task); } public void testConcurrentOnRequestEnd() throws InterruptedException { @@ -204,7 +200,6 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { countDownLatch.await(); verify(queryInsightsService, times(numRequests)).addRecord(any()); - verify(taskResourceTrackingService, times(numRequests)).refreshResourceStats(task); } public void testSetEnabled() {