Skip to content

Commit

Permalink
Fix issues with ReinitializingSourceProvider (#118370) (#118430)
Browse files Browse the repository at this point in the history
The previous fix to ensure that each thread uses its own SearchProvider wasn't good enough.  The read from `perThreadProvider` field could be stale and therefore returning a previous source provider.  Instead the source provider should be returned from `provider` local variable.

This change also addresses another issue, sometimes current docid goes backwards compared to last seen docid and this causes issue when synthetic source provider is used, as doc values can't advance backwards. This change addresses that by returning a new source provider if backwards docid is detected.

Closes #118238
  • Loading branch information
martijnvg authored Dec 11, 2024
1 parent 3bb04d0 commit 2b6a7fe
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 8 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/118370.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 118370
summary: Fix concurrency issue with `ReinitializingSourceProvider`
area: Mapping
type: bug
issues:
- 118238
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.parser.ParsingException;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.junit.Before;

import java.io.IOException;
Expand Down Expand Up @@ -1673,17 +1674,32 @@ public void testScriptField() throws Exception {
String sourceMode = randomBoolean() ? "stored" : "synthetic";
Settings.Builder settings = indexSettings(1, 0).put(indexSettings()).put("index.mapping.source.mode", sourceMode);
client().admin().indices().prepareCreate("test-script").setMapping(mapping).setSettings(settings).get();
for (int i = 0; i < 10; i++) {
int numDocs = 256;
for (int i = 0; i < numDocs; i++) {
index("test-script", Integer.toString(i), Map.of("k1", i, "k2", "b-" + i, "meter", 10000 * i));
}
refresh("test-script");
try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT 10")) {

var pragmas = randomPragmas();
if (canUseQueryPragmas()) {
Settings.Builder pragmaSettings = Settings.builder().put(pragmas.getSettings());
pragmaSettings.put("task_concurrency", 10);
pragmaSettings.put("data_partitioning", "doc");
pragmas = new QueryPragmas(pragmaSettings.build());
}
try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT " + numDocs, pragmas)) {
List<Object> k1Column = Iterators.toList(resp.column(0));
assertThat(k1Column, contains(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L));
assertThat(k1Column, equalTo(LongStream.range(0L, numDocs).boxed().toList()));
List<Object> k2Column = Iterators.toList(resp.column(1));
assertThat(k2Column, contains(null, null, null, null, null, null, null, null, null, null));
assertThat(k2Column, equalTo(Collections.nCopies(numDocs, null)));
List<Object> meterColumn = Iterators.toList(resp.column(2));
assertThat(meterColumn, contains(0.0, 10000.0, 20000.0, 30000.0, 40000.0, 50000.0, 60000.0, 70000.0, 80000.0, 90000.0));
var expectedMeterColumn = new ArrayList<>(numDocs);
double val = 0.0;
for (int i = 0; i < numDocs; i++) {
expectedMeterColumn.add(val);
val += 10000.0;
}
assertThat(meterColumn, equalTo(expectedMeterColumn));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,23 @@
import java.util.function.Supplier;

/**
* This is a workaround for when compute engine executes concurrently with data partitioning by docid.
* This class exists as a workaround for using SourceProvider in the compute engine.
* <p>
* The main issue is when compute engine executes concurrently with data partitioning by docid (inter segment parallelization).
* A {@link SourceProvider} can only be used by a single thread and this wrapping source provider ensures that each thread uses
* its own {@link SourceProvider}.
* <p>
* Additionally, this source provider protects against going backwards, which the synthetic source provider can't handle.
*/
final class ReinitializingSourceProvider implements SourceProvider {

private PerThreadSourceProvider perThreadProvider;
private final Supplier<SourceProvider> sourceProviderFactory;

// Keeping track of last seen doc and if current doc is before last seen doc then source provider is initialized:
// (when source mode is synthetic then _source is read from doc values and doc values don't support going backwards)
private int lastSeenDocId;

ReinitializingSourceProvider(Supplier<SourceProvider> sourceProviderFactory) {
this.sourceProviderFactory = sourceProviderFactory;
}
Expand All @@ -30,11 +40,12 @@ final class ReinitializingSourceProvider implements SourceProvider {
public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
var currentThread = Thread.currentThread();
PerThreadSourceProvider provider = perThreadProvider;
if (provider == null || provider.creatingThread != currentThread) {
if (provider == null || provider.creatingThread != currentThread || doc < lastSeenDocId) {
provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread);
this.perThreadProvider = provider;
}
return perThreadProvider.source.getSource(ctx, doc);
lastSeenDocId = doc;
return provider.source.getSource(ctx, doc);
}

private record PerThreadSourceProvider(SourceProvider source, Thread creatingThread) {
Expand Down

0 comments on commit 2b6a7fe

Please sign in to comment.