具有开放 http 连接限制的 Netty 服务器

Netty server with open http connections limit

我是 Netty 的新手,我根据一个示例编写了我发现的 Netty http 服务器,它保持 http 连接打开以将服务器发送的事件发送到浏览器客户端。

问题是它最多只接受大约 ~5 个连接,之后会阻止新连接。我用谷歌搜索,发现大多数答案都说要将 SO_LOGBACK 设置为更高的值。尝试了不同的值,但我看不出有什么不同。我什至将其设置为 MAX_INTEGER 值,但仍然只有 5 个连接。

服务器代码(使用Netty 4.1.6.Final):

package server;

import static io.netty.buffer.Unpooled.copiedBuffer;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;

public class NettyHttpServer {
 private ChannelFuture channel;
 private final EventLoopGroup masterGroup;

 public NettyHttpServer() {
  masterGroup = new NioEventLoopGroup(100);
 }

 public void start() {
  try {
   final ServerBootstrap bootstrap = new ServerBootstrap().group(masterGroup)
    .channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer < SocketChannel > () {
     @Override
     public void initChannel(final SocketChannel ch) throws Exception {
      ch.pipeline().addLast("codec", new HttpServerCodec());
      ch.pipeline().addLast("aggregator", new HttpObjectAggregator(512 * 1024));
      ch.pipeline().addLast("request", new ChannelInboundHandlerAdapter() {
       @Override
       public void channelRead(final ChannelHandlerContext ctx, final Object msg)
       throws Exception {
        System.out.println(msg);
        registerToPubSub(ctx, msg);
       }

       @Override
       public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
       }

       @Override
       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
         HttpResponseStatus.INTERNAL_SERVER_ERROR,
         copiedBuffer(cause.getMessage().getBytes())));
       }
      });
     }
    }).option(ChannelOption.SO_BACKLOG, Integer.MAX_VALUE)
    .childOption(ChannelOption.SO_KEEPALIVE, true);
   channel = bootstrap.bind(8081).sync();
   // channels.add(bootstrap.bind(8080).sync());
  } catch (final InterruptedException e) {}
 }

 public void shutdown() {
  masterGroup.shutdownGracefully();

  try {
   channel.channel().closeFuture().sync();
  } catch (InterruptedException e) {}
 }

 private void registerToPubSub(final ChannelHandlerContext ctx, Object msg) {
  new Thread() {
   @Override
   public void run() {
    while (true) {
     final String responseMessage = "data:abcdef\n\n";
     FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
      copiedBuffer(responseMessage.getBytes()));

     response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
     response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/event-stream");
     response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
     response.headers().set("Cache-Control", "no-cache");

     ctx.writeAndFlush(response);

     try {
      Thread.sleep(1000);
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
   };
  }.start();
 }

 public static void main(String[] args) {
  new NettyHttpServer().start();
 }
}

客户端 js 代码(我在不同的选项卡中从我的浏览器中 运行 它超过 5 次,但并不是所有的都得到:

var source = new EventSource("http://localhost:8081");
source.onmessage = function(event) {
   console.log(event.data);
};
source.onerror= function(err){console.log(err); source.close()};
source.onopen = function(event){console.log('open'); console.log(event)}

您需要让浏览器知道您已完成发送响应,为此您有三个选项。

  1. 设置内容长度
  2. 分块发送
  3. 完成后关闭连接

你没有做任何这些。我怀疑您的浏览器仍在等待对您发送的每个请求的完整响应,并且正在为您的测试中的每个请求使用新连接。 5 次请求后,您的浏览器必须拒绝创建新连接。

我注意到的另一件事是,您正在为服务器中的每个请求创建一个新线程,并且永远不会让它死掉。当您尝试扩展时,这将导致问题。如果您真的希望该代码 运行 在不同的线程中,那么我建议查看用于向管道添加处理程序的重载方法;那些应该让你指定一个线程池 运行 他们。