Skip to content

Commit

Permalink
remove query rewrite for LogsTable skipping index (#552) (#553)
Browse files Browse the repository at this point in the history
(cherry picked from commit 61c9620)

Signed-off-by: Sean Kao <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 7b43ff2 commit 4ee247d
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 54 deletions.
4 changes: 0 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,6 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
},
assembly / assemblyExcludedJars := {
val cp = (assembly / fullClasspath).value
cp filter { file => file.data.getName.contains("LogsConnectorSpark")}
},
assembly / test := (Test / test).value)

lazy val IntegrationTest = config("it") extend Test
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,15 @@

package org.opensearch.flint.spark.skipping

import com.amazon.awslogsdataaccesslayer.connectors.spark.LogsTable
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE}

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Or, Predicate}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.flint.qualifyTableName

/**
Expand Down Expand Up @@ -62,46 +58,6 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
} else {
filter
}
case filter @ Filter(
condition: Predicate,
relation @ DataSourceV2Relation(table, _, Some(catalog), Some(identifier), _))
if hasNoDisjunction(condition) &&
// Check if query plan already rewritten
table.isInstanceOf[LogsTable] && !table.asInstanceOf[LogsTable].hasFileIndexScan() =>
val index = flint.describeIndex(getIndexName(catalog, identifier))
if (isActiveSkippingIndex(index)) {
val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex]
val indexFilter = rewriteToIndexFilter(skippingIndex, condition)
/*
* Replace original LogsTable with a new one with file index scan:
* Filter(a=b)
* |- DataSourceV2Relation(A)
* |- LogsTable <== replaced with a new LogsTable with file index scan
*/
if (indexFilter.isDefined) {
val indexScan = flint.queryIndex(skippingIndex.name())
val selectFileIndexScan =
// Non hybrid scan
// TODO: refactor common logic with file-based skipping index
indexScan
.filter(new Column(indexFilter.get))
.select(FILE_PATH_COLUMN)

// Construct LogsTable with file index scan
// It will build scan operator using log file ids collected from file index scan
val logsTable = table.asInstanceOf[LogsTable]
val newTable = new LogsTable(
logsTable.schema(),
logsTable.options(),
selectFileIndexScan,
logsTable.processedFields())
filter.copy(child = relation.copy(table = newTable))
} else {
filter
}
} else {
filter
}
}

private def getIndexName(table: CatalogTable): String = {
Expand All @@ -112,11 +68,6 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
getSkippingIndexName(qualifiedTableName)
}

private def getIndexName(catalog: CatalogPlugin, identifier: Identifier): String = {
val qualifiedTableName = s"${catalog.name}.${identifier}"
getSkippingIndexName(qualifiedTableName)
}

private def hasNoDisjunction(condition: Expression): Boolean = {
condition.collectFirst { case Or(_, _) =>
true
Expand Down

0 comments on commit 4ee247d

Please sign in to comment.