在错误状态代码上关闭 Reactor Netty 连接
Closing Reactor Netty connection on error status codes
我正在通过 Spring Webflux 框架使用 Reactor Netty,以便将数据发送到远程内容分发网络。当客户端请求完成后,默认的 Reactor Netty 行为是保持连接存活并将其释放回底层连接池。
一些内容分发网络建议在某些类型的状态代码(例如 500 内部服务器错误)上重新解析 DNS。为此,我添加了一个自定义 Netty DnsNameResolver
和 DnsCache
,但我还需要关闭连接,否则它将被释放回池中并且 DNS 将不会重新解析。
如何在出现错误状态代码时关闭连接?
到目前为止,我通过将 ConnectionObserver
添加到 Reactor Netty 的 TcpClient
:
来提出以下解决方法
TcpClient tcpClient = TcpClient.create()
.observe((connection, newState) -> {
if (newState == State.RELEASED && connection instanceof HttpClientResponse) {
HttpResponseStatus status = ((HttpClientResponse) connection).status();
if (status.codeClass() != HttpStatusClass.SUCCESS) {
connection.dispose();
}
}
});
即,如果连接已被释放(即放回连接池中)并且释放是由具有不成功状态代码的 HTTP 客户端响应引起的,则关闭连接。
这种方法感觉很笨拙。如果连接在错误状态代码后被释放,并且观察者正在关闭该连接,那么新请求是否可以并行获取相同的连接?框架是否在内部优雅地处理事情,或者这是使上述方法无效的竞争条件?
在此先感谢您的帮助!
最好用doOnResponse or doAfterResponseSuccess 看使用情况哪个更合适。
但是等待 RELEASED 应该不是问题
If the connection is released after an error status code, and the observer is closing that connection, can a new request acquire the same connection in parallel? Does the framework internally handle things gracefully or is this a race condition that invalidates the above approach?
连接池默认以先进先出的租用策略运行,所以如果池中有空闲连接,您将不会获得相同的连接,如果将连接池切换为后进先出的租用策略,则不会出现这种情况。获取时,会检查每个连接是否活跃,只有活跃的连接才会被提供使用。
更新:
您也可以尝试下面仅使用 WebClient API 而不是 Reactor Netty API 的方法:
return this.webClient
.get()
.uri("/500")
.retrieve()
.onStatus(status -> status.equals(HttpStatus.INTERNAL_SERVER_ERROR), clientResponse -> {
clientResponse.bodyToFlux(DataBuffer.class)
.subscribe(new BaseSubscriber<DataBuffer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.cancel();
}
});
return Mono.error(new IllegalStateException("..."));
})
.bodyToMono(String.class);
我正在通过 Spring Webflux 框架使用 Reactor Netty,以便将数据发送到远程内容分发网络。当客户端请求完成后,默认的 Reactor Netty 行为是保持连接存活并将其释放回底层连接池。
一些内容分发网络建议在某些类型的状态代码(例如 500 内部服务器错误)上重新解析 DNS。为此,我添加了一个自定义 Netty DnsNameResolver
和 DnsCache
,但我还需要关闭连接,否则它将被释放回池中并且 DNS 将不会重新解析。
如何在出现错误状态代码时关闭连接?
到目前为止,我通过将 ConnectionObserver
添加到 Reactor Netty 的 TcpClient
:
TcpClient tcpClient = TcpClient.create()
.observe((connection, newState) -> {
if (newState == State.RELEASED && connection instanceof HttpClientResponse) {
HttpResponseStatus status = ((HttpClientResponse) connection).status();
if (status.codeClass() != HttpStatusClass.SUCCESS) {
connection.dispose();
}
}
});
即,如果连接已被释放(即放回连接池中)并且释放是由具有不成功状态代码的 HTTP 客户端响应引起的,则关闭连接。
这种方法感觉很笨拙。如果连接在错误状态代码后被释放,并且观察者正在关闭该连接,那么新请求是否可以并行获取相同的连接?框架是否在内部优雅地处理事情,或者这是使上述方法无效的竞争条件?
在此先感谢您的帮助!
最好用doOnResponse or doAfterResponseSuccess 看使用情况哪个更合适。
但是等待 RELEASED 应该不是问题
If the connection is released after an error status code, and the observer is closing that connection, can a new request acquire the same connection in parallel? Does the framework internally handle things gracefully or is this a race condition that invalidates the above approach?
连接池默认以先进先出的租用策略运行,所以如果池中有空闲连接,您将不会获得相同的连接,如果将连接池切换为后进先出的租用策略,则不会出现这种情况。获取时,会检查每个连接是否活跃,只有活跃的连接才会被提供使用。
更新:
您也可以尝试下面仅使用 WebClient API 而不是 Reactor Netty API 的方法:
return this.webClient
.get()
.uri("/500")
.retrieve()
.onStatus(status -> status.equals(HttpStatus.INTERNAL_SERVER_ERROR), clientResponse -> {
clientResponse.bodyToFlux(DataBuffer.class)
.subscribe(new BaseSubscriber<DataBuffer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.cancel();
}
});
return Mono.error(new IllegalStateException("..."));
})
.bodyToMono(String.class);