Skip to content

Commit

Permalink
feat: added extensions to coroutine builders
Browse files Browse the repository at this point in the history
  • Loading branch information
y9san9 committed Aug 18, 2024
1 parent 42e3909 commit ef3ba42
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 21 deletions.
14 changes: 6 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,18 @@ See [this example](example/src/main/kotlin/Main.kt) to play around with AQueue.
/**
* Asynchronous Queue with fine-grained control over concurrency
*/
interface AQueue<TRequest, TResponse> {
interface AQueue {

/**
* Executes [request] with fine-grained control over concurrency
* Executes [block] with fine-grained control over concurrency
*
* @param request The request to execute
* @param key It is guaranteed that requests with the same [key] will be executed consecutively
* @param context The context that is used to launch new coroutines. You may limit parallelism using context
* @param action The action to perform with [request]
* @param block The action to perform
*/
suspend fun execute(
request: TRequest,
suspend fun <T> execute(
key: Any? = null,
context: CoroutineContext = EmptyCoroutineContext,
action: suspend (TRequest) -> TResponse
): TResponse
block: suspend () -> T
): T
```
6 changes: 3 additions & 3 deletions core/src/commonMain/kotlin/me/y9san9/aqueue/AQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ import kotlin.coroutines.EmptyCoroutineContext
public interface AQueue {

/**
* Executes [action] with fine-grained control over concurrency
* Executes [block] with fine-grained control over concurrency
*
* @param key It is guaranteed that requests with the same [key] will be executed consecutively
* @param context The context that is used to launch new coroutines. You may limit parallelism using context
* @param action The action to perform
* @param block The action to perform
*/
public suspend fun <T> execute(
key: Any? = null,
context: CoroutineContext = EmptyCoroutineContext,
action: suspend () -> T
block: suspend () -> T
): T

public companion object
Expand Down
26 changes: 26 additions & 0 deletions core/src/commonMain/kotlin/me/y9san9/aqueue/AQueueAsync.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package me.y9san9.aqueue

import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* Launches [block] with fine-grained control over concurrency.
*
* @param scope The scope used to launch a coroutine
* @param start The start mode used to launch a coroutine
* @param key It is guaranteed that requests with the same [key] will be executed consecutively
* @param context The context that is used to launch new coroutines. You may limit parallelism using context
* @param block The action to perform
*/
public fun <T> AQueue.async(
scope: CoroutineScope,
start: CoroutineStart = CoroutineStart.DEFAULT,
key: Any? = null,
context: CoroutineContext = EmptyCoroutineContext,
block: suspend () -> T
): Deferred<T> {
return scope.async(start = start) {
execute(key, context, block)
}
}
28 changes: 28 additions & 0 deletions core/src/commonMain/kotlin/me/y9san9/aqueue/AQueueLaunch.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package me.y9san9.aqueue

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.launch
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* Launches [block] with fine-grained control over concurrency.
*
* @param scope The scope used to launch a coroutine
* @param start The start mode used to launch a coroutine
* @param key It is guaranteed that requests with the same [key] will be executed consecutively
* @param context The context that is used to launch new coroutines. You may limit parallelism using context
* @param block The action to perform
*/
public fun AQueue.launch(
scope: CoroutineScope,
start: CoroutineStart = CoroutineStart.DEFAULT,
key: Any? = null,
context: CoroutineContext = EmptyCoroutineContext,
block: suspend () -> Unit
) {
scope.launch(start = start) {
execute(key, context, block)
}
}
8 changes: 4 additions & 4 deletions core/src/commonMain/kotlin/me/y9san9/aqueue/LinkedAQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ public class LinkedAQueue : AQueue {
private val pendingMap = PendingMap()

/**
* Executes [action] with fine-grained control over concurrency
* Executes [block] with fine-grained control over concurrency
*
* @param key It is guaranteed that requests with the same [key] will be executed consecutively
* @param context The context that is used to launch new coroutines. You may limit parallelism using context
* @param action The action to perform
* @param block The action to perform
*/
override suspend fun <T> execute(
key: Any?,
context: CoroutineContext,
action: suspend () -> T
block: suspend () -> T
): T = coroutineScope {
val scope = this

Expand All @@ -41,7 +41,7 @@ public class LinkedAQueue : AQueue {
pendingMap.putPending(key) { pendingJob ->
launch(context) {
pendingJob?.join()
val result = runCatching { action() }
val result = runCatching { block() }
pendingMap.finishPendingJob(key, coroutineContext.job)
continuation.resumeWith(result)
}
Expand Down
9 changes: 4 additions & 5 deletions core/src/jvmMain/kotlin/me/y9san9/aqueue/AQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.newFixedThreadPoolContext
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* Asynchronous Queue that uses [newFixedThreadPoolContext] to create a queue
Expand All @@ -22,11 +21,11 @@ public fun AQueue.Companion.fixedThreadPool(
val fixedContext = newFixedThreadPoolContext(numberOfThreads, name)

return object : AQueue {
override suspend fun <T> execute(key: Any?, context: CoroutineContext, action: suspend () -> T): T {
override suspend fun <T> execute(key: Any?, context: CoroutineContext, block: suspend () -> T): T {
return queue.execute(
key = key,
context = context + fixedContext,
action = action
block = block
)
}
}
Expand All @@ -39,11 +38,11 @@ public fun AQueue.Companion.fixedThreadPool(
*/
public fun AQueue.Companion.io(queue: AQueue = AQueue()): AQueue {
return object : AQueue {
override suspend fun <T> execute(key: Any?, context: CoroutineContext, action: suspend () -> T): T {
override suspend fun <T> execute(key: Any?, context: CoroutineContext, block: suspend () -> T): T {
return queue.execute(
key = key,
context = context + Dispatchers.IO,
action = action
block = block
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ kotlin = "2.0.0"
coroutines = "1.8.0"
maven-publish = "0.29.0"

aqueue = "1.0.6"
aqueue = "1.0.7"

[libraries]

Expand Down

0 comments on commit ef3ba42

Please sign in to comment.