From 4ea348afe294a70ae91c71d6eb641c8e94fd28ee Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Thu, 30 May 2024 18:00:51 -0700 Subject: [PATCH] Add param to delete workflow API to clear status even if resources exist (#719) Signed-off-by: Daniel Widdis --- CHANGELOG.md | 1 + .../flowframework/common/CommonValue.java | 2 ++ .../indices/FlowFrameworkIndicesHandler.java | 12 ++++++-- .../flowframework/model/WorkflowState.java | 2 +- .../rest/RestDeleteWorkflowAction.java | 4 ++- .../rest/RestGetWorkflowStateAction.java | 2 +- .../DeleteWorkflowTransportAction.java | 6 +++- .../GetWorkflowStateTransportAction.java | 4 +-- .../FlowFrameworkRestTestCase.java | 29 +++++++++++++++++-- .../FlowFrameworkIndicesHandlerTests.java | 28 +++++++++--------- .../rest/FlowFrameworkRestApiIT.java | 21 ++++++++++++++ 11 files changed, 84 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9bc658575..5021e3f14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ### Features ### Enhancements - Add Workflow Step for Reindex from source index to destination ([#718](https://github.com/opensearch-project/flow-framework/pull/718)) +- Add param to delete workflow API to clear status even if resources exist ([#719](https://github.com/opensearch-project/flow-framework/pull/719)) ### Bug Fixes - Add user mapping to Workflow State index ([#705](https://github.com/opensearch-project/flow-framework/pull/705)) diff --git a/src/main/java/org/opensearch/flowframework/common/CommonValue.java b/src/main/java/org/opensearch/flowframework/common/CommonValue.java index 2a835b852..87c2f2180 100644 --- a/src/main/java/org/opensearch/flowframework/common/CommonValue.java +++ b/src/main/java/org/opensearch/flowframework/common/CommonValue.java @@ -199,6 +199,8 @@ private CommonValue() {} public static final String USER_OUTPUTS_FIELD = "user_outputs"; /** The template field name for template resources created */ public static final String RESOURCES_CREATED_FIELD = "resources_created"; + /** The parameter to clear workflow state when deleting template */ + public static final String CLEAR_STATUS = "clear_status"; /** The field name for the step name where a resource is created */ public static final String WORKFLOW_STEP_NAME = "workflow_step_name"; /** The field name for the step ID where a resource is created */ diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index 34cdf2b56..8fcfd1207 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -525,11 +525,17 @@ public void getProvisioningProgress( * Check workflow provisioning state and resources to see if state can be deleted with template * * @param documentId document id - * @param canDeleteStateConsumer consumer function which will be true if NOT_STARTED or COMPLETED and no resources + * @param clearStatus if set true, always deletes the state document unless status is IN_PROGRESS + * @param canDeleteStateConsumer consumer function which will be true if workflow state is not IN_PROGRESS and either no resources or true clearStatus * @param listener action listener from caller to fail on error * @param action listener response type */ - public void canDeleteWorkflowStateDoc(String documentId, Consumer canDeleteStateConsumer, ActionListener listener) { + public void canDeleteWorkflowStateDoc( + String documentId, + boolean clearStatus, + Consumer canDeleteStateConsumer, + ActionListener listener + ) { GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, documentId); try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { client.get(getRequest, ActionListener.wrap(response -> { @@ -545,7 +551,7 @@ public void canDeleteWorkflowStateDoc(String documentId, Consumer c ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); WorkflowState workflowState = WorkflowState.parse(parser); canDeleteStateConsumer.accept( - workflowState.resourcesCreated().isEmpty() + (clearStatus || workflowState.resourcesCreated().isEmpty()) && !ProvisioningProgress.IN_PROGRESS.equals( ProvisioningProgress.valueOf(workflowState.getProvisioningProgress()) ) diff --git a/src/main/java/org/opensearch/flowframework/model/WorkflowState.java b/src/main/java/org/opensearch/flowframework/model/WorkflowState.java index 29d808fb7..6b1593b6e 100644 --- a/src/main/java/org/opensearch/flowframework/model/WorkflowState.java +++ b/src/main/java/org/opensearch/flowframework/model/WorkflowState.java @@ -70,7 +70,7 @@ public class WorkflowState implements ToXContentObject, Writeable { * @param provisionEndTime Indicates the end time of the whole provisioning flow * @param user The user extracted from the thread context from the request * @param userOutputs A map of essential API responses for backend to use and lookup. - * @param resourcesCreated A map of all the resources created. + * @param resourcesCreated A list of all the resources created. */ public WorkflowState( String workflowId, diff --git a/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java index a7c8711dd..ebef48c86 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Locale; +import static org.opensearch.flowframework.common.CommonValue.CLEAR_STATUS; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI; import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; @@ -62,6 +63,7 @@ public List routes() { @Override protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { String workflowId = request.param(WORKFLOW_ID); + request.param(CLEAR_STATUS); // consume and ignore, we will pass params to workflow try { if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) { throw new FlowFrameworkException( @@ -78,7 +80,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request if (workflowId == null) { throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST); } - WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null); + WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, request.params()); return channel -> client.execute(DeleteWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> { XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); diff --git a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java index b95c75a56..bdf4df35f 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java @@ -83,7 +83,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request try { FlowFrameworkException ex = exception instanceof FlowFrameworkException ? (FlowFrameworkException) exception - : new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception)); + : new FlowFrameworkException("Failed to get workflow status.", ExceptionsHelper.status(exception)); XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); } catch (IOException e) { diff --git a/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java index 0e4699f20..151c3da7c 100644 --- a/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.util.Booleans; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.support.ActionFilters; @@ -24,6 +25,7 @@ import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; +import static org.opensearch.flowframework.common.CommonValue.CLEAR_STATUS; import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; /** @@ -65,10 +67,12 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener stateListener = ActionListener.wrap(response -> { logger.info("Deleted workflow state doc: {}", workflowId); }, exception -> { logger.info("Failed to delete workflow state doc: {}", workflowId, exception); }); - flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, canDelete -> { + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, clearStatus, canDelete -> { if (Boolean.TRUE.equals(canDelete)) { flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, stateListener); } diff --git a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportAction.java index f2303a606..9625ce731 100644 --- a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportAction.java @@ -83,11 +83,11 @@ protected void doExecute(Task task, GetWorkflowStateRequest request, ActionListe listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST)); } } else { - listener.onFailure(new FlowFrameworkException("Fail to find workflow " + workflowId, RestStatus.NOT_FOUND)); + listener.onFailure(new FlowFrameworkException("Fail to find workflow status of " + workflowId, RestStatus.NOT_FOUND)); } }, e -> { if (e instanceof IndexNotFoundException) { - listener.onFailure(new FlowFrameworkException("Fail to find workflow " + workflowId, RestStatus.NOT_FOUND)); + listener.onFailure(new FlowFrameworkException("Fail to find workflow status of " + workflowId, RestStatus.NOT_FOUND)); } else { String errorMessage = "Failed to get workflow status of: " + workflowId; logger.error(errorMessage, e); diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java index 2eaff69b4..9a1d89c2e 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java @@ -28,6 +28,7 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Request; import org.opensearch.client.Response; +import org.opensearch.client.ResponseException; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.common.settings.Settings; @@ -454,10 +455,22 @@ protected Response deprovisionWorkflow(RestClient client, String workflowId) thr * @throws Exception if the request fails */ protected Response deleteWorkflow(RestClient client, String workflowId) throws Exception { + return deleteWorkflow(client, workflowId, ""); + } + + /** + * Helper method to invoke the Delete Workflow Rest Action + * @param client the rest client + * @param workflowId the workflow ID to delete + * @param params a string adding any rest path params + * @return a rest response + * @throws Exception if the request fails + */ + protected Response deleteWorkflow(RestClient client, String workflowId, String params) throws Exception { return TestHelpers.makeRequest( client, "DELETE", - String.format(Locale.ROOT, "%s/%s", WORKFLOW_URI, workflowId), + String.format(Locale.ROOT, "%s/%s%s", WORKFLOW_URI, workflowId, params), Collections.emptyMap(), "", null @@ -481,7 +494,6 @@ protected Response getWorkflowStatus(RestClient client, String workflowId, boole "", null ); - } /** @@ -586,7 +598,7 @@ protected SearchResponse searchWorkflowState(RestClient client, String query) th } /** - * Helper method to invoke the Get Workflow Rest Action and assert the provisioning and state status + * Helper method to invoke the Get Workflow Status Rest Action and assert the provisioning and state status * @param client the rest client * @param workflowId the workflow ID to get the status * @param stateStatus the state status name @@ -607,6 +619,17 @@ protected void getAndAssertWorkflowStatus( assertEquals(provisioningStatus.name(), (String) responseMap.get(CommonValue.PROVISIONING_PROGRESS_FIELD)); } + /** + * Helper method to invoke the Get Workflow Status Rest Action and assert document is not found + * @param client the rest client + * @param workflowId the workflow ID to get the status + * @throws Exception if the request fails + */ + protected void getAndAssertWorkflowStatusNotFound(RestClient client, String workflowId) throws Exception { + ResponseException ex = assertThrows(ResponseException.class, () -> getWorkflowStatus(client, workflowId, true)); + assertEquals(RestStatus.NOT_FOUND.getStatus(), ex.getResponse().getStatusLine().getStatusCode()); + } + /** * Helper method to invoke the Get Workflow status Rest Action and get the error field * @param client the rest client diff --git a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java index ad2ebca0e..f55cb3bc1 100644 --- a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java +++ b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java @@ -72,6 +72,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +@SuppressWarnings("deprecation") public class FlowFrameworkIndicesHandlerTests extends OpenSearchTestCase { @Mock private Client client; @@ -262,19 +263,10 @@ public void testInitIndexIfAbsent_IndexNotPresent() { public void testIsWorkflowProvisionedFailedParsing() { String documentId = randomAlphaOfLength(5); + @SuppressWarnings("unchecked") Consumer> function = mock(Consumer.class); + @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); - WorkflowState workFlowState = new WorkflowState( - documentId, - "test", - "PROVISIONING", - "IN_PROGRESS", - Instant.now(), - Instant.now(), - TestHelpers.randomUser(), - Collections.emptyMap(), - Collections.emptyList() - ); doAnswer(invocation -> { ActionListener responseListener = invocation.getArgument(1); @@ -318,7 +310,7 @@ public void testCanDeleteWorkflowStateDoc() { return null; }).when(client).get(any(GetRequest.class), any()); - flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertTrue(canDelete); }, listener); + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, false, canDelete -> { assertTrue(canDelete); }, listener); } public void testCanNotDeleteWorkflowStateDocInProgress() { @@ -347,10 +339,10 @@ public void testCanNotDeleteWorkflowStateDocInProgress() { return null; }).when(client).get(any(GetRequest.class), any()); - flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertFalse(canDelete); }, listener); + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, true, canDelete -> { assertFalse(canDelete); }, listener); } - public void testCanNotDeleteWorkflowStateDocResourcesExist() { + public void testDeleteWorkflowStateDocResourcesExist() { String documentId = randomAlphaOfLength(5); @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); @@ -376,12 +368,18 @@ public void testCanNotDeleteWorkflowStateDocResourcesExist() { return null; }).when(client).get(any(GetRequest.class), any()); - flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertFalse(canDelete); }, listener); + // Can't delete because resources exist + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, false, canDelete -> { assertFalse(canDelete); }, listener); + + // But can delete if clearStatus set true + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, true, canDelete -> { assertTrue(canDelete); }, listener); } public void testDoesTemplateExist() { String documentId = randomAlphaOfLength(5); + @SuppressWarnings("unchecked") Consumer function = mock(Consumer.class); + @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); doAnswer(invocation -> { ActionListener responseListener = invocation.getArgument(1); diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index c516978f0..7f51cd276 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -175,6 +175,17 @@ public void testCreateAndProvisionLocalModelWorkflow() throws Exception { assertNotNull(resourcesCreated.get(0).resourceId()); assertEquals("deploy_model", resourcesCreated.get(1).workflowStepName()); assertNotNull(resourcesCreated.get(1).resourceId()); + + // Delete the workflow without deleting the resources + Response deleteResponse = deleteWorkflow(client(), workflowId); + assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse)); + + // Verify state doc is not deleted + assertBusy( + () -> { getAndAssertWorkflowStatus(client(), workflowId, State.COMPLETED, ProvisioningProgress.DONE); }, + 30, + TimeUnit.SECONDS + ); } public void testCreateAndProvisionCyclicalTemplate() throws Exception { @@ -235,6 +246,13 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception { assertNotNull(resourcesCreated.get(1).resourceId()); assertEquals("deploy_model", resourcesCreated.get(2).workflowStepName()); assertNotNull(resourcesCreated.get(2).resourceId()); + + // Delete the workflow without deleting the resources + Response deleteResponse = deleteWorkflow(client(), workflowId, "?clear_status=true"); + assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse)); + + // Verify state doc is deleted + assertBusy(() -> { getAndAssertWorkflowStatusNotFound(client(), workflowId); }, 30, TimeUnit.SECONDS); } public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception { @@ -305,6 +323,9 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception { // Hit Delete API Response deleteResponse = deleteWorkflow(client(), workflowId); assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse)); + + // Verify state doc is deleted + assertBusy(() -> { getAndAssertWorkflowStatusNotFound(client(), workflowId); }, 30, TimeUnit.SECONDS); } public void testTimestamps() throws Exception {