Skip to content

Commit

Permalink
Introduce data stream options via dedicated classes (elastic#117357)
Browse files Browse the repository at this point in the history
In this PR we move the failure store configuration from the
`data_stream` in a composable index template to the `template` section
under the `data_stream_options`. The reason for this change is that we
want to be able to compose the failure store configuration via component
templates.

**Previous composable index template**

```
{
  "index_patterns": [....],
  "template": { 
      ......
  },
  "data_stream": {
    "failure_store": true
  }
}
```

**New composable index template**

```
{
  "index_patterns": [....],
  "template": { 
      "data_stream_options": {
        "failure_store": {
          "enabled": true
        }
      },
      ......
  },
  "data_stream": {}
}
```

**Composition logic**

| Component Template A | Composable Index Template | Resolved data
stream options |
|-------------------------|-----------------------------|-------------------------------|
| `{"failure_store": {"enabled": true}}` | - |`{"failure_store":
{"enabled": true}}`| | `{"failure_store": {"enabled": true}}` |
`{"failure_store": {"enabled": false}}` |`{"failure_store": {"enabled":
false}}`| | - | `{"failure_store": {"enabled": true}}`
|`{"failure_store": {"enabled": true}}`| | `{"failure_store": null}` |
`{"failure_store": {"enabled": true}}` |`{"failure_store": {"enabled":
true}}`| | `{"failure_store": {"enabled": true}}` | `{"failure_store":
null}` |`{}`|

More context and discussions can be found in the comments of
elastic#113863.
  • Loading branch information
gmarouli authored Dec 10, 2024
1 parent 2381559 commit 5997fc3
Show file tree
Hide file tree
Showing 44 changed files with 1,458 additions and 264 deletions.
28 changes: 28 additions & 0 deletions modules/data-streams/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,32 @@ if (buildParams.isSnapshotBuild() == false) {

tasks.named("yamlRestCompatTestTransform").configure({ task ->
task.skipTest("data_stream/10_basic/Create hidden data stream", "warning does not exist for compatibility")

// Failure store configuration changed on 8.18 (earlier versions behind feature flag)
task.skipTest("data_stream/10_basic/Create data stream with failure store", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/10_basic/Delete data stream with failure store", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/10_basic/Delete data stream with failure store uninitialized", "Configuring the failure store via data stream templates is not supported anymore.")

task.skipTest("data_stream/30_auto_create_data_stream/Don't initialize failure store during data stream auto-creation on successful index", "Configuring the failure store via data stream templates is not supported anymore.")

task.skipTest("data_stream/150_tsdb/TSDB failures go to failure store", "Configuring the failure store via data stream templates is not supported anymore.")

task.skipTest("data_stream/170_modify_data_stream/Modify a data stream's failure store", "Configuring the failure store via data stream templates is not supported anymore.")

task.skipTest("data_stream/190_failure_store_redirection/Failure redirects to original failure store during index change if final pipeline changes target", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/190_failure_store_redirection/Ensure failure is redirected to correct failure store after a reroute processor", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/190_failure_store_redirection/Test failure store status with bulk request", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/190_failure_store_redirection/Redirect ingest failure in data stream to failure store", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/190_failure_store_redirection/Failure redirects to correct failure store when pipeline loop is detected", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/190_failure_store_redirection/Failure redirects to correct failure store when index loop is detected", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/190_failure_store_redirection/Failure redirects to original failure store during index change if self referenced", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/190_failure_store_redirection/Redirect shard failure in data stream to failure store", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/190_failure_store_redirection/Version conflicts are not redirected to failure store", "Configuring the failure store via data stream templates is not supported anymore.")

task.skipTest("data_stream/200_rollover_failure_store/Lazily roll over a data stream's failure store after an ingest failure", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/200_rollover_failure_store/A failure store marked for lazy rollover should only be rolled over when there is a failure", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/200_rollover_failure_store/Roll over a data stream's failure store without conditions", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/200_rollover_failure_store/Lazily roll over a data stream's failure store after a shard failure", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/200_rollover_failure_store/Don't roll over a data stream's failure store when conditions aren't met", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/200_rollover_failure_store/Roll over a data stream's failure store with conditions", "Configuring the failure store via data stream templates is not supported anymore.")
})
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.cluster.metadata.DataStreamAlias;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadataStats;
import org.elasticsearch.cluster.metadata.IndexWriteLoad;
Expand Down Expand Up @@ -2447,9 +2448,10 @@ static void putComposableIndexTemplate(
.mappings(mappings == null ? null : CompressedXContent.fromJSON(mappings))
.aliases(aliases)
.lifecycle(lifecycle)
.dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(withFailureStore))
)
.metadata(metadata)
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, withFailureStore))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build()
);
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -165,8 +166,8 @@ private void createDataStreamWithFailureStore() throws IOException {
request.indexTemplate(
ComposableIndexTemplate.builder()
.indexPatterns(List.of(DATA_STREAM_NAME + "*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, true))
.template(new Template(null, new CompressedXContent("""
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.template(Template.builder().mappings(new CompressedXContent("""
{
"dynamic": false,
"properties": {
Expand All @@ -177,7 +178,7 @@ private void createDataStreamWithFailureStore() throws IOException {
"type": "long"
}
}
}"""), null))
}""")).dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true)))
.build()
);
assertAcked(safeGet(client().execute(TransportPutComposableIndexTemplateAction.TYPE, request)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
Expand Down Expand Up @@ -283,8 +284,8 @@ private void putComposableIndexTemplate(boolean failureStore) throws IOException
request.indexTemplate(
ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStream + "*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, failureStore))
.template(new Template(null, new CompressedXContent("""
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.template(Template.builder().mappings(new CompressedXContent("""
{
"dynamic": false,
"properties": {
Expand All @@ -295,7 +296,7 @@ private void putComposableIndexTemplate(boolean failureStore) throws IOException
"type": "long"
}
}
}"""), null))
}""")).dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(failureStore)))
.build()
);
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -1226,9 +1227,10 @@ static void putComposableIndexTemplate(
.settings(settings)
.mappings(mappings == null ? null : CompressedXContent.fromJSON(mappings))
.lifecycle(lifecycle)
.dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(withFailureStore))
)
.metadata(metadata)
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, withFailureStore))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build()
);
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.junit.Before;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
Expand All @@ -40,10 +42,14 @@ public void setup() throws IOException {
"template": {
"settings": {
"number_of_replicas": 0
},
"data_stream_options": {
"failure_store": {
"enabled": true
}
}
},
"data_stream": {
"failure_store": true
}
}
""");
Expand All @@ -59,12 +65,63 @@ public void setup() throws IOException {
assertThat(dataStreams.size(), is(1));
Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
assertThat(dataStream.get("name"), equalTo(DATA_STREAM_NAME));
assertThat(((Map<String, Object>) dataStream.get("failure_store")).get("enabled"), is(true));
List<String> backingIndices = getIndices(dataStream);
assertThat(backingIndices.size(), is(1));
List<String> failureStore = getFailureStore(dataStream);
assertThat(failureStore.size(), is(1));
}

public void testExplicitlyResetDataStreamOptions() throws IOException {
Request putComponentTemplateRequest = new Request("POST", "/_component_template/with-options");
putComponentTemplateRequest.setJsonEntity("""
{
"template": {
"data_stream_options": {
"failure_store": {
"enabled": true
}
}
}
}
""");
assertOK(client().performRequest(putComponentTemplateRequest));

Request invalidRequest = new Request("POST", "/_index_template/other-template");
invalidRequest.setJsonEntity("""
{
"index_patterns": ["something-else"],
"composed_of" : ["with-options"],
"template": {
"settings": {
"number_of_replicas": 0
}
}
}
""");
Exception error = expectThrows(ResponseException.class, () -> client().performRequest(invalidRequest));
assertThat(
error.getMessage(),
containsString("specifies data stream options that can only be used in combination with a data stream")
);

// Check that when we nullify the data stream options we can create use any component template in a non data stream template
Request otherRequest = new Request("POST", "/_index_template/other-template");
otherRequest.setJsonEntity("""
{
"index_patterns": ["something-else"],
"composed_of" : ["with-options"],
"template": {
"settings": {
"number_of_replicas": 0
},
"data_stream_options": null
}
}
""");
assertOK(client().performRequest(otherRequest));
}

public void testEnableDisableFailureStore() throws IOException {
{
assertAcknowledged(client().performRequest(new Request("DELETE", "/_data_stream/" + DATA_STREAM_NAME + "/_options")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@ public void setup() throws IOException {
"template": {
"settings": {
"number_of_replicas": 0
},
"data_stream_options": {
"failure_store": {
"enabled": true
}
}
},
"data_stream": {
"failure_store": true
}
}
""");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,12 @@ setup:
---
"Create data stream with failure store":
- requires:
cluster_features: ["gte_v8.15.0"]
reason: "data stream failure stores default settings changed in 8.15+"
test_runner_features: [ capabilities, allowed_warnings ]
reason: "Data stream failure stores config in templates was added in 8.18+"
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]

- do:
ingest.put_pipeline:
Expand Down Expand Up @@ -256,8 +260,7 @@ setup:
name: my-template4
body:
index_patterns: [ failure-data-stream1, failure-data-stream2 ]
data_stream:
failure_store: true
data_stream: {}
template:
settings:
index:
Expand All @@ -269,6 +272,9 @@ setup:
type: date
count:
type: long
data_stream_options:
failure_store:
enabled: true

- do:
indices.create_data_stream:
Expand Down Expand Up @@ -632,8 +638,12 @@ setup:
---
"Delete data stream with failure store":
- requires:
cluster_features: ["gte_v8.15.0"]
reason: "data stream failure stores REST structure changed in 8.15+"
reason: "Data stream failure stores config in templates was added in 8.18+"
test_runner_features: [ allowed_warnings, capabilities ]
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]

- do:
allowed_warnings:
Expand All @@ -642,15 +652,17 @@ setup:
name: my-template4
body:
index_patterns: [ failure-data-stream1 ]
data_stream:
failure_store: true
data_stream: {}
template:
mappings:
properties:
'@timestamp':
type: date
count:
type: long
data_stream_options:
failure_store:
enabled: true

- do:
indices.create_data_stream:
Expand Down Expand Up @@ -722,8 +734,12 @@ setup:
---
"Delete data stream with failure store uninitialized":
- requires:
cluster_features: ["gte_v8.15.0"]
reason: "data stream failure stores REST structure changed in 8.15+"
reason: "Data stream failure stores config in templates was added in 8.18+"
test_runner_features: [ capabilities, allowed_warnings ]
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]

- do:
allowed_warnings:
Expand All @@ -732,8 +748,11 @@ setup:
name: my-template4
body:
index_patterns: [ failure-data-stream1 ]
data_stream:
failure_store: true
data_stream: {}
template:
data_stream_options:
failure_store:
enabled: true

- do:
indices.create_data_stream:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,18 +185,20 @@ index without timestamp:
---
TSDB failures go to failure store:
- requires:
cluster_features: ["data_stream.failure_store.tsdb_fix"]
reason: "tests tsdb failure store fixes in 8.16.0 that catch timestamp errors that happen earlier in the process and redirect them to the failure store."

reason: "Data stream failure stores config in templates was added in 8.18+"
test_runner_features: [ capabilities, allowed_warnings ]
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
- do:
allowed_warnings:
- "index template [my-template2] has index patterns [fs-k8s*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation"
indices.put_index_template:
name: my-template2
body:
index_patterns: [ "fs-k8s*" ]
data_stream:
failure_store: true
data_stream: {}
template:
settings:
index:
Expand All @@ -207,6 +209,9 @@ TSDB failures go to failure store:
time_series:
start_time: 2021-04-28T00:00:00Z
end_time: 2021-04-29T00:00:00Z
data_stream_options:
failure_store:
enabled: true
mappings:
properties:
"@timestamp":
Expand Down
Loading

0 comments on commit 5997fc3

Please sign in to comment.