Add partial indexing support for skipping index #124

Closed
docs/index.md (4 additions & 0 deletions)
@@ -118,6 +118,8 @@ High level API is dependent on query engine implementation. Please see Query Eng

#### Skipping Index

Note that the filtering condition must be a conjunction that involves only partitioned columns.

```sql
CREATE SKIPPING INDEX [IF NOT EXISTS]
ON <object>
```

@@ -158,6 +160,8 @@ DROP SKIPPING INDEX ON alb_logs

#### Covering Index

Note that the filtering condition must be a conjunction.

```sql
CREATE INDEX [IF NOT EXISTS] name ON <object>
( column [, ...] )
```
FlintSparkIndexBuilder.scala

@@ -35,7 +35,8 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
throw new IllegalStateException(s"Table $qualifiedTableName is not found"))

val allFields = table.schema().fields
allFields.map { field => field.name -> convertFieldToColumn(field) }.toMap
val partitionFields = table.partitioning().map(_.arguments().mkString(",")).toSet
allFields.map { field => field.name -> convertFieldToColumn(field, partitionFields) }.toMap
}

/**
@@ -87,15 +88,15 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
colName,
throw new IllegalArgumentException(s"Column $colName does not exist"))

private def convertFieldToColumn(field: StructField): Column = {
private def convertFieldToColumn(field: StructField, partitionFields: Set[String]): Column = {
// Ref to CatalogImpl.listColumns(): Varchar/Char is StringType with real type name in metadata
new Column(
name = field.name,
description = field.getComment().orNull,
dataType =
CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType).catalogString,
nullable = field.nullable,
isPartition = false, // useless for now so just set to false
isPartition = partitionFields.contains(field.name),
isBucket = false)
}
}
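
The partition lookup above works by flattening each partitioning transform of the table into a comma-joined argument string. A minimal standalone sketch of that flattening, using hand-built transforms in place of a real catalog table (illustrative only, not the repository's code):

```scala
import org.apache.spark.sql.connector.expressions.{Expressions, Transform}

// Hand-built transforms standing in for table.partitioning() on a table
// partitioned by (year, month).
val partitioning: Array[Transform] =
  Array(Expressions.identity("year"), Expressions.identity("month"))

// Same flattening as in columns() above: each transform's arguments joined by ","
val partitionFields: Set[String] =
  partitioning.map(_.arguments().mkString(",")).toSet

assert(partitionFields == Set("year", "month"))
// convertFieldToColumn can then mark a field as a partition column via
// partitionFields.contains(field.name), which filterBy relies on later.
```
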
FlintSparkIndexFactory.scala

@@ -57,7 +57,11 @@ object FlintSparkIndexFactory {
throw new IllegalStateException(s"Unknown skipping strategy: $other")
}
}
FlintSparkSkippingIndex(metadata.source, strategies, indexOptions)
FlintSparkSkippingIndex(
metadata.source,
strategies,
getOptString(metadata.properties, "filterCondition"),
indexOptions)
case COVERING_INDEX_TYPE =>
FlintSparkCoveringIndex(
metadata.name,
FlintSparkIndexUtils.scala (new file)

@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import org.apache.spark.sql.catalyst.expressions.{Expression, Or}
import org.apache.spark.sql.functions.expr

/**
* Flint Spark index utility methods.
*/
object FlintSparkIndexUtils {

/**
* Is the given Spark predicate string a conjunction
*
* @param condition
* predicate condition string
* @return
* true if yes, otherwise false
*/
def isConjunction(condition: String): Boolean = {
isConjunction(expr(condition).expr)
}

/**
* Is the given Spark predicate a conjunction
*
* @param condition
* predicate condition
* @return
* true if yes, otherwise false
*/
def isConjunction(condition: Expression): Boolean = {
condition.collectFirst { case Or(_, _) =>
true
}.isEmpty
}
}
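
A quick illustration of how this helper behaves (assuming a Spark runtime on the classpath so that expr() can parse the predicate strings; the column names are made up):

```scala
import org.apache.spark.sql.functions.expr
import org.opensearch.flint.spark.FlintSparkIndexUtils.isConjunction

isConjunction("year = 2023 AND month = 11") // true: AND-only predicate
isConjunction("year = 2023")                // true: a single comparison contains no Or node
isConjunction("year = 2023 OR month = 11")  // false: contains a disjunction
isConjunction(expr("year = 2023 AND month = 11").expr) // Expression overload behaves the same
```
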
FlintSparkCoveringIndex.scala

@@ -11,6 +11,7 @@ import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark._
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.FlintSparkIndexUtils.isConjunction
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}

import org.apache.spark.sql._
@@ -34,6 +35,9 @@ case class FlintSparkCoveringIndex(
extends FlintSparkIndex {

require(indexedColumns.nonEmpty, "indexed columns must not be empty")
require(
filterCondition.forall(isConjunction),
s"filtering condition $filterCondition must be conjunction")

override val kind: String = COVERING_INDEX_TYPE

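Because the guard uses Option.forall, an absent filter always passes and only a present, disjunctive condition trips the require. A small behavior sketch with invented column names:

```scala
import org.opensearch.flint.spark.FlintSparkIndexUtils.isConjunction

val ok: Option[String] = Some("elb_status_code = 200 AND year = 2023")
val bad: Option[String] = Some("elb_status_code = 200 OR elb_status_code = 404")

ok.forall(isConjunction)                     // true  -> require passes
bad.forall(isConjunction)                    // false -> require throws IllegalArgumentException
(None: Option[String]).forall(isConjunction) // true  -> no filter, nothing to validate
```
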
ApplyFlintSparkSkippingIndex.scala

@@ -7,11 +7,13 @@ package org.opensearch.flint.spark.skipping

import com.amazon.awslogsdataaccesslayer.connectors.spark.LogsTable
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.FlintSparkIndexUtils.isConjunction
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Or, Predicate}
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Predicate}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier}
@@ -28,7 +30,7 @@ import org.apache.spark.sql.flint.qualifyTableName
* @param flint
* Flint Spark API
*/
class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] {
class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan] with Logging {

override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case filter @ Filter( // TODO: abstract pattern match logic for different table support
@@ -38,9 +40,12 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
_,
Some(table),
false))
if hasNoDisjunction(condition) && !location.isInstanceOf[FlintSparkSkippingFileIndex] =>
if isConjunction(condition) && !location.isInstanceOf[FlintSparkSkippingFileIndex] =>
logInfo(s"Applying skipping index rewrite rule on filter condition $filter")
val index = flint.describeIndex(getIndexName(table))

if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) {
logInfo(s"Found skipping index $index")
val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex]
val indexFilter = rewriteToIndexFilter(skippingIndex, condition)

@@ -52,20 +57,29 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
* |- FileIndex <== replaced with FlintSkippingFileIndex
*/
if (indexFilter.isDefined) {
logInfo(s"Found filter condition can be pushed down to skipping index: $indexFilter")
// Enforce hybrid scan if skipping index is partial
val isHybridScan =
if (skippingIndex.filterCondition.isDefined) true
else FlintSparkConf().isHybridScanEnabled

val indexScan = flint.queryIndex(skippingIndex.name())
val fileIndex = FlintSparkSkippingFileIndex(location, indexScan, indexFilter.get)
val fileIndex =
FlintSparkSkippingFileIndex(location, indexScan, indexFilter.get, isHybridScan)
val indexRelation = baseRelation.copy(location = fileIndex)(baseRelation.sparkSession)
filter.copy(child = relation.copy(relation = indexRelation))
} else {
logInfo("No filter condition can be pushed down to skipping index")
filter
}
} else {
logInfo("No skipping index found for query rewrite")
filter
}
case filter @ Filter(
condition: Predicate,
relation @ DataSourceV2Relation(table, _, Some(catalog), Some(identifier), _))
if hasNoDisjunction(condition) &&
if isConjunction(condition) &&
// Check if query plan already rewritten
table.isInstanceOf[LogsTable] && !table.asInstanceOf[LogsTable].hasFileIndexScan() =>
val index = flint.describeIndex(getIndexName(catalog, identifier))
@@ -117,12 +131,6 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
getSkippingIndexName(qualifiedTableName)
}

private def hasNoDisjunction(condition: Expression): Boolean = {
condition.collectFirst { case Or(_, _) =>
true
}.isEmpty
}

private def rewriteToIndexFilter(
index: FlintSparkSkippingIndex,
condition: Expression): Option[Expression] = {
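The hybrid-scan enforcement added here comes down to one decision: a partial skipping index knows nothing about source files excluded by its filter condition, so those files must still be listed and scanned from the source. A condensed sketch of that decision (not the rule's actual code):

```scala
// Force hybrid scan for a partial index; otherwise defer to the Spark conf flag.
def useHybridScan(indexFilterCondition: Option[String], hybridScanConfEnabled: Boolean): Boolean =
  indexFilterCondition.isDefined || hybridScanConfEnabled

useHybridScan(Some("year = 2023"), hybridScanConfEnabled = false) // true: partial index forces hybrid scan
useHybridScan(None, hybridScanConfEnabled = false)                // false: full index, conf decides
```
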
FlintSparkSkippingFileIndex.scala

@@ -8,11 +8,12 @@ package org.opensearch.flint.spark.skipping
import org.apache.hadoop.fs.{FileStatus, Path}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COLUMN

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.functions.isnull
import org.apache.spark.sql.functions.not
import org.apache.spark.sql.types.StructType

/**
@@ -26,8 +27,10 @@ import org.apache.spark.sql.types.StructType
case class FlintSparkSkippingFileIndex(
baseFileIndex: FileIndex,
indexScan: DataFrame,
indexFilter: Expression)
extends FileIndex {
indexFilter: Expression,
isHybridScanMode: Boolean = FlintSparkConf().isHybridScanEnabled)
extends FileIndex
with Logging {

override def listFiles(
partitionFilters: Seq[Expression],
@@ -36,11 +39,12 @@ case class FlintSparkSkippingFileIndex(
// TODO: make this listFile call only in hybrid scan mode
val partitions = baseFileIndex.listFiles(partitionFilters, dataFilters)
val selectedFiles =
if (FlintSparkConf().isHybridScanEnabled) {
if (isHybridScanMode) {
selectFilesFromIndexAndSource(partitions)
} else {
selectFilesFromIndexOnly()
}
logInfo(s"${selectedFiles.size} source files to scan after skipping")

// Keep partition files present in selected file list above
partitions
@@ -61,22 +65,23 @@ case class FlintSparkSkippingFileIndex(
/*
* Left join source partitions and index data to keep unknown source files:
* Express the logic in SQL:
* SELECT left.file_path
* FROM partitions AS left
* LEFT JOIN indexScan AS right
* ON left.file_path = right.file_path
* WHERE right.file_path IS NULL
* OR [indexFilter]
* SELECT file_path
* FROM partitions
* WHERE file_path NOT IN (
* SELECT file_path
* FROM indexScan
* WHERE NOT [indexFilter]
* )
*/
private def selectFilesFromIndexAndSource(partitions: Seq[PartitionDirectory]): Set[String] = {
val sparkSession = indexScan.sparkSession
import sparkSession.implicits._

logInfo("Selecting files from both skipping index and source in hybrid scan mode")
partitions
.flatMap(_.files.map(f => f.getPath.toUri.toString))
.toDF(FILE_PATH_COLUMN)
.join(indexScan, Seq(FILE_PATH_COLUMN), "left")
.filter(isnull(indexScan(FILE_PATH_COLUMN)) || new Column(indexFilter))
.join(indexScan.filter(not(new Column(indexFilter))), Seq(FILE_PATH_COLUMN), "anti")
Review comment (Collaborator):

  • left join still works if flint-core can support array types, right?
  • the reason we use left anti join is a performance consideration, right? if yes, could we add a test to guard it?

Reply from @dai-chen (Collaborator, Author), Nov 14, 2023:

Based on current understanding, an anti semi join seems required. Even if we support push-down optimization for arrays, the OR condition in the previous left outer join cannot be pushed down to the skipping index.

   *   SELECT left.file_path
   *   FROM partitions AS left
   *   LEFT JOIN indexScan AS right
   *     ON left.file_path = right.file_path
   *   WHERE right.file_path IS NULL
   *     OR [indexFilter]

.select(FILE_PATH_COLUMN)
.collect()
.map(_.getString(0))
@@ -88,6 +93,7 @@ case class FlintSparkSkippingFileIndex(
* to index store.
*/
private def selectFilesFromIndexOnly(): Set[String] = {
logInfo("Selecting files from skipping index only")
indexScan
.filter(new Column(indexFilter))
.select(FILE_PATH_COLUMN)
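The new selection logic reads as an anti join: a source file is kept unless the index has an entry for it that fails the index filter, so files never indexed (because of the partial-index filter) still get scanned. A self-contained sketch of that join shape (DataFrame and column names are illustrative; this is not the file index's own code):

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.not

// sourceFiles: one row per file listed from the source table, column "file_path"
// indexScan:   skipping index data, one row per indexed file with its stats
def selectFiles(sourceFiles: DataFrame, indexScan: DataFrame, indexFilter: Column): Set[String] =
  sourceFiles
    // Drop only files the index knows about AND that fail the filter;
    // files missing from a partial index survive the anti join.
    .join(indexScan.filter(not(indexFilter)), Seq("file_path"), "anti")
    .select("file_path")
    .collect()
    .map(_.getString(0))
    .toSet
```
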
FlintSparkSkippingIndex.scala

@@ -11,14 +11,16 @@ import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark._
import org.opensearch.flint.spark.FlintSparkIndex._
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.FlintSparkIndexUtils.isConjunction
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression
import org.apache.spark.sql.functions.{col, input_file_name, sha1}
import org.apache.spark.sql.functions.{col, expr, input_file_name, sha1}

/**
* Flint skipping index in Spark.
@@ -31,10 +33,14 @@ import org.apache.spark.sql.functions.{col, input_file_name, sha1}
case class FlintSparkSkippingIndex(
tableName: String,
indexedColumns: Seq[FlintSparkSkippingStrategy],
filterCondition: Option[String] = None,
override val options: FlintSparkIndexOptions = empty)
extends FlintSparkIndex {

require(indexedColumns.nonEmpty, "indexed columns must not be empty")
require(
filterCondition.forall(isConjunction),
s"filtering condition $filterCondition must be conjunction")

/** Skipping index type */
override val kind: String = SKIPPING_INDEX_TYPE
@@ -59,12 +65,15 @@ case class FlintSparkSkippingIndex(
.toMap + (FILE_PATH_COLUMN -> "string")
val schemaJson = generateSchemaJSON(fieldTypes)

metadataBuilder(this)
.name(name())
val builder = metadataBuilder(this)
.name("") // skipping index is unique per table without name
.source(tableName)
.indexedColumns(indexColumnMaps)
.schema(schemaJson)
.build()

// Add optional index properties
filterCondition.map(builder.addProperty("filterCondition", _))
builder.build()
}

override def build(spark: SparkSession, df: Option[DataFrame]): DataFrame = {
@@ -77,7 +86,14 @@ case class FlintSparkSkippingIndex(
new Column(aggFunc.toAggregateExpression().as(name))
}

df.getOrElse(spark.read.table(tableName))
var job = df.getOrElse(spark.read.table(tableName))

// Add optional filtering condition
if (filterCondition.isDefined) {
job = job.where(filterCondition.get)
}

job
.groupBy(input_file_name().as(FILE_PATH_COLUMN))
.agg(namedAggFuncs.head, namedAggFuncs.tail: _*)
.withColumn(ID_COLUMN, sha1(col(FILE_PATH_COLUMN)))
@@ -118,6 +134,7 @@ object FlintSparkSkippingIndex {
/** Builder class for skipping index build */
class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) {
private var indexedColumns: Seq[FlintSparkSkippingStrategy] = Seq()
private var filterCondition: Option[String] = None

/**
* Configure which source table the index is based on.
@@ -181,8 +198,29 @@ object FlintSparkSkippingIndex {
this
}

/**
* Add filtering condition.
*
* @param condition
* filter condition
* @return
* index builder
*/
def filterBy(condition: String): Builder = {
expr(condition).expr.foreach {
case colName: UnresolvedAttribute =>
require(
findColumn(colName.name).isPartition,
s"${colName.name} is not partitioned column and cannot be used in index filtering condition")
case _ =>
}

filterCondition = Some(condition)
this
}

override def buildIndex(): FlintSparkIndex =
new FlintSparkSkippingIndex(tableName, indexedColumns, indexOptions)
new FlintSparkSkippingIndex(tableName, indexedColumns, filterCondition, indexOptions)

private def addIndexedColumn(indexedCol: FlintSparkSkippingStrategy): Unit = {
require(
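Putting the pieces together, a hypothetical end-to-end use of the new filterBy method might look like the following. The table, columns, and filter values are invented, addPartitions/addValueSet/create() are assumed from the existing builder API, spark is an active SparkSession, and year/month are assumed to be partition columns of the source table:

```scala
import org.opensearch.flint.spark.FlintSpark

val flint = new FlintSpark(spark)

flint
  .skippingIndex()
  .onTable("spark_catalog.default.alb_logs")
  .addPartitions("year", "month")         // indexed partition columns
  .addValueSet("elb_status_code")         // indexed data column
  .filterBy("year = 2023 AND month = 11") // conjunction over partitioned columns only
  .create()

// The index build job applies the same predicate (job.where(filterCondition.get)),
// so only matching source files are indexed; at query time the rewrite rule
// forces hybrid scan so files outside the filter are still read from the source.
```
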