Skip to content

Commit

Permalink
lookup - copy fields from the lookup table
Browse files Browse the repository at this point in the history
Signed-off-by: Kacper Trochimiak <[email protected]>
  • Loading branch information
kt-eliatra authored and salyh committed Jul 2, 2024
1 parent 0a81105 commit 427793a
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.google.common.collect.ImmutableList;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.Alias;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.Map;

Expand All @@ -16,21 +17,21 @@

public class Lookup extends UnresolvedPlan {
private UnresolvedPlan child;
private final String indexName;
private final String tableName;
private final List<Map> matchFieldList;
private final List<Argument> options;
private final List<Map> copyFieldList;
private final List<Alias> copyFieldList;

public Lookup(UnresolvedPlan child, String indexName, List<Map> matchFieldList, List<Argument> options, List<Map> copyFieldList) {
public Lookup(UnresolvedPlan child, String tableName, List<Map> matchFieldList, List<Argument> options, List<Alias> copyFieldList) {
this.child = child;
this.indexName = indexName;
this.tableName = tableName;
this.matchFieldList = matchFieldList;
this.options = options;
this.copyFieldList = copyFieldList;
}

public Lookup(String indexName, List<Map> matchFieldList, List<Argument> options, List<Map> copyFieldList) {
this.indexName = indexName;
public Lookup(String tableName, List<Map> matchFieldList, List<Argument> options, List<Alias> copyFieldList) {
this.tableName = tableName;
this.matchFieldList = matchFieldList;
this.options = options;
this.copyFieldList = copyFieldList;
Expand All @@ -42,8 +43,8 @@ public Lookup attach(UnresolvedPlan child) {
return this;
}

public String getIndexName() {
return indexName;
public String getTableName() {
return tableName;
}

public List<Map> getMatchFieldList() {
Expand All @@ -54,7 +55,7 @@ public List<Argument> getOptions() {
return options;
}

public List<Map> getCopyFieldList() {
public List<Alias> getCopyFieldList() {
return copyFieldList;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
import org.apache.spark.sql.catalyst.expressions.EqualTo;
import org.apache.spark.sql.catalyst.expressions.Expression;
Expand Down Expand Up @@ -68,6 +69,7 @@
import scala.collection.Seq;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -280,17 +282,26 @@ public LogicalPlan visitLookup(Lookup node, CatalystPlanContext context) {
//TODO: not sure how to implement appendonly
Boolean appendonly = (Boolean) node.getOptions().get(0).getValue().getValue();

LogicalPlan right = new UnresolvedRelation(seq(of(node.getIndexName())), CaseInsensitiveStringMap.empty(), false);
LogicalPlan lookupRelation = new UnresolvedRelation(seq(of(node.getTableName())), CaseInsensitiveStringMap.empty(), false);
org.apache.spark.sql.catalyst.plans.logical.Project lookupProject;

List<NamedExpression> lookupRelationFields = buildLookupTableFieldList(node, context);
if (! lookupRelationFields.isEmpty()) {
lookupProject = new org.apache.spark.sql.catalyst.plans.logical.Project(seq(lookupRelationFields), lookupRelation);
} else {
lookupProject = new org.apache.spark.sql.catalyst.plans.logical.Project(seq(of(new UnresolvedStar(Option.empty()))), lookupRelation);
}

//TODO: use node.getCopyFieldList() to prefilter the right logical plan
//and return only the fields listed there. rename fields when requested

Expression joinCondition = visitFieldMap(node.getMatchFieldList(), source.getTableQualifiedName().toString(), node.getIndexName(), context);
Expression joinCondition = buildLookupTableJoinCondition(node.getMatchFieldList(), source.getTableQualifiedName().toString(), node.getTableName(), context);

return context.apply(p -> new Join(

p, //original query (left)

right, //lookup query (right)
lookupProject, //lookup query (right)

JoinType.apply("left"), //https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-join.html

Expand All @@ -300,22 +311,38 @@ public LogicalPlan visitLookup(Lookup node, CatalystPlanContext context) {
));
}

private List<NamedExpression> buildLookupTableFieldList(Lookup node, CatalystPlanContext context) {
if (node.getCopyFieldList().isEmpty()) {
return Collections.emptyList();
} else {
//todo should we also append fields used to match records - node.getMatchFieldList()?
List<NamedExpression> copyFields = node.getCopyFieldList().stream()
.map(copyField -> (NamedExpression) expressionAnalyzer.visitAlias(copyField, context))
.collect(Collectors.toList());
return copyFields;
}
}

private org.opensearch.sql.ast.expression.Field prefixField(List<String> prefixParts, UnresolvedExpression field) {
org.opensearch.sql.ast.expression.Field in = (org.opensearch.sql.ast.expression.Field) field;
org.opensearch.sql.ast.expression.QualifiedName inq = (org.opensearch.sql.ast.expression.QualifiedName) in.getField();
Iterable finalParts = Iterables.concat(prefixParts, inq.getParts());
return new org.opensearch.sql.ast.expression.Field(new org.opensearch.sql.ast.expression.QualifiedName(finalParts), in.getFieldArgs());
}

private Expression visitFieldMap(List<Map> fieldMap, String sourceTableName, String lookupTableName, CatalystPlanContext context) {
private Expression buildLookupTableJoinCondition(List<Map> fieldMap, String sourceTableName, String lookupTableName, CatalystPlanContext context) {
int size = fieldMap.size();

List<Expression> allEqlExpressions = new ArrayList<>(size);

for (Map map : fieldMap) {

Expression origin = visitExpression(prefixField(of(sourceTableName.split("\\.")),map.getOrigin()), context);
Expression target = visitExpression(prefixField(of(lookupTableName.split("\\.")),map.getTarget()), context);
//todo do we need to run prefixField? match fields are anyway handled as qualifiedName?
// Expression origin = visitExpression(prefixField(of(sourceTableName.split("\\.")),map.getOrigin()), context);
// Expression target = visitExpression(prefixField(of(lookupTableName.split("\\.")),map.getTarget()), context);
Expression origin = visitExpression(map.getOrigin(), context);
Expression target = visitExpression(map.getTarget(), context);


//important
context.retainAllNamedParseExpressions(e -> e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.opensearch.sql.ppl.utils.ArgumentFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -216,9 +215,9 @@ public UnresolvedPlan visitLookupCommand(OpenSearchPPLParser.LookupCommandContex
ctx.copyFieldWithOptAs().stream()
.map(
ct ->
new Map(
evaluateFieldExpressionContext(ct.orignalCopyField),
evaluateFieldExpressionContext(ct.asCopyField, ct.orignalCopyField)))
new Alias(
ct.asCopyField == null? ct.orignalCopyField.getText() : ct.asCopyField.getText(),
evaluateFieldExpressionContext(ct.orignalCopyField)))
.collect(Collectors.toList()));
}

Expand Down

0 comments on commit 427793a

Please sign in to comment.