-
Notifications
You must be signed in to change notification settings - Fork 39
Commit
* Implemented GET API for workflow step Signed-off-by: Owais Kazi <[email protected]> * Added rest test case Signed-off-by: Owais Kazi <[email protected]> * Added tests and javadocs Signed-off-by: Owais Kazi <[email protected]> * Addressed PR comments Signed-off-by: Owais Kazi <[email protected]> * Removed CreateIndex and CreateIngestPipeline from json Signed-off-by: Owais Kazi <[email protected]> --------- Signed-off-by: Owais Kazi <[email protected]> (cherry picked from commit ebf8b90) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
package org.opensearch.flowframework.rest; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.ExceptionsHelper; | ||
import org.opensearch.action.ActionRequest; | ||
import org.opensearch.action.ActionRequestValidationException; | ||
import org.opensearch.client.node.NodeClient; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.core.rest.RestStatus; | ||
import org.opensearch.core.xcontent.ToXContent; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
import org.opensearch.flowframework.common.FlowFrameworkSettings; | ||
import org.opensearch.flowframework.exception.FlowFrameworkException; | ||
import org.opensearch.flowframework.transport.GetWorkflowStepAction; | ||
import org.opensearch.rest.BaseRestHandler; | ||
import org.opensearch.rest.BytesRestResponse; | ||
import org.opensearch.rest.RestRequest; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
import java.util.Locale; | ||
|
||
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI; | ||
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; | ||
|
||
/** | ||
* Rest Action to facilitate requests to get the workflow steps | ||
*/ | ||
public class RestGetWorkflowStepAction extends BaseRestHandler { | ||
|
||
private static final String GET_WORKFLOW_STEP_ACTION = "get_workflow_step"; | ||
private static final Logger logger = LogManager.getLogger(RestGetWorkflowStepAction.class); | ||
private FlowFrameworkSettings flowFrameworkSettings; | ||
|
||
/** | ||
* Instantiates a new RestGetWorkflowStepAction | ||
* @param flowFrameworkSettings Whether this API is enabled | ||
*/ | ||
public RestGetWorkflowStepAction(FlowFrameworkSettings flowFrameworkSettings) { | ||
this.flowFrameworkSettings = flowFrameworkSettings; | ||
} | ||
|
||
@Override | ||
public String getName() { | ||
return GET_WORKFLOW_STEP_ACTION; | ||
} | ||
|
||
@Override | ||
public List<Route> routes() { | ||
return List.of(new Route(RestRequest.Method.GET, String.format(Locale.ROOT, "%s/%s", WORKFLOW_URI, "_steps"))); | ||
} | ||
|
||
@Override | ||
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { | ||
try { | ||
if (!flowFrameworkSettings.isFlowFrameworkEnabled()) { | ||
throw new FlowFrameworkException( | ||
"This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.", | ||
RestStatus.FORBIDDEN | ||
); | ||
} | ||
|
||
ActionRequest request = new ActionRequest() { | ||
@Override | ||
public ActionRequestValidationException validate() { | ||
return null; | ||
} | ||
}; | ||
return channel -> client.execute(GetWorkflowStepAction.INSTANCE, request, ActionListener.wrap(response -> { | ||
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS); | ||
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); | ||
}, exception -> { | ||
Check warning on line 81 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java#L79-L81
|
||
try { | ||
FlowFrameworkException ex = exception instanceof FlowFrameworkException | ||
? (FlowFrameworkException) exception | ||
: new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)); | ||
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); | ||
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); | ||
Check warning on line 87 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java#L84-L87
|
||
|
||
} catch (IOException e) { | ||
logger.error("Failed to send back get workflow step exception", e); | ||
channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e), e.getMessage())); | ||
} | ||
})); | ||
Check warning on line 93 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java#L89-L93
|
||
|
||
} catch (FlowFrameworkException ex) { | ||
return channel -> channel.sendResponse( | ||
new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS)) | ||
); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
package org.opensearch.flowframework.transport; | ||
|
||
import org.opensearch.action.ActionType; | ||
|
||
import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACTION_NAME_PREFIX; | ||
|
||
/** | ||
* External Action for public facing RestGetWorkflowStepAction | ||
*/ | ||
public class GetWorkflowStepAction extends ActionType<GetWorkflowStepResponse> { | ||
|
||
/** The name of this action */ | ||
public static final String NAME = TRANSPORT_ACTION_NAME_PREFIX + "workflow_step/get"; | ||
/** An instance of this action */ | ||
public static final GetWorkflowStepAction INSTANCE = new GetWorkflowStepAction(); | ||
|
||
/** | ||
* Instantiates this class | ||
*/ | ||
public GetWorkflowStepAction() { | ||
super(NAME, GetWorkflowStepResponse::new); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
package org.opensearch.flowframework.transport; | ||
|
||
import org.opensearch.core.action.ActionResponse; | ||
import org.opensearch.core.common.io.stream.StreamInput; | ||
import org.opensearch.core.common.io.stream.StreamOutput; | ||
import org.opensearch.core.xcontent.ToXContentObject; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
import org.opensearch.flowframework.model.WorkflowValidator; | ||
|
||
import java.io.IOException; | ||
|
||
/** | ||
* Transport Response from getting workflow step | ||
*/ | ||
public class GetWorkflowStepResponse extends ActionResponse implements ToXContentObject { | ||
|
||
private WorkflowValidator workflowValidator; | ||
|
||
/** | ||
* Instantiates a new GetWorkflowStepResponse from an input stream | ||
* @param in the input stream to read from | ||
* @throws IOException if the workflow json cannot be read from the input stream | ||
*/ | ||
public GetWorkflowStepResponse(StreamInput in) throws IOException { | ||
super(in); | ||
this.workflowValidator = WorkflowValidator.parse(in.readString()); | ||
} | ||
Check warning on line 35 in src/main/java/org/opensearch/flowframework/transport/GetWorkflowStepResponse.java Codecov / codecov/patchsrc/main/java/org/opensearch/flowframework/transport/GetWorkflowStepResponse.java#L33-L35
|
||
|
||
/** | ||
* Instantiates a new GetWorkflowStepResponse | ||
* @param workflowValidator the workflow validator | ||
*/ | ||
public GetWorkflowStepResponse(WorkflowValidator workflowValidator) { | ||
this.workflowValidator = workflowValidator; | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
out.writeString(workflowValidator.toJson()); | ||
} | ||
Check warning on line 48 in src/main/java/org/opensearch/flowframework/transport/GetWorkflowStepResponse.java Codecov / codecov/patchsrc/main/java/org/opensearch/flowframework/transport/GetWorkflowStepResponse.java#L47-L48
|
||
|
||
@Override | ||
public XContentBuilder toXContent(XContentBuilder xContentBuilder, Params params) throws IOException { | ||
return this.workflowValidator.toXContent(xContentBuilder, params); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
package org.opensearch.flowframework.transport; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.ExceptionsHelper; | ||
import org.opensearch.action.ActionRequest; | ||
import org.opensearch.action.support.ActionFilters; | ||
import org.opensearch.action.support.HandledTransportAction; | ||
import org.opensearch.common.inject.Inject; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.flowframework.exception.FlowFrameworkException; | ||
import org.opensearch.flowframework.model.WorkflowValidator; | ||
import org.opensearch.tasks.Task; | ||
import org.opensearch.transport.TransportService; | ||
|
||
/** | ||
* Transport action to retrieve a workflow step json | ||
*/ | ||
public class GetWorkflowStepTransportAction extends HandledTransportAction<ActionRequest, GetWorkflowStepResponse> { | ||
|
||
private final Logger logger = LogManager.getLogger(GetWorkflowStepTransportAction.class); | ||
|
||
/** | ||
* Instantiates a new GetWorkflowStepTransportAction instance | ||
* @param transportService the transport service | ||
* @param actionFilters action filters | ||
*/ | ||
@Inject | ||
public GetWorkflowStepTransportAction(TransportService transportService, ActionFilters actionFilters) { | ||
super(GetWorkflowStepAction.NAME, transportService, actionFilters, WorkflowRequest::new); | ||
} | ||
|
||
@Override | ||
protected void doExecute(Task task, ActionRequest request, ActionListener<GetWorkflowStepResponse> listener) { | ||
try { | ||
listener.onResponse(new GetWorkflowStepResponse(WorkflowValidator.parse("mappings/workflow-steps.json"))); | ||
} catch (Exception e) { | ||
logger.error("Failed to retrieve workflow step json.", e); | ||
listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e))); | ||
Check warning on line 47 in src/main/java/org/opensearch/flowframework/transport/GetWorkflowStepTransportAction.java Codecov / codecov/patchsrc/main/java/org/opensearch/flowframework/transport/GetWorkflowStepTransportAction.java#L45-L47
|
||
} | ||
} | ||
} |