From 7c4244f0b0341b9b5270b009d1a6d562d6f05cf6 Mon Sep 17 00:00:00 2001
From: Lantao Jin
Date: Fri, 9 Aug 2024 06:00:13 +0800
Subject: [PATCH] Translate PPL `dedup` Command Part 1: allowedDuplication=1
 (#521)

* Translate PPL Dedup Command: only one duplication allowed

Signed-off-by: Lantao Jin

* add document

Signed-off-by: Lantao Jin

---------

Signed-off-by: Lantao Jin
Signed-off-by: YANGDB
Co-authored-by: YANGDB
---
 .../flint/spark/FlintSparkSuite.scala         |  34 ++
 .../spark/ppl/FlintSparkPPLDedupITSuite.scala | 310 ++++++++++++++++++
 ppl-spark-integration/README.md               |  17 +-
 .../src/main/antlr4/OpenSearchPPLParser.g4    |   1 +
 .../sql/ppl/CatalystQueryPlanVisitor.java     | 102 +++++-
 ...LLogicalPlanDedupTranslatorTestSuite.scala | 290 ++++++++++++++++
 6 files changed, 752 insertions(+), 2 deletions(-)
 create mode 100644 integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLDedupITSuite.scala
 create mode 100644 ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanDedupTranslatorTestSuite.scala

diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
index ea9945fce..8766a8cb0 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
@@ -272,6 +272,40 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
        | """.stripMargin)
   }
 
+  protected def createDuplicationNullableTable(testTable: String): Unit = {
+    sql(s"""
+         | CREATE TABLE $testTable
+         | (
+         |   id INT,
+         |   name STRING,
+         |   category STRING
+         | )
+         | USING $tableType $tableOptions
+         |""".stripMargin)
+
+    sql(s"""
+         | INSERT INTO $testTable
+         | VALUES (1, "A", "X"),
+         |        (2, "A", "Y"),
+         |        (3, "A", "Y"),
+         |        (4, "B", "Z"),
+         |        (5, "B", "Z"),
+         |        (6, "B", "Z"),
+         |        (7, "C", "X"),
+         |        (8, null, "Y"),
+         |        (9, "D", "Z"),
+         |        (10, "E", null),
+         |        (11, "A", "X"),
+         |        (12, "A", "Y"),
+         |        (13, null, "X"),
+         |        (14, "B", null),
+         |        (15, "B", "Y"),
+         |        (16, null, "Z"),
+         |        (17, "C", "X"),
+         |        (18, null, null)
+         | """.stripMargin)
+  }
+
   protected def createTimeSeriesTable(testTable: String): Unit = {
     sql(s"""
          | CREATE TABLE $testTable
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLDedupITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLDedupITSuite.scala
new file mode 100644
index 000000000..06c90527d
--- /dev/null
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLDedupITSuite.scala
@@ -0,0 +1,310 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.ppl
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, IsNotNull, IsNull, Or}
+import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Filter, LogicalPlan, Project, Union}
+import org.apache.spark.sql.streaming.StreamTest
+
+class FlintSparkPPLDedupITSuite
+    extends QueryTest
+    with LogicalPlanTestUtils
+    with FlintPPLSuite
+    with StreamTest {
+
+  /** Test table and index name */
+  private val testTable = "spark_catalog.default.flint_ppl_test"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    // Create test table
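+    // The fixture mixes duplicated (name, category) combinations with null names
+    // and null categories, so both plain dedup and the KEEPEMPTY=true branch
+    // (which unions the null rows back in) are exercised.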
createDuplicationNullableTable(testTable) + } + + protected override def afterEach(): Unit = { + super.afterEach() + // Stop all streaming jobs if any + spark.streams.active.foreach { job => + job.stop() + job.awaitTermination() + } + } + + test("test dedupe 1 name") { + val frame = sql(s""" + | source = $testTable | dedup 1 name | fields name + | """.stripMargin) + + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = Array(Row("A"), Row("B"), Row("C"), Row("D"), Row("E")) + implicit val oneColRowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val fieldsProjectList = Seq(UnresolvedAttribute("name")) + val dedupKeys = Seq(UnresolvedAttribute("name")) + val filter = Filter(IsNotNull(UnresolvedAttribute("name")), table) + val expectedPlan = Project(fieldsProjectList, Deduplicate(dedupKeys, filter)) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("test dedupe 1 name, category") { + val frame = sql(s""" + | source = $testTable | dedup 1 name, category | fields name, category + | """.stripMargin) + + val results: Array[Row] = frame.collect() + // results.foreach(println(_)) + val expectedResults: Array[Row] = Array( + Row("A", "X"), + Row("A", "Y"), + Row("B", "Z"), + Row("C", "X"), + Row("D", "Z"), + Row("B", "Y")) + implicit val twoColsRowOrdering: Ordering[Row] = + Ordering.by[Row, (String, String)](row => (row.getAs(0), row.getAs(1))) + assert(results.sorted.sameElements(expectedResults.sorted)) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val fieldsProjectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category")) + val dedupKeys = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category")) + val filter = Filter( + And(IsNotNull(UnresolvedAttribute("name")), IsNotNull(UnresolvedAttribute("category"))), + table) + val expectedPlan = Project(fieldsProjectList, Deduplicate(dedupKeys, filter)) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("test dedupe 1 name KEEPEMPTY=true") { + val frame = sql(s""" + | source = $testTable | dedup 1 name KEEPEMPTY=true | fields name, category + | """.stripMargin) + + val results: Array[Row] = frame.collect() + // results.foreach(println(_)) + val expectedResults: Array[Row] = Array( + Row("A", "X"), + Row("B", "Z"), + Row("C", "X"), + Row("D", "Z"), + Row("E", null), + Row(null, "Y"), + Row(null, "X"), + Row(null, "Z"), + Row(null, null)) + implicit val nullableTwoColsRowOrdering: Ordering[Row] = + Ordering.by[Row, (String, String)](row => { + val value0 = row.getAs[String](0) + val value1 = row.getAs[String](1) + ( + if (value0 == null) String.valueOf(Int.MaxValue) else value0, + if (value1 == null) String.valueOf(Int.MaxValue) else value1) + }) + assert( + results.sorted + .map(_.getAs[String](0)) + .sameElements(expectedResults.sorted.map(_.getAs[String](0)))) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val fieldsProjectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category")) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val isNotNullFilter = + Filter(IsNotNull(UnresolvedAttribute("name")), table) + val deduplicate = 
Deduplicate(Seq(UnresolvedAttribute("name")), isNotNullFilter) + val isNullFilter = Filter(IsNull(UnresolvedAttribute("name")), table) + val union = Union(deduplicate, isNullFilter) + val expectedPlan = Project(fieldsProjectList, union) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("test dedupe 1 name, category KEEPEMPTY=true") { + val frame = sql(s""" + | source = $testTable | dedup 1 name, category KEEPEMPTY=true | fields name, category + | """.stripMargin) + + val results: Array[Row] = frame.collect() + // results.foreach(println(_)) + val expectedResults: Array[Row] = Array( + Row("A", "X"), + Row("A", "Y"), + Row("B", "Z"), + Row("C", "X"), + Row("D", "Z"), + Row("B", "Y"), + Row(null, "Y"), + Row("E", null), + Row(null, "X"), + Row("B", null), + Row(null, "Z"), + Row(null, null)) + implicit val nullableTwoColsRowOrdering: Ordering[Row] = + Ordering.by[Row, (String, String)](row => { + val value0 = row.getAs[String](0) + val value1 = row.getAs[String](1) + ( + if (value0 == null) String.valueOf(Int.MaxValue) else value0, + if (value1 == null) String.valueOf(Int.MaxValue) else value1) + }) + assert(results.sorted.sameElements(expectedResults.sorted)) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val fieldsProjectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category")) + val isNotNullFilter = Filter( + And(IsNotNull(UnresolvedAttribute("name")), IsNotNull(UnresolvedAttribute("category"))), + table) + val deduplicate = Deduplicate( + Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category")), + isNotNullFilter) + val isNullFilter = Filter( + Or(IsNull(UnresolvedAttribute("name")), IsNull(UnresolvedAttribute("category"))), + table) + val union = Union(deduplicate, isNullFilter) + val expectedPlan = Project(fieldsProjectList, union) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("test 1 name CONSECUTIVE=true") { + val ex = intercept[UnsupportedOperationException](sql(s""" + | source = $testTable | dedup 1 name CONSECUTIVE=true | fields name + | """.stripMargin)) + assert(ex.getMessage.contains("Consecutive deduplication is not supported")) + } + + test("test 1 name KEEPEMPTY=true CONSECUTIVE=true") { + val ex = intercept[UnsupportedOperationException](sql(s""" + | source = $testTable | dedup 1 name KEEPEMPTY=true CONSECUTIVE=true | fields name + | """.stripMargin)) + assert(ex.getMessage.contains("Consecutive deduplication is not supported")) + } + + ignore("test dedupe 2 name") { + val frame = sql(s""" + | source = $testTable| dedup 2 name | fields name + | """.stripMargin) + + val results: Array[Row] = frame.collect() + // results.foreach(println(_)) + val expectedResults: Array[Row] = + Array(Row("A"), Row("A"), Row("B"), Row("B"), Row("C"), Row("C"), Row("D"), Row("E")) + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + } + + ignore("test dedupe 2 name, category") { + val frame = sql(s""" + | source = $testTable| dedup 2 name, category | fields name, category + | """.stripMargin) + + val results: Array[Row] = frame.collect() + // results.foreach(println(_)) + val expectedResults: Array[Row] = Array( + Row("A", "X"), + Row("A", "X"), + Row("A", "Y"), + Row("A", "Y"), + Row("B", "Y"), + Row("B", "Z"), + Row("B", "Z"), + Row("C", "X"), + Row("C", "X"), + Row("D", "Z")) + implicit val rowOrdering: 
Ordering[Row] =
+      Ordering.by[Row, (String, String)](row => {
+        val value0 = row.getAs[String](0)
+        val value1 = row.getAs[String](1)
+        (
+          if (value0 == null) String.valueOf(Int.MaxValue) else value0,
+          if (value1 == null) String.valueOf(Int.MaxValue) else value1)
+      })
+    assert(results.sorted.sameElements(expectedResults.sorted))
+  }
+
+  ignore("test dedupe 2 name KEEPEMPTY=true") {
+    val frame = sql(s"""
+         | source = $testTable| dedup 2 name KEEPEMPTY=true | fields name, category
+         | """.stripMargin)
+
+    val results: Array[Row] = frame.collect()
+    // results.foreach(println(_))
+    val expectedResults: Array[Row] = Array(
+      Row("A", "X"),
+      Row("A", "Y"),
+      Row("B", "Z"),
+      Row("B", "Z"),
+      Row("C", "X"),
+      Row("C", "X"),
+      Row("D", "Z"),
+      Row("E", null),
+      Row(null, "Y"),
+      Row(null, "X"),
+      Row(null, "Z"),
+      Row(null, null))
+    implicit val nullableTwoColsRowOrdering: Ordering[Row] =
+      Ordering.by[Row, (String, String)](row => {
+        val value0 = row.getAs[String](0)
+        val value1 = row.getAs[String](1)
+        (
+          if (value0 == null) String.valueOf(Int.MaxValue) else value0,
+          if (value1 == null) String.valueOf(Int.MaxValue) else value1)
+      })
+    assert(
+      results.sorted
+        .map(_.getAs[String](0))
+        .sameElements(expectedResults.sorted.map(_.getAs[String](0))))
+  }
+
+  ignore("test dedupe 2 name, category KEEPEMPTY=true") {
+    val frame = sql(s"""
+         | source = $testTable| dedup 2 name, category KEEPEMPTY=true | fields name, category
+         | """.stripMargin)
+
+    val results: Array[Row] = frame.collect()
+    // results.foreach(println(_))
+    val expectedResults: Array[Row] = Array(
+      Row("A", "X"),
+      Row("A", "X"),
+      Row("A", "Y"),
+      Row("A", "Y"),
+      Row("B", "Y"),
+      Row("B", "Z"),
+      Row("B", "Z"),
+      Row("C", "X"),
+      Row("C", "X"),
+      Row("D", "Z"),
+      Row(null, "Y"),
+      Row("E", null),
+      Row(null, "X"),
+      Row("B", null),
+      Row(null, "Z"),
+      Row(null, null))
+    implicit val nullableTwoColsRowOrdering: Ordering[Row] =
+      Ordering.by[Row, (String, String)](row => {
+        val value0 = row.getAs[String](0)
+        val value1 = row.getAs[String](1)
+        (
+          if (value0 == null) String.valueOf(Int.MaxValue) else value0,
+          if (value1 == null) String.valueOf(Int.MaxValue) else value1)
+      })
+    assert(results.sorted.sameElements(expectedResults.sorted))
+  }
+
+  test("test 2 name CONSECUTIVE=true") {
+    val ex = intercept[UnsupportedOperationException](sql(s"""
+         | source = $testTable | dedup 2 name CONSECUTIVE=true | fields name
+         | """.stripMargin))
+    assert(ex.getMessage.contains("Consecutive deduplication is not supported"))
+  }
+
+  test("test 2 name KEEPEMPTY=true CONSECUTIVE=true") {
+    val ex = intercept[UnsupportedOperationException](sql(s"""
+         | source = $testTable | dedup 2 name KEEPEMPTY=true CONSECUTIVE=true | fields name
+         | """.stripMargin))
+    assert(ex.getMessage.contains("Consecutive deduplication is not supported"))
+  }
+}
diff --git a/ppl-spark-integration/README.md b/ppl-spark-integration/README.md
index 2a17fe19f..6b8f2ac5c 100644
--- a/ppl-spark-integration/README.md
+++ b/ppl-spark-integration/README.md
@@ -272,10 +272,24 @@ Limitation: Overriding existing field is unsupported, following queries throw ex
 - `source = table | stats sum(productsAmount) by span(transactionDate, 1d) as age_date | sort age_date`
 - `source = table | stats sum(productsAmount) by span(transactionDate, 1w) as age_date, productId`
 
----
+**Dedup**
+
+- `source = table | dedup a | fields a,b,c`
+- `source = table | dedup a,b | fields a,b,c`
+- `source = table | dedup a keepempty=true | fields a,b,c`
+- `source = table | dedup a,b keepempty=true | fields a,b,c`
+- `source = table | dedup 1 a | fields a,b,c`
+- `source = table | dedup 1 a,b | fields a,b,c`
+- `source = table | dedup 1 a keepempty=true | fields a,b,c`
+- `source = table | dedup 1 a,b keepempty=true | fields a,b,c`
+- `source = table | dedup 1 a consecutive=true | fields a,b,c` (Unsupported)
+- `source = table | dedup 2 a | fields a,b,c` (Unsupported)
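+
+As a rough sketch of the translation introduced by this change (mirroring the plan-tree comments in `CatalystQueryPlanVisitor` and the expected plans in its test suites), a query such as `source = table | dedup 1 a, b keepempty=true | fields a, b` is translated into a Catalyst logical plan of the following shape:
+
+```
+'Project ['a, 'b]
++- 'Union
+   :- 'Deduplicate ['a, 'b]
+   :  +- 'Filter (isnotnull('a) AND isnotnull('b))
+   :     +- 'UnresolvedRelation [table]
+   +- 'Filter (isnull('a) OR isnull('b))
+      +- 'UnresolvedRelation [table]
+```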
+
 For additional details on PPL commands - view [PPL Commands Docs](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/index.rst)
 
+---
+
 For additional details on Spark PPL commands project, see [PPL Project](https://github.com/orgs/opensearch-project/projects/214/views/2)
 
 For additional details on Spark PPL commands support campaign, see [PPL Commands Campaign](https://github.com/opensearch-project/opensearch-spark/issues/408)
@@ -284,3 +298,4 @@ For additional details on Spark PPL commands support campaign, see [PPL Commands
 
 > This is an experimental command - it may be removed in future versions
+
\ No newline at end of file
diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
index 765e54d93..19d480327 100644
--- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
+++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
@@ -35,6 +35,7 @@ commands
    | correlateCommand
    | fieldsCommand
    | statsCommand
+   | dedupCommand
    | sortCommand
    | headCommand
    | evalCommand
diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java
index 5910efd39..812cbea82 100644
--- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java
@@ -14,9 +14,11 @@
 import org.apache.spark.sql.catalyst.expressions.Predicate;
 import org.apache.spark.sql.catalyst.expressions.SortOrder;
 import org.apache.spark.sql.catalyst.plans.logical.Aggregate;
+import org.apache.spark.sql.catalyst.plans.logical.Deduplicate;
 import org.apache.spark.sql.catalyst.plans.logical.DescribeRelation$;
 import org.apache.spark.sql.catalyst.plans.logical.Limit;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.plans.logical.Union;
 import org.apache.spark.sql.execution.command.DescribeTableCommand;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -296,7 +298,105 @@ public LogicalPlan visitWindowFunction(WindowFunction node, CatalystPlanContext
 
     @Override
     public LogicalPlan visitDedupe(Dedupe node, CatalystPlanContext context) {
-        throw new IllegalStateException("Not Supported operation : dedupe ");
+        node.getChild().get(0).accept(this, context);
+        List<Argument> options = node.getOptions();
+        Integer allowedDuplication = (Integer) options.get(0).getValue().getValue();
+        Boolean keepEmpty = (Boolean) options.get(1).getValue().getValue();
+        Boolean consecutive = (Boolean) options.get(2).getValue().getValue();
+        if (allowedDuplication <= 0) {
+            throw new IllegalArgumentException("Number of duplicate events must be greater than 0");
+        }
+        if (consecutive) {
+            // Spark is not able to remove only consecutive events
+            throw new UnsupportedOperationException("Consecutive deduplication is not supported");
+        }
+        visitFieldList(node.getFields(), context);
+        // Columns to deduplicate
+        Seq<org.apache.spark.sql.catalyst.expressions.Attribute> dedupFields
+            = context.retainAllNamedParseExpressions(e -> (org.apache.spark.sql.catalyst.expressions.Attribute) e);
+        // Although we could also use the Window operator to translate this, as the
+        // allowedDuplication > 1 case will be, adding a Deduplicate operator
+        // (optimized by Spark into an Aggregate) achieves better performance.
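+        // For reference only, a Window-based translation of `dedup n a, b` would
+        // take roughly this shape, where 'rn is a hypothetical helper column:
+        //   Filter ('rn <= n)
+        //   +- Window ['row_number() over (partition by 'a, 'b ...) AS rn]
+        //      +- UnresolvedRelation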
+        if (allowedDuplication == 1) {
+            if (keepEmpty) {
+                // Union
+                // :- Deduplicate ['a, 'b]
+                // :  +- Filter (isnotnull('a) AND isnotnull('b))
+                // :     +- Project
+                // :        +- UnresolvedRelation
+                // +- Filter (isnull('a) OR isnull('b))
+                //    +- Project
+                //       +- UnresolvedRelation
+
+                context.apply(p -> {
+                    Expression isNullExpr = buildIsNullFilterExpression(node, context);
+                    LogicalPlan right = new org.apache.spark.sql.catalyst.plans.logical.Filter(isNullExpr, p);
+
+                    Expression isNotNullExpr = buildIsNotNullFilterExpression(node, context);
+                    LogicalPlan left =
+                        new Deduplicate(dedupFields,
+                            new org.apache.spark.sql.catalyst.plans.logical.Filter(isNotNullExpr, p));
+                    return new Union(seq(left, right), false, false);
+                });
+                return context.getPlan();
+            } else {
+                // Deduplicate ['a, 'b]
+                // +- Filter (isnotnull('a) AND isnotnull('b))
+                //    +- Project
+                //       +- UnresolvedRelation
+
+                Expression isNotNullExpr = buildIsNotNullFilterExpression(node, context);
+                context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Filter(isNotNullExpr, p));
+                // TODO: consider DeduplicateWithinWatermark for streaming datasets?
+                return context.apply(p -> new Deduplicate(dedupFields, p));
+            }
+        } else {
+            // TODO
+            throw new UnsupportedOperationException("Number of duplicate events greater than 1 is not supported");
+        }
+    }
+
+    private Expression buildIsNotNullFilterExpression(Dedupe node, CatalystPlanContext context) {
+        visitFieldList(node.getFields(), context);
+        Seq<Expression> isNotNullExpressions =
+            context.retainAllNamedParseExpressions(
+                org.apache.spark.sql.catalyst.expressions.IsNotNull$.MODULE$::apply);
+
+        Expression isNotNullExpr;
+        if (isNotNullExpressions.size() == 1) {
+            isNotNullExpr = isNotNullExpressions.apply(0);
+        } else {
+            isNotNullExpr = isNotNullExpressions.reduce(
+                new scala.Function2<Expression, Expression, Expression>() {
+                    @Override
+                    public Expression apply(Expression e1, Expression e2) {
+                        return new org.apache.spark.sql.catalyst.expressions.And(e1, e2);
+                    }
+                }
+            );
+        }
+        return isNotNullExpr;
+    }
+
+    private Expression buildIsNullFilterExpression(Dedupe node, CatalystPlanContext context) {
+        visitFieldList(node.getFields(), context);
+        Seq<Expression> isNullExpressions =
+            context.retainAllNamedParseExpressions(
+                org.apache.spark.sql.catalyst.expressions.IsNull$.MODULE$::apply);
+
+        Expression isNullExpr;
+        if (isNullExpressions.size() == 1) {
+            isNullExpr = isNullExpressions.apply(0);
+        } else {
+            isNullExpr = isNullExpressions.reduce(
+                new scala.Function2<Expression, Expression, Expression>() {
+                    @Override
+                    public Expression apply(Expression e1, Expression e2) {
+                        return new org.apache.spark.sql.catalyst.expressions.Or(e1, e2);
+                    }
+                }
+            );
+        }
+        return isNullExpr;
+    }
 
     /**
diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanDedupTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanDedupTranslatorTestSuite.scala
new file mode 100644
index 000000000..34cfcbd90
--- /dev/null
+++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanDedupTranslatorTestSuite.scala
@@ -0,0 +1,290 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.ppl
+
+import org.opensearch.flint.spark.ppl.PlaneUtils.plan
+import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
+import org.scalatest.matchers.should.Matchers
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} +import org.apache.spark.sql.catalyst.expressions.{And, IsNotNull, IsNull, NamedExpression, Or} +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Filter, Project, Union} + +class PPLLogicalPlanDedupTranslatorTestSuite + extends SparkFunSuite + with PlanTest + with LogicalPlanTestUtils + with Matchers { + + private val planTransformer = new CatalystQueryPlanVisitor() + private val pplParser = new PPLSyntaxParser() + + test("test dedup a") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit(plan(pplParser, "source=table | dedup a | fields a", false), context) + + val projectList: Seq[NamedExpression] = Seq(UnresolvedAttribute("a")) + val filter = Filter(IsNotNull(UnresolvedAttribute("a")), UnresolvedRelation(Seq("table"))) + val deduplicate = Deduplicate(Seq(UnresolvedAttribute("a")), filter) + val expectedPlan = Project(projectList, deduplicate) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test dedup a, b, c") { + val context = new CatalystPlanContext + val logPlan = planTransformer.visit( + plan(pplParser, "source=table | dedup a, b, c | fields a, b, c", false), + context) + + val projectList: Seq[NamedExpression] = + Seq(UnresolvedAttribute("a"), UnresolvedAttribute("b"), UnresolvedAttribute("c")) + val filter = Filter( + And( + And(IsNotNull(UnresolvedAttribute("a")), IsNotNull(UnresolvedAttribute("b"))), + IsNotNull(UnresolvedAttribute("c"))), + UnresolvedRelation(Seq("table"))) + val deduplicate = Deduplicate( + Seq(UnresolvedAttribute("a"), UnresolvedAttribute("b"), UnresolvedAttribute("c")), + filter) + val expectedPlan = Project(projectList, deduplicate) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test dedup a keepempty=true") { + val context = new CatalystPlanContext + val logPlan = planTransformer.visit( + plan(pplParser, "source=table | dedup a keepempty=true | fields a", false), + context) + + val projectList: Seq[NamedExpression] = Seq(UnresolvedAttribute("a")) + val isNotNullFilter = + Filter(IsNotNull(UnresolvedAttribute("a")), UnresolvedRelation(Seq("table"))) + val deduplicate = Deduplicate(Seq(UnresolvedAttribute("a")), isNotNullFilter) + val isNullFilter = Filter(IsNull(UnresolvedAttribute("a")), UnresolvedRelation(Seq("table"))) + val union = Union(deduplicate, isNullFilter) + val expectedPlan = Project(projectList, union) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test dedup a, b, c keepempty=true") { + val context = new CatalystPlanContext + val logPlan = planTransformer.visit( + plan(pplParser, "source=table | dedup a, b, c keepempty=true | fields a, b, c", false), + context) + + val projectList: Seq[NamedExpression] = + Seq(UnresolvedAttribute("a"), UnresolvedAttribute("b"), UnresolvedAttribute("c")) + val isNotNullFilter = Filter( + And( + And(IsNotNull(UnresolvedAttribute("a")), IsNotNull(UnresolvedAttribute("b"))), + IsNotNull(UnresolvedAttribute("c"))), + UnresolvedRelation(Seq("table"))) + val deduplicate = Deduplicate( + Seq(UnresolvedAttribute("a"), UnresolvedAttribute("b"), UnresolvedAttribute("c")), + isNotNullFilter) + val isNullFilter = Filter( + Or( + Or(IsNull(UnresolvedAttribute("a")), IsNull(UnresolvedAttribute("b"))), + IsNull(UnresolvedAttribute("c"))), + UnresolvedRelation(Seq("table"))) + val union = Union(deduplicate, isNullFilter) + val expectedPlan = 
Project(projectList, union) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test dedup a consecutive=true") { + val context = new CatalystPlanContext + val ex = intercept[UnsupportedOperationException] { + planTransformer.visit( + plan(pplParser, "source=table | dedup a consecutive=true | fields a", false), + context) + } + assert(ex.getMessage === "Consecutive deduplication is not supported") + } + + test("test dedup a keepempty=true consecutive=true") { + val context = new CatalystPlanContext + val ex = intercept[UnsupportedOperationException] { + planTransformer.visit( + plan( + pplParser, + "source=table | dedup a keepempty=true consecutive=true | fields a", + false), + context) + } + assert(ex.getMessage === "Consecutive deduplication is not supported") + } + + test("test dedup 1 a") { + val context = new CatalystPlanContext + val logPlan = planTransformer.visit( + plan(pplParser, "source=table | dedup 1 a | fields a", false), + context) + + val projectList: Seq[NamedExpression] = Seq(UnresolvedAttribute("a")) + val filter = Filter(IsNotNull(UnresolvedAttribute("a")), UnresolvedRelation(Seq("table"))) + val deduplicate = Deduplicate(Seq(UnresolvedAttribute("a")), filter) + val expectedPlan = Project(projectList, deduplicate) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test dedup 1 a, b, c") { + val context = new CatalystPlanContext + val logPlan = planTransformer.visit( + plan(pplParser, "source=table | dedup 1 a, b, c | fields a, b, c", false), + context) + + val projectList: Seq[NamedExpression] = + Seq(UnresolvedAttribute("a"), UnresolvedAttribute("b"), UnresolvedAttribute("c")) + val filter = Filter( + And( + And(IsNotNull(UnresolvedAttribute("a")), IsNotNull(UnresolvedAttribute("b"))), + IsNotNull(UnresolvedAttribute("c"))), + UnresolvedRelation(Seq("table"))) + val deduplicate = Deduplicate( + Seq(UnresolvedAttribute("a"), UnresolvedAttribute("b"), UnresolvedAttribute("c")), + filter) + val expectedPlan = Project(projectList, deduplicate) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test dedup 1 a keepempty=true") { + val context = new CatalystPlanContext + val logPlan = planTransformer.visit( + plan(pplParser, "source=table | dedup 1 a keepempty=true | fields a", false), + context) + + val projectList: Seq[NamedExpression] = Seq(UnresolvedAttribute("a")) + val isNotNullFilter = + Filter(IsNotNull(UnresolvedAttribute("a")), UnresolvedRelation(Seq("table"))) + val deduplicate = Deduplicate(Seq(UnresolvedAttribute("a")), isNotNullFilter) + val isNullFilter = Filter(IsNull(UnresolvedAttribute("a")), UnresolvedRelation(Seq("table"))) + val union = Union(deduplicate, isNullFilter) + val expectedPlan = Project(projectList, union) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test dedup 1 a, b, c keepempty=true") { + val context = new CatalystPlanContext + val logPlan = planTransformer.visit( + plan(pplParser, "source=table | dedup 1 a, b, c keepempty=true | fields a, b, c", false), + context) + + val projectList: Seq[NamedExpression] = + Seq(UnresolvedAttribute("a"), UnresolvedAttribute("b"), UnresolvedAttribute("c")) + val isNotNullFilter = Filter( + And( + And(IsNotNull(UnresolvedAttribute("a")), IsNotNull(UnresolvedAttribute("b"))), + IsNotNull(UnresolvedAttribute("c"))), + UnresolvedRelation(Seq("table"))) + val deduplicate = Deduplicate( + Seq(UnresolvedAttribute("a"), UnresolvedAttribute("b"), UnresolvedAttribute("c")), + isNotNullFilter) + val isNullFilter = 
Filter(
+      Or(
+        Or(IsNull(UnresolvedAttribute("a")), IsNull(UnresolvedAttribute("b"))),
+        IsNull(UnresolvedAttribute("c"))),
+      UnresolvedRelation(Seq("table")))
+    val union = Union(deduplicate, isNullFilter)
+    val expectedPlan = Project(projectList, union)
+    comparePlans(expectedPlan, logPlan, checkAnalysis = false)
+  }
+
+  test("test dedup 1 a consecutive=true") {
+    val context = new CatalystPlanContext
+    val ex = intercept[UnsupportedOperationException] {
+      planTransformer.visit(
+        plan(pplParser, "source=table | dedup 1 a consecutive=true | fields a", false),
+        context)
+    }
+    assert(ex.getMessage === "Consecutive deduplication is not supported")
+  }
+
+  test("test dedup 1 a keepempty=true consecutive=true") {
+    val context = new CatalystPlanContext
+    val ex = intercept[UnsupportedOperationException] {
+      planTransformer.visit(
+        plan(
+          pplParser,
+          "source=table | dedup 1 a keepempty=true consecutive=true | fields a",
+          false),
+        context)
+    }
+    assert(ex.getMessage === "Consecutive deduplication is not supported")
+  }
+
+  test("test dedup 0") {
+    val context = new CatalystPlanContext
+    val ex = intercept[IllegalArgumentException] {
+      planTransformer.visit(
+        plan(pplParser, "source=table | dedup 0 a | fields a", false),
+        context)
+    }
+    assert(ex.getMessage === "Number of duplicate events must be greater than 0")
+  }
+
+  // Todo
+  ignore("test dedup 2 a") {
+    val context = new CatalystPlanContext
+    val logPlan = planTransformer.visit(
+      plan(pplParser, "source=table | dedup 2 a | fields a", false),
+      context)
+
+  }
+
+  // Todo
+  ignore("test dedup 2 a, b, c") {
+    val context = new CatalystPlanContext
+    val logPlan = planTransformer.visit(
+      plan(pplParser, "source=table | dedup 2 a, b, c | fields a, b, c", false),
+      context)
+
+  }
+
+  // Todo
+  ignore("test dedup 2 a keepempty=true") {
+    val context = new CatalystPlanContext
+    val logPlan = planTransformer.visit(
+      plan(pplParser, "source=table | dedup 2 a keepempty=true | fields a", false),
+      context)
+
+  }
+
+  // Todo
+  ignore("test dedup 2 a, b, c keepempty=true") {
+    val context = new CatalystPlanContext
+    val logPlan = planTransformer.visit(
+      plan(pplParser, "source=table | dedup 2 a, b, c keepempty=true | fields a, b, c", false),
+      context)
+
+  }
+
+  test("test dedup 2 a consecutive=true") {
+    val context = new CatalystPlanContext
+    val ex = intercept[UnsupportedOperationException] {
+      planTransformer.visit(
+        plan(pplParser, "source=table | dedup 2 a consecutive=true | fields a", false),
+        context)
+    }
+    assert(ex.getMessage === "Consecutive deduplication is not supported")
+  }
+
+  test("test dedup 2 a keepempty=true consecutive=true") {
+    val context = new CatalystPlanContext
+    val ex = intercept[UnsupportedOperationException] {
+      planTransformer.visit(
+        plan(
+          pplParser,
+          "source=table | dedup 2 a keepempty=true consecutive=true | fields a",
+          false),
+        context)
+    }
+    assert(ex.getMessage === "Consecutive deduplication is not supported")
+  }
+}