code cleanup
14yapkc1 committed Dec 4, 2024
1 parent 25db8b1 commit 06d1569
Showing 8 changed files with 1,157 additions and 1,178 deletions.
README.md (2 changes: 1 addition & 1 deletion)
@@ -75,7 +75,7 @@ bin/spark-shell --packages "org.opensearch:opensearch-spark-standalone_2.12:0.7.
To build and run this PPL in Spark, you can run the following (requires Java 11):

```
sbt clean sparkPPLCosmetic/publishM2
```
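Once published to the local Maven repository, the artifact can be pulled into spark-shell via the --packages flag shown in the hunk context above. A PPL query can then be issued straight from the REPL. The snippet below is an illustrative sketch, assuming the PPL Spark extension is enabled in the session and that a table named http_logs exists; neither is shown in this diff:

```
// Illustrative only: assumes the PPL extension is active and `http_logs` is
// a registered table, so that PPL text can be handed to spark.sql.
spark.sql("source = http_logs | where status = 500 | stats count() by host").show()
```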

build.sbt (3 changes: 0 additions & 3 deletions)
@@ -190,9 +190,6 @@ lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
"com.github.sbt" % "junit-interface" % "0.13.3" % "test",
"org.projectlombok" % "lombok" % "1.18.30",
"com.github.seancfoley" % "ipaddress" % "5.5.1",
"org.apache.commons" % "commons-lang3" % "3.17.0",
"org.apache.commons" % "commons-csv" % "1.12.0",
"com.fasterxml.jackson.core" % "jackson-annotations" % "2.14.2",
),
libraryDependencies ++= deps(sparkVersion),
// ANTLR settings
@@ -1,3 +1,8 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
@@ -15,7 +20,6 @@
import java.util.List;
import java.util.Optional;

@ToString
@Getter
@RequiredArgsConstructor
@EqualsAndHashCode(callSuper = false)
@@ -8,7 +8,6 @@
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute;
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
import org.apache.spark.sql.catalyst.expressions.CaseWhen;
import org.apache.spark.sql.catalyst.expressions.Exists$;
@@ -28,11 +27,31 @@
import org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable$;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.unsafe.types.UTF8String;

import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.*;
import org.opensearch.sql.ast.expression.AggregateFunction;
import org.opensearch.sql.ast.expression.Alias;
import org.opensearch.sql.ast.expression.AllFields;
import org.opensearch.sql.ast.expression.And;
import org.opensearch.sql.ast.expression.Between;
import org.opensearch.sql.ast.expression.BinaryExpression;
import org.opensearch.sql.ast.expression.Case;
import org.opensearch.sql.ast.expression.Compare;
import org.opensearch.sql.ast.expression.FieldsMapping;
import org.opensearch.sql.ast.expression.Function;
import org.opensearch.sql.ast.expression.In;
import org.opensearch.sql.ast.expression.Interval;
import org.opensearch.sql.ast.expression.IsEmpty;
import org.opensearch.sql.ast.expression.Literal;
import org.opensearch.sql.ast.expression.Not;
import org.opensearch.sql.ast.expression.Or;
import org.opensearch.sql.ast.expression.LambdaFunction;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.ast.expression.Span;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.ast.expression.When;
import org.opensearch.sql.ast.expression.WindowFunction;
import org.opensearch.sql.ast.expression.Xor;
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
import org.opensearch.sql.ast.expression.subquery.InSubquery;
import org.opensearch.sql.ast.expression.subquery.ScalarSubquery;
@@ -54,7 +73,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Stack;
import java.util.function.BiFunction;
@@ -5,49 +5,29 @@

package org.opensearch.sql.ppl;

import org.apache.spark.sql.catalyst.AliasIdentifier;
import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$;
import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
import org.apache.spark.sql.catalyst.expressions.And;
import org.apache.spark.sql.catalyst.expressions.Ascending$;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.CreateStruct;
import org.apache.spark.sql.catalyst.expressions.Descending$;
import org.apache.spark.sql.catalyst.expressions.EqualTo;
import org.apache.spark.sql.catalyst.expressions.Exp;
import org.apache.spark.sql.catalyst.expressions.Explode;
import org.apache.spark.sql.catalyst.expressions.ExprId;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.GeneratorOuter;
import org.apache.spark.sql.catalyst.expressions.GreaterThan;
import org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual;
import org.apache.spark.sql.catalyst.expressions.LessThan;
import org.apache.spark.sql.catalyst.expressions.LessThanOrEqual;
import org.apache.spark.sql.catalyst.expressions.Log;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.expressions.ScalaUDF;
import org.apache.spark.sql.catalyst.expressions.SortDirection;
import org.apache.spark.sql.catalyst.expressions.SortOrder;
import org.apache.spark.sql.catalyst.plans.logical.Aggregate;
import org.apache.spark.sql.catalyst.plans.logical.DataFrameDropColumns$;
import org.apache.spark.sql.catalyst.plans.logical.DescribeRelation$;
import org.apache.spark.sql.catalyst.plans.logical.DropColumns;
import org.apache.spark.sql.catalyst.plans.logical.DropColumns$;
import org.apache.spark.sql.catalyst.plans.logical.Generate;
import org.apache.spark.sql.catalyst.plans.logical.Limit;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.plans.logical.Project$;
import org.apache.spark.sql.catalyst.plans.logical.Union;
import org.apache.spark.sql.execution.ExplainMode;
import org.apache.spark.sql.execution.command.DescribeTableCommand;
import org.apache.spark.sql.execution.command.ExplainCommand;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.opensearch.flint.spark.FlattenGenerator;
import org.opensearch.sql.ast.AbstractNodeVisitor;
@@ -92,29 +72,25 @@
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.expression.function.SerializableUdf;
import org.opensearch.sql.ppl.utils.FieldSummaryTransformer;
import org.opensearch.sql.ppl.utils.GeoipCatalystUtils;
import org.opensearch.sql.ppl.utils.ParseTransformer;
import org.opensearch.sql.ppl.utils.SortUtils;
import org.opensearch.sql.ppl.utils.TrendlineCatalystUtils;
import org.opensearch.sql.ppl.utils.WindowSpecTransformer;
import scala.None$;
import scala.Option;
import scala.collection.IterableLike;
import scala.collection.JavaConverters;
import scala.collection.Seq;

import javax.naming.Name;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static java.util.List.of;
import static org.opensearch.sql.ppl.CatalystPlanContext.findRelation;
import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainMultipleDuplicateEvents;
import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainMultipleDuplicateEventsAndKeepEmpty;
@@ -593,124 +569,27 @@ public LogicalPlan visitGeoIp(GeoIp node, CatalystPlanContext context) {
List<String> attributeList = new ArrayList<>();

while (!context.getNamedParseExpressions().isEmpty()) {

Expression nextExpression = context.getNamedParseExpressions().pop();
String attributeName = nextExpression.toString();


if (attributeList.contains(attributeName)) {
throw new IllegalStateException("Duplicate attribute in GEOIP attribute list");
}

attributeList.add(0, attributeName);
}

Expression ipAddressExpression = visitExpression(node.getIpAddress(), context);
Expression fieldExpression = visitExpression(node.getField(), context);
Expression ipAddressExpression = visitExpression(node.getIpAddress(), context);

ScalaUDF ipInt = new ScalaUDF(SerializableUdf.ipToInt,
DataTypes.createDecimalType(38,0),
seq(ipAddressExpression),
seq(),
Option.empty(),
Option.apply("ip_to_int"),
false,
true);

ScalaUDF isIpv4 = new ScalaUDF(SerializableUdf.isIpv4,
DataTypes.BooleanType,
seq(ipAddressExpression),
seq(), Option.empty(),
Option.apply("is_ipv4"),
false,
true);

LogicalPlan joinPlan = context.apply(left -> {
LogicalPlan right = new UnresolvedRelation(seq("geoip"), CaseInsensitiveStringMap.empty(), false);
LogicalPlan leftAlias = org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias$.MODULE$.apply("t1", left);
LogicalPlan rightAlias = org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias$.MODULE$.apply("t2", right);
Optional<Expression> joinCondition = Optional.of(new And(
new And(
new GreaterThanOrEqual(
ipInt,
UnresolvedAttribute$.MODULE$.apply(seq("t2","start"))
),
new LessThan(
ipInt,
UnresolvedAttribute$.MODULE$.apply(seq("t2","end"))
)
),
new EqualTo(
isIpv4,
UnresolvedAttribute$.MODULE$.apply(seq("t2","ipv4"))
)
));
context.retainAllNamedParseExpressions(p -> p);
context.retainAllPlans(p -> p);
return join(leftAlias,
rightAlias,
Join.JoinType.INNER,
joinCondition,
new Join.JoinHint());
});

org.apache.spark.sql.catalyst.plans.logical.Join lol = (org.apache.spark.sql.catalyst.plans.logical.Join) joinPlan;
System.out.println(JavaConverters.seqAsJavaListConverter(lol.right().output()).asJava().size());

List<NamedExpression> projectExpressions = JavaConverters.seqAsJavaListConverter(lol.left().output()).asJava()
.stream()
.map(attr -> (NamedExpression) attr)
.collect(Collectors.toList());

projectExpressions.add(UnresolvedStar$.MODULE$.apply(Option.<Seq<String>>empty()));

NamedExpression geoCol = org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply(
CreateStruct.apply(seq(createGeoIpStructFields(attributeList))),
fieldExpression.toString(),
NamedExpression.newExprId(),
seq(new java.util.ArrayList<String>()),
Option.empty(),
seq(new java.util.ArrayList<String>()));

projectExpressions.add(geoCol);

for (NamedExpression expression : projectExpressions) {
System.out.println(expression);
}

List<Expression> dropList = createGeoIpStructFields(new ArrayList<>());
dropList.addAll(List.of(

UnresolvedAttribute$.MODULE$.apply(seq("t2","cidr")),
UnresolvedAttribute$.MODULE$.apply(seq("t2","start")),
UnresolvedAttribute$.MODULE$.apply(seq("t2","end")),
UnresolvedAttribute$.MODULE$.apply(seq("t2","ipv4"))
));

context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Project(seq(projectExpressions), joinPlan));
return context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.DataFrameDropColumns(seq(dropList), p));
}

private List<Expression> createGeoIpStructFields(List<String> attributeList) {
List<String> attributeListToUse;
if (attributeList == null || attributeList.isEmpty()) {
attributeListToUse = List.of(
"country_iso_code",
"country_name",
"continent_name",
"region_iso_code",
"region_name",
"city_name",
"time_zone",
"location"
);
} else {
attributeListToUse = attributeList;
}

return attributeListToUse.stream()
.map(a -> UnresolvedAttribute$.MODULE$.apply(seq("t2",a.toLowerCase(Locale.ROOT))))
.collect(Collectors.toList());
return GeoipCatalystUtils.getGeoipLogicalPlan(
new GeoipCatalystUtils.GeoIpParameters(
fieldExpression,
ipAddressExpression,
attributeList
),
context
);
}

@Override
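For context on the largest change in this commit: the inline visitGeoIp implementation removed above built ip_to_int and is_ipv4 ScalaUDFs, range-joined the query plan against a geoip lookup table, projected the requested attributes into a struct column, and dropped the helper columns; all of that now lives behind GeoipCatalystUtils.getGeoipLogicalPlan. The standalone sketch below restates the matching rule the removed join condition encoded. ipv4ToBigInt, GeoRow, and lookup are hypothetical names for illustration, not code from this repository:

```
// Hypothetical sketch, not project code.
// Packs an IPv4 dotted quad into an integer, roughly what SerializableUdf.ipToInt
// appears to compute; the real UDF returns DecimalType(38, 0), wide enough to
// presumably cover IPv6 as well.
def ipv4ToBigInt(ip: String): BigInt =
  ip.split('.').foldLeft(BigInt(0))((acc, octet) => (acc << 8) + octet.toInt)

// One row of the geoip lookup table, reduced to the columns the join touches.
case class GeoRow(start: BigInt, end: BigInt, ipv4: Boolean, countryIsoCode: String)

// A row matches when the numeric IP falls in [start, end) and the address-family
// flag agrees: the same predicate the removed code assembled from
// GreaterThanOrEqual, LessThan, and EqualTo over the t2 columns.
def lookup(ipInt: BigInt, isIpv4: Boolean, table: Seq[GeoRow]): Option[GeoRow] =
  table.find(r => ipInt >= r.start && ipInt < r.end && r.ipv4 == isIpv4)
```

For example, ipv4ToBigInt("8.8.8.8") evaluates to 134744072, which lookup then range-checks against each row's start and end bounds.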
