diff --git a/docs/changelog/116031.yaml b/docs/changelog/116031.yaml new file mode 100644 index 0000000000000..e30552bf3b513 --- /dev/null +++ b/docs/changelog/116031.yaml @@ -0,0 +1,6 @@ +pr: 116031 +summary: Resolve pipelines from template on lazy rollover write +area: Data streams +type: bug +issues: + - 112781 diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/310_reroute_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/310_reroute_processor.yml index 53229290da03e..5b7e6cff63b31 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/310_reroute_processor.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/310_reroute_processor.yml @@ -36,7 +36,10 @@ teardown: ingest.delete_pipeline: id: "pipeline-2" ignore: 404 - + - do: + indices.delete_data_stream: + name: "data-stream-*" + expand_wildcards: all --- "Test first matching router terminates pipeline": - skip: @@ -252,3 +255,322 @@ teardown: - match: { _source.existing-field : true } - match: { _source.added-in-pipeline-before-reroute : true } - match: { _source.added-in-pipeline-after-reroute : true } + +--- +"Test data stream with lazy rollover obtains pipeline from template": + # This test starts with chain of reroutes, from data-stream-1, to data-stream-2, to data-stream-3 + # We then add higher priority templates that remove the reroute processors. Then we show that + # after a lazy rollover on data-stream-2, a document written to data-stream-1 still gets rerouted + # to data-steam-2, but not on to data-stream-3. Finally, after a lazy rollover on data-stream-1 + # causes the new template to also take effect on data-stream-1, and the last write goes directly + # into data-stream-1. Multiple reroute steps are tested because pipeline resolution using a + # different code path for initial index and indices after a reroute. + + # start with pipeline that reroutes from ds-1 to ds-2 + - do: + ingest.put_pipeline: + id: "reroute-1" + body: > + { + "processors": [ + { + "reroute" : {"destination": "data-stream-2"} + } + ] + } + - match: { acknowledged: true } + + # and pipeline that reroutes from ds-2 to ds-3 + - do: + ingest.put_pipeline: + id: "reroute-2" + body: > + { + "processors": [ + { + "reroute" : {"destination": "data-stream-3"} + } + ] + } + - match: { acknowledged: true } + + # set pipelines in templates + - do: + indices.put_index_template: + name: template-1 + body: + index_patterns: [ "data-stream-1"] + priority: 1 + data_stream: { } + template: + settings: + index.default_pipeline: "reroute-1" + - match: { acknowledged: true } + - do: + indices.put_index_template: + name: template-2 + body: + index_patterns: [ "data-stream-2"] + priority: 1 + data_stream: { } + template: + settings: + index.default_pipeline: "reroute-2" + - match: { acknowledged: true } + - do: + indices.put_index_template: + name: template_3 + body: + index_patterns: [ "data-stream-3" ] + priority: 1 + data_stream: { } + - match: { acknowledged: true } + + - do: + indices.create_data_stream: + name: data-stream-1 + - match: { acknowledged: true } + - do: + indices.create_data_stream: + name: data-stream-2 + - match: { acknowledged: true } + - do: + indices.create_data_stream: + name: data-stream-3 + - match: { acknowledged: true } + + # write to ds-1 + - do: + index: + index: data-stream-1 + body: + '@timestamp': '2020-12-12' + some-field: 1 + - do: + indices.refresh: + index: data-stream-3 + + # document is rerouted to ds-3 + - do: + search: + index: data-stream-3 + body: { query: { match_all: { } } } + - length: { hits.hits: 1 } + - match: { hits.hits.0._source.some-field: 1 } + + # add higher priority templates without reroute processors + - do: + indices.put_index_template: + name: template_4 + body: + index_patterns: [ "data-stream-1" ] + priority: 2 # higher priority + data_stream: { } + - match: { acknowledged: true } + - do: + indices.put_index_template: + name: template_5 + body: + index_patterns: [ "data-stream-2" ] + priority: 2 # higher priority + data_stream: { } + - match: { acknowledged: true } + + # write to ds-1 + - do: + index: + index: data-stream-1 + body: + '@timestamp': '2020-12-12' + some-field: 2 + - do: + indices.refresh: + index: data-stream-3 + + # still rerouted because ds-1 and ds-2 rolled over + - do: + search: + index: data-stream-3 + body: { query: { match_all: { } } } + - length: { hits.hits: 2 } + + # perform lazy rollover on ds-2 + - do: + indices.rollover: + alias: data-stream-2 + lazy: true + + # write to ds-1 + - do: + index: + index: data-stream-1 + body: + '@timestamp': '2020-12-12' + some-field: 3 + - do: + indices.refresh: + index: data-stream-2 + + # written to ds-2, as rerouted to ds-2, but not on to ds-3 + - do: + search: + index: data-stream-2 + body: { query: { match_all: { } } } + - length: { hits.hits: 1 } + - match: { hits.hits.0._source.some-field: 3 } + + # perform lazy rollover on 1 + - do: + indices.rollover: + alias: data-stream-1 + lazy: true + + # write to ds-1 + - do: + index: + index: data-stream-1 + body: + '@timestamp': '2020-12-12' + some-field: 4 + - do: + indices.refresh: + index: data-stream-1 + + # written to ds-1, as not rerouted to ds-2 + - do: + search: + index: data-stream-1 + body: { query: { match_all: { } } } + - length: { hits.hits: 1 } + - match: { hits.hits.0._source.some-field: 4 } + +--- +"Test remove then add reroute processor with and without lazy rollover": + # start with pipeline that reroutes from ds-1 to ds-2 + - do: + ingest.put_pipeline: + id: "reroute-1" + body: > + { + "processors": [ + { + "reroute" : {"destination": "data-stream-2"} + } + ] + } + - match: { acknowledged: true } + + # set pipelines in templates + - do: + indices.put_index_template: + name: template-1 + body: + index_patterns: [ "data-stream-1"] + priority: 1 + data_stream: { } + template: + settings: + index.default_pipeline: "reroute-1" + - match: { acknowledged: true } + - do: + indices.put_index_template: + name: template_2 + body: + index_patterns: [ "data-stream-2" ] + priority: 1 + data_stream: { } + - match: { acknowledged: true } + + - do: + indices.create_data_stream: + name: data-stream-1 + - match: { acknowledged: true } + + - do: + indices.create_data_stream: + name: data-stream-2 + - match: { acknowledged: true } + + # write to ds-1 + - do: + index: + index: data-stream-1 + body: + '@timestamp': '2020-12-12' + some-field: 1 + - do: + indices.refresh: + index: data-stream-2 + + # document is rerouted to ds-2 + - do: + search: + index: data-stream-2 + body: { query: { match_all: { } } } + - length: { hits.hits: 1 } + + # add higher priority templates without reroute processors + - do: + indices.put_index_template: + name: template_3 + body: + index_patterns: [ "data-stream-1" ] + priority: 2 # higher priority + data_stream: { } + - match: { acknowledged: true } + + # perform lazy rollover on ds-2 + - do: + indices.rollover: + alias: data-stream-1 + lazy: true + + # write to ds-1 + - do: + index: + index: data-stream-1 + body: + '@timestamp': '2020-12-12' + some-field: 2 + - do: + indices.refresh: + index: data-stream-1 + + # written to ds-1, as not rerouted to ds-2 + - do: + search: + index: data-stream-1 + body: { query: { match_all: { } } } + - length: { hits.hits: 1 } + + # add another higher priority templates with reroute processors + - do: + indices.put_index_template: + name: template-3 + body: + index_patterns: [ "data-stream-1" ] + priority: 3 + data_stream: { } + template: + settings: + index.default_pipeline: "reroute-1" + - match: { acknowledged: true } + + # don't do a lazy rollover + # write to ds-1 + - do: + index: + index: data-stream-1 + body: + '@timestamp': '2020-12-12' + some-field: 3 + - do: + indices.refresh: + index: data-stream-1 + + # because no lazy rollover, still no reroute processor + - do: + search: + index: data-stream-1 + body: { query: { match_all: { } } } + - length: { hits.hits: 2 } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index 111e4d72c57c6..e83bca4b661c9 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -228,10 +228,18 @@ private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor exec metadata = clusterService.state().getMetadata(); } + Map resolvedPipelineCache = new HashMap<>(); for (DocWriteRequest actionRequest : bulkRequest.requests) { IndexRequest indexRequest = getIndexWriteRequest(actionRequest); if (indexRequest != null) { - IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata); + if (indexRequest.isPipelineResolved() == false) { + var pipeline = resolvedPipelineCache.computeIfAbsent( + indexRequest.index(), + // TODO perhaps this should use `threadPool.absoluteTimeInMillis()`, but leaving as is for now. + (index) -> IngestService.resolvePipelines(actionRequest, indexRequest, metadata, System.currentTimeMillis()) + ); + IngestService.setPipelineOnRequest(indexRequest, pipeline); + } hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest); } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 6f78302c2de9c..3ff07c8f1c86d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -279,26 +279,60 @@ static void resolvePipelinesAndUpdateIndexRequest( final Metadata metadata, final long epochMillis ) { - if (indexRequest.isPipelineResolved()) { - return; + if (indexRequest.isPipelineResolved() == false) { + var pipelines = resolvePipelines(originalRequest, indexRequest, metadata, epochMillis); + setPipelineOnRequest(indexRequest, pipelines); } + } - /* - * Here we look for the pipelines associated with the index if the index exists. If the index does not exist we fall back to using - * templates to find the pipelines. - */ - final Pipelines pipelines = resolvePipelinesFromMetadata(originalRequest, indexRequest, metadata, epochMillis).or( - () -> resolvePipelinesFromIndexTemplates(indexRequest, metadata) - ).orElse(Pipelines.NO_PIPELINES_DEFINED); + static boolean isRolloverOnWrite(Metadata metadata, IndexRequest indexRequest) { + DataStream dataStream = metadata.dataStreams().get(indexRequest.index()); + if (dataStream == null) { + return false; + } + return dataStream.getBackingIndices().isRolloverOnWrite(); + } + /** + * Resolve the default and final pipelines from the cluster state metadata or index templates. + * + * @param originalRequest initial request + * @param indexRequest the index request, which could be different from the initial request if rerouted + * @param metadata cluster data metadata + * @param epochMillis current time for index name resolution + * @return the resolved pipelines + */ + public static Pipelines resolvePipelines( + final DocWriteRequest originalRequest, + final IndexRequest indexRequest, + final Metadata metadata, + final long epochMillis + ) { + if (isRolloverOnWrite(metadata, indexRequest)) { + return resolvePipelinesFromIndexTemplates(indexRequest, metadata) // + .orElse(Pipelines.NO_PIPELINES_DEFINED); + } else { + return resolvePipelinesFromMetadata(originalRequest, indexRequest, metadata, epochMillis) // + .or(() -> resolvePipelinesFromIndexTemplates(indexRequest, metadata)) // + .orElse(Pipelines.NO_PIPELINES_DEFINED); + } + } + + /** + * Set the request pipeline on the index request if present, otherwise set the default pipeline. + * Always set the final pipeline. + * @param indexRequest the index request + * @param resolvedPipelines default and final pipelines resolved from metadata and templates + */ + public static void setPipelineOnRequest(IndexRequest indexRequest, Pipelines resolvedPipelines) { // The pipeline coming as part of the request always has priority over the resolved one from metadata or templates String requestPipeline = indexRequest.getPipeline(); if (requestPipeline != null) { indexRequest.setPipeline(requestPipeline); } else { - indexRequest.setPipeline(pipelines.defaultPipeline); + indexRequest.setPipeline(resolvedPipelines.defaultPipeline); } - indexRequest.setFinalPipeline(pipelines.finalPipeline); + indexRequest.setFinalPipeline(resolvedPipelines.finalPipeline); indexRequest.isPipelineResolved(true); } @@ -1503,7 +1537,7 @@ public static boolean hasPipeline(IndexRequest indexRequest) { || NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false; } - private record Pipelines(String defaultPipeline, String finalPipeline) { + public record Pipelines(String defaultPipeline, String finalPipeline) { private static final Pipelines NO_PIPELINES_DEFINED = new Pipelines(NOOP_PIPELINE_NAME, NOOP_PIPELINE_NAME); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 799ea5c0a1c74..fe8f09a68045c 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -49,6 +49,7 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Strings; import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.VersionType; @@ -2644,6 +2645,144 @@ public void testResolveRequestOrDefaultPipelineAndFinalPipeline() { } } + public void testRolloverOnWrite() { + { // false if not data stream + IndexMetadata.Builder builder = IndexMetadata.builder("idx") + .settings(settings(IndexVersion.current())) + .numberOfShards(1) + .numberOfReplicas(0); + Metadata metadata = Metadata.builder().put(builder).build(); + IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline"); + assertFalse(IngestService.isRolloverOnWrite(metadata, indexRequest)); + } + + { // false if not rollover on write + var backingIndex = ".ds-data-stream-01"; + var indexUUID = randomUUID(); + + var dataStream = DataStream.builder( + "no-rollover-data-stream", + DataStream.DataStreamIndices.backingIndicesBuilder(List.of(new Index(backingIndex, indexUUID))) + .setRolloverOnWrite(false) + .build() + ).build(); + + Metadata metadata = Metadata.builder().dataStreams(Map.of(dataStream.getName(), dataStream), Map.of()).build(); + + IndexRequest indexRequest = new IndexRequest("no-rollover-data-stream"); + assertFalse(IngestService.isRolloverOnWrite(metadata, indexRequest)); + } + + { // true if rollover on write + var backingIndex = ".ds-data-stream-01"; + var indexUUID = randomUUID(); + + var dataStream = DataStream.builder( + "rollover-data-stream", + DataStream.DataStreamIndices.backingIndicesBuilder(List.of(new Index(backingIndex, indexUUID))) + .setRolloverOnWrite(true) + .build() + ).build(); + + Metadata metadata = Metadata.builder().dataStreams(Map.of(dataStream.getName(), dataStream), Map.of()).build(); + + IndexRequest indexRequest = new IndexRequest("rollover-data-stream"); + assertTrue(IngestService.isRolloverOnWrite(metadata, indexRequest)); + } + } + + public void testResolveFromTemplateIfRolloverOnWrite() { + { // if rolloverOnWrite is false, get pipeline from metadata + var backingIndex = ".ds-data-stream-01"; + var indexUUID = randomUUID(); + + var dataStream = DataStream.builder( + "no-rollover-data-stream", + DataStream.DataStreamIndices.backingIndicesBuilder(List.of(new Index(backingIndex, indexUUID))) + .setRolloverOnWrite(false) + .build() + ).build(); + + IndexMetadata indexMetadata = IndexMetadata.builder(backingIndex) + .settings( + settings(IndexVersion.current()).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "metadata-pipeline") + .put(IndexMetadata.SETTING_INDEX_UUID, indexUUID) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + Metadata metadata = Metadata.builder() + .indices(Map.of(backingIndex, indexMetadata)) + .dataStreams(Map.of(dataStream.getName(), dataStream), Map.of()) + .build(); + + IndexRequest indexRequest = new IndexRequest("no-rollover-data-stream"); + IngestService.resolvePipelinesAndUpdateIndexRequest(indexRequest, indexRequest, metadata); + assertTrue(hasPipeline(indexRequest)); + assertTrue(indexRequest.isPipelineResolved()); + assertThat(indexRequest.getPipeline(), equalTo("metadata-pipeline")); + } + + { // if rolloverOnWrite is true, get pipeline from template + var backingIndex = ".ds-data-stream-01"; + var indexUUID = randomUUID(); + + var dataStream = DataStream.builder( + "rollover-data-stream", + DataStream.DataStreamIndices.backingIndicesBuilder(List.of(new Index(backingIndex, indexUUID))) + .setRolloverOnWrite(true) + .build() + ).build(); + + IndexMetadata indexMetadata = IndexMetadata.builder(backingIndex) + .settings( + settings(IndexVersion.current()).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "metadata-pipeline") + .put(IndexMetadata.SETTING_INDEX_UUID, indexUUID) + ) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + IndexTemplateMetadata.Builder templateBuilder = IndexTemplateMetadata.builder("name1") + .patterns(List.of("rollover*")) + .settings(settings(IndexVersion.current()).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "template-pipeline")); + + Metadata metadata = Metadata.builder() + .put(templateBuilder) + .indices(Map.of(backingIndex, indexMetadata)) + .dataStreams(Map.of(dataStream.getName(), dataStream), Map.of()) + .build(); + + IndexRequest indexRequest = new IndexRequest("rollover-data-stream"); + IngestService.resolvePipelinesAndUpdateIndexRequest(indexRequest, indexRequest, metadata); + assertTrue(hasPipeline(indexRequest)); + assertTrue(indexRequest.isPipelineResolved()); + assertThat(indexRequest.getPipeline(), equalTo("template-pipeline")); + } + } + + public void testSetPipelineOnRequest() { + { + // with request pipeline + var indexRequest = new IndexRequest("idx").setPipeline("request"); + var pipelines = new IngestService.Pipelines("default", "final"); + IngestService.setPipelineOnRequest(indexRequest, pipelines); + assertTrue(indexRequest.isPipelineResolved()); + assertEquals(indexRequest.getPipeline(), "request"); + assertEquals(indexRequest.getFinalPipeline(), "final"); + } + { + // no request pipeline + var indexRequest = new IndexRequest("idx"); + var pipelines = new IngestService.Pipelines("default", "final"); + IngestService.setPipelineOnRequest(indexRequest, pipelines); + assertTrue(indexRequest.isPipelineResolved()); + assertEquals(indexRequest.getPipeline(), "default"); + assertEquals(indexRequest.getFinalPipeline(), "final"); + } + } + public void testUpdatingRandomPipelineWithoutChangesIsNoOp() throws Exception { var randomMap = randomMap(10, 50, IngestServiceTests::randomMapEntry);