Skip to content

Commit

Permalink
Merge pull request #500 from JetBrains/usov/fixes
Browse files Browse the repository at this point in the history
Some improvements
  • Loading branch information
Iliya-usov authored Oct 25, 2024
2 parents cbf1373 + 53544d2 commit a462590
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ private class SchedulerCoroutineDispatcher(private val scheduler: IScheduler, pr
}

val IScheduler.asCoroutineDispatcher get() = (this as? CoroutineDispatcher) ?: asCoroutineDispatcher(false)

@Deprecated("Use asCoroutineDispatcher that doesn't allow inlining because isDispatchNeeded()=false can lead to deadlocks")
fun IScheduler.asCoroutineDispatcher(allowInlining: Boolean): CoroutineDispatcher = SchedulerCoroutineDispatcher(this, allowInlining)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.jetbrains.rd.util.lifetime.Lifetime
import com.jetbrains.rd.util.reactive.IScheduler
import com.jetbrains.rd.util.reactive.ISource
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Deferred
import kotlin.coroutines.CoroutineContext

Expand Down Expand Up @@ -52,7 +53,14 @@ suspend fun <T : Any> ISource<T?>.nextNotNullValue(): T =
}

fun<T> ISource<T>.adviseSuspend(lifetime: Lifetime, scheduler: IScheduler, handler: suspend (T) -> Unit) {
adviseSuspend(lifetime, scheduler.asCoroutineDispatcher(allowInlining = true), handler)
val context = scheduler.asCoroutineDispatcher
advise(lifetime) {
val start = if (scheduler.isActive) CoroutineStart.UNDISPATCHED else CoroutineStart.DEFAULT
lifetime.launch(context, start) {
handler(it)
}
}

}

fun<T> ISource<T>.adviseSuspend(lifetime: Lifetime, context: CoroutineContext, handler: suspend (T) -> Unit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.jetbrains.rd.framework.impl.ProtocolContexts
import com.jetbrains.rd.util.Queue
import com.jetbrains.rd.util.Sync
import com.jetbrains.rd.util.blockingPutUnique
import com.jetbrains.rd.util.error
import com.jetbrains.rd.util.lifetime.Lifetime
import com.jetbrains.rd.util.lifetime.intersect
import com.jetbrains.rd.util.lifetime.isAlive
Expand Down Expand Up @@ -87,11 +88,14 @@ class MessageBroker(queueMessages: Boolean = false) : IPrintable {
return
}


AllowBindingCookie.allowBind {
val messageContext = protocol.contexts.readContext(buffer)
val helper = RdWireableDispatchHelper(entry.lifetime, id, protocol, messageContext)
entry.subscription.onWireReceived(buffer, helper)
try {
AllowBindingCookie.allowBind {
val messageContext = protocol.contexts.readContext(buffer)
val helper = RdWireableDispatchHelper(entry.lifetime, id, protocol, messageContext)
entry.subscription.onWireReceived(buffer, helper)
}
} catch (e: Throwable) {
log.error("Unexpected exception happened during processing a protocol event", e)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ class RdSignal<T>(val valueSerializer: ISerializer<T> = Polymorphic<T>()) : RdRe
val proto = protocol
val ctx = serializationContext

if (proto == null || ctx == null)
return
if (proto != null && ctx != null) {
val wire = proto.wire

val wire = proto.wire

wire.send(rdid) { buffer ->
logSend.trace { "signal `$location` ($rdid):: value = ${value.printToString()}" }
valueSerializer.write(ctx, buffer, value)
wire.send(rdid) { buffer ->
logSend.trace { "signal `$location` ($rdid):: value = ${value.printToString()}" }
valueSerializer.write(ctx, buffer, value)
}
}

signal.fire(value)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import com.jetbrains.rd.framework.impl.RdEndpoint
import com.jetbrains.rd.util.lifetime.Lifetime
import com.jetbrains.rd.util.reactive.IScheduler
import com.jetbrains.rd.util.threading.SynchronousScheduler
import com.jetbrains.rd.util.threading.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async

fun <TReq, TRes> IRdEndpoint<TReq, TRes>.setSuspend(
cancellationScheduler: IScheduler? = null,
Expand All @@ -15,10 +17,10 @@ fun <TReq, TRes> IRdEndpoint<TReq, TRes>.setSuspend(
// wireScheduler is not be available if RdEndpoint is not bound
val coroutineDispatcher by lazy {
val scheduler = handlerScheduler ?: (this as RdEndpoint).protocol?.scheduler ?: SynchronousScheduler
scheduler.asCoroutineDispatcher(allowInlining = true)
scheduler.asCoroutineDispatcher
}

set(cancellationScheduler, handlerScheduler) { lt, req ->
lt.startAsync(coroutineDispatcher) { handler(lt, req) }.toRdTask()
lt.coroutineScope.async(coroutineDispatcher, CoroutineStart.UNDISPATCHED) { handler(lt, req) }.toRdTask()
}
}

0 comments on commit a462590

Please sign in to comment.