Skip to content

Commit

Permalink
add functional approximation support for:
Browse files Browse the repository at this point in the history
- distinct count
- top
- rare

Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB committed Nov 9, 2024
1 parent 182689c commit fd45d52
Show file tree
Hide file tree
Showing 16 changed files with 441 additions and 22 deletions.
5 changes: 5 additions & 0 deletions docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ source = table | where ispresent(a) |
- `source = table | stats max(c) by b`
- `source = table | stats count(c) by b | head 5`
- `source = table | stats distinct_count(c)`
- `source = table | stats distinct_count_approx(c)`
- `source = table | stats stddev_samp(c)`
- `source = table | stats stddev_pop(c)`
- `source = table | stats percentile(c, 90)`
Expand All @@ -202,6 +203,7 @@ source = table | where ispresent(a) |
- `source = table | where a < 50 | eventstats avg(c) `
- `source = table | eventstats max(c) by b`
- `source = table | eventstats count(c) by b | head 5`
- `source = table | eventstats count(c) by b | head 5`
- `source = table | eventstats stddev_samp(c)`
- `source = table | eventstats stddev_pop(c)`
- `source = table | eventstats percentile(c, 90)`
Expand Down Expand Up @@ -246,12 +248,15 @@ source = table | where ispresent(a) |

- `source=accounts | rare gender`
- `source=accounts | rare age by gender`
- `source=accounts | rare 5 age by gender`
- `source=accounts | rare_approx age by gender`

#### **Top**
[See additional command details](ppl-top-command.md)

- `source=accounts | top gender`
- `source=accounts | top 1 gender`
- `source=accounts | top_approx 5 gender`
- `source=accounts | top 1 age by gender`

#### **Parse**
Expand Down
9 changes: 7 additions & 2 deletions docs/ppl-lang/ppl-rare-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ Using ``rare`` command to find the least common tuple of values of all fields in
**Note**: A maximum of 10 results is returned for each distinct tuple of values of the group-by fields.

**Syntax**
`rare <field-list> [by-clause]`
`rare [N] <field-list> [by-clause]`
`rare_approx [N] <field-list> [by-clause]`

* N: number of results to return. **Default**: 10
* field-list: mandatory. comma-delimited list of field names.
* by-clause: optional. one or more fields to group the results by.
* top_approx: approximate the count by using estimated [cardinality by HyperLogLog++ algorithm](https://spark.apache.org/docs/3.5.2/sql-ref-functions-builtin.html).


### Example 1: Find the least common values in a field
Expand All @@ -19,6 +22,7 @@ The example finds least common gender of all the accounts.
PPL query:

os> source=accounts | rare gender;
os> source=accounts | rare_approx gender;
fetched rows / total rows = 2/2
+----------+
| gender |
Expand All @@ -34,7 +38,8 @@ The example finds least common age of all the accounts group by gender.

PPL query:

os> source=accounts | rare age by gender;
os> source=accounts | rare 5 age by gender;
os> source=accounts | rare_approx 5 age by gender;
fetched rows / total rows = 4/4
+----------+-------+
| gender | age |
Expand Down
7 changes: 5 additions & 2 deletions docs/ppl-lang/ppl-top-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ Using ``top`` command to find the most common tuple of values of all fields in t

### Syntax
`top [N] <field-list> [by-clause]`
`top_approx [N] <field-list> [by-clause]`

* N: number of results to return. **Default**: 10
* field-list: mandatory. comma-delimited list of field names.
* by-clause: optional. one or more fields to group the results by.

* top_approx: approximate the count by using estimated [cardinality by HyperLogLog++ algorithm](https://spark.apache.org/docs/3.5.2/sql-ref-functions-builtin.html).

### Example 1: Find the most common values in a field

Expand All @@ -19,6 +20,7 @@ The example finds most common gender of all the accounts.
PPL query:

os> source=accounts | top gender;
os> source=accounts_approx | top gender;
fetched rows / total rows = 2/2
+----------+
| gender |
Expand All @@ -33,7 +35,7 @@ The example finds most common gender of all the accounts.

PPL query:

os> source=accounts | top 1 gender;
os> source=accounts_approx | top 1 gender;
fetched rows / total rows = 1/1
+----------+
| gender |
Expand All @@ -48,6 +50,7 @@ The example finds most common age of all the accounts group by gender.
PPL query:

os> source=accounts | top 1 age by gender;
os> source=accounts | top_approx 1 age by gender;
fetched rows / total rows = 2/2
+----------+-------+
| gender | age |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,45 @@ class FlintSparkPPLTopAndRareITSuite
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("create ppl rare address field query test with approximation") {
val frame = sql(s"""
| source = $testTable| rare_approx address
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
assert(results.length == 3)

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val addressField = UnresolvedAttribute("address")
val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None))

val aggregateExpressions = Seq(
Alias(
UnresolvedFunction(Seq("APPROX_COUNT_DISTINCT"), Seq(addressField), isDistinct = false),
"count_address")(),
addressField)
val aggregatePlan =
Aggregate(
Seq(addressField),
aggregateExpressions,
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
val sortedPlan: LogicalPlan =
Sort(
Seq(
SortOrder(
Alias(
UnresolvedFunction(Seq("APPROX_COUNT_DISTINCT"), Seq(addressField), isDistinct = false),
"count_address")(),
Ascending)),
global = true,
aggregatePlan)
val expectedPlan = Project(projectList, sortedPlan)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("create ppl rare address by age field query test") {
val frame = sql(s"""
| source = $testTable| rare address by age
Expand Down Expand Up @@ -131,6 +170,102 @@ class FlintSparkPPLTopAndRareITSuite
val expectedPlan = Project(projectList, sortedPlan)
comparePlans(expectedPlan, logicalPlan, false)
}

test("create ppl rare 3 address by age field query test") {
val frame = sql(s"""
| source = $testTable| rare 3 address by age
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
assert(results.length == 3)

val expectedRow = Row(1, "Vancouver", 60)
assert(
results.head == expectedRow,
s"Expected least frequent result to be $expectedRow, but got ${results.head}")

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val addressField = UnresolvedAttribute("address")
val ageField = UnresolvedAttribute("age")
val ageAlias = Alias(ageField, "age")()

val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None))

val countExpr = Alias(
UnresolvedFunction(Seq("COUNT"), Seq(addressField), isDistinct = false),
"count_address")()

val aggregateExpressions = Seq(countExpr, addressField, ageAlias)
val aggregatePlan =
Aggregate(
Seq(addressField, ageAlias),
aggregateExpressions,
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))

val sortedPlan: LogicalPlan =
Sort(
Seq(
SortOrder(
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(addressField), isDistinct = false),
"count_address")(),
Ascending)),
global = true,
aggregatePlan)

val planWithLimit =
GlobalLimit(Literal(3), LocalLimit(Literal(3), sortedPlan))
val expectedPlan = Project(Seq(UnresolvedStar(None)), planWithLimit)
comparePlans(expectedPlan, logicalPlan, false)
}

test("create ppl rare 3 address by age field query test with approximation") {
val frame = sql(s"""
| source = $testTable| rare_approx 3 address by age
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
assert(results.length == 3)


// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val addressField = UnresolvedAttribute("address")
val ageField = UnresolvedAttribute("age")
val ageAlias = Alias(ageField, "age")()

val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None))

val countExpr = Alias(
UnresolvedFunction(Seq("APPROX_COUNT_DISTINCT"), Seq(addressField), isDistinct = false),
"count_address")()

val aggregateExpressions = Seq(countExpr, addressField, ageAlias)
val aggregatePlan =
Aggregate(
Seq(addressField, ageAlias),
aggregateExpressions,
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))

val sortedPlan: LogicalPlan =
Sort(
Seq(
SortOrder(
Alias(
UnresolvedFunction(Seq("APPROX_COUNT_DISTINCT"), Seq(addressField), isDistinct = false),
"count_address")(),
Ascending)),
global = true,
aggregatePlan)

val planWithLimit =
GlobalLimit(Literal(3), LocalLimit(Literal(3), sortedPlan))
val expectedPlan = Project(Seq(UnresolvedStar(None)), planWithLimit)
comparePlans(expectedPlan, logicalPlan, false)
}

test("create ppl top address field query test") {
val frame = sql(s"""
Expand Down Expand Up @@ -178,6 +313,46 @@ class FlintSparkPPLTopAndRareITSuite
val expectedPlan = Project(projectList, sortedPlan)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("create ppl top address field query test with approximation") {
val frame = sql(s"""
| source = $testTable| top_approx address
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
assert(results.length == 3)


// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val addressField = UnresolvedAttribute("address")
val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None))

val aggregateExpressions = Seq(
Alias(
UnresolvedFunction(Seq("APPROX_COUNT_DISTINCT"), Seq(addressField), isDistinct = false),
"count_address")(),
addressField)
val aggregatePlan =
Aggregate(
Seq(addressField),
aggregateExpressions,
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
val sortedPlan: LogicalPlan =
Sort(
Seq(
SortOrder(
Alias(
UnresolvedFunction(Seq("APPROX_COUNT_DISTINCT"), Seq(addressField), isDistinct = false),
"count_address")(),
Descending)),
global = true,
aggregatePlan)
val expectedPlan = Project(projectList, sortedPlan)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("create ppl top 3 countries query test") {
val frame = sql(s"""
Expand Down Expand Up @@ -225,6 +400,45 @@ class FlintSparkPPLTopAndRareITSuite
val expectedPlan = Project(Seq(UnresolvedStar(None)), planWithLimit)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("create ppl top 3 countries query test with approximation") {
val frame = sql(s"""
| source = $newTestTable| top_approx 3 country
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
assert(results.length == 3)

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val countryField = UnresolvedAttribute("country")
val countExpr = Alias(
UnresolvedFunction(Seq("APPROX_COUNT_DISTINCT"), Seq(countryField), isDistinct = false),
"count_country")()
val aggregateExpressions = Seq(countExpr, countryField)
val aggregatePlan =
Aggregate(
Seq(countryField),
aggregateExpressions,
UnresolvedRelation(Seq("spark_catalog", "default", "new_flint_ppl_test")))

val sortedPlan: LogicalPlan =
Sort(
Seq(
SortOrder(
Alias(
UnresolvedFunction(Seq("APPROX_COUNT_DISTINCT"), Seq(countryField), isDistinct = false),
"count_country")(),
Descending)),
global = true,
aggregatePlan)

val planWithLimit =
GlobalLimit(Literal(3), LocalLimit(Literal(3), sortedPlan))
val expectedPlan = Project(Seq(UnresolvedStar(None)), planWithLimit)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("create ppl top 2 countries by occupation field query test") {
val frame = sql(s"""
Expand Down Expand Up @@ -277,4 +491,47 @@ class FlintSparkPPLTopAndRareITSuite
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)

}

test("create ppl top 2 countries by occupation field query test with approximation") {
val frame = sql(s"""
| source = $newTestTable| top_approx 3 country by occupation
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
assert(results.length == 3)

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val countryField = UnresolvedAttribute("country")
val occupationField = UnresolvedAttribute("occupation")
val occupationFieldAlias = Alias(occupationField, "occupation")()

val countExpr = Alias(
UnresolvedFunction(Seq("APPROX_COUNT_DISTINCT"), Seq(countryField), isDistinct = false),
"count_country")()
val aggregateExpressions = Seq(countExpr, countryField, occupationFieldAlias)
val aggregatePlan =
Aggregate(
Seq(countryField, occupationFieldAlias),
aggregateExpressions,
UnresolvedRelation(Seq("spark_catalog", "default", "new_flint_ppl_test")))

val sortedPlan: LogicalPlan =
Sort(
Seq(
SortOrder(
Alias(
UnresolvedFunction(Seq("APPROX_COUNT_DISTINCT"), Seq(countryField), isDistinct = false),
"count_country")(),
Descending)),
global = true,
aggregatePlan)

val planWithLimit =
GlobalLimit(Literal(3), LocalLimit(Literal(3), sortedPlan))
val expectedPlan = Project(Seq(UnresolvedStar(None)), planWithLimit)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)

}
}
Loading

0 comments on commit fd45d52

Please sign in to comment.