通过键将GroupFlux收集到Hashmap中

Collecting GroupFlux into a Hashmap by the keys

我最近开始使用 Project Reactor,我想出了一个我似乎无法理解的场景。

基本上我想对某个流进行分组,然后获得哈希映射,例如grouping key -> List of grouped values。我一直在玩 API 但我得到的最远的是获取值、键或计数,但不是我想要的数据结构。例如,这将是获取值的代码:

var elements = new ArrayList<Integer>();

Flux.just(-1, -2, -3, 1, 2, 3)
        .groupBy(val -> val.compareTo(0))
        .flatMap(Flux::collectList)
        .subscribe(elements::addAll);

我想通过的测试如下:

@Test
public void groupBy() {
    var elements = new HashMap<Integer, List<Integer>>();

    Flux.just(-1, -2, -3, 1, 2, 3)
            .groupBy(val -> val.compareTo(0))
            // Do something here ...
            .subscribe(...);

    assertThat(elements).containsKeys(-1, 1);
    assertThat(elements.get(-1)).containsExactly(-1, -2, -3);
    assertThat(elements.get(1)).containsExactly(1, 2, 3);
}

我怎样才能实现后者?

您是否考虑过使用 Flux#connect?它接受 Collector,与 Stream 使用的类型相同。还有Flux#collectMap.

此外,如果您需要这样的地图,您可以使用Flux#scan

groupBy 当您需要按键 "route" 您的信号并有一个 Flux 按键时很有用,但它并非设计用于创建数据集合。