在错误状态代码上关闭 Reactor Netty 连接

Closing Reactor Netty connection on error status codes

我正在通过 Spring Webflux 框架使用 Reactor Netty,以便将数据发送到远程内容分发网络。当客户端请求完成后,默认的 Reactor Netty 行为是保持连接存活并将其释放回底层连接池。

一些内容分发网络建议在某些类型的状态代码(例如 500 内部服务器错误)上重新解析 DNS。为此,我添加了一个自定义 Netty DnsNameResolverDnsCache,但我还需要关闭连接,否则它将被释放回池中并且 DNS 将不会重新解析。

如何在出现错误状态代码时关闭连接?

到目前为止,我通过将 ConnectionObserver 添加到 Reactor Netty 的 TcpClient:

来提出以下解决方法
TcpClient tcpClient = TcpClient.create()
        .observe((connection, newState) -> {
            if (newState == State.RELEASED && connection instanceof HttpClientResponse) {
                HttpResponseStatus status = ((HttpClientResponse) connection).status();
                if (status.codeClass() != HttpStatusClass.SUCCESS) {
                    connection.dispose();
                }
            }
        });

即,如果连接已被释放(即放回连接池中)并且释放是由具有不成功状态代码的 HTTP 客户端响应引起的,则关闭连接。

这种方法感觉很笨拙。如果连接在错误状态代码后被释放,并且观察者正在关闭该连接,那么新请求是否可以并行获取相同的连接?框架是否在内部优雅地处理事情,或者这是使上述方法无效的竞争条件?

在此先感谢您的帮助!

最好用doOnResponse or doAfterResponseSuccess 看使用情况哪个更合适。

但是等待 RELEASED 应该不是问题

If the connection is released after an error status code, and the observer is closing that connection, can a new request acquire the same connection in parallel? Does the framework internally handle things gracefully or is this a race condition that invalidates the above approach?

连接池默认以先进先出的租用策略运行,所以如果池中有空闲连接,您将不会获得相同的连接,如果将连接池切换为后进先出的租用策略,则不会出现这种情况。获取时,会检查每个连接是否活跃,只有活跃的连接才会被提供使用。

更新:

您也可以尝试下面仅使用 WebClient API 而不是 Reactor Netty API 的方法:

return this.webClient
           .get()
           .uri("/500")
           .retrieve()
           .onStatus(status -> status.equals(HttpStatus.INTERNAL_SERVER_ERROR), clientResponse -> {
                clientResponse.bodyToFlux(DataBuffer.class)
                              .subscribe(new BaseSubscriber<DataBuffer>() {
                                  @Override
                                  protected void hookOnSubscribe(Subscription subscription) {
                                      subscription.cancel();
                                  }
                              });
                return Mono.error(new IllegalStateException("..."));
           })
           .bodyToMono(String.class);