Skip to content

Commit

Permalink
feat: added a possibility to recover
Browse files Browse the repository at this point in the history
  • Loading branch information
y9san9 committed Aug 28, 2024
1 parent fe5a301 commit 80d3a08
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
22 changes: 15 additions & 7 deletions core/src/commonMain/kotlin/me/y9san9/aqueue/flow/AQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package me.y9san9.aqueue.flow
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import me.y9san9.aqueue.AQueue
import kotlin.coroutines.CoroutineContext
Expand All @@ -17,19 +15,27 @@ import kotlin.coroutines.EmptyCoroutineContext
* @param key It is guaranteed that requests with the same [key] will be executed consecutively
* @param context The context that is used to launch new coroutines. You may limit parallelism using context
* @param queue The queue used to parallel flow
* @param recover The action to perform in case of exception
* @param transform The action to perform with request
*/
public fun <T, R> Flow<T>.mapInAQueue(
queue: AQueue = AQueue(),
key: suspend (T) -> Any? = { null },
context: CoroutineContext = EmptyCoroutineContext,
transform: suspend (T) -> R,
recover: suspend FlowCollector<R>.(Throwable) -> Unit = { throw it },
transform: suspend (T) -> R
): Flow<R> {
return channelFlow {
collect { element ->
launch(start = CoroutineStart.UNDISPATCHED) {
val result = queue.execute(key(element), context) { transform(element) }
send(result)
runCatching {
queue.execute(key(element), context) { transform(element) }
}.onFailure { throwable ->
val flow = flow { recover(throwable) }
flow.collect(::send)
}.onSuccess { element ->
send(element)
}
}
}
}
Expand All @@ -42,14 +48,16 @@ public fun <T, R> Flow<T>.mapInAQueue(
* @param key It is guaranteed that requests with the same [key] will be executed consecutively
* @param context The context that is used to launch new coroutines. You may limit parallelism using context
* @param queue The queue used to parallel flow
* @param recover The action to perform in case of exception
* @param block The action to perform with request
*/
public fun <T> Flow<T>.launchInAQueue(
scope: CoroutineScope,
queue: AQueue = AQueue(),
key: suspend (T) -> Any? = { null },
context: CoroutineContext = EmptyCoroutineContext,
recover: suspend (Throwable) -> Unit = { throw it },
block: suspend (T) -> Unit
): Job {
return mapInAQueue(queue, key, context, block).launchIn(scope)
return mapInAQueue(queue, key, context, { throwable -> recover(throwable) }, block).launchIn(scope)
}
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ kotlin = "2.0.0"
coroutines = "1.8.0"
maven-publish = "0.29.0"

aqueue = "1.0.8"
aqueue = "1.0.9"

[libraries]

Expand Down

0 comments on commit 80d3a08

Please sign in to comment.