Skip to content

Commit

Permalink
Merge branch 'main' into pointrange_optimization
Browse files Browse the repository at this point in the history
Signed-off-by: Harsha Vamsi Kalluri <[email protected]>
  • Loading branch information
harshavamsi authored Aug 28, 2024
2 parents ac29217 + 23cba28 commit b42fa37
Show file tree
Hide file tree
Showing 28 changed files with 1,106 additions and 133 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
- [Range Queries] Add new approximateable query framework to short-circuit range queries ([#13788](https://github.com/opensearch-project/OpenSearch/pull/13788))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,6 @@ public void testTermsValuesSource() throws Exception {
}

public void testSimpleDerivedFieldsQuery() {
assumeFalse(
"Derived fields do not support concurrent search https://github.com/opensearch-project/OpenSearch/issues/15007",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
SearchRequest searchRequest = new SearchRequest("test-df").source(
SearchSourceBuilder.searchSource()
.derivedField("result", "keyword", new Script("emit(params._source[\"field\"])"))
Expand All @@ -204,10 +200,6 @@ public void testSimpleDerivedFieldsQuery() {
}

public void testSimpleDerivedFieldsAgg() {
assumeFalse(
"Derived fields do not support concurrent search https://github.com/opensearch-project/OpenSearch/issues/15007",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
SearchRequest searchRequest = new SearchRequest("test-df").source(
SearchSourceBuilder.searchSource()
.derivedField("result", "keyword", new Script("emit(params._source[\"field\"])"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.BlobStoreException;
Expand Down Expand Up @@ -391,6 +392,7 @@ public void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
super.finalizeSnapshot(
Expand All @@ -400,6 +402,7 @@ public void finalizeSnapshot(
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
listener
);
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -127,6 +128,7 @@ public void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
super.finalizeSnapshot(
Expand All @@ -136,6 +138,7 @@ public void finalizeSnapshot(
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
listener
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;

import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2;

/**
* Transport action for create snapshot operation
*
Expand All @@ -56,12 +60,15 @@
public class TransportCreateSnapshotAction extends TransportClusterManagerNodeAction<CreateSnapshotRequest, CreateSnapshotResponse> {
private final SnapshotsService snapshotsService;

private final RepositoriesService repositoriesService;

@Inject
public TransportCreateSnapshotAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
SnapshotsService snapshotsService,
RepositoriesService repositoriesService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
Expand All @@ -75,6 +82,7 @@ public TransportCreateSnapshotAction(
indexNameExpressionResolver
);
this.snapshotsService = snapshotsService;
this.repositoriesService = repositoriesService;
}

@Override
Expand Down Expand Up @@ -103,7 +111,9 @@ protected void clusterManagerOperation(
ClusterState state,
final ActionListener<CreateSnapshotResponse> listener
) {
if (request.waitForCompletion()) {
Repository repository = repositoriesService.repository(request.repository());
boolean isSnapshotV2 = SHALLOW_SNAPSHOT_V2.get(repository.getMetadata().settings());
if (request.waitForCompletion() || isSnapshotV2) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,13 @@ public CacheHelper getReaderCacheHelper() {
}

@Override
public FloatVectorValues getFloatVectorValues(String field) throws IOException {
return getFloatVectorValues(field);
public FloatVectorValues getFloatVectorValues(String field) {
throw new UnsupportedOperationException();
}

@Override
public ByteVectorValues getByteVectorValues(String field) throws IOException {
return getByteVectorValues(field);
public ByteVectorValues getByteVectorValues(String field) {
throw new UnsupportedOperationException();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,9 @@ public IndexFieldData.Builder fielddataBuilder(String fullyQualifiedIndexName, S
@Override
public Query termQuery(Object value, QueryShardContext context) {
Query query = typeFieldMapper.mappedFieldType.termQuery(value, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -176,10 +175,9 @@ public Query termQuery(Object value, QueryShardContext context) {
@Override
public Query termQueryCaseInsensitive(Object value, @Nullable QueryShardContext context) {
Query query = typeFieldMapper.mappedFieldType.termQueryCaseInsensitive(value, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -195,10 +193,9 @@ public Query termQueryCaseInsensitive(Object value, @Nullable QueryShardContext
@Override
public Query termsQuery(List<?> values, @Nullable QueryShardContext context) {
Query query = typeFieldMapper.mappedFieldType.termsQuery(values, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand Down Expand Up @@ -230,10 +227,9 @@ public Query rangeQuery(
parser,
context
);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
return new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -251,10 +247,9 @@ public Query fuzzyQuery(
QueryShardContext context
) {
Query query = typeFieldMapper.mappedFieldType.fuzzyQuery(value, fuzziness, prefixLength, maxExpansions, transpositions, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand Down Expand Up @@ -289,10 +284,9 @@ public Query fuzzyQuery(
method,
context
);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -316,10 +310,9 @@ public Query prefixQuery(
QueryShardContext context
) {
Query query = typeFieldMapper.mappedFieldType.prefixQuery(value, method, caseInsensitive, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -343,10 +336,9 @@ public Query wildcardQuery(
QueryShardContext context
) {
Query query = typeFieldMapper.mappedFieldType.wildcardQuery(value, method, caseInsensitive, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -365,10 +357,9 @@ public Query wildcardQuery(
@Override
public Query normalizedWildcardQuery(String value, @Nullable MultiTermQuery.RewriteMethod method, QueryShardContext context) {
Query query = typeFieldMapper.mappedFieldType.normalizedWildcardQuery(value, method, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -394,10 +385,9 @@ public Query regexpQuery(
QueryShardContext context
) {
Query query = typeFieldMapper.mappedFieldType.regexpQuery(value, syntaxFlags, matchFlags, maxDeterminizedStates, method, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -416,10 +406,9 @@ public Query regexpQuery(
@Override
public Query phraseQuery(TokenStream stream, int slop, boolean enablePositionIncrements, QueryShardContext context) throws IOException {
Query query = typeFieldMapper.mappedFieldType.phraseQuery(stream, slop, enablePositionIncrements, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -441,10 +430,9 @@ public Query phraseQuery(TokenStream stream, int slop, boolean enablePositionInc
public Query multiPhraseQuery(TokenStream stream, int slop, boolean enablePositionIncrements, QueryShardContext context)
throws IOException {
Query query = typeFieldMapper.mappedFieldType.multiPhraseQuery(stream, slop, enablePositionIncrements, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -465,10 +453,9 @@ public Query multiPhraseQuery(TokenStream stream, int slop, boolean enablePositi
@Override
public Query phrasePrefixQuery(TokenStream stream, int slop, int maxExpansions, QueryShardContext context) throws IOException {
Query query = typeFieldMapper.mappedFieldType.phrasePrefixQuery(stream, slop, maxExpansions, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -493,10 +480,9 @@ public SpanQuery spanPrefixQuery(String value, SpanMultiTermQueryWrapper.SpanRew
@Override
public Query distanceFeatureQuery(Object origin, String pivot, float boost, QueryShardContext context) {
Query query = typeFieldMapper.mappedFieldType.distanceFeatureQuery(origin, pivot, boost, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
return new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -507,10 +493,9 @@ public Query distanceFeatureQuery(Object origin, String pivot, float boost, Quer
@Override
public Query geoShapeQuery(Geometry shape, String fieldName, ShapeRelation relation, QueryShardContext context) {
Query query = ((GeoShapeQueryable) (typeFieldMapper.mappedFieldType)).geoShapeQuery(shape, fieldName, relation, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
return new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* DerivedFieldQuery used for querying derived fields. It contains the logic to execute an input lucene query against
* DerivedField. It also accepts DerivedFieldValueFetcher and SearchLookup as an input.
*/
public final class DerivedFieldQuery extends Query {
private final Query query;
private final DerivedFieldValueFetcher valueFetcher;
private final Supplier<DerivedFieldValueFetcher> valueFetcherSupplier;
private final SearchLookup searchLookup;
private final Analyzer indexAnalyzer;
private final boolean ignoreMalformed;
Expand All @@ -46,20 +47,19 @@ public final class DerivedFieldQuery extends Query {

/**
* @param query lucene query to be executed against the derived field
* @param valueFetcher DerivedFieldValueFetcher ValueFetcher to fetch the value of a derived field from _source
* using LeafSearchLookup
* @param valueFetcherSupplier Supplier of a DerivedFieldValueFetcher that will be reconstructed per leaf
* @param searchLookup SearchLookup to get the LeafSearchLookup look used by valueFetcher to fetch the _source
*/
public DerivedFieldQuery(
Query query,
DerivedFieldValueFetcher valueFetcher,
Supplier<DerivedFieldValueFetcher> valueFetcherSupplier,
SearchLookup searchLookup,
Analyzer indexAnalyzer,
Function<Object, IndexableField> indexableFieldGenerator,
boolean ignoreMalformed
) {
this.query = query;
this.valueFetcher = valueFetcher;
this.valueFetcherSupplier = valueFetcherSupplier;
this.searchLookup = searchLookup;
this.indexAnalyzer = indexAnalyzer;
this.indexableFieldGenerator = indexableFieldGenerator;
Expand All @@ -77,7 +77,15 @@ public Query rewrite(IndexSearcher indexSearcher) throws IOException {
if (rewritten == query) {
return this;
}
return new DerivedFieldQuery(rewritten, valueFetcher, searchLookup, indexAnalyzer, indexableFieldGenerator, ignoreMalformed);
;
return new DerivedFieldQuery(
rewritten,
valueFetcherSupplier,
searchLookup,
indexAnalyzer,
indexableFieldGenerator,
ignoreMalformed
);
}

@Override
Expand All @@ -88,6 +96,11 @@ public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float bo
public Scorer scorer(LeafReaderContext context) {
DocIdSetIterator approximation;
approximation = DocIdSetIterator.all(context.reader().maxDoc());

// Create a new ValueFetcher per thread.
// ValueFetcher.setNextReader creates a DerivedFieldScript and internally SourceLookup and these objects are not
// thread safe.
final DerivedFieldValueFetcher valueFetcher = valueFetcherSupplier.get();
valueFetcher.setNextReader(context);
LeafSearchLookup leafSearchLookup = searchLookup.getLeafSearchLookup(context);
TwoPhaseIterator twoPhase = new TwoPhaseIterator(approximation) {
Expand Down
Loading

0 comments on commit b42fa37

Please sign in to comment.