Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lookup simplified analyze copy fields #2809

Prev Previous commit
Next Next commit
Tests related to lookup and Analyzer class.
Signed-off-by: Lukasz Soszynski <[email protected]>
lukasz-soszynski-eliatra authored and salyh committed Jun 28, 2024
commit 24cf6df3085cc5edf402364f29b1859d53a8a00f
39 changes: 25 additions & 14 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
@@ -512,8 +512,8 @@ public LogicalPlan visitDedupe(Dedupe node, AnalysisContext context) {

/** Build {@link LogicalLookup}. */
@Override
public LogicalPlan visitLookup(Lookup node, AnalysisContext context) {
LogicalPlan child = node.getChild().get(0).accept(this, context);
public LogicalPlan visitLookup(Lookup node, AnalysisContext queryContext) {
LogicalPlan child = node.getChild().get(0).accept(this, queryContext);
List<Argument> options = node.getOptions();
// Todo, refactor the option.
Boolean appendOnly = (Boolean) options.get(0).getValue().getValue();
@@ -529,29 +529,40 @@ public LogicalPlan visitLookup(Lookup node, AnalysisContext context) {
String.format("no such lookup index %s", node.getIndexName()));
}

AnalysisContext lookupTableContext = new AnalysisContext();
TypeEnvironment lookupTableEnvironment = lookupTableContext.peek();
table
.getFieldTypes()
.forEach(
(name, type) ->
lookupTableEnvironment.define(new Symbol(Namespace.FIELD_NAME, name), type));
ImmutableMap<ReferenceExpression, ReferenceExpression> matchFieldMap =
analyzeLookupMatchFields(node.getMatchFieldList(), queryContext, lookupTableContext);

return new LogicalLookup(
child,
node.getIndexName(),
analyzeLookupMatchFields(node.getMatchFieldList(), context),
matchFieldMap,
appendOnly,
analyzeLookupCopyFields(node.getCopyFieldList(), context, table));
analyzeLookupCopyFields(node.getCopyFieldList(), queryContext, table));
}

private ImmutableMap<ReferenceExpression, ReferenceExpression> analyzeLookupMatchFields(
List<Map> inputMap, AnalysisContext context) {
List<Map> inputMap, AnalysisContext queryContext, AnalysisContext lookupTableContext) {
ImmutableMap.Builder<ReferenceExpression, ReferenceExpression> copyMapBuilder =
new ImmutableMap.Builder<>();
for (Map resultMap : inputMap) {
Expression origin = expressionAnalyzer.analyze(resultMap.getOrigin(), context);
Expression origin = expressionAnalyzer.analyze(resultMap.getOrigin(), lookupTableContext);
if (resultMap.getTarget() instanceof Field) {
ReferenceExpression target =
new ReferenceExpression(
((Field) resultMap.getTarget()).getField().toString(), origin.type());
ReferenceExpression originExpr = DSL.ref(origin.toString(), origin.type());
TypeEnvironment curEnv = context.peek();
curEnv.remove(originExpr);
curEnv.define(target);
copyMapBuilder.put(originExpr, target);
Expression targerExpression =
expressionAnalyzer.analyze(resultMap.getTarget(), queryContext);
ReferenceExpression targetReference =
DSL.ref(targerExpression.toString(), targerExpression.type());
ReferenceExpression originReference = DSL.ref(origin.toString(), origin.type());
TypeEnvironment curEnv = queryContext.peek();
curEnv.remove(originReference);
curEnv.define(targetReference);
copyMapBuilder.put(originReference, targetReference);
} else {
throw new SemanticCheckException(
String.format("the target expected to be field, but is %s", resultMap.getTarget()));
Original file line number Diff line number Diff line change
@@ -138,4 +138,13 @@ public LogicalPlan values(List<LiteralExpression>... values) {
public static LogicalPlan limit(LogicalPlan input, Integer limit, Integer offset) {
return new LogicalLimit(input, limit, offset);
}

public static LogicalPlan lookup(
LogicalPlan input,
String indexName,
Map<ReferenceExpression, ReferenceExpression> matchFieldMap,
boolean appendOnly,
Map<ReferenceExpression, ReferenceExpression> copyFields) {
return new LogicalLookup(input, indexName, matchFieldMap, appendOnly, copyFields);
}
}
348 changes: 348 additions & 0 deletions core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java
Original file line number Diff line number Diff line change
@@ -70,6 +70,8 @@
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.sql.ast.dsl.AstDSL;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.DataType;
@@ -82,11 +84,14 @@
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.FetchCursor;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.RareTopN.CommandType;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.expression.DSL;
@@ -105,10 +110,94 @@
import org.opensearch.sql.planner.logical.LogicalPlanDSL;
import org.opensearch.sql.planner.logical.LogicalProject;
import org.opensearch.sql.planner.logical.LogicalRelation;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.datasource.DataSourceTable;
import org.opensearch.sql.storage.StorageEngine;
import org.opensearch.sql.storage.Table;

class AnalyzerTest extends AnalyzerTestBase {

private static final String LOOKUP_TABLE_DEVICE_NAMES = "device_names";
public static final String NO_SUCH_TABLE = "no_such_table";
public static final String TABLE_DOES_NOT_EXIST = "does_not_exist";

public static Map<String, ExprType> lookupTableFieldTypes =
new ImmutableMap.Builder<String, ExprType>()
.put("id", INTEGER)
.put("dev_id", STRING)
.put("serial_number", LONG)
.put("vendor", STRING)
.put("ip_v4", STRING)
.put("firmware_version", LONG)
.put("reliability_factor", DOUBLE)
.put("comment", ExprCoreType.STRING)
.build();

private final Table deviceNamesLookupTable;
private Table tableDoesNotExist;

public AnalyzerTest() {
this.deviceNamesLookupTable =
new Table() {
@Override
public boolean exists() {
return true;
}

@Override
public void create(Map<String, ExprType> schema) {
throw new UnsupportedOperationException("Create table is not supported");
}

@Override
public Map<String, ExprType> getFieldTypes() {
return lookupTableFieldTypes;
}

@Override
public PhysicalPlan implement(LogicalPlan plan) {
throw new UnsupportedOperationException();
}

public Map<String, ExprType> getReservedFieldTypes() {
return ImmutableMap.of("_test", STRING);
}
};
this.tableDoesNotExist =
new Table() {

@Override
public boolean exists() {
return false;
}

@Override
public Map<String, ExprType> getFieldTypes() {
return Map.of();
}

@Override
public PhysicalPlan implement(LogicalPlan plan) {
throw new UnsupportedOperationException();
}
};
}

protected StorageEngine storageEngine() {
return (dataSourceSchemaName, tableName) -> {
switch (tableName) {
case NO_SUCH_TABLE:
return null;
case TABLE_DOES_NOT_EXIST:
return tableDoesNotExist;
case LOOKUP_TABLE_DEVICE_NAMES:
return deviceNamesLookupTable;
default:
return table;
}
};
}

@Test
public void filter_relation() {
assertAnalyzeEqual(
@@ -1767,4 +1856,263 @@ public void visit_close_cursor() {
() ->
assertEquals("pewpew", ((LogicalFetchCursor) analyzed.getChild().get(0)).getCursor()));
}

@Test
public void visit_lookup_same_field_name_in_query_and_lookup_index() {
assertAnalyzeEqual(
LogicalPlanDSL.lookup(
LogicalPlanDSL.relation("schema", table),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableMap.of(
new ReferenceExpression("ip_v4", STRING),
new ReferenceExpression("field_value2", STRING)),
false,
Collections.emptyMap()),
AstDSL.lookup(
AstDSL.relation("schema"),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableList.of(AstDSL.map("ip_v4", "field_value2")),
ImmutableList.of(AstDSL.argument("appendonly", AstDSL.booleanLiteral(false))),
Collections.emptyList()));
}

@Test
public void visit_lookup_use_multiple_fields() {
assertAnalyzeEqual(
LogicalPlanDSL.lookup(
LogicalPlanDSL.relation("schema", table),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableMap.of(
new ReferenceExpression("comment", STRING),
new ReferenceExpression("comment", STRING),
new ReferenceExpression("dev_id", STRING),
new ReferenceExpression("field_value1", STRING)),
false,
Collections.emptyMap()),
AstDSL.lookup(
AstDSL.relation("schema"),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableList.of(
AstDSL.map("comment", "comment"), AstDSL.map("dev_id", "field_value1")),
ImmutableList.of(AstDSL.argument("appendonly", AstDSL.booleanLiteral(false))),
Collections.emptyList()));
}

@Test
public void visit_lookup_various_field_name_in_query_and_lookup_index() {
assertAnalyzeEqual(
LogicalPlanDSL.lookup(
LogicalPlanDSL.relation("schema", table),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableMap.of(
new ReferenceExpression("dev_id", STRING),
new ReferenceExpression("comment", STRING)),
true,
Collections.emptyMap()),
AstDSL.lookup(
AstDSL.relation("schema"),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableList.of(AstDSL.map("dev_id", "comment")),
ImmutableList.of(AstDSL.argument("appendonly", AstDSL.booleanLiteral(true))),
Collections.emptyList()));
}

@ParameterizedTest
@ValueSource(strings = {NO_SUCH_TABLE, TABLE_DOES_NOT_EXIST})
public void visit_lookup_should_report_error_when_lookup_table_does_not_exist(String tableName) {
Lookup lookup =
AstDSL.lookup(
relation("schema"),
tableName,
ImmutableList.of(AstDSL.map("string_value", "comment")),
ImmutableList.of(argument("appendonly", booleanLiteral(true))),
emptyList());

assertThrows(SemanticCheckException.class, () -> analyze(lookup));
}

@Test
public void visit_lookup_should_report_error_when_join_field_does_not_exist() {
Lookup lookup =
AstDSL.lookup(
relation("schema"),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableList.of(AstDSL.map("no_such_field", "comment")),
ImmutableList.of(argument("appendonly", booleanLiteral(true))),
emptyList());

assertThrows(SemanticCheckException.class, () -> analyze(lookup));
}

@Test
public void visit_lookup_should_report_error_when_join_field_does_not_exist_in_lookup_table() {
Lookup lookup =
AstDSL.lookup(
relation("schema"),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableList.of(AstDSL.map("no_such_field", "comment")),
ImmutableList.of(argument("appendonly", booleanLiteral(true))),
emptyList());

assertThrows(SemanticCheckException.class, () -> analyze(lookup));
}

@Test
public void visit_lookup_should_report_error_when_join_field_does_not_exist_in_query_table() {
Lookup lookup =
AstDSL.lookup(
relation("schema"),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableList.of(AstDSL.map("ip_v4", "no_such_field")),
ImmutableList.of(argument("appendonly", booleanLiteral(true))),
emptyList());

assertThrows(SemanticCheckException.class, () -> analyze(lookup));
}

@Test
public void
visit_lookup_should_error_when_join_field_does_not_exist_in_query_table_but_exist_lookup_table() {
Lookup lookup =
AstDSL.lookup(
relation("schema"),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableList.of(AstDSL.map("ip_v4", "dev_id")),
ImmutableList.of(argument("appendonly", booleanLiteral(true))),
emptyList());

assertThrows(SemanticCheckException.class, () -> analyze(lookup));
}

@Test
public void
visit_lookup_should_error_when_join_field_does_not_exist_in_lookup_table_but_exist_query_table() {
Lookup lookup =
AstDSL.lookup(
relation("schema"),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableList.of(AstDSL.map("string_value", "field_value2")),
ImmutableList.of(argument("appendonly", booleanLiteral(true))),
emptyList());

assertThrows(SemanticCheckException.class, () -> analyze(lookup));
}

@Test
public void visit_lookup_should_report_error_when_target_expression_point_out_function() {
Lookup lookup =
AstDSL.lookup(
relation("schema"),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableList.of(AstDSL.map(field("ip_v4"), intLiteral(7))),
ImmutableList.of(argument("appendonly", booleanLiteral(false))),
emptyList());

assertThrows(SemanticCheckException.class, () -> analyze(lookup));
}

@Test
public void visit_lookup_copy_field() {
assertAnalyzeEqual(
LogicalPlanDSL.lookup(
LogicalPlanDSL.relation("schema", table),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableMap.of(
new ReferenceExpression("ip_v4", STRING),
new ReferenceExpression("field_value2", STRING)),
false,
ImmutableMap.of(
new ReferenceExpression("vendor", STRING),
new ReferenceExpression("maker", STRING))),
AstDSL.lookup(
AstDSL.relation("schema"),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableList.of(AstDSL.map("ip_v4", "field_value2")),
ImmutableList.of(AstDSL.argument("appendonly", AstDSL.booleanLiteral(false))),
ImmutableList.of(AstDSL.map("vendor", "maker"))));
}

@Test
public void visit_lookup_copy_multiple_fields() {
assertAnalyzeEqual(
LogicalPlanDSL.lookup(
LogicalPlanDSL.relation("schema", table),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableMap.of(
new ReferenceExpression("ip_v4", STRING),
new ReferenceExpression("field_value2", STRING)),
false,
ImmutableMap.of(
new ReferenceExpression("vendor", STRING),
new ReferenceExpression("maker", STRING),
new ReferenceExpression("serial_number", LONG),
new ReferenceExpression("serial", LONG))),
AstDSL.lookup(
AstDSL.relation("schema"),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableList.of(AstDSL.map("ip_v4", "field_value2")),
ImmutableList.of(AstDSL.argument("appendonly", AstDSL.booleanLiteral(false))),
ImmutableList.of(
AstDSL.map("vendor", "maker"), AstDSL.map("serial_number", "serial"))));
}

@Test
public void visit_lookup_copy_field_when_origin_and_target_is_the_same() {
assertAnalyzeEqual(
LogicalPlanDSL.lookup(
LogicalPlanDSL.relation("schema", table),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableMap.of(
new ReferenceExpression("ip_v4", STRING),
new ReferenceExpression("field_value2", STRING)),
false,
ImmutableMap.of(
new ReferenceExpression("vendor", STRING),
new ReferenceExpression("vendor", STRING))),
AstDSL.lookup(
AstDSL.relation("schema"),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableList.of(AstDSL.map("ip_v4", "field_value2")),
ImmutableList.of(AstDSL.argument("appendonly", AstDSL.booleanLiteral(false))),
ImmutableList.of(AstDSL.map("vendor", "vendor"))));
}

@Test
public void visit_lookup_should_report_error_when_copy_field_does_not_exist() {
Lookup lookup =
AstDSL.lookup(
relation("schema"),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableList.of(AstDSL.map("ip_v4", "field_value2")),
ImmutableList.of(argument("appendonly", booleanLiteral(false))),
ImmutableList.of(AstDSL.map("no_such_field", "maker")));

assertThrows(SemanticCheckException.class, () -> analyze(lookup));
}

@Test
public void visit_lookup_should_report_error_when_copy_target_is_not_a_field() {
Lookup lookup =
AstDSL.lookup(
relation("schema"),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableList.of(AstDSL.map("ip_v4", "field_value2")),
ImmutableList.of(argument("appendonly", booleanLiteral(false))),
ImmutableList.of(AstDSL.map(AstDSL.field("vendor"), AstDSL.intLiteral(8))));

assertThrows(SemanticCheckException.class, () -> analyze(lookup));
}

@Test
public void visit_lookup_should_report_error_when_copy_source_is_not_a_field() {
Lookup lookup =
AstDSL.lookup(
relation("schema"),
LOOKUP_TABLE_DEVICE_NAMES,
ImmutableList.of(AstDSL.map("ip_v4", "field_value2")),
ImmutableList.of(argument("appendonly", booleanLiteral(false))),
ImmutableList.of(AstDSL.map(AstDSL.booleanLiteral(true), AstDSL.field("maker"))));

assertThrows(SemanticCheckException.class, () -> analyze(lookup));
}
}