diff --git a/.github/workflows/opensearch-sink-opendistro-integration-tests.yml b/.github/workflows/opensearch-sink-opendistro-integration-tests.yml index cfaca4a330..45c477e12f 100644 --- a/.github/workflows/opensearch-sink-opendistro-integration-tests.yml +++ b/.github/workflows/opensearch-sink-opendistro-integration-tests.yml @@ -40,7 +40,7 @@ jobs: - name: Run Open Distro tests run: | ./gradlew :data-prepper-plugins:opensearch:integrationTest --tests "org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIT" -Dtests.opensearch.host=localhost:9200 -Dtests.opensearch.user=admin -Dtests.opensearch.password=admin - ./gradlew :data-prepper-plugins:opensearch:integrationTest -Dtests.opensearch.host=localhost:9200 -Dtests.opensearch.user=admin -Dtests.opensearch.password=admin -Dtests.opensearch.bundle=true + ./gradlew :data-prepper-plugins:opensearch:integrationTest -Dtests.opensearch.host=localhost:9200 -Dtests.opensearch.user=admin -Dtests.opensearch.password=admin -Dtests.opensearch.bundle=true -Dtests.opensearch.version=opendistro:${{ matrix.opendistro }} - name: Upload Unit Test Results if: always() uses: actions/upload-artifact@v3 diff --git a/.github/workflows/opensearch-sink-opensearch-integration-tests.yml b/.github/workflows/opensearch-sink-opensearch-integration-tests.yml index 925086127d..5e34350303 100644 --- a/.github/workflows/opensearch-sink-opensearch-integration-tests.yml +++ b/.github/workflows/opensearch-sink-opensearch-integration-tests.yml @@ -40,7 +40,7 @@ jobs: - name: Run OpenSearch tests run: | ./gradlew :data-prepper-plugins:opensearch:integrationTest --tests "org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIT" -Dtests.opensearch.host=localhost:9200 -Dtests.opensearch.user=admin -Dtests.opensearch.password=admin - ./gradlew :data-prepper-plugins:opensearch:integrationTest -Dtests.opensearch.host=localhost:9200 -Dtests.opensearch.user=admin -Dtests.opensearch.password=admin -Dtests.opensearch.bundle=true + ./gradlew :data-prepper-plugins:opensearch:integrationTest -Dtests.opensearch.host=localhost:9200 -Dtests.opensearch.user=admin -Dtests.opensearch.password=admin -Dtests.opensearch.bundle=true -Dtests.opensearch.version=opensearch:${{ matrix.opensearch }} - name: Upload Unit Test Results if: always() uses: actions/upload-artifact@v3 diff --git a/data-prepper-plugins/opensearch/README.md b/data-prepper-plugins/opensearch/README.md index 0ccfc39d33..edc297e3c0 100644 --- a/data-prepper-plugins/opensearch/README.md +++ b/data-prepper-plugins/opensearch/README.md @@ -129,6 +129,8 @@ Default is null. * This index name can also be a plain string plus a date-time pattern as a suffix, such as `application-%{yyyy.MM.dd}`, `my-index-name-%{yyyy.MM.dd.HH}`. When OpenSearch Sink is sending data to OpenSearch, the date-time pattern will be replaced by actual UTC time. The pattern supports all the symbols that represent one hour or above and are listed in [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). For example, with an index pattern like `my-index-name-%{yyyy.MM.dd}`, a new index is created for each day such as `my-index-name-2022.01.25`. For another example, with an index pattern like `my-index-name-%{yyyy.MM.dd.HH}`, a new index is created for each hour such as `my-index-name-2022.01.25.13`. * This index name can also be a formatted string (with or without date-time pattern suffix), such as `my-${index}-name`. When OpenSearchSink is sending data to OpenSearch, the format portion "${index}" will be replaced by it's value in the event that is being processed. The format may also be like "${index1/index2/index3}" in which case the field "index1/index2/index3" is searched in the event and replaced by its value. +- `template_type`(optional): Defines what type of OpenSearch template to use. The available options are `v1` and `index-template`. The default value is `v1`, which uses the original OpenSearch templates available at the `_template` API endpoints. Select `index-template` to use composable index templates which are available at OpenSearch's `_index_template` endpoint. + - `template_file`(optional): A json file path or AWS S3 URI to be read as index template for custom data ingestion. The json file content should be the json value of `"template"` key in the json content of OpenSearch [Index templates API](https://opensearch.org/docs/latest/opensearch/index-templates/), e.g. [otel-v1-apm-span-index-template.json](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/opensearch/src/main/resources/otel-v1-apm-span-index-template.json) diff --git a/data-prepper-plugins/opensearch/build.gradle b/data-prepper-plugins/opensearch/build.gradle index e77cbc3815..40ecb6b2c7 100644 --- a/data-prepper-plugins/opensearch/build.gradle +++ b/data-prepper-plugins/opensearch/build.gradle @@ -64,9 +64,11 @@ task integrationTest(type: Test) { systemProperty 'tests.opensearch.bundle', System.getProperty('tests.opensearch.bundle') systemProperty 'tests.opensearch.user', System.getProperty('tests.opensearch.user') systemProperty 'tests.opensearch.password', System.getProperty('tests.opensearch.password') + systemProperty 'tests.opensearch.version', System.getProperty('tests.opensearch.version') filter { includeTestsMatching '*IT' + includeTestsMatching '*Test' } } diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/DeclaredOpenSearchVersion.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/DeclaredOpenSearchVersion.java new file mode 100644 index 0000000000..ff79a0f555 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/DeclaredOpenSearchVersion.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch; + +import org.opensearch.Version; + +/** + * An interface for letting the test declare what OpenSearch version is being + * tested against. + */ +class DeclaredOpenSearchVersion implements Comparable { + private static final DeclaredOpenSearchVersion DEFAULT = new DeclaredOpenSearchVersion(Distribution.OPENSEARCH, "1.0.0"); + public static final DeclaredOpenSearchVersion OPENDISTRO_1_9 = new DeclaredOpenSearchVersion(Distribution.OPENDISTRO, "1.9.0"); + + enum Distribution { + OPENDISTRO, + OPENSEARCH + } + + private final Distribution distribution; + private final String version; + private final Version internalVersion; + + private DeclaredOpenSearchVersion(final Distribution distribution, final String version) { + this.distribution = distribution; + this.version = version; + + internalVersion = Version.fromString(version); + } + + static DeclaredOpenSearchVersion parse(final String versionString) { + if(versionString == null) { + return DEFAULT; + } + + final String[] parts = versionString.split(":"); + + if(parts.length != 2) { + throw new IllegalArgumentException("Invalid version string provided."); + } + + final Distribution distribution = Distribution.valueOf(parts[0].toUpperCase()); + + return new DeclaredOpenSearchVersion(distribution, parts[1]); + } + + @Override + public int compareTo(final DeclaredOpenSearchVersion other) { + final int distributionCompareTo = distribution.compareTo(other.distribution); + + if(distributionCompareTo != 0) { + return distributionCompareTo; + } + + return internalVersion.compareTo(other.internalVersion); + } + + Distribution getDistribution() { + return distribution; + } + + String getVersion() { + return version; + } +} diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/DeclaredOpenSearchVersionTest.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/DeclaredOpenSearchVersionTest.java new file mode 100644 index 0000000000..820c39b820 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/DeclaredOpenSearchVersionTest.java @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +class DeclaredOpenSearchVersionTest { + @ParameterizedTest + @CsvSource({ + "opensearch:1.2.4,OPENSEARCH,1.2.4", + "opensearch:1.3.9,OPENSEARCH,1.3.9", + "opensearch:2.2.1,OPENSEARCH,2.2.1", + "opensearch:2.6.0,OPENSEARCH,2.6.0", + "opendistro:1.3.0,OPENDISTRO,1.3.0", + "opendistro:1.13.3,OPENDISTRO,1.13.3" + }) + void parse_should_return_expected(final String versionString, final DeclaredOpenSearchVersion.Distribution expectedDistribution, final String expectedVersion) { + final DeclaredOpenSearchVersion version = DeclaredOpenSearchVersion.parse(versionString); + + assertThat(version, notNullValue()); + assertThat(version.getDistribution(), equalTo(expectedDistribution)); + assertThat(version.getVersion(), equalTo(expectedVersion)); + } + + @Test + void parse_with_null_should_return_minimum_version() { + final DeclaredOpenSearchVersion version = DeclaredOpenSearchVersion.parse(null); + + assertThat(version, notNullValue()); + assertThat(version.getDistribution(), equalTo(DeclaredOpenSearchVersion.Distribution.OPENSEARCH)); + assertThat(version.getVersion(), equalTo("1.0.0")); + } + + @ParameterizedTest + @CsvSource({ + "opensearch:1.2.4,opensearch:1.2.4,0", + "opendistro:1.13.3,opendistro:1.13.3,0", + "opensearch:1.2.4,opensearch:1.2.3,1", + "opensearch:1.2.3,opensearch:1.2.4,-1", + "opensearch:2.6.0,opensearch:1.2.3,1", + "opensearch:1.2.3,opensearch:2.6.0,-1", + "opensearch:1.3.9,opendistro:1.13.3,1", + "opendistro:1.13.3,opensearch:1.3.9,-1" + }) + void compareTo_returns_correct_value(final String versionStringToTest, final String otherVersionString, final int expectedCompareTo) { + final DeclaredOpenSearchVersion objectUnderTest = DeclaredOpenSearchVersion.parse(versionStringToTest); + final DeclaredOpenSearchVersion otherVersion = DeclaredOpenSearchVersion.parse(otherVersionString); + + assertThat(objectUnderTest.compareTo(otherVersion), equalTo(expectedCompareTo)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchIntegrationHelper.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchIntegrationHelper.java index 43103636c5..b44562909d 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchIntegrationHelper.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchIntegrationHelper.java @@ -92,6 +92,10 @@ static List getHosts() { .map(ip -> String.format("https://%s", ip)).collect(Collectors.toList()); } + static DeclaredOpenSearchVersion getVersion() { + return DeclaredOpenSearchVersion.parse(System.getProperty("tests.opensearch.version")); + } + /** * Copied from OpenSearch test framework * TODO: Consolidate in OpenSearch diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index 57fe7320d0..f437c0c62c 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -5,17 +5,6 @@ package org.opensearch.dataprepper.plugins.sink.opensearch; -import org.mockito.Mock; -import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.metrics.MetricNames; -import org.opensearch.dataprepper.metrics.MetricsTestUtil; -import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventType; -import org.opensearch.dataprepper.model.event.EventHandle; -import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.model.plugin.PluginFactory; -import org.opensearch.dataprepper.model.record.Record; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -28,19 +17,34 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; import org.opensearch.client.Request; import org.opensearch.client.Response; import org.opensearch.client.RestClient; import org.opensearch.common.Strings; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.metrics.MetricNames; +import org.opensearch.dataprepper.metrics.MetricsTestUtil; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.EventType; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConstants; -import org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType; import javax.ws.rs.HttpMethod; @@ -53,6 +57,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Date; @@ -64,43 +69,48 @@ import java.util.StringJoiner; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; +import java.util.function.BiFunction; import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.http.HttpStatus.SC_OK; import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.Matchers.closeTo; +import static org.junit.jupiter.params.provider.Arguments.arguments; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createContentParser; import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createOpenSearchClient; import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.getHosts; import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.isOSBundle; import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.waitForClusterStateUpdatesToFinish; import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.wipeAllTemplates; -import static org.mockito.Mockito.mock; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.lenient; public class OpenSearchSinkIT { - private static final String PLUGIN_NAME = "opensearch"; - private static final String PIPELINE_NAME = "integTestPipeline"; - private static final String TEST_CUSTOM_INDEX_POLICY_FILE = "test-custom-index-policy-file.json"; - private static final String TEST_TEMPLATE_V1_FILE = "test-index-template.json"; - private static final String TEST_TEMPLATE_V2_FILE = "test-index-template-v2.json"; - private static final String DEFAULT_RAW_SPAN_FILE_1 = "raw-span-1.json"; - private static final String DEFAULT_RAW_SPAN_FILE_2 = "raw-span-2.json"; - private static final String DEFAULT_SERVICE_MAP_FILE = "service-map-1.json"; - - private RestClient client; - private EventHandle eventHandle; - - @Mock - private PluginFactory pluginFactory; + private static final String PLUGIN_NAME = "opensearch"; + private static final String PIPELINE_NAME = "integTestPipeline"; + private static final String TEST_CUSTOM_INDEX_POLICY_FILE = "test-custom-index-policy-file.json"; + private static final String TEST_TEMPLATE_V1_FILE = "test-index-template.json"; + private static final String TEST_TEMPLATE_V2_FILE = "test-index-template-v2.json"; + private static final String TEST_INDEX_TEMPLATE_V1_FILE = "test-composable-index-template.json"; + private static final String TEST_INDEX_TEMPLATE_V2_FILE = "test-composable-index-template-v2.json"; + private static final String DEFAULT_RAW_SPAN_FILE_1 = "raw-span-1.json"; + private static final String DEFAULT_RAW_SPAN_FILE_2 = "raw-span-2.json"; + private static final String DEFAULT_SERVICE_MAP_FILE = "service-map-1.json"; + + private RestClient client; + private EventHandle eventHandle; + + @Mock + private PluginFactory pluginFactory; @Mock private AwsCredentialsSupplier awsCredentialsSupplier; @@ -113,866 +123,906 @@ public OpenSearchSink createObjectUnderTest(PluginSetting pluginSetting, boolean return sink; } - @BeforeEach - public void setup() { - eventHandle = mock(EventHandle.class); - lenient().doAnswer(a -> { + @BeforeEach + public void setup() { + eventHandle = mock(EventHandle.class); + lenient().doAnswer(a -> { return null; - }).when(eventHandle).release(any(Boolean.class)); - } - - @BeforeEach - public void metricsInit() throws IOException { - MetricsTestUtil.initMetrics(); - - client = createOpenSearchClient(); - } - - @AfterEach - public void cleanOpenSearch() throws Exception { - wipeAllOpenSearchIndices(); - wipeAllTemplates(); - waitForClusterStateUpdatesToFinish(); - } - - @Test - public void testInstantiateSinkRawSpanDefault() throws IOException { - final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); - Request request = new Request(HttpMethod.HEAD, indexAlias); - Response response = client.performRequest(request); - MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); - final String index = String.format("%s-000001", indexAlias); - final Map mappings = getIndexMappings(index); - MatcherAssert.assertThat(mappings, notNullValue()); - MatcherAssert.assertThat((boolean) mappings.get("date_detection"), equalTo(false)); - sink.shutdown(); - - if (isOSBundle()) { - // Check managed index - await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { - MatcherAssert.assertThat(getIndexPolicyId(index), equalTo(IndexConstants.RAW_ISM_POLICY)); - } - ); - } - - // roll over initial index - request = new Request(HttpMethod.POST, String.format("%s/_rollover", indexAlias)); - request.setJsonEntity("{ \"conditions\" : { } }\n"); - response = client.performRequest(request); - MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); - - // Instantiate sink again - sink = createObjectUnderTest(pluginSetting, true); - // Make sure no new write index *-000001 is created under alias - final String rolloverIndexName = String.format("%s-000002", indexAlias); - request = new Request(HttpMethod.GET, rolloverIndexName + "/_alias"); - response = client.performRequest(request); - MatcherAssert.assertThat(checkIsWriteIndex(EntityUtils.toString(response.getEntity()), indexAlias, rolloverIndexName), equalTo(true)); - sink.shutdown(); - - if (isOSBundle()) { - // Check managed index - MatcherAssert.assertThat(getIndexPolicyId(rolloverIndexName), equalTo(IndexConstants.RAW_ISM_POLICY)); + }).when(eventHandle).release(any(Boolean.class)); } - } - - @Test - public void testInstantiateSinkRawSpanReservedAliasAlreadyUsedAsIndex() throws IOException { - final String reservedIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); - final Request request = new Request(HttpMethod.PUT, reservedIndexAlias); - client.performRequest(request); - final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, false); - Assert.assertThrows(String.format(AbstractIndexManager.INDEX_ALIAS_USED_AS_INDEX_ERROR, reservedIndexAlias), - RuntimeException.class, () -> sink.doInitialize()); - } - - @Test - public void testOutputRawSpanDefault() throws IOException, InterruptedException { - final String testDoc1 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1); - final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_2); - final ObjectMapper mapper = new ObjectMapper(); - @SuppressWarnings("unchecked") final Map expData1 = mapper.readValue(testDoc1, Map.class); - @SuppressWarnings("unchecked") final Map expData2 = mapper.readValue(testDoc2, Map.class); - - final List> testRecords = Arrays.asList(jsonStringToRecord(testDoc1), jsonStringToRecord(testDoc2)); - final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - sink.output(testRecords); - - final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); - final List> retSources = getSearchResponseDocSources(expIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(2)); - MatcherAssert.assertThat(retSources, hasItems(expData1, expData2)); - MatcherAssert.assertThat(getDocumentCount(expIndexAlias, "_id", (String) expData1.get("spanId")), equalTo(Integer.valueOf(1))); - sink.shutdown(); - - // Verify metrics - final List bulkRequestErrors = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) - .add(OpenSearchSink.BULKREQUEST_ERRORS).toString()); - MatcherAssert.assertThat(bulkRequestErrors.size(), equalTo(1)); - Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0); - final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) - .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); - MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); - // COUNT - Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); - // TOTAL_TIME - Assert.assertTrue(bulkRequestLatencies.get(1).getValue() > 0.0); - // MAX - Assert.assertTrue(bulkRequestLatencies.get(2).getValue() > 0.0); - final List documentsSuccessMeasurements = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) - .add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString()); - MatcherAssert.assertThat(documentsSuccessMeasurements.size(), equalTo(1)); - MatcherAssert.assertThat(documentsSuccessMeasurements.get(0).getValue(), closeTo(2.0, 0)); - final List documentsSuccessFirstAttemptMeasurements = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) - .add(BulkRetryStrategy.DOCUMENTS_SUCCESS_FIRST_ATTEMPT).toString()); - MatcherAssert.assertThat(documentsSuccessFirstAttemptMeasurements.size(), equalTo(1)); - MatcherAssert.assertThat(documentsSuccessFirstAttemptMeasurements.get(0).getValue(), closeTo(2.0, 0)); - final List documentErrorsMeasurements = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) - .add(BulkRetryStrategy.DOCUMENT_ERRORS).toString()); - MatcherAssert.assertThat(documentErrorsMeasurements.size(), equalTo(1)); - MatcherAssert.assertThat(documentErrorsMeasurements.get(0).getValue(), closeTo(0.0, 0)); - - /** - * Metrics: Bulk Request Size in Bytes - */ - final List bulkRequestSizeBytesMetrics = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) - .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2058.0, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2058.0, 0)); - } - - @Test - public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException { - // TODO: write test case - final String testDoc1 = readDocFromFile("raw-span-error.json"); - final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1); - final ObjectMapper mapper = new ObjectMapper(); - @SuppressWarnings("unchecked") final Map expData = mapper.readValue(testDoc2, Map.class); - - final List> testRecords = Arrays.asList(jsonStringToRecord(testDoc1), jsonStringToRecord(testDoc2)); - final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); - // generate temporary directory for dlq file - final File tempDirectory = Files.createTempDirectory("").toFile(); - // add dlq file path into setting - final String expDLQFile = tempDirectory.getAbsolutePath() + "/test-dlq.txt"; - pluginSetting.getSettings().put(RetryConfiguration.DLQ_FILE, expDLQFile); - - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - sink.output(testRecords); - sink.shutdown(); - - final StringBuilder dlqContent = new StringBuilder(); - Files.lines(Paths.get(expDLQFile)).forEach(dlqContent::append); - final String nonPrettyJsonString = mapper.writeValueAsString(mapper.readValue(testDoc1, JsonNode.class)); - MatcherAssert.assertThat(dlqContent.toString(), containsString(nonPrettyJsonString)); - final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); - final List> retSources = getSearchResponseDocSources(expIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(retSources.get(0), equalTo(expData)); - - // clean up temporary directory - FileUtils.deleteQuietly(tempDirectory); - - // verify metrics - final List documentsSuccessMeasurements = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) - .add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString()); - MatcherAssert.assertThat(documentsSuccessMeasurements.size(), equalTo(1)); - MatcherAssert.assertThat(documentsSuccessMeasurements.get(0).getValue(), closeTo(1.0, 0)); - final List documentErrorsMeasurements = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) - .add(BulkRetryStrategy.DOCUMENT_ERRORS).toString()); - MatcherAssert.assertThat(documentErrorsMeasurements.size(), equalTo(1)); - MatcherAssert.assertThat(documentErrorsMeasurements.get(0).getValue(), closeTo(1.0, 0)); - - /** - * Metrics: Bulk Request Size in Bytes - */ - final List bulkRequestSizeBytesMetrics = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) - .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2072.0, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2072.0, 0)); - } + @BeforeEach + public void metricsInit() throws IOException { + MetricsTestUtil.initMetrics(); - @Test - public void testInstantiateSinkServiceMapDefault() throws IOException { - final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_SERVICE_MAP); - final Request request = new Request(HttpMethod.HEAD, indexAlias); - final Response response = client.performRequest(request); - MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); - final Map mappings = getIndexMappings(indexAlias); - MatcherAssert.assertThat(mappings, notNullValue()); - MatcherAssert.assertThat((boolean) mappings.get("date_detection"), equalTo(false)); - sink.shutdown(); - - if (isOSBundle()) { - // Check managed index - MatcherAssert.assertThat(getIndexPolicyId(indexAlias), nullValue()); + client = createOpenSearchClient(); } - } - - @Test - public void testOutputServiceMapDefault() throws IOException, InterruptedException { - final String testDoc = readDocFromFile(DEFAULT_SERVICE_MAP_FILE); - final ObjectMapper mapper = new ObjectMapper(); - @SuppressWarnings("unchecked") final Map expData = mapper.readValue(testDoc, Map.class); - - final List> testRecords = Collections.singletonList(jsonStringToRecord(testDoc)); - final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - sink.output(testRecords); - final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_SERVICE_MAP); - final List> retSources = getSearchResponseDocSources(expIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(retSources.get(0), equalTo(expData)); - MatcherAssert.assertThat(getDocumentCount(expIndexAlias, "_id", (String) expData.get("hashId")), equalTo(Integer.valueOf(1))); - sink.shutdown(); - - // verify metrics - final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) - .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); - MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); - // COUNT - MatcherAssert.assertThat(bulkRequestLatencies.get(0).getValue(), closeTo(1.0, 0)); - - /** - * Metrics: Bulk Request Size in Bytes - */ - final List bulkRequestSizeBytesMetrics = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) - .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(265.0, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(265.0, 0)); - - // Check restart for index already exists - sink = createObjectUnderTest(pluginSetting, true); - sink.shutdown(); - } - @Test - public void testInstantiateSinkCustomIndex_NoRollOver() throws IOException { - final String testIndexAlias = "test-alias"; - final String testTemplateFile = Objects.requireNonNull( - getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - final Request request = new Request(HttpMethod.HEAD, testIndexAlias); - final Response response = client.performRequest(request); - MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); - sink.shutdown(); - - // Check restart for index already exists - sink = createObjectUnderTest(pluginSetting, true); - sink.shutdown(); - } - - @Test - public void testInstantiateSinkCustomIndex_WithIsmPolicy() throws IOException { - final String indexAlias = "sink-custom-index-ism-test-alias"; - final String testTemplateFile = Objects.requireNonNull( - getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); - final Map metadata = initializeConfigurationMetadata(null, indexAlias, testTemplateFile); - metadata.put(IndexConfiguration.ISM_POLICY_FILE, TEST_CUSTOM_INDEX_POLICY_FILE); - final PluginSetting pluginSetting = generatePluginSettingByMetadata(metadata); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - Request request = new Request(HttpMethod.HEAD, indexAlias); - Response response = client.performRequest(request); - MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); - final String index = String.format("%s-000001", indexAlias); - final Map mappings = getIndexMappings(index); - MatcherAssert.assertThat(mappings, notNullValue()); - MatcherAssert.assertThat((boolean) mappings.get("date_detection"), equalTo(false)); - sink.shutdown(); - - final String expectedIndexPolicyName = indexAlias + "-policy"; - if (isOSBundle()) { - // Check managed index - await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { - MatcherAssert.assertThat(getIndexPolicyId(index), equalTo(expectedIndexPolicyName)); } - ); - } - - // roll over initial index - request = new Request(HttpMethod.POST, String.format("%s/_rollover", indexAlias)); - request.setJsonEntity("{ \"conditions\" : { } }\n"); - response = client.performRequest(request); - MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); - - // Instantiate sink again - sink = createObjectUnderTest(pluginSetting, true); - // Make sure no new write index *-000001 is created under alias - final String rolloverIndexName = String.format("%s-000002", indexAlias); - request = new Request(HttpMethod.GET, rolloverIndexName + "/_alias"); - response = client.performRequest(request); - MatcherAssert.assertThat(checkIsWriteIndex(EntityUtils.toString(response.getEntity()), indexAlias, rolloverIndexName), equalTo(true)); - sink.shutdown(); - - if (isOSBundle()) { - // Check managed index - MatcherAssert.assertThat(getIndexPolicyId(rolloverIndexName), equalTo(expectedIndexPolicyName)); + @AfterEach + public void cleanOpenSearch() throws Exception { + wipeAllOpenSearchIndices(); + wipeAllTemplates(); + waitForClusterStateUpdatesToFinish(); } - } - - @Test - public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates() throws IOException { - final String testIndexAlias = "test-alias"; - final String expectedIndexTemplateName = testIndexAlias + "-index-template"; - final String testTemplateFileV1 = getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE).getFile(); - final String testTemplateFileV2 = getClass().getClassLoader().getResource(TEST_TEMPLATE_V2_FILE).getFile(); - // Create sink with template version 1 - PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFileV1); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + @Test + public void testInstantiateSinkRawSpanDefault() throws IOException { + final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); + OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); + Request request = new Request(HttpMethod.HEAD, indexAlias); + Response response = client.performRequest(request); + MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); + final String index = String.format("%s-000001", indexAlias); + final Map mappings = getIndexMappings(index); + MatcherAssert.assertThat(mappings, notNullValue()); + MatcherAssert.assertThat((boolean) mappings.get("date_detection"), equalTo(false)); + sink.shutdown(); + + if (isOSBundle()) { + // Check managed index + await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { + MatcherAssert.assertThat(getIndexPolicyId(index), equalTo(IndexConstants.RAW_ISM_POLICY)); + } + ); + } + + // roll over initial index + request = new Request(HttpMethod.POST, String.format("%s/_rollover", indexAlias)); + request.setJsonEntity("{ \"conditions\" : { } }\n"); + response = client.performRequest(request); + MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); + + // Instantiate sink again + sink = createObjectUnderTest(pluginSetting, true); + // Make sure no new write index *-000001 is created under alias + final String rolloverIndexName = String.format("%s-000002", indexAlias); + request = new Request(HttpMethod.GET, rolloverIndexName + "/_alias"); + response = client.performRequest(request); + MatcherAssert.assertThat(checkIsWriteIndex(EntityUtils.toString(response.getEntity()), indexAlias, rolloverIndexName), equalTo(true)); + sink.shutdown(); + + if (isOSBundle()) { + // Check managed index + MatcherAssert.assertThat(getIndexPolicyId(rolloverIndexName), equalTo(IndexConstants.RAW_ISM_POLICY)); + } + } - Request getTemplateRequest = new Request(HttpMethod.GET, "/_template/" + expectedIndexTemplateName); - Response getTemplateResponse = client.performRequest(getTemplateRequest); - MatcherAssert.assertThat(getTemplateResponse.getStatusLine().getStatusCode(), equalTo(SC_OK)); + @Test + public void testInstantiateSinkRawSpanReservedAliasAlreadyUsedAsIndex() throws IOException { + final String reservedIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); + final Request request = new Request(HttpMethod.PUT, reservedIndexAlias); + client.performRequest(request); + final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); + OpenSearchSink sink = createObjectUnderTest(pluginSetting, false); + Assert.assertThrows(String.format(AbstractIndexManager.INDEX_ALIAS_USED_AS_INDEX_ERROR, reservedIndexAlias), + RuntimeException.class, () -> sink.doInitialize()); + } - String responseBody = EntityUtils.toString(getTemplateResponse.getEntity()); - @SuppressWarnings("unchecked") final Integer firstResponseVersion = - (Integer) ((Map) createContentParser(XContentType.JSON.xContent(), - responseBody).map().get(expectedIndexTemplateName)).get("version"); + @Test + public void testOutputRawSpanDefault() throws IOException, InterruptedException { + final String testDoc1 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1); + final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_2); + final ObjectMapper mapper = new ObjectMapper(); + @SuppressWarnings("unchecked") final Map expData1 = mapper.readValue(testDoc1, Map.class); + @SuppressWarnings("unchecked") final Map expData2 = mapper.readValue(testDoc2, Map.class); + + final List> testRecords = Arrays.asList(jsonStringToRecord(testDoc1), jsonStringToRecord(testDoc2)); + final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + + final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); + final List> retSources = getSearchResponseDocSources(expIndexAlias); + MatcherAssert.assertThat(retSources.size(), equalTo(2)); + MatcherAssert.assertThat(retSources, hasItems(expData1, expData2)); + MatcherAssert.assertThat(getDocumentCount(expIndexAlias, "_id", (String) expData1.get("spanId")), equalTo(Integer.valueOf(1))); + sink.shutdown(); + + // Verify metrics + final List bulkRequestErrors = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_ERRORS).toString()); + MatcherAssert.assertThat(bulkRequestErrors.size(), equalTo(1)); + Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0); + final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); + MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + // COUNT + Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); + // TOTAL_TIME + Assert.assertTrue(bulkRequestLatencies.get(1).getValue() > 0.0); + // MAX + Assert.assertTrue(bulkRequestLatencies.get(2).getValue() > 0.0); + final List documentsSuccessMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString()); + MatcherAssert.assertThat(documentsSuccessMeasurements.size(), equalTo(1)); + MatcherAssert.assertThat(documentsSuccessMeasurements.get(0).getValue(), closeTo(2.0, 0)); + final List documentsSuccessFirstAttemptMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENTS_SUCCESS_FIRST_ATTEMPT).toString()); + MatcherAssert.assertThat(documentsSuccessFirstAttemptMeasurements.size(), equalTo(1)); + MatcherAssert.assertThat(documentsSuccessFirstAttemptMeasurements.get(0).getValue(), closeTo(2.0, 0)); + final List documentErrorsMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENT_ERRORS).toString()); + MatcherAssert.assertThat(documentErrorsMeasurements.size(), equalTo(1)); + MatcherAssert.assertThat(documentErrorsMeasurements.get(0).getValue(), closeTo(0.0, 0)); + + /** + * Metrics: Bulk Request Size in Bytes + */ + final List bulkRequestSizeBytesMetrics = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2058.0, 0)); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2058.0, 0)); + } - MatcherAssert.assertThat(firstResponseVersion, equalTo(Integer.valueOf(1))); - sink.shutdown(); + @Test + public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException { + // TODO: write test case + final String testDoc1 = readDocFromFile("raw-span-error.json"); + final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1); + final ObjectMapper mapper = new ObjectMapper(); + @SuppressWarnings("unchecked") final Map expData = mapper.readValue(testDoc2, Map.class); + + final List> testRecords = Arrays.asList(jsonStringToRecord(testDoc1), jsonStringToRecord(testDoc2)); + final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); + // generate temporary directory for dlq file + final File tempDirectory = Files.createTempDirectory("").toFile(); + // add dlq file path into setting + final String expDLQFile = tempDirectory.getAbsolutePath() + "/test-dlq.txt"; + pluginSetting.getSettings().put(RetryConfiguration.DLQ_FILE, expDLQFile); + + final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + sink.shutdown(); + + final StringBuilder dlqContent = new StringBuilder(); + Files.lines(Paths.get(expDLQFile)).forEach(dlqContent::append); + final String nonPrettyJsonString = mapper.writeValueAsString(mapper.readValue(testDoc1, JsonNode.class)); + MatcherAssert.assertThat(dlqContent.toString(), containsString(nonPrettyJsonString)); + final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); + final List> retSources = getSearchResponseDocSources(expIndexAlias); + MatcherAssert.assertThat(retSources.size(), equalTo(1)); + MatcherAssert.assertThat(retSources.get(0), equalTo(expData)); + + // clean up temporary directory + FileUtils.deleteQuietly(tempDirectory); + + // verify metrics + final List documentsSuccessMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString()); + MatcherAssert.assertThat(documentsSuccessMeasurements.size(), equalTo(1)); + MatcherAssert.assertThat(documentsSuccessMeasurements.get(0).getValue(), closeTo(1.0, 0)); + final List documentErrorsMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENT_ERRORS).toString()); + MatcherAssert.assertThat(documentErrorsMeasurements.size(), equalTo(1)); + MatcherAssert.assertThat(documentErrorsMeasurements.get(0).getValue(), closeTo(1.0, 0)); + + /** + * Metrics: Bulk Request Size in Bytes + */ + final List bulkRequestSizeBytesMetrics = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2072.0, 0)); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2072.0, 0)); - // Create sink with template version 2 - pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFileV2); - sink = createObjectUnderTest(pluginSetting, true); + } - getTemplateRequest = new Request(HttpMethod.GET, "/_template/" + expectedIndexTemplateName); - getTemplateResponse = client.performRequest(getTemplateRequest); - MatcherAssert.assertThat(getTemplateResponse.getStatusLine().getStatusCode(), equalTo(SC_OK)); + @Test + public void testInstantiateSinkServiceMapDefault() throws IOException { + final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_SERVICE_MAP); + final Request request = new Request(HttpMethod.HEAD, indexAlias); + final Response response = client.performRequest(request); + MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); + final Map mappings = getIndexMappings(indexAlias); + MatcherAssert.assertThat(mappings, notNullValue()); + MatcherAssert.assertThat((boolean) mappings.get("date_detection"), equalTo(false)); + sink.shutdown(); + + if (isOSBundle()) { + // Check managed index + MatcherAssert.assertThat(getIndexPolicyId(indexAlias), nullValue()); + } + } - responseBody = EntityUtils.toString(getTemplateResponse.getEntity()); - @SuppressWarnings("unchecked") final Integer secondResponseVersion = - (Integer) ((Map) createContentParser(XContentType.JSON.xContent(), - responseBody).map().get(expectedIndexTemplateName)).get("version"); + @Test + public void testOutputServiceMapDefault() throws IOException, InterruptedException { + final String testDoc = readDocFromFile(DEFAULT_SERVICE_MAP_FILE); + final ObjectMapper mapper = new ObjectMapper(); + @SuppressWarnings("unchecked") final Map expData = mapper.readValue(testDoc, Map.class); + + final List> testRecords = Collections.singletonList(jsonStringToRecord(testDoc)); + final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null); + OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_SERVICE_MAP); + final List> retSources = getSearchResponseDocSources(expIndexAlias); + MatcherAssert.assertThat(retSources.size(), equalTo(1)); + MatcherAssert.assertThat(retSources.get(0), equalTo(expData)); + MatcherAssert.assertThat(getDocumentCount(expIndexAlias, "_id", (String) expData.get("hashId")), equalTo(Integer.valueOf(1))); + sink.shutdown(); + + // verify metrics + final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); + MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + // COUNT + MatcherAssert.assertThat(bulkRequestLatencies.get(0).getValue(), closeTo(1.0, 0)); + + /** + * Metrics: Bulk Request Size in Bytes + */ + final List bulkRequestSizeBytesMetrics = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(265.0, 0)); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(265.0, 0)); + + // Check restart for index already exists + sink = createObjectUnderTest(pluginSetting, true); + sink.shutdown(); + } - MatcherAssert.assertThat(secondResponseVersion, equalTo(Integer.valueOf(2))); - sink.shutdown(); + @Test + public void testInstantiateSinkCustomIndex_NoRollOver() throws IOException { + final String testIndexAlias = "test-alias"; + final String testTemplateFile = Objects.requireNonNull( + getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); + final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final Request request = new Request(HttpMethod.HEAD, testIndexAlias); + final Response response = client.performRequest(request); + MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); + sink.shutdown(); + + // Check restart for index already exists + sink = createObjectUnderTest(pluginSetting, true); + sink.shutdown(); + } - // Create sink with template version 1 again - pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFileV1); - sink = createObjectUnderTest(pluginSetting, true); + @Test + public void testInstantiateSinkCustomIndex_WithIsmPolicy() throws IOException { + final String indexAlias = "sink-custom-index-ism-test-alias"; + final String testTemplateFile = Objects.requireNonNull( + getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); + final Map metadata = initializeConfigurationMetadata(null, indexAlias, testTemplateFile); + metadata.put(IndexConfiguration.ISM_POLICY_FILE, TEST_CUSTOM_INDEX_POLICY_FILE); + final PluginSetting pluginSetting = generatePluginSettingByMetadata(metadata); + OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + Request request = new Request(HttpMethod.HEAD, indexAlias); + Response response = client.performRequest(request); + MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); + final String index = String.format("%s-000001", indexAlias); + final Map mappings = getIndexMappings(index); + MatcherAssert.assertThat(mappings, notNullValue()); + MatcherAssert.assertThat((boolean) mappings.get("date_detection"), equalTo(false)); + sink.shutdown(); + + final String expectedIndexPolicyName = indexAlias + "-policy"; + if (isOSBundle()) { + // Check managed index + await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { + MatcherAssert.assertThat(getIndexPolicyId(index), equalTo(expectedIndexPolicyName)); + } + ); + } + + // roll over initial index + request = new Request(HttpMethod.POST, String.format("%s/_rollover", indexAlias)); + request.setJsonEntity("{ \"conditions\" : { } }\n"); + response = client.performRequest(request); + MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); + + // Instantiate sink again + sink = createObjectUnderTest(pluginSetting, true); + // Make sure no new write index *-000001 is created under alias + final String rolloverIndexName = String.format("%s-000002", indexAlias); + request = new Request(HttpMethod.GET, rolloverIndexName + "/_alias"); + response = client.performRequest(request); + MatcherAssert.assertThat(checkIsWriteIndex(EntityUtils.toString(response.getEntity()), indexAlias, rolloverIndexName), equalTo(true)); + sink.shutdown(); + + if (isOSBundle()) { + // Check managed index + MatcherAssert.assertThat(getIndexPolicyId(rolloverIndexName), equalTo(expectedIndexPolicyName)); + } + } - getTemplateRequest = new Request(HttpMethod.GET, "/_template/" + expectedIndexTemplateName); - getTemplateResponse = client.performRequest(getTemplateRequest); - MatcherAssert.assertThat(getTemplateResponse.getStatusLine().getStatusCode(), equalTo(SC_OK)); + @ParameterizedTest + @ArgumentsSource(CreateWithTemplatesArgumentsProvider.class) + public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates( + final String templateType, + final String templatePath, + final String v1File, + final String v2File, + final BiFunction, String, Integer> extractVersionFunction) throws IOException { + final String testIndexAlias = "test-alias"; + final String expectedIndexTemplateName = testIndexAlias + "-index-template"; + final String testTemplateFileV1 = getClass().getClassLoader().getResource(v1File).getFile(); + final String testTemplateFileV2 = getClass().getClassLoader().getResource(v2File).getFile(); + + // Create sink with template version 1 + PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, templateType, testTemplateFileV1); + OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + + Request getTemplateRequest = new Request(HttpMethod.GET, "/" + templatePath + "/" + expectedIndexTemplateName); + Response getTemplateResponse = client.performRequest(getTemplateRequest); + MatcherAssert.assertThat(getTemplateResponse.getStatusLine().getStatusCode(), equalTo(SC_OK)); + + String responseBody = EntityUtils.toString(getTemplateResponse.getEntity()); + @SuppressWarnings("unchecked") final Integer firstResponseVersion = + extractVersionFunction.apply(createContentParser(XContentType.JSON.xContent(), + responseBody).map(), expectedIndexTemplateName); + + MatcherAssert.assertThat(firstResponseVersion, equalTo(Integer.valueOf(1))); + sink.shutdown(); + + // Create sink with template version 2 + pluginSetting = generatePluginSetting(null, testIndexAlias, templateType, testTemplateFileV2); + sink = createObjectUnderTest(pluginSetting, true); + + getTemplateRequest = new Request(HttpMethod.GET, "/" + templatePath + "/" + expectedIndexTemplateName); + getTemplateResponse = client.performRequest(getTemplateRequest); + MatcherAssert.assertThat(getTemplateResponse.getStatusLine().getStatusCode(), equalTo(SC_OK)); + + responseBody = EntityUtils.toString(getTemplateResponse.getEntity()); + @SuppressWarnings("unchecked") final Integer secondResponseVersion = + extractVersionFunction.apply(createContentParser(XContentType.JSON.xContent(), + responseBody).map(), expectedIndexTemplateName); + + MatcherAssert.assertThat(secondResponseVersion, equalTo(Integer.valueOf(2))); + sink.shutdown(); + + // Create sink with template version 1 again + pluginSetting = generatePluginSetting(null, testIndexAlias, templateType, testTemplateFileV1); + sink = createObjectUnderTest(pluginSetting, true); + + getTemplateRequest = new Request(HttpMethod.GET, "/" + templatePath + "/" + expectedIndexTemplateName); + getTemplateResponse = client.performRequest(getTemplateRequest); + MatcherAssert.assertThat(getTemplateResponse.getStatusLine().getStatusCode(), equalTo(SC_OK)); + + responseBody = EntityUtils.toString(getTemplateResponse.getEntity()); + @SuppressWarnings("unchecked") final Integer thirdResponseVersion = + extractVersionFunction.apply(createContentParser(XContentType.JSON.xContent(), + responseBody).map(), expectedIndexTemplateName); + + // Assert version 2 was not overwritten by version 1 + MatcherAssert.assertThat(thirdResponseVersion, equalTo(Integer.valueOf(2))); + sink.shutdown(); - responseBody = EntityUtils.toString(getTemplateResponse.getEntity()); - @SuppressWarnings("unchecked") final Integer thirdResponseVersion = - (Integer) ((Map) createContentParser(XContentType.JSON.xContent(), - responseBody).map().get(expectedIndexTemplateName)).get("version"); + } - // Assert version 2 was not overwritten by version 1 - MatcherAssert.assertThat(thirdResponseVersion, equalTo(Integer.valueOf(2))); - sink.shutdown(); + static class CreateWithTemplatesArgumentsProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(ExtensionContext context) { + final List arguments = new ArrayList<>(); + arguments.add( + arguments("v1", "_template", + TEST_TEMPLATE_V1_FILE, TEST_TEMPLATE_V2_FILE, + (BiFunction, String, Integer>) (map, templateName) -> + (Integer) ((Map) map.get(templateName)).get("version") + ) + ); + + if(OpenSearchIntegrationHelper.getVersion().compareTo(DeclaredOpenSearchVersion.OPENDISTRO_1_9) >= 0) { + arguments.add( + arguments("index-template", "_index_template", + TEST_INDEX_TEMPLATE_V1_FILE, TEST_INDEX_TEMPLATE_V2_FILE, + (BiFunction, String, Integer>) (map, unused) -> + (Integer) ((List>>) map.get("index_templates")).get(0).get("index_template").get("version") + ) + ); + } + return arguments.stream(); + } + } - } + @Test + public void testOutputCustomIndex() throws IOException, InterruptedException { + final String testIndexAlias = "test-alias"; + final String testTemplateFile = Objects.requireNonNull( + getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); + final String testIdField = "someId"; + final String testId = "foo"; + final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); + final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); + final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + final List> retSources = getSearchResponseDocSources(testIndexAlias); + MatcherAssert.assertThat(retSources.size(), equalTo(1)); + MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + sink.shutdown(); + + // verify metrics + final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); + MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + // COUNT + Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); + } - @Test - public void testOutputCustomIndex() throws IOException, InterruptedException { - final String testIndexAlias = "test-alias"; - final String testTemplateFile = Objects.requireNonNull( - getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); - final String testIdField = "someId"; - final String testId = "foo"; - final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); - pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - sink.output(testRecords); - final List> retSources = getSearchResponseDocSources(testIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); - sink.shutdown(); - - // verify metrics - final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) - .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); - MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); - // COUNT - Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); - } + @Test + public void testBulkActionCreate() throws IOException, InterruptedException { + final String testIndexAlias = "test-alias"; + final String testTemplateFile = Objects.requireNonNull( + getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); + final String testIdField = "someId"; + final String testId = "foo"; + final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); + final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); + pluginSetting.getSettings().put(IndexConfiguration.ACTION, BulkAction.CREATE.toString()); + final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + final List> retSources = getSearchResponseDocSources(testIndexAlias); + MatcherAssert.assertThat(retSources.size(), equalTo(1)); + MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + sink.shutdown(); + + // verify metrics + final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); + MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + // COUNT + Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); + } - @Test - public void testBulkActionCreate() throws IOException, InterruptedException { - final String testIndexAlias = "test-alias"; - final String testTemplateFile = Objects.requireNonNull( - getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); - final String testIdField = "someId"; - final String testId = "foo"; - final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); - pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); - pluginSetting.getSettings().put(IndexConfiguration.ACTION, BulkAction.CREATE.toString()); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - sink.output(testRecords); - final List> retSources = getSearchResponseDocSources(testIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); - sink.shutdown(); - - // verify metrics - final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) - .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); - MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); - // COUNT - Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); - } + @Test + public void testEventOutput() throws IOException, InterruptedException { - @Test - public void testEventOutput() throws IOException, InterruptedException { + final Event testEvent = JacksonEvent.builder() + .withData("{\"log\": \"foobar\"}") + .withEventType("event") + .build(); + ((JacksonEvent) testEvent).setEventHandle(eventHandle); - final Event testEvent = JacksonEvent.builder() - .withData("{\"log\": \"foobar\"}") - .withEventType("event") - .build(); - ((JacksonEvent)testEvent).setEventHandle(eventHandle); + final List> testRecords = Collections.singletonList(new Record<>(testEvent)); - final List> testRecords = Collections.singletonList(new Record<>(testEvent)); + final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); - final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - sink.output(testRecords); + final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); + final List> retSources = getSearchResponseDocSources(expIndexAlias); + final Map expectedContent = new HashMap<>(); + expectedContent.put("log", "foobar"); - final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); - final List> retSources = getSearchResponseDocSources(expIndexAlias); - final Map expectedContent = new HashMap<>(); - expectedContent.put("log", "foobar"); + MatcherAssert.assertThat(retSources.size(), equalTo(1)); + MatcherAssert.assertThat(retSources.containsAll(Arrays.asList(expectedContent)), equalTo(true)); + MatcherAssert.assertThat(getDocumentCount(expIndexAlias, "log", "foobar"), equalTo(Integer.valueOf(1))); + sink.shutdown(); + } - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(retSources.containsAll(Arrays.asList(expectedContent)), equalTo(true)); - MatcherAssert.assertThat(getDocumentCount(expIndexAlias, "log", "foobar"), equalTo(Integer.valueOf(1))); - sink.shutdown(); - } + @ParameterizedTest + @ValueSource(strings = {"info/ids/id", "id"}) + public void testOpenSearchDocumentId(final String testDocumentIdField) throws IOException, InterruptedException { + final String expectedId = UUID.randomUUID().toString(); + final String testIndexAlias = "test_index"; + final Event testEvent = JacksonEvent.builder() + .withData(Map.of("arbitrary_data", UUID.randomUUID().toString())) + .withEventType("event") + .build(); + ((JacksonEvent) testEvent).setEventHandle(eventHandle); + testEvent.put(testDocumentIdField, expectedId); + + final List> testRecords = Collections.singletonList(new Record<>(testEvent)); + + final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); + pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testDocumentIdField); + final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + + final List docIds = getSearchResponseDocIds(testIndexAlias); + for (String docId : docIds) { + MatcherAssert.assertThat(docId, equalTo(expectedId)); + } + sink.shutdown(); + } - @ParameterizedTest - @ValueSource(strings = {"info/ids/id", "id"}) - public void testOpenSearchDocumentId(final String testDocumentIdField) throws IOException, InterruptedException { - final String expectedId = UUID.randomUUID().toString(); - final String testIndexAlias = "test_index"; - final Event testEvent = JacksonEvent.builder() - .withData(Map.of("arbitrary_data", UUID.randomUUID().toString())) - .withEventType("event") - .build(); - ((JacksonEvent)testEvent).setEventHandle(eventHandle); - testEvent.put(testDocumentIdField, expectedId); - - final List> testRecords = Collections.singletonList(new Record<>(testEvent)); - - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); - pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testDocumentIdField); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - sink.output(testRecords); - - final List docIds = getSearchResponseDocIds(testIndexAlias); - for (String docId: docIds) { - MatcherAssert.assertThat(docId, equalTo(expectedId)); - } - sink.shutdown(); - } + @ParameterizedTest + @ValueSource(strings = {"info/ids/rid", "rid"}) + public void testOpenSearchRoutingField(final String testRoutingField) throws IOException, InterruptedException { + final String expectedRoutingField = UUID.randomUUID().toString(); + final String testIndexAlias = "test_index"; + final Event testEvent = JacksonEvent.builder() + .withData(Map.of("arbitrary_data", UUID.randomUUID().toString())) + .withEventType("event") + .build(); + ((JacksonEvent) testEvent).setEventHandle(eventHandle); + testEvent.put(testRoutingField, expectedRoutingField); + + final List> testRecords = Collections.singletonList(new Record<>(testEvent)); + + final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); + pluginSetting.getSettings().put(IndexConfiguration.ROUTING_FIELD, testRoutingField); + final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + + final List routingFields = getSearchResponseRoutingFields(testIndexAlias); + for (String routingField : routingFields) { + MatcherAssert.assertThat(routingField, equalTo(expectedRoutingField)); + } + sink.shutdown(); + } - @ParameterizedTest - @ValueSource(strings = {"info/ids/rid", "rid"}) - public void testOpenSearchRoutingField(final String testRoutingField) throws IOException, InterruptedException { - final String expectedRoutingField = UUID.randomUUID().toString(); - final String testIndexAlias = "test_index"; - final Event testEvent = JacksonEvent.builder() - .withData(Map.of("arbitrary_data", UUID.randomUUID().toString())) - .withEventType("event") - .build(); - ((JacksonEvent)testEvent).setEventHandle(eventHandle); - testEvent.put(testRoutingField, expectedRoutingField); - - final List> testRecords = Collections.singletonList(new Record<>(testEvent)); - - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); - pluginSetting.getSettings().put(IndexConfiguration.ROUTING_FIELD, testRoutingField); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - sink.output(testRecords); - - final List routingFields = getSearchResponseRoutingFields(testIndexAlias); - for (String routingField: routingFields) { - MatcherAssert.assertThat(routingField, equalTo(expectedRoutingField)); - } - sink.shutdown(); - } + @ParameterizedTest + @ValueSource(strings = {"info/ids/id", "id"}) + public void testOpenSearchDynamicIndex(final String testIndex) throws IOException, InterruptedException { + final String dynamicTestIndexAlias = "test-${" + testIndex + "}-index"; + final String testIndexName = "idx1"; + final String testIndexAlias = "test-" + testIndexName + "-index"; + final String data = UUID.randomUUID().toString(); + final Map dataMap = Map.of("data", data); + final Event testEvent = JacksonEvent.builder() + .withData(dataMap) + .withEventType("event") + .build(); + ((JacksonEvent) testEvent).setEventHandle(eventHandle); + testEvent.put(testIndex, testIndexName); + + Map expectedMap = testEvent.toMap(); + + final List> testRecords = Collections.singletonList(new Record<>(testEvent)); + + final PluginSetting pluginSetting = generatePluginSetting(null, dynamicTestIndexAlias, null); + final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + final List> retSources = getSearchResponseDocSources(testIndexAlias); + MatcherAssert.assertThat(retSources.size(), equalTo(1)); + MatcherAssert.assertThat(retSources, hasItem(expectedMap)); + sink.shutdown(); + } - @ParameterizedTest - @ValueSource(strings = {"info/ids/id", "id"}) - public void testOpenSearchDynamicIndex(final String testIndex) throws IOException, InterruptedException { - final String dynamicTestIndexAlias = "test-${"+testIndex+"}-index"; - final String testIndexName = "idx1"; - final String testIndexAlias = "test-"+testIndexName+"-index"; - final String data = UUID.randomUUID().toString(); - final Map dataMap = Map.of("data", data); - final Event testEvent = JacksonEvent.builder() - .withData(dataMap) - .withEventType("event") - .build(); - ((JacksonEvent)testEvent).setEventHandle(eventHandle); - testEvent.put(testIndex, testIndexName); - - Map expectedMap = testEvent.toMap(); - - final List> testRecords = Collections.singletonList(new Record<>(testEvent)); - - final PluginSetting pluginSetting = generatePluginSetting(null, dynamicTestIndexAlias, null); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - sink.output(testRecords); - final List> retSources = getSearchResponseDocSources(testIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(retSources, hasItem(expectedMap)); - sink.shutdown(); - } + @ParameterizedTest + @CsvSource({ + "info/ids/id, yyyy-MM", + "id, yyyy-MM-dd", + }) + public void testOpenSearchDynamicIndexWithDate(final String testIndex, final String testDatePattern) throws IOException, InterruptedException { + final String dynamicTestIndexAlias = "test-${" + testIndex + "}-index-%{" + testDatePattern + "}"; + final String testIndexName = "idx1"; + SimpleDateFormat formatter = new SimpleDateFormat(testDatePattern); + Date date = new Date(); + String expectedDate = formatter.format(date); + final String expectedIndexAlias = "test-" + testIndexName + "-index-" + expectedDate; + final String data = UUID.randomUUID().toString(); + final Map dataMap = Map.of("data", data); + final Event testEvent = JacksonEvent.builder() + .withData(dataMap) + .withEventType("event") + .build(); + ((JacksonEvent) testEvent).setEventHandle(eventHandle); + testEvent.put(testIndex, testIndexName); + + Map expectedMap = testEvent.toMap(); + + final List> testRecords = Collections.singletonList(new Record<>(testEvent)); + + final PluginSetting pluginSetting = generatePluginSetting(null, dynamicTestIndexAlias, null); + final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + final List> retSources = getSearchResponseDocSources(expectedIndexAlias); + MatcherAssert.assertThat(retSources.size(), equalTo(1)); + MatcherAssert.assertThat(retSources, hasItem(expectedMap)); + sink.shutdown(); + } - @ParameterizedTest - @CsvSource({ - "info/ids/id, yyyy-MM", - "id, yyyy-MM-dd", - }) - public void testOpenSearchDynamicIndexWithDate(final String testIndex, final String testDatePattern) throws IOException, InterruptedException { - final String dynamicTestIndexAlias = "test-${"+testIndex+"}-index-%{"+testDatePattern+"}"; - final String testIndexName = "idx1"; - SimpleDateFormat formatter = new SimpleDateFormat(testDatePattern); - Date date = new Date(); - String expectedDate = formatter.format(date); - final String expectedIndexAlias = "test-"+testIndexName+"-index-"+expectedDate; - final String data = UUID.randomUUID().toString(); - final Map dataMap = Map.of("data", data); - final Event testEvent = JacksonEvent.builder() - .withData(dataMap) - .withEventType("event") - .build(); - ((JacksonEvent)testEvent).setEventHandle(eventHandle); - testEvent.put(testIndex, testIndexName); - - Map expectedMap = testEvent.toMap(); - - final List> testRecords = Collections.singletonList(new Record<>(testEvent)); - - final PluginSetting pluginSetting = generatePluginSetting(null, dynamicTestIndexAlias, null); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - sink.output(testRecords); - final List> retSources = getSearchResponseDocSources(expectedIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(retSources, hasItem(expectedMap)); - sink.shutdown(); - } + @ParameterizedTest + @ValueSource(strings = {"yyyy-MM", "yyyy-MM-dd", "dd-MM-yyyy"}) + public void testOpenSearchIndexWithDate(final String testDatePattern) throws IOException, InterruptedException { + SimpleDateFormat formatter = new SimpleDateFormat(testDatePattern); + Date date = new Date(); + String expectedIndexName = "test-index-" + formatter.format(date); + final String testIndexName = "idx1"; + final String testIndexAlias = "test-index-%{" + testDatePattern + "}"; + final String data = UUID.randomUUID().toString(); + final Map dataMap = Map.of("data", data); + final Event testEvent = JacksonEvent.builder() + .withData(dataMap) + .withEventType("event") + .build(); + ((JacksonEvent) testEvent).setEventHandle(eventHandle); + + Map expectedMap = testEvent.toMap(); + + final List> testRecords = Collections.singletonList(new Record<>(testEvent)); + + final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); + final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + final List> retSources = getSearchResponseDocSources(expectedIndexName); + MatcherAssert.assertThat(retSources.size(), equalTo(1)); + MatcherAssert.assertThat(retSources, hasItem(expectedMap)); + sink.shutdown(); + } - @ParameterizedTest - @ValueSource(strings = {"yyyy-MM", "yyyy-MM-dd", "dd-MM-yyyy"}) - public void testOpenSearchIndexWithDate(final String testDatePattern) throws IOException, InterruptedException { - SimpleDateFormat formatter = new SimpleDateFormat(testDatePattern); - Date date = new Date(); - String expectedIndexName = "test-index-"+formatter.format(date); - final String testIndexName = "idx1"; - final String testIndexAlias = "test-index-%{"+testDatePattern+"}"; - final String data = UUID.randomUUID().toString(); - final Map dataMap = Map.of("data", data); - final Event testEvent = JacksonEvent.builder() - .withData(dataMap) - .withEventType("event") - .build(); - ((JacksonEvent)testEvent).setEventHandle(eventHandle); - - Map expectedMap = testEvent.toMap(); - - final List> testRecords = Collections.singletonList(new Record<>(testEvent)); - - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - sink.output(testRecords); - final List> retSources = getSearchResponseDocSources(expectedIndexName); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(retSources, hasItem(expectedMap)); - sink.shutdown(); - } + @Test + public void testOpenSearchIndexWithInvalidDate() throws IOException, InterruptedException { + String invalidDatePattern = "yyyy-MM-dd HH:ss:mm"; + final String invalidTestIndexAlias = "test-index-%{" + invalidDatePattern + "}"; + final PluginSetting pluginSetting = generatePluginSetting(null, invalidTestIndexAlias, null); + OpenSearchSink sink = createObjectUnderTest(pluginSetting, false); + Assert.assertThrows(IllegalArgumentException.class, () -> sink.doInitialize()); + } - @Test - public void testOpenSearchIndexWithInvalidDate() throws IOException, InterruptedException { - String invalidDatePattern = "yyyy-MM-dd HH:ss:mm"; - final String invalidTestIndexAlias = "test-index-%{"+invalidDatePattern+"}"; - final PluginSetting pluginSetting = generatePluginSetting(null, invalidTestIndexAlias, null); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, false); - Assert.assertThrows(IllegalArgumentException.class, () -> sink.doInitialize()); - } + @Test + public void testOpenSearchIndexWithInvalidChars() throws IOException, InterruptedException { + final String invalidTestIndexAlias = "test#-index"; + final PluginSetting pluginSetting = generatePluginSetting(null, invalidTestIndexAlias, null); + OpenSearchSink sink = createObjectUnderTest(pluginSetting, false); + Assert.assertThrows(RuntimeException.class, () -> sink.doInitialize()); + } - @Test - public void testOpenSearchIndexWithInvalidChars() throws IOException, InterruptedException { - final String invalidTestIndexAlias = "test#-index"; - final PluginSetting pluginSetting = generatePluginSetting(null, invalidTestIndexAlias, null); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, false); - Assert.assertThrows(RuntimeException.class, () -> sink.doInitialize()); - } + @Test + @Timeout(value = 1, unit = TimeUnit.MINUTES) + public void testOutputManagementDisabled() throws IOException, InterruptedException { + final String testIndexAlias = "test-" + UUID.randomUUID(); + final String roleName = UUID.randomUUID().toString(); + final String username = UUID.randomUUID().toString(); + final String password = UUID.randomUUID().toString(); + final OpenSearchSecurityAccessor securityAccessor = new OpenSearchSecurityAccessor(client); + securityAccessor.createBulkWritingRole(roleName, testIndexAlias + "*"); + securityAccessor.createUser(username, password, roleName); + + final String testIdField = "someId"; + final String testId = "foo"; + + final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); + + final Map metadata = initializeConfigurationMetadata(null, testIndexAlias, null); + metadata.put(IndexConfiguration.INDEX_TYPE, IndexType.MANAGEMENT_DISABLED.getValue()); + metadata.put(ConnectionConfiguration.USERNAME, username); + metadata.put(ConnectionConfiguration.PASSWORD, password); + metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); + final PluginSetting pluginSetting = generatePluginSettingByMetadata(metadata); + final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + + final String testTemplateFile = Objects.requireNonNull( + getClass().getClassLoader().getResource("management-disabled-index-template.json")).getFile(); + createV1IndexTemplate(testIndexAlias, testIndexAlias + "*", testTemplateFile); + createIndex(testIndexAlias); + + sink.output(testRecords); + final List> retSources = getSearchResponseDocSources(testIndexAlias); + MatcherAssert.assertThat(retSources.size(), equalTo(1)); + MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + sink.shutdown(); + + // verify metrics + final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); + MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + // COUNT + Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); + } - @Test - @Timeout(value = 1, unit = TimeUnit.MINUTES) - public void testOutputManagementDisabled() throws IOException, InterruptedException { - final String testIndexAlias = "test-" + UUID.randomUUID(); - final String roleName = UUID.randomUUID().toString(); - final String username = UUID.randomUUID().toString(); - final String password = UUID.randomUUID().toString(); - final OpenSearchSecurityAccessor securityAccessor = new OpenSearchSecurityAccessor(client); - securityAccessor.createBulkWritingRole(roleName, testIndexAlias + "*"); - securityAccessor.createUser(username, password, roleName); - - final String testIdField = "someId"; - final String testId = "foo"; - - final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); - - final Map metadata = initializeConfigurationMetadata(null, testIndexAlias, null); - metadata.put(IndexConfiguration.INDEX_TYPE, IndexType.MANAGEMENT_DISABLED.getValue()); - metadata.put(ConnectionConfiguration.USERNAME, username); - metadata.put(ConnectionConfiguration.PASSWORD, password); - metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); - final PluginSetting pluginSetting = generatePluginSettingByMetadata(metadata); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); - - final String testTemplateFile = Objects.requireNonNull( - getClass().getClassLoader().getResource("management-disabled-index-template.json")).getFile(); - createIndexTemplate(testIndexAlias, testIndexAlias + "*", testTemplateFile); - createIndex(testIndexAlias); - - sink.output(testRecords); - final List> retSources = getSearchResponseDocSources(testIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); - sink.shutdown(); - - // verify metrics - final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( - new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) - .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); - MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); - // COUNT - Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); - } + private Map initializeConfigurationMetadata(final String indexType, final String indexAlias, + final String templateFilePath) { + final Map metadata = new HashMap<>(); + metadata.put(IndexConfiguration.INDEX_TYPE, indexType); + metadata.put(ConnectionConfiguration.HOSTS, getHosts()); + metadata.put(IndexConfiguration.INDEX_ALIAS, indexAlias); + metadata.put(IndexConfiguration.TEMPLATE_FILE, templateFilePath); + final String user = System.getProperty("tests.opensearch.user"); + final String password = System.getProperty("tests.opensearch.password"); + if (user != null) { + metadata.put(ConnectionConfiguration.USERNAME, user); + metadata.put(ConnectionConfiguration.PASSWORD, password); + } + return metadata; + } - private Map initializeConfigurationMetadata (final String indexType, final String indexAlias, - final String templateFilePath) { - final Map metadata = new HashMap<>(); - metadata.put(IndexConfiguration.INDEX_TYPE, indexType); - metadata.put(ConnectionConfiguration.HOSTS, getHosts()); - metadata.put(IndexConfiguration.INDEX_ALIAS, indexAlias); - metadata.put(IndexConfiguration.TEMPLATE_FILE, templateFilePath); - final String user = System.getProperty("tests.opensearch.user"); - final String password = System.getProperty("tests.opensearch.password"); - if (user != null) { - metadata.put(ConnectionConfiguration.USERNAME, user); - metadata.put(ConnectionConfiguration.PASSWORD, password); - } - return metadata; - } + private PluginSetting generatePluginSetting(final String indexType, final String indexAlias, + final String templateFilePath) { + final Map metadata = initializeConfigurationMetadata(indexType, indexAlias, templateFilePath); + return generatePluginSettingByMetadata(metadata); + } - private PluginSetting generatePluginSetting(final String indexType, final String indexAlias, - final String templateFilePath) { - final Map metadata = initializeConfigurationMetadata(indexType, indexAlias, templateFilePath); - return generatePluginSettingByMetadata(metadata); - } + private PluginSetting generatePluginSetting(final String indexType, final String indexAlias, + final String templateType, + final String templateFilePath) { + final Map metadata = initializeConfigurationMetadata(indexType, indexAlias, templateFilePath); + metadata.put(IndexConfiguration.TEMPLATE_TYPE, templateType); + return generatePluginSettingByMetadata(metadata); + } - private PluginSetting generatePluginSettingByMetadata(final Map configurationMetadata) { - final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, configurationMetadata); - pluginSetting.setPipelineName(PIPELINE_NAME); - return pluginSetting; - } + private PluginSetting generatePluginSettingByMetadata(final Map configurationMetadata) { + final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, configurationMetadata); + pluginSetting.setPipelineName(PIPELINE_NAME); + return pluginSetting; + } - private String generateCustomRecordJson(final String idField, final String documentId) throws IOException { - return Strings.toString( - XContentFactory.jsonBuilder() - .startObject() - .field(idField, documentId) - .endObject() - ); - } + private String generateCustomRecordJson(final String idField, final String documentId) throws IOException { + return Strings.toString( + XContentFactory.jsonBuilder() + .startObject() + .field(idField, documentId) + .endObject() + ); + } - private String readDocFromFile(final String filename) throws IOException { - final StringBuilder jsonBuilder = new StringBuilder(); - try (final InputStream inputStream = Objects.requireNonNull( - getClass().getClassLoader().getResourceAsStream(filename))) { - final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); - bufferedReader.lines().forEach(jsonBuilder::append); + private String readDocFromFile(final String filename) throws IOException { + final StringBuilder jsonBuilder = new StringBuilder(); + try (final InputStream inputStream = Objects.requireNonNull( + getClass().getClassLoader().getResourceAsStream(filename))) { + final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); + bufferedReader.lines().forEach(jsonBuilder::append); + } + return jsonBuilder.toString(); } - return jsonBuilder.toString(); - } - private Boolean checkIsWriteIndex(final String responseBody, final String aliasName, final String indexName) throws IOException { - @SuppressWarnings("unchecked") final Map indexBlob = (Map) createContentParser(XContentType.JSON.xContent(), - responseBody).map().get(indexName); - @SuppressWarnings("unchecked") final Map aliasesBlob = (Map) indexBlob.get("aliases"); - @SuppressWarnings("unchecked") final Map aliasBlob = (Map) aliasesBlob.get(aliasName); - return (Boolean) aliasBlob.get("is_write_index"); - } + private Boolean checkIsWriteIndex(final String responseBody, final String aliasName, final String indexName) throws IOException { + @SuppressWarnings("unchecked") final Map indexBlob = (Map) createContentParser(XContentType.JSON.xContent(), + responseBody).map().get(indexName); + @SuppressWarnings("unchecked") final Map aliasesBlob = (Map) indexBlob.get("aliases"); + @SuppressWarnings("unchecked") final Map aliasBlob = (Map) aliasesBlob.get(aliasName); + return (Boolean) aliasBlob.get("is_write_index"); + } - private Integer getDocumentCount(final String index, final String field, final String value) throws IOException, InterruptedException { - final Request request = new Request(HttpMethod.GET, index + "/_count"); - if (field != null && value != null) { - final String jsonEntity = Strings.toString( - XContentFactory.jsonBuilder().startObject() - .startObject("query") - .startObject("match") - .field(field, value) - .endObject() - .endObject() - .endObject() - ); - request.setJsonEntity(jsonEntity); - } - final Response response = client.performRequest(request); - final String responseBody = EntityUtils.toString(response.getEntity()); - return (Integer) createContentParser(XContentType.JSON.xContent(), responseBody).map().get("count"); - } + private Integer getDocumentCount(final String index, final String field, final String value) throws IOException, InterruptedException { + final Request request = new Request(HttpMethod.GET, index + "/_count"); + if (field != null && value != null) { + final String jsonEntity = Strings.toString( + XContentFactory.jsonBuilder().startObject() + .startObject("query") + .startObject("match") + .field(field, value) + .endObject() + .endObject() + .endObject() + ); + request.setJsonEntity(jsonEntity); + } + final Response response = client.performRequest(request); + final String responseBody = EntityUtils.toString(response.getEntity()); + return (Integer) createContentParser(XContentType.JSON.xContent(), responseBody).map().get("count"); + } - private List getSearchResponseDocIds(final String index) throws IOException { - final Request refresh = new Request(HttpMethod.POST, index + "/_refresh"); - client.performRequest(refresh); - final Request request = new Request(HttpMethod.GET, index + "/_search"); - final Response response = client.performRequest(request); - final String responseBody = EntityUtils.toString(response.getEntity()); - - @SuppressWarnings("unchecked") final List hits = - (List) ((Map) createContentParser(XContentType.JSON.xContent(), - responseBody).map().get("hits")).get("hits"); - @SuppressWarnings("unchecked") final List ids = hits.stream() - .map(hit -> (String) ((Map) hit).get("_id")) - .collect(Collectors.toList()); - return ids; - } + private List getSearchResponseDocIds(final String index) throws IOException { + final Request refresh = new Request(HttpMethod.POST, index + "/_refresh"); + client.performRequest(refresh); + final Request request = new Request(HttpMethod.GET, index + "/_search"); + final Response response = client.performRequest(request); + final String responseBody = EntityUtils.toString(response.getEntity()); + + @SuppressWarnings("unchecked") final List hits = + (List) ((Map) createContentParser(XContentType.JSON.xContent(), + responseBody).map().get("hits")).get("hits"); + @SuppressWarnings("unchecked") final List ids = hits.stream() + .map(hit -> (String) ((Map) hit).get("_id")) + .collect(Collectors.toList()); + return ids; + } - private List getSearchResponseRoutingFields(final String index) throws IOException { - final Request refresh = new Request(HttpMethod.POST, index + "/_refresh"); - client.performRequest(refresh); - final Request request = new Request(HttpMethod.GET, index + "/_search"); - final Response response = client.performRequest(request); - final String responseBody = EntityUtils.toString(response.getEntity()); - - @SuppressWarnings("unchecked") final List hits = - (List) ((Map) createContentParser(XContentType.JSON.xContent(), - responseBody).map().get("hits")).get("hits"); - @SuppressWarnings("unchecked") final List routingFields = hits.stream() - .map(hit -> (String) ((Map) hit).get("_routing")) - .collect(Collectors.toList()); - return routingFields; - } + private List getSearchResponseRoutingFields(final String index) throws IOException { + final Request refresh = new Request(HttpMethod.POST, index + "/_refresh"); + client.performRequest(refresh); + final Request request = new Request(HttpMethod.GET, index + "/_search"); + final Response response = client.performRequest(request); + final String responseBody = EntityUtils.toString(response.getEntity()); + + @SuppressWarnings("unchecked") final List hits = + (List) ((Map) createContentParser(XContentType.JSON.xContent(), + responseBody).map().get("hits")).get("hits"); + @SuppressWarnings("unchecked") final List routingFields = hits.stream() + .map(hit -> (String) ((Map) hit).get("_routing")) + .collect(Collectors.toList()); + return routingFields; + } - private List> getSearchResponseDocSources(final String index) throws IOException { - final Request refresh = new Request(HttpMethod.POST, index + "/_refresh"); - client.performRequest(refresh); - final Request request = new Request(HttpMethod.GET, index + "/_search"); - final Response response = client.performRequest(request); - final String responseBody = EntityUtils.toString(response.getEntity()); - - @SuppressWarnings("unchecked") final List hits = - (List) ((Map) createContentParser(XContentType.JSON.xContent(), - responseBody).map().get("hits")).get("hits"); - @SuppressWarnings("unchecked") final List> sources = hits.stream() - .map(hit -> (Map) ((Map) hit).get("_source")) - .collect(Collectors.toList()); - return sources; - } + private List> getSearchResponseDocSources(final String index) throws IOException { + final Request refresh = new Request(HttpMethod.POST, index + "/_refresh"); + client.performRequest(refresh); + final Request request = new Request(HttpMethod.GET, index + "/_search"); + final Response response = client.performRequest(request); + final String responseBody = EntityUtils.toString(response.getEntity()); + + @SuppressWarnings("unchecked") final List hits = + (List) ((Map) createContentParser(XContentType.JSON.xContent(), + responseBody).map().get("hits")).get("hits"); + @SuppressWarnings("unchecked") final List> sources = hits.stream() + .map(hit -> (Map) ((Map) hit).get("_source")) + .collect(Collectors.toList()); + return sources; + } - private Map getIndexMappings(final String index) throws IOException { - final Request request = new Request(HttpMethod.GET, index + "/_mappings"); - final Response response = client.performRequest(request); - final String responseBody = EntityUtils.toString(response.getEntity()); + private Map getIndexMappings(final String index) throws IOException { + final Request request = new Request(HttpMethod.GET, index + "/_mappings"); + final Response response = client.performRequest(request); + final String responseBody = EntityUtils.toString(response.getEntity()); - @SuppressWarnings("unchecked") final Map mappings = - (Map) ((Map) createContentParser(XContentType.JSON.xContent(), - responseBody).map().get(index)).get("mappings"); - return mappings; - } + @SuppressWarnings("unchecked") final Map mappings = + (Map) ((Map) createContentParser(XContentType.JSON.xContent(), + responseBody).map().get(index)).get("mappings"); + return mappings; + } - private String getIndexPolicyId(final String index) throws IOException { - // TODO: replace with new _opensearch API - final Request request = new Request(HttpMethod.GET, "/_opendistro/_ism/explain/" + index); - final Response response = client.performRequest(request); - final String responseBody = EntityUtils.toString(response.getEntity()); + private String getIndexPolicyId(final String index) throws IOException { + // TODO: replace with new _opensearch API + final Request request = new Request(HttpMethod.GET, "/_opendistro/_ism/explain/" + index); + final Response response = client.performRequest(request); + final String responseBody = EntityUtils.toString(response.getEntity()); - @SuppressWarnings("unchecked") final String policyId = (String) ((Map) createContentParser(XContentType.JSON.xContent(), - responseBody).map().get(index)).get("index.opendistro.index_state_management.policy_id"); - return policyId; - } + @SuppressWarnings("unchecked") final String policyId = (String) ((Map) createContentParser(XContentType.JSON.xContent(), + responseBody).map().get(index)).get("index.opendistro.index_state_management.policy_id"); + return policyId; + } - @SuppressWarnings("unchecked") - private void wipeAllOpenSearchIndices() throws IOException { - final Response response = client.performRequest(new Request("GET", "/*?expand_wildcards=all")); - - final String responseBody = EntityUtils.toString(response.getEntity()); - final Map indexContent = createContentParser(XContentType.JSON.xContent(), responseBody).map(); - - final Set indices = indexContent.keySet(); - - indices.stream() - .filter(Objects::nonNull) - .filter(Predicate.not(indexName -> indexName.startsWith(".opendistro-"))) - .filter(Predicate.not(indexName -> indexName.startsWith(".opendistro_"))) - .filter(Predicate.not(indexName -> indexName.startsWith(".opensearch-"))) - .filter(Predicate.not(indexName -> indexName.startsWith(".opensearch_"))) - .forEach(indexName -> { - try { - client.performRequest(new Request("DELETE", "/" + indexName)); - } catch (final IOException e) { - throw new RuntimeException(e); - } - }); - } + @SuppressWarnings("unchecked") + private void wipeAllOpenSearchIndices() throws IOException { + final Response response = client.performRequest(new Request("GET", "/*?expand_wildcards=all")); + + final String responseBody = EntityUtils.toString(response.getEntity()); + final Map indexContent = createContentParser(XContentType.JSON.xContent(), responseBody).map(); + + final Set indices = indexContent.keySet(); + + indices.stream() + .filter(Objects::nonNull) + .filter(Predicate.not(indexName -> indexName.startsWith(".opendistro-"))) + .filter(Predicate.not(indexName -> indexName.startsWith(".opendistro_"))) + .filter(Predicate.not(indexName -> indexName.startsWith(".opensearch-"))) + .filter(Predicate.not(indexName -> indexName.startsWith(".opensearch_"))) + .forEach(indexName -> { + try { + client.performRequest(new Request("DELETE", "/" + indexName)); + } catch (final IOException e) { + throw new RuntimeException(e); + } + }); + } - private Record jsonStringToRecord(final String jsonString) { - final ObjectMapper objectMapper = new ObjectMapper(); - try { - Record record = new Record(JacksonEvent.builder() - .withEventType(EventType.TRACE.toString()) - .withData(objectMapper.readValue(jsonString, Map.class)).build()); - JacksonEvent event = (JacksonEvent)record.getData(); - event.setEventHandle(eventHandle); - return record; - } catch (final JsonProcessingException e) { - throw new RuntimeException(e); + private Record jsonStringToRecord(final String jsonString) { + final ObjectMapper objectMapper = new ObjectMapper(); + try { + Record record = new Record(JacksonEvent.builder() + .withEventType(EventType.TRACE.toString()) + .withData(objectMapper.readValue(jsonString, Map.class)).build()); + JacksonEvent event = (JacksonEvent) record.getData(); + event.setEventHandle(eventHandle); + return record; + } catch (final JsonProcessingException e) { + throw new RuntimeException(e); + } } - } - private void createIndex(final String indexName) throws IOException { - final Request request = new Request(HttpMethod.PUT, indexName); - final Response response = client.performRequest(request); - } + private void createIndex(final String indexName) throws IOException { + final Request request = new Request(HttpMethod.PUT, indexName); + final Response response = client.performRequest(request); + } - private void createIndexTemplate(final String templateName, final String indexPattern, final String fileName) throws IOException { - final ObjectMapper objectMapper = new ObjectMapper(); - final Map templateJson = objectMapper.readValue(new FileInputStream(fileName), Map.class); + private void createV1IndexTemplate(final String templateName, final String indexPattern, final String fileName) throws IOException { + final ObjectMapper objectMapper = new ObjectMapper(); + final Map templateJson = objectMapper.readValue(new FileInputStream(fileName), Map.class); - templateJson.put("index_patterns", indexPattern); + templateJson.put("index_patterns", indexPattern); - final Request request = new Request(HttpMethod.PUT, "_template/" + templateName); + final Request request = new Request(HttpMethod.PUT, "_template/" + templateName); - final String createTemplateJson = objectMapper.writeValueAsString(templateJson); - request.setJsonEntity(createTemplateJson); - final Response response = client.performRequest(request); - } + final String createTemplateJson = objectMapper.writeValueAsString(templateJson); + request.setJsonEntity(createTemplateJson); + final Response response = client.performRequest(request); + } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 458107e701..3448203f8e 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -5,22 +5,6 @@ package org.opensearch.dataprepper.plugins.sink.opensearch; -import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.metrics.MetricNames; -import org.opensearch.dataprepper.plugins.dlq.DlqProvider; -import org.opensearch.dataprepper.plugins.dlq.DlqWriter; -import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; -import org.opensearch.dataprepper.model.configuration.PluginModel; -import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.model.failures.DlqObject; -import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; - -import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.plugin.PluginFactory; -import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.sink.AbstractSink; -import org.opensearch.dataprepper.model.sink.Sink; import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; @@ -33,6 +17,21 @@ import org.opensearch.client.opensearch.core.bulk.CreateOperation; import org.opensearch.client.opensearch.core.bulk.IndexOperation; import org.opensearch.common.unit.ByteSizeUnit; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.metrics.MetricNames; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.failures.DlqObject; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.AbstractSink; +import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.plugins.dlq.DlqProvider; +import org.opensearch.dataprepper.plugins.dlq.DlqWriter; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkOperationWriter; @@ -47,7 +46,6 @@ import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexManagerFactory; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType; import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateStrategy; -import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,8 +58,8 @@ import java.util.List; import java.util.Optional; import java.util.StringJoiner; -import java.util.function.Supplier; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE; @@ -166,7 +164,7 @@ private void doInitializeInternal() throws IOException { restHighLevelClient = openSearchSinkConfig.getConnectionConfiguration().createClient(awsCredentialsSupplier); openSearchClient = openSearchSinkConfig.getConnectionConfiguration().createOpenSearchClient(restHighLevelClient, awsCredentialsSupplier); configuredIndexAlias = openSearchSinkConfig.getIndexConfiguration().getIndexAlias(); - final TemplateStrategy templateStrategy = TemplateType.V1.createTemplateStrategy(openSearchClient); + final TemplateStrategy templateStrategy = openSearchSinkConfig.getIndexConfiguration().getTemplateType().createTemplateStrategy(openSearchClient); indexManager = indexManagerFactory.getIndexManager(indexType, openSearchClient, restHighLevelClient, openSearchSinkConfig, templateStrategy, configuredIndexAlias); final String dlqFile = openSearchSinkConfig.getRetryConfiguration().getDlqFile(); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableIndexTemplateStrategy.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableIndexTemplateStrategy.java new file mode 100644 index 0000000000..62cba5a035 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableIndexTemplateStrategy.java @@ -0,0 +1,156 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.index; + +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.json.stream.JsonParser; +import org.opensearch.client.json.JsonpDeserializer; +import org.opensearch.client.json.JsonpMapper; +import org.opensearch.client.json.ObjectBuilderDeserializer; +import org.opensearch.client.json.ObjectDeserializer; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch.indices.ExistsIndexTemplateRequest; +import org.opensearch.client.opensearch.indices.GetIndexTemplateRequest; +import org.opensearch.client.opensearch.indices.GetIndexTemplateResponse; +import org.opensearch.client.opensearch.indices.PutIndexTemplateRequest; +import org.opensearch.client.opensearch.indices.get_index_template.IndexTemplateItem; +import org.opensearch.client.opensearch.indices.put_index_template.IndexTemplateMapping; +import org.opensearch.client.transport.endpoints.BooleanResponse; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * A {@link TemplateStrategy} for the OpenSearch index template. + */ +class ComposableIndexTemplateStrategy implements TemplateStrategy { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private final OpenSearchClient openSearchClient; + + public ComposableIndexTemplateStrategy(final OpenSearchClient openSearchClient) { + this.openSearchClient = openSearchClient; + } + + @Override + public Optional getExistingTemplateVersion(final String templateName) throws IOException { + return getIndexTemplate(templateName) + .map(IndexTemplateItem::indexTemplate) + .map(indexTemplate -> indexTemplate.version()); + } + + @Override + public IndexTemplate createIndexTemplate(final Map templateMap) { + return new ComposableIndexTemplate(templateMap); + } + + @Override + public void createTemplate(final IndexTemplate indexTemplate) throws IOException { + if(!(indexTemplate instanceof ComposableIndexTemplate)) { + throw new IllegalArgumentException("Unexpected indexTemplate provided to createTemplate."); + } + + final ComposableIndexTemplate composableIndexTemplate = (ComposableIndexTemplate) indexTemplate; + + final Map templateMapping = composableIndexTemplate.indexTemplateMap; + + final String indexTemplateString = OBJECT_MAPPER.writeValueAsString(templateMapping); + + final ByteArrayInputStream byteIn = new ByteArrayInputStream( + indexTemplateString.getBytes(StandardCharsets.UTF_8)); + final JsonpMapper mapper = openSearchClient._transport().jsonpMapper(); + final JsonParser parser = mapper.jsonProvider().createParser(byteIn); + + final PutIndexTemplateRequest putIndexTemplateRequest = PutIndexTemplateRequestDeserializer.getJsonpDeserializer(composableIndexTemplate.name) + .deserialize(parser, mapper); + + openSearchClient.indices().putIndexTemplate(putIndexTemplateRequest); + + } + + private Optional getIndexTemplate(final String indexTemplateName) throws IOException { + final ExistsIndexTemplateRequest existsRequest = new ExistsIndexTemplateRequest.Builder() + .name(indexTemplateName) + .build(); + final BooleanResponse existsResponse = openSearchClient.indices().existsIndexTemplate(existsRequest); + + if (!existsResponse.value()) { + return Optional.empty(); + } + + final GetIndexTemplateRequest getRequest = new GetIndexTemplateRequest.Builder() + .name(indexTemplateName) + .build(); + final GetIndexTemplateResponse indexTemplateResponse = openSearchClient.indices().getIndexTemplate(getRequest); + + final List indexTemplateItems = indexTemplateResponse.indexTemplates(); + if (indexTemplateItems.size() == 1) { + return indexTemplateItems.stream().findFirst(); + } else { + throw new RuntimeException(String.format("Found zero or multiple index templates result when querying for %s", + indexTemplateName)); + } + } + + static class ComposableIndexTemplate implements IndexTemplate { + + private final Map indexTemplateMap; + private String name; + + private ComposableIndexTemplate(final Map indexTemplateMap) { + this.indexTemplateMap = new HashMap<>(indexTemplateMap); + } + + @Override + public void setTemplateName(final String name) { + this.name = name; + + } + + @Override + public void setIndexPatterns(final List indexPatterns) { + indexTemplateMap.put("index_patterns", indexPatterns); + } + + @Override + public void putCustomSetting(final String name, final Object value) { + + } + + @Override + public Optional getVersion() { + if(!indexTemplateMap.containsKey("version")) + return Optional.empty(); + final Number version = (Number) indexTemplateMap.get("version"); + return Optional.of(version.longValue()); + } + } + + private static class PutIndexTemplateRequestDeserializer { + private static void setupPutIndexTemplateRequestDeserializer(final ObjectDeserializer objectDeserializer) { + + objectDeserializer.add(PutIndexTemplateRequest.Builder::name, JsonpDeserializer.stringDeserializer(), "name"); + objectDeserializer.add(PutIndexTemplateRequest.Builder::indexPatterns, JsonpDeserializer.arrayDeserializer(JsonpDeserializer.stringDeserializer()), + "index_patterns"); + objectDeserializer.add(PutIndexTemplateRequest.Builder::version, JsonpDeserializer.longDeserializer(), "version"); + objectDeserializer.add(PutIndexTemplateRequest.Builder::priority, JsonpDeserializer.integerDeserializer(), "priority"); + objectDeserializer.add(PutIndexTemplateRequest.Builder::composedOf, JsonpDeserializer.arrayDeserializer(JsonpDeserializer.stringDeserializer()), + "composed_of"); + objectDeserializer.add(PutIndexTemplateRequest.Builder::template, IndexTemplateMapping._DESERIALIZER, "template"); + } + + static JsonpDeserializer getJsonpDeserializer(final String name) { + return ObjectBuilderDeserializer + .lazy( + () -> new PutIndexTemplateRequest.Builder().name(name), + PutIndexTemplateRequestDeserializer::setupPutIndexTemplateRequestDeserializer); + } + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java index 3c69680720..979ce4e4c8 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java @@ -31,6 +31,7 @@ public class IndexConfiguration { public static final String SETTINGS = "settings"; public static final String INDEX_ALIAS = "index"; public static final String INDEX_TYPE = "index_type"; + public static final String TEMPLATE_TYPE = "template_type"; public static final String TEMPLATE_FILE = "template_file"; public static final String NUM_SHARDS = "number_of_shards"; public static final String NUM_REPLICAS = "number_of_replicas"; @@ -47,6 +48,7 @@ public class IndexConfiguration { public static final String DOCUMENT_ROOT_KEY = "document_root_key"; private IndexType indexType; + private final TemplateType templateType; private final String indexAlias; private final Map indexTemplate; private final String documentIdField; @@ -72,7 +74,8 @@ private IndexConfiguration(final Builder builder) { this.s3AwsStsRoleArn = builder.s3AwsStsRoleArn; this.s3Client = builder.s3Client; - this.indexTemplate = readIndexTemplate(builder.templateFile, indexType); + this.templateType = builder.templateType != null ? builder.templateType : TemplateType.V1; + this.indexTemplate = readIndexTemplate(builder.templateFile, indexType, templateType); if (builder.numReplicas > 0) { indexTemplate.putIfAbsent(SETTINGS, new HashMap<>()); @@ -131,6 +134,10 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti if(indexType != null) { builder = builder.withIndexType(indexType); } + final String templateType = pluginSetting.getStringOrDefault(TEMPLATE_TYPE, TemplateType.V1.getTypeName()); + if(templateType != null) { + builder = builder.withTemplateType(templateType); + } final String templateFile = pluginSetting.getStringOrDefault(TEMPLATE_FILE, null); if (templateFile != null) { builder = builder.withTemplateFile(templateFile); @@ -179,6 +186,10 @@ public IndexType getIndexType() { return indexType; } + public TemplateType getTemplateType() { + return templateType; + } + public String getIndexAlias() { return indexAlias; } @@ -230,18 +241,17 @@ public String getDocumentRootKey() { * * @param templateFile * @param indexType + * @param templateType * @return */ - private Map readIndexTemplate(final String templateFile, final IndexType indexType) { + private Map readIndexTemplate(final String templateFile, final IndexType indexType, TemplateType templateType) { try { URL templateURL = null; InputStream s3TemplateFile = null; if (indexType.equals(IndexType.TRACE_ANALYTICS_RAW)) { - templateURL = getClass().getClassLoader() - .getResource(IndexConstants.RAW_DEFAULT_TEMPLATE_FILE); + templateURL = loadExistingTemplate(templateType, IndexConstants.RAW_DEFAULT_TEMPLATE_FILE); } else if (indexType.equals(IndexType.TRACE_ANALYTICS_SERVICE_MAP)) { - templateURL = getClass().getClassLoader() - .getResource(IndexConstants.SERVICE_MAP_DEFAULT_TEMPLATE_FILE); + templateURL = loadExistingTemplate(templateType, IndexConstants.SERVICE_MAP_DEFAULT_TEMPLATE_FILE); } else if (templateFile != null) { if (templateFile.toLowerCase().startsWith(S3_PREFIX)) { FileReader s3FileReader = new S3FileReader(s3Client); @@ -264,9 +274,16 @@ private Map readIndexTemplate(final String templateFile, final I } } + private URL loadExistingTemplate(TemplateType templateType, String predefinedTemplateName) { + String resourcePath = templateType == TemplateType.V1 ? predefinedTemplateName : templateType.getTypeName() + "/" + predefinedTemplateName; + return getClass().getClassLoader() + .getResource(resourcePath); + } + public static class Builder { private String indexAlias; private String indexType; + private TemplateType templateType; private String templateFile; private int numShards; private int numReplicas; @@ -295,6 +312,13 @@ public Builder withIndexType(final String indexType) { return this; } + public Builder withTemplateType(final String templateType) { + checkArgument(templateType != null, "templateType cannot be null."); + checkArgument(!templateType.isEmpty(), "templateType cannot be empty"); + this.templateType = TemplateType.fromTypeName(templateType); + return this; + } + public Builder withTemplateFile(final String templateFile) { checkArgument(templateFile != null, "templateFile cannot be null."); this.templateFile = templateFile; diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TemplateType.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TemplateType.java index b1cc5026b5..b00e1d5ad7 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TemplateType.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TemplateType.java @@ -7,7 +7,10 @@ import org.opensearch.client.opensearch.OpenSearchClient; +import java.util.Arrays; +import java.util.Map; import java.util.function.Function; +import java.util.stream.Collectors; /** * Represents a template type in OpenSearch. @@ -16,7 +19,18 @@ public enum TemplateType { /** * The v1 template type. */ - V1("v1", V1TemplateStrategy::new); + V1("v1", V1TemplateStrategy::new), + + /** + * Index template type. + */ + INDEX_TEMPLATE("index-template", ComposableIndexTemplateStrategy::new); + + private static final Map TYPE_NAME_MAP = Arrays.stream(TemplateType.values()) + .collect(Collectors.toMap( + value -> value.name, + value -> value + )); private final String name; private final Function factoryFunction; @@ -26,7 +40,15 @@ public enum TemplateType { this.factoryFunction = factoryFunction; } + public static TemplateType fromTypeName(final String name) { + return TYPE_NAME_MAP.get(name); + } + public TemplateStrategy createTemplateStrategy(final OpenSearchClient openSearchClient) { return factoryFunction.apply(openSearchClient); } + + String getTypeName() { + return name; + } } diff --git a/data-prepper-plugins/opensearch/src/main/resources/index-template/otel-v1-apm-service-map-index-template.json b/data-prepper-plugins/opensearch/src/main/resources/index-template/otel-v1-apm-service-map-index-template.json new file mode 100644 index 0000000000..1ac4d3a60e --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/resources/index-template/otel-v1-apm-service-map-index-template.json @@ -0,0 +1,64 @@ +{ + "version": 0, + "template": { + "mappings": { + "date_detection": false, + "dynamic_templates": [ + { + "strings_as_keyword": { + "mapping": { + "ignore_above": 1024, + "type": "keyword" + }, + "match_mapping_type": "string" + } + } + ], + "_source": { + "enabled": true + }, + "properties": { + "hashId": { + "ignore_above": 1024, + "type": "keyword" + }, + "serviceName": { + "ignore_above": 1024, + "type": "keyword" + }, + "kind": { + "ignore_above": 1024, + "type": "keyword" + }, + "destination": { + "properties": { + "domain": { + "ignore_above": 1024, + "type": "keyword" + }, + "resource": { + "ignore_above": 1024, + "type": "keyword" + } + } + }, + "target": { + "properties": { + "domain": { + "ignore_above": 1024, + "type": "keyword" + }, + "resource": { + "ignore_above": 1024, + "type": "keyword" + } + } + }, + "traceGroupName": { + "ignore_above": 1024, + "type": "keyword" + } + } + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/main/resources/index-template/otel-v1-apm-span-index-template.json b/data-prepper-plugins/opensearch/src/main/resources/index-template/otel-v1-apm-span-index-template.json new file mode 100644 index 0000000000..e0f81af119 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/resources/index-template/otel-v1-apm-span-index-template.json @@ -0,0 +1,101 @@ +{ + "version": 1, + "template": { + "mappings": { + "date_detection": false, + "dynamic_templates": [ + { + "resource_attributes_map": { + "mapping": { + "type": "keyword" + }, + "path_match": "resource.attributes.*" + } + }, + { + "span_attributes_map": { + "mapping": { + "type": "keyword" + }, + "path_match": "span.attributes.*" + } + } + ], + "_source": { + "enabled": true + }, + "properties": { + "traceId": { + "ignore_above": 256, + "type": "keyword" + }, + "spanId": { + "ignore_above": 256, + "type": "keyword" + }, + "parentSpanId": { + "ignore_above": 256, + "type": "keyword" + }, + "name": { + "ignore_above": 1024, + "type": "keyword" + }, + "traceGroup": { + "ignore_above": 1024, + "type": "keyword" + }, + "traceGroupFields": { + "properties": { + "endTime": { + "type": "date_nanos" + }, + "durationInNanos": { + "type": "long" + }, + "statusCode": { + "type": "integer" + } + } + }, + "kind": { + "ignore_above": 128, + "type": "keyword" + }, + "startTime": { + "type": "date_nanos" + }, + "endTime": { + "type": "date_nanos" + }, + "status": { + "properties": { + "code": { + "type": "integer" + }, + "message": { + "type": "keyword" + } + } + }, + "serviceName": { + "type": "keyword" + }, + "durationInNanos": { + "type": "long" + }, + "events": { + "type": "nested", + "properties": { + "time": { + "type": "date_nanos" + } + } + }, + "links": { + "type": "nested" + } + } + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableIndexTemplateStrategyTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableIndexTemplateStrategyTest.java new file mode 100644 index 0000000000..100ed8baa1 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableIndexTemplateStrategyTest.java @@ -0,0 +1,323 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.index; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch.indices.ExistsIndexTemplateRequest; +import org.opensearch.client.opensearch.indices.GetIndexTemplateRequest; +import org.opensearch.client.opensearch.indices.GetIndexTemplateResponse; +import org.opensearch.client.opensearch.indices.OpenSearchIndicesClient; +import org.opensearch.client.opensearch.indices.PutIndexTemplateRequest; +import org.opensearch.client.opensearch.indices.get_index_template.IndexTemplateItem; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.endpoints.BooleanResponse; +import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.PreSerializedJsonpMapper; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ComposableIndexTemplateStrategyTest { + @Mock + private OpenSearchClient openSearchClient; + + @Mock + private OpenSearchIndicesClient openSearchIndicesClient; + private Random random; + private String indexTemplateName; + + @BeforeEach + void setUp() { + random = new Random(); + lenient().when(openSearchClient.indices()).thenReturn(openSearchIndicesClient); + indexTemplateName = UUID.randomUUID().toString(); + } + + + private ComposableIndexTemplateStrategy createObjectUnderTest() { + return new ComposableIndexTemplateStrategy(openSearchClient); + } + + @Test + void getExistingTemplateVersion_should_calls_existIndexTemplate_with_indexTemplateName() throws IOException { + final BooleanResponse booleanResponse = mock(BooleanResponse.class); + when(booleanResponse.value()).thenReturn(false); + when(openSearchIndicesClient.existsIndexTemplate(any(ExistsIndexTemplateRequest.class))) + .thenReturn(booleanResponse); + + createObjectUnderTest().getExistingTemplateVersion(indexTemplateName); + + final ArgumentCaptor existsTemplateRequestArgumentCaptor = ArgumentCaptor.forClass(ExistsIndexTemplateRequest.class); + verify(openSearchIndicesClient).existsIndexTemplate(existsTemplateRequestArgumentCaptor.capture()); + + final ExistsIndexTemplateRequest actualRequest = existsTemplateRequestArgumentCaptor.getValue(); + assertThat(actualRequest.name(), equalTo(indexTemplateName)); + + verifyNoMoreInteractions(openSearchIndicesClient); + } + + @Test + void getExistingTemplateVersion_should_return_empty_if_no_template_exists() throws IOException { + final BooleanResponse booleanResponse = mock(BooleanResponse.class); + when(booleanResponse.value()).thenReturn(false); + when(openSearchIndicesClient.existsIndexTemplate(any(ExistsIndexTemplateRequest.class))) + .thenReturn(booleanResponse); + + final Optional optionalVersion = createObjectUnderTest().getExistingTemplateVersion(indexTemplateName); + + assertThat(optionalVersion, notNullValue()); + assertThat(optionalVersion.isPresent(), equalTo(false)); + } + + @Nested + class WithExistingIndexTemplate { + @BeforeEach + void setUp() throws IOException { + final BooleanResponse booleanResponse = mock(BooleanResponse.class); + when(booleanResponse.value()).thenReturn(true); + when(openSearchIndicesClient.existsIndexTemplate(any(ExistsIndexTemplateRequest.class))) + .thenReturn(booleanResponse); + } + + @Test + void getExistingTemplateVersion_should_return_empty_if_index_template_exists_without_version() throws IOException { + final GetIndexTemplateResponse getIndexTemplateResponse = mock(GetIndexTemplateResponse.class); + final IndexTemplateItem indexTemplateItem = mock(IndexTemplateItem.class); + org.opensearch.client.opensearch.indices.get_index_template.IndexTemplate indexTemplate = mock(org.opensearch.client.opensearch.indices.get_index_template.IndexTemplate.class); + when(indexTemplate.version()).thenReturn(null); + when(indexTemplateItem.indexTemplate()).thenReturn(indexTemplate); + when(getIndexTemplateResponse.indexTemplates()).thenReturn(Collections.singletonList(indexTemplateItem)); + when(openSearchIndicesClient.getIndexTemplate(any(GetIndexTemplateRequest.class))) + .thenReturn(getIndexTemplateResponse); + + final Optional optionalVersion = createObjectUnderTest().getExistingTemplateVersion(indexTemplateName); + + assertThat(optionalVersion, notNullValue()); + assertThat(optionalVersion.isPresent(), equalTo(false)); + } + + @Test + void getExistingTemplateVersion_should_return_template_version_if_template_exists() throws IOException { + final Long version = (long) (random.nextInt(10_000) + 100); + final GetIndexTemplateResponse getIndexTemplateResponse = mock(GetIndexTemplateResponse.class); + final IndexTemplateItem indexTemplateItem = mock(IndexTemplateItem.class); + org.opensearch.client.opensearch.indices.get_index_template.IndexTemplate indexTemplate = mock(org.opensearch.client.opensearch.indices.get_index_template.IndexTemplate.class); + when(indexTemplate.version()).thenReturn(version); + when(indexTemplateItem.indexTemplate()).thenReturn(indexTemplate); + when(getIndexTemplateResponse.indexTemplates()).thenReturn(Collections.singletonList(indexTemplateItem)); + when(openSearchIndicesClient.getIndexTemplate(any(GetIndexTemplateRequest.class))) + .thenReturn(getIndexTemplateResponse); + + final Optional optionalVersion = createObjectUnderTest().getExistingTemplateVersion(indexTemplateName); + + assertThat(optionalVersion, notNullValue()); + assertThat(optionalVersion.isPresent(), equalTo(true)); + assertThat(optionalVersion.get(), equalTo(version)); + } + + @ParameterizedTest + @ValueSource(ints = {0, 2}) + void getExistingTemplateVersion_should_throw_if_get_template_returns_unexpected_number_of_templates(final int numberOfTemplatesReturned) throws IOException { + final GetIndexTemplateResponse getIndexTemplateResponse = mock(GetIndexTemplateResponse.class); + final List templateResult = mock(List.class); + when(templateResult.size()).thenReturn(numberOfTemplatesReturned); + when(getIndexTemplateResponse.indexTemplates()).thenReturn(templateResult); + when(openSearchIndicesClient.getIndexTemplate(any(GetIndexTemplateRequest.class))) + .thenReturn(getIndexTemplateResponse); + + + final ComposableIndexTemplateStrategy objectUnderTest = createObjectUnderTest(); + assertThrows(RuntimeException.class, () -> objectUnderTest.getExistingTemplateVersion(indexTemplateName)); + + verify(openSearchIndicesClient).getIndexTemplate(any(GetIndexTemplateRequest.class)); + } + } + + @Test + void createTemplate_throws_if_template_is_not_ComposableIndexTemplate() { + final IndexTemplate indexTemplate = mock(IndexTemplate.class); + final ComposableIndexTemplateStrategy objectUnderTest = createObjectUnderTest(); + + assertThrows(IllegalArgumentException.class, () -> objectUnderTest.createTemplate(indexTemplate)); + } + + @Nested + class IndexTemplateWithCreateTemplateTests { + private ArgumentCaptor putIndexTemplateRequestArgumentCaptor; + private List indexPatterns; + + @BeforeEach + void setUp() { + final OpenSearchTransport openSearchTransport = mock(OpenSearchTransport.class); + when(openSearchClient._transport()).thenReturn(openSearchTransport); + when(openSearchTransport.jsonpMapper()).thenReturn(new PreSerializedJsonpMapper()); + + putIndexTemplateRequestArgumentCaptor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class); + + indexPatterns = Collections.singletonList(UUID.randomUUID().toString()); + } + + @Test + void createTemplate_with_setTemplateName_performs_putIndexTemplate_request() throws IOException { + final ComposableIndexTemplateStrategy objectUnderTest = createObjectUnderTest(); + + final IndexTemplate indexTemplate = objectUnderTest.createIndexTemplate(new HashMap<>()); + indexTemplate.setTemplateName(indexTemplateName); + objectUnderTest.createTemplate(indexTemplate); + + verify(openSearchIndicesClient).putIndexTemplate(putIndexTemplateRequestArgumentCaptor.capture()); + + final PutIndexTemplateRequest actualPutRequest = putIndexTemplateRequestArgumentCaptor.getValue(); + + assertThat(actualPutRequest.name(), equalTo(indexTemplateName)); + + assertThat(actualPutRequest.version(), nullValue()); + assertThat(actualPutRequest.indexPatterns(), notNullValue()); + assertThat(actualPutRequest.indexPatterns(), equalTo(Collections.emptyList())); + assertThat(actualPutRequest.template(), nullValue()); + assertThat(actualPutRequest.priority(), nullValue()); + assertThat(actualPutRequest.composedOf(), notNullValue()); + assertThat(actualPutRequest.composedOf(), equalTo(Collections.emptyList())); + } + + @Test + void createTemplate_with_setIndexPatterns_performs_putIndexTemplate_request() throws IOException { + final ComposableIndexTemplateStrategy objectUnderTest = createObjectUnderTest(); + + final List indexPatterns = Collections.singletonList(UUID.randomUUID().toString()); + + final IndexTemplate indexTemplate = objectUnderTest.createIndexTemplate(new HashMap<>()); + indexTemplate.setTemplateName(indexTemplateName); + indexTemplate.setIndexPatterns(indexPatterns); + objectUnderTest.createTemplate(indexTemplate); + + verify(openSearchIndicesClient).putIndexTemplate(putIndexTemplateRequestArgumentCaptor.capture()); + + final PutIndexTemplateRequest actualPutRequest = putIndexTemplateRequestArgumentCaptor.getValue(); + + assertThat(actualPutRequest.name(), equalTo(indexTemplateName)); + assertThat(actualPutRequest.indexPatterns(), equalTo(indexPatterns)); + + assertThat(actualPutRequest.version(), nullValue()); + assertThat(actualPutRequest.template(), nullValue()); + assertThat(actualPutRequest.priority(), nullValue()); + assertThat(actualPutRequest.composedOf(), notNullValue()); + assertThat(actualPutRequest.composedOf(), equalTo(Collections.emptyList())); + } + + @Test + void createTemplate_with_defined_template_values_performs_putIndexTemplate_request() throws IOException { + final ComposableIndexTemplateStrategy objectUnderTest = createObjectUnderTest(); + + final Long version = (long) (random.nextInt(10_000) + 100); + final int priority = random.nextInt(1000) + 100; + final String numberOfShards = Integer.toString(random.nextInt(1000) + 100); + final List composedOf = Collections.singletonList(UUID.randomUUID().toString()); + + final IndexTemplate indexTemplate = objectUnderTest.createIndexTemplate( + Map.of("version", version, + "priority", priority, + "composed_of", composedOf, + "template", Map.of( + "settings", Map.of( + "index", Map.of("number_of_shards", numberOfShards)), + "mappings", Map.of("date_detection", true) + ) + )); + indexTemplate.setTemplateName(indexTemplateName); + indexTemplate.setIndexPatterns(indexPatterns); + objectUnderTest.createTemplate(indexTemplate); + + verify(openSearchIndicesClient).putIndexTemplate(putIndexTemplateRequestArgumentCaptor.capture()); + + final PutIndexTemplateRequest actualPutRequest = putIndexTemplateRequestArgumentCaptor.getValue(); + + assertThat(actualPutRequest.name(), equalTo(indexTemplateName)); + assertThat(actualPutRequest.indexPatterns(), equalTo(indexPatterns)); + assertThat(actualPutRequest.version(), equalTo(version)); + assertThat(actualPutRequest.priority(), equalTo(priority)); + assertThat(actualPutRequest.composedOf(), equalTo(composedOf)); + assertThat(actualPutRequest.template(), notNullValue()); + assertThat(actualPutRequest.template().mappings(), notNullValue()); + assertThat(actualPutRequest.template().mappings().dateDetection(), equalTo(true)); + assertThat(actualPutRequest.template().settings(), notNullValue()); + assertThat(actualPutRequest.template().settings().index(), notNullValue()); + assertThat(actualPutRequest.template().settings().index().numberOfShards(), equalTo(numberOfShards)); + } + } + + @Nested + class IndexTemplateTests { + private Map providedTemplateMap; + + @BeforeEach + void setUp() { + providedTemplateMap = new HashMap<>(); + } + + @Test + void getVersion_returns_empty_if_no_version() { + final IndexTemplate indexTemplate = createObjectUnderTest().createIndexTemplate(providedTemplateMap); + + final Optional optionalVersion = indexTemplate.getVersion(); + assertThat(optionalVersion, notNullValue()); + assertThat(optionalVersion.isPresent(), equalTo(false)); + } + + @Test + void getVersion_returns_version_from_root_map() { + final Long version = (long) (random.nextInt(10_000) + 100); + providedTemplateMap.put("version", version); + + final IndexTemplate indexTemplate = createObjectUnderTest().createIndexTemplate(providedTemplateMap); + + final Optional optionalVersion = indexTemplate.getVersion(); + assertThat(optionalVersion, notNullValue()); + assertThat(optionalVersion.isPresent(), equalTo(true)); + assertThat(optionalVersion.get(), equalTo(version)); + } + + @Test + void getVersion_returns_version_from_root_map_when_provided_as_int() { + final Integer version = random.nextInt(10_000) + 100; + providedTemplateMap.put("version", version); + + final IndexTemplate indexTemplate = createObjectUnderTest().createIndexTemplate(providedTemplateMap); + + final Optional optionalVersion = indexTemplate.getVersion(); + assertThat(optionalVersion, notNullValue()); + assertThat(optionalVersion.isPresent(), equalTo(true)); + assertThat(optionalVersion.get(), equalTo((long) version)); + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java index 445042fd67..d6d6a6e53c 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java @@ -6,8 +6,10 @@ package org.opensearch.dataprepper.plugins.sink.opensearch.index; import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.junit.Test; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.http.AbortableInputStream; import software.amazon.awssdk.services.s3.S3Client; @@ -19,21 +21,29 @@ import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.UUID; import static org.apache.commons.io.FileUtils.ONE_MB; -import static org.mockito.ArgumentMatchers.any; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.anEmptyMap; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.AWS_OPTION; -import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.SERVERLESS; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.DOCUMENT_ROOT_KEY; +import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.SERVERLESS; +import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.TEMPLATE_TYPE; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConstants.RAW_DEFAULT_TEMPLATE_FILE; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConstants.SERVICE_MAP_DEFAULT_TEMPLATE_FILE; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConstants.TYPE_TO_DEFAULT_ALIAS; @@ -47,19 +57,75 @@ public class IndexConfigurationTests { public void testRawAPMSpan() { final IndexConfiguration indexConfiguration = new IndexConfiguration.Builder().withIndexType( IndexType.TRACE_ANALYTICS_RAW.getValue()).build(); - final URL expTemplateURL = indexConfiguration.getClass().getClassLoader().getResource(RAW_DEFAULT_TEMPLATE_FILE); - assertEquals(TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW), indexConfiguration.getIndexAlias()); - assertFalse(indexConfiguration.getIndexTemplate().isEmpty()); + assertThat(indexConfiguration.getIndexAlias(), equalTo(TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW))); + assertThat(indexConfiguration.getIndexTemplate(), not(anEmptyMap())); + assertThat(indexConfiguration.getIndexTemplate(), hasKey("mappings")); + assertThat(indexConfiguration.getIndexTemplate().get("mappings"), instanceOf(Map.class)); + final Object dynamicTemplatesObj = ((Map) indexConfiguration.getIndexTemplate().get("mappings")).get("dynamic_templates"); + assertThat(dynamicTemplatesObj, instanceOf(List.class)); + final List> dynamicTemplates = (List>) dynamicTemplatesObj; + + assertThat(dynamicTemplates.size(), equalTo(2)); + assertThat(dynamicTemplates.get(0), hasKey("resource_attributes_map")); + assertThat(dynamicTemplates.get(1), hasKey("span_attributes_map")); + } @Test public void testServiceMap() { final IndexConfiguration indexConfiguration = new IndexConfiguration.Builder().withIndexType( IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue()).build(); - final URL expTemplateURL = indexConfiguration - .getClass().getClassLoader().getResource(SERVICE_MAP_DEFAULT_TEMPLATE_FILE); - assertEquals(TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_SERVICE_MAP), indexConfiguration.getIndexAlias()); - assertFalse(indexConfiguration.getIndexTemplate().isEmpty()); + assertThat(indexConfiguration.getIndexAlias(), equalTo(TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_SERVICE_MAP))); + assertThat(indexConfiguration.getIndexTemplate(), not(anEmptyMap())); + assertThat(indexConfiguration.getIndexTemplate(), hasKey("mappings")); + assertThat(indexConfiguration.getIndexTemplate().get("mappings"), instanceOf(Map.class)); + final Object dynamicTemplatesObj = ((Map) indexConfiguration.getIndexTemplate().get("mappings")).get("dynamic_templates"); + assertThat(dynamicTemplatesObj, instanceOf(List.class)); + final List> dynamicTemplates = (List>) dynamicTemplatesObj; + + assertThat(dynamicTemplates.size(), equalTo(1)); + assertThat(dynamicTemplates.get(0), hasKey("strings_as_keyword")); + } + + @Test + public void testRawAPMSpanWithIndexTemplates() { + final IndexConfiguration indexConfiguration = new IndexConfiguration.Builder() + .withIndexType(IndexType.TRACE_ANALYTICS_RAW.getValue()) + .withTemplateType(TemplateType.INDEX_TEMPLATE.getTypeName()) + .build(); + assertThat(indexConfiguration.getIndexAlias(), equalTo(TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW))); + assertThat(indexConfiguration.getIndexTemplate(), not(anEmptyMap())); + assertThat(indexConfiguration.getIndexTemplate(), hasKey("template")); + assertThat(indexConfiguration.getIndexTemplate().get("template"), instanceOf(Map.class)); + final Object mappings = ((Map) indexConfiguration.getIndexTemplate().get("template")).get("mappings"); + assertThat(mappings, instanceOf(Map.class)); + final Object dynamicTemplatesObj = ((Map) mappings).get("dynamic_templates"); + assertThat(dynamicTemplatesObj, instanceOf(List.class)); + List> dynamicTemplates = (List>) dynamicTemplatesObj; + + assertThat(dynamicTemplates.size(), equalTo(2)); + assertThat(dynamicTemplates.get(0), hasKey("resource_attributes_map")); + assertThat(dynamicTemplates.get(1), hasKey("span_attributes_map")); + } + + @Test + public void testServiceMapWithIndexTemplates() { + final IndexConfiguration indexConfiguration = new IndexConfiguration.Builder() + .withIndexType(IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue()) + .withTemplateType(TemplateType.INDEX_TEMPLATE.getTypeName()) + .build(); + assertThat(indexConfiguration.getIndexAlias(), equalTo(TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_SERVICE_MAP))); + assertThat(indexConfiguration.getIndexTemplate(), not(anEmptyMap())); + assertThat(indexConfiguration.getIndexTemplate(), hasKey("template")); + assertThat(indexConfiguration.getIndexTemplate().get("template"), instanceOf(Map.class)); + final Object mappings = ((Map) indexConfiguration.getIndexTemplate().get("template")).get("mappings"); + assertThat(mappings, instanceOf(Map.class)); + final Object dynamicTemplatesObj = ((Map) mappings).get("dynamic_templates"); + assertThat(dynamicTemplatesObj, instanceOf(List.class)); + List> dynamicTemplates = (List>) dynamicTemplatesObj; + + assertThat(dynamicTemplates.size(), equalTo(1)); + assertThat(dynamicTemplates.get(0), hasKey("strings_as_keyword")); } @Test @@ -317,6 +383,26 @@ public void testReadIndexConfig_emptyDocumentRootKey() { assertThrows(IllegalArgumentException.class, () -> IndexConfiguration.readIndexConfig(pluginSetting)); } + @Test + void getTemplateType_defaults_to_V1() { + final Map metadata = initializeConfigMetaData( + IndexType.CUSTOM.getValue(), "foo", null, null, null); + final PluginSetting pluginSetting = getPluginSetting(metadata); + final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting); + assertThat(indexConfiguration.getTemplateType(), equalTo(TemplateType.V1)); + } + + @ParameterizedTest + @EnumSource(TemplateType.class) + void getTemplateType_with_configured_templateType(final TemplateType templateType) { + final Map metadata = initializeConfigMetaData( + IndexType.CUSTOM.getValue(), "foo", null, null, null); + metadata.put(TEMPLATE_TYPE, templateType.getTypeName()); + final PluginSetting pluginSetting = getPluginSetting(metadata); + final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting); + assertThat(indexConfiguration.getTemplateType(), equalTo(templateType)); + } + private PluginSetting generatePluginSetting( final String indexType, final String indexAlias, final String templateFilePath, diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TemplateTypeTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TemplateTypeTest.java index e290efcfaa..bc9422ec7d 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TemplateTypeTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TemplateTypeTest.java @@ -6,17 +6,24 @@ package org.opensearch.dataprepper.plugins.sink.opensearch.index; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.ArgumentsProvider; import org.junit.jupiter.params.provider.ArgumentsSource; +import org.junit.jupiter.params.provider.EnumSource; import org.opensearch.client.opensearch.OpenSearchClient; +import java.util.UUID; import java.util.stream.Stream; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.Mockito.mock; class TemplateTypeTest { @@ -34,11 +41,29 @@ void createTemplateStrategy_returns_instance_of_expected_type(final TemplateType instanceOf(expectedStrategyClass)); } + @ParameterizedTest + @EnumSource(TemplateType.class) + void createTemplateStrategy_returns_for_all_enum_types(final TemplateType objectUnderTest) { + assertThat(objectUnderTest.createTemplateStrategy(openSearchClient), notNullValue()); + } + + @ParameterizedTest + @EnumSource(TemplateType.class) + void fromTypeName_returns_for_all_enum_types(final TemplateType templateType) { + assertThat(TemplateType.fromTypeName(templateType.getTypeName()), equalTo(templateType)); + } + + @Test + void fromTypeName_returns_null_for_unknown_type() { + assertThat(TemplateType.fromTypeName(UUID.randomUUID().toString()), nullValue()); + } + private static class EnumToStrategyClass implements ArgumentsProvider { @Override public Stream provideArguments(final ExtensionContext context) { return Stream.of( - Arguments.arguments(TemplateType.V1, V1TemplateStrategy.class) + arguments(TemplateType.V1, V1TemplateStrategy.class), + arguments(TemplateType.INDEX_TEMPLATE, ComposableIndexTemplateStrategy.class) ); } } diff --git a/data-prepper-plugins/opensearch/src/test/resources/test-composable-index-template-v2.json b/data-prepper-plugins/opensearch/src/test/resources/test-composable-index-template-v2.json new file mode 100644 index 0000000000..bdc91e23c5 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/resources/test-composable-index-template-v2.json @@ -0,0 +1,28 @@ +{ + "version": 2, + "template": { + "mappings": { + "date_detection": false, + "dynamic_templates": [ + { + "strings_as_keyword": { + "mapping": { + "ignore_above": 1024, + "type": "keyword" + }, + "match_mapping_type": "string" + } + } + ], + "_source": { + "enabled": false + }, + "properties": { + "name": { + "ignore_above": 1024, + "type": "keyword" + } + } + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/resources/test-composable-index-template.json b/data-prepper-plugins/opensearch/src/test/resources/test-composable-index-template.json new file mode 100644 index 0000000000..a73966dd91 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/resources/test-composable-index-template.json @@ -0,0 +1,28 @@ +{ + "version": 1, + "template": { + "mappings": { + "date_detection": false, + "dynamic_templates": [ + { + "strings_as_keyword": { + "mapping": { + "ignore_above": 1024, + "type": "keyword" + }, + "match_mapping_type": "string" + } + } + ], + "_source": { + "enabled": false + }, + "properties": { + "name": { + "ignore_above": 1024, + "type": "keyword" + } + } + } + } +} \ No newline at end of file