Flux.groupBy() 后跟 Flux#next 不会对元素进行分组
Flux.groupBy() followed by Flux#next doesn't group elements
我观察到 Flux#groupBy
的行为,我不确定这是一个错误还是来自我对 reactor 的误解。
我有一个 Flux
,其中包含我想按公共字段分组的元素,然后 return 每个组中只有一个元素。我曾尝试使用 Flux#next
但最终结果令人惊讶,因为我仍然获得了通量的所有元素。
Bar bar1 = new Bar(UUID.randomUUID())
Bar bar2 = new Bar(UUID.randomUUID())
Flux.just(new Foo(bar1), new Foo(bar1), new Foo(bar2), new Foo(bar2))
.groupBy(foo -> foo.bar.id)
.doOnNext(flux -> System.out.println("Grouped flux with key: " + flux.key()))
.flatMap(Flux::next)
.collectList()
.block() # the final list here contains 4 elements
在上面的示例中,println
被调用了 4 次,密钥被重复了两次:
Grouped flux with key: 8a7b9135-97ce-48d7-a084-97f1bd3b6648
Grouped flux with key: 8a7b9135-97ce-48d7-a084-97f1bd3b6648
Grouped flux with key: 4e174753-0e25-4659-87f9-524f8b0edaf8
Grouped flux with key: 4e174753-0e25-4659-87f9-524f8b0edaf8
将 flatMap
中的代码替换为使用 Flux#collectList
而不是 Flux#next
已经解决了我的问题:
Flux.just(new Foo(bar1), new Foo(bar1), new Foo(bar2), new Foo(bar2))
.groupBy(foo -> foo.bar.id)
.doOnNext(flux -> System.out.println("Grouped flux with key: " + flux.key()))
.flatMap(flux -> flux.collectList().map(list -> list.get(0)))
.collectList()
.block() # the final list here contains 2 elements
我希望这两个代码段的工作方式相同,但显然出于某种原因它们不能。
对这种行为的解释是什么?
谢谢!
解释是next()
取消了open group,所以在后面的元素groupBy
上又看到了key1,但是那个key没有open group。结果,它重新创建了一个组,并将第二次出现的 bar1
放入第二组。
对于 collectList
组没有这样的取消,因此 key1 和 key2 组都保持打开状态并且 groupBy
能够将传入的 Foo
分派到两个组中。
我观察到 Flux#groupBy
的行为,我不确定这是一个错误还是来自我对 reactor 的误解。
我有一个 Flux
,其中包含我想按公共字段分组的元素,然后 return 每个组中只有一个元素。我曾尝试使用 Flux#next
但最终结果令人惊讶,因为我仍然获得了通量的所有元素。
Bar bar1 = new Bar(UUID.randomUUID())
Bar bar2 = new Bar(UUID.randomUUID())
Flux.just(new Foo(bar1), new Foo(bar1), new Foo(bar2), new Foo(bar2))
.groupBy(foo -> foo.bar.id)
.doOnNext(flux -> System.out.println("Grouped flux with key: " + flux.key()))
.flatMap(Flux::next)
.collectList()
.block() # the final list here contains 4 elements
在上面的示例中,println
被调用了 4 次,密钥被重复了两次:
Grouped flux with key: 8a7b9135-97ce-48d7-a084-97f1bd3b6648
Grouped flux with key: 8a7b9135-97ce-48d7-a084-97f1bd3b6648
Grouped flux with key: 4e174753-0e25-4659-87f9-524f8b0edaf8
Grouped flux with key: 4e174753-0e25-4659-87f9-524f8b0edaf8
将 flatMap
中的代码替换为使用 Flux#collectList
而不是 Flux#next
已经解决了我的问题:
Flux.just(new Foo(bar1), new Foo(bar1), new Foo(bar2), new Foo(bar2))
.groupBy(foo -> foo.bar.id)
.doOnNext(flux -> System.out.println("Grouped flux with key: " + flux.key()))
.flatMap(flux -> flux.collectList().map(list -> list.get(0)))
.collectList()
.block() # the final list here contains 2 elements
我希望这两个代码段的工作方式相同,但显然出于某种原因它们不能。 对这种行为的解释是什么?
谢谢!
解释是next()
取消了open group,所以在后面的元素groupBy
上又看到了key1,但是那个key没有open group。结果,它重新创建了一个组,并将第二次出现的 bar1
放入第二组。
对于 collectList
组没有这样的取消,因此 key1 和 key2 组都保持打开状态并且 groupBy
能够将传入的 Foo
分派到两个组中。