如何在 Rx 中异步重新过滤序列

How to re-filter a sequence asynchrounously in Rx

我有一个可观察的序列,应该根据某些条件使用 Where 运算符进行过滤,并且应该用这些过滤后的元素填充一个列表。

我想即时更改过滤标准,所以当发生这种情况时,应该清除列表,应该根据新标准评估可观察序列的先前元素,以及由顺序。

该列表应使用现在通过新标准的过去元素重新填充,并继续使用新元素(也已过滤)。我不在乎新元素可能会延迟,而以前的元素会被重新评估,但我确实关心顺序。

Reactive Extensions 可以做到这一点吗?

除非我错过了一些关键的东西,否则指定的要求消除了对不可变集合的需求并简化了实现,因为您需要缓冲所有发出的值。

private List<T> values = new List<T>();
private IObservable<T> _valueSource;
public List<T> ValidValues => values.Where(MatchesCriteria).ToList();

private void StartSubscriptions()
{
    var addNewValuesSub = _valueSource.Subscribe(values.Add); //todo disposing
}

如果您认为 IEnumerable.Where 太慢并且我们事先知道所有可能的标准。我们可以 GroupBy 将值分离到它们各自的 Observable s /数据结构中。它看起来像这样。

_valueSource.GroupBy(CriteriaSelector)
            .Subscribe(i => UpdateDataStructure(i.Key(), i.Latest()) );

IObservable 不适合缓冲大量值,这些值稍后应按某些条件访问。这是 IEnumerable 的工作。

最后,如果您认为内存使用会成为一个问题,请考虑重构 values 作为内存对象缓存系统。

我有两个初步反应

  1. 那只是一个.Replay()过滤条件改变时重新订阅
  2. 这听起来也像是一个无限缓冲区 :-/

为了解决我的第一个想法,您可以使用此代码

public class Foo
{
    private readonly IDisposable _replayConnection;
    private readonly IConnectableObservable<int> _replaySource;
    private readonly SerialDisposable _subscription = new SerialDisposable();
    private readonly List<int> _values = new List<int>();

    //the Ctor or some initialise method...
    public Foo(IObservable<int> source)
    {
        _replaySource = source.Replay();
        _replayConnection =  _replaySource.Connect()
    }

    public void SetFilter(Func<int, bool> predicate)
    {
        //Not thread safe. If required, then a scheduler can solve that.
        _values.Clear();
        _subscription.Disposable = _replaySource.Where(predicate)
            .Subscribe(value => _values.Add(value),
                ex => {/*Do something here to handle errors*/},
                () => {/*Do something here if you want to cater for completion of the sequence*/},
    }
}

然而,这恰恰让我更关心2点。如果您期望数百万个值,那么如果它们只是整数,那么您将使用大约 3MB/100 万个内存项。如果它们是整数,则按值语义复制,您将在重播缓冲区和最终列表 (IIRC) 中获得副本。如果这种内存压力没问题,那我觉得上面的Replay代码就可以了。另请注意,如果您尝试缓冲超过 int.MaxValue 个值,此 Replay 用法将抛出。

这是一个很好的扩展方法,可以满足您的需要:

public static IObservable<T> Refilterable<T>(
    this IObservable<T> source, IObservable<Func<T, bool>> filters)
{
    return
        Observable
            .Create<T>(o =>
            {
                var replay = new ReplaySubject<T>();
                var replaySubscription = source.Subscribe(replay);
                var query = filters.Select(f => replay.Where(f)).Switch();
                var querySubscription = query.Subscribe(o);
                return new CompositeDisposable(replaySubscription, querySubscription);
            });
}

我用这段代码测试了这个:

var source = new Subject<int>();
var filters = new Subject<Func<int, bool>>();

var subscription = source.Refilterable(filters).Subscribe(x => Console.WriteLine(x));

source.OnNext(1);
source.OnNext(2);
source.OnNext(3);

filters.OnNext(x => x % 2 == 0);

source.OnNext(4);
source.OnNext(5);

filters.OnNext(x => x % 2 == 1);

source.OnNext(6);

filters.OnNext(x => x % 3 == 0);

source.OnNext(7);

filters.OnNext(x => x % 2 == 1);

subscription.Dispose();

filters.OnNext(x => x % 2 == 0);

我得到了这个输出:

2
4
1
3
5
3
6
1
3
5
7

这似乎是你想要的。


我刚刚注意到生成列表的要求。这是一个更新:

public static IObservable<IList<T>> Refilterable<T>(this IObservable<T> source, IObservable<Func<T, bool>> filters)
{
    return
        Observable
            .Create<IList<T>>(o =>
            {
                var replay = new ReplaySubject<T>();
                var replaySubscription = source.Subscribe(replay);
                var query =
                    filters
                        .Select(f =>
                            replay
                                .Synchronize()
                                .Where(f)
                                .Scan(new List<T>(), (a, x) =>
                                {
                                    a.Add(x);
                                    return a;
                                }))
                        .Switch();
                var querySubscription = query.Subscribe(o);
                return new CompositeDisposable(replaySubscription, querySubscription);
            });
}

我唯一注意到的另一件事是 VB.NET 标签。如果需要的话,我看看能不能稍后转换。


这个应该是对的VB:

<System.Runtime.CompilerServices.Extension> _
Public Shared Function Refilterable(Of T)(source As IObservable(Of T), filters As IObservable(Of Func(Of T, Boolean))) As IObservable(Of IList(Of T))
    Return Observable.Create(Of IList(Of T))( _
        Function(o)
            Dim replay = New ReplaySubject(Of T)()
            Dim replaySubscription = source.Subscribe(replay)
            Dim query = filters.[Select](Function(f) replay.Synchronize().Where(f).Scan(New List(Of T)(), _
                Function(a, x)
                    a.Add(x)
                    Return a
                End Function)).Switch()
            Dim querySubscription = query.Subscribe(o)
            Return New CompositeDisposable(replaySubscription, querySubscription)
        End Function)
End Function