Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PPL describe command #541

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/PPL-Correlation-command.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## PPL Correlation Command

> This is an experimental command - it may be removed in future versions


## Overview

In the past year OpenSearch Observability & security teams have been busy with many aspects of improving data monitoring and visibility.
Expand Down Expand Up @@ -262,6 +265,8 @@ The new correlation command is actually a ‘hidden’ join command therefore th

Catalyst engine will optimize this query according to the most efficient join ordering.

> This is an experimental command - it may be removed in future versions

* * *

## Appendix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.DescribeTableCommand
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLBasicITSuite
Expand Down Expand Up @@ -36,6 +38,45 @@ class FlintSparkPPLBasicITSuite
}
}

test("describe table query test") {
val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`"
Seq(testTable, testTableQuoted).foreach { table =>
val frame = sql(s"""
describe flint_ppl_test
""".stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row("name", "string", null),
Row("age", "int", null),
Row("state", "string", null),
Row("country", "string", null),
Row("year", "int", null),
Row("month", "int", null),
Row("# Partition Information", "", ""),
Row("# col_name", "data_type", "comment"),
Row("year", "int", null),
Row("month", "int", null))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))
// Retrieve the logical plan
val logicalPlan: LogicalPlan =
frame.queryExecution.commandExecuted.asInstanceOf[CommandResult].commandLogicalPlan
// Define the expected logical plan
val expectedPlan: LogicalPlan =
DescribeTableCommand(
TableIdentifier("flint_ppl_test"),
Map.empty[String, String],
isExtended = false,
output = DescribeRelation.getOutputAttrs)
// Compare the two plans
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}
}

test("create ppl simple query test") {
val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`"
Seq(testTable, testTableQuoted).foreach { table =>
Expand Down Expand Up @@ -208,7 +249,7 @@ class FlintSparkPPLBasicITSuite
val sortedPlan: LogicalPlan =
Sort(Seq(SortOrder(UnresolvedAttribute("age"), Ascending)), global = true, limitPlan)

val expectedPlan = Project(Seq(UnresolvedStar(None)), sortedPlan);
val expectedPlan = Project(Seq(UnresolvedStar(None)), sortedPlan)
// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}
Expand Down
34 changes: 9 additions & 25 deletions ppl-spark-integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ Next tasks ahead will resolve this:

This section describes the next steps planned for enabling additional commands and gamer translation.

#### Supported
The next samples of PPL queries are currently supported:
#### Example PPL Queries
See the next samples of PPL queries :

**Fields**
- `source = table`
Expand Down Expand Up @@ -272,31 +272,15 @@ Limitation: Overriding existing field is unsupported, following queries throw ex
- `source = table | stats sum(productsAmount) by span(transactionDate, 1d) as age_date | sort age_date`
- `source = table | stats sum(productsAmount) by span(transactionDate, 1w) as age_date, productId`

> For additional details, review [FlintSparkPPLTimeWindowITSuite.scala](../integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala)

#### Supported Commands:
- `search` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/search.rst)
- `where` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/where.rst)
- `fields` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/fields.rst)
- `eval` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/eval.rst)
- `head` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/head.rst)
- `stats` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/stats.rst) (supports AVG, COUNT, DISTINCT_COUNT, MAX, MIN and SUM aggregation functions)
- `sort` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/sort.rst)
- `correlation` - [See details](../docs/PPL-Correlation-command.md)

> For additional details, review [Integration Tests](../integ-test/src/test/scala/org/opensearch/flint/spark/)

---

#### Planned Support
For additional details on PPL commands - view [PPL Commands Docs](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/index.rst)

- support the `explain` command to return the explained PPL query logical plan and expected execution plan
For additional details on Spark PPL commands project, see [PPL Project](https://github.com/orgs/opensearch-project/projects/214/views/2)
For additional details on Spark PPL commands support campaign, see [PPL Commands Campaign](https://github.com/opensearch-project/opensearch-spark/issues/408)

#### Experimental Commands:
- `correlation` - [See details](../docs/PPL-Correlation-command.md)

- attend [sort](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/sort.rst) partially supported, missing capability to sort by alias field (span like or aggregation)
- attend `alias` - partially supported, missing capability to sort by / group-by alias field name
> This is an experimental command - it may be removed in future versions

- add [conditions](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/functions/condition.rst) support
- add [top](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/top.rst) support
- add [cast](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/functions/conversion.rst) support
- add [math](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/functions/math.rst) support
- add [deduplicate](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/dedup.rst) support
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ queryStatement
// commands
pplCommands
: searchCommand
| describeCommand
;

commands
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import org.opensearch.sql.ast.expression.UnresolvedExpression;

/**
* Extend Relation to describe the table itself
*/
public class DescribeRelation extends Relation{
public DescribeRelation(UnresolvedExpression tableName) {
super(tableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.ppl;

import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
Expand All @@ -13,8 +14,10 @@
import org.apache.spark.sql.catalyst.expressions.Predicate;
import org.apache.spark.sql.catalyst.expressions.SortOrder;
import org.apache.spark.sql.catalyst.plans.logical.Aggregate;
import org.apache.spark.sql.catalyst.plans.logical.DescribeRelation$;
import org.apache.spark.sql.catalyst.plans.logical.Limit;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.command.DescribeTableCommand;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.opensearch.sql.ast.AbstractNodeVisitor;
Expand Down Expand Up @@ -46,6 +49,7 @@
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Correlation;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.DescribeRelation;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
Expand All @@ -59,6 +63,7 @@
import org.opensearch.sql.ppl.utils.ComparatorTransformer;
import org.opensearch.sql.ppl.utils.SortUtils;
import scala.Option;
import scala.Option$;
import scala.collection.Seq;

import java.util.ArrayList;
Expand Down Expand Up @@ -107,6 +112,26 @@ public LogicalPlan visitExplain(Explain node, CatalystPlanContext context) {

@Override
public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) {
if (node instanceof DescribeRelation) {
TableIdentifier identifier;
if (node.getTableQualifiedName().getParts().size() == 1) {
identifier = new TableIdentifier(node.getTableQualifiedName().getParts().get(0));
} else if (node.getTableQualifiedName().getParts().size() == 2) {
identifier = new TableIdentifier(
node.getTableQualifiedName().getParts().get(1),
Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(0)));
} else {
throw new IllegalArgumentException("Invalid table name: " + node.getTableQualifiedName()
+ " Syntax: [ database_name. ] table_name");
}
return context.with(
new DescribeTableCommand(
identifier,
scala.collection.immutable.Map$.MODULE$.<String, String>empty(),
false,
DescribeRelation$.MODULE$.getOutputAttrs()));
}
//regular sql algebraic relations
node.getTableName().forEach(t ->
// Resolving the qualifiedName which is composed of a datasource.schema.table
context.with(new UnresolvedRelation(seq(of(t.split("\\."))), CaseInsensitiveStringMap.empty(), false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Correlation;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.DescribeRelation;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
Expand Down Expand Up @@ -97,7 +98,7 @@ public UnresolvedPlan visitDescribeCommand(OpenSearchPPLParser.DescribeCommandCo
final Relation table = (Relation) visitTableSourceClause(ctx.tableSourceClause());
QualifiedName tableQualifiedName = table.getTableQualifiedName();
ArrayList<String> parts = new ArrayList<>(tableQualifiedName.getParts());
return new Relation(new QualifiedName(parts));
return new DescribeRelation(new QualifiedName(parts));
}

/** Where command. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.sql.ast.statement.Explain;
import org.opensearch.sql.ast.statement.Query;
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.ast.tree.DescribeRelation;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.UnresolvedPlan;

Expand Down Expand Up @@ -78,11 +79,13 @@ public Object build() {
}
}

private UnresolvedPlan addSelectAll(UnresolvedPlan plan) {
if ((plan instanceof Project) && !((Project) plan).isExcluded()) {
return plan;
} else {
return new Project(ImmutableList.of(AllFields.of())).attach(plan);
private UnresolvedPlan addSelectAll(UnresolvedPlan plan) {
if ((plan instanceof Project) && !((Project) plan).isExcluded()) {
return plan;
} else if (plan instanceof DescribeRelation) {
return plan;
} else {
return new Project(ImmutableList.of(AllFields.of())).attach(plan);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Descending, Literal, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.DescribeTableCommand

class PPLLogicalPlanBasicQueriesTranslatorTestSuite
extends SparkFunSuite
Expand All @@ -24,6 +26,45 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite
private val planTransformer = new CatalystQueryPlanVisitor()
private val pplParser = new PPLSyntaxParser()

test("test error describe clause") {
// if successful build ppl logical plan and translate to catalyst logical plan
val context = new CatalystPlanContext
// Intercept the exception and check the message
val thrown = intercept[IllegalArgumentException] {
planTransformer.visit(plan(pplParser, "describe t.b.c.d", false), context)
}

// Verify the exception message
assert(
thrown.getMessage === "Invalid table name: t.b.c.d Syntax: [ database_name. ] table_name")
}

test("test simple describe clause") {
// if successful build ppl logical plan and translate to catalyst logical plan
val context = new CatalystPlanContext
val logPlan = planTransformer.visit(plan(pplParser, "describe t", false), context)

val expectedPlan = DescribeTableCommand(
TableIdentifier("t"),
Map.empty[String, String],
isExtended = false,
output = DescribeRelation.getOutputAttrs)
comparePlans(expectedPlan, logPlan, false)
}

test("test FQN table describe table clause") {
// if successful build ppl logical plan and translate to catalyst logical plan
val context = new CatalystPlanContext
val logPlan = planTransformer.visit(plan(pplParser, "describe catalog.t", false), context)

val expectedPlan = DescribeTableCommand(
TableIdentifier("t", Option("catalog")),
Map.empty[String, String].empty,
isExtended = false,
output = DescribeRelation.getOutputAttrs)
comparePlans(expectedPlan, logPlan, false)
}

test("test simple search with only one table and no explicit fields (defaults to all fields)") {
// if successful build ppl logical plan and translate to catalyst logical plan
val context = new CatalystPlanContext
Expand Down
Loading