当另一个 Observable 提供值时暂时忽略来自 Observable 的值
Momentarily ignore values from Observable when another Observable provides a value
当另一个 Observable 提供值时,我需要在一段时间内忽略 Observable 值。
目前,我的实现使用变量来控制阻止(或忽略)。
bool block = false;
var blocker = observable1.Do(_ => block = true )
.Throttle( _ => Observable.Timer(_timeToBlock)
.Subscribe( _ => block = false ));
var receiver = observable2.Where( i => !block && SomeCondition(i) )
.Subscribe( i=> EvenMoreStuff(i) );
是否有更多的 Rx 方法通过组合这两个可观察值来做到这一点?
编辑:对拦截器订阅的小改动
第一个任务是将您的 block
变量表示为可观察变量。
IObservable<bool> blockObservable = observable1
.Select(x => Observable.Concat(
Observable.Return(true),
Observable.Return(false).Delay(_timeToBlock)))
.Switch()
.DistinctUntilChanged();
每次 observable1
发出一个值,我们 select 一个发出 true
的可观察对象,等待 _timeToBlock
,然后发出 false
。 Switch
总是切换到最近的可观察值。
这是弹珠图。假设 _timeToBlock
的长度为 3 个字符。
observable1 -----x--xx-x-----x-----
select0 T--F
select1 T--F
select2 T--F
select3 T--F
select4 T--F
switch -----T--TT-T--F--T--F--
blockObservable -----T--------F--T--F--
现在我们可以 Zip
您的值序列与 blockObservable
的 MostRecent
值。
var receiverObservable = observable2
.Zip(blockObservable.MostRecent(false), (value, block) => new { value, block })
.Where(x => !x.block)
.Select(x => x.value);
作为使用一次性用品的替代方法,您可以创建一个小型扩展方法:
public static IObservable<TResult> Suppress<TResult, TOther>(
this IObservable<TResult> source,
IObservable<TOther> other,
TimeSpan delayFor)
{
return Observable.Create<TResult>(observer => {
var published = source.Publish();
var connected = new SerialDisposable();
Func<IDisposable> connect = () => published.Subscribe(observer);
var suppressor = other.Select(_ => Observable.Timer(delayFor)
.Select(_2 => connect())
.StartWith(Disposable.Empty))
.Switch();
return new CompositeDisposable(
connected,
suppressor.StartWith(connect())
.Subscribe(d => connected.Disposable = d),
published.Connect());
});
}
这会将源 observable 转换为 ConnectableObservable
,然后每次 other
源发出它都会处理其订阅,然后在计时器到期时重新连接。
当另一个 Observable 提供值时,我需要在一段时间内忽略 Observable 值。
目前,我的实现使用变量来控制阻止(或忽略)。
bool block = false;
var blocker = observable1.Do(_ => block = true )
.Throttle( _ => Observable.Timer(_timeToBlock)
.Subscribe( _ => block = false ));
var receiver = observable2.Where( i => !block && SomeCondition(i) )
.Subscribe( i=> EvenMoreStuff(i) );
是否有更多的 Rx 方法通过组合这两个可观察值来做到这一点?
编辑:对拦截器订阅的小改动
第一个任务是将您的 block
变量表示为可观察变量。
IObservable<bool> blockObservable = observable1
.Select(x => Observable.Concat(
Observable.Return(true),
Observable.Return(false).Delay(_timeToBlock)))
.Switch()
.DistinctUntilChanged();
每次 observable1
发出一个值,我们 select 一个发出 true
的可观察对象,等待 _timeToBlock
,然后发出 false
。 Switch
总是切换到最近的可观察值。
这是弹珠图。假设 _timeToBlock
的长度为 3 个字符。
observable1 -----x--xx-x-----x-----
select0 T--F
select1 T--F
select2 T--F
select3 T--F
select4 T--F
switch -----T--TT-T--F--T--F--
blockObservable -----T--------F--T--F--
现在我们可以 Zip
您的值序列与 blockObservable
的 MostRecent
值。
var receiverObservable = observable2
.Zip(blockObservable.MostRecent(false), (value, block) => new { value, block })
.Where(x => !x.block)
.Select(x => x.value);
作为使用一次性用品的替代方法,您可以创建一个小型扩展方法:
public static IObservable<TResult> Suppress<TResult, TOther>(
this IObservable<TResult> source,
IObservable<TOther> other,
TimeSpan delayFor)
{
return Observable.Create<TResult>(observer => {
var published = source.Publish();
var connected = new SerialDisposable();
Func<IDisposable> connect = () => published.Subscribe(observer);
var suppressor = other.Select(_ => Observable.Timer(delayFor)
.Select(_2 => connect())
.StartWith(Disposable.Empty))
.Switch();
return new CompositeDisposable(
connected,
suppressor.StartWith(connect())
.Subscribe(d => connected.Disposable = d),
published.Connect());
});
}
这会将源 observable 转换为 ConnectableObservable
,然后每次 other
源发出它都会处理其订阅,然后在计时器到期时重新连接。