Translate PPL dedup Command Part 1: allowedDuplication=1 (#521)
* Translate PPL Dedup Command: only one duplication allowed

Signed-off-by: Lantao Jin <[email protected]>

* add document

Signed-off-by: Lantao Jin <[email protected]>

---------

Signed-off-by: Lantao Jin <[email protected]>
Signed-off-by: YANGDB <[email protected]>
Co-authored-by: YANGDB <[email protected]>
LantaoJin and YANG-DB authored Aug 8, 2024
1 parent d6e71fa commit 7c4244f
Showing 6 changed files with 752 additions and 2 deletions.
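
In plan terms, `dedup 1 k1, k2 ...` becomes a `Deduplicate` over the rows whose dedup keys are all non-null; with `KEEPEMPTY=true`, rows containing a null key skip deduplication and are unioned back in unchanged. The tests below pin down exactly that plan shape. A minimal Catalyst sketch of the mapping (the helper name and signature here are illustrative, not this commit's actual API):

    import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
    import org.apache.spark.sql.catalyst.expressions.{And, Expression, IsNotNull, IsNull, Or}
    import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Filter, LogicalPlan, Union}

    // Hypothetical helper sketching the translation verified by the tests below.
    object DedupPlanSketch {
      def dedupOne(keyNames: Seq[String], keepEmpty: Boolean, child: LogicalPlan): LogicalPlan = {
        val keys = keyNames.map(UnresolvedAttribute(_))
        // Keep one row per key combination, considering only rows whose keys are all non-null.
        val notNullChecks: Seq[Expression] = keys.map(key => IsNotNull(key))
        val deduped = Deduplicate(keys, Filter(notNullChecks.reduce(And(_, _)), child))
        if (keepEmpty) {
          // KEEPEMPTY=true: rows with any null key bypass deduplication and are unioned back.
          val nullChecks: Seq[Expression] = keys.map(key => IsNull(key))
          Union(deduped, Filter(nullChecks.reduce(Or(_, _)), child))
        } else {
          deduped
        }
      }
    }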
@@ -272,6 +272,40 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
| """.stripMargin)
}

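  /**
   * Seeds a table with duplicate and null values in `name` and `category`, exercising both the
   * dedup key matching and the KEEPEMPTY code paths.
   */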
  protected def createDuplicationNullableTable(testTable: String): Unit = {
    sql(s"""
         | CREATE TABLE $testTable
         | (
         | id INT,
         | name STRING,
         | category STRING
         | )
         | USING $tableType $tableOptions
         |""".stripMargin)

    sql(s"""
         | INSERT INTO $testTable
         | VALUES (1, "A", "X"),
         | (2, "A", "Y"),
         | (3, "A", "Y"),
         | (4, "B", "Z"),
         | (5, "B", "Z"),
         | (6, "B", "Z"),
         | (7, "C", "X"),
         | (8, null, "Y"),
         | (9, "D", "Z"),
         | (10, "E", null),
         | (11, "A", "X"),
         | (12, "A", "Y"),
         | (13, null, "X"),
         | (14, "B", null),
         | (15, "B", "Y"),
         | (16, null, "Z"),
         | (17, "C", "X"),
         | (18, null, null)
         | """.stripMargin)
  }

  protected def createTimeSeriesTable(testTable: String): Unit = {
    sql(s"""
         | CREATE TABLE $testTable
@@ -0,0 +1,310 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{And, IsNotNull, IsNull, Or}
import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Filter, LogicalPlan, Project, Union}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLDedupITSuite
    extends QueryTest
    with LogicalPlanTestUtils
    with FlintPPLSuite
    with StreamTest {

  /** Test table name */
  private val testTable = "spark_catalog.default.flint_ppl_test"

  override def beforeAll(): Unit = {
    super.beforeAll()

    // Create test table
    createDuplicationNullableTable(testTable)
  }

  protected override def afterEach(): Unit = {
    super.afterEach()
    // Stop all streaming jobs if any
    spark.streams.active.foreach { job =>
      job.stop()
      job.awaitTermination()
    }
  }

test("test dedupe 1 name") {
val frame = sql(s"""
| source = $testTable | dedup 1 name | fields name
| """.stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(Row("A"), Row("B"), Row("C"), Row("D"), Row("E"))
implicit val oneColRowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val fieldsProjectList = Seq(UnresolvedAttribute("name"))
val dedupKeys = Seq(UnresolvedAttribute("name"))
val filter = Filter(IsNotNull(UnresolvedAttribute("name")), table)
val expectedPlan = Project(fieldsProjectList, Deduplicate(dedupKeys, filter))
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test dedupe 1 name, category") {
val frame = sql(s"""
| source = $testTable | dedup 1 name, category | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "Y"),
Row("B", "Z"),
Row("C", "X"),
Row("D", "Z"),
Row("B", "Y"))
implicit val twoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => (row.getAs(0), row.getAs(1)))
assert(results.sorted.sameElements(expectedResults.sorted))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val fieldsProjectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category"))
val dedupKeys = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category"))
val filter = Filter(
And(IsNotNull(UnresolvedAttribute("name")), IsNotNull(UnresolvedAttribute("category"))),
table)
val expectedPlan = Project(fieldsProjectList, Deduplicate(dedupKeys, filter))
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test dedupe 1 name KEEPEMPTY=true") {
val frame = sql(s"""
| source = $testTable | dedup 1 name KEEPEMPTY=true | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("B", "Z"),
Row("C", "X"),
Row("D", "Z"),
Row("E", null),
Row(null, "Y"),
Row(null, "X"),
Row(null, "Z"),
Row(null, null))
implicit val nullableTwoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => {
val value0 = row.getAs[String](0)
val value1 = row.getAs[String](1)
(
if (value0 == null) String.valueOf(Int.MaxValue) else value0,
if (value1 == null) String.valueOf(Int.MaxValue) else value1)
})
assert(
results.sorted
.map(_.getAs[String](0))
.sameElements(expectedResults.sorted.map(_.getAs[String](0))))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val fieldsProjectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val isNotNullFilter =
Filter(IsNotNull(UnresolvedAttribute("name")), table)
val deduplicate = Deduplicate(Seq(UnresolvedAttribute("name")), isNotNullFilter)
val isNullFilter = Filter(IsNull(UnresolvedAttribute("name")), table)
val union = Union(deduplicate, isNullFilter)
val expectedPlan = Project(fieldsProjectList, union)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test dedupe 1 name, category KEEPEMPTY=true") {
val frame = sql(s"""
| source = $testTable | dedup 1 name, category KEEPEMPTY=true | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "Y"),
Row("B", "Z"),
Row("C", "X"),
Row("D", "Z"),
Row("B", "Y"),
Row(null, "Y"),
Row("E", null),
Row(null, "X"),
Row("B", null),
Row(null, "Z"),
Row(null, null))
implicit val nullableTwoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => {
val value0 = row.getAs[String](0)
val value1 = row.getAs[String](1)
(
if (value0 == null) String.valueOf(Int.MaxValue) else value0,
if (value1 == null) String.valueOf(Int.MaxValue) else value1)
})
assert(results.sorted.sameElements(expectedResults.sorted))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val fieldsProjectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category"))
val isNotNullFilter = Filter(
And(IsNotNull(UnresolvedAttribute("name")), IsNotNull(UnresolvedAttribute("category"))),
table)
val deduplicate = Deduplicate(
Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category")),
isNotNullFilter)
val isNullFilter = Filter(
Or(IsNull(UnresolvedAttribute("name")), IsNull(UnresolvedAttribute("category"))),
table)
val union = Union(deduplicate, isNullFilter)
val expectedPlan = Project(fieldsProjectList, union)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test 1 name CONSECUTIVE=true") {
val ex = intercept[UnsupportedOperationException](sql(s"""
| source = $testTable | dedup 1 name CONSECUTIVE=true | fields name
| """.stripMargin))
assert(ex.getMessage.contains("Consecutive deduplication is not supported"))
}

test("test 1 name KEEPEMPTY=true CONSECUTIVE=true") {
val ex = intercept[UnsupportedOperationException](sql(s"""
| source = $testTable | dedup 1 name KEEPEMPTY=true CONSECUTIVE=true | fields name
| """.stripMargin))
assert(ex.getMessage.contains("Consecutive deduplication is not supported"))
}

ignore("test dedupe 2 name") {
val frame = sql(s"""
| source = $testTable| dedup 2 name | fields name
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] =
Array(Row("A"), Row("A"), Row("B"), Row("B"), Row("C"), Row("C"), Row("D"), Row("E"))
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))
}

ignore("test dedupe 2 name, category") {
val frame = sql(s"""
| source = $testTable| dedup 2 name, category | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "X"),
Row("A", "Y"),
Row("A", "Y"),
Row("B", "Y"),
Row("B", "Z"),
Row("B", "Z"),
Row("C", "X"),
Row("C", "X"),
Row("D", "Z"))
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](row => {
val value = row.getAs[String](0)
if (value == null) String.valueOf(Int.MaxValue) else value
})
assert(results.sorted.sameElements(expectedResults.sorted))
}

ignore("test dedupe 2 name KEEPEMPTY=true") {
val frame = sql(s"""
| source = $testTable| dedup 2 name KEEPEMPTY=true | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "Y"),
Row("B", "Z"),
Row("B", "Z"),
Row("C", "X"),
Row("C", "X"),
Row("D", "Z"),
Row("E", null),
Row(null, "Y"),
Row(null, "X"),
Row(null, "Z"),
Row(null, null))
implicit val nullableTwoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => {
val value0 = row.getAs[String](0)
val value1 = row.getAs[String](1)
(
if (value0 == null) String.valueOf(Int.MaxValue) else value0,
if (value1 == null) String.valueOf(Int.MaxValue) else value1)
})
assert(
results.sorted
.map(_.getAs[String](0))
.sameElements(expectedResults.sorted.map(_.getAs[String](0))))
}

ignore("test dedupe 2 name, category KEEPEMPTY=true") {
val frame = sql(s"""
| source = $testTable| dedup 2 name, category KEEPEMPTY=true | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "X"),
Row("A", "Y"),
Row("A", "Y"),
Row("B", "Y"),
Row("B", "Z"),
Row("B", "Z"),
Row("C", "X"),
Row("C", "X"),
Row("D", "Z"),
Row(null, "Y"),
Row("E", null),
Row(null, "X"),
Row("B", null),
Row(null, "Z"),
Row(null, null))
implicit val nullableTwoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => {
val value0 = row.getAs[String](0)
val value1 = row.getAs[String](1)
(
if (value0 == null) String.valueOf(Int.MaxValue) else value0,
if (value1 == null) String.valueOf(Int.MaxValue) else value1)
})
assert(results.sorted.sameElements(expectedResults.sorted))
}

test("test 2 name CONSECUTIVE=true") {
val ex = intercept[UnsupportedOperationException](sql(s"""
| source = $testTable | dedup 2 name CONSECUTIVE=true | fields name
| """.stripMargin))
assert(ex.getMessage.contains("Consecutive deduplication is not supported"))
}

test("test 2 name KEEPEMPTY=true CONSECUTIVE=true") {
val ex = intercept[UnsupportedOperationException](sql(s"""
| source = $testTable | dedup 2 name KEEPEMPTY=true CONSECUTIVE=true | fields name
| """.stripMargin))
assert(ex.getMessage.contains("Consecutive deduplication is not supported"))
}
}
17 changes: 16 additions & 1 deletion ppl-spark-integration/README.md
@@ -272,10 +272,24 @@ Limitation: Overriding existing field is unsupported, following queries throw ex
- `source = table | stats sum(productsAmount) by span(transactionDate, 1d) as age_date | sort age_date`
- `source = table | stats sum(productsAmount) by span(transactionDate, 1w) as age_date, productId`

---
**Dedup**

- `source = table | dedup a | fields a,b,c`
- `source = table | dedup a,b | fields a,b,c`
- `source = table | dedup a keepempty=true | fields a,b,c`
- `source = table | dedup a,b keepempty=true | fields a,b,c`
- `source = table | dedup 1 a | fields a,b,c`
- `source = table | dedup 1 a,b | fields a,b,c`
- `source = table | dedup 1 a keepempty=true | fields a,b,c`
- `source = table | dedup 1 a,b keepempty=true | fields a,b,c`
- `source = table | dedup 1 a consecutive=true | fields a,b,c` (Unsupported)
- `source = table | dedup 2 a | fields a,b,c` (Unsupported)


For additional details on PPL commands, see the [PPL Commands Docs](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/index.rst)

---

For additional details on the Spark PPL commands project, see [PPL Project](https://github.com/orgs/opensearch-project/projects/214/views/2)
For additional details on the Spark PPL commands support campaign, see [PPL Commands Campaign](https://github.com/opensearch-project/opensearch-spark/issues/408)

@@ -284,3 +298,4 @@ For additional details on Spark PPL commands support campaign, see [PPL Commands

> This is an experimental command; it may be removed in future versions

@@ -35,6 +35,7 @@ commands
   | correlateCommand
   | fieldsCommand
   | statsCommand
   | dedupCommand
   | sortCommand
   | headCommand
   | evalCommand