Add sample parameter to top & rare command #879

Closed
4 changes: 3 additions & 1 deletion docs/ppl-lang/PPL-Example-Commands.md
@@ -246,13 +246,15 @@ source = table | where ispresent(a) |

- `source=accounts | rare gender`
- `source=accounts | rare age by gender`
- `source=accounts | rare age by gender sample(50 percent)`

#### **Top**
[See additional command details](ppl-top-command.md)

- `source=accounts | top gender`
- `source=accounts | top 1 gender`
- `source=accounts | top 1 age by gender`
- `source=accounts | top 5 gender sample(50 percent)`
- `source=accounts | top 5 age by gender`

#### **Parse**
[See additional command details](ppl-parse-command.md)
10 changes: 9 additions & 1 deletion docs/ppl-lang/ppl-rare-command.md
@@ -6,10 +6,11 @@ Using ``rare`` command to find the least common tuple of values of all fields in
**Note**: A maximum of 10 results is returned for each distinct tuple of values of the group-by fields.

**Syntax**
`rare <field-list> [by-clause] [sample(? percent)]`

* field-list: mandatory. comma-delimited list of field names.
* by-clause: optional. one or more fields to group the results by.
* sample: optional. reduces the number of rows scanned by applying a table-sample strategy, favouring speed over precision.
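
The trade-off described by the `sample` option can be illustrated with a minimal sketch. It is not the Spark or PPL implementation: it assumes sampling without replacement, where each row is kept independently with probability `percent / 100`, and the class name `SampledRareSketch`, its methods, and the input values are hypothetical. The point is that the counts are computed on the sampled rows only, so the ordering of rare values may differ from the exact result.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical illustration only; none of these names come from the PR.
final class SampledRareSketch {

    /** Keeps each row independently with probability percent/100 (no replacement). */
    static List<String> sample(List<String> rows, int percent, long seed) {
        Random rng = new Random(seed);
        List<String> kept = new ArrayList<>();
        for (String row : rows) {
            if (rng.nextDouble() < percent / 100.0) {
                kept.add(row);
            }
        }
        return kept;
    }

    /** Counts each value and returns the entries ordered from least to most frequent. */
    static List<Map.Entry<String, Long>> rare(List<String> values) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String v : values) {
            counts.merge(v, 1L, Long::sum);
        }
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(counts.entrySet());
        sorted.sort(Map.Entry.comparingByValue());
        return sorted;
    }

    public static void main(String[] args) {
        // Made-up input values, used only to contrast exact and sampled counts.
        List<String> ages = List.of("32", "33", "36", "28", "32", "36", "36");
        System.out.println("exact:   " + rare(ages));
        System.out.println("sampled: " + rare(sample(ages, 50, 42L)));
    }
}
```

With `percent` set to 100 every row is kept and the result matches the exact one; lower percentages scan fewer rows at the cost of approximate counts.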


### Example 1: Find the least common values in a field
@@ -44,3 +45,10 @@ PPL query:
| M | 33 |
| M | 36 |
+----------+-------+

### Example 3: Find the least common values using a 50% sampling strategy

PPL query:

os> source=accounts | rare age sample(50 percent);
fetched rows / total rows = 2/4
12 changes: 11 additions & 1 deletion docs/ppl-lang/ppl-top-command.md
@@ -5,11 +5,12 @@ Using ``top`` command to find the most common tuple of values of all fields in t


### Syntax
`top [N] <field-list> [by-clause] [sample(? percent)]`

* N: number of results to return. **Default**: 10
* field-list: mandatory. comma-delimited list of field names.
* by-clause: optional. one or more fields to group the results by.
* sample: optional. reduces the number of rows scanned by applying a table-sample strategy, favouring speed over precision.


### Example 1: Find the most common values in a field
@@ -56,3 +57,12 @@ PPL query:
| M | 32 |
+----------+-------+

### Example 2: Find the most common values organized by gender using a sampling strategy

The example finds the most common age of all the accounts, grouped by gender, sampling only 50% of the rows.

PPL query:

os> source=accounts | top 1 age by gender sample(50 percent);
fetched rows / total rows = 1/2

@@ -84,6 +84,52 @@ class FlintSparkPPLTopAndRareITSuite
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("create ppl rare address field query test sample 75 %") {
Member


I strongly suggest adding some IT cases for top/rare with sample in complex join and subquery queries.

val frame = sql(s"""
| source = $testTable| rare address sample(75 percent)
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
assert(results.length == 3)

val expectedRow = Row(1, "Vancouver")
assert(
results.head == expectedRow,
s"Expected least frequent result to be $expectedRow, but got ${results.head}")

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val addressField = UnresolvedAttribute("address")
val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None))

val aggregateExpressions = Seq(
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(addressField), isDistinct = false),
"count_address")(),
addressField)

val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val aggregatePlan =
Aggregate(
Seq(addressField),
aggregateExpressions,
Sample(0, 0.75, withReplacement = false, 0, table))
val sortedPlan: LogicalPlan =
Sort(
Seq(
SortOrder(
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(addressField), isDistinct = false),
"count_address")(),
Ascending)),
global = true,
aggregatePlan)
val expectedPlan = Project(projectList, sortedPlan)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("create ppl rare address by age field query test") {
val frame = sql(s"""
| source = $testTable| rare address by age
@@ -111,11 +157,58 @@ class FlintSparkPPLTopAndRareITSuite
"count_address")()

val aggregateExpressions = Seq(countExpr, addressField, ageAlias)
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val aggregatePlan =
Aggregate(Seq(addressField, ageAlias), aggregateExpressions, table)

val sortedPlan: LogicalPlan =
Sort(
Seq(
SortOrder(
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(addressField), isDistinct = false),
"count_address")(),
Ascending)),
global = true,
aggregatePlan)

val expectedPlan = Project(projectList, sortedPlan)
comparePlans(expectedPlan, logicalPlan, false)
}

test("create ppl rare address by age field query test sample 75 %") {
val frame = sql(s"""
| source = $testTable| rare address by age sample(75 percent)
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
assert(results.length == 4)

val expectedRow = Row(1, "Vancouver", 60)
assert(
results.head == expectedRow,
s"Expected least frequent result to be $expectedRow, but got ${results.head}")

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val addressField = UnresolvedAttribute("address")
val ageField = UnresolvedAttribute("age")
val ageAlias = Alias(ageField, "age")()

val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None))

val countExpr = Alias(
UnresolvedFunction(Seq("COUNT"), Seq(addressField), isDistinct = false),
"count_address")()

val aggregateExpressions = Seq(countExpr, addressField, ageAlias)
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val aggregatePlan =
Aggregate(
Seq(addressField, ageAlias),
aggregateExpressions,
Sample(0, 0.75, withReplacement = false, 0, table))

val sortedPlan: LogicalPlan =
Sort(
@@ -226,6 +319,46 @@ class FlintSparkPPLTopAndRareITSuite
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("create ppl top 3 countries query test sample 75 %") {
val frame = sql(s"""
| source = $newTestTable| top 3 country sample(75 percent)
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
assert(results.length == 3)

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val countryField = UnresolvedAttribute("country")
val countExpr = Alias(
UnresolvedFunction(Seq("COUNT"), Seq(countryField), isDistinct = false),
"count_country")()
val aggregateExpressions = Seq(countExpr, countryField)
val table = UnresolvedRelation(Seq("spark_catalog", "default", "new_flint_ppl_test"))
val aggregatePlan =
Aggregate(
Seq(countryField),
aggregateExpressions,
Sample(0, 0.75, withReplacement = false, 0, table))

val sortedPlan: LogicalPlan =
Sort(
Seq(
SortOrder(
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(countryField), isDistinct = false),
"count_country")(),
Descending)),
global = true,
aggregatePlan)

val planWithLimit =
GlobalLimit(Literal(3), LocalLimit(Literal(3), sortedPlan))
val expectedPlan = Project(Seq(UnresolvedStar(None)), planWithLimit)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("create ppl top 2 countries by occupation field query test") {
val frame = sql(s"""
| source = $newTestTable| top 3 country by occupation
@@ -254,11 +387,53 @@ class FlintSparkPPLTopAndRareITSuite
UnresolvedFunction(Seq("COUNT"), Seq(countryField), isDistinct = false),
"count_country")()
val aggregateExpressions = Seq(countExpr, countryField, occupationFieldAlias)
val table = UnresolvedRelation(Seq("spark_catalog", "default", "new_flint_ppl_test"))
val aggregatePlan =
Aggregate(Seq(countryField, occupationFieldAlias), aggregateExpressions, table)

val sortedPlan: LogicalPlan =
Sort(
Seq(
SortOrder(
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(countryField), isDistinct = false),
"count_country")(),
Descending)),
global = true,
aggregatePlan)

val planWithLimit =
GlobalLimit(Literal(3), LocalLimit(Literal(3), sortedPlan))
val expectedPlan = Project(Seq(UnresolvedStar(None)), planWithLimit)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)

}

test("create ppl top 2 countries by occupation field query test sample 85 %") {
val frame = sql(s"""
| source = $newTestTable| top 3 country by occupation sample(85 percent)
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
assert(results.length == 3)

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val countryField = UnresolvedAttribute("country")
val occupationField = UnresolvedAttribute("occupation")
val occupationFieldAlias = Alias(occupationField, "occupation")()

val countExpr = Alias(
UnresolvedFunction(Seq("COUNT"), Seq(countryField), isDistinct = false),
"count_country")()
val aggregateExpressions = Seq(countExpr, countryField, occupationFieldAlias)
val table = UnresolvedRelation(Seq("spark_catalog", "default", "new_flint_ppl_test"))
val aggregatePlan =
Aggregate(
Seq(countryField, occupationFieldAlias),
aggregateExpressions,
Sample(0, 0.85, withReplacement = false, 0, table))

val sortedPlan: LogicalPlan =
Sort(
2 changes: 2 additions & 0 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
@@ -24,6 +24,7 @@ SORT: 'SORT';
EVAL: 'EVAL';
HEAD: 'HEAD';
TOP: 'TOP';
SAMPLE: 'SAMPLE';
RARE: 'RARE';
PARSE: 'PARSE';
METHOD: 'METHOD';
@@ -79,6 +80,7 @@ DESC: 'DESC';
DATASOURCES: 'DATASOURCES';
USING: 'USING';
WITH: 'WITH';
PERCENT: 'PERCENT';
Member


`SAMPLE` and `PERCENT` should be added to `keywordsCanBeId`.


// FIELD KEYWORDS
AUTO: 'AUTO';
@@ -179,12 +179,16 @@ headCommand
: HEAD (number = integerLiteral)? (FROM from = integerLiteral)?
;

sampleClause
: SAMPLE '(' (percentage = integerLiteral PERCENT ) ')'
;
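
The shape accepted by the `sampleClause` rule above can be sketched with a regular expression. This is a hedged stand-in for the ANTLR-generated parser, not part of the PR: the class `SampleClauseSketch` and the method `parsePercentage` are hypothetical, and case-insensitive keyword matching plus the optional trailing `;` are assumptions made for the sketch.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only; the real parsing is done by the ANTLR grammar.
final class SampleClauseSketch {

    // Mirrors: SAMPLE '(' integerLiteral PERCENT ')' at the end of a command.
    // The digit run is capped at 9 characters so Integer.parseInt cannot overflow.
    private static final Pattern SAMPLE_CLAUSE = Pattern.compile(
        "\\bsample\\s*\\(\\s*(\\d{1,9})\\s+percent\\s*\\)\\s*;?\\s*$",
        Pattern.CASE_INSENSITIVE);

    /** Returns the percentage of a trailing sample clause, or empty if there is none. */
    static OptionalInt parsePercentage(String command) {
        Matcher m = SAMPLE_CLAUSE.matcher(command);
        return m.find()
            ? OptionalInt.of(Integer.parseInt(m.group(1)))
            : OptionalInt.empty();
    }

    public static void main(String[] args) {
        System.out.println(parsePercentage("source=accounts | top 5 gender sample(50 percent)"));
        System.out.println(parsePercentage("source=accounts | rare age by gender"));
    }
}
```

Like the grammar rule, the sketch accepts any integer literal and does not check that the value lies between 0 and 100; range validation would have to happen in a later step.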

topCommand
: TOP (number = integerLiteral)? fieldList (byClause)? (sampleClause)?
;

rareCommand
: RARE fieldList (byClause)? (sampleClause)?
;

grokCommand
@@ -6,6 +6,7 @@
package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
@@ -16,6 +17,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Logical plan node of Aggregation, the interface for building aggregation actions in queries. */
@Getter
@@ -29,7 +31,8 @@ public class Aggregation extends UnresolvedPlan {
private UnresolvedExpression span;
private List<Argument> argExprList;
private UnresolvedPlan child;

private Optional<TablesampleContext> sample = Optional.empty();

/** Aggregation Constructor without span and argument. */
public Aggregation(
List<UnresolvedExpression> aggExprList,
@@ -71,4 +74,14 @@ public List<UnresolvedPlan> getChild() {
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitAggregation(this, context);
}

@Getter
@Setter
@ToString
@EqualsAndHashCode(callSuper = false)
@AllArgsConstructor
public static class TablesampleContext {
public int percentage;
}

}
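
The integration tests in this PR expect `sample(75 percent)` to produce `Sample(0, 0.75, withReplacement = false, 0, table)` and `sample(85 percent)` to produce an upper bound of `0.85`. A minimal sketch of that mapping from the integer `percentage` held by `TablesampleContext` follows. The class `SampleBoundsSketch` and its factory method are hypothetical, and the 0 to 100 range check is an assumption that does not appear in the diff.

```java
// Hypothetical illustration only; field names loosely follow the arguments seen in the tests.
final class SampleBoundsSketch {
    final double lowerBound;
    final double upperBound;
    final boolean withReplacement;
    final long seed;

    private SampleBoundsSketch(double lowerBound, double upperBound,
                               boolean withReplacement, long seed) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.withReplacement = withReplacement;
        this.seed = seed;
    }

    /** Maps an integer percentage to the bounds the tests expect, e.g. 75 to [0, 0.75]. */
    static SampleBoundsSketch fromPercentage(int percentage) {
        // Assumption: the range check is added for the sketch; the PR does not show one.
        if (percentage < 0 || percentage > 100) {
            throw new IllegalArgumentException("percentage must be in [0, 100]: " + percentage);
        }
        return new SampleBoundsSketch(0.0, percentage / 100.0, false, 0L);
    }

    public static void main(String[] args) {
        SampleBoundsSketch bounds = fromPercentage(75);
        System.out.println(bounds.lowerBound + " .. " + bounds.upperBound);
    }
}
```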