From b0b2d1d85e1b474856e708a1f6f7af7da02aa41a Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Wed, 8 Jan 2025 11:39:17 -0800 Subject: [PATCH] Update tests for new update async code Signed-off-by: Daniel Widdis --- .../indices/FlowFrameworkIndicesHandler.java | 11 +- .../FlowFrameworkIndicesHandlerTests.java | 325 ++++++++---------- 2 files changed, 146 insertions(+), 190 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index 46d8229d..5fe64d3f 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessageFactory; import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchStatusException; import org.opensearch.action.DocWriteRequest.OpType; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexResponse; @@ -47,7 +48,6 @@ import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.flowframework.workflow.WorkflowData; import org.opensearch.index.IndexNotFoundException; -import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.remote.metadata.client.DeleteDataObjectRequest; import org.opensearch.remote.metadata.client.GetDataObjectRequest; import org.opensearch.remote.metadata.client.PutDataObjectRequest; @@ -991,9 +991,8 @@ private void handleStateGetResponse( listener.onFailure(new FlowFrameworkException("Workflow state not found for " + workflowId, RestStatus.NOT_FOUND)); return; } - WorkflowState currentState; try { - currentState = WorkflowState.parse(getResponse.getSourceAsString()); + WorkflowState currentState = WorkflowState.parse(getResponse.getSourceAsString()); List resourcesCreated = new ArrayList<>(currentState.resourcesCreated()); if (operation == OpType.DELETE) { resourcesCreated.removeIf(r -> r.resourceMap().equals(resource.resourceMap())); @@ -1001,7 +1000,7 @@ private void handleStateGetResponse( resourcesCreated.add(resource); } WorkflowState newState = WorkflowState.builder(currentState).resourcesCreated(resourcesCreated).build(); - UpdateDataObjectRequest updateRequest2 = UpdateDataObjectRequest.builder() + UpdateDataObjectRequest updateRequest = UpdateDataObjectRequest.builder() .index(WORKFLOW_STATE_INDEX) .id(workflowId) .tenantId(tenantId) @@ -1010,7 +1009,7 @@ private void handleStateGetResponse( .ifPrimaryTerm(getResponse.getPrimaryTerm()) .build(); sdkClient.updateDataObjectAsync( - updateRequest2, + updateRequest, client.threadPool().executor(operation == OpType.DELETE ? DEPROVISION_WORKFLOW_THREAD_POOL : PROVISION_WORKFLOW_THREAD_POOL) ).whenComplete((r, throwable) -> { if (throwable == null) { @@ -1059,7 +1058,7 @@ private void handleStateUpdateException( ActionListener listener, Exception e ) { - if (e instanceof VersionConflictEngineException && retries > 0) { + if (e instanceof OpenSearchStatusException && ((OpenSearchStatusException) e).status() == RestStatus.CONFLICT && retries > 0) { // Retry if we haven't exhausted retries getAndUpdateResourceInStateDocumentWithRetries(workflowId, tenantId, newResource, operation, retries - 1, listener); return; diff --git a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java index a38f1dc2..acc0b0fb 100644 --- a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java +++ b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java @@ -81,6 +81,7 @@ import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX; import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; +import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; import static org.mockito.ArgumentMatchers.any; @@ -103,6 +104,13 @@ public class FlowFrameworkIndicesHandlerTests extends OpenSearchTestCase { TimeValue.timeValueMinutes(1), FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL ), + new ScalingExecutorBuilder( + PROVISION_WORKFLOW_THREAD_POOL, + 1, + Math.max(4, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY) - 1), + TimeValue.timeValueMinutes(5), + FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_WORKFLOW_THREAD_POOL + ), new ScalingExecutorBuilder( DEPROVISION_WORKFLOW_THREAD_POOL, 1, @@ -315,22 +323,8 @@ public void testIsWorkflowProvisionedFailedParsing() throws IOException, Interru Consumer> function = mock(Consumer.class); @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); - /* - doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - - XContentBuilder builder = XContentFactory.jsonBuilder(); - // workFlowState.toXContent(builder, null); - this.template.toXContent(builder, null); - BytesReference workflowBytesRef = BytesReference.bytes(builder); - GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, documentId, 1, 1, 1, true, workflowBytesRef, null, null); - responseListener.onResponse(new GetResponse(getResult)); - return null; - }).when(client).get(any(GetRequest.class), any()); - */ XContentBuilder builder = XContentFactory.jsonBuilder(); - // workFlowState.toXContent(builder, null); this.template.toXContent(builder, null); BytesReference workflowBytesRef = BytesReference.bytes(builder); GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, documentId, 1, 1, 1, true, workflowBytesRef, null, null); @@ -615,7 +609,7 @@ public void testDeleteFlowFrameworkSystemIndexDoc() throws IOException, Interrup ); } - public void testAddResourceToStateIndex() { + public void testAddResourceToStateIndex() throws InterruptedException, IOException { ClusterState mockClusterState = mock(ClusterState.class); Metadata mockMetaData = mock(Metadata.class); when(clusterService.state()).thenReturn(mockClusterState); @@ -625,48 +619,49 @@ public void testAddResourceToStateIndex() { @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); // test success - doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - XContentBuilder builder = XContentFactory.jsonBuilder(); - WorkflowState state = WorkflowState.builder().build(); - state.toXContent(builder, null); - BytesReference workflowBytesRef = BytesReference.bytes(builder); - GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", 1, 1, 1, true, workflowBytesRef, null, null); - responseListener.onResponse(new GetResponse(getResult)); - return null; - }).when(client).get(any(GetRequest.class), any()); - doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onResponse(new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "this_id", -2, 0, 0, Result.UPDATED)); - return null; - }).when(client).update(any(UpdateRequest.class), any()); + XContentBuilder builder = XContentFactory.jsonBuilder(); + WorkflowState state = WorkflowState.builder().build(); + state.toXContent(builder, null); + BytesReference workflowBytesRef = BytesReference.bytes(builder); + GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", 1, 1, 1, true, workflowBytesRef, null, null); + PlainActionFuture future = PlainActionFuture.newFuture(); + future.onResponse(new GetResponse(getResult)); + when(client.get(any(GetRequest.class))).thenReturn(future); + PlainActionFuture updateFuture = PlainActionFuture.newFuture(); + updateFuture.onResponse(new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "this_id", -2, 0, 0, Result.UPDATED)); + when(client.update(any(UpdateRequest.class))).thenReturn(updateFuture); + + CountDownLatch latch = new CountDownLatch(1); + LatchedActionListener latchedActionListener = new LatchedActionListener<>(listener, latch); flowFrameworkIndicesHandler.addResourceToStateIndex( new WorkflowData(Collections.emptyMap(), "this_id", null), "node_id", CreateConnectorStep.NAME, "this_id", - listener + latchedActionListener ); + latch.await(1, TimeUnit.SECONDS); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowData.class); verify(listener, times(1)).onResponse(responseCaptor.capture()); assertEquals("this_id", responseCaptor.getValue().getContent().get(WorkflowResources.CONNECTOR_ID)); // test failure - doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onFailure(new Exception("Failed to update state")); - return null; - }).when(client).update(any(UpdateRequest.class), any()); + updateFuture = PlainActionFuture.newFuture(); + updateFuture.onFailure(new Exception("Failed to update state")); + when(client.update(any(UpdateRequest.class))).thenReturn(updateFuture); + latch = new CountDownLatch(1); + latchedActionListener = new LatchedActionListener<>(listener, latch); flowFrameworkIndicesHandler.addResourceToStateIndex( new WorkflowData(Collections.emptyMap(), "this_id", null), "node_id", CreateConnectorStep.NAME, "this_id", - listener + latchedActionListener ); + latch.await(1, TimeUnit.SECONDS); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(listener, times(1)).onFailure(exceptionCaptor.capture()); @@ -677,20 +672,23 @@ public void testAddResourceToStateIndex() { // test document not found @SuppressWarnings("unchecked") + ActionListener notFoundListener = mock(ActionListener.class); - doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", -2, 0, 1, false, null, null, null); - responseListener.onResponse(new GetResponse(getResult)); - return null; - }).when(client).get(any(GetRequest.class), any()); + getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", -2, 0, 1, false, null, null, null); + future = PlainActionFuture.newFuture(); + future.onResponse(new GetResponse(getResult)); + when(client.get(any(GetRequest.class))).thenReturn(future); + + latch = new CountDownLatch(1); + latchedActionListener = new LatchedActionListener<>(notFoundListener, latch); flowFrameworkIndicesHandler.addResourceToStateIndex( new WorkflowData(Collections.emptyMap(), "this_id", null), "node_id", CreateConnectorStep.NAME, "this_id", - notFoundListener + latchedActionListener ); + latch.await(1, TimeUnit.SECONDS); exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(notFoundListener, times(1)).onFailure(exceptionCaptor.capture()); @@ -716,7 +714,7 @@ public void testAddResourceToStateIndex() { ); } - public void testDeleteResourceFromStateIndex() { + public void testDeleteResourceFromStateIndex() throws InterruptedException, IOException { ClusterState mockClusterState = mock(ClusterState.class); Metadata mockMetaData = mock(Metadata.class); when(clusterService.state()).thenReturn(mockClusterState); @@ -727,36 +725,37 @@ public void testDeleteResourceFromStateIndex() { @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); // test success - doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - XContentBuilder builder = XContentFactory.jsonBuilder(); - WorkflowState state = WorkflowState.builder().build(); - state.toXContent(builder, null); - BytesReference workflowBytesRef = BytesReference.bytes(builder); - GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", 1, 1, 1, true, workflowBytesRef, null, null); - responseListener.onResponse(new GetResponse(getResult)); - return null; - }).when(client).get(any(GetRequest.class), any()); - doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onResponse(new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "this_id", -2, 0, 0, Result.UPDATED)); - return null; - }).when(client).update(any(UpdateRequest.class), any()); + XContentBuilder builder = XContentFactory.jsonBuilder(); + WorkflowState state = WorkflowState.builder().build(); + state.toXContent(builder, null); + BytesReference workflowBytesRef = BytesReference.bytes(builder); + GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", 1, 1, 1, true, workflowBytesRef, null, null); + PlainActionFuture future = PlainActionFuture.newFuture(); + future.onResponse(new GetResponse(getResult)); + when(client.get(any(GetRequest.class))).thenReturn(future); + + PlainActionFuture updateFuture = PlainActionFuture.newFuture(); + updateFuture.onResponse(new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "this_id", -2, 0, 0, Result.UPDATED)); + when(client.update(any(UpdateRequest.class))).thenReturn(updateFuture); - flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, listener); + CountDownLatch latch = new CountDownLatch(1); + LatchedActionListener latchedActionListener = new LatchedActionListener<>(listener, latch); + flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, latchedActionListener); + latch.await(1, TimeUnit.SECONDS); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowData.class); verify(listener, times(1)).onResponse(responseCaptor.capture()); assertEquals("this_id", responseCaptor.getValue().getContent().get(WorkflowResources.CONNECTOR_ID)); // test failure - doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onFailure(new Exception("Failed to update state")); - return null; - }).when(client).update(any(UpdateRequest.class), any()); + updateFuture = PlainActionFuture.newFuture(); + updateFuture.onFailure(new Exception("Failed to update state")); + when(client.update(any(UpdateRequest.class))).thenReturn(updateFuture); - flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, listener); + latch = new CountDownLatch(1); + latchedActionListener = new LatchedActionListener<>(listener, latch); + flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, latchedActionListener); + latch.await(1, TimeUnit.SECONDS); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(listener, times(1)).onFailure(exceptionCaptor.capture()); @@ -768,13 +767,15 @@ public void testDeleteResourceFromStateIndex() { // test document not found @SuppressWarnings("unchecked") ActionListener notFoundListener = mock(ActionListener.class); - doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", -2, 0, 1, false, null, null, null); - responseListener.onResponse(new GetResponse(getResult)); - return null; - }).when(client).get(any(GetRequest.class), any()); - flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, notFoundListener); + getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", -2, 0, 1, false, null, null, null); + future = PlainActionFuture.newFuture(); + future.onResponse(new GetResponse(getResult)); + when(client.get(any(GetRequest.class))).thenReturn(future); + + latch = new CountDownLatch(1); + latchedActionListener = new LatchedActionListener<>(notFoundListener, latch); + flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, latchedActionListener); + latch.await(1, TimeUnit.SECONDS); exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(notFoundListener, times(1)).onFailure(exceptionCaptor.capture()); @@ -794,7 +795,7 @@ public void testDeleteResourceFromStateIndex() { ); } - public void testAddResourceToStateIndexWithRetries() { + public void testAddResourceToStateIndexWithRetries() throws IOException, InterruptedException { ClusterState mockClusterState = mock(ClusterState.class); Metadata mockMetaData = mock(Metadata.class); when(clusterService.state()).thenReturn(mockClusterState); @@ -805,94 +806,73 @@ public void testAddResourceToStateIndexWithRetries() { "this_id", null ); + + XContentBuilder builder = XContentFactory.jsonBuilder(); + WorkflowState state = WorkflowState.builder().build(); + state.toXContent(builder, null); + BytesReference workflowBytesRef = BytesReference.bytes(builder); + GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", 1, 1, 1, true, workflowBytesRef, null, null); + PlainActionFuture future = PlainActionFuture.newFuture(); + future.onResponse(new GetResponse(getResult)); + when(client.get(any(GetRequest.class))).thenReturn(future); + UpdateResponse updateResponse = new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "this_id", -2, 0, 0, Result.UPDATED); - doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - XContentBuilder builder = XContentFactory.jsonBuilder(); - WorkflowState state = WorkflowState.builder().build(); - state.toXContent(builder, null); - BytesReference workflowBytesRef = BytesReference.bytes(builder); - GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", 1, 1, 1, true, workflowBytesRef, null, null); - responseListener.onResponse(new GetResponse(getResult)); - return null; - }).when(client).get(any(GetRequest.class), any()); + PlainActionFuture updateFuture = PlainActionFuture.newFuture(); + updateFuture.onResponse(updateResponse); + + when(client.update(any(UpdateRequest.class))).thenThrow(conflictException).thenReturn(updateFuture); // test success on retry @SuppressWarnings("unchecked") ActionListener retryListener = mock(ActionListener.class); - doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onFailure(conflictException); - return null; - }).doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onResponse(updateResponse); - return null; - }).when(client).update(any(UpdateRequest.class), any()); + CountDownLatch latch = new CountDownLatch(1); + LatchedActionListener latchedActionListener = new LatchedActionListener<>(retryListener, latch); flowFrameworkIndicesHandler.addResourceToStateIndex( new WorkflowData(Collections.emptyMap(), "this_id", null), "node_id", CreateConnectorStep.NAME, "this_id", - retryListener + latchedActionListener ); + latch.await(1, TimeUnit.SECONDS); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowData.class); verify(retryListener, times(1)).onResponse(responseCaptor.capture()); assertEquals("this_id", responseCaptor.getValue().getContent().get(WorkflowResources.CONNECTOR_ID)); // test failure on 6th after 5 retries even if 7th would have been success + when(client.update(any(UpdateRequest.class))).thenThrow(conflictException) + .thenThrow(conflictException) + .thenThrow(conflictException) + .thenThrow(conflictException) + .thenThrow(conflictException) + .thenThrow(conflictException) + .thenReturn(updateFuture); + @SuppressWarnings("unchecked") - ActionListener threeRetryListener = mock(ActionListener.class); - doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onFailure(conflictException); - return null; - }).doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onFailure(conflictException); - return null; - }).doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onFailure(conflictException); - return null; - }).doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onFailure(conflictException); - return null; - }).doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onFailure(conflictException); - return null; - }).doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onFailure(conflictException); - return null; - }).doAnswer(invocation -> { - // we'll never get here - ActionListener responseListener = invocation.getArgument(1); - responseListener.onResponse(updateResponse); - return null; - }).when(client).update(any(UpdateRequest.class), any()); + ActionListener fiveRetryListener = mock(ActionListener.class); + latch = new CountDownLatch(1); + latchedActionListener = new LatchedActionListener<>(fiveRetryListener, latch); flowFrameworkIndicesHandler.addResourceToStateIndex( new WorkflowData(Collections.emptyMap(), "this_id", null), "node_id", CreateConnectorStep.NAME, "this_id", - threeRetryListener + latchedActionListener ); + latch.await(1, TimeUnit.SECONDS); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); - verify(threeRetryListener, times(1)).onFailure(exceptionCaptor.capture()); + verify(fiveRetryListener, times(1)).onFailure(exceptionCaptor.capture()); assertEquals( "Failed to update workflow state for this_id on step node_id to add resource connector_id this_id", exceptionCaptor.getValue().getMessage() ); } - public void testDeleteResourceFromStateIndexWithRetries() { + public void testDeleteResourceFromStateIndexWithRetries() throws IOException, InterruptedException { ClusterState mockClusterState = mock(ClusterState.class); Metadata mockMetaData = mock(Metadata.class); when(clusterService.state()).thenReturn(mockClusterState); @@ -903,77 +883,54 @@ public void testDeleteResourceFromStateIndexWithRetries() { "this_id", null ); - UpdateResponse updateResponse = new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "this_id", -2, 0, 0, Result.UPDATED); ResourceCreated resourceToDelete = new ResourceCreated("", "node_id", "connector_id", "this_id"); - doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - XContentBuilder builder = XContentFactory.jsonBuilder(); - WorkflowState state = WorkflowState.builder().build(); - state.toXContent(builder, null); - BytesReference workflowBytesRef = BytesReference.bytes(builder); - GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", 1, 1, 1, true, workflowBytesRef, null, null); - responseListener.onResponse(new GetResponse(getResult)); - return null; - }).when(client).get(any(GetRequest.class), any()); + XContentBuilder builder = XContentFactory.jsonBuilder(); + WorkflowState state = WorkflowState.builder().build(); + state.toXContent(builder, null); + BytesReference workflowBytesRef = BytesReference.bytes(builder); + GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", 1, 1, 1, true, workflowBytesRef, null, null); + PlainActionFuture future = PlainActionFuture.newFuture(); + future.onResponse(new GetResponse(getResult)); + when(client.get(any(GetRequest.class))).thenReturn(future); // test success on retry + UpdateResponse updateResponse = new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "this_id", -2, 0, 0, Result.UPDATED); + PlainActionFuture updateFuture = PlainActionFuture.newFuture(); + updateFuture.onResponse(updateResponse); + + when(client.update(any(UpdateRequest.class))).thenThrow(conflictException).thenReturn(updateFuture); + @SuppressWarnings("unchecked") ActionListener retryListener = mock(ActionListener.class); - doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onFailure(conflictException); - return null; - }).doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onResponse(updateResponse); - return null; - }).when(client).update(any(UpdateRequest.class), any()); - - flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, retryListener); + CountDownLatch latch = new CountDownLatch(1); + LatchedActionListener latchedActionListener = new LatchedActionListener<>(retryListener, latch); + flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, latchedActionListener); + latch.await(1, TimeUnit.SECONDS); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowData.class); verify(retryListener, times(1)).onResponse(responseCaptor.capture()); assertEquals("this_id", responseCaptor.getValue().getContent().get(WorkflowResources.CONNECTOR_ID)); // test failure on 6th after 5 retries even if 7th would have been success + when(client.update(any(UpdateRequest.class))).thenThrow(conflictException) + .thenThrow(conflictException) + .thenThrow(conflictException) + .thenThrow(conflictException) + .thenThrow(conflictException) + .thenThrow(conflictException) + .thenReturn(updateFuture); + @SuppressWarnings("unchecked") - ActionListener threeRetryListener = mock(ActionListener.class); - doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onFailure(conflictException); - return null; - }).doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onFailure(conflictException); - return null; - }).doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onFailure(conflictException); - return null; - }).doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onFailure(conflictException); - return null; - }).doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onFailure(conflictException); - return null; - }).doAnswer(invocation -> { - ActionListener responseListener = invocation.getArgument(1); - responseListener.onFailure(conflictException); - return null; - }).doAnswer(invocation -> { - // we'll never get here - ActionListener responseListener = invocation.getArgument(1); - responseListener.onResponse(updateResponse); - return null; - }).when(client).update(any(UpdateRequest.class), any()); + ActionListener fiveRetryListener = mock(ActionListener.class); - flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, threeRetryListener); + latch = new CountDownLatch(1); + latchedActionListener = new LatchedActionListener<>(fiveRetryListener, latch); + flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, latchedActionListener); + latch.await(1, TimeUnit.SECONDS); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); - verify(threeRetryListener, times(1)).onFailure(exceptionCaptor.capture()); + verify(fiveRetryListener, times(1)).onFailure(exceptionCaptor.capture()); assertEquals( "Failed to update workflow state for this_id on step node_id to delete resource connector_id this_id", exceptionCaptor.getValue().getMessage()