diff --git a/docs/index.md b/docs/index.md
index fe0b1e360..8f7a79a2e 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -125,7 +125,7 @@ CREATE SKIPPING INDEX [IF NOT EXISTS]
 ON ( column [, ...] )
 WHERE
-WITH (auto_refresh = (true|false))
+WITH ( options )
 
 REFRESH SKIPPING INDEX ON
 
@@ -164,7 +164,7 @@ DROP SKIPPING INDEX ON alb_logs
 CREATE INDEX [IF NOT EXISTS] name
 ON ( column [, ...] )
 WHERE
-WITH (auto_refresh = (true|false))
+WITH ( options )
 
 REFRESH INDEX name ON
 
@@ -190,6 +190,34 @@ DESCRIBE INDEX elb_and_requestUri ON alb_logs
 DROP INDEX elb_and_requestUri ON alb_logs
 ```
 
+#### Create Index Options
+
+Users can provide the following options in the `WITH` clause of the create statement:
+
++ `auto_refresh`: if true, triggers an incremental refresh immediately after index creation completes. Otherwise, the user has to trigger a full refresh manually with the `REFRESH` statement.
++ `refresh_interval`: a string time interval for the incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh is enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, the next micro batch is generated as soon as the previous one completes processing.
++ `checkpoint_location`: a string path to the incremental refresh job checkpoint. The location has to be a path in an HDFS-compatible file system and is only applicable when auto refresh is enabled. If unspecified, a temporary checkpoint directory is used, which may result in checkpoint data loss upon restart.
+
+```sql
+WITH (
+  auto_refresh = [true|false],
+  refresh_interval = 'time interval expression',
+  checkpoint_location = 'checkpoint directory path'
+)
+```
+
+Example:
+
+```sql
+CREATE INDEX elb_and_requestUri
+ON alb_logs ( elb, requestUri )
+WITH (
+  auto_refresh = true,
+  refresh_interval = '1 minute',
+  checkpoint_location = 's3://test/'
+)
+```
+
 ## Index Store
 
 ### OpenSearch
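[Editor's note] The same `WITH` options apply to `CREATE SKIPPING INDEX` as well. A minimal sketch, not part of this change, assuming `alb_logs` has an `elb` column suitable for a `VALUE_SET` skip type (the `( column SKIP_TYPE )` shape mirrors the integration test later in this patch):

```sql
CREATE SKIPPING INDEX ON alb_logs
( elb VALUE_SET )
WITH (
  auto_refresh = true,
  refresh_interval = '10 seconds',
  checkpoint_location = 's3://test/'
)
```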
)* '"' + | 'R\'' (~'\'')* '\'' + | 'R"'(~'"')* '"' + ; + INTEGER_VALUE : DIGIT+ ; diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 57b360a98..b3de3c4b6 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -8,6 +8,7 @@ package org.opensearch.flint.spark import scala.collection.JavaConverters._ import org.json4s.{Formats, JArray, NoTypeHints} +import org.json4s.JsonAST.{JField, JObject} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} @@ -30,6 +31,7 @@ import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE import org.apache.spark.sql.flint.config.FlintSparkConf import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN} import org.apache.spark.sql.streaming.OutputMode.Append +import org.apache.spark.sql.streaming.Trigger /** * Flint Spark integration API entrypoint. @@ -132,11 +134,22 @@ class FlintSpark(val spark: SparkSession) { .writeStream .queryName(indexName) .outputMode(Append()) - .foreachBatch { (batchDF: DataFrame, _: Long) => - writeFlintIndex(batchDF) - } - .start() - Some(job.id.toString) + + index.options + .checkpointLocation() + .foreach(location => job.option("checkpointLocation", location)) + index.options + .refreshInterval() + .foreach(interval => job.trigger(Trigger.ProcessingTime(interval))) + + val jobId = + job + .foreachBatch { (batchDF: DataFrame, _: Long) => + writeFlintIndex(batchDF) + } + .start() + .id + Some(jobId.toString) } } @@ -179,8 +192,8 @@ class FlintSpark(val spark: SparkSession) { */ def deleteIndex(indexName: String): Boolean = { if (flintClient.exists(indexName)) { - flintClient.deleteIndex(indexName) stopRefreshingJob(indexName) + flintClient.deleteIndex(indexName) true } else { false @@ -227,6 +240,14 @@ class FlintSpark(val spark: SparkSession) { val tableName = (meta \ "source").extract[String] val indexType = (meta \ "kind").extract[String] val indexedColumns = (meta \ "indexedColumns").asInstanceOf[JArray] + val indexOptions = FlintSparkIndexOptions( + (meta \ "options") + .asInstanceOf[JObject] + .obj + .map { case JField(key, value) => + key -> value.values.toString + } + .toMap) indexType match { case SKIPPING_INDEX_TYPE => @@ -246,14 +267,15 @@ class FlintSpark(val spark: SparkSession) { throw new IllegalStateException(s"Unknown skipping strategy: $other") } } - new FlintSparkSkippingIndex(tableName, strategies) + new FlintSparkSkippingIndex(tableName, strategies, indexOptions) case COVERING_INDEX_TYPE => new FlintSparkCoveringIndex( indexName, tableName, indexedColumns.arr.map { obj => ((obj \ "columnName").extract[String], (obj \ "columnType").extract[String]) - }.toMap) + }.toMap, + indexOptions) } } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 62e6b4668..6c09d625c 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -19,6 +19,11 @@ trait FlintSparkIndex { */ val kind: String + /** + * Index options + */ + val options: 
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
index 62e6b4668..6c09d625c 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
@@ -19,6 +19,11 @@ trait FlintSparkIndex {
    */
   val kind: String
 
+  /**
+   * Index options
+   */
+  val options: FlintSparkIndexOptions
+
   /**
    * @return
    *   Flint index name
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
index ed8a4ba56..2212826dc 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
@@ -5,6 +5,8 @@
 
 package org.opensearch.flint.spark
 
+import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
+
 import org.apache.spark.sql.catalog.Column
 
 /**
@@ -18,6 +20,9 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
   /** Source table name */
   protected var tableName: String = ""
 
+  /** Index options */
+  protected var indexOptions: FlintSparkIndexOptions = empty
+
   /** All columns of the given source table */
   lazy protected val allColumns: Map[String, Column] = {
     require(tableName.nonEmpty, "Source table name is not provided")
@@ -29,6 +34,19 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
       .toMap
   }
 
+  /**
+   * Add index options.
+   *
+   * @param options
+   *   index options
+   * @return
+   *   builder
+   */
+  def options(options: FlintSparkIndexOptions): this.type = {
+    this.indexOptions = options
+    this
+  }
+
   /**
    * Create Flint index.
    *
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
new file mode 100644
index 000000000..f348c12a0
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
@@ -0,0 +1,47 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark
+
+/**
+ * Flint Spark index configurable options.
+ *
+ * @param options
+ *   index option mappings
+ */
+case class FlintSparkIndexOptions(options: Map[String, String]) {
+
+  /**
+   * Whether the Flint index is auto refreshed or manually refreshed.
+   *
+   * @return
+   *   auto refresh option value
+   */
+  def autoRefresh(): Boolean = options.getOrElse("auto_refresh", "false").toBoolean
+
+  /**
+   * The refresh interval (only valid if auto refresh enabled).
+   *
+   * @return
+   *   refresh interval expression
+   */
+  def refreshInterval(): Option[String] = options.get("refresh_interval")
+
+  /**
+   * The checkpoint location that may be required by the Flint index refresh.
+   *
+   * @return
+   *   checkpoint location path
+   */
+  def checkpointLocation(): Option[String] = options.get("checkpoint_location")
+}
+
+object FlintSparkIndexOptions {
+
+  /**
+   * Empty options
+   */
+  val empty: FlintSparkIndexOptions = FlintSparkIndexOptions(Map.empty)
+}
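[Editor's note] A quick sketch of how the new options class reads out, with hypothetical values (note that `autoRefresh()` falls back to `false` when the key is absent):

```scala
import org.opensearch.flint.spark.FlintSparkIndexOptions

val opts = FlintSparkIndexOptions(Map(
  "auto_refresh" -> "true",
  "refresh_interval" -> "10 seconds"))

assert(opts.autoRefresh())                            // "true".toBoolean
assert(opts.refreshInterval() == Some("10 seconds"))
assert(opts.checkpointLocation().isEmpty)             // absent key: no checkpoint configured
```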
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index 46a2a52b5..29503919d 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -10,8 +10,9 @@ import org.json4s.JsonAST.{JArray, JObject, JString}
 import org.json4s.native.JsonMethods.{compact, parse, render}
 import org.json4s.native.Serialization
 import org.opensearch.flint.core.metadata.FlintMetadata
-import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder}
+import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
 import org.opensearch.flint.spark.FlintSparkIndex.flintIndexNamePrefix
+import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}
 
 import org.apache.spark.sql.DataFrame
@@ -31,7 +32,8 @@ import org.apache.spark.sql.types.StructType
 case class FlintSparkCoveringIndex(
     indexName: String,
     tableName: String,
-    indexedColumns: Map[String, String])
+    indexedColumns: Map[String, String],
+    override val options: FlintSparkIndexOptions = empty)
   extends FlintSparkIndex {
 
   require(indexedColumns.nonEmpty, "indexed columns must not be empty")
@@ -49,7 +51,8 @@ case class FlintSparkCoveringIndex(
        | "name": "$indexName",
        | "kind": "$kind",
        | "indexedColumns": $getMetaInfo,
-       | "source": "$tableName"
+       | "source": "$tableName",
+       | "options": $getIndexOptions
        | },
        | "properties": $getSchema
        | }
@@ -69,6 +72,10 @@ case class FlintSparkCoveringIndex(
     Serialization.write(JArray(objects))
   }
 
+  private def getIndexOptions: String = {
+    Serialization.write(options.options)
+  }
+
   private def getSchema: String = {
     val catalogDDL =
       indexedColumns
@@ -154,6 +161,6 @@ object FlintSparkCoveringIndex {
     }
 
     override protected def buildIndex(): FlintSparkIndex =
-      new FlintSparkCoveringIndex(indexName, tableName, indexedColumns)
+      new FlintSparkCoveringIndex(indexName, tableName, indexedColumns, indexOptions)
   }
 }
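[Editor's note] With the extra constructor parameter in place, a covering index can carry options when built directly; a hypothetical example (the SQL and builder paths are the normal routes, and the table name and column types here are assumed):

```scala
import org.opensearch.flint.spark.FlintSparkIndexOptions
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex

val index = FlintSparkCoveringIndex(
  indexName = "elb_and_requestUri",
  tableName = "default.alb_logs", // hypothetical table
  indexedColumns = Map("elb" -> "string", "requestUri" -> "string"),
  options = FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
```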
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
index 325f40254..da69cc1fa 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
@@ -10,8 +10,9 @@ import org.json4s.native.JsonMethods._
 import org.json4s.native.Serialization
 import org.opensearch.flint.core.FlintVersion
 import org.opensearch.flint.core.metadata.FlintMetadata
-import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder}
+import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
 import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, ID_COLUMN}
+import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
 import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
@@ -34,7 +35,8 @@ import org.apache.spark.sql.types.StructType
  */
 class FlintSparkSkippingIndex(
     tableName: String,
-    val indexedColumns: Seq[FlintSparkSkippingStrategy])
+    val indexedColumns: Seq[FlintSparkSkippingStrategy],
+    override val options: FlintSparkIndexOptions = empty)
   extends FlintSparkIndex {
 
   require(indexedColumns.nonEmpty, "indexed columns must not be empty")
@@ -56,7 +58,8 @@ class FlintSparkSkippingIndex(
        | "version": "${FlintVersion.current()}",
        | "kind": "$SKIPPING_INDEX_TYPE",
        | "indexedColumns": $getMetaInfo,
-       | "source": "$tableName"
+       | "source": "$tableName",
+       | "options": $getIndexOptions
        | },
        | "properties": $getSchema
        | }
@@ -82,6 +85,10 @@ class FlintSparkSkippingIndex(
     Serialization.write(indexedColumns)
   }
 
+  private def getIndexOptions: String = {
+    Serialization.write(options.options)
+  }
+
   private def getSchema: String = {
     val allFieldTypes =
       indexedColumns.flatMap(_.outputSchema()).toMap + (FILE_PATH_COLUMN -> "string")
@@ -192,7 +199,7 @@ object FlintSparkSkippingIndex {
     }
 
     override def buildIndex(): FlintSparkIndex =
-      new FlintSparkSkippingIndex(tableName, indexedColumns)
+      new FlintSparkSkippingIndex(tableName, indexedColumns, indexOptions)
 
     private def addIndexedColumn(indexedCol: FlintSparkSkippingStrategy): Unit = {
       require(
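[Editor's note] The skipping index takes the same optional parameter; programmatically it flows through the builder's new `options(...)` method, which the integration tests later in this change exercise. A sketch, assuming a `flint` API object and an existing table:

```scala
import org.opensearch.flint.spark.FlintSparkIndexOptions

flint
  .skippingIndex()
  .onTable("default.alb_logs") // hypothetical table
  .addValueSet("elb")
  .options(FlintSparkIndexOptions(Map(
    "auto_refresh" -> "true",
    "refresh_interval" -> "1 Minute",
    "checkpoint_location" -> "s3a://test/")))
  .create()
```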
 */
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
index 5c03c1a26..9f3539b09 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
@@ -5,50 +5,33 @@
 
 package org.opensearch.flint.spark.sql
 
-import org.antlr.v4.runtime.tree.RuleNode
+import org.antlr.v4.runtime.tree.{ParseTree, RuleNode}
 import org.opensearch.flint.spark.FlintSpark
-import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.PropertyListContext
 import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
 import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder
 
-import org.apache.spark.sql.catalyst.plans.logical.Command
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 /**
- * Flint Spark AST builder that builds Spark command for Flint index statement.
- * This class mix-in all other AST builders and provides util methods.
+ * Flint Spark AST builder that builds Spark command for Flint index statement. This class mixes
+ * in all other AST builders and provides util methods.
  */
 class FlintSparkSqlAstBuilder
-    extends FlintSparkSqlExtensionsBaseVisitor[Command]
+    extends FlintSparkSqlExtensionsBaseVisitor[AnyRef]
     with FlintSparkSkippingIndexAstBuilder
-    with FlintSparkCoveringIndexAstBuilder {
+    with FlintSparkCoveringIndexAstBuilder
+    with SparkSqlAstBuilder {
 
-  override def aggregateResult(aggregate: Command, nextResult: Command): Command =
+  override def visit(tree: ParseTree): LogicalPlan = {
+    tree.accept(this).asInstanceOf[LogicalPlan]
+  }
+
+  override def aggregateResult(aggregate: AnyRef, nextResult: AnyRef): AnyRef =
     if (nextResult != null) nextResult else aggregate
 }
 
 object FlintSparkSqlAstBuilder {
 
-  /**
-   * Check if auto_refresh is true in property list.
-   *
-   * @param ctx
-   *   property list
-   */
-  def isAutoRefreshEnabled(ctx: PropertyListContext): Boolean = {
-    if (ctx == null) {
-      false
-    } else {
-      ctx
-        .property()
-        .forEach(p => {
-          if (p.key.getText == "auto_refresh") {
-            return p.value.getText.toBoolean
-          }
-        })
-      false
-    }
-  }
-
   /**
    * Get full table name if database not specified.
    *
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/SparkSqlAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/SparkSqlAstBuilder.scala
new file mode 100644
index 000000000..4dadd4d5e
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/SparkSqlAstBuilder.scala
@@ -0,0 +1,55 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.sql
+
+import java.util.Locale
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+import org.opensearch.flint.spark.FlintSparkIndexOptions
+import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{PropertyKeyContext, PropertyListContext, PropertyValueContext}
+
+import org.apache.spark.sql.catalyst.parser.ParserUtils.string
+
+/**
+ * AST builder for common rules in the Spark SQL grammar. The main logic is modified slightly
+ * from Spark's AstBuilder code.
+ */ +trait SparkSqlAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] { + + override def visitPropertyList(ctx: PropertyListContext): FlintSparkIndexOptions = { + if (ctx == null) { + FlintSparkIndexOptions.empty + } else { + val properties = ctx.property.asScala.map { property => + val key = visitPropertyKey(property.key) + val value = visitPropertyValue(property.value) + key -> value + } + FlintSparkIndexOptions(properties.toMap) + } + } + + override def visitPropertyKey(key: PropertyKeyContext): String = { + if (key.STRING() != null) { + string(key.STRING()) + } else { + key.getText + } + } + + override def visitPropertyValue(value: PropertyValueContext): String = { + if (value == null) { + null + } else if (value.STRING != null) { + string(value.STRING) + } else if (value.booleanValue != null) { + value.getText.toLowerCase(Locale.ROOT) + } else { + value.getText + } + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala index 7249cf244..65a87c568 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala @@ -9,8 +9,8 @@ import org.antlr.v4.runtime.tree.RuleNode import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.FlintSpark.RefreshMode import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex -import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor} -import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, isAutoRefreshEnabled} +import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} +import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.getFullTableName import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._ import org.apache.spark.sql.Row @@ -21,7 +21,8 @@ import org.apache.spark.sql.types.StringType /** * Flint Spark AST builder that builds Spark command for Flint covering index statement. 
  */
-trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[Command] {
+trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
+  self: SparkSqlAstBuilder =>
 
   override def visitCreateCoveringIndexStatement(
       ctx: CreateCoveringIndexStatementContext): Command = {
@@ -40,10 +41,13 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[C
       }
 
       val ignoreIfExists = ctx.EXISTS() != null
-      indexBuilder.create(ignoreIfExists)
+      val indexOptions = visitPropertyList(ctx.propertyList())
+      indexBuilder
+        .options(indexOptions)
+        .create(ignoreIfExists)
 
       // Trigger auto refresh if enabled
-      if (isAutoRefreshEnabled(ctx.propertyList())) {
+      if (indexOptions.autoRefresh()) {
         val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
         flint.refreshIndex(flintIndexName, RefreshMode.INCREMENTAL)
       }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
index 2a3b6e6c1..dc8132a25 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
@@ -11,8 +11,8 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
-import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor}
-import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, isAutoRefreshEnabled}
+import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
+import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.getFullTableName
 import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
 
 import org.apache.spark.sql.Row
@@ -23,7 +23,8 @@ import org.apache.spark.sql.types.StringType
 /**
  * Flint Spark AST builder that builds Spark command for Flint skipping index statement.
  */
-trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[Command] {
+trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
+  self: SparkSqlAstBuilder =>
 
   override def visitCreateSkippingIndexStatement(
       ctx: CreateSkippingIndexStatementContext): Command =
@@ -44,10 +45,13 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[C
       }
 
       val ignoreIfExists = ctx.EXISTS() != null
-      indexBuilder.create(ignoreIfExists)
+      val indexOptions = visitPropertyList(ctx.propertyList())
+      indexBuilder
+        .options(indexOptions)
+        .create(ignoreIfExists)
 
       // Trigger auto refresh if enabled
-      if (isAutoRefreshEnabled(ctx.propertyList())) {
+      if (indexOptions.autoRefresh()) {
        val indexName = getSkippingIndexName(flint, ctx.tableName)
        flint.refreshIndex(indexName, RefreshMode.INCREMENTAL)
      }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
index 20e4dca24..140ecdd77 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
@@ -56,7 +56,8 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
        | "columnName": "age",
        | "columnType": "int"
        | }],
-       | "source": "default.ci_test"
+       | "source": "default.ci_test",
+       | "options": {}
        | },
        | "properties": {
        |  "name": {
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index 5d31d8724..7d5598cd5 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -78,7 +78,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
        | "columnName": "age",
        | "columnType": "int"
        | }],
-       | "source": "default.test"
+       | "source": "default.test",
+       | "options": {}
        | },
        | "properties": {
        |  "year": {
@@ -102,6 +103,27 @@
        | }
        | }
        |""".stripMargin)
+
+    index.get.options shouldBe FlintSparkIndexOptions.empty
   }
 
+  test("create skipping index with index options successfully") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addValueSet("address")
+      .options(FlintSparkIndexOptions(Map(
+        "auto_refresh" -> "true",
+        "refresh_interval" -> "1 Minute",
+        "checkpoint_location" -> "s3a://test/"
+      )))
+      .create()
+
+    val index = flint.describeIndex(testIndex)
+    index shouldBe defined
+    index.get.options.autoRefresh() shouldBe true
+    index.get.options.refreshInterval() shouldBe Some("1 Minute")
+    index.get.options.checkpointLocation() shouldBe Some("s3a://test/")
+  }
+
   test("should not have ID column in index data") {
@@ -484,7 +506,8 @@
        | "columnName": "struct_col",
        | "columnType": "struct"
        | }],
-       | "source": "$testTable"
+       | "source": "$testTable",
+       | "options": {}
        | },
        | "properties": {
        |  "boolean_col": {
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
index f4c7fb13c..eec9dfca9 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
@@ -7,7 +7,6 @@ package org.opensearch.flint.spark
 
 import scala.Option.empty
 
-import org.opensearch.flint.OpenSearchSuite
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
 import org.scalatest.matchers.must.Matchers.defined
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
@@ -56,6 +55,26 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
     indexData.count() shouldBe 2
   }
 
+  test("create skipping index with streaming job options") {
+    withTempDir { checkpointDir =>
+      sql(s"""
+           | CREATE SKIPPING INDEX ON $testTable
+           | ( year PARTITION )
+           | WITH (
+           |   auto_refresh = true,
+           |   refresh_interval = '5 Seconds',
+           |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
+           | )
+           | """.stripMargin)
+
+      val index = flint.describeIndex(testIndex)
+      index shouldBe defined
+      index.get.options.autoRefresh() shouldBe true
+      index.get.options.refreshInterval() shouldBe Some("5 Seconds")
+      index.get.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)
+    }
+  }
+
   test("create skipping index with manual refresh") {
     sql(s"""
          | CREATE SKIPPING INDEX ON $testTable