diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLProjectStatementITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLProjectStatementITSuite.scala index 3ab4228be..293414aa5 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLProjectStatementITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLProjectStatementITSuite.scala @@ -27,6 +27,7 @@ class FlintSparkPPLProjectStatementITSuite private val t2 = "`spark_catalog`.default.`flint_ppl_test2`" private val t3 = "spark_catalog.`default`.`flint_ppl_test3`" private val t4 = "`spark_catalog`.`default`.flint_ppl_test4" + private val viewName = "simpleView" override def beforeAll(): Unit = { super.beforeAll() @@ -41,6 +42,7 @@ class FlintSparkPPLProjectStatementITSuite protected override def afterEach(): Unit = { super.afterEach() + sql(s"DROP TABLE IF EXISTS $viewName") // Stop all streaming jobs if any spark.streams.active.foreach { job => job.stop() @@ -73,7 +75,7 @@ class FlintSparkPPLProjectStatementITSuite test("project using csv") { val frame = sql(s""" - | project simpleView using csv | source = $testTable | where state != 'California' | fields name + | project $viewName using csv | source = $testTable | where state != 'California' | fields name | """.stripMargin) // Retrieve the logical plan @@ -81,7 +83,7 @@ class FlintSparkPPLProjectStatementITSuite frame.collect() // verify new view was created correctly val results = sql(s""" - | source = simpleView + | source = $viewName | """.stripMargin).collect() // Define the expected results @@ -99,7 +101,7 @@ class FlintSparkPPLProjectStatementITSuite Filter(Not(EqualTo(UnresolvedAttribute("state"), Literal("California"))), relation) val expectedPlan: LogicalPlan = CreateTableAsSelect( - UnresolvedIdentifier(Seq("simpleView")), + UnresolvedIdentifier(Seq(viewName)), Seq(), Project(Seq(UnresolvedAttribute("name")), 
filter), UnresolvedTableSpec( @@ -119,7 +121,7 @@ class FlintSparkPPLProjectStatementITSuite test("project using csv partition by age") { val frame = sql(s""" - | project simpleView using csv partitioned by ('age') | source = $testTable | where state != 'California' | fields name, age + | project $viewName using csv partitioned by (age) | source = $testTable | where state != 'California' | fields name, age | """.stripMargin) // Retrieve the logical plan @@ -127,7 +129,7 @@ class FlintSparkPPLProjectStatementITSuite frame.collect() // verify new view was created correctly val results = sql(s""" - | source = simpleView + | source = $viewName | """.stripMargin).collect() // Define the expected results @@ -145,7 +147,7 @@ class FlintSparkPPLProjectStatementITSuite Filter(Not(EqualTo(UnresolvedAttribute("state"), Literal("California"))), relation) val expectedPlan: LogicalPlan = CreateTableAsSelect( - UnresolvedIdentifier(Seq("simpleView")), + UnresolvedIdentifier(Seq(viewName)), // Seq(IdentityTransform.apply(FieldReference.apply("age"))), Seq(), Project(Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")), filter), @@ -168,13 +170,13 @@ class FlintSparkPPLProjectStatementITSuite test("project using csv partition by state and country") { val frame = sql(s""" - |project simpleView using csv partitioned by ('state', 'country') | source = $testTable | dedup name | fields name, state, country + |project $viewName using csv partitioned by (state, country) | source = $testTable | dedup name | fields name, state, country | """.stripMargin) frame.collect() // verify new view was created correctly val results = sql(s""" - | source = simpleView + | source = $viewName | """.stripMargin).collect() // Define the expected results @@ -188,7 +190,7 @@ class FlintSparkPPLProjectStatementITSuite // verify new view was created correctly val describe = sql(s""" - | describe simpleView + | describe $viewName | """.stripMargin).collect() // Define the expected results @@ -223,7 
+225,7 @@ class FlintSparkPPLProjectStatementITSuite Deduplicate(Seq(nameAttribute), Filter(IsNotNull(nameAttribute), relation)) val expectedPlan: LogicalPlan = CreateTableAsSelect( - UnresolvedIdentifier(Seq("simpleView")), + UnresolvedIdentifier(Seq(viewName)), // Seq(IdentityTransform.apply(FieldReference.apply("age")), IdentityTransform.apply(FieldReference.apply("state")), Seq(), Project(Seq(UnresolvedAttribute("name"), UnresolvedAttribute("state"), UnresolvedAttribute("country")), dedup), @@ -246,13 +248,13 @@ class FlintSparkPPLProjectStatementITSuite test("project using parquet partition by state & country") { val frame = sql(s""" - |project simpleView using parquet partitioned by ('state', 'country') | source = $testTable | dedup name | fields name, state, country + |project $viewName using parquet partitioned by (state, country) | source = $testTable | dedup name | fields name, state, country | """.stripMargin) frame.collect() // verify new view was created correctly val results = sql(s""" - | source = simpleView + | source = $viewName | """.stripMargin).collect() // Define the expected results @@ -266,7 +268,7 @@ class FlintSparkPPLProjectStatementITSuite // verify new view was created correctly val describe = sql(s""" - | describe simpleView + | describe $viewName | """.stripMargin).collect() // Define the expected results @@ -301,7 +303,7 @@ class FlintSparkPPLProjectStatementITSuite Deduplicate(Seq(nameAttribute), Filter(IsNotNull(nameAttribute), relation)) val expectedPlan: LogicalPlan = CreateTableAsSelect( - UnresolvedIdentifier(Seq("simpleView")), + UnresolvedIdentifier(Seq(viewName)), // Seq(IdentityTransform.apply(FieldReference.apply("age")), IdentityTransform.apply(FieldReference.apply("state")), Seq(), Project(Seq(UnresolvedAttribute("name"), UnresolvedAttribute("state"), UnresolvedAttribute("country")), dedup), diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 
b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index 8cc40ac23..65620afe6 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -195,7 +195,7 @@ grokCommand ; projectCommand - : PROJECT (IF NOT EXISTS)? tableQualifiedName (USING datasourceValues)? (OPTIONS options=tablePropertyList)? (PARTITIONED BY partitionColumnNames=identifierList)? locationSpec? + : PROJECT (IF NOT EXISTS)? tableQualifiedName (USING datasourceValues)? (OPTIONS options=tablePropertyList)? (PARTITIONED BY partitionColumnNames=identifierSeq)? locationSpec? ; locationSpec @@ -229,14 +229,6 @@ tablePropertyValue | STRING ; -identifierList - : '(' identifierSeq ')' - ; - -identifierSeq - : stringLiteral (',' stringLiteral)* - ; - parseCommand : PARSE (source_field = expression) (pattern = stringLiteral) ; diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java index ef8cb359b..72480ca79 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java @@ -462,16 +462,7 @@ private List timestampFunctionArguments( visitFunctionArg(ctx.timestampFunction().secondArg)); return args; } - - @Override - public UnresolvedExpression visitIdentifierList(OpenSearchPPLParser.IdentifierListContext ctx) { - OpenSearchPPLParser.IdentifierSeqContext seq = ctx.identifierSeq(); - return new FieldList(seq.stringLiteral().stream() - .map(p-> new Field(new QualifiedName((Literal) visitStringLiteral(p)))) - .collect(toList())); - } - - + @Override public UnresolvedExpression visitDatasourceValues(OpenSearchPPLParser.DatasourceValuesContext ctx) { if(ctx.JSON()!=null) return new Literal(DataSourceType.JSON, DataType.STRING); diff --git 
a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/ProjectionUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/ProjectionUtils.java index 8ec09f6d4..2a2c54c02 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/ProjectionUtils.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/ProjectionUtils.java @@ -14,6 +14,7 @@ import org.apache.spark.sql.connector.expressions.IdentityTransform; import org.apache.spark.sql.connector.expressions.Transform; import org.opensearch.sql.ast.Node; +import org.opensearch.sql.ast.expression.AttributeList; import org.opensearch.sql.ast.expression.FieldList; import org.opensearch.sql.ast.expression.UnresolvedExpression; import org.opensearch.sql.ast.statement.ProjectStatement; @@ -50,7 +51,7 @@ static CreateTableAsSelect visitProject(LogicalPlan plan, ProjectStatement node, UnresolvedIdentifier name = new UnresolvedIdentifier(seq(node.getTableQualifiedName().getParts()), false); UnresolvedTableSpec tableSpec = new UnresolvedTableSpec(map(emptyMap()), option(using), new OptionList(seq()), Option.empty(), Option.empty(), Option.empty(), false); Seq partitioning = partitionColumns.isPresent() ? 
- seq(((FieldList) partitionColumns.get()).getFieldList().stream().map(f -> new IdentityTransform(new FieldReference(seq(f.getField().getParts())))).collect(toList())) : seq(); + seq(((AttributeList) partitionColumns.get()).getAttrList().stream().map(f -> new IdentityTransform(new FieldReference(seq(f.toString())))).collect(toList())) : seq(); return new CreateTableAsSelect(name, partitioning, plan, tableSpec, map(emptyMap()), !node.isOverride(), false); } } diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanProjectQueriesTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanProjectQueriesTranslatorTestSuite.scala index a944e16b8..2ab58fce3 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanProjectQueriesTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanProjectQueriesTranslatorTestSuite.scala @@ -66,7 +66,7 @@ class PPLLogicalPlanProjectQueriesTranslatorTestSuite val logPlan = planTransformer.visit( plan( pplParser, - "project if not exists simpleView using csv partitioned by ('age') | source = table | where state != 'California' "), + "project if not exists simpleView using csv partitioned by (age) | source = table | where state != 'California' "), context) // Define the expected logical plan @@ -102,7 +102,7 @@ class PPLLogicalPlanProjectQueriesTranslatorTestSuite val logPlan = planTransformer.visit( plan( pplParser, - "project if not exists simpleView using csv partitioned by ('age', 'country') | source = table | where state != 'California' "), + "project if not exists simpleView using csv partitioned by (age, country) | source = table | where state != 'California' "), context) // Define the expected logical plan @@ -131,342 +131,4 @@ class PPLLogicalPlanProjectQueriesTranslatorTestSuite compareByString(logPlan) == expectedPlan.toString ) } - - test("test 
simple search with escaped table name") { - // if successful build ppl logical plan and translate to catalyst logical plan - val context = new CatalystPlanContext - val logPlan = planTransformer.visit(plan(pplParser, "source=`table`"), context) - - val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None)) - val expectedPlan = Project(projectList, UnresolvedRelation(Seq("table"))) - comparePlans(expectedPlan, logPlan, false) - } - - test("test simple search with schema.table and no explicit fields (defaults to all fields)") { - // if successful build ppl logical plan and translate to catalyst logical plan - val context = new CatalystPlanContext - val logPlan = planTransformer.visit(plan(pplParser, "source=schema.table"), context) - - val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None)) - val expectedPlan = Project(projectList, UnresolvedRelation(Seq("schema", "table"))) - comparePlans(expectedPlan, logPlan, false) - - } - - test("test simple search with schema.table and one field projected") { - val context = new CatalystPlanContext - val logPlan = - planTransformer.visit(plan(pplParser, "source=schema.table | fields A"), context) - - val projectList: Seq[NamedExpression] = Seq(UnresolvedAttribute("A")) - val expectedPlan = Project(projectList, UnresolvedRelation(Seq("schema", "table"))) - comparePlans(expectedPlan, logPlan, false) - } - - test("create ppl simple query with nested field 1 range filter test") { - val context = new CatalystPlanContext - val logicalPlan = - planTransformer.visit( - plan( - pplParser, - "source = spark_catalog.default.flint_ppl_test | where struct_col.field2 > 200 | sort - struct_col.field2 | fields int_col, struct_col.field2"), - context) - - // Define the expected logical plan - val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) - // Define the expected logical plan components - val filterPlan = - Filter(GreaterThan(UnresolvedAttribute("struct_col.field2"), Literal(200)), table) - 
val sortedPlan: LogicalPlan = - Sort( - Seq(SortOrder(UnresolvedAttribute("struct_col.field2"), Descending)), - global = true, - filterPlan) - val expectedPlan = - Project( - Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col.field2")), - sortedPlan) - - // Compare the two plans - assert(compareByString(expectedPlan) === compareByString(logicalPlan)) - } - - test("test simple search with schema.table and one nested field projected") { - val context = new CatalystPlanContext - val logPlan = - planTransformer.visit(plan(pplParser, "source=schema.table | fields A.nested"), context) - - val projectList: Seq[NamedExpression] = Seq(UnresolvedAttribute("A.nested")) - val expectedPlan = Project(projectList, UnresolvedRelation(Seq("schema", "table"))) - comparePlans(expectedPlan, logPlan, false) - } - - test("test simple search with only one table with one field projected") { - val context = new CatalystPlanContext - val logPlan = - planTransformer.visit(plan(pplParser, "source=table | fields A"), context) - - val projectList: Seq[NamedExpression] = Seq(UnresolvedAttribute("A")) - val expectedPlan = Project(projectList, UnresolvedRelation(Seq("table"))) - comparePlans(expectedPlan, logPlan, false) - } - - test("test simple search with only one table with two fields projected") { - val context = new CatalystPlanContext - val logPlan = planTransformer.visit(plan(pplParser, "source=t | fields A, B"), context) - - val table = UnresolvedRelation(Seq("t")) - val projectList = Seq(UnresolvedAttribute("A"), UnresolvedAttribute("B")) - val expectedPlan = Project(projectList, table) - comparePlans(expectedPlan, logPlan, false) - } - - test("test simple search with one table with two fields projected sorted by one field") { - val context = new CatalystPlanContext - val logPlan = - planTransformer.visit(plan(pplParser, "source=t | sort A | fields A, B"), context) - - val table = UnresolvedRelation(Seq("t")) - val projectList = Seq(UnresolvedAttribute("A"), 
UnresolvedAttribute("B")) - // Sort by A ascending - val sortOrder = Seq(SortOrder(UnresolvedAttribute("A"), Ascending)) - val sorted = Sort(sortOrder, true, table) - val expectedPlan = Project(projectList, sorted) - - comparePlans(expectedPlan, logPlan, false) - } - - test("test simple search with one table with two fields projected sorted by one nested field") { - val context = new CatalystPlanContext - val logPlan = - planTransformer.visit( - plan(pplParser, "source=t | sort A.nested | fields A.nested, B"), - context) - - val table = UnresolvedRelation(Seq("t")) - val projectList = Seq(UnresolvedAttribute("A.nested"), UnresolvedAttribute("B")) - // Sort by A ascending - val sortOrder = Seq(SortOrder(UnresolvedAttribute("A.nested"), Ascending)) - val sorted = Sort(sortOrder, true, table) - val expectedPlan = Project(projectList, sorted) - - comparePlans(expectedPlan, logPlan, false) - } - - test( - "test simple search with only one table with two fields with head (limit ) command projected") { - val context = new CatalystPlanContext - val logPlan = - planTransformer.visit(plan(pplParser, "source=t | fields A, B | head 5"), context) - - val table = UnresolvedRelation(Seq("t")) - val projectList = Seq(UnresolvedAttribute("A"), UnresolvedAttribute("B")) - val planWithLimit = - GlobalLimit(Literal(5), LocalLimit(Literal(5), Project(projectList, table))) - val expectedPlan = Project(Seq(UnresolvedStar(None)), planWithLimit) - comparePlans(expectedPlan, logPlan, false) - } - - test( - "test simple search with only one table with two fields with head (limit ) command projected sorted by one descending field") { - val context = new CatalystPlanContext - val logPlan = planTransformer.visit( - plan(pplParser, "source=t | sort - A | fields A, B | head 5"), - context) - - val table = UnresolvedRelation(Seq("t")) - val sortOrder = Seq(SortOrder(UnresolvedAttribute("A"), Descending)) - val sorted = Sort(sortOrder, true, table) - val projectList = Seq(UnresolvedAttribute("A"), 
UnresolvedAttribute("B")) - val projectAB = Project(projectList, sorted) - - val planWithLimit = GlobalLimit(Literal(5), LocalLimit(Literal(5), projectAB)) - val expectedPlan = Project(Seq(UnresolvedStar(None)), planWithLimit) - comparePlans(expectedPlan, logPlan, false) - } - - test( - "Search multiple tables - translated into union call - fields expected to exist in both tables ") { - val context = new CatalystPlanContext - val logPlan = planTransformer.visit( - plan(pplParser, "search source = table1, table2 | fields A, B"), - context) - - val table1 = UnresolvedRelation(Seq("table1")) - val table2 = UnresolvedRelation(Seq("table2")) - - val allFields1 = Seq(UnresolvedAttribute("A"), UnresolvedAttribute("B")) - val allFields2 = Seq(UnresolvedAttribute("A"), UnresolvedAttribute("B")) - - val projectedTable1 = Project(allFields1, table1) - val projectedTable2 = Project(allFields2, table2) - - val expectedPlan = - Union(Seq(projectedTable1, projectedTable2), byName = true, allowMissingCol = true) - - comparePlans(expectedPlan, logPlan, false) - } - - test("Search multiple tables - translated into union call with fields") { - val context = new CatalystPlanContext - val logPlan = - planTransformer.visit(plan(pplParser, "source = table1, table2 "), context) - - val table1 = UnresolvedRelation(Seq("table1")) - val table2 = UnresolvedRelation(Seq("table2")) - - val allFields1 = UnresolvedStar(None) - val allFields2 = UnresolvedStar(None) - - val projectedTable1 = Project(Seq(allFields1), table1) - val projectedTable2 = Project(Seq(allFields2), table2) - - val expectedPlan = - Union(Seq(projectedTable1, projectedTable2), byName = true, allowMissingCol = true) - - comparePlans(expectedPlan, logPlan, false) - } - - test("Search multiple tables - with table alias") { - val context = new CatalystPlanContext - val logPlan = - planTransformer.visit( - plan( - pplParser, - """ - | source=table1, table2, table3 as t - | | where t.name = 'Molly' - |""".stripMargin), - context) - 
- val table1 = UnresolvedRelation(Seq("table1")) - val table2 = UnresolvedRelation(Seq("table2")) - val table3 = UnresolvedRelation(Seq("table3")) - val star = UnresolvedStar(None) - val plan1 = Project( - Seq(star), - Filter( - EqualTo(UnresolvedAttribute("t.name"), Literal("Molly")), - SubqueryAlias("t", table1))) - val plan2 = Project( - Seq(star), - Filter( - EqualTo(UnresolvedAttribute("t.name"), Literal("Molly")), - SubqueryAlias("t", table2))) - val plan3 = Project( - Seq(star), - Filter( - EqualTo(UnresolvedAttribute("t.name"), Literal("Molly")), - SubqueryAlias("t", table3))) - - val expectedPlan = - Union(Seq(plan1, plan2, plan3), byName = true, allowMissingCol = true) - - comparePlans(expectedPlan, logPlan, false) - } - - test("test fields + field list") { - val context = new CatalystPlanContext - val logPlan = planTransformer.visit( - plan(pplParser, "source=t | sort - A | fields + A, B | head 5"), - context) - - val table = UnresolvedRelation(Seq("t")) - val sortOrder = Seq(SortOrder(UnresolvedAttribute("A"), Descending)) - val sorted = Sort(sortOrder, true, table) - val projectList = Seq(UnresolvedAttribute("A"), UnresolvedAttribute("B")) - val projection = Project(projectList, sorted) - - val planWithLimit = GlobalLimit(Literal(5), LocalLimit(Literal(5), projection)) - val expectedPlan = Project(Seq(UnresolvedStar(None)), planWithLimit) - comparePlans(expectedPlan, logPlan, false) - } - - test("test fields - field list") { - val context = new CatalystPlanContext - val logPlan = planTransformer.visit( - plan(pplParser, "source=t | sort - A | fields - A, B | head 5"), - context) - - val table = UnresolvedRelation(Seq("t")) - val sortOrder = Seq(SortOrder(UnresolvedAttribute("A"), Descending)) - val sorted = Sort(sortOrder, true, table) - val dropList = Seq(UnresolvedAttribute("A"), UnresolvedAttribute("B")) - val dropAB = DataFrameDropColumns(dropList, sorted) - - val planWithLimit = GlobalLimit(Literal(5), LocalLimit(Literal(5), dropAB)) - val 
expectedPlan = Project(Seq(UnresolvedStar(None)), planWithLimit) - comparePlans(expectedPlan, logPlan, false) - } - - test("test fields + then - field list") { - val context = new CatalystPlanContext - val logPlan = planTransformer.visit( - plan(pplParser, "source=t | fields + A, B, C | fields - A, B"), - context) - - val table = UnresolvedRelation(Seq("t")) - val projectABC = Project( - Seq(UnresolvedAttribute("A"), UnresolvedAttribute("B"), UnresolvedAttribute("C")), - table) - val dropList = Seq(UnresolvedAttribute("A"), UnresolvedAttribute("B")) - val dropAB = DataFrameDropColumns(dropList, projectABC) - - val expectedPlan = Project(Seq(UnresolvedStar(None)), dropAB) - comparePlans(expectedPlan, logPlan, false) - } - - test("test fields - then + field list") { - val context = new CatalystPlanContext - val thrown = intercept[SyntaxCheckException] { - planTransformer.visit( - plan(pplParser, "source=t | fields - A, B | fields + A, B, C"), - context) - } - assert( - thrown.getMessage - === "[Field(field=A, fieldArgs=[]), Field(field=B, fieldArgs=[])] can't be resolved") - } - - test("test line comment should pass without exceptions") { - val context = new CatalystPlanContext - planTransformer.visit(plan(pplParser, "source=t a=1 b=2 //this is a comment"), context) - planTransformer.visit(plan(pplParser, "source=t a=1 b=2 // this is a comment "), context) - planTransformer.visit( - plan( - pplParser, - """ - | // test is a new line comment - | source=t a=1 b=2 // test is a line comment at the end of ppl command - | | fields a,b // this is line comment inner ppl command - | ////this is a new line comment - |""".stripMargin), - context) - } - - test("test block comment should pass without exceptions") { - val context = new CatalystPlanContext - planTransformer.visit(plan(pplParser, "source=t a=1 b=2 /*block comment*/"), context) - planTransformer.visit(plan(pplParser, "source=t a=1 b=2 /* block comment */"), context) - planTransformer.visit( - plan( - pplParser, - """ 
- | /* - | * This is a - | * multiple - | * line block - | * comment - | */ - | search /* block comment */ source=t /* block comment */ a=1 b=2 - | | /* - | This is a - | multiple - | line - | block - | comment */ fields a,b /* block comment */ - | /* block comment */ - |""".stripMargin), - context) - } }