From 79936aef36a2cc46de27cb0f999327964fd10375 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Wed, 18 Oct 2023 12:49:06 -0700 Subject: [PATCH 01/13] add USING statement for covering / MV indices to reuse an existing index / index template in OpenSearch Signed-off-by: YANGDB --- .../flint/core/metadata/FlintMetadata.scala | 12 ++++++++- .../main/antlr4/FlintSparkSqlExtensions.g4 | 2 +- .../src/main/antlr4/SparkSqlBase.g4 | 1 + .../flint/spark/FlintSparkIndex.scala | 6 +++++ .../flint/spark/FlintSparkIndexFactory.scala | 2 ++ .../covering/FlintSparkCoveringIndex.scala | 27 ++++++++++++++++++- .../spark/mv/FlintSparkMaterializedView.scala | 25 ++++++++++++++++- .../skipping/FlintSparkSkippingIndex.scala | 8 ++++++ .../FlintSparkCoveringIndexAstBuilder.scala | 8 ++++-- 9 files changed, 85 insertions(+), 6 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index ea0fb0f98..98b9dbfec 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -18,6 +18,8 @@ import org.opensearch.flint.core.metadata.FlintJsonHelper._ case class FlintMetadata( /** Flint spec version */ version: FlintVersion, + /** Flint index target name */ + targetName: Option[String], /** Flint index name */ name: String, /** Flint index kind */ @@ -53,11 +55,11 @@ case class FlintMetadata( objectField(builder, "_meta") { builder .field("version", version.version) + .field("targetName", targetName.getOrElse(name) ) .field("name", name) .field("kind", kind) .field("source", source) .field("indexedColumns", indexedColumns) - optionalObjectField(builder, "options", options) optionalObjectField(builder, "properties", properties) } @@ -109,6 +111,7 @@ object FlintMetadata { innerFieldName match { case "version" => builder.version(FlintVersion.apply(parser.text())) case "name" => builder.name(parser.text()) + case "targetName" => builder.targetName(parser.text()) case "kind" => builder.kind(parser.text()) case "source" => builder.source(parser.text()) case "indexedColumns" => @@ -142,6 +145,7 @@ object FlintMetadata { class Builder { private var version: FlintVersion = FlintVersion.current() private var name: String = "" + private var targetName: Option[String] = None private var kind: String = "" private var source: String = "" private var options: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() @@ -160,6 +164,11 @@ object FlintMetadata { this } + def targetName(name: String): this.type = { + this.targetName = Option(name) + this + } + def kind(kind: String): this.type = { this.kind = kind this } @@ -219,6 +228,7 @@ object FlintMetadata { def build(): FlintMetadata = { FlintMetadata( if (version == null) current() else version, + targetName, name, kind, source, diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index e8e0264f2..c1bc84765 100644 --- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -55,7 +55,7 @@ coveringIndexStatement createCoveringIndexStatement : CREATE INDEX (IF NOT EXISTS)? indexName - ON tableName + ON tableName (USING indexName)? LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN (WITH LEFT_PAREN propertyList RIGHT_PAREN)? ;
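For illustration, the amended createCoveringIndexStatement rule is meant to accept a statement of the following shape (a minimal sketch only; the index, table, and target index names are hypothetical):

spark.sql("""
  CREATE INDEX name_and_age ON spark_catalog.default.test
  USING existing_index ( name, age )
  WITH ( auto_refresh = 'false' )
""")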
diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 index 4ac1ced5c..96a9d2596 100644 --- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 +++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 @@ -139,6 +139,7 @@ nonReserved // Flint lexical tokens +USING: 'USING'; MIN_MAX: 'MIN_MAX'; SKIPPING: 'SKIPPING'; VALUE_SET: 'VALUE_SET'; diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 0586bfc49..2279ad421 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -34,6 +34,12 @@ trait FlintSparkIndex { */ def name(): String + /** + * @return + * Flint target index name + */ + def targetName(): String + /** * @return * Flint index metadata diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala index cda11405c..1f169d652 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala @@ -60,6 +60,7 @@ object FlintSparkIndexFactory { FlintSparkSkippingIndex(metadata.source, strategies, indexOptions) case COVERING_INDEX_TYPE => FlintSparkCoveringIndex( + metadata.targetName, metadata.name, metadata.source, metadata.indexedColumns.map { colInfo => @@ -68,6 +69,7 @@ object FlintSparkIndexFactory { indexOptions) case MV_INDEX_TYPE => FlintSparkMaterializedView( + metadata.targetName, metadata.name, metadata.source, metadata.indexedColumns.map { colInfo => diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index e9c2b5be5..f18fd3997 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -18,6 +18,8 @@ import org.apache.spark.sql._ /** * Flint covering index in Spark.
 * + * @param targetIndexName + * optional target index name * @param indexName * index name * @param tableName @@ -26,6 +28,7 @@ import org.apache.spark.sql._ * indexed column list */ case class FlintSparkCoveringIndex( + targetIndexName: Option[String], indexName: String, tableName: String, indexedColumns: Map[String, String], @@ -38,6 +41,14 @@ case class FlintSparkCoveringIndex( override def name(): String = getFlintIndexName(indexName, tableName) + /** + * @return + * Flint target index name: an index that already exists or one to be created from an existing index template + */ + override def targetName(): String = { + targetIndexName.getOrElse(name()) + } + override def metadata(): FlintMetadata = { val indexColumnMaps = { indexedColumns.map { case (colName, colType) => @@ -93,6 +104,7 @@ object FlintSparkCoveringIndex { /** Builder class for covering index build */ class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) { + private var targetIndexName: String = "" private var indexName: String = "" private var indexedColumns: Map[String, String] = Map() @@ -109,6 +121,19 @@ object FlintSparkCoveringIndex { this } + /** + * Set covering index target name. + * + * @param indexName + * target index name + * @return + * index builder + */ + def targetName(indexName: String): Builder = { + this.targetIndexName = indexName + this + } + /** * Configure which source table the index is based on. * @@ -138,6 +163,6 @@ object FlintSparkCoveringIndex { } override protected def buildIndex(): FlintSparkIndex = - new FlintSparkCoveringIndex(indexName, tableName, indexedColumns, indexOptions) + new FlintSparkCoveringIndex(Option.apply(targetIndexName), indexName, tableName, indexedColumns, indexOptions) } }
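A minimal sketch of the equivalent builder call path added above (the target index name is illustrative, and onTable/addIndexColumns are assumed from the existing covering index builder API):

flint
  .coveringIndex()
  .name("name_and_age")
  .onTable("spark_catalog.default.test")
  .addIndexColumns("name", "age")
  .targetName("existing_index") // reuse this OpenSearch index instead of creating a Flint-named one
  .create()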
* @@ -188,7 +211,7 @@ object FlintSparkMaterializedView { field.name -> field.dataType.typeName } .toMap - FlintSparkMaterializedView(mvName, query, outputSchema, indexOptions) + FlintSparkMaterializedView(Option.apply(targetIndexName), mvName, query, outputSchema, indexOptions) } } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index eb2075b63..4ead3206b 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -39,6 +39,14 @@ case class FlintSparkSkippingIndex( /** Skipping index type */ override val kind: String = SKIPPING_INDEX_TYPE + /** + * @return + * Flint target index name ( in skipping index case not allowing using existing indices) + */ + def targetName(): String = { + name() + } + override def name(): String = { getSkippingIndexName(tableName) } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala index c0bb47830..eb3b7b2cb 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala @@ -27,7 +27,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A override def visitCreateCoveringIndexStatement( ctx: CreateCoveringIndexStatementContext): Command = { FlintSparkSqlCommand() { flint => - val indexName = ctx.indexName.getText + val indexName = ctx.indexName.get(0).getText val tableName = getFullTableName(flint, ctx.tableName) val indexBuilder = flint @@ -41,6 +41,10 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A } val ignoreIfExists = ctx.EXISTS() != null + if (ctx.USING() != null) { + indexBuilder.targetName(ctx.indexName().get(1).getText) + } + val indexOptions = visitPropertyList(ctx.propertyList()) indexBuilder .options(indexOptions) @@ -48,7 +52,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A // Trigger auto refresh if enabled if (indexOptions.autoRefresh()) { - val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName) + val flintIndexName = getFlintIndexName(flint, ctx.indexName.get(0), ctx.tableName) flint.refreshIndex(flintIndexName, RefreshMode.INCREMENTAL) } Seq.empty From d2677d01e0cde32b6454c19665849faadd518916 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Wed, 18 Oct 2023 15:00:05 -0700 Subject: [PATCH 02/13] update the flint client api Signed-off-by: YANGDB --- .../opensearch/flint/core/FlintClient.java | 7 + .../core/storage/FlintOpenSearchClient.java | 338 ++++++++++-------- .../opensearch/flint/spark/FlintSpark.scala | 6 +- .../flint/spark/FlintSparkIndex.scala | 2 +- .../covering/FlintSparkCoveringIndex.scala | 6 +- .../spark/mv/FlintSparkMaterializedView.scala | 6 +- .../skipping/FlintSparkSkippingIndex.scala | 4 +- .../FlintSparkMaterializedViewITSuite.scala | 47 +++ 8 files changed, 260 insertions(+), 156 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java 
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index d50c0002e..6873dff2d 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -26,6 +26,13 @@ public interface FlintClient { */ void createIndex(String indexName, FlintMetadata metadata); + /** + * Create an alias with the given name for the given index. + * @param indexName name of the index the alias should point to + * @param aliasName name of the alias to create + * @param metadata index metadata, used to create the index first if it does not exist + */ + void alias(String indexName, String aliasName, FlintMetadata metadata); + /** * Does Flint index with the given name exist * diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index ff2761856..8ce130c35 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -5,11 +5,13 @@ package org.opensearch.flint.core.storage; +import static java.lang.String.format; import static org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS; import com.amazonaws.auth.AWS4Signer; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; + import java.io.IOException; import java.lang.reflect.Constructor; import java.util.ArrayList; @@ -19,17 +21,22 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; + import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; +import org.opensearch.action.admin.indices.alias.Alias; +import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.CreateIndexRequest; +import org.opensearch.client.indices.CreateIndexResponse; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.client.indices.GetIndexResponse; import org.opensearch.cluster.metadata.MappingMetadata; @@ -40,6 +47,7 @@ import org.opensearch.common.xcontent.XContentType; import org.opensearch.flint.core.FlintClient; import org.opensearch.flint.core.FlintOptions; +import org.opensearch.flint.core.FlintVersion; import org.opensearch.flint.core.auth.AWSRequestSigningApacheInterceptor; import org.opensearch.flint.core.metadata.FlintMetadata; import org.opensearch.index.query.AbstractQueryBuilder; @@ -54,161 +62,199 @@ */ public class FlintOpenSearchClient implements FlintClient { - /** - * {@link NamedXContentRegistry} from {@link SearchModule} used for construct {@link QueryBuilder} from DSL query string.
- */ - private final static NamedXContentRegistry - xContentRegistry = - new NamedXContentRegistry(new SearchModule(Settings.builder().build(), - new ArrayList<>()).getNamedXContents()); - - private final FlintOptions options; - - public FlintOpenSearchClient(FlintOptions options) { - this.options = options; - } - - @Override public void createIndex(String indexName, FlintMetadata metadata) { - String osIndexName = toLowercase(indexName); - try (RestHighLevelClient client = createClient()) { - CreateIndexRequest request = new CreateIndexRequest(osIndexName); - request.mapping(metadata.getContent(), XContentType.JSON); - - Option settings = metadata.indexSettings(); - if (settings.isDefined()) { - request.settings(settings.get(), XContentType.JSON); - } - client.indices().create(request, RequestOptions.DEFAULT); - } catch (Exception e) { - throw new IllegalStateException("Failed to create Flint index " + osIndexName, e); + /** + * {@link NamedXContentRegistry} from {@link SearchModule} used to construct {@link QueryBuilder} from DSL query string. + */ + private final static NamedXContentRegistry + xContentRegistry = + new NamedXContentRegistry(new SearchModule(Settings.builder().build(), + new ArrayList<>()).getNamedXContents()); + + private final FlintOptions options; + + public FlintOpenSearchClient(FlintOptions options) { + this.options = options; } - } - - @Override public boolean exists(String indexName) { - String osIndexName = toLowercase(indexName); - try (RestHighLevelClient client = createClient()) { - return client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT); - } catch (IOException e) { - throw new IllegalStateException("Failed to check if Flint index exists " + osIndexName, e); + + @Override + public void alias(String indexName, String aliasName, FlintMetadata metadata) { + String osIndexName = toLowercase(indexName); + String osAliasName = toLowercase(aliasName); + try (RestHighLevelClient client = createClient()) { + boolean exists = client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT); + if (!exists) { + // create index including the alias name which is the Flint convention name + createIndex(osIndexName, metadata); + } else { + // Adding the alias to the existing index + IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest(); + IndicesAliasesRequest.AliasActions aliasAction = + new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD) + .index(osIndexName) + .alias(osAliasName); + aliasesRequest.addAliasAction(aliasAction); + // Executing the updateAliases request + AcknowledgedResponse response = client.indices().updateAliases(aliasesRequest, RequestOptions.DEFAULT); + if (!response.isAcknowledged()) { + throw new IllegalStateException(String.format("Failed to acknowledge Alias %s for index %s", aliasName, indexName)); + } + } + } catch ( + Exception e) { + throw new IllegalStateException(format("Failed to create Alias %s for index %s ", aliasName, indexName), e); + } } - } - - @Override public List getAllIndexMetadata(String indexNamePattern) { - String osIndexNamePattern = toLowercase(indexNamePattern); - try (RestHighLevelClient client = createClient()) { - GetIndexRequest request = new GetIndexRequest(osIndexNamePattern); - GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); - - return Arrays.stream(response.getIndices()) - .map(index -> FlintMetadata.apply( - response.getMappings().get(index).source().toString(), - 
response.getSettings().get(index).toString())) - .collect(Collectors.toList()); - } catch (Exception e) { - throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexNamePattern, e); + + @Override + public void createIndex(String indexName, FlintMetadata metadata) { + String osIndexName = toLowercase(indexName); + try (RestHighLevelClient client = createClient()) { + CreateIndexRequest request = new CreateIndexRequest(osIndexName); + request.mapping(metadata.getContent(), XContentType.JSON); + metadata.targetName().exists(name -> { + return request.alias(new Alias(toLowercase(metadata.name()))); + }); + Option settings = metadata.indexSettings(); + if (settings.isDefined()) { + request.settings(settings.get(), XContentType.JSON); + } + client.indices().create(request, RequestOptions.DEFAULT); + } catch (Exception e) { + throw new IllegalStateException("Failed to create Flint index " + osIndexName, e); + } } - } - - @Override public FlintMetadata getIndexMetadata(String indexName) { - String osIndexName = toLowercase(indexName); - try (RestHighLevelClient client = createClient()) { - GetIndexRequest request = new GetIndexRequest(osIndexName); - GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); - - MappingMetadata mapping = response.getMappings().get(osIndexName); - Settings settings = response.getSettings().get(osIndexName); - return FlintMetadata.apply(mapping.source().string(), settings.toString()); - } catch (Exception e) { - throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e); + + @Override + public boolean exists(String indexName) { + String osIndexName = toLowercase(indexName); + try (RestHighLevelClient client = createClient()) { + return client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT); + } catch (IOException e) { + throw new IllegalStateException("Failed to check if Flint index exists " + osIndexName, e); + } + } + + @Override + public List getAllIndexMetadata(String indexNamePattern) { + String osIndexNamePattern = toLowercase(indexNamePattern); + try (RestHighLevelClient client = createClient()) { + GetIndexRequest request = new GetIndexRequest(osIndexNamePattern); + GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); + + return Arrays.stream(response.getIndices()) + .map(index -> FlintMetadata.apply( + response.getMappings().get(index).source().toString(), + response.getSettings().get(index).toString())) + .collect(Collectors.toList()); + } catch (Exception e) { + throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexNamePattern, e); + } } - } - @Override public void deleteIndex(String indexName) { - String osIndexName = toLowercase(indexName); - try (RestHighLevelClient client = createClient()) { - DeleteIndexRequest request = new DeleteIndexRequest(osIndexName); + @Override + public FlintMetadata getIndexMetadata(String indexName) { + String osIndexName = toLowercase(indexName); + try (RestHighLevelClient client = createClient()) { + GetIndexRequest request = new GetIndexRequest(osIndexName); + GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); - client.indices().delete(request, RequestOptions.DEFAULT); - } catch (Exception e) { - throw new IllegalStateException("Failed to delete Flint index " + osIndexName, e); + MappingMetadata mapping = response.getMappings().get(osIndexName); + Settings settings = response.getSettings().get(osIndexName); + return 
FlintMetadata.apply(mapping.source().string(), settings.toString()); + } catch (Exception e) { + throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e); + } } - } - - /** - * Create {@link FlintReader}. - * - * @param indexName index name. - * @param query DSL query. DSL query is null means match_all. - * @return {@link FlintReader}. - */ - @Override public FlintReader createReader(String indexName, String query) { - try { - QueryBuilder queryBuilder = new MatchAllQueryBuilder(); - if (!Strings.isNullOrEmpty(query)) { - XContentParser - parser = - XContentType.JSON.xContent().createParser(xContentRegistry, IGNORE_DEPRECATIONS, query); - queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser); - } - return new OpenSearchScrollReader(createClient(), - toLowercase(indexName), - new SearchSourceBuilder().query(queryBuilder), - options); - } catch (IOException e) { - throw new RuntimeException(e); + + @Override + public void deleteIndex(String indexName) { + String osIndexName = toLowercase(indexName); + try (RestHighLevelClient client = createClient()) { + DeleteIndexRequest request = new DeleteIndexRequest(osIndexName); + + client.indices().delete(request, RequestOptions.DEFAULT); + } catch (Exception e) { + throw new IllegalStateException("Failed to delete Flint index " + osIndexName, e); + } } - } - - public FlintWriter createWriter(String indexName) { - return new OpenSearchWriter(createClient(), toLowercase(indexName), options.getRefreshPolicy()); - } - - @Override public RestHighLevelClient createClient() { - RestClientBuilder - restClientBuilder = - RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme())); - - // SigV4 support - if (options.getAuth().equals(FlintOptions.SIGV4_AUTH)) { - AWS4Signer signer = new AWS4Signer(); - signer.setServiceName("es"); - signer.setRegionName(options.getRegion()); - - // Use DefaultAWSCredentialsProviderChain by default. - final AtomicReference awsCredentialsProvider = - new AtomicReference<>(new DefaultAWSCredentialsProviderChain()); - String providerClass = options.getCustomAwsCredentialsProvider(); - if (!Strings.isNullOrEmpty(providerClass)) { + + /** + * Create {@link FlintReader}. + * + * @param indexName index name. + * @param query DSL query. DSL query is null means match_all. + * @return {@link FlintReader}. 
+ */ + @Override + public FlintReader createReader(String indexName, String query) { try { - Class awsCredentialsProviderClass = Class.forName(providerClass); - Constructor ctor = awsCredentialsProviderClass.getDeclaredConstructor(); - ctor.setAccessible(true); - awsCredentialsProvider.set((AWSCredentialsProvider) ctor.newInstance()); - } catch (Exception e) { - throw new RuntimeException(e); + QueryBuilder queryBuilder = new MatchAllQueryBuilder(); + if (!Strings.isNullOrEmpty(query)) { + XContentParser + parser = + XContentType.JSON.xContent().createParser(xContentRegistry, IGNORE_DEPRECATIONS, query); + queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser); + } + return new OpenSearchScrollReader(createClient(), + toLowercase(indexName), + new SearchSourceBuilder().query(queryBuilder), + options); + } catch (IOException e) { + throw new RuntimeException(e); } - } - restClientBuilder.setHttpClientConfigCallback(cb -> - cb.addInterceptorLast(new AWSRequestSigningApacheInterceptor(signer.getServiceName(), - signer, awsCredentialsProvider.get()))); - } else if (options.getAuth().equals(FlintOptions.BASIC_AUTH)) { - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials(options.getUsername(), options.getPassword())); - restClientBuilder.setHttpClientConfigCallback( - httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); } - return new RestHighLevelClient(restClientBuilder); - } - - /* - * Because OpenSearch requires all lowercase letters in index name, we have to - * lowercase all letters in the given Flint index name. - */ - private String toLowercase(String indexName) { - Objects.requireNonNull(indexName); - - return indexName.toLowerCase(Locale.ROOT); - } + + public FlintWriter createWriter(String indexName) { + return new OpenSearchWriter(createClient(), toLowercase(indexName), options.getRefreshPolicy()); + } + + @Override + public RestHighLevelClient createClient() { + RestClientBuilder + restClientBuilder = + RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme())); + + // SigV4 support + if (options.getAuth().equals(FlintOptions.SIGV4_AUTH)) { + AWS4Signer signer = new AWS4Signer(); + signer.setServiceName("es"); + signer.setRegionName(options.getRegion()); + + // Use DefaultAWSCredentialsProviderChain by default. 
+ final AtomicReference awsCredentialsProvider = + new AtomicReference<>(new DefaultAWSCredentialsProviderChain()); + String providerClass = options.getCustomAwsCredentialsProvider(); + if (!Strings.isNullOrEmpty(providerClass)) { + try { + Class awsCredentialsProviderClass = Class.forName(providerClass); + Constructor ctor = awsCredentialsProviderClass.getDeclaredConstructor(); + ctor.setAccessible(true); + awsCredentialsProvider.set((AWSCredentialsProvider) ctor.newInstance()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + restClientBuilder.setHttpClientConfigCallback(cb -> + cb.addInterceptorLast(new AWSRequestSigningApacheInterceptor(signer.getServiceName(), + signer, awsCredentialsProvider.get()))); + } else if (options.getAuth().equals(FlintOptions.BASIC_AUTH)) { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials(options.getUsername(), options.getPassword())); + restClientBuilder.setHttpClientConfigCallback( + httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + } + return new RestHighLevelClient(restClientBuilder); + } + + /* + * Because OpenSearch requires all lowercase letters in index name, we have to + * lowercase all letters in the given Flint index name. + */ + private String toLowercase(String indexName) { + Objects.requireNonNull(indexName); + + return indexName.toLowerCase(Locale.ROOT); + } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 9c78a07f8..e112a354d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -82,7 +82,11 @@ class FlintSpark(val spark: SparkSession) { */ def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = { val indexName = index.name() - if (flintClient.exists(indexName)) { + val targetName = index.targetName() + if (targetName.nonEmpty) { + //use targetIndex as the index to store the acceleration data + flintClient.alias(targetName.get, indexName, index.metadata()) + } else if (flintClient.exists(indexName)) { if (!ignoreIfExists) { throw new IllegalStateException(s"Flint index $indexName already exists") } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 2279ad421..d7cdfb996 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -38,7 +38,7 @@ trait FlintSparkIndex { * @return * Flint target index name */ - def targetName(): String + def targetName(): Option[String] /** * @return diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index f18fd3997..60d0fef93 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -28,7 +28,7 @@ import 
org.apache.spark.sql._ * indexed column list */ case class FlintSparkCoveringIndex( - targetIndexName: Option[String], + targetIndexName: Option[String] = None, indexName: String, tableName: String, indexedColumns: Map[String, String], @@ -45,8 +45,8 @@ case class FlintSparkCoveringIndex( * @return * Flint target index name: an index that already exists or one to be created from an existing index template */ - override def targetName(): String = { - targetIndexName.getOrElse(name()) + override def targetName(): Option[String] = { + targetIndexName } override def metadata(): FlintMetadata = { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index fbb985734..567bc13e4 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.flint.{logicalPlanToDataFrame, qualifyTableName} * index options */ case class FlintSparkMaterializedView( - targetIndexName: Option[String], + targetIndexName: Option[String] = None, mvName: String, query: String, outputSchema: Map[String, String], @@ -56,8 +56,8 @@ case class FlintSparkMaterializedView( * @return * Flint target index name: an index that already exists or one to be created from an existing index template */ - override def targetName(): String = { - targetIndexName.getOrElse(name()) + override def targetName(): Option[String] = { + targetIndexName } override def metadata(): FlintMetadata = { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index 4ead3206b..cf6f61a77 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -43,8 +43,8 @@ case class FlintSparkSkippingIndex( * @return * Flint target index name (a skipping index does not allow reusing an existing index) */ - def targetName(): String = { - name() + def targetName(): Option[String] = { + None } override def name(): String = { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 29ab433c6..81b3819d9 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -86,6 +86,53 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | } |""".stripMargin) } + + test("create materialized view using existing OpebnSearch index successfully") { + val indexOptions = + FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/")) + flint + .materializedView() + .targetName("existing_index") + .name(testMvName) + .query(testQuery) + .options(indexOptions) + .create() + + val index = flint.describeIndex("existing_index") + index shouldBe defined + index.get.metadata().getContent should matchJson(s""" + | { + | "_meta": { + | "version": "${current()}", + | "name": 
"spark_catalog.default.mv_test_metrics", + | "kind": "mv", + | "source": "$testQuery", + | "indexedColumns": [ + | { + | "columnName": "startTime", + | "columnType": "timestamp" + | },{ + | "columnName": "count", + | "columnType": "long" + | }], + | "options": { + | "auto_refresh": "true", + | "checkpoint_location": "s3://test/" + | }, + | "properties": {} + | }, + | "properties": { + | "startTime": { + | "type": "date", + | "format": "strict_date_optional_time_nanos" + | }, + | "count": { + | "type": "long" + | } + | } + | } + |""".stripMargin) + } // TODO: fix this windowing function unable to be used in GROUP BY ignore("full refresh materialized view") { From 42530a8bbc9ca72b82724504c076d8fefa5ef1ba Mon Sep 17 00:00:00 2001 From: YANGDB Date: Wed, 18 Oct 2023 16:28:59 -0700 Subject: [PATCH 03/13] update the flint client createIndex api add metadata creation parameters Signed-off-by: YANGDB --- .../flint/core/metadata/FlintMetadata.scala | 22 ++++++++++++------- .../core/storage/FlintOpenSearchClient.java | 10 +++++---- .../core/metadata/FlintMetadataSuite.scala | 2 +- .../apache/spark/sql/flint/FlintTable.scala | 2 +- .../opensearch/flint/spark/FlintSpark.scala | 14 +++++++----- .../covering/FlintSparkCoveringIndex.scala | 7 +++--- .../spark/mv/FlintSparkMaterializedView.scala | 7 +++--- .../FlintSparkCoveringIndexSuite.scala | 6 ++--- .../mv/FlintSparkMaterializedViewSuite.scala | 22 +++++++++---------- .../FlintSparkSkippingIndexSuite.scala | 2 +- .../core/FlintOpenSearchClientSuite.scala | 8 +++---- .../FlintSparkCoveringIndexITSuite.scala | 2 +- .../FlintSparkMaterializedViewITSuite.scala | 10 +++++---- .../FlintSparkSkippingIndexITSuite.scala | 6 ++--- 14 files changed, 67 insertions(+), 53 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index 98b9dbfec..ae34b82df 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -48,24 +48,30 @@ case class FlintMetadata( * @return * JSON content */ - def getContent: String = { + def getContent(includeProperties: Boolean = true): String = { try { buildJson(builder => { // Add _meta field objectField(builder, "_meta") { builder .field("version", version.version) - .field("targetName", targetName.getOrElse(name) ) .field("name", name) .field("kind", kind) .field("source", source) .field("indexedColumns", indexedColumns) + // Only add targetName if it's not empty + targetName.foreach(tn => builder.field("targetName", tn)) optionalObjectField(builder, "options", options) - optionalObjectField(builder, "properties", properties) + + if (includeProperties) { + optionalObjectField(builder, "properties", properties) + } } // Add properties (schema) field - builder.field("properties", schema) + if (includeProperties) { + builder.field("properties", schema) + } }) } catch { case e: Exception => @@ -111,7 +117,7 @@ object FlintMetadata { innerFieldName match { case "version" => builder.version(FlintVersion.apply(parser.text())) case "name" => builder.name(parser.text()) - case "targetName" => builder.targetName(parser.text()) + case "targetName" => builder.targetName(Option.apply(parser.text())) case "kind" => builder.kind(parser.text()) case "source" => builder.source(parser.text()) case "indexedColumns" => @@ -144,8 +150,8 @@ object FlintMetadata { */ class Builder { private var version: 
FlintVersion = FlintVersion.current() - private var name: String = "" private var targetName: Option[String] = None + private var name: String = "" private var kind: String = "" private var source: String = "" private var options: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() @@ -164,8 +170,8 @@ object FlintMetadata { this } - def targetName(name: String): this.type = { - this.targetName = Option(name) + def targetName(name: Option[String]): this.type = { + this.targetName = name this } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 8ce130c35..59395137d 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -110,10 +110,12 @@ public void createIndex(String indexName, FlintMetadata metadata) { String osIndexName = toLowercase(indexName); try (RestHighLevelClient client = createClient()) { CreateIndexRequest request = new CreateIndexRequest(osIndexName); - request.mapping(metadata.getContent(), XContentType.JSON); - metadata.targetName().exists(name -> { - return request.alias(new Alias(toLowercase(metadata.name()))); - }); + boolean includeMappingProperties = true; + if(metadata.targetName().nonEmpty()) { + request.alias(new Alias(toLowercase(metadata.name()))); + includeMappingProperties = false; + } + request.mapping(metadata.getContent(includeMappingProperties), XContentType.JSON); Option settings = metadata.indexSettings(); if (settings.isDefined()) { request.settings(settings.get(), XContentType.JSON); diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala index dc2f5fe6a..991eb51cb 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala @@ -62,6 +62,6 @@ class FlintMetadataSuite extends AnyFlatSpec with Matchers { builder.schema("""{"properties": {"test_field": {"type": "os_type"}}}""") val metadata = builder.build() - metadata.getContent should matchJson(testMetadataJson) + metadata.getContent() should matchJson(testMetadataJson) } } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala index c078f7fb6..318cb9dab 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala @@ -46,7 +46,7 @@ case class FlintTable(conf: util.Map[String, String], userSpecifiedSchema: Optio FlintClientBuilder .build(flintSparkConf.flintOptions()) .getIndexMetadata(name) - .getContent) + .getContent()) } } schema diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index e112a354d..a1016a2a5 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -86,13 +86,15 @@ class FlintSpark(val spark: SparkSession) { if (targetName.nonEmpty) { //use 
targetIndex as the index to store the acceleration data flintClient.alias(targetName.get, indexName, index.metadata()) - } else if (flintClient.exists(indexName)) { - if (!ignoreIfExists) { - throw new IllegalStateException(s"Flint index $indexName already exists") - } } else { - val metadata = index.metadata() - flintClient.createIndex(indexName, metadata) + if (flintClient.exists(indexName)) { + if (!ignoreIfExists) { + throw new IllegalStateException(s"Flint index $indexName already exists") + } + } else { + val metadata = index.metadata() + flintClient.createIndex(indexName, metadata) + } } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index 60d0fef93..7ed63713d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -59,6 +59,7 @@ case class FlintSparkCoveringIndex( metadataBuilder(this) .name(indexName) + .targetName(targetIndexName) .source(tableName) .indexedColumns(indexColumnMaps) .schema(schemaJson) @@ -104,7 +105,7 @@ object FlintSparkCoveringIndex { /** Builder class for covering index build */ class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) { - private var targetIndexName: String = "" + private var targetIndexName: Option[String] = None private var indexName: String = "" private var indexedColumns: Map[String, String] = Map() @@ -130,7 +131,7 @@ object FlintSparkCoveringIndex { * index builder */ def targetName(indexName: String): Builder = { - this.targetIndexName = indexName + this.targetIndexName = Option.apply(indexName) this } @@ -163,6 +164,6 @@ object FlintSparkCoveringIndex { } override protected def buildIndex(): FlintSparkIndex = - new FlintSparkCoveringIndex(Option.apply(targetIndexName), indexName, tableName, indexedColumns, indexOptions) + new FlintSparkCoveringIndex(targetIndexName, indexName, tableName, indexedColumns, indexOptions) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index 567bc13e4..a449f0bc7 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -69,6 +69,7 @@ case class FlintSparkMaterializedView( metadataBuilder(this) .name(mvName) + .targetName(targetIndexName) .source(query) .indexedColumns(indexColumnMaps) .schema(schemaJson) @@ -159,7 +160,7 @@ object FlintSparkMaterializedView { /** Builder class for MV build */ class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) { - private var targetIndexName: String = "" + private var targetIndexName: Option[String] = None private var mvName: String = "" private var query: String = "" @@ -172,7 +173,7 @@ object FlintSparkMaterializedView { * index builder */ def targetName(indexName: String): Builder = { - this.targetIndexName = indexName + this.targetIndexName = Option.apply(indexName) this } @@ -211,7 +212,7 @@ object FlintSparkMaterializedView { field.name -> field.dataType.typeName } .toMap - FlintSparkMaterializedView(Option.apply(targetIndexName), mvName, query, 
outputSchema, indexOptions) + FlintSparkMaterializedView(targetIndexName, mvName, query, outputSchema, indexOptions) } } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala index 8c144b46b..cc17d5190 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala @@ -13,12 +13,12 @@ class FlintSparkCoveringIndexSuite extends FlintSuite { test("get covering index name") { val index = - new FlintSparkCoveringIndex("ci", "spark_catalog.default.test", Map("name" -> "string")) + new FlintSparkCoveringIndex(None, "ci", "spark_catalog.default.test", Map("name" -> "string")) index.name() shouldBe "flint_spark_catalog_default_test_ci_index" } test("should fail if get index name without full table name") { - val index = new FlintSparkCoveringIndex("ci", "test", Map("name" -> "string")) + val index = new FlintSparkCoveringIndex(None, "ci", "test", Map("name" -> "string")) assertThrows[IllegalArgumentException] { index.name() } @@ -26,7 +26,7 @@ class FlintSparkCoveringIndexSuite extends FlintSuite { test("should fail if no indexed column given") { assertThrows[IllegalArgumentException] { - new FlintSparkCoveringIndex("ci", "default.test", Map.empty) + new FlintSparkCoveringIndex(None, "ci", "default.test", Map.empty) } } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala index c28495c69..08b33343c 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala @@ -36,20 +36,20 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { val testQuery = "SELECT 1" test("get name") { - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty) mv.name() shouldBe "flint_spark_catalog_default_mv" } test("should fail if get name with unqualified MV name") { the[IllegalArgumentException] thrownBy - FlintSparkMaterializedView("mv", testQuery, Map.empty).name() + FlintSparkMaterializedView(None, "mv", testQuery, Map.empty).name() the[IllegalArgumentException] thrownBy - FlintSparkMaterializedView("default.mv", testQuery, Map.empty).name() + FlintSparkMaterializedView(None, "default.mv", testQuery, Map.empty).name() } test("get metadata") { - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map("test_col" -> "integer")) + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map("test_col" -> "integer")) val metadata = mv.metadata() metadata.name shouldBe mv.mvName @@ -64,7 +64,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { val indexSettings = """{"number_of_shards": 2}""" val indexOptions = FlintSparkIndexOptions(Map("auto_refresh" -> "true", "index_settings" -> indexSettings)) - val mv = FlintSparkMaterializedView( + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map("test_col" -> "integer"), @@ -77,12 +77,12 @@ class FlintSparkMaterializedViewSuite extends FlintSuite 
{ } test("build batch data frame") { - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty) mv.build(spark, None).collect() shouldBe Array(Row(1)) } test("should fail if build given other source data frame") { - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty) the[IllegalArgumentException] thrownBy mv.build(spark, Some(mock[DataFrame])) } @@ -100,7 +100,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { | GROUP BY TUMBLE(time, '1 Minute') |""".stripMargin - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty) val actualPlan = mv.buildStream(spark).queryExecution.logical assert( actualPlan.sameSemantics( @@ -127,7 +127,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { | GROUP BY TUMBLE(time, '1 Minute') |""".stripMargin - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty) val actualPlan = mv.buildStream(spark).queryExecution.logical assert( actualPlan.sameSemantics( @@ -145,7 +145,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { withTable(testTable) { sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV") - val mv = FlintSparkMaterializedView( + val mv = FlintSparkMaterializedView(None, testMvName, s"SELECT name, age FROM $testTable WHERE age > 30", Map.empty) @@ -164,7 +164,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { withTable(testTable) { sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV") - val mv = FlintSparkMaterializedView( + val mv = FlintSparkMaterializedView(None, testMvName, s"SELECT name, COUNT(*) AS count FROM $testTable GROUP BY name", Map.empty) diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index d52c43842..12dc81c34 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -345,7 +345,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { } private def schemaShouldMatch(metadata: FlintMetadata, expected: String): Unit = { - val actual = parse(metadata.getContent) \ "properties" + val actual = parse(metadata.getContent()) \ "properties" assert(actual == parse(expected)) } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 5c799128c..779e025f6 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -46,7 +46,7 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M |""".stripMargin val metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn(content) + when(metadata.getContent()).thenReturn(content) when(metadata.indexSettings).thenReturn(None) flintClient.createIndex(indexName, metadata) @@ -58,7 
+58,7 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M val indexName = "flint_test_with_settings" val indexSettings = "{\"number_of_shards\": 3,\"number_of_replicas\": 2}" val metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn("{}") + when(metadata.getContent()).thenReturn("{}") when(metadata.indexSettings).thenReturn(Some(indexSettings)) flintClient.createIndex(indexName, metadata) @@ -73,14 +73,14 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M it should "get all index metadata with the given index name pattern" in { val metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn("{}") + when(metadata.getContent()).thenReturn("{}") when(metadata.indexSettings).thenReturn(None) flintClient.createIndex("flint_test_1_index", metadata) flintClient.createIndex("flint_test_2_index", metadata) val allMetadata = flintClient.getAllIndexMetadata("flint_*_index") allMetadata should have size 2 - allMetadata.forEach(metadata => metadata.getContent should not be empty) + allMetadata.forEach(metadata => metadata.getContent() should not be empty) allMetadata.forEach(metadata => metadata.indexSettings should not be empty) } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index a4b0069dd..cbb71a98f 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -44,7 +44,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testFlintIndex) index shouldBe defined - index.get.metadata().getContent should matchJson(s"""{ + index.get.metadata().getContent() should matchJson(s"""{ | "_meta": { | "version": "${current()}", | "name": "name_and_age", diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 81b3819d9..e7fb7a1f6 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -22,6 +22,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { private val testTable = "spark_catalog.default.mv_test" private val testMvName = "spark_catalog.default.mv_test_metrics" private val testFlintIndex = getFlintIndexName(testMvName) + private val testTargetIndex = "existing_index" private val testQuery = s""" | SELECT @@ -53,7 +54,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { val index = flint.describeIndex(testFlintIndex) index shouldBe defined - index.get.metadata().getContent should matchJson(s""" + index.get.metadata().getContent() should matchJson(s""" | { | "_meta": { | "version": "${current()}", @@ -87,12 +88,12 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { |""".stripMargin) } - test("create materialized view using existing OpebnSearch index successfully") { + test("create materialized view using existing OpenSearch index successfully") { val indexOptions = FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/")) flint .materializedView() - .targetName("existing_index") + 
.targetName(testTargetIndex) .name(testMvName) .query(testQuery) .options(indexOptions) @@ -100,12 +101,13 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { val index = flint.describeIndex("existing_index") index shouldBe defined - index.get.metadata().getContent should matchJson(s""" + index.get.metadata().getContent() should matchJson(s""" | { | "_meta": { | "version": "${current()}", | "name": "spark_catalog.default.mv_test_metrics", | "kind": "mv", + | "targetName": "$testTargetIndex", | "source": "$testQuery", | "indexedColumns": [ | { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index e3fb467e6..efcec8ee6 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -52,7 +52,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testIndex) index shouldBe defined - index.get.metadata().getContent should matchJson(s"""{ + index.get.metadata().getContent() should matchJson(s"""{ | "_meta": { | "name": "flint_spark_catalog_default_test_skipping_index", | "version": "${current()}", @@ -123,7 +123,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testIndex) index shouldBe defined val optionJson = compact(render( - parse(index.get.metadata().getContent) \ "_meta" \ "options")) + parse(index.get.metadata().getContent()) \ "_meta" \ "options")) optionJson should matchJson(""" | { | "auto_refresh": "true", @@ -449,7 +449,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testIndex) index shouldBe defined - index.get.metadata().getContent should matchJson( + index.get.metadata().getContent() should matchJson( s"""{ | "_meta": { | "name": "flint_spark_catalog_default_data_type_table_skipping_index", From bd9a33bc5153943443b3a6c5bdc8cd9d269712ed Mon Sep 17 00:00:00 2001 From: YANGDB Date: Wed, 18 Oct 2023 17:36:50 -0700 Subject: [PATCH 04/13] update sql USING query test Signed-off-by: YANGDB --- .../core/storage/FlintOpenSearchClient.java | 65 ++++++++----- .../FlintSparkCoveringIndexSqlITSuite.scala | 28 +++++- .../FlintSparkMaterializedViewITSuite.scala | 97 ++++++++++--------- 3 files changed, 116 insertions(+), 74 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 59395137d..be8a4728b 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -18,7 +18,9 @@ import java.util.Arrays; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -39,6 +41,7 @@ import org.opensearch.client.indices.CreateIndexResponse; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.client.indices.GetIndexResponse; +import org.opensearch.cluster.metadata.AliasMetadata; import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.common.Strings; import 
org.opensearch.common.settings.Settings;
@@ -85,19 +88,18 @@ public void alias(String indexName, String aliasName, FlintMetadata metadata) {
      if (!exists) {
        // create index including the alias name which is the flint convention name
        createIndex(osIndexName, metadata);
-      } else {
-        // Adding the alias to the existing index
-        IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest();
-        IndicesAliasesRequest.AliasActions aliasAction =
-            new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
-                .index(osIndexName)
-                .alias(osAliasName);
-        aliasesRequest.addAliasAction(aliasAction);
-        // Executing the updateAliases request
-        AcknowledgedResponse response = client.indices().updateAliases(aliasesRequest, RequestOptions.DEFAULT);
-        if (!response.isAcknowledged()) {
-          throw new IllegalStateException(String.format("Failed to acknowledge Alias %s for index %s", aliasName, indexName));
-        }
+      }
+      // Adding the alias to the existing index
+      IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest();
+      IndicesAliasesRequest.AliasActions aliasAction =
+          new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
+              .index(osIndexName)
+              .alias(osAliasName);
+      aliasesRequest.addAliasAction(aliasAction);
+      // Executing the updateAliases request
+      AcknowledgedResponse response = client.indices().updateAliases(aliasesRequest, RequestOptions.DEFAULT);
+      if (!response.isAcknowledged()) {
+        throw new IllegalStateException(String.format("Failed to acknowledge Alias %s for index %s", aliasName, indexName));
      }
    } catch (
        Exception e) {
@@ -110,17 +112,16 @@ public void createIndex(String indexName, FlintMetadata metadata) {
    String osIndexName = toLowercase(indexName);
    try (RestHighLevelClient client = createClient()) {
      CreateIndexRequest request = new CreateIndexRequest(osIndexName);
-      boolean includeMappingProperties = true;
-      if(metadata.targetName().nonEmpty()) {
-        request.alias(new Alias(toLowercase(metadata.name())));
-        includeMappingProperties = false;
-      }
+      boolean includeMappingProperties = !metadata.targetName().nonEmpty();
      request.mapping(metadata.getContent(includeMappingProperties), XContentType.JSON);
      Option<String> settings = metadata.indexSettings();
      if (settings.isDefined()) {
        request.settings(settings.get(), XContentType.JSON);
      }
-      client.indices().create(request, RequestOptions.DEFAULT);
+      CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
+      if (!response.isAcknowledged()) {
+        throw new IllegalStateException(String.format("Failed to acknowledge create index %s", indexName));
+      }
    } catch (Exception e) {
      throw new IllegalStateException("Failed to create Flint index " + osIndexName, e);
    }
@@ -159,10 +160,24 @@ public FlintMetadata getIndexMetadata(String indexName) {
    try (RestHighLevelClient client = createClient()) {
      GetIndexRequest request = new GetIndexRequest(osIndexName);
      GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
+      if (response.getMappings().containsKey(osIndexName)) {
+        MappingMetadata mapping = response.getMappings().get(osIndexName);
+        Settings settings = response.getSettings().get(osIndexName);
+        return FlintMetadata.apply(mapping.source().string(), settings.toString());
+      }
+      if (!response.getAliases().isEmpty()) {
+        Optional<String> aliasAncestor = response.getAliases().entrySet().stream()
+            .filter(entry -> entry.getValue().stream().anyMatch(alias -> alias.alias().equals(indexName)))
+            .map(Map.Entry::getKey)
+            .findFirst();

-      MappingMetadata mapping = 
response.getMappings().get(osIndexName); - Settings settings = response.getSettings().get(osIndexName); - return FlintMetadata.apply(mapping.source().string(), settings.toString()); + if(aliasAncestor.isPresent()) { + MappingMetadata mapping = response.getMappings().get(aliasAncestor.get()); + Settings settings = response.getSettings().get(aliasAncestor.get()); + return FlintMetadata.apply(mapping.source().string(), settings.toString()); + } + } + throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName); } catch (Exception e) { throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e); } @@ -173,8 +188,10 @@ public void deleteIndex(String indexName) { String osIndexName = toLowercase(indexName); try (RestHighLevelClient client = createClient()) { DeleteIndexRequest request = new DeleteIndexRequest(osIndexName); - - client.indices().delete(request, RequestOptions.DEFAULT); + AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT); + if (!response.isAcknowledged()) { + throw new IllegalStateException(String.format("Failed to acknowledge delete index %s", indexName)); + } } catch (Exception e) { throw new IllegalStateException("Failed to delete Flint index " + osIndexName, e); } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 627e11f52..90b561398 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -25,6 +25,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { /** Test table and index name */ private val testTable = "spark_catalog.default.covering_sql_test" private val testIndex = "name_and_age" + private val targetIndex = "target_index" private val testFlintIndex = getFlintIndexName(testIndex, testTable) override def beforeAll(): Unit = { @@ -32,7 +33,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { createPartitionedTable(testTable) } - + override def afterEach(): Unit = { super.afterEach() @@ -93,7 +94,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { (settings \ "index.number_of_shards").extract[String] shouldBe "2" (settings \ "index.number_of_replicas").extract[String] shouldBe "3" } - + test("create covering index with invalid option") { the[IllegalArgumentException] thrownBy sql(s""" @@ -232,4 +233,27 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { flint.describeIndex(testFlintIndex) shouldBe empty } + + test("use existing index as the covering index") { + sql( + s""" + | CREATE INDEX $testIndex ON $testTable USING $targetIndex ( name ) + | WITH ( + | index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}' + | ) + |""".stripMargin) + + // Check if the index setting option is set to OS index setting + val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) + + implicit val formats: Formats = Serialization.formats(NoTypeHints) + var settings = parse(flintClient.getIndexMetadata(targetIndex).indexSettings.get) + (settings \ "index.number_of_shards").extract[String] shouldBe "2" + (settings \ "index.number_of_replicas").extract[String] shouldBe "3" + //validate the index alias is working + settings = 
parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings.get)
+    (settings \ "index.number_of_shards").extract[String] shouldBe "2"
+    (settings \ "index.number_of_replicas").extract[String] shouldBe "3"
+  }
+
 }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
index e7fb7a1f6..27672bb7e 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
@@ -88,54 +88,6 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
        |""".stripMargin)
   }

-  test("create materialized view using existing OpenSearch index successfully") {
-    val indexOptions =
-      FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/"))
-    flint
-      .materializedView()
-      .targetName(testTargetIndex)
-      .name(testMvName)
-      .query(testQuery)
-      .options(indexOptions)
-      .create()
-
-    val index = flint.describeIndex("existing_index")
-    index shouldBe defined
-    index.get.metadata().getContent() should matchJson(s"""
-       | {
-       |   "_meta": {
-       |     "version": "${current()}",
-       |     "name": "spark_catalog.default.mv_test_metrics",
-       |     "kind": "mv",
-       |     "targetName": "$testTargetIndex",
-       |     "source": "$testQuery",
-       |     "indexedColumns": [
-       |     {
-       |        "columnName": "startTime",
-       |        "columnType": "timestamp"
-       |     },{
-       |        "columnName": "count",
-       |        "columnType": "long"
-       |     }],
-       |     "options": {
-       |       "auto_refresh": "true",
-       |       "checkpoint_location": "s3://test/"
-       |     },
-       |     "properties": {}
-       |   },
-       |   "properties": {
-       |     "startTime": {
-       |       "type": "date",
-       |       "format": "strict_date_optional_time_nanos"
-       |     },
-       |     "count": {
-       |       "type": "long"
-       |     }
-       |   }
-       | }
-       |""".stripMargin)
-  }
-
   // TODO: fix this windowing function unable to be used in GROUP BY
   ignore("full refresh materialized view") {
     flint
@@ -233,6 +185,55 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
     }
   }

+  test("use existing OpenSearch index for materialized view successfully") {
+    val indexOptions =
+      FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/"))
+    flint
+      .materializedView()
+      .targetName(testTargetIndex)
+      .name(testMvName)
+      .query(testQuery)
+      .options(indexOptions)
+      .create()
+
+    val index = flint.describeIndex("existing_index")
+    index shouldBe defined
+    index.get.metadata().getContent() should matchJson(
+      s"""
+       | {
+       |   "_meta": {
+       |     "version": "${current()}",
+       |     "name": "spark_catalog.default.mv_test_metrics",
+       |     "kind": "mv",
+       |     "targetName": "$testTargetIndex",
+       |     "source": "$testQuery",
+       |     "indexedColumns": [
+       |     {
+       |        "columnName": "startTime",
+       |        "columnType": "timestamp"
+       |     },{
+       |        "columnName": "count",
+       |        "columnType": "long"
+       |     }],
+       |     "options": {
+       |       "auto_refresh": "true",
+       |       "checkpoint_location": "s3://test/"
+       |     },
+       |     "properties": {}
+       |   },
+       |   "properties": {
+       |     "startTime": {
+       |       "type": "date",
+       |       "format": "strict_date_optional_time_nanos"
+       |     },
+       |     "count": {
+       |       "type": "long"
+       |     }
+       |   }
+       | }
+       |""".stripMargin)
+  }
+
   private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)

   private def withIncrementalMaterializedView(query: String)(

From bc320bd0130407b67d8e3971963cf764861bd91c Mon Sep 17 00:00:00 2001
From: YANGDB
Date: Wed, 18 Oct 2023 17:47:00 -0700
Subject: [PATCH 05/13] update using scalafmtAll

Signed-off-by: 
YANGDB --- .../FlintSparkCoveringIndexSqlITSuite.scala | 9 +++--- .../FlintSparkMaterializedViewITSuite.scala | 5 ++-- .../FlintSparkSkippingIndexITSuite.scala | 29 +++++++------------ 3 files changed, 17 insertions(+), 26 deletions(-) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 90b561398..d6cfc80c1 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -33,7 +33,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { createPartitionedTable(testTable) } - + override def afterEach(): Unit = { super.afterEach() @@ -94,7 +94,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { (settings \ "index.number_of_shards").extract[String] shouldBe "2" (settings \ "index.number_of_replicas").extract[String] shouldBe "3" } - + test("create covering index with invalid option") { the[IllegalArgumentException] thrownBy sql(s""" @@ -235,8 +235,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { } test("use existing index as the covering index") { - sql( - s""" + sql(s""" | CREATE INDEX $testIndex ON $testTable USING $targetIndex ( name ) | WITH ( | index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}' @@ -250,7 +249,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { var settings = parse(flintClient.getIndexMetadata(targetIndex).indexSettings.get) (settings \ "index.number_of_shards").extract[String] shouldBe "2" (settings \ "index.number_of_replicas").extract[String] shouldBe "3" - //validate the index alias is working + // validate the index alias is working settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings.get) (settings \ "index.number_of_shards").extract[String] shouldBe "2" (settings \ "index.number_of_replicas").extract[String] shouldBe "3" diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 27672bb7e..e659e223b 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -87,7 +87,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | } |""".stripMargin) } - + // TODO: fix this windowing function unable to be used in GROUP BY ignore("full refresh materialized view") { flint @@ -198,8 +198,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { val index = flint.describeIndex("existing_index") index shouldBe defined - index.get.metadata().getContent() should matchJson( - s""" + index.get.metadata().getContent() should matchJson(s""" | { | "_meta": { | "version": "${current()}", diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index efcec8ee6..40de6e223 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -122,8 +122,8 @@ class FlintSparkSkippingIndexITSuite 
extends FlintSparkSuite { val index = flint.describeIndex(testIndex) index shouldBe defined - val optionJson = compact(render( - parse(index.get.metadata().getContent()) \ "_meta" \ "options")) + val optionJson = + compact(render(parse(index.get.metadata().getContent()) \ "_meta" \ "options")) optionJson should matchJson(""" | { | "auto_refresh": "true", @@ -321,8 +321,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { |""".stripMargin) query.queryExecution.executedPlan should - useFlintSparkSkippingFileIndex( - hasIndexFilter(col("year") === 2023)) + useFlintSparkSkippingFileIndex(hasIndexFilter(col("year") === 2023)) } test("should not rewrite original query if filtering condition has disjunction") { @@ -388,8 +387,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { // Prepare test table val testTable = "spark_catalog.default.data_type_table" val testIndex = getSkippingIndexName(testTable) - sql( - s""" + sql(s""" | CREATE TABLE $testTable | ( | boolean_col BOOLEAN, @@ -408,8 +406,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | ) | USING PARQUET |""".stripMargin) - sql( - s""" + sql(s""" | INSERT INTO $testTable | VALUES ( | TRUE, @@ -449,8 +446,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testIndex) index shouldBe defined - index.get.metadata().getContent() should matchJson( - s"""{ + index.get.metadata().getContent() should matchJson(s"""{ | "_meta": { | "name": "flint_spark_catalog_default_data_type_table_skipping_index", | "version": "${current()}", @@ -587,8 +583,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { test("can build skipping index for varchar and char and rewrite applicable query") { val testTable = "spark_catalog.default.varchar_char_table" val testIndex = getSkippingIndexName(testTable) - sql( - s""" + sql(s""" | CREATE TABLE $testTable | ( | varchar_col VARCHAR(20), @@ -596,8 +591,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | ) | USING PARQUET |""".stripMargin) - sql( - s""" + sql(s""" | INSERT INTO $testTable | VALUES ( | "sample varchar", @@ -613,8 +607,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .create() flint.refreshIndex(testIndex, FULL) - val query = sql( - s""" + val query = sql(s""" | SELECT varchar_col, char_col | FROM $testTable | WHERE varchar_col = "sample varchar" AND char_col = "sample char" @@ -624,8 +617,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val paddedChar = "sample char".padTo(20, ' ') checkAnswer(query, Row("sample varchar", paddedChar)) query.queryExecution.executedPlan should - useFlintSparkSkippingFileIndex(hasIndexFilter( - col("varchar_col") === "sample varchar" && col("char_col") === paddedChar)) + useFlintSparkSkippingFileIndex( + hasIndexFilter(col("varchar_col") === "sample varchar" && col("char_col") === paddedChar)) flint.deleteIndex(testIndex) } From 904a86157284a7ad7ae212868b3713b228e034f9 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Wed, 18 Oct 2023 17:54:44 -0700 Subject: [PATCH 06/13] update using scalafmtAll Signed-off-by: YANGDB --- .../flint/spark/FlintSparkCoveringIndexSqlITSuite.scala | 2 -- .../flint/spark/FlintSparkMaterializedViewITSuite.scala | 1 - 2 files changed, 3 deletions(-) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index d6cfc80c1..288b78308 100644 --- 
a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -33,7 +33,6 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { createPartitionedTable(testTable) } - override def afterEach(): Unit = { super.afterEach() @@ -94,7 +93,6 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { (settings \ "index.number_of_shards").extract[String] shouldBe "2" (settings \ "index.number_of_replicas").extract[String] shouldBe "3" } - test("create covering index with invalid option") { the[IllegalArgumentException] thrownBy sql(s""" diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index e659e223b..902dcb00c 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -87,7 +87,6 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | } |""".stripMargin) } - // TODO: fix this windowing function unable to be used in GROUP BY ignore("full refresh materialized view") { flint From 6dac21b881af258a4ae5a9c8e6e6301fa4406af7 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Wed, 18 Oct 2023 18:08:17 -0700 Subject: [PATCH 07/13] update test with removal of test indices Signed-off-by: YANGDB --- .../flint/spark/FlintSparkCoveringIndexSqlITSuite.scala | 3 +++ .../flint/spark/FlintSparkMaterializedViewITSuite.scala | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 288b78308..7e8481464 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -251,6 +251,9 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings.get) (settings \ "index.number_of_shards").extract[String] shouldBe "2" (settings \ "index.number_of_replicas").extract[String] shouldBe "3" + + //remove test index + flint.deleteIndex(targetIndex) } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 902dcb00c..24fbb62ec 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -195,7 +195,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { .options(indexOptions) .create() - val index = flint.describeIndex("existing_index") + val index = flint.describeIndex(testTargetIndex) index shouldBe defined index.get.metadata().getContent() should matchJson(s""" | { @@ -230,6 +230,8 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | } | } |""".stripMargin) + //remove test index + flint.deleteIndex(testTargetIndex) } private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts) From 
b3e2eb50ca337d67bd57f23bb4e7e3aae32a5594 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Wed, 18 Oct 2023 19:57:56 -0700 Subject: [PATCH 08/13] fix client tests mocks & style Signed-off-by: YANGDB --- .../opensearch/flint/core/FlintOpenSearchClientSuite.scala | 6 ++++-- .../flint/spark/FlintSparkCoveringIndexSqlITSuite.scala | 2 +- .../flint/spark/FlintSparkMaterializedViewITSuite.scala | 2 +- .../flint/spark/FlintSparkSkippingIndexITSuite.scala | 3 ++- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 779e025f6..ab683980c 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -10,6 +10,7 @@ import scala.collection.JavaConverters._ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization +import org.mockito.ArgumentMatchers.anyBoolean import org.mockito.Mockito.when import org.opensearch.client.json.jackson.JacksonJsonpMapper import org.opensearch.client.opensearch.OpenSearchClient @@ -46,8 +47,9 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M |""".stripMargin val metadata = mock[FlintMetadata] - when(metadata.getContent()).thenReturn(content) + when(metadata.getContent(anyBoolean())).thenReturn(content) when(metadata.indexSettings).thenReturn(None) + when(metadata.targetName).thenReturn(None) flintClient.createIndex(indexName, metadata) flintClient.exists(indexName) shouldBe true @@ -58,7 +60,7 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M val indexName = "flint_test_with_settings" val indexSettings = "{\"number_of_shards\": 3,\"number_of_replicas\": 2}" val metadata = mock[FlintMetadata] - when(metadata.getContent()).thenReturn("{}") + when(metadata.getContent(anyBoolean())).thenReturn("{}") when(metadata.indexSettings).thenReturn(Some(indexSettings)) flintClient.createIndex(indexName, metadata) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 7e8481464..d3774eb37 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -252,7 +252,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { (settings \ "index.number_of_shards").extract[String] shouldBe "2" (settings \ "index.number_of_replicas").extract[String] shouldBe "3" - //remove test index + // remove test index flint.deleteIndex(targetIndex) } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 24fbb62ec..1377c2384 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -230,7 +230,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | } | } |""".stripMargin) - //remove test index + // remove test index flint.deleteIndex(testTargetIndex) } diff 
--git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 40de6e223..bb6282fb4 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -7,6 +7,7 @@ package org.opensearch.flint.spark import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.json4s.native.JsonMethods._ +import org.mockito.ArgumentMatchers.anyBoolean import org.opensearch.flint.core.FlintVersion.current import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL} import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN @@ -446,7 +447,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testIndex) index shouldBe defined - index.get.metadata().getContent() should matchJson(s"""{ + index.get.metadata().getContent(anyBoolean()) should matchJson(s"""{ | "_meta": { | "name": "flint_spark_catalog_default_data_type_table_skipping_index", | "version": "${current()}", From afd59b741655b8945e2bd54a49ed06d5afd756c8 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Wed, 18 Oct 2023 20:17:05 -0700 Subject: [PATCH 09/13] fix client tests mocks & style Signed-off-by: YANGDB --- .../opensearch/flint/core/FlintOpenSearchClientSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index ab683980c..1f1e72250 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -61,6 +61,7 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M val indexSettings = "{\"number_of_shards\": 3,\"number_of_replicas\": 2}" val metadata = mock[FlintMetadata] when(metadata.getContent(anyBoolean())).thenReturn("{}") + when(metadata.targetName).thenReturn(None) when(metadata.indexSettings).thenReturn(Some(indexSettings)) flintClient.createIndex(indexName, metadata) @@ -75,8 +76,9 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M it should "get all index metadata with the given index name pattern" in { val metadata = mock[FlintMetadata] - when(metadata.getContent()).thenReturn("{}") + when(metadata.getContent(anyBoolean())).thenReturn("{}") when(metadata.indexSettings).thenReturn(None) + when(metadata.targetName).thenReturn(None) flintClient.createIndex("flint_test_1_index", metadata) flintClient.createIndex("flint_test_2_index", metadata) From 451fe8e77620e719498b0dff9e83264b6ae54f83 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Wed, 18 Oct 2023 21:51:18 -0700 Subject: [PATCH 10/13] fix client tests mocks & style Signed-off-by: YANGDB --- .../opensearch/flint/core/storage/FlintOpenSearchClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index be8a4728b..8830a0178 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ 
b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -112,7 +112,7 @@ public void createIndex(String indexName, FlintMetadata metadata) { String osIndexName = toLowercase(indexName); try (RestHighLevelClient client = createClient()) { CreateIndexRequest request = new CreateIndexRequest(osIndexName); - boolean includeMappingProperties = !metadata.targetName().nonEmpty(); + boolean includeMappingProperties = (metadata.targetName()!=null && !metadata.targetName().nonEmpty()); request.mapping(metadata.getContent(includeMappingProperties), XContentType.JSON); Option settings = metadata.indexSettings(); if (settings.isDefined()) { From baecbe81fa384c166c443de73d5c2336e7846c3a Mon Sep 17 00:00:00 2001 From: YANGDB Date: Thu, 19 Oct 2023 09:41:39 -0700 Subject: [PATCH 11/13] fix redundant client tests mocks Signed-off-by: YANGDB --- .../opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index bb6282fb4..b29ab0e2c 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -447,7 +447,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testIndex) index shouldBe defined - index.get.metadata().getContent(anyBoolean()) should matchJson(s"""{ + index.get.metadata().getContent() should matchJson(s"""{ | "_meta": { | "name": "flint_spark_catalog_default_data_type_table_skipping_index", | "version": "${current()}", From 34b50b3953ba39ea9e2e5e6f12117c18da3495ce Mon Sep 17 00:00:00 2001 From: YANGDB Date: Thu, 19 Oct 2023 10:05:31 -0700 Subject: [PATCH 12/13] update according to scalastyle Signed-off-by: YANGDB --- .../scala/org/opensearch/flint/spark/FlintSpark.scala | 2 +- .../org/opensearch/flint/spark/FlintSparkIndex.scala | 2 +- .../flint/spark/covering/FlintSparkCoveringIndex.scala | 10 ++++++++-- .../flint/spark/mv/FlintSparkMaterializedView.scala | 7 ++++--- .../flint/spark/skipping/FlintSparkSkippingIndex.scala | 2 +- .../spark/mv/FlintSparkMaterializedViewSuite.scala | 9 ++++++--- 6 files changed, 21 insertions(+), 11 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index a1016a2a5..fad928bf7 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -84,7 +84,7 @@ class FlintSpark(val spark: SparkSession) { val indexName = index.name() val targetName = index.targetName() if (targetName.nonEmpty) { - //use targetIndex as the index to store the acceleration data + // use targetIndex as the index to store the acceleration data flintClient.alias(targetName.get, indexName, index.metadata()) } else { if (flintClient.exists(indexName)) { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index d7cdfb996..32de8f42a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ 
b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
@@ -36,7 +36,7 @@ trait FlintSparkIndex {

   /**
    * @return
-   * Flint target index name
+   *   Flint target index name
    */
   def targetName(): Option[String]

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index 7ed63713d..b368de0ec 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -43,7 +43,8 @@
   /**
    * @return
-   * Flint target index name - index that already exists or has an existing template to be created with
+   *   Flint target index name - index that already exists or has an existing template to be created
+   *   with
    */
   override def targetName(): Option[String] = {
     targetIndexName
@@ -164,6 +165,11 @@
     }

     override protected def buildIndex(): FlintSparkIndex =
-      new FlintSparkCoveringIndex(targetIndexName, indexName, tableName, indexedColumns, indexOptions)
+      new FlintSparkCoveringIndex(
+        targetIndexName,
+        indexName,
+        tableName,
+        indexedColumns,
+        indexOptions)
   }
 }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
index a449f0bc7..a08c627bd 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
@@ -54,7 +54,8 @@
   /**
    * @return
-   * Flint target index name - index that already exists or has an existing template to be created with
+   *   Flint target index name - index that already exists or has an existing template to be created
+   *   with
    */
   override def targetName(): Option[String] = {
     targetIndexName
@@ -168,9 +169,9 @@ object FlintSparkMaterializedView {
     * Set materialized view target name. 
*
     * @param indexName
-    * index name
+    *   index name
     * @return
-    * index builder
+    *   index builder
     */
    def targetName(indexName: String): Builder = {
      this.targetIndexName = Option.apply(indexName)
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
index cf6f61a77..4144bd013 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
@@ -41,7 +41,7 @@ case class FlintSparkSkippingIndex(

   /**
    * @return
-   * Flint target index name (in the skipping index case, using existing indices is not allowed)
+   *   Flint target index name (in the skipping index case, using existing indices is not allowed)
    */
   def targetName(): Option[String] = {
     None
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
index 08b33343c..13d181e5e 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
@@ -64,7 +64,8 @@
     val indexSettings = """{"number_of_shards": 2}"""
     val indexOptions =
       FlintSparkIndexOptions(Map("auto_refresh" -> "true", "index_settings" -> indexSettings))
-    val mv = FlintSparkMaterializedView(None,
+    val mv = FlintSparkMaterializedView(
+      None,
       testMvName,
       testQuery,
       Map("test_col" -> "integer"),
@@ -145,7 +146,8 @@
     withTable(testTable) {
       sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV")

-      val mv = FlintSparkMaterializedView(None,
+      val mv = FlintSparkMaterializedView(
+        None,
         testMvName,
         s"SELECT name, age FROM $testTable WHERE age > 30",
         Map.empty)
@@ -164,7 +166,8 @@
     withTable(testTable) {
       sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV")

-      val mv = FlintSparkMaterializedView(None,
+      val mv = FlintSparkMaterializedView(
+        None,
         testMvName,
         s"SELECT name, COUNT(*) AS count FROM $testTable GROUP BY name",
         Map.empty)

From 1593c9e23ad6c89c610d01bd8b0bd0d6ab9219e3 Mon Sep 17 00:00:00 2001
From: YANGDB
Date: Thu, 19 Oct 2023 10:15:59 -0700
Subject: [PATCH 13/13] update according to scalastyle

Signed-off-by: YANGDB
---
 .../flint/spark/covering/FlintSparkCoveringIndexSuite.scala  | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
index cc17d5190..71e542183 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
@@ -13,7 +13,11 @@ class FlintSparkCoveringIndexSuite extends FlintSuite {

   test("get covering index name") {
     val index =
-      new FlintSparkCoveringIndex(None, "ci", 
"spark_catalog.default.test", Map("name" -> "string")) + new FlintSparkCoveringIndex( + None, + "ci", + "spark_catalog.default.test", + Map("name" -> "string")) index.name() shouldBe "flint_spark_catalog_default_test_ci_index" }