diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala
index eee5999ef..a9ebbb8dd 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala
@@ -9,6 +9,7 @@ import org.opensearch.flint.spark.FlintSpark
 import org.opensearch.flint.spark.FlintSparkIndexUtils.isConjunction
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE}
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, Predicate}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
@@ -25,7 +26,7 @@ import org.apache.spark.sql.flint.qualifyTableName
  * @param flint
  *   Flint Spark API
  */
-class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] {
+class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] with Logging {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case filter @ Filter( // TODO: abstract pattern match logic for different table support
@@ -36,8 +37,11 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
             Some(table),
             false))
         if isConjunction(condition) && !location.isInstanceOf[FlintSparkSkippingFileIndex] =>
+      logInfo(s"Applying skipping index rewrite rule on filter: $filter")
       val index = flint.describeIndex(getIndexName(table))
+
       if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) {
+        logInfo(s"Found skipping index $index")
         val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex]
         val indexFilter = rewriteToIndexFilter(skippingIndex, condition)
 
@@ -49,6 +53,7 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
          *        |- FileIndex <== replaced with FlintSkippingFileIndex
          */
         if (indexFilter.isDefined) {
+          logInfo(s"Found filter condition that can be pushed down to skipping index: $indexFilter")
           // Enforce hybrid scan if skipping index is partial
           val isHybridScan =
             if (skippingIndex.filterCondition.isDefined) true
@@ -60,9 +65,11 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
           val indexRelation = baseRelation.copy(location = fileIndex)(baseRelation.sparkSession)
           filter.copy(child = relation.copy(relation = indexRelation))
         } else {
+          logInfo("No filter condition can be pushed down to skipping index")
           filter
         }
       } else {
+        logInfo("No skipping index found for query rewrite")
         filter
       }
   }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala
index b53324f4c..81d2d9d36 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.skipping
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COLUMN
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
@@ -28,7 +29,8 @@ case class FlintSparkSkippingFileIndex(
     indexScan: DataFrame,
     indexFilter: Expression,
     isHybridScanMode: Boolean = FlintSparkConf().isHybridScanEnabled)
-    extends FileIndex {
+    extends FileIndex
+    with Logging {
 
   override def listFiles(
       partitionFilters: Seq[Expression],
@@ -42,6 +44,7 @@ case class FlintSparkSkippingFileIndex(
       } else {
         selectFilesFromIndexOnly()
       }
+    logInfo(s"${selectedFiles.size} source files to scan after skipping")
 
     // Keep partition files present in selected file list above
     partitions
@@ -62,21 +65,23 @@ case class FlintSparkSkippingFileIndex(
   /*
    * Left join source partitions and index data to keep unknown source files:
    * Express the logic in SQL:
-   *   SELECT left.file_path
-   *   FROM partitions AS left
-   *   LEFT JOIN indexScan AS right
-   *     ON left.file_path = right.file_path
-   *   WHERE right.file_path IS NULL
-   *     OR [indexFilter]
+   *   SELECT file_path
+   *   FROM partitions
+   *   WHERE file_path NOT IN (
+   *     SELECT file_path
+   *     FROM indexScan
+   *     WHERE NOT [indexFilter]
+   *   )
    */
   private def selectFilesFromIndexAndSource(partitions: Seq[PartitionDirectory]): Set[String] = {
     val sparkSession = indexScan.sparkSession
     import sparkSession.implicits._
 
+    logInfo("Selecting files from both skipping index and source in hybrid scan mode")
     partitions
      .flatMap(_.files.map(f => f.getPath.toUri.toString))
      .toDF(FILE_PATH_COLUMN)
-      .join(indexScan.filter(not(new Column(indexFilter))), Seq(FILE_PATH_COLUMN), "left_anti")
+      .join(indexScan.filter(not(new Column(indexFilter))), Seq(FILE_PATH_COLUMN), "anti")
      .select(FILE_PATH_COLUMN)
      .collect()
      .map(_.getString(0))
@@ -88,6 +93,7 @@ case class FlintSparkSkippingFileIndex(
    * to index store.
    */
   private def selectFilesFromIndexOnly(): Set[String] = {
+    logInfo("Selecting files from skipping index only")
     indexScan
       .filter(new Column(indexFilter))
       .select(FILE_PATH_COLUMN)
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
index 410def4bc..b119d6008 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
@@ -105,7 +105,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
       | """.stripMargin)
     sql(s"REFRESH SKIPPING INDEX ON $testTimeSeriesTable")
 
-    // Only 2 rows indexed
+    // Only 1 row indexed
     flint.describeIndex(testFlintTimeSeriesTable) shouldBe defined
     val indexData = flint.queryIndex(testFlintTimeSeriesTable)
     indexData.count() shouldBe 1
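
Note (not part of the patch): the hybrid-scan path above keeps a source file unless the index has seen it and the pushed-down filter rules it out. Below is a minimal standalone Scala sketch of that anti-join semantics; the `year` column, file names, and data are illustrative assumptions, not Flint code.

// Standalone sketch under assumed data; illustrates anti-join semantics only.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.not

object AntiJoinSketch extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  // Hypothetical source listing and skipping index: f3 is not indexed yet.
  val sourceFiles = Seq("f1", "f2", "f3").toDF("file_path")
  val indexScan = Seq(("f1", 2023), ("f2", 2024)).toDF("file_path", "year")
  val indexFilter = $"year" === 2023 // stands in for the rewritten [indexFilter]

  // Anti join drops only files the index has seen AND the filter rules out,
  // i.e. file_path NOT IN (SELECT file_path FROM indexScan WHERE NOT [indexFilter]).
  sourceFiles
    .join(indexScan.filter(not(indexFilter)), Seq("file_path"), "anti")
    .show() // keeps f1 (filter matches) and f3 (unknown to index); skips f2

  spark.stop()
}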