Commit 69df8ad

update some tests

Signed-off-by: YANGDB <[email protected]>
YANG-DB committed Sep 11, 2023
1 parent 17e93fb

Showing 7 changed files with 19 additions and 95 deletions.

This file was deleted.

This file was deleted.

FlintSparkSqlParser.scala
@@ -53,7 +53,12 @@ class FlintSparkSqlParser(sparkParser: ParserInterface) extends ParserInterface
   private val flintAstBuilder = new FlintSparkSqlAstBuilder()

   override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { flintParser =>
-    flintAstBuilder.visit(flintParser.singleStatement())
+    try {
+      flintAstBuilder.visit(flintParser.singleStatement())
+    } catch {
+      // Fall back to Spark parse plan logic if flint cannot parse
+      case _: ParseException => sparkParser.parsePlan(sqlText)
+    }
   }

   override def parseExpression(sqlText: String): Expression = sparkParser.parseExpression(sqlText)
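
The change above makes the Flint SQL parser tolerant of statements outside its grammar: Flint parses first, and a ParseException routes the original query text to the wrapped Spark parser. Below is a minimal sketch of that delegation pattern; the helper name is hypothetical and not part of this repository.

    import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Hypothetical helper: try the primary (Flint) grammar first, and hand
    // anything it rejects to the delegate Spark parser unchanged.
    def parseWithFallback(
        primary: String => LogicalPlan,
        spark: ParserInterface)(sqlText: String): LogicalPlan =
      try primary(sqlText)
      catch {
        case _: ParseException => spark.parsePlan(sqlText)
      }
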
FlintSuite.scala
@@ -11,7 +11,7 @@ import org.apache.spark.sql.flint.config.FlintConfigEntry
 import org.apache.spark.sql.flint.config.FlintSparkConf.HYBRID_SCAN_ENABLED
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
-import org.opensearch.flint.spark.FlintGenericSparkExtensions
+import org.opensearch.flint.spark.FlintSparkExtensions

 trait FlintSuite extends SharedSparkSession {
   override protected def sparkConf = {
@@ -24,7 +24,7 @@ trait FlintSuite extends SharedSparkSession {
       // this rule may potentially block testing of other optimization rules such as
       // ConstantPropagation etc.
       .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
-      .set("spark.sql.extensions", classOf[FlintGenericSparkExtensions].getName)
+      .set("spark.sql.extensions", classOf[FlintSparkExtensions].getName)
     conf
   }
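
For reference, spark.sql.extensions is Spark's standard hook for injecting session extensions such as custom parsers and optimizer rules, so the renamed class registers the same way outside the test harness. A sketch, with local master and app name chosen only for illustration:

    import org.apache.spark.sql.SparkSession

    // Build a session with the Flint extensions active; Spark instantiates the
    // named class and lets it inject its parser and rules into the session.
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("flint-extensions-demo")
      .config("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
      .getOrCreate()
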
FlintSparkPPLITSuite.scala
@@ -15,7 +15,7 @@ import org.apache.spark.sql.{QueryTest, Row}
 class FlintSparkPPLITSuite
     extends QueryTest
     with LogicalPlanTestUtils
-    with FlintSuite
+    with FlintPPLSuite
     with StreamTest {

   /** Test table and index name */
@@ -309,16 +309,11 @@ class FlintSparkPPLITSuite
   }

   test("create ppl simple age avg group by country query test ") {
-    val checkData = sql(s"SELECT country, AVG(age) AS avg_age FROM $testTable group by country");
-    checkData.show()
-    checkData.queryExecution.logical.show()
-
     val frame = sql(
       s"""
          | source = $testTable| stats avg(age) by country
          | """.stripMargin)

-
     // Retrieve the results
     val results: Array[Row] = frame.collect()
     // Define the expected results
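
The deleted lines were leftover debug output that ran the SQL equivalent of the PPL query and printed both frames. The two dialects express the same aggregation; a hedged sketch of that equivalence, assuming the suite's sql helper and test table:

    // The PPL stats command and the SQL GROUP BY compute the same aggregation;
    // column naming and row ordering may differ between the two frames.
    val sqlFrame = sql(s"SELECT country, AVG(age) AS avg_age FROM $testTable GROUP BY country")
    val pplFrame = sql(s"source = $testTable | stats avg(age) by country")
    assert(pplFrame.count() == sqlFrame.count()) // one row per distinct country
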
FlintSparkPPLParser.scala
@@ -50,10 +50,15 @@ class FlintSparkPPLParser(sparkParser: ParserInterface) extends ParserInterface
   private val pplParser = new PPLSyntaxParser()

   override def parsePlan(sqlText: String): LogicalPlan = {
-    // if successful build ppl logical plan and translate to catalyst logical plan
-    val context = new CatalystPlanContext
-    planTrnasormer.visit(plan(pplParser, sqlText, false), context)
-    context.getPlan
+    try {
+      // if successful build ppl logical plan and translate to catalyst logical plan
+      val context = new CatalystPlanContext
+      planTrnasormer.visit(plan(pplParser, sqlText, false), context)
+      context.getPlan
+    } catch {
+      // Fall back to Spark parse plan logic if flint cannot parse
+      case _: ParseException | _: SyntaxCheckException => sparkParser.parsePlan(sqlText)
+    }
   }

   override def parseExpression(sqlText: String): Expression = sparkParser.parseExpression(sqlText)
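
With the fallback in place, a single parser entry point serves both dialects. A hypothetical usage sketch, assuming a SparkSession named spark whose own parser acts as the delegate:

    // Wrap Spark's parser with the PPL parser from the diff above.
    val parser = new FlintSparkPPLParser(spark.sessionState.sqlParser)

    // PPL text is handled by the PPL grammar and translated to Catalyst...
    val pplPlan = parser.parsePlan("source = t | stats avg(age) by country")
    // ...while plain SQL fails the PPL syntax check and falls through to Spark.
    val sqlPlan = parser.parsePlan("SELECT country, AVG(age) FROM t GROUP BY country")
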
SQLJob.scala
@@ -35,7 +35,7 @@ object SQLJob {

     val conf: SparkConf = new SparkConf()
       .setAppName("SQLJob")
-      .set("spark.sql.extensions", "org.opensearch.flint.spark.FlintGenericSparkExtensions")
+      .set("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
       .set("spark.datasource.flint.host", host)
       .set("spark.datasource.flint.port", port)
       .set("spark.datasource.flint.scheme", scheme)
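
The SQLJob change is the same rename on the job-submission side. Assuming the surrounding object supplies host, port, and scheme as in the diff, the conf feeds a session builder roughly as in this sketch:

    import org.apache.spark.sql.SparkSession

    // Build the job's session from the SparkConf above; the extension class
    // and the Flint datasource endpoints travel with it.
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // Queries on this session now go through the Flint-extended SQL parser.
    val result = spark.sql("SELECT 1")
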
