diff --git a/src/DynamicData.Tests/Cache/InnerJoinFixture.cs b/src/DynamicData.Tests/Cache/InnerJoinFixture.cs index 397c5802..dcbd37c6 100644 --- a/src/DynamicData.Tests/Cache/InnerJoinFixture.cs +++ b/src/DynamicData.Tests/Cache/InnerJoinFixture.cs @@ -1,5 +1,7 @@ using System; +using DynamicData.Tests.Utilities; + using FluentAssertions; using Xunit; @@ -206,6 +208,30 @@ public void MoreRight() } + [Fact] + public void InitializationWaitsForBothSources() + { + var left = new[] { 1, 2, 3 }; + var right = new[] { 4, 6, 2 }; + + ObservableCacheEx + .InnerJoin( + left: left.AsObservableChangeSet(static left => 2 * left), + right: right.AsObservableChangeSet(static right => right), + rightKeySelector: static right => right, + resultSelector: static (left, right) => (left, right)) + .ValidateSynchronization() + .ValidateChangeSets(static pair => (2 * pair.left, pair.right)) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + + results.RecordedChangeSets.Count.Should().Be(1, "Initialization should only emit one changeset."); + results.RecordedChangeSets[0].Should().OnlyContain(change => change.Reason == ChangeReason.Add, "Initialization should only emit Add changes."); + + results.RecordedItemsByKey.Values.Should().OnlyContain(pair => (2 * pair.left) == pair.right, "Source items should have been joined correctly"); + } + public class Device(string name) : IEquatable { public string Name { get; } = name; diff --git a/src/DynamicData.Tests/Cache/LeftJoinFixture.cs b/src/DynamicData.Tests/Cache/LeftJoinFixture.cs index f2450f93..e290d966 100644 --- a/src/DynamicData.Tests/Cache/LeftJoinFixture.cs +++ b/src/DynamicData.Tests/Cache/LeftJoinFixture.cs @@ -2,6 +2,7 @@ using System.Linq; using DynamicData.Kernel; +using DynamicData.Tests.Utilities; using FluentAssertions; @@ -165,6 +166,32 @@ public void UpdateRight() _result.Data.Items.All(dwm => dwm.MetaData != Optional.None).Should().BeTrue(); } + [Fact] + public void InitializationWaitsForBothSources() + { + // https://github.com/reactivemarbles/DynamicData/issues/943 + + var left = new[] { 1, 2, 3 }; + var right = new[] { 4, 6, 2 }; + + ObservableCacheEx + .LeftJoin( + left: left.AsObservableChangeSet(static left => 2 * left), + right: right.AsObservableChangeSet(static right => right), + rightKeySelector: static right => right, + resultSelector: static (left, right) => (left, right: right.HasValue ? right.Value : default(int?))) + .ValidateSynchronization() + .ValidateChangeSets(static pair => 2 * pair.left) + .RecordCacheItems(out var results); + + results.Error.Should().BeNull(); + + results.RecordedChangeSets.Count.Should().Be(1, "Initialization should only emit one changeset."); + results.RecordedChangeSets[0].Should().OnlyContain(change => change.Reason == ChangeReason.Add, "Initialization should only emit Add changes."); + + results.RecordedItemsByKey.Values.Should().OnlyContain(pair => (2 * pair.left) == pair.right, "Source items should have been joined correctly"); + } + public class Device(string name) : IEquatable { public string Name { get; } = name; diff --git a/src/DynamicData.Tests/Cache/RightJoinFixture.cs b/src/DynamicData.Tests/Cache/RightJoinFixture.cs index 8a8e63dc..653b971d 100644 --- a/src/DynamicData.Tests/Cache/RightJoinFixture.cs +++ b/src/DynamicData.Tests/Cache/RightJoinFixture.cs @@ -2,6 +2,7 @@ using System.Linq; using DynamicData.Kernel; +using DynamicData.Tests.Utilities; using FluentAssertions; @@ -216,7 +217,29 @@ public void MoreRight() _result.Data.Items.Count(dwm => dwm.Device == Optional.None).Should().Be(1); } + [Fact] + public void InitializationWaitsForBothSources() + { + var left = new[] { 1, 2, 3 }; + var right = new[] { 4, 6, 2 }; + + ObservableCacheEx + .RightJoin( + left: left.AsObservableChangeSet(static left => 2 * left), + right: right.AsObservableChangeSet(static right => right), + rightKeySelector: static right => right, + resultSelector: static (left, right) => (left: left.HasValue ? left.Value : default(int?), right)) + .ValidateSynchronization() + .ValidateChangeSets(static pair => pair.right) + .RecordCacheItems(out var results); + results.Error.Should().BeNull(); + + results.RecordedChangeSets.Count.Should().Be(1, "Initialization should only emit one changeset."); + results.RecordedChangeSets[0].Should().OnlyContain(change => change.Reason == ChangeReason.Add, "Initialization should only emit Add changes."); + + results.RecordedItemsByKey.Values.Should().OnlyContain(pair => (2 * pair.left) == pair.right, "Source items should have been joined correctly"); + } public class Device(string name) : IEquatable { diff --git a/src/DynamicData/Cache/Internal/InnerJoin.cs b/src/DynamicData/Cache/Internal/InnerJoin.cs index c71565ce..f1e769a0 100644 --- a/src/DynamicData/Cache/Internal/InnerJoin.cs +++ b/src/DynamicData/Cache/Internal/InnerJoin.cs @@ -37,47 +37,52 @@ internal sealed class InnerJoin(); - var leftLoader = leftCache.Connect().Select(changes => - { - foreach (var change in changes.ToConcreteType()) - { - var leftCurrent = change.Current; - var rightLookup = rightGrouped.Lookup(change.Key); + var hasInitialized = false; - if (rightLookup.HasValue) + var leftLoader = leftCache.Connect() + .Select(changes => + { + foreach (var change in changes.ToConcreteType()) { - switch (change.Reason) + var leftCurrent = change.Current; + var rightLookup = rightGrouped.Lookup(change.Key); + + if (rightLookup.HasValue) { - case ChangeReason.Add: - case ChangeReason.Update: - foreach (var keyvalue in rightLookup.Value.KeyValues) - { - joinedCache.AddOrUpdate(_resultSelector((change.Key, keyvalue.Key), leftCurrent, keyvalue.Value), (change.Key, keyvalue.Key)); - } - - break; - - case ChangeReason.Remove: - foreach (var keyvalue in rightLookup.Value.KeyValues) - { - joinedCache.Remove((change.Key, keyvalue.Key)); - } - - break; - - case ChangeReason.Refresh: - foreach (var key in rightLookup.Value.Keys) - { - joinedCache.Refresh((change.Key, key)); - } - - break; + switch (change.Reason) + { + case ChangeReason.Add: + case ChangeReason.Update: + foreach (var keyvalue in rightLookup.Value.KeyValues) + { + joinedCache.AddOrUpdate(_resultSelector((change.Key, keyvalue.Key), leftCurrent, keyvalue.Value), (change.Key, keyvalue.Key)); + } + + break; + + case ChangeReason.Remove: + foreach (var keyvalue in rightLookup.Value.KeyValues) + { + joinedCache.Remove((change.Key, keyvalue.Key)); + } + + break; + + case ChangeReason.Refresh: + foreach (var key in rightLookup.Value.Keys) + { + joinedCache.Refresh((change.Key, key)); + } + + break; + } } } - } - return joinedCache.CaptureChanges(); - }); + return joinedCache.CaptureChanges(); + }) + // Don't forward initial changesets from the left side, only the right + .Where(_ => hasInitialized); var rightLoader = rightCache.Connect().Select(changes => { @@ -119,7 +124,11 @@ internal sealed class InnerJoin> Run() => Observable.Creat var locker = new object(); // create local backing stores - var leftCache = _left.Synchronize(locker).AsObservableCache(false); + var leftShare = _left.Synchronize(locker).Publish(); + var leftCache = leftShare.AsObservableCache(false); var rightCache = _right.Synchronize(locker).ChangeKey(_rightKeySelector).AsObservableCache(false); // joined is the final cache var joined = new ChangeAwareCache(); + var hasInitialized = false; + var leftLoader = leftCache.Connect().Select( changes => { @@ -66,8 +69,8 @@ public IObservable> Run() => Observable.Creat return joined.CaptureChanges(); }); - var rightLoader = rightCache.Connect().Select( - changes => + var rightLoader = rightCache.Connect() + .Select(changes => { foreach (var change in changes.ToConcreteType()) { @@ -117,11 +120,17 @@ public IObservable> Run() => Observable.Creat } return joined.CaptureChanges(); - }); + }) + // Don't forward initial changesets from the right side, only the left + .Where(_ => hasInitialized); lock (locker) { - return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache); + var observerSubscription = leftLoader.Merge(rightLoader).SubscribeSafe(observer); + + hasInitialized = true; + + return new CompositeDisposable(observerSubscription, leftCache, rightCache, leftShare.Connect()); } }); } diff --git a/src/DynamicData/Cache/Internal/RightJoin.cs b/src/DynamicData/Cache/Internal/RightJoin.cs index ca733db2..9d763908 100644 --- a/src/DynamicData/Cache/Internal/RightJoin.cs +++ b/src/DynamicData/Cache/Internal/RightJoin.cs @@ -40,6 +40,8 @@ public IObservable> Run() => Observable.Crea // joined is the final cache var joinedCache = new ChangeAwareCache(); + var hasInitialized = false; + var rightLoader = rightCache.Connect().Select(changes => { foreach (var change in changes.ToConcreteType()) @@ -70,48 +72,58 @@ public IObservable> Run() => Observable.Crea return joinedCache.CaptureChanges(); }); - var leftLoader = leftCache.Connect().Select(changes => - { - foreach (var change in changes.ToConcreteType()) + var leftLoader = leftCache.Connect() + .Select(changes => { - var left = change.Current; - var right = rightGrouped.Lookup(change.Key); - - if (right.HasValue) + foreach (var change in changes.ToConcreteType()) { - switch (change.Reason) + var left = change.Current; + var right = rightGrouped.Lookup(change.Key); + + if (right.HasValue) { - case ChangeReason.Add: - case ChangeReason.Update: - foreach (var keyvalue in right.Value.KeyValues) - { - joinedCache.AddOrUpdate(_resultSelector(keyvalue.Key, left, keyvalue.Value), keyvalue.Key); - } - - break; - - case ChangeReason.Remove: - foreach (var keyvalue in right.Value.KeyValues) - { - joinedCache.AddOrUpdate(_resultSelector(keyvalue.Key, Optional.None, keyvalue.Value), keyvalue.Key); - } - - break; - - case ChangeReason.Refresh: - foreach (var key in right.Value.Keys) - { - joinedCache.Refresh(key); - } - - break; + switch (change.Reason) + { + case ChangeReason.Add: + case ChangeReason.Update: + foreach (var keyvalue in right.Value.KeyValues) + { + joinedCache.AddOrUpdate(_resultSelector(keyvalue.Key, left, keyvalue.Value), keyvalue.Key); + } + + break; + + case ChangeReason.Remove: + foreach (var keyvalue in right.Value.KeyValues) + { + joinedCache.AddOrUpdate(_resultSelector(keyvalue.Key, Optional.None, keyvalue.Value), keyvalue.Key); + } + + break; + + case ChangeReason.Refresh: + foreach (var key in right.Value.Keys) + { + joinedCache.Refresh(key); + } + + break; + } } } - } - return joinedCache.CaptureChanges(); - }); + return joinedCache.CaptureChanges(); + }) + // Don't forward initial changesets from the left side, only the right + .Where(_ => hasInitialized); + + lock (locker) + { + var observerSubscription = leftLoader.Merge(rightLoader).SubscribeSafe(observer); + + hasInitialized = true; - return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache, rightShare.Connect()); + return new CompositeDisposable(observerSubscription, leftCache, rightCache, rightShare.Connect()); + } }); }