PoC DO NOT MERGE - Semantic Query #11

Closed
@@ -71,6 +71,8 @@ protected UpdateByQueryRequest buildRequest(RestRequest request, NamedWriteableR
consumers.put("script", o -> internal.setScript(Script.parse(o)));
consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue()));

// TODO There surely must be a better way of doing this
request.params().put("_source_includes", "*");
Review comment (Owner Author):
Included source exclusion from the demo. Hacky but gets the job done for now

parseInternalRequest(internal, request, namedWriteableRegistry, consumers);

internal.setPipeline(request.param("pipeline"));
@@ -165,7 +165,7 @@ private void runCoordinatorRewritePhase() {
continue;
}
boolean canMatch = true;
CoordinatorRewriteContext coordinatorRewriteContext = coordinatorRewriteContextProvider.getCoordinatorRewriteContext(
CoordinatorRewriteContext coordinatorRewriteContext = coordinatorRewriteContextProvider.getCoordinatorRewriteContextForIndex(
request.shardId().getIndex()
);
if (coordinatorRewriteContext != null) {
@@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.search;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.MappingLookup;
import org.elasticsearch.index.query.CoordinatorRewriteContext;
import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;

import static org.elasticsearch.core.Strings.format;

/**
* This search phase can be used as an initial search phase to pre-filter search shards based on query rewriting.
* The queries are rewritten against the shards and, based on the rewrite result, shards might be excluded
* from the search. The extra round trip to the search shards is very cheap and is not subject to rejections,
* which allows fanning out to more shards at the same time without running into rejections, even if we are hitting a
* large portion of the cluster's indices.
* This phase can also be used to pre-sort shards based on min/max values in each shard of the provided primary sort.
* When the query primary sort is performed on a field, this phase extracts the min/max value in each shard and
* sorts them according to the provided order. This can be useful for instance to ensure that shards that contain recent
* data are executed first when sorting by descending timestamp.
*/
final class CoordinatorQueryRewriteSearchPhase extends SearchPhase {
Review comment (Owner Author):
New search phase for rewriting queries in the coordinator node


private final Logger logger;
private final SearchRequest request;
private final GroupShardsIterator<SearchShardIterator> shardsIts;
private final ActionListener<SearchRequest> listener;

private final CoordinatorRewriteContextProvider coordinatorRewriteContextProvider;

private final IndicesService indicesService;

private final Executor executor;

CoordinatorQueryRewriteSearchPhase(
Logger logger,
SearchRequest request,
GroupShardsIterator<SearchShardIterator> shardsIts,
Executor executor,
IndicesService indicesService,
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider,
ActionListener<SearchRequest> listener
) {
super("coordinator_rewrite");

this.logger = logger;
this.request = request;
this.executor = executor;
this.listener = listener;
this.shardsIts = shardsIts;
this.coordinatorRewriteContextProvider = coordinatorRewriteContextProvider;
this.indicesService = indicesService;
}

private static boolean assertSearchCoordinationThread() {
return ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH_COORDINATION);
}

@Override
public void run() throws IOException {
assert assertSearchCoordinationThread();
runCoordinatorRewritePhase();
}

// tries to pre-filter shards based on information that's available to the coordinator
// without having to reach out to the actual shards
private void runCoordinatorRewritePhase() {
// TODO: the index filter (i.e., `_index:pattern`) should be pre-filtered on the coordinator
assert assertSearchCoordinationThread();
final List<SearchShardIterator> matchedShardLevelRequests = new ArrayList<>();
Map<String, Set<String>> fieldModelIds = new HashMap<>();
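// Collect, for every field name across all indices targeted by this search, the set of inference model ids declared in the index mappings.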
for (SearchShardIterator searchShardIterator : shardsIts) {
Index index = searchShardIterator.shardId().getIndex();
MappingLookup mappingLookup = indicesService.indexService(index).mapperService().mappingLookup();
mappingLookup.modelsForFields().forEach((k, v) -> {
Set<String> modelIds = fieldModelIds.computeIfAbsent(k, value -> new HashSet<String>());
modelIds.add(v);
});
}

CoordinatorRewriteContext coordinatorRewriteContext = coordinatorRewriteContextProvider.getCoordinatorRewriteContextForModels(
fieldModelIds
);
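// Rewrite the whole SearchRequest once against this coordinator-level context; the listener (wired by the caller) then continues with the regular search phases using the rewritten request.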
Rewriteable.rewriteAndFetch(request, coordinatorRewriteContext, listener);
}

@Override
public void start() {
// Note that the search is failed when this task is rejected by the executor
executor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(() -> format("Failed to execute [%s] while running [%s] phase", request, getName()), e);
}
listener.onFailure(new SearchPhaseExecutionException(getName(), e.getMessage(), e.getCause(), ShardSearchFailure.EMPTY_ARRAY));
}

@Override
protected void doRun() throws IOException {
CoordinatorQueryRewriteSearchPhase.this.run();
}
});
}
}
@@ -123,6 +123,7 @@ public SearchPhase newSearchPhase(
Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
boolean preFilter,
boolean runCoordinatorPhase,
ThreadPool threadPool,
SearchResponse.Clusters clusters
) {
@@ -1118,6 +1118,7 @@ private void executeSearch(
Collections.unmodifiableMap(aliasFilter),
concreteIndexBoosts,
preFilterSearchShards,
SearchService.canRewriteInCoordinator(searchRequest.source()),
threadPool,
clusters
).start();
@@ -1210,6 +1211,7 @@ SearchPhase newSearchPhase(
Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
boolean preFilter,
boolean runCoordinatorPhase,
ThreadPool threadPool,
SearchResponse.Clusters clusters
);
@@ -1234,10 +1236,38 @@ public SearchPhase newSearchPhase(
Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
boolean preFilter,
boolean runCoordinatorPhase,
ThreadPool threadPool,
SearchResponse.Clusters clusters
) {
if (preFilter) {
if (runCoordinatorPhase) {
Review comment (Owner Author):
Added as a pre-step for search phases

return new CoordinatorQueryRewriteSearchPhase(
logger,
searchRequest,
shardIterators,
threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION),
searchService.getIndicesService(),
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
listener.delegateFailureAndWrap((l, newSearchRequest) -> {
SearchPhase action = newSearchPhase(
task,
newSearchRequest,
executor,
shardIterators,
timeProvider,
connectionLookup,
clusterState,
aliasFilter,
concreteIndexBoosts,
preFilter,
false,
threadPool,
clusters
);
action.start();
}
));
} else if (preFilter) {
return new CanMatchPreFilterSearchPhase(
logger,
searchTransportService,
@@ -1263,6 +1293,7 @@ public SearchPhase newSearchPhase(
aliasFilter,
concreteIndexBoosts,
false,
false,
threadPool,
clusters
);
@@ -120,6 +120,10 @@ String modelForField(String fieldName) {
return this.fieldToInferenceModels.get(fieldName);
}

Map<String, String> modelsForFields() {
return this.fieldToInferenceModels;
}

/**
* Returns the mapped field type for the given field name.
*/
@@ -495,4 +495,8 @@ public void validateDoesNotShadow(String name) {
public String modelForField(String fieldName) {
return fieldTypeLookup.modelForField(fieldName);
}

public Map<String, String> modelsForFields() {
return fieldTypeLookup.modelsForFields();
}
}
@@ -80,12 +80,13 @@ public SemanticTextFieldMapper build(MapperBuilderContext context) {

public static class SemanticTextFieldType extends SimpleMappedFieldType {

private SparseVectorFieldType sparseVectorFieldType;
private final SparseVectorFieldType sparseVectorFieldType;

private final String modelId;

public SemanticTextFieldType(String name, String modelId, Map<String, String> meta) {
super(name, true, false, false, TextSearchInfo.NONE, meta);
this.sparseVectorFieldType = new SparseVectorFieldType(name + "." + SPARSE_VECTOR_SUBFIELD_NAME, meta);
this.modelId = modelId;
}

@@ -17,6 +17,8 @@
import org.elasticsearch.xcontent.XContentParserConfiguration;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.LongSupplier;

/**
@@ -27,8 +29,11 @@
* don't hold queried data. See IndexMetadata#getTimestampRange() for more details
*/
public class CoordinatorRewriteContext extends QueryRewriteContext {
@Nullable
private final IndexLongFieldRange indexLongFieldRange;
@Nullable
private final DateFieldMapper.DateFieldType timestampFieldType;
private final Map<String, Set<String>> fieldNamesToInferenceModel;

public CoordinatorRewriteContext(
XContentParserConfiguration parserConfig,
@@ -55,6 +60,34 @@ public CoordinatorRewriteContext(
);
this.indexLongFieldRange = indexLongFieldRange;
this.timestampFieldType = timestampFieldType;
this.fieldNamesToInferenceModel = Map.of();
}

public CoordinatorRewriteContext(
Review comment (Owner Author):
A new constructor for the rewriting in the coordinator search phase

XContentParserConfiguration parserConfig,
Client client,
LongSupplier nowInMillis,
Map<String, Set<String>> fieldNamesToInferenceModel
) {
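// Only the parser configuration, client and nowInMillis supplier are available on the coordinator; all index-scoped arguments to the parent context are intentionally empty or null.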
super(
parserConfig,
client,
nowInMillis,
null,
MappingLookup.EMPTY,
Collections.emptyMap(),
null,
null,
null,
null,
null,
null,
null,
null
);
this.indexLongFieldRange = null;
this.timestampFieldType = null;
this.fieldNamesToInferenceModel = fieldNamesToInferenceModel;
}

long getMinTimestamp() {
@@ -82,4 +115,9 @@ public MappedFieldType getFieldType(String fieldName) {
public CoordinatorRewriteContext convertToCoordinatorRewriteContext() {
return this;
}

@Nullable
public Set<String> inferenceModelsForFieldName(String fieldName) {
return fieldNamesToInferenceModel.get(fieldName);
}
}
@@ -13,9 +13,12 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.StringFieldType;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.xcontent.XContentParserConfiguration;

import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
@@ -42,7 +45,7 @@ public CoordinatorRewriteContextProvider(
}

@Nullable
public CoordinatorRewriteContext getCoordinatorRewriteContext(Index index) {
public CoordinatorRewriteContext getCoordinatorRewriteContextForIndex(Index index) {
var clusterState = clusterStateSupplier.get();
var indexMetadata = clusterState.metadata().index(index);

@@ -63,4 +66,9 @@ public CoordinatorRewriteContext getCoordinatorRewriteContext(Index index) {

return new CoordinatorRewriteContext(parserConfig, client, nowInMillis, timestampRange, dateFieldType);
}

@Nullable
public CoordinatorRewriteContext getCoordinatorRewriteContextForModels(Map<String, Set<String>> fieldToModelIds) {
Review comment (Owner Author):
New method for obtaining the rewrite context for the new search phase

return new CoordinatorRewriteContext(parserConfig, client, nowInMillis, fieldToModelIds);
}
}
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.query;

// Marker interface for queries that can be rewritten on the coordinator node
public interface CoordinatorRewriteableQueryBuilder {
}
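
For orientation only, here is a minimal sketch (not part of this change) of how a query builder implementing this marker interface could consult the new coordinator rewrite context during rewriting; the class name, its fields, and the rewriteUsingModels helper are assumptions made up for illustration, not code from this PR.

package org.elasticsearch.index.query;

import java.io.IOException;
import java.util.Set;

// Illustrative only: names and behaviour below are assumptions, not code from this PR.
abstract class HypotheticalSemanticQueryBuilder extends AbstractQueryBuilder<HypotheticalSemanticQueryBuilder>
    implements CoordinatorRewriteableQueryBuilder {

    private final String field;
    private final String queryText;

    HypotheticalSemanticQueryBuilder(String field, String queryText) {
        this.field = field;
        this.queryText = queryText;
    }

    @Override
    protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException {
        CoordinatorRewriteContext coordinatorContext = queryRewriteContext.convertToCoordinatorRewriteContext();
        if (coordinatorContext != null) {
            // On the coordinator we already know which inference models back this field
            // across all targeted indices, so the query text can be expanded once here
            // instead of on every shard.
            Set<String> modelIds = coordinatorContext.inferenceModelsForFieldName(field);
            if (modelIds != null) {
                return rewriteUsingModels(queryText, modelIds);
            }
        }
        return this;
    }

    // Hypothetical helper: expand the query text with the given models, e.g. into a sparse vector query.
    protected abstract QueryBuilder rewriteUsingModels(String queryText, Set<String> modelIds) throws IOException;
}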