Netty 消息接收超时
Netty messageReceived timeout
我需要我的 ChannelHandler 中的 messageReceived(或 channelRead0 for netty 4.0)方法在某个时间阈值后超时。我已尝试 Read/WriteTimeoutHandlers,但当我的 messageReceived 处理时间超过超时时无法生成异常。这是我尝试过的:
public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpRequestDecoder());
p.addLast(new HttpResponseEncoder());
p.addLast(new HttpObjectAggregator(1048576));
p.addLast(new HttpContentCompressor());
p.addLast("readTimeoutHandler", new ReadTimeoutHandler(1));
//p.addLast("idleTimeoutHandler", new IdleStateHandler(1, 1, 1));
p.addLast(new HttpRequestHandler());
}
}
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
public static class Call implements Callable<Boolean> {
public Boolean call() {
for(long i = 0; i<100000000;i++){
for(long j = 0; j<100;j++){
}
}
return true;
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.out.println("** Exception caught **");
if (cause instanceof ReadTimeoutException) {
System.out.println("*** Request timed out ***");
}
else if (cause instanceof WriteTimeoutException) {
System.out.println("*** Request timed out on write ***");
}
cause.printStackTrace();
ctx.close();
}
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
FullHttpRequest request = this.request = (FullHttpRequest) msg;
/*Callable<Boolean> callable = new Call();
ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
ScheduledFuture<Boolean> handle = scheduler.schedule(callable, 4, TimeUnit.SECONDS);
boolean v = handle.get();*/
for(long i = 0; i<100000000;i++){
for(long j = 0; j<100;j++){
}
}
System.out.println("Wait done");
try{
CharSequence content = appController.handleRequest(
url,
ctx.channel().remoteAddress().toString(),
parseURL(url), reqBody);
if(content!=null){
writeResponse(ctx, HttpResponseStatus.OK, content);
}
}catch(AppRuntimeException e){
CharSequence content = e.getMessage();
if(content != null){
OptHttpStatus status = e.getOptHttpStatus();
writeResponse(ctx, HttpResponseStatus.valueOf(status.getCode()), content);
}
}
private void writeResponse(ChannelHandlerContext ctx, HttpResponseStatus status, CharSequence content) {
// Decide whether to close the connection or not.
boolean keepAlive = HttpUtil.isKeepAlive(request);
// Build the response object.
FullHttpResponse response = new DefaultFullHttpResponse(
HTTP_1_1,
status,
Unpooled.copiedBuffer(content, CharsetUtil.UTF_8));
//Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
response.headers().set("Access-Control-Allow-Origin", "*");
if (keepAlive) {
//Add 'Content-Length' header only for a keep-alive connection.
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
// Add keep alive header as per:
// - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
// Write the response.
ChannelFuture ch = ctx.writeAndFlush(response);
if(!keepAlive){
ch.addListener(ChannelFutureListener.CLOSE);
}
}
}
我在channelRead0方法中添加了一个冗余的for循环来模拟等待,以模拟较长的处理时间。但是没有产生超时异常。
我也尝试安排等待,但没有得到超时异常
您能提出任何解决方案吗?
您尝试执行的操作有两个问题,一个是线程休眠而不是通过调度使用异步延迟引起的,另一个是您需要返回使用 ReadTimeoutHandler .
不睡不堵
而不是 Thread.sleep
为什么不尝试延迟安排您的工作。使用 Netty,您的超时将发生在您发送到睡眠的同一个线程中,因此您仍然会在超时检查发生之前编写响应。如果您改为安排延迟,则线程可以自由检测超时并触发异常。
请记住,netty 对多个通道使用一个 IO 线程,因此您不应在该线程中执行任何 blocking/synchronous 工作。 Thread.sleep、忙循环、同步调用、阻塞调用(如Future.get()
)不应该在IO线程中进行,因为这会影响其他通道的性能。
您可以使用 context 联系执行人安排您延迟的工作。
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
// Put your try/catch in here
}
}, 5000, TimeUnit.MILLISECONDS);
}
如果您必须进行阻塞调用
如果您无法避免进行阻塞调用或进行一些密集处理,则在添加处理处理程序时使用不同的 EventExecutorGroup 以允许从 IO 工作线程异步完成该工作。请务必为其提供足够的线程来满足您预期的工作负载和连接数。
下面的示例代码应该适用于您的 Thread.sleep 或繁忙的循环。请务必 define/replace OFFLOAD_THREADS 使用满足您需要的号码。
public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
private final EventExecutorGroup executors = new DefaultEventExecutorGroup(OFFLOAD_THREADS);
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpRequestDecoder());
p.addLast(new HttpResponseEncoder());
p.addLast(new HttpObjectAggregator(1048576));
p.addLast(new HttpContentCompressor());
p.addLast("idleTimeoutHandler", new IdleStateHandler(0, 1, 0));
p.addLast(executors, new HttpRequestHandler());
}
}
使用 IdleStateHandler
WriteTimeoutHandler 用于在写入时间过长时超时。在您开始第一次写入之前,它甚至不会开始为您计时。看起来你试图在写入开始之前造成延迟,所以 WriteTimeoutHandler 不会为你触发,即使你按照上面的建议停止使用 sleep 。
如果你真的想在开始写入时超时,那么你应该使用 IdleStateHandler 并处理它触发的用户事件。与 WriteTimeoutHandler 不同,IdleStateHandler 将在通道变为活动状态时开始计数,而不是等待写入开始,因此如果您的处理时间过长(但仅当您进行异步处理时),它将触发。
确保在使用 IdleStateHandler 时捕获并响应用户事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {
// Handle the event here
}
else {
super.userEventTriggered(ctx, evt);
}
}
我需要我的 ChannelHandler 中的 messageReceived(或 channelRead0 for netty 4.0)方法在某个时间阈值后超时。我已尝试 Read/WriteTimeoutHandlers,但当我的 messageReceived 处理时间超过超时时无法生成异常。这是我尝试过的:
public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpRequestDecoder());
p.addLast(new HttpResponseEncoder());
p.addLast(new HttpObjectAggregator(1048576));
p.addLast(new HttpContentCompressor());
p.addLast("readTimeoutHandler", new ReadTimeoutHandler(1));
//p.addLast("idleTimeoutHandler", new IdleStateHandler(1, 1, 1));
p.addLast(new HttpRequestHandler());
}
}
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
public static class Call implements Callable<Boolean> {
public Boolean call() {
for(long i = 0; i<100000000;i++){
for(long j = 0; j<100;j++){
}
}
return true;
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.out.println("** Exception caught **");
if (cause instanceof ReadTimeoutException) {
System.out.println("*** Request timed out ***");
}
else if (cause instanceof WriteTimeoutException) {
System.out.println("*** Request timed out on write ***");
}
cause.printStackTrace();
ctx.close();
}
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
FullHttpRequest request = this.request = (FullHttpRequest) msg;
/*Callable<Boolean> callable = new Call();
ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
ScheduledFuture<Boolean> handle = scheduler.schedule(callable, 4, TimeUnit.SECONDS);
boolean v = handle.get();*/
for(long i = 0; i<100000000;i++){
for(long j = 0; j<100;j++){
}
}
System.out.println("Wait done");
try{
CharSequence content = appController.handleRequest(
url,
ctx.channel().remoteAddress().toString(),
parseURL(url), reqBody);
if(content!=null){
writeResponse(ctx, HttpResponseStatus.OK, content);
}
}catch(AppRuntimeException e){
CharSequence content = e.getMessage();
if(content != null){
OptHttpStatus status = e.getOptHttpStatus();
writeResponse(ctx, HttpResponseStatus.valueOf(status.getCode()), content);
}
}
private void writeResponse(ChannelHandlerContext ctx, HttpResponseStatus status, CharSequence content) {
// Decide whether to close the connection or not.
boolean keepAlive = HttpUtil.isKeepAlive(request);
// Build the response object.
FullHttpResponse response = new DefaultFullHttpResponse(
HTTP_1_1,
status,
Unpooled.copiedBuffer(content, CharsetUtil.UTF_8));
//Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
response.headers().set("Access-Control-Allow-Origin", "*");
if (keepAlive) {
//Add 'Content-Length' header only for a keep-alive connection.
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
// Add keep alive header as per:
// - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
// Write the response.
ChannelFuture ch = ctx.writeAndFlush(response);
if(!keepAlive){
ch.addListener(ChannelFutureListener.CLOSE);
}
}
}
我在channelRead0方法中添加了一个冗余的for循环来模拟等待,以模拟较长的处理时间。但是没有产生超时异常。 我也尝试安排等待,但没有得到超时异常 您能提出任何解决方案吗?
您尝试执行的操作有两个问题,一个是线程休眠而不是通过调度使用异步延迟引起的,另一个是您需要返回使用 ReadTimeoutHandler .
不睡不堵
而不是 Thread.sleep
为什么不尝试延迟安排您的工作。使用 Netty,您的超时将发生在您发送到睡眠的同一个线程中,因此您仍然会在超时检查发生之前编写响应。如果您改为安排延迟,则线程可以自由检测超时并触发异常。
请记住,netty 对多个通道使用一个 IO 线程,因此您不应在该线程中执行任何 blocking/synchronous 工作。 Thread.sleep、忙循环、同步调用、阻塞调用(如Future.get()
)不应该在IO线程中进行,因为这会影响其他通道的性能。
您可以使用 context 联系执行人安排您延迟的工作。
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
// Put your try/catch in here
}
}, 5000, TimeUnit.MILLISECONDS);
}
如果您必须进行阻塞调用
如果您无法避免进行阻塞调用或进行一些密集处理,则在添加处理处理程序时使用不同的 EventExecutorGroup 以允许从 IO 工作线程异步完成该工作。请务必为其提供足够的线程来满足您预期的工作负载和连接数。
下面的示例代码应该适用于您的 Thread.sleep 或繁忙的循环。请务必 define/replace OFFLOAD_THREADS 使用满足您需要的号码。
public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
private final EventExecutorGroup executors = new DefaultEventExecutorGroup(OFFLOAD_THREADS);
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpRequestDecoder());
p.addLast(new HttpResponseEncoder());
p.addLast(new HttpObjectAggregator(1048576));
p.addLast(new HttpContentCompressor());
p.addLast("idleTimeoutHandler", new IdleStateHandler(0, 1, 0));
p.addLast(executors, new HttpRequestHandler());
}
}
使用 IdleStateHandler
WriteTimeoutHandler 用于在写入时间过长时超时。在您开始第一次写入之前,它甚至不会开始为您计时。看起来你试图在写入开始之前造成延迟,所以 WriteTimeoutHandler 不会为你触发,即使你按照上面的建议停止使用 sleep 。
如果你真的想在开始写入时超时,那么你应该使用 IdleStateHandler 并处理它触发的用户事件。与 WriteTimeoutHandler 不同,IdleStateHandler 将在通道变为活动状态时开始计数,而不是等待写入开始,因此如果您的处理时间过长(但仅当您进行异步处理时),它将触发。
确保在使用 IdleStateHandler 时捕获并响应用户事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {
// Handle the event here
}
else {
super.userEventTriggered(ctx, evt);
}
}