Skip to content

Commit

Permalink
[Backport 2.x] Search APIs to fetch all the workflows present (#152)
Browse files Browse the repository at this point in the history
Search APIs to fetch all the workflows present (#139)

* Initial Implementation of Search API



* Added tests



* Added tests for transport action



* Added javadoc



* Set APIs behind the feature flag



* Minor comment



---------


(cherry picked from commit dbbff74)

Signed-off-by: Owais Kazi <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent d22951e commit 8c6c040
Show file tree
Hide file tree
Showing 12 changed files with 469 additions and 6 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ dependencies {
implementation 'org.junit.jupiter:junit-jupiter:5.10.1'
implementation "com.google.guava:guava:32.1.3-jre"
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.10'
implementation "org.opensearch:common-utils:${common_utils_version}"

configurations.all {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
import org.opensearch.flowframework.rest.RestSearchWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.SearchWorkflowAction;
import org.opensearch.flowframework.transport.SearchWorkflowTransportAction;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand Down Expand Up @@ -104,15 +107,17 @@ public List<RestHandler> getRestHandlers(
) {
return ImmutableList.of(
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting)
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting)
);
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return ImmutableList.of(
new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class),
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class)
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class),
new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import org.opensearch.action.ActionType;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestResponse;
import org.opensearch.rest.action.RestResponseListener;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;
import static org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.util.RestHandlerUtils.getSourceContext;

/**
* Abstract class to handle search request.
*/
public abstract class AbstractSearchWorkflowAction<T extends ToXContentObject> extends BaseRestHandler {

protected final List<String> urlPaths;
protected final String index;
protected final Class<T> clazz;
protected final ActionType<SearchResponse> actionType;
protected final FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
* Instantiates a new AbstractSearchWorkflowAction
* @param urlPaths urlPaths to create routes
* @param index index the search should be done on
* @param clazz model class
* @param actionType from which action abstract class is called
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
*/
public AbstractSearchWorkflowAction(
List<String> urlPaths,
String index,
Class<T> clazz,
ActionType<SearchResponse> actionType,
FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting
) {
this.urlPaths = urlPaths;
this.index = index;
this.clazz = clazz;
this.actionType = actionType;
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
FlowFrameworkException ffe = new FlowFrameworkException(
"This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
);
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.parseXContent(request.contentOrSourceParamParser());
searchSourceBuilder.fetchSource(getSourceContext(request, searchSourceBuilder));
searchSourceBuilder.seqNoAndPrimaryTerm(true).version(true);
SearchRequest searchRequest = new SearchRequest().source(searchSourceBuilder).indices(index);
return channel -> client.execute(actionType, searchRequest, search(channel));
}

/**
* Builds the action response for the Search Request
*
* @param channel the REST channel
* @return the action response
*/
protected RestResponseListener<SearchResponse> search(RestChannel channel) {
return new RestResponseListener<SearchResponse>(channel) {
@Override
public RestResponse buildResponse(SearchResponse response) throws Exception {
if (response.isTimedOut()) {
return new BytesRestResponse(RestStatus.REQUEST_TIMEOUT, response.toString());
}
return new BytesRestResponse(RestStatus.OK, response.toXContent(channel.newBuilder(), EMPTY_PARAMS));
}
};
}

@Override
public List<Route> routes() {
List<Route> routes = new ArrayList<>();
for (String path : urlPaths) {
routes.add(new Route(RestRequest.Method.POST, path));
routes.add(new Route(RestRequest.Method.GET, path));
}
return routes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class RestCreateWorkflowAction extends BaseRestHandler {
private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
* Intantiates a new RestCreateWorkflowAction
* Instantiates a new RestCreateWorkflowAction
*
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import com.google.common.collect.ImmutableList;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.transport.SearchWorkflowAction;

import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;

/**
* Rest Action to facilitate requests to search workflows
*/
public class RestSearchWorkflowAction extends AbstractSearchWorkflowAction<Template> {

private static final String SEARCH_WORKFLOW_ACTION = "search_workflow_action";
private static final String SEARCH_WORKFLOW_PATH = WORKFLOW_URI + "/_search";

/**
* Instantiates a new RestSearchWorkflowAction
*
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
*/
public RestSearchWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) {
super(
ImmutableList.of(SEARCH_WORKFLOW_PATH),
GLOBAL_CONTEXT_INDEX,
Template.class,
SearchWorkflowAction.INSTANCE,
flowFrameworkFeatureEnabledSetting
);
}

@Override
public String getName() {
return SEARCH_WORKFLOW_ACTION;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACION_NAME_PREFIX;

/**
* External Action for public facing RestCreateWorkflowActiom
* External Action for public facing RestCreateWorkflowAction
*/
public class CreateWorkflowAction extends ActionType<WorkflowResponse> {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.ActionType;
import org.opensearch.action.search.SearchResponse;

import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACION_NAME_PREFIX;

/**
* External Action for public facing RestSearchWorkflowAction
*/
public class SearchWorkflowAction extends ActionType<SearchResponse> {

/** The name of this action */
public static final String NAME = TRANSPORT_ACION_NAME_PREFIX + "workflow/search";
/** An instance of this action */
public static final SearchWorkflowAction INSTANCE = new SearchWorkflowAction();

private SearchWorkflowAction() {
super(NAME, SearchResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

/**
* Transport Action to search workflows created
*/
public class SearchWorkflowTransportAction extends HandledTransportAction<SearchRequest, SearchResponse> {
private Client client;

/**
* Intantiates a new CreateWorkflowTransportAction
* @param transportService the TransportService
* @param actionFilters action filters
* @param client The client used to make the request to OS
*/
@Inject
public SearchWorkflowTransportAction(TransportService transportService, ActionFilters actionFilters, Client client) {
super(SearchWorkflowAction.NAME, transportService, actionFilters, SearchRequest::new);
this.client = client;
}

@Override
protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> actionListener) {
// TODO: AccessController should take care of letting the user with right permission to view the workflow
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.search(request, ActionListener.runBefore(actionListener, () -> context.restore()));
} catch (Exception e) {
actionListener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.util;

import org.apache.commons.lang3.ArrayUtils;
import org.opensearch.flowframework.common.CommonValue;
import org.opensearch.rest.RestRequest;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.fetch.subphase.FetchSourceContext;

/**
* Utility methods for Rest Handlers
*/
public class RestHandlerUtils {

public static final String[] USER_EXCLUDE = new String[] { CommonValue.USER_FIELD };

private RestHandlerUtils() {}

/**
* Creates a source context and include/exclude information to be shared based on the user
*
* @param request the REST request
* @param searchSourceBuilder the search request source builder
* @return modified sources
*/
public static FetchSourceContext getSourceContext(RestRequest request, SearchSourceBuilder searchSourceBuilder) {
// TODO
// 1. Move UI_METADATA to GC Index
// 2. check if the request came from dashboard and exclude UI_METADATA
if (searchSourceBuilder.fetchSource() != null) {
String[] newArray = (String[]) ArrayUtils.addAll(searchSourceBuilder.fetchSource().excludes(), USER_EXCLUDE);
return new FetchSourceContext(true, searchSourceBuilder.fetchSource().includes(), newArray);
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public void testPlugin() throws IOException {
3,
ffp.createComponents(client, clusterService, threadPool, null, null, null, environment, null, null, null, null).size()
);
assertEquals(2, ffp.getRestHandlers(null, null, null, null, null, null, null).size());
assertEquals(2, ffp.getActions().size());
assertEquals(3, ffp.getRestHandlers(null, null, null, null, null, null, null).size());
assertEquals(3, ffp.getActions().size());
assertEquals(1, ffp.getExecutorBuilders(settings).size());
assertEquals(1, ffp.getSettings().size());
}
Expand Down
Loading

0 comments on commit 8c6c040

Please sign in to comment.