diff --git a/docs/ppl-lang/PPL-Example-Commands.md b/docs/ppl-lang/PPL-Example-Commands.md index fbe5f6ace..0ecea4885 100644 --- a/docs/ppl-lang/PPL-Example-Commands.md +++ b/docs/ppl-lang/PPL-Example-Commands.md @@ -86,6 +86,14 @@ Assumptions: `a`, `b`, `c` are existing fields in `table` - `source = table | eval f = case(a = 0, 'zero', a = 1, 'one' else 'unknown')` - `source = table | eval f = case(a = 0, 'zero', a = 1, 'one' else concat(a, ' is an incorrect binary digit'))` +#### Fillnull +Assumptions: `a`, `b`, `c`, `d`, `e` are existing fields in `table` +- `source = table | fillnull with 0 in a` +- `source = table | fillnull with 'N/A' in a, b, c` +- `source = table | fillnull with concat(a, b) in c, d` +- `source = table | fillnull using a = 101` +- `source = table | fillnull using a = 101, b = 102` +- `source = table | fillnull using a = concat(b, c), d = 2 * pi() * e` ```sql source = table | eval e = eval status_category = diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFillnullITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFillnullITSuite.scala index 63e9149bb..4788aa23f 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFillnullITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFillnullITSuite.scala @@ -8,8 +8,8 @@ import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} -import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Literal, NamedExpression, SortOrder} -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, LogicalPlan, Project, Sort, UnaryNode} +import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Expression, Literal, SortOrder} 
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, LogicalPlan, Project, Sort} import org.apache.spark.sql.streaming.StreamTest class FlintSparkPPLFillnullITSuite @@ -38,7 +38,7 @@ class FlintSparkPPLFillnullITSuite test("test fillnull with one null replacement value and one column") { val frame = sql(s""" - | source = $testTable | fillnull value = 0 status_code + | source = $testTable | fillnull with 0 in status_code | """.stripMargin) assert(frame.columns.sameElements(Array("id", "request_path", "timestamp", "status_code"))) @@ -57,13 +57,13 @@ class FlintSparkPPLFillnullITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical - val expectedPlan = fillNullExpectedPlan(Seq(("status_code", 0))) + val expectedPlan = fillNullExpectedPlan(Seq(("status_code", Literal(0)))) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } test("test fillnull with various null replacement values and one column") { val frame = sql(s""" - | source = $testTable | fillnull fields status_code=101 + | source = $testTable | fillnull using status_code=101 | """.stripMargin) assert(frame.columns.sameElements(Array("id", "request_path", "timestamp", "status_code"))) @@ -82,13 +82,13 @@ class FlintSparkPPLFillnullITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical - val expectedPlan = fillNullExpectedPlan(Seq(("status_code", 101))) + val expectedPlan = fillNullExpectedPlan(Seq(("status_code", Literal(101)))) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } test("test fillnull with one null replacement value and two columns") { val frame = sql(s""" - | source = $testTable | fillnull value = '???' 
request_path, timestamp | fields id, request_path, timestamp + | source = $testTable | fillnull with concat('??', '?') in request_path, timestamp | fields id, request_path, timestamp | """.stripMargin) assert(frame.columns.sameElements(Array("id", "request_path", "timestamp"))) @@ -108,7 +108,13 @@ class FlintSparkPPLFillnullITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical val fillNullPlan = fillNullExpectedPlan( - Seq(("request_path", "???"), ("timestamp", "???")), + Seq( + ( + "request_path", + UnresolvedFunction("concat", Seq(Literal("??"), Literal("?")), isDistinct = false)), + ( + "timestamp", + UnresolvedFunction("concat", Seq(Literal("??"), Literal("?")), isDistinct = false))), addDefaultProject = false) val expectedPlan = Project( Seq( @@ -121,7 +127,7 @@ class FlintSparkPPLFillnullITSuite test("test fillnull with various null replacement values and two columns") { val frame = sql(s""" - | source = $testTable | fillnull fields request_path='/not_found', timestamp='*' | fields id, request_path, timestamp + | source = $testTable | fillnull using request_path=upper('/not_found'), timestamp='*' | fields id, request_path, timestamp | """.stripMargin) assert(frame.columns.sameElements(Array("id", "request_path", "timestamp"))) @@ -131,8 +137,8 @@ class FlintSparkPPLFillnullITSuite Row(1, "/home", "*"), Row(2, "/about", "2023-10-01 10:05:00"), Row(3, "/contact", "2023-10-01 10:10:00"), - Row(4, "/not_found", "2023-10-01 10:15:00"), - Row(5, "/not_found", "2023-10-01 10:20:00"), + Row(4, "/NOT_FOUND", "2023-10-01 10:15:00"), + Row(5, "/NOT_FOUND", "2023-10-01 10:20:00"), Row(6, "/home", "*")) // Compare the results implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) @@ -141,7 +147,11 @@ class FlintSparkPPLFillnullITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical val fillNullPlan = fillNullExpectedPlan( - Seq(("request_path", "/not_found"), 
("timestamp", "*")), + Seq( + ( + "request_path", + UnresolvedFunction("upper", Seq(Literal("/not_found")), isDistinct = false)), + ("timestamp", Literal("*"))), addDefaultProject = false) val expectedPlan = Project( Seq( @@ -154,7 +164,7 @@ class FlintSparkPPLFillnullITSuite test("test fillnull with one null replacement value and stats and sort command") { val frame = sql(s""" - | source = $testTable | fillnull value = 500 status_code + | source = $testTable | fillnull with 500 in status_code | | stats count(status_code) by status_code, request_path | | sort request_path, status_code | """.stripMargin) @@ -174,7 +184,8 @@ class FlintSparkPPLFillnullITSuite // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical - val fillNullPlan = fillNullExpectedPlan(Seq(("status_code", 500)), addDefaultProject = false) + val fillNullPlan = + fillNullExpectedPlan(Seq(("status_code", Literal(500))), addDefaultProject = false) val aggregateExpressions = Seq( Alias( @@ -203,7 +214,7 @@ class FlintSparkPPLFillnullITSuite test("test fillnull with various null replacement value and stats and sort command") { val frame = sql(s""" - | source = $testTable | fillnull fields status_code = 500, request_path = '/home' + | source = $testTable | fillnull using status_code = 500, request_path = '/home' | | stats count(status_code) by status_code, request_path | | sort request_path, status_code | """.stripMargin) @@ -222,7 +233,7 @@ class FlintSparkPPLFillnullITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical val fillNullPlan = fillNullExpectedPlan( - Seq(("status_code", 500), ("request_path", "/home")), + Seq(("status_code", Literal(500)), ("request_path", Literal("/home"))), addDefaultProject = false) val aggregateExpressions = Seq( @@ -252,7 +263,7 @@ class FlintSparkPPLFillnullITSuite test("test fillnull with one null replacement value and missing columns") { val ex = intercept[AnalysisException](sql(s""" - | source = $testTable | fillnull 
value = '!!!' + | source = $testTable | fillnull with '!!!' in | """.stripMargin)) assert(ex.getMessage().contains("Syntax error ")) @@ -260,14 +271,14 @@ class FlintSparkPPLFillnullITSuite test("test fillnull with various null replacement values and missing columns") { val ex = intercept[AnalysisException](sql(s""" - | source = $testTable | fillnull fields + | source = $testTable | fillnull using | """.stripMargin)) assert(ex.getMessage().contains("Syntax error ")) } private def fillNullExpectedPlan( - nullReplacements: Seq[(String, Any)], + nullReplacements: Seq[(String, Expression)], addDefaultProject: Boolean = true): LogicalPlan = { val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) val renameProjectList = UnresolvedStar(None) +: nullReplacements.map { @@ -275,7 +286,7 @@ class FlintSparkPPLFillnullITSuite Alias( UnresolvedFunction( "coalesce", - Seq(UnresolvedAttribute(nullableColumn), Literal(nullReplacement)), + Seq(UnresolvedAttribute(nullableColumn), nullReplacement), isDistinct = false), nullableColumn)() } diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 index 904024ea3..7646b3d74 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 @@ -74,6 +74,8 @@ D: 'D'; DESC: 'DESC'; DATASOURCES: 'DATASOURCES'; VALUE: 'VALUE'; +USING: 'USING'; +WITH: 'WITH'; // CLAUSE KEYWORDS SORTBY: 'SORTBY'; diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index 80551f9ec..79b2c56e7 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -191,11 +191,11 @@ fillnullCommand ; fillNullWithTheSameValue - : VALUE EQUAL nullReplacement nullableField (COMMA nullableField)* + : WITH nullReplacement IN nullableField 
(COMMA nullableField)* ; fillNullWithFieldVariousValues - : FIELDS nullableField EQUAL nullReplacement (COMMA nullableField EQUAL nullReplacement)* + : USING nullableField EQUAL nullReplacement (COMMA nullableField EQUAL nullReplacement)* ; @@ -204,7 +204,7 @@ fillnullCommand ; nullReplacement - : literalValue + : expression ; diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/FillNull.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/FillNull.java index d1bb9df66..19bfea668 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/FillNull.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/FillNull.java @@ -6,7 +6,7 @@ import org.opensearch.sql.ast.AbstractNodeVisitor; import org.opensearch.sql.ast.Node; import org.opensearch.sql.ast.expression.Field; -import org.opensearch.sql.ast.expression.Literal; +import org.opensearch.sql.ast.expression.UnresolvedExpression; import java.util.List; import java.util.Objects; @@ -21,7 +21,7 @@ public static class NullableFieldFill { @NonNull private final Field nullableFieldReference; @NonNull - private final Literal replaceNullWithMe; + private final UnresolvedExpression replaceNullWithMe; } public interface ContainNullableFieldFill { @@ -31,7 +31,7 @@ static ContainNullableFieldFill ofVariousValue(List replaceme return new VariousValueNullFill(replacements); } - static ContainNullableFieldFill ofSameValue(Literal replaceNullWithMe, List nullableFieldReferences) { + static ContainNullableFieldFill ofSameValue(UnresolvedExpression replaceNullWithMe, List nullableFieldReferences) { return new SameValueNullFill(replaceNullWithMe, nullableFieldReferences); } } @@ -40,7 +40,7 @@ private static class SameValueNullFill implements ContainNullableFieldFill { @Getter(onMethod_ = @Override) private final List nullFieldFill; - public SameValueNullFill(Literal replaceNullWithMe, List nullableFieldReferences) { + public 
SameValueNullFill(UnresolvedExpression replaceNullWithMe, List nullableFieldReferences) { Objects.requireNonNull(replaceNullWithMe, "Null replacement is required"); this.nullFieldFill = Objects.requireNonNull(nullableFieldReferences, "Nullable field reference is required") .stream() diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 70717b2a0..e6ab083ee 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -397,7 +397,7 @@ public LogicalPlan visitFillNull(FillNull fillNull, CatalystPlanContext context) List aliases = new ArrayList<>(); for(FillNull.NullableFieldFill nullableFieldFill : fillNull.getNullableFieldFills()) { Field field = nullableFieldFill.getNullableFieldReference(); - Literal replaceNullWithMe = nullableFieldFill.getReplaceNullWithMe(); + UnresolvedExpression replaceNullWithMe = nullableFieldFill.getReplaceNullWithMe(); Function coalesce = new Function("coalesce", of(field, replaceNullWithMe)); String fieldName = field.getField().toString(); Alias alias = new Alias(fieldName, coalesce); diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 0a1218324..8673b1582 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -513,7 +513,6 @@ public UnresolvedPlan visitFillnullCommand(OpenSearchPPLParser.FillnullCommandCo FillNullWithFieldVariousValuesContext variousValuesContext = ctx.fillNullWithFieldVariousValues(); if (sameValueContext != null) { - // todo consider using expression instead of Literal - Literal
replaceNullWithMe = (Literal) internalVisitExpression(sameValueContext.nullReplacement().literalValue()); + UnresolvedExpression replaceNullWithMe = internalVisitExpression(sameValueContext.nullReplacement().expression()); List fieldsToReplace = sameValueContext.nullableField() .stream() .map(this::internalVisitExpression) @@ -524,7 +524,7 @@ public UnresolvedPlan visitFillnullCommand(OpenSearchPPLParser.FillnullCommandCo List nullableFieldFills = IntStream.range(0, variousValuesContext.nullableField().size()) .mapToObj(index -> { variousValuesContext.nullableField(index); - Literal replaceNullWithMe = (Literal) internalVisitExpression(variousValuesContext.nullReplacement(index).literalValue()); + UnresolvedExpression replaceNullWithMe = internalVisitExpression(variousValuesContext.nullReplacement(index).expression()); Field nullableFieldReference = (Field) internalVisitExpression(variousValuesContext.nullableField(index)); return new NullableFieldFill(nullableFieldReference, replaceNullWithMe); }) diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFillnullCommandTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFillnullCommandTranslatorTestSuite.scala index ead17fda2..9f38465da 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFillnullCommandTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFillnullCommandTranslatorTestSuite.scala @@ -31,7 +31,7 @@ class PPLLogicalPlanFillnullCommandTranslatorTestSuite planTransformer.visit( plan( pplParser, - "source=relation | fillnull value = 'null replacement value' column_name"), + "source=relation | fillnull with 'null replacement value' in column_name"), context) val relation = UnresolvedRelation(Seq("relation")) @@ -54,13 +54,45 @@ class PPLLogicalPlanFillnullCommandTranslatorTestSuite comparePlans(expectedPlan, 
logPlan, checkAnalysis = false) } + test("test fillnull with one null replacement value, one column and function invocation") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan(pplParser, "source=relation | fillnull with upper(another_field) in column_name"), + context) + + val relation = UnresolvedRelation(Seq("relation")) + + val renameProjectList: Seq[NamedExpression] = + Seq( + UnresolvedStar(None), + Alias( + UnresolvedFunction( + "coalesce", + Seq( + UnresolvedAttribute("column_name"), + UnresolvedFunction( + "upper", + Seq(UnresolvedAttribute("another_field")), + isDistinct = false)), + isDistinct = false), + "column_name")()) + val renameProject = Project(renameProjectList, relation) + + val dropSourceColumn = + DataFrameDropColumns(Seq(UnresolvedAttribute("column_name")), renameProject) + + val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumn) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + test("test fillnull with one null replacement value and multiple column") { val context = new CatalystPlanContext val logPlan = planTransformer.visit( plan( pplParser, - "source=relation | fillnull value = 'another null replacement value' column_name_one, column_name_two, column_name_three"), + "source=relation | fillnull with 'another null replacement value' in column_name_one, column_name_two, column_name_three"), context) val relation = UnresolvedRelation(Seq("relation")) @@ -104,7 +136,7 @@ class PPLLogicalPlanFillnullCommandTranslatorTestSuite val context = new CatalystPlanContext val logPlan = planTransformer.visit( - plan(pplParser, "source=relation | fillnull fields column_name='null replacement value'"), + plan(pplParser, "source=relation | fillnull using column_name='null replacement value'"), context) val relation = UnresolvedRelation(Seq("relation")) @@ -127,13 +159,48 @@ class PPLLogicalPlanFillnullCommandTranslatorTestSuite comparePlans(expectedPlan, logPlan, checkAnalysis = false) 
} + test( + "test fillnull with possibly various null replacement value, one column and function invocation") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan( + pplParser, + "source=relation | fillnull using column_name=concat('missing value for', id)"), + context) + + val relation = UnresolvedRelation(Seq("relation")) + + val renameProjectList: Seq[NamedExpression] = + Seq( + UnresolvedStar(None), + Alias( + UnresolvedFunction( + "coalesce", + Seq( + UnresolvedAttribute("column_name"), + UnresolvedFunction( + "concat", + Seq(Literal("missing value for"), UnresolvedAttribute("id")), + isDistinct = false)), + isDistinct = false), + "column_name")()) + val renameProject = Project(renameProjectList, relation) + + val dropSourceColumn = + DataFrameDropColumns(Seq(UnresolvedAttribute("column_name")), renameProject) + + val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumn) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + test("test fillnull with possibly various null replacement value and three columns") { val context = new CatalystPlanContext val logPlan = planTransformer.visit( plan( pplParser, - "source=relation | fillnull fields column_name_1='null replacement value 1', column_name_2='null replacement value 2', column_name_3='null replacement value 3'"), + "source=relation | fillnull using column_name_1='null replacement value 1', column_name_2='null replacement value 2', column_name_3='null replacement value 3'"), context) val relation = UnresolvedRelation(Seq("relation"))