Spring 多个客户端的 webflux 超时

Spring webflux timeout with multiple clients

我有一个与其他几个服务交互的服务。所以我为他们创建了单独的网络客户端(因为不同的基本路径)。我已经根据 https://docs.spring.io/spring/docs/5.1.6.RELEASE/spring-framework-reference/web-reactive.html#webflux-client-builder-reactor-timeout 分别为它们设置了超时,但这似乎并没有有效地工作。对于其中一项服务,尝试将 ReadTimeout 降低到 2 秒,但该服务似乎没有超时(使用 logging.level.org.springframework.web.reactive=debug 的日志显示请求大约需要 6-7 秒才能完成)。

我正在使用 spring5.1 和 netty 0.8,我正在使用 webclient 的阻塞,因为我们还没有完全使用 webflux。我试着稍微调整一下每个调用的超时,似乎有些调用确实响应了超时,而另一些则没有(更多细节与下面的代码一起)

我如何初始化网络客户端 -

@Bean
public WebClient serviceAWebClient(@Value("${serviceA.basepath}") String basePath,
                                          @Value("${serviceA.connection.timeout}") int connectionTimeout,
                                          @Value("${serviceA.read.timeout}") int readTimeout,
                                          @Value("${serviceA.write.timeout}") int writeTimeout) {

    return getWebClientWithTimeout(basePath, connectionTimeout, readTimeout, writeTimeout);
}

@Bean
public WebClient serviceBWebClient(@Value("${serviceB.basepath}") String basePath,
                                           @Value("${serviceB.connection.timeout}") int connectionTimeout,
                                           @Value("${serviceB.read.timeout}") int readTimeout,
                                           @Value("${serviceB.write.timeout}") int writeTimeout) {

    return getWebClientWithTimeout(basePath, connectionTimeout, readTimeout, writeTimeout);
}

@Bean
public WebClient serviceCWebClient(@Value("${serviceC.basepath}") String basePath,
                                           @Value("${serviceC.connection.timeout}") int connectionTimeout,
                                           @Value("${serviceC.read.timeout}") int readTimeout,
                                           @Value("${serviceC.write.timeout}") int writeTimeout) {

    return getWebClientWithTimeout(basePath, connectionTimeout, readTimeout, writeTimeout);
}

private WebClient getWebClientWithTimeout(String basePath,
                                          int connectionTimeout,
                                          int readTimeout,
                                          int writeTimeout) {


    TcpClient tcpClient = TcpClient.create()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeout)
            .doOnConnected(connection ->
                    connection.addHandlerLast(new ReadTimeoutHandler(readTimeout))
                            .addHandlerLast(new WriteTimeoutHandler(writeTimeout)));

    return WebClient.builder().baseUrl(basePath)
            .clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient))).build();

我基本上是如何使用它的(每个网络客户端都有包装器 类)-

Mono<ResponseA> serviceACallMono = ..;
Mono<ResponseB> serviceBCallMono = ..;
Mono.zip(serviceACallMono,serviceBCallMono,
(serviceAResponse, serviceBResponse) -> serviceC.getImportantData(serviceAResponse,serviceBResponse))
.flatMap(Function.identity)
.block();

所以在上面,我注意到了以下内容 -

如果我降低 serviceA ReadTimeout ,我会收到超时错误。

如果我降低 serviceB ReadTimeout ,我会收到超时错误。

如果我降低 serviceC ReadTimeout,它不会响应降低 ReadTimeout。它只是继续工作,直到它得到响应。

那么,我是不是漏掉了什么?我的印象是这些超时应该适用于所有场景。如果我可以添加更多内容,请告诉我。

编辑:更新,所以我可以用更简单的方式重现问题。 所以,对于类似 -

return serviceACallMono
                .flatMap(notUsed -> serviceBCallMono);

serviceACallMono 的超时是有效的,但是无论你为 serviceB 降低多少它都不会超时。

如果你只是翻转顺序 -

return serviceBCallMono
                .flatMap(notUsed -> serviceACallMono);

现在 serviceB 的超时已生效,但 serviceA 的超时未生效。

我在观察此编辑中的行为时也将服务更新为 return Mono。

编辑 2: 这基本上就是 ServiceC#getImportantData 中发生的事情 -

@Override
    public Mono<ServiceCResponse> getImportantData(ServiceAResponse requestA,
                                                   ServiceBResponse requestB) {

        return serviceCWebClient.post()
                .uri(GET_IMPORTANT_DATA_PATH, requestB.getAccountId())
                .body(BodyInserters.fromObject(formRequest(requestA)))
                .retrieve()
                .bodyToMono(ServiceC.class);
    }

formRequest是一个简单的POJO转换方法。

我正在使用 spring-boot 起始父级来拉取各种 spring 依赖项。让它从版本 2.1.2 升级到 2.1.4 似乎可以解决问题。