Skip to content

Commit

Permalink
geoip function implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Kenrick Yap <[email protected]>
  • Loading branch information
14yapkc1 committed Dec 11, 2024
1 parent 896fda2 commit f8d243d
Show file tree
Hide file tree
Showing 15 changed files with 1,050 additions and 31 deletions.
65 changes: 64 additions & 1 deletion docs/ppl-lang/functions/ppl-ip.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,67 @@ Note:
- `ip` can be an IPv4 or an IPv6 address
- `cidr` can be an IPv4 or an IPv6 block
- `ip` and `cidr` must be either both IPv4 or both IPv6
- `ip` and `cidr` must both be valid and non-empty/non-null
- `ip` and `cidr` must both be valid and non-empty/non-null

### `GEOIP`

**Description**

`GEOIP(ip[, property]...)` retrieves geospatial data corresponding to the provided `ip`.

**Argument type:**
- `ip` is string be **STRING**.
- `property` is **STRING** and must be one of the following:
- `COUNTRY_ISO_CODE`
- `COUNTRY_NAME`
- `CONTINENT_NAME`
- `REGION_ISO_CODE`
- `REGION_NAME`
- `CITY_NAME`
- `TIME_ZONE`
- `LOCATION`
- Return type:
- **STRING** if one property given
- **STRUCT_TYPE** if more than one or no property is given

Example:

_Without properties:_

os> source=ips | eval a = geoip(ip) | fields ip, a
fetched rows / total rows = 2/2
+---------------------+-------------------------------------------------------------------------------------------------------+
|ip |lol |
+---------------------+-------------------------------------------------------------------------------------------------------+
|66.249.157.90 |{JM, Jamaica, North America, 14, Saint Catherine Parish, Portmore, America/Jamaica, 17.9686,-76.8827} |
|2a09:bac2:19f8:2ac3::|{CA, Canada, North America, PE, Prince Edward Island, Charlottetown, America/Halifax, 46.2396,-63.1355}|
+---------------------+-------+------+-------------------------------------------------------------------------------------------------------+

_With one property:_

os> source=users | eval a = geoip(ip, COUNTRY_NAME) | fields ip, a
fetched rows / total rows = 2/2
+---------------------+-------+
|ip |a |
+---------------------+-------+
|66.249.157.90 |Jamaica|
|2a09:bac2:19f8:2ac3::|Canada |
+---------------------+-------+

_With multiple properties:_

os> source=users | eval a = geoip(ip, COUNTRY_NAME, REGION_NAME, CITY_NAME) | fields ip, a
fetched rows / total rows = 2/2
+---------------------+---------------------------------------------+
|ip |a |
+---------------------+---------------------------------------------+
|66.249.157.90 |{Jamaica, Saint Catherine Parish, Portmore} |
|2a09:bac2:19f8:2ac3::|{Canada, Prince Edward Island, Charlottetown}|
+---------------------+---------------------------------------------+

Note:
- To use `geoip` user must create spark table containing geo ip location data. Instructions to create table can be found [here](../../opensearch-geoip.md).
- `geoip` command by default expects the created table to be called `geoip_ip_data`.
- if a different table name is desired, can set `spark.geoip.tablename` spark config to new table name.
- `ip` can be an IPv4 or an IPv6 address.
- `geoip` commands will always calculated first if used with other eval functions.
68 changes: 68 additions & 0 deletions docs/ppl-lang/planning/ppl-geoip-command.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
## geoip syntax proposal

geoip function to add information about the geographical location of an IPv4 or IPv6 address

**Implementation syntax**
- `... | eval geoinfo = geoip([datasource,] ipAddress *[,properties])`
- generic syntax
- `... | eval geoinfo = geoip(ipAddress)`
- use the default geoip datasource
- `... | eval geoinfo = geoip("abc", ipAddress)`
- use the "abc" geoip datasource
- `... | eval geoinfo = geoip(ipAddress, city, location)`
- use the default geoip datasource, retrieve only city, and location
- `... | eval geoinfo = geoip("abc", ipAddress, city, location")`
- use the "abc" geoip datasource, retrieve only city, and location

**Implementation details**
- Current implementation requires user to have created a geoip table. Geoip table has the following schema:

```SQL
CREATE TABLE geoip (
cidr STRING,
country_iso_code STRING,
country_name STRING,
continent_name STRING,
region_iso_code STRING,
region_name STRING,
city_name STRING,
time_zone STRING,
location STRING,
ip_range_start BIGINT,
ip_range_end BIGINT,
ipv4 BOOLEAN
)
```

- `geoip` is resolved by performing a join on said table and projecting the resulting geoip data as a struct.
- an example of using `geoip` is equivalent to running the following SQL query:

```SQL
SELECT source.*, struct(geoip.country_name, geoip.city_name) AS a
FROM source, geoip
WHERE geoip.ip_range_start <= ip_to_int(source.ip)
AND geoip.ip_range_end > ip_to_int(source.ip)
AND geoip.ip_type = is_ipv4(source.ip);
```

**Future plan for additional data-sources**

- Currently only using pre-existing geoip table defined within spark is possible.
- There is future plans to allow users to specify data sources:
- API data sources - if users have their own geoip provided will create ability for users to configure and call said endpoints
- OpenSearch geospatial client - once geospatial client is published we can leverage client to utilize opensearch geo2ip functionality.

### New syntax definition in ANTLR

```ANTLR
// functions
evalFunctionCall
: evalFunctionName LT_PRTHS functionArgs RT_PRTHS
| geoipFunction
;
geoipFunction
: GEOIP LT_PRTHS (datasource = functionArg COMMA)? ipAddress = functionArg (COMMA properties = stringLiteral)? RT_PRTHS
;
```
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,82 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
| """.stripMargin)
}

protected def createGeoIpTestTable(testTable: String): Unit = {
sql(
s"""
| CREATE TABLE $testTable
| (
| ip STRING,
| isValid BOOLEAN
| )
| USING $tableType $tableOptions
|""".stripMargin)

sql(
s"""
| INSERT INTO $testTable
| VALUES ('66.249.157.90', true),
| ('2a09:bac2:19f8:2ac3::', true),
| ('192.168.2.', false),
| ('2001:db8::ff00:12:', false)
| """.stripMargin)
}

protected def createGeoIpTable(): Unit = {
sql(
s"""
| CREATE TABLE geoip
| (
| cidr STRING,
| country_iso_code STRING,
| country_name STRING,
| continent_name STRING,
| region_iso_code STRING,
| region_name STRING,
| city_name STRING,
| time_zone STRING,
| location STRING,
| ip_range_start BIGINT,
| ip_range_end BIGINT,
| ipv4 BOOLEAN
| )
| USING $tableType $tableOptions
|""".stripMargin)

sql(
s"""
| INSERT INTO geoip
| VALUES (
| '66.249.157.0/24',
| 'JM',
| 'Jamaica',
| 'North America',
| '14',
| 'Saint Catherine Parish',
| 'Portmore',
| 'America/Jamaica',
| '17.9686,-76.8827',
| 1123654912,
| 1123655167,
| true
| ),
| VALUES (
| '2a09:bac2:19f8::/45',
| `'CA',
| 'Canada',
| 'North America',
| 'PE',
| 'Prince Edward Island',
| 'Charlottetown',
| 'America/Halifax',
| '46.2396,-63.1355',
| 55878094401180025937395073088449675264,
| 55878094401189697343951990121847324671,
| false
| )
| """.stripMargin)
}

protected def createNestedJsonContentTable(tempFile: Path, testTable: String): Unit = {
val json =
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLGeoipITSuite
extends QueryTest
with LogicalPlanTestUtils
with FlintPPLSuite
with StreamTest {

/** Test table and index name */
private val testTable = "spark_catalog.default.flint_ppl_test"
override def beforeAll(): Unit = {
super.beforeAll()

// Create test table
createGeoIpTestTable(testTable)
createGeoIpTable()
}

protected override def afterEach(): Unit = {
super.afterEach()
// Stop all streaming jobs if any
spark.streams.active.foreach { job =>
job.stop()
job.awaitTermination()
}
}

test("test geoip with no parameters") {
val frame = sql(
s"""
| source = $testTable| where isValid = true | eval a = geoip(ip) | fields ip, a
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row("66.249.157.90", Row("JM", "Jamaica", "North America", 14, "Saint Catherine Parish", "Portmore", "America/Jamaica", "17.9686, -76.8827")),
Row("2a09:bac2:19f8:2ac3::", Row("CA", "Canada", "North America", "PE", "Prince Edward Island", "Charlottetown", "America/Halifax", "46.2396, -63.1355"))
)

// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))
}

test("test geoip with one parameters") {
val frame = sql(
s"""
| source = $testTable| where isValid = true | eval a = geoip(ip, country_name) | fields ip, a
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row("66.249.157.90", "Jamaica"),
Row("2a09:bac2:19f8:2ac3::", "Canada")
)

// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))
}

test("test geoip with multiple parameters") {
val frame = sql(
s"""
| source = $testTable| where isValid = true | eval a = geoip(ip, country_name, city_name) | fields ip, a
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row("66.249.157.90", Row("Jamaica", "Portmore")),
Row("2a09:bac2:19f8:2ac3::", Row("Canada", "Charlottetown"))
)

// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))
}
}
15 changes: 12 additions & 3 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,6 @@ ISPRESENT: 'ISPRESENT';
BETWEEN: 'BETWEEN';
CIDRMATCH: 'CIDRMATCH';

// Geo Loction
GEOIP: 'GEOIP';

// FLOWCONTROL FUNCTIONS
IFNULL: 'IFNULL';
NULLIF: 'NULLIF';
Expand All @@ -428,6 +425,18 @@ TYPEOF: 'TYPEOF';
//OTHER CONDITIONAL EXPRESSIONS
COALESCE: 'COALESCE';

//GEOLOCATION FUNCTIONS
GEOIP: 'GEOIP';

//GEOLOCATION PROPERTIES
COUNTRY_ISO_CODE: 'COUNTRY_ISO_CODE';
COUNTRY_NAME: 'COUNTRY_NAME';
CONTINENT_NAME: 'CONTINENT_NAME';
REGION_ISO_CODE: 'REGION_ISO_CODE';
REGION_NAME: 'REGION_NAME';
CITY_NAME: 'CITY_NAME';
LOCATION: 'LOCATION';

// RELEVANCE FUNCTIONS AND PARAMETERS
MATCH: 'MATCH';
MATCH_PHRASE: 'MATCH_PHRASE';
Expand Down
Loading

0 comments on commit f8d243d

Please sign in to comment.