Extract validation logic from FlintIndexMetadataServiceImpl (#2944)
Signed-off-by: Tomoyuki Morita <[email protected]>
ykmr1224 authored Aug 30, 2024
1 parent e1ee3b1 commit d260e0e
Showing 3 changed files with 179 additions and 68 deletions.
FlintIndexMetadataValidator.java (new file)
@@ -0,0 +1,88 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint;

import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.AUTO_REFRESH;
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.CHECKPOINT_LOCATION;
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.INCREMENTAL_REFRESH;
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.WATERMARK_DELAY;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class FlintIndexMetadataValidator {
  private static final Logger LOGGER = LogManager.getLogger(FlintIndexMetadataValidator.class);

  public static final Set<String> ALTER_TO_FULL_REFRESH_ALLOWED_OPTIONS =
      new LinkedHashSet<>(Arrays.asList(AUTO_REFRESH, INCREMENTAL_REFRESH));
  public static final Set<String> ALTER_TO_INCREMENTAL_REFRESH_ALLOWED_OPTIONS =
      new LinkedHashSet<>(
          Arrays.asList(AUTO_REFRESH, INCREMENTAL_REFRESH, WATERMARK_DELAY, CHECKPOINT_LOCATION));

  /**
   * Validate whether the Flint index options contain valid key/value pairs. Throws
   * IllegalArgumentException with a description of the invalid options.
   */
  public static void validateFlintIndexOptions(
      String kind, Map<String, Object> existingOptions, Map<String, String> newOptions) {
    if ((newOptions.containsKey(INCREMENTAL_REFRESH)
            && Boolean.parseBoolean(newOptions.get(INCREMENTAL_REFRESH)))
        || ((!newOptions.containsKey(INCREMENTAL_REFRESH)
            && Boolean.parseBoolean((String) existingOptions.get(INCREMENTAL_REFRESH))))) {
      validateConversionToIncrementalRefresh(kind, existingOptions, newOptions);
    } else {
      validateConversionToFullRefresh(newOptions);
    }
  }

  private static void validateConversionToFullRefresh(Map<String, String> newOptions) {
    if (!ALTER_TO_FULL_REFRESH_ALLOWED_OPTIONS.containsAll(newOptions.keySet())) {
      throw new IllegalArgumentException(
          String.format(
              "Altering to full refresh only allows: %s options",
              ALTER_TO_FULL_REFRESH_ALLOWED_OPTIONS));
    }
  }

  private static void validateConversionToIncrementalRefresh(
      String kind, Map<String, Object> existingOptions, Map<String, String> newOptions) {
    if (!ALTER_TO_INCREMENTAL_REFRESH_ALLOWED_OPTIONS.containsAll(newOptions.keySet())) {
      throw new IllegalArgumentException(
          String.format(
              "Altering to incremental refresh only allows: %s options",
              ALTER_TO_INCREMENTAL_REFRESH_ALLOWED_OPTIONS));
    }
    HashMap<String, Object> mergedOptions = new HashMap<>();
    mergedOptions.putAll(existingOptions);
    mergedOptions.putAll(newOptions);
    List<String> missingAttributes = new ArrayList<>();
    if (!mergedOptions.containsKey(CHECKPOINT_LOCATION)
        || StringUtils.isEmpty((String) mergedOptions.get(CHECKPOINT_LOCATION))) {
      missingAttributes.add(CHECKPOINT_LOCATION);
    }
    if (kind.equals("mv")
        && (!mergedOptions.containsKey(WATERMARK_DELAY)
            || StringUtils.isEmpty((String) mergedOptions.get(WATERMARK_DELAY)))) {
      missingAttributes.add(WATERMARK_DELAY);
    }
    if (missingAttributes.size() > 0) {
      String errorMessage =
          "Conversion to incremental refresh index cannot proceed due to missing attributes: "
              + String.join(", ", missingAttributes)
              + ".";
      LOGGER.error(errorMessage);
      throw new IllegalArgumentException(errorMessage);
    }
  }
}
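To show what the extraction buys, below is a minimal usage sketch of the new validator called from outside FlintIndexMetadataServiceImpl. The wrapper class ValidatorUsageExample, its package, and the option values are assumptions invented for this illustration; only FlintIndexMetadataValidator.validateFlintIndexOptions and the FlintIndexOptions constants come from the files in this commit.

// Hypothetical caller: class name, package, and option values are illustrative only.
package org.opensearch.sql.spark.flint.examples;

import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.CHECKPOINT_LOCATION;
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.INCREMENTAL_REFRESH;
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.WATERMARK_DELAY;

import java.util.Map;
import org.opensearch.sql.spark.flint.FlintIndexMetadataValidator;

public class ValidatorUsageExample {
  public static void main(String[] args) {
    // Options already stored in the index mapping (example values).
    Map<String, Object> existingOptions = Map.<String, Object>of(INCREMENTAL_REFRESH, "false");
    // Options supplied by an ALTER statement that switches to incremental refresh.
    Map<String, String> newOptions =
        Map.of(
            INCREMENTAL_REFRESH, "true",
            CHECKPOINT_LOCATION, "s3://example-bucket/checkpoints/",
            WATERMARK_DELAY, "1 minute");
    try {
      // Static call; no FlintIndexMetadataServiceImpl (and thus no OpenSearch client) is needed.
      FlintIndexMetadataValidator.validateFlintIndexOptions("mv", existingOptions, newOptions);
      System.out.println("Options are valid for conversion to incremental refresh.");
    } catch (IllegalArgumentException e) {
      // The validator reports disallowed options or missing checkpoint/watermark attributes.
      System.err.println("Rejected: " + e.getMessage());
    }
  }
}

Because the method is static and free of client dependencies, it can be exercised directly, which is what the new unit test below does.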
FlintIndexMetadataValidatorTest.java (new file)
@@ -0,0 +1,90 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.AUTO_REFRESH;
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.CHECKPOINT_LOCATION;
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.INCREMENTAL_REFRESH;
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.WATERMARK_DELAY;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.junit.jupiter.api.Test;

class FlintIndexMetadataValidatorTest {
  @Test
  public void conversionToIncrementalRefreshWithValidOption() {
    Map<String, Object> existingOptions =
        ImmutableMap.<String, Object>builder().put(INCREMENTAL_REFRESH, "false").build();
    Map<String, String> newOptions =
        ImmutableMap.<String, String>builder()
            .put(INCREMENTAL_REFRESH, "true")
            .put(CHECKPOINT_LOCATION, "checkpoint_location")
            .put(WATERMARK_DELAY, "1")
            .build();

    FlintIndexMetadataValidator.validateFlintIndexOptions("mv", existingOptions, newOptions);
  }

  @Test
  public void conversionToIncrementalRefreshWithMissingOptions() {
    Map<String, Object> existingOptions =
        ImmutableMap.<String, Object>builder().put(AUTO_REFRESH, "true").build();
    Map<String, String> newOptions =
        ImmutableMap.<String, String>builder().put(INCREMENTAL_REFRESH, "true").build();

    assertThrows(
        IllegalArgumentException.class,
        () ->
            FlintIndexMetadataValidator.validateFlintIndexOptions(
                "mv", existingOptions, newOptions));
  }

  @Test
  public void conversionToIncrementalRefreshWithInvalidOption() {
    Map<String, Object> existingOptions =
        ImmutableMap.<String, Object>builder().put(INCREMENTAL_REFRESH, "false").build();
    Map<String, String> newOptions =
        ImmutableMap.<String, String>builder()
            .put(INCREMENTAL_REFRESH, "true")
            .put("INVALID_OPTION", "1")
            .build();

    assertThrows(
        IllegalArgumentException.class,
        () ->
            FlintIndexMetadataValidator.validateFlintIndexOptions(
                "mv", existingOptions, newOptions));
  }

  @Test
  public void conversionToFullRefreshWithValidOption() {
    Map<String, Object> existingOptions =
        ImmutableMap.<String, Object>builder().put(AUTO_REFRESH, "false").build();
    Map<String, String> newOptions =
        ImmutableMap.<String, String>builder().put(AUTO_REFRESH, "true").build();

    FlintIndexMetadataValidator.validateFlintIndexOptions("mv", existingOptions, newOptions);
  }

  @Test
  public void conversionToFullRefreshWithInvalidOption() {
    Map<String, Object> existingOptions =
        ImmutableMap.<String, Object>builder().put(AUTO_REFRESH, "false").build();
    Map<String, String> newOptions =
        ImmutableMap.<String, String>builder()
            .put(AUTO_REFRESH, "true")
            .put(WATERMARK_DELAY, "1")
            .build();

    assertThrows(
        IllegalArgumentException.class,
        () ->
            FlintIndexMetadataValidator.validateFlintIndexOptions(
                "mv", existingOptions, newOptions));
  }
}
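A possible follow-up test, not part of this commit, could also assert the wording of the missing-attribute error. The class and method names below are hypothetical; the assertions rely only on the message format visible in FlintIndexMetadataValidator above, which joins the missing attribute names into the exception message.

// Hypothetical additional test class; requires the same classpath as the test above.
package org.opensearch.sql.spark.flint;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.AUTO_REFRESH;
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.CHECKPOINT_LOCATION;
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.INCREMENTAL_REFRESH;
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.WATERMARK_DELAY;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.junit.jupiter.api.Test;

class FlintIndexMetadataValidatorMessageTest {
  @Test
  public void missingAttributesAreNamedInTheErrorMessage() {
    Map<String, Object> existingOptions =
        ImmutableMap.<String, Object>builder().put(AUTO_REFRESH, "true").build();
    Map<String, String> newOptions =
        ImmutableMap.<String, String>builder().put(INCREMENTAL_REFRESH, "true").build();

    IllegalArgumentException e =
        assertThrows(
            IllegalArgumentException.class,
            () ->
                FlintIndexMetadataValidator.validateFlintIndexOptions(
                    "mv", existingOptions, newOptions));

    // For an "mv" index both attributes are required, so both should appear in the message.
    assertTrue(e.getMessage().contains(CHECKPOINT_LOCATION));
    assertTrue(e.getMessage().contains(WATERMARK_DELAY));
  }
}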
FlintIndexMetadataServiceImpl.java
@@ -5,10 +5,6 @@

package org.opensearch.sql.spark.flint;

import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.AUTO_REFRESH;
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.CHECKPOINT_LOCATION;
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.INCREMENTAL_REFRESH;
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.WATERMARK_DELAY;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.APP_ID;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.ENV_KEY;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.KIND_KEY;
@@ -20,15 +16,9 @@
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.SERVERLESS_EMR_JOB_ID;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.SOURCE_KEY;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.AllArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse;
@@ -43,11 +33,6 @@ public class FlintIndexMetadataServiceImpl implements FlintIndexMetadataService
private static final Logger LOGGER = LogManager.getLogger(FlintIndexMetadataServiceImpl.class);

private final Client client;
public static final Set<String> ALTER_TO_FULL_REFRESH_ALLOWED_OPTIONS =
new LinkedHashSet<>(Arrays.asList(AUTO_REFRESH, INCREMENTAL_REFRESH));
public static final Set<String> ALTER_TO_INCREMENTAL_REFRESH_ALLOWED_OPTIONS =
new LinkedHashSet<>(
Arrays.asList(AUTO_REFRESH, INCREMENTAL_REFRESH, WATERMARK_DELAY, CHECKPOINT_LOCATION));

@Override
public Map<String, FlintIndexMetadata> getFlintIndexMetadata(
@@ -87,63 +72,11 @@ public void updateIndexToManualRefresh(
String kind = (String) meta.get("kind");
Map<String, Object> options = (Map<String, Object>) meta.get("options");
Map<String, String> newOptions = flintIndexOptions.getProvidedOptions();
validateFlintIndexOptions(kind, options, newOptions);
FlintIndexMetadataValidator.validateFlintIndexOptions(kind, options, newOptions);
options.putAll(newOptions);
client.admin().indices().preparePutMapping(indexName).setSource(flintMetadataMap).get();
}

private void validateFlintIndexOptions(
String kind, Map<String, Object> existingOptions, Map<String, String> newOptions) {
if ((newOptions.containsKey(INCREMENTAL_REFRESH)
&& Boolean.parseBoolean(newOptions.get(INCREMENTAL_REFRESH)))
|| ((!newOptions.containsKey(INCREMENTAL_REFRESH)
&& Boolean.parseBoolean((String) existingOptions.get(INCREMENTAL_REFRESH))))) {
validateConversionToIncrementalRefresh(kind, existingOptions, newOptions);
} else {
validateConversionToFullRefresh(newOptions);
}
}

private void validateConversionToFullRefresh(Map<String, String> newOptions) {
if (!ALTER_TO_FULL_REFRESH_ALLOWED_OPTIONS.containsAll(newOptions.keySet())) {
throw new IllegalArgumentException(
String.format(
"Altering to full refresh only allows: %s options",
ALTER_TO_FULL_REFRESH_ALLOWED_OPTIONS));
}
}

private void validateConversionToIncrementalRefresh(
String kind, Map<String, Object> existingOptions, Map<String, String> newOptions) {
if (!ALTER_TO_INCREMENTAL_REFRESH_ALLOWED_OPTIONS.containsAll(newOptions.keySet())) {
throw new IllegalArgumentException(
String.format(
"Altering to incremental refresh only allows: %s options",
ALTER_TO_INCREMENTAL_REFRESH_ALLOWED_OPTIONS));
}
HashMap<String, Object> mergedOptions = new HashMap<>();
mergedOptions.putAll(existingOptions);
mergedOptions.putAll(newOptions);
List<String> missingAttributes = new ArrayList<>();
if (!mergedOptions.containsKey(CHECKPOINT_LOCATION)
|| StringUtils.isEmpty((String) mergedOptions.get(CHECKPOINT_LOCATION))) {
missingAttributes.add(CHECKPOINT_LOCATION);
}
if (kind.equals("mv")
&& (!mergedOptions.containsKey(WATERMARK_DELAY)
|| StringUtils.isEmpty((String) mergedOptions.get(WATERMARK_DELAY)))) {
missingAttributes.add(WATERMARK_DELAY);
}
if (missingAttributes.size() > 0) {
String errorMessage =
"Conversion to incremental refresh index cannot proceed due to missing attributes: "
+ String.join(", ", missingAttributes)
+ ".";
LOGGER.error(errorMessage);
throw new IllegalArgumentException(errorMessage);
}
}

private FlintIndexMetadata fromMetadata(String indexName, Map<String, Object> metaMap) {
FlintIndexMetadata.FlintIndexMetadataBuilder flintIndexMetadataBuilder =
FlintIndexMetadata.builder();
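For context, here is a simplified, illustrative sketch of the delegation pattern that the call-site change above introduces. The class AlterOptionsApplier is invented for this example; the real updateIndexToManualRefresh reads the existing options from the index mapping via the OpenSearch client and persists the merged result with preparePutMapping, as the diff shows.

// Illustrative only: validation is delegated to the extracted, client-free utility class,
// and the new options are merged into the existing ones only after they pass validation.
package org.opensearch.sql.spark.flint.examples;

import java.util.Map;
import org.opensearch.sql.spark.flint.FlintIndexMetadataValidator;

public class AlterOptionsApplier {
  public void apply(
      String kind, Map<String, Object> existingOptions, Map<String, String> newOptions) {
    FlintIndexMetadataValidator.validateFlintIndexOptions(kind, existingOptions, newOptions);
    existingOptions.putAll(newOptions);
  }
}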
