From ead9ab88c0ed3e242a8631e387d2b39ceaf8e315 Mon Sep 17 00:00:00 2001 From: Raghuvansh Raj Date: Sun, 12 Nov 2023 00:56:08 +0530 Subject: [PATCH] Added multiple items in UT with different retry counts and item level retry assertion Signed-off-by: Raghuvansh Raj --- .../bulk/TransportShardBulkActionTests.java | 51 +++++++++++++------ 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index 99c311c11e33f..7fd25a90d7927 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -97,12 +97,15 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.LongSupplier; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; @@ -950,9 +953,18 @@ public void testRetries() throws Exception { public void testUpdateWithRetryOnConflict() throws IOException, InterruptedException { IndexSettings indexSettings = new IndexSettings(indexMetadata(), Settings.EMPTY); - UpdateRequest writeRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); - writeRequest.retryOnConflict(3); - BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); + + int nItems = randomIntBetween(2, 5); + List items = new ArrayList<>(nItems); + int[] retryCountsPerItem = new int[nItems]; + for (int i = 0; i < nItems; i++) { + int retryOnConflictCount = randomIntBetween(0, 3); + logger.debug("Setting retryCount for item {}: {}", i, retryOnConflictCount); + retryCountsPerItem[i] = retryOnConflictCount; + UpdateRequest updateRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value") + .retryOnConflict(retryOnConflictCount); + items.add(new BulkItemRequest(i, updateRequest)); + } IndexRequest updateResponse = new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value"); @@ -977,8 +989,7 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep ) ); - BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest }; - BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items.toArray(BulkItemRequest[]::new)); final CountDownLatch latch = new CountDownLatch(1); Runnable runnable = () -> TransportShardBulkAction.performOnPrimary( @@ -988,26 +999,34 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), listener -> listener.onResponse(null), - new LatchedActionListener<>( - ActionTestUtils.assertNoFailureListener( - result -> assertEquals( + new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> { + for (int i = 0; i < nItems; i++) { + assertEquals( VersionConflictEngineException.class, - result.replicaRequest().items()[0].getPrimaryResponse().getFailure().getCause().getClass() - ) - ), - latch - ), + result.replicaRequest().items()[i].getPrimaryResponse().getFailure().getCause().getClass() + ); + } + }), latch), threadPool, Names.WRITE ); // execute the runnable on a separate thread so that the infinite loop can be detected - Thread thread = new Thread(runnable); - thread.start(); + new Thread(runnable).start(); // timeout the request in 10 seconds if there is an infinite loop assertTrue(latch.await(10, TimeUnit.SECONDS)); - assertEquals(items[0].getPrimaryResponse().getFailure().getCause().getClass(), VersionConflictEngineException.class); + for (int i = 0; i < nItems; i++) { + assertEquals(items.get(i).getPrimaryResponse().getFailure().getCause().getClass(), VersionConflictEngineException.class); + + // this assertion is based on the assumption that all bulk item requests are updates and are hence calling + // UpdateRequest::prepareRequest + verify(updateHelper, times(retryCountsPerItem[i] + 1)).prepare( + eq((UpdateRequest) items.get(i).request()), + any(IndexShard.class), + any(LongSupplier.class) + ); + } } public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {