Update PPL documentation and the PPL integration test suite trait to include both Flint & PPL extensions, verifying they can interact independently

Signed-off-by: YANGDB <[email protected]>
YANG-DB committed Oct 3, 2023
1 parent 60a84ac commit f124d60
Showing 11 changed files with 86 additions and 53 deletions.
5 changes: 5 additions & 0 deletions README.md
@@ -35,6 +35,11 @@ To use PPL to Spark translation, you can run Spark with PPL extension:
spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions"
```

### Running With Both Extensions
```
spark-sql --conf "spark.sql.extensions='org.opensearch.flint.FlintPPLSparkExtensions, org.opensearch.flint.FlintSparkExtensions'"
```

## Build

To build and run this application with Spark, you can run:
19 changes: 9 additions & 10 deletions build.sbt
@@ -61,16 +61,13 @@ lazy val flintCore = (project in file("flint-core"))
exclude ("com.fasterxml.jackson.core", "jackson-databind")),
publish / skip := true)

lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
.dependsOn(flintCore)
lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
.enablePlugins(AssemblyPlugin, Antlr4Plugin)
.settings(
commonSettings,
name := "flint-spark-integration",
name := "ppl-spark-integration",
scalaVersion := scala212,
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
@@ -80,7 +77,7 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
libraryDependencies ++= deps(sparkVersion),
// ANTLR settings
Antlr4 / antlr4Version := "4.8",
Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.sql"),
Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.ppl"),
Antlr4 / antlr4GenListener := true,
Antlr4 / antlr4GenVisitor := true,
// Assembly settings
@@ -100,14 +97,16 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
},
assembly / test := (Test / test).value)

lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
.dependsOn(flintCore)
.enablePlugins(AssemblyPlugin, Antlr4Plugin)
.dependsOn(flintSparkIntegration)
.settings(
commonSettings,
name := "ppl-spark-integration",
name := "flint-spark-integration",
scalaVersion := scala212,
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
@@ -117,7 +116,7 @@ lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
libraryDependencies ++= deps(sparkVersion),
// ANTLR settings
Antlr4 / antlr4Version := "4.8",
Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.ppl"),
Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.sql"),
Antlr4 / antlr4GenListener := true,
Antlr4 / antlr4GenVisitor := true,
// Assembly settings
59 changes: 59 additions & 0 deletions docs/PPL-on-Spark.md
@@ -0,0 +1,59 @@
# Running PPL On Spark Reference Manual

## Overview

This module provides support for running [PPL](https://github.com/opensearch-project/piped-processing-language) queries on Spark using direct translation
from PPL's logical plan to Spark's Catalyst logical plan.

### What is PPL?
OpenSearch PPL, or Piped Processing Language, is a query language used with the OpenSearch platform and now with Apache Spark.
PPL allows users to retrieve, query, and analyze data by using commands that are piped together, making it easier to understand and compose complex queries.
Its syntax is inspired by Unix pipes, which enables chaining of commands to transform and process data.
With PPL, users can filter, aggregate, and visualize data across multiple datasources in a more intuitive manner than with traditional query languages.

### Context

The following goals motivated the introduction of this functionality:
- Transforming PPL to become OpenSearch's default query language (specifically for logs/traces/metrics signals)
- Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language.
- Seamlessly interacting with different datasources such as S3 / Prometheus / data lakes, leveraging Spark execution.
- Using Spark's federation capabilities as a general-purpose query engine to facilitate complex queries, including joins.
- Improving and promoting PPL to become an extensible, general-purpose query language adopted by the community.


### Running PPL Commands

To run PPL commands, perform the following tasks:

#### PPL Build & Run

To build and run PPL on Spark, run:

```
sbt clean sparkPPLCosmetic/publishM2
```
Then add `org.opensearch:opensearch-spark-ppl_2.12` when running a Spark application, for example:
```
bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.1.0-SNAPSHOT"
```

### PPL Extension Usage

To use the PPL to Spark translation, run Spark with the PPL extension:

```
spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions"
```
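
For a programmatic equivalent, the extension can also be enabled when building the session. The sketch below is illustrative only: the table name `http_logs` and the query are hypothetical, and it assumes the PPL extension jar is already on the classpath so that `spark.sql` can accept piped queries.

```
import org.apache.spark.sql.SparkSession

// Minimal sketch (assumptions noted above): enable the PPL parser extension
// and submit a piped query through the regular spark.sql entry point.
val spark = SparkSession.builder()
  .appName("ppl-example")
  .config("spark.sql.extensions", "org.opensearch.flint.FlintPPLSparkExtensions")
  .getOrCreate()

// Hypothetical table and query, shown only to illustrate PPL's piped syntax.
spark.sql("source = http_logs | where status = 500 | stats count() by clientip").show()
```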

### Running With Both Flint & PPL Extensions
To make use of both the Flint and PPL extensions, add both jars (`org.opensearch:opensearch-spark-ppl_2.12:0.1.0-SNAPSHOT`, `org.opensearch:opensearch-spark_2.12:0.1.0-SNAPSHOT`) to the cluster's classpath.

Next, configure both extensions:
```
spark-sql --conf "spark.sql.extensions='org.opensearch.flint.FlintPPLSparkExtensions, org.opensearch.flint.FlintSparkExtensions'"
```

Once this is done, Spark will allow both extensions to parse the query (SQL / PPL) and execute it correctly.
In addition, PPL queries will benefit from the acceleration capabilities provided by the Flint plugins, as described [here](index.md).
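
For completeness, here is a programmatic sketch of the same dual-extension setup; the extension class names come from this repository, while the table and queries are hypothetical.

```
import org.apache.spark.sql.SparkSession

// Sketch: register both extensions so a single session can parse SQL and PPL.
val spark = SparkSession.builder()
  .appName("flint-and-ppl")
  .config(
    "spark.sql.extensions",
    "org.opensearch.flint.FlintPPLSparkExtensions,org.opensearch.flint.FlintSparkExtensions")
  .getOrCreate()

// Plain Spark SQL and a PPL pipeline against the same (hypothetical) table.
spark.sql("SELECT status, count(*) FROM http_logs GROUP BY status").show()
spark.sql("source = http_logs | stats count() by status").show()
```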

@@ -3,7 +3,9 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark
package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.{FlintPPLSparkExtensions, FlintSparkExtensions}

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
@@ -13,7 +15,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

trait FlintPPLSuite extends SharedSparkSession {
override protected def sparkConf = {
override protected def sparkConf: SparkConf = {
val conf = new SparkConf()
.set("spark.ui.enabled", "false")
.set(SQLConf.CODEGEN_FALLBACK.key, "false")
@@ -22,17 +24,12 @@ trait FlintPPLSuite extends SharedSparkSession {
// LocalRelation will exercise the optimization rules better by disabling it as
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set("spark.sql.extensions", classOf[FlintPPLSparkExtensions].getName)
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
.set(
"spark.sql.extensions",
List(classOf[FlintPPLSparkExtensions].getName, classOf[FlintSparkExtensions].getName)
.mkString(", "))
.set(OPTIMIZER_RULE_ENABLED.key, "false")
conf
}

private def withFlintOptimizerDisabled(block: => Unit): Unit = {
spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false")
try {
block
} finally {
spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "true")
}
}

}
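
For context, a minimal sketch of an integration suite built on the updated trait might look as follows; the suite name, table, data, and assertion are hypothetical and only illustrate how both extensions share one test session.

```
package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.QueryTest

// Hypothetical example suite: FlintPPLSuite now wires both extensions into
// sparkConf, so plain SQL statements and PPL pipelines run against one session.
class ExamplePPLBothExtensionsITSuite extends QueryTest with FlintPPLSuite {

  test("ppl and sql share one session") {
    sql("CREATE TABLE example_tbl (name STRING, age INT) USING CSV")
    sql("INSERT INTO example_tbl VALUES ('Jake', 70), ('Hello', 30)")

    val frame = spark.sql("source = example_tbl | where age > 50 | fields name")
    assert(frame.collect().map(_.getString(0)).toSet == Set("Jake"))
  }
}
```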
@@ -5,13 +5,10 @@

package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.FlintPPLSuite

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Descending, Divide, Floor, Literal, Multiply, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLAggregationWithSpanITSuite
@@ -25,7 +22,6 @@ class FlintSparkPPLAggregationWithSpanITSuite

override def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false")

// Create test table
// Update table creation
@@ -5,13 +5,10 @@

package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.FlintPPLSuite

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, EqualTo, LessThan, Literal, Not, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLAggregationsITSuite
@@ -25,7 +22,6 @@ class FlintSparkPPLAggregationsITSuite

override def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false")

// Create test table
// Update table creation
@@ -5,13 +5,10 @@

package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.FlintPPLSuite

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Descending, Divide, EqualTo, Floor, GreaterThan, LessThanOrEqual, Literal, Multiply, Not, Or, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLFiltersITSuite
@@ -25,8 +22,6 @@ class FlintSparkPPLFiltersITSuite

override def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false")

// Create test table
// Update table creation
sql(s"""
@@ -5,13 +5,10 @@

package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.FlintPPLSuite

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLITSuite
@@ -25,8 +22,6 @@ class FlintSparkPPLITSuite

override def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false")

// Create test table
// Update table creation
sql(s"""
@@ -7,13 +7,10 @@ package org.opensearch.flint.spark.ppl

import java.sql.Timestamp

import org.opensearch.flint.spark.FlintPPLSuite

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Divide, Floor, GenericRowWithSchema, Literal, Multiply, SortOrder, TimeWindow}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLTimeWindowITSuite
@@ -27,9 +24,6 @@ class FlintSparkPPLTimeWindowITSuite

override def beforeAll(): Unit = {
super.beforeAll()
// disable optimization rule
spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false")

// Create test table
// Update table creation
sql(s"""
4 changes: 3 additions & 1 deletion ppl-spark-integration/README.md
@@ -7,9 +7,11 @@ translation between PPL's logical plan to Spark's Catalyst logical plan.
The next concepts are the main purpose of introduction this functionality:
- Transforming PPL to become OpenSearch default query language (specifically for logs/traces/metrics signals)
- Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language.
- Seamlessly Interact with different datasources (S3 / Prometheus / data-lake) from within OpenSearch
- Seamlessly Interact with different datasources such as S3 / Prometheus / data-lake leveraging spark execution.
- Using spark's federative capabilities as a general purpose query engine to facilitate complex queries including joins
- Improve and promote PPL to become extensible and general purpose query language to be adopted by the community


Acknowledging spark is an excellent conduit for promoting these goals and showcasing the capabilities of PPL to interact & federate data across multiple sources and domains.

Another byproduct of introducing PPL on spark would be the much anticipated JOIN capability that will emerge from the usage of Spark compute engine.
@@ -5,7 +5,6 @@

package org.opensearch.flint.spark

import org.opensearch.flint.spark.function.TumbleFunction
import org.opensearch.flint.spark.ppl.FlintSparkPPLParser

import org.apache.spark.sql.SparkSessionExtensions
@@ -19,9 +18,5 @@ class FlintPPLSparkExtensions extends (SparkSessionExtensions => Unit) {
extensions.injectParser { (spark, parser) =>
new FlintSparkPPLParser(parser)
}

extensions.injectOptimizerRule { spark =>
new FlintSparkOptimizer(spark)
}
}
}
