Skip to content

Commit

Permalink
Resolve pipelines from template if lazy rollover write (#116031)
Browse files Browse the repository at this point in the history
If datastream rollover on write flag is set in cluster state, resolve pipelines from templates rather than from metadata. This fixes the following bug: when a pipeline reroutes every document to another index, and rollover is called with lazy=true (setting the rollover on write flag), changes to the pipeline do not go into effect, because the lack of writes means the data stream never rolls over and pipelines in metadata are not updated. The fix is to resolve pipelines from templates if the lazy rollover flag is set. To improve efficiency we only resolve pipelines once per index in the bulk request, caching the value, and reusing for other requests to the same index.

Fixes: #112781
  • Loading branch information
parkertimmins authored Nov 2, 2024
1 parent 2079e8c commit 6db39d1
Show file tree
Hide file tree
Showing 5 changed files with 523 additions and 14 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/116031.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 116031
summary: Resolve pipelines from template on lazy rollover write
area: Data streams
type: bug
issues:
- 112781
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ teardown:
ingest.delete_pipeline:
id: "pipeline-2"
ignore: 404

- do:
indices.delete_data_stream:
name: "data-stream-*"
expand_wildcards: all
---
"Test first matching router terminates pipeline":
- skip:
Expand Down Expand Up @@ -252,3 +255,322 @@ teardown:
- match: { _source.existing-field : true }
- match: { _source.added-in-pipeline-before-reroute : true }
- match: { _source.added-in-pipeline-after-reroute : true }

---
"Test data stream with lazy rollover obtains pipeline from template":
# This test starts with chain of reroutes, from data-stream-1, to data-stream-2, to data-stream-3
# We then add higher priority templates that remove the reroute processors. Then we show that
# after a lazy rollover on data-stream-2, a document written to data-stream-1 still gets rerouted
# to data-steam-2, but not on to data-stream-3. Finally, after a lazy rollover on data-stream-1
# causes the new template to also take effect on data-stream-1, and the last write goes directly
# into data-stream-1. Multiple reroute steps are tested because pipeline resolution using a
# different code path for initial index and indices after a reroute.

# start with pipeline that reroutes from ds-1 to ds-2
- do:
ingest.put_pipeline:
id: "reroute-1"
body: >
{
"processors": [
{
"reroute" : {"destination": "data-stream-2"}
}
]
}
- match: { acknowledged: true }

# and pipeline that reroutes from ds-2 to ds-3
- do:
ingest.put_pipeline:
id: "reroute-2"
body: >
{
"processors": [
{
"reroute" : {"destination": "data-stream-3"}
}
]
}
- match: { acknowledged: true }

# set pipelines in templates
- do:
indices.put_index_template:
name: template-1
body:
index_patterns: [ "data-stream-1"]
priority: 1
data_stream: { }
template:
settings:
index.default_pipeline: "reroute-1"
- match: { acknowledged: true }
- do:
indices.put_index_template:
name: template-2
body:
index_patterns: [ "data-stream-2"]
priority: 1
data_stream: { }
template:
settings:
index.default_pipeline: "reroute-2"
- match: { acknowledged: true }
- do:
indices.put_index_template:
name: template_3
body:
index_patterns: [ "data-stream-3" ]
priority: 1
data_stream: { }
- match: { acknowledged: true }

- do:
indices.create_data_stream:
name: data-stream-1
- match: { acknowledged: true }
- do:
indices.create_data_stream:
name: data-stream-2
- match: { acknowledged: true }
- do:
indices.create_data_stream:
name: data-stream-3
- match: { acknowledged: true }

# write to ds-1
- do:
index:
index: data-stream-1
body:
'@timestamp': '2020-12-12'
some-field: 1
- do:
indices.refresh:
index: data-stream-3

# document is rerouted to ds-3
- do:
search:
index: data-stream-3
body: { query: { match_all: { } } }
- length: { hits.hits: 1 }
- match: { hits.hits.0._source.some-field: 1 }

# add higher priority templates without reroute processors
- do:
indices.put_index_template:
name: template_4
body:
index_patterns: [ "data-stream-1" ]
priority: 2 # higher priority
data_stream: { }
- match: { acknowledged: true }
- do:
indices.put_index_template:
name: template_5
body:
index_patterns: [ "data-stream-2" ]
priority: 2 # higher priority
data_stream: { }
- match: { acknowledged: true }

# write to ds-1
- do:
index:
index: data-stream-1
body:
'@timestamp': '2020-12-12'
some-field: 2
- do:
indices.refresh:
index: data-stream-3

# still rerouted because ds-1 and ds-2 rolled over
- do:
search:
index: data-stream-3
body: { query: { match_all: { } } }
- length: { hits.hits: 2 }

# perform lazy rollover on ds-2
- do:
indices.rollover:
alias: data-stream-2
lazy: true

# write to ds-1
- do:
index:
index: data-stream-1
body:
'@timestamp': '2020-12-12'
some-field: 3
- do:
indices.refresh:
index: data-stream-2

# written to ds-2, as rerouted to ds-2, but not on to ds-3
- do:
search:
index: data-stream-2
body: { query: { match_all: { } } }
- length: { hits.hits: 1 }
- match: { hits.hits.0._source.some-field: 3 }

# perform lazy rollover on 1
- do:
indices.rollover:
alias: data-stream-1
lazy: true

# write to ds-1
- do:
index:
index: data-stream-1
body:
'@timestamp': '2020-12-12'
some-field: 4
- do:
indices.refresh:
index: data-stream-1

# written to ds-1, as not rerouted to ds-2
- do:
search:
index: data-stream-1
body: { query: { match_all: { } } }
- length: { hits.hits: 1 }
- match: { hits.hits.0._source.some-field: 4 }

---
"Test remove then add reroute processor with and without lazy rollover":
# start with pipeline that reroutes from ds-1 to ds-2
- do:
ingest.put_pipeline:
id: "reroute-1"
body: >
{
"processors": [
{
"reroute" : {"destination": "data-stream-2"}
}
]
}
- match: { acknowledged: true }

# set pipelines in templates
- do:
indices.put_index_template:
name: template-1
body:
index_patterns: [ "data-stream-1"]
priority: 1
data_stream: { }
template:
settings:
index.default_pipeline: "reroute-1"
- match: { acknowledged: true }
- do:
indices.put_index_template:
name: template_2
body:
index_patterns: [ "data-stream-2" ]
priority: 1
data_stream: { }
- match: { acknowledged: true }

- do:
indices.create_data_stream:
name: data-stream-1
- match: { acknowledged: true }

- do:
indices.create_data_stream:
name: data-stream-2
- match: { acknowledged: true }

# write to ds-1
- do:
index:
index: data-stream-1
body:
'@timestamp': '2020-12-12'
some-field: 1
- do:
indices.refresh:
index: data-stream-2

# document is rerouted to ds-2
- do:
search:
index: data-stream-2
body: { query: { match_all: { } } }
- length: { hits.hits: 1 }

# add higher priority templates without reroute processors
- do:
indices.put_index_template:
name: template_3
body:
index_patterns: [ "data-stream-1" ]
priority: 2 # higher priority
data_stream: { }
- match: { acknowledged: true }

# perform lazy rollover on ds-2
- do:
indices.rollover:
alias: data-stream-1
lazy: true

# write to ds-1
- do:
index:
index: data-stream-1
body:
'@timestamp': '2020-12-12'
some-field: 2
- do:
indices.refresh:
index: data-stream-1

# written to ds-1, as not rerouted to ds-2
- do:
search:
index: data-stream-1
body: { query: { match_all: { } } }
- length: { hits.hits: 1 }

# add another higher priority templates with reroute processors
- do:
indices.put_index_template:
name: template-3
body:
index_patterns: [ "data-stream-1" ]
priority: 3
data_stream: { }
template:
settings:
index.default_pipeline: "reroute-1"
- match: { acknowledged: true }

# don't do a lazy rollover
# write to ds-1
- do:
index:
index: data-stream-1
body:
'@timestamp': '2020-12-12'
some-field: 3
- do:
indices.refresh:
index: data-stream-1

# because no lazy rollover, still no reroute processor
- do:
search:
index: data-stream-1
body: { query: { match_all: { } } }
- length: { hits.hits: 2 }
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,18 @@ private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor exec
metadata = clusterService.state().getMetadata();
}

Map<String, IngestService.Pipelines> resolvedPipelineCache = new HashMap<>();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata);
if (indexRequest.isPipelineResolved() == false) {
var pipeline = resolvedPipelineCache.computeIfAbsent(
indexRequest.index(),
// TODO perhaps this should use `threadPool.absoluteTimeInMillis()`, but leaving as is for now.
(index) -> IngestService.resolvePipelines(actionRequest, indexRequest, metadata, System.currentTimeMillis())
);
IngestService.setPipelineOnRequest(indexRequest, pipeline);
}
hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest);
}

Expand Down
Loading

0 comments on commit 6db39d1

Please sign in to comment.