diff --git a/concurrent/src/main/java/org/sheinbergon/needle/concurrent/GovernedAffinityPinnedThreadFactory.java b/concurrent/src/main/java/org/sheinbergon/needle/concurrent/GovernedAffinityPinnedThreadFactory.java index de287c5..995e092 100644 --- a/concurrent/src/main/java/org/sheinbergon/needle/concurrent/GovernedAffinityPinnedThreadFactory.java +++ b/concurrent/src/main/java/org/sheinbergon/needle/concurrent/GovernedAffinityPinnedThreadFactory.java @@ -167,8 +167,9 @@ public void run() { private final class GovernedPinnedForkJoinWorkerThread extends PinnedThread.ForkJoinWorker { - GovernedPinnedForkJoinWorkerThread(final @Nonnull ForkJoinPool pool, - @Nonnull final AffinityDescriptor descriptor) { + GovernedPinnedForkJoinWorkerThread( + final @Nonnull ForkJoinPool pool, + final @Nonnull AffinityDescriptor descriptor) { super(pool, descriptor); } @@ -178,6 +179,7 @@ private final class GovernedPinnedForkJoinWorkerThread extends PinnedThread.Fork @Override protected void onStart() { + super.onStart(); pinnedThreadStartLatch.fire(); } diff --git a/concurrent/src/main/java/org/sheinbergon/needle/concurrent/PinnedForkJoinPool.java b/concurrent/src/main/java/org/sheinbergon/needle/concurrent/PinnedForkJoinPool.java new file mode 100644 index 0000000..86f669b --- /dev/null +++ b/concurrent/src/main/java/org/sheinbergon/needle/concurrent/PinnedForkJoinPool.java @@ -0,0 +1,27 @@ +package org.sheinbergon.needle.concurrent; + +import javax.annotation.Nonnull; +import java.io.Closeable; +import java.util.concurrent.ForkJoinPool; + +public final class PinnedForkJoinPool extends ForkJoinPool implements Closeable { + + /** + * Creates a new affinity aware {@code PinnedForkJoinPool} with the given initial + * parameters. + * + * @param parallelism the parallelism level (amount of worker threads to be spawned) + * @param factory the {@code PinnedThread} factory to use when the executor + * creates new fork-join worker threads + */ + public PinnedForkJoinPool( + final int parallelism, + final @Nonnull PinnedThreadFactory factory) { + super(parallelism, factory, null, false); + } + + @Override + public void close() { + shutdown(); + } +} diff --git a/concurrent/src/main/java/org/sheinbergon/needle/concurrent/PinnedThreadPoolExecutor.java b/concurrent/src/main/java/org/sheinbergon/needle/concurrent/PinnedThreadPoolExecutor.java index 0a47f80..4db9281 100644 --- a/concurrent/src/main/java/org/sheinbergon/needle/concurrent/PinnedThreadPoolExecutor.java +++ b/concurrent/src/main/java/org/sheinbergon/needle/concurrent/PinnedThreadPoolExecutor.java @@ -28,8 +28,7 @@ public final class PinnedThreadPoolExecutor extends ThreadPoolExecutor implement * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. - * @param factory the {@code PinnedThread} factory to use when the executor - * creates a new thread + * @param factory the {@code PinnedThreadFactory} used create affinity aware {@code PinnedThread} instances */ public PinnedThreadPoolExecutor( final int corePoolSize, @@ -44,7 +43,7 @@ public PinnedThreadPoolExecutor( /** * Static factory methods for affinity aware single-thread {@code ExecutorService} inception. * - * @param factory the {@code PinnedThreadFactory} used create affinity aware {@code PinnedThread} instances + * @param factory the {@code PinnedThreadFactory} used to create affinity aware {@code PinnedThread} instances * @return the affinity aware {@code ExecutorService} */ public static ExecutorService newSinglePinnedThreadExecutor(final @Nonnull PinnedThreadFactory factory) { diff --git a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/FixedAffinityPinnedThreadFactoryTest.kt b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/FixedAffinityPinnedThreadFactoryTest.kt index edac72a..81993fc 100644 --- a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/FixedAffinityPinnedThreadFactoryTest.kt +++ b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/FixedAffinityPinnedThreadFactoryTest.kt @@ -1,27 +1,45 @@ package org.sheinbergon.needle.concurrent import org.amshove.kluent.shouldBeEqualTo +import org.amshove.kluent.shouldBeTrue import org.junit.jupiter.api.Test import org.sheinbergon.needle.* import java.util.concurrent.CountDownLatch +import java.util.concurrent.RecursiveAction class FixedAffinityPinnedThreadFactoryTest { @Test fun `Initialize the factory`() { val factory = FixedAffinityPinnedThreadFactory(testAffinityDescriptor) - testPinnedThreadFactory(factory) + testPinnedThreadInception(factory) + testPinnedForkJoinWorkerThreadInception(factory) } - private fun testPinnedThreadFactory(factory: PinnedThreadFactory) { + private fun testPinnedThreadInception(factory: PinnedThreadFactory) { val latch = CountDownLatch(`1`) val pinned = factory.newThread(task(latch)) pinned?.start() latch.await() } + private fun testPinnedForkJoinWorkerThreadInception(factory: PinnedThreadFactory) { + val latch = CountDownLatch(`1`) + PinnedForkJoinPool(`1`, factory).use { + val action = action(latch) + it.submit(action) + latch.await() + Thread.sleep(5L) + action.isDone.shouldBeTrue() + } + } + + private fun action(latch: CountDownLatch) = object : RecursiveAction() { + override fun compute() = task(latch).run() + } + private fun task(latch: CountDownLatch) = Runnable { - val self = Thread.currentThread() as PinnedThread + val self = Thread.currentThread() as Pinned self.affinity().mask() shouldBeEqualTo binaryTestMask self.affinity().toString() shouldBeEqualTo textTestMask latch.countDown() diff --git a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/GovernedAffinityPinnedThreadFactoryTest.kt b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/GovernedAffinityPinnedThreadFactoryTest.kt index 3d21714..0c09843 100644 --- a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/GovernedAffinityPinnedThreadFactoryTest.kt +++ b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/GovernedAffinityPinnedThreadFactoryTest.kt @@ -5,6 +5,7 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.sheinbergon.needle.* import org.sheinbergon.needle.concurrent.util.ResettableOneOffLatch +import java.util.concurrent.RecursiveAction class GovernedAffinityPinnedThreadFactoryTest { @@ -15,6 +16,17 @@ class GovernedAffinityPinnedThreadFactoryTest { runCatching { Thread.sleep(1000L) } } + private inner class UnlatchAndSleepAction : RecursiveAction() { + + lateinit var pinned: Pinned + private set + + override fun compute() { + pinned = Thread.currentThread() as PinnedThread.ForkJoinWorker + unlatchAndSleepTask().run() + } + } + @BeforeEach fun setup() { latch = ResettableOneOffLatch(true) @@ -34,7 +46,7 @@ class GovernedAffinityPinnedThreadFactoryTest { } @Test - fun `Initialize the factory using a binary a mask and alter the affinity of newly created pinned threads`() { + fun `Initialize the factory using a binary mask and alter the affinity of newly created pinned threads`() { val factory = GovernedAffinityPinnedThreadFactory(testAffinityDescriptor) val pinned1 = factory.newThread(unlatchAndSleepTask()) pinned1!!.start() @@ -71,4 +83,42 @@ class GovernedAffinityPinnedThreadFactoryTest { Thread.sleep(2000L) factory.governed() shouldBeEqualTo `0` } + + @Test + fun `Initialize the factory without a mask and alter the affinity of created pinned fork-join threads`() { + val factory = GovernedAffinityPinnedThreadFactory() + PinnedForkJoinPool(`1`, factory).use { pool -> + val action = UnlatchAndSleepAction() + pool.execute(action) + latch.await(true) + val pinned = action.pinned + val original = pinned.affinity() + original.mask() shouldBeEqualTo default.mask() + factory.alter(negatedTestAffinityDescriptor, true) + val altered = pinned.affinity() + altered.mask() shouldBeEqualTo negatedBinaryTestMask + } + } + + @Test + fun `Initialize the factory using a mask and alter the affinity of newly created pinned fork-join threads`() { + val factory = GovernedAffinityPinnedThreadFactory(testAffinityDescriptor) + PinnedForkJoinPool(`2`, factory).use { pool -> + val action1 = UnlatchAndSleepAction() + pool.execute(action1) + latch.await(true) + val pinned1 = action1.pinned + val original = pinned1.affinity() + original.mask() shouldBeEqualTo binaryTestMask + factory.alter(negatedTestAffinityDescriptor, false) + val unaltered = pinned1.affinity() + unaltered.mask() shouldBeEqualTo binaryTestMask + val action2 = UnlatchAndSleepAction() + pool.execute(action2) + latch.await(false) + val pinned2 = action2.pinned + val altered = pinned2.affinity() + altered.mask() shouldBeEqualTo negatedBinaryTestMask + } + } } diff --git a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/PinnedForkJoinPoolTest.kt b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/PinnedForkJoinPoolTest.kt new file mode 100644 index 0000000..48b58da --- /dev/null +++ b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/PinnedForkJoinPoolTest.kt @@ -0,0 +1,31 @@ +package org.sheinbergon.needle.concurrent + +import com.google.common.collect.Sets +import org.amshove.kluent.shouldBe +import org.amshove.kluent.shouldBeEqualTo +import org.junit.jupiter.api.Test +import org.sheinbergon.needle.* +import java.util.concurrent.CountDownLatch + +class PinnedForkJoinPoolTest { + + @Test + fun `Fixed affinity PinnedForkJoinPool behavior`() { + val pool = PinnedForkJoinPool(availableCores, TestMaskPinnedThreadFactory) + pool.use { + testPinnedThreadExecutor(availableCores, pool) + } + } + + private fun testPinnedThreadExecutor(concurrency: Int, pool: PinnedForkJoinPool) = pool.use { + val visited = Sets.newConcurrentHashSet() + val latch = CountDownLatch(concurrency) + pool.parallelism shouldBeEqualTo concurrency + val actions = (`0` until concurrency).map { recursiveAction(latch, visited) } + val tasks = actions.map { pool.submit(it) } + latch.await() + Thread.sleep(5L) + tasks.forEach { it.isDone shouldBe true } + visited.size shouldBeEqualTo concurrency + } +} diff --git a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/PinnedThreadPoolExecutorTest.kt b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/PinnedThreadPoolExecutorTest.kt index 063e3a0..1dd5125 100644 --- a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/PinnedThreadPoolExecutorTest.kt +++ b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/PinnedThreadPoolExecutorTest.kt @@ -22,7 +22,7 @@ class PinnedThreadPoolExecutorTest { } private fun testPinnedThreadExecutor(concurrency: Int, pool: PinnedThreadPoolExecutor) = pool.use { - val visited = Sets.newConcurrentHashSet() + val visited = Sets.newConcurrentHashSet() val latch = CountDownLatch(concurrency) pool.corePoolSize shouldBeEqualTo concurrency val tasks = (`0` until concurrency).map { callableTask(latch, visited) } diff --git a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/ScheduledPinnedThreadPoolExecutorTest.kt b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/ScheduledPinnedThreadPoolExecutorTest.kt index 7d773e4..03d4eac 100644 --- a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/ScheduledPinnedThreadPoolExecutorTest.kt +++ b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/ScheduledPinnedThreadPoolExecutorTest.kt @@ -12,27 +12,27 @@ class ScheduledPinnedThreadPoolExecutorTest { @Test fun `Single pinned thread scheduled executor`() { ScheduledPinnedThreadPoolExecutor - .newSinglePinnedThreadScheduledExecutor(TestMaskPinnedThreadFactory) - .let { testPinnedThreadExecutor(`1`, it as ScheduledPinnedThreadPoolExecutor) } + .newSinglePinnedThreadScheduledExecutor(TestMaskPinnedThreadFactory) + .let { testPinnedThreadExecutor(`1`, it as ScheduledPinnedThreadPoolExecutor) } } @Test fun `Pooled pinned thread scheduled executor`() { ScheduledPinnedThreadPoolExecutor - .newScheduledPinnedThreadPool(availableCores, TestMaskPinnedThreadFactory) - .let { testPinnedThreadExecutor(availableCores, it as ScheduledPinnedThreadPoolExecutor) } + .newScheduledPinnedThreadPool(availableCores, TestMaskPinnedThreadFactory) + .let { testPinnedThreadExecutor(availableCores, it as ScheduledPinnedThreadPoolExecutor) } } private fun testPinnedThreadExecutor( - concurrency: Int, - scheduler: ScheduledPinnedThreadPoolExecutor + concurrency: Int, + scheduler: ScheduledPinnedThreadPoolExecutor ) = scheduler.use { - val visited = Sets.newConcurrentHashSet() + val visited = Sets.newConcurrentHashSet() val latch = CountDownLatch(concurrency) scheduler.corePoolSize shouldBeEqualTo concurrency val futures = (`0` until concurrency) - .map { runnableTask(latch, visited) } - .map { scheduler.schedule(it, SCHEDULING_DELAY, TimeUnit.MILLISECONDS) } + .map { runnableTask(latch, visited) } + .map { scheduler.schedule(it, SCHEDULING_DELAY, TimeUnit.MILLISECONDS) } latch.await() Thread.sleep(5L) visited.size shouldBeEqualTo concurrency diff --git a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/Support.kt b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/Support.kt index 27dba8f..42c9446 100644 --- a/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/Support.kt +++ b/concurrent/src/test/kotlin/org/sheinbergon/needle/concurrent/Support.kt @@ -2,14 +2,8 @@ package org.sheinbergon.needle.concurrent import org.amshove.kluent.shouldBe import org.amshove.kluent.shouldBeEqualTo -import org.sheinbergon.needle.PinnedThread -import org.sheinbergon.needle.binaryTestMask -import org.sheinbergon.needle.testAffinityDescriptor -import org.sheinbergon.needle.textTestMask -import java.util.concurrent.Callable -import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors -import java.util.concurrent.ForkJoinPool +import org.sheinbergon.needle.* +import java.util.concurrent.* internal const val SCHEDULING_DELAY = 500L @@ -20,11 +14,15 @@ internal object TestMaskPinnedThreadFactory : PinnedThreadFactory { } @Suppress("UNCHECKED_CAST") -internal fun callableTask(latch: CountDownLatch, visited: MutableSet): Callable = +internal fun callableTask(latch: CountDownLatch, visited: MutableSet): Callable = Executors.callable { runnableTask(latch, visited).run() } as Callable -internal fun runnableTask(latch: CountDownLatch, visited: MutableSet) = Runnable { - val self = Thread.currentThread() as PinnedThread +internal fun recursiveAction(latch: CountDownLatch, visited: MutableSet) = object : RecursiveAction() { + override fun compute() = runnableTask(latch, visited).run() +} + +internal fun runnableTask(latch: CountDownLatch, visited: MutableSet) = Runnable { + val self = Thread.currentThread() as Pinned visited.add(self) shouldBe true self.affinity().mask() shouldBeEqualTo binaryTestMask self.affinity().toString() shouldBeEqualTo textTestMask diff --git a/concurrent/src/test/resources/junit-platform.properties b/concurrent/src/test/resources/junit-platform.properties index 3decb2a..ee9b800 100644 --- a/concurrent/src/test/resources/junit-platform.properties +++ b/concurrent/src/test/resources/junit-platform.properties @@ -1 +1 @@ -junit.jupiter.execution.timeout.default=5000 ms \ No newline at end of file +junit.jupiter.execution.timeout.default=10000 ms \ No newline at end of file