可观察:在源完成之前以间隔获取最新值

Observable: Getting latest value in intervals until source finishes

我正在寻找具有类似于此签名的可观察选择器:

static IObservable<T> TakeLatest(this IObservable<T> input, TimeSpan interval)

哪个应该:

  1. 输入发出第一项后立即发出第一项
  2. 从那时起,在之后的固定时间间隔内,发出输入产生的最新项目
  3. 每当输入完成(或失败)时完成(或失败)

就弹珠而言,类似于以下内容 - 假设间隔 = 2 个时间单位:

Time 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
Input A B C D E F (complete)
Output A B D D E E complete (F not emitted anymore)

是否有任何开箱即用的方法,或产生这些结果的相当简单的选择器?

我现在已经完成了以下操作 - 我 认为 它有效,但我会打开它以防任何人能想到更优雅的方式(或可以想到我当前的实施存在问题)

static IObservable<T> TakeLatest(this IObservable<T> input, TimeSpan interval, IScheduler scheduler) => input
  .FirstAsync()
  .Select(_ => Observable.Interval(interval, scheduler).StartWith(0))
  .Switch()
  .CombineLatest(input, (a,b) => (a,b))
  .DistinctUntilChanged(x => x.a)
  .Select(x => x.b)
  .TakeUntil(input.LastAsync());

这应该完全符合您的要求。不过我还没有测试过。

/// <summary>Samples the source observable sequence at each interval,
/// allowing repeated emissions of the same element.</summary>
public static IObservable<T> SampleWithDuplicates<T>(this IObservable<T> source,
    TimeSpan interval, IScheduler scheduler = null)
{
    scheduler ??= DefaultScheduler.Instance;
    return source.Publish(published => Observable
        .Interval(interval, scheduler)
        .WithLatestFrom(published, (_, x) => x)
        .Merge(published.FirstAsync())
        .TakeUntil(published.LastOrDefaultAsync()));
}