From c8a9a491c091b349e7f7a65ddf6e2ffabab78b9c Mon Sep 17 00:00:00 2001 From: Ruirui Zhang Date: Mon, 19 Aug 2024 15:41:18 -0700 Subject: [PATCH] rebase Signed-off-by: Ruirui Zhang --- .../plugin/wlm/WorkloadManagementPlugin.java | 6 +- .../wlm/WorkloadManagementPluginModule.java | 7 +- .../TransportUpdateQueryGroupAction.java | 18 +-- .../{ => action}/UpdateQueryGroupAction.java | 4 +- .../{ => action}/UpdateQueryGroupRequest.java | 107 +++------------- .../UpdateQueryGroupResponse.java | 4 +- .../wlm/rest/RestGetQueryGroupAction.java | 2 +- .../wlm/rest/RestUpdateQueryGroupAction.java | 26 ++-- .../service/QueryGroupPersistenceService.java | 115 ++++++++---------- .../plugin/wlm/service/package-info.java | 4 - .../plugin/wlm/QueryGroupTestUtils.java | 5 +- .../UpdateQueryGroupRequestTests.java | 17 ++- .../UpdateQueryGroupResponseTests.java | 13 +- .../QueryGroupPersistenceServiceTests.java | 96 ++++++++++++++- .../cluster/metadata/QueryGroup.java | 20 ++- .../common/settings/ClusterSettings.java | 8 -- 16 files changed, 231 insertions(+), 221 deletions(-) rename plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/{ => action}/TransportUpdateQueryGroupAction.java (73%) rename plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/{ => action}/UpdateQueryGroupAction.java (92%) rename plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/{ => action}/UpdateQueryGroupRequest.java (55%) rename plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/{ => action}/UpdateQueryGroupResponse.java (96%) rename plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/{ => action}/UpdateQueryGroupRequestTests.java (89%) rename plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/{ => action}/UpdateQueryGroupResponseTests.java (86%) diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java index a83d5150abab5..06e58ec910bcd 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java @@ -11,6 +11,7 @@ import org.opensearch.action.ActionRequest; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.inject.Module; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Setting; @@ -21,11 +22,12 @@ import org.opensearch.plugin.wlm.action.GetQueryGroupAction; import org.opensearch.plugin.wlm.action.TransportCreateQueryGroupAction; import org.opensearch.plugin.wlm.action.TransportGetQueryGroupAction; +import org.opensearch.plugin.wlm.action.TransportUpdateQueryGroupAction; +import org.opensearch.plugin.wlm.action.UpdateQueryGroupAction; import org.opensearch.plugin.wlm.rest.RestCreateQueryGroupAction; import org.opensearch.plugin.wlm.rest.RestGetQueryGroupAction; -import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService; -import org.opensearch.common.inject.Module; import org.opensearch.plugin.wlm.rest.RestUpdateQueryGroupAction; +import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.rest.RestController; diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPluginModule.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPluginModule.java index 58208af293f14..57665f7c0f9b1 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPluginModule.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPluginModule.java @@ -8,10 +8,8 @@ package org.opensearch.plugin.wlm; -import org.opensearch.cluster.metadata.QueryGroup; import org.opensearch.common.inject.AbstractModule; -import org.opensearch.common.inject.TypeLiteral; -import org.opensearch.plugin.wlm.service.Persistable; +import org.opensearch.common.inject.Singleton; import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService; /** @@ -26,7 +24,6 @@ public WorkloadManagementPluginModule() {} @Override protected void configure() { - bind(new TypeLiteral>() { - }).to(QueryGroupPersistenceService.class).asEagerSingleton(); + bind(QueryGroupPersistenceService.class).in(Singleton.class); } } diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/TransportUpdateQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportUpdateQueryGroupAction.java similarity index 73% rename from plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/TransportUpdateQueryGroupAction.java rename to plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportUpdateQueryGroupAction.java index 0e560718ced62..84fec256559aa 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/TransportUpdateQueryGroupAction.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportUpdateQueryGroupAction.java @@ -6,27 +6,26 @@ * compatible open source license. */ -package org.opensearch.plugin.wlm; +package org.opensearch.plugin.wlm.action; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; -import org.opensearch.cluster.metadata.QueryGroup; import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; -import org.opensearch.plugin.wlm.service.Persistable; +import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; /** - * Transport action for update QueryGroup + * Transport action to update QueryGroup * - * @opensearch.internal + * @opensearch.experimental */ public class TransportUpdateQueryGroupAction extends HandledTransportAction { private final ThreadPool threadPool; - private final Persistable queryGroupPersistenceService; + private final QueryGroupPersistenceService queryGroupPersistenceService; /** * Constructor for TransportUpdateQueryGroupAction @@ -35,7 +34,7 @@ public class TransportUpdateQueryGroupAction extends HandledTransportAction queryGroupPersistenceService + QueryGroupPersistenceService queryGroupPersistenceService ) { super(UpdateQueryGroupAction.NAME, transportService, actionFilters, UpdateQueryGroupRequest::new); this.threadPool = threadPool; @@ -52,6 +51,7 @@ public TransportUpdateQueryGroupAction( @Override protected void doExecute(Task task, UpdateQueryGroupRequest request, ActionListener listener) { - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> queryGroupPersistenceService.update(request, listener)); + threadPool.executor(ThreadPool.Names.GENERIC) + .execute(() -> queryGroupPersistenceService.updateInClusterStateMetadata(request, listener)); } } diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/UpdateQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupAction.java similarity index 92% rename from plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/UpdateQueryGroupAction.java rename to plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupAction.java index 5b3600c091944..ff472f206131c 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/UpdateQueryGroupAction.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupAction.java @@ -6,14 +6,14 @@ * compatible open source license. */ -package org.opensearch.plugin.wlm; +package org.opensearch.plugin.wlm.action; import org.opensearch.action.ActionType; /** * Transport action to update QueryGroup * - * @opensearch.api + * @opensearch.experimental */ public class UpdateQueryGroupAction extends ActionType { diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/UpdateQueryGroupRequest.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java similarity index 55% rename from plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/UpdateQueryGroupRequest.java rename to plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java index b33d501d1daf0..04e0f62720d9a 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/UpdateQueryGroupRequest.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.plugin.wlm; +package org.opensearch.plugin.wlm.action; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; @@ -14,7 +14,6 @@ import org.opensearch.cluster.metadata.QueryGroup.ResiliencyMode; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.search.ResourceType; import org.joda.time.Instant; @@ -26,18 +25,13 @@ /** * A request for update QueryGroup * - * @opensearch.internal + * @opensearch.experimental */ -public class UpdateQueryGroupRequest extends ActionRequest implements Writeable.Reader { - String name; - Map resourceLimits; - private ResiliencyMode resiliencyMode; - long updatedAtInMillis; - - /** - * Default constructor for UpdateQueryGroupRequest - */ - public UpdateQueryGroupRequest() {} +public class UpdateQueryGroupRequest extends ActionRequest { + private final String name; + private final Map resourceLimits; + private final ResiliencyMode resiliencyMode; + private final long updatedAtInMillis; /** * Constructor for UpdateQueryGroupRequest @@ -83,64 +77,29 @@ public UpdateQueryGroupRequest(StreamInput in) throws IOException { } if (in.readBoolean()) { resiliencyMode = ResiliencyMode.fromName(in.readString()); + } else { + resiliencyMode = null; } updatedAtInMillis = in.readLong(); } - @Override - public UpdateQueryGroupRequest read(StreamInput in) throws IOException { - return new UpdateQueryGroupRequest(in); - } - /** * Generate a UpdateQueryGroupRequest from XContent * @param parser - A {@link XContentParser} object * @param name - name of the QueryGroup to be updated */ public static UpdateQueryGroupRequest fromXContent(XContentParser parser, String name) throws IOException { - while (parser.currentToken() != XContentParser.Token.START_OBJECT) { - parser.nextToken(); - } - - if (parser.currentToken() != XContentParser.Token.START_OBJECT) { - throw new IllegalArgumentException("expected start object but got a " + parser.currentToken()); - } - - XContentParser.Token token; - String fieldName = ""; - ResiliencyMode mode = null; - - // Map to hold resources - final Map resourceLimits = new HashMap<>(); - while ((token = parser.nextToken()) != null) { - if (token == XContentParser.Token.FIELD_NAME) { - fieldName = parser.currentName(); - } else if (token.isValue()) { - if (fieldName.equals("resiliency_mode")) { - mode = ResiliencyMode.fromName(parser.text()); - } else { - throw new IllegalArgumentException("unrecognised [field=" + fieldName + " in QueryGroup"); - } - } else if (token == XContentParser.Token.START_OBJECT) { - if (!fieldName.equals("resourceLimits")) { - throw new IllegalArgumentException( - "QueryGroup.resourceLimits is an object and expected token was { " + " but found " + token - ); - } - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - fieldName = parser.currentName(); - } else { - resourceLimits.put(ResourceType.fromName(fieldName), parser.doubleValue()); - } - } - } - } - return new UpdateQueryGroupRequest(name, mode, resourceLimits, Instant.now().getMillis()); + QueryGroup.Builder builder = QueryGroup.Builder.fromXContent(parser); + return new UpdateQueryGroupRequest(name, builder.getResiliencyMode(), builder.getResourceLimits(), Instant.now().getMillis()); } @Override public ActionRequestValidationException validate() { + QueryGroup.validateName(name); + if (resourceLimits != null) { + QueryGroup.validateResourceLimits(resourceLimits); + } + assert QueryGroup.isValid(updatedAtInMillis); return null; } @@ -151,14 +110,6 @@ public String getName() { return name; } - /** - * name setter - * @param name - name to be set - */ - public void setName(String name) { - this.name = name; - } - /** * ResourceLimits getter */ @@ -166,14 +117,6 @@ public Map getResourceLimits() { return resourceLimits; } - /** - * ResourceLimits setter - * @param resourceLimits - ResourceLimit to be set - */ - public void setResourceLimits(Map resourceLimits) { - this.resourceLimits = resourceLimits; - } - /** * resiliencyMode getter */ @@ -181,14 +124,6 @@ public ResiliencyMode getResiliencyMode() { return resiliencyMode; } - /** - * resiliencyMode setter - * @param resiliencyMode - mode to be set - */ - public void setResiliencyMode(ResiliencyMode resiliencyMode) { - this.resiliencyMode = resiliencyMode; - } - /** * updatedAtInMillis getter */ @@ -196,14 +131,6 @@ public long getUpdatedAtInMillis() { return updatedAtInMillis; } - /** - * updatedAtInMillis setter - * @param updatedAtInMillis - updatedAtInMillis to be set - */ - public void setUpdatedAtInMillis(long updatedAtInMillis) { - this.updatedAtInMillis = updatedAtInMillis; - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -212,7 +139,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(false); } else { out.writeBoolean(true); - out.writeMap(resourceLimits, ResourceType::writeTo, StreamOutput::writeGenericValue); + out.writeMap(resourceLimits, ResourceType::writeTo, StreamOutput::writeDouble); } if (resiliencyMode == null) { out.writeBoolean(false); diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/UpdateQueryGroupResponse.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupResponse.java similarity index 96% rename from plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/UpdateQueryGroupResponse.java rename to plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupResponse.java index 4ddf84a6e3c88..9071f52ecb5a7 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/UpdateQueryGroupResponse.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupResponse.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.plugin.wlm; +package org.opensearch.plugin.wlm.action; import org.opensearch.cluster.metadata.QueryGroup; import org.opensearch.core.action.ActionResponse; @@ -22,7 +22,7 @@ /** * Response for the update API for QueryGroup * - * @opensearch.internal + * @opensearch.experimental */ public class UpdateQueryGroupResponse extends ActionResponse implements ToXContent, ToXContentObject { private final QueryGroup queryGroup; diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestGetQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestGetQueryGroupAction.java index c250bd2979e98..c87973e113138 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestGetQueryGroupAction.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestGetQueryGroupAction.java @@ -27,7 +27,7 @@ import static org.opensearch.rest.RestRequest.Method.GET; /** - * Rest action to get a QueryGroup0 + * Rest action to get a QueryGroup * * @opensearch.experimental */ diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestUpdateQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestUpdateQueryGroupAction.java index de6162ceda225..55b4bc5a295c4 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestUpdateQueryGroupAction.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestUpdateQueryGroupAction.java @@ -12,9 +12,9 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.plugin.wlm.UpdateQueryGroupAction; -import org.opensearch.plugin.wlm.UpdateQueryGroupRequest; -import org.opensearch.plugin.wlm.UpdateQueryGroupResponse; +import org.opensearch.plugin.wlm.action.UpdateQueryGroupAction; +import org.opensearch.plugin.wlm.action.UpdateQueryGroupRequest; +import org.opensearch.plugin.wlm.action.UpdateQueryGroupResponse; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestChannel; @@ -31,7 +31,7 @@ /** * Rest action to update a QueryGroup * - * @opensearch.api + * @opensearch.experimental */ public class RestUpdateQueryGroupAction extends BaseRestHandler { @@ -50,23 +50,15 @@ public String getName() { */ @Override public List routes() { - return List.of(new Route(POST, "_query_group/{name}"), new Route(PUT, "_query_group/{name}")); + return List.of(new Route(POST, "_wlm/query_group/{name}"), new Route(PUT, "_wlm/query_group/{name}")); } @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { - String name = request.param("name"); - UpdateQueryGroupRequest updateQueryGroupRequest = new UpdateQueryGroupRequest(); - request.applyContentParser((parser) -> parseRestRequest(updateQueryGroupRequest, parser, name)); - return channel -> client.execute(UpdateQueryGroupAction.INSTANCE, updateQueryGroupRequest, updateQueryGroupResponse(channel)); - } - - private void parseRestRequest(UpdateQueryGroupRequest request, XContentParser parser, String name) throws IOException { - final UpdateQueryGroupRequest updateQueryGroupRequest = UpdateQueryGroupRequest.fromXContent(parser, name); - request.setName(updateQueryGroupRequest.getName()); - request.setResourceLimits(updateQueryGroupRequest.getResourceLimits()); - request.setResiliencyMode(updateQueryGroupRequest.getResiliencyMode()); - request.setUpdatedAtInMillis(updateQueryGroupRequest.getUpdatedAtInMillis()); + try (XContentParser parser = request.contentParser()) { + UpdateQueryGroupRequest updateQueryGroupRequest = UpdateQueryGroupRequest.fromXContent(parser, request.param("name")); + return channel -> client.execute(UpdateQueryGroupAction.INSTANCE, updateQueryGroupRequest, updateQueryGroupResponse(channel)); + } } private RestResponseListener updateQueryGroupResponse(final RestChannel channel) { diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java index 5667fb3d74dd8..cc83b224e702b 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.ResourceNotFoundException; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.metadata.Metadata; @@ -25,18 +26,16 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; import org.opensearch.plugin.wlm.action.CreateQueryGroupResponse; +import org.opensearch.plugin.wlm.action.UpdateQueryGroupRequest; +import org.opensearch.plugin.wlm.action.UpdateQueryGroupResponse; import org.opensearch.search.ResourceType; -import org.opensearch.plugin.wlm.UpdateQueryGroupRequest; -import org.opensearch.plugin.wlm.UpdateQueryGroupResponse; import java.util.Collection; import java.util.EnumMap; +import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import java.util.HashMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.DoubleAdder; /** * This class defines the functions for QueryGroup persistence @@ -177,31 +176,6 @@ ClusterState saveQueryGroupInClusterState(final QueryGroup queryGroup, final Clu .build(); } - /** - * This method checks if there's any resource allocation that exceed limit of 1.0 - * @param existingQueryGroups - existing QueryGroups in the system - * @param resourceLimits - the QueryGroup we're creating or updating - */ - private void validateTotalUsage(Map existingQueryGroups, String name, Map resourceLimits) { - final Map map = new EnumMap<>(ResourceType.class); - map.putAll(resourceLimits); - for (QueryGroup currGroup : existingQueryGroups.values()) { - if (!currGroup.getName().equals(name)) { - for (ResourceType resourceType : resourceLimits.keySet()) { - map.compute(resourceType, (k, v) -> v + currGroup.getResourceLimits().get(resourceType)); - } - } - } - for (ResourceType resourceType : resourceLimits.keySet()) { - if (map.get(resourceType) > 1) { - logger.warn("Total resource allocation for {} will go above the max limit of 1.0.", resourceType.getName()); - throw new IllegalArgumentException( - "Total resource allocation for " + resourceType.getName() + " will go above the max limit of 1.0." - ); - } - } - } - /** * Get the QueryGroups with the specified name from cluster state * @param name - the QueryGroup name we are getting @@ -220,26 +194,12 @@ public static Collection getFromClusterStateMetadata(String name, Cl .collect(Collectors.toList()); } - /** - * Get the allocation value for resourceName for the QueryGroup - * @param resourceName - the resourceName we want to get the usage for - * @param resourceLimits - the resource limit from which to get the allocation value for resourceName - */ - private double getResourceLimitValue(String resourceName, final Map resourceLimits) { - for (ResourceType resourceType : resourceLimits.keySet()) { - if (resourceType.getName().equals(resourceName)) { - return (double) resourceLimits.get(resourceType); - } - } - return 0.0; - } - /** * Modify cluster state to update the QueryGroup * @param toUpdateGroup {@link QueryGroup} - the QueryGroup that we want to update */ - void updateInClusterStateMetadata(UpdateQueryGroupRequest toUpdateGroup, ActionListener listener) { - clusterService.submitStateUpdateTask(SOURCE, new ClusterStateUpdateTask(Priority.URGENT) { + public void updateInClusterStateMetadata(UpdateQueryGroupRequest toUpdateGroup, ActionListener listener) { + clusterService.submitStateUpdateTask(SOURCE, new ClusterStateUpdateTask(Priority.NORMAL) { @Override public ClusterState execute(ClusterState currentState) { return updateQueryGroupInClusterState(toUpdateGroup, currentState); @@ -278,44 +238,67 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS * @param updateQueryGroupRequest {@link QueryGroup} - the QueryGroup that we want to update * @param currentState - current cluster state */ - public ClusterState updateQueryGroupInClusterState(UpdateQueryGroupRequest updateQueryGroupRequest, ClusterState currentState) { + ClusterState updateQueryGroupInClusterState(UpdateQueryGroupRequest updateQueryGroupRequest, ClusterState currentState) { final Metadata metadata = currentState.metadata(); - Map existingGroups = currentState.metadata().queryGroups(); + final Map existingGroups = currentState.metadata().queryGroups(); String name = updateQueryGroupRequest.getName(); - Optional findExistingGroup = existingGroups.values().stream().filter(group -> group.getName().equals(name)).findFirst(); - if (findExistingGroup.isEmpty()) { - logger.warn("No QueryGroup exists with the provided name: {}", name); - throw new RuntimeException("No QueryGroup exists with the provided name: " + name); - } + final QueryGroup existingGroup = existingGroups.values() + .stream() + .filter(group -> group.getName().equals(name)) + .findFirst() + .orElseThrow(() -> new ResourceNotFoundException("No QueryGroup exists with the provided name: " + name)); // check if there's any resource allocation that exceed limit of 1.0 - if (updateQueryGroupRequest.getResourceLimits() != null) { + if (updateQueryGroupRequest.getResourceLimits() != null && !updateQueryGroupRequest.getResourceLimits().isEmpty()) { validateTotalUsage(existingGroups, name, updateQueryGroupRequest.getResourceLimits()); } // build the QueryGroup with updated fields - QueryGroup existingGroup = findExistingGroup.get(); - String _id = existingGroup.get_id(); - long updatedAtInMillis = updateQueryGroupRequest.getUpdatedAtInMillis(); - Map existingResourceLimits = existingGroup.getResourceLimits(); - Map updatedResourceLimits = new HashMap<>(); - if (existingResourceLimits != null) { - updatedResourceLimits.putAll(existingResourceLimits); - } + final Map updatedResourceLimits = new HashMap<>(existingGroup.getResourceLimits()); if (updateQueryGroupRequest.getResourceLimits() != null) { updatedResourceLimits.putAll(updateQueryGroupRequest.getResourceLimits()); } - ResiliencyMode mode = updateQueryGroupRequest.getResiliencyMode() == null - ? existingGroup.getResiliencyMode() - : updateQueryGroupRequest.getResiliencyMode(); + final ResiliencyMode mode = Optional.ofNullable(updateQueryGroupRequest.getResiliencyMode()) + .orElse(existingGroup.getResiliencyMode()); - QueryGroup updatedGroup = new QueryGroup(name, _id, mode, updatedResourceLimits, updatedAtInMillis); + final QueryGroup updatedGroup = new QueryGroup( + name, + existingGroup.get_id(), + mode, + updatedResourceLimits, + updateQueryGroupRequest.getUpdatedAtInMillis() + ); return ClusterState.builder(currentState) .metadata(Metadata.builder(metadata).remove(existingGroup).put(updatedGroup).build()) .build(); } + /** + * This method checks if there's any resource allocation that exceed limit of 1.0 + * @param existingQueryGroups - existing QueryGroups in the system + * @param resourceLimits - the QueryGroup we're creating or updating + */ + private void validateTotalUsage(Map existingQueryGroups, String name, Map resourceLimits) { + final Map totalUsage = new EnumMap<>(ResourceType.class); + totalUsage.putAll(resourceLimits); + for (QueryGroup currGroup : existingQueryGroups.values()) { + if (!currGroup.getName().equals(name)) { + for (ResourceType resourceType : resourceLimits.keySet()) { + totalUsage.compute(resourceType, (k, v) -> v + currGroup.getResourceLimits().get(resourceType)); + } + } + } + totalUsage.forEach((resourceType, total) -> { + if (total > 1.0) { + logger.warn("Total resource allocation for {} will go above the max limit of 1.0.", resourceType.getName()); + throw new IllegalArgumentException( + "Total resource allocation for " + resourceType.getName() + " will go above the max limit of 1.0." + ); + } + }); + } + /** * maxQueryGroupCount getter */ diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/package-info.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/package-info.java index d21a328b6a0bd..5848e9c936623 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/package-info.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/package-info.java @@ -7,10 +7,6 @@ */ /** -<<<<<<< HEAD * Package for the service classes of WorkloadManagementPlugin -======= - * Package for the service classes for QueryGroup CRUD operations ->>>>>>> 7abcbc98a73 (Add Update QueryGroup API Logic) */ package org.opensearch.plugin.wlm.service; diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java index e025b3891e3a4..7d0f686285533 100644 --- a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java @@ -132,7 +132,10 @@ public static Tuple preparePersisten return new Tuple(queryGroupPersistenceService, clusterState); } - public static void assertEqualResourceLimits(Map resourceLimitMapOne, Map resourceLimitMapTwo) { + public static void assertEqualResourceLimits( + Map resourceLimitMapOne, + Map resourceLimitMapTwo + ) { assertTrue(resourceLimitMapOne.keySet().containsAll(resourceLimitMapTwo.keySet())); assertTrue(resourceLimitMapOne.values().containsAll(resourceLimitMapTwo.values())); } diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/UpdateQueryGroupRequestTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequestTests.java similarity index 89% rename from plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/UpdateQueryGroupRequestTests.java rename to plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequestTests.java index 6feeb46b1165a..d804fb7569ba2 100644 --- a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/UpdateQueryGroupRequestTests.java +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequestTests.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.plugin.wlm; +package org.opensearch.plugin.wlm.action; import org.opensearch.cluster.metadata.QueryGroup; import org.opensearch.common.io.stream.BytesStreamOutput; @@ -27,6 +27,9 @@ public class UpdateQueryGroupRequestTests extends OpenSearchTestCase { + /** + * Test case to verify the serialization and deserialization of UpdateQueryGroupRequest. + */ public void testSerialization() throws IOException { UpdateQueryGroupRequest request = new UpdateQueryGroupRequest(queryGroupOne); BytesStreamOutput out = new BytesStreamOutput(); @@ -40,6 +43,9 @@ public void testSerialization() throws IOException { assertEquals(request.getUpdatedAtInMillis(), otherRequest.getUpdatedAtInMillis()); } + /** + * Test case to verify the serialization and deserialization of UpdateQueryGroupRequest with only name field. + */ public void testSerializationOnlyName() throws IOException { UpdateQueryGroupRequest request = new UpdateQueryGroupRequest(NAME_ONE, null, new HashMap<>(), TIMESTAMP_ONE); BytesStreamOutput out = new BytesStreamOutput(); @@ -52,6 +58,9 @@ public void testSerializationOnlyName() throws IOException { assertEquals(request.getUpdatedAtInMillis(), otherRequest.getUpdatedAtInMillis()); } + /** + * Test case to verify the serialization and deserialization of UpdateQueryGroupRequest with only resourceLimits field. + */ public void testSerializationOnlyResourceLimit() throws IOException { UpdateQueryGroupRequest request = new UpdateQueryGroupRequest( NAME_ONE, @@ -70,6 +79,9 @@ public void testSerializationOnlyResourceLimit() throws IOException { assertEquals(request.getUpdatedAtInMillis(), otherRequest.getUpdatedAtInMillis()); } + /** + * Tests invalid ResourceType. + */ public void testInvalidResourceLimitList() { assertThrows( IllegalArgumentException.class, @@ -82,6 +94,9 @@ public void testInvalidResourceLimitList() { ); } + /** + * Tests invalid resiliencyMode. + */ public void testInvalidEnforcement() { assertThrows( IllegalArgumentException.class, diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/UpdateQueryGroupResponseTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupResponseTests.java similarity index 86% rename from plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/UpdateQueryGroupResponseTests.java rename to plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupResponseTests.java index c21e39160a858..fe3d92763866c 100644 --- a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/UpdateQueryGroupResponseTests.java +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupResponseTests.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.plugin.wlm; +package org.opensearch.plugin.wlm.action; import org.opensearch.cluster.metadata.QueryGroup; import org.opensearch.common.io.stream.BytesStreamOutput; @@ -15,6 +15,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.wlm.QueryGroupTestUtils; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -26,6 +27,9 @@ public class UpdateQueryGroupResponseTests extends OpenSearchTestCase { + /** + * Test case to verify the serialization and deserialization of UpdateQueryGroupResponse. + */ public void testSerialization() throws IOException { UpdateQueryGroupResponse response = new UpdateQueryGroupResponse(queryGroupOne, RestStatus.OK); BytesStreamOutput out = new BytesStreamOutput(); @@ -42,6 +46,9 @@ public void testSerialization() throws IOException { QueryGroupTestUtils.assertEqualQueryGroups(list1, list2); } + /** + * Test case to verify the toXContent method of UpdateQueryGroupResponse. + */ public void testToXContentUpdateSingleQueryGroup() throws IOException { XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); UpdateQueryGroupResponse otherResponse = new UpdateQueryGroupResponse(queryGroupOne, RestStatus.OK); @@ -50,8 +57,8 @@ public void testToXContentUpdateSingleQueryGroup() throws IOException { + " \"_id\" : \"AgfUO5Ja9yfsYlONlYi3TQ==\",\n" + " \"name\" : \"query_group_one\",\n" + " \"resiliency_mode\" : \"monitor\",\n" - + " \"updatedAt\" : 4513232413,\n" - + " \"resourceLimits\" : {\n" + + " \"updated_at\" : 4513232413,\n" + + " \"resource_limits\" : {\n" + " \"memory\" : 0.3\n" + " }\n" + "}"; diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java index b303f689a5890..0e1c3cfdfaf65 100644 --- a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java @@ -13,6 +13,7 @@ import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.cluster.metadata.QueryGroup.ResiliencyMode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.ClusterSettings; @@ -20,22 +21,23 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.plugin.wlm.QueryGroupTestUtils; import org.opensearch.plugin.wlm.action.CreateQueryGroupResponse; +import org.opensearch.plugin.wlm.action.UpdateQueryGroupRequest; +import org.opensearch.plugin.wlm.action.UpdateQueryGroupResponse; import org.opensearch.search.ResourceType; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.cluster.metadata.QueryGroup.ResiliencyMode; -import org.opensearch.plugin.wlm.UpdateQueryGroupRequest; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import java.util.Optional; import org.mockito.ArgumentCaptor; + import static org.opensearch.cluster.metadata.QueryGroup.builder; import static org.opensearch.plugin.wlm.QueryGroupTestUtils.MEMORY_STRING; import static org.opensearch.plugin.wlm.QueryGroupTestUtils.MONITOR_STRING; @@ -50,8 +52,8 @@ import static org.opensearch.plugin.wlm.QueryGroupTestUtils.clusterState; import static org.opensearch.plugin.wlm.QueryGroupTestUtils.preparePersistenceServiceSetup; import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupList; -import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupPersistenceService; import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupOne; +import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupPersistenceService; import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupTwo; import static org.opensearch.plugin.wlm.service.QueryGroupPersistenceService.QUERY_GROUP_COUNT_SETTING_NAME; import static org.opensearch.plugin.wlm.service.QueryGroupPersistenceService.SOURCE; @@ -303,6 +305,9 @@ public void testMaxQueryGroupCount() { assertEquals(50, queryGroupPersistenceService.getMaxQueryGroupCount()); } + /** + * Tests updating a QueryGroup with all fields + */ public void testUpdateQueryGroupAllFields() { QueryGroup updated = builder().name(NAME_ONE) ._id(_ID_ONE) @@ -328,6 +333,9 @@ public void testUpdateQueryGroupAllFields() { assertEqualQueryGroups(expectedList, updatedQueryGroups); } + /** + * Tests updating a QueryGroup with only updated resourceLimits + */ public void testUpdateQueryGroupResourceLimitsOnly() { QueryGroup updated = builder().name(NAME_ONE) ._id(_ID_ONE) @@ -368,6 +376,9 @@ public void testUpdateQueryGroupResourceLimitsOnly() { assertEqualQueryGroups(list1, list2); } + /** + * Tests updating a QueryGroup with invalid name + */ public void testUpdateQueryGroupNonExistedName() { QueryGroupPersistenceService queryGroupPersistenceService = queryGroupPersistenceService(); UpdateQueryGroupRequest updateQueryGroupRequest = new UpdateQueryGroupRequest( @@ -389,4 +400,81 @@ public void testUpdateQueryGroupNonExistedName() { expectedList.add(queryGroupOne); assertEqualQueryGroups(expectedList, updatedQueryGroups); } + + /** + * Tests UpdateInClusterStateMetadata function + */ + public void testUpdateInClusterStateMetadata() { + ClusterService clusterService = mock(ClusterService.class); + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + QueryGroupPersistenceService queryGroupPersistenceService = new QueryGroupPersistenceService( + clusterService, + QueryGroupTestUtils.settings(), + clusterSettings() + ); + queryGroupPersistenceService.updateInClusterStateMetadata(null, listener); + verify(clusterService).submitStateUpdateTask(eq(SOURCE), any()); + } + + /** + * Tests UpdateInClusterStateMetadata function with inner functions + */ + public void testUpdateInClusterStateMetadataInner() { + ClusterService clusterService = mock(ClusterService.class); + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + QueryGroupPersistenceService queryGroupPersistenceService = new QueryGroupPersistenceService( + clusterService, + QueryGroupTestUtils.settings(), + clusterSettings() + ); + UpdateQueryGroupRequest updateQueryGroupRequest = new UpdateQueryGroupRequest( + NAME_TWO, + ResiliencyMode.SOFT, + new HashMap<>(), + 2435465879685L + ); + ArgumentCaptor captor = ArgumentCaptor.forClass(ClusterStateUpdateTask.class); + queryGroupPersistenceService.updateInClusterStateMetadata(updateQueryGroupRequest, listener); + verify(clusterService, times(1)).submitStateUpdateTask(eq(SOURCE), captor.capture()); + ClusterStateUpdateTask capturedTask = captor.getValue(); + assertEquals(queryGroupPersistenceService.updateQueryGroupThrottlingKey, capturedTask.getClusterManagerThrottlingKey()); + + doAnswer(invocation -> { + ClusterStateUpdateTask task = invocation.getArgument(1); + task.clusterStateProcessed(SOURCE, clusterState(), clusterState()); + return null; + }).when(clusterService).submitStateUpdateTask(anyString(), any()); + queryGroupPersistenceService.updateInClusterStateMetadata(updateQueryGroupRequest, listener); + verify(listener).onResponse(any(UpdateQueryGroupResponse.class)); + } + + /** + * Tests UpdateInClusterStateMetadata function with failure + */ + public void testUpdateInClusterStateMetadataFailure() { + ClusterService clusterService = mock(ClusterService.class); + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + QueryGroupPersistenceService queryGroupPersistenceService = new QueryGroupPersistenceService( + clusterService, + QueryGroupTestUtils.settings(), + clusterSettings() + ); + UpdateQueryGroupRequest updateQueryGroupRequest = new UpdateQueryGroupRequest( + NAME_TWO, + ResiliencyMode.SOFT, + new HashMap<>(), + 2435465879685L + ); + doAnswer(invocation -> { + ClusterStateUpdateTask task = invocation.getArgument(1); + Exception exception = new RuntimeException("Test Exception"); + task.onFailure(SOURCE, exception); + return null; + }).when(clusterService).submitStateUpdateTask(anyString(), any()); + queryGroupPersistenceService.updateInClusterStateMetadata(updateQueryGroupRequest, listener); + verify(listener).onFailure(any(RuntimeException.class)); + } } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java b/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java index 3af10bc8c5572..ed6fabb2e1fb6 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java @@ -29,11 +29,7 @@ * Class to define the QueryGroup schema * { * "_id": "fafjafjkaf9ag8a9ga9g7ag0aagaga", -<<<<<<< HEAD * "resource_limits": { -======= - * "resourceLimits": { ->>>>>>> 7abcbc98a73 (Add Update QueryGroup API Logic) * "memory": 0.4 * }, * "resiliency_mode": "enforced", @@ -83,7 +79,7 @@ public QueryGroup(String name, String _id, ResiliencyMode resiliencyMode, Map resourceLimits) { + public static void validateResourceLimits(Map resourceLimits) { for (Map.Entry resource : resourceLimits.entrySet()) { Double threshold = resource.getValue(); Objects.requireNonNull(resource.getKey(), "resourceName can't be null"); @@ -327,5 +323,17 @@ public Builder resourceLimits(Map resourceLimits) { public QueryGroup build() { return new QueryGroup(name, _id, resiliencyMode, resourceLimits, updatedAt); } + + public ResiliencyMode getResiliencyMode() { + return resiliencyMode; + } + + public long getUpdatedAt() { + return updatedAt; + } + + public Map getResourceLimits() { + return resourceLimits; + } } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 8704fe25a2516..7baae17dd77cd 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -160,7 +160,6 @@ import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; import org.opensearch.search.backpressure.settings.SearchTaskSettings; import org.opensearch.search.fetch.subphase.highlight.FastVectorHighlighter; -import org.opensearch.search.query_group.QueryGroupServiceSettings; import org.opensearch.snapshots.InternalSnapshotsInfoService; import org.opensearch.snapshots.SnapshotsService; import org.opensearch.tasks.TaskCancellationMonitoringSettings; @@ -767,14 +766,7 @@ public void apply(Settings value, Settings current, Settings previous) { // Composite index settings CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING, -<<<<<<< HEAD SystemTemplatesService.SETTING_APPLICATION_BASED_CONFIGURATION_TEMPLATES_ENABLED -======= - // QueryGroup settings - QueryGroupServiceSettings.MAX_QUERY_GROUP_COUNT, - QueryGroupServiceSettings.NODE_LEVEL_REJECTION_THRESHOLD, - QueryGroupServiceSettings.NODE_LEVEL_CANCELLATION_THRESHOLD ->>>>>>> 7abcbc98a73 (Add Update QueryGroup API Logic) ) ) );