
update syntax from project to view
Signed-off-by: YANGDB <[email protected]>
YANG-DB committed Dec 13, 2024
1 parent c9c5b14 commit bfbb555
Showing 14 changed files with 91 additions and 97 deletions.
10 changes: 5 additions & 5 deletions docs/ppl-lang/PPL-Example-Commands.md
@@ -276,18 +276,18 @@ source = table | where ispresent(a) |
- `source=accounts | parse email '.+@(?<host>.+)' | where age > 45 | sort - age | fields age, email, host`
- `source=accounts | parse address '(?<streetNumber>\d+) (?<street>.+)' | where streetNumber > 500 | sort num(streetNumber) | fields streetNumber, street`

#### **Project**
[See additional command details](ppl-project-command.md)
#### **view**
[See additional command details](ppl-view-command.md)

```sql
project newTableName using csv | source = table | where fieldA > value | stats count(fieldA) by fieldB
view newTableName using csv | source = table | where fieldA > value | stats count(fieldA) by fieldB

project ageDistribByCountry using parquet OPTIONS('parquet.bloom.filter.enabled'='true', 'parquet.bloom.filter.enabled#age'='false') partitioned by (age, country) |
view ageDistribByCountry using parquet OPTIONS('parquet.bloom.filter.enabled'='true', 'parquet.bloom.filter.enabled#age'='false') partitioned by (age, country) |
source = table | stats avg(age) as avg_city_age by country, state, city | eval new_avg_city_age = avg_city_age - 1 | stats
avg(new_avg_city_age) as avg_state_age by country, state | where avg_state_age > 18 | stats avg(avg_state_age) as
avg_adult_country_age by country

project ageDistribByCountry using parquet OPTIONS('parquet.bloom.filter.enabled'='true', 'parquet.bloom.filter.enabled#age'='false') partitioned by (age, country) location 's://demo-app/my-bucket'|
view ageDistribByCountry using parquet OPTIONS('parquet.bloom.filter.enabled'='true', 'parquet.bloom.filter.enabled#age'='false') partitioned by (age, country) location 's://demo-app/my-bucket'|
source = table | stats avg(age) as avg_city_age by country, state, city | eval new_avg_city_age = avg_city_age - 1 | stats
avg(new_avg_city_age) as avg_state_age by country, state | where avg_state_age > 18 | stats avg(avg_state_age) as
avg_adult_country_age by country
```
2 changes: 1 addition & 1 deletion docs/ppl-lang/README.md
@@ -42,7 +42,7 @@ For additional examples see the next [documentation](PPL-Example-Commands.md).

- [`parse command`](ppl-parse-command.md)
-
- [`project command`](ppl-project-command.md)
- [`view command`](ppl-view-command.md)

- [`patterns command`](ppl-patterns-command.md)

docs/ppl-lang/ppl-project-command.md → docs/ppl-lang/ppl-view-command.md
@@ -1,14 +1,14 @@
## PPL `project` command
## PPL `view` command

### Description
Using `project` command to materialize a query into a dedicated view:
In some cases it is required to construct a projection view (materialized into a view) of the query results.
This projection can be later used as a source of continued queries for further slicing and dicing the data, in addition such tables can be also saved into a MV table that are pushed into OpenSearch and can be used for visualization and enhanced performant queries.
Use the `view` command to materialize a query into a dedicated view.
In some cases it is necessary to construct a materialized view of the query results; this view can later be used as the source of follow-up queries for further slicing and dicing of the data.
Such views can also be saved as an MV table that is pushed into OpenSearch and used for visualization and for faster queries.

The command can also function as an ETL process where the original datasource will be transformed and ingested into the output projected view using the ppl transformation and aggregation operators
The command can also function as an ETL process in which the original datasource is transformed and ingested into the output view using the PPL transformation and aggregation operators.
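
As a minimal sketch of that workflow (the table and view names are illustrative, and the follow-up query assumes the materialized view can be referenced as an ordinary table source):

```sql
view highValueAccounts using csv | source = accounts | where balance > 10000 | fields account_id, balance, state

source = highValueAccounts | stats count() by state
```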

### Syntax
`PROJECT (IF NOT EXISTS)? viewName (USING datasource)? (OPTIONS optionsList)? (PARTITIONED BY partitionColumnNames)? location?`
`VIEW (IF NOT EXISTS)? viewName (USING datasource)? (OPTIONS optionsList)? (PARTITIONED BY partitionColumnNames)? location?`

- **viewName**
Specifies a view name, which may be optionally qualified with a database name.
@@ -29,38 +29,38 @@ Specifies the physical location where the view or table data is stored. This cou
The outcome view (viewName) is populated using the data from the select statement.

### Usage Guidelines
The project command produces a view based on the resulting rows returned from the query.
The view command produces a view based on the resulting rows returned from the query.
Any query can be used in the `AS <query>` statement, and attention must be paid to the volume of data and the compute cost that such queries may incur.

As a precautions an `explain cost | source = table | ... ` can be run prior to the `project` statement to have a better estimation.
As a precaution, an `explain cost | source = table | ... ` query can be run prior to the `view` statement to obtain a better estimate.
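
For example, reusing the CSV example from the section below, the cost estimate can be inspected first and the view created only once the expected volume looks acceptable:

```sql
explain cost | source = table | where fieldA > value | stats count(fieldA) by fieldB

view newTableName using csv | source = table | where fieldA > value | stats count(fieldA) by fieldB
```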

### Examples:
```sql
project newTableName using csv | source = table | where fieldA > value | stats count(fieldA) by fieldB
view newTableName using csv | source = table | where fieldA > value | stats count(fieldA) by fieldB

project ipRanges using parquet | source = table | where isV6 = true | eval inRange = case(cidrmatch(ipAddress, '2003:db8::/32'), 'in' else 'out') | fields ip, inRange
view ipRanges using parquet | source = table | where isV6 = true | eval inRange = case(cidrmatch(ipAddress, '2003:db8::/32'), 'in' else 'out') | fields ip, inRange

project avgBridgesByCountry using json | source = table | fields country, bridges | flatten bridges | fields country, length | stats avg(length) as avg by country
view avgBridgesByCountry using json | source = table | fields country, bridges | flatten bridges | fields country, length | stats avg(length) as avg by country

project ageDistribByCountry using parquet partitioned by (age, country) |
view ageDistribByCountry using parquet partitioned by (age, country) |
source = table | stats avg(age) as avg_city_age by country, state, city | eval new_avg_city_age = avg_city_age - 1 | stats
avg(new_avg_city_age) as avg_state_age by country, state | where avg_state_age > 18 | stats avg(avg_state_age) as
avg_adult_country_age by country

project ageDistribByCountry using parquet OPTIONS('parquet.bloom.filter.enabled'='true', 'parquet.bloom.filter.enabled#age'='false') partitioned by (age, country) |
view ageDistribByCountry using parquet OPTIONS('parquet.bloom.filter.enabled'='true', 'parquet.bloom.filter.enabled#age'='false') partitioned by (age, country) |
source = table | stats avg(age) as avg_city_age by country, state, city | eval new_avg_city_age = avg_city_age - 1 | stats
avg(new_avg_city_age) as avg_state_age by country, state | where avg_state_age > 18 | stats avg(avg_state_age) as
avg_adult_country_age by country

project ageDistribByCountry using parquet OPTIONS('parquet.bloom.filter.enabled'='true', 'parquet.bloom.filter.enabled#age'='false') partitioned by (age, country) location 's://demo-app/my-bucket'|
view ageDistribByCountry using parquet OPTIONS('parquet.bloom.filter.enabled'='true', 'parquet.bloom.filter.enabled#age'='false') partitioned by (age, country) location 's://demo-app/my-bucket'|
source = table | stats avg(age) as avg_city_age by country, state, city | eval new_avg_city_age = avg_city_age - 1 | stats
avg(new_avg_city_age) as avg_state_age by country, state | where avg_state_age > 18 | stats avg(avg_state_age) as
avg_adult_country_age by country

```

### Effective SQL push-down query
The project command is translated into an equivalent SQL `create table <viewName> [Using <datasuorce>] As <statement>` as shown here:
The view command is translated into an equivalent SQL `create table <viewName> [Using <datasource>] As <statement>`, as shown here:

```sql
CREATE TABLE [ IF NOT EXISTS ] table_identifier
    ...
```
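
For instance, under this translation the first CSV example above would correspond to something like the following Spark SQL; this is only a sketch using the placeholder names from that example, and the actual generated statement depends on the catalog and datasource:

```sql
-- illustrative push-down for: view newTableName using csv | source = table | where fieldA > value | stats count(fieldA) by fieldB
CREATE TABLE newTableName
USING csv
AS SELECT fieldB, COUNT(fieldA)
FROM `table`
WHERE fieldA > value
GROUP BY fieldB
```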
@@ -18,7 +18,7 @@ import org.apache.spark.sql.execution.ExplainMode
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExplainCommand}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLProjectStatementITSuite
class FlintSparkPPLViewStatementITSuite
extends QueryTest
with LogicalPlanTestUtils
with FlintPPLSuite
@@ -78,7 +78,7 @@ class FlintSparkPPLProjectStatementITSuite
}
}

ignore("project sql test using csv") {
ignore("view sql test using csv") {
val viewLocation = viewFolderLocation.toAbsolutePath.toString
val frame = sql(s"""
| CREATE TABLE student_partition_bucket
@@ -107,9 +107,9 @@ class FlintSparkPPLProjectStatementITSuite
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("project using csv") {
test("view using csv") {
val frame = sql(s"""
| project $viewName using csv | source = $testTable | where state != 'California' | fields name
| view $viewName using csv | source = $testTable | where state != 'California' | fields name
| """.stripMargin)

// Retrieve the logical plan
@@ -168,9 +168,9 @@ class FlintSparkPPLProjectStatementITSuite
"Partitioning does not contain ay FieldReferences")
}

test("project using csv partition by age") {
test("view using csv partition by age") {
val frame = sql(s"""
| project $viewName using csv partitioned by (age) | source = $testTable | where state != 'California' | fields name, age
| view $viewName using csv partitioned by (age) | source = $testTable | where state != 'California' | fields name, age
| """.stripMargin)

// Retrieve the logical plan
@@ -237,9 +237,9 @@ class FlintSparkPPLProjectStatementITSuite
"Partitioning does not contain a FieldReferences: 'age'")
}

test("project using csv partition by state and country") {
test("view using csv partition by state and country") {
val frame = sql(s"""
|project $viewName using csv partitioned by (state, country) | source = $testTable | dedup name | fields name, state, country
|view $viewName using csv partitioned by (state, country) | source = $testTable | dedup name | fields name, state, country
| """.stripMargin)

frame.collect()
@@ -347,9 +347,9 @@ class FlintSparkPPLProjectStatementITSuite
"Partitioning does not contain a FieldReferences: 'name'")
}

test("project using parquet partition by state & country") {
test("view using parquet partition by state & country") {
val frame = sql(s"""
|project $viewName using parquet partitioned by (state, country) | source = $testTable | dedup name | fields name, state, country
|view $viewName using parquet partitioned by (state, country) | source = $testTable | dedup name | fields name, state, country
| """.stripMargin)

frame.collect()
@@ -457,9 +457,9 @@ class FlintSparkPPLProjectStatementITSuite
"Partitioning does not contain a FieldReferences: 'name'")
}

test("project using parquet with options & partition by state & country") {
test("view using parquet with options & partition by state & country") {
val frame = sql(s"""
| project $viewName using parquet OPTIONS('parquet.bloom.filter.enabled'='true', 'parquet.bloom.filter.enabled#age'='false')
| view $viewName using parquet OPTIONS('parquet.bloom.filter.enabled'='true', 'parquet.bloom.filter.enabled#age'='false')
| partitioned by (state, country) | source = $testTable | dedup name | fields name, state, country
| """.stripMargin)

@@ -571,10 +571,10 @@ class FlintSparkPPLProjectStatementITSuite
"Partitioning does not contain a FieldReferences: 'name'")
}

test("project using parquet with options & location with partition by state & country") {
test("view using parquet with options & location with partition by state & country") {
val viewLocation = viewFolderLocation.toAbsolutePath.toString
val frame = sql(s"""
| project $viewName using parquet OPTIONS('parquet.bloom.filter.enabled'='true', 'parquet.bloom.filter.enabled#age'='false')
| view $viewName using parquet OPTIONS('parquet.bloom.filter.enabled'='true', 'parquet.bloom.filter.enabled#age'='false')
| partitioned by (state, country) location '$viewLocation' | source = $testTable | dedup name | fields name, state, country
| """.stripMargin)

@@ -688,7 +688,7 @@ class FlintSparkPPLProjectStatementITSuite
test("test inner join with relation subquery") {
val viewLocation = viewFolderLocation.toAbsolutePath.toString
val frame = sql(s"""
| project $viewName using parquet OPTIONS('parquet.bloom.filter.enabled'='true')
| view $viewName using parquet OPTIONS('parquet.bloom.filter.enabled'='true')
| partitioned by (age_span) location '$viewLocation'
| | source = $testTable1
| | where country = 'USA' OR country = 'England'
ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
@@ -28,7 +28,7 @@ TOP: 'TOP';
RARE_APPROX: 'RARE_APPROX';
RARE: 'RARE';
PARSE: 'PARSE';
PROJECT: 'PROJECT';
VIEW: 'VIEW';
METHOD: 'METHOD';
REGEX: 'REGEX';
PUNCT: 'PUNCT';
6 changes: 3 additions & 3 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
@@ -17,7 +17,7 @@ pplStatement
;

dmlStatement
: (explainCommand PIPE | projectCommand PIPE)? queryStatement
: (explainCommand PIPE | viewCommand PIPE)? queryStatement
;

queryStatement
@@ -194,8 +194,8 @@ grokCommand
: GROK (source_field = expression) (pattern = stringLiteral)
;

projectCommand
: PROJECT (IF NOT EXISTS)? tableQualifiedName (USING datasourceValues)? (OPTIONS options=tablePropertyList)? (PARTITIONED BY partitionColumnNames=identifierSeq)? locationSpec?
viewCommand
: VIEW (IF NOT EXISTS)? tableQualifiedName (USING datasourceValues)? (OPTIONS options=tablePropertyList)? (PARTITIONED BY partitionColumnNames=identifierSeq)? locationSpec?
;

locationSpec
@@ -20,7 +20,7 @@
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.FieldList;
import org.opensearch.sql.ast.expression.LambdaFunction;
import org.opensearch.sql.ast.statement.ProjectStatement;
import org.opensearch.sql.ast.statement.ViewStatement;
import org.opensearch.sql.ast.tree.FieldSummary;
import org.opensearch.sql.ast.expression.FieldsMapping;
import org.opensearch.sql.ast.expression.Function;
@@ -56,7 +56,7 @@
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.View;
import org.opensearch.sql.ast.tree.RareTopN;
import org.opensearch.sql.ast.tree.Relation;
import org.opensearch.sql.ast.tree.Rename;
@@ -138,11 +138,11 @@ public T visitSubqueryAlias(SubqueryAlias node, C context) {
return visitChildren(node, context);
}

public T visitProject(Project node, C context) {
public T visitView(View node, C context) {
return visitChildren(node, context);
}

public T visitProjectStatement(ProjectStatement node, C context) {
public T visitProjectStatement(ViewStatement node, C context) {
return visitChildren(node, context);
}

@@ -25,7 +25,7 @@
*/
@Data
@EqualsAndHashCode(callSuper = false)
public class ProjectStatement extends Statement implements TableIdentifier {
public class ViewStatement extends Statement implements TableIdentifier {

private final Statement statement;
private final boolean override;
@@ -36,9 +36,9 @@ public class ProjectStatement extends Statement implements TableIdentifier {
private final Optional<UnresolvedExpression> partitionColumns;
private final Optional<UnresolvedExpression> location;

public ProjectStatement(List<UnresolvedExpression> tableNames, Optional<UnresolvedExpression> using,
Optional<UnresolvedExpression> options, Optional<UnresolvedExpression> partitionColumns,
Optional<UnresolvedExpression> location, Query statement, boolean override) {
public ViewStatement(List<UnresolvedExpression> tableNames, Optional<UnresolvedExpression> using,
Optional<UnresolvedExpression> options, Optional<UnresolvedExpression> partitionColumns,
Optional<UnresolvedExpression> location, Query statement, boolean override) {
this.tableNames = tableNames;
this.using = using.map(p->DataSourceType.valueOf(p.toString()));
this.options = options;
@@ -21,18 +21,18 @@
@ToString
@Getter
@EqualsAndHashCode(callSuper = false)
public class Project extends UnresolvedPlan {
@Setter private List<UnresolvedExpression> projectList;
public class View extends UnresolvedPlan {
@Setter private List<UnresolvedExpression> viewList;
private List<Argument> argExprList;
private UnresolvedPlan child;

public Project(List<UnresolvedExpression> projectList) {
this.projectList = projectList;
public View(List<UnresolvedExpression> viewList) {
this.viewList = viewList;
this.argExprList = Collections.emptyList();
}

public Project(List<UnresolvedExpression> projectList, List<Argument> argExprList) {
this.projectList = projectList;
public View(List<UnresolvedExpression> viewList, List<Argument> argExprList) {
this.viewList = viewList;
this.argExprList = argExprList;
}

@@ -50,7 +50,7 @@ public boolean isExcluded() {
}

@Override
public Project attach(UnresolvedPlan child) {
public View attach(UnresolvedPlan child) {
this.child = child;
return this;
}
@@ -63,6 +63,6 @@ public List<UnresolvedPlan> getChild() {
@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {

return nodeVisitor.visitProject(this, context);
return nodeVisitor.visitView(this, context);
}
}
