
add PPL describe command
Signed-off-by: YANGDB <[email protected]>
YANG-DB committed Aug 7, 2024
1 parent a91b3ef commit 21c92a0
Showing 5 changed files with 74 additions and 2 deletions.
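
This commit adds a PPL describe command: the parser now produces a new DescribeRelation AST node, and the Catalyst visitor translates it into Spark's DescribeTableCommand wrapped in a star projection. The following sketch shows the plan shape the new tests assert; it is illustrative only and not part of the diff (the fully qualified identifier and the null partition spec mirror the committed test suites below).

// Illustrative only: the Catalyst plan expected for "describe catalog.schema.table",
// mirroring the assertions in the test suites added in this commit.
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.command.DescribeTableCommand

val expectedPlan: LogicalPlan =
  Project(
    Seq(UnresolvedStar(None)),                   // project all describe output columns
    DescribeTableCommand(
      TableIdentifier("catalog.schema.table"),   // qualified name kept as a single identifier
      null,                                      // no partition spec, as in the committed tests
      isExtended = false,
      Seq.empty))                                // no explicit output attributes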
@@ -6,9 +6,11 @@
package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.DescribeTableCommand
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLBasicITSuite
@@ -36,7 +38,29 @@ class FlintSparkPPLBasicITSuite
}
}

test("create ppl simple query test") {
test("describe table query test") {
val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`"
Seq(testTable, testTableQuoted).foreach { table =>
val frame = sql(s"""
describe $table
""".stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
assert(results.length == 2)
// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val expectedPlan: LogicalPlan =
Project(
Seq(UnresolvedStar(None)),
DescribeTableCommand(TableIdentifier("table"), null, isExtended = false, Seq.empty))
// Compare the two plans
assert(expectedPlan === logicalPlan)
}
}

test("create ppl simple query test") {
val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`"
Seq(testTable, testTableQuoted).foreach { table =>
val frame = sql(s"""
@@ -0,0 +1,17 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.ast.tree;

import org.opensearch.sql.ast.expression.UnresolvedExpression;

/**
 * Extends Relation to describe the table itself.
 */
public class DescribeRelation extends Relation {
  public DescribeRelation(UnresolvedExpression tableName) {
    super(tableName);
  }
}
@@ -5,6 +5,7 @@

package org.opensearch.sql.ppl;

import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
@@ -15,6 +16,7 @@
import org.apache.spark.sql.catalyst.plans.logical.Aggregate;
import org.apache.spark.sql.catalyst.plans.logical.Limit;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.command.DescribeTableCommand;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.opensearch.sql.ast.AbstractNodeVisitor;
@@ -46,6 +48,7 @@
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Correlation;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.DescribeRelation;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
@@ -107,6 +110,10 @@ public LogicalPlan visitExplain(Explain node, CatalystPlanContext context) {

  @Override
  public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) {
    if (node instanceof DescribeRelation) {
      return context.with(new DescribeTableCommand(new TableIdentifier(node.getTableQualifiedName().toString()), null, false, seq()));
    }
    // regular SQL algebraic relations
    node.getTableName().forEach(t ->
        // Resolving the qualified name, which is composed of datasource.schema.table
        context.with(new UnresolvedRelation(seq(of(t.split("\\."))), CaseInsensitiveStringMap.empty(), false))
@@ -28,6 +28,7 @@
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Correlation;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.DescribeRelation;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
@@ -97,7 +98,7 @@ public UnresolvedPlan visitDescribeCommand(OpenSearchPPLParser.DescribeCommandCo
    final Relation table = (Relation) visitTableSourceClause(ctx.tableSourceClause());
    QualifiedName tableQualifiedName = table.getTableQualifiedName();
    ArrayList<String> parts = new ArrayList<>(tableQualifiedName.getParts());
    return new Relation(new QualifiedName(parts));
    return new DescribeRelation(new QualifiedName(parts));
  }

  /** Where command. */
@@ -10,10 +10,12 @@ import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Descending, Literal, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.DescribeTableCommand

class PPLLogicalPlanBasicQueriesTranslatorTestSuite
    extends SparkFunSuite
@@ -24,6 +26,27 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite
  private val planTransformer = new CatalystQueryPlanVisitor()
  private val pplParser = new PPLSyntaxParser()

  test("test simple describe clause") {
    // if successful, build PPL logical plan and translate to Catalyst logical plan
    val context = new CatalystPlanContext
    val logPlan = planTransformer.visit(plan(pplParser, "describe table", false), context)

    val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None))
    val expectedPlan =
      Project(projectList, DescribeTableCommand(TableIdentifier("table"), null, isExtended = false, Seq.empty))
    comparePlans(expectedPlan, logPlan, false)
  }

  test("test FQN table describe clause") {
    // if successful, build PPL logical plan and translate to Catalyst logical plan
    val context = new CatalystPlanContext
    val logPlan = planTransformer.visit(plan(pplParser, "describe catalog.schema.table", false), context)

    val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None))
    val expectedPlan =
      Project(projectList, DescribeTableCommand(TableIdentifier("catalog.schema.table"), null, isExtended = false, Seq.empty))
    comparePlans(expectedPlan, logPlan, false)
  }

  test("test simple search with only one table and no explicit fields (defaults to all fields)") {
    // if successful, build PPL logical plan and translate to Catalyst logical plan
    val context = new CatalystPlanContext
