将 Observable 拆分为 windows

Splitting Observable into windows

我正在寻找 Observable.Window 类似运算符的 window 选择器

例如,我们可以取一个自然数序列。 我想知道如何将此序列拆分为 windows,以便在数字大于 4 或 window 大小已达到 3

时每个新的 window 开始

输入序列是IObservable<int> 输出序列为IObservable<IObservable<int>>

序列 1 2 5 3 1 1 2 3 1 5 0 将产生 windows 1 2; 5 3 1; 1 2 3; 1; 5 0

使用 C# 这可行:

var observable =
    source
        .Concat(Observable.Return(-1))
        .Publish(sp =>
            sp.Zip(sp.Skip(1), (x0, x1) => new { x0, x1 })
                .Publish(zsp =>
                    zsp
                        .Window(zsp.Where(x => x.x1 >= 4))
                        .Select(xs => xs.Select(x => x.x0).Window(3))
                        .Merge()));

我得到这个结果:

根据我对问题的评论,我不确定序列 1, 2, 3, 5 是否应该产生 windows 1 2 3 | 51 2 3 | | 5(请参阅空 window 这就是 Enigmativity 的答案所产生的)。我的回答不会产生空 window:

public static IObservable<IObservable<T>> Window<T>(this IObservable<T> source, Func<T, bool> predicate, int maximumWindowSize)
{
    return Observable.Create<IObservable<T>>(obs =>
    {
        var currentWindow = new Subject<T>();
        obs.OnNext(currentWindow);

        var count = 0;

        return source.Subscribe(x =>
        {
            if (count == maximumWindowSize || predicate(x))
            {
                count = 0;
                currentWindow.OnCompleted();
                currentWindow = new Subject<T>();
                obs.OnNext(currentWindow);
            }
            currentWindow.OnNext(x);
            count++;
        }, obs.OnError, () =>
        {
            obs.OnCompleted();
            currentWindow.OnCompleted();
        });
    });
}

可以这样使用:

var windows = source.Window(x => x > 4, 3);