
Implement analyze skipping index statement #284

Merged
27 changes: 27 additions & 0 deletions docs/index.md
@@ -183,6 +183,8 @@ DROP SKIPPING INDEX ON <object>

VACUUM SKIPPING INDEX ON <object>

ANALYZE SKIPPING INDEX ON <object>

<object> ::= [db_name].[schema_name].table_name
```

@@ -319,6 +321,31 @@ fetched rows / total rows = 3/3
+-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+
```

- **Analyze Skipping Index**: Provides recommendations for creating a skipping index. The command outputs the following columns:
  - column_name: name of the recommended column
  - column_type: data type of the recommended column
  - skipping_type: recommended skipping type for the column
  - reason: why this skipping type is recommended

```sql
ANALYZE SKIPPING INDEX ON [catalog.database.]table
```

Example:
```
sql> ANALYZE SKIPPING INDEX ON alb_logs;
fetched rows / total rows = 5/5
+-------------------------+-------------+---------------+-------------------------------------------------------------------+
| column_name | column_type | skipping_type | reason |
|-------------------------+-------------+---------------+-------------------------------------------------------------------+
| year | integer | PARTITION | PARTITION data structure is recommended for partition columns |
| month | integer | PARTITION | PARTITION data structure is recommended for partition columns |
| day | integer | PARTITION | PARTITION data structure is recommended for partition columns |
| request_processing_time | integer | MIN_MAX | MIN_MAX data structure is recommended for IntegerType columns |
| client_ip | string | BLOOM_FILTER | BLOOM_FILTER data structure is recommended for StringType columns |
+-------------------------+-------------+---------------+-------------------------------------------------------------------+
```
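
Acting on the output programmatically might look like the following minimal Scala sketch. It assumes a Flint-enabled `spark` session, the `alb_logs` example above, and the `CREATE SKIPPING INDEX` syntax documented earlier; the column list simply mirrors the recommended types:

```scala
// Sketch: fetch recommendations, then create the index with the recommended
// skipping types. Table and column names follow the alb_logs example above.
spark.sql("ANALYZE SKIPPING INDEX ON alb_logs").show(truncate = false)

spark.sql("""
  CREATE SKIPPING INDEX ON alb_logs (
    year PARTITION,
    month PARTITION,
    day PARTITION,
    request_processing_time MIN_MAX,
    client_ip BLOOM_FILTER
  )
""")
```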

#### Create Index Options

Users can provide the following options in the `WITH` clause of the create statement:
@@ -29,6 +29,7 @@ skippingIndexStatement
| alterSkippingIndexStatement
| dropSkippingIndexStatement
| vacuumSkippingIndexStatement
| analyzeSkippingIndexStatement
;

createSkippingIndexStatement
@@ -105,6 +106,10 @@ vacuumCoveringIndexStatement
: VACUUM INDEX indexName ON tableName
;

analyzeSkippingIndexStatement
: ANALYZE SKIPPING INDEX ON tableName
**Collaborator:** Is this grammar finalized? What is the semantic meaning?

**Contributor Author:** This is the proposed grammar; please comment if you have any other suggestions. "Analyze" refers to examining data to gain insights: this command returns recommendations for creating a skipping index (skipping index columns with suggested data structures) based on table data.

**Collaborator (@noCharger, Mar 14, 2024):**

> This is the proposed grammar.

Any reference / compatibility analysis with the mainstream syntax?

> Please comment if you have any other suggestions.

Just brainstorming:

    ANALYZE TABLE tableName FOR SKIPPING INDEX RECOMMENDATIONS;

or

    ANALYZE TABLE tableName RECOMMEND SKIPPING INDEX COLUMNS;

The assumption is that we may want to do more things beyond the recommendation.

Ref: https://docs.oracle.com/en/database/oracle/oracle-database/19/sqlrf/ANALYZE.html#GUID-535CE98E-2359-4147-839F-DCB3772C1B0E

;

materializedViewStatement
: createMaterializedViewStatement
| refreshMaterializedViewStatement
1 change: 1 addition & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -157,6 +157,7 @@ DOT: '.';

AS: 'AS';
ALTER: 'ALTER';
ANALYZE: 'ANALYZE';
CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
@@ -19,9 +19,10 @@ import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.AUTO
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
import org.opensearch.flint.spark.skipping.recommendations.DataTypeSkippingStrategy

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}
@@ -332,6 +333,18 @@ class FlintSpark(val spark: SparkSession) extends Logging {
spark.read.format(FLINT_DATASOURCE).load(indexName)
}

/**
* Recommend skipping index columns and algorithm.
*
* @param tableName
* table name
* @return
*   skipping index recommendation rows
*/
def analyzeSkippingIndex(tableName: String): Seq[Row] = {
new DataTypeSkippingStrategy().analyzeSkippingIndexColumns(tableName, spark)
}

private def stopRefreshingJob(indexName: String): Unit = {
logInfo(s"Terminating refreshing job $indexName")
val job = spark.streams.active.find(_.name == indexName)
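
A usage sketch of the new API, assuming a SparkSession with the Flint extensions enabled; the table name is illustrative:

```scala
// Sketch: obtain skipping index recommendations programmatically.
// `spark` is assumed to be a Flint-enabled SparkSession; "alb_logs" is illustrative.
import org.apache.spark.sql.Row

val flint = new FlintSpark(spark)
val recommendations: Seq[Row] = flint.analyzeSkippingIndex("alb_logs")
recommendations.foreach(println)
```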
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.flint.spark.skipping.recommendations

import org.apache.spark.sql.{Row, SparkSession}

/**
* Automate skipping index column and algorithm selection.
*/
trait AnalyzeSkippingStrategy {

/**
* Recommend skipping index columns and algorithm.
*
* @param tableName
*   table name
* @param spark
*   spark session
* @return
*   skipping index recommendation rows
*/
def analyzeSkippingIndexColumns(tableName: String, spark: SparkSession): Seq[Row]

}
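
The trait leaves room for strategies beyond data-type rules, e.g. the table-stats-based recommendation mentioned in the review comments below. A hypothetical sketch (the class name, the cardinality threshold, and the reason strings are illustrative, not part of this PR):

```scala
// Hypothetical: a cardinality-based strategy implementing the same trait.
// The 100-distinct-values threshold and the reason strings are illustrative.
class TableStatsSkippingStrategy extends AnalyzeSkippingStrategy {

  override def analyzeSkippingIndexColumns(tableName: String, spark: SparkSession): Seq[Row] = {
    val df = spark.table(tableName)
    df.schema.fields.toSeq.map { field =>
      // Count distinct values to choose between VALUE_SET and BLOOM_FILTER.
      val distinct = df.select(field.name).distinct().count()
      val (kind, reason) =
        if (distinct <= 100) ("VALUE_SET", s"${field.name} has only $distinct distinct values")
        else ("BLOOM_FILTER", s"${field.name} has $distinct distinct values")
      Row(field.name, field.dataType.typeName, kind, reason)
    }
  }
}
```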
@@ -0,0 +1,63 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.skipping.recommendations

import scala.collection.mutable.ArrayBuffer

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, MIN_MAX, PARTITION, VALUE_SET}

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.flint.{loadTable, parseTableName}

class DataTypeSkippingStrategy extends AnalyzeSkippingStrategy {

val rules = Map(
**Collaborator:** I'm wondering if it would be more flexible to move this static mapping to a config file? Or maybe that's not necessary for this P0 solution?

**Contributor Author:** Good idea. I added it here thinking it's specific to data-type-based recommendation and won't be used by other strategies (e.g. recommendations based on table stats).

**Contributor Author:** I will take this up as a fast follow-up, since finalizing the grammar before the 2.13 release will unblock the SQL plugin.

"PARTITION" -> (PARTITION.toString, "PARTITION data structure is recommended for partition columns"),
"BooleanType" -> (VALUE_SET.toString, "VALUE_SET data structure is recommended for BooleanType columns"),
"IntegerType" -> (MIN_MAX.toString, "MIN_MAX data structure is recommended for IntegerType columns"),
"LongType" -> (MIN_MAX.toString, "MIN_MAX data structure is recommended for LongType columns"),
"ShortType" -> (MIN_MAX.toString, "MIN_MAX data structure is recommended for ShortType columns"),
"DateType" -> (BLOOM_FILTER.toString, "BLOOM_FILTER data structure is recommended for DateType columns"),
"TimestampType" -> (BLOOM_FILTER.toString, "BLOOM_FILTER data structure is recommended for TimestampType columns"),
"StringType" -> (BLOOM_FILTER.toString, "BLOOM_FILTER data structure is recommended for StringType columns"),
"VarcharType" -> (BLOOM_FILTER.toString, "BLOOM_FILTER data structure is recommended for VarcharType columns"),
"CharType" -> (BLOOM_FILTER.toString, "BLOOM_FILTER data structure is recommended for CharType columns"),
"StructType" -> (BLOOM_FILTER.toString, "BLOOM_FILTER data structure is recommended for StructType columns"))

override def analyzeSkippingIndexColumns(tableName: String, spark: SparkSession): Seq[Row] = {
val (catalog, ident) = parseTableName(spark, tableName)
val table = loadTable(catalog, ident).getOrElse(
throw new IllegalStateException(s"Table $tableName is not found"))

// Collect the names of all partition columns referenced by the table's partitioning transforms.
val partitionFields = table.partitioning().flatMap { transform =>
transform
.references()
.collect({ case reference =>
reference.fieldNames()
})
.flatten
.toSet
}
**Collaborator (@dai-chen, Mar 18, 2024), on the partition-field extraction above (lines +35 to +43):** I'm not sure if this is the right API because I've only used table.schema(). Could you double-check this along with the comment above later? I will merge this PR for now so we can get the grammar into the SQL plugin side.

**Contributor Author:** Sure, will do. Thanks!


val result = ArrayBuffer[Row]()
// Walk the schema and emit a recommendation row for each supported column.
table.schema().fields.foreach { field =>
if (partitionFields.contains(field.name)) {
result += Row(
field.name,
field.dataType.typeName,
rules("PARTITION")._1,
rules("PARTITION")._2)
} else if (rules.contains(field.dataType.toString)) {
result += Row(
field.name,
field.dataType.typeName,
rules(field.dataType.toString)._1,
rules(field.dataType.toString)._2)
}
}
result
}
}
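
Following up on the config-file suggestion in the review thread above, the static mapping could be externalized. A sketch only; the properties format, the `|` separator, and the `loadRules` helper are all hypothetical:

```scala
// Hypothetical: read the dataType -> (skipping type, reason) rules from a
// java.util.Properties stream instead of hardcoding the Map. Each entry is
// assumed to look like: IntegerType=MIN_MAX|MIN_MAX data structure is recommended ...
import java.util.Properties

def loadRules(stream: java.io.InputStream): Map[String, (String, String)] = {
  val props = new Properties()
  props.load(stream)
  val names = props.stringPropertyNames().toArray(Array.empty[String])
  names.map { dataType =>
    val Array(kind, reason) = props.getProperty(dataType).split("\\|", 2)
    dataType -> (kind, reason)
  }.toMap
}
```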
@@ -110,6 +110,20 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
}
}

override def visitAnalyzeSkippingIndexStatement(
ctx: AnalyzeSkippingIndexStatementContext): Command = {

// Column order matches the rows produced by DataTypeSkippingStrategy:
// (column_name, column_type, skipping_type, reason).
val outputSchema = Seq(
  AttributeReference("column_name", StringType, nullable = false)(),
  AttributeReference("column_type", StringType, nullable = false)(),
  AttributeReference("skipping_type", StringType, nullable = false)(),
  AttributeReference("reason", StringType, nullable = false)())

FlintSparkSqlCommand(outputSchema) { flint =>
flint.analyzeSkippingIndex(ctx.tableName().getText)
}
}

override def visitDropSkippingIndexStatement(ctx: DropSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
val indexName = getSkippingIndexName(flint, ctx.tableName)
@@ -365,4 +365,43 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
sql(s"VACUUM SKIPPING INDEX ON $testTable")
flint.describeIndex(testIndex) shouldBe empty
}

test("analyze skipping index with for supported data types") {
val result = sql(s"ANALYZE SKIPPING INDEX ON $testTable")

checkAnswer(
result,
Seq(
Row(
"year",
"integer",
"PARTITION",
"PARTITION data structure is recommended for partition columns"),
Row(
"month",
"integer",
"PARTITION",
"PARTITION data structure is recommended for partition columns"),
Row(
"name",
"string",
"BLOOM_FILTER",
"BLOOM_FILTER data structure is recommended for StringType columns"),
Row(
"age",
"integer",
"MIN_MAX",
"MIN_MAX data structure is recommended for IntegerType columns"),
Row(
"address",
"string",
"BLOOM_FILTER",
"BLOOM_FILTER data structure is recommended for StringType columns")))
}

test("analyze skipping index on invalid table") {
the[IllegalStateException] thrownBy {
sql(s"ANALYZE SKIPPING INDEX ON testTable")
}
}
}
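
For completeness, a hedged sketch of how the invalid-table test could also assert the error message; the message text follows the IllegalStateException thrown in DataTypeSkippingStrategy above, and the test name is illustrative:

```scala
// Sketch: additionally assert the message produced when the table is missing.
test("analyze skipping index on invalid table reports the table name") {
  val ex = the[IllegalStateException] thrownBy {
    sql("ANALYZE SKIPPING INDEX ON testTable")
  }
  ex.getMessage should include ("Table testTable is not found")
}
```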