Skip to content

Commit

Permalink
Fix IngestServiceTests.testBulkRequestExecutionWithFailures (opensear…
Browse files Browse the repository at this point in the history
…ch-project#14918)

The test would previously fail if the randomness led to only a single
indexing request being included in the bulk payload. This change
guarantees multiple indexing requests in order to ensure the batch logic
kicks in.

Also replace some unneeded mocks with real classes.

Signed-off-by: Andrew Ross <[email protected]>
  • Loading branch information
andrross authored and wangdongyu.danny committed Aug 22, 2024
1 parent d0024f8 commit 61c5022
Showing 1 changed file with 22 additions and 25 deletions.
47 changes: 22 additions & 25 deletions server/src/test/java/org/opensearch/ingest/IngestServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.hamcrest.MatcherAssert;
import org.junit.Before;

import java.nio.charset.StandardCharsets;
Expand All @@ -104,15 +105,16 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -1106,27 +1108,23 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception {
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
}

public void testBulkRequestExecutionWithFailures() throws Exception {
public void testBulkRequestExecutionWithFailures() {
BulkRequest bulkRequest = new BulkRequest();
String pipelineId = "_id";

int numRequest = scaledRandomIntBetween(8, 64);
int numIndexRequests = 0;
for (int i = 0; i < numRequest; i++) {
DocWriteRequest request;
int numIndexRequests = scaledRandomIntBetween(4, 32);
for (int i = 0; i < numIndexRequests; i++) {
IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none");
indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1");
bulkRequest.add(indexRequest);
}
int numOtherRequests = scaledRandomIntBetween(4, 32);
for (int i = 0; i < numOtherRequests; i++) {
if (randomBoolean()) {
if (randomBoolean()) {
request = new DeleteRequest("_index", "_id");
} else {
request = new UpdateRequest("_index", "_id");
}
bulkRequest.add(new DeleteRequest("_index", "_id"));
} else {
IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none");
indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1");
request = indexRequest;
numIndexRequests++;
bulkRequest.add(new UpdateRequest("_index", "_id"));
}
bulkRequest.add(request);
}

CompoundProcessor processor = mock(CompoundProcessor.class);
Expand Down Expand Up @@ -1155,23 +1153,22 @@ public void testBulkRequestExecutionWithFailures() throws Exception {
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));

@SuppressWarnings("unchecked")
BiConsumer<Integer, Exception> requestItemErrorHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
final Map<Integer, Exception> errorHandler = new HashMap<>();
final Map<Thread, Exception> completionHandler = new HashMap<>();
ingestService.executeBulkRequest(
numRequest,
numIndexRequests + numOtherRequests,
bulkRequest.requests(),
requestItemErrorHandler,
completionHandler,
errorHandler::put,
completionHandler::put,
indexReq -> {},
Names.WRITE,
bulkRequest
);

verify(requestItemErrorHandler, times(numIndexRequests)).accept(anyInt(), argThat(o -> o.getCause().equals(error)));
MatcherAssert.assertThat(errorHandler.entrySet(), hasSize(numIndexRequests));
errorHandler.values().forEach(e -> assertEquals(e.getCause(), error));

verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
MatcherAssert.assertThat(completionHandler.keySet(), contains(Thread.currentThread()));
}

public void testBulkRequestExecution() throws Exception {
Expand Down

0 comments on commit 61c5022

Please sign in to comment.