From 61c962097100018485e097b7bc2aeccba81ad2c1 Mon Sep 17 00:00:00 2001
From: Sean Kao
Date: Fri, 9 Aug 2024 14:38:31 -0700
Subject: [PATCH] remove query rewrite for LogsTable skipping index (#552)

Signed-off-by: Sean Kao
---
 build.sbt                                     |  4 --
 .../lib/LogsConnectorSpark-1.0.jar            | Bin 2049 -> 0 bytes
 .../ApplyFlintSparkSkippingIndex.scala        | 51 +-----------------
 3 files changed, 1 insertion(+), 54 deletions(-)
 delete mode 100644 flint-spark-integration/lib/LogsConnectorSpark-1.0.jar

diff --git a/build.sbt b/build.sbt
index c373feecb..542086f2e 100644
--- a/build.sbt
+++ b/build.sbt
@@ -210,10 +210,6 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
         val oldStrategy = (assembly / assemblyMergeStrategy).value
         oldStrategy(x)
     },
-    assembly / assemblyExcludedJars := {
-      val cp = (assembly / fullClasspath).value
-      cp filter { file => file.data.getName.contains("LogsConnectorSpark")}
-    },
     assembly / test := (Test / test).value)
 
 lazy val IntegrationTest = config("it") extend Test

diff --git a/flint-spark-integration/lib/LogsConnectorSpark-1.0.jar b/flint-spark-integration/lib/LogsConnectorSpark-1.0.jar
deleted file mode 100644
index 0aa50bbb26e40dad5a9a5f53e56c5f94a06fc1cd..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala
index 8ce458055..3c14eb00d 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala
@@ -5,19 +5,15 @@
 
 package org.opensearch.flint.spark.skipping
 
-import com.amazon.awslogsdataaccesslayer.connectors.spark.LogsTable
 import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
 import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
-import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE}
 
-import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, Or, Predicate}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.flint.qualifyTableName
 
 /**
@@ -62,46 +58,6 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
       } else {
         filter
       }
-    case filter @ Filter(
-          condition: Predicate,
-          relation @ DataSourceV2Relation(table, _, Some(catalog), Some(identifier), _))
-        if hasNoDisjunction(condition) &&
-          // Check if query plan already rewritten
-          table.isInstanceOf[LogsTable] && !table.asInstanceOf[LogsTable].hasFileIndexScan() =>
-      val index = flint.describeIndex(getIndexName(catalog, identifier))
-      if (isActiveSkippingIndex(index)) {
-        val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex]
-        val indexFilter = rewriteToIndexFilter(skippingIndex, condition)
-        /*
-         * Replace original LogsTable with a new one with file index scan:
-         *  Filter(a=b)
-         *  |- DataSourceV2Relation(A)
-         *     |- LogsTable <== replaced with a new LogsTable with file index scan
-         */
-        if (indexFilter.isDefined) {
-          val indexScan = flint.queryIndex(skippingIndex.name())
-          val selectFileIndexScan =
-            // Non hybrid scan
-            // TODO: refactor common logic with file-based skipping index
-            indexScan
-              .filter(new Column(indexFilter.get))
-              .select(FILE_PATH_COLUMN)
-
-          // Construct LogsTable with file index scan
-          // It will build scan operator using log file ids collected from file index scan
-          val logsTable = table.asInstanceOf[LogsTable]
-          val newTable = new LogsTable(
-            logsTable.schema(),
-            logsTable.options(),
-            selectFileIndexScan,
-            logsTable.processedFields())
-          filter.copy(child = relation.copy(table = newTable))
-        } else {
-          filter
-        }
-      } else {
-        filter
-      }
   }
 
   private def getIndexName(table: CatalogTable): String = {
@@ -112,11 +68,6 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
     getSkippingIndexName(qualifiedTableName)
   }
 
-  private def getIndexName(catalog: CatalogPlugin, identifier: Identifier): String = {
-    val qualifiedTableName = s"${catalog.name}.${identifier}"
-    getSkippingIndexName(qualifiedTableName)
-  }
-
   private def hasNoDisjunction(condition: Expression): Boolean = {
     condition.collectFirst { case Or(_, _) => true