Flux 未订阅 Spring 5 个反应堆
Flux not subscribing in Spring 5 reactor
我可能遗漏了什么,但我不知道是什么。
下面的代码什么都不做:
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.subscribe();
如果我尝试阻止调用,它会正常工作:
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.block();
奇怪的是,如果我创建一个 Flux "manually"(即不是来自 spring webClient),它工作正常:
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.doOnNext(System.out::println)
.subscribe();
有人可以解释一下我做错了什么吗? .subscribe()
不是应该像上次那样在第一种情况下执行操作吗?
谢谢!
简答
subscribe
不会阻塞当前线程,这意味着应用程序主线程可以在 Flux 发出任何元素之前完成。所以要么使用 block
要么在主线程中等待。
详情
调用无参数 subscribe() just makes request(unbounded)
on Flux
without setting up any Subscriber
. It triggers operation generally in a separate thread but does not block the current thread. Most likely, your main thread ends before WebClient
received the response in that separate thread and passive side effect doOnNext(...)
发生了。
到illustrate/test即开始运行,在主线程中等待一段时间。只需将以下行放在 subscribe()
调用之后:
Thread.sleep(1000);
现在,在调整超时值后,您将能够看到打印的结果。
现在让我们为异步操作隐式发送自定义 Scheduler
并等待其所有任务完成。另外,让我们将 System.out::println
作为 subscribe(...)
参数而不是 doOnNext
传递,这样完整的代码如下所示:
ExecutorService executor = Executors.newSingleThreadExecutor();
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.publishOn(Schedulers.fromExecutor(executor)) // next operation will go to this executor
.subscribe(System.out::println); //still non-blocking
executor.awaitTermination(1, TimeUnit.SECONDS); //block current main thread
此示例使用略有不同的 subscribe(Consumer). Most importantly, it adds publishOn(Scheduler),它由 ExecutorService
支持。后者用于等待主线程中的终止。
当然,获得相同结果的更简单方法是使用您最初提到的 block()
:
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.block();
最后,注意你的第三个例子 Flux.just(...)...subscribe()
- 似乎它在你的主线程终止之前很快就完成了。这是因为与发射单个 GetLocationsResponse
元素相比,发射几个 String
元素所需的时间更少(意味着写入请求+读取响应+解析为 POJO 的时间)。然而,如果你使这个 Flux
延迟元素,你会得到相同的行为再现:
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.delayElements(Duration.ofMillis(500)) //this makes it stop printing in main thread
.doOnNext(System.out::println)
.subscribe();
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.delayElements(Duration.ofMillis(500))
.doOnNext(System.out::println)
.blockLast(); //and that makes it printing back again
我可能遗漏了什么,但我不知道是什么。
下面的代码什么都不做:
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.subscribe();
如果我尝试阻止调用,它会正常工作:
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.block();
奇怪的是,如果我创建一个 Flux "manually"(即不是来自 spring webClient),它工作正常:
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.doOnNext(System.out::println)
.subscribe();
有人可以解释一下我做错了什么吗? .subscribe()
不是应该像上次那样在第一种情况下执行操作吗?
谢谢!
简答
subscribe
不会阻塞当前线程,这意味着应用程序主线程可以在 Flux 发出任何元素之前完成。所以要么使用 block
要么在主线程中等待。
详情
调用无参数 subscribe() just makes request(unbounded)
on Flux
without setting up any Subscriber
. It triggers operation generally in a separate thread but does not block the current thread. Most likely, your main thread ends before WebClient
received the response in that separate thread and passive side effect doOnNext(...)
发生了。
到illustrate/test即开始运行,在主线程中等待一段时间。只需将以下行放在 subscribe()
调用之后:
Thread.sleep(1000);
现在,在调整超时值后,您将能够看到打印的结果。
现在让我们为异步操作隐式发送自定义 Scheduler
并等待其所有任务完成。另外,让我们将 System.out::println
作为 subscribe(...)
参数而不是 doOnNext
传递,这样完整的代码如下所示:
ExecutorService executor = Executors.newSingleThreadExecutor();
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.publishOn(Schedulers.fromExecutor(executor)) // next operation will go to this executor
.subscribe(System.out::println); //still non-blocking
executor.awaitTermination(1, TimeUnit.SECONDS); //block current main thread
此示例使用略有不同的 subscribe(Consumer). Most importantly, it adds publishOn(Scheduler),它由 ExecutorService
支持。后者用于等待主线程中的终止。
当然,获得相同结果的更简单方法是使用您最初提到的 block()
:
webClient.get().uri("/some/path/here").retrieve() .bodyToMono(GetLocationsResponse.class) .doOnNext(System.out::println) .block();
最后,注意你的第三个例子 Flux.just(...)...subscribe()
- 似乎它在你的主线程终止之前很快就完成了。这是因为与发射单个 GetLocationsResponse
元素相比,发射几个 String
元素所需的时间更少(意味着写入请求+读取响应+解析为 POJO 的时间)。然而,如果你使这个 Flux
延迟元素,你会得到相同的行为再现:
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.delayElements(Duration.ofMillis(500)) //this makes it stop printing in main thread
.doOnNext(System.out::println)
.subscribe();
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.delayElements(Duration.ofMillis(500))
.doOnNext(System.out::println)
.blockLast(); //and that makes it printing back again