Resolve pipeline on lazy rollover write #115987
Closed
parkertimmins wants to merge 5 commits into elastic:main from parkertimmins:resolve-pipeline-on-lazy-rollover-write
Commits
70ce886 Initial work on resolving pipeline from template if lazy rollover (parkertimmins)
c79b34c spotless & only use index from IndexRequest (parkertimmins)
8a5aa09 Merge branch 'main' into resolve-pipeline-on-lazy-rollover-write (parkertimmins)
38da866 Re-use resolved pipeline for all requests with same index (parkertimmins)
02fe6a5 Fix javadoc causing compilation failure (parkertimmins)
@@ -21,6 +21,7 @@
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.action.bulk.FailureStoreMetrics;
+import org.elasticsearch.action.bulk.TransportAbstractBulkAction;
 import org.elasticsearch.action.bulk.TransportBulkAction;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.ingest.DeletePipelineRequest;
@@ -285,23 +286,65 @@ static void resolvePipelinesAndUpdateIndexRequest(
         if (indexRequest.isPipelineResolved()) {
             return;
         }
+        var pipelines = resolveStoredPipelines(originalRequest, indexRequest, metadata, epochMillis);
+        setPipelineOnRequest(indexRequest, pipelines);
+    }

-        /*
-         * Here we look for the pipelines associated with the index if the index exists. If the index does not exist we fall back to using
-         * templates to find the pipelines.
-         */
-        final Pipelines pipelines = resolvePipelinesFromMetadata(originalRequest, indexRequest, metadata, epochMillis).or(
-            () -> resolvePipelinesFromIndexTemplates(indexRequest, metadata)
-        ).orElse(Pipelines.NO_PIPELINES_DEFINED);
+    public static boolean resolveAndUpdateAllPipelines(List<DocWriteRequest<?>> requests, Metadata metadata) {
+        final Map<String, IngestService.Pipelines> storedPipelineCache = new HashMap<>();
+        boolean hasIndexRequestsWithPipelines = false;
+        for (DocWriteRequest<?> actionRequest : requests) {
+            IndexRequest indexRequest = TransportAbstractBulkAction.getIndexWriteRequest(actionRequest);
+            if (indexRequest != null) {
+                if (indexRequest.isPipelineResolved() == false) {
+                    // Resolve the pipeline from setting or templates if not cached
+                    var pipelines = storedPipelineCache.computeIfAbsent(
+                        indexRequest.index(),

This is now only looking up pipelines per index (rather than per index request) for all requests. So this will apply whether or not lazy rollover is set.

+                        (index) -> IngestService.resolveStoredPipelines(actionRequest, indexRequest, metadata, System.currentTimeMillis())
+                    );

+                    // Set pipeline on the index request
+                    setPipelineOnRequest(indexRequest, pipelines);
+                }
+                hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest);
+            }
+        }
+        return hasIndexRequestsWithPipelines;
+    }

+    private static boolean isRolloverOnWrite(Metadata metadata, IndexRequest indexRequest) {
+        DataStream dataStream = metadata.dataStreams().get(indexRequest.index());
+        if (dataStream == null) {
+            return false;
+        }
+        return dataStream.getBackingIndices().isRolloverOnWrite();
+    }

+    static Pipelines resolveStoredPipelines(
+        final DocWriteRequest<?> originalRequest,
+        final IndexRequest indexRequest,
+        final Metadata metadata,
+        final long epochMillis
+    ) {
+        assert indexRequest.isPipelineResolved() == false;
+        if (isRolloverOnWrite(metadata, indexRequest)) {
+            return resolvePipelinesFromIndexTemplates(indexRequest, metadata).orElse(Pipelines.NO_PIPELINES_DEFINED);
+        } else {
+            return resolvePipelinesFromMetadata(originalRequest, indexRequest, metadata, epochMillis).or(
+                () -> resolvePipelinesFromIndexTemplates(indexRequest, metadata)
+            ).orElse(Pipelines.NO_PIPELINES_DEFINED);
+        }
+    }

+    static void setPipelineOnRequest(IndexRequest indexRequest, Pipelines resolvedPipelines) {
         // The pipeline coming as part of the request always has priority over the resolved one from metadata or templates
         String requestPipeline = indexRequest.getPipeline();
         if (requestPipeline != null) {
             indexRequest.setPipeline(requestPipeline);
         } else {
-            indexRequest.setPipeline(pipelines.defaultPipeline);
+            indexRequest.setPipeline(resolvedPipelines.defaultPipeline);
         }
-        indexRequest.setFinalPipeline(pipelines.finalPipeline);
+        indexRequest.setFinalPipeline(resolvedPipelines.finalPipeline);
         indexRequest.isPipelineResolved(true);
     }

@@ -1507,7 +1550,7 @@ public static boolean hasPipeline(IndexRequest indexRequest) {
             || NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false;
     }

-    private record Pipelines(String defaultPipeline, String finalPipeline) {
+    public record Pipelines(String defaultPipeline, String finalPipeline) {

         private static final Pipelines NO_PIPELINES_DEFINED = new Pipelines(NOOP_PIPELINE_NAME, NOOP_PIPELINE_NAME);
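The resolution order introduced by `resolveStoredPipelines` in the diff can be illustrated in isolation. The following is a minimal sketch, not the Elasticsearch implementation: `FakeMetadata`, its three collections, and the `"_none"` placeholder are hypothetical stand-ins for the cluster metadata, index settings, matching templates, and the no-op pipeline name. It only shows the branching: when the target is marked for rollover on write, the pipelines stored on the current index are skipped and the templates are consulted directly.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class LazyRolloverResolutionSketch {
    // Placeholder for the no-op pipeline name (an assumption for this sketch).
    static final String NOOP = "_none";

    record Pipelines(String defaultPipeline, String finalPipeline) {
        static final Pipelines NONE = new Pipelines(NOOP, NOOP);
    }

    // Hypothetical, simplified stand-in for cluster metadata:
    // pipelines stored on the existing index, pipelines a matching template
    // would yield, and the targets that are marked for rollover on write.
    record FakeMetadata(
        Map<String, Pipelines> indexPipelines,
        Map<String, Pipelines> templatePipelines,
        Set<String> rolloverOnWrite
    ) {}

    static Pipelines resolve(String target, FakeMetadata metadata) {
        Optional<Pipelines> fromTemplates = Optional.ofNullable(metadata.templatePipelines().get(target));
        if (metadata.rolloverOnWrite().contains(target)) {
            // The write will land in a new backing index, so only templates apply.
            return fromTemplates.orElse(Pipelines.NONE);
        }
        // Otherwise prefer the existing index and fall back to templates.
        return Optional.ofNullable(metadata.indexPipelines().get(target))
            .or(() -> fromTemplates)
            .orElse(Pipelines.NONE);
    }

    public static void main(String[] args) {
        FakeMetadata metadata = new FakeMetadata(
            Map.of("logs", new Pipelines("old-pipeline", NOOP)),
            Map.of("logs", new Pipelines("new-pipeline", NOOP)),
            Set.of("logs")
        );
        System.out.println(resolve("logs", metadata).defaultPipeline());
    }
}
```

With `"logs"` removed from the rollover set, the same call would return the pipeline stored on the existing index instead.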
Refactored this, pushing the loop down into IngestService. Did this so that `storedPipelineCache` didn't need to be passed down into `IngestService.resolvePipelinesAndUpdateIndexRequest`. But this required iterating over the requests twice, once within `IngestService.resolveAndUpdateAllPipelines` and once below.
Do y'all think this is a worthwhile refactoring? Or is it better to just pass `storedPipelineCache` into `IngestService.resolvePipelinesAndUpdateIndexRequest` and return the resolved pipeline from `IngestService.resolvePipelinesAndUpdateIndexRequest` so it can be set in the cache within the main loop? Though I like the refactoring, `resolvePipelinesAndUpdateIndexRequest` has some nice test coverage that I'm disappointed to no longer be using.
This is an alternative approach without this refactoring: https://github.com/elastic/elasticsearch/pull/116031/files
Since this is pretty performance sensitive code, I think it makes sense to avoid the extra looping
@dakrone So you prefer the approach in https://github.com/elastic/elasticsearch/pull/116031/files ?
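For readers following the thread, the single-loop idea discussed above (the caller owns the per-index cache and resolves each request inside its existing loop) might look roughly like the sketch below. It is not the code from either pull request: `Request`, `resolveOne`, `processAll`, and the `lookup` function are hypothetical simplifications, and `"_none"` is a placeholder for the no-op pipeline name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class SinglePassPipelineCacheSketch {
    // Placeholder for the no-op pipeline name (an assumption for this sketch).
    static final String NOOP = "_none";

    record Pipelines(String defaultPipeline, String finalPipeline) {}

    /** Hypothetical, heavily simplified stand-in for an index request. */
    static final class Request {
        final String index;
        String pipeline;       // null until set by the client or resolved
        String finalPipeline;
        boolean resolved;

        Request(String index, String pipeline) {
            this.index = index;
            this.pipeline = pipeline;
        }
    }

    /** Resolves one request, consulting the caller-owned cache before the lookup. */
    static void resolveOne(Request request, Map<String, Pipelines> cache, Function<String, Pipelines> lookup) {
        if (request.resolved) {
            return;
        }
        // The lookup must return a non-null value, otherwise nothing is cached.
        Pipelines resolved = cache.computeIfAbsent(request.index, lookup);
        // A pipeline supplied on the request takes priority over the resolved default.
        if (request.pipeline == null) {
            request.pipeline = resolved.defaultPipeline();
        }
        request.finalPipeline = resolved.finalPipeline();
        request.resolved = true;
    }

    static boolean hasPipeline(Request request) {
        return NOOP.equals(request.pipeline) == false || NOOP.equals(request.finalPipeline) == false;
    }

    /** The caller's single loop: resolution and other per-request work share one pass. */
    static boolean processAll(List<Request> requests, Function<String, Pipelines> lookup) {
        Map<String, Pipelines> cache = new HashMap<>();
        boolean anyPipeline = false;
        for (Request request : requests) {
            resolveOne(request, cache, lookup);
            anyPipeline |= hasPipeline(request);
            // Any other per-request work would go here, in the same iteration.
        }
        return anyPipeline;
    }

    public static void main(String[] args) {
        AtomicInteger lookups = new AtomicInteger();
        Function<String, Pipelines> lookup = index -> {
            lookups.incrementAndGet();
            return new Pipelines("default-for-" + index, NOOP);
        };
        List<Request> requests = List.of(
            new Request("logs", null),
            new Request("logs", "custom"),
            new Request("metrics", null)
        );
        boolean any = processAll(requests, lookup);
        System.out.println("any pipeline: " + any + ", lookups: " + lookups.get());
    }
}
```

In this sketch the two requests for `logs` share one lookup, so three requests trigger two lookups, and the request that already names a pipeline keeps it.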