RxJava:window/buffer 重载问题,与 Rx.NET 不兼容?

RxJava: window/buffer overload questions, incompatibility with Rx.NET?

我正在阅读 http://www.introtorx.com/content/v1.0.10621.0/17_SequencesOfCoincidence.html#SequencesOfCoincidence 并在其中找到有关 window 运算符的以下部分:

"The first of these complex overloads allows us to control when windows should close. The windowClosingSelector function is called each time a window is created. Windows are created on subscription and immediately after a window closes; windows close when the sequence from the windowClosingSelector produces a value. The value is disregarded so it doesn't matter what type the sequence values are; in fact you can just complete the sequence from windowClosingSelector to close the window instead."

这似乎与 public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector) 方法文档和行为不同。我的测试表明,用于生成结束可观察对象的 Func0 仅被调用一次,完成上述可观察对象的工作也不会被调用 - 只有一个 window 被创建。该方法实际上与 public final <U> Observable<Observable<T>> window(Observable<U> boundary) 方法的工作原理相同,只是边界是直接传递的,而不是使用函数。从源代码来看,latter/simpler 方法只是通过将传递的边界包装到 Func0 中来调用 former/more 复杂的方法,而 Func0 只是 returns 它。

问题:

  1. RxJava 的 public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector) 的行为似乎是错误的,除非这本书是错误的或者 Rx.NET 的新版本,与 RxJava 兼容,改变了。这是 Rx.NET 和 RxJava 之间故意不兼容吗?

  2. 如果不兼容是正确的行为,为什么 RxJava 有 2 个 window 重载实际上做的几乎一样?如果它不接受任何参数并且将被调用一次,我没有看到使用创建 window 关闭可观察对象的函数进行重载的任何明显优势。好的,它使可观察对象的创建变得懒惰,但我认为这可以通过 defer 方法来实现。

  3. 更令人困惑的是,两个重载的弹珠图略有不同(箭头是)。也许我只是不明白那里真正的区别?

buffer 方法存在相同的两个重载。

这是一些代码,或多或少是上述文章的移植。每行输入关闭当前并打开一个新的 window,除非它是 'q'(不区分大小写),在这种情况下整个事情结束。

Observable<Long> source = Observable.interval(1L, TimeUnit.SECONDS);
PublishSubject<Object> closer = PublishSubject.create();

Subscription s = source
        .window(() -> {
            System.out.println("!!! creating window closer");
            return closer;
        })
        .subscribe(new Subscriber<Observable<Long>>() {

            private int idx = 0;

            @Override
            public void onNext(Observable<Long> window) {
                ++idx;
                System.out.printf("+++ starting new window%n");
                String windowName = "window " + idx;
                window.subscribe(new Subscriber<Long>() {

                    @Override
                    public void onNext(Long aLong) {
                        System.out.printf("%s -> %d%n", windowName, aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                        // nothing
                    }

                    @Override
                    public void onCompleted() {
                        System.out.printf("--- %s completed%n", windowName);
                    }
                });
            }

            @Override
            public void onError(Throwable e) {
                // nothing
            }

            @Override
            public void onCompleted() {
                System.out.println("completed");
            }
        });

Scanner scanner = new Scanner(System.in);
String input;
do {
    input = scanner.nextLine();
    closer.onNext(input);
    //closer.onCompleted();
} while (!"q".equalsIgnoreCase(input));
s.unsubscribe();

closer.onNext() 行替换为 closer.onCompleted() 会破坏应用程序 - 只会创建一个 window。

按照建议,我在 Github 页面上提出了这个问题,它已得到解决:https://github.com/ReactiveX/RxJava/issues/3053