From 916b7d126331a99a7109fae9dce1fd4d737e15e6 Mon Sep 17 00:00:00 2001 From: Kenrick Yap <14yapkc1@gmail.com> Date: Tue, 10 Dec 2024 14:18:49 -0800 Subject: [PATCH] added integration tests + addressing PR comments Signed-off-by: Kenrick Yap <14yapkc1@gmail.com> --- docs/ppl-lang/planning/ppl-geoip.md | 52 ++++++++--- .../flint/spark/FlintSparkSuite.scala | 76 +++++++++++++++ .../spark/ppl/FlintSparkPPLGeoipITSuite.scala | 92 +++++++++++++++++++ .../sql/ppl/CatalystQueryPlanVisitor.java | 2 +- 4 files changed, 210 insertions(+), 12 deletions(-) create mode 100644 integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLGeoipITSuite.scala diff --git a/docs/ppl-lang/planning/ppl-geoip.md b/docs/ppl-lang/planning/ppl-geoip.md index f6bef8f34..78aa8ce8f 100644 --- a/docs/ppl-lang/planning/ppl-geoip.md +++ b/docs/ppl-lang/planning/ppl-geoip.md @@ -2,25 +2,55 @@ geoip function to add information about the geographical location of an IPv4 or IPv6 address -1. **Proposed syntax** - - `... | eval geoinfo = geoip([datasource,] ipAddress [,properties])` +**Implementation syntax** + - `... | eval geoinfo = geoip([datasource,] ipAddress *[,properties])` - generic syntax - `... | eval geoinfo = geoip(ipAddress)` - use the default geoip datasource - `... | eval geoinfo = geoip("abc", ipAddress)` - use the "abc" geoip datasource - - `... | eval geoinfo = geoip(ipAddress, "city,lat,lon")` - - use the default geoip datasource, retrieve only city, lat and lon - - `... | eval geoinfo = geoip("abc", ipAddress, "city,lat,lon")` - - use the "abc" geoip datasource, retrieve only city, lat and lon + - `... | eval geoinfo = geoip(ipAddress, city, location)` - use the default geoip datasource, retrieve only city and location + - `... | eval geoinfo = geoip("abc", ipAddress, city, location)` - use the "abc" geoip datasource, retrieve only city and location +**Implementation details** - Current implementation requires user to have created a geoip table. 
Geoip table has the following schema: + + ```SQL + CREATE TABLE geoip ( + cidr STRING, + country_iso_code STRING, + country_name STRING, + continent_name STRING, + region_iso_code STRING, + region_name STRING, + city_name STRING, + time_zone STRING, + location STRING, + ip_range_start BIGINT, + ip_range_end BIGINT, + ipv4 BOOLEAN + ) + ``` -2. **Proposed wiring with the geoip database** - - Leverage the functionality of the ip2geo processor - - ip2geo processor configuration, functionality and code will be used - - Prerequisite for the geoip is that ip2geo processor is configured properly - - See https://opensearch.org/docs/latest/ingest-pipelines/processors/ip2geo/ + - `geoip` is resolved by performing a join on said table and projecting the resulting geoip data as a struct. + - an example of using `geoip` is equivalent to running the following SQL query: + + ```SQL + SELECT source.*, struct(geoip.country_name, geoip.city_name) AS a + FROM source, geoip + WHERE geoip.ip_range_start <= ip_to_int(source.ip) + AND geoip.ip_range_end > ip_to_int(source.ip) + AND geoip.ipv4 = is_ipv4(source.ip); + ``` + +**Future plan for additional data-sources** + - Currently only using a pre-existing geoip table defined within Spark is possible. + - There are future plans to allow users to specify data sources: + - API data sources - if users have their own geoip provider, we will create the ability for users to configure and call said endpoints + - OpenSearch geospatial client - once the geospatial client is published we can leverage the client to utilize OpenSearch geo2ip functionality. 
### New syntax definition in ANTLR diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 68d370791..2db1790b5 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -743,6 +743,82 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit | """.stripMargin) } + protected def createGeoIpTestTable(testTable: String): Unit = { + sql( + s""" + | CREATE TABLE $testTable + | ( + | ip STRING, + | isValid BOOLEAN + | ) + | USING $tableType $tableOptions + |""".stripMargin) + + sql( + s""" + | INSERT INTO $testTable + | VALUES ('66.249.157.90', true), + | ('2a09:bac2:19f8:2ac3::', true), + | ('192.168.2.', false), + | ('2001:db8::ff00:12:', false) + | """.stripMargin) + } + + protected def createGeoIpTable(): Unit = { + sql( + s""" + | CREATE TABLE geoip + | ( + | cidr STRING, + | country_iso_code STRING, + | country_name STRING, + | continent_name STRING, + | region_iso_code STRING, + | region_name STRING, + | city_name STRING, + | time_zone STRING, + | location STRING, + | ip_range_start BIGINT, + | ip_range_end BIGINT, + | ipv4 BOOLEAN + | ) + | USING $tableType $tableOptions + |""".stripMargin) + + sql( + s""" + | INSERT INTO geoip + | VALUES ( + | '66.249.157.0/24', + | 'JM', + | 'Jamaica', + | 'North America', + | '14', + | 'Saint Catherine Parish', + | 'Portmore', + | 'America/Jamaica', + | '17.9686,-76.8827', + | 1123654912, + | 1123655167, + | true + | ), + | ( + | '2a09:bac2:19f8::/45', + | 'CA', + | 'Canada', + | 'North America', + | 'PE', + | 'Prince Edward Island', + | 'Charlottetown', + | 'America/Halifax', + | '46.2396,-63.1355', + | 55878094401180025937395073088449675264, + | 55878094401189697343951990121847324671, + | false + | ) + | """.stripMargin) + } + protected 
def createNestedJsonContentTable(tempFile: Path, testTable: String): Unit = { val json = """ diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLGeoipITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLGeoipITSuite.scala new file mode 100644 index 000000000..6f9534bba --- /dev/null +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLGeoipITSuite.scala @@ -0,0 +1,92 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.ppl + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.streaming.StreamTest + +class FlintSparkPPLGeoipITSuite + extends QueryTest + with LogicalPlanTestUtils + with FlintPPLSuite + with StreamTest { + + /** Test table and index name */ + private val testTable = "spark_catalog.default.flint_ppl_test" + override def beforeAll(): Unit = { + super.beforeAll() + + // Create test table + createGeoIpTestTable(testTable) + createGeoIpTable() + } + + protected override def afterEach(): Unit = { + super.afterEach() + // Stop all streaming jobs if any + spark.streams.active.foreach { job => + job.stop() + job.awaitTermination() + } + } + + test("test geoip with no parameters") { + val frame = sql( + s""" + | source = $testTable| where isValid = true | eval a = geoip(ip) | fields ip, a + | """.stripMargin) + + // Retrieve the results + val results: Array[Row] = frame.collect() + // Define the expected results + val expectedResults: Array[Row] = Array( + Row("66.249.157.90", Row("JM", "Jamaica", "North America", "14", "Saint Catherine Parish", "Portmore", "America/Jamaica", "17.9686,-76.8827")), + Row("2a09:bac2:19f8:2ac3::", Row("CA", "Canada", "North America", "PE", "Prince Edward Island", "Charlottetown", "America/Halifax", "46.2396,-63.1355")) + ) + + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, 
String](_.getAs[String](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + } + + test("test geoip with one parameter") { + val frame = sql( + s""" + | source = $testTable| where isValid = true | eval a = geoip(ip, country_name) | fields ip, a + | """.stripMargin) + + // Retrieve the results + val results: Array[Row] = frame.collect() + // Define the expected results + val expectedResults: Array[Row] = Array( + Row("66.249.157.90", "Jamaica"), + Row("2a09:bac2:19f8:2ac3::", "Canada") + ) + + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + } + + test("test geoip with multiple parameters") { + val frame = sql( + s""" + | source = $testTable| where isValid = true | eval a = geoip(ip, country_name, city_name) | fields ip, a + | """.stripMargin) + + // Retrieve the results + val results: Array[Row] = frame.collect() + // Define the expected results + val expectedResults: Array[Row] = Array( + Row("66.249.157.90", Row("Jamaica", "Portmore")), + Row("2a09:bac2:19f8:2ac3::", Row("Canada", "Charlottetown")) + ) + + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + } +} diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 6d32db317..ec552d75a 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -566,7 +566,7 @@ public LogicalPlan visitEval(Eval node, CatalystPlanContext context) { // build the plan with the projection step return context.apply(p -> new 
org.apache.spark.sql.catalyst.plans.logical.Project(projectExpressions, p)); } else { - return context.apply(p -> p); + return context.getPlan(); } }