PPL to Spark translation (#33)

* adding support for containerized flint with spark / Livy docker-compose.yml

Signed-off-by: YANGDB <[email protected]>

* adding support for containerized flint with spark / Livy docker-compose.yml

Signed-off-by: YANGDB <[email protected]>

* adding support for containerized flint with spark / Livy docker-compose.yml

Signed-off-by: YANGDB <[email protected]>

* adding support for containerized flint with spark / Livy docker-compose.yml

Signed-off-by: YANGDB <[email protected]>

* update ppl ast builder

Signed-off-by: YANGDB <[email protected]>

* add ppl ast components
add ppl statement logical plan elements
add ppl parser components
add ppl expressions components

Signed-off-by: YANGDB <[email protected]>

* populate ppl test suite covering different types of PPL queries

Signed-off-by: YANGDB <[email protected]>

* update additional tests

Signed-off-by: YANGDB <[email protected]>

* separate ppl-spark code into a dedicated module

Signed-off-by: YANGDB <[email protected]>

* add ppl translation of simple filter and data-type literal expressions (see the illustrative sketch after this commit list)

Signed-off-by: YANGDB <[email protected]>

* remove unused ppl ast builder

Signed-off-by: YANGDB <[email protected]>

* add log-plan test results validation

Signed-off-by: YANGDB <[email protected]>

* add support for multiple table selection using union

Signed-off-by: YANGDB <[email protected]>

* add support for multiple table selection using union

Signed-off-by: YANGDB <[email protected]>

* update sbt with new IT test suite for PPL module

Signed-off-by: YANGDB <[email protected]>

* update ppl IT suite test

Signed-off-by: YANGDB <[email protected]>

* update ppl IT suite dependencies

Signed-off-by: YANGDB <[email protected]>

* add tests for ppl IT with
 -  source = $testTable
 -  source = $testTable | fields name, age
 -  source = $testTable age=25 | fields name, age

Signed-off-by: YANGDB <[email protected]>

* update literal transformations according to catalyst's convention

Signed-off-by: YANGDB <[email protected]>

* separate unit tests into a dedicated file per test category

Signed-off-by: YANGDB <[email protected]>

* add IT tests for additional filters

Signed-off-by: YANGDB <[email protected]>

* mark unsatisfied tests as ignored until supporting code is ready

Signed-off-by: YANGDB <[email protected]>

* add README.md design and implementation details
add AggregateFunction translation & tests
remove unused DSL builder

Signed-off-by: YANGDB <[email protected]>

* remove docker related files

Signed-off-by: YANGDB <[email protected]>

* fix text-related unwrapping bug
add actual ppl-based table content fetch and verification

Signed-off-by: YANGDB <[email protected]>

* add AggregatorTranslator support

Signed-off-by: YANGDB <[email protected]>

* resolve group by issues

Signed-off-by: YANGDB <[email protected]>

* add generic ppl extension chain which registers a chain of parsers

Signed-off-by: YANGDB <[email protected]>

* update some tests

Signed-off-by: YANGDB <[email protected]>

* add filter test with stats

Signed-off-by: YANGDB <[email protected]>

* add support for AND / OR

Signed-off-by: YANGDB <[email protected]>

* add additional unit tests support for AND / OR

Signed-off-by: YANGDB <[email protected]>

* add Max,Min,Count,Sum aggregation functions support

Signed-off-by: YANGDB <[email protected]>

* add basic span support for aggregate based queries

Signed-off-by: YANGDB <[email protected]>

* update supported PPL commands and roadmap for future ppl command support

Signed-off-by: YANGDB <[email protected]>

* update readme doc

Signed-off-by: YANGDB <[email protected]>

* add `head` support
add README.md details for supported commands and planned future support

Signed-off-by: YANGDB <[email protected]>

* add support for sort command
add missing license header
update supported command in readme

Signed-off-by: YANGDB <[email protected]>

* update supported command in readme

Signed-off-by: YANGDB <[email protected]>

* update according to PR comments & review

Signed-off-by: YANGDB <[email protected]>

* update span & alias group by tests and composition

Signed-off-by: YANGDB <[email protected]>

* update scalastyle

Signed-off-by: YANGDB <[email protected]>

* update scalastyle

Signed-off-by: YANGDB <[email protected]>

* update scalastyle

Signed-off-by: YANGDB <[email protected]>

* update scalastyle

Signed-off-by: YANGDB <[email protected]>

* continue update according to PR comments

Signed-off-by: YANGDB <[email protected]>

* continue update according to PR comments

Signed-off-by: YANGDB <[email protected]>

* continue update according to PR comments

Signed-off-by: YANGDB <[email protected]>

* adding window function support for time based spans

Signed-off-by: YANGDB <[email protected]>

* adding window function test
updating the PPL to Spark README.md

Signed-off-by: YANGDB <[email protected]>

* scalastyle updates

Signed-off-by: YANGDB <[email protected]>

* update sbt build and README.md

Signed-off-by: YANGDB <[email protected]>

* update ppl CatalystPlan visitor to produce the logical plan as part of the visitor instead of String

Signed-off-by: YANGDB <[email protected]>

* update ppl tests & IT tests

Signed-off-by: YANGDB <[email protected]>

* update scala style

Signed-off-by: YANGDB <[email protected]>

* update scala style

Signed-off-by: YANGDB <[email protected]>

* minor refactoring & package movement

Signed-off-by: YANGDB <[email protected]>

* additional refactoring: update the limit / sort visitor functions

Signed-off-by: YANGDB <[email protected]>

* update scala style formatting

Signed-off-by: YANGDB <[email protected]>

---------

Signed-off-by: YANGDB <[email protected]>
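For orientation, the sketch below illustrates the kind of mapping described in the commits above: a PPL pipeline is translated into a Spark Catalyst logical plan. It is a hand-built approximation for a hypothetical table `t` with `name` and `age` columns — the plan shape is what such a query is expected to produce, not output captured from the translator.

```
// Illustrative sketch only (assumptions: table "t" with columns "name" and "age").
// Hand-builds the Catalyst logical plan expected for the PPL query:
//   source = t age=25 | fields name, age
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

val relation = UnresolvedRelation(Seq("t"))                      // source = t
val filtered = Filter(                                           // age=25 filter
  EqualTo(UnresolvedAttribute("age"), Literal(25)),
  relation)
val expectedPlan: LogicalPlan = Project(                         // fields name, age
  Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")),
  filtered)
```

A `stats` command would map in the same spirit onto an `Aggregate` node, and `head` onto a `Limit`, per the commits above.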
YANG-DB authored Sep 27, 2023
1 parent d32879b commit 7e6a19f
Showing 93 changed files with 10,021 additions and 7 deletions.
2 changes: 1 addition & 1 deletion DEVELOPER_GUIDE.md
@@ -22,7 +22,7 @@ sbt scalafmtAll
```
The code style is automatically checked, but users can also manually check it.
```
sbt sbt scalastyle
sbt scalastyle
```
For IntelliJ users, read more in [scalafmt IntelliJ](https://scalameta.org/scalafmt/docs/installation.html#intellij) to integrate
scalafmt with IntelliJ.
24 changes: 23 additions & 1 deletion README.md
@@ -4,10 +4,12 @@ OpenSearch Flint is ... It consists of the following modules:

- `flint-core`: a module that contains Flint specification and client.
- `flint-spark-integration`: a module that provides Spark integration for Flint and derived dataset based on it.
- `ppl-spark-integration`: a module that provides PPL query execution on top of Spark. See the [PPL repository](https://github.com/opensearch-project/piped-processing-language).

## Documentation

Please refer to the [Flint Index Reference Manual](./docs/index.md) for more information.
For the PPL language, see the [PPL Reference Manual](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/index.rst) for more information.

## Prerequisites

@@ -17,14 +19,22 @@ Version compatibility:
|---------------|-------------|---------------|---------------|------------|
| 0.1.0 | 11+ | 3.3.1 | 2.12.14 | 2.6+ |

## Usage
## Flint Extension Usage

To use this application, you can run Spark with the Flint extension:

```
spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintSparkExtensions"
```

## PPL Extension Usage

To use the PPL to Spark translation, you can run Spark with the PPL extension:

```
spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions"
```
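
Once the extension is registered, the assumption (a usage sketch, not verified output) is that PPL text can be submitted through the regular `spark.sql` entry point, for example from a `spark-shell` session started with the same `--conf`. The query below mirrors one of the IT test queries from this change; the table name and columns are placeholders:

```
// Assumed usage sketch: requires spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions
// and an existing table `my_table` with `name` and `age` columns (both placeholders).
val results = spark.sql("source = my_table age=25 | fields name, age")
results.show()
```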

## Build

To build and run this application with Spark, you can run:
@@ -37,6 +47,18 @@ then add org.opensearch:opensearch-spark_2.12 when running a Spark application, for example,
bin/spark-shell --packages "org.opensearch:opensearch-spark_2.12:0.1.0-SNAPSHOT"
```

### PPL Build & Run

To build and run the PPL extension with Spark, you can run:

```
sbt clean sparkPPLCosmetic/publishM2
```
then add org.opensearch:opensearch-spark-ppl_2.12 when running a Spark application, for example,
```
bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.1.0-SNAPSHOT"
```

## Code of Conduct

This project has adopted an [Open Source Code of Conduct](./CODE_OF_CONDUCT.md).
50 changes: 47 additions & 3 deletions build.sbt
@@ -43,7 +43,7 @@ lazy val commonSettings = Seq(
Test / test := ((Test / test) dependsOn testScalastyle).value)

lazy val root = (project in file("."))
.aggregate(flintCore, flintSparkIntegration, sparkSqlApplication)
.aggregate(flintCore, flintSparkIntegration, pplSparkIntegration, sparkSqlApplication)
.disablePlugins(AssemblyPlugin)
.settings(name := "flint", publish / skip := true)

@@ -61,6 +61,42 @@ lazy val flintCore = (project in file("flint-core"))
exclude ("com.fasterxml.jackson.core", "jackson-databind")),
publish / skip := true)

lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
.enablePlugins(AssemblyPlugin, Antlr4Plugin)
.settings(
commonSettings,
name := "ppl-spark-integration",
scalaVersion := scala212,
libraryDependencies ++= Seq(
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
"com.github.sbt" % "junit-interface" % "0.13.3" % "test"),
libraryDependencies ++= deps(sparkVersion),
// ANTLR settings
Antlr4 / antlr4Version := "4.8",
Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.ppl"),
Antlr4 / antlr4GenListener := true,
Antlr4 / antlr4GenVisitor := true,
// Assembly settings
assemblyPackageScala / assembleArtifact := false,
assembly / assemblyOption ~= {
_.withIncludeScala(false)
},
assembly / assemblyMergeStrategy := {
case PathList(ps @ _*) if ps.last endsWith ("module-info.class") =>
MergeStrategy.discard
case PathList("module-info.class") => MergeStrategy.discard
case PathList("META-INF", "versions", xs @ _, "module-info.class") =>
MergeStrategy.discard
case x =>
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
},
assembly / test := (Test / test).value)

lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
.dependsOn(flintCore)
.enablePlugins(AssemblyPlugin, Antlr4Plugin)
@@ -102,7 +138,7 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))

// Test assembly package with integration test.
lazy val integtest = (project in file("integ-test"))
.dependsOn(flintSparkIntegration % "test->test")
.dependsOn(flintSparkIntegration % "test->test", pplSparkIntegration % "test->test" )
.settings(
commonSettings,
name := "integ-test",
@@ -118,7 +154,7 @@ lazy val integtest = (project in file("integ-test"))
"org.opensearch.client" % "opensearch-java" % "2.6.0" % "test"
exclude ("com.fasterxml.jackson.core", "jackson-databind")),
libraryDependencies ++= deps(sparkVersion),
Test / fullClasspath += (flintSparkIntegration / assembly).value)
Test / fullClasspath ++= Seq((flintSparkIntegration / assembly).value, (pplSparkIntegration / assembly).value))

lazy val standaloneCosmetic = project
.settings(
@@ -144,6 +180,14 @@ lazy val sparkSqlApplicationCosmetic = project
exportJars := true,
Compile / packageBin := (sparkSqlApplication / assembly).value)

lazy val sparkPPLCosmetic = project
.settings(
name := "opensearch-spark-ppl",
commonSettings,
releaseSettings,
exportJars := true,
Compile / packageBin := (pplSparkIntegration / assembly).value)

lazy val releaseSettings = Seq(
publishMavenStyle := true,
publishArtifact := true,
@@ -0,0 +1,56 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import org.apache.spark.sql.catalyst.expressions.{Alias, ExprId}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}

/**
 * General utility functions for PPL to Spark transformation tests.
 */
trait LogicalPlanTestUtils {

  /**
   * Renders a logical plan as a comparable string while ignoring the auto-generated expression
   * IDs of the aliases used for projection or aggregation, so that two equivalent plans can be
   * compared by their string form.
   *
   * @param plan the logical plan to normalize
   * @return the string representation of the plan with alias ExprIds replaced by a dummy id
   */
  def compareByString(plan: LogicalPlan): String = {
    // Create a rule to replace Alias's ExprId with a dummy id
    val rule: PartialFunction[LogicalPlan, LogicalPlan] = {
      case p: Project =>
        val newProjections = p.projectList.map {
          case alias: Alias =>
            Alias(alias.child, alias.name)(exprId = ExprId(0), qualifier = alias.qualifier)
          case other => other
        }
        p.copy(projectList = newProjections)

      case agg: Aggregate =>
        val newGrouping = agg.groupingExpressions.map {
          case alias: Alias =>
            Alias(alias.child, alias.name)(exprId = ExprId(0), qualifier = alias.qualifier)
          case other => other
        }
        val newAggregations = agg.aggregateExpressions.map {
          case alias: Alias =>
            Alias(alias.child, alias.name)(exprId = ExprId(0), qualifier = alias.qualifier)
          case other => other
        }
        agg.copy(groupingExpressions = newGrouping, aggregateExpressions = newAggregations)

      case other => other
    }

    // Apply the rule using transform
    val transformedPlan = plan.transform(rule)

    // Return the string representation of the transformed plan
    transformedPlan.toString
  }

}
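
As a usage note for the helper above, the sketch below (assuming Spark on the classpath, the same package as the trait, and an arbitrary table name `t`) shows why the normalization matters: two structurally identical plans whose aliases carry different auto-generated ExprIds are unequal as objects, but compare equal once rendered through `compareByString`.

```
// Sketch: two equivalent Project plans built independently get distinct alias ExprIds,
// so they are not equal as objects, but compareByString normalizes the ids away.
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.Project

object LogicalPlanTestUtilsExample extends LogicalPlanTestUtils {
  def main(args: Array[String]): Unit = {
    val relation = UnresolvedRelation(Seq("t"))               // assumed table name
    def aliasedAge(): Alias = Alias(UnresolvedAttribute("age"), "age")()
    val planA = Project(Seq(aliasedAge()), relation)          // fresh ExprId
    val planB = Project(Seq(aliasedAge()), relation)          // different fresh ExprId
    assert(planA != planB)                                    // raw plans differ by ExprId only
    assert(compareByString(planA) == compareByString(planB))  // normalized strings match
  }
}
```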