RxJava groupBy 和后续阻塞操作(onComplete 缺失?)
RxJava groupBy and subsequent blocking operations (onComplete missing?)
我已将我的问题归结为以下片段:
Observable<Integer> numbers = Observable.just(1, 2, 3);
Observable<GroupedObservable<Integer,Integer>> outer = numbers.groupBy(i->i%3);
System.out.println(outer.count().toBlocking().single());
无限阻塞。我已经 reading posts 并且相信我理解问题所在:GroupedObservables 将不会调用 onComplete 直到它们的内部 Observables 也已完成。不幸的是,尽管我仍然无法打印上面的代码片段!
例如,以下内容:
Observable<Integer> just = Observable.just(1, 2, 3);
Observable<GroupedObservable<Integer,Integer>> groupBy = just.groupBy(i->i%3);
groupBy.subscribe(inner -> inner.ignoreElements());
System.out.println(groupBy.count().toBlocking().single());
还是什么都不做。我误解了这个问题吗?还有其他问题吗?简而言之,我怎样才能让上面的代码片段起作用?
非常感谢,
段.
是的,您必须以某种方式消费群组。您的第二个示例不起作用,因为您有两个独立订阅分组操作。
通常,解决方案是 flatMap
,但不是 ignoreElements
,因为那样会完成,而 count
不会获得任何元素。相反,您可以使用 takeLast(1)
:
Observable.just(1, 2, 3)
.groupBy(k -> k % 3)
.flatMap(g -> g.takeLast(1))
.count()
.toBlocking()
.forEach(System.out::println);
我已将我的问题归结为以下片段:
Observable<Integer> numbers = Observable.just(1, 2, 3);
Observable<GroupedObservable<Integer,Integer>> outer = numbers.groupBy(i->i%3);
System.out.println(outer.count().toBlocking().single());
无限阻塞。我已经 reading
例如,以下内容:
Observable<Integer> just = Observable.just(1, 2, 3);
Observable<GroupedObservable<Integer,Integer>> groupBy = just.groupBy(i->i%3);
groupBy.subscribe(inner -> inner.ignoreElements());
System.out.println(groupBy.count().toBlocking().single());
还是什么都不做。我误解了这个问题吗?还有其他问题吗?简而言之,我怎样才能让上面的代码片段起作用?
非常感谢, 段.
是的,您必须以某种方式消费群组。您的第二个示例不起作用,因为您有两个独立订阅分组操作。
通常,解决方案是 flatMap
,但不是 ignoreElements
,因为那样会完成,而 count
不会获得任何元素。相反,您可以使用 takeLast(1)
:
Observable.just(1, 2, 3)
.groupBy(k -> k % 3)
.flatMap(g -> g.takeLast(1))
.count()
.toBlocking()
.forEach(System.out::println);