将 Spark-kafka InputDStream 转换为 Array[Bytes]

Converting Spark-kafka InputDStream to Array[Bytes]

我正在使用 scala 并使用以下 Spark Streaming 方法使用来自 Kafka 的数据:

val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

上面的变量 returns InputDStream 通过它我可以使用下面的代码查看 raw/binary 格式的数据: println(行)

但我需要在 raw/binary 格式上应用 avro 格式(可用架构)才能以预期的 json 格式查看数据。为了应用avro格式,我需要将上面的InputDStream转换为avro使用的Array[Bytes]。

有人可以告诉我将 InputDStream 转换为 Array[Bytes] 吗?

如果您知道在 InputDStream(of spark Streaming)上应用 avro 模式的更好方法,请分享。

你需要做两件事。第一种是对 Kafka 使用 DefaultDecoder,它为值类型提供 Array[Byte]

val lines: DStream[(String, Array[Byte])] = 
  KafkaUtils
   .createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)

然后您需要通过额外的 map:

应用您的 Avro 反序列化逻辑
lines.map { case (_, bytes) => avroDeserializer.deserialize(bytes) }

其中 avroDeserializer 是您的任意 class,它知道如何从 Avro 字节创建您的类型。

我个人使用 avro4s 通过宏获取 case class 反序列化。