Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ppl-spark-extension optimzer support & mvn publish #56

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/snapshot-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jobs:
- name: Publish to Local Maven
run: |
sbt standaloneCosmetic/publishM2
sbt sparkPPLCosmetic/publishM2
sbt sparkSqlApplicationCosmetic/publishM2

- uses: actions/checkout@v3
Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ To use PPL to Spark translation, you can run Spark with PPL extension:
spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions"
```

### Running With both Extension
```
spark-sql --conf "spark.sql.extensions='org.opensearch.flint.FlintPPLSparkExtensions, org.opensearch.flint.FlintSparkExtensions'"
```

## Build

To build and run this application with Spark, you can run:
Expand Down
59 changes: 59 additions & 0 deletions docs/PPL-on-Spark.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Running PPL On Spark Reference Manual

## Overview

This module provides the support for running [PPL](https://github.com/opensearch-project/piped-processing-language) queries on Spark using direct logical plan
translation between PPL's logical plan to Spark's Catalyst logical plan.

### What is PPL ?
OpenSearch PPL, or Pipe Processing Language, is a query language used with the OpenSearch platform and now Apache Spark.
PPL allows users to retrieve, query, and analyze data by using commands that are piped together, making it easier to understand and compose complex queries.
Its syntax is inspired by Unix pipes, which enables chaining of commands to transform and process data.
With PPL, users can filter, aggregate, and visualize data in multiple datasources in a more intuitive manner compared to traditional query languages

### Context

The next concepts are the main purpose of introduction this functionality:
- Transforming PPL to become OpenSearch default query language (specifically for logs/traces/metrics signals)
- Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language.
- Seamlessly Interact with different datasources such as S3 / Prometheus / data-lake leveraging spark execution.
- Using spark's federative capabilities as a general purpose query engine to facilitate complex queries including joins
- Improve and promote PPL to become extensible and general purpose query language to be adopted by the community


### Running PPL Commands:

In order to run PPL commands, you will need to perform the following tasks:

#### PPL Build & Run

To build and run this PPL in Spark, you can run:

```
sbt clean sparkPPLCosmetic/publishM2
```
then add org.opensearch:opensearch-spark_2.12 when run spark application, for example,
```
bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.1.0-SNAPSHOT"
```

### PPL Extension Usage

To use PPL to Spark translation, you can run Spark with PPL extension:

```
spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions"
```

### Running With both Flint & PPL Extensions
In order to make use of both flint and ppl extension, one can simply add both jars (`org.opensearch:opensearch-spark-ppl_2.12:0.1.0-SNAPSHOT`,`org.opensearch:opensearch-spark_2.12:0.1.0-SNAPSHOT`) to the cluster's
classpath.

Next need to configure both extensions :
```
spark-sql --conf "spark.sql.extensions='org.opensearch.flint.FlintPPLSparkExtensions, org.opensearch.flint.FlintSparkExtensions'"
```

Once this is done, spark will allow both extensions to parse the query (SQL / PPL) and allow the correct execution of the query.
In addition, PPL queries will enjoy the acceleration capabilities supported by the Flint plugins as described [here](index.md)

Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark
package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.{FlintPPLSparkExtensions, FlintSparkExtensions}

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

trait FlintPPLSuite extends SharedSparkSession {
override protected def sparkConf = {
override protected def sparkConf: SparkConf = {
val conf = new SparkConf()
.set("spark.ui.enabled", "false")
.set(SQLConf.CODEGEN_FALLBACK.key, "false")
Expand All @@ -21,7 +24,12 @@ trait FlintPPLSuite extends SharedSparkSession {
// LocalRelation will exercise the optimization rules better by disabling it as
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set("spark.sql.extensions", classOf[FlintPPLSparkExtensions].getName)
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
.set(
"spark.sql.extensions",
List(classOf[FlintPPLSparkExtensions].getName, classOf[FlintSparkExtensions].getName)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FlintSparkExtensions is accessible to ppl module?

Copy link
Member Author

@YANG-DB YANG-DB Oct 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used only in the integ-test project which is depended on both flint & ppl plugins
PPL module is not dependent on flint module

.mkString(", "))
.set(OPTIMIZER_RULE_ENABLED.key, "false")
conf
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.FlintPPLSuite

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Descending, Divide, Floor, Literal, Multiply, SortOrder}
Expand All @@ -20,7 +18,7 @@ class FlintSparkPPLAggregationWithSpanITSuite
with StreamTest {

/** Test table and index name */
private val testTable = "default.flint_ppl_test"
private val testTable = "spark_catalog.default.flint_ppl_test"

override def beforeAll(): Unit = {
super.beforeAll()
Expand Down Expand Up @@ -92,7 +90,7 @@ class FlintSparkPPLAggregationWithSpanITSuite
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val aggregateExpressions =
Alias(UnresolvedFunction(Seq("COUNT"), Seq(ageField), isDistinct = false), "count(age)")()
Expand Down Expand Up @@ -132,7 +130,7 @@ class FlintSparkPPLAggregationWithSpanITSuite
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val aggregateExpressions =
Alias(UnresolvedFunction(Seq("AVG"), Seq(ageField), isDistinct = false), "avg(age)")()
Expand Down Expand Up @@ -161,7 +159,7 @@ class FlintSparkPPLAggregationWithSpanITSuite
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val aggregateExpressions =
Alias(UnresolvedFunction(Seq("AVG"), Seq(ageField), isDistinct = false), "avg(age)")()
Expand Down Expand Up @@ -203,7 +201,7 @@ class FlintSparkPPLAggregationWithSpanITSuite
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val countryField = UnresolvedAttribute("country")
val countryAlias = Alias(countryField, "country")()

Expand Down Expand Up @@ -239,7 +237,7 @@ class FlintSparkPPLAggregationWithSpanITSuite
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val countryField = UnresolvedAttribute("country")
val countryAlias = Alias(countryField, "country")()

Expand Down Expand Up @@ -272,7 +270,7 @@ class FlintSparkPPLAggregationWithSpanITSuite
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val countryField = UnresolvedAttribute("country")
val countryAlias = Alias(countryField, "country")()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.FlintPPLSuite

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, EqualTo, LessThan, Literal, Not, SortOrder}
Expand All @@ -20,7 +18,7 @@ class FlintSparkPPLAggregationsITSuite
with StreamTest {

/** Test table and index name */
private val testTable = "default.flint_ppl_test"
private val testTable = "spark_catalog.default.flint_ppl_test"

override def beforeAll(): Unit = {
super.beforeAll()
Expand Down Expand Up @@ -86,7 +84,7 @@ class FlintSparkPPLAggregationsITSuite
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val aggregateExpressions =
Seq(Alias(UnresolvedFunction(Seq("AVG"), Seq(ageField), isDistinct = false), "avg(age)")())
val aggregatePlan = Aggregate(Seq(), aggregateExpressions, table)
Expand Down Expand Up @@ -116,7 +114,7 @@ class FlintSparkPPLAggregationsITSuite
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val filterExpr = LessThan(ageField, Literal(50))
val filterPlan = Filter(filterExpr, table)
val aggregateExpressions =
Expand Down Expand Up @@ -148,7 +146,7 @@ class FlintSparkPPLAggregationsITSuite
val star = Seq(UnresolvedStar(None))
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val aggregateExpressions =
Expand Down Expand Up @@ -177,7 +175,7 @@ class FlintSparkPPLAggregationsITSuite
// Define the expected logical plan
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val aggregateExpressions =
Expand Down Expand Up @@ -213,7 +211,7 @@ class FlintSparkPPLAggregationsITSuite
val star = Seq(UnresolvedStar(None))
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val aggregateExpressions =
Expand Down Expand Up @@ -248,7 +246,7 @@ class FlintSparkPPLAggregationsITSuite
val star = Seq(UnresolvedStar(None))
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val aggregateExpressions =
Expand Down Expand Up @@ -283,7 +281,7 @@ class FlintSparkPPLAggregationsITSuite
val star = Seq(UnresolvedStar(None))
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val aggregateExpressions =
Expand Down Expand Up @@ -317,7 +315,7 @@ class FlintSparkPPLAggregationsITSuite
val star = Seq(UnresolvedStar(None))
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val aggregateExpressions =
Expand Down Expand Up @@ -358,7 +356,7 @@ class FlintSparkPPLAggregationsITSuite
val star = Seq(UnresolvedStar(None))
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val aggregateExpressions =
Expand Down Expand Up @@ -396,7 +394,7 @@ class FlintSparkPPLAggregationsITSuite
val stateField = UnresolvedAttribute("state")
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val aggregateExpressions =
Expand Down
Loading
Loading