Skip to content

Commit

Permalink
addressed PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
14yapkc1 committed Dec 8, 2024
1 parent 3f44731 commit a9e9f93
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 72 deletions.
65 changes: 64 additions & 1 deletion docs/ppl-lang/functions/ppl-ip.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,67 @@ Note:
- `ip` can be an IPv4 or an IPv6 address
- `cidr` can be an IPv4 or an IPv6 block
- `ip` and `cidr` must be either both IPv4 or both IPv6
- `ip` and `cidr` must both be valid and non-empty/non-null
- `ip` and `cidr` must both be valid and non-empty/non-null

### `GEOIP`

**Description**

`GEOIP(ip[, property]...)` retrieves geospatial data corresponding to the provided `ip`.

**Argument type:**
- `ip` is string be **STRING**.
- `property` is **STRING** and must be one of the following:
- `COUNTRY_ISO_CODE`
- `COUNTRY_NAME`
- `CONTINENT_NAME`
- `REGION_ISO_CODE`
- `REGION_NAME`
- `CITY_NAME`
- `TIME_ZONE`
- `LOCATION`
- Return type:
- **STRING** if one property given
- **STRUCT_TYPE** if more than one or no property is given

Example:

_Without properties:_

os> source=ips | eval a = geoip(ip) | fields ip, a
fetched rows / total rows = 2/2
+---------------------+-------------------------------------------------------------------------------------------------------+
|ip |lol |
+---------------------+-------------------------------------------------------------------------------------------------------+
|66.249.157.90 |{JM, Jamaica, North America, 14, Saint Catherine Parish, Portmore, America/Jamaica, 17.9686,-76.8827} |
|2a09:bac2:19f8:2ac3::|{CA, Canada, North America, PE, Prince Edward Island, Charlottetown, America/Halifax, 46.2396,-63.1355}|
+---------------------+-------+------+-------------------------------------------------------------------------------------------------------+

_With one property:_

os> source=users | eval a = geoip(ip, COUNTRY_NAME) | fields ip, a
fetched rows / total rows = 2/2
+---------------------+-------+
|ip |a |
+---------------------+-------+
|66.249.157.90 |Jamaica|
|2a09:bac2:19f8:2ac3::|Canada |
+---------------------+-------+

_With multiple properties:_

os> source=users | eval a = geoip(ip, COUNTRY_NAME, REGION_NAME, CITY_NAME) | fields ip, a
fetched rows / total rows = 2/2
+---------------------+---------------------------------------------+
|ip |a |
+---------------------+---------------------------------------------+
|66.249.157.90 |{Jamaica, Saint Catherine Parish, Portmore} |
|2a09:bac2:19f8:2ac3::|{Canada, Prince Edward Island, Charlottetown}|
+---------------------+---------------------------------------------+

Note:
- To use `geoip` user must create spark table containing geo ip location data. Instructions to create table can be found [here](../../opensearch-geoip.md).
- `geoip` command by default expects the created table to be called `geoip_ip_data`.
- if a different table name is desired, can set `spark.geoip.tablename` spark config to new table name.
- `ip` can be an IPv4 or an IPv6 address.
- `geoip` commands will always calculated first if used with other eval functions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ public UnresolvedPlan attach(UnresolvedPlan child) {
this.child = child;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,41 +54,43 @@ public Boolean apply(String ipAddress, String cidrBlock) {
return parsedCidrBlock.contains(parsedIpAddress);
}};

Function1<String,Boolean> isIpv4 = new SerializableAbstractFunction1<>() {

IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder()
.allowEmpty(false)
.setEmptyAsLoopback(false)
.allow_inet_aton(false)
.allowSingleSegment(false)
.toParams();

@Override
public Boolean apply(String ipAddress) {
IPAddressString parsedIpAddress = new IPAddressString(ipAddress, valOptions);

try {
parsedIpAddress.validate();
} catch (AddressStringException e) {
throw new RuntimeException("The given ipAddress '"+ipAddress+"' is invalid. It must be a valid IPv4 or IPv6 address. Error details: "+e.getMessage());
class geoIpUtils {
public static Function1<String,Boolean> isIpv4 = new SerializableAbstractFunction1<>() {

IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder()
.allowEmpty(false)
.setEmptyAsLoopback(false)
.allow_inet_aton(false)
.allowSingleSegment(false)
.toParams();

@Override
public Boolean apply(String ipAddress) {
IPAddressString parsedIpAddress = new IPAddressString(ipAddress, valOptions);

try {
parsedIpAddress.validate();
} catch (AddressStringException e) {
throw new RuntimeException("The given ipAddress '"+ipAddress+"' is invalid. It must be a valid IPv4 or IPv6 address. Error details: "+e.getMessage());
}

return parsedIpAddress.isIPv4();
}};

public static Function1<String,BigInteger> ipToInt = new SerializableAbstractFunction1<>() {
@Override
public BigInteger apply(String ipAddress) {
try {
InetAddress inetAddress = InetAddress.getByName(ipAddress);
byte[] addressBytes = inetAddress.getAddress();
return new BigInteger(1, addressBytes);
} catch (UnknownHostException e) {
System.err.println("Invalid IP address: " + e.getMessage());
}
return null;
}

return parsedIpAddress.isIPv4();
}};

Function1<String,BigInteger> ipToInt = new SerializableAbstractFunction1<>() {
@Override
public BigInteger apply(String ipAddress) {
try {
InetAddress inetAddress = InetAddress.getByName(ipAddress);
byte[] addressBytes = inetAddress.getAddress();
return new BigInteger(1, addressBytes);
} catch (UnknownHostException e) {
System.err.println("Invalid IP address: " + e.getMessage());
}
return null;
}
};
};
}

abstract class SerializableAbstractFunction1<T1,R> extends AbstractFunction1<T1,R>
implements Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.ppl.utils.FieldSummaryTransformer;
import org.opensearch.sql.ppl.utils.GeoipCatalystUtils;
import org.opensearch.sql.ppl.utils.GeoIpCatalystLogicalPlanTranslator;
import org.opensearch.sql.ppl.utils.ParseTransformer;
import org.opensearch.sql.ppl.utils.SortUtils;
import org.opensearch.sql.ppl.utils.TrendlineCatalystUtils;
Expand Down Expand Up @@ -587,8 +587,8 @@ public LogicalPlan visitGeoIp(GeoIp node, CatalystPlanContext context) {
String fieldExpression = node.getField().getField().toString();
Expression ipAddressExpression = visitExpression(node.getIpAddress(), context);

return GeoipCatalystUtils.getGeoipLogicalPlan(
new GeoipCatalystUtils.GeoIpParameters(
return GeoIpCatalystLogicalPlanTranslator.getGeoipLogicalPlan(
new GeoIpCatalystLogicalPlanTranslator.GeoIpParameters(
fieldExpression,
ipAddressExpression,
attributeList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.opensearch.sql.ast.expression.EqualTo;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.Function;
import org.opensearch.sql.ast.tree.GeoIp;
import org.opensearch.sql.ast.expression.In;
import org.opensearch.sql.ast.expression.Interval;
import org.opensearch.sql.ast.expression.IntervalUnit;
Expand All @@ -47,6 +46,7 @@
import org.opensearch.sql.ast.expression.subquery.ScalarSubquery;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.ppl.utils.ArgumentFactory;
import org.opensearch.sql.ppl.utils.GeoIpCatalystLogicalPlanTranslator;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -449,26 +449,14 @@ public UnresolvedExpression visitGeoIpPropertyList(OpenSearchPPLParser.GeoIpProp
ImmutableList.Builder<UnresolvedExpression> properties = ImmutableList.builder();
if (ctx != null) {
for (OpenSearchPPLParser.GeoIpPropertyContext property : ctx.geoIpProperty()) {
String propertyName;
if (property.COUNTRY_ISO_CODE() != null) {
propertyName = "COUNTRY_ISO_CODE";
} else if (property.COUNTRY_NAME() != null) {
propertyName = "COUNTRY_NAME";
} else if (property.CONTINENT_NAME() != null) {
propertyName = "CONTINENT_NAME";
} else if (property.REGION_ISO_CODE() != null) {
propertyName = "REGION_ISO_CODE";
} else if (property.REGION_NAME() != null) {
propertyName = "REGION_NAME";
} else if (property.CITY_NAME() != null) {
propertyName = "CITY_NAME";
} else if (property.TIME_ZONE() != null) {
propertyName = "TIME_ZONE";
} else if (property.LOCATION() != null) {
propertyName = "LOCATION";
} else {
continue;
String propertyName = property.getText().toUpperCase();

try {
GeoIpCatalystLogicalPlanTranslator.GeoIpProperty.valueOf(propertyName);
} catch (NullPointerException | IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid properties used.");
}

properties.add(new Literal(propertyName, DataType.STRING));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,12 @@
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.logging.Level;
import java.util.stream.Collectors;

import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
import static org.opensearch.sql.ppl.utils.JoinSpecTransformer.join;

public interface GeoipCatalystUtils {
public interface GeoIpCatalystLogicalPlanTranslator {
String SPARK_CONF_KEY = "spark.geoip.tablename";
String DEFAULT_GEOIP_TABLE_NAME = "geoip";
String SOURCE_TABLE_ALIAS = "t1";
Expand All @@ -57,11 +56,47 @@ public interface GeoipCatalystUtils {
"location"
);

/**
* Responsible to produce a Spark Logical Plan with given GeoIp command arguments, below is the sample logical plan
* with configuration [source=users, field=a, ipAddress=ip, properties=[country_name, city_name]]
* +- 'DataFrameDropColumns ['t2.country_iso_code, 't2.country_name, 't2.continent_name, 't2.region_iso_code, 't2.region_name, 't2.city_name, 't2.time_zone, 't2.location, 't2.cidr, 't2.start, 't2.end, 't2.ipv4]
* -- +- 'Project [*, named_struct(country_name, 't2.country_name, city_name, 't2.city_name) AS a#0]
* -- -- +- 'Join LeftOuter, (((ip_to_int('ip) >= 't2.start) AND (ip_to_int('ip) < 't2.end)) AND (is_ipv4('ip) = 't2.ipv4))
* -- -- -- :- 'SubqueryAlias t1
* -- -- -- -- : +- 'UnresolvedRelation [users], [], false
* -- -- -- +- 'SubqueryAlias t2
* -- -- -- -- -- +- 'UnresolvedRelation [geoip], [], false
* .
* And the corresponded SQL query:
* .
* SELECT users.*, struct(geoip.country_name, geoip.city_name) AS a
* FROM users, geoip
* WHERE geoip.ip_range_start <= ip_to_int(users.ip)
* AND geoip.ip_range_end > ip_to_int(users.ip)
* AND geoip.ip_type = is_ipv4(users.ip);
*
* @param parameters GeoIp function parameters.
* @param context Context instance to retrieved Expression in resolved form.
* @return a LogicalPlan which will project new col with geoip location based on given ipAddresses.
*/
static LogicalPlan getGeoipLogicalPlan(GeoIpParameters parameters, CatalystPlanContext context) {
applyJoin(parameters.getIpAddress(), context);
return applyProjection(parameters.getField(), parameters.getProperties(), context);
}

/**
* Responsible to produce join plan for GeoIp command, below is the sample logical plan
* with configuration [source=users, ipAddress=ip]
* +- 'Join LeftOuter, (((ip_to_int('ip) >= 't2.start) AND (ip_to_int('ip) < 't2.end)) AND (is_ipv4('ip) = 't2.ipv4))
* -- :- 'SubqueryAlias t1
* -- -- : +- 'UnresolvedRelation [users], [], false
* -- +- 'SubqueryAlias t2
* -- -- -- +- 'UnresolvedRelation [geoip], [], false
*
* @param ipAddress Expression representing ip addresses to be queried.
* @param context Context instance to retrieved Expression in resolved form.
* @return a LogicalPlan which will perform join based on ip within cidr range in geoip table.
*/
static private LogicalPlan applyJoin(Expression ipAddress, CatalystPlanContext context) {
return context.apply(left -> {
LogicalPlan right = new UnresolvedRelation(seq(getGeoipTableName()), CaseInsensitiveStringMap.empty(), false);
Expand All @@ -71,11 +106,11 @@ static private LogicalPlan applyJoin(Expression ipAddress, CatalystPlanContext c
new And(
new GreaterThanOrEqual(
getIpInt(ipAddress),
UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"start"))
UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"ip_range_start"))
),
new LessThan(
getIpInt(ipAddress),
UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"end"))
UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"ip_range_end"))
)
),
new EqualTo(
Expand All @@ -93,6 +128,17 @@ static private LogicalPlan applyJoin(Expression ipAddress, CatalystPlanContext c
});
}

/**
* Responsible to produce a Spark Logical Plan with given GeoIp command arguments, below is the sample logical plan
* with configuration [source=users, field=a, properties=[country_name, city_name]]
* +- 'DataFrameDropColumns ['t2.country_iso_code, 't2.country_name, 't2.continent_name, 't2.region_iso_code, 't2.region_name, 't2.city_name, 't2.time_zone, 't2.location, 't2.cidr, 't2.start, 't2.end, 't2.ipv4]
* -- +- 'Project [*, named_struct(country_name, 't2.country_name, city_name, 't2.city_name) AS a#0]
*
* @param field Name of new eval geoip column.
* @param properties List of geo properties to be returned.
* @param context Context instance to retrieved Expression in resolved form.
* @return a LogicalPlan which will return source table and new eval geoip column.
*/
static private LogicalPlan applyProjection(String field, List<String> properties, CatalystPlanContext context) {
List<NamedExpression> projectExpressions = new ArrayList<>();
projectExpressions.add(UnresolvedStar$.MODULE$.apply(Option.empty()));
Expand All @@ -114,8 +160,8 @@ static private LogicalPlan applyProjection(String field, List<String> properties
List<Expression> dropList = createGeoIpStructFields(new ArrayList<>());
dropList.addAll(List.of(
UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"cidr")),
UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"start")),
UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"end")),
UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"ip_range_start")),
UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"ip_range_end")),
UnresolvedAttribute$.MODULE$.apply(seq(GEOIP_TABLE_ALIAS,"ipv4"))
));

Expand All @@ -140,7 +186,7 @@ static private List<Expression> createGeoIpStructFields(List<String> attributeLi
}

static private Expression getIpInt(Expression ipAddress) {
return new ScalaUDF(SerializableUdf.ipToInt,
return new ScalaUDF(SerializableUdf.geoIpUtils.ipToInt,
DataTypes.createDecimalType(38,0),
seq(ipAddress),
seq(),
Expand All @@ -152,7 +198,7 @@ static private Expression getIpInt(Expression ipAddress) {
}

static private Expression getIsIpv4(Expression ipAddress) {
return new ScalaUDF(SerializableUdf.isIpv4,
return new ScalaUDF(SerializableUdf.geoIpUtils.isIpv4,
DataTypes.BooleanType,
seq(ipAddress),
seq(), Option.empty(),
Expand All @@ -179,4 +225,15 @@ class GeoIpParameters {
private final Expression ipAddress;
private final List<String> properties;
}

enum GeoIpProperty {
COUNTRY_ISO_CODE,
COUNTRY_NAME,
CONTINENT_NAME,
REGION_ISO_CODE,
REGION_NAME,
CITY_NAME,
TIME_ZONE,
LOCATION
}
}
Loading

0 comments on commit a9e9f93

Please sign in to comment.