From f9eedf73b4cb87c977ccf0ace0086a842d5ed9ae Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Tue, 23 Apr 2024 15:01:37 -0700
Subject: [PATCH] Update user manual and code comments

Signed-off-by: Chen Dai
---
 docs/index.md                               |  3 ++-
 .../ApplyFlintSparkCoveringIndex.scala      | 18 +++++-------------
 .../ApplyFlintSparkCoveringIndexSuite.scala |  2 +-
 3 files changed, 8 insertions(+), 15 deletions(-)

diff --git a/docs/index.md b/docs/index.md
index 055756e4c..b1bf5478d 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -514,7 +514,8 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
 - `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry.
 - `spark.datasource.flint.retry.http_status_codes`: retryable HTTP response status code list. default value is "429,502" (429 Too Many Request and 502 Bad Gateway).
 - `spark.datasource.flint.retry.exception_class_names`: retryable exception class name list. by default no retry on any exception thrown.
-- `spark.flint.optimizer.enabled`: default is true.
+- `spark.flint.optimizer.enabled`: default is true. enable the Flint optimizer for improving query performance.
+- `spark.flint.optimizer.covering.enabled`: default is true. enable the Flint covering index optimizer for improving query performance.
 - `spark.flint.index.hybridscan.enabled`: default is false.
 - `spark.flint.index.checkpoint.mandatory`: default is true.
 - `spark.datasource.flint.socket_timeout_millis`: default value is 60000.
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala
index c8cb62b2c..f917f9c98 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala
@@ -27,19 +27,9 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  */
 class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan] {
 
-  /**
-   * Prerequisite:
-   * ```
-   * 1) Not an insert statement
-   * 2) Relation is supported, ex. Iceberg, Delta, File. (is this check required?)
-   * 3) Any covering index on the table:
-   *   3.1) doesn't have filtering condition
-   *   3.2) cover all columns present in the query
-   * ```
-   */
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case relation @ LogicalRelation(_, _, Some(table), false)
-        if !plan.isInstanceOf[V2WriteCommand] =>
+        if !plan.isInstanceOf[V2WriteCommand] => // Not an insert statement
       val tableName = table.qualifiedName
       val requiredCols = collectAllColumnsInQueryPlan(plan)
@@ -74,7 +64,7 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
       index: FlintSparkCoveringIndex,
       requiredCols: Set[String]): Boolean = {
     index.latestLogEntry.exists(_.state != DELETED) &&
-    index.filterCondition.isEmpty &&
+    index.filterCondition.isEmpty && // TODO: support partial covering index later
     requiredCols.subsetOf(index.indexedColumns.keySet)
   }
@@ -87,7 +77,9 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
     val inferredSchema = ds.inferSchema(options)
     val flintTable = ds.getTable(inferredSchema, Array.empty, options)
 
-    // Keep original output attributes in index only
+    // Keep original output attributes only if available in covering index.
+    // We have to reuse original attribute object because it's already analyzed
+    // with exprId referenced by the other parts of the query plan.
     val outputAttributes = index.indexedColumns.keys
       .map(colName => relation.output.find(_.name == colName).get)
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala
index 6a64be925..bd0d8225b 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala
@@ -71,7 +71,7 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
       .assertIndexNotUsed()
   }
 
-  // Covering index doesn't column age
+  // Covering index doesn't cover column age
   Seq(
     s"SELECT * FROM $testTable",
     s"SELECT name, age FROM $testTable",
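
A minimal usage sketch of the two flags documented above, assuming a local
SparkSession and that both settings are read from the session conf at
optimization time. The object and column names (CoveringIndexFlagDemo,
name/age/city) are illustrative, not taken from the codebase:

    import org.apache.spark.sql.SparkSession

    object CoveringIndexFlagDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("covering-index-flag-demo")
          .master("local[*]")
          .getOrCreate()

        // Both flags default to true. Disabling either skips the covering
        // index rewrite: the first turns off the Flint optimizer as a whole,
        // the second only the ApplyFlintSparkCoveringIndex rule.
        spark.conf.set("spark.flint.optimizer.enabled", "true")
        spark.conf.set("spark.flint.optimizer.covering.enabled", "false")

        // The applicability check above reduces to a subset test:
        // requiredCols.subsetOf(index.indexedColumns.keySet), i.e. every
        // column referenced in the query plan must be covered by the index.
        val indexedColumns = Set("name", "age")
        println(Set("name").subsetOf(indexedColumns))         // true: index usable
        println(Set("name", "city").subsetOf(indexedColumns)) // false: "city" not covered

        spark.stop()
      }
    }

Setting spark.flint.optimizer.covering.enabled to false leaves the relation
untouched even when a covering index exists, which is useful for isolating a
query regression to this rewrite without disabling the rest of the Flint
optimizer.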