From 9e293d68f02fe976079d797bd713c42d5ac637e9 Mon Sep 17 00:00:00 2001 From: Parker Timmins Date: Fri, 1 Nov 2024 21:54:55 -0600 Subject: [PATCH] Resolve pipelines from template if lazy rollover write (#116031) If the data stream's rollover-on-write flag is set in the cluster state, resolve pipelines from templates rather than from metadata. This fixes the following bug: when a pipeline reroutes every document to another index, and rollover is called with lazy=true (setting the rollover-on-write flag), changes to the pipeline do not go into effect, because the lack of writes means the data stream never rolls over and pipelines in metadata are not updated. The fix is to resolve pipelines from templates if the lazy rollover flag is set. To improve efficiency we only resolve pipelines once per index in the bulk request, caching the value and reusing it for other requests to the same index. Fixes: #112781 --- docs/changelog/116031.yaml | 6 + .../test/ingest/310_reroute_processor.yml | 324 +++++++++++++++++- .../bulk/TransportAbstractBulkAction.java | 10 +- .../elasticsearch/ingest/IngestService.java | 58 +++- .../ingest/IngestServiceTests.java | 139 ++++++++ 5 files changed, 523 insertions(+), 14 deletions(-) create mode 100644 docs/changelog/116031.yaml diff --git a/docs/changelog/116031.yaml b/docs/changelog/116031.yaml new file mode 100644 index 0000000000000..e30552bf3b513 --- /dev/null +++ b/docs/changelog/116031.yaml @@ -0,0 +1,6 @@ +pr: 116031 +summary: Resolve pipelines from template on lazy rollover write +area: Data streams +type: bug +issues: + - 112781 diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/310_reroute_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/310_reroute_processor.yml index 53229290da03e..5b7e6cff63b31 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/310_reroute_processor.yml +++ 
b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/310_reroute_processor.yml @@ -36,7 +36,10 @@ teardown: ingest.delete_pipeline: id: "pipeline-2" ignore: 404 - + - do: + indices.delete_data_stream: + name: "data-stream-*" + expand_wildcards: all --- "Test first matching router terminates pipeline": - skip: @@ -252,3 +255,322 @@ teardown: - match: { _source.existing-field : true } - match: { _source.added-in-pipeline-before-reroute : true } - match: { _source.added-in-pipeline-after-reroute : true } + +--- +"Test data stream with lazy rollover obtains pipeline from template": + # This test starts with a chain of reroutes, from data-stream-1, to data-stream-2, to data-stream-3 + # We then add higher priority templates that remove the reroute processors. Then we show that + # after a lazy rollover on data-stream-2, a document written to data-stream-1 still gets rerouted + # to data-stream-2, but not on to data-stream-3. Finally, a lazy rollover on data-stream-1 + # causes the new template to also take effect on data-stream-1, and the last write goes directly + # into data-stream-1. Multiple reroute steps are tested because pipeline resolution uses a + # different code path for initial index and indices after a reroute. 
+ + # start with pipeline that reroutes from ds-1 to ds-2 + - do: + ingest.put_pipeline: + id: "reroute-1" + body: > + { + "processors": [ + { + "reroute" : {"destination": "data-stream-2"} + } + ] + } + - match: { acknowledged: true } + + # and pipeline that reroutes from ds-2 to ds-3 + - do: + ingest.put_pipeline: + id: "reroute-2" + body: > + { + "processors": [ + { + "reroute" : {"destination": "data-stream-3"} + } + ] + } + - match: { acknowledged: true } + + # set pipelines in templates + - do: + indices.put_index_template: + name: template-1 + body: + index_patterns: [ "data-stream-1"] + priority: 1 + data_stream: { } + template: + settings: + index.default_pipeline: "reroute-1" + - match: { acknowledged: true } + - do: + indices.put_index_template: + name: template-2 + body: + index_patterns: [ "data-stream-2"] + priority: 1 + data_stream: { } + template: + settings: + index.default_pipeline: "reroute-2" + - match: { acknowledged: true } + - do: + indices.put_index_template: + name: template_3 + body: + index_patterns: [ "data-stream-3" ] + priority: 1 + data_stream: { } + - match: { acknowledged: true } + + - do: + indices.create_data_stream: + name: data-stream-1 + - match: { acknowledged: true } + - do: + indices.create_data_stream: + name: data-stream-2 + - match: { acknowledged: true } + - do: + indices.create_data_stream: + name: data-stream-3 + - match: { acknowledged: true } + + # write to ds-1 + - do: + index: + index: data-stream-1 + body: + '@timestamp': '2020-12-12' + some-field: 1 + - do: + indices.refresh: + index: data-stream-3 + + # document is rerouted to ds-3 + - do: + search: + index: data-stream-3 + body: { query: { match_all: { } } } + - length: { hits.hits: 1 } + - match: { hits.hits.0._source.some-field: 1 } + + # add higher priority templates without reroute processors + - do: + indices.put_index_template: + name: template_4 + body: + index_patterns: [ "data-stream-1" ] + priority: 2 # higher priority + data_stream: { } + - match: { 
acknowledged: true } + - do: + indices.put_index_template: + name: template_5 + body: + index_patterns: [ "data-stream-2" ] + priority: 2 # higher priority + data_stream: { } + - match: { acknowledged: true } + + # write to ds-1 + - do: + index: + index: data-stream-1 + body: + '@timestamp': '2020-12-12' + some-field: 2 + - do: + indices.refresh: + index: data-stream-3 + + # still rerouted because ds-1 and ds-2 have not rolled over + - do: + search: + index: data-stream-3 + body: { query: { match_all: { } } } + - length: { hits.hits: 2 } + + # perform lazy rollover on ds-2 + - do: + indices.rollover: + alias: data-stream-2 + lazy: true + + # write to ds-1 + - do: + index: + index: data-stream-1 + body: + '@timestamp': '2020-12-12' + some-field: 3 + - do: + indices.refresh: + index: data-stream-2 + + # written to ds-2, as rerouted to ds-2, but not on to ds-3 + - do: + search: + index: data-stream-2 + body: { query: { match_all: { } } } + - length: { hits.hits: 1 } + - match: { hits.hits.0._source.some-field: 3 } + + # perform lazy rollover on ds-1 + - do: + indices.rollover: + alias: data-stream-1 + lazy: true + + # write to ds-1 + - do: + index: + index: data-stream-1 + body: + '@timestamp': '2020-12-12' + some-field: 4 + - do: + indices.refresh: + index: data-stream-1 + + # written to ds-1, as not rerouted to ds-2 + - do: + search: + index: data-stream-1 + body: { query: { match_all: { } } } + - length: { hits.hits: 1 } + - match: { hits.hits.0._source.some-field: 4 } + +--- +"Test remove then add reroute processor with and without lazy rollover": + # start with pipeline that reroutes from ds-1 to ds-2 + - do: + ingest.put_pipeline: + id: "reroute-1" + body: > + { + "processors": [ + { + "reroute" : {"destination": "data-stream-2"} + } + ] + } + - match: { acknowledged: true } + + # set pipelines in templates + - do: + indices.put_index_template: + name: template-1 + body: + index_patterns: [ "data-stream-1"] + priority: 1 + data_stream: { } + template: + settings: + 
index.default_pipeline: "reroute-1" + - match: { acknowledged: true } + - do: + indices.put_index_template: + name: template_2 + body: + index_patterns: [ "data-stream-2" ] + priority: 1 + data_stream: { } + - match: { acknowledged: true } + + - do: + indices.create_data_stream: + name: data-stream-1 + - match: { acknowledged: true } + + - do: + indices.create_data_stream: + name: data-stream-2 + - match: { acknowledged: true } + + # write to ds-1 + - do: + index: + index: data-stream-1 + body: + '@timestamp': '2020-12-12' + some-field: 1 + - do: + indices.refresh: + index: data-stream-2 + + # document is rerouted to ds-2 + - do: + search: + index: data-stream-2 + body: { query: { match_all: { } } } + - length: { hits.hits: 1 } + + # add a higher priority template without reroute processors + - do: + indices.put_index_template: + name: template_3 + body: + index_patterns: [ "data-stream-1" ] + priority: 2 # higher priority + data_stream: { } + - match: { acknowledged: true } + + # perform lazy rollover on ds-1 + - do: + indices.rollover: + alias: data-stream-1 + lazy: true + + # write to ds-1 + - do: + index: + index: data-stream-1 + body: + '@timestamp': '2020-12-12' + some-field: 2 + - do: + indices.refresh: + index: data-stream-1 + + # written to ds-1, as not rerouted to ds-2 + - do: + search: + index: data-stream-1 + body: { query: { match_all: { } } } + - length: { hits.hits: 1 } + + # add another higher priority template with a reroute processor + - do: + indices.put_index_template: + name: template-3 + body: + index_patterns: [ "data-stream-1" ] + priority: 3 + data_stream: { } + template: + settings: + index.default_pipeline: "reroute-1" + - match: { acknowledged: true } + + # don't do a lazy rollover + # write to ds-1 + - do: + index: + index: data-stream-1 + body: + '@timestamp': '2020-12-12' + some-field: 3 + - do: + indices.refresh: + index: data-stream-1 + + # because no lazy rollover, still no reroute processor + - do: + search: + index: data-stream-1 
+ body: { query: { match_all: { } } } + - length: { hits.hits: 2 } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index 111e4d72c57c6..e83bca4b661c9 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -228,10 +228,18 @@ private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor exec metadata = clusterService.state().getMetadata(); } + Map resolvedPipelineCache = new HashMap<>(); for (DocWriteRequest actionRequest : bulkRequest.requests) { IndexRequest indexRequest = getIndexWriteRequest(actionRequest); if (indexRequest != null) { - IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata); + if (indexRequest.isPipelineResolved() == false) { + var pipeline = resolvedPipelineCache.computeIfAbsent( + indexRequest.index(), + // TODO perhaps this should use `threadPool.absoluteTimeInMillis()`, but leaving as is for now. 
+ (index) -> IngestService.resolvePipelines(actionRequest, indexRequest, metadata, System.currentTimeMillis()) + ); + IngestService.setPipelineOnRequest(indexRequest, pipeline); + } hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest); } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index b5ac54b018e46..ce61f197b4831 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -282,26 +282,60 @@ static void resolvePipelinesAndUpdateIndexRequest( final Metadata metadata, final long epochMillis ) { - if (indexRequest.isPipelineResolved()) { - return; + if (indexRequest.isPipelineResolved() == false) { + var pipelines = resolvePipelines(originalRequest, indexRequest, metadata, epochMillis); + setPipelineOnRequest(indexRequest, pipelines); } + } - /* - * Here we look for the pipelines associated with the index if the index exists. If the index does not exist we fall back to using - * templates to find the pipelines. - */ - final Pipelines pipelines = resolvePipelinesFromMetadata(originalRequest, indexRequest, metadata, epochMillis).or( - () -> resolvePipelinesFromIndexTemplates(indexRequest, metadata) - ).orElse(Pipelines.NO_PIPELINES_DEFINED); + static boolean isRolloverOnWrite(Metadata metadata, IndexRequest indexRequest) { + DataStream dataStream = metadata.dataStreams().get(indexRequest.index()); + if (dataStream == null) { + return false; + } + return dataStream.getBackingIndices().isRolloverOnWrite(); + } + /** + * Resolve the default and final pipelines from the cluster state metadata or index templates. 
+ * + * @param originalRequest initial request + * @param indexRequest the index request, which could be different from the initial request if rerouted + * @param metadata cluster data metadata + * @param epochMillis current time for index name resolution + * @return the resolved pipelines + */ + public static Pipelines resolvePipelines( + final DocWriteRequest originalRequest, + final IndexRequest indexRequest, + final Metadata metadata, + final long epochMillis + ) { + if (isRolloverOnWrite(metadata, indexRequest)) { + return resolvePipelinesFromIndexTemplates(indexRequest, metadata) // + .orElse(Pipelines.NO_PIPELINES_DEFINED); + } else { + return resolvePipelinesFromMetadata(originalRequest, indexRequest, metadata, epochMillis) // + .or(() -> resolvePipelinesFromIndexTemplates(indexRequest, metadata)) // + .orElse(Pipelines.NO_PIPELINES_DEFINED); + } + } + + /** + * Set the request pipeline on the index request if present, otherwise set the default pipeline. + * Always set the final pipeline. 
+ * @param indexRequest the index request + * @param resolvedPipelines default and final pipelines resolved from metadata and templates + */ + public static void setPipelineOnRequest(IndexRequest indexRequest, Pipelines resolvedPipelines) { // The pipeline coming as part of the request always has priority over the resolved one from metadata or templates String requestPipeline = indexRequest.getPipeline(); if (requestPipeline != null) { indexRequest.setPipeline(requestPipeline); } else { - indexRequest.setPipeline(pipelines.defaultPipeline); + indexRequest.setPipeline(resolvedPipelines.defaultPipeline); } - indexRequest.setFinalPipeline(pipelines.finalPipeline); + indexRequest.setFinalPipeline(resolvedPipelines.finalPipeline); indexRequest.isPipelineResolved(true); } @@ -1507,7 +1541,7 @@ public static boolean hasPipeline(IndexRequest indexRequest) { || NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false; } - private record Pipelines(String defaultPipeline, String finalPipeline) { + public record Pipelines(String defaultPipeline, String finalPipeline) { private static final Pipelines NO_PIPELINES_DEFINED = new Pipelines(NOOP_PIPELINE_NAME, NOOP_PIPELINE_NAME); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index b3ddc313eaf3a..78baa1699df00 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -51,6 +51,7 @@ import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.VersionType; @@ -2506,6 +2507,144 @@ public void testResolveRequestOrDefaultPipelineAndFinalPipeline() { } } + public void testRolloverOnWrite() { + { // false 
if not data stream + IndexMetadata.Builder builder = IndexMetadata.builder("idx") + .settings(settings(IndexVersion.current())) + .numberOfShards(1) + .numberOfReplicas(0); + Metadata metadata = Metadata.builder().put(builder).build(); + IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline"); + assertFalse(IngestService.isRolloverOnWrite(metadata, indexRequest)); + } + + { // false if not rollover on write + var backingIndex = ".ds-data-stream-01"; + var indexUUID = randomUUID(); + + var dataStream = DataStream.builder( + "no-rollover-data-stream", + DataStream.DataStreamIndices.backingIndicesBuilder(List.of(new Index(backingIndex, indexUUID))) + .setRolloverOnWrite(false) + .build() + ).build(); + + Metadata metadata = Metadata.builder().dataStreams(Map.of(dataStream.getName(), dataStream), Map.of()).build(); + + IndexRequest indexRequest = new IndexRequest("no-rollover-data-stream"); + assertFalse(IngestService.isRolloverOnWrite(metadata, indexRequest)); + } + + { // true if rollover on write + var backingIndex = ".ds-data-stream-01"; + var indexUUID = randomUUID(); + + var dataStream = DataStream.builder( + "rollover-data-stream", + DataStream.DataStreamIndices.backingIndicesBuilder(List.of(new Index(backingIndex, indexUUID))) + .setRolloverOnWrite(true) + .build() + ).build(); + + Metadata metadata = Metadata.builder().dataStreams(Map.of(dataStream.getName(), dataStream), Map.of()).build(); + + IndexRequest indexRequest = new IndexRequest("rollover-data-stream"); + assertTrue(IngestService.isRolloverOnWrite(metadata, indexRequest)); + } + } + + public void testResolveFromTemplateIfRolloverOnWrite() { + { // if rolloverOnWrite is false, get pipeline from metadata + var backingIndex = ".ds-data-stream-01"; + var indexUUID = randomUUID(); + + var dataStream = DataStream.builder( + "no-rollover-data-stream", + DataStream.DataStreamIndices.backingIndicesBuilder(List.of(new Index(backingIndex, indexUUID))) + .setRolloverOnWrite(false) + 
.build() + ).build(); + + IndexMetadata indexMetadata = IndexMetadata.builder(backingIndex) + .settings( + settings(IndexVersion.current()).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "metadata-pipeline") + .put(IndexMetadata.SETTING_INDEX_UUID, indexUUID) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + Metadata metadata = Metadata.builder() + .indices(Map.of(backingIndex, indexMetadata)) + .dataStreams(Map.of(dataStream.getName(), dataStream), Map.of()) + .build(); + + IndexRequest indexRequest = new IndexRequest("no-rollover-data-stream"); + IngestService.resolvePipelinesAndUpdateIndexRequest(indexRequest, indexRequest, metadata); + assertTrue(hasPipeline(indexRequest)); + assertTrue(indexRequest.isPipelineResolved()); + assertThat(indexRequest.getPipeline(), equalTo("metadata-pipeline")); + } + + { // if rolloverOnWrite is true, get pipeline from template + var backingIndex = ".ds-data-stream-01"; + var indexUUID = randomUUID(); + + var dataStream = DataStream.builder( + "rollover-data-stream", + DataStream.DataStreamIndices.backingIndicesBuilder(List.of(new Index(backingIndex, indexUUID))) + .setRolloverOnWrite(true) + .build() + ).build(); + + IndexMetadata indexMetadata = IndexMetadata.builder(backingIndex) + .settings( + settings(IndexVersion.current()).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "metadata-pipeline") + .put(IndexMetadata.SETTING_INDEX_UUID, indexUUID) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + IndexTemplateMetadata.Builder templateBuilder = IndexTemplateMetadata.builder("name1") + .patterns(List.of("rollover*")) + .settings(settings(IndexVersion.current()).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "template-pipeline")); + + Metadata metadata = Metadata.builder() + .put(templateBuilder) + .indices(Map.of(backingIndex, indexMetadata)) + .dataStreams(Map.of(dataStream.getName(), dataStream), Map.of()) + .build(); + + IndexRequest indexRequest = new IndexRequest("rollover-data-stream"); + 
IngestService.resolvePipelinesAndUpdateIndexRequest(indexRequest, indexRequest, metadata); + assertTrue(hasPipeline(indexRequest)); + assertTrue(indexRequest.isPipelineResolved()); + assertThat(indexRequest.getPipeline(), equalTo("template-pipeline")); + } + } + + public void testSetPipelineOnRequest() { + { + // with request pipeline + var indexRequest = new IndexRequest("idx").setPipeline("request"); + var pipelines = new IngestService.Pipelines("default", "final"); + IngestService.setPipelineOnRequest(indexRequest, pipelines); + assertTrue(indexRequest.isPipelineResolved()); + assertEquals(indexRequest.getPipeline(), "request"); + assertEquals(indexRequest.getFinalPipeline(), "final"); + } + { + // no request pipeline + var indexRequest = new IndexRequest("idx"); + var pipelines = new IngestService.Pipelines("default", "final"); + IngestService.setPipelineOnRequest(indexRequest, pipelines); + assertTrue(indexRequest.isPipelineResolved()); + assertEquals(indexRequest.getPipeline(), "default"); + assertEquals(indexRequest.getFinalPipeline(), "final"); + } + } + public void testUpdatingRandomPipelineWithoutChangesIsNoOp() throws Exception { var randomMap = randomMap(10, 50, IngestServiceTests::randomMapEntry);