Skip to content

Commit

Permalink
Rename class for clarity of purpose
Browse files Browse the repository at this point in the history
  • Loading branch information
craigtaverner committed Dec 13, 2024
1 parent a2bdc02 commit 99dbc09
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public void analyzedPlan(
return;
}

Function<ListenerResult, LogicalPlan> analyzeAction = (l) -> {
Function<PreAnalysisResult, LogicalPlan> analyzeAction = (l) -> {
planningMetrics.gatherPreAnalysisMetrics(parsed);
Analyzer analyzer = new Analyzer(
new AnalyzerContext(configuration, functionRegistry, l.indices, l.lookupIndices, l.enrichResolution),
Expand All @@ -308,27 +308,27 @@ public void analyzedPlan(

var listener = SubscribableListener.<EnrichResolution>newForked(
l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, l)
).<ListenerResult>andThen((l, enrichResolution) -> resolveFieldNames(parsed, enrichResolution, l));
).<PreAnalysisResult>andThen((l, enrichResolution) -> resolveFieldNames(parsed, enrichResolution, l));
// first resolve the lookup indices, then the main indices
for (TableInfo lookupIndex : preAnalysis.lookupIndices) {
listener = listener.andThen((l, listenerResult) -> { preAnalyzeLookupIndex(lookupIndex, listenerResult, l); });
listener = listener.andThen((l, preAnalysisResult) -> { preAnalyzeLookupIndex(lookupIndex, preAnalysisResult, l); });
}
listener.<ListenerResult>andThen((l, listenerResult) -> {
listener.<PreAnalysisResult>andThen((l, result) -> {
// resolve the main indices
preAnalyzeIndices(preAnalysis.indices, executionInfo, listenerResult, requestFilter, l);
}).<ListenerResult>andThen((l, listenerResult) -> {
preAnalyzeIndices(preAnalysis.indices, executionInfo, result, requestFilter, l);
}).<PreAnalysisResult>andThen((l, result) -> {
// TODO in follow-PR (for skip_unavailable handling of missing concrete indexes) add some tests for
// invalid index resolution to updateExecutionInfo
if (listenerResult.indices.isValid()) {
if (result.indices.isValid()) {
// CCS indices and skip_unavailable cluster values can stop the analysis right here
if (analyzeCCSIndices(executionInfo, targetClusters, unresolvedPolicies, listenerResult, logicalPlanListener, l)) return;
if (analyzeCCSIndices(executionInfo, targetClusters, unresolvedPolicies, result, logicalPlanListener, l)) return;
}
// whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step
l.onResponse(listenerResult);
}).<ListenerResult>andThen((l, listenerResult) -> {
l.onResponse(result);
}).<PreAnalysisResult>andThen((l, result) -> {
// first attempt (maybe the only one) at analyzing the plan
analyzeAndMaybeRetry(analyzeAction, requestFilter, listenerResult, logicalPlanListener, l);
}).<ListenerResult>andThen((l, listenerResult) -> {
analyzeAndMaybeRetry(analyzeAction, requestFilter, result, logicalPlanListener, l);
}).<PreAnalysisResult>andThen((l, result) -> {
assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request";

// "reset" execution information for all ccs or non-ccs (local) clusters, since we are performing the indices
Expand All @@ -338,13 +338,13 @@ public void analyzedPlan(
}

// here the requestFilter is set to null, performing the pre-analysis after the first step failed
preAnalyzeIndices(preAnalysis.indices, executionInfo, listenerResult, null, l);
}).<LogicalPlan>andThen((l, listenerResult) -> {
preAnalyzeIndices(preAnalysis.indices, executionInfo, result, null, l);
}).<LogicalPlan>andThen((l, result) -> {
assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request";
LOGGER.debug("Analyzing the plan (second attempt, without filter)");
LogicalPlan plan;
try {
plan = analyzeAction.apply(listenerResult);
plan = analyzeAction.apply(result);
} catch (Exception e) {
l.onFailure(e);
return;
Expand All @@ -354,35 +354,33 @@ public void analyzedPlan(
}).addListener(logicalPlanListener);
}

private void preAnalyzeLookupIndex(TableInfo tableInfo, ListenerResult listenerResult, ActionListener<ListenerResult> listener) {
private void preAnalyzeLookupIndex(TableInfo tableInfo, PreAnalysisResult result, ActionListener<PreAnalysisResult> listener) {
TableIdentifier table = tableInfo.id();
Set<String> fieldNames = listenerResult.wildcardJoinIndices().contains(table.index())
? IndexResolver.ALL_FIELDS
: listenerResult.fieldNames;
Set<String> fieldNames = result.wildcardJoinIndices().contains(table.index()) ? IndexResolver.ALL_FIELDS : result.fieldNames;
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
indexResolver.resolveAsMergedMapping(
table.index(),
fieldNames,
null,
listener.map(indexResolution -> listenerResult.addLookupIndexResolution(table.index(), indexResolution))
listener.map(indexResolution -> result.addLookupIndexResolution(table.index(), indexResolution))
);
// TODO: Verify that the resolved index actually has indexMode: "lookup"
}

private void preAnalyzeIndices(
List<TableInfo> indices,
EsqlExecutionInfo executionInfo,
ListenerResult listenerResult,
PreAnalysisResult result,
QueryBuilder requestFilter,
ActionListener<ListenerResult> listener
ActionListener<PreAnalysisResult> listener
) {
// TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one
if (indices.size() > 1) {
// Note: JOINs are not supported but we detect them when
listener.onFailure(new MappingException("Queries with multiple indices are not supported"));
} else if (indices.size() == 1) {
// known to be unavailable from the enrich policy API call
Map<String, Exception> unavailableClusters = listenerResult.enrichResolution.getUnavailableClusters();
Map<String, Exception> unavailableClusters = result.enrichResolution.getUnavailableClusters();
TableInfo tableInfo = indices.get(0);
TableIdentifier table = tableInfo.id();

Expand Down Expand Up @@ -415,22 +413,20 @@ private void preAnalyzeIndices(
String indexExpressionToResolve = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
if (indexExpressionToResolve.isEmpty()) {
// if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
listener.onResponse(
listenerResult.withIndexResolution(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of())))
);
listener.onResponse(result.withIndexResolution(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of()))));
} else {
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
indexResolver.resolveAsMergedMapping(
indexExpressionToResolve,
listenerResult.fieldNames,
result.fieldNames,
requestFilter,
listener.map(indexResolution -> listenerResult.withIndexResolution(indexResolution))
listener.map(indexResolution -> result.withIndexResolution(indexResolution))
);
}
} else {
try {
// occurs when dealing with local relations (row a = 1)
listener.onResponse(listenerResult.withIndexResolution(IndexResolution.invalid("[none specified]")));
listener.onResponse(result.withIndexResolution(IndexResolution.invalid("[none specified]")));
} catch (Exception ex) {
listener.onFailure(ex);
}
Expand All @@ -441,11 +437,11 @@ private boolean analyzeCCSIndices(
EsqlExecutionInfo executionInfo,
Set<String> targetClusters,
Set<EnrichPolicyResolver.UnresolvedPolicy> unresolvedPolicies,
ListenerResult listenerResult,
PreAnalysisResult result,
ActionListener<LogicalPlan> logicalPlanListener,
ActionListener<ListenerResult> l
ActionListener<PreAnalysisResult> l
) {
IndexResolution indexResolution = listenerResult.indices;
IndexResolution indexResolution = result.indices;
EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters());
if (executionInfo.isCrossClusterSearch() && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) {
Expand All @@ -467,27 +463,27 @@ private boolean analyzeCCSIndices(
enrichPolicyResolver.resolvePolicies(
newClusters,
unresolvedPolicies,
l.map(enrichResolution -> listenerResult.withEnrichResolution(enrichResolution))
l.map(enrichResolution -> result.withEnrichResolution(enrichResolution))
);
return true;
}
return false;
}

private static void analyzeAndMaybeRetry(
Function<ListenerResult, LogicalPlan> analyzeAction,
Function<PreAnalysisResult, LogicalPlan> analyzeAction,
QueryBuilder requestFilter,
ListenerResult listenerResult,
PreAnalysisResult result,
ActionListener<LogicalPlan> logicalPlanListener,
ActionListener<ListenerResult> l
ActionListener<PreAnalysisResult> l
) {
LogicalPlan plan = null;
var filterPresentMessage = requestFilter == null ? "without" : "with";
var attemptMessage = requestFilter == null ? "the only" : "first";
LOGGER.debug("Analyzing the plan ({} attempt, {} filter)", attemptMessage, filterPresentMessage);

try {
plan = analyzeAction.apply(listenerResult);
plan = analyzeAction.apply(result);
} catch (Exception e) {
if (e instanceof VerificationException ve) {
LOGGER.debug(
Expand All @@ -502,7 +498,7 @@ private static void analyzeAndMaybeRetry(
} else {
// interested only in a VerificationException, but this time we are taking out the index filter
// to try and make the index resolution work without any index filtering. In the next step... to be continued
l.onResponse(listenerResult);
l.onResponse(result);
}
} else {
// if the query failed with any other type of exception, then just pass the exception back to the user
Expand All @@ -515,24 +511,24 @@ private static void analyzeAndMaybeRetry(
logicalPlanListener.onResponse(plan);
}

private static void resolveFieldNames(LogicalPlan parsed, EnrichResolution enrichResolution, ActionListener<ListenerResult> l) {
private static void resolveFieldNames(LogicalPlan parsed, EnrichResolution enrichResolution, ActionListener<PreAnalysisResult> l) {
try {
// we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API
var enrichMatchFields = enrichResolution.resolvedEnrichPolicies()
.stream()
.map(ResolvedEnrichPolicy::matchField)
.collect(Collectors.toSet());
// get the field names from the parsed plan combined with the ENRICH match fields from the ENRICH policy
l.onResponse(fieldNames(parsed, enrichMatchFields, new ListenerResult(enrichResolution)));
l.onResponse(fieldNames(parsed, enrichMatchFields, new PreAnalysisResult(enrichResolution)));
} catch (Exception ex) {
l.onFailure(ex);
}
}

static ListenerResult fieldNames(LogicalPlan parsed, Set<String> enrichPolicyMatchFields, ListenerResult listenerResult) {
static PreAnalysisResult fieldNames(LogicalPlan parsed, Set<String> enrichPolicyMatchFields, PreAnalysisResult result) {
if (false == parsed.anyMatch(plan -> plan instanceof Aggregate || plan instanceof Project)) {
// no explicit columns selection, for example "from employees"
return listenerResult.withFieldNames(IndexResolver.ALL_FIELDS);
return result.withFieldNames(IndexResolver.ALL_FIELDS);
}

Holder<Boolean> projectAll = new Holder<>(false);
Expand All @@ -543,7 +539,7 @@ static ListenerResult fieldNames(LogicalPlan parsed, Set<String> enrichPolicyMat
projectAll.set(true);
});
if (projectAll.get()) {
return listenerResult.withFieldNames(IndexResolver.ALL_FIELDS);
return result.withFieldNames(IndexResolver.ALL_FIELDS);
}

AttributeSet references = new AttributeSet();
Expand Down Expand Up @@ -615,7 +611,7 @@ static ListenerResult fieldNames(LogicalPlan parsed, Set<String> enrichPolicyMat
references.addAll(keepJoinReferences);
// If any JOIN commands need wildcard field-caps calls, persist the index names
if (wildcardJoinIndices.isEmpty() == false) {
listenerResult = listenerResult.withWildcardJoinIndices(wildcardJoinIndices);
result = result.withWildcardJoinIndices(wildcardJoinIndices);
}

// remove valid metadata attributes because they will be filtered out by the IndexResolver anyway
Expand All @@ -625,12 +621,12 @@ static ListenerResult fieldNames(LogicalPlan parsed, Set<String> enrichPolicyMat

if (fieldNames.isEmpty() && enrichPolicyMatchFields.isEmpty()) {
// there cannot be an empty list of fields, we'll ask the simplest and lightest one instead: _index
return listenerResult.withFieldNames(IndexResolver.INDEX_METADATA_FIELD);
return result.withFieldNames(IndexResolver.INDEX_METADATA_FIELD);
} else {
fieldNames.addAll(subfields(fieldNames));
fieldNames.addAll(enrichPolicyMatchFields);
fieldNames.addAll(subfields(enrichPolicyMatchFields));
return listenerResult.withFieldNames(fieldNames);
return result.withFieldNames(fieldNames);
}
}

Expand Down Expand Up @@ -689,36 +685,36 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) {
return plan;
}

record ListenerResult(
record PreAnalysisResult(
IndexResolution indices,
Map<String, IndexResolution> lookupIndices,
EnrichResolution enrichResolution,
Set<String> fieldNames,
Set<String> wildcardJoinIndices
) {
ListenerResult(EnrichResolution newEnrichResolution) {
PreAnalysisResult(EnrichResolution newEnrichResolution) {
this(null, new HashMap<>(), newEnrichResolution, Set.of(), Set.of());
}

ListenerResult withEnrichResolution(EnrichResolution newEnrichResolution) {
return new ListenerResult(indices(), lookupIndices(), newEnrichResolution, fieldNames(), wildcardJoinIndices());
PreAnalysisResult withEnrichResolution(EnrichResolution newEnrichResolution) {
return new PreAnalysisResult(indices(), lookupIndices(), newEnrichResolution, fieldNames(), wildcardJoinIndices());
}

ListenerResult withIndexResolution(IndexResolution newIndexResolution) {
return new ListenerResult(newIndexResolution, lookupIndices(), enrichResolution(), fieldNames(), wildcardJoinIndices());
PreAnalysisResult withIndexResolution(IndexResolution newIndexResolution) {
return new PreAnalysisResult(newIndexResolution, lookupIndices(), enrichResolution(), fieldNames(), wildcardJoinIndices());
}

ListenerResult addLookupIndexResolution(String index, IndexResolution newIndexResolution) {
PreAnalysisResult addLookupIndexResolution(String index, IndexResolution newIndexResolution) {
lookupIndices.put(index, newIndexResolution);
return this;
}

ListenerResult withFieldNames(Set<String> newFields) {
return new ListenerResult(indices(), lookupIndices(), enrichResolution(), newFields, wildcardJoinIndices());
PreAnalysisResult withFieldNames(Set<String> newFields) {
return new PreAnalysisResult(indices(), lookupIndices(), enrichResolution(), newFields, wildcardJoinIndices());
}

public ListenerResult withWildcardJoinIndices(Set<String> wildcardJoinIndices) {
return new ListenerResult(indices(), lookupIndices(), enrichResolution(), fieldNames(), wildcardJoinIndices);
public PreAnalysisResult withWildcardJoinIndices(Set<String> wildcardJoinIndices) {
return new PreAnalysisResult(indices(), lookupIndices(), enrichResolution(), fieldNames(), wildcardJoinIndices);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1537,8 +1537,8 @@ public void testMultiLookupJoinSameIndexKeepAfter() {
}

private Set<String> fieldNames(String query, Set<String> enrichPolicyMatchFields) {
EsqlSession.ListenerResult listenerResult = new EsqlSession.ListenerResult(null);
return EsqlSession.fieldNames(parser.createStatement(query), enrichPolicyMatchFields, listenerResult).fieldNames();
var preAnalysisResult = new EsqlSession.PreAnalysisResult(null);
return EsqlSession.fieldNames(parser.createStatement(query), enrichPolicyMatchFields, preAnalysisResult).fieldNames();
}

private void assertFieldNames(String query, Set<String> expected) {
Expand All @@ -1547,9 +1547,8 @@ private void assertFieldNames(String query, Set<String> expected) {
}

private void assertFieldNames(String query, Set<String> expected, Set<String> wildCardIndices) {
EsqlSession.ListenerResult listenerResult = new EsqlSession.ListenerResult(null);
listenerResult = EsqlSession.fieldNames(parser.createStatement(query), Set.of(), listenerResult);
assertThat("Query-wide field names", listenerResult.fieldNames(), equalTo(expected));
assertThat("Lookup Indices that expect wildcard lookups", listenerResult.wildcardJoinIndices(), equalTo(wildCardIndices));
var preAnalysisResult = EsqlSession.fieldNames(parser.createStatement(query), Set.of(), new EsqlSession.PreAnalysisResult(null));
assertThat("Query-wide field names", preAnalysisResult.fieldNames(), equalTo(expected));
assertThat("Lookup Indices that expect wildcard lookups", preAnalysisResult.wildcardJoinIndices(), equalTo(wildCardIndices));
}
}

0 comments on commit 99dbc09

Please sign in to comment.