groupBy 运算符,来自不同组的项目交错

groupBy operator, items from different groups interleaved

以下代码:

    Observable
            .just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
            .doOnNext(item -> System.out.println("source emitting " + item))
            .groupBy(item -> {
                System.out.println("groupBy called for " + item);
                return item % 3;
            })
            .subscribe(observable -> {
                System.out.println("got observable " + observable + " for key " + observable.getKey());
                observable.subscribe(item -> {
                    System.out.println("key " + observable.getKey() + ", item " + item);
                });
            });

让我很困惑。我得到的输出是:

    source emitting 0
    groupBy called for 0
    got observable rx.observables.GroupedObservable@42110406 for key 0
    key 0, item 0
    source emitting 1
    groupBy called for 1
    got observable rx.observables.GroupedObservable@1698c449 for key 1
    key 1, item 1
    source emitting 2
    groupBy called for 2
    got observable rx.observables.GroupedObservable@5ef04b5 for key 2
    key 2, item 2
    source emitting 3
    groupBy called for 3
    key 0, item 3
    source emitting 4
    groupBy called for 4
    key 1, item 4
    source emitting 5
    groupBy called for 5
    key 2, item 5
    source emitting 6
    groupBy called for 6
    key 0, item 6
    source emitting 7
    groupBy called for 7
    key 1, item 7
    source emitting 8
    groupBy called for 8
    key 2, item 8
    source emitting 9
    groupBy called for 9
    key 0, item 9

因此,在顶级订阅方法中,我按预期从 GroupedObservable 获得了 3 个可观察值。然后,我一个一个地订阅了分组的可观察对象——这里是我不明白的地方:

为什么原始项目仍然按原始序列(即 0、1、2、3、...)发出,而不是 0、3、6、9 ...对于键 0,然后是 1, 4、7 表示键 1,然后是 2、5、8 表示键 2?

我想我了解群组是如何创建的:

1. 0 is emitted, the key function is called and it gets 0
2. it is checked if an observable for 0 exists, it doesn't, so a new one is created and emitted, and then it emits 0
3. the same happens for source items 1 and 2 as they both create new groups, and observables with key 1 and 2 are emitted, and they emit 1 and 2 correspondingly
4. source item 3 is emitted, the key function is called and it gets 0
5. it is checked if an observable for 0 exists, it does -> no new grouped observable is created nor emitted, but 3 is emitted by the already existing observable
6. etc. until the source sequence is drained

似乎虽然我一个一个地得到分组的可观察量,但它们的发射以某种方式交错。这是怎么发生的?

Why are the original items still emitted in the original sequence (i.e. 0, 1, 2, 3, ...) and not 0, 3, 6, 9 ... for key 0, followed by 1, 4, 7 for key 1, followed by 2, 5, 8 for key 2?

您已经回答了自己的问题。您正在按照项目发出的顺序对项目流进行操作。因此,当每一个被发出时,它都会向下传递到运算符链中,您会看到此处显示的输出。

您期望的替代输出要求链等到源停止为 all 组发出项目。假设你有 Observable.just(0, 1, 2, 3, 4, 4, 4, 4, 4, 4, 0)。然后你会期望 (0, 3, 0), (1, 4, 4, 4, 4, 4, 4), (2) 作为你的输出组。如果你有一个无限的 4 流怎么办?您的订阅者永远不会从第一组收到 0、3..。

您可以创建所需的行为。 toList 运算符将缓存输出直到源完成,然后将 List<R> 传递给订阅者:

.subscribe(observable -> {
    System.out.println("got observable " + observable + " for key " + observable.getKey());
    observable.toList().subscribe(items -> {
        // items is a List<Integer>
        System.out.println("key " + observable.getKey() + ", items " + items);
    });
});