forked from opensearch-project/opensearch-spark
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Kenrick Yap <[email protected]>
- Loading branch information
Showing
15 changed files
with
1,050 additions
and
31 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
## geoip syntax proposal | ||
|
||
geoip function to add information about the geographical location of an IPv4 or IPv6 address | ||
|
||
**Implementation syntax** | ||
- `... | eval geoinfo = geoip([datasource,] ipAddress *[,properties])` | ||
- generic syntax | ||
- `... | eval geoinfo = geoip(ipAddress)` | ||
- use the default geoip datasource | ||
- `... | eval geoinfo = geoip("abc", ipAddress)` | ||
- use the "abc" geoip datasource | ||
- `... | eval geoinfo = geoip(ipAddress, city, location)` | ||
- use the default geoip datasource, retrieve only city and location | ||
- `... | eval geoinfo = geoip("abc", ipAddress, city, location)` | ||
- use the "abc" geoip datasource, retrieve only city and location | ||
|
||
**Implementation details** | ||
- The current implementation requires the user to have created a geoip table. The geoip table has the following schema: | ||
|
||
```SQL | ||
CREATE TABLE geoip ( | ||
cidr STRING, | ||
country_iso_code STRING, | ||
country_name STRING, | ||
continent_name STRING, | ||
region_iso_code STRING, | ||
region_name STRING, | ||
city_name STRING, | ||
time_zone STRING, | ||
location STRING, | ||
ip_range_start BIGINT, | ||
ip_range_end BIGINT, | ||
ipv4 BOOLEAN | ||
) | ||
``` | ||
|
||
- `geoip` is resolved by performing a join on said table and projecting the resulting geoip data as a struct. | ||
- an example of using `geoip` is equivalent to running the following SQL query: | ||
|
||
```SQL | ||
SELECT source.*, struct(geoip.country_name, geoip.city_name) AS a | ||
FROM source, geoip | ||
WHERE geoip.ip_range_start <= ip_to_int(source.ip) | ||
AND geoip.ip_range_end > ip_to_int(source.ip) | ||
AND geoip.ipv4 = is_ipv4(source.ip); | ||
``` | ||
|
||
**Future plan for additional data-sources** | ||
|
||
- Currently, only a pre-existing geoip table defined within Spark can be used. | ||
- There are future plans to allow users to specify other data sources: | ||
- API data sources - if users have their own geoip provider, we will add the ability for them to configure and call said endpoints | ||
- OpenSearch geospatial client - once the geospatial client is published, we can leverage it to use OpenSearch's geo2ip functionality. | ||
|
||
### New syntax definition in ANTLR | ||
|
||
```ANTLR | ||
// functions | ||
evalFunctionCall | ||
: evalFunctionName LT_PRTHS functionArgs RT_PRTHS | ||
| geoipFunction | ||
; | ||
geoipFunction | ||
: GEOIP LT_PRTHS (datasource = functionArg COMMA)? ipAddress = functionArg (COMMA properties = stringLiteral)? RT_PRTHS | ||
; | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
92 changes: 92 additions & 0 deletions
92
...test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLGeoipITSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.spark.ppl | ||
|
||
import org.apache.spark.sql.{QueryTest, Row} | ||
import org.apache.spark.sql.streaming.StreamTest | ||
|
||
/**
 * Integration tests for the PPL `geoip` eval function.
 *
 * Each test runs a PPL query of the form `eval a = geoip(ip[, properties...])` against a
 * test table and compares the collected rows, ignoring order, with the expected geo data.
 *
 * The source table and the geoip lookup table are created once in `beforeAll` through
 * `createGeoIpTestTable` and `createGeoIpTable`, which are defined outside this file.
 */
class FlintSparkPPLGeoipITSuite
    extends QueryTest
    with LogicalPlanTestUtils
    with FlintPPLSuite
    with StreamTest {

  /** Test table and index name */
  private val testTable = "spark_catalog.default.flint_ppl_test"

  override def beforeAll(): Unit = {
    super.beforeAll()

    // Create the test source table and the geoip lookup table
    createGeoIpTestTable(testTable)
    createGeoIpTable()
  }

  protected override def afterEach(): Unit = {
    super.afterEach()
    // Stop all streaming jobs if any
    spark.streams.active.foreach { job =>
      job.stop()
      job.awaitTermination()
    }
  }

  /**
   * Asserts that `actual` and `expected` contain the same rows, ignoring row order.
   *
   * Both arrays are sorted by their first column, read as a String (the `ip` field in
   * these tests), and then compared element by element.
   *
   * @param actual
   *   rows collected from the query under test
   * @param expected
   *   rows the query is expected to produce
   */
  private def assertSameRowsIgnoringOrder(actual: Array[Row], expected: Array[Row]): Unit = {
    implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
    assert(actual.sorted.sameElements(expected.sorted))
  }

  test("test geoip with no parameters") {
    val frame = sql(s"""
         | source = $testTable| where isValid = true | eval a = geoip(ip) | fields ip, a
         | """.stripMargin)

    // Retrieve the results
    val results: Array[Row] = frame.collect()
    // Define the expected results.
    // region_iso_code is declared as STRING in the geoip table schema, so the Jamaican
    // region code is expected as the string "14" rather than the integer 14.
    val expectedResults: Array[Row] = Array(
      Row(
        "66.249.157.90",
        Row(
          "JM",
          "Jamaica",
          "North America",
          "14",
          "Saint Catherine Parish",
          "Portmore",
          "America/Jamaica",
          "17.9686, -76.8827")),
      Row(
        "2a09:bac2:19f8:2ac3::",
        Row(
          "CA",
          "Canada",
          "North America",
          "PE",
          "Prince Edward Island",
          "Charlottetown",
          "America/Halifax",
          "46.2396, -63.1355")))

    // Compare the results
    assertSameRowsIgnoringOrder(results, expectedResults)
  }

  test("test geoip with one parameters") {
    val frame = sql(s"""
         | source = $testTable| where isValid = true | eval a = geoip(ip, country_name) | fields ip, a
         | """.stripMargin)

    // Retrieve the results
    val results: Array[Row] = frame.collect()
    // Define the expected results
    val expectedResults: Array[Row] =
      Array(Row("66.249.157.90", "Jamaica"), Row("2a09:bac2:19f8:2ac3::", "Canada"))

    // Compare the results
    assertSameRowsIgnoringOrder(results, expectedResults)
  }

  test("test geoip with multiple parameters") {
    val frame = sql(s"""
         | source = $testTable| where isValid = true | eval a = geoip(ip, country_name, city_name) | fields ip, a
         | """.stripMargin)

    // Retrieve the results
    val results: Array[Row] = frame.collect()
    // Define the expected results
    val expectedResults: Array[Row] = Array(
      Row("66.249.157.90", Row("Jamaica", "Portmore")),
      Row("2a09:bac2:19f8:2ac3::", Row("Canada", "Charlottetown")))

    // Compare the results
    assertSameRowsIgnoringOrder(results, expectedResults)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.