Unable to access ActiveMQ using JMS-based code and AMQP 1.0

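I'm trying to connect to an ActiveMQ broker using AMQP 1.0, but I want to use JMS in my application code. I'm interested in JMS mainly because I want developers to be able to use an API they already know.

I have ActiveMQ 5.14.0 running on localhost and the following code: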

    public static void main(String[] args) throws JMSException, InterruptedException {
        Connection connection = null;
        try {
            // Producer
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("amqp://localhost:5672");

            connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Topic topic = session.createTopic("customerTopic");

            // Publish
            MessageProducer producer = session.createProducer(topic);
            for (int i = 0; i < 10; i++) {
                Message msg = session.createTextMessage("Task : " + i);
                producer.send(msg);
            }
            session.close();
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

The code always fails the same way, and the root cause in the stack trace is:

Caused by: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>30000) long: tcp://127.0.0.1:5672

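This happens on the connection.start() call.

If I run the same code against the ActiveMQ tcp endpoint, it executes as expected.

My pom file dependencies are as follows (I suspect this is the root of my problem, because I found the documentation on the dependencies very hard to follow):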

<dependencies>
    <dependency>
        <groupId>org.apache.qpid</groupId>
        <artifactId>qpid-amqp-1-0-client-jms</artifactId>
        <version>0.32</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-amqp</artifactId>
        <version>5.14.0</version>
    </dependency>

</dependencies>

My immediate question is "Why doesn't this work?".

My supplementary (opinion-based) question is "Is it worthwhile trying to use the JMS abstraction above AMQP 1.0, or should I just abandon myself to learning the provider specific APIs?"

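ActiveMQConnectionFactory is ActiveMQ's native (OpenWire) JMS client, not an AMQP 1.0 client, which is most likely why the connection to the AMQP port times out. For AMQP 1.0, use the Qpid JMS client instead; it is best to configure it through JNDI: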

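import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public static void main(String[] args) throws JMSException, InterruptedException, NamingException {
    Connection connection = null;
    try {
        // JNDI environment: Qpid JMS context factory, one connection factory, one topic
        Properties props = new Properties();
        props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        props.setProperty("connectionfactory.myFactoryLookup",
                "amqp://localhost:5672");
        props.put("topic." + "MyTOPIC", "customerTopic");
        InitialContext ic = new InitialContext(props);

        // Look up the administered objects by the names configured above
        ConnectionFactory cf1 = (ConnectionFactory) ic.lookup("myFactoryLookup");
        Topic topic = (Topic) ic.lookup("MyTOPIC");

        connection = cf1.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(topic);
        connection.start();

        // Publish ten text messages
        for (int i = 0; i < 10; i++) {
            Message msg = session.createTextMessage("Task : " + i);
            producer.send(msg);
        }
        session.close();
    } finally {
        if (connection != null) {
            connection.close();
        }
    }
}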
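The JNDI keys in the code above follow a naming pattern: `connectionfactory.<lookup name>` maps to the broker URI and `topic.<lookup name>` maps to the destination name. As a minimal sketch of that pattern, the helper below builds the same `Properties` from parameters. The class name `QpidJndiProperties` and the method `build` are hypothetical, introduced only for illustration; the key prefixes and the context factory class name are taken from the answer's code.

```java
import java.util.Properties;
import javax.naming.Context;

class QpidJndiProperties {
    /**
     * Builds a JNDI environment using the same keys as the answer's code.
     * All parameter names are illustrative, not part of any library API.
     */
    public static Properties build(String brokerUri, String factoryLookup,
                                   String topicLookup, String topicName) {
        Properties props = new Properties();
        // Context factory class name as used in the answer
        props.setProperty(Context.INITIAL_CONTEXT_FACTORY,
                "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        // "connectionfactory.<lookup name>" -> broker URI
        props.setProperty("connectionfactory." + factoryLookup, brokerUri);
        // "topic.<lookup name>" -> topic name on the broker
        props.setProperty("topic." + topicLookup, topicName);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("amqp://localhost:5672", "myFactoryLookup",
                "MyTOPIC", "customerTopic");
        // Print the resulting environment so it can be compared with the answer's code
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The returned `Properties` can be passed to `new InitialContext(props)` exactly as in the answer, provided the Qpid JMS client is on the classpath.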

Replace

<dependency>
    <groupId>org.apache.qpid</groupId>
    <artifactId>qpid-amqp-1-0-client-jms</artifactId>
    <version>0.32</version>
</dependency>

with

    <dependency>
        <groupId>org.apache.qpid</groupId>
        <artifactId>qpid-jms-client</artifactId>
        <version>0.9.0</version>
    </dependency>
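Since the question suspects the dependencies, one quick sanity check after changing the pom is to confirm that the context factory class named in the JNDI configuration can actually be loaded. The sketch below uses only the standard `Class.forName` call; the class name `ClasspathCheck` and the method `isOnClasspath` are hypothetical helpers, not part of Qpid or ActiveMQ.

```java
class ClasspathCheck {
    /** Returns true if the named class can be located on the current classpath. */
    public static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class name taken from the answer's JNDI configuration
        String factory = "org.apache.qpid.jms.jndi.JmsInitialContextFactory";
        System.out.println(factory + " on classpath: " + isOnClasspath(factory));
    }
}
```

If this reports `false` when run with the application's classpath, the `qpid-jms-client` artifact is probably not being resolved, and the JNDI lookup in the answer would fail before any connection is attempted.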

On the broker side you need to add:

 <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?transport.transformer=jms"/>
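To check that the broker is really serving AMQP on that port, independently of any JMS client, a rough probe can send the 8-byte AMQP 1.0 protocol header (`AMQP` followed by the bytes 0, 1, 0, 0) and see whether the reply also begins with `AMQP`. This is only a diagnostic sketch using plain sockets: the class name `AmqpPortProbe` and its methods are hypothetical, the host, port, and timeout are assumptions, and a broker that requires SASL may reply with a different header variant, so only the first four bytes are compared.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class AmqpPortProbe {
    /** AMQP 1.0 protocol header: "AMQP", protocol id 0, then version 1.0.0. */
    public static byte[] amqpHeader() {
        return new byte[] {'A', 'M', 'Q', 'P', 0, 1, 0, 0};
    }

    /** True if the first four of the {@code length} received bytes spell "AMQP". */
    public static boolean looksLikeAmqp(byte[] reply, int length) {
        return length >= 4 && reply.length >= 4
                && new String(reply, 0, 4, StandardCharsets.US_ASCII).equals("AMQP");
    }

    /** Sends the header and reports whether the peer answers with an AMQP header. */
    public static boolean probe(String host, int port, int timeoutMillis) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            socket.setSoTimeout(timeoutMillis);
            OutputStream out = socket.getOutputStream();
            out.write(amqpHeader());
            out.flush();

            // Read up to 8 bytes; stop early if the peer closes the connection
            byte[] reply = new byte[8];
            InputStream in = socket.getInputStream();
            int total = 0;
            while (total < reply.length) {
                int n = in.read(reply, total, reply.length - total);
                if (n < 0) {
                    break;
                }
                total += n;
            }
            return looksLikeAmqp(reply, total);
        }
    }

    public static void main(String[] args) {
        // Defaults match the connector URI above; both can be overridden on the command line
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 5672;
        try {
            System.out.println("AMQP header received: " + probe(host, port, 5000));
        } catch (IOException e) {
            System.out.println("Probe failed: " + e);
        }
    }
}
```

A refused connection or a timeout here suggests the AMQP transport connector is not enabled or is listening on a different port.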

Reference: http://activemq.apache.org/amqp.html