From 80d3a0832aad4f60101083a8a90c5f91d4b902aa Mon Sep 17 00:00:00 2001 From: y9san9 Date: Wed, 28 Aug 2024 15:53:01 +0300 Subject: [PATCH] feat: added a possibility to recover --- .../kotlin/me/y9san9/aqueue/flow/AQueue.kt | 22 +++++++++++++------ gradle/libs.versions.toml | 2 +- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/core/src/commonMain/kotlin/me/y9san9/aqueue/flow/AQueue.kt b/core/src/commonMain/kotlin/me/y9san9/aqueue/flow/AQueue.kt index b02bce4..3d2ac8d 100644 --- a/core/src/commonMain/kotlin/me/y9san9/aqueue/flow/AQueue.kt +++ b/core/src/commonMain/kotlin/me/y9san9/aqueue/flow/AQueue.kt @@ -3,9 +3,7 @@ package me.y9san9.aqueue.flow import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.channelFlow -import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.* import kotlinx.coroutines.launch import me.y9san9.aqueue.AQueue import kotlin.coroutines.CoroutineContext @@ -17,19 +15,27 @@ import kotlin.coroutines.EmptyCoroutineContext * @param key It is guaranteed that requests with the same [key] will be executed consecutively * @param context The context that is used to launch new coroutines. You may limit parallelism using context * @param queue The queue used to parallel flow + * @param recover The action to perform in case of exception * @param transform The action to perform with request */ public fun Flow.mapInAQueue( queue: AQueue = AQueue(), key: suspend (T) -> Any? = { null }, context: CoroutineContext = EmptyCoroutineContext, - transform: suspend (T) -> R, + recover: suspend FlowCollector.(Throwable) -> Unit = { throw it }, + transform: suspend (T) -> R ): Flow { return channelFlow { collect { element -> launch(start = CoroutineStart.UNDISPATCHED) { - val result = queue.execute(key(element), context) { transform(element) } - send(result) + runCatching { + queue.execute(key(element), context) { transform(element) } + }.onFailure { throwable -> + val flow = flow { recover(throwable) } + flow.collect(::send) + }.onSuccess { element -> + send(element) + } } } } @@ -42,6 +48,7 @@ public fun Flow.mapInAQueue( * @param key It is guaranteed that requests with the same [key] will be executed consecutively * @param context The context that is used to launch new coroutines. You may limit parallelism using context * @param queue The queue used to parallel flow + * @param recover The action to perform in case of exception * @param block The action to perform with request */ public fun Flow.launchInAQueue( @@ -49,7 +56,8 @@ public fun Flow.launchInAQueue( queue: AQueue = AQueue(), key: suspend (T) -> Any? = { null }, context: CoroutineContext = EmptyCoroutineContext, + recover: suspend (Throwable) -> Unit = { throw it }, block: suspend (T) -> Unit ): Job { - return mapInAQueue(queue, key, context, block).launchIn(scope) + return mapInAQueue(queue, key, context, { throwable -> recover(throwable) }, block).launchIn(scope) } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 58d5895..538f6c4 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -4,7 +4,7 @@ kotlin = "2.0.0" coroutines = "1.8.0" maven-publish = "0.29.0" -aqueue = "1.0.8" +aqueue = "1.0.9" [libraries]