Kafka S3 Connector Once Delivery Guarantee 如何工作
How does Kafka S3 Connector Once Delivery Guarantee Work
我已经阅读了他们的博客并了解了他们的例子。
https://www.confluent.io/blog/apache-kafka-to-amazon-s3-exactly-once/
但我正在努力解决我遇到的这种情况。我当前的配置是:
"flush.size": "50",
"rotate.interval.ms": "-1",
"rotate.schedule.interval.ms": "300000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "YYYY/MM/dd/HH",
"timestamp.extractor": "Wallclock"
根据我阅读的有关配置的内容。连接器将提交 50
条记录的文件或 300000ms
(5 分钟)后的文件,以先到者为准。如果连接器将文件上传到 s3 但未能提交给 Kafka,那么 Kafka 将如何重新上传将覆盖 s3 文件的相同记录,因为我设置了轮换计划间隔?这不会导致 s3 中出现重复吗?
S3 sink connector's documentation 是另一个很好的资源,它描述了连接器如何保证准确地一次交付到 S3,更重要的是哪些功能组合提供(或不提供)这种保证。
具体来说,该文档中的一个部分说:
To guarantee exactly-once semantics with the TimeBasedPartitioner
, the connector must be configured to use a deterministic implementation of TimestampExtractor
and a deterministic rotation strategy. The deterministic timestamp extractors are Kafka records (timestamp.extractor=Record
) or record fields (timestamp.extractor=RecordField
). The deterministic rotation strategy configuration is rotate.interval.ms
(setting rotate.schedule.interval.ms
is nondeterministic and will invalidate exactly-once guarantees).
您的 S3 接收器连接器配置确实使用确定性分区器(通过 "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner"),但它使用非确定性 Wallclock 时间戳提取器(通过 "timestamp.extractor": "Wallclock"
)。它是不确定的,因为如果连接器确实必须重新启动(例如,由于故障)并且重新处理特定记录,它将在稍后重新处理该记录并且挂钟时间戳提取器将为该记录选择不同的时间。
其次,您的连接器使用 rotate.schedule.interval.ms
选项,文档指出该选项与 exactly once delivery 不兼容。例如,如果连接器确实必须重新处理一系列 Kafka 记录,它可能会将记录分解为与第一次不同的 S3 对象,这意味着 S3 连接器最终会写入不同的 S3 对象。
总而言之,具有您的配置的 S3 接收器连接器将无法提供一次交付保证。
使用 rotate.interval.ms 和 timestamp.extractor 设置为录制
此外,确保您正在阅读的主题的时间戳类型设置为 "LOG_APPEND_TIME"
我不确定工作者配置是否应该将消费者隔离级别 属性 设置为已提交读取。取决于 S3 连接器是否自动执行此操作。
即便如此,如果在 broker 集群的领导者选举期间时间戳不是单调递增,也会出现问题。注意this问题
的状态
我已经阅读了他们的博客并了解了他们的例子。 https://www.confluent.io/blog/apache-kafka-to-amazon-s3-exactly-once/
但我正在努力解决我遇到的这种情况。我当前的配置是:
"flush.size": "50",
"rotate.interval.ms": "-1",
"rotate.schedule.interval.ms": "300000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "YYYY/MM/dd/HH",
"timestamp.extractor": "Wallclock"
根据我阅读的有关配置的内容。连接器将提交 50
条记录的文件或 300000ms
(5 分钟)后的文件,以先到者为准。如果连接器将文件上传到 s3 但未能提交给 Kafka,那么 Kafka 将如何重新上传将覆盖 s3 文件的相同记录,因为我设置了轮换计划间隔?这不会导致 s3 中出现重复吗?
S3 sink connector's documentation 是另一个很好的资源,它描述了连接器如何保证准确地一次交付到 S3,更重要的是哪些功能组合提供(或不提供)这种保证。
具体来说,该文档中的一个部分说:
To guarantee exactly-once semantics with the
TimeBasedPartitioner
, the connector must be configured to use a deterministic implementation ofTimestampExtractor
and a deterministic rotation strategy. The deterministic timestamp extractors are Kafka records (timestamp.extractor=Record
) or record fields (timestamp.extractor=RecordField
). The deterministic rotation strategy configuration isrotate.interval.ms
(settingrotate.schedule.interval.ms
is nondeterministic and will invalidate exactly-once guarantees).
您的 S3 接收器连接器配置确实使用确定性分区器(通过 "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner"),但它使用非确定性 Wallclock 时间戳提取器(通过 "timestamp.extractor": "Wallclock"
)。它是不确定的,因为如果连接器确实必须重新启动(例如,由于故障)并且重新处理特定记录,它将在稍后重新处理该记录并且挂钟时间戳提取器将为该记录选择不同的时间。
其次,您的连接器使用 rotate.schedule.interval.ms
选项,文档指出该选项与 exactly once delivery 不兼容。例如,如果连接器确实必须重新处理一系列 Kafka 记录,它可能会将记录分解为与第一次不同的 S3 对象,这意味着 S3 连接器最终会写入不同的 S3 对象。
总而言之,具有您的配置的 S3 接收器连接器将无法提供一次交付保证。
使用 rotate.interval.ms 和 timestamp.extractor 设置为录制 此外,确保您正在阅读的主题的时间戳类型设置为 "LOG_APPEND_TIME"
我不确定工作者配置是否应该将消费者隔离级别 属性 设置为已提交读取。取决于 S3 连接器是否自动执行此操作。
即便如此,如果在 broker 集群的领导者选举期间时间戳不是单调递增,也会出现问题。注意this问题
的状态