MQTT Apache Camel 路由错误导致应用程序停止

Error in MQTT Apache Camel route causes application to stop

我正在使用 Apache Camel 和 Spring(不是 Spring 引导)和 xml。

我有我的 camel-context.xml 配置文件,其中包含从 MQTT 服务器到 JMS 服务器的示例路由,反之亦然,只是为了发送消息。

这是我的骆驼-context.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:camel="http://camel.apache.org/schema/spring"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
                        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

    <bean id="quadtreeProcessor" class="es.gateway.router.QuadtreeProcessor" />

    <camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring">
        <route id="quadTreeConsumerProducer">
            <from uri="jms:topic:T_ETSI_PRODUCER"/>
            <process ref="quadtreeProcessor"/>
            <to uri="mqtt:quadtree?host=tcp://localhost:1883&amp;publishTopicName=${header.publishTopicName}"/>
        </route>

        <route id="quadTreeConsumerRoute">
            <from uri="mqtt:quadtree?host=tcp://localhost:1883&amp;subscribeTopicName=CONSUMER/DENM/#"/>
            <to uri="jms:topic:T_ETSI_CONSUMER"/>
        </route>
    </camelContext>

    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:10011"/>
    </bean>
    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" />

这是我的 Main.class:

package es.conncar.main;

import org.apache.camel.RuntimeCamelException;
import org.apache.camel.main.MainListener;
import org.apache.camel.main.MainListenerSupport;
import org.apache.camel.main.MainSupport;
import org.apache.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import es.gateway.config.Config;

public class Main {

    final static Logger logger = Logger.getLogger(Main.class);

    private static boolean exit = false;

    private org.apache.camel.spring.Main camelMain;

    public static void main(String[] args) throws Exception {
        try {
            ApplicationContext springContext = new ClassPathXmlApplicationContext("app-context.xml");
            logger.info("Spring context initialized");

            Config config = (Config) springContext.getBean("config");

            Main main = new Main();
            main.boot();
        } catch (Exception e) {
            logger.error("Unknown error in main", e);
        }
    }

    public void boot() throws Exception {
        camelMain = new org.apache.camel.spring.Main();
        camelMain.addMainListener((MainListener) new Events());
        camelMain.setApplicationContextUri("camel-context.xml"); 
        logger.info("Starting Camel. Use ctrl + c to terminate the JVM.\n");
        camelMain.run();
    }

    public static class Events extends MainListenerSupport {

        @Override
        public void afterStart(MainSupport main) {
            logger.info("Camel is now started!");
        }

        @Override
        public void beforeStop(MainSupport main) {
            logger.info("Camel is now being stopped!");
        }
    }
}

问题发生在 运行 我的 Main class 中的相同路由和配置时,由于 RuntimeCamelException 而在几秒后死掉。 =17=]

异常跟踪是:

骆驼启动正常

[30/11/2018 08:37:08][INFO ][es.gateway.main.Main] - Spring context initialized
[30/11/2018 08:37:08][INFO ][es.gateway.main.Main] - Starting Camel. Use ctrl + c to terminate the JVM. 

10 秒后

[30/11/2018 08:37:21][ERROR][es.gateway.main.Main] - Unknown error in Camel
    org.apache.camel.RuntimeCamelException: java.util.concurrent.TimeoutException
        at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1830)
        at org.apache.camel.spring.SpringCamelContext.start(SpringCamelContext.java:136)
        at org.apache.camel.spring.CamelContextFactoryBean.start(CamelContextFactoryBean.java:369)
        at org.apache.camel.spring.CamelContextFactoryBean.onApplicationEvent(CamelContextFactoryBean.java:416)
        at org.apache.camel.spring.CamelContextFactoryBean.onApplicationEvent(CamelContextFactoryBean.java:94)
        at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
        at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
        at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
        at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:393)
        at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:347)
        at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:883)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546)
        at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
        at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:93)
        at org.apache.camel.spring.Main.createDefaultApplicationContext(Main.java:222)
        at org.apache.camel.spring.Main.doStart(Main.java:154)
        at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
        at org.apache.camel.main.MainSupport.run(MainSupport.java:170)
        at es.gateway.main.Main.boot(Main.java:105)
        at es.gateway.main.Main.main(Main.java:77)
    Caused by: java.util.concurrent.TimeoutException
        at org.fusesource.mqtt.client.Promise.await(Promise.java:83)
        at org.apache.camel.component.mqtt.MQTTEndpoint.connect(MQTTEndpoint.java:348)
        at org.apache.camel.component.mqtt.MQTTConsumer.doStart(MQTTConsumer.java:38)
        at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
        at org.apache.camel.impl.DefaultCamelContext.startService(DefaultCamelContext.java:3705)
        at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRouteConsumers(DefaultCamelContext.java:4023)
        at org.apache.camel.impl.DefaultCamelContext.doStartRouteConsumers(DefaultCamelContext.java:3958)
        at org.apache.camel.impl.DefaultCamelContext.safelyStartRouteServices(DefaultCamelContext.java:3878)
        at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRoutes(DefaultCamelContext.java:3642)
        at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:3494)
        at org.apache.camel.impl.DefaultCamelContext.access[=13=]0(DefaultCamelContext.java:209)
        at org.apache.camel.impl.DefaultCamelContext.call(DefaultCamelContext.java:3253)
        at org.apache.camel.impl.DefaultCamelContext.call(DefaultCamelContext.java:3249)
        at org.apache.camel.impl.DefaultCamelContext.doWithDefinedClassLoader(DefaultCamelContext.java:3272)
        at org.apache.camel.impl.DefaultCamelContext.doStart(DefaultCamelContext.java:3249)
        at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
        at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:3165)
        at org.apache.camel.spring.SpringCamelContext.start(SpringCamelContext.java:133)
        ... 18 more

看起来路由在无法连接到 MQTT 代理时抛出超时(没关系),但此异常未在路由中捕获,而是在上下文中捕获 - Main class(这不好).

我已经关注了这本食谱:

http://camel.apache.org/running-camel-standalone-and-have-it-keep-running.html

camel-springJARorg.apache.camel.spring.Mainclass.

我还查看了 Apache Camel in Action 一书中的第 13 章,但没有找到解决方案。看来我正在开始并配置 camel 上下文就好了。

有没有人有这方面的经验?当路由中出现 RuntimeExceptions 时,有没有办法让路由正常工作并使主程序保持活动状态?希望如此!

提前致谢!

编辑:我发现有几个话题在谈论这个。解决方案似乎是激活监督控制器,但我找不到没有 Spring 启动的方法(我使用的是普通 Spring 和混合 xml 和注释配置) .有人可以帮忙吗?

How to start camel even if the MQTT server is not reachable

Spring Context shutting down camel routes when activemq connection is lost

EDIT2:我已经检查过 问题仅出在 MQTT 上。 JMS 连接正常工作,即使在 JMS 代理关闭时也能够继续并优雅地处理重新连接,而 MQTT 连接则不行。

EDIT3:Paho 工作正常但普通 MQTT 不行。我已经在 xml 配置 (uri) 中使用 Paho 测试了相同的代码,它的行为符合预期,类似于 JMS。该路由抛出异常但继续前进,它不会引发导致应用程序停止的上下文的异常。也许我缺少 MQTT 客户端中的选项?

我建议使用 camel-paho,因为 Eclipse Paho 维护得更活跃 比 camel-mqtt 使用的旧 FuseSource MQTT 客户端库要好。

也就是说,camel-mqtt 组件的问题是它在启动时需要有效连接。您可以配置其重新连接选项以设置各种延迟等

有另一种路由启动机制,允许使用监视路由并处理重试等的后台线程启动路由。它是 SupervisingRouteController,请注意还有一些事情需要实施它的 JMX 管理功能,但在其他方面应该没问题。而且它缺乏更合适的文档。我们正在考虑使其在 Camel 3 中更加突出或默认。

这里有一个例子:https://github.com/apache/camel/tree/master/examples/camel-example-spring-boot-supervising-route-controller