Reactive Extensions - 切换,但要等到之前的可观察对象完成而不是取消

Reactive Extensions - Switch, but wait until previous observable completes instead cancelling

我有一个由可观察订阅触发的异步加载方法。

我想使用以下策略防止并发调用 LoadAsync:

我几乎可以肯定这个策略有名字,但我想不起来了。

  1. 如果之前的调用尚未完成,则不会调用 LoadAsync。

    • LoadAsync(3) 仅在 LoadAsync(1) 完成后才开始
  2. 仅针对最新值调用 LoadAsync

    • 从未调用过 LoadAsync(2)

我目前使用此 switchmap 等效项,但它取消了 LoadAsync 方法以支持新调用。
Observable<int> sourceObservable..
sourceObservable
   .Select(value => Observable.FromAsync(() => LoadAsync(value)))
   .Switch() 

async Task<int> LoadAsync(int value)
{
    await Task.Delay(1000);
    return value;
}

在 javascript 中将是:

source$.pipe(switchMap(Load))

function load(id): Observable<int> {
    return delay(1000).pipe(map(() => id));
}

编辑:我想知道为什么还没有人要求这种扁平化策略。我认为这是很常见的要求。

例如,这就是 CI/CD 管道通常的工作方式。如果对一个分支的推送太多而你没有能力构建它们,但你想确保构建最新的提交

换句话说,您想从第 2 级可观察对象中取出一个子可观察对象,获取所有结果并等待其完成,然后重复。

这将适用于 C#:

public static IObservable<T> SingleSwitch<T, U>(this IObservable<U> source, Func<U, IObservable<T>> selector)
{
    return source.Publish(_source => _source
        .Take(1)
        .SelectMany(e => selector(e))
        .Repeat()
    );
}

请注意,您的 Load 中存在错误。您需要添加 await 才能使延迟正常工作。

示例代码:

Observable.Interval(TimeSpan.FromSeconds(.75))
    .SingleSwitch(e => Observable.FromAsync (async () => {
        await Task.Delay(1000);
        return e;
    }))
    .Dump();

输出是偶数,奇数被阻塞因为偶数是运行。

@TheodorZoulias 提到的有效,但我觉得有点太复杂了。

我写了一个更简单的替代方案,但我正在寻找最合适的名称

/// <summary>
/// Concatenates most recent inner observable sequence when previous completes.
/// Similar to Concat, but it ignores out of date inner observable sequences.
/// Similar to Exhaust, but it preserves latest inner observable.
/// </summary>
public static IObservable<T> ConcatExhaust<T>(this IObservable<IObservable<T>> source)
{
    return Observable.Defer(() =>
    {
        IObservable<T> latest = default;
        return source
            .Select(inner =>
            {
                latest = inner;
                return Observable.Defer(() => latest == inner ? inner : Observable.Empty<T>());
            })
            .ConcatExhaust();
    });
}

以下测试:

var source = Observable.Interval(TimeSpan.FromMilliseconds(300))
    .Take(5)
    .Do(val => Console.WriteLine($"Source: {val}"));


source.Select(val => Observable.FromAsync(() => LoadAsync(val)))
     .ConcatExhaust()
     .Wait();

returns:

Source: 0
Load Started: 0
Source: 1
Source: 2
Source: 3
Value finished: 0
Load Started: 3
Source: 4
Value finished: 3
Load Started: 4
Value finished: 4