Ppl count approximate support #884

Merged
Changes from 5 commits
5 changes: 5 additions & 0 deletions docs/ppl-lang/PPL-Example-Commands.md
@@ -177,6 +177,7 @@ source = table | where ispresent(a) |
- `source = table | stats max(c) by b`
- `source = table | stats count(c) by b | head 5`
- `source = table | stats distinct_count(c)`
- `source = table | stats distinct_count_approx(c)`
- `source = table | stats stddev_samp(c)`
- `source = table | stats stddev_pop(c)`
- `source = table | stats percentile(c, 90)`
@@ -202,6 +203,7 @@ source = table | where ispresent(a) |
- `source = table | where a < 50 | eventstats avg(c) `
- `source = table | eventstats max(c) by b`
- `source = table | eventstats count(c) by b | head 5`
- `source = table | eventstats count(c) by b | head 5`
- `source = table | eventstats stddev_samp(c)`
- `source = table | eventstats stddev_pop(c)`
- `source = table | eventstats percentile(c, 90)`
@@ -246,12 +248,15 @@ source = table | where ispresent(a) |

- `source=accounts | rare gender`
- `source=accounts | rare age by gender`
- `source=accounts | rare 5 age by gender`
- `source=accounts | rare_approx age by gender`

#### **Top**
[See additional command details](ppl-top-command.md)

- `source=accounts | top gender`
- `source=accounts | top 1 gender`
- `source=accounts | top_approx 5 gender`
- `source=accounts | top 1 age by gender`

#### **Parse**
9 changes: 7 additions & 2 deletions docs/ppl-lang/ppl-rare-command.md
@@ -6,10 +6,13 @@ Using ``rare`` command to find the least common tuple of values of all fields in
**Note**: A maximum of 10 results is returned for each distinct tuple of values of the group-by fields.

**Syntax**
`rare <field-list> [by-clause]`
`rare [N] <field-list> [by-clause]`
`rare_approx [N] <field-list> [by-clause]`

* N: number of results to return. **Default**: 10
* field-list: mandatory. comma-delimited list of field names.
* by-clause: optional. one or more fields to group the results by.
* top_approx: approximate the count by using estimated [cardinality by HyperLogLog++ algorithm](https://spark.apache.org/docs/3.5.2/sql-ref-functions-builtin.html).
Review comment (Member):
should be rare_approx here?
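
The syntax notes above describe `rare [N] <field-list> [by-clause]` as returning the N least common values per group. As a rough illustration of those semantics only, here is a minimal plain-Scala sketch using exact counts. The `Account` case class, the helper names, and the tie-breaking rule are assumptions made for this example; they are not part of the PR or of the PPL implementation, and the `_approx` variants would replace the exact counts with estimates.

```scala
object RareTopSketch {
  // Hypothetical record type standing in for a row of the `accounts` table.
  final case class Account(gender: String, age: Int)

  // Exact occurrence count of each age within one group of rows.
  private def ageCounts(group: Seq[Account]): Seq[(Int, Int)] =
    group.groupBy(_.age).map { case (age, rs) => age -> rs.size }.toSeq

  /** Mirrors `rare N age by gender`: the n least frequent ages per gender. */
  def rareAgeByGender(rows: Seq[Account], n: Int = 10): Map[String, Seq[Int]] =
    rows.groupBy(_.gender).map { case (gender, group) =>
      // Ascending by count; ties are broken by age (an assumption) for determinism.
      gender -> ageCounts(group).sortBy { case (age, c) => (c, age) }.take(n).map(_._1)
    }

  /** Mirrors `top N age by gender`: the n most frequent ages per gender. */
  def topAgeByGender(rows: Seq[Account], n: Int = 10): Map[String, Seq[Int]] =
    rows.groupBy(_.gender).map { case (gender, group) =>
      // Descending by count; ties are broken by age (an assumption) for determinism.
      gender -> ageCounts(group).sortBy { case (age, c) => (-c, age) }.take(n).map(_._1)
    }
}
```

With a handful of made-up rows, `RareTopSketch.rareAgeByGender(rows, 2)` keeps at most two ages per gender, which matches the role of `N` in the syntax line.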



### Example 1: Find the least common values in a field
@@ -19,6 +22,7 @@ The example finds least common gender of all the accounts.
PPL query:

os> source=accounts | rare gender;
os> source=accounts | rare_approx gender;
fetched rows / total rows = 2/2
+----------+
| gender |
@@ -34,7 +38,8 @@ The example finds least common age of all the accounts group by gender.

PPL query:

os> source=accounts | rare age by gender;
os> source=accounts | rare 5 age by gender;
os> source=accounts | rare_approx 5 age by gender;
fetched rows / total rows = 4/4
+----------+-------+
| gender | age |
7 changes: 5 additions & 2 deletions docs/ppl-lang/ppl-top-command.md
@@ -6,11 +6,12 @@ Using ``top`` command to find the most common tuple of values of all fields in t

### Syntax
`top [N] <field-list> [by-clause]`
`top_approx [N] <field-list> [by-clause]`

* N: number of results to return. **Default**: 10
* field-list: mandatory. comma-delimited list of field names.
* by-clause: optional. one or more fields to group the results by.

* top_approx: approximate the count by using estimated [cardinality by HyperLogLog++ algorithm](https://spark.apache.org/docs/3.5.2/sql-ref-functions-builtin.html).
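
The `top_approx` note above says counts are approximated with HyperLogLog++ cardinality estimates. For readers unfamiliar with the idea, the following is a minimal plain-Scala sketch of the original HyperLogLog estimator with a small-range correction. It is not the HyperLogLog++ variant and not Spark's implementation; the `Hll` class, its precision parameter `p`, and the choice of `MurmurHash3.stringHash` as the hash are assumptions made purely for illustration.

```scala
import scala.util.hashing.MurmurHash3

object HllSketch {
  // Simplified HyperLogLog over 32-bit hashes (illustrative only).
  final class Hll(p: Int = 12) {
    require(p >= 7 && p <= 16, "p must be in [7, 16] for the alpha formula used here")
    private val m: Int = 1 << p                      // number of registers
    private val registers = new Array[Byte](m)       // max rank seen per register
    private val alpha: Double = 0.7213 / (1.0 + 1.079 / m)

    def add(value: String): Unit = {
      val h = MurmurHash3.stringHash(value)
      val idx = h >>> (32 - p)                       // top p bits pick a register
      val rest = h << p                              // remaining bits, left-aligned
      // Rank = 1-based position of the first 1-bit among the remaining 32 - p bits.
      val rank = math.min(Integer.numberOfLeadingZeros(rest) + 1, 32 - p + 1)
      if (rank > registers(idx)) registers(idx) = rank.toByte
    }

    def estimate: Double = {
      val harmonicSum = registers.map(r => math.pow(2.0, -r.toDouble)).sum
      val raw = alpha * m * m / harmonicSum
      val zeros = registers.count(_ == 0)
      // Small-range correction: fall back to linear counting while registers are sparse.
      if (raw <= 2.5 * m && zeros > 0) m * math.log(m.toDouble / zeros) else raw
    }
  }
}
```

The sketch keeps only `m` small registers regardless of input size, and re-adding a value never changes a register, so duplicates do not affect the estimate. The trade-off is a relative error on the order of `1.04 / sqrt(m)` for the basic estimator.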

### Example 1: Find the most common values in a field

@@ -19,6 +20,7 @@ The example finds most common gender of all the accounts.
PPL query:

os> source=accounts | top gender;
os> source=accounts_approx | top gender;
Review comment (Member):
why do we add this line?

Reply (Member Author):
typing error - thanks !

fetched rows / total rows = 2/2
+----------+
| gender |
@@ -33,7 +35,7 @@ The example finds most common gender of all the accounts.

PPL query:

os> source=accounts | top 1 gender;
os> source=accounts_approx | top 1 gender;
Review comment (Member):
ditto

fetched rows / total rows = 1/1
+----------+
| gender |
@@ -48,6 +50,7 @@ The example finds most common age of all the accounts group by gender.
PPL query:

os> source=accounts | top 1 age by gender;
os> source=accounts | top_approx 1 age by gender;
fetched rows / total rows = 2/2
+----------+-------+
| gender | age |
@@ -494,4 +494,43 @@ class FlintSparkPPLAggregationWithSpanITSuite
// Compare the two plans
comparePlans(expectedPlan, logicalPlan, false)
}

test(
"create ppl simple distinct count age by span of interval of 10 years query with state filter test using approximation") {
val frame = sql(s"""
| source = $testTable | where state != 'Quebec' | stats distinct_count_approx(age) by span(age, 10) as age_span
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(Row(1, 70L), Row(1, 30L), Row(1, 20L))

// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Long](_.getAs[Long](1))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val ageField = UnresolvedAttribute("age")
val stateField = UnresolvedAttribute("state")
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val aggregateExpressions =
Alias(
UnresolvedFunction(Seq("APPROX_COUNT_DISTINCT"), Seq(ageField), isDistinct = true),
"distinct_count_approx(age)")()
val span = Alias(
Multiply(Floor(Divide(UnresolvedAttribute("age"), Literal(10))), Literal(10)),
"age_span")()
val filterExpr = Not(EqualTo(stateField, Literal("Quebec")))
val filterPlan = Filter(filterExpr, table)
val aggregatePlan = Aggregate(Seq(span), Seq(aggregateExpressions, span), filterPlan)
val expectedPlan = Project(star, aggregatePlan)

// Compare the two plans
comparePlans(expectedPlan, logicalPlan, false)
}
}
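
The expected plan in the span test above builds the bucket as `Multiply(Floor(Divide(age, 10)), 10)`, that is, `floor(age / 10) * 10`. The following plain-Scala sketch works through that arithmetic and an exact distinct count per bucket. The helper names and the sample ages are hypothetical and are not taken from the test table; the real query uses an approximate count rather than the exact one shown here.

```scala
object SpanSketch {
  /** Lower bound of the bucket containing `value`: floor(value / width) * width. */
  def span(value: Int, width: Int): Long =
    math.floor(value.toDouble / width).toLong * width

  /** Exact distinct count of values per span bucket. */
  def distinctCountBySpan(values: Seq[Int], width: Int): Map[Long, Int] =
    values.groupBy(span(_, width)).map { case (bucket, vs) => bucket -> vs.distinct.size }
}
```

For example, an age of 25 with a width of 10 falls in bucket 20, and repeated ages inside one bucket are counted once.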
@@ -835,6 +835,43 @@ class FlintSparkPPLAggregationsITSuite
comparePlans(expectedPlan, logicalPlan, false)
}

test("create ppl simple country distinct_count using approximation ") {
val frame = sql(s"""
| source = $testTable| stats distinct_count_approx(country)
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()

// Define the expected results
val expectedResults: Array[Row] = Array(Row(2L))

// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](1))
assert(
results.sorted.sameElements(expectedResults.sorted),
s"Expected: ${expectedResults.mkString(", ")}, but got: ${results.mkString(", ")}")

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val countryField = UnresolvedAttribute("country")
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val aggregateExpressions =
Alias(
UnresolvedFunction(Seq("APPROX_COUNT_DISTINCT"), Seq(countryField), isDistinct = true),
"distinct_count_approx(country)")()

val aggregatePlan =
Aggregate(Seq.empty, Seq(aggregateExpressions), table)
val expectedPlan = Project(star, aggregatePlan)

// Compare the two plans
comparePlans(expectedPlan, logicalPlan, false)
}

test("create ppl simple age distinct_count group by country query test with sort") {
val frame = sql(s"""
| source = $testTable | stats distinct_count(age) by country | sort country
@@ -881,6 +918,53 @@ class FlintSparkPPLAggregationsITSuite
s"Expected plan: ${compareByString(expectedPlan)}, but got: ${compareByString(logicalPlan)}")
}

test(
"create ppl simple age distinct_count group by country query test with sort using approximation") {
val frame = sql(s"""
| source = $testTable | stats distinct_count_approx(age) by country | sort country
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(Row(2L, "Canada"), Row(2L, "USA"))

// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](1))
assert(
results.sorted.sameElements(expectedResults.sorted),
s"Expected: ${expectedResults.mkString(", ")}, but got: ${results.mkString(", ")}")

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val aggregateExpressions =
Alias(
UnresolvedFunction(Seq("APPROX_COUNT_DISTINCT"), Seq(ageField), isDistinct = true),
"distinct_count_approx(age)")()
val productAlias = Alias(countryField, "country")()

val aggregatePlan =
Aggregate(groupByAttributes, Seq(aggregateExpressions, productAlias), table)
val sortedPlan: LogicalPlan =
Sort(
Seq(SortOrder(UnresolvedAttribute("country"), Ascending)),
global = true,
aggregatePlan)
val expectedPlan = Project(star, sortedPlan)

// Compare the two plans
assert(
compareByString(expectedPlan) === compareByString(logicalPlan),
s"Expected plan: ${compareByString(expectedPlan)}, but got: ${compareByString(logicalPlan)}")
}

test("create ppl simple age distinct_count group by country with state filter query test") {
val frame = sql(s"""
| source = $testTable | where state != 'Ontario' | stats distinct_count(age) by country
@@ -920,6 +1004,46 @@ class FlintSparkPPLAggregationsITSuite
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}

test(
"create ppl simple age distinct_count group by country with state filter query test using approximation") {
val frame = sql(s"""
| source = $testTable | where state != 'Ontario' | stats distinct_count_approx(age) by country
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(Row(1L, "Canada"), Row(2L, "USA"))

// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](1))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val stateField = UnresolvedAttribute("state")
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val filterExpr = Not(EqualTo(stateField, Literal("Ontario")))
val filterPlan = Filter(filterExpr, table)
val aggregateExpressions =
Alias(
UnresolvedFunction(Seq("APPROX_COUNT_DISTINCT"), Seq(ageField), isDistinct = true),
"distinct_count_approx(age)")()
val productAlias = Alias(countryField, "country")()
val aggregatePlan =
Aggregate(groupByAttributes, Seq(aggregateExpressions, productAlias), filterPlan)
val expectedPlan = Project(star, aggregatePlan)

// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}

test("two-level stats") {
val frame = sql(s"""
| source = $testTable| stats avg(age) as avg_age by state, country | stats avg(avg_age) as avg_state_age by country