From e95f17380e606416ec13688851a16a1c6d28449b Mon Sep 17 00:00:00 2001 From: Lukasz Soszynski Date: Mon, 30 Sep 2024 13:57:14 +0200 Subject: [PATCH] Integration test for usage signum function in the eval command Signed-off-by: Lukasz Soszynski --- .../spark/ppl/FlintSparkPPLEvalITSuite.scala | 43 ++++++++++++++++++- ...PlanMathFunctionsTranslatorTestSuite.scala | 2 +- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLEvalITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLEvalITSuite.scala index 596626698..e10b2e2a6 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLEvalITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLEvalITSuite.scala @@ -10,7 +10,7 @@ import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, CaseWhen, Descending, EqualTo, GreaterThanOrEqual, LessThan, Literal, SortOrder} -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, Project, Sort} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, LocalLimit, LogicalPlan, Project, Sort} import org.apache.spark.sql.streaming.StreamTest class FlintSparkPPLEvalITSuite @@ -22,6 +22,7 @@ class FlintSparkPPLEvalITSuite /** Test table and index name */ private val testTable = "spark_catalog.default.flint_ppl_test" private val testTableHttpLog = "spark_catalog.default.flint_ppl_test_http_log" + private val duplicatesNullableTestTable = "spark_catalog.default.duplicates_nullable_test" override def beforeAll(): Unit = { super.beforeAll() @@ -29,6 +30,7 @@ class FlintSparkPPLEvalITSuite // Create test table createPartitionedStateCountryTable(testTable) createTableHttpLog(testTableHttpLog) + createDuplicationNullableTable(duplicatesNullableTestTable) } protected override def afterEach(): Unit = { @@ -632,8 +634,45 @@ class FlintSparkPPLEvalITSuite EqualTo(Literal(true), and) } - // Todo excluded fields not support yet + test("Test eval and signum function") { + val frame = sql(s""" + | source = $duplicatesNullableTestTable | fields id | sort id | eval i = pow(-2, id), s = signum(i) | head 5 + | """.stripMargin) + val rows = frame.collect() + val expectedResults: Array[Row] = Array( + Row(1, -2d, -1d), + Row(2, 4d, 1d), + Row(3, -8d, -1d), + Row(4, 16d, 1d), + Row(5, -32d, -1d)) + assert(rows.sameElements(expectedResults)) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val tablePlan = + UnresolvedRelation(Seq("spark_catalog", "default", "duplicates_nullable_test")) + val projectIdPlan = Project(Seq(UnresolvedAttribute("id")), tablePlan) + val sortPlan = + Sort(Seq(SortOrder(UnresolvedAttribute("id"), Ascending)), global = true, projectIdPlan) + val evalPlan = Project( + Seq( + UnresolvedStar(None), + Alias( + UnresolvedFunction( + "pow", + Seq(Literal(-2), UnresolvedAttribute("id")), + isDistinct = false), + "i")(), + Alias( + UnresolvedFunction("signum", Seq(UnresolvedAttribute("i")), isDistinct = false), + "s")()), + sortPlan) + val localLimitPlan = LocalLimit(Literal(5), evalPlan) + val globalLimitPlan = GlobalLimit(Literal(5), localLimitPlan) + val expectedPlan = Project(Seq(UnresolvedStar(None)), globalLimitPlan) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + // Todo excluded fields not support yet ignore("test single eval expression with excluded fields") { val frame = sql(s""" | source = $testTable | eval new_field = "New Field" | fields - age diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanMathFunctionsTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanMathFunctionsTranslatorTestSuite.scala index e17cf0747..feaa7d8ca 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanMathFunctionsTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanMathFunctionsTranslatorTestSuite.scala @@ -194,7 +194,7 @@ class PPLLogicalPlanMathFunctionsTranslatorTestSuite test("test signum") { val context = new CatalystPlanContext - val logPlan = planTransformer.visit(plan(pplParser, "source=t a = signum(b)", false), context) + val logPlan = planTransformer.visit(plan(pplParser, "source=t a = signum(b)"), context) val table = UnresolvedRelation(Seq("t")) val filterExpr = EqualTo(