From 1f14edb170bf2d0e145201b58e3d2ae90638b70f Mon Sep 17 00:00:00 2001 From: Lukas Kusik Date: Sun, 28 Apr 2019 22:13:02 +0300 Subject: [PATCH] Parallel operations on Arrays --- README.md | 4 +- .../coroutines/transformations/MapInPlace.kt | 41 +++++ .../coroutines/transformations/ParallelMap.kt | 95 ++++++++---- .../transformations/ParallelMapChunked.kt | 41 +++++ .../transformations/ParallelMapInPlace.kt | 104 +++++++++++++ .../ParallelMapInPlaceChunked.kt | 144 ++++++++++++++++++ .../transformations/ParallelReduce.kt | 2 + .../ParallelInPlaceMapArrayBenchmark.kt | 56 +++++++ ...rallelInPlaceMapPrimitiveArrayBenchmark.kt | 60 ++++++++ .../benchmark/ParallelMapArrayBenchmark.kt | 42 +++++ ...nchmark.kt => ParallelMapListBenchmark.kt} | 10 +- .../ParallelMapPrimitiveArrayBenchmark.kt | 43 ++++++ .../benchmark/ParallelReduceBenchmark.kt | 4 +- .../transformations/test/MapInPlaceTest.kt | 92 +++++++++++ .../test/ParallelMapArrayTest.kt | 124 +++++++++++++++ .../test/ParallelMapInPlaceChunkedTest.kt | 110 +++++++++++++ .../test/ParallelMapInPlaceTest.kt | 109 +++++++++++++ ...pTest.kt => ParallelMapListChunkedTest.kt} | 17 +-- .../test/ParallelMapListTest.kt | 36 +++++ 19 files changed, 1080 insertions(+), 54 deletions(-) create mode 100644 transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/MapInPlace.kt create mode 100644 transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelMapChunked.kt create mode 100644 transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelMapInPlace.kt create mode 100644 transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelMapInPlaceChunked.kt create mode 100644 transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelInPlaceMapArrayBenchmark.kt create mode 100644 transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelInPlaceMapPrimitiveArrayBenchmark.kt create mode 100644 transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelMapArrayBenchmark.kt rename transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/{ParallelMapBenchmark.kt => ParallelMapListBenchmark.kt} (80%) create mode 100644 transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelMapPrimitiveArrayBenchmark.kt create mode 100644 transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/MapInPlaceTest.kt create mode 100644 transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapArrayTest.kt create mode 100644 transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapInPlaceChunkedTest.kt create mode 100644 transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapInPlaceTest.kt rename transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/{ParallelMapTest.kt => ParallelMapListChunkedTest.kt} (71%) create mode 100644 transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapListTest.kt diff --git a/README.md b/README.md index cd9d399..aa65fff 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ suspend fun Iterable.mapParallel(transform: (T) -> R): List = corou Example of using the parallel *map* operation. ```kotlin fun showCase() { - var list = listOf(1,2,3) + val list = listOf(1,2,3) runBlocking(Dispatchers.Default) { var mappedList = list.mapParallel { it * 2 } // Results in [2,4,6] } @@ -47,7 +47,7 @@ allprojects { After that, include this line in your module build.gradle. ```gradle dependencies { - implementation 'com.github.cvb941:kotlin-parallel-collection-operations:1.1' + implementation 'com.github.cvb941:kotlin-parallel-operations:1.2' } ``` diff --git a/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/MapInPlace.kt b/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/MapInPlace.kt new file mode 100644 index 0000000..8149b8f --- /dev/null +++ b/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/MapInPlace.kt @@ -0,0 +1,41 @@ +package com.lukaskusik.coroutines.transformations + +fun Array.mapInPlace(transform: (T) -> T): Array { + for (i in this.indices) this[i] = transform(this[i]) + return this +} + +fun ByteArray.mapInPlace(transform: (Byte) -> Byte): ByteArray { + for (i in this.indices) this[i] = transform(this[i]) + return this +} + +fun ShortArray.mapInPlace(transform: (Short) -> Short): ShortArray { + for (i in this.indices) this[i] = transform(this[i]) + return this +} + +fun IntArray.mapInPlace(transform: (Int) -> Int): IntArray { + for (i in this.indices) this[i] = transform(this[i]) + return this +} + +fun LongArray.mapInPlace(transform: (Long) -> Long): LongArray { + for (i in this.indices) this[i] = transform(this[i]) + return this +} + +fun FloatArray.mapInPlace(transform: (Float) -> Float): FloatArray { + for (i in this.indices) this[i] = transform(this[i]) + return this +} + +fun DoubleArray.mapInPlace(transform: (Double) -> Double): DoubleArray { + for (i in this.indices) this[i] = transform(this[i]) + return this +} + +fun BooleanArray.mapInPlace(transform: (Boolean) -> Boolean): BooleanArray { + for (i in this.indices) this[i] = transform(this[i]) + return this +} diff --git a/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelMap.kt b/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelMap.kt index 5eee350..a21c28e 100644 --- a/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelMap.kt +++ b/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelMap.kt @@ -12,39 +12,76 @@ suspend fun Iterable.mapParallel( map { async { transform(it) } }.map { it.await() } } +//region Arrays /** - * Performs map transformation on the iterable using coroutines. - * The chunkSize parameter is used to run multiple transformations on a single coroutine. - * - * @param chunkSize Size of each sub-collection that will be reduced in each coroutine. + * Performs map transformation on the array using coroutines. */ -suspend fun Iterable.mapParallelChunked( - chunkSize: Int, +suspend fun Array.mapParallel( transform: (T) -> R ): List = coroutineScope { - chunked(chunkSize).map { subChunk -> - async { - subChunk.map(transform) - } - }.flatMap { - it.await() - } + map { async { transform(it) } }.map { it.await() } } /** - * Performs map transformation on the iterable using coroutines. - * - * It can split the collection into multiple chunks using the chunksCount parameter. - * Each chunk will then run on a single coroutine, minimizing thread management, etc. - * The default and recommended chunksCount for multithreading is the number of CPU threads, e.g. 4 or 8. - * - * @param chunksCount How many chunks should the collection be split into. Defaults to the number of available processors. - * - */ -suspend fun Collection.mapParallelChunked( - chunksCount: Int = Runtime.getRuntime().availableProcessors(), - transform: (T) -> E -): List { - val chunkSize = Math.ceil(size / chunksCount.toDouble()).toInt() - return asIterable().mapParallelChunked(chunkSize, transform) -} \ No newline at end of file + * Performs map transformation on the array using coroutines. + */ +suspend fun ByteArray.mapParallel( + transform: (Byte) -> R +): List = coroutineScope { + map { async { transform(it) } }.map { it.await() } +} + +/** + * Performs map transformation on the array using coroutines. + */ +suspend fun ShortArray.mapParallel( + transform: (Short) -> R +): List = coroutineScope { + map { async { transform(it) } }.map { it.await() } +} + +/** + * Performs map transformation on the array using coroutines. + */ +suspend fun IntArray.mapParallel( + transform: (Int) -> R +): List = coroutineScope { + map { async { transform(it) } }.map { it.await() } +} + +/** + * Performs map transformation on the array using coroutines. + */ +suspend fun LongArray.mapParallel( + transform: (Long) -> R +): List = coroutineScope { + map { async { transform(it) } }.map { it.await() } +} + +/** + * Performs map transformation on the array using coroutines. + */ +suspend fun FloatArray.mapParallel( + transform: (Float) -> R +): List = coroutineScope { + map { async { transform(it) } }.map { it.await() } +} + +/** + * Performs map transformation on the array using coroutines. + */ +suspend fun DoubleArray.mapParallel( + transform: (Double) -> R +): List = coroutineScope { + map { async { transform(it) } }.map { it.await() } +} + +/** + * Performs map transformation on the array using coroutines. + */ +suspend fun BooleanArray.mapParallel( + transform: (Boolean) -> R +): List = coroutineScope { + map { async { transform(it) } }.map { it.await() } +} +//endregion \ No newline at end of file diff --git a/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelMapChunked.kt b/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelMapChunked.kt new file mode 100644 index 0000000..47863d7 --- /dev/null +++ b/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelMapChunked.kt @@ -0,0 +1,41 @@ +package com.lukaskusik.coroutines.transformations + +import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope + +/** + * Performs map transformation on the iterable using coroutines. + * The chunkSize parameter is used to run multiple transformations on a single coroutine. + * + * @param chunkSize Size of each sub-collection that will be reduced in each coroutine. + */ +suspend fun Iterable.mapParallelChunked( + chunkSize: Int, + transform: (T) -> R +): List = coroutineScope { + chunked(chunkSize).map { subChunk -> + async { + subChunk.map(transform) + } + }.flatMap { + it.await() + } +} + +/** + * Performs map transformation on the collection using coroutines. + * + * It can split the collection into multiple chunks using the chunksCount parameter. + * Each chunk will then run on a single coroutine, minimizing thread management, etc. + * The default and recommended chunksCount for multithreading is the number of CPU threads, e.g. 4 or 8. + * + * @param chunksCount How many chunks should the collection be split into. Defaults to the number of available processors. + * + */ +suspend fun Collection.mapParallelChunked( + chunksCount: Int = Runtime.getRuntime().availableProcessors(), + transform: (T) -> E +): List { + val chunkSize = Math.ceil(size / chunksCount.toDouble()).toInt() + return asIterable().mapParallelChunked(chunkSize, transform) +} \ No newline at end of file diff --git a/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelMapInPlace.kt b/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelMapInPlace.kt new file mode 100644 index 0000000..b17da75 --- /dev/null +++ b/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelMapInPlace.kt @@ -0,0 +1,104 @@ +package com.lukaskusik.coroutines.transformations + +import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch + + +/** + * Performs in place map transformation on the array using coroutines. + */ +suspend fun Array.mapInPlaceParallel( + transform: (T) -> T +): Array = coroutineScope { + for (i in indices) { + launch { this@mapInPlaceParallel[i] = transform(this@mapInPlaceParallel[i]) } + } + this@mapInPlaceParallel +} + + +/** + * Performs in place map transformation on the array using coroutines. + */ +suspend fun ByteArray.mapInPlaceParallel( + transform: (Byte) -> Byte +): ByteArray = coroutineScope { + for (i in indices) { + launch { this@mapInPlaceParallel[i] = transform(this@mapInPlaceParallel[i]) } + } + this@mapInPlaceParallel +} + + +/** + * Performs in place map transformation on the array using coroutines. + */ +suspend fun ShortArray.mapInPlaceParallel( + transform: (Short) -> Short +): ShortArray = coroutineScope { + for (i in indices) { + launch { this@mapInPlaceParallel[i] = transform(this@mapInPlaceParallel[i]) } + } + this@mapInPlaceParallel +} + +/** + * Performs in place map transformation on the array using coroutines. + */ +suspend fun IntArray.mapInPlaceParallel( + transform: (Int) -> Int +): IntArray = coroutineScope { + for (i in indices) { + launch { this@mapInPlaceParallel[i] = transform(this@mapInPlaceParallel[i]) } + } + this@mapInPlaceParallel +} + +/** + * Performs in place map transformation on the array using coroutines. + */ +suspend fun LongArray.mapInPlaceParallel( + transform: (Long) -> Long +): LongArray = coroutineScope { + for (i in indices) { + launch { this@mapInPlaceParallel[i] = transform(this@mapInPlaceParallel[i]) } + } + this@mapInPlaceParallel +} + +/** + * Performs in place map transformation on the array using coroutines. + */ +suspend fun FloatArray.mapInPlaceParallel( + transform: (Float) -> Float +): FloatArray = coroutineScope { + for (i in indices) { + launch { this@mapInPlaceParallel[i] = transform(this@mapInPlaceParallel[i]) } + } + this@mapInPlaceParallel +} + +/** + * Performs in place map transformation on the array using coroutines. + */ +suspend fun DoubleArray.mapInPlaceParallel( + transform: (Double) -> Double +): DoubleArray = coroutineScope { + for (i in indices) { + launch { this@mapInPlaceParallel[i] = transform(this@mapInPlaceParallel[i]) } + } + this@mapInPlaceParallel +} + +/** + * Performs in place map transformation on the array using coroutines. + */ +suspend fun BooleanArray.mapInPlaceParallel( + transform: (Boolean) -> Boolean +): BooleanArray = coroutineScope { + for (i in indices) { + launch { this@mapInPlaceParallel[i] = transform(this@mapInPlaceParallel[i]) } + } + this@mapInPlaceParallel +} diff --git a/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelMapInPlaceChunked.kt b/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelMapInPlaceChunked.kt new file mode 100644 index 0000000..6ef8936 --- /dev/null +++ b/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelMapInPlaceChunked.kt @@ -0,0 +1,144 @@ +package com.lukaskusik.coroutines.transformations + +import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch + + +/** + * Performs in place map transformation on the array using coroutines. + */ +suspend fun Array.mapInPlaceParallelChunked( + chunksCount: Int = Runtime.getRuntime().availableProcessors(), + transform: (T) -> T +): Array = coroutineScope { + val chunkSize = Math.ceil(size / chunksCount.toDouble()).toInt() + for (i in indices step chunkSize) { + launch { + for (j in i until Math.min(i + chunkSize, size)) + this@mapInPlaceParallelChunked[j] = transform(this@mapInPlaceParallelChunked[j]) + } + } + this@mapInPlaceParallelChunked +} + + +/** + * Performs in place map transformation on the array using coroutines. + */ +suspend fun ByteArray.mapInPlaceParallelChunked( + chunksCount: Int = Runtime.getRuntime().availableProcessors(), + transform: (Byte) -> Byte +): ByteArray = coroutineScope { + val chunkSize = Math.ceil(size / chunksCount.toDouble()).toInt() + for (i in indices step chunkSize) { + launch { + for (j in i until Math.min(i + chunkSize, size)) + this@mapInPlaceParallelChunked[j] = transform(this@mapInPlaceParallelChunked[j]) + } + } + this@mapInPlaceParallelChunked +} + + +/** + * Performs in place map transformation on the array using coroutines. + */ +suspend fun ShortArray.mapInPlaceParallelChunked( + chunksCount: Int = Runtime.getRuntime().availableProcessors(), + transform: (Short) -> Short +): ShortArray = coroutineScope { + val chunkSize = Math.ceil(size / chunksCount.toDouble()).toInt() + for (i in indices step chunkSize) { + launch { + for (j in i until Math.min(i + chunkSize, size)) + this@mapInPlaceParallelChunked[j] = transform(this@mapInPlaceParallelChunked[j]) + } + } + this@mapInPlaceParallelChunked +} + +/** + * Performs in place map transformation on the array using coroutines. + */ +suspend fun IntArray.mapInPlaceParallelChunked( + chunksCount: Int = Runtime.getRuntime().availableProcessors(), + transform: (Int) -> Int +): IntArray = coroutineScope { + val chunkSize = Math.ceil(size / chunksCount.toDouble()).toInt() + for (i in indices step chunkSize) { + launch { + for (j in i until Math.min(i + chunkSize, size)) + this@mapInPlaceParallelChunked[j] = transform(this@mapInPlaceParallelChunked[j]) + } + } + this@mapInPlaceParallelChunked +} + +/** + * Performs in place map transformation on the array using coroutines. + */ +suspend fun LongArray.mapInPlaceParallelChunked( + chunksCount: Int = Runtime.getRuntime().availableProcessors(), + transform: (Long) -> Long +): LongArray = coroutineScope { + val chunkSize = Math.ceil(size / chunksCount.toDouble()).toInt() + for (i in indices step chunkSize) { + launch { + for (j in i until Math.min(i + chunkSize, size)) + this@mapInPlaceParallelChunked[j] = transform(this@mapInPlaceParallelChunked[j]) + } + } + this@mapInPlaceParallelChunked +} + +/** + * Performs in place map transformation on the array using coroutines. + */ +suspend fun FloatArray.mapInPlaceParallelChunked( + chunksCount: Int = Runtime.getRuntime().availableProcessors(), + transform: (Float) -> Float +): FloatArray = coroutineScope { + val chunkSize = Math.ceil(size / chunksCount.toDouble()).toInt() + for (i in indices step chunkSize) { + launch { + for (j in i until Math.min(i + chunkSize, size)) + this@mapInPlaceParallelChunked[j] = transform(this@mapInPlaceParallelChunked[j]) + } + } + this@mapInPlaceParallelChunked +} + +/** + * Performs in place map transformation on the array using coroutines. + */ +suspend fun DoubleArray.mapInPlaceParallelChunked( + chunksCount: Int = Runtime.getRuntime().availableProcessors(), + transform: (Double) -> Double +): DoubleArray = coroutineScope { + val chunkSize = Math.ceil(size / chunksCount.toDouble()).toInt() + for (i in indices step chunkSize) { + launch { + for (j in i until Math.min(i + chunkSize, size)) + this@mapInPlaceParallelChunked[j] = transform(this@mapInPlaceParallelChunked[j]) + } + } + this@mapInPlaceParallelChunked +} + +/** + * Performs in place map transformation on the array using coroutines. + */ +suspend fun BooleanArray.mapInPlaceParallelChunked( + chunksCount: Int = Runtime.getRuntime().availableProcessors(), + transform: (Boolean) -> Boolean +): BooleanArray = coroutineScope { + val chunkSize = Math.ceil(size / chunksCount.toDouble()).toInt() + for (i in indices step chunkSize) { + launch { + for (j in i until Math.min(i + chunkSize, size)) + this@mapInPlaceParallelChunked[j] = transform(this@mapInPlaceParallelChunked[j]) + } + } + this@mapInPlaceParallelChunked +} diff --git a/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelReduce.kt b/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelReduce.kt index ffba76a..076fff4 100644 --- a/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelReduce.kt +++ b/transformations/src/main/kotlin/com/lukaskusik/coroutines/transformations/ParallelReduce.kt @@ -32,6 +32,8 @@ suspend fun Collection.reduceParallel( chunksCount: Int = Runtime.getRuntime().availableProcessors(), operation: (T, T) -> T ): T { + if (chunksCount <= 0) throw IllegalArgumentException("chunksCount must be a positive integer") + val chunkSize = Math.ceil(size / chunksCount.toDouble()).toInt() return asIterable().reduceParallel(chunkSize, operation) } \ No newline at end of file diff --git a/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelInPlaceMapArrayBenchmark.kt b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelInPlaceMapArrayBenchmark.kt new file mode 100644 index 0000000..a795002 --- /dev/null +++ b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelInPlaceMapArrayBenchmark.kt @@ -0,0 +1,56 @@ +package com.lukaskusik.coroutines.transformations.benchmark + +import com.carrotsearch.junitbenchmarks.AbstractBenchmark +import com.lukaskusik.coroutines.transformations.mapInPlace +import com.lukaskusik.coroutines.transformations.mapInPlaceParallel +import com.lukaskusik.coroutines.transformations.mapInPlaceParallelChunked +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.runBlocking +import org.junit.Test +import kotlin.random.Random + +class ParallelInPlaceMapArrayBenchmark : AbstractBenchmark() { + + companion object { + const val ARRAY_SIZE = 100 + } + + private val random = Random(468) + private val array = Array(ARRAY_SIZE) { java.util.UUID.randomUUID().toString() } + private val operation = { it: String -> Thread.sleep(1); it + it } + + @Test + fun sequential() { + array.mapInPlace(operation) + } + + @Test + fun coroutineOnMain() { + runBlocking { + array.mapInPlaceParallel(operation) + } + } + + @Test + fun coroutineOnThreadPool() { + runBlocking(Dispatchers.Default) { + array.mapInPlaceParallel(operation) + } + } + + @Test + fun coroutineOnThreadPoolChunked4() { + runBlocking(Dispatchers.Default) { + array.mapInPlaceParallelChunked(4, operation) + } + } + + @Test + fun coroutineOnThreadPoolChunked8() { + runBlocking(Dispatchers.Default) { + array.mapInPlaceParallelChunked(8, operation) + } + } + + +} \ No newline at end of file diff --git a/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelInPlaceMapPrimitiveArrayBenchmark.kt b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelInPlaceMapPrimitiveArrayBenchmark.kt new file mode 100644 index 0000000..6070b1f --- /dev/null +++ b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelInPlaceMapPrimitiveArrayBenchmark.kt @@ -0,0 +1,60 @@ +package com.lukaskusik.coroutines.transformations.benchmark + +import com.carrotsearch.junitbenchmarks.AbstractBenchmark +import com.lukaskusik.coroutines.transformations.mapInPlace +import com.lukaskusik.coroutines.transformations.mapInPlaceParallel +import com.lukaskusik.coroutines.transformations.mapInPlaceParallelChunked +import com.lukaskusik.coroutines.transformations.mapParallelChunked +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.runBlocking +import org.junit.Test +import kotlin.random.Random + +class ParallelInPlaceMapPrimitiveArrayBenchmark : AbstractBenchmark() { + + companion object { + const val ARRAY_SIZE = 100 + } + + private val random = Random(468) + private val array = IntArray(ARRAY_SIZE) { random.nextInt() } + private val operation = { it: Int -> +// Thread.sleep(1) + it + it + } + + + @Test + fun sequential() { + array.mapInPlace(operation) + } + + @Test + fun coroutineOnMain() { + runBlocking { + array.mapInPlaceParallel(operation) + } + } + + @Test + fun coroutineOnThreadPool() { + runBlocking(Dispatchers.Default) { + array.mapInPlaceParallel(operation) + } + } + + @Test + fun coroutineOnThreadPoolChunked4() { + runBlocking(Dispatchers.Default) { + array.mapInPlaceParallelChunked(4, operation) + } + } + + @Test + fun coroutineOnThreadPoolChunked8() { + runBlocking(Dispatchers.Default) { + array.mapInPlaceParallelChunked(8, operation) + } + } + +} \ No newline at end of file diff --git a/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelMapArrayBenchmark.kt b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelMapArrayBenchmark.kt new file mode 100644 index 0000000..ea4b246 --- /dev/null +++ b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelMapArrayBenchmark.kt @@ -0,0 +1,42 @@ +package com.lukaskusik.coroutines.transformations.benchmark + +import com.carrotsearch.junitbenchmarks.AbstractBenchmark +import com.lukaskusik.coroutines.transformations.mapParallel +import com.lukaskusik.coroutines.transformations.mapParallelChunked +import com.lukaskusik.coroutines.transformations.test.ParallelMapListTest +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.runBlocking +import org.junit.Test +import kotlin.random.Random + +class ParallelMapArrayBenchmark : AbstractBenchmark() { + + companion object { + const val ARRAY_SIZE = 100 + } + + private val random = Random(468) + private val list = Array(ARRAY_SIZE) { java.util.UUID.randomUUID().toString() } + + + @Test + fun sequential() { + list.map { Thread.sleep(1); it + it } + } + + @Test + fun coroutineOnMain() { + runBlocking { + list.mapParallel { Thread.sleep(1); it + it } + } + } + + @Test + fun coroutineOnThreadPool() { + runBlocking(Dispatchers.Default) { + list.mapParallel { Thread.sleep(1); it + it } + } + } + + +} \ No newline at end of file diff --git a/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelMapBenchmark.kt b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelMapListBenchmark.kt similarity index 80% rename from transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelMapBenchmark.kt rename to transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelMapListBenchmark.kt index 314c598..4b0c027 100644 --- a/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelMapBenchmark.kt +++ b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelMapListBenchmark.kt @@ -1,22 +1,20 @@ package com.lukaskusik.coroutines.transformations.benchmark import com.carrotsearch.junitbenchmarks.AbstractBenchmark -import com.carrotsearch.junitbenchmarks.annotation.AxisRange -import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart import com.lukaskusik.coroutines.transformations.mapParallel import com.lukaskusik.coroutines.transformations.mapParallelChunked -import com.lukaskusik.coroutines.transformations.test.ParallelMapTest +import com.lukaskusik.coroutines.transformations.test.ParallelMapListTest import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.runBlocking import org.junit.Test -class ParallelMapBenchmark : AbstractBenchmark() { +class ParallelMapListBenchmark : AbstractBenchmark() { companion object { - const val LIST_SIZE = 1000 + const val LIST_SIZE = 100 } - private val list = ParallelMapTest.getRandomListOfSize(LIST_SIZE) + private val list = ParallelMapListTest.getRandomListOfSize(LIST_SIZE) @Test diff --git a/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelMapPrimitiveArrayBenchmark.kt b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelMapPrimitiveArrayBenchmark.kt new file mode 100644 index 0000000..7dbb022 --- /dev/null +++ b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelMapPrimitiveArrayBenchmark.kt @@ -0,0 +1,43 @@ +package com.lukaskusik.coroutines.transformations.benchmark + +import com.carrotsearch.junitbenchmarks.AbstractBenchmark +import com.lukaskusik.coroutines.transformations.mapParallel +import com.lukaskusik.coroutines.transformations.mapParallelChunked +import com.lukaskusik.coroutines.transformations.test.ParallelMapListTest +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.runBlocking +import org.junit.Test +import kotlin.random.Random + +class ParallelMapPrimitiveArrayBenchmark : AbstractBenchmark() { + + companion object { + const val ARRAY_SIZE = 100 + } + + private val random = Random(468) + private val list = IntArray(ARRAY_SIZE) { random.nextInt() } + + + @Test + fun sequential() { + list.map { Thread.sleep(1); it / 2 } + } + + @Test + fun coroutineOnMain() { + runBlocking { + list.mapParallel { Thread.sleep(1); it / 2 } + } + } + + @Test + fun coroutineOnThreadPool() { + runBlocking(Dispatchers.Default) { + list.mapParallel { Thread.sleep(1); it / 2 } + } + } + + + +} \ No newline at end of file diff --git a/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelReduceBenchmark.kt b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelReduceBenchmark.kt index 89de477..44de878 100644 --- a/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelReduceBenchmark.kt +++ b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelReduceBenchmark.kt @@ -2,7 +2,7 @@ package com.lukaskusik.coroutines.transformations.benchmark import com.carrotsearch.junitbenchmarks.AbstractBenchmark import com.lukaskusik.coroutines.transformations.reduceParallel -import com.lukaskusik.coroutines.transformations.test.ParallelMapTest +import com.lukaskusik.coroutines.transformations.test.ParallelMapListTest import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.runBlocking import org.junit.Test @@ -13,7 +13,7 @@ class ParallelReduceBenchmark : AbstractBenchmark() { const val LIST_SIZE = 1000 } - private val list = ParallelMapTest.getRandomListOfSize(LIST_SIZE) + private val list = ParallelMapListTest.getRandomListOfSize(LIST_SIZE) private val operation = { acc: Int, i: Int -> Thread.sleep(1); acc + i } diff --git a/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/MapInPlaceTest.kt b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/MapInPlaceTest.kt new file mode 100644 index 0000000..dc34425 --- /dev/null +++ b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/MapInPlaceTest.kt @@ -0,0 +1,92 @@ +package com.lukaskusik.coroutines.transformations.test + +import com.lukaskusik.coroutines.transformations.mapInPlace +import org.junit.Assert +import org.junit.Test + +class MapInPlaceTest { + + @Test + fun inPlaceMapArrayOfStrings() { + val array = arrayOf("one", "two", "three") + val resultArray = arrayOf("oneone", "twotwo", "threethree") + + array.mapInPlace { it + it } + + Assert.assertArrayEquals(array, resultArray) + } + + @Test + fun inPlaceMapArrayOfBytes() { + val array = byteArrayOf(1, 2, 3) + val resultArray = byteArrayOf(2, 4, 6) + + array.mapInPlace { (it + it).toByte() } + + Assert.assertArrayEquals(array, resultArray) + } + + @Test + fun inPlaceMapArrayOfShorts() { + val array = shortArrayOf(1, 2, 3) + val resultArray = shortArrayOf(2, 4, 6) + + array.mapInPlace { (it + it).toShort() } + + Assert.assertArrayEquals(array, resultArray) + } + + @Test + fun inPlaceMapArrayOfInts() { + val array = intArrayOf(1, 2, 3) + val resultArray = intArrayOf(2, 4, 6) + + array.mapInPlace { it + it } + + Assert.assertArrayEquals(array, resultArray) + } + + @Test + fun inPlaceMapArrayOfLongs() { + val array = longArrayOf(1, 2, 3) + val resultArray = longArrayOf(2, 4, 6) + + array.mapInPlace { it + it } + + Assert.assertArrayEquals(array, resultArray) + } + + + @Test + fun inPlaceMapArrayOfFloats() { + val array = floatArrayOf(1F, 2F, 3F) + val resultArray = floatArrayOf(2F, 4F, 6F) + + array.mapInPlace { it + it } + + Assert.assertArrayEquals(array, resultArray, 0.01f) + } + + + @Test + fun inPlaceMapArrayOfDoubles() { + val array = doubleArrayOf(1.0, 2.0, 3.0) + val resultArray = doubleArrayOf(2.0, 4.0, 6.0) + + array.mapInPlace { it + it } + + Assert.assertArrayEquals(array, resultArray, 0.01) + } + + + @Test + fun inPlaceMapArrayOfBooleans() { + val array = booleanArrayOf(true, false, true) + val resultArray = booleanArrayOf(false, true, false) + + array.mapInPlace { !it } + + Assert.assertArrayEquals(array, resultArray) + } + +} \ No newline at end of file diff --git a/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapArrayTest.kt b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapArrayTest.kt new file mode 100644 index 0000000..cdf01a8 --- /dev/null +++ b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapArrayTest.kt @@ -0,0 +1,124 @@ +package com.lukaskusik.coroutines.transformations.test + +import com.lukaskusik.coroutines.transformations.mapParallel +import kotlinx.coroutines.runBlocking +import org.junit.Assert +import org.junit.Test + +class ParallelMapArrayTest { + + + @Test + fun parallelMapArrayOfStrings() { + val arraySequential = arrayOf("one", "two", "three") + val arrayParallel = arraySequential.copyOf() + + val resultSequential = arraySequential.map { it + it } + var resultParallel: List = emptyList() + runBlocking { + resultParallel = arrayParallel.mapParallel { it + it } + } + + Assert.assertEquals(resultSequential, resultParallel) + } + + @Test + fun parallelMapArrayOfBytes() { + val arraySequential = byteArrayOf(1.toByte(), 2.toByte(), 3.toByte()) + val arrayParallel = arraySequential.copyOf() + + val resultSequential = arraySequential.map { it + it } + var resultParallel: List = emptyList() + runBlocking { + resultParallel = arrayParallel.mapParallel { it + it } + } + + Assert.assertEquals(resultSequential, resultParallel) + } + + @Test + fun parallelMapArrayOfShorts() { + val arraySequential = shortArrayOf(1, 2, 3) + val arrayParallel = arraySequential.copyOf() + + val resultSequential = arraySequential.map { it + it } + var resultParallel: List = emptyList() + runBlocking { + resultParallel = arrayParallel.mapParallel { it + it } + } + + Assert.assertEquals(resultSequential, resultParallel) + } + + @Test + fun parallelMapArrayOfInts() { + val arraySequential = intArrayOf(1, 2, 3) + val arrayParallel = arraySequential.copyOf() + + val resultSequential = arraySequential.map { it + it } + var resultParallel: List = emptyList() + runBlocking { + resultParallel = arrayParallel.mapParallel { it + it } + } + + Assert.assertEquals(resultSequential, resultParallel) + } + + @Test + fun parallelMapArrayOfLongs() { + val arraySequential = longArrayOf(1, 2, 3) + val arrayParallel = arraySequential.copyOf() + + val resultSequential = arraySequential.map { it + it } + var resultParallel: List = emptyList() + runBlocking { + resultParallel = arrayParallel.mapParallel { it + it } + } + + Assert.assertEquals(resultSequential, resultParallel) + } + + @Test + fun parallelMapArrayOfFloats() { + val arraySequential = floatArrayOf(1F, 2F, 3F) + val arrayParallel = arraySequential.copyOf() + + val resultSequential = arraySequential.map { it + it } + var resultParallel: List = emptyList() + runBlocking { + resultParallel = arrayParallel.mapParallel { it + it } + } + + Assert.assertEquals(resultSequential, resultParallel) + } + + @Test + fun parallelMapArrayOfDoubles() { + val arraySequential = doubleArrayOf(1.0, 2.0, 3.0) + val arrayParallel = arraySequential.copyOf() + + val resultSequential = arraySequential.map { it + it } + var resultParallel: List = emptyList() + runBlocking { + resultParallel = arrayParallel.mapParallel { it + it } + } + + Assert.assertEquals(resultSequential, resultParallel) + } + + @Test + fun parallelMapArrayOfBooleans() { + val arraySequential = booleanArrayOf(true, false, true) + val arrayParallel = arraySequential.copyOf() + + val resultSequential = arraySequential.map { !it } + var resultParallel: List = emptyList() + runBlocking { + resultParallel = arrayParallel.mapParallel { !it } + } + + Assert.assertEquals(resultSequential, resultParallel) + } + + +} \ No newline at end of file diff --git a/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapInPlaceChunkedTest.kt b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapInPlaceChunkedTest.kt new file mode 100644 index 0000000..9327181 --- /dev/null +++ b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapInPlaceChunkedTest.kt @@ -0,0 +1,110 @@ +package com.lukaskusik.coroutines.transformations.test + +import com.lukaskusik.coroutines.transformations.mapInPlace +import com.lukaskusik.coroutines.transformations.mapInPlaceParallel +import com.lukaskusik.coroutines.transformations.mapInPlaceParallelChunked +import kotlinx.coroutines.runBlocking +import org.junit.Assert +import org.junit.Test + +class ParallelMapInPlaceChunkedTest { + + @Test + fun parallelInPlaceMapArrayOfStringsChunked4() { + val array = arrayOf("one", "two", "three") + val resultArray = arrayOf("oneone", "twotwo", "threethree") + + runBlocking { + array.mapInPlaceParallelChunked(4) { it + it } + } + + Assert.assertArrayEquals(array, resultArray) + } + + @Test + fun parallelInPlaceMapArrayOfBytesChunked4() { + val array = byteArrayOf(1, 2, 3) + val resultArray = byteArrayOf(2, 4, 6) + + runBlocking { + array.mapInPlaceParallelChunked(4) { (it + it).toByte() } + } + + Assert.assertArrayEquals(array, resultArray) + } + + @Test + fun parallelInPlaceMapArrayOfShortsChunked4() { + val array = shortArrayOf(1, 2, 3) + val resultArray = shortArrayOf(2, 4, 6) + + runBlocking { + array.mapInPlaceParallelChunked(4) { (it + it).toShort() } + } + + Assert.assertArrayEquals(array, resultArray) + } + + @Test + fun parallelInPlaceMapArrayOfIntsChunked4() { + val array = intArrayOf(1, 2, 3) + val resultArray = intArrayOf(2, 4, 6) + + runBlocking { + array.mapInPlaceParallelChunked(4) { it + it } + } + + Assert.assertArrayEquals(array, resultArray) + } + + @Test + fun parallelInPlaceMapArrayOfLongsChunked4() { + val array = longArrayOf(1, 2, 3) + val resultArray = longArrayOf(2, 4, 6) + + runBlocking { + array.mapInPlaceParallelChunked(4) { it + it } + } + + Assert.assertArrayEquals(array, resultArray) + } + + + @Test + fun parallelInPlaceMapArrayOfFloatsChunked4() { + val array = floatArrayOf(1F, 2F, 3F) + val resultArray = floatArrayOf(2F, 4F, 6F) + + runBlocking { + array.mapInPlaceParallelChunked(4) { it + it } + } + + Assert.assertArrayEquals(array, resultArray, 0.01f) + } + + + @Test + fun parallelInPlaceMapArrayOfDoublesChunked4() { + val array = doubleArrayOf(1.0, 2.0, 3.0) + val resultArray = doubleArrayOf(2.0, 4.0, 6.0) + + runBlocking { + array.mapInPlaceParallelChunked(4) { it + it } + } + + Assert.assertArrayEquals(array, resultArray, 0.01) + } + + + @Test + fun parallelInPlaceMapArrayOfBooleansChunked4() { + val array = booleanArrayOf(true, false, true) + val resultArray = booleanArrayOf(false, true, false) + + runBlocking { + array.mapInPlaceParallelChunked(4) { !it } + } + + Assert.assertArrayEquals(array, resultArray) + } +} \ No newline at end of file diff --git a/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapInPlaceTest.kt b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapInPlaceTest.kt new file mode 100644 index 0000000..e29b60c --- /dev/null +++ b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapInPlaceTest.kt @@ -0,0 +1,109 @@ +package com.lukaskusik.coroutines.transformations.test + +import com.lukaskusik.coroutines.transformations.mapInPlace +import com.lukaskusik.coroutines.transformations.mapInPlaceParallel +import kotlinx.coroutines.runBlocking +import org.junit.Assert +import org.junit.Test + +class ParallelMapInPlaceTest { + + @Test + fun parallelInPlaceMapArrayOfStrings() { + val array = arrayOf("one", "two", "three") + val resultArray = arrayOf("oneone", "twotwo", "threethree") + + runBlocking { + array.mapInPlaceParallel { it + it } + } + + Assert.assertArrayEquals(array, resultArray) + } + + @Test + fun parallelInPlaceMapArrayOfBytes() { + val array = byteArrayOf(1, 2, 3) + val resultArray = byteArrayOf(2, 4, 6) + + runBlocking { + array.mapInPlaceParallel { (it + it).toByte() } + } + + Assert.assertArrayEquals(array, resultArray) + } + + @Test + fun parallelInPlaceMapArrayOfShorts() { + val array = shortArrayOf(1, 2, 3) + val resultArray = shortArrayOf(2, 4, 6) + + runBlocking { + array.mapInPlaceParallel { (it + it).toShort() } + } + + Assert.assertArrayEquals(array, resultArray) + } + + @Test + fun parallelInPlaceMapArrayOfInts() { + val array = intArrayOf(1, 2, 3) + val resultArray = intArrayOf(2, 4, 6) + + runBlocking { + array.mapInPlaceParallel { it + it } + } + + Assert.assertArrayEquals(array, resultArray) + } + + @Test + fun parallelInPlaceMapArrayOfLongs() { + val array = longArrayOf(1, 2, 3) + val resultArray = longArrayOf(2, 4, 6) + + runBlocking { + array.mapInPlaceParallel { it + it } + } + + Assert.assertArrayEquals(array, resultArray) + } + + + @Test + fun parallelInPlaceMapArrayOfFloats() { + val array = floatArrayOf(1F, 2F, 3F) + val resultArray = floatArrayOf(2F, 4F, 6F) + + runBlocking { + array.mapInPlaceParallel { it + it } + } + + Assert.assertArrayEquals(array, resultArray, 0.01f) + } + + + @Test + fun parallelInPlaceMapArrayOfDoubles() { + val array = doubleArrayOf(1.0, 2.0, 3.0) + val resultArray = doubleArrayOf(2.0, 4.0, 6.0) + + runBlocking { + array.mapInPlaceParallel { it + it } + } + + Assert.assertArrayEquals(array, resultArray, 0.01) + } + + + @Test + fun parallelInPlaceMapArrayOfBooleans() { + val array = booleanArrayOf(true, false, true) + val resultArray = booleanArrayOf(false, true, false) + + runBlocking { + array.mapInPlaceParallel { !it } + } + + Assert.assertArrayEquals(array, resultArray) + } +} \ No newline at end of file diff --git a/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapTest.kt b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapListChunkedTest.kt similarity index 71% rename from transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapTest.kt rename to transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapListChunkedTest.kt index 828ef25..fb68c0b 100644 --- a/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapTest.kt +++ b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapListChunkedTest.kt @@ -7,7 +7,7 @@ import org.junit.Assert import org.junit.Test import kotlin.random.Random -class ParallelMapTest { +class ParallelMapListChunkedTest { companion object { fun getRandomListOfSize(listSize: Int): List { @@ -20,20 +20,6 @@ class ParallelMapTest { } } - @Test - fun parallelMapNoChunks() { - var listSequential = getRandomListOfSize(100) - var listParallel = listSequential.toList() - - listSequential = listSequential.map { it * 2 } - runBlocking { - listParallel = listParallel.mapParallel { it * 2 } - } - - - Assert.assertEquals(listSequential, listParallel) - } - @Test fun parallelMap4Chunks() { var listSequential = listOf(1, 3, 3, 4, 5) @@ -47,4 +33,5 @@ class ParallelMapTest { Assert.assertEquals(listSequential, listParallel) } + } \ No newline at end of file diff --git a/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapListTest.kt b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapListTest.kt new file mode 100644 index 0000000..dfd17ed --- /dev/null +++ b/transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapListTest.kt @@ -0,0 +1,36 @@ +package com.lukaskusik.coroutines.transformations.test + +import com.lukaskusik.coroutines.transformations.mapParallel +import com.lukaskusik.coroutines.transformations.mapParallelChunked +import kotlinx.coroutines.runBlocking +import org.junit.Assert +import org.junit.Test +import kotlin.random.Random + +class ParallelMapListTest { + + companion object { + fun getRandomListOfSize(listSize: Int): List { + val random = Random(648) + val list = ArrayList(listSize) + repeat(listSize) { + list.add(random.nextInt()) + } + return list + } + } + + @Test + fun parallelMapNoChunks() { + var listSequential = getRandomListOfSize(100) + var listParallel = listSequential.toList() + + listSequential = listSequential.map { it * 2 } + runBlocking { + listParallel = listParallel.mapParallel { it * 2 } + } + + + Assert.assertEquals(listSequential, listParallel) + } +} \ No newline at end of file