Spring Webflux 在响应式堆栈中使用阻塞 HttpClient
Spring Webflux using a blocking HttpClient in a Reactive Stack
我目前正在一个构建微服务的项目中,并且正在尝试从更传统的 Spring Boot RestClient
转移到使用 Netty 和 WebClient
作为 HTTP 客户端的 Reactive Stack为了连接到后端系统。
这对于使用 REST API 的后端来说进展顺利,但是我在实现 WebClient
连接到 SOAP 后端和 Oracle 数据库的服务时仍然遇到一些困难,这些服务仍然使用传统的 JDBC。
我设法在网上找到了一些关于 JDBC 调用的解决方法,这些调用使用并行调度程序来发布阻塞 JDBC 调用的结果:
//the method that is called by @Service
@Override
public Mono<TransactionManagerModel> checkTransaction(String transactionId, String channel, String msisdn) {
return asyncCallable(() -> checkTransactionDB(transactionId, channel, msisdn))
.onErrorResume(error -> Mono.error(error));
}
...
//the actual JDBC call
private TransactionManagerModel checkTransactionDB(String transactionId, String channel, String msisdn) {
...
List<TransactionManagerModel> result =
jdbcTemplate.query(CHECK_TRANSACTION, paramMap, new BeanPropertyRowMapper<>(TransactionManagerModel.class));
...
}
//Generic async callable
private <T> Mono<T> asyncCallable(Callable<T> callable) {
return Mono.fromCallable(callable).subscribeOn(Schedulers.parallel()).publishOn(transactionManagerJdbcScheduler);
}
而且我认为这很有效。
虽然对于 SOAP 调用,我所做的是将 SOAP 调用封装在 Mono
中,而 SOAP 调用本身使用 CloseableHttpClient
,这显然是一个阻塞的 HTTP 客户端。
//The method that is being 'reactive'
public Mono<OfferRs> addOffer(String transactionId, String channel, String serviceId, OfferRq request) {
...
OfferRs result = adapter.addOffer(transactionId, channel, generateRequest(request));
...
}
//The SOAP adapter that uses blocking HTTP Client
public OfferRs addOffer(String transactionId, String channel, JAXBElement<OfferRq> request) {
...
response = (OfferRs) getWebServiceTemplate().marshalSendAndReceive(url, request, webServiceMessage -> {
try {
SoapHeader soapHeader = ((SoapMessage) webServiceMessage).getSoapHeader();
ObjectFactory headerFactory = new ObjectFactory();
AuthenticationHeader authHeader = headerFactory.createAuthenticationHeader();
authHeader.setUserName(username);
authHeader.setPassWord(password);
JAXBContext headerContext = JAXBContext.newInstance(AuthenticationHeader.class);
Marshaller marshaller = headerContext.createMarshaller();
marshaller.marshal(authHeader, soapHeader.getResult());
} catch (Exception ex) {
log.error("Failed to marshall SOAP Header!", ex);
}
});
return response;
...
}
我的问题是:这个 SOAP 调用的实现是否足够“反应性”以至于我不必担心某些调用在微服务的某些部分被阻止?我已经实现了反应式堆栈 - 显式调用 block()
将抛出异常,因为如果使用 Netty 则不允许这样做。
或者我是否也应该在 SOAP 调用中调整并行 Schedulers
的使用?
经过一些讨论我会写一个答案。
Reactor 文档指出 you should place blocking calls on their own schedulers。这基本上是为了保持 reactor 的非阻塞部分继续运行,如果阻塞中出现某些内容,则 reactor 将回退到传统的 servlet 行为,这意味着为每个请求分配一个线程。
Reactor 有关于 schedulers 它们的类型等的非常好的文档
但短:
订阅
当有人订阅时,reactor 会进入一个叫做 assembly phase
的地方,这意味着它基本上会从订阅点开始向后向上游调用运算符,直到它找到数据生产者(例如数据库,或者其他服务等)。如果它在这个阶段的某处找到一个 onSubscribe
-operator,它将把整个链放在它自己定义的 Scheduler
上。所以要知道的一件好事是 onSubscribe
的位置并不重要,只要在 assembly phase
期间找到它,整个链就会受到影响。
示例用法可以是:
我们有对数据库的阻塞调用、使用阻塞 rest 客户端的缓慢调用、在阻塞庄园中从系统读取文件等。
发布
如果在 assembly phase
期间链中某处有 onPublish
链将知道它放置的位置,链将在该特定点从默认调度程序切换到指定调度程序。所以 onPublish
位置确实很重要。因为它会在放置的位置切换。这个运算符更多的是控制你想在代码中的特定点在特定的调度程序上放置一些东西。
示例用法可以是:
您正在特定点进行一些繁重的阻塞 cpu 计算,您可以切换到 Scheduler.parallell()
这将保证所有计算都将放在单独的核心上做繁重的 cpu 工作,完成后您可以切换回默认调度程序。
上面的例子
你的 soap 调用应该单独放置 Scheduler
如果它们阻塞,我认为 onSubscribe
就足够了,使用 Schedulers.elasticBound()
就可以得到传统的小服务程序行为。如果您害怕在同一个调度程序上进行所有阻塞调用,您可以在 asyncCallable
函数中传入 Scheduler
并拆分调用以使用不同的 Schedulers
.
我目前正在一个构建微服务的项目中,并且正在尝试从更传统的 Spring Boot RestClient
转移到使用 Netty 和 WebClient
作为 HTTP 客户端的 Reactive Stack为了连接到后端系统。
这对于使用 REST API 的后端来说进展顺利,但是我在实现 WebClient
连接到 SOAP 后端和 Oracle 数据库的服务时仍然遇到一些困难,这些服务仍然使用传统的 JDBC。
我设法在网上找到了一些关于 JDBC 调用的解决方法,这些调用使用并行调度程序来发布阻塞 JDBC 调用的结果:
//the method that is called by @Service
@Override
public Mono<TransactionManagerModel> checkTransaction(String transactionId, String channel, String msisdn) {
return asyncCallable(() -> checkTransactionDB(transactionId, channel, msisdn))
.onErrorResume(error -> Mono.error(error));
}
...
//the actual JDBC call
private TransactionManagerModel checkTransactionDB(String transactionId, String channel, String msisdn) {
...
List<TransactionManagerModel> result =
jdbcTemplate.query(CHECK_TRANSACTION, paramMap, new BeanPropertyRowMapper<>(TransactionManagerModel.class));
...
}
//Generic async callable
private <T> Mono<T> asyncCallable(Callable<T> callable) {
return Mono.fromCallable(callable).subscribeOn(Schedulers.parallel()).publishOn(transactionManagerJdbcScheduler);
}
而且我认为这很有效。
虽然对于 SOAP 调用,我所做的是将 SOAP 调用封装在 Mono
中,而 SOAP 调用本身使用 CloseableHttpClient
,这显然是一个阻塞的 HTTP 客户端。
//The method that is being 'reactive'
public Mono<OfferRs> addOffer(String transactionId, String channel, String serviceId, OfferRq request) {
...
OfferRs result = adapter.addOffer(transactionId, channel, generateRequest(request));
...
}
//The SOAP adapter that uses blocking HTTP Client
public OfferRs addOffer(String transactionId, String channel, JAXBElement<OfferRq> request) {
...
response = (OfferRs) getWebServiceTemplate().marshalSendAndReceive(url, request, webServiceMessage -> {
try {
SoapHeader soapHeader = ((SoapMessage) webServiceMessage).getSoapHeader();
ObjectFactory headerFactory = new ObjectFactory();
AuthenticationHeader authHeader = headerFactory.createAuthenticationHeader();
authHeader.setUserName(username);
authHeader.setPassWord(password);
JAXBContext headerContext = JAXBContext.newInstance(AuthenticationHeader.class);
Marshaller marshaller = headerContext.createMarshaller();
marshaller.marshal(authHeader, soapHeader.getResult());
} catch (Exception ex) {
log.error("Failed to marshall SOAP Header!", ex);
}
});
return response;
...
}
我的问题是:这个 SOAP 调用的实现是否足够“反应性”以至于我不必担心某些调用在微服务的某些部分被阻止?我已经实现了反应式堆栈 - 显式调用 block()
将抛出异常,因为如果使用 Netty 则不允许这样做。
或者我是否也应该在 SOAP 调用中调整并行 Schedulers
的使用?
经过一些讨论我会写一个答案。
Reactor 文档指出 you should place blocking calls on their own schedulers。这基本上是为了保持 reactor 的非阻塞部分继续运行,如果阻塞中出现某些内容,则 reactor 将回退到传统的 servlet 行为,这意味着为每个请求分配一个线程。
Reactor 有关于 schedulers 它们的类型等的非常好的文档
但短:
订阅
当有人订阅时,reactor 会进入一个叫做 assembly phase
的地方,这意味着它基本上会从订阅点开始向后向上游调用运算符,直到它找到数据生产者(例如数据库,或者其他服务等)。如果它在这个阶段的某处找到一个 onSubscribe
-operator,它将把整个链放在它自己定义的 Scheduler
上。所以要知道的一件好事是 onSubscribe
的位置并不重要,只要在 assembly phase
期间找到它,整个链就会受到影响。
示例用法可以是:
我们有对数据库的阻塞调用、使用阻塞 rest 客户端的缓慢调用、在阻塞庄园中从系统读取文件等。
发布
如果在 assembly phase
期间链中某处有 onPublish
链将知道它放置的位置,链将在该特定点从默认调度程序切换到指定调度程序。所以 onPublish
位置确实很重要。因为它会在放置的位置切换。这个运算符更多的是控制你想在代码中的特定点在特定的调度程序上放置一些东西。
示例用法可以是:
您正在特定点进行一些繁重的阻塞 cpu 计算,您可以切换到 Scheduler.parallell()
这将保证所有计算都将放在单独的核心上做繁重的 cpu 工作,完成后您可以切换回默认调度程序。
上面的例子
你的 soap 调用应该单独放置 Scheduler
如果它们阻塞,我认为 onSubscribe
就足够了,使用 Schedulers.elasticBound()
就可以得到传统的小服务程序行为。如果您害怕在同一个调度程序上进行所有阻塞调用,您可以在 asyncCallable
函数中传入 Scheduler
并拆分调用以使用不同的 Schedulers
.