diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
index e75c2304d8c98..22638602fa7d0 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
@@ -355,111 +355,6 @@ protected void doRun() {
         }
     }
 
-    private boolean preprocessBulkRequest(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
-        final Metadata metadata = clusterService.state().getMetadata();
-        final Version minNodeVersion = clusterService.state().getNodes().getMinNodeVersion();
-        boolean needsProcessing = false;
-        for (BulkRequestPreprocessor preprocessor : bulkRequestPreprocessors) {
-            for (DocWriteRequest<?> docWriteRequest : bulkRequest.requests) {
-                IndexRequest indexRequest = getIndexWriteRequest(docWriteRequest);
-                if (indexRequest != null) {
-                    needsProcessing |= preprocessor.needsProcessing(docWriteRequest, indexRequest, metadata);
-                }
-
-                if (docWriteRequest instanceof IndexRequest ir) {
-                    ir.checkAutoIdWithOpTypeCreateSupportedByVersion(minNodeVersion);
-                    if (ir.getAutoGeneratedTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
-                        throw new IllegalArgumentException("autoGeneratedTimestamp should not be set externally");
-                    }
-                }
-            }
-
-            if (needsProcessing) {
-                // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but
-                // also with requests updated with processing information. This ensures that this on the second time through this method
-                // this path is never taken.
-                ActionListener.run(listener, l -> {
-                    if (Assertions.ENABLED) {
-                        final boolean allRequestsUnprocessed = bulkRequest.requests()
-                            .stream()
-                            .map(TransportBulkAction::getIndexWriteRequest)
-                            .filter(Objects::nonNull)
-                            .noneMatch(preprocessor::hasBeenProcessed);
-                        assert allRequestsUnprocessed : bulkRequest;
-                    }
-                    if ((preprocessor.shouldExecuteOnIngestNode() == false) || clusterService.localNode().isIngestNode()) {
-                        preprocessBulkRequestWithPreprocessor(preprocessor, task, bulkRequest, executorName, l);
-                    } else {
-                        ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, l);
-                    }
-                });
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private void preprocessBulkRequestWithPreprocessor(
-        BulkRequestPreprocessor preprocessor,
-        Task task,
-        BulkRequest original,
-        String executorName,
-        ActionListener<BulkResponse> listener
-    ) {
-        final long ingestStartTimeInNanos = System.nanoTime();
-        final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
-        preprocessor.processBulkRequest(
-            threadPool.executor(executorName),
-            original.numberOfActions(),
-            () -> bulkRequestModifier,
-            bulkRequestModifier::markItemAsDropped,
-            bulkRequestModifier::markItemAsFailed,
-            (originalThread, exception) -> {
-                if (exception != null) {
-                    logger.debug("failed to execute pipeline for a bulk request", exception);
-                    listener.onFailure(exception);
-                } else {
-                    long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
-                    BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
-                    ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(
-                        ingestTookInMillis,
-                        listener
-                    );
-                    if (bulkRequest.requests().isEmpty()) {
-                        // at this stage, the transport bulk action can't deal with a bulk request with no requests,
-                        // so we stop and send an empty response back to the client.
-                        // (this will happen if pre-processing all items in the bulk failed)
-                        actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
-                    } else {
-                        ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
-                            @Override
-                            protected void doRun() {
-                                doInternalExecute(task, bulkRequest, executorName, actionListener);
-                            }
-
-                            @Override
-                            public boolean isForceExecution() {
-                                // If we fork back to a write thread we **not** should fail, because tp queue is full.
-                                // (Otherwise the work done during ingest will be lost)
-                                // It is okay to force execution here. Throttling of write requests happens prior to
-                                // ingest when a node receives a bulk request.
-                                return true;
-                            }
-                        };
-                        // If a processor went async and returned a response on a different thread then
-                        // before we continue the bulk request we should fork back on a write thread:
-                        if (originalThread == Thread.currentThread()) {
-                            runnable.run();
-                        } else {
-                            threadPool.executor(executorName).execute(runnable);
-                        }
-                    }
-                }
-            },
-            executorName
-        );
-    }
-
     static void prohibitAppendWritesInBackingIndices(DocWriteRequest<?> writeRequest, Metadata metadata) {
         DocWriteRequest.OpType opType = writeRequest.opType();
         if ((opType == OpType.CREATE || opType == OpType.INDEX) == false) {
@@ -875,6 +770,111 @@ private long relativeTime() {
         return relativeTimeProvider.getAsLong();
     }
 
+    private boolean preprocessBulkRequest(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
+        final Metadata metadata = clusterService.state().getMetadata();
+        final Version minNodeVersion = clusterService.state().getNodes().getMinNodeVersion();
+        boolean needsProcessing = false;
+        for (BulkRequestPreprocessor preprocessor : bulkRequestPreprocessors) {
+            for (DocWriteRequest<?> docWriteRequest : bulkRequest.requests) {
+                IndexRequest indexRequest = getIndexWriteRequest(docWriteRequest);
+                if (indexRequest != null) {
+                    needsProcessing |= preprocessor.needsProcessing(docWriteRequest, indexRequest, metadata);
+                }
+
+                if (docWriteRequest instanceof IndexRequest ir) {
+                    ir.checkAutoIdWithOpTypeCreateSupportedByVersion(minNodeVersion);
+                    if (ir.getAutoGeneratedTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
+                        throw new IllegalArgumentException("autoGeneratedTimestamp should not be set externally");
+                    }
+                }
+            }
+
+            if (needsProcessing) {
+                // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing
+                // and with requests updated with processing information. This ensures that, on the second time through this method,
+                // this path is never taken.
+                ActionListener.run(listener, l -> {
+                    if (Assertions.ENABLED) {
+                        final boolean allRequestsUnprocessed = bulkRequest.requests()
+                            .stream()
+                            .map(TransportBulkAction::getIndexWriteRequest)
+                            .filter(Objects::nonNull)
+                            .noneMatch(preprocessor::hasBeenProcessed);
+                        assert allRequestsUnprocessed : bulkRequest;
+                    }
+                    if ((preprocessor.shouldExecuteOnIngestNode() == false) || clusterService.localNode().isIngestNode()) {
+                        preprocessBulkRequestWithPreprocessor(preprocessor, task, bulkRequest, executorName, l);
+                    } else {
+                        ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, l);
+                    }
+                });
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private void preprocessBulkRequestWithPreprocessor(
+        BulkRequestPreprocessor preprocessor,
+        Task task,
+        BulkRequest original,
+        String executorName,
+        ActionListener<BulkResponse> listener
+    ) {
+        final long ingestStartTimeInNanos = System.nanoTime();
+        final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
+        preprocessor.processBulkRequest(
+            threadPool.executor(executorName),
+            original.numberOfActions(),
+            () -> bulkRequestModifier,
+            bulkRequestModifier::markItemAsDropped,
+            bulkRequestModifier::markItemAsFailed,
+            (originalThread, exception) -> {
+                if (exception != null) {
+                    logger.debug("failed to execute pipeline for a bulk request", exception);
+                    listener.onFailure(exception);
+                } else {
+                    long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
+                    BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
+                    ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(
+                        ingestTookInMillis,
+                        listener
+                    );
+                    if (bulkRequest.requests().isEmpty()) {
+                        // at this stage, the transport bulk action can't deal with a bulk request with no requests,
+                        // so we stop and send an empty response back to the client.
+                        // (this will happen if pre-processing all items in the bulk failed)
+                        actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
+                    } else {
+                        ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
+                            @Override
+                            protected void doRun() {
+                                doInternalExecute(task, bulkRequest, executorName, actionListener);
+                            }
+
+                            @Override
+                            public boolean isForceExecution() {
+                                // If we fork back to a write thread we should **not** fail just because the thread pool
+                                // queue is full. (Otherwise the work done during ingest will be lost.)
+                                // It is okay to force execution here. Throttling of write requests happens prior to
+                                // ingest when a node receives a bulk request.
+                                return true;
+                            }
+                        };
+                        // If a processor went async and returned a response on a different thread, then
+                        // before we continue the bulk request we should fork back onto a write thread:
+                        if (originalThread == Thread.currentThread()) {
+                            runnable.run();
+                        } else {
+                            threadPool.executor(executorName).execute(runnable);
+                        }
+                    }
+                }
+            },
+            executorName
+        );
+    }
+
     static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {
 
         final BulkRequest bulkRequest;
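The subtle part of the moved code is the thread hand-off at the end of preprocessBulkRequestWithPreprocessor: continue inline when the ingest callback fired on the original thread, otherwise fork back onto a write thread. Below is a minimal standalone sketch of that pattern, assuming plain JDK executors; the class and thread names (ForkBackSketch, "write-0", "processor-0") are invented for illustration, and plain executors cannot show the isForceExecution() part, which in Elasticsearch bypasses rejection when the write pool's queue is full.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ForkBackSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService writeExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "write-0"));
        ExecutorService processorThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "processor-0"));
        CountDownLatch done = new CountDownLatch(1);

        Thread originalThread = Thread.currentThread();
        Runnable continuation = () -> {
            // In TransportBulkAction this is doInternalExecute(...) continuing the bulk.
            System.out.println("continuing bulk on " + Thread.currentThread().getName());
            done.countDown();
        };

        // Simulate an ingest processor whose completion callback runs on a different thread:
        processorThread.execute(() -> {
            if (originalThread == Thread.currentThread()) {
                continuation.run();                   // completed synchronously: keep going inline
            } else {
                writeExecutor.execute(continuation);  // went async: fork back to a write thread
            }
        });

        done.await(1, TimeUnit.SECONDS);
        processorThread.shutdown();
        writeExecutor.shutdown();
    }
}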
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
index 4d98d4b7d04fd..aaa92c7b7d782 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -106,13 +106,12 @@ public class IngestService extends AbstractBulkRequestPreprocessor implements Cl
     private final ClusterService clusterService;
     private final ScriptService scriptService;
     private final Map<String, Processor.Factory> processorFactories;
-    protected final Client client;
     // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there.
     // We know of all the processor factories when a node with all its plugin have been initialized. Also some
     // processor factories rely on other node services. Custom metadata is statically registered when classes
     // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
     private volatile Map<String, PipelineHolder> pipelines = Map.of();
-    protected final ThreadPool threadPool;
+    private final ThreadPool threadPool;
     private final IngestMetric totalMetrics = new IngestMetric();
     private final List<Consumer<ClusterState>> ingestClusterStateListeners = new CopyOnWriteArrayList<>();
     private volatile ClusterState state;
@@ -187,7 +186,6 @@ public IngestService(
         super(documentParsingObserverSupplier);
         this.clusterService = clusterService;
         this.scriptService = scriptService;
-        this.client = client;
         this.processorFactories = processorFactories(
             ingestPlugins,
             new Processor.Parameters(
@@ -601,7 +599,7 @@ public void onFailure(Exception e) {
 
         IngestDocument ingestDocument = newIngestDocument(indexRequest);
 
-        executePipelinesOnActionRequest(pipelines, indexRequest, ingestDocument, documentListener);
+        executePipelines(pipelines, indexRequest, ingestDocument, documentListener);
         indexRequest.setPipelinesHaveRun();
 
         assert indexRequest.index() != null;
@@ -721,7 +719,7 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, String pipelin
         ExceptionsHelper.rethrowAndSuppress(exceptions);
     }
 
-    public void processBulkRequest(
+    public void executeBulkRequest(
         final int numberOfActionRequests,
         final Iterable<DocWriteRequest<?>> actionRequests,
         final IntConsumer onDropped,
@@ -755,7 +753,7 @@ protected void doRun() {
         });
     }
 
-    private void executePipelinesOnActionRequest(
+    private void executePipelines(
         DocWriteRequest<?> actionRequest,
         final int slot,
         final Releasable ref,
@@ -794,7 +792,7 @@ public void onFailure(Exception e) {
 
         IngestDocument ingestDocument = newIngestDocument(indexRequest, documentParsingObserver);
 
-        executePipelinesOnActionRequest(pipelines, indexRequest, ingestDocument, documentListener);
+        executePipelines(pipelines, indexRequest, ingestDocument, documentListener);
         indexRequest.setPipelinesHaveRun();
 
         assert actionRequest.index() != null;
@@ -874,7 +872,7 @@ public PipelineSlot next() {
         }
     }
 
-    private void executePipelinesOnActionRequest(
+    private void executePipelines(
         final PipelineIterator pipelines,
         final IndexRequest indexRequest,
         final IngestDocument ingestDocument,
@@ -994,7 +992,7 @@ private void executePipelinesOnActionRequest(
             }
 
             if (newPipelines.hasNext()) {
-                executePipelinesOnActionRequest(newPipelines, indexRequest, ingestDocument, listener);
+                executePipelines(newPipelines, indexRequest, ingestDocument, listener);
             } else {
                 // update the index request's source and (potentially) cache the timestamp for TSDB
                 updateIndexRequestSource(indexRequest, ingestDocument);
@@ -1082,7 +1080,7 @@ static String getProcessorName(Processor processor) {
     /**
     * Builds a new ingest document from the passed-in index request.
     */
-    protected static IngestDocument newIngestDocument(final IndexRequest request, DocumentParsingObserver documentParsingObserver) {
+    private static IngestDocument newIngestDocument(final IndexRequest request, DocumentParsingObserver documentParsingObserver) {
        return new IngestDocument(
            request.index(),
            request.id(),
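The renamed executePipelines is the recursive driver visible in the hunk at line 992: run one pipeline, and from its async callback either recurse into the next pipeline or finish by writing the mutated document back into the request. A toy sketch of that control flow follows, with all types invented (Pipeline, PipelineChainSketch, a Map standing in for IngestDocument); the real method also handles failures, dropped documents, and pipeline re-resolution when a processor changes the target index.

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class PipelineChainSketch {

    interface Pipeline {
        // Completes asynchronously with (document, failure); exactly one is non-null.
        void execute(Map<String, Object> doc, BiConsumer<Map<String, Object>, Exception> handler);
    }

    static void executePipelines(Iterator<Pipeline> pipelines, Map<String, Object> doc,
                                 BiConsumer<Map<String, Object>, Exception> listener) {
        if (pipelines.hasNext() == false) {
            listener.accept(doc, null); // all pipelines ran: hand the final document back
            return;
        }
        pipelines.next().execute(doc, (result, e) -> {
            if (e != null) {
                listener.accept(null, e); // a processor failed: short-circuit the chain
            } else {
                executePipelines(pipelines, result, listener); // recurse into the next pipeline
            }
        });
    }

    public static void main(String[] args) {
        Pipeline addField = (doc, handler) -> { doc.put("a", 1); handler.accept(doc, null); };
        Pipeline addOther = (doc, handler) -> { doc.put("b", 2); handler.accept(doc, null); };
        executePipelines(List.of(addField, addOther).iterator(), new HashMap<>(),
            (doc, e) -> System.out.println(e == null ? "final doc: " + doc : "failed: " + e));
    }
}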
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
index 957da1bfd1d37..5649655480660 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
@@ -45,7 +45,6 @@
 import java.util.function.Consumer;
 import java.util.function.Function;
 
-import static java.util.Collections.emptyList;
 import static java.util.Collections.emptySet;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
index 15cd353712a33..33bc30bf11ed1 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
@@ -283,7 +283,7 @@ public void testIngestLocal() throws Exception {
         assertFalse(action.isExecuted); // haven't executed yet
         assertFalse(responseCalled.get());
         assertFalse(failureCalled.get());
-        verify(ingestService).processBulkRequest(
+        verify(ingestService).executeBulkRequest(
             eq(bulkRequest.numberOfActions()),
             bulkDocsItr.capture(),
             any(),
@@ -325,7 +325,7 @@ public void testSingleItemBulkActionIngestLocal() throws Exception {
         assertFalse(action.isExecuted); // haven't executed yet
         assertFalse(responseCalled.get());
         assertFalse(failureCalled.get());
-        verify(ingestService).processBulkRequest(
+        verify(ingestService).executeBulkRequest(
             eq(1),
             bulkDocsItr.capture(),
             any(),
@@ -371,7 +371,7 @@ public void testIngestSystemLocal() throws Exception {
         assertFalse(action.isExecuted); // haven't executed yet
         assertFalse(responseCalled.get());
         assertFalse(failureCalled.get());
-        verify(ingestService).processBulkRequest(
+        verify(ingestService).executeBulkRequest(
             eq(bulkRequest.numberOfActions()),
             bulkDocsItr.capture(),
             any(),
@@ -408,7 +408,7 @@ public void testIngestForward() throws Exception {
         ActionTestUtils.execute(action, null, bulkRequest, listener);
 
         // should not have executed ingest locally
-        verify(ingestService, never()).processBulkRequest(anyInt(), any(), any(), any(), any(), any());
+        verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any());
         // but instead should have sent to a remote node with the transport service
         ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
         verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
@@ -448,7 +448,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception {
         ActionTestUtils.execute(singleItemBulkWriteAction, null, indexRequest, listener);
 
         // should not have executed ingest locally
-        verify(ingestService, never()).processBulkRequest(anyInt(), any(), any(), any(), any(), any());
+        verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any());
         // but instead should have sent to a remote node with the transport service
         ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
         verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
@@ -528,7 +528,7 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa
         assertFalse(action.isExecuted); // haven't executed yet
         assertFalse(responseCalled.get());
         assertFalse(failureCalled.get());
-        verify(ingestService).processBulkRequest(
+        verify(ingestService).executeBulkRequest(
             eq(bulkRequest.numberOfActions()),
             bulkDocsItr.capture(),
             any(),
@@ -576,7 +576,7 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception {
         assertFalse(action.indexCreated); // no index yet
         assertFalse(responseCalled.get());
         assertFalse(failureCalled.get());
-        verify(ingestService).processBulkRequest(
+        verify(ingestService).executeBulkRequest(
             eq(1),
             bulkDocsItr.capture(),
             any(),
@@ -670,7 +670,7 @@ public void testFindDefaultPipelineFromTemplateMatch() {
         );
 
         assertEquals("pipeline2", indexRequest.getPipeline());
-        verify(ingestService).processBulkRequest(
+        verify(ingestService).executeBulkRequest(
             eq(1),
             bulkDocsItr.capture(),
             any(),
@@ -714,7 +714,7 @@ public void testFindDefaultPipelineFromV2TemplateMatch() {
         );
 
         assertEquals("pipeline2", indexRequest.getPipeline());
-        verify(ingestService).processBulkRequest(
+        verify(ingestService).executeBulkRequest(
             eq(1),
             bulkDocsItr.capture(),
             any(),
@@ -741,7 +741,7 @@ public void testIngestCallbackExceptionHandled() throws Exception {
         assertFalse(action.isExecuted); // haven't executed yet
         assertFalse(responseCalled.get());
         assertFalse(failureCalled.get());
-        verify(ingestService).processBulkRequest(
+        verify(ingestService).executeBulkRequest(
             eq(bulkRequest.numberOfActions()),
             bulkDocsItr.capture(),
             any(),
@@ -778,7 +778,7 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
         assertFalse(action.isExecuted); // haven't executed yet
         assertFalse(responseCalled.get());
         assertFalse(failureCalled.get());
-        verify(ingestService).processBulkRequest(
+        verify(ingestService).executeBulkRequest(
             eq(1),
             bulkDocsItr.capture(),
             any(),
diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
index 3130854001b19..3b114cf0a618e 100644
--- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
@@ -207,7 +207,7 @@ public void testExecuteIndexPipelineDoesNotExist() {
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
 
-        ingestService.processBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
 
         assertTrue(failure.get());
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
@@ -1106,7 +1106,7 @@ public String getType() {
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
 
-        ingestService.processBulkRequest(
+        ingestService.executeBulkRequest(
             bulkRequest.numberOfActions(),
             bulkRequest.requests(),
             indexReq -> {},
@@ -1149,7 +1149,7 @@ public void testExecuteBulkPipelineDoesNotExist() {
         BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
-        ingestService.processBulkRequest(
+        ingestService.executeBulkRequest(
             bulkRequest.numberOfActions(),
             bulkRequest.requests(),
             indexReq -> {},
@@ -1213,7 +1213,7 @@ public void close() {
         BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
-        ingestService.processBulkRequest(
+        ingestService.executeBulkRequest(
             bulkRequest.numberOfActions(),
             bulkRequest.requests(),
             indexReq -> {},
@@ -1246,7 +1246,7 @@ public void testExecuteSuccess() {
         final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
-        ingestService.processBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
         verify(failureHandler, never()).accept(any(), any());
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
     }
@@ -1279,7 +1279,7 @@ public void testDynamicTemplates() throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
         final BiConsumer<Integer, Exception> failureHandler = (v, e) -> { throw new AssertionError("must never fail", e); };
         final BiConsumer<Thread, Exception> completionHandler = (t, e) -> latch.countDown();
-        ingestService.processBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
         latch.await();
         assertThat(indexRequest.getDynamicTemplates(), equalTo(Map.of("foo", "bar", "foo.bar", "baz")));
     }
@@ -1300,7 +1300,7 @@ public void testExecuteEmptyPipeline() throws Exception {
         final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
-        ingestService.processBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
         verify(failureHandler, never()).accept(any(), any());
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
     }
@@ -1354,7 +1354,7 @@ public void testExecutePropagateAllMetadataUpdates() throws Exception {
         final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
-        ingestService.processBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
         verify(processor).execute(any(), any());
         verify(failureHandler, never()).accept(any(), any());
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
@@ -1403,7 +1403,7 @@ public void testExecuteFailure() throws Exception {
         final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
-        ingestService.processBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
         verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any());
         verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class));
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
@@ -1452,7 +1452,7 @@ public void testExecuteSuccessWithOnFailure() throws Exception {
         final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
-        ingestService.processBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
         verify(failureHandler, never()).accept(eq(0), any(IngestProcessorException.class));
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
     }
@@ -1495,7 +1495,7 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception {
         final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
-        ingestService.processBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
         verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any());
         verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class));
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
@@ -1549,7 +1549,7 @@ public void testBulkRequestExecutionWithFailures() throws Exception {
         BiConsumer<Integer, Exception> requestItemErrorHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
-        ingestService.processBulkRequest(
+        ingestService.executeBulkRequest(
             numRequest,
             bulkRequest.requests(),
             indexReq -> {},
@@ -1607,7 +1607,7 @@ public void testBulkRequestExecution() throws Exception {
         BiConsumer<Integer, Exception> requestItemErrorHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
-        ingestService.processBulkRequest(
+        ingestService.executeBulkRequest(
             numRequest,
             bulkRequest.requests(),
             indexReq -> {},
@@ -1720,7 +1720,7 @@ public String execute() {
         final IndexRequest indexRequest = new IndexRequest("_index");
         indexRequest.setPipeline("_id1").setFinalPipeline("_id2");
         indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
-        ingestService.processBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE);
+        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE);
 
         {
             final IngestStats ingestStats = ingestService.stats();
@@ -1791,7 +1791,7 @@ public void testStats() throws Exception {
         final IndexRequest indexRequest = new IndexRequest("_index");
         indexRequest.setPipeline("_id1").setFinalPipeline("_none");
         indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
-        ingestService.processBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
 
         final IngestStats afterFirstRequestStats = ingestService.stats();
         assertThat(afterFirstRequestStats.pipelineStats().size(), equalTo(2));
@@ -1808,7 +1808,7 @@ public void testStats() throws Exception {
         assertProcessorStats(0, afterFirstRequestStats, "_id2", 0, 0, 0);
 
         indexRequest.setPipeline("_id2");
-        ingestService.processBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
         final IngestStats afterSecondRequestStats = ingestService.stats();
         assertThat(afterSecondRequestStats.pipelineStats().size(), equalTo(2));
         // total
@@ -1830,7 +1830,7 @@ public void testStats() throws Exception {
         clusterState = executePut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         indexRequest.setPipeline("_id1");
-        ingestService.processBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
         final IngestStats afterThirdRequestStats = ingestService.stats();
         assertThat(afterThirdRequestStats.pipelineStats().size(), equalTo(2));
         // total
@@ -1853,7 +1853,7 @@ public void testStats() throws Exception {
         clusterState = executePut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         indexRequest.setPipeline("_id1");
-        ingestService.processBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
+        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE);
         final IngestStats afterForthRequestStats = ingestService.stats();
         assertThat(afterForthRequestStats.pipelineStats().size(), equalTo(2));
         // total
@@ -1941,7 +1941,7 @@ public String getDescription() {
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final IntConsumer dropHandler = mock(IntConsumer.class);
-        ingestService.processBulkRequest(
+        ingestService.executeBulkRequest(
             bulkRequest.numberOfActions(),
             bulkRequest.requests(),
             dropHandler,
@@ -2029,7 +2029,7 @@ public void testCBORParsing() throws Exception {
                 .setPipeline("_id")
                 .setFinalPipeline("_none");
 
-            ingestService.processBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE);
+            ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE);
         }
 
         assertThat(reference.get(), is(instanceOf(byte[].class)));
@@ -2100,7 +2100,7 @@ public void testSetsRawTimestamp() {
         bulkRequest.add(indexRequest6);
         bulkRequest.add(indexRequest7);
         bulkRequest.add(indexRequest8);
-        ingestService.processBulkRequest(8, bulkRequest.requests(), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE);
+        ingestService.executeBulkRequest(8, bulkRequest.requests(), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE);
 
         assertThat(indexRequest1.getRawTimestamp(), nullValue());
         assertThat(indexRequest2.getRawTimestamp(), nullValue());
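All of the test edits above are mechanical renames, but they also document the callback contract of the renamed entry point: an IntConsumer for dropped items, a BiConsumer<Integer, Exception> for per-item failures, and a BiConsumer<Thread, Exception> completion handler that is verified against the calling thread. A self-contained Mockito sketch of that verification pattern follows; the BulkExecutor type and its synchronous behavior are invented for illustration and are not the IngestService API.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.function.BiConsumer;

public class HandlerVerifySketch {
    interface BulkExecutor {
        void execute(BiConsumer<Integer, Exception> onFailure, BiConsumer<Thread, Exception> onCompletion);
    }

    public static void main(String[] args) {
        @SuppressWarnings("unchecked")
        BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
        @SuppressWarnings("unchecked")
        BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);

        // A trivial executor that completes synchronously on the caller's thread,
        // mirroring the happy path the tests assert against.
        BulkExecutor executor = (onFailure, onCompletion) -> onCompletion.accept(Thread.currentThread(), null);
        executor.execute(failureHandler, completionHandler);

        // Same shape as the assertions in the diff: completion fired once, on this thread, with no error.
        verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
    }
}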