diff --git a/docs/ppl-lang/README.md b/docs/ppl-lang/README.md index efbeafe91..3b51ffc8d 100644 --- a/docs/ppl-lang/README.md +++ b/docs/ppl-lang/README.md @@ -27,6 +27,8 @@ For additional examples see the next [documentation](PPL-Example-Commands.md). - [`dedup command `](ppl-dedup-command.md) - [`describe command`](PPL-Example-Commands.md/#describe) + + - [`fillnull command`](ppl-fillnull-command.md) - [`eval command`](ppl-eval-command.md) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 78abf7ff2..1ecf48d28 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -619,4 +619,27 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit | (6, 403, '/home', '2023-10-01 10:25:00') | """.stripMargin) } + + protected def createNullableTableHttpLog(testTable: String): Unit = { + sql(s""" + | CREATE TABLE $testTable + |( + | id INT, + | status_code INT, + | request_path STRING, + | timestamp STRING + |) + | USING $tableType $tableOptions + |""".stripMargin) + + sql(s""" + | INSERT INTO $testTable + | VALUES (1, 200, '/home', null), + | (2, null, '/about', '2023-10-01 10:05:00'), + | (3, null, '/contact', '2023-10-01 10:10:00'), + | (4, 301, null, '2023-10-01 10:15:00'), + | (5, 200, null, '2023-10-01 10:20:00'), + | (6, 403, '/home', null) + | """.stripMargin) + } } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFillnullITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFillnullITSuite.scala new file mode 100644 index 000000000..34f70a8e0 --- /dev/null +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFillnullITSuite.scala @@ -0,0 +1,109 @@ +/* + * Copyright 
OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.flint.spark.ppl + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.streaming.StreamTest + +class FlintSparkPPLFillnullITSuite + extends QueryTest + with LogicalPlanTestUtils + with FlintPPLSuite + with StreamTest { + + private val testTable = "spark_catalog.default.flint_ppl_test" + + override def beforeAll(): Unit = { + super.beforeAll() + + // Create test table + createNullableTableHttpLog(testTable) + } + + protected override def afterEach(): Unit = { + super.afterEach() + // Stop all streaming jobs if any + spark.streams.active.foreach { job => + job.stop() + job.awaitTermination() + } + } + + test("test fillnull with one null replacement value and one column") { + val frame = sql(s""" + | source = $testTable | fillnull value = 0 status_code + | """.stripMargin) + + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = + Array( + Row(1, "/home", null, 200), + Row(2, "/about", "2023-10-01 10:05:00", 0), + Row(3, "/contact", "2023-10-01 10:10:00", 0), + Row(4, null, "2023-10-01 10:15:00", 301), + Row(5, null, "2023-10-01 10:20:00", 200), + Row(6, "/home", null, 403)) + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + } + + test("test fillnull with various null replacement values and one column") { + val frame = sql(s""" + | source = $testTable | fillnull fields status_code=101 + | """.stripMargin) + + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = + Array( + Row(1, "/home", null, 200), + Row(2, "/about", "2023-10-01 10:05:00", 101), + Row(3, "/contact", "2023-10-01 10:10:00", 101), + Row(4, null, "2023-10-01 10:15:00", 301), + Row(5, null, "2023-10-01 10:20:00", 200), + Row(6, "/home", null, 403)) + // Compare the results + implicit val rowOrdering: Ordering[Row] = 
Ordering.by[Row, Int](_.getAs[Int](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + } + + test("test fillnull with one null replacement value and two columns") { + val frame = sql(s""" + | source = $testTable | fillnull value = '???' request_path, timestamp | fields id, request_path, timestamp + | """.stripMargin) + + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = + Array( + Row(1, "/home", "???"), + Row(2, "/about", "2023-10-01 10:05:00"), + Row(3, "/contact", "2023-10-01 10:10:00"), + Row(4, "???", "2023-10-01 10:15:00"), + Row(5, "???", "2023-10-01 10:20:00"), + Row(6, "/home", "???")) + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + } + + test("test fillnull with various null replacement values and two columns") { + val frame = sql(s""" + | source = $testTable | fillnull fields request_path='/not_found', timestamp='*' | fields id, request_path, timestamp + | """.stripMargin) + + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = + Array( + Row(1, "/home", "*"), + Row(2, "/about", "2023-10-01 10:05:00"), + Row(3, "/contact", "2023-10-01 10:10:00"), + Row(4, "/not_found", "2023-10-01 10:15:00"), + Row(5, "/not_found", "2023-10-01 10:20:00"), + Row(6, "/home", "*")) + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + } +} diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 index 7af3e2109..904024ea3 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 @@ -35,6 +35,7 @@ NEW_FIELD: 'NEW_FIELD'; KMEANS: 'KMEANS'; AD: 'AD'; ML: 'ML'; +FILLNULL: 'FILLNULL'; //Native JOIN KEYWORDS JOIN: 
'JOIN'; @@ -72,6 +73,7 @@ INDEX: 'INDEX'; D: 'D'; DESC: 'DESC'; DATASOURCES: 'DATASOURCES'; +VALUE: 'VALUE'; // CLAUSE KEYWORDS SORTBY: 'SORTBY'; diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index 12ec4ed26..80551f9ec 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -51,6 +51,7 @@ commands | patternsCommand | lookupCommand | renameCommand + | fillnullCommand ; searchCommand @@ -184,6 +185,29 @@ lookupPair : inputField = fieldExpression (AS outputField = fieldExpression)? ; +fillnullCommand + : FILLNULL (fillNullWithTheSameValue + | fillNullWithFieldVariousValues) + ; + + fillNullWithTheSameValue + : VALUE EQUAL nullReplacement nullableField (COMMA nullableField)* + ; + + fillNullWithFieldVariousValues + : FIELDS nullableField EQUAL nullReplacement (COMMA nullableField EQUAL nullReplacement)* + ; + + + nullableField + : fieldExpression + ; + + nullReplacement + : literalValue + ; + + kmeansCommand : KMEANS (kmeansParameter)* ; diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index 76f9479f4..e42306965 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -56,6 +56,7 @@ import org.opensearch.sql.ast.tree.SubqueryAlias; import org.opensearch.sql.ast.tree.TableFunction; import org.opensearch.sql.ast.tree.Values; +import org.opensearch.sql.ast.tree.*; /** AST nodes visitor Defines the traverse path. 
*/ public abstract class AbstractNodeVisitor { @@ -293,4 +294,7 @@ public T visitExplain(Explain node, C context) { public T visitInSubquery(InSubquery node, C context) { return visitChildren(node, context); } + public T visitFillNull(FillNull fillNull, C context) { + return visitChildren(fillNull, context); + } } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/FillNull.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/FillNull.java new file mode 100644 index 000000000..16d15894c --- /dev/null +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/FillNull.java @@ -0,0 +1,99 @@ +package org.opensearch.sql.ast.tree; + +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.Node; +import org.opensearch.sql.ast.expression.Field; +import org.opensearch.sql.ast.expression.Literal; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class FillNull extends UnresolvedPlan { + + public static class NullableFieldFill { + private final Field nullableFieldReference; + private final Literal replaceNullWithMe; + + public NullableFieldFill(Field nullableFieldReference, Literal replaceNullWithMe) { + this.nullableFieldReference = Objects.requireNonNull(nullableFieldReference, "Field to replace is required"); + this.replaceNullWithMe = Objects.requireNonNull(replaceNullWithMe, "Null replacement is required"); + } + + public Field getNullableFieldReference() { + return nullableFieldReference; + } + + public Literal getReplaceNullWithMe() { + return replaceNullWithMe; + } + } + + private interface ContainNullableFieldFill { + Stream getNullFieldFill(); + } + + public static class SameValueNullFill implements ContainNullableFieldFill { + private final List replacements; + + public SameValueNullFill(Literal replaceNullWithMe, List nullableFieldReferences) { + Objects.requireNonNull(replaceNullWithMe, "Null
replacement is required"); + this.replacements = Objects.requireNonNull(nullableFieldReferences, "Nullable field reference is required") + .stream() + .map(nullableReference -> new NullableFieldFill(nullableReference, replaceNullWithMe)) + .collect(Collectors.toList()); + } + + @Override + public Stream getNullFieldFill() { + return replacements.stream(); + } + } + + public static class VariousValueNullFill implements ContainNullableFieldFill { + private final List replacements; + + public VariousValueNullFill(List replacements) { + this.replacements = replacements; + } + + @Override + public Stream getNullFieldFill() { + return replacements.stream(); + } + } + + private UnresolvedPlan child; + private final SameValueNullFill sameValueNullFill; + private final VariousValueNullFill variousValueNullFill; + + public FillNull(SameValueNullFill sameValueNullFill, VariousValueNullFill variousValueNullFill) { + this.sameValueNullFill = sameValueNullFill; + this.variousValueNullFill = variousValueNullFill; + } + + public List getNullableFieldFills() { + return Stream.of(sameValueNullFill, variousValueNullFill) + .filter(Objects::nonNull) + .flatMap(ContainNullableFieldFill::getNullFieldFill) + .collect(Collectors.toList()); + } + + @Override + public UnresolvedPlan attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + + return child == null ? 
List.of() : List.of(child); + } + + @Override + public T accept(AbstractNodeVisitor nodeVisitor, C context) { + return nodeVisitor.visitFillNull(this, context); + } +} diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index bd1785c85..70717b2a0 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -7,6 +7,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier; import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$; +import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction; import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$; import org.apache.spark.sql.catalyst.expressions.Ascending$; @@ -60,6 +61,7 @@ import org.opensearch.sql.ast.tree.Dedupe; import org.opensearch.sql.ast.tree.DescribeRelation; import org.opensearch.sql.ast.tree.Eval; +import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; @@ -84,6 +86,7 @@ import scala.Option; import scala.Option$; import scala.Tuple2; +import scala.collection.IterableLike; import scala.collection.Seq; import java.util.*; @@ -388,6 +391,37 @@ public LogicalPlan visitHead(Head node, CatalystPlanContext context) { node.getSize(), DataTypes.IntegerType), p)); } + @Override + public LogicalPlan visitFillNull(FillNull fillNull, CatalystPlanContext context) { + fillNull.getChild().get(0).accept(this, context); + List aliases = new ArrayList<>(); + for(FillNull.NullableFieldFill nullableFieldFill : fillNull.getNullableFieldFills()) { + Field field = nullableFieldFill.getNullableFieldReference(); + Literal replaceNullWithMe = 
nullableFieldFill.getReplaceNullWithMe(); + Function coalesce = new Function("coalesce", of(field, replaceNullWithMe)); + String fieldName = field.getField().toString(); + Alias alias = new Alias(fieldName, coalesce); + aliases.add(alias); + } + if (context.getNamedParseExpressions().isEmpty()) { + // Create an UnresolvedStar for all-fields projection + context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.>empty())); + } + // ((Alias) expressionList.get(0)).child().children().head() + List toDrop = visitExpressionList(aliases, context).stream() + .map(org.apache.spark.sql.catalyst.expressions.Alias.class::cast) + .map(org.apache.spark.sql.catalyst.expressions.Alias::child) // coalesce + .map(UnresolvedFunction.class::cast)// coalesce + .map(UnresolvedFunction::children) // Seq of coalesce arguments + .map(IterableLike::head) // first function argument which is source field + .collect(Collectors.toList()); + Seq projectExpressions = context.retainAllNamedParseExpressions(p -> (NamedExpression) p); + // build the plan with the projection step + context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Project(projectExpressions, p)); + LogicalPlan resultWithoutDuplicatedColumns = context.apply(logicalPlan -> DataFrameDropColumns$.MODULE$.apply(seq(toDrop), logicalPlan)); + return Objects.requireNonNull(resultWithoutDuplicatedColumns, "FillNull operation failed"); + } + private void visitFieldList(List fieldList, CatalystPlanContext context) { fieldList.forEach(field -> visitExpression(field, context)); } @@ -694,7 +728,10 @@ public Expression visitIsEmpty(IsEmpty node, CatalystPlanContext context) { return expression; } - + @Override + public Expression visitFillNull(FillNull fillNull, CatalystPlanContext context) { + throw new IllegalStateException("Not Supported operation : FillNull"); + } @Override public Expression visitInterval(Interval node, CatalystPlanContext context) { diff --git 
a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 8ab370c7f..67632f5e1 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -11,6 +11,8 @@ import org.antlr.v4.runtime.Token; import org.antlr.v4.runtime.tree.ParseTree; import org.opensearch.flint.spark.ppl.OpenSearchPPLParser; +import org.opensearch.flint.spark.ppl.OpenSearchPPLParser.FillNullWithFieldVariousValuesContext; +import org.opensearch.flint.spark.ppl.OpenSearchPPLParser.FillNullWithTheSameValueContext; import org.opensearch.flint.spark.ppl.OpenSearchPPLParserBaseVisitor; import org.opensearch.sql.ast.expression.AggregateFunction; import org.opensearch.sql.ast.expression.Alias; @@ -28,6 +30,11 @@ import org.opensearch.sql.ast.expression.SpanUnit; import org.opensearch.sql.ast.expression.UnresolvedArgument; import org.opensearch.sql.ast.expression.UnresolvedExpression; +import org.opensearch.sql.ast.tree.*; +import org.opensearch.sql.ast.tree.FillNull.NullableFieldFill; +import org.opensearch.sql.ast.tree.FillNull.SameValueNullFill; +import org.opensearch.sql.ast.tree.FillNull.VariousValueNullFill; +import org.opensearch.sql.common.antlr.SyntaxCheckException; import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Correlation; import org.opensearch.sql.ast.tree.Dedupe; @@ -57,6 +64,7 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -498,6 +506,36 @@ public UnresolvedPlan visitKmeansCommand(OpenSearchPPLParser.KmeansCommandContex return new Kmeans(builder.build()); } + @Override + public UnresolvedPlan 
visitFillnullCommand(OpenSearchPPLParser.FillnullCommandContext ctx) { + // ctx contain result of parsing fillnull command. Lets transform it to UnresolvedPlan which is FillNull + FillNullWithTheSameValueContext sameValueContext = ctx.fillNullWithTheSameValue(); + FillNullWithFieldVariousValuesContext variousValuesContext = ctx.fillNullWithFieldVariousValues(); + if (sameValueContext != null) { + // todo consider using expression instead of Literal + Literal replaceNullWithMe = (Literal) internalVisitExpression(sameValueContext.nullReplacement().literalValue()); + List fieldsToReplace = sameValueContext.nullableField() + .stream() + .map(this::internalVisitExpression) + .map(Field.class::cast) + .collect(Collectors.toList()); + SameValueNullFill sameValueNullFill = new SameValueNullFill(replaceNullWithMe, fieldsToReplace); + return new FillNull(sameValueNullFill, null); + } else if (variousValuesContext != null) { + List nullableFieldFills = IntStream.range(0, variousValuesContext.nullableField().size()) + .mapToObj(index -> { + variousValuesContext.nullableField(index); + Literal replaceNullWithMe = (Literal) internalVisitExpression(variousValuesContext.nullReplacement(index).literalValue()); + Field nullableFieldReference = (Field) internalVisitExpression(variousValuesContext.nullableField(index)); + return new NullableFieldFill(nullableFieldReference, replaceNullWithMe); + }) + .collect(Collectors.toList()); + return new FillNull(null, new VariousValueNullFill(nullableFieldFills)); + } else { + throw new SyntaxCheckException("Invalid fillnull command"); + } + } + /** AD command. 
*/ @Override public UnresolvedPlan visitAdCommand(OpenSearchPPLParser.AdCommandContext ctx) { diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanRenameCommandTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanRenameCommandTranslatorTestSuite.scala new file mode 100644 index 000000000..9e94581e8 --- /dev/null +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanRenameCommandTranslatorTestSuite.scala @@ -0,0 +1,173 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.ppl + +import org.opensearch.flint.spark.ppl.PlaneUtils.plan +import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} +import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} +import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, NamedExpression} +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.{DataFrameDropColumns, Project} + +class PPLLogicalPlanRenameCommandTranslatorTestSuite + extends SparkFunSuite + with PlanTest + with LogicalPlanTestUtils + with Matchers { + + private val planTransformer = new CatalystQueryPlanVisitor() + private val pplParser = new PPLSyntaxParser() + + test("test fillnull with one null replacement value and one column") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan( + pplParser, + "source=relation | fillnull value = 'null replacement value' column_name"), + context) + + val relation = UnresolvedRelation(Seq("relation")) + + val renameProjectList: Seq[NamedExpression] = + Seq( + UnresolvedStar(None), + Alias( + 
UnresolvedFunction( + "coalesce", + Seq(UnresolvedAttribute("column_name"), Literal("null replacement value")), + isDistinct = false), + "column_name")()) + val renameProject = Project(renameProjectList, relation) + + val dropSourceColumn = + DataFrameDropColumns(Seq(UnresolvedAttribute("column_name")), renameProject) + + val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumn) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test fillnull with one null replacement value and multiple column") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan( + pplParser, + "source=relation | fillnull value = 'another null replacement value' column_name_one, column_name_two, column_name_three"), + context) + + val relation = UnresolvedRelation(Seq("relation")) + + val renameProjectList: Seq[NamedExpression] = Seq( + UnresolvedStar(None), + Alias( + UnresolvedFunction( + "coalesce", + Seq(UnresolvedAttribute("column_name_one"), Literal("another null replacement value")), + isDistinct = false), + "column_name_one")(), + Alias( + UnresolvedFunction( + "coalesce", + Seq(UnresolvedAttribute("column_name_two"), Literal("another null replacement value")), + isDistinct = false), + "column_name_two")(), + Alias( + UnresolvedFunction( + "coalesce", + Seq( + UnresolvedAttribute("column_name_three"), + Literal("another null replacement value")), + isDistinct = false), + "column_name_three")()) + val renameProject = Project(renameProjectList, relation) + + val dropSourceColumn = DataFrameDropColumns( + Seq( + UnresolvedAttribute("column_name_one"), + UnresolvedAttribute("column_name_two"), + UnresolvedAttribute("column_name_three")), + renameProject) + + val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumn) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test fillnull with possibly various null replacement value and one column") { + val context = new CatalystPlanContext 
+ val logPlan = + planTransformer.visit( + plan(pplParser, "source=relation | fillnull fields column_name='null replacement value'"), + context) + + val relation = UnresolvedRelation(Seq("relation")) + + val renameProjectList: Seq[NamedExpression] = + Seq( + UnresolvedStar(None), + Alias( + UnresolvedFunction( + "coalesce", + Seq(UnresolvedAttribute("column_name"), Literal("null replacement value")), + isDistinct = false), + "column_name")()) + val renameProject = Project(renameProjectList, relation) + + val dropSourceColumn = + DataFrameDropColumns(Seq(UnresolvedAttribute("column_name")), renameProject) + + val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumn) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test fillnull with possibly various null replacement value and three columns") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan( + pplParser, + "source=relation | fillnull fields column_name_1='null replacement value 1', column_name_2='null replacement value 2', column_name_3='null replacement value 3'"), + context) + + val relation = UnresolvedRelation(Seq("relation")) + + val renameProjectList: Seq[NamedExpression] = Seq( + UnresolvedStar(None), + Alias( + UnresolvedFunction( + "coalesce", + Seq(UnresolvedAttribute("column_name_1"), Literal("null replacement value 1")), + isDistinct = false), + "column_name_1")(), + Alias( + UnresolvedFunction( + "coalesce", + Seq(UnresolvedAttribute("column_name_2"), Literal("null replacement value 2")), + isDistinct = false), + "column_name_2")(), + Alias( + UnresolvedFunction( + "coalesce", + Seq(UnresolvedAttribute("column_name_3"), Literal("null replacement value 3")), + isDistinct = false), + "column_name_3")()) + val renameProject = Project(renameProjectList, relation) + + val dropSourceColumn = DataFrameDropColumns( + Seq( + UnresolvedAttribute("column_name_1"), + UnresolvedAttribute("column_name_2"), + 
UnresolvedAttribute("column_name_3")), + renameProject) + + val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumn) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } +}