我可以使用从 Spring5 的 WebClient 返回的 Flux 的 block() 方法吗?
Can I use block() method of Flux returned from Spring5's WebClient?
我创建了 Spring Boot 2.0 演示应用程序,其中包含两个使用 WebClient 进行通信的应用程序。当我从 WebClient 的响应中使用 Flux 的 block() 方法时,他们经常停止通信,这让我很痛苦。由于某些原因,我想使用 List 而不是 Flux。
服务器端应用程序是这样的。它只是 returns Flux 对象。
@GetMapping
public Flux<Item> findAll() {
return Flux.fromIterable(items);
}
而客户端(或BFF端)应用程序是这样的。我从服务器获取 Flux 并通过调用 block() 方法将其转换为 List。
@GetMapping
public List<Item> findBlock() {
return webClient.get()
.retrieve()
.bodyToFlux(Item.class)
.collectList()
.block(Duration.ofSeconds(10L));
}
虽然一开始很好用,但是多次访问后findBlock()会无响应并超时。当我将 findBlock() 方法修改为 return Flux 删除 collectList() 和 block() 时,它运行良好。然后我假设 block() 方法导致了这个问题。
而且,当我将 findAll() 方法修改为 return 列表时,没有任何变化。
整个示例应用程序的源代码在这里。
https://github.com/cero-t/webclient-example
"resource"是服务端应用,"front"是客户端应用。在 运行 两个应用程序之后,当我访问 localhost:8080 时它运行良好并且我可以随时重新加载,但是当我访问 localhost:8080/block 时它似乎运行良好但是在几次重新加载之后它不会回应。
顺便说一下,当我将 "spring-boot-starter-web" 依赖项添加到 "front" 应用程序(不是资源应用程序的)pom.xml 时,这意味着我使用 tomcat,这个问题从来没有发生。是 Netty 服务器的问题吗?
任何指导将不胜感激。
首先,让我指出,仅当 items
已从内存中获取时才建议使用 Flux.fromIterable(items)
,不涉及 I/O。否则您可能会使用阻塞 API 来获取它 - 这可能会破坏您的反应式应用程序。在本例中,这是一个内存列表,所以没问题。请注意,您也可以转到 Flux.just(item1, item2, item3)
.
使用以下是最有效的:
@GetMapping("/")
public Flux<Item> findFlux() {
return webClient.get()
.retrieve()
.bodyToFlux(Item.class);
}
Item
个实例将以非常有效的方式 read/written、decoded/encoded 即时运行。
另一方面,这不是首选方式:
@GetMapping("/block")
public List<Item> findBlock() {
return webClient.get()
.retrieve()
.bodyToFlux(Item.class)
.collectList()
.block(Duration.ofSeconds(10L));
}
在这种情况下,您的前端应用程序在内存中缓冲 整个项目列表 和 collectList
,但也阻塞了为数不多的可用服务器线程之一。这可能会导致性能非常差,因为您的服务器可能会因等待该数据而被阻塞,无法同时为其他请求提供服务。
在这种特殊情况下,情况更糟,因为应用程序完全崩溃了。
查看控制台,我们可以看到以下内容:
WARN 3075 --- [ctor-http-nio-7] io.netty.util.concurrent.DefaultPromise : An exception was thrown by reactor.ipc.netty.channel.PooledClientContextHandler$$Lambda2/356589024.operationComplete()
reactor.core.Exceptions$BubblingException: java.lang.IllegalArgumentException: Channel [id: 0xab15f050, L:/127.0.0.1:59350 - R:localhost/127.0.0.1:8081] was not acquired from this ChannelPool
at reactor.core.Exceptions.bubble(Exceptions.java:154) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
这可能与应该在 0.7.4.RELEASE 中修复的 reactor-netty client connection pool issue 相关联。我不知道这方面的具体细节,但我怀疑整个连接池都被破坏了,因为没有从客户端连接中正确读取 HTTP 响应。
添加 spring-boot-starter-web
确实会让您的应用程序使用 Tomcat,但它主要将您的 Spring WebFlux 应用程序转变为 Spring MVC 应用程序(现在支持一些响应式 return 类型,但具有不同的运行时模型)。如果您希望使用 Tomcat 测试您的应用程序,您可以将 spring-boot-starter-tomcat
添加到您的 POM,这将使用 Tomcat 和 Spring WebFlux。
我创建了 Spring Boot 2.0 演示应用程序,其中包含两个使用 WebClient 进行通信的应用程序。当我从 WebClient 的响应中使用 Flux 的 block() 方法时,他们经常停止通信,这让我很痛苦。由于某些原因,我想使用 List 而不是 Flux。
服务器端应用程序是这样的。它只是 returns Flux 对象。
@GetMapping
public Flux<Item> findAll() {
return Flux.fromIterable(items);
}
而客户端(或BFF端)应用程序是这样的。我从服务器获取 Flux 并通过调用 block() 方法将其转换为 List。
@GetMapping
public List<Item> findBlock() {
return webClient.get()
.retrieve()
.bodyToFlux(Item.class)
.collectList()
.block(Duration.ofSeconds(10L));
}
虽然一开始很好用,但是多次访问后findBlock()会无响应并超时。当我将 findBlock() 方法修改为 return Flux 删除 collectList() 和 block() 时,它运行良好。然后我假设 block() 方法导致了这个问题。
而且,当我将 findAll() 方法修改为 return 列表时,没有任何变化。
整个示例应用程序的源代码在这里。
https://github.com/cero-t/webclient-example
"resource"是服务端应用,"front"是客户端应用。在 运行 两个应用程序之后,当我访问 localhost:8080 时它运行良好并且我可以随时重新加载,但是当我访问 localhost:8080/block 时它似乎运行良好但是在几次重新加载之后它不会回应。
顺便说一下,当我将 "spring-boot-starter-web" 依赖项添加到 "front" 应用程序(不是资源应用程序的)pom.xml 时,这意味着我使用 tomcat,这个问题从来没有发生。是 Netty 服务器的问题吗?
任何指导将不胜感激。
首先,让我指出,仅当 items
已从内存中获取时才建议使用 Flux.fromIterable(items)
,不涉及 I/O。否则您可能会使用阻塞 API 来获取它 - 这可能会破坏您的反应式应用程序。在本例中,这是一个内存列表,所以没问题。请注意,您也可以转到 Flux.just(item1, item2, item3)
.
使用以下是最有效的:
@GetMapping("/")
public Flux<Item> findFlux() {
return webClient.get()
.retrieve()
.bodyToFlux(Item.class);
}
Item
个实例将以非常有效的方式 read/written、decoded/encoded 即时运行。
另一方面,这不是首选方式:
@GetMapping("/block")
public List<Item> findBlock() {
return webClient.get()
.retrieve()
.bodyToFlux(Item.class)
.collectList()
.block(Duration.ofSeconds(10L));
}
在这种情况下,您的前端应用程序在内存中缓冲 整个项目列表 和 collectList
,但也阻塞了为数不多的可用服务器线程之一。这可能会导致性能非常差,因为您的服务器可能会因等待该数据而被阻塞,无法同时为其他请求提供服务。
在这种特殊情况下,情况更糟,因为应用程序完全崩溃了。 查看控制台,我们可以看到以下内容:
WARN 3075 --- [ctor-http-nio-7] io.netty.util.concurrent.DefaultPromise : An exception was thrown by reactor.ipc.netty.channel.PooledClientContextHandler$$Lambda2/356589024.operationComplete()
reactor.core.Exceptions$BubblingException: java.lang.IllegalArgumentException: Channel [id: 0xab15f050, L:/127.0.0.1:59350 - R:localhost/127.0.0.1:8081] was not acquired from this ChannelPool
at reactor.core.Exceptions.bubble(Exceptions.java:154) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]
这可能与应该在 0.7.4.RELEASE 中修复的 reactor-netty client connection pool issue 相关联。我不知道这方面的具体细节,但我怀疑整个连接池都被破坏了,因为没有从客户端连接中正确读取 HTTP 响应。
添加 spring-boot-starter-web
确实会让您的应用程序使用 Tomcat,但它主要将您的 Spring WebFlux 应用程序转变为 Spring MVC 应用程序(现在支持一些响应式 return 类型,但具有不同的运行时模型)。如果您希望使用 Tomcat 测试您的应用程序,您可以将 spring-boot-starter-tomcat
添加到您的 POM,这将使用 Tomcat 和 Spring WebFlux。