diff --git a/docs/changelog/118370.yaml b/docs/changelog/118370.yaml new file mode 100644 index 0000000000000..e6a429448e493 --- /dev/null +++ b/docs/changelog/118370.yaml @@ -0,0 +1,6 @@ +pr: 118370 +summary: Fix concurrency issue with `ReinitializingSourceProvider` +area: Mapping +type: bug +issues: + - 118238 diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java index 00f53d31165b1..de9eb166688f9 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java @@ -37,6 +37,7 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.parser.ParsingException; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; +import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.junit.Before; import java.io.IOException; @@ -1673,17 +1674,32 @@ public void testScriptField() throws Exception { String sourceMode = randomBoolean() ? "stored" : "synthetic"; Settings.Builder settings = indexSettings(1, 0).put(indexSettings()).put("index.mapping.source.mode", sourceMode); client().admin().indices().prepareCreate("test-script").setMapping(mapping).setSettings(settings).get(); - for (int i = 0; i < 10; i++) { + int numDocs = 256; + for (int i = 0; i < numDocs; i++) { index("test-script", Integer.toString(i), Map.of("k1", i, "k2", "b-" + i, "meter", 10000 * i)); } refresh("test-script"); - try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT 10")) { + + var pragmas = randomPragmas(); + if (canUseQueryPragmas()) { + Settings.Builder pragmaSettings = Settings.builder().put(pragmas.getSettings()); + pragmaSettings.put("task_concurrency", 10); + pragmaSettings.put("data_partitioning", "doc"); + pragmas = new QueryPragmas(pragmaSettings.build()); + } + try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT " + numDocs, pragmas)) { List k1Column = Iterators.toList(resp.column(0)); - assertThat(k1Column, contains(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L)); + assertThat(k1Column, equalTo(LongStream.range(0L, numDocs).boxed().toList())); List k2Column = Iterators.toList(resp.column(1)); - assertThat(k2Column, contains(null, null, null, null, null, null, null, null, null, null)); + assertThat(k2Column, equalTo(Collections.nCopies(numDocs, null))); List meterColumn = Iterators.toList(resp.column(2)); - assertThat(meterColumn, contains(0.0, 10000.0, 20000.0, 30000.0, 40000.0, 50000.0, 60000.0, 70000.0, 80000.0, 90000.0)); + var expectedMeterColumn = new ArrayList<>(numDocs); + double val = 0.0; + for (int i = 0; i < numDocs; i++) { + expectedMeterColumn.add(val); + val += 10000.0; + } + assertThat(meterColumn, equalTo(expectedMeterColumn)); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java index b6b2c6dfec755..8dee3478b3b64 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java @@ -15,13 +15,23 @@ import java.util.function.Supplier; /** - * This is a workaround for when compute engine executes concurrently with data partitioning by docid. + * This class exists as a workaround for using SourceProvider in the compute engine. + *

+ * The main issue is when compute engine executes concurrently with data partitioning by docid (inter segment parallelization). + * A {@link SourceProvider} can only be used by a single thread and this wrapping source provider ensures that each thread uses + * its own {@link SourceProvider}. + *

+ * Additionally, this source provider protects against going backwards, which the synthetic source provider can't handle. */ final class ReinitializingSourceProvider implements SourceProvider { private PerThreadSourceProvider perThreadProvider; private final Supplier sourceProviderFactory; + // Keeping track of last seen doc and if current doc is before last seen doc then source provider is initialized: + // (when source mode is synthetic then _source is read from doc values and doc values don't support going backwards) + private int lastSeenDocId; + ReinitializingSourceProvider(Supplier sourceProviderFactory) { this.sourceProviderFactory = sourceProviderFactory; } @@ -30,11 +40,12 @@ final class ReinitializingSourceProvider implements SourceProvider { public Source getSource(LeafReaderContext ctx, int doc) throws IOException { var currentThread = Thread.currentThread(); PerThreadSourceProvider provider = perThreadProvider; - if (provider == null || provider.creatingThread != currentThread) { + if (provider == null || provider.creatingThread != currentThread || doc < lastSeenDocId) { provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread); this.perThreadProvider = provider; } - return perThreadProvider.source.getSource(ctx, doc); + lastSeenDocId = doc; + return provider.source.getSource(ctx, doc); } private record PerThreadSourceProvider(SourceProvider source, Thread creatingThread) {