"Neverending" TakeWhile、BufferWhile 和 SkipWhile RX.Net 序列
"Neverending" TakeWhile, BufferWhile and SkipWhile RX.Net Sequences
我想知道是否有一种方法可以获取可观察流并使用 *While 运算符,尤其是 TakeWhile、SkipWhile 和 BufferWhile,以便它们的订阅者在 bool 'while' 条件是否满足?
当我开始使用 .TakeWhile / SkipWhile 和 BufferWhile 运算符时,我假设它们不会终止 / .OnComplete(),而只是(不)在满足 bool 条件时发出。
举个例子可能更有意义:
我有一个 bool 标志,它指示一个实例是否忙碌,以及一个可观察的数据流:
private bool IsBusy { get;set; }
private bool IgnoreChanges { get;set; }
private IObservable<int> Producer { get;set; }
private IDisposable ConsumerSubscription { get;set; }
.. 并像那样使用/设置 RX 流(简化)
private void SetupRx()
{
ConsumerSubscription = Producer
.SkipWhile(_ => IgnoreChanges == true) // Drop the producer's stream of ints whenever the IgnoreChanges flag is set to true, but forward them whenever the IgnoreChanges flag is set to false
.BufferWhile(_ => IsBusy == true) // for all streamed instances buffer them as long as we are busy handling the previous one(s)
.Subscribe(i => DoSomething(i));
}
private void DoSomething(int i)
{
try
{
IsBusy = true;
// ... do something
}
finally
{
IsBusy = false;
}
}
每当 IsBusy/IgnoreChanges 标志从 true 切换到 false 并返回时,.SkipeWhile/.BufferWhile 不应完成/OnComplete(..) 但保持流活动。
开箱即用 RX.Net 是否可行 and/or 有人知道如何实现吗?
要删除来自 IObservable<T>
源的 OnCompleted
消息,只需 Concat
和 Observable.Never<T>()
:
source.TakeWhile(condition).Concat(Observable.Never<T>())
要手动订阅 IObservable<T>
来源,以便仅在您手动取消订阅时结束订阅,您可以使用 Publish
和 IConnectableObservable<T>
:
var connectableSource = source.Publish();
// To subscribe to the source:
var subscription = connectableSource.Connect();
...
// To unsubscribe from the source:
subscription.Dispose();
综上所述,我认为您的处理方式不正确。如果操作正确,您将不需要上述技巧。查看您的查询:
ConsumerSubscription = Producer
// Drop the producer's stream of ints whenever the IgnoreChanges flag
// is set to true, but forward them whenever the IgnoreChanges flag is set to false
.SkipWhile(_ => IgnoreChanges == true)
// For all streamed instances buffer them as long as we are busy
// handling the previous one(s)
.BufferWhile(_ => IsBusy == true)
.Subscribe(i => DoSomething(i));
您应该使用 .Where(_ => !IgnoreChanges)
而不是 .SkipWhile(_ => IgnoreChanges)
。
您应该使用 .Buffer(_ => IsBusy.SkipWhile(busy => busy))
和 BehaviorSubject<bool> IsBusy
而不是 .BufferWhile(_ => IsBusy)
。
完整的代码如下所示:
private BehaviorSubject<bool> IsBusy { get;set; }
private bool IgnoreChanges { get;set; }
private IObservable<int> Producer { get;set; }
private IDisposable ConsumerSubscription { get;set; }
private void SetupRx()
{
ConsumerSubscription = Producer
.Where(_ => !IgnoreChanges)
.Buffer(_ => IsBusy.SkipWhile(busy => busy))
.Subscribe(buffer => DoSomething(buffer));
}
private void DoSomething(IList<int> buffer)
{
try
{
IsBusy.OnNext(true);
// Do something
}
finally
{
IsBusy.OnNext(false);
}
}
下一个改进是尝试摆脱 BehaviorSubject<bool> IsBusy
。主题是您要尽量避免的东西,因为它们是您必须管理的状态。
我想知道是否有一种方法可以获取可观察流并使用 *While 运算符,尤其是 TakeWhile、SkipWhile 和 BufferWhile,以便它们的订阅者在 bool 'while' 条件是否满足?
当我开始使用 .TakeWhile / SkipWhile 和 BufferWhile 运算符时,我假设它们不会终止 / .OnComplete(),而只是(不)在满足 bool 条件时发出。
举个例子可能更有意义:
我有一个 bool 标志,它指示一个实例是否忙碌,以及一个可观察的数据流:
private bool IsBusy { get;set; }
private bool IgnoreChanges { get;set; }
private IObservable<int> Producer { get;set; }
private IDisposable ConsumerSubscription { get;set; }
.. 并像那样使用/设置 RX 流(简化)
private void SetupRx()
{
ConsumerSubscription = Producer
.SkipWhile(_ => IgnoreChanges == true) // Drop the producer's stream of ints whenever the IgnoreChanges flag is set to true, but forward them whenever the IgnoreChanges flag is set to false
.BufferWhile(_ => IsBusy == true) // for all streamed instances buffer them as long as we are busy handling the previous one(s)
.Subscribe(i => DoSomething(i));
}
private void DoSomething(int i)
{
try
{
IsBusy = true;
// ... do something
}
finally
{
IsBusy = false;
}
}
每当 IsBusy/IgnoreChanges 标志从 true 切换到 false 并返回时,.SkipeWhile/.BufferWhile 不应完成/OnComplete(..) 但保持流活动。
开箱即用 RX.Net 是否可行 and/or 有人知道如何实现吗?
要删除来自 IObservable<T>
源的 OnCompleted
消息,只需 Concat
和 Observable.Never<T>()
:
source.TakeWhile(condition).Concat(Observable.Never<T>())
要手动订阅 IObservable<T>
来源,以便仅在您手动取消订阅时结束订阅,您可以使用 Publish
和 IConnectableObservable<T>
:
var connectableSource = source.Publish();
// To subscribe to the source:
var subscription = connectableSource.Connect();
...
// To unsubscribe from the source:
subscription.Dispose();
综上所述,我认为您的处理方式不正确。如果操作正确,您将不需要上述技巧。查看您的查询:
ConsumerSubscription = Producer
// Drop the producer's stream of ints whenever the IgnoreChanges flag
// is set to true, but forward them whenever the IgnoreChanges flag is set to false
.SkipWhile(_ => IgnoreChanges == true)
// For all streamed instances buffer them as long as we are busy
// handling the previous one(s)
.BufferWhile(_ => IsBusy == true)
.Subscribe(i => DoSomething(i));
您应该使用 .Where(_ => !IgnoreChanges)
而不是 .SkipWhile(_ => IgnoreChanges)
。
您应该使用 .Buffer(_ => IsBusy.SkipWhile(busy => busy))
和 BehaviorSubject<bool> IsBusy
而不是 .BufferWhile(_ => IsBusy)
。
完整的代码如下所示:
private BehaviorSubject<bool> IsBusy { get;set; }
private bool IgnoreChanges { get;set; }
private IObservable<int> Producer { get;set; }
private IDisposable ConsumerSubscription { get;set; }
private void SetupRx()
{
ConsumerSubscription = Producer
.Where(_ => !IgnoreChanges)
.Buffer(_ => IsBusy.SkipWhile(busy => busy))
.Subscribe(buffer => DoSomething(buffer));
}
private void DoSomething(IList<int> buffer)
{
try
{
IsBusy.OnNext(true);
// Do something
}
finally
{
IsBusy.OnNext(false);
}
}
下一个改进是尝试摆脱 BehaviorSubject<bool> IsBusy
。主题是您要尽量避免的东西,因为它们是您必须管理的状态。