diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index 2f9ec76e2..914a948fd 100644 --- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -113,7 +113,11 @@ materializedViewQuery ; whereClause - : WHERE .+? + : WHERE filterCondition + ; + +filterCondition + : .+? ; indexColTypeList diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala index cda11405c..6d680ae39 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala @@ -65,6 +65,7 @@ object FlintSparkIndexFactory { metadata.indexedColumns.map { colInfo => getString(colInfo, "columnName") -> getString(colInfo, "columnType") }.toMap, + getOptString(metadata.properties, "filterCondition"), indexOptions) case MV_INDEX_TYPE => FlintSparkMaterializedView( @@ -80,4 +81,13 @@ object FlintSparkIndexFactory { private def getString(map: java.util.Map[String, AnyRef], key: String): String = { map.get(key).asInstanceOf[String] } + + private def getOptString(map: java.util.Map[String, AnyRef], key: String): Option[String] = { + val value = map.get(key) + if (value == null) { + None + } else { + Some(value.asInstanceOf[String]) + } + } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index e9c2b5be5..91272309f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -29,6 +29,7 @@ case class FlintSparkCoveringIndex( indexName: String, tableName: String, indexedColumns: Map[String, String], + filterCondition: Option[String] = None, override val options: FlintSparkIndexOptions = empty) extends FlintSparkIndex { @@ -46,17 +47,25 @@ case class FlintSparkCoveringIndex( } val schemaJson = generateSchemaJSON(indexedColumns) - metadataBuilder(this) + val builder = metadataBuilder(this) .name(indexName) .source(tableName) .indexedColumns(indexColumnMaps) .schema(schemaJson) - .build() + + // Add optional index properties + filterCondition.map(builder.addProperty("filterCondition", _)) + builder.build() } override def build(spark: SparkSession, df: Option[DataFrame]): DataFrame = { val colNames = indexedColumns.keys.toSeq - df.getOrElse(spark.read.table(tableName)) + val job = df.getOrElse(spark.read.table(tableName)) + + // Add optional filtering condition + filterCondition + .map(job.where) + .getOrElse(job) .select(colNames.head, colNames.tail: _*) } } @@ -95,6 +104,7 @@ object FlintSparkCoveringIndex { class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) { private var indexName: String = "" private var indexedColumns: Map[String, String] = Map() + private var filterCondition: Option[String] = None /** * Set covering index name. @@ -137,7 +147,25 @@ object FlintSparkCoveringIndex { this } + /** + * Add filtering condition. 
+ * + * @param condition + * filter condition + * @return + * index builder + */ + def filterBy(condition: String): Builder = { + filterCondition = Some(condition) + this + } + override protected def buildIndex(): FlintSparkIndex = - new FlintSparkCoveringIndex(indexName, tableName, indexedColumns, indexOptions) + new FlintSparkCoveringIndex( + indexName, + tableName, + indexedColumns, + filterCondition, + indexOptions) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala index 1ccdd6f07..83a816a58 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala @@ -27,12 +27,6 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A override def visitCreateCoveringIndexStatement( ctx: CreateCoveringIndexStatementContext): Command = { FlintSparkSqlCommand() { flint => - // TODO: support filtering condition - if (ctx.whereClause() != null) { - throw new UnsupportedOperationException( - s"Filtering condition is not supported: ${getSqlText(ctx.whereClause())}") - } - val indexName = ctx.indexName.getText val tableName = getFullTableName(flint, ctx.tableName) val indexBuilder = @@ -46,6 +40,10 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A indexBuilder.addIndexColumns(colName) } + if (ctx.whereClause() != null) { + indexBuilder.filterBy(getSqlText(ctx.whereClause().filterCondition())) + } + val ignoreIfExists = ctx.EXISTS() != null val indexOptions = visitPropertyList(ctx.propertyList()) indexBuilder diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParserSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParserSuite.scala index 7190c22bb..87ea34582 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParserSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParserSuite.scala @@ -22,7 +22,7 @@ class FlintSparkSqlParserSuite extends FlintSuite with Matchers { } should have message "Filtering condition is not supported: WHERE status != 200" } - test("create covering index with filtering condition") { + ignore("create covering index with filtering condition") { the[UnsupportedOperationException] thrownBy { sql(""" | CREATE INDEX test ON alb_logs diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index a4b0069dd..c79069b9b 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -40,6 +40,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { .name(testIndex) .onTable(testTable) .addIndexColumns("name", "age") + .filterBy("age > 30") .create() val index = flint.describeIndex(testFlintIndex) @@ -60,7 +61,9 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { | }], | "source": "spark_catalog.default.ci_test", | "options": { "auto_refresh": "false" }, - | 
"properties": {} + | "properties": { + | "filterCondition": "age > 30" + | } | }, | "properties": { | "name": { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 0d3f7a887..14235b455 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -58,6 +58,22 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { indexData.count() shouldBe 2 } + test("create covering index with filtering condition") { + sql(s""" + | CREATE INDEX $testIndex ON $testTable + | (name, age) + | WHERE address = 'Portland' + | WITH (auto_refresh = true) + |""".stripMargin) + + // Wait for streaming job complete current micro batch + val job = spark.streams.active.find(_.name == testFlintIndex) + awaitStreamingComplete(job.get.id.toString) + + val indexData = flint.queryIndex(testFlintIndex) + indexData.count() shouldBe 1 + } + test("create covering index with streaming job options") { withTempDir { checkpointDir => sql(s"""