add task resource tracking service to cluster service #14681

Closed
Changes from 3 commits
Contributor

Should we change to attributes.put(Attribute.SOURCE, request.source()); here itself so we can use this in the future?

Member Author

Maybe let's do it as part of your PR, so that this PR only contains the necessary urgent fixes?

Contributor

Makes sense. Thanks!

@@ -145,6 +145,9 @@ public void onRequestFailure(final SearchPhaseContext context, final SearchReque
private void constructSearchQueryRecord(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) {
SearchTask searchTask = context.getTask();
List<TaskResourceInfo> tasksResourceUsages = searchRequestContext.getPhaseResourceUsage();
if (clusterService.getTaskResourceTrackingService() != null) {
clusterService.getTaskResourceTrackingService().refreshResourceStats(searchTask);
}
tasksResourceUsages.add(
new TaskResourceInfo(
searchTask.getAction(),
@@ -8,11 +8,18 @@

package org.opensearch.plugin.insights.rules.model;

import org.apache.lucene.util.ArrayUtil;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
* Valid attributes for a search query record
@@ -75,6 +82,69 @@
out.writeString(attribute.toString());
}

/**
* Write Attribute value to a StreamOutput
* @param out the StreamOutput to write
* @param attributeValue the Attribute value to write
*/
@SuppressWarnings("unchecked")
public static void writeValueTo(StreamOutput out, Object attributeValue) throws IOException {
if (attributeValue instanceof List) {
out.writeList((List<? extends Writeable>) attributeValue);
} else {
out.writeGenericValue(attributeValue);
}
}

/**
* Read attribute value from the input stream given the Attribute type
*
* @param in the {@link StreamInput} input to read
* @param attribute attribute type to differentiate between Source and others
* @return parse value
* @throws IOException IOException
*/
public static Object readAttributeValue(StreamInput in, Attribute attribute) throws IOException {
if (attribute == Attribute.TASK_RESOURCE_USAGES) {
return in.readList(TaskResourceInfo::readFromStream);
} else {
return in.readGenericValue();
}
}

/**
* Read attribute map from the input stream
*
* @param in the {@link StreamInput} to read
* @return parsed attribute map
* @throws IOException IOException
*/
public static Map<Attribute, Object> readAttributeMap(StreamInput in) throws IOException {
int size = readArraySize(in);
if (size == 0) {
return Collections.emptyMap();
}
Map<Attribute, Object> map = new HashMap<>(size);

for (int i = 0; i < size; i++) {
Attribute key = readFromStream(in);
Object value = readAttributeValue(in, key);
map.put(key, value);
}
return map;
}

private static int readArraySize(StreamInput in) throws IOException {
final int arraySize = in.readVInt();
if (arraySize > ArrayUtil.MAX_ARRAY_LENGTH) {
throw new IllegalStateException("array length must be <= to " + ArrayUtil.MAX_ARRAY_LENGTH + " but was: " + arraySize);
}
if (arraySize < 0) {
throw new NegativeArraySizeException("array size must be positive but was: " + arraySize);
}
return arraySize;
}

@Override
public String toString() {
return this.name().toLowerCase(Locale.ROOT);
@@ -45,7 +45,7 @@ public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastExce
measurements = new HashMap<>();
in.readMap(MetricType::readFromStream, StreamInput::readGenericValue)
.forEach(((metricType, o) -> measurements.put(metricType, metricType.parseValue(o))));
this.attributes = in.readMap(Attribute::readFromStream, StreamInput::readGenericValue);
this.attributes = Attribute.readAttributeMap(in);
Collaborator

Has something changed in the ser/deser logic? If yes, can that cause issue while upgrading from 2.15 to 2.16?

Member Author

Hey @jainankitk! Actually, this is a bug we didn't catch. readGenericValue can't read a List<CustomObject>; we need to write our own reader in that case.

Member Author

And I took this from Siddhant's PR when moving categorization code to the plugin: https://github.com/opensearch-project/OpenSearch/pull/14528/files#r1669900434

Contributor

@deshsidd Jul 9, 2024

@jainankitk This was also discussed in detail in the PR to move categorization to plugin : https://github.com/opensearch-project/OpenSearch/pull/14528/files#r1669900434

Collaborator

Got it, thanks for the details! Given that serialization was already failing, this should not result in compatibility issues.
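
Editor's sketch of the point made in this thread: a generic value reader has no way to rebuild a list of custom objects, so the reader has to be told the element type. The example below uses only `java.io` streams rather than OpenSearch's `StreamInput`/`StreamOutput`; `ResourceInfo` and `TypedListCodec` are hypothetical stand-ins (not classes from this PR) that mirror the shape of `writeValueTo` and `readAttributeValue`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a custom serializable type such as TaskResourceInfo.
final class ResourceInfo {
    final String action;
    final long cpuNanos;

    ResourceInfo(String action, long cpuNanos) {
        this.action = action;
        this.cpuNanos = cpuNanos;
    }

    // Each element knows how to write its own fields.
    void writeTo(DataOutputStream out) throws IOException {
        out.writeUTF(action);
        out.writeLong(cpuNanos);
    }

    // The matching reader must consume the fields in the same order.
    static ResourceInfo readFrom(DataInputStream in) throws IOException {
        return new ResourceInfo(in.readUTF(), in.readLong());
    }
}

// Hypothetical codec: length prefix followed by the elements, no type tag.
final class TypedListCodec {
    static byte[] writeList(List<ResourceInfo> items) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(items.size());
            for (ResourceInfo item : items) {
                item.writeTo(out);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Because no type tag is stored, the caller must choose the typed reader.
    static List<ResourceInfo> readList(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            if (size < 0) {
                throw new IOException("negative list size: " + size);
            }
            List<ResourceInfo> items = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                items.add(ResourceInfo.readFrom(in));
            }
            return items;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the PR, the same decision is made per attribute key: `TASK_RESOURCE_USAGES` takes the typed list reader and every other attribute falls back to the generic one.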

}

/**
@@ -134,7 +134,11 @@ public XContentBuilder toXContent(final XContentBuilder builder, final ToXConten
public void writeTo(final StreamOutput out) throws IOException {
out.writeLong(timestamp);
out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue);
out.writeMap(attributes, (stream, attribute) -> Attribute.writeTo(out, attribute), StreamOutput::writeGenericValue);
out.writeMap(
attributes,
(stream, attribute) -> Attribute.writeTo(out, attribute),
(stream, attributeValue) -> Attribute.writeValueTo(out, attributeValue)
Collaborator

Same goes for the write part as well

);
}

/**
@@ -12,6 +12,8 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.util.Maps;
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries;
@@ -80,6 +82,25 @@ public static List<SearchQueryRecord> generateQueryInsightRecords(int lower, int
attributes.put(Attribute.TOTAL_SHARDS, randomIntBetween(1, 100));
attributes.put(Attribute.INDICES, randomArray(1, 3, Object[]::new, () -> randomAlphaOfLengthBetween(5, 10)));
attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap);
attributes.put(
Attribute.TASK_RESOURCE_USAGES,
List.of(
new TaskResourceInfo(
randomAlphaOfLengthBetween(5, 10),
randomLongBetween(1, 1000),
randomLongBetween(1, 1000),
randomAlphaOfLengthBetween(5, 10),
new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000))
),
new TaskResourceInfo(
randomAlphaOfLengthBetween(5, 10),
randomLongBetween(1, 1000),
randomLongBetween(1, 1000),
randomAlphaOfLengthBetween(5, 10),
new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000))
)
)
);

records.add(new SearchQueryRecord(timestamp, measurements, attributes));
timestamp += interval;
@@ -32,6 +32,7 @@
import org.opensearch.search.aggregations.support.ValueType;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
@@ -65,6 +66,7 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase {
private final SearchRequest searchRequest = mock(SearchRequest.class);
private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class);
private final TopQueriesService topQueriesService = mock(TopQueriesService.class);
private final TaskResourceTrackingService taskResourceTrackingService = mock(TaskResourceTrackingService.class);
private final ThreadPool threadPool = new TestThreadPool("QueryInsightsThreadPool");
private ClusterService clusterService;

@@ -77,6 +79,7 @@ public void setup() {
ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary("test", true, 1 + randomInt(3), randomInt(2));
clusterService = ClusterServiceUtils.createClusterService(threadPool, state.getNodes().getLocalNode(), clusterSettings);
ClusterServiceUtils.setState(clusterService, state);
clusterService.setTaskResourceTrackingService(taskResourceTrackingService);
when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true);
when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService);

@@ -139,6 +142,7 @@ public void testOnRequestEnd() throws InterruptedException {
assertEquals(searchSourceBuilder.toString(), generatedRecord.getAttributes().get(Attribute.SOURCE));
Map<String, String> labels = (Map<String, String>) generatedRecord.getAttributes().get(Attribute.LABELS);
assertEquals("userLabel", labels.get(Task.X_OPAQUE_ID));
verify(taskResourceTrackingService, times(1)).refreshResourceStats(task);
}

public void testConcurrentOnRequestEnd() throws InterruptedException {
@@ -200,6 +204,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException {
countDownLatch.await();

verify(queryInsightsService, times(numRequests)).addRecord(any());
verify(taskResourceTrackingService, times(numRequests)).refreshResourceStats(task);
}

public void testSetEnabled() {
@@ -54,6 +54,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.node.Node;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.threadpool.ThreadPool;

@@ -92,6 +93,7 @@ public class ClusterService extends AbstractLifecycleComponent {
private RerouteService rerouteService;

private IndexingPressureService indexingPressureService;
private TaskResourceTrackingService taskResourceTrackingService;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this(settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE));
@@ -265,6 +267,24 @@ public IndexingPressureService getIndexingPressureService() {
return indexingPressureService;
}

/**
* Getter for {@link TaskResourceTrackingService}. This method exposes task-level resource usage for other components to use.
*
* @return TaskResourceTrackingService
*/
public TaskResourceTrackingService getTaskResourceTrackingService() {
return taskResourceTrackingService;
}

/**
* Setter for {@link TaskResourceTrackingService}
*
* @param taskResourceTrackingService taskResourceTrackingService
*/
public void setTaskResourceTrackingService(TaskResourceTrackingService taskResourceTrackingService) {
this.taskResourceTrackingService = taskResourceTrackingService;
}

public ClusterApplierService getClusterApplierService() {
return clusterApplierService;
}
1 change: 1 addition & 0 deletions server/src/main/java/org/opensearch/node/Node.java
@@ -1109,6 +1109,7 @@ protected Node(
clusterService.getClusterSettings(),
threadPool
);
clusterService.setTaskResourceTrackingService(taskResourceTrackingService);
Collaborator

Should we just initialize TaskResourceTrackingService in ClusterService, then, since we're creating a dependency here anyway?

Also should SetOnce be used?
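
Editor's sketch of the set-once idea raised in this comment: a holder whose setter succeeds exactly once, so the service cannot be silently replaced after node startup. `SetOnceHolder` is a hypothetical, minimal stand-in written with `AtomicReference`; it is not Lucene's `SetOnce` class, and unlike this sketch a real implementation may choose to accept `null`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical minimal set-once holder (illustration only).
final class SetOnceHolder<T> {
    private final AtomicReference<T> ref = new AtomicReference<>();

    // Succeeds only for the first caller; later calls fail loudly.
    void set(T value) {
        if (value == null) {
            throw new IllegalArgumentException("value must not be null");
        }
        if (!ref.compareAndSet(null, value)) {
            throw new IllegalStateException("value was already set");
        }
    }

    // Returns null until set(...) has been called.
    T get() {
        return ref.get();
    }
}
```

With a holder like this, a setter such as `setTaskResourceTrackingService` would reject a second assignment instead of overwriting the field, while the getter's null check in the listener would still be needed before the first assignment.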


final SearchBackpressureSettings searchBackpressureSettings = new SearchBackpressureSettings(
settings,
@@ -16,6 +16,7 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
@@ -51,6 +52,7 @@
/**
* Service that helps track resource usage of tasks running on a node.
*/
@PublicApi(since = "2.16.0")
Member

I understand why you are marking this public: it is now exposed publicly from ClusterService. Is this the right level of visibility for this service? Can we expose only the required functionality from ClusterService instead of the whole of TaskResourceTrackingService? And/or maybe this should be a separate interface for plugins, not exposed through ClusterService?

Member Author

@ansjcy Jul 15, 2024

We can add a wrapper for refreshResourceStats in ClusterService, but one could argue: does it belong in the cluster service? With the right level of encapsulation, operations on task-level resource usage should belong only to TaskResourceTrackingService. But I agree that making the whole service public is also risky. I'm open to suggestions. cc @reta

Collaborator

@reta Jul 16, 2024

@ansjcy the TaskResourceTrackingService is internal to OpenSearch (and has no relation to the ClusterService either), so it should not be exposed to plugins. Regarding the issue itself:

> Currently we are not refreshing task level resource usages on coordinator node for searchTasks, which means all coordinator node resource usage will be 0.

That seems to be a problem the core implementation has to fix: why are the task-level resources not refreshed (on the coordinator node)?

Member Author

@ansjcy Jul 16, 2024

Hi @reta! Thanks for the input. Currently the TaskResourceTrackingService only refreshes task usages when a task ends. But in our case we want to get the resource usage in a SearchOperationsListener, which will be triggered before a task finishes.

Let me think about this more. Instead of exposing the TaskResourceTrackingService, does it make sense to you if we do a usage refresh in core before the listeners are called (in this function: https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java#L469)?

Collaborator

@reta Jul 16, 2024

Thanks @ansjcy

> thanks for the input. Currently the TaskResourceTrackingService only refreshes task usages when a task ends.

It does not sound right, the task is still ongoing so its usage won't be correct.

> Let me think about this more. Instead of exposing the TaskResourceTrackingService, does it make sense to you If we can do a usage refresh in core before the listeners are called (in this function:

The logical point (at least to me) for capturing task usage seems to be the moment the task ends. It looks to me like you are trying to chime in somewhere in between (while the task is still executing), which does not look like the correct way.

Contributor

No, we add per-request listener instance in the TransportSearchAction::executeRequest. The query insights plugin has nothing to do with it at this moment of time, but we capture the search task resource usage upon request completion, so the tracking data becomes available to everyone (including the query insights plugin). Does it make sense?

@reta We are currently working on a PR based on the above discussion.

For the last point you made, what other justification and work is required to make the API public? We are trying to get all the query insights changes into 2.16, and this is the only PR that is currently dangling. We want to make sure we find a path forward. Please let us know your suggestions. In the meantime, I will finalize the above draft PR if there are no concerns with this approach.

Collaborator

@reta Jul 18, 2024

Thanks @deshsidd

> For the last point you made, what other justification and work is required to make the API public?

I did not design the original APIs, you may ask the contributor if he has any concerns. On the second point, if you need to make it public, apply the @PublicApi annotation accordingly.

Contributor

Thanks @reta!
cc @buddharajusahil @sgup432 @dzane17 @jainankitk Please let us know your thoughts since you all have contributed to the file.

@reta Looks like you had initially reduced the visibility of the API.

For now I am going to continue with the approach that Reta and Chenyang discussed above and work on the following PR. I will also make the SearchRequestOperationsListener @PublicApi as part of these changes.

Collaborator

> @reta Looks like you had initially reduced the visibility of the API.

@deshsidd yes, you will understand why once you try to apply @PublicApi to it :-) : it pulls a pile of dependencies with it .... (I am very doubtful it was designed to be public in the first place).

Contributor

Understood.
@reta and others - Please take a look : #14832 and let me know your thoughts.
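
Editor's sketch of the alternative raised earlier in this thread, namely exposing only the required functionality instead of the whole service. All names below (`ResourceStatsRefresher`, `InternalTrackingService`, `PluginListener`) are hypothetical and use a plain `String` task id; this is not the design adopted in #14832, only an illustration of handing a plugin a single-method view of an internal service.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical narrow view: the only operation a plugin is allowed to call.
@FunctionalInterface
interface ResourceStatsRefresher {
    void refresh(String taskId);
}

// Hypothetical stand-in for an internal service with a wider surface.
final class InternalTrackingService {
    private final Map<String, Integer> refreshCounts = new HashMap<>();

    // The one operation that is handed out through the narrow interface.
    void refreshResourceStats(String taskId) {
        refreshCounts.merge(taskId, 1, Integer::sum);
    }

    // Internal operations that stay invisible to plugin code.
    int refreshCount(String taskId) {
        return refreshCounts.getOrDefault(taskId, 0);
    }

    void stopTracking(String taskId) {
        refreshCounts.remove(taskId);
    }
}

// Hypothetical plugin-side listener that depends only on the narrow view.
final class PluginListener {
    private final ResourceStatsRefresher refresher;

    PluginListener(ResourceStatsRefresher refresher) {
        this.refresher = refresher;
    }

    void onRequestEnd(String taskId) {
        refresher.refresh(taskId);
    }
}
```

Wiring is a method reference, for example `new PluginListener(service::refreshResourceStats)`, so the plugin never holds a reference typed as the internal service.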

@SuppressForbidden(reason = "ThreadMXBean#getThreadAllocatedBytes")
public class TaskResourceTrackingService implements RunnableTaskExecutionListener {

@@ -357,6 +359,7 @@ public TaskResourceInfo getTaskResourceUsageFromThreadContext() {
/**
* Listener that gets invoked when a task execution completes.
*/
@PublicApi(since = "2.16.0")
public interface TaskCompletionListener {
void onTaskCompleted(Task task);
}