RxJava:window/buffer 重载问题,与 Rx.NET 不兼容?
RxJava: window/buffer overload questions, incompatibility with Rx.NET?
我正在阅读 http://www.introtorx.com/content/v1.0.10621.0/17_SequencesOfCoincidence.html#SequencesOfCoincidence 并在其中找到有关 window
运算符的以下部分:
"The first of these complex overloads allows us to control when windows should close. The windowClosingSelector function is called each time a window is created. Windows are created on subscription and immediately after a window closes; windows close when the sequence from the windowClosingSelector produces a value. The value is disregarded so it doesn't matter what type the sequence values are; in fact you can just complete the sequence from windowClosingSelector to close the window instead."
这似乎与 public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector)
方法文档和行为不同。我的测试表明,用于生成结束可观察对象的 Func0
仅被调用一次,完成上述可观察对象的工作也不会被调用 - 只有一个 window 被创建。该方法实际上与 public final <U> Observable<Observable<T>> window(Observable<U> boundary)
方法的工作原理相同,只是边界是直接传递的,而不是使用函数。从源代码来看,latter/simpler 方法只是通过将传递的边界包装到 Func0 中来调用 former/more 复杂的方法,而 Func0 只是 returns 它。
问题:
RxJava 的 public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector)
的行为似乎是错误的,除非这本书是错误的或者 Rx.NET 的新版本,与 RxJava 兼容,改变了。这是 Rx.NET 和 RxJava 之间故意不兼容吗?
如果不兼容是正确的行为,为什么 RxJava 有 2 个 window
重载实际上做的几乎一样?如果它不接受任何参数并且将被调用一次,我没有看到使用创建 window 关闭可观察对象的函数进行重载的任何明显优势。好的,它使可观察对象的创建变得懒惰,但我认为这可以通过 defer
方法来实现。
更令人困惑的是,两个重载的弹珠图略有不同(箭头是)。也许我只是不明白那里真正的区别?
buffer
方法存在相同的两个重载。
这是一些代码,或多或少是上述文章的移植。每行输入关闭当前并打开一个新的 window,除非它是 'q'(不区分大小写),在这种情况下整个事情结束。
Observable<Long> source = Observable.interval(1L, TimeUnit.SECONDS);
PublishSubject<Object> closer = PublishSubject.create();
Subscription s = source
.window(() -> {
System.out.println("!!! creating window closer");
return closer;
})
.subscribe(new Subscriber<Observable<Long>>() {
private int idx = 0;
@Override
public void onNext(Observable<Long> window) {
++idx;
System.out.printf("+++ starting new window%n");
String windowName = "window " + idx;
window.subscribe(new Subscriber<Long>() {
@Override
public void onNext(Long aLong) {
System.out.printf("%s -> %d%n", windowName, aLong);
}
@Override
public void onError(Throwable e) {
// nothing
}
@Override
public void onCompleted() {
System.out.printf("--- %s completed%n", windowName);
}
});
}
@Override
public void onError(Throwable e) {
// nothing
}
@Override
public void onCompleted() {
System.out.println("completed");
}
});
Scanner scanner = new Scanner(System.in);
String input;
do {
input = scanner.nextLine();
closer.onNext(input);
//closer.onCompleted();
} while (!"q".equalsIgnoreCase(input));
s.unsubscribe();
将 closer.onNext()
行替换为 closer.onCompleted()
会破坏应用程序 - 只会创建一个 window。
按照建议,我在 Github 页面上提出了这个问题,它已得到解决:https://github.com/ReactiveX/RxJava/issues/3053。
我正在阅读 http://www.introtorx.com/content/v1.0.10621.0/17_SequencesOfCoincidence.html#SequencesOfCoincidence 并在其中找到有关 window
运算符的以下部分:
"The first of these complex overloads allows us to control when windows should close. The windowClosingSelector function is called each time a window is created. Windows are created on subscription and immediately after a window closes; windows close when the sequence from the windowClosingSelector produces a value. The value is disregarded so it doesn't matter what type the sequence values are; in fact you can just complete the sequence from windowClosingSelector to close the window instead."
这似乎与 public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector)
方法文档和行为不同。我的测试表明,用于生成结束可观察对象的 Func0
仅被调用一次,完成上述可观察对象的工作也不会被调用 - 只有一个 window 被创建。该方法实际上与 public final <U> Observable<Observable<T>> window(Observable<U> boundary)
方法的工作原理相同,只是边界是直接传递的,而不是使用函数。从源代码来看,latter/simpler 方法只是通过将传递的边界包装到 Func0 中来调用 former/more 复杂的方法,而 Func0 只是 returns 它。
问题:
RxJava 的
public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector)
的行为似乎是错误的,除非这本书是错误的或者 Rx.NET 的新版本,与 RxJava 兼容,改变了。这是 Rx.NET 和 RxJava 之间故意不兼容吗?如果不兼容是正确的行为,为什么 RxJava 有 2 个
window
重载实际上做的几乎一样?如果它不接受任何参数并且将被调用一次,我没有看到使用创建 window 关闭可观察对象的函数进行重载的任何明显优势。好的,它使可观察对象的创建变得懒惰,但我认为这可以通过defer
方法来实现。更令人困惑的是,两个重载的弹珠图略有不同(箭头是)。也许我只是不明白那里真正的区别?
buffer
方法存在相同的两个重载。
这是一些代码,或多或少是上述文章的移植。每行输入关闭当前并打开一个新的 window,除非它是 'q'(不区分大小写),在这种情况下整个事情结束。
Observable<Long> source = Observable.interval(1L, TimeUnit.SECONDS);
PublishSubject<Object> closer = PublishSubject.create();
Subscription s = source
.window(() -> {
System.out.println("!!! creating window closer");
return closer;
})
.subscribe(new Subscriber<Observable<Long>>() {
private int idx = 0;
@Override
public void onNext(Observable<Long> window) {
++idx;
System.out.printf("+++ starting new window%n");
String windowName = "window " + idx;
window.subscribe(new Subscriber<Long>() {
@Override
public void onNext(Long aLong) {
System.out.printf("%s -> %d%n", windowName, aLong);
}
@Override
public void onError(Throwable e) {
// nothing
}
@Override
public void onCompleted() {
System.out.printf("--- %s completed%n", windowName);
}
});
}
@Override
public void onError(Throwable e) {
// nothing
}
@Override
public void onCompleted() {
System.out.println("completed");
}
});
Scanner scanner = new Scanner(System.in);
String input;
do {
input = scanner.nextLine();
closer.onNext(input);
//closer.onCompleted();
} while (!"q".equalsIgnoreCase(input));
s.unsubscribe();
将 closer.onNext()
行替换为 closer.onCompleted()
会破坏应用程序 - 只会创建一个 window。
按照建议,我在 Github 页面上提出了这个问题,它已得到解决:https://github.com/ReactiveX/RxJava/issues/3053。