迭代一个Flux,在里面执行一个Mono,在下一步使用结果

Iterate a Flux, execute a Mono inside, use the result in the next step

我想做类似下面的事情,其中​​上一次调用的结果用于下一次使用 Project Reactor 对同一服务的调用。

Message current;
Message next;
for each (Step step in steps)
{
    current = new Message(step, next);
    next = execute(current);
}

这就是我使用 reactor 尝试做的事情:

  1. 每个 "step"(不断变化)

    一个。为该步骤和最后的结果创建一条消息(从空开始)。

    b。使用消息调用服务并获取结果(单声道)。

    c。将最后一条消息设置为此结果,以便它可以在 1a 中使用。

  2. 取最后的结果

到目前为止,我在这方面的拙劣尝试看起来像:

return fromIterable(request.getPipeline())
    .map(s -> PipelineMessage.builder()
        .client(client)
        .step(s.getStep())
        .build())
    .flatMap(z -> {
        return this.pipelineService.execute(z);
    })
    .last()
    .map(m -> ok()
        .entity(m.getPayload())
        .type(m.getType())
        .build());

不确定您的确切要求。但我认为 reduce 函数在这里可以提供帮助。

Flux<String> stringFlux = Flux.fromIterable(Arrays.asList("a", "b", "c"));

stringFlux
        .reduce("empty", (next, step) -> {
            String current = message(step, next);
            return execute(current);
        })
        .map(String::toUpperCase)
        .subscribe(System.out::println);

这里message和execute函数是这样的

String message(String step, String next){
    return "message[" + step + ":" + next + "]";
}

String execute(String current){
    return "execute(" + current + ")";
}

最终输出将是最后执行的消息。

EXECUTE(MESSAGE[C:EXECUTE(MESSAGE[B:EXECUTE(MESSAGE[A:EMPTY])])])

这里的初始步骤不能null。相反,您可以使用空步骤对象并将其视为 null.


Flux<String> stringFlux = Flux.fromIterable(Arrays.asList("a", "b", "c"));

stringFlux
        .reduce(Flux.just("empty"), (Flux<String> next, String step) -> {
            Flux<String> current = message(step, next);
            return current.flatMap(this::execute);
        })
        .flatMapMany(a -> a)
        .subscribe(System.out::println);


Flux<String> message(String step, Flux<String> next){
    return next.map(v -> "message(" + v + ":" + step + ")");
}

Mono<String> execute(String current){
    return Mono.just("execute(" + current + ")");
}

输出:

execute(message(execute(message(execute(message(empty:a)):b)):c))