From 8fc8ec1ed0255de7f6c883f89584be8c04e95b2b Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 20 Sep 2023 11:29:05 -0700 Subject: [PATCH 1/8] Add Flint index options and pass it from parser to index Signed-off-by: Chen Dai --- .../opensearch/flint/spark/FlintSpark.scala | 14 +++++- .../flint/spark/FlintSparkIndex.scala | 5 ++ .../flint/spark/FlintSparkIndexBuilder.scala | 10 ++++ .../flint/spark/FlintSparkIndexOptions.scala | 47 +++++++++++++++++ .../covering/FlintSparkCoveringIndex.scala | 8 +-- .../skipping/FlintSparkSkippingIndex.scala | 10 ++-- .../spark/sql/FlintSparkSqlAstBuilder.scala | 35 +++---------- .../flint/spark/sql/FlintSparkSqlParser.scala | 2 +- .../flint/spark/sql/SparkSqlAstBuilder.scala | 50 +++++++++++++++++++ .../FlintSparkCoveringIndexAstBuilder.scala | 15 ++++-- .../FlintSparkSkippingIndexAstBuilder.scala | 15 ++++-- 11 files changed, 162 insertions(+), 49 deletions(-) create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/SparkSqlAstBuilder.scala diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 8058f9bff..3aceeadb5 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -8,6 +8,7 @@ package org.opensearch.flint.spark import scala.collection.JavaConverters._ import org.json4s.{Formats, JArray, NoTypeHints} +import org.json4s.JsonAST.{JField, JObject} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} @@ -211,6 +212,14 @@ class FlintSpark(val spark: SparkSession) { val tableName = (meta \ "source").extract[String] val indexType = 
(meta \ "kind").extract[String] val indexedColumns = (meta \ "indexedColumns").asInstanceOf[JArray] + val indexOptions = FlintSparkIndexOptions( + (meta \ "options") + .asInstanceOf[JObject] + .obj + .map { case JField(key, value) => + key -> value.values.toString + } + .toMap) indexType match { case SKIPPING_INDEX_TYPE => @@ -230,14 +239,15 @@ class FlintSpark(val spark: SparkSession) { throw new IllegalStateException(s"Unknown skipping strategy: $other") } } - new FlintSparkSkippingIndex(tableName, strategies) + new FlintSparkSkippingIndex(tableName, strategies, indexOptions) case COVERING_INDEX_TYPE => new FlintSparkCoveringIndex( indexName, tableName, indexedColumns.arr.map { obj => ((obj \ "columnName").extract[String], (obj \ "columnType").extract[String]) - }.toMap) + }.toMap, + indexOptions) } } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 62e6b4668..6c09d625c 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -19,6 +19,11 @@ trait FlintSparkIndex { */ val kind: String + /** + * Index options + */ + val options: FlintSparkIndexOptions + /** * @return * Flint index name diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala index 740c02e1d..7171bf1af 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala @@ -5,6 +5,8 @@ package org.opensearch.flint.spark +import org.opensearch.flint.spark.FlintSparkIndexOptions.empty + import org.apache.spark.sql.catalog.Column /** 
@@ -18,6 +20,9 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) { /** Source table name */ protected var tableName: String = "" + /** Index options */ + protected var indexOptions: FlintSparkIndexOptions = empty + /** All columns of the given source table */ lazy protected val allColumns: Map[String, Column] = { require(tableName.nonEmpty, "Source table name is not provided") @@ -29,6 +34,11 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) { .toMap } + def options(options: FlintSparkIndexOptions): this.type = { + this.indexOptions = options + this + } + /** * Create Flint index. */ diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala new file mode 100644 index 000000000..f348c12a0 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +/** + * Flint Spark index configurable options. + * + * @param options + * index option mappings + */ +case class FlintSparkIndexOptions(options: Map[String, String]) { + + /** + * Is Flint index auto refreshed or manual refreshed. + * + * @return + * auto refresh option value + */ + def autoRefresh(): Boolean = options.getOrElse("auto_refresh", "false").toBoolean + + /** + * The refresh interval (only valid if auto refresh enabled). + * + * @return + * refresh interval expression + */ + def refreshInterval(): Option[String] = options.get("refresh_interval") + + /** + * The checkpoint location which maybe required by Flint index's refresh. 
+ * + * @return + * checkpoint location path + */ + def checkpointLocation(): Option[String] = options.get("checkpoint_location") +} + +object FlintSparkIndexOptions { + + /** + * Empty options + */ + val empty: FlintSparkIndexOptions = FlintSparkIndexOptions(Map.empty) +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index f2f5933d6..96b7f39bf 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -10,8 +10,9 @@ import org.json4s.JsonAST.{JArray, JObject, JString} import org.json4s.native.JsonMethods.{compact, parse, render} import org.json4s.native.Serialization import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder} +import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions} import org.opensearch.flint.spark.FlintSparkIndex.flintIndexNamePrefix +import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE} import org.apache.spark.sql.DataFrame @@ -31,7 +32,8 @@ import org.apache.spark.sql.types.StructType class FlintSparkCoveringIndex( indexName: String, tableName: String, - indexedColumns: Map[String, String]) + indexedColumns: Map[String, String], + override val options: FlintSparkIndexOptions = empty) extends FlintSparkIndex { require(indexedColumns.nonEmpty, "indexed columns must not be empty") @@ -154,6 +156,6 @@ object FlintSparkCoveringIndex { } override protected def buildIndex(): FlintSparkIndex = - new FlintSparkCoveringIndex(indexName, tableName, 
indexedColumns) + new FlintSparkCoveringIndex(indexName, tableName, indexedColumns, indexOptions) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index 325f40254..c871a1900 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -10,8 +10,9 @@ import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization import org.opensearch.flint.core.FlintVersion import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder} +import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions} import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, ID_COLUMN} +import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE} import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy @@ -32,9 +33,10 @@ import org.apache.spark.sql.types.StructType * @param indexedColumns * indexed column list */ -class FlintSparkSkippingIndex( +case class FlintSparkSkippingIndex( tableName: String, - val indexedColumns: Seq[FlintSparkSkippingStrategy]) + indexedColumns: Seq[FlintSparkSkippingStrategy], + override val options: FlintSparkIndexOptions = empty) extends FlintSparkIndex { require(indexedColumns.nonEmpty, "indexed columns must not be empty") @@ -192,7 +194,7 @@ object FlintSparkSkippingIndex { } override def buildIndex(): FlintSparkIndex = - new 
FlintSparkSkippingIndex(tableName, indexedColumns) + new FlintSparkSkippingIndex(tableName, indexedColumns, indexOptions) private def addIndexedColumn(indexedCol: FlintSparkSkippingStrategy): Unit = { require( diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala index 5c03c1a26..3a67625de 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala @@ -7,48 +7,25 @@ package org.opensearch.flint.spark.sql import org.antlr.v4.runtime.tree.RuleNode import org.opensearch.flint.spark.FlintSpark -import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.PropertyListContext import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder -import org.apache.spark.sql.catalyst.plans.logical.Command - /** - * Flint Spark AST builder that builds Spark command for Flint index statement. - * This class mix-in all other AST builders and provides util methods. + * Flint Spark AST builder that builds Spark command for Flint index statement. This class mix-in + * all other AST builders and provides util methods. 
*/ class FlintSparkSqlAstBuilder - extends FlintSparkSqlExtensionsBaseVisitor[Command] + extends FlintSparkSqlExtensionsBaseVisitor[AnyRef] with FlintSparkSkippingIndexAstBuilder - with FlintSparkCoveringIndexAstBuilder { + with FlintSparkCoveringIndexAstBuilder + with SparkSqlAstBuilder { - override def aggregateResult(aggregate: Command, nextResult: Command): Command = + override def aggregateResult(aggregate: AnyRef, nextResult: AnyRef): AnyRef = if (nextResult != null) nextResult else aggregate } object FlintSparkSqlAstBuilder { - /** - * Check if auto_refresh is true in property list. - * - * @param ctx - * property list - */ - def isAutoRefreshEnabled(ctx: PropertyListContext): Boolean = { - if (ctx == null) { - false - } else { - ctx - .property() - .forEach(p => { - if (p.key.getText == "auto_refresh") { - return p.value.getText.toBoolean - } - }) - false - } - } - /** * Get full table name if database not specified. * diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala index 0fa146b9d..4bc828644 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala @@ -54,7 +54,7 @@ class FlintSparkSqlParser(sparkParser: ParserInterface) extends ParserInterface override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { flintParser => try { - flintAstBuilder.visit(flintParser.singleStatement()) + flintAstBuilder.visit(flintParser.singleStatement()).asInstanceOf[LogicalPlan] } catch { // Fall back to Spark parse plan logic if flint cannot parse case _: ParseException => sparkParser.parsePlan(sqlText) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/SparkSqlAstBuilder.scala 
b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/SparkSqlAstBuilder.scala new file mode 100644 index 000000000..8a5855d47 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/SparkSqlAstBuilder.scala @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.sql + +import java.util.Locale + +import scala.collection.JavaConverters.asScalaBufferConverter + +import org.opensearch.flint.spark.FlintSparkIndexOptions +import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{PropertyKeyContext, PropertyListContext, PropertyValueContext} + +import org.apache.spark.sql.catalyst.parser.ParserUtils.string + +/** + * AST builder that builds for common rule in Spark SQL grammar. + */ +trait SparkSqlAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] { + + override def visitPropertyList(ctx: PropertyListContext): FlintSparkIndexOptions = { + val properties = ctx.property.asScala.map { property => + val key = visitPropertyKey(property.key) + val value = visitPropertyValue(property.value) + key -> value + } + FlintSparkIndexOptions(properties.toMap) + } + + override def visitPropertyKey(key: PropertyKeyContext): String = { + if (key.STRING() != null) { + string(key.STRING()) + } else { + key.getText + } + } + + override def visitPropertyValue(value: PropertyValueContext): String = { + if (value == null) { + null + } else if (value.STRING != null) { + string(value.STRING) + } else if (value.booleanValue != null) { + value.getText.toLowerCase(Locale.ROOT) + } else { + value.getText + } + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala index c412b6eb6..8dd8f1bd6 100644 --- 
a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala @@ -9,8 +9,8 @@ import org.antlr.v4.runtime.tree.RuleNode import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.FlintSpark.RefreshMode import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex -import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor} -import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, isAutoRefreshEnabled} +import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} +import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.getFullTableName import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateCoveringIndexStatementContext, DropCoveringIndexStatementContext, RefreshCoveringIndexStatementContext} import org.apache.spark.sql.catalyst.plans.logical.Command @@ -18,7 +18,8 @@ import org.apache.spark.sql.catalyst.plans.logical.Command /** * Flint Spark AST builder that builds Spark command for Flint covering index statement. 
*/ -trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[Command] { +trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] { + self: SparkSqlAstBuilder => override def visitCreateCoveringIndexStatement( ctx: CreateCoveringIndexStatementContext): Command = { @@ -35,10 +36,14 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[C val colName = indexColCtx.multipartIdentifier().getText indexBuilder.addIndexColumns(colName) } - indexBuilder.create() + + val indexOptions = visitPropertyList(ctx.propertyList()) + indexBuilder + .options(indexOptions) + .create() // Trigger auto refresh if enabled - if (isAutoRefreshEnabled(ctx.propertyList())) { + if (indexOptions.autoRefresh()) { val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName) flint.refreshIndex(flintIndexName, RefreshMode.INCREMENTAL) } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala index c58972f30..c6cd0ae48 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala @@ -11,8 +11,8 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET} -import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor} -import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, isAutoRefreshEnabled} +import 
org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} +import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.getFullTableName import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._ import org.apache.spark.sql.Row @@ -23,7 +23,8 @@ import org.apache.spark.sql.types.StringType /** * Flint Spark AST builder that builds Spark command for Flint skipping index statement. */ -trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[Command] { +trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] { + self: SparkSqlAstBuilder => override def visitCreateSkippingIndexStatement( ctx: CreateSkippingIndexStatementContext): Command = @@ -42,10 +43,14 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[C case MIN_MAX => indexBuilder.addMinMax(colName) } } - indexBuilder.create() + + val indexOptions = visitPropertyList(ctx.propertyList()) + indexBuilder + .options(indexOptions) + .create() // Trigger auto refresh if enabled - if (isAutoRefreshEnabled(ctx.propertyList())) { + if (indexOptions.autoRefresh()) { val indexName = getSkippingIndexName(flint, ctx.tableName) flint.refreshIndex(indexName, RefreshMode.INCREMENTAL) } From 519f1fb67574a070e36ad891773a68b4dfe25767 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 20 Sep 2023 16:39:54 -0700 Subject: [PATCH 2/8] Update more javadoc Signed-off-by: Chen Dai --- .../opensearch/flint/spark/FlintSparkIndexBuilder.scala | 8 ++++++++ .../flint/spark/skipping/FlintSparkSkippingIndex.scala | 4 ++-- .../flint/spark/sql/FlintSparkSqlAstBuilder.scala | 8 +++++++- .../opensearch/flint/spark/sql/FlintSparkSqlParser.scala | 2 +- .../opensearch/flint/spark/sql/SparkSqlAstBuilder.scala | 3 ++- 5 files changed, 20 insertions(+), 5 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala 
b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala index 7171bf1af..95e351f7d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala @@ -34,6 +34,14 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) { .toMap } + /** + * Add index options. + * + * @param options + * index options + * @return + * builder + */ def options(options: FlintSparkIndexOptions): this.type = { this.indexOptions = options this diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index c871a1900..2eebffebf 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -33,9 +33,9 @@ import org.apache.spark.sql.types.StructType * @param indexedColumns * indexed column list */ -case class FlintSparkSkippingIndex( +class FlintSparkSkippingIndex( tableName: String, - indexedColumns: Seq[FlintSparkSkippingStrategy], + val indexedColumns: Seq[FlintSparkSkippingStrategy], override val options: FlintSparkIndexOptions = empty) extends FlintSparkIndex { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala index 3a67625de..9f3539b09 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala @@ -5,11 +5,13 @@ package org.opensearch.flint.spark.sql 
-import org.antlr.v4.runtime.tree.RuleNode +import org.antlr.v4.runtime.tree.{ParseTree, RuleNode} import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + /** * Flint Spark AST builder that builds Spark command for Flint index statement. This class mix-in * all other AST builders and provides util methods. @@ -20,6 +22,10 @@ class FlintSparkSqlAstBuilder with FlintSparkCoveringIndexAstBuilder with SparkSqlAstBuilder { + override def visit(tree: ParseTree): LogicalPlan = { + tree.accept(this).asInstanceOf[LogicalPlan] + } + override def aggregateResult(aggregate: AnyRef, nextResult: AnyRef): AnyRef = if (nextResult != null) nextResult else aggregate } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala index 4bc828644..0fa146b9d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala @@ -54,7 +54,7 @@ class FlintSparkSqlParser(sparkParser: ParserInterface) extends ParserInterface override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { flintParser => try { - flintAstBuilder.visit(flintParser.singleStatement()).asInstanceOf[LogicalPlan] + flintAstBuilder.visit(flintParser.singleStatement()) } catch { // Fall back to Spark parse plan logic if flint cannot parse case _: ParseException => sparkParser.parsePlan(sqlText) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/SparkSqlAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/SparkSqlAstBuilder.scala index 
8a5855d47..d4e4db104 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/SparkSqlAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/SparkSqlAstBuilder.scala @@ -15,7 +15,8 @@ import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{PropertyKey import org.apache.spark.sql.catalyst.parser.ParserUtils.string /** - * AST builder that builds for common rule in Spark SQL grammar. + * AST builder that builds for common rule in Spark SQL grammar. The main logic is modified slightly + * from Spark AstBuilder code. */ trait SparkSqlAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] { From fcaa6e10c87057b089d7787baf4415abfa872083 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 21 Sep 2023 09:10:59 -0700 Subject: [PATCH 3/8] Pass index option to Flint metadata and streaming job Signed-off-by: Chen Dai --- .../src/main/antlr4/SparkSqlBase.g4 | 8 +++++++- .../org/opensearch/flint/spark/FlintSpark.scala | 12 +++++++++++- .../spark/covering/FlintSparkCoveringIndex.scala | 7 ++++++- .../spark/skipping/FlintSparkSkippingIndex.scala | 7 ++++++- 4 files changed, 30 insertions(+), 4 deletions(-) diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 index 17627c190..edba43200 100644 --- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 +++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 @@ -163,7 +163,6 @@ INDEX: 'INDEX'; ON: 'ON'; PARTITION: 'PARTITION'; REFRESH: 'REFRESH'; -STRING: 'STRING'; TRUE: 'TRUE'; WITH: 'WITH'; @@ -172,6 +171,13 @@ EQ : '=' | '=='; MINUS: '-'; +STRING + : '\'' ( ~('\''|'\\') | ('\\' .) )* '\'' + | '"' ( ~('"'|'\\') | ('\\' .) 
)* '"' + | 'R\'' (~'\'')* '\'' + | 'R"'(~'"')* '"' + ; + INTEGER_VALUE : DIGIT+ ; diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 3aceeadb5..f18357615 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE import org.apache.spark.sql.flint.config.FlintSparkConf import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN} import org.apache.spark.sql.streaming.OutputMode.Append +import org.apache.spark.sql.streaming.Trigger /** * Flint Spark integration API entrypoint. @@ -129,11 +130,20 @@ class FlintSpark(val spark: SparkSession) { .writeStream .queryName(indexName) .outputMode(Append()) + + index.options + .checkpointLocation() + .foreach(location => job.option("checkpointLocation", location)) + index.options + .refreshInterval() + .foreach(interval => job.trigger(Trigger.ProcessingTime(interval))) + + val jobId = job .foreachBatch { (batchDF: DataFrame, _: Long) => writeFlintIndex(batchDF) } .start() - Some(job.id.toString) + Some(jobId.toString) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index 96b7f39bf..4c8ca7ecf 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -51,7 +51,8 @@ class FlintSparkCoveringIndex( | "name": "$indexName", | "kind": "$kind", | "indexedColumns": $getMetaInfo, - | "source": "$tableName" + | 
"source": "$tableName", + | "options": $getIndexOptions | }, | "properties": $getSchema | } @@ -71,6 +72,10 @@ class FlintSparkCoveringIndex( Serialization.write(JArray(objects)) } + private def getIndexOptions: String = { + Serialization.write(options.options) + } + private def getSchema: String = { val catalogDDL = indexedColumns diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index 2eebffebf..da69cc1fa 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -58,7 +58,8 @@ class FlintSparkSkippingIndex( | "version": "${FlintVersion.current()}", | "kind": "$SKIPPING_INDEX_TYPE", | "indexedColumns": $getMetaInfo, - | "source": "$tableName" + | "source": "$tableName", + | "options": $getIndexOptions | }, | "properties": $getSchema | } @@ -84,6 +85,10 @@ class FlintSparkSkippingIndex( Serialization.write(indexedColumns) } + private def getIndexOptions: String = { + Serialization.write(options.options) + } + private def getSchema: String = { val allFieldTypes = indexedColumns.flatMap(_.outputSchema()).toMap + (FILE_PATH_COLUMN -> "string") From fdccca794942819d4982da8f567de74d1721272c Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 21 Sep 2023 09:33:21 -0700 Subject: [PATCH 4/8] Fix IT and add new IT Signed-off-by: Chen Dai --- .../opensearch/flint/spark/FlintSpark.scala | 2 +- .../FlintSparkCoveringIndexITSuite.scala | 3 ++- .../FlintSparkSkippingIndexITSuite.scala | 27 +++++++++++++++++-- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala 
index f18357615..5d3ca4257 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -143,7 +143,7 @@ class FlintSpark(val spark: SparkSession) { writeFlintIndex(batchDF) } .start() - Some(jobId.toString) + Some(jobId.id.toString) } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index 20e4dca24..140ecdd77 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -56,7 +56,8 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { | "columnName": "age", | "columnType": "int" | }], - | "source": "default.ci_test" + | "source": "default.ci_test", + | "options": {} | }, | "properties": { | "name": { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 5d31d8724..7d5598cd5 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -78,7 +78,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "columnName": "age", | "columnType": "int" | }], - | "source": "default.test" + | "source": "default.test", + | "options": {} | }, | "properties": { | "year": { @@ -102,6 +103,27 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | } | } |""".stripMargin) + + index.get.options shouldBe FlintSparkIndexOptions.empty + } + + test("create skipping index with index options successfully") { + flint + .skippingIndex() + .onTable(testTable) + 
.addValueSet("address") + .options(FlintSparkIndexOptions(Map( + "auto_refresh" -> "true", + "refresh_interval" -> "1 Minute", + "checkpoint_location" -> "s3a://test/" + ))) + .create() + + val index = flint.describeIndex(testIndex) + index shouldBe defined + index.get.options.autoRefresh() shouldBe true + index.get.options.refreshInterval() shouldBe Some("1 Minute") + index.get.options.checkpointLocation() shouldBe Some("s3a://test/") } test("should not have ID column in index data") { @@ -484,7 +506,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "columnName": "struct_col", | "columnType": "struct" | }], - | "source": "$testTable" + | "source": "$testTable", + | "options": {} | }, | "properties": { | "boolean_col": { From ed6ba7f4038752576527572f7f13137762f68aa4 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 21 Sep 2023 13:22:52 -0700 Subject: [PATCH 5/8] Add more IT Signed-off-by: Chen Dai --- .../opensearch/flint/spark/FlintSpark.scala | 2 +- .../flint/spark/sql/SparkSqlAstBuilder.scala | 14 ++++++---- .../FlintSparkSkippingIndexSqlITSuite.scala | 26 +++++++++++++++---- 3 files changed, 31 insertions(+), 11 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 5d3ca4257..dbc14ad50 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -174,8 +174,8 @@ class FlintSpark(val spark: SparkSession) { */ def deleteIndex(indexName: String): Boolean = { if (flintClient.exists(indexName)) { - flintClient.deleteIndex(indexName) stopRefreshingJob(indexName) + flintClient.deleteIndex(indexName) true } else { false diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/SparkSqlAstBuilder.scala 
b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/SparkSqlAstBuilder.scala index d4e4db104..4dadd4d5e 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/SparkSqlAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/SparkSqlAstBuilder.scala @@ -21,12 +21,16 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils.string trait SparkSqlAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] { override def visitPropertyList(ctx: PropertyListContext): FlintSparkIndexOptions = { - val properties = ctx.property.asScala.map { property => - val key = visitPropertyKey(property.key) - val value = visitPropertyValue(property.value) - key -> value + if (ctx == null) { + FlintSparkIndexOptions.empty + } else { + val properties = ctx.property.asScala.map { property => + val key = visitPropertyKey(property.key) + val value = visitPropertyValue(property.value) + key -> value + } + FlintSparkIndexOptions(properties.toMap) } - FlintSparkIndexOptions(properties.toMap) } override def visitPropertyKey(key: PropertyKeyContext): String = { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index e01850b4f..5846cab23 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -7,16 +7,12 @@ package org.opensearch.flint.spark import scala.Option.empty -import org.opensearch.flint.OpenSearchSuite import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper -import org.apache.spark.FlintSuite -import org.apache.spark.sql.{QueryTest, Row} +import 
org.apache.spark.sql.Row import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE -import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY} -import org.apache.spark.sql.streaming.StreamTest class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { @@ -59,6 +55,26 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { indexData.count() shouldBe 2 } + test("create skipping index with streaming job options") { + withTempDir { checkpointDir => + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( year PARTITION ) + | WITH ( + | auto_refresh = true, + | refresh_interval = '5 Seconds', + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + | """.stripMargin) + + val index = flint.describeIndex(testIndex) + index shouldBe defined + index.get.options.autoRefresh() shouldBe true + index.get.options.refreshInterval() shouldBe Some("5 Seconds") + index.get.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath) + } + } + test("create skipping index with manual refresh") { sql(s""" | CREATE SKIPPING INDEX ON $testTable From fa1173e571abda099f1b2da5067be65293b2d5ad Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 21 Sep 2023 14:01:48 -0700 Subject: [PATCH 6/8] Update job id var name Signed-off-by: Chen Dai --- .../org/opensearch/flint/spark/FlintSpark.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index dbc14ad50..d1f490113 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -138,12 +138,14 @@ class FlintSpark(val spark: SparkSession) { .refreshInterval() .foreach(interval => job.trigger(Trigger.ProcessingTime(interval))) - val 
jobId = job - .foreachBatch { (batchDF: DataFrame, _: Long) => - writeFlintIndex(batchDF) - } - .start() - Some(jobId.id.toString) + val jobId = + job + .foreachBatch { (batchDF: DataFrame, _: Long) => + writeFlintIndex(batchDF) + } + .start() + .id + Some(jobId.toString) } } From 0212b93de2c7f23b9d22767d644c03d7a63846af Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 26 Sep 2023 10:03:39 -0700 Subject: [PATCH 7/8] Update doc with index options Signed-off-by: Chen Dai --- docs/index.md | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/docs/index.md b/docs/index.md index 7698f98ac..7f8de2a7f 100644 --- a/docs/index.md +++ b/docs/index.md @@ -125,7 +125,7 @@ CREATE SKIPPING INDEX ON ( column [, ...] ) WHERE -WITH (auto_refresh = (true|false)) +WITH ( options ) REFRESH SKIPPING INDEX ON @@ -164,7 +164,7 @@ DROP SKIPPING INDEX ON alb_logs CREATE INDEX name ON ( column [, ...] ) WHERE -WITH (auto_refresh = (true|false)) +WITH ( options ) REFRESH INDEX name ON @@ -190,6 +190,34 @@ DESCRIBE INDEX elb_and_requestUri ON alb_logs DROP INDEX elb_and_requestUri ON alb_logs ``` +#### Create Index Options + +User can provide the following options in `WITH` clause of create statement: + ++ `auto_refresh`: triggers Incremental Refresh immediately after index create complete if true. Otherwise, user has to trigger Full Refresh by `REFRESH` statement manually. ++ `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. ++ `checkpoint_location`: a string as the location path for incremental refresh job checkpoint. The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. 
+ +```sql +WITH ( + auto_refresh = [true|false], + refresh_interval = 'time interval expression', + checkpoint_location = 'checkpoint directory path' +) +``` + +Example: + +```sql +CREATE INDEX elb_and_requestUri +ON alb_logs ( elb, requestUri ) +WITH ( + auto_refresh = true, + refresh_interval = '1 minute', + checkpoint_location = 's3://test/' +) +``` + ## Index Store ### OpenSearch From 87417fc10bc3e0fa58f9a3af30f36e435d64c664 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 26 Sep 2023 13:59:36 -0700 Subject: [PATCH 8/8] Update doc with default behavior Signed-off-by: Chen Dai --- docs/index.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/index.md b/docs/index.md index 7f8de2a7f..7b8d7aef1 100644 --- a/docs/index.md +++ b/docs/index.md @@ -195,8 +195,8 @@ DROP INDEX elb_and_requestUri ON alb_logs User can provide the following options in `WITH` clause of create statement: + `auto_refresh`: triggers Incremental Refresh immediately after index create complete if true. Otherwise, user has to trigger Full Refresh by `REFRESH` statement manually. -+ `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. -+ `checkpoint_location`: a string as the location path for incremental refresh job checkpoint. The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. ++ `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, next micro batch will be generated as soon as the previous one complete processing. ++ `checkpoint_location`: a string as the location path for incremental refresh job checkpoint. 
The location has to be a path in an HDFS-compatible file system and is only applicable when auto refresh is enabled. If unspecified, a temporary checkpoint directory will be used, which may result in checkpoint data being lost upon restart. ```sql WITH (