如何在达到特定大小(128 Mb)时将 Kafka 消息提交到 HDFS 接收器

How to commit Kafka messages to HDFS sink on reaching a specific size (128 Mb)

我的配置: Confluent (5.0.0) Kafka 生成一些 avro 消息。 Connect worker(HDFS 连接器接收器)将这些消息以 Parquet 格式流式传输到 HDFS 节点。我将 connect worker 配置为每 5000 条消息 (flush.size=5000) 将消息提交到 HDFS。此配置工作正常。

我的问题:是否有任何解决方法可以在刚好达到 128 Mb(或 256 Mb)时提交消息,而不是消息数?

我的 HDFS 连接器配置文件:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
tasks.max=1
topics=some_topic
hdfs.url=hdfs://hdfshost:8020/user/someuser/kafka_hdfs_sink/
flush.size=5000

没有这样的配置 - see this open issue

解决方法是了解主题的每条消息平均有多大(在单个 Kafka 分区中,因为这就是文件的写入方式),然后相应地设置 flush.size 以大约达到HDFS 块大小的因素。

如果您使用 TimeBasedPartioner,则您必须知道消息的数量或消息达到目标大小的时间。