From 540d2b10a354c3b45b727f3488724fa008c5d6cf Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Mon, 10 Jun 2024 19:32:31 +0200 Subject: [PATCH] Add metrics@custom component template to metrics-*-* index template (#109540) This lets users customize the metrics data stream mappings, without having to override a managed component template that may get overridden. Fixes #109475 --- docs/changelog/109540.yaml | 6 + .../datastreams/AbstractDataStreamIT.java | 169 ++++++++++++++++++ .../datastreams/EcsLogsDataStreamIT.java | 4 +- .../datastreams/LogsDataStreamIT.java | 151 +--------------- .../datastreams/MetricsDataStreamIT.java | 101 +++++++++++ .../src/main/resources/metrics@template.json | 4 +- .../xpack/stack/StackTemplateRegistry.java | 2 +- 7 files changed, 287 insertions(+), 150 deletions(-) create mode 100644 docs/changelog/109540.yaml create mode 100644 modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/AbstractDataStreamIT.java create mode 100644 modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/MetricsDataStreamIT.java diff --git a/docs/changelog/109540.yaml b/docs/changelog/109540.yaml new file mode 100644 index 0000000000000..722c60a30fb97 --- /dev/null +++ b/docs/changelog/109540.yaml @@ -0,0 +1,6 @@ +pr: 109540 +summary: Add metrics@custom component template to metrics-*-* index template +area: Data streams +type: enhancement +issues: + - 109475 diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/AbstractDataStreamIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/AbstractDataStreamIT.java new file mode 100644 index 0000000000000..ca33f08324539 --- /dev/null +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/AbstractDataStreamIT.java @@ -0,0 +1,169 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.datastreams; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.settings.SecureString; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.cluster.FeatureFlag; +import org.elasticsearch.test.cluster.local.distribution.DistributionType; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * This base class provides the boilerplate to simplify the development of integration tests. + * Aside from providing useful helper methods and disabling unnecessary plugins, + * it waits until an {@linkplain #indexTemplateName() index template} is installed, which happens asynchronously in StackTemplateRegistry. + * This avoids race conditions leading to flaky tests by ensuring the template has been installed before executing the tests. + */ +public abstract class AbstractDataStreamIT extends ESRestTestCase { + @ClassRule + public static ElasticsearchCluster cluster = ElasticsearchCluster.local() + .distribution(DistributionType.DEFAULT) + .feature(FeatureFlag.FAILURE_STORE_ENABLED) + .setting("xpack.security.enabled", "false") + .setting("xpack.watcher.enabled", "false") + // Disable apm-data so the index templates it installs do not impact + // tests such as testIgnoreDynamicBeyondLimit. + .setting("xpack.apm_data.enabled", "false") + .build(); + protected RestClient client; + + static void waitForIndexTemplate(RestClient client, String indexTemplate) throws Exception { + assertBusy(() -> { + try { + Request request = new Request("GET", "_index_template/" + indexTemplate); + assertOK(client.performRequest(request)); + } catch (ResponseException e) { + fail(e.getMessage()); + } + }); + } + + static void createDataStream(RestClient client, String name) throws IOException { + Request request = new Request("PUT", "_data_stream/" + name); + assertOK(client.performRequest(request)); + } + + @SuppressWarnings("unchecked") + static String getWriteBackingIndex(RestClient client, String name) throws IOException { + Request request = new Request("GET", "_data_stream/" + name); + List dataStreams = (List) entityAsMap(client.performRequest(request)).get("data_streams"); + Map dataStream = (Map) dataStreams.get(0); + List> indices = (List>) dataStream.get("indices"); + return indices.get(0).get("index_name"); + } + + @SuppressWarnings("unchecked") + static Map getSettings(RestClient client, String indexName) throws IOException { + Request request = new Request("GET", "/" + indexName + "/_settings?flat_settings"); + return ((Map>) entityAsMap(client.performRequest(request)).get(indexName)).get("settings"); + } + + static void putMapping(RestClient client, String indexName) throws IOException { + Request request = new Request("PUT", "/" + indexName + "/_mapping"); + request.setJsonEntity(""" + { + "properties": { + "numeric_field": { + "type": "integer" + } + } + } + """); + assertOK(client.performRequest(request)); + } + + @SuppressWarnings("unchecked") + static Map getMappingProperties(RestClient client, String indexName) throws IOException { + Request request = new Request("GET", "/" + indexName + "/_mapping"); + Map map = (Map) entityAsMap(client.performRequest(request)).get(indexName); + Map mappings = (Map) map.get("mappings"); + return (Map) mappings.get("properties"); + } + + static void indexDoc(RestClient client, String dataStreamName, String doc) throws IOException { + Request request = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true"); + request.setJsonEntity(doc); + assertOK(client.performRequest(request)); + } + + @SuppressWarnings("unchecked") + static List searchDocs(RestClient client, String dataStreamName, String query) throws IOException { + Request request = new Request("GET", "/" + dataStreamName + "/_search"); + request.setJsonEntity(query); + Map hits = (Map) entityAsMap(client.performRequest(request)).get("hits"); + return (List) hits.get("hits"); + } + + @SuppressWarnings("unchecked") + static Object getValueFromPath(Map map, List path) { + Map current = map; + for (int i = 0; i < path.size(); i++) { + Object value = current.get(path.get(i)); + if (i == path.size() - 1) { + return value; + } + if (value == null) { + throw new IllegalStateException("Path " + String.join(".", path) + " was not found in " + map); + } + if (value instanceof Map next) { + current = (Map) next; + } else { + throw new IllegalStateException( + "Failed to reach the end of the path " + + String.join(".", path) + + " last reachable field was " + + path.get(i) + + " in " + + map + ); + } + } + return current; + } + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } + + @Override + protected Settings restAdminSettings() { + if (super.restAdminSettings().keySet().contains(ThreadContext.PREFIX + ".Authorization")) { + return super.restAdminSettings(); + } else { + String token = basicAuthHeaderValue("admin", new SecureString("admin-password".toCharArray())); + return Settings.builder().put(super.restAdminSettings()).put(ThreadContext.PREFIX + ".Authorization", token).build(); + } + } + + @Before + public void setup() throws Exception { + client = client(); + AbstractDataStreamIT.waitForIndexTemplate(client, indexTemplateName()); + } + + protected abstract String indexTemplateName(); + + @After + public void cleanUp() throws IOException { + adminClient().performRequest(new Request("DELETE", "_data_stream/*")); + } +} diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/EcsLogsDataStreamIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/EcsLogsDataStreamIT.java index 5fe72c38078ee..e43b1e451c312 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/EcsLogsDataStreamIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/EcsLogsDataStreamIT.java @@ -26,7 +26,7 @@ import static org.elasticsearch.datastreams.LogsDataStreamIT.getWriteBackingIndex; import static org.elasticsearch.datastreams.LogsDataStreamIT.indexDoc; import static org.elasticsearch.datastreams.LogsDataStreamIT.searchDocs; -import static org.elasticsearch.datastreams.LogsDataStreamIT.waitForLogs; +import static org.elasticsearch.datastreams.LogsDataStreamIT.waitForIndexTemplate; import static org.hamcrest.Matchers.is; public class EcsLogsDataStreamIT extends DisabledSecurityDataStreamTestCase { @@ -38,7 +38,7 @@ public class EcsLogsDataStreamIT extends DisabledSecurityDataStreamTestCase { @Before public void setup() throws Exception { client = client(); - waitForLogs(client); + waitForIndexTemplate(client, "logs"); { Request request = new Request("PUT", "/_ingest/pipeline/logs@custom"); diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LogsDataStreamIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LogsDataStreamIT.java index c2a7a76ab751a..9ab32f29f4a79 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LogsDataStreamIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LogsDataStreamIT.java @@ -9,20 +9,7 @@ package org.elasticsearch.datastreams; import org.elasticsearch.client.Request; -import org.elasticsearch.client.ResponseException; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.common.settings.SecureString; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.test.cluster.ElasticsearchCluster; -import org.elasticsearch.test.cluster.FeatureFlag; -import org.elasticsearch.test.cluster.local.distribution.DistributionType; -import org.elasticsearch.test.rest.ESRestTestCase; -import org.junit.After; -import org.junit.Before; -import org.junit.ClassRule; - -import java.io.IOException; + import java.util.List; import java.util.Map; @@ -35,46 +22,7 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; -public class LogsDataStreamIT extends ESRestTestCase { - - @ClassRule - public static ElasticsearchCluster cluster = ElasticsearchCluster.local() - .distribution(DistributionType.DEFAULT) - .feature(FeatureFlag.FAILURE_STORE_ENABLED) - .setting("xpack.security.enabled", "false") - .setting("xpack.watcher.enabled", "false") - // Disable apm-data so the index templates it installs do not impact - // tests such as testIgnoreDynamicBeyondLimit. - .setting("xpack.apm_data.enabled", "false") - .build(); - - @Override - protected String getTestRestCluster() { - return cluster.getHttpAddresses(); - } - - @Override - protected Settings restAdminSettings() { - if (super.restAdminSettings().keySet().contains(ThreadContext.PREFIX + ".Authorization")) { - return super.restAdminSettings(); - } else { - String token = basicAuthHeaderValue("admin", new SecureString("admin-password".toCharArray())); - return Settings.builder().put(super.restAdminSettings()).put(ThreadContext.PREFIX + ".Authorization", token).build(); - } - } - - private RestClient client; - - @Before - public void setup() throws Exception { - client = client(); - waitForLogs(client); - } - - @After - public void cleanUp() throws IOException { - adminClient().performRequest(new Request("DELETE", "_data_stream/*")); - } +public class LogsDataStreamIT extends AbstractDataStreamIT { @SuppressWarnings("unchecked") public void testDefaultLogsSettingAndMapping() throws Exception { @@ -791,97 +739,8 @@ public void testIgnoreDynamicBeyondLimit() throws Exception { assertThat(ignored.stream().filter(i -> i.startsWith("field") == false).toList(), empty()); } - static void waitForLogs(RestClient client) throws Exception { - assertBusy(() -> { - try { - Request request = new Request("GET", "_index_template/logs"); - assertOK(client.performRequest(request)); - } catch (ResponseException e) { - fail(e.getMessage()); - } - }); - } - - static void createDataStream(RestClient client, String name) throws IOException { - Request request = new Request("PUT", "_data_stream/" + name); - assertOK(client.performRequest(request)); - } - - @SuppressWarnings("unchecked") - static String getWriteBackingIndex(RestClient client, String name) throws IOException { - Request request = new Request("GET", "_data_stream/" + name); - List dataStreams = (List) entityAsMap(client.performRequest(request)).get("data_streams"); - Map dataStream = (Map) dataStreams.get(0); - List> indices = (List>) dataStream.get("indices"); - return indices.get(0).get("index_name"); - } - - @SuppressWarnings("unchecked") - static Map getSettings(RestClient client, String indexName) throws IOException { - Request request = new Request("GET", "/" + indexName + "/_settings?flat_settings"); - return ((Map>) entityAsMap(client.performRequest(request)).get(indexName)).get("settings"); - } - - static void putMapping(RestClient client, String indexName) throws IOException { - Request request = new Request("PUT", "/" + indexName + "/_mapping"); - request.setJsonEntity(""" - { - "properties": { - "numeric_field": { - "type": "integer" - } - } - } - """); - assertOK(client.performRequest(request)); - } - - @SuppressWarnings("unchecked") - static Map getMappingProperties(RestClient client, String indexName) throws IOException { - Request request = new Request("GET", "/" + indexName + "/_mapping"); - Map map = (Map) entityAsMap(client.performRequest(request)).get(indexName); - Map mappings = (Map) map.get("mappings"); - return (Map) mappings.get("properties"); - } - - static void indexDoc(RestClient client, String dataStreamName, String doc) throws IOException { - Request request = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true"); - request.setJsonEntity(doc); - assertOK(client.performRequest(request)); - } - - @SuppressWarnings("unchecked") - static List searchDocs(RestClient client, String dataStreamName, String query) throws IOException { - Request request = new Request("GET", "/" + dataStreamName + "/_search"); - request.setJsonEntity(query); - Map hits = (Map) entityAsMap(client.performRequest(request)).get("hits"); - return (List) hits.get("hits"); - } - - @SuppressWarnings("unchecked") - static Object getValueFromPath(Map map, List path) { - Map current = map; - for (int i = 0; i < path.size(); i++) { - Object value = current.get(path.get(i)); - if (i == path.size() - 1) { - return value; - } - if (value == null) { - throw new IllegalStateException("Path " + String.join(".", path) + " was not found in " + map); - } - if (value instanceof Map next) { - current = (Map) next; - } else { - throw new IllegalStateException( - "Failed to reach the end of the path " - + String.join(".", path) - + " last reachable field was " - + path.get(i) - + " in " - + map - ); - } - } - return current; + @Override + protected String indexTemplateName() { + return "logs"; } } diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/MetricsDataStreamIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/MetricsDataStreamIT.java new file mode 100644 index 0000000000000..6cc300378a312 --- /dev/null +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/MetricsDataStreamIT.java @@ -0,0 +1,101 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.datastreams; + +import org.elasticsearch.client.Request; + +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class MetricsDataStreamIT extends AbstractDataStreamIT { + + @SuppressWarnings("unchecked") + public void testCustomMapping() throws Exception { + { + Request request = new Request("POST", "/_component_template/metrics@custom"); + request.setJsonEntity(""" + { + "template": { + "settings": { + "index": { + "query": { + "default_field": ["custom-message"] + } + } + }, + "mappings": { + "properties": { + "numeric_field": { + "type": "integer" + }, + "socket": { + "properties": { + "ip": { + "type": "keyword" + } + } + } + } + } + } + } + """); + assertOK(client.performRequest(request)); + } + + String dataStreamName = "metrics-generic-default"; + createDataStream(client, dataStreamName); + String backingIndex = getWriteBackingIndex(client, dataStreamName); + + // Verify that the custom settings.index.query.default_field overrides the default query field - "message" + Map settings = getSettings(client, backingIndex); + assertThat(settings.get("index.query.default_field"), is(List.of("custom-message"))); + + // Verify that the new field from the custom component template is applied + putMapping(client, backingIndex); + Map mappingProperties = getMappingProperties(client, backingIndex); + assertThat(getValueFromPath(mappingProperties, List.of("numeric_field", "type")), equalTo("integer")); + assertThat(getValueFromPath(mappingProperties, List.of("socket", "properties", "ip", "type")), is("keyword")); + + // Insert valid doc and verify successful indexing + { + indexDoc(client, dataStreamName, """ + { + "@timestamp": "2024-06-10", + "test": "doc-with-ip", + "socket": { + "ip": "127.0.0.1" + } + } + """); + List results = searchDocs(client, dataStreamName, """ + { + "query": { + "term": { + "test": { + "value": "doc-with-ip" + } + } + }, + "fields": ["socket.ip"] + } + """); + Map fields = ((Map>) results.get(0)).get("_source"); + assertThat(fields.get("socket"), is(Map.of("ip", "127.0.0.1"))); + } + } + + @Override + protected String indexTemplateName() { + return "metrics"; + } +} diff --git a/x-pack/plugin/core/template-resources/src/main/resources/metrics@template.json b/x-pack/plugin/core/template-resources/src/main/resources/metrics@template.json index 464df09ffe2ce..776ed88857db5 100644 --- a/x-pack/plugin/core/template-resources/src/main/resources/metrics@template.json +++ b/x-pack/plugin/core/template-resources/src/main/resources/metrics@template.json @@ -5,8 +5,10 @@ "composed_of": [ "metrics@mappings", "data-streams@mappings", - "metrics@settings" + "metrics@settings", + "metrics@custom" ], + "ignore_missing_component_templates": ["metrics@custom"], "allow_auto_create": true, "_meta": { "description": "default metrics template installed by x-pack", diff --git a/x-pack/plugin/stack/src/main/java/org/elasticsearch/xpack/stack/StackTemplateRegistry.java b/x-pack/plugin/stack/src/main/java/org/elasticsearch/xpack/stack/StackTemplateRegistry.java index 30323a1d7d363..3cd551ca1f3d9 100644 --- a/x-pack/plugin/stack/src/main/java/org/elasticsearch/xpack/stack/StackTemplateRegistry.java +++ b/x-pack/plugin/stack/src/main/java/org/elasticsearch/xpack/stack/StackTemplateRegistry.java @@ -47,7 +47,7 @@ public class StackTemplateRegistry extends IndexTemplateRegistry { // The stack template registry version. This number must be incremented when we make changes // to built-in templates. - public static final int REGISTRY_VERSION = 10; + public static final int REGISTRY_VERSION = 11; public static final String TEMPLATE_VERSION_VARIABLE = "xpack.stack.template.version"; public static final Setting STACK_TEMPLATES_ENABLED = Setting.boolSetting(