support spark prior to 3.5 with its extended table identifier (existing table identifier only has 2 parts)

Signed-off-by: YANGDB <[email protected]>
YANG-DB committed Oct 12, 2024
1 parent b6c4b0f commit 745f73a
Showing 4 changed files with 28 additions and 55 deletions.
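
The gist of the change, per the commit title: on Spark versions prior to 3.5, TableIdentifier carries only a table name and an optional database, so a three-part catalog.schema.table name cannot be squeezed into it. Building the UnresolvedRelation from the raw name parts sidesteps that constructor entirely and works for any number of parts, leaving resolution to Spark's analyzer. A minimal sketch of the contrast, assuming Spark catalyst and the Scala runtime on the classpath (class name and sample values are illustrative, not from the commit):

import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import scala.Option$;
import scala.collection.JavaConverters;

import java.util.Arrays;
import java.util.List;

class TableIdentifierSketch {
    public static void main(String[] args) {
        // Illustrative three-part name: catalog.schema.table
        List<String> parts = Arrays.asList("spark_catalog", "default", "flint_ppl_test");

        // Pre-3.5, TableIdentifier only takes a table plus an optional
        // database, so the catalog part has nowhere to go:
        TableIdentifier twoPart = new TableIdentifier(
                parts.get(2), Option$.MODULE$.apply(parts.get(1)));

        // Handing the parts straight to UnresolvedRelation keeps the full
        // namespace and defers resolution to Spark's analyzer:
        UnresolvedRelation relation = new UnresolvedRelation(
                JavaConverters.asScalaBuffer(parts).toSeq(),
                CaseInsensitiveStringMap.empty(),
                false);

        System.out.println(twoPart.quotedString());          // `default`.`flint_ppl_test`
        System.out.println(relation.multipartIdentifier());  // all three parts preserved
    }
}
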
FlintSparkPPLBasicITSuite.scala
@@ -26,7 +26,7 @@ class FlintSparkPPLBasicITSuite
   private val t2 = "`spark_catalog`.default.`flint_ppl_test2`"
   private val t3 = "spark_catalog.`default`.`flint_ppl_test3`"
   private val t4 = "`spark_catalog`.`default`.flint_ppl_test4"

   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -129,54 +129,25 @@ class FlintSparkPPLBasicITSuite

   test("test backtick table names and name contains '.'") {
     Seq(t1, t2, t3, t4).foreach { table =>
-      val frame = sql(
-        s"""
+      val frame = sql(s"""
           | source = $table| head 2
           | """.stripMargin)
       assert(frame.collect().length == 2)
     }
     // test read table which is unable to create
-    val t5 = "`spark_catalog`.default.`flint/ppl/test5.log`"
-    val t6 = "spark_catalog.default.`flint_ppl_test6.log`"
+    val t5 = "default.`flint/ppl/test5.log`"
+    val t6 = "default.`flint_ppl_test6.log`"
     Seq(t5, t6).foreach { table =>
-      val ex = intercept[AnalysisException](sql(
-        s"""
+      val ex = intercept[AnalysisException](sql(s"""
           | source = $table| head 2
           | """.stripMargin))
-      assert(ex.getMessage().contains("TABLE_OR_VIEW_NOT_FOUND"))
+      assert(ex.getMessage().contains("Table or view not found"))
     }
     val t7 = "spark_catalog.default.flint_ppl_test7.log"
-    val ex = intercept[IllegalArgumentException](sql(
-      s"""
+    val ex = intercept[AnalysisException](sql(s"""
         | source = $t7| head 2
         | """.stripMargin))
-    assert(ex.getMessage().contains("Invalid table name"))
-  }
-
-  test("test describe backtick table names and name contains '.'") {
-    Seq(t1, t2, t3, t4).foreach { table =>
-      val frame = sql(
-        s"""
-           | describe $table
-           | """.stripMargin)
-      assert(frame.collect().length > 0)
-    }
-    // test read table which is unable to create
-    val t5 = "`spark_catalog`.default.`flint/ppl/test5.log`"
-    val t6 = "spark_catalog.default.`flint_ppl_test6.log`"
-    Seq(t5, t6).foreach { table =>
-      val ex = intercept[AnalysisException](sql(
-        s"""
-           | describe $table
-           | """.stripMargin))
-      assert(ex.getMessage().contains("TABLE_OR_VIEW_NOT_FOUND"))
-    }
-    val t7 = "spark_catalog.default.flint_ppl_test7.log"
-    val ex = intercept[IllegalArgumentException](sql(
-      s"""
-         | describe $t7
-         | """.stripMargin))
-    assert(ex.getMessage().contains("Invalid table name"))
+    assert(ex.getMessage().contains("spark_catalog requires a single-part namespace"))
   }

   test("create ppl simple query test") {
CatalystQueryPlanVisitor.java
@@ -98,6 +98,7 @@
 import static org.opensearch.sql.ppl.utils.DataTypeTransformer.translate;
 import static org.opensearch.sql.ppl.utils.JoinSpecTransformer.join;
 import static org.opensearch.sql.ppl.utils.RelationUtils.getTableIdentifier;
+import static org.opensearch.sql.ppl.utils.RelationUtils.namedParts;
 import static org.opensearch.sql.ppl.utils.RelationUtils.resolveField;
 import static org.opensearch.sql.ppl.utils.WindowSpecTransformer.window;

@@ -143,7 +144,7 @@ public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) {
         //regular sql algebraic relations
         node.getQualifiedNames().forEach(q ->
                 // Resolving the qualifiedName which is composed of a datasource.schema.table
-                context.withRelation(new UnresolvedRelation(getTableIdentifier(q).nameParts(), CaseInsensitiveStringMap.empty(), false))
+                context.withRelation(new UnresolvedRelation(namedParts(q), CaseInsensitiveStringMap.empty(), false))
         );
         return context.getPlan();
     }
RelationUtils.java
@@ -5,10 +5,13 @@
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
 import org.opensearch.sql.ast.expression.QualifiedName;
 import scala.Option$;
+import scala.collection.Seq;

 import java.util.List;
 import java.util.Optional;

+import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
+
 public interface RelationUtils {
     /**
      * attempt resolving if the field is relating to the given relation
@@ -21,7 +24,7 @@ public interface RelationUtils {
      */
     static Optional<QualifiedName> resolveField(List<UnresolvedRelation> relations, QualifiedName node, List<LogicalPlan> tables) {
         //when is only a single tables in the query - return the node as is to be resolved by the schema itself
-        if(tables.size()==1) return Optional.of(node);
+        if (tables.size() == 1) return Optional.of(node);
         //when is more than one table in the query (union or join) - filter out nodes that dont apply to the current relation
         return relations.stream()
                 .filter(rel -> node.getPrefix().isEmpty() ||
@@ -30,7 +33,7 @@ static Optional<QualifiedName> resolveField(List<UnresolvedRelation> relations,
                 .findFirst()
                 .map(rel -> node);
     }

     static TableIdentifier getTableIdentifier(QualifiedName qualifiedName) {
         TableIdentifier identifier;
         if (qualifiedName.getParts().isEmpty()) {
@@ -41,15 +44,14 @@ static TableIdentifier getTableIdentifier(QualifiedName qualifiedName) {
             identifier = new TableIdentifier(
                     qualifiedName.getParts().get(1),
                     Option$.MODULE$.apply(qualifiedName.getParts().get(0)));
-        } else if (qualifiedName.getParts().size() == 3) {
-            identifier = new TableIdentifier(
-                    qualifiedName.getParts().get(2),
-                    Option$.MODULE$.apply(qualifiedName.getParts().get(1)),
-                    Option$.MODULE$.apply(qualifiedName.getParts().get(0)));
         } else {
             throw new IllegalArgumentException("Invalid table name: " + qualifiedName
                     + " Syntax: [ database_name. ] table_name");
         }
         return identifier;
     }
+
+    static Seq<String> namedParts(QualifiedName qualifiedName) {
+        return seq(qualifiedName.getParts());
+    }
 }
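
For reference, a usage sketch of the new helper: namedParts converts the Java parts list of a QualifiedName into a Scala Seq via the repo's DataTypeTransformer.seq, preserving however many parts the name has. The QualifiedName.of factory is an assumption made for illustration; only namedParts itself comes from this change:

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.opensearch.sql.ast.expression.QualifiedName;
import scala.collection.Seq;

import static org.opensearch.sql.ppl.utils.RelationUtils.namedParts;

class NamedPartsSketch {
    public static void main(String[] args) {
        // Hypothetical construction of the AST node; QualifiedName.of is
        // assumed to build a name from its parts.
        QualifiedName name = QualifiedName.of("spark_catalog", "default", "flint_ppl_test");

        // All parts survive the conversion, whether there are 1, 2, or 3.
        Seq<String> parts = namedParts(name);

        // The visitor then feeds the parts to UnresolvedRelation unchanged,
        // which is what keeps pre-3.5 Spark working: no TableIdentifier needed.
        UnresolvedRelation relation =
                new UnresolvedRelation(parts, CaseInsensitiveStringMap.empty(), false);

        System.out.println(relation.multipartIdentifier());
    }
}
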
PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala
@@ -6,14 +6,13 @@
 package org.opensearch.flint.spark.ppl

 import org.opensearch.flint.spark.ppl.PlaneUtils.plan
-import org.opensearch.sql.common.antlr.SyntaxCheckException
 import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
 import org.scalatest.matchers.should.Matchers

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
-import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Descending, GreaterThan, Literal, NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, GreaterThan, Literal, NamedExpression, SortOrder}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command.DescribeTableCommand
@@ -30,33 +29,33 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite
test("test error describe clause") {
val context = new CatalystPlanContext
val thrown = intercept[IllegalArgumentException] {
planTransformer.visit(plan(pplParser, "describe t.b.c.d", false), context)
planTransformer.visit(plan(pplParser, "describe b.c.d", false), context)
}

assert(
thrown.getMessage === "Invalid table name: t.b.c.d Syntax: [ database_name. ] table_name")
thrown.getMessage === "Invalid table name: b.c.d Syntax: [ database_name. ] table_name")
}

test("test describe with backticks") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(plan(pplParser, "describe t.b.`c.d`", false), context)
planTransformer.visit(plan(pplParser, "describe b.`c.d`", false), context)

val expectedPlan = DescribeTableCommand(
TableIdentifier("c.d", Option("b"), Option("t")),
TableIdentifier("c.d", Option("b")),
Map.empty[String, String].empty,
isExtended = true,
output = DescribeRelation.getOutputAttrs)
comparePlans(expectedPlan, logPlan, false)
}

test("test describe FQN table clause") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(plan(pplParser, "describe catalog.schema.http_logs", false), context)
planTransformer.visit(plan(pplParser, "describe default.http_logs", false), context)

val expectedPlan = DescribeTableCommand(
TableIdentifier("http_logs", Option("schema"), Option("catalog")),
TableIdentifier("http_logs", Option("default")),
Map.empty[String, String].empty,
isExtended = true,
output = DescribeRelation.getOutputAttrs)
Expand Down
