Implement analyze skipping index statement (#284)

* dummy result test

Signed-off-by: Rupal Mahajan <[email protected]>

* Add grammar for analyze skipping index

Signed-off-by: Rupal Mahajan <[email protected]>

* Add analyze skipping index function

Signed-off-by: Rupal Mahajan <[email protected]>

* update analyze strategy

Signed-off-by: Rupal Mahajan <[email protected]>

* Update recommendations

Signed-off-by: Rupal Mahajan <[email protected]>

* Add test

Signed-off-by: Rupal Mahajan <[email protected]>

* Format code

Signed-off-by: Rupal Mahajan <[email protected]>

* Remove unused import

Signed-off-by: Rupal Mahajan <[email protected]>

* Update doc

Signed-off-by: Rupal Mahajan <[email protected]>

* Update doc

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

---------

Signed-off-by: Rupal Mahajan <[email protected]>
rupal-bq authored Mar 18, 2024
1 parent 5a2df20 commit e6a97dc
Showing 8 changed files with 187 additions and 1 deletion.
27 changes: 27 additions & 0 deletions docs/index.md
@@ -183,6 +183,8 @@ DROP SKIPPING INDEX ON <object>

VACUUM SKIPPING INDEX ON <object>

ANALYZE SKIPPING INDEX ON <object>

<object> ::= [db_name].[schema_name].table_name
```

@@ -319,6 +321,31 @@ fetched rows / total rows = 3/3
+-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+
```

- **Analyze Skipping Index**: Provides recommendations for creating a skipping index. It outputs the following columns:
  - column_name: name of the recommended column
  - column_type: type of the recommended column
  - skipping_type: recommended skipping type for the column
  - reason: why this skipping type is recommended

```sql
ANALYZE SKIPPING INDEX ON [catalog.database.]table
```

Example:
```
sql> ANALYZE SKIPPING INDEX ON alb_logs;
fetched rows / total rows = 5/5
+-------------------------+-------------+---------------+-------------------------------------------------------------------+
| column_name | column_type | skipping_type | reason |
|-------------------------+-------------+---------------+-------------------------------------------------------------------+
| year | integer | PARTITION | PARTITION data structure is recommended for partition columns |
| month | integer | PARTITION | PARTITION data structure is recommended for partition columns |
| day | integer | PARTITION | PARTITION data structure is recommended for partition columns |
| request_processing_time | integer | MIN_MAX | MIN_MAX data structure is recommended for IntegerType columns |
| client_ip | string | BLOOM_FILTER | BLOOM_FILTER data structure is recommended for StringType columns |
+-------------------------+-------------+---------------+-------------------------------------------------------------------+
```
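
Based on these recommendations, a matching index could then be created with the `CREATE SKIPPING INDEX` statement described above; for example (column list simply mirrors the sample output):
```sql
CREATE SKIPPING INDEX ON alb_logs
(
  year PARTITION,
  month PARTITION,
  day PARTITION,
  request_processing_time MIN_MAX,
  client_ip BLOOM_FILTER
)
```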

#### Create Index Options

Users can provide the following options in the `WITH` clause of a create statement:
@@ -29,6 +29,7 @@ skippingIndexStatement
    | alterSkippingIndexStatement
    | dropSkippingIndexStatement
    | vacuumSkippingIndexStatement
    | analyzeSkippingIndexStatement
    ;

createSkippingIndexStatement
@@ -105,6 +106,10 @@ vacuumCoveringIndexStatement
    : VACUUM INDEX indexName ON tableName
    ;

analyzeSkippingIndexStatement
    : ANALYZE SKIPPING INDEX ON tableName
    ;

materializedViewStatement
    : createMaterializedViewStatement
    | refreshMaterializedViewStatement
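
For reference, the new `analyzeSkippingIndexStatement` rule matches statements of this shape (table name illustrative):
```sql
ANALYZE SKIPPING INDEX ON myglue.default.alb_logs
```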
1 change: 1 addition & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -157,6 +157,7 @@ DOT: '.';

AS: 'AS';
ALTER: 'ALTER';
ANALYZE: 'ANALYZE';
CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
@@ -19,9 +19,10 @@ import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.AUTO
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
import org.opensearch.flint.spark.skipping.recommendations.DataTypeSkippingStrategy

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}
@@ -332,6 +333,18 @@ class FlintSpark(val spark: SparkSession) extends Logging {
    spark.read.format(FLINT_DATASOURCE).load(indexName)
  }

  /**
   * Recommend skipping index columns and algorithm.
   *
   * @param tableName
   *   table name
   * @return
   *   skipping index recommendation rows
   */
  def analyzeSkippingIndex(tableName: String): Seq[Row] = {
    new DataTypeSkippingStrategy().analyzeSkippingIndexColumns(tableName, spark)
  }

  private def stopRefreshingJob(indexName: String): Unit = {
    logInfo(s"Terminating refreshing job $indexName")
    val job = spark.streams.active.find(_.name == indexName)
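A minimal usage sketch of the new `analyzeSkippingIndex` API (session setup and table name are illustrative, not part of this change):

```scala
import org.apache.spark.sql.SparkSession
import org.opensearch.flint.spark.FlintSpark

// Hypothetical driver code; the app name and table name are placeholders.
val spark = SparkSession.builder().appName("analyze-skipping-index").getOrCreate()
val flint = new FlintSpark(spark)

// Returns one Row per recommended column: (column_name, column_type, skipping_type, reason).
flint.analyzeSkippingIndex("myglue.default.alb_logs").foreach(println)
```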
@@ -0,0 +1,24 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */
package org.opensearch.flint.spark.skipping.recommendations

import org.apache.spark.sql.{Row, SparkSession}

/**
 * Automate skipping index column and algorithm selection.
 */
trait AnalyzeSkippingStrategy {

  /**
   * Recommend skipping index columns and algorithm.
   *
   * @param tableName
   *   table name
   * @param spark
   *   spark session
   * @return
   *   skipping index recommendation rows
   */
  def analyzeSkippingIndexColumns(tableName: String, spark: SparkSession): Seq[Row]

}
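
Because the recommendation logic sits behind this trait, alternative engines can be plugged in later. A hypothetical sketch (class name and rule are illustrative, not part of this change):

```scala
package org.opensearch.flint.spark.skipping.recommendations

import org.apache.spark.sql.{Row, SparkSession}

// Hypothetical strategy that blanket-recommends BLOOM_FILTER for every column.
class BloomFilterEverywhereStrategy extends AnalyzeSkippingStrategy {

  override def analyzeSkippingIndexColumns(tableName: String, spark: SparkSession): Seq[Row] =
    spark.table(tableName).schema.fields.toSeq.map { field =>
      Row(field.name, field.dataType.typeName, "BLOOM_FILTER", "illustrative blanket rule")
    }
}
```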
@@ -0,0 +1,63 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.spark.skipping.recommendations

import scala.collection.mutable.ArrayBuffer

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, MIN_MAX, PARTITION, VALUE_SET}

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.flint.{loadTable, parseTableName}

class DataTypeSkippingStrategy extends AnalyzeSkippingStrategy {

  val rules = Map(
    "PARTITION" -> (PARTITION.toString, "PARTITION data structure is recommended for partition columns"),
    "BooleanType" -> (VALUE_SET.toString, "VALUE_SET data structure is recommended for BooleanType columns"),
    "IntegerType" -> (MIN_MAX.toString, "MIN_MAX data structure is recommended for IntegerType columns"),
    "LongType" -> (MIN_MAX.toString, "MIN_MAX data structure is recommended for LongType columns"),
    "ShortType" -> (MIN_MAX.toString, "MIN_MAX data structure is recommended for ShortType columns"),
    "DateType" -> (BLOOM_FILTER.toString, "BLOOM_FILTER data structure is recommended for DateType columns"),
    "TimestampType" -> (BLOOM_FILTER.toString, "BLOOM_FILTER data structure is recommended for TimestampType columns"),
    "StringType" -> (BLOOM_FILTER.toString, "BLOOM_FILTER data structure is recommended for StringType columns"),
    "VarcharType" -> (BLOOM_FILTER.toString, "BLOOM_FILTER data structure is recommended for VarcharType columns"),
    "CharType" -> (BLOOM_FILTER.toString, "BLOOM_FILTER data structure is recommended for CharType columns"),
    "StructType" -> (BLOOM_FILTER.toString, "BLOOM_FILTER data structure is recommended for StructType columns"))

  override def analyzeSkippingIndexColumns(tableName: String, spark: SparkSession): Seq[Row] = {
    val (catalog, ident) = parseTableName(spark, tableName)
    val table = loadTable(catalog, ident).getOrElse(
      throw new IllegalStateException(s"Table $tableName is not found"))

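    // Names of the table's partitioning columns; these take precedence over the data-type rules.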
    val partitionFields = table.partitioning().flatMap { transform =>
      transform
        .references()
        .collect({ case reference =>
          reference.fieldNames()
        })
        .flatten
        .toSet
    }

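    // One recommendation per column: PARTITION for partition columns, otherwise the matching data-type rule (if any).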
    val result = ArrayBuffer[Row]()
    table.schema().fields.foreach { field =>
      if (partitionFields.contains(field.name)) {
        result += Row(
          field.name,
          field.dataType.typeName,
          rules("PARTITION")._1,
          rules("PARTITION")._2)
      } else if (rules.contains(field.dataType.toString)) {
        result += Row(
          field.name,
          field.dataType.typeName,
          rules(field.dataType.toString)._1,
          rules(field.dataType.toString)._2)
      }
    }
    result
  }
}
@@ -110,6 +110,20 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
    }
  }

  override def visitAnalyzeSkippingIndexStatement(
      ctx: AnalyzeSkippingIndexStatementContext): Command = {

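    // Column order must match the Row fields produced by the recommendation strategy.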
    val outputSchema = Seq(
      AttributeReference("column_name", StringType, nullable = false)(),
      AttributeReference("column_type", StringType, nullable = false)(),
      AttributeReference("skipping_type", StringType, nullable = false)(),
      AttributeReference("reason", StringType, nullable = false)())

    FlintSparkSqlCommand(outputSchema) { flint =>
      flint.analyzeSkippingIndex(ctx.tableName().getText)
    }
  }

  override def visitDropSkippingIndexStatement(ctx: DropSkippingIndexStatementContext): Command =
    FlintSparkSqlCommand() { flint =>
      val indexName = getSkippingIndexName(flint, ctx.tableName)
@@ -365,4 +365,43 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
sql(s"VACUUM SKIPPING INDEX ON $testTable")
flint.describeIndex(testIndex) shouldBe empty
}

test("analyze skipping index with for supported data types") {
val result = sql(s"ANALYZE SKIPPING INDEX ON $testTable")

checkAnswer(
result,
Seq(
Row(
"year",
"integer",
"PARTITION",
"PARTITION data structure is recommended for partition columns"),
Row(
"month",
"integer",
"PARTITION",
"PARTITION data structure is recommended for partition columns"),
Row(
"name",
"string",
"BLOOM_FILTER",
"BLOOM_FILTER data structure is recommended for StringType columns"),
Row(
"age",
"integer",
"MIN_MAX",
"MIN_MAX data structure is recommended for IntegerType columns"),
Row(
"address",
"string",
"BLOOM_FILTER",
"BLOOM_FILTER data structure is recommended for StringType columns")))
}

test("analyze skipping index on invalid table") {
the[IllegalStateException] thrownBy {
sql(s"ANALYZE SKIPPING INDEX ON testTable")
}
}
}
