From 79936aef36a2cc46de27cb0f999327964fd10375 Mon Sep 17 00:00:00 2001
From: YANGDB
Date: Wed, 18 Oct 2023 12:49:06 -0700
Subject: [PATCH] add USING statement for covering / MV indices for using existing index / index template in opensearch

Signed-off-by: YANGDB
---
 .../flint/core/metadata/FlintMetadata.scala   | 12 ++++++++-
 .../main/antlr4/FlintSparkSqlExtensions.g4    |  2 +-
 .../src/main/antlr4/SparkSqlBase.g4           |  1 +
 .../flint/spark/FlintSparkIndex.scala         |  6 +++++
 .../flint/spark/FlintSparkIndexFactory.scala  |  2 ++
 .../covering/FlintSparkCoveringIndex.scala    | 27 ++++++++++++++++++-
 .../spark/mv/FlintSparkMaterializedView.scala | 25 ++++++++++++++++-
 .../skipping/FlintSparkSkippingIndex.scala    |  8 ++++++
 .../FlintSparkCoveringIndexAstBuilder.scala   |  8 ++++--
 9 files changed, 85 insertions(+), 6 deletions(-)

diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala
index ea0fb0f98..98b9dbfec 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala
@@ -18,6 +18,8 @@ import org.opensearch.flint.core.metadata.FlintJsonHelper._
 case class FlintMetadata(
     /** Flint spec version */
     version: FlintVersion,
+    /** Flint index target name */
+    targetName: Option[String],
     /** Flint index name */
     name: String,
     /** Flint index kind */
@@ -53,11 +55,11 @@ case class FlintMetadata(
     objectField(builder, "_meta") {
       builder
         .field("version", version.version)
+        .field("targetName", targetName.getOrElse(name) )
         .field("name", name)
         .field("kind", kind)
         .field("source", source)
         .field("indexedColumns", indexedColumns)
-
       optionalObjectField(builder, "options", options)
       optionalObjectField(builder, "properties", properties)
     }
@@ -109,6 +111,7 @@ object FlintMetadata {
           innerFieldName match {
             case "version" =>
               builder.version(FlintVersion.apply(parser.text()))
             case "name" => builder.name(parser.text())
+            case "targetName" => builder.targetName(parser.text())
             case "kind" => builder.kind(parser.text())
             case "source" => builder.source(parser.text())
             case "indexedColumns" =>
@@ -142,6 +145,7 @@ object FlintMetadata {
   class Builder {
     private var version: FlintVersion = FlintVersion.current()
     private var name: String = ""
+    private var targetName: Option[String] = None
     private var kind: String = ""
     private var source: String = ""
     private var options: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]()
@@ -160,6 +164,11 @@ object FlintMetadata {
       this
     }
 
+    def targetName(name: String): this.type = {
+      this.targetName = Option(name)
+      this
+    }
+
     def kind(kind: String): this.type = {
       this.kind = kind
       this
@@ -219,6 +228,7 @@ object FlintMetadata {
     def build(): FlintMetadata = {
       FlintMetadata(
         if (version == null) current() else version,
+        targetName,
         name,
         kind,
         source,
diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
index e8e0264f2..c1bc84765 100644
--- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
+++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -55,7 +55,7 @@ coveringIndexStatement
 createCoveringIndexStatement
     : CREATE INDEX (IF NOT EXISTS)? indexName
-        ON tableName
+        ON tableName (USING indexName)?
         LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN
         (WITH LEFT_PAREN propertyList RIGHT_PAREN)?
     ;
diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
index 4ac1ced5c..96a9d2596 100644
--- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
+++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -139,6 +139,7 @@ nonReserved
 
 // Flint lexical tokens
 
+USING: 'USING';
 MIN_MAX: 'MIN_MAX';
 SKIPPING: 'SKIPPING';
 VALUE_SET: 'VALUE_SET';
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
index 0586bfc49..2279ad421 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
@@ -34,6 +34,12 @@ trait FlintSparkIndex {
    */
   def name(): String
 
+  /**
+   * @return
+   *   Flint target index name
+   */
+  def targetName(): String
+
   /**
    * @return
    *   Flint index metadata
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala
index cda11405c..1f169d652 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala
@@ -60,6 +60,7 @@ object FlintSparkIndexFactory {
         FlintSparkSkippingIndex(metadata.source, strategies, indexOptions)
       case COVERING_INDEX_TYPE =>
         FlintSparkCoveringIndex(
+          metadata.targetName,
          metadata.name,
          metadata.source,
          metadata.indexedColumns.map { colInfo =>
@@ -68,6 +69,7 @@ object FlintSparkIndexFactory {
          indexOptions)
       case MV_INDEX_TYPE =>
         FlintSparkMaterializedView(
+          metadata.targetName,
          metadata.name,
          metadata.source,
          metadata.indexedColumns.map { colInfo =>
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index e9c2b5be5..f18fd3997 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -18,6 +18,8 @@ import org.apache.spark.sql._
 /**
  * Flint covering index in Spark.
  *
+ * @param targetIndexName
+ *   optional index target name
  * @param indexName
  *   index name
  * @param tableName
@@ -26,6 +28,7 @@ import org.apache.spark.sql._
  *   indexed column list
  */
 case class FlintSparkCoveringIndex(
+    targetIndexName: Option[String],
     indexName: String,
     tableName: String,
     indexedColumns: Map[String, String],
@@ -38,6 +41,14 @@ case class FlintSparkCoveringIndex(
 
   override def name(): String = getFlintIndexName(indexName, tableName)
 
+  /**
+   * @return
+   *   Flint target index name - index that already exist or has existing template to be created with
+   */
+  override def targetName(): String = {
+    targetIndexName.getOrElse(name())
+  }
+
   override def metadata(): FlintMetadata = {
     val indexColumnMaps = {
       indexedColumns.map { case (colName, colType) =>
@@ -93,6 +104,7 @@ object FlintSparkCoveringIndex {
 
   /** Builder class for covering index build */
   class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) {
+    private var targetIndexName: String = ""
     private var indexName: String = ""
     private var indexedColumns: Map[String, String] = Map()
 
@@ -109,6 +121,19 @@ object FlintSparkCoveringIndex {
       this
     }
 
+    /**
+     * Set covering index target name.
+     *
+     * @param indexName
+     *   index name
+     * @return
+     *   index builder
+     */
+    def targetName(indexName: String): Builder = {
+      this.targetIndexName = indexName
+      this
+    }
+
     /**
      * Configure which source table the index is based on.
      *
@@ -138,6 +163,6 @@ object FlintSparkCoveringIndex {
     }
 
     override protected def buildIndex(): FlintSparkIndex =
-      new FlintSparkCoveringIndex(indexName, tableName, indexedColumns, indexOptions)
+      new FlintSparkCoveringIndex(Option.apply(targetIndexName), indexName, tableName, indexedColumns, indexOptions)
   }
 }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
index ee58ec7f5..fbb985734 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.flint.{logicalPlanToDataFrame, qualifyTableName}
  *   index options
  */
 case class FlintSparkMaterializedView(
+    targetIndexName: Option[String],
     mvName: String,
     query: String,
     outputSchema: Map[String, String],
@@ -51,6 +52,14 @@ case class FlintSparkMaterializedView(
 
   override def name(): String = getFlintIndexName(mvName)
 
+  /**
+   * @return
+   *   Flint target index name - index that already exist or has existing template to be created with
+   */
+  override def targetName(): String = {
+    targetIndexName.getOrElse(name())
+  }
+
   override def metadata(): FlintMetadata = {
     val indexColumnMaps =
       outputSchema.map { case (colName, colType) =>
@@ -150,9 +159,23 @@ object FlintSparkMaterializedView {
 
   /** Builder class for MV build */
   class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) {
+    private var targetIndexName: String = ""
     private var mvName: String = ""
     private var query: String = ""
 
+    /**
+     * Set covering index target name.
+     *
+     * @param indexName
+     *   index name
+     * @return
+     *   index builder
+     */
+    def targetName(indexName: String): Builder = {
+      this.targetIndexName = indexName
+      this
+    }
+
     /**
      * Set MV name.
      *
@@ -188,7 +211,7 @@ object FlintSparkMaterializedView {
           field.name -> field.dataType.typeName
         }
         .toMap
-      FlintSparkMaterializedView(mvName, query, outputSchema, indexOptions)
+      FlintSparkMaterializedView(Option.apply(targetIndexName), mvName, query, outputSchema, indexOptions)
     }
   }
 }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
index eb2075b63..4ead3206b 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
@@ -39,6 +39,14 @@ case class FlintSparkSkippingIndex(
   /** Skipping index type */
   override val kind: String = SKIPPING_INDEX_TYPE
 
+  /**
+   * @return
+   *   Flint target index name ( in skipping index case not allowing using existing indices)
+   */
+  def targetName(): String = {
+    name()
+  }
+
   override def name(): String = {
     getSkippingIndexName(tableName)
   }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
index c0bb47830..eb3b7b2cb 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
@@ -27,7 +27,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
   override def visitCreateCoveringIndexStatement(
       ctx: CreateCoveringIndexStatementContext): Command = {
     FlintSparkSqlCommand() { flint =>
-      val indexName = ctx.indexName.getText
+      val indexName = ctx.indexName.get(0).getText
       val tableName = getFullTableName(flint, ctx.tableName)
 
       val indexBuilder = flint
@@ -41,6 +41,10 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
       }
 
       val ignoreIfExists = ctx.EXISTS() != null
+      if (ctx.USING() != null) {
+        indexBuilder.targetName(ctx.indexName().get(1).getText)
+      }
+
       val indexOptions = visitPropertyList(ctx.propertyList())
       indexBuilder
         .options(indexOptions)
@@ -48,7 +52,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
 
       // Trigger auto refresh if enabled
       if (indexOptions.autoRefresh()) {
-        val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
+        val flintIndexName = getFlintIndexName(flint, ctx.indexName.get(0), ctx.tableName)
         flint.refreshIndex(flintIndexName, RefreshMode.INCREMENTAL)
       }
       Seq.empty