query rewrite for LogsTable skipping index #154

Merged (2 commits, Nov 15, 2023)
build.sbt: 4 additions, 0 deletions
@@ -139,6 +139,10 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
       val oldStrategy = (assembly / assemblyMergeStrategy).value
       oldStrategy(x)
     },
+    assembly / assemblyExcludedJars := {
+      val cp = (assembly / fullClasspath).value
+      cp filter { file => file.data.getName.contains("LogsConnectorSpark")}
+    },
     assembly / test := (Test / test).value)
 
 // Test assembly package with integration test.
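For background, sbt-assembly's assemblyExcludedJars setting takes the resolved classpath of the assembly task and filters it down to the entries that should be left out of the fat jar. The addition above keeps any jar whose file name contains "LogsConnectorSpark" out of the assembly, presumably so that the connector is supplied on the classpath at runtime instead. A minimal standalone sketch of the same mechanism (the jar name below is illustrative, not from this PR):

// build.sbt sketch, assuming the sbt-assembly plugin is enabled
assembly / assemblyExcludedJars := {
  // Resolved classpath entries (Attributed[File]) seen by the assembly task
  val cp = (assembly / fullClasspath).value
  // Return the entries to exclude; matching jars are omitted from the fat jar
  cp.filter(_.data.getName.startsWith("provided-connector"))
}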
Binary file not shown.
ApplyFlintSparkSkippingIndex.scala
@@ -5,14 +5,19 @@

 package org.opensearch.flint.spark.skipping
 
+import com.amazon.awslogsdataaccesslayer.connectors.spark.LogsTable
 import org.opensearch.flint.spark.FlintSpark
-import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE}
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
 
+import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, Or, Predicate}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.flint.qualifyTableName
 
 /**
@@ -57,6 +62,46 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
       } else {
         filter
       }
+    case filter @ Filter(
+          condition: Predicate,
+          relation @ DataSourceV2Relation(table, _, Some(catalog), Some(identifier), _))
+        if hasNoDisjunction(condition) &&
+          // Check if the query plan has already been rewritten
+          table.isInstanceOf[LogsTable] && !table.asInstanceOf[LogsTable].hasFileIndexScan() =>
+      val index = flint.describeIndex(getIndexName(catalog, identifier))
+      if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) {
+        val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex]
+        val indexFilter = rewriteToIndexFilter(skippingIndex, condition)
+        /*
+         * Replace the original LogsTable with a new one that carries a file index scan:
+         *   Filter(a = b)
+         *   |- DataSourceV2Relation(A)
+         *      |- LogsTable  <== replaced with a new LogsTable with file index scan
+         */
+        if (indexFilter.isDefined) {
+          val indexScan = flint.queryIndex(skippingIndex.name())
+          val selectFileIndexScan =
+            // Non-hybrid scan
+            // TODO: refactor common logic with file-based skipping index
+            indexScan
+              .filter(new Column(indexFilter.get))
+              .select(FILE_PATH_COLUMN)
+
+          // Construct a LogsTable with the file index scan. Its scan operator is
+          // built from the log file ids collected by the file index scan.
+          val logsTable = table.asInstanceOf[LogsTable]
+          val newTable = new LogsTable(
+            logsTable.schema(),
+            logsTable.options(),
+            selectFileIndexScan,
+            logsTable.processedFields())
+          filter.copy(child = relation.copy(table = newTable))
+        } else {
+          filter
+        }
+      } else {
+        filter
+      }
   }
 
   private def getIndexName(table: CatalogTable): String = {
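Taken together, the new case mirrors the existing HadoopFsRelation rewrite, but for DataSourceV2 relations backed by LogsTable: a conjunction-only filter over a not-yet-rewritten LogsTable is translated into an index filter, the skipping index is queried for the file paths that may contain matches, and the relation is rebuilt around a LogsTable that scans only those files. A sketch of a query shape that would be eligible for the rewrite (all names are illustrative, not from this PR):

// Hypothetical usage: the table is assumed to resolve to a DataSourceV2Relation
// over LogsTable, and a skipping index is assumed to exist for it.
// The predicate is a pure conjunction, so hasNoDisjunction(condition) holds;
// rewriting it with an OR would disqualify the query from this rule.
val df = spark.sql(
  "SELECT * FROM my_catalog.my_db.my_logs WHERE status = 500 AND region = 'us-east-1'")
df.explain(true) // the optimized plan should show the relation over the rebuilt LogsTable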
@@ -67,6 +112,11 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
     getSkippingIndexName(qualifiedTableName)
   }
 
+  private def getIndexName(catalog: CatalogPlugin, identifier: Identifier): String = {
+    val qualifiedTableName = s"${catalog.name}.${identifier}"
+    getSkippingIndexName(qualifiedTableName)
+  }
+
   private def hasNoDisjunction(condition: Expression): Boolean = {
     condition.collectFirst { case Or(_, _) =>
       true
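Two notes on the helpers above. The new getIndexName overload builds the qualified name as <catalog>.<identifier>; assuming getSkippingIndexName keeps Flint's usual convention of replacing dots and appending a suffix, a catalog named glue with identifier default.my_logs would yield something like flint_glue_default_my_logs_skipping_index (illustrative, not confirmed by this diff). The hunk also cuts off inside hasNoDisjunction; completing the body, on the assumption that it simply tests for the absence of any Or node, gives:

  // Sketch of the complete helper, assuming the truncated body ends with .isEmpty:
  // collectFirst returns Some(true) for the first Or found anywhere in the
  // expression tree, so the predicate qualifies only when no disjunction exists.
  private def hasNoDisjunction(condition: Expression): Boolean = {
    condition.collectFirst { case Or(_, _) =>
      true
    }.isEmpty
  }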