How to achieve exactly-once semantics in the Apache Kafka connector

I am using Flink 1.8.0. My application reads data from Kafka -> transforms it -> publishes it back to Kafka. To avoid any duplicates during a restart, I want to use a Kafka producer with exactly-once semantics, as described here:

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-011-and-newer

My Kafka version is 1.1.

    return new FlinkKafkaProducer<String>(topic, new KeyedSerializationSchema<String>() {

        @Override
        public byte[] serializeKey(String element) {
            return element.getBytes();
        }

        @Override
        public byte[] serializeValue(String element) {
            return element.getBytes();
        }

        @Override
        public String getTargetTopic(String element) {
            return topic;
        }
    }, prop, opt, FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 1);

Checkpointing code:

    env.enableCheckpointing(5000);
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.setCheckpointTimeout(15 * 1000);
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

If I enable exactly-once semantics in the Kafka producer, my Flink consumer does not read any new data.

Can anyone share sample code or an application with exactly-once semantics?

Please find the complete code here:

https://github.com/sris2/sample_flink_exactly_once

Thanks

> Can anyone share sample code or an application with exactly-once semantics?

There is an exactly-once example hidden in Flink's end-to-end tests. Since it uses some convenience functions, it may be hard to follow without checking out the whole repo.

> If I add exactly-once semantics in the Kafka producer, my Flink consumer does not read any new data. [...] Please find the complete code here:
>
> https://github.com/sris2/sample_flink_exactly_once

I checked your code and found the problem (I had to fix the whole setup/code to actually get it running). The sink could not configure its transactions correctly. As described in the Flink Kafka connector documentation, you need to either raise transaction.max.timeout.ms on the Kafka brokers to at least 1 hour, or lower the transaction timeout in your application to at most 15 minutes:

    prop.setProperty("transaction.timeout.ms", "900000");
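For context, this property has to be set on the same `Properties` object that is later passed into the `FlinkKafkaProducer` constructor. A minimal, self-contained sketch of assembling the producer config (the `bootstrap.servers` value is a placeholder for your cluster):

```java
import java.util.Properties;

public class ProducerProps {

    // Builds the producer configuration used by the FlinkKafkaProducer.
    public static Properties build() {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        // 15 minutes: must not exceed the broker's transaction.max.timeout.ms,
        // which defaults to 900000 ms (15 minutes).
        prop.setProperty("transaction.timeout.ms", "900000");
        return prop;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("transaction.timeout.ms"));
    }
}
```

The resulting `prop` is what gets passed as the third argument of the `FlinkKafkaProducer` constructor shown in the question.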

The corresponding excerpt:

> Kafka brokers by default have transaction.max.timeout.ms set to 15 minutes. This property will not allow to set transaction timeouts for the producers larger than it's value. FlinkKafkaProducer011 by default sets the transaction.timeout.ms property in producer config to 1 hour, thus transaction.max.timeout.ms should be increased before using the Semantic.EXACTLY_ONCE mode.
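Alternatively, if you would rather keep Flink's default 1-hour producer transaction timeout, the limit can be raised on the broker side instead. A sketch of the corresponding server.properties entry (value in milliseconds; adjust to what your cluster policy allows):

```properties
# Allow producers to request transaction timeouts of up to 1 hour
# (the broker default is 900000 ms, i.e. 15 minutes).
transaction.max.timeout.ms=3600000
```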