Skip to content

Commit

Permalink
Integration test for usage signum function in the eval command
Browse files Browse the repository at this point in the history
Signed-off-by: Lukasz Soszynski <[email protected]>
  • Loading branch information
lukasz-soszynski-eliatra committed Sep 30, 2024
1 parent be667b4 commit e95f173
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, CaseWhen, Descending, EqualTo, GreaterThanOrEqual, LessThan, Literal, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, Project, Sort}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, LocalLimit, LogicalPlan, Project, Sort}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLEvalITSuite
Expand All @@ -22,13 +22,15 @@ class FlintSparkPPLEvalITSuite
/** Test table and index name */
private val testTable = "spark_catalog.default.flint_ppl_test"
private val testTableHttpLog = "spark_catalog.default.flint_ppl_test_http_log"
private val duplicatesNullableTestTable = "spark_catalog.default.duplicates_nullable_test"

override def beforeAll(): Unit = {
super.beforeAll()

// Create test table
createPartitionedStateCountryTable(testTable)
createTableHttpLog(testTableHttpLog)
createDuplicationNullableTable(duplicatesNullableTestTable)
}

protected override def afterEach(): Unit = {
Expand Down Expand Up @@ -632,8 +634,45 @@ class FlintSparkPPLEvalITSuite
EqualTo(Literal(true), and)
}

// Todo excluded fields not support yet
test("Test eval and signum function") {
val frame = sql(s"""
| source = $duplicatesNullableTestTable | fields id | sort id | eval i = pow(-2, id), s = signum(i) | head 5
| """.stripMargin)
val rows = frame.collect()
val expectedResults: Array[Row] = Array(
Row(1, -2d, -1d),
Row(2, 4d, 1d),
Row(3, -8d, -1d),
Row(4, 16d, 1d),
Row(5, -32d, -1d))
assert(rows.sameElements(expectedResults))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val tablePlan =
UnresolvedRelation(Seq("spark_catalog", "default", "duplicates_nullable_test"))
val projectIdPlan = Project(Seq(UnresolvedAttribute("id")), tablePlan)
val sortPlan =
Sort(Seq(SortOrder(UnresolvedAttribute("id"), Ascending)), global = true, projectIdPlan)
val evalPlan = Project(
Seq(
UnresolvedStar(None),
Alias(
UnresolvedFunction(
"pow",
Seq(Literal(-2), UnresolvedAttribute("id")),
isDistinct = false),
"i")(),
Alias(
UnresolvedFunction("signum", Seq(UnresolvedAttribute("i")), isDistinct = false),
"s")()),
sortPlan)
val localLimitPlan = LocalLimit(Literal(5), evalPlan)
val globalLimitPlan = GlobalLimit(Literal(5), localLimitPlan)
val expectedPlan = Project(Seq(UnresolvedStar(None)), globalLimitPlan)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

// Todo excluded fields not support yet
ignore("test single eval expression with excluded fields") {
val frame = sql(s"""
| source = $testTable | eval new_field = "New Field" | fields - age
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class PPLLogicalPlanMathFunctionsTranslatorTestSuite

test("test signum") {
val context = new CatalystPlanContext
val logPlan = planTransformer.visit(plan(pplParser, "source=t a = signum(b)", false), context)
val logPlan = planTransformer.visit(plan(pplParser, "source=t a = signum(b)"), context)

val table = UnresolvedRelation(Seq("t"))
val filterExpr = EqualTo(
Expand Down

0 comments on commit e95f173

Please sign in to comment.