Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak #442

Merged
merged 4 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,15 @@ sealed class Lifetime {
* Creates an intersection of some lifetimes: new lifetime that terminate when either one terminates.
* Created lifetime inherits the smallest [terminationTimeoutKind]
*/
fun intersect(lifetime1: Lifetime, lifetime2: Lifetime): Lifetime = defineIntersection(lifetime1, lifetime2).lifetime
fun intersect(lifetime1: Lifetime, lifetime2: Lifetime): Lifetime {
if (lifetime1 === lifetime2 || lifetime2.isEternal)
return lifetime1

if (lifetime1.isEternal)
return lifetime2

return defineIntersection(lifetime1, lifetime2).lifetime
}

/**
* Creates an intersection of some lifetimes: new lifetime that terminate when either one terminates.
Expand Down Expand Up @@ -642,7 +650,8 @@ operator fun Lifetime.plusAssign(action : () -> Unit) = onTermination(action)
* Creates an intersection of some lifetimes: new lifetime that terminate when either one terminates.
* Created lifetime inherits the smallest [terminationTimeoutKind]
*/
fun Lifetime.intersect(lifetime: Lifetime): LifetimeDefinition = Lifetime.defineIntersection(this, lifetime)
fun Lifetime.intersect(lifetime: Lifetime): Lifetime = Lifetime.intersect(this, lifetime)
fun Lifetime.defineIntersection(lifetime: Lifetime): LifetimeDefinition = Lifetime.defineIntersection(this, lifetime)

inline fun <T> Lifetime.view(viewable: IViewable<T>, crossinline handler: Lifetime.(T) -> Unit) {
viewable.view(this) { lt, value -> lt.handler(value) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class WriteOnceProperty<T : Any> : IOptProperty<T> {
if (def.isNotAlive || lifetime.isNotAlive) return

val nestedDef = def.intersect(lifetime)
super.advise(nestedDef.lifetime, handler)
super.advise(nestedDef, handler)
}

override fun fire(value: T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ class MessageBroker(queueMessages: Boolean = false) : IPrintable {
private val messageContext: ProtocolContexts.MessageContext
) : IRdWireableDispatchHelper {

override fun dispatch(lifetime: Lifetime, scheduler: IScheduler?, action: () -> Unit) {
doDispatch(lifetime.intersect(this.lifetime), scheduler ?: protocol.scheduler, action)
override fun dispatch(scheduler: IScheduler?, action: () -> Unit) {
doDispatch(lifetime, scheduler ?: protocol.scheduler, action)
}

private fun doDispatch(lifetime: Lifetime, scheduler: IScheduler, action: () -> Unit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ interface IRdWireableDispatchHelper {
val rdId: RdId
val lifetime: Lifetime

fun dispatch(lifetime: Lifetime = this.lifetime, scheduler: IScheduler? = null, action: () -> Unit)
fun dispatch(scheduler: IScheduler? = null, action: () -> Unit) = dispatch(lifetime, scheduler, action)
fun dispatch(scheduler: IScheduler? = null, action: () -> Unit)
}


Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ class AsyncRdMap<K : Any, V : Any> private constructor(
override val lifetime: Lifetime
get() = dispatchHelper.lifetime

override fun dispatch(lifetime: Lifetime, scheduler: IScheduler?, action: () -> Unit) {
dispatchHelper.dispatch(lifetime, SynchronousScheduler, action)
override fun dispatch(scheduler: IScheduler?, action: () -> Unit) {
dispatchHelper.dispatch(SynchronousScheduler, action)
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ class AsyncRdSet<T : Any> private constructor(
override val lifetime: Lifetime
get() = dispatchHelper.lifetime

override fun dispatch(lifetime: Lifetime, scheduler: IScheduler?, action: () -> Unit) {
dispatchHelper.dispatch(lifetime, SynchronousScheduler, action)
override fun dispatch(scheduler: IScheduler?, action: () -> Unit) {
dispatchHelper.dispatch(SynchronousScheduler, action)
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class CallSiteWiredRdTask<TReq, TRes>(
} else if (resultFromWire is RdTaskResult.Cancelled)
sendCancellation()

dispatchHelper.dispatch(outerLifetime, wireScheduler) {
dispatchHelper.dispatch(wireScheduler) {
if (!result.setIfEmpty(resultFromWire))
RdReactiveBase.logReceived.trace { "call `${call.location}` (${call.rdid}) response was dropped, task result is: ${result.valueOrNull}" }
}
Expand Down Expand Up @@ -192,7 +192,7 @@ class EndpointWiredRdTask<TReq, TRes>(
RdReactiveBase.logReceived.trace { "received cancellation" }
buffer.readVoid() //nothing just a void value

dispatchHelper.dispatch(lifetime, wireScheduler) {
dispatchHelper.dispatch(wireScheduler) {
val success = result.setIfEmpty(RdTaskResult.Cancelled())
val wireScheduler = call.protocol?.scheduler
if (success || wireScheduler == null)
Expand Down Expand Up @@ -365,10 +365,14 @@ class RdCall<TReq, TRes>(internal val requestSzr: ISerializer<TReq> = Polymorphi

val taskId = proto.identity.next(RdId.Null)
val bindLifetime = bindLifetime
val taskLifetime = lifetime.intersect(bindLifetime)

val task = CallSiteWiredRdTask(taskLifetime, this, taskId, scheduler ?: proto.scheduler)
taskLifetime.executeIfAlive {
val intersectedDef = lifetime.defineIntersection(bindLifetime)
val task = CallSiteWiredRdTask(intersectedDef.lifetime, this, taskId, scheduler ?: proto.scheduler)
task.result.advise(intersectedDef.lifetime) {
if (it !is RdTaskResult.Success || !it.value.isBindable()) {
intersectedDef.terminate(true)
}
}
intersectedDef.lifetime.executeIfAlive {
proto.wire.send(rdid) { buffer ->
logSend.trace { "call `$location`::($rdid) send${sync.condstr {" SYNC"}} request '$taskId' : ${request.printToString()} " }
taskId.write(buffer)
Expand Down
11 changes: 3 additions & 8 deletions rd-net/RdFramework/Base/IRdBindable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,19 @@ public interface IRdWireableDispatchHelper
RdId RdId { get; }
Lifetime Lifetime { get; }

public void Dispatch(Lifetime lifetime, IScheduler? scheduler, Action action);
public void Dispatch(IScheduler? scheduler, Action action);
}

public static class RdWireableDispatchHelperEx
{
public static void Dispatch(this IRdWireableDispatchHelper helper, Lifetime lifetime, Action action)
{
helper.Dispatch(lifetime, null, action);
}

public static void Dispatch(this IRdWireableDispatchHelper helper, IScheduler? scheduler, Action action)
{
helper.Dispatch(helper.Lifetime, scheduler, action);
helper.Dispatch(scheduler, action);
}

public static void Dispatch(this IRdWireableDispatchHelper helper, Action action)
{
helper.Dispatch(helper.Lifetime, null, action);
helper.Dispatch(null, action);
}
}

Expand Down
2 changes: 1 addition & 1 deletion rd-net/RdFramework/Impl/AsyncRdMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public DelegatingDispatchHelper(IRdWireableDispatchHelper dispatchHelper)
myDispatchHelper = dispatchHelper;
}

public void Dispatch(Lifetime lifetime, IScheduler? scheduler, Action action)
public void Dispatch(IScheduler? scheduler, Action action)
{
myDispatchHelper.Dispatch(SynchronousScheduler.Instance, action);
}
Expand Down
2 changes: 1 addition & 1 deletion rd-net/RdFramework/Impl/AsyncRdSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public DelegatingDispatchHelper(IRdWireableDispatchHelper dispatchHelper)
myDispatchHelper = dispatchHelper;
}

public void Dispatch(Lifetime lifetime, IScheduler? scheduler, Action action)
public void Dispatch(IScheduler? scheduler, Action action)
{
myDispatchHelper.Dispatch(SynchronousScheduler.Instance, action);
}
Expand Down
4 changes: 2 additions & 2 deletions rd-net/RdFramework/Impl/MessageBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ internal RdWireableDispatchHelper(Lifetime lifetime, ILog log, IRdWireable wirea
myMessageContext = messageContext;
}

public void Dispatch(Lifetime lifetime, IScheduler? scheduler, Action action)
public void Dispatch(IScheduler? scheduler, Action action)
{
DoDispatch(lifetime.Intersect(Lifetime), scheduler ?? myProtocol.Scheduler, action);
DoDispatch(Lifetime, scheduler ?? myProtocol.Scheduler, action);
}

private void DoDispatch(Lifetime lifetime, IScheduler scheduler, Action action)
Expand Down
16 changes: 12 additions & 4 deletions rd-net/RdFramework/Tasks/RdCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,18 @@ private IRdTask<TRes> StartInternal(Lifetime requestLifetime, TReq request, ISch

var taskId = proto.Identities.Next(RdId.Nil);

var taskLifetime = Lifetime.Intersect(requestLifetime, myBindLifetime);
var task = new WiredRdTask<TReq, TRes>.CallSite(taskLifetime, this, taskId, scheduler ?? proto.Scheduler);

using var cookie = taskLifetime.UsingExecuteIfAlive();
var intersectedDef = Lifetime.DefineIntersection(requestLifetime, myBindLifetime);
var task = new WiredRdTask<TReq,TRes>.CallSite(intersectedDef.Lifetime, this, taskId, scheduler ?? proto.Scheduler);
task.Result.Advise(intersectedDef.Lifetime, result =>
{
if (result.Status != RdTaskStatus.Success || !result.Result.IsBindable())
{
intersectedDef.AllowTerminationUnderExecution = true;
intersectedDef.Terminate();
}
});

using var cookie = intersectedDef.UsingExecuteIfAlive();
if (cookie.Succeed)
{
proto.Wire.Send(RdId, (writer) =>
Expand Down
4 changes: 2 additions & 2 deletions rd-net/RdFramework/Tasks/WiredRdTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, Unsaf
SendCancellation();


dispatchHelper.Dispatch(myOuterLifetime, WireScheduler, () =>
dispatchHelper.Dispatch(WireScheduler, () =>
{
if (!ResultInternal.SetIfEmpty(taskResult))
Trace(RdReactiveBase.ourLogReceived, "response from wire was rejected because task already has result");
Expand Down Expand Up @@ -203,7 +203,7 @@ public override void OnWireReceived(IProtocol proto, SerializationCtx ctx, Unsaf
Trace(RdReactiveBase.ourLogReceived, "received cancellation");
reader.ReadVoid(); //nothing just a void value

dispatchHelper.Dispatch(Lifetime, WireScheduler, () =>
dispatchHelper.Dispatch(WireScheduler, () =>
{
var success = ResultInternal.SetIfEmpty(RdTaskResult<TRes>.Cancelled());
var protocolScheduler = myCall.TryGetProto()?.Scheduler;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//------------------------------------------------------------------------------
// <auto-generated>
// This code was generated by a RdGen v1.11.
// This code was generated by a RdGen v1.12.
//
// Changes to this file may cause incorrect behavior and will be lost if
// the code is regenerated.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//------------------------------------------------------------------------------
// <auto-generated>
// This code was generated by a RdGen v1.11.
// This code was generated by a RdGen v1.12.
//
// Changes to this file may cause incorrect behavior and will be lost if
// the code is regenerated.
Expand Down
Loading