我们如何迭代并打印来自 Reactor Flux 或 Mono FlatMap 或 FlatMapMany 的值?

How can we iterate and print the values from Reactor Flux or Mono FlatMap or FlatMapMany?

使用 Spring 引导学习 Reactor。

使用样本API:

https://jsonplaceholder.typicode.com/todos/1
{
  "userId": 1,
  "id": 1,
  "title": "delectus aut autem",
  "completed": false
}

想要将上面的内容映射到一个对象(定义了一个 pojo SingleUser)并打印输出。

private WebClient webClient = WebClient.create("https://jsonplaceholder.typicode.com");

private Mono<ClientResponse> responseMono = webClient.get()
          .uri("/todos/1")
          .accept(MediaType.APPLICATION_JSON)
          .exchange();

public String getResult() {
   return ">> result = " + responseMono.flatMap(res -> res.bodyToMono(String.class)).block();
}

当使用上面的..结果是:

>> result = {
  "userId": 1,
  "id": 1,
  "title": "delectus aut autem",
  "completed": false
}

如何在使用 Flux 时迭代并打印所有值,如下所示?

public Flux<SingleUser> listUsers1() {
    return webClient.get()
             .uri("/todos/1")
             .retrieve()
             .bodyToFlux(SingleUser.class);
}

public String getUsers1() {
   return ">> Get Users 1= " + listUsers1();
}

public Flux<SingleUser> listUsers2() {
   return webClient.get()
             .uri("/todos/1")
             .exchange()
             .flatMapMany(clientResponse -> clientResponse.bodyToFlux(SingleUser.class));
}

public String getUsers2() {
   return ">> Get Users 2= " + listUsers2();
}

同时使用 exchange()retrieve() 时的结果:

>> Get Users 1= MonoFlatMapMany
>> Get Users 2= MonoFlatMapMany

我们如何遍历对象并打印值?

首先请注意,您正在处理一个异步非阻塞模型,但您基本上是为了演示目的而诉诸阻塞。

处理多个值时显示从异步到同步的切换限制,因为虽然每个 Flux 都可以打印出来,但您无法控制 "iteration" (这可以在 2 个请求之间以交错的方式发生):

有一个名为 "side effects" 的 Reactor 运算符家族,当您想查看序列中的值时,例如。 log/print 他们:doOn*。您可以使用 doOnNext 来打印 Flux:

发出的每个值
listUsers1().doOnNext(u -> System.out.println("listUsers1 received " + u);
listUsers2().doOnNext(u -> System.out.println("listUsers2 received " + u);

唯一的麻烦是它没有订阅相应的印刷Flux,所以什么也不会发生。即使您只是 .subscribe() 在您的测试中,应用程序也可以立即退出,而无需等待这两个异步序列的结束。

就像你的第一个例子一样,你需要阻止。使用Flux,您可以使用.blockLast()阻塞直到通量完成。

第二个麻烦:您的 getUsersX() 方法期望能够同步检索 String 和 return 它。即使使用上面的 blockLast,这对于 Flux 也不实用:

public String getUsers1() {
       return ">> Get Users 1= " + listUsers1().doOnNext(System.out::println).blockLast();
    }

public String getUsers2() {
   return ">> Get Users 2= " + listUsers2().doOnNext(System.out::println).blockLast();
}

System.out.println(getUsers1());
System.out.println(getUsers2());

会记录如下内容:

user1A
user1B
user1C
user1D
>> Get Users 1= user1D
user2A
user2B
user2C
>> Get Users 2= user2C

注意每个请求如何打印所有值,然后是重复最后一个值的获取用户消息。此外,由于 blockLast(),第一个请求必须 运行 在触发第二个请求之前完成。

支持并行请求的打印请求的最反应方式是异步收集用户并在可用时打印列表:

listUsers1().collectList().doOnNext(l1 -> System.out.println(">> Get Users 1=" + l1).subscribe();
listUsers2().collectList().doOnNext(l2 -> System.out.println(">> Get Users 2=" + l2).subscribe();

但是在 main/test 中也有同样的警告:应用程序可能会在列表发出之前终止,因此不会打印任何内容。我们可以将两个 collectList 绑定到一个 Mono 并使用 Mono.when:

等待两个完成
Mono.when(
    listUsers1().collectList().doOnNext(l1 -> System.out.println(">> Get Users 1=" + l1),
    listUsers2().collectList().doOnNext(l2 -> System.out.println(">> Get Users 2=" + l2)
)
.block();

使用该代码,两个请求同时触发,最快完成的请求首先打印,以较快的为准。