Skip to content

Commit

Permalink
Delay index scan collect to query execution time
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Nov 14, 2023
1 parent 2713f08 commit deb5fe8
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 13 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
},
assembly / assemblyExcludedJars := {
val cp = (assembly / fullClasspath).value
cp filter { file => file.data.getName.contains("AWSLogsDALConnectorSpark")}
cp filter { file => file.data.getName.contains("LogsConnectorSpark")}
},
assembly / test := (Test / test).value)

Expand Down
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -67,36 +67,34 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
relation @ DataSourceV2Relation(table, _, Some(catalog), Some(identifier), _))
if hasNoDisjunction(condition) &&
// Check if query plan already rewritten
table.isInstanceOf[LogsTable] && !table.asInstanceOf[LogsTable].hasLogFileIds() =>
table.isInstanceOf[LogsTable] && !table.asInstanceOf[LogsTable].hasFileIndexScan() =>
val index = flint.describeIndex(getIndexName(catalog, identifier))
if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) {
val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex]
val indexFilter = rewriteToIndexFilter(skippingIndex, condition)
/*
* Replace original LogsTable with a new one with log file idx:
* Replace original LogsTable with a new one with file index scan:
* Filter(a=b)
* |- DataSourceV2Relation(A)
* |- LogsTable <== replaced with a new LogsTable with log file ids
* |- LogsTable <== replaced with a new LogsTable with file index scan
*/
if (indexFilter.isDefined) {
val indexScan = flint.queryIndex(skippingIndex.name())
val selectedFiles =
val selectFileIndexScan =
// Non hybrid scan
// TODO: refactor common logic with file-based skipping index
indexScan
.filter(new Column(indexFilter.get))
.select(FILE_PATH_COLUMN)
.collect
.map(_.getString(0))

// Construct LogsTable with selectedFiles as its log file ids
// It will build scan operator using these log file ids
// Construct LogsTable with file index scan
// It will build scan operator using log file ids collected from file index scan
val logsTable = table.asInstanceOf[LogsTable]
val newTable = new LogsTable(
logsTable.getSchema(),
logsTable.getOptions(),
selectedFiles,
logsTable.getProcessedFields())
logsTable.schema(),
logsTable.options(),
selectFileIndexScan,
logsTable.processedFields())
filter.copy(child = relation.copy(table = newTable))
} else {
filter
Expand Down

0 comments on commit deb5fe8

Please sign in to comment.