Add query rewrite logging
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed May 3, 2024
1 parent 3b6a4a8 commit 4d0f1fd
Showing 2 changed files with 32 additions and 10 deletions.

ApplyFlintSparkCoveringIndex.scala
@@ -33,13 +33,15 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
   private val supportedProviders = FlintSparkSourceRelationProvider.getAllProviders(flint.spark)

   override def apply(plan: LogicalPlan): LogicalPlan = {
-    if (plan.isInstanceOf[V2WriteCommand]) {
+    if (plan.isInstanceOf[V2WriteCommand]) { // TODO: bypass any non-select plan
       plan
     } else {
+      // Iterate each sub plan tree in the given plan
       plan transform { case subPlan =>
         supportedProviders
           .collectFirst {
             case provider if provider.isSupported(subPlan) =>
+              logInfo(s"Provider [${provider.name()}] can match sub plan ${subPlan.nodeName}")
               val relation = provider.getRelation(subPlan)
               val relationCols = collectRelationColumnsInQueryPlan(plan, relation)

@@ -82,19 +84,35 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
   private def findAllCoveringIndexesOnTable(tableName: String): Seq[FlintSparkCoveringIndex] = {
     val qualifiedTableName = qualifyTableName(flint.spark, tableName)
     val indexPattern = getFlintIndexName("*", qualifiedTableName)
-    flint
-      .describeIndexes(indexPattern)
-      .collect { // cast to covering index
-        case index: FlintSparkCoveringIndex => index
-      }
+    val indexes =
+      flint
+        .describeIndexes(indexPattern)
+        .collect { // cast to covering index
+          case index: FlintSparkCoveringIndex => index
+        }
+
+    val indexNames = indexes.map(_.name()).mkString(",")
+    logInfo(s"Found covering index [$indexNames] on table $qualifiedTableName")
+    indexes
   }

   private def isCoveringIndexApplicable(
       index: FlintSparkCoveringIndex,
       relationCols: Set[String]): Boolean = {
-    index.latestLogEntry.exists(_.state != DELETED) &&
-      index.filterCondition.isEmpty && // TODO: support partial covering index later
-      relationCols.subsetOf(index.indexedColumns.keySet)
+    val indexedCols = index.indexedColumns.keySet
+    val isApplicable =
+      index.latestLogEntry.exists(_.state != DELETED) &&
+        index.filterCondition.isEmpty && // TODO: support partial covering index later
+        relationCols.subsetOf(indexedCols)
+
+    logInfo(s"""
+         | Is covering index ${index.name()} applicable: $isApplicable
+         | Index state: ${index.latestLogEntry.map(_.state)}
+         | Index filter condition: ${index.filterCondition}
+         | Columns required: $relationCols
+         | Columns indexed: $indexedCols
+         |""".stripMargin)
+    isApplicable
   }

   private def replaceTableRelationWithIndexRelation(
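
The applicability check above builds its multi-line log message with Scala's stripMargin. As a rough, self-contained illustration only (not part of this commit, using made-up index and column values), the following snippet shows how such a message renders:

object CoveringIndexLogDemo extends App {
  // Hypothetical stand-ins for a real FlintSparkCoveringIndex
  val indexName = "example_covering_index"
  val isApplicable = false
  val state = Some("active") // what latestLogEntry.map(_.state) might yield
  val filterCondition: Option[String] = None
  val relationCols = Set("name", "age")
  val indexedCols = Set("name")

  // Same stripMargin pattern as in isCoveringIndexApplicable above
  println(s"""
       | Is covering index $indexName applicable: $isApplicable
       | Index state: $state
       | Index filter condition: $filterCondition
       | Columns required: $relationCols
       | Columns indexed: $indexedCols
       |""".stripMargin)
}

Each leading "|" marks the left margin, so the logged entry keeps its line breaks but drops the source indentation.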

FlintSparkSourceRelationProvider.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.source
 import org.opensearch.flint.spark.source.file.FileSourceRelationProvider
 import org.opensearch.flint.spark.source.iceberg.IcebergSourceRelationProvider

+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

@@ -50,7 +51,7 @@ trait FlintSparkSourceRelationProvider {
 /**
  * Companion object provides utility methods.
  */
-object FlintSparkSourceRelationProvider {
+object FlintSparkSourceRelationProvider extends Logging {

   /**
    * Retrieve all supported source relation provider for the given Spark session.
@@ -72,6 +73,9 @@ object FlintSparkSourceRelationProvider {
         .contains("org.apache.iceberg.spark.SparkSessionCatalog")) {
       relations = relations :+ new IcebergSourceRelationProvider
     }
+
+    val providerNames = relations.map(_.name()).mkString(",")
+    logInfo(s"Loaded source relation providers [$providerNames]")
     relations
   }
 }
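
All of the new messages go through logInfo, so they only appear when the effective Spark log level is INFO or a more verbose one. A minimal sketch (not part of this commit; the SparkSession settings are illustrative only) of one way to surface them while experimenting locally:

object EnableRewriteLoggingDemo extends App {
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession
    .builder()
    .master("local[*]")
    .appName("flint-rewrite-logging-demo")
    .getOrCreate()

  // Spark's built-in helper; raising the level to INFO makes the
  // query rewrite and provider loading messages visible on the console.
  spark.sparkContext.setLogLevel("INFO")
}

In a real deployment the same effect is usually achieved through the cluster's log4j configuration rather than setLogLevel.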
