Skip to content

Commit

Permalink
Add metrics@custom component template to metrics-*-* index template (e…
Browse files Browse the repository at this point in the history
…lastic#109540)

This lets users customize the metrics data stream mappings, without
having to override a managed component template that may get overridden.

Fixes elastic#109475
  • Loading branch information
felixbarny authored Jun 10, 2024
1 parent a9f31bd commit 540d2b1
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 150 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/109540.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 109540
summary: Add metrics@custom component template to metrics-*-* index template
area: Data streams
type: enhancement
issues:
- 109475
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.datastreams;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.FeatureFlag;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* This base class provides the boilerplate to simplify the development of integration tests.
* Aside from providing useful helper methods and disabling unnecessary plugins,
* it waits until an {@linkplain #indexTemplateName() index template} is installed, which happens asynchronously in StackTemplateRegistry.
* This avoids race conditions leading to flaky tests by ensuring the template has been installed before executing the tests.
*/
public abstract class AbstractDataStreamIT extends ESRestTestCase {
@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.feature(FeatureFlag.FAILURE_STORE_ENABLED)
.setting("xpack.security.enabled", "false")
.setting("xpack.watcher.enabled", "false")
// Disable apm-data so the index templates it installs do not impact
// tests such as testIgnoreDynamicBeyondLimit.
.setting("xpack.apm_data.enabled", "false")
.build();
protected RestClient client;

static void waitForIndexTemplate(RestClient client, String indexTemplate) throws Exception {
assertBusy(() -> {
try {
Request request = new Request("GET", "_index_template/" + indexTemplate);
assertOK(client.performRequest(request));
} catch (ResponseException e) {
fail(e.getMessage());
}
});
}

static void createDataStream(RestClient client, String name) throws IOException {
Request request = new Request("PUT", "_data_stream/" + name);
assertOK(client.performRequest(request));
}

@SuppressWarnings("unchecked")
static String getWriteBackingIndex(RestClient client, String name) throws IOException {
Request request = new Request("GET", "_data_stream/" + name);
List<Object> dataStreams = (List<Object>) entityAsMap(client.performRequest(request)).get("data_streams");
Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
List<Map<String, String>> indices = (List<Map<String, String>>) dataStream.get("indices");
return indices.get(0).get("index_name");
}

@SuppressWarnings("unchecked")
static Map<String, Object> getSettings(RestClient client, String indexName) throws IOException {
Request request = new Request("GET", "/" + indexName + "/_settings?flat_settings");
return ((Map<String, Map<String, Object>>) entityAsMap(client.performRequest(request)).get(indexName)).get("settings");
}

static void putMapping(RestClient client, String indexName) throws IOException {
Request request = new Request("PUT", "/" + indexName + "/_mapping");
request.setJsonEntity("""
{
"properties": {
"numeric_field": {
"type": "integer"
}
}
}
""");
assertOK(client.performRequest(request));
}

@SuppressWarnings("unchecked")
static Map<String, Object> getMappingProperties(RestClient client, String indexName) throws IOException {
Request request = new Request("GET", "/" + indexName + "/_mapping");
Map<String, Object> map = (Map<String, Object>) entityAsMap(client.performRequest(request)).get(indexName);
Map<String, Object> mappings = (Map<String, Object>) map.get("mappings");
return (Map<String, Object>) mappings.get("properties");
}

static void indexDoc(RestClient client, String dataStreamName, String doc) throws IOException {
Request request = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
request.setJsonEntity(doc);
assertOK(client.performRequest(request));
}

@SuppressWarnings("unchecked")
static List<Object> searchDocs(RestClient client, String dataStreamName, String query) throws IOException {
Request request = new Request("GET", "/" + dataStreamName + "/_search");
request.setJsonEntity(query);
Map<String, Object> hits = (Map<String, Object>) entityAsMap(client.performRequest(request)).get("hits");
return (List<Object>) hits.get("hits");
}

@SuppressWarnings("unchecked")
static Object getValueFromPath(Map<String, Object> map, List<String> path) {
Map<String, Object> current = map;
for (int i = 0; i < path.size(); i++) {
Object value = current.get(path.get(i));
if (i == path.size() - 1) {
return value;
}
if (value == null) {
throw new IllegalStateException("Path " + String.join(".", path) + " was not found in " + map);
}
if (value instanceof Map<?, ?> next) {
current = (Map<String, Object>) next;
} else {
throw new IllegalStateException(
"Failed to reach the end of the path "
+ String.join(".", path)
+ " last reachable field was "
+ path.get(i)
+ " in "
+ map
);
}
}
return current;
}

@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

@Override
protected Settings restAdminSettings() {
if (super.restAdminSettings().keySet().contains(ThreadContext.PREFIX + ".Authorization")) {
return super.restAdminSettings();
} else {
String token = basicAuthHeaderValue("admin", new SecureString("admin-password".toCharArray()));
return Settings.builder().put(super.restAdminSettings()).put(ThreadContext.PREFIX + ".Authorization", token).build();
}
}

@Before
public void setup() throws Exception {
client = client();
AbstractDataStreamIT.waitForIndexTemplate(client, indexTemplateName());
}

protected abstract String indexTemplateName();

@After
public void cleanUp() throws IOException {
adminClient().performRequest(new Request("DELETE", "_data_stream/*"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import static org.elasticsearch.datastreams.LogsDataStreamIT.getWriteBackingIndex;
import static org.elasticsearch.datastreams.LogsDataStreamIT.indexDoc;
import static org.elasticsearch.datastreams.LogsDataStreamIT.searchDocs;
import static org.elasticsearch.datastreams.LogsDataStreamIT.waitForLogs;
import static org.elasticsearch.datastreams.LogsDataStreamIT.waitForIndexTemplate;
import static org.hamcrest.Matchers.is;

public class EcsLogsDataStreamIT extends DisabledSecurityDataStreamTestCase {
Expand All @@ -38,7 +38,7 @@ public class EcsLogsDataStreamIT extends DisabledSecurityDataStreamTestCase {
@Before
public void setup() throws Exception {
client = client();
waitForLogs(client);
waitForIndexTemplate(client, "logs");

{
Request request = new Request("PUT", "/_ingest/pipeline/logs@custom");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,7 @@
package org.elasticsearch.datastreams;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.FeatureFlag;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;

import java.io.IOException;

import java.util.List;
import java.util.Map;

Expand All @@ -35,46 +22,7 @@
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;

public class LogsDataStreamIT extends ESRestTestCase {

@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.feature(FeatureFlag.FAILURE_STORE_ENABLED)
.setting("xpack.security.enabled", "false")
.setting("xpack.watcher.enabled", "false")
// Disable apm-data so the index templates it installs do not impact
// tests such as testIgnoreDynamicBeyondLimit.
.setting("xpack.apm_data.enabled", "false")
.build();

@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

@Override
protected Settings restAdminSettings() {
if (super.restAdminSettings().keySet().contains(ThreadContext.PREFIX + ".Authorization")) {
return super.restAdminSettings();
} else {
String token = basicAuthHeaderValue("admin", new SecureString("admin-password".toCharArray()));
return Settings.builder().put(super.restAdminSettings()).put(ThreadContext.PREFIX + ".Authorization", token).build();
}
}

private RestClient client;

@Before
public void setup() throws Exception {
client = client();
waitForLogs(client);
}

@After
public void cleanUp() throws IOException {
adminClient().performRequest(new Request("DELETE", "_data_stream/*"));
}
public class LogsDataStreamIT extends AbstractDataStreamIT {

@SuppressWarnings("unchecked")
public void testDefaultLogsSettingAndMapping() throws Exception {
Expand Down Expand Up @@ -791,97 +739,8 @@ public void testIgnoreDynamicBeyondLimit() throws Exception {
assertThat(ignored.stream().filter(i -> i.startsWith("field") == false).toList(), empty());
}

static void waitForLogs(RestClient client) throws Exception {
assertBusy(() -> {
try {
Request request = new Request("GET", "_index_template/logs");
assertOK(client.performRequest(request));
} catch (ResponseException e) {
fail(e.getMessage());
}
});
}

static void createDataStream(RestClient client, String name) throws IOException {
Request request = new Request("PUT", "_data_stream/" + name);
assertOK(client.performRequest(request));
}

@SuppressWarnings("unchecked")
static String getWriteBackingIndex(RestClient client, String name) throws IOException {
Request request = new Request("GET", "_data_stream/" + name);
List<Object> dataStreams = (List<Object>) entityAsMap(client.performRequest(request)).get("data_streams");
Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
List<Map<String, String>> indices = (List<Map<String, String>>) dataStream.get("indices");
return indices.get(0).get("index_name");
}

@SuppressWarnings("unchecked")
static Map<String, Object> getSettings(RestClient client, String indexName) throws IOException {
Request request = new Request("GET", "/" + indexName + "/_settings?flat_settings");
return ((Map<String, Map<String, Object>>) entityAsMap(client.performRequest(request)).get(indexName)).get("settings");
}

static void putMapping(RestClient client, String indexName) throws IOException {
Request request = new Request("PUT", "/" + indexName + "/_mapping");
request.setJsonEntity("""
{
"properties": {
"numeric_field": {
"type": "integer"
}
}
}
""");
assertOK(client.performRequest(request));
}

@SuppressWarnings("unchecked")
static Map<String, Object> getMappingProperties(RestClient client, String indexName) throws IOException {
Request request = new Request("GET", "/" + indexName + "/_mapping");
Map<String, Object> map = (Map<String, Object>) entityAsMap(client.performRequest(request)).get(indexName);
Map<String, Object> mappings = (Map<String, Object>) map.get("mappings");
return (Map<String, Object>) mappings.get("properties");
}

static void indexDoc(RestClient client, String dataStreamName, String doc) throws IOException {
Request request = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
request.setJsonEntity(doc);
assertOK(client.performRequest(request));
}

@SuppressWarnings("unchecked")
static List<Object> searchDocs(RestClient client, String dataStreamName, String query) throws IOException {
Request request = new Request("GET", "/" + dataStreamName + "/_search");
request.setJsonEntity(query);
Map<String, Object> hits = (Map<String, Object>) entityAsMap(client.performRequest(request)).get("hits");
return (List<Object>) hits.get("hits");
}

@SuppressWarnings("unchecked")
static Object getValueFromPath(Map<String, Object> map, List<String> path) {
Map<String, Object> current = map;
for (int i = 0; i < path.size(); i++) {
Object value = current.get(path.get(i));
if (i == path.size() - 1) {
return value;
}
if (value == null) {
throw new IllegalStateException("Path " + String.join(".", path) + " was not found in " + map);
}
if (value instanceof Map<?, ?> next) {
current = (Map<String, Object>) next;
} else {
throw new IllegalStateException(
"Failed to reach the end of the path "
+ String.join(".", path)
+ " last reachable field was "
+ path.get(i)
+ " in "
+ map
);
}
}
return current;
@Override
protected String indexTemplateName() {
return "logs";
}
}
Loading

0 comments on commit 540d2b1

Please sign in to comment.