Skip to content

Commit

Permalink
Fix ScalingThreadPoolTest to handle new pool
Browse files Browse the repository at this point in the history
This was a bad use of randomization. The test is super fast so it can be
run against every scaling thread pool every time.

Signed-off-by: Andrew Ross <[email protected]>
  • Loading branch information
andrross committed Oct 6, 2023
1 parent 66aef13 commit 192c8fc
Showing 1 changed file with 20 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,43 @@

package org.opensearch.threadpool;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.equalTo;

public class ScalingThreadPoolTests extends OpenSearchThreadPoolTestCase {

@ParametersFactory
public static Collection<Object[]> scalingThreadPools() {
return ThreadPool.THREAD_POOL_TYPES.entrySet()
.stream()
.filter(t -> t.getValue().equals(ThreadPool.ThreadPoolType.SCALING))
.map(e -> new String[] { e.getKey() })
.collect(Collectors.toList());
}

private final String threadPoolName;

public ScalingThreadPoolTests(String threadPoolName) {
this.threadPoolName = threadPoolName;
}

public void testScalingThreadPoolConfiguration() throws InterruptedException {
final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING);
final Settings.Builder builder = Settings.builder();

final int core;
Expand Down Expand Up @@ -136,11 +154,11 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso
sizes.put(ThreadPool.Names.TRANSLOG_SYNC, n -> 4 * n);
sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessorsMaxFive);
sizes.put(ThreadPool.Names.REMOTE_REFRESH_RETRY, ThreadPool::halfAllocatedProcessorsMaxTen);
sizes.put(ThreadPool.Names.REMOTE_RECOVERY, ThreadPool::halfAllocatedProcessorsMaxTen);
return sizes.get(threadPoolName).apply(numberOfProcessors);
}

public void testScalingThreadPoolIsBounded() throws InterruptedException {
final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING);
final int size = randomIntBetween(32, 512);
final Settings settings = Settings.builder().put("thread_pool." + threadPoolName + ".max", size).build();
runScalingThreadPoolTest(settings, (clusterSettings, threadPool) -> {
Expand Down Expand Up @@ -170,7 +188,6 @@ public void testScalingThreadPoolIsBounded() throws InterruptedException {
}

public void testScalingThreadPoolThreadsAreTerminatedAfterKeepAlive() throws InterruptedException {
final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING);
final int min = "generic".equals(threadPoolName) ? 4 : 1;
final Settings settings = Settings.builder()
.put("thread_pool." + threadPoolName + ".max", 128)
Expand Down

0 comments on commit 192c8fc

Please sign in to comment.