Flux.groupBy() 后跟 Flux#next 不会对元素进行分组

Flux.groupBy() followed by Flux#next doesn't group elements

我观察到 Flux#groupBy 的行为,我不确定这是一个错误还是来自我对 reactor 的误解。

我有一个 Flux,其中包含我想按公共字段分组的元素,然后 return 每个组中只有一个元素。我曾尝试使用 Flux#next 但最终结果令人惊讶,因为我仍然获得了通量的所有元素。

Bar bar1 = new Bar(UUID.randomUUID())
Bar bar2 = new Bar(UUID.randomUUID())

Flux.just(new Foo(bar1), new Foo(bar1), new Foo(bar2), new Foo(bar2))
  .groupBy(foo -> foo.bar.id)
  .doOnNext(flux -> System.out.println("Grouped flux with key: " + flux.key()))
  .flatMap(Flux::next)
  .collectList()
  .block() # the final list here contains 4 elements

在上面的示例中,println 被调用了 4 次,密钥被重复了两次:

Grouped flux with key: 8a7b9135-97ce-48d7-a084-97f1bd3b6648
Grouped flux with key: 8a7b9135-97ce-48d7-a084-97f1bd3b6648
Grouped flux with key: 4e174753-0e25-4659-87f9-524f8b0edaf8
Grouped flux with key: 4e174753-0e25-4659-87f9-524f8b0edaf8

flatMap 中的代码替换为使用 Flux#collectList 而不是 Flux#next 已经解决了我的问题:

Flux.just(new Foo(bar1), new Foo(bar1), new Foo(bar2), new Foo(bar2))
  .groupBy(foo -> foo.bar.id)
  .doOnNext(flux -> System.out.println("Grouped flux with key: " + flux.key()))
  .flatMap(flux -> flux.collectList().map(list -> list.get(0)))
  .collectList()
  .block() # the final list here contains 2 elements

我希望这两个代码段的工作方式相同,但显然出于某种原因它们不能。 对这种行为的解释是什么?

谢谢!

解释是next()取消了open group,所以在后面的元素groupBy上又看到了key1,但是那个key没有open group。结果,它重新创建了一个组,并将第二次出现的 bar1 放入第二组。

对于 collectList 组没有这样的取消,因此 key1 和 key2 组都保持打开状态并且 groupBy 能够将传入的 Foo 分派到两个组中。