我们如何强制 confluent kafka connect s3 sink 刷新

how can we force confluent kafka connect s3 sink to flush

我设置了 kafka connect s3 接收器,持续时间设置为 1 小时,我还设置了一个相当大的刷新计数,比如 10,000。现在如果kafka channel中的消息不多,s3 sink会尝试在内存中缓冲,等待累积到flush count,然后一起上传,commit offset给自己的consumer group。

但是想想这个情况。如果在频道中,我只发送 5,000 条消息。然后没有s3 sink flush。那么时间久了,这5000条消息最终会因为保留时间的关系,从kafka中被逐出。但是这些消息还在s3 sink的内存中,不在s3中。这是非常危险的,例如,如果我们重新启动 s3 sink 或机器 运行 s3 sink 就崩溃了。然后我们丢失了那 5,000 条消息。我们无法从 kafka 中再次找到它们,因为它已被删除。

s3 sink会出现这种情况吗?或者有一些设置会强制它在一段时间后刷新?

如果您从 Kafka 到 S3 的流没有持续的记录流,您可以使用 属性

rotate.schedule.interval.ms

以预定的时间间隔刷新记录。

请注意,在重新处理的情况下,如果使用此选项,您的下游系统应该能够处理重复项。这是因为如果连接器计划从 Kafka 重新导出记录,则基于挂钟刷新此类记录可能会导致重复出现在不同的文件中。

作为旁注,如果您使用 属性:

rotate.interval.ms

使用 Wallclock 时间戳提取器 (timestamp.extractor=Wallclock),您的记录将在不设置 rotate.schedule.interval.ms 的情况下被刷新。但这意味着您的分区程序依赖于挂钟,因此您应该能够解释重复记录。

该连接器能够通过确定性分区器在恒定的记录流上提供精确一次交付,并具有各种时间戳提取器,例如依赖于记录的时间戳 (Record) 或字段的时间戳提取器时间戳 (RecordField) 。

分区配置属性here