From 00192bab8c965870b25b7532ca4a6b3bf0938229 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Mon, 2 Oct 2023 15:05:33 -0700 Subject: [PATCH 1/3] add optimizer support for flint based acceleration queries - also using the PPL add ppl-spark-extension jar mvn publish Signed-off-by: YANGDB --- .github/workflows/snapshot-publish.yml | 1 + build.sbt | 19 ++++++++++--------- .../flint/spark/FlintPPLSparkExtensions.scala | 7 +++++++ 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/.github/workflows/snapshot-publish.yml b/.github/workflows/snapshot-publish.yml index 7dae546b3..6025a3b02 100644 --- a/.github/workflows/snapshot-publish.yml +++ b/.github/workflows/snapshot-publish.yml @@ -29,6 +29,7 @@ jobs: - name: Publish to Local Maven run: | sbt standaloneCosmetic/publishM2 + sbt sparkPPLCosmetic/publishM2 sbt sparkSqlApplicationCosmetic/publishM2 - uses: actions/checkout@v3 diff --git a/build.sbt b/build.sbt index 6b7c8d53a..ad1f5de2f 100644 --- a/build.sbt +++ b/build.sbt @@ -61,13 +61,16 @@ lazy val flintCore = (project in file("flint-core")) exclude ("com.fasterxml.jackson.core", "jackson-databind")), publish / skip := true) -lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) +lazy val flintSparkIntegration = (project in file("flint-spark-integration")) + .dependsOn(flintCore) .enablePlugins(AssemblyPlugin, Antlr4Plugin) .settings( commonSettings, - name := "ppl-spark-integration", + name := "flint-spark-integration", scalaVersion := scala212, libraryDependencies ++= Seq( + "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided" + exclude ("com.fasterxml.jackson.core", "jackson-databind"), "org.scalactic" %% "scalactic" % "3.2.15" % "test", "org.scalatest" %% "scalatest" % "3.2.15" % "test", "org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test", @@ -77,7 +80,7 @@ lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) libraryDependencies ++= deps(sparkVersion), // ANTLR settings Antlr4 / antlr4Version := "4.8", - Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.ppl"), + Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.sql"), Antlr4 / antlr4GenListener := true, Antlr4 / antlr4GenVisitor := true, // Assembly settings @@ -97,16 +100,14 @@ lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) }, assembly / test := (Test / test).value) -lazy val flintSparkIntegration = (project in file("flint-spark-integration")) - .dependsOn(flintCore) +lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) .enablePlugins(AssemblyPlugin, Antlr4Plugin) + .dependsOn(flintSparkIntegration) .settings( commonSettings, - name := "flint-spark-integration", + name := "ppl-spark-integration", scalaVersion := scala212, libraryDependencies ++= Seq( - "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided" - exclude ("com.fasterxml.jackson.core", "jackson-databind"), "org.scalactic" %% "scalactic" % "3.2.15" % "test", "org.scalatest" %% "scalatest" % "3.2.15" % "test", "org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test", @@ -116,7 +117,7 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration")) libraryDependencies ++= deps(sparkVersion), // ANTLR settings Antlr4 / antlr4Version := "4.8", - Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.sql"), + Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.ppl"), Antlr4 / antlr4GenListener := true, Antlr4 / antlr4GenVisitor := true, // Assembly settings diff --git a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala index 26ad4b69b..3379048a2 100644 --- a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala +++ b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala @@ -5,6 +5,7 @@ package org.opensearch.flint.spark +import org.opensearch.flint.spark.function.TumbleFunction import org.opensearch.flint.spark.ppl.FlintSparkPPLParser import org.apache.spark.sql.SparkSessionExtensions @@ -18,5 +19,11 @@ class FlintPPLSparkExtensions extends (SparkSessionExtensions => Unit) { extensions.injectParser { (spark, parser) => new FlintSparkPPLParser(parser) } + + extensions.injectFunction(TumbleFunction.description) + + extensions.injectOptimizerRule { spark => + new FlintSparkOptimizer(spark) + } } } From 60a84ac9e1166a2bc8a1a6eb9d745d19cd8e5cfa Mon Sep 17 00:00:00 2001 From: YANGDB Date: Mon, 2 Oct 2023 16:56:24 -0700 Subject: [PATCH 2/3] updated the FQN for the tables ("spark_catalog","default","flint_ppl_test") disabled the Optimization rules during the IT Signed-off-by: YANGDB --- ...ntSparkPPLAggregationWithSpanITSuite.scala | 16 +++++----- .../FlintSparkPPLAggregationsITSuite.scala | 24 ++++++++------- .../ppl/FlintSparkPPLFiltersITSuite.scala | 30 ++++++++++--------- .../spark/ppl/FlintSparkPPLITSuite.scala | 20 ++++++++----- .../ppl/FlintSparkPPLTimeWindowITSuite.scala | 15 ++++++---- .../flint/spark/FlintPPLSparkExtensions.scala | 2 -- .../flint/spark/FlintPPLSuite.scala | 11 +++++++ 7 files changed, 70 insertions(+), 48 deletions(-) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala index 70951ad27..add5ae47d 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala @@ -11,6 +11,7 @@ import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, Descending, Divide, Floor, Literal, Multiply, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED import org.apache.spark.sql.streaming.StreamTest class FlintSparkPPLAggregationWithSpanITSuite @@ -20,10 +21,11 @@ class FlintSparkPPLAggregationWithSpanITSuite with StreamTest { /** Test table and index name */ - private val testTable = "default.flint_ppl_test" + private val testTable = "spark_catalog.default.flint_ppl_test" override def beforeAll(): Unit = { super.beforeAll() + spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false") // Create test table // Update table creation @@ -92,7 +94,7 @@ class FlintSparkPPLAggregationWithSpanITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val aggregateExpressions = Alias(UnresolvedFunction(Seq("COUNT"), Seq(ageField), isDistinct = false), "count(age)")() @@ -132,7 +134,7 @@ class FlintSparkPPLAggregationWithSpanITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val aggregateExpressions = Alias(UnresolvedFunction(Seq("AVG"), Seq(ageField), isDistinct = false), "avg(age)")() @@ -161,7 +163,7 @@ class FlintSparkPPLAggregationWithSpanITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val aggregateExpressions = Alias(UnresolvedFunction(Seq("AVG"), Seq(ageField), isDistinct = false), "avg(age)")() @@ -203,7 +205,7 @@ class FlintSparkPPLAggregationWithSpanITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val countryField = UnresolvedAttribute("country") val countryAlias = Alias(countryField, "country")() @@ -239,7 +241,7 @@ class FlintSparkPPLAggregationWithSpanITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val countryField = UnresolvedAttribute("country") val countryAlias = Alias(countryField, "country")() @@ -272,7 +274,7 @@ class FlintSparkPPLAggregationWithSpanITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val countryField = UnresolvedAttribute("country") val countryAlias = Alias(countryField, "country")() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala index 0a58a039d..3c4caf83b 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala @@ -11,6 +11,7 @@ import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, EqualTo, LessThan, Literal, Not, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED import org.apache.spark.sql.streaming.StreamTest class FlintSparkPPLAggregationsITSuite @@ -20,10 +21,11 @@ class FlintSparkPPLAggregationsITSuite with StreamTest { /** Test table and index name */ - private val testTable = "default.flint_ppl_test" + private val testTable = "spark_catalog.default.flint_ppl_test" override def beforeAll(): Unit = { super.beforeAll() + spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false") // Create test table // Update table creation @@ -86,7 +88,7 @@ class FlintSparkPPLAggregationsITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val aggregateExpressions = Seq(Alias(UnresolvedFunction(Seq("AVG"), Seq(ageField), isDistinct = false), "avg(age)")()) val aggregatePlan = Aggregate(Seq(), aggregateExpressions, table) @@ -116,7 +118,7 @@ class FlintSparkPPLAggregationsITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = LessThan(ageField, Literal(50)) val filterPlan = Filter(filterExpr, table) val aggregateExpressions = @@ -148,7 +150,7 @@ class FlintSparkPPLAggregationsITSuite val star = Seq(UnresolvedStar(None)) val countryField = UnresolvedAttribute("country") val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val groupByAttributes = Seq(Alias(countryField, "country")()) val aggregateExpressions = @@ -177,7 +179,7 @@ class FlintSparkPPLAggregationsITSuite // Define the expected logical plan val countryField = UnresolvedAttribute("country") val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val groupByAttributes = Seq(Alias(countryField, "country")()) val aggregateExpressions = @@ -213,7 +215,7 @@ class FlintSparkPPLAggregationsITSuite val star = Seq(UnresolvedStar(None)) val countryField = UnresolvedAttribute("country") val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val groupByAttributes = Seq(Alias(countryField, "country")()) val aggregateExpressions = @@ -248,7 +250,7 @@ class FlintSparkPPLAggregationsITSuite val star = Seq(UnresolvedStar(None)) val countryField = UnresolvedAttribute("country") val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val groupByAttributes = Seq(Alias(countryField, "country")()) val aggregateExpressions = @@ -283,7 +285,7 @@ class FlintSparkPPLAggregationsITSuite val star = Seq(UnresolvedStar(None)) val countryField = UnresolvedAttribute("country") val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val groupByAttributes = Seq(Alias(countryField, "country")()) val aggregateExpressions = @@ -317,7 +319,7 @@ class FlintSparkPPLAggregationsITSuite val star = Seq(UnresolvedStar(None)) val countryField = UnresolvedAttribute("country") val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val groupByAttributes = Seq(Alias(countryField, "country")()) val aggregateExpressions = @@ -358,7 +360,7 @@ class FlintSparkPPLAggregationsITSuite val star = Seq(UnresolvedStar(None)) val countryField = UnresolvedAttribute("country") val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val groupByAttributes = Seq(Alias(countryField, "country")()) val aggregateExpressions = @@ -396,7 +398,7 @@ class FlintSparkPPLAggregationsITSuite val stateField = UnresolvedAttribute("state") val countryField = UnresolvedAttribute("country") val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val groupByAttributes = Seq(Alias(countryField, "country")()) val aggregateExpressions = diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala index 0a4810b01..fe75ff49b 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala @@ -11,6 +11,7 @@ import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Descending, Divide, EqualTo, Floor, GreaterThan, LessThanOrEqual, Literal, Multiply, Not, Or, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED import org.apache.spark.sql.streaming.StreamTest class FlintSparkPPLFiltersITSuite @@ -20,10 +21,11 @@ class FlintSparkPPLFiltersITSuite with StreamTest { /** Test table and index name */ - private val testTable = "default.flint_ppl_test" + private val testTable = "spark_catalog.default.flint_ppl_test" override def beforeAll(): Unit = { super.beforeAll() + spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false") // Create test table // Update table creation @@ -83,7 +85,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = EqualTo(UnresolvedAttribute("age"), Literal(25)) val filterPlan = Filter(filterExpr, table) val projectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")) @@ -110,7 +112,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = And( GreaterThan(UnresolvedAttribute("age"), Literal(10)), Not(EqualTo(UnresolvedAttribute("country"), Literal("USA")))) @@ -137,7 +139,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = And( GreaterThan(UnresolvedAttribute("age"), Literal(10)), Not(EqualTo(UnresolvedAttribute("country"), Literal("USA")))) @@ -168,7 +170,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = Or( LessThanOrEqual(UnresolvedAttribute("age"), Literal(20)), EqualTo(UnresolvedAttribute("country"), Literal("USA"))) @@ -192,7 +194,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = Or( LessThanOrEqual(UnresolvedAttribute("age"), Literal(20)), EqualTo(UnresolvedAttribute("country"), Literal("USA"))) @@ -221,7 +223,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = GreaterThan(UnresolvedAttribute("age"), Literal(25)) val filterPlan = Filter(filterExpr, table) val projectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")) @@ -248,7 +250,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = LessThanOrEqual(UnresolvedAttribute("age"), Literal(65)) val filterPlan = Filter(filterExpr, table) val projectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")) @@ -273,7 +275,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = LessThanOrEqual(UnresolvedAttribute("age"), Literal(65)) val filterPlan = Filter(filterExpr, table) val projectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")) @@ -301,7 +303,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = EqualTo(UnresolvedAttribute("name"), Literal("Jake")) val filterPlan = Filter(filterExpr, table) val projectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")) @@ -328,7 +330,7 @@ class FlintSparkPPLFiltersITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val filterExpr = Not(EqualTo(UnresolvedAttribute("name"), Literal("Jake"))) val filterPlan = Filter(filterExpr, table) val projectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")) @@ -356,7 +358,7 @@ class FlintSparkPPLFiltersITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val aggregateExpressions = Alias(UnresolvedFunction(Seq("AVG"), Seq(ageField), isDistinct = false), "avg(age)")() @@ -385,7 +387,7 @@ class FlintSparkPPLFiltersITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val aggregateExpressions = Alias(UnresolvedFunction(Seq("AVG"), Seq(ageField), isDistinct = false), "avg(age)")() @@ -432,7 +434,7 @@ class FlintSparkPPLFiltersITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val countryField = UnresolvedAttribute("country") val countryAlias = Alias(countryField, "country")() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala index 848c2af2c..dd623fe70 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala @@ -11,6 +11,7 @@ import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED import org.apache.spark.sql.streaming.StreamTest class FlintSparkPPLITSuite @@ -20,10 +21,11 @@ class FlintSparkPPLITSuite with StreamTest { /** Test table and index name */ - private val testTable = "default.flint_ppl_test" + private val testTable = "spark_catalog.default.flint_ppl_test" override def beforeAll(): Unit = { super.beforeAll() + spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false") // Create test table // Update table creation @@ -88,7 +90,9 @@ class FlintSparkPPLITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan val expectedPlan: LogicalPlan = - Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("default", "flint_ppl_test"))) + Project( + Seq(UnresolvedStar(None)), + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) // Compare the two plans assert(expectedPlan === logicalPlan) } @@ -106,7 +110,7 @@ class FlintSparkPPLITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan val limitPlan: LogicalPlan = - Limit(Literal(2), UnresolvedRelation(Seq("default", "flint_ppl_test"))) + Limit(Literal(2), UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) val expectedPlan = Project(Seq(UnresolvedStar(None)), limitPlan) // Compare the two plans @@ -130,7 +134,7 @@ class FlintSparkPPLITSuite Sort( Seq(SortOrder(UnresolvedAttribute("name"), Ascending)), global = true, - UnresolvedRelation(Seq("default", "flint_ppl_test"))) + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) // Define the expected logical plan val expectedPlan: LogicalPlan = @@ -159,7 +163,7 @@ class FlintSparkPPLITSuite // Define the expected logical plan val expectedPlan: LogicalPlan = Project( Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")), - UnresolvedRelation(Seq("default", "flint_ppl_test"))) + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) // Compare the two plans assert(expectedPlan === logicalPlan) } @@ -183,7 +187,7 @@ class FlintSparkPPLITSuite Sort( Seq(SortOrder(UnresolvedAttribute("age"), Ascending)), global = true, - UnresolvedRelation(Seq("default", "flint_ppl_test"))) + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) // Define the expected logical plan val expectedPlan: LogicalPlan = @@ -206,7 +210,7 @@ class FlintSparkPPLITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical val project = Project( Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")), - UnresolvedRelation(Seq("default", "flint_ppl_test"))) + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) // Define the expected logical plan val limitPlan: LogicalPlan = Limit(Literal(1), project) val expectedPlan: LogicalPlan = Project(Seq(UnresolvedStar(None)), limitPlan) @@ -227,7 +231,7 @@ class FlintSparkPPLITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical val project = Project( Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")), - UnresolvedRelation(Seq("default", "flint_ppl_test"))) + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) // Define the expected logical plan val limitPlan: LogicalPlan = Limit(Literal(1), project) val sortedPlan: LogicalPlan = diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala index 40bcbdcb9..e234806d8 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala @@ -13,6 +13,7 @@ import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Divide, Floor, GenericRowWithSchema, Literal, Multiply, SortOrder, TimeWindow} import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED import org.apache.spark.sql.streaming.StreamTest class FlintSparkPPLTimeWindowITSuite @@ -22,10 +23,12 @@ class FlintSparkPPLTimeWindowITSuite with StreamTest { /** Test table and index name */ - private val testTable = "default.flint_ppl_sales_test" + private val testTable = "spark_catalog.default.flint_ppl_sales_test" override def beforeAll(): Unit = { super.beforeAll() + // disable optimization rule + spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false") // Create test table // Update table creation @@ -130,7 +133,7 @@ class FlintSparkPPLTimeWindowITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val productsAmount = UnresolvedAttribute("productsAmount") - val table = UnresolvedRelation(Seq("default", "flint_ppl_sales_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_sales_test")) val windowExpression = Alias( TimeWindow( @@ -189,7 +192,7 @@ class FlintSparkPPLTimeWindowITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val productsAmount = UnresolvedAttribute("productsAmount") - val table = UnresolvedRelation(Seq("default", "flint_ppl_sales_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_sales_test")) val windowExpression = Alias( TimeWindow( @@ -291,7 +294,7 @@ class FlintSparkPPLTimeWindowITSuite val star = Seq(UnresolvedStar(None)) val productsId = Alias(UnresolvedAttribute("productId"), "productId")() val productsAmount = UnresolvedAttribute("productsAmount") - val table = UnresolvedRelation(Seq("default", "flint_ppl_sales_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_sales_test")) val windowExpression = Alias( TimeWindow( @@ -351,7 +354,7 @@ class FlintSparkPPLTimeWindowITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val productsAmount = UnresolvedAttribute("productsAmount") - val table = UnresolvedRelation(Seq("default", "flint_ppl_sales_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_sales_test")) val windowExpression = Alias( TimeWindow( @@ -394,7 +397,7 @@ class FlintSparkPPLTimeWindowITSuite // Define the expected logical plan val star = Seq(UnresolvedStar(None)) val ageField = UnresolvedAttribute("age") - val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_sales_test")) val aggregateExpressions = Alias(UnresolvedFunction(Seq("COUNT"), Seq(ageField), isDistinct = false), "count(age)")() diff --git a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala index 3379048a2..ba96ec1ed 100644 --- a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala +++ b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala @@ -20,8 +20,6 @@ class FlintPPLSparkExtensions extends (SparkSessionExtensions => Unit) { new FlintSparkPPLParser(parser) } - extensions.injectFunction(TumbleFunction.description) - extensions.injectOptimizerRule { spark => new FlintSparkOptimizer(spark) } diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintPPLSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintPPLSuite.scala index 450f21c63..81fdc90c9 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintPPLSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintPPLSuite.scala @@ -8,6 +8,7 @@ package org.opensearch.flint.spark import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation +import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -24,4 +25,14 @@ trait FlintPPLSuite extends SharedSparkSession { .set("spark.sql.extensions", classOf[FlintPPLSparkExtensions].getName) conf } + + private def withFlintOptimizerDisabled(block: => Unit): Unit = { + spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false") + try { + block + } finally { + spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "true") + } + } + } From f124d6028c9341b6f5f156d4012f02d034dceccb Mon Sep 17 00:00:00 2001 From: YANGDB Date: Tue, 3 Oct 2023 16:50:05 -0700 Subject: [PATCH 3/3] update ppl documentations and the ppl's integration test suite trait to include both flint & ppl to verify they can interact independently Signed-off-by: YANGDB --- README.md | 5 ++ build.sbt | 19 +++--- docs/PPL-on-Spark.md | 59 +++++++++++++++++++ .../flint/spark/ppl}/FlintPPLSuite.scala | 23 ++++---- ...ntSparkPPLAggregationWithSpanITSuite.scala | 4 -- .../FlintSparkPPLAggregationsITSuite.scala | 4 -- .../ppl/FlintSparkPPLFiltersITSuite.scala | 5 -- .../spark/ppl/FlintSparkPPLITSuite.scala | 5 -- .../ppl/FlintSparkPPLTimeWindowITSuite.scala | 6 -- ppl-spark-integration/README.md | 4 +- .../flint/spark/FlintPPLSparkExtensions.scala | 5 -- 11 files changed, 86 insertions(+), 53 deletions(-) create mode 100644 docs/PPL-on-Spark.md rename {ppl-spark-integration/src/test/scala/org/opensearch/flint/spark => integ-test/src/test/scala/org/opensearch/flint/spark/ppl}/FlintPPLSuite.scala (68%) diff --git a/README.md b/README.md index 2f957a40a..973e0d0aa 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,11 @@ To use PPL to Spark translation, you can run Spark with PPL extension: spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions" ``` +### Running With both Extension +``` +spark-sql --conf "spark.sql.extensions='org.opensearch.flint.FlintPPLSparkExtensions, org.opensearch.flint.FlintSparkExtensions'" +``` + ## Build To build and run this application with Spark, you can run: diff --git a/build.sbt b/build.sbt index ad1f5de2f..6b7c8d53a 100644 --- a/build.sbt +++ b/build.sbt @@ -61,16 +61,13 @@ lazy val flintCore = (project in file("flint-core")) exclude ("com.fasterxml.jackson.core", "jackson-databind")), publish / skip := true) -lazy val flintSparkIntegration = (project in file("flint-spark-integration")) - .dependsOn(flintCore) +lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) .enablePlugins(AssemblyPlugin, Antlr4Plugin) .settings( commonSettings, - name := "flint-spark-integration", + name := "ppl-spark-integration", scalaVersion := scala212, libraryDependencies ++= Seq( - "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided" - exclude ("com.fasterxml.jackson.core", "jackson-databind"), "org.scalactic" %% "scalactic" % "3.2.15" % "test", "org.scalatest" %% "scalatest" % "3.2.15" % "test", "org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test", @@ -80,7 +77,7 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration")) libraryDependencies ++= deps(sparkVersion), // ANTLR settings Antlr4 / antlr4Version := "4.8", - Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.sql"), + Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.ppl"), Antlr4 / antlr4GenListener := true, Antlr4 / antlr4GenVisitor := true, // Assembly settings @@ -100,14 +97,16 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration")) }, assembly / test := (Test / test).value) -lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) +lazy val flintSparkIntegration = (project in file("flint-spark-integration")) + .dependsOn(flintCore) .enablePlugins(AssemblyPlugin, Antlr4Plugin) - .dependsOn(flintSparkIntegration) .settings( commonSettings, - name := "ppl-spark-integration", + name := "flint-spark-integration", scalaVersion := scala212, libraryDependencies ++= Seq( + "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided" + exclude ("com.fasterxml.jackson.core", "jackson-databind"), "org.scalactic" %% "scalactic" % "3.2.15" % "test", "org.scalatest" %% "scalatest" % "3.2.15" % "test", "org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test", @@ -117,7 +116,7 @@ lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) libraryDependencies ++= deps(sparkVersion), // ANTLR settings Antlr4 / antlr4Version := "4.8", - Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.ppl"), + Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.sql"), Antlr4 / antlr4GenListener := true, Antlr4 / antlr4GenVisitor := true, // Assembly settings diff --git a/docs/PPL-on-Spark.md b/docs/PPL-on-Spark.md new file mode 100644 index 000000000..d94ee8037 --- /dev/null +++ b/docs/PPL-on-Spark.md @@ -0,0 +1,59 @@ +# Running PPL On Spark Reference Manual + +## Overview + +This module provides the support for running [PPL](https://github.com/opensearch-project/piped-processing-language) queries on Spark using direct logical plan +translation between PPL's logical plan to Spark's Catalyst logical plan. + +### What is PPL ? +OpenSearch PPL, or Pipe Processing Language, is a query language used with the OpenSearch platform and now Apache Spark. +PPL allows users to retrieve, query, and analyze data by using commands that are piped together, making it easier to understand and compose complex queries. +Its syntax is inspired by Unix pipes, which enables chaining of commands to transform and process data. +With PPL, users can filter, aggregate, and visualize data in multiple datasources in a more intuitive manner compared to traditional query languages + +### Context + +The next concepts are the main purpose of introduction this functionality: +- Transforming PPL to become OpenSearch default query language (specifically for logs/traces/metrics signals) +- Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language. +- Seamlessly Interact with different datasources such as S3 / Prometheus / data-lake leveraging spark execution. +- Using spark's federative capabilities as a general purpose query engine to facilitate complex queries including joins +- Improve and promote PPL to become extensible and general purpose query language to be adopted by the community + + +### Running PPL Commands: + +In order to run PPL commands, you will need to perform the following tasks: + +#### PPL Build & Run + +To build and run this PPL in Spark, you can run: + +``` +sbt clean sparkPPLCosmetic/publishM2 +``` +then add org.opensearch:opensearch-spark_2.12 when run spark application, for example, +``` +bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.1.0-SNAPSHOT" +``` + +### PPL Extension Usage + +To use PPL to Spark translation, you can run Spark with PPL extension: + +``` +spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions" +``` + +### Running With both Flint & PPL Extensions +In order to make use of both flint and ppl extension, one can simply add both jars (`org.opensearch:opensearch-spark-ppl_2.12:0.1.0-SNAPSHOT`,`org.opensearch:opensearch-spark_2.12:0.1.0-SNAPSHOT`) to the cluster's +classpath. + +Next need to configure both extensions : +``` +spark-sql --conf "spark.sql.extensions='org.opensearch.flint.FlintPPLSparkExtensions, org.opensearch.flint.FlintSparkExtensions'" +``` + +Once this is done, spark will allow both extensions to parse the query (SQL / PPL) and allow the correct execution of the query. +In addition, PPL queries will enjoy the acceleration capabilities supported by the Flint plugins as described [here](index.md) + diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintPPLSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintPPLSuite.scala similarity index 68% rename from ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintPPLSuite.scala rename to integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintPPLSuite.scala index 81fdc90c9..be43447fe 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintPPLSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintPPLSuite.scala @@ -3,7 +3,9 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.spark +package org.opensearch.flint.spark.ppl + +import org.opensearch.flint.spark.{FlintPPLSparkExtensions, FlintSparkExtensions} import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode @@ -13,7 +15,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession trait FlintPPLSuite extends SharedSparkSession { - override protected def sparkConf = { + override protected def sparkConf: SparkConf = { val conf = new SparkConf() .set("spark.ui.enabled", "false") .set(SQLConf.CODEGEN_FALLBACK.key, "false") @@ -22,17 +24,12 @@ trait FlintPPLSuite extends SharedSparkSession { // LocalRelation will exercise the optimization rules better by disabling it as // this rule may potentially block testing of other optimization rules such as // ConstantPropagation etc. - .set("spark.sql.extensions", classOf[FlintPPLSparkExtensions].getName) + .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) + .set( + "spark.sql.extensions", + List(classOf[FlintPPLSparkExtensions].getName, classOf[FlintSparkExtensions].getName) + .mkString(", ")) + .set(OPTIMIZER_RULE_ENABLED.key, "false") conf } - - private def withFlintOptimizerDisabled(block: => Unit): Unit = { - spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false") - try { - block - } finally { - spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "true") - } - } - } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala index add5ae47d..8dfde6c94 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala @@ -5,13 +5,10 @@ package org.opensearch.flint.spark.ppl -import org.opensearch.flint.spark.FlintPPLSuite - import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, Descending, Divide, Floor, Literal, Multiply, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED import org.apache.spark.sql.streaming.StreamTest class FlintSparkPPLAggregationWithSpanITSuite @@ -25,7 +22,6 @@ class FlintSparkPPLAggregationWithSpanITSuite override def beforeAll(): Unit = { super.beforeAll() - spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false") // Create test table // Update table creation diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala index 3c4caf83b..e8533d831 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala @@ -5,13 +5,10 @@ package org.opensearch.flint.spark.ppl -import org.opensearch.flint.spark.FlintPPLSuite - import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, EqualTo, LessThan, Literal, Not, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED import org.apache.spark.sql.streaming.StreamTest class FlintSparkPPLAggregationsITSuite @@ -25,7 +22,6 @@ class FlintSparkPPLAggregationsITSuite override def beforeAll(): Unit = { super.beforeAll() - spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false") // Create test table // Update table creation diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala index fe75ff49b..fb46ce4de 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala @@ -5,13 +5,10 @@ package org.opensearch.flint.spark.ppl -import org.opensearch.flint.spark.FlintPPLSuite - import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Descending, Divide, EqualTo, Floor, GreaterThan, LessThanOrEqual, Literal, Multiply, Not, Or, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED import org.apache.spark.sql.streaming.StreamTest class FlintSparkPPLFiltersITSuite @@ -25,8 +22,6 @@ class FlintSparkPPLFiltersITSuite override def beforeAll(): Unit = { super.beforeAll() - spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false") - // Create test table // Update table creation sql(s""" diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala index dd623fe70..9dea04872 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala @@ -5,13 +5,10 @@ package org.opensearch.flint.spark.ppl -import org.opensearch.flint.spark.FlintPPLSuite - import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED import org.apache.spark.sql.streaming.StreamTest class FlintSparkPPLITSuite @@ -25,8 +22,6 @@ class FlintSparkPPLITSuite override def beforeAll(): Unit = { super.beforeAll() - spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false") - // Create test table // Update table creation sql(s""" diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala index e234806d8..df77e0d90 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala @@ -7,13 +7,10 @@ package org.opensearch.flint.spark.ppl import java.sql.Timestamp -import org.opensearch.flint.spark.FlintPPLSuite - import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Divide, Floor, GenericRowWithSchema, Literal, Multiply, SortOrder, TimeWindow} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED import org.apache.spark.sql.streaming.StreamTest class FlintSparkPPLTimeWindowITSuite @@ -27,9 +24,6 @@ class FlintSparkPPLTimeWindowITSuite override def beforeAll(): Unit = { super.beforeAll() - // disable optimization rule - spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false") - // Create test table // Update table creation sql(s""" diff --git a/ppl-spark-integration/README.md b/ppl-spark-integration/README.md index a824cfaaf..7e44b39e4 100644 --- a/ppl-spark-integration/README.md +++ b/ppl-spark-integration/README.md @@ -7,9 +7,11 @@ translation between PPL's logical plan to Spark's Catalyst logical plan. The next concepts are the main purpose of introduction this functionality: - Transforming PPL to become OpenSearch default query language (specifically for logs/traces/metrics signals) - Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language. -- Seamlessly Interact with different datasources (S3 / Prometheus / data-lake) from within OpenSearch +- Seamlessly Interact with different datasources such as S3 / Prometheus / data-lake leveraging spark execution. +- Using spark's federative capabilities as a general purpose query engine to facilitate complex queries including joins - Improve and promote PPL to become extensible and general purpose query language to be adopted by the community + Acknowledging spark is an excellent conduit for promoting these goals and showcasing the capabilities of PPL to interact & federate data across multiple sources and domains. Another byproduct of introducing PPL on spark would be the much anticipated JOIN capability that will emerge from the usage of Spark compute engine. diff --git a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala index ba96ec1ed..26ad4b69b 100644 --- a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala +++ b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala @@ -5,7 +5,6 @@ package org.opensearch.flint.spark -import org.opensearch.flint.spark.function.TumbleFunction import org.opensearch.flint.spark.ppl.FlintSparkPPLParser import org.apache.spark.sql.SparkSessionExtensions @@ -19,9 +18,5 @@ class FlintPPLSparkExtensions extends (SparkSessionExtensions => Unit) { extensions.injectParser { (spark, parser) => new FlintSparkPPLParser(parser) } - - extensions.injectOptimizerRule { spark => - new FlintSparkOptimizer(spark) - } } }