Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve pipeline on lazy rollover write #115987

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ protected void doRun() throws IOException {

private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener)
throws IOException {
boolean hasIndexRequestsWithPipelines = false;
final Metadata metadata;
Map<String, ComponentTemplate> componentTemplateSubstitutions = bulkRequest.getComponentTemplateSubstitutions();
Map<String, ComposableIndexTemplate> indexTemplateSubstitutions = bulkRequest.getIndexTemplateSubstitutions();
Expand Down Expand Up @@ -228,13 +227,10 @@ private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor exec
metadata = clusterService.state().getMetadata();
}

for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored this, pushing the loop down into IngestService. Did this so that storedPipelineCache didn't need to be passed down into IngestService.resolvePipelinesAndUpdateIndexRequest. But, this required iterating over the request twice, once withing IngestService.resolveAndUpdateAllPipelines and once below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do y'all think this is a worthwhile refactoring? Or better to just pass storedPipelineCache into IngestService.resolvePipelinesAndUpdateIndexRequest and return the resolved pipeline from IngestService.resolvePipelinesAndUpdateIndexRequest so it can be set in the cache within the main loop.

Though I like the refactoring, resolvePipelinesAndUpdateIndexRequest has some nice test coverage that I'm disappointed to no longer be using.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an alternative approach without this refactoring: https://github.com/elastic/elasticsearch/pull/116031/files

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is pretty performance sensitive code, I think it makes sense to avoid the extra looping

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata);
hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest);
}
// Resolve and set pipelines on each index request
boolean hasIndexRequestsWithPipelines = IngestService.resolveAndUpdateAllPipelines(bulkRequest.requests, metadata);

for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
if (actionRequest instanceof IndexRequest ir) {
if (ir.getAutoGeneratedTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
throw new IllegalArgumentException("autoGeneratedTimestamp should not be set externally");
Expand Down
63 changes: 53 additions & 10 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.bulk.FailureStoreMetrics;
import org.elasticsearch.action.bulk.TransportAbstractBulkAction;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
Expand Down Expand Up @@ -285,23 +286,65 @@ static void resolvePipelinesAndUpdateIndexRequest(
if (indexRequest.isPipelineResolved()) {
return;
}
var pipelines = resolveStoredPipelines(originalRequest, indexRequest, metadata, epochMillis);
setPipelineOnRequest(indexRequest, pipelines);
}

/*
* Here we look for the pipelines associated with the index if the index exists. If the index does not exist we fall back to using
* templates to find the pipelines.
*/
final Pipelines pipelines = resolvePipelinesFromMetadata(originalRequest, indexRequest, metadata, epochMillis).or(
() -> resolvePipelinesFromIndexTemplates(indexRequest, metadata)
).orElse(Pipelines.NO_PIPELINES_DEFINED);
public static boolean resolveAndUpdateAllPipelines(List<DocWriteRequest<?>> requests, Metadata metadata) {
final Map<String, IngestService.Pipelines> storedPipelineCache = new HashMap<>();
boolean hasIndexRequestsWithPipelines = false;
for (DocWriteRequest<?> actionRequest : requests) {
IndexRequest indexRequest = TransportAbstractBulkAction.getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
if (indexRequest.isPipelineResolved() == false) {
// Resolve the pipeline from setting or templates if not cached
var pipelines = storedPipelineCache.computeIfAbsent(
indexRequest.index(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now only looking up pipelines per index (rather than per index request) for all requests. So this will apply whether or not lazy rollover is set.

(index) -> IngestService.resolveStoredPipelines(actionRequest, indexRequest, metadata, System.currentTimeMillis())
);

// Set pipeline on the index request
setPipelineOnRequest(indexRequest, pipelines);
}
hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest);
}
}
return hasIndexRequestsWithPipelines;
}

private static boolean isRolloverOnWrite(Metadata metadata, IndexRequest indexRequest) {
DataStream dataStream = metadata.dataStreams().get(indexRequest.index());
if (dataStream == null) {
return false;
}
return dataStream.getBackingIndices().isRolloverOnWrite();
}

static Pipelines resolveStoredPipelines(
final DocWriteRequest<?> originalRequest,
final IndexRequest indexRequest,
final Metadata metadata,
final long epochMillis
) {
assert indexRequest.isPipelineResolved() == false;
if (isRolloverOnWrite(metadata, indexRequest)) {
return resolvePipelinesFromIndexTemplates(indexRequest, metadata).orElse(Pipelines.NO_PIPELINES_DEFINED);
} else {
return resolvePipelinesFromMetadata(originalRequest, indexRequest, metadata, epochMillis).or(
() -> resolvePipelinesFromIndexTemplates(indexRequest, metadata)
).orElse(Pipelines.NO_PIPELINES_DEFINED);
}
}

static void setPipelineOnRequest(IndexRequest indexRequest, Pipelines resolvedPipelines) {
// The pipeline coming as part of the request always has priority over the resolved one from metadata or templates
String requestPipeline = indexRequest.getPipeline();
if (requestPipeline != null) {
indexRequest.setPipeline(requestPipeline);
} else {
indexRequest.setPipeline(pipelines.defaultPipeline);
indexRequest.setPipeline(resolvedPipelines.defaultPipeline);
}
indexRequest.setFinalPipeline(pipelines.finalPipeline);
indexRequest.setFinalPipeline(resolvedPipelines.finalPipeline);
indexRequest.isPipelineResolved(true);
}

Expand Down Expand Up @@ -1507,7 +1550,7 @@ public static boolean hasPipeline(IndexRequest indexRequest) {
|| NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false;
}

private record Pipelines(String defaultPipeline, String finalPipeline) {
public record Pipelines(String defaultPipeline, String finalPipeline) {

private static final Pipelines NO_PIPELINES_DEFINED = new Pipelines(NOOP_PIPELINE_NAME, NOOP_PIPELINE_NAME);

Expand Down