Reactive Extensions - 切换,但要等到之前的可观察对象完成而不是取消
Reactive Extensions - Switch, but wait until previous observable completes instead cancelling
我有一个由可观察订阅触发的异步加载方法。
我想使用以下策略防止并发调用 LoadAsync:
我几乎可以肯定这个策略有名字,但我想不起来了。
如果之前的调用尚未完成,则不会调用 LoadAsync。
- LoadAsync(3) 仅在 LoadAsync(1) 完成后才开始
仅针对最新值调用 LoadAsync
- 从未调用过 LoadAsync(2)
我目前使用此 switchmap 等效项,但它取消了 LoadAsync 方法以支持新调用。
Observable<int> sourceObservable..
sourceObservable
.Select(value => Observable.FromAsync(() => LoadAsync(value)))
.Switch()
async Task<int> LoadAsync(int value)
{
await Task.Delay(1000);
return value;
}
在 javascript 中将是:
source$.pipe(switchMap(Load))
function load(id): Observable<int> {
return delay(1000).pipe(map(() => id));
}
编辑:我想知道为什么还没有人要求这种扁平化策略。我认为这是很常见的要求。
例如,这就是 CI/CD 管道通常的工作方式。如果对一个分支的推送太多而你没有能力构建它们,但你想确保构建最新的提交
换句话说,您想从第 2 级可观察对象中取出一个子可观察对象,获取所有结果并等待其完成,然后重复。
这将适用于 C#:
public static IObservable<T> SingleSwitch<T, U>(this IObservable<U> source, Func<U, IObservable<T>> selector)
{
return source.Publish(_source => _source
.Take(1)
.SelectMany(e => selector(e))
.Repeat()
);
}
请注意,您的 Load
中存在错误。您需要添加 await
才能使延迟正常工作。
示例代码:
Observable.Interval(TimeSpan.FromSeconds(.75))
.SingleSwitch(e => Observable.FromAsync (async () => {
await Task.Delay(1000);
return e;
}))
.Dump();
输出是偶数,奇数被阻塞因为偶数是运行。
@TheodorZoulias 提到的有效,但我觉得有点太复杂了。
我写了一个更简单的替代方案,但我正在寻找最合适的名称
/// <summary>
/// Concatenates most recent inner observable sequence when previous completes.
/// Similar to Concat, but it ignores out of date inner observable sequences.
/// Similar to Exhaust, but it preserves latest inner observable.
/// </summary>
public static IObservable<T> ConcatExhaust<T>(this IObservable<IObservable<T>> source)
{
return Observable.Defer(() =>
{
IObservable<T> latest = default;
return source
.Select(inner =>
{
latest = inner;
return Observable.Defer(() => latest == inner ? inner : Observable.Empty<T>());
})
.ConcatExhaust();
});
}
以下测试:
var source = Observable.Interval(TimeSpan.FromMilliseconds(300))
.Take(5)
.Do(val => Console.WriteLine($"Source: {val}"));
source.Select(val => Observable.FromAsync(() => LoadAsync(val)))
.ConcatExhaust()
.Wait();
returns:
Source: 0
Load Started: 0
Source: 1
Source: 2
Source: 3
Value finished: 0
Load Started: 3
Source: 4
Value finished: 3
Load Started: 4
Value finished: 4
我有一个由可观察订阅触发的异步加载方法。
我想使用以下策略防止并发调用 LoadAsync:
我几乎可以肯定这个策略有名字,但我想不起来了。
如果之前的调用尚未完成,则不会调用 LoadAsync。
- LoadAsync(3) 仅在 LoadAsync(1) 完成后才开始
仅针对最新值调用 LoadAsync
- 从未调用过 LoadAsync(2)
我目前使用此 switchmap 等效项,但它取消了 LoadAsync 方法以支持新调用。
Observable<int> sourceObservable..
sourceObservable
.Select(value => Observable.FromAsync(() => LoadAsync(value)))
.Switch()
async Task<int> LoadAsync(int value)
{
await Task.Delay(1000);
return value;
}
在 javascript 中将是:
source$.pipe(switchMap(Load))
function load(id): Observable<int> {
return delay(1000).pipe(map(() => id));
}
编辑:我想知道为什么还没有人要求这种扁平化策略。我认为这是很常见的要求。
例如,这就是 CI/CD 管道通常的工作方式。如果对一个分支的推送太多而你没有能力构建它们,但你想确保构建最新的提交
换句话说,您想从第 2 级可观察对象中取出一个子可观察对象,获取所有结果并等待其完成,然后重复。
这将适用于 C#:
public static IObservable<T> SingleSwitch<T, U>(this IObservable<U> source, Func<U, IObservable<T>> selector)
{
return source.Publish(_source => _source
.Take(1)
.SelectMany(e => selector(e))
.Repeat()
);
}
请注意,您的 Load
中存在错误。您需要添加 await
才能使延迟正常工作。
示例代码:
Observable.Interval(TimeSpan.FromSeconds(.75))
.SingleSwitch(e => Observable.FromAsync (async () => {
await Task.Delay(1000);
return e;
}))
.Dump();
输出是偶数,奇数被阻塞因为偶数是运行。
我写了一个更简单的替代方案,但我正在寻找最合适的名称
/// <summary>
/// Concatenates most recent inner observable sequence when previous completes.
/// Similar to Concat, but it ignores out of date inner observable sequences.
/// Similar to Exhaust, but it preserves latest inner observable.
/// </summary>
public static IObservable<T> ConcatExhaust<T>(this IObservable<IObservable<T>> source)
{
return Observable.Defer(() =>
{
IObservable<T> latest = default;
return source
.Select(inner =>
{
latest = inner;
return Observable.Defer(() => latest == inner ? inner : Observable.Empty<T>());
})
.ConcatExhaust();
});
}
以下测试:
var source = Observable.Interval(TimeSpan.FromMilliseconds(300))
.Take(5)
.Do(val => Console.WriteLine($"Source: {val}"));
source.Select(val => Observable.FromAsync(() => LoadAsync(val)))
.ConcatExhaust()
.Wait();
returns:
Source: 0
Load Started: 0
Source: 1
Source: 2
Source: 3
Value finished: 0
Load Started: 3
Source: 4
Value finished: 3
Load Started: 4
Value finished: 4