From 2dc28ac2f1e0a255d88cf68695bb19532def1a6c Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 15 Jul 2024 09:34:52 +0200 Subject: [PATCH] Use new runInParallel and startInParallel test util in more spots (#110853) It's in the title. Use the utility in a couple more spots to save some code and thread creation. --- .../versioning/SimpleVersioningIT.java | 136 +++++----- .../search/SearchPhaseControllerTests.java | 177 ++++++------ .../action/shard/ShardStateActionTests.java | 61 ++--- .../allocator/ContinuousComputationTests.java | 28 +- .../common/cache/CacheTests.java | 213 +++++---------- .../common/compress/DeflateCompressTests.java | 252 ++++++------------ .../common/util/concurrent/RunOnceTests.java | 22 +- .../env/NodeEnvironmentTests.java | 51 +--- .../index/engine/InternalEngineTests.java | 132 ++++----- .../seqno/LocalCheckpointTrackerTests.java | 45 +--- ...ReplicationTrackerRetentionLeaseTests.java | 38 +-- .../HierarchyCircuitBreakerServiceTests.java | 101 +++---- 12 files changed, 445 insertions(+), 811 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/versioning/SimpleVersioningIT.java b/server/src/internalClusterTest/java/org/elasticsearch/versioning/SimpleVersioningIT.java index 41fc3d2b759ff..339187f56d8c3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/versioning/SimpleVersioningIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/versioning/SimpleVersioningIT.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.Random; import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; @@ -585,85 +584,70 @@ public void testRandomIDsAndVersions() throws Exception { } final AtomicInteger upto = new AtomicInteger(); - final CountDownLatch startingGun = new CountDownLatch(1); - Thread[] threads = new Thread[TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 20 : 5)]; final long startTime = System.nanoTime(); - for (int i = 0; i < threads.length; i++) { - final int threadID = i; - threads[i] = new Thread() { - @Override - public void run() { - try { - // final Random threadRandom = RandomizedContext.current().getRandom(); - final Random threadRandom = random(); - startingGun.await(); - while (true) { - - // TODO: sometimes use bulk: - - int index = upto.getAndIncrement(); - if (index >= idVersions.length) { - break; - } - if (index % 100 == 0) { - logger.trace("{}: index={}", Thread.currentThread().getName(), index); - } - IDAndVersion idVersion = idVersions[index]; - - String id = idVersion.id; - idVersion.threadID = threadID; - idVersion.indexStartTime = System.nanoTime() - startTime; - long version = idVersion.version; - if (idVersion.delete) { - try { - idVersion.response = client().prepareDelete("test", id) - .setVersion(version) - .setVersionType(VersionType.EXTERNAL) - .get(); - } catch (VersionConflictEngineException vcee) { - // OK: our version is too old - assertThat(version, lessThanOrEqualTo(truth.get(id).version)); - idVersion.versionConflict = true; - } - } else { - try { - idVersion.response = prepareIndex("test").setId(id) - .setSource("foo", "bar") - .setVersion(version) - .setVersionType(VersionType.EXTERNAL) - .get(); - - } catch (VersionConflictEngineException vcee) { - // OK: our version is too old - assertThat(version, lessThanOrEqualTo(truth.get(id).version)); - idVersion.versionConflict = true; - } - } - idVersion.indexFinishTime = System.nanoTime() - startTime; - - if (threadRandom.nextInt(100) == 7) { - logger.trace("--> {}: TEST: now refresh at {}", threadID, System.nanoTime() - startTime); - refresh(); - logger.trace("--> {}: TEST: refresh done at {}", threadID, System.nanoTime() - startTime); - } - if (threadRandom.nextInt(100) == 7) { - logger.trace("--> {}: TEST: now flush at {}", threadID, System.nanoTime() - startTime); - flush(); - logger.trace("--> {}: TEST: flush done at {}", threadID, System.nanoTime() - startTime); - } + startInParallel(TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 20 : 5), threadID -> { + try { + // final Random threadRandom = RandomizedContext.current().getRandom(); + final Random threadRandom = random(); + while (true) { + + // TODO: sometimes use bulk: + + int index = upto.getAndIncrement(); + if (index >= idVersions.length) { + break; + } + if (index % 100 == 0) { + logger.trace("{}: index={}", Thread.currentThread().getName(), index); + } + IDAndVersion idVersion = idVersions[index]; + + String id = idVersion.id; + idVersion.threadID = threadID; + idVersion.indexStartTime = System.nanoTime() - startTime; + long v = idVersion.version; + if (idVersion.delete) { + try { + idVersion.response = client().prepareDelete("test", id) + .setVersion(v) + .setVersionType(VersionType.EXTERNAL) + .get(); + } catch (VersionConflictEngineException vcee) { + // OK: our version is too old + assertThat(v, lessThanOrEqualTo(truth.get(id).version)); + idVersion.versionConflict = true; + } + } else { + try { + idVersion.response = prepareIndex("test").setId(id) + .setSource("foo", "bar") + .setVersion(v) + .setVersionType(VersionType.EXTERNAL) + .get(); + + } catch (VersionConflictEngineException vcee) { + // OK: our version is too old + assertThat(v, lessThanOrEqualTo(truth.get(id).version)); + idVersion.versionConflict = true; } - } catch (Exception e) { - throw new RuntimeException(e); } - } - }; - threads[i].start(); - } + idVersion.indexFinishTime = System.nanoTime() - startTime; - startingGun.countDown(); - for (Thread thread : threads) { - thread.join(); - } + if (threadRandom.nextInt(100) == 7) { + logger.trace("--> {}: TEST: now refresh at {}", threadID, System.nanoTime() - startTime); + refresh(); + logger.trace("--> {}: TEST: refresh done at {}", threadID, System.nanoTime() - startTime); + } + if (threadRandom.nextInt(100) == 7) { + logger.trace("--> {}: TEST: now flush at {}", threadID, System.nanoTime() - startTime); + flush(); + logger.trace("--> {}: TEST: flush done at {}", threadID, System.nanoTime() - startTime); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); // Verify against truth: boolean failed = false; diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 118a7055cd782..ed02328d388b6 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -748,43 +748,34 @@ public void testConsumerConcurrently() throws Exception { ) ) { AtomicInteger max = new AtomicInteger(); - Thread[] threads = new Thread[expectedNumResults]; CountDownLatch latch = new CountDownLatch(expectedNumResults); - for (int i = 0; i < expectedNumResults; i++) { - int id = i; - threads[i] = new Thread(() -> { - int number = randomIntBetween(1, 1000); - max.updateAndGet(prev -> Math.max(prev, number)); - QuerySearchResult result = new QuerySearchResult( - new ShardSearchContextId("", id), - new SearchShardTarget("node", new ShardId("a", "b", id), null), - null + runInParallel(expectedNumResults, id -> { + int number = randomIntBetween(1, 1000); + max.updateAndGet(prev -> Math.max(prev, number)); + QuerySearchResult result = new QuerySearchResult( + new ShardSearchContextId("", id), + new SearchShardTarget("node", new ShardId("a", "b", id), null), + null + ); + try { + result.topDocs( + new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(0, number) }), + number + ), + new DocValueFormat[0] ); - try { - result.topDocs( - new TopDocsAndMaxScore( - new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(0, number) }), - number - ), - new DocValueFormat[0] - ); - InternalAggregations aggs = InternalAggregations.from( - Collections.singletonList(new Max("test", (double) number, DocValueFormat.RAW, Collections.emptyMap())) - ); - result.aggregations(aggs); - result.setShardIndex(id); - result.size(1); - consumer.consumeResult(result, latch::countDown); - } finally { - result.decRef(); - } - - }); - threads[i].start(); - } - for (int i = 0; i < expectedNumResults; i++) { - threads[i].join(); - } + InternalAggregations aggs = InternalAggregations.from( + Collections.singletonList(new Max("test", (double) number, DocValueFormat.RAW, Collections.emptyMap())) + ); + result.aggregations(aggs); + result.setShardIndex(id); + result.size(1); + consumer.consumeResult(result, latch::countDown); + } finally { + result.decRef(); + } + }); latch.await(); SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); @@ -1264,42 +1255,34 @@ public void onFinalReduce(List shards, TotalHits totalHits, Interna ) ) { AtomicInteger max = new AtomicInteger(); - Thread[] threads = new Thread[expectedNumResults]; CountDownLatch latch = new CountDownLatch(expectedNumResults); - for (int i = 0; i < expectedNumResults; i++) { - int id = i; - threads[i] = new Thread(() -> { - int number = randomIntBetween(1, 1000); - max.updateAndGet(prev -> Math.max(prev, number)); - QuerySearchResult result = new QuerySearchResult( - new ShardSearchContextId("", id), - new SearchShardTarget("node", new ShardId("a", "b", id), null), - null + runInParallel(expectedNumResults, id -> { + int number = randomIntBetween(1, 1000); + max.updateAndGet(prev -> Math.max(prev, number)); + QuerySearchResult result = new QuerySearchResult( + new ShardSearchContextId("", id), + new SearchShardTarget("node", new ShardId("a", "b", id), null), + null + ); + try { + result.topDocs( + new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(0, number) }), + number + ), + new DocValueFormat[0] ); - try { - result.topDocs( - new TopDocsAndMaxScore( - new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(0, number) }), - number - ), - new DocValueFormat[0] - ); - InternalAggregations aggs = InternalAggregations.from( - Collections.singletonList(new Max("test", (double) number, DocValueFormat.RAW, Collections.emptyMap())) - ); - result.aggregations(aggs); - result.setShardIndex(id); - result.size(1); - consumer.consumeResult(result, latch::countDown); - } finally { - result.decRef(); - } - }); - threads[i].start(); - } - for (int i = 0; i < expectedNumResults; i++) { - threads[i].join(); - } + InternalAggregations aggs = InternalAggregations.from( + Collections.singletonList(new Max("test", (double) number, DocValueFormat.RAW, Collections.emptyMap())) + ); + result.aggregations(aggs); + result.setShardIndex(id); + result.size(1); + consumer.consumeResult(result, latch::countDown); + } finally { + result.decRef(); + } + }); latch.await(); SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); assertAggReduction(request); @@ -1354,39 +1337,31 @@ private void testReduceCase(int numShards, int bufferSize, boolean shouldFail) t ) ) { CountDownLatch latch = new CountDownLatch(numShards); - Thread[] threads = new Thread[numShards]; - for (int i = 0; i < numShards; i++) { - final int index = i; - threads[index] = new Thread(() -> { - QuerySearchResult result = new QuerySearchResult( - new ShardSearchContextId(UUIDs.randomBase64UUID(), index), - new SearchShardTarget("node", new ShardId("a", "b", index), null), - null + runInParallel(numShards, index -> { + QuerySearchResult result = new QuerySearchResult( + new ShardSearchContextId(UUIDs.randomBase64UUID(), index), + new SearchShardTarget("node", new ShardId("a", "b", index), null), + null + ); + try { + result.topDocs( + new TopDocsAndMaxScore( + new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), + Float.NaN + ), + new DocValueFormat[0] ); - try { - result.topDocs( - new TopDocsAndMaxScore( - new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), - Float.NaN - ), - new DocValueFormat[0] - ); - InternalAggregations aggs = InternalAggregations.from( - Collections.singletonList(new Max("test", 0d, DocValueFormat.RAW, Collections.emptyMap())) - ); - result.aggregations(aggs); - result.setShardIndex(index); - result.size(1); - consumer.consumeResult(result, latch::countDown); - } finally { - result.decRef(); - } - }); - threads[index].start(); - } - for (int i = 0; i < numShards; i++) { - threads[i].join(); - } + InternalAggregations aggs = InternalAggregations.from( + Collections.singletonList(new Max("test", 0d, DocValueFormat.RAW, Collections.emptyMap())) + ); + result.aggregations(aggs); + result.setShardIndex(index); + result.size(1); + consumer.consumeResult(result, latch::countDown); + } finally { + result.decRef(); + } + }); latch.await(); if (shouldFail) { if (shouldFailPartial == false) { diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index cada467ea3ad6..ef8742e4a4a35 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -430,9 +430,9 @@ public void testRemoteShardFailedConcurrently() throws Exception { for (int i = 0; i < failedShards.length; i++) { failedShards[i] = getRandomShardRouting(index); } - Thread[] clientThreads = new Thread[between(1, 6)]; + final int clientThreads = between(1, 6); int iterationsPerThread = scaledRandomIntBetween(50, 500); - Phaser barrier = new Phaser(clientThreads.length + 2); // one for master thread, one for the main thread + Phaser barrier = new Phaser(clientThreads + 1); // +1 for the master thread Thread masterThread = new Thread(() -> { barrier.arriveAndAwaitAdvance(); while (shutdown.get() == false) { @@ -448,39 +448,32 @@ public void testRemoteShardFailedConcurrently() throws Exception { masterThread.start(); AtomicInteger notifiedResponses = new AtomicInteger(); - for (int t = 0; t < clientThreads.length; t++) { - clientThreads[t] = new Thread(() -> { - barrier.arriveAndAwaitAdvance(); - for (int i = 0; i < iterationsPerThread; i++) { - ShardRouting failedShard = randomFrom(failedShards); - shardStateAction.remoteShardFailed( - failedShard.shardId(), - failedShard.allocationId().getId(), - randomLongBetween(1, Long.MAX_VALUE), - randomBoolean(), - "test", - getSimulatedFailure(), - new ActionListener() { - @Override - public void onResponse(Void aVoid) { - notifiedResponses.incrementAndGet(); - } - - @Override - public void onFailure(Exception e) { - notifiedResponses.incrementAndGet(); - } + runInParallel(clientThreads, t -> { + barrier.arriveAndAwaitAdvance(); + for (int i = 0; i < iterationsPerThread; i++) { + ShardRouting failedShard = randomFrom(failedShards); + shardStateAction.remoteShardFailed( + failedShard.shardId(), + failedShard.allocationId().getId(), + randomLongBetween(1, Long.MAX_VALUE), + randomBoolean(), + "test", + getSimulatedFailure(), + new ActionListener<>() { + @Override + public void onResponse(Void aVoid) { + notifiedResponses.incrementAndGet(); } - ); - } - }); - clientThreads[t].start(); - } - barrier.arriveAndAwaitAdvance(); - for (Thread t : clientThreads) { - t.join(); - } - assertBusy(() -> assertThat(notifiedResponses.get(), equalTo(clientThreads.length * iterationsPerThread))); + + @Override + public void onFailure(Exception e) { + notifiedResponses.incrementAndGet(); + } + } + ); + } + }); + assertBusy(() -> assertThat(notifiedResponses.get(), equalTo(clientThreads * iterationsPerThread))); shutdown.set(true); masterThread.join(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ContinuousComputationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ContinuousComputationTests.java index c644c0a1d1225..ca4aa5c6ae442 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ContinuousComputationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ContinuousComputationTests.java @@ -17,7 +17,6 @@ import org.junit.BeforeClass; import java.util.Arrays; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -60,26 +59,13 @@ protected void processInput(Integer input) { } }; - final Thread[] threads = new Thread[between(1, 5)]; - final int[] valuePerThread = new int[threads.length]; - final CountDownLatch startLatch = new CountDownLatch(1); - for (int i = 0; i < threads.length; i++) { - final int threadIndex = i; - valuePerThread[threadIndex] = randomInt(); - threads[threadIndex] = new Thread(() -> { - safeAwait(startLatch); - for (int j = 1000; j >= 0; j--) { - computation.onNewInput(valuePerThread[threadIndex] = valuePerThread[threadIndex] + j); - } - }, "submit-thread-" + threadIndex); - threads[threadIndex].start(); - } - - startLatch.countDown(); - - for (Thread thread : threads) { - thread.join(); - } + final int threads = between(1, 5); + final int[] valuePerThread = new int[threads]; + startInParallel(threads, threadIndex -> { + for (int j = 1000; j >= 0; j--) { + computation.onNewInput(valuePerThread[threadIndex] = valuePerThread[threadIndex] + j); + } + }); assertBusy(() -> assertFalse(computation.isActive())); diff --git a/server/src/test/java/org/elasticsearch/common/cache/CacheTests.java b/server/src/test/java/org/elasticsearch/common/cache/CacheTests.java index 0fb40583a4d79..fe302d2e5fec1 100644 --- a/server/src/test/java/org/elasticsearch/common/cache/CacheTests.java +++ b/server/src/test/java/org/elasticsearch/common/cache/CacheTests.java @@ -326,32 +326,19 @@ protected long now() { assertEquals(numberOfEntries, cache.stats().getEvictions()); } - public void testComputeIfAbsentDeadlock() { - final int numberOfThreads = randomIntBetween(2, 32); + public void testComputeIfAbsentDeadlock() throws InterruptedException { final Cache cache = CacheBuilder.builder() .setExpireAfterAccess(TimeValue.timeValueNanos(1)) .build(); - - final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); - for (int i = 0; i < numberOfThreads; i++) { - final Thread thread = new Thread(() -> { - safeAwait(barrier); - for (int j = 0; j < numberOfEntries; j++) { - try { - cache.computeIfAbsent(0, k -> Integer.toString(k)); - } catch (final ExecutionException e) { - throw new AssertionError(e); - } + startInParallel(randomIntBetween(2, 32), i -> { + for (int j = 0; j < numberOfEntries; j++) { + try { + cache.computeIfAbsent(0, k -> Integer.toString(k)); + } catch (final ExecutionException e) { + throw new AssertionError(e); } - safeAwait(barrier); - }); - thread.start(); - } - - // wait for all threads to be ready - safeAwait(barrier); - // wait for all threads to finish - safeAwait(barrier); + } + }); } // randomly promote some entries, step the clock forward, then check that the promoted entries remain and the @@ -596,41 +583,26 @@ public void testComputeIfAbsentLoadsSuccessfully() { } } - public void testComputeIfAbsentCallsOnce() { - int numberOfThreads = randomIntBetween(2, 32); + public void testComputeIfAbsentCallsOnce() throws InterruptedException { final Cache cache = CacheBuilder.builder().build(); AtomicReferenceArray flags = new AtomicReferenceArray<>(numberOfEntries); for (int j = 0; j < numberOfEntries; j++) { flags.set(j, false); } - CopyOnWriteArrayList failures = new CopyOnWriteArrayList<>(); - - CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); - for (int i = 0; i < numberOfThreads; i++) { - Thread thread = new Thread(() -> { - safeAwait(barrier); - for (int j = 0; j < numberOfEntries; j++) { - try { - cache.computeIfAbsent(j, key -> { - assertTrue(flags.compareAndSet(key, false, true)); - return Integer.toString(key); - }); - } catch (ExecutionException e) { - failures.add(e); - break; - } + startInParallel(randomIntBetween(2, 32), i -> { + for (int j = 0; j < numberOfEntries; j++) { + try { + cache.computeIfAbsent(j, key -> { + assertTrue(flags.compareAndSet(key, false, true)); + return Integer.toString(key); + }); + } catch (ExecutionException e) { + failures.add(e); + break; } - safeAwait(barrier); - }); - thread.start(); - } - - // wait for all threads to be ready - safeAwait(barrier); - // wait for all threads to finish - safeAwait(barrier); - + } + }); assertThat(failures, is(empty())); } @@ -751,111 +723,70 @@ public int hashCode() { assertFalse("deadlock", deadlock.get()); } - public void testCachePollution() { + public void testCachePollution() throws InterruptedException { int numberOfThreads = randomIntBetween(2, 32); final Cache cache = CacheBuilder.builder().build(); - - CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); - - for (int i = 0; i < numberOfThreads; i++) { - Thread thread = new Thread(() -> { - safeAwait(barrier); - Random random = new Random(random().nextLong()); - for (int j = 0; j < numberOfEntries; j++) { - Integer key = random.nextInt(numberOfEntries); - boolean first; - boolean second; - do { - first = random.nextBoolean(); - second = random.nextBoolean(); - } while (first && second); - if (first) { - try { - cache.computeIfAbsent(key, k -> { - if (random.nextBoolean()) { - return Integer.toString(k); - } else { - throw new Exception("testCachePollution"); - } - }); - } catch (ExecutionException e) { - assertNotNull(e.getCause()); - assertThat(e.getCause(), instanceOf(Exception.class)); - assertEquals(e.getCause().getMessage(), "testCachePollution"); - } - } else if (second) { - cache.invalidate(key); - } else { - cache.get(key); + startInParallel(numberOfThreads, i -> { + Random random = new Random(random().nextLong()); + for (int j = 0; j < numberOfEntries; j++) { + Integer key = random.nextInt(numberOfEntries); + boolean first; + boolean second; + do { + first = random.nextBoolean(); + second = random.nextBoolean(); + } while (first && second); + if (first) { + try { + cache.computeIfAbsent(key, k -> { + if (random.nextBoolean()) { + return Integer.toString(k); + } else { + throw new Exception("testCachePollution"); + } + }); + } catch (ExecutionException e) { + assertNotNull(e.getCause()); + assertThat(e.getCause(), instanceOf(Exception.class)); + assertEquals(e.getCause().getMessage(), "testCachePollution"); } + } else if (second) { + cache.invalidate(key); + } else { + cache.get(key); } - safeAwait(barrier); - }); - thread.start(); - } - - // wait for all threads to be ready - safeAwait(barrier); - // wait for all threads to finish - safeAwait(barrier); + } + }); } - public void testExceptionThrownDuringConcurrentComputeIfAbsent() { - int numberOfThreads = randomIntBetween(2, 32); + public void testExceptionThrownDuringConcurrentComputeIfAbsent() throws InterruptedException { final Cache cache = CacheBuilder.builder().build(); - - CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); - final String key = randomAlphaOfLengthBetween(2, 32); - for (int i = 0; i < numberOfThreads; i++) { - Thread thread = new Thread(() -> { - safeAwait(barrier); - for (int j = 0; j < numberOfEntries; j++) { - try { - String value = cache.computeIfAbsent(key, k -> { throw new RuntimeException("failed to load"); }); - fail("expected exception but got: " + value); - } catch (ExecutionException e) { - assertNotNull(e.getCause()); - assertThat(e.getCause(), instanceOf(RuntimeException.class)); - assertEquals(e.getCause().getMessage(), "failed to load"); - } + startInParallel(randomIntBetween(2, 32), i -> { + for (int j = 0; j < numberOfEntries; j++) { + try { + String value = cache.computeIfAbsent(key, k -> { throw new RuntimeException("failed to load"); }); + fail("expected exception but got: " + value); + } catch (ExecutionException e) { + assertNotNull(e.getCause()); + assertThat(e.getCause(), instanceOf(RuntimeException.class)); + assertEquals(e.getCause().getMessage(), "failed to load"); } - safeAwait(barrier); - }); - thread.start(); - } - - // wait for all threads to be ready - safeAwait(barrier); - // wait for all threads to finish - safeAwait(barrier); + } + }); } // test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key // here be dragons: this test did catch one subtle bug during development; do not remove lightly - public void testTorture() { - int numberOfThreads = randomIntBetween(2, 32); + public void testTorture() throws InterruptedException { final Cache cache = CacheBuilder.builder().setMaximumWeight(1000).weigher((k, v) -> 2).build(); - - CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); - for (int i = 0; i < numberOfThreads; i++) { - Thread thread = new Thread(() -> { - safeAwait(barrier); - Random random = new Random(random().nextLong()); - for (int j = 0; j < numberOfEntries; j++) { - Integer key = random.nextInt(numberOfEntries); - cache.put(key, Integer.toString(j)); - } - safeAwait(barrier); - }); - thread.start(); - } - - // wait for all threads to be ready - safeAwait(barrier); - // wait for all threads to finish - safeAwait(barrier); - + startInParallel(randomIntBetween(2, 32), i -> { + Random random = new Random(random().nextLong()); + for (int j = 0; j < numberOfEntries; j++) { + Integer key = random.nextInt(numberOfEntries); + cache.put(key, Integer.toString(j)); + } + }); cache.refresh(); assertEquals(500, cache.count()); } diff --git a/server/src/test/java/org/elasticsearch/common/compress/DeflateCompressTests.java b/server/src/test/java/org/elasticsearch/common/compress/DeflateCompressTests.java index 2fb31cad12051..2909b22347b93 100644 --- a/server/src/test/java/org/elasticsearch/common/compress/DeflateCompressTests.java +++ b/server/src/test/java/org/elasticsearch/common/compress/DeflateCompressTests.java @@ -23,7 +23,6 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Random; -import java.util.concurrent.CountDownLatch; import java.util.zip.ZipException; import static org.hamcrest.Matchers.equalTo; @@ -45,34 +44,17 @@ public void testRandom() throws IOException { } public void testRandomThreads() throws Exception { - final Random r = random(); - int threadCount = TestUtil.nextInt(r, 2, 6); - Thread[] threads = new Thread[threadCount]; - final CountDownLatch startingGun = new CountDownLatch(1); - for (int tid = 0; tid < threadCount; tid++) { - final long seed = r.nextLong(); - threads[tid] = new Thread() { - @Override - public void run() { - try { - Random r = new Random(seed); - startingGun.await(); - for (int i = 0; i < 10; i++) { - byte bytes[] = new byte[TestUtil.nextInt(r, 1, 100000)]; - r.nextBytes(bytes); - doTest(bytes); - } - } catch (Exception e) { - throw new RuntimeException(e); - } + startInParallel(randomIntBetween(2, 6), tid -> { + try { + for (int i = 0; i < 10; i++) { + byte[] bytes = new byte[randomIntBetween(1, 100000)]; + randomBytesBetween(bytes, Byte.MIN_VALUE, Byte.MAX_VALUE); + doTest(bytes); } - }; - threads[tid].start(); - } - startingGun.countDown(); - for (Thread t : threads) { - t.join(); - } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } public void testLineDocs() throws IOException { @@ -91,40 +73,24 @@ public void testLineDocs() throws IOException { } public void testLineDocsThreads() throws Exception { - final Random r = random(); - int threadCount = TestUtil.nextInt(r, 2, 6); - Thread[] threads = new Thread[threadCount]; - final CountDownLatch startingGun = new CountDownLatch(1); - for (int tid = 0; tid < threadCount; tid++) { - final long seed = r.nextLong(); - threads[tid] = new Thread() { - @Override - public void run() { - try { - Random r = new Random(seed); - startingGun.await(); - LineFileDocs lineFileDocs = new LineFileDocs(r); - for (int i = 0; i < 10; i++) { - int numDocs = TestUtil.nextInt(r, 1, 200); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - for (int j = 0; j < numDocs; j++) { - String s = lineFileDocs.nextDoc().get("body"); - bos.write(s.getBytes(StandardCharsets.UTF_8)); - } - doTest(bos.toByteArray()); - } - lineFileDocs.close(); - } catch (Exception e) { - throw new RuntimeException(e); + int threadCount = randomIntBetween(2, 6); + startInParallel(threadCount, tid -> { + try { + LineFileDocs lineFileDocs = new LineFileDocs(random()); + for (int i = 0; i < 10; i++) { + int numDocs = randomIntBetween(1, 200); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + for (int j = 0; j < numDocs; j++) { + String s = lineFileDocs.nextDoc().get("body"); + bos.write(s.getBytes(StandardCharsets.UTF_8)); } + doTest(bos.toByteArray()); } - }; - threads[tid].start(); - } - startingGun.countDown(); - for (Thread t : threads) { - t.join(); - } + lineFileDocs.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } public void testRepetitionsL() throws IOException { @@ -151,48 +117,32 @@ public void testRepetitionsL() throws IOException { } public void testRepetitionsLThreads() throws Exception { - final Random r = random(); - int threadCount = TestUtil.nextInt(r, 2, 6); - Thread[] threads = new Thread[threadCount]; - final CountDownLatch startingGun = new CountDownLatch(1); - for (int tid = 0; tid < threadCount; tid++) { - final long seed = r.nextLong(); - threads[tid] = new Thread() { - @Override - public void run() { - try { - Random r = new Random(seed); - startingGun.await(); - for (int i = 0; i < 10; i++) { - int numLongs = TestUtil.nextInt(r, 1, 10000); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - long theValue = r.nextLong(); - for (int j = 0; j < numLongs; j++) { - if (r.nextInt(10) == 0) { - theValue = r.nextLong(); - } - bos.write((byte) (theValue >>> 56)); - bos.write((byte) (theValue >>> 48)); - bos.write((byte) (theValue >>> 40)); - bos.write((byte) (theValue >>> 32)); - bos.write((byte) (theValue >>> 24)); - bos.write((byte) (theValue >>> 16)); - bos.write((byte) (theValue >>> 8)); - bos.write((byte) theValue); - } - doTest(bos.toByteArray()); + int threadCount = randomIntBetween(2, 6); + startInParallel(threadCount, tid -> { + try { + for (int i = 0; i < 10; i++) { + int numLongs = randomIntBetween(1, 10000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + long theValue = randomLong(); + for (int j = 0; j < numLongs; j++) { + if (randomInt(10) == 0) { + theValue = randomLong(); } - } catch (Exception e) { - throw new RuntimeException(e); + bos.write((byte) (theValue >>> 56)); + bos.write((byte) (theValue >>> 48)); + bos.write((byte) (theValue >>> 40)); + bos.write((byte) (theValue >>> 32)); + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); } + doTest(bos.toByteArray()); } - }; - threads[tid].start(); - } - startingGun.countDown(); - for (Thread t : threads) { - t.join(); - } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } public void testRepetitionsI() throws IOException { @@ -215,44 +165,28 @@ public void testRepetitionsI() throws IOException { } public void testRepetitionsIThreads() throws Exception { - final Random r = random(); - int threadCount = TestUtil.nextInt(r, 2, 6); - Thread[] threads = new Thread[threadCount]; - final CountDownLatch startingGun = new CountDownLatch(1); - for (int tid = 0; tid < threadCount; tid++) { - final long seed = r.nextLong(); - threads[tid] = new Thread() { - @Override - public void run() { - try { - Random r = new Random(seed); - startingGun.await(); - for (int i = 0; i < 10; i++) { - int numInts = TestUtil.nextInt(r, 1, 20000); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - int theValue = r.nextInt(); - for (int j = 0; j < numInts; j++) { - if (r.nextInt(10) == 0) { - theValue = r.nextInt(); - } - bos.write((byte) (theValue >>> 24)); - bos.write((byte) (theValue >>> 16)); - bos.write((byte) (theValue >>> 8)); - bos.write((byte) theValue); - } - doTest(bos.toByteArray()); + int threadCount = randomIntBetween(2, 6); + startInParallel(threadCount, tid -> { + try { + for (int i = 0; i < 10; i++) { + int numInts = randomIntBetween(1, 20000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int theValue = randomInt(); + for (int j = 0; j < numInts; j++) { + if (randomInt(10) == 0) { + theValue = randomInt(); } - } catch (Exception e) { - throw new RuntimeException(e); + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); } + doTest(bos.toByteArray()); } - }; - threads[tid].start(); - } - startingGun.countDown(); - for (Thread t : threads) { - t.join(); - } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } public void testRepetitionsS() throws IOException { @@ -330,42 +264,26 @@ private void addBytes(Random r, ByteArrayOutputStream bos) throws IOException { } public void testRepetitionsSThreads() throws Exception { - final Random r = random(); - int threadCount = TestUtil.nextInt(r, 2, 6); - Thread[] threads = new Thread[threadCount]; - final CountDownLatch startingGun = new CountDownLatch(1); - for (int tid = 0; tid < threadCount; tid++) { - final long seed = r.nextLong(); - threads[tid] = new Thread() { - @Override - public void run() { - try { - Random r = new Random(seed); - startingGun.await(); - for (int i = 0; i < 10; i++) { - int numShorts = TestUtil.nextInt(r, 1, 40000); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - short theValue = (short) r.nextInt(65535); - for (int j = 0; j < numShorts; j++) { - if (r.nextInt(10) == 0) { - theValue = (short) r.nextInt(65535); - } - bos.write((byte) (theValue >>> 8)); - bos.write((byte) theValue); - } - doTest(bos.toByteArray()); + int threadCount = randomIntBetween(2, 6); + startInParallel(threadCount, tid -> { + try { + for (int i = 0; i < 10; i++) { + int numShorts = randomIntBetween(1, 40000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + short theValue = (short) randomInt(65535); + for (int j = 0; j < numShorts; j++) { + if (randomInt(10) == 0) { + theValue = (short) randomInt(65535); } - } catch (Exception e) { - throw new RuntimeException(e); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); } + doTest(bos.toByteArray()); } - }; - threads[tid].start(); - } - startingGun.countDown(); - for (Thread t : threads) { - t.join(); - } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } public void testCompressUncompressWithCorruptions() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/RunOnceTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/RunOnceTests.java index 1fe436347a50f..eeaf41cc80569 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/RunOnceTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/RunOnceTests.java @@ -11,7 +11,6 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ReachabilityChecker; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; public class RunOnceTests extends ESTestCase { @@ -33,26 +32,7 @@ public void testRunOnce() { public void testRunOnceConcurrently() throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); final RunOnce runOnce = new RunOnce(counter::incrementAndGet); - - final Thread[] threads = new Thread[between(3, 10)]; - final CountDownLatch latch = new CountDownLatch(1 + threads.length); - for (int i = 0; i < threads.length; i++) { - threads[i] = new Thread(() -> { - latch.countDown(); - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - runOnce.run(); - }); - threads[i].start(); - } - - latch.countDown(); - for (Thread thread : threads) { - thread.join(); - } + startInParallel(between(3, 10), i -> runOnce.run()); assertTrue(runOnce.hasRun()); assertEquals(1, counter.get()); } diff --git a/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java b/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java index 247f60b7228e3..adab51a37d2bf 100644 --- a/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java +++ b/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java @@ -357,46 +357,23 @@ class Int { flipFlop[i] = new AtomicInteger(); } - Thread[] threads = new Thread[randomIntBetween(2, 5)]; - final CountDownLatch latch = new CountDownLatch(1); + final int threads = randomIntBetween(2, 5); final int iters = scaledRandomIntBetween(10000, 100000); - for (int i = 0; i < threads.length; i++) { - threads[i] = new Thread() { - @Override - public void run() { - try { - latch.await(); - } catch (InterruptedException e) { - fail(e.getMessage()); - } - for (int i = 0; i < iters; i++) { - int shard = randomIntBetween(0, counts.length - 1); - try { - try ( - ShardLock autoCloses = env.shardLock( - new ShardId("foo", "fooUUID", shard), - "1", - scaledRandomIntBetween(0, 10) - ) - ) { - counts[shard].value++; - countsAtomic[shard].incrementAndGet(); - assertEquals(flipFlop[shard].incrementAndGet(), 1); - assertEquals(flipFlop[shard].decrementAndGet(), 0); - } - } catch (ShardLockObtainFailedException ex) { - // ok - } + startInParallel(threads, tid -> { + for (int i = 0; i < iters; i++) { + int shard = randomIntBetween(0, counts.length - 1); + try { + try (ShardLock autoCloses = env.shardLock(new ShardId("foo", "fooUUID", shard), "1", scaledRandomIntBetween(0, 10))) { + counts[shard].value++; + countsAtomic[shard].incrementAndGet(); + assertEquals(flipFlop[shard].incrementAndGet(), 1); + assertEquals(flipFlop[shard].decrementAndGet(), 0); } + } catch (ShardLockObtainFailedException ex) { + // ok } - }; - threads[i].start(); - } - latch.countDown(); // fire the threads up - for (int i = 0; i < threads.length; i++) { - threads[i].join(); - } - + } + }); assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty()); for (int i = 0; i < counts.length; i++) { assertTrue(counts[i].value > 0); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index c668cfbb502a2..3bc88d997ab6d 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1252,24 +1252,15 @@ public void testSyncTranslogConcurrently() throws Exception { assertThat(commitInfo.localCheckpoint(), equalTo(engine.getProcessedLocalCheckpoint())); } }; - final Thread[] threads = new Thread[randomIntBetween(2, 4)]; - final Phaser phaser = new Phaser(threads.length); globalCheckpoint.set(engine.getProcessedLocalCheckpoint()); - for (int i = 0; i < threads.length; i++) { - threads[i] = new Thread(() -> { - phaser.arriveAndAwaitAdvance(); - try { - engine.syncTranslog(); - checker.run(); - } catch (IOException e) { - throw new AssertionError(e); - } - }); - threads[i].start(); - } - for (Thread thread : threads) { - thread.join(); - } + startInParallel(randomIntBetween(2, 4), i -> { + try { + engine.syncTranslog(); + checker.run(); + } catch (IOException e) { + throw new AssertionError(e); + } + }); checker.run(); } @@ -2419,8 +2410,6 @@ public void testConcurrentGetAndSetOnPrimary() throws IOException, InterruptedEx MapperService mapperService = createMapperService(); MappingLookup mappingLookup = mapperService.mappingLookup(); DocumentParser documentParser = mapperService.documentParser(); - Thread[] thread = new Thread[randomIntBetween(3, 5)]; - CountDownLatch startGun = new CountDownLatch(thread.length); final int opsPerThread = randomIntBetween(10, 20); class OpAndVersion { final long version; @@ -2438,54 +2427,39 @@ class OpAndVersion { ParsedDocument doc = testParsedDocument("1", null, testDocument(), bytesArray(""), null); final BytesRef uidTerm = newUid(doc); engine.index(indexForDoc(doc)); - for (int i = 0; i < thread.length; i++) { - thread[i] = new Thread(() -> { - startGun.countDown(); - safeAwait(startGun); - for (int op = 0; op < opsPerThread; op++) { - Engine.Get engineGet = new Engine.Get(true, false, doc.id()); - try (Engine.GetResult get = engine.get(engineGet, mappingLookup, documentParser, randomSearcherWrapper())) { - FieldsVisitor visitor = new FieldsVisitor(true); - get.docIdAndVersion().reader.document(get.docIdAndVersion().docId, visitor); - List values = new ArrayList<>(Strings.commaDelimitedListToSet(visitor.source().utf8ToString())); - String removed = op % 3 == 0 && values.size() > 0 ? values.remove(0) : null; - String added = "v_" + idGenerator.incrementAndGet(); - values.add(added); - Engine.Index index = new Engine.Index( - uidTerm, - testParsedDocument( - "1", - null, - testDocument(), - bytesArray(Strings.collectionToCommaDelimitedString(values)), - null - ), - UNASSIGNED_SEQ_NO, - 2, - get.version(), - VersionType.INTERNAL, - PRIMARY, - System.currentTimeMillis(), - -1, - false, - UNASSIGNED_SEQ_NO, - 0 - ); - Engine.IndexResult indexResult = engine.index(index); - if (indexResult.getResultType() == Engine.Result.Type.SUCCESS) { - history.add(new OpAndVersion(indexResult.getVersion(), removed, added)); - } - - } catch (IOException e) { - throw new AssertionError(e); + startInParallel(randomIntBetween(3, 5), i -> { + for (int op = 0; op < opsPerThread; op++) { + Engine.Get engineGet = new Engine.Get(true, false, doc.id()); + try (Engine.GetResult get = engine.get(engineGet, mappingLookup, documentParser, randomSearcherWrapper())) { + FieldsVisitor visitor = new FieldsVisitor(true); + get.docIdAndVersion().reader.document(get.docIdAndVersion().docId, visitor); + List values = new ArrayList<>(Strings.commaDelimitedListToSet(visitor.source().utf8ToString())); + String removed = op % 3 == 0 && values.size() > 0 ? values.remove(0) : null; + String added = "v_" + idGenerator.incrementAndGet(); + values.add(added); + Engine.Index index = new Engine.Index( + uidTerm, + testParsedDocument("1", null, testDocument(), bytesArray(Strings.collectionToCommaDelimitedString(values)), null), + UNASSIGNED_SEQ_NO, + 2, + get.version(), + VersionType.INTERNAL, + PRIMARY, + System.currentTimeMillis(), + -1, + false, + UNASSIGNED_SEQ_NO, + 0 + ); + Engine.IndexResult indexResult = engine.index(index); + if (indexResult.getResultType() == Engine.Result.Type.SUCCESS) { + history.add(new OpAndVersion(indexResult.getVersion(), removed, added)); } + } catch (IOException e) { + throw new AssertionError(e); } - }); - thread[i].start(); - } - for (int i = 0; i < thread.length; i++) { - thread[i].join(); - } + } + }); List sortedHistory = new ArrayList<>(history); sortedHistory.sort(Comparator.comparing(o -> o.version)); Set currentValues = new HashSet<>(); @@ -4363,7 +4337,6 @@ public Engine.Index appendOnlyReplica(ParsedDocument doc, boolean retry, final l } public void testRetryConcurrently() throws InterruptedException, IOException { - Thread[] thread = new Thread[randomIntBetween(3, 5)]; int numDocs = randomIntBetween(1000, 10000); List docs = new ArrayList<>(); final boolean primary = randomBoolean(); @@ -4389,26 +4362,17 @@ public void testRetryConcurrently() throws InterruptedException, IOException { docs.add(retryIndex); } Collections.shuffle(docs, random()); - CountDownLatch startGun = new CountDownLatch(thread.length); AtomicInteger offset = new AtomicInteger(-1); - for (int i = 0; i < thread.length; i++) { - thread[i] = new Thread(() -> { - startGun.countDown(); - safeAwait(startGun); - int docOffset; - while ((docOffset = offset.incrementAndGet()) < docs.size()) { - try { - engine.index(docs.get(docOffset)); - } catch (IOException e) { - throw new AssertionError(e); - } + startInParallel(randomIntBetween(3, 5), i -> { + int docOffset; + while ((docOffset = offset.incrementAndGet()) < docs.size()) { + try { + engine.index(docs.get(docOffset)); + } catch (IOException e) { + throw new AssertionError(e); } - }); - thread[i].start(); - } - for (int i = 0; i < thread.length; i++) { - thread[i].join(); - } + } + }); engine.refresh("test"); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { int count = searcher.count(new MatchAllDocsQuery()); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index 3930a37df498c..0765872f892c2 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -196,46 +196,29 @@ protected void doRun() throws Exception { } public void testConcurrentReplica() throws InterruptedException { - Thread[] threads = new Thread[randomIntBetween(2, 5)]; + final int threads = randomIntBetween(2, 5); final int opsPerThread = randomIntBetween(10, 20); - final int maxOps = opsPerThread * threads.length; + final int maxOps = opsPerThread * threads; final long unFinishedSeq = randomIntBetween(0, maxOps - 2); // make sure we always index the last seqNo to simplify maxSeq checks Set seqNos = IntStream.range(0, maxOps).boxed().collect(Collectors.toSet()); - final Integer[][] seqNoPerThread = new Integer[threads.length][]; - for (int t = 0; t < threads.length - 1; t++) { + final Integer[][] seqNoPerThread = new Integer[threads][]; + for (int t = 0; t < threads - 1; t++) { int size = Math.min(seqNos.size(), randomIntBetween(opsPerThread - 4, opsPerThread + 4)); seqNoPerThread[t] = randomSubsetOf(size, seqNos).toArray(new Integer[size]); seqNos.removeAll(Arrays.asList(seqNoPerThread[t])); } - seqNoPerThread[threads.length - 1] = seqNos.toArray(new Integer[seqNos.size()]); - logger.info("--> will run [{}] threads, maxOps [{}], unfinished seq no [{}]", threads.length, maxOps, unFinishedSeq); - final CyclicBarrier barrier = new CyclicBarrier(threads.length); - for (int t = 0; t < threads.length; t++) { - final int threadId = t; - threads[t] = new Thread(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - throw new ElasticsearchException("failure in background thread", e); + seqNoPerThread[threads - 1] = seqNos.toArray(new Integer[seqNos.size()]); + logger.info("--> will run [{}] threads, maxOps [{}], unfinished seq no [{}]", threads, maxOps, unFinishedSeq); + startInParallel(threads, threadId -> { + Integer[] ops = seqNoPerThread[threadId]; + for (int seqNo : ops) { + if (seqNo != unFinishedSeq) { + tracker.markSeqNoAsProcessed(seqNo); + logger.info("[t{}] completed [{}]", threadId, seqNo); } - - @Override - protected void doRun() throws Exception { - barrier.await(); - Integer[] ops = seqNoPerThread[threadId]; - for (int seqNo : ops) { - if (seqNo != unFinishedSeq) { - tracker.markSeqNoAsProcessed(seqNo); - logger.info("[t{}] completed [{}]", threadId, seqNo); - } - } - } - }, "testConcurrentReplica_" + threadId); - threads[t].start(); - } - for (Thread thread : threads) { - thread.join(); - } + } + }); assertThat(tracker.getMaxSeqNo(), equalTo(maxOps - 1L)); assertThat(tracker.getProcessedCheckpoint(), equalTo(unFinishedSeq - 1L)); assertThat(tracker.hasProcessed(unFinishedSeq), equalTo(false)); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index de8dbc3a1515e..0a167b677934e 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -29,7 +29,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -687,7 +686,7 @@ public void testUnnecessaryPersistenceOfRetentionLeases() throws IOException { * * @throws IOException if an I/O exception occurs loading the retention lease state file */ - public void testPersistRetentionLeasesUnderConcurrency() throws IOException { + public void testPersistRetentionLeasesUnderConcurrency() throws IOException, InterruptedException { final AllocationId allocationId = AllocationId.newInitializing(); long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); final ReplicationTracker replicationTracker = new ReplicationTracker( @@ -719,35 +718,16 @@ public void testPersistRetentionLeasesUnderConcurrency() throws IOException { final Path path = createTempDir(); final int numberOfThreads = randomIntBetween(1, 2 * Runtime.getRuntime().availableProcessors()); - final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); - final Thread[] threads = new Thread[numberOfThreads]; - for (int i = 0; i < numberOfThreads; i++) { + startInParallel(numberOfThreads, i -> { final String id = Integer.toString(length + i); - threads[i] = new Thread(() -> { - try { - safeAwait(barrier); - final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); - replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test-" + id, ActionListener.noop()); - replicationTracker.persistRetentionLeases(path); - safeAwait(barrier); - } catch (final WriteStateException e) { - throw new AssertionError(e); - } - }); - threads[i].start(); - } - - try { - // synchronize the threads invoking ReplicationTracker#persistRetentionLeases(Path path) - safeAwait(barrier); - // wait for all the threads to finish - safeAwait(barrier); - for (int i = 0; i < numberOfThreads; i++) { - threads[i].join(); + final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test-" + id, ActionListener.noop()); + try { + replicationTracker.persistRetentionLeases(path); + } catch (WriteStateException e) { + throw new AssertionError(e); } - } catch (final InterruptedException e) { - throw new AssertionError(e); - } + }); assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases())); } diff --git a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java index ff2f55c791dd3..c228cc067b20b 100644 --- a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java @@ -56,7 +56,6 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase { public void testThreadedUpdatesToChildBreaker() throws Exception { final int NUM_THREADS = scaledRandomIntBetween(3, 15); final int BYTES_PER_THREAD = scaledRandomIntBetween(500, 4500); - final Thread[] threads = new Thread[NUM_THREADS]; final AtomicBoolean tripped = new AtomicBoolean(false); final AtomicReference lastException = new AtomicReference<>(null); @@ -87,31 +86,21 @@ public void checkParentLimit(long newBytesReserved, String label) throws Circuit CircuitBreaker.REQUEST ); breakerRef.set(breaker); - - for (int i = 0; i < NUM_THREADS; i++) { - threads[i] = new Thread(() -> { - for (int j = 0; j < BYTES_PER_THREAD; j++) { - try { - breaker.addEstimateBytesAndMaybeBreak(1L, "test"); - } catch (CircuitBreakingException e) { - if (tripped.get()) { - assertThat("tripped too many times", true, equalTo(false)); - } else { - assertThat(tripped.compareAndSet(false, true), equalTo(true)); - } - } catch (Exception e) { - lastException.set(e); + runInParallel(NUM_THREADS, i -> { + for (int j = 0; j < BYTES_PER_THREAD; j++) { + try { + breaker.addEstimateBytesAndMaybeBreak(1L, "test"); + } catch (CircuitBreakingException e) { + if (tripped.get()) { + assertThat("tripped too many times", true, equalTo(false)); + } else { + assertThat(tripped.compareAndSet(false, true), equalTo(true)); } + } catch (Exception e) { + lastException.set(e); } - }); - - threads[i].start(); - } - - for (Thread t : threads) { - t.join(); - } - + } + }); assertThat("no other exceptions were thrown", lastException.get(), equalTo(null)); assertThat("breaker was tripped", tripped.get(), equalTo(true)); assertThat("breaker was tripped at least once", breaker.getTrippedCount(), greaterThanOrEqualTo(1L)); @@ -122,7 +111,6 @@ public void testThreadedUpdatesToChildBreakerWithParentLimit() throws Exception final int BYTES_PER_THREAD = scaledRandomIntBetween(500, 4500); final int parentLimit = (BYTES_PER_THREAD * NUM_THREADS) - 2; final int childLimit = parentLimit + 10; - final Thread[] threads = new Thread[NUM_THREADS]; final AtomicInteger tripped = new AtomicInteger(0); final AtomicReference lastException = new AtomicReference<>(null); @@ -165,21 +153,6 @@ public void checkParentLimit(long newBytesReserved, String label) throws Circuit CircuitBreaker.REQUEST ); breakerRef.set(breaker); - - for (int i = 0; i < NUM_THREADS; i++) { - threads[i] = new Thread(() -> { - for (int j = 0; j < BYTES_PER_THREAD; j++) { - try { - breaker.addEstimateBytesAndMaybeBreak(1L, "test"); - } catch (CircuitBreakingException e) { - tripped.incrementAndGet(); - } catch (Exception e) { - lastException.set(e); - } - } - }); - } - logger.info( "--> NUM_THREADS: [{}], BYTES_PER_THREAD: [{}], TOTAL_BYTES: [{}], PARENT_LIMIT: [{}], CHILD_LIMIT: [{}]", NUM_THREADS, @@ -190,13 +163,17 @@ public void checkParentLimit(long newBytesReserved, String label) throws Circuit ); logger.info("--> starting threads..."); - for (Thread t : threads) { - t.start(); - } - - for (Thread t : threads) { - t.join(); - } + runInParallel(NUM_THREADS, i -> { + for (int j = 0; j < BYTES_PER_THREAD; j++) { + try { + breaker.addEstimateBytesAndMaybeBreak(1L, "test"); + } catch (CircuitBreakingException e) { + tripped.incrementAndGet(); + } catch (Exception e) { + lastException.set(e); + } + } + }); logger.info("--> child breaker: used: {}, limit: {}", breaker.getUsed(), breaker.getLimit()); logger.info("--> parent tripped: {}, total trip count: {} (expecting 1-2 for each)", parentTripped.get(), tripped.get()); @@ -401,29 +378,15 @@ void overLimitTriggered(boolean leader) { }); logger.trace("black hole [{}]", data.hashCode()); - int threadCount = randomIntBetween(1, 10); - CyclicBarrier barrier = new CyclicBarrier(threadCount + 1); - List threads = new ArrayList<>(threadCount); - for (int i = 0; i < threadCount; ++i) { - threads.add(new Thread(() -> { - try { - safeAwait(barrier); - service.checkParentLimit(0, "test-thread"); - } catch (CircuitBreakingException e) { - // very rare - logger.info("Thread got semi-unexpected circuit breaking exception", e); - } - })); - } - - threads.forEach(Thread::start); - barrier.await(20, TimeUnit.SECONDS); - - for (Thread thread : threads) { - thread.join(10000); - } - threads.forEach(thread -> assertFalse(thread.isAlive())); + startInParallel(threadCount, i -> { + try { + service.checkParentLimit(0, "test-thread"); + } catch (CircuitBreakingException e) { + // very rare + logger.info("Thread got semi-unexpected circuit breaking exception", e); + } + }); assertThat(leaderTriggerCount.get(), equalTo(2)); }