diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataValidator.java b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataValidator.java new file mode 100644 index 0000000000..68ba34c476 --- /dev/null +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataValidator.java @@ -0,0 +1,88 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.AUTO_REFRESH; +import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.CHECKPOINT_LOCATION; +import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.INCREMENTAL_REFRESH; +import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.WATERMARK_DELAY; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class FlintIndexMetadataValidator { + private static final Logger LOGGER = LogManager.getLogger(FlintIndexMetadataValidator.class); + + public static final Set ALTER_TO_FULL_REFRESH_ALLOWED_OPTIONS = + new LinkedHashSet<>(Arrays.asList(AUTO_REFRESH, INCREMENTAL_REFRESH)); + public static final Set ALTER_TO_INCREMENTAL_REFRESH_ALLOWED_OPTIONS = + new LinkedHashSet<>( + Arrays.asList(AUTO_REFRESH, INCREMENTAL_REFRESH, WATERMARK_DELAY, CHECKPOINT_LOCATION)); + + /** + * Validate if the flint index options contain valid key/value pairs. Throws + * IllegalArgumentException with description about invalid options. + */ + public static void validateFlintIndexOptions( + String kind, Map existingOptions, Map newOptions) { + if ((newOptions.containsKey(INCREMENTAL_REFRESH) + && Boolean.parseBoolean(newOptions.get(INCREMENTAL_REFRESH))) + || ((!newOptions.containsKey(INCREMENTAL_REFRESH) + && Boolean.parseBoolean((String) existingOptions.get(INCREMENTAL_REFRESH))))) { + validateConversionToIncrementalRefresh(kind, existingOptions, newOptions); + } else { + validateConversionToFullRefresh(newOptions); + } + } + + private static void validateConversionToFullRefresh(Map newOptions) { + if (!ALTER_TO_FULL_REFRESH_ALLOWED_OPTIONS.containsAll(newOptions.keySet())) { + throw new IllegalArgumentException( + String.format( + "Altering to full refresh only allows: %s options", + ALTER_TO_FULL_REFRESH_ALLOWED_OPTIONS)); + } + } + + private static void validateConversionToIncrementalRefresh( + String kind, Map existingOptions, Map newOptions) { + if (!ALTER_TO_INCREMENTAL_REFRESH_ALLOWED_OPTIONS.containsAll(newOptions.keySet())) { + throw new IllegalArgumentException( + String.format( + "Altering to incremental refresh only allows: %s options", + ALTER_TO_INCREMENTAL_REFRESH_ALLOWED_OPTIONS)); + } + HashMap mergedOptions = new HashMap<>(); + mergedOptions.putAll(existingOptions); + mergedOptions.putAll(newOptions); + List missingAttributes = new ArrayList<>(); + if (!mergedOptions.containsKey(CHECKPOINT_LOCATION) + || StringUtils.isEmpty((String) mergedOptions.get(CHECKPOINT_LOCATION))) { + missingAttributes.add(CHECKPOINT_LOCATION); + } + if (kind.equals("mv") + && (!mergedOptions.containsKey(WATERMARK_DELAY) + || StringUtils.isEmpty((String) mergedOptions.get(WATERMARK_DELAY)))) { + missingAttributes.add(WATERMARK_DELAY); + } + if (missingAttributes.size() > 0) { + String errorMessage = + "Conversion to incremental refresh index cannot proceed due to missing attributes: " + + String.join(", ", missingAttributes) + + "."; + LOGGER.error(errorMessage); + throw new IllegalArgumentException(errorMessage); + } + } +} diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataValidatorTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataValidatorTest.java new file mode 100644 index 0000000000..7a1e718c05 --- /dev/null +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataValidatorTest.java @@ -0,0 +1,90 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.AUTO_REFRESH; +import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.CHECKPOINT_LOCATION; +import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.INCREMENTAL_REFRESH; +import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.WATERMARK_DELAY; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.junit.jupiter.api.Test; + +class FlintIndexMetadataValidatorTest { + @Test + public void conversionToIncrementalRefreshWithValidOption() { + Map existingOptions = + ImmutableMap.builder().put(INCREMENTAL_REFRESH, "false").build(); + Map newOptions = + ImmutableMap.builder() + .put(INCREMENTAL_REFRESH, "true") + .put(CHECKPOINT_LOCATION, "checkpoint_location") + .put(WATERMARK_DELAY, "1") + .build(); + + FlintIndexMetadataValidator.validateFlintIndexOptions("mv", existingOptions, newOptions); + } + + @Test + public void conversionToIncrementalRefreshWithMissingOptions() { + Map existingOptions = + ImmutableMap.builder().put(AUTO_REFRESH, "true").build(); + Map newOptions = + ImmutableMap.builder().put(INCREMENTAL_REFRESH, "true").build(); + + assertThrows( + IllegalArgumentException.class, + () -> + FlintIndexMetadataValidator.validateFlintIndexOptions( + "mv", existingOptions, newOptions)); + } + + @Test + public void conversionToIncrementalRefreshWithInvalidOption() { + Map existingOptions = + ImmutableMap.builder().put(INCREMENTAL_REFRESH, "false").build(); + Map newOptions = + ImmutableMap.builder() + .put(INCREMENTAL_REFRESH, "true") + .put("INVALID_OPTION", "1") + .build(); + + assertThrows( + IllegalArgumentException.class, + () -> + FlintIndexMetadataValidator.validateFlintIndexOptions( + "mv", existingOptions, newOptions)); + } + + @Test + public void conversionToFullRefreshWithValidOption() { + Map existingOptions = + ImmutableMap.builder().put(AUTO_REFRESH, "false").build(); + Map newOptions = + ImmutableMap.builder().put(AUTO_REFRESH, "true").build(); + + FlintIndexMetadataValidator.validateFlintIndexOptions("mv", existingOptions, newOptions); + } + + @Test + public void conversionToFullRefreshWithInvalidOption() { + Map existingOptions = + ImmutableMap.builder().put(AUTO_REFRESH, "false").build(); + Map newOptions = + ImmutableMap.builder() + .put(AUTO_REFRESH, "true") + .put(WATERMARK_DELAY, "1") + .build(); + + assertThrows( + IllegalArgumentException.class, + () -> + FlintIndexMetadataValidator.validateFlintIndexOptions( + "mv", existingOptions, newOptions)); + } +} diff --git a/async-query/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImpl.java b/async-query/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImpl.java index b8352d15b2..38789dd796 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImpl.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImpl.java @@ -5,10 +5,6 @@ package org.opensearch.sql.spark.flint; -import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.AUTO_REFRESH; -import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.CHECKPOINT_LOCATION; -import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.INCREMENTAL_REFRESH; -import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.WATERMARK_DELAY; import static org.opensearch.sql.spark.flint.FlintIndexMetadata.APP_ID; import static org.opensearch.sql.spark.flint.FlintIndexMetadata.ENV_KEY; import static org.opensearch.sql.spark.flint.FlintIndexMetadata.KIND_KEY; @@ -20,15 +16,9 @@ import static org.opensearch.sql.spark.flint.FlintIndexMetadata.SERVERLESS_EMR_JOB_ID; import static org.opensearch.sql.spark.flint.FlintIndexMetadata.SOURCE_KEY; -import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.List; import java.util.Map; -import java.util.Set; import lombok.AllArgsConstructor; -import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse; @@ -43,11 +33,6 @@ public class FlintIndexMetadataServiceImpl implements FlintIndexMetadataService private static final Logger LOGGER = LogManager.getLogger(FlintIndexMetadataServiceImpl.class); private final Client client; - public static final Set ALTER_TO_FULL_REFRESH_ALLOWED_OPTIONS = - new LinkedHashSet<>(Arrays.asList(AUTO_REFRESH, INCREMENTAL_REFRESH)); - public static final Set ALTER_TO_INCREMENTAL_REFRESH_ALLOWED_OPTIONS = - new LinkedHashSet<>( - Arrays.asList(AUTO_REFRESH, INCREMENTAL_REFRESH, WATERMARK_DELAY, CHECKPOINT_LOCATION)); @Override public Map getFlintIndexMetadata( @@ -87,63 +72,11 @@ public void updateIndexToManualRefresh( String kind = (String) meta.get("kind"); Map options = (Map) meta.get("options"); Map newOptions = flintIndexOptions.getProvidedOptions(); - validateFlintIndexOptions(kind, options, newOptions); + FlintIndexMetadataValidator.validateFlintIndexOptions(kind, options, newOptions); options.putAll(newOptions); client.admin().indices().preparePutMapping(indexName).setSource(flintMetadataMap).get(); } - private void validateFlintIndexOptions( - String kind, Map existingOptions, Map newOptions) { - if ((newOptions.containsKey(INCREMENTAL_REFRESH) - && Boolean.parseBoolean(newOptions.get(INCREMENTAL_REFRESH))) - || ((!newOptions.containsKey(INCREMENTAL_REFRESH) - && Boolean.parseBoolean((String) existingOptions.get(INCREMENTAL_REFRESH))))) { - validateConversionToIncrementalRefresh(kind, existingOptions, newOptions); - } else { - validateConversionToFullRefresh(newOptions); - } - } - - private void validateConversionToFullRefresh(Map newOptions) { - if (!ALTER_TO_FULL_REFRESH_ALLOWED_OPTIONS.containsAll(newOptions.keySet())) { - throw new IllegalArgumentException( - String.format( - "Altering to full refresh only allows: %s options", - ALTER_TO_FULL_REFRESH_ALLOWED_OPTIONS)); - } - } - - private void validateConversionToIncrementalRefresh( - String kind, Map existingOptions, Map newOptions) { - if (!ALTER_TO_INCREMENTAL_REFRESH_ALLOWED_OPTIONS.containsAll(newOptions.keySet())) { - throw new IllegalArgumentException( - String.format( - "Altering to incremental refresh only allows: %s options", - ALTER_TO_INCREMENTAL_REFRESH_ALLOWED_OPTIONS)); - } - HashMap mergedOptions = new HashMap<>(); - mergedOptions.putAll(existingOptions); - mergedOptions.putAll(newOptions); - List missingAttributes = new ArrayList<>(); - if (!mergedOptions.containsKey(CHECKPOINT_LOCATION) - || StringUtils.isEmpty((String) mergedOptions.get(CHECKPOINT_LOCATION))) { - missingAttributes.add(CHECKPOINT_LOCATION); - } - if (kind.equals("mv") - && (!mergedOptions.containsKey(WATERMARK_DELAY) - || StringUtils.isEmpty((String) mergedOptions.get(WATERMARK_DELAY)))) { - missingAttributes.add(WATERMARK_DELAY); - } - if (missingAttributes.size() > 0) { - String errorMessage = - "Conversion to incremental refresh index cannot proceed due to missing attributes: " - + String.join(", ", missingAttributes) - + "."; - LOGGER.error(errorMessage); - throw new IllegalArgumentException(errorMessage); - } - } - private FlintIndexMetadata fromMetadata(String indexName, Map metaMap) { FlintIndexMetadata.FlintIndexMetadataBuilder flintIndexMetadataBuilder = FlintIndexMetadata.builder();