使用出站通道适配器时生成回复

Generating a reply when using an outbound channel adapter

我有以下简化的 spring 集成流程:

int-ws:inbound-gateway ----> int:transformer ----> int-kafka:outbound-channel-adapter

基本上:

  1. Web 服务端点使用 int-ws:inbound-gateway
  2. 公开
  3. 来自此端点的消息被放入 input 通道(第一个 --->
  4. 自定义转换器转换有效载荷 JSON 格式并添加 MESSAGE_KEY header(kafka 需要)
  5. 消息被放置到 inputToKafka 频道(第二个 --->
  6. int-kafka:outbound-channel-adapter将消息推送到kafka主题

Web 服务操作有一个请求和一个响应负载。
请求负载是我要转换成 JSON 消息的内容。
一旦 int-kafka:outbound-channel-adapter

将消息放在 kafka 主题上,我想 return 一个响应负载(将被编组等)

我该怎么做?

目前,当我调用 Web 服务时,一切都按预期工作,但我必须在 int-ws:inbound-gateway 上设置一个 reply-timeout,这样它就不会挂起。当我这样做时,我只是在 SOAPUI 上得到一个空响应。

我理解 Gateway behavior when no response arrives 部分中的概念 - 但就我而言,我 确实想要生成响应

这是我的集成上下文(没有 kafka 代理配置等):

<int-ws:inbound-gateway id="ws-inbound-gateway" request-channel="input"
                        marshaller="marshaller" unmarshaller="marshaller" reply-timeout="100"/>

<int:channel id="input"/>

<int:transformer input-channel="input" output-channel="inputToKafka" method="transform">
    <bean class="com.test.InputToJSONTransformer"/>
</int:transformer>

<int:channel id="inputToKafka"/>

<int-kafka:outbound-channel-adapter kafka-producer-context-ref="kafkaProducerContext"
                                    auto-startup="true"
                                    channel="inputToKafka"
                                    order="1">
</int-kafka:outbound-channel-adapter>

你期望从 Kafka 出站通道适配器得到什么样的响应,记住它是 Adapter 而不是 Gateway

但是您可以引入另一个组件,ServiceActivator 使用输入通道 sendToInputToKafka 现在您的转换器将输出到该通道。

您的 'ServiceActivator' 应该有一个 @Autowire MessageChannel inputToKafka 并且应该以编程方式手动向该频道发送消息。 发送该消息后,您将构建所需的响应作为 ServiceActivator 的 return 类型和对 ws gateway

的响应

改变

<int:channel id="inputToKafka"/>

<int:publish-subscribe-channel id="inputToKafka"/>

向频道添加第二个订阅者。

<service-activator input-channel="inputToKafka" ... order="2" />

服务生成响应的地方;它将在成功发送到 kafka 后被调用。

不包含 output-channel;该框架将负责将服务输出路由回 ws 网关。