From 3cdf2b863e223bcb4ebccf680617c4a2d1e75f59 Mon Sep 17 00:00:00 2001 From: "Ilya.Usov" Date: Thu, 19 Oct 2023 12:18:02 +0200 Subject: [PATCH 1/4] Optimize intersection --- .../com/jetbrains/rd/util/lifetime/RLifetime.kt | 13 +++++++++++-- .../com/jetbrains/rd/util/reactive/Property.kt | 2 +- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/lifetime/RLifetime.kt b/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/lifetime/RLifetime.kt index da39f658a..f63086570 100644 --- a/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/lifetime/RLifetime.kt +++ b/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/lifetime/RLifetime.kt @@ -49,7 +49,15 @@ sealed class Lifetime { * Creates an intersection of some lifetimes: new lifetime that terminate when either one terminates. * Created lifetime inherits the smallest [terminationTimeoutKind] */ - fun intersect(lifetime1: Lifetime, lifetime2: Lifetime): Lifetime = defineIntersection(lifetime1, lifetime2).lifetime + fun intersect(lifetime1: Lifetime, lifetime2: Lifetime): Lifetime { + if (lifetime1 === lifetime2 || lifetime2.isEternal) + return lifetime1 + + if (lifetime1.isEternal) + return lifetime2 + + return defineIntersection(lifetime1, lifetime2).lifetime + } /** * Creates an intersection of some lifetimes: new lifetime that terminate when either one terminates. @@ -642,7 +650,8 @@ operator fun Lifetime.plusAssign(action : () -> Unit) = onTermination(action) * Creates an intersection of some lifetimes: new lifetime that terminate when either one terminates. * Created lifetime inherits the smallest [terminationTimeoutKind] */ -fun Lifetime.intersect(lifetime: Lifetime): LifetimeDefinition = Lifetime.defineIntersection(this, lifetime) +fun Lifetime.intersect(lifetime: Lifetime): Lifetime = Lifetime.intersect(this, lifetime) +fun Lifetime.defineIntersection(lifetime: Lifetime): LifetimeDefinition = Lifetime.defineIntersection(this, lifetime) inline fun Lifetime.view(viewable: IViewable, crossinline handler: Lifetime.(T) -> Unit) { viewable.view(this) { lt, value -> lt.handler(value) } diff --git a/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/reactive/Property.kt b/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/reactive/Property.kt index 9735541aa..b349f5551 100644 --- a/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/reactive/Property.kt +++ b/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/reactive/Property.kt @@ -114,7 +114,7 @@ class WriteOnceProperty : IOptProperty { if (def.isNotAlive || lifetime.isNotAlive) return val nestedDef = def.intersect(lifetime) - super.advise(nestedDef.lifetime, handler) + super.advise(nestedDef, handler) } override fun fire(value: T) { From 221bb5aaaa2bc23858f745ae2d9e3615adeba2f0 Mon Sep 17 00:00:00 2001 From: "Ilya.Usov" Date: Thu, 19 Oct 2023 12:19:41 +0200 Subject: [PATCH 2/4] fix memory leak: undisposed intersected lifetime get rid of lifetime for IRdWireableDispatchHelper::dispatch to avoid mixing lifetimes --- .../com/jetbrains/rd/framework/MessageBroker.kt | 4 ++-- .../com/jetbrains/rd/framework/base/IRdReactive.kt | 3 +-- .../com/jetbrains/rd/framework/impl/AsyncRdMap.kt | 4 ++-- .../com/jetbrains/rd/framework/impl/AsyncRdSet.kt | 4 ++-- .../kotlin/com/jetbrains/rd/framework/impl/RdTask.kt | 4 ++-- rd-net/RdFramework/Base/IRdBindable.cs | 11 +++-------- rd-net/RdFramework/Impl/AsyncRdMap.cs | 2 +- rd-net/RdFramework/Impl/AsyncRdSet.cs | 2 +- rd-net/RdFramework/Impl/MessageBroker.cs | 4 ++-- rd-net/RdFramework/Tasks/WiredRdTask.cs | 4 ++-- 10 files changed, 18 insertions(+), 24 deletions(-) diff --git a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/MessageBroker.kt b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/MessageBroker.kt index 1816ffcd6..56f9626f3 100644 --- a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/MessageBroker.kt +++ b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/MessageBroker.kt @@ -102,8 +102,8 @@ class MessageBroker(queueMessages: Boolean = false) : IPrintable { private val messageContext: ProtocolContexts.MessageContext ) : IRdWireableDispatchHelper { - override fun dispatch(lifetime: Lifetime, scheduler: IScheduler?, action: () -> Unit) { - doDispatch(lifetime.intersect(this.lifetime), scheduler ?: protocol.scheduler, action) + override fun dispatch(scheduler: IScheduler?, action: () -> Unit) { + doDispatch(lifetime, scheduler ?: protocol.scheduler, action) } private fun doDispatch(lifetime: Lifetime, scheduler: IScheduler, action: () -> Unit) { diff --git a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/base/IRdReactive.kt b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/base/IRdReactive.kt index 6715d3215..07a0bda47 100644 --- a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/base/IRdReactive.kt +++ b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/base/IRdReactive.kt @@ -35,8 +35,7 @@ interface IRdWireableDispatchHelper { val rdId: RdId val lifetime: Lifetime - fun dispatch(lifetime: Lifetime = this.lifetime, scheduler: IScheduler? = null, action: () -> Unit) - fun dispatch(scheduler: IScheduler? = null, action: () -> Unit) = dispatch(lifetime, scheduler, action) + fun dispatch(scheduler: IScheduler? = null, action: () -> Unit) } diff --git a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/AsyncRdMap.kt b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/AsyncRdMap.kt index aea3adf52..ccc6ed4ac 100644 --- a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/AsyncRdMap.kt +++ b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/AsyncRdMap.kt @@ -221,8 +221,8 @@ class AsyncRdMap private constructor( override val lifetime: Lifetime get() = dispatchHelper.lifetime - override fun dispatch(lifetime: Lifetime, scheduler: IScheduler?, action: () -> Unit) { - dispatchHelper.dispatch(lifetime, SynchronousScheduler, action) + override fun dispatch(scheduler: IScheduler?, action: () -> Unit) { + dispatchHelper.dispatch(SynchronousScheduler, action) } }) } diff --git a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/AsyncRdSet.kt b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/AsyncRdSet.kt index 8a37966b5..61825d86a 100644 --- a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/AsyncRdSet.kt +++ b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/AsyncRdSet.kt @@ -190,8 +190,8 @@ class AsyncRdSet private constructor( override val lifetime: Lifetime get() = dispatchHelper.lifetime - override fun dispatch(lifetime: Lifetime, scheduler: IScheduler?, action: () -> Unit) { - dispatchHelper.dispatch(lifetime, SynchronousScheduler, action) + override fun dispatch(scheduler: IScheduler?, action: () -> Unit) { + dispatchHelper.dispatch(SynchronousScheduler, action) } }) } diff --git a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdTask.kt b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdTask.kt index 30ece6630..12f834d88 100644 --- a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdTask.kt +++ b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdTask.kt @@ -130,7 +130,7 @@ class CallSiteWiredRdTask( } else if (resultFromWire is RdTaskResult.Cancelled) sendCancellation() - dispatchHelper.dispatch(outerLifetime, wireScheduler) { + dispatchHelper.dispatch(wireScheduler) { if (!result.setIfEmpty(resultFromWire)) RdReactiveBase.logReceived.trace { "call `${call.location}` (${call.rdid}) response was dropped, task result is: ${result.valueOrNull}" } } @@ -192,7 +192,7 @@ class EndpointWiredRdTask( RdReactiveBase.logReceived.trace { "received cancellation" } buffer.readVoid() //nothing just a void value - dispatchHelper.dispatch(lifetime, wireScheduler) { + dispatchHelper.dispatch(wireScheduler) { val success = result.setIfEmpty(RdTaskResult.Cancelled()) val wireScheduler = call.protocol?.scheduler if (success || wireScheduler == null) diff --git a/rd-net/RdFramework/Base/IRdBindable.cs b/rd-net/RdFramework/Base/IRdBindable.cs index 110e5ddbe..f39f7f3ea 100644 --- a/rd-net/RdFramework/Base/IRdBindable.cs +++ b/rd-net/RdFramework/Base/IRdBindable.cs @@ -46,24 +46,19 @@ public interface IRdWireableDispatchHelper RdId RdId { get; } Lifetime Lifetime { get; } - public void Dispatch(Lifetime lifetime, IScheduler? scheduler, Action action); + public void Dispatch(IScheduler? scheduler, Action action); } public static class RdWireableDispatchHelperEx { - public static void Dispatch(this IRdWireableDispatchHelper helper, Lifetime lifetime, Action action) - { - helper.Dispatch(lifetime, null, action); - } - public static void Dispatch(this IRdWireableDispatchHelper helper, IScheduler? scheduler, Action action) { - helper.Dispatch(helper.Lifetime, scheduler, action); + helper.Dispatch(scheduler, action); } public static void Dispatch(this IRdWireableDispatchHelper helper, Action action) { - helper.Dispatch(helper.Lifetime, null, action); + helper.Dispatch(null, action); } } diff --git a/rd-net/RdFramework/Impl/AsyncRdMap.cs b/rd-net/RdFramework/Impl/AsyncRdMap.cs index aeded2755..f52143345 100644 --- a/rd-net/RdFramework/Impl/AsyncRdMap.cs +++ b/rd-net/RdFramework/Impl/AsyncRdMap.cs @@ -224,7 +224,7 @@ public DelegatingDispatchHelper(IRdWireableDispatchHelper dispatchHelper) myDispatchHelper = dispatchHelper; } - public void Dispatch(Lifetime lifetime, IScheduler? scheduler, Action action) + public void Dispatch(IScheduler? scheduler, Action action) { myDispatchHelper.Dispatch(SynchronousScheduler.Instance, action); } diff --git a/rd-net/RdFramework/Impl/AsyncRdSet.cs b/rd-net/RdFramework/Impl/AsyncRdSet.cs index 072ac2804..3b6970db5 100644 --- a/rd-net/RdFramework/Impl/AsyncRdSet.cs +++ b/rd-net/RdFramework/Impl/AsyncRdSet.cs @@ -252,7 +252,7 @@ public DelegatingDispatchHelper(IRdWireableDispatchHelper dispatchHelper) myDispatchHelper = dispatchHelper; } - public void Dispatch(Lifetime lifetime, IScheduler? scheduler, Action action) + public void Dispatch(IScheduler? scheduler, Action action) { myDispatchHelper.Dispatch(SynchronousScheduler.Instance, action); } diff --git a/rd-net/RdFramework/Impl/MessageBroker.cs b/rd-net/RdFramework/Impl/MessageBroker.cs index cae0f76ba..0b9ba365f 100644 --- a/rd-net/RdFramework/Impl/MessageBroker.cs +++ b/rd-net/RdFramework/Impl/MessageBroker.cs @@ -121,9 +121,9 @@ internal RdWireableDispatchHelper(Lifetime lifetime, ILog log, IRdWireable wirea myMessageContext = messageContext; } - public void Dispatch(Lifetime lifetime, IScheduler? scheduler, Action action) + public void Dispatch(IScheduler? scheduler, Action action) { - DoDispatch(lifetime.Intersect(Lifetime), scheduler ?? myProtocol.Scheduler, action); + DoDispatch(Lifetime, scheduler ?? myProtocol.Scheduler, action); } private void DoDispatch(Lifetime lifetime, IScheduler scheduler, Action action) diff --git a/rd-net/RdFramework/Tasks/WiredRdTask.cs b/rd-net/RdFramework/Tasks/WiredRdTask.cs index acd9ab51b..6f215bbb3 100644 --- a/rd-net/RdFramework/Tasks/WiredRdTask.cs +++ b/rd-net/RdFramework/Tasks/WiredRdTask.cs @@ -124,7 +124,7 @@ public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, Unsaf SendCancellation(); - dispatchHelper.Dispatch(myOuterLifetime, WireScheduler, () => + dispatchHelper.Dispatch(WireScheduler, () => { if (!ResultInternal.SetIfEmpty(taskResult)) Trace(RdReactiveBase.ourLogReceived, "response from wire was rejected because task already has result"); @@ -203,7 +203,7 @@ public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, Unsaf Trace(RdReactiveBase.ourLogReceived, "received cancellation"); reader.ReadVoid(); //nothing just a void value - dispatchHelper.Dispatch(Lifetime, WireScheduler, () => + dispatchHelper.Dispatch(WireScheduler, () => { var success = ResultInternal.SetIfEmpty(RdTaskResult.Cancelled()); var protocolScheduler = myCall.TryGetProto()?.Scheduler; From 1c4c9c7a96e3e841a346ca4bc352086534e66930 Mon Sep 17 00:00:00 2001 From: "Ilya.Usov" Date: Thu, 19 Oct 2023 13:08:38 +0200 Subject: [PATCH 3/4] Terminate intersected lifetime if rd task result is not bindable --- .../com/jetbrains/rd/framework/impl/RdTask.kt | 12 ++++++++---- rd-net/RdFramework/Tasks/RdCall.cs | 13 +++++++++---- .../Reflection/data/Generated/RefExt.cs | 2 +- .../Reflection/data/Generated/RefRoot.cs | 2 +- 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdTask.kt b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdTask.kt index 12f834d88..0ae441d99 100644 --- a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdTask.kt +++ b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdTask.kt @@ -365,10 +365,14 @@ class RdCall(internal val requestSzr: ISerializer = Polymorphi val taskId = proto.identity.next(RdId.Null) val bindLifetime = bindLifetime - val taskLifetime = lifetime.intersect(bindLifetime) - - val task = CallSiteWiredRdTask(taskLifetime, this, taskId, scheduler ?: proto.scheduler) - taskLifetime.executeIfAlive { + val intersectedDef = lifetime.defineIntersection(bindLifetime) + val task = CallSiteWiredRdTask(intersectedDef.lifetime, this, taskId, scheduler ?: proto.scheduler) + task.result.advise(intersectedDef.lifetime) { + if (it !is RdTaskResult.Success || !it.value.isBindable()) { + intersectedDef.terminate() + } + } + intersectedDef.lifetime.executeIfAlive { proto.wire.send(rdid) { buffer -> logSend.trace { "call `$location`::($rdid) send${sync.condstr {" SYNC"}} request '$taskId' : ${request.printToString()} " } taskId.write(buffer) diff --git a/rd-net/RdFramework/Tasks/RdCall.cs b/rd-net/RdFramework/Tasks/RdCall.cs index a1e2ac3ba..1d5316fb9 100644 --- a/rd-net/RdFramework/Tasks/RdCall.cs +++ b/rd-net/RdFramework/Tasks/RdCall.cs @@ -200,10 +200,15 @@ private IRdTask StartInternal(Lifetime requestLifetime, TReq request, ISch var taskId = proto.Identities.Next(RdId.Nil); - var taskLifetime = Lifetime.Intersect(requestLifetime, myBindLifetime); - var task = new WiredRdTask.CallSite(taskLifetime, this, taskId, scheduler ?? proto.Scheduler); - - using var cookie = taskLifetime.UsingExecuteIfAlive(); + var intersectedDef = Lifetime.DefineIntersection(requestLifetime, myBindLifetime); + var task = new WiredRdTask.CallSite(intersectedDef.Lifetime, this, taskId, scheduler ?? proto.Scheduler); + task.Result.Advise(intersectedDef.Lifetime, result => + { + if (result.Status != RdTaskStatus.Success || !result.Result.IsBindable()) + intersectedDef.Terminate(); + }); + + using var cookie = intersectedDef.UsingExecuteIfAlive(); if (cookie.Succeed) { proto.Wire.Send(RdId, (writer) => diff --git a/rd-net/Test.RdFramework/Reflection/data/Generated/RefExt.cs b/rd-net/Test.RdFramework/Reflection/data/Generated/RefExt.cs index 99b4ba995..1a4f9282b 100644 --- a/rd-net/Test.RdFramework/Reflection/data/Generated/RefExt.cs +++ b/rd-net/Test.RdFramework/Reflection/data/Generated/RefExt.cs @@ -1,6 +1,6 @@ //------------------------------------------------------------------------------ // -// This code was generated by a RdGen v1.11. +// This code was generated by a RdGen v1.12. // // Changes to this file may cause incorrect behavior and will be lost if // the code is regenerated. diff --git a/rd-net/Test.RdFramework/Reflection/data/Generated/RefRoot.cs b/rd-net/Test.RdFramework/Reflection/data/Generated/RefRoot.cs index d80529c63..d7bb58ad8 100644 --- a/rd-net/Test.RdFramework/Reflection/data/Generated/RefRoot.cs +++ b/rd-net/Test.RdFramework/Reflection/data/Generated/RefRoot.cs @@ -1,6 +1,6 @@ //------------------------------------------------------------------------------ // -// This code was generated by a RdGen v1.11. +// This code was generated by a RdGen v1.12. // // Changes to this file may cause incorrect behavior and will be lost if // the code is regenerated. From d2295138032dabb847025841ab990e051e29a595 Mon Sep 17 00:00:00 2001 From: "Ilya.Usov" Date: Thu, 19 Oct 2023 13:49:11 +0200 Subject: [PATCH 4/4] Allow termination under execution because of reentrancy in tests. Callback can be executed synchronously inside `send` --- .../src/main/kotlin/com/jetbrains/rd/framework/impl/RdTask.kt | 2 +- rd-net/RdFramework/Tasks/RdCall.cs | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdTask.kt b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdTask.kt index 0ae441d99..6ea884525 100644 --- a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdTask.kt +++ b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdTask.kt @@ -369,7 +369,7 @@ class RdCall(internal val requestSzr: ISerializer = Polymorphi val task = CallSiteWiredRdTask(intersectedDef.lifetime, this, taskId, scheduler ?: proto.scheduler) task.result.advise(intersectedDef.lifetime) { if (it !is RdTaskResult.Success || !it.value.isBindable()) { - intersectedDef.terminate() + intersectedDef.terminate(true) } } intersectedDef.lifetime.executeIfAlive { diff --git a/rd-net/RdFramework/Tasks/RdCall.cs b/rd-net/RdFramework/Tasks/RdCall.cs index 1d5316fb9..c048e305a 100644 --- a/rd-net/RdFramework/Tasks/RdCall.cs +++ b/rd-net/RdFramework/Tasks/RdCall.cs @@ -205,7 +205,10 @@ private IRdTask StartInternal(Lifetime requestLifetime, TReq request, ISch task.Result.Advise(intersectedDef.Lifetime, result => { if (result.Status != RdTaskStatus.Success || !result.Result.IsBindable()) + { + intersectedDef.AllowTerminationUnderExecution = true; intersectedDef.Terminate(); + } }); using var cookie = intersectedDef.UsingExecuteIfAlive();