Update user manual and code comments
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Apr 23, 2024
1 parent 3ec9e42 commit f9eedf7
Showing 3 changed files with 8 additions and 15 deletions.
3 changes: 2 additions & 1 deletion docs/index.md
@@ -514,7 +514,8 @@ In the index mapping, the `_meta` and `properties` field stores meta and schema info
- `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry.
- `spark.datasource.flint.retry.http_status_codes`: retryable HTTP response status code list. default value is "429,502" (429 Too Many Request and 502 Bad Gateway).
- `spark.datasource.flint.retry.exception_class_names`: retryable exception class name list. by default no retry on any exception thrown.
- `spark.flint.optimizer.enabled`: default is true.
- `spark.flint.optimizer.enabled`: default is true. enable the Flint optimizer for improving query performance.
- `spark.flint.optimizer.covering.enabled`: default is true. enable the Flint covering index optimizer for improving query performance.
- `spark.flint.index.hybridscan.enabled`: default is false.
- `spark.flint.index.checkpoint.mandatory`: default is true.
- `spark.datasource.flint.socket_timeout_millis`: default value is 60000.
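As a quick illustration (a minimal sketch, not part of the manual; the session setup and values are assumed), these options are ordinary Spark configuration entries and can be set when building a `SparkSession`:

```scala
// Hypothetical example: Flint optimizer options passed as ordinary Spark configs.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("flint-optimizer-config-example")                  // illustrative app name
  .config("spark.flint.optimizer.enabled", "true")            // Flint optimizer on (default)
  .config("spark.flint.optimizer.covering.enabled", "false")  // disable only the covering index rewrite
  .getOrCreate()
```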
@@ -27,19 +27,9 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
*/
class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan] {

/**
* Prerequisite:
* ```
* 1) Not an insert statement
* 2) Relation is supported, ex. Iceberg, Delta, File. (is this check required?)
* 3) Any covering index on the table:
* 3.1) doesn't have filtering condition
* 3.2) cover all columns present in the query
* ```
*/
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case relation @ LogicalRelation(_, _, Some(table), false)
if !plan.isInstanceOf[V2WriteCommand] =>
if !plan.isInstanceOf[V2WriteCommand] => // Not an insert statement
val tableName = table.qualifiedName
val requiredCols = collectAllColumnsInQueryPlan(plan)

@@ -74,7 +64,7 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
index: FlintSparkCoveringIndex,
requiredCols: Set[String]): Boolean = {
index.latestLogEntry.exists(_.state != DELETED) &&
index.filterCondition.isEmpty &&
index.filterCondition.isEmpty && // TODO: support partial covering index later
requiredCols.subsetOf(index.indexedColumns.keySet)
}
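The check above reduces to three conditions: the index is not deleted, it has no filtering condition (the TODO above), and it covers every column the query needs. A hypothetical sketch of just the column-coverage part, with invented column sets:

```scala
// Sketch only: a covering index applies when every column required by the
// query is among the index's indexed columns (modeled here as plain Sets).
val indexedColumns: Set[String] = Set("name", "age")

def coversQuery(requiredCols: Set[String]): Boolean =
  requiredCols.subsetOf(indexedColumns)

coversQuery(Set("name"))         // true  -> the index scan can replace the table scan
coversQuery(Set("name", "city")) // false -> fall back to the original relation
```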

@@ -87,7 +77,9 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
val inferredSchema = ds.inferSchema(options)
val flintTable = ds.getTable(inferredSchema, Array.empty, options)

// Keep original output attributes in index only
// Keep original output attributes only if available in covering index.
// We have to reuse original attribute object because it's already analyzed
// with exprId referenced by the other parts of the query plan.
val outputAttributes =
index.indexedColumns.keys
.map(colName => relation.output.find(_.name == colName).get)
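The new comment can be made concrete with a small, hypothetical Catalyst snippet (illustrative only, not part of the rule): an attribute recreated with the same name gets a fresh `exprId`, so any operator that referenced the original attribute would no longer resolve against it.

```scala
// Illustrative only: same column name, freshly built attribute -> different exprId,
// which is why the rule reuses the relation's already-analyzed output attributes.
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.StringType

val original  = AttributeReference("name", StringType)()
val recreated = AttributeReference("name", StringType)()

original.exprId == recreated.exprId  // false: Catalyst treats them as different attributes
original.semanticEquals(recreated)   // also false, for the same reason
```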
@@ -71,7 +71,7 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
.assertIndexNotUsed()
}

// Covering index doesn't column age
// Covering index doesn't cover column age
Seq(
s"SELECT * FROM $testTable",
s"SELECT name, age FROM $testTable",
