"Neverending" TakeWhile、BufferWhile 和 SkipWhile RX.Net 序列

"Neverending" TakeWhile, BufferWhile and SkipWhile RX.Net Sequences

我想知道是否有一种方法可以获取可观察流并使用 *While 运算符,尤其是 TakeWhile、SkipWhile 和 BufferWhile,以便它们的订阅者在 bool 'while' 条件是否满足?

当我开始使用 .TakeWhile / SkipWhile 和 BufferWhile 运算符时,我假设它们不会终止 / .OnComplete(),而只是(不)在满足 bool 条件时发出。

举个例子可能更有意义:

我有一个 bool 标志,它指示一个实例是否忙碌,以及一个可观察的数据流:

private bool IsBusy { get;set; }
private bool IgnoreChanges { get;set; }

private IObservable<int> Producer { get;set; }
private IDisposable ConsumerSubscription { get;set; }

.. 并像那样使用/设置 RX 流(简化)

private void SetupRx()
{
    ConsumerSubscription = Producer
        .SkipWhile(_ => IgnoreChanges == true) // Drop the producer's stream of ints whenever the IgnoreChanges flag is set to true, but forward them whenever the IgnoreChanges flag is set to false
        .BufferWhile(_ => IsBusy == true) // for all streamed instances buffer them as long as we are busy handling the previous one(s)
        .Subscribe(i => DoSomething(i));
}

private void DoSomething(int i)
{
    try
    {
        IsBusy = true;
        // ... do something
    }
    finally
    {
        IsBusy = false;
    }
}

每当 IsBusy/IgnoreChanges 标志从 true 切换到 false 并返回时,.SkipeWhile/.BufferWhile 不应完成/OnComplete(..) 但保持流活动。

开箱即用 RX.Net 是否可行 and/or 有人知道如何实现吗?

要删除来自 IObservable<T> 源的 OnCompleted 消息,只需 ConcatObservable.Never<T>():

source.TakeWhile(condition).Concat(Observable.Never<T>())

要手动订阅 IObservable<T> 来源,以便仅在您手动取消订阅时结束订阅,您可以使用 PublishIConnectableObservable<T>:

var connectableSource = source.Publish();
// To subscribe to the source:
var subscription = connectableSource.Connect();
...
// To unsubscribe from the source:
subscription.Dispose();

综上所述,我认为您的处理方式不正确。如果操作正确,您将不需要上述技巧。查看您的查询:

ConsumerSubscription = Producer
    // Drop the producer's stream of ints whenever the IgnoreChanges flag
    // is set to true, but forward them whenever the IgnoreChanges flag is set to false
    .SkipWhile(_ => IgnoreChanges == true) 
    // For all streamed instances buffer them as long as we are busy
    // handling the previous one(s)
    .BufferWhile(_ => IsBusy == true) 
    .Subscribe(i => DoSomething(i));

您应该使用 .Where(_ => !IgnoreChanges) 而不是 .SkipWhile(_ => IgnoreChanges)

您应该使用 .Buffer(_ => IsBusy.SkipWhile(busy => busy))BehaviorSubject<bool> IsBusy 而不是 .BufferWhile(_ => IsBusy)

完整的代码如下所示:

private BehaviorSubject<bool> IsBusy { get;set; }
private bool IgnoreChanges { get;set; }

private IObservable<int> Producer { get;set; }
private IDisposable ConsumerSubscription { get;set; }

private void SetupRx()
{
    ConsumerSubscription = Producer
        .Where(_ => !IgnoreChanges)
        .Buffer(_ => IsBusy.SkipWhile(busy => busy))
        .Subscribe(buffer => DoSomething(buffer));
}

private void DoSomething(IList<int> buffer)
{
    try
    {
        IsBusy.OnNext(true);
        // Do something
    }
    finally
    {
        IsBusy.OnNext(false);
    }
}

下一个改进是尝试摆脱 BehaviorSubject<bool> IsBusy。主题是您要尽量避免的东西,因为它们是您必须管理的状态。