From eaa27186a70442ebb56fcd4eb665185846e71ace Mon Sep 17 00:00:00 2001 From: Ruirui Zhang Date: Tue, 3 Sep 2024 18:36:19 -0700 Subject: [PATCH] Add Update QueryGroup API Logic (#14775) Signed-off-by: Ruirui Zhang * Add Update QueryGroup API Logic Signed-off-by: Ruirui Zhang * append to changlog Signed-off-by: Ruirui Zhang * add javadoc Signed-off-by: Ruirui Zhang * rebase Signed-off-by: Ruirui Zhang * address comments Signed-off-by: Ruirui Zhang * address comments Signed-off-by: Ruirui Zhang * fix UT Signed-off-by: Ruirui Zhang * adress comments Signed-off-by: Ruirui Zhang * address comments Signed-off-by: Ruirui Zhang (cherry picked from commit 0753461986cd5746bf7b7ed2ba153cc65cea9a59) --- CHANGELOG.md | 1 + .../plugin/wlm/WorkloadManagementPlugin.java | 13 +- .../wlm/action/CreateQueryGroupRequest.java | 4 +- .../TransportUpdateQueryGroupAction.java | 51 ++++ .../wlm/action/UpdateQueryGroupAction.java | 36 +++ .../wlm/action/UpdateQueryGroupRequest.java | 83 +++++++ .../wlm/action/UpdateQueryGroupResponse.java | 74 ++++++ .../wlm/rest/RestGetQueryGroupAction.java | 2 +- .../wlm/rest/RestUpdateQueryGroupAction.java | 72 ++++++ .../service/QueryGroupPersistenceService.java | 139 ++++++++--- .../plugin/wlm/QueryGroupTestUtils.java | 40 +++- .../action/CreateQueryGroupRequestTests.java | 2 +- .../action/CreateQueryGroupResponseTests.java | 6 +- .../action/GetQueryGroupResponseTests.java | 16 +- .../wlm/action/QueryGroupActionTestUtils.java | 17 ++ .../action/UpdateQueryGroupRequestTests.java | 97 ++++++++ .../action/UpdateQueryGroupResponseTests.java | 67 ++++++ .../QueryGroupPersistenceServiceTests.java | 182 +++++++++++++- .../api/update_query_group_context.json | 23 ++ .../rest-api-spec/test/wlm/10_query_group.yml | 42 ++++ .../cluster/metadata/QueryGroup.java | 165 +++++-------- .../wlm/MutableQueryGroupFragment.java | 225 ++++++++++++++++++ .../metadata/QueryGroupMetadataTests.java | 6 +- .../cluster/metadata/QueryGroupTests.java | 64 +++-- .../wlm/MutableQueryGroupFragmentTests.java | 66 +++++ 25 files changed, 1292 insertions(+), 201 deletions(-) create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportUpdateQueryGroupAction.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupAction.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupResponse.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestUpdateQueryGroupAction.java create mode 100644 plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/QueryGroupActionTestUtils.java create mode 100644 plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequestTests.java create mode 100644 plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupResponseTests.java create mode 100644 plugins/workload-management/src/yamlRestTest/resources/rest-api-spec/api/update_query_group_context.json create mode 100644 server/src/main/java/org/opensearch/wlm/MutableQueryGroupFragment.java create mode 100644 server/src/test/java/org/opensearch/wlm/MutableQueryGroupFragmentTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 18ab4c09c56a3..78364c9d73cad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add `rangeQuery` and `regexpQuery` for `constant_keyword` field type ([#14711](https://github.com/opensearch-project/OpenSearch/pull/14711)) - Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054)) - [Workload Management] Add Get QueryGroup API Logic ([14709](https://github.com/opensearch-project/OpenSearch/pull/14709)) +- [Workload Management] Add Update QueryGroup API Logic ([#14775](https://github.com/opensearch-project/OpenSearch/pull/14775)) - [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897)) - Support filtering on a large list encoded by bitmap ([#14774](https://github.com/opensearch-project/OpenSearch/pull/14774)) - Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153)) diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java index 64f510fa1db67..c86490552f2f2 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java @@ -24,9 +24,12 @@ import org.opensearch.plugin.wlm.action.TransportCreateQueryGroupAction; import org.opensearch.plugin.wlm.action.TransportDeleteQueryGroupAction; import org.opensearch.plugin.wlm.action.TransportGetQueryGroupAction; +import org.opensearch.plugin.wlm.action.TransportUpdateQueryGroupAction; +import org.opensearch.plugin.wlm.action.UpdateQueryGroupAction; import org.opensearch.plugin.wlm.rest.RestCreateQueryGroupAction; import org.opensearch.plugin.wlm.rest.RestDeleteQueryGroupAction; import org.opensearch.plugin.wlm.rest.RestGetQueryGroupAction; +import org.opensearch.plugin.wlm.rest.RestUpdateQueryGroupAction; import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; @@ -52,7 +55,8 @@ public WorkloadManagementPlugin() {} return List.of( new ActionPlugin.ActionHandler<>(CreateQueryGroupAction.INSTANCE, TransportCreateQueryGroupAction.class), new ActionPlugin.ActionHandler<>(GetQueryGroupAction.INSTANCE, TransportGetQueryGroupAction.class), - new ActionPlugin.ActionHandler<>(DeleteQueryGroupAction.INSTANCE, TransportDeleteQueryGroupAction.class) + new ActionPlugin.ActionHandler<>(DeleteQueryGroupAction.INSTANCE, TransportDeleteQueryGroupAction.class), + new ActionPlugin.ActionHandler<>(UpdateQueryGroupAction.INSTANCE, TransportUpdateQueryGroupAction.class) ); } @@ -66,7 +70,12 @@ public List getRestHandlers( IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster ) { - return List.of(new RestCreateQueryGroupAction(), new RestGetQueryGroupAction(), new RestDeleteQueryGroupAction()); + return List.of( + new RestCreateQueryGroupAction(), + new RestGetQueryGroupAction(), + new RestDeleteQueryGroupAction(), + new RestUpdateQueryGroupAction() + ); } @Override diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequest.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequest.java index ff6422be36885..d92283391dd3b 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequest.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequest.java @@ -40,7 +40,7 @@ public class CreateQueryGroupRequest extends ActionRequest { * Constructor for CreateQueryGroupRequest * @param queryGroup - A {@link QueryGroup} object */ - public CreateQueryGroupRequest(QueryGroup queryGroup) { + CreateQueryGroupRequest(QueryGroup queryGroup) { this.queryGroup = queryGroup; } @@ -48,7 +48,7 @@ public CreateQueryGroupRequest(QueryGroup queryGroup) { * Constructor for CreateQueryGroupRequest * @param in - A {@link StreamInput} object */ - public CreateQueryGroupRequest(StreamInput in) throws IOException { + CreateQueryGroupRequest(StreamInput in) throws IOException { super(in); queryGroup = new QueryGroup(in); } diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportUpdateQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportUpdateQueryGroupAction.java new file mode 100644 index 0000000000000..a6aa2da8fdc08 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportUpdateQueryGroupAction.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +/** + * Transport action to update QueryGroup + * + * @opensearch.experimental + */ +public class TransportUpdateQueryGroupAction extends HandledTransportAction { + + private final QueryGroupPersistenceService queryGroupPersistenceService; + + /** + * Constructor for TransportUpdateQueryGroupAction + * + * @param actionName - action name + * @param transportService - a {@link TransportService} object + * @param actionFilters - a {@link ActionFilters} object + * @param queryGroupPersistenceService - a {@link QueryGroupPersistenceService} object + */ + @Inject + public TransportUpdateQueryGroupAction( + String actionName, + TransportService transportService, + ActionFilters actionFilters, + QueryGroupPersistenceService queryGroupPersistenceService + ) { + super(UpdateQueryGroupAction.NAME, transportService, actionFilters, UpdateQueryGroupRequest::new); + this.queryGroupPersistenceService = queryGroupPersistenceService; + } + + @Override + protected void doExecute(Task task, UpdateQueryGroupRequest request, ActionListener listener) { + queryGroupPersistenceService.updateInClusterStateMetadata(request, listener); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupAction.java new file mode 100644 index 0000000000000..ff472f206131c --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupAction.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.action.ActionType; + +/** + * Transport action to update QueryGroup + * + * @opensearch.experimental + */ +public class UpdateQueryGroupAction extends ActionType { + + /** + * An instance of UpdateQueryGroupAction + */ + public static final UpdateQueryGroupAction INSTANCE = new UpdateQueryGroupAction(); + + /** + * Name for UpdateQueryGroupAction + */ + public static final String NAME = "cluster:admin/opensearch/wlm/query_group/_update"; + + /** + * Default constructor + */ + private UpdateQueryGroupAction() { + super(NAME, UpdateQueryGroupResponse::new); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java new file mode 100644 index 0000000000000..048b599f095fd --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java @@ -0,0 +1,83 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.wlm.MutableQueryGroupFragment; + +import java.io.IOException; + +/** + * A request for update QueryGroup + * + * @opensearch.experimental + */ +public class UpdateQueryGroupRequest extends ActionRequest { + private final String name; + private final MutableQueryGroupFragment mutableQueryGroupFragment; + + /** + * Constructor for UpdateQueryGroupRequest + * @param name - QueryGroup name for UpdateQueryGroupRequest + * @param mutableQueryGroupFragment - MutableQueryGroupFragment for UpdateQueryGroupRequest + */ + UpdateQueryGroupRequest(String name, MutableQueryGroupFragment mutableQueryGroupFragment) { + this.name = name; + this.mutableQueryGroupFragment = mutableQueryGroupFragment; + } + + /** + * Constructor for UpdateQueryGroupRequest + * @param in - A {@link StreamInput} object + */ + UpdateQueryGroupRequest(StreamInput in) throws IOException { + this(in.readString(), new MutableQueryGroupFragment(in)); + } + + /** + * Generate a UpdateQueryGroupRequest from XContent + * @param parser - A {@link XContentParser} object + * @param name - name of the QueryGroup to be updated + */ + public static UpdateQueryGroupRequest fromXContent(XContentParser parser, String name) throws IOException { + QueryGroup.Builder builder = QueryGroup.Builder.fromXContent(parser); + return new UpdateQueryGroupRequest(name, builder.getMutableQueryGroupFragment()); + } + + @Override + public ActionRequestValidationException validate() { + QueryGroup.validateName(name); + return null; + } + + /** + * name getter + */ + public String getName() { + return name; + } + + /** + * mutableQueryGroupFragment getter + */ + public MutableQueryGroupFragment getmMutableQueryGroupFragment() { + return mutableQueryGroupFragment; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + mutableQueryGroupFragment.writeTo(out); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupResponse.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupResponse.java new file mode 100644 index 0000000000000..9071f52ecb5a7 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupResponse.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Response for the update API for QueryGroup + * + * @opensearch.experimental + */ +public class UpdateQueryGroupResponse extends ActionResponse implements ToXContent, ToXContentObject { + private final QueryGroup queryGroup; + private final RestStatus restStatus; + + /** + * Constructor for UpdateQueryGroupResponse + * @param queryGroup - the QueryGroup to be updated + * @param restStatus - the rest status for the response + */ + public UpdateQueryGroupResponse(final QueryGroup queryGroup, RestStatus restStatus) { + this.queryGroup = queryGroup; + this.restStatus = restStatus; + } + + /** + * Constructor for UpdateQueryGroupResponse + * @param in - a {@link StreamInput} object + */ + public UpdateQueryGroupResponse(StreamInput in) throws IOException { + queryGroup = new QueryGroup(in); + restStatus = RestStatus.readFrom(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + queryGroup.writeTo(out); + RestStatus.writeTo(out, restStatus); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return queryGroup.toXContent(builder, params); + } + + /** + * queryGroup getter + */ + public QueryGroup getQueryGroup() { + return queryGroup; + } + + /** + * restStatus getter + */ + public RestStatus getRestStatus() { + return restStatus; + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestGetQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestGetQueryGroupAction.java index c250bd2979e98..c87973e113138 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestGetQueryGroupAction.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestGetQueryGroupAction.java @@ -27,7 +27,7 @@ import static org.opensearch.rest.RestRequest.Method.GET; /** - * Rest action to get a QueryGroup0 + * Rest action to get a QueryGroup * * @opensearch.experimental */ diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestUpdateQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestUpdateQueryGroupAction.java new file mode 100644 index 0000000000000..55b4bc5a295c4 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestUpdateQueryGroupAction.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.rest; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.plugin.wlm.action.UpdateQueryGroupAction; +import org.opensearch.plugin.wlm.action.UpdateQueryGroupRequest; +import org.opensearch.plugin.wlm.action.UpdateQueryGroupResponse; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.action.RestResponseListener; + +import java.io.IOException; +import java.util.List; + +import static org.opensearch.rest.RestRequest.Method.POST; +import static org.opensearch.rest.RestRequest.Method.PUT; + +/** + * Rest action to update a QueryGroup + * + * @opensearch.experimental + */ +public class RestUpdateQueryGroupAction extends BaseRestHandler { + + /** + * Constructor for RestUpdateQueryGroupAction + */ + public RestUpdateQueryGroupAction() {} + + @Override + public String getName() { + return "update_query_group"; + } + + /** + * The list of {@link Route}s that this RestHandler is responsible for handling. + */ + @Override + public List routes() { + return List.of(new Route(POST, "_wlm/query_group/{name}"), new Route(PUT, "_wlm/query_group/{name}")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + try (XContentParser parser = request.contentParser()) { + UpdateQueryGroupRequest updateQueryGroupRequest = UpdateQueryGroupRequest.fromXContent(parser, request.param("name")); + return channel -> client.execute(UpdateQueryGroupAction.INSTANCE, updateQueryGroupRequest, updateQueryGroupResponse(channel)); + } + } + + private RestResponseListener updateQueryGroupResponse(final RestChannel channel) { + return new RestResponseListener<>(channel) { + @Override + public RestResponse buildResponse(final UpdateQueryGroupResponse response) throws Exception { + return new BytesRestResponse(RestStatus.OK, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS)); + } + }; + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java index 7561a2f6f99c3..f9332ff3022dc 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java @@ -29,6 +29,9 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.plugin.wlm.action.CreateQueryGroupResponse; import org.opensearch.plugin.wlm.action.DeleteQueryGroupRequest; +import org.opensearch.plugin.wlm.action.UpdateQueryGroupRequest; +import org.opensearch.plugin.wlm.action.UpdateQueryGroupResponse; +import org.opensearch.wlm.MutableQueryGroupFragment; import org.opensearch.wlm.ResourceType; import java.util.Collection; @@ -37,6 +40,8 @@ import java.util.Optional; import java.util.stream.Collectors; +import static org.opensearch.cluster.metadata.QueryGroup.updateExistingQueryGroup; + /** * This class defines the functions for QueryGroup persistence */ @@ -44,6 +49,7 @@ public class QueryGroupPersistenceService { static final String SOURCE = "query-group-persistence-service"; private static final String CREATE_QUERY_GROUP_THROTTLING_KEY = "create-query-group"; private static final String DELETE_QUERY_GROUP_THROTTLING_KEY = "delete-query-group"; + private static final String UPDATE_QUERY_GROUP_THROTTLING_KEY = "update-query-group"; private static final Logger logger = LogManager.getLogger(QueryGroupPersistenceService.class); /** * max QueryGroup count setting name @@ -72,6 +78,7 @@ public class QueryGroupPersistenceService { private volatile int maxQueryGroupCount; final ThrottlingKey createQueryGroupThrottlingKey; final ThrottlingKey deleteQueryGroupThrottlingKey; + final ThrottlingKey updateQueryGroupThrottlingKey; /** * Constructor for QueryGroupPersistenceService @@ -89,6 +96,7 @@ public QueryGroupPersistenceService( this.clusterService = clusterService; this.createQueryGroupThrottlingKey = clusterService.registerClusterManagerTask(CREATE_QUERY_GROUP_THROTTLING_KEY, true); this.deleteQueryGroupThrottlingKey = clusterService.registerClusterManagerTask(DELETE_QUERY_GROUP_THROTTLING_KEY, true); + this.updateQueryGroupThrottlingKey = clusterService.registerClusterManagerTask(UPDATE_QUERY_GROUP_THROTTLING_KEY, true); setMaxQueryGroupCount(MAX_QUERY_GROUP_COUNT.get(settings)); clusterSettings.addSettingsUpdateConsumer(MAX_QUERY_GROUP_COUNT, this::setMaxQueryGroupCount); } @@ -169,39 +177,13 @@ ClusterState saveQueryGroupInClusterState(final QueryGroup queryGroup, final Clu } // check if there's any resource allocation that exceed limit of 1.0 - Map totalUsageMap = calculateTotalUsage(existingQueryGroups, queryGroup); - for (ResourceType resourceType : queryGroup.getResourceLimits().keySet()) { - if (totalUsageMap.get(resourceType) > 1) { - logger.warn("Total resource allocation for {} will go above the max limit of 1.0.", resourceType.getName()); - throw new IllegalArgumentException( - "Total resource allocation for " + resourceType.getName() + " will go above the max limit of 1.0." - ); - } - } + validateTotalUsage(existingQueryGroups, groupName, queryGroup.getResourceLimits()); return ClusterState.builder(currentClusterState) .metadata(Metadata.builder(currentClusterState.metadata()).put(queryGroup).build()) .build(); } - /** - * This method calculates the existing total usage of the all the resource limits - * @param existingQueryGroups - existing QueryGroups in the system - * @param queryGroup - the QueryGroup we're creating or updating - */ - private Map calculateTotalUsage(Map existingQueryGroups, QueryGroup queryGroup) { - final Map map = new EnumMap<>(ResourceType.class); - map.putAll(queryGroup.getResourceLimits()); - for (QueryGroup currGroup : existingQueryGroups.values()) { - if (!currGroup.getName().equals(queryGroup.getName())) { - for (ResourceType resourceType : queryGroup.getResourceLimits().keySet()) { - map.compute(resourceType, (k, v) -> v + currGroup.getResourceLimits().get(resourceType)); - } - } - } - return map; - } - /** * Get the QueryGroups with the specified name from cluster state * @param name - the QueryGroup name we are getting @@ -264,10 +246,113 @@ ClusterState deleteQueryGroupInClusterState(final String name, final ClusterStat return ClusterState.builder(currentClusterState).metadata(Metadata.builder(metadata).remove(queryGroupToRemove).build()).build(); } + /** + * Modify cluster state to update the QueryGroup + * @param toUpdateGroup {@link QueryGroup} - the QueryGroup that we want to update + * @param listener - ActionListener for UpdateQueryGroupResponse + */ + public void updateInClusterStateMetadata(UpdateQueryGroupRequest toUpdateGroup, ActionListener listener) { + clusterService.submitStateUpdateTask(SOURCE, new ClusterStateUpdateTask(Priority.NORMAL) { + @Override + public ClusterState execute(ClusterState currentState) { + return updateQueryGroupInClusterState(toUpdateGroup, currentState); + } + + @Override + public ThrottlingKey getClusterManagerThrottlingKey() { + return updateQueryGroupThrottlingKey; + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn("Failed to update QueryGroup due to error: {}, for source: {}", e.getMessage(), source); + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + String name = toUpdateGroup.getName(); + Optional findUpdatedGroup = newState.metadata() + .queryGroups() + .values() + .stream() + .filter(group -> group.getName().equals(name)) + .findFirst(); + assert findUpdatedGroup.isPresent(); + QueryGroup updatedGroup = findUpdatedGroup.get(); + UpdateQueryGroupResponse response = new UpdateQueryGroupResponse(updatedGroup, RestStatus.OK); + listener.onResponse(response); + } + }); + } + + /** + * Modify cluster state to update the existing QueryGroup + * @param updateQueryGroupRequest {@link QueryGroup} - the QueryGroup that we want to update + * @param currentState - current cluster state + */ + ClusterState updateQueryGroupInClusterState(UpdateQueryGroupRequest updateQueryGroupRequest, ClusterState currentState) { + final Metadata metadata = currentState.metadata(); + final Map existingGroups = currentState.metadata().queryGroups(); + String name = updateQueryGroupRequest.getName(); + MutableQueryGroupFragment mutableQueryGroupFragment = updateQueryGroupRequest.getmMutableQueryGroupFragment(); + + final QueryGroup existingGroup = existingGroups.values() + .stream() + .filter(group -> group.getName().equals(name)) + .findFirst() + .orElseThrow(() -> new ResourceNotFoundException("No QueryGroup exists with the provided name: " + name)); + + validateTotalUsage(existingGroups, name, mutableQueryGroupFragment.getResourceLimits()); + return ClusterState.builder(currentState) + .metadata( + Metadata.builder(metadata) + .remove(existingGroup) + .put(updateExistingQueryGroup(existingGroup, mutableQueryGroupFragment)) + .build() + ) + .build(); + } + + /** + * This method checks if there's any resource allocation that exceed limit of 1.0 + * @param existingQueryGroups - existing QueryGroups in the system + * @param resourceLimits - the QueryGroup we're creating or updating + */ + private void validateTotalUsage(Map existingQueryGroups, String name, Map resourceLimits) { + if (resourceLimits == null || resourceLimits.isEmpty()) { + return; + } + final Map totalUsage = new EnumMap<>(ResourceType.class); + totalUsage.putAll(resourceLimits); + for (QueryGroup currGroup : existingQueryGroups.values()) { + if (!currGroup.getName().equals(name)) { + for (ResourceType resourceType : resourceLimits.keySet()) { + totalUsage.compute(resourceType, (k, v) -> v + currGroup.getResourceLimits().getOrDefault(resourceType, 0.0)); + } + } + } + totalUsage.forEach((resourceType, total) -> { + if (total > 1.0) { + logger.warn("Total resource allocation for {} will go above the max limit of 1.0.", resourceType.getName()); + throw new IllegalArgumentException( + "Total resource allocation for " + resourceType.getName() + " will go above the max limit of 1.0." + ); + } + }); + } + /** * maxQueryGroupCount getter */ public int getMaxQueryGroupCount() { return maxQueryGroupCount; } + + /** + * clusterService getter + */ + public ClusterService getClusterService() { + return clusterService; + } } diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java index e165645775d5c..c6eb3140e943d 100644 --- a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java @@ -21,6 +21,8 @@ import org.opensearch.common.settings.Settings; import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.wlm.MutableQueryGroupFragment; +import org.opensearch.wlm.ResourceType; import java.util.ArrayList; import java.util.Collection; @@ -31,7 +33,6 @@ import java.util.Set; import static org.opensearch.cluster.metadata.QueryGroup.builder; -import static org.opensearch.wlm.ResourceType.fromName; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -43,21 +44,21 @@ public class QueryGroupTestUtils { public static final String _ID_ONE = "AgfUO5Ja9yfsYlONlYi3TQ=="; public static final String _ID_TWO = "G5iIqHy4g7eK1qIAAAAIH53=1"; public static final String NAME_NONE_EXISTED = "query_group_none_existed"; - public static final String MEMORY_STRING = "memory"; - public static final String MONITOR_STRING = "monitor"; public static final long TIMESTAMP_ONE = 4513232413L; public static final long TIMESTAMP_TWO = 4513232415L; public static final QueryGroup queryGroupOne = builder().name(NAME_ONE) ._id(_ID_ONE) - .mode(MONITOR_STRING) - .resourceLimits(Map.of(fromName(MEMORY_STRING), 0.3)) + .mutableQueryGroupFragment( + new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.3)) + ) .updatedAt(TIMESTAMP_ONE) .build(); public static final QueryGroup queryGroupTwo = builder().name(NAME_TWO) ._id(_ID_TWO) - .mode(MONITOR_STRING) - .resourceLimits(Map.of(fromName(MEMORY_STRING), 0.6)) + .mutableQueryGroupFragment( + new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.6)) + ) .updatedAt(TIMESTAMP_TWO) .build(); @@ -131,14 +132,35 @@ public static Tuple preparePersisten return new Tuple(queryGroupPersistenceService, clusterState); } - public static void assertEqualQueryGroups(Collection collectionOne, Collection collectionTwo) { + public static void assertEqualResourceLimits( + Map resourceLimitMapOne, + Map resourceLimitMapTwo + ) { + assertTrue(resourceLimitMapOne.keySet().containsAll(resourceLimitMapTwo.keySet())); + assertTrue(resourceLimitMapOne.values().containsAll(resourceLimitMapTwo.values())); + } + + public static void assertEqualQueryGroups( + Collection collectionOne, + Collection collectionTwo, + boolean assertUpdateAt + ) { assertEquals(collectionOne.size(), collectionTwo.size()); List listOne = new ArrayList<>(collectionOne); List listTwo = new ArrayList<>(collectionTwo); listOne.sort(Comparator.comparing(QueryGroup::getName)); listTwo.sort(Comparator.comparing(QueryGroup::getName)); for (int i = 0; i < listOne.size(); i++) { - assertTrue(listOne.get(i).equals(listTwo.get(i))); + if (assertUpdateAt) { + QueryGroup one = listOne.get(i); + QueryGroup two = listTwo.get(i); + assertEquals(one.getName(), two.getName()); + assertEquals(one.getResourceLimits(), two.getResourceLimits()); + assertEquals(one.getResiliencyMode(), two.getResiliencyMode()); + assertEquals(one.get_id(), two.get_id()); + } else { + assertEquals(listOne.get(i), listTwo.get(i)); + } } } } diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequestTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequestTests.java index b0fa96a46df80..dd9de4bf8fb1a 100644 --- a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequestTests.java +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequestTests.java @@ -35,6 +35,6 @@ public void testSerialization() throws IOException { List list2 = new ArrayList<>(); list1.add(queryGroupOne); list2.add(otherRequest.getQueryGroup()); - assertEqualQueryGroups(list1, list2); + assertEqualQueryGroups(list1, list2, false); } } diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupResponseTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupResponseTests.java index ecb9a6b2dc0d2..3a2ce215d21b5 100644 --- a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupResponseTests.java +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupResponseTests.java @@ -42,7 +42,7 @@ public void testSerialization() throws IOException { List listTwo = new ArrayList<>(); listOne.add(responseGroup); listTwo.add(otherResponseGroup); - QueryGroupTestUtils.assertEqualQueryGroups(listOne, listTwo); + QueryGroupTestUtils.assertEqualQueryGroups(listOne, listTwo, false); } /** @@ -56,10 +56,10 @@ public void testToXContentCreateQueryGroup() throws IOException { + " \"_id\" : \"AgfUO5Ja9yfsYlONlYi3TQ==\",\n" + " \"name\" : \"query_group_one\",\n" + " \"resiliency_mode\" : \"monitor\",\n" - + " \"updated_at\" : 4513232413,\n" + " \"resource_limits\" : {\n" + " \"memory\" : 0.3\n" - + " }\n" + + " },\n" + + " \"updated_at\" : 4513232413\n" + "}"; assertEquals(expected, actual); } diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/GetQueryGroupResponseTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/GetQueryGroupResponseTests.java index 774f4b2d8db52..1a2ac282d86a4 100644 --- a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/GetQueryGroupResponseTests.java +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/GetQueryGroupResponseTests.java @@ -41,7 +41,7 @@ public void testSerializationSingleQueryGroup() throws IOException { GetQueryGroupResponse otherResponse = new GetQueryGroupResponse(streamInput); assertEquals(response.getRestStatus(), otherResponse.getRestStatus()); - QueryGroupTestUtils.assertEqualQueryGroups(response.getQueryGroups(), otherResponse.getQueryGroups()); + QueryGroupTestUtils.assertEqualQueryGroups(response.getQueryGroups(), otherResponse.getQueryGroups(), false); } /** @@ -58,7 +58,7 @@ public void testSerializationMultipleQueryGroup() throws IOException { GetQueryGroupResponse otherResponse = new GetQueryGroupResponse(streamInput); assertEquals(response.getRestStatus(), otherResponse.getRestStatus()); assertEquals(2, otherResponse.getQueryGroups().size()); - QueryGroupTestUtils.assertEqualQueryGroups(response.getQueryGroups(), otherResponse.getQueryGroups()); + QueryGroupTestUtils.assertEqualQueryGroups(response.getQueryGroups(), otherResponse.getQueryGroups(), false); } /** @@ -93,10 +93,10 @@ public void testToXContentGetSingleQueryGroup() throws IOException { + " \"_id\" : \"AgfUO5Ja9yfsYlONlYi3TQ==\",\n" + " \"name\" : \"query_group_one\",\n" + " \"resiliency_mode\" : \"monitor\",\n" - + " \"updated_at\" : 4513232413,\n" + " \"resource_limits\" : {\n" + " \"memory\" : 0.3\n" - + " }\n" + + " },\n" + + " \"updated_at\" : 4513232413\n" + " }\n" + " ]\n" + "}"; @@ -119,19 +119,19 @@ public void testToXContentGetMultipleQueryGroup() throws IOException { + " \"_id\" : \"AgfUO5Ja9yfsYlONlYi3TQ==\",\n" + " \"name\" : \"query_group_one\",\n" + " \"resiliency_mode\" : \"monitor\",\n" - + " \"updated_at\" : 4513232413,\n" + " \"resource_limits\" : {\n" + " \"memory\" : 0.3\n" - + " }\n" + + " },\n" + + " \"updated_at\" : 4513232413\n" + " },\n" + " {\n" + " \"_id\" : \"G5iIqHy4g7eK1qIAAAAIH53=1\",\n" + " \"name\" : \"query_group_two\",\n" + " \"resiliency_mode\" : \"monitor\",\n" - + " \"updated_at\" : 4513232415,\n" + " \"resource_limits\" : {\n" + " \"memory\" : 0.6\n" - + " }\n" + + " },\n" + + " \"updated_at\" : 4513232415\n" + " }\n" + " ]\n" + "}"; diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/QueryGroupActionTestUtils.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/QueryGroupActionTestUtils.java new file mode 100644 index 0000000000000..08d128ca7ed59 --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/QueryGroupActionTestUtils.java @@ -0,0 +1,17 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.wlm.MutableQueryGroupFragment; + +public class QueryGroupActionTestUtils { + public static UpdateQueryGroupRequest updateQueryGroupRequest(String name, MutableQueryGroupFragment mutableQueryGroupFragment) { + return new UpdateQueryGroupRequest(name, mutableQueryGroupFragment); + } +} diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequestTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequestTests.java new file mode 100644 index 0000000000000..b99f079e81984 --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequestTests.java @@ -0,0 +1,97 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.wlm.MutableQueryGroupFragment; +import org.opensearch.wlm.MutableQueryGroupFragment.ResiliencyMode; +import org.opensearch.wlm.ResourceType; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.NAME_ONE; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupOne; + +public class UpdateQueryGroupRequestTests extends OpenSearchTestCase { + + /** + * Test case to verify the serialization and deserialization of UpdateQueryGroupRequest. + */ + public void testSerialization() throws IOException { + UpdateQueryGroupRequest request = new UpdateQueryGroupRequest(NAME_ONE, queryGroupOne.getMutableQueryGroupFragment()); + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + UpdateQueryGroupRequest otherRequest = new UpdateQueryGroupRequest(streamInput); + assertEquals(request.getName(), otherRequest.getName()); + assertEquals(request.getmMutableQueryGroupFragment(), otherRequest.getmMutableQueryGroupFragment()); + } + + /** + * Test case to verify the serialization and deserialization of UpdateQueryGroupRequest with only name field. + */ + public void testSerializationOnlyName() throws IOException { + UpdateQueryGroupRequest request = new UpdateQueryGroupRequest(NAME_ONE, new MutableQueryGroupFragment(null, new HashMap<>())); + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + UpdateQueryGroupRequest otherRequest = new UpdateQueryGroupRequest(streamInput); + assertEquals(request.getName(), otherRequest.getName()); + assertEquals(request.getmMutableQueryGroupFragment(), otherRequest.getmMutableQueryGroupFragment()); + } + + /** + * Test case to verify the serialization and deserialization of UpdateQueryGroupRequest with only resourceLimits field. + */ + public void testSerializationOnlyResourceLimit() throws IOException { + UpdateQueryGroupRequest request = new UpdateQueryGroupRequest( + NAME_ONE, + new MutableQueryGroupFragment(null, Map.of(ResourceType.MEMORY, 0.4)) + ); + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + UpdateQueryGroupRequest otherRequest = new UpdateQueryGroupRequest(streamInput); + assertEquals(request.getName(), otherRequest.getName()); + assertEquals(request.getmMutableQueryGroupFragment(), otherRequest.getmMutableQueryGroupFragment()); + } + + /** + * Tests invalid ResourceType. + */ + public void testInvalidResourceLimitList() { + assertThrows( + IllegalArgumentException.class, + () -> new UpdateQueryGroupRequest( + NAME_ONE, + new MutableQueryGroupFragment( + ResiliencyMode.MONITOR, + Map.of(ResourceType.MEMORY, 0.3, ResourceType.fromName("random"), 0.4) + ) + ) + ); + } + + /** + * Tests invalid resiliencyMode. + */ + public void testInvalidEnforcement() { + assertThrows( + IllegalArgumentException.class, + () -> new UpdateQueryGroupRequest( + NAME_ONE, + new MutableQueryGroupFragment(ResiliencyMode.fromName("random"), Map.of(ResourceType.fromName("memory"), 0.3)) + ) + ); + } +} diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupResponseTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupResponseTests.java new file mode 100644 index 0000000000000..a7ab4c6a682ef --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupResponseTests.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.wlm.QueryGroupTestUtils; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupOne; +import static org.mockito.Mockito.mock; + +public class UpdateQueryGroupResponseTests extends OpenSearchTestCase { + + /** + * Test case to verify the serialization and deserialization of UpdateQueryGroupResponse. + */ + public void testSerialization() throws IOException { + UpdateQueryGroupResponse response = new UpdateQueryGroupResponse(queryGroupOne, RestStatus.OK); + BytesStreamOutput out = new BytesStreamOutput(); + response.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + UpdateQueryGroupResponse otherResponse = new UpdateQueryGroupResponse(streamInput); + assertEquals(response.getRestStatus(), otherResponse.getRestStatus()); + QueryGroup responseGroup = response.getQueryGroup(); + QueryGroup otherResponseGroup = otherResponse.getQueryGroup(); + List list1 = new ArrayList<>(); + List list2 = new ArrayList<>(); + list1.add(responseGroup); + list2.add(otherResponseGroup); + QueryGroupTestUtils.assertEqualQueryGroups(list1, list2, false); + } + + /** + * Test case to verify the toXContent method of UpdateQueryGroupResponse. + */ + public void testToXContentUpdateSingleQueryGroup() throws IOException { + XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); + UpdateQueryGroupResponse otherResponse = new UpdateQueryGroupResponse(queryGroupOne, RestStatus.OK); + String actual = otherResponse.toXContent(builder, mock(ToXContent.Params.class)).toString(); + String expected = "{\n" + + " \"_id\" : \"AgfUO5Ja9yfsYlONlYi3TQ==\",\n" + + " \"name\" : \"query_group_one\",\n" + + " \"resiliency_mode\" : \"monitor\",\n" + + " \"resource_limits\" : {\n" + + " \"memory\" : 0.3\n" + + " },\n" + + " \"updated_at\" : 4513232413\n" + + "}"; + assertEquals(expected, actual); + } +} diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java index 5cb3d8fc6d11f..08b51fd46cfcf 100644 --- a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java @@ -24,8 +24,12 @@ import org.opensearch.plugin.wlm.QueryGroupTestUtils; import org.opensearch.plugin.wlm.action.CreateQueryGroupResponse; import org.opensearch.plugin.wlm.action.DeleteQueryGroupRequest; +import org.opensearch.plugin.wlm.action.UpdateQueryGroupRequest; +import org.opensearch.plugin.wlm.action.UpdateQueryGroupResponse; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.wlm.MutableQueryGroupFragment; +import org.opensearch.wlm.MutableQueryGroupFragment.ResiliencyMode; import org.opensearch.wlm.ResourceType; import java.util.ArrayList; @@ -33,14 +37,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import org.mockito.ArgumentCaptor; import static org.opensearch.cluster.metadata.QueryGroup.builder; -import static org.opensearch.plugin.wlm.QueryGroupTestUtils.MEMORY_STRING; -import static org.opensearch.plugin.wlm.QueryGroupTestUtils.MONITOR_STRING; import static org.opensearch.plugin.wlm.QueryGroupTestUtils.NAME_NONE_EXISTED; import static org.opensearch.plugin.wlm.QueryGroupTestUtils.NAME_ONE; import static org.opensearch.plugin.wlm.QueryGroupTestUtils.NAME_TWO; @@ -55,6 +58,7 @@ import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupOne; import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupPersistenceService; import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupTwo; +import static org.opensearch.plugin.wlm.action.QueryGroupActionTestUtils.updateQueryGroupRequest; import static org.opensearch.plugin.wlm.service.QueryGroupPersistenceService.QUERY_GROUP_COUNT_SETTING_NAME; import static org.opensearch.plugin.wlm.service.QueryGroupPersistenceService.SOURCE; import static org.mockito.ArgumentMatchers.any; @@ -83,7 +87,7 @@ public void testCreateQueryGroup() { List listTwo = new ArrayList<>(); listOne.add(queryGroupOne); listTwo.add(updatedGroupsMap.get(_ID_ONE)); - assertEqualQueryGroups(listOne, listTwo); + assertEqualQueryGroups(listOne, listTwo, false); } /** @@ -99,7 +103,7 @@ public void testCreateAnotherQueryGroup() { assertEquals(2, updatedGroups.size()); assertTrue(updatedGroups.containsKey(_ID_TWO)); Collection values = updatedGroups.values(); - assertEqualQueryGroups(queryGroupList(), new ArrayList<>(values)); + assertEqualQueryGroups(queryGroupList(), new ArrayList<>(values), false); } /** @@ -111,8 +115,7 @@ public void testCreateQueryGroupDuplicateName() { ClusterState clusterState = setup.v2(); QueryGroup toCreate = builder().name(NAME_ONE) ._id("W5iIqHyhgi4K1qIAAAAIHw==") - .mode(MONITOR_STRING) - .resourceLimits(Map.of(ResourceType.fromName(MEMORY_STRING), 0.3)) + .mutableQueryGroupFragment(new MutableQueryGroupFragment(ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.3))) .updatedAt(1690934400000L) .build(); assertThrows(RuntimeException.class, () -> queryGroupPersistenceService1.saveQueryGroupInClusterState(toCreate, clusterState)); @@ -126,8 +129,7 @@ public void testCreateQueryGroupOverflowAllocation() { Tuple setup = preparePersistenceServiceSetup(Map.of(_ID_TWO, queryGroupTwo)); QueryGroup toCreate = builder().name(NAME_ONE) ._id("W5iIqHyhgi4K1qIAAAAIHw==") - .mode(MONITOR_STRING) - .resourceLimits(Map.of(ResourceType.fromName(MEMORY_STRING), 0.41)) + .mutableQueryGroupFragment(new MutableQueryGroupFragment(ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.41))) .updatedAt(1690934400000L) .build(); @@ -143,8 +145,7 @@ public void testCreateQueryGroupOverflowAllocation() { public void testCreateQueryGroupOverflowCount() { QueryGroup toCreate = builder().name(NAME_NONE_EXISTED) ._id("W5iIqHyhgi4K1qIAAAAIHw==") - .mode(MONITOR_STRING) - .resourceLimits(Map.of(ResourceType.fromName(MEMORY_STRING), 0.5)) + .mutableQueryGroupFragment(new MutableQueryGroupFragment(ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.5))) .updatedAt(1690934400000L) .build(); Metadata metadata = Metadata.builder().queryGroups(Map.of(_ID_ONE, queryGroupOne, _ID_TWO, queryGroupTwo)).build(); @@ -267,7 +268,7 @@ public void testGetSingleQueryGroup() { List listTwo = new ArrayList<>(); listOne.add(QueryGroupTestUtils.queryGroupOne); listTwo.add(queryGroup); - QueryGroupTestUtils.assertEqualQueryGroups(listOne, listTwo); + QueryGroupTestUtils.assertEqualQueryGroups(listOne, listTwo, false); } /** @@ -281,7 +282,7 @@ public void testGetAllQueryGroups() { Set currentNAME = res.stream().map(QueryGroup::getName).collect(Collectors.toSet()); assertTrue(currentNAME.contains(QueryGroupTestUtils.NAME_ONE)); assertTrue(currentNAME.contains(QueryGroupTestUtils.NAME_TWO)); - QueryGroupTestUtils.assertEqualQueryGroups(QueryGroupTestUtils.queryGroupList(), res); + QueryGroupTestUtils.assertEqualQueryGroups(QueryGroupTestUtils.queryGroupList(), res, false); } /** @@ -316,7 +317,7 @@ public void testDeleteSingleQueryGroup() { assertEquals(1, afterDeletionGroups.size()); List oldQueryGroups = new ArrayList<>(); oldQueryGroups.add(queryGroupOne); - assertEqualQueryGroups(new ArrayList<>(afterDeletionGroups.values()), oldQueryGroups); + assertEqualQueryGroups(new ArrayList<>(afterDeletionGroups.values()), oldQueryGroups, false); } /** @@ -356,4 +357,159 @@ public void testDeleteInClusterStateMetadata() throws Exception { queryGroupPersistenceService.deleteInClusterStateMetadata(request, listener); verify(clusterService).submitStateUpdateTask(eq(SOURCE), any(AckedClusterStateUpdateTask.class)); } + + /** + * Tests updating a QueryGroup with all fields + */ + public void testUpdateQueryGroupAllFields() { + QueryGroup updated = builder().name(NAME_ONE) + ._id(_ID_ONE) + .mutableQueryGroupFragment(new MutableQueryGroupFragment(ResiliencyMode.ENFORCED, Map.of(ResourceType.MEMORY, 0.15))) + .updatedAt(1690934400000L) + .build(); + UpdateQueryGroupRequest updateQueryGroupRequest = updateQueryGroupRequest(NAME_ONE, updated.getMutableQueryGroupFragment()); + ClusterState newClusterState = queryGroupPersistenceService().updateQueryGroupInClusterState( + updateQueryGroupRequest, + clusterState() + ); + List updatedQueryGroups = new ArrayList<>(newClusterState.getMetadata().queryGroups().values()); + assertEquals(2, updatedQueryGroups.size()); + List expectedList = new ArrayList<>(); + expectedList.add(queryGroupTwo); + expectedList.add(updated); + assertEqualQueryGroups(expectedList, updatedQueryGroups, true); + } + + /** + * Tests updating a QueryGroup with only updated resourceLimits + */ + public void testUpdateQueryGroupResourceLimitsOnly() { + QueryGroup updated = builder().name(NAME_ONE) + ._id(_ID_ONE) + .mutableQueryGroupFragment(new MutableQueryGroupFragment(ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.15))) + .updatedAt(1690934400000L) + .build(); + UpdateQueryGroupRequest updateQueryGroupRequest = updateQueryGroupRequest(NAME_ONE, updated.getMutableQueryGroupFragment()); + ClusterState newClusterState = queryGroupPersistenceService().updateQueryGroupInClusterState( + updateQueryGroupRequest, + clusterState() + ); + List updatedQueryGroups = new ArrayList<>(newClusterState.getMetadata().queryGroups().values()); + assertEquals(2, updatedQueryGroups.size()); + Optional findUpdatedGroupOne = newClusterState.metadata() + .queryGroups() + .values() + .stream() + .filter(group -> group.getName().equals(NAME_ONE)) + .findFirst(); + Optional findUpdatedGroupTwo = newClusterState.metadata() + .queryGroups() + .values() + .stream() + .filter(group -> group.getName().equals(NAME_TWO)) + .findFirst(); + assertTrue(findUpdatedGroupOne.isPresent()); + assertTrue(findUpdatedGroupTwo.isPresent()); + List list1 = new ArrayList<>(); + list1.add(updated); + List list2 = new ArrayList<>(); + list2.add(findUpdatedGroupOne.get()); + assertEqualQueryGroups(list1, list2, true); + } + + /** + * Tests updating a QueryGroup with invalid name + */ + public void testUpdateQueryGroupNonExistedName() { + QueryGroupPersistenceService queryGroupPersistenceService = queryGroupPersistenceService(); + UpdateQueryGroupRequest updateQueryGroupRequest = updateQueryGroupRequest( + NAME_NONE_EXISTED, + new MutableQueryGroupFragment(ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.15)) + ); + assertThrows( + RuntimeException.class, + () -> queryGroupPersistenceService.updateQueryGroupInClusterState(updateQueryGroupRequest, clusterState()) + ); + List updatedQueryGroups = new ArrayList<>( + queryGroupPersistenceService.getClusterService().state().metadata().queryGroups().values() + ); + assertEquals(2, updatedQueryGroups.size()); + List expectedList = new ArrayList<>(); + expectedList.add(queryGroupTwo); + expectedList.add(queryGroupOne); + assertEqualQueryGroups(expectedList, updatedQueryGroups, true); + } + + /** + * Tests UpdateInClusterStateMetadata function + */ + public void testUpdateInClusterStateMetadata() { + ClusterService clusterService = mock(ClusterService.class); + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + QueryGroupPersistenceService queryGroupPersistenceService = new QueryGroupPersistenceService( + clusterService, + QueryGroupTestUtils.settings(), + clusterSettings() + ); + queryGroupPersistenceService.updateInClusterStateMetadata(null, listener); + verify(clusterService).submitStateUpdateTask(eq(SOURCE), any()); + } + + /** + * Tests UpdateInClusterStateMetadata function with inner functions + */ + public void testUpdateInClusterStateMetadataInner() { + ClusterService clusterService = mock(ClusterService.class); + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + QueryGroupPersistenceService queryGroupPersistenceService = new QueryGroupPersistenceService( + clusterService, + QueryGroupTestUtils.settings(), + clusterSettings() + ); + UpdateQueryGroupRequest updateQueryGroupRequest = updateQueryGroupRequest( + NAME_TWO, + new MutableQueryGroupFragment(ResiliencyMode.SOFT, new HashMap<>()) + ); + ArgumentCaptor captor = ArgumentCaptor.forClass(ClusterStateUpdateTask.class); + queryGroupPersistenceService.updateInClusterStateMetadata(updateQueryGroupRequest, listener); + verify(clusterService, times(1)).submitStateUpdateTask(eq(SOURCE), captor.capture()); + ClusterStateUpdateTask capturedTask = captor.getValue(); + assertEquals(queryGroupPersistenceService.updateQueryGroupThrottlingKey, capturedTask.getClusterManagerThrottlingKey()); + + doAnswer(invocation -> { + ClusterStateUpdateTask task = invocation.getArgument(1); + task.clusterStateProcessed(SOURCE, clusterState(), clusterState()); + return null; + }).when(clusterService).submitStateUpdateTask(anyString(), any()); + queryGroupPersistenceService.updateInClusterStateMetadata(updateQueryGroupRequest, listener); + verify(listener).onResponse(any(UpdateQueryGroupResponse.class)); + } + + /** + * Tests UpdateInClusterStateMetadata function with failure + */ + public void testUpdateInClusterStateMetadataFailure() { + ClusterService clusterService = mock(ClusterService.class); + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + QueryGroupPersistenceService queryGroupPersistenceService = new QueryGroupPersistenceService( + clusterService, + QueryGroupTestUtils.settings(), + clusterSettings() + ); + UpdateQueryGroupRequest updateQueryGroupRequest = updateQueryGroupRequest( + NAME_TWO, + new MutableQueryGroupFragment(ResiliencyMode.SOFT, new HashMap<>()) + ); + doAnswer(invocation -> { + ClusterStateUpdateTask task = invocation.getArgument(1); + Exception exception = new RuntimeException("Test Exception"); + task.onFailure(SOURCE, exception); + return null; + }).when(clusterService).submitStateUpdateTask(anyString(), any()); + queryGroupPersistenceService.updateInClusterStateMetadata(updateQueryGroupRequest, listener); + verify(listener).onFailure(any(RuntimeException.class)); + } } diff --git a/plugins/workload-management/src/yamlRestTest/resources/rest-api-spec/api/update_query_group_context.json b/plugins/workload-management/src/yamlRestTest/resources/rest-api-spec/api/update_query_group_context.json new file mode 100644 index 0000000000000..fbfa2dde292ee --- /dev/null +++ b/plugins/workload-management/src/yamlRestTest/resources/rest-api-spec/api/update_query_group_context.json @@ -0,0 +1,23 @@ +{ + "update_query_group_context": { + "stability": "experimental", + "url": { + "paths": [ + { + "path": "/_wlm/query_group/{name}", + "methods": ["PUT", "POST"], + "parts": { + "name": { + "type": "string", + "description": "QueryGroup name" + } + } + } + ] + }, + "params":{}, + "body":{ + "description":"The updated QueryGroup schema" + } + } +} diff --git a/plugins/workload-management/src/yamlRestTest/resources/rest-api-spec/test/wlm/10_query_group.yml b/plugins/workload-management/src/yamlRestTest/resources/rest-api-spec/test/wlm/10_query_group.yml index a00314986a5cf..40ec665351094 100644 --- a/plugins/workload-management/src/yamlRestTest/resources/rest-api-spec/test/wlm/10_query_group.yml +++ b/plugins/workload-management/src/yamlRestTest/resources/rest-api-spec/test/wlm/10_query_group.yml @@ -29,6 +29,48 @@ - match: { query_groups.0.resource_limits.cpu: 0.4 } - match: { query_groups.0.resource_limits.memory: 0.2 } + - do: + update_query_group_context: + name: "analytics" + body: + { + "resiliency_mode": "monitor", + "resource_limits": { + "cpu": 0.42, + "memory": 0.22 + } + } + + - match: { name: "analytics" } + - match: { resiliency_mode: "monitor" } + - match: { resource_limits.cpu: 0.42 } + - match: { resource_limits.memory: 0.22 } + + - do: + catch: /resource_not_found_exception/ + update_query_group_context: + name: "analytics5" + body: + { + "resiliency_mode": "monitor", + "resource_limits": { + "cpu": 0.42, + "memory": 0.22 + } + } + + - do: + catch: /illegal_argument_exception/ + update_query_group_context: + name: "analytics" + body: + { + "resiliency_mode": "monitor", + "resource_limits": { + "cpu": 1.1 + } + } + - do: catch: /illegal_argument_exception/ create_query_group_context: diff --git a/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java b/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java index a971aa58940ba..dcd96dceb4bf1 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java @@ -17,6 +17,8 @@ import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.wlm.MutableQueryGroupFragment; +import org.opensearch.wlm.MutableQueryGroupFragment.ResiliencyMode; import org.opensearch.wlm.ResourceType; import org.joda.time.Instant; @@ -24,13 +26,15 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; /** * Class to define the QueryGroup schema * { * "_id": "fafjafjkaf9ag8a9ga9g7ag0aagaga", * "resource_limits": { - * "memory": 0.4 + * "memory": 0.4, + * "cpu": 0.2 * }, * "resiliency_mode": "enforced", * "name": "analytics", @@ -42,44 +46,39 @@ public class QueryGroup extends AbstractDiffable implements ToXConte public static final String _ID_STRING = "_id"; public static final String NAME_STRING = "name"; - public static final String RESILIENCY_MODE_STRING = "resiliency_mode"; public static final String UPDATED_AT_STRING = "updated_at"; - public static final String RESOURCE_LIMITS_STRING = "resource_limits"; private static final int MAX_CHARS_ALLOWED_IN_NAME = 50; private final String name; private final String _id; - private final ResiliencyMode resiliencyMode; // It is an epoch in millis private final long updatedAtInMillis; - private final Map resourceLimits; + private final MutableQueryGroupFragment mutableQueryGroupFragment; - public QueryGroup(String name, ResiliencyMode resiliencyMode, Map resourceLimits) { - this(name, UUIDs.randomBase64UUID(), resiliencyMode, resourceLimits, Instant.now().getMillis()); + public QueryGroup(String name, MutableQueryGroupFragment mutableQueryGroupFragment) { + this(name, UUIDs.randomBase64UUID(), mutableQueryGroupFragment, Instant.now().getMillis()); } - public QueryGroup(String name, String _id, ResiliencyMode resiliencyMode, Map resourceLimits, long updatedAt) { + public QueryGroup(String name, String _id, MutableQueryGroupFragment mutableQueryGroupFragment, long updatedAt) { Objects.requireNonNull(name, "QueryGroup.name can't be null"); - Objects.requireNonNull(resourceLimits, "QueryGroup.resourceLimits can't be null"); - Objects.requireNonNull(resiliencyMode, "QueryGroup.resiliencyMode can't be null"); + Objects.requireNonNull(mutableQueryGroupFragment.getResourceLimits(), "QueryGroup.resourceLimits can't be null"); + Objects.requireNonNull(mutableQueryGroupFragment.getResiliencyMode(), "QueryGroup.resiliencyMode can't be null"); Objects.requireNonNull(_id, "QueryGroup._id can't be null"); validateName(name); - if (resourceLimits.isEmpty()) { + if (mutableQueryGroupFragment.getResourceLimits().isEmpty()) { throw new IllegalArgumentException("QueryGroup.resourceLimits should at least have 1 resource limit"); } - validateResourceLimits(resourceLimits); if (!isValid(updatedAt)) { throw new IllegalArgumentException("QueryGroup.updatedAtInMillis is not a valid epoch"); } this.name = name; this._id = _id; - this.resiliencyMode = resiliencyMode; - this.resourceLimits = resourceLimits; + this.mutableQueryGroupFragment = mutableQueryGroupFragment; this.updatedAtInMillis = updatedAt; } - private static boolean isValid(long updatedAt) { + public static boolean isValid(long updatedAt) { long minValidTimestamp = Instant.ofEpochMilli(0L).getMillis(); // Use Instant.now() to get the current time in seconds since epoch @@ -90,12 +89,22 @@ private static boolean isValid(long updatedAt) { } public QueryGroup(StreamInput in) throws IOException { - this( - in.readString(), - in.readString(), - ResiliencyMode.fromName(in.readString()), - in.readMap((i) -> ResourceType.fromName(i.readString()), StreamInput::readDouble), - in.readLong() + this(in.readString(), in.readString(), new MutableQueryGroupFragment(in), in.readLong()); + } + + public static QueryGroup updateExistingQueryGroup(QueryGroup existingGroup, MutableQueryGroupFragment mutableQueryGroupFragment) { + final Map updatedResourceLimits = new HashMap<>(existingGroup.getResourceLimits()); + final Map mutableFragmentResourceLimits = mutableQueryGroupFragment.getResourceLimits(); + if (mutableFragmentResourceLimits != null && !mutableFragmentResourceLimits.isEmpty()) { + updatedResourceLimits.putAll(mutableFragmentResourceLimits); + } + final ResiliencyMode mode = Optional.ofNullable(mutableQueryGroupFragment.getResiliencyMode()) + .orElse(existingGroup.getResiliencyMode()); + return new QueryGroup( + existingGroup.getName(), + existingGroup.get_id(), + new MutableQueryGroupFragment(mode, updatedResourceLimits), + Instant.now().getMillis() ); } @@ -103,8 +112,7 @@ public QueryGroup(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeString(name); out.writeString(_id); - out.writeString(resiliencyMode.getName()); - out.writeMap(resourceLimits, ResourceType::writeTo, StreamOutput::writeDouble); + mutableQueryGroupFragment.writeTo(out); out.writeLong(updatedAtInMillis); } @@ -114,34 +122,15 @@ public static void validateName(String name) { } } - private void validateResourceLimits(Map resourceLimits) { - for (Map.Entry resource : resourceLimits.entrySet()) { - Double threshold = resource.getValue(); - Objects.requireNonNull(resource.getKey(), "resourceName can't be null"); - Objects.requireNonNull(threshold, "resource limit threshold for" + resource.getKey().getName() + " : can't be null"); - - if (Double.compare(threshold, 0.0) <= 0 || Double.compare(threshold, 1.0) > 0) { - throw new IllegalArgumentException("resource value should be greater than 0 and less or equal to 1.0"); - } - } - } - @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject(); builder.field(_ID_STRING, _id); builder.field(NAME_STRING, name); - builder.field(RESILIENCY_MODE_STRING, resiliencyMode.getName()); - builder.field(UPDATED_AT_STRING, updatedAtInMillis); - // write resource limits - builder.startObject(RESOURCE_LIMITS_STRING); - for (ResourceType resourceType : ResourceType.values()) { - if (resourceLimits.containsKey(resourceType)) { - builder.field(resourceType.getName(), resourceLimits.get(resourceType)); - } + for (String fieldName : MutableQueryGroupFragment.acceptedFieldNames) { + mutableQueryGroupFragment.writeField(builder, fieldName); } - builder.endObject(); - + builder.field(UPDATED_AT_STRING, updatedAtInMillis); builder.endObject(); return builder; } @@ -160,27 +149,30 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; QueryGroup that = (QueryGroup) o; return Objects.equals(name, that.name) - && Objects.equals(resiliencyMode, that.resiliencyMode) - && Objects.equals(resourceLimits, that.resourceLimits) + && Objects.equals(mutableQueryGroupFragment, that.mutableQueryGroupFragment) && Objects.equals(_id, that._id) && updatedAtInMillis == that.updatedAtInMillis; } @Override public int hashCode() { - return Objects.hash(name, resourceLimits, updatedAtInMillis, _id); + return Objects.hash(name, mutableQueryGroupFragment, updatedAtInMillis, _id); } public String getName() { return name; } + public MutableQueryGroupFragment getMutableQueryGroupFragment() { + return mutableQueryGroupFragment; + } + public ResiliencyMode getResiliencyMode() { - return resiliencyMode; + return getMutableQueryGroupFragment().getResiliencyMode(); } public Map getResourceLimits() { - return resourceLimits; + return getMutableQueryGroupFragment().getResourceLimits(); } public String get_id() { @@ -199,37 +191,6 @@ public static Builder builder() { return new Builder(); } - /** - * This enum models the different QueryGroup resiliency modes - * SOFT - means that this query group can consume more than query group resource limits if node is not in duress - * ENFORCED - means that it will never breach the assigned limits and will cancel as soon as the limits are breached - * MONITOR - it will not cause any cancellation but just log the eligible task cancellations - */ - @ExperimentalApi - public enum ResiliencyMode { - SOFT("soft"), - ENFORCED("enforced"), - MONITOR("monitor"); - - private final String name; - - ResiliencyMode(String mode) { - this.name = mode; - } - - public String getName() { - return name; - } - - public static ResiliencyMode fromName(String s) { - for (ResiliencyMode mode : values()) { - if (mode.getName().equalsIgnoreCase(s)) return mode; - - } - throw new IllegalArgumentException("Invalid value for QueryGroupMode: " + s); - } - } - /** * Builder class for {@link QueryGroup} */ @@ -237,9 +198,8 @@ public static ResiliencyMode fromName(String s) { public static class Builder { private String name; private String _id; - private ResiliencyMode resiliencyMode; + private MutableQueryGroupFragment mutableQueryGroupFragment; private long updatedAt; - private Map resourceLimits; private Builder() {} @@ -257,8 +217,7 @@ public static Builder fromXContent(XContentParser parser) throws IOException { } String fieldName = ""; - // Map to hold resources - final Map resourceLimits = new HashMap<>(); + MutableQueryGroupFragment mutableQueryGroupFragment1 = new MutableQueryGroupFragment(); while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { fieldName = parser.currentName(); @@ -267,32 +226,21 @@ public static Builder fromXContent(XContentParser parser) throws IOException { builder._id(parser.text()); } else if (fieldName.equals(NAME_STRING)) { builder.name(parser.text()); - } else if (fieldName.equals(RESILIENCY_MODE_STRING)) { - builder.mode(parser.text()); + } else if (MutableQueryGroupFragment.shouldParse(fieldName)) { + mutableQueryGroupFragment1.parseField(parser, fieldName); } else if (fieldName.equals(UPDATED_AT_STRING)) { builder.updatedAt(parser.longValue()); } else { throw new IllegalArgumentException(fieldName + " is not a valid field in QueryGroup"); } } else if (token == XContentParser.Token.START_OBJECT) { - - if (!fieldName.equals(RESOURCE_LIMITS_STRING)) { - throw new IllegalArgumentException( - "QueryGroup.resourceLimits is an object and expected token was { " + " but found " + token - ); - } - - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - fieldName = parser.currentName(); - } else { - resourceLimits.put(ResourceType.fromName(fieldName), parser.doubleValue()); - } + if (!MutableQueryGroupFragment.shouldParse(fieldName)) { + throw new IllegalArgumentException(fieldName + " is not a valid object in QueryGroup"); } - + mutableQueryGroupFragment1.parseField(parser, fieldName); } } - return builder.resourceLimits(resourceLimits); + return builder.mutableQueryGroupFragment(mutableQueryGroupFragment1); } public Builder name(String name) { @@ -305,8 +253,8 @@ public Builder _id(String _id) { return this; } - public Builder mode(String mode) { - this.resiliencyMode = ResiliencyMode.fromName(mode); + public Builder mutableQueryGroupFragment(MutableQueryGroupFragment mutableQueryGroupFragment) { + this.mutableQueryGroupFragment = mutableQueryGroupFragment; return this; } @@ -315,13 +263,12 @@ public Builder updatedAt(long updatedAt) { return this; } - public Builder resourceLimits(Map resourceLimits) { - this.resourceLimits = resourceLimits; - return this; + public QueryGroup build() { + return new QueryGroup(name, _id, mutableQueryGroupFragment, updatedAt); } - public QueryGroup build() { - return new QueryGroup(name, _id, resiliencyMode, resourceLimits, updatedAt); + public MutableQueryGroupFragment getMutableQueryGroupFragment() { + return mutableQueryGroupFragment; } } } diff --git a/server/src/main/java/org/opensearch/wlm/MutableQueryGroupFragment.java b/server/src/main/java/org/opensearch/wlm/MutableQueryGroupFragment.java new file mode 100644 index 0000000000000..8ea240132fea2 --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/MutableQueryGroupFragment.java @@ -0,0 +1,225 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm; + +import org.opensearch.cluster.AbstractDiffable; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + +/** + * Class to hold the fields that can be updated in a QueryGroup. + */ +@ExperimentalApi +public class MutableQueryGroupFragment extends AbstractDiffable { + + public static final String RESILIENCY_MODE_STRING = "resiliency_mode"; + public static final String RESOURCE_LIMITS_STRING = "resource_limits"; + private ResiliencyMode resiliencyMode; + private Map resourceLimits; + + public static final List acceptedFieldNames = List.of(RESILIENCY_MODE_STRING, RESOURCE_LIMITS_STRING); + + public MutableQueryGroupFragment() {} + + public MutableQueryGroupFragment(ResiliencyMode resiliencyMode, Map resourceLimits) { + validateResourceLimits(resourceLimits); + this.resiliencyMode = resiliencyMode; + this.resourceLimits = resourceLimits; + } + + public MutableQueryGroupFragment(StreamInput in) throws IOException { + if (in.readBoolean()) { + resourceLimits = in.readMap((i) -> ResourceType.fromName(i.readString()), StreamInput::readDouble); + } else { + resourceLimits = new HashMap<>(); + } + String updatedResiliencyMode = in.readOptionalString(); + resiliencyMode = updatedResiliencyMode == null ? null : ResiliencyMode.fromName(updatedResiliencyMode); + } + + interface FieldParser { + T parseField(XContentParser parser) throws IOException; + } + + static class ResiliencyModeParser implements FieldParser { + public ResiliencyMode parseField(XContentParser parser) throws IOException { + return ResiliencyMode.fromName(parser.text()); + } + } + + static class ResourceLimitsParser implements FieldParser> { + public Map parseField(XContentParser parser) throws IOException { + String fieldName = ""; + XContentParser.Token token; + final Map resourceLimits = new HashMap<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else { + resourceLimits.put(ResourceType.fromName(fieldName), parser.doubleValue()); + } + } + return resourceLimits; + } + } + + static class FieldParserFactory { + static Optional> fieldParserFor(String fieldName) { + if (fieldName.equals(RESOURCE_LIMITS_STRING)) { + return Optional.of(new ResourceLimitsParser()); + } else if (fieldName.equals(RESILIENCY_MODE_STRING)) { + return Optional.of(new ResiliencyModeParser()); + } + return Optional.empty(); + } + } + + private final Map> toXContentMap = Map.of(RESILIENCY_MODE_STRING, (builder) -> { + try { + builder.field(RESILIENCY_MODE_STRING, resiliencyMode.getName()); + return null; + } catch (IOException e) { + throw new IllegalStateException("writing error encountered for the field " + RESILIENCY_MODE_STRING); + } + }, RESOURCE_LIMITS_STRING, (builder) -> { + try { + builder.startObject(RESOURCE_LIMITS_STRING); + for (ResourceType resourceType : ResourceType.values()) { + if (resourceLimits.containsKey(resourceType)) { + builder.field(resourceType.getName(), resourceLimits.get(resourceType)); + } + } + builder.endObject(); + return null; + } catch (IOException e) { + throw new IllegalStateException("writing error encountered for the field " + RESOURCE_LIMITS_STRING); + } + }); + + public static boolean shouldParse(String field) { + return FieldParserFactory.fieldParserFor(field).isPresent(); + } + + public void parseField(XContentParser parser, String field) { + FieldParserFactory.fieldParserFor(field).ifPresent(fieldParser -> { + try { + Object value = fieldParser.parseField(parser); + if (field.equals(RESILIENCY_MODE_STRING)) { + setResiliencyMode((ResiliencyMode) value); + } else if (field.equals(RESOURCE_LIMITS_STRING)) { + setResourceLimits((Map) value); + } + } catch (IOException e) { + throw new IllegalArgumentException("parsing error encountered for the field " + field); + } + }); + } + + public void writeField(XContentBuilder builder, String field) { + toXContentMap.get(field).apply(builder); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + if (resourceLimits == null || resourceLimits.isEmpty()) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeMap(resourceLimits, ResourceType::writeTo, StreamOutput::writeDouble); + } + out.writeOptionalString(resiliencyMode == null ? null : resiliencyMode.getName()); + } + + public static void validateResourceLimits(Map resourceLimits) { + if (resourceLimits == null) { + return; + } + for (Map.Entry resource : resourceLimits.entrySet()) { + Double threshold = resource.getValue(); + Objects.requireNonNull(resource.getKey(), "resourceName can't be null"); + Objects.requireNonNull(threshold, "resource limit threshold for" + resource.getKey().getName() + " : can't be null"); + + if (Double.compare(threshold, 0.0) <= 0 || Double.compare(threshold, 1.0) > 0) { + throw new IllegalArgumentException("resource value should be greater than 0 and less or equal to 1.0"); + } + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MutableQueryGroupFragment that = (MutableQueryGroupFragment) o; + return Objects.equals(resiliencyMode, that.resiliencyMode) && Objects.equals(resourceLimits, that.resourceLimits); + } + + @Override + public int hashCode() { + return Objects.hash(resiliencyMode, resourceLimits); + } + + public ResiliencyMode getResiliencyMode() { + return resiliencyMode; + } + + public Map getResourceLimits() { + return resourceLimits; + } + + /** + * This enum models the different QueryGroup resiliency modes + * SOFT - means that this query group can consume more than query group resource limits if node is not in duress + * ENFORCED - means that it will never breach the assigned limits and will cancel as soon as the limits are breached + * MONITOR - it will not cause any cancellation but just log the eligible task cancellations + */ + @ExperimentalApi + public enum ResiliencyMode { + SOFT("soft"), + ENFORCED("enforced"), + MONITOR("monitor"); + + private final String name; + + ResiliencyMode(String mode) { + this.name = mode; + } + + public String getName() { + return name; + } + + public static ResiliencyMode fromName(String s) { + for (ResiliencyMode mode : values()) { + if (mode.getName().equalsIgnoreCase(s)) return mode; + + } + throw new IllegalArgumentException("Invalid value for QueryGroupMode: " + s); + } + } + + public void setResiliencyMode(ResiliencyMode resiliencyMode) { + this.resiliencyMode = resiliencyMode; + } + + public void setResourceLimits(Map resourceLimits) { + validateResourceLimits(resourceLimits); + this.resourceLimits = resourceLimits; + } +} diff --git a/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupMetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupMetadataTests.java index f5e667de73d93..3f8d231ffb91e 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupMetadataTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupMetadataTests.java @@ -15,6 +15,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.test.AbstractDiffableSerializationTestCase; +import org.opensearch.wlm.MutableQueryGroupFragment; import org.opensearch.wlm.ResourceType; import java.io.IOException; @@ -33,8 +34,7 @@ public void testToXContent() throws IOException { new QueryGroup( "test", "ajakgakg983r92_4242", - QueryGroup.ResiliencyMode.ENFORCED, - Map.of(ResourceType.MEMORY, 0.5), + new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.MEMORY, 0.5)), updatedAt ) ) @@ -44,7 +44,7 @@ public void testToXContent() throws IOException { queryGroupMetadata.toXContent(builder, null); builder.endObject(); assertEquals( - "{\"ajakgakg983r92_4242\":{\"_id\":\"ajakgakg983r92_4242\",\"name\":\"test\",\"resiliency_mode\":\"enforced\",\"updated_at\":1720047207,\"resource_limits\":{\"memory\":0.5}}}", + "{\"ajakgakg983r92_4242\":{\"_id\":\"ajakgakg983r92_4242\",\"name\":\"test\",\"resiliency_mode\":\"enforced\",\"resource_limits\":{\"memory\":0.5},\"updated_at\":1720047207}}", builder.toString() ); } diff --git a/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupTests.java b/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupTests.java index f4d3e5ceb1784..ce1b1270fc94e 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupTests.java @@ -15,6 +15,8 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.test.AbstractSerializingTestCase; +import org.opensearch.wlm.MutableQueryGroupFragment; +import org.opensearch.wlm.MutableQueryGroupFragment.ResiliencyMode; import org.opensearch.wlm.ResourceType; import org.joda.time.Instant; @@ -26,20 +28,16 @@ public class QueryGroupTests extends AbstractSerializingTestCase { - private static final List allowedModes = List.of( - QueryGroup.ResiliencyMode.SOFT, - QueryGroup.ResiliencyMode.ENFORCED, - QueryGroup.ResiliencyMode.MONITOR - ); + private static final List allowedModes = List.of(ResiliencyMode.SOFT, ResiliencyMode.ENFORCED, ResiliencyMode.MONITOR); static QueryGroup createRandomQueryGroup(String _id) { String name = randomAlphaOfLength(10); Map resourceLimit = new HashMap<>(); resourceLimit.put(ResourceType.MEMORY, randomDoubleBetween(0.0, 0.80, false)); - return new QueryGroup(name, _id, randomMode(), resourceLimit, Instant.now().getMillis()); + return new QueryGroup(name, _id, new MutableQueryGroupFragment(randomMode(), resourceLimit), Instant.now().getMillis()); } - private static QueryGroup.ResiliencyMode randomMode() { + private static ResiliencyMode randomMode() { return allowedModes.get(randomIntBetween(0, allowedModes.size() - 1)); } @@ -74,37 +72,60 @@ protected QueryGroup createTestInstance() { public void testNullName() { assertThrows( NullPointerException.class, - () -> new QueryGroup(null, "_id", randomMode(), Collections.emptyMap(), Instant.now().getMillis()) + () -> new QueryGroup( + null, + "_id", + new MutableQueryGroupFragment(randomMode(), Collections.emptyMap()), + Instant.now().getMillis() + ) ); } public void testNullId() { assertThrows( NullPointerException.class, - () -> new QueryGroup("Dummy", null, randomMode(), Collections.emptyMap(), Instant.now().getMillis()) + () -> new QueryGroup( + "Dummy", + null, + new MutableQueryGroupFragment(randomMode(), Collections.emptyMap()), + Instant.now().getMillis() + ) ); } public void testNullResourceLimits() { - assertThrows(NullPointerException.class, () -> new QueryGroup("analytics", "_id", randomMode(), null, Instant.now().getMillis())); + assertThrows( + NullPointerException.class, + () -> new QueryGroup("analytics", "_id", new MutableQueryGroupFragment(randomMode(), null), Instant.now().getMillis()) + ); } public void testEmptyResourceLimits() { assertThrows( IllegalArgumentException.class, - () -> new QueryGroup("analytics", "_id", randomMode(), Collections.emptyMap(), Instant.now().getMillis()) + () -> new QueryGroup( + "analytics", + "_id", + new MutableQueryGroupFragment(randomMode(), Collections.emptyMap()), + Instant.now().getMillis() + ) ); } public void testIllegalQueryGroupMode() { assertThrows( NullPointerException.class, - () -> new QueryGroup("analytics", "_id", null, Map.of(ResourceType.MEMORY, 0.4), Instant.now().getMillis()) + () -> new QueryGroup( + "analytics", + "_id", + new MutableQueryGroupFragment(null, Map.of(ResourceType.MEMORY, 0.4)), + Instant.now().getMillis() + ) ); } public void testQueryGroupInitiation() { - QueryGroup queryGroup = new QueryGroup("analytics", randomMode(), Map.of(ResourceType.MEMORY, 0.4)); + QueryGroup queryGroup = new QueryGroup("analytics", new MutableQueryGroupFragment(randomMode(), Map.of(ResourceType.MEMORY, 0.4))); assertNotNull(queryGroup.getName()); assertNotNull(queryGroup.get_id()); assertNotNull(queryGroup.getResourceLimits()); @@ -117,11 +138,11 @@ public void testQueryGroupInitiation() { public void testIllegalQueryGroupName() { assertThrows( NullPointerException.class, - () -> new QueryGroup("a".repeat(51), "_id", null, Map.of(ResourceType.MEMORY, 0.4), Instant.now().getMillis()) + () -> new QueryGroup("a".repeat(51), "_id", new MutableQueryGroupFragment(), Instant.now().getMillis()) ); assertThrows( NullPointerException.class, - () -> new QueryGroup("", "_id", null, Map.of(ResourceType.MEMORY, 0.4), Instant.now().getMillis()) + () -> new QueryGroup("", "_id", new MutableQueryGroupFragment(), Instant.now().getMillis()) ); } @@ -132,8 +153,7 @@ public void testInvalidResourceLimitWhenInvalidSystemResourceValueIsGiven() { () -> new QueryGroup( "analytics", "_id", - randomMode(), - Map.of(ResourceType.MEMORY, randomDoubleBetween(1.1, 1.8, false)), + new MutableQueryGroupFragment(randomMode(), Map.of(ResourceType.MEMORY, randomDoubleBetween(1.1, 1.8, false))), Instant.now().getMillis() ) ); @@ -143,8 +163,7 @@ public void testValidQueryGroup() { QueryGroup queryGroup = new QueryGroup( "analytics", "_id", - randomMode(), - Map.of(ResourceType.MEMORY, randomDoubleBetween(0.01, 0.8, false)), + new MutableQueryGroupFragment(randomMode(), Map.of(ResourceType.MEMORY, randomDoubleBetween(0.01, 0.8, false))), Instant.ofEpochMilli(1717187289).getMillis() ); @@ -163,8 +182,7 @@ public void testToXContent() throws IOException { QueryGroup queryGroup = new QueryGroup( "TestQueryGroup", queryGroupId, - QueryGroup.ResiliencyMode.ENFORCED, - Map.of(ResourceType.CPU, 0.30, ResourceType.MEMORY, 0.40), + new MutableQueryGroupFragment(ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.30, ResourceType.MEMORY, 0.40)), currentTimeInMillis ); XContentBuilder builder = JsonXContent.contentBuilder(); @@ -172,9 +190,9 @@ public void testToXContent() throws IOException { assertEquals( "{\"_id\":\"" + queryGroupId - + "\",\"name\":\"TestQueryGroup\",\"resiliency_mode\":\"enforced\",\"updated_at\":" + + "\",\"name\":\"TestQueryGroup\",\"resiliency_mode\":\"enforced\",\"resource_limits\":{\"cpu\":0.3,\"memory\":0.4},\"updated_at\":" + currentTimeInMillis - + ",\"resource_limits\":{\"cpu\":0.3,\"memory\":0.4}}", + + "}", builder.toString() ); } diff --git a/server/src/test/java/org/opensearch/wlm/MutableQueryGroupFragmentTests.java b/server/src/test/java/org/opensearch/wlm/MutableQueryGroupFragmentTests.java new file mode 100644 index 0000000000000..cfe53ddbd2c14 --- /dev/null +++ b/server/src/test/java/org/opensearch/wlm/MutableQueryGroupFragmentTests.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class MutableQueryGroupFragmentTests extends OpenSearchTestCase { + + public void testSerializationDeserialization() throws IOException { + Map resourceLimits = new HashMap<>(); + resourceLimits.put(ResourceType.CPU, 0.5); + resourceLimits.put(ResourceType.MEMORY, 0.75); + MutableQueryGroupFragment mutableQueryGroupFragment = new MutableQueryGroupFragment( + MutableQueryGroupFragment.ResiliencyMode.SOFT, + resourceLimits + ); + BytesStreamOutput out = new BytesStreamOutput(); + mutableQueryGroupFragment.writeTo(out); + StreamInput in = out.bytes().streamInput(); + MutableQueryGroupFragment deserializedGroup = new MutableQueryGroupFragment(in); + assertEquals(mutableQueryGroupFragment, deserializedGroup); + } + + public void testSerializationDeserializationWithNull() throws IOException { + MutableQueryGroupFragment mutableQueryGroupFragment = new MutableQueryGroupFragment(); + BytesStreamOutput out = new BytesStreamOutput(); + mutableQueryGroupFragment.writeTo(out); + StreamInput in = out.bytes().streamInput(); + MutableQueryGroupFragment deserializedGroup = new MutableQueryGroupFragment(in); + assertEquals(0, deserializedGroup.getResourceLimits().size()); + assertEquals(mutableQueryGroupFragment.getResiliencyMode(), deserializedGroup.getResiliencyMode()); + } + + public void testValidateResourceLimits() { + Map invalidLimits = new HashMap<>(); + invalidLimits.put(ResourceType.CPU, 1.5); + Exception exception = assertThrows(IllegalArgumentException.class, () -> { + MutableQueryGroupFragment.validateResourceLimits(invalidLimits); + }); + String expectedMessage = "resource value should be greater than 0 and less or equal to 1.0"; + String actualMessage = exception.getMessage(); + assertTrue(actualMessage.contains(expectedMessage)); + } + + public void testSetMethodsWithNullAndEmptyValues() { + MutableQueryGroupFragment queryGroup = new MutableQueryGroupFragment(); + queryGroup.setResiliencyMode(null); + assertNull(queryGroup.getResiliencyMode()); + queryGroup.setResourceLimits(null); + assertNull(queryGroup.getResourceLimits()); + queryGroup.setResourceLimits(new HashMap<>()); + assertEquals(0, queryGroup.getResourceLimits().size()); + } +}