diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index d5e8b93b13..aeff7443dd 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -50,6 +50,7 @@ import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Kmeans; import org.opensearch.sql.ast.tree.Limit; +import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.Parse; @@ -66,6 +67,7 @@ import org.opensearch.sql.common.antlr.SyntaxCheckException; import org.opensearch.sql.data.model.ExprMissingValue; import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.expression.DSL; @@ -89,6 +91,7 @@ import org.opensearch.sql.planner.logical.LogicalFetchCursor; import org.opensearch.sql.planner.logical.LogicalFilter; import org.opensearch.sql.planner.logical.LogicalLimit; +import org.opensearch.sql.planner.logical.LogicalLookup; import org.opensearch.sql.planner.logical.LogicalML; import org.opensearch.sql.planner.logical.LogicalMLCommons; import org.opensearch.sql.planner.logical.LogicalPaginate; @@ -507,6 +510,113 @@ public LogicalPlan visitDedupe(Dedupe node, AnalysisContext context) { consecutive); } + /** Build {@link LogicalLookup}. */ + @Override + public LogicalPlan visitLookup(Lookup node, AnalysisContext queryContext) { + LogicalPlan child = node.getChild().get(0).accept(this, queryContext); + List options = node.getOptions(); + // Todo, refactor the option. 
+ Boolean appendOnly = (Boolean) options.get(0).getValue().getValue(); + + Table table = + dataSourceService + .getDataSource(DEFAULT_DATASOURCE_NAME) + .getStorageEngine() + .getTable(null, node.getIndexName()); + + if (table == null || !table.exists()) { + throw new SemanticCheckException( + String.format("no such lookup index %s", node.getIndexName())); + } + + /* Separate analysis context populated only with the lookup table's fields; match-field origins are resolved against it. */ + AnalysisContext lookupTableContext = new AnalysisContext(); + TypeEnvironment lookupTableEnvironment = lookupTableContext.peek(); + table + .getFieldTypes() + .forEach( + (name, type) -> + lookupTableEnvironment.define(new Symbol(Namespace.FIELD_NAME, name), type)); + ImmutableMap matchFieldMap = + analyzeLookupMatchFields(node.getMatchFieldList(), queryContext, lookupTableContext); + + return new LogicalLookup( + child, + node.getIndexName(), + matchFieldMap, + appendOnly, + analyzeLookupCopyFields(node.getCopyFieldList(), queryContext, table)); + } + + /** Resolves the match-field list into a map from lookup-side origin reference to query-side target reference. Each origin is analyzed against {@code lookupTableContext}; each target must be a {@link Field} and is analyzed against {@code queryContext}. For every pair the origin reference is removed from, and the target reference defined in, the current query type environment. Throws SemanticCheckException when a target is not a field. NOTE(review): why the origin is removed from the query environment is not evident from this method - confirm. */ + private ImmutableMap analyzeLookupMatchFields( + List inputMap, AnalysisContext queryContext, AnalysisContext lookupTableContext) { + ImmutableMap.Builder copyMapBuilder = + new ImmutableMap.Builder<>(); + for (Map resultMap : inputMap) { + Expression origin = expressionAnalyzer.analyze(resultMap.getOrigin(), lookupTableContext); + if (resultMap.getTarget() instanceof Field) { + Expression targerExpression = + expressionAnalyzer.analyze(resultMap.getTarget(), queryContext); + ReferenceExpression targetReference = + DSL.ref(targerExpression.toString(), targerExpression.type()); + ReferenceExpression originReference = DSL.ref(origin.toString(), origin.type()); + TypeEnvironment curEnv = queryContext.peek(); + curEnv.remove(originReference); + curEnv.define(targetReference); + copyMapBuilder.put(originReference, targetReference); + } else { + throw new SemanticCheckException( + String.format("the target expected to be field, but is %s", resultMap.getTarget())); + } + } + + return copyMapBuilder.build(); + } + + /** Resolves the copy-field list into a map from lookup-table field reference to output field reference. With an empty list, every field of the lookup table is defined in the current type environment and an empty map is returned. Otherwise both sides of each entry must be a {@link Field}; the origin's type is taken from the lookup table's field types, and the target (or the origin itself when both are equal) is defined in the environment with that type. Throws SemanticCheckException when the origin is not a field of the lookup table or when either side is not a field. */ + private ImmutableMap analyzeLookupCopyFields(
List inputMap, AnalysisContext context, Table table) { + + TypeEnvironment curEnv = context.peek(); + java.util.Map fieldTypes = table.getFieldTypes(); + + if (inputMap.isEmpty()) { + fieldTypes.forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v)); + return ImmutableMap.builder().build(); + } + + ImmutableMap.Builder copyMapBuilder = + new ImmutableMap.Builder<>(); + for (Map resultMap : inputMap) { + if (resultMap.getOrigin() instanceof Field && resultMap.getTarget() instanceof Field) { + String fieldName = ((Field) resultMap.getOrigin()).getField().toString(); + ExprType ex = fieldTypes.get(fieldName); + + if (ex == null) { + throw new SemanticCheckException(String.format("no such field %s", fieldName)); + } + + ReferenceExpression origin = new ReferenceExpression(fieldName, ex); + + if (resultMap.getTarget().equals(resultMap.getOrigin())) { + + curEnv.define(origin); + copyMapBuilder.put(origin, origin); + } else { + ReferenceExpression target = + new ReferenceExpression(((Field) resultMap.getTarget()).getField().toString(), ex); + curEnv.define(target); + copyMapBuilder.put(origin, target); + } + } else { + throw new SemanticCheckException( + String.format( + "the origin and target expected to be field, but is %s/%s", + resultMap.getOrigin(), resultMap.getTarget())); + } + } + + return copyMapBuilder.build(); + } + /** Logical head is identical to {@link LogicalLimit}. 
*/ public LogicalPlan visitHead(Head node, AnalysisContext context) { LogicalPlan child = node.getChild().get(0).accept(this, context); diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index 973b10310b..16ebf45854 100644 --- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -49,6 +49,7 @@ import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Kmeans; import org.opensearch.sql.ast.tree.Limit; +import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.Parse; @@ -217,6 +218,10 @@ public T visitDedupe(Dedupe node, C context) { return visitChildren(node, context); } + public T visitLookup(Lookup node, C context) { + return visitChildren(node, context); + } + public T visitHead(Head node, C context) { return visitChildren(node, context); } diff --git a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java index 4f3056b0f7..9b435a627d 100644 --- a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java +++ b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java @@ -5,6 +5,7 @@ package org.opensearch.sql.ast.dsl; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -49,6 +50,7 @@ import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Limit; +import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Project; import org.opensearch.sql.ast.tree.RareTopN; @@ -441,6 +443,25 @@ public static Dedupe dedupe(UnresolvedPlan input, List options, Field. 
return new Dedupe(input, options, Arrays.asList(fields)); } + /** Creates a {@link Lookup} AST node from the given input plan, lookup index name, match-field list, options and copy-field list; all arguments are passed to the Lookup constructor unchanged. */ + public static Lookup lookup( + UnresolvedPlan input, + String indexName, + List matchFieldList, + List options, + List copyFieldList) { + return new Lookup(input, indexName, matchFieldList, options, copyFieldList); + } + + /** Builds a list of field mappings: the first entry maps {@code field} to {@code asField}, and {@code more} is consumed two elements at a time as further (field, asField) pairs. The assertion requires an even number of extra elements and is only evaluated when assertions are enabled. NOTE(review): the assertion tolerates a null {@code more}, but the loop dereferences {@code more.length} - confirm null is never passed. */ + public static List fieldMap(String field, String asField, String... more) { + assert more == null || more.length % 2 == 0; + List list = new ArrayList(); + list.add(map(field, asField)); + for (int i = 0; i < more.length; i = i + 2) { + list.add(map(more[i], more[i + 1])); + } + return list; + } + public static Head head(UnresolvedPlan input, Integer size, Integer from) { return new Head(input, size, from); } diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/Lookup.java b/core/src/main/java/org/opensearch/sql/ast/tree/Lookup.java new file mode 100644 index 0000000000..d9b22cbd26 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/tree/Lookup.java @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.expression.Argument; +import org.opensearch.sql.ast.expression.Map; + +/** AST node representing a Lookup operation.
*/ +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = false) +@RequiredArgsConstructor +@AllArgsConstructor +public class Lookup extends UnresolvedPlan { + private UnresolvedPlan child; + private final String indexName; + private final List matchFieldList; + private final List options; + private final List copyFieldList; + + @Override + public Lookup attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + return ImmutableList.of(this.child); + } + + @Override + public T accept(AbstractNodeVisitor nodeVisitor, C context) { + return nodeVisitor.visitLookup(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/Explain.java b/core/src/main/java/org/opensearch/sql/executor/Explain.java index 0f05b99383..d4b15a1cdc 100644 --- a/core/src/main/java/org/opensearch/sql/executor/Explain.java +++ b/core/src/main/java/org/opensearch/sql/executor/Explain.java @@ -22,6 +22,7 @@ import org.opensearch.sql.planner.physical.EvalOperator; import org.opensearch.sql.planner.physical.FilterOperator; import org.opensearch.sql.planner.physical.LimitOperator; +import org.opensearch.sql.planner.physical.LookupOperator; import org.opensearch.sql.planner.physical.NestedOperator; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor; @@ -157,6 +158,20 @@ public ExplainResponseNode visitDedupe(DedupeOperator node, Object context) { "consecutive", node.getConsecutive()))); } + @Override + public ExplainResponseNode visitLookup(LookupOperator node, Object context) { + return explain( + node, + context, + explainNode -> + explainNode.setDescription( + ImmutableMap.of( + "copyfields", node.getCopyFieldMap(), + "matchfields", node.getMatchFieldMap(), + "indexname", node.getIndexName(), + "appendonly", node.getAppendOnly()))); + } + @Override public ExplainResponseNode visitRareTopN(RareTopNOperator node, Object context) { return 
explain( diff --git a/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java b/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java index b53d17b38f..96a3be1f69 100644 --- a/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java +++ b/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java @@ -13,6 +13,7 @@ import org.opensearch.sql.planner.logical.LogicalFetchCursor; import org.opensearch.sql.planner.logical.LogicalFilter; import org.opensearch.sql.planner.logical.LogicalLimit; +import org.opensearch.sql.planner.logical.LogicalLookup; import org.opensearch.sql.planner.logical.LogicalNested; import org.opensearch.sql.planner.logical.LogicalPaginate; import org.opensearch.sql.planner.logical.LogicalPlan; @@ -31,6 +32,7 @@ import org.opensearch.sql.planner.physical.EvalOperator; import org.opensearch.sql.planner.physical.FilterOperator; import org.opensearch.sql.planner.physical.LimitOperator; +import org.opensearch.sql.planner.physical.LookupOperator; import org.opensearch.sql.planner.physical.NestedOperator; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.planner.physical.ProjectOperator; @@ -74,6 +76,19 @@ public PhysicalPlan visitDedupe(LogicalDedupe node, C context) { node.getConsecutive()); } + @Override + public PhysicalPlan visitLookup(LogicalLookup node, C context) { + return new LookupOperator( + visitChild(node, context), + node.getIndexName(), + node.getMatchFieldMap(), + node.getAppendOnly(), + node.getCopyFieldMap(), + (a, b) -> { + throw new RuntimeException("not implemented by DefaultImplementor"); + }); + } + @Override public PhysicalPlan visitProject(LogicalProject node, C context) { return new ProjectOperator( diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalLookup.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalLookup.java new file mode 100644 index 0000000000..6269b56e37 --- /dev/null +++ 
b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalLookup.java @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.logical; + +import java.util.Arrays; +import java.util.Map; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import org.opensearch.sql.expression.ReferenceExpression; + +/** Logical Lookup Plan. Holds the lookup index name, the match-field and copy-field reference maps, and the appendOnly flag for a single child plan. */ +@Getter +@ToString +@EqualsAndHashCode(callSuper = true) +public class LogicalLookup extends LogicalPlan { + + private final String indexName; + private final Map matchFieldMap; + private final Map copyFieldMap; + private final Boolean appendOnly; + + /** Constructor of LogicalLookup. Wraps {@code child} as the only child plan and stores the remaining arguments as-is. Note the parameter order: appendOnly comes before copyFieldMap. */ + public LogicalLookup( + LogicalPlan child, + String indexName, + Map matchFieldMap, + Boolean appendOnly, + Map copyFieldMap) { + super(Arrays.asList(child)); + this.indexName = indexName; + this.copyFieldMap = copyFieldMap; + this.matchFieldMap = matchFieldMap; + this.appendOnly = appendOnly; + } + + @Override + public R accept(LogicalPlanNodeVisitor visitor, C context) { + return visitor.visitLookup(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java index 2a886ba0ca..9fb242a3bc 100644 --- a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java @@ -138,4 +138,13 @@ public LogicalPlan values(List...
values) { public static LogicalPlan limit(LogicalPlan input, Integer limit, Integer offset) { return new LogicalLimit(input, limit, offset); } + + public static LogicalPlan lookup( + LogicalPlan input, + String indexName, + Map matchFieldMap, + boolean appendOnly, + Map copyFields) { + return new LogicalLookup(input, indexName, matchFieldMap, appendOnly, copyFields); + } } diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java index 156db35306..c05f8c91f6 100644 --- a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java @@ -52,6 +52,10 @@ public R visitDedupe(LogicalDedupe plan, C context) { return visitNode(plan, context); } + public R visitLookup(LogicalLookup plan, C context) { + return visitNode(plan, context); + } + public R visitRename(LogicalRename plan, C context) { return visitNode(plan, context); } diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/LookupOperator.java b/core/src/main/java/org/opensearch/sql/planner/physical/LookupOperator.java new file mode 100644 index 0000000000..77e7149c7e --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/physical/LookupOperator.java @@ -0,0 +1,135 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.physical; + +import static org.opensearch.sql.data.type.ExprCoreType.STRUCT; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.model.ExprValueUtils; +import 
org.opensearch.sql.expression.ReferenceExpression; + +/** Lookup operator. Perform lookup on another OpenSearch index and enrich the results. The actual index access is delegated to the {@code lookup} function supplied at construction; this class only builds the request map and merges the returned fields into each input row. */ +@Getter +@EqualsAndHashCode(callSuper = false) +public class LookupOperator extends PhysicalPlan { + @Getter private final PhysicalPlan input; + @Getter private final String indexName; + @Getter private final Map matchFieldMap; + @Getter private final Map copyFieldMap; + @Getter private final Boolean appendOnly; + private final BiFunction, Map> lookup; + + /** Lookup Constructor. Stores all arguments as-is; {@code lookup} is called by next() with the index name and a request map, and returns the matched document's fields. NOTE(review): the NonNull annotation below is placed on the constructor itself rather than on its parameters - confirm it performs the intended null checks. */ + @NonNull + public LookupOperator( + PhysicalPlan input, + String indexName, + Map matchFieldMap, + Boolean appendOnly, + Map copyFieldMap, + BiFunction, Map> lookup) { + this.input = input; + this.indexName = indexName; + this.matchFieldMap = matchFieldMap; + this.appendOnly = appendOnly; + this.copyFieldMap = copyFieldMap; + this.lookup = lookup; + } + + @Override + public R accept(PhysicalPlanNodeVisitor visitor, C context) { + return visitor.visitLookup(this, context); + } + + @Override + public List getChild() { + return Collections.singletonList(input); + } + + @Override + public boolean hasNext() { + return input.hasNext(); + } + + /** Returns the next input row, enriched with fields from the lookup when possible. The row is returned unchanged when it is not a STRUCT, when any match field resolves to a null value, or when the lookup function returns null or an empty map. Otherwise a request map is built with a {@code _match} entry (lookup field name to row value) and, if copy fields are configured, a {@code _copy} entry (the lookup field names to fetch). The returned fields are merged into a copy of the row, renamed according to copyFieldMap where a mapping exists: with putIfAbsent when appendOnly is set (existing row values win), with put otherwise (lookup values overwrite). */ + @Override + public ExprValue next() { + ExprValue inputValue = input.next(); + + if (STRUCT == inputValue.type()) { + Map matchMap = new HashMap<>(); + Map finalMap = new HashMap<>(); + + for (Map.Entry matchField : + matchFieldMap.entrySet()) { + Object val = inputValue.bindingTuples().resolve(matchField.getValue()).value(); + if (val != null) { + matchMap.put(matchField.getKey().toString(), val); + } else { + // No value in search results, so we just return the input + return inputValue; + } + } + + finalMap.put("_match", matchMap); + + Map copyMap = new HashMap<>(); + + if (!copyFieldMap.isEmpty()) { + + for (Map.Entry copyField : + copyFieldMap.entrySet()) { + copyMap.put(String.valueOf(copyField.getKey()), String.valueOf(copyField.getValue())); + } + + finalMap.put("_copy", copyMap.keySet()); + } + +
Map source = lookup.apply(indexName, finalMap); + + if (source == null || source.isEmpty()) { + // no lookup found or lookup is empty, so we just return the original input value + return inputValue; + } + + Map tupleValue = ExprValueUtils.getTupleValue(inputValue); + Map resultBuilder = new HashMap<>(); + resultBuilder.putAll(tupleValue); + + if (appendOnly) { + + for (Map.Entry sourceField : source.entrySet()) { + String u = copyMap.get(sourceField.getKey()); + resultBuilder.putIfAbsent( + u == null ? sourceField.getKey() : u.toString(), + ExprValueUtils.fromObjectValue(sourceField.getValue())); + } + } else { + // default + + for (Map.Entry sourceField : source.entrySet()) { + String u = copyMap.get(sourceField.getKey()); + resultBuilder.put( + u == null ? sourceField.getKey() : u.toString(), + ExprValueUtils.fromObjectValue(sourceField.getValue())); + } + } + + return ExprTupleValue.fromExprValueMap(resultBuilder); + + } else { + return inputValue; + } + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanDSL.java b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanDSL.java index 147f0e08dc..2ab3f08106 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanDSL.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanDSL.java @@ -78,6 +78,23 @@ public static DedupeOperator dedupe( input, Arrays.asList(expressions), allowedDuplication, keepEmpty, consecutive); } + public static LookupOperator lookup( + PhysicalPlan input, + String indexName, + Map matchFieldMap, + Boolean appendOnly, + Map copyFieldMap) { + return new LookupOperator( + input, + indexName, + matchFieldMap, + appendOnly, + copyFieldMap, + (a, b) -> { + throw new RuntimeException("not implemented by PhysicalPlanDSL"); + }); + } + public WindowOperator window( PhysicalPlan input, NamedExpression windowFunction, WindowDefinition windowDefinition) { return new WindowOperator(input, windowFunction, 
windowDefinition); diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java index 99b5cc8020..0a58b018cd 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java @@ -64,6 +64,10 @@ public R visitDedupe(DedupeOperator node, C context) { return visitNode(node, context); } + public R visitLookup(LookupOperator node, C context) { + return visitNode(node, context); + } + public R visitValues(ValuesOperator node, C context) { return visitNode(node, context); } diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java index 8d935b11d2..5070d9fa49 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java @@ -70,6 +70,8 @@ import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.opensearch.sql.ast.dsl.AstDSL; import org.opensearch.sql.ast.expression.Argument; import org.opensearch.sql.ast.expression.DataType; @@ -82,11 +84,14 @@ import org.opensearch.sql.ast.tree.CloseCursor; import org.opensearch.sql.ast.tree.FetchCursor; import org.opensearch.sql.ast.tree.Kmeans; +import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.RareTopN.CommandType; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.common.antlr.SyntaxCheckException; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.data.type.ExprType; import 
org.opensearch.sql.exception.ExpressionEvaluationException; import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.expression.DSL; @@ -105,10 +110,94 @@ import org.opensearch.sql.planner.logical.LogicalPlanDSL; import org.opensearch.sql.planner.logical.LogicalProject; import org.opensearch.sql.planner.logical.LogicalRelation; +import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.planner.physical.datasource.DataSourceTable; +import org.opensearch.sql.storage.StorageEngine; +import org.opensearch.sql.storage.Table; class AnalyzerTest extends AnalyzerTestBase { + private static final String LOOKUP_TABLE_DEVICE_NAMES = "device_names"; + public static final String NO_SUCH_TABLE = "no_such_table"; + public static final String TABLE_DOES_NOT_EXIST = "does_not_exist"; + + public static Map lookupTableFieldTypes = + new ImmutableMap.Builder() + .put("id", INTEGER) + .put("dev_id", STRING) + .put("serial_number", LONG) + .put("vendor", STRING) + .put("ip_v4", STRING) + .put("firmware_version", LONG) + .put("reliability_factor", DOUBLE) + .put("comment", ExprCoreType.STRING) + .build(); + + private final Table deviceNamesLookupTable; + private Table tableDoesNotExist; + + public AnalyzerTest() { + this.deviceNamesLookupTable = + new Table() { + @Override + public boolean exists() { + return true; + } + + @Override + public void create(Map schema) { + throw new UnsupportedOperationException("Create table is not supported"); + } + + @Override + public Map getFieldTypes() { + return lookupTableFieldTypes; + } + + @Override + public PhysicalPlan implement(LogicalPlan plan) { + throw new UnsupportedOperationException(); + } + + public Map getReservedFieldTypes() { + return ImmutableMap.of("_test", STRING); + } + }; + this.tableDoesNotExist = + new Table() { + + @Override + public boolean exists() { + return false; + } + + @Override + public Map getFieldTypes() { + return Map.of(); + } + + @Override + public PhysicalPlan 
implement(LogicalPlan plan) { + throw new UnsupportedOperationException(); + } + }; + } + + protected StorageEngine storageEngine() { + return (dataSourceSchemaName, tableName) -> { + switch (tableName) { + case NO_SUCH_TABLE: + return null; + case TABLE_DOES_NOT_EXIST: + return tableDoesNotExist; + case LOOKUP_TABLE_DEVICE_NAMES: + return deviceNamesLookupTable; + default: + return table; + } + }; + } + @Test public void filter_relation() { assertAnalyzeEqual( @@ -1767,4 +1856,263 @@ public void visit_close_cursor() { () -> assertEquals("pewpew", ((LogicalFetchCursor) analyzed.getChild().get(0)).getCursor())); } + + @Test + public void visit_lookup_same_field_name_in_query_and_lookup_index() { + assertAnalyzeEqual( + LogicalPlanDSL.lookup( + LogicalPlanDSL.relation("schema", table), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), + new ReferenceExpression("field_value2", STRING)), + false, + Collections.emptyMap()), + AstDSL.lookup( + AstDSL.relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(AstDSL.argument("appendonly", AstDSL.booleanLiteral(false))), + Collections.emptyList())); + } + + @Test + public void visit_lookup_use_multiple_fields() { + assertAnalyzeEqual( + LogicalPlanDSL.lookup( + LogicalPlanDSL.relation("schema", table), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableMap.of( + new ReferenceExpression("comment", STRING), + new ReferenceExpression("comment", STRING), + new ReferenceExpression("dev_id", STRING), + new ReferenceExpression("field_value1", STRING)), + false, + Collections.emptyMap()), + AstDSL.lookup( + AstDSL.relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of( + AstDSL.map("comment", "comment"), AstDSL.map("dev_id", "field_value1")), + ImmutableList.of(AstDSL.argument("appendonly", AstDSL.booleanLiteral(false))), + Collections.emptyList())); + } + + @Test + public void 
visit_lookup_various_field_name_in_query_and_lookup_index() { + assertAnalyzeEqual( + LogicalPlanDSL.lookup( + LogicalPlanDSL.relation("schema", table), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableMap.of( + new ReferenceExpression("dev_id", STRING), + new ReferenceExpression("comment", STRING)), + true, + Collections.emptyMap()), + AstDSL.lookup( + AstDSL.relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("dev_id", "comment")), + ImmutableList.of(AstDSL.argument("appendonly", AstDSL.booleanLiteral(true))), + Collections.emptyList())); + } + + @ParameterizedTest + @ValueSource(strings = {NO_SUCH_TABLE, TABLE_DOES_NOT_EXIST}) + public void visit_lookup_should_report_error_when_lookup_table_does_not_exist(String tableName) { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + tableName, + ImmutableList.of(AstDSL.map("string_value", "comment")), + ImmutableList.of(argument("appendonly", booleanLiteral(true))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_should_report_error_when_join_field_does_not_exist() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("no_such_field", "comment")), + ImmutableList.of(argument("appendonly", booleanLiteral(true))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_should_report_error_when_join_field_does_not_exist_in_lookup_table() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("no_such_field", "comment")), + ImmutableList.of(argument("appendonly", booleanLiteral(true))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_should_report_error_when_join_field_does_not_exist_in_query_table() { + Lookup lookup = + AstDSL.lookup( + 
relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "no_such_field")), + ImmutableList.of(argument("appendonly", booleanLiteral(true))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void + visit_lookup_should_error_when_join_field_does_not_exist_in_query_table_but_exist_lookup_table() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "dev_id")), + ImmutableList.of(argument("appendonly", booleanLiteral(true))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void + visit_lookup_should_error_when_join_field_does_not_exist_in_lookup_table_but_exist_query_table() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("string_value", "field_value2")), + ImmutableList.of(argument("appendonly", booleanLiteral(true))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_should_report_error_when_target_expression_point_out_function() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map(field("ip_v4"), intLiteral(7))), + ImmutableList.of(argument("appendonly", booleanLiteral(false))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_copy_field() { + assertAnalyzeEqual( + LogicalPlanDSL.lookup( + LogicalPlanDSL.relation("schema", table), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), + new ReferenceExpression("field_value2", STRING)), + false, + ImmutableMap.of( + new ReferenceExpression("vendor", STRING), + new ReferenceExpression("maker", STRING))), + AstDSL.lookup( + AstDSL.relation("schema"), + 
LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(AstDSL.argument("appendonly", AstDSL.booleanLiteral(false))), + ImmutableList.of(AstDSL.map("vendor", "maker")))); + } + + @Test + public void visit_lookup_copy_multiple_fields() { + assertAnalyzeEqual( + LogicalPlanDSL.lookup( + LogicalPlanDSL.relation("schema", table), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), + new ReferenceExpression("field_value2", STRING)), + false, + ImmutableMap.of( + new ReferenceExpression("vendor", STRING), + new ReferenceExpression("maker", STRING), + new ReferenceExpression("serial_number", LONG), + new ReferenceExpression("serial", LONG))), + AstDSL.lookup( + AstDSL.relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(AstDSL.argument("appendonly", AstDSL.booleanLiteral(false))), + ImmutableList.of( + AstDSL.map("vendor", "maker"), AstDSL.map("serial_number", "serial")))); + } + + @Test + public void visit_lookup_copy_field_when_origin_and_target_is_the_same() { + assertAnalyzeEqual( + LogicalPlanDSL.lookup( + LogicalPlanDSL.relation("schema", table), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), + new ReferenceExpression("field_value2", STRING)), + false, + ImmutableMap.of( + new ReferenceExpression("vendor", STRING), + new ReferenceExpression("vendor", STRING))), + AstDSL.lookup( + AstDSL.relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(AstDSL.argument("appendonly", AstDSL.booleanLiteral(false))), + ImmutableList.of(AstDSL.map("vendor", "vendor")))); + } + + @Test + public void visit_lookup_should_report_error_when_copy_field_does_not_exist() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + 
ImmutableList.of(argument("appendonly", booleanLiteral(false))), + ImmutableList.of(AstDSL.map("no_such_field", "maker"))); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_should_report_error_when_copy_target_is_not_a_field() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(argument("appendonly", booleanLiteral(false))), + ImmutableList.of(AstDSL.map(AstDSL.field("vendor"), AstDSL.intLiteral(8)))); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_should_report_error_when_copy_source_is_not_a_field() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(argument("appendonly", booleanLiteral(false))), + ImmutableList.of(AstDSL.map(AstDSL.booleanLiteral(true), AstDSL.field("maker")))); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } } diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/LookupOperatorTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/LookupOperatorTest.java new file mode 100644 index 0000000000..94fda7c791 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/planner/physical/LookupOperatorTest.java @@ -0,0 +1,534 @@ +package org.opensearch.sql.planner.physical; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.stubbing.Answer; +import 
org.opensearch.sql.data.model.ExprStringValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.model.ExprValueUtils; +import org.opensearch.sql.expression.ReferenceExpression; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.BiFunction; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; +import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; +import static org.opensearch.sql.utils.MatcherUtils.containsNull; +import static org.opensearch.sql.utils.MatcherUtils.containsValue; + +@ExtendWith(MockitoExtension.class) +public class LookupOperatorTest extends PhysicalPlanTestBase { + + public static final String LOOKUP_INDEX = "lookup_index"; + + public static final ImmutableList> LOOKUP_TABLE = ImmutableList.of( + ImmutableMap.of("id", 1, "ip","v4", "ip_v4", "112.111.162.4", "region", "USA", "class", "A"), + ImmutableMap.of("id", 2,"ip","4", "ip_v4", "74.125.19.106", "region", "EU", "class", "A")); + + public static final ImmutableList> LOOKUP_TABLE_WITH_NULLS = ImmutableList.of( + new HashMap<>(ImmutableMap.of("id", 1, "ip","v4", "ip_v4", "112.111.162.4", "region", "USA", "class", "A")){{ + put("class", null); + }}, + ImmutableMap.of("id", 2,"ip","4", "ip_v4", "74.125.19.106", "region", "EU", "class", "A")); + + + @Mock + private BiFunction, Map> lookupFunction; + + @Test + public void lookup_empty_table() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())).thenAnswer(lookupTableQueryResults("ip", Collections.emptyList())); + PhysicalPlan plan = new LookupOperator( + new 
TestScan(), LOOKUP_INDEX, + ImmutableMap.of(new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat(result, hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat(result, hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat(result, hasItem( ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "112.111.162.4", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat(result, hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "74.125.19.106", + "action", + "POST", + "response", + 200, + "referer", + "www.google.com")))); + assertThat(result, hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of("ip", "74.125.19.106", "action", "POST", "response", 500)))); + } + + @Test + public void lookup_append_only_true() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())).thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + PhysicalPlan plan = new LookupOperator( + new TestScan(), LOOKUP_INDEX, + ImmutableMap.of(new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat(result, hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat(result, hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat(result, hasItem(allOf( + containsValue("ip", 
"112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA"), + containsValue("class", "A") + ))); + assertThat(result, hasItem(allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A") + ))); + assertThat(result, hasItem(allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A") + ))); + } + + @Test + public void lookup_append_only_false() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())).thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + PhysicalPlan plan = new LookupOperator( + new TestScan(), LOOKUP_INDEX, + ImmutableMap.of(new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + false, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat(result, hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat(result, hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat(result, hasItem(allOf( + containsValue("ip", "v4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA") + ))); + assertThat(result, hasItem(allOf( + containsValue("response", 200), + containsValue("ip", "4"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU") + ))); + assertThat(result, hasItem(allOf( + containsValue("response", 500), + containsValue("ip", "4"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU") + ))); + } + + @Test + public void lookup_copy_one_field() { + 
when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())).thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + PhysicalPlan plan = new LookupOperator( + new TestScan(), LOOKUP_INDEX, + ImmutableMap.of(new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of(new ReferenceExpression("class", STRING), new ReferenceExpression("ip_address_class", STRING)), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat(result, hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat(result, hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat(result, hasItem(allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("ip_address_class", "A") + ))); + assertThat(result, hasItem(allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("ip_address_class", "A") + ))); + assertThat(result, hasItem(allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("ip_address_class", "A") + ))); + } + + @Test + public void lookup_copy_multiple_fields() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())).thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + PhysicalPlan plan = new LookupOperator( + new TestScan(), LOOKUP_INDEX, + ImmutableMap.of(new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of(new ReferenceExpression("class", STRING), new ReferenceExpression("class", STRING), + new ReferenceExpression("id", INTEGER), new ReferenceExpression("address_id", INTEGER)), + 
lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat(result, hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat(result, hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat(result, hasItem(allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("class", "A"), + containsValue("address_id", 1) + ))); + assertThat(result, hasItem(allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("class", "A"), + containsValue("address_id", 2) + ))); + assertThat(result, hasItem(allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("class", "A"), + containsValue("address_id", 2) + ))); + } + + @Test + public void lookup_empty_join_field() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())).thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + List queryResults = new ImmutableList.Builder() + .addAll(inputs) + .add(ExprValueUtils.tupleValue( + ImmutableMap.of( + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com"))) + .build(); + PhysicalPlan plan = new LookupOperator( + testScan(queryResults), LOOKUP_INDEX, + ImmutableMap.of(new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(6)); + assertThat(result, hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat(result, 
hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat(result, hasItem(allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA"), + containsValue("class", "A") + ))); + assertThat(result, hasItem(allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A") + ))); + assertThat(result, hasItem(allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A") + ))); + assertThat(result, hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of( + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + } + + @Test + public void lookup_ignore_non_struct_input() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())).thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + ExprStringValue stringExpression = new ExprStringValue("Expression of string type should be ignored"); + List queryResults = new ImmutableList.Builder() + .addAll(inputs) + .add(stringExpression) + .build(); + PhysicalPlan plan = new LookupOperator( + testScan(queryResults), LOOKUP_INDEX, + ImmutableMap.of(new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(6)); + assertThat(result, hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat(result, hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + 
"www.amazon.com")))); + assertThat(result, hasItem(allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA"), + containsValue("class", "A") + ))); + assertThat(result, hasItem(allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A") + ))); + assertThat(result, hasItem(allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A") + ))); + assertThat(result, hasItem(stringExpression)); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void lookup_table_with_nulls(boolean appendOnly) { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())).thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE_WITH_NULLS)); + PhysicalPlan plan = new LookupOperator( + new TestScan(), LOOKUP_INDEX, + ImmutableMap.of(new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + appendOnly, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat(result, hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat(result, hasItem(ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat(result, hasItem(allOf( + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA"), + containsNull("class") + ))); + assertThat(result, hasItem(allOf( + containsValue("response", 200), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A") + ))); + assertThat(result, hasItem(allOf( + 
containsValue("response", 500), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A") + ))); + } + + private static @NotNull Answer> lookupTableQueryResults(String lookupTableFieldName, List> lookupTableContent) { + return invocationOnMock -> { + String lookupTableName = invocationOnMock.getArgument(0); + if(!LOOKUP_INDEX.equals(lookupTableName)) { + return ImmutableMap.of(); + } + HashMap> parameters = invocationOnMock.getArgument(1); + String valueOfJoinFieldInLookupTable = (String) parameters.get("_match").get(lookupTableFieldName); + if(Objects.isNull(valueOfJoinFieldInLookupTable)) { + return null; + } + return lookupTableContent.stream() + .filter(map -> valueOfJoinFieldInLookupTable.equals(map.get(lookupTableFieldName))) + .findAny() + .orElse(ImmutableMap.of()); + }; + } + +} diff --git a/core/src/test/java/org/opensearch/sql/utils/MatcherUtils.java b/core/src/test/java/org/opensearch/sql/utils/MatcherUtils.java index 206f05a38a..1e6895edc9 100644 --- a/core/src/test/java/org/opensearch/sql/utils/MatcherUtils.java +++ b/core/src/test/java/org/opensearch/sql/utils/MatcherUtils.java @@ -6,10 +6,13 @@ package org.opensearch.sql.utils; import org.hamcrest.Description; +import org.hamcrest.TypeSafeDiagnosingMatcher; import org.hamcrest.TypeSafeMatcher; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.type.ExprCoreType; +import java.util.Objects; + /** Matcher Utils. */ public class MatcherUtils { /** Check {@link ExprValue} type equal to {@link ExprCoreType}. 
*/ @@ -41,4 +44,56 @@ protected boolean matchesSafely(ExprValue value) { } }; } + + public static TypeSafeDiagnosingMatcher containsValue(String key, Object value) { + Objects.requireNonNull(key, "Key is required"); + Objects.requireNonNull(value, "Value is required"); + return new TypeSafeDiagnosingMatcher<>() { + + @Override + protected boolean matchesSafely(ExprValue item, Description mismatchDescription) { + ExprValue expressionForKey = item.keyValue(key); + if(Objects.isNull(expressionForKey)) { + mismatchDescription.appendValue(item).appendText(" does not contain key ").appendValue(key); + return false; + } + Object givenValue = expressionForKey.value(); + if(value.equals(givenValue)) { + return true; + } + mismatchDescription.appendText(" value for key ").appendValue(key).appendText(" was ").appendValue(givenValue); + return false; + } + + @Override + public void describeTo(Description description) { + description.appendText("ExprValue should contain key ").appendValue(key).appendText(" with string value ").appendValue(value); + } + }; + } + + public static TypeSafeDiagnosingMatcher containsNull(String key) { + Objects.requireNonNull(key, "Key is required"); + return new TypeSafeDiagnosingMatcher<>() { + + @Override + protected boolean matchesSafely(ExprValue item, Description mismatchDescription) { + ExprValue expressionForKey = item.keyValue(key); + if(Objects.isNull(expressionForKey)) { + mismatchDescription.appendValue(item).appendText(" does not contain key ").appendValue(key); + return false; + } + if(expressionForKey.isNull()) { + return true; + } + mismatchDescription.appendText(" value for key ").appendValue(key).appendText(" was ").appendValue(expressionForKey.value()); + return false; + } + + @Override + public void describeTo(Description description) { + description.appendText("ExprValue should contain key ").appendValue(key).appendText(" with null value "); + } + }; + } } diff --git a/docs/user/ppl/cmd/lookup.rst b/docs/user/ppl/cmd/lookup.rst 
new file mode 100644 index 0000000000..8580161753 --- /dev/null +++ b/docs/user/ppl/cmd/lookup.rst @@ -0,0 +1,139 @@ +============= +lookup +============= + +.. rubric:: Table of contents + +.. contents:: + :local: + :depth: 2 + + +Description +============ +| Use the ``lookup`` command to do a lookup from another index and add the fields and values from the lookup document to the search result. + +Syntax +============ +lookup <lookup-index> <lookup-field> [AS <local-lookup-field>] ["," <lookup-field> [AS <local-lookup-field>]]... [appendonly=<bool>] [<source-field> [AS <local-source-field>]] ["," <source-field> [AS <local-source-field>]]... + +* lookup-index: mandatory. the name of the lookup index. +* lookup-field: mandatory. the name of the lookup field. Must exist in the lookup-index. It is matched against a local field (in the current search) to get the lookup document. If more than one lookup-field is provided, all of them must match. When no lookup document matches, it is a no-op. If more than one lookup document matches, an exception is thrown. +* local-lookup-field: optional. the name of a field in the current search to match against the lookup-field. **Default:** value of lookup-field. +* appendonly: optional. indicates if the values to copy over to the search result from the lookup document should overwrite existing values. If true no existing values are overwritten. **Default:** false. +* source-field: optional. the fields to copy over from the lookup document to the search result. If no such fields are given all fields are copied. **Default:** all fields. +* local-source-field: optional. the final name of the field in the search result (if different from the field name in the lookup document). **Default:** value of source-field. + +Note: To check if there is a match between the lookup index and the current search result, a term query and a match query for the field value of lookup-field are performed. + +Example 1: Simple lookup +============================= + +The example shows a simple lookup to add the name of a person from a lookup index.
+ +PPL query:: + + os> source=accounts; + fetched rows / total rows = 2/2 + +------------------+----------+ + | account_number | gender | + |------------------+----------| + | 1 | M | + | 13 | F | + +------------------+----------+ + + os> source=hr; + fetched rows / total rows = 2/2 + +------------------+----------+ + | account_number | name | + |------------------+----------| + | 1 | John | + | 13 | Alice | + +------------------+----------+ + + os> source=accounts | lookup hr account_number; + fetched rows / total rows = 2/2 + +------------------+----------+----------+ + | account_number | gender | name | + |------------------+----------+----------| + | 1 | M | John | + | 13 | F | Alice | + +------------------+----------+----------+ + + +Example 2: Lookup with different field names +============================================ + +The example shows a lookup to add the name of a person from a lookup index with different field names. + +PPL query:: + + os> source=accounts; + fetched rows / total rows = 2/2 + +------------------+----------+ + | account_number | gender | + |------------------+----------| + | 1 | M | + | 13 | F | + +------------------+----------+ + + os> source=hr; + fetched rows / total rows = 2/2 + +------------------+----------+ + | employee_number | name | + |------------------+----------| + | 1 | John | + | 13 | Alice | + +------------------+----------+ + + os> source=accounts | lookup hr employee_number AS account_number name AS given_name; + fetched rows / total rows = 2/2 + +------------------+----------+----------------+ + | account_number | gender | given_name | + |------------------+----------+----------------| + | 1 | M | John | + | 13 | F | Alice | + +------------------+----------+----------------+ + +Example 3: Full lookup example +============================== + +The example shows a lookup that matches on more than one field, sets ``appendonly=true`` and copies several fields from the lookup index under different names.
+ +PPL query:: + + os> source=accounts; + fetched rows / total rows = 3/3 + +------------------+----------+------------+ + | account_number | gender | department | + |------------------+----------+------------| + | 1 | M | finance | + | 13 | F | it | + | 20 | M | it | + +------------------+----------+------------+ + + os> source=hr; + fetched rows / total rows = 4/4 + +------------------+----------+------------+--------+ + | employee_number | name | dep | active | + |------------------+----------+------------+--------| + | 1 | John | finance | true | + | 13 | Alice | finance | false | + | 13 | Melinda | it | true | + | 19 | Jack | finance | true | + +------------------+----------+------------+--------+ + + os> source=accounts | lookup hr employee_number AS account_number, dep AS department appendonly=true name AS given_name, active AS is_active; + fetched rows / total rows = 3/3 + +------------------+----------+----------------+------------+-----------+ + | account_number | gender | given_name | department | is_active | + |------------------+----------+----------------+------------+-----------| + | 1 | M | John | finance | true | + | 13 | F | Melinda | it | true | + | 20 | M | NULL | it | NULL | + +------------------+----------+----------------+------------+-----------+ + + +Limitation +========== +The ``lookup`` command is not rewritten to OpenSearch DSL; it is only executed on the coordinating node.
diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/LookupCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/LookupCommandIT.java new file mode 100644 index 0000000000..cb05285ea3 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/LookupCommandIT.java @@ -0,0 +1,64 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl; + +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK_WITH_NULL_VALUES; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.jupiter.api.Test; + +public class LookupCommandIT extends PPLIntegTestCase { + + @Override + public void init() throws IOException { + loadIndex(Index.BANK); + loadIndex(Index.BANK_WITH_NULL_VALUES); + } + + @Test + public void testLookup() throws IOException { + JSONObject result = executeQuery(String.format("source=%s | lookup %s male", TEST_INDEX_BANK)); + verifyDataRows(result, rows(true), rows(false)); + } + + @Test + public void testConsecutiveDedup() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s | dedup male consecutive=true | fields male", TEST_INDEX_BANK)); + verifyDataRows(result, rows(true), rows(false), rows(true), rows(false)); + } + + @Test + public void testAllowMoreDuplicates() throws IOException { + JSONObject result = + executeQuery(String.format("source=%s | dedup 2 male | fields male", TEST_INDEX_BANK)); + verifyDataRows(result, rows(true), rows(true), rows(false), rows(false)); + } + + @Test + public void testKeepEmptyDedup() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s | dedup balance keepempty=true | fields firstname, balance", + TEST_INDEX_BANK_WITH_NULL_VALUES)); + 
verifyDataRows( + result, + rows("Amber JOHnny", 39225), + rows("Hattie", null), + rows("Nanette", 32838), + rows("Dale", 4180), + rows("Elinor", null), + rows("Virginia", null), + rows("Dillard", 48086)); + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java index 0905c2f4b4..cb86978168 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java @@ -5,8 +5,20 @@ package org.opensearch.sql.opensearch.executor.protector; +import java.util.Map; +import java.util.Set; +import java.util.function.BiFunction; +import javax.annotation.Nullable; +import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.MatchQueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.monitor.ResourceMonitor; +import org.opensearch.sql.opensearch.client.OpenSearchClient; import org.opensearch.sql.opensearch.planner.physical.ADOperator; import org.opensearch.sql.opensearch.planner.physical.MLCommonsOperator; import org.opensearch.sql.opensearch.planner.physical.MLOperator; @@ -16,6 +28,7 @@ import org.opensearch.sql.planner.physical.EvalOperator; import org.opensearch.sql.planner.physical.FilterOperator; import org.opensearch.sql.planner.physical.LimitOperator; +import org.opensearch.sql.planner.physical.LookupOperator; import org.opensearch.sql.planner.physical.NestedOperator; import org.opensearch.sql.planner.physical.PhysicalPlan; import 
org.opensearch.sql.planner.physical.ProjectOperator; @@ -29,11 +42,16 @@ /** OpenSearch Execution Protector. */ @RequiredArgsConstructor +@AllArgsConstructor public class OpenSearchExecutionProtector extends ExecutionProtector { /** OpenSearch resource monitor. */ private final ResourceMonitor resourceMonitor; + @Nullable + /** OpenSearch client. Maybe null * */ + private OpenSearchClient openSearchClient; + public PhysicalPlan protect(PhysicalPlan physicalPlan) { return physicalPlan.accept(this, null); } @@ -116,6 +134,70 @@ public PhysicalPlan visitDedupe(DedupeOperator node, Object context) { node.getConsecutive()); } + @Override + public PhysicalPlan visitLookup(LookupOperator node, Object context) { + return new LookupOperator( + visitInput(node.getInput(), context), + node.getIndexName(), + node.getMatchFieldMap(), + node.getAppendOnly(), + node.getCopyFieldMap(), + lookup()); + } + + private BiFunction, Map> lookup() { + + if (openSearchClient == null) { + throw new RuntimeException( + "Can not perform lookup because openSearchClient was null. This is likely a bug."); + } + + return (indexName, inputMap) -> { + Map matchMap = (Map) inputMap.get("_match"); + Set copySet = (Set) inputMap.get("_copy"); + + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); + + for (Map.Entry f : matchMap.entrySet()) { + BoolQueryBuilder orQueryBuilder = new BoolQueryBuilder(); + + // Todo: Search with term and a match query? Or terms only? + orQueryBuilder.should(new TermQueryBuilder(f.getKey(), f.getValue().toString())); + orQueryBuilder.should(new MatchQueryBuilder(f.getKey(), f.getValue().toString())); + orQueryBuilder.minimumShouldMatch(1); + + // filter is the same as "must" but ignores scoring + boolQueryBuilder.filter(orQueryBuilder); + } + + SearchResponse result = + openSearchClient + .getNodeClient() + .search( + new SearchRequest(indexName) + .source( + SearchSourceBuilder.searchSource() + .fetchSource( + copySet == null ? 
null : copySet.toArray(new String[0]), null) + .query(boolQueryBuilder) + .size(2))) + .actionGet(); + + int hits = result.getHits().getHits().length; + + if (hits == 0) { + // null indicates no hits for the lookup found + return null; + } + + if (hits != 1) { + throw new RuntimeException("too many hits for " + indexName + " (" + hits + ")"); + } + + return result.getHits().getHits()[0].getSourceAsMap(); + }; + } + @Override public PhysicalPlan visitWindow(WindowOperator node, Object context) { return new WindowOperator( diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java b/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java index 33a785c498..8789c79c30 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java @@ -69,8 +69,8 @@ public ResourceMonitor resourceMonitor(Settings settings) { } @Provides - public ExecutionProtector protector(ResourceMonitor resourceMonitor) { - return new OpenSearchExecutionProtector(resourceMonitor); + public ExecutionProtector protector(ResourceMonitor resourceMonitor, OpenSearchClient client) { + return new OpenSearchExecutionProtector(resourceMonitor, client); } @Provides diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 9f707c13cd..f9c8382004 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -57,6 +57,7 @@ NUM: 'NUM'; // ARGUMENT KEYWORDS KEEPEMPTY: 'KEEPEMPTY'; CONSECUTIVE: 'CONSECUTIVE'; +APPENDONLY: 'APPENDONLY'; DEDUP_SPLITVALUES: 'DEDUP_SPLITVALUES'; PARTITIONS: 'PARTITIONS'; ALLNUM: 'ALLNUM'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 5a9c179d1a..32fa477dfc 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -38,6 
+38,7 @@ commands | renameCommand | statsCommand | dedupCommand + | lookupCommand | sortCommand | evalCommand | headCommand @@ -85,6 +86,18 @@ dedupCommand : DEDUP (number = integerLiteral)? fieldList (KEEPEMPTY EQUAL keepempty = booleanLiteral)? (CONSECUTIVE EQUAL consecutive = booleanLiteral)? ; +matchFieldWithOptAs /* One match field, optionally followed by AS and a second field expression. NOTE(review): the label "orignalMatchField" is misspelled ("original"); AstBuilder reads the generated context field of that name, so a rename has to change both files together. */ + : orignalMatchField = fieldExpression (AS asMatchField = fieldExpression)? + ; + +copyFieldWithOptAs /* One copy field, optionally followed by AS and a second field expression. NOTE(review): "orignalCopyField" carries the same misspelling and the same AstBuilder coupling. */ + : orignalCopyField = fieldExpression (AS asCopyField = fieldExpression)? + ; + +lookupCommand /* LOOKUP, the lookup table, a comma separated list of match fields, an optional APPENDONLY = boolean, then the copy fields. NOTE(review): the copy-field group is wrapped in an outer "*", so copy fields are accepted with or without commas between them - confirm this is intended. */ + : LOOKUP tableSource matchFieldWithOptAs (COMMA matchFieldWithOptAs)* (APPENDONLY EQUAL appendonly = booleanLiteral)? (copyFieldWithOptAs (COMMA copyFieldWithOptAs)*)* + ; + sortCommand : SORT sortbyClause ; @@ -832,6 +845,7 @@ keywordsCanBeId | RENAME | STATS | DEDUP + | LOOKUP | SORT | EVAL | HEAD diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 3c693fa0bd..30a448924e 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -12,6 +12,7 @@ import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.FieldsCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.FromClauseContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.HeadCommandContext; +import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.LookupCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.RareCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.RenameCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.SearchFilterFromContext; @@ -54,6 +55,7 @@ import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Kmeans; +import org.opensearch.sql.ast.tree.Lookup; import
org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Project; @@ -212,6 +214,47 @@ public UnresolvedPlan visitDedupCommand(DedupCommandContext ctx) { return new Dedupe(ArgumentFactory.getArgumentList(ctx), getFieldList(ctx.fieldList())); } + /** Lookup command */ + @Override + public UnresolvedPlan visitLookupCommand(LookupCommandContext ctx) { + ArgumentFactory.getArgumentList(ctx); + ctx.tableSource(); + ctx.copyFieldWithOptAs(); + ctx.matchFieldWithOptAs(); + return new Lookup( + ctx.tableSource().tableQualifiedName().getText(), + ctx.matchFieldWithOptAs().stream() + .map( + ct -> + new Map( + evaluateFieldExpressionContext(ct.orignalMatchField), + evaluateFieldExpressionContext(ct.asMatchField, ct.orignalMatchField))) + .collect(Collectors.toList()), + ArgumentFactory.getArgumentList(ctx), + ctx.copyFieldWithOptAs().stream() + .map( + ct -> + new Map( + evaluateFieldExpressionContext(ct.orignalCopyField), + evaluateFieldExpressionContext(ct.asCopyField, ct.orignalCopyField))) + .collect(Collectors.toList())); + } + + private UnresolvedExpression evaluateFieldExpressionContext( + OpenSearchPPLParser.FieldExpressionContext f) { + return internalVisitExpression(f); + } + + private UnresolvedExpression evaluateFieldExpressionContext( + OpenSearchPPLParser.FieldExpressionContext f0, + OpenSearchPPLParser.FieldExpressionContext f1) { + if (f0 == null) { + return internalVisitExpression(f1); + } else { + return internalVisitExpression(f0); + } + } + /** Head command visitor. 
*/ @Override public UnresolvedPlan visitHeadCommand(HeadCommandContext ctx) { diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/ArgumentFactory.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/ArgumentFactory.java index f89ecf9c6e..44c90b4d89 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/ArgumentFactory.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/ArgumentFactory.java @@ -9,6 +9,7 @@ import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.DedupCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.FieldsCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.IntegerLiteralContext; +import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.LookupCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.RareCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.SortFieldContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.StatsCommandContext; @@ -80,6 +81,13 @@ public static List getArgumentList(DedupCommandContext ctx) { : new Argument("consecutive", new Literal(false, DataType.BOOLEAN))); } + public static List getArgumentList(LookupCommandContext ctx) { + return Arrays.asList( + ctx.appendonly != null + ? new Argument("appendonly", getArgumentValue(ctx.appendonly)) + : new Argument("appendonly", new Literal(false, DataType.BOOLEAN))); + } + /** * Get list of {@link Argument}. 
* diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index d28e5d122b..888fe5fc4b 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -36,6 +36,7 @@ import org.opensearch.sql.ast.tree.Eval; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Head; +import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.Project; import org.opensearch.sql.ast.tree.RareTopN; import org.opensearch.sql.ast.tree.Relation; @@ -213,6 +214,47 @@ public String visitDedupe(Dedupe node, String context) { child, fields, allowedDuplication, keepEmpty, consecutive); }
+  /**
+   * Renders a lookup command back into query text.
+   *
+   * <p>Match and copy fields are always written in the explicit "origin as target" form and are
+   * joined with commas; the result is trimmed so that an empty copy-field list leaves no trailing
+   * blank.
+   *
+   * @param node lookup AST node
+   * @param context visitor context passed on to the child plan
+   * @return text of the child plan followed by the lookup command
+   */
+  @Override
+  public String visitLookup(Lookup node, String context) {
+    String child = node.getChild().get(0).accept(this, context);
+    String matches = formatLookupFieldPairs(node.getMatchFieldList());
+    String copies = formatLookupFieldPairs(node.getCopyFieldList());
+    Boolean appendonly = (Boolean) node.getOptions().get(0).getValue().getValue();
+
+    return StringUtils.format(
+            "%s | lookup %s %s appendonly=%b %s",
+            child, node.getIndexName(), matches, appendonly, copies)
+        .trim();
+  }
+
+  /**
+   * Formats lookup field pairs as a comma separated list of "origin as target" entries.
+   *
+   * <p>The pairs are rendered straight from the list, in order, so a repeated origin field is
+   * printed twice instead of being rejected as a duplicate map key.
+   */
+  private String formatLookupFieldPairs(List<Map> fieldPairs) {
+    return fieldPairs.stream()
+        .map(
+            pair ->
+                StringUtils.format(
+                    "%s as %s",
+                    visitExpression(pair.getOrigin()),
+                    ((Field) pair.getTarget()).getField().toString()))
+        .collect(Collectors.joining(","));
+  }
+
 @Override public String visitHead(Head node, String context) { String child = node.getChild().get(0).accept(this, context); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java index c9989a49c4..6440ed49d0 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java @@ -21,11 +21,13 @@ import static org.opensearch.sql.ast.dsl.AstDSL.eval; import static org.opensearch.sql.ast.dsl.AstDSL.exprList; import static org.opensearch.sql.ast.dsl.AstDSL.field; +import static org.opensearch.sql.ast.dsl.AstDSL.fieldMap; import static org.opensearch.sql.ast.dsl.AstDSL.filter; import static org.opensearch.sql.ast.dsl.AstDSL.function; import static org.opensearch.sql.ast.dsl.AstDSL.head; import static org.opensearch.sql.ast.dsl.AstDSL.intLiteral; import static org.opensearch.sql.ast.dsl.AstDSL.let; +import static org.opensearch.sql.ast.dsl.AstDSL.lookup; import static org.opensearch.sql.ast.dsl.AstDSL.map; import static org.opensearch.sql.ast.dsl.AstDSL.nullLiteral; import static org.opensearch.sql.ast.dsl.AstDSL.parse; @@ -44,6 +46,7 @@ import com.google.common.collect.ImmutableMap; import java.util.Arrays; +import java.util.Collections; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -378,6 +381,18 @@ public void testDedupCommandWithSortby() { defaultDedupArgs())); } + @Test + public void testLookupCommand() { + assertEqual( + "source=t | lookup a field", + lookup( + relation("t"), + "a", + fieldMap("field", "field"), + exprList(argument("appendonly", booleanLiteral(false))), + Collections.emptyList())); + } + @Test public void testHeadCommand() { assertEqual("source=t | head", head(relation("t"), 10, 0)); diff --git
a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstExpressionBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstExpressionBuilderTest.java index 7bcb87d193..769b6bd0a9 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstExpressionBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstExpressionBuilderTest.java @@ -616,10 +616,9 @@ public void functionNameCanBeUsedAsIdentifier() { + " | LOG10 | LOG2 | MOD | PI |POW | POWER | RAND | ROUND | SIGN | SQRT | TRUNCATE " + "| ACOS | ASIN | ATAN | ATAN2 | COS | COT | DEGREES | RADIANS | SIN | TAN"); assertFunctionNameCouldBeId( - "SEARCH | DESCRIBE | SHOW | FROM | WHERE | FIELDS | RENAME | STATS " - + "| DEDUP | SORT | EVAL | HEAD | TOP | RARE | PARSE | METHOD | REGEX | PUNCT | GROK " - + "| PATTERN | PATTERNS | NEW_FIELD | KMEANS | AD | ML | SOURCE | INDEX | D | DESC " - + "| DATASOURCES"); + "SEARCH | DESCRIBE | SHOW | FROM | WHERE | FIELDS | RENAME | STATS | DEDUP | LOOKUP | SORT" + + " | EVAL | HEAD | TOP | RARE | PARSE | METHOD | REGEX | PUNCT | GROK | PATTERN |" + + " PATTERNS | NEW_FIELD | KMEANS | AD | ML | SOURCE | INDEX | D | DESC | DATASOURCES"); } void assertFunctionNameCouldBeId(String antlrFunctionName) { diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/ArgumentFactoryTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/ArgumentFactoryTest.java index 761dbe2997..102f9591fe 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/ArgumentFactoryTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/ArgumentFactoryTest.java @@ -14,12 +14,15 @@ import static org.opensearch.sql.ast.dsl.AstDSL.dedupe; import static org.opensearch.sql.ast.dsl.AstDSL.exprList; import static org.opensearch.sql.ast.dsl.AstDSL.field; +import static org.opensearch.sql.ast.dsl.AstDSL.fieldMap; import static org.opensearch.sql.ast.dsl.AstDSL.intLiteral; +import static org.opensearch.sql.ast.dsl.AstDSL.lookup; import static 
org.opensearch.sql.ast.dsl.AstDSL.projectWithArg; import static org.opensearch.sql.ast.dsl.AstDSL.relation; import static org.opensearch.sql.ast.dsl.AstDSL.sort; import static org.opensearch.sql.ast.dsl.AstDSL.stringLiteral; +import java.util.Collections; import org.junit.Test; import org.opensearch.sql.ppl.parser.AstBuilderTest; @@ -102,4 +105,53 @@ public void testSortFieldArgument() { public void testNoArgConstructorForArgumentFactoryShouldPass() { new ArgumentFactory(); } + + /** Without an explicit appendonly option the expected argument list carries appendonly=false. */ @Test + public void testLookupCommandRequiredArguments() { + assertEqual( + "source=t | lookup a field", + lookup( + relation("t"), + "a", + fieldMap("field", "field"), + exprList(argument("appendonly", booleanLiteral(false))), + Collections.emptyList())); + } + + /** Match fields and copy fields renamed with AS are both parsed; appendonly still defaults to false. */ @Test + public void testLookupCommandFieldArguments() { + assertEqual( + "source=t | lookup a field AS field1,field2 AS field3 destfield AS destfield1, destfield2" + + " AS destfield3", + lookup( + relation("t"), + "a", + fieldMap("field", "field1", "field2", "field3"), + exprList(argument("appendonly", booleanLiteral(false))), + fieldMap("destfield", "destfield1", "destfield2", "destfield3"))); + } + + /** An explicit appendonly=true is carried into the argument list. */ @Test + public void testLookupCommandAppendTrueArgument() { + assertEqual( + "source=t | lookup a field appendonly=true", + lookup( + relation("t"), + "a", + fieldMap("field", "field"), + exprList(argument("appendonly", booleanLiteral(true))), + Collections.emptyList())); + } + + /** An explicit appendonly=false yields the same argument as leaving the option out. */ @Test + public void testLookupCommandAppendFalseArgument() { + assertEqual( + "source=t | lookup a field appendonly=false", + lookup( + relation("t"), + "a", + fieldMap("field", "field"), + exprList(argument("appendonly", booleanLiteral(false))), + Collections.emptyList())); + } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index cd51ea07df..88e6db1ab4 100644 ---
a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -89,6 +89,22 @@ public void testDedupCommand() { anonymize("source=t | dedup f1, f2")); } + /** Expects every lookup field to be spelled out as "x as y", appendonly to be printed even when omitted from the query, and copy fields to come last. */ @Test + public void testLookupCommand() { + assertEquals( + "source=t | lookup index field1 as field1,field2 as field2 appendonly=false", + anonymize("source=t | lookup index field1,field2")); + assertEquals( + "source=t | lookup index field1 as field1,field2 as field2 appendonly=true", + anonymize("source=t | lookup index field1,field2 appendonly=true")); + assertEquals( + "source=t | lookup index field1 as field12,field2 as field22 appendonly=false copyfield1 as" + + " copyfield1,copyfield2 as copyfield22", + anonymize( + "source=t | lookup index field1 as field12, field2 AS field22 copyfield1, copyfield2 as" + + " copyfield22")); + } + @Test public void testHeadCommandWithNumber() { assertEquals("source=t | head 3", anonymize("source=t | head 3"));