从 Kafka 主题反序列化 FlinkKafkaProducer 的事件导致 JSON 条记录为空

Deserializing FlinkKafkaProducer's events from Kafka topic results in empty JSON records

编辑 我准备了最小的可复制示例 https://github.com/kazuhirokomoda/flink-kafka-druid

我有一个数据管道和代码,与我之前的问题中提到的完全相同(除非在此 post 底部另有说明): 基本上我使用的是 Jackson 的 ObjectMapper 在自定义 KafkaSerializationSchema(即 ExampleDataSerializationSchema)中序列化事件。

出于测试目的,我在裸机服务器上准备了 Kafka 2.6,遵循 https://kafka.apache.org/quickstart

我使用 kafka-console-consumer.sh 检查从 Flink 作业发送到 Kafka 主题的内容,但我只看到空的 JSON 记录,而我期待的是非空记录。

bin/kafka-console-consumer.sh --topic <topic> --from-beginning --bootstrap-server localhost:9092
...
{}
{}
{}
^CProcessed a total of 287 messages

备注

bin/kafka-console-consumer.sh --topic <topic> --from-beginning --bootstrap-server localhost:9092
...
fixed string to send to kafka
fixed string to send to kafka
fixed string to send to kafka
^CProcessed a total of 287 messages

问题

再次感谢您的帮助。

代码

Flink -> Kafka

object KafkaSink {
  def sendToKafka(exampleDataStream: DataStream[ExampleData], props: ParameterTool): DataStreamSink[ExampleData] = {
    // defined in my previous question
  }

  // just for testing
  def sendToKafkaString(exampleDataStream: DataStream[ExampleData], props: ParameterTool): DataStreamSink[String] = {
    val topic: String = ...
    val properties: Properties = ... 

    val producer = new FlinkKafkaProducer[String](
      topic,
      new ExampleDataStringSerializationSchema(topic),
      properties,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

    val exampleDataStreamString = exampleDataStream.map(_ => "fixed string to send to kafka")
    exampleDataStreamString.addSink(producer)
  }
}
import java.nio.charset.StandardCharsets

class ExampleDataStringSerializationSchema(topic: String) extends KafkaSerializationSchema[String]{
  override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes(StandardCharsets.UTF_8))
  }
}

ObjectMapper 包装器解决方法对我有用。

https://github.com/kazuhirokomoda/flink-kafka-druid/pull/1

谢谢onecricketeer