如何在 Rx 中异步重新过滤序列
How to re-filter a sequence asynchrounously in Rx
我有一个可观察的序列,应该根据某些条件使用 Where
运算符进行过滤,并且应该用这些过滤后的元素填充一个列表。
我想即时更改过滤标准,所以当发生这种情况时,应该清除列表,应该根据新标准评估可观察序列的先前元素,以及由顺序。
该列表应使用现在通过新标准的过去元素重新填充,并继续使用新元素(也已过滤)。我不在乎新元素可能会延迟,而以前的元素会被重新评估,但我确实关心顺序。
Reactive Extensions 可以做到这一点吗?
除非我错过了一些关键的东西,否则指定的要求消除了对不可变集合的需求并简化了实现,因为您需要缓冲所有发出的值。
private List<T> values = new List<T>();
private IObservable<T> _valueSource;
public List<T> ValidValues => values.Where(MatchesCriteria).ToList();
private void StartSubscriptions()
{
var addNewValuesSub = _valueSource.Subscribe(values.Add); //todo disposing
}
如果您认为 IEnumerable.Where
太慢并且我们事先知道所有可能的标准。我们可以 GroupBy
将值分离到它们各自的 Observable
s /数据结构中。它看起来像这样。
_valueSource.GroupBy(CriteriaSelector)
.Subscribe(i => UpdateDataStructure(i.Key(), i.Latest()) );
IObservable
不适合缓冲大量值,这些值稍后应按某些条件访问。这是 IEnumerable
的工作。
最后,如果您认为内存使用会成为一个问题,请考虑重构 values
作为内存对象缓存系统。
我有两个初步反应
- 那只是一个.
Replay()
过滤条件改变时重新订阅
- 这听起来也像是一个无限缓冲区 :-/
为了解决我的第一个想法,您可以使用此代码
public class Foo
{
private readonly IDisposable _replayConnection;
private readonly IConnectableObservable<int> _replaySource;
private readonly SerialDisposable _subscription = new SerialDisposable();
private readonly List<int> _values = new List<int>();
//the Ctor or some initialise method...
public Foo(IObservable<int> source)
{
_replaySource = source.Replay();
_replayConnection = _replaySource.Connect()
}
public void SetFilter(Func<int, bool> predicate)
{
//Not thread safe. If required, then a scheduler can solve that.
_values.Clear();
_subscription.Disposable = _replaySource.Where(predicate)
.Subscribe(value => _values.Add(value),
ex => {/*Do something here to handle errors*/},
() => {/*Do something here if you want to cater for completion of the sequence*/},
}
}
然而,这恰恰让我更关心2点。如果您期望数百万个值,那么如果它们只是整数,那么您将使用大约 3MB/100 万个内存项。如果它们是整数,则按值语义复制,您将在重播缓冲区和最终列表 (IIRC) 中获得副本。如果这种内存压力没问题,那我觉得上面的Replay
代码就可以了。另请注意,如果您尝试缓冲超过 int.MaxValue
个值,此 Replay 用法将抛出。
这是一个很好的扩展方法,可以满足您的需要:
public static IObservable<T> Refilterable<T>(
this IObservable<T> source, IObservable<Func<T, bool>> filters)
{
return
Observable
.Create<T>(o =>
{
var replay = new ReplaySubject<T>();
var replaySubscription = source.Subscribe(replay);
var query = filters.Select(f => replay.Where(f)).Switch();
var querySubscription = query.Subscribe(o);
return new CompositeDisposable(replaySubscription, querySubscription);
});
}
我用这段代码测试了这个:
var source = new Subject<int>();
var filters = new Subject<Func<int, bool>>();
var subscription = source.Refilterable(filters).Subscribe(x => Console.WriteLine(x));
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);
filters.OnNext(x => x % 2 == 0);
source.OnNext(4);
source.OnNext(5);
filters.OnNext(x => x % 2 == 1);
source.OnNext(6);
filters.OnNext(x => x % 3 == 0);
source.OnNext(7);
filters.OnNext(x => x % 2 == 1);
subscription.Dispose();
filters.OnNext(x => x % 2 == 0);
我得到了这个输出:
2
4
1
3
5
3
6
1
3
5
7
这似乎是你想要的。
我刚刚注意到生成列表的要求。这是一个更新:
public static IObservable<IList<T>> Refilterable<T>(this IObservable<T> source, IObservable<Func<T, bool>> filters)
{
return
Observable
.Create<IList<T>>(o =>
{
var replay = new ReplaySubject<T>();
var replaySubscription = source.Subscribe(replay);
var query =
filters
.Select(f =>
replay
.Synchronize()
.Where(f)
.Scan(new List<T>(), (a, x) =>
{
a.Add(x);
return a;
}))
.Switch();
var querySubscription = query.Subscribe(o);
return new CompositeDisposable(replaySubscription, querySubscription);
});
}
我唯一注意到的另一件事是 VB.NET 标签。如果需要的话,我看看能不能稍后转换。
这个应该是对的VB:
<System.Runtime.CompilerServices.Extension> _
Public Shared Function Refilterable(Of T)(source As IObservable(Of T), filters As IObservable(Of Func(Of T, Boolean))) As IObservable(Of IList(Of T))
Return Observable.Create(Of IList(Of T))( _
Function(o)
Dim replay = New ReplaySubject(Of T)()
Dim replaySubscription = source.Subscribe(replay)
Dim query = filters.[Select](Function(f) replay.Synchronize().Where(f).Scan(New List(Of T)(), _
Function(a, x)
a.Add(x)
Return a
End Function)).Switch()
Dim querySubscription = query.Subscribe(o)
Return New CompositeDisposable(replaySubscription, querySubscription)
End Function)
End Function
我有一个可观察的序列,应该根据某些条件使用 Where
运算符进行过滤,并且应该用这些过滤后的元素填充一个列表。
我想即时更改过滤标准,所以当发生这种情况时,应该清除列表,应该根据新标准评估可观察序列的先前元素,以及由顺序。
该列表应使用现在通过新标准的过去元素重新填充,并继续使用新元素(也已过滤)。我不在乎新元素可能会延迟,而以前的元素会被重新评估,但我确实关心顺序。
Reactive Extensions 可以做到这一点吗?
除非我错过了一些关键的东西,否则指定的要求消除了对不可变集合的需求并简化了实现,因为您需要缓冲所有发出的值。
private List<T> values = new List<T>();
private IObservable<T> _valueSource;
public List<T> ValidValues => values.Where(MatchesCriteria).ToList();
private void StartSubscriptions()
{
var addNewValuesSub = _valueSource.Subscribe(values.Add); //todo disposing
}
如果您认为 IEnumerable.Where
太慢并且我们事先知道所有可能的标准。我们可以 GroupBy
将值分离到它们各自的 Observable
s /数据结构中。它看起来像这样。
_valueSource.GroupBy(CriteriaSelector)
.Subscribe(i => UpdateDataStructure(i.Key(), i.Latest()) );
IObservable
不适合缓冲大量值,这些值稍后应按某些条件访问。这是 IEnumerable
的工作。
最后,如果您认为内存使用会成为一个问题,请考虑重构 values
作为内存对象缓存系统。
我有两个初步反应
- 那只是一个.
Replay()
过滤条件改变时重新订阅 - 这听起来也像是一个无限缓冲区 :-/
为了解决我的第一个想法,您可以使用此代码
public class Foo
{
private readonly IDisposable _replayConnection;
private readonly IConnectableObservable<int> _replaySource;
private readonly SerialDisposable _subscription = new SerialDisposable();
private readonly List<int> _values = new List<int>();
//the Ctor or some initialise method...
public Foo(IObservable<int> source)
{
_replaySource = source.Replay();
_replayConnection = _replaySource.Connect()
}
public void SetFilter(Func<int, bool> predicate)
{
//Not thread safe. If required, then a scheduler can solve that.
_values.Clear();
_subscription.Disposable = _replaySource.Where(predicate)
.Subscribe(value => _values.Add(value),
ex => {/*Do something here to handle errors*/},
() => {/*Do something here if you want to cater for completion of the sequence*/},
}
}
然而,这恰恰让我更关心2点。如果您期望数百万个值,那么如果它们只是整数,那么您将使用大约 3MB/100 万个内存项。如果它们是整数,则按值语义复制,您将在重播缓冲区和最终列表 (IIRC) 中获得副本。如果这种内存压力没问题,那我觉得上面的Replay
代码就可以了。另请注意,如果您尝试缓冲超过 int.MaxValue
个值,此 Replay 用法将抛出。
这是一个很好的扩展方法,可以满足您的需要:
public static IObservable<T> Refilterable<T>(
this IObservable<T> source, IObservable<Func<T, bool>> filters)
{
return
Observable
.Create<T>(o =>
{
var replay = new ReplaySubject<T>();
var replaySubscription = source.Subscribe(replay);
var query = filters.Select(f => replay.Where(f)).Switch();
var querySubscription = query.Subscribe(o);
return new CompositeDisposable(replaySubscription, querySubscription);
});
}
我用这段代码测试了这个:
var source = new Subject<int>();
var filters = new Subject<Func<int, bool>>();
var subscription = source.Refilterable(filters).Subscribe(x => Console.WriteLine(x));
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);
filters.OnNext(x => x % 2 == 0);
source.OnNext(4);
source.OnNext(5);
filters.OnNext(x => x % 2 == 1);
source.OnNext(6);
filters.OnNext(x => x % 3 == 0);
source.OnNext(7);
filters.OnNext(x => x % 2 == 1);
subscription.Dispose();
filters.OnNext(x => x % 2 == 0);
我得到了这个输出:
2 4 1 3 5 3 6 1 3 5 7
这似乎是你想要的。
我刚刚注意到生成列表的要求。这是一个更新:
public static IObservable<IList<T>> Refilterable<T>(this IObservable<T> source, IObservable<Func<T, bool>> filters)
{
return
Observable
.Create<IList<T>>(o =>
{
var replay = new ReplaySubject<T>();
var replaySubscription = source.Subscribe(replay);
var query =
filters
.Select(f =>
replay
.Synchronize()
.Where(f)
.Scan(new List<T>(), (a, x) =>
{
a.Add(x);
return a;
}))
.Switch();
var querySubscription = query.Subscribe(o);
return new CompositeDisposable(replaySubscription, querySubscription);
});
}
我唯一注意到的另一件事是 VB.NET 标签。如果需要的话,我看看能不能稍后转换。
这个应该是对的VB:
<System.Runtime.CompilerServices.Extension> _
Public Shared Function Refilterable(Of T)(source As IObservable(Of T), filters As IObservable(Of Func(Of T, Boolean))) As IObservable(Of IList(Of T))
Return Observable.Create(Of IList(Of T))( _
Function(o)
Dim replay = New ReplaySubject(Of T)()
Dim replaySubscription = source.Subscribe(replay)
Dim query = filters.[Select](Function(f) replay.Synchronize().Where(f).Scan(New List(Of T)(), _
Function(a, x)
a.Add(x)
Return a
End Function)).Switch()
Dim querySubscription = query.Subscribe(o)
Return New CompositeDisposable(replaySubscription, querySubscription)
End Function)
End Function