如何查看kafka headers
how to view kafka headers
我们正在使用 headers 向 Kafka 发送消息
org.apache.kafka.clients.producer.ProducerRecord
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
this(topic, partition, (Long)null, key, value, headers);
}
我怎样才能真正看到这些 headers 使用命令。 kafka-console-consumer.sh 仅向我显示有效载荷,而没有 headers。
来自 kafka-console-consumer.sh
脚本:
exec $(dirname [=10=])/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
来源:https://github.com/apache/kafka/blob/2.1.1/bin/kafka-console-consumer.sh
在 kafka.tools.ConsoleConsumer
中,header 被提供给格式化程序,但是 none 现有的格式化程序使用它:
formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers),
output)
来源:https://github.com/apache/kafka/blob/2.1.1/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
在上面的底部 link 你可以看到现有的格式化程序。
如果你想打印 headers 你需要实现你自己的 kafka.common.MessageFormatter
尤其是它的 write 方法:
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit
然后 运行 您的控制台使用者使用 --formatter 提供您自己的格式化程序(它也应该出现在类路径中)。
另一种更简单、更快速的方法是使用 KafkaConsumer 实现您自己的 mini-program 并在调试中检查 headers。
您可以使用出色的 kafkacat 工具。
示例命令:
kafkacat -b kafka-broker:9092 -t my_topic_name -C \
-f '\nKey (%K bytes): %k
Value (%S bytes): %s
Timestamp: %T
Partition: %p
Offset: %o
Headers: %h\n'
示例输出:
Key (-1 bytes):
Value (13 bytes): {foo:"bar 5"}
Timestamp: 1548350164096
Partition: 0
Offset: 34
Headers: __connect.errors.topic=test_topic_json,__connect.errors.partition=0,__connect.errors.offset=94,__connect.errors.connector.name=file_sink_03,__connect.errors.task.id=0,__connect.errors.stage=VALU
E_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Co
nverting byte[] to Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed
due to serialization error:
kafkacat header 选项仅在 kafkacat
的最新版本中可用;如果您当前的版本不包含它,您可能想 build 自己从 master 分支。
您还可以 运行 来自 Docker 的 kafkacat:
docker run --rm edenhill/kafkacat:1.5.0 \
-b kafka-broker:9092 \
-t my_topic_name -C \
-f '\nKey (%K bytes): %k
Value (%S bytes): %s
Timestamp: %T
Partition: %p
Offset: %o
Headers: %h\n'
如果您使用 Docker 请记住如何访问 Kafka 代理的网络影响。
您也可以为此使用 kafkactl。例如。输出为 yaml:
kafkactl consume my-topic --print-headers -o yaml
示例输出:
partition: 1
offset: 22
headers:
key1: value1
key2: value2
value: my-value
免责声明:我是这个项目的贡献者
从 kafka-2.7.0 开始,您可以通过提供 属性 print.headers=true
在 console-consumer 中启用打印 headers
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic quickstart-events --property print.key=true --property print.headers=true --property print.timestamp=true
kcat -C -b $brokers -t $topic -f 'key: %k Headers: %h: Message value: %s\n'
我们正在使用 headers 向 Kafka 发送消息 org.apache.kafka.clients.producer.ProducerRecord
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
this(topic, partition, (Long)null, key, value, headers);
}
我怎样才能真正看到这些 headers 使用命令。 kafka-console-consumer.sh 仅向我显示有效载荷,而没有 headers。
来自 kafka-console-consumer.sh
脚本:
exec $(dirname [=10=])/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
来源:https://github.com/apache/kafka/blob/2.1.1/bin/kafka-console-consumer.sh
在 kafka.tools.ConsoleConsumer
中,header 被提供给格式化程序,但是 none 现有的格式化程序使用它:
formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers),
output)
来源:https://github.com/apache/kafka/blob/2.1.1/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
在上面的底部 link 你可以看到现有的格式化程序。
如果你想打印 headers 你需要实现你自己的 kafka.common.MessageFormatter
尤其是它的 write 方法:
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit
然后 运行 您的控制台使用者使用 --formatter 提供您自己的格式化程序(它也应该出现在类路径中)。
另一种更简单、更快速的方法是使用 KafkaConsumer 实现您自己的 mini-program 并在调试中检查 headers。
您可以使用出色的 kafkacat 工具。
示例命令:
kafkacat -b kafka-broker:9092 -t my_topic_name -C \
-f '\nKey (%K bytes): %k
Value (%S bytes): %s
Timestamp: %T
Partition: %p
Offset: %o
Headers: %h\n'
示例输出:
Key (-1 bytes):
Value (13 bytes): {foo:"bar 5"}
Timestamp: 1548350164096
Partition: 0
Offset: 34
Headers: __connect.errors.topic=test_topic_json,__connect.errors.partition=0,__connect.errors.offset=94,__connect.errors.connector.name=file_sink_03,__connect.errors.task.id=0,__connect.errors.stage=VALU
E_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Co
nverting byte[] to Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed
due to serialization error:
kafkacat header 选项仅在 kafkacat
的最新版本中可用;如果您当前的版本不包含它,您可能想 build 自己从 master 分支。
您还可以 运行 来自 Docker 的 kafkacat:
docker run --rm edenhill/kafkacat:1.5.0 \
-b kafka-broker:9092 \
-t my_topic_name -C \
-f '\nKey (%K bytes): %k
Value (%S bytes): %s
Timestamp: %T
Partition: %p
Offset: %o
Headers: %h\n'
如果您使用 Docker 请记住如何访问 Kafka 代理的网络影响。
您也可以为此使用 kafkactl。例如。输出为 yaml:
kafkactl consume my-topic --print-headers -o yaml
示例输出:
partition: 1
offset: 22
headers:
key1: value1
key2: value2
value: my-value
免责声明:我是这个项目的贡献者
从 kafka-2.7.0 开始,您可以通过提供 属性 print.headers=true
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic quickstart-events --property print.key=true --property print.headers=true --property print.timestamp=true
kcat -C -b $brokers -t $topic -f 'key: %k Headers: %h: Message value: %s\n'