diff --git a/README.md b/README.md
index 2f957a40a..973e0d0aa 100644
--- a/README.md
+++ b/README.md
@@ -35,6 +35,11 @@ To use PPL to Spark translation, you can run Spark with PPL extension:
 spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions"
 ```
 
+### Running With Both Extensions
+```
+spark-sql --conf "spark.sql.extensions='org.opensearch.flint.FlintPPLSparkExtensions, org.opensearch.flint.FlintSparkExtensions'"
+```
+
 ## Build
 
 To build and run this application with Spark, you can run:
diff --git a/build.sbt b/build.sbt
index ad1f5de2f..6b7c8d53a 100644
--- a/build.sbt
+++ b/build.sbt
@@ -61,16 +61,13 @@ lazy val flintCore = (project in file("flint-core"))
         exclude ("com.fasterxml.jackson.core", "jackson-databind")),
     publish / skip := true)
 
-lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
-  .dependsOn(flintCore)
+lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
   .enablePlugins(AssemblyPlugin, Antlr4Plugin)
   .settings(
     commonSettings,
-    name := "flint-spark-integration",
+    name := "ppl-spark-integration",
     scalaVersion := scala212,
     libraryDependencies ++= Seq(
-      "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
-        exclude ("com.fasterxml.jackson.core", "jackson-databind"),
       "org.scalactic" %% "scalactic" % "3.2.15" % "test",
       "org.scalatest" %% "scalatest" % "3.2.15" % "test",
       "org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
@@ -80,7 +77,7 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
     libraryDependencies ++= deps(sparkVersion),
     // ANTLR settings
     Antlr4 / antlr4Version := "4.8",
-    Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.sql"),
+    Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.ppl"),
     Antlr4 / antlr4GenListener := true,
     Antlr4 / antlr4GenVisitor := true,
     // Assembly settings
@@ -100,14 +97,16 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
     },
     assembly / test := (Test / test).value)
 
-lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
+lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
+  .dependsOn(flintCore)
   .enablePlugins(AssemblyPlugin, Antlr4Plugin)
-  .dependsOn(flintSparkIntegration)
   .settings(
     commonSettings,
-    name := "ppl-spark-integration",
+    name := "flint-spark-integration",
     scalaVersion := scala212,
     libraryDependencies ++= Seq(
+      "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
+        exclude ("com.fasterxml.jackson.core", "jackson-databind"),
       "org.scalactic" %% "scalactic" % "3.2.15" % "test",
       "org.scalatest" %% "scalatest" % "3.2.15" % "test",
       "org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
@@ -117,7 +116,7 @@ lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
     libraryDependencies ++= deps(sparkVersion),
     // ANTLR settings
     Antlr4 / antlr4Version := "4.8",
-    Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.ppl"),
+    Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.sql"),
    Antlr4 / antlr4GenListener := true,
     Antlr4 / antlr4GenVisitor := true,
     // Assembly settings
diff --git a/docs/PPL-on-Spark.md b/docs/PPL-on-Spark.md
new file mode 100644
index 000000000..d94ee8037
--- /dev/null
+++ b/docs/PPL-on-Spark.md
@@ -0,0 +1,59 @@
+# Running PPL On Spark Reference Manual
+
+## Overview
+
+This module provides support for running [PPL](https://github.com/opensearch-project/piped-processing-language) queries on Spark through direct
+translation of PPL's logical plan into Spark's Catalyst logical plan.
+
+### What is PPL ?
+OpenSearch PPL, or Piped Processing Language, is a query language used with the OpenSearch platform and now with Apache Spark.
+PPL allows users to retrieve, query, and analyze data by using commands that are piped together, making it easier to understand and compose complex queries.
+Its syntax is inspired by Unix pipes, which enables chaining of commands to transform and process data.
+With PPL, users can filter, aggregate, and visualize data across multiple datasources in a more intuitive manner than with traditional query languages.
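+
+For example, the following query (the table and field names here are purely illustrative) pipes a source through a filter and a projection:
+```
+source = accounts | where age > 30 | fields firstname, age
+```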
+
+### Context
+
+The following concepts are the main motivation for introducing this functionality:
+- Transforming PPL into OpenSearch's default query language (specifically for logs/traces/metrics signals)
+- Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language
+- Seamlessly interacting with different datasources such as S3 / Prometheus / data lakes, leveraging Spark execution
+- Using Spark's federation capabilities as a general-purpose query engine to facilitate complex queries, including joins
+- Improving and promoting PPL as an extensible, general-purpose query language to be adopted by the community
+
+### Running PPL Commands
+
+To run PPL commands, you will need to perform the following tasks:
+
+#### PPL Build & Run
+
+To build and run PPL with Spark, you can run:
+
+```
+sbt clean sparkPPLCosmetic/publishM2
+```
+Then add `org.opensearch:opensearch-spark-ppl_2.12` when running your Spark application, for example:
+```
+bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.1.0-SNAPSHOT"
+```
+
+### PPL Extension Usage
+
+To use the PPL-to-Spark translation, run Spark with the PPL extension:
+
+```
+spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions"
+```
+
+### Running With Both Flint & PPL Extensions
+To use both the Flint and PPL extensions, add both jars (`org.opensearch:opensearch-spark-ppl_2.12:0.1.0-SNAPSHOT`, `org.opensearch:opensearch-spark_2.12:0.1.0-SNAPSHOT`) to the cluster's
+classpath.
+
+Next, configure both extensions:
+```
+spark-sql --conf "spark.sql.extensions='org.opensearch.flint.FlintPPLSparkExtensions, org.opensearch.flint.FlintSparkExtensions'"
+```
+
+Once this is done, Spark will allow both extensions to parse the query (SQL or PPL) and ensure it is executed correctly.
+In addition, PPL queries will benefit from the acceleration capabilities provided by the Flint plugins, as described [here](index.md).
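+
+For completeness, the same configuration can also be applied programmatically when constructing the `SparkSession`. This is a minimal sketch (the application name is illustrative) that mirrors the `--conf` flag shown above:
+```scala
+import org.apache.spark.sql.SparkSession
+
+// Register both extension parsers via the standard Spark conf key.
+val spark = SparkSession
+  .builder()
+  .appName("ppl-and-flint-example")
+  .config(
+    "spark.sql.extensions",
+    "org.opensearch.flint.FlintPPLSparkExtensions,org.opensearch.flint.FlintSparkExtensions")
+  .getOrCreate()
+```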
diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintPPLSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintPPLSuite.scala
similarity index 68%
rename from ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintPPLSuite.scala
rename to integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintPPLSuite.scala
index 81fdc90c9..be43447fe 100644
--- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintPPLSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintPPLSuite.scala
@@ -3,7 +3,9 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
-package org.opensearch.flint.spark
+package org.opensearch.flint.spark.ppl
+
+import org.opensearch.flint.spark.{FlintPPLSparkExtensions, FlintSparkExtensions}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
@@ -13,7 +15,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 
 trait FlintPPLSuite extends SharedSparkSession {
-  override protected def sparkConf = {
+  override protected def sparkConf: SparkConf = {
     val conf = new SparkConf()
       .set("spark.ui.enabled", "false")
       .set(SQLConf.CODEGEN_FALLBACK.key, "false")
@@ -22,17 +24,12 @@ trait FlintPPLSuite extends SharedSparkSession {
       // LocalRelation will exercise the optimization rules better by disabling it as
       // this rule may potentially block testing of other optimization rules such as
       // ConstantPropagation etc.
-      .set("spark.sql.extensions", classOf[FlintPPLSparkExtensions].getName)
+      .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
+      .set(
+        "spark.sql.extensions",
+        List(classOf[FlintPPLSparkExtensions].getName, classOf[FlintSparkExtensions].getName)
+          .mkString(", "))
+      .set(OPTIMIZER_RULE_ENABLED.key, "false")
     conf
   }
-
-  private def withFlintOptimizerDisabled(block: => Unit): Unit = {
-    spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false")
-    try {
-      block
-    } finally {
-      spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "true")
-    }
-  }
-
 }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala
index add5ae47d..8dfde6c94 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala
@@ -5,13 +5,10 @@
 
 package org.opensearch.flint.spark.ppl
 
-import org.opensearch.flint.spark.FlintPPLSuite
-
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Descending, Divide, Floor, Literal, Multiply, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED
 import org.apache.spark.sql.streaming.StreamTest
 
 class FlintSparkPPLAggregationWithSpanITSuite
@@ -25,7 +22,6 @@ class FlintSparkPPLAggregationWithSpanITSuite
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false")
 
     // Create test table
     // Update table creation
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala
index 3c4caf83b..e8533d831 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala
@@ -5,13 +5,10 @@
 
 package org.opensearch.flint.spark.ppl
 
-import org.opensearch.flint.spark.FlintPPLSuite
-
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, EqualTo, LessThan, Literal, Not, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED
 import org.apache.spark.sql.streaming.StreamTest
 
 class FlintSparkPPLAggregationsITSuite
@@ -25,7 +22,6 @@ class FlintSparkPPLAggregationsITSuite
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false")
 
     // Create test table
     // Update table creation
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala
index fe75ff49b..fb46ce4de 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala
@@ -5,13 +5,10 @@
 
 package org.opensearch.flint.spark.ppl
 
-import org.opensearch.flint.spark.FlintPPLSuite
-
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Descending, Divide, EqualTo, Floor, GreaterThan, LessThanOrEqual, Literal, Multiply, Not, Or, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED
 import org.apache.spark.sql.streaming.StreamTest
 
 class FlintSparkPPLFiltersITSuite
@@ -25,8 +22,6 @@ class FlintSparkPPLFiltersITSuite
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false")
-
     // Create test table
     // Update table creation
     sql(s"""
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala
index dd623fe70..9dea04872 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala
@@ -5,13 +5,10 @@
 
 package org.opensearch.flint.spark.ppl
 
-import org.opensearch.flint.spark.FlintPPLSuite
-
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED
 import org.apache.spark.sql.streaming.StreamTest
 
 class FlintSparkPPLITSuite
@@ -25,8 +22,6 @@ class FlintSparkPPLITSuite
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false")
-
     // Create test table
     // Update table creation
     sql(s"""
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala
index e234806d8..df77e0d90 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala
@@ -7,13 +7,10 @@ package org.opensearch.flint.spark.ppl
 
 import java.sql.Timestamp
 
-import org.opensearch.flint.spark.FlintPPLSuite
-
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Divide, Floor, GenericRowWithSchema, Literal, Multiply, SortOrder, TimeWindow}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED
 import org.apache.spark.sql.streaming.StreamTest
 
 class FlintSparkPPLTimeWindowITSuite
@@ -27,9 +24,6 @@ class FlintSparkPPLTimeWindowITSuite
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    // disable optimization rule
-    spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false")
-
     // Create test table
     // Update table creation
     sql(s"""
diff --git a/ppl-spark-integration/README.md b/ppl-spark-integration/README.md
index a824cfaaf..7e44b39e4 100644
--- a/ppl-spark-integration/README.md
+++ b/ppl-spark-integration/README.md
@@ -7,9 +7,11 @@ translation between PPL's logical plan to Spark's Catalyst logical plan.
 The next concepts are the main purpose of introduction this functionality:
 - Transforming PPL to become OpenSearch default query language (specifically for logs/traces/metrics signals)
 - Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language.
-- Seamlessly Interact with different datasources (S3 / Prometheus / data-lake) from within OpenSearch
+- Seamlessly interacting with different datasources such as S3 / Prometheus / data lakes, leveraging Spark execution.
+- Using Spark's federation capabilities as a general-purpose query engine to facilitate complex queries, including joins
 - Improve and promote PPL to become extensible and general purpose query language to be adopted by the community
+
 Acknowledging spark is an excellent conduit for promoting these goals and showcasing the capabilities of PPL to interact & federate data across multiple sources and domains.
 Another byproduct of introducing PPL on spark would be the much anticipated JOIN capability that will emerge from the usage of Spark compute engine.
diff --git a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala
index ba96ec1ed..26ad4b69b 100644
--- a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala
+++ b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala
@@ -5,7 +5,6 @@
 
 package org.opensearch.flint.spark
 
-import org.opensearch.flint.spark.function.TumbleFunction
 import org.opensearch.flint.spark.ppl.FlintSparkPPLParser
 
 import org.apache.spark.sql.SparkSessionExtensions
@@ -19,9 +18,5 @@ class FlintPPLSparkExtensions extends (SparkSessionExtensions => Unit) {
     extensions.injectParser { (spark, parser) =>
       new FlintSparkPPLParser(parser)
     }
-
-    extensions.injectOptimizerRule { spark =>
-      new FlintSparkOptimizer(spark)
-    }
   }
 }