如何在达到特定大小(128 Mb)时将 Kafka 消息提交到 HDFS 接收器
How to commit Kafka messages to HDFS sink on reaching a specific size (128 Mb)
我的配置:
Confluent (5.0.0) Kafka 生成一些 avro 消息。
Connect worker(HDFS 连接器接收器)将这些消息以 Parquet 格式流式传输到 HDFS 节点。我将 connect worker 配置为每 5000 条消息 (flush.size=5000
) 将消息提交到 HDFS。此配置工作正常。
我的问题:是否有任何解决方法可以在刚好达到 128 Mb(或 256 Mb)时提交消息,而不是消息数?
我的 HDFS 连接器配置文件:
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
tasks.max=1
topics=some_topic
hdfs.url=hdfs://hdfshost:8020/user/someuser/kafka_hdfs_sink/
flush.size=5000
没有这样的配置 - see this open issue
解决方法是了解主题的每条消息平均有多大(在单个 Kafka 分区中,因为这就是文件的写入方式),然后相应地设置 flush.size
以大约达到HDFS 块大小的因素。
如果您使用 TimeBasedPartioner,则您必须知道消息的数量或消息达到目标大小的时间。
我的配置:
Confluent (5.0.0) Kafka 生成一些 avro 消息。
Connect worker(HDFS 连接器接收器)将这些消息以 Parquet 格式流式传输到 HDFS 节点。我将 connect worker 配置为每 5000 条消息 (flush.size=5000
) 将消息提交到 HDFS。此配置工作正常。
我的问题:是否有任何解决方法可以在刚好达到 128 Mb(或 256 Mb)时提交消息,而不是消息数?
我的 HDFS 连接器配置文件:
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
tasks.max=1
topics=some_topic
hdfs.url=hdfs://hdfshost:8020/user/someuser/kafka_hdfs_sink/
flush.size=5000
没有这样的配置 - see this open issue
解决方法是了解主题的每条消息平均有多大(在单个 Kafka 分区中,因为这就是文件的写入方式),然后相应地设置 flush.size
以大约达到HDFS 块大小的因素。
如果您使用 TimeBasedPartioner,则您必须知道消息的数量或消息达到目标大小的时间。