可观察:在源完成之前以间隔获取最新值
Observable: Getting latest value in intervals until source finishes
我正在寻找具有类似于此签名的可观察选择器:
static IObservable<T> TakeLatest(this IObservable<T> input, TimeSpan interval)
哪个应该:
- 输入发出第一项后立即发出第一项
- 从那时起,在之后的固定时间间隔内,发出输入产生的最新项目
- 每当输入完成(或失败)时完成(或失败)
就弹珠而言,类似于以下内容 - 假设间隔 = 2 个时间单位:
Time
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Input
A
B
C
D
E
F (complete)
Output
A
B
D
D
E
E
complete (F not emitted anymore)
是否有任何开箱即用的方法,或产生这些结果的相当简单的选择器?
我现在已经完成了以下操作 - 我 认为 它有效,但我会打开它以防任何人能想到更优雅的方式(或可以想到我当前的实施存在问题)
static IObservable<T> TakeLatest(this IObservable<T> input, TimeSpan interval, IScheduler scheduler) => input
.FirstAsync()
.Select(_ => Observable.Interval(interval, scheduler).StartWith(0))
.Switch()
.CombineLatest(input, (a,b) => (a,b))
.DistinctUntilChanged(x => x.a)
.Select(x => x.b)
.TakeUntil(input.LastAsync());
这应该完全符合您的要求。不过我还没有测试过。
/// <summary>Samples the source observable sequence at each interval,
/// allowing repeated emissions of the same element.</summary>
public static IObservable<T> SampleWithDuplicates<T>(this IObservable<T> source,
TimeSpan interval, IScheduler scheduler = null)
{
scheduler ??= DefaultScheduler.Instance;
return source.Publish(published => Observable
.Interval(interval, scheduler)
.WithLatestFrom(published, (_, x) => x)
.Merge(published.FirstAsync())
.TakeUntil(published.LastOrDefaultAsync()));
}
我正在寻找具有类似于此签名的可观察选择器:
static IObservable<T> TakeLatest(this IObservable<T> input, TimeSpan interval)
哪个应该:
- 输入发出第一项后立即发出第一项
- 从那时起,在之后的固定时间间隔内,发出输入产生的最新项目
- 每当输入完成(或失败)时完成(或失败)
就弹珠而言,类似于以下内容 - 假设间隔 = 2 个时间单位:
Time | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Input | A | B | C | D | E | F (complete) | |||||||||
Output | A | B | D | D | E | E | complete (F not emitted anymore) |
是否有任何开箱即用的方法,或产生这些结果的相当简单的选择器?
我现在已经完成了以下操作 - 我 认为 它有效,但我会打开它以防任何人能想到更优雅的方式(或可以想到我当前的实施存在问题)
static IObservable<T> TakeLatest(this IObservable<T> input, TimeSpan interval, IScheduler scheduler) => input
.FirstAsync()
.Select(_ => Observable.Interval(interval, scheduler).StartWith(0))
.Switch()
.CombineLatest(input, (a,b) => (a,b))
.DistinctUntilChanged(x => x.a)
.Select(x => x.b)
.TakeUntil(input.LastAsync());
这应该完全符合您的要求。不过我还没有测试过。
/// <summary>Samples the source observable sequence at each interval,
/// allowing repeated emissions of the same element.</summary>
public static IObservable<T> SampleWithDuplicates<T>(this IObservable<T> source,
TimeSpan interval, IScheduler scheduler = null)
{
scheduler ??= DefaultScheduler.Instance;
return source.Publish(published => Observable
.Interval(interval, scheduler)
.WithLatestFrom(published, (_, x) => x)
.Merge(published.FirstAsync())
.TakeUntil(published.LastOrDefaultAsync()));
}