From 9c731d9e8ea08d3048e3bf1308405d6c90679206 Mon Sep 17 00:00:00 2001 From: Lukasz Soszynski Date: Tue, 25 Jun 2024 18:49:30 +0200 Subject: [PATCH] Added more lookup operator tests Signed-off-by: Lukasz Soszynski --- .../org/opensearch/sql/executor/Explain.java | 4 +- .../sql/planner/physical/LookupOperator.java | 42 +- .../sql/planner/physical/PhysicalPlanDSL.java | 13 +- .../opensearch/sql/executor/ExplainTest.java | 28 + .../sql/planner/DefaultImplementorTest.java | 98 ++- .../logical/LogicalPlanNodeVisitorTest.java | 13 +- .../planner/physical/LookupOperatorTest.java | 669 ++++++++++++++++++ .../physical/PhysicalPlanNodeVisitorTest.java | 25 +- .../opensearch/sql/utils/MatcherUtils.java | 75 ++ 9 files changed, 895 insertions(+), 72 deletions(-) create mode 100644 core/src/test/java/org/opensearch/sql/planner/physical/LookupOperatorTest.java diff --git a/core/src/main/java/org/opensearch/sql/executor/Explain.java b/core/src/main/java/org/opensearch/sql/executor/Explain.java index d4b15a1cdc..bec6c5e47d 100644 --- a/core/src/main/java/org/opensearch/sql/executor/Explain.java +++ b/core/src/main/java/org/opensearch/sql/executor/Explain.java @@ -166,8 +166,8 @@ public ExplainResponseNode visitLookup(LookupOperator node, Object context) { explainNode -> explainNode.setDescription( ImmutableMap.of( - "copyfields", node.getCopyFieldMap(), - "matchfields", node.getMatchFieldMap(), + "copyfields", node.getCopyFieldMap().toString(), + "matchfields", node.getMatchFieldMap().toString(), "indexname", node.getIndexName(), "appendonly", node.getAppendOnly()))); } diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/LookupOperator.java b/core/src/main/java/org/opensearch/sql/planner/physical/LookupOperator.java index 77e7149c7e..7117d87f5d 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/LookupOperator.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/LookupOperator.java @@ -15,6 +15,7 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NonNull; +import lombok.ToString; import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.model.ExprValueUtils; @@ -22,6 +23,7 @@ /** Lookup operator. Perform lookup on another OpenSearch index and enrich the results. */ @Getter +@ToString @EqualsAndHashCode(callSuper = false) public class LookupOperator extends PhysicalPlan { @Getter private final PhysicalPlan input; @@ -29,6 +31,8 @@ public class LookupOperator extends PhysicalPlan { @Getter private final Map matchFieldMap; @Getter private final Map copyFieldMap; @Getter private final Boolean appendOnly; + + @EqualsAndHashCode.Exclude private final BiFunction, Map> lookup; /** Lookup Constructor. */ @@ -96,37 +100,29 @@ public ExprValue next() { finalMap.put("_copy", copyMap.keySet()); } - Map source = lookup.apply(indexName, finalMap); + Map lookupResult = lookup.apply(indexName, finalMap); - if (source == null || source.isEmpty()) { + if (lookupResult == null || lookupResult.isEmpty()) { // no lookup found or lookup is empty, so we just return the original input value return inputValue; } - Map tupleValue = ExprValueUtils.getTupleValue(inputValue); - Map resultBuilder = new HashMap<>(); - resultBuilder.putAll(tupleValue); - - if (appendOnly) { - - for (Map.Entry sourceField : source.entrySet()) { - String u = copyMap.get(sourceField.getKey()); - resultBuilder.putIfAbsent( - u == null ? sourceField.getKey() : u.toString(), - ExprValueUtils.fromObjectValue(sourceField.getValue())); - } - } else { - // default - - for (Map.Entry sourceField : source.entrySet()) { - String u = copyMap.get(sourceField.getKey()); - resultBuilder.put( - u == null ? sourceField.getKey() : u.toString(), - ExprValueUtils.fromObjectValue(sourceField.getValue())); + Map tupleInputValue = ExprValueUtils.getTupleValue(inputValue); + Map resultTupleBuilder = new HashMap<>(); + resultTupleBuilder.putAll(tupleInputValue); + for (Map.Entry sourceOfAdditionalField : lookupResult.entrySet()) { + String lookedUpFieldName = sourceOfAdditionalField.getKey(); + Object lookedUpFieldValue = sourceOfAdditionalField.getValue(); + String finalFieldName = copyMap.getOrDefault(lookedUpFieldName, lookedUpFieldName); + ExprValue value = ExprValueUtils.fromObjectValue(lookedUpFieldValue); + if (appendOnly) { + resultTupleBuilder.putIfAbsent(finalFieldName, value); + } else { + resultTupleBuilder.put(finalFieldName, value); } } - return ExprTupleValue.fromExprValueMap(resultBuilder); + return ExprTupleValue.fromExprValueMap(resultTupleBuilder); } else { return inputValue; diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanDSL.java b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanDSL.java index 2ab3f08106..9649b620ed 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanDSL.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanDSL.java @@ -11,6 +11,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.BiFunction; import lombok.experimental.UtilityClass; import org.apache.commons.lang3.tuple.Pair; import org.opensearch.sql.ast.tree.RareTopN.CommandType; @@ -83,16 +84,10 @@ public static LookupOperator lookup( String indexName, Map matchFieldMap, Boolean appendOnly, - Map copyFieldMap) { + Map copyFieldMap, + BiFunction, Map> lookupFunction) { return new LookupOperator( - input, - indexName, - matchFieldMap, - appendOnly, - copyFieldMap, - (a, b) -> { - throw new RuntimeException("not implemented by PhysicalPlanDSL"); - }); + input, indexName, matchFieldMap, appendOnly, copyFieldMap, lookupFunction); } public WindowOperator window( diff --git a/core/src/test/java/org/opensearch/sql/executor/ExplainTest.java b/core/src/test/java/org/opensearch/sql/executor/ExplainTest.java index 897347f22d..7c59a27f23 100644 --- a/core/src/test/java/org/opensearch/sql/executor/ExplainTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/ExplainTest.java @@ -21,6 +21,7 @@ import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.eval; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.filter; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.limit; +import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.lookup; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.nested; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.project; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.rareTopN; @@ -235,6 +236,33 @@ void can_explain_nested() { explain.apply(plan)); } + @Test + void can_explain_lookup() { + PhysicalPlan plan = + lookup( + tableScan, + "lookup_index_name", + Map.of(ref("lookup_index_field", STRING), ref("query_index_field", STRING)), + true, + Map.of(ref("lookup_index_field_name", STRING), ref("renamed_field", STRING)), + null); + assertEquals( + new ExplainResponse( + new ExplainResponseNode( + "LookupOperator", + Map.of( + "copyfields", + "{lookup_index_field_name=renamed_field}", + "matchfields", + "{lookup_index_field=query_index_field}", + "indexname", + "lookup_index_name", + "appendonly", + true), + singletonList(tableScan.explainNode()))), + explain.apply(plan)); + } + private static class FakeTableScan extends TableScanOperator { @Override public boolean hasNext() { diff --git a/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java b/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java index 45d8f6c03c..423163dc0e 100644 --- a/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java @@ -21,6 +21,7 @@ import static org.opensearch.sql.planner.logical.LogicalPlanDSL.eval; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.filter; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.limit; +import static org.opensearch.sql.planner.logical.LogicalPlanDSL.lookup; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.nested; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.project; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.rareTopN; @@ -122,22 +123,27 @@ public void visit_should_return_default_physical_operator() { nested( limit( LogicalPlanDSL.dedupe( - rareTopN( - sort( - eval( - remove( - rename( - aggregation( - filter(values(emptyList()), filterExpr), - aggregators, - groupByExprs), - mappings), - exclude), - newEvalField), - sortField), - CommandType.TOP, - topByExprs, - rareTopNField), + lookup( + rareTopN( + sort( + eval( + remove( + rename( + aggregation( + filter(values(emptyList()), filterExpr), + aggregators, + groupByExprs), + mappings), + exclude), + newEvalField), + sortField), + CommandType.TOP, + topByExprs, + rareTopNField), + "lookup_index_name", + Map.of(), + false, + Map.of()), dedupeField), limit, offset), @@ -152,24 +158,30 @@ public void visit_should_return_default_physical_operator() { PhysicalPlanDSL.nested( PhysicalPlanDSL.limit( PhysicalPlanDSL.dedupe( - PhysicalPlanDSL.rareTopN( - PhysicalPlanDSL.sort( - PhysicalPlanDSL.eval( - PhysicalPlanDSL.remove( - PhysicalPlanDSL.rename( - PhysicalPlanDSL.agg( - PhysicalPlanDSL.filter( - PhysicalPlanDSL.values(emptyList()), - filterExpr), - aggregators, - groupByExprs), - mappings), - exclude), - newEvalField), - sortField), - CommandType.TOP, - topByExprs, - rareTopNField), + PhysicalPlanDSL.lookup( + PhysicalPlanDSL.rareTopN( + PhysicalPlanDSL.sort( + PhysicalPlanDSL.eval( + PhysicalPlanDSL.remove( + PhysicalPlanDSL.rename( + PhysicalPlanDSL.agg( + PhysicalPlanDSL.filter( + PhysicalPlanDSL.values(emptyList()), + filterExpr), + aggregators, + groupByExprs), + mappings), + exclude), + newEvalField), + sortField), + CommandType.TOP, + topByExprs, + rareTopNField), + "lookup_index_name", + Map.of(), + false, + Map.of(), + null), dedupeField), limit, offset), @@ -278,4 +290,22 @@ public void visitPaginate_should_remove_it_from_tree() { new ProjectOperator(new ValuesOperator(List.of(List.of())), List.of(), List.of()); assertEquals(physicalPlanTree, logicalPlanTree.accept(implementor, null)); } + + @Test + public void visitLookup_should_build_LookupOperator() { + LogicalPlan values = values(List.of(DSL.literal("to be or not to be"))); + var logicalPlan = lookup(values, "lookup_index_name", Map.of(), false, Map.of()); + var expectedPhysicalPlan = + PhysicalPlanDSL.lookup( + new ValuesOperator(List.of(List.of(DSL.literal("to be or not to be")))), + "lookup_index_name", + Map.of(), + false, + Map.of(), + null); + + PhysicalPlan lookupOperator = logicalPlan.accept(implementor, null); + + assertEquals(expectedPhysicalPlan, lookupOperator); + } } diff --git a/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java b/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java index f212749f48..f5719fa5c8 100644 --- a/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java @@ -125,6 +125,7 @@ public TableWriteOperator build(PhysicalPlan child) { LogicalPlan ad = new LogicalAD(relation, Map.of()); LogicalPlan ml = new LogicalML(relation, Map.of()); LogicalPlan paginate = new LogicalPaginate(42, List.of(relation)); + LogicalPlan lookup = new LogicalLookup(relation, "lookup_index", Map.of(), true, Map.of()); List> nestedArgs = List.of( @@ -163,7 +164,8 @@ public TableWriteOperator build(PhysicalPlan child) { paginate, nested, cursor, - closeCursor) + closeCursor, + lookup) .map(Arguments::of); } @@ -214,5 +216,14 @@ public Integer visitRareTopN(LogicalRareTopN plan, Object context) { .mapToInt(Integer::intValue) .sum(); } + + @Override + public Integer visitLookup(LogicalLookup plan, Object context) { + return 1 + + plan.getChild().stream() + .map(child -> child.accept(this, context)) + .mapToInt(Integer::intValue) + .sum(); + } } } diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/LookupOperatorTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/LookupOperatorTest.java new file mode 100644 index 0000000000..2da1811e86 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/planner/physical/LookupOperatorTest.java @@ -0,0 +1,669 @@ +package org.opensearch.sql.planner.physical; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; +import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; +import static org.opensearch.sql.utils.MatcherUtils.containsNull; +import static org.opensearch.sql.utils.MatcherUtils.containsValue; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.BiFunction; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.stubbing.Answer; +import org.opensearch.sql.data.model.ExprStringValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.model.ExprValueUtils; +import org.opensearch.sql.expression.ReferenceExpression; + +@ExtendWith(MockitoExtension.class) +public class LookupOperatorTest extends PhysicalPlanTestBase { + + public static final String LOOKUP_INDEX = "lookup_index"; + + public static final ImmutableList> LOOKUP_TABLE = + ImmutableList.of( + ImmutableMap.of( + "id", 1, "ip", "v4", "ip_v4", "112.111.162.4", "region", "USA", "class", "A"), + ImmutableMap.of( + "id", 2, "ip", "4", "ip_v4", "74.125.19.106", "region", "EU", "class", "A")); + + public static final ImmutableList> LOOKUP_TABLE_WITH_NULLS = + ImmutableList.of( + new HashMap<>( + ImmutableMap.of( + "id", 1, "ip", "v4", "ip_v4", "112.111.162.4", "region", "USA", "class", "A")) { + { + put("class", null); + } + }, + ImmutableMap.of( + "id", 2, "ip", "4", "ip_v4", "74.125.19.106", "region", "EU", "class", "A")); + + @Mock private BiFunction, Map> lookupFunction; + + @Test + public void lookup_empty_table() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip", Collections.emptyList())); + PhysicalPlan plan = + new LookupOperator( + new TestScan(), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "112.111.162.4", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "74.125.19.106", + "action", + "POST", + "response", + 200, + "referer", + "www.google.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of("ip", "74.125.19.106", "action", "POST", "response", 500)))); + } + + @Test + public void lookup_append_only_true() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + PhysicalPlan plan = + new LookupOperator( + new TestScan(), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + } + + @Test + public void lookup_append_only_false() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + PhysicalPlan plan = + new LookupOperator( + new TestScan(), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + false, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip", "v4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip", "4"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip", "4"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU")))); + } + + @Test + public void lookup_copy_one_field() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + PhysicalPlan plan = + new LookupOperator( + new TestScan(), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of( + new ReferenceExpression("class", STRING), + new ReferenceExpression("ip_address_class", STRING)), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("ip_address_class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("ip_address_class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("ip_address_class", "A")))); + } + + @Test + public void lookup_copy_multiple_fields() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + PhysicalPlan plan = + new LookupOperator( + new TestScan(), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of( + new ReferenceExpression("class", STRING), + new ReferenceExpression("class", STRING), + new ReferenceExpression("id", INTEGER), + new ReferenceExpression("address_id", INTEGER)), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("class", "A"), + containsValue("address_id", 1)))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("class", "A"), + containsValue("address_id", 2)))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("class", "A"), + containsValue("address_id", 2)))); + } + + @Test + public void lookup_empty_join_field() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + List queryResults = + new ImmutableList.Builder() + .addAll(inputs) + .add( + ExprValueUtils.tupleValue( + ImmutableMap.of("action", "GET", "response", 200, "referer", "www.amazon.com"))) + .build(); + PhysicalPlan plan = + new LookupOperator( + testScan(queryResults), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(6)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of("action", "GET", "response", 200, "referer", "www.amazon.com")))); + } + + @Test + public void lookup_ignore_non_struct_input() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + ExprStringValue stringExpression = + new ExprStringValue("Expression of string type should be ignored"); + List queryResults = + new ImmutableList.Builder().addAll(inputs).add(stringExpression).build(); + PhysicalPlan plan = + new LookupOperator( + testScan(queryResults), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(6)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + assertThat(result, hasItem(stringExpression)); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void lookup_table_with_nulls(boolean appendOnly) { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE_WITH_NULLS)); + PhysicalPlan plan = + new LookupOperator( + new TestScan(), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + appendOnly, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA"), + containsNull("class")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + } + + private static @NotNull Answer> lookupTableQueryResults( + String lookupTableFieldName, List> lookupTableContent) { + return invocationOnMock -> { + String lookupTableName = invocationOnMock.getArgument(0); + if (!LOOKUP_INDEX.equals(lookupTableName)) { + return ImmutableMap.of(); + } + HashMap> parameters = invocationOnMock.getArgument(1); + String valueOfJoinFieldInLookupTable = + (String) parameters.get("_match").get(lookupTableFieldName); + if (Objects.isNull(valueOfJoinFieldInLookupTable)) { + return null; + } + return lookupTableContent.stream() + .filter(map -> valueOfJoinFieldInLookupTable.equals(map.get(lookupTableFieldName))) + .findAny() + .orElse(ImmutableMap.of()); + }; + } +} diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java index c91ae8787c..de91f93b9a 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java @@ -17,6 +17,7 @@ import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.eval; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.filter; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.limit; +import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.lookup; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.project; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.rareTopN; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.remove; @@ -64,7 +65,16 @@ public void print_physical_plan() { agg( rareTopN( filter( - limit(new TestScan(), 1, 1), + limit( + lookup( + new TestScan(), + "lookup_index", + Map.of(), + true, + Map.of(), + null), + 1, + 1), DSL.equal(DSL.ref("response", INTEGER), DSL.literal(10))), CommandType.TOP, ImmutableList.of(), @@ -84,7 +94,8 @@ public void print_physical_plan() { + "\t\t\tAggregation->\n" + "\t\t\t\tRareTopN->\n" + "\t\t\t\t\tFilter->\n" - + "\t\t\t\t\t\tLimit->", + + "\t\t\t\t\t\tLimit->\n" + + "\t\t\t\t\t\t\tLookup->", printer.print(plan)); } @@ -131,6 +142,8 @@ public static Stream getPhysicalPlanForTest() { PhysicalPlan cursorClose = new CursorCloseOperator(plan); + PhysicalPlan lookup = lookup(plan, "lookup_index", Map.of(), false, Map.of(), null); + return Stream.of( Arguments.of(filter, "filter"), Arguments.of(aggregation, "aggregation"), @@ -145,7 +158,8 @@ public static Stream getPhysicalPlanForTest() { Arguments.of(rareTopN, "rareTopN"), Arguments.of(limit, "limit"), Arguments.of(nested, "nested"), - Arguments.of(cursorClose, "cursorClose")); + Arguments.of(cursorClose, "cursorClose"), + Arguments.of(lookup, "Lookup")); } @ParameterizedTest(name = "{1}") @@ -219,6 +233,11 @@ public String visitLimit(LimitOperator node, Integer tabs) { return name(node, "Limit->", tabs); } + @Override + public String visitLookup(LookupOperator node, Integer tabs) { + return name(node, "Lookup->", tabs); + } + private String name(PhysicalPlan node, String current, int tabs) { String child = node.getChild().get(0).accept(this, tabs + 1); StringBuilder sb = new StringBuilder(); diff --git a/core/src/test/java/org/opensearch/sql/utils/MatcherUtils.java b/core/src/test/java/org/opensearch/sql/utils/MatcherUtils.java index 206f05a38a..6b0d5e2ac7 100644 --- a/core/src/test/java/org/opensearch/sql/utils/MatcherUtils.java +++ b/core/src/test/java/org/opensearch/sql/utils/MatcherUtils.java @@ -5,7 +5,9 @@ package org.opensearch.sql.utils; +import java.util.Objects; import org.hamcrest.Description; +import org.hamcrest.TypeSafeDiagnosingMatcher; import org.hamcrest.TypeSafeMatcher; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.type.ExprCoreType; @@ -41,4 +43,77 @@ protected boolean matchesSafely(ExprValue value) { } }; } + + public static TypeSafeDiagnosingMatcher containsValue(String key, Object value) { + Objects.requireNonNull(key, "Key is required"); + Objects.requireNonNull(value, "Value is required"); + return new TypeSafeDiagnosingMatcher<>() { + + @Override + protected boolean matchesSafely(ExprValue item, Description mismatchDescription) { + ExprValue expressionForKey = item.keyValue(key); + if (Objects.isNull(expressionForKey)) { + mismatchDescription + .appendValue(item) + .appendText(" does not contain key ") + .appendValue(key); + return false; + } + Object givenValue = expressionForKey.value(); + if (value.equals(givenValue)) { + return true; + } + mismatchDescription + .appendText(" value for key ") + .appendValue(key) + .appendText(" was ") + .appendValue(givenValue); + return false; + } + + @Override + public void describeTo(Description description) { + description + .appendText("ExprValue should contain key ") + .appendValue(key) + .appendText(" with string value ") + .appendValue(value); + } + }; + } + + public static TypeSafeDiagnosingMatcher containsNull(String key) { + Objects.requireNonNull(key, "Key is required"); + return new TypeSafeDiagnosingMatcher<>() { + + @Override + protected boolean matchesSafely(ExprValue item, Description mismatchDescription) { + ExprValue expressionForKey = item.keyValue(key); + if (Objects.isNull(expressionForKey)) { + mismatchDescription + .appendValue(item) + .appendText(" does not contain key ") + .appendValue(key); + return false; + } + if (expressionForKey.isNull()) { + return true; + } + mismatchDescription + .appendText(" value for key ") + .appendValue(key) + .appendText(" was ") + .appendValue(expressionForKey.value()); + return false; + } + + @Override + public void describeTo(Description description) { + description + .appendText("ExprValue should contain key ") + .appendValue(key) + .appendText(" with null value "); + } + }; + } }