Skip to content

Commit

Permalink
Initial commit, Adds ReprovisionWorkflowTransportAction, reprovision …
Browse files Browse the repository at this point in the history
…param for RestCreateWorkflowAction, creates and registers Update Ingest/Search pipeline steps in WorkflowResources, registers update steps in WorkflowStepFactory

Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed Jun 5, 2024
1 parent 13b32f1 commit 773fbb8
Show file tree
Hide file tree
Showing 13 changed files with 568 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.opensearch.flowframework.transport.GetWorkflowTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.ReprovisionWorkflowAction;
import org.opensearch.flowframework.transport.ReprovisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.SearchWorkflowAction;
import org.opensearch.flowframework.transport.SearchWorkflowStateAction;
import org.opensearch.flowframework.transport.SearchWorkflowStateTransportAction;
Expand Down Expand Up @@ -171,7 +173,8 @@ public List<RestHandler> getRestHandlers(
new ActionHandler<>(GetWorkflowStateAction.INSTANCE, GetWorkflowStateTransportAction.class),
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowStepAction.INSTANCE, GetWorkflowStepTransportAction.class),
new ActionHandler<>(SearchWorkflowStateAction.INSTANCE, SearchWorkflowStateTransportAction.class)
new ActionHandler<>(SearchWorkflowStateAction.INSTANCE, SearchWorkflowStateTransportAction.class),
new ActionHandler<>(ReprovisionWorkflowAction.INSTANCE, ReprovisionWorkflowTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ private CommonValue() {}
public static final String WORKFLOW_STEP = "workflow_step";
/** The param name for default use case, used by the create workflow API */
public static final String USE_CASE = "use_case";
/** The param name for reprovisioning, used by the create workflow API */
public static final String REPROVISION_WORKFLOW = "reprovision";

/*
* Constants associated with plugin configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
import org.opensearch.flowframework.workflow.RegisterLocalSparseEncodingModelStep;
import org.opensearch.flowframework.workflow.RegisterModelGroupStep;
import org.opensearch.flowframework.workflow.RegisterRemoteModelStep;
import org.opensearch.flowframework.workflow.ReindexStep;
import org.opensearch.flowframework.workflow.UndeployModelStep;
import org.opensearch.flowframework.workflow.UpdateIngestPipelineStep;
import org.opensearch.flowframework.workflow.UpdateSearchPipelineStep;

import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -40,29 +41,36 @@
public enum WorkflowResources {

/** Workflow steps for creating/deleting a connector and associated created resource */
CREATE_CONNECTOR(CreateConnectorStep.NAME, WorkflowResources.CONNECTOR_ID, DeleteConnectorStep.NAME),
CREATE_CONNECTOR(CreateConnectorStep.NAME, WorkflowResources.CONNECTOR_ID, DeleteConnectorStep.NAME, NoOpStep.NAME),
/** Workflow steps for registering/deleting a remote model and associated created resource */
REGISTER_REMOTE_MODEL(RegisterRemoteModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME),
REGISTER_REMOTE_MODEL(RegisterRemoteModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME, NoOpStep.NAME),
/** Workflow steps for registering/deleting a local model and associated created resource */
REGISTER_LOCAL_MODEL(RegisterLocalCustomModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME),
REGISTER_LOCAL_MODEL(RegisterLocalCustomModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME, NoOpStep.NAME),
/** Workflow steps for registering/deleting a local sparse encoding model and associated created resource */
REGISTER_LOCAL_SPARSE_ENCODING_MODEL(RegisterLocalSparseEncodingModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME),
REGISTER_LOCAL_SPARSE_ENCODING_MODEL(
RegisterLocalSparseEncodingModelStep.NAME,
WorkflowResources.MODEL_ID,
DeleteModelStep.NAME,
NoOpStep.NAME
),
/** Workflow steps for registering/deleting a local OpenSearch provided pretrained model and associated created resource */
REGISTER_LOCAL_PRETRAINED_MODEL(RegisterLocalPretrainedModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME),
REGISTER_LOCAL_PRETRAINED_MODEL(RegisterLocalPretrainedModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME, NoOpStep.NAME),
/** Workflow steps for registering/deleting a model group and associated created resource */
REGISTER_MODEL_GROUP(RegisterModelGroupStep.NAME, WorkflowResources.MODEL_GROUP_ID, NoOpStep.NAME),
REGISTER_MODEL_GROUP(RegisterModelGroupStep.NAME, WorkflowResources.MODEL_GROUP_ID, NoOpStep.NAME, NoOpStep.NAME),
/** Workflow steps for deploying/undeploying a model and associated created resource */
DEPLOY_MODEL(DeployModelStep.NAME, WorkflowResources.MODEL_ID, UndeployModelStep.NAME),
DEPLOY_MODEL(DeployModelStep.NAME, WorkflowResources.MODEL_ID, UndeployModelStep.NAME, NoOpStep.NAME),
/** Workflow steps for creating an ingest-pipeline and associated created resource */
CREATE_INGEST_PIPELINE(CreateIngestPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step
CREATE_INGEST_PIPELINE(CreateIngestPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null, UpdateIngestPipelineStep.NAME), // TODO
// delete
// step
/** Workflow steps for creating an ingest-pipeline and associated created resource */
CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step
CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null, UpdateSearchPipelineStep.NAME), // TODO
// delete
// step
/** Workflow steps for creating an index and associated created resource */
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME),
/** Workflow steps for reindex a source index to destination index and associated created resource */
REINDEX(ReindexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME),
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME, NoOpStep.NAME),
/** Workflow steps for registering/deleting an agent and the associated created resource */
REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME);
REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME, NoOpStep.NAME);

/** Connector Id for a remote model connector */
public static final String CONNECTOR_ID = "connector_id";
Expand All @@ -80,15 +88,17 @@ public enum WorkflowResources {
private final String workflowStep;
private final String resourceCreated;
private final String deprovisionStep;
private final String updateStep;
private static final Logger logger = LogManager.getLogger(WorkflowResources.class);
private static final Set<String> allResources = Stream.of(values())
.map(WorkflowResources::getResourceCreated)
.collect(Collectors.toSet());

WorkflowResources(String workflowStep, String resourceCreated, String deprovisionStep) {
WorkflowResources(String workflowStep, String resourceCreated, String deprovisionStep, String updateStep) {
this.workflowStep = workflowStep;
this.resourceCreated = resourceCreated;
this.deprovisionStep = deprovisionStep;
this.updateStep = updateStep;
}

/**
Expand All @@ -115,6 +125,14 @@ public String getDeprovisionStep() {
return deprovisionStep;
}

/**
* Returns the updateStep for the given enum Constant
* @return the updateStep of this data.
*/
public String getUpdateStep() {
return updateStep;
}

/**
* Gets the resources created type based on the workflowStep.
* @param workflowStep workflow step name
Expand All @@ -124,7 +142,9 @@ public String getDeprovisionStep() {
public static String getResourceByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (workflowStep.equals(mapping.getWorkflowStep()) || workflowStep.equals(mapping.getDeprovisionStep())) {
if (workflowStep.equals(mapping.getWorkflowStep())
|| workflowStep.equals(mapping.getDeprovisionStep())
|| workflowStep.equals(mapping.getUpdateStep())) {
return mapping.getResourceCreated();
}
}
Expand All @@ -151,6 +171,24 @@ public static String getDeprovisionStepByWorkflowStep(String workflowStep) throw
throw new FlowFrameworkException("Unable to find deprovision step for step: " + workflowStep, RestStatus.BAD_REQUEST);
}

/**
* Gets the update step type based on the workflowStep.
* @param workflowStep workflow step name
* @return the corresponding step to update
* @throws FlowFrameworkException if workflow step doesn't exist in enum
*/
public static String getUpdateStepByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (mapping.getWorkflowStep().equals(workflowStep)) {
return mapping.getUpdateStep();
}
}
}
logger.error("Unable to find update step for step: {}", workflowStep);
throw new FlowFrameworkException("Unable to find update step for step: " + workflowStep, RestStatus.BAD_REQUEST);
}

/**
* Returns all the possible resource created types in enum
* @return a set of all the resource created types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW;
import static org.opensearch.flowframework.common.CommonValue.REPROVISION_WORKFLOW;
import static org.opensearch.flowframework.common.CommonValue.USE_CASE;
import static org.opensearch.flowframework.common.CommonValue.VALIDATION;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
Expand Down Expand Up @@ -73,7 +74,7 @@ public List<Route> routes() {
return List.of(
// Create new workflow
new Route(RestRequest.Method.POST, String.format(Locale.ROOT, "%s", WORKFLOW_URI)),
// Update use case template
// Update use case template/ reprovision existing workflow
new Route(RestRequest.Method.PUT, String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, WORKFLOW_ID))
);
}
Expand All @@ -83,7 +84,9 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
String workflowId = request.param(WORKFLOW_ID);
String[] validation = request.paramAsStringArray(VALIDATION, new String[] { "all" });
boolean provision = request.paramAsBoolean(PROVISION_WORKFLOW, false);
boolean reprovision = request.paramAsBoolean(REPROVISION_WORKFLOW, false);
String useCase = request.param(USE_CASE);

// If provisioning, consume all other params and pass to provision transport action
Map<String, String> params = provision
? request.params()
Expand Down Expand Up @@ -195,7 +198,8 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
provision,
params,
useCase,
useCaseDefaultsMap
useCaseDefaultsMap,
reprovision
);

return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,27 +249,65 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
request.getWorkflowId(),
template,
ActionListener.wrap(response -> {
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
request.getWorkflowId(),
Map.ofEntries(
Map.entry(STATE_FIELD, State.NOT_STARTED),
Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED)
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.NOT_STARTED.name());
listener.onResponse(new WorkflowResponse(request.getWorkflowId()));
}, exception -> {
String errorMessage = "Failed to update workflow " + request.getWorkflowId() + " in template index";
logger.error(errorMessage, exception);
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))
if (request.isReprovision()) {

// Reprovision request
ReprovisionWorkflowRequest reprovisionRequest = new ReprovisionWorkflowRequest(
getResponse.getId(),
existingTemplate,
template
);
logger.info(
"Reprovisioning parameter is set, continuing to reprovision workflow {}",
getResponse.getId()
);
client.execute(
ReprovisionWorkflowAction.INSTANCE,
reprovisionRequest,
ActionListener.wrap(reprovisionResponse -> {
listener.onResponse(new WorkflowResponse(reprovisionResponse.getWorkflowId()));
}, exception -> {
String errorMessage = "Reprovisioning failed.";
logger.error(errorMessage, exception);
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))
);
}
})
);
} else {
// Regular update, reset provisioning status
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
request.getWorkflowId(),
Map.ofEntries(
Map.entry(STATE_FIELD, State.NOT_STARTED),
Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED)
),
ActionListener.wrap(updateResponse -> {
logger.info(
"updated workflow {} state to {}",
request.getWorkflowId(),
State.NOT_STARTED.name()
);
}
})
);
listener.onResponse(new WorkflowResponse(request.getWorkflowId()));
}, exception -> {
String errorMessage = "Failed to update workflow "
+ request.getWorkflowId()
+ " in template index";
logger.error(errorMessage, exception);
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))
);
}
})
);
}
}, exception -> {
String errorMessage = "Failed to update use case template " + request.getWorkflowId();
logger.error(errorMessage, exception);
Expand All @@ -278,7 +316,8 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
} else {
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
}
})
}),
request.isReprovision() // ignores NOT_STARTED state if request is to reprovision
);
} else {
String errorMessage = "Failed to retrieve template (" + workflowId + ") from global context.";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.ActionType;

import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACTION_NAME_PREFIX;

/**
* External Action for public facing RestCreateWorkflowAction
*/
public class ReprovisionWorkflowAction extends ActionType<WorkflowResponse> {

/** The name of this action */
public static final String NAME = TRANSPORT_ACTION_NAME_PREFIX + "workflow/reprovision";
/** An instance of this action */
public static final ReprovisionWorkflowAction INSTANCE = new ReprovisionWorkflowAction();

private ReprovisionWorkflowAction() {
super(NAME, WorkflowResponse::new);
}
}
Loading

0 comments on commit 773fbb8

Please sign in to comment.