From 9c241b78e94906ca5aff511da00f7d3d4886169d Mon Sep 17 00:00:00 2001 From: Ruirui Zhang Date: Tue, 3 Sep 2024 15:58:09 -0700 Subject: [PATCH] add logic for stats api Signed-off-by: Ruirui Zhang --- CHANGELOG.md | 1 + .../org/opensearch/action/ActionModule.java | 5 + .../cluster/wlm/QueryGroupStatsAction.java | 25 ++++ .../cluster/wlm/QueryGroupStatsRequest.java | 44 +++++++ .../cluster/wlm/QueryGroupStatsResponse.java | 71 +++++++++++ .../wlm/TransportQueryGroupStatsAction.java | 110 +++++++++++++++++ .../admin/cluster/wlm/package-info.java | 10 ++ .../opensearch/client/ClusterAdminClient.java | 10 ++ .../client/support/AbstractClient.java | 8 ++ .../main/java/org/opensearch/node/Node.java | 54 +++++---- .../cluster/RestQueryGroupStatsAction.java | 50 ++++++++ .../org/opensearch/wlm/QueryGroupService.java | 15 ++- ...orkloadManagementTransportInterceptor.java | 18 ++- .../opensearch/wlm/stats/QueryGroupStats.java | 9 +- .../TransportQueryGroupStatsActionTests.java | 112 ++++++++++++++++++ ...adManagementTransportInterceptorTests.java | 4 +- ...anagementTransportRequestHandlerTests.java | 9 +- ...eryGroupRequestOperationListenerTests.java | 9 +- .../wlm/stats/QueryGroupStatsTests.java | 7 +- 19 files changed, 525 insertions(+), 46 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsRequest.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsResponse.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/wlm/TransportQueryGroupStatsAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/wlm/package-info.java create mode 100644 server/src/main/java/org/opensearch/rest/action/admin/cluster/RestQueryGroupStatsAction.java create mode 100644 server/src/test/java/org/opensearch/action/support/nodes/TransportQueryGroupStatsActionTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 409e152ac60af..b5e5cb1d94a02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add `rangeQuery` and `regexpQuery` for `constant_keyword` field type ([#14711](https://github.com/opensearch-project/OpenSearch/pull/14711)) - Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054)) - [Workload Management] Add Get QueryGroup API Logic ([14709](https://github.com/opensearch-project/OpenSearch/pull/14709)) +- [Workload Management] Add QueryGroup Stats API Logic ([15777](https://github.com/opensearch-project/OpenSearch/pull/15777)) - [Workload Management] Add Settings for Workload Management feature ([#15028](https://github.com/opensearch-project/OpenSearch/pull/15028)) - [Workload Management] Add Update QueryGroup API Logic ([#14775](https://github.com/opensearch-project/OpenSearch/pull/14775)) - [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897)) diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index fbf90b97d1e8f..9713b2ea7496e 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -123,6 +123,8 @@ import org.opensearch.action.admin.cluster.storedscripts.TransportPutStoredScriptAction; import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksAction; import org.opensearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction; +import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsAction; +import org.opensearch.action.admin.cluster.wlm.TransportQueryGroupStatsAction; import org.opensearch.action.admin.indices.alias.IndicesAliasesAction; import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest; import org.opensearch.action.admin.indices.alias.TransportIndicesAliasesAction; @@ -367,6 +369,7 @@ import org.opensearch.rest.action.admin.cluster.RestPendingClusterTasksAction; import org.opensearch.rest.action.admin.cluster.RestPutRepositoryAction; import org.opensearch.rest.action.admin.cluster.RestPutStoredScriptAction; +import org.opensearch.rest.action.admin.cluster.RestQueryGroupStatsAction; import org.opensearch.rest.action.admin.cluster.RestReloadSecureSettingsAction; import org.opensearch.rest.action.admin.cluster.RestRemoteClusterInfoAction; import org.opensearch.rest.action.admin.cluster.RestRemoteStoreStatsAction; @@ -611,6 +614,7 @@ public void reg actions.register(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class); actions.register(RemoteInfoAction.INSTANCE, TransportRemoteInfoAction.class); actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class); + actions.register(QueryGroupStatsAction.INSTANCE, TransportQueryGroupStatsAction.class); actions.register(RemoteStoreStatsAction.INSTANCE, TransportRemoteStoreStatsAction.class); actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class); actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class); @@ -812,6 +816,7 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestClearVotingConfigExclusionsAction()); registerHandler.accept(new RestMainAction()); registerHandler.accept(new RestNodesInfoAction(settingsFilter)); + registerHandler.accept(new RestQueryGroupStatsAction()); registerHandler.accept(new RestRemoteClusterInfoAction()); registerHandler.accept(new RestNodesStatsAction()); registerHandler.accept(new RestNodesUsageAction()); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsAction.java new file mode 100644 index 0000000000000..bb865aab28521 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsAction.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.wlm; + +import org.opensearch.action.ActionType; + +/** + * Transport action for obtaining QueryGroup Stats. + * + * @opensearch.experimental + */ +public class QueryGroupStatsAction extends ActionType { + public static final QueryGroupStatsAction INSTANCE = new QueryGroupStatsAction(); + public static final String NAME = "cluster:monitor/query_group_stats"; + + private QueryGroupStatsAction() { + super(NAME, QueryGroupStatsResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsRequest.java new file mode 100644 index 0000000000000..1645ef59cdae1 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsRequest.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.wlm; + +import org.opensearch.action.support.nodes.BaseNodesRequest; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * A request to get QueryGroupStats + */ +@ExperimentalApi +public class QueryGroupStatsRequest extends BaseNodesRequest { + + protected QueryGroupStatsRequest(StreamInput in) throws IOException { + super(in); + } + + public QueryGroupStatsRequest() { + super(false, (String[]) null); + } + + /** + * Get QueryGroup stats from nodes based on the nodes ids specified. If none are passed, stats + * for all nodes will be returned. + */ + public QueryGroupStatsRequest(String... nodesIds) { + super(nodesIds); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsResponse.java new file mode 100644 index 0000000000000..8fa958a97634f --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsResponse.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.wlm; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.wlm.stats.QueryGroupStats; + +import java.io.IOException; +import java.util.List; + +/** + * A response for obtaining QueryGroupStats + */ +@ExperimentalApi +public class QueryGroupStatsResponse extends BaseNodesResponse implements ToXContentFragment { + + QueryGroupStatsResponse(StreamInput in) throws IOException { + super(in); + } + + QueryGroupStatsResponse(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(QueryGroupStats::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + for (QueryGroupStats queryGroupStats : getNodes()) { + builder.startObject(queryGroupStats.getNode().getId()); + queryGroupStats.toXContent(builder, params); + builder.endObject(); + } + return builder; + } + + @Override + public String toString() { + try { + XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); + builder.startObject(); + toXContent(builder, EMPTY_PARAMS); + builder.endObject(); + return builder.toString(); + } catch (IOException e) { + return "{ \"error\" : \"" + e.getMessage() + "\"}"; + } + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/wlm/TransportQueryGroupStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/wlm/TransportQueryGroupStatsAction.java new file mode 100644 index 0000000000000..56df96c1273db --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/wlm/TransportQueryGroupStatsAction.java @@ -0,0 +1,110 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.wlm; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportService; +import org.opensearch.wlm.QueryGroupService; +import org.opensearch.wlm.stats.QueryGroupStats; + +import java.io.IOException; +import java.util.List; + +/** + * Transport action for obtaining QueryGroupStats + * + * @opensearch.experimental + */ +public class TransportQueryGroupStatsAction extends TransportNodesAction< + QueryGroupStatsRequest, + QueryGroupStatsResponse, + TransportQueryGroupStatsAction.NodeQueryGroupStatsRequest, + QueryGroupStats> { + + QueryGroupService queryGroupService; + + @Inject + public TransportQueryGroupStatsAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + QueryGroupService queryGroupService, + ActionFilters actionFilters + ) { + super( + QueryGroupStatsAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + QueryGroupStatsRequest::new, + NodeQueryGroupStatsRequest::new, + ThreadPool.Names.MANAGEMENT, + QueryGroupStats.class + ); + this.queryGroupService = queryGroupService; + } + + @Override + protected QueryGroupStatsResponse newResponse( + QueryGroupStatsRequest request, + List queryGroupStats, + List failures + ) { + return new QueryGroupStatsResponse(clusterService.getClusterName(), queryGroupStats, failures); + } + + @Override + protected NodeQueryGroupStatsRequest newNodeRequest(QueryGroupStatsRequest request) { + return new NodeQueryGroupStatsRequest(request); + } + + @Override + protected QueryGroupStats newNodeResponse(StreamInput in) throws IOException { + return new QueryGroupStats(in); + } + + @Override + protected QueryGroupStats nodeOperation(NodeQueryGroupStatsRequest nodeQueryGroupStatsRequest) { + return queryGroupService.nodeStats(); + } + + /** + * Inner QueryGroupStatsRequest + * + * @opensearch.experimental + */ + public static class NodeQueryGroupStatsRequest extends TransportRequest { + + protected QueryGroupStatsRequest request; + + public NodeQueryGroupStatsRequest(StreamInput in) throws IOException { + super(in); + request = new QueryGroupStatsRequest(in); + } + + NodeQueryGroupStatsRequest(QueryGroupStatsRequest request) { + this.request = request; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + request.writeTo(out); + } + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/wlm/package-info.java b/server/src/main/java/org/opensearch/action/admin/cluster/wlm/package-info.java new file mode 100644 index 0000000000000..0dc98d45c5e5f --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/wlm/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** QueryGroupStats transport handlers. */ +package org.opensearch.action.admin.cluster.wlm; diff --git a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java index 05f09c1a6e661..ac43e53e9b4b7 100644 --- a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java +++ b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java @@ -137,6 +137,8 @@ import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest; import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder; import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksResponse; +import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsRequest; +import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsResponse; import org.opensearch.action.admin.indices.dangling.delete.DeleteDanglingIndexRequest; import org.opensearch.action.admin.indices.dangling.import_index.ImportDanglingIndexRequest; import org.opensearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest; @@ -320,6 +322,14 @@ public interface ClusterAdminClient extends OpenSearchClient { */ NodesStatsRequestBuilder prepareNodesStats(String... nodesIds); + /** + * QueryGroup stats of the cluster. + * + * @param request The QueryGroupStatsRequest + * @param listener A listener to be notified with a result + */ + void queryGroupStats(QueryGroupStatsRequest request, ActionListener listener); + void remoteStoreStats(RemoteStoreStatsRequest request, ActionListener listener); RemoteStoreStatsRequestBuilder prepareRemoteStoreStats(String index, String shardId); diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index 509cd732357d6..9bd6c76ef829c 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -179,6 +179,9 @@ import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest; import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder; import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksResponse; +import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsAction; +import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsRequest; +import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsResponse; import org.opensearch.action.admin.indices.alias.IndicesAliasesAction; import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest; import org.opensearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; @@ -918,6 +921,11 @@ public NodesStatsRequestBuilder prepareNodesStats(String... nodesIds) { return new NodesStatsRequestBuilder(this, NodesStatsAction.INSTANCE).setNodesIds(nodesIds); } + @Override + public void queryGroupStats(final QueryGroupStatsRequest request, final ActionListener listener) { + execute(QueryGroupStatsAction.INSTANCE, request, listener); + } + @Override public void remoteStoreStats(final RemoteStoreStatsRequest request, final ActionListener listener) { execute(RemoteStoreStatsAction.INSTANCE, request, listener); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index a8d4ebcf23dab..7592282e8df05 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1026,29 +1026,6 @@ protected Node( List identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class); identityService.initializeIdentityAwarePlugins(identityAwarePlugins); - final QueryGroupService queryGroupService = new QueryGroupService(); // We will need to replace this with actual instance of the - // queryGroupService - final QueryGroupRequestOperationListener queryGroupRequestOperationListener = new QueryGroupRequestOperationListener( - queryGroupService, - threadPool - ); - - // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory - final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = - new SearchRequestOperationsCompositeListenerFactory( - Stream.concat( - Stream.of( - searchRequestStats, - searchRequestSlowLog, - searchTaskRequestOperationsListener, - queryGroupRequestOperationListener - ), - pluginComponents.stream() - .filter(p -> p instanceof SearchRequestOperationsListener) - .map(p -> (SearchRequestOperationsListener) p) - ).toArray(SearchRequestOperationsListener[]::new) - ); - ActionModule actionModule = new ActionModule( settings, clusterModule.getIndexNameExpressionResolver(), @@ -1091,9 +1068,11 @@ protected Node( admissionControlService ); + SetOnce queryGroupServiceSetOnce = new SetOnce<>(); + WorkloadManagementTransportInterceptor workloadManagementTransportInterceptor = new WorkloadManagementTransportInterceptor( threadPool, - new QueryGroupService() // We will need to replace this with actual implementation + queryGroupServiceSetOnce // We will need to replace this with actual implementation ); final Collection secureSettingsFactories = pluginsService.filterPlugins(Plugin.class) @@ -1155,6 +1134,31 @@ protected Node( taskHeaders, tracer ); + final QueryGroupService queryGroupService = new QueryGroupService(transportService); // We will need to replace this with actual + // instance of the + // queryGroupService + queryGroupServiceSetOnce.set(queryGroupService); + + final QueryGroupRequestOperationListener queryGroupRequestOperationListener = new QueryGroupRequestOperationListener( + queryGroupService, + threadPool + ); + + // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory + final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = + new SearchRequestOperationsCompositeListenerFactory( + Stream.concat( + Stream.of( + searchRequestStats, + searchRequestSlowLog, + searchTaskRequestOperationsListener, + queryGroupRequestOperationListener + ), + pluginComponents.stream() + .filter(p -> p instanceof SearchRequestOperationsListener) + .map(p -> (SearchRequestOperationsListener) p) + ).toArray(SearchRequestOperationsListener[]::new) + ); TopNSearchTasksLogger taskConsumer = new TopNSearchTasksLogger(settings, settingsModule.getClusterSettings()); transportService.getTaskManager().registerTaskResourceConsumer(taskConsumer); this.extensionsManager.initializeServicesAndRestHandler( @@ -1470,6 +1474,7 @@ protected Node( b.bind(SearchRequestStats.class).toInstance(searchRequestStats); b.bind(SearchRequestSlowLog.class).toInstance(searchRequestSlowLog); b.bind(MetricsRegistry.class).toInstance(metricsRegistry); + b.bind(QueryGroupService.class).toInstance(queryGroupService); b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); b.bind(RemoteIndexPathUploader.class).toProvider(() -> remoteIndexPathUploader); b.bind(RemoteStorePinnedTimestampService.class).toProvider(() -> remoteStorePinnedTimestampService); @@ -1478,7 +1483,6 @@ protected Node( b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker); b.bind(SearchRequestOperationsCompositeListenerFactory.class).toInstance(searchRequestOperationsCompositeListenerFactory); b.bind(SegmentReplicator.class).toInstance(segmentReplicator); - taskManagerClientOptional.ifPresent(value -> b.bind(TaskManagerClient.class).toInstance(value)); }); injector = modules.createInjector(); diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestQueryGroupStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestQueryGroupStatsAction.java new file mode 100644 index 0000000000000..66eb04bf2ff27 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestQueryGroupStatsAction.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.admin.cluster; + +import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsRequest; +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.common.Strings; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestActions; + +import java.io.IOException; +import java.util.List; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * Transport action to get QueryGroup stats + * + * @opensearch.experimental + */ +public class RestQueryGroupStatsAction extends BaseRestHandler { + + @Override + public List routes() { + return unmodifiableList(asList(new Route(GET, "/_wlm/query_group_stats"), new Route(GET, "/_wlm/query_group_stats/{nodeId}"))); + } + + @Override + public String getName() { + return "query_group_stats_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")); + QueryGroupStatsRequest queryGroupStatsRequest = new QueryGroupStatsRequest(nodesIds); + return channel -> client.admin() + .cluster() + .queryGroupStats(queryGroupStatsRequest, new RestActions.NodesResponseRestListener<>(channel)); + } +} diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java index 6545598dd9951..26624ece16ec7 100644 --- a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java @@ -8,7 +8,9 @@ package org.opensearch.wlm; +import org.opensearch.common.inject.Inject; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.transport.TransportService; import org.opensearch.wlm.stats.QueryGroupState; import org.opensearch.wlm.stats.QueryGroupStats; import org.opensearch.wlm.stats.QueryGroupStats.QueryGroupStatsHolder; @@ -18,17 +20,22 @@ /** * As of now this is a stub and main implementation PR will be raised soon.Coming PR will collate these changes with core QueryGroupService changes + * @opensearch.experimental */ public class QueryGroupService { // This map does not need to be concurrent since we will process the cluster state change serially and update // this map with new additions and deletions of entries. QueryGroupState is thread safe private final Map queryGroupStateMap; + private final TransportService transportService; - public QueryGroupService() { - this(new HashMap<>()); + @Inject + public QueryGroupService(TransportService transportService) { + this(transportService, new HashMap<>()); } - public QueryGroupService(Map queryGroupStateMap) { + @Inject + public QueryGroupService(TransportService transportService, Map queryGroupStateMap) { + this.transportService = transportService; this.queryGroupStateMap = queryGroupStateMap; } @@ -59,7 +66,7 @@ public QueryGroupStats nodeStats() { statsHolderMap.put(queryGroupId, QueryGroupStatsHolder.from(currentState)); } - return new QueryGroupStats(statsHolderMap); + return new QueryGroupStats(transportService.getLocalNode(), statsHolderMap); } /** diff --git a/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java b/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java index d382b4c729a38..1f08d8722e349 100644 --- a/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java +++ b/server/src/main/java/org/opensearch/wlm/WorkloadManagementTransportInterceptor.java @@ -8,6 +8,7 @@ package org.opensearch.wlm; +import org.opensearch.common.SetOnce; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportChannel; @@ -20,11 +21,11 @@ */ public class WorkloadManagementTransportInterceptor implements TransportInterceptor { private final ThreadPool threadPool; - private final QueryGroupService queryGroupService; + private final SetOnce queryGroupService; - public WorkloadManagementTransportInterceptor(final ThreadPool threadPool, final QueryGroupService queryGroupService) { + public WorkloadManagementTransportInterceptor(final ThreadPool threadPool, final SetOnce queryGroupServiceSetOnce) { this.threadPool = threadPool; - this.queryGroupService = queryGroupService; + this.queryGroupService = queryGroupServiceSetOnce; } @Override @@ -45,9 +46,13 @@ public static class RequestHandler implements Transp private final ThreadPool threadPool; TransportRequestHandler actualHandler; - private final QueryGroupService queryGroupService; + private final SetOnce queryGroupService; - public RequestHandler(ThreadPool threadPool, TransportRequestHandler actualHandler, QueryGroupService queryGroupService) { + public RequestHandler( + ThreadPool threadPool, + TransportRequestHandler actualHandler, + SetOnce queryGroupService + ) { this.threadPool = threadPool; this.actualHandler = actualHandler; this.queryGroupService = queryGroupService; @@ -58,7 +63,8 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro if (isSearchWorkloadRequest(task)) { ((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext()); final String queryGroupId = ((QueryGroupTask) (task)).getQueryGroupId(); - queryGroupService.rejectIfNeeded(queryGroupId); + assert queryGroupService.get() != null; + queryGroupService.get().rejectIfNeeded(queryGroupId); } actualHandler.messageReceived(request, channel, task); } diff --git a/server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java b/server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java index 2b389c2167778..f3ad22fcc5cbb 100644 --- a/server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java +++ b/server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java @@ -8,6 +8,8 @@ package org.opensearch.wlm.stats; +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -37,19 +39,22 @@ * ... * } */ -public class QueryGroupStats implements ToXContentObject, Writeable { +public class QueryGroupStats extends BaseNodeResponse implements ToXContentObject, Writeable { private final Map stats; - public QueryGroupStats(Map stats) { + public QueryGroupStats(DiscoveryNode node, Map stats) { + super(node); this.stats = stats; } public QueryGroupStats(StreamInput in) throws IOException { + super(in); stats = in.readMap(StreamInput::readString, QueryGroupStatsHolder::new); } @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); out.writeMap(stats, StreamOutput::writeString, QueryGroupStatsHolder::writeTo); } diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportQueryGroupStatsActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportQueryGroupStatsActionTests.java new file mode 100644 index 0000000000000..6cad8734082dc --- /dev/null +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportQueryGroupStatsActionTests.java @@ -0,0 +1,112 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.support.nodes; + +import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsRequest; +import org.opensearch.action.admin.cluster.wlm.TransportQueryGroupStatsAction; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.transport.CapturingTransport; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; +import org.opensearch.wlm.QueryGroupService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.mock; + +public class TransportQueryGroupStatsActionTests extends TransportNodesActionTests { + + /** + * We don't want to send discovery nodes list to each request that is sent across from the coordinator node. + * This behavior is asserted in this test. + */ + public void testQueryGroupStatsActionWithRetentionOfDiscoveryNodesList() { + QueryGroupStatsRequest request = new QueryGroupStatsRequest(); + Map> combinedSentRequest = performQueryGroupStatsAction(request); + + assertNotNull(combinedSentRequest); + combinedSentRequest.forEach((node, capturedRequestList) -> { + assertNotNull(capturedRequestList); + capturedRequestList.forEach(sentRequest -> { assertNull(sentRequest.getDiscoveryNodes()); }); + }); + } + + private Map> performQueryGroupStatsAction(QueryGroupStatsRequest request) { + TransportNodesAction action = getTestTransportQueryGroupStatsAction(); + PlainActionFuture listener = new PlainActionFuture<>(); + action.new AsyncAction(null, request, listener).start(); + Map> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear(); + Map> combinedSentRequest = new HashMap<>(); + + capturedRequests.forEach((node, capturedRequestList) -> { + List sentRequestList = new ArrayList<>(); + + capturedRequestList.forEach(preSentRequest -> { + BytesStreamOutput out = new BytesStreamOutput(); + try { + TransportQueryGroupStatsAction.NodeQueryGroupStatsRequest QueryGroupStatsRequestFromCoordinator = + (TransportQueryGroupStatsAction.NodeQueryGroupStatsRequest) preSentRequest.request; + QueryGroupStatsRequestFromCoordinator.writeTo(out); + StreamInput in = out.bytes().streamInput(); + MockNodeQueryGroupStatsRequest QueryGroupStatsRequest = new MockNodeQueryGroupStatsRequest(in); + sentRequestList.add(QueryGroupStatsRequest); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + combinedSentRequest.put(node, sentRequestList); + }); + + return combinedSentRequest; + } + + private TestTransportQueryGroupStatsAction getTestTransportQueryGroupStatsAction() { + return new TestTransportQueryGroupStatsAction( + THREAD_POOL, + clusterService, + transportService, + mock(QueryGroupService.class), + new ActionFilters(Collections.emptySet()) + ); + } + + private static class TestTransportQueryGroupStatsAction extends TransportQueryGroupStatsAction { + public TestTransportQueryGroupStatsAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + QueryGroupService queryGroupService, + ActionFilters actionFilters + ) { + super(threadPool, clusterService, transportService, queryGroupService, actionFilters); + } + } + + private static class MockNodeQueryGroupStatsRequest extends TransportQueryGroupStatsAction.NodeQueryGroupStatsRequest { + + public MockNodeQueryGroupStatsRequest(StreamInput in) throws IOException { + super(in); + } + + public DiscoveryNode[] getDiscoveryNodes() { + return this.request.concreteNodes(); + } + } +} diff --git a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java index 4668b845150a9..7fcb55b585180 100644 --- a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java +++ b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java @@ -8,6 +8,7 @@ package org.opensearch.wlm; +import org.opensearch.common.SetOnce; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -16,6 +17,7 @@ import org.opensearch.wlm.WorkloadManagementTransportInterceptor.RequestHandler; import static org.opensearch.threadpool.ThreadPool.Names.SAME; +import static org.mockito.Mockito.mock; public class WorkloadManagementTransportInterceptorTests extends OpenSearchTestCase { @@ -25,7 +27,7 @@ public class WorkloadManagementTransportInterceptorTests extends OpenSearchTestC public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool(getTestName()); - sut = new WorkloadManagementTransportInterceptor(threadPool, new QueryGroupService()); + sut = new WorkloadManagementTransportInterceptor(threadPool, new SetOnce<>(mock(QueryGroupService.class))); } public void tearDown() throws Exception { diff --git a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java index 59818ad3dbbd2..7fa6e4e9f067b 100644 --- a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java +++ b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportRequestHandlerTests.java @@ -9,6 +9,7 @@ package org.opensearch.wlm; import org.opensearch.action.index.IndexRequest; +import org.opensearch.common.SetOnce; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.tasks.Task; @@ -30,7 +31,7 @@ public class WorkloadManagementTransportRequestHandlerTests extends OpenSearchTestCase { private WorkloadManagementTransportInterceptor.RequestHandler sut; private ThreadPool threadPool; - private QueryGroupService queryGroupService; + private SetOnce queryGroupService; private TestTransportRequestHandler actualHandler; @@ -38,7 +39,7 @@ public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool(getTestName()); actualHandler = new TestTransportRequestHandler<>(); - queryGroupService = mock(QueryGroupService.class); + queryGroupService = new SetOnce<>(mock(QueryGroupService.class)); sut = new WorkloadManagementTransportInterceptor.RequestHandler<>(threadPool, actualHandler, queryGroupService); } @@ -51,7 +52,7 @@ public void tearDown() throws Exception { public void testMessageReceivedForSearchWorkload_nonRejectionCase() throws Exception { ShardSearchRequest request = mock(ShardSearchRequest.class); QueryGroupTask spyTask = getSpyTask(); - doNothing().when(queryGroupService).rejectIfNeeded(anyString()); + doNothing().when(queryGroupService.get()).rejectIfNeeded(anyString()); sut.messageReceived(request, mock(TransportChannel.class), spyTask); assertTrue(sut.isSearchWorkloadRequest(spyTask)); } @@ -59,7 +60,7 @@ public void testMessageReceivedForSearchWorkload_nonRejectionCase() throws Excep public void testMessageReceivedForSearchWorkload_RejectionCase() throws Exception { ShardSearchRequest request = mock(ShardSearchRequest.class); QueryGroupTask spyTask = getSpyTask(); - doThrow(OpenSearchRejectedExecutionException.class).when(queryGroupService).rejectIfNeeded(anyString()); + doThrow(OpenSearchRejectedExecutionException.class).when(queryGroupService.get()).rejectIfNeeded(anyString()); assertThrows(OpenSearchRejectedExecutionException.class, () -> sut.messageReceived(request, mock(TransportChannel.class), spyTask)); } diff --git a/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListenerTests.java b/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListenerTests.java index 0307ff623c408..964a807c38ac9 100644 --- a/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListenerTests.java +++ b/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListenerTests.java @@ -8,11 +8,13 @@ package org.opensearch.wlm.listeners; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; import org.opensearch.wlm.QueryGroupService; import org.opensearch.wlm.QueryGroupTask; import org.opensearch.wlm.ResourceType; @@ -70,6 +72,7 @@ public void testNonRejectionCase() { public void testValidQueryGroupRequestFailure() throws IOException { QueryGroupStats expectedStats = new QueryGroupStats( + mock(DiscoveryNode.class), Map.of( testQueryGroupId, new QueryGroupStats.QueryGroupStatsHolder( @@ -94,7 +97,7 @@ public void testMultiThreadedValidQueryGroupRequestFailures() { queryGroupStateMap.put(testQueryGroupId, new QueryGroupState()); - queryGroupService = new QueryGroupService(queryGroupStateMap); + queryGroupService = new QueryGroupService(mock(TransportService.class), queryGroupStateMap); sut = new QueryGroupRequestOperationListener(queryGroupService, testThreadPool); @@ -120,6 +123,7 @@ public void testMultiThreadedValidQueryGroupRequestFailures() { QueryGroupStats actualStats = queryGroupService.nodeStats(); QueryGroupStats expectedStats = new QueryGroupStats( + mock(DiscoveryNode.class), Map.of( testQueryGroupId, new QueryGroupStats.QueryGroupStatsHolder( @@ -142,6 +146,7 @@ public void testMultiThreadedValidQueryGroupRequestFailures() { public void testInvalidQueryGroupFailure() throws IOException { QueryGroupStats expectedStats = new QueryGroupStats( + mock(DiscoveryNode.class), Map.of( testQueryGroupId, new QueryGroupStats.QueryGroupStatsHolder( @@ -174,7 +179,7 @@ private void assertSuccess( testThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, threadContextQG_Id); queryGroupStateMap.put(testQueryGroupId, new QueryGroupState()); - queryGroupService = new QueryGroupService(queryGroupStateMap); + queryGroupService = new QueryGroupService(mock(TransportService.class), queryGroupStateMap); sut = new QueryGroupRequestOperationListener(queryGroupService, testThreadPool); sut.onRequestFailure(null, null); diff --git a/server/src/test/java/org/opensearch/wlm/stats/QueryGroupStatsTests.java b/server/src/test/java/org/opensearch/wlm/stats/QueryGroupStatsTests.java index 661c3a7beae40..b6c66fee75bf9 100644 --- a/server/src/test/java/org/opensearch/wlm/stats/QueryGroupStatsTests.java +++ b/server/src/test/java/org/opensearch/wlm/stats/QueryGroupStatsTests.java @@ -8,6 +8,7 @@ package org.opensearch.wlm.stats; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.xcontent.ToXContent; @@ -19,6 +20,8 @@ import java.util.HashMap; import java.util.Map; +import static org.mockito.Mockito.mock; + public class QueryGroupStatsTests extends AbstractWireSerializingTestCase { public void testToXContent() throws IOException { @@ -35,7 +38,7 @@ public void testToXContent() throws IOException { ) ); XContentBuilder builder = JsonXContent.contentBuilder(); - QueryGroupStats queryGroupStats = new QueryGroupStats(stats); + QueryGroupStats queryGroupStats = new QueryGroupStats(mock(DiscoveryNode.class), stats); builder.startObject(); queryGroupStats.toXContent(builder, ToXContent.EMPTY_PARAMS); builder.endObject(); @@ -70,6 +73,6 @@ protected QueryGroupStats createTestInstance() { ) ) ); - return new QueryGroupStats(stats); + return new QueryGroupStats(mock(DiscoveryNode.class), stats); } }