
Commit 8305723

Make easier diff

carlosdelest committed Oct 24, 2023
1 parent b2e551a commit 8305723

Showing 5 changed files with 145 additions and 148 deletions.
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -355,111 +355,6 @@ protected void doRun() {
}
}

private boolean preprocessBulkRequest(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
final Metadata metadata = clusterService.state().getMetadata();
final Version minNodeVersion = clusterService.state().getNodes().getMinNodeVersion();
boolean needsProcessing = false;
for (BulkRequestPreprocessor preprocessor : bulkRequestPreprocessors) {
for (DocWriteRequest<?> docWriteRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(docWriteRequest);
if (indexRequest != null) {
needsProcessing |= preprocessor.needsProcessing(docWriteRequest, indexRequest, metadata);
}

if (docWriteRequest instanceof IndexRequest ir) {
ir.checkAutoIdWithOpTypeCreateSupportedByVersion(minNodeVersion);
if (ir.getAutoGeneratedTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
throw new IllegalArgumentException("autoGeneratedTimestamp should not be set externally");
}
}
}

if (needsProcessing) {
                // this method (doExecute) will be called again, but with the bulk requests updated by the preprocessing
                // and also marked with processing information. This ensures that, the second time through this method,
                // this path is never taken.
ActionListener.run(listener, l -> {
if (Assertions.ENABLED) {
final boolean allRequestsUnprocessed = bulkRequest.requests()
.stream()
.map(TransportBulkAction::getIndexWriteRequest)
.filter(Objects::nonNull)
.noneMatch(preprocessor::hasBeenProcessed);
assert allRequestsUnprocessed : bulkRequest;
}
if ((preprocessor.shouldExecuteOnIngestNode() == false) || clusterService.localNode().isIngestNode()) {
preprocessBulkRequestWithPreprocessor(preprocessor, task, bulkRequest, executorName, l);
} else {
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, l);
}
});
return true;
}
}
return false;
}

private void preprocessBulkRequestWithPreprocessor(
BulkRequestPreprocessor preprocessor,
Task task,
BulkRequest original,
String executorName,
ActionListener<BulkResponse> listener
) {
final long ingestStartTimeInNanos = System.nanoTime();
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
preprocessor.processBulkRequest(
threadPool.executor(executorName),
original.numberOfActions(),
() -> bulkRequestModifier,
bulkRequestModifier::markItemAsDropped,
bulkRequestModifier::markItemAsFailed,
(originalThread, exception) -> {
if (exception != null) {
logger.debug("failed to execute pipeline for a bulk request", exception);
listener.onFailure(exception);
} else {
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(
ingestTookInMillis,
listener
);
if (bulkRequest.requests().isEmpty()) {
// at this stage, the transport bulk action can't deal with a bulk request with no requests,
// so we stop and send an empty response back to the client.
// (this will happen if pre-processing all items in the bulk failed)
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
} else {
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
@Override
protected void doRun() {
doInternalExecute(task, bulkRequest, executorName, actionListener);
}

@Override
public boolean isForceExecution() {
                                // If we fork back to a write thread we should **not** fail because the tp queue is full.
// (Otherwise the work done during ingest will be lost)
// It is okay to force execution here. Throttling of write requests happens prior to
// ingest when a node receives a bulk request.
return true;
}
};
                            // If a processor went async and returned a response on a different thread, then before we
                            // continue the bulk request we should fork back to a write thread:
if (originalThread == Thread.currentThread()) {
runnable.run();
} else {
threadPool.executor(executorName).execute(runnable);
}
}
}
},
executorName
);
}

static void prohibitAppendWritesInBackingIndices(DocWriteRequest<?> writeRequest, Metadata metadata) {
DocWriteRequest.OpType opType = writeRequest.opType();
if ((opType == OpType.CREATE || opType == OpType.INDEX) == false) {
@@ -875,6 +770,111 @@ private long relativeTime() {
return relativeTimeProvider.getAsLong();
}

private boolean preprocessBulkRequest(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
final Metadata metadata = clusterService.state().getMetadata();
final Version minNodeVersion = clusterService.state().getNodes().getMinNodeVersion();
boolean needsProcessing = false;
for (BulkRequestPreprocessor preprocessor : bulkRequestPreprocessors) {
for (DocWriteRequest<?> docWriteRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(docWriteRequest);
if (indexRequest != null) {
needsProcessing |= preprocessor.needsProcessing(docWriteRequest, indexRequest, metadata);
}

if (docWriteRequest instanceof IndexRequest ir) {
ir.checkAutoIdWithOpTypeCreateSupportedByVersion(minNodeVersion);
if (ir.getAutoGeneratedTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
throw new IllegalArgumentException("autoGeneratedTimestamp should not be set externally");
}
}
}

if (needsProcessing) {
                // this method (doExecute) will be called again, but with the bulk requests updated by the preprocessing
                // and also marked with processing information. This ensures that, the second time through this method,
                // this path is never taken.
ActionListener.run(listener, l -> {
if (Assertions.ENABLED) {
final boolean allRequestsUnprocessed = bulkRequest.requests()
.stream()
.map(TransportBulkAction::getIndexWriteRequest)
.filter(Objects::nonNull)
.noneMatch(preprocessor::hasBeenProcessed);
assert allRequestsUnprocessed : bulkRequest;
}
if ((preprocessor.shouldExecuteOnIngestNode() == false) || clusterService.localNode().isIngestNode()) {
preprocessBulkRequestWithPreprocessor(preprocessor, task, bulkRequest, executorName, l);
} else {
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, l);
}
});
return true;
}
}
return false;
}

private void preprocessBulkRequestWithPreprocessor(
BulkRequestPreprocessor preprocessor,
Task task,
BulkRequest original,
String executorName,
ActionListener<BulkResponse> listener
) {
final long ingestStartTimeInNanos = System.nanoTime();
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
preprocessor.processBulkRequest(
threadPool.executor(executorName),
original.numberOfActions(),
() -> bulkRequestModifier,
bulkRequestModifier::markItemAsDropped,
bulkRequestModifier::markItemAsFailed,
(originalThread, exception) -> {
if (exception != null) {
logger.debug("failed to execute pipeline for a bulk request", exception);
listener.onFailure(exception);
} else {
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(
ingestTookInMillis,
listener
);
if (bulkRequest.requests().isEmpty()) {
// at this stage, the transport bulk action can't deal with a bulk request with no requests,
// so we stop and send an empty response back to the client.
// (this will happen if pre-processing all items in the bulk failed)
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
} else {
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
@Override
protected void doRun() {
doInternalExecute(task, bulkRequest, executorName, actionListener);
}

@Override
public boolean isForceExecution() {
                                // If we fork back to a write thread we should **not** fail because the tp queue is full.
// (Otherwise the work done during ingest will be lost)
// It is okay to force execution here. Throttling of write requests happens prior to
// ingest when a node receives a bulk request.
return true;
}
};
                            // If a processor went async and returned a response on a different thread, then before we
                            // continue the bulk request we should fork back to a write thread:
if (originalThread == Thread.currentThread()) {
runnable.run();
} else {
threadPool.executor(executorName).execute(runnable);
}
}
}
},
executorName
);
}

static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {

final BulkRequest bulkRequest;
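For context on the moved code above: the flow in preprocessBulkRequest is driven by the BulkRequestPreprocessor contract introduced on this branch. The sketch below shows a minimal no-op implementation. The interface and its signatures here are inferred from the call sites above (needsProcessing, hasBeenProcessed, shouldExecuteOnIngestNode) and are assumptions, not the branch's actual API.

// Hypothetical, self-contained sketch: this interface is inferred from the call
// sites in preprocessBulkRequest above; the branch's real BulkRequestPreprocessor
// may differ.
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.Metadata;

interface InferredBulkRequestPreprocessor {
    boolean needsProcessing(DocWriteRequest<?> docWriteRequest, IndexRequest indexRequest, Metadata metadata);
    boolean hasBeenProcessed(IndexRequest indexRequest);
    boolean shouldExecuteOnIngestNode();
    // The async processBulkRequest(...) method is omitted; its shape is visible
    // at the call site in preprocessBulkRequestWithPreprocessor above.
}

class NoopPreprocessor implements InferredBulkRequestPreprocessor {
    @Override
    public boolean needsProcessing(DocWriteRequest<?> docWriteRequest, IndexRequest indexRequest, Metadata metadata) {
        return false; // nothing to do for any document, so the bulk proceeds directly
    }

    @Override
    public boolean hasBeenProcessed(IndexRequest indexRequest) {
        return false; // consulted only from assertions: a request is never preprocessed twice
    }

    @Override
    public boolean shouldExecuteOnIngestNode() {
        return false; // true would forward the bulk to an ingest node unless the local node is one
    }
}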
18 changes: 8 additions & 10 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -106,13 +106,12 @@ public class IngestService extends AbstractBulkRequestPreprocessor implements Cl
private final ClusterService clusterService;
private final ScriptService scriptService;
private final Map<String, Processor.Factory> processorFactories;
protected final Client client;
// Ideally this should be in IngestMetadata class, but we don't have the processor factories around there.
    // We know of all the processor factories once a node with all its plugins has been initialized. Also some
// processor factories rely on other node services. Custom metadata is statically registered when classes
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
private volatile Map<String, PipelineHolder> pipelines = Map.of();
protected final ThreadPool threadPool;
private final ThreadPool threadPool;
private final IngestMetric totalMetrics = new IngestMetric();
private final List<Consumer<ClusterState>> ingestClusterStateListeners = new CopyOnWriteArrayList<>();
private volatile ClusterState state;
@@ -187,7 +186,6 @@ public IngestService(
super(documentParsingObserverSupplier);
this.clusterService = clusterService;
this.scriptService = scriptService;
this.client = client;
this.processorFactories = processorFactories(
ingestPlugins,
new Processor.Parameters(
@@ -601,7 +599,7 @@ public void onFailure(Exception e) {

IngestDocument ingestDocument = newIngestDocument(indexRequest);

executePipelinesOnActionRequest(pipelines, indexRequest, ingestDocument, documentListener);
executePipelines(pipelines, indexRequest, ingestDocument, documentListener);
indexRequest.setPipelinesHaveRun();

assert indexRequest.index() != null;
@@ -721,7 +719,7 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, String pipelin
ExceptionsHelper.rethrowAndSuppress(exceptions);
}

public void processBulkRequest(
public void executeBulkRequest(
final int numberOfActionRequests,
final Iterable<DocWriteRequest<?>> actionRequests,
final IntConsumer onDropped,
@@ -755,7 +753,7 @@ protected void doRun() {
});
}

private void executePipelinesOnActionRequest(
private void executePipelines(
DocWriteRequest<?> actionRequest,
final int slot,
final Releasable ref,
@@ -794,7 +792,7 @@ public void onFailure(Exception e) {

IngestDocument ingestDocument = newIngestDocument(indexRequest, documentParsingObserver);

executePipelinesOnActionRequest(pipelines, indexRequest, ingestDocument, documentListener);
executePipelines(pipelines, indexRequest, ingestDocument, documentListener);
indexRequest.setPipelinesHaveRun();

assert actionRequest.index() != null;
@@ -874,7 +872,7 @@ public PipelineSlot next() {
}
}

private void executePipelinesOnActionRequest(
private void executePipelines(
final PipelineIterator pipelines,
final IndexRequest indexRequest,
final IngestDocument ingestDocument,
@@ -994,7 +992,7 @@ private void executePipelinesOnActionRequest(
}

if (newPipelines.hasNext()) {
executePipelinesOnActionRequest(newPipelines, indexRequest, ingestDocument, listener);
executePipelines(newPipelines, indexRequest, ingestDocument, listener);
} else {
// update the index request's source and (potentially) cache the timestamp for TSDB
updateIndexRequestSource(indexRequest, ingestDocument);
@@ -1082,7 +1080,7 @@ static String getProcessorName(Processor processor) {
/**
* Builds a new ingest document from the passed-in index request.
*/
protected static IngestDocument newIngestDocument(final IndexRequest request, DocumentParsingObserver documentParsingObserver) {
private static IngestDocument newIngestDocument(final IndexRequest request, DocumentParsingObserver documentParsingObserver) {
return new IngestDocument(
request.index(),
request.id(),
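The executePipelines rename above touches a method that iterates pipelines by recursion: each pipeline runs asynchronously, and its completion callback re-invokes executePipelines with the remaining iterator (the newPipelines.hasNext() branch above). A simplified, self-contained sketch of that pattern follows; the names are illustrative, not the actual IngestService types.

import java.util.Iterator;
import java.util.function.BiConsumer;

// Minimal sketch of async iterate-by-recursion: run one pipeline, then, in its
// completion callback, recurse with the rest of the iterator.
class PipelineRunner {

    interface Pipeline {
        // Executes asynchronously and reports a result or a failure to the handler.
        void execute(String document, BiConsumer<String, Exception> handler);
    }

    void executePipelines(Iterator<Pipeline> pipelines, String document, BiConsumer<String, Exception> listener) {
        if (pipelines.hasNext() == false) {
            listener.accept(document, null); // every pipeline has run; report success
            return;
        }
        Pipeline current = pipelines.next();
        current.execute(document, (result, e) -> {
            if (e != null) {
                listener.accept(null, e); // abort the chain on the first failure
            } else {
                executePipelines(pipelines, result, listener); // continue with the rest
            }
        });
    }
}

If a pipeline completes on a different thread, the recursion simply continues on that thread; the production code additionally forks back to a write-executor thread afterwards, as the isForceExecution runnable in the TransportBulkAction hunk above shows.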
@@ -45,7 +45,6 @@
import java.util.function.Consumer;
import java.util.function.Function;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptySet;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
