使用 Reactive Extensions (RX),是否可以添加 "Pause" 命令?

With Reactive Extensions (RX), is it possible to add a "Pause" command?

我有一个 class,它接受一个事件流,并推出另一个事件流。

所有事件都使用响应式扩展 (RX)。使用 .OnNext 将传入事件流从外部源推送到 IObserver<T>,并使用 IObservable<T>.Subscribe 将传出事件流推出。我正在使用 Subject<T> 在幕后管理它。

我想知道RX中有什么技术可以暂时暂停输出。这意味着传入事件将在内部队列中累积,并且当它们未暂停时,事件将再次流出。

您可以用 Observable 模拟 pausing/unpausing。

一旦您的 pauseObservable 发出一个 'paused' 值,缓冲事件直到 pauseObservable 发出一个 'unpaused' 值。

这是一个使用 BufferUntil implementation by Dave Sexton and 的例子(来自我自己的问题)

        //Input events, hot observable
        var source = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(i => i.ToString())
            .Publish().RefCount();

       //Simulate pausing from Keyboard, not actually relevant within this answer
        var pauseObservable = Observable.FromEventPattern<KeyPressEventHandler, KeyPressEventArgs>(
            k => KeyPressed += k, k => KeyPressed -= k)
            .Select(i => i.EventArgs.PressedKey)
            .Select(i => i == ConsoleKey.Spacebar) //space is pause, others are unpause
            .DistinctUntilChanged();

        //Let events through when not paused
        var notPausedEvents = source.Zip(pauseObservable.MostRecent(false), (value, paused) => new {value, paused})
            .Where(i => !i.paused) //not paused
            .Select(i => i.value)
            .Subscribe(Console.WriteLine);

        //When paused, buffer until not paused
        var pausedEvents = pauseObservable.Where(i => i)
            .Subscribe(_ =>
                source.BufferUntil(pauseObservable.Where(i => !i))
                    .Select(i => String.Join(Environment.NewLine, i))
                    .Subscribe(Console.WriteLine));

改进空间:可能将两个订阅源(pausedEvents 和 notPausedEvents)合并为一个。

这里有一个相当简单的 Rx 方法来做你想做的事。我创建了一个名为 Pausable 的扩展方法,它采用一个源可观察对象和一个 boolean 的第二个可观察对象,用于暂停或恢复可观察对象。

public static IObservable<T> Pausable<T>(
    this IObservable<T> source,
    IObservable<bool> pauser)
{
    return Observable.Create<T>(o =>
    {
        var paused = new SerialDisposable();
        var subscription = Observable.Publish(source, ps =>
        {
            var values = new ReplaySubject<T>();
            Func<bool, IObservable<T>> switcher = b =>
            {
                if (b)
                {
                    values.Dispose();
                    values = new ReplaySubject<T>();
                    paused.Disposable = ps.Subscribe(values);
                    return Observable.Empty<T>();
                }
                else
                {
                    return values.Concat(ps);
                }
            };

            return pauser.StartWith(false).DistinctUntilChanged()
                .Select(p => switcher(p))
                .Switch();
        }).Subscribe(o);
        return new CompositeDisposable(subscription, paused);
    });
}

可以这样使用:

var xs = Observable.Generate(
    0,
    x => x < 100,
    x => x + 1,
    x => x,
    x => TimeSpan.FromSeconds(0.1));

var bs = new Subject<bool>();

var pxs = xs.Pausable(bs);

pxs.Subscribe(x => { /* Do stuff */ });

Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);

现在,我唯一不太明白你所说的 "incoming stream of events is an IObserver<T>" 是什么意思。流是 IObservable<T>。观察者不是流。听起来你没有在这里做某事。你能补充你的问题并进一步解释吗?

这是我使用 Buffer 和 Window 运算符的解决方案:

public static IObservable<T> Pausable<T>(this IObservable<T> source, IObservable<bool> pauser)
{
    var queue = source.Buffer(pauser.Where(toPause => !toPause),
                              _ => pauser.Where(toPause => toPause))
                      .SelectMany(l => l.ToObservable());

    return source.Window(pauser.Where(toPause => toPause).StartWith(true), 
                         _ => pauser.Where(toPause => !toPause))
                 .Switch()
                 .Merge(queue);
}

Window 在订阅时打开,每次从暂停流接收到 'true'。当 pauser 提供 'false' 值时它关闭。

Buffer 做它应该做的事情,缓冲来自 pauser 'false' 和 'true' 之间的值。一旦 Buffer 接收到 'true',它会立即立即输出 IList 值。

DotNetFiddle link: https://dotnetfiddle.net/vGU5dJ