Merge branch 'main' into ppl-spark-translation
YANG-DB committed Sep 22, 2023
2 parents ebd165f + e72f054 commit 357d61e
Showing 13 changed files with 483 additions and 127 deletions.
49 changes: 46 additions & 3 deletions docs/index.md
@@ -10,10 +10,11 @@ A Flint index is ...

### Feature Highlights

- Skipping Index
- Skipping Index: accelerate data scans by maintaining a compact aggregate data structure, which includes:
  - Partition: skip data scans by maintaining and filtering the partitioned column value per file.
  - MinMax: skip data scans by maintaining the lower and upper bounds of the indexed column per file.
  - ValueSet: skip data scans by building a unique value set of the indexed column per file.
- Covering Index: create an index for selected columns within the source dataset to improve query performance

Please see the following example, in which the Index Building Logic and Query Rewrite Logic columns show the basic idea behind each skipping index implementation.
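As a rough sketch of the rewrite idea behind the MinMax strategy (illustrative only, not Flint's actual implementation; the index table and column names below are hypothetical), the planner consults the per-file bounds and prunes every file whose range cannot satisfy the predicate:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Illustrative sketch only. Assume a hypothetical skipping index table with one row per
// source file: (file_path, status_min, status_max) for an indexed column `status`.
val spark = SparkSession.builder().getOrCreate()
val skippingIndex = spark.table("alb_logs_skipping_index_example") // hypothetical table name

// For a predicate `status = 404`, only files whose [status_min, status_max] range can
// contain 404 need to be scanned; every other file is skipped.
val candidateFiles = skippingIndex
  .where(col("status_min") <= 404 && col("status_max") >= 404)
  .select("file_path")
```

The Partition and ValueSet strategies follow the same pattern, with equality checks against the stored partition value or value set instead of range bounds.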

@@ -117,7 +118,7 @@ High level API is dependent on query engine implementation. Please see Query Eng

### SQL

DDL statement:
#### Skipping Index

```sql
CREATE SKIPPING INDEX
@@ -128,7 +129,7 @@ WITH (auto_refresh = (true|false))

REFRESH SKIPPING INDEX ON <object>

DESCRIBE SKIPPING INDEX ON <object>
[DESC|DESCRIBE] SKIPPING INDEX ON <object>

DROP SKIPPING INDEX ON <object>

@@ -157,6 +158,38 @@ DESCRIBE SKIPPING INDEX ON alb_logs
DROP SKIPPING INDEX ON alb_logs
```

#### Covering Index

```sql
CREATE INDEX name ON <object>
( column [, ...] )
WHERE <filter_predicate>
WITH (auto_refresh = (true|false))

REFRESH INDEX name ON <object>

SHOW [INDEX|INDEXES] ON <object>

[DESC|DESCRIBE] INDEX name ON <object>

DROP INDEX name ON <object>
```

Example:

```sql
CREATE INDEX elb_and_requestUri
ON alb_logs ( elb, requestUri )

REFRESH INDEX elb_and_requestUri ON alb_logs

SHOW INDEX ON alb_logs

DESCRIBE INDEX elb_and_requestUri ON alb_logs

DROP INDEX elb_and_requestUri ON alb_logs
```

## Index Store

### OpenSearch
@@ -264,6 +297,7 @@ Here is an example for Flint Spark integration:
```scala
val flint = new FlintSpark(spark)

// Skipping index
flint.skippingIndex()
.onTable("alb_logs")
.filterBy("time > 2023-04-01 00:00:00")
@@ -273,6 +307,15 @@ flint.skippingIndex()
.create()

flint.refresh("flint_alb_logs_skipping_index", FULL)

// Covering index
flint.coveringIndex()
.name("elb_and_requestUri")
.onTable("alb_logs")
.addIndexColumns("elb", "requestUri")
.create()

flint.refresh("flint_alb_logs_elb_and_requestUri_index")
```

#### Skipping Index Provider SPI
21 changes: 21 additions & 0 deletions flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -16,6 +16,7 @@ singleStatement

statement
: skippingIndexStatement
| coveringIndexStatement
;

skippingIndexStatement
@@ -43,6 +44,26 @@ dropSkippingIndexStatement
: DROP SKIPPING INDEX ON tableName=multipartIdentifier
;

coveringIndexStatement
: createCoveringIndexStatement
| refreshCoveringIndexStatement
| dropCoveringIndexStatement
;

createCoveringIndexStatement
: CREATE INDEX indexName=identifier ON tableName=multipartIdentifier
LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

refreshCoveringIndexStatement
: REFRESH INDEX indexName=identifier ON tableName=multipartIdentifier
;

dropCoveringIndexStatement
: DROP INDEX indexName=identifier ON tableName=multipartIdentifier
;

indexColTypeList
: indexColType (COMMA indexColType)*
;
8 changes: 8 additions & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -85,6 +85,14 @@ grammar SparkSqlBase;
}


multipartIdentifierPropertyList
: multipartIdentifierProperty (COMMA multipartIdentifierProperty)*
;

multipartIdentifierProperty
: multipartIdentifier (options=propertyList)?
;

propertyList
: property (COMMA property)*
;
@@ -5,6 +5,7 @@

package org.opensearch.flint.spark

import org.opensearch.flint.spark.function.TumbleFunction
import org.opensearch.flint.spark.sql.FlintSparkSqlParser

import org.apache.spark.sql.SparkSessionExtensions
@@ -18,6 +19,9 @@ class FlintSparkExtensions extends (SparkSessionExtensions => Unit) {
extensions.injectParser { (spark, parser) =>
new FlintSparkSqlParser(parser)
}

extensions.injectFunction(TumbleFunction.description)

extensions.injectOptimizerRule { spark =>
new FlintSparkOptimizer(spark)
}
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.function

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.functions.window

/**
* Tumble windowing function that assigns each row to a fixed-interval window without overlap.
*/
object TumbleFunction {

/**
* Function name.
*/
val identifier: FunctionIdentifier = FunctionIdentifier("tumble")

/**
* Function signature: tumble function generates a new struct column after evaluation.
*/
val exprInfo: ExpressionInfo = new ExpressionInfo(classOf[Column].getCanonicalName, "window")

/**
* Function implementation builder.
*/
val functionBuilder: Seq[Expression] => Expression =
(children: Seq[Expression]) => {
require(children.size == 2, "column name and window expression are required")

// Delegate actual implementation to Spark existing window() function
val timeColumn = children.head
val windowDuration = children(1)
window(new Column(timeColumn), windowDuration.toString()).expr
}

/**
* Function description to register current function to Spark extension.
*/
val description: (FunctionIdentifier, ExpressionInfo, FunctionBuilder) =
(identifier, exprInfo, functionBuilder)
}
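Because the builder above delegates to Spark's built-in `window()` function, a registered `tumble` call assigns each row to exactly one fixed, non-overlapping window. Here is a hypothetical usage sketch; the `alb_logs` table, its `time` column, and the session setup are assumptions, not part of this change:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical usage sketch: assumes FlintSparkExtensions is installed on the session
// and that a table `alb_logs` with a timestamp column `time` exists.
val spark = SparkSession.builder()
  .config("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
  .getOrCreate()

// Each row is assigned to one fixed 1-hour window (no overlap), equivalent to grouping
// by Spark's built-in window(col("time"), "1 hour").
spark.sql("""
  SELECT tumble(time, '1 hour') AS time_window, COUNT(*) AS cnt
  FROM alb_logs
  GROUP BY tumble(time, '1 hour')
""").show()
```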
@@ -7,84 +7,34 @@ package org.opensearch.flint.spark.sql

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.PropertyListContext
import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.Command
import org.apache.spark.sql.types.StringType

/**
* Flint Spark AST builder that builds Spark command for Flint index statement.
* This class mixes in all other AST builders and provides util methods.
*/
class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command] {
class FlintSparkSqlAstBuilder
extends FlintSparkSqlExtensionsBaseVisitor[Command]
with FlintSparkSkippingIndexAstBuilder
with FlintSparkCoveringIndexAstBuilder {

override def visitCreateSkippingIndexStatement(
ctx: CreateSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
// Create skipping index
val indexBuilder = flint
.skippingIndex()
.onTable(getFullTableName(flint, ctx.tableName))

ctx.indexColTypeList().indexColType().forEach { colTypeCtx =>
val colName = colTypeCtx.identifier().getText
val skipType = SkippingKind.withName(colTypeCtx.skipType.getText)
skipType match {
case PARTITION => indexBuilder.addPartitions(colName)
case VALUE_SET => indexBuilder.addValueSet(colName)
case MIN_MAX => indexBuilder.addMinMax(colName)
}
}
indexBuilder.create()

// Trigger auto refresh if enabled
if (isAutoRefreshEnabled(ctx.propertyList())) {
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.refreshIndex(indexName, RefreshMode.INCREMENTAL)
}
Seq.empty
}

override def visitRefreshSkippingIndexStatement(
ctx: RefreshSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.refreshIndex(indexName, RefreshMode.FULL)
Seq.empty
}

override def visitDescribeSkippingIndexStatement(
ctx: DescribeSkippingIndexStatementContext): Command = {
val outputSchema = Seq(
AttributeReference("indexed_col_name", StringType, nullable = false)(),
AttributeReference("data_type", StringType, nullable = false)(),
AttributeReference("skip_type", StringType, nullable = false)())

FlintSparkSqlCommand(outputSchema) { flint =>
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint
.describeIndex(indexName)
.map { case index: FlintSparkSkippingIndex =>
index.indexedColumns.map(strategy =>
Row(strategy.columnName, strategy.columnType, strategy.kind.toString))
}
.getOrElse(Seq.empty)
}
}
override def aggregateResult(aggregate: Command, nextResult: Command): Command =
if (nextResult != null) nextResult else aggregate
}

override def visitDropSkippingIndexStatement(ctx: DropSkippingIndexStatementContext): Command =
FlintSparkSqlCommand() { flint =>
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.deleteIndex(indexName)
Seq.empty
}
object FlintSparkSqlAstBuilder {

private def isAutoRefreshEnabled(ctx: PropertyListContext): Boolean = {
/**
* Check if auto_refresh is true in property list.
*
* @param ctx
* property list
*/
def isAutoRefreshEnabled(ctx: PropertyListContext): Boolean = {
if (ctx == null) {
false
} else {
Expand All @@ -99,10 +49,16 @@ class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command
}
}

private def getSkippingIndexName(flint: FlintSpark, tableNameCtx: RuleNode): String =
FlintSparkSkippingIndex.getSkippingIndexName(getFullTableName(flint, tableNameCtx))

private def getFullTableName(flint: FlintSpark, tableNameCtx: RuleNode): String = {
/**
* Get full table name if database not specified.
*
* @param flint
* Flint Spark which has access to Spark Catalog
* @param tableNameCtx
* table name
* @return
*/
def getFullTableName(flint: FlintSpark, tableNameCtx: RuleNode): String = {
val tableName = tableNameCtx.getText
if (tableName.contains(".")) {
tableName
@@ -111,7 +67,4 @@ class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command
s"$db.$tableName"
}
}

override def aggregateResult(aggregate: Command, nextResult: Command): Command =
if (nextResult != null) nextResult else aggregate
}
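For context on the mix-in refactoring above: each index type now contributes its own AST builder trait that FlintSparkSqlAstBuilder composes, and the traits reuse the helpers moved into the companion object. Below is a hypothetical sketch of what such a trait could look like for the CREATE statement; the actual FlintSparkCoveringIndexAstBuilder is not shown in this excerpt, so the details are assumptions based on the grammar rules and the documented builder API:

```scala
package org.opensearch.flint.spark.sql

import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.getFullTableName
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.CreateCoveringIndexStatementContext

import org.apache.spark.sql.catalyst.plans.logical.Command

// Hypothetical sketch: illustrates the mix-in pattern, not the trait shipped in this change.
trait FlintSparkCoveringIndexAstBuilderSketch extends FlintSparkSqlExtensionsVisitor[Command] {

  override def visitCreateCoveringIndexStatement(
      ctx: CreateCoveringIndexStatementContext): Command =
    FlintSparkSqlCommand() { flint =>
      // Build the covering index through the fluent API shown in docs/index.md.
      val indexBuilder = flint
        .coveringIndex()
        .name(ctx.indexName.getText)
        .onTable(getFullTableName(flint, ctx.tableName))

      // Add every column listed between the parentheses of CREATE INDEX ... ( col [, ...] ).
      ctx.indexColumns.multipartIdentifierProperty().forEach { colCtx =>
        indexBuilder.addIndexColumns(colCtx.multipartIdentifier().getText)
      }
      indexBuilder.create()
      Seq.empty
    }
}
```

An auto_refresh property would presumably be handled the same way the skipping index visitor handled it before the refactoring, by checking isAutoRefreshEnabled from the companion object and triggering an incremental refresh.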