From e4defcaec3f2433fda2d0187543417e679570b37 Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Thu, 5 Dec 2024 11:43:56 +0100 Subject: [PATCH] ESQL: Small LOOKUP JOIN cleanups (#117922) * Simplify InsertFieldExtraction * Fix wrong error message when looping ResolveRefs * Add basic verification tests * Add check for data type mismatch --- .../src/main/resources/lookup-join.csv-spec | 31 ++++++++++ .../xpack/esql/analysis/Analyzer.java | 9 +-- .../xpack/esql/analysis/AnalyzerContext.java | 8 ++- .../xpack/esql/analysis/Verifier.java | 48 ++++++++++------ .../physical/local/InsertFieldExtraction.java | 20 ++----- .../esql/planner/LocalExecutionPlanner.java | 9 --- .../xpack/esql/session/EsqlSession.java | 3 +- .../esql/analysis/AnalyzerTestUtils.java | 29 ++++++++-- .../xpack/esql/analysis/AnalyzerTests.java | 56 +++++++++++++++++++ .../xpack/esql/analysis/VerifierTests.java | 11 ++++ 10 files changed, 173 insertions(+), 51 deletions(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 712cadf6d44f..584cde55080e 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -120,6 +120,19 @@ left:keyword | client_ip:keyword | right:keyword | env:keyword left | 172.21.0.5 | right | Development ; +lookupIPFromRowWithShadowingKeepReordered +required_capability: join_lookup_v4 + +ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right" +| EVAL client_ip = client_ip::keyword +| LOOKUP JOIN clientips_lookup ON client_ip +| KEEP right, env, client_ip +; + +right:keyword | env:keyword | client_ip:keyword +right | Development | 172.21.0.5 +; + lookupIPFromIndex required_capability: join_lookup_v4 @@ -263,6 +276,24 @@ ignoreOrder:true; 2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3 | Success ; +lookupMessageFromIndexKeepReordered +required_capability: join_lookup_v4 + +FROM sample_data +| LOOKUP JOIN message_types_lookup ON message +| KEEP type, client_ip, event_duration, message +; + +type:keyword | client_ip:ip | event_duration:long | message:keyword +Success | 172.21.3.15 | 1756467 | Connected to 10.1.0.1 +Error | 172.21.3.15 | 5033755 | Connection error +Error | 172.21.3.15 | 8268153 | Connection error +Error | 172.21.3.15 | 725448 | Connection error +Disconnected | 172.21.0.5 | 1232382 | Disconnected +Success | 172.21.2.113 | 2764889 | Connected to 10.1.0.2 +Success | 172.21.2.162 | 3450233 | Connected to 10.1.0.3 +; + lookupMessageFromIndexStats required_capability: join_lookup_v4 diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index b847508d2b16..cf91c7df9a03 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -633,9 +633,10 @@ private Join resolveLookupJoin(LookupJoin join) { config = new JoinConfig(coreJoin, leftKeys, leftKeys, rightKeys); join = new LookupJoin(join.source(), join.left(), join.right(), config); - } - // everything else is unsupported for now - else { + } else if (type != JoinTypes.LEFT) { + // everything else is unsupported for now + // LEFT can only happen by being mapped from a USING above. So we need to exclude this as well because this rule can be run + // more than once. UnresolvedAttribute errorAttribute = new UnresolvedAttribute(join.source(), "unsupported", "Unsupported join type"); // add error message return join.withConfig(new JoinConfig(type, singletonList(errorAttribute), emptyList(), emptyList())); @@ -651,7 +652,7 @@ private List resolveUsingColumns(List cols, List"), enrichResolution); + this( + configuration, + functionRegistry, + indexResolution, + IndexResolution.invalid("AnalyzerContext constructed without any lookup join resolution"), + enrichResolution + ); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java index 49d8a5ee8caa..f5fd82d742bc 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.esql.analysis; -import org.elasticsearch.index.IndexMode; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.xpack.esql.action.EsqlCapabilities; import org.elasticsearch.xpack.esql.common.Failure; @@ -55,7 +54,8 @@ import org.elasticsearch.xpack.esql.plan.logical.RegexExtract; import org.elasticsearch.xpack.esql.plan.logical.Row; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; -import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin; +import org.elasticsearch.xpack.esql.plan.logical.join.Join; +import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig; import org.elasticsearch.xpack.esql.stats.FeatureMetric; import org.elasticsearch.xpack.esql.stats.Metrics; @@ -172,20 +172,6 @@ else if (p instanceof Lookup lookup) { else { lookup.matchFields().forEach(unresolvedExpressions); } - } else if (p instanceof LookupJoin lj) { - // expect right side to always be a lookup index - lj.right().forEachUp(EsRelation.class, r -> { - if (r.indexMode() != IndexMode.LOOKUP) { - failures.add( - fail( - r, - "LOOKUP JOIN right side [{}] must be a lookup index (index_mode=lookup, not [{}]", - r.index().name(), - r.indexMode().getName() - ) - ); - } - }); } else { @@ -217,6 +203,7 @@ else if (p instanceof Lookup lookup) { checkSort(p, failures); checkFullTextQueryFunctions(p, failures); + checkJoin(p, failures); }); checkRemoteEnrich(plan, failures); checkMetadataScoreNameReserved(plan, failures); @@ -791,6 +778,35 @@ private static void checkNotPresentInDisjunctions( }); } + /** + * Checks Joins for invalid usage. + * + * @param plan root plan to check + * @param failures failures found + */ + private static void checkJoin(LogicalPlan plan, Set failures) { + if (plan instanceof Join join) { + JoinConfig config = join.config(); + for (int i = 0; i < config.leftFields().size(); i++) { + Attribute leftField = config.leftFields().get(i); + Attribute rightField = config.rightFields().get(i); + if (leftField.dataType() != rightField.dataType()) { + failures.add( + fail( + leftField, + "JOIN left field [{}] of type [{}] is incompatible with right field [{}] of type [{}]", + leftField.name(), + leftField.dataType(), + rightField.name(), + rightField.dataType() + ) + ); + } + } + + } + } + /** * Checks full text query functions for invalid usage. * diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java index dc32a4ad3c28..ed8851b64c27 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java @@ -11,14 +11,12 @@ import org.elasticsearch.xpack.esql.core.expression.Expressions; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; -import org.elasticsearch.xpack.esql.core.expression.TypedAttribute; import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize; import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; import org.elasticsearch.xpack.esql.plan.physical.LeafExec; -import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.rule.Rule; @@ -102,25 +100,17 @@ private static Set missingAttributes(PhysicalPlan p) { var missing = new LinkedHashSet(); var input = p.inputSet(); - // For LOOKUP JOIN we only need field-extraction on left fields used to match, since the right side is always materialized - if (p instanceof LookupJoinExec join) { - join.leftFields().forEach(f -> { - if (input.contains(f) == false) { - missing.add(f); - } - }); - return missing; - } - - // collect field attributes used inside expressions - // TODO: Rather than going over all expressions manually, this should just call .references() - p.forEachExpression(TypedAttribute.class, f -> { + // Collect field attributes referenced by this plan but not yet present in the child's output. + // This is also correct for LookupJoinExec, where we only need field extraction on the left fields used to match, since the right + // side is always materialized. + p.references().forEach(f -> { if (f instanceof FieldAttribute || f instanceof MetadataAttribute) { if (input.contains(f) == false) { missing.add(f); } } }); + return missing; } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index 8c0488afdd42..b85340936497 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -565,21 +565,12 @@ private PhysicalOperation planHashJoin(HashJoinExec join, LocalExecutionPlannerC private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlannerContext context) { PhysicalOperation source = plan(join.left(), context); - // TODO: The source builder includes incoming fields including the ones we're going to drop Layout.Builder layoutBuilder = source.layout.builder(); for (Attribute f : join.addedFields()) { layoutBuilder.append(f); } Layout layout = layoutBuilder.build(); - // TODO: this works when the join happens on the coordinator - /* - * But when it happens on the data node we get a - * \_FieldExtractExec[language_code{f}#15, language_name{f}#16]<[]> - * \_EsQueryExec[languages_lookup], indexMode[lookup], query[][_doc{f}#18], limit[], sort[] estimatedRowSize[62] - * Which we'd prefer not to do - at least for now. We already know the fields we're loading - * and don't want any local planning. - */ EsQueryExec localSourceExec = (EsQueryExec) join.lookup(); if (localSourceExec.indexMode() != IndexMode.LOOKUP) { throw new IllegalArgumentException("can't plan [" + join + "]"); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 71fba5683644..4f7c620bc8d1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -374,10 +374,11 @@ private void preAnalyzeLookupIndices(List indices, ListenerResult lis // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types indexResolver.resolveAsMergedMapping( table.index(), - Set.of("*"), // Current LOOKUP JOIN syntax does not allow for field selection + Set.of("*"), // TODO: for LOOKUP JOIN, this currently declares all lookup index fields relevant and might fetch too many. null, listener.map(indexResolution -> listenerResult.withLookupIndexResolution(indexResolution)) ); + // TODO: Verify that the resolved index actually has indexMode: "lookup" } else { try { // No lookup indices specified diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java index a63ee53cdd49..4e89a09db9ed 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java @@ -38,7 +38,7 @@ public static Analyzer defaultAnalyzer() { } public static Analyzer expandedDefaultAnalyzer() { - return analyzer(analyzerExpandedDefaultMapping()); + return analyzer(expandedDefaultIndexResolution()); } public static Analyzer analyzer(IndexResolution indexResolution) { @@ -47,18 +47,33 @@ public static Analyzer analyzer(IndexResolution indexResolution) { public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier) { return new Analyzer( - new AnalyzerContext(EsqlTestUtils.TEST_CFG, new EsqlFunctionRegistry(), indexResolution, defaultEnrichResolution()), + new AnalyzerContext( + EsqlTestUtils.TEST_CFG, + new EsqlFunctionRegistry(), + indexResolution, + defaultLookupResolution(), + defaultEnrichResolution() + ), verifier ); } public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier, Configuration config) { - return new Analyzer(new AnalyzerContext(config, new EsqlFunctionRegistry(), indexResolution, defaultEnrichResolution()), verifier); + return new Analyzer( + new AnalyzerContext(config, new EsqlFunctionRegistry(), indexResolution, defaultLookupResolution(), defaultEnrichResolution()), + verifier + ); } public static Analyzer analyzer(Verifier verifier) { return new Analyzer( - new AnalyzerContext(EsqlTestUtils.TEST_CFG, new EsqlFunctionRegistry(), analyzerDefaultMapping(), defaultEnrichResolution()), + new AnalyzerContext( + EsqlTestUtils.TEST_CFG, + new EsqlFunctionRegistry(), + analyzerDefaultMapping(), + defaultLookupResolution(), + defaultEnrichResolution() + ), verifier ); } @@ -98,10 +113,14 @@ public static IndexResolution analyzerDefaultMapping() { return loadMapping("mapping-basic.json", "test"); } - public static IndexResolution analyzerExpandedDefaultMapping() { + public static IndexResolution expandedDefaultIndexResolution() { return loadMapping("mapping-default.json", "test"); } + public static IndexResolution defaultLookupResolution() { + return loadMapping("mapping-languages.json", "languages_lookup"); + } + public static EnrichResolution defaultEnrichResolution() { EnrichResolution enrichResolution = new EnrichResolution(); loadEnrichPolicyResolution(enrichResolution, MATCH_TYPE, "languages", "language_code", "languages_idx", "mapping-languages.json"); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java index 5a1e109041a1..6edbb55af463 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.analysis.IndexAnalyzers; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.LoadMapping; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.EsqlCapabilities; @@ -73,6 +74,8 @@ import static org.elasticsearch.xpack.esql.analysis.Analyzer.NO_FIELDS; import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyze; import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyzer; +import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyzerDefaultMapping; +import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultEnrichResolution; import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadMapping; import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.tsdbIndexResolution; import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY; @@ -83,6 +86,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.matchesRegex; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.startsWith; //@TestLogging(value = "org.elasticsearch.xpack.esql.analysis:TRACE", reason = "debug") @@ -2002,6 +2006,58 @@ public void testLookupMatchTypeWrong() { assertThat(e.getMessage(), containsString("column type mismatch, table column was [integer] and original column was [keyword]")); } + public void testLookupJoinUnknownIndex() { + assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V4.isEnabled()); + + String errorMessage = "Unknown index [foobar]"; + IndexResolution missingLookupIndex = IndexResolution.invalid(errorMessage); + + Analyzer analyzerMissingLookupIndex = new Analyzer( + new AnalyzerContext( + EsqlTestUtils.TEST_CFG, + new EsqlFunctionRegistry(), + analyzerDefaultMapping(), + missingLookupIndex, + defaultEnrichResolution() + ), + TEST_VERIFIER + ); + + String query = "FROM test | LOOKUP JOIN foobar ON last_name"; + + VerificationException e = expectThrows(VerificationException.class, () -> analyze(query, analyzerMissingLookupIndex)); + assertThat(e.getMessage(), containsString("1:25: " + errorMessage)); + + String query2 = "FROM test | LOOKUP JOIN foobar ON missing_field"; + + e = expectThrows(VerificationException.class, () -> analyze(query2, analyzerMissingLookupIndex)); + assertThat(e.getMessage(), containsString("1:25: " + errorMessage)); + assertThat(e.getMessage(), not(containsString("[missing_field]"))); + } + + public void testLookupJoinUnknownField() { + assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V4.isEnabled()); + + String query = "FROM test | LOOKUP JOIN languages_lookup ON last_name"; + String errorMessage = "1:45: Unknown column [last_name] in right side of join"; + + VerificationException e = expectThrows(VerificationException.class, () -> analyze(query)); + assertThat(e.getMessage(), containsString(errorMessage)); + + String query2 = "FROM test | LOOKUP JOIN languages_lookup ON language_code"; + String errorMessage2 = "1:45: Unknown column [language_code] in left side of join"; + + e = expectThrows(VerificationException.class, () -> analyze(query2)); + assertThat(e.getMessage(), containsString(errorMessage2)); + + String query3 = "FROM test | LOOKUP JOIN languages_lookup ON missing_altogether"; + String errorMessage3 = "1:45: Unknown column [missing_altogether] in "; + + e = expectThrows(VerificationException.class, () -> analyze(query3)); + assertThat(e.getMessage(), containsString(errorMessage3 + "left side of join")); + assertThat(e.getMessage(), containsString(errorMessage3 + "right side of join")); + } + public void testImplicitCasting() { var e = expectThrows(VerificationException.class, () -> analyze(""" from test | eval x = concat("2024", "-04", "-01") + 1 day diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java index 74e2de114172..882b8b7dbfd7 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java @@ -1926,6 +1926,17 @@ public void testSortByAggregate() { assertEquals("1:18: Aggregate functions are not allowed in SORT [COUNT]", error("FROM test | SORT count(*)")); } + public void testLookupJoinDataTypeMismatch() { + assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V4.isEnabled()); + + query("FROM test | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code"); + + assertEquals( + "1:87: JOIN left field [language_code] of type [KEYWORD] is incompatible with right field [language_code] of type [INTEGER]", + error("FROM test | EVAL language_code = languages::keyword | LOOKUP JOIN languages_lookup ON language_code") + ); + } + private void query(String query) { defaultAnalyzer.analyze(parser.createStatement(query)); }