Skip to content

Commit

Permalink
Add failing tests for apparent batch reordering bug
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Ross <[email protected]>
  • Loading branch information
andrross committed Jun 13, 2024
1 parent 79405ed commit 6595f8d
Showing 1 changed file with 43 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.hamcrest.MatcherAssert;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchParseException;
Expand Down Expand Up @@ -66,7 +67,9 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.test.NodeRoles.nonIngestNode;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -177,6 +180,7 @@ public void testBulkWithIngestFailures() throws Exception {

int numRequests = scaledRandomIntBetween(32, 128);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.batchSize(numRequests);
for (int i = 0; i < numRequests; i++) {
IndexRequest indexRequest = new IndexRequest("index").id(Integer.toString(i)).setPipeline("_id");
indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", i % 2 == 0);
Expand Down Expand Up @@ -209,6 +213,45 @@ public void testBulkWithIngestFailures() throws Exception {
assertTrue(deletePipelineResponse.isAcknowledged());
}

public void testBulkWithIngestFailuresBatch() throws Exception {
createIndex("index");

BytesReference source = BytesReference.bytes(
jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("test")
.endObject()
.endObject()
.endArray()
.endObject()
);
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, MediaTypeRegistry.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();

BulkRequest bulkRequest = new BulkRequest();
bulkRequest.batchSize(2);
bulkRequest.add(new IndexRequest("index").id("_fail").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", true));
bulkRequest.add(new IndexRequest("index").id("_success").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", false));

BulkResponse response = client().bulk(bulkRequest).actionGet();
MatcherAssert.assertThat(response.getItems().length, equalTo(bulkRequest.requests().size()));

Map<String, BulkItemResponse> results = Arrays.stream(response.getItems()).collect(Collectors.toMap(
BulkItemResponse::getId,
r -> r
));

MatcherAssert.assertThat(results.keySet(), containsInAnyOrder("_fail", "_success"));
assertNotNull(results.get("_fail").getFailure());
assertNull(results.get("_success").getFailure());

// cleanup
AcknowledgedResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get();
assertTrue(deletePipelineResponse.isAcknowledged());
}

public void testBulkWithUpsert() throws Exception {
createIndex("index");

Expand Down

0 comments on commit 6595f8d

Please sign in to comment.