Replies: 3 comments 6 replies
-
Not that I can tell. I think all that needs to happen is for the key to get carried through on each change. Here's a 5-minute rework of the current "SourceList" version that does just that. internal class FilterOnObservable<TObject, TKey>
where TObject : notnull
where TKey : notnull
{
// Optional time window used by Run to batch filter notifications; when null, each notification is handled on its own.
private readonly TimeSpan? _buffer;
// Called with an item to obtain the observable of booleans that decides whether the item passes the filter.
private readonly Func<TObject, IObservable<bool>> _filter;
// Scheduler handed to Buffer when a window is set; Run falls back to Scheduler.Default when this is null.
private readonly IScheduler? _scheduler;
// Upstream keyed change set stream that Run wraps and filters.
private readonly IObservable<IChangeSet<TObject, TKey>> _source;
/// <summary>
/// Initializes a new instance of the <see cref="FilterOnObservable{TObject, TKey}"/> class.
/// </summary>
/// <param name="source">The keyed change set stream to filter.</param>
/// <param name="filter">Given an item, returns the observable of booleans that drives its inclusion.</param>
/// <param name="buffer">Optional time window for batching filter notifications; <c>null</c> disables batching.</param>
/// <param name="scheduler">Optional scheduler used for the buffer window.</param>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="filter"/> is <c>null</c>.</exception>
public FilterOnObservable(IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, IObservable<bool>> filter, TimeSpan? buffer = null, IScheduler? scheduler = null)
{
    // Validate in declaration order so the first missing argument is the one reported.
    if (source is null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (filter is null)
    {
        throw new ArgumentNullException(nameof(filter));
    }

    _source = source;
    _filter = filter;
    _buffer = buffer;
    _scheduler = scheduler;
}
/// <summary>
/// Builds the filtered stream: each source item is paired with its key and a filter flag, the per-item
/// filter observables are turned into refresh changes, and only items whose flag is <c>true</c> are published.
/// </summary>
/// <returns>An observable of keyed change sets containing the items that currently pass the filter.</returns>
public IObservable<IChangeSet<TObject, TKey>> Run()
{
    return Observable.Create<IChangeSet<TObject, TKey>>(
        observer =>
        {
            var gate = new object();
            var snapshot = new List<ObjWithKeyAndFilterValue>();

            // Wrap every item with its key; the flag defaults to true (include all items).
            // The wrapped items are cloned into the snapshot so an index can be looked up when a change is made.
            var wrapped = _source
                .Synchronize(gate)
                .Transform((item, key) => new ObjWithKeyAndFilterValue(item, key, true))
                .Clone(snapshot)
                .Publish();

            // Watch each item's filter observable and carry the emitted value along with the item and its key.
            var filterEmissions = wrapped.MergeMany(
                entry => _filter(entry.Obj).Select(include => new ObjWithKeyAndFilterValue(entry.Obj, entry.Key, include)));

            // Group the emissions: one item at a time when unbuffered, otherwise non-empty time windows.
            IObservable<IList<ObjWithKeyAndFilterValue>> batches;
            if (_buffer is null)
            {
                batches = filterEmissions.Select(entry => new[] { entry });
            }
            else
            {
                batches = filterEmissions
                    .Buffer(_buffer.Value, _scheduler ?? Scheduler.Default)
                    .Where(window => window.Count > 0);
            }

            // Turn every batch into a change set of Refresh changes, looking up each item's index in the snapshot.
            var refreshes = batches
                .Synchronize(gate)
                .Select(batch => IndexOfMany(
                    snapshot,
                    batch,
                    entry => entry.Obj,
                    (entry, index) => new Change<ObjWithKeyAndFilterValue, TKey>(ChangeReason.Refresh, entry.Key, entry, index)))
                .Select(changes => new ChangeSet<ObjWithKeyAndFilterValue, TKey>(changes));

            // Publish refreshes together with the underlying changes, keeping only items whose flag is set.
            var subscription = wrapped
                .Merge(refreshes)
                .Filter(entry => entry.Filter)
                .Transform(entry => entry.Obj)
                .SuppressRefresh() // suppress refreshes from filter, avoids excessive refresh messages for no-op filter updates
                .NotEmpty()
                .SubscribeSafe(observer);

            // Connect only after the downstream subscription has been made.
            var connection = wrapped.Connect();
            return new CompositeDisposable(subscription, connection);
        });
}
/// <summary>
/// Matches each element of <paramref name="itemsToFind"/> against the elements of <paramref name="source"/>
/// by comparing the values produced by <paramref name="objectPropertyFunc"/>, and projects every match
/// together with the zero-based position of the matching element in <paramref name="source"/>.
/// </summary>
/// <typeparam name="TObj">The element type of both sequences.</typeparam>
/// <typeparam name="TObjectProp">The type of the value the elements are matched on.</typeparam>
/// <typeparam name="TResult">The type of the projected result.</typeparam>
/// <param name="source">The sequence whose positions are reported.</param>
/// <param name="itemsToFind">The items to locate in <paramref name="source"/>.</param>
/// <param name="objectPropertyFunc">Extracts the value used for matching.</param>
/// <param name="resultSelector">Receives the item from <paramref name="itemsToFind"/> and the matched index.</param>
/// <returns>
/// A lazily evaluated sequence in the order of <paramref name="itemsToFind"/>. Items with no match yield
/// nothing; items with several matches yield one result per match.
/// </returns>
/// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
private static IEnumerable<TResult> IndexOfMany<TObj, TObjectProp, TResult>(IEnumerable<TObj> source, IEnumerable<TObj> itemsToFind, Func<TObj, TObjectProp> objectPropertyFunc, Func<TObj, int, TResult> resultSelector)
{
    if (source is null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (itemsToFind is null)
    {
        throw new ArgumentNullException(nameof(itemsToFind));
    }

    if (resultSelector is null)
    {
        throw new ArgumentNullException(nameof(resultSelector));
    }

    // Checked last so the precedence of the pre-existing guards is unchanged. Without this guard a null
    // selector is only rejected inside Join, which reports it under the name "outerKeySelector".
    if (objectPropertyFunc is null)
    {
        throw new ArgumentNullException(nameof(objectPropertyFunc));
    }

    // Pair every source element with its position so the join can report where the match was found.
    var indexed = source.Select((element, index) => new { Element = element, Index = index });
    return itemsToFind.Join(indexed, objectPropertyFunc, right => objectPropertyFunc(right.Element), (left, right) => resultSelector(left, right.Index));
}
/// <summary>
/// Carries an item together with its key and a filter flag.
/// Equality and hashing look at the item only; the key and the flag are ignored.
/// </summary>
private readonly struct ObjWithKeyAndFilterValue : IEquatable<ObjWithKeyAndFilterValue>
{
    public readonly TObject Obj;
    public readonly TKey Key;
    public readonly bool Filter;

    public ObjWithKeyAndFilterValue(TObject obj, TKey key, bool filter)
    {
        Obj = obj;
        Key = key;
        Filter = filter;
    }

    private static IEqualityComparer<ObjWithKeyAndFilterValue> ObjComparer { get; } = new ObjEqualityComparer();

    // default equality does _not_ include Filter value, as that would cause the Filter operator that is used later to fail
    public bool Equals(ObjWithKeyAndFilterValue other) => ObjComparer.Equals(this, other);

    public override bool Equals(object? obj) => obj is ObjWithKeyAndFilterValue other && Equals(other);

    public override int GetHashCode() => ObjComparer.GetHashCode(this);

    /// <summary>Compares two wrappers by their wrapped item, using the default comparer for the item type.</summary>
    private sealed class ObjEqualityComparer : IEqualityComparer<ObjWithKeyAndFilterValue>
    {
        public bool Equals(ObjWithKeyAndFilterValue x, ObjWithKeyAndFilterValue y) =>
            EqualityComparer<TObject>.Default.Equals(x.Obj, y.Obj);

        public int GetHashCode(ObjWithKeyAndFilterValue obj)
        {
            // A missing item hashes to zero; otherwise the item's hash is scaled, wrapping on overflow.
            if (obj.Obj is null)
            {
                return 0;
            }

            return unchecked(EqualityComparer<TObject>.Default.GetHashCode(obj.Obj) * 397);
        }
    }
}
} |
Beta Was this translation helpful? Give feedback.
-
It turns out that making this for SourceCache is much simpler than making it for SourceList. The PR I submitted uses a Proxy Object that holds a bool, applies […]. I wonder if the SourceList version could be simplified using this approach? |
Beta Was this translation helpful? Give feedback.
-
I considered adding a bool parameter that would let the caller decide if things are in or out by default, but I decided against that after I realized how easy it is to control this with the filter observable itself. If you really want things to pass the filter until a false is observed, then you can just add `.StartWith(true)` to the filter observable: var offByDefault = sourceCache.Connect().FilterOnObservable(item => GetFilter(item));
var onByDefault = sourceCache.Connect().FilterOnObservable(item => GetFilter(item).StartWith(true)); The extra parameter would have made it unnecessarily complicated. Besides, there is something to be said about keeping the List/Cache versions the same as much as possible.
One huge benefit of Rx is that things can happen independently for each item. If one item takes a while for something to decide if it shows up or not, then let it show up (or not) when it is ready. In the meantime, deal with the ones that are ready. Perhaps I'm missing something, but I just don't see the benefit in blocking ALL of them until all filters have been checked. However, if you want to give the filters a chance to fire for a bit to avoid dealing with a ton of tiny updates, then you should remember that `FilterOnObservable` also accepts an optional buffer time span: var bufferedFilter = sourceCache.Connect().FilterOnObservable(item => GetFilter(item), TimeSpan.FromSeconds(5)); |
Beta Was this translation helpful? Give feedback.
-
The `FilterOnObservable` method exists only for `SourceList`, but I think it should exist for `SourceCache`, too. Is there any reason why this method still isn't there?
Thanks.
Beta Was this translation helpful? Give feedback.
All reactions