Skip to content

Commit

Permalink
Translate PPL Dedup Command: only one duplication allowd
Browse files Browse the repository at this point in the history
Signed-off-by: Lantao Jin <[email protected]>
  • Loading branch information
LantaoJin committed Aug 5, 2024
1 parent d3e54d4 commit 941155a
Show file tree
Hide file tree
Showing 5 changed files with 736 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,40 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
| """.stripMargin)
}

protected def createDuplicationNullableTable(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
| (
| id INT,
| name STRING,
| category STRING
| )
| USING $tableType $tableOptions
|""".stripMargin)

sql(s"""
| INSERT INTO $testTable
| VALUES (1, "A", "X"),
| (2, "A", "Y"),
| (3, "A", "Y"),
| (4, "B", "Z"),
| (5, "B", "Z"),
| (6, "B", "Z"),
| (7, "C", "X"),
| (8, null, "Y"),
| (9, "D", "Z"),
| (10, "E", null),
| (11, "A", "X"),
| (12, "A", "Y"),
| (13, null, "X"),
| (14, "B", null),
| (15, "B", "Y"),
| (16, null, "Z"),
| (17, "C", "X"),
| (18, null, null)
| """.stripMargin)
}

protected def createTimeSeriesTable(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{And, IsNotNull, IsNull, Or}
import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Filter, LogicalPlan, Project, Union}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLDedupITSuite
extends QueryTest
with LogicalPlanTestUtils
with FlintPPLSuite
with StreamTest {

/** Test table and index name */
private val testTable = "spark_catalog.default.flint_ppl_test"

override def beforeAll(): Unit = {
super.beforeAll()

// Create test table
createDuplicationNullableTable(testTable)
}

protected override def afterEach(): Unit = {
super.afterEach()
// Stop all streaming jobs if any
spark.streams.active.foreach { job =>
job.stop()
job.awaitTermination()
}
}

test("test dedupe 1 name") {
val frame = sql(s"""
| source = $testTable | dedup 1 name | fields name
| """.stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(Row("A"), Row("B"), Row("C"), Row("D"), Row("E"))
implicit val oneColRowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val fieldsProjectList = Seq(UnresolvedAttribute("name"))
val dedupKeys = Seq(UnresolvedAttribute("name"))
val filter = Filter(IsNotNull(UnresolvedAttribute("name")), table)
val expectedPlan = Project(fieldsProjectList, Deduplicate(dedupKeys, filter))
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test dedupe 1 name, category") {
val frame = sql(s"""
| source = $testTable | dedup 1 name, category | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "Y"),
Row("B", "Z"),
Row("C", "X"),
Row("D", "Z"),
Row("B", "Y"))
implicit val twoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => (row.getAs(0), row.getAs(1)))
assert(results.sorted.sameElements(expectedResults.sorted))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val fieldsProjectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category"))
val dedupKeys = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category"))
val filter = Filter(
And(IsNotNull(UnresolvedAttribute("name")), IsNotNull(UnresolvedAttribute("category"))),
table)
val expectedPlan = Project(fieldsProjectList, Deduplicate(dedupKeys, filter))
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test dedupe 1 name KEEPEMPTY=true") {
val frame = sql(s"""
| source = $testTable | dedup 1 name KEEPEMPTY=true | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("B", "Z"),
Row("C", "X"),
Row("D", "Z"),
Row("E", null),
Row(null, "Y"),
Row(null, "X"),
Row(null, "Z"),
Row(null, null))
implicit val nullableTwoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => {
val value0 = row.getAs[String](0)
val value1 = row.getAs[String](1)
(
if (value0 == null) String.valueOf(Int.MaxValue) else value0,
if (value1 == null) String.valueOf(Int.MaxValue) else value1)
})
assert(
results.sorted
.map(_.getAs[String](0))
.sameElements(expectedResults.sorted.map(_.getAs[String](0))))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val fieldsProjectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val isNotNullFilter =
Filter(IsNotNull(UnresolvedAttribute("name")), table)
val deduplicate = Deduplicate(Seq(UnresolvedAttribute("name")), isNotNullFilter)
val isNullFilter = Filter(IsNull(UnresolvedAttribute("name")), table)
val union = Union(deduplicate, isNullFilter)
val expectedPlan = Project(fieldsProjectList, union)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test dedupe 1 name, category KEEPEMPTY=true") {
val frame = sql(s"""
| source = $testTable | dedup 1 name, category KEEPEMPTY=true | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "Y"),
Row("B", "Z"),
Row("C", "X"),
Row("D", "Z"),
Row("B", "Y"),
Row(null, "Y"),
Row("E", null),
Row(null, "X"),
Row("B", null),
Row(null, "Z"),
Row(null, null))
implicit val nullableTwoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => {
val value0 = row.getAs[String](0)
val value1 = row.getAs[String](1)
(
if (value0 == null) String.valueOf(Int.MaxValue) else value0,
if (value1 == null) String.valueOf(Int.MaxValue) else value1)
})
assert(results.sorted.sameElements(expectedResults.sorted))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val fieldsProjectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category"))
val isNotNullFilter = Filter(
And(IsNotNull(UnresolvedAttribute("name")), IsNotNull(UnresolvedAttribute("category"))),
table)
val deduplicate = Deduplicate(
Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category")),
isNotNullFilter)
val isNullFilter = Filter(
Or(IsNull(UnresolvedAttribute("name")), IsNull(UnresolvedAttribute("category"))),
table)
val union = Union(deduplicate, isNullFilter)
val expectedPlan = Project(fieldsProjectList, union)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test 1 name CONSECUTIVE=true") {
val ex = intercept[UnsupportedOperationException](sql(s"""
| source = $testTable | dedup 1 name CONSECUTIVE=true | fields name
| """.stripMargin))
assert(ex.getMessage.contains("Consecutive duplicate events are not supported"))
}

test("test 1 name KEEPEMPTY=true CONSECUTIVE=true") {
val ex = intercept[UnsupportedOperationException](sql(s"""
| source = $testTable | dedup 1 name KEEPEMPTY=true CONSECUTIVE=true | fields name
| """.stripMargin))
assert(ex.getMessage.contains("Consecutive duplicate events are not supported"))
}

ignore("test dedupe 2 name") {
val frame = sql(s"""
| source = $testTable| dedup 2 name | fields name
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] =
Array(Row("A"), Row("A"), Row("B"), Row("B"), Row("C"), Row("C"), Row("D"), Row("E"))
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))
}

ignore("test dedupe 2 name, category") {
val frame = sql(s"""
| source = $testTable| dedup 2 name, category | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "X"),
Row("A", "Y"),
Row("A", "Y"),
Row("B", "Y"),
Row("B", "Z"),
Row("B", "Z"),
Row("C", "X"),
Row("C", "X"),
Row("D", "Z"))
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](row => {
val value = row.getAs[String](0)
if (value == null) String.valueOf(Int.MaxValue) else value
})
assert(results.sorted.sameElements(expectedResults.sorted))
}

ignore("test dedupe 2 name KEEPEMPTY=true") {
val frame = sql(s"""
| source = $testTable| dedup 2 name KEEPEMPTY=true | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "Y"),
Row("B", "Z"),
Row("B", "Z"),
Row("C", "X"),
Row("C", "X"),
Row("D", "Z"),
Row("E", null),
Row(null, "Y"),
Row(null, "X"),
Row(null, "Z"),
Row(null, null))
implicit val nullableTwoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => {
val value0 = row.getAs[String](0)
val value1 = row.getAs[String](1)
(
if (value0 == null) String.valueOf(Int.MaxValue) else value0,
if (value1 == null) String.valueOf(Int.MaxValue) else value1)
})
assert(
results.sorted
.map(_.getAs[String](0))
.sameElements(expectedResults.sorted.map(_.getAs[String](0))))
}

ignore("test dedupe 2 name, category KEEPEMPTY=true") {
val frame = sql(s"""
| source = $testTable| dedup 2 name, category KEEPEMPTY=true | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "X"),
Row("A", "Y"),
Row("A", "Y"),
Row("B", "Y"),
Row("B", "Z"),
Row("B", "Z"),
Row("C", "X"),
Row("C", "X"),
Row("D", "Z"),
Row(null, "Y"),
Row("E", null),
Row(null, "X"),
Row("B", null),
Row(null, "Z"),
Row(null, null))
implicit val nullableTwoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => {
val value0 = row.getAs[String](0)
val value1 = row.getAs[String](1)
(
if (value0 == null) String.valueOf(Int.MaxValue) else value0,
if (value1 == null) String.valueOf(Int.MaxValue) else value1)
})
assert(results.sorted.sameElements(expectedResults.sorted))
}

test("test 2 name CONSECUTIVE=true") {
val ex = intercept[UnsupportedOperationException](sql(s"""
| source = $testTable | dedup 2 name CONSECUTIVE=true | fields name
| """.stripMargin))
assert(ex.getMessage.contains("Consecutive duplicate events are not supported"))
}

test("test 2 name KEEPEMPTY=true CONSECUTIVE=true") {
val ex = intercept[UnsupportedOperationException](sql(s"""
| source = $testTable | dedup 2 name KEEPEMPTY=true CONSECUTIVE=true | fields name
| """.stripMargin))
assert(ex.getMessage.contains("Consecutive duplicate events are not supported"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ commands
| correlateCommand
| fieldsCommand
| statsCommand
| dedupCommand
| sortCommand
| headCommand
| evalCommand
Expand Down
Loading

0 comments on commit 941155a

Please sign in to comment.