Netty 客户端请求在发送到多个服务器时超时
Netty Client requests timing out while sending to multiple servers
首先说明一下背景:
我已经实现了 Netty Framework 并让一个客户端向超过 44 个服务器发送 HTTP 请求。服务器正在响应该请求。
在我的客户端中,我通过覆盖 channelActive
函数并从 channelRead0
函数读取响应并将所有响应存储在数据结构中来发送请求。
因为,发送 HTTP 请求并从 44 个服务器获得响应需要时间。我正在使用超时值,结构如下所示:
for (final InetAddress target : remoteIPAddresses.values()) {
httpClient.connect(target);
}
// wait for the timeout. Hoping client send request to all
// the targets and get response.
Uninterruptibles.sleepUninterruptibly(timeout, TimeUnit.MILLISECONDS);
httpClient.stop();
fetchResults();
fetchResults 从 channelRead0
中提到的数据结构中获取结果
connect 方法包含如下所示的 netty 实现:
public void connect(final InetAddress remoteAddress){
new Bootstrap()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeout)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(httpNettyClientChannelInitializer)
.connect(remoteAddress, serverPort)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
future.cancel(!future.isSuccess());
}
});
}
Netty 内部使用的参数
connectionTimeout = 100ms
Timeout value = 400ms
Eventloop = 1 (Tried with 2 , 5 and 10)
问题
在 44 个目标中,我遇到了多个目标的超时。目标每次都不同。使用线程睡眠不是一个好习惯,我无法想出任何其他方法来完成任务。
有一个更好的方法吗?我已经看过 this 视频。我被封锁了。任何线索都会很有帮助。
您可以使用 CountDownLatch,而不是睡觉并希望您获得所有必需的响应。将此锁存器传递给您的处理程序,它会在每次响应到达时倒计时(在 channelRead0
中)。然后,您的主线程可以使用 await()
等待所有具有全局超时的响应
您的处理程序可能如下所示:
@ChannelHandler.Sharable
public class HttpResponseHandler extends SimpleChannelInboundHandler<HttpObject> {
final CountDownLatch responseLatch;
public HttpResponseHandler(CountDownLatch responseLatch) {
this.responseLatch = responseLatch;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
....
responseLatch.countDown();
}
}
并且在主线程中:
CountDownLatch responseLatch = new CountDownLatch(remoteIpAddresses.size());
HttpResponseHandler handler = new HttpResponseHandler(responseLatch);
// your for loop to connect to servers here
responseLatch.await(timeout, TimeUnit.MILLISECONDS);
我没有考虑处理程序中的错误情况(套接字 connect/read 超时、无效响应等),因此请务必处理这些情况。
首先说明一下背景:
我已经实现了 Netty Framework 并让一个客户端向超过 44 个服务器发送 HTTP 请求。服务器正在响应该请求。
在我的客户端中,我通过覆盖 channelActive
函数并从 channelRead0
函数读取响应并将所有响应存储在数据结构中来发送请求。
因为,发送 HTTP 请求并从 44 个服务器获得响应需要时间。我正在使用超时值,结构如下所示:
for (final InetAddress target : remoteIPAddresses.values()) {
httpClient.connect(target);
}
// wait for the timeout. Hoping client send request to all
// the targets and get response.
Uninterruptibles.sleepUninterruptibly(timeout, TimeUnit.MILLISECONDS);
httpClient.stop();
fetchResults();
fetchResults 从 channelRead0
中提到的数据结构中获取结果
connect 方法包含如下所示的 netty 实现:
public void connect(final InetAddress remoteAddress){
new Bootstrap()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeout)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(httpNettyClientChannelInitializer)
.connect(remoteAddress, serverPort)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
future.cancel(!future.isSuccess());
}
});
}
Netty 内部使用的参数
connectionTimeout = 100ms
Timeout value = 400ms
Eventloop = 1 (Tried with 2 , 5 and 10)
问题
在 44 个目标中,我遇到了多个目标的超时。目标每次都不同。使用线程睡眠不是一个好习惯,我无法想出任何其他方法来完成任务。
有一个更好的方法吗?我已经看过 this 视频。我被封锁了。任何线索都会很有帮助。
您可以使用 CountDownLatch,而不是睡觉并希望您获得所有必需的响应。将此锁存器传递给您的处理程序,它会在每次响应到达时倒计时(在 channelRead0
中)。然后,您的主线程可以使用 await()
您的处理程序可能如下所示:
@ChannelHandler.Sharable
public class HttpResponseHandler extends SimpleChannelInboundHandler<HttpObject> {
final CountDownLatch responseLatch;
public HttpResponseHandler(CountDownLatch responseLatch) {
this.responseLatch = responseLatch;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
....
responseLatch.countDown();
}
}
并且在主线程中:
CountDownLatch responseLatch = new CountDownLatch(remoteIpAddresses.size());
HttpResponseHandler handler = new HttpResponseHandler(responseLatch);
// your for loop to connect to servers here
responseLatch.await(timeout, TimeUnit.MILLISECONDS);
我没有考虑处理程序中的错误情况(套接字 connect/read 超时、无效响应等),因此请务必处理这些情况。