Skip to content

Commit

Permalink
[Backport 0.5-nexus] Support table identifier contains dot with backt…
Browse files Browse the repository at this point in the history
…icks (#774)

* backport #768 to 0.5-nexus

Signed-off-by: YANGDB <[email protected]>

* support spark prior to 3.5 with its extended table identifier (existing table identifier only has 2 parts)

Signed-off-by: YANGDB <[email protected]>

---------

Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB authored Oct 12, 2024
1 parent b3adf46 commit 30a77de
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@

package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, SortOrder}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Descending, Literal, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.DescribeTableCommand
import org.apache.spark.sql.execution.ExplainMode
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExplainCommand}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLBasicITSuite
Expand All @@ -21,12 +22,20 @@ class FlintSparkPPLBasicITSuite

/** Test table and index name */
private val testTable = "spark_catalog.default.flint_ppl_test"
private val t1 = "`spark_catalog`.`default`.`flint_ppl_test1`"
private val t2 = "`spark_catalog`.default.`flint_ppl_test2`"
private val t3 = "spark_catalog.`default`.`flint_ppl_test3`"
private val t4 = "`spark_catalog`.`default`.flint_ppl_test4"

override def beforeAll(): Unit = {
super.beforeAll()

// Create test table
createPartitionedStateCountryTable(testTable)
createPartitionedStateCountryTable(t1)
createPartitionedStateCountryTable(t2)
createPartitionedStateCountryTable(t3)
createPartitionedStateCountryTable(t4)
}

protected override def afterEach(): Unit = {
Expand All @@ -39,48 +48,106 @@ class FlintSparkPPLBasicITSuite
}

test("describe (extended) table query test") {
val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`"
Seq(testTable, testTableQuoted).foreach { table =>
val frame = sql(s"""
val frame = sql(s"""
describe flint_ppl_test
""".stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row("name", "string", null),
Row("age", "int", null),
Row("state", "string", null),
Row("country", "string", null),
Row("year", "int", null),
Row("month", "int", null),
Row("# Partition Information", "", ""),
Row("# col_name", "data_type", "comment"),
Row("year", "int", null),
Row("month", "int", null))

// Convert actual results to a Set for quick lookup
val resultsSet: Set[Row] = results.toSet
// Check that each expected row is present in the actual results
expectedResults.foreach { expectedRow =>
assert(
resultsSet.contains(expectedRow),
s"Expected row $expectedRow not found in results")
}
// Retrieve the logical plan
val logicalPlan: LogicalPlan =
frame.queryExecution.commandExecuted.asInstanceOf[CommandResult].commandLogicalPlan
// Define the expected logical plan
val expectedPlan: LogicalPlan =
DescribeTableCommand(
TableIdentifier("flint_ppl_test"),
Map.empty[String, String],
isExtended = true,
output = DescribeRelation.getOutputAttrs)
// Compare the two plans
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row("name", "string", null),
Row("age", "int", null),
Row("state", "string", null),
Row("country", "string", null),
Row("year", "int", null),
Row("month", "int", null),
Row("# Partition Information", "", ""),
Row("# col_name", "data_type", "comment"),
Row("year", "int", null),
Row("month", "int", null))

// Convert actual results to a Set for quick lookup
val resultsSet: Set[Row] = results.toSet
// Check that each expected row is present in the actual results
expectedResults.foreach { expectedRow =>
assert(resultsSet.contains(expectedRow), s"Expected row $expectedRow not found in results")
}
// Retrieve the logical plan
val logicalPlan: LogicalPlan =
frame.queryExecution.commandExecuted.asInstanceOf[CommandResult].commandLogicalPlan
// Define the expected logical plan
val expectedPlan: LogicalPlan =
DescribeTableCommand(
TableIdentifier("flint_ppl_test"),
Map.empty[String, String],
isExtended = true,
output = DescribeRelation.getOutputAttrs)
// Compare the two plans
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("describe (extended) FQN (2 parts) table query test") {
val frame = sql(s"""
describe default.flint_ppl_test
""".stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row("name", "string", null),
Row("age", "int", null),
Row("state", "string", null),
Row("country", "string", null),
Row("year", "int", null),
Row("month", "int", null),
Row("# Partition Information", "", ""),
Row("# col_name", "data_type", "comment"),
Row("year", "int", null),
Row("month", "int", null))

// Convert actual results to a Set for quick lookup
val resultsSet: Set[Row] = results.toSet
// Check that each expected row is present in the actual results
expectedResults.foreach { expectedRow =>
assert(resultsSet.contains(expectedRow), s"Expected row $expectedRow not found in results")
}
// Retrieve the logical plan
val logicalPlan: LogicalPlan =
frame.queryExecution.commandExecuted.asInstanceOf[CommandResult].commandLogicalPlan
// Define the expected logical plan
val expectedPlan: LogicalPlan =
DescribeTableCommand(
TableIdentifier("flint_ppl_test", Option("default")),
Map.empty[String, String],
isExtended = true,
output = DescribeRelation.getOutputAttrs)
// Compare the two plans
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test backtick table names and name contains '.'") {
Seq(t1, t2, t3, t4).foreach { table =>
val frame = sql(s"""
| source = $table| head 2
| """.stripMargin)
assert(frame.collect().length == 2)
}
// test read table which is unable to create
val t5 = "default.`flint/ppl/test5.log`"
val t6 = "default.`flint_ppl_test6.log`"
Seq(t5, t6).foreach { table =>
val ex = intercept[AnalysisException](sql(s"""
| source = $table| head 2
| """.stripMargin))
assert(ex.getMessage().contains("Table or view not found"))
}
val t7 = "spark_catalog.default.flint_ppl_test7.log"
val ex = intercept[AnalysisException](sql(s"""
| source = $t7| head 2
| """.stripMargin))
assert(ex.getMessage().contains("spark_catalog requires a single-part namespace"))
}

test("create ppl simple query test") {
Expand Down
13 changes: 12 additions & 1 deletion ppl-spark-integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,21 @@ See the next samples of PPL queries :

**Describe**
- `describe table` This command is equal to the `DESCRIBE EXTENDED table` SQL command

- `describe schema.table`
- `` describe schema.`table` ``
- `describe catalog.schema.table`
- `` describe catalog.schema.`table` ``
- `` describe `catalog`.`schema`.`table` ``
**Fields**
- `source = table`
- `source = table | fields a,b,c`
- `source = table | fields + a,b,c`
- `source = table | fields - b,c`
- `source = table | eval b1 = b | fields - b1,c`

_- **Limitation: new field added by eval command with a function cannot be dropped in current version:**_
- `source = table | eval b1 = b + 1 | fields - b1,c` (Field `b1` cannot be dropped caused by SPARK-49782)
- `source = table | eval b1 = lower(b) | fields - b1,c` (Field `b1` cannot be dropped caused by SPARK-49782)

**Nested-Fields**
- `source = catalog.schema.table1, catalog.schema.table2 | fields A.nested1, B.nested1`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,18 @@ public Relation(UnresolvedExpression tableName, String alias) {
private String alias;

/**
* Return table name.
*
* @return table name
*/
public List<String> getTableName() {
return tableName.stream().map(Object::toString).collect(Collectors.toList());
}



public List<QualifiedName> getQualifiedNames() {
return tableName.stream().map(t -> (QualifiedName) t).collect(Collectors.toList());
}

/**
* Return alias.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@
import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
import static org.opensearch.sql.ppl.utils.DataTypeTransformer.translate;
import static org.opensearch.sql.ppl.utils.JoinSpecTransformer.join;
import static org.opensearch.sql.ppl.utils.RelationUtils.getTableIdentifier;
import static org.opensearch.sql.ppl.utils.RelationUtils.namedParts;
import static org.opensearch.sql.ppl.utils.RelationUtils.resolveField;
import static org.opensearch.sql.ppl.utils.WindowSpecTransformer.window;

Expand Down Expand Up @@ -131,28 +133,18 @@ public LogicalPlan visitExplain(Explain node, CatalystPlanContext context) {
@Override
public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) {
if (node instanceof DescribeRelation) {
TableIdentifier identifier;
if (node.getTableQualifiedName().getParts().size() == 1) {
identifier = new TableIdentifier(node.getTableQualifiedName().getParts().get(0));
} else if (node.getTableQualifiedName().getParts().size() == 2) {
identifier = new TableIdentifier(
node.getTableQualifiedName().getParts().get(1),
Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(0)));
} else {
throw new IllegalArgumentException("Invalid table name: " + node.getTableQualifiedName()
+ " Syntax: [ database_name. ] table_name");
}
TableIdentifier identifier = getTableIdentifier(node.getTableQualifiedName());
return context.with(
new DescribeTableCommand(
identifier,
scala.collection.immutable.Map$.MODULE$.<String, String>empty(),
true,
DescribeRelation$.MODULE$.getOutputAttrs()));
}
//regular sql algebraic relations
node.getTableName().forEach(t ->
//regular sql algebraic relations
node.getQualifiedNames().forEach(q ->
// Resolving the qualifiedName which is composed of a datasource.schema.table
context.withRelation(new UnresolvedRelation(seq(of(t.split("\\."))), CaseInsensitiveStringMap.empty(), false))
context.withRelation(new UnresolvedRelation(namedParts(q), CaseInsensitiveStringMap.empty(), false))
);
return context.getPlan();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package org.opensearch.sql.ppl.utils;

import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.opensearch.sql.ast.expression.QualifiedName;
import scala.Option$;
import scala.collection.Seq;

import java.util.List;
import java.util.Optional;

import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;

public interface RelationUtils {
/**
* attempt resolving if the field is relating to the given relation
Expand All @@ -15,12 +20,11 @@ public interface RelationUtils {
*
* @param relations
* @param node
* @param contextRelations
* @return
*/
static Optional<QualifiedName> resolveField(List<UnresolvedRelation> relations, QualifiedName node, List<LogicalPlan> tables) {
//when is only a single tables in the query - return the node as is to be resolved by the schema itself
if(tables.size()==1) return Optional.of(node);
if (tables.size() == 1) return Optional.of(node);
//when is more than one table in the query (union or join) - filter out nodes that dont apply to the current relation
return relations.stream()
.filter(rel -> node.getPrefix().isEmpty() ||
Expand All @@ -29,4 +33,25 @@ static Optional<QualifiedName> resolveField(List<UnresolvedRelation> relations,
.findFirst()
.map(rel -> node);
}

static TableIdentifier getTableIdentifier(QualifiedName qualifiedName) {
TableIdentifier identifier;
if (qualifiedName.getParts().isEmpty()) {
throw new IllegalArgumentException("Empty table name is invalid");
} else if (qualifiedName.getParts().size() == 1) {
identifier = new TableIdentifier(qualifiedName.getParts().get(0));
} else if (qualifiedName.getParts().size() == 2) {
identifier = new TableIdentifier(
qualifiedName.getParts().get(1),
Option$.MODULE$.apply(qualifiedName.getParts().get(0)));
} else {
throw new IllegalArgumentException("Invalid table name: " + qualifiedName
+ " Syntax: [ database_name. ] table_name");
}
return identifier;
}

static Seq<String> namedParts(QualifiedName qualifiedName) {
return seq(qualifiedName.getParts());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.scalatest.matchers.should.Matchers
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Descending, GreaterThan, Literal, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, GreaterThan, Literal, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.DescribeTableCommand
Expand All @@ -29,11 +29,37 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite
test("test error describe clause") {
val context = new CatalystPlanContext
val thrown = intercept[IllegalArgumentException] {
planTransformer.visit(plan(pplParser, "describe t.b.c.d", false), context)
planTransformer.visit(plan(pplParser, "describe b.c.d", false), context)
}

assert(
thrown.getMessage === "Invalid table name: t.b.c.d Syntax: [ database_name. ] table_name")
thrown.getMessage === "Invalid table name: b.c.d Syntax: [ database_name. ] table_name")
}

test("test describe with backticks") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(plan(pplParser, "describe b.`c.d`", false), context)

val expectedPlan = DescribeTableCommand(
TableIdentifier("c.d", Option("b")),
Map.empty[String, String].empty,
isExtended = true,
output = DescribeRelation.getOutputAttrs)
comparePlans(expectedPlan, logPlan, false)
}

test("test describe FQN table clause") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(plan(pplParser, "describe default.http_logs", false), context)

val expectedPlan = DescribeTableCommand(
TableIdentifier("http_logs", Option("default")),
Map.empty[String, String].empty,
isExtended = true,
output = DescribeRelation.getOutputAttrs)
comparePlans(expectedPlan, logPlan, false)
}

test("test simple describe clause") {
Expand All @@ -50,10 +76,10 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite

test("test FQN table describe table clause") {
val context = new CatalystPlanContext
val logPlan = planTransformer.visit(plan(pplParser, "describe catalog.t", false), context)
val logPlan = planTransformer.visit(plan(pplParser, "describe schema.t", false), context)

val expectedPlan = DescribeTableCommand(
TableIdentifier("t", Option("catalog")),
TableIdentifier("t", Option("schema")),
Map.empty[String, String].empty,
isExtended = true,
output = DescribeRelation.getOutputAttrs)
Expand Down

0 comments on commit 30a77de

Please sign in to comment.