diff --git a/README.md b/README.md index 2a3754e6c..f074e4427 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,7 @@ bin/spark-shell --packages "org.opensearch:opensearch-spark-standalone_2.12:0.7. To build and run this PPL in Spark, you can run (requires Java 11): ``` - +sbt clean sparkPPLCosmetic/publishM2 ``` diff --git a/build.sbt b/build.sbt index 131fb2347..724d348ae 100644 --- a/build.sbt +++ b/build.sbt @@ -190,9 +190,6 @@ lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) "com.github.sbt" % "junit-interface" % "0.13.3" % "test", "org.projectlombok" % "lombok" % "1.18.30", "com.github.seancfoley" % "ipaddress" % "5.5.1", - "org.apache.commons" % "commons-lang3" % "3.17.0", - "org.apache.commons" % "commons-csv" % "1.12.0", - "com.fasterxml.jackson.core" % "jackson-annotations" % "2.14.2", ), libraryDependencies ++= deps(sparkVersion), // ANTLR settings diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/GeoIp.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/GeoIp.java index e8d351dc6..feefa6929 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/GeoIp.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/GeoIp.java @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.sql.ast.tree; import com.google.common.collect.ImmutableList; @@ -15,7 +20,6 @@ import java.util.List; import java.util.Optional; -@ToString @Getter @RequiredArgsConstructor @EqualsAndHashCode(callSuper = false) diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java index a651f83e9..04bbd6152 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java @@ -8,7 +8,6 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute; import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$; import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; -import org.apache.spark.sql.catalyst.analysis.UnresolvedStar; import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$; import org.apache.spark.sql.catalyst.expressions.CaseWhen; import org.apache.spark.sql.catalyst.expressions.Exists$; @@ -28,11 +27,31 @@ import org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable$; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.unsafe.types.UTF8String; import org.opensearch.sql.ast.AbstractNodeVisitor; -import org.opensearch.sql.ast.expression.*; +import org.opensearch.sql.ast.expression.AggregateFunction; +import org.opensearch.sql.ast.expression.Alias; +import org.opensearch.sql.ast.expression.AllFields; +import org.opensearch.sql.ast.expression.And; +import org.opensearch.sql.ast.expression.Between; +import org.opensearch.sql.ast.expression.BinaryExpression; +import org.opensearch.sql.ast.expression.Case; +import org.opensearch.sql.ast.expression.Compare; +import org.opensearch.sql.ast.expression.FieldsMapping; +import org.opensearch.sql.ast.expression.Function; +import org.opensearch.sql.ast.expression.In; +import org.opensearch.sql.ast.expression.Interval; +import org.opensearch.sql.ast.expression.IsEmpty; +import org.opensearch.sql.ast.expression.Literal; +import org.opensearch.sql.ast.expression.Not; +import org.opensearch.sql.ast.expression.Or; +import org.opensearch.sql.ast.expression.LambdaFunction; +import org.opensearch.sql.ast.expression.QualifiedName; +import org.opensearch.sql.ast.expression.Span; +import org.opensearch.sql.ast.expression.UnresolvedExpression; +import org.opensearch.sql.ast.expression.When; +import org.opensearch.sql.ast.expression.WindowFunction; +import org.opensearch.sql.ast.expression.Xor; import org.opensearch.sql.ast.expression.subquery.ExistsSubquery; import org.opensearch.sql.ast.expression.subquery.InSubquery; import org.opensearch.sql.ast.expression.subquery.ScalarSubquery; @@ -54,7 +73,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Locale; import java.util.Optional; import java.util.Stack; import java.util.function.BiFunction; diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 2832e389c..32942987a 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -5,49 +5,29 @@ package org.opensearch.sql.ppl; -import org.apache.spark.sql.catalyst.AliasIdentifier; import org.apache.spark.sql.catalyst.TableIdentifier; -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$; import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction; import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; -import org.apache.spark.sql.catalyst.analysis.UnresolvedStar; import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$; -import org.apache.spark.sql.catalyst.expressions.And; import org.apache.spark.sql.catalyst.expressions.Ascending$; -import org.apache.spark.sql.catalyst.expressions.AttributeReference; -import org.apache.spark.sql.catalyst.expressions.CreateStruct; import org.apache.spark.sql.catalyst.expressions.Descending$; -import org.apache.spark.sql.catalyst.expressions.EqualTo; -import org.apache.spark.sql.catalyst.expressions.Exp; import org.apache.spark.sql.catalyst.expressions.Explode; -import org.apache.spark.sql.catalyst.expressions.ExprId; import org.apache.spark.sql.catalyst.expressions.Expression; import org.apache.spark.sql.catalyst.expressions.GeneratorOuter; -import org.apache.spark.sql.catalyst.expressions.GreaterThan; -import org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual; -import org.apache.spark.sql.catalyst.expressions.LessThan; -import org.apache.spark.sql.catalyst.expressions.LessThanOrEqual; -import org.apache.spark.sql.catalyst.expressions.Log; import org.apache.spark.sql.catalyst.expressions.NamedExpression; -import org.apache.spark.sql.catalyst.expressions.ScalaUDF; import org.apache.spark.sql.catalyst.expressions.SortDirection; import org.apache.spark.sql.catalyst.expressions.SortOrder; import org.apache.spark.sql.catalyst.plans.logical.Aggregate; import org.apache.spark.sql.catalyst.plans.logical.DataFrameDropColumns$; import org.apache.spark.sql.catalyst.plans.logical.DescribeRelation$; -import org.apache.spark.sql.catalyst.plans.logical.DropColumns; -import org.apache.spark.sql.catalyst.plans.logical.DropColumns$; import org.apache.spark.sql.catalyst.plans.logical.Generate; import org.apache.spark.sql.catalyst.plans.logical.Limit; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; import org.apache.spark.sql.catalyst.plans.logical.Project$; -import org.apache.spark.sql.catalyst.plans.logical.Union; import org.apache.spark.sql.execution.ExplainMode; import org.apache.spark.sql.execution.command.DescribeTableCommand; import org.apache.spark.sql.execution.command.ExplainCommand; import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.opensearch.flint.spark.FlattenGenerator; import org.opensearch.sql.ast.AbstractNodeVisitor; @@ -92,8 +72,8 @@ import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.ast.tree.Window; import org.opensearch.sql.common.antlr.SyntaxCheckException; -import org.opensearch.sql.expression.function.SerializableUdf; import org.opensearch.sql.ppl.utils.FieldSummaryTransformer; +import org.opensearch.sql.ppl.utils.GeoipCatalystUtils; import org.opensearch.sql.ppl.utils.ParseTransformer; import org.opensearch.sql.ppl.utils.SortUtils; import org.opensearch.sql.ppl.utils.TrendlineCatalystUtils; @@ -101,20 +81,16 @@ import scala.None$; import scala.Option; import scala.collection.IterableLike; -import scala.collection.JavaConverters; import scala.collection.Seq; -import javax.naming.Name; import java.util.ArrayList; import java.util.List; -import java.util.Locale; import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.List.of; -import static org.opensearch.sql.ppl.CatalystPlanContext.findRelation; import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq; import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainMultipleDuplicateEvents; import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainMultipleDuplicateEventsAndKeepEmpty; @@ -593,11 +569,9 @@ public LogicalPlan visitGeoIp(GeoIp node, CatalystPlanContext context) { List attributeList = new ArrayList<>(); while (!context.getNamedParseExpressions().isEmpty()) { - Expression nextExpression = context.getNamedParseExpressions().pop(); String attributeName = nextExpression.toString(); - if (attributeList.contains(attributeName)) { throw new IllegalStateException("Duplicate attribute in GEOIP attribute list"); } @@ -605,112 +579,17 @@ public LogicalPlan visitGeoIp(GeoIp node, CatalystPlanContext context) { attributeList.add(0, attributeName); } - Expression ipAddressExpression = visitExpression(node.getIpAddress(), context); Expression fieldExpression = visitExpression(node.getField(), context); + Expression ipAddressExpression = visitExpression(node.getIpAddress(), context); - ScalaUDF ipInt = new ScalaUDF(SerializableUdf.ipToInt, - DataTypes.createDecimalType(38,0), - seq(ipAddressExpression), - seq(), - Option.empty(), - Option.apply("ip_to_int"), - false, - true); - - ScalaUDF isIpv4 = new ScalaUDF(SerializableUdf.isIpv4, - DataTypes.BooleanType, - seq(ipAddressExpression), - seq(), Option.empty(), - Option.apply("is_ipv4"), - false, - true); - - LogicalPlan joinPlan = context.apply(left -> { - LogicalPlan right = new UnresolvedRelation(seq("geoip"), CaseInsensitiveStringMap.empty(), false); - LogicalPlan leftAlias = org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias$.MODULE$.apply("t1", left); - LogicalPlan rightAlias = org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias$.MODULE$.apply("t2", right); - Optional joinCondition = Optional.of(new And( - new And( - new GreaterThanOrEqual( - ipInt, - UnresolvedAttribute$.MODULE$.apply(seq("t2","start")) - ), - new LessThan( - ipInt, - UnresolvedAttribute$.MODULE$.apply(seq("t2","end")) - ) - ), - new EqualTo( - isIpv4, - UnresolvedAttribute$.MODULE$.apply(seq("t2","ipv4")) - ) - )); - context.retainAllNamedParseExpressions(p -> p); - context.retainAllPlans(p -> p); - return join(leftAlias, - rightAlias, - Join.JoinType.INNER, - joinCondition, - new Join.JoinHint()); - }); - - org.apache.spark.sql.catalyst.plans.logical.Join lol = (org.apache.spark.sql.catalyst.plans.logical.Join) joinPlan; - System.out.println(JavaConverters.seqAsJavaListConverter(lol.right().output()).asJava().size()); - - List projectExpressions = JavaConverters.seqAsJavaListConverter(lol.left().output()).asJava() - .stream() - .map(attr -> (NamedExpression) attr) - .collect(Collectors.toList()); - - projectExpressions.add(UnresolvedStar$.MODULE$.apply(Option.>empty())); - - NamedExpression geoCol = org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply( - CreateStruct.apply(seq(createGeoIpStructFields(attributeList))), - fieldExpression.toString(), - NamedExpression.newExprId(), - seq(new java.util.ArrayList()), - Option.empty(), - seq(new java.util.ArrayList())); - - projectExpressions.add(geoCol); - - for (NamedExpression expression : projectExpressions) { - System.out.println(expression); - } - - List dropList = createGeoIpStructFields(new ArrayList<>()); - dropList.addAll(List.of( - - UnresolvedAttribute$.MODULE$.apply(seq("t2","cidr")), - UnresolvedAttribute$.MODULE$.apply(seq("t2","start")), - UnresolvedAttribute$.MODULE$.apply(seq("t2","end")), - UnresolvedAttribute$.MODULE$.apply(seq("t2","ipv4")) - )); - - context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Project(seq(projectExpressions), joinPlan)); - return context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.DataFrameDropColumns(seq(dropList), p)); - } - - private List createGeoIpStructFields(List attributeList) { - List attributeListToUse; - if (attributeList == null || attributeList.isEmpty()) { - attributeListToUse = List.of( - "country_iso_code", - "country_name", - "continent_name", - "region_iso_code", - "region_name", - "city_name", - "time_zone", - "location" - ); - } else { - attributeListToUse = attributeList; - } - - return attributeListToUse.stream() - .map(a -> UnresolvedAttribute$.MODULE$.apply(seq("t2",a.toLowerCase(Locale.ROOT)))) - .collect(Collectors.toList()); + return GeoipCatalystUtils.getGeoipLogicalPlan( + new GeoipCatalystUtils.GeoIpParameters( + fieldExpression, + ipAddressExpression, + attributeList + ), + context + ); } @Override diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/GeoipCatalystUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/GeoipCatalystUtils.java index a35114140..63b31c27b 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/GeoipCatalystUtils.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/GeoipCatalystUtils.java @@ -1,4 +1,156 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.sql.ppl.utils; +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$; +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; +import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$; +import org.apache.spark.sql.catalyst.expressions.And; +import org.apache.spark.sql.catalyst.expressions.CreateStruct; +import org.apache.spark.sql.catalyst.expressions.EqualTo; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual; +import org.apache.spark.sql.catalyst.expressions.LessThan; +import org.apache.spark.sql.catalyst.expressions.NamedExpression; +import org.apache.spark.sql.catalyst.expressions.ScalaUDF; +import org.apache.spark.sql.catalyst.plans.logical.DataFrameDropColumns; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.catalyst.plans.logical.Project; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.opensearch.sql.ast.tree.Join; +import org.opensearch.sql.expression.function.SerializableUdf; +import org.opensearch.sql.ppl.CatalystPlanContext; +import scala.Option; + +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq; +import static org.opensearch.sql.ppl.utils.JoinSpecTransformer.join; + public interface GeoipCatalystUtils { + static LogicalPlan getGeoipLogicalPlan(GeoIpParameters parameters, CatalystPlanContext context) { + applyJoin(parameters.getIpAddress(), context); + return applyProjection(parameters.getField(), parameters.getProperties(), context); + } + + static private ScalaUDF getIpInt(Expression ipAddress) { + return new ScalaUDF(SerializableUdf.ipToInt, + DataTypes.createDecimalType(38,0), + seq(ipAddress), + seq(), + Option.empty(), + Option.apply("ip_to_int"), + false, + true + ); + } + + static private ScalaUDF getIsIpv4(Expression ipAddress) { + return new ScalaUDF(SerializableUdf.isIpv4, + DataTypes.BooleanType, + seq(ipAddress), + seq(), Option.empty(), + Option.apply("is_ipv4"), + false, + true + ); + } + + static LogicalPlan applyJoin(Expression ipAddress, CatalystPlanContext context) { + return context.apply(left -> { + LogicalPlan right = new UnresolvedRelation(seq("geoip"), CaseInsensitiveStringMap.empty(), false); + LogicalPlan leftAlias = org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias$.MODULE$.apply("t1", left); + LogicalPlan rightAlias = org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias$.MODULE$.apply("t2", right); + Optional joinCondition = Optional.of(new And( + new And( + new GreaterThanOrEqual( + getIpInt(ipAddress), + UnresolvedAttribute$.MODULE$.apply(seq("t2","start")) + ), + new LessThan( + getIpInt(ipAddress), + UnresolvedAttribute$.MODULE$.apply(seq("t2","end")) + ) + ), + new EqualTo( + getIsIpv4(ipAddress), + UnresolvedAttribute$.MODULE$.apply(seq("t2","ipv4")) + ) + )); + context.retainAllNamedParseExpressions(p -> p); + context.retainAllPlans(p -> p); + return join(leftAlias, + rightAlias, + Join.JoinType.INNER, + joinCondition, + new Join.JoinHint()); + }); + } + + static private LogicalPlan applyProjection(Expression field, List properties, CatalystPlanContext context) { + List projectExpressions = new ArrayList<>(); + projectExpressions.add(UnresolvedStar$.MODULE$.apply(Option.empty())); + + NamedExpression geoCol = org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply( + CreateStruct.apply(seq(createGeoIpStructFields(properties))), + field.toString(), + NamedExpression.newExprId(), + seq(new java.util.ArrayList<>()), + Option.empty(), + seq(new java.util.ArrayList<>())); + + projectExpressions.add(geoCol); + + List dropList = createGeoIpStructFields(new ArrayList<>()); + dropList.addAll(List.of( + + UnresolvedAttribute$.MODULE$.apply(seq("t2","cidr")), + UnresolvedAttribute$.MODULE$.apply(seq("t2","start")), + UnresolvedAttribute$.MODULE$.apply(seq("t2","end")), + UnresolvedAttribute$.MODULE$.apply(seq("t2","ipv4")) + )); + + context.apply(p -> new Project(seq(projectExpressions), p)); + return context.apply(p -> new DataFrameDropColumns(seq(dropList), p)); + } + + static private List createGeoIpStructFields(List attributeList) { + List attributeListToUse; + if (attributeList == null || attributeList.isEmpty()) { + attributeListToUse = List.of( + "country_iso_code", + "country_name", + "continent_name", + "region_iso_code", + "region_name", + "city_name", + "time_zone", + "location" + ); + } else { + attributeListToUse = attributeList; + } + + return attributeListToUse.stream() + .map(a -> UnresolvedAttribute$.MODULE$.apply(seq("t2",a.toLowerCase(Locale.ROOT)))) + .collect(Collectors.toList()); + } + + @Getter + @AllArgsConstructor + class GeoIpParameters { + private final Expression field; + private final Expression ipAddress; + private final List properties; + } } diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanEvalTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanEvalTranslatorTestSuite.scala index 1b61dc98f..3fe65f512 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanEvalTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanEvalTranslatorTestSuite.scala @@ -6,7 +6,6 @@ package org.opensearch.flint.spark.ppl import org.opensearch.flint.spark.ppl.PlaneUtils.plan -import org.opensearch.sql.expression.function.SerializableUdf import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq import org.scalatest.matchers.should.Matchers @@ -27,243 +26,192 @@ class PPLLogicalPlanEvalTranslatorTestSuite private val planTransformer = new CatalystQueryPlanVisitor() private val pplParser = new PPLSyntaxParser() -// test("test eval expressions not included in fields expressions") { -// val context = new CatalystPlanContext -// val logPlan = -// planTransformer.visit(plan(pplParser, "source=t | eval a = 1, b = 1 | fields c"), context) -// val evalProjectList: Seq[NamedExpression] = -// Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(Literal(1), "b")()) -// val expectedPlan = Project( -// seq(UnresolvedAttribute("c")), -// Project(evalProjectList, UnresolvedRelation(Seq("t")))) -// comparePlans(expectedPlan, logPlan, checkAnalysis = false) -// } + test("test eval expressions not included in fields expressions") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit(plan(pplParser, "source=t | eval a = 1, b = 1 | fields c"), context) + val evalProjectList: Seq[NamedExpression] = + Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(Literal(1), "b")()) + val expectedPlan = Project( + seq(UnresolvedAttribute("c")), + Project(evalProjectList, UnresolvedRelation(Seq("t")))) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test eval expressions included in fields expression") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan(pplParser, "source=t | eval a = 1, c = 1 | fields a, b, c"), + context) -// test("test eval expressions included in fields expression") { -// val context = new CatalystPlanContext -// val logPlan = -// planTransformer.visit( -// plan(pplParser, "source=t | eval a = 1, c = 1 | fields a, b, c"), -// context) -// -// val evalProjectList: Seq[NamedExpression] = -// Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(Literal(1), "c")()) -// val expectedPlan = Project( -// seq(UnresolvedAttribute("a"), UnresolvedAttribute("b"), UnresolvedAttribute("c")), -// Project(evalProjectList, UnresolvedRelation(Seq("t")))) -// comparePlans(expectedPlan, logPlan, checkAnalysis = false) -// } -// -// test("test eval expressions without fields command") { -// val context = new CatalystPlanContext -// val logPlan = -// planTransformer.visit(plan(pplParser, "source=t | eval a = 1, b = 1"), context) -// -// val evalProjectList: Seq[NamedExpression] = -// Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(Literal(1), "b")()) -// val expectedPlan = -// Project(seq(UnresolvedStar(None)), Project(evalProjectList, UnresolvedRelation(Seq("t")))) -// comparePlans(expectedPlan, logPlan, checkAnalysis = false) -// } -// -// test("test eval expressions with sort") { -// val context = new CatalystPlanContext -// val logPlan = -// planTransformer.visit( -// plan(pplParser, "source=t | eval a = 1, b = 1 | sort - a | fields b"), -// context) -// -// val evalProjectList: Seq[NamedExpression] = -// Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(Literal(1), "b")()) -// val evalProject = Project(evalProjectList, UnresolvedRelation(Seq("t"))) -// val sortOrder = SortOrder(UnresolvedAttribute("a"), Descending, Seq.empty) -// val sort = Sort(seq(sortOrder), global = true, evalProject) -// val expectedPlan = Project(seq(UnresolvedAttribute("b")), sort) -// comparePlans(expectedPlan, logPlan, checkAnalysis = false) -// } -// -// test("test eval expressions with multiple recursive sort") { -// val context = new CatalystPlanContext -// val logPlan = -// planTransformer.visit( -// plan(pplParser, "source=t | eval a = 1, a = a | sort - a | fields b"), -// context) -// -// val evalProjectList: Seq[NamedExpression] = -// Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(UnresolvedAttribute("a"), "a")()) -// val evalProject = Project(evalProjectList, UnresolvedRelation(Seq("t"))) -// val sortOrder = SortOrder(UnresolvedAttribute("a"), Descending, Seq.empty) -// val sort = Sort(seq(sortOrder), global = true, evalProject) -// val expectedPlan = Project(seq(UnresolvedAttribute("b")), sort) -// comparePlans(expectedPlan, logPlan, checkAnalysis = false) -// } -// -// test("test multiple eval expressions") { -// val context = new CatalystPlanContext -// val logPlan = -// planTransformer.visit( -// plan(pplParser, "source=t | eval a = 1, b = 'hello' | eval b = a | sort - b | fields b"), -// context) -// -// val evalProjectList1: Seq[NamedExpression] = -// Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(Literal("hello"), "b")()) -// val evalProjectList2: Seq[NamedExpression] = Seq( -// UnresolvedStar(None), -// Alias(UnresolvedAttribute("a"), "b")(exprId = ExprId(2), qualifier = Seq.empty)) -// val evalProject1 = Project(evalProjectList1, UnresolvedRelation(Seq("t"))) -// val evalProject2 = Project(evalProjectList2, evalProject1) -// val sortOrder = SortOrder(UnresolvedAttribute("b"), Descending, Seq.empty) -// val sort = Sort(seq(sortOrder), global = true, evalProject2) -// val expectedPlan = Project(seq(UnresolvedAttribute("b")), sort) -// comparePlans(expectedPlan, logPlan, checkAnalysis = false) -// } -// -// test("test complex eval expressions - date function") { -// val context = new CatalystPlanContext -// val logPlan = -// planTransformer.visit( -// plan(pplParser, "source=t | eval a = TIMESTAMP('2020-09-16 17:30:00') | fields a"), -// context) -// -// val evalProjectList: Seq[NamedExpression] = Seq( -// UnresolvedStar(None), -// Alias( -// UnresolvedFunction("timestamp", seq(Literal("2020-09-16 17:30:00")), isDistinct = false), -// "a")()) -// val expectedPlan = Project( -// seq(UnresolvedAttribute("a")), -// Project(evalProjectList, UnresolvedRelation(Seq("t")))) -// comparePlans(expectedPlan, logPlan, checkAnalysis = false) -// } -// -// test("test complex eval expressions - math function") { -// val context = new CatalystPlanContext -// val logPlan = -// planTransformer.visit(plan(pplParser, "source=t | eval a = RAND() | fields a"), context) -// -// val evalProjectList: Seq[NamedExpression] = Seq( -// UnresolvedStar(None), -// Alias(UnresolvedFunction("rand", Seq.empty, isDistinct = false), "a")( -// exprId = ExprId(0), -// qualifier = Seq.empty)) -// val expectedPlan = Project( -// seq(UnresolvedAttribute("a")), -// Project(evalProjectList, UnresolvedRelation(Seq("t")))) -// comparePlans(expectedPlan, logPlan, checkAnalysis = false) -// } -// -// test("test complex eval expressions - compound function") { -// val context = new CatalystPlanContext -// val logPlan = -// planTransformer.visit( -// plan(pplParser, "source=t | eval a = if(like(b, '%Hello%'), 'World', 'Hi') | fields a"), -// context) -// -// val evalProjectList: Seq[NamedExpression] = Seq( -// UnresolvedStar(None), -// Alias( -// UnresolvedFunction( -// "if", -// seq( -// UnresolvedFunction( -// "like", -// seq(UnresolvedAttribute("b"), Literal("%Hello%")), -// isDistinct = false), -// Literal("World"), -// Literal("Hi")), -// isDistinct = false), -// "a")()) -// val expectedPlan = Project( -// seq(UnresolvedAttribute("a")), -// Project(evalProjectList, UnresolvedRelation(Seq("t")))) -// comparePlans(expectedPlan, logPlan, checkAnalysis = false) -// } + val evalProjectList: Seq[NamedExpression] = + Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(Literal(1), "c")()) + val expectedPlan = Project( + seq(UnresolvedAttribute("a"), UnresolvedAttribute("b"), UnresolvedAttribute("c")), + Project(evalProjectList, UnresolvedRelation(Seq("t")))) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } - test("test eval expression - geoip function") { + test("test eval expressions without fields command") { val context = new CatalystPlanContext + val logPlan = + planTransformer.visit(plan(pplParser, "source=t | eval a = 1, b = 1"), context) - //scalastyle:off - println("Wow I like Pancakes"); - //scalastyle:on + val evalProjectList: Seq[NamedExpression] = + Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(Literal(1), "b")()) + val expectedPlan = + Project(seq(UnresolvedStar(None)), Project(evalProjectList, UnresolvedRelation(Seq("t")))) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + test("test eval expressions with sort") { + val context = new CatalystPlanContext val logPlan = planTransformer.visit( - plan(pplParser, "source=t | eval a = geoip(lol,ip_address,TIME_ZONE)"), + plan(pplParser, "source=t | eval a = 1, b = 1 | sort - a | fields b"), context) - //scalastyle:off - println("Wow I like Pancakes"); - //scalastyle:on + val evalProjectList: Seq[NamedExpression] = + Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(Literal(1), "b")()) + val evalProject = Project(evalProjectList, UnresolvedRelation(Seq("t"))) + val sortOrder = SortOrder(UnresolvedAttribute("a"), Descending, Seq.empty) + val sort = Sort(seq(sortOrder), global = true, evalProject) + val expectedPlan = Project(seq(UnresolvedAttribute("b")), sort) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } - val ipAddress = UnresolvedAttribute("ip_address") + test("test eval expressions with multiple recursive sort") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan(pplParser, "source=t | eval a = 1, a = a | sort - a | fields b"), + context) - val is_ipv4 = ScalaUDF( - SerializableUdf.isIpv4, - DataTypes.BooleanType, - seq(ipAddress), - seq(), - Option.empty, - Option.apply("is_ipv4") - ) + val evalProjectList: Seq[NamedExpression] = + Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(UnresolvedAttribute("a"), "a")()) + val evalProject = Project(evalProjectList, UnresolvedRelation(Seq("t"))) + val sortOrder = SortOrder(UnresolvedAttribute("a"), Descending, Seq.empty) + val sort = Sort(seq(sortOrder), global = true, evalProject) + val expectedPlan = Project(seq(UnresolvedAttribute("b")), sort) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } - val ip_int = ScalaUDF( - SerializableUdf.ipToInt, - DataTypes.IntegerType, - seq(ipAddress), - seq(), - Option.empty, - Option.apply("ip_to_int") - ) + test("test multiple eval expressions") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan(pplParser, "source=t | eval a = 1, b = 'hello' | eval b = a | sort - b | fields b"), + context) - val sourceTable = SubqueryAlias("l", UnresolvedRelation(seq("users"))) - val geoTable = SubqueryAlias("r", UnresolvedRelation(seq("geoip"))) + val evalProjectList1: Seq[NamedExpression] = + Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(Literal("hello"), "b")()) + val evalProjectList2: Seq[NamedExpression] = Seq( + UnresolvedStar(None), + Alias(UnresolvedAttribute("a"), "b")(exprId = ExprId(2), qualifier = Seq.empty)) + val evalProject1 = Project(evalProjectList1, UnresolvedRelation(Seq("t"))) + val evalProject2 = Project(evalProjectList2, evalProject1) + val sortOrder = SortOrder(UnresolvedAttribute("b"), Descending, Seq.empty) + val sort = Sort(seq(sortOrder), global = true, evalProject2) + val expectedPlan = Project(seq(UnresolvedAttribute("b")), sort) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } - val ipRangeStartCondition = GreaterThanOrEqual(ip_int, UnresolvedAttribute("r.ip_t")) - val ipRangeEndCondition = LessThan(ip_int, UnresolvedAttribute("r.ip")) - val isIpv4Condition = EqualTo(is_ipv4, UnresolvedAttribute("r.ip_type")) + test("test complex eval expressions - date function") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan(pplParser, "source=t | eval a = TIMESTAMP('2020-09-16 17:30:00') | fields a"), + context) + + val evalProjectList: Seq[NamedExpression] = Seq( + UnresolvedStar(None), + Alias( + UnresolvedFunction("timestamp", seq(Literal("2020-09-16 17:30:00")), isDistinct = false), + "a")()) + val expectedPlan = Project( + seq(UnresolvedAttribute("a")), + Project(evalProjectList, UnresolvedRelation(Seq("t")))) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } - val joinCondition = And(And(ipRangeStartCondition, ipRangeEndCondition), isIpv4Condition) + test("test complex eval expressions - math function") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit(plan(pplParser, "source=t | eval a = RAND() | fields a"), context) + + val evalProjectList: Seq[NamedExpression] = Seq( + UnresolvedStar(None), + Alias(UnresolvedFunction("rand", Seq.empty, isDistinct = false), "a")( + exprId = ExprId(0), + qualifier = Seq.empty)) + val expectedPlan = Project( + seq(UnresolvedAttribute("a")), + Project(evalProjectList, UnresolvedRelation(Seq("t")))) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } - val joinPlan = Join(sourceTable, geoTable, Inner, Some(joinCondition), JoinHint.NONE) - val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) + test("test complex eval expressions - compound function") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan(pplParser, "source=t | eval a = if(like(b, '%Hello%'), 'World', 'Hi') | fields a"), + context) + val evalProjectList: Seq[NamedExpression] = Seq( + UnresolvedStar(None), + Alias( + UnresolvedFunction( + "if", + seq( + UnresolvedFunction( + "like", + seq(UnresolvedAttribute("b"), Literal("%Hello%")), + isDistinct = false), + Literal("World"), + Literal("Hi")), + isDistinct = false), + "a")()) + val expectedPlan = Project( + seq(UnresolvedAttribute("a")), + Project(evalProjectList, UnresolvedRelation(Seq("t")))) comparePlans(expectedPlan, logPlan, checkAnalysis = false) } -// // Todo fields-excluded command not supported -// ignore("test eval expressions with fields-excluded command") { -// val context = new CatalystPlanContext -// val logPlan = -// planTransformer.visit(plan(pplParser, "source=t | eval a = 1, b = 2 | fields - b"), context) -// -// val projectList: Seq[NamedExpression] = -// Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(Literal(2), "b")()) -// val expectedPlan = Project(projectList, UnresolvedRelation(Seq("t"))) -// comparePlans(expectedPlan, logPlan, checkAnalysis = false) -// } -// -// // Todo fields-included command not supported -// ignore("test eval expressions with fields-included command") { -// val context = new CatalystPlanContext -// val logPlan = -// planTransformer.visit(plan(pplParser, "source=t | eval a = 1, b = 2 | fields + b"), context) -// -// val projectList: Seq[NamedExpression] = -// Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(Literal(2), "b")()) -// val expectedPlan = Project(projectList, UnresolvedRelation(Seq("t"))) -// comparePlans(expectedPlan, logPlan, checkAnalysis = false) -// } -//// -// test("test IN expr in eval") { -// val context = new CatalystPlanContext -// val logPlan = -// planTransformer.visit( -// plan(pplParser, "source=t | eval in = a in ('Hello', 'World') | fields in"), -// context) -// -// val in = Alias(In(UnresolvedAttribute("a"), Seq(Literal("Hello"), Literal("World"))), "in")() -// val eval = Project(Seq(UnresolvedStar(None), in), UnresolvedRelation(Seq("t"))) -// val expectedPlan = Project(Seq(UnresolvedAttribute("in")), eval) -// comparePlans(expectedPlan, logPlan, checkAnalysis = false) -// } + // Todo fields-excluded command not supported + ignore("test eval expressions with fields-excluded command") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit(plan(pplParser, "source=t | eval a = 1, b = 2 | fields - b"), context) + + val projectList: Seq[NamedExpression] = + Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(Literal(2), "b")()) + val expectedPlan = Project(projectList, UnresolvedRelation(Seq("t"))) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + // Todo fields-included command not supported + ignore("test eval expressions with fields-included command") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit(plan(pplParser, "source=t | eval a = 1, b = 2 | fields + b"), context) + + val projectList: Seq[NamedExpression] = + Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(Literal(2), "b")()) + val expectedPlan = Project(projectList, UnresolvedRelation(Seq("t"))) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test IN expr in eval") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan(pplParser, "source=t | eval in = a in ('Hello', 'World') | fields in"), + context) + + val in = Alias(In(UnresolvedAttribute("a"), Seq(Literal("Hello"), Literal("World"))), "in")() + val eval = Project(Seq(UnresolvedStar(None), in), UnresolvedRelation(Seq("t"))) + val expectedPlan = Project(Seq(UnresolvedAttribute("in")), eval) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } } diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJoinTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJoinTranslatorTestSuite.scala index d75de8d9f..f4ed397e3 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJoinTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJoinTranslatorTestSuite.scala @@ -30,30 +30,12 @@ class PPLLogicalPlanJoinTranslatorTestSuite private val testTable3 = "spark_catalog.default.flint_ppl_test3" private val testTable4 = "spark_catalog.default.flint_ppl_test4" -// test("test two-tables inner join: join condition with aliases") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = $testTable1| JOIN left = l right = r ON l.id = r.id $testTable2 -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) -// val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) -// val leftPlan = SubqueryAlias("l", table1) -// val rightPlan = SubqueryAlias("r", table2) -// val joinCondition = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) -// val joinPlan = Join(leftPlan, rightPlan, Inner, Some(joinCondition), JoinHint.NONE) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } - test("test two-tables inner join: join condition with aliases") { val context = new CatalystPlanContext val logPlan = plan( pplParser, s""" - | source=users | join left = t1 right = t2 on t1.ip_int>=t2.ip_range_start and t1.ip_int 10 AND lower(r.name) = 'hello' $testTable2 -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) -// val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) -// val leftPlan = SubqueryAlias("l", table1) -// val rightPlan = SubqueryAlias("r", table2) -// val joinCondition = And( -// And( -// EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")), -// EqualTo( -// Literal("hello"), -// UnresolvedFunction.apply( -// "lower", -// Seq(UnresolvedAttribute("r.name")), -// isDistinct = false))), -// LessThan(Literal(10), UnresolvedAttribute("l.count"))) -// val joinPlan = Join(leftPlan, rightPlan, Inner, Some(joinCondition), JoinHint.NONE) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test inner join: join condition with table names and predicates") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = $testTable1| INNER JOIN left = l right = r ON $testTable1.id = $testTable2.id AND $testTable1.count > 10 AND lower($testTable2.name) = 'hello' $testTable2 -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) -// val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) -// val leftPlan = SubqueryAlias("l", table1) -// val rightPlan = SubqueryAlias("r", table2) -// val joinCondition = And( -// And( -// EqualTo(UnresolvedAttribute(s"$testTable1.id"), UnresolvedAttribute(s"$testTable2.id")), -// EqualTo( -// Literal("hello"), -// UnresolvedFunction.apply( -// "lower", -// Seq(UnresolvedAttribute(s"$testTable2.name")), -// isDistinct = false))), -// LessThan(Literal(10), UnresolvedAttribute(s"$testTable1.count"))) -// val joinPlan = Join(leftPlan, rightPlan, Inner, Some(joinCondition), JoinHint.NONE) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test left outer join") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = $testTable1| LEFT OUTER JOIN left = l right = r ON l.id = r.id $testTable2 -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) -// val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) -// val leftPlan = SubqueryAlias("l", table1) -// val rightPlan = SubqueryAlias("r", table2) -// val joinCondition = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) -// val joinPlan = Join(leftPlan, rightPlan, LeftOuter, Some(joinCondition), JoinHint.NONE) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test right outer join") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = $testTable1| RIGHT JOIN left = l right = r ON l.id = r.id $testTable2 -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) -// val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) -// val leftPlan = SubqueryAlias("l", table1) -// val rightPlan = SubqueryAlias("r", table2) -// val joinCondition = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) -// val joinPlan = Join(leftPlan, rightPlan, RightOuter, Some(joinCondition), JoinHint.NONE) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test left semi join") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = $testTable1| LEFT SEMI JOIN left = l right = r ON l.id = r.id $testTable2 -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) -// val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) -// val leftPlan = SubqueryAlias("l", table1) -// val rightPlan = SubqueryAlias("r", table2) -// val joinCondition = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) -// val joinPlan = Join(leftPlan, rightPlan, LeftSemi, Some(joinCondition), JoinHint.NONE) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test left anti join") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = $testTable1| LEFT ANTI JOIN left = l right = r ON l.id = r.id $testTable2 -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) -// val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) -// val leftPlan = SubqueryAlias("l", table1) -// val rightPlan = SubqueryAlias("r", table2) -// val joinCondition = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) -// val joinPlan = Join(leftPlan, rightPlan, LeftAnti, Some(joinCondition), JoinHint.NONE) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test full outer join") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = $testTable1| FULL JOIN left = l right = r ON l.id = r.id $testTable2 -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) -// val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) -// val leftPlan = SubqueryAlias("l", table1) -// val rightPlan = SubqueryAlias("r", table2) -// val joinCondition = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) -// val joinPlan = Join(leftPlan, rightPlan, FullOuter, Some(joinCondition), JoinHint.NONE) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test cross join") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = $testTable1| CROSS JOIN left = l right = r $testTable2 -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) -// val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) -// val leftPlan = SubqueryAlias("l", table1) -// val rightPlan = SubqueryAlias("r", table2) -// val joinPlan = Join(leftPlan, rightPlan, Cross, None, JoinHint.NONE) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test cross join with join condition") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = $testTable1| CROSS JOIN left = l right = r ON l.id = r.id $testTable2 -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) -// val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) -// val leftPlan = SubqueryAlias("l", table1) -// val rightPlan = SubqueryAlias("r", table2) -// val joinCondition = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) -// val joinPlan = Join(leftPlan, rightPlan, Cross, Some(joinCondition), JoinHint.NONE) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test multiple joins") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = $testTable1 -// | | inner JOIN left = l right = r ON l.id = r.id $testTable2 -// | | left JOIN left = l right = r ON l.name = r.name $testTable3 -// | | cross JOIN left = l right = r $testTable4 -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) -// val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) -// val table3 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test3")) -// val table4 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test4")) -// var leftPlan = SubqueryAlias("l", table1) -// var rightPlan = SubqueryAlias("r", table2) -// val joinCondition1 = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) -// val joinPlan1 = Join(leftPlan, rightPlan, Inner, Some(joinCondition1), JoinHint.NONE) -// leftPlan = SubqueryAlias("l", joinPlan1) -// rightPlan = SubqueryAlias("r", table3) -// val joinCondition2 = EqualTo(UnresolvedAttribute("l.name"), UnresolvedAttribute("r.name")) -// val joinPlan2 = Join(leftPlan, rightPlan, LeftOuter, Some(joinCondition2), JoinHint.NONE) -// leftPlan = SubqueryAlias("l", joinPlan2) -// rightPlan = SubqueryAlias("r", table4) -// val joinPlan3 = Join(leftPlan, rightPlan, Cross, None, JoinHint.NONE) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan3) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test complex join: TPC-H Q13") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | SEARCH source = $testTable1 -// | | FIELDS id, name -// | | LEFT OUTER JOIN left = c right = o ON c.custkey = o.custkey $testTable2 -// | | STATS count(o.orderkey) AS o_count BY c.custkey -// | | STATS count(1) AS custdist BY o_count -// | | SORT - custdist, - o_count -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val tableC = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) -// val tableO = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) -// val left = SubqueryAlias( -// "c", -// Project(Seq(UnresolvedAttribute("id"), UnresolvedAttribute("name")), tableC)) -// val right = SubqueryAlias("o", tableO) -// val joinCondition = -// EqualTo(UnresolvedAttribute("o.custkey"), UnresolvedAttribute("c.custkey")) -// val join = Join(left, right, LeftOuter, Some(joinCondition), JoinHint.NONE) -// val groupingExpression1 = Alias(UnresolvedAttribute("c.custkey"), "c.custkey")() -// val aggregateExpressions1 = -// Alias( -// UnresolvedFunction( -// Seq("COUNT"), -// Seq(UnresolvedAttribute("o.orderkey")), -// isDistinct = false), -// "o_count")() -// val agg1 = -// Aggregate(Seq(groupingExpression1), Seq(aggregateExpressions1, groupingExpression1), join) -// val groupingExpression2 = Alias(UnresolvedAttribute("o_count"), "o_count")() -// val aggregateExpressions2 = -// Alias(UnresolvedFunction(Seq("COUNT"), Seq(Literal(1)), isDistinct = false), "custdist")() -// val agg2 = -// Aggregate(Seq(groupingExpression2), Seq(aggregateExpressions2, groupingExpression2), agg1) -// val sort = Sort( -// Seq( -// SortOrder(UnresolvedAttribute("custdist"), Descending), -// SortOrder(UnresolvedAttribute("o_count"), Descending)), -// global = true, -// agg2) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), sort) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test inner join with relation subquery") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = $testTable1| JOIN left = l right = r ON l.id = r.id -// | [ -// | source = $testTable2 -// | | where id > 10 and name = 'abc' -// | | fields id, name -// | | sort id -// | | head 10 -// | ] -// | | stats count(id) as cnt by type -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) -// val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) -// val leftPlan = SubqueryAlias("l", table1) -// val rightSubquery = -// GlobalLimit( -// Literal(10), -// LocalLimit( -// Literal(10), -// Sort( -// Seq(SortOrder(UnresolvedAttribute("id"), Ascending)), -// global = true, -// Project( -// Seq(UnresolvedAttribute("id"), UnresolvedAttribute("name")), -// Filter( -// And( -// GreaterThan(UnresolvedAttribute("id"), Literal(10)), -// EqualTo(UnresolvedAttribute("name"), Literal("abc"))), -// table2))))) -// val rightPlan = SubqueryAlias("r", rightSubquery) -// val joinCondition = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) -// val joinPlan = Join(leftPlan, rightPlan, Inner, Some(joinCondition), JoinHint.NONE) -// val groupingExpression = Alias(UnresolvedAttribute("type"), "type")() -// val aggregateExpression = Alias( -// UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedAttribute("id")), isDistinct = false), -// "cnt")() -// val aggPlan = -// Aggregate(Seq(groupingExpression), Seq(aggregateExpression, groupingExpression), joinPlan) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), aggPlan) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test left outer join with relation subquery") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = $testTable1| LEFT JOIN left = l right = r ON l.id = r.id -// | [ -// | source = $testTable2 -// | | where id > 10 and name = 'abc' -// | | fields id, name -// | | sort id -// | | head 10 -// | ] -// | | stats count(id) as cnt by type -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) -// val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) -// val leftPlan = SubqueryAlias("l", table1) -// val rightSubquery = -// GlobalLimit( -// Literal(10), -// LocalLimit( -// Literal(10), -// Sort( -// Seq(SortOrder(UnresolvedAttribute("id"), Ascending)), -// global = true, -// Project( -// Seq(UnresolvedAttribute("id"), UnresolvedAttribute("name")), -// Filter( -// And( -// GreaterThan(UnresolvedAttribute("id"), Literal(10)), -// EqualTo(UnresolvedAttribute("name"), Literal("abc"))), -// table2))))) -// val rightPlan = SubqueryAlias("r", rightSubquery) -// val joinCondition = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) -// val joinPlan = Join(leftPlan, rightPlan, LeftOuter, Some(joinCondition), JoinHint.NONE) -// val groupingExpression = Alias(UnresolvedAttribute("type"), "type")() -// val aggregateExpression = Alias( -// UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedAttribute("id")), isDistinct = false), -// "cnt")() -// val aggPlan = -// Aggregate(Seq(groupingExpression), Seq(aggregateExpression, groupingExpression), joinPlan) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), aggPlan) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test multiple joins with relation subquery") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = $testTable1 -// | | head 10 -// | | inner JOIN left = l right = r ON l.id = r.id -// | [ -// | source = $testTable2 -// | | where id > 10 -// | ] -// | | left JOIN left = l right = r ON l.name = r.name -// | [ -// | source = $testTable3 -// | | fields id -// | ] -// | | cross JOIN left = l right = r -// | [ -// | source = $testTable4 -// | | sort id -// | ] -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) -// val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) -// val table3 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test3")) -// val table4 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test4")) -// var leftPlan = SubqueryAlias("l", GlobalLimit(Literal(10), LocalLimit(Literal(10), table1))) -// var rightPlan = -// SubqueryAlias("r", Filter(GreaterThan(UnresolvedAttribute("id"), Literal(10)), table2)) -// val joinCondition1 = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) -// val joinPlan1 = Join(leftPlan, rightPlan, Inner, Some(joinCondition1), JoinHint.NONE) -// leftPlan = SubqueryAlias("l", joinPlan1) -// rightPlan = SubqueryAlias("r", Project(Seq(UnresolvedAttribute("id")), table3)) -// val joinCondition2 = EqualTo(UnresolvedAttribute("l.name"), UnresolvedAttribute("r.name")) -// val joinPlan2 = Join(leftPlan, rightPlan, LeftOuter, Some(joinCondition2), JoinHint.NONE) -// leftPlan = SubqueryAlias("l", joinPlan2) -// rightPlan = SubqueryAlias( -// "r", -// Sort(Seq(SortOrder(UnresolvedAttribute("id"), Ascending)), global = true, table4)) -// val joinPlan3 = Join(leftPlan, rightPlan, Cross, None, JoinHint.NONE) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan3) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test complex join: TPC-H Q13 with relation subquery") { -// // select -// // c_count, -// // count(*) as custdist -// // from -// // ( -// // select -// // c_custkey, -// // count(o_orderkey) as c_count -// // from -// // customer left outer join orders on -// // c_custkey = o_custkey -// // and o_comment not like '%special%requests%' -// // group by -// // c_custkey -// // ) as c_orders -// // group by -// // c_count -// // order by -// // custdist desc, -// // c_count desc -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | SEARCH source = [ -// | SEARCH source = customer -// | | LEFT OUTER JOIN left = c right = o ON c_custkey = o_custkey -// | [ -// | SEARCH source = orders -// | | WHERE not like(o_comment, '%special%requests%') -// | ] -// | | STATS COUNT(o_orderkey) AS c_count BY c_custkey -// | ] AS c_orders -// | | STATS COUNT(o_orderkey) AS c_count BY c_custkey -// | | STATS COUNT(1) AS custdist BY c_count -// | | SORT - custdist, - c_count -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val tableC = UnresolvedRelation(Seq("customer")) -// val tableO = UnresolvedRelation(Seq("orders")) -// val left = SubqueryAlias("c", tableC) -// val filterNot = Filter( -// Not( -// UnresolvedFunction( -// Seq("like"), -// Seq(UnresolvedAttribute("o_comment"), Literal("%special%requests%")), -// isDistinct = false)), -// tableO) -// val right = SubqueryAlias("o", filterNot) -// val joinCondition = -// EqualTo(UnresolvedAttribute("o_custkey"), UnresolvedAttribute("c_custkey")) -// val join = Join(left, right, LeftOuter, Some(joinCondition), JoinHint.NONE) -// val groupingExpression1 = Alias(UnresolvedAttribute("c_custkey"), "c_custkey")() -// val aggregateExpressions1 = -// Alias( -// UnresolvedFunction( -// Seq("COUNT"), -// Seq(UnresolvedAttribute("o_orderkey")), -// isDistinct = false), -// "c_count")() -// val agg3 = -// Aggregate(Seq(groupingExpression1), Seq(aggregateExpressions1, groupingExpression1), join) -// val subqueryAlias = SubqueryAlias("c_orders", agg3) -// val agg2 = -// Aggregate( -// Seq(groupingExpression1), -// Seq(aggregateExpressions1, groupingExpression1), -// subqueryAlias) -// val groupingExpression2 = Alias(UnresolvedAttribute("c_count"), "c_count")() -// val aggregateExpressions2 = -// Alias(UnresolvedFunction(Seq("COUNT"), Seq(Literal(1)), isDistinct = false), "custdist")() -// val agg1 = -// Aggregate(Seq(groupingExpression2), Seq(aggregateExpressions2, groupingExpression2), agg2) -// val sort = Sort( -// Seq( -// SortOrder(UnresolvedAttribute("custdist"), Descending), -// SortOrder(UnresolvedAttribute("c_count"), Descending)), -// global = true, -// agg1) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), sort) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test multiple joins with table alias") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = table1 as t1 -// | | JOIN ON t1.id = t2.id -// | [ -// | source = table2 as t2 -// | ] -// | | JOIN ON t2.id = t3.id -// | [ -// | source = table3 as t3 -// | ] -// | | JOIN ON t3.id = t4.id -// | [ -// | source = table4 as t4 -// | ] -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("table1")) -// val table2 = UnresolvedRelation(Seq("table2")) -// val table3 = UnresolvedRelation(Seq("table3")) -// val table4 = UnresolvedRelation(Seq("table4")) -// val joinPlan1 = Join( -// SubqueryAlias("t1", table1), -// SubqueryAlias("t2", table2), -// Inner, -// Some(EqualTo(UnresolvedAttribute("t1.id"), UnresolvedAttribute("t2.id"))), -// JoinHint.NONE) -// val joinPlan2 = Join( -// joinPlan1, -// SubqueryAlias("t3", table3), -// Inner, -// Some(EqualTo(UnresolvedAttribute("t2.id"), UnresolvedAttribute("t3.id"))), -// JoinHint.NONE) -// val joinPlan3 = Join( -// joinPlan2, -// SubqueryAlias("t4", table4), -// Inner, -// Some(EqualTo(UnresolvedAttribute("t3.id"), UnresolvedAttribute("t4.id"))), -// JoinHint.NONE) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan3) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test multiple joins with table and subquery alias") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = table1 as t1 -// | | JOIN left = l right = r ON t1.id = t2.id -// | [ -// | source = table2 as t2 -// | ] -// | | JOIN left = l right = r ON t2.id = t3.id -// | [ -// | source = table3 as t3 -// | ] -// | | JOIN left = l right = r ON t3.id = t4.id -// | [ -// | source = table4 as t4 -// | ] -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("table1")) -// val table2 = UnresolvedRelation(Seq("table2")) -// val table3 = UnresolvedRelation(Seq("table3")) -// val table4 = UnresolvedRelation(Seq("table4")) -// val joinPlan1 = Join( -// SubqueryAlias("l", SubqueryAlias("t1", table1)), -// SubqueryAlias("r", SubqueryAlias("t2", table2)), -// Inner, -// Some(EqualTo(UnresolvedAttribute("t1.id"), UnresolvedAttribute("t2.id"))), -// JoinHint.NONE) -// val joinPlan2 = Join( -// SubqueryAlias("l", joinPlan1), -// SubqueryAlias("r", SubqueryAlias("t3", table3)), -// Inner, -// Some(EqualTo(UnresolvedAttribute("t2.id"), UnresolvedAttribute("t3.id"))), -// JoinHint.NONE) -// val joinPlan3 = Join( -// SubqueryAlias("l", joinPlan2), -// SubqueryAlias("r", SubqueryAlias("t4", table4)), -// Inner, -// Some(EqualTo(UnresolvedAttribute("t3.id"), UnresolvedAttribute("t4.id"))), -// JoinHint.NONE) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan3) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test multiple joins without table aliases") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = table1 -// | | JOIN ON table1.id = table2.id table2 -// | | JOIN ON table1.id = table3.id table3 -// | | JOIN ON table2.id = table4.id table4 -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("table1")) -// val table2 = UnresolvedRelation(Seq("table2")) -// val table3 = UnresolvedRelation(Seq("table3")) -// val table4 = UnresolvedRelation(Seq("table4")) -// val joinPlan1 = Join( -// table1, -// table2, -// Inner, -// Some(EqualTo(UnresolvedAttribute("table1.id"), UnresolvedAttribute("table2.id"))), -// JoinHint.NONE) -// val joinPlan2 = Join( -// joinPlan1, -// table3, -// Inner, -// Some(EqualTo(UnresolvedAttribute("table1.id"), UnresolvedAttribute("table3.id"))), -// JoinHint.NONE) -// val joinPlan3 = Join( -// joinPlan2, -// table4, -// Inner, -// Some(EqualTo(UnresolvedAttribute("table2.id"), UnresolvedAttribute("table4.id"))), -// JoinHint.NONE) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan3) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test multiple joins with part subquery aliases") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = table1 -// | | JOIN left = t1 right = t2 ON t1.name = t2.name table2 -// | | JOIN right = t3 ON t1.name = t3.name table3 -// | | JOIN right = t4 ON t2.name = t4.name table4 -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("table1")) -// val table2 = UnresolvedRelation(Seq("table2")) -// val table3 = UnresolvedRelation(Seq("table3")) -// val table4 = UnresolvedRelation(Seq("table4")) -// val joinPlan1 = Join( -// SubqueryAlias("t1", table1), -// SubqueryAlias("t2", table2), -// Inner, -// Some(EqualTo(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t2.name"))), -// JoinHint.NONE) -// val joinPlan2 = Join( -// joinPlan1, -// SubqueryAlias("t3", table3), -// Inner, -// Some(EqualTo(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t3.name"))), -// JoinHint.NONE) -// val joinPlan3 = Join( -// joinPlan2, -// SubqueryAlias("t4", table4), -// Inner, -// Some(EqualTo(UnresolvedAttribute("t2.name"), UnresolvedAttribute("t4.name"))), -// JoinHint.NONE) -// val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan3) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test multiple joins with self join 1") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = $testTable1 -// | | JOIN left = t1 right = t2 ON t1.name = t2.name $testTable2 -// | | JOIN right = t3 ON t1.name = t3.name $testTable3 -// | | JOIN right = t4 ON t1.name = t4.name $testTable1 -// | | fields t1.name, t2.name, t3.name, t4.name -// | """.stripMargin) -// -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) -// val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) -// val table3 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test3")) -// val joinPlan1 = Join( -// SubqueryAlias("t1", table1), -// SubqueryAlias("t2", table2), -// Inner, -// Some(EqualTo(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t2.name"))), -// JoinHint.NONE) -// val joinPlan2 = Join( -// joinPlan1, -// SubqueryAlias("t3", table3), -// Inner, -// Some(EqualTo(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t3.name"))), -// JoinHint.NONE) -// val joinPlan3 = Join( -// joinPlan2, -// SubqueryAlias("t4", table1), -// Inner, -// Some(EqualTo(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t4.name"))), -// JoinHint.NONE) -// val expectedPlan = Project( -// Seq( -// UnresolvedAttribute("t1.name"), -// UnresolvedAttribute("t2.name"), -// UnresolvedAttribute("t3.name"), -// UnresolvedAttribute("t4.name")), -// joinPlan3) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test multiple joins with self join 2") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = $testTable1 -// | | JOIN left = t1 right = t2 ON t1.name = t2.name $testTable2 -// | | JOIN right = t3 ON t1.name = t3.name $testTable3 -// | | JOIN ON t1.name = t4.name -// | [ -// | source = $testTable1 -// | ] as t4 -// | | fields t1.name, t2.name, t3.name, t4.name -// | """.stripMargin) -// -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) -// val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) -// val table3 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test3")) -// val joinPlan1 = Join( -// SubqueryAlias("t1", table1), -// SubqueryAlias("t2", table2), -// Inner, -// Some(EqualTo(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t2.name"))), -// JoinHint.NONE) -// val joinPlan2 = Join( -// joinPlan1, -// SubqueryAlias("t3", table3), -// Inner, -// Some(EqualTo(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t3.name"))), -// JoinHint.NONE) -// val joinPlan3 = Join( -// joinPlan2, -// SubqueryAlias("t4", table1), -// Inner, -// Some(EqualTo(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t4.name"))), -// JoinHint.NONE) -// val expectedPlan = Project( -// Seq( -// UnresolvedAttribute("t1.name"), -// UnresolvedAttribute("t2.name"), -// UnresolvedAttribute("t3.name"), -// UnresolvedAttribute("t4.name")), -// joinPlan3) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } -// -// test("test side alias will override the subquery alias") { -// val context = new CatalystPlanContext -// val logPlan = plan( -// pplParser, -// s""" -// | source = $testTable1 -// | | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = $testTable2 as ttt ] as tt -// | | fields t1.name, t2.name -// | """.stripMargin) -// val logicalPlan = planTransformer.visit(logPlan, context) -// val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) -// val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) -// val joinPlan1 = Join( -// SubqueryAlias("t1", table1), -// SubqueryAlias("t2", SubqueryAlias("tt", SubqueryAlias("ttt", table2))), -// Inner, -// Some(EqualTo(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t2.name"))), -// JoinHint.NONE) -// val expectedPlan = -// Project(Seq(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t2.name")), joinPlan1) -// comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) -// } + test("test two-tables inner join: join condition with table names") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = $testTable1| JOIN left = l right = r ON $testTable1.id = $testTable2.id $testTable2 + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val leftPlan = SubqueryAlias("l", table1) + val rightPlan = SubqueryAlias("r", table2) + val joinCondition = + EqualTo(UnresolvedAttribute(s"$testTable1.id"), UnresolvedAttribute(s"$testTable2.id")) + val joinPlan = Join(leftPlan, rightPlan, Inner, Some(joinCondition), JoinHint.NONE) + val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test inner join: join condition without prefix") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = $testTable1| JOIN left = l right = r ON id = name $testTable2 + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val leftPlan = SubqueryAlias("l", table1) + val rightPlan = SubqueryAlias("r", table2) + val joinCondition = + EqualTo(UnresolvedAttribute("id"), UnresolvedAttribute("name")) + val joinPlan = Join(leftPlan, rightPlan, Inner, Some(joinCondition), JoinHint.NONE) + val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test inner join: join condition with aliases and predicates") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = $testTable1| JOIN left = l right = r ON l.id = r.id AND l.count > 10 AND lower(r.name) = 'hello' $testTable2 + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val leftPlan = SubqueryAlias("l", table1) + val rightPlan = SubqueryAlias("r", table2) + val joinCondition = And( + And( + EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")), + EqualTo( + Literal("hello"), + UnresolvedFunction.apply( + "lower", + Seq(UnresolvedAttribute("r.name")), + isDistinct = false))), + LessThan(Literal(10), UnresolvedAttribute("l.count"))) + val joinPlan = Join(leftPlan, rightPlan, Inner, Some(joinCondition), JoinHint.NONE) + val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test inner join: join condition with table names and predicates") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = $testTable1| INNER JOIN left = l right = r ON $testTable1.id = $testTable2.id AND $testTable1.count > 10 AND lower($testTable2.name) = 'hello' $testTable2 + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val leftPlan = SubqueryAlias("l", table1) + val rightPlan = SubqueryAlias("r", table2) + val joinCondition = And( + And( + EqualTo(UnresolvedAttribute(s"$testTable1.id"), UnresolvedAttribute(s"$testTable2.id")), + EqualTo( + Literal("hello"), + UnresolvedFunction.apply( + "lower", + Seq(UnresolvedAttribute(s"$testTable2.name")), + isDistinct = false))), + LessThan(Literal(10), UnresolvedAttribute(s"$testTable1.count"))) + val joinPlan = Join(leftPlan, rightPlan, Inner, Some(joinCondition), JoinHint.NONE) + val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test left outer join") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = $testTable1| LEFT OUTER JOIN left = l right = r ON l.id = r.id $testTable2 + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val leftPlan = SubqueryAlias("l", table1) + val rightPlan = SubqueryAlias("r", table2) + val joinCondition = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) + val joinPlan = Join(leftPlan, rightPlan, LeftOuter, Some(joinCondition), JoinHint.NONE) + val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test right outer join") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = $testTable1| RIGHT JOIN left = l right = r ON l.id = r.id $testTable2 + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val leftPlan = SubqueryAlias("l", table1) + val rightPlan = SubqueryAlias("r", table2) + val joinCondition = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) + val joinPlan = Join(leftPlan, rightPlan, RightOuter, Some(joinCondition), JoinHint.NONE) + val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test left semi join") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = $testTable1| LEFT SEMI JOIN left = l right = r ON l.id = r.id $testTable2 + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val leftPlan = SubqueryAlias("l", table1) + val rightPlan = SubqueryAlias("r", table2) + val joinCondition = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) + val joinPlan = Join(leftPlan, rightPlan, LeftSemi, Some(joinCondition), JoinHint.NONE) + val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test left anti join") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = $testTable1| LEFT ANTI JOIN left = l right = r ON l.id = r.id $testTable2 + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val leftPlan = SubqueryAlias("l", table1) + val rightPlan = SubqueryAlias("r", table2) + val joinCondition = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) + val joinPlan = Join(leftPlan, rightPlan, LeftAnti, Some(joinCondition), JoinHint.NONE) + val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test full outer join") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = $testTable1| FULL JOIN left = l right = r ON l.id = r.id $testTable2 + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val leftPlan = SubqueryAlias("l", table1) + val rightPlan = SubqueryAlias("r", table2) + val joinCondition = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) + val joinPlan = Join(leftPlan, rightPlan, FullOuter, Some(joinCondition), JoinHint.NONE) + val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test cross join") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = $testTable1| CROSS JOIN left = l right = r $testTable2 + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val leftPlan = SubqueryAlias("l", table1) + val rightPlan = SubqueryAlias("r", table2) + val joinPlan = Join(leftPlan, rightPlan, Cross, None, JoinHint.NONE) + val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test cross join with join condition") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = $testTable1| CROSS JOIN left = l right = r ON l.id = r.id $testTable2 + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val leftPlan = SubqueryAlias("l", table1) + val rightPlan = SubqueryAlias("r", table2) + val joinCondition = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) + val joinPlan = Join(leftPlan, rightPlan, Cross, Some(joinCondition), JoinHint.NONE) + val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test multiple joins") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = $testTable1 + | | inner JOIN left = l right = r ON l.id = r.id $testTable2 + | | left JOIN left = l right = r ON l.name = r.name $testTable3 + | | cross JOIN left = l right = r $testTable4 + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val table3 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test3")) + val table4 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test4")) + var leftPlan = SubqueryAlias("l", table1) + var rightPlan = SubqueryAlias("r", table2) + val joinCondition1 = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) + val joinPlan1 = Join(leftPlan, rightPlan, Inner, Some(joinCondition1), JoinHint.NONE) + leftPlan = SubqueryAlias("l", joinPlan1) + rightPlan = SubqueryAlias("r", table3) + val joinCondition2 = EqualTo(UnresolvedAttribute("l.name"), UnresolvedAttribute("r.name")) + val joinPlan2 = Join(leftPlan, rightPlan, LeftOuter, Some(joinCondition2), JoinHint.NONE) + leftPlan = SubqueryAlias("l", joinPlan2) + rightPlan = SubqueryAlias("r", table4) + val joinPlan3 = Join(leftPlan, rightPlan, Cross, None, JoinHint.NONE) + val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan3) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test complex join: TPC-H Q13") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | SEARCH source = $testTable1 + | | FIELDS id, name + | | LEFT OUTER JOIN left = c right = o ON c.custkey = o.custkey $testTable2 + | | STATS count(o.orderkey) AS o_count BY c.custkey + | | STATS count(1) AS custdist BY o_count + | | SORT - custdist, - o_count + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val tableC = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val tableO = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val left = SubqueryAlias( + "c", + Project(Seq(UnresolvedAttribute("id"), UnresolvedAttribute("name")), tableC)) + val right = SubqueryAlias("o", tableO) + val joinCondition = + EqualTo(UnresolvedAttribute("o.custkey"), UnresolvedAttribute("c.custkey")) + val join = Join(left, right, LeftOuter, Some(joinCondition), JoinHint.NONE) + val groupingExpression1 = Alias(UnresolvedAttribute("c.custkey"), "c.custkey")() + val aggregateExpressions1 = + Alias( + UnresolvedFunction( + Seq("COUNT"), + Seq(UnresolvedAttribute("o.orderkey")), + isDistinct = false), + "o_count")() + val agg1 = + Aggregate(Seq(groupingExpression1), Seq(aggregateExpressions1, groupingExpression1), join) + val groupingExpression2 = Alias(UnresolvedAttribute("o_count"), "o_count")() + val aggregateExpressions2 = + Alias(UnresolvedFunction(Seq("COUNT"), Seq(Literal(1)), isDistinct = false), "custdist")() + val agg2 = + Aggregate(Seq(groupingExpression2), Seq(aggregateExpressions2, groupingExpression2), agg1) + val sort = Sort( + Seq( + SortOrder(UnresolvedAttribute("custdist"), Descending), + SortOrder(UnresolvedAttribute("o_count"), Descending)), + global = true, + agg2) + val expectedPlan = Project(Seq(UnresolvedStar(None)), sort) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test inner join with relation subquery") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = $testTable1| JOIN left = l right = r ON l.id = r.id + | [ + | source = $testTable2 + | | where id > 10 and name = 'abc' + | | fields id, name + | | sort id + | | head 10 + | ] + | | stats count(id) as cnt by type + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val leftPlan = SubqueryAlias("l", table1) + val rightSubquery = + GlobalLimit( + Literal(10), + LocalLimit( + Literal(10), + Sort( + Seq(SortOrder(UnresolvedAttribute("id"), Ascending)), + global = true, + Project( + Seq(UnresolvedAttribute("id"), UnresolvedAttribute("name")), + Filter( + And( + GreaterThan(UnresolvedAttribute("id"), Literal(10)), + EqualTo(UnresolvedAttribute("name"), Literal("abc"))), + table2))))) + val rightPlan = SubqueryAlias("r", rightSubquery) + val joinCondition = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) + val joinPlan = Join(leftPlan, rightPlan, Inner, Some(joinCondition), JoinHint.NONE) + val groupingExpression = Alias(UnresolvedAttribute("type"), "type")() + val aggregateExpression = Alias( + UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedAttribute("id")), isDistinct = false), + "cnt")() + val aggPlan = + Aggregate(Seq(groupingExpression), Seq(aggregateExpression, groupingExpression), joinPlan) + val expectedPlan = Project(Seq(UnresolvedStar(None)), aggPlan) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test left outer join with relation subquery") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = $testTable1| LEFT JOIN left = l right = r ON l.id = r.id + | [ + | source = $testTable2 + | | where id > 10 and name = 'abc' + | | fields id, name + | | sort id + | | head 10 + | ] + | | stats count(id) as cnt by type + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val leftPlan = SubqueryAlias("l", table1) + val rightSubquery = + GlobalLimit( + Literal(10), + LocalLimit( + Literal(10), + Sort( + Seq(SortOrder(UnresolvedAttribute("id"), Ascending)), + global = true, + Project( + Seq(UnresolvedAttribute("id"), UnresolvedAttribute("name")), + Filter( + And( + GreaterThan(UnresolvedAttribute("id"), Literal(10)), + EqualTo(UnresolvedAttribute("name"), Literal("abc"))), + table2))))) + val rightPlan = SubqueryAlias("r", rightSubquery) + val joinCondition = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) + val joinPlan = Join(leftPlan, rightPlan, LeftOuter, Some(joinCondition), JoinHint.NONE) + val groupingExpression = Alias(UnresolvedAttribute("type"), "type")() + val aggregateExpression = Alias( + UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedAttribute("id")), isDistinct = false), + "cnt")() + val aggPlan = + Aggregate(Seq(groupingExpression), Seq(aggregateExpression, groupingExpression), joinPlan) + val expectedPlan = Project(Seq(UnresolvedStar(None)), aggPlan) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test multiple joins with relation subquery") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = $testTable1 + | | head 10 + | | inner JOIN left = l right = r ON l.id = r.id + | [ + | source = $testTable2 + | | where id > 10 + | ] + | | left JOIN left = l right = r ON l.name = r.name + | [ + | source = $testTable3 + | | fields id + | ] + | | cross JOIN left = l right = r + | [ + | source = $testTable4 + | | sort id + | ] + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val table3 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test3")) + val table4 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test4")) + var leftPlan = SubqueryAlias("l", GlobalLimit(Literal(10), LocalLimit(Literal(10), table1))) + var rightPlan = + SubqueryAlias("r", Filter(GreaterThan(UnresolvedAttribute("id"), Literal(10)), table2)) + val joinCondition1 = EqualTo(UnresolvedAttribute("l.id"), UnresolvedAttribute("r.id")) + val joinPlan1 = Join(leftPlan, rightPlan, Inner, Some(joinCondition1), JoinHint.NONE) + leftPlan = SubqueryAlias("l", joinPlan1) + rightPlan = SubqueryAlias("r", Project(Seq(UnresolvedAttribute("id")), table3)) + val joinCondition2 = EqualTo(UnresolvedAttribute("l.name"), UnresolvedAttribute("r.name")) + val joinPlan2 = Join(leftPlan, rightPlan, LeftOuter, Some(joinCondition2), JoinHint.NONE) + leftPlan = SubqueryAlias("l", joinPlan2) + rightPlan = SubqueryAlias( + "r", + Sort(Seq(SortOrder(UnresolvedAttribute("id"), Ascending)), global = true, table4)) + val joinPlan3 = Join(leftPlan, rightPlan, Cross, None, JoinHint.NONE) + val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan3) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test complex join: TPC-H Q13 with relation subquery") { + // select + // c_count, + // count(*) as custdist + // from + // ( + // select + // c_custkey, + // count(o_orderkey) as c_count + // from + // customer left outer join orders on + // c_custkey = o_custkey + // and o_comment not like '%special%requests%' + // group by + // c_custkey + // ) as c_orders + // group by + // c_count + // order by + // custdist desc, + // c_count desc + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | SEARCH source = [ + | SEARCH source = customer + | | LEFT OUTER JOIN left = c right = o ON c_custkey = o_custkey + | [ + | SEARCH source = orders + | | WHERE not like(o_comment, '%special%requests%') + | ] + | | STATS COUNT(o_orderkey) AS c_count BY c_custkey + | ] AS c_orders + | | STATS COUNT(o_orderkey) AS c_count BY c_custkey + | | STATS COUNT(1) AS custdist BY c_count + | | SORT - custdist, - c_count + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val tableC = UnresolvedRelation(Seq("customer")) + val tableO = UnresolvedRelation(Seq("orders")) + val left = SubqueryAlias("c", tableC) + val filterNot = Filter( + Not( + UnresolvedFunction( + Seq("like"), + Seq(UnresolvedAttribute("o_comment"), Literal("%special%requests%")), + isDistinct = false)), + tableO) + val right = SubqueryAlias("o", filterNot) + val joinCondition = + EqualTo(UnresolvedAttribute("o_custkey"), UnresolvedAttribute("c_custkey")) + val join = Join(left, right, LeftOuter, Some(joinCondition), JoinHint.NONE) + val groupingExpression1 = Alias(UnresolvedAttribute("c_custkey"), "c_custkey")() + val aggregateExpressions1 = + Alias( + UnresolvedFunction( + Seq("COUNT"), + Seq(UnresolvedAttribute("o_orderkey")), + isDistinct = false), + "c_count")() + val agg3 = + Aggregate(Seq(groupingExpression1), Seq(aggregateExpressions1, groupingExpression1), join) + val subqueryAlias = SubqueryAlias("c_orders", agg3) + val agg2 = + Aggregate( + Seq(groupingExpression1), + Seq(aggregateExpressions1, groupingExpression1), + subqueryAlias) + val groupingExpression2 = Alias(UnresolvedAttribute("c_count"), "c_count")() + val aggregateExpressions2 = + Alias(UnresolvedFunction(Seq("COUNT"), Seq(Literal(1)), isDistinct = false), "custdist")() + val agg1 = + Aggregate(Seq(groupingExpression2), Seq(aggregateExpressions2, groupingExpression2), agg2) + val sort = Sort( + Seq( + SortOrder(UnresolvedAttribute("custdist"), Descending), + SortOrder(UnresolvedAttribute("c_count"), Descending)), + global = true, + agg1) + val expectedPlan = Project(Seq(UnresolvedStar(None)), sort) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test multiple joins with table alias") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = table1 as t1 + | | JOIN ON t1.id = t2.id + | [ + | source = table2 as t2 + | ] + | | JOIN ON t2.id = t3.id + | [ + | source = table3 as t3 + | ] + | | JOIN ON t3.id = t4.id + | [ + | source = table4 as t4 + | ] + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("table1")) + val table2 = UnresolvedRelation(Seq("table2")) + val table3 = UnresolvedRelation(Seq("table3")) + val table4 = UnresolvedRelation(Seq("table4")) + val joinPlan1 = Join( + SubqueryAlias("t1", table1), + SubqueryAlias("t2", table2), + Inner, + Some(EqualTo(UnresolvedAttribute("t1.id"), UnresolvedAttribute("t2.id"))), + JoinHint.NONE) + val joinPlan2 = Join( + joinPlan1, + SubqueryAlias("t3", table3), + Inner, + Some(EqualTo(UnresolvedAttribute("t2.id"), UnresolvedAttribute("t3.id"))), + JoinHint.NONE) + val joinPlan3 = Join( + joinPlan2, + SubqueryAlias("t4", table4), + Inner, + Some(EqualTo(UnresolvedAttribute("t3.id"), UnresolvedAttribute("t4.id"))), + JoinHint.NONE) + val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan3) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test multiple joins with table and subquery alias") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = table1 as t1 + | | JOIN left = l right = r ON t1.id = t2.id + | [ + | source = table2 as t2 + | ] + | | JOIN left = l right = r ON t2.id = t3.id + | [ + | source = table3 as t3 + | ] + | | JOIN left = l right = r ON t3.id = t4.id + | [ + | source = table4 as t4 + | ] + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("table1")) + val table2 = UnresolvedRelation(Seq("table2")) + val table3 = UnresolvedRelation(Seq("table3")) + val table4 = UnresolvedRelation(Seq("table4")) + val joinPlan1 = Join( + SubqueryAlias("l", SubqueryAlias("t1", table1)), + SubqueryAlias("r", SubqueryAlias("t2", table2)), + Inner, + Some(EqualTo(UnresolvedAttribute("t1.id"), UnresolvedAttribute("t2.id"))), + JoinHint.NONE) + val joinPlan2 = Join( + SubqueryAlias("l", joinPlan1), + SubqueryAlias("r", SubqueryAlias("t3", table3)), + Inner, + Some(EqualTo(UnresolvedAttribute("t2.id"), UnresolvedAttribute("t3.id"))), + JoinHint.NONE) + val joinPlan3 = Join( + SubqueryAlias("l", joinPlan2), + SubqueryAlias("r", SubqueryAlias("t4", table4)), + Inner, + Some(EqualTo(UnresolvedAttribute("t3.id"), UnresolvedAttribute("t4.id"))), + JoinHint.NONE) + val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan3) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test multiple joins without table aliases") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = table1 + | | JOIN ON table1.id = table2.id table2 + | | JOIN ON table1.id = table3.id table3 + | | JOIN ON table2.id = table4.id table4 + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("table1")) + val table2 = UnresolvedRelation(Seq("table2")) + val table3 = UnresolvedRelation(Seq("table3")) + val table4 = UnresolvedRelation(Seq("table4")) + val joinPlan1 = Join( + table1, + table2, + Inner, + Some(EqualTo(UnresolvedAttribute("table1.id"), UnresolvedAttribute("table2.id"))), + JoinHint.NONE) + val joinPlan2 = Join( + joinPlan1, + table3, + Inner, + Some(EqualTo(UnresolvedAttribute("table1.id"), UnresolvedAttribute("table3.id"))), + JoinHint.NONE) + val joinPlan3 = Join( + joinPlan2, + table4, + Inner, + Some(EqualTo(UnresolvedAttribute("table2.id"), UnresolvedAttribute("table4.id"))), + JoinHint.NONE) + val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan3) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test multiple joins with part subquery aliases") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = table1 + | | JOIN left = t1 right = t2 ON t1.name = t2.name table2 + | | JOIN right = t3 ON t1.name = t3.name table3 + | | JOIN right = t4 ON t2.name = t4.name table4 + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("table1")) + val table2 = UnresolvedRelation(Seq("table2")) + val table3 = UnresolvedRelation(Seq("table3")) + val table4 = UnresolvedRelation(Seq("table4")) + val joinPlan1 = Join( + SubqueryAlias("t1", table1), + SubqueryAlias("t2", table2), + Inner, + Some(EqualTo(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t2.name"))), + JoinHint.NONE) + val joinPlan2 = Join( + joinPlan1, + SubqueryAlias("t3", table3), + Inner, + Some(EqualTo(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t3.name"))), + JoinHint.NONE) + val joinPlan3 = Join( + joinPlan2, + SubqueryAlias("t4", table4), + Inner, + Some(EqualTo(UnresolvedAttribute("t2.name"), UnresolvedAttribute("t4.name"))), + JoinHint.NONE) + val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan3) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test multiple joins with self join 1") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = $testTable1 + | | JOIN left = t1 right = t2 ON t1.name = t2.name $testTable2 + | | JOIN right = t3 ON t1.name = t3.name $testTable3 + | | JOIN right = t4 ON t1.name = t4.name $testTable1 + | | fields t1.name, t2.name, t3.name, t4.name + | """.stripMargin) + + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val table3 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test3")) + val joinPlan1 = Join( + SubqueryAlias("t1", table1), + SubqueryAlias("t2", table2), + Inner, + Some(EqualTo(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t2.name"))), + JoinHint.NONE) + val joinPlan2 = Join( + joinPlan1, + SubqueryAlias("t3", table3), + Inner, + Some(EqualTo(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t3.name"))), + JoinHint.NONE) + val joinPlan3 = Join( + joinPlan2, + SubqueryAlias("t4", table1), + Inner, + Some(EqualTo(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t4.name"))), + JoinHint.NONE) + val expectedPlan = Project( + Seq( + UnresolvedAttribute("t1.name"), + UnresolvedAttribute("t2.name"), + UnresolvedAttribute("t3.name"), + UnresolvedAttribute("t4.name")), + joinPlan3) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test multiple joins with self join 2") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = $testTable1 + | | JOIN left = t1 right = t2 ON t1.name = t2.name $testTable2 + | | JOIN right = t3 ON t1.name = t3.name $testTable3 + | | JOIN ON t1.name = t4.name + | [ + | source = $testTable1 + | ] as t4 + | | fields t1.name, t2.name, t3.name, t4.name + | """.stripMargin) + + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val table3 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test3")) + val joinPlan1 = Join( + SubqueryAlias("t1", table1), + SubqueryAlias("t2", table2), + Inner, + Some(EqualTo(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t2.name"))), + JoinHint.NONE) + val joinPlan2 = Join( + joinPlan1, + SubqueryAlias("t3", table3), + Inner, + Some(EqualTo(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t3.name"))), + JoinHint.NONE) + val joinPlan3 = Join( + joinPlan2, + SubqueryAlias("t4", table1), + Inner, + Some(EqualTo(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t4.name"))), + JoinHint.NONE) + val expectedPlan = Project( + Seq( + UnresolvedAttribute("t1.name"), + UnresolvedAttribute("t2.name"), + UnresolvedAttribute("t3.name"), + UnresolvedAttribute("t4.name")), + joinPlan3) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test side alias will override the subquery alias") { + val context = new CatalystPlanContext + val logPlan = plan( + pplParser, + s""" + | source = $testTable1 + | | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = $testTable2 as ttt ] as tt + | | fields t1.name, t2.name + | """.stripMargin) + val logicalPlan = planTransformer.visit(logPlan, context) + val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + val joinPlan1 = Join( + SubqueryAlias("t1", table1), + SubqueryAlias("t2", SubqueryAlias("tt", SubqueryAlias("ttt", table2))), + Inner, + Some(EqualTo(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t2.name"))), + JoinHint.NONE) + val expectedPlan = + Project(Seq(UnresolvedAttribute("t1.name"), UnresolvedAttribute("t2.name")), joinPlan1) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } }