From e6a97dcc8c248788b7afc7843f90282f4f8db7c4 Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Mon, 18 Mar 2024 13:20:18 -0700 Subject: [PATCH] Implement analyze skipping index statement (#284) * dummy result test Signed-off-by: Rupal Mahajan * Add grammar for analyze skipping index Signed-off-by: Rupal Mahajan * Add analyze skippig index function Signed-off-by: Rupal Mahajan * update analyze strategy Signed-off-by: Rupal Mahajan * Update recommendations Signed-off-by: Rupal Mahajan * Add test Signed-off-by: Rupal Mahajan * Format code Signed-off-by: Rupal Mahajan * Remove unused import Signed-off-by: Rupal Mahajan * Update doc Signed-off-by: Rupal Mahajan * Update doc Signed-off-by: Rupal Mahajan * nit Signed-off-by: Rupal Mahajan --------- Signed-off-by: Rupal Mahajan --- docs/index.md | 27 ++++++++ .../main/antlr4/FlintSparkSqlExtensions.g4 | 5 ++ .../src/main/antlr4/SparkSqlBase.g4 | 1 + .../opensearch/flint/spark/FlintSpark.scala | 15 ++++- .../AnalyzeSkippingStrategy.scala | 24 +++++++ .../DataTypeSkippingStrategy.scala | 63 +++++++++++++++++++ .../FlintSparkSkippingIndexAstBuilder.scala | 14 +++++ .../FlintSparkSkippingIndexSqlITSuite.scala | 39 ++++++++++++ 8 files changed, 187 insertions(+), 1 deletion(-) create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/recommendations/AnalyzeSkippingStrategy.scala create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/recommendations/DataTypeSkippingStrategy.scala diff --git a/docs/index.md b/docs/index.md index 8fb202e0e..3b1e2efe5 100644 --- a/docs/index.md +++ b/docs/index.md @@ -183,6 +183,8 @@ DROP SKIPPING INDEX ON VACUUM SKIPPING INDEX ON +ANALYZE SKIPPING INDEX ON + ::= [db_name].[schema_name].table_name ``` @@ -319,6 +321,31 @@ fetched rows / total rows = 3/3 +-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+ ``` +- **Analyze Skipping Index**: Provides recommendation for creating skipping index. It outputs the following columns: + - column_name: recommended column's name + - column_type: recommended column's type + - skipping_type: recommended skipping type for column + - reason: why this skipping type is recommended + +```sql +ANALYZE SKIPPING INDEX ON [catalog.database.]table +``` + +Example: +``` +sql> ANALYZE SKIPPING INDEX ON alb_logs; +fetched rows / total rows = 5/5 ++-------------------------+-------------+---------------+-------------------------------------------------------------------+ +| column_name | column_type | skipping_type | reason | +|-------------------------+-------------+---------------+-------------------------------------------------------------------+ +| year | integer | PARTITION | PARTITION data structure is recommended for partition columns | +| month | integer | PARTITION | PARTITION data structure is recommended for partition columns | +| day | integer | PARTITION | PARTITION data structure is recommended for partition columns | +| request_processing_time | integer | MIN_MAX | MIN_MAX data structure is recommended for IntegerType columns | +| client_ip | string | BLOOM_FILTER | BLOOM_FILTER data structure is recommended for StringType columns | ++-------------------------+-------------+---------------+-------------------------------------------------------------------+ +``` + #### Create Index Options User can provide the following options in `WITH` clause of create statement: diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index 3c22becf5..dc097d596 100644 --- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -29,6 +29,7 @@ skippingIndexStatement | alterSkippingIndexStatement | dropSkippingIndexStatement | vacuumSkippingIndexStatement + | analyzeSkippingIndexStatement ; createSkippingIndexStatement @@ -105,6 +106,10 @@ vacuumCoveringIndexStatement : VACUUM INDEX indexName ON tableName ; +analyzeSkippingIndexStatement + : ANALYZE SKIPPING INDEX ON tableName + ; + materializedViewStatement : createMaterializedViewStatement | refreshMaterializedViewStatement diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 index 8f9ed570f..283981e47 100644 --- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 +++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 @@ -157,6 +157,7 @@ DOT: '.'; AS: 'AS'; ALTER: 'ALTER'; +ANALYZE: 'ANALYZE'; CREATE: 'CREATE'; DESC: 'DESC'; DESCRIBE: 'DESCRIBE'; diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index fba818a0f..89e8dc423 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -19,9 +19,10 @@ import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.AUTO import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer +import org.opensearch.flint.spark.skipping.recommendations.DataTypeSkippingStrategy import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE import org.apache.spark.sql.flint.config.FlintSparkConf import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN} @@ -332,6 +333,18 @@ class FlintSpark(val spark: SparkSession) extends Logging { spark.read.format(FLINT_DATASOURCE).load(indexName) } + /** + * Recommend skipping index columns and algorithm. + * + * @param tableName + * table name + * @return + * skipping index recommendation dataframe + */ + def analyzeSkippingIndex(tableName: String): Seq[Row] = { + new DataTypeSkippingStrategy().analyzeSkippingIndexColumns(tableName, spark) + } + private def stopRefreshingJob(indexName: String): Unit = { logInfo(s"Terminating refreshing job $indexName") val job = spark.streams.active.find(_.name == indexName) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/recommendations/AnalyzeSkippingStrategy.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/recommendations/AnalyzeSkippingStrategy.scala new file mode 100644 index 000000000..cd73b3854 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/recommendations/AnalyzeSkippingStrategy.scala @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.flint.spark.skipping.recommendations + +import org.apache.spark.sql.{Row, SparkSession} + +/** + * Automate skipping index column and algorithm selection. + */ +trait AnalyzeSkippingStrategy { + + /** + * Recommend skipping index columns and algorithm. + * + * @param tableName + * table name + * @return + * skipping index recommendation dataframe + */ + def analyzeSkippingIndexColumns(tableName: String, spark: SparkSession): Seq[Row] + +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/recommendations/DataTypeSkippingStrategy.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/recommendations/DataTypeSkippingStrategy.scala new file mode 100644 index 000000000..1f9cc0b68 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/recommendations/DataTypeSkippingStrategy.scala @@ -0,0 +1,63 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.skipping.recommendations + +import scala.collection.mutable.ArrayBuffer + +import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, MIN_MAX, PARTITION, VALUE_SET} + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.flint.{loadTable, parseTableName} + +class DataTypeSkippingStrategy extends AnalyzeSkippingStrategy { + + val rules = Map( + "PARTITION" -> (PARTITION.toString, "PARTITION data structure is recommended for partition columns"), + "BooleanType" -> (VALUE_SET.toString, "VALUE_SET data structure is recommended for BooleanType columns"), + "IntegerType" -> (MIN_MAX.toString, "MIN_MAX data structure is recommended for IntegerType columns"), + "LongType" -> (MIN_MAX.toString, "MIN_MAX data structure is recommended for LongType columns"), + "ShortType" -> (MIN_MAX.toString, "MIN_MAX data structure is recommended for ShortType columns"), + "DateType" -> (BLOOM_FILTER.toString, "BLOOM_FILTER data structure is recommended for DateType columns"), + "TimestampType" -> (BLOOM_FILTER.toString, "BLOOM_FILTER data structure is recommended for TimestampType columns"), + "StringType" -> (BLOOM_FILTER.toString, "BLOOM_FILTER data structure is recommended for StringType columns"), + "VarcharType" -> (BLOOM_FILTER.toString, "BLOOM_FILTER data structure is recommended for VarcharType columns"), + "CharType" -> (BLOOM_FILTER.toString, "BLOOM_FILTER data structure is recommended for CharType columns"), + "StructType" -> (BLOOM_FILTER.toString, "BLOOM_FILTER data structure is recommended for StructType columns")) + + override def analyzeSkippingIndexColumns(tableName: String, spark: SparkSession): Seq[Row] = { + val (catalog, ident) = parseTableName(spark, tableName) + val table = loadTable(catalog, ident).getOrElse( + throw new IllegalStateException(s"Table $tableName is not found")) + + val partitionFields = table.partitioning().flatMap { transform => + transform + .references() + .collect({ case reference => + reference.fieldNames() + }) + .flatten + .toSet + } + + val result = ArrayBuffer[Row]() + table.schema().fields.map { field => + if (partitionFields.contains(field.name)) { + result += Row( + field.name, + field.dataType.typeName, + rules("PARTITION")._1, + rules("PARTITION")._2) + } else if (rules.contains(field.dataType.toString)) { + result += Row( + field.name, + field.dataType.typeName, + rules(field.dataType.toString)._1, + rules(field.dataType.toString)._2) + } + } + result + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala index fe6356c8e..e98446c22 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala @@ -110,6 +110,20 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A } } + override def visitAnalyzeSkippingIndexStatement( + ctx: AnalyzeSkippingIndexStatementContext): Command = { + + val outputSchema = Seq( + AttributeReference("column_name", StringType, nullable = false)(), + AttributeReference("column_type", StringType, nullable = false)(), + AttributeReference("reason", StringType, nullable = false)(), + AttributeReference("skipping_type", StringType, nullable = false)()) + + FlintSparkSqlCommand(outputSchema) { flint => + flint.analyzeSkippingIndex(ctx.tableName().getText) + } + } + override def visitDropSkippingIndexStatement(ctx: DropSkippingIndexStatementContext): Command = FlintSparkSqlCommand() { flint => val indexName = getSkippingIndexName(flint, ctx.tableName) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 63442e6e1..b08945953 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -365,4 +365,43 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { sql(s"VACUUM SKIPPING INDEX ON $testTable") flint.describeIndex(testIndex) shouldBe empty } + + test("analyze skipping index with for supported data types") { + val result = sql(s"ANALYZE SKIPPING INDEX ON $testTable") + + checkAnswer( + result, + Seq( + Row( + "year", + "integer", + "PARTITION", + "PARTITION data structure is recommended for partition columns"), + Row( + "month", + "integer", + "PARTITION", + "PARTITION data structure is recommended for partition columns"), + Row( + "name", + "string", + "BLOOM_FILTER", + "BLOOM_FILTER data structure is recommended for StringType columns"), + Row( + "age", + "integer", + "MIN_MAX", + "MIN_MAX data structure is recommended for IntegerType columns"), + Row( + "address", + "string", + "BLOOM_FILTER", + "BLOOM_FILTER data structure is recommended for StringType columns"))) + } + + test("analyze skipping index on invalid table") { + the[IllegalStateException] thrownBy { + sql(s"ANALYZE SKIPPING INDEX ON testTable") + } + } }