-
-
Notifications
You must be signed in to change notification settings - Fork 183
Commit
…e initial changeset is emitted, and that it emits only after both sources have emitted their initialization changeset.
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,6 +40,8 @@ public IObservable<IChangeSet<TDestination, TRightKey>> Run() => Observable.Crea | |
// joined is the final cache | ||
var joinedCache = new ChangeAwareCache<TDestination, TRightKey>(); | ||
var hasInitialized = false; | ||
var rightLoader = rightCache.Connect().Select(changes => | ||
{ | ||
foreach (var change in changes.ToConcreteType()) | ||
|
@@ -70,48 +72,58 @@ public IObservable<IChangeSet<TDestination, TRightKey>> Run() => Observable.Crea | |
return joinedCache.CaptureChanges(); | ||
}); | ||
var leftLoader = leftCache.Connect().Select(changes => | ||
{ | ||
foreach (var change in changes.ToConcreteType()) | ||
var leftLoader = leftCache.Connect() | ||
.Select(changes => | ||
{ | ||
var left = change.Current; | ||
var right = rightGrouped.Lookup(change.Key); | ||
if (right.HasValue) | ||
foreach (var change in changes.ToConcreteType()) | ||
{ | ||
switch (change.Reason) | ||
var left = change.Current; | ||
var right = rightGrouped.Lookup(change.Key); | ||
if (right.HasValue) | ||
{ | ||
case ChangeReason.Add: | ||
case ChangeReason.Update: | ||
foreach (var keyvalue in right.Value.KeyValues) | ||
{ | ||
joinedCache.AddOrUpdate(_resultSelector(keyvalue.Key, left, keyvalue.Value), keyvalue.Key); | ||
} | ||
break; | ||
case ChangeReason.Remove: | ||
foreach (var keyvalue in right.Value.KeyValues) | ||
{ | ||
joinedCache.AddOrUpdate(_resultSelector(keyvalue.Key, Optional<TLeft>.None, keyvalue.Value), keyvalue.Key); | ||
} | ||
break; | ||
case ChangeReason.Refresh: | ||
foreach (var key in right.Value.Keys) | ||
{ | ||
joinedCache.Refresh(key); | ||
} | ||
break; | ||
switch (change.Reason) | ||
{ | ||
case ChangeReason.Add: | ||
case ChangeReason.Update: | ||
foreach (var keyvalue in right.Value.KeyValues) | ||
{ | ||
joinedCache.AddOrUpdate(_resultSelector(keyvalue.Key, left, keyvalue.Value), keyvalue.Key); | ||
} | ||
break; | ||
case ChangeReason.Remove: | ||
foreach (var keyvalue in right.Value.KeyValues) | ||
{ | ||
joinedCache.AddOrUpdate(_resultSelector(keyvalue.Key, Optional<TLeft>.None, keyvalue.Value), keyvalue.Key); | ||
} | ||
break; | ||
case ChangeReason.Refresh: | ||
foreach (var key in right.Value.Keys) | ||
{ | ||
joinedCache.Refresh(key); | ||
} | ||
break; | ||
} | ||
} | ||
} | ||
} | ||
return joinedCache.CaptureChanges(); | ||
}); | ||
return joinedCache.CaptureChanges(); | ||
}) | ||
// Don't forward initial changesets from the left side, only the right | ||
.Where(_ => hasInitialized); | ||
; | ||
Check failure on line 119 in src/DynamicData/Cache/Internal/RightJoin.cs GitHub Actions / build
Check failure on line 119 in src/DynamicData/Cache/Internal/RightJoin.cs GitHub Actions / build
Check failure on line 119 in src/DynamicData/Cache/Internal/RightJoin.cs GitHub Actions / build
Check failure on line 119 in src/DynamicData/Cache/Internal/RightJoin.cs GitHub Actions / build
Check failure on line 119 in src/DynamicData/Cache/Internal/RightJoin.cs GitHub Actions / build
Check failure on line 119 in src/DynamicData/Cache/Internal/RightJoin.cs GitHub Actions / build
|
||
lock(locker) | ||
Check failure on line 120 in src/DynamicData/Cache/Internal/RightJoin.cs GitHub Actions / build
Check failure on line 120 in src/DynamicData/Cache/Internal/RightJoin.cs GitHub Actions / build
Check failure on line 120 in src/DynamicData/Cache/Internal/RightJoin.cs GitHub Actions / build
Check failure on line 120 in src/DynamicData/Cache/Internal/RightJoin.cs GitHub Actions / build
|
||
{ | ||
var observerSubscription = leftLoader.Merge(rightLoader).SubscribeSafe(observer); | ||
hasInitialized = true; | ||
return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache, rightShare.Connect()); | ||
return new CompositeDisposable(observerSubscription, leftCache, rightCache, rightShare.Connect()); | ||
} | ||
}); | ||
} |