Netty 客户端请求在发送到多个服务器时超时

Netty Client requests timing out while sending to multiple servers

首先说明一下背景:
我已经实现了 Netty Framework 并让一个客户端向超过 44 个服务器发送 HTTP 请求。服务器正在响应该请求。
在我的客户端中,我通过覆盖 channelActive 函数并从 channelRead0 函数读取响应并将所有响应存储在数据结构中来发送请求。
因为,发送 HTTP 请求并从 44 个服务器获得响应需要时间。我正在使用超时值,结构如下所示:

      for (final InetAddress target : remoteIPAddresses.values()) {
            httpClient.connect(target);
        }
        // wait for the timeout. Hoping client send request to all
        // the targets and get response.
        Uninterruptibles.sleepUninterruptibly(timeout, TimeUnit.MILLISECONDS);
        httpClient.stop();
        fetchResults();

fetchResults 从 channelRead0 中提到的数据结构中获取结果 connect 方法包含如下所示的 netty 实现:

 public void connect(final InetAddress remoteAddress){
        new Bootstrap()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeout)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .group(eventLoopGroup)
            .channel(NioSocketChannel.class)
            .handler(httpNettyClientChannelInitializer)
            .connect(remoteAddress, serverPort)
            .addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) {
                        future.cancel(!future.isSuccess());
                    }
                });
    }  

Netty 内部使用的参数

connectionTimeout = 100ms  
Timeout value = 400ms    
Eventloop = 1 (Tried with 2 , 5 and 10) 

问题
在 44 个目标中,我遇到了多个目标的超时。目标每次都不同。使用线程睡眠不是一个好习惯,我无法想出任何其他方法来完成任务。 有一个更好的方法吗?我已经看过 this 视频。我被封锁了。任何线索都会很有帮助。

您可以使用 CountDownLatch,而不是睡觉并希望您获得所有必需的响应。将此锁存器传递给您的处理程序,它会在每次响应到达时倒计时(在 channelRead0 中)。然后,您的主线程可以使用 await()

等待所有具有全局超时的响应

您的处理程序可能如下所示:

@ChannelHandler.Sharable
public class HttpResponseHandler extends SimpleChannelInboundHandler<HttpObject> {

    final CountDownLatch responseLatch;

    public HttpResponseHandler(CountDownLatch responseLatch) {
        this.responseLatch = responseLatch;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        ....
        responseLatch.countDown();
    }
}

并且在主线程中:

        CountDownLatch responseLatch = new CountDownLatch(remoteIpAddresses.size());
        HttpResponseHandler handler = new HttpResponseHandler(responseLatch);
        // your for loop to connect to servers here
        responseLatch.await(timeout, TimeUnit.MILLISECONDS);

我没有考虑处理程序中的错误情况(套接字 connect/read 超时、无效响应等),因此请务必处理这些情况。