diff --git a/build.sbt b/build.sbt
index 4cb825f1a..938f19a64 100644
--- a/build.sbt
+++ b/build.sbt
@@ -141,7 +141,7 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
     },
     assembly / assemblyExcludedJars := {
       val cp = (assembly / fullClasspath).value
-      cp filter { file => file.data.getName.contains("AWSLogsDALConnectorSpark")}
+      cp filter { file => file.data.getName.contains("LogsConnectorSpark")}
     },
     assembly / test := (Test / test).value)
 
diff --git a/flint-spark-integration/lib/AWSLogsDALConnectorSpark-1.0.jar b/flint-spark-integration/lib/AWSLogsDALConnectorSpark-1.0.jar
deleted file mode 100644
index 30c3fd3c4..000000000
Binary files a/flint-spark-integration/lib/AWSLogsDALConnectorSpark-1.0.jar and /dev/null differ
diff --git a/flint-spark-integration/lib/LogsConnectorSpark-1.0.jar b/flint-spark-integration/lib/LogsConnectorSpark-1.0.jar
new file mode 100644
index 000000000..0aa50bbb2
Binary files /dev/null and b/flint-spark-integration/lib/LogsConnectorSpark-1.0.jar differ
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala
index 5a2f97038..83f8def4d 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala
@@ -67,36 +67,34 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
           relation @ DataSourceV2Relation(table, _, Some(catalog), Some(identifier), _))
         if hasNoDisjunction(condition) &&
           // Check if query plan already rewritten
-          table.isInstanceOf[LogsTable] && !table.asInstanceOf[LogsTable].hasLogFileIds() =>
+          table.isInstanceOf[LogsTable] && !table.asInstanceOf[LogsTable].hasFileIndexScan() =>
       val index = flint.describeIndex(getIndexName(catalog, identifier))
       if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) {
         val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex]
         val indexFilter = rewriteToIndexFilter(skippingIndex, condition)
         /*
-         * Replace original LogsTable with a new one with log file idx:
+         * Replace original LogsTable with a new one with file index scan:
          *  Filter(a=b)
          *  |- DataSourceV2Relation(A)
-         *     |- LogsTable <== replaced with a new LogsTable with log file ids
+         *     |- LogsTable <== replaced with a new LogsTable with file index scan
          */
         if (indexFilter.isDefined) {
           val indexScan = flint.queryIndex(skippingIndex.name())
-          val selectedFiles =
+          val selectFileIndexScan =
             // Non hybrid scan
             // TODO: refactor common logic with file-based skipping index
             indexScan
              .filter(new Column(indexFilter.get))
              .select(FILE_PATH_COLUMN)
-              .collect
-              .map(_.getString(0))
 
-          // Construct LogsTable with selectedFiles as its log file ids
-          // It will build scan operator using these log file ids
+          // Construct LogsTable with file index scan
+          // It will build scan operator using log file ids collected from file index scan
           val logsTable = table.asInstanceOf[LogsTable]
           val newTable = new LogsTable(
-            logsTable.getSchema(),
-            logsTable.getOptions(),
-            selectedFiles,
-            logsTable.getProcessedFields())
+            logsTable.schema(),
+            logsTable.options(),
+            selectFileIndexScan,
+            logsTable.processedFields())
           filter.copy(child = relation.copy(table = newTable))
         } else {
           filter
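
Note on the ApplyFlintSparkSkippingIndex change above: the rule previously called .collect inside the optimizer, materializing the matched file paths on the driver while the plan was still being rewritten; it now hands LogsTable the filtered file-index scan as an unevaluated DataFrame, so the file IDs are collected only when the table builds its scan operator. Below is a minimal sketch of that deferred-collection pattern. DeferredFileIndexTable and selectedFileIds are hypothetical names: the real LogsTable API ships inside the bundled LogsConnectorSpark-1.0.jar, and only the methods visible in this diff are known.

import org.apache.spark.sql.DataFrame

// Hypothetical stand-in for LogsTable from LogsConnectorSpark-1.0.jar; it
// illustrates only the deferred-collection pattern, not the connector's real API.
class DeferredFileIndexTable(fileIndexScan: Option[DataFrame]) {

  // Mirrors hasFileIndexScan() in the diff: the optimizer rule checks this
  // flag so it does not rewrite a relation it has already rewritten.
  def hasFileIndexScan(): Boolean = fileIndexScan.isDefined

  // File IDs are materialized here, when the scan operator is built, instead
  // of via .collect inside the optimizer rule as in the old code.
  def selectedFileIds(): Seq[String] =
    fileIndexScan
      .map(_.collect().map(_.getString(0)).toSeq)
      .getOrElse(Seq.empty)
}

Deferring the materialization keeps the optimizer rule itself free of job execution; the index scan runs only if and when the rewritten plan is actually executed.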