Add create, drop and refresh covering index SQL support #32

Merged
49 changes: 46 additions & 3 deletions docs/index.md
@@ -10,10 +10,11 @@ A Flint index is ...

### Feature Highlights

- Skipping Index
- Skipping Index: accelerate data scan by maintaining a compact aggregate data structure, which includes:
> **Reviewer (Member):** It would be very helpful if you could add a diagram showing the content of the skipping index and how it relates to the original table ...

> **@dai-chen (Collaborator, Author), Sep 20, 2023:** Sure, I've created a test for the doc update. Will add a doctest with examples in #28. Thanks!

- Partition: skip data scan by maintaining and filtering partitioned column value per file.
- MinMax: skip data scan by maintaining lower and upper bound of the indexed column per file.
- ValueSet: skip data scan by building a unique value set of the indexed column per file.
- Covering Index: create index for selected columns within the source dataset to improve query performance
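As a quick illustration of the three skipping strategies above, here is a hypothetical `CREATE SKIPPING INDEX` statement; the column names are made up, and the full syntax appears in the SQL section below:

```sql
CREATE SKIPPING INDEX ON alb_logs
(
  year PARTITION,
  elb_status_code VALUE_SET,
  request_processing_time MIN_MAX
)
```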

Please see the following example, in which the Index Building Logic and Query Rewrite Logic columns show the basic idea behind each skipping index implementation.

@@ -117,7 +118,7 @@ High level API is dependent on query engine implementation. Please see Query Eng

### SQL

DDL statement:
#### Skipping Index

```sql
CREATE SKIPPING INDEX
@@ -128,7 +129,7 @@ WITH (auto_refresh = (true|false))

REFRESH SKIPPING INDEX ON <object>

DESCRIBE SKIPPING INDEX ON <object>
[DESC|DESCRIBE] SKIPPING INDEX ON <object>

DROP SKIPPING INDEX ON <object>

@@ -157,6 +158,38 @@ DESCRIBE SKIPPING INDEX ON alb_logs
DROP SKIPPING INDEX ON alb_logs
```

#### Covering Index

```sql
CREATE INDEX name ON <object>
( column [, ...] )
WHERE <filter_predicate>
WITH (auto_refresh = (true|false))

REFRESH INDEX name ON <object>

SHOW [INDEX|INDEXES] ON <object>

[DESC|DESCRIBE] INDEX name ON <object>

DROP INDEX name ON <object>
```

Example:

```sql
CREATE INDEX elb_and_requestUri
ON alb_logs ( elb, requestUri )

REFRESH INDEX elb_and_requestUri ON alb_logs

SHOW INDEX ON alb_logs

DESCRIBE INDEX elb_and_requestUri ON alb_logs

DROP INDEX elb_and_requestUri ON alb_logs
```
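
The `WITH` clause in the syntax above enables incremental auto refresh at index creation time. A minimal sketch reusing the example index:

```sql
CREATE INDEX elb_and_requestUri
ON alb_logs ( elb, requestUri )
WITH (auto_refresh = true)
```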

## Index Store

### OpenSearch
@@ -264,6 +297,7 @@ Here is an example for Flint Spark integration:
```scala
val flint = new FlintSpark(spark)

// Skipping index
flint.skippingIndex()
.onTable("alb_logs")
.filterBy("time > 2023-04-01 00:00:00")
@@ -273,6 +307,15 @@ flint.skippingIndex()
.create()

flint.refresh("flint_alb_logs_skipping_index", FULL)

// Covering index
flint.coveringIndex()
.name("elb_and_requestUri")
.onTable("alb_logs")
.addIndexColumns("elb", "requestUri")
.create()

flint.refresh("flint_alb_logs_elb_and_requestUri_index")
```
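
The same client can describe or delete an index by its Flint index name. A minimal sketch based on the `describeIndex` and `deleteIndex` calls used by the SQL AST builders in this change; the index name string follows the naming convention shown above:

```scala
// Inspect the index metadata, if the index exists
flint.describeIndex("flint_alb_logs_elb_and_requestUri_index")

// Drop the index
flint.deleteIndex("flint_alb_logs_elb_and_requestUri_index")
```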

#### Skipping Index Provider SPI
flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4

@@ -16,6 +16,7 @@ singleStatement

statement
: skippingIndexStatement
| coveringIndexStatement
;

skippingIndexStatement
@@ -43,6 +44,26 @@ dropSkippingIndexStatement
: DROP SKIPPING INDEX ON tableName=multipartIdentifier
;

coveringIndexStatement
: createCoveringIndexStatement
| refreshCoveringIndexStatement
| dropCoveringIndexStatement
;

createCoveringIndexStatement
: CREATE INDEX indexName=identifier ON tableName=multipartIdentifier
LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

refreshCoveringIndexStatement
: REFRESH INDEX indexName=identifier ON tableName=multipartIdentifier
;

dropCoveringIndexStatement
: DROP INDEX indexName=identifier ON tableName=multipartIdentifier
;
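
// Example statements matched by the three rules above (mirroring docs/index.md):
//   CREATE INDEX elb_and_requestUri ON alb_logs (elb, requestUri) WITH (auto_refresh = true)
//   REFRESH INDEX elb_and_requestUri ON alb_logs
//   DROP INDEX elb_and_requestUri ON alb_logs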

indexColTypeList
: indexColType (COMMA indexColType)*
;
8 changes: 8 additions & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4

@@ -85,6 +85,14 @@ grammar SparkSqlBase;
}


multipartIdentifierPropertyList
: multipartIdentifierProperty (COMMA multipartIdentifierProperty)*
;

multipartIdentifierProperty
: multipartIdentifier (options=propertyList)?
;
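
// The two rules above let covering index DDL accept a column list such as
// (elb, requestUri), where each column may optionally carry its own property list.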

propertyList
: property (COMMA property)*
;
flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala

@@ -7,84 +7,34 @@ package org.opensearch.flint.spark.sql

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.PropertyListContext
import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.Command
import org.apache.spark.sql.types.StringType

/**
* Flint Spark AST builder that builds Spark commands for Flint index statements.
* This class mixes in all the other AST builders and provides utility methods.
*/
class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command] {
class FlintSparkSqlAstBuilder
extends FlintSparkSqlExtensionsBaseVisitor[Command]
with FlintSparkSkippingIndexAstBuilder
with FlintSparkCoveringIndexAstBuilder {

override def visitCreateSkippingIndexStatement(
ctx: CreateSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
// Create skipping index
val indexBuilder = flint
.skippingIndex()
.onTable(getFullTableName(flint, ctx.tableName))

ctx.indexColTypeList().indexColType().forEach { colTypeCtx =>
val colName = colTypeCtx.identifier().getText
val skipType = SkippingKind.withName(colTypeCtx.skipType.getText)
skipType match {
case PARTITION => indexBuilder.addPartitions(colName)
case VALUE_SET => indexBuilder.addValueSet(colName)
case MIN_MAX => indexBuilder.addMinMax(colName)
}
}
indexBuilder.create()

// Trigger auto refresh if enabled
if (isAutoRefreshEnabled(ctx.propertyList())) {
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.refreshIndex(indexName, RefreshMode.INCREMENTAL)
}
Seq.empty
}

override def visitRefreshSkippingIndexStatement(
ctx: RefreshSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.refreshIndex(indexName, RefreshMode.FULL)
Seq.empty
}

override def visitDescribeSkippingIndexStatement(
ctx: DescribeSkippingIndexStatementContext): Command = {
val outputSchema = Seq(
AttributeReference("indexed_col_name", StringType, nullable = false)(),
AttributeReference("data_type", StringType, nullable = false)(),
AttributeReference("skip_type", StringType, nullable = false)())

FlintSparkSqlCommand(outputSchema) { flint =>
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint
.describeIndex(indexName)
.map { case index: FlintSparkSkippingIndex =>
index.indexedColumns.map(strategy =>
Row(strategy.columnName, strategy.columnType, strategy.kind.toString))
}
.getOrElse(Seq.empty)
}
}
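// Prefer the most recent non-null result so that a command produced by any
// mixed-in visitor survives ANTLR's default child-result aggregation.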
override def aggregateResult(aggregate: Command, nextResult: Command): Command =
if (nextResult != null) nextResult else aggregate
}

override def visitDropSkippingIndexStatement(ctx: DropSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.deleteIndex(indexName)
Seq.empty
}
object FlintSparkSqlAstBuilder {

private def isAutoRefreshEnabled(ctx: PropertyListContext): Boolean = {
/**
* Check if auto_refresh is true in property list.
*
* @param ctx
* property list
*/
def isAutoRefreshEnabled(ctx: PropertyListContext): Boolean = {
if (ctx == null) {
false
} else {
@@ -99,10 +49,16 @@ class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command
}
}

private def getSkippingIndexName(flint: FlintSpark, tableNameCtx: RuleNode): String =
FlintSparkSkippingIndex.getSkippingIndexName(getFullTableName(flint, tableNameCtx))

private def getFullTableName(flint: FlintSpark, tableNameCtx: RuleNode): String = {
/**
* Get the full table name, qualified with the current database if none is specified.
*
* @param flint
* Flint Spark which has access to Spark Catalog
* @param tableNameCtx
* table name
* @return
* fully qualified table name
*/
def getFullTableName(flint: FlintSpark, tableNameCtx: RuleNode): String = {
val tableName = tableNameCtx.getText
if (tableName.contains(".")) {
tableName
@@ -111,7 +67,4 @@ class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command
s"$db.$tableName"
}
}

override def aggregateResult(aggregate: Command, nextResult: Command): Command =
if (nextResult != null) nextResult else aggregate
}
flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala (new file)

@@ -0,0 +1,75 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.sql.covering

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, isAutoRefreshEnabled}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateCoveringIndexStatementContext, DropCoveringIndexStatementContext, RefreshCoveringIndexStatementContext}

import org.apache.spark.sql.catalyst.plans.logical.Command

/**
* Flint Spark AST builder that builds Spark commands for Flint covering index statements.
*/
trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[Command] {

override def visitCreateCoveringIndexStatement(
ctx: CreateCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val indexName = ctx.indexName.getText
val tableName = ctx.tableName.getText
val indexBuilder =
flint
.coveringIndex()
.name(indexName)
.onTable(tableName)

ctx.indexColumns.multipartIdentifierProperty().forEach { indexColCtx =>
val colName = indexColCtx.multipartIdentifier().getText
indexBuilder.addIndexColumns(colName)
}
indexBuilder.create()

// Trigger auto refresh if enabled
if (isAutoRefreshEnabled(ctx.propertyList())) {
val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
flint.refreshIndex(flintIndexName, RefreshMode.INCREMENTAL)
}
Seq.empty
}
}

override def visitRefreshCoveringIndexStatement(
ctx: RefreshCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
flint.refreshIndex(flintIndexName, RefreshMode.FULL)
Seq.empty
}
}

override def visitDropCoveringIndexStatement(
ctx: DropCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
flint.deleteIndex(flintIndexName)
Seq.empty
}
}

private def getFlintIndexName(
flint: FlintSpark,
indexNameCtx: RuleNode,
tableNameCtx: RuleNode): String = {
val indexName = indexNameCtx.getText
val fullTableName = getFullTableName(flint, tableNameCtx)
FlintSparkCoveringIndex.getFlintIndexName(indexName, fullTableName)
}
}
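
// Hypothetical direct use of the naming helper above; note the visitor methods
// first qualify the table name via getFullTableName before this call:
//
//   val flintIndexName =
//     FlintSparkCoveringIndex.getFlintIndexName("elb_and_requestUri", "default.alb_logs")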