diff --git a/.github/workflows/snapshot-publish.yml b/.github/workflows/snapshot-publish.yml index 7dae546b3..6025a3b02 100644 --- a/.github/workflows/snapshot-publish.yml +++ b/.github/workflows/snapshot-publish.yml @@ -29,6 +29,7 @@ jobs: - name: Publish to Local Maven run: | sbt standaloneCosmetic/publishM2 + sbt sparkPPLCosmetic/publishM2 sbt sparkSqlApplicationCosmetic/publishM2 - uses: actions/checkout@v3 diff --git a/README.md b/README.md index 2f957a40a..973e0d0aa 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,11 @@ To use PPL to Spark translation, you can run Spark with PPL extension: spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions" ``` +### Running With Both Extensions +``` +spark-sql --conf "spark.sql.extensions='org.opensearch.flint.FlintPPLSparkExtensions, org.opensearch.flint.FlintSparkExtensions'" +``` + ## Build To build and run this application with Spark, you can run: diff --git a/docs/PPL-on-Spark.md b/docs/PPL-on-Spark.md new file mode 100644 index 000000000..d94ee8037 --- /dev/null +++ b/docs/PPL-on-Spark.md @@ -0,0 +1,59 @@ +# Running PPL On Spark Reference Manual + +## Overview + +This module provides the support for running [PPL](https://github.com/opensearch-project/piped-processing-language) queries on Spark using direct logical plan +translation between PPL's logical plan and Spark's Catalyst logical plan. + +### What is PPL? +OpenSearch PPL, or Pipe Processing Language, is a query language used with the OpenSearch platform and now Apache Spark. +PPL allows users to retrieve, query, and analyze data by using commands that are piped together, making it easier to understand and compose complex queries. +Its syntax is inspired by Unix pipes, which enables chaining of commands to transform and process data. 
+With PPL, users can filter, aggregate, and visualize data in multiple datasources in a more intuitive manner compared to traditional query languages. + +### Context + +The following concepts are the main purpose of introducing this functionality: +- Transforming PPL to become OpenSearch default query language (specifically for logs/traces/metrics signals) +- Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language. +- Seamlessly Interact with different datasources such as S3 / Prometheus / data-lake leveraging spark execution. +- Using spark's federative capabilities as a general purpose query engine to facilitate complex queries including joins +- Improve and promote PPL to become extensible and general purpose query language to be adopted by the community + + +### Running PPL Commands: + +In order to run PPL commands, you will need to perform the following tasks: + +#### PPL Build & Run + +To build and run this PPL in Spark, you can run: + +``` +sbt clean sparkPPLCosmetic/publishM2 +``` +then add org.opensearch:opensearch-spark_2.12 when running a Spark application, for example, +``` +bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.1.0-SNAPSHOT" +``` + +### PPL Extension Usage + +To use PPL to Spark translation, you can run Spark with PPL extension: + +``` +spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions" +``` + +### Running With Both Flint & PPL Extensions +In order to make use of both the Flint and PPL extensions, one can simply add both jars (`org.opensearch:opensearch-spark-ppl_2.12:0.1.0-SNAPSHOT`,`org.opensearch:opensearch-spark_2.12:0.1.0-SNAPSHOT`) to the cluster's +classpath. 
+ +Next, you need to configure both extensions: +``` +spark-sql --conf "spark.sql.extensions='org.opensearch.flint.FlintPPLSparkExtensions, org.opensearch.flint.FlintSparkExtensions'" +``` + +Once this is done, Spark will allow both extensions to parse the query (SQL / PPL) and allow the correct execution of the query. +In addition, PPL queries will enjoy the acceleration capabilities supported by the Flint plugins as described [here](index.md). + diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintPPLSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintPPLSuite.scala similarity index 63% rename from ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintPPLSuite.scala rename to integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintPPLSuite.scala index 450f21c63..be43447fe 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintPPLSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintPPLSuite.scala @@ -3,16 +3,19 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.spark +package org.opensearch.flint.spark.ppl + +import org.opensearch.flint.spark.{FlintPPLSparkExtensions, FlintSparkExtensions} import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation +import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession trait FlintPPLSuite extends SharedSparkSession { - override protected def sparkConf = { + override protected def sparkConf: SparkConf = { val conf = new SparkConf() .set("spark.ui.enabled", "false") .set(SQLConf.CODEGEN_FALLBACK.key, "false") @@ -21,7 +24,12 @@ trait FlintPPLSuite extends SharedSparkSession { // LocalRelation will exercise the optimization rules better by disabling it as 
// this rule may potentially block testing of other optimization rules such as // ConstantPropagation etc. - .set("spark.sql.extensions", classOf[FlintPPLSparkExtensions].getName) + .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) + .set( + "spark.sql.extensions", + List(classOf[FlintPPLSparkExtensions].getName, classOf[FlintSparkExtensions].getName) + .mkString(", ")) + .set(OPTIMIZER_RULE_ENABLED.key, "false") conf } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala index 70951ad27..8dfde6c94 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala @@ -5,8 +5,6 @@ package org.opensearch.flint.spark.ppl -import org.opensearch.flint.spark.FlintPPLSuite - import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, Descending, Divide, Floor, Literal, Multiply, SortOrder} @@ -20,7 +18,7 @@ class FlintSparkPPLAggregationWithSpanITSuite with StreamTest { /** Test table and index name */ - private val testTable = "default.flint_ppl_test" + private val testTable = "spark_catalog.default.flint_ppl_test" override def beforeAll(): Unit = { super.beforeAll() @@ -92,7 +90,7 @@ class FlintSparkPPLAggregationWithSpanITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val aggregateExpressions = Alias(UnresolvedFunction(Seq("COUNT"), 
Seq(ageField), isDistinct = false), "count(age)")() @@ -132,7 +130,7 @@ class FlintSparkPPLAggregationWithSpanITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val aggregateExpressions = Alias(UnresolvedFunction(Seq("AVG"), Seq(ageField), isDistinct = false), "avg(age)")() @@ -161,7 +159,7 @@ class FlintSparkPPLAggregationWithSpanITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val aggregateExpressions = Alias(UnresolvedFunction(Seq("AVG"), Seq(ageField), isDistinct = false), "avg(age)")() @@ -203,7 +201,7 @@ class FlintSparkPPLAggregationWithSpanITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val countryField = UnresolvedAttribute("country") val countryAlias = Alias(countryField, "country")() @@ -239,7 +237,7 @@ class FlintSparkPPLAggregationWithSpanITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val countryField = UnresolvedAttribute("country") val countryAlias = Alias(countryField, "country")() @@ -272,7 +270,7 @@ class FlintSparkPPLAggregationWithSpanITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val 
table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val countryField = UnresolvedAttribute("country") val countryAlias = Alias(countryField, "country")() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala index 0a58a039d..e8533d831 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala @@ -5,8 +5,6 @@ package org.opensearch.flint.spark.ppl -import org.opensearch.flint.spark.FlintPPLSuite - import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, EqualTo, LessThan, Literal, Not, SortOrder} @@ -20,7 +18,7 @@ class FlintSparkPPLAggregationsITSuite with StreamTest { /** Test table and index name */ - private val testTable = "default.flint_ppl_test" + private val testTable = "spark_catalog.default.flint_ppl_test" override def beforeAll(): Unit = { super.beforeAll() @@ -86,7 +84,7 @@ class FlintSparkPPLAggregationsITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val aggregateExpressions = Seq(Alias(UnresolvedFunction(Seq("AVG"), Seq(ageField), isDistinct = false), "avg(age)")()) val aggregatePlan = Aggregate(Seq(), aggregateExpressions, table) @@ -116,7 +114,7 @@ class FlintSparkPPLAggregationsITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val 
ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = LessThan(ageField, Literal(50)) val filterPlan = Filter(filterExpr, table) val aggregateExpressions = @@ -148,7 +146,7 @@ class FlintSparkPPLAggregationsITSuite val star = Seq(UnresolvedStar(None)) val countryField = UnresolvedAttribute("country") val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val groupByAttributes = Seq(Alias(countryField, "country")()) val aggregateExpressions = @@ -177,7 +175,7 @@ class FlintSparkPPLAggregationsITSuite // Define the expected logical plan val countryField = UnresolvedAttribute("country") val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val groupByAttributes = Seq(Alias(countryField, "country")()) val aggregateExpressions = @@ -213,7 +211,7 @@ class FlintSparkPPLAggregationsITSuite val star = Seq(UnresolvedStar(None)) val countryField = UnresolvedAttribute("country") val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val groupByAttributes = Seq(Alias(countryField, "country")()) val aggregateExpressions = @@ -248,7 +246,7 @@ class FlintSparkPPLAggregationsITSuite val star = Seq(UnresolvedStar(None)) val countryField = UnresolvedAttribute("country") val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val groupByAttributes = Seq(Alias(countryField, "country")()) val 
aggregateExpressions = @@ -283,7 +281,7 @@ class FlintSparkPPLAggregationsITSuite val star = Seq(UnresolvedStar(None)) val countryField = UnresolvedAttribute("country") val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val groupByAttributes = Seq(Alias(countryField, "country")()) val aggregateExpressions = @@ -317,7 +315,7 @@ class FlintSparkPPLAggregationsITSuite val star = Seq(UnresolvedStar(None)) val countryField = UnresolvedAttribute("country") val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val groupByAttributes = Seq(Alias(countryField, "country")()) val aggregateExpressions = @@ -358,7 +356,7 @@ class FlintSparkPPLAggregationsITSuite val star = Seq(UnresolvedStar(None)) val countryField = UnresolvedAttribute("country") val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val groupByAttributes = Seq(Alias(countryField, "country")()) val aggregateExpressions = @@ -396,7 +394,7 @@ class FlintSparkPPLAggregationsITSuite val stateField = UnresolvedAttribute("state") val countryField = UnresolvedAttribute("country") val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val groupByAttributes = Seq(Alias(countryField, "country")()) val aggregateExpressions = diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala index 0a4810b01..fb46ce4de 100644 --- 
a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala @@ -5,8 +5,6 @@ package org.opensearch.flint.spark.ppl -import org.opensearch.flint.spark.FlintPPLSuite - import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Descending, Divide, EqualTo, Floor, GreaterThan, LessThanOrEqual, Literal, Multiply, Not, Or, SortOrder} @@ -20,11 +18,10 @@ class FlintSparkPPLFiltersITSuite with StreamTest { /** Test table and index name */ - private val testTable = "default.flint_ppl_test" + private val testTable = "spark_catalog.default.flint_ppl_test" override def beforeAll(): Unit = { super.beforeAll() - // Create test table // Update table creation sql(s""" @@ -83,7 +80,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = EqualTo(UnresolvedAttribute("age"), Literal(25)) val filterPlan = Filter(filterExpr, table) val projectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")) @@ -110,7 +107,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = And( GreaterThan(UnresolvedAttribute("age"), Literal(10)), Not(EqualTo(UnresolvedAttribute("country"), Literal("USA")))) @@ -137,7 +134,7 @@ class 
FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = And( GreaterThan(UnresolvedAttribute("age"), Literal(10)), Not(EqualTo(UnresolvedAttribute("country"), Literal("USA")))) @@ -168,7 +165,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = Or( LessThanOrEqual(UnresolvedAttribute("age"), Literal(20)), EqualTo(UnresolvedAttribute("country"), Literal("USA"))) @@ -192,7 +189,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = Or( LessThanOrEqual(UnresolvedAttribute("age"), Literal(20)), EqualTo(UnresolvedAttribute("country"), Literal("USA"))) @@ -221,7 +218,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = GreaterThan(UnresolvedAttribute("age"), Literal(25)) val filterPlan = Filter(filterExpr, table) val projectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")) @@ -248,7 +245,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: 
LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = LessThanOrEqual(UnresolvedAttribute("age"), Literal(65)) val filterPlan = Filter(filterExpr, table) val projectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")) @@ -273,7 +270,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = LessThanOrEqual(UnresolvedAttribute("age"), Literal(65)) val filterPlan = Filter(filterExpr, table) val projectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")) @@ -301,7 +298,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = EqualTo(UnresolvedAttribute("name"), Literal("Jake")) val filterPlan = Filter(filterExpr, table) val projectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")) @@ -328,7 +325,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = Not(EqualTo(UnresolvedAttribute("name"), Literal("Jake"))) val filterPlan = Filter(filterExpr, table) val projectList = Seq(UnresolvedAttribute("name"), 
UnresolvedAttribute("age")) @@ -356,7 +353,7 @@ class FlintSparkPPLFiltersITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val aggregateExpressions = Alias(UnresolvedFunction(Seq("AVG"), Seq(ageField), isDistinct = false), "avg(age)")() @@ -385,7 +382,7 @@ class FlintSparkPPLFiltersITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val aggregateExpressions = Alias(UnresolvedFunction(Seq("AVG"), Seq(ageField), isDistinct = false), "avg(age)")() @@ -432,7 +429,7 @@ class FlintSparkPPLFiltersITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val countryField = UnresolvedAttribute("country") val countryAlias = Alias(countryField, "country")() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala index 848c2af2c..9dea04872 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala @@ -5,8 +5,6 @@ package org.opensearch.flint.spark.ppl -import org.opensearch.flint.spark.FlintPPLSuite - import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} import 
org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, SortOrder} @@ -20,11 +18,10 @@ class FlintSparkPPLITSuite with StreamTest { /** Test table and index name */ - private val testTable = "default.flint_ppl_test" + private val testTable = "spark_catalog.default.flint_ppl_test" override def beforeAll(): Unit = { super.beforeAll() - // Create test table // Update table creation sql(s""" @@ -88,7 +85,9 @@ class FlintSparkPPLITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan val expectedPlan: LogicalPlan = - Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("default", "flint_ppl_test"))) + Project( + Seq(UnresolvedStar(None)), + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) // Compare the two plans assert(expectedPlan === logicalPlan) } @@ -106,7 +105,7 @@ class FlintSparkPPLITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan val limitPlan: LogicalPlan = - Limit(Literal(2), UnresolvedRelation(Seq("default", "flint_ppl_test"))) + Limit(Literal(2), UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) val expectedPlan = Project(Seq(UnresolvedStar(None)), limitPlan) // Compare the two plans @@ -130,7 +129,7 @@ class FlintSparkPPLITSuite Sort( Seq(SortOrder(UnresolvedAttribute("name"), Ascending)), global = true, - UnresolvedRelation(Seq("default", "flint_ppl_test"))) + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) // Define the expected logical plan val expectedPlan: LogicalPlan = @@ -159,7 +158,7 @@ class FlintSparkPPLITSuite // Define the expected logical plan val expectedPlan: LogicalPlan = Project( Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")), - UnresolvedRelation(Seq("default", "flint_ppl_test"))) + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) // Compare the two plans assert(expectedPlan === logicalPlan) } @@ -183,7 +182,7 @@ class 
FlintSparkPPLITSuite Sort( Seq(SortOrder(UnresolvedAttribute("age"), Ascending)), global = true, - UnresolvedRelation(Seq("default", "flint_ppl_test"))) + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) // Define the expected logical plan val expectedPlan: LogicalPlan = @@ -206,7 +205,7 @@ class FlintSparkPPLITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical val project = Project( Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")), - UnresolvedRelation(Seq("default", "flint_ppl_test"))) + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) // Define the expected logical plan val limitPlan: LogicalPlan = Limit(Literal(1), project) val expectedPlan: LogicalPlan = Project(Seq(UnresolvedStar(None)), limitPlan) @@ -227,7 +226,7 @@ class FlintSparkPPLITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical val project = Project( Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")), - UnresolvedRelation(Seq("default", "flint_ppl_test"))) + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) // Define the expected logical plan val limitPlan: LogicalPlan = Limit(Literal(1), project) val sortedPlan: LogicalPlan = diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala index 40bcbdcb9..df77e0d90 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala @@ -7,8 +7,6 @@ package org.opensearch.flint.spark.ppl import java.sql.Timestamp -import org.opensearch.flint.spark.FlintPPLSuite - import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import 
org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Divide, Floor, GenericRowWithSchema, Literal, Multiply, SortOrder, TimeWindow} @@ -22,11 +20,10 @@ class FlintSparkPPLTimeWindowITSuite with StreamTest { /** Test table and index name */ - private val testTable = "default.flint_ppl_sales_test" + private val testTable = "spark_catalog.default.flint_ppl_sales_test" override def beforeAll(): Unit = { super.beforeAll() - // Create test table // Update table creation sql(s""" @@ -130,7 +127,7 @@ class FlintSparkPPLTimeWindowITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val productsAmount = UnresolvedAttribute("productsAmount") - val table = UnresolvedRelation(Seq("default", "flint_ppl_sales_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_sales_test")) val windowExpression = Alias( TimeWindow( @@ -189,7 +186,7 @@ class FlintSparkPPLTimeWindowITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val productsAmount = UnresolvedAttribute("productsAmount") - val table = UnresolvedRelation(Seq("default", "flint_ppl_sales_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_sales_test")) val windowExpression = Alias( TimeWindow( @@ -291,7 +288,7 @@ class FlintSparkPPLTimeWindowITSuite val star = Seq(UnresolvedStar(None)) val productsId = Alias(UnresolvedAttribute("productId"), "productId")() val productsAmount = UnresolvedAttribute("productsAmount") - val table = UnresolvedRelation(Seq("default", "flint_ppl_sales_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_sales_test")) val windowExpression = Alias( TimeWindow( @@ -351,7 +348,7 @@ class FlintSparkPPLTimeWindowITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val productsAmount = UnresolvedAttribute("productsAmount") - val table = UnresolvedRelation(Seq("default", "flint_ppl_sales_test")) + val table = 
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_sales_test")) val windowExpression = Alias( TimeWindow( @@ -394,7 +391,7 @@ class FlintSparkPPLTimeWindowITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_sales_test")) val aggregateExpressions = Alias(UnresolvedFunction(Seq("COUNT"), Seq(ageField), isDistinct = false), "count(age)")() diff --git a/ppl-spark-integration/README.md b/ppl-spark-integration/README.md index a824cfaaf..7e44b39e4 100644 --- a/ppl-spark-integration/README.md +++ b/ppl-spark-integration/README.md @@ -7,9 +7,11 @@ translation between PPL's logical plan to Spark's Catalyst logical plan. The next concepts are the main purpose of introduction this functionality: - Transforming PPL to become OpenSearch default query language (specifically for logs/traces/metrics signals) - Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language. -- Seamlessly Interact with different datasources (S3 / Prometheus / data-lake) from within OpenSearch +- Seamlessly Interact with different datasources such as S3 / Prometheus / data-lake leveraging spark execution. +- Using spark's federative capabilities as a general purpose query engine to facilitate complex queries including joins - Improve and promote PPL to become extensible and general purpose query language to be adopted by the community + Acknowledging spark is an excellent conduit for promoting these goals and showcasing the capabilities of PPL to interact & federate data across multiple sources and domains. Another byproduct of introducing PPL on spark would be the much anticipated JOIN capability that will emerge from the usage of Spark compute engine.