带有 IBM MQ 的 Apache Camel
Apache Camel with IBM MQ
您好,有人曾将 Camel 与 IBM 的 MQ 一起使用过吗?我们正在考虑是否可以一起使用这两种产品,但没有这两种产品一起工作的例子。
快速 google 显示以下内容,
http://lowry-techie.blogspot.de/2010/11/camel-integration-with-websphere-mq.html
HTH
我广泛使用 IBM MQ 和 camel。两者一起使用没有问题。我将从利用骆驼 Jms 端点、spring 连接工厂和 IBM MQ 定义的 spring 上下文文件之一粘贴示例配置。
骆驼路线
from("someplace")
.to("cpaibmmq:queue:myQueueName");
Spring 上下文
<bean name="cpaibmmq" class="org.apache.camel.component.jms.JmsComponent" destroy-method="doStop">
<property name="transacted" value="${jms.transacted}" />
<property name="concurrentConsumers" value="${cpa.concurrentConsumers}" />
<property name="maxConcurrentConsumers" value="${cpa.concurrentConsumers}" />
<property name="acceptMessagesWhileStopping" value="${jms.acceptMessagesWhileStopping}" />
<property name="acknowledgementModeName" value="${jms.acknowledgementModeName}" />
<property name="cacheLevelName" value="${jms.cacheLevelName}" />
<property name="connectionFactory" ref="ibmFac1" />
<property name="exceptionListener" ref="ibmFac1" />
</bean>
<bean id="ibmFac1" class="org.springframework.jms.connection.SingleConnectionFactory" destroy-method="destroy">
<constructor-arg>
<bean class="com.ibm.mq.jms.MQQueueConnectionFactory">
<property name="transportType" value="1" />
<property name="channel" value="${cpa.wmq.channel}" />
<property name="hostName" value="${cpa.wmq.hostname}" />
<property name="port" value="${cpa.wmq.port}" />
<property name="queueManager" value="${cpa.wmq.mqmanager}" />
</bean>
</constructor-arg>
</bean>
我能得到的最好的记录在下面,说明为 Spring XML 本身托管 CAMEL 上下文和路由的应用程序上下文。此样本适用于 IBM 本机 MQ JCA-compliant 资源适配器 v7.5、CAMEL 2.16、Spring 核心 4.2。我已将其部署在 Glassfish、Weblogic 和 JBoss EAP7 服务器中。
复杂性必然会处理 MQ 报告流,其理念与普通 JMS reply-to 消息的理念相冲突。详细解释请参考Implementing native websphere MQ with CoD over Camel JMS component
这个基于 CAMEL XML DSL 的示例 self-contained 并且易于测试。
我们从 Spring & CAMEL 声明开始:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:jee="http://www.springframework.org/schema/jee"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">
CAMEL 上下文遵循 2 条路线:MQ 到 JMS 和 JMS 到 MQ,这里链接形成一个桥接以简化测试。
<camel:camelContext id="mqBridgeCtxt">
<camel:route id="mq2jms" autoStartup="true">
奇怪:在 Weblogic 上,获得(例如)3 个侦听器的唯一方法是强制执行 3 个连接(按顺序有 3 个 Camel:from 语句)每个最多 1 个 session,否则一个 MQ错误随之而来:MQJCA1018:每个连接只允许一个 session。在 JBoss 上,您可以简单地调整 concurrentConsumers=...
<camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&disableReplyTo=true&
acknowledgementModeName=SESSION_TRANSACTED"/>
上面的 disable disableReplyTo 选项确保 CAMEL 在我们测试 MQ 消息类型为 1=Request(-reply) 或 8=datagram(一种方式!)之前不会产生回复。此处未说明该测试和回复构造。
然后我们在下一次发送到纯 JMS 时将 EIP 强制为 InOnly,以与 Inbound MQ 模式保持一致。
<camel:setExchangePattern pattern="InOnly"/>
<!-- camel:process ref="reference to your MQ message processing bean fits here" / -->
<camel:to uri="ref:innerQueue" />
</camel:route>
MQ-to-jms路线结束;接下来是仍然在同一个 CAMEL 上下文中的 jms-to-MQ 路由:
<camel:route id="jms2mq" autoStartup="true">
<camel:from uri="ref:innerQueue" />
<!-- remove inner message headers and properties to test without inbound side effects! -->
<camel:removeHeaders pattern="*"/>
<camel:removeProperties pattern="*" />
<!-- camel:process ref="reference to your MQ message preparation bean fits here" / -->
现在是远程目标返回的 MQ CoD 报告的请求标志。我们还强制 MQ 消息为数据报类型(值 8)。
<camel:setHeader headerName="JMS_IBM_Report_COD"><camel:simple resultType="java.lang.Integer">2048</camel:simple></camel:setHeader>
<camel:setHeader headerName="JMS_IBM_Report_Pass_Correl_ID"><camel:simple resultType="java.lang.Integer">64</camel:simple></camel:setHeader>
<camel:setHeader headerName="JMS_IBM_MsgType"><camel:simple resultType="java.lang.Integer">8</camel:simple></camel:setHeader>
可以通过 ReplyTo uri 选项指定 ReplyTo queue,也可以指定为 header,如下所示。
接下来我们使用 CamelJmsDestinationName header 强制抑制 JMS MQ 消息 header MQRFH2(使用 targetClient MQ URL 选项值 1)。换句话说,我们想要发送一个普通的普通 MQ 二进制消息(即只有 MQMD 消息描述符后跟有效负载)。
<camel:setHeader headerName="JMSReplyTo"><camel:constant>TEST.REPLYTOQ</camel:constant></camel:setHeader>
<camel:setHeader headerName="CamelJmsDestinationName"> <camel:constant>queue://MYQMGR/TEST.Q2?targetClient=1</camel:constant></camel:setHeader>
可以通过保留的 JMS 属性控制更多 MQMD 字段,如下所示。请参阅 IBM 文档中的限制。
<camel:setHeader headerName="JMS_IBM_Format"><camel:constant>MQSTR </camel:constant></camel:setHeader>
<camel:setHeader headerName="JMSCorrelationID"><camel:constant>_PLACEHOLDER_24_CHARS_ID_</camel:constant></camel:setHeader>
URI 中的目标 queue 被上面的 CamelJmsDestinationName 覆盖,因此 URI 中的 queue 名称成为占位符。
URI 选项 preserveMessageQos 是一个 - 正如观察到的那样 - 允许发送一条设置了 ReplyTo 数据的消息(以获取 MQ CoD 报告),但阻止 CAMEL 通过强制执行 InOnly MEP 来实例化回复消息侦听器.
<camel:to uri="wmq:queue:PLACEHOLDER.Q.NAME?concurrentConsumers=1&
exchangePattern=InOnly&preserveMessageQos=true&
includeSentJMSMessageID=true" />
</camel:route>
</camel:camelContext>
我们还没有完成,我们仍然要为本机 JMS 提供程序和 Websphere MQ(通过本机 IBM WMQ JCA 资源适配器)声明我们的 queue 工厂,以根据您的上下文进行调整。
我们在这里使用管理 objects.
上的 JNDI 查找
<camel:endpoint id="innerQueue" uri="jmsloc:queue:transitQueue">
</camel:endpoint>
<jee:jndi-lookup id="mqQCFBean" jndi-name="jms/MYQMGR_QCF"/>
<jee:jndi-lookup id="jmsraQCFBean" jndi-name="jms/jmsra_QCF"/>
<bean id="jmsloc" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory" ref="jmsraQCFBean" />
</bean>
<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory" ref="mqQCFBean" />
</bean>
</beans>
从 JNDI 获取工厂(和 JCA 适配器)的替代方法是将 JMS 客户端声明为 Spring bean。在 Weblogic 和 Glassfish 中,部署本机 IBM JCA 资源适配器并创建 JNDI 资源,然后在上面的 Spring 上下文中引用,在 JBoss 中直接 MQ 客户端 bean 声明最适合如下)
<bean id="mqCFBean" class="com.ibm.mq.jms.MQXAConnectionFactory">
<property name="hostName" value="${mqHost}"/>
<property name="port" value="${mqPort}"/>
<property name="queueManager" value="${mqQueueManager}"/>
<property name="channel" value="${mqChannel}"/>
<property name="transportType" value="1"/> <!-- This parameter is fixed and compulsory to work with pure MQI java libraries -->
<property name="appName" value="${connectionName}"/>
</bean>
<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory" ref="mqCFBean"/>
<property name="transacted" value="true"/>
<property name="acknowledgementModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>
欢迎提出意见和改进。
您好,有人曾将 Camel 与 IBM 的 MQ 一起使用过吗?我们正在考虑是否可以一起使用这两种产品,但没有这两种产品一起工作的例子。
快速 google 显示以下内容,
http://lowry-techie.blogspot.de/2010/11/camel-integration-with-websphere-mq.html
HTH
我广泛使用 IBM MQ 和 camel。两者一起使用没有问题。我将从利用骆驼 Jms 端点、spring 连接工厂和 IBM MQ 定义的 spring 上下文文件之一粘贴示例配置。
骆驼路线
from("someplace")
.to("cpaibmmq:queue:myQueueName");
Spring 上下文
<bean name="cpaibmmq" class="org.apache.camel.component.jms.JmsComponent" destroy-method="doStop">
<property name="transacted" value="${jms.transacted}" />
<property name="concurrentConsumers" value="${cpa.concurrentConsumers}" />
<property name="maxConcurrentConsumers" value="${cpa.concurrentConsumers}" />
<property name="acceptMessagesWhileStopping" value="${jms.acceptMessagesWhileStopping}" />
<property name="acknowledgementModeName" value="${jms.acknowledgementModeName}" />
<property name="cacheLevelName" value="${jms.cacheLevelName}" />
<property name="connectionFactory" ref="ibmFac1" />
<property name="exceptionListener" ref="ibmFac1" />
</bean>
<bean id="ibmFac1" class="org.springframework.jms.connection.SingleConnectionFactory" destroy-method="destroy">
<constructor-arg>
<bean class="com.ibm.mq.jms.MQQueueConnectionFactory">
<property name="transportType" value="1" />
<property name="channel" value="${cpa.wmq.channel}" />
<property name="hostName" value="${cpa.wmq.hostname}" />
<property name="port" value="${cpa.wmq.port}" />
<property name="queueManager" value="${cpa.wmq.mqmanager}" />
</bean>
</constructor-arg>
</bean>
我能得到的最好的记录在下面,说明为 Spring XML 本身托管 CAMEL 上下文和路由的应用程序上下文。此样本适用于 IBM 本机 MQ JCA-compliant 资源适配器 v7.5、CAMEL 2.16、Spring 核心 4.2。我已将其部署在 Glassfish、Weblogic 和 JBoss EAP7 服务器中。
复杂性必然会处理 MQ 报告流,其理念与普通 JMS reply-to 消息的理念相冲突。详细解释请参考Implementing native websphere MQ with CoD over Camel JMS component
这个基于 CAMEL XML DSL 的示例 self-contained 并且易于测试。
我们从 Spring & CAMEL 声明开始:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:jee="http://www.springframework.org/schema/jee"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">
CAMEL 上下文遵循 2 条路线:MQ 到 JMS 和 JMS 到 MQ,这里链接形成一个桥接以简化测试。
<camel:camelContext id="mqBridgeCtxt">
<camel:route id="mq2jms" autoStartup="true">
奇怪:在 Weblogic 上,获得(例如)3 个侦听器的唯一方法是强制执行 3 个连接(按顺序有 3 个 Camel:from 语句)每个最多 1 个 session,否则一个 MQ错误随之而来:MQJCA1018:每个连接只允许一个 session。在 JBoss 上,您可以简单地调整 concurrentConsumers=...
<camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&disableReplyTo=true&
acknowledgementModeName=SESSION_TRANSACTED"/>
上面的 disable disableReplyTo 选项确保 CAMEL 在我们测试 MQ 消息类型为 1=Request(-reply) 或 8=datagram(一种方式!)之前不会产生回复。此处未说明该测试和回复构造。
然后我们在下一次发送到纯 JMS 时将 EIP 强制为 InOnly,以与 Inbound MQ 模式保持一致。
<camel:setExchangePattern pattern="InOnly"/>
<!-- camel:process ref="reference to your MQ message processing bean fits here" / -->
<camel:to uri="ref:innerQueue" />
</camel:route>
MQ-to-jms路线结束;接下来是仍然在同一个 CAMEL 上下文中的 jms-to-MQ 路由:
<camel:route id="jms2mq" autoStartup="true">
<camel:from uri="ref:innerQueue" />
<!-- remove inner message headers and properties to test without inbound side effects! -->
<camel:removeHeaders pattern="*"/>
<camel:removeProperties pattern="*" />
<!-- camel:process ref="reference to your MQ message preparation bean fits here" / -->
现在是远程目标返回的 MQ CoD 报告的请求标志。我们还强制 MQ 消息为数据报类型(值 8)。
<camel:setHeader headerName="JMS_IBM_Report_COD"><camel:simple resultType="java.lang.Integer">2048</camel:simple></camel:setHeader>
<camel:setHeader headerName="JMS_IBM_Report_Pass_Correl_ID"><camel:simple resultType="java.lang.Integer">64</camel:simple></camel:setHeader>
<camel:setHeader headerName="JMS_IBM_MsgType"><camel:simple resultType="java.lang.Integer">8</camel:simple></camel:setHeader>
可以通过 ReplyTo uri 选项指定 ReplyTo queue,也可以指定为 header,如下所示。
接下来我们使用 CamelJmsDestinationName header 强制抑制 JMS MQ 消息 header MQRFH2(使用 targetClient MQ URL 选项值 1)。换句话说,我们想要发送一个普通的普通 MQ 二进制消息(即只有 MQMD 消息描述符后跟有效负载)。
<camel:setHeader headerName="JMSReplyTo"><camel:constant>TEST.REPLYTOQ</camel:constant></camel:setHeader>
<camel:setHeader headerName="CamelJmsDestinationName"> <camel:constant>queue://MYQMGR/TEST.Q2?targetClient=1</camel:constant></camel:setHeader>
可以通过保留的 JMS 属性控制更多 MQMD 字段,如下所示。请参阅 IBM 文档中的限制。
<camel:setHeader headerName="JMS_IBM_Format"><camel:constant>MQSTR </camel:constant></camel:setHeader>
<camel:setHeader headerName="JMSCorrelationID"><camel:constant>_PLACEHOLDER_24_CHARS_ID_</camel:constant></camel:setHeader>
URI 中的目标 queue 被上面的 CamelJmsDestinationName 覆盖,因此 URI 中的 queue 名称成为占位符。
URI 选项 preserveMessageQos 是一个 - 正如观察到的那样 - 允许发送一条设置了 ReplyTo 数据的消息(以获取 MQ CoD 报告),但阻止 CAMEL 通过强制执行 InOnly MEP 来实例化回复消息侦听器.
<camel:to uri="wmq:queue:PLACEHOLDER.Q.NAME?concurrentConsumers=1&
exchangePattern=InOnly&preserveMessageQos=true&
includeSentJMSMessageID=true" />
</camel:route>
</camel:camelContext>
我们还没有完成,我们仍然要为本机 JMS 提供程序和 Websphere MQ(通过本机 IBM WMQ JCA 资源适配器)声明我们的 queue 工厂,以根据您的上下文进行调整。 我们在这里使用管理 objects.
上的 JNDI 查找<camel:endpoint id="innerQueue" uri="jmsloc:queue:transitQueue">
</camel:endpoint>
<jee:jndi-lookup id="mqQCFBean" jndi-name="jms/MYQMGR_QCF"/>
<jee:jndi-lookup id="jmsraQCFBean" jndi-name="jms/jmsra_QCF"/>
<bean id="jmsloc" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory" ref="jmsraQCFBean" />
</bean>
<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory" ref="mqQCFBean" />
</bean>
</beans>
从 JNDI 获取工厂(和 JCA 适配器)的替代方法是将 JMS 客户端声明为 Spring bean。在 Weblogic 和 Glassfish 中,部署本机 IBM JCA 资源适配器并创建 JNDI 资源,然后在上面的 Spring 上下文中引用,在 JBoss 中直接 MQ 客户端 bean 声明最适合如下)
<bean id="mqCFBean" class="com.ibm.mq.jms.MQXAConnectionFactory">
<property name="hostName" value="${mqHost}"/>
<property name="port" value="${mqPort}"/>
<property name="queueManager" value="${mqQueueManager}"/>
<property name="channel" value="${mqChannel}"/>
<property name="transportType" value="1"/> <!-- This parameter is fixed and compulsory to work with pure MQI java libraries -->
<property name="appName" value="${connectionName}"/>
</bean>
<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory" ref="mqCFBean"/>
<property name="transacted" value="true"/>
<property name="acknowledgementModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>
欢迎提出意见和改进。