diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index 1e5a9bb8..f53f036f 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -17,7 +17,6 @@ import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest; import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; -import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; @@ -50,6 +49,7 @@ import org.opensearch.flowframework.workflow.WorkflowData; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.engine.VersionConflictEngineException; +import org.opensearch.remote.metadata.client.DeleteDataObjectRequest; import org.opensearch.remote.metadata.client.GetDataObjectRequest; import org.opensearch.remote.metadata.client.PutDataObjectRequest; import org.opensearch.remote.metadata.client.SdkClient; @@ -68,6 +68,7 @@ import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.flowframework.common.CommonValue.CONFIG_INDEX_MAPPING; +import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX_MAPPING; import static org.opensearch.flowframework.common.CommonValue.META; @@ -777,9 +778,10 @@ public void updateFlowFrameworkSystemIndexDoc( /** * Deletes a document in the workflow state index * @param documentId the document ID + * @param tenantId the tenant Id * @param listener action listener */ - public void deleteFlowFrameworkSystemIndexDoc(String documentId, ActionListener listener) { + public void deleteFlowFrameworkSystemIndexDoc(String documentId, String tenantId, ActionListener listener) { if (!doesIndexExist(WORKFLOW_STATE_INDEX)) { String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage( "Failed to delete document {} due to missing {} index", @@ -790,17 +792,36 @@ public void deleteFlowFrameworkSystemIndexDoc(String documentId, ActionListener< listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST)); } else { try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { - DeleteRequest deleteRequest = new DeleteRequest(WORKFLOW_STATE_INDEX, documentId); - deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore)); - } catch (Exception e) { - String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage( - "Failed to delete {} entry : {}", - WORKFLOW_STATE_INDEX, - documentId - ).getFormattedMessage(); - logger.error(errorMessage, e); - listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + DeleteDataObjectRequest deleteRequest = DeleteDataObjectRequest.builder() + .index(WORKFLOW_STATE_INDEX) + .id(documentId) + .tenantId(tenantId) + .build(); + sdkClient.deleteDataObjectAsync(deleteRequest, client.threadPool().executor(DEPROVISION_WORKFLOW_THREAD_POOL)) + .whenComplete((r, throwable) -> { + context.restore(); + if (throwable == null) { + try { + DeleteResponse response = DeleteResponse.fromXContent(r.parser()); + logger.info("Deleted workflow state doc: {}", documentId); + listener.onResponse(response); + } catch (Exception e) { + logger.error("Failed to parse delete response", e); + listener.onFailure( + new FlowFrameworkException("Failed to parse delete response", RestStatus.INTERNAL_SERVER_ERROR) + ); + } + } else { + Exception exception = SdkClientUtils.unwrapAndConvertToException(throwable); + String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage( + "Failed to delete {} entry : {}", + WORKFLOW_STATE_INDEX, + documentId + ).getFormattedMessage(); + logger.error(errorMessage, exception); + listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))); + } + }); } } } diff --git a/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java index 8f3da2c6..d005b5c9 100644 --- a/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java @@ -168,7 +168,7 @@ private void executeDeleteRequest( }, exception -> { logger.info("Failed to delete workflow state doc: {}", workflowId, exception); }); flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, tenantId, clearStatus, canDelete -> { if (Boolean.TRUE.equals(canDelete)) { - flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, stateListener); + flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, tenantId, stateListener); } }, stateListener); } diff --git a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java index 79de3dae..04f04bcd 100644 --- a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java @@ -331,9 +331,13 @@ private void updateWorkflowState( }, exception -> { logger.error("Failed to reset to initial workflow state for {}", workflowId, exception); }) ); } else { - flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, ActionListener.wrap(deleteResponse -> { - logger.info("Deleted workflow {} state", workflowId); - }, exception -> { logger.error("Failed to delete workflow state for {}", workflowId, exception); })); + flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc( + workflowId, + tenantId, + ActionListener.wrap(deleteResponse -> { + logger.info("Deleted workflow {} state", workflowId); + }, exception -> { logger.error("Failed to delete workflow state for {}", workflowId, exception); }) + ); } // return workflow ID listener.onResponse(new WorkflowResponse(workflowId)); diff --git a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java index 64b51f9e..a38f1dc2 100644 --- a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java +++ b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java @@ -34,7 +34,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.OpenSearchExecutors; -import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; @@ -79,6 +78,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX; import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; @@ -102,6 +102,13 @@ public class FlowFrameworkIndicesHandlerTests extends OpenSearchTestCase { Math.max(2, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY) - 1), TimeValue.timeValueMinutes(1), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL + ), + new ScalingExecutorBuilder( + DEPROVISION_WORKFLOW_THREAD_POOL, + 1, + Math.max(2, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY) - 1), + TimeValue.timeValueMinutes(1), + FLOW_FRAMEWORK_THREAD_POOL_PREFIX + DEPROVISION_WORKFLOW_THREAD_POOL ) ); @@ -115,7 +122,6 @@ public class FlowFrameworkIndicesHandlerTests extends OpenSearchTestCase { private FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; private AdminClient adminClient; private IndicesAdminClient indicesAdminClient; - private ThreadContext threadContext; @Mock protected ClusterService clusterService; @Mock @@ -135,8 +141,6 @@ public void setUp() throws Exception { super.setUp(); MockitoAnnotations.openMocks(this); - Settings settings = Settings.builder().build(); - threadContext = new ThreadContext(settings); when(client.threadPool()).thenReturn(testThreadPool); sdkClient = SdkClientFactory.createSdkClient(client, namedXContentRegistry, Collections.emptyMap()); flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler( @@ -228,7 +232,7 @@ public void testFailedUpdateTemplateInGlobalContextNotExisting() throws IOExcept CountDownLatch latch = new CountDownLatch(1); LatchedActionListener latchedActionListener = new LatchedActionListener<>(listener, latch); - flowFrameworkIndicesHandler.updateTemplateInGlobalContext("1", template, listener); + flowFrameworkIndicesHandler.updateTemplateInGlobalContext("1", template, latchedActionListener); latch.await(1, TimeUnit.SECONDS); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); @@ -564,7 +568,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws ); } - public void testDeleteFlowFrameworkSystemIndexDoc() throws IOException { + public void testDeleteFlowFrameworkSystemIndexDoc() throws IOException, InterruptedException { ClusterState mockClusterState = mock(ClusterState.class); Metadata mockMetaData = mock(Metadata.class); when(clusterService.state()).thenReturn(mockClusterState); @@ -574,35 +578,35 @@ public void testDeleteFlowFrameworkSystemIndexDoc() throws IOException { @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); - // test success - doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onResponse(new DeleteResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "id", -2, 0, 0, true)); - return null; - }).when(client).delete(any(DeleteRequest.class), any()); + PlainActionFuture future = PlainActionFuture.newFuture(); + future.onResponse(new DeleteResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "id", -2, 0, 0, true)); + when(client.delete(any(DeleteRequest.class))).thenReturn(future); - flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", listener); + CountDownLatch latch = new CountDownLatch(1); + LatchedActionListener latchedActionListener = new LatchedActionListener<>(listener, latch); + flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", null, latchedActionListener); + latch.await(1, TimeUnit.SECONDS); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(DeleteResponse.class); verify(listener, times(1)).onResponse(responseCaptor.capture()); assertEquals(Result.DELETED, responseCaptor.getValue().getResult()); - // test failure - doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onFailure(new Exception("Failed to delete state")); - return null; - }).when(client).delete(any(DeleteRequest.class), any()); + future = PlainActionFuture.newFuture(); + future.onFailure(new Exception("Failed to delete state")); + when(client.delete(any(DeleteRequest.class))).thenReturn(future); - flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", listener); + latch = new CountDownLatch(1); + latchedActionListener = new LatchedActionListener<>(listener, latch); + flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", null, latchedActionListener); + latch.await(1, TimeUnit.SECONDS); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(listener, times(1)).onFailure(exceptionCaptor.capture()); - assertEquals("Failed to delete state", exceptionCaptor.getValue().getMessage()); + assertEquals("Failed to delete .plugins-flow-framework-state entry : 1", exceptionCaptor.getValue().getMessage()); // test no index when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(false); - flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", listener); + flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", null, listener); verify(listener, times(2)).onFailure(exceptionCaptor.capture()); assertEquals(