使 WebMQ 同步

Making WebMQ Synchronous

我试图让 WebMQ 在 MULE 中同步运行,以便将消息队列转换为内部项目的 REST api。不幸的是,我对MULE知之甚少。

经过一番努力后,我开始了解请求-回复范围并尝试将 WMQ 连接器与它一起使用,结果发现 WMQ 仅在流程结束后才发送。

我试过使用 VM 将请求放入不同的流程来触发它

但到目前为止,这只会导致它在永远不会发生的 WMQ 上永远等待。

我也试过使用 VM 来回:

但它似乎做的事情和没有一样,只是忽略了输入。

现在我知道 WMQ 的输入和输出正在工作,因为我之前设置了一个使用 http 与自身通信并在消息通过时通知它的流程。

我本质上想要的是这个,但是使用 HTTP 而不是 AJAX

我最近的,也许是最接近的尝试是这样的:

XML 是:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns:http="http://www.mulesoft.org/schema/mule/http" version="EE-3.6.0" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:ajax="http://www.mulesoft.org/schema/mule/ajax" xmlns:core="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core" xmlns:json="http://www.mulesoft.org/schema/mule/json" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:stdio="http://www.mulesoft.org/schema/mule/stdio" xmlns:test="http://www.mulesoft.org/schema/mule/test" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:wmq="http://www.mulesoft.org/schema/mule/ee/wmq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/ajax http://www.mulesoft.org/schema/mule/ajax/current/mule-ajax.xsd
http://www.mulesoft.org/schema/mule/ee/wmq http://www.mulesoft.org/schema/mule/ee/wmq/current/mule-wmq-ee.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/stdio http://www.mulesoft.org/schema/mule/stdio/current/mule-stdio.xsd
http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd
http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd">

    <wmq:connector channel="MAGENTO.SVRCONN" doc:name="WMQ Connector" hostName="[ip]" name="wmqConnector" port="1414" queueManager="MAGENTO" transportType="CLIENT_MQ_TCPIP" validateConnections="true" />
    <vm:connector name="VM" validateConnections="true" doc:name="VM">
        <vm:queue-profile maxOutstandingMessages="500">
            <default-persistent-queue-store/>
        </vm:queue-profile>
    </vm:connector>
    <http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration"/>
    <flow  name="Input">
        <http:listener config-ref="HTTP_Listener_Configuration" path="/mq" doc:name="HTTP"/>
        <set-payload value="#[message.inboundProperties.'http.query.params'.xml]" doc:name="Set Payload"/>
        <logger message="Entering Request-Reply Payload: #[payload]" level="INFO" doc:name="Logger"/>
        <request-reply doc:name="Request-Reply">
            <vm:outbound-endpoint exchange-pattern="one-way" path="mq" connector-ref="VM" doc:name="VM_TriggerSend" />
            <vm:inbound-endpoint exchange-pattern="request-response" path="mq-return" connector-ref="VM" doc:name="VM_Receive" />
        </request-reply>
        <logger message="Request-Reply has ended. Payload: #[payload]" level="INFO" doc:name="Logger"/>
    </flow>
    <flow name="Send_Message">
        <vm:inbound-endpoint exchange-pattern="one-way" path="mq" connector-ref="VM" doc:name="VM_Send"/>
        <logger message="(Preparing to Dispatch) Payload: #[payload]" level="INFO" doc:name="Logger"/>
        <wmq:outbound-endpoint queue="PUT_QUEUE" connector-ref="wmqConnector" doc:name="WMQ"/>
        <logger message="Message presumably being dispatched after this log" level="INFO" doc:name="Logger"/>
    </flow>
    <flow name="Receive_Message">
        <wmq:inbound-endpoint queue="GET_QUEUE" connector-ref="wmqConnector" doc:name="WMQ_Receive" />
        <logger message="Triggering Receive" level="INFO" doc:name="Logger"/>
        <vm:outbound-endpoint exchange-pattern="request-response" path="mq-return" connector-ref="VM" doc:name="VM_TriggerReceive" />
    </flow>
</mule>

几乎可以正常工作,但它会无限期挂起,而不会向 HTTP 服务器发回响应。将 Request-Reply 流中的 vm:inbound-endpoint 设置为单向可以防止它挂起,但不会发送已收到的新有效负载,而是发送先前的有效负载(就像它被跳过一样)。

如有任何帮助,我们将不胜感激!

使异步数据源同步确实很棘手。但我真的没有看到你的请求-回复策略有问题。

尝试在两端使用带有 WMQ 端点的请求-回复。然后监控队列并查看消息和回复是否按预期到达。您是否对请求和回复使用相同的队列?最简单的方案是使用不同的方案,这对您的用例可行吗?

如果我理解正确的话,您只是想公开对 queue 的请求和对众所周知的 queue 的响应作为 REST api.

那里不需要 VM queue,作为 JMS 传输的 request-reply 元素也无法模拟请求响应交换模式。

JMS queues 只是一种方式。您必须在不同的 queue 上回复。 request-response 场景背后的逻辑是 outbound-endpoint 具有出站 属性:MULE_REPLYTO。这个 属性 应该设置为你对 queue 的众所周知的回复的名称:GET_QUEUE(如果没有提供这样的 header,将创建一个临时的 queue ).

MULE_REPLY 到 header 将成为标准 JMSReplyTo 当在服务上收到消息时。服务 必须尊重 这个 header 将回复发送回那个 queue。如果服务是在 Mule 上实现的,这将自动发生,否则它可能不会发生。你应该仔细检查它是否受到尊重。

如果你想使用一个 request-reply 作用域到 one-way 个端点而不是一个 request-reponse 端点,没关系,应该工作相同但代码更多。