如何使用 Spark Streaming 从 Kafka 读取二进制序列化的 Avro(Confluent Platform)
How do I read binary serialized Avro (Confluent Platform) from Kafka using Spark Streaming
这些是使用 Confluent 平台连载的 Avros。
我想找到一个像这样的工作示例:
但对于 Spark 结构化流。
kafka
.select("value")
.map { row =>
// this gives me test == testRehydrated
val test = Foo("bar")
val testBytes = AvroWriter[Foo].toBytes(test)
val testRehydrated = AvroReader[Foo].fromBytes(testBytes)
// this yields mangled Foo data
val bytes = row.getAs[Array[Byte]]("value")
val rehydrated = AvroReader[Foo].fromBytes(bytes)
我发现如果你想阅读他们的东西,你必须使用 Confluent 平台解码器。
def decoder: io.confluent.kafka.serializers.KafkaAvroDecoder = {
val props = new Properties()
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl())
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
val vProps = new kafka.utils.VerifiableProperties(props)
new io.confluent.kafka.serializers.KafkaAvroDecoder(vProps)
}
我们一直在开发这个库,它可能会有所帮助:ABRiS (Avro Bridge for Spark)
它提供用于在读取和写入操作(流式处理和批处理)中将 Spark 集成到 Avro 的 API。它还支持 Confluent Kafka 并与 Schema Registry 集成。
免责声明:我为 ABSA 工作,我是这个库背后的主要开发人员。
这些是使用 Confluent 平台连载的 Avros。
我想找到一个像这样的工作示例:
但对于 Spark 结构化流。
kafka
.select("value")
.map { row =>
// this gives me test == testRehydrated
val test = Foo("bar")
val testBytes = AvroWriter[Foo].toBytes(test)
val testRehydrated = AvroReader[Foo].fromBytes(testBytes)
// this yields mangled Foo data
val bytes = row.getAs[Array[Byte]]("value")
val rehydrated = AvroReader[Foo].fromBytes(bytes)
我发现如果你想阅读他们的东西,你必须使用 Confluent 平台解码器。
def decoder: io.confluent.kafka.serializers.KafkaAvroDecoder = {
val props = new Properties()
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl())
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
val vProps = new kafka.utils.VerifiableProperties(props)
new io.confluent.kafka.serializers.KafkaAvroDecoder(vProps)
}
我们一直在开发这个库,它可能会有所帮助:ABRiS (Avro Bridge for Spark)
它提供用于在读取和写入操作(流式处理和批处理)中将 Spark 集成到 Avro 的 API。它还支持 Confluent Kafka 并与 Schema Registry 集成。
免责声明:我为 ABSA 工作,我是这个库背后的主要开发人员。