骆驼消费者在关闭时仍然会收到新消息

Camel consumers still get new messages when shutting down

我正在使用 Camel 2.16.1。关闭时,Camel 的消费者仍然接受新消息。有什么办法可以强制消费者立即停止。这里有同样的问题:Camel ShutdownStrategy: Inflight Messages do not decrease

我为这个问题创建了一个测试用例:

    public class GracefulShutdownTest extends CamelTestSupport {

    public class ThreadStopContext implements Runnable {
        private CamelContext context;

        public ThreadStopContext(CamelContext context) {
            this.context = context;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println("------start shutdown....");
                context.stop();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public class Sender implements Runnable {
        private ProducerTemplate template;

        public Sender(ProducerTemplate template) {
            this.template = template;
        }

        @Override
        public void run() {
            template().sendBody("direct:start", "test");
        }
    }

    @Test
    public void shutdownTest() throws Exception {
        context().setTracing(true);
        getMockEndpoint("mock:endpoint").expectedMessageCount(0);
        getMockEndpoint("mock:deadletterEndpoint").expectedMessageCount(1);
        Thread thread = new Thread(new ThreadStopContext(context()));
        thread.start();
        Thread threadSender1 = new Thread(new Sender(template()));
        threadSender1.start();
        Thread.sleep(2000);
        System.out.println("-------------------second message-----------------------------");
        Thread threadSender2 = new Thread(new Sender(template()));
        threadSender2.start();
        assertMockEndpointsSatisfied();

    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                errorHandler(deadLetterChannel("direct:deadletter")
                        .logHandled(true));

                from("direct:start").process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("----------start sleep....");
                        Thread.sleep(5000);
                        System.out.println("----------end sleep....");
                        throw new Exception();
                    }
                });

                from("direct:deadletter")
                        .startupOrder(1)
                        .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("----message at deadletter----");
                    }
                })
                        .to("mock:deadletterEndpoint");
            }
        };
    }
}

当运行测试用例时,我们可以看到在开始正常关闭后飞行交换的数量增加了:

Waiting as there are still 1 inflight and pending exchanges to complete
.....
Waiting as there are still 2 inflight and pending exchanges to complete

在阅读 Defaultshutdownstrategy class 的源代码后,我发现在关闭消费者时,Camel 更喜欢挂起而不是关闭。这意味着如果消费者支持挂起,它将执行挂起方法,然后将消费者放入延迟列表以稍后关闭。一些消费者(直接,RabbitMQ)有空的 suspend() 方法,然后他们仍然像在延迟模式下一样收到新消息