diff --git a/CHANGELOG.md b/CHANGELOG.md index 79b45e96ddf4f..0ed3d27b3a70f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow system index warning in OpenSearchRestTestCase.refreshAllIndices ([#14635](https://github.com/opensearch-project/OpenSearch/pull/14635)) ### Deprecated +- Deprecate batch_size parameter on bulk API ([#14725](https://github.com/opensearch-project/OpenSearch/pull/14725)) ### Removed - Remove query categorization changes ([#14759](https://github.com/opensearch-project/OpenSearch/pull/14759)) diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml index 36b2b5351dcad..47cc80d6df310 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml @@ -207,7 +207,7 @@ teardown: - match: { _source: {"f1": "v2", "f2": 47, "field1": "value1", "field2": "value2"}} --- -"Test bulk API with batch enabled happy case": +"Test bulk API with default batch size": - skip: version: " - 2.13.99" reason: "Added in 2.14.0" @@ -215,7 +215,6 @@ teardown: - do: bulk: refresh: true - batch_size: 2 pipeline: "pipeline1" body: - '{"index": {"_index": "test_index", "_id": "test_id1"}}' @@ -245,36 +244,6 @@ teardown: id: test_id3 - match: { _source: { "text": "text3", "field1": "value1" } } ---- -"Test bulk API with batch_size missing": - - skip: - version: " - 2.13.99" - reason: "Added in 2.14.0" - - - do: - bulk: - refresh: true - pipeline: "pipeline1" - body: - - '{"index": {"_index": "test_index", "_id": "test_id1"}}' - - '{"text": "text1"}' - - '{"index": {"_index": "test_index", "_id": "test_id2"}}' - - '{"text": "text2"}' - - - match: { errors: false } - - - do: - get: - index: test_index - id: test_id1 - - match: { _source: { "text": "text1", "field1": "value1" } } - - - do: - get: - index: test_index - id: test_id2 - - match: { _source: { "text": "text2", "field1": "value1" } } - --- "Test bulk API with invalid batch_size": - skip: diff --git a/server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java b/server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java index 657d0f178e096..0eb37a7b25618 100644 --- a/server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java @@ -315,6 +315,87 @@ public void testBulkWithUpsert() throws Exception { assertThat(upserted.get("processed"), equalTo(true)); } + public void testSingleDocIngestFailure() throws Exception { + createIndex("test"); + BytesReference source = BytesReference.bytes( + jsonBuilder().startObject() + .field("description", "my_pipeline") + .startArray("processors") + .startObject() + .startObject("test") + .endObject() + .endObject() + .endArray() + .endObject() + ); + PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, MediaTypeRegistry.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + + GetPipelineRequest getPipelineRequest = new GetPipelineRequest("_id"); + GetPipelineResponse getResponse = client().admin().cluster().getPipeline(getPipelineRequest).get(); + assertThat(getResponse.isFound(), is(true)); + assertThat(getResponse.pipelines().size(), equalTo(1)); + assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id")); + + assertThrows( + IllegalArgumentException.class, + () -> client().prepareIndex("test") + .setId("1") + .setPipeline("_id") + .setSource(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", true) + .get() + ); + + DeletePipelineRequest deletePipelineRequest = new DeletePipelineRequest("_id"); + AcknowledgedResponse response = client().admin().cluster().deletePipeline(deletePipelineRequest).get(); + assertThat(response.isAcknowledged(), is(true)); + + getResponse = client().admin().cluster().prepareGetPipeline("_id").get(); + assertThat(getResponse.isFound(), is(false)); + assertThat(getResponse.pipelines().size(), equalTo(0)); + } + + public void testSingleDocIngestDrop() throws Exception { + createIndex("test"); + BytesReference source = BytesReference.bytes( + jsonBuilder().startObject() + .field("description", "my_pipeline") + .startArray("processors") + .startObject() + .startObject("test") + .endObject() + .endObject() + .endArray() + .endObject() + ); + PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, MediaTypeRegistry.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + + GetPipelineRequest getPipelineRequest = new GetPipelineRequest("_id"); + GetPipelineResponse getResponse = client().admin().cluster().getPipeline(getPipelineRequest).get(); + assertThat(getResponse.isFound(), is(true)); + assertThat(getResponse.pipelines().size(), equalTo(1)); + assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id")); + + DocWriteResponse indexResponse = client().prepareIndex("test") + .setId("1") + .setPipeline("_id") + .setSource(Requests.INDEX_CONTENT_TYPE, "field", "value", "drop", true) + .get(); + assertEquals(DocWriteResponse.Result.NOOP, indexResponse.getResult()); + + Map doc = client().prepareGet("test", "1").get().getSourceAsMap(); + assertNull(doc); + + DeletePipelineRequest deletePipelineRequest = new DeletePipelineRequest("_id"); + AcknowledgedResponse response = client().admin().cluster().deletePipeline(deletePipelineRequest).get(); + assertThat(response.isAcknowledged(), is(true)); + + getResponse = client().admin().cluster().prepareGetPipeline("_id").get(); + assertThat(getResponse.isFound(), is(false)); + assertThat(getResponse.pipelines().size(), equalTo(0)); + } + public void test() throws Exception { BytesReference source = BytesReference.bytes( jsonBuilder().startObject() diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java b/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java index 7614206cd226f..e686585095962 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java @@ -96,7 +96,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques private String globalRouting; private String globalIndex; private Boolean globalRequireAlias; - private int batchSize = 1; + private int batchSize = Integer.MAX_VALUE; private long sizeInBytes = 0; diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index 2281ccd4c0382..17eb23422e68b 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -525,61 +525,7 @@ public void onFailure(Exception e) { @Override protected void doRun() { - int batchSize = originalBulkRequest.batchSize(); - if (shouldExecuteBulkRequestInBatch(originalBulkRequest.requests().size(), batchSize)) { - runBulkRequestInBatch(numberOfActionRequests, actionRequests, onFailure, onCompletion, onDropped, originalBulkRequest); - return; - } - - final Thread originalThread = Thread.currentThread(); - final AtomicInteger counter = new AtomicInteger(numberOfActionRequests); - int i = 0; - for (DocWriteRequest actionRequest : actionRequests) { - IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest); - if (indexRequest == null) { - if (counter.decrementAndGet() == 0) { - onCompletion.accept(originalThread, null); - } - assert counter.get() >= 0; - i++; - continue; - } - final String pipelineId = indexRequest.getPipeline(); - indexRequest.setPipeline(NOOP_PIPELINE_NAME); - final String finalPipelineId = indexRequest.getFinalPipeline(); - indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); - boolean hasFinalPipeline = true; - final List pipelines; - if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false - && IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { - pipelines = Arrays.asList(pipelineId, finalPipelineId); - } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) { - pipelines = Collections.singletonList(pipelineId); - hasFinalPipeline = false; - } else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { - pipelines = Collections.singletonList(finalPipelineId); - } else { - if (counter.decrementAndGet() == 0) { - onCompletion.accept(originalThread, null); - } - assert counter.get() >= 0; - i++; - continue; - } - - executePipelines( - i, - pipelines.iterator(), - hasFinalPipeline, - indexRequest, - onDropped, - onFailure, - counter, - onCompletion, - originalThread - ); - i++; - } + runBulkRequestInBatch(numberOfActionRequests, actionRequests, onFailure, onCompletion, onDropped, originalBulkRequest); } }); } @@ -635,7 +581,7 @@ private void runBulkRequestInBatch( i++; } - int batchSize = originalBulkRequest.batchSize(); + int batchSize = Math.min(numberOfActionRequests, originalBulkRequest.batchSize()); List> batches = prepareBatches(batchSize, indexRequestWrappers); logger.debug("batchSize: {}, batches: {}", batchSize, batches.size()); @@ -654,10 +600,6 @@ private void runBulkRequestInBatch( } } - private boolean shouldExecuteBulkRequestInBatch(int documentSize, int batchSize) { - return documentSize > 1 && batchSize > 1; - } - /** * IndexRequests are grouped by unique (index + pipeline_ids) before batching. * Only IndexRequests in the same group could be batched. It's to ensure batched documents always @@ -685,7 +627,7 @@ static List> prepareBatches(int batchSize, List> batchedIndexRequests = new ArrayList<>(); for (Map.Entry> indexRequestsPerKey : indexRequestsPerIndexAndPipelines.entrySet()) { - for (int i = 0; i < indexRequestsPerKey.getValue().size(); i += batchSize) { + for (int i = 0; i < indexRequestsPerKey.getValue().size(); i += Math.min(indexRequestsPerKey.getValue().size(), batchSize)) { batchedIndexRequests.add( new ArrayList<>( indexRequestsPerKey.getValue().subList(i, i + Math.min(batchSize, indexRequestsPerKey.getValue().size() - i)) diff --git a/server/src/main/java/org/opensearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/opensearch/rest/action/document/RestBulkAction.java index 0bc4234c9b8b8..ce52c5620b968 100644 --- a/server/src/main/java/org/opensearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/opensearch/rest/action/document/RestBulkAction.java @@ -38,6 +38,7 @@ import org.opensearch.action.support.ActiveShardCount; import org.opensearch.client.Requests; import org.opensearch.client.node.NodeClient; +import org.opensearch.common.logging.DeprecationLogger; import org.opensearch.common.settings.Settings; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.RestRequest; @@ -66,6 +67,8 @@ public class RestBulkAction extends BaseRestHandler { private final boolean allowExplicitIndex; + private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestBulkAction.class); + static final String BATCH_SIZE_DEPRECATED_MESSAGE = "The batch size option in bulk API is deprecated and will be removed in 3.0."; public RestBulkAction(Settings settings) { this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings); @@ -97,7 +100,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null); bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); bulkRequest.setRefreshPolicy(request.param("refresh")); - bulkRequest.batchSize(request.paramAsInt("batch_size", 1)); + if (request.hasParam("batch_size")) { + deprecationLogger.deprecate("batch_size_deprecation", BATCH_SIZE_DEPRECATED_MESSAGE); + } + bulkRequest.batchSize(request.paramAsInt("batch_size", Integer.MAX_VALUE)); bulkRequest.add( request.requiredContent(), defaultIndex, diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index e61fbb6e1dbff..9d03127692975 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -1134,10 +1134,14 @@ public void testBulkRequestExecutionWithFailures() throws Exception { Exception error = new RuntimeException(); doAnswer(args -> { @SuppressWarnings("unchecked") - BiConsumer handler = (BiConsumer) args.getArguments()[1]; - handler.accept(null, error); + List ingestDocumentWrappers = (List) args.getArguments()[0]; + Consumer> handler = (Consumer) args.getArguments()[1]; + for (IngestDocumentWrapper wrapper : ingestDocumentWrappers) { + wrapper.update(wrapper.getIngestDocument(), error); + } + handler.accept(ingestDocumentWrappers); return null; - }).when(processor).execute(any(), any()); + }).when(processor).batchExecute(any(), any()); IngestService ingestService = createWithProcessors( Collections.singletonMap("mock", (factories, tag, description, config) -> processor) ); @@ -1192,10 +1196,11 @@ public void testBulkRequestExecution() throws Exception { when(processor.getTag()).thenReturn("mockTag"); doAnswer(args -> { @SuppressWarnings("unchecked") - BiConsumer handler = (BiConsumer) args.getArguments()[1]; - handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null); + List ingestDocumentWrappers = (List) args.getArguments()[0]; + Consumer> handler = (Consumer) args.getArguments()[1]; + handler.accept(ingestDocumentWrappers); return null; - }).when(processor).execute(any(), any()); + }).when(processor).batchExecute(any(), any()); Map map = new HashMap<>(2); map.put("mock", (factories, tag, description, config) -> processor); @@ -1957,6 +1962,42 @@ public void testExecuteBulkRequestInBatchWithExceptionAndDropInCallback() { verify(mockCompoundProcessor, never()).execute(any(), any()); } + public void testExecuteBulkRequestInBatchWithDefaultBatchSize() { + CompoundProcessor mockCompoundProcessor = mockCompoundProcessor(); + IngestService ingestService = createWithProcessors( + Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor) + ); + createPipeline("_id", ingestService); + BulkRequest bulkRequest = new BulkRequest(); + IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + bulkRequest.add(indexRequest1); + IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + bulkRequest.add(indexRequest2); + IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3").source(emptyMap()).setPipeline("_none").setFinalPipeline("_id"); + bulkRequest.add(indexRequest3); + IndexRequest indexRequest4 = new IndexRequest("_index").id("_id4").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + bulkRequest.add(indexRequest4); + @SuppressWarnings("unchecked") + final Map failureHandler = new HashMap<>(); + final Map completionHandler = new HashMap<>(); + final List dropHandler = new ArrayList<>(); + ingestService.executeBulkRequest( + 4, + bulkRequest.requests(), + failureHandler::put, + completionHandler::put, + dropHandler::add, + Names.WRITE, + bulkRequest + ); + assertTrue(failureHandler.isEmpty()); + assertTrue(dropHandler.isEmpty()); + assertEquals(1, completionHandler.size()); + assertNull(completionHandler.get(Thread.currentThread())); + verify(mockCompoundProcessor, times(1)).batchExecute(any(), any()); + verify(mockCompoundProcessor, never()).execute(any(), any()); + } + public void testPrepareBatches_same_index_pipeline() { IngestService.IndexRequestWrapper wrapper1 = createIndexRequestWrapper("index1", Collections.singletonList("p1")); IngestService.IndexRequestWrapper wrapper2 = createIndexRequestWrapper("index1", Collections.singletonList("p1"));