Skip to content

Commit

Permalink
Fixed initialization logic for Join operators, to ensure that only on…
Browse files Browse the repository at this point in the history
…e initial changeset is emitted, and that it emits only after both sources have emitted their initialization changeset.
  • Loading branch information
JakenVeina committed Sep 11, 2024
1 parent 741cf6b commit 21921f9
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 77 deletions.
26 changes: 26 additions & 0 deletions src/DynamicData.Tests/Cache/InnerJoinFixture.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;

using DynamicData.Tests.Utilities;

using FluentAssertions;

using Xunit;
Expand Down Expand Up @@ -206,6 +208,30 @@ public void MoreRight()

}

[Fact]
public void InitializationWaitsForBothSources()
{
var left = new[] { 1, 2, 3 };
var right = new[] { 4, 6, 2 };

ObservableCacheEx
.InnerJoin(
left: left.AsObservableChangeSet(static left => 2 * left),
right: right.AsObservableChangeSet(static right => right),
rightKeySelector: static right => right,
resultSelector: static (left, right) => (left, right))
.ValidateSynchronization()
.ValidateChangeSets(static pair => (2 * pair.left, pair.right))
.RecordCacheItems(out var results);

results.Error.Should().BeNull();

results.RecordedChangeSets.Count.Should().Be(1, "Initialization should only emit one changeset.");
results.RecordedChangeSets[0].Should().OnlyContain(change => change.Reason == ChangeReason.Add, "Initialization should only emit Add changes.");

results.RecordedItemsByKey.Values.Should().OnlyContain(pair => (2 * pair.left) == pair.right, "Source items should have been joined correctly");
}

public class Device(string name) : IEquatable<Device>
{
public string Name { get; } = name;
Expand Down
27 changes: 27 additions & 0 deletions src/DynamicData.Tests/Cache/LeftJoinFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Linq;

using DynamicData.Kernel;
using DynamicData.Tests.Utilities;

using FluentAssertions;

Expand Down Expand Up @@ -165,6 +166,32 @@ public void UpdateRight()
_result.Data.Items.All(dwm => dwm.MetaData != Optional<DeviceMetaData>.None).Should().BeTrue();
}

[Fact]
public void InitializationWaitsForBothSources()
{
// https://github.com/reactivemarbles/DynamicData/issues/943

var left = new[] { 1, 2, 3 };
var right = new[] { 4, 6, 2 };

ObservableCacheEx
.LeftJoin(
left: left.AsObservableChangeSet(static left => 2 * left),
right: right.AsObservableChangeSet(static right => right),
rightKeySelector: static right => right,
resultSelector: static (left, right) => (left, right: right.HasValue ? right.Value : default(int?)))
.ValidateSynchronization()
.ValidateChangeSets(static pair => 2 * pair.left)
.RecordCacheItems(out var results);

results.Error.Should().BeNull();

results.RecordedChangeSets.Count.Should().Be(1, "Initialization should only emit one changeset.");
results.RecordedChangeSets[0].Should().OnlyContain(change => change.Reason == ChangeReason.Add, "Initialization should only emit Add changes.");

results.RecordedItemsByKey.Values.Should().OnlyContain(pair => (2 * pair.left) == pair.right, "Source items should have been joined correctly");
}

public class Device(string name) : IEquatable<Device>
{
public string Name { get; } = name;
Expand Down
23 changes: 23 additions & 0 deletions src/DynamicData.Tests/Cache/RightJoinFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Linq;

using DynamicData.Kernel;
using DynamicData.Tests.Utilities;

using FluentAssertions;

Expand Down Expand Up @@ -216,7 +217,29 @@ public void MoreRight()
_result.Data.Items.Count(dwm => dwm.Device == Optional<Device>.None).Should().Be(1);
}

[Fact]
public void InitializationWaitsForBothSources()
{
var left = new[] { 1, 2, 3 };
var right = new[] { 4, 6, 2 };

ObservableCacheEx
.RightJoin(
left: left.AsObservableChangeSet(static left => 2 * left),
right: right.AsObservableChangeSet(static right => right),
rightKeySelector: static right => right,
resultSelector: static (left, right) => (left: left.HasValue ? left.Value : default(int?), right))
.ValidateSynchronization()
.ValidateChangeSets(static pair => pair.right)
.RecordCacheItems(out var results);

results.Error.Should().BeNull();

results.RecordedChangeSets.Count.Should().Be(1, "Initialization should only emit one changeset.");
results.RecordedChangeSets[0].Should().OnlyContain(change => change.Reason == ChangeReason.Add, "Initialization should only emit Add changes.");

results.RecordedItemsByKey.Values.Should().OnlyContain(pair => (2 * pair.left) == pair.right, "Source items should have been joined correctly");
}

public class Device(string name) : IEquatable<Device>
{
Expand Down
81 changes: 45 additions & 36 deletions src/DynamicData/Cache/Internal/InnerJoin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,47 +37,52 @@ internal sealed class InnerJoin<TLeft, TLeftKey, TRight, TRightKey, TDestination
// joined is the final cache
var joinedCache = new ChangeAwareCache<TDestination, (TLeftKey, TRightKey)>();
var leftLoader = leftCache.Connect().Select(changes =>
{
foreach (var change in changes.ToConcreteType())
{
var leftCurrent = change.Current;
var rightLookup = rightGrouped.Lookup(change.Key);
var hasInitialized = false;
if (rightLookup.HasValue)
var leftLoader = leftCache.Connect()
.Select(changes =>
{
foreach (var change in changes.ToConcreteType())
{
switch (change.Reason)
var leftCurrent = change.Current;
var rightLookup = rightGrouped.Lookup(change.Key);
if (rightLookup.HasValue)
{
case ChangeReason.Add:
case ChangeReason.Update:
foreach (var keyvalue in rightLookup.Value.KeyValues)
{
joinedCache.AddOrUpdate(_resultSelector((change.Key, keyvalue.Key), leftCurrent, keyvalue.Value), (change.Key, keyvalue.Key));
}
break;
case ChangeReason.Remove:
foreach (var keyvalue in rightLookup.Value.KeyValues)
{
joinedCache.Remove((change.Key, keyvalue.Key));
}
break;
case ChangeReason.Refresh:
foreach (var key in rightLookup.Value.Keys)
{
joinedCache.Refresh((change.Key, key));
}
break;
switch (change.Reason)
{
case ChangeReason.Add:
case ChangeReason.Update:
foreach (var keyvalue in rightLookup.Value.KeyValues)
{
joinedCache.AddOrUpdate(_resultSelector((change.Key, keyvalue.Key), leftCurrent, keyvalue.Value), (change.Key, keyvalue.Key));
}
break;
case ChangeReason.Remove:
foreach (var keyvalue in rightLookup.Value.KeyValues)
{
joinedCache.Remove((change.Key, keyvalue.Key));
}
break;
case ChangeReason.Refresh:
foreach (var key in rightLookup.Value.Keys)
{
joinedCache.Refresh((change.Key, key));
}
break;
}
}
}
}
return joinedCache.CaptureChanges();
});
return joinedCache.CaptureChanges();
})
// Don't forward initial changesets from the left side, only the right
.Where(_ => hasInitialized);
var rightLoader = rightCache.Connect().Select(changes =>
{
Expand Down Expand Up @@ -119,7 +124,11 @@ internal sealed class InnerJoin<TLeft, TLeftKey, TRight, TRightKey, TDestination
lock (locker)
{
return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache, rightShare.Connect());
var observerSubscription = leftLoader.Merge(rightLoader).SubscribeSafe(observer);
hasInitialized = true;
return new CompositeDisposable(observerSubscription, leftCache, rightCache, rightShare.Connect());
}
});
}
19 changes: 14 additions & 5 deletions src/DynamicData/Cache/Internal/LeftJoin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ public IObservable<IChangeSet<TDestination, TLeftKey>> Run() => Observable.Creat
var locker = new object();
// create local backing stores
var leftCache = _left.Synchronize(locker).AsObservableCache(false);
var leftShare = _left.Synchronize(locker).Publish();
var leftCache = leftShare.AsObservableCache(false);
var rightCache = _right.Synchronize(locker).ChangeKey(_rightKeySelector).AsObservableCache(false);
// joined is the final cache
var joined = new ChangeAwareCache<TDestination, TLeftKey>();
var hasInitialized = false;
var leftLoader = leftCache.Connect().Select(
changes =>
{
Expand Down Expand Up @@ -66,8 +69,8 @@ public IObservable<IChangeSet<TDestination, TLeftKey>> Run() => Observable.Creat
return joined.CaptureChanges();
});
var rightLoader = rightCache.Connect().Select(
changes =>
var rightLoader = rightCache.Connect()
.Select(changes =>
{
foreach (var change in changes.ToConcreteType())
{
Expand Down Expand Up @@ -117,11 +120,17 @@ public IObservable<IChangeSet<TDestination, TLeftKey>> Run() => Observable.Creat
}
return joined.CaptureChanges();
});
})
// Don't forward initial changesets from the right side, only the left
.Where(_ => hasInitialized);
lock (locker)
{
return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache);
var observerSubscription = leftLoader.Merge(rightLoader).SubscribeSafe(observer);
hasInitialized = true;
return new CompositeDisposable(observerSubscription, leftCache, rightCache, leftShare.Connect());
}
});
}
84 changes: 48 additions & 36 deletions src/DynamicData/Cache/Internal/RightJoin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public IObservable<IChangeSet<TDestination, TRightKey>> Run() => Observable.Crea
// joined is the final cache
var joinedCache = new ChangeAwareCache<TDestination, TRightKey>();
var hasInitialized = false;
var rightLoader = rightCache.Connect().Select(changes =>
{
foreach (var change in changes.ToConcreteType())
Expand Down Expand Up @@ -70,48 +72,58 @@ public IObservable<IChangeSet<TDestination, TRightKey>> Run() => Observable.Crea
return joinedCache.CaptureChanges();
});
var leftLoader = leftCache.Connect().Select(changes =>
{
foreach (var change in changes.ToConcreteType())
var leftLoader = leftCache.Connect()
.Select(changes =>
{
var left = change.Current;
var right = rightGrouped.Lookup(change.Key);
if (right.HasValue)
foreach (var change in changes.ToConcreteType())
{
switch (change.Reason)
var left = change.Current;
var right = rightGrouped.Lookup(change.Key);
if (right.HasValue)
{
case ChangeReason.Add:
case ChangeReason.Update:
foreach (var keyvalue in right.Value.KeyValues)
{
joinedCache.AddOrUpdate(_resultSelector(keyvalue.Key, left, keyvalue.Value), keyvalue.Key);
}
break;
case ChangeReason.Remove:
foreach (var keyvalue in right.Value.KeyValues)
{
joinedCache.AddOrUpdate(_resultSelector(keyvalue.Key, Optional<TLeft>.None, keyvalue.Value), keyvalue.Key);
}
break;
case ChangeReason.Refresh:
foreach (var key in right.Value.Keys)
{
joinedCache.Refresh(key);
}
break;
switch (change.Reason)
{
case ChangeReason.Add:
case ChangeReason.Update:
foreach (var keyvalue in right.Value.KeyValues)
{
joinedCache.AddOrUpdate(_resultSelector(keyvalue.Key, left, keyvalue.Value), keyvalue.Key);
}
break;
case ChangeReason.Remove:
foreach (var keyvalue in right.Value.KeyValues)
{
joinedCache.AddOrUpdate(_resultSelector(keyvalue.Key, Optional<TLeft>.None, keyvalue.Value), keyvalue.Key);
}
break;
case ChangeReason.Refresh:
foreach (var key in right.Value.Keys)
{
joinedCache.Refresh(key);
}
break;
}
}
}
}
return joinedCache.CaptureChanges();
});
return joinedCache.CaptureChanges();
})
// Don't forward initial changesets from the left side, only the right
.Where(_ => hasInitialized);
lock (locker)
{
var observerSubscription = leftLoader.Merge(rightLoader).SubscribeSafe(observer);
hasInitialized = true;
return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache, rightShare.Connect());
return new CompositeDisposable(observerSubscription, leftCache, rightCache, rightShare.Connect());
}
});
}

0 comments on commit 21921f9

Please sign in to comment.