Skip to content

Commit

Permalink
Add allow_delete parameter to Deprovision API (#763)
Browse files Browse the repository at this point in the history
* Add allow_delete parameter to Deprovision API

Signed-off-by: Daniel Widdis <[email protected]>

* Add denylist for workflow step types users are not allowed to use

Signed-off-by: Daniel Widdis <[email protected]>

* Remove denylisted steps from GetWorkflowStep response

Signed-off-by: Daniel Widdis <[email protected]>

* Remove mention of new steps from change log

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis authored Jul 16, 2024
1 parent 9f83d94 commit 7e7e993
Show file tree
Hide file tree
Showing 21 changed files with 1,109 additions and 165 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.14...2.x)
### Features
- Support editing of certain workflow fields on a provisioned workflow ([#757](https://github.com/opensearch-project/flow-framework/pull/757))
- Add allow_delete parameter to Deprovision API ([#763](https://github.com/opensearch-project/flow-framework/pull/763))

### Enhancements
- Register system index descriptors through SystemIndexPlugin.getSystemIndexDescriptors ([#750](https://github.com/opensearch-project/flow-framework/pull/750))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,7 @@ public Collection<Object> createComponents(
flowFrameworkSettings,
client
);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(
workflowStepFactory,
threadPool,
clusterService,
client,
flowFrameworkSettings
);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool, flowFrameworkSettings);

return List.of(workflowStepFactory, workflowProcessSorter, encryptorUtils, flowFrameworkIndicesHandler, flowFrameworkSettings);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ private CommonValue() {}
public static final String WORKFLOW_ID = "workflow_id";
/** Field name for template validation, the flag to indicate if validation is necessary */
public static final String VALIDATION = "validation";
/** The param name for provision workflow in create API */
/** Param name for allow deletion during deprovisioning */
public static final String ALLOW_DELETE = "allow_delete";
/** The field name for provision workflow within a use case template*/
public static final String PROVISION_WORKFLOW = "provision";
/** The param name for update workflow field in create API */
public static final String UPDATE_WORKFLOW_FIELDS = "update_fields";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import org.opensearch.flowframework.workflow.CreateSearchPipelineStep;
import org.opensearch.flowframework.workflow.DeleteAgentStep;
import org.opensearch.flowframework.workflow.DeleteConnectorStep;
import org.opensearch.flowframework.workflow.DeleteIndexStep;
import org.opensearch.flowframework.workflow.DeleteIngestPipelineStep;
import org.opensearch.flowframework.workflow.DeleteModelStep;
import org.opensearch.flowframework.workflow.DeleteSearchPipelineStep;
import org.opensearch.flowframework.workflow.DeployModelStep;
import org.opensearch.flowframework.workflow.NoOpStep;
import org.opensearch.flowframework.workflow.RegisterAgentStep;
Expand Down Expand Up @@ -54,11 +57,11 @@ public enum WorkflowResources {
/** Workflow steps for deploying/undeploying a model and associated created resource */
DEPLOY_MODEL(DeployModelStep.NAME, WorkflowResources.MODEL_ID, UndeployModelStep.NAME),
/** Workflow steps for creating an ingest-pipeline and associated created resource */
CREATE_INGEST_PIPELINE(CreateIngestPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step
CREATE_INGEST_PIPELINE(CreateIngestPipelineStep.NAME, WorkflowResources.PIPELINE_ID, DeleteIngestPipelineStep.NAME),
/** Workflow steps for creating an ingest-pipeline and associated created resource */
CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step
CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, DeleteSearchPipelineStep.NAME),
/** Workflow steps for creating an index and associated created resource */
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME),
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, DeleteIndexStep.NAME),
/** Workflow steps for reindex a source index to destination index and associated created resource */
REINDEX(ReindexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME),
/** Workflow steps for registering/deleting an agent and the associated created resource */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import org.opensearch.rest.RestRequest;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static org.opensearch.flowframework.common.CommonValue.ALLOW_DELETE;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
Expand Down Expand Up @@ -57,6 +60,7 @@ public String getName() {
@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String workflowId = request.param(WORKFLOW_ID);
String allowDelete = request.param(ALLOW_DELETE);
try {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
throw new FlowFrameworkException(
Expand All @@ -73,7 +77,11 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);
WorkflowRequest workflowRequest = new WorkflowRequest(
workflowId,
null,
allowDelete == null ? Collections.emptyMap() : Map.of(ALLOW_DELETE, allowDelete)
);

return channel -> client.execute(DeprovisionWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
Expand All @@ -28,6 +29,7 @@
import org.opensearch.flowframework.model.State;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.flowframework.workflow.WorkflowStep;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -40,8 +42,10 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.ALLOW_DELETE;
import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_END_TIME_FIELD;
Expand Down Expand Up @@ -95,6 +99,7 @@ public DeprovisionWorkflowTransportAction(
@Override
protected void doExecute(Task task, WorkflowRequest request, ActionListener<WorkflowResponse> listener) {
String workflowId = request.getWorkflowId();
String allowDelete = request.getParams().get(ALLOW_DELETE);
GetWorkflowStateRequest getStateRequest = new GetWorkflowStateRequest(workflowId, true);

// Stash thread context to interact with system index
Expand All @@ -103,9 +108,17 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
client.execute(GetWorkflowStateAction.INSTANCE, getStateRequest, ActionListener.wrap(response -> {
context.restore();

Set<String> deleteAllowedResources = Strings.tokenizeByCommaToSet(allowDelete);
// Retrieve resources from workflow state and deprovision
threadPool.executor(DEPROVISION_WORKFLOW_THREAD_POOL)
.execute(() -> executeDeprovisionSequence(workflowId, response.getWorkflowState().resourcesCreated(), listener));
.execute(
() -> executeDeprovisionSequence(
workflowId,
response.getWorkflowState().resourcesCreated(),
deleteAllowedResources,
listener
)
);
}, exception -> {
String errorMessage = "Failed to get workflow state for workflow " + workflowId;
logger.error(errorMessage, exception);
Expand All @@ -121,26 +134,28 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
private void executeDeprovisionSequence(
String workflowId,
List<ResourceCreated> resourcesCreated,
Set<String> deleteAllowedResources,
ActionListener<WorkflowResponse> listener
) {

List<ResourceCreated> deleteNotAllowed = new ArrayList<>();
// Create a list of ProcessNodes with the corresponding deprovision workflow steps
List<ProcessNode> deprovisionProcessSequence = new ArrayList<>();
for (ResourceCreated resource : resourcesCreated) {
String workflowStepId = resource.workflowStepId();

String stepName = resource.workflowStepName();
String deprovisionStep = getDeprovisionStepByWorkflowStep(stepName);
// Unimplemented steps presently return null, so skip
if (deprovisionStep == null) {
WorkflowStep deprovisionStep = workflowStepFactory.createStep(getDeprovisionStepByWorkflowStep(stepName));
// Skip if the step requires allow_delete but the resourceId isn't included
if (deprovisionStep.allowDeleteRequired() && !deleteAllowedResources.contains(resource.resourceId())) {
deleteNotAllowed.add(resource);
continue;
}
// New ID is old ID with (deprovision step type) prepended
String deprovisionStepId = "(deprovision_" + stepName + ") " + workflowStepId;
deprovisionProcessSequence.add(
new ProcessNode(
deprovisionStepId,
workflowStepFactory.createStep(deprovisionStep),
deprovisionStep,
Collections.emptyMap(),
Collections.emptyMap(),
new WorkflowData(Map.of(getResourceByWorkflowStep(stepName), resource.resourceId()), workflowId, deprovisionStepId),
Expand Down Expand Up @@ -215,17 +230,21 @@ private void executeDeprovisionSequence(
List<ResourceCreated> remainingResources = deprovisionProcessSequence.stream()
.map(pn -> getResourceFromDeprovisionNode(pn, resourcesCreated))
.collect(Collectors.toList());
logger.info("Resources remaining: {}", remainingResources);
updateWorkflowState(workflowId, remainingResources, listener);
logger.info("Resources remaining: {}.", remainingResources);
if (!deleteNotAllowed.isEmpty()) {
logger.info("Resources requiring allow_delete: {}.", deleteNotAllowed);
}
updateWorkflowState(workflowId, remainingResources, deleteNotAllowed, listener);
}

private void updateWorkflowState(
String workflowId,
List<ResourceCreated> remainingResources,
List<ResourceCreated> deleteNotAllowed,
ActionListener<WorkflowResponse> listener
) {
if (remainingResources.isEmpty()) {
// Successful deprovision, reset state to initial
if (remainingResources.isEmpty() && deleteNotAllowed.isEmpty()) {
// Successful deprovision of all resources, reset state to initial
flowFrameworkIndicesHandler.doesTemplateExist(workflowId, templateExists -> {
if (Boolean.TRUE.equals(templateExists)) {
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(
Expand All @@ -244,35 +263,49 @@ private void updateWorkflowState(
listener.onResponse(new WorkflowResponse(workflowId));
}, listener);
} else {
// Failed deprovision
// Remaining resources only includes ones we tried to delete
List<ResourceCreated> stateIndexResources = new ArrayList<>(remainingResources);
// Add in those we skipped
stateIndexResources.addAll(deleteNotAllowed);
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
Map.ofEntries(
Map.entry(STATE_FIELD, State.COMPLETED),
Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.DONE),
Map.entry(PROVISION_END_TIME_FIELD, Instant.now().toEpochMilli()),
Map.entry(RESOURCES_CREATED_FIELD, remainingResources)
Map.entry(RESOURCES_CREATED_FIELD, stateIndexResources)
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to COMPLETED", workflowId);
}, exception -> { logger.error("Failed to update workflow {} state", workflowId, exception); })
);
// give user list of remaining resources
StringBuilder message = new StringBuilder();
appendResourceInfo(message, "Failed to deprovision some resources: ", remainingResources);
appendResourceInfo(message, "These resources require the " + ALLOW_DELETE + " parameter to deprovision: ", deleteNotAllowed);
listener.onFailure(
new FlowFrameworkException(
"Failed to deprovision some resources: ["
+ remainingResources.stream()
.map(DeprovisionWorkflowTransportAction::getResourceNameAndId)
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.joining(", "))
+ "].",
RestStatus.ACCEPTED
)
new FlowFrameworkException(message.toString(), remainingResources.isEmpty() ? RestStatus.FORBIDDEN : RestStatus.ACCEPTED)
);
}
}

private static void appendResourceInfo(StringBuilder message, String prefix, List<ResourceCreated> resources) {
if (!resources.isEmpty()) {
if (message.length() > 0) {
message.append(" ");
}
message.append(prefix)
.append(
resources.stream()
.map(DeprovisionWorkflowTransportAction::getResourceNameAndId)
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.joining(", ", "[", "]"))
)
.append(".");
}
}

private static ResourceCreated getResourceFromDeprovisionNode(ProcessNode deprovisionNode, List<ResourceCreated> resourcesCreated) {
return resourcesCreated.stream()
.filter(r -> deprovisionNode.id().equals("(deprovision_" + r.workflowStepName() + ") " + r.workflowStepId()))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.util.ParseUtils;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to delete an index
*/
public class DeleteIndexStep implements WorkflowStep {

private static final Logger logger = LogManager.getLogger(DeleteIndexStep.class);
private final Client client;

/** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
public static final String NAME = "delete_index";
/** Required input keys */
public static final Set<String> REQUIRED_INPUTS = Set.of(INDEX_NAME);
/** Optional input keys */
public static final Set<String> OPTIONAL_INPUTS = Collections.emptySet();
/** Provided output keys */
public static final Set<String> PROVIDED_OUTPUTS = Set.of(INDEX_NAME);

/**
* Instantiate this class
*
* @param client Client to delete an index
*/
public DeleteIndexStep(Client client) {
this.client = client;
}

@Override
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
) {
PlainActionFuture<WorkflowData> deleteIndexFuture = PlainActionFuture.newFuture();

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
REQUIRED_INPUTS,
OPTIONAL_INPUTS,
currentNodeInputs,
outputs,
previousNodeInputs,
params
);

String indexName = (String) inputs.get(INDEX_NAME);

DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);

client.admin().indices().delete(deleteIndexRequest, ActionListener.wrap(acknowledgedResponse -> {
logger.info("Deleted index: {}", indexName);
deleteIndexFuture.onResponse(
new WorkflowData(
Map.ofEntries(Map.entry(INDEX_NAME, indexName)),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
}, ex -> {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to delete the index " + indexName : e.getMessage());
logger.error(errorMessage, e);
deleteIndexFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}));
} catch (Exception e) {
deleteIndexFuture.onFailure(e);
}
return deleteIndexFuture;
}

@Override
public String getName() {
return NAME;
}

@Override
public boolean allowDeleteRequired() {
return true;
}
}
Loading

0 comments on commit 7e7e993

Please sign in to comment.