diff --git a/docs/ppl-lang/functions/ppl-ip.md b/docs/ppl-lang/functions/ppl-ip.md index fb0b468ba..183d4f537 100644 --- a/docs/ppl-lang/functions/ppl-ip.md +++ b/docs/ppl-lang/functions/ppl-ip.md @@ -32,4 +32,67 @@ Note: - `ip` can be an IPv4 or an IPv6 address - `cidr` can be an IPv4 or an IPv6 block - `ip` and `cidr` must be either both IPv4 or both IPv6 - - `ip` and `cidr` must both be valid and non-empty/non-null \ No newline at end of file + - `ip` and `cidr` must both be valid and non-empty/non-null + +### `GEOIP` + +**Description** + +`GEOIP(ip[, property]...)` retrieves geospatial data corresponding to the provided `ip`. + +**Argument type:** +- `ip` is **STRING**. +- `property` is **STRING** and must be one of the following: + - `COUNTRY_ISO_CODE` + - `COUNTRY_NAME` + - `CONTINENT_NAME` + - `REGION_ISO_CODE` + - `REGION_NAME` + - `CITY_NAME` + - `TIME_ZONE` + - `LOCATION` +- Return type: + - **STRING** if one property is given + - **STRUCT_TYPE** if more than one or no property is given + +Example: + +_Without properties:_ + + os> source=ips | eval a = geoip(ip) | fields ip, a + fetched rows / total rows = 2/2 + +---------------------+-------------------------------------------------------------------------------------------------------+ + |ip |a | + +---------------------+-------------------------------------------------------------------------------------------------------+ + |66.249.157.90 |{JM, Jamaica, North America, 14, Saint Catherine Parish, Portmore, America/Jamaica, 17.9686,-76.8827} | + |2a09:bac2:19f8:2ac3::|{CA, Canada, North America, PE, Prince Edward Island, Charlottetown, America/Halifax, 46.2396,-63.1355}| + +---------------------+-------------------------------------------------------------------------------------------------------+ + +_With one property:_ + + os> source=users | eval a = geoip(ip, COUNTRY_NAME) | fields ip, a + fetched rows / total rows = 2/2 + +---------------------+-------+ + |ip |a | + 
+---------------------+-------+ + |66.249.157.90 |Jamaica| + |2a09:bac2:19f8:2ac3::|Canada | + +---------------------+-------+ + +_With multiple properties:_ + + os> source=users | eval a = geoip(ip, COUNTRY_NAME, REGION_NAME, CITY_NAME) | fields ip, a + fetched rows / total rows = 2/2 + +---------------------+---------------------------------------------+ + |ip |a | + +---------------------+---------------------------------------------+ + |66.249.157.90 |{Jamaica, Saint Catherine Parish, Portmore} | + |2a09:bac2:19f8:2ac3::|{Canada, Prince Edward Island, Charlottetown}| + +---------------------+---------------------------------------------+ + +Note: +- To use `geoip`, the user must create a Spark table containing GeoIP location data. Instructions for creating the table can be found [here](../../opensearch-geoip.md). + - By default, the `geoip` command expects the created table to be called `geoip_ip_data`. + - If a different table name is desired, set the `spark.geoip.tablename` Spark config to the new table name. +- `ip` can be an IPv4 or an IPv6 address. +- `geoip` commands will always be calculated first if used with other eval functions. 
diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/GeoIp.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/GeoIp.java index feefa6929..7aed643da 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/GeoIp.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/GeoIp.java @@ -44,4 +44,4 @@ public UnresolvedPlan attach(UnresolvedPlan child) { this.child = child; return this; } -} \ No newline at end of file +} diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java index 4fb8929a9..3e0c15aee 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java @@ -54,41 +54,43 @@ public Boolean apply(String ipAddress, String cidrBlock) { return parsedCidrBlock.contains(parsedIpAddress); }}; - Function1 isIpv4 = new SerializableAbstractFunction1<>() { - - IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder() - .allowEmpty(false) - .setEmptyAsLoopback(false) - .allow_inet_aton(false) - .allowSingleSegment(false) - .toParams(); - - @Override - public Boolean apply(String ipAddress) { - IPAddressString parsedIpAddress = new IPAddressString(ipAddress, valOptions); - - try { - parsedIpAddress.validate(); - } catch (AddressStringException e) { - throw new RuntimeException("The given ipAddress '"+ipAddress+"' is invalid. It must be a valid IPv4 or IPv6 address. 
Error details: "+e.getMessage()); + class geoIpUtils { + public static Function1 isIpv4 = new SerializableAbstractFunction1<>() { + + IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder() + .allowEmpty(false) + .setEmptyAsLoopback(false) + .allow_inet_aton(false) + .allowSingleSegment(false) + .toParams(); + + @Override + public Boolean apply(String ipAddress) { + IPAddressString parsedIpAddress = new IPAddressString(ipAddress, valOptions); + + try { + parsedIpAddress.validate(); + } catch (AddressStringException e) { + throw new RuntimeException("The given ipAddress '"+ipAddress+"' is invalid. It must be a valid IPv4 or IPv6 address. Error details: "+e.getMessage()); + } + + return parsedIpAddress.isIPv4(); + }}; + + public static Function1 ipToInt = new SerializableAbstractFunction1<>() { + @Override + public BigInteger apply(String ipAddress) { + try { + InetAddress inetAddress = InetAddress.getByName(ipAddress); + byte[] addressBytes = inetAddress.getAddress(); + return new BigInteger(1, addressBytes); + } catch (UnknownHostException e) { + System.err.println("Invalid IP address: " + e.getMessage()); + } + return null; } - - return parsedIpAddress.isIPv4(); - }}; - - Function1 ipToInt = new SerializableAbstractFunction1<>() { - @Override - public BigInteger apply(String ipAddress) { - try { - InetAddress inetAddress = InetAddress.getByName(ipAddress); - byte[] addressBytes = inetAddress.getAddress(); - return new BigInteger(1, addressBytes); - } catch (UnknownHostException e) { - System.err.println("Invalid IP address: " + e.getMessage()); - } - return null; - } - }; + }; + } abstract class SerializableAbstractFunction1 extends AbstractFunction1 implements Serializable { diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 17a59dc1c..7f007a0d6 100644 --- 
a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -73,7 +73,7 @@ import org.opensearch.sql.ast.tree.Window; import org.opensearch.sql.common.antlr.SyntaxCheckException; import org.opensearch.sql.ppl.utils.FieldSummaryTransformer; -import org.opensearch.sql.ppl.utils.GeoipCatalystUtils; +import org.opensearch.sql.ppl.utils.GeoIpCatalystLogicalPlanTranslator; import org.opensearch.sql.ppl.utils.ParseTransformer; import org.opensearch.sql.ppl.utils.SortUtils; import org.opensearch.sql.ppl.utils.TrendlineCatalystUtils; @@ -587,8 +587,8 @@ public LogicalPlan visitGeoIp(GeoIp node, CatalystPlanContext context) { String fieldExpression = node.getField().getField().toString(); Expression ipAddressExpression = visitExpression(node.getIpAddress(), context); - return GeoipCatalystUtils.getGeoipLogicalPlan( - new GeoipCatalystUtils.GeoIpParameters( + return GeoIpCatalystLogicalPlanTranslator.getGeoipLogicalPlan( + new GeoIpCatalystLogicalPlanTranslator.GeoIpParameters( fieldExpression, ipAddressExpression, attributeList diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java index 1c80b6514..7dcebb56a 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java @@ -25,7 +25,6 @@ import org.opensearch.sql.ast.expression.EqualTo; import org.opensearch.sql.ast.expression.Field; import org.opensearch.sql.ast.expression.Function; -import org.opensearch.sql.ast.tree.GeoIp; import org.opensearch.sql.ast.expression.In; import org.opensearch.sql.ast.expression.Interval; import org.opensearch.sql.ast.expression.IntervalUnit; @@ -47,6 +46,7 @@ import 
org.opensearch.sql.ast.expression.subquery.ScalarSubquery; import org.opensearch.sql.common.utils.StringUtils; import org.opensearch.sql.ppl.utils.ArgumentFactory; +import org.opensearch.sql.ppl.utils.GeoIpCatalystLogicalPlanTranslator; import java.util.Arrays; import java.util.Collections; @@ -449,26 +449,14 @@ public UnresolvedExpression visitGeoIpPropertyList(OpenSearchPPLParser.GeoIpProp ImmutableList.Builder properties = ImmutableList.builder(); if (ctx != null) { for (OpenSearchPPLParser.GeoIpPropertyContext property : ctx.geoIpProperty()) { - String propertyName; - if (property.COUNTRY_ISO_CODE() != null) { - propertyName = "COUNTRY_ISO_CODE"; - } else if (property.COUNTRY_NAME() != null) { - propertyName = "COUNTRY_NAME"; - } else if (property.CONTINENT_NAME() != null) { - propertyName = "CONTINENT_NAME"; - } else if (property.REGION_ISO_CODE() != null) { - propertyName = "REGION_ISO_CODE"; - } else if (property.REGION_NAME() != null) { - propertyName = "REGION_NAME"; - } else if (property.CITY_NAME() != null) { - propertyName = "CITY_NAME"; - } else if (property.TIME_ZONE() != null) { - propertyName = "TIME_ZONE"; - } else if (property.LOCATION() != null) { - propertyName = "LOCATION"; - } else { - continue; + String propertyName = property.getText().toUpperCase(); + + try { + GeoIpCatalystLogicalPlanTranslator.GeoIpProperty.valueOf(propertyName); + } catch (NullPointerException | IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid properties used."); } + properties.add(new Literal(propertyName, DataType.STRING)); } } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/GeoipCatalystUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/GeoIpCatalystLogicalPlanTranslator.java similarity index 64% rename from ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/GeoipCatalystUtils.java rename to 
ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/GeoIpCatalystLogicalPlanTranslator.java index cb41d01a5..de33ae9d6 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/GeoipCatalystUtils.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/GeoIpCatalystLogicalPlanTranslator.java @@ -35,13 +35,12 @@ import java.util.List; import java.util.Locale; import java.util.Optional; -import java.util.logging.Level; import java.util.stream.Collectors; import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq; import static org.opensearch.sql.ppl.utils.JoinSpecTransformer.join; -public interface GeoipCatalystUtils { +public interface GeoIpCatalystLogicalPlanTranslator { String SPARK_CONF_KEY = "spark.geoip.tablename"; String DEFAULT_GEOIP_TABLE_NAME = "geoip"; String SOURCE_TABLE_ALIAS = "t1"; @@ -57,11 +56,47 @@ public interface GeoipCatalystUtils { "location" ); + /** + * Produces a Spark logical plan for the given GeoIp command arguments. Below is a sample logical plan + * for the configuration [source=users, field=a, ipAddress=ip, properties=[country_name, city_name]] + * +- 'DataFrameDropColumns ['t2.country_iso_code, 't2.country_name, 't2.continent_name, 't2.region_iso_code, 't2.region_name, 't2.city_name, 't2.time_zone, 't2.location, 't2.cidr, 't2.ip_range_start, 't2.ip_range_end, 't2.ipv4] + * -- +- 'Project [*, named_struct(country_name, 't2.country_name, city_name, 't2.city_name) AS a#0] + * -- -- +- 'Join LeftOuter, (((ip_to_int('ip) >= 't2.ip_range_start) AND (ip_to_int('ip) < 't2.ip_range_end)) AND (is_ipv4('ip) = 't2.ipv4)) + * -- -- -- :- 'SubqueryAlias t1 + * -- -- -- -- : +- 'UnresolvedRelation [users], [], false + * -- -- -- +- 'SubqueryAlias t2 + * -- -- -- -- -- +- 'UnresolvedRelation [geoip], [], false + * . + * And the corresponding SQL query: + * . 
+ * SELECT users.*, struct(geoip.country_name, geoip.city_name) AS a + * FROM users, geoip + * WHERE geoip.ip_range_start <= ip_to_int(users.ip) + * AND geoip.ip_range_end > ip_to_int(users.ip) + * AND geoip.ipv4 = is_ipv4(users.ip); + * + * @param parameters GeoIp function parameters. + * @param context Context instance used to retrieve Expressions in resolved form. + * @return a LogicalPlan which projects a new column with the geoip location for the given IP addresses. + */ static LogicalPlan getGeoipLogicalPlan(GeoIpParameters parameters, CatalystPlanContext context) { applyJoin(parameters.getIpAddress(), context); return applyProjection(parameters.getField(), parameters.getProperties(), context); } + /** + * Produces the join plan for the GeoIp command. Below is a sample logical plan + * for the configuration [source=users, ipAddress=ip] + * +- 'Join LeftOuter, (((ip_to_int('ip) >= 't2.ip_range_start) AND (ip_to_int('ip) < 't2.ip_range_end)) AND (is_ipv4('ip) = 't2.ipv4)) + * -- :- 'SubqueryAlias t1 + * -- -- : +- 'UnresolvedRelation [users], [], false + * -- +- 'SubqueryAlias t2 + * -- -- -- +- 'UnresolvedRelation [geoip], [], false + * + * @param ipAddress Expression representing the IP addresses to be queried. + * @param context Context instance used to retrieve Expressions in resolved form. + * @return a LogicalPlan which performs a join based on the IP falling within a CIDR range in the geoip table. 
+ */ static private LogicalPlan applyJoin(Expression ipAddress, CatalystPlanContext context) { return context.apply(left -> { LogicalPlan right = new UnresolvedRelation(seq(getGeoipTableName()), CaseInsensitiveStringMap.empty(), false); @@ -71,11 +106,11 @@ static private LogicalPlan applyJoin(Expression ipAddress, CatalystPlanContext c new And( new GreaterThanOrEqual( getIpInt(ipAddress), - UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"start")) + UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"ip_range_start")) ), new LessThan( getIpInt(ipAddress), - UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"end")) + UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"ip_range_end")) ) ), new EqualTo( @@ -93,6 +128,17 @@ static private LogicalPlan applyJoin(Expression ipAddress, CatalystPlanContext c }); } + /** + * Produces the projection and column-drop plan for the GeoIp command. Below is a sample logical plan + * for the configuration [source=users, field=a, properties=[country_name, city_name]] + * +- 'DataFrameDropColumns ['t2.country_iso_code, 't2.country_name, 't2.continent_name, 't2.region_iso_code, 't2.region_name, 't2.city_name, 't2.time_zone, 't2.location, 't2.cidr, 't2.ip_range_start, 't2.ip_range_end, 't2.ipv4] + * -- +- 'Project [*, named_struct(country_name, 't2.country_name, city_name, 't2.city_name) AS a#0] + * + * @param field Name of the new eval geoip column. + * @param properties List of geo properties to be returned. + * @param context Context instance used to retrieve Expressions in resolved form. + * @return a LogicalPlan which returns the source table plus the new eval geoip column. 
+ */ static private LogicalPlan applyProjection(String field, List properties, CatalystPlanContext context) { List projectExpressions = new ArrayList<>(); projectExpressions.add(UnresolvedStar$.MODULE$.apply(Option.empty())); @@ -114,8 +160,8 @@ static private LogicalPlan applyProjection(String field, List properties List dropList = createGeoIpStructFields(new ArrayList<>()); dropList.addAll(List.of( UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"cidr")), - UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"start")), - UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"end")), + UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"ip_range_start")), + UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"ip_range_end")), UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"ipv4")) )); @@ -140,7 +186,7 @@ static private List createGeoIpStructFields(List attributeLi } static private Expression getIpInt(Expression ipAddress) { - return new ScalaUDF(SerializableUdf.ipToInt, + return new ScalaUDF(SerializableUdf.geoIpUtils.ipToInt, DataTypes.createDecimalType(38,0), seq(ipAddress), seq(), @@ -152,7 +198,7 @@ static private Expression getIpInt(Expression ipAddress) { } static private Expression getIsIpv4(Expression ipAddress) { - return new ScalaUDF(SerializableUdf.isIpv4, + return new ScalaUDF(SerializableUdf.geoIpUtils.isIpv4, DataTypes.BooleanType, seq(ipAddress), seq(), Option.empty(), @@ -179,4 +225,15 @@ class GeoIpParameters { private final Expression ipAddress; private final List properties; } + + enum GeoIpProperty { + COUNTRY_ISO_CODE, + COUNTRY_NAME, + CONTINENT_NAME, + REGION_ISO_CODE, + REGION_NAME, + CITY_NAME, + TIME_ZONE, + LOCATION + } } diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanGeoipFunctionTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanGeoipFunctionTranslatorTestSuite.scala index f6357d211..fc0ecaf42 
100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanGeoipFunctionTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanGeoipFunctionTranslatorTestSuite.scala @@ -44,7 +44,7 @@ class PPLLogicalPlanGeoipFunctionTranslatorTestSuite right : LogicalPlan ) : LogicalPlan = { val is_ipv4 = ScalaUDF( - SerializableUdf.isIpv4, + SerializableUdf.geoIpUtils.isIpv4, DataTypes.BooleanType, seq(ipAddress), seq(), @@ -54,7 +54,7 @@ class PPLLogicalPlanGeoipFunctionTranslatorTestSuite true ) val ip_to_int = ScalaUDF( - SerializableUdf.ipToInt, + SerializableUdf.geoIpUtils.ipToInt, DataTypes.createDecimalType(38, 0), seq(ipAddress), seq(), @@ -69,8 +69,8 @@ class PPLLogicalPlanGeoipFunctionTranslatorTestSuite val joinCondition = And( And( - GreaterThanOrEqual(ip_to_int, UnresolvedAttribute("t2.start")), - LessThan(ip_to_int, UnresolvedAttribute("t2.end")) + GreaterThanOrEqual(ip_to_int, UnresolvedAttribute("t2.ip_range_start")), + LessThan(ip_to_int, UnresolvedAttribute("t2.ip_range_end")) ), EqualTo(is_ipv4, UnresolvedAttribute("t2.ipv4")) ) @@ -82,7 +82,7 @@ class PPLLogicalPlanGeoipFunctionTranslatorTestSuite val dropList = Seq( "t2.country_iso_code", "t2.country_name", "t2.continent_name", "t2.region_iso_code", "t2.region_name", "t2.city_name", - "t2.time_zone", "t2.location", "t2.cidr", "t2.start", "t2.end", "t2.ipv4" + "t2.time_zone", "t2.location", "t2.cidr", "t2.ip_range_start", "t2.ip_range_end", "t2.ipv4" ).map(UnresolvedAttribute(_)) DataFrameDropColumns(dropList, projection) }