From 4d0f1fdffc9c427dea13fbb47e8de78aae9a79d2 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Fri, 3 May 2024 10:59:21 -0700
Subject: [PATCH] Add query rewrite logging

Signed-off-by: Chen Dai
---
 .../ApplyFlintSparkCoveringIndex.scala     | 36 ++++++++++++++-----
 .../FlintSparkSourceRelationProvider.scala |  6 +++-
 2 files changed, 32 insertions(+), 10 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala
index b7fa22428..9a519b42b 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala
@@ -33,13 +33,15 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
   private val supportedProviders = FlintSparkSourceRelationProvider.getAllProviders(flint.spark)
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    if (plan.isInstanceOf[V2WriteCommand]) {
+    if (plan.isInstanceOf[V2WriteCommand]) { // TODO: bypass any non-select plan
       plan
     } else {
+      // Iterate each sub plan tree in the given plan
       plan transform { case subPlan =>
         supportedProviders
           .collectFirst {
             case provider if provider.isSupported(subPlan) =>
+              logInfo(s"Provider [${provider.name()}] can match sub plan ${subPlan.nodeName}")
               val relation = provider.getRelation(subPlan)
               val relationCols = collectRelationColumnsInQueryPlan(plan, relation)
 
@@ -82,19 +84,35 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
   private def findAllCoveringIndexesOnTable(tableName: String): Seq[FlintSparkCoveringIndex] = {
     val qualifiedTableName = qualifyTableName(flint.spark, tableName)
     val indexPattern = getFlintIndexName("*", qualifiedTableName)
-    flint
-      .describeIndexes(indexPattern)
-      .collect { // cast to covering index
-        case index: FlintSparkCoveringIndex => index
-      }
+    val indexes =
+      flint
+        .describeIndexes(indexPattern)
+        .collect { // cast to covering index
+          case index: FlintSparkCoveringIndex => index
+        }
+
+    val indexNames = indexes.map(_.name()).mkString(",")
+    logInfo(s"Found covering index [$indexNames] on table $qualifiedTableName")
+    indexes
   }
 
   private def isCoveringIndexApplicable(
       index: FlintSparkCoveringIndex,
       relationCols: Set[String]): Boolean = {
-    index.latestLogEntry.exists(_.state != DELETED) &&
-    index.filterCondition.isEmpty && // TODO: support partial covering index later
-    relationCols.subsetOf(index.indexedColumns.keySet)
+    val indexedCols = index.indexedColumns.keySet
+    val isApplicable =
+      index.latestLogEntry.exists(_.state != DELETED) &&
+        index.filterCondition.isEmpty && // TODO: support partial covering index later
+        relationCols.subsetOf(indexedCols)
+
+    logInfo(s"""
+         | Is covering index ${index.name()} applicable: $isApplicable
+         | Index state: ${index.latestLogEntry.map(_.state)}
+         | Index filter condition: ${index.filterCondition}
+         | Columns required: $relationCols
+         | Columns indexed: $indexedCols
+         |""".stripMargin)
+    isApplicable
   }
 
   private def replaceTableRelationWithIndexRelation(
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/FlintSparkSourceRelationProvider.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/FlintSparkSourceRelationProvider.scala
index 414ab0c6a..4872d191f 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/FlintSparkSourceRelationProvider.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/FlintSparkSourceRelationProvider.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.source
 import org.opensearch.flint.spark.source.file.FileSourceRelationProvider
 import org.opensearch.flint.spark.source.iceberg.IcebergSourceRelationProvider
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
@@ -50,7 +51,7 @@ trait FlintSparkSourceRelationProvider {
 /**
  * Companion object provides utility methods.
  */
-object FlintSparkSourceRelationProvider {
+object FlintSparkSourceRelationProvider extends Logging {
 
   /**
    * Retrieve all supported source relation provider for the given Spark session.
@@ -72,6 +73,9 @@ object FlintSparkSourceRelationProvider {
         .contains("org.apache.iceberg.spark.SparkSessionCatalog")) {
       relations = relations :+ new IcebergSourceRelationProvider
     }
+
+    val providerNames = relations.map(_.name()).mkString(",")
+    logInfo(s"Loaded source relation providers [$providerNames]")
     relations
   }
 }