Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lookup simplified analyze copy fields #2809

113 changes: 113 additions & 0 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -50,6 +51,7 @@
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
Expand All @@ -66,6 +68,7 @@
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.data.model.ExprMissingValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.expression.DSL;
Expand All @@ -89,6 +92,7 @@
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalLookup;
import org.opensearch.sql.planner.logical.LogicalML;
import org.opensearch.sql.planner.logical.LogicalMLCommons;
import org.opensearch.sql.planner.logical.LogicalPaginate;
Expand Down Expand Up @@ -507,6 +511,115 @@ public LogicalPlan visitDedupe(Dedupe node, AnalysisContext context) {
consecutive);
}

/**
 * Build {@link LogicalLookup}.
 *
 * <p>Analyzes the child plan, reads the append-only flag from the first option, resolves the
 * lookup index through the default data source, and then analyzes the match and copy field
 * mappings.
 *
 * @param node lookup AST node
 * @param queryContext analysis context of the query that contains the lookup
 * @return the logical lookup plan wrapping the analyzed child
 * @throws SemanticCheckException if the lookup index cannot be resolved or does not exist
 */
@Override
public LogicalPlan visitLookup(Lookup node, AnalysisContext queryContext) {
  LogicalPlan input = node.getChild().get(0).accept(this, queryContext);
  // TODO: refactor option handling; the first option is currently assumed to be the
  // append-only flag.
  Boolean appendOnly = (Boolean) node.getOptions().get(0).getValue().getValue();

  String lookupIndex = node.getIndexName();
  Table lookupTable =
      dataSourceService
          .getDataSource(DEFAULT_DATASOURCE_NAME)
          .getStorageEngine()
          .getTable(null, lookupIndex);
  if (lookupTable == null || !lookupTable.exists()) {
    throw new SemanticCheckException(String.format("no such lookup index %s", lookupIndex));
  }

  // The lookup index gets its own context so its fields are resolved separately from the
  // fields of the query being analyzed.
  AnalysisContext lookupTableContext = new AnalysisContext();
  TypeEnvironment lookupTableEnvironment = lookupTableContext.peek();
  for (java.util.Map.Entry<String, ExprType> fieldType :
      lookupTable.getFieldTypes().entrySet()) {
    lookupTableEnvironment.define(
        new Symbol(Namespace.FIELD_NAME, fieldType.getKey()), fieldType.getValue());
  }

  // Match fields are analyzed before copy fields, as both may update the query environment.
  ImmutableMap<ReferenceExpression, ReferenceExpression> matchFieldMap =
      analyzeLookupMatchFields(node.getMatchFieldList(), queryContext, lookupTableContext);
  ImmutableMap<ReferenceExpression, ReferenceExpression> copyFieldMap =
      analyzeLookupCopyFields(node.getCopyFieldList(), queryContext, lookupTable, appendOnly);

  return new LogicalLookup(input, lookupIndex, matchFieldMap, appendOnly, copyFieldMap);
}

/**
 * Analyze the match field mappings of a lookup.
 *
 * <p>For each mapping the origin is analyzed against the lookup table context and the target
 * against the query context. The origin reference is then removed from the current query
 * environment and the target reference is defined in it.
 *
 * @param inputMap match field mappings from the AST
 * @param queryContext analysis context of the query
 * @param lookupTableContext analysis context holding the lookup table's fields
 * @return map from lookup-table reference to query reference
 * @throws SemanticCheckException if a mapping's target is not a {@link Field}
 */
private ImmutableMap<ReferenceExpression, ReferenceExpression> analyzeLookupMatchFields(
    List<Map> inputMap, AnalysisContext queryContext, AnalysisContext lookupTableContext) {
  ImmutableMap.Builder<ReferenceExpression, ReferenceExpression> matchFields =
      ImmutableMap.builder();

  for (Map matchField : inputMap) {
    // The origin is analyzed first, so an unresolved origin is reported before a bad target.
    Expression lookupSide = expressionAnalyzer.analyze(matchField.getOrigin(), lookupTableContext);
    if (!(matchField.getTarget() instanceof Field)) {
      throw new SemanticCheckException(
          String.format("the target expected to be field, but is %s", matchField.getTarget()));
    }

    Expression querySide = expressionAnalyzer.analyze(matchField.getTarget(), queryContext);
    ReferenceExpression queryReference = DSL.ref(querySide.toString(), querySide.type());
    ReferenceExpression lookupReference = DSL.ref(lookupSide.toString(), lookupSide.type());

    TypeEnvironment queryEnvironment = queryContext.peek();
    queryEnvironment.remove(lookupReference);
    queryEnvironment.define(queryReference);
    matchFields.put(lookupReference, queryReference);
  }

  return matchFields.build();
}

/**
 * Analyze the copy field mappings of a lookup.
 *
 * <p>Each origin must name a field of the lookup table; the target reference takes the
 * origin's type. A target is defined in the current query environment only when
 * {@code shouldAppendField} allows it.
 *
 * @param inputMap copy field mappings from the AST
 * @param context analysis context of the query
 * @param table the lookup table, used for its field types
 * @param appendOnly the lookup's append-only flag
 * @return map from lookup-table reference to query reference; empty if there are no mappings
 * @throws SemanticCheckException if an origin or target is not a {@link Field}, or the origin
 *     is not a field of the lookup table
 */
private ImmutableMap<ReferenceExpression, ReferenceExpression> analyzeLookupCopyFields(
    List<Map> inputMap, AnalysisContext context, Table table, Boolean appendOnly) {
  if (inputMap.isEmpty()) {
    return ImmutableMap.of();
  }

  TypeEnvironment queryEnvironment = context.peek();
  // Obtained once, before the loop starts defining new fields in the environment.
  Set<String> queryTableFieldNames =
      queryEnvironment.lookupAllFields(Namespace.FIELD_NAME).keySet();
  java.util.Map<String, ExprType> lookupFieldTypes = table.getFieldTypes();

  ImmutableMap.Builder<ReferenceExpression, ReferenceExpression> copyFields =
      ImmutableMap.builder();
  for (Map copyField : inputMap) {
    if (!(copyField.getOrigin() instanceof Field) || !(copyField.getTarget() instanceof Field)) {
      throw new SemanticCheckException(
          String.format(
              "the origin and target expected to be field, but is %s/%s",
              copyField.getOrigin(), copyField.getTarget()));
    }

    String lookupFieldName = ((Field) copyField.getOrigin()).getField().toString();
    String queryFieldName = ((Field) copyField.getTarget()).getField().toString();

    ExprType lookupFieldType = lookupFieldTypes.get(lookupFieldName);
    if (lookupFieldType == null) {
      throw new SemanticCheckException(String.format("no such field %s", lookupFieldName));
    }

    ReferenceExpression origin = new ReferenceExpression(lookupFieldName, lookupFieldType);
    ReferenceExpression target = new ReferenceExpression(queryFieldName, lookupFieldType);

    if (shouldAppendField(appendOnly, queryFieldName, queryTableFieldNames)) {
      ReferenceExpression toDefine = target.equals(origin) ? origin : target;
      queryEnvironment.define(toDefine);
    }
    copyFields.put(origin, target);
  }
  return copyFields.build();
}

/**
 * Decide whether a copy-field target should be defined in the query environment.
 *
 * @param appendOnly the lookup's append-only flag
 * @param fieldName name of the target field in the query
 * @param queryTableFieldNames field names already known to the query
 * @return {@code false} only when {@code appendOnly} is set and the field already exists
 */
private static boolean shouldAppendField(
    Boolean appendOnly, String fieldName, Set<String> queryTableFieldNames) {
  boolean skipExisting = appendOnly && queryTableFieldNames.contains(fieldName);
  return !skipExisting;
}

/** Logical head is identical to {@link LogicalLimit}. */
public LogicalPlan visitHead(Head node, AnalysisContext context) {
LogicalPlan child = node.getChild().get(0).accept(this, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
Expand Down Expand Up @@ -217,6 +218,10 @@ public T visitDedupe(Dedupe node, C context) {
return visitChildren(node, context);
}

/**
 * Visit a {@link Lookup} node. This default implementation delegates to {@code visitChildren}.
 *
 * @param node lookup AST node
 * @param context visitor context
 * @return the result of {@code visitChildren(node, context)}
 */
public T visitLookup(Lookup node, C context) {
return visitChildren(node, context);
}

/**
 * Visit a {@link Head} node. This default implementation delegates to {@code visitChildren}.
 *
 * @param node head AST node
 * @param context visitor context
 * @return the result of {@code visitChildren(node, context)}
 */
public T visitHead(Head node, C context) {
return visitChildren(node, context);
}
Expand Down
21 changes: 21 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.ast.dsl;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -49,6 +50,7 @@
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
Expand Down Expand Up @@ -441,6 +443,25 @@ public static Dedupe dedupe(UnresolvedPlan input, List<Argument> options, Field.
return new Dedupe(input, options, Arrays.asList(fields));
}

/**
 * Create a {@link Lookup} AST node. The arguments are passed to the {@link Lookup} constructor
 * unchanged.
 *
 * @param input child plan of the lookup
 * @param indexName name of the lookup index
 * @param matchFieldList match field mappings
 * @param options lookup options
 * @param copyFieldList copy field mappings
 * @return the new {@link Lookup} node
 */
public static Lookup lookup(
UnresolvedPlan input,
String indexName,
List<Map> matchFieldList,
List<Argument> options,
List<Map> copyFieldList) {
return new Lookup(input, indexName, matchFieldList, options, copyFieldList);
}

/**
 * Build a list of {@link Map} nodes from field / alias name pairs.
 *
 * <p>The first pair is given by {@code field} and {@code asField}; any further pairs are read
 * two at a time from {@code more}.
 *
 * @param field field name of the first pair
 * @param asField alias name of the first pair
 * @param more additional names, taken as consecutive (field, asField) pairs; may be {@code null}
 *     or empty
 * @return a mutable list with one {@link Map} per pair, in argument order
 * @throws IllegalArgumentException if {@code more} holds an odd number of names
 */
public static List<Map> fieldMap(String field, String asField, String... more) {
  // A null varargs array is treated like an empty one rather than failing on more.length.
  int extraNames = more == null ? 0 : more.length;
  if (extraNames % 2 != 0) {
    throw new IllegalArgumentException(
        "fieldMap expects (field, asField) pairs, but got an odd number of extra names: "
            + extraNames);
  }

  List<Map> pairs = new ArrayList<>(1 + extraNames / 2);
  pairs.add(map(field, asField));
  for (int i = 0; i < extraNames; i += 2) {
    pairs.add(map(more[i], more[i + 1]));
  }
  return pairs;
}

public static Head head(UnresolvedPlan input, Integer size, Integer from) {
return new Head(input, size, from);
}
Expand Down
49 changes: 49 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/Lookup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.Map;

/**
 * AST node that represents a Lookup operation.
 *
 * <p>Holds the lookup index name, the match-field and copy-field mappings, and the command
 * options. The child plan is the only non-final field, so it can be supplied later through
 * {@link #attach(UnresolvedPlan)}.
 */
@Getter
@Setter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
@AllArgsConstructor
public class Lookup extends UnresolvedPlan {
// Input plan of the lookup; assigned by the all-args constructor or by attach().
private UnresolvedPlan child;
// Name of the index to look up.
private final String indexName;
// Mappings used to match rows (origin -> target).
private final List<Map> matchFieldList;
// Command options.
private final List<Argument> options;
// Mappings of fields to copy (origin -> target).
private final List<Map> copyFieldList;

/**
 * Set the child plan of this node.
 *
 * @param child the plan to use as input
 * @return this node
 */
@Override
public Lookup attach(UnresolvedPlan child) {
this.child = child;
return this;
}

/**
 * Return the child plan wrapped in an immutable single-element list.
 *
 * <p>NOTE(review): {@code ImmutableList.of} presumably rejects a null element, so calling this
 * before a child has been attached would fail; confirm against callers.
 */
@Override
public List<UnresolvedPlan> getChild() {
return ImmutableList.of(this.child);
}

/** Dispatch to {@link AbstractNodeVisitor#visitLookup}. */
@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitLookup(this, context);
}
}
15 changes: 15 additions & 0 deletions core/src/main/java/org/opensearch/sql/executor/Explain.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.sql.planner.physical.EvalOperator;
import org.opensearch.sql.planner.physical.FilterOperator;
import org.opensearch.sql.planner.physical.LimitOperator;
import org.opensearch.sql.planner.physical.LookupOperator;
import org.opensearch.sql.planner.physical.NestedOperator;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor;
Expand Down Expand Up @@ -157,6 +158,20 @@ public ExplainResponseNode visitDedupe(DedupeOperator node, Object context) {
"consecutive", node.getConsecutive())));
}

/**
 * Explain a {@link LookupOperator}.
 *
 * <p>The description holds four entries: {@code copyfields} and {@code matchfields} (the string
 * form of the operator's field maps), {@code indexname}, and {@code appendonly}.
 */
@Override
public ExplainResponseNode visitLookup(LookupOperator node, Object context) {
return explain(
node,
context,
explainNode ->
explainNode.setDescription(
ImmutableMap.of(
"copyfields", node.getCopyFieldMap().toString(),
"matchfields", node.getMatchFieldMap().toString(),
"indexname", node.getIndexName(),
"appendonly", node.getAppendOnly())));
}

@Override
public ExplainResponseNode visitRareTopN(RareTopNOperator node, Object context) {
return explain(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalLookup;
import org.opensearch.sql.planner.logical.LogicalNested;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
Expand All @@ -31,6 +32,7 @@
import org.opensearch.sql.planner.physical.EvalOperator;
import org.opensearch.sql.planner.physical.FilterOperator;
import org.opensearch.sql.planner.physical.LimitOperator;
import org.opensearch.sql.planner.physical.LookupOperator;
import org.opensearch.sql.planner.physical.NestedOperator;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.ProjectOperator;
Expand Down Expand Up @@ -74,6 +76,19 @@ public PhysicalPlan visitDedupe(LogicalDedupe node, C context) {
node.getConsecutive());
}

/**
 * Translate a {@link LogicalLookup} into a {@link LookupOperator}.
 *
 * <p>The index name, field maps and append-only flag are copied from the logical node. The
 * final argument is a callback that always throws {@link UnsupportedOperationException}.
 * NOTE(review): presumably storage-specific implementors override this method to supply a
 * working lookup callback; confirm.
 */
@Override
public PhysicalPlan visitLookup(LogicalLookup node, C context) {
return new LookupOperator(
visitChild(node, context),
node.getIndexName(),
node.getMatchFieldMap(),
node.getAppendOnly(),
node.getCopyFieldMap(),
// Placeholder callback: the default implementor has no way to perform the lookup itself.
(a, b) -> {
throw new UnsupportedOperationException("Lookup not implemented by DefaultImplementor");
});
}

@Override
public PhysicalPlan visitProject(LogicalProject node, C context) {
return new ProjectOperator(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.logical;

import java.util.Arrays;
import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.expression.ReferenceExpression;

/**
 * Logical Lookup Plan.
 *
 * <p>Holds the lookup index name, the match and copy field maps, and the append-only flag, on
 * top of a single child plan.
 */
@Getter
@ToString
@EqualsAndHashCode(callSuper = true)
public class LogicalLookup extends LogicalPlan {

// Name of the index to look up.
private final String indexName;
// Reference pairs used to match rows between the lookup index and the query.
private final Map<ReferenceExpression, ReferenceExpression> matchFieldMap;
// Reference pairs describing which fields to copy.
private final Map<ReferenceExpression, ReferenceExpression> copyFieldMap;
// Append-only flag of the lookup; NOTE(review): runtime semantics are not visible here.
private final Boolean appendOnly;

/**
 * Constructor of LogicalLookup.
 *
 * <p>Note the parameter order: {@code appendOnly} comes between the two field maps.
 *
 * @param child the single child plan
 * @param indexName name of the lookup index
 * @param matchFieldMap match field references
 * @param appendOnly append-only flag
 * @param copyFieldMap copy field references
 */
public LogicalLookup(
LogicalPlan child,
String indexName,
Map<ReferenceExpression, ReferenceExpression> matchFieldMap,
Boolean appendOnly,
Map<ReferenceExpression, ReferenceExpression> copyFieldMap) {
super(Arrays.asList(child));
this.indexName = indexName;
this.copyFieldMap = copyFieldMap;
this.matchFieldMap = matchFieldMap;
this.appendOnly = appendOnly;
}

/** Dispatch to {@link LogicalPlanNodeVisitor#visitLookup}. */
@Override
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitLookup(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,13 @@ public LogicalPlan values(List<LiteralExpression>... values) {
/**
 * Create a {@link LogicalLimit} on top of the given plan.
 *
 * @param input child plan
 * @param limit limit value passed to {@link LogicalLimit}
 * @param offset offset value passed to {@link LogicalLimit}
 * @return the new logical limit plan
 */
public static LogicalPlan limit(LogicalPlan input, Integer limit, Integer offset) {
return new LogicalLimit(input, limit, offset);
}

/**
 * Create a {@link LogicalLookup} on top of the given plan. The arguments are passed to the
 * {@link LogicalLookup} constructor in the same order.
 *
 * @param input child plan
 * @param indexName name of the lookup index
 * @param matchFieldMap match field references
 * @param appendOnly append-only flag
 * @param copyFields copy field references
 * @return the new logical lookup plan
 */
public static LogicalPlan lookup(
LogicalPlan input,
String indexName,
Map<ReferenceExpression, ReferenceExpression> matchFieldMap,
Boolean appendOnly,
Map<ReferenceExpression, ReferenceExpression> copyFields) {
return new LogicalLookup(input, indexName, matchFieldMap, appendOnly, copyFields);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public R visitDedupe(LogicalDedupe plan, C context) {
return visitNode(plan, context);
}

/**
 * Visit a {@link LogicalLookup} plan. This default implementation delegates to
 * {@code visitNode}.
 *
 * @param plan logical lookup plan
 * @param context visitor context
 * @return the result of {@code visitNode(plan, context)}
 */
public R visitLookup(LogicalLookup plan, C context) {
return visitNode(plan, context);
}

/**
 * Visit a {@link LogicalRename} plan. This default implementation delegates to
 * {@code visitNode}.
 *
 * @param plan logical rename plan
 * @param context visitor context
 * @return the result of {@code visitNode(plan, context)}
 */
public R visitRename(LogicalRename plan, C context) {
return visitNode(plan, context);
}
Expand Down
Loading