Spring 集成 kafka 入站适配器聚合传入消息

Spring integration kafka inbound adapter aggregating incoming messages

我有一个 int-kafka:outbound-channel-adapter,我用它向 kafka 发送消息,然后使用 int-kafka:inbound-channel-adapter 接收消息。通信似乎工作正常,我可以发送和接收消息,但格式有点奇怪。我将单独的消息单独发送到我的出站适配器,但是当我收到消息时,我收到一条消息,所有消息都聚合到该消息的有效负载中。

这是我收到消息时消息负载的样子

[payload={mytopic={0=[string message 1, string message 2, string message 3, string message 4, string message 5, ...........]}}, headers={id=3934de02-1f42-ab90-6aa5-9c15f3cd0b6e, timestamp=1439260669762}]

接收集成流程如下所示

<int-kafka:inbound-channel-adapter
    id="kafkaInboundAdapter" kafka-consumer-context-ref="consumerContext"
    auto-startup="true" channel="inputFromKafka">
    <int:poller fixed-delay="10" time-unit="MILLISECONDS"
        max-messages-per-poll="5" />
</int-kafka:inbound-channel-adapter>

<int:channel id="inputFromKafka" />

<int:service-activator id="kakfaMessageHandler"
    input-channel="inputFromKafka">
    <bean class="com...broker.MessageHandler"></bean>
</int:service-activator>

为什么我收到的所有消息都汇总在一个 spring 集成消息中,而不是在发送到 kafka 时收到单独的消息。

KafkaHighLevelConsumerMessageSource的设计和其他很多polling一样MessageSource<?>:通过一个poll获取数据,return它作为一个List<?>

在这种情况下,我们从 Kafka stream 读取了这个结果:

Message<Map<String, Map<Integer, List<Object>>>>

其中 payload 是 Kafka topicMap 和那里的 partitionmessage 的映射。

如果您在 consumerContext 上只使用一个 topic,您可以简单地将顶级 Map 转换为它的 partitions 地图。或者甚至直接转换为 payloads 列表,如果那里只有一个 partition。最后你可以得到 splitter.

如果您希望在主题出现时尽快收到来自该主题的消息,您应该查看 <int-kafka:message-driven-channel-adapter>