From 7b398003a48b73e44cd67395e52b4b3126e8e83b Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 2 Nov 2023 10:59:46 -0700 Subject: [PATCH 01/13] Add filter condition in Flint API layer and fix IT Signed-off-by: Chen Dai --- .../flint/spark/FlintSparkIndexFactory.scala | 6 ++++- .../skipping/FlintSparkSkippingIndex.scala | 26 ++++++++++++++++--- .../FlintSparkSkippingIndexAstBuilder.scala | 7 ----- .../FlintSparkSkippingIndexSuite.scala | 2 +- .../FlintSparkSkippingIndexITSuite.scala | 9 ++++--- 5 files changed, 34 insertions(+), 16 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala index 6d680ae39..503414e85 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala @@ -57,7 +57,11 @@ object FlintSparkIndexFactory { throw new IllegalStateException(s"Unknown skipping strategy: $other") } } - FlintSparkSkippingIndex(metadata.source, strategies, indexOptions) + FlintSparkSkippingIndex( + metadata.source, + strategies, + getOptString(metadata.properties, "filterCondition"), + indexOptions) case COVERING_INDEX_TYPE => FlintSparkCoveringIndex( metadata.name, diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index eb2075b63..e79aa7ddd 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.functions.{col, input_file_name, sha1} case class FlintSparkSkippingIndex( tableName: String, indexedColumns: Seq[FlintSparkSkippingStrategy], + filterCondition: Option[String] = None, override val options: FlintSparkIndexOptions = empty) extends FlintSparkIndex { @@ -59,12 +60,15 @@ case class FlintSparkSkippingIndex( .toMap + (FILE_PATH_COLUMN -> "string") val schemaJson = generateSchemaJSON(fieldTypes) - metadataBuilder(this) - .name(name()) + val builder = metadataBuilder(this) + .name("") // skipping index is unique per table without name .source(tableName) .indexedColumns(indexColumnMaps) .schema(schemaJson) - .build() + + // Add optional index properties + filterCondition.map(builder.addProperty("filterCondition", _)) + builder.build() } override def build(spark: SparkSession, df: Option[DataFrame]): DataFrame = { @@ -118,6 +122,7 @@ object FlintSparkSkippingIndex { /** Builder class for skipping index build */ class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) { private var indexedColumns: Seq[FlintSparkSkippingStrategy] = Seq() + private var filterCondition: Option[String] = None /** * Configure which source table the index is based on. @@ -181,8 +186,21 @@ object FlintSparkSkippingIndex { this } + /** + * Add filtering condition. 
+ * + * @param condition + * filter condition + * @return + * index builder + */ + def filterBy(condition: String): Builder = { + filterCondition = Some(condition) + this + } + override def buildIndex(): FlintSparkIndex = - new FlintSparkSkippingIndex(tableName, indexedColumns, indexOptions) + new FlintSparkSkippingIndex(tableName, indexedColumns, filterCondition, indexOptions) private def addIndexedColumn(indexedCol: FlintSparkSkippingStrategy): Unit = { require( diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala index 2b0bb6c48..21288e599 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala @@ -29,13 +29,6 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A override def visitCreateSkippingIndexStatement( ctx: CreateSkippingIndexStatementContext): Command = FlintSparkSqlCommand() { flint => - // TODO: support filtering condition - if (ctx.whereClause() != null) { - throw new UnsupportedOperationException( - s"Filtering condition is not supported: ${getSqlText(ctx.whereClause())}") - } - - // Create skipping index val indexBuilder = flint .skippingIndex() .onTable(getFullTableName(flint, ctx.tableName)) diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index d52c43842..37e9e4395 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -40,7 +40,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { val metadata = index.metadata() metadata.kind shouldBe SKIPPING_INDEX_TYPE - metadata.name shouldBe index.name() + metadata.name shouldBe "" metadata.source shouldBe testTable metadata.indexedColumns shouldBe Array( Map( diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index e3fb467e6..136a76503 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -48,13 +48,14 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .addPartitions("year", "month") .addValueSet("address") .addMinMax("age") + .filterBy("age > 30") .create() val index = flint.describeIndex(testIndex) index shouldBe defined index.get.metadata().getContent should matchJson(s"""{ | "_meta": { - | "name": "flint_spark_catalog_default_test_skipping_index", + | "name": "", | "version": "${current()}", | "kind": "skipping", | "indexedColumns": [ @@ -80,7 +81,9 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | }], | "source": "spark_catalog.default.test", | "options": { "auto_refresh": "false" }, - | "properties": {} + | "properties": { + | "filterCondition": "age > 30" + | } | }, | "properties": { | "year": { @@ -452,7 
+455,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { index.get.metadata().getContent should matchJson( s"""{ | "_meta": { - | "name": "flint_spark_catalog_default_data_type_table_skipping_index", + | "name": "", | "version": "${current()}", | "kind": "skipping", | "indexedColumns": [ From 7bafbc7971236054c60f965cdd82a28014e7ac7b Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 2 Nov 2023 11:14:21 -0700 Subject: [PATCH 02/13] Add ast builder and SQL IT Signed-off-by: Chen Dai --- .../skipping/FlintSparkSkippingIndex.scala | 9 +++++- .../FlintSparkSkippingIndexAstBuilder.scala | 4 +++ .../FlintSparkSkippingIndexSqlITSuite.scala | 28 +++++++++++++------ .../flint/spark/FlintSparkSuite.scala | 16 +++++++++-- 4 files changed, 45 insertions(+), 12 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index e79aa7ddd..3a555db01 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -81,7 +81,14 @@ case class FlintSparkSkippingIndex( new Column(aggFunc.toAggregateExpression().as(name)) } - df.getOrElse(spark.read.table(tableName)) + var job = df.getOrElse(spark.read.table(tableName)) + + // Add optional filtering condition + if (filterCondition.isDefined) { + job = job.where(filterCondition.get) + } + + job .groupBy(input_file_name().as(FILE_PATH_COLUMN)) .agg(namedAggFuncs.head, namedAggFuncs.tail: _*) .withColumn(ID_COLUMN, sha1(col(FILE_PATH_COLUMN))) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala index 21288e599..558615c91 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala @@ -43,6 +43,10 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A } } + if (ctx.whereClause() != null) { + indexBuilder.filterBy(getSqlText(ctx.whereClause().filterCondition())) + } + val ignoreIfExists = ctx.EXISTS() != null val indexOptions = visitPropertyList(ctx.propertyList()) indexBuilder diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 21de15de7..8975b7072 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -14,7 +14,7 @@ import org.json4s.native.Serialization import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName -import org.scalatest.matchers.must.Matchers.{defined, have} +import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row @@ -50,18 +50,28 @@ 
class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { | WITH (auto_refresh = true) | """.stripMargin) - // Wait for streaming job complete current micro batch - val job = spark.streams.active.find(_.name == testIndex) - job shouldBe defined - failAfter(streamingTimeout) { - job.get.processAllAvailable() - } - - val indexData = spark.read.format(FLINT_DATASOURCE).load(testIndex) + val indexData = awaitStreamingDataComplete(testIndex) flint.describeIndex(testIndex) shouldBe defined indexData.count() shouldBe 2 } + test("create skipping index with filtering condition") { + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( + | year PARTITION, + | name VALUE_SET, + | age MIN_MAX + | ) + | WHERE address = 'Portland' + | WITH (auto_refresh = true) + | """.stripMargin) + + val indexData = awaitStreamingDataComplete(testIndex) + flint.describeIndex(testIndex) shouldBe defined + indexData.count() shouldBe 1 + } + test("create skipping index with streaming job options") { withTempDir { checkpointDir => sql(s""" diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 168279eb3..3b6accd83 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -6,11 +6,13 @@ package org.opensearch.flint.spark import org.opensearch.flint.OpenSearchSuite - import org.apache.spark.FlintSuite -import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.{DataFrame, QueryTest} +import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY} import org.apache.spark.sql.streaming.StreamTest +import org.scalatest.matchers.must.Matchers.defined +import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper /** * Flint Spark suite trait that initializes [[FlintSpark]] API instance. 
@@ -31,6 +33,16 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit setFlintSparkConf(CHECKPOINT_MANDATORY, "false") } + protected def awaitStreamingDataComplete(flintIndexName: String): DataFrame = { + val job = spark.streams.active.find(_.name == flintIndexName) + job shouldBe defined + + failAfter(streamingTimeout) { + job.get.processAllAvailable() + } + spark.read.format(FLINT_DATASOURCE).load(flintIndexName) + } + protected def awaitStreamingComplete(jobId: String): Unit = { val job = spark.streams.get(jobId) failAfter(streamingTimeout) { From ea8b2a7e13531e5ec922f60314422cdbccd098e0 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 2 Nov 2023 11:16:00 -0700 Subject: [PATCH 03/13] Remove unused parser IT Signed-off-by: Chen Dai --- .../spark/sql/FlintSparkSqlParserSuite.scala | 35 ------------------- 1 file changed, 35 deletions(-) delete mode 100644 flint-spark-integration/src/test/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParserSuite.scala diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParserSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParserSuite.scala deleted file mode 100644 index 87ea34582..000000000 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParserSuite.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.spark.sql - -import org.scalatest.matchers.should.Matchers - -import org.apache.spark.FlintSuite - -class FlintSparkSqlParserSuite extends FlintSuite with Matchers { - - test("create skipping index with filtering condition") { - the[UnsupportedOperationException] thrownBy { - sql(""" - | CREATE SKIPPING INDEX ON alb_logs - | (client_ip VALUE_SET) - | WHERE status != 200 - | WITH (auto_refresh = true) - |""".stripMargin) - } should have message "Filtering condition is not supported: WHERE status != 200" - } - - ignore("create covering index with filtering condition") { - the[UnsupportedOperationException] thrownBy { - sql(""" - | CREATE INDEX test ON alb_logs - | (elb, client_ip) - | WHERE status != 404 - | WITH (auto_refresh = true) - |""".stripMargin) - } should have message "Filtering condition is not supported: WHERE status != 404" - } -} From bc50661dee786358451718838bb07ca570213de4 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 2 Nov 2023 11:21:33 -0700 Subject: [PATCH 04/13] Fix scalafmt Signed-off-by: Chen Dai --- .../scala/org/opensearch/flint/spark/FlintSparkSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 3b6accd83..33650b899 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -6,13 +6,14 @@ package org.opensearch.flint.spark import org.opensearch.flint.OpenSearchSuite +import org.scalatest.matchers.must.Matchers.defined +import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper + import org.apache.spark.FlintSuite import org.apache.spark.sql.{DataFrame, QueryTest} import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY} import 
org.apache.spark.sql.streaming.StreamTest -import org.scalatest.matchers.must.Matchers.defined -import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper /** * Flint Spark suite trait that initializes [[FlintSpark]] API instance. From cf4034a186536d37e574ae4156934fbc57f8fdc4 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Mon, 6 Nov 2023 09:24:14 -0800 Subject: [PATCH 05/13] Add conjunction check for skipping and covering index Signed-off-by: Chen Dai --- .../flint/spark/FlintSparkIndexUtils.scala | 41 +++++++++++++++++++ .../covering/FlintSparkCoveringIndex.scala | 2 + .../ApplyFlintSparkSkippingIndex.scala | 11 ++--- .../skipping/FlintSparkSkippingIndex.scala | 10 ++++- .../FlintSparkCoveringIndexSuite.scala | 18 ++++++++ .../FlintSparkSkippingIndexSuite.scala | 16 ++++++++ 6 files changed, 88 insertions(+), 10 deletions(-) create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexUtils.scala diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexUtils.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexUtils.scala new file mode 100644 index 000000000..6b19aa092 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexUtils.scala @@ -0,0 +1,41 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import org.apache.spark.sql.catalyst.expressions.{Expression, Or} +import org.apache.spark.sql.functions.expr + +/** + * Flint Spark index utility methods. + */ +object FlintSparkIndexUtils { + + /** + * Is the given Spark predicate string a conjunction + * + * @param condition + * predicate condition string + * @return + * true if yes, otherwise false + */ + def isConjunction(condition: String): Boolean = { + isConjunction(expr(condition).expr) + } + + /** + * Is the given Spark predicate a conjunction + * + * @param condition + * predicate condition + * @return + * true if yes, otherwise false + */ + def isConjunction(condition: Expression): Boolean = { + condition.collectFirst { case Or(_, _) => + true + }.isEmpty + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index 91272309f..845af17f2 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -11,6 +11,7 @@ import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark._ import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty +import org.opensearch.flint.spark.FlintSparkIndexUtils.isConjunction import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE} import org.apache.spark.sql._ @@ -34,6 +35,7 @@ case class FlintSparkCoveringIndex( extends FlintSparkIndex { require(indexedColumns.nonEmpty, "indexed columns must not be empty") + require(filterCondition.forall(isConjunction), "filtering condition must be conjunction") override val kind: String = COVERING_INDEX_TYPE diff --git 
a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala index 11f8ad304..b29307ed7 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala @@ -6,10 +6,11 @@ package org.opensearch.flint.spark.skipping import org.opensearch.flint.spark.FlintSpark +import org.opensearch.flint.spark.FlintSparkIndexUtils.isConjunction import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE} import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{And, Expression, Or, Predicate} +import org.apache.spark.sql.catalyst.expressions.{And, Expression, Predicate} import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} @@ -33,7 +34,7 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] _, Some(table), false)) - if hasNoDisjunction(condition) && !location.isInstanceOf[FlintSparkSkippingFileIndex] => + if isConjunction(condition) && !location.isInstanceOf[FlintSparkSkippingFileIndex] => val index = flint.describeIndex(getIndexName(table)) if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) { val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex] @@ -67,12 +68,6 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] getSkippingIndexName(qualifiedTableName) } - private def hasNoDisjunction(condition: Expression): Boolean = { - condition.collectFirst { case Or(_, _) => - true - }.isEmpty - } - private def rewriteToIndexFilter( index: FlintSparkSkippingIndex, condition: Expression): Option[Expression] = { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index 3a555db01..0956f9273 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -11,6 +11,7 @@ import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark._ import org.opensearch.flint.spark.FlintSparkIndex._ import org.opensearch.flint.spark.FlintSparkIndexOptions.empty +import org.opensearch.flint.spark.FlintSparkIndexUtils.isConjunction import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE} import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy @@ -18,7 +19,7 @@ import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression -import org.apache.spark.sql.functions.{col, input_file_name, sha1} +import org.apache.spark.sql.functions.{col, expr, input_file_name, sha1} /** * Flint skipping index in Spark. 
@@ -36,6 +37,7 @@ case class FlintSparkSkippingIndex( extends FlintSparkIndex { require(indexedColumns.nonEmpty, "indexed columns must not be empty") + require(filterCondition.forall(isConjunction), "filtering condition must be conjunction") /** Skipping index type */ override val kind: String = SKIPPING_INDEX_TYPE @@ -85,7 +87,11 @@ case class FlintSparkSkippingIndex( // Add optional filtering condition if (filterCondition.isDefined) { - job = job.where(filterCondition.get) + if (isConjunction(expr(filterCondition.get).expr)) { // TODO: do the same for covering and add UT/IT + job = job.where(filterCondition.get) + } else { + throw new IllegalStateException("Filtering condition is not conjunction") + } } job diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala index 8c144b46b..819052b0c 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala @@ -24,6 +24,24 @@ class FlintSparkCoveringIndexSuite extends FlintSuite { } } + test("should succeed if filtering condition is conjunction") { + new FlintSparkCoveringIndex( + "ci", + "test", + Map("name" -> "string"), + Some("test_field1 = 1 AND test_field2 = 2")) + } + + test("should fail if filtering condition is not conjunction") { + assertThrows[IllegalArgumentException] { + new FlintSparkCoveringIndex( + "ci", + "test", + Map("name" -> "string"), + Some("test_field1 = 1 OR test_field2 = 2")) + } + } + test("should fail if no indexed column given") { assertThrows[IllegalArgumentException] { new FlintSparkCoveringIndex("ci", "default.test", Map.empty) diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index 37e9e4395..c87d91999 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -49,6 +49,22 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { "columnType" -> "integer").asJava) } + test("should succeed if filtering condition is conjunction") { + new FlintSparkSkippingIndex( + testTable, + Seq(mock[FlintSparkSkippingStrategy]), + Some("test_field1 = 1 AND test_field2 = 2")) + } + + test("should fail if filtering condition is not conjunction") { + assertThrows[IllegalArgumentException] { + new FlintSparkSkippingIndex( + testTable, + Seq(mock[FlintSparkSkippingStrategy]), + Some("test_field1 = 1 OR test_field2 = 2")) + } + } + test("can build index building job with unique ID column") { val indexCol = mock[FlintSparkSkippingStrategy] when(indexCol.outputSchema()).thenReturn(Map("name" -> "string")) From 87f53b2ceca498b02e764abf44a13b9abf6c4520 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 8 Nov 2023 21:09:42 -0800 Subject: [PATCH 06/13] Enforce hybrid scan if skipping index is partial Signed-off-by: Chen Dai --- .../spark/skipping/ApplyFlintSparkSkippingIndex.scala | 9 ++++++++- .../spark/skipping/FlintSparkSkippingFileIndex.scala | 5 +++-- .../skipping/ApplyFlintSparkSkippingIndexSuite.scala | 1 + 3 files 
changed, 12 insertions(+), 3 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala index b29307ed7..eee5999ef 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala @@ -14,6 +14,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, Expression, Predicate} import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.flint.config.FlintSparkConf import org.apache.spark.sql.flint.qualifyTableName /** @@ -48,8 +49,14 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] * |- FileIndex <== replaced with FlintSkippingFileIndex */ if (indexFilter.isDefined) { + // Enforce hybrid scan if skipping index is partial + val isHybridScan = + if (skippingIndex.filterCondition.isDefined) true + else FlintSparkConf().isHybridScanEnabled + val indexScan = flint.queryIndex(skippingIndex.name()) - val fileIndex = FlintSparkSkippingFileIndex(location, indexScan, indexFilter.get) + val fileIndex = + FlintSparkSkippingFileIndex(location, indexScan, indexFilter.get, isHybridScan) val indexRelation = baseRelation.copy(location = fileIndex)(baseRelation.sparkSession) filter.copy(child = relation.copy(relation = indexRelation)) } else { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala index bd7abcfb3..0fd7b9484 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala @@ -26,7 +26,8 @@ import org.apache.spark.sql.types.StructType case class FlintSparkSkippingFileIndex( baseFileIndex: FileIndex, indexScan: DataFrame, - indexFilter: Expression) + indexFilter: Expression, + isHybridScanMode: Boolean = FlintSparkConf().isHybridScanEnabled) extends FileIndex { override def listFiles( @@ -36,7 +37,7 @@ case class FlintSparkSkippingFileIndex( // TODO: make this listFile call only in hybrid scan mode val partitions = baseFileIndex.listFiles(partitionFilters, dataFilters) val selectedFiles = - if (FlintSparkConf().isHybridScanEnabled) { + if (isHybridScanMode) { selectFilesFromIndexAndSource(partitions) } else { selectFilesFromIndexOnly() diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala index f9455fbfa..bbc214832 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala @@ -144,6 +144,7 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers { 
when(skippingIndex.kind).thenReturn(SKIPPING_INDEX_TYPE) when(skippingIndex.name()).thenReturn(indexName) when(skippingIndex.indexedColumns).thenReturn(indexCols.map(FakeSkippingStrategy)) + when(skippingIndex.filterCondition).thenReturn(None) when(flint.describeIndex(any())).thenReturn(Some(skippingIndex)) this From 194f1aebd9f782e6cd234a7485c28299f2ec0267 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 9 Nov 2023 10:12:52 -0800 Subject: [PATCH 07/13] Add IT for partial index query rewrite Signed-off-by: Chen Dai --- .../skipping/FlintSparkSkippingIndex.scala | 8 +-- .../FlintSparkSkippingIndexITSuite.scala | 51 +++++++++++++++++-- 2 files changed, 49 insertions(+), 10 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index 0956f9273..0a8e98fc5 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -19,7 +19,7 @@ import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression -import org.apache.spark.sql.functions.{col, expr, input_file_name, sha1} +import org.apache.spark.sql.functions.{col, input_file_name, sha1} /** * Flint skipping index in Spark. @@ -87,11 +87,7 @@ case class FlintSparkSkippingIndex( // Add optional filtering condition if (filterCondition.isDefined) { - if (isConjunction(expr(filterCondition.get).expr)) { // TODO: do the same for covering and add UT/IT - job = job.where(filterCondition.get) - } else { - throw new IllegalStateException("Filtering condition is not conjunction") - } + job = job.where(filterCondition.get) } job diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 136a76503..90e7caecb 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -267,7 +267,9 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { checkAnswer(query, Row("Hello")) query.queryExecution.executedPlan should - useFlintSparkSkippingFileIndex(hasIndexFilter(col("year") === 2023 && col("month") === 4)) + useFlintSparkSkippingFileIndex( + hasIndexFilter(col("year") === 2023 && col("month") === 4) + and hasScanMode(isHybridScanExpected = false)) } test("can build value set skipping index and rewrite applicable query") { @@ -286,7 +288,9 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { checkAnswer(query, Row("World")) query.queryExecution.executedPlan should - useFlintSparkSkippingFileIndex(hasIndexFilter(col("address") === "Portland")) + useFlintSparkSkippingFileIndex( + hasIndexFilter(col("address") === "Portland") + and hasScanMode(isHybridScanExpected = false)) } test("can build min max skipping index and rewrite applicable query") { @@ -306,7 +310,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { checkAnswer(query, Row("World")) query.queryExecution.executedPlan should useFlintSparkSkippingFileIndex( - hasIndexFilter(col("MinMax_age_0") <= 25 && col("MinMax_age_1") >= 25)) + 
hasIndexFilter(col("MinMax_age_0") <= 25 && col("MinMax_age_1") >= 25) + and hasScanMode(isHybridScanExpected = false)) } test("should rewrite applicable query with table name without database specified") { @@ -325,7 +330,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { query.queryExecution.executedPlan should useFlintSparkSkippingFileIndex( - hasIndexFilter(col("year") === 2023)) + hasIndexFilter(col("year") === 2023) + and hasScanMode(isHybridScanExpected = false)) } test("should not rewrite original query if filtering condition has disjunction") { @@ -379,10 +385,36 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | WHERE month = 4 |""".stripMargin) + query.queryExecution.executedPlan should + useFlintSparkSkippingFileIndex( + hasIndexFilter(col("month") === 4) + and hasScanMode(isHybridScanExpected = true)) + checkAnswer(query, Seq(Row("Seattle"), Row("Vancouver"))) } } + test("should rewrite applicable query to scan latest source files if partial index") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("month") + .filterBy("month > 4") + .create() + flint.refreshIndex(testIndex, FULL) + + val query = sql(s""" + | SELECT address + | FROM $testTable + | WHERE month = 4 + |""".stripMargin) + + query.queryExecution.executedPlan should + useFlintSparkSkippingFileIndex( + hasIndexFilter(col("month") === 4) + and hasScanMode(isHybridScanExpected = true)) + } + test("should return empty if describe index not exist") { flint.describeIndex("non-exist") shouldBe empty } @@ -670,6 +702,17 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { } } + def hasScanMode(isHybridScanExpected: Boolean): Matcher[FlintSparkSkippingFileIndex] = { + Matcher { (fileIndex: FlintSparkSkippingFileIndex) => + val hasExpectedScanMode = fileIndex.isHybridScanMode == isHybridScanExpected + + MatchResult( + hasExpectedScanMode, + "FlintSparkSkippingFileIndex does not have expected scan mode", + "FlintSparkSkippingFileIndex has expected scan mode") + } + } + private def withFlintOptimizerDisabled(block: => Unit): Unit = { spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false") try { From c65671edad9bed6191f186769367891ec31220be Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 9 Nov 2023 10:35:51 -0800 Subject: [PATCH 08/13] Add UT for partial skipping index Signed-off-by: Chen Dai --- .../ApplyFlintSparkSkippingIndexSuite.scala | 39 ++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala index bbc214832..dc2590fb2 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala @@ -78,7 +78,16 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers { .withSourceTable(testTable, testSchema) .withFilter(EqualTo(nameCol, Literal("hello"))) .withSkippingIndex(testIndex, "name") - .shouldPushDownAfterRewrite(col("name") === "hello") + .shouldPushDownAfterRewrite(col("name") === "hello", isHybridScanExpected = false) + } + + test("should rewrite query with partial skipping index") { + assertFlintQueryRewriter() + .withSourceTable(testTable, testSchema) + .withFilter(EqualTo(nameCol, Literal("hello"))) + 
.withSkippingIndex(testIndex, "name") + .withSkippingIndexFilter("age > 30") + .shouldPushDownAfterRewrite(col("name") === "hello", isHybridScanExpected = true) } test("should only push down filter condition with indexed column") { @@ -86,7 +95,7 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers { .withSourceTable(testTable, testSchema) .withFilter(And(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30)))) .withSkippingIndex(testIndex, "name") - .shouldPushDownAfterRewrite(col("name") === "hello") + .shouldPushDownAfterRewrite(col("name") === "hello", isHybridScanExpected = false) } test("should push down all filter conditions with indexed column") { @@ -94,7 +103,9 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers { .withSourceTable(testTable, testSchema) .withFilter(And(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30)))) .withSkippingIndex(testIndex, "name", "age") - .shouldPushDownAfterRewrite(col("name") === "hello" && col("age") === 30) + .shouldPushDownAfterRewrite( + col("name") === "hello" && col("age") === 30, + isHybridScanExpected = false) assertFlintQueryRewriter() .withSourceTable(testTable, testSchema) @@ -104,7 +115,8 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers { And(EqualTo(ageCol, Literal(30)), EqualTo(addressCol, Literal("Seattle"))))) .withSkippingIndex(testIndex, "name", "age", "address") .shouldPushDownAfterRewrite( - col("name") === "hello" && col("age") === 30 && col("address") === "Seattle") + col("name") === "hello" && col("age") === 30 && col("address") === "Seattle", + isHybridScanExpected = false) } private def assertFlintQueryRewriter(): AssertionHelper = { @@ -118,6 +130,7 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers { .thenReturn("spark_catalog") mockFlint } + private val skippingIndex = mock[FlintSparkSkippingIndex] private val rule = new ApplyFlintSparkSkippingIndex(flint) private var relation: LogicalRelation = _ private var plan: LogicalPlan = _ @@ -140,7 +153,6 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers { } def withSkippingIndex(indexName: String, indexCols: String*): AssertionHelper = { - val skippingIndex = mock[FlintSparkSkippingIndex] when(skippingIndex.kind).thenReturn(SKIPPING_INDEX_TYPE) when(skippingIndex.name()).thenReturn(indexName) when(skippingIndex.indexedColumns).thenReturn(indexCols.map(FakeSkippingStrategy)) @@ -150,13 +162,18 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers { this } + def withSkippingIndexFilter(filterCondition: String): AssertionHelper = { + when(skippingIndex.filterCondition).thenReturn(Some(filterCondition)) + this + } + def withNoSkippingIndex(): AssertionHelper = { when(flint.describeIndex(any())).thenReturn(None) this } - def shouldPushDownAfterRewrite(expected: Column): Unit = { - rule.apply(plan) should pushDownFilterToIndexScan(expected) + def shouldPushDownAfterRewrite(expectCol: Column, isHybridScanExpected: Boolean): Unit = { + rule.apply(plan) should pushDownFilterToIndexScan(expectCol, isHybridScanExpected) } def shouldNotRewrite(): Unit = { @@ -181,14 +198,18 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers { baseRelation } - private def pushDownFilterToIndexScan(expect: Column): Matcher[LogicalPlan] = { + private def pushDownFilterToIndexScan( + expectCol: Column, + isHybridScanExpected: Boolean): Matcher[LogicalPlan] = { Matcher { (plan: LogicalPlan) => val 
useFlintSparkSkippingFileIndex = plan.exists { case LogicalRelation( HadoopFsRelation(fileIndex: FlintSparkSkippingFileIndex, _, _, _, _, _), _, _, - _) if fileIndex.indexFilter.semanticEquals(expect.expr) => + _) + if (fileIndex.indexFilter.semanticEquals( + expectCol.expr)) && (fileIndex.isHybridScanMode == isHybridScanExpected) => true case _ => false } From e61f3dd854c6852852677d9b95e6b0fcab70233e Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 9 Nov 2023 16:08:38 -0800 Subject: [PATCH 09/13] Fix hybrid scan bug Signed-off-by: Chen Dai --- .../FlintSparkSkippingFileIndex.scala | 5 +- .../FlintSparkSkippingIndexITSuite.scala | 16 +++- .../FlintSparkSkippingIndexSqlITSuite.scala | 92 ++++++++++++++++--- 3 files changed, 94 insertions(+), 19 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala index 0fd7b9484..b53324f4c 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala @@ -12,7 +12,7 @@ import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} import org.apache.spark.sql.flint.config.FlintSparkConf -import org.apache.spark.sql.functions.isnull +import org.apache.spark.sql.functions.not import org.apache.spark.sql.types.StructType /** @@ -76,8 +76,7 @@ case class FlintSparkSkippingFileIndex( partitions .flatMap(_.files.map(f => f.getPath.toUri.toString)) .toDF(FILE_PATH_COLUMN) - .join(indexScan, Seq(FILE_PATH_COLUMN), "left") - .filter(isnull(indexScan(FILE_PATH_COLUMN)) || new Column(indexFilter)) + .join(indexScan.filter(not(new Column(indexFilter))), Seq(FILE_PATH_COLUMN), "left_anti") .select(FILE_PATH_COLUMN) .collect() .map(_.getString(0)) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 90e7caecb..ac258f046 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -251,6 +251,20 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { } } + test("should not rewrite original query if no where clause") { + val query = + s""" + | SELECT name + | FROM $testTable + |""".stripMargin + + val actual = sql(query).queryExecution.optimizedPlan + withFlintOptimizerDisabled { + val expect = sql(query).queryExecution.optimizedPlan + actual shouldBe expect + } + } + test("can build partition skipping index and rewrite applicable query") { flint .skippingIndex() @@ -394,7 +408,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { } } - test("should rewrite applicable query to scan latest source files if partial index") { + test("should rewrite applicable query to scan unknown source files if partial index") { flint .skippingIndex() .onTable(testTable) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 8975b7072..410def4bc 
100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -14,10 +14,11 @@ import org.json4s.native.Serialization import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName -import org.scalatest.matchers.must.Matchers.defined +import org.scalatest.matchers.must.Matchers.{defined, include} import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.SimpleMode import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY @@ -55,21 +56,82 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { indexData.count() shouldBe 2 } - test("create skipping index with filtering condition") { - sql(s""" - | CREATE SKIPPING INDEX ON $testTable - | ( - | year PARTITION, - | name VALUE_SET, - | age MIN_MAX - | ) - | WHERE address = 'Portland' - | WITH (auto_refresh = true) - | """.stripMargin) + test("create skipping index with auto refresh and filtering condition") { + val testTimeSeriesTable = "spark_catalog.default.partial_skipping_sql_test" + val testFlintTimeSeriesTable = getSkippingIndexName(testTimeSeriesTable) + + withTable(testTimeSeriesTable) { + createTimeSeriesTable(testTimeSeriesTable) + sql(s""" CREATE SKIPPING INDEX ON $testTimeSeriesTable + | ( address VALUE_SET ) + | WHERE time >= '2023-10-01 01:00:00' + | WITH (auto_refresh = true)""".stripMargin) + flint.describeIndex(testFlintTimeSeriesTable) shouldBe defined + + // Only 2 rows indexed + val indexData = awaitStreamingDataComplete(testFlintTimeSeriesTable) + indexData.count() shouldBe 2 + + // Query without filter condition + sql(s"SELECT * FROM $testTimeSeriesTable").count shouldBe 5 + + // File indexed should be included + var query = sql(s""" SELECT name FROM $testTimeSeriesTable + | WHERE time > '2023-10-01 00:05:00' + | AND address = 'Portland' """.stripMargin) + query.queryExecution.explainString(SimpleMode) should include("FlintSparkSkippingFileIndex") + checkAnswer(query, Seq(Row("C"), Row("D"))) + + // File not indexed should be included too + query = sql(s""" SELECT name FROM $testTimeSeriesTable + | WHERE time > '2023-10-01 00:05:00' + | AND address = 'Seattle' """.stripMargin) + query.queryExecution.explainString(SimpleMode) should include("FlintSparkSkippingFileIndex") + checkAnswer(query, Seq(Row("B"))) + + flint.deleteIndex(testFlintTimeSeriesTable) + } + } - val indexData = awaitStreamingDataComplete(testIndex) - flint.describeIndex(testIndex) shouldBe defined - indexData.count() shouldBe 1 + test("create skipping index with manual refresh and filtering condition") { + val testTimeSeriesTable = "spark_catalog.default.partial_skipping_sql_test" + val testFlintTimeSeriesTable = getSkippingIndexName(testTimeSeriesTable) + + withTable(testTimeSeriesTable) { + createTimeSeriesTable(testTimeSeriesTable) + sql(s""" CREATE SKIPPING INDEX ON $testTimeSeriesTable + | ( address VALUE_SET ) + | WHERE time >= '2023-10-01 01:00:00' AND age = 15 + | """.stripMargin) + sql(s"REFRESH SKIPPING INDEX ON $testTimeSeriesTable") + + // Only 2 rows indexed + flint.describeIndex(testFlintTimeSeriesTable) shouldBe defined + val indexData = 
flint.queryIndex(testFlintTimeSeriesTable) + indexData.count() shouldBe 1 + + // File not indexed should be included too + sql(s"SELECT * FROM $testTimeSeriesTable").count shouldBe 5 + var query = sql(s""" SELECT name FROM $testTimeSeriesTable + | WHERE time > '2023-10-01 00:05:00' + | AND address = 'Portland' """.stripMargin) + query.queryExecution.explainString(SimpleMode) should include("FlintSparkSkippingFileIndex") + checkAnswer(query, Seq(Row("C"), Row("D"))) + + // Generate new data + sql(s""" INSERT INTO $testTimeSeriesTable VALUES + | (TIMESTAMP '2023-10-01 04:00:00', 'F', 30, 'Vancouver')""".stripMargin) + + // Latest file should be included too without refresh + sql(s"SELECT * FROM $testTimeSeriesTable").count shouldBe 6 + query = sql(s""" SELECT name FROM $testTimeSeriesTable + | WHERE time > '2023-10-01 00:05:00' + | AND address = 'Vancouver' """.stripMargin) + query.queryExecution.explainString(SimpleMode) should include("FlintSparkSkippingFileIndex") + checkAnswer(query, Seq(Row("E"), Row("F"))) + + flint.deleteIndex(testFlintTimeSeriesTable) + } } test("create skipping index with streaming job options") { From 98972e2cfe4669cbd668a597c25d283097e5c7dc Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 9 Nov 2023 16:34:28 -0800 Subject: [PATCH 10/13] Add more logging Signed-off-by: Chen Dai --- .../ApplyFlintSparkSkippingIndex.scala | 9 +++++++- .../FlintSparkSkippingFileIndex.scala | 22 ++++++++++++------- .../FlintSparkSkippingIndexSqlITSuite.scala | 2 +- 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala index eee5999ef..a9ebbb8dd 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala @@ -9,6 +9,7 @@ import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.FlintSparkIndexUtils.isConjunction import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE} +import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{And, Expression, Predicate} import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} @@ -25,7 +26,7 @@ import org.apache.spark.sql.flint.qualifyTableName * @param flint * Flint Spark API */ -class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] { +class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] with Logging { override def apply(plan: LogicalPlan): LogicalPlan = plan transform { case filter @ Filter( // TODO: abstract pattern match logic for different table support @@ -36,8 +37,11 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] Some(table), false)) if isConjunction(condition) && !location.isInstanceOf[FlintSparkSkippingFileIndex] => + logInfo(s"Applying skipping index rewrite rule on filter condition $filter") val index = flint.describeIndex(getIndexName(table)) + if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) { + logInfo(s"Found skipping index $index") val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex] val indexFilter = rewriteToIndexFilter(skippingIndex, condition) @@ 
-49,6 +53,7 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] * |- FileIndex <== replaced with FlintSkippingFileIndex */ if (indexFilter.isDefined) { + logInfo(s"Found filter condition can be pushed down to skipping index: $indexFilter") // Enforce hybrid scan if skipping index is partial val isHybridScan = if (skippingIndex.filterCondition.isDefined) true @@ -60,9 +65,11 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] val indexRelation = baseRelation.copy(location = fileIndex)(baseRelation.sparkSession) filter.copy(child = relation.copy(relation = indexRelation)) } else { + logInfo("No filter condition can be pushed down to skipping index") filter } } else { + logInfo("No skipping index found for query rewrite") filter } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala index b53324f4c..81d2d9d36 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala @@ -8,6 +8,7 @@ package org.opensearch.flint.spark.skipping import org.apache.hadoop.fs.{FileStatus, Path} import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COLUMN +import org.apache.spark.internal.Logging import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} @@ -28,7 +29,8 @@ case class FlintSparkSkippingFileIndex( indexScan: DataFrame, indexFilter: Expression, isHybridScanMode: Boolean = FlintSparkConf().isHybridScanEnabled) - extends FileIndex { + extends FileIndex + with Logging { override def listFiles( partitionFilters: Seq[Expression], @@ -42,6 +44,7 @@ case class FlintSparkSkippingFileIndex( } else { selectFilesFromIndexOnly() } + logInfo(s"${selectedFiles.size} source files to scan after skipping") // Keep partition files present in selected file list above partitions @@ -62,21 +65,23 @@ case class FlintSparkSkippingFileIndex( /* * Left join source partitions and index data to keep unknown source files: * Express the logic in SQL: - * SELECT left.file_path - * FROM partitions AS left - * LEFT JOIN indexScan AS right - * ON left.file_path = right.file_path - * WHERE right.file_path IS NULL - * OR [indexFilter] + * SELECT file_path + * FROM partitions + * WHERE file_path NOT IN ( + * SELECT file_path + * FROM indexScan + * WHERE NOT [indexFilter] + * ) */ private def selectFilesFromIndexAndSource(partitions: Seq[PartitionDirectory]): Set[String] = { val sparkSession = indexScan.sparkSession import sparkSession.implicits._ + logInfo("Selecting files from both skipping index and source in hybrid scan mode") partitions .flatMap(_.files.map(f => f.getPath.toUri.toString)) .toDF(FILE_PATH_COLUMN) - .join(indexScan.filter(not(new Column(indexFilter))), Seq(FILE_PATH_COLUMN), "left_anti") + .join(indexScan.filter(not(new Column(indexFilter))), Seq(FILE_PATH_COLUMN), "anti") .select(FILE_PATH_COLUMN) .collect() .map(_.getString(0)) @@ -88,6 +93,7 @@ case class FlintSparkSkippingFileIndex( * to index store. 
*/ private def selectFilesFromIndexOnly(): Set[String] = { + logInfo("Selecting files from skipping index only") indexScan .filter(new Column(indexFilter)) .select(FILE_PATH_COLUMN) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 410def4bc..b119d6008 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -105,7 +105,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { | """.stripMargin) sql(s"REFRESH SKIPPING INDEX ON $testTimeSeriesTable") - // Only 2 rows indexed + // Only 1 rows indexed flint.describeIndex(testFlintTimeSeriesTable) shouldBe defined val indexData = flint.queryIndex(testFlintTimeSeriesTable) indexData.count() shouldBe 1 From 201cb0932c25c435ab9a5283dfaa656ec03698d0 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 10 Nov 2023 10:19:53 -0800 Subject: [PATCH 11/13] Restrict column has to be partitioned column Signed-off-by: Chen Dai --- .../flint/spark/FlintSparkIndexBuilder.scala | 7 +- .../skipping/FlintSparkSkippingIndex.scala | 11 +- .../spark/FlintSparkIndexBuilderSuite.scala | 137 ++++++++++++------ .../FlintSparkSkippingIndexSqlITSuite.scala | 28 +++- .../flint/spark/FlintSparkSuite.scala | 44 ++++++ 5 files changed, 171 insertions(+), 56 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala index d429244d1..f8cdc44d9 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala @@ -35,7 +35,8 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) { throw new IllegalStateException(s"Table $qualifiedTableName is not found")) val allFields = table.schema().fields - allFields.map { field => field.name -> convertFieldToColumn(field) }.toMap + val partitionFields = table.partitioning().map(_.arguments().mkString(",")).toSet + allFields.map { field => field.name -> convertFieldToColumn(field, partitionFields) }.toMap } /** @@ -87,7 +88,7 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) { colName, throw new IllegalArgumentException(s"Column $colName does not exist")) - private def convertFieldToColumn(field: StructField): Column = { + private def convertFieldToColumn(field: StructField, partitionFields: Set[String]): Column = { // Ref to CatalogImpl.listColumns(): Varchar/Char is StringType with real type name in metadata new Column( name = field.name, @@ -95,7 +96,7 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) { dataType = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType).catalogString, nullable = field.nullable, - isPartition = false, // useless for now so just set to false + isPartition = partitionFields.contains(field.name), isBucket = false) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index 021707d68..0c78736b9 100644 --- 
a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
@@ -18,8 +18,9 @@ import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
 import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression
-import org.apache.spark.sql.functions.{col, input_file_name, sha1}
+import org.apache.spark.sql.functions.{col, expr, input_file_name, sha1}
 
 /**
  * Flint skipping index in Spark.
@@ -204,6 +205,14 @@ object FlintSparkSkippingIndex {
      *   index builder
      */
     def filterBy(condition: String): Builder = {
+      expr(condition).expr.foreach {
+        case colName: UnresolvedAttribute =>
+          require(
+            findColumn(colName.name).isPartition,
+            s"${colName.name} is not a partitioned column and cannot be used in the index filtering condition")
+        case _ =>
+      }
+
       filterCondition = Some(condition)
       this
     }
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala
index 0cd4a5293..0830b4b08 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala
@@ -12,68 +12,106 @@ import org.apache.spark.FlintSuite
 
 class FlintSparkIndexBuilderSuite extends FlintSuite {
 
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-
-    sql("""
-        | CREATE TABLE spark_catalog.default.test
-        | ( name STRING, age INT )
-        | USING JSON
+  test("should return all columns") {
+    val testTable = "spark_catalog.default.index_builder_test"
+    withTable(testTable) {
+      sql(s"""
+        | CREATE TABLE $testTable
+        | ( name STRING, age INT )
+        | USING JSON
       """.stripMargin)
+
+      builder()
+        .onTable(testTable)
+        .expectAllColumns("name", "age")
+    }
   }
 
-  protected override def afterAll(): Unit = {
-    sql("DROP TABLE spark_catalog.default.test")
+  test("should return all partitioned columns") {
+    val testTable = "spark_catalog.default.index_builder_test"
+    withTable(testTable) {
+      sql(s"""
+        | CREATE TABLE $testTable
+        | ( name STRING, age INT )
+        | USING JSON
+        | PARTITIONED BY ( year INT, month INT )
+      """.stripMargin)
 
-    super.afterAll()
+      builder()
+        .onTable(testTable)
+        .expectAllColumns("year", "month", "name", "age")
+        .expectPartitionColumns("year", "month")
+    }
   }
 
   test("should qualify table name in default database") {
-    builder()
-      .onTable("test")
-      .expectTableName("spark_catalog.default.test")
-      .expectAllColumns("name", "age")
-
-    builder()
-      .onTable("default.test")
-      .expectTableName("spark_catalog.default.test")
-      .expectAllColumns("name", "age")
-
-    builder()
-      .onTable("spark_catalog.default.test")
-      .expectTableName("spark_catalog.default.test")
-      .expectAllColumns("name", "age")
-  }
-
-  test("should qualify table name and get columns in other database") {
-    sql("CREATE DATABASE mydb")
-    sql("CREATE TABLE mydb.test2 (address STRING) USING JSON")
-    sql("USE mydb")
-
-    try {
-      builder()
-        .onTable("test2")
-        .expectTableName("spark_catalog.mydb.test2")
-        .expectAllColumns("address")
+    val testTable = "spark_catalog.default.test"
+    withTable(testTable) {
+      sql(s"""
+        | CREATE TABLE $testTable
+        | ( name 
STRING, age INT ) + | USING JSON + """.stripMargin) builder() - .onTable("mydb.test2") - .expectTableName("spark_catalog.mydb.test2") - .expectAllColumns("address") + .onTable("test") + .expectTableName("spark_catalog.default.test") + .expectAllColumns("name", "age") builder() - .onTable("spark_catalog.mydb.test2") - .expectTableName("spark_catalog.mydb.test2") - .expectAllColumns("address") + .onTable("default.test") + .expectTableName("spark_catalog.default.test") + .expectAllColumns("name", "age") - // Can parse any specified table name builder() .onTable("spark_catalog.default.test") .expectTableName("spark_catalog.default.test") .expectAllColumns("name", "age") - } finally { - sql("DROP DATABASE mydb CASCADE") - sql("USE default") + } + } + + test("should qualify table name and get columns in other database") { + withDatabase("mydb") { + val testTable = "spark_catalog.default.index_builder_test" + withTable(testTable) { + // Create a table in default database + sql(s""" + | CREATE TABLE $testTable + | ( name STRING, age INT ) + | USING JSON + """.stripMargin) + + // Create another database and table and switch database + sql("CREATE DATABASE mydb") + sql("CREATE TABLE mydb.test2 (address STRING) USING JSON") + sql("USE mydb") + + builder() + .onTable("test2") + .expectTableName("spark_catalog.mydb.test2") + .expectAllColumns("address") + + builder() + .onTable("mydb.test2") + .expectTableName("spark_catalog.mydb.test2") + .expectAllColumns("address") + + builder() + .onTable("spark_catalog.mydb.test2") + .expectTableName("spark_catalog.mydb.test2") + .expectAllColumns("address") + + // Can parse any specified table name + builder() + .onTable(testTable) + .expectTableName(testTable) + .expectAllColumns("name", "age") + + builder() + .onTable("default.index_builder_test") + .expectTableName(testTable) + .expectAllColumns("name", "age") + } } } @@ -101,6 +139,13 @@ class FlintSparkIndexBuilderSuite extends FlintSuite { this } + def expectPartitionColumns(expected: String*): FakeFlintSparkIndexBuilder = { + allColumns.values + .filter(_.isPartition) + .map(_.name) should contain theSameElementsAs expected + this + } + override protected def buildIndex(): FlintSparkIndex = { null } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index b119d6008..46f837a69 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -61,10 +61,10 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { val testFlintTimeSeriesTable = getSkippingIndexName(testTimeSeriesTable) withTable(testTimeSeriesTable) { - createTimeSeriesTable(testTimeSeriesTable) + createPartitionedTimeSeriesTable(testTimeSeriesTable) sql(s""" CREATE SKIPPING INDEX ON $testTimeSeriesTable | ( address VALUE_SET ) - | WHERE time >= '2023-10-01 01:00:00' + | WHERE hour >= 1 | WITH (auto_refresh = true)""".stripMargin) flint.describeIndex(testFlintTimeSeriesTable) shouldBe defined @@ -98,10 +98,10 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { val testFlintTimeSeriesTable = getSkippingIndexName(testTimeSeriesTable) withTable(testTimeSeriesTable) { - createTimeSeriesTable(testTimeSeriesTable) + createPartitionedTimeSeriesTable(testTimeSeriesTable) sql(s""" CREATE SKIPPING INDEX ON $testTimeSeriesTable | ( address 
VALUE_SET ) - | WHERE time >= '2023-10-01 01:00:00' AND age = 15 + | WHERE hour >= 3 | """.stripMargin) sql(s"REFRESH SKIPPING INDEX ON $testTimeSeriesTable") @@ -119,8 +119,9 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { checkAnswer(query, Seq(Row("C"), Row("D"))) // Generate new data - sql(s""" INSERT INTO $testTimeSeriesTable VALUES - | (TIMESTAMP '2023-10-01 04:00:00', 'F', 30, 'Vancouver')""".stripMargin) + sql(s""" INSERT INTO $testTimeSeriesTable + | PARTITION (year=2023, month=10, day=1, hour=4) + | VALUES (TIMESTAMP '2023-10-01 04:00:00', 'F', 30, 'Vancouver')""".stripMargin) // Latest file should be included too without refresh sql(s"SELECT * FROM $testTimeSeriesTable").count shouldBe 6 @@ -134,6 +135,21 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { } } + test( + "should fail if create skipping index with filtering condition on non-partitioned column") { + val testTimeSeriesTable = "spark_catalog.default.partial_skipping_sql_test" + withTable(testTimeSeriesTable) { + createPartitionedTimeSeriesTable(testTimeSeriesTable) + + assertThrows[IllegalArgumentException] { + sql(s""" CREATE SKIPPING INDEX ON $testTimeSeriesTable + | ( address VALUE_SET ) + | WHERE time >= '2023-10-01 01:00:00' + | WITH (auto_refresh = true)""".stripMargin) + } + } + } + test("create skipping index with streaming job options") { withTempDir { checkpointDir => sql(s""" diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 4b166ec6a..83d9b9583 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -120,4 +120,48 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 01:00:00', 'D', 40, 'Portland')") sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver')") } + + protected def createPartitionedTimeSeriesTable(testTable: String): Unit = { + sql(s""" + | CREATE TABLE $testTable + | ( + | time TIMESTAMP, + | name STRING, + | age INT, + | address STRING + | ) + | USING CSV + | OPTIONS ( + | header 'false', + | delimiter '\t' + | ) + | PARTITIONED BY ( + | year INT, + | month INT, + | day INT, + | hour INT + | ) + |""".stripMargin) + + sql(s""" + | INSERT INTO $testTable PARTITION (year=2023, month=10, day=1, hour=0) + | VALUES (TIMESTAMP '2023-10-01 00:01:00', 'A', 30, 'Seattle') + | """.stripMargin) + sql(s""" + | INSERT INTO $testTable PARTITION (year=2023, month=10, day=1, hour=0) + | VALUES (TIMESTAMP '2023-10-01 00:10:00', 'B', 20, 'Seattle') + | """.stripMargin) + sql(s""" + | INSERT INTO $testTable PARTITION (year=2023, month=10, day=1, hour=0) + | VALUES (TIMESTAMP '2023-10-01 00:15:00', 'C', 35, 'Portland') + | """.stripMargin) + sql(s""" + | INSERT INTO $testTable PARTITION (year=2023, month=10, day=1, hour=1) + | VALUES (TIMESTAMP '2023-10-01 01:00:00', 'D', 40, 'Portland') + | """.stripMargin) + sql(s""" + | INSERT INTO $testTable PARTITION (year=2023, month=10, day=1, hour=3) + | VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver') + | """.stripMargin) + } } From a7c21646a261f1d91f21ea74f73d2211699d108c Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 10 Nov 2023 10:23:21 -0800 Subject: [PATCH 12/13] Update user manual Signed-off-by: Chen Dai --- docs/index.md | 4 ++++ 1 file changed, 4 
insertions(+)

diff --git a/docs/index.md b/docs/index.md
index 411877c30..04b5caf0c 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -118,6 +118,8 @@ High level API is dependent on query engine implementation. Please see Query Eng
 
 #### Skipping Index
 
+Note that the filtering condition must be a conjunction involving only partitioned columns.
+
 ```sql
 CREATE SKIPPING INDEX [IF NOT EXISTS]
 ON <object>
@@ -158,6 +160,8 @@ DROP SKIPPING INDEX ON alb_logs
 
 #### Covering Index
 
+Note that the filtering condition must be a conjunction.
+
 ```sql
 CREATE INDEX [IF NOT EXISTS] name ON <object>
 ( column [, ...] )

From 422c46d74946810ef61a65cd6e2ef81ec830e81f Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Fri, 10 Nov 2023 10:46:14 -0800
Subject: [PATCH 13/13] Fix broken IT

Signed-off-by: Chen Dai
---
 .../flint/spark/covering/FlintSparkCoveringIndex.scala | 4 +++-
 .../flint/spark/skipping/FlintSparkSkippingIndex.scala | 4 +++-
 .../flint/spark/FlintSparkSkippingIndexITSuite.scala | 4 ++--
 3 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index f1d5481fa..82e599afd 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -35,7 +35,9 @@ case class FlintSparkCoveringIndex(
     extends FlintSparkIndex {
 
   require(indexedColumns.nonEmpty, "indexed columns must not be empty")
-  require(filterCondition.forall(isConjunction), "filtering condition must be conjunction")
+  require(
+    filterCondition.forall(isConjunction),
+    s"filtering condition $filterCondition must be a conjunction")
 
   override val kind: String = COVERING_INDEX_TYPE
 
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
index 0c78736b9..6313158b7 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
@@ -38,7 +38,9 @@ case class FlintSparkSkippingIndex(
     extends FlintSparkIndex {
 
   require(indexedColumns.nonEmpty, "indexed columns must not be empty")
-  require(filterCondition.forall(isConjunction), "filtering condition must be conjunction")
+  require(
+    filterCondition.forall(isConjunction),
+    s"filtering condition $filterCondition must be a conjunction")
 
   /** Skipping index type */
   override val kind: String = SKIPPING_INDEX_TYPE
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index ac258f046..bb67ff9cc 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -48,7 +48,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
       .addPartitions("year", "month")
       .addValueSet("address")
       .addMinMax("age")
-      .filterBy("age > 30")
+      .filterBy("year = 2023")
      .create()
 
     val index = flint.describeIndex(testIndex)
@@ -82,7 +82,7 @@ class FlintSparkSkippingIndexITSuite 
extends FlintSparkSuite { | "source": "spark_catalog.default.test", | "options": { "auto_refresh": "false" }, | "properties": { - | "filterCondition": "age > 30" + | "filterCondition": "year = 2023" | } | }, | "properties": {
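
For reference, below is a minimal usage sketch of the behavior these patches converge on; it is not part of any patch above. It assumes a `FlintSpark` instance constructed from an active `SparkSession` and the `spark_catalog.default.test` table partitioned by `year` and `month`, as in the integration test suites; those setup details are assumptions, not taken from the diffs.

```scala
// Hypothetical sketch: filterBy() now only accepts conditions on partitioned columns,
// and the condition must be a conjunction (ANDed predicates only).
import scala.util.Try

import org.opensearch.flint.spark.FlintSpark

// Assumes `spark` is an active SparkSession with the Flint extensions on the classpath.
val flint = new FlintSpark(spark)

// Accepted: the filter is a conjunction and references only partitioned columns.
flint
  .skippingIndex()
  .onTable("spark_catalog.default.test")
  .addPartitions("year", "month")
  .addValueSet("address")
  .filterBy("year = 2023 AND month = 10")
  .create()

// Rejected: `age` is not a partitioned column, so filterBy() fails the isPartition
// check introduced in PATCH 11 with an IllegalArgumentException before create() runs.
val rejected = Try(
  flint
    .skippingIndex()
    .onTable("spark_catalog.default.test")
    .addMinMax("age")
    .filterBy("age > 30")
    .create())
assert(rejected.isFailure)
```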