diff --git a/modules/data-streams/build.gradle b/modules/data-streams/build.gradle index 10ee9bf842cc4..457c6e10aaf25 100644 --- a/modules/data-streams/build.gradle +++ b/modules/data-streams/build.gradle @@ -43,4 +43,32 @@ if (buildParams.isSnapshotBuild() == false) { tasks.named("yamlRestTestV7CompatTransform").configure({ task -> task.skipTest("data_stream/10_basic/Create hidden data stream", "warning does not exist for compatibility") + + // Failure store configuration changed on 8.18 (earlier versions behind feature flag) + task.skipTest("data_stream/10_basic/Create data stream with failure store", "Configuring the failure store via data stream templates is not supported anymore.") + task.skipTest("data_stream/10_basic/Delete data stream with failure store", "Configuring the failure store via data stream templates is not supported anymore.") + task.skipTest("data_stream/10_basic/Delete data stream with failure store uninitialized", "Configuring the failure store via data stream templates is not supported anymore.") + + task.skipTest("data_stream/30_auto_create_data_stream/Don't initialize failure store during data stream auto-creation on successful index", "Configuring the failure store via data stream templates is not supported anymore.") + + task.skipTest("data_stream/150_tsdb/TSDB failures go to failure store", "Configuring the failure store via data stream templates is not supported anymore.") + + task.skipTest("data_stream/170_modify_data_stream/Modify a data stream's failure store", "Configuring the failure store via data stream templates is not supported anymore.") + + task.skipTest("data_stream/190_failure_store_redirection/Failure redirects to original failure store during index change if final pipeline changes target", "Configuring the failure store via data stream templates is not supported anymore.") + task.skipTest("data_stream/190_failure_store_redirection/Ensure failure is redirected to correct failure store after a reroute processor", "Configuring the failure store via data stream templates is not supported anymore.") + task.skipTest("data_stream/190_failure_store_redirection/Test failure store status with bulk request", "Configuring the failure store via data stream templates is not supported anymore.") + task.skipTest("data_stream/190_failure_store_redirection/Redirect ingest failure in data stream to failure store", "Configuring the failure store via data stream templates is not supported anymore.") + task.skipTest("data_stream/190_failure_store_redirection/Failure redirects to correct failure store when pipeline loop is detected", "Configuring the failure store via data stream templates is not supported anymore.") + task.skipTest("data_stream/190_failure_store_redirection/Failure redirects to correct failure store when index loop is detected", "Configuring the failure store via data stream templates is not supported anymore.") + task.skipTest("data_stream/190_failure_store_redirection/Failure redirects to original failure store during index change if self referenced", "Configuring the failure store via data stream templates is not supported anymore.") + task.skipTest("data_stream/190_failure_store_redirection/Redirect shard failure in data stream to failure store", "Configuring the failure store via data stream templates is not supported anymore.") + task.skipTest("data_stream/190_failure_store_redirection/Version conflicts are not redirected to failure store", "Configuring the failure store via data stream templates is not supported anymore.") + + task.skipTest("data_stream/200_rollover_failure_store/Lazily roll over a data stream's failure store after an ingest failure", "Configuring the failure store via data stream templates is not supported anymore.") + task.skipTest("data_stream/200_rollover_failure_store/A failure store marked for lazy rollover should only be rolled over when there is a failure", "Configuring the failure store via data stream templates is not supported anymore.") + task.skipTest("data_stream/200_rollover_failure_store/Roll over a data stream's failure store without conditions", "Configuring the failure store via data stream templates is not supported anymore.") + task.skipTest("data_stream/200_rollover_failure_store/Lazily roll over a data stream's failure store after a shard failure", "Configuring the failure store via data stream templates is not supported anymore.") + task.skipTest("data_stream/200_rollover_failure_store/Don't roll over a data stream's failure store when conditions aren't met", "Configuring the failure store via data stream templates is not supported anymore.") + task.skipTest("data_stream/200_rollover_failure_store/Roll over a data stream's failure store with conditions", "Configuring the failure store via data stream templates is not supported anymore.") }) diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java index ff3dd2737f408..70218f17c67fe 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java @@ -67,6 +67,7 @@ import org.elasticsearch.cluster.metadata.DataStreamAction; import org.elasticsearch.cluster.metadata.DataStreamAlias; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; +import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadataStats; import org.elasticsearch.cluster.metadata.IndexWriteLoad; @@ -2447,9 +2448,10 @@ static void putComposableIndexTemplate( .mappings(mappings == null ? null : CompressedXContent.fromJSON(mappings)) .aliases(aliases) .lifecycle(lifecycle) + .dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(withFailureStore)) ) .metadata(metadata) - .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, withFailureStore)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) .build() ); client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet(); diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/IngestFailureStoreMetricsIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/IngestFailureStoreMetricsIT.java index c457e3b86184c..5e72de1a9d29a 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/IngestFailureStoreMetricsIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/IngestFailureStoreMetricsIT.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.common.bytes.BytesArray; @@ -286,8 +287,8 @@ private void putComposableIndexTemplate(boolean failureStore) throws IOException request.indexTemplate( ComposableIndexTemplate.builder() .indexPatterns(List.of(dataStream + "*")) - .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, failureStore)) - .template(new Template(null, new CompressedXContent(""" + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .template(Template.builder().mappings(new CompressedXContent(""" { "dynamic": false, "properties": { @@ -298,7 +299,7 @@ private void putComposableIndexTemplate(boolean failureStore) throws IOException "type": "long" } } - }"""), null)) + }""")).dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(failureStore))) .build() ); client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet(); diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java index 89c440f5edf8b..19067d85a6805 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java @@ -40,6 +40,7 @@ import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamAction; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; +import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -1226,9 +1227,10 @@ static void putComposableIndexTemplate( .settings(settings) .mappings(mappings == null ? null : CompressedXContent.fromJSON(mappings)) .lifecycle(lifecycle) + .dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(withFailureStore)) ) .metadata(metadata) - .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, withFailureStore)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) .build() ); client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet(); diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/DataStreamOptionsIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/DataStreamOptionsIT.java index 980cc32a12c68..de6b7a682324e 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/DataStreamOptionsIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/DataStreamOptionsIT.java @@ -11,12 +11,14 @@ import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; import org.junit.Before; import java.io.IOException; import java.util.List; import java.util.Map; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -40,10 +42,14 @@ public void setup() throws IOException { "template": { "settings": { "number_of_replicas": 0 + }, + "data_stream_options": { + "failure_store": { + "enabled": true + } } }, "data_stream": { - "failure_store": true } } """); @@ -59,12 +65,63 @@ public void setup() throws IOException { assertThat(dataStreams.size(), is(1)); Map dataStream = (Map) dataStreams.get(0); assertThat(dataStream.get("name"), equalTo(DATA_STREAM_NAME)); + assertThat(((Map) dataStream.get("failure_store")).get("enabled"), is(true)); List backingIndices = getIndices(dataStream); assertThat(backingIndices.size(), is(1)); List failureStore = getFailureStore(dataStream); assertThat(failureStore.size(), is(1)); } + public void testExplicitlyResetDataStreamOptions() throws IOException { + Request putComponentTemplateRequest = new Request("POST", "/_component_template/with-options"); + putComponentTemplateRequest.setJsonEntity(""" + { + "template": { + "data_stream_options": { + "failure_store": { + "enabled": true + } + } + } + } + """); + assertOK(client().performRequest(putComponentTemplateRequest)); + + Request invalidRequest = new Request("POST", "/_index_template/other-template"); + invalidRequest.setJsonEntity(""" + { + "index_patterns": ["something-else"], + "composed_of" : ["with-options"], + "template": { + "settings": { + "number_of_replicas": 0 + } + } + } + """); + Exception error = expectThrows(ResponseException.class, () -> client().performRequest(invalidRequest)); + assertThat( + error.getMessage(), + containsString("specifies data stream options that can only be used in combination with a data stream") + ); + + // Check that when we nullify the data stream options we can create use any component template in a non data stream template + Request otherRequest = new Request("POST", "/_index_template/other-template"); + otherRequest.setJsonEntity(""" + { + "index_patterns": ["something-else"], + "composed_of" : ["with-options"], + "template": { + "settings": { + "number_of_replicas": 0 + }, + "data_stream_options": null + } + } + """); + assertOK(client().performRequest(otherRequest)); + } + public void testEnableDisableFailureStore() throws IOException { { assertAcknowledged(client().performRequest(new Request("DELETE", "/_data_stream/" + DATA_STREAM_NAME + "/_options"))); diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java index 20ec26c0c9341..85b914be30b2c 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java @@ -42,10 +42,14 @@ public void setup() throws IOException { "template": { "settings": { "number_of_replicas": 0 + }, + "data_stream_options": { + "failure_store": { + "enabled": true + } } }, "data_stream": { - "failure_store": true } } """); diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml index 044ea90fec1af..0d19f555d10a4 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/10_basic.yml @@ -212,8 +212,12 @@ setup: --- "Create data stream with failure store": - requires: - cluster_features: ["gte_v8.15.0"] - reason: "data stream failure stores default settings changed in 8.15+" + test_runner_features: [ capabilities, allowed_warnings ] + reason: "Data stream failure stores config in templates was added in 8.18+" + capabilities: + - method: POST + path: /_index_template/{template} + capabilities: [ 'failure_store_in_template' ] - do: ingest.put_pipeline: @@ -256,8 +260,7 @@ setup: name: my-template4 body: index_patterns: [ failure-data-stream1, failure-data-stream2 ] - data_stream: - failure_store: true + data_stream: {} template: settings: index: @@ -269,6 +272,9 @@ setup: type: date count: type: long + data_stream_options: + failure_store: + enabled: true - do: indices.create_data_stream: @@ -632,8 +638,12 @@ setup: --- "Delete data stream with failure store": - requires: - cluster_features: ["gte_v8.15.0"] - reason: "data stream failure stores REST structure changed in 8.15+" + reason: "Data stream failure stores config in templates was added in 8.18+" + test_runner_features: [ allowed_warnings, capabilities ] + capabilities: + - method: POST + path: /_index_template/{template} + capabilities: [ 'failure_store_in_template' ] - do: allowed_warnings: @@ -642,8 +652,7 @@ setup: name: my-template4 body: index_patterns: [ failure-data-stream1 ] - data_stream: - failure_store: true + data_stream: {} template: mappings: properties: @@ -651,6 +660,9 @@ setup: type: date count: type: long + data_stream_options: + failure_store: + enabled: true - do: indices.create_data_stream: @@ -722,8 +734,12 @@ setup: --- "Delete data stream with failure store uninitialized": - requires: - cluster_features: ["gte_v8.15.0"] - reason: "data stream failure stores REST structure changed in 8.15+" + reason: "Data stream failure stores config in templates was added in 8.18+" + test_runner_features: [ capabilities, allowed_warnings ] + capabilities: + - method: POST + path: /_index_template/{template} + capabilities: [ 'failure_store_in_template' ] - do: allowed_warnings: @@ -732,8 +748,11 @@ setup: name: my-template4 body: index_patterns: [ failure-data-stream1 ] - data_stream: - failure_store: true + data_stream: {} + template: + data_stream_options: + failure_store: + enabled: true - do: indices.create_data_stream: diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml index 3fbf85ab1e702..9ea3bfefabdf8 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml @@ -185,9 +185,12 @@ index without timestamp: --- TSDB failures go to failure store: - requires: - cluster_features: ["data_stream.failure_store.tsdb_fix"] - reason: "tests tsdb failure store fixes in 8.16.0 that catch timestamp errors that happen earlier in the process and redirect them to the failure store." - + reason: "Data stream failure stores config in templates was added in 8.18+" + test_runner_features: [ capabilities, allowed_warnings ] + capabilities: + - method: POST + path: /_index_template/{template} + capabilities: [ 'failure_store_in_template' ] - do: allowed_warnings: - "index template [my-template2] has index patterns [fs-k8s*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation" @@ -195,8 +198,7 @@ TSDB failures go to failure store: name: my-template2 body: index_patterns: [ "fs-k8s*" ] - data_stream: - failure_store: true + data_stream: {} template: settings: index: @@ -207,6 +209,9 @@ TSDB failures go to failure store: time_series: start_time: 2021-04-28T00:00:00Z end_time: 2021-04-29T00:00:00Z + data_stream_options: + failure_store: + enabled: true mappings: properties: "@timestamp": diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml index 3c6d29d939226..13f79e95d99f4 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml @@ -92,9 +92,12 @@ --- "Modify a data stream's failure store": - requires: - cluster_features: ["gte_v8.15.0"] - reason: "data stream failure stores REST structure changed in 8.15+" - test_runner_features: [ "allowed_warnings" ] + reason: "Data stream failure stores config in templates was added in 8.18+" + test_runner_features: [ capabilities, allowed_warnings ] + capabilities: + - method: POST + path: /_index_template/{template} + capabilities: [ 'failure_store_in_template' ] - do: allowed_warnings: @@ -103,8 +106,7 @@ name: my-template body: index_patterns: [data-*] - data_stream: - failure_store: true + data_stream: {} template: mappings: properties: @@ -112,6 +114,9 @@ type: date count: type: long + data_stream_options: + failure_store: + enabled: true - do: indices.create_data_stream: diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml index 9b5a9dae8bc0a..2f6b7a0bff34b 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml @@ -1,3 +1,15 @@ +setup: + - requires: + reason: "Data stream options was added in 8.18+" + test_runner_features: [ capabilities, allowed_warnings, contains ] + capabilities: + - method: POST + path: /{index}/_doc + capabilities: [ 'failure_store_status' ] + - method: POST + path: /_index_template/{template} + capabilities: [ 'failure_store_in_template' ] + --- teardown: - do: @@ -32,13 +44,6 @@ teardown: --- "Redirect ingest failure in data stream to failure store": - - requires: - reason: "Failure store status was added in 8.16+" - test_runner_features: [capabilities, allowed_warnings, contains] - capabilities: - - method: POST - path: /{index}/_doc - capabilities: [ 'failure_store_status' ] - do: ingest.put_pipeline: id: "failing_pipeline" @@ -78,14 +83,16 @@ teardown: name: generic_logs_template body: index_patterns: logs-* - data_stream: - failure_store: true + data_stream: {} template: settings: number_of_shards: 1 number_of_replicas: 1 index: default_pipeline: "parent_failing_pipeline" + data_stream_options: + failure_store: + enabled: true - do: index: @@ -147,14 +154,6 @@ teardown: --- "Redirect shard failure in data stream to failure store": - - requires: - reason: "Failure store status was added in 8.16+" - test_runner_features: [ capabilities, allowed_warnings, contains ] - capabilities: - - method: POST - path: /{index}/_doc - capabilities: [ 'failure_store_status' ] - - do: allowed_warnings: - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation" @@ -162,8 +161,7 @@ teardown: name: generic_logs_template body: index_patterns: logs-* - data_stream: - failure_store: true + data_stream: {} template: settings: number_of_shards: 1 @@ -174,7 +172,9 @@ teardown: type: date count: type: long - + data_stream_options: + failure_store: + enabled: true - do: index: @@ -231,13 +231,6 @@ teardown: --- "Ensure failure is redirected to correct failure store after a reroute processor": - - requires: - test_runner_features: [allowed_warnings, capabilities] - reason: "Failure store status was added in 8.16+" - capabilities: - - method: POST - path: /{index}/_doc - capabilities: [ 'failure_store_status' ] - do: ingest.put_pipeline: id: "failing_pipeline" @@ -262,14 +255,16 @@ teardown: name: destination_template body: index_patterns: destination-data-stream - data_stream: - failure_store: true + data_stream: {} template: settings: number_of_shards: 1 number_of_replicas: 1 index: default_pipeline: "failing_pipeline" + data_stream_options: + failure_store: + enabled: true - do: indices.create_data_stream: @@ -331,11 +326,6 @@ teardown: --- "Failure redirects to original failure store during index change if self referenced": - - requires: - cluster_features: [ "gte_v8.15.0" ] - reason: "data stream failure stores REST structure changed in 8.15+" - test_runner_features: [ allowed_warnings, contains ] - - do: ingest.put_pipeline: id: "failing_pipeline" @@ -365,14 +355,16 @@ teardown: name: generic_logs_template body: index_patterns: logs-* - data_stream: - failure_store: true + data_stream: {} template: settings: number_of_shards: 1 number_of_replicas: 1 index: default_pipeline: "failing_pipeline" + data_stream_options: + failure_store: + enabled: true - do: index: @@ -430,14 +422,6 @@ teardown: --- "Failure redirects to original failure store during index change if final pipeline changes target": - - requires: - reason: "Failure store status was added in 8.16+" - test_runner_features: [ capabilities, allowed_warnings, contains ] - capabilities: - - method: POST - path: /{index}/_doc - capabilities: [ 'failure_store_status' ] - - do: ingest.put_pipeline: id: "change_index_pipeline" @@ -462,14 +446,16 @@ teardown: name: generic_logs_template body: index_patterns: logs-* - data_stream: - failure_store: true + data_stream: {} template: settings: number_of_shards: 1 number_of_replicas: 1 index: final_pipeline: "change_index_pipeline" + data_stream_options: + failure_store: + enabled: true - do: index: @@ -526,14 +512,6 @@ teardown: --- "Failure redirects to correct failure store when index loop is detected": - - requires: - reason: "Failure store status was added in 8.16+" - test_runner_features: [ capabilities, allowed_warnings, contains ] - capabilities: - - method: POST - path: /{index}/_doc - capabilities: [ 'failure_store_status' ] - - do: ingest.put_pipeline: id: "send_to_destination" @@ -575,14 +553,16 @@ teardown: name: generic_logs_template body: index_patterns: logs-* - data_stream: - failure_store: true + data_stream: {} template: settings: number_of_shards: 1 number_of_replicas: 1 index: default_pipeline: "send_to_destination" + data_stream_options: + failure_store: + enabled: true - do: allowed_warnings: @@ -591,14 +571,16 @@ teardown: name: destination_logs_template body: index_patterns: destination-* - data_stream: - failure_store: true + data_stream: {} template: settings: number_of_shards: 1 number_of_replicas: 1 index: default_pipeline: "send_back_to_original" + data_stream_options: + failure_store: + enabled: true - do: index: @@ -657,14 +639,6 @@ teardown: --- "Failure redirects to correct failure store when pipeline loop is detected": - - requires: - reason: "Failure store status was added in 8.16+" - test_runner_features: [ capabilities, allowed_warnings, contains ] - capabilities: - - method: POST - path: /{index}/_doc - capabilities: [ 'failure_store_status' ] - - do: ingest.put_pipeline: id: "step_1" @@ -706,14 +680,16 @@ teardown: name: generic_logs_template body: index_patterns: logs-* - data_stream: - failure_store: true + data_stream: {} template: settings: number_of_shards: 1 number_of_replicas: 1 index: default_pipeline: "step_1" + data_stream_options: + failure_store: + enabled: true - do: index: @@ -773,9 +749,6 @@ teardown: --- "Version conflicts are not redirected to failure store": - - requires: - test_runner_features: [ allowed_warnings] - - do: allowed_warnings: - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation" @@ -783,8 +756,7 @@ teardown: name: generic_logs_template body: index_patterns: logs-* - data_stream: - failure_store: true + data_stream: {} template: settings: number_of_shards: 1 @@ -795,6 +767,9 @@ teardown: type: date count: type: long + data_stream_options: + failure_store: + enabled: true - do: bulk: @@ -812,17 +787,6 @@ teardown: --- "Test failure store status with bulk request": - - requires: - test_runner_features: [ allowed_warnings, capabilities ] - reason: "Failure store status was added in 8.16+" - capabilities: - - method: POST - path: /_bulk - capabilities: [ 'failure_store_status' ] - - method: PUT - path: /_bulk - capabilities: [ 'failure_store_status' ] - - do: allowed_warnings: - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation" @@ -830,8 +794,7 @@ teardown: name: generic_logs_template body: index_patterns: logs-* - data_stream: - failure_store: true + data_stream: {} template: settings: number_of_shards: 1 @@ -842,6 +805,9 @@ teardown: type: date count: type: long + data_stream_options: + failure_store: + enabled: true - do: allowed_warnings: - "index template [no-fs] has index patterns [no-fs*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [no-fs] will take precedence during new index creation" @@ -849,8 +815,7 @@ teardown: name: no-fs body: index_patterns: no-fs* - data_stream: - failure_store: false + data_stream: {} template: settings: number_of_shards: 1 @@ -861,6 +826,9 @@ teardown: type: date count: type: long + data_stream_options: + failure_store: + enabled: false - do: diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/200_rollover_failure_store.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/200_rollover_failure_store.yml index 0742435f045fb..cc3a11ffde5e8 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/200_rollover_failure_store.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/200_rollover_failure_store.yml @@ -1,9 +1,15 @@ --- setup: - requires: - cluster_features: ["gte_v8.15.0"] - reason: "data stream failure stores REST structure changed in 8.15+" - test_runner_features: [allowed_warnings, contains, capabilities] + reason: "Data stream failure stores config in templates was added in 8.16+" + test_runner_features: [ capabilities, allowed_warnings ] + capabilities: + - method: POST + path: /_index_template/{template} + capabilities: [ 'failure_store_in_template' ] + - method: POST + path: /{index}/_rollover + capabilities: [ 'lazy-rollover-failure-store' ] - do: allowed_warnings: @@ -12,8 +18,7 @@ setup: name: my-template body: index_patterns: [data-*] - data_stream: - failure_store: true + data_stream: {} template: mappings: properties: @@ -21,6 +26,9 @@ setup: type: date count: type: long + data_stream_options: + failure_store: + enabled: true - do: indices.create_data_stream: @@ -145,14 +153,6 @@ teardown: --- "Lazily roll over a data stream's failure store after a shard failure": - - requires: - reason: "data stream failure store lazy rollover only supported in 8.15+" - test_runner_features: [allowed_warnings, capabilities] - capabilities: - - method: POST - path: /{index}/_rollover - capabilities: [lazy-rollover-failure-store] - # Initialize failure store - do: index: @@ -215,14 +215,6 @@ teardown: --- "Lazily roll over a data stream's failure store after an ingest failure": - - requires: - reason: "data stream failure store lazy rollover only supported in 8.15+" - test_runner_features: [allowed_warnings, capabilities] - capabilities: - - method: POST - path: /{index}/_rollover - capabilities: [lazy-rollover-failure-store] - - do: ingest.put_pipeline: id: "failing_pipeline" @@ -246,12 +238,14 @@ teardown: name: my-template body: index_patterns: [data-*] - data_stream: - failure_store: true + data_stream: {} template: settings: index: default_pipeline: "failing_pipeline" + data_stream_options: + failure_store: + enabled: true - do: indices.create_data_stream: diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/30_auto_create_data_stream.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/30_auto_create_data_stream.yml index 61d17c3d675cf..60500767213af 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/30_auto_create_data_stream.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/30_auto_create_data_stream.yml @@ -50,9 +50,12 @@ --- "Don't initialize failure store during data stream auto-creation on successful index": - requires: - cluster_features: ["gte_v8.15.0"] - reason: "data stream failure stores REST structure changed in 8.15+" - test_runner_features: allowed_warnings + reason: "Data stream failure stores config in templates was added in 8.18+" + test_runner_features: [allowed_warnings, capabilities] + capabilities: + - method: POST + path: /_index_template/{template} + capabilities: [ 'failure_store_in_template' ] - do: allowed_warnings: @@ -61,12 +64,14 @@ name: generic_logs_template body: index_patterns: logs-* - data_stream: - failure_store: true + data_stream: {} template: settings: number_of_shards: 1 number_of_replicas: 1 + data_stream_options: + failure_store: + enabled: true - do: index: diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 2811c841db7a3..825b49b50c4ed 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -141,6 +141,8 @@ static TransportVersion def(int id) { public static final TransportVersion LOGSDB_TELEMETRY_CUSTOM_CUTOFF_DATE = def(8_801_00_0); public static final TransportVersion SOURCE_MODE_TELEMETRY = def(8_802_00_0); public static final TransportVersion NEW_REFRESH_CLUSTER_BLOCK = def(8_803_00_0); + public static final TransportVersion RETRIES_AND_OPERATIONS_IN_BLOBSTORE_STATS = def(8_804_00_0); + public static final TransportVersion ADD_DATA_STREAM_OPTIONS_TO_TEMPLATES = def(8_805_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/template/post/SimulateIndexTemplateResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/template/post/SimulateIndexTemplateResponse.java index 80e6fbfe051a4..a521dac60e96a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/template/post/SimulateIndexTemplateResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/template/post/SimulateIndexTemplateResponse.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration; import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention; +import org.elasticsearch.cluster.metadata.ResettableValue; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -111,9 +112,9 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - if (this.resolvedTemplate != null) { + if (resolvedTemplate != null) { builder.field(TEMPLATE.getPreferredName()); - this.resolvedTemplate.toXContent(builder, params, rolloverConfiguration); + resolvedTemplate.toXContent(builder, ResettableValue.hideResetValues(params), rolloverConfiguration); } if (this.overlappingTemplates != null) { builder.startArray(OVERLAPPING.getPreferredName()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/template/post/TransportSimulateIndexTemplateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/template/post/TransportSimulateIndexTemplateAction.java index 94d9b87467ea8..5f98852148ed4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/template/post/TransportSimulateIndexTemplateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/template/post/TransportSimulateIndexTemplateAction.java @@ -61,6 +61,7 @@ import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.findConflictingV1Templates; import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.findConflictingV2Templates; import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.findV2Template; +import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.resolveDataStreamOptions; import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.resolveLifecycle; import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.resolveSettings; @@ -348,7 +349,13 @@ public static Template resolveTemplate( if (template.getDataStreamTemplate() != null && lifecycle == null && isDslOnlyMode) { lifecycle = DataStreamLifecycle.DEFAULT; } - return new Template(settings, mergedMapping, aliasesByName, lifecycle); + return new Template( + settings, + mergedMapping, + aliasesByName, + lifecycle, + resolveDataStreamOptions(simulatedState.metadata(), matchingTemplate) + ); } private static IndexLongFieldRange getEventIngestedRange(String indexName, ClusterState simulatedState) { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index cef68324e2a45..e2c73349b93ec 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamOptions; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; @@ -42,6 +43,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.core.Nullable; import org.elasticsearch.features.FeatureService; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.VersionType; @@ -638,12 +640,16 @@ private static Boolean resolveFailureStoreFromMetadata(String indexName, Metadat } /** - * Determines if an index name is associated with an index template that has a data stream failure store enabled. + * Determines if an index name is associated with an index template that has a data stream failure store enabled. Since failure store is + * a data stream feature, the method returns true/false only if it is a data stream template, otherwise null. * @param indexName The index name to check. * @param metadata Cluster state metadata. - * @return true if the given index name corresponds to an index template with a data stream failure store enabled. + * @return true the associated index template has failure store enabled, false if the failure store is disabled or it's not specified, + * and null if the template is not a data stream template. + * Visible for testing */ - private static Boolean resolveFailureStoreFromTemplate(String indexName, Metadata metadata) { + @Nullable + static Boolean resolveFailureStoreFromTemplate(String indexName, Metadata metadata) { if (indexName == null) { return null; } @@ -656,7 +662,11 @@ private static Boolean resolveFailureStoreFromTemplate(String indexName, Metadat ComposableIndexTemplate composableIndexTemplate = metadata.templatesV2().get(template); if (composableIndexTemplate.getDataStreamTemplate() != null) { // Check if the data stream has the failure store enabled - return composableIndexTemplate.getDataStreamTemplate().hasFailureStore(); + DataStreamOptions dataStreamOptions = MetadataIndexTemplateService.resolveDataStreamOptions( + composableIndexTemplate, + metadata.componentTemplates() + ).mapAndGet(DataStreamOptions.Template::toDataStreamOptions); + return dataStreamOptions != null && dataStreamOptions.isFailureStoreEnabled(); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java index ae7cff6312155..3dd50d4e386d3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java @@ -372,16 +372,17 @@ public static class DataStreamTemplate implements Writeable, ToXContentObject { private static final ParseField HIDDEN = new ParseField("hidden"); private static final ParseField ALLOW_CUSTOM_ROUTING = new ParseField("allow_custom_routing"); + + /** + * Use the {@link DataStreamFailureStore.Template#enabled()} instead + */ + @Deprecated(since = "8.18") private static final ParseField FAILURE_STORE = new ParseField("failure_store"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "data_stream_template", false, - args -> new DataStreamTemplate( - args[0] != null && (boolean) args[0], - args[1] != null && (boolean) args[1], - DataStream.isFailureStoreFeatureFlagEnabled() && args[2] != null && (boolean) args[2] - ) + args -> new DataStreamTemplate(args[0] != null && (boolean) args[0], args[1] != null && (boolean) args[1]) ); static { @@ -394,20 +395,14 @@ public static class DataStreamTemplate implements Writeable, ToXContentObject { private final boolean hidden; private final boolean allowCustomRouting; - private final boolean failureStore; public DataStreamTemplate() { - this(false, false, false); + this(false, false); } public DataStreamTemplate(boolean hidden, boolean allowCustomRouting) { - this(hidden, allowCustomRouting, false); - } - - public DataStreamTemplate(boolean hidden, boolean allowCustomRouting, boolean failureStore) { this.hidden = hidden; this.allowCustomRouting = allowCustomRouting; - this.failureStore = failureStore; } DataStreamTemplate(StreamInput in) throws IOException { @@ -425,10 +420,9 @@ public DataStreamTemplate(boolean hidden, boolean allowCustomRouting, boolean fa boolean value = in.readBoolean(); assert value == false : "expected false, because this used to be an optional enum that never got set"; } - if (in.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION)) { - failureStore = in.readBoolean(); - } else { - failureStore = false; + if (in.getTransportVersion() + .between(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION, TransportVersions.ADD_DATA_STREAM_OPTIONS_TO_TEMPLATES)) { + in.readBoolean(); } } @@ -458,10 +452,6 @@ public boolean isAllowCustomRouting() { return allowCustomRouting; } - public boolean hasFailureStore() { - return failureStore; - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(hidden); @@ -472,8 +462,11 @@ public void writeTo(StreamOutput out) throws IOException { // See comment in constructor. out.writeBoolean(false); } - if (out.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION)) { - out.writeBoolean(failureStore); + if (out.getTransportVersion() + .between(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION, TransportVersions.ADD_DATA_STREAM_OPTIONS_TO_TEMPLATES)) { + // Previous versions expect the failure store to be configured via the DataStreamTemplate. We add it here, so we don't break + // the serialisation, but we do not care to preserve the value because this feature is still behind a feature flag. + out.writeBoolean(false); } } @@ -482,9 +475,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field("hidden", hidden); builder.field(ALLOW_CUSTOM_ROUTING.getPreferredName(), allowCustomRouting); - if (DataStream.isFailureStoreFeatureFlagEnabled()) { - builder.field(FAILURE_STORE.getPreferredName(), failureStore); - } builder.endObject(); return builder; } @@ -494,12 +484,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; DataStreamTemplate that = (DataStreamTemplate) o; - return hidden == that.hidden && allowCustomRouting == that.allowCustomRouting && failureStore == that.failureStore; + return hidden == that.hidden && allowCustomRouting == that.allowCustomRouting; } @Override public int hashCode() { - return Objects.hash(hidden, allowCustomRouting, failureStore); + return Objects.hash(hidden, allowCustomRouting); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 979434950cf7a..1c6206a4815eb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -424,7 +424,7 @@ public boolean isAllowCustomRouting() { * @return true, if the user has explicitly enabled the failure store. */ public boolean isFailureStoreEnabled() { - return dataStreamOptions.failureStore() != null && dataStreamOptions.failureStore().isExplicitlyEnabled(); + return dataStreamOptions.isFailureStoreEnabled(); } @Nullable @@ -1188,6 +1188,7 @@ public void writeTo(StreamOutput out) throws IOException { ); // The fields behind the feature flag should always be last. if (DataStream.isFailureStoreFeatureFlagEnabled()) { + // Should be removed after backport PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), FAILURE_STORE_FIELD); PARSER.declareObjectArray( ConstructingObjectParser.optionalConstructorArg(), diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamFailureStore.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamFailureStore.java index e9d32594fa833..5a6217eea8f7b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamFailureStore.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamFailureStore.java @@ -14,7 +14,9 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; @@ -27,11 +29,13 @@ * supports the following configurations only explicitly enabling or disabling the failure store */ public record DataStreamFailureStore(Boolean enabled) implements SimpleDiffable, ToXContentObject { + public static final String FAILURE_STORE = "failure_store"; + public static final String ENABLED = "enabled"; - public static final ParseField ENABLED_FIELD = new ParseField("enabled"); + public static final ParseField ENABLED_FIELD = new ParseField(ENABLED); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - "failure_store", + FAILURE_STORE, false, (args, unused) -> new DataStreamFailureStore((Boolean) args[0]) ); @@ -59,13 +63,6 @@ public static Diff readDiffFrom(StreamInput in) throws I return SimpleDiffable.readDiffFrom(DataStreamFailureStore::new, in); } - /** - * @return iff the user has explicitly enabled the failure store - */ - public boolean isExplicitlyEnabled() { - return enabled != null && enabled; - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalBoolean(enabled); @@ -89,4 +86,80 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public static DataStreamFailureStore fromXContent(XContentParser parser) throws IOException { return PARSER.parse(parser, null); } + + /** + * This class is only used in template configuration. It wraps the fields of {@link DataStreamFailureStore} with {@link ResettableValue} + * to allow a user to signal when they want to reset any previously encountered values during template composition. Furthermore, it + * provides the method {@link #merge(Template, Template)} that dictates how two templates can be composed. + */ + public record Template(ResettableValue enabled) implements Writeable, ToXContentObject { + + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "failure_store_template", + false, + (args, unused) -> new Template(args[0] == null ? ResettableValue.undefined() : (ResettableValue) args[0]) + ); + + static { + PARSER.declareField( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> p.currentToken() == XContentParser.Token.VALUE_NULL + ? ResettableValue.reset() + : ResettableValue.create(p.booleanValue()), + ENABLED_FIELD, + ObjectParser.ValueType.BOOLEAN_OR_NULL + ); + } + + public Template { + if (enabled.get() == null) { + throw new IllegalArgumentException("Failure store configuration should have at least one non-null configuration value."); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + ResettableValue.write(out, enabled, StreamOutput::writeBoolean); + } + + public static Template read(StreamInput in) throws IOException { + ResettableValue enabled = ResettableValue.read(in, StreamInput::readBoolean); + return new Template(enabled); + } + + /** + * Converts the template to XContent, depending on the XContent.Params set by {@link ResettableValue#hideResetValues(Params)} + * it may or may not display any explicit nulls when the value is to be reset. + */ + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + enabled.toXContent(builder, params, ENABLED_FIELD.getPreferredName()); + builder.endObject(); + return builder; + } + + public static Template fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + /** + * Returns a template which has the value of the initial template updated with the values of the update. + * Note: for now it's a trivial composition because we have only one non-null field. + * @return the composed template + */ + public static Template merge(Template ignored, Template update) { + return update; + } + + public DataStreamFailureStore toFailureStore() { + return new DataStreamFailureStore(enabled.get()); + } + + @Override + public String toString() { + return Strings.toString(this, true, true); + } + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamOptions.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamOptions.java index 9cd4e2625e2ba..51e13c05e6892 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamOptions.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamOptions.java @@ -14,9 +14,9 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.core.Nullable; import org.elasticsearch.xcontent.ConstructingObjectParser; -import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; @@ -24,6 +24,8 @@ import java.io.IOException; +import static org.elasticsearch.cluster.metadata.DataStreamFailureStore.FAILURE_STORE; + /** * Holds data stream dedicated configuration options such as failure store, (in the future lifecycle). Currently, it * supports the following configurations: @@ -34,10 +36,10 @@ public record DataStreamOptions(@Nullable DataStreamFailureStore failureStore) SimpleDiffable, ToXContentObject { - public static final ParseField FAILURE_STORE_FIELD = new ParseField("failure_store"); + public static final ParseField FAILURE_STORE_FIELD = new ParseField(FAILURE_STORE); public static final DataStreamOptions FAILURE_STORE_ENABLED = new DataStreamOptions(new DataStreamFailureStore(true)); public static final DataStreamOptions FAILURE_STORE_DISABLED = new DataStreamOptions(new DataStreamFailureStore(false)); - public static final DataStreamOptions EMPTY = new DataStreamOptions(); + public static final DataStreamOptions EMPTY = new DataStreamOptions(null); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "options", @@ -46,18 +48,13 @@ public record DataStreamOptions(@Nullable DataStreamFailureStore failureStore) ); static { - PARSER.declareField( + PARSER.declareObject( ConstructingObjectParser.optionalConstructorArg(), (p, c) -> DataStreamFailureStore.fromXContent(p), - FAILURE_STORE_FIELD, - ObjectParser.ValueType.OBJECT_OR_NULL + FAILURE_STORE_FIELD ); } - public DataStreamOptions() { - this(null); - } - public static DataStreamOptions read(StreamInput in) throws IOException { return new DataStreamOptions(in.readOptionalWriteable(DataStreamFailureStore::new)); } @@ -66,8 +63,21 @@ public static Diff readDiffFrom(StreamInput in) throws IOExce return SimpleDiffable.readDiffFrom(DataStreamOptions::read, in); } + /** + * @return true if none of the options are defined + */ public boolean isEmpty() { - return this.equals(EMPTY); + return failureStore == null; + } + + /** + * Determines if this data stream has its failure store enabled or not. Currently, the failure store + * is enabled only when a user has explicitly requested it. + * + * @return true, if the user has explicitly enabled the failure store. + */ + public boolean isFailureStoreEnabled() { + return failureStore != null && Boolean.TRUE.equals(failureStore.enabled()); } @Override @@ -93,4 +103,100 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public static DataStreamOptions fromXContent(XContentParser parser) throws IOException { return PARSER.parse(parser, null); } + + /** + * This class is only used in template configuration. It wraps the fields of {@link DataStreamOptions} with {@link ResettableValue} + * to allow a user to signal when they want to reset any previously encountered values during template composition. Furthermore, it + * provides the {@link Template.Builder} that dictates how two templates can be composed. + */ + public record Template(ResettableValue failureStore) implements Writeable, ToXContentObject { + public static final Template EMPTY = new Template(ResettableValue.undefined()); + + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "data_stream_options_template", + false, + (args, unused) -> new Template( + args[0] == null ? ResettableValue.undefined() : (ResettableValue) args[0] + ) + ); + + static { + PARSER.declareObjectOrNull( + ConstructingObjectParser.optionalConstructorArg(), + (p, s) -> ResettableValue.create(DataStreamFailureStore.Template.fromXContent(p)), + ResettableValue.reset(), + FAILURE_STORE_FIELD + ); + } + + public Template { + assert failureStore != null : "Template does not accept null values, please use Resettable.undefined()"; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + ResettableValue.write(out, failureStore, (o, v) -> v.writeTo(o)); + } + + public static Template read(StreamInput in) throws IOException { + ResettableValue failureStore = ResettableValue.read(in, DataStreamFailureStore.Template::read); + return new Template(failureStore); + } + + public static Template fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + /** + * Converts the template to XContent, depending on the {@param params} set by {@link ResettableValue#hideResetValues(Params)} + * it may or may not display any explicit nulls when the value is to be reset. + */ + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + failureStore.toXContent(builder, params, FAILURE_STORE_FIELD.getPreferredName()); + builder.endObject(); + return builder; + } + + public DataStreamOptions toDataStreamOptions() { + return new DataStreamOptions(failureStore.mapAndGet(DataStreamFailureStore.Template::toFailureStore)); + } + + public static Builder builder(Template template) { + return new Builder(template); + } + + /** + * Builds and composes a data stream template. + */ + public static class Builder { + private ResettableValue failureStore = ResettableValue.undefined(); + + public Builder(Template template) { + if (template != null) { + failureStore = template.failureStore(); + } + } + + /** + * Updates the current failure store configuration with the provided value. This is not a replacement necessarily, if both + * instance contain data the configurations are merged. + */ + public Builder updateFailureStore(ResettableValue newFailureStore) { + failureStore = ResettableValue.merge(failureStore, newFailureStore, DataStreamFailureStore.Template::merge); + return this; + } + + public Template build() { + return new Template(failureStore); + } + } + + @Override + public String toString() { + return Strings.toString(this, true, true); + } + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index 5dbf4da6f376f..0de87c7226380 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -261,11 +261,16 @@ static ClusterState createDataStream( // This is not a problem as both have different prefixes (`.ds-` vs `.fs-`) and both will be using the same `generation` field // when rolling over in the future. final long initialGeneration = 1; + ResettableValue dataStreamOptionsTemplate = isSystem + ? MetadataIndexTemplateService.resolveDataStreamOptions(template, systemDataStreamDescriptor.getComponentTemplates()) + : MetadataIndexTemplateService.resolveDataStreamOptions(template, metadata.componentTemplates()); + final DataStreamOptions dataStreamOptions = dataStreamOptionsTemplate.mapAndGet(DataStreamOptions.Template::toDataStreamOptions); + var isFailureStoreEnabled = dataStreamOptions != null && dataStreamOptions.isFailureStoreEnabled(); // If we need to create a failure store, do so first. Do not reroute during the creation since we will do // that as part of creating the backing index if required. IndexMetadata failureStoreIndex = null; - if (template.getDataStreamTemplate().hasFailureStore() && initializeFailureStore) { + if (isFailureStoreEnabled && initializeFailureStore) { if (isSystem) { throw new IllegalArgumentException("Failure stores are not supported on system data streams"); } @@ -303,7 +308,7 @@ static ClusterState createDataStream( } assert writeIndex != null; assert writeIndex.mapping() != null : "no mapping found for backing index [" + writeIndex.getIndex().getName() + "]"; - assert template.getDataStreamTemplate().hasFailureStore() == false || initializeFailureStore == false || failureStoreIndex != null + assert isFailureStoreEnabled == false || initializeFailureStore == false || failureStoreIndex != null : "failure store should have an initial index"; assert failureStoreIndex == null || failureStoreIndex.mapping() != null : "no mapping found for failure store [" + failureStoreIndex.getIndex().getName() + "]"; @@ -329,7 +334,7 @@ static ClusterState createDataStream( template.getDataStreamTemplate().isAllowCustomRouting(), indexMode, lifecycle == null && isDslOnlyMode ? DataStreamLifecycle.DEFAULT : lifecycle, - template.getDataStreamTemplate().hasFailureStore() ? DataStreamOptions.FAILURE_STORE_ENABLED : DataStreamOptions.EMPTY, + dataStreamOptions, new DataStream.DataStreamIndices(DataStream.BACKING_INDEX_PREFIX, dsBackingIndices, false, null), // If the failure store shouldn't be initialized on data stream creation, we're marking it for "lazy rollover", which will // initialize the failure store on first write. diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java index 3878a3329b634..7f8b87d2d3f48 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java @@ -312,12 +312,7 @@ public ClusterState addComponentTemplate( } } - final Template finalTemplate = new Template( - finalSettings, - wrappedMappings, - template.template().aliases(), - template.template().lifecycle() - ); + final Template finalTemplate = Template.builder(template.template()).settings(finalSettings).mappings(wrappedMappings).build(); final ComponentTemplate finalComponentTemplate = new ComponentTemplate( finalTemplate, template.version(), @@ -348,6 +343,7 @@ public ClusterState addComponentTemplate( composableTemplate, globalRetentionSettings.get() ); + validateDataStreamOptions(tempStateWithComponentTemplateAdded.metadata(), composableTemplateName, composableTemplate); validateIndexTemplateV2(composableTemplateName, composableTemplate, tempStateWithComponentTemplateAdded); } catch (Exception e) { if (validationFailure == null) { @@ -629,7 +625,7 @@ public ClusterState addIndexTemplateV2( // adjusted (to add _doc) and it should be validated CompressedXContent mappings = innerTemplate.mappings(); CompressedXContent wrappedMappings = wrapMappingsIfNecessary(mappings, xContentRegistry); - final Template finalTemplate = new Template(finalSettings, wrappedMappings, innerTemplate.aliases(), innerTemplate.lifecycle()); + final Template finalTemplate = Template.builder(innerTemplate).settings(finalSettings).mappings(wrappedMappings).build(); finalIndexTemplate = template.toBuilder().template(finalTemplate).build(); } @@ -690,7 +686,8 @@ public static Map> v2TemplateOverlaps( return overlaps; } - private void validateIndexTemplateV2(String name, ComposableIndexTemplate indexTemplate, ClusterState currentState) { + // Visibility for testing + void validateIndexTemplateV2(String name, ComposableIndexTemplate indexTemplate, ClusterState currentState) { // Workaround for the fact that start_time and end_time are injected by the MetadataCreateDataStreamService upon creation, // but when validating templates that create data streams the MetadataCreateDataStreamService isn't used. var finalTemplate = indexTemplate.template(); @@ -726,6 +723,7 @@ private void validateIndexTemplateV2(String name, ComposableIndexTemplate indexT validate(name, templateToValidate); validateDataStreamsStillReferenced(currentState, name, templateToValidate); validateLifecycle(currentState.metadata(), name, templateToValidate, globalRetentionSettings.get()); + validateDataStreamOptions(currentState.metadata(), name, templateToValidate); if (templateToValidate.isDeprecated() == false) { validateUseOfDeprecatedComponentTemplates(name, templateToValidate, currentState.metadata().componentTemplates()); @@ -819,6 +817,20 @@ static void validateLifecycle( } } + // Visible for testing + static void validateDataStreamOptions(Metadata metadata, String indexTemplateName, ComposableIndexTemplate template) { + ResettableValue dataStreamOptions = resolveDataStreamOptions(template, metadata.componentTemplates()); + if (dataStreamOptions.get() != null) { + if (template.getDataStreamTemplate() == null) { + throw new IllegalArgumentException( + "index template [" + + indexTemplateName + + "] specifies data stream options that can only be used in combination with a data stream" + ); + } + } + } + /** * Validate that by changing or adding {@code newTemplate}, there are * no unreferenced data streams. Note that this scenario is still possible @@ -1561,7 +1573,7 @@ static List> resolveAliases( public static DataStreamLifecycle resolveLifecycle(final Metadata metadata, final String templateName) { final ComposableIndexTemplate template = metadata.templatesV2().get(templateName); assert template != null - : "attempted to resolve settings for a template [" + templateName + "] that did not exist in the cluster state"; + : "attempted to resolve lifecycle for a template [" + templateName + "] that did not exist in the cluster state"; if (template == null) { return null; } @@ -1653,6 +1665,81 @@ public static DataStreamLifecycle composeDataLifecycles(List} object + */ + public static ResettableValue resolveDataStreamOptions(final Metadata metadata, final String templateName) { + final ComposableIndexTemplate template = metadata.templatesV2().get(templateName); + assert template != null + : "attempted to resolve data stream options for a template [" + templateName + "] that did not exist in the cluster state"; + if (template == null) { + return ResettableValue.undefined(); + } + return resolveDataStreamOptions(template, metadata.componentTemplates()); + } + + /** + * Resolve the provided v2 template and component templates into a {@link ResettableValue} object + */ + public static ResettableValue resolveDataStreamOptions( + ComposableIndexTemplate template, + Map componentTemplates + ) { + Objects.requireNonNull(template, "attempted to resolve data stream for a null template"); + Objects.requireNonNull(componentTemplates, "attempted to resolve data stream options with null component templates"); + + List> dataStreamOptionsList = new ArrayList<>(); + for (String componentTemplateName : template.composedOf()) { + if (componentTemplates.containsKey(componentTemplateName) == false) { + continue; + } + ResettableValue dataStreamOptions = componentTemplates.get(componentTemplateName) + .template() + .resettableDataStreamOptions(); + if (dataStreamOptions.isDefined()) { + dataStreamOptionsList.add(dataStreamOptions); + } + } + // The actual index template's data stream options have the highest precedence. + if (template.template() != null && template.template().resettableDataStreamOptions().isDefined()) { + dataStreamOptionsList.add(template.template().resettableDataStreamOptions()); + } + return composeDataStreamOptions(dataStreamOptionsList); + } + + /** + * This method composes a series of data streams options to a final one. Since currently the data stream options + * contains only the failure store configuration which also contains only one field, the composition is a bit trivial. + * But we introduce the mechanics that will help extend it really easily. + * @param dataStreamOptionsList a sorted list of data stream options in the order that they will be composed + * @return the final data stream option configuration + */ + public static ResettableValue composeDataStreamOptions( + List> dataStreamOptionsList + ) { + if (dataStreamOptionsList.isEmpty()) { + return ResettableValue.undefined(); + } + DataStreamOptions.Template.Builder builder = null; + for (ResettableValue current : dataStreamOptionsList) { + if (current.isDefined() == false) { + continue; + } + if (current.shouldReset()) { + builder = null; + } else { + DataStreamOptions.Template currentTemplate = current.get(); + if (builder == null) { + builder = DataStreamOptions.Template.builder(currentTemplate); + } else { + // Currently failure store has only one field that needs to be defined so the composing of the failure store is trivial + builder.updateFailureStore(currentTemplate.failureStore()); + } + } + } + return builder == null ? ResettableValue.undefined() : ResettableValue.create(builder.build()); + } + /** * Given a state and a composable template, validate that the final composite template * generated by the composable template and all of its component templates contains valid diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ResettableValue.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ResettableValue.java new file mode 100644 index 0000000000000..4f38d2b8386a6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ResettableValue.java @@ -0,0 +1,216 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * This class holds a value of type @{param T} that can be in one of 3 states: + * - It has a concrete value, or + * - It is missing, or + * - It is meant to reset any other when it is composed with it. + * It is mainly used in template composition to capture the case when the user wished to reset any previous values. + * @param + */ +public class ResettableValue { + private static final ResettableValue RESET = new ResettableValue<>(true, null); + private static final ResettableValue UNDEFINED = new ResettableValue<>(false, null); + private static final String DISPLAY_RESET_VALUES = "display_reset"; + private static final Map HIDE_RESET_VALUES_PARAMS = Map.of(DISPLAY_RESET_VALUES, "false"); + + private final T value; + private final boolean isDefined; + + /** + * @return the reset state, meaning that this value is explicitly requested to be reset + */ + public static ResettableValue reset() { + @SuppressWarnings("unchecked") + ResettableValue t = (ResettableValue) RESET; + return t; + } + + /** + * @return the undefined state, meaning that this value has not been specified + */ + public static ResettableValue undefined() { + @SuppressWarnings("unchecked") + ResettableValue t = (ResettableValue) UNDEFINED; + return t; + } + + /** + * Wraps a value, if the value is null, it returns {@link #undefined()} + */ + public static ResettableValue create(T value) { + if (value == null) { + return undefined(); + } + return new ResettableValue<>(true, value); + } + + private ResettableValue(boolean isDefined, T value) { + this.isDefined = isDefined; + this.value = value; + } + + /** + * @return true if the state of this is reset + */ + public boolean shouldReset() { + return isDefined && value == null; + } + + /** + * @return true when the value is defined, either with a concrete value or reset. + */ + public boolean isDefined() { + return isDefined; + } + + /** + * @return the concrete value or null if it is in undefined or reset states. + */ + @Nullable + public T get() { + return value; + } + + /** + * Writes a single optional explicitly nullable value. This method is in direct relation with the + * {@link #read(StreamInput, Writeable.Reader)} which reads the respective value. It's the + * responsibility of the caller to preserve order of the fields and their backwards compatibility. + * + * @throws IOException + */ + static void write(StreamOutput out, ResettableValue value, Writeable.Writer writer) throws IOException { + out.writeBoolean(value.isDefined); + if (value.isDefined) { + out.writeBoolean(value.shouldReset()); + if (value.shouldReset() == false) { + writer.write(out, value.get()); + } + } + } + + /** + * Reads a single optional and explicitly nullable value. This method is in direct relation with the + * {@link #write(StreamOutput, ResettableValue, Writeable.Writer)} which writes the respective value. It's the + * responsibility of the caller to preserve order of the fields and their backwards compatibility. + * + * @throws IOException + */ + @Nullable + static ResettableValue read(StreamInput in, Writeable.Reader reader) throws IOException { + boolean isDefined = in.readBoolean(); + if (isDefined == false) { + return ResettableValue.undefined(); + } + boolean shouldReset = in.readBoolean(); + if (shouldReset) { + return ResettableValue.reset(); + } + T value = reader.read(in); + return ResettableValue.create(value); + } + + /** + * Gets the value and applies the function {@param f} when the value is not null. Slightly more efficient than + * this.map(f).get(). + */ + public U mapAndGet(Function f) { + if (isDefined() == false || shouldReset()) { + return null; + } else { + return f.apply(value); + } + } + + public ResettableValue map(Function mapper) { + Objects.requireNonNull(mapper); + if (isDefined == false) { + return ResettableValue.undefined(); + } + if (shouldReset()) { + return reset(); + } + return ResettableValue.create(mapper.apply(value)); + } + + /** + * Ιt merges the values of the ResettableValue's when they are defined using the provided mergeFunction. + */ + public static ResettableValue merge(ResettableValue initial, ResettableValue update, BiFunction mergeFunction) { + if (update.shouldReset()) { + return undefined(); + } + if (update.isDefined() == false) { + return initial; + } + if (initial.isDefined() == false || initial.shouldReset()) { + return update; + } + // Because we checked that's defined and not in reset state, we can directly apply the merge function. + return ResettableValue.create(mergeFunction.apply(initial.value, update.value)); + } + + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params, String field) throws IOException { + return toXContent(builder, params, field, Function.identity()); + } + + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params, String field, Function transformValue) + throws IOException { + if (isDefined) { + if (value != null) { + builder.field(field, transformValue.apply(value)); + } else if (ResettableValue.shouldDisplayResetValue(params)) { + builder.nullField(field); + } + } + return builder; + } + + public static boolean shouldDisplayResetValue(ToXContent.Params params) { + return params.paramAsBoolean(DISPLAY_RESET_VALUES, true); + } + + public static ToXContent.Params hideResetValues(ToXContent.Params params) { + return new ToXContent.DelegatingMapParams(HIDE_RESET_VALUES_PARAMS, params); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ResettableValue that = (ResettableValue) o; + return isDefined == that.isDefined && Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(value, isDefined); + } + + @Override + public String toString() { + return "ResettableValue{" + "value=" + value + ", isDefined=" + isDefined + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Template.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Template.java index 0a9e79284ced6..7d354768ca987 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Template.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Template.java @@ -47,12 +47,19 @@ public class Template implements SimpleDiffable