可自定义的流过滤
Customizable filtering on a stream
我需要在流上配置过滤器列表。
流最初没有任何过滤器,但在一段时间内它有一个活动过滤器列表。
每个过滤器都有严格的有效期。
测试用例如下:
var scheduler = new TestScheduler();
var input = scheduler.CreateColdObservable<char>(
ReactiveTest.OnNext(0100.Ms(), '1'),
ReactiveTest.OnNext(0200.Ms(), 'A'),
ReactiveTest.OnNext(0300.Ms(), '2'),
ReactiveTest.OnNext(0400.Ms(), 'B'),
ReactiveTest.OnNext(0500.Ms(), 'A'),
ReactiveTest.OnNext(0600.Ms(), 'B'),
ReactiveTest.OnNext(0700.Ms(), '5'),
ReactiveTest.OnNext(0800.Ms(), 'A'),
ReactiveTest.OnNext(0900.Ms(), 'C') );
// filters
// A between 70ms -> 550ms
// B between 330ms -> 400ms
// modeled as a string observable where:
// first word is the char to filter
// second word are the msecs duration of the filter
var filters = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(0070.Ms(), "A 480"),
ReactiveTest.OnNext(0330.Ms(), "B 70")
);
var expected = scheduler.CreateColdObservable<char>(
ReactiveTest.OnNext(0100.Ms(), '1'),
ReactiveTest.OnNext(0300.Ms(), '2'),
ReactiveTest.OnNext(0600.Ms(), 'B'),
ReactiveTest.OnNext(0700.Ms(), '5'),
ReactiveTest.OnNext(0800.Ms(), 'A'),
ReactiveTest.OnNext(0900.Ms(), 'C') );
你能建议我最好的 Rx 解决方案吗?
p.s.: 我正在使用以下扩展方法
public static class TickExtensions
{
public static long Ms(this int ms)
{
return TimeSpan.FromMilliseconds(ms).Ticks;
}
}
首先,您的输入应如下所示:
var input = scheduler.CreateColdObservable<char>(
ReactiveTest.OnNext(0100.Ms(), '1'),
ReactiveTest.OnNext(0200.Ms(), 'A'),
ReactiveTest.OnNext(0300.Ms(), '2'),
ReactiveTest.OnNext(0400.Ms(), 'B'), //You forgot this line
ReactiveTest.OnNext(0500.Ms(), 'A'),
ReactiveTest.OnNext(0600.Ms(), 'B'),
ReactiveTest.OnNext(0700.Ms(), '5'),
ReactiveTest.OnNext(0800.Ms(), 'A'),
ReactiveTest.OnNext(0900.Ms(), 'C'));
此外,正如@LeeCampbell 提到的,您的 Ms
扩展方法应该 return 键入 long
,而不是 TimeSpan
。
这是我想出的解决方案:
var activeFilters = filters
.Select(s => s.Split(' '))
.Select(s => Tuple.Create(s[0][0], TimeSpan.FromMilliseconds(int.Parse(s[1]))))
.Select(t => Observable.Timer(t.Item2, scheduler).Select(_ => t.Item1).StartWith(t.Item1))
.MergeCombineLatest(true)
.StartWith(new List<char>());
var output = activeFilters.Publish(_activeFilters =>
input.Join(_activeFilters,
_ => Observable.Return(1),
t => _activeFilters,
(c, filterList) => Tuple.Create(c, filterList)
)
)
.Where(t => !t.Item2.Contains(t.Item1))
.Select(t => t.Item1);
var observer = scheduler.CreateObserver<char>();
output.Subscribe(observer);
scheduler.Start();
ReactiveAssert.AreElementsEqual(
expected.Messages,
observer.Messages);
activeFilters
是一个 observable,它会发出当前处于过滤条件下的字符列表。当主动过滤的列表发生变化时,activeFilters
发出一个新的列表。请注意,由于同一字符可以有多个过滤器,因此列表不一定是唯一的。
一旦您可以在任何给定时间找出哪些过滤器处于活动状态,您就可以将该列表加入到输入中。
代码需要此扩展方法,它使用 System.Collections.Immutable Nuget 包:
public static IObservable<IList<T>> MergeCombineLatest<T>(this IObservable<IObservable<T>> outer, bool removeCompleted)
{
return outer
.SelectMany((inner, i) => inner
.Materialize()
.SelectMany(nt => nt.Kind == NotificationKind.OnNext
? Observable.Return(Tuple.Create(i, nt.Value, true))
: nt.Kind == NotificationKind.OnCompleted
? removeCompleted
? Observable.Return(Tuple.Create(i, default(T), false))
: Observable.Empty<Tuple<int, T, bool>>()
: Observable.Throw<Tuple<int, T, bool>>(nt.Exception)
)
)
.Scan(ImmutableDictionary<int, T>.Empty, (dict, t) => t.Item3 ? dict.SetItem(t.Item1, t.Item2) : dict.Remove(t.Item1))
.Select(dict => dict.Values.ToList());
}
MergeCombineLatest
获取可观察对象的可观察对象,并发出每个子可观察对象的最新值列表。如果 removeCompleted
为真,则当一个子可观察对象完成时,列表会缩小一个。如果 removeCompleted
为 false,则最后一个值将永远保留在列表和后续列表中。
如果有一些更友好的方式来做到这一点,我将不胜感激。
我需要在流上配置过滤器列表。
流最初没有任何过滤器,但在一段时间内它有一个活动过滤器列表。
每个过滤器都有严格的有效期。
测试用例如下:
var scheduler = new TestScheduler();
var input = scheduler.CreateColdObservable<char>(
ReactiveTest.OnNext(0100.Ms(), '1'),
ReactiveTest.OnNext(0200.Ms(), 'A'),
ReactiveTest.OnNext(0300.Ms(), '2'),
ReactiveTest.OnNext(0400.Ms(), 'B'),
ReactiveTest.OnNext(0500.Ms(), 'A'),
ReactiveTest.OnNext(0600.Ms(), 'B'),
ReactiveTest.OnNext(0700.Ms(), '5'),
ReactiveTest.OnNext(0800.Ms(), 'A'),
ReactiveTest.OnNext(0900.Ms(), 'C') );
// filters
// A between 70ms -> 550ms
// B between 330ms -> 400ms
// modeled as a string observable where:
// first word is the char to filter
// second word are the msecs duration of the filter
var filters = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(0070.Ms(), "A 480"),
ReactiveTest.OnNext(0330.Ms(), "B 70")
);
var expected = scheduler.CreateColdObservable<char>(
ReactiveTest.OnNext(0100.Ms(), '1'),
ReactiveTest.OnNext(0300.Ms(), '2'),
ReactiveTest.OnNext(0600.Ms(), 'B'),
ReactiveTest.OnNext(0700.Ms(), '5'),
ReactiveTest.OnNext(0800.Ms(), 'A'),
ReactiveTest.OnNext(0900.Ms(), 'C') );
你能建议我最好的 Rx 解决方案吗?
p.s.: 我正在使用以下扩展方法
public static class TickExtensions
{
public static long Ms(this int ms)
{
return TimeSpan.FromMilliseconds(ms).Ticks;
}
}
首先,您的输入应如下所示:
var input = scheduler.CreateColdObservable<char>(
ReactiveTest.OnNext(0100.Ms(), '1'),
ReactiveTest.OnNext(0200.Ms(), 'A'),
ReactiveTest.OnNext(0300.Ms(), '2'),
ReactiveTest.OnNext(0400.Ms(), 'B'), //You forgot this line
ReactiveTest.OnNext(0500.Ms(), 'A'),
ReactiveTest.OnNext(0600.Ms(), 'B'),
ReactiveTest.OnNext(0700.Ms(), '5'),
ReactiveTest.OnNext(0800.Ms(), 'A'),
ReactiveTest.OnNext(0900.Ms(), 'C'));
此外,正如@LeeCampbell 提到的,您的 Ms
扩展方法应该 return 键入 long
,而不是 TimeSpan
。
这是我想出的解决方案:
var activeFilters = filters
.Select(s => s.Split(' '))
.Select(s => Tuple.Create(s[0][0], TimeSpan.FromMilliseconds(int.Parse(s[1]))))
.Select(t => Observable.Timer(t.Item2, scheduler).Select(_ => t.Item1).StartWith(t.Item1))
.MergeCombineLatest(true)
.StartWith(new List<char>());
var output = activeFilters.Publish(_activeFilters =>
input.Join(_activeFilters,
_ => Observable.Return(1),
t => _activeFilters,
(c, filterList) => Tuple.Create(c, filterList)
)
)
.Where(t => !t.Item2.Contains(t.Item1))
.Select(t => t.Item1);
var observer = scheduler.CreateObserver<char>();
output.Subscribe(observer);
scheduler.Start();
ReactiveAssert.AreElementsEqual(
expected.Messages,
observer.Messages);
activeFilters
是一个 observable,它会发出当前处于过滤条件下的字符列表。当主动过滤的列表发生变化时,activeFilters
发出一个新的列表。请注意,由于同一字符可以有多个过滤器,因此列表不一定是唯一的。
一旦您可以在任何给定时间找出哪些过滤器处于活动状态,您就可以将该列表加入到输入中。
代码需要此扩展方法,它使用 System.Collections.Immutable Nuget 包:
public static IObservable<IList<T>> MergeCombineLatest<T>(this IObservable<IObservable<T>> outer, bool removeCompleted)
{
return outer
.SelectMany((inner, i) => inner
.Materialize()
.SelectMany(nt => nt.Kind == NotificationKind.OnNext
? Observable.Return(Tuple.Create(i, nt.Value, true))
: nt.Kind == NotificationKind.OnCompleted
? removeCompleted
? Observable.Return(Tuple.Create(i, default(T), false))
: Observable.Empty<Tuple<int, T, bool>>()
: Observable.Throw<Tuple<int, T, bool>>(nt.Exception)
)
)
.Scan(ImmutableDictionary<int, T>.Empty, (dict, t) => t.Item3 ? dict.SetItem(t.Item1, t.Item2) : dict.Remove(t.Item1))
.Select(dict => dict.Values.ToList());
}
MergeCombineLatest
获取可观察对象的可观察对象,并发出每个子可观察对象的最新值列表。如果 removeCompleted
为真,则当一个子可观察对象完成时,列表会缩小一个。如果 removeCompleted
为 false,则最后一个值将永远保留在列表和后续列表中。
如果有一些更友好的方式来做到这一点,我将不胜感激。