From d26748680468aa50bf1bf93c83cc005ae423a2e0 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Sat, 11 Nov 2023 15:54:29 -0600 Subject: [PATCH] Add normalize_index flag to normalize invalid dynamic indices (#3634) Signed-off-by: Taylor Gray --- .../converter/MetadataKeyAttributes.java | 2 +- .../dynamodb/converter/RecordConverter.java | 4 +- .../converter/ExportRecordConverterTest.java | 4 +- .../converter/StreamRecordConverterTest.java | 10 ++-- .../opensearch/index/DynamicIndexManager.java | 36 +++++++++++-- .../opensearch/index/IndexConfiguration.java | 13 +++++ .../index/DynamicIndexManagerTests.java | 53 +++++++++++++++++++ 7 files changed, 109 insertions(+), 13 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java index 51c85bbefb..3fdcbe287d 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java @@ -14,7 +14,7 @@ public class MetadataKeyAttributes { static final String EVENT_TIMESTAMP_METADATA_ATTRIBUTE = "dynamodb_timestamp"; - static final String EVENT_DYNAMODB_ITEM_VERSION = "dynamodb_item_version"; + static final String EVENT_VERSION_FROM_TIMESTAMP = "document_version"; static final String EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE = "opensearch_action"; diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java index 7123b6825e..5c8093597f 100644 --- 
a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java @@ -18,7 +18,7 @@ import java.util.Map; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE; -import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_DYNAMODB_ITEM_VERSION; +import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE; @@ -97,7 +97,7 @@ public void addToBuffer(final AcknowledgementSet acknowledgementSet, eventMetadata.setAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreationTimeMillis); eventMetadata.setAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE, eventName); eventMetadata.setAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE, mapStreamEventNameToBulkAction(eventName)); - eventMetadata.setAttribute(EVENT_DYNAMODB_ITEM_VERSION, eventVersionNumber); + eventMetadata.setAttribute(EVENT_VERSION_FROM_TIMESTAMP, eventVersionNumber); String partitionKey = getAttributeValue(keys, tableInfo.getMetadata().getPartitionKeyAttributeName()); eventMetadata.setAttribute(PARTITION_KEY_METADATA_ATTRIBUTE, partitionKey); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java 
b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java index 093536bbbd..e4dc77c988 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java @@ -41,7 +41,7 @@ import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSED_COUNT; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSING_ERROR_COUNT; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE; -import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_DYNAMODB_ITEM_VERSION; +import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE; @@ -146,7 +146,7 @@ void test_writeSingleRecordToBuffer() throws Exception { assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), notNullValue()); assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), nullValue()); assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), notNullValue()); - 
assertThat(event.getMetadata().getAttribute(EVENT_DYNAMODB_ITEM_VERSION), equalTo(0L)); + assertThat(event.getMetadata().getAttribute(EVENT_VERSION_FROM_TIMESTAMP), equalTo(0L)); assertThat(event.getEventHandle(), notNullValue()); assertThat(event.getEventHandle().getExternalOriginationTime(), nullValue()); } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java index d8977b7875..d9f1ed676f 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java @@ -42,7 +42,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE; -import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_DYNAMODB_ITEM_VERSION; +import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE; @@ -180,7 +180,7 @@ void writingToBuffer_with_nth_event_in_that_second_returns_expected_that_timesta 
assertThat(firstEventForSecond.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.INDEX.toString())); assertThat(firstEventForSecond.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo("INSERT")); assertThat(firstEventForSecond.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(timestamp.toEpochMilli())); - assertThat(firstEventForSecond.getMetadata().getAttribute(EVENT_DYNAMODB_ITEM_VERSION), equalTo(timestamp.toEpochMilli() * 1000)); + assertThat(firstEventForSecond.getMetadata().getAttribute(EVENT_VERSION_FROM_TIMESTAMP), equalTo(timestamp.toEpochMilli() * 1000)); assertThat(firstEventForSecond.getEventHandle(), notNullValue()); assertThat(firstEventForSecond.getEventHandle().getExternalOriginationTime(), equalTo(timestamp)); @@ -195,7 +195,7 @@ void writingToBuffer_with_nth_event_in_that_second_returns_expected_that_timesta assertThat(secondEventForSameSecond.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.INDEX.toString())); assertThat(secondEventForSameSecond.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo("INSERT")); assertThat(secondEventForSameSecond.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(timestamp.toEpochMilli())); - assertThat(secondEventForSameSecond.getMetadata().getAttribute(EVENT_DYNAMODB_ITEM_VERSION), equalTo(timestamp.toEpochMilli() * 1000 + 1)); + assertThat(secondEventForSameSecond.getMetadata().getAttribute(EVENT_VERSION_FROM_TIMESTAMP), equalTo(timestamp.toEpochMilli() * 1000 + 1)); assertThat(secondEventForSameSecond.getEventHandle(), notNullValue()); assertThat(secondEventForSameSecond.getEventHandle().getExternalOriginationTime(), equalTo(timestamp)); @@ -210,7 +210,7 @@ void writingToBuffer_with_nth_event_in_that_second_returns_expected_that_timesta 
assertThat(thirdEventWithOlderSecond.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.INDEX.toString())); assertThat(thirdEventWithOlderSecond.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo("INSERT")); assertThat(thirdEventWithOlderSecond.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(olderSecond.toEpochMilli())); - assertThat(thirdEventWithOlderSecond.getMetadata().getAttribute(EVENT_DYNAMODB_ITEM_VERSION), equalTo(olderSecond.toEpochMilli() * 1000)); + assertThat(thirdEventWithOlderSecond.getMetadata().getAttribute(EVENT_VERSION_FROM_TIMESTAMP), equalTo(olderSecond.toEpochMilli() * 1000)); assertThat(thirdEventWithOlderSecond.getEventHandle(), notNullValue()); assertThat(thirdEventWithOlderSecond.getEventHandle().getExternalOriginationTime(), equalTo(olderSecond)); @@ -225,7 +225,7 @@ void writingToBuffer_with_nth_event_in_that_second_returns_expected_that_timesta assertThat(fourthEventWithNewerSecond.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.INDEX.toString())); assertThat(fourthEventWithNewerSecond.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo("INSERT")); assertThat(fourthEventWithNewerSecond.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(newerSecond.toEpochMilli())); - assertThat(fourthEventWithNewerSecond.getMetadata().getAttribute(EVENT_DYNAMODB_ITEM_VERSION), equalTo(newerSecond.toEpochMilli() * 1000)); + assertThat(fourthEventWithNewerSecond.getMetadata().getAttribute(EVENT_VERSION_FROM_TIMESTAMP), equalTo(newerSecond.toEpochMilli() * 1000)); assertThat(fourthEventWithNewerSecond.getEventHandle(), notNullValue()); assertThat(fourthEventWithNewerSecond.getEventHandle().getExternalOriginationTime(), equalTo(newerSecond)); diff --git 
a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java index 0b8151c6da..8c600c30af 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java @@ -11,12 +11,12 @@ import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.IOException; +import java.util.Locale; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import static com.google.common.base.Preconditions.checkNotNull; @@ -27,6 +27,8 @@ public class DynamicIndexManager extends AbstractIndexManager { private Cache indexManagerCache; final int CACHE_EXPIRE_AFTER_ACCESS_TIME_MINUTES = 30; final int APPROXIMATE_INDEX_MANAGER_SIZE = 32; + private static final String INVALID_INDEX_CHARACTERS = "[ ,:\\\"*+/\\\\|?#><]"; // regex character class of characters removed during normalization; space and comma are included because OpenSearch rejects them in index names as well + private static final Pattern INVALID_REGEX_CHARACTERS_PATTERN = Pattern.compile(INVALID_INDEX_CHARACTERS); private final long cacheSizeInKB = 1024; protected RestHighLevelClient restHighLevelClient; protected OpenSearchClient openSearchClient; @@ -73,6 +75,11 @@ public String getIndexName(final String dynamicIndexAlias) throws IOException { throw new IOException("index alias is null"); } String fullIndexAlias = AbstractIndexManager.getIndexAliasWithDate(dynamicIndexAlias); + + if (openSearchSinkConfiguration.getIndexConfiguration().isNormalizeIndex()) { + fullIndexAlias = normalizeIndex(fullIndexAlias); + } + IndexManager indexManager = 
indexManagerCache.getIfPresent(fullIndexAlias); if (indexManager == null) { indexManager = indexManagerFactory.getIndexManager( @@ -82,7 +89,6 @@ public String getIndexName(final String dynamicIndexAlias) throws IOException { } return indexManager.getIndexName(fullIndexAlias); } - private void setupIndexWithRetries(final IndexManager indexManager) throws IOException { boolean isIndexSetup = false; @@ -100,5 +106,29 @@ private void setupIndexWithRetries(final IndexManager indexManager) throws IOExc } } } + // Normalizes an index name toward the OpenSearch naming restrictions (https://opensearch.org/docs/1.0/opensearch/rest-api/create-index/#index-naming-restrictions): lowercases it, removes every character matched by INVALID_REGEX_CHARACTERS_PATTERN, then drops leading '_' and '-' characters; throws a RuntimeException if only a single '_' or '-' is left or if the result is blank. + private String normalizeIndex(final String indexName) { + String normalizedIndexName = indexName.toLowerCase(Locale.ROOT); + + normalizedIndexName = INVALID_REGEX_CHARACTERS_PATTERN.matcher(normalizedIndexName).replaceAll(""); + + while (normalizedIndexName.startsWith("_") || normalizedIndexName.startsWith("-")) { + if (normalizedIndexName.length() == 1) { + throw new RuntimeException(String.format( + "Unable to normalize index '%s'. This index name is invalid.", indexName) + ); + } + + normalizedIndexName = normalizedIndexName.substring(1); + } + + if (normalizedIndexName.isBlank()) { + throw new RuntimeException(String.format( + "Unable to normalize index '%s'. 
The result after normalization was an empty String.", indexName) + ); + } + + return normalizedIndexName; + } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java index 363e074c65..edceb5b7e5 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java @@ -72,6 +72,7 @@ public class IndexConfiguration { public static final String DOCUMENT_ROOT_KEY = "document_root_key"; public static final String DOCUMENT_VERSION_EXPRESSION = "document_version"; public static final String DOCUMENT_VERSION_TYPE = "document_version_type"; + public static final String NORMALIZE_INDEX = "normalize_index"; private IndexType indexType; private TemplateType templateType; @@ -96,6 +97,7 @@ public class IndexConfiguration { private final String documentRootKey; private final String versionExpression; private final VersionType versionType; + private final boolean normalizeIndex; private static final String S3_PREFIX = "s3://"; private static final String DEFAULT_AWS_REGION = "us-east-1"; @@ -112,6 +114,7 @@ private IndexConfiguration(final Builder builder) { this.s3Client = builder.s3Client; this.versionExpression = builder.versionExpression; this.versionType = builder.versionType; + this.normalizeIndex = builder.normalizeIndex; determineTemplateType(builder); @@ -230,6 +233,8 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti final String versionExpression = pluginSetting.getStringOrDefault(DOCUMENT_VERSION_EXPRESSION, null); final String versionType = pluginSetting.getStringOrDefault(DOCUMENT_VERSION_TYPE, null); + final boolean 
normalizeIndex = pluginSetting.getBooleanOrDefault(NORMALIZE_INDEX, false); + builder = builder.withNormalizeIndex(normalizeIndex); builder = builder.withVersionExpression(versionExpression); if (versionExpression != null && (!expressionEvaluator.isValidFormatExpression(versionExpression))) { @@ -376,6 +381,8 @@ public String getDocumentRootKey() { public String getVersionExpression() { return versionExpression; } + public boolean isNormalizeIndex() { return normalizeIndex; } + /** * This method is used in the creation of IndexConfiguration object. It takes in the template file path * or index type and returns the index template read from the file or specific to index type or returns an @@ -458,6 +465,7 @@ public static class Builder { private String documentRootKey; private VersionType versionType; private String versionExpression; + private boolean normalizeIndex; public Builder withIndexAlias(final String indexAlias) { checkArgument(indexAlias != null, "indexAlias cannot be null."); @@ -626,6 +634,11 @@ public Builder withVersionType(final String versionType) { return this; } + public Builder withNormalizeIndex(final boolean normalizeIndex) { + this.normalizeIndex = normalizeIndex; + return this; + } + private VersionType getVersionType(final String versionType) { switch (versionType.toLowerCase()) { case "internal": diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManagerTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManagerTests.java index d752370d4e..ffa274a4a2 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManagerTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManagerTests.java @@ -8,8 +8,12 @@ import 
org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.opensearch.client.opensearch._types.OpenSearchException; +import org.mockito.MockedStatic; import org.opensearch.client.IndicesClient; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.opensearch.OpenSearchClient; @@ -24,6 +28,8 @@ import java.util.Optional; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -31,6 +37,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -230,4 +237,50 @@ public void getIndexName_DoesRetryOnOpenSearchExceptions_UntilFailure() throws I verify(innerIndexManager, times(3)).setupIndex(); } + + @ParameterizedTest + @CsvSource(value = {"INVALID_INDEX#, invalid_index", "-AAA:\\\"*+/\\\\|?#><, aaa", "_TeST_InDeX<, test_index", "-- abstractIndexManagerMockedStatic = mockStatic(AbstractIndexManager.class)) { + abstractIndexManagerMockedStatic.when(() -> AbstractIndexManager.getIndexAliasWithDate(dynamicIndexName)) + .thenReturn(indexWithDateTimePatternResolved); + final String result = dynamicIndexManager.getIndexName(dynamicIndexName); + assertThat(result, equalTo(normalizedDynamicIndexName)); + } + } + + @ParameterizedTest + @ValueSource(strings = {"*-<-", " 
dynamicIndexManager.getIndexName(dynamicIndexName)); + + assertThat(exception, notNullValue()); + assertThat(exception.getMessage(), startsWith("Unable to normalize index")); + } }