Concat Flux 及其计数(或另一个 reduce 函数)
Concat Flux with its count (or another reduce function)
是否有可能继续(连接)带有一些减少功能的 Flux,例如传递元素的数量?
也就是比如转一个
Flux.fromStream(Stream.of("a", "b", "c")) //note source flux could be read only once
到将在
中评估的 Flux
"a", "b", "c", "3"
由于您的通量只能读取一次,因此我不得不采取以下解决方法:
我这样定义我的映射器:
public class MyMapper implements Function<String, String> {
private AtomicInteger atomicInteger;
public MyMapper(AtomicInteger atomicInteger) {
this.atomicInteger = atomicInteger;
}
@Override
public String apply(String s) {
atomicInteger.incrementAndGet();
return s;
}
}
最后主要代码如下所示:
public static void main(String[] args) {
AtomicInteger count = new AtomicInteger();
MyMapper mapper = new MyMapper(count);
Flux<String> f = Flux.fromStream(Stream.of("a", "b", "c"));
Flux<String> output = f.map(mapper).concatWith(Mono.fromCallable(() -> Integer.toString(count.get())));
output.doOnNext(System.out::println).blockLast();
}
打印出来:
a
b
c
3
您的问题不需要“减少”功能。
您可以使用以下方法简单地实现此目的:
public Flux<String> concat(Flux<String> sourceFlux){
Mono<String> m = sourceFlux.collectList().map(List::size).map(count -> Integer.toString(count));
return sourceFlux.concatWith(m);
}
例如:
public static void main(String[] args) {
Flux<String> source = Flux.fromStream(Stream.of("a", "b", "c"));
Flux<String> fork = source.publish().autoConnect(2);
ConnectableFlux<Long> counter = fork.count().flux().replay(1);
counter.connect();
Flux<String> result = fork.concatWith(counter.map(Object::toString));
result.subscribe(System.out::println);
}
备注:
.publish()
用于向多个订阅者广播相同的序列。 autoConnect(2)
防止提前订阅和使用序列。预计正好有 2 个订阅者。
- 众所周知,
Flux.concat()
延迟订阅:第一个序列完成后会创建第二个订阅,但在您的情况下,您需要同时为同一源序列设置两个活动订阅者。这可以通过创建 ConnectableFlux
并显式调用其 connect()
方法来解决。 (没有 ConnectableMono
class,因此计数器从 Mono
转换为 Flux
)。该值是提前获取并缓存的,Flux.concat()
次订阅后会重放。
关于 Flux.concat()
的问题及其懒惰:我刚刚注意到 Flux.mergeSequential()
被记录为热切订阅的 concat
的替代品。
因此我的代码可以通过使用 mergeSequential()
而不是 concat()
来简化:
public static void main(String[] args) {
Flux<String> source = Flux.fromStream(Stream.of("a", "b", "c"));
Flux<String> fork = source.publish().autoConnect(2);
Mono<Long> counter = fork.count();
Flux<String> result = Flux.mergeSequential(fork, counter.map(Object::toString));
result.subscribe(System.out::println);
}
是否有可能继续(连接)带有一些减少功能的 Flux,例如传递元素的数量?
也就是比如转一个
Flux.fromStream(Stream.of("a", "b", "c")) //note source flux could be read only once
到将在
中评估的 Flux"a", "b", "c", "3"
由于您的通量只能读取一次,因此我不得不采取以下解决方法:
我这样定义我的映射器:
public class MyMapper implements Function<String, String> {
private AtomicInteger atomicInteger;
public MyMapper(AtomicInteger atomicInteger) {
this.atomicInteger = atomicInteger;
}
@Override
public String apply(String s) {
atomicInteger.incrementAndGet();
return s;
}
}
最后主要代码如下所示:
public static void main(String[] args) {
AtomicInteger count = new AtomicInteger();
MyMapper mapper = new MyMapper(count);
Flux<String> f = Flux.fromStream(Stream.of("a", "b", "c"));
Flux<String> output = f.map(mapper).concatWith(Mono.fromCallable(() -> Integer.toString(count.get())));
output.doOnNext(System.out::println).blockLast();
}
打印出来:
a
b
c
3
您的问题不需要“减少”功能。 您可以使用以下方法简单地实现此目的:
public Flux<String> concat(Flux<String> sourceFlux){
Mono<String> m = sourceFlux.collectList().map(List::size).map(count -> Integer.toString(count));
return sourceFlux.concatWith(m);
}
例如:
public static void main(String[] args) {
Flux<String> source = Flux.fromStream(Stream.of("a", "b", "c"));
Flux<String> fork = source.publish().autoConnect(2);
ConnectableFlux<Long> counter = fork.count().flux().replay(1);
counter.connect();
Flux<String> result = fork.concatWith(counter.map(Object::toString));
result.subscribe(System.out::println);
}
备注:
.publish()
用于向多个订阅者广播相同的序列。autoConnect(2)
防止提前订阅和使用序列。预计正好有 2 个订阅者。- 众所周知,
Flux.concat()
延迟订阅:第一个序列完成后会创建第二个订阅,但在您的情况下,您需要同时为同一源序列设置两个活动订阅者。这可以通过创建ConnectableFlux
并显式调用其connect()
方法来解决。 (没有ConnectableMono
class,因此计数器从Mono
转换为Flux
)。该值是提前获取并缓存的,Flux.concat()
次订阅后会重放。
关于 Flux.concat()
的问题及其懒惰:我刚刚注意到 Flux.mergeSequential()
被记录为热切订阅的 concat
的替代品。
因此我的代码可以通过使用 mergeSequential()
而不是 concat()
来简化:
public static void main(String[] args) {
Flux<String> source = Flux.fromStream(Stream.of("a", "b", "c"));
Flux<String> fork = source.publish().autoConnect(2);
Mono<Long> counter = fork.count();
Flux<String> result = Flux.mergeSequential(fork, counter.map(Object::toString));
result.subscribe(System.out::println);
}