Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Describe show commands #533

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.DescribeTableCommand
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLBasicITSuite
Expand Down Expand Up @@ -36,7 +38,42 @@ class FlintSparkPPLBasicITSuite
}
}

test("create ppl simple query test") {
test("describe table query test") {
val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`"
Seq(testTable, testTableQuoted).foreach { table =>
val frame = sql(s"""
describe flint_ppl_test
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, we miss testing the table_identifier with quoted.

Seq("flint_ppl_test", "`flint_ppl_test`").foreach { table =>
  val frame = sql(s"""
     describe $table

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes probably need to add some more fail based use cases...

""".stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row("name", "string", null),
Row("age", "int", null),
Row("state", "string", null),
Row("country", "string", null),
Row("year", "int", null),
Row("month", "int", null),
Row("# Partition Information", "", ""),
Row("# col_name", "data_type", "comment"),
Row("year", "int", null),
Row("month", "int", null)
)
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))
// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.commandExecuted.asInstanceOf[CommandResult].commandLogicalPlan
// Define the expected logical plan
val expectedPlan: LogicalPlan =
DescribeTableCommand(TableIdentifier("default.flint_ppl_test"), null, isExtended = false, output = Seq())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@YANG-DB to make this IT test successful.
The expectedPlan should change as following either.

      val expectedPlan: LogicalPlan =
        DescribeTableCommand(
          TableIdentifier("flint_ppl_test"),
          Map.empty[String, String],
          isExtended = false,
          output = DescribeRelation.getOutputAttrs)

// Compare the two plans
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}
}

test("create ppl simple query test") {
val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`"
Seq(testTable, testTableQuoted).foreach { table =>
val frame = sql(s"""
Expand Down Expand Up @@ -208,7 +245,7 @@ class FlintSparkPPLBasicITSuite
val sortedPlan: LogicalPlan =
Sort(Seq(SortOrder(UnresolvedAttribute("age"), Ascending)), global = true, limitPlan)

val expectedPlan = Project(Seq(UnresolvedStar(None)), sortedPlan);
val expectedPlan = Project(Seq(UnresolvedStar(None)), sortedPlan)
// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ queryStatement
// commands
pplCommands
: searchCommand
| describeCommand
;

commands
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import org.opensearch.sql.ast.expression.UnresolvedExpression;

/**
* Extend Relation to describe the table itself
*/
public class DescribeRelation extends Relation{
public DescribeRelation(UnresolvedExpression tableName) {
super(tableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.ppl;

import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
Expand All @@ -13,8 +14,10 @@
import org.apache.spark.sql.catalyst.expressions.Predicate;
import org.apache.spark.sql.catalyst.expressions.SortOrder;
import org.apache.spark.sql.catalyst.plans.logical.Aggregate;
import org.apache.spark.sql.catalyst.plans.logical.DescribeRelation$;
import org.apache.spark.sql.catalyst.plans.logical.Limit;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.command.DescribeTableCommand;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.opensearch.sql.ast.AbstractNodeVisitor;
Expand Down Expand Up @@ -46,6 +49,7 @@
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Correlation;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.DescribeRelation;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
Expand Down Expand Up @@ -107,6 +111,15 @@ public LogicalPlan visitExplain(Explain node, CatalystPlanContext context) {

@Override
public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) {
if (node instanceof DescribeRelation) {
return context.with(
new DescribeTableCommand(
new TableIdentifier(node.getTableQualifiedName().toString()),
Copy link
Member

@LantaoJin LantaoJin Aug 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked the spark sql reference. The DESCRIBE command syntax is DESCRIBE [ database_name. ] table_name.
So to support database prefix, here need additional modification:

            TableIdentifier identifier;
            if (node.getTableQualifiedName().getParts().size() == 1) {
                identifier = new TableIdentifier(node.getTableQualifiedName().getParts().get(0));
            } else if (node.getTableQualifiedName().getParts().size() == 2) {
                identifier = new TableIdentifier(
                    node.getTableQualifiedName().getParts().get(1),
                    Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(0)));
            } else {
                throw new IllegalArgumentException("Invalid table name: " + node.getTableQualifiedName()
                    + " Syntax: [ database_name. ] table_name");
            }
            return context.with(
                new DescribeTableCommand(
                    identifier,
                    scala.collection.immutable.Map$.MODULE$.<String, String>empty(),
                    false,
                    DescribeRelation$.MODULE$.getOutputAttrs()));

Copy link
Member

@LantaoJin LantaoJin Aug 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we can test on Seq("flint_ppl_test", "`flint_ppl_test`", "default.flint_ppl_test", "`default`.`flint_ppl_test`")

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LantaoJin thanks again

scala.collection.immutable.Map$.MODULE$.<String, String>empty(),
false,
DescribeRelation$.MODULE$.getOutputAttrs()));
}
//regular sql algebraic relations
node.getTableName().forEach(t ->
// Resolving the qualifiedName which is composed of a datasource.schema.table
context.with(new UnresolvedRelation(seq(of(t.split("\\."))), CaseInsensitiveStringMap.empty(), false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Correlation;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.DescribeRelation;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
Expand Down Expand Up @@ -97,7 +98,7 @@ public UnresolvedPlan visitDescribeCommand(OpenSearchPPLParser.DescribeCommandCo
final Relation table = (Relation) visitTableSourceClause(ctx.tableSourceClause());
QualifiedName tableQualifiedName = table.getTableQualifiedName();
ArrayList<String> parts = new ArrayList<>(tableQualifiedName.getParts());
return new Relation(new QualifiedName(parts));
return new DescribeRelation(new QualifiedName(parts));
}

/** Where command. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.sql.ast.statement.Explain;
import org.opensearch.sql.ast.statement.Query;
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.ast.tree.DescribeRelation;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.UnresolvedPlan;

Expand Down Expand Up @@ -78,11 +79,13 @@ public Object build() {
}
}

private UnresolvedPlan addSelectAll(UnresolvedPlan plan) {
if ((plan instanceof Project) && !((Project) plan).isExcluded()) {
return plan;
} else {
return new Project(ImmutableList.of(AllFields.of())).attach(plan);
private UnresolvedPlan addSelectAll(UnresolvedPlan plan) {
if ((plan instanceof Project) && !((Project) plan).isExcluded()) {
return plan;
} else if (plan instanceof DescribeRelation) {
return plan;
} else {
return new Project(ImmutableList.of(AllFields.of())).attach(plan);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Descending, Literal, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.DescribeTableCommand

class PPLLogicalPlanBasicQueriesTranslatorTestSuite
extends SparkFunSuite
Expand All @@ -24,6 +26,25 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite
private val planTransformer = new CatalystQueryPlanVisitor()
private val pplParser = new PPLSyntaxParser()

test("test simple describe clause") {
// if successful build ppl logical plan and translate to catalyst logical plan
val context = new CatalystPlanContext
val logPlan = planTransformer.visit(plan(pplParser, "describe t", false), context)

val expectedPlan = DescribeTableCommand(TableIdentifier("t"), null, isExtended = false, Seq.empty)
comparePlans(expectedPlan, logPlan, false)
}


test("test FQN table describe table clause") {
// if successful build ppl logical plan and translate to catalyst logical plan
val context = new CatalystPlanContext
val logPlan = planTransformer.visit(plan(pplParser, "describe catalog.schema.t", false), context)

val expectedPlan = DescribeTableCommand(TableIdentifier("catalog.schema.t"), null, isExtended = false, Seq.empty)
comparePlans(expectedPlan, logPlan, false)
}

test("test simple search with only one table and no explicit fields (defaults to all fields)") {
// if successful build ppl logical plan and translate to catalyst logical plan
val context = new CatalystPlanContext
Expand Down
Loading