Netty 和预定执行器服务
Netty and Scheduled Executor Service
我正在尝试创建一个 TCP 服务器,它定期从数据库 (Redis) 读取数据并将其发送到适当的客户端。
但是,由于我是 Netty 的新手,我不知道如何安排它。我知道我需要使用这样的预定执行程序服务:
ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor();
e.scheduleAtFixedRate(() -> {
System.out.println("Calling...");
// Do something
}, 1, 1, TimeUnit.SECONDS);
但是,当我尝试将其放入服务器代码时,它只调用了一次该方法。我试图把它放在不同的地方,但似乎仍然无法正确处理。我该怎么办?
服务器代码如下:
package com.example.test.app;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Server {
public static void main(String[] args) throws Exception
{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
final ServerHandler handler = new ServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception
{
ch.pipeline().addLast(handler);
}
});
b.option(ChannelOption.SO_BACKLOG, 128);
b.childOption(ChannelOption.SO_KEEPALIVE, true);
ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor();
e.scheduleAtFixedRate(() -> {
System.out.println("Calling...");
handler.saySomething();
}, 1, 1, TimeUnit.SECONDS);
ChannelFuture f = b.bind(1337).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
这是服务器处理程序:
package com.example.test.app;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class ServerHandler extends ChannelInboundHandlerAdapter {
private ChannelHandlerContext ctx;
@Override
public void channelActive(ChannelHandlerContext ctx)
{
this.ctx = ctx;
System.out.println("Someone's connedted!");
}
public void saySomething()
{
final ChannelFuture f = ctx.writeAndFlush("Sup!");
f.addListener((ChannelFutureListener) (ChannelFuture future) -> {
System.out.println("Something has been said!");
});
}
}
方法saySomething()
生成NullPointerException
用于调用final ChannelFuture f = ctx.writeAndFlush("Sup!");
,而ctx
为空。
EventExecutorGroup.scheduleAtFixedRate
javadoc 描述说 "If any execution of the task encounters an exception, subsequent executions are suppressed"。所以这就是为什么你只调用一次...
此外,似乎 Netty 允许您为不同的管道实例重复使用一个处理程序实例,前提是您将此处理程序的 class 注释为 @Sharable。否则,它会抛出异常。如果你的处理程序是无状态的(这不是你的情况,因为你有 ctx 成员)那么你应该将它注释为 @Sharable 并将它重新用于所有创建的管道。如果它是有状态的,则为每个新管道(新客户端连接)创建一个新实例。
最后,要为每个连接的客户端安排任务,您可以使用 channelActive()
实施。这个执行器实现了 ScheduledExecutorService
,所以你也有 scheduleAtFixedRate
。
看看我的代码版本,看看它是否适合你。
服务器:
package com.example.test.app;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Server {
public static void main(String[] args) throws Exception
{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception
{
ch.pipeline().addLast(new ServerHandler());
}
});
b.option(ChannelOption.SO_BACKLOG, 128);
b.childOption(ChannelOption.SO_KEEPALIVE, true);
// ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor();
// e.scheduleAtFixedRate(() -> {
// System.out.println("Calling...");
// handler.saySomething();
// }, 1, 1, TimeUnit.SECONDS);
ChannelFuture f = b.bind(1337).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
服务器处理程序:
package com.example.test.app;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class ServerHandler extends ChannelInboundHandlerAdapter {
private ScheduledFuture sf;
@Override
public void channelActive(ChannelHandlerContext ctx)
{
System.out.println("Someone's connedted! "+ctx.channel());
sf = ctx.executor().scheduleAtFixedRate(() -> {
System.out.println("Calling...");
saySomething(ctx);
}, 1, 1, TimeUnit.SECONDS);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("Someone's disconnected! "+ctx.channel());
sf.cancel(false);
}
private void saySomething(ChannelHandlerContext ctx)
{
final ChannelFuture f = ctx.writeAndFlush("Sup!");
f.addListener((ChannelFutureListener) (ChannelFuture future) -> {
System.out.println("Something has been said!");
});
}
}
我正在尝试创建一个 TCP 服务器,它定期从数据库 (Redis) 读取数据并将其发送到适当的客户端。
但是,由于我是 Netty 的新手,我不知道如何安排它。我知道我需要使用这样的预定执行程序服务:
ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor();
e.scheduleAtFixedRate(() -> {
System.out.println("Calling...");
// Do something
}, 1, 1, TimeUnit.SECONDS);
但是,当我尝试将其放入服务器代码时,它只调用了一次该方法。我试图把它放在不同的地方,但似乎仍然无法正确处理。我该怎么办?
服务器代码如下:
package com.example.test.app;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Server {
public static void main(String[] args) throws Exception
{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
final ServerHandler handler = new ServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception
{
ch.pipeline().addLast(handler);
}
});
b.option(ChannelOption.SO_BACKLOG, 128);
b.childOption(ChannelOption.SO_KEEPALIVE, true);
ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor();
e.scheduleAtFixedRate(() -> {
System.out.println("Calling...");
handler.saySomething();
}, 1, 1, TimeUnit.SECONDS);
ChannelFuture f = b.bind(1337).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
这是服务器处理程序:
package com.example.test.app;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class ServerHandler extends ChannelInboundHandlerAdapter {
private ChannelHandlerContext ctx;
@Override
public void channelActive(ChannelHandlerContext ctx)
{
this.ctx = ctx;
System.out.println("Someone's connedted!");
}
public void saySomething()
{
final ChannelFuture f = ctx.writeAndFlush("Sup!");
f.addListener((ChannelFutureListener) (ChannelFuture future) -> {
System.out.println("Something has been said!");
});
}
}
方法saySomething()
生成NullPointerException
用于调用final ChannelFuture f = ctx.writeAndFlush("Sup!");
,而ctx
为空。
EventExecutorGroup.scheduleAtFixedRate
javadoc 描述说 "If any execution of the task encounters an exception, subsequent executions are suppressed"。所以这就是为什么你只调用一次...
此外,似乎 Netty 允许您为不同的管道实例重复使用一个处理程序实例,前提是您将此处理程序的 class 注释为 @Sharable。否则,它会抛出异常。如果你的处理程序是无状态的(这不是你的情况,因为你有 ctx 成员)那么你应该将它注释为 @Sharable 并将它重新用于所有创建的管道。如果它是有状态的,则为每个新管道(新客户端连接)创建一个新实例。
最后,要为每个连接的客户端安排任务,您可以使用 channelActive()
实施。这个执行器实现了 ScheduledExecutorService
,所以你也有 scheduleAtFixedRate
。
看看我的代码版本,看看它是否适合你。
服务器:
package com.example.test.app;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Server {
public static void main(String[] args) throws Exception
{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception
{
ch.pipeline().addLast(new ServerHandler());
}
});
b.option(ChannelOption.SO_BACKLOG, 128);
b.childOption(ChannelOption.SO_KEEPALIVE, true);
// ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor();
// e.scheduleAtFixedRate(() -> {
// System.out.println("Calling...");
// handler.saySomething();
// }, 1, 1, TimeUnit.SECONDS);
ChannelFuture f = b.bind(1337).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
服务器处理程序:
package com.example.test.app;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class ServerHandler extends ChannelInboundHandlerAdapter {
private ScheduledFuture sf;
@Override
public void channelActive(ChannelHandlerContext ctx)
{
System.out.println("Someone's connedted! "+ctx.channel());
sf = ctx.executor().scheduleAtFixedRate(() -> {
System.out.println("Calling...");
saySomething(ctx);
}, 1, 1, TimeUnit.SECONDS);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("Someone's disconnected! "+ctx.channel());
sf.cancel(false);
}
private void saySomething(ChannelHandlerContext ctx)
{
final ChannelFuture f = ctx.writeAndFlush("Sup!");
f.addListener((ChannelFutureListener) (ChannelFuture future) -> {
System.out.println("Something has been said!");
});
}
}