RxJava groupBy 和后续阻塞操作(onComplete 缺失?)

RxJava groupBy and subsequent blocking operations (onComplete missing?)

我已将我的问题归结为以下片段:

Observable<Integer> numbers = Observable.just(1, 2, 3);

Observable<GroupedObservable<Integer,Integer>> outer = numbers.groupBy(i->i%3);
System.out.println(outer.count().toBlocking().single());

无限阻塞。我已经 reading posts 并且相信我理解问题所在:GroupedObservables 将不会调用 onComplete 直到它们的内部 Observables 也已完成。不幸的是,尽管我仍然无法打印上面的代码片段!

例如,以下内容:

Observable<Integer> just = Observable.just(1, 2, 3);

Observable<GroupedObservable<Integer,Integer>> groupBy = just.groupBy(i->i%3);
groupBy.subscribe(inner -> inner.ignoreElements());
System.out.println(groupBy.count().toBlocking().single());

还是什么都不做。我误解了这个问题吗?还有其他问题吗?简而言之,我怎样才能让上面的代码片段起作用?

非常感谢, 段.

是的,您必须以某种方式消费群组。您的第二个示例不起作用,因为您有两个独立订阅分组操作。

通常,解决方案是 flatMap,但不是 ignoreElements,因为那样会完成,而 count 不会获得任何元素。相反,您可以使用 takeLast(1):

Observable.just(1, 2, 3)
    .groupBy(k -> k % 3)
    .flatMap(g -> g.takeLast(1))
    .count()
    .toBlocking()
    .forEach(System.out::println);