From c6b388a99b25a635e0cd80081fa9ed9292533395 Mon Sep 17 00:00:00 2001 From: Hendrik Saly Date: Fri, 13 Sep 2024 20:18:27 +0200 Subject: [PATCH] Add coalesce PPL command (#609) * Inital coalesce implementation Signed-off-by: Hendrik Saly * Change coalesce to use built-in function Signed-off-by: Hendrik Saly * Cleanup Signed-off-by: Hendrik Saly * Cleanup Signed-off-by: Hendrik Saly * Tests related to coalesce function and documentation. Signed-off-by: Lukasz Soszynski --------- Signed-off-by: Hendrik Saly Signed-off-by: Lukasz Soszynski Co-authored-by: Lukasz Soszynski --- .../FlintSparkPPLBuiltinFunctionITSuite.scala | 30 +++++++++++++++++ ppl-spark-integration/README.md | 4 ++- .../src/main/antlr4/OpenSearchPPLLexer.g4 | 3 ++ .../src/main/antlr4/OpenSearchPPLParser.g4 | 5 +++ .../function/BuiltinFunctionName.java | 4 ++- .../ppl/utils/BuiltinFunctionTranslator.java | 2 ++ ...anStringFunctionsTranslatorTestSuite.scala | 33 +++++++++++++++++++ 7 files changed, 79 insertions(+), 2 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBuiltinFunctionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBuiltinFunctionITSuite.scala index 152d3f003..bc155c023 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBuiltinFunctionITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBuiltinFunctionITSuite.scala @@ -599,6 +599,36 @@ class FlintSparkPPLBuiltinFunctionITSuite | """.stripMargin)) } + test("test coalesce function") { + val frame = sql(s""" + | source = $testNullTable | where age = 10 | eval result=coalesce(name, state, country) | fields result + | """.stripMargin) + + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = Array(Row("Canada")) + assert(results.sameElements(expectedResults)) + } + + test("test coalesce function nulls only") { + val frame = sql(s""" + | source = $testNullTable | where age = 10 | eval result=coalesce(name, state) | fields result + | """.stripMargin) + + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = Array(Row(null)) + assert(results.sameElements(expectedResults)) + } + + test("test coalesce function where") { + val frame = sql(s""" + | source = $testNullTable | where isnull(coalesce(name, state)) + | """.stripMargin) + + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = Array(Row(null, 10, null, "Canada")) + assert(results.sameElements(expectedResults)) + } + // Todo // +---------------------------------------+ // | Below tests are not supported (cast) | diff --git a/ppl-spark-integration/README.md b/ppl-spark-integration/README.md index c44f8ac1c..20cf829cd 100644 --- a/ppl-spark-integration/README.md +++ b/ppl-spark-integration/README.md @@ -242,7 +242,8 @@ See the next samples of PPL queries : - `source = table | where a < 1 | fields a,b,c` - `source = table | where b != 'test' | fields a,b,c` - `source = table | where c = 'test' | fields a,b,c | head 3` - - `source = table where ispresent(b)` + - `source = table | where ispresent(b)` + - `source = table | where isnull(coalesce(a, b)) | fields a,b,c | head 3` **Filters With Logical Conditions** - `source = table | where c = 'test' AND a = 1 | fields a,b,c` @@ -261,6 +262,7 @@ Assumptions: `a`, `b`, `c` are existing fields in `table` - `source = table | eval f = a * 2, h = f * 2 | fields a,f,h` - `source = table | eval f = a * 2, h = b | stats avg(f) by h` - `source = table | eval f = ispresent(a)` + - `source = table | eval r = coalesce(a, b, c) | fields r Limitation: Overriding existing field is unsupported, following queries throw exceptions with "Reference 'a' is ambiguous" - `source = table | eval a = 10 | fields a,b,c` diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 index fc3e23514..324b2a0f9 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 @@ -339,6 +339,9 @@ NULLIF: 'NULLIF'; IF: 'IF'; TYPEOF: 'TYPEOF'; +//OTHER CONDITIONAL EXPRESSIONS +COALESCE: 'COALESCE'; + // RELEVANCE FUNCTIONS AND PARAMETERS MATCH: 'MATCH'; MATCH_PHRASE: 'MATCH_PHRASE'; diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index 32b5f3f17..99a4fe9df 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -384,6 +384,7 @@ evalFunctionName | conditionFunctionBase | systemFunctionName | positionFunctionName + | coalesceFunctionName ; functionArgs @@ -660,6 +661,10 @@ positionFunctionName : POSITION ; +coalesceFunctionName + : COALESCE + ; + // operators comparisonOperator : EQUAL diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java index bcb805c25..6b549663a 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java @@ -250,7 +250,9 @@ public enum BuiltinFunctionName { MULTIMATCH(FunctionName.of("multimatch")), MULTIMATCHQUERY(FunctionName.of("multimatchquery")), WILDCARDQUERY(FunctionName.of("wildcardquery")), - WILDCARD_QUERY(FunctionName.of("wildcard_query")); + WILDCARD_QUERY(FunctionName.of("wildcard_query")), + + COALESCE(FunctionName.of("coalesce")); private FunctionName name; diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTranslator.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTranslator.java index cb2246db8..230fa1dad 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTranslator.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTranslator.java @@ -14,6 +14,7 @@ import java.util.Map; import static org.opensearch.sql.expression.function.BuiltinFunctionName.ADD; +import static org.opensearch.sql.expression.function.BuiltinFunctionName.COALESCE; import static org.opensearch.sql.expression.function.BuiltinFunctionName.SUBTRACT; import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTIPLY; import static org.opensearch.sql.expression.function.BuiltinFunctionName.DIVIDE; @@ -67,6 +68,7 @@ public interface BuiltinFunctionTranslator { .put(IS_NULL, "isnull") .put(IS_NOT_NULL, "isnotnull") .put(BuiltinFunctionName.ISPRESENT, "isnotnull") + .put(COALESCE, "coalesce") .build(); static Expression builtinFunction(org.opensearch.sql.ast.expression.Function function, List args) { diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanStringFunctionsTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanStringFunctionsTranslatorTestSuite.scala index fd6fd8866..611563add 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanStringFunctionsTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanStringFunctionsTranslatorTestSuite.scala @@ -237,4 +237,37 @@ class PPLLogicalPlanStringFunctionsTranslatorTestSuite val expectedPlan = Project(projectList, filterPlan) comparePlans(expectedPlan, logPlan, false) } + + test("test coalesce") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit(plan(pplParser, "source=t a = coalesce(b)", false), context) + + val table = UnresolvedRelation(Seq("t")) + val filterExpr = EqualTo( + UnresolvedAttribute("a"), + UnresolvedFunction("coalesce", seq(UnresolvedAttribute("b")), isDistinct = false)) + val filterPlan = Filter(filterExpr, table) + val projectList = Seq(UnresolvedStar(None)) + val expectedPlan = Project(projectList, filterPlan) + comparePlans(expectedPlan, logPlan, false) + } + + test("test coalesce two args") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit(plan(pplParser, "source=t a = coalesce(b, c)", false), context) + + val table = UnresolvedRelation(Seq("t")) + val filterExpr = EqualTo( + UnresolvedAttribute("a"), + UnresolvedFunction( + "coalesce", + seq(UnresolvedAttribute("b"), UnresolvedAttribute("c")), + isDistinct = false)) + val filterPlan = Filter(filterExpr, table) + val projectList = Seq(UnresolvedStar(None)) + val expectedPlan = Project(projectList, filterPlan) + comparePlans(expectedPlan, logPlan, false) + } }