Skip to content

Commit

Permalink
ESQL: Small LOOKUP JOIN cleanups (elastic#117922)
Browse files Browse the repository at this point in the history
* Simplify InsertFieldExtraction
* Fix wrong error message when looping ResolveRefs
* Add basic verification tests
* Add check for data type mismatch
  • Loading branch information
alex-spies authored Dec 5, 2024
1 parent 14d9e7c commit e4defca
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,19 @@ left:keyword | client_ip:keyword | right:keyword | env:keyword
left | 172.21.0.5 | right | Development
;

lookupIPFromRowWithShadowingKeepReordered
required_capability: join_lookup_v4

ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right"
| EVAL client_ip = client_ip::keyword
| LOOKUP JOIN clientips_lookup ON client_ip
| KEEP right, env, client_ip
;

right:keyword | env:keyword | client_ip:keyword
right | Development | 172.21.0.5
;

lookupIPFromIndex
required_capability: join_lookup_v4

Expand Down Expand Up @@ -263,6 +276,24 @@ ignoreOrder:true;
2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3 | Success
;

lookupMessageFromIndexKeepReordered
required_capability: join_lookup_v4

FROM sample_data
| LOOKUP JOIN message_types_lookup ON message
| KEEP type, client_ip, event_duration, message
;

type:keyword | client_ip:ip | event_duration:long | message:keyword
Success | 172.21.3.15 | 1756467 | Connected to 10.1.0.1
Error | 172.21.3.15 | 5033755 | Connection error
Error | 172.21.3.15 | 8268153 | Connection error
Error | 172.21.3.15 | 725448 | Connection error
Disconnected | 172.21.0.5 | 1232382 | Disconnected
Success | 172.21.2.113 | 2764889 | Connected to 10.1.0.2
Success | 172.21.2.162 | 3450233 | Connected to 10.1.0.3
;

lookupMessageFromIndexStats
required_capability: join_lookup_v4

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,9 +633,10 @@ private Join resolveLookupJoin(LookupJoin join) {

config = new JoinConfig(coreJoin, leftKeys, leftKeys, rightKeys);
join = new LookupJoin(join.source(), join.left(), join.right(), config);
}
// everything else is unsupported for now
else {
} else if (type != JoinTypes.LEFT) {
// everything else is unsupported for now
// LEFT can only happen by being mapped from a USING above. So we need to exclude this as well because this rule can be run
// more than once.
UnresolvedAttribute errorAttribute = new UnresolvedAttribute(join.source(), "unsupported", "Unsupported join type");
// add error message
return join.withConfig(new JoinConfig(type, singletonList(errorAttribute), emptyList(), emptyList()));
Expand All @@ -651,7 +652,7 @@ private List<Attribute> resolveUsingColumns(List<Attribute> cols, List<Attribute
if (resolvedCol instanceof UnresolvedAttribute ucol) {
String message = ua.unresolvedMessage();
String match = "column [" + ucol.name() + "]";
resolvedCol = ucol.withUnresolvedMessage(message.replace(match, match + "in " + side + " side of join"));
resolvedCol = ucol.withUnresolvedMessage(message.replace(match, match + " in " + side + " side of join"));
}
resolved.add(resolvedCol);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ public AnalyzerContext(
IndexResolution indexResolution,
EnrichResolution enrichResolution
) {
this(configuration, functionRegistry, indexResolution, IndexResolution.invalid("<none>"), enrichResolution);
this(
configuration,
functionRegistry,
indexResolution,
IndexResolution.invalid("AnalyzerContext constructed without any lookup join resolution"),
enrichResolution
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.esql.analysis;

import org.elasticsearch.index.IndexMode;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
import org.elasticsearch.xpack.esql.common.Failure;
Expand Down Expand Up @@ -55,7 +54,8 @@
import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
import org.elasticsearch.xpack.esql.plan.logical.Row;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
import org.elasticsearch.xpack.esql.stats.FeatureMetric;
import org.elasticsearch.xpack.esql.stats.Metrics;

Expand Down Expand Up @@ -172,20 +172,6 @@ else if (p instanceof Lookup lookup) {
else {
lookup.matchFields().forEach(unresolvedExpressions);
}
} else if (p instanceof LookupJoin lj) {
// expect right side to always be a lookup index
lj.right().forEachUp(EsRelation.class, r -> {
if (r.indexMode() != IndexMode.LOOKUP) {
failures.add(
fail(
r,
"LOOKUP JOIN right side [{}] must be a lookup index (index_mode=lookup, not [{}]",
r.index().name(),
r.indexMode().getName()
)
);
}
});
}

else {
Expand Down Expand Up @@ -217,6 +203,7 @@ else if (p instanceof Lookup lookup) {
checkSort(p, failures);

checkFullTextQueryFunctions(p, failures);
checkJoin(p, failures);
});
checkRemoteEnrich(plan, failures);
checkMetadataScoreNameReserved(plan, failures);
Expand Down Expand Up @@ -791,6 +778,35 @@ private static void checkNotPresentInDisjunctions(
});
}

/**
* Checks Joins for invalid usage.
*
* @param plan root plan to check
* @param failures failures found
*/
private static void checkJoin(LogicalPlan plan, Set<Failure> failures) {
if (plan instanceof Join join) {
JoinConfig config = join.config();
for (int i = 0; i < config.leftFields().size(); i++) {
Attribute leftField = config.leftFields().get(i);
Attribute rightField = config.rightFields().get(i);
if (leftField.dataType() != rightField.dataType()) {
failures.add(
fail(
leftField,
"JOIN left field [{}] of type [{}] is incompatible with right field [{}] of type [{}]",
leftField.name(),
leftField.dataType(),
rightField.name(),
rightField.dataType()
)
);
}
}

}
}

/**
* Checks full text query functions for invalid usage.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
import org.elasticsearch.xpack.esql.core.expression.TypedAttribute;
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.LeafExec;
import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.rule.Rule;

Expand Down Expand Up @@ -102,25 +100,17 @@ private static Set<Attribute> missingAttributes(PhysicalPlan p) {
var missing = new LinkedHashSet<Attribute>();
var input = p.inputSet();

// For LOOKUP JOIN we only need field-extraction on left fields used to match, since the right side is always materialized
if (p instanceof LookupJoinExec join) {
join.leftFields().forEach(f -> {
if (input.contains(f) == false) {
missing.add(f);
}
});
return missing;
}

// collect field attributes used inside expressions
// TODO: Rather than going over all expressions manually, this should just call .references()
p.forEachExpression(TypedAttribute.class, f -> {
// Collect field attributes referenced by this plan but not yet present in the child's output.
// This is also correct for LookupJoinExec, where we only need field extraction on the left fields used to match, since the right
// side is always materialized.
p.references().forEach(f -> {
if (f instanceof FieldAttribute || f instanceof MetadataAttribute) {
if (input.contains(f) == false) {
missing.add(f);
}
}
});

return missing;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -565,21 +565,12 @@ private PhysicalOperation planHashJoin(HashJoinExec join, LocalExecutionPlannerC

private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlannerContext context) {
PhysicalOperation source = plan(join.left(), context);
// TODO: The source builder includes incoming fields including the ones we're going to drop
Layout.Builder layoutBuilder = source.layout.builder();
for (Attribute f : join.addedFields()) {
layoutBuilder.append(f);
}
Layout layout = layoutBuilder.build();

// TODO: this works when the join happens on the coordinator
/*
* But when it happens on the data node we get a
* \_FieldExtractExec[language_code{f}#15, language_name{f}#16]<[]>
* \_EsQueryExec[languages_lookup], indexMode[lookup], query[][_doc{f}#18], limit[], sort[] estimatedRowSize[62]
* Which we'd prefer not to do - at least for now. We already know the fields we're loading
* and don't want any local planning.
*/
EsQueryExec localSourceExec = (EsQueryExec) join.lookup();
if (localSourceExec.indexMode() != IndexMode.LOOKUP) {
throw new IllegalArgumentException("can't plan [" + join + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,10 +374,11 @@ private void preAnalyzeLookupIndices(List<TableInfo> indices, ListenerResult lis
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
indexResolver.resolveAsMergedMapping(
table.index(),
Set.of("*"), // Current LOOKUP JOIN syntax does not allow for field selection
Set.of("*"), // TODO: for LOOKUP JOIN, this currently declares all lookup index fields relevant and might fetch too many.
null,
listener.map(indexResolution -> listenerResult.withLookupIndexResolution(indexResolution))
);
// TODO: Verify that the resolved index actually has indexMode: "lookup"
} else {
try {
// No lookup indices specified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static Analyzer defaultAnalyzer() {
}

public static Analyzer expandedDefaultAnalyzer() {
return analyzer(analyzerExpandedDefaultMapping());
return analyzer(expandedDefaultIndexResolution());
}

public static Analyzer analyzer(IndexResolution indexResolution) {
Expand All @@ -47,18 +47,33 @@ public static Analyzer analyzer(IndexResolution indexResolution) {

public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier) {
return new Analyzer(
new AnalyzerContext(EsqlTestUtils.TEST_CFG, new EsqlFunctionRegistry(), indexResolution, defaultEnrichResolution()),
new AnalyzerContext(
EsqlTestUtils.TEST_CFG,
new EsqlFunctionRegistry(),
indexResolution,
defaultLookupResolution(),
defaultEnrichResolution()
),
verifier
);
}

public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier, Configuration config) {
return new Analyzer(new AnalyzerContext(config, new EsqlFunctionRegistry(), indexResolution, defaultEnrichResolution()), verifier);
return new Analyzer(
new AnalyzerContext(config, new EsqlFunctionRegistry(), indexResolution, defaultLookupResolution(), defaultEnrichResolution()),
verifier
);
}

public static Analyzer analyzer(Verifier verifier) {
return new Analyzer(
new AnalyzerContext(EsqlTestUtils.TEST_CFG, new EsqlFunctionRegistry(), analyzerDefaultMapping(), defaultEnrichResolution()),
new AnalyzerContext(
EsqlTestUtils.TEST_CFG,
new EsqlFunctionRegistry(),
analyzerDefaultMapping(),
defaultLookupResolution(),
defaultEnrichResolution()
),
verifier
);
}
Expand Down Expand Up @@ -98,10 +113,14 @@ public static IndexResolution analyzerDefaultMapping() {
return loadMapping("mapping-basic.json", "test");
}

public static IndexResolution analyzerExpandedDefaultMapping() {
public static IndexResolution expandedDefaultIndexResolution() {
return loadMapping("mapping-default.json", "test");
}

public static IndexResolution defaultLookupResolution() {
return loadMapping("mapping-languages.json", "languages_lookup");
}

public static EnrichResolution defaultEnrichResolution() {
EnrichResolution enrichResolution = new EnrichResolution();
loadEnrichPolicyResolution(enrichResolution, MATCH_TYPE, "languages", "language_code", "languages_idx", "mapping-languages.json");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.analysis.IndexAnalyzers;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.LoadMapping;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
Expand Down Expand Up @@ -73,6 +74,8 @@
import static org.elasticsearch.xpack.esql.analysis.Analyzer.NO_FIELDS;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyze;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyzer;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyzerDefaultMapping;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultEnrichResolution;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadMapping;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.tsdbIndexResolution;
import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
Expand All @@ -83,6 +86,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.matchesRegex;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;

//@TestLogging(value = "org.elasticsearch.xpack.esql.analysis:TRACE", reason = "debug")
Expand Down Expand Up @@ -2002,6 +2006,58 @@ public void testLookupMatchTypeWrong() {
assertThat(e.getMessage(), containsString("column type mismatch, table column was [integer] and original column was [keyword]"));
}

public void testLookupJoinUnknownIndex() {
assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V4.isEnabled());

String errorMessage = "Unknown index [foobar]";
IndexResolution missingLookupIndex = IndexResolution.invalid(errorMessage);

Analyzer analyzerMissingLookupIndex = new Analyzer(
new AnalyzerContext(
EsqlTestUtils.TEST_CFG,
new EsqlFunctionRegistry(),
analyzerDefaultMapping(),
missingLookupIndex,
defaultEnrichResolution()
),
TEST_VERIFIER
);

String query = "FROM test | LOOKUP JOIN foobar ON last_name";

VerificationException e = expectThrows(VerificationException.class, () -> analyze(query, analyzerMissingLookupIndex));
assertThat(e.getMessage(), containsString("1:25: " + errorMessage));

String query2 = "FROM test | LOOKUP JOIN foobar ON missing_field";

e = expectThrows(VerificationException.class, () -> analyze(query2, analyzerMissingLookupIndex));
assertThat(e.getMessage(), containsString("1:25: " + errorMessage));
assertThat(e.getMessage(), not(containsString("[missing_field]")));
}

public void testLookupJoinUnknownField() {
assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V4.isEnabled());

String query = "FROM test | LOOKUP JOIN languages_lookup ON last_name";
String errorMessage = "1:45: Unknown column [last_name] in right side of join";

VerificationException e = expectThrows(VerificationException.class, () -> analyze(query));
assertThat(e.getMessage(), containsString(errorMessage));

String query2 = "FROM test | LOOKUP JOIN languages_lookup ON language_code";
String errorMessage2 = "1:45: Unknown column [language_code] in left side of join";

e = expectThrows(VerificationException.class, () -> analyze(query2));
assertThat(e.getMessage(), containsString(errorMessage2));

String query3 = "FROM test | LOOKUP JOIN languages_lookup ON missing_altogether";
String errorMessage3 = "1:45: Unknown column [missing_altogether] in ";

e = expectThrows(VerificationException.class, () -> analyze(query3));
assertThat(e.getMessage(), containsString(errorMessage3 + "left side of join"));
assertThat(e.getMessage(), containsString(errorMessage3 + "right side of join"));
}

public void testImplicitCasting() {
var e = expectThrows(VerificationException.class, () -> analyze("""
from test | eval x = concat("2024", "-04", "-01") + 1 day
Expand Down
Loading

0 comments on commit e4defca

Please sign in to comment.