From d7a452a55f7a381cb92ce7b336b3d14a2e34ce19 Mon Sep 17 00:00:00 2001 From: Ruirui Zhang Date: Wed, 10 Jul 2024 12:18:52 -0700 Subject: [PATCH] Add Get QueryGroup API Logic Signed-off-by: Ruirui Zhang --- plugins/workload-management/build.gradle | 18 ++ .../wlm/action/GetQueryGroupAction.java | 37 ++++ .../wlm/action/GetQueryGroupRequest.java | 66 ++++++ .../wlm/action/GetQueryGroupResponse.java | 82 ++++++++ .../action/TransportGetQueryGroupAction.java | 58 +++++ .../wlm/action/WorkloadManagementPlugin.java | 62 ++++++ .../WorkloadManagementPluginModule.java | 32 +++ .../plugin/wlm/action/package-info.java | 12 ++ .../action/rest/RestGetQueryGroupAction.java | 69 ++++++ .../plugin/wlm/action/rest/package-info.java | 12 ++ .../wlm/action/service/Persistable.java | 24 +++ .../service/QueryGroupPersistenceService.java | 126 +++++++++++ .../wlm/action/service/package-info.java | 12 ++ .../wlm/action/GetQueryGroupRequestTests.java | 38 ++++ .../action/GetQueryGroupResponseTests.java | 65 ++++++ .../wlm/action/QueryGroupTestUtils.java | 116 ++++++++++ .../QueryGroupPersistenceServiceTests.java | 76 +++++++ .../opensearch/cluster/metadata/Metadata.java | 6 + .../common/settings/ClusterSettings.java | 8 +- .../QueryGroupServiceSettings.java | 198 ++++++++++++++++++ .../search/query_group/package-info.java | 12 ++ 21 files changed, 1128 insertions(+), 1 deletion(-) create mode 100644 plugins/workload-management/build.gradle create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/GetQueryGroupAction.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/GetQueryGroupRequest.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/GetQueryGroupResponse.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportGetQueryGroupAction.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPlugin.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPluginModule.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/package-info.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/RestGetQueryGroupAction.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/package-info.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/Persistable.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceService.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/package-info.java create mode 100644 plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/GetQueryGroupRequestTests.java create mode 100644 plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/GetQueryGroupResponseTests.java create mode 100644 plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/QueryGroupTestUtils.java create mode 100644 plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceServiceTests.java create mode 100644 server/src/main/java/org/opensearch/search/query_group/QueryGroupServiceSettings.java create mode 100644 server/src/main/java/org/opensearch/search/query_group/package-info.java diff --git a/plugins/workload-management/build.gradle b/plugins/workload-management/build.gradle new file mode 100644 index 0000000000000..89e13c079795e --- /dev/null +++ b/plugins/workload-management/build.gradle @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +opensearchplugin { + description 'OpenSearch Workload Management Plugin.' + classname 'org.opensearch.plugin.wlm.action.WorkloadManagementPlugin' +} + +dependencies { +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/GetQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/GetQueryGroupAction.java new file mode 100644 index 0000000000000..fb9365196f828 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/GetQueryGroupAction.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.action.ActionType; + +/** + * Transport action to get QueryGroup + * + * @opensearch.api + */ +public class GetQueryGroupAction extends ActionType { + + /** + /** + * An instance of GetQueryGroupAction + */ + public static final GetQueryGroupAction INSTANCE = new GetQueryGroupAction(); + + /** + * Name for GetQueryGroupAction + */ + public static final String NAME = "cluster:admin/opensearch/query_group/wlm/_get"; + + /** + * Default constructor + */ + private GetQueryGroupAction() { + super(NAME, GetQueryGroupResponse::new); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/GetQueryGroupRequest.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/GetQueryGroupRequest.java new file mode 100644 index 0000000000000..ee394fdc36440 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/GetQueryGroupRequest.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; + +import java.io.IOException; + +/** + * A request for get QueryGroup + * + * @opensearch.internal + */ +public class GetQueryGroupRequest extends ActionRequest implements Writeable.Reader { + String name; + + /** + * Default constructor for GetQueryGroupRequest + * @param name - name for the QueryGroup to get + */ + public GetQueryGroupRequest(String name) { + this.name = name; + } + + /** + * Constructor for GetQueryGroupRequest + * @param in - A {@link StreamInput} object + */ + public GetQueryGroupRequest(StreamInput in) throws IOException { + super(in); + name = in.readOptionalString(); + } + + @Override + public GetQueryGroupRequest read(StreamInput in) throws IOException { + return new GetQueryGroupRequest(in); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + /** + * Name getter + */ + public String getName() { + return name; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalString(name); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/GetQueryGroupResponse.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/GetQueryGroupResponse.java new file mode 100644 index 0000000000000..3355ffc936360 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/GetQueryGroupResponse.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; + +/** + * Response for the get API for QueryGroup + * + * @opensearch.internal + */ +public class GetQueryGroupResponse extends ActionResponse implements ToXContent, ToXContentObject { + private final List queryGroups; + private RestStatus restStatus; + + /** + * Constructor for GetQueryGroupResponse + * @param queryGroups - The QueryGroup list to be fetched + * @param restStatus - The rest status of the request + */ + public GetQueryGroupResponse(final List queryGroups, RestStatus restStatus) { + this.queryGroups = queryGroups; + this.restStatus = restStatus; + } + + /** + * Constructor for GetQueryGroupResponse + * @param in - A {@link StreamInput} object + */ + public GetQueryGroupResponse(StreamInput in) throws IOException { + this.queryGroups = in.readList(QueryGroup::new); + restStatus = RestStatus.readFrom(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(queryGroups); + RestStatus.writeTo(out, restStatus); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startArray("query_groups"); + for (QueryGroup group : queryGroups) { + group.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + /** + * queryGroups getter + */ + public List getQueryGroups() { + return queryGroups; + } + + /** + * restStatus getter + */ + public RestStatus getRestStatus() { + return restStatus; + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportGetQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportGetQueryGroupAction.java new file mode 100644 index 0000000000000..df34da9e9e248 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportGetQueryGroupAction.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.plugin.wlm.action.service.Persistable; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +/** + * Transport action for get QueryGroup + * + * @opensearch.internal + */ +public class TransportGetQueryGroupAction extends HandledTransportAction { + + private final ThreadPool threadPool; + private final Persistable queryGroupPersistenceService; + + /** + * Constructor for TransportGetQueryGroupAction + * + * @param actionName - action name + * @param transportService - a {@link TransportService} object + * @param actionFilters - a {@link ActionFilters} object + * @param threadPool - a {@link ThreadPool} object + * @param queryGroupPersistenceService - a {@link Persistable} object + */ + @Inject + public TransportGetQueryGroupAction( + String actionName, + TransportService transportService, + ActionFilters actionFilters, + ThreadPool threadPool, + Persistable queryGroupPersistenceService + ) { + super(GetQueryGroupAction.NAME, transportService, actionFilters, GetQueryGroupRequest::new); + this.threadPool = threadPool; + this.queryGroupPersistenceService = queryGroupPersistenceService; + } + + @Override + protected void doExecute(Task task, GetQueryGroupRequest request, ActionListener listener) { + String name = request.getName(); + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> queryGroupPersistenceService.get(name, listener)); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPlugin.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPlugin.java new file mode 100644 index 0000000000000..8860e9aae91dc --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPlugin.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.action.ActionRequest; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.inject.Module; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.plugin.wlm.action.rest.RestGetQueryGroupAction; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.rest.RestController; +import org.opensearch.rest.RestHandler; + +import java.util.Collection; +import java.util.List; +import java.util.function.Supplier; + +/** + * Plugin class for WorkloadManagement + */ +public class WorkloadManagementPlugin extends Plugin implements ActionPlugin { + + /** + * Default constructor + */ + public WorkloadManagementPlugin() {} + + @Override + public List> getActions() { + return List.of(new ActionPlugin.ActionHandler<>(GetQueryGroupAction.INSTANCE, TransportGetQueryGroupAction.class)); + } + + @Override + public List getRestHandlers( + Settings settings, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster + ) { + return List.of(new RestGetQueryGroupAction()); + } + + @Override + public Collection createGuiceModules() { + return List.of(new WorkloadManagementPluginModule()); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPluginModule.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPluginModule.java new file mode 100644 index 0000000000000..65f92a59a576b --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPluginModule.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.common.inject.AbstractModule; +import org.opensearch.common.inject.TypeLiteral; +import org.opensearch.plugin.wlm.action.service.Persistable; +import org.opensearch.plugin.wlm.action.service.QueryGroupPersistenceService; + +/** + * Guice Module to manage WorkloadManagement related objects + */ +public class WorkloadManagementPluginModule extends AbstractModule { + + /** + * Constructor for WorkloadManagementPluginModule + */ + public WorkloadManagementPluginModule() {} + + @Override + protected void configure() { + bind(new TypeLiteral>() { + }).to(QueryGroupPersistenceService.class).asEagerSingleton(); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/package-info.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/package-info.java new file mode 100644 index 0000000000000..8f7d2647546f5 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Base Package of CRUD API of QueryGroup + */ +package org.opensearch.plugin.wlm.action; diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/RestGetQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/RestGetQueryGroupAction.java new file mode 100644 index 0000000000000..162c2b2b304cb --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/RestGetQueryGroupAction.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action.rest; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.plugin.wlm.action.GetQueryGroupAction; +import org.opensearch.plugin.wlm.action.GetQueryGroupRequest; +import org.opensearch.plugin.wlm.action.GetQueryGroupResponse; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.action.RestResponseListener; + +import java.io.IOException; +import java.util.List; + +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * Rest action to get a QueryGroup0 + * + * @opensearch.api + */ +public class RestGetQueryGroupAction extends BaseRestHandler { + + /** + * Constructor for RestGetQueryGroupAction + */ + public RestGetQueryGroupAction() {} + + @Override + public String getName() { + return "get_query_group"; + } + + /** + * The list of {@link Route}s that this RestHandler is responsible for handling. + */ + @Override + public List routes() { + return List.of(new Route(GET, "_query_group/{name}"), new Route(GET, "_query_group/")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String name = request.param("name"); + GetQueryGroupRequest getQueryGroupRequest = new GetQueryGroupRequest(name); + return channel -> client.execute(GetQueryGroupAction.INSTANCE, getQueryGroupRequest, getQueryGroupResponse(channel)); + } + + private RestResponseListener getQueryGroupResponse(final RestChannel channel) { + return new RestResponseListener<>(channel) { + @Override + public RestResponse buildResponse(final GetQueryGroupResponse response) throws Exception { + return new BytesRestResponse(RestStatus.OK, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS)); + } + }; + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/package-info.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/package-info.java new file mode 100644 index 0000000000000..6d468b26c5649 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Base Package of CRUD API of QueryGroup + */ +package org.opensearch.plugin.wlm.action.rest; diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/Persistable.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/Persistable.java new file mode 100644 index 0000000000000..eae172fe6f3f9 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/Persistable.java @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action.service; + +import org.opensearch.core.action.ActionListener; +import org.opensearch.plugin.wlm.action.GetQueryGroupResponse; + +/** + * This interface defines the key APIs for implementing QueruGroup persistence + */ +public interface Persistable { + /** + * fetch the QueryGroup in a durable storage + * @param name + * @param listener + */ + void get(String name, ActionListener listener); +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceService.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceService.java new file mode 100644 index 0000000000000..5d481dc87d8bc --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceService.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler.ThrottlingKey; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.plugin.wlm.action.GetQueryGroupResponse; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.DoubleAdder; + +import static org.opensearch.search.query_group.QueryGroupServiceSettings.MAX_QUERY_GROUP_COUNT; + +/** + * This class defines the functions for QueryGroup persistence + */ +public class QueryGroupPersistenceService implements Persistable { + private static final Logger logger = LogManager.getLogger(QueryGroupPersistenceService.class); + private final ClusterService clusterService; + private static final String SOURCE = "query-group-persistence-service"; + private static final String CREATE_QUERY_GROUP_THROTTLING_KEY = "create-query-group"; + private static final String UPDATE_QUERY_GROUP_THROTTLING_KEY = "update-query-group"; + private static final String DELETE_QUERY_GROUP_THROTTLING_KEY = "delete-query-group"; + private final AtomicInteger inflightCreateQueryGroupRequestCount; + private final Map inflightResourceLimitValues; + private volatile int maxQueryGroupCount; + final ThrottlingKey createQueryGroupThrottlingKey; + final ThrottlingKey updateQueryGroupThrottlingKey; + final ThrottlingKey deleteQueryGroupThrottlingKey; + + /** + * Constructor for QueryGroupPersistenceService + * + * @param clusterService {@link ClusterService} - The cluster service to be used by QueryGroupPersistenceService + * @param settings {@link Settings} - The settings to be used by QueryGroupPersistenceService + * @param clusterSettings {@link ClusterSettings} - The cluster settings to be used by QueryGroupPersistenceService + */ + @Inject + public QueryGroupPersistenceService( + final ClusterService clusterService, + final Settings settings, + final ClusterSettings clusterSettings + ) { + this.clusterService = clusterService; + this.createQueryGroupThrottlingKey = clusterService.registerClusterManagerTask(CREATE_QUERY_GROUP_THROTTLING_KEY, true); + this.deleteQueryGroupThrottlingKey = clusterService.registerClusterManagerTask(DELETE_QUERY_GROUP_THROTTLING_KEY, true); + this.updateQueryGroupThrottlingKey = clusterService.registerClusterManagerTask(UPDATE_QUERY_GROUP_THROTTLING_KEY, true); + maxQueryGroupCount = MAX_QUERY_GROUP_COUNT.get(settings); + clusterSettings.addSettingsUpdateConsumer(MAX_QUERY_GROUP_COUNT, this::setMaxQueryGroupCount); + inflightCreateQueryGroupRequestCount = new AtomicInteger(); + inflightResourceLimitValues = new HashMap<>(); + } + + /** + * Set maxQueryGroupCount to be newMaxQueryGroupCount + * @param newMaxQueryGroupCount - the max number of QueryGroup allowed + */ + public void setMaxQueryGroupCount(int newMaxQueryGroupCount) { + if (newMaxQueryGroupCount < 0) { + throw new IllegalArgumentException("node.query_group.max_count can't be negative"); + } + this.maxQueryGroupCount = newMaxQueryGroupCount; + } + + @Override + public void get(String name, ActionListener listener) { + ClusterState currentState = clusterService.state(); + List resultGroups = getFromClusterStateMetadata(name, currentState); + if (resultGroups.isEmpty() && name != null && !name.isEmpty()) { + logger.warn("No QueryGroup exists with the provided name: {}", name); + Exception e = new RuntimeException("No QueryGroup exists with the provided name: " + name); + listener.onFailure(e); + return; + } + GetQueryGroupResponse response = new GetQueryGroupResponse(resultGroups, RestStatus.OK); + listener.onResponse(response); + } + + List getFromClusterStateMetadata(String name, ClusterState currentState) { + Map currentGroups = currentState.getMetadata().queryGroups(); + if (name == null || name.isEmpty()) { + return new ArrayList<>(currentGroups.values()); + } + List resultGroups = new ArrayList<>(); + for (QueryGroup group : currentGroups.values()) { + if (group.getName().equals(name)) { + resultGroups.add(group); + break; + } + } + return resultGroups; + } + + /** + * inflightCreateQueryGroupRequestCount getter + */ + public AtomicInteger getInflightCreateQueryGroupRequestCount() { + return inflightCreateQueryGroupRequestCount; + } + + /** + * inflightResourceLimitValues getter + */ + public Map getInflightResourceLimitValues() { + return inflightResourceLimitValues; + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/package-info.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/package-info.java new file mode 100644 index 0000000000000..e70ac3afb81b5 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package for the service classes for QueryGroup CRUD operations + */ +package org.opensearch.plugin.wlm.action.service; diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/GetQueryGroupRequestTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/GetQueryGroupRequestTests.java new file mode 100644 index 0000000000000..107c4d085ff46 --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/GetQueryGroupRequestTests.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +public class GetQueryGroupRequestTests extends OpenSearchTestCase { + + public void testSerialization() throws IOException { + GetQueryGroupRequest request = new GetQueryGroupRequest(QueryGroupTestUtils.NAME_ONE); + assertEquals(QueryGroupTestUtils.NAME_ONE, request.getName()); + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + GetQueryGroupRequest otherRequest = new GetQueryGroupRequest(streamInput); + assertEquals(request.getName(), otherRequest.getName()); + } + + public void testSerializationWithNull() throws IOException { + GetQueryGroupRequest request = new GetQueryGroupRequest((String) null); + assertNull(request.getName()); + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + GetQueryGroupRequest otherRequest = new GetQueryGroupRequest(streamInput); + assertEquals(request.getName(), otherRequest.getName()); + } +} diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/GetQueryGroupResponseTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/GetQueryGroupResponseTests.java new file mode 100644 index 0000000000000..1d706362377c6 --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/GetQueryGroupResponseTests.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class GetQueryGroupResponseTests extends OpenSearchTestCase { + + public void testSerializationSingleQueryGroup() throws IOException { + List list = new ArrayList<>(); + list.add(QueryGroupTestUtils.queryGroupOne); + GetQueryGroupResponse response = new GetQueryGroupResponse(list, RestStatus.OK); + assertEquals(response.getQueryGroups(), list); + + BytesStreamOutput out = new BytesStreamOutput(); + response.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + + GetQueryGroupResponse otherResponse = new GetQueryGroupResponse(streamInput); + assertEquals(response.getRestStatus(), otherResponse.getRestStatus()); + QueryGroupTestUtils.compareQueryGroups(response.getQueryGroups(), otherResponse.getQueryGroups()); + } + + public void testSerializationMultipleQueryGroup() throws IOException { + GetQueryGroupResponse response = new GetQueryGroupResponse(QueryGroupTestUtils.queryGroupList(), RestStatus.OK); + assertEquals(response.getQueryGroups(), QueryGroupTestUtils.queryGroupList()); + + BytesStreamOutput out = new BytesStreamOutput(); + response.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + + GetQueryGroupResponse otherResponse = new GetQueryGroupResponse(streamInput); + assertEquals(response.getRestStatus(), otherResponse.getRestStatus()); + assertEquals(2, otherResponse.getQueryGroups().size()); + QueryGroupTestUtils.compareQueryGroups(response.getQueryGroups(), otherResponse.getQueryGroups()); + } + + public void testSerializationNull() throws IOException { + List list = new ArrayList<>(); + GetQueryGroupResponse response = new GetQueryGroupResponse(list, RestStatus.OK); + assertEquals(response.getQueryGroups(), list); + + BytesStreamOutput out = new BytesStreamOutput(); + response.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + + GetQueryGroupResponse otherResponse = new GetQueryGroupResponse(streamInput); + assertEquals(response.getRestStatus(), otherResponse.getRestStatus()); + assertEquals(0, otherResponse.getQueryGroups().size()); + } +} diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/QueryGroupTestUtils.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/QueryGroupTestUtils.java new file mode 100644 index 0000000000000..ef167a1589115 --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/QueryGroupTestUtils.java @@ -0,0 +1,116 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.plugin.wlm.action.service.QueryGroupPersistenceService; +import org.opensearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.DoubleAdder; + +import static org.opensearch.cluster.metadata.QueryGroup.builder; +import static org.opensearch.search.ResourceType.fromName; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +public class QueryGroupTestUtils { + public static final String NAME_ONE = "query_group_one"; + public static final String NAME_TWO = "query_group_two"; + public static final String _ID_ONE = "AgfUO5Ja9yfsYlONlYi3TQ=="; + public static final String _ID_TWO = "G5iIqHy4g7eK1qIAAAAIH53=1"; + public static final String NAME_NONE_EXISTED = "query_group_none_existed"; + public static final String MEMORY_STRING = "memory"; + public static final String MONITOR_STRING = "monitor"; + public static final long TIMESTAMP_ONE = 4513232413L; + public static final long TIMESTAMP_TWO = 4513232415L; + public static final QueryGroup queryGroupOne = builder().name(NAME_ONE) + ._id(_ID_ONE) + .mode(MONITOR_STRING) + .resourceLimits(Map.of(fromName(MEMORY_STRING), 0.3)) + .updatedAt(TIMESTAMP_ONE) + .build(); + + public static final QueryGroup queryGroupTwo = builder().name(NAME_TWO) + ._id(_ID_TWO) + .mode(MONITOR_STRING) + .resourceLimits(Map.of(fromName(MEMORY_STRING), 0.6)) + .updatedAt(TIMESTAMP_TWO) + .build(); + + public static final Map queryGroupMap = Map.of(NAME_ONE, queryGroupOne, NAME_TWO, queryGroupTwo); + + public static List queryGroupList() { + List list = new ArrayList<>(); + list.add(queryGroupOne); + list.add(queryGroupTwo); + return list; + } + + public static ClusterState clusterState() { + final Metadata metadata = Metadata.builder().queryGroups(Map.of(NAME_ONE, queryGroupOne, NAME_TWO, queryGroupTwo)).build(); + return ClusterState.builder(new ClusterName("_name")).metadata(metadata).build(); + } + + public static Settings settings() { + return Settings.builder().build(); + } + + public static ClusterSettings clusterSettings() { + return new ClusterSettings(settings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + } + + public static QueryGroupPersistenceService queryGroupPersistenceService() { + ClusterService clusterService = new ClusterService(settings(), clusterSettings(), mock(ThreadPool.class)); + return new QueryGroupPersistenceService(clusterService, settings(), clusterSettings()); + } + + public static List preparePersistenceServiceSetup(Map queryGroups) { + Metadata metadata = Metadata.builder().queryGroups(queryGroups).build(); + Settings settings = Settings.builder().build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = new ClusterService(settings, clusterSettings, mock(ThreadPool.class)); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).metadata(metadata).build(); + QueryGroupPersistenceService queryGroupPersistenceService = new QueryGroupPersistenceService( + clusterService, + settings, + clusterSettings + ); + return List.of(queryGroupPersistenceService, clusterState); + } + + public static void compareQueryGroups(List listOne, List listTwo) { + assertEquals(listOne.size(), listTwo.size()); + listOne.sort(Comparator.comparing(QueryGroup::getName)); + listTwo.sort(Comparator.comparing(QueryGroup::getName)); + for (int i = 0; i < listOne.size(); i++) { + assertTrue(listOne.get(i).equals(listTwo.get(i))); + } + } + + public static void assertInflightValuesAreZero(QueryGroupPersistenceService queryGroupPersistenceService) { + assertEquals(0, queryGroupPersistenceService.getInflightCreateQueryGroupRequestCount().get()); + Map inflightResourceMap = queryGroupPersistenceService.getInflightResourceLimitValues(); + if (inflightResourceMap != null) { + for (String resourceName : inflightResourceMap.keySet()) { + assertEquals(0, inflightResourceMap.get(resourceName).intValue()); + } + } + } +} diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceServiceTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceServiceTests.java new file mode 100644 index 0000000000000..2861f42d4cd40 --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceServiceTests.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action.service; + +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.NAME_NONE_EXISTED; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.NAME_ONE; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.NAME_TWO; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.assertInflightValuesAreZero; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.clusterState; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.compareQueryGroups; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.queryGroupList; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.queryGroupOne; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.queryGroupPersistenceService; +import static org.mockito.Mockito.mock; + +public class QueryGroupPersistenceServiceTests extends OpenSearchTestCase { + + public void testGetSingleQueryGroup() { + List groups = queryGroupPersistenceService().getFromClusterStateMetadata(NAME_ONE, clusterState()); + assertEquals(1, groups.size()); + QueryGroup queryGroup = groups.get(0); + List listOne = new ArrayList<>(); + List listTwo = new ArrayList<>(); + listOne.add(queryGroupOne); + listTwo.add(queryGroup); + compareQueryGroups(listOne, listTwo); + assertInflightValuesAreZero(queryGroupPersistenceService()); + } + + public void testGetAllQueryGroups() { + assertEquals(2, clusterState().metadata().queryGroups().size()); + List res = queryGroupPersistenceService().getFromClusterStateMetadata(null, clusterState()); + assertEquals(2, res.size()); + Set currentNAME = res.stream().map(QueryGroup::getName).collect(Collectors.toSet()); + assertTrue(currentNAME.contains(NAME_ONE)); + assertTrue(currentNAME.contains(NAME_TWO)); + compareQueryGroups(queryGroupList(), res); + assertInflightValuesAreZero(queryGroupPersistenceService()); + } + + public void testGetZeroQueryGroups() { + Settings settings = Settings.builder().build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + QueryGroupPersistenceService queryGroupPersistenceService = new QueryGroupPersistenceService( + mock(ClusterService.class), + settings, + clusterSettings + ); + List res = queryGroupPersistenceService.getFromClusterStateMetadata(NAME_NONE_EXISTED, clusterState()); + assertEquals(0, res.size()); + assertInflightValuesAreZero(queryGroupPersistenceService()); + } + + public void testGetNonExistedQueryGroups() { + List groups = queryGroupPersistenceService().getFromClusterStateMetadata(NAME_NONE_EXISTED, clusterState()); + assertEquals(0, groups.size()); + assertInflightValuesAreZero(queryGroupPersistenceService()); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java index 2a54f6444ffda..d1589bce0cdc8 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java @@ -844,6 +844,12 @@ public Map views() { return Optional.ofNullable((ViewMetadata) this.custom(ViewMetadata.TYPE)).map(ViewMetadata::views).orElse(Collections.emptyMap()); } + public Map queryGroups() { + return Optional.ofNullable((QueryGroupMetadata) this.custom(QueryGroupMetadata.TYPE)) + .map(QueryGroupMetadata::queryGroups) + .orElse(Collections.emptyMap()); + } + public DecommissionAttributeMetadata decommissionAttributeMetadata() { return custom(DecommissionAttributeMetadata.TYPE); } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 5dcf23ae52294..9ee69c1428d83 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -158,6 +158,7 @@ import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; import org.opensearch.search.backpressure.settings.SearchTaskSettings; import org.opensearch.search.fetch.subphase.highlight.FastVectorHighlighter; +import org.opensearch.search.query_group.QueryGroupServiceSettings; import org.opensearch.snapshots.InternalSnapshotsInfoService; import org.opensearch.snapshots.SnapshotsService; import org.opensearch.tasks.TaskCancellationMonitoringSettings; @@ -758,7 +759,12 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING, // Composite index settings - CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING + CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING, + + // QueryGroup settings + QueryGroupServiceSettings.MAX_QUERY_GROUP_COUNT, + QueryGroupServiceSettings.NODE_LEVEL_REJECTION_THRESHOLD, + QueryGroupServiceSettings.NODE_LEVEL_CANCELLATION_THRESHOLD ) ) ); diff --git a/server/src/main/java/org/opensearch/search/query_group/QueryGroupServiceSettings.java b/server/src/main/java/org/opensearch/search/query_group/QueryGroupServiceSettings.java new file mode 100644 index 0000000000000..7f6e4955cf22f --- /dev/null +++ b/server/src/main/java/org/opensearch/search/query_group/QueryGroupServiceSettings.java @@ -0,0 +1,198 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.query_group; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; + +/** + * Main class to declare the QueryGroup feature related settings + */ +public class QueryGroupServiceSettings { + private static final Long DEFAULT_RUN_INTERVAL_MILLIS = 1000l; + private static final Double DEFAULT_NODE_LEVEL_REJECTION_THRESHOLD = 0.8; + private static final Double DEFAULT_NODE_LEVEL_CANCELLATION_THRESHOLD = 0.9; + /** + * default max queryGroup count on any node at any given point in time + */ + public static final int DEFAULT_MAX_QUERY_GROUP_COUNT_VALUE = 100; + + public static final String QUERY_GROUP_COUNT_SETTING_NAME = "node.query_group.max_count"; + public static final double NODE_LEVEL_CANCELLATION_THRESHOLD_MAX_VALUE = 0.95; + public static final double NODE_LEVEL_REJECTION_THRESHOLD_MAX_VALUE = 0.90; + + private TimeValue runIntervalMillis; + private Double nodeLevelMemoryCancellationThreshold; + private Double nodeLevelMemoryRejectionThreshold; + private volatile int maxQueryGroupCount; + /** + * max QueryGroup count setting + */ + public static final Setting MAX_QUERY_GROUP_COUNT = Setting.intSetting( + QUERY_GROUP_COUNT_SETTING_NAME, + DEFAULT_MAX_QUERY_GROUP_COUNT_VALUE, + 0, + (newVal) -> { + if (newVal > 100 || newVal < 1) throw new IllegalArgumentException( + QUERY_GROUP_COUNT_SETTING_NAME + " should be in range [1-100]" + ); + }, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + /** + * Setting name for default QueryGroup count + */ + public static final String SERVICE_RUN_INTERVAL_MILLIS_SETTING_NAME = "query_group.service.run_interval_millis"; + /** + * Setting to control the run interval of QSB service + */ + private static final Setting QUERY_GROUP_RUN_INTERVAL_SETTING = Setting.longSetting( + SERVICE_RUN_INTERVAL_MILLIS_SETTING_NAME, + DEFAULT_RUN_INTERVAL_MILLIS, + 1, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Setting name for node level rejection threshold for QSB + */ + public static final String NODE_REJECTION_THRESHOLD_SETTING_NAME = "query_group.node.rejection_threshold"; + /** + * Setting to control the rejection threshold + */ + public static final Setting NODE_LEVEL_REJECTION_THRESHOLD = Setting.doubleSetting( + NODE_REJECTION_THRESHOLD_SETTING_NAME, + DEFAULT_NODE_LEVEL_REJECTION_THRESHOLD, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + /** + * Setting name for node level cancellation threshold + */ + public static final String NODE_CANCELLATION_THRESHOLD_SETTING_NAME = "query_group.node.cancellation_threshold"; + /** + * Setting name for node level cancellation threshold + */ + public static final Setting NODE_LEVEL_CANCELLATION_THRESHOLD = Setting.doubleSetting( + NODE_CANCELLATION_THRESHOLD_SETTING_NAME, + DEFAULT_NODE_LEVEL_CANCELLATION_THRESHOLD, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * QueryGroup service settings constructor + * @param settings + * @param clusterSettings + */ + public QueryGroupServiceSettings(Settings settings, ClusterSettings clusterSettings) { + runIntervalMillis = new TimeValue(QUERY_GROUP_RUN_INTERVAL_SETTING.get(settings)); + nodeLevelMemoryCancellationThreshold = NODE_LEVEL_CANCELLATION_THRESHOLD.get(settings); + nodeLevelMemoryRejectionThreshold = NODE_LEVEL_REJECTION_THRESHOLD.get(settings); + maxQueryGroupCount = MAX_QUERY_GROUP_COUNT.get(settings); + + ensureRejectionThresholdIsLessThanCancellation(nodeLevelMemoryRejectionThreshold, nodeLevelMemoryCancellationThreshold); + + clusterSettings.addSettingsUpdateConsumer(MAX_QUERY_GROUP_COUNT, this::setMaxQueryGroupCount); + clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_CANCELLATION_THRESHOLD, this::setNodeLevelMemoryCancellationThreshold); + clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_REJECTION_THRESHOLD, this::setNodeLevelMemoryRejectionThreshold); + } + + /** + * Method to get runInterval for QSB + * @return runInterval in milliseconds for QSB Service + */ + public TimeValue getRunIntervalMillis() { + return runIntervalMillis; + } + + /** + * Method to set the new QueryGroup count + * @param newMaxQueryGroupCount is the new maxQueryGroupCount per node + */ + public void setMaxQueryGroupCount(int newMaxQueryGroupCount) { + if (newMaxQueryGroupCount < 0) { + throw new IllegalArgumentException("node.node.query_group.max_count can't be negative"); + } + this.maxQueryGroupCount = newMaxQueryGroupCount; + } + + /** + * Method to get the node level cancellation threshold + * @return current node level cancellation threshold + */ + public Double getNodeLevelMemoryCancellationThreshold() { + return nodeLevelMemoryCancellationThreshold; + } + + /** + * Method to set the node level cancellation threshold + * @param nodeLevelMemoryCancellationThreshold sets the new node level cancellation threshold + * @throws IllegalArgumentException if the value is > 0.95 and cancellation < rejection threshold + */ + public void setNodeLevelMemoryCancellationThreshold(Double nodeLevelMemoryCancellationThreshold) { + if (Double.compare(nodeLevelMemoryCancellationThreshold, NODE_LEVEL_CANCELLATION_THRESHOLD_MAX_VALUE) > 0) { + throw new IllegalArgumentException( + NODE_CANCELLATION_THRESHOLD_SETTING_NAME + " value should not be greater than 0.95 as it pose a threat of node drop" + ); + } + + ensureRejectionThresholdIsLessThanCancellation(nodeLevelMemoryRejectionThreshold, nodeLevelMemoryCancellationThreshold); + + this.nodeLevelMemoryCancellationThreshold = nodeLevelMemoryCancellationThreshold; + } + + /** + * Method to get the node level rejection threshold + * @return the current node level rejection threshold + */ + public Double getNodeLevelMemoryRejectionThreshold() { + return nodeLevelMemoryRejectionThreshold; + } + + /** + * Method to set the node level rejection threshold + * @param nodeLevelMemoryRejectionThreshold sets the new rejection threshold + * @throws IllegalArgumentException if rejection > 0.90 and rejection < cancellation threshold + */ + public void setNodeLevelMemoryRejectionThreshold(Double nodeLevelMemoryRejectionThreshold) { + if (Double.compare(nodeLevelMemoryRejectionThreshold, NODE_LEVEL_REJECTION_THRESHOLD_MAX_VALUE) > 0) { + throw new IllegalArgumentException( + NODE_REJECTION_THRESHOLD_SETTING_NAME + " value not be greater than 0.90 as it pose a threat of node drop" + ); + } + + ensureRejectionThresholdIsLessThanCancellation(nodeLevelMemoryRejectionThreshold, nodeLevelMemoryCancellationThreshold); + + this.nodeLevelMemoryRejectionThreshold = nodeLevelMemoryRejectionThreshold; + } + + private void ensureRejectionThresholdIsLessThanCancellation( + Double nodeLevelMemoryRejectionThreshold, + Double nodeLevelMemoryCancellationThreshold + ) { + if (Double.compare(nodeLevelMemoryCancellationThreshold, nodeLevelMemoryRejectionThreshold) < 0) { + throw new IllegalArgumentException( + NODE_CANCELLATION_THRESHOLD_SETTING_NAME + " value should not be less than " + NODE_REJECTION_THRESHOLD_SETTING_NAME + ); + } + } + + /** + * Method to get the current QueryGroup count + * @return the current max QueryGroup count + */ + public int getMaxQueryGroupCount() { + return maxQueryGroupCount; + } +} diff --git a/server/src/main/java/org/opensearch/search/query_group/package-info.java b/server/src/main/java/org/opensearch/search/query_group/package-info.java new file mode 100644 index 0000000000000..00b68b0d3306c --- /dev/null +++ b/server/src/main/java/org/opensearch/search/query_group/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * QueryGroup related artifacts + */ +package org.opensearch.search.query_group;