From 5c4a3a09075f8e51f341fdf1b06c4bc887803a9d Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Thu, 3 Oct 2024 10:23:36 -0700 Subject: [PATCH 1/3] Add method to delete a resource from the resources_created field Signed-off-by: Daniel Widdis --- .../indices/FlowFrameworkIndicesHandler.java | 78 +++++-- .../FlowFrameworkIndicesHandlerTests.java | 198 +++++++++++++++++- 2 files changed, 254 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index cb2dee56..345c1589 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; +import org.opensearch.action.DocWriteRequest.OpType; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest; @@ -693,6 +694,7 @@ public void addResourceToStateIndex( getAndUpdateResourceInStateDocumentWithRetries( workflowId, newResource, + OpType.INDEX, RETRIES, ActionListener.runBefore(listener, context::restore) ); @@ -701,15 +703,41 @@ public void addResourceToStateIndex( } /** - * Performs a get and update of a State Index document adding a new resource with strong consistency and retries + * Removes a resource from the state index, including common exception handling + * @param workflowId The workflow document id in the state index + * @param resourceToDelete The resource to delete + * @param listener the ActionListener for this step to handle completing the future after update + */ + public void deleteResourceFromStateIndex(String workflowId, ResourceCreated resourceToDelete, ActionListener listener) { + if (!doesIndexExist(WORKFLOW_STATE_INDEX)) { + String errorMessage = "Failed to update state for " + workflowId + " due to missing " + WORKFLOW_STATE_INDEX + " index"; + logger.error(errorMessage); + listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND)); + } else { + try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { + getAndUpdateResourceInStateDocumentWithRetries( + workflowId, + resourceToDelete, + OpType.DELETE, + RETRIES, + ActionListener.runBefore(listener, context::restore) + ); + } + } + } + + /** + * Performs a get and update of a State Index document adding or removing a resource with strong consistency and retries * @param workflowId The document id to update - * @param newResource The resource to add to the resources created list + * @param resource The resource to add or remove from the resources created list + * @param operation The operation to perform on the resource (INDEX to append to the list or DELETE to remove) * @param retries The number of retries on update version conflicts * @param listener The listener to complete on success or failure */ private void getAndUpdateResourceInStateDocumentWithRetries( String workflowId, - ResourceCreated newResource, + ResourceCreated resource, + OpType operation, int retries, ActionListener listener ) { @@ -721,7 +749,11 @@ private void getAndUpdateResourceInStateDocumentWithRetries( } WorkflowState currentState = WorkflowState.parse(getResponse.getSourceAsString()); List resourcesCreated = new ArrayList<>(currentState.resourcesCreated()); - resourcesCreated.add(newResource); + if (operation == OpType.DELETE) { + resourcesCreated.removeIf(r -> r.resourceMap().equals(resource.resourceMap())); + } else { + resourcesCreated.add(resource); + } XContentBuilder builder = XContentFactory.jsonBuilder(); WorkflowState newState = WorkflowState.builder(currentState).resourcesCreated(resourcesCreated).build(); newState.toXContent(builder, null); @@ -732,41 +764,51 @@ private void getAndUpdateResourceInStateDocumentWithRetries( client.update( updateRequest, ActionListener.wrap( - r -> handleStateUpdateSuccess(workflowId, newResource, listener), - e -> handleStateUpdateException(workflowId, newResource, retries, listener, e) + r -> handleStateUpdateSuccess(workflowId, resource, operation, listener), + e -> handleStateUpdateException(workflowId, resource, operation, retries, listener, e) ) ); - }, ex -> handleStateUpdateException(workflowId, newResource, 0, listener, ex))); + }, ex -> handleStateUpdateException(workflowId, resource, operation, 0, listener, ex))); } - private void handleStateUpdateSuccess(String workflowId, ResourceCreated newResource, ActionListener listener) { + private void handleStateUpdateSuccess( + String workflowId, + ResourceCreated newResource, + OpType operation, + ActionListener listener + ) { String resourceName = newResource.resourceType(); String resourceId = newResource.resourceId(); String nodeId = newResource.workflowStepId(); - logger.info("Updated resources created for {} on step {} with {} {}", workflowId, nodeId, resourceName, resourceId); + logger.info( + "Updated resources created for {} on step {} to {} resource {} {}", + workflowId, + nodeId, + operation.equals(OpType.DELETE) ? "delete" : "add", + resourceName, + resourceId + ); listener.onResponse(new WorkflowData(Map.of(resourceName, resourceId), workflowId, nodeId)); } private void handleStateUpdateException( String workflowId, ResourceCreated newResource, + OpType operation, int retries, ActionListener listener, Exception e ) { if (e instanceof VersionConflictEngineException && retries > 0) { // Retry if we haven't exhausted retries - getAndUpdateResourceInStateDocumentWithRetries(workflowId, newResource, retries - 1, listener); + getAndUpdateResourceInStateDocumentWithRetries(workflowId, newResource, operation, retries - 1, listener); return; } - String errorMessage = "Failed to update workflow state for " - + workflowId - + " on step " - + newResource.workflowStepId() - + " with " - + newResource.resourceType() - + " " - + newResource.resourceId(); + StringBuilder sb = new StringBuilder("Failed to update workflow state for "); + sb.append(workflowId).append(" on step ").append(newResource.workflowStepId()); + sb.append(" to ").append(operation.equals(OpType.DELETE) ? "delete" : "add"); + sb.append(" resource ").append(newResource.resourceType()).append(" ").append(newResource.resourceId()); + String errorMessage = sb.toString(); logger.error(errorMessage, e); listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); } diff --git a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java index 3c6c4846..4613cf42 100644 --- a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java +++ b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java @@ -551,7 +551,7 @@ public void testDeleteFlowFrameworkSystemIndexDoc() throws IOException { ); } - public void testAddResourceToStateIndex() throws IOException { + public void testAddResourceToStateIndex() { ClusterState mockClusterState = mock(ClusterState.class); Metadata mockMetaData = mock(Metadata.class); when(clusterService.state()).thenReturn(mockClusterState); @@ -607,7 +607,7 @@ public void testAddResourceToStateIndex() throws IOException { ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(listener, times(1)).onFailure(exceptionCaptor.capture()); assertEquals( - "Failed to update workflow state for this_id on step node_id with connector_id this_id", + "Failed to update workflow state for this_id on step node_id to add resource connector_id this_id", exceptionCaptor.getValue().getMessage() ); @@ -651,8 +651,102 @@ public void testAddResourceToStateIndex() throws IOException { exceptionCaptor.getValue().getMessage() ); } + + public void testDeleteResourceFromStateIndex() { + ClusterState mockClusterState = mock(ClusterState.class); + Metadata mockMetaData = mock(Metadata.class); + when(clusterService.state()).thenReturn(mockClusterState); + when(mockClusterState.metadata()).thenReturn(mockMetaData); + when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(true); + ResourceCreated resourceToDelete = new ResourceCreated("", "node_id", "connector_id", "this_id"); + + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + // test success + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + XContentBuilder builder = XContentFactory.jsonBuilder(); + WorkflowState state = WorkflowState.builder().build(); + state.toXContent(builder, null); + BytesReference workflowBytesRef = BytesReference.bytes(builder); + GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", 1, 1, 1, true, workflowBytesRef, null, null); + responseListener.onResponse(new GetResponse(getResult)); + return null; + }).when(client).get(any(GetRequest.class), any()); + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onResponse(new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "this_id", -2, 0, 0, Result.UPDATED)); + return null; + }).when(client).update(any(UpdateRequest.class), any()); + + flowFrameworkIndicesHandler.deleteResourceFromStateIndex( + "this_id", + resourceToDelete, + listener + ); + + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowData.class); + verify(listener, times(1)).onResponse(responseCaptor.capture()); + assertEquals("this_id", responseCaptor.getValue().getContent().get(WorkflowResources.CONNECTOR_ID)); + + // test failure + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(new Exception("Failed to update state")); + return null; + }).when(client).update(any(UpdateRequest.class), any()); + + flowFrameworkIndicesHandler.deleteResourceFromStateIndex( + "this_id", + resourceToDelete, + listener + ); - public void testAddResourceToStateIndexWithRetries() throws IOException { + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(listener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals( + "Failed to update workflow state for this_id on step node_id to delete resource connector_id this_id", + exceptionCaptor.getValue().getMessage() + ); + + // test document not found + @SuppressWarnings("unchecked") + ActionListener notFoundListener = mock(ActionListener.class); + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", -2, 0, 1, false, null, null, null); + responseListener.onResponse(new GetResponse(getResult)); + return null; + }).when(client).get(any(GetRequest.class), any()); + flowFrameworkIndicesHandler.deleteResourceFromStateIndex( + "this_id", + resourceToDelete, + notFoundListener + ); + + exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(notFoundListener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals("Workflow state not found for this_id", exceptionCaptor.getValue().getMessage()); + + // test index not found + when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(false); + @SuppressWarnings("unchecked") + ActionListener indexNotFoundListener = mock(ActionListener.class); + flowFrameworkIndicesHandler.deleteResourceFromStateIndex( + "this_id", + resourceToDelete, + indexNotFoundListener + ); + + exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(indexNotFoundListener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals( + "Failed to update state for this_id due to missing .plugins-flow-framework-state index", + exceptionCaptor.getValue().getMessage() + ); + } + + public void testAddResourceToStateIndexWithRetries() { ClusterState mockClusterState = mock(ClusterState.class); Metadata mockMetaData = mock(Metadata.class); when(clusterService.state()).thenReturn(mockClusterState); @@ -745,7 +839,103 @@ public void testAddResourceToStateIndexWithRetries() throws IOException { ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(threeRetryListener, times(1)).onFailure(exceptionCaptor.capture()); assertEquals( - "Failed to update workflow state for this_id on step node_id with connector_id this_id", + "Failed to update workflow state for this_id on step node_id to add resource connector_id this_id", + exceptionCaptor.getValue().getMessage() + ); + } + + public void testDeleteResourceFromStateIndexWithRetries() { + ClusterState mockClusterState = mock(ClusterState.class); + Metadata mockMetaData = mock(Metadata.class); + when(clusterService.state()).thenReturn(mockClusterState); + when(mockClusterState.metadata()).thenReturn(mockMetaData); + when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(true); + VersionConflictEngineException conflictException = new VersionConflictEngineException( + new ShardId(WORKFLOW_STATE_INDEX, "", 1), + "this_id", + null + ); + UpdateResponse updateResponse = new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "this_id", -2, 0, 0, Result.UPDATED); + ResourceCreated resourceToDelete = new ResourceCreated("", "node_id", "connector_id", "this_id"); + + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + XContentBuilder builder = XContentFactory.jsonBuilder(); + WorkflowState state = WorkflowState.builder().build(); + state.toXContent(builder, null); + BytesReference workflowBytesRef = BytesReference.bytes(builder); + GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", 1, 1, 1, true, workflowBytesRef, null, null); + responseListener.onResponse(new GetResponse(getResult)); + return null; + }).when(client).get(any(GetRequest.class), any()); + + // test success on retry + @SuppressWarnings("unchecked") + ActionListener retryListener = mock(ActionListener.class); + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onResponse(updateResponse); + return null; + }).when(client).update(any(UpdateRequest.class), any()); + + flowFrameworkIndicesHandler.deleteResourceFromStateIndex( + "this_id", + resourceToDelete, + retryListener + ); + + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowData.class); + verify(retryListener, times(1)).onResponse(responseCaptor.capture()); + assertEquals("this_id", responseCaptor.getValue().getContent().get(WorkflowResources.CONNECTOR_ID)); + + // test failure on 6th after 5 retries even if 7th would have been success + @SuppressWarnings("unchecked") + ActionListener threeRetryListener = mock(ActionListener.class); + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + // we'll never get here + ActionListener responseListener = invocation.getArgument(1); + responseListener.onResponse(updateResponse); + return null; + }).when(client).update(any(UpdateRequest.class), any()); + + flowFrameworkIndicesHandler.deleteResourceFromStateIndex( + "this_id", + resourceToDelete, + threeRetryListener + ); + + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(threeRetryListener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals( + "Failed to update workflow state for this_id on step node_id to delete resource connector_id this_id", exceptionCaptor.getValue().getMessage() ); } From bc9961e4856e338a8a430cc0340345b150d13d44 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Thu, 3 Oct 2024 11:59:03 -0700 Subject: [PATCH 2/3] Update deprovisioned resources incrementally Signed-off-by: Daniel Widdis --- CHANGELOG.md | 2 + .../DeprovisionWorkflowTransportAction.java | 20 +++++++++- .../FlowFrameworkRestTestCase.java | 11 +++++ .../FlowFrameworkIndicesHandlerTests.java | 40 ++++--------------- .../rest/FlowFrameworkRestApiIT.java | 7 +++- ...provisionWorkflowTransportActionTests.java | 6 +++ 6 files changed, 51 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 167472c6..18fe6382 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.17...2.x) ### Features ### Enhancements +- Incrementally remove resources from workflow state during deprovisioning ([#898](https://github.com/opensearch-project/flow-framework/pull/898)) + ### Bug Fixes ### Infrastructure ### Documentation diff --git a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java index 1b58e66d..2b8db025 100644 --- a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java @@ -47,6 +47,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.opensearch.flowframework.common.CommonValue.ALLOW_DELETE; @@ -214,19 +215,32 @@ private void executeDeprovisionSequence( // Repeat attempting to delete resources as long as at least one is successful int resourceCount = deprovisionProcessSequence.size(); while (resourceCount > 0) { + PlainActionFuture stateUpdateFuture; Iterator iter = deprovisionProcessSequence.iterator(); - while (iter.hasNext()) { + do { ProcessNode deprovisionNode = iter.next(); ResourceCreated resource = getResourceFromDeprovisionNode(deprovisionNode, resourcesCreated); String resourceNameAndId = getResourceNameAndId(resource); PlainActionFuture deprovisionFuture = deprovisionNode.execute(); + stateUpdateFuture = PlainActionFuture.newFuture(); try { deprovisionFuture.get(); logger.info("Successful {} for {}", deprovisionNode.id(), resourceNameAndId); + // Remove from state index resource list + flowFrameworkIndicesHandler.deleteResourceFromStateIndex(workflowId, resource, stateUpdateFuture); + try { + // Wait at most 1 second for state index update. + stateUpdateFuture.actionGet(1, TimeUnit.SECONDS); + } catch (Exception e) { + // Ignore incremental resource removal failures (or timeouts) as we catch up at the end with remainingResources + } // Remove from list so we don't try again iter.remove(); // Pause briefly before next step Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; } catch (Throwable t) { // If any deprovision fails due to not found, it's a success if (t.getCause() instanceof OpenSearchStatusException @@ -238,7 +252,7 @@ private void executeDeprovisionSequence( logger.info("Failed {} for {}", deprovisionNode.id(), resourceNameAndId); } } - } + } while (iter.hasNext()); if (deprovisionProcessSequence.size() < resourceCount) { // If we've deleted something, decrement and try again if not zero resourceCount = deprovisionProcessSequence.size(); @@ -259,6 +273,7 @@ private void executeDeprovisionSequence( try { Thread.sleep(1000); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); break; } } else { @@ -274,6 +289,7 @@ private void executeDeprovisionSequence( if (!deleteNotAllowed.isEmpty()) { logger.info("Resources requiring allow_delete: {}.", deleteNotAllowed); } + // This is a redundant best-effort backup to the incremental deletion done earlier updateWorkflowState(workflowId, remainingResources, deleteNotAllowed, listener); } diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java index 49fb46a7..3570dccf 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java @@ -817,6 +817,17 @@ protected List getResourcesCreated(RestClient client, String wo TimeUnit.SECONDS ); + return getResourcesCreated(client, workflowId); + } + + /** + * Helper method retrieve any resources created incrementally without waiting for completion + * @param client the rest client + * @param workflowId the workflow id to retrieve resources from + * @return a list of created resources + * @throws Exception if the request fails + */ + protected List getResourcesCreated(RestClient client, String workflowId) throws Exception { Response response = getWorkflowStatus(client, workflowId, true); // Parse workflow state from response and retrieve resources created diff --git a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java index 4613cf42..a7dd7f75 100644 --- a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java +++ b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java @@ -651,7 +651,7 @@ public void testAddResourceToStateIndex() { exceptionCaptor.getValue().getMessage() ); } - + public void testDeleteResourceFromStateIndex() { ClusterState mockClusterState = mock(ClusterState.class); Metadata mockMetaData = mock(Metadata.class); @@ -679,11 +679,7 @@ public void testDeleteResourceFromStateIndex() { return null; }).when(client).update(any(UpdateRequest.class), any()); - flowFrameworkIndicesHandler.deleteResourceFromStateIndex( - "this_id", - resourceToDelete, - listener - ); + flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, listener); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowData.class); verify(listener, times(1)).onResponse(responseCaptor.capture()); @@ -696,11 +692,7 @@ public void testDeleteResourceFromStateIndex() { return null; }).when(client).update(any(UpdateRequest.class), any()); - flowFrameworkIndicesHandler.deleteResourceFromStateIndex( - "this_id", - resourceToDelete, - listener - ); + flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, listener); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(listener, times(1)).onFailure(exceptionCaptor.capture()); @@ -718,11 +710,7 @@ public void testDeleteResourceFromStateIndex() { responseListener.onResponse(new GetResponse(getResult)); return null; }).when(client).get(any(GetRequest.class), any()); - flowFrameworkIndicesHandler.deleteResourceFromStateIndex( - "this_id", - resourceToDelete, - notFoundListener - ); + flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, notFoundListener); exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(notFoundListener, times(1)).onFailure(exceptionCaptor.capture()); @@ -732,11 +720,7 @@ public void testDeleteResourceFromStateIndex() { when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(false); @SuppressWarnings("unchecked") ActionListener indexNotFoundListener = mock(ActionListener.class); - flowFrameworkIndicesHandler.deleteResourceFromStateIndex( - "this_id", - resourceToDelete, - indexNotFoundListener - ); + flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, indexNotFoundListener); exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(indexNotFoundListener, times(1)).onFailure(exceptionCaptor.capture()); @@ -843,7 +827,7 @@ public void testAddResourceToStateIndexWithRetries() { exceptionCaptor.getValue().getMessage() ); } - + public void testDeleteResourceFromStateIndexWithRetries() { ClusterState mockClusterState = mock(ClusterState.class); Metadata mockMetaData = mock(Metadata.class); @@ -882,11 +866,7 @@ public void testDeleteResourceFromStateIndexWithRetries() { return null; }).when(client).update(any(UpdateRequest.class), any()); - flowFrameworkIndicesHandler.deleteResourceFromStateIndex( - "this_id", - resourceToDelete, - retryListener - ); + flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, retryListener); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowData.class); verify(retryListener, times(1)).onResponse(responseCaptor.capture()); @@ -926,11 +906,7 @@ public void testDeleteResourceFromStateIndexWithRetries() { return null; }).when(client).update(any(UpdateRequest.class), any()); - flowFrameworkIndicesHandler.deleteResourceFromStateIndex( - "this_id", - resourceToDelete, - threeRetryListener - ); + flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, threeRetryListener); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(threeRetryListener, times(1)).onFailure(exceptionCaptor.capture()); diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index bb7ba109..f3e5ceea 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -299,8 +299,13 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception { assertNotNull(resourcesCreated.get(0).resourceId()); // Hit Deprovision API - // By design, this may not completely deprovision the first time if it takes >2s to process removals Response deprovisionResponse = deprovisionWorkflow(client(), workflowId); + // Test for incremental removal + assertBusy(() -> { + List resourcesRemaining = getResourcesCreated(client(), workflowId); + assertTrue(resourcesRemaining.size() < 5); + }, 30, TimeUnit.SECONDS); + // By design, this may not completely deprovision the first time if it takes >2s to process removals try { assertBusy( () -> { getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); }, diff --git a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java index 20325536..4841871a 100644 --- a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java @@ -175,6 +175,7 @@ public void testDeprovisionWorkflow() throws Exception { ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class); verify(listener, times(1)).onResponse(responseCaptor.capture()); assertEquals(workflowId, responseCaptor.getValue().getWorkflowId()); + verify(flowFrameworkIndicesHandler, times(1)).deleteResourceFromStateIndex(anyString(), any(ResourceCreated.class), any()); } public void testFailToDeprovision() throws Exception { @@ -208,6 +209,7 @@ public void testFailToDeprovision() throws Exception { verify(listener, times(1)).onFailure(exceptionCaptor.capture()); assertEquals(RestStatus.ACCEPTED, exceptionCaptor.getValue().getRestStatus()); assertEquals("Failed to deprovision some resources: [model_id modelId].", exceptionCaptor.getValue().getMessage()); + verify(flowFrameworkIndicesHandler, times(0)).deleteResourceFromStateIndex(anyString(), any(ResourceCreated.class), any()); } public void testAllowDeleteRequired() throws Exception { @@ -248,6 +250,7 @@ public void testAllowDeleteRequired() throws Exception { "These resources require the allow_delete parameter to deprovision: [index_name test-index].", exceptionCaptor.getValue().getMessage() ); + verify(flowFrameworkIndicesHandler, times(0)).deleteResourceFromStateIndex(anyString(), any(ResourceCreated.class), any()); // Test (2nd) failure with wrong allow_delete param workflowRequest = new WorkflowRequest(workflowId, null, Map.of(ALLOW_DELETE, "wrong-index")); @@ -264,6 +267,7 @@ public void testAllowDeleteRequired() throws Exception { "These resources require the allow_delete parameter to deprovision: [index_name test-index].", exceptionCaptor.getValue().getMessage() ); + verify(flowFrameworkIndicesHandler, times(0)).deleteResourceFromStateIndex(anyString(), any(ResourceCreated.class), any()); // Test success with correct allow_delete param workflowRequest = new WorkflowRequest(workflowId, null, Map.of(ALLOW_DELETE, "wrong-index,test-index,other-index")); @@ -280,6 +284,7 @@ public void testAllowDeleteRequired() throws Exception { ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class); verify(listener, times(1)).onResponse(responseCaptor.capture()); assertEquals(workflowId, responseCaptor.getValue().getWorkflowId()); + verify(flowFrameworkIndicesHandler, times(1)).deleteResourceFromStateIndex(anyString(), any(ResourceCreated.class), any()); } public void testFailToDeprovisionAndAllowDeleteRequired() throws Exception { @@ -323,5 +328,6 @@ public void testFailToDeprovisionAndAllowDeleteRequired() throws Exception { + " These resources require the allow_delete parameter to deprovision: [index_name test-index].", exceptionCaptor.getValue().getMessage() ); + verify(flowFrameworkIndicesHandler, times(0)).deleteResourceFromStateIndex(anyString(), any(ResourceCreated.class), any()); } } From 86ac8ea862ba59abcbdf036fd637159e751e3be4 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Mon, 7 Oct 2024 12:39:47 -0700 Subject: [PATCH 3/3] Use Log4j ParameterizedMessage for string substitutions Signed-off-by: Daniel Widdis --- .../indices/FlowFrameworkIndicesHandler.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index 345c1589..f05a162f 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessageFactory; import org.opensearch.ExceptionsHelper; import org.opensearch.action.DocWriteRequest.OpType; import org.opensearch.action.admin.indices.create.CreateIndexRequest; @@ -804,11 +805,14 @@ private void handleStateUpdateException( getAndUpdateResourceInStateDocumentWithRetries(workflowId, newResource, operation, retries - 1, listener); return; } - StringBuilder sb = new StringBuilder("Failed to update workflow state for "); - sb.append(workflowId).append(" on step ").append(newResource.workflowStepId()); - sb.append(" to ").append(operation.equals(OpType.DELETE) ? "delete" : "add"); - sb.append(" resource ").append(newResource.resourceType()).append(" ").append(newResource.resourceId()); - String errorMessage = sb.toString(); + String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage( + "Failed to update workflow state for {} on step {} to {} resource {} {}", + workflowId, + newResource.workflowStepId(), + operation.equals(OpType.DELETE) ? "delete" : "add", + newResource.resourceType(), + newResource.resourceId() + ).getFormattedMessage(); logger.error(errorMessage, e); listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); }