From 18dce2661db501ddb845934ab029a6707a0a3764 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 23 Jul 2024 19:27:11 +0000 Subject: [PATCH] Fix IngestServiceTests.testBulkRequestExecutionWithFailures (#14918) The test would previously fail if the randomness led to only a single indexing request being included in the bulk payload. This change guarantees multiple indexing requests in order to ensure the batch logic kicks in. Also replace some unneeded mocks with real classes. Signed-off-by: Andrew Ross (cherry picked from commit 087355f0ee676064ea409ed68090b33e568ea941) Signed-off-by: github-actions[bot] --- .../opensearch/ingest/IngestServiceTests.java | 47 +++++++++---------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index 6ccc1a4a28ac6..67aaab5ebe12a 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -78,6 +78,7 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; import org.opensearch.threadpool.ThreadPool.Names; +import org.hamcrest.MatcherAssert; import org.junit.Before; import java.nio.charset.StandardCharsets; @@ -104,15 +105,16 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.argThat; import static org.mockito.Mockito.doAnswer; @@ -1106,27 +1108,23 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } - public void testBulkRequestExecutionWithFailures() throws Exception { + public void testBulkRequestExecutionWithFailures() { BulkRequest bulkRequest = new BulkRequest(); String pipelineId = "_id"; - int numRequest = scaledRandomIntBetween(8, 64); - int numIndexRequests = 0; - for (int i = 0; i < numRequest; i++) { - DocWriteRequest request; + int numIndexRequests = scaledRandomIntBetween(4, 32); + for (int i = 0; i < numIndexRequests; i++) { + IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none"); + indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); + bulkRequest.add(indexRequest); + } + int numOtherRequests = scaledRandomIntBetween(4, 32); + for (int i = 0; i < numOtherRequests; i++) { if (randomBoolean()) { - if (randomBoolean()) { - request = new DeleteRequest("_index", "_id"); - } else { - request = new UpdateRequest("_index", "_id"); - } + bulkRequest.add(new DeleteRequest("_index", "_id")); } else { - IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none"); - indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); - request = indexRequest; - numIndexRequests++; + bulkRequest.add(new UpdateRequest("_index", "_id")); } - bulkRequest.add(request); } CompoundProcessor processor = mock(CompoundProcessor.class); @@ -1155,23 +1153,22 @@ public void testBulkRequestExecutionWithFailures() throws Exception { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - @SuppressWarnings("unchecked") - BiConsumer requestItemErrorHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final Map errorHandler = new HashMap<>(); + final Map completionHandler = new HashMap<>(); ingestService.executeBulkRequest( - numRequest, + numIndexRequests + numOtherRequests, bulkRequest.requests(), - requestItemErrorHandler, - completionHandler, + errorHandler::put, + completionHandler::put, indexReq -> {}, Names.WRITE, bulkRequest ); - verify(requestItemErrorHandler, times(numIndexRequests)).accept(anyInt(), argThat(o -> o.getCause().equals(error))); + MatcherAssert.assertThat(errorHandler.entrySet(), hasSize(numIndexRequests)); + errorHandler.values().forEach(e -> assertEquals(e.getCause(), error)); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + MatcherAssert.assertThat(completionHandler.keySet(), contains(Thread.currentThread())); } public void testBulkRequestExecution() throws Exception {