Skip to content

Commit

Permalink
Stub UDF
Browse files Browse the repository at this point in the history
  • Loading branch information
kenricky-bitquill committed Nov 5, 2024
1 parent f4e1918 commit 5eda502
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,9 @@ public T visitExistsSubquery(ExistsSubquery node, C context) {
return visitChildren(node, context);
}

public T visitGeoIp(GeoIp node, C context) { return visitGeoip(node, context); }
/**
 * Visits a {@link GeoIp} expression node.
 *
 * <p>This base implementation does no GeoIp-specific work; it hands the node straight to
 * {@code visitChildren}.
 *
 * @param node the GeoIp node being visited
 * @param context the visitor context, passed through unchanged
 * @return the value returned by {@code visitChildren(node, context)}
 */
public T visitGeoIp(GeoIp node, C context) {
return visitChildren(node, context);
}

public T visitWindow(Window node, C context) {
return visitChildren(node, context);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package org.opensearch.sql.ast.expression;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.ast.AbstractNodeVisitor;

import java.util.Arrays;
import java.util.List;

@Getter
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
public class GeoIp extends UnresolvedExpression {
private UnresolvedExpression datasource;
private UnresolvedExpression ipAddress;
private Literal properties;
private final UnresolvedExpression datasource;
private final UnresolvedExpression ipAddress;
private final Literal properties;

@Override
public List<UnresolvedExpression> getChild() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@
import inet.ipaddr.IPAddressString;
import inet.ipaddr.IPAddressStringParameters;
import scala.Function2;
import scala.Function3;
import scala.Serializable;
import scala.runtime.AbstractFunction2;
import scala.runtime.AbstractFunction3;

import java.util.List;


public interface SerializableUdf {
Expand Down Expand Up @@ -51,7 +55,28 @@ public Boolean apply(String ipAddress, String cidrBlock) {
}
};

/**
 * Stub three-argument GeoIP UDF.
 *
 * <p>The arguments (datasource, IP address, requested properties) are currently ignored and a
 * fixed placeholder string is returned for every call.
 */
Function3<String, String, List<String>, Object> geoIpFunction = new SerializableAbstractFunction3<>() {

    @Override
    public Object apply(String datasource, String ipAddress, List<String> properties) {
        // TODO: replace the placeholder below with a real lookup:
        //   1. Check whether an in-memory cache object for the datasource exists.
        //   2. If it does not exist, create a new in-memory cache object from the CSV
        //      retrieved from the datasource manifest.
        //   3. Search the cached object for GeoIP data.
        //   4. Return the GeoIP data.
        return "geoip data";
    }
};

/**
 * Two-argument function base class that combines {@code AbstractFunction2} with
 * {@code Serializable}. It declares no members of its own.
 *
 * @param <T1> type of the first argument
 * @param <T2> type of the second argument
 * @param <R> result type
 */
abstract class SerializableAbstractFunction2<T1,T2,R> extends AbstractFunction2<T1,T2,R>
implements Serializable {
}

/**
 * Three-argument function base class that combines {@code AbstractFunction3} with
 * {@code Serializable}. It declares no members of its own; {@code geoIpFunction} is defined
 * as an anonymous subclass of it.
 *
 * @param <T1> type of the first argument
 * @param <T2> type of the second argument
 * @param <T3> type of the third argument
 * @param <R> result type
 */
abstract class SerializableAbstractFunction3<T1,T2,T3,R> extends AbstractFunction3<T1,T2,T3,R>
implements Serializable {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.*;
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
Expand Down Expand Up @@ -435,8 +436,27 @@ public Expression visitLambdaFunction(LambdaFunction node, CatalystPlanContext c

/**
 * Translates a {@link GeoIp} node into a {@code ScalaUDF} expression named "geoip" that wraps
 * {@code SerializableUdf.geoIpFunction}.
 *
 * <p>The node's datasource, IP address and properties sub-expressions are analyzed one after
 * another; after each {@code analyze} call one expression is popped from the context's
 * named-parse-expression stack. The three popped expressions become the UDF's children, in
 * that order.
 *
 * @param node the GeoIp expression to translate
 * @param context plan context whose named-parse-expression stack is popped from and pushed to
 * @return the value returned by pushing the UDF onto the named-parse-expression stack
 */
@Override
public Expression visitGeoIp(GeoIp node, CatalystPlanContext context) {

// NOTE(review): `udf` is declared a second time further down. This no-arg construction looks
// like the pre-change line that the fuller declaration replaces - confirm and drop it.
ScalaUDF udf = new ScalaUDF();
// Presumably each analyze(...) call leaves its result on top of the stack, which is why a pop
// follows immediately - confirm against analyze().
analyze(node.getDatasource(), context);
Expression datasourceExpression = context.getNamedParseExpressions().pop();
analyze(node.getIpAddress(), context);
Expression ipAddressExpression = context.getNamedParseExpressions().pop();
analyze(node.getProperties(), context);
Expression propertiesExpression = context.getNamedParseExpressions().pop();

// Declared result type: a struct of five string fields (country, region, city, lat, lon),
// each created with the flag `true`.
// NOTE(review): the trailing arguments are presumably inputEncoders (empty), outputEncoder
// (none), udfName ("geoip"), nullable = false and udfDeterministic = true, following
// ScalaUDF's constructor order - confirm against the Spark version in use.
ScalaUDF udf = new ScalaUDF(SerializableUdf.geoIpFunction,
DataTypes.createStructType(new StructField[]{
DataTypes.createStructField("country", DataTypes.StringType, true),
DataTypes.createStructField("region", DataTypes.StringType, true),
DataTypes.createStructField("city", DataTypes.StringType, true),
DataTypes.createStructField("lat", DataTypes.StringType, true),
DataTypes.createStructField("lon", DataTypes.StringType, true)
}),
seq(datasourceExpression, ipAddressExpression, propertiesExpression),
seq(),
Option.empty(),
Option.apply("geoip"),
false,
true);

return context.getNamedParseExpressions().push(udf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.sql.ast.expression.EqualTo;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.Function;
import org.opensearch.sql.ast.expression.GeoIp;
import org.opensearch.sql.ast.expression.In;
import org.opensearch.sql.ast.expression.Interval;
import org.opensearch.sql.ast.expression.IntervalUnit;
Expand Down

0 comments on commit 5eda502

Please sign in to comment.