Skip to content

Commit

Permalink
First implementation draft
Browse files Browse the repository at this point in the history
  • Loading branch information
salyh committed Jul 2, 2024
1 parent 49b964d commit 0935fcc
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@
package org.opensearch.sql.ppl;

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$;
import org.apache.spark.sql.catalyst.analysis.UnresolvedFieldName;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
import org.apache.spark.sql.catalyst.expressions.EqualTo;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.expressions.Predicate;
import org.apache.spark.sql.catalyst.expressions.SortOrder;
import org.apache.spark.sql.catalyst.plans.JoinType;
import org.apache.spark.sql.catalyst.plans.logical.Aggregate;
import org.apache.spark.sql.catalyst.plans.logical.Join;
import org.apache.spark.sql.catalyst.plans.logical.JoinHint;
import org.apache.spark.sql.catalyst.plans.logical.Limit;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.types.DataTypes;
Expand All @@ -32,6 +37,7 @@
import org.opensearch.sql.ast.expression.In;
import org.opensearch.sql.ast.expression.Interval;
import org.opensearch.sql.ast.expression.Literal;
import org.opensearch.sql.ast.expression.Map;
import org.opensearch.sql.ast.expression.Not;
import org.opensearch.sql.ast.expression.Or;
import org.opensearch.sql.ast.expression.QualifiedName;
Expand Down Expand Up @@ -60,6 +66,7 @@
import scala.Option;
import scala.collection.Seq;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -259,7 +266,56 @@ public LogicalPlan visitDedupe(Dedupe node, CatalystPlanContext context) {

@Override
public LogicalPlan visitLookup(Lookup node, CatalystPlanContext context) {
throw new IllegalStateException("Not Supported operation : lookup ");
node.getChild().get(0).accept(this, context);

//TODO: not sure how to implement appendonly
Boolean appendonly = (Boolean) node.getOptions().get(0).getValue().getValue();

LogicalPlan right = new UnresolvedRelation(seq(of(node.getIndexName())), CaseInsensitiveStringMap.empty(), false);
//TODO: use node.getCopyFieldList() to prefilter the right logical plan
//and return only the fields listed there. rename fields when requested

Expression joinExpression = visitFieldMap(node.getMatchFieldList());

return context.apply(p -> new Join(

p, //original query (left)

right, //lookup query (right)

JoinType.apply("left"), //https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-join.html

Option.apply(joinExpression), //which fields to join

JoinHint.NONE() //TODO: check, https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-hints.html#join-hints-types
));
}

private Expression visitFieldMap(List<Map> fieldMap) {
int size = fieldMap.size();

List<Expression> allEqlExpressions = new ArrayList<>(size);

for (Map map : fieldMap) {
Expression eql = new EqualTo(new UnresolvedFieldName(seq(of(((Field) map.getTarget()).getField().toString()))),
new UnresolvedFieldName(seq(of(((Field) map.getOrigin()).getField().toString()))));
allEqlExpressions.add(eql);
}

if(size == 1) {
return allEqlExpressions.get(0);
} else if(size == 2) {
return new org.apache.spark.sql.catalyst.expressions.And(allEqlExpressions.get(0),allEqlExpressions.get(1));
} else {
//2 and(1,2) -> 1 * and
//3 -> and(1, and(2,3)) -> 2 * and
//4 -> and(and(1,2), and(3,4)) -> 3 * and
//5 -> and(and(1, and(2,3)),and(4,5)) -> 4* and
//6 -> and(and(and(1,2), and(3,4)), and(5,6)) -> 5* and

//TODO: implement
throw new RuntimeException("not implemented");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.ppl

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Divide, EqualTo, Floor, GreaterThanOrEqual, Literal, Multiply, SortOrder, TimeWindow}
import org.apache.spark.sql.catalyst.plans.logical._
import org.junit.Assert.assertEquals
import org.opensearch.flint.spark.ppl.PlaneUtils.plan
import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
import org.scalatest.matchers.should.Matchers

class PPLLogicalPlanLookupTranslatorTestSuite
extends SparkFunSuite
with LogicalPlanTestUtils
with Matchers {

private val planTransformer = new CatalystQueryPlanVisitor()
private val pplParser = new PPLSyntaxParser()

test("test lookup ") {
// if successful build ppl logical plan and translate to catalyst logical plan
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(plan(pplParser, "source = table | lookup a b,c as d appendonly=true q,w as z ", false), context)
val star = Seq(UnresolvedStar(None))

val priceField = UnresolvedAttribute("price")
val tableRelation = UnresolvedRelation(Seq("table"))
val aggregateExpressions = Seq(
Alias(UnresolvedFunction(Seq("AVG"), Seq(priceField), isDistinct = false), "avg(price)")())
val aggregatePlan = Aggregate(Seq(), aggregateExpressions, tableRelation)
val expectedPlan = Project(star, aggregatePlan)

//scalastyle:off
println("### plan ###\n"+compareByString(logPlan)+"\n#########");

assertEquals(compareByString(expectedPlan), compareByString(logPlan))
}
}

0 comments on commit 0935fcc

Please sign in to comment.