Kafka Spring 集成:Headers 不支持 kafka 消费者
Kafka Spring Integration: Headers not coming for kafka consumer
我正在使用 Kafka Spring 使用 kafka 发布和使用消息的集成。我看到有效负载已从生产者正确传递到消费者,但 header 信息在某处被覆盖。
@ServiceActivator(inputChannel = "fromKafka")
public void processMessage(Message<?> message) throws InterruptedException,
ExecutionException {
try {
System.out.println("Headers :" + message.getHeaders().toString());
}
} catch (Exception e) {
e.printStackTrace();
}
}
我关注 headers:
Headers :{timestamp=1440013920609, id=f8c645f7-677b-ec32-dad0-a7b79082ef81}
我在生产者端构建消息是这样的:
Message<FeelDBMessage> message = MessageBuilder
.withPayload(samplePayloadObj)
.setHeader(KafkaHeaders.MESSAGE_KEY, "key")
.setHeader(KafkaHeaders.TOPIC, "sampleTopic").build();
// publish the message
publisher.publishMessage(message);
以下是生产者的 header 信息:
headers={timestamp=1440013914085, id=c4159c1c-2c67-634b-ef8d-3fb026b1172e, kafka_messageKey=key, kafka_topic=sampleTopic}
知道为什么 Headers 被不同的值覆盖了吗?
只是因为默认情况下 Framework 使用 immutable GenericMessage
。
对现有消息的任何操作(例如 MessageBuilder.withPayload
)都会产生一个新的 GenericMessage
实例。
从另一方面看,Kafka 不支持任何 headers
抽象,例如 JMS 或 AMQP。这就是为什么 KafkaProducerMessageHandler
在向 Kafka 发布消息时只执行此操作的原因:
this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload());
如您所见,它根本不发送 headers
。因此,另一方(消费者)仅处理主题中的 message
作为 payload
和一些系统选项作为 headers
,如 topic
、partition
、messageKey
.
两个词:我们不在 Kafka 上传输 headers 因为它不支持它们。
我正在使用 Kafka Spring 使用 kafka 发布和使用消息的集成。我看到有效负载已从生产者正确传递到消费者,但 header 信息在某处被覆盖。
@ServiceActivator(inputChannel = "fromKafka")
public void processMessage(Message<?> message) throws InterruptedException,
ExecutionException {
try {
System.out.println("Headers :" + message.getHeaders().toString());
}
} catch (Exception e) {
e.printStackTrace();
}
}
我关注 headers:
Headers :{timestamp=1440013920609, id=f8c645f7-677b-ec32-dad0-a7b79082ef81}
我在生产者端构建消息是这样的:
Message<FeelDBMessage> message = MessageBuilder
.withPayload(samplePayloadObj)
.setHeader(KafkaHeaders.MESSAGE_KEY, "key")
.setHeader(KafkaHeaders.TOPIC, "sampleTopic").build();
// publish the message
publisher.publishMessage(message);
以下是生产者的 header 信息:
headers={timestamp=1440013914085, id=c4159c1c-2c67-634b-ef8d-3fb026b1172e, kafka_messageKey=key, kafka_topic=sampleTopic}
知道为什么 Headers 被不同的值覆盖了吗?
只是因为默认情况下 Framework 使用 immutable GenericMessage
。
对现有消息的任何操作(例如 MessageBuilder.withPayload
)都会产生一个新的 GenericMessage
实例。
从另一方面看,Kafka 不支持任何 headers
抽象,例如 JMS 或 AMQP。这就是为什么 KafkaProducerMessageHandler
在向 Kafka 发布消息时只执行此操作的原因:
this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload());
如您所见,它根本不发送 headers
。因此,另一方(消费者)仅处理主题中的 message
作为 payload
和一些系统选项作为 headers
,如 topic
、partition
、messageKey
.
两个词:我们不在 Kafka 上传输 headers 因为它不支持它们。