Spring 集成 kafka 入站适配器聚合传入消息
Spring integration kafka inbound adapter aggregating incoming messages
我有一个 int-kafka:outbound-channel-adapter
,我用它向 kafka 发送消息,然后使用 int-kafka:inbound-channel-adapter
接收消息。通信似乎工作正常,我可以发送和接收消息,但格式有点奇怪。我将单独的消息单独发送到我的出站适配器,但是当我收到消息时,我收到一条消息,所有消息都聚合到该消息的有效负载中。
这是我收到消息时消息负载的样子
[payload={mytopic={0=[string message 1, string message 2, string message 3, string message 4, string message 5, ...........]}}, headers={id=3934de02-1f42-ab90-6aa5-9c15f3cd0b6e, timestamp=1439260669762}]
接收集成流程如下所示
<int-kafka:inbound-channel-adapter
id="kafkaInboundAdapter" kafka-consumer-context-ref="consumerContext"
auto-startup="true" channel="inputFromKafka">
<int:poller fixed-delay="10" time-unit="MILLISECONDS"
max-messages-per-poll="5" />
</int-kafka:inbound-channel-adapter>
<int:channel id="inputFromKafka" />
<int:service-activator id="kakfaMessageHandler"
input-channel="inputFromKafka">
<bean class="com...broker.MessageHandler"></bean>
</int:service-activator>
为什么我收到的所有消息都汇总在一个 spring 集成消息中,而不是在发送到 kafka 时收到单独的消息。
KafkaHighLevelConsumerMessageSource
的设计和其他很多polling一样MessageSource<?>
:通过一个poll获取数据,return它作为一个List<?>
。
在这种情况下,我们从 Kafka stream
读取了这个结果:
Message<Map<String, Map<Integer, List<Object>>>>
其中 payload
是 Kafka topic
的 Map
和那里的 partition
和 message
的映射。
如果您在 consumerContext
上只使用一个 topic
,您可以简单地将顶级 Map
转换为它的 partition
s 地图。或者甚至直接转换为 payload
s 列表,如果那里只有一个 partition
。最后你可以得到 splitter
.
如果您希望在主题出现时尽快收到来自该主题的消息,您应该查看 <int-kafka:message-driven-channel-adapter>
。
我有一个 int-kafka:outbound-channel-adapter
,我用它向 kafka 发送消息,然后使用 int-kafka:inbound-channel-adapter
接收消息。通信似乎工作正常,我可以发送和接收消息,但格式有点奇怪。我将单独的消息单独发送到我的出站适配器,但是当我收到消息时,我收到一条消息,所有消息都聚合到该消息的有效负载中。
这是我收到消息时消息负载的样子
[payload={mytopic={0=[string message 1, string message 2, string message 3, string message 4, string message 5, ...........]}}, headers={id=3934de02-1f42-ab90-6aa5-9c15f3cd0b6e, timestamp=1439260669762}]
接收集成流程如下所示
<int-kafka:inbound-channel-adapter
id="kafkaInboundAdapter" kafka-consumer-context-ref="consumerContext"
auto-startup="true" channel="inputFromKafka">
<int:poller fixed-delay="10" time-unit="MILLISECONDS"
max-messages-per-poll="5" />
</int-kafka:inbound-channel-adapter>
<int:channel id="inputFromKafka" />
<int:service-activator id="kakfaMessageHandler"
input-channel="inputFromKafka">
<bean class="com...broker.MessageHandler"></bean>
</int:service-activator>
为什么我收到的所有消息都汇总在一个 spring 集成消息中,而不是在发送到 kafka 时收到单独的消息。
KafkaHighLevelConsumerMessageSource
的设计和其他很多polling一样MessageSource<?>
:通过一个poll获取数据,return它作为一个List<?>
。
在这种情况下,我们从 Kafka stream
读取了这个结果:
Message<Map<String, Map<Integer, List<Object>>>>
其中 payload
是 Kafka topic
的 Map
和那里的 partition
和 message
的映射。
如果您在 consumerContext
上只使用一个 topic
,您可以简单地将顶级 Map
转换为它的 partition
s 地图。或者甚至直接转换为 payload
s 列表,如果那里只有一个 partition
。最后你可以得到 splitter
.
如果您希望在主题出现时尽快收到来自该主题的消息,您应该查看 <int-kafka:message-driven-channel-adapter>
。