From 00192bab8c965870b25b7532ca4a6b3bf0938229 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Mon, 2 Oct 2023 15:05:33 -0700 Subject: [PATCH] add optimizer support for flint based acceleration queries - also using the PPL add ppl-spark-extension jar mvn publish Signed-off-by: YANGDB --- .github/workflows/snapshot-publish.yml | 1 + build.sbt | 19 ++++++++++--------- .../flint/spark/FlintPPLSparkExtensions.scala | 7 +++++++ 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/.github/workflows/snapshot-publish.yml b/.github/workflows/snapshot-publish.yml index 7dae546b3..6025a3b02 100644 --- a/.github/workflows/snapshot-publish.yml +++ b/.github/workflows/snapshot-publish.yml @@ -29,6 +29,7 @@ jobs: - name: Publish to Local Maven run: | sbt standaloneCosmetic/publishM2 + sbt sparkPPLCosmetic/publishM2 sbt sparkSqlApplicationCosmetic/publishM2 - uses: actions/checkout@v3 diff --git a/build.sbt b/build.sbt index 6b7c8d53a..ad1f5de2f 100644 --- a/build.sbt +++ b/build.sbt @@ -61,13 +61,16 @@ lazy val flintCore = (project in file("flint-core")) exclude ("com.fasterxml.jackson.core", "jackson-databind")), publish / skip := true) -lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) +lazy val flintSparkIntegration = (project in file("flint-spark-integration")) + .dependsOn(flintCore) .enablePlugins(AssemblyPlugin, Antlr4Plugin) .settings( commonSettings, - name := "ppl-spark-integration", + name := "flint-spark-integration", scalaVersion := scala212, libraryDependencies ++= Seq( + "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided" + exclude ("com.fasterxml.jackson.core", "jackson-databind"), "org.scalactic" %% "scalactic" % "3.2.15" % "test", "org.scalatest" %% "scalatest" % "3.2.15" % "test", "org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test", @@ -77,7 +80,7 @@ lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) libraryDependencies ++= deps(sparkVersion), // ANTLR settings Antlr4 / antlr4Version := "4.8", - Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.ppl"), + Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.sql"), Antlr4 / antlr4GenListener := true, Antlr4 / antlr4GenVisitor := true, // Assembly settings @@ -97,16 +100,14 @@ lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) }, assembly / test := (Test / test).value) -lazy val flintSparkIntegration = (project in file("flint-spark-integration")) - .dependsOn(flintCore) +lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) .enablePlugins(AssemblyPlugin, Antlr4Plugin) + .dependsOn(flintSparkIntegration) .settings( commonSettings, - name := "flint-spark-integration", + name := "ppl-spark-integration", scalaVersion := scala212, libraryDependencies ++= Seq( - "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided" - exclude ("com.fasterxml.jackson.core", "jackson-databind"), "org.scalactic" %% "scalactic" % "3.2.15" % "test", "org.scalatest" %% "scalatest" % "3.2.15" % "test", "org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test", @@ -116,7 +117,7 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration")) libraryDependencies ++= deps(sparkVersion), // ANTLR settings Antlr4 / antlr4Version := "4.8", - Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.sql"), + Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.ppl"), Antlr4 / antlr4GenListener := true, Antlr4 / antlr4GenVisitor := true, // Assembly settings diff --git a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala index 26ad4b69b..3379048a2 100644 --- a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala +++ b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintPPLSparkExtensions.scala @@ -5,6 +5,7 @@ package org.opensearch.flint.spark +import org.opensearch.flint.spark.function.TumbleFunction import org.opensearch.flint.spark.ppl.FlintSparkPPLParser import org.apache.spark.sql.SparkSessionExtensions @@ -18,5 +19,11 @@ class FlintPPLSparkExtensions extends (SparkSessionExtensions => Unit) { extensions.injectParser { (spark, parser) => new FlintSparkPPLParser(parser) } + + extensions.injectFunction(TumbleFunction.description) + + extensions.injectOptimizerRule { spark => + new FlintSparkOptimizer(spark) + } } }