From 400fbad4d1ac07bf6898f435de104c6b0f9ef95d Mon Sep 17 00:00:00 2001
From: YANGDB
Date: Sat, 14 Oct 2023 00:05:57 -0700
Subject: [PATCH] update correlation related traversal
 - add plan branches context traversal
 - add resolving of un-resolved attributes (columns)
 - add join spec transformer util API
 - add documentation about the correlation design considerations

Signed-off-by: YANGDB
---
 docs/PPL-Correlation-command.md               | 283 ++++++++++++++++++
 .../FlintSparkSkippingIndexITSuite.scala      |  28 +-
 .../flint/spark/FlintSparkSuite.scala         |  15 +-
 ....scala => FlintSparkPPLBasicITSuite.scala} |   2 +-
 .../ppl/FlintSparkPPLCorrelationITSuite.scala | 116 ++++++-
 .../opensearch/sql/ast/expression/And.java    |  22 +-
 .../sql/ast/expression/BinaryExpression.java  |  29 ++
 .../opensearch/sql/ast/expression/Field.java  |   1 +
 .../org/opensearch/sql/ast/expression/Or.java |  22 +-
 .../opensearch/sql/ast/expression/Xor.java    |  20 +-
 .../sql/ppl/CatalystPlanContext.java          | 105 ++++++-
 .../sql/ppl/CatalystQueryPlanVisitor.java     | 135 +++++----
 .../sql/ppl/utils/ComparatorTransformer.java  |   1 -
 .../sql/ppl/utils/JoinSpecTransformer.java    |  14 +-
 .../sql/ppl/utils/RelationUtils.java          |  33 ++
 spark-sql-integration/README.md               | 109 +++++++
 16 files changed, 769 insertions(+), 166 deletions(-)
 create mode 100644 docs/PPL-Correlation-command.md
 rename integ-test/src/test/scala/org/opensearch/flint/spark/ppl/{FlintSparkPPLITSuite.scala => FlintSparkPPLBasicITSuite.scala} (99%)
 create mode 100644 ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/BinaryExpression.java
 create mode 100644 ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java
 create mode 100644 spark-sql-integration/README.md

diff --git a/docs/PPL-Correlation-command.md b/docs/PPL-Correlation-command.md
new file mode 100644
index 000000000..f7ef3e266
--- /dev/null
+++ b/docs/PPL-Correlation-command.md
@@ -0,0 +1,283 @@
## PPL Correlation Command

## Overview

Over the past year, the OpenSearch Observability & Security teams have been working on many aspects of improving data monitoring and visibility.
The key idea behind this work is to let users dig into their data and surface the insights hidden within a massive corpus of logs, events and observations.

One fundamental concept that supports this process is the ability to correlate different data sources according to common dimensions and a shared timeframe.
This subject is already well documented, so this RFC does not argue for the necessity of correlation (the appendix lists several related resources); instead, it focuses on structuring the language support for such a capability.

![](https://user-images.githubusercontent.com/48943349/253685892-225e78e1-0942-46b0-8f67-97f9412a1c4c.png)


### Problem definition

The appendix adds formal references to the problem domain in both Observability and Security, but the main takeaway is that such a capability is fundamental to the daily work of these domain experts and SREs.
They routinely face huge amounts of data arriving from different verticals (data sources) that share the same timeframes but are not synchronized in any formal manner.

The ability to intersect these different verticals according to the timeframe and their shared dimensions enriches the data and allows the desired insight to surface.
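To make the problem concrete, the following is a minimal sketch (Spark Scala; the table names, column paths and time unit are illustrative assumptions, not part of this proposal) of the kind of manual, time-bucketed join an analyst has to write today to intersect two verticals on a shared dimension. The correlation command proposed below is intended to express this intent declaratively and leave the join construction to the driver.

```
// Illustrative sketch only: manually correlating logs and traces on a shared
// dimension (ip) within the same daily time bucket, using plain Spark.
// All table and column names below are assumptions for the example.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.date_trunc

object ManualCorrelationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("manual-correlation").getOrCreate()

    val logs   = spark.table("alb_logs") // assumed log source
    val traces = spark.table("traces")   // assumed trace source

    // Join on the shared dimension and on the same daily bucket of the timestamp.
    val correlated = logs.join(
      traces,
      logs("ip") === traces("attributes.http.server.address") &&
        date_trunc("day", logs("@timestamp")) === date_trunc("day", traces("@timestamp")),
      "inner")

    correlated.show(10, truncate = false)
  }
}
```

Even in this tiny sketch the user has to know the physical field paths and hand-build the time bucketing; the command described below folds both concerns into the query language.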
**Example**
Let's take the Observability domain, for which we have 3 distinct data sources:
- *Logs*
- *Metrics*
- *Traces*

Each data source may share many common dimensions, but to transition from one data source to another we need to be able to correlate them correctly.
According to the semantic naming conventions, we know that logs, traces and metrics carry these shared dimensions under well-known attribute names (see the schema references in the appendix).

Let's take the following examples:

**Log**

```
{
  "@timestamp": "2018-07-02T22:23:00.186Z",
  "aws": {
    "elb": {
      "backend": {
        "http": {
          "response": {
            "status_code": 500
          }
        },
        "ip": "10.0.0.1",
        "port": "80"
      },
      ...
      "target_port": [
        "10.0.0.1:80"
      ],
      "target_status_code": [
        "500"
      ],
      "traceId": "Root=1-58337262-36d228ad5d99923122bbe354",
      "type": "http"
    }
  },
  "cloud": {
    "provider": "aws"
  },
  "http": {
    "request": {
      ...
    },
    "communication": {
      "source": {
        "address": "192.168.131.39",
        "ip": "192.168.131.39",
        "port": 2817
      }
    }
  },
  "traceId": "Root=1-58337262-36d228ad5d99923122bbe354"
}
```

This is an AWS ELB log arriving from a service residing on AWS.
It shows that `backend.http.response.status_code` was 500 - which is an error.

This may come up as part of a monitoring process or an alert triggered by some rule. Once this is identified, the next step is to collect as much data surrounding this event as possible so that the investigation can be done in an intelligent and thorough way.

The most obvious step would be to create a query that brings in all the data related to that timeframe, but in many cases this is too much of a brute-force action.

The data may be too large to analyze, and most of the time would be spent filtering out irrelevant data instead of actually trying to locate the root cause of the problem.


### **Suggested Correlation command**

The next approach allows searching in a much more fine-grained manner and further simplifies the analysis stage.

Let's review the known facts - we have multiple dimensions that can be used to correlate data from the other sources:

- **IP** - `"ip": "10.0.0.1" | "ip": "192.168.131.39"`

- **Port** - `"port": 2817` | `"target_port": "10.0.0.1:80"`

So, assuming we have the additional traces / metrics indices available, and using the fact that we know our schema structure (see the appendix for the relevant schema references), we can generate a query that fetches all the relevant data bearing these dimensions during the same timeframe.

Here is a snippet of the trace index document that has the HTTP information we would like to correlate with:

```
{
  "traceId": "c1d985bd02e1dbb85b444011f19a1ecc",
  "spanId": "55a698828fe06a42",
  "traceState": [],
  "parentSpanId": "",
  "name": "mysql",
  "kind": "CLIENT",
  "@timestamp": "2021-11-13T20:20:39+00:00",
  "events": [
    {
      "@timestamp": "2021-03-25T17:21:03+00:00",
      ...
    }
  ],
  "links": [
    {
      "traceId": "c1d985bd02e1dbb85b444011f19a1ecc",
      "spanId": "55a698828fe06a42w2",
      "droppedAttributesCount": 0
    }
  ],
  "resource": {
    "service@name": "database",
    "telemetry@sdk@name": "opentelemetry",
    "host@hostname": "ip-172-31-10-8.us-west-2.compute.internal"
  },
  "status": {
    ...
  },
  "attributes": {
    "http": {
      "user_agent": {
        "original": "Mozilla/5.0"
      },
      "network": {
        ...
      },
      "request": {
        ...
+ } + }, + "response": { + "status_code": "200", + "body": { + "size": 500 + } + }, + "client": { + "server": { + "socket": { + "address": "192.168.0.1", + "domain": "example.com", + "port": 80 + }, + "address": "192.168.0.1", + "port": 80 + }, + "resend_count": 0, + "url": { + "full": "http://example.com" + } + }, + "server": { + "route": "/index", + "address": "192.168.0.2", + "port": 8080, + "socket": { + ... + }, + "client": { + ... + } + }, + "url": { + ... + } + } + } + } +} +``` + +In the above document we can see both the `traceId` and the http’s client/server `ip` that can be correlated with the elb logs to better understand the system’s behaviour and condition . + + +### New Correlation Query Command + +Here is the new command that would allow this type of investigation : + +`source alb_logs, traces | where alb_logs.ip="10.0.0.1" AND alb_logs.cloud.provider="aws"| ` +`correlate exact fields(traceId, ip) scope(@timestamp, 1D) mapping(alb_logs.ip = traces.attributes.http.server.address, alb_logs.traceId = traces.traceId ) ` + +Lets break this down a bit: + +`1. source alb_logs, traces` allows to select all the data-sources that will be correlated to one another + +`2. where ip="10.0.0.1" AND cloud.provider="aws"` predicate clause constraints the scope of the search corpus + +`3. correlate exact fields(traceId, ip)` express the correlation operation on the following list of field : + +`- ip` has an explicit filter condition so this will be propagated into the correlation condition for all the data-sources +`- traceId` has no explicit filter so the correlation will only match same traceId’s from all the data-sources + +The fields names indicate the logical meaning the function within the correlation command, the actual join condition will take the mapping statement described bellow. + +The term `exact` means that the correlation statements will require all the fields to match in order to fulfill the query statement. + +Other alternative for this can be `approximate` that will attempt to match on a best case scenario and will not reject rows with partially match. + + +### Addressing different field mapping + +In cases where the same logical field (such as `ip` ) may have different mapping within several data-sources, the explicit mapping field path is expected. + +The next syntax will extend the correlation conditions to allow matching different field names with similar logical meaning +`alb_logs.ip = traces.attributes.http.server.address, alb_logs.traceId = traces.traceId ` + +It is expected that for each `field` that participates in the correlation join, there should be a relevant `mapping` statement that includes all the tables that should be joined by this correlation command. + +**Example****:** +In our case there are 2 sources : `alb_logs, traces` +There are 2 fields: `traceId, ip` +These are 2 mapping statements : `alb_logs.ip = traces.attributes.http.server.address, alb_logs.traceId = traces.traceId` + + +### Scoping the correlation timeframes + +In order to simplify the work that has to be done by the execution engine (driver) the scope statement was added to explicitly direct the join query on the time it should scope for this search. + +`scope(@timestamp, 1D)` in this example, the scope of the search should be focused on a daily basis so that correlations appearing in the same day should be grouped together. This scoping mechanism simplifies and allows better control over results and allows incremental search resolution base on the user’s needs. 
***Diagram***
These are the correlation conditions that explicitly state how the sources are going to be joined.

[Image: Screenshot 2023-10-06 at 12.23.59 PM.png]

* * *

## Supporting Drivers

The new correlation command is in effect a 'hidden' join command, therefore only the following PPL drivers support it:

- [ppl-spark](https://github.com/opensearch-project/opensearch-spark/tree/main/ppl-spark-integration)
  In this driver the `correlation` command is directly translated into the appropriate Catalyst Join logical plan.

**Example:**
*`source alb_logs, traces, metrics | where ip="10.0.0.1" AND cloud.provider="aws"| correlate exact on (ip, port) scope(@timestamp, 2018-07-02T22:23:00, 1 D)`*

**Logical Plan:**

```
'Project [*]
+- 'Join Inner, ('ip && 'port)
   :- 'Filter (('ip === "10.0.0.1" && 'cloud.provider === "aws") && inTimeScope('@timestamp, "2018-07-02T22:23:00", "1 D"))
   +- 'UnresolvedRelation [alb_logs]
   +- 'Join Inner, ('ip && 'port)
      :- 'Filter (('ip === "10.0.0.1" && 'cloud.provider === "aws") && inTimeScope('@timestamp, "2018-07-02T22:23:00", "1 D"))
      +- 'UnresolvedRelation [traces]
      +- 'Filter (('ip === "10.0.0.1" && 'cloud.provider === "aws") && inTimeScope('@timestamp, "2018-07-02T22:23:00", "1 D"))
      +- 'UnresolvedRelation [metrics]
```

The Catalyst engine will optimize this query according to the most efficient join ordering.

* * *

## Appendix

* Correlation concepts
  * https://github.com/opensearch-project/sql/issues/1583
  * https://github.com/opensearch-project/dashboards-observability/issues?q=is%3Aopen+is%3Aissue+label%3Ametrics
* Observability Correlation
  * https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/
  * https://github.com/opensearch-project/dashboards-observability/wiki/Observability-Future-Vision#data-correlation
* Security Correlation
  * [OpenSearch new correlation engine](https://opensearch.org/docs/latest/security-analytics/usage/correlation-graph/)
  * [ocsf](https://github.com/ocsf/)
* Simple schema
  * [correlation use cases](https://github.com/opensearch-project/dashboards-observability/wiki/Observability-Future-Vision#data-correlation)
  * [correlation mapping metadata](https://github.com/opensearch-project/opensearch-catalog/tree/main/docs/schema)

![](https://user-images.githubusercontent.com/48943349/274153824-9c6008e0-fdaf-434f-8e5d-4347cee66ac4.png)

diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index 02dc681d7..da61feebc 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -122,8 +122,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     val index = flint.describeIndex(testIndex)
     index shouldBe defined
-    val optionJson = compact(render(
-      parse(index.get.metadata().getContent) \ "_meta" \ "options"))
+    val optionJson = compact(render(parse(index.get.metadata().getContent) \ "_meta" \ "options"))
     optionJson should matchJson("""
         | {
         |   "auto_refresh": "true",
@@ -321,8 +320,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
          |""".stripMargin)
     query.queryExecution.executedPlan should
-      useFlintSparkSkippingFileIndex(
-        hasIndexFilter(col("year") === 2023))
+      useFlintSparkSkippingFileIndex(hasIndexFilter(col("year") === 2023))
   }
   test("should not rewrite 
original query if filtering condition has disjunction") { @@ -388,8 +386,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { // Prepare test table val testTable = "spark_catalog.default.data_type_table" val testIndex = getSkippingIndexName(testTable) - sql( - s""" + sql(s""" | CREATE TABLE $testTable | ( | boolean_col BOOLEAN, @@ -408,8 +405,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | ) | USING PARQUET |""".stripMargin) - sql( - s""" + sql(s""" | INSERT INTO $testTable | VALUES ( | TRUE, @@ -449,8 +445,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testIndex) index shouldBe defined - index.get.metadata().getContent should matchJson( - s"""{ + index.get.metadata().getContent should matchJson(s"""{ | "_meta": { | "name": "flint_spark_catalog_default_data_type_table_skipping_index", | "version": "${current()}", @@ -587,8 +582,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { test("can build skipping index for varchar and char and rewrite applicable query") { val testTable = "spark_catalog.default.varchar_char_table" val testIndex = getSkippingIndexName(testTable) - sql( - s""" + sql(s""" | CREATE TABLE $testTable | ( | varchar_col VARCHAR(20), @@ -596,8 +590,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | ) | USING PARQUET |""".stripMargin) - sql( - s""" + sql(s""" | INSERT INTO $testTable | VALUES ( | "sample varchar", @@ -613,8 +606,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .create() flint.refreshIndex(testIndex, FULL) - val query = sql( - s""" + val query = sql(s""" | SELECT varchar_col, char_col | FROM $testTable | WHERE varchar_col = "sample varchar" AND char_col = "sample char" @@ -624,8 +616,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val paddedChar = "sample char".padTo(20, ' ') checkAnswer(query, Row("sample varchar", paddedChar)) query.queryExecution.executedPlan should - useFlintSparkSkippingFileIndex(hasIndexFilter( - col("varchar_col") === "sample varchar" && col("char_col") === paddedChar)) + useFlintSparkSkippingFileIndex( + hasIndexFilter(col("varchar_col") === "sample varchar" && col("char_col") === paddedChar)) flint.deleteIndex(testIndex) } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index edbf5935a..d1f01caca 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -15,11 +15,7 @@ import org.apache.spark.sql.streaming.StreamTest /** * Flint Spark suite trait that initializes [[FlintSpark]] API instance. 
*/ -trait FlintSparkSuite - extends QueryTest - with FlintSuite - with OpenSearchSuite - with StreamTest { +trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite with StreamTest { /** Flint Spark high level API being tested */ lazy protected val flint: FlintSpark = new FlintSpark(spark) @@ -33,8 +29,7 @@ trait FlintSparkSuite } protected def createPartitionedTable(testTable: String): Unit = { - sql( - s""" + sql(s""" | CREATE TABLE $testTable | ( | name STRING, @@ -52,15 +47,13 @@ trait FlintSparkSuite | ) |""".stripMargin) - sql( - s""" + sql(s""" | INSERT INTO $testTable | PARTITION (year=2023, month=4) | VALUES ('Hello', 30, 'Seattle') | """.stripMargin) - sql( - s""" + sql(s""" | INSERT INTO $testTable | PARTITION (year=2023, month=5) | VALUES ('World', 25, 'Portland') diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala similarity index 99% rename from integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala rename to integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala index 9dea04872..8f1d1bd1f 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala @@ -11,7 +11,7 @@ import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.streaming.StreamTest -class FlintSparkPPLITSuite +class FlintSparkPPLBasicITSuite extends QueryTest with LogicalPlanTestUtils with FlintPPLSuite diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLCorrelationITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLCorrelationITSuite.scala index c345fd5bc..b4ed6a51d 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLCorrelationITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLCorrelationITSuite.scala @@ -7,7 +7,7 @@ package org.opensearch.flint.spark.ppl import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} -import org.apache.spark.sql.catalyst.expressions.{And, Ascending, EqualTo, Literal, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{And, Ascending, EqualTo, GreaterThan, Literal, SortOrder} import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical.JoinHint.NONE @@ -23,6 +23,7 @@ class FlintSparkPPLCorrelationITSuite /** Test table and index name */ private val testTable1 = "spark_catalog.default.flint_ppl_test1" private val testTable2 = "spark_catalog.default.flint_ppl_test2" + private val testTable3 = "spark_catalog.default.flint_ppl_test3" override def beforeAll(): Unit = { super.beforeAll() @@ -89,6 +90,38 @@ class FlintSparkPPLCorrelationITSuite | ('David', 'Unemployed', 'Canada', 0), | ('Jane', 'Scientist', 'Canada', 90000) | """.stripMargin) + sql(s""" + | CREATE TABLE $testTable3 + | ( + | name STRING, + | country STRING, + | hobby STRING, + | language STRING + | ) + | USING CSV + | OPTIONS ( + | header 'false', + | delimiter '\t' + | ) + | PARTITIONED BY ( + | year INT, + | month INT + | ) + |""".stripMargin) + + // 
Insert data into the new table + sql(s""" + | INSERT INTO $testTable3 + | PARTITION (year=2023, month=4) + | VALUES ('Jake', 'USA', 'Fishing', 'English'), + | ('Hello', 'USA', 'Painting', 'English'), + | ('John', 'Canada', 'Reading', 'French'), + | ('Jim', 'Canada', 'Hiking', 'English'), + | ('Peter', 'Canada', 'Gaming', 'English'), + | ('Rick', 'USA', 'Swimming', 'English'), + | ('David', 'USA', 'Gardening', 'English'), + | ('Jane', 'Canada', 'Singing', 'French') + | """.stripMargin) } protected override def afterEach(): Unit = { @@ -120,7 +153,19 @@ class FlintSparkPPLCorrelationITSuite val results: Array[Row] = frame.collect() // Define the expected results val expectedResults: Array[Row] = Array( - Row("Jake", 70, "California", "USA", 2023, 4, "Jake", "Engineer", "England", 100000, 2023, 4), + Row( + "Jake", + 70, + "California", + "USA", + 2023, + 4, + "Jake", + "Engineer", + "England", + 100000, + 2023, + 4), Row("Hello", 30, "New York", "USA", 2023, 4, "Hello", "Artist", "USA", 70000, 2023, 4), Row("John", 25, "Ontario", "Canada", 2023, 4, "John", "Doctor", "Canada", 120000, 2023, 4), Row("David", 40, "Washington", "USA", 2023, 4, "David", "Unemployed", "Canada", 0, 2023, 4), @@ -163,7 +208,7 @@ class FlintSparkPPLCorrelationITSuite } test("create ppl correlation query with two tables correlating on a two fields test") { - val frame = sql(s""" + val frame = sql(s""" | source = $testTable1, $testTable2| where year = 2023 AND month = 4 | correlate exact fields(name, country) scope(month, 1W) | mapping($testTable1.name = $testTable2.name, $testTable1.country = $testTable2.country) | """.stripMargin) @@ -198,9 +243,68 @@ class FlintSparkPPLCorrelationITSuite // Define join condition val joinCondition = And( - EqualTo(UnresolvedAttribute(s"$testTable1.name"), UnresolvedAttribute(s"$testTable2.name")), - EqualTo(UnresolvedAttribute(s"$testTable1.country"), UnresolvedAttribute(s"$testTable2.country")) - ) + EqualTo( + UnresolvedAttribute(s"$testTable1.name"), + UnresolvedAttribute(s"$testTable2.name")), + EqualTo( + UnresolvedAttribute(s"$testTable1.country"), + UnresolvedAttribute(s"$testTable2.country"))) + + // Create Join plan + val joinPlan = Join(plan1, plan2, Inner, Some(joinCondition), JoinHint.NONE) + + // Add the projection + val expectedPlan = Project(Seq(UnresolvedStar(None)), joinPlan) + + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + // Compare the two plans + assert(compareByString(expectedPlan) === compareByString(logicalPlan)) + } + + test( + "create ppl correlation query with two tables correlating on a two fields and disjoint filters test") { + val frame = sql(s""" + | source = $testTable1, $testTable2| where year = 2023 AND month = 4 AND $testTable2.salary > 100000 | correlate exact fields(name, country) scope(month, 1W) + | mapping($testTable1.name = $testTable2.name, $testTable1.country = $testTable2.country) + | """.stripMargin) + // Retrieve the results + val results: Array[Row] = frame.collect() + // Define the expected results + val expectedResults: Array[Row] = Array( + Row("David", 40, "Washington", "USA", 2023, 4, "David", "Doctor", "USA", 120000, 2023, 4), + Row("John", 25, "Ontario", "Canada", 2023, 4, "John", "Doctor", "Canada", 120000, 2023, 4)) + + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0)) + // Compare the results + assert(results.sorted.sameElements(expectedResults.sorted)) + + // Define unresolved relations + val table1 = UnresolvedRelation(Seq("spark_catalog", 
"default", "flint_ppl_test1")) + val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")) + + // Define filter expressions + val filter1Expr = And( + EqualTo(UnresolvedAttribute("year"), Literal(2023)), + EqualTo(UnresolvedAttribute("month"), Literal(4))) + val filter2Expr = And( + And( + EqualTo(UnresolvedAttribute("year"), Literal(2023)), + EqualTo(UnresolvedAttribute("month"), Literal(4))), + GreaterThan(UnresolvedAttribute(s"$testTable2.salary"), Literal(100000))) + // Define subquery aliases + val plan1 = Filter(filter1Expr, table1) + val plan2 = Filter(filter2Expr, table2) + + // Define join condition + val joinCondition = + And( + EqualTo( + UnresolvedAttribute(s"$testTable1.name"), + UnresolvedAttribute(s"$testTable2.name")), + EqualTo( + UnresolvedAttribute(s"$testTable1.country"), + UnresolvedAttribute(s"$testTable2.country"))) // Create Join plan val joinPlan = Join(plan1, plan2, Inner, Some(joinCondition), JoinHint.NONE) diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/And.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/And.java index f19de2a05..f783aabb7 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/And.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/And.java @@ -11,28 +11,12 @@ import java.util.List; /** Expression node of logic AND. */ -public class And extends UnresolvedExpression { - private UnresolvedExpression left; - private UnresolvedExpression right; +public class And extends BinaryExpression { public And(UnresolvedExpression left, UnresolvedExpression right) { - this.left = left; - this.right = right; + super(left,right); } - - @Override - public List getChild() { - return Arrays.asList(left, right); - } - - public UnresolvedExpression getLeft() { - return left; - } - - public UnresolvedExpression getRight() { - return right; - } - + @Override public R accept(AbstractNodeVisitor nodeVisitor, C context) { return nodeVisitor.visitAnd(this, context); diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/BinaryExpression.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/BinaryExpression.java new file mode 100644 index 000000000..a50a153a0 --- /dev/null +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/BinaryExpression.java @@ -0,0 +1,29 @@ +package org.opensearch.sql.ast.expression; + +import java.util.Arrays; +import java.util.List; + +public abstract class BinaryExpression extends UnresolvedExpression { + private UnresolvedExpression left; + private UnresolvedExpression right; + + public BinaryExpression(UnresolvedExpression left, UnresolvedExpression right) { + this.left = left; + this.right = right; + } + + @Override + public List getChild() { + return Arrays.asList(left, right); + } + + public UnresolvedExpression getLeft() { + return left; + } + + public UnresolvedExpression getRight() { + return right; + } + +} + diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/Field.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/Field.java index 7c77fae1f..39b42dfe4 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/Field.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/Field.java @@ -8,6 +8,7 @@ import com.google.common.collect.ImmutableList; import org.opensearch.sql.ast.AbstractNodeVisitor; +import 
java.util.ArrayList; import java.util.Collections; import java.util.List; public class Field extends UnresolvedExpression { diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/Or.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/Or.java index 65e1a2e6d..d76cda695 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/Or.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/Or.java @@ -12,28 +12,10 @@ /** Expression node of the logic OR. */ -public class Or extends UnresolvedExpression { - private UnresolvedExpression left; - private UnresolvedExpression right; - +public class Or extends BinaryExpression { public Or(UnresolvedExpression left, UnresolvedExpression right) { - this.left = left; - this.right = right; + super(left,right); } - - @Override - public List getChild() { - return Arrays.asList(left, right); - } - - public UnresolvedExpression getLeft() { - return left; - } - - public UnresolvedExpression getRight() { - return right; - } - @Override public R accept(AbstractNodeVisitor nodeVisitor, C context) { return nodeVisitor.visitOr(this, context); diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/Xor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/Xor.java index 9368a6363..9f618a067 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/Xor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/Xor.java @@ -12,28 +12,14 @@ /** Expression node of the logic XOR. */ -public class Xor extends UnresolvedExpression { +public class Xor extends BinaryExpression { private UnresolvedExpression left; private UnresolvedExpression right; public Xor(UnresolvedExpression left, UnresolvedExpression right) { - this.left = left; - this.right = right; + super(left,right); } - - @Override - public List getChild() { - return Arrays.asList(left, right); - } - - public UnresolvedExpression getLeft() { - return left; - } - - public UnresolvedExpression getRight() { - return right; - } - + @Override public R accept(AbstractNodeVisitor nodeVisitor, C context) { return nodeVisitor.visitXor(this, context); diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystPlanContext.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystPlanContext.java index 4145f5628..d6133206f 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystPlanContext.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystPlanContext.java @@ -5,17 +5,24 @@ package org.opensearch.sql.ppl; +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; import org.apache.spark.sql.catalyst.expressions.Expression; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; import org.apache.spark.sql.catalyst.plans.logical.Union; +import scala.collection.Iterator; import scala.collection.Seq; +import java.util.Collection; +import java.util.List; +import java.util.Optional; import java.util.Stack; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq; +import static scala.collection.JavaConverters.asJavaCollection; import static scala.collection.JavaConverters.asScalaBuffer; /** @@ -26,6 +33,10 @@ public class CatalystPlanContext { * Catalyst evolving 
logical plan **/ private Stack planBranches = new Stack<>(); + /** + * The current traversal context the visitor is going threw + */ + private Stack planTraversalContext = new Stack<>(); /** * NamedExpression contextual parameters @@ -49,16 +60,29 @@ public LogicalPlan getPlan() { return new Union(asScalaBuffer(this.planBranches), true, true); } + /** + * get the current traversals visitor context + * + * @return + */ + public Stack traversalContext() { + return planTraversalContext; + } + public Stack getNamedParseExpressions() { return namedParseExpressions; } + public Optional popNamedParseExpressions() { + return namedParseExpressions.isEmpty() ? Optional.empty() : Optional.of(namedParseExpressions.pop()); + } + public Stack getGroupingParseExpressions() { return groupingParseExpressions; } /** - * append context with evolving plan + * append plan with evolving plans branches * * @param plan * @return @@ -66,14 +90,50 @@ public Stack getGroupingParseExpressions() { public LogicalPlan with(LogicalPlan plan) { return this.planBranches.push(plan); } - - public LogicalPlan plan(Function transformFunction) { - this.planBranches.replaceAll(transformFunction::apply); + /** + * append plans collection with evolving plans branches + * + * @param plans + * @return + */ + public LogicalPlan withAll(Collection plans) { + this.planBranches.addAll(plans); return getPlan(); } - - /** + + /** + * reduce all plans with the given reduce function + * @param transformFunction + * @return + */ + public LogicalPlan reduce(BiFunction transformFunction) { + return with(asJavaCollection(retainAllPlans(p -> p)).stream().reduce((left, right) -> { + planTraversalContext.push(left); + planTraversalContext.push(right); + LogicalPlan result = transformFunction.apply(left, right); + planTraversalContext.pop(); + planTraversalContext.pop(); + return result; + }).orElse(getPlan())); + } + + /** + * apply for each plan with the given function + * @param transformFunction + * @return + */ + public LogicalPlan apply(Function transformFunction) { + return withAll(asJavaCollection(retainAllPlans(p -> p)).stream().map(p -> { + planTraversalContext.push(p); + LogicalPlan result = transformFunction.apply(p); + planTraversalContext.pop(); + return result; + }).collect(Collectors.toList())); + } + + /** * retain all logical plans branches + * * @return */ public Seq retainAllPlans(Function transformFunction) { @@ -81,9 +141,10 @@ public Seq retainAllPlans(Function transformFunction) { getPlanBranches().retainAll(emptyList()); return plans; } - /** - * + + /** * retain all expressions and clear expression stack + * * @return */ public Seq retainAllNamedParseExpressions(Function transformFunction) { @@ -95,6 +156,7 @@ public Seq retainAllNamedParseExpressions(Function transfo /** * retain all aggregate expressions and clear expression stack + * * @return */ public Seq retainAllGroupingNamedParseExpressions(Function transformFunction) { @@ -103,4 +165,31 @@ public Seq retainAllGroupingNamedParseExpressions(Function getGroupingParseExpressions().retainAll(emptyList()); return aggregateExpressions; } + + public static List findRelation(Stack plan) { + return plan.stream() + .map(CatalystPlanContext::findRelation) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + } + + public static Optional findRelation(LogicalPlan plan) { + // Check if the current node is an UnresolvedRelation + if (plan instanceof UnresolvedRelation) { + return Optional.of((UnresolvedRelation) plan); + } + + // Traverse the 
children of the current node + Iterator children = plan.children().iterator(); + while (children.hasNext()) { + Optional result = findRelation(children.next()); + if (result.isPresent()) { + return result; + } + } + + // Return null if no UnresolvedRelation is found + return Optional.empty(); + } } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 8b0998720..320e6617c 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -23,6 +23,7 @@ import org.opensearch.sql.ast.expression.AllFields; import org.opensearch.sql.ast.expression.And; import org.opensearch.sql.ast.expression.Argument; +import org.opensearch.sql.ast.expression.BinaryExpression; import org.opensearch.sql.ast.expression.Case; import org.opensearch.sql.ast.expression.Compare; import org.opensearch.sql.ast.expression.Field; @@ -60,13 +61,17 @@ import java.util.List; import java.util.Objects; +import java.util.Optional; +import java.util.function.BiFunction; import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.List.of; +import static org.opensearch.sql.ppl.CatalystPlanContext.findRelation; import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq; import static org.opensearch.sql.ppl.utils.DataTypeTransformer.translate; import static org.opensearch.sql.ppl.utils.JoinSpecTransformer.join; +import static org.opensearch.sql.ppl.utils.RelationUtils.resolveField; import static org.opensearch.sql.ppl.utils.WindowSpecTransformer.window; /** @@ -99,9 +104,9 @@ public LogicalPlan visitExplain(Explain node, CatalystPlanContext context) { @Override public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) { - node.getTableName().forEach(t -> - // Resolving the qualifiedName which is composed of a datasource.schema.table - context.with(new UnresolvedRelation(seq(of(t.split("\\."))), CaseInsensitiveStringMap.empty(), false)) + node.getTableName().forEach(t -> + // Resolving the qualifiedName which is composed of a datasource.schema.table + context.with(new UnresolvedRelation(seq(of(t.split("\\."))), CaseInsensitiveStringMap.empty(), false)) ); return context.getPlan(); } @@ -109,23 +114,28 @@ public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) { @Override public LogicalPlan visitFilter(Filter node, CatalystPlanContext context) { node.getChild().get(0).accept(this, context); - Expression conditionExpression = visitExpression(node.getCondition(), context); - Expression innerConditionExpression = context.getNamedParseExpressions().pop(); - return context.plan(p -> new org.apache.spark.sql.catalyst.plans.logical.Filter(innerConditionExpression, p)); + return context.apply(p -> { + Expression conditionExpression = visitExpression(node.getCondition(), context); + Optional innerConditionExpression = context.popNamedParseExpressions(); + return innerConditionExpression.map(expression -> new org.apache.spark.sql.catalyst.plans.logical.Filter(innerConditionExpression.get(), p)).orElse(null); + }); } @Override public LogicalPlan visitCorrelation(Correlation node, CatalystPlanContext context) { node.getChild().get(0).accept(this, context); - visitFieldList(node.getFieldsList().stream().map(Field::new).collect(Collectors.toList()), context); - Seq fields = 
context.retainAllNamedParseExpressions(e -> e); - expressionAnalyzer.visitSpan(node.getScope(), context); - Expression scope = context.getNamedParseExpressions().pop(); - expressionAnalyzer.visitCorrelationMapping(node.getMappingListContext(), context); - Seq mapping = context.retainAllNamedParseExpressions(e -> e); - return join(node.getCorrelationType(), fields, scope, mapping, context); + context.reduce((left,right) -> { + visitFieldList(node.getFieldsList().stream().map(Field::new).collect(Collectors.toList()), context); + Seq fields = context.retainAllNamedParseExpressions(e -> e); + expressionAnalyzer.visitSpan(node.getScope(), context); + Expression scope = context.popNamedParseExpressions().get(); + expressionAnalyzer.visitCorrelationMapping(node.getMappingListContext(), context); + Seq mapping = context.retainAllNamedParseExpressions(e -> e); + return join(node.getCorrelationType(), fields, scope, mapping, left, right); + }); + return context.getPlan(); } - + @Override public LogicalPlan visitAggregation(Aggregation node, CatalystPlanContext context) { node.getChild().get(0).accept(this, context); @@ -150,7 +160,7 @@ public LogicalPlan visitAggregation(Aggregation node, CatalystPlanContext contex private static LogicalPlan extractedAggregation(CatalystPlanContext context) { Seq groupingExpression = context.retainAllGroupingNamedParseExpressions(p -> p); Seq aggregateExpressions = context.retainAllNamedParseExpressions(p -> (NamedExpression) p); - return context.plan(p -> new Aggregate(groupingExpression, aggregateExpressions, p)); + return context.apply(p -> new Aggregate(groupingExpression, aggregateExpressions, p)); } @Override @@ -169,7 +179,7 @@ public LogicalPlan visitProject(Project node, CatalystPlanContext context) { if (!projectList.isEmpty()) { Seq projectExpressions = context.retainAllNamedParseExpressions(p -> (NamedExpression) p); // build the plan with the projection step - child = context.plan(p -> new org.apache.spark.sql.catalyst.plans.logical.Project(projectExpressions, p)); + child = context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Project(projectExpressions, p)); } if (node.hasArgument()) { Argument argument = node.getArgExprList().get(0); @@ -183,13 +193,13 @@ public LogicalPlan visitSort(Sort node, CatalystPlanContext context) { node.getChild().get(0).accept(this, context); visitFieldList(node.getSortList(), context); Seq sortElements = context.retainAllNamedParseExpressions(exp -> SortUtils.getSortDirection(node, (NamedExpression) exp)); - return context.plan(p -> (LogicalPlan) new org.apache.spark.sql.catalyst.plans.logical.Sort(sortElements, true, p)); + return context.apply(p -> (LogicalPlan) new org.apache.spark.sql.catalyst.plans.logical.Sort(sortElements, true, p)); } @Override public LogicalPlan visitHead(Head node, CatalystPlanContext context) { node.getChild().get(0).accept(this, context); - return context.plan(p -> (LogicalPlan) Limit.apply(new org.apache.spark.sql.catalyst.expressions.Literal( + return context.apply(p -> (LogicalPlan) Limit.apply(new org.apache.spark.sql.catalyst.expressions.Literal( node.getSize(), DataTypes.IntegerType), p)); } @@ -258,53 +268,67 @@ public Expression visitLiteral(Literal node, CatalystPlanContext context) { translate(node.getValue(), node.getType()), translate(node.getType()))); } - @Override - public Expression visitAnd(And node, CatalystPlanContext context) { + /** + * generic binary (And, Or, Xor , ...) 
arithmetic expression resolver + * @param node + * @param transformer + * @param context + * @return + */ + public Expression visitBinaryArithmetic(BinaryExpression node, BiFunction transformer, CatalystPlanContext context) { node.getLeft().accept(this, context); - Expression left = (Expression) context.getNamedParseExpressions().pop(); + Optional left = context.popNamedParseExpressions(); node.getRight().accept(this, context); - Expression right = (Expression) context.getNamedParseExpressions().pop(); - return context.getNamedParseExpressions().push(new org.apache.spark.sql.catalyst.expressions.And(left, right)); + Optional right = context.popNamedParseExpressions(); + if(left.isPresent() && right.isPresent()) { + return transformer.apply(left.get(),right.get()); + } else if(left.isPresent()) { + return context.getNamedParseExpressions().push(left.get()); + } else if(right.isPresent()) { + return context.getNamedParseExpressions().push(right.get()); + } + return null; + + } + + @Override + public Expression visitAnd(And node, CatalystPlanContext context) { + return visitBinaryArithmetic(node, + (left,right)-> context.getNamedParseExpressions().push(new org.apache.spark.sql.catalyst.expressions.And(left, right)), context); } @Override public Expression visitOr(Or node, CatalystPlanContext context) { - node.getLeft().accept(this, context); - Expression left = (Expression) context.getNamedParseExpressions().pop(); - node.getRight().accept(this, context); - Expression right = (Expression) context.getNamedParseExpressions().pop(); - return context.getNamedParseExpressions().push(new org.apache.spark.sql.catalyst.expressions.Or(left, right)); + return visitBinaryArithmetic(node, + (left,right)-> context.getNamedParseExpressions().push(new org.apache.spark.sql.catalyst.expressions.Or(left, right)), context); } @Override public Expression visitXor(Xor node, CatalystPlanContext context) { - node.getLeft().accept(this, context); - Expression left = (Expression) context.getNamedParseExpressions().pop(); - node.getRight().accept(this, context); - Expression right = (Expression) context.getNamedParseExpressions().pop(); - return context.getNamedParseExpressions().push(new org.apache.spark.sql.catalyst.expressions.BitwiseXor(left, right)); + return visitBinaryArithmetic(node, + (left,right)-> context.getNamedParseExpressions().push(new org.apache.spark.sql.catalyst.expressions.BitwiseXor(left, right)), context); } @Override public Expression visitNot(Not node, CatalystPlanContext context) { node.getExpression().accept(this, context); - Expression arg = (Expression) context.getNamedParseExpressions().pop(); - return context.getNamedParseExpressions().push(new org.apache.spark.sql.catalyst.expressions.Not(arg)); + Optional arg = context.popNamedParseExpressions(); + return arg.map(expression -> context.getNamedParseExpressions().push(new org.apache.spark.sql.catalyst.expressions.Not(expression))).orElse(null); } @Override public Expression visitSpan(Span node, CatalystPlanContext context) { node.getField().accept(this, context); - Expression field = (Expression) context.getNamedParseExpressions().pop(); + Expression field = (Expression) context.popNamedParseExpressions().get(); node.getValue().accept(this, context); - Expression value = (Expression) context.getNamedParseExpressions().pop(); + Expression value = (Expression) context.popNamedParseExpressions().get(); return context.getNamedParseExpressions().push(window(field, value, node.getUnit())); } @Override public Expression 
visitAggregateFunction(AggregateFunction node, CatalystPlanContext context) { node.getField().accept(this, context); - Expression arg = (Expression) context.getNamedParseExpressions().pop(); + Expression arg = (Expression) context.popNamedParseExpressions().get(); Expression aggregator = AggregatorTranslator.aggregator(node, arg); return context.getNamedParseExpressions().push(aggregator); } @@ -312,32 +336,31 @@ public Expression visitAggregateFunction(AggregateFunction node, CatalystPlanCon @Override public Expression visitCompare(Compare node, CatalystPlanContext context) { analyze(node.getLeft(), context); - Expression left = (Expression) context.getNamedParseExpressions().pop(); + Optional left = context.popNamedParseExpressions(); analyze(node.getRight(), context); - Expression right = (Expression) context.getNamedParseExpressions().pop(); - Predicate comparator = ComparatorTransformer.comparator(node, left, right); - return context.getNamedParseExpressions().push((org.apache.spark.sql.catalyst.expressions.Expression) comparator); + Optional right = context.popNamedParseExpressions(); + if (left.isPresent() && right.isPresent()) { + Predicate comparator = ComparatorTransformer.comparator(node, left.get(), right.get()); + return context.getNamedParseExpressions().push((org.apache.spark.sql.catalyst.expressions.Expression) comparator); + } + return null; } @Override public Expression visitQualifiedName(QualifiedName node, CatalystPlanContext context) { + List relation = findRelation(context.traversalContext()); + if (!relation.isEmpty()) { + Optional resolveField = resolveField(relation, node); + return resolveField.map(qualifiedName -> context.getNamedParseExpressions().push(UnresolvedAttribute$.MODULE$.apply(seq(qualifiedName.getParts())))) + .orElse(null); + } return context.getNamedParseExpressions().push(UnresolvedAttribute$.MODULE$.apply(seq(node.getParts()))); } - - @Override - public Expression visitField(Field node, CatalystPlanContext context) { - return context.getNamedParseExpressions().push(UnresolvedAttribute$.MODULE$.apply(seq(node.getField().toString()))); - } - - @Override - public Expression visitCorrelation(Correlation node, CatalystPlanContext context) { - return super.visitCorrelation(node, context); - } - + @Override public Expression visitCorrelationMapping(FieldsMapping node, CatalystPlanContext context) { - return node.getChild().stream().map(expression -> - visitCompare((Compare) expression, context) + return node.getChild().stream().map(expression -> + visitCompare((Compare) expression, context) ).reduce(org.apache.spark.sql.catalyst.expressions.And::new).orElse(null); } @@ -354,7 +377,7 @@ public Expression visitAllFields(AllFields node, CatalystPlanContext context) { @Override public Expression visitAlias(Alias node, CatalystPlanContext context) { node.getDelegated().accept(this, context); - Expression arg = context.getNamedParseExpressions().pop(); + Expression arg = context.popNamedParseExpressions().get(); return context.getNamedParseExpressions().push( org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply(arg, node.getAlias() != null ? 
node.getAlias() : node.getName(), diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/ComparatorTransformer.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/ComparatorTransformer.java index 2a176ec3d..a0e6d974b 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/ComparatorTransformer.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/ComparatorTransformer.java @@ -54,5 +54,4 @@ static Predicate comparator(Compare expression, Expression left, Expression righ } throw new IllegalStateException("Not Supported value: " + expression.getOperator()); } - } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/JoinSpecTransformer.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/JoinSpecTransformer.java index 96163e20d..71a2ec9ec 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/JoinSpecTransformer.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/JoinSpecTransformer.java @@ -29,24 +29,23 @@ public interface JoinSpecTransformer { * @param fields - fields (columns) that needed to be joined by * @param scope - this is a time base expression that timeframes the join to a specific period : (Time-field-name, value, unit) * @param mapping - in case fields in different relations have different name, that can be aliased with the following names - * @param context - parent context including the plan to evolve to join with * @return */ - static LogicalPlan join(Correlation.CorrelationType correlationType, Seq fields, Expression scope, Seq mapping, CatalystPlanContext context) { + static LogicalPlan join(Correlation.CorrelationType correlationType, Seq fields, Expression scope, Seq mapping, LogicalPlan left, LogicalPlan right) { //create a join statement - which will replace all the different plans with a single plan which contains the joined plans switch (correlationType) { case self: //expecting exactly one source relation - if (context.getPlanBranches().size() != 1) + if (left != null && right != null) throw new IllegalStateException("Correlation command with `inner` type must have exactly on source table "); break; case exact: //expecting at least two source relations - if (context.getPlanBranches().size() < 2) + if (left == null || right == null) throw new IllegalStateException("Correlation command with `exact` type must at least two source tables "); break; case approximate: - if (context.getPlanBranches().size() < 2) + if (left == null || right == null) throw new IllegalStateException("Correlation command with `approximate` type must at least two source tables "); //expecting at least two source relations break; @@ -54,11 +53,8 @@ static LogicalPlan join(Correlation.CorrelationType correlationType, Seq logicalPlans = seqAsJavaListConverter(context.retainAllPlans(p -> p)).asJava(); // Define join step instead on the multiple query branches - return context.with(logicalPlans.stream().reduce((left, right) - -> new Join(left, right, getType(correlationType), Option.apply(joinCondition), JoinHint.NONE())).get()); + return new Join(left, right, getType(correlationType), Option.apply(joinCondition), JoinHint.NONE()); } static Expression buildJoinCondition(List fields, List mapping, Correlation.CorrelationType correlationType) { diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java new 
file mode 100644 index 000000000..b402aaae5 --- /dev/null +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java @@ -0,0 +1,33 @@ +package org.opensearch.sql.ppl.utils; + +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; +import org.opensearch.sql.ast.expression.QualifiedName; + +import java.util.List; +import java.util.Optional; + +public interface RelationUtils { + /** + * attempt resolving if the field is relating to the given relation + * if name doesnt contain table prefix - add the current relation prefix to the fields name - returns true + * if name does contain table prefix - verify field's table name corresponds to the current contextual relation + * + * @param relations + * @param node + * @return + */ + static Optional resolveField(List relations, QualifiedName node) { + return relations.stream() + .map(rel -> { + //if name doesnt contain table prefix - add the current relation prefix to the fields name - returns true + if (node.getPrefix().isEmpty()) +// return Optional.of(QualifiedName.of(relation.tableName(), node.getParts().toArray(new String[]{}))); + return Optional.of(node); + if (node.getPrefix().get().toString().equals(rel.tableName())) + return Optional.of(node); + return Optional.empty(); + }).filter(Optional::isPresent) + .map(field -> (QualifiedName) field.get()) + .findFirst(); + } +} diff --git a/spark-sql-integration/README.md b/spark-sql-integration/README.md new file mode 100644 index 000000000..07bf46406 --- /dev/null +++ b/spark-sql-integration/README.md @@ -0,0 +1,109 @@ +# Spark SQL Application + +This application execute sql query and store the result in OpenSearch index in following format +``` +"stepId":"", +"applicationId":"" +"schema": "json blob", +"result": "json blob" +``` + +## Prerequisites + ++ Spark 3.3.1 ++ Scala 2.12.15 ++ flint-spark-integration + +## Usage + +To use this application, you can run Spark with Flint extension: + +``` +./bin/spark-submit \ + --class org.opensearch.sql.SQLJob \ + --jars \ + sql-job.jar \ + \ + \ + \ + \ + \ + \ + \ +``` + +## Result Specifications + +Following example shows how the result is written to OpenSearch index after query execution. + +Let's assume sql query result is +``` ++------+------+ +|Letter|Number| ++------+------+ +|A |1 | +|B |2 | +|C |3 | ++------+------+ +``` +OpenSearch index document will look like +```json +{ + "_index" : ".query_execution_result", + "_id" : "A2WOsYgBMUoqCqlDJHrn", + "_score" : 1.0, + "_source" : { + "result" : [ + "{'Letter':'A','Number':1}", + "{'Letter':'B','Number':2}", + "{'Letter':'C','Number':3}" + ], + "schema" : [ + "{'column_name':'Letter','data_type':'string'}", + "{'column_name':'Number','data_type':'integer'}" + ], + "stepId" : "s-JZSB1139WIVU", + "applicationId" : "application_1687726870985_0003" + } +} +``` + +## Build + +To build and run this application with Spark, you can run: + +``` +sbt clean sparkSqlApplicationCosmetic/publishM2 +``` + +## Test + +To run tests, you can use: + +``` +sbt test +``` + +## Scalastyle + +To check code with scalastyle, you can run: + +``` +sbt scalastyle +``` + +## Code of Conduct + +This project has adopted an [Open Source Code of Conduct](../CODE_OF_CONDUCT.md). + +## Security + +If you discover a potential security issue in this project we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). Please do **not** create a public GitHub issue. 
+ +## License + +See the [LICENSE](../LICENSE.txt) file for our project's licensing. We will ask you to confirm the licensing of your contribution. + +## Copyright + +Copyright OpenSearch Contributors. See [NOTICE](../NOTICE) for details. \ No newline at end of file