diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index d50c0002e..6873dff2d 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -26,6 +26,13 @@ public interface FlintClient { */ void createIndex(String indexName, FlintMetadata metadata); + /** + * Create an alias name for the given index + * @param indexName + * @param aliasName + */ + void alias(String indexName, String aliasName, FlintMetadata metadata); + /** * Does Flint index with the given name exist * diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index ea0fb0f98..ae34b82df 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -18,6 +18,8 @@ import org.opensearch.flint.core.metadata.FlintJsonHelper._ case class FlintMetadata( /** Flint spec version */ version: FlintVersion, + /** Flint index target name */ + targetName: Option[String], /** Flint index name */ name: String, /** Flint index kind */ @@ -46,7 +48,7 @@ case class FlintMetadata( * @return * JSON content */ - def getContent: String = { + def getContent(includeProperties: Boolean = true): String = { try { buildJson(builder => { // Add _meta field @@ -57,13 +59,19 @@ case class FlintMetadata( .field("kind", kind) .field("source", source) .field("indexedColumns", indexedColumns) - + // Only add targetName if it's not empty + targetName.foreach(tn => builder.field("targetName", tn)) optionalObjectField(builder, "options", options) - optionalObjectField(builder, "properties", properties) + + if (includeProperties) { + optionalObjectField(builder, "properties", properties) + } } // Add properties (schema) field - builder.field("properties", schema) + if (includeProperties) { + builder.field("properties", schema) + } }) } catch { case e: Exception => @@ -109,6 +117,7 @@ object FlintMetadata { innerFieldName match { case "version" => builder.version(FlintVersion.apply(parser.text())) case "name" => builder.name(parser.text()) + case "targetName" => builder.targetName(Option.apply(parser.text())) case "kind" => builder.kind(parser.text()) case "source" => builder.source(parser.text()) case "indexedColumns" => @@ -141,6 +150,7 @@ object FlintMetadata { */ class Builder { private var version: FlintVersion = FlintVersion.current() + private var targetName: Option[String] = None private var name: String = "" private var kind: String = "" private var source: String = "" @@ -160,6 +170,11 @@ object FlintMetadata { this } + def targetName(name: Option[String]): this.type = { + this.targetName = name + this + } + def kind(kind: String): this.type = { this.kind = kind this @@ -219,6 +234,7 @@ object FlintMetadata { def build(): FlintMetadata = { FlintMetadata( if (version == null) current() else version, + targetName, name, kind, source, diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index ff2761856..8830a0178 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -5,33 +5,43 @@ package org.opensearch.flint.core.storage; +import static java.lang.String.format; import static org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS; import com.amazonaws.auth.AWS4Signer; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; + import java.io.IOException; import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; + import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; +import org.opensearch.action.admin.indices.alias.Alias; +import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.CreateIndexRequest; +import org.opensearch.client.indices.CreateIndexResponse; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.client.indices.GetIndexResponse; +import org.opensearch.cluster.metadata.AliasMetadata; import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.common.Strings; import org.opensearch.common.settings.Settings; @@ -40,6 +50,7 @@ import org.opensearch.common.xcontent.XContentType; import org.opensearch.flint.core.FlintClient; import org.opensearch.flint.core.FlintOptions; +import org.opensearch.flint.core.FlintVersion; import org.opensearch.flint.core.auth.AWSRequestSigningApacheInterceptor; import org.opensearch.flint.core.metadata.FlintMetadata; import org.opensearch.index.query.AbstractQueryBuilder; @@ -54,161 +65,215 @@ */ public class FlintOpenSearchClient implements FlintClient { - /** - * {@link NamedXContentRegistry} from {@link SearchModule} used for construct {@link QueryBuilder} from DSL query string. - */ - private final static NamedXContentRegistry - xContentRegistry = - new NamedXContentRegistry(new SearchModule(Settings.builder().build(), - new ArrayList<>()).getNamedXContents()); - - private final FlintOptions options; - - public FlintOpenSearchClient(FlintOptions options) { - this.options = options; - } - - @Override public void createIndex(String indexName, FlintMetadata metadata) { - String osIndexName = toLowercase(indexName); - try (RestHighLevelClient client = createClient()) { - CreateIndexRequest request = new CreateIndexRequest(osIndexName); - request.mapping(metadata.getContent(), XContentType.JSON); - - Option settings = metadata.indexSettings(); - if (settings.isDefined()) { - request.settings(settings.get(), XContentType.JSON); - } - client.indices().create(request, RequestOptions.DEFAULT); - } catch (Exception e) { - throw new IllegalStateException("Failed to create Flint index " + osIndexName, e); + /** + * {@link NamedXContentRegistry} from {@link SearchModule} used for construct {@link QueryBuilder} from DSL query string. + */ + private final static NamedXContentRegistry + xContentRegistry = + new NamedXContentRegistry(new SearchModule(Settings.builder().build(), + new ArrayList<>()).getNamedXContents()); + + private final FlintOptions options; + + public FlintOpenSearchClient(FlintOptions options) { + this.options = options; } - } - - @Override public boolean exists(String indexName) { - String osIndexName = toLowercase(indexName); - try (RestHighLevelClient client = createClient()) { - return client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT); - } catch (IOException e) { - throw new IllegalStateException("Failed to check if Flint index exists " + osIndexName, e); + + @Override + public void alias(String indexName, String aliasName, FlintMetadata metadata) { + String osIndexName = toLowercase(indexName); + String osAliasName = toLowercase(aliasName); + try (RestHighLevelClient client = createClient()) { + boolean exists = client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT); + if (!exists) { + // create index including the alias name with is the flint convention name + createIndex(osIndexName, metadata); + } + // Adding the alias to the existing index + IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest(); + IndicesAliasesRequest.AliasActions aliasAction = + new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD) + .index(osIndexName) + .alias(osAliasName); + aliasesRequest.addAliasAction(aliasAction); + // Executing the updateAliases request + AcknowledgedResponse response = client.indices().updateAliases(aliasesRequest, RequestOptions.DEFAULT); + if (!response.isAcknowledged()) { + throw new IllegalStateException(String.format("Failed to acknowledge Alias %s for index %s", aliasName, indexName)); + } + } catch ( + Exception e) { + throw new IllegalStateException(format("Failed to create Alias %s for index %s ", aliasName, indexName), e); + } } - } - - @Override public List getAllIndexMetadata(String indexNamePattern) { - String osIndexNamePattern = toLowercase(indexNamePattern); - try (RestHighLevelClient client = createClient()) { - GetIndexRequest request = new GetIndexRequest(osIndexNamePattern); - GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); - - return Arrays.stream(response.getIndices()) - .map(index -> FlintMetadata.apply( - response.getMappings().get(index).source().toString(), - response.getSettings().get(index).toString())) - .collect(Collectors.toList()); - } catch (Exception e) { - throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexNamePattern, e); + + @Override + public void createIndex(String indexName, FlintMetadata metadata) { + String osIndexName = toLowercase(indexName); + try (RestHighLevelClient client = createClient()) { + CreateIndexRequest request = new CreateIndexRequest(osIndexName); + boolean includeMappingProperties = (metadata.targetName()!=null && !metadata.targetName().nonEmpty()); + request.mapping(metadata.getContent(includeMappingProperties), XContentType.JSON); + Option settings = metadata.indexSettings(); + if (settings.isDefined()) { + request.settings(settings.get(), XContentType.JSON); + } + CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT); + if (!response.isAcknowledged()) { + throw new IllegalStateException(String.format("Failed to acknowledge create index %s", indexName)); + } + } catch (Exception e) { + throw new IllegalStateException("Failed to create Flint index " + osIndexName, e); + } } - } - - @Override public FlintMetadata getIndexMetadata(String indexName) { - String osIndexName = toLowercase(indexName); - try (RestHighLevelClient client = createClient()) { - GetIndexRequest request = new GetIndexRequest(osIndexName); - GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); - - MappingMetadata mapping = response.getMappings().get(osIndexName); - Settings settings = response.getSettings().get(osIndexName); - return FlintMetadata.apply(mapping.source().string(), settings.toString()); - } catch (Exception e) { - throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e); + + @Override + public boolean exists(String indexName) { + String osIndexName = toLowercase(indexName); + try (RestHighLevelClient client = createClient()) { + return client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT); + } catch (IOException e) { + throw new IllegalStateException("Failed to check if Flint index exists " + osIndexName, e); + } } - } - @Override public void deleteIndex(String indexName) { - String osIndexName = toLowercase(indexName); - try (RestHighLevelClient client = createClient()) { - DeleteIndexRequest request = new DeleteIndexRequest(osIndexName); + @Override + public List getAllIndexMetadata(String indexNamePattern) { + String osIndexNamePattern = toLowercase(indexNamePattern); + try (RestHighLevelClient client = createClient()) { + GetIndexRequest request = new GetIndexRequest(osIndexNamePattern); + GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); - client.indices().delete(request, RequestOptions.DEFAULT); - } catch (Exception e) { - throw new IllegalStateException("Failed to delete Flint index " + osIndexName, e); + return Arrays.stream(response.getIndices()) + .map(index -> FlintMetadata.apply( + response.getMappings().get(index).source().toString(), + response.getSettings().get(index).toString())) + .collect(Collectors.toList()); + } catch (Exception e) { + throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexNamePattern, e); + } } - } - - /** - * Create {@link FlintReader}. - * - * @param indexName index name. - * @param query DSL query. DSL query is null means match_all. - * @return {@link FlintReader}. - */ - @Override public FlintReader createReader(String indexName, String query) { - try { - QueryBuilder queryBuilder = new MatchAllQueryBuilder(); - if (!Strings.isNullOrEmpty(query)) { - XContentParser - parser = - XContentType.JSON.xContent().createParser(xContentRegistry, IGNORE_DEPRECATIONS, query); - queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser); - } - return new OpenSearchScrollReader(createClient(), - toLowercase(indexName), - new SearchSourceBuilder().query(queryBuilder), - options); - } catch (IOException e) { - throw new RuntimeException(e); + + @Override + public FlintMetadata getIndexMetadata(String indexName) { + String osIndexName = toLowercase(indexName); + try (RestHighLevelClient client = createClient()) { + GetIndexRequest request = new GetIndexRequest(osIndexName); + GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); + if (response.getMappings().containsKey(osIndexName)) { + MappingMetadata mapping = response.getMappings().get(osIndexName); + Settings settings = response.getSettings().get(osIndexName); + return FlintMetadata.apply(mapping.source().string(), settings.toString()); + } + if (!response.getAliases().isEmpty()) { + Optional aliasAncestor = response.getAliases().entrySet().stream() + .filter(entry -> entry.getValue().stream().anyMatch(alias -> alias.alias().equals(indexName))) + .map(Map.Entry::getKey) + .findFirst(); + + if(aliasAncestor.isPresent()) { + MappingMetadata mapping = response.getMappings().get(aliasAncestor.get()); + Settings settings = response.getSettings().get(aliasAncestor.get()); + return FlintMetadata.apply(mapping.source().string(), settings.toString()); + } + } + throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName); + } catch (Exception e) { + throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e); + } } - } - - public FlintWriter createWriter(String indexName) { - return new OpenSearchWriter(createClient(), toLowercase(indexName), options.getRefreshPolicy()); - } - - @Override public RestHighLevelClient createClient() { - RestClientBuilder - restClientBuilder = - RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme())); - - // SigV4 support - if (options.getAuth().equals(FlintOptions.SIGV4_AUTH)) { - AWS4Signer signer = new AWS4Signer(); - signer.setServiceName("es"); - signer.setRegionName(options.getRegion()); - - // Use DefaultAWSCredentialsProviderChain by default. - final AtomicReference awsCredentialsProvider = - new AtomicReference<>(new DefaultAWSCredentialsProviderChain()); - String providerClass = options.getCustomAwsCredentialsProvider(); - if (!Strings.isNullOrEmpty(providerClass)) { - try { - Class awsCredentialsProviderClass = Class.forName(providerClass); - Constructor ctor = awsCredentialsProviderClass.getDeclaredConstructor(); - ctor.setAccessible(true); - awsCredentialsProvider.set((AWSCredentialsProvider) ctor.newInstance()); + + @Override + public void deleteIndex(String indexName) { + String osIndexName = toLowercase(indexName); + try (RestHighLevelClient client = createClient()) { + DeleteIndexRequest request = new DeleteIndexRequest(osIndexName); + AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT); + if (!response.isAcknowledged()) { + throw new IllegalStateException(String.format("Failed to acknowledge delete index %s", indexName)); + } } catch (Exception e) { - throw new RuntimeException(e); + throw new IllegalStateException("Failed to delete Flint index " + osIndexName, e); + } + } + + /** + * Create {@link FlintReader}. + * + * @param indexName index name. + * @param query DSL query. DSL query is null means match_all. + * @return {@link FlintReader}. + */ + @Override + public FlintReader createReader(String indexName, String query) { + try { + QueryBuilder queryBuilder = new MatchAllQueryBuilder(); + if (!Strings.isNullOrEmpty(query)) { + XContentParser + parser = + XContentType.JSON.xContent().createParser(xContentRegistry, IGNORE_DEPRECATIONS, query); + queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser); + } + return new OpenSearchScrollReader(createClient(), + toLowercase(indexName), + new SearchSourceBuilder().query(queryBuilder), + options); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public FlintWriter createWriter(String indexName) { + return new OpenSearchWriter(createClient(), toLowercase(indexName), options.getRefreshPolicy()); + } + + @Override + public RestHighLevelClient createClient() { + RestClientBuilder + restClientBuilder = + RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme())); + + // SigV4 support + if (options.getAuth().equals(FlintOptions.SIGV4_AUTH)) { + AWS4Signer signer = new AWS4Signer(); + signer.setServiceName("es"); + signer.setRegionName(options.getRegion()); + + // Use DefaultAWSCredentialsProviderChain by default. + final AtomicReference awsCredentialsProvider = + new AtomicReference<>(new DefaultAWSCredentialsProviderChain()); + String providerClass = options.getCustomAwsCredentialsProvider(); + if (!Strings.isNullOrEmpty(providerClass)) { + try { + Class awsCredentialsProviderClass = Class.forName(providerClass); + Constructor ctor = awsCredentialsProviderClass.getDeclaredConstructor(); + ctor.setAccessible(true); + awsCredentialsProvider.set((AWSCredentialsProvider) ctor.newInstance()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + restClientBuilder.setHttpClientConfigCallback(cb -> + cb.addInterceptorLast(new AWSRequestSigningApacheInterceptor(signer.getServiceName(), + signer, awsCredentialsProvider.get()))); + } else if (options.getAuth().equals(FlintOptions.BASIC_AUTH)) { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials(options.getUsername(), options.getPassword())); + restClientBuilder.setHttpClientConfigCallback( + httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); } - } - restClientBuilder.setHttpClientConfigCallback(cb -> - cb.addInterceptorLast(new AWSRequestSigningApacheInterceptor(signer.getServiceName(), - signer, awsCredentialsProvider.get()))); - } else if (options.getAuth().equals(FlintOptions.BASIC_AUTH)) { - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials(options.getUsername(), options.getPassword())); - restClientBuilder.setHttpClientConfigCallback( - httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + return new RestHighLevelClient(restClientBuilder); + } + + /* + * Because OpenSearch requires all lowercase letters in index name, we have to + * lowercase all letters in the given Flint index name. + */ + private String toLowercase(String indexName) { + Objects.requireNonNull(indexName); + + return indexName.toLowerCase(Locale.ROOT); } - return new RestHighLevelClient(restClientBuilder); - } - - /* - * Because OpenSearch requires all lowercase letters in index name, we have to - * lowercase all letters in the given Flint index name. - */ - private String toLowercase(String indexName) { - Objects.requireNonNull(indexName); - - return indexName.toLowerCase(Locale.ROOT); - } } diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala index dc2f5fe6a..991eb51cb 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala @@ -62,6 +62,6 @@ class FlintMetadataSuite extends AnyFlatSpec with Matchers { builder.schema("""{"properties": {"test_field": {"type": "os_type"}}}""") val metadata = builder.build() - metadata.getContent should matchJson(testMetadataJson) + metadata.getContent() should matchJson(testMetadataJson) } } diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index e8e0264f2..c1bc84765 100644 --- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -55,7 +55,7 @@ coveringIndexStatement createCoveringIndexStatement : CREATE INDEX (IF NOT EXISTS)? indexName - ON tableName + ON tableName (USING indexName)? LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN (WITH LEFT_PAREN propertyList RIGHT_PAREN)? ; diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 index 4ac1ced5c..96a9d2596 100644 --- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 +++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 @@ -139,6 +139,7 @@ nonReserved // Flint lexical tokens +USING: 'USING'; MIN_MAX: 'MIN_MAX'; SKIPPING: 'SKIPPING'; VALUE_SET: 'VALUE_SET'; diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala index c078f7fb6..318cb9dab 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala @@ -46,7 +46,7 @@ case class FlintTable(conf: util.Map[String, String], userSpecifiedSchema: Optio FlintClientBuilder .build(flintSparkConf.flintOptions()) .getIndexMetadata(name) - .getContent) + .getContent()) } } schema diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 9c78a07f8..fad928bf7 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -82,13 +82,19 @@ class FlintSpark(val spark: SparkSession) { */ def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = { val indexName = index.name() - if (flintClient.exists(indexName)) { - if (!ignoreIfExists) { - throw new IllegalStateException(s"Flint index $indexName already exists") - } + val targetName = index.targetName() + if (targetName.nonEmpty) { + // use targetIndex as the index to store the acceleration data + flintClient.alias(targetName.get, indexName, index.metadata()) } else { - val metadata = index.metadata() - flintClient.createIndex(indexName, metadata) + if (flintClient.exists(indexName)) { + if (!ignoreIfExists) { + throw new IllegalStateException(s"Flint index $indexName already exists") + } + } else { + val metadata = index.metadata() + flintClient.createIndex(indexName, metadata) + } } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 0586bfc49..32de8f42a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -34,6 +34,12 @@ trait FlintSparkIndex { */ def name(): String + /** + * @return + * Flint target index name + */ + def targetName(): Option[String] + /** * @return * Flint index metadata diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala index cda11405c..1f169d652 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala @@ -60,6 +60,7 @@ object FlintSparkIndexFactory { FlintSparkSkippingIndex(metadata.source, strategies, indexOptions) case COVERING_INDEX_TYPE => FlintSparkCoveringIndex( + metadata.targetName, metadata.name, metadata.source, metadata.indexedColumns.map { colInfo => @@ -68,6 +69,7 @@ object FlintSparkIndexFactory { indexOptions) case MV_INDEX_TYPE => FlintSparkMaterializedView( + metadata.targetName, metadata.name, metadata.source, metadata.indexedColumns.map { colInfo => diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index e9c2b5be5..b368de0ec 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -18,6 +18,8 @@ import org.apache.spark.sql._ /** * Flint covering index in Spark. * + * @param targetIndexName + * optional index target name * @param indexName * index name * @param tableName @@ -26,6 +28,7 @@ import org.apache.spark.sql._ * indexed column list */ case class FlintSparkCoveringIndex( + targetIndexName: Option[String] = None, indexName: String, tableName: String, indexedColumns: Map[String, String], @@ -38,6 +41,15 @@ case class FlintSparkCoveringIndex( override def name(): String = getFlintIndexName(indexName, tableName) + /** + * @return + * Flint target index name - index that already exist or has existing template to be created + * with + */ + override def targetName(): Option[String] = { + targetIndexName + } + override def metadata(): FlintMetadata = { val indexColumnMaps = { indexedColumns.map { case (colName, colType) => @@ -48,6 +60,7 @@ case class FlintSparkCoveringIndex( metadataBuilder(this) .name(indexName) + .targetName(targetIndexName) .source(tableName) .indexedColumns(indexColumnMaps) .schema(schemaJson) @@ -93,6 +106,7 @@ object FlintSparkCoveringIndex { /** Builder class for covering index build */ class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) { + private var targetIndexName: Option[String] = None private var indexName: String = "" private var indexedColumns: Map[String, String] = Map() @@ -109,6 +123,19 @@ object FlintSparkCoveringIndex { this } + /** + * Set covering index target name. + * + * @param indexName + * index name + * @return + * index builder + */ + def targetName(indexName: String): Builder = { + this.targetIndexName = Option.apply(indexName) + this + } + /** * Configure which source table the index is based on. * @@ -138,6 +165,11 @@ object FlintSparkCoveringIndex { } override protected def buildIndex(): FlintSparkIndex = - new FlintSparkCoveringIndex(indexName, tableName, indexedColumns, indexOptions) + new FlintSparkCoveringIndex( + targetIndexName, + indexName, + tableName, + indexedColumns, + indexOptions) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index ee58ec7f5..a08c627bd 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.flint.{logicalPlanToDataFrame, qualifyTableName} * index options */ case class FlintSparkMaterializedView( + targetIndexName: Option[String] = None, mvName: String, query: String, outputSchema: Map[String, String], @@ -51,6 +52,15 @@ case class FlintSparkMaterializedView( override def name(): String = getFlintIndexName(mvName) + /** + * @return + * Flint target index name - index that already exist or has existing template to be created + * with + */ + override def targetName(): Option[String] = { + targetIndexName + } + override def metadata(): FlintMetadata = { val indexColumnMaps = outputSchema.map { case (colName, colType) => @@ -60,6 +70,7 @@ case class FlintSparkMaterializedView( metadataBuilder(this) .name(mvName) + .targetName(targetIndexName) .source(query) .indexedColumns(indexColumnMaps) .schema(schemaJson) @@ -150,9 +161,23 @@ object FlintSparkMaterializedView { /** Builder class for MV build */ class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) { + private var targetIndexName: Option[String] = None private var mvName: String = "" private var query: String = "" + /** + * Set covering index target name. + * + * @param indexName + * index name + * @return + * index builder + */ + def targetName(indexName: String): Builder = { + this.targetIndexName = Option.apply(indexName) + this + } + /** * Set MV name. * @@ -188,7 +213,7 @@ object FlintSparkMaterializedView { field.name -> field.dataType.typeName } .toMap - FlintSparkMaterializedView(mvName, query, outputSchema, indexOptions) + FlintSparkMaterializedView(targetIndexName, mvName, query, outputSchema, indexOptions) } } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index eb2075b63..4144bd013 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -39,6 +39,14 @@ case class FlintSparkSkippingIndex( /** Skipping index type */ override val kind: String = SKIPPING_INDEX_TYPE + /** + * @return + * Flint target index name ( in skipping index case not allowing using existing indices) + */ + def targetName(): Option[String] = { + None + } + override def name(): String = { getSkippingIndexName(tableName) } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala index c0bb47830..eb3b7b2cb 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala @@ -27,7 +27,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A override def visitCreateCoveringIndexStatement( ctx: CreateCoveringIndexStatementContext): Command = { FlintSparkSqlCommand() { flint => - val indexName = ctx.indexName.getText + val indexName = ctx.indexName.get(0).getText val tableName = getFullTableName(flint, ctx.tableName) val indexBuilder = flint @@ -41,6 +41,10 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A } val ignoreIfExists = ctx.EXISTS() != null + if (ctx.USING() != null) { + indexBuilder.targetName(ctx.indexName().get(1).getText) + } + val indexOptions = visitPropertyList(ctx.propertyList()) indexBuilder .options(indexOptions) @@ -48,7 +52,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A // Trigger auto refresh if enabled if (indexOptions.autoRefresh()) { - val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName) + val flintIndexName = getFlintIndexName(flint, ctx.indexName.get(0), ctx.tableName) flint.refreshIndex(flintIndexName, RefreshMode.INCREMENTAL) } Seq.empty diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala index 8c144b46b..71e542183 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala @@ -13,12 +13,16 @@ class FlintSparkCoveringIndexSuite extends FlintSuite { test("get covering index name") { val index = - new FlintSparkCoveringIndex("ci", "spark_catalog.default.test", Map("name" -> "string")) + new FlintSparkCoveringIndex( + None, + "ci", + "spark_catalog.default.test", + Map("name" -> "string")) index.name() shouldBe "flint_spark_catalog_default_test_ci_index" } test("should fail if get index name without full table name") { - val index = new FlintSparkCoveringIndex("ci", "test", Map("name" -> "string")) + val index = new FlintSparkCoveringIndex(None, "ci", "test", Map("name" -> "string")) assertThrows[IllegalArgumentException] { index.name() } @@ -26,7 +30,7 @@ class FlintSparkCoveringIndexSuite extends FlintSuite { test("should fail if no indexed column given") { assertThrows[IllegalArgumentException] { - new FlintSparkCoveringIndex("ci", "default.test", Map.empty) + new FlintSparkCoveringIndex(None, "ci", "default.test", Map.empty) } } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala index c28495c69..13d181e5e 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala @@ -36,20 +36,20 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { val testQuery = "SELECT 1" test("get name") { - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty) mv.name() shouldBe "flint_spark_catalog_default_mv" } test("should fail if get name with unqualified MV name") { the[IllegalArgumentException] thrownBy - FlintSparkMaterializedView("mv", testQuery, Map.empty).name() + FlintSparkMaterializedView(None, "mv", testQuery, Map.empty).name() the[IllegalArgumentException] thrownBy - FlintSparkMaterializedView("default.mv", testQuery, Map.empty).name() + FlintSparkMaterializedView(None, "default.mv", testQuery, Map.empty).name() } test("get metadata") { - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map("test_col" -> "integer")) + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map("test_col" -> "integer")) val metadata = mv.metadata() metadata.name shouldBe mv.mvName @@ -65,6 +65,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { val indexOptions = FlintSparkIndexOptions(Map("auto_refresh" -> "true", "index_settings" -> indexSettings)) val mv = FlintSparkMaterializedView( + None, testMvName, testQuery, Map("test_col" -> "integer"), @@ -77,12 +78,12 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { } test("build batch data frame") { - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty) mv.build(spark, None).collect() shouldBe Array(Row(1)) } test("should fail if build given other source data frame") { - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty) the[IllegalArgumentException] thrownBy mv.build(spark, Some(mock[DataFrame])) } @@ -100,7 +101,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { | GROUP BY TUMBLE(time, '1 Minute') |""".stripMargin - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty) val actualPlan = mv.buildStream(spark).queryExecution.logical assert( actualPlan.sameSemantics( @@ -127,7 +128,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { | GROUP BY TUMBLE(time, '1 Minute') |""".stripMargin - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty) val actualPlan = mv.buildStream(spark).queryExecution.logical assert( actualPlan.sameSemantics( @@ -146,6 +147,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV") val mv = FlintSparkMaterializedView( + None, testMvName, s"SELECT name, age FROM $testTable WHERE age > 30", Map.empty) @@ -165,6 +167,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV") val mv = FlintSparkMaterializedView( + None, testMvName, s"SELECT name, COUNT(*) AS count FROM $testTable GROUP BY name", Map.empty) diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index d52c43842..12dc81c34 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -345,7 +345,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { } private def schemaShouldMatch(metadata: FlintMetadata, expected: String): Unit = { - val actual = parse(metadata.getContent) \ "properties" + val actual = parse(metadata.getContent()) \ "properties" assert(actual == parse(expected)) } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 5c799128c..1f1e72250 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -10,6 +10,7 @@ import scala.collection.JavaConverters._ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization +import org.mockito.ArgumentMatchers.anyBoolean import org.mockito.Mockito.when import org.opensearch.client.json.jackson.JacksonJsonpMapper import org.opensearch.client.opensearch.OpenSearchClient @@ -46,8 +47,9 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M |""".stripMargin val metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn(content) + when(metadata.getContent(anyBoolean())).thenReturn(content) when(metadata.indexSettings).thenReturn(None) + when(metadata.targetName).thenReturn(None) flintClient.createIndex(indexName, metadata) flintClient.exists(indexName) shouldBe true @@ -58,7 +60,8 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M val indexName = "flint_test_with_settings" val indexSettings = "{\"number_of_shards\": 3,\"number_of_replicas\": 2}" val metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn("{}") + when(metadata.getContent(anyBoolean())).thenReturn("{}") + when(metadata.targetName).thenReturn(None) when(metadata.indexSettings).thenReturn(Some(indexSettings)) flintClient.createIndex(indexName, metadata) @@ -73,14 +76,15 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M it should "get all index metadata with the given index name pattern" in { val metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn("{}") + when(metadata.getContent(anyBoolean())).thenReturn("{}") when(metadata.indexSettings).thenReturn(None) + when(metadata.targetName).thenReturn(None) flintClient.createIndex("flint_test_1_index", metadata) flintClient.createIndex("flint_test_2_index", metadata) val allMetadata = flintClient.getAllIndexMetadata("flint_*_index") allMetadata should have size 2 - allMetadata.forEach(metadata => metadata.getContent should not be empty) + allMetadata.forEach(metadata => metadata.getContent() should not be empty) allMetadata.forEach(metadata => metadata.indexSettings should not be empty) } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index a4b0069dd..cbb71a98f 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -44,7 +44,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testFlintIndex) index shouldBe defined - index.get.metadata().getContent should matchJson(s"""{ + index.get.metadata().getContent() should matchJson(s"""{ | "_meta": { | "version": "${current()}", | "name": "name_and_age", diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 627e11f52..d3774eb37 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -25,6 +25,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { /** Test table and index name */ private val testTable = "spark_catalog.default.covering_sql_test" private val testIndex = "name_and_age" + private val targetIndex = "target_index" private val testFlintIndex = getFlintIndexName(testIndex, testTable) override def beforeAll(): Unit = { @@ -32,7 +33,6 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { createPartitionedTable(testTable) } - override def afterEach(): Unit = { super.afterEach() @@ -93,7 +93,6 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { (settings \ "index.number_of_shards").extract[String] shouldBe "2" (settings \ "index.number_of_replicas").extract[String] shouldBe "3" } - test("create covering index with invalid option") { the[IllegalArgumentException] thrownBy sql(s""" @@ -232,4 +231,29 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { flint.describeIndex(testFlintIndex) shouldBe empty } + + test("use existing index as the covering index") { + sql(s""" + | CREATE INDEX $testIndex ON $testTable USING $targetIndex ( name ) + | WITH ( + | index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}' + | ) + |""".stripMargin) + + // Check if the index setting option is set to OS index setting + val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) + + implicit val formats: Formats = Serialization.formats(NoTypeHints) + var settings = parse(flintClient.getIndexMetadata(targetIndex).indexSettings.get) + (settings \ "index.number_of_shards").extract[String] shouldBe "2" + (settings \ "index.number_of_replicas").extract[String] shouldBe "3" + // validate the index alias is working + settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings.get) + (settings \ "index.number_of_shards").extract[String] shouldBe "2" + (settings \ "index.number_of_replicas").extract[String] shouldBe "3" + + // remove test index + flint.deleteIndex(targetIndex) + } + } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 29ab433c6..1377c2384 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -22,6 +22,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { private val testTable = "spark_catalog.default.mv_test" private val testMvName = "spark_catalog.default.mv_test_metrics" private val testFlintIndex = getFlintIndexName(testMvName) + private val testTargetIndex = "existing_index" private val testQuery = s""" | SELECT @@ -53,7 +54,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { val index = flint.describeIndex(testFlintIndex) index shouldBe defined - index.get.metadata().getContent should matchJson(s""" + index.get.metadata().getContent() should matchJson(s""" | { | "_meta": { | "version": "${current()}", @@ -86,7 +87,6 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | } |""".stripMargin) } - // TODO: fix this windowing function unable to be used in GROUP BY ignore("full refresh materialized view") { flint @@ -184,6 +184,56 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { } } + test("use existing existing OpenSearch index for materialized view successfully") { + val indexOptions = + FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/")) + flint + .materializedView() + .targetName(testTargetIndex) + .name(testMvName) + .query(testQuery) + .options(indexOptions) + .create() + + val index = flint.describeIndex(testTargetIndex) + index shouldBe defined + index.get.metadata().getContent() should matchJson(s""" + | { + | "_meta": { + | "version": "${current()}", + | "name": "spark_catalog.default.mv_test_metrics", + | "kind": "mv", + | "targetName": "$testTargetIndex", + | "source": "$testQuery", + | "indexedColumns": [ + | { + | "columnName": "startTime", + | "columnType": "timestamp" + | },{ + | "columnName": "count", + | "columnType": "long" + | }], + | "options": { + | "auto_refresh": "true", + | "checkpoint_location": "s3://test/" + | }, + | "properties": {} + | }, + | "properties": { + | "startTime": { + | "type": "date", + | "format": "strict_date_optional_time_nanos" + | }, + | "count": { + | "type": "long" + | } + | } + | } + |""".stripMargin) + // remove test index + flint.deleteIndex(testTargetIndex) + } + private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts) private def withIncrementalMaterializedView(query: String)( diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index e3fb467e6..b29ab0e2c 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -7,6 +7,7 @@ package org.opensearch.flint.spark import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.json4s.native.JsonMethods._ +import org.mockito.ArgumentMatchers.anyBoolean import org.opensearch.flint.core.FlintVersion.current import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL} import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN @@ -52,7 +53,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testIndex) index shouldBe defined - index.get.metadata().getContent should matchJson(s"""{ + index.get.metadata().getContent() should matchJson(s"""{ | "_meta": { | "name": "flint_spark_catalog_default_test_skipping_index", | "version": "${current()}", @@ -122,8 +123,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testIndex) index shouldBe defined - val optionJson = compact(render( - parse(index.get.metadata().getContent) \ "_meta" \ "options")) + val optionJson = + compact(render(parse(index.get.metadata().getContent()) \ "_meta" \ "options")) optionJson should matchJson(""" | { | "auto_refresh": "true", @@ -321,8 +322,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { |""".stripMargin) query.queryExecution.executedPlan should - useFlintSparkSkippingFileIndex( - hasIndexFilter(col("year") === 2023)) + useFlintSparkSkippingFileIndex(hasIndexFilter(col("year") === 2023)) } test("should not rewrite original query if filtering condition has disjunction") { @@ -388,8 +388,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { // Prepare test table val testTable = "spark_catalog.default.data_type_table" val testIndex = getSkippingIndexName(testTable) - sql( - s""" + sql(s""" | CREATE TABLE $testTable | ( | boolean_col BOOLEAN, @@ -408,8 +407,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | ) | USING PARQUET |""".stripMargin) - sql( - s""" + sql(s""" | INSERT INTO $testTable | VALUES ( | TRUE, @@ -449,8 +447,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testIndex) index shouldBe defined - index.get.metadata().getContent should matchJson( - s"""{ + index.get.metadata().getContent() should matchJson(s"""{ | "_meta": { | "name": "flint_spark_catalog_default_data_type_table_skipping_index", | "version": "${current()}", @@ -587,8 +584,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { test("can build skipping index for varchar and char and rewrite applicable query") { val testTable = "spark_catalog.default.varchar_char_table" val testIndex = getSkippingIndexName(testTable) - sql( - s""" + sql(s""" | CREATE TABLE $testTable | ( | varchar_col VARCHAR(20), @@ -596,8 +592,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | ) | USING PARQUET |""".stripMargin) - sql( - s""" + sql(s""" | INSERT INTO $testTable | VALUES ( | "sample varchar", @@ -613,8 +608,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .create() flint.refreshIndex(testIndex, FULL) - val query = sql( - s""" + val query = sql(s""" | SELECT varchar_col, char_col | FROM $testTable | WHERE varchar_col = "sample varchar" AND char_col = "sample char" @@ -624,8 +618,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val paddedChar = "sample char".padTo(20, ' ') checkAnswer(query, Row("sample varchar", paddedChar)) query.queryExecution.executedPlan should - useFlintSparkSkippingFileIndex(hasIndexFilter( - col("varchar_col") === "sample varchar" && col("char_col") === paddedChar)) + useFlintSparkSkippingFileIndex( + hasIndexFilter(col("varchar_col") === "sample varchar" && col("char_col") === paddedChar)) flint.deleteIndex(testIndex) }