From 20ea733ae76e5004a50faf6ce89c0fb776ebd466 Mon Sep 17 00:00:00 2001 From: Dmitry Kryukov Date: Wed, 16 Oct 2024 22:30:41 +0300 Subject: [PATCH 1/4] Use Atomic vars in multithreaded env. ++num and num++ operations aren't atomic, can't use them with volatile vars Signed-off-by: Dmitry Kryukov --- .../time/NanoTimeVsCurrentTimeMillisBenchmark.java | 5 +++-- .../TransportFieldCapabilitiesIndexAction.java | 9 +++++---- .../action/search/QueryPhaseResultConsumer.java | 11 ++++++----- .../org/opensearch/common/util/PlainIterator.java | 14 ++++++-------- 4 files changed, 20 insertions(+), 19 deletions(-) diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/time/NanoTimeVsCurrentTimeMillisBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/time/NanoTimeVsCurrentTimeMillisBenchmark.java index dfa0a8538108e..0057da87d95c6 100644 --- a/benchmarks/src/main/java/org/opensearch/benchmark/time/NanoTimeVsCurrentTimeMillisBenchmark.java +++ b/benchmarks/src/main/java/org/opensearch/benchmark/time/NanoTimeVsCurrentTimeMillisBenchmark.java @@ -19,6 +19,7 @@ import org.openjdk.jmh.annotations.Warmup; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; @Fork(3) @Warmup(iterations = 10) @@ -28,7 +29,7 @@ @State(Scope.Benchmark) @SuppressWarnings("unused") // invoked by benchmarking framework public class NanoTimeVsCurrentTimeMillisBenchmark { - private volatile long var = 0; + private final AtomicLong var = new AtomicLong(0); @Benchmark public long currentTimeMillis() { @@ -45,6 +46,6 @@ public long nanoTime() { * */ @Benchmark public long accessLongVar() { - return var++; + return var.getAndIncrement(); } } diff --git a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java index 10bf4975311d6..d0abc6f048c1e 100644 --- a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java +++ b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java @@ -82,6 +82,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import static org.opensearch.action.support.TransportActions.isShardNotAvailableException; @@ -226,7 +227,7 @@ class AsyncShardsAction { private final ActionListener listener; private final GroupShardsIterator shardsIt; - private volatile int shardIndex = 0; + private final AtomicInteger shardIndex = new AtomicInteger(0); private AsyncShardsAction(FieldCapabilitiesIndexRequest request, ActionListener listener) { this.listener = listener; @@ -263,11 +264,11 @@ private void onFailure(ShardRouting shardRouting, Exception e) { } private ShardRouting nextRoutingOrNull(Exception failure) { - if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) { + if (shardsIt.size() == 0 || shardIndex.get() >= shardsIt.size()) { return null; } ShardRouting next = FailAwareWeightedRouting.getInstance() - .findNext(shardsIt.get(shardIndex), clusterService.state(), failure, this::moveToNextShard); + .findNext(shardsIt.get(shardIndex.get()), clusterService.state(), failure, this::moveToNextShard); if (next != null) { return next; @@ -277,7 +278,7 @@ private ShardRouting nextRoutingOrNull(Exception failure) { } private void moveToNextShard() { - ++shardIndex; + shardIndex.incrementAndGet(); } private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShard) { diff --git a/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java index f1b06378bd579..ed4529923d404 100644 --- a/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java +++ b/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java @@ -56,6 +56,7 @@ import java.util.Comparator; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -155,7 +156,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception { aggsList, topDocsList, topDocsStats, - pendingMerges.numReducePhases, + pendingMerges.numReducePhases.get(), false, aggReduceContextBuilder, performFinalReduce @@ -239,7 +240,7 @@ private MergeResult partialReduce( } public int getNumReducePhases() { - return pendingMerges.numReducePhases; + return pendingMerges.numReducePhases.get(); } /** @@ -264,7 +265,7 @@ private class PendingMerges implements Releasable { private final SearchPhaseController.TopDocsStats topDocsStats; private volatile MergeResult mergeResult; private volatile boolean hasPartialReduce; - private volatile int numReducePhases; + private final AtomicInteger numReducePhases = new AtomicInteger(); PendingMerges(int batchReduceSize, int trackTotalHitsUpTo) { this.batchReduceSize = batchReduceSize; @@ -448,8 +449,8 @@ protected void doRun() { long estimatedMergeSize = estimateRamBytesUsedForReduce(estimatedTotalSize); addEstimateAndMaybeBreak(estimatedMergeSize); estimatedTotalSize += estimatedMergeSize; - ++numReducePhases; - newMerge = partialReduce(toConsume, task.emptyResults, topDocsStats, thisMergeResult, numReducePhases); + numReducePhases.incrementAndGet(); + newMerge = partialReduce(toConsume, task.emptyResults, topDocsStats, thisMergeResult, numReducePhases.get()); } catch (Exception t) { onMergeFailure(t); return; diff --git a/server/src/main/java/org/opensearch/common/util/PlainIterator.java b/server/src/main/java/org/opensearch/common/util/PlainIterator.java index fff8126f13e5d..2e797a7c693cc 100644 --- a/server/src/main/java/org/opensearch/common/util/PlainIterator.java +++ b/server/src/main/java/org/opensearch/common/util/PlainIterator.java @@ -35,6 +35,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; /** * A plain iterator @@ -44,10 +45,7 @@ public class PlainIterator implements Iterable, Countable { private final List elements; - // Calls to nextOrNull might be performed on different threads in the transport actions so we need the volatile - // keyword in order to ensure visibility. Note that it is fine to use `volatile` for a counter in that case given - // that although nextOrNull might be called from different threads, it can never happen concurrently. - private volatile int index; + private AtomicInteger index = new AtomicInteger(); public PlainIterator(List elements) { this.elements = elements; @@ -55,18 +53,18 @@ public PlainIterator(List elements) { } public void reset() { - index = 0; + index = new AtomicInteger(0); } public int remaining() { - return elements.size() - index; + return elements.size() - index.get(); } public T nextOrNull() { - if (index == elements.size()) { + if (index.get() == elements.size()) { return null; } else { - return elements.get(index++); + return elements.get(index.getAndIncrement()); } } From 8b623696f245098f0689aa9466379adceb88ed70 Mon Sep 17 00:00:00 2001 From: Dmitry Kryukov Date: Fri, 18 Oct 2024 10:58:29 +0300 Subject: [PATCH 2/4] Update server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java Co-authored-by: Daniel Widdis Signed-off-by: Dmitry Kryukov --- .../org/opensearch/action/search/QueryPhaseResultConsumer.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java index ed4529923d404..c55a6fe42fb63 100644 --- a/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java +++ b/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java @@ -449,8 +449,7 @@ protected void doRun() { long estimatedMergeSize = estimateRamBytesUsedForReduce(estimatedTotalSize); addEstimateAndMaybeBreak(estimatedMergeSize); estimatedTotalSize += estimatedMergeSize; - numReducePhases.incrementAndGet(); - newMerge = partialReduce(toConsume, task.emptyResults, topDocsStats, thisMergeResult, numReducePhases.get()); + newMerge = partialReduce(toConsume, task.emptyResults, topDocsStats, thisMergeResult, numReducePhases.incrementAndGet()); } catch (Exception t) { onMergeFailure(t); return; From cdbb13fca70a48b8e155842e9665c21200c4d313 Mon Sep 17 00:00:00 2001 From: Dmitry Kryukov Date: Sat, 23 Nov 2024 20:16:11 +0300 Subject: [PATCH 3/4] Reverted changes in 3 files Signed-off-by: Dmitry Kryukov --- .../time/NanoTimeVsCurrentTimeMillisBenchmark.java | 9 ++++----- .../TransportFieldCapabilitiesIndexAction.java | 9 ++++----- .../org/opensearch/common/util/PlainIterator.java | 14 ++++++++------ 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/time/NanoTimeVsCurrentTimeMillisBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/time/NanoTimeVsCurrentTimeMillisBenchmark.java index 0057da87d95c6..56ccba7f1a689 100644 --- a/benchmarks/src/main/java/org/opensearch/benchmark/time/NanoTimeVsCurrentTimeMillisBenchmark.java +++ b/benchmarks/src/main/java/org/opensearch/benchmark/time/NanoTimeVsCurrentTimeMillisBenchmark.java @@ -19,7 +19,6 @@ import org.openjdk.jmh.annotations.Warmup; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; @Fork(3) @Warmup(iterations = 10) @@ -29,7 +28,7 @@ @State(Scope.Benchmark) @SuppressWarnings("unused") // invoked by benchmarking framework public class NanoTimeVsCurrentTimeMillisBenchmark { - private final AtomicLong var = new AtomicLong(0); + private volatile long var = 0; @Benchmark public long currentTimeMillis() { @@ -42,10 +41,10 @@ public long nanoTime() { } /* - * this acts as upper bound of how time is cached in org.opensearch.threadpool.ThreadPool - * */ + * this acts as upper bound of how time is cached in org.opensearch.threadpool.ThreadPool + * */ @Benchmark public long accessLongVar() { - return var.getAndIncrement(); + return var++; } } diff --git a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java index d0abc6f048c1e..10bf4975311d6 100644 --- a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java +++ b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java @@ -82,7 +82,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import static org.opensearch.action.support.TransportActions.isShardNotAvailableException; @@ -227,7 +226,7 @@ class AsyncShardsAction { private final ActionListener listener; private final GroupShardsIterator shardsIt; - private final AtomicInteger shardIndex = new AtomicInteger(0); + private volatile int shardIndex = 0; private AsyncShardsAction(FieldCapabilitiesIndexRequest request, ActionListener listener) { this.listener = listener; @@ -264,11 +263,11 @@ private void onFailure(ShardRouting shardRouting, Exception e) { } private ShardRouting nextRoutingOrNull(Exception failure) { - if (shardsIt.size() == 0 || shardIndex.get() >= shardsIt.size()) { + if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) { return null; } ShardRouting next = FailAwareWeightedRouting.getInstance() - .findNext(shardsIt.get(shardIndex.get()), clusterService.state(), failure, this::moveToNextShard); + .findNext(shardsIt.get(shardIndex), clusterService.state(), failure, this::moveToNextShard); if (next != null) { return next; @@ -278,7 +277,7 @@ private ShardRouting nextRoutingOrNull(Exception failure) { } private void moveToNextShard() { - shardIndex.incrementAndGet(); + ++shardIndex; } private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShard) { diff --git a/server/src/main/java/org/opensearch/common/util/PlainIterator.java b/server/src/main/java/org/opensearch/common/util/PlainIterator.java index 2e797a7c693cc..fff8126f13e5d 100644 --- a/server/src/main/java/org/opensearch/common/util/PlainIterator.java +++ b/server/src/main/java/org/opensearch/common/util/PlainIterator.java @@ -35,7 +35,6 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; /** * A plain iterator @@ -45,7 +44,10 @@ public class PlainIterator implements Iterable, Countable { private final List elements; - private AtomicInteger index = new AtomicInteger(); + // Calls to nextOrNull might be performed on different threads in the transport actions so we need the volatile + // keyword in order to ensure visibility. Note that it is fine to use `volatile` for a counter in that case given + // that although nextOrNull might be called from different threads, it can never happen concurrently. + private volatile int index; public PlainIterator(List elements) { this.elements = elements; @@ -53,18 +55,18 @@ public PlainIterator(List elements) { } public void reset() { - index = new AtomicInteger(0); + index = 0; } public int remaining() { - return elements.size() - index.get(); + return elements.size() - index; } public T nextOrNull() { - if (index.get() == elements.size()) { + if (index == elements.size()) { return null; } else { - return elements.get(index.getAndIncrement()); + return elements.get(index++); } } From 3fad0c34bb1c2697402f3fc2f76ba0f8b221fc88 Mon Sep 17 00:00:00 2001 From: Dmitry Kryukov Date: Sat, 23 Nov 2024 20:21:51 +0300 Subject: [PATCH 4/4] Fixed comment Signed-off-by: Dmitry Kryukov --- .../benchmark/time/NanoTimeVsCurrentTimeMillisBenchmark.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/time/NanoTimeVsCurrentTimeMillisBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/time/NanoTimeVsCurrentTimeMillisBenchmark.java index 56ccba7f1a689..dfa0a8538108e 100644 --- a/benchmarks/src/main/java/org/opensearch/benchmark/time/NanoTimeVsCurrentTimeMillisBenchmark.java +++ b/benchmarks/src/main/java/org/opensearch/benchmark/time/NanoTimeVsCurrentTimeMillisBenchmark.java @@ -41,8 +41,8 @@ public long nanoTime() { } /* - * this acts as upper bound of how time is cached in org.opensearch.threadpool.ThreadPool - * */ + * this acts as upper bound of how time is cached in org.opensearch.threadpool.ThreadPool + * */ @Benchmark public long accessLongVar() { return var++;