已弃用的 FlinkKafkaProducer 的继任者
Successor to deprecated FlinkKafkaProducer
谁能告诉我在新版本的 Flink 中 FlinkKafkaProducer 使用什么,因为它已被弃用?
我有一些这样的代码:
FlinkKafkaProducer<String> myKafkaProducer = new FlinkKafkaProducer<String>(
"kafka-producer", // target topic
new SimpleStringSchema(), // serialization schema
properties);
我如何替换已弃用的方法?谢谢!
FlinkKafkaProducer 已被弃用
Flink推荐在最新版本
中使用新的source和sinkapi
参考https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/
DataStream<String> stream = ...
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("topic-name")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
stream.sinkTo(sink);
Flink 1.14 发行说明包含有关 KafkaSink
的详细信息,它是 FlinkKafkaProducer
的后续版本。这包括迁移指南。您可以在 https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#port-kafkasink-to-new-unified-sink-api-flip-143
找到它
谁能告诉我在新版本的 Flink 中 FlinkKafkaProducer 使用什么,因为它已被弃用?
我有一些这样的代码:
FlinkKafkaProducer<String> myKafkaProducer = new FlinkKafkaProducer<String>(
"kafka-producer", // target topic
new SimpleStringSchema(), // serialization schema
properties);
我如何替换已弃用的方法?谢谢!
FlinkKafkaProducer 已被弃用
Flink推荐在最新版本
中使用新的source和sinkapi参考https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/
DataStream<String> stream = ...
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("topic-name")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
stream.sinkTo(sink);
Flink 1.14 发行说明包含有关 KafkaSink
的详细信息,它是 FlinkKafkaProducer
的后续版本。这包括迁移指南。您可以在 https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#port-kafkasink-to-new-unified-sink-api-flip-143