Merge branch 'main' into encode-catalog-name-in-flint-index-name
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Sep 28, 2023
2 parents 25805cc + 33a5f0a commit a162cb5
Showing 105 changed files with 10,334 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/backport.yml
@@ -22,7 +22,7 @@ jobs:
installation_id: 22958780

- name: Backport
uses: VachaShah/backport@v2.1.0
uses: VachaShah/backport@v2.2.0
with:
github_token: ${{ steps.github_app_token.outputs.token }}
head_template: backport/backport-<%= number %>-to-<%= base %>
2 changes: 1 addition & 1 deletion DEVELOPER_GUIDE.md
@@ -22,7 +22,7 @@ sbt scalafmtAll
```
The code style is automatically checked, but users can also manually check it.
```
sbt sbt scalastyle
sbt scalastyle
```
For IntelliJ users, read more in [scalafmt IntelliJ](https://scalameta.org/scalafmt/docs/installation.html#intellij) to integrate
scalafmt with IntelliJ.
24 changes: 23 additions & 1 deletion README.md
@@ -4,10 +4,12 @@ OpenSearch Flint is ... It consists of two modules:

- `flint-core`: a module that contains Flint specification and client.
- `flint-spark-integration`: a module that provides Spark integration for Flint and derived dataset based on it.
- `ppl-spark-integration`: a module that provides PPL query execution on top of Spark. See the [PPL repository](https://github.com/opensearch-project/piped-processing-language).

## Documentation

Please refer to the [Flint Index Reference Manual](./docs/index.md) for more information.
For the PPL language, see the [PPL Reference Manual](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/index.rst) for more information.

## Prerequisites

@@ -17,14 +19,22 @@ Version compatibility:
|---------------|-------------|---------------|---------------|------------|
| 0.1.0 | 11+ | 3.3.1 | 2.12.14 | 2.6+ |

## Usage
## Flint Extension Usage

To use this application, you can run Spark with the Flint extension:

```
spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintSparkExtensions"
```

## PPL Extension Usage

To use PPL-to-Spark translation, you can run Spark with the PPL extension:

```
spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions"
```
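
Both extensions can also be enabled in the same session, since `spark.sql.extensions` accepts a comma-separated list of extension classes. A minimal programmatic sketch (not part of this change; class names copied from the commands above, and the Flint/PPL jars are assumed to be on the classpath):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: register both the Flint and PPL extensions on one SparkSession.
val spark = SparkSession
  .builder()
  .appName("flint-ppl-demo") // hypothetical application name
  .config(
    "spark.sql.extensions",
    "org.opensearch.flint.FlintSparkExtensions,org.opensearch.flint.FlintPPLSparkExtensions")
  .getOrCreate()
```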

## Build

To build and run this application with Spark, you can run:
@@ -37,6 +47,18 @@ then add org.opensearch:opensearch-spark_2.12 when run spark application, for ex
bin/spark-shell --packages "org.opensearch:opensearch-spark_2.12:0.1.0-SNAPSHOT"
```

### PPL Build & Run

To build and run the PPL extension with Spark, you can run:

```
sbt clean sparkPPLCosmetic/publishM2
```
then add org.opensearch:opensearch-spark-ppl_2.12 when running your Spark application, for example,
```
bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.1.0-SNAPSHOT"
```
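
With the extension registered, PPL text is submitted through the regular SQL entry point. A hedged sketch (the table name `http_logs` and the exact PPL commands supported at this stage are assumptions, not taken from this change):

```scala
// Illustrative only: a simple PPL projection query submitted via spark.sql,
// assuming FlintPPLSparkExtensions has been registered as shown above.
val result = spark.sql("source = http_logs | fields clientip, status")
result.show()
```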

## Code of Conduct

This project has adopted an [Open Source Code of Conduct](./CODE_OF_CONDUCT.md).
50 changes: 47 additions & 3 deletions build.sbt
@@ -43,7 +43,7 @@ lazy val commonSettings = Seq(
Test / test := ((Test / test) dependsOn testScalastyle).value)

lazy val root = (project in file("."))
.aggregate(flintCore, flintSparkIntegration, sparkSqlApplication)
.aggregate(flintCore, flintSparkIntegration, pplSparkIntegration, sparkSqlApplication)
.disablePlugins(AssemblyPlugin)
.settings(name := "flint", publish / skip := true)

@@ -61,6 +61,42 @@ lazy val flintCore = (project in file("flint-core"))
exclude ("com.fasterxml.jackson.core", "jackson-databind")),
publish / skip := true)

lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
.enablePlugins(AssemblyPlugin, Antlr4Plugin)
.settings(
commonSettings,
name := "ppl-spark-integration",
scalaVersion := scala212,
libraryDependencies ++= Seq(
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
"com.github.sbt" % "junit-interface" % "0.13.3" % "test"),
libraryDependencies ++= deps(sparkVersion),
// ANTLR settings
Antlr4 / antlr4Version := "4.8",
Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.ppl"),
Antlr4 / antlr4GenListener := true,
Antlr4 / antlr4GenVisitor := true,
// Assembly settings
assemblyPackageScala / assembleArtifact := false,
assembly / assemblyOption ~= {
_.withIncludeScala(false)
},
assembly / assemblyMergeStrategy := {
case PathList(ps @ _*) if ps.last endsWith ("module-info.class") =>
MergeStrategy.discard
case PathList("module-info.class") => MergeStrategy.discard
case PathList("META-INF", "versions", xs @ _, "module-info.class") =>
MergeStrategy.discard
case x =>
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
},
assembly / test := (Test / test).value)

lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
.dependsOn(flintCore)
.enablePlugins(AssemblyPlugin, Antlr4Plugin)
@@ -102,7 +138,7 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))

// Test assembly package with integration test.
lazy val integtest = (project in file("integ-test"))
.dependsOn(flintSparkIntegration % "test->test")
.dependsOn(flintSparkIntegration % "test->test", pplSparkIntegration % "test->test" )
.settings(
commonSettings,
name := "integ-test",
@@ -118,7 +154,7 @@ lazy val integtest = (project in file("integ-test"))
"org.opensearch.client" % "opensearch-java" % "2.6.0" % "test"
exclude ("com.fasterxml.jackson.core", "jackson-databind")),
libraryDependencies ++= deps(sparkVersion),
Test / fullClasspath += (flintSparkIntegration / assembly).value)
Test / fullClasspath ++= Seq((flintSparkIntegration / assembly).value, (pplSparkIntegration / assembly).value))

lazy val standaloneCosmetic = project
.settings(
@@ -144,6 +180,14 @@ lazy val sparkSqlApplicationCosmetic = project
exportJars := true,
Compile / packageBin := (sparkSqlApplication / assembly).value)

lazy val sparkPPLCosmetic = project
.settings(
name := "opensearch-spark-ppl",
commonSettings,
releaseSettings,
exportJars := true,
Compile / packageBin := (pplSparkIntegration / assembly).value)

lazy val releaseSettings = Seq(
publishMavenStyle := true,
publishArtifact := true,
4 changes: 2 additions & 2 deletions docs/index.md
@@ -121,7 +121,7 @@ High level API is dependent on query engine implementation. Please see Query Eng
#### Skipping Index

```sql
CREATE SKIPPING INDEX
CREATE SKIPPING INDEX [IF NOT EXISTS]
ON <object>
( column <index_type> [, ...] )
WHERE <filter_predicate>
@@ -161,7 +161,7 @@ DROP SKIPPING INDEX ON alb_logs
#### Covering Index

```sql
CREATE INDEX name ON <object>
CREATE INDEX [IF NOT EXISTS] name ON <object>
( column [, ...] )
WHERE <filter_predicate>
WITH ( options )
@@ -27,7 +27,8 @@ skippingIndexStatement
;

createSkippingIndexStatement
: CREATE SKIPPING INDEX ON tableName
: CREATE SKIPPING INDEX (IF NOT EXISTS)?
ON tableName
LEFT_PAREN indexColTypeList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;
@@ -53,7 +54,8 @@ coveringIndexStatement
;

createCoveringIndexStatement
: CREATE INDEX indexName ON tableName
: CREATE INDEX (IF NOT EXISTS)? indexName
ON tableName
LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;
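
With `(IF NOT EXISTS)?` added to both rules, the extended parser accepts statements such as the following. A hedged sketch via `spark.sql` (table and column names are placeholders, not taken from this change):

```scala
// Hypothetical statements exercising the new optional IF NOT EXISTS clause;
// `alb_logs`, `year`, `name`, and `age` are placeholder identifiers.
spark.sql("CREATE SKIPPING INDEX IF NOT EXISTS ON alb_logs (year PARTITION)")
spark.sql("CREATE INDEX IF NOT EXISTS name_and_age ON alb_logs (name, age)")
```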
3 changes: 3 additions & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -158,9 +158,12 @@ CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DROP: 'DROP';
EXISTS: 'EXISTS';
FALSE: 'FALSE';
IF: 'IF';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
NOT: 'NOT';
ON: 'ON';
PARTITION: 'PARTITION';
REFRESH: 'REFRESH';
@@ -76,14 +76,18 @@ class FlintSpark(val spark: SparkSession) {
*
* @param index
* Flint index to create
* @param ignoreIfExists
* Ignore existing index
*/
def createIndex(index: FlintSparkIndex): Unit = {
def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = {
val indexName = index.name()
if (flintClient.exists(indexName)) {
throw new IllegalStateException(
s"A table can only have one Flint skipping index: Flint index $indexName is found")
if (!ignoreIfExists) {
throw new IllegalStateException(s"Flint index $indexName already exists")
}
} else {
flintClient.createIndex(indexName, index.metadata())
}
flintClient.createIndex(indexName, index.metadata())
}
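
The new default parameter keeps existing callers source-compatible while letting the SQL layer thread `IF NOT EXISTS` through. A caller-side sketch (not repository code; the package names and the index value are assumptions):

```scala
import org.apache.spark.sql.SparkSession
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}

// Sketch of the two call shapes after this change.
val spark: SparkSession = SparkSession.builder().getOrCreate()
val flint = new FlintSpark(spark)
val index: FlintSparkIndex = ??? // assumed to be built by a FlintSparkIndexBuilder elsewhere

flint.createIndex(index)                        // still throws if the index already exists
flint.createIndex(index, ignoreIfExists = true) // no-op when the index already exists
```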

/**
@@ -65,8 +65,12 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {

/**
* Create Flint index.
*
* @param ignoreIfExists
* ignore existing index
*/
def create(): Unit = flint.createIndex(buildIndex())
def create(ignoreIfExists: Boolean = false): Unit =
flint.createIndex(buildIndex(), ignoreIfExists)

/**
* Build method for concrete builder class to implement
@@ -40,10 +40,11 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
indexBuilder.addIndexColumns(colName)
}

val ignoreIfExists = ctx.EXISTS() != null
val indexOptions = visitPropertyList(ctx.propertyList())
indexBuilder
.options(indexOptions)
.create()
.create(ignoreIfExists)

// Trigger auto refresh if enabled
if (indexOptions.autoRefresh()) {
@@ -44,10 +44,11 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
}
}

val ignoreIfExists = ctx.EXISTS() != null
val indexOptions = visitPropertyList(ctx.propertyList())
indexBuilder
.options(indexOptions)
.create()
.create(ignoreIfExists)

// Trigger auto refresh if enabled
if (indexOptions.autoRefresh()) {
@@ -107,6 +107,26 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
}
}

test("create covering index if not exists") {
sql(s"""
| CREATE INDEX IF NOT EXISTS $testIndex
| ON $testTable (name, age)
|""".stripMargin)
flint.describeIndex(testFlintIndex) shouldBe defined

// Expect error without IF NOT EXISTS, otherwise success
assertThrows[IllegalStateException] {
sql(s"""
| CREATE INDEX $testIndex
| ON $testTable (name, age)
|""".stripMargin)
}
sql(s"""
| CREATE INDEX IF NOT EXISTS $testIndex
| ON $testTable (name, age)
|""".stripMargin)
}

test("show all covering index on the source table") {
flint
.coveringIndex()
@@ -94,6 +94,28 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
indexData.count() shouldBe 2
}

test("create skipping index if not exists") {
sql(s"""
| CREATE SKIPPING INDEX
| IF NOT EXISTS
| ON $testTable ( year PARTITION )
| """.stripMargin)
flint.describeIndex(testIndex) shouldBe defined

// Expect error without IF NOT EXISTS, otherwise success
assertThrows[IllegalStateException] {
sql(s"""
| CREATE SKIPPING INDEX
| ON $testTable ( year PARTITION )
| """.stripMargin)
}
sql(s"""
| CREATE SKIPPING INDEX
| IF NOT EXISTS
| ON $testTable ( year PARTITION )
| """.stripMargin)
}

test("describe skipping index") {
flint
.skippingIndex()
@@ -0,0 +1,56 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import org.apache.spark.sql.catalyst.expressions.{Alias, ExprId}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}

/**
* General utility functions for PPL-to-Spark transformation tests.
*/
trait LogicalPlanTestUtils {

/**
* Utility function to compare two logical plans while ignoring the auto-generated ExprId
* associated with the Alias used for projection or aggregation.
* @param plan
* @return
*/
def compareByString(plan: LogicalPlan): String = {
// Create a rule to replace Alias's ExprId with a dummy id
val rule: PartialFunction[LogicalPlan, LogicalPlan] = {
case p: Project =>
val newProjections = p.projectList.map {
case alias: Alias =>
Alias(alias.child, alias.name)(exprId = ExprId(0), qualifier = alias.qualifier)
case other => other
}
p.copy(projectList = newProjections)

case agg: Aggregate =>
val newGrouping = agg.groupingExpressions.map {
case alias: Alias =>
Alias(alias.child, alias.name)(exprId = ExprId(0), qualifier = alias.qualifier)
case other => other
}
val newAggregations = agg.aggregateExpressions.map {
case alias: Alias =>
Alias(alias.child, alias.name)(exprId = ExprId(0), qualifier = alias.qualifier)
case other => other
}
agg.copy(groupingExpressions = newGrouping, aggregateExpressions = newAggregations)

case other => other
}

// Apply the rule using transform
val transformedPlan = plan.transform(rule)

// Return the string representation of the transformed plan
transformedPlan.toString
}

}
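
A hedged usage sketch for `compareByString` (not part of this change): two structurally identical projections whose aliases carry different auto-generated `ExprId`s produce different raw plan strings but identical normalized strings. It assumes the trait above is available on the test classpath under `org.opensearch.flint.spark`.

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.types.IntegerType
import org.opensearch.flint.spark.LogicalPlanTestUtils

object CompareByStringExample extends LogicalPlanTestUtils {
  def main(args: Array[String]): Unit = {
    val a = AttributeReference("a", IntegerType)()
    val relation = LocalRelation(a)

    // Each Alias gets a fresh ExprId, so the raw string forms differ...
    val plan1 = Project(Seq(Alias(a, "x")()), relation)
    val plan2 = Project(Seq(Alias(a, "x")()), relation)
    assert(plan1.toString != plan2.toString)

    // ...but the ExprId-normalized forms produced by compareByString match.
    assert(compareByString(plan1) == compareByString(plan2))
  }
}
```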