
Commit: Add more logging
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Nov 10, 2023
1 parent e61f3dd commit 98972e2
Showing 3 changed files with 23 additions and 10 deletions.
@@ -9,6 +9,7 @@ import org.opensearch.flint.spark.FlintSpark
 import org.opensearch.flint.spark.FlintSparkIndexUtils.isConjunction
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE}
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, Predicate}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
@@ -25,7 +26,7 @@ import org.apache.spark.sql.flint.qualifyTableName
  * @param flint
  *   Flint Spark API
  */
-class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] {
+class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] with Logging {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case filter @ Filter( // TODO: abstract pattern match logic for different table support
@@ -36,8 +37,11 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
             Some(table),
             false))
         if isConjunction(condition) && !location.isInstanceOf[FlintSparkSkippingFileIndex] =>
+      logInfo(s"Applying skipping index rewrite rule on filter condition $filter")
       val index = flint.describeIndex(getIndexName(table))
+
       if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) {
+        logInfo(s"Found skipping index $index")
         val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex]
         val indexFilter = rewriteToIndexFilter(skippingIndex, condition)
 
@@ -49,6 +53,7 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
          *     |- FileIndex  <== replaced with FlintSkippingFileIndex
          */
         if (indexFilter.isDefined) {
+          logInfo(s"Found filter condition can be pushed down to skipping index: $indexFilter")
           // Enforce hybrid scan if skipping index is partial
           val isHybridScan =
             if (skippingIndex.filterCondition.isDefined) true
@@ -60,9 +65,11 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
           val indexRelation = baseRelation.copy(location = fileIndex)(baseRelation.sparkSession)
           filter.copy(child = relation.copy(relation = indexRelation))
         } else {
+          logInfo("No filter condition can be pushed down to skipping index")
           filter
         }
       } else {
+        logInfo("No skipping index found for query rewrite")
         filter
       }
   }
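
For context on the pattern above: mixing in Spark's org.apache.spark.internal.Logging trait is what provides the logInfo calls added to the rule. A minimal sketch of the same mix-in (the rule name and log message below are illustrative only, not part of this commit):

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical no-op rule: extending Logging exposes logInfo/logWarning/logDebug,
// backed by an SLF4J logger named after the concrete class.
object LogOnlyRule extends Rule[LogicalPlan] with Logging {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    logInfo(s"Visiting plan:\n${plan.treeString}")
    plan // return the plan unchanged
  }
}
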
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.skipping
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COLUMN
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
@@ -28,7 +29,8 @@ case class FlintSparkSkippingFileIndex(
     indexScan: DataFrame,
     indexFilter: Expression,
     isHybridScanMode: Boolean = FlintSparkConf().isHybridScanEnabled)
-    extends FileIndex {
+    extends FileIndex
+    with Logging {
 
   override def listFiles(
       partitionFilters: Seq[Expression],
@@ -42,6 +44,7 @@ case class FlintSparkSkippingFileIndex(
       } else {
         selectFilesFromIndexOnly()
       }
+    logInfo(s"${selectedFiles.size} source files to scan after skipping")
 
     // Keep partition files present in selected file list above
     partitions
@@ -62,21 +65,23 @@ case class FlintSparkSkippingFileIndex(
   /*
    * Left join source partitions and index data to keep unknown source files:
    * Express the logic in SQL:
-   *   SELECT left.file_path
-   *   FROM partitions AS left
-   *   LEFT JOIN indexScan AS right
-   *     ON left.file_path = right.file_path
-   *   WHERE right.file_path IS NULL
-   *     OR [indexFilter]
+   *   SELECT file_path
+   *   FROM partitions
+   *   WHERE file_path NOT IN (
+   *     SELECT file_path
+   *     FROM indexScan
+   *     WHERE NOT [indexFilter]
+   *   )
    */
   private def selectFilesFromIndexAndSource(partitions: Seq[PartitionDirectory]): Set[String] = {
     val sparkSession = indexScan.sparkSession
     import sparkSession.implicits._
 
+    logInfo("Selecting files from both skipping index and source in hybrid scan mode")
     partitions
       .flatMap(_.files.map(f => f.getPath.toUri.toString))
       .toDF(FILE_PATH_COLUMN)
-      .join(indexScan.filter(not(new Column(indexFilter))), Seq(FILE_PATH_COLUMN), "left_anti")
+      .join(indexScan.filter(not(new Column(indexFilter))), Seq(FILE_PATH_COLUMN), "anti")
       .select(FILE_PATH_COLUMN)
       .collect()
       .map(_.getString(0))
@@ -88,6 +93,7 @@ case class FlintSparkSkippingFileIndex(
    *  to index store.
    */
   private def selectFilesFromIndexOnly(): Set[String] = {
+    logInfo("Selecting files from skipping index only")
     indexScan
       .filter(new Column(indexFilter))
       .select(FILE_PATH_COLUMN)
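
The NOT IN / anti-join logic documented in the comment above can be sketched on toy data. Everything below (dataset contents and the "matches" column standing in for the evaluated index filter) is made up for illustration and is not part of this commit:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, not}

// Toy sketch of the hybrid-scan selection: keep every source file except those
// the index has seen AND whose index filter evaluated to false.
object AntiJoinSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("anti-join-sketch").getOrCreate()
  import spark.implicits._

  // All files listed from the source partitions
  val sourceFiles = Seq("f1", "f2", "f3").toDF("file_path")
  // Index data: f3 is not indexed yet; "matches" stands in for the evaluated index filter
  val indexScan = Seq(("f1", true), ("f2", false)).toDF("file_path", "matches")

  // Files the index rules out: indexed and filter evaluated to false
  val ruledOut = indexScan.filter(not(col("matches")))

  // Anti join keeps f1 (filter matched) and f3 (unknown to the index)
  sourceFiles.join(ruledOut, Seq("file_path"), "anti").show()

  spark.stop()
}
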
@@ -105,7 +105,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
            | """.stripMargin)
     sql(s"REFRESH SKIPPING INDEX ON $testTimeSeriesTable")
 
-    // Only 2 rows indexed
+    // Only 1 rows indexed
     flint.describeIndex(testFlintTimeSeriesTable) shouldBe defined
     val indexData = flint.queryIndex(testFlintTimeSeriesTable)
     indexData.count() shouldBe 1