From 5eda50213cf9491068df09cebffb92f4e64c0880 Mon Sep 17 00:00:00 2001 From: "KENRICK-THINKPA\\kenricky" Date: Tue, 5 Nov 2024 15:46:42 -0800 Subject: [PATCH] Stub UDF --- .../sql/ast/AbstractNodeVisitor.java | 4 ++- .../opensearch/sql/ast/expression/GeoIp.java | 12 ++++++--- .../expression/function/SerializableUdf.java | 25 +++++++++++++++++++ .../sql/ppl/CatalystExpressionVisitor.java | 24 ++++++++++++++++-- .../sql/ppl/parser/AstExpressionBuilder.java | 1 + 5 files changed, 60 insertions(+), 6 deletions(-) diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index e6722eae2..3986972c8 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -299,7 +299,9 @@ public T visitExistsSubquery(ExistsSubquery node, C context) { return visitChildren(node, context); } - public T visitGeoIp(GeoIp node, C context) { return visitGeoip(node, context); } + public T visitGeoIp(GeoIp node, C context) { + return visitChildren(node, context); + } public T visitWindow(Window node, C context) { return visitChildren(node, context); diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/GeoIp.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/GeoIp.java index b3edf86ab..ab8ce860e 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/GeoIp.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/GeoIp.java @@ -1,14 +1,20 @@ package org.opensearch.sql.ast.expression; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; import org.opensearch.sql.ast.AbstractNodeVisitor; import java.util.Arrays; import java.util.List; +@Getter +@EqualsAndHashCode(callSuper = false) +@RequiredArgsConstructor public class GeoIp extends UnresolvedExpression { - private UnresolvedExpression datasource; - private UnresolvedExpression ipAddress; - private Literal properties; + private final UnresolvedExpression datasource; + private final UnresolvedExpression ipAddress; + private final Literal properties; @Override public List getChild() { diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java index 2541b3743..5834fed26 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java @@ -9,8 +9,12 @@ import inet.ipaddr.IPAddressString; import inet.ipaddr.IPAddressStringParameters; import scala.Function2; +import scala.Function3; import scala.Serializable; import scala.runtime.AbstractFunction2; +import scala.runtime.AbstractFunction3; + +import java.util.List; public interface SerializableUdf { @@ -51,7 +55,28 @@ public Boolean apply(String ipAddress, String cidrBlock) { } }; + Function3, Object> geoIpFunction = new SerializableAbstractFunction3<>() { + + @Override + public Object apply(String datasource, String ipAddress, List properties) { + Object results = "geoip data"; + + //TODO: + // 1. Check if in-memory cache object for datasource exists. + // 2. If cache object does not exists create new in-memory cache object from csv retrieved from datasource manifest. + // 3. Search cached object for GeoIP data. + // 4. Return GeoIP data. + + return results; + } + }; + abstract class SerializableAbstractFunction2 extends AbstractFunction2 implements Serializable { } + + abstract class SerializableAbstractFunction3 extends AbstractFunction3 + implements Serializable { + + } } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java index 397419819..6dbc8c200 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; import org.opensearch.sql.ast.AbstractNodeVisitor; import org.opensearch.sql.ast.expression.*; import org.opensearch.sql.ast.expression.subquery.ExistsSubquery; @@ -435,8 +436,27 @@ public Expression visitLambdaFunction(LambdaFunction node, CatalystPlanContext c @Override public Expression visitGeoIp(GeoIp node, CatalystPlanContext context) { - - ScalaUDF udf = new ScalaUDF(); + analyze(node.getDatasource(), context); + Expression datasourceExpression = context.getNamedParseExpressions().pop(); + analyze(node.getIpAddress(), context); + Expression ipAddressExpression = context.getNamedParseExpressions().pop(); + analyze(node.getProperties(), context); + Expression propertiesExpression = context.getNamedParseExpressions().pop(); + + ScalaUDF udf = new ScalaUDF(SerializableUdf.geoIpFunction, + DataTypes.createStructType(new StructField[]{ + DataTypes.createStructField("country", DataTypes.StringType, true), + DataTypes.createStructField("region", DataTypes.StringType, true), + DataTypes.createStructField("city", DataTypes.StringType, true), + DataTypes.createStructField("lat", DataTypes.StringType, true), + DataTypes.createStructField("lon", DataTypes.StringType, true) + }), + seq(datasourceExpression, ipAddressExpression, propertiesExpression), + seq(), + Option.empty(), + Option.apply("geoip"), + false, + true); return context.getNamedParseExpressions().push(udf); } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java index 2c4410bde..22f21b079 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java @@ -25,6 +25,7 @@ import org.opensearch.sql.ast.expression.EqualTo; import org.opensearch.sql.ast.expression.Field; import org.opensearch.sql.ast.expression.Function; +import org.opensearch.sql.ast.expression.GeoIp; import org.opensearch.sql.ast.expression.In; import org.opensearch.sql.ast.expression.Interval; import org.opensearch.sql.ast.expression.IntervalUnit;