Skip to content

Commit

Permalink
Parallel operations on Arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
cvb941 committed Apr 28, 2019
1 parent 9432873 commit 1f14edb
Show file tree
Hide file tree
Showing 19 changed files with 1,080 additions and 54 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ suspend fun <T, R> Iterable<T>.mapParallel(transform: (T) -> R): List<R> = corou
Example of using the parallel *map* operation.
```kotlin
fun showCase() {
var list = listOf(1,2,3)
val list = listOf(1,2,3)
runBlocking(Dispatchers.Default) {
var mappedList = list.mapParallel { it * 2 } // Results in [2,4,6]
}
Expand Down Expand Up @@ -47,7 +47,7 @@ allprojects {
After that, include this line in your module build.gradle.
```gradle
dependencies {
implementation 'com.github.cvb941:kotlin-parallel-collection-operations:1.1'
implementation 'com.github.cvb941:kotlin-parallel-operations:1.2'
}
```

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.lukaskusik.coroutines.transformations

fun <T> Array<T>.mapInPlace(transform: (T) -> T): Array<T> {
for (i in this.indices) this[i] = transform(this[i])
return this
}

fun ByteArray.mapInPlace(transform: (Byte) -> Byte): ByteArray {
for (i in this.indices) this[i] = transform(this[i])
return this
}

fun ShortArray.mapInPlace(transform: (Short) -> Short): ShortArray {
for (i in this.indices) this[i] = transform(this[i])
return this
}

fun IntArray.mapInPlace(transform: (Int) -> Int): IntArray {
for (i in this.indices) this[i] = transform(this[i])
return this
}

fun LongArray.mapInPlace(transform: (Long) -> Long): LongArray {
for (i in this.indices) this[i] = transform(this[i])
return this
}

fun FloatArray.mapInPlace(transform: (Float) -> Float): FloatArray {
for (i in this.indices) this[i] = transform(this[i])
return this
}

fun DoubleArray.mapInPlace(transform: (Double) -> Double): DoubleArray {
for (i in this.indices) this[i] = transform(this[i])
return this
}

fun BooleanArray.mapInPlace(transform: (Boolean) -> Boolean): BooleanArray {
for (i in this.indices) this[i] = transform(this[i])
return this
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,76 @@ suspend fun <T, R> Iterable<T>.mapParallel(
map { async { transform(it) } }.map { it.await() }
}

//region Arrays
/**
* Performs map transformation on the iterable using coroutines.
* The chunkSize parameter is used to run multiple transformations on a single coroutine.
*
* @param chunkSize Size of each sub-collection that will be reduced in each coroutine.
* Performs map transformation on the array using coroutines.
*/
suspend fun <T, R> Iterable<T>.mapParallelChunked(
chunkSize: Int,
suspend fun <T, R> Array<out T>.mapParallel(
transform: (T) -> R
): List<R> = coroutineScope {
chunked(chunkSize).map { subChunk ->
async {
subChunk.map(transform)
}
}.flatMap {
it.await()
}
map { async { transform(it) } }.map { it.await() }
}

/**
* Performs map transformation on the iterable using coroutines.
*
* It can split the collection into multiple chunks using the chunksCount parameter.
* Each chunk will then run on a single coroutine, minimizing thread management, etc.
* The default and recommended chunksCount for multithreading is the number of CPU threads, e.g. 4 or 8.
*
* @param chunksCount How many chunks should the collection be split into. Defaults to the number of available processors.
*
*/
suspend fun <T, E> Collection<T>.mapParallelChunked(
chunksCount: Int = Runtime.getRuntime().availableProcessors(),
transform: (T) -> E
): List<E> {
val chunkSize = Math.ceil(size / chunksCount.toDouble()).toInt()
return asIterable().mapParallelChunked(chunkSize, transform)
}
* Performs map transformation on the array using coroutines.
*/
suspend fun <R> ByteArray.mapParallel(
transform: (Byte) -> R
): List<R> = coroutineScope {
map { async { transform(it) } }.map { it.await() }
}

/**
* Performs map transformation on the array using coroutines.
*/
suspend fun <R> ShortArray.mapParallel(
transform: (Short) -> R
): List<R> = coroutineScope {
map { async { transform(it) } }.map { it.await() }
}

/**
* Performs map transformation on the array using coroutines.
*/
suspend fun <R> IntArray.mapParallel(
transform: (Int) -> R
): List<R> = coroutineScope {
map { async { transform(it) } }.map { it.await() }
}

/**
* Performs map transformation on the array using coroutines.
*/
suspend fun <R> LongArray.mapParallel(
transform: (Long) -> R
): List<R> = coroutineScope {
map { async { transform(it) } }.map { it.await() }
}

/**
* Performs map transformation on the array using coroutines.
*/
suspend fun <R> FloatArray.mapParallel(
transform: (Float) -> R
): List<R> = coroutineScope {
map { async { transform(it) } }.map { it.await() }
}

/**
* Performs map transformation on the array using coroutines.
*/
suspend fun <R> DoubleArray.mapParallel(
transform: (Double) -> R
): List<R> = coroutineScope {
map { async { transform(it) } }.map { it.await() }
}

/**
* Performs map transformation on the array using coroutines.
*/
suspend fun <R> BooleanArray.mapParallel(
transform: (Boolean) -> R
): List<R> = coroutineScope {
map { async { transform(it) } }.map { it.await() }
}
//endregion
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.lukaskusik.coroutines.transformations

import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope

/**
* Performs map transformation on the iterable using coroutines.
* The chunkSize parameter is used to run multiple transformations on a single coroutine.
*
* @param chunkSize Size of each sub-collection that will be reduced in each coroutine.
*/
suspend fun <T, R> Iterable<T>.mapParallelChunked(
chunkSize: Int,
transform: (T) -> R
): List<R> = coroutineScope {
chunked(chunkSize).map { subChunk ->
async {
subChunk.map(transform)
}
}.flatMap {
it.await()
}
}

/**
* Performs map transformation on the collection using coroutines.
*
* It can split the collection into multiple chunks using the chunksCount parameter.
* Each chunk will then run on a single coroutine, minimizing thread management, etc.
* The default and recommended chunksCount for multithreading is the number of CPU threads, e.g. 4 or 8.
*
* @param chunksCount How many chunks should the collection be split into. Defaults to the number of available processors.
*
*/
suspend fun <T, E> Collection<T>.mapParallelChunked(
chunksCount: Int = Runtime.getRuntime().availableProcessors(),
transform: (T) -> E
): List<E> {
val chunkSize = Math.ceil(size / chunksCount.toDouble()).toInt()
return asIterable().mapParallelChunked(chunkSize, transform)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package com.lukaskusik.coroutines.transformations

import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch


/**
* Performs in place map transformation on the array using coroutines.
*/
suspend fun <T> Array<T>.mapInPlaceParallel(
transform: (T) -> T
): Array<T> = coroutineScope {
for (i in indices) {
launch { this@mapInPlaceParallel[i] = transform(this@mapInPlaceParallel[i]) }
}
this@mapInPlaceParallel
}


/**
* Performs in place map transformation on the array using coroutines.
*/
suspend fun ByteArray.mapInPlaceParallel(
transform: (Byte) -> Byte
): ByteArray = coroutineScope {
for (i in indices) {
launch { this@mapInPlaceParallel[i] = transform(this@mapInPlaceParallel[i]) }
}
this@mapInPlaceParallel
}


/**
* Performs in place map transformation on the array using coroutines.
*/
suspend fun ShortArray.mapInPlaceParallel(
transform: (Short) -> Short
): ShortArray = coroutineScope {
for (i in indices) {
launch { this@mapInPlaceParallel[i] = transform(this@mapInPlaceParallel[i]) }
}
this@mapInPlaceParallel
}

/**
* Performs in place map transformation on the array using coroutines.
*/
suspend fun IntArray.mapInPlaceParallel(
transform: (Int) -> Int
): IntArray = coroutineScope {
for (i in indices) {
launch { this@mapInPlaceParallel[i] = transform(this@mapInPlaceParallel[i]) }
}
this@mapInPlaceParallel
}

/**
* Performs in place map transformation on the array using coroutines.
*/
suspend fun LongArray.mapInPlaceParallel(
transform: (Long) -> Long
): LongArray = coroutineScope {
for (i in indices) {
launch { this@mapInPlaceParallel[i] = transform(this@mapInPlaceParallel[i]) }
}
this@mapInPlaceParallel
}

/**
* Performs in place map transformation on the array using coroutines.
*/
suspend fun FloatArray.mapInPlaceParallel(
transform: (Float) -> Float
): FloatArray = coroutineScope {
for (i in indices) {
launch { this@mapInPlaceParallel[i] = transform(this@mapInPlaceParallel[i]) }
}
this@mapInPlaceParallel
}

/**
* Performs in place map transformation on the array using coroutines.
*/
suspend fun DoubleArray.mapInPlaceParallel(
transform: (Double) -> Double
): DoubleArray = coroutineScope {
for (i in indices) {
launch { this@mapInPlaceParallel[i] = transform(this@mapInPlaceParallel[i]) }
}
this@mapInPlaceParallel
}

/**
* Performs in place map transformation on the array using coroutines.
*/
suspend fun BooleanArray.mapInPlaceParallel(
transform: (Boolean) -> Boolean
): BooleanArray = coroutineScope {
for (i in indices) {
launch { this@mapInPlaceParallel[i] = transform(this@mapInPlaceParallel[i]) }
}
this@mapInPlaceParallel
}
Loading

0 comments on commit 1f14edb

Please sign in to comment.