Skip to content

Commit

Permalink
Add PPL describe command (opensearch-project#541)
Browse files Browse the repository at this point in the history
update correlation comment as experimental

Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB authored Aug 8, 2024
1 parent 773ad22 commit c2e4020
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 34 deletions.
5 changes: 5 additions & 0 deletions docs/PPL-Correlation-command.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## PPL Correlation Command

> This is an experimental command - it may be removed in future versions

## Overview

In the past year OpenSearch Observability & security teams have been busy with many aspects of improving data monitoring and visibility.
Expand Down Expand Up @@ -262,6 +265,8 @@ The new correlation command is actually a ‘hidden’ join command therefore th

Catalyst engine will optimize this query according to the most efficient join ordering.

> This is an experimental command - it may be removed in future versions
* * *

## Appendix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.DescribeTableCommand
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLBasicITSuite
Expand Down Expand Up @@ -36,6 +38,45 @@ class FlintSparkPPLBasicITSuite
}
}

test("describe table query test") {
val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`"
Seq(testTable, testTableQuoted).foreach { table =>
val frame = sql(s"""
describe flint_ppl_test
""".stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row("name", "string", null),
Row("age", "int", null),
Row("state", "string", null),
Row("country", "string", null),
Row("year", "int", null),
Row("month", "int", null),
Row("# Partition Information", "", ""),
Row("# col_name", "data_type", "comment"),
Row("year", "int", null),
Row("month", "int", null))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))
// Retrieve the logical plan
val logicalPlan: LogicalPlan =
frame.queryExecution.commandExecuted.asInstanceOf[CommandResult].commandLogicalPlan
// Define the expected logical plan
val expectedPlan: LogicalPlan =
DescribeTableCommand(
TableIdentifier("flint_ppl_test"),
Map.empty[String, String],
isExtended = false,
output = DescribeRelation.getOutputAttrs)
// Compare the two plans
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}
}

test("create ppl simple query test") {
val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`"
Seq(testTable, testTableQuoted).foreach { table =>
Expand Down Expand Up @@ -208,7 +249,7 @@ class FlintSparkPPLBasicITSuite
val sortedPlan: LogicalPlan =
Sort(Seq(SortOrder(UnresolvedAttribute("age"), Ascending)), global = true, limitPlan)

val expectedPlan = Project(Seq(UnresolvedStar(None)), sortedPlan);
val expectedPlan = Project(Seq(UnresolvedStar(None)), sortedPlan)
// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}
Expand Down
34 changes: 9 additions & 25 deletions ppl-spark-integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ Next tasks ahead will resolve this:

This section describes the next steps planned for enabling additional commands and gamer translation.

#### Supported
The next samples of PPL queries are currently supported:
#### Example PPL Queries
See the next samples of PPL queries :

**Fields**
- `source = table`
Expand Down Expand Up @@ -272,31 +272,15 @@ Limitation: Overriding existing field is unsupported, following queries throw ex
- `source = table | stats sum(productsAmount) by span(transactionDate, 1d) as age_date | sort age_date`
- `source = table | stats sum(productsAmount) by span(transactionDate, 1w) as age_date, productId`

> For additional details, review [FlintSparkPPLTimeWindowITSuite.scala](../integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala)
#### Supported Commands:
- `search` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/search.rst)
- `where` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/where.rst)
- `fields` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/fields.rst)
- `eval` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/eval.rst)
- `head` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/head.rst)
- `stats` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/stats.rst) (supports AVG, COUNT, DISTINCT_COUNT, MAX, MIN and SUM aggregation functions)
- `sort` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/sort.rst)
- `correlation` - [See details](../docs/PPL-Correlation-command.md)

> For additional details, review [Integration Tests](../integ-test/src/test/scala/org/opensearch/flint/spark/)
---

#### Planned Support
For additional details on PPL commands - view [PPL Commands Docs](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/index.rst)

- support the `explain` command to return the explained PPL query logical plan and expected execution plan
For additional details on Spark PPL commands project, see [PPL Project](https://github.com/orgs/opensearch-project/projects/214/views/2)
For additional details on Spark PPL commands support campaign, see [PPL Commands Campaign](https://github.com/opensearch-project/opensearch-spark/issues/408)

#### Experimental Commands:
- `correlation` - [See details](../docs/PPL-Correlation-command.md)

- attend [sort](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/sort.rst) partially supported, missing capability to sort by alias field (span like or aggregation)
- attend `alias` - partially supported, missing capability to sort by / group-by alias field name
> This is an experimental command - it may be removed in future versions
- add [conditions](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/functions/condition.rst) support
- add [top](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/top.rst) support
- add [cast](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/functions/conversion.rst) support
- add [math](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/functions/math.rst) support
- add [deduplicate](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/dedup.rst) support
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ queryStatement
// commands
pplCommands
: searchCommand
| describeCommand
;

commands
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import org.opensearch.sql.ast.expression.UnresolvedExpression;

/**
* Extend Relation to describe the table itself
*/
public class DescribeRelation extends Relation{
public DescribeRelation(UnresolvedExpression tableName) {
super(tableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.ppl;

import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
Expand All @@ -13,8 +14,10 @@
import org.apache.spark.sql.catalyst.expressions.Predicate;
import org.apache.spark.sql.catalyst.expressions.SortOrder;
import org.apache.spark.sql.catalyst.plans.logical.Aggregate;
import org.apache.spark.sql.catalyst.plans.logical.DescribeRelation$;
import org.apache.spark.sql.catalyst.plans.logical.Limit;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.command.DescribeTableCommand;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.opensearch.sql.ast.AbstractNodeVisitor;
Expand Down Expand Up @@ -46,6 +49,7 @@
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Correlation;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.DescribeRelation;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
Expand All @@ -59,6 +63,7 @@
import org.opensearch.sql.ppl.utils.ComparatorTransformer;
import org.opensearch.sql.ppl.utils.SortUtils;
import scala.Option;
import scala.Option$;
import scala.collection.Seq;

import java.util.ArrayList;
Expand Down Expand Up @@ -107,6 +112,26 @@ public LogicalPlan visitExplain(Explain node, CatalystPlanContext context) {

@Override
public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) {
if (node instanceof DescribeRelation) {
TableIdentifier identifier;
if (node.getTableQualifiedName().getParts().size() == 1) {
identifier = new TableIdentifier(node.getTableQualifiedName().getParts().get(0));
} else if (node.getTableQualifiedName().getParts().size() == 2) {
identifier = new TableIdentifier(
node.getTableQualifiedName().getParts().get(1),
Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(0)));
} else {
throw new IllegalArgumentException("Invalid table name: " + node.getTableQualifiedName()
+ " Syntax: [ database_name. ] table_name");
}
return context.with(
new DescribeTableCommand(
identifier,
scala.collection.immutable.Map$.MODULE$.<String, String>empty(),
false,
DescribeRelation$.MODULE$.getOutputAttrs()));
}
//regular sql algebraic relations
node.getTableName().forEach(t ->
// Resolving the qualifiedName which is composed of a datasource.schema.table
context.with(new UnresolvedRelation(seq(of(t.split("\\."))), CaseInsensitiveStringMap.empty(), false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Correlation;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.DescribeRelation;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
Expand Down Expand Up @@ -97,7 +98,7 @@ public UnresolvedPlan visitDescribeCommand(OpenSearchPPLParser.DescribeCommandCo
final Relation table = (Relation) visitTableSourceClause(ctx.tableSourceClause());
QualifiedName tableQualifiedName = table.getTableQualifiedName();
ArrayList<String> parts = new ArrayList<>(tableQualifiedName.getParts());
return new Relation(new QualifiedName(parts));
return new DescribeRelation(new QualifiedName(parts));
}

/** Where command. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.sql.ast.statement.Explain;
import org.opensearch.sql.ast.statement.Query;
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.ast.tree.DescribeRelation;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.UnresolvedPlan;

Expand Down Expand Up @@ -78,11 +79,13 @@ public Object build() {
}
}

private UnresolvedPlan addSelectAll(UnresolvedPlan plan) {
if ((plan instanceof Project) && !((Project) plan).isExcluded()) {
return plan;
} else {
return new Project(ImmutableList.of(AllFields.of())).attach(plan);
private UnresolvedPlan addSelectAll(UnresolvedPlan plan) {
if ((plan instanceof Project) && !((Project) plan).isExcluded()) {
return plan;
} else if (plan instanceof DescribeRelation) {
return plan;
} else {
return new Project(ImmutableList.of(AllFields.of())).attach(plan);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Descending, Literal, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.DescribeTableCommand

class PPLLogicalPlanBasicQueriesTranslatorTestSuite
extends SparkFunSuite
Expand All @@ -24,6 +26,45 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite
private val planTransformer = new CatalystQueryPlanVisitor()
private val pplParser = new PPLSyntaxParser()

test("test error describe clause") {
// if successful build ppl logical plan and translate to catalyst logical plan
val context = new CatalystPlanContext
// Intercept the exception and check the message
val thrown = intercept[IllegalArgumentException] {
planTransformer.visit(plan(pplParser, "describe t.b.c.d", false), context)
}

// Verify the exception message
assert(
thrown.getMessage === "Invalid table name: t.b.c.d Syntax: [ database_name. ] table_name")
}

test("test simple describe clause") {
// if successful build ppl logical plan and translate to catalyst logical plan
val context = new CatalystPlanContext
val logPlan = planTransformer.visit(plan(pplParser, "describe t", false), context)

val expectedPlan = DescribeTableCommand(
TableIdentifier("t"),
Map.empty[String, String],
isExtended = false,
output = DescribeRelation.getOutputAttrs)
comparePlans(expectedPlan, logPlan, false)
}

test("test FQN table describe table clause") {
// if successful build ppl logical plan and translate to catalyst logical plan
val context = new CatalystPlanContext
val logPlan = planTransformer.visit(plan(pplParser, "describe catalog.t", false), context)

val expectedPlan = DescribeTableCommand(
TableIdentifier("t", Option("catalog")),
Map.empty[String, String].empty,
isExtended = false,
output = DescribeRelation.getOutputAttrs)
comparePlans(expectedPlan, logPlan, false)
}

test("test simple search with only one table and no explicit fields (defaults to all fields)") {
// if successful build ppl logical plan and translate to catalyst logical plan
val context = new CatalystPlanContext
Expand Down

0 comments on commit c2e4020

Please sign in to comment.