迭代一个Flux,在里面执行一个Mono,在下一步使用结果
Iterate a Flux, execute a Mono inside, use the result in the next step
我想做类似下面的事情,其中上一次调用的结果用于下一次使用 Project Reactor 对同一服务的调用。
Message current;
Message next;
for each (Step step in steps)
{
current = new Message(step, next);
next = execute(current);
}
这就是我使用 reactor 尝试做的事情:
每个 "step"(不断变化)
一个。为该步骤和最后的结果创建一条消息(从空开始)。
b。使用消息调用服务并获取结果(单声道)。
c。将最后一条消息设置为此结果,以便它可以在 1a 中使用。
取最后的结果
到目前为止,我在这方面的拙劣尝试看起来像:
return fromIterable(request.getPipeline())
.map(s -> PipelineMessage.builder()
.client(client)
.step(s.getStep())
.build())
.flatMap(z -> {
return this.pipelineService.execute(z);
})
.last()
.map(m -> ok()
.entity(m.getPayload())
.type(m.getType())
.build());
不确定您的确切要求。但我认为 reduce
函数在这里可以提供帮助。
Flux<String> stringFlux = Flux.fromIterable(Arrays.asList("a", "b", "c"));
stringFlux
.reduce("empty", (next, step) -> {
String current = message(step, next);
return execute(current);
})
.map(String::toUpperCase)
.subscribe(System.out::println);
这里message和execute函数是这样的
String message(String step, String next){
return "message[" + step + ":" + next + "]";
}
String execute(String current){
return "execute(" + current + ")";
}
最终输出将是最后执行的消息。
EXECUTE(MESSAGE[C:EXECUTE(MESSAGE[B:EXECUTE(MESSAGE[A:EMPTY])])])
这里的初始步骤不能null
。相反,您可以使用空步骤对象并将其视为 null
.
Flux<String> stringFlux = Flux.fromIterable(Arrays.asList("a", "b", "c"));
stringFlux
.reduce(Flux.just("empty"), (Flux<String> next, String step) -> {
Flux<String> current = message(step, next);
return current.flatMap(this::execute);
})
.flatMapMany(a -> a)
.subscribe(System.out::println);
Flux<String> message(String step, Flux<String> next){
return next.map(v -> "message(" + v + ":" + step + ")");
}
Mono<String> execute(String current){
return Mono.just("execute(" + current + ")");
}
输出:
execute(message(execute(message(execute(message(empty:a)):b)):c))
我想做类似下面的事情,其中上一次调用的结果用于下一次使用 Project Reactor 对同一服务的调用。
Message current;
Message next;
for each (Step step in steps)
{
current = new Message(step, next);
next = execute(current);
}
这就是我使用 reactor 尝试做的事情:
每个 "step"(不断变化)
一个。为该步骤和最后的结果创建一条消息(从空开始)。
b。使用消息调用服务并获取结果(单声道)。
c。将最后一条消息设置为此结果,以便它可以在 1a 中使用。
取最后的结果
到目前为止,我在这方面的拙劣尝试看起来像:
return fromIterable(request.getPipeline())
.map(s -> PipelineMessage.builder()
.client(client)
.step(s.getStep())
.build())
.flatMap(z -> {
return this.pipelineService.execute(z);
})
.last()
.map(m -> ok()
.entity(m.getPayload())
.type(m.getType())
.build());
不确定您的确切要求。但我认为 reduce
函数在这里可以提供帮助。
Flux<String> stringFlux = Flux.fromIterable(Arrays.asList("a", "b", "c"));
stringFlux
.reduce("empty", (next, step) -> {
String current = message(step, next);
return execute(current);
})
.map(String::toUpperCase)
.subscribe(System.out::println);
这里message和execute函数是这样的
String message(String step, String next){
return "message[" + step + ":" + next + "]";
}
String execute(String current){
return "execute(" + current + ")";
}
最终输出将是最后执行的消息。
EXECUTE(MESSAGE[C:EXECUTE(MESSAGE[B:EXECUTE(MESSAGE[A:EMPTY])])])
这里的初始步骤不能null
。相反,您可以使用空步骤对象并将其视为 null
.
Flux<String> stringFlux = Flux.fromIterable(Arrays.asList("a", "b", "c"));
stringFlux
.reduce(Flux.just("empty"), (Flux<String> next, String step) -> {
Flux<String> current = message(step, next);
return current.flatMap(this::execute);
})
.flatMapMany(a -> a)
.subscribe(System.out::println);
Flux<String> message(String step, Flux<String> next){
return next.map(v -> "message(" + v + ":" + step + ")");
}
Mono<String> execute(String current){
return Mono.just("execute(" + current + ")");
}
输出:
execute(message(execute(message(execute(message(empty:a)):b)):c))