Extract value from a map when the key looks like a nested field #600

Closed
@@ -486,6 +486,34 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
|""".stripMargin)
}

protected def createStructNestedTableWithKeysLikeNestedFields(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
| (
| user_data
| STRUCT<
| `user.first.name`:STRING,
| `user.last.name`:STRING,
| `user.age`:INT,
| `user.home.address.street`:STRING,
| `user.home.address.city`:STRING
| >,
| user_credentials STRUCT<login:STRING, password: STRING>
| )
| USING JSON
|""".stripMargin)

sql(s"""
| INSERT INTO $testTable
| SELECT /*+ COALESCE(1) */ *
| FROM VALUES
| ( STRUCT("Alice", "Smith", 30, "123 Main St", "Seattle"), STRUCT("asmith", "REDACTED") ),
| ( STRUCT("Bob", "Johnson", 55, "456 Elm St", "Seattle"), STRUCT("bjohnson", "REDACTED") ),
| ( STRUCT("Charlie", "Williams", 65, "789 Pine St", "San Francisco"), STRUCT("cwilliams", "REDACTED") ),
| ( STRUCT("David", "Brown", 19, "101 Maple St", "San Francisco"), STRUCT("dbrown", "REDACTED") )
|""".stripMargin)
}
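
// Editor's sketch, not part of this change: a hypothetical helper showing how the dotted
// keys above are reached in plain Spark SQL. Back-quoting the key makes Spark read
// `user.first.name` as a single struct field rather than as nested-field navigation.
protected def selectDottedKeysExample(testTable: String): Unit = {
sql(s"""
| SELECT user_data.`user.first.name`, user_data.`user.home.address.city`
| FROM $testTable
|""".stripMargin)
}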

protected def createTableIssue112(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable (
@@ -6,11 +6,9 @@
package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{And, Ascending, Descending, EqualTo, GreaterThan, Literal, SortOrder}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedExtractValue, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, EqualTo, GreaterThan, Literal, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.DescribeTableCommand
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLNestedFieldsITSuite
@@ -20,12 +18,15 @@ class FlintSparkPPLNestedFieldsITSuite
with StreamTest {

/** Test table and index name */
private val testTable = "spark_catalog.default.flint_ppl_test"
private val nestedTestTable = "spark_catalog.default.flint_ppl_test_nested"
private val nestedTestTableWithNestedKeys =
"spark_catalog.default.flint_ppl_test_nested_with_nested_keys"

override def beforeAll(): Unit = {
super.beforeAll()

createStructNestedTable(testTable)
createStructNestedTable(nestedTestTable)
createStructNestedTableWithKeysLikeNestedFields(nestedTestTableWithNestedKeys)
}

protected override def afterEach(): Unit = {
@@ -38,8 +39,8 @@ class FlintSparkPPLNestedFieldsITSuite
}

test("create ppl simple query test") {
val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`"
Seq(testTable, testTableQuoted).foreach { table =>
val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test_nested`"
Seq(nestedTestTable, testTableQuoted).foreach { table =>
val frame = sql(s"""
| source = $table
| """.stripMargin)
@@ -63,15 +64,15 @@ class FlintSparkPPLNestedFieldsITSuite
val expectedPlan: LogicalPlan =
Project(
Seq(UnresolvedStar(None)),
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested")))
// Compare the two plans
assert(expectedPlan === logicalPlan)
}
}

test("create ppl simple query with head (limit) 1 test") {
val frame = sql(s"""
| source = $testTable| head 1
| source = $nestedTestTable| head 1
| """.stripMargin)

// Retrieve the results
@@ -82,7 +83,9 @@ class FlintSparkPPLNestedFieldsITSuite
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val limitPlan: LogicalPlan =
Limit(Literal(1), UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
Limit(
Literal(1),
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested")))
val expectedPlan = Project(Seq(UnresolvedStar(None)), limitPlan)

// Compare the two plans
@@ -91,7 +94,7 @@

test("create ppl simple query with head (limit) and sorted test") {
val frame = sql(s"""
| source = $testTable| sort int_col | head 1
| source = $nestedTestTable| sort int_col | head 1
| """.stripMargin)

// Retrieve the results
@@ -105,7 +108,7 @@ class FlintSparkPPLNestedFieldsITSuite
Sort(
Seq(SortOrder(UnresolvedAttribute("int_col"), Ascending)),
global = true,
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested")))

// Define the expected logical plan
val expectedPlan: LogicalPlan =
@@ -117,7 +120,7 @@

test("create ppl simple query with head (limit) and nested column sorted test") {
val frame = sql(s"""
| source = $testTable| sort struct_col.field1 | head 1
| source = $nestedTestTable| sort struct_col.field1 | head 1
| """.stripMargin)

// Retrieve the results
@@ -131,7 +134,7 @@
Sort(
Seq(SortOrder(UnresolvedAttribute("struct_col.field1"), Ascending)),
global = true,
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested")))

// Define the expected logical plan
val expectedPlan: LogicalPlan =
Expand All @@ -143,7 +146,7 @@ class FlintSparkPPLNestedFieldsITSuite

test("create ppl simple query two with fields result test") {
val frame = sql(s"""
| source = $testTable| fields int_col, struct_col.field2, struct_col2.field2
| source = $nestedTestTable| fields int_col, struct_col.field2, struct_col2.field2
| """.stripMargin)

// Retrieve the results
@@ -168,14 +171,14 @@
UnresolvedAttribute("int_col"),
UnresolvedAttribute("struct_col.field2"),
UnresolvedAttribute("struct_col2.field2")),
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested")))
// Compare the two plans
assert(expectedPlan === logicalPlan)
}

test("create ppl simple sorted query two with fields result test sorted") {
val frame = sql(s"""
| source = $testTable| sort - struct_col2.field2 | fields int_col, struct_col2.field2
| source = $nestedTestTable| sort - struct_col2.field2 | fields int_col, struct_col2.field2
| """.stripMargin)

// Retrieve the results
@@ -192,7 +195,7 @@
Sort(
Seq(SortOrder(UnresolvedAttribute("struct_col2.field2"), Descending)),
global = true,
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested")))

// Define the expected logical plan
val expectedPlan: LogicalPlan =
@@ -206,7 +209,7 @@

test("create ppl simple sorted by nested field query with two with fields result test ") {
val frame = sql(s"""
| source = $testTable| sort - struct_col.field2 , - int_col | fields int_col, struct_col.field2
| source = $nestedTestTable| sort - struct_col.field2 , - int_col | fields int_col, struct_col.field2
| """.stripMargin)

// Retrieve the results
@@ -225,7 +228,7 @@
SortOrder(UnresolvedAttribute("struct_col.field2"), Descending),
SortOrder(UnresolvedAttribute("int_col"), Descending)),
global = true,
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested")))

// Define the expected logical plan
val expectedPlan: LogicalPlan =
@@ -239,7 +242,7 @@

test("create ppl simple query with nested field 1 range filter test") {
val frame = sql(s"""
| source = $testTable| where struct_col.field2 > 200 | sort - struct_col.field2 | fields int_col, struct_col.field2
| source = $nestedTestTable| where struct_col.field2 > 200 | sort - struct_col.field2 | fields int_col, struct_col.field2
| """.stripMargin)

// Retrieve the results
@@ -253,7 +256,7 @@
val logicalPlan: LogicalPlan = frame.queryExecution.logical

// Define the expected logical plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested"))
// Define the expected logical plan components
val filterPlan =
Filter(GreaterThan(UnresolvedAttribute("struct_col.field2"), Literal(200)), table)
@@ -273,7 +276,7 @@

test("create ppl simple query with nested field 2 range filter test") {
val frame = sql(s"""
| source = $testTable| where struct_col2.field2 > 50 | sort - struct_col2.field2 | fields int_col, struct_col2.field2
| source = $nestedTestTable| where struct_col2.field2 > 50 | sort - struct_col2.field2 | fields int_col, struct_col2.field2
| """.stripMargin)

// Retrieve the results
@@ -287,7 +290,7 @@
val logicalPlan: LogicalPlan = frame.queryExecution.logical

// Define the expected logical plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested"))
// Define the expected logical plan components
val filterPlan =
Filter(GreaterThan(UnresolvedAttribute("struct_col2.field2"), Literal(50)), table)
@@ -307,7 +310,7 @@

test("create ppl simple query with nested field string match test") {
val frame = sql(s"""
| source = $testTable| where struct_col.field1.subfield = 'value1' | sort int_col | fields int_col, struct_col.field1.subfield, struct_col2.field1.subfield
| source = $nestedTestTable| where struct_col.field1.subfield = 'value1' | sort int_col | fields int_col, struct_col.field1.subfield, struct_col2.field1.subfield
| """.stripMargin)

// Retrieve the results
@@ -321,7 +324,7 @@
val logicalPlan: LogicalPlan = frame.queryExecution.logical

// Define the expected logical plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested"))
// Define the expected logical plan components
val filterPlan =
Filter(EqualTo(UnresolvedAttribute("struct_col.field1.subfield"), Literal("value1")), table)
@@ -341,7 +344,7 @@

test("create ppl simple query with nested field string filter test") {
val frame = sql(s"""
| source = $testTable| where struct_col2.field1.subfield > 'valueA' | sort int_col | fields int_col, struct_col.field1.subfield, struct_col2.field1.subfield
| source = $nestedTestTable| where struct_col2.field1.subfield > 'valueA' | sort int_col | fields int_col, struct_col.field1.subfield, struct_col2.field1.subfield
| """.stripMargin)

// Retrieve the results
@@ -359,7 +362,7 @@
val logicalPlan: LogicalPlan = frame.queryExecution.logical

// Define the expected logical plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test_nested"))
// Define the expected logical plan components
val filterPlan = Filter(
GreaterThan(UnresolvedAttribute("struct_col2.field1.subfield"), Literal("valueA")),
@@ -377,4 +380,64 @@
// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}

test("create ppl simple query nested column with dots in name test") {
val frame = sql(s"""
| source = $nestedTestTableWithNestedKeys | fields user_data[user.first.name]
| """.stripMargin)
// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] =
Array(Row("Alice"), Row("Bob"), Row("Charlie"), Row("David"))
assert(results.length == 4)
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan
val pplLogicalPlan: LogicalPlan = frame.queryExecution.logical

// Define the expected logical plan
val table = UnresolvedRelation(
Seq("spark_catalog", "default", "flint_ppl_test_nested_with_nested_keys"))
val expectedPlan = Project(
Seq(
UnresolvedAlias(
UnresolvedExtractValue(UnresolvedAttribute("user_data"), Literal("user.first.name")))),
table)

// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(pplLogicalPlan))
}

test("create ppl simple query nested column with dots in name filter test") {
val frame = sql(s"""
| source = $nestedTestTableWithNestedKeys | where user_data[user.home.address.city] = 'Seattle'
| """.stripMargin)
// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row(Row("Alice", "Smith", 30, "123 Main St", "Seattle"), Row("asmith", "REDACTED")),
Row(Row("Bob", "Johnson", 55, "456 Elm St", "Seattle"), Row("bjohnson", "REDACTED")))
assert(results.length == 2)
// Compare the results
assert(results === expectedResults)

// Retrieve the logical plan
val pplLogicalPlan: LogicalPlan = frame.queryExecution.logical

// Define the expected logical plan
val table = UnresolvedRelation(
Seq("spark_catalog", "default", "flint_ppl_test_nested_with_nested_keys"))
val cityEqualTo = EqualTo(
UnresolvedExtractValue(UnresolvedAttribute("user_data"), Literal("user.home.address.city")),
Literal("Seattle"))
val filter = Filter(cityEqualTo, table)
val expectedPlan = Project(Seq(UnresolvedStar(Option.empty)), filter)

// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(pplLogicalPlan))
}
}
1 change: 1 addition & 0 deletions ppl-spark-integration/README.md
@@ -230,6 +230,7 @@ See the next samples of PPL queries :
**Fields**
- `source = table`
- `source = table | fields a,b,c`
- `source = table | fields level1[level2.leaf]` (nested key)

**Nested-Fields**
- `source = catalog.schema.table1, catalog.schema.table2 | fields A.nested1, B.nested1`
@@ -343,6 +343,7 @@ sortFieldExpression

fieldExpression
: qualifiedName
| extractedName
;

wcFieldExpression
@@ -784,6 +785,10 @@ qualifiedName
: ident (DOT ident)* # identsAsQualifiedName
;

extractedName
: rootName = qualifiedName LT_SQR_PRTHS extractPath = qualifiedName RT_SQR_PRTHS
;

tableQualifiedName
: tableIdent (DOT ident)* # identsAsTableQualifiedName
;
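
As a rough illustration of the new extractedName alternative (a sketch mirroring the integration tests in this change, not an exhaustive description of the grammar), a field expression may now be a qualified root followed by a bracketed qualified path. Issued from a suite where sql is in scope, with a hypothetical table name:

// Hedged sketch: query shapes the rule above is meant to accept.
sql("source = spark_catalog.default.people | fields user_data[user.first.name]")
sql("source = spark_catalog.default.people | where user_data[user.home.address.city] = 'Seattle'")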
@@ -15,6 +15,7 @@
import org.opensearch.sql.ast.expression.Case;
import org.opensearch.sql.ast.expression.Compare;
import org.opensearch.sql.ast.expression.EqualTo;
import org.opensearch.sql.ast.expression.ExtractedField;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.FieldsMapping;
import org.opensearch.sql.ast.expression.Function;
@@ -180,6 +181,10 @@ public T visitField(Field node, C context) {
return visitChildren(node, context);
}

public T visitExtractedField(ExtractedField node, C context) {
return visitChildren(node, context);
}

public T visitQualifiedName(QualifiedName node, C context) {
return visitChildren(node, context);
}
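
For orientation, a minimal Scala sketch (an assumption about intent, not the PR's actual translation code) of how a root[path] extracted field can be mapped onto the Catalyst expression that the integration tests above assert:

import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedExtractValue}
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}

// Hypothetical mapping: user_data[user.first.name] becomes
// UnresolvedExtractValue(UnresolvedAttribute("user_data"), Literal("user.first.name")),
// matching the expected plans in FlintSparkPPLNestedFieldsITSuite.
def extractedFieldToCatalyst(rootName: String, extractPath: String): Expression =
  UnresolvedExtractValue(UnresolvedAttribute(rootName), Literal(extractPath))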