Skip to content

Commit

Permalink
Add coalesce PPL command (#609)
Browse files Browse the repository at this point in the history
* Inital coalesce implementation

Signed-off-by: Hendrik Saly <[email protected]>

* Change coalesce to use built-in function

Signed-off-by: Hendrik Saly <[email protected]>

* Cleanup

Signed-off-by: Hendrik Saly <[email protected]>

* Cleanup

Signed-off-by: Hendrik Saly <[email protected]>

* Tests related to coalesce function and documentation.

Signed-off-by: Lukasz Soszynski <[email protected]>

---------

Signed-off-by: Hendrik Saly <[email protected]>
Signed-off-by: Lukasz Soszynski <[email protected]>
Co-authored-by: Lukasz Soszynski <[email protected]>
  • Loading branch information
salyh and lukasz-soszynski-eliatra authored Sep 13, 2024
1 parent 4563bda commit c6b388a
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,36 @@ class FlintSparkPPLBuiltinFunctionITSuite
| """.stripMargin))
}

test("test coalesce function") {
val frame = sql(s"""
| source = $testNullTable | where age = 10 | eval result=coalesce(name, state, country) | fields result
| """.stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(Row("Canada"))
assert(results.sameElements(expectedResults))
}

test("test coalesce function nulls only") {
val frame = sql(s"""
| source = $testNullTable | where age = 10 | eval result=coalesce(name, state) | fields result
| """.stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(Row(null))
assert(results.sameElements(expectedResults))
}

test("test coalesce function where") {
val frame = sql(s"""
| source = $testNullTable | where isnull(coalesce(name, state))
| """.stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(Row(null, 10, null, "Canada"))
assert(results.sameElements(expectedResults))
}

// Todo
// +---------------------------------------+
// | Below tests are not supported (cast) |
Expand Down
4 changes: 3 additions & 1 deletion ppl-spark-integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ See the next samples of PPL queries :
- `source = table | where a < 1 | fields a,b,c`
- `source = table | where b != 'test' | fields a,b,c`
- `source = table | where c = 'test' | fields a,b,c | head 3`
- `source = table where ispresent(b)`
- `source = table | where ispresent(b)`
- `source = table | where isnull(coalesce(a, b)) | fields a,b,c | head 3`

**Filters With Logical Conditions**
- `source = table | where c = 'test' AND a = 1 | fields a,b,c`
Expand All @@ -261,6 +262,7 @@ Assumptions: `a`, `b`, `c` are existing fields in `table`
- `source = table | eval f = a * 2, h = f * 2 | fields a,f,h`
- `source = table | eval f = a * 2, h = b | stats avg(f) by h`
- `source = table | eval f = ispresent(a)`
- `source = table | eval r = coalesce(a, b, c) | fields r

Limitation: Overriding existing field is unsupported, following queries throw exceptions with "Reference 'a' is ambiguous"
- `source = table | eval a = 10 | fields a,b,c`
Expand Down
3 changes: 3 additions & 0 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,9 @@ NULLIF: 'NULLIF';
IF: 'IF';
TYPEOF: 'TYPEOF';

//OTHER CONDITIONAL EXPRESSIONS
COALESCE: 'COALESCE';

// RELEVANCE FUNCTIONS AND PARAMETERS
MATCH: 'MATCH';
MATCH_PHRASE: 'MATCH_PHRASE';
Expand Down
5 changes: 5 additions & 0 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ evalFunctionName
| conditionFunctionBase
| systemFunctionName
| positionFunctionName
| coalesceFunctionName
;

functionArgs
Expand Down Expand Up @@ -660,6 +661,10 @@ positionFunctionName
: POSITION
;

coalesceFunctionName
: COALESCE
;

// operators
comparisonOperator
: EQUAL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ public enum BuiltinFunctionName {
MULTIMATCH(FunctionName.of("multimatch")),
MULTIMATCHQUERY(FunctionName.of("multimatchquery")),
WILDCARDQUERY(FunctionName.of("wildcardquery")),
WILDCARD_QUERY(FunctionName.of("wildcard_query"));
WILDCARD_QUERY(FunctionName.of("wildcard_query")),

COALESCE(FunctionName.of("coalesce"));

private FunctionName name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Map;

import static org.opensearch.sql.expression.function.BuiltinFunctionName.ADD;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.COALESCE;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.SUBTRACT;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTIPLY;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.DIVIDE;
Expand Down Expand Up @@ -67,6 +68,7 @@ public interface BuiltinFunctionTranslator {
.put(IS_NULL, "isnull")
.put(IS_NOT_NULL, "isnotnull")
.put(BuiltinFunctionName.ISPRESENT, "isnotnull")
.put(COALESCE, "coalesce")
.build();

static Expression builtinFunction(org.opensearch.sql.ast.expression.Function function, List<Expression> args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,37 @@ class PPLLogicalPlanStringFunctionsTranslatorTestSuite
val expectedPlan = Project(projectList, filterPlan)
comparePlans(expectedPlan, logPlan, false)
}

test("test coalesce") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(plan(pplParser, "source=t a = coalesce(b)", false), context)

val table = UnresolvedRelation(Seq("t"))
val filterExpr = EqualTo(
UnresolvedAttribute("a"),
UnresolvedFunction("coalesce", seq(UnresolvedAttribute("b")), isDistinct = false))
val filterPlan = Filter(filterExpr, table)
val projectList = Seq(UnresolvedStar(None))
val expectedPlan = Project(projectList, filterPlan)
comparePlans(expectedPlan, logPlan, false)
}

test("test coalesce two args") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(plan(pplParser, "source=t a = coalesce(b, c)", false), context)

val table = UnresolvedRelation(Seq("t"))
val filterExpr = EqualTo(
UnresolvedAttribute("a"),
UnresolvedFunction(
"coalesce",
seq(UnresolvedAttribute("b"), UnresolvedAttribute("c")),
isDistinct = false))
val filterPlan = Filter(filterExpr, table)
val projectList = Seq(UnresolvedStar(None))
val expectedPlan = Project(projectList, filterPlan)
comparePlans(expectedPlan, logPlan, false)
}
}

0 comments on commit c6b388a

Please sign in to comment.