-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Resolve pipeline on lazy rollover write #116031
Resolve pipeline on lazy rollover write #116031
Conversation
server/src/main/java/org/elasticsearch/ingest/IngestService.java
Outdated
Show resolved
Hide resolved
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) { | ||
IndexRequest indexRequest = getIndexWriteRequest(actionRequest); | ||
if (indexRequest != null) { | ||
IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am been having trouble unit testing this bit of code that does caching because it's inlined in the loop. I'm not super worried, because it is very similar to resolvePipelinesAndUpdateIndexRequest()
which is tested well (but doesn't do the caching).
We discussed previously (and discarded) some ideas for encapsulating this logic so it could be tested. Eg, having resolvePipelinesAndUpdateIndexRequest
take a reference to resolvedPipelineCache
, but that was decidedly nasty.
The only other idea I have, it to encapsulate the resolvedPipelineCache
map in an class called CachedPipelineResolver
with a function resolvePipelinesAndUpdateIndexRequest
consisting of the lines of logic below.
Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is what I meant:
public static class CachedPipelineResolver {
private final Map<String, Pipelines> resolvedPipelineCache;
public CachedPipelineResolver() {
this(new HashMap<>());
}
// For testing
CachedPipelineResolver(Map<String, Pipelines> resolvedPipelineCache) {
this.resolvedPipelineCache = resolvedPipelineCache;
}
public void resolvePipelinesAndUpdateIndexRequest(
final DocWriteRequest<?> originalRequest,
final IndexRequest indexRequest,
final Metadata metadata
) {
if (indexRequest.isPipelineResolved() == false) {
var pipeline = resolvedPipelineCache.computeIfAbsent(
indexRequest.index(),
// TODO perhaps this should use `threadPool.absoluteTimeInMillis()`, but leaving as is for now.
(index) -> IngestService.resolveStoredPipelines(originalRequest, indexRequest, metadata, System.currentTimeMillis())
);
IngestService.setPipelineOnRequest(indexRequest, pipeline);
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If y'all like this approach, I have an un-pushed commit with this change and more unit tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have strong feelings either way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm in favor of that version of the code (or something not entirely unlike it), and I think it resolves my issues about how intimate these two classes are getting with each other. Given the plan for backporting this PR, I think we should punt on that change until a subsequent PR.
That is, let's merge this PR more or less as-is to all the branches, but then we can follow up with a separate refactoring PR for main
and 8.x
only next week.
Pinging @elastic/es-data-management (Team:Data Management) |
Hi @parkertimmins, I've created a changelog YAML for you. |
server/src/main/java/org/elasticsearch/ingest/IngestService.java
Outdated
Show resolved
Hide resolved
(index) -> IngestService.resolveStoredPipelines(actionRequest, indexRequest, metadata, System.currentTimeMillis()) | ||
); | ||
IngestService.setPipelineOnRequest(indexRequest, pipeline); | ||
} | ||
hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the above block yearns for the mines wants to be a method of IngestService
...
That is, how many static helper methods of IngestService
is TransportAbstractBulkAction
allowed to invoke before we call shenanigans?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're punting on this #116031 (comment)
@@ -1507,7 +1540,7 @@ public static boolean hasPipeline(IndexRequest indexRequest) { | |||
|| NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false; | |||
} | |||
|
|||
private record Pipelines(String defaultPipeline, String finalPipeline) { | |||
public record Pipelines(String defaultPipeline, String finalPipeline) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the spirit of https://github.com/elastic/elasticsearch/pull/116031/files#r1826153821, I'd like this PR a lot more if this record didn't become public
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're punting on this #116031 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. I still think we should tighten up the date math expression logic, but there's bigger issues there than just this one PR brings up. Given that template application isn't parsing json (so to speak) I don't think the performance will be bad enough to invalidate this approach (and your cache avoids doing it per-document-per-bulk, so the worst case isn't sooo bad).
I think this is good enough for a bugfix, but that we should clean up the separation of concerns in a subsequent refactoring PR.
💔 Backport failed
You can use sqren/backport to manually backport by running |
If datastream rollover on write flag is set in cluster state, resolve pipelines from templates rather than from metadata. This fixes the following bug: when a pipeline reroutes every document to another index, and rollover is called with lazy=true (setting the rollover on write flag), changes to the pipeline do not go into effect, because the lack of writes means the data stream never rolls over and pipelines in metadata are not updated. The fix is to resolve pipelines from templates if the lazy rollover flag is set. To improve efficiency we only resolve pipelines once per index in the bulk request, caching the value, and reusing for other requests to the same index. Fixes: elastic#112781
If datastream rollover on write flag is set in cluster state, resolve pipelines from templates rather than from metadata. This fixes the following bug: when a pipeline reroutes every document to another index, and rollover is called with lazy=true (setting the rollover on write flag), changes to the pipeline do not go into effect, because the lack of writes means the data stream never rolls over and pipelines in metadata are not updated. The fix is to resolve pipelines from templates if the lazy rollover flag is set. To improve efficiency we only resolve pipelines once per index in the bulk request, caching the value, and reusing for other requests to the same index. Fixes: elastic#112781
💚 All backports created successfully
Questions ?Please refer to the Backport tool documentation |
If datastream rollover on write flag is set in cluster state, resolve pipelines from templates rather than from metadata. This fixes the following bug: when a pipeline reroutes every document to another index, and rollover is called with lazy=true (setting the rollover on write flag), changes to the pipeline do not go into effect, because the lack of writes means the data stream never rolls over and pipelines in metadata are not updated. The fix is to resolve pipelines from templates if the lazy rollover flag is set. To improve efficiency we only resolve pipelines once per index in the bulk request, caching the value, and reusing for other requests to the same index. Fixes: elastic#112781 (cherry picked from commit 6db39d1) # Conflicts: # server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java # server/src/main/java/org/elasticsearch/ingest/IngestService.java
… (#116131) * Resolve pipelines from template if lazy rollover write (#116031) If datastream rollover on write flag is set in cluster state, resolve pipelines from templates rather than from metadata. This fixes the following bug: when a pipeline reroutes every document to another index, and rollover is called with lazy=true (setting the rollover on write flag), changes to the pipeline do not go into effect, because the lack of writes means the data stream never rolls over and pipelines in metadata are not updated. The fix is to resolve pipelines from templates if the lazy rollover flag is set. To improve efficiency we only resolve pipelines once per index in the bulk request, caching the value, and reusing for other requests to the same index. Fixes: #112781 * Remute tests blocking merge * Remute tests blocking merge
#116132) * Resolve pipelines from template if lazy rollover write (#116031) If datastream rollover on write flag is set in cluster state, resolve pipelines from templates rather than from metadata. This fixes the following bug: when a pipeline reroutes every document to another index, and rollover is called with lazy=true (setting the rollover on write flag), changes to the pipeline do not go into effect, because the lack of writes means the data stream never rolls over and pipelines in metadata are not updated. The fix is to resolve pipelines from templates if the lazy rollover flag is set. To improve efficiency we only resolve pipelines once per index in the bulk request, caching the value, and reusing for other requests to the same index. Fixes: #112781 * Remute tests block merge * Remute tests block merge
…6137) If datastream rollover on write flag is set in cluster state, resolve pipelines from templates rather than from metadata. This fixes the following bug: when a pipeline reroutes every document to another index, and rollover is called with lazy=true (setting the rollover on write flag), changes to the pipeline do not go into effect, because the lack of writes means the data stream never rolls over and pipelines in metadata are not updated. The fix is to resolve pipelines from templates if the lazy rollover flag is set. To improve efficiency we only resolve pipelines once per index in the bulk request, caching the value, and reusing for other requests to the same index. Fixes: #112781 (cherry picked from commit 6db39d1)
If datastream rollover on write flag is set in cluster state, resolve pipelines from templates rather than from metadata. This fixes the following bug: when a pipeline reroutes every document to another index, and rollover is called with lazy=true (setting the rollover on write flag), changes to the pipeline do not go into effect, because the lack of writes means the data stream never rolls over and pipelines in metadata are not updated. The fix is to resolve pipelines from templates if the lazy rollover flag is set. To improve efficiency we only resolve pipelines once per index in the bulk request, caching the value, and reusing for other requests to the same index. Fixes: elastic#112781
This fixes a bug described in #112781 . The issue being that if lazy rollover is set, and a reroute processor always reroutes to another index, the write index of a data stream will never roll over. Because of this, if the pipeline is changed in a template, this change will not go into effect. To avoid this, when lazy rollover is set, we always resolve the pipeline from templates.
Fixes: #112781