将 Spark-kafka InputDStream 转换为 Array[Bytes]
Converting Spark-kafka InputDStream to Array[Bytes]
我正在使用 scala 并使用以下 Spark Streaming 方法使用来自 Kafka 的数据:
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
上面的变量 returns InputDStream 通过它我可以使用下面的代码查看 raw/binary 格式的数据:
println(行)
但我需要在 raw/binary 格式上应用 avro 格式(可用架构)才能以预期的 json 格式查看数据。为了应用avro格式,我需要将上面的InputDStream转换为avro使用的Array[Bytes]。
有人可以告诉我将 InputDStream 转换为 Array[Bytes] 吗?
或
如果您知道在 InputDStream(of spark Streaming)上应用 avro 模式的更好方法,请分享。
你需要做两件事。第一种是对 Kafka 使用 DefaultDecoder
,它为值类型提供 Array[Byte]
:
val lines: DStream[(String, Array[Byte])] =
KafkaUtils
.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)
然后您需要通过额外的 map
:
应用您的 Avro 反序列化逻辑
lines.map { case (_, bytes) => avroDeserializer.deserialize(bytes) }
其中 avroDeserializer
是您的任意 class,它知道如何从 Avro 字节创建您的类型。
我个人使用 avro4s 通过宏获取 case class 反序列化。
我正在使用 scala 并使用以下 Spark Streaming 方法使用来自 Kafka 的数据:
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
上面的变量 returns InputDStream 通过它我可以使用下面的代码查看 raw/binary 格式的数据: println(行)
但我需要在 raw/binary 格式上应用 avro 格式(可用架构)才能以预期的 json 格式查看数据。为了应用avro格式,我需要将上面的InputDStream转换为avro使用的Array[Bytes]。
有人可以告诉我将 InputDStream 转换为 Array[Bytes] 吗?
或
如果您知道在 InputDStream(of spark Streaming)上应用 avro 模式的更好方法,请分享。
你需要做两件事。第一种是对 Kafka 使用 DefaultDecoder
,它为值类型提供 Array[Byte]
:
val lines: DStream[(String, Array[Byte])] =
KafkaUtils
.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)
然后您需要通过额外的 map
:
lines.map { case (_, bytes) => avroDeserializer.deserialize(bytes) }
其中 avroDeserializer
是您的任意 class,它知道如何从 Avro 字节创建您的类型。
我个人使用 avro4s 通过宏获取 case class 反序列化。