Skip to content

Commit

Permalink
Refactor RdCoroutineScope
Browse files Browse the repository at this point in the history
  • Loading branch information
Iliya-usov committed Aug 16, 2023
1 parent 3a14394 commit d753dab
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -1,58 +1,44 @@
package com.jetbrains.rd.util.threading.coroutines

import com.jetbrains.rd.util.AtomicReference
import com.jetbrains.rd.util.error
import com.jetbrains.rd.util.getLogger
import com.jetbrains.rd.util.info
import com.jetbrains.rd.util.*
import com.jetbrains.rd.util.lifetime.Lifetime
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import java.lang.IllegalStateException
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

open class RdCoroutineScope(lifetime: Lifetime) : CoroutineScope {
open class RdCoroutineScope : CoroutineScope {
companion object {
private val logger = getLogger<RdCoroutineScope>()

private val default = RdCoroutineScope(Lifetime.Eternal)
private val default = RdCoroutineScope()
private var currentHost: AtomicReference<RdCoroutineScope?> = AtomicReference(null)

val current: RdCoroutineScope get() = currentHost.get() ?: default

/**
* Should be called on start of the application to override the default behavior of the Rd-based coroutines (default dispatcher, exception handler, shutdown behavior).
*/
fun override(lifetime: Lifetime, host: RdCoroutineScope) {
lifetime.bracket({
if (!currentHost.compareAndSet(null, host)) {
throw IllegalStateException("Could not override RdCoroutineHost")
}
fun override(host: RdCoroutineScope) {
if (!currentHost.compareAndSet(null, host))
throw IllegalStateException("Could not override RdCoroutineHost")

logger.info { "RdCoroutineHost overridden" }
}, {
if (!currentHost.compareAndSet(host, null)) {
throw IllegalStateException("currentHost must not be null")
}

logger.info { "RdCoroutineHost has been reset" }
})
logger.debug { "RdCoroutineHost has been overridden" }
host.coroutineContext.job.invokeOnCompletion {
currentHost.getAndSet(null)
logger.debug { "RdCoroutineHost has been reset" }
}
}
}

final override val coroutineContext by lazy {
defaultDispatcher + CoroutineExceptionHandler { _, throwable ->
defaultContext + CoroutineExceptionHandler { _, throwable ->
onException(throwable)
}
}

protected open val defaultDispatcher: CoroutineContext get() = Dispatchers.Default

init {
lifetime.onTermination {
shutdown()
logger.info { "RdCoroutineHost disposed" }
}
}
protected open val defaultContext: CoroutineContext get() = Dispatchers.Default

open fun onException(throwable: Throwable) {
if (throwable !is CancellationException) {
Expand All @@ -79,18 +65,6 @@ open class RdCoroutineScope(lifetime: Lifetime) : CoroutineScope {
val nestedDef = lifetime.createNested()
return launch(context, start, action).also { job -> nestedDef.synchronizeWith(job) }
}

protected open fun shutdown() {
try {
runBlocking {
coroutineContext[Job]!!.cancelAndJoin()
}
} catch (e: CancellationException) {
// nothing
} catch (e: Throwable) {
logger.error(e)
}
}
}

fun Job.noAwait() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.jetbrains.rd.util.log.ErrorAccumulatorLoggerFactory
import com.jetbrains.rd.util.threading.CompoundThrowable
import com.jetbrains.rd.util.threading.TestSingleThreadScheduler
import com.jetbrains.rd.util.threading.coroutines.RdCoroutineScope
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext
import kotlin.test.AfterTest
import kotlin.test.BeforeTest
Expand All @@ -34,12 +35,17 @@ open class CoroutineTestBase {

logger = getLogger<CoroutineTest>()

host = TestSingleThreadCoroutineHost(lifetime, scheduler)
host = TestSingleThreadCoroutineHost(scheduler)

lifetime.onTermination { scheduler.assertNoExceptions() }
lifetime.onTermination { host.assertNoExceptions() }

RdCoroutineScope.override(lifetime, host)
RdCoroutineScope.override(host)
def.onTermination {
runBlocking {
host.coroutineContext.job.cancelAndJoin()
}
}
}

@AfterTest
Expand All @@ -49,8 +55,8 @@ open class CoroutineTestBase {
}
}

class TestSingleThreadCoroutineHost(lifetime: Lifetime, scheduler: TestSingleThreadScheduler) : RdCoroutineScope(lifetime) {
override val defaultDispatcher: CoroutineContext = scheduler.asCoroutineDispatcher
class TestSingleThreadCoroutineHost(scheduler: TestSingleThreadScheduler) : RdCoroutineScope() {
override val defaultContext: CoroutineContext = scheduler.asCoroutineDispatcher + SupervisorJob()

private val exceptions = SynchronizedList<Throwable>()

Expand Down

0 comments on commit d753dab

Please sign in to comment.