diff --git a/modules/lang-expression/src/internalClusterTest/java/org/opensearch/script/expression/MoreExpressionIT.java b/modules/lang-expression/src/internalClusterTest/java/org/opensearch/script/expression/MoreExpressionIT.java index bb2f652168d5c..8ca28a905f216 100644 --- a/modules/lang-expression/src/internalClusterTest/java/org/opensearch/script/expression/MoreExpressionIT.java +++ b/modules/lang-expression/src/internalClusterTest/java/org/opensearch/script/expression/MoreExpressionIT.java @@ -504,10 +504,6 @@ public void testInvalidFieldMember() { } public void testSpecialValueVariable() throws Exception { - assumeFalse( - "Concurrent search case muted pending fix: https://github.com/opensearch-project/OpenSearch/issues/10079", - internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING) - ); // i.e. _value for aggregations createIndex("test"); ensureGreen("test"); diff --git a/modules/lang-expression/src/main/java/org/opensearch/script/expression/ReplaceableConstDoubleValueSource.java b/modules/lang-expression/src/main/java/org/opensearch/script/expression/ReplaceableConstDoubleValueSource.java index 28e4707a07192..8c55658dfe08a 100644 --- a/modules/lang-expression/src/main/java/org/opensearch/script/expression/ReplaceableConstDoubleValueSource.java +++ b/modules/lang-expression/src/main/java/org/opensearch/script/expression/ReplaceableConstDoubleValueSource.java @@ -39,20 +39,25 @@ import org.apache.lucene.search.IndexSearcher; import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** - * A {@link DoubleValuesSource} which has a stub {@link DoubleValues} that holds a dynamically replaceable constant double. + * A {@link DoubleValuesSource} which has a stub {@link DoubleValues} that holds a dynamically replaceable constant double. This is made + * thread-safe for concurrent segment search use case by keeping the {@link DoubleValues} per thread. Any update to the value happens in + * thread specific {@link DoubleValuesSource} instance. */ final class ReplaceableConstDoubleValueSource extends DoubleValuesSource { - final ReplaceableConstDoubleValues fv; + // Multiple slices can be processed by same thread but that will be sequential, so keeping per thread is fine + final Map perThreadDoubleValues; ReplaceableConstDoubleValueSource() { - fv = new ReplaceableConstDoubleValues(); + perThreadDoubleValues = new ConcurrentHashMap<>(); } @Override public DoubleValues getValues(LeafReaderContext ctx, DoubleValues scores) throws IOException { - return fv; + return perThreadDoubleValues.computeIfAbsent(Thread.currentThread().getId(), threadId -> new ReplaceableConstDoubleValues()); } @Override @@ -62,7 +67,9 @@ public boolean needsScores() { @Override public Explanation explain(LeafReaderContext ctx, int docId, Explanation scoreExplanation) throws IOException { - if (fv.advanceExact(docId)) return Explanation.match((float) fv.doubleValue(), "ReplaceableConstDoubleValues"); + final ReplaceableConstDoubleValues currentFv = perThreadDoubleValues.computeIfAbsent(Thread.currentThread().getId(), threadId -> new ReplaceableConstDoubleValues()); + if (currentFv.advanceExact(docId)) return Explanation.match((float) currentFv.doubleValue(), + "ReplaceableConstDoubleValues"); else return Explanation.noMatch("ReplaceableConstDoubleValues"); } @@ -77,7 +84,8 @@ public int hashCode() { } public void setValue(double v) { - fv.setValue(v); + final ReplaceableConstDoubleValues currentFv = perThreadDoubleValues.computeIfAbsent(Thread.currentThread().getId(), threadId -> new ReplaceableConstDoubleValues()); + currentFv.setValue(v); } @Override