Skip to content

Commit

Permalink
[Backport 2.x] Delete workflow state when template is deleted and no …
Browse files Browse the repository at this point in the history
…resources exist (#694)

Delete workflow state when template is deleted and no resources exist (#689)

* Add test for updating state index



* Add method to delete state document to FlowFrameworkIndidesHandler



* Delete state document on deprovision if no template exists



* Add check whether state can be deleted to FlowFrameworkIndicesHandler



* Conditionally delete state when deleting template



* Add IT to test workflow state deletion on deprovision/no-resources



* Fix typo



* Properly test ResponseException on Workflow State 404



* Fetch workflowId of newly created template



* Stabilize IT



---------


(cherry picked from commit ff072de)

Signed-off-by: Daniel Widdis <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 67dc711 commit 99c5ca1
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
- Silently ignore content on APIs that don't require it ([#639](https://github.com/opensearch-project/flow-framework/pull/639))
- Hide user and credential field from search response ([#680](https://github.com/opensearch-project/flow-framework/pull/680))
- Throw the correct error message in status API for WorkflowSteps ([#676](https://github.com/opensearch-project/flow-framework/pull/676))
- Delete workflow state when template is deleted and no resources exist ([#689](https://github.com/opensearch-project/flow-framework/pull/689))

### Infrastructure
- Switch macos runner to macos-13 from macos-latest since macos-latest is now arm64 ([#686](https://github.com/opensearch-project/flow-framework/pull/686))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
Expand Down Expand Up @@ -519,6 +521,51 @@ public <T> void getProvisioningProgress(
}
}

/**
* Check workflow provisioning state and resources to see if state can be deleted with template
*
* @param documentId document id
* @param canDeleteStateConsumer consumer function which will be true if NOT_STARTED or COMPLETED and no resources
* @param listener action listener from caller to fail on error
* @param <T> action listener response type
*/
public <T> void canDeleteWorkflowStateDoc(String documentId, Consumer<Boolean> canDeleteStateConsumer, ActionListener<T> listener) {
GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, documentId);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.get(getRequest, ActionListener.wrap(response -> {
context.restore();
if (!response.isExists()) {
// no need to delete if it's not there to start with
canDeleteStateConsumer.accept(Boolean.FALSE);
return;
}
try (
XContentParser parser = ParseUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
WorkflowState workflowState = WorkflowState.parse(parser);
canDeleteStateConsumer.accept(
workflowState.resourcesCreated().isEmpty()
&& !ProvisioningProgress.IN_PROGRESS.equals(
ProvisioningProgress.valueOf(workflowState.getProvisioningProgress())
)
);
} catch (Exception e) {
String errorMessage = "Failed to parse workflow state " + documentId;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR));
}
}, exception -> {
logger.error("Failed to get workflow state for {} ", documentId);
canDeleteStateConsumer.accept(Boolean.FALSE);
}));
} catch (Exception e) {
String errorMessage = "Failed to retrieve workflow state to check provisioning status";
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
}

/**
* Updates a document in the workflow state index
* @param documentId the document ID
Expand All @@ -531,7 +578,7 @@ public void updateFlowFrameworkSystemIndexDoc(
ActionListener<UpdateResponse> listener
) {
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String errorMessage = "Failed to update document for given workflow due to missing " + WORKFLOW_STATE_INDEX + " index";
String errorMessage = "Failed to update document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
} else {
Expand All @@ -551,6 +598,29 @@ public void updateFlowFrameworkSystemIndexDoc(
}
}

/**
* Deletes a document in the workflow state index
* @param documentId the document ID
* @param listener action listener
*/
public void deleteFlowFrameworkSystemIndexDoc(String documentId, ActionListener<DeleteResponse> listener) {
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String errorMessage = "Failed to delete document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
DeleteRequest deleteRequest = new DeleteRequest(WORKFLOW_STATE_INDEX, documentId);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));
} catch (Exception e) {
String errorMessage = "Failed to delete " + WORKFLOW_STATE_INDEX + " entry : " + documentId;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
}
}

/**
* Updates a document in the workflow state index
* @param indexName the index that we will be updating a document of.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Dele
ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext();
logger.info("Deleting workflow doc: {}", workflowId);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));

ActionListener<DeleteResponse> stateListener = ActionListener.wrap(response -> {
logger.info("Deleted workflow state doc: {}", workflowId);
}, exception -> { logger.info("Failed to delete workflow state doc: {}", workflowId, exception); });
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, canDelete -> {
if (Boolean.TRUE.equals(canDelete)) {
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, stateListener);
}
}, stateListener);
} else {
String errorMessage = "There are no templates in the global context";
logger.error(errorMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,23 @@ private void updateWorkflowState(
) {
if (remainingResources.isEmpty()) {
// Successful deprovision, reset state to initial
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(
workflowId,
getUserContext(client),
ActionListener.wrap(indexResponse -> {
logger.info("Reset workflow {} state to NOT_STARTED", workflowId);
}, exception -> { logger.error("Failed to reset to initial workflow state for {}", workflowId, exception); })
);
// return workflow ID
listener.onResponse(new WorkflowResponse(workflowId));
flowFrameworkIndicesHandler.doesTemplateExist(workflowId, templateExists -> {
if (Boolean.TRUE.equals(templateExists)) {
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(
workflowId,
getUserContext(client),
ActionListener.wrap(indexResponse -> {
logger.info("Reset workflow {} state to NOT_STARTED", workflowId);
}, exception -> { logger.error("Failed to reset to initial workflow state for {}", workflowId, exception); })
);
} else {
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, ActionListener.wrap(deleteResponse -> {
logger.info("Deleted workflow {} state", workflowId);
}, exception -> { logger.error("Failed to delete workflow state for {}", workflowId, exception); }));
}
// return workflow ID
listener.onResponse(new WorkflowResponse(workflowId));
}, listener);
} else {
// Failed deprovision
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@
package org.opensearch.flowframework.indices;

import org.opensearch.Version;
import org.opensearch.action.DocWriteResponse.Result;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.IndicesAdminClient;
Expand All @@ -29,9 +34,11 @@
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.TestHelpers;
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.ResourceCreated;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowState;
Expand Down Expand Up @@ -285,6 +292,93 @@ public void testIsWorkflowProvisionedFailedParsing() {
assertTrue(exceptionCaptor.getValue().getMessage().contains("Failed to parse workflow state"));
}

public void testCanDeleteWorkflowStateDoc() {
String documentId = randomAlphaOfLength(5);
@SuppressWarnings("unchecked")
ActionListener<GetResponse> listener = mock(ActionListener.class);
WorkflowState workFlowState = new WorkflowState(
documentId,
"test",
"PROVISIONING",
"NOT_STARTED",
Instant.now(),
Instant.now(),
TestHelpers.randomUser(),
Collections.emptyMap(),
Collections.emptyList()
);
doAnswer(invocation -> {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);

XContentBuilder builder = XContentFactory.jsonBuilder();
workFlowState.toXContent(builder, null);
BytesReference workflowBytesRef = BytesReference.bytes(builder);
GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, documentId, 1, 1, 1, true, workflowBytesRef, null, null);
responseListener.onResponse(new GetResponse(getResult));
return null;
}).when(client).get(any(GetRequest.class), any());

flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertTrue(canDelete); }, listener);
}

public void testCanNotDeleteWorkflowStateDocInProgress() {
String documentId = randomAlphaOfLength(5);
@SuppressWarnings("unchecked")
ActionListener<GetResponse> listener = mock(ActionListener.class);
WorkflowState workFlowState = new WorkflowState(
documentId,
"test",
"PROVISIONING",
"IN_PROGRESS",
Instant.now(),
Instant.now(),
TestHelpers.randomUser(),
Collections.emptyMap(),
Collections.emptyList()
);
doAnswer(invocation -> {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);

XContentBuilder builder = XContentFactory.jsonBuilder();
workFlowState.toXContent(builder, null);
BytesReference workflowBytesRef = BytesReference.bytes(builder);
GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, documentId, 1, 1, 1, true, workflowBytesRef, null, null);
responseListener.onResponse(new GetResponse(getResult));
return null;
}).when(client).get(any(GetRequest.class), any());

flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertFalse(canDelete); }, listener);
}

public void testCanNotDeleteWorkflowStateDocResourcesExist() {
String documentId = randomAlphaOfLength(5);
@SuppressWarnings("unchecked")
ActionListener<GetResponse> listener = mock(ActionListener.class);
WorkflowState workFlowState = new WorkflowState(
documentId,
"test",
"PROVISIONING",
"DONE",
Instant.now(),
Instant.now(),
TestHelpers.randomUser(),
Collections.emptyMap(),
List.of(new ResourceCreated("w", "x", "y", "z"))
);
doAnswer(invocation -> {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);

XContentBuilder builder = XContentFactory.jsonBuilder();
workFlowState.toXContent(builder, null);
BytesReference workflowBytesRef = BytesReference.bytes(builder);
GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, documentId, 1, 1, 1, true, workflowBytesRef, null, null);
responseListener.onResponse(new GetResponse(getResult));
return null;
}).when(client).get(any(GetRequest.class), any());

flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertFalse(canDelete); }, listener);
}

public void testDoesTemplateExist() {
String documentId = randomAlphaOfLength(5);
Consumer<Boolean> function = mock(Consumer.class);
Expand All @@ -302,4 +396,98 @@ public void testDoesTemplateExist() {
flowFrameworkIndicesHandler.doesTemplateExist(documentId, function, listener);
verify(function).accept(true);
}

public void testUpdateFlowFrameworkSystemIndexDoc() throws IOException {
ClusterState mockClusterState = mock(ClusterState.class);
Metadata mockMetaData = mock(Metadata.class);
when(clusterService.state()).thenReturn(mockClusterState);
when(mockClusterState.metadata()).thenReturn(mockMetaData);
when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(true);

@SuppressWarnings("unchecked")
ActionListener<UpdateResponse> listener = mock(ActionListener.class);

// test success
doAnswer(invocation -> {
ActionListener<UpdateResponse> responseListener = invocation.getArgument(1);
responseListener.onResponse(new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "id", -2, 0, 0, Result.UPDATED));
return null;
}).when(client).update(any(UpdateRequest.class), any());

flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc("1", Map.of("foo", "bar"), listener);

ArgumentCaptor<UpdateResponse> responseCaptor = ArgumentCaptor.forClass(UpdateResponse.class);
verify(listener, times(1)).onResponse(responseCaptor.capture());
assertEquals(Result.UPDATED, responseCaptor.getValue().getResult());

// test failure
doAnswer(invocation -> {
ActionListener<UpdateResponse> responseListener = invocation.getArgument(1);
responseListener.onFailure(new Exception("Failed to update state"));
return null;
}).when(client).update(any(UpdateRequest.class), any());

flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc("1", Map.of("foo", "bar"), listener);

ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertEquals("Failed to update state", exceptionCaptor.getValue().getMessage());

// test no index
when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(false);
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc("1", Map.of("foo", "bar"), listener);

verify(listener, times(2)).onFailure(exceptionCaptor.capture());
assertEquals(
"Failed to update document 1 due to missing .plugins-flow-framework-state index",
exceptionCaptor.getValue().getMessage()
);
}

public void testDeleteFlowFrameworkSystemIndexDoc() throws IOException {
ClusterState mockClusterState = mock(ClusterState.class);
Metadata mockMetaData = mock(Metadata.class);
when(clusterService.state()).thenReturn(mockClusterState);
when(mockClusterState.metadata()).thenReturn(mockMetaData);
when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(true);

@SuppressWarnings("unchecked")
ActionListener<DeleteResponse> listener = mock(ActionListener.class);

// test success
doAnswer(invocation -> {
ActionListener<DeleteResponse> responseListener = invocation.getArgument(1);
responseListener.onResponse(new DeleteResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "id", -2, 0, 0, true));
return null;
}).when(client).delete(any(DeleteRequest.class), any());

flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", listener);

ArgumentCaptor<DeleteResponse> responseCaptor = ArgumentCaptor.forClass(DeleteResponse.class);
verify(listener, times(1)).onResponse(responseCaptor.capture());
assertEquals(Result.DELETED, responseCaptor.getValue().getResult());

// test failure
doAnswer(invocation -> {
ActionListener<DeleteResponse> responseListener = invocation.getArgument(1);
responseListener.onFailure(new Exception("Failed to delete state"));
return null;
}).when(client).delete(any(DeleteRequest.class), any());

flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", listener);

ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertEquals("Failed to delete state", exceptionCaptor.getValue().getMessage());

// test no index
when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(false);
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", listener);

verify(listener, times(2)).onFailure(exceptionCaptor.capture());
assertEquals(
"Failed to delete document 1 due to missing .plugins-flow-framework-state index",
exceptionCaptor.getValue().getMessage()
);
}
}
Loading

0 comments on commit 99c5ca1

Please sign in to comment.