Skip to content

Commit

Permalink
Done with adding fork-join-pool pinning test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
sheinbergon committed Sep 11, 2020
1 parent d6955d3 commit 93636ab
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ public void run() {

private final class GovernedPinnedForkJoinWorkerThread extends PinnedThread.ForkJoinWorker {

GovernedPinnedForkJoinWorkerThread(final @Nonnull ForkJoinPool pool,
@Nonnull final AffinityDescriptor descriptor) {
GovernedPinnedForkJoinWorkerThread(
final @Nonnull ForkJoinPool pool,
final @Nonnull AffinityDescriptor descriptor) {
super(pool, descriptor);
}

Expand All @@ -178,6 +179,7 @@ private final class GovernedPinnedForkJoinWorkerThread extends PinnedThread.Fork

@Override
protected void onStart() {
super.onStart();
pinnedThreadStartLatch.fire();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.sheinbergon.needle.concurrent;

import javax.annotation.Nonnull;
import java.io.Closeable;
import java.util.concurrent.ForkJoinPool;

public final class PinnedForkJoinPool extends ForkJoinPool implements Closeable {

/**
* Creates a new affinity aware {@code PinnedForkJoinPool} with the given initial
* parameters.
*
* @param parallelism the parallelism level (amount of worker threads to be spawned)
* @param factory the {@code PinnedThread} factory to use when the executor
* creates new fork-join worker threads
*/
public PinnedForkJoinPool(
final int parallelism,
final @Nonnull PinnedThreadFactory factory) {
super(parallelism, factory, null, false);
}

@Override
public void close() {
shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ public final class PinnedThreadPoolExecutor extends ThreadPoolExecutor implement
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param factory the {@code PinnedThread} factory to use when the executor
* creates a new thread
* @param factory the {@code PinnedThreadFactory} used create affinity aware {@code PinnedThread} instances
*/
public PinnedThreadPoolExecutor(
final int corePoolSize,
Expand All @@ -44,7 +43,7 @@ public PinnedThreadPoolExecutor(
/**
* Static factory methods for affinity aware single-thread {@code ExecutorService} inception.
*
* @param factory the {@code PinnedThreadFactory} used create affinity aware {@code PinnedThread} instances
* @param factory the {@code PinnedThreadFactory} used to create affinity aware {@code PinnedThread} instances
* @return the affinity aware {@code ExecutorService}
*/
public static ExecutorService newSinglePinnedThreadExecutor(final @Nonnull PinnedThreadFactory factory) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,45 @@
package org.sheinbergon.needle.concurrent

import org.amshove.kluent.shouldBeEqualTo
import org.amshove.kluent.shouldBeTrue
import org.junit.jupiter.api.Test
import org.sheinbergon.needle.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.RecursiveAction

class FixedAffinityPinnedThreadFactoryTest {

@Test
fun `Initialize the factory`() {
val factory = FixedAffinityPinnedThreadFactory(testAffinityDescriptor)
testPinnedThreadFactory(factory)
testPinnedThreadInception(factory)
testPinnedForkJoinWorkerThreadInception(factory)
}

private fun testPinnedThreadFactory(factory: PinnedThreadFactory) {
private fun testPinnedThreadInception(factory: PinnedThreadFactory) {
val latch = CountDownLatch(`1`)
val pinned = factory.newThread(task(latch))
pinned?.start()
latch.await()
}

private fun testPinnedForkJoinWorkerThreadInception(factory: PinnedThreadFactory) {
val latch = CountDownLatch(`1`)
PinnedForkJoinPool(`1`, factory).use {
val action = action(latch)
it.submit(action)
latch.await()
Thread.sleep(5L)
action.isDone.shouldBeTrue()
}
}

private fun action(latch: CountDownLatch) = object : RecursiveAction() {
override fun compute() = task(latch).run()
}

private fun task(latch: CountDownLatch) = Runnable {
val self = Thread.currentThread() as PinnedThread
val self = Thread.currentThread() as Pinned
self.affinity().mask() shouldBeEqualTo binaryTestMask
self.affinity().toString() shouldBeEqualTo textTestMask
latch.countDown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.sheinbergon.needle.*
import org.sheinbergon.needle.concurrent.util.ResettableOneOffLatch
import java.util.concurrent.RecursiveAction

class GovernedAffinityPinnedThreadFactoryTest {

Expand All @@ -15,6 +16,17 @@ class GovernedAffinityPinnedThreadFactoryTest {
runCatching { Thread.sleep(1000L) }
}

private inner class UnlatchAndSleepAction : RecursiveAction() {

lateinit var pinned: Pinned
private set

override fun compute() {
pinned = Thread.currentThread() as PinnedThread.ForkJoinWorker
unlatchAndSleepTask().run()
}
}

@BeforeEach
fun setup() {
latch = ResettableOneOffLatch(true)
Expand All @@ -34,7 +46,7 @@ class GovernedAffinityPinnedThreadFactoryTest {
}

@Test
fun `Initialize the factory using a binary a mask and alter the affinity of newly created pinned threads`() {
fun `Initialize the factory using a binary mask and alter the affinity of newly created pinned threads`() {
val factory = GovernedAffinityPinnedThreadFactory(testAffinityDescriptor)
val pinned1 = factory.newThread(unlatchAndSleepTask())
pinned1!!.start()
Expand Down Expand Up @@ -71,4 +83,42 @@ class GovernedAffinityPinnedThreadFactoryTest {
Thread.sleep(2000L)
factory.governed() shouldBeEqualTo `0`
}

@Test
fun `Initialize the factory without a mask and alter the affinity of created pinned fork-join threads`() {
val factory = GovernedAffinityPinnedThreadFactory()
PinnedForkJoinPool(`1`, factory).use { pool ->
val action = UnlatchAndSleepAction()
pool.execute(action)
latch.await(true)
val pinned = action.pinned
val original = pinned.affinity()
original.mask() shouldBeEqualTo default.mask()
factory.alter(negatedTestAffinityDescriptor, true)
val altered = pinned.affinity()
altered.mask() shouldBeEqualTo negatedBinaryTestMask
}
}

@Test
fun `Initialize the factory using a mask and alter the affinity of newly created pinned fork-join threads`() {
val factory = GovernedAffinityPinnedThreadFactory(testAffinityDescriptor)
PinnedForkJoinPool(`2`, factory).use { pool ->
val action1 = UnlatchAndSleepAction()
pool.execute(action1)
latch.await(true)
val pinned1 = action1.pinned
val original = pinned1.affinity()
original.mask() shouldBeEqualTo binaryTestMask
factory.alter(negatedTestAffinityDescriptor, false)
val unaltered = pinned1.affinity()
unaltered.mask() shouldBeEqualTo binaryTestMask
val action2 = UnlatchAndSleepAction()
pool.execute(action2)
latch.await(false)
val pinned2 = action2.pinned
val altered = pinned2.affinity()
altered.mask() shouldBeEqualTo negatedBinaryTestMask
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.sheinbergon.needle.concurrent

import com.google.common.collect.Sets
import org.amshove.kluent.shouldBe
import org.amshove.kluent.shouldBeEqualTo
import org.junit.jupiter.api.Test
import org.sheinbergon.needle.*
import java.util.concurrent.CountDownLatch

class PinnedForkJoinPoolTest {

@Test
fun `Fixed affinity PinnedForkJoinPool behavior`() {
val pool = PinnedForkJoinPool(availableCores, TestMaskPinnedThreadFactory)
pool.use {
testPinnedThreadExecutor(availableCores, pool)
}
}

private fun testPinnedThreadExecutor(concurrency: Int, pool: PinnedForkJoinPool) = pool.use {
val visited = Sets.newConcurrentHashSet<Pinned>()
val latch = CountDownLatch(concurrency)
pool.parallelism shouldBeEqualTo concurrency
val actions = (`0` until concurrency).map { recursiveAction(latch, visited) }
val tasks = actions.map { pool.submit(it) }
latch.await()
Thread.sleep(5L)
tasks.forEach { it.isDone shouldBe true }
visited.size shouldBeEqualTo concurrency
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class PinnedThreadPoolExecutorTest {
}

private fun testPinnedThreadExecutor(concurrency: Int, pool: PinnedThreadPoolExecutor) = pool.use {
val visited = Sets.newConcurrentHashSet<PinnedThread>()
val visited = Sets.newConcurrentHashSet<Pinned>()
val latch = CountDownLatch(concurrency)
pool.corePoolSize shouldBeEqualTo concurrency
val tasks = (`0` until concurrency).map { callableTask(latch, visited) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,27 @@ class ScheduledPinnedThreadPoolExecutorTest {
@Test
fun `Single pinned thread scheduled executor`() {
ScheduledPinnedThreadPoolExecutor
.newSinglePinnedThreadScheduledExecutor(TestMaskPinnedThreadFactory)
.let { testPinnedThreadExecutor(`1`, it as ScheduledPinnedThreadPoolExecutor) }
.newSinglePinnedThreadScheduledExecutor(TestMaskPinnedThreadFactory)
.let { testPinnedThreadExecutor(`1`, it as ScheduledPinnedThreadPoolExecutor) }
}

@Test
fun `Pooled pinned thread scheduled executor`() {
ScheduledPinnedThreadPoolExecutor
.newScheduledPinnedThreadPool(availableCores, TestMaskPinnedThreadFactory)
.let { testPinnedThreadExecutor(availableCores, it as ScheduledPinnedThreadPoolExecutor) }
.newScheduledPinnedThreadPool(availableCores, TestMaskPinnedThreadFactory)
.let { testPinnedThreadExecutor(availableCores, it as ScheduledPinnedThreadPoolExecutor) }
}

private fun testPinnedThreadExecutor(
concurrency: Int,
scheduler: ScheduledPinnedThreadPoolExecutor
concurrency: Int,
scheduler: ScheduledPinnedThreadPoolExecutor
) = scheduler.use {
val visited = Sets.newConcurrentHashSet<PinnedThread>()
val visited = Sets.newConcurrentHashSet<Pinned>()
val latch = CountDownLatch(concurrency)
scheduler.corePoolSize shouldBeEqualTo concurrency
val futures = (`0` until concurrency)
.map { runnableTask(latch, visited) }
.map { scheduler.schedule(it, SCHEDULING_DELAY, TimeUnit.MILLISECONDS) }
.map { runnableTask(latch, visited) }
.map { scheduler.schedule(it, SCHEDULING_DELAY, TimeUnit.MILLISECONDS) }
latch.await()
Thread.sleep(5L)
visited.size shouldBeEqualTo concurrency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,8 @@ package org.sheinbergon.needle.concurrent

import org.amshove.kluent.shouldBe
import org.amshove.kluent.shouldBeEqualTo
import org.sheinbergon.needle.PinnedThread
import org.sheinbergon.needle.binaryTestMask
import org.sheinbergon.needle.testAffinityDescriptor
import org.sheinbergon.needle.textTestMask
import java.util.concurrent.Callable
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.ForkJoinPool
import org.sheinbergon.needle.*
import java.util.concurrent.*

internal const val SCHEDULING_DELAY = 500L

Expand All @@ -20,11 +14,15 @@ internal object TestMaskPinnedThreadFactory : PinnedThreadFactory {
}

@Suppress("UNCHECKED_CAST")
internal fun callableTask(latch: CountDownLatch, visited: MutableSet<PinnedThread>): Callable<Unit> =
internal fun callableTask(latch: CountDownLatch, visited: MutableSet<Pinned>): Callable<Unit> =
Executors.callable { runnableTask(latch, visited).run() } as Callable<Unit>

internal fun runnableTask(latch: CountDownLatch, visited: MutableSet<PinnedThread>) = Runnable {
val self = Thread.currentThread() as PinnedThread
internal fun recursiveAction(latch: CountDownLatch, visited: MutableSet<Pinned>) = object : RecursiveAction() {
override fun compute() = runnableTask(latch, visited).run()
}

internal fun runnableTask(latch: CountDownLatch, visited: MutableSet<Pinned>) = Runnable {
val self = Thread.currentThread() as Pinned
visited.add(self) shouldBe true
self.affinity().mask() shouldBeEqualTo binaryTestMask
self.affinity().toString() shouldBeEqualTo textTestMask
Expand Down
2 changes: 1 addition & 1 deletion concurrent/src/test/resources/junit-platform.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
junit.jupiter.execution.timeout.default=5000 ms
junit.jupiter.execution.timeout.default=10000 ms

0 comments on commit 93636ab

Please sign in to comment.