diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index cb27c7e87471e..c0290fa2b1d73 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -283,7 +283,7 @@ public void analyzedPlan( return; } - Function analyzeAction = (l) -> { + Function analyzeAction = (l) -> { planningMetrics.gatherPreAnalysisMetrics(parsed); Analyzer analyzer = new Analyzer( new AnalyzerContext(configuration, functionRegistry, l.indices, l.lookupIndices, l.enrichResolution), @@ -308,27 +308,27 @@ public void analyzedPlan( var listener = SubscribableListener.newForked( l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, l) - ).andThen((l, enrichResolution) -> resolveFieldNames(parsed, enrichResolution, l)); + ).andThen((l, enrichResolution) -> resolveFieldNames(parsed, enrichResolution, l)); // first resolve the lookup indices, then the main indices for (TableInfo lookupIndex : preAnalysis.lookupIndices) { - listener = listener.andThen((l, listenerResult) -> { preAnalyzeLookupIndex(lookupIndex, listenerResult, l); }); + listener = listener.andThen((l, preAnalysisResult) -> { preAnalyzeLookupIndex(lookupIndex, preAnalysisResult, l); }); } - listener.andThen((l, listenerResult) -> { + listener.andThen((l, result) -> { // resolve the main indices - preAnalyzeIndices(preAnalysis.indices, executionInfo, listenerResult, requestFilter, l); - }).andThen((l, listenerResult) -> { + preAnalyzeIndices(preAnalysis.indices, executionInfo, result, requestFilter, l); + }).andThen((l, result) -> { // TODO in follow-PR (for skip_unavailable handling of missing concrete indexes) add some tests for // invalid index resolution to updateExecutionInfo - if (listenerResult.indices.isValid()) { + if (result.indices.isValid()) { // CCS indices and skip_unavailable cluster values can stop the analysis right here - if (analyzeCCSIndices(executionInfo, targetClusters, unresolvedPolicies, listenerResult, logicalPlanListener, l)) return; + if (analyzeCCSIndices(executionInfo, targetClusters, unresolvedPolicies, result, logicalPlanListener, l)) return; } // whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step - l.onResponse(listenerResult); - }).andThen((l, listenerResult) -> { + l.onResponse(result); + }).andThen((l, result) -> { // first attempt (maybe the only one) at analyzing the plan - analyzeAndMaybeRetry(analyzeAction, requestFilter, listenerResult, logicalPlanListener, l); - }).andThen((l, listenerResult) -> { + analyzeAndMaybeRetry(analyzeAction, requestFilter, result, logicalPlanListener, l); + }).andThen((l, result) -> { assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request"; // "reset" execution information for all ccs or non-ccs (local) clusters, since we are performing the indices @@ -338,13 +338,13 @@ public void analyzedPlan( } // here the requestFilter is set to null, performing the pre-analysis after the first step failed - preAnalyzeIndices(preAnalysis.indices, executionInfo, listenerResult, null, l); - }).andThen((l, listenerResult) -> { + preAnalyzeIndices(preAnalysis.indices, executionInfo, result, null, l); + }).andThen((l, result) -> { assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request"; LOGGER.debug("Analyzing the plan (second attempt, without filter)"); LogicalPlan plan; try { - plan = analyzeAction.apply(listenerResult); + plan = analyzeAction.apply(result); } catch (Exception e) { l.onFailure(e); return; @@ -354,17 +354,15 @@ public void analyzedPlan( }).addListener(logicalPlanListener); } - private void preAnalyzeLookupIndex(TableInfo tableInfo, ListenerResult listenerResult, ActionListener listener) { + private void preAnalyzeLookupIndex(TableInfo tableInfo, PreAnalysisResult result, ActionListener listener) { TableIdentifier table = tableInfo.id(); - Set fieldNames = listenerResult.wildcardJoinIndices().contains(table.index()) - ? IndexResolver.ALL_FIELDS - : listenerResult.fieldNames; + Set fieldNames = result.wildcardJoinIndices().contains(table.index()) ? IndexResolver.ALL_FIELDS : result.fieldNames; // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types indexResolver.resolveAsMergedMapping( table.index(), fieldNames, null, - listener.map(indexResolution -> listenerResult.addLookupIndexResolution(table.index(), indexResolution)) + listener.map(indexResolution -> result.addLookupIndexResolution(table.index(), indexResolution)) ); // TODO: Verify that the resolved index actually has indexMode: "lookup" } @@ -372,9 +370,9 @@ private void preAnalyzeLookupIndex(TableInfo tableInfo, ListenerResult listenerR private void preAnalyzeIndices( List indices, EsqlExecutionInfo executionInfo, - ListenerResult listenerResult, + PreAnalysisResult result, QueryBuilder requestFilter, - ActionListener listener + ActionListener listener ) { // TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one if (indices.size() > 1) { @@ -382,7 +380,7 @@ private void preAnalyzeIndices( listener.onFailure(new MappingException("Queries with multiple indices are not supported")); } else if (indices.size() == 1) { // known to be unavailable from the enrich policy API call - Map unavailableClusters = listenerResult.enrichResolution.getUnavailableClusters(); + Map unavailableClusters = result.enrichResolution.getUnavailableClusters(); TableInfo tableInfo = indices.get(0); TableIdentifier table = tableInfo.id(); @@ -415,22 +413,20 @@ private void preAnalyzeIndices( String indexExpressionToResolve = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); if (indexExpressionToResolve.isEmpty()) { // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution - listener.onResponse( - listenerResult.withIndexResolution(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of()))) - ); + listener.onResponse(result.withIndexResolution(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of())))); } else { // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types indexResolver.resolveAsMergedMapping( indexExpressionToResolve, - listenerResult.fieldNames, + result.fieldNames, requestFilter, - listener.map(indexResolution -> listenerResult.withIndexResolution(indexResolution)) + listener.map(indexResolution -> result.withIndexResolution(indexResolution)) ); } } else { try { // occurs when dealing with local relations (row a = 1) - listener.onResponse(listenerResult.withIndexResolution(IndexResolution.invalid("[none specified]"))); + listener.onResponse(result.withIndexResolution(IndexResolution.invalid("[none specified]"))); } catch (Exception ex) { listener.onFailure(ex); } @@ -441,11 +437,11 @@ private boolean analyzeCCSIndices( EsqlExecutionInfo executionInfo, Set targetClusters, Set unresolvedPolicies, - ListenerResult listenerResult, + PreAnalysisResult result, ActionListener logicalPlanListener, - ActionListener l + ActionListener l ) { - IndexResolution indexResolution = listenerResult.indices; + IndexResolution indexResolution = result.indices; EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters()); if (executionInfo.isCrossClusterSearch() && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) { @@ -467,7 +463,7 @@ private boolean analyzeCCSIndices( enrichPolicyResolver.resolvePolicies( newClusters, unresolvedPolicies, - l.map(enrichResolution -> listenerResult.withEnrichResolution(enrichResolution)) + l.map(enrichResolution -> result.withEnrichResolution(enrichResolution)) ); return true; } @@ -475,11 +471,11 @@ private boolean analyzeCCSIndices( } private static void analyzeAndMaybeRetry( - Function analyzeAction, + Function analyzeAction, QueryBuilder requestFilter, - ListenerResult listenerResult, + PreAnalysisResult result, ActionListener logicalPlanListener, - ActionListener l + ActionListener l ) { LogicalPlan plan = null; var filterPresentMessage = requestFilter == null ? "without" : "with"; @@ -487,7 +483,7 @@ private static void analyzeAndMaybeRetry( LOGGER.debug("Analyzing the plan ({} attempt, {} filter)", attemptMessage, filterPresentMessage); try { - plan = analyzeAction.apply(listenerResult); + plan = analyzeAction.apply(result); } catch (Exception e) { if (e instanceof VerificationException ve) { LOGGER.debug( @@ -502,7 +498,7 @@ private static void analyzeAndMaybeRetry( } else { // interested only in a VerificationException, but this time we are taking out the index filter // to try and make the index resolution work without any index filtering. In the next step... to be continued - l.onResponse(listenerResult); + l.onResponse(result); } } else { // if the query failed with any other type of exception, then just pass the exception back to the user @@ -515,7 +511,7 @@ private static void analyzeAndMaybeRetry( logicalPlanListener.onResponse(plan); } - private static void resolveFieldNames(LogicalPlan parsed, EnrichResolution enrichResolution, ActionListener l) { + private static void resolveFieldNames(LogicalPlan parsed, EnrichResolution enrichResolution, ActionListener l) { try { // we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API var enrichMatchFields = enrichResolution.resolvedEnrichPolicies() @@ -523,16 +519,16 @@ private static void resolveFieldNames(LogicalPlan parsed, EnrichResolution enric .map(ResolvedEnrichPolicy::matchField) .collect(Collectors.toSet()); // get the field names from the parsed plan combined with the ENRICH match fields from the ENRICH policy - l.onResponse(fieldNames(parsed, enrichMatchFields, new ListenerResult(enrichResolution))); + l.onResponse(fieldNames(parsed, enrichMatchFields, new PreAnalysisResult(enrichResolution))); } catch (Exception ex) { l.onFailure(ex); } } - static ListenerResult fieldNames(LogicalPlan parsed, Set enrichPolicyMatchFields, ListenerResult listenerResult) { + static PreAnalysisResult fieldNames(LogicalPlan parsed, Set enrichPolicyMatchFields, PreAnalysisResult result) { if (false == parsed.anyMatch(plan -> plan instanceof Aggregate || plan instanceof Project)) { // no explicit columns selection, for example "from employees" - return listenerResult.withFieldNames(IndexResolver.ALL_FIELDS); + return result.withFieldNames(IndexResolver.ALL_FIELDS); } Holder projectAll = new Holder<>(false); @@ -543,7 +539,7 @@ static ListenerResult fieldNames(LogicalPlan parsed, Set enrichPolicyMat projectAll.set(true); }); if (projectAll.get()) { - return listenerResult.withFieldNames(IndexResolver.ALL_FIELDS); + return result.withFieldNames(IndexResolver.ALL_FIELDS); } AttributeSet references = new AttributeSet(); @@ -615,7 +611,7 @@ static ListenerResult fieldNames(LogicalPlan parsed, Set enrichPolicyMat references.addAll(keepJoinReferences); // If any JOIN commands need wildcard field-caps calls, persist the index names if (wildcardJoinIndices.isEmpty() == false) { - listenerResult = listenerResult.withWildcardJoinIndices(wildcardJoinIndices); + result = result.withWildcardJoinIndices(wildcardJoinIndices); } // remove valid metadata attributes because they will be filtered out by the IndexResolver anyway @@ -625,12 +621,12 @@ static ListenerResult fieldNames(LogicalPlan parsed, Set enrichPolicyMat if (fieldNames.isEmpty() && enrichPolicyMatchFields.isEmpty()) { // there cannot be an empty list of fields, we'll ask the simplest and lightest one instead: _index - return listenerResult.withFieldNames(IndexResolver.INDEX_METADATA_FIELD); + return result.withFieldNames(IndexResolver.INDEX_METADATA_FIELD); } else { fieldNames.addAll(subfields(fieldNames)); fieldNames.addAll(enrichPolicyMatchFields); fieldNames.addAll(subfields(enrichPolicyMatchFields)); - return listenerResult.withFieldNames(fieldNames); + return result.withFieldNames(fieldNames); } } @@ -689,36 +685,36 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) { return plan; } - record ListenerResult( + record PreAnalysisResult( IndexResolution indices, Map lookupIndices, EnrichResolution enrichResolution, Set fieldNames, Set wildcardJoinIndices ) { - ListenerResult(EnrichResolution newEnrichResolution) { + PreAnalysisResult(EnrichResolution newEnrichResolution) { this(null, new HashMap<>(), newEnrichResolution, Set.of(), Set.of()); } - ListenerResult withEnrichResolution(EnrichResolution newEnrichResolution) { - return new ListenerResult(indices(), lookupIndices(), newEnrichResolution, fieldNames(), wildcardJoinIndices()); + PreAnalysisResult withEnrichResolution(EnrichResolution newEnrichResolution) { + return new PreAnalysisResult(indices(), lookupIndices(), newEnrichResolution, fieldNames(), wildcardJoinIndices()); } - ListenerResult withIndexResolution(IndexResolution newIndexResolution) { - return new ListenerResult(newIndexResolution, lookupIndices(), enrichResolution(), fieldNames(), wildcardJoinIndices()); + PreAnalysisResult withIndexResolution(IndexResolution newIndexResolution) { + return new PreAnalysisResult(newIndexResolution, lookupIndices(), enrichResolution(), fieldNames(), wildcardJoinIndices()); } - ListenerResult addLookupIndexResolution(String index, IndexResolution newIndexResolution) { + PreAnalysisResult addLookupIndexResolution(String index, IndexResolution newIndexResolution) { lookupIndices.put(index, newIndexResolution); return this; } - ListenerResult withFieldNames(Set newFields) { - return new ListenerResult(indices(), lookupIndices(), enrichResolution(), newFields, wildcardJoinIndices()); + PreAnalysisResult withFieldNames(Set newFields) { + return new PreAnalysisResult(indices(), lookupIndices(), enrichResolution(), newFields, wildcardJoinIndices()); } - public ListenerResult withWildcardJoinIndices(Set wildcardJoinIndices) { - return new ListenerResult(indices(), lookupIndices(), enrichResolution(), fieldNames(), wildcardJoinIndices); + public PreAnalysisResult withWildcardJoinIndices(Set wildcardJoinIndices) { + return new PreAnalysisResult(indices(), lookupIndices(), enrichResolution(), fieldNames(), wildcardJoinIndices); } } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java index 662b37a3872e5..2bb5376d3545c 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java @@ -1537,8 +1537,8 @@ public void testMultiLookupJoinSameIndexKeepAfter() { } private Set fieldNames(String query, Set enrichPolicyMatchFields) { - EsqlSession.ListenerResult listenerResult = new EsqlSession.ListenerResult(null); - return EsqlSession.fieldNames(parser.createStatement(query), enrichPolicyMatchFields, listenerResult).fieldNames(); + var preAnalysisResult = new EsqlSession.PreAnalysisResult(null); + return EsqlSession.fieldNames(parser.createStatement(query), enrichPolicyMatchFields, preAnalysisResult).fieldNames(); } private void assertFieldNames(String query, Set expected) { @@ -1547,9 +1547,8 @@ private void assertFieldNames(String query, Set expected) { } private void assertFieldNames(String query, Set expected, Set wildCardIndices) { - EsqlSession.ListenerResult listenerResult = new EsqlSession.ListenerResult(null); - listenerResult = EsqlSession.fieldNames(parser.createStatement(query), Set.of(), listenerResult); - assertThat("Query-wide field names", listenerResult.fieldNames(), equalTo(expected)); - assertThat("Lookup Indices that expect wildcard lookups", listenerResult.wildcardJoinIndices(), equalTo(wildCardIndices)); + var preAnalysisResult = EsqlSession.fieldNames(parser.createStatement(query), Set.of(), new EsqlSession.PreAnalysisResult(null)); + assertThat("Query-wide field names", preAnalysisResult.fieldNames(), equalTo(expected)); + assertThat("Lookup Indices that expect wildcard lookups", preAnalysisResult.wildcardJoinIndices(), equalTo(wildCardIndices)); } }