diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 8058f9bff..3aceeadb5 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark
 import scala.collection.JavaConverters._
 
 import org.json4s.{Formats, JArray, NoTypeHints}
+import org.json4s.JsonAST.{JField, JObject}
 import org.json4s.native.JsonMethods.parse
 import org.json4s.native.Serialization
 import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
@@ -211,6 +212,14 @@
     val tableName = (meta \ "source").extract[String]
     val indexType = (meta \ "kind").extract[String]
     val indexedColumns = (meta \ "indexedColumns").asInstanceOf[JArray]
+    val indexOptions = FlintSparkIndexOptions(
+      (meta \ "options")
+        .asInstanceOf[JObject]
+        .obj
+        .map { case JField(key, value) =>
+          key -> value.values.toString
+        }
+        .toMap)
 
     indexType match {
       case SKIPPING_INDEX_TYPE =>
@@ -230,14 +239,15 @@
             throw new IllegalStateException(s"Unknown skipping strategy: $other")
          }
        }
-        new FlintSparkSkippingIndex(tableName, strategies)
+        new FlintSparkSkippingIndex(tableName, strategies, indexOptions)
       case COVERING_INDEX_TYPE =>
         new FlintSparkCoveringIndex(
           indexName,
           tableName,
           indexedColumns.arr.map { obj =>
             ((obj \ "columnName").extract[String], (obj \ "columnType").extract[String])
-          }.toMap)
+          }.toMap,
+          indexOptions)
     }
   }
 }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
index 62e6b4668..6c09d625c 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
@@ -19,6 +19,11 @@ trait FlintSparkIndex {
    */
   val kind: String
 
+  /**
+   * Index options
+   */
+  val options: FlintSparkIndexOptions
+
   /**
    * @return
    *   Flint index name
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
index 740c02e1d..7171bf1af 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
@@ -5,6 +5,8 @@
 
 package org.opensearch.flint.spark
 
+import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
+
 import org.apache.spark.sql.catalog.Column
 
 /**
@@ -18,6 +20,9 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
   /** Source table name */
   protected var tableName: String = ""
 
+  /** Index options */
+  protected var indexOptions: FlintSparkIndexOptions = empty
+
   /** All columns of the given source table */
   lazy protected val allColumns: Map[String, Column] = {
     require(tableName.nonEmpty, "Source table name is not provided")
@@ -29,6 +34,11 @@
       .toMap
   }
 
+  def options(options: FlintSparkIndexOptions): this.type = {
+    this.indexOptions = options
+    this
+  }
+
   /**
    * Create Flint index.
    */
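For context, the new `options(...)` setter on `FlintSparkIndexBuilder` is meant to be chained before `create()`. A minimal sketch of how a caller might use it; the `skippingIndex()`, `onTable`, and `addPartitions` builder methods are assumed from the existing API and are not part of this diff:

```scala
import org.apache.spark.sql.SparkSession
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndexOptions}

val spark = SparkSession.builder().master("local").getOrCreate()
val flint = new FlintSpark(spark)

// Only `options(...)` below is introduced by this change; the other
// builder calls and the table/path names are illustrative.
flint
  .skippingIndex()
  .onTable("default.alb_logs")
  .addPartitions("year", "month")
  .options(FlintSparkIndexOptions(Map(
    "auto_refresh" -> "true",
    "checkpoint_location" -> "s3://bucket/checkpoints/alb_logs")))
  .create()
```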
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
new file mode 100644
index 000000000..f348c12a0
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
@@ -0,0 +1,47 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark
+
+/**
+ * Flint Spark index configurable options.
+ *
+ * @param options
+ *   index option mappings
+ */
+case class FlintSparkIndexOptions(options: Map[String, String]) {
+
+  /**
+   * Whether the Flint index is auto refreshed or manually refreshed.
+   *
+   * @return
+   *   auto refresh option value
+   */
+  def autoRefresh(): Boolean = options.getOrElse("auto_refresh", "false").toBoolean
+
+  /**
+   * The refresh interval (only valid if auto refresh is enabled).
+   *
+   * @return
+   *   refresh interval expression
+   */
+  def refreshInterval(): Option[String] = options.get("refresh_interval")
+
+  /**
+   * The checkpoint location, which may be required by the Flint index refresh.
+   *
+   * @return
+   *   checkpoint location path
+   */
+  def checkpointLocation(): Option[String] = options.get("checkpoint_location")
+}
+
+object FlintSparkIndexOptions {
+
+  /**
+   * Empty options
+   */
+  val empty: FlintSparkIndexOptions = FlintSparkIndexOptions(Map.empty)
+}
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index f2f5933d6..96b7f39bf 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -10,8 +10,9 @@ import org.json4s.JsonAST.{JArray, JObject, JString}
 import org.json4s.native.JsonMethods.{compact, parse, render}
 import org.json4s.native.Serialization
 import org.opensearch.flint.core.metadata.FlintMetadata
-import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder}
+import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
 import org.opensearch.flint.spark.FlintSparkIndex.flintIndexNamePrefix
+import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}
 
 import org.apache.spark.sql.DataFrame
@@ -31,7 +32,8 @@ import org.apache.spark.sql.types.StructType
 class FlintSparkCoveringIndex(
     indexName: String,
     tableName: String,
-    indexedColumns: Map[String, String])
+    indexedColumns: Map[String, String],
+    override val options: FlintSparkIndexOptions = empty)
     extends FlintSparkIndex {
 
   require(indexedColumns.nonEmpty, "indexed columns must not be empty")
@@ -154,6 +156,6 @@
     }
 
     override protected def buildIndex(): FlintSparkIndex =
-      new FlintSparkCoveringIndex(indexName, tableName, indexedColumns)
+      new FlintSparkCoveringIndex(indexName, tableName, indexedColumns, indexOptions)
   }
 }
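The accessors above fall back to sensible defaults when a key is absent; a quick illustration of the behavior defined by `FlintSparkIndexOptions`:

```scala
import org.opensearch.flint.spark.FlintSparkIndexOptions

val options = FlintSparkIndexOptions(
  Map("auto_refresh" -> "true", "refresh_interval" -> "1 minute"))

assert(options.autoRefresh())                          // "auto_refresh" defaults to "false" if unset
assert(options.refreshInterval().contains("1 minute")) // Option[String]
assert(options.checkpointLocation().isEmpty)           // key not provided
```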
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
index 325f40254..c871a1900 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
@@ -10,8 +10,9 @@ import org.json4s.native.JsonMethods._
 import org.json4s.native.Serialization
 import org.opensearch.flint.core.FlintVersion
 import org.opensearch.flint.core.metadata.FlintMetadata
-import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder}
+import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
 import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, ID_COLUMN}
+import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
 import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
@@ -32,9 +33,10 @@
  * @param indexedColumns
  *   indexed column list
  */
-class FlintSparkSkippingIndex(
+case class FlintSparkSkippingIndex(
     tableName: String,
-    val indexedColumns: Seq[FlintSparkSkippingStrategy])
+    indexedColumns: Seq[FlintSparkSkippingStrategy],
+    override val options: FlintSparkIndexOptions = empty)
     extends FlintSparkIndex {
 
   require(indexedColumns.nonEmpty, "indexed columns must not be empty")
@@ -192,7 +194,7 @@ object FlintSparkSkippingIndex {
     }
 
     override def buildIndex(): FlintSparkIndex =
-      new FlintSparkSkippingIndex(tableName, indexedColumns)
+      new FlintSparkSkippingIndex(tableName, indexedColumns, indexOptions)
 
     private def addIndexedColumn(indexedCol: FlintSparkSkippingStrategy): Unit = {
       require(
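Turning `FlintSparkSkippingIndex` into a case class buys structural equality and `copy` for free, which simplifies comparing a deserialized index against an expected one in tests. A stand-in illustration (not Flint code):

```scala
// Stand-in type showing the case-class semantics the change relies on.
case class Index(table: String, cols: Seq[String], opts: Map[String, String] = Map.empty)

val a = Index("db.t", Seq("year", "month"))
val b = Index("db.t", Seq("year", "month"))
assert(a == b)                                            // structural, field-by-field equality
assert(a.copy(opts = Map("auto_refresh" -> "true")) != a) // copy with one field changed
```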
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
index 5c03c1a26..3a67625de 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
@@ -7,48 +7,25 @@ package org.opensearch.flint.spark.sql
 
 import org.antlr.v4.runtime.tree.RuleNode
 import org.opensearch.flint.spark.FlintSpark
-import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.PropertyListContext
 import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
 import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder
 
-import org.apache.spark.sql.catalyst.plans.logical.Command
-
 /**
- * Flint Spark AST builder that builds Spark command for Flint index statement.
- * This class mix-in all other AST builders and provides util methods.
+ * Flint Spark AST builder that builds Spark command for Flint index statement. This class mixes
+ * in all other AST builders and provides utility methods.
  */
 class FlintSparkSqlAstBuilder
-    extends FlintSparkSqlExtensionsBaseVisitor[Command]
+    extends FlintSparkSqlExtensionsBaseVisitor[AnyRef]
     with FlintSparkSkippingIndexAstBuilder
-    with FlintSparkCoveringIndexAstBuilder {
+    with FlintSparkCoveringIndexAstBuilder
+    with SparkSqlAstBuilder {
 
-  override def aggregateResult(aggregate: Command, nextResult: Command): Command =
+  override def aggregateResult(aggregate: AnyRef, nextResult: AnyRef): AnyRef =
     if (nextResult != null) nextResult else aggregate
 }
 
 object FlintSparkSqlAstBuilder {
 
-  /**
-   * Check if auto_refresh is true in property list.
-   *
-   * @param ctx
-   *   property list
-   */
-  def isAutoRefreshEnabled(ctx: PropertyListContext): Boolean = {
-    if (ctx == null) {
-      false
-    } else {
-      ctx
-        .property()
-        .forEach(p => {
-          if (p.key.getText == "auto_refresh") {
-            return p.value.getText.toBoolean
-          }
-        })
-      false
-    }
-  }
-
   /**
    * Get full table name if database not specified.
    *
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala
index 0fa146b9d..4bc828644 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala
@@ -54,7 +54,7 @@ class FlintSparkSqlParser(sparkParser: ParserInterface) extends ParserInterface
 
   override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { flintParser =>
     try {
-      flintAstBuilder.visit(flintParser.singleStatement())
+      flintAstBuilder.visit(flintParser.singleStatement()).asInstanceOf[LogicalPlan]
     } catch {
       // Fall back to Spark parse plan logic if flint cannot parse
       case _: ParseException => sparkParser.parsePlan(sqlText)
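Widening the visitor's type parameter from `Command` to `AnyRef` lets statement rules keep returning a `Command` while the new `propertyList` rule returns a `FlintSparkIndexOptions`; `parsePlan` then narrows the result back with `asInstanceOf[LogicalPlan]`. The overridden `aggregateResult` simply keeps the last non-null child result, as this stand-in sketch shows:

```scala
// Stand-in for the overridden aggregateResult: prefer the child's result
// when it produced one, otherwise keep the accumulated value.
def aggregateResult(aggregate: AnyRef, nextResult: AnyRef): AnyRef =
  if (nextResult != null) nextResult else aggregate

assert(aggregateResult("plan", null) == "plan")       // keep previous result
assert(aggregateResult(null, "options") == "options") // adopt child result
```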
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/SparkSqlAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/SparkSqlAstBuilder.scala
new file mode 100644
index 000000000..8a5855d47
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/SparkSqlAstBuilder.scala
@@ -0,0 +1,50 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.sql
+
+import java.util.Locale
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+import org.opensearch.flint.spark.FlintSparkIndexOptions
+import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{PropertyKeyContext, PropertyListContext, PropertyValueContext}
+
+import org.apache.spark.sql.catalyst.parser.ParserUtils.string
+
+/**
+ * AST builder for common rules in the Spark SQL grammar.
+ */
+trait SparkSqlAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
+
+  override def visitPropertyList(ctx: PropertyListContext): FlintSparkIndexOptions = {
+    val properties = ctx.property.asScala.map { property =>
+      val key = visitPropertyKey(property.key)
+      val value = visitPropertyValue(property.value)
+      key -> value
+    }
+    FlintSparkIndexOptions(properties.toMap)
+  }
+
+  override def visitPropertyKey(key: PropertyKeyContext): String = {
+    if (key.STRING() != null) {
+      string(key.STRING())
+    } else {
+      key.getText
+    }
+  }
+
+  override def visitPropertyValue(value: PropertyValueContext): String = {
+    if (value == null) {
+      null
+    } else if (value.STRING != null) {
+      string(value.STRING)
+    } else if (value.booleanValue != null) {
+      value.getText.toLowerCase(Locale.ROOT)
+    } else {
+      value.getText
+    }
+  }
+}
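The property-value handling above normalizes boolean literals to lower case and unquotes string literals, so `WITH (auto_refresh = TRUE)` and `WITH (auto_refresh = 'true')` both land in the option map as `"true"`. A stand-in for the normalization logic (real ANTLR contexts omitted; `ParserUtils.string` additionally unescapes):

```scala
import java.util.Locale

// Simplified visitPropertyValue: quoted strings are unquoted, boolean
// literals lower-cased, everything else passed through verbatim.
def normalize(raw: String, isString: Boolean = false, isBoolean: Boolean = false): String =
  if (isString) raw.stripPrefix("'").stripSuffix("'")
  else if (isBoolean) raw.toLowerCase(Locale.ROOT)
  else raw

assert(normalize("TRUE", isBoolean = true) == "true")
assert(normalize("'1 minute'", isString = true) == "1 minute")
assert(normalize("10") == "10")
```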
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
index c412b6eb6..8dd8f1bd6 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
@@ -9,8 +9,8 @@ import org.antlr.v4.runtime.tree.RuleNode
 import org.opensearch.flint.spark.FlintSpark
 import org.opensearch.flint.spark.FlintSpark.RefreshMode
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
-import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor}
-import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, isAutoRefreshEnabled}
+import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
+import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.getFullTableName
 import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateCoveringIndexStatementContext, DropCoveringIndexStatementContext, RefreshCoveringIndexStatementContext}
 
 import org.apache.spark.sql.catalyst.plans.logical.Command
@@ -18,7 +18,8 @@ import org.apache.spark.sql.catalyst.plans.logical.Command
 /**
  * Flint Spark AST builder that builds Spark command for Flint covering index statement.
  */
-trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[Command] {
+trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
+  self: SparkSqlAstBuilder =>
 
   override def visitCreateCoveringIndexStatement(
       ctx: CreateCoveringIndexStatementContext): Command = {
@@ -35,10 +36,14 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[C
       val colName = indexColCtx.multipartIdentifier().getText
       indexBuilder.addIndexColumns(colName)
     }
-    indexBuilder.create()
+
+    val indexOptions = visitPropertyList(ctx.propertyList())
+    indexBuilder
+      .options(indexOptions)
+      .create()
 
     // Trigger auto refresh if enabled
-    if (isAutoRefreshEnabled(ctx.propertyList())) {
+    if (indexOptions.autoRefresh()) {
       val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
       flint.refreshIndex(flintIndexName, RefreshMode.INCREMENTAL)
     }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
index c58972f30..c6cd0ae48 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
@@ -11,8 +11,8 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
-import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor}
-import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, isAutoRefreshEnabled}
+import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
+import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.getFullTableName
 import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
 
 import org.apache.spark.sql.Row
@@ -23,7 +23,8 @@ import org.apache.spark.sql.types.StringType
 /**
  * Flint Spark AST builder that builds Spark command for Flint skipping index statement.
  */
-trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[Command] {
+trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
+  self: SparkSqlAstBuilder =>
 
   override def visitCreateSkippingIndexStatement(
       ctx: CreateSkippingIndexStatementContext): Command =
@@ -42,10 +43,14 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[C
           case MIN_MAX => indexBuilder.addMinMax(colName)
         }
       }
-      indexBuilder.create()
+
+      val indexOptions = visitPropertyList(ctx.propertyList())
+      indexBuilder
+        .options(indexOptions)
+        .create()
 
       // Trigger auto refresh if enabled
-      if (isAutoRefreshEnabled(ctx.propertyList())) {
+      if (indexOptions.autoRefresh()) {
        val indexName = getSkippingIndexName(flint, ctx.tableName)
        flint.refreshIndex(indexName, RefreshMode.INCREMENTAL)
      }
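End to end, the WITH clause now feeds the builder and drives the auto-refresh trigger. Assuming the Flint SQL extensions are registered (the extensions class name, DDL shape, and table/path below are illustrative, not confirmed by this diff), a statement like this exercises the new path:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local")
  .config("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
  .getOrCreate()

// Parsed by FlintSparkSkippingIndexAstBuilder: the property list becomes a
// FlintSparkIndexOptions, and auto_refresh = true triggers incremental refresh.
spark.sql("""
  CREATE SKIPPING INDEX ON alb_logs
  (elb_status_code VALUE_SET)
  WITH (
    auto_refresh = true,
    checkpoint_location = 's3://bucket/checkpoints/alb_logs'
  )
""")
```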