Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <[email protected]>
  • Loading branch information
ruai0511 committed Aug 20, 2024
1 parent 58d33b2 commit c8a9a49
Show file tree
Hide file tree
Showing 16 changed files with 231 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.action.ActionRequest;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.inject.Module;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
Expand All @@ -21,11 +22,12 @@
import org.opensearch.plugin.wlm.action.GetQueryGroupAction;
import org.opensearch.plugin.wlm.action.TransportCreateQueryGroupAction;
import org.opensearch.plugin.wlm.action.TransportGetQueryGroupAction;
import org.opensearch.plugin.wlm.action.TransportUpdateQueryGroupAction;
import org.opensearch.plugin.wlm.action.UpdateQueryGroupAction;
import org.opensearch.plugin.wlm.rest.RestCreateQueryGroupAction;
import org.opensearch.plugin.wlm.rest.RestGetQueryGroupAction;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;
import org.opensearch.common.inject.Module;
import org.opensearch.plugin.wlm.rest.RestUpdateQueryGroupAction;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.rest.RestController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@

package org.opensearch.plugin.wlm;

import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.common.inject.AbstractModule;
import org.opensearch.common.inject.TypeLiteral;
import org.opensearch.plugin.wlm.service.Persistable;
import org.opensearch.common.inject.Singleton;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;

/**
Expand All @@ -26,7 +24,6 @@ public WorkloadManagementPluginModule() {}

@Override
protected void configure() {
bind(new TypeLiteral<Persistable<QueryGroup>>() {
}).to(QueryGroupPersistenceService.class).asEagerSingleton();
bind(QueryGroupPersistenceService.class).in(Singleton.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,26 @@
* compatible open source license.
*/

package org.opensearch.plugin.wlm;
package org.opensearch.plugin.wlm.action;

import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.plugin.wlm.service.Persistable;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

/**
* Transport action for update QueryGroup
* Transport action to update QueryGroup
*
* @opensearch.internal
* @opensearch.experimental
*/
public class TransportUpdateQueryGroupAction extends HandledTransportAction<UpdateQueryGroupRequest, UpdateQueryGroupResponse> {

private final ThreadPool threadPool;
private final Persistable<QueryGroup> queryGroupPersistenceService;
private final QueryGroupPersistenceService queryGroupPersistenceService;

/**
* Constructor for TransportUpdateQueryGroupAction
Expand All @@ -35,15 +34,15 @@ public class TransportUpdateQueryGroupAction extends HandledTransportAction<Upda
* @param transportService - a {@link TransportService} object
* @param actionFilters - a {@link ActionFilters} object
* @param threadPool - a {@link ThreadPool} object
* @param queryGroupPersistenceService - a {@link Persistable} object
* @param queryGroupPersistenceService - a {@link QueryGroupPersistenceService} object
*/
@Inject
public TransportUpdateQueryGroupAction(
String actionName,
TransportService transportService,
ActionFilters actionFilters,
ThreadPool threadPool,
Persistable<QueryGroup> queryGroupPersistenceService
QueryGroupPersistenceService queryGroupPersistenceService
) {
super(UpdateQueryGroupAction.NAME, transportService, actionFilters, UpdateQueryGroupRequest::new);
this.threadPool = threadPool;
Expand All @@ -52,6 +51,7 @@ public TransportUpdateQueryGroupAction(

@Override
protected void doExecute(Task task, UpdateQueryGroupRequest request, ActionListener<UpdateQueryGroupResponse> listener) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> queryGroupPersistenceService.update(request, listener));
threadPool.executor(ThreadPool.Names.GENERIC)
.execute(() -> queryGroupPersistenceService.updateInClusterStateMetadata(request, listener));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
* compatible open source license.
*/

package org.opensearch.plugin.wlm;
package org.opensearch.plugin.wlm.action;

import org.opensearch.action.ActionType;

/**
* Transport action to update QueryGroup
*
* @opensearch.api
* @opensearch.experimental
*/
public class UpdateQueryGroupAction extends ActionType<UpdateQueryGroupResponse> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@
* compatible open source license.
*/

package org.opensearch.plugin.wlm;
package org.opensearch.plugin.wlm.action;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.cluster.metadata.QueryGroup.ResiliencyMode;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.search.ResourceType;
import org.joda.time.Instant;
Expand All @@ -26,18 +25,13 @@
/**
* A request for update QueryGroup
*
* @opensearch.internal
* @opensearch.experimental
*/
public class UpdateQueryGroupRequest extends ActionRequest implements Writeable.Reader<UpdateQueryGroupRequest> {
String name;
Map<ResourceType, Double> resourceLimits;
private ResiliencyMode resiliencyMode;
long updatedAtInMillis;

/**
* Default constructor for UpdateQueryGroupRequest
*/
public UpdateQueryGroupRequest() {}
public class UpdateQueryGroupRequest extends ActionRequest {
private final String name;
private final Map<ResourceType, Double> resourceLimits;
private final ResiliencyMode resiliencyMode;
private final long updatedAtInMillis;

/**
* Constructor for UpdateQueryGroupRequest
Expand Down Expand Up @@ -83,64 +77,29 @@ public UpdateQueryGroupRequest(StreamInput in) throws IOException {
}
if (in.readBoolean()) {
resiliencyMode = ResiliencyMode.fromName(in.readString());
} else {
resiliencyMode = null;
}
updatedAtInMillis = in.readLong();
}

@Override
public UpdateQueryGroupRequest read(StreamInput in) throws IOException {
return new UpdateQueryGroupRequest(in);
}

/**
* Generate a UpdateQueryGroupRequest from XContent
* @param parser - A {@link XContentParser} object
* @param name - name of the QueryGroup to be updated
*/
public static UpdateQueryGroupRequest fromXContent(XContentParser parser, String name) throws IOException {
while (parser.currentToken() != XContentParser.Token.START_OBJECT) {
parser.nextToken();
}

if (parser.currentToken() != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("expected start object but got a " + parser.currentToken());
}

XContentParser.Token token;
String fieldName = "";
ResiliencyMode mode = null;

// Map to hold resources
final Map<ResourceType, Double> resourceLimits = new HashMap<>();
while ((token = parser.nextToken()) != null) {
if (token == XContentParser.Token.FIELD_NAME) {
fieldName = parser.currentName();
} else if (token.isValue()) {
if (fieldName.equals("resiliency_mode")) {
mode = ResiliencyMode.fromName(parser.text());
} else {
throw new IllegalArgumentException("unrecognised [field=" + fieldName + " in QueryGroup");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (!fieldName.equals("resourceLimits")) {
throw new IllegalArgumentException(
"QueryGroup.resourceLimits is an object and expected token was { " + " but found " + token
);
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
fieldName = parser.currentName();
} else {
resourceLimits.put(ResourceType.fromName(fieldName), parser.doubleValue());
}
}
}
}
return new UpdateQueryGroupRequest(name, mode, resourceLimits, Instant.now().getMillis());
QueryGroup.Builder builder = QueryGroup.Builder.fromXContent(parser);
return new UpdateQueryGroupRequest(name, builder.getResiliencyMode(), builder.getResourceLimits(), Instant.now().getMillis());
}

@Override
public ActionRequestValidationException validate() {
QueryGroup.validateName(name);
if (resourceLimits != null) {
QueryGroup.validateResourceLimits(resourceLimits);
}
assert QueryGroup.isValid(updatedAtInMillis);
return null;
}

Expand All @@ -151,59 +110,27 @@ public String getName() {
return name;
}

/**
* name setter
* @param name - name to be set
*/
public void setName(String name) {
this.name = name;
}

/**
* ResourceLimits getter
*/
public Map<ResourceType, Double> getResourceLimits() {
return resourceLimits;
}

/**
* ResourceLimits setter
* @param resourceLimits - ResourceLimit to be set
*/
public void setResourceLimits(Map<ResourceType, Double> resourceLimits) {
this.resourceLimits = resourceLimits;
}

/**
* resiliencyMode getter
*/
public ResiliencyMode getResiliencyMode() {
return resiliencyMode;
}

/**
* resiliencyMode setter
* @param resiliencyMode - mode to be set
*/
public void setResiliencyMode(ResiliencyMode resiliencyMode) {
this.resiliencyMode = resiliencyMode;
}

/**
* updatedAtInMillis getter
*/
public long getUpdatedAtInMillis() {
return updatedAtInMillis;
}

/**
* updatedAtInMillis setter
* @param updatedAtInMillis - updatedAtInMillis to be set
*/
public void setUpdatedAtInMillis(long updatedAtInMillis) {
this.updatedAtInMillis = updatedAtInMillis;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand All @@ -212,7 +139,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeMap(resourceLimits, ResourceType::writeTo, StreamOutput::writeGenericValue);
out.writeMap(resourceLimits, ResourceType::writeTo, StreamOutput::writeDouble);
}
if (resiliencyMode == null) {
out.writeBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.plugin.wlm;
package org.opensearch.plugin.wlm.action;

import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.core.action.ActionResponse;
Expand All @@ -22,7 +22,7 @@
/**
* Response for the update API for QueryGroup
*
* @opensearch.internal
* @opensearch.experimental
*/
public class UpdateQueryGroupResponse extends ActionResponse implements ToXContent, ToXContentObject {
private final QueryGroup queryGroup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import static org.opensearch.rest.RestRequest.Method.GET;

/**
* Rest action to get a QueryGroup0
* Rest action to get a QueryGroup
*
* @opensearch.experimental
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.plugin.wlm.UpdateQueryGroupAction;
import org.opensearch.plugin.wlm.UpdateQueryGroupRequest;
import org.opensearch.plugin.wlm.UpdateQueryGroupResponse;
import org.opensearch.plugin.wlm.action.UpdateQueryGroupAction;
import org.opensearch.plugin.wlm.action.UpdateQueryGroupRequest;
import org.opensearch.plugin.wlm.action.UpdateQueryGroupResponse;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
Expand All @@ -31,7 +31,7 @@
/**
* Rest action to update a QueryGroup
*
* @opensearch.api
* @opensearch.experimental
*/
public class RestUpdateQueryGroupAction extends BaseRestHandler {

Expand All @@ -50,23 +50,15 @@ public String getName() {
*/
@Override
public List<Route> routes() {
return List.of(new Route(POST, "_query_group/{name}"), new Route(PUT, "_query_group/{name}"));
return List.of(new Route(POST, "_wlm/query_group/{name}"), new Route(PUT, "_wlm/query_group/{name}"));
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String name = request.param("name");
UpdateQueryGroupRequest updateQueryGroupRequest = new UpdateQueryGroupRequest();
request.applyContentParser((parser) -> parseRestRequest(updateQueryGroupRequest, parser, name));
return channel -> client.execute(UpdateQueryGroupAction.INSTANCE, updateQueryGroupRequest, updateQueryGroupResponse(channel));
}

private void parseRestRequest(UpdateQueryGroupRequest request, XContentParser parser, String name) throws IOException {
final UpdateQueryGroupRequest updateQueryGroupRequest = UpdateQueryGroupRequest.fromXContent(parser, name);
request.setName(updateQueryGroupRequest.getName());
request.setResourceLimits(updateQueryGroupRequest.getResourceLimits());
request.setResiliencyMode(updateQueryGroupRequest.getResiliencyMode());
request.setUpdatedAtInMillis(updateQueryGroupRequest.getUpdatedAtInMillis());
try (XContentParser parser = request.contentParser()) {
UpdateQueryGroupRequest updateQueryGroupRequest = UpdateQueryGroupRequest.fromXContent(parser, request.param("name"));
return channel -> client.execute(UpdateQueryGroupAction.INSTANCE, updateQueryGroupRequest, updateQueryGroupResponse(channel));
}
}

private RestResponseListener<UpdateQueryGroupResponse> updateQueryGroupResponse(final RestChannel channel) {
Expand Down
Loading

0 comments on commit c8a9a49

Please sign in to comment.