带有结构化流 protobuf 的 Apache Kafka
Apache Kafka with Structured Streaming protobuf
我正在尝试使用结构化流编写一个 Kafka 消费者(protobuf 的)。让我们将 protobuf 称为 A,它应该在 Scala 中反序列化为字节数组 (Array[Byte])。
我尝试了网上能找到的所有方法,但仍然无法弄清楚如何正确解析消息A
方法 1:根据集成指南 (https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html),我应该将值转换为字符串。但即使我执行 getBytes 将字符串转换为字节以解析我的消息 A,我也会得到:
Exception in thread "main" java.lang.ExceptionInInitializerError
...
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.8
方法二:我想直接将值转换为字节数组。我会得到:
missing ')' at '['(line 1, pos 17)
== SQL ==
CAST(key AS Array[Byte])
方法三:我是按照(https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html)自己写的protobuf的反序列化器。但是收到错误消息:
Schema for type A is not supported
以上三种方法大概是我在网上能找到的所有方法了。这应该是一个简单而常见的问题,所以如果有人有洞察力,请告诉我。
谢谢!
从流式源创建的 DataFrame
的架构是:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
所以键和值实际上在Array[Byte]
中。您将必须在 Dataframe 操作中执行反序列化。
例如,我有这个用于 Kafka 反序列化:
import sparkSession.implicits._
sparkSession.readStream
.format("kafka")
.option("subscribe", topic)
.option(
"kafka.bootstrap.servers",
bootstrapServers
)
.load()
.selectExpr("key", "value") // Selecting only key & value
.as[(Array[Byte], Array[Byte])]
.flatMap {
case (key, value) =>
for {
deserializedKey <- Try {
keyDeserializer.deserialize(topic, key)
}.toOption
deserializedValue <- Try {
valueDeserializer.deserialize(topic, value)
}.toOption
} yield (deserializedKey, deserializedValue)
}
您需要修改它以反序列化您的 protobuf 记录。
我正在尝试使用结构化流编写一个 Kafka 消费者(protobuf 的)。让我们将 protobuf 称为 A,它应该在 Scala 中反序列化为字节数组 (Array[Byte])。
我尝试了网上能找到的所有方法,但仍然无法弄清楚如何正确解析消息A
方法 1:根据集成指南 (https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html),我应该将值转换为字符串。但即使我执行 getBytes 将字符串转换为字节以解析我的消息 A,我也会得到:
Exception in thread "main" java.lang.ExceptionInInitializerError
...
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.8
方法二:我想直接将值转换为字节数组。我会得到:
missing ')' at '['(line 1, pos 17)
== SQL ==
CAST(key AS Array[Byte])
方法三:我是按照(https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html)自己写的protobuf的反序列化器。但是收到错误消息:
Schema for type A is not supported
以上三种方法大概是我在网上能找到的所有方法了。这应该是一个简单而常见的问题,所以如果有人有洞察力,请告诉我。
谢谢!
从流式源创建的 DataFrame
的架构是:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
所以键和值实际上在Array[Byte]
中。您将必须在 Dataframe 操作中执行反序列化。
例如,我有这个用于 Kafka 反序列化:
import sparkSession.implicits._
sparkSession.readStream
.format("kafka")
.option("subscribe", topic)
.option(
"kafka.bootstrap.servers",
bootstrapServers
)
.load()
.selectExpr("key", "value") // Selecting only key & value
.as[(Array[Byte], Array[Byte])]
.flatMap {
case (key, value) =>
for {
deserializedKey <- Try {
keyDeserializer.deserialize(topic, key)
}.toOption
deserializedValue <- Try {
valueDeserializer.deserialize(topic, value)
}.toOption
} yield (deserializedKey, deserializedValue)
}
您需要修改它以反序列化您的 protobuf 记录。