Skip to content

Commit

Permalink
Adding dryrun param to create workflow API, allows for validation bef…
Browse files Browse the repository at this point in the history
…ore saving

Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed Nov 2, 2023
1 parent 515507a commit eefe602
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ private CommonValue() {}
public static final String WORKFLOW_URI = FLOW_FRAMEWORK_BASE_URI + "/workflow";
/** Field name for workflow Id, the document Id of the indexed use case template */
public static final String WORKFLOW_ID = "workflow_id";
/** Field name for dry run, the flag to indicate if validation is necessary */
public static final String DRY_RUN = "dryrun";
/** The field name for provision workflow within a use case template*/
public static final String PROVISION_WORKFLOW = "provision";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Locale;

import static org.opensearch.flowframework.common.CommonValue.DRY_RUN;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;

Expand Down Expand Up @@ -62,9 +63,13 @@ public List<Route> routes() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
try {

String workflowId = request.param(WORKFLOW_ID);
Template template = Template.parse(request.content().utf8ToString());
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template);
boolean dryRun = request.paramAsBoolean(DRY_RUN, false);

Check warning on line 69 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L68-L69

Added lines #L68 - L69 were not covered by tests

WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, dryRun);

Check warning on line 71 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L71

Added line #L71 was not covered by tests

return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.CREATED, builder));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.State;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import java.util.List;

import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
Expand All @@ -39,30 +44,35 @@ public class CreateWorkflowTransportAction extends HandledTransportAction<Workfl

private final Logger logger = LogManager.getLogger(CreateWorkflowTransportAction.class);

private final WorkflowProcessSorter workflowProcessSorter;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private final Client client;

/**
* Intantiates a new CreateWorkflowTransportAction
* @param transportService the TransportService
* @param actionFilters action filters
* @param workflowProcessSorter the workflow process sorter
* @param flowFrameworkIndicesHandler The handler for the global context index
* @param client The client used to make the request to OS
*/
@Inject
public CreateWorkflowTransportAction(
TransportService transportService,
ActionFilters actionFilters,
WorkflowProcessSorter workflowProcessSorter,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
Client client
) {
super(CreateWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new);
this.workflowProcessSorter = workflowProcessSorter;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.client = client;
}

@Override
protected void doExecute(Task task, WorkflowRequest request, ActionListener<WorkflowResponse> listener) {

User user = getUserContext(client);
Template templateWithUser = new Template(
request.getTemplate().name(),
Expand All @@ -73,6 +83,21 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
request.getTemplate().workflows(),
user
);

if (request.isDryRun()) {
try {
validateWorkflows(templateWithUser);
} catch (Exception e) {

Check warning on line 90 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L89-L90

Added lines #L89 - L90 were not covered by tests
if (e instanceof FlowFrameworkException) {
logger.error("Workflow validation failed for template : " + templateWithUser.name());
listener.onFailure(e);

Check warning on line 93 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L92-L93

Added lines #L92 - L93 were not covered by tests
} else {
listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));

Check warning on line 95 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L95

Added line #L95 was not covered by tests
}
return;
}

Check warning on line 98 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L97-L98

Added lines #L97 - L98 were not covered by tests
}

if (request.getWorkflowId() == null) {
// Create new global context and state index entries
flowFrameworkIndicesHandler.putTemplateToGlobalContext(templateWithUser, ActionListener.wrap(globalContextResponse -> {
Expand Down Expand Up @@ -135,4 +160,11 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
}
}

private void validateWorkflows(Template template) throws Exception {
for (Workflow workflow : template.workflows().values()) {
List<ProcessNode> sortedNodes = workflowProcessSorter.sortProcessNodes(workflow);
workflowProcessSorter.validateGraph(sortedNodes);
}
}

Check warning on line 168 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L165-L168

Added lines #L165 - L168 were not covered by tests

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,30 @@ public class WorkflowRequest extends ActionRequest {
*/
@Nullable
private Template template;
/**
* Validation flag
*/
private boolean dryRun;

/**
* Instantiates a new WorkflowRequest
* Instantiates a new WorkflowRequest and defaults dry run to false
* @param workflowId the documentId of the workflow
* @param template the use case template which describes the workflow
*/
public WorkflowRequest(@Nullable String workflowId, @Nullable Template template) {
this(workflowId, template, false);
}

/**
* Instantiates a new WorkflowRequest
* @param workflowId the documentId of the workflow
* @param template the use case template which describes the workflow
* @param dryRun flag to indicate if validation is necessary
*/
public WorkflowRequest(@Nullable String workflowId, @Nullable Template template, boolean dryRun) {
this.workflowId = workflowId;
this.template = template;
this.dryRun = dryRun;
}

/**
Expand All @@ -53,6 +68,7 @@ public WorkflowRequest(StreamInput in) throws IOException {
this.workflowId = in.readOptionalString();
String templateJson = in.readOptionalString();
this.template = templateJson == null ? null : Template.parse(templateJson);
this.dryRun = in.readBoolean();
}

/**
Expand All @@ -73,11 +89,20 @@ public Template getTemplate() {
return this.template;
}

/**
* Gets the dry run validation flag
* @return the dry run boolean
*/
public boolean isDryRun() {
return this.dryRun;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(workflowId);
out.writeOptionalString(template == null ? null : template.toJson());
out.writeBoolean(dryRun);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.flowframework.model.WorkflowEdge;
import org.opensearch.flowframework.model.WorkflowNode;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.tasks.Task;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -56,6 +57,7 @@ public void setUp() throws Exception {
this.createWorkflowTransportAction = new CreateWorkflowTransportAction(
mock(TransportService.class),
mock(ActionFilters.class),
mock(WorkflowProcessSorter.class),
flowFrameworkIndicesHandler,
client
);
Expand Down

0 comments on commit eefe602

Please sign in to comment.