From 688fc7e125ccaca06f050387979679012e32b350 Mon Sep 17 00:00:00 2001 From: carlosdelest Date: Mon, 23 Oct 2023 12:19:49 +0200 Subject: [PATCH] First working version of inference resolving --- .../org/elasticsearch/TransportVersions.java | 1 + .../action/bulk/TransportBulkAction.java | 89 +++++++++++- .../action/index/IndexRequest.java | 27 ++++ .../vectors/SparseVectorFieldMapper.java | 3 +- .../elasticsearch/ingest/IngestService.java | 128 ++++++++++++++---- .../bulk/TransportBulkActionIngestTests.java | 22 +-- .../ingest/IngestServiceTests.java | 42 +++--- 7 files changed, 248 insertions(+), 64 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index eccdd791c8244..52e9cf836bceb 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -143,6 +143,7 @@ static TransportVersion def(int id) { public static final TransportVersion BUILD_QUALIFIER_SEPARATED = def(8_518_00_0); public static final TransportVersion PIPELINES_IN_BULK_RESPONSE_ADDED = def(8_519_00_0); public static final TransportVersion PLUGIN_DESCRIPTOR_STRING_VERSION = def(8_520_00_0); + public static final TransportVersion SEMANTIC_TEXT_FIELD_ADDED = def(8_521_00_0); /* * STOP! READ THIS FIRST! No, really, * ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _ diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 6188d3612e5af..09b4404ae01f3 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -271,7 +271,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec final AtomicArray responses = new AtomicArray<>(bulkRequest.requests.size()); boolean hasIndexRequestsWithPipelines = false; - boolean hasInferenceFields = false; + boolean needsFieldInference = false; final Metadata metadata = clusterService.state().getMetadata(); final Version minNodeVersion = clusterService.state().getNodes().getMinNodeVersion(); for (DocWriteRequest actionRequest : bulkRequest.requests) { @@ -279,7 +279,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec if (indexRequest != null) { IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata); hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest); - hasInferenceFields |= ingestService.hasInferenceFields(indexRequest); + needsFieldInference |= ingestService.needsFieldInference(indexRequest); } if (actionRequest instanceof IndexRequest ir) { @@ -290,7 +290,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec } } - if (hasIndexRequestsWithPipelines || hasInferenceFields) { + if (hasIndexRequestsWithPipelines) { // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but // also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method, // this path is never taken. 
@@ -304,7 +304,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
                     assert arePipelinesResolved : bulkRequest;
                 }
                 if (clusterService.localNode().isIngestNode()) {
-                    processBulkIndexIngestRequest(task, bulkRequest, executorName, l);
+                    processPipelinesBulkIndexIngestRequest(task, bulkRequest, executorName, l);
                 } else {
                     ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, l);
                 }
@@ -312,6 +312,24 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
             return;
         }
 
+        if (needsFieldInference) {
+            // this method (doExecute) will be called again, but with the bulk requests updated with the field inference and also with
+            // isFieldInferenceResolved set to true on each request. This ensures that on the second time through this method,
+            // this path is never taken.
+            ActionListener.run(listener, l -> {
+                if (Assertions.ENABLED) {
+                    final boolean isFieldsInferenceResolved = bulkRequest.requests()
+                        .stream()
+                        .map(TransportBulkAction::getIndexWriteRequest)
+                        .filter(Objects::nonNull)
+                        .allMatch(IndexRequest::isFieldInferenceResolved);
+                    assert isFieldsInferenceResolved == false : bulkRequest;
+                }
+                processFieldsInferenceBulkIndexIngestRequest(task, bulkRequest, executorName, l);
+            });
+            return;
+        }
+
         // Attempt to create all the indices that we're going to need during the bulk before we start.
         // Step 1: collect all the indices in the request
         final Map indices = bulkRequest.requests.stream()
@@ -803,7 +821,7 @@ private long relativeTime() {
         return relativeTimeProvider.getAsLong();
     }
 
-    private void processBulkIndexIngestRequest(
+    private void processPipelinesBulkIndexIngestRequest(
         Task task,
         BulkRequest original,
         String executorName,
@@ -811,7 +829,7 @@ private void processBulkIndexIngestRequest(
     ) {
         final long ingestStartTimeInNanos = System.nanoTime();
         final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
-        ingestService.executeBulkRequest(
+        ingestService.executePipelinesBulkRequest(
             original.numberOfActions(),
             () -> bulkRequestModifier,
             bulkRequestModifier::markItemAsDropped,
@@ -862,6 +880,65 @@ public boolean isForceExecution() {
         );
     }
 
+    private void processFieldsInferenceBulkIndexIngestRequest(
+        Task task,
+        BulkRequest original,
+        String executorName,
+        ActionListener<BulkResponse> listener
+    ) {
+        final long ingestStartTimeInNanos = System.nanoTime();
+        final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
+        ingestService.executeFieldInferenceBulkRequest(
+            original.numberOfActions(),
+            () -> bulkRequestModifier,
+            bulkRequestModifier::markItemAsDropped,
+            bulkRequestModifier::markItemAsFailed,
+            (originalThread, exception) -> {
+                if (exception != null) {
+                    logger.debug("failed to execute inference for a bulk request", exception);
+                    listener.onFailure(exception);
+                } else {
+                    BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
+                    long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
+                    ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(
+                        ingestTookInMillis,
+                        listener
+                    );
+                    if (bulkRequest.requests().isEmpty()) {
+                        // at this stage, the transport bulk action can't deal with a bulk request with no requests,
+                        // so we stop and send an empty response back to the client.
+                        // (this will happen if pre-processing all items in the bulk failed)
+                        actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
+                    } else {
+                        ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
+                            @Override
+                            protected void doRun() {
+                                doInternalExecute(task, bulkRequest, executorName, actionListener);
+                            }
+
+                            @Override
+                            public boolean isForceExecution() {
+                                // If we fork back to a write thread we should **not** fail because the tp queue is full.
+                                // (Otherwise the work done during ingest will be lost)
+                                // It is okay to force execution here. Throttling of write requests happens prior to
+                                // ingest when a node receives a bulk request.
+                                return true;
+                            }
+                        };
+                        // If a processor went async and returned a response on a different thread then
+                        // before we continue the bulk request we should fork back on a write thread:
+                        if (originalThread == Thread.currentThread()) {
+                            runnable.run();
+                        } else {
+                            threadPool.executor(executorName).execute(runnable);
+                        }
+                    }
+                }
+            },
+            executorName
+        );
+    }
+
     static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {
 
         final BulkRequest bulkRequest;
diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
index 2f202dd21ad7c..2a671835d155e 100644
--- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
@@ -49,6 +49,7 @@
 import java.util.Map;
 import java.util.Objects;
 
+import static org.elasticsearch.TransportVersions.SEMANTIC_TEXT_FIELD_ADDED;
 import static org.elasticsearch.action.ValidateActions.addValidationError;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
@@ -105,6 +106,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
 
     private boolean isPipelineResolved;
 
+    private boolean isFieldInferenceResolved;
+
     private boolean requireAlias;
     /**
      * This indicates whether the response to this request ought to list the ingest pipelines that were executed on the document
@@ -189,6 +192,7 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
                 : new ArrayList<>(possiblyImmutableExecutedPipelines);
             }
         }
+        isFieldInferenceResolved = in.getTransportVersion().before(SEMANTIC_TEXT_FIELD_ADDED) || in.readBoolean();
     }
 
     public IndexRequest() {
@@ -375,6 +379,26 @@ public boolean isPipelineResolved() {
         return this.isPipelineResolved;
     }
 
+    /**
+     * Sets if field inference for this request has been resolved by the coordinating node.
+     *
+     * @param isFieldInferenceResolved true if the field inference has been resolved
+     * @return the request
+     */
+    public IndexRequest isFieldInferenceResolved(final boolean isFieldInferenceResolved) {
+        this.isFieldInferenceResolved = isFieldInferenceResolved;
+        return this;
+    }
+
+    /**
+     * Returns whether the field inference for this request has been resolved by the coordinating node.
+     *
+     * @return true if the field inference has been resolved
+     */
+    public boolean isFieldInferenceResolved() {
+        return this.isFieldInferenceResolved;
+    }
+
     /**
      * The source of the document to index, recopied to a new array if it is unsafe.
*/ @@ -755,6 +779,9 @@ private void writeBody(StreamOutput out) throws IOException { out.writeOptionalCollection(executedPipelines, StreamOutput::writeString); } } + if (out.getTransportVersion().onOrAfter(SEMANTIC_TEXT_FIELD_ADDED)) { + out.writeBoolean(isFieldInferenceResolved); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/index/mapper/vectors/SparseVectorFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/vectors/SparseVectorFieldMapper.java index c64c860e85f6f..f10055fd4669b 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/vectors/SparseVectorFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/vectors/SparseVectorFieldMapper.java @@ -23,7 +23,6 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperBuilderContext; import org.elasticsearch.index.mapper.SourceValueFetcher; -import org.elasticsearch.index.mapper.TextFieldMapper; import org.elasticsearch.index.mapper.TextSearchInfo; import org.elasticsearch.index.mapper.ValueFetcher; import org.elasticsearch.index.query.SearchExecutionContext; @@ -82,7 +81,7 @@ public SparseVectorFieldMapper build(MapperBuilderContext context) { } return new Builder(n); - }, notInMultiFieldsUlessParentOfType(CONTENT_TYPE, TextFieldMapper.CONTENT_TYPE)); // TODO Change for semantic_text field type + }, notInMultiFields(CONTENT_TYPE)); public static final class SparseVectorFieldType extends MappedFieldType { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index e376e6f8a8a24..4f6d258745812 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -156,9 +156,9 @@ public static MatcherWatchdog createGrokThreadWatchdog(Environment env, ThreadPo : batchExecutionContext.initialState().copyAndUpdateMetadata(b -> b.putCustom(IngestMetadata.TYPE, finalIngestMetadata)); }; - public boolean hasInferenceFields(IndexRequest indexRequest) { - return indexRequest.sourceAsMap().keySet().stream() - .anyMatch(fieldName -> fieldNeedsInference(indexRequest.index(), fieldName)); + public boolean needsFieldInference(IndexRequest indexRequest) { + return (indexRequest.isFieldInferenceResolved() == false) + && indexRequest.sourceAsMap().keySet().stream().anyMatch(fieldName -> fieldNeedsInference(indexRequest.index(), fieldName)); } /** @@ -661,7 +661,7 @@ void validatePipeline(Map ingestInfos, String pipelin ExceptionsHelper.rethrowAndSuppress(exceptions); } - public void executeBulkRequest( + public void executePipelinesBulkRequest( final int numberOfActionRequests, final Iterable> actionRequests, final IntConsumer onDropped, @@ -685,28 +685,94 @@ protected void doRun() { int i = 0; for (DocWriteRequest actionRequest : actionRequests) { IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest); - if (indexRequest != null) { - PipelineIterator pipelines = getAndResetPipelines(indexRequest); - if (pipelines.hasNext()) { - executePipelinesOnActionRequest( - actionRequest, - i, - refs.acquire(), - indexRequest, - pipelines, - onDropped, - onFailure - ); + if (indexRequest == null) { + i++; + continue; + } + + PipelineIterator pipelines = getAndResetPipelines(indexRequest); + if (pipelines.hasNext() == false) { + i++; + continue; + } + + // start the stopwatch and acquire a ref to indicate that we're working on this document + final long 
startTimeInNanos = System.nanoTime(); + totalMetrics.preIngest(); + final int slot = i; + final Releasable ref = refs.acquire(); + // the document listener gives us three-way logic: a document can fail processing (1), or it can + // be successfully processed. a successfully processed document can be kept (2) or dropped (3). + final ActionListener documentListener = ActionListener.runAfter(new ActionListener<>() { + @Override + public void onResponse(Boolean kept) { + assert kept != null; + if (kept == false) { + onDropped.accept(slot); + } + } + + @Override + public void onFailure(Exception e) { + totalMetrics.ingestFailed(); + onFailure.accept(slot, e); } + }, () -> { + // regardless of success or failure, we always stop the ingest "stopwatch" and release the ref to indicate + // that we're finished with this document + final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos; + totalMetrics.postIngest(ingestTimeInNanos); + ref.close(); + }); + DocumentParsingObserver documentParsingObserver = documentParsingObserverSupplier.get(); + + IngestDocument ingestDocument = newIngestDocument(indexRequest, documentParsingObserver); + + executePipelinesOnActionRequest(pipelines, indexRequest, ingestDocument, documentListener); + indexRequest.setPipelinesHaveRun(); + assert actionRequest.index() != null; + documentParsingObserver.setIndexName(actionRequest.index()); + documentParsingObserver.close(); + + i++; + } + } + } + }); + } + + public void executeFieldInferenceBulkRequest( + final int numberOfActionRequests, + final Iterable> actionRequests, + final IntConsumer onDropped, + final BiConsumer onFailure, + final BiConsumer onCompletion, + final String executorName + ) { + assert numberOfActionRequests > 0 : "numberOfActionRequests must be greater than 0 but was [" + numberOfActionRequests + "]"; + + threadPool.executor(executorName).execute(new AbstractRunnable() { + + @Override + public void onFailure(Exception e) { + onCompletion.accept(null, e); + } + + @Override + protected void doRun() { + final Thread originalThread = Thread.currentThread(); + try (var refs = new RefCountingRunnable(() -> onCompletion.accept(originalThread, null))) { + int i = 0; + for (DocWriteRequest actionRequest : actionRequests) { + IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest); + if (indexRequest != null) { String index = indexRequest.index(); Map sourceMap = indexRequest.sourceAsMap(); final int position = i; - sourceMap.entrySet().stream() - .filter(entry -> fieldNeedsInference(index, entry.getKey())) - .forEach(entry -> { - runInferenceForField(indexRequest, entry.getKey(), entry.getValue(), refs, position, onFailure); - }); + sourceMap.entrySet().stream().filter(entry -> fieldNeedsInference(index, entry.getKey())).forEach(entry -> { + runInferenceForField(indexRequest, entry.getKey(), entry.getValue(), refs, position, onFailure); + }); } i++; } @@ -720,7 +786,14 @@ private boolean fieldNeedsInference(String index, String fieldName) { return fieldName.startsWith("infer_"); } - private void runInferenceForField(IndexRequest indexRequest, String fieldName, Object fieldValue, RefCountingRunnable ref, int position, BiConsumer onFailure) { + private void runInferenceForField( + IndexRequest indexRequest, + String fieldName, + Object fieldValue, + RefCountingRunnable ref, + int position, + BiConsumer onFailure + ) { var ingestDocument = newIngestDocument(indexRequest, documentParsingObserverSupplier.get()); if (ingestDocument.hasField(fieldName) == false) { return; @@ 
-729,12 +802,19 @@ private void runInferenceForField(IndexRequest indexRequest, String fieldName, O ref.acquire(); // TODO Hardcoding model ID and task type - InferenceAction.Request inferenceRequest = new InferenceAction.Request(TaskType.SPARSE_EMBEDDING, "my-elser-model", ingestDocument.getFieldValue(fieldName, String.class), Map.of()); + InferenceAction.Request inferenceRequest = new InferenceAction.Request( + TaskType.SPARSE_EMBEDDING, + "my-elser-model", + ingestDocument.getFieldValue(fieldName, String.class), + Map.of() + ); client.execute(InferenceAction.INSTANCE, inferenceRequest, new ActionListener() { @Override public void onResponse(InferenceAction.Response response) { - ingestDocument.setFieldValue(fieldName + ".inference", response.getResult().asMap(fieldName).get(fieldName)); + ingestDocument.setFieldValue(fieldName + "_inference", response.getResult().asMap(fieldName).get(fieldName)); + updateIndexRequestSource(indexRequest, ingestDocument); + indexRequest.isFieldInferenceResolved(true); ref.close(); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 0168eb0488a5b..c39c07510bdef 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -279,7 +279,7 @@ public void testIngestLocal() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest( + verify(ingestService).executePipelinesBulkRequest( eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), any(), @@ -321,7 +321,7 @@ public void testSingleItemBulkActionIngestLocal() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest( + verify(ingestService).executePipelinesBulkRequest( eq(1), bulkDocsItr.capture(), any(), @@ -367,7 +367,7 @@ public void testIngestSystemLocal() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest( + verify(ingestService).executePipelinesBulkRequest( eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), any(), @@ -404,7 +404,7 @@ public void testIngestForward() throws Exception { ActionTestUtils.execute(action, null, bulkRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any()); + verify(ingestService, never()).executePipelinesBulkRequest(anyInt(), any(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -444,7 +444,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception { ActionTestUtils.execute(singleItemBulkWriteAction, null, indexRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any()); + verify(ingestService, never()).executePipelinesBulkRequest(anyInt(), any(), any(), 
any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -524,7 +524,7 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest( + verify(ingestService).executePipelinesBulkRequest( eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), any(), @@ -572,7 +572,7 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception { assertFalse(action.indexCreated); // no index yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest( + verify(ingestService).executePipelinesBulkRequest( eq(1), bulkDocsItr.capture(), any(), @@ -666,7 +666,7 @@ public void testFindDefaultPipelineFromTemplateMatch() { ); assertEquals("pipeline2", indexRequest.getPipeline()); - verify(ingestService).executeBulkRequest( + verify(ingestService).executePipelinesBulkRequest( eq(1), bulkDocsItr.capture(), any(), @@ -710,7 +710,7 @@ public void testFindDefaultPipelineFromV2TemplateMatch() { ); assertEquals("pipeline2", indexRequest.getPipeline()); - verify(ingestService).executeBulkRequest( + verify(ingestService).executePipelinesBulkRequest( eq(1), bulkDocsItr.capture(), any(), @@ -737,7 +737,7 @@ public void testIngestCallbackExceptionHandled() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest( + verify(ingestService).executePipelinesBulkRequest( eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), any(), @@ -774,7 +774,7 @@ private void validateDefaultPipeline(IndexRequest indexRequest) { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest( + verify(ingestService).executePipelinesBulkRequest( eq(1), bulkDocsItr.capture(), any(), diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 3b114cf0a618e..da7284e046a13 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -207,7 +207,7 @@ public void testExecuteIndexPipelineDoesNotExist() { @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executePipelinesBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); assertTrue(failure.get()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -1106,7 +1106,7 @@ public String getType() { @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest( + ingestService.executePipelinesBulkRequest( bulkRequest.numberOfActions(), bulkRequest.requests(), indexReq -> {}, @@ -1149,7 +1149,7 @@ public void testExecuteBulkPipelineDoesNotExist() { BiConsumer failureHandler = 
mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest( + ingestService.executePipelinesBulkRequest( bulkRequest.numberOfActions(), bulkRequest.requests(), indexReq -> {}, @@ -1213,7 +1213,7 @@ public void close() { BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest( + ingestService.executePipelinesBulkRequest( bulkRequest.numberOfActions(), bulkRequest.requests(), indexReq -> {}, @@ -1246,7 +1246,7 @@ public void testExecuteSuccess() { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executePipelinesBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } @@ -1279,7 +1279,7 @@ public void testDynamicTemplates() throws Exception { CountDownLatch latch = new CountDownLatch(1); final BiConsumer failureHandler = (v, e) -> { throw new AssertionError("must never fail", e); }; final BiConsumer completionHandler = (t, e) -> latch.countDown(); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executePipelinesBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); latch.await(); assertThat(indexRequest.getDynamicTemplates(), equalTo(Map.of("foo", "bar", "foo.bar", "baz"))); } @@ -1300,7 +1300,7 @@ public void testExecuteEmptyPipeline() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executePipelinesBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } @@ -1354,7 +1354,7 @@ public void testExecutePropagateAllMetadataUpdates() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executePipelinesBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); verify(processor).execute(any(), any()); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -1403,7 +1403,7 @@ public void testExecuteFailure() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + 
ingestService.executePipelinesBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class)); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -1452,7 +1452,7 @@ public void testExecuteSuccessWithOnFailure() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executePipelinesBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); verify(failureHandler, never()).accept(eq(0), any(IngestProcessorException.class)); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } @@ -1495,7 +1495,7 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executePipelinesBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any()); verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class)); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -1549,7 +1549,7 @@ public void testBulkRequestExecutionWithFailures() throws Exception { BiConsumer requestItemErrorHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest( + ingestService.executePipelinesBulkRequest( numRequest, bulkRequest.requests(), indexReq -> {}, @@ -1607,7 +1607,7 @@ public void testBulkRequestExecution() throws Exception { BiConsumer requestItemErrorHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest( + ingestService.executePipelinesBulkRequest( numRequest, bulkRequest.requests(), indexReq -> {}, @@ -1720,7 +1720,7 @@ public String execute() { final IndexRequest indexRequest = new IndexRequest("_index"); indexRequest.setPipeline("_id1").setFinalPipeline("_id2"); indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE); + ingestService.executePipelinesBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE); { final IngestStats ingestStats = ingestService.stats(); @@ -1791,7 +1791,7 @@ public void testStats() throws Exception { final IndexRequest indexRequest = new IndexRequest("_index"); indexRequest.setPipeline("_id1").setFinalPipeline("_none"); indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + 
ingestService.executePipelinesBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); final IngestStats afterFirstRequestStats = ingestService.stats(); assertThat(afterFirstRequestStats.pipelineStats().size(), equalTo(2)); @@ -1808,7 +1808,7 @@ public void testStats() throws Exception { assertProcessorStats(0, afterFirstRequestStats, "_id2", 0, 0, 0); indexRequest.setPipeline("_id2"); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executePipelinesBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); final IngestStats afterSecondRequestStats = ingestService.stats(); assertThat(afterSecondRequestStats.pipelineStats().size(), equalTo(2)); // total @@ -1830,7 +1830,7 @@ public void testStats() throws Exception { clusterState = executePut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executePipelinesBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); final IngestStats afterThirdRequestStats = ingestService.stats(); assertThat(afterThirdRequestStats.pipelineStats().size(), equalTo(2)); // total @@ -1853,7 +1853,7 @@ public void testStats() throws Exception { clusterState = executePut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); + ingestService.executePipelinesBulkRequest(1, List.of(indexRequest), indexReq -> {}, failureHandler, completionHandler, Names.WRITE); final IngestStats afterForthRequestStats = ingestService.stats(); assertThat(afterForthRequestStats.pipelineStats().size(), equalTo(2)); // total @@ -1941,7 +1941,7 @@ public String getDescription() { final BiConsumer completionHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final IntConsumer dropHandler = mock(IntConsumer.class); - ingestService.executeBulkRequest( + ingestService.executePipelinesBulkRequest( bulkRequest.numberOfActions(), bulkRequest.requests(), dropHandler, @@ -2029,7 +2029,7 @@ public void testCBORParsing() throws Exception { .setPipeline("_id") .setFinalPipeline("_none"); - ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE); + ingestService.executePipelinesBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE); } assertThat(reference.get(), is(instanceOf(byte[].class))); @@ -2100,7 +2100,7 @@ public void testSetsRawTimestamp() { bulkRequest.add(indexRequest6); bulkRequest.add(indexRequest7); bulkRequest.add(indexRequest8); - ingestService.executeBulkRequest(8, bulkRequest.requests(), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE); + ingestService.executePipelinesBulkRequest(8, bulkRequest.requests(), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE); assertThat(indexRequest1.getRawTimestamp(), nullValue()); assertThat(indexRequest2.getRawTimestamp(), nullValue());
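
The control flow this patch adds to TransportBulkAction#doInternalExecute is a two-pass scheme: on the first pass the coordinating node finds source fields that need inference, executeFieldInferenceBulkRequest runs the model and stores each result under "<field>_inference", every IndexRequest is marked with isFieldInferenceResolved(true), and doInternalExecute is re-entered; on the second pass the flag keeps the inference branch from being taken again, so the request proceeds down the normal bulk path. Below is a minimal, self-contained sketch of that re-entry guard. It is synchronous for brevity (the patch itself is asynchronous and uses RefCountingRunnable to track in-flight inference calls), and every name in it (Doc, InferenceClient, BulkCoordinator) is hypothetical, not an actual Elasticsearch type.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for IndexRequest: a mutable source map plus the
// isFieldInferenceResolved re-entry guard this patch adds.
final class Doc {
    final Map<String, Object> source;
    boolean fieldInferenceResolved;

    Doc(Map<String, Object> source) {
        this.source = new HashMap<>(source);
    }
}

// Hypothetical stand-in for the inference client; the patch goes through
// client.execute(InferenceAction.INSTANCE, ...) asynchronously instead.
interface InferenceClient {
    Object infer(String text);
}

final class BulkCoordinator {
    private final InferenceClient client;

    BulkCoordinator(InferenceClient client) {
        this.client = client;
    }

    // Mirrors doInternalExecute: if any document still needs field inference,
    // run it, mark every document as resolved, and re-enter this method; the
    // guard flag makes the second pass skip this branch entirely.
    void execute(List<Doc> docs, Runnable proceedWithBulk) {
        boolean needsFieldInference = docs.stream().anyMatch(this::needsFieldInference);
        if (needsFieldInference) {
            for (Doc doc : docs) {
                runInference(doc);
                doc.fieldInferenceResolved = true; // the re-entry guard
            }
            execute(docs, proceedWithBulk); // second time through, the branch above is not taken
            return;
        }
        proceedWithBulk.run(); // the normal bulk indexing path
    }

    // Mirrors IngestService#needsFieldInference: already-resolved requests are skipped.
    private boolean needsFieldInference(Doc doc) {
        return doc.fieldInferenceResolved == false
            && doc.source.keySet().stream().anyMatch(field -> field.startsWith("infer_")); // mock check, as in the patch
    }

    // Mirrors runInferenceForField: the result is stored under "<field>_inference"
    // (the patch renamed this from "<field>.inference").
    private void runInference(Doc doc) {
        for (Map.Entry<String, Object> entry : Map.copyOf(doc.source).entrySet()) {
            if (entry.getKey().startsWith("infer_")) {
                doc.source.put(entry.getKey() + "_inference", client.infer(String.valueOf(entry.getValue())));
            }
        }
    }
}

On the wire, backward compatibility follows the usual transport-version pattern visible in the IndexRequest hunks: the writer emits the new boolean only when out.getTransportVersion() is on or after SEMANTIC_TEXT_FIELD_ADDED, and the reader treats requests from older senders as already resolved (before(SEMANTIC_TEXT_FIELD_ADDED) || in.readBoolean()), so requests from old nodes never re-trigger inference on the coordinating node.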