FlinkKafkaProducer 扩展 KafkaSerializationSchema 的 Scala String Sink

Scala String Sink for FlinkKafkaProducer extending KafkaSerializationSchema

[Flink 版本 -- 1.9]

所以我试图将一个字符串(JSON 格式)放入一个 kafka 主题中,但我对如何实现 KafkaSerializationSchema 来接收一个字符串有点困惑。 SimpleStringSchema 似乎不会与 FlinkKafkaProducer 一起工作,因为它需要 KafkaSerializationSchema。

如果已经有一些代码是像 SimpleStringSchema for kafka 这样的实用程序,我会更喜欢它,但是如果我必须自己编写,谁能解释为什么我从另一个 Whosebug 转换的 scala 代码 post在 java 中,基本上做同样的事情不会覆盖任何东西?

  def defineKafkaDataSink(topic: String,
                          kafkaBootstrapServer: String = "localhost:9092"):FlinkKafkaProducer[String] = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", kafkaBootstrapServer)
    new FlinkKafkaProducer[String](topic,new ProducerStringSerializationSchema(topic),properties,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
  }


  // Not sure why serialize doesnt override anything
  // working from a java stack overflow post
  // 
  import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
  import org.apache.kafka.clients.producer.ProducerRecord
  import java.nio.charset.StandardCharsets

  class ProducerStringSerializationSchema(var topic: String) extends KafkaSerializationSchema[String] {
    override def serialize(element: String, timestamp: Long) = new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes(StandardCharsets.UTF_8))
  }

这实际上更像是一个 scala 问题,而不是一个 flink 问题。确保类型与各自的 java 类型完全一致。 scala 有提供非常相似的类型的习惯,这些类型与 java 类型不同。

override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]]

特别要仔细检查 Long 确实是 java Long 而不是 scala Long。

如果您将来遇到类似的问题,让您的 IDE 生成方法存根通常会更容易,因为它将 "know" 正确的签名。因此,只需注释掉您的方法,让您的 IDE 抱怨您实际上有未实现的接口。那么您的 IDE 希望建议为您添加它们。