From ae1b419d56fcd3d5e6248c1d9983c71a4280a0ed Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Tue, 10 Oct 2023 10:00:57 -0700
Subject: [PATCH] Use separate search execution context for each pipeline
 (#100563) (#100606)

Synthetic source doesn't seem to work correctly with either inter-segment or
intra-segment parallelism; neither form of parallelism was available when
synthetic source was developed. The new test fails with either the doc or the
segment data_partitioning mode. While we work on a proper fix, this PR
introduces a workaround: it creates a separate search execution context for
each execution pipeline, restoring the sequential-execution invariants. I
believe the overhead added by this workaround is minimal.
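To keep that per-pipeline state from being shared, the operator factories in
this change take a Supplier of the values sources rather than a pre-built
list, so each pipeline builds its own sources (and with them its own search
execution context) when its operator is created. A minimal sketch of that
deferral pattern, using illustrative stand-in names rather than the actual
Elasticsearch classes:

    import java.util.List;
    import java.util.function.Supplier;

    // Illustrative only: a factory that defers building per-pipeline state.
    record ReaderFactorySketch(Supplier<List<String>> sources) {
        List<String> newReaderSources() {
            // Called once per pipeline, so every pipeline gets freshly built,
            // non-shared sources instead of one list shared across pipelines.
            return sources.get();
        }

        public static void main(String[] args) {
            var factory = new ReaderFactorySketch(() -> List.of("field-a", "field-b"));
            System.out.println(factory.newReaderSources());
        }
    }

The factories in the diff below apply the same idea with
Supplier<List<ValueSourceInfo>> and ValueSources.sources(...).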
---
 x-pack/plugin/esql/build.gradle               |  1 +
 .../compute/lucene/ValueSources.java          | 16 ++++-
 .../lucene/ValuesSourceReaderOperator.java    |  5 +-
 .../operator/OrdinalsGroupingOperator.java    | 13 +++-
 .../ValuesSourceReaderOperatorTests.java      |  2 +-
 .../xpack/esql/action/SyntheticSourceIT.java  | 69 +++++++++++++++++++
 .../planner/EsPhysicalOperationProviders.java | 20 +++---
 7 files changed, 111 insertions(+), 15 deletions(-)
 create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/SyntheticSourceIT.java

diff --git a/x-pack/plugin/esql/build.gradle b/x-pack/plugin/esql/build.gradle
index a21c3d0990333..9643e2b2d8e1e 100644
--- a/x-pack/plugin/esql/build.gradle
+++ b/x-pack/plugin/esql/build.gradle
@@ -34,6 +34,7 @@ dependencies {
   testImplementation('org.webjars.npm:fontsource__roboto-mono:4.5.7')

   internalClusterTestImplementation project(":client:rest-high-level")
+  internalClusterTestImplementation project(":modules:mapper-extras")
 }

 /*
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValueSources.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValueSources.java
index b7eb47a7a52d3..e5ce5436990b7 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValueSources.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValueSources.java
@@ -29,6 +29,7 @@
 import org.elasticsearch.search.aggregations.support.FieldContext;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.internal.ShardSearchRequest;

 import java.io.IOException;
 import java.util.ArrayList;
@@ -49,7 +50,20 @@ public static List<ValueSourceInfo> sources(
         List<ValueSourceInfo> sources = new ArrayList<>(searchContexts.size());

         for (SearchContext searchContext : searchContexts) {
-            SearchExecutionContext ctx = searchContext.getSearchExecutionContext();
+            // TODO: remove this workaround
+            // Create a separate SearchExecutionContext for each ValuesReader, as it seems that
+            // the synthetic source doesn't work properly with inter-segment or intra-segment parallelism.
+            ShardSearchRequest shardRequest = searchContext.request();
+            SearchExecutionContext ctx = searchContext.readerContext()
+                .indexService()
+                .newSearchExecutionContext(
+                    shardRequest.shardId().id(),
+                    shardRequest.shardRequestIndex(),
+                    searchContext.searcher(),
+                    shardRequest::nowInMillis,
+                    shardRequest.getClusterAlias(),
+                    shardRequest.getRuntimeMappings()
+                );
             var fieldType = ctx.getFieldType(fieldName);
             if (fieldType == null) {
                 sources.add(new ValueSourceInfo(new NullValueSourceType(), new NullValueSource(), elementType, ctx.getIndexReader()));
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java
index b3ac80ee099b7..83fc902bd5077 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java
@@ -28,6 +28,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.TreeMap;
+import java.util.function.Supplier;

 /**
  * Operator that extracts doc_values from a Lucene index out of pages that have been produced by {@link LuceneSourceOperator}
@@ -42,12 +43,12 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
      * @param docChannel the channel containing the shard, leaf/segment and doc id
      * @param field the lucene field being loaded
      */
-    public record ValuesSourceReaderOperatorFactory(List<ValueSourceInfo> sources, int docChannel, String field)
+    public record ValuesSourceReaderOperatorFactory(Supplier<List<ValueSourceInfo>> sources, int docChannel, String field)
         implements OperatorFactory {

        @Override
        public Operator get(DriverContext driverContext) {
-            return new ValuesSourceReaderOperator(sources, docChannel, field);
+            return new ValuesSourceReaderOperator(sources.get(), docChannel, field);
        }

        @Override
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java
index 1c068815f1aae..4dab7faa2a074 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java
@@ -42,6 +42,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Supplier;

 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.joining;
@@ -51,7 +52,7 @@
  */
 public class OrdinalsGroupingOperator implements Operator {
     public record OrdinalsGroupingOperatorFactory(
-        List<ValueSourceInfo> sources,
+        Supplier<List<ValueSourceInfo>> sources,
         int docChannel,
         String groupingField,
         List aggregators,
@@ -61,7 +62,15 @@ public record OrdinalsGroupingOperatorFactory(

         @Override
         public Operator get(DriverContext driverContext) {
-            return new OrdinalsGroupingOperator(sources, docChannel, groupingField, aggregators, maxPageSize, bigArrays, driverContext);
+            return new OrdinalsGroupingOperator(
+                sources.get(),
+                docChannel,
+                groupingField,
+                aggregators,
+                maxPageSize,
+                bigArrays,
+                driverContext
+            );
         }

         @Override
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java
index 3ce202c0e4608..ec1697e9aedd2 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java
@@ -109,7 +109,7 @@ static Operator.OperatorFactory factory(IndexReader reader, ValuesSourceType vsT
         FieldContext fc = new FieldContext(ft.name(), fd, ft);
         ValuesSource vs = vsType.getField(fc, null);
         return new ValuesSourceReaderOperator.ValuesSourceReaderOperatorFactory(
-            List.of(new ValueSourceInfo(vsType, vs, elementType, reader)),
+            () -> List.of(new ValueSourceInfo(vsType, vs, elementType, reader)),
             0,
             ft.name()
         );
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/SyntheticSourceIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/SyntheticSourceIT.java
new file mode 100644
index 0000000000000..f0365ce78f44a
--- /dev/null
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/SyntheticSourceIT.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.json.JsonXContent;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+
+public class SyntheticSourceIT extends AbstractEsqlIntegTestCase {
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        var plugins = new ArrayList<>(super.nodePlugins());
+        plugins.add(MapperExtrasPlugin.class);
+        return plugins;
+    }
+
+    public void testMatchOnlyText() throws Exception {
+        XContentBuilder mapping = JsonXContent.contentBuilder();
+        mapping.startObject();
+        if (true || randomBoolean()) {
+            mapping.startObject("_source");
+            mapping.field("mode", "synthetic");
+            mapping.endObject();
+        }
+        {
+            mapping.startObject("properties");
+            mapping.startObject("uid");
+            mapping.field("type", "keyword");
+            mapping.endObject();
+            mapping.startObject("name");
+            mapping.field("type", "match_only_text");
+            mapping.endObject();
+            mapping.endObject();
+        }
+        mapping.endObject();
+
+        assertAcked(client().admin().indices().prepareCreate("test").setMapping(mapping));
+
+        int numDocs = between(10, 1000);
+        for (int i = 0; i < numDocs; i++) {
+            IndexRequestBuilder indexRequest = client().prepareIndex("test").setSource("uid", "u" + i);
+            if (randomInt(100) < 5) {
+                indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+            }
+            indexRequest.get();
+        }
+        client().admin().indices().prepareRefresh("test").get();
+        try (EsqlQueryResponse resp = run("from test | keep uid, name | sort uid asc | limit 1")) {
+            Iterator<Object> row = resp.values().next();
+            assertThat(row.next(), equalTo("u0"));
+            assertNull(row.next());
+        }
+    }
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
index ce5e277deaad8..3131b8c8c1e20 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
@@ -15,6 +15,7 @@
 import org.elasticsearch.compute.lucene.LuceneOperator;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator;
+import org.elasticsearch.compute.lucene.ValueSourceInfo;
 import org.elasticsearch.compute.lucene.ValueSources;
 import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.Operator;
@@ -39,10 +40,12 @@
 import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
 import org.elasticsearch.xpack.ql.expression.Attribute;
 import org.elasticsearch.xpack.ql.expression.FieldAttribute;
+import org.elasticsearch.xpack.ql.type.DataType;

 import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Function;
+import java.util.function.Supplier;

 import static org.elasticsearch.common.lucene.search.Queries.newNonNestedFilter;
 import static org.elasticsearch.compute.lucene.LuceneSourceOperator.NO_LIMIT;
@@ -74,19 +77,18 @@ public final PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fi
             layout.append(attr);
             Layout previousLayout = op.layout;

-            var sources = ValueSources.sources(
+            DataType dataType = attr.dataType();
+            String fieldName = attr.name();
+            Supplier<List<ValueSourceInfo>> sources = () -> ValueSources.sources(
                 searchContexts,
-                attr.name(),
-                EsqlDataTypes.isUnsupported(attr.dataType()),
-                LocalExecutionPlanner.toElementType(attr.dataType())
+                fieldName,
+                EsqlDataTypes.isUnsupported(dataType),
+                LocalExecutionPlanner.toElementType(dataType)
             );

             int docChannel = previousLayout.get(sourceAttr.id()).channel();

-            op = op.with(
-                new ValuesSourceReaderOperator.ValuesSourceReaderOperatorFactory(sources, docChannel, attr.name()),
-                layout.build()
-            );
+            op = op.with(new ValuesSourceReaderOperator.ValuesSourceReaderOperatorFactory(sources, docChannel, fieldName), layout.build());
         }
         return op;
     }
@@ -173,7 +175,7 @@ public final Operator.OperatorFactory ordinalGroupingOperatorFactory(
         // The grouping-by values are ready, let's group on them directly.
         // Costin: why are they ready and not already exposed in the layout?
         return new OrdinalsGroupingOperator.OrdinalsGroupingOperatorFactory(
-            ValueSources.sources(
+            () -> ValueSources.sources(
                 searchContexts,
                 attrSource.name(),
                 EsqlDataTypes.isUnsupported(attrSource.dataType()),