已弃用的 FlinkKafkaProducer 的继任者

Successor to deprecated FlinkKafkaProducer

谁能告诉我在新版本的 Flink 中 FlinkKafkaProducer 使用什么,因为它已被弃用?

我有一些这样的代码:

FlinkKafkaProducer<String> myKafkaProducer = new FlinkKafkaProducer<String>(
        "kafka-producer",            // target topic
        new SimpleStringSchema(),    // serialization schema
        properties);

我如何替换已弃用的方法?谢谢!

FlinkKafkaProducer 已被弃用

Flink推荐在最新版本

中使用新的source和sinkapi

参考https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/

DataStream<String> stream = ...
        
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic-name")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
        )
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
        
stream.sinkTo(sink);

Flink 1.14 发行说明包含有关 KafkaSink 的详细信息,它是 FlinkKafkaProducer 的后续版本。这包括迁移指南。您可以在 https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#port-kafkasink-to-new-unified-sink-api-flip-143

找到它