diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index d5e8b93b13..09622459b6 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -50,6 +51,7 @@ import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Kmeans; import org.opensearch.sql.ast.tree.Limit; +import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.Parse; @@ -66,6 +68,7 @@ import org.opensearch.sql.common.antlr.SyntaxCheckException; import org.opensearch.sql.data.model.ExprMissingValue; import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.expression.DSL; @@ -89,6 +92,7 @@ import org.opensearch.sql.planner.logical.LogicalFetchCursor; import org.opensearch.sql.planner.logical.LogicalFilter; import org.opensearch.sql.planner.logical.LogicalLimit; +import org.opensearch.sql.planner.logical.LogicalLookup; import org.opensearch.sql.planner.logical.LogicalML; import org.opensearch.sql.planner.logical.LogicalMLCommons; import org.opensearch.sql.planner.logical.LogicalPaginate; @@ -507,6 +511,115 @@ public LogicalPlan visitDedupe(Dedupe node, AnalysisContext context) { consecutive); } + /** Build {@link LogicalLookup}. 
*/ + @Override + public LogicalPlan visitLookup(Lookup node, AnalysisContext queryContext) { + LogicalPlan child = node.getChild().get(0).accept(this, queryContext); + List options = node.getOptions(); + // Todo, refactor the option. + Boolean appendOnly = (Boolean) options.get(0).getValue().getValue(); + + Table table = + dataSourceService + .getDataSource(DEFAULT_DATASOURCE_NAME) + .getStorageEngine() + .getTable(null, node.getIndexName()); + + if (table == null || !table.exists()) { + throw new SemanticCheckException( + String.format("no such lookup index %s", node.getIndexName())); + } + + AnalysisContext lookupTableContext = new AnalysisContext(); + TypeEnvironment lookupTableEnvironment = lookupTableContext.peek(); + table + .getFieldTypes() + .forEach( + (name, type) -> + lookupTableEnvironment.define(new Symbol(Namespace.FIELD_NAME, name), type)); + ImmutableMap matchFieldMap = + analyzeLookupMatchFields(node.getMatchFieldList(), queryContext, lookupTableContext); + + return new LogicalLookup( + child, + node.getIndexName(), + matchFieldMap, + appendOnly, + analyzeLookupCopyFields(node.getCopyFieldList(), queryContext, table, appendOnly)); + } + + private ImmutableMap analyzeLookupMatchFields( + List inputMap, AnalysisContext queryContext, AnalysisContext lookupTableContext) { + ImmutableMap.Builder copyMapBuilder = + new ImmutableMap.Builder<>(); + for (Map resultMap : inputMap) { + Expression origin = expressionAnalyzer.analyze(resultMap.getOrigin(), lookupTableContext); + if (resultMap.getTarget() instanceof Field) { + Expression targerExpression = + expressionAnalyzer.analyze(resultMap.getTarget(), queryContext); + ReferenceExpression targetReference = + DSL.ref(targerExpression.toString(), targerExpression.type()); + ReferenceExpression originReference = DSL.ref(origin.toString(), origin.type()); + TypeEnvironment curEnv = queryContext.peek(); + curEnv.remove(originReference); + curEnv.define(targetReference); + copyMapBuilder.put(originReference, 
targetReference); + } else { + throw new SemanticCheckException( + String.format("the target expected to be field, but is %s", resultMap.getTarget())); + } + } + + return copyMapBuilder.build(); + } + + private ImmutableMap analyzeLookupCopyFields( + List inputMap, AnalysisContext context, Table table, Boolean appendOnly) { + + if (inputMap.isEmpty()) { + return ImmutableMap.builder().build(); + } + + TypeEnvironment curEnv = context.peek(); + Set queryTableFieldNames = curEnv.lookupAllFields(Namespace.FIELD_NAME).keySet(); + java.util.Map fieldTypes = table.getFieldTypes(); + + ImmutableMap.Builder copyMapBuilder = + new ImmutableMap.Builder<>(); + for (Map resultMap : inputMap) { + if (!(resultMap.getOrigin() instanceof Field && resultMap.getTarget() instanceof Field)) { + throw new SemanticCheckException( + String.format( + "the origin and target expected to be field, but is %s/%s", + resultMap.getOrigin(), resultMap.getTarget())); + } + + String originFieldNameInLookupTable = ((Field) resultMap.getOrigin()).getField().toString(); + String targetFieldNameInQueryTable = ((Field) resultMap.getTarget()).getField().toString(); + ExprType ex = fieldTypes.get(originFieldNameInLookupTable); + + if (ex == null) { + throw new SemanticCheckException( + String.format("no such field %s", originFieldNameInLookupTable)); + } + + ReferenceExpression origin = new ReferenceExpression(originFieldNameInLookupTable, ex); + ReferenceExpression target = + new ReferenceExpression(((Field) resultMap.getTarget()).getField().toString(), ex); + + if (shouldAppendField(appendOnly, targetFieldNameInQueryTable, queryTableFieldNames)) { + curEnv.define(target.equals(origin) ? 
origin : target); + } + copyMapBuilder.put(origin, target); + } + return copyMapBuilder.build(); + } + + private static boolean shouldAppendField( + Boolean appendOnly, String k, Set queryTableFieldNames) { + return !appendOnly || !queryTableFieldNames.contains(k); + } + /** Logical head is identical to {@link LogicalLimit}. */ public LogicalPlan visitHead(Head node, AnalysisContext context) { LogicalPlan child = node.getChild().get(0).accept(this, context); diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index 973b10310b..16ebf45854 100644 --- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -49,6 +49,7 @@ import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Kmeans; import org.opensearch.sql.ast.tree.Limit; +import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.Parse; @@ -217,6 +218,10 @@ public T visitDedupe(Dedupe node, C context) { return visitChildren(node, context); } + public T visitLookup(Lookup node, C context) { + return visitChildren(node, context); + } + public T visitHead(Head node, C context) { return visitChildren(node, context); } diff --git a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java index 4f3056b0f7..9b435a627d 100644 --- a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java +++ b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java @@ -5,6 +5,7 @@ package org.opensearch.sql.ast.dsl; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -49,6 +50,7 @@ import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Limit; 
+import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Project; import org.opensearch.sql.ast.tree.RareTopN; @@ -441,6 +443,25 @@ public static Dedupe dedupe(UnresolvedPlan input, List options, Field. return new Dedupe(input, options, Arrays.asList(fields)); } + public static Lookup lookup( + UnresolvedPlan input, + String indexName, + List matchFieldList, + List options, + List copyFieldList) { + return new Lookup(input, indexName, matchFieldList, options, copyFieldList); + } + + public static List fieldMap(String field, String asField, String... more) { + assert more == null || more.length % 2 == 0; + List list = new ArrayList(); + list.add(map(field, asField)); + for (int i = 0; i < more.length; i = i + 2) { + list.add(map(more[i], more[i + 1])); + } + return list; + } + public static Head head(UnresolvedPlan input, Integer size, Integer from) { return new Head(input, size, from); } diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/Lookup.java b/core/src/main/java/org/opensearch/sql/ast/tree/Lookup.java new file mode 100644 index 0000000000..d9b22cbd26 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/tree/Lookup.java @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.expression.Argument; +import org.opensearch.sql.ast.expression.Map; + +/** AST node represent Lookup operation. 
*/ +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = false) +@RequiredArgsConstructor +@AllArgsConstructor +public class Lookup extends UnresolvedPlan { + private UnresolvedPlan child; + private final String indexName; + private final List matchFieldList; + private final List options; + private final List copyFieldList; + + @Override + public Lookup attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + return ImmutableList.of(this.child); + } + + @Override + public T accept(AbstractNodeVisitor nodeVisitor, C context) { + return nodeVisitor.visitLookup(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/Explain.java b/core/src/main/java/org/opensearch/sql/executor/Explain.java index 0f05b99383..bec6c5e47d 100644 --- a/core/src/main/java/org/opensearch/sql/executor/Explain.java +++ b/core/src/main/java/org/opensearch/sql/executor/Explain.java @@ -22,6 +22,7 @@ import org.opensearch.sql.planner.physical.EvalOperator; import org.opensearch.sql.planner.physical.FilterOperator; import org.opensearch.sql.planner.physical.LimitOperator; +import org.opensearch.sql.planner.physical.LookupOperator; import org.opensearch.sql.planner.physical.NestedOperator; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor; @@ -157,6 +158,20 @@ public ExplainResponseNode visitDedupe(DedupeOperator node, Object context) { "consecutive", node.getConsecutive()))); } + @Override + public ExplainResponseNode visitLookup(LookupOperator node, Object context) { + return explain( + node, + context, + explainNode -> + explainNode.setDescription( + ImmutableMap.of( + "copyfields", node.getCopyFieldMap().toString(), + "matchfields", node.getMatchFieldMap().toString(), + "indexname", node.getIndexName(), + "appendonly", node.getAppendOnly()))); + } + @Override public ExplainResponseNode visitRareTopN(RareTopNOperator node, Object 
context) { return explain( diff --git a/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java b/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java index b53d17b38f..6d19bb312f 100644 --- a/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java +++ b/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java @@ -13,6 +13,7 @@ import org.opensearch.sql.planner.logical.LogicalFetchCursor; import org.opensearch.sql.planner.logical.LogicalFilter; import org.opensearch.sql.planner.logical.LogicalLimit; +import org.opensearch.sql.planner.logical.LogicalLookup; import org.opensearch.sql.planner.logical.LogicalNested; import org.opensearch.sql.planner.logical.LogicalPaginate; import org.opensearch.sql.planner.logical.LogicalPlan; @@ -31,6 +32,7 @@ import org.opensearch.sql.planner.physical.EvalOperator; import org.opensearch.sql.planner.physical.FilterOperator; import org.opensearch.sql.planner.physical.LimitOperator; +import org.opensearch.sql.planner.physical.LookupOperator; import org.opensearch.sql.planner.physical.NestedOperator; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.planner.physical.ProjectOperator; @@ -74,6 +76,19 @@ public PhysicalPlan visitDedupe(LogicalDedupe node, C context) { node.getConsecutive()); } + @Override + public PhysicalPlan visitLookup(LogicalLookup node, C context) { + return new LookupOperator( + visitChild(node, context), + node.getIndexName(), + node.getMatchFieldMap(), + node.getAppendOnly(), + node.getCopyFieldMap(), + (a, b) -> { + throw new UnsupportedOperationException("Lookup not implemented by DefaultImplementor"); + }); + } + @Override public PhysicalPlan visitProject(LogicalProject node, C context) { return new ProjectOperator( diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalLookup.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalLookup.java new file mode 100644 index 0000000000..3079b69941 
--- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalLookup.java @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.logical; + +import java.util.Arrays; +import java.util.Map; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import org.opensearch.sql.expression.ReferenceExpression; + +/** Logical Lookup Plan. */ +@Getter +@ToString +@EqualsAndHashCode(callSuper = true) +public class LogicalLookup extends LogicalPlan { + + private final String indexName; + private final Map matchFieldMap; + private final Map copyFieldMap; + private final Boolean appendOnly; + + /** Constructor of LogicalLookup. */ + public LogicalLookup( + LogicalPlan child, + String indexName, + Map matchFieldMap, + Boolean appendOnly, + Map copyFieldMap) { + super(Arrays.asList(child)); + this.indexName = indexName; + this.copyFieldMap = copyFieldMap; + this.matchFieldMap = matchFieldMap; + this.appendOnly = appendOnly; + } + + @Override + public R accept(LogicalPlanNodeVisitor visitor, C context) { + return visitor.visitLookup(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java index 2a886ba0ca..f2dfdfe661 100644 --- a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java @@ -138,4 +138,13 @@ public LogicalPlan values(List... 
values) { public static LogicalPlan limit(LogicalPlan input, Integer limit, Integer offset) { return new LogicalLimit(input, limit, offset); } + + public static LogicalPlan lookup( + LogicalPlan input, + String indexName, + Map matchFieldMap, + Boolean appendOnly, + Map copyFields) { + return new LogicalLookup(input, indexName, matchFieldMap, appendOnly, copyFields); + } } diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java index 156db35306..c05f8c91f6 100644 --- a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java @@ -52,6 +52,10 @@ public R visitDedupe(LogicalDedupe plan, C context) { return visitNode(plan, context); } + public R visitLookup(LogicalLookup plan, C context) { + return visitNode(plan, context); + } + public R visitRename(LogicalRename plan, C context) { return visitNode(plan, context); } diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/LookupOperator.java b/core/src/main/java/org/opensearch/sql/planner/physical/LookupOperator.java new file mode 100644 index 0000000000..7117d87f5d --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/physical/LookupOperator.java @@ -0,0 +1,131 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.physical; + +import static org.opensearch.sql.data.type.ExprCoreType.STRUCT; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.model.ExprValueUtils; 
+import org.opensearch.sql.expression.ReferenceExpression; + +/** Lookup operator. Perform lookup on another OpenSearch index and enrich the results. */ +@Getter +@ToString +@EqualsAndHashCode(callSuper = false) +public class LookupOperator extends PhysicalPlan { + @Getter private final PhysicalPlan input; + @Getter private final String indexName; + @Getter private final Map matchFieldMap; + @Getter private final Map copyFieldMap; + @Getter private final Boolean appendOnly; + + @EqualsAndHashCode.Exclude + private final BiFunction, Map> lookup; + + /** Lookup Constructor. */ + @NonNull + public LookupOperator( + PhysicalPlan input, + String indexName, + Map matchFieldMap, + Boolean appendOnly, + Map copyFieldMap, + BiFunction, Map> lookup) { + this.input = input; + this.indexName = indexName; + this.matchFieldMap = matchFieldMap; + this.appendOnly = appendOnly; + this.copyFieldMap = copyFieldMap; + this.lookup = lookup; + } + + @Override + public R accept(PhysicalPlanNodeVisitor visitor, C context) { + return visitor.visitLookup(this, context); + } + + @Override + public List getChild() { + return Collections.singletonList(input); + } + + @Override + public boolean hasNext() { + return input.hasNext(); + } + + @Override + public ExprValue next() { + ExprValue inputValue = input.next(); + + if (STRUCT == inputValue.type()) { + Map matchMap = new HashMap<>(); + Map finalMap = new HashMap<>(); + + for (Map.Entry matchField : + matchFieldMap.entrySet()) { + Object val = inputValue.bindingTuples().resolve(matchField.getValue()).value(); + if (val != null) { + matchMap.put(matchField.getKey().toString(), val); + } else { + // No value in search results, so we just return the input + return inputValue; + } + } + + finalMap.put("_match", matchMap); + + Map copyMap = new HashMap<>(); + + if (!copyFieldMap.isEmpty()) { + + for (Map.Entry copyField : + copyFieldMap.entrySet()) { + copyMap.put(String.valueOf(copyField.getKey()), String.valueOf(copyField.getValue())); + } + + 
finalMap.put("_copy", copyMap.keySet()); + } + + Map lookupResult = lookup.apply(indexName, finalMap); + + if (lookupResult == null || lookupResult.isEmpty()) { + // no lookup found or lookup is empty, so we just return the original input value + return inputValue; + } + + Map tupleInputValue = ExprValueUtils.getTupleValue(inputValue); + Map resultTupleBuilder = new HashMap<>(); + resultTupleBuilder.putAll(tupleInputValue); + for (Map.Entry sourceOfAdditionalField : lookupResult.entrySet()) { + String lookedUpFieldName = sourceOfAdditionalField.getKey(); + Object lookedUpFieldValue = sourceOfAdditionalField.getValue(); + String finalFieldName = copyMap.getOrDefault(lookedUpFieldName, lookedUpFieldName); + ExprValue value = ExprValueUtils.fromObjectValue(lookedUpFieldValue); + if (appendOnly) { + resultTupleBuilder.putIfAbsent(finalFieldName, value); + } else { + resultTupleBuilder.put(finalFieldName, value); + } + } + + return ExprTupleValue.fromExprValueMap(resultTupleBuilder); + + } else { + return inputValue; + } + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanDSL.java b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanDSL.java index 147f0e08dc..9649b620ed 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanDSL.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanDSL.java @@ -11,6 +11,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.BiFunction; import lombok.experimental.UtilityClass; import org.apache.commons.lang3.tuple.Pair; import org.opensearch.sql.ast.tree.RareTopN.CommandType; @@ -78,6 +79,17 @@ public static DedupeOperator dedupe( input, Arrays.asList(expressions), allowedDuplication, keepEmpty, consecutive); } + public static LookupOperator lookup( + PhysicalPlan input, + String indexName, + Map matchFieldMap, + Boolean appendOnly, + Map copyFieldMap, + BiFunction, Map> lookupFunction) { + return 
new LookupOperator( + input, indexName, matchFieldMap, appendOnly, copyFieldMap, lookupFunction); + } + public WindowOperator window( PhysicalPlan input, NamedExpression windowFunction, WindowDefinition windowDefinition) { return new WindowOperator(input, windowFunction, windowDefinition); diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java index 99b5cc8020..0a58b018cd 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java @@ -64,6 +64,10 @@ public R visitDedupe(DedupeOperator node, C context) { return visitNode(node, context); } + public R visitLookup(LookupOperator node, C context) { + return visitNode(node, context); + } + public R visitValues(ValuesOperator node, C context) { return visitNode(node, context); } diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java index 8d935b11d2..5070d9fa49 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java @@ -70,6 +70,8 @@ import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.opensearch.sql.ast.dsl.AstDSL; import org.opensearch.sql.ast.expression.Argument; import org.opensearch.sql.ast.expression.DataType; @@ -82,11 +84,14 @@ import org.opensearch.sql.ast.tree.CloseCursor; import org.opensearch.sql.ast.tree.FetchCursor; import org.opensearch.sql.ast.tree.Kmeans; +import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Paginate; import 
org.opensearch.sql.ast.tree.RareTopN.CommandType; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.common.antlr.SyntaxCheckException; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.exception.ExpressionEvaluationException; import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.expression.DSL; @@ -105,10 +110,94 @@ import org.opensearch.sql.planner.logical.LogicalPlanDSL; import org.opensearch.sql.planner.logical.LogicalProject; import org.opensearch.sql.planner.logical.LogicalRelation; +import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.planner.physical.datasource.DataSourceTable; +import org.opensearch.sql.storage.StorageEngine; +import org.opensearch.sql.storage.Table; class AnalyzerTest extends AnalyzerTestBase { + private static final String LOOKUP_TABLE_DEVICE_NAMES = "device_names"; + public static final String NO_SUCH_TABLE = "no_such_table"; + public static final String TABLE_DOES_NOT_EXIST = "does_not_exist"; + + public static Map lookupTableFieldTypes = + new ImmutableMap.Builder() + .put("id", INTEGER) + .put("dev_id", STRING) + .put("serial_number", LONG) + .put("vendor", STRING) + .put("ip_v4", STRING) + .put("firmware_version", LONG) + .put("reliability_factor", DOUBLE) + .put("comment", ExprCoreType.STRING) + .build(); + + private final Table deviceNamesLookupTable; + private Table tableDoesNotExist; + + public AnalyzerTest() { + this.deviceNamesLookupTable = + new Table() { + @Override + public boolean exists() { + return true; + } + + @Override + public void create(Map schema) { + throw new UnsupportedOperationException("Create table is not supported"); + } + + @Override + public Map getFieldTypes() { + return lookupTableFieldTypes; + } + + @Override + public PhysicalPlan implement(LogicalPlan plan) { + throw new UnsupportedOperationException(); + } + + public Map 
getReservedFieldTypes() { + return ImmutableMap.of("_test", STRING); + } + }; + this.tableDoesNotExist = + new Table() { + + @Override + public boolean exists() { + return false; + } + + @Override + public Map getFieldTypes() { + return Map.of(); + } + + @Override + public PhysicalPlan implement(LogicalPlan plan) { + throw new UnsupportedOperationException(); + } + }; + } + + protected StorageEngine storageEngine() { + return (dataSourceSchemaName, tableName) -> { + switch (tableName) { + case NO_SUCH_TABLE: + return null; + case TABLE_DOES_NOT_EXIST: + return tableDoesNotExist; + case LOOKUP_TABLE_DEVICE_NAMES: + return deviceNamesLookupTable; + default: + return table; + } + }; + } + @Test public void filter_relation() { assertAnalyzeEqual( @@ -1767,4 +1856,263 @@ public void visit_close_cursor() { () -> assertEquals("pewpew", ((LogicalFetchCursor) analyzed.getChild().get(0)).getCursor())); } + + @Test + public void visit_lookup_same_field_name_in_query_and_lookup_index() { + assertAnalyzeEqual( + LogicalPlanDSL.lookup( + LogicalPlanDSL.relation("schema", table), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), + new ReferenceExpression("field_value2", STRING)), + false, + Collections.emptyMap()), + AstDSL.lookup( + AstDSL.relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(AstDSL.argument("appendonly", AstDSL.booleanLiteral(false))), + Collections.emptyList())); + } + + @Test + public void visit_lookup_use_multiple_fields() { + assertAnalyzeEqual( + LogicalPlanDSL.lookup( + LogicalPlanDSL.relation("schema", table), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableMap.of( + new ReferenceExpression("comment", STRING), + new ReferenceExpression("comment", STRING), + new ReferenceExpression("dev_id", STRING), + new ReferenceExpression("field_value1", STRING)), + false, + Collections.emptyMap()), + AstDSL.lookup( + AstDSL.relation("schema"), + 
LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of( + AstDSL.map("comment", "comment"), AstDSL.map("dev_id", "field_value1")), + ImmutableList.of(AstDSL.argument("appendonly", AstDSL.booleanLiteral(false))), + Collections.emptyList())); + } + + @Test + public void visit_lookup_various_field_name_in_query_and_lookup_index() { + assertAnalyzeEqual( + LogicalPlanDSL.lookup( + LogicalPlanDSL.relation("schema", table), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableMap.of( + new ReferenceExpression("dev_id", STRING), + new ReferenceExpression("comment", STRING)), + true, + Collections.emptyMap()), + AstDSL.lookup( + AstDSL.relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("dev_id", "comment")), + ImmutableList.of(AstDSL.argument("appendonly", AstDSL.booleanLiteral(true))), + Collections.emptyList())); + } + + @ParameterizedTest + @ValueSource(strings = {NO_SUCH_TABLE, TABLE_DOES_NOT_EXIST}) + public void visit_lookup_should_report_error_when_lookup_table_does_not_exist(String tableName) { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + tableName, + ImmutableList.of(AstDSL.map("string_value", "comment")), + ImmutableList.of(argument("appendonly", booleanLiteral(true))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_should_report_error_when_join_field_does_not_exist() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("no_such_field", "comment")), + ImmutableList.of(argument("appendonly", booleanLiteral(true))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_should_report_error_when_join_field_does_not_exist_in_lookup_table() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("no_such_field", "comment")), + ImmutableList.of(argument("appendonly", 
booleanLiteral(true))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_should_report_error_when_join_field_does_not_exist_in_query_table() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "no_such_field")), + ImmutableList.of(argument("appendonly", booleanLiteral(true))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void + visit_lookup_should_error_when_join_field_does_not_exist_in_query_table_but_exist_lookup_table() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "dev_id")), + ImmutableList.of(argument("appendonly", booleanLiteral(true))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void + visit_lookup_should_error_when_join_field_does_not_exist_in_lookup_table_but_exist_query_table() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("string_value", "field_value2")), + ImmutableList.of(argument("appendonly", booleanLiteral(true))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_should_report_error_when_target_expression_point_out_function() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map(field("ip_v4"), intLiteral(7))), + ImmutableList.of(argument("appendonly", booleanLiteral(false))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_copy_field() { + assertAnalyzeEqual( + LogicalPlanDSL.lookup( + LogicalPlanDSL.relation("schema", table), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableMap.of( + new 
ReferenceExpression("ip_v4", STRING), + new ReferenceExpression("field_value2", STRING)), + false, + ImmutableMap.of( + new ReferenceExpression("vendor", STRING), + new ReferenceExpression("maker", STRING))), + AstDSL.lookup( + AstDSL.relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(AstDSL.argument("appendonly", AstDSL.booleanLiteral(false))), + ImmutableList.of(AstDSL.map("vendor", "maker")))); + } + + @Test + public void visit_lookup_copy_multiple_fields() { + assertAnalyzeEqual( + LogicalPlanDSL.lookup( + LogicalPlanDSL.relation("schema", table), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), + new ReferenceExpression("field_value2", STRING)), + false, + ImmutableMap.of( + new ReferenceExpression("vendor", STRING), + new ReferenceExpression("maker", STRING), + new ReferenceExpression("serial_number", LONG), + new ReferenceExpression("serial", LONG))), + AstDSL.lookup( + AstDSL.relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(AstDSL.argument("appendonly", AstDSL.booleanLiteral(false))), + ImmutableList.of( + AstDSL.map("vendor", "maker"), AstDSL.map("serial_number", "serial")))); + } + + @Test + public void visit_lookup_copy_field_when_origin_and_target_is_the_same() { + assertAnalyzeEqual( + LogicalPlanDSL.lookup( + LogicalPlanDSL.relation("schema", table), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), + new ReferenceExpression("field_value2", STRING)), + false, + ImmutableMap.of( + new ReferenceExpression("vendor", STRING), + new ReferenceExpression("vendor", STRING))), + AstDSL.lookup( + AstDSL.relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(AstDSL.argument("appendonly", AstDSL.booleanLiteral(false))), + ImmutableList.of(AstDSL.map("vendor", 
"vendor")))); + } + + @Test + public void visit_lookup_should_report_error_when_copy_field_does_not_exist() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(argument("appendonly", booleanLiteral(false))), + ImmutableList.of(AstDSL.map("no_such_field", "maker"))); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_should_report_error_when_copy_target_is_not_a_field() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(argument("appendonly", booleanLiteral(false))), + ImmutableList.of(AstDSL.map(AstDSL.field("vendor"), AstDSL.intLiteral(8)))); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_should_report_error_when_copy_source_is_not_a_field() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(argument("appendonly", booleanLiteral(false))), + ImmutableList.of(AstDSL.map(AstDSL.booleanLiteral(true), AstDSL.field("maker")))); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } } diff --git a/core/src/test/java/org/opensearch/sql/executor/ExplainTest.java b/core/src/test/java/org/opensearch/sql/executor/ExplainTest.java index 897347f22d..7c59a27f23 100644 --- a/core/src/test/java/org/opensearch/sql/executor/ExplainTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/ExplainTest.java @@ -21,6 +21,7 @@ import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.eval; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.filter; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.limit; +import static 
org.opensearch.sql.planner.physical.PhysicalPlanDSL.lookup; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.nested; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.project; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.rareTopN; @@ -235,6 +236,33 @@ void can_explain_nested() { explain.apply(plan)); } + @Test + void can_explain_lookup() { + PhysicalPlan plan = + lookup( + tableScan, + "lookup_index_name", + Map.of(ref("lookup_index_field", STRING), ref("query_index_field", STRING)), + true, + Map.of(ref("lookup_index_field_name", STRING), ref("renamed_field", STRING)), + null); + assertEquals( + new ExplainResponse( + new ExplainResponseNode( + "LookupOperator", + Map.of( + "copyfields", + "{lookup_index_field_name=renamed_field}", + "matchfields", + "{lookup_index_field=query_index_field}", + "indexname", + "lookup_index_name", + "appendonly", + true), + singletonList(tableScan.explainNode()))), + explain.apply(plan)); + } + private static class FakeTableScan extends TableScanOperator { @Override public boolean hasNext() { diff --git a/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java b/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java index 45d8f6c03c..930eb63a03 100644 --- a/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java @@ -21,6 +21,7 @@ import static org.opensearch.sql.planner.logical.LogicalPlanDSL.eval; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.filter; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.limit; +import static org.opensearch.sql.planner.logical.LogicalPlanDSL.lookup; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.nested; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.project; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.rareTopN; @@ -47,6 +48,7 @@ 
import org.opensearch.sql.ast.tree.RareTopN.CommandType; import org.opensearch.sql.ast.tree.Sort; import org.opensearch.sql.data.model.ExprBooleanValue; +import org.opensearch.sql.data.model.ExprValueUtils; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.executor.pagination.PlanSerializer; import org.opensearch.sql.expression.DSL; @@ -58,6 +60,7 @@ import org.opensearch.sql.expression.window.WindowDefinition; import org.opensearch.sql.expression.window.ranking.RowNumberFunction; import org.opensearch.sql.planner.logical.LogicalCloseCursor; +import org.opensearch.sql.planner.logical.LogicalLookup; import org.opensearch.sql.planner.logical.LogicalPaginate; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.logical.LogicalPlanDSL; @@ -122,22 +125,27 @@ public void visit_should_return_default_physical_operator() { nested( limit( LogicalPlanDSL.dedupe( - rareTopN( - sort( - eval( - remove( - rename( - aggregation( - filter(values(emptyList()), filterExpr), - aggregators, - groupByExprs), - mappings), - exclude), - newEvalField), - sortField), - CommandType.TOP, - topByExprs, - rareTopNField), + lookup( + rareTopN( + sort( + eval( + remove( + rename( + aggregation( + filter(values(emptyList()), filterExpr), + aggregators, + groupByExprs), + mappings), + exclude), + newEvalField), + sortField), + CommandType.TOP, + topByExprs, + rareTopNField), + "lookup_index_name", + Map.of(), + false, + Map.of()), dedupeField), limit, offset), @@ -152,24 +160,30 @@ public void visit_should_return_default_physical_operator() { PhysicalPlanDSL.nested( PhysicalPlanDSL.limit( PhysicalPlanDSL.dedupe( - PhysicalPlanDSL.rareTopN( - PhysicalPlanDSL.sort( - PhysicalPlanDSL.eval( - PhysicalPlanDSL.remove( - PhysicalPlanDSL.rename( - PhysicalPlanDSL.agg( - PhysicalPlanDSL.filter( - PhysicalPlanDSL.values(emptyList()), - filterExpr), - aggregators, - groupByExprs), - mappings), - exclude), - newEvalField), - sortField), - 
CommandType.TOP, - topByExprs, - rareTopNField), + PhysicalPlanDSL.lookup( + PhysicalPlanDSL.rareTopN( + PhysicalPlanDSL.sort( + PhysicalPlanDSL.eval( + PhysicalPlanDSL.remove( + PhysicalPlanDSL.rename( + PhysicalPlanDSL.agg( + PhysicalPlanDSL.filter( + PhysicalPlanDSL.values(emptyList()), + filterExpr), + aggregators, + groupByExprs), + mappings), + exclude), + newEvalField), + sortField), + CommandType.TOP, + topByExprs, + rareTopNField), + "lookup_index_name", + Map.of(), + false, + Map.of(), + null), dedupeField), limit, offset), @@ -278,4 +292,37 @@ public void visitPaginate_should_remove_it_from_tree() { new ProjectOperator(new ValuesOperator(List.of(List.of())), List.of(), List.of()); assertEquals(physicalPlanTree, logicalPlanTree.accept(implementor, null)); } + + @Test + public void visitLookup_should_build_LookupOperator() { + LogicalPlan values = values(List.of(DSL.literal("to be or not to be"))); + var logicalPlan = lookup(values, "lookup_index_name", Map.of(), false, Map.of()); + var expectedPhysicalPlan = + PhysicalPlanDSL.lookup( + new ValuesOperator(List.of(List.of(DSL.literal("to be or not to be")))), + "lookup_index_name", + Map.of(), + false, + Map.of(), + null); + + PhysicalPlan lookupOperator = logicalPlan.accept(implementor, null); + + assertEquals(expectedPhysicalPlan, lookupOperator); + } + + @Test + public void visitLookup_should_throw_unsupportedOperationException() { + LogicalLookup input = mock(LogicalLookup.class); + LogicalPlan dataSource = mock(LogicalPlan.class); + PhysicalPlan physicalSource = mock(PhysicalPlan.class); + when(dataSource.accept(implementor, null)).thenReturn(physicalSource); + when(input.getChild()).thenReturn(List.of(dataSource)); + PhysicalPlan lookupOperator = implementor.visitLookup(input, null); + when(physicalSource.next()).thenReturn(ExprValueUtils.tupleValue(Map.of("field", "value"))); + + var ex = assertThrows(UnsupportedOperationException.class, () -> lookupOperator.next()); + + assertEquals("Lookup not 
implemented by DefaultImplementor", ex.getMessage()); + } } diff --git a/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java b/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java index f212749f48..f5719fa5c8 100644 --- a/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java @@ -125,6 +125,7 @@ public TableWriteOperator build(PhysicalPlan child) { LogicalPlan ad = new LogicalAD(relation, Map.of()); LogicalPlan ml = new LogicalML(relation, Map.of()); LogicalPlan paginate = new LogicalPaginate(42, List.of(relation)); + LogicalPlan lookup = new LogicalLookup(relation, "lookup_index", Map.of(), true, Map.of()); List> nestedArgs = List.of( @@ -163,7 +164,8 @@ public TableWriteOperator build(PhysicalPlan child) { paginate, nested, cursor, - closeCursor) + closeCursor, + lookup) .map(Arguments::of); } @@ -214,5 +216,14 @@ public Integer visitRareTopN(LogicalRareTopN plan, Object context) { .mapToInt(Integer::intValue) .sum(); } + + @Override + public Integer visitLookup(LogicalLookup plan, Object context) { + return 1 + + plan.getChild().stream() + .map(child -> child.accept(this, context)) + .mapToInt(Integer::intValue) + .sum(); + } } } diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/LookupOperatorTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/LookupOperatorTest.java new file mode 100644 index 0000000000..2da1811e86 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/planner/physical/LookupOperatorTest.java @@ -0,0 +1,669 @@ +package org.opensearch.sql.planner.physical; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.ArgumentMatchers.anyMap; +import static 
org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; +import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; +import static org.opensearch.sql.utils.MatcherUtils.containsNull; +import static org.opensearch.sql.utils.MatcherUtils.containsValue; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.BiFunction; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.stubbing.Answer; +import org.opensearch.sql.data.model.ExprStringValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.model.ExprValueUtils; +import org.opensearch.sql.expression.ReferenceExpression; + +@ExtendWith(MockitoExtension.class) +public class LookupOperatorTest extends PhysicalPlanTestBase { + + public static final String LOOKUP_INDEX = "lookup_index"; + + public static final ImmutableList> LOOKUP_TABLE = + ImmutableList.of( + ImmutableMap.of( + "id", 1, "ip", "v4", "ip_v4", "112.111.162.4", "region", "USA", "class", "A"), + ImmutableMap.of( + "id", 2, "ip", "4", "ip_v4", "74.125.19.106", "region", "EU", "class", "A")); + + public static final ImmutableList> LOOKUP_TABLE_WITH_NULLS = + ImmutableList.of( + new HashMap<>( + ImmutableMap.of( + "id", 1, "ip", "v4", "ip_v4", "112.111.162.4", "region", "USA", "class", "A")) { + { + put("class", null); + } + }, + ImmutableMap.of( + "id", 2, "ip", "4", "ip_v4", "74.125.19.106", "region", "EU", "class", "A")); + + @Mock private BiFunction, Map> 
lookupFunction; + + @Test + public void lookup_empty_table() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip", Collections.emptyList())); + PhysicalPlan plan = + new LookupOperator( + new TestScan(), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "112.111.162.4", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "74.125.19.106", + "action", + "POST", + "response", + 200, + "referer", + "www.google.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of("ip", "74.125.19.106", "action", "POST", "response", 500)))); + } + + @Test + public void lookup_append_only_true() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + PhysicalPlan plan = + new LookupOperator( + new TestScan(), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + 
"response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + } + + @Test + public void lookup_append_only_false() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + PhysicalPlan plan = + new LookupOperator( + new TestScan(), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + false, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip", "v4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + 
containsValue("ip", "4"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip", "4"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU")))); + } + + @Test + public void lookup_copy_one_field() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + PhysicalPlan plan = + new LookupOperator( + new TestScan(), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of( + new ReferenceExpression("class", STRING), + new ReferenceExpression("ip_address_class", STRING)), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("ip_address_class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("ip_address_class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("ip_address_class", "A")))); + } + + @Test + public void lookup_copy_multiple_fields() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", 
LOOKUP_TABLE)); + PhysicalPlan plan = + new LookupOperator( + new TestScan(), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of( + new ReferenceExpression("class", STRING), + new ReferenceExpression("class", STRING), + new ReferenceExpression("id", INTEGER), + new ReferenceExpression("address_id", INTEGER)), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("class", "A"), + containsValue("address_id", 1)))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("class", "A"), + containsValue("address_id", 2)))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("class", "A"), + containsValue("address_id", 2)))); + } + + @Test + public void lookup_empty_join_field() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + List queryResults = + new ImmutableList.Builder() + .addAll(inputs) + .add( + ExprValueUtils.tupleValue( + ImmutableMap.of("action", "GET", "response", 200, "referer", "www.amazon.com"))) + .build(); + PhysicalPlan plan = + new LookupOperator( + testScan(queryResults), + LOOKUP_INDEX, + 
ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(6)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of("action", "GET", "response", 200, "referer", "www.amazon.com")))); + } + + @Test + public void lookup_ignore_non_struct_input() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + ExprStringValue stringExpression = + new ExprStringValue("Expression of string type should be ignored"); + List queryResults = + new ImmutableList.Builder().addAll(inputs).add(stringExpression).build(); + PhysicalPlan plan = + new LookupOperator( + testScan(queryResults), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + 
ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(6)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + assertThat(result, hasItem(stringExpression)); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void lookup_table_with_nulls(boolean appendOnly) { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE_WITH_NULLS)); + PhysicalPlan plan = + new LookupOperator( + new TestScan(), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + appendOnly, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + 
ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA"), + containsNull("class")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + } + + private static @NotNull Answer> lookupTableQueryResults( + String lookupTableFieldName, List> lookupTableContent) { + return invocationOnMock -> { + String lookupTableName = invocationOnMock.getArgument(0); + if (!LOOKUP_INDEX.equals(lookupTableName)) { + return ImmutableMap.of(); + } + HashMap> parameters = invocationOnMock.getArgument(1); + String valueOfJoinFieldInLookupTable = + (String) parameters.get("_match").get(lookupTableFieldName); + if (Objects.isNull(valueOfJoinFieldInLookupTable)) { + return null; + } + return lookupTableContent.stream() + .filter(map -> valueOfJoinFieldInLookupTable.equals(map.get(lookupTableFieldName))) + .findAny() + .orElse(ImmutableMap.of()); + }; + } +} diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java index c91ae8787c..de91f93b9a 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java @@ -17,6 +17,7 @@ import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.eval; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.filter; import static 
org.opensearch.sql.planner.physical.PhysicalPlanDSL.limit; +import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.lookup; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.project; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.rareTopN; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.remove; @@ -64,7 +65,16 @@ public void print_physical_plan() { agg( rareTopN( filter( - limit(new TestScan(), 1, 1), + limit( + lookup( + new TestScan(), + "lookup_index", + Map.of(), + true, + Map.of(), + null), + 1, + 1), DSL.equal(DSL.ref("response", INTEGER), DSL.literal(10))), CommandType.TOP, ImmutableList.of(), @@ -84,7 +94,8 @@ public void print_physical_plan() { + "\t\t\tAggregation->\n" + "\t\t\t\tRareTopN->\n" + "\t\t\t\t\tFilter->\n" - + "\t\t\t\t\t\tLimit->", + + "\t\t\t\t\t\tLimit->\n" + + "\t\t\t\t\t\t\tLookup->", printer.print(plan)); } @@ -131,6 +142,8 @@ public static Stream getPhysicalPlanForTest() { PhysicalPlan cursorClose = new CursorCloseOperator(plan); + PhysicalPlan lookup = lookup(plan, "lookup_index", Map.of(), false, Map.of(), null); + return Stream.of( Arguments.of(filter, "filter"), Arguments.of(aggregation, "aggregation"), @@ -145,7 +158,8 @@ public static Stream getPhysicalPlanForTest() { Arguments.of(rareTopN, "rareTopN"), Arguments.of(limit, "limit"), Arguments.of(nested, "nested"), - Arguments.of(cursorClose, "cursorClose")); + Arguments.of(cursorClose, "cursorClose"), + Arguments.of(lookup, "Lookup")); } @ParameterizedTest(name = "{1}") @@ -219,6 +233,11 @@ public String visitLimit(LimitOperator node, Integer tabs) { return name(node, "Limit->", tabs); } + @Override + public String visitLookup(LookupOperator node, Integer tabs) { + return name(node, "Lookup->", tabs); + } + private String name(PhysicalPlan node, String current, int tabs) { String child = node.getChild().get(0).accept(this, tabs + 1); StringBuilder sb = new StringBuilder(); diff --git 
a/core/src/test/java/org/opensearch/sql/utils/MatcherUtils.java b/core/src/test/java/org/opensearch/sql/utils/MatcherUtils.java index 206f05a38a..6b0d5e2ac7 100644 --- a/core/src/test/java/org/opensearch/sql/utils/MatcherUtils.java +++ b/core/src/test/java/org/opensearch/sql/utils/MatcherUtils.java @@ -5,7 +5,9 @@ package org.opensearch.sql.utils; +import java.util.Objects; import org.hamcrest.Description; +import org.hamcrest.TypeSafeDiagnosingMatcher; import org.hamcrest.TypeSafeMatcher; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.type.ExprCoreType; @@ -41,4 +43,77 @@ protected boolean matchesSafely(ExprValue value) { } }; } + + public static TypeSafeDiagnosingMatcher containsValue(String key, Object value) { + Objects.requireNonNull(key, "Key is required"); + Objects.requireNonNull(value, "Value is required"); + return new TypeSafeDiagnosingMatcher<>() { + + @Override + protected boolean matchesSafely(ExprValue item, Description mismatchDescription) { + ExprValue expressionForKey = item.keyValue(key); + if (Objects.isNull(expressionForKey)) { + mismatchDescription + .appendValue(item) + .appendText(" does not contain key ") + .appendValue(key); + return false; + } + Object givenValue = expressionForKey.value(); + if (value.equals(givenValue)) { + return true; + } + mismatchDescription + .appendText(" value for key ") + .appendValue(key) + .appendText(" was ") + .appendValue(givenValue); + return false; + } + + @Override + public void describeTo(Description description) { + description + .appendText("ExprValue should contain key ") + .appendValue(key) + .appendText(" with string value ") + .appendValue(value); + } + }; + } + + public static TypeSafeDiagnosingMatcher containsNull(String key) { + Objects.requireNonNull(key, "Key is required"); + return new TypeSafeDiagnosingMatcher<>() { + + @Override + protected boolean matchesSafely(ExprValue item, Description mismatchDescription) { + ExprValue expressionForKey = 
item.keyValue(key); + if (Objects.isNull(expressionForKey)) { + mismatchDescription + .appendValue(item) + .appendText(" does not contain key ") + .appendValue(key); + return false; + } + if (expressionForKey.isNull()) { + return true; + } + mismatchDescription + .appendText(" value for key ") + .appendValue(key) + .appendText(" was ") + .appendValue(expressionForKey.value()); + return false; + } + + @Override + public void describeTo(Description description) { + description + .appendText("ExprValue should contain key ") + .appendValue(key) + .appendText(" with null value "); + } + }; + } } diff --git a/docs/user/ppl/cmd/lookup.rst b/docs/user/ppl/cmd/lookup.rst new file mode 100644 index 0000000000..8580161753 --- /dev/null +++ b/docs/user/ppl/cmd/lookup.rst @@ -0,0 +1,139 @@ +============= +lookup +============= + +.. rubric:: Table of contents + +.. contents:: + :local: + :depth: 2 + + +Description +============ +| Use the ``lookup`` command to do a lookup from another index and add the fields and values from the lookup document to the search result. + +Syntax +============ +lookup <lookup-index> <lookup-field> [AS <local-lookup-field>] ["," <lookup-field> [AS <local-lookup-field>]]... [appendonly=<bool>] [<source-field> [AS <local-source-field>]] ["," <source-field> [AS <local-source-field>]]... + +* lookup-index: mandatory. the name of the lookup index. If more than one is provided, all of them must match. +* lookup-field: mandatory. the name of the lookup field. Must exist in the lookup-index. It is matched against a local field (in the current search) to get the lookup document. When no lookup document matches, it is a no-op. If more than one lookup document matches, an exception is thrown. +* local-lookup-field: optional. the name of a field in the current search to match against the lookup-field. **Default:** value of lookup-field. +* appendonly: optional. indicates whether the values copied over to the search result from the lookup document should overwrite existing values. If true, no existing values are overwritten. **Default:** false. +* source-field: optional. 
the fields to copy over from the lookup document to the search result. If no such fields are given, all fields are copied. **Default:** all fields +* local-source-field: optional. the final name of the field in the search result (if different from the field name in the lookup document). **Default:** value of source-field. + +Note: To check whether there is a match between the lookup index and the current search result, a term and a match query for the field value of lookup-field are performed. + +Example 1: Simple lookup +============================= + +The example shows a simple lookup to add the name of a person from a lookup index. + +PPL query:: + + os> source=accounts; + fetched rows / total rows = 2/2 + +------------------+----------+ + | account_number | gender | + |------------------+----------| + | 1 | M | + | 13 | F | + +------------------+----------+ + + os> source=hr; + fetched rows / total rows = 2/2 + +------------------+----------+ + | account_number | name | + |------------------+----------| + | 1 | John | + | 13 | Alice | + +------------------+----------+ + + os> source=accounts | lookup hr account_number; + fetched rows / total rows = 2/2 + +------------------+----------+----------+ + | account_number | gender | name | + |------------------+----------|----------| + | 1 | M | John | + | 13 | F | Alice | + +------------------+----------+----------+ + + +Example 2: Lookup with different field names +============================================ + +The example shows a lookup to add the name of a person from a lookup index with different field names. 
+ +PPL query:: + + os> source=accounts; + fetched rows / total rows = 2/2 + +------------------+----------+ + | account_number | gender | + |------------------+----------| + | 1 | M | + | 13 | F | + +------------------+----------+ + + os> source=hr; + fetched rows / total rows = 2/2 + +------------------+----------+ + | employee_number | name | + |------------------+----------| + | 1 | John | + | 13 | Alice | + +------------------+----------+ + + os> source=accounts | lookup hr employee_number AS account_number name AS given_name; + fetched rows / total rows = 2/2 + +------------------+----------+----------------+ + | account_number | gender | given_name | + |------------------+----------+----------------| + | 1 | M | John | + | 13 | F | Alice | + +------------------+----------+----------------+ + +Example 3: Full lookup example +============================== + +The example shows a lookup that matches on two fields, sets ``appendonly=true``, and copies two fields from the lookup index under different names. + +PPL query:: + + os> source=accounts; + fetched rows / total rows = 3/3 + +------------------+----------+------------+ + | account_number | gender | department | + |------------------+----------+------------| + | 1 | M | finance | + | 13 | F | it | + | 20 | M | it | + +------------------+----------+------------+ + + os> source=hr; + fetched rows / total rows = 4/4 + +------------------+----------+------------+--------+ + | employee_number | name | dep | active | + |------------------+----------+------------+--------| + | 1 | John | finance | true | + | 13 | Alice | finance | false | + | 13 | Melinda | it | true | + | 19 | Jack | finance | true | + +------------------+----------+------------+--------+ + + os> source=accounts | lookup hr employee_number AS account_number, dep AS department appendonly=true name AS given_name, active AS is_active ; + fetched rows / total rows = 3/3 + +------------------+----------+----------------+------------+-----------+ + | account_number | gender | given_name |
department | is_active | + |------------------+----------+----------------+------------+-----------| + | 1 | M | John | finance | true | + | 13 | F | Melinda | it | true | + | 20 | M | NULL | it | NULL | + +------------------+----------+----------------+------------+-----------+ + + +Limitation +========== +The ``lookup`` command is not rewritten to OpenSearch DSL; it is only executed on the coordinating node. diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index 8a0ad563a6..199af35553 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -709,7 +709,17 @@ public enum Index { TestsConstants.TEST_INDEX_NESTED_WITH_NULLS, "multi_nested", getNestedTypeIndexMapping(), - "src/test/resources/nested_with_nulls.json"); + "src/test/resources/nested_with_nulls.json"), + IOT_READINGS( + TestsConstants.TEST_INDEX_IOT_READINGS, + "iot_readings", + getMappingFile("iot_readings_index_mapping.json"), + "src/test/resources/iot_readings.json"), + IOT_SENSORS( + TestsConstants.TEST_INDEX_IOT_SENSORS, + "iot_sensors", + getMappingFile("iot_sensors_index_mapping.json"), + "src/test/resources/iot_sensors.json"); private final String name; private final String type; diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java index 29bc9813fa..6243c78fe4 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java @@ -57,6 +57,8 @@ public class TestsConstants { public static final String TEST_INDEX_WILDCARD = TEST_INDEX + "_wildcard"; public static final String TEST_INDEX_MULTI_NESTED_TYPE = TEST_INDEX + "_multi_nested"; public static final String
TEST_INDEX_NESTED_WITH_NULLS = TEST_INDEX + "_nested_with_nulls"; + public static final String TEST_INDEX_IOT_READINGS = TEST_INDEX + "_iot_readings"; + public static final String TEST_INDEX_IOT_SENSORS = TEST_INDEX + "_iot_sensors"; public static final String DATASOURCES = ".ql-datasources"; public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/LookupCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/LookupCommandIT.java new file mode 100644 index 0000000000..ade37b1241 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/LookupCommandIT.java @@ -0,0 +1,207 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl; + +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_IOT_READINGS; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_IOT_SENSORS; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.jupiter.api.Test; + +public class LookupCommandIT extends PPLIntegTestCase { + + @Override + public void init() throws IOException { + loadIndex(Index.IOT_READINGS); + loadIndex(Index.IOT_SENSORS); + } + + @Test + public void testLookup() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s | lookup %s did as device-id | sort @timestamp", + TEST_INDEX_IOT_READINGS, TEST_INDEX_IOT_SENSORS)); + verifyDataRows( + result, + rows( + 28.1, + "2015-01-20 15:31:32.406431", + 255, + "temperature-basement", + "meter", + 255, + "VendorOne"), + rows( + 27.8, + "2016-01-20 15:31:33.509334", + 256, + "temperature-living-room", + "temperature meter", + 256, + "VendorTwo"), + rows( + 27.4, + "2017-01-20 15:31:35.732436", + 257, + "temperature-bedroom", + "camcorder", + 257, + "VendorThree"), + rows( + 
28.5, + "2018-01-20 15:32:32.406431", + 255, + "temperature-basement", + "meter", + 255, + "VendorOne"), + rows( + 27.9, + "2019-01-20 15:32:33.509334", + 256, + "temperature-living-room", + "temperature meter", + 256, + "VendorTwo"), + rows( + 27.4, + "2020-01-20 15:32:35.732436", + 257, + "temperature-bedroom", + "camcorder", + 257, + "VendorThree")); + } + + @Test + public void testLookupSelectedAttribute() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s | lookup %s did as device-id type, vendor | sort @timestamp", + TEST_INDEX_IOT_READINGS, TEST_INDEX_IOT_SENSORS)); + verifyDataRows( + result, + rows(28.1, "2015-01-20 15:31:32.406431", 255, "meter", "VendorOne"), + rows(27.8, "2016-01-20 15:31:33.509334", 256, "temperature meter", "VendorTwo"), + rows(27.4, "2017-01-20 15:31:35.732436", 257, "camcorder", "VendorThree"), + rows(28.5, "2018-01-20 15:32:32.406431", 255, "meter", "VendorOne"), + rows(27.9, "2019-01-20 15:32:33.509334", 256, "temperature meter", "VendorTwo"), + rows(27.4, "2020-01-20 15:32:35.732436", 257, "camcorder", "VendorThree")); + } + + @Test + public void testLookupRenameSelectedAttributes() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s | lookup %s did as device-id did as dev_id, type as kind, vendor | sort" + + " @timestamp", + TEST_INDEX_IOT_READINGS, TEST_INDEX_IOT_SENSORS)); + verifyDataRows( + result, + rows(28.1, "2015-01-20 15:31:32.406431", 255, 255, "meter", "VendorOne"), + rows(27.8, "2016-01-20 15:31:33.509334", 256, 256, "temperature meter", "VendorTwo"), + rows(27.4, "2017-01-20 15:31:35.732436", 257, 257, "camcorder", "VendorThree"), + rows(28.5, "2018-01-20 15:32:32.406431", 255, 255, "meter", "VendorOne"), + rows(27.9, "2019-01-20 15:32:33.509334", 256, 256, "temperature meter", "VendorTwo"), + rows(27.4, "2020-01-20 15:32:35.732436", 257, 257, "camcorder", "VendorThree")); + } + + @Test + public void testLookupSelectedMultipleAttributes() 
throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s | lookup %s did as device-id type | sort @timestamp", + TEST_INDEX_IOT_READINGS, TEST_INDEX_IOT_SENSORS)); + verifyDataRows( + result, + rows(28.1, "2015-01-20 15:31:32.406431", 255, "meter"), + rows(27.8, "2016-01-20 15:31:33.509334", 256, "temperature meter"), + rows(27.4, "2017-01-20 15:31:35.732436", 257, "camcorder"), + rows(28.5, "2018-01-20 15:32:32.406431", 255, "meter"), + rows(27.9, "2019-01-20 15:32:33.509334", 256, "temperature meter"), + rows(27.4, "2020-01-20 15:32:35.732436", 257, "camcorder")); + } + + @Test + public void testLookupShouldAppendOnlyShouldBeFalseByDefault() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s | rename temperature as vendor | lookup %s did as device-id | sort" + + " @timestamp", + TEST_INDEX_IOT_READINGS, TEST_INDEX_IOT_SENSORS)); + verifyDataRows( + result, + rows("2015-01-20 15:31:32.406431", 255, "VendorOne", "temperature-basement", "meter", 255), + rows( + "2016-01-20 15:31:33.509334", + 256, + "VendorTwo", + "temperature-living-room", + "temperature meter", + 256), + rows( + "2017-01-20 15:31:35.732436", + 257, + "VendorThree", + "temperature-bedroom", + "camcorder", + 257), + rows("2018-01-20 15:32:32.406431", 255, "VendorOne", "temperature-basement", "meter", 255), + rows( + "2019-01-20 15:32:33.509334", + 256, + "VendorTwo", + "temperature-living-room", + "temperature meter", + 256), + rows( + "2020-01-20 15:32:35.732436", + 257, + "VendorThree", + "temperature-bedroom", + "camcorder", + 257)); + } + + @Test + public void testLookupWithAppendOnlyFalse() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s | rename temperature as vendor | lookup %s did as device-id appendonly =" + + " true | sort @timestamp", + TEST_INDEX_IOT_READINGS, TEST_INDEX_IOT_SENSORS)); + verifyDataRows( + result, + rows("2015-01-20 15:31:32.406431", 255, 28.1, 
"temperature-basement", "meter", 255), + rows( + "2016-01-20 15:31:33.509334", + 256, + 27.8, + "temperature-living-room", + "temperature meter", + 256), + rows("2017-01-20 15:31:35.732436", 257, 27.4, "temperature-bedroom", "camcorder", 257), + rows("2018-01-20 15:32:32.406431", 255, 28.5, "temperature-basement", "meter", 255), + rows( + "2019-01-20 15:32:33.509334", + 256, + 27.9, + "temperature-living-room", + "temperature meter", + 256), + rows("2020-01-20 15:32:35.732436", 257, 27.4, "temperature-bedroom", "camcorder", 257)); + } +} diff --git a/integ-test/src/test/resources/indexDefinitions/iot_readings_index_mappings.json b/integ-test/src/test/resources/indexDefinitions/iot_readings_index_mappings.json new file mode 100644 index 0000000000..a97c6c170e --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/iot_readings_index_mappings.json @@ -0,0 +1,24 @@ +{ + "mappings": { + "properties": { + "device-id": { + "type": "long" + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "temperature": { + "type": "float" + }, + "timestamp": { + "type": "date" + } + } + } +} \ No newline at end of file diff --git a/integ-test/src/test/resources/indexDefinitions/iot_sensors_index_mappings.json b/integ-test/src/test/resources/indexDefinitions/iot_sensors_index_mappings.json new file mode 100644 index 0000000000..e9682a390e --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/iot_sensors_index_mappings.json @@ -0,0 +1,36 @@ +{ + "mappings": { + "properties": { + "did": { + "type": "long" + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "type": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "vendor": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } +} \ No newline at end of file diff 
--git a/integ-test/src/test/resources/iot_readings.json b/integ-test/src/test/resources/iot_readings.json new file mode 100644 index 0000000000..be7e542e87 --- /dev/null +++ b/integ-test/src/test/resources/iot_readings.json @@ -0,0 +1,13 @@ +{ "index" : { "_id" : "1" } } +{ "device-id":255, "temperature":28.1, "@timestamp":"2015-01-20T15:31:32.406431+00:00" } +{ "index" : { "_id" : "2" } } +{ "device-id":256, "temperature":27.8, "@timestamp":"2016-01-20T15:31:33.509334+00:00" } +{ "index" : { "_id" : "3" } } +{ "device-id":257, "temperature":27.4, "@timestamp":"2017-01-20T15:31:35.732436+00:00" } +{ "index" : { "_id" : "4" } } +{ "device-id":255, "temperature":28.5, "@timestamp":"2018-01-20T15:32:32.406431+00:00" } +{ "index" : { "_id" : "5" } } +{ "device-id":256, "temperature":27.9, "@timestamp":"2019-01-20T15:32:33.509334+00:00" } +{ "index" : { "_id" : "6" } } +{ "device-id":257, "temperature":27.4, "@timestamp":"2020-01-20T15:32:35.732436+00:00" } +{ "index" : { "_id" : "7" } } diff --git a/integ-test/src/test/resources/iot_sensors.json b/integ-test/src/test/resources/iot_sensors.json new file mode 100644 index 0000000000..a36dd0aaa4 --- /dev/null +++ b/integ-test/src/test/resources/iot_sensors.json @@ -0,0 +1,6 @@ +{ "index" : { "_id" : "1" } } +{ "did" : 255, "name":"temperature-basement", "vendor":"VendorOne", "type":"meter"} +{ "index" : { "_id" : "2" } } +{ "did" : 256, "name":"temperature-living-room", "vendor":"VendorTwo", "type":"temperature meter" } +{ "index" : { "_id" : "3" } } +{ "did" : 257, "name":"temperature-bedroom", "vendor":"VendorThree", "type":"camcorder"} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java index 0905c2f4b4..cb86978168 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java +++ 
b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java @@ -5,8 +5,20 @@ package org.opensearch.sql.opensearch.executor.protector; +import java.util.Map; +import java.util.Set; +import java.util.function.BiFunction; +import javax.annotation.Nullable; +import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.MatchQueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.monitor.ResourceMonitor; +import org.opensearch.sql.opensearch.client.OpenSearchClient; import org.opensearch.sql.opensearch.planner.physical.ADOperator; import org.opensearch.sql.opensearch.planner.physical.MLCommonsOperator; import org.opensearch.sql.opensearch.planner.physical.MLOperator; @@ -16,6 +28,7 @@ import org.opensearch.sql.planner.physical.EvalOperator; import org.opensearch.sql.planner.physical.FilterOperator; import org.opensearch.sql.planner.physical.LimitOperator; +import org.opensearch.sql.planner.physical.LookupOperator; import org.opensearch.sql.planner.physical.NestedOperator; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.planner.physical.ProjectOperator; @@ -29,11 +42,16 @@ /** OpenSearch Execution Protector. */ @RequiredArgsConstructor +@AllArgsConstructor public class OpenSearchExecutionProtector extends ExecutionProtector { /** OpenSearch resource monitor. */ private final ResourceMonitor resourceMonitor; + @Nullable + /** OpenSearch client. 
Maybe null * */ + private OpenSearchClient openSearchClient; + public PhysicalPlan protect(PhysicalPlan physicalPlan) { return physicalPlan.accept(this, null); } @@ -116,6 +134,70 @@ public PhysicalPlan visitDedupe(DedupeOperator node, Object context) { node.getConsecutive()); } + @Override + public PhysicalPlan visitLookup(LookupOperator node, Object context) { + return new LookupOperator( + visitInput(node.getInput(), context), + node.getIndexName(), + node.getMatchFieldMap(), + node.getAppendOnly(), + node.getCopyFieldMap(), + lookup()); + } + + private BiFunction, Map> lookup() { + + if (openSearchClient == null) { + throw new RuntimeException( + "Can not perform lookup because openSearchClient was null. This is likely a bug."); + } + + return (indexName, inputMap) -> { + Map matchMap = (Map) inputMap.get("_match"); + Set copySet = (Set) inputMap.get("_copy"); + + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); + + for (Map.Entry f : matchMap.entrySet()) { + BoolQueryBuilder orQueryBuilder = new BoolQueryBuilder(); + + // Todo: Search with term and a match query? Or terms only? + orQueryBuilder.should(new TermQueryBuilder(f.getKey(), f.getValue().toString())); + orQueryBuilder.should(new MatchQueryBuilder(f.getKey(), f.getValue().toString())); + orQueryBuilder.minimumShouldMatch(1); + + // filter is the same as "must" but ignores scoring + boolQueryBuilder.filter(orQueryBuilder); + } + + SearchResponse result = + openSearchClient + .getNodeClient() + .search( + new SearchRequest(indexName) + .source( + SearchSourceBuilder.searchSource() + .fetchSource( + copySet == null ? 
null : copySet.toArray(new String[0]), null) + .query(boolQueryBuilder) + .size(2))) + .actionGet(); + + int hits = result.getHits().getHits().length; + + if (hits == 0) { + // null indicates no hits for the lookup found + return null; + } + + if (hits != 1) { + throw new RuntimeException("too many hits for " + indexName + " (" + hits + ")"); + } + + return result.getHits().getHits()[0].getSourceAsMap(); + }; + } + @Override public PhysicalPlan visitWindow(WindowOperator node, Object context) { return new WindowOperator( diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java b/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java index 33a785c498..8789c79c30 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java @@ -69,8 +69,8 @@ public ResourceMonitor resourceMonitor(Settings settings) { } @Provides - public ExecutionProtector protector(ResourceMonitor resourceMonitor) { - return new OpenSearchExecutionProtector(resourceMonitor); + public ExecutionProtector protector(ResourceMonitor resourceMonitor, OpenSearchClient client) { + return new OpenSearchExecutionProtector(resourceMonitor, client); } @Provides diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 9f707c13cd..f9c8382004 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -57,6 +57,7 @@ NUM: 'NUM'; // ARGUMENT KEYWORDS KEEPEMPTY: 'KEEPEMPTY'; CONSECUTIVE: 'CONSECUTIVE'; +APPENDONLY: 'APPENDONLY'; DEDUP_SPLITVALUES: 'DEDUP_SPLITVALUES'; PARTITIONS: 'PARTITIONS'; ALLNUM: 'ALLNUM'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 5a9c179d1a..32fa477dfc 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -38,6 
+38,7 @@ commands | renameCommand | statsCommand | dedupCommand + | lookupCommand | sortCommand | evalCommand | headCommand @@ -85,6 +86,18 @@ dedupCommand : DEDUP (number = integerLiteral)? fieldList (KEEPEMPTY EQUAL keepempty = booleanLiteral)? (CONSECUTIVE EQUAL consecutive = booleanLiteral)? ; +matchFieldWithOptAs + : orignalMatchField = fieldExpression (AS asMatchField = fieldExpression)? + ; + +copyFieldWithOptAs + : orignalCopyField = fieldExpression (AS asCopyField = fieldExpression)? + ; + +lookupCommand + : LOOKUP tableSource matchFieldWithOptAs (COMMA matchFieldWithOptAs)* (APPENDONLY EQUAL appendonly = booleanLiteral)? (copyFieldWithOptAs (COMMA copyFieldWithOptAs)*)* + ; + sortCommand : SORT sortbyClause ; @@ -832,6 +845,7 @@ keywordsCanBeId | RENAME | STATS | DEDUP + | LOOKUP | SORT | EVAL | HEAD diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 3c693fa0bd..30a448924e 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -12,6 +12,7 @@ import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.FieldsCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.FromClauseContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.HeadCommandContext; +import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.LookupCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.RareCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.RenameCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.SearchFilterFromContext; @@ -54,6 +55,7 @@ import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Kmeans; +import org.opensearch.sql.ast.tree.Lookup; import 
org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Project; @@ -212,6 +214,47 @@ public UnresolvedPlan visitDedupCommand(DedupCommandContext ctx) { return new Dedupe(ArgumentFactory.getArgumentList(ctx), getFieldList(ctx.fieldList())); } + /** Lookup command */ + @Override + public UnresolvedPlan visitLookupCommand(LookupCommandContext ctx) { + ArgumentFactory.getArgumentList(ctx); + ctx.tableSource(); + ctx.copyFieldWithOptAs(); + ctx.matchFieldWithOptAs(); + return new Lookup( + ctx.tableSource().tableQualifiedName().getText(), + ctx.matchFieldWithOptAs().stream() + .map( + ct -> + new Map( + evaluateFieldExpressionContext(ct.orignalMatchField), + evaluateFieldExpressionContext(ct.asMatchField, ct.orignalMatchField))) + .collect(Collectors.toList()), + ArgumentFactory.getArgumentList(ctx), + ctx.copyFieldWithOptAs().stream() + .map( + ct -> + new Map( + evaluateFieldExpressionContext(ct.orignalCopyField), + evaluateFieldExpressionContext(ct.asCopyField, ct.orignalCopyField))) + .collect(Collectors.toList())); + } + + private UnresolvedExpression evaluateFieldExpressionContext( + OpenSearchPPLParser.FieldExpressionContext f) { + return internalVisitExpression(f); + } + + private UnresolvedExpression evaluateFieldExpressionContext( + OpenSearchPPLParser.FieldExpressionContext f0, + OpenSearchPPLParser.FieldExpressionContext f1) { + if (f0 == null) { + return internalVisitExpression(f1); + } else { + return internalVisitExpression(f0); + } + } + /** Head command visitor. 
*/ @Override public UnresolvedPlan visitHeadCommand(HeadCommandContext ctx) { diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/ArgumentFactory.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/ArgumentFactory.java index f89ecf9c6e..44c90b4d89 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/ArgumentFactory.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/ArgumentFactory.java @@ -9,6 +9,7 @@ import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.DedupCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.FieldsCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.IntegerLiteralContext; +import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.LookupCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.RareCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.SortFieldContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.StatsCommandContext; @@ -80,6 +81,13 @@ public static List getArgumentList(DedupCommandContext ctx) { : new Argument("consecutive", new Literal(false, DataType.BOOLEAN))); } + public static List getArgumentList(LookupCommandContext ctx) { + return Arrays.asList( + ctx.appendonly != null + ? new Argument("appendonly", getArgumentValue(ctx.appendonly)) + : new Argument("appendonly", new Literal(false, DataType.BOOLEAN))); + } + /** * Get list of {@link Argument}. 
* diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index d28e5d122b..888fe5fc4b 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -36,6 +36,7 @@ import org.opensearch.sql.ast.tree.Eval; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Head; +import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.Project; import org.opensearch.sql.ast.tree.RareTopN; import org.opensearch.sql.ast.tree.Relation; @@ -213,6 +214,41 @@ public String visitDedupe(Dedupe node, String context) { child, fields, allowedDuplication, keepEmpty, consecutive); } + @Override + public String visitLookup(Lookup node, String context) { + String child = node.getChild().get(0).accept(this, context); + String lookupIndexName = node.getIndexName(); + ImmutableMap.Builder matchMapBuilder = new ImmutableMap.Builder<>(); + for (Map matchMap : node.getMatchFieldList()) { + matchMapBuilder.put( + visitExpression(matchMap.getOrigin()), + ((Field) matchMap.getTarget()).getField().toString()); + } + String matches = + matchMapBuilder.build().entrySet().stream() + .map(entry -> StringUtils.format("%s as %s", entry.getKey(), entry.getValue())) + .collect(Collectors.joining(",")); + + ImmutableMap.Builder copyMapBuilder = new ImmutableMap.Builder<>(); + for (Map copyMap : node.getCopyFieldList()) { + copyMapBuilder.put( + visitExpression(copyMap.getOrigin()), + ((Field) copyMap.getTarget()).getField().toString()); + } + String copies = + copyMapBuilder.build().entrySet().stream() + .map(entry -> StringUtils.format("%s as %s", entry.getKey(), entry.getValue())) + .collect(Collectors.joining(",")); + + List options = node.getOptions(); + Boolean appendonly = (Boolean) options.get(0).getValue().getValue(); + + return 
StringUtils.format( + "%s | lookup %s %s appendonly=%b %s", + child, lookupIndexName, matches, appendonly, copies) + .trim(); + } + @Override public String visitHead(Head node, String context) { String child = node.getChild().get(0).accept(this, context); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java index c9989a49c4..6440ed49d0 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java @@ -21,11 +21,13 @@ import static org.opensearch.sql.ast.dsl.AstDSL.eval; import static org.opensearch.sql.ast.dsl.AstDSL.exprList; import static org.opensearch.sql.ast.dsl.AstDSL.field; +import static org.opensearch.sql.ast.dsl.AstDSL.fieldMap; import static org.opensearch.sql.ast.dsl.AstDSL.filter; import static org.opensearch.sql.ast.dsl.AstDSL.function; import static org.opensearch.sql.ast.dsl.AstDSL.head; import static org.opensearch.sql.ast.dsl.AstDSL.intLiteral; import static org.opensearch.sql.ast.dsl.AstDSL.let; +import static org.opensearch.sql.ast.dsl.AstDSL.lookup; import static org.opensearch.sql.ast.dsl.AstDSL.map; import static org.opensearch.sql.ast.dsl.AstDSL.nullLiteral; import static org.opensearch.sql.ast.dsl.AstDSL.parse; @@ -44,6 +46,7 @@ import com.google.common.collect.ImmutableMap; import java.util.Arrays; +import java.util.Collections; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -378,6 +381,18 @@ public void testDedupCommandWithSortby() { defaultDedupArgs())); } + @Test + public void testLookupCommand() { + assertEqual( + "source=t | lookup a field", + lookup( + relation("t"), + "a", + fieldMap("field", "field"), + exprList(argument("appendonly", booleanLiteral(false))), + Collections.emptyList())); + } + @Test public void testHeadCommand() { assertEqual("source=t | head", head(relation("t"), 10, 0)); diff --git 
a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstExpressionBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstExpressionBuilderTest.java index 7bcb87d193..769b6bd0a9 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstExpressionBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstExpressionBuilderTest.java @@ -616,10 +616,9 @@ public void functionNameCanBeUsedAsIdentifier() { + " | LOG10 | LOG2 | MOD | PI |POW | POWER | RAND | ROUND | SIGN | SQRT | TRUNCATE " + "| ACOS | ASIN | ATAN | ATAN2 | COS | COT | DEGREES | RADIANS | SIN | TAN"); assertFunctionNameCouldBeId( - "SEARCH | DESCRIBE | SHOW | FROM | WHERE | FIELDS | RENAME | STATS " - + "| DEDUP | SORT | EVAL | HEAD | TOP | RARE | PARSE | METHOD | REGEX | PUNCT | GROK " - + "| PATTERN | PATTERNS | NEW_FIELD | KMEANS | AD | ML | SOURCE | INDEX | D | DESC " - + "| DATASOURCES"); + "SEARCH | DESCRIBE | SHOW | FROM | WHERE | FIELDS | RENAME | STATS | DEDUP | LOOKUP | SORT" + + " | EVAL | HEAD | TOP | RARE | PARSE | METHOD | REGEX | PUNCT | GROK | PATTERN |" + + " PATTERNS | NEW_FIELD | KMEANS | AD | ML | SOURCE | INDEX | D | DESC | DATASOURCES"); } void assertFunctionNameCouldBeId(String antlrFunctionName) { diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/ArgumentFactoryTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/ArgumentFactoryTest.java index 761dbe2997..102f9591fe 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/ArgumentFactoryTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/ArgumentFactoryTest.java @@ -14,12 +14,15 @@ import static org.opensearch.sql.ast.dsl.AstDSL.dedupe; import static org.opensearch.sql.ast.dsl.AstDSL.exprList; import static org.opensearch.sql.ast.dsl.AstDSL.field; +import static org.opensearch.sql.ast.dsl.AstDSL.fieldMap; import static org.opensearch.sql.ast.dsl.AstDSL.intLiteral; +import static org.opensearch.sql.ast.dsl.AstDSL.lookup; import static 
org.opensearch.sql.ast.dsl.AstDSL.projectWithArg; import static org.opensearch.sql.ast.dsl.AstDSL.relation; import static org.opensearch.sql.ast.dsl.AstDSL.sort; import static org.opensearch.sql.ast.dsl.AstDSL.stringLiteral; +import java.util.Collections; import org.junit.Test; import org.opensearch.sql.ppl.parser.AstBuilderTest; @@ -102,4 +105,53 @@ public void testSortFieldArgument() { public void testNoArgConstructorForArgumentFactoryShouldPass() { new ArgumentFactory(); } + + @Test + public void testLookupCommandRequiredArguments() { + assertEqual( + "source=t | lookup a field", + lookup( + relation("t"), + "a", + fieldMap("field", "field"), + exprList(argument("appendonly", booleanLiteral(false))), + Collections.emptyList())); + } + + @Test + public void testLookupCommandFieldArguments() { + assertEqual( + "source=t | lookup a field AS field1,field2 AS field3 destfield AS destfield1, destfield2" + + " AS destfield3", + lookup( + relation("t"), + "a", + fieldMap("field", "field1", "field2", "field3"), + exprList(argument("appendonly", booleanLiteral(false))), + fieldMap("destfield", "destfield1", "destfield2", "destfield3"))); + } + + @Test + public void testLookupCommandAppendTrueArgument() { + assertEqual( + "source=t | lookup a field appendonly=true", + lookup( + relation("t"), + "a", + fieldMap("field", "field"), + exprList(argument("appendonly", booleanLiteral(true))), + Collections.emptyList())); + } + + @Test + public void testLookupCommandAppendFalseArgument() { + assertEqual( + "source=t | lookup a field appendonly=false", + lookup( + relation("t"), + "a", + fieldMap("field", "field"), + exprList(argument("appendonly", booleanLiteral(false))), + Collections.emptyList())); + } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index cd51ea07df..88e6db1ab4 100644 --- 
a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -89,6 +89,22 @@ public void testDedupCommand() { anonymize("source=t | dedup f1, f2")); } + @Test + public void testLookupCommand() { + assertEquals( + "source=t | lookup index field1 as field1,field2 as field2 appendonly=false", + anonymize("source=t | lookup index field1,field2")); + assertEquals( + "source=t | lookup index field1 as field1,field2 as field2 appendonly=true", + anonymize("source=t | lookup index field1,field2 appendonly=true")); + assertEquals( + "source=t | lookup index field1 as field12,field2 as field22 appendonly=false copyfield1 as" + + " copyfield1,copyfield2 as copyfield22", + anonymize( + "source=t | lookup index field1 as field12, field2 AS field22 copyfield1, copyfield2 as" + + " copyfield22")); + } + @Test public void testHeadCommandWithNumber() { assertEquals("source=t | head 3", anonymize("source=t | head 3"));