在 Reactor 3 中将一个 Flux 拆分为多个 Flux 的最有效方式
The most efficient way to split a Flux to multiple Fluxes in Reactor 3
在 Reactor 3 中,通过模式匹配将异构通量拆分为多个通量的最有效方法是什么? (而且后续对每个flux的操作可能会有很大的不同)
例如,
Source Flux: a->b->c->a->b->c
||
vv
A Flux: a->a->a
B Flux: b->b->b
C Flux: c->c->c
我是响应式编程的新手,我想出的唯一解决方案是 share()
+filter()
,喜欢
val shared = flux.share();
shared.filter(x -> x.tag=='a').subscribe(a -> consumeA(a));
shared.filter(x -> x.tag=='b').subscribe(b -> consumeB(b));
shared.filter(x -> x.tag=='c').subscribe(c -> consumeC(c));
这是最好的解决方案,还是有更好的范例来解决这个问题?
如果组数比较少,可以使用Flux.groupBy
referenced in the project reactor docs
例如:
Flux<String> flux = Flux.just("a1", "b1", "c1", "a2", "b2", "c2")
.groupBy(s -> s.charAt(0))
.concatMap(groupedFlux -> groupedFlux
.startWith("Group " + groupedFlux.key()));
StepVerifier.create(flux)
.expectNext("Group a", "a1", "a2")
.expectNext("Group b", "b1", "b2")
.expectNext("Group c", "c1", "c2")
.verifyComplete();
您可以使用 groupedFlux.key()
来改变对每个组执行的操作。
在 Reactor 3 中,通过模式匹配将异构通量拆分为多个通量的最有效方法是什么? (而且后续对每个flux的操作可能会有很大的不同)
例如,
Source Flux: a->b->c->a->b->c
||
vv
A Flux: a->a->a
B Flux: b->b->b
C Flux: c->c->c
我是响应式编程的新手,我想出的唯一解决方案是 share()
+filter()
,喜欢
val shared = flux.share();
shared.filter(x -> x.tag=='a').subscribe(a -> consumeA(a));
shared.filter(x -> x.tag=='b').subscribe(b -> consumeB(b));
shared.filter(x -> x.tag=='c').subscribe(c -> consumeC(c));
这是最好的解决方案,还是有更好的范例来解决这个问题?
如果组数比较少,可以使用Flux.groupBy
referenced in the project reactor docs
例如:
Flux<String> flux = Flux.just("a1", "b1", "c1", "a2", "b2", "c2")
.groupBy(s -> s.charAt(0))
.concatMap(groupedFlux -> groupedFlux
.startWith("Group " + groupedFlux.key()));
StepVerifier.create(flux)
.expectNext("Group a", "a1", "a2")
.expectNext("Group b", "b1", "b2")
.expectNext("Group c", "c1", "c2")
.verifyComplete();
您可以使用 groupedFlux.key()
来改变对每个组执行的操作。