-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Resolve pipeline on lazy rollover write #115987
Resolve pipeline on lazy rollover write #115987
Conversation
if (indexRequest.isPipelineResolved() == false) { | ||
// Resolve the pipeline from setting or templates if not cached | ||
var pipelines = storedPipelineCache.computeIfAbsent( | ||
indexRequest.index(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is now only looking up pipelines per index (rather than per index request) for all requests. So this will apply whether or not lazy rollover is set.
@@ -228,13 +227,10 @@ private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor exec | |||
metadata = clusterService.state().getMetadata(); | |||
} | |||
|
|||
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored this, pushing the loop down into IngestService. Did this so that storedPipelineCache
didn't need to be passed down into IngestService.resolvePipelinesAndUpdateIndexRequest
. But, this required iterating over the request twice, once withing IngestService.resolveAndUpdateAllPipelines
and once below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do y'all think this is a worthwhile refactoring? Or better to just pass storedPipelineCache
into IngestService.resolvePipelinesAndUpdateIndexRequest
and return the resolved pipeline from IngestService.resolvePipelinesAndUpdateIndexRequest
so it can be set in the cache within the main loop.
Though I like the refactoring, resolvePipelinesAndUpdateIndexRequest
has some nice test coverage that I'm disappointed to no longer be using.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an alternative approach without this refactoring: https://github.com/elastic/elasticsearch/pull/116031/files
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is pretty performance sensitive code, I think it makes sense to avoid the extra looping
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dakrone So you prefer the approach in https://github.com/elastic/elasticsearch/pull/116031/files ?
Replacing this PR with the work in #116031 |
If lazy rollover is set, when write with bulk request, always resolve default (and final) pipeline from index templates.
Related to #112781