Skip to content

Commit

Permalink
addressing PR comments (added addtional integ tests, doc changes)
Browse files Browse the repository at this point in the history
Signed-off-by: Kenrick Yap <[email protected]>
  • Loading branch information
14yapkc1 committed Dec 19, 2024
1 parent 543740d commit 91ac8e5
Show file tree
Hide file tree
Showing 7 changed files with 336 additions and 80 deletions.
2 changes: 1 addition & 1 deletion docs/ppl-lang/functions/ppl-ip.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Note:
`GEOIP(ip[, property]...)` retrieves geospatial data corresponding to the provided `ip`.

**Argument type:**
- `ip` is string be **STRING**.
- `ip` is a **STRING** representing an IPv4 or IPv6 address.
- `property` is **STRING** and must be one of the following:
- `COUNTRY_ISO_CODE`
- `COUNTRY_NAME`
Expand Down
22 changes: 14 additions & 8 deletions docs/ppl-lang/planning/ppl-geoip-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,12 @@
geoip function to add information about the geographical location of an IPv4 or IPv6 address

**Implementation syntax**
- `... | eval geoinfo = geoip([datasource,] ipAddress *[,properties])`
- `... | eval geoinfo = geoip(ipAddress *[,properties])`
- generic syntax
- `... | eval geoinfo = geoip(ipAddress)`
- use the default geoip datasource
- `... | eval geoinfo = geoip("abc", ipAddress)`
- use the "abc" geoip datasource
- retrieves all geo data
- `... | eval geoinfo = geoip(ipAddress, city, location)`
- use the default geoip datasource, retrieve only city, and location
- `... | eval geoinfo = geoip("abc", ipAddress, city, location")`
- use the "abc" geoip datasource, retrieve only city, and location
- retrieve only city, and location

**Implementation details**
- Current implementation requires user to have created a geoip table. Geoip table has the following schema:
Expand Down Expand Up @@ -44,13 +40,23 @@ geoip function to add information about the geographical location of an IPv4 or
AND geoip.ip_range_end > ip_to_int(source.ip)
AND geoip.ip_type = is_ipv4(source.ip);
```
- When only one property is provided in the function call, `geoip` returns the value of the specified property as a string instead:

```SQL
SELECT source.*, geoip.country_name AS a
FROM source, geoip
WHERE geoip.ip_range_start <= ip_to_int(source.ip)
AND geoip.ip_range_end > ip_to_int(source.ip)
AND geoip.ip_type = is_ipv4(source.ip);
```

**Future plan for additional data-sources**

- Currently, only a pre-existing geoip table defined within Spark can be used.
- There are future plans to allow users to specify data sources:
- API data sources - if users have their own geoip provider, they will be able to configure and call its endpoints
- OpenSearch geospatial client - once geospatial client is published we can leverage client to utilize opensearch geo2ip functionality.
- OpenSearch geospatial client - once geospatial client is published we can leverage client to utilize OpenSearch geo2ip functionality.
- Additional datasource connection params will be provided through spark config options.

### New syntax definition in ANTLR

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,17 +776,18 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
| CREATE TABLE $testTable
| (
| ip STRING,
| ipv4 STRING,
| isValid BOOLEAN
| )
| USING $tableType $tableOptions
|""".stripMargin)

sql(s"""
| INSERT INTO $testTable
| VALUES ('66.249.157.90', true),
| ('2a09:bac2:19f8:2ac3::', true),
| ('192.168.2.', false),
| ('2001:db8::ff00:12:', false)
| VALUES ('66.249.157.90', '66.249.157.90', true),
| ('2a09:bac2:19f8:2ac3::', 'Given IPv6 is not mapped to IPv4', true),
| ('192.168.2.', '192.168.2.', false),
| ('2001:db8::ff00:12:', 'Given IPv6 is not mapped to IPv4', false)
| """.stripMargin)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@

package org.opensearch.flint.spark.ppl

import java.util

import org.opensearch.sql.expression.function.SerializableUdf.visit
import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq

import org.apache.spark.SparkException
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, CreateNamedStruct, EqualTo, Expression, GreaterThanOrEqual, LessThan, Literal}
import org.apache.spark.sql.catalyst.plans.LeftOuter
import org.apache.spark.sql.catalyst.plans.logical.{DataFrameDropColumns, Filter, Join, JoinHint, LogicalPlan, Project, SubqueryAlias}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLGeoipITSuite
Expand Down Expand Up @@ -33,9 +43,54 @@ class FlintSparkPPLGeoipITSuite
}
}

/**
 * Builds the expected logical plan for a single `geoip` evaluation: the join between the two
 * given plans (see `getJoinPlan`), wrapped by the projection/drop produced by `getProjection`.
 *
 * @param ipAddress
 *   attribute holding the IP address passed to `geoip`
 * @param left
 *   plan on the left side of the join (aliased as `t1` by `getJoinPlan`)
 * @param right
 *   plan on the right side of the join (aliased as `t2` by `getJoinPlan`)
 * @param projectionProperties
 *   aliased expression appended to the projection as the `geoip` result column
 * @return
 *   the assembled expected plan
 */
private def getGeoIpQueryPlan(
    ipAddress: UnresolvedAttribute,
    left: LogicalPlan,
    right: LogicalPlan,
    projectionProperties: Alias): LogicalPlan =
  getProjection(getJoinPlan(ipAddress, left, right), projectionProperties)

/**
 * Builds the expected left-outer join between `left` (aliased `t1`) and `right` (aliased `t2`).
 *
 * The join condition requires that `ip_to_int(ipAddress)` is greater than or equal to
 * `t2.ip_range_start`, strictly less than `t2.ip_range_end`, and that `is_ipv4(ipAddress)`
 * equals `t2.ipv4`.
 *
 * @param ipAddress
 *   attribute holding the IP address passed to both UDFs
 * @param left
 *   plan placed on the left side of the join
 * @param right
 *   plan placed on the right side of the join
 * @return
 *   the join plan with `JoinHint.NONE`
 */
private def getJoinPlan(
    ipAddress: UnresolvedAttribute,
    left: LogicalPlan,
    right: LogicalPlan): LogicalPlan = {
  // Each UDF call receives its own single-element argument list, in the same order as before.
  def udfOnIp(name: String) = visit(name, util.List.of[Expression](ipAddress))

  val ipIsV4 = udfOnIp("is_ipv4")
  val ipAsInt = udfOnIp("ip_to_int")

  val sourceSide = SubqueryAlias("t1", left)
  val geoSide = SubqueryAlias("t2", right)

  // ip_range_start <= ip_to_int(ip) < ip_range_end
  val withinRange = And(
    GreaterThanOrEqual(ipAsInt, UnresolvedAttribute("t2.ip_range_start")),
    LessThan(ipAsInt, UnresolvedAttribute("t2.ip_range_end")))
  // is_ipv4(ip) must agree with the ipv4 column of the joined row
  val sameIpKind = EqualTo(ipIsV4, UnresolvedAttribute("t2.ipv4"))

  Join(sourceSide, geoSide, LeftOuter, Some(And(withinRange, sameIpKind)), JoinHint.NONE)
}

/**
 * Wraps `joinPlan` in a projection of `*` plus `projectionProperties`, then drops every listed
 * `t2.*` column so only the original columns and the new result column remain.
 *
 * @param joinPlan
 *   the join plan to project over
 * @param projectionProperties
 *   aliased expression appended after the star projection
 * @return
 *   a `DataFrameDropColumns` node over the projection
 */
private def getProjection(joinPlan: LogicalPlan, projectionProperties: Alias): LogicalPlan = {
  // Columns of the t2 side that are removed again after the projection.
  val droppedColumnNames = Seq(
    "t2.country_iso_code",
    "t2.country_name",
    "t2.continent_name",
    "t2.region_iso_code",
    "t2.region_name",
    "t2.city_name",
    "t2.time_zone",
    "t2.location",
    "t2.cidr",
    "t2.ip_range_start",
    "t2.ip_range_end",
    "t2.ipv4")
  val droppedColumns = droppedColumnNames.map(name => UnresolvedAttribute(name))
  val starWithResult = Project(Seq(UnresolvedStar(None), projectionProperties), joinPlan)
  DataFrameDropColumns(droppedColumns, starWithResult)
}

test("test geoip with no parameters") {
val frame = sql(s"""
| source = $testTable| where isValid = true | eval a = geoip(ip) | fields ip, a
| source = $testTable | where isValid = true | eval a = geoip(ip) | fields ip, a
| """.stripMargin)

// Retrieve the results
Expand Down Expand Up @@ -69,11 +124,44 @@ class FlintSparkPPLGeoipITSuite
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))

// Compare the logical plans
val logicalPlan: LogicalPlan = frame.queryExecution.logical

val sourceTable: LogicalPlan = Filter(
EqualTo(UnresolvedAttribute("isValid"), Literal(true)),
UnresolvedRelation(seq(testTable)))
val geoTable: LogicalPlan = UnresolvedRelation(seq("geoip"))
val projectionStruct = CreateNamedStruct(
Seq(
Literal("country_iso_code"),
UnresolvedAttribute("t2.country_iso_code"),
Literal("country_name"),
UnresolvedAttribute("t2.country_name"),
Literal("continent_name"),
UnresolvedAttribute("t2.continent_name"),
Literal("region_iso_code"),
UnresolvedAttribute("t2.region_iso_code"),
Literal("region_name"),
UnresolvedAttribute("t2.region_name"),
Literal("city_name"),
UnresolvedAttribute("t2.city_name"),
Literal("time_zone"),
UnresolvedAttribute("t2.time_zone"),
Literal("location"),
UnresolvedAttribute("t2.location")))
val structProjection = Alias(projectionStruct, "a")()
val geoIpPlan =
getGeoIpQueryPlan(UnresolvedAttribute("ip"), sourceTable, geoTable, structProjection)
val expectedPlan: LogicalPlan =
Project(Seq(UnresolvedAttribute("ip"), UnresolvedAttribute("a")), geoIpPlan)

comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test geoip with one parameters") {
val frame = sql(s"""
| source = $testTable| where isValid = true | eval a = geoip(ip, country_name) | fields ip, a
| source = $testTable | where isValid = true | eval a = geoip(ip, country_name) | fields ip, a
| """.stripMargin)

// Retrieve the results
Expand All @@ -85,11 +173,26 @@ class FlintSparkPPLGeoipITSuite
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))

// Compare the logical plans
val logicalPlan: LogicalPlan = frame.queryExecution.logical

val sourceTable: LogicalPlan = Filter(
EqualTo(UnresolvedAttribute("isValid"), Literal(true)),
UnresolvedRelation(seq(testTable)))
val geoTable: LogicalPlan = UnresolvedRelation(seq("geoip"))
val structProjection = Alias(UnresolvedAttribute("t2.country_name"), "a")()
val geoIpPlan =
getGeoIpQueryPlan(UnresolvedAttribute("ip"), sourceTable, geoTable, structProjection)
val expectedPlan: LogicalPlan =
Project(Seq(UnresolvedAttribute("ip"), UnresolvedAttribute("a")), geoIpPlan)

comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test geoip with multiple parameters") {
val frame = sql(s"""
| source = $testTable| where isValid = true | eval a = geoip(ip, country_name, city_name) | fields ip, a
| source = $testTable | where isValid = true | eval a = geoip(ip, country_name, city_name) | fields ip, a
| """.stripMargin)

// Retrieve the results
Expand All @@ -102,5 +205,114 @@ class FlintSparkPPLGeoipITSuite
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))

// Compare the logical plans
val logicalPlan: LogicalPlan = frame.queryExecution.logical

val sourceTable: LogicalPlan = Filter(
EqualTo(UnresolvedAttribute("isValid"), Literal(true)),
UnresolvedRelation(seq(testTable)))
val geoTable: LogicalPlan = UnresolvedRelation(seq("geoip"))
val projectionStruct = CreateNamedStruct(
Seq(
Literal("country_name"),
UnresolvedAttribute("t2.country_name"),
Literal("city_name"),
UnresolvedAttribute("t2.city_name")))
val structProjection = Alias(projectionStruct, "a")()
val geoIpPlan =
getGeoIpQueryPlan(UnresolvedAttribute("ip"), sourceTable, geoTable, structProjection)
val expectedPlan: LogicalPlan =
Project(Seq(UnresolvedAttribute("ip"), UnresolvedAttribute("a")), geoIpPlan)

comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

// Evaluates two geoip columns (`a`, `b`) but keeps only `ip` and `b` in the output; checks both
// the returned rows and that the logical plan contains one nested join/projection per eval.
test("test geoip with partial projection on evaluated fields") {
  val frame = sql(s"""
       | source = $testTable | where isValid = true | eval a = geoip(ip, city_name), b = geoip(ip, country_name) | fields ip, b
       | """.stripMargin)

  // Rows are compared order-independently by sorting on the first (string) column.
  implicit val byFirstColumn: Ordering[Row] = Ordering.by((row: Row) => row.getAs[String](0))
  val actualRows: Array[Row] = frame.collect()
  val expectedRows: Array[Row] =
    Array(Row("66.249.157.90", "Portmore"), Row("2a09:bac2:19f8:2ac3::", "Charlottetown"))
  assert(actualRows.sorted.sameElements(expectedRows.sorted))

  // Logical plan check
  val actualPlan: LogicalPlan = frame.queryExecution.logical

  val filteredSource: LogicalPlan = Filter(
    EqualTo(UnresolvedAttribute("isValid"), Literal(true)),
    UnresolvedRelation(seq(testTable)))
  val geoRelation: LogicalPlan = UnresolvedRelation(seq("geoip"))

  // The plan for `a` becomes the left side of the plan for `b`.
  val aliasA = Alias(UnresolvedAttribute("t2.city_name"), "a")()
  val planForA =
    getGeoIpQueryPlan(UnresolvedAttribute("ip"), filteredSource, geoRelation, aliasA)

  val aliasB = Alias(UnresolvedAttribute("t2.country_name"), "b")()
  val planForB =
    getGeoIpQueryPlan(UnresolvedAttribute("ip"), planForA, geoRelation, aliasB)

  val expectedPlan: LogicalPlan =
    Project(Seq(UnresolvedAttribute("ip"), UnresolvedAttribute("b")), planForB)

  comparePlans(actualPlan, expectedPlan, checkAnalysis = false)
}

// Projects `ipv4`, a column name used by the source table and also referenced as `t2.ipv4` on
// the geoip side, and checks that the output rows and the logical plan are as expected.
test("test geoip with projection on field that exists in both source and geoip table") {
  val frame = sql(s"""
       | source = $testTable| where isValid = true | eval a = geoip(ip, country_name) | fields ipv4, a
       | """.stripMargin)

  // Rows are compared order-independently by sorting on the first (string) column.
  implicit val byFirstColumn: Ordering[Row] = Ordering.by((row: Row) => row.getAs[String](0))
  val actualRows: Array[Row] = frame.collect()
  val expectedRows: Array[Row] =
    Array(
      Row("66.249.157.90", "Portmore"),
      Row("Given IPv6 is not mapped to IPv4", "Charlottetown"))
  assert(actualRows.sorted.sameElements(expectedRows.sorted))

  // Logical plan check
  val actualPlan: LogicalPlan = frame.queryExecution.logical

  val filteredSource: LogicalPlan = Filter(
    EqualTo(UnresolvedAttribute("isValid"), Literal(true)),
    UnresolvedRelation(seq(testTable)))
  val geoRelation: LogicalPlan = UnresolvedRelation(seq("geoip"))
  // A single property is requested, so the result column is a plain attribute alias.
  val countryAlias = Alias(UnresolvedAttribute("t2.country_name"), "a")()
  val geoIpPlan =
    getGeoIpQueryPlan(UnresolvedAttribute("ip"), filteredSource, geoRelation, countryAlias)
  val expectedPlan: LogicalPlan =
    Project(Seq(UnresolvedAttribute("ipv4"), UnresolvedAttribute("a")), geoIpPlan)

  comparePlans(actualPlan, expectedPlan, checkAnalysis = false)
}

// Requesting a property name that is not a geoip property must fail once the query is executed.
test("test geoip with invalid parameter") {
  val frame = sql(s"""
       | source = $testTable | where isValid = true | eval a = geoip(ip, invalid_param) | fields ip, a
       | """.stripMargin)

  // The failure is expected when the rows are collected.
  assertThrows[SparkException] {
    frame.collect()
  }
}

// Runs geoip without the `isValid = true` filter used by the other tests, so every row of the
// test table is evaluated; collecting the result is expected to fail.
test("test geoip with invalid ip address provided") {
  val frame = sql(s"""
       | source = $testTable | eval a = geoip(ip) | fields ip, a
       | """.stripMargin)

  // The failure is expected when the rows are collected.
  assertThrows[SparkException] {
    frame.collect()
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,17 @@ public Boolean apply(String ipAddress, String cidrBlock) {
}

return parsedCidrBlock.contains(parsedIpAddress);

}};
}
};

class geoIpUtils {
/**
 * Checks whether the given IP address string is an IPv4 address.
 *
 * @param ipAddress The IP address string to inspect.
 * @return Presumably true for an IPv4 address and false otherwise (TODO: confirm against the implementation).
 */
public static Function1<String,Boolean> isIpv4 = new SerializableAbstractFunction1<>() {

IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder()
Expand All @@ -170,6 +177,13 @@ public Boolean apply(String ipAddress) {
}
};

/**
 * Converts the given IP address string to its integer representation.
 *
 * @param ipAddress The IP address string to convert.
 * @return The address as a BigInteger.
 */
public static Function1<String,BigInteger> ipToInt = new SerializableAbstractFunction1<>() {
@Override
public BigInteger apply(String ipAddress) {
Expand Down Expand Up @@ -224,6 +238,24 @@ static ScalaUDF visit(String funcName, List<Expression> expressions) {
Option.apply("json_append"),
false,
true);
case "is_ipv4":
return new ScalaUDF(geoIpUtils.isIpv4,
DataTypes.BooleanType,
seq(expressions),
seq(),
Option.empty(),
Option.apply("is_ipv4"),
false,
true);
case "ip_to_int":
return new ScalaUDF(geoIpUtils.ipToInt,
DataTypes.createDecimalType(38,0),
seq(expressions),
seq(),
Option.empty(),
Option.apply("ip_to_int"),
false,
true);
default:
return null;
}
Expand Down
Loading

0 comments on commit 91ac8e5

Please sign in to comment.