How to consume correctly from Kafka topic with Java Spark structured streaming

I'm new to Kafka/Spark streaming and am trying to implement the examples from the Spark documentation with a Protocol Buffers serializer/deserializer. So far I have followed the official tutorials at

https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html
https://developers.google.com/protocol-buffers/docs/javatutorial

and am now stuck on the following problem. This question may be similar to How to deserialize records from Kafka using Structured Streaming in Java?

I have already successfully implemented the serializer that writes messages to the Kafka topic. Now the task is to consume them with Spark Structured Streaming using a custom deserializer.

import org.apache.kafka.common.serialization.Deserializer;

public class CustomDeserializer implements Deserializer<Person> {

    @Override
    public Person deserialize(String topic, byte[] data) {
        try {
            // Parse the raw bytes with the generated protobuf parser
            return Person.parseFrom(data);
        } catch (Exception e) {
            // TODO: handle the parse failure properly
            return null;
        }
    }
}


Dataset<Row> dataset = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", topic)
        .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        .option("value.deserializer", "de.myproject.CustomDeserializer")
        .load()
        .select("value");

dataset.writeStream()
        .format("console")
        .start()
        .awaitTermination();

But as output I still get binary data:

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+
|               value|
+--------------------+
|[08 AC BD BB 09 1...|
+--------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+
|               value|
+--------------------+
|[08 82 EF D8 08 1...|
+--------------------+

According to the tutorial, I thought I only needed to set the value.deserializer option to get a human-readable format:

.option("value.deserializer", "de.myproject.CustomDeserializer")

Am I missing something?

Did you miss this section of the documentation?

Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:

  • key.deserializer: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys.
  • value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.

You have to register a UDF that invokes your deserializer.

This is similar to Read protobuf kafka message using spark structured streaming; a minimal sketch follows.
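
For example, assuming the generated Person protobuf class and the sparkSession/topic variables from the question, it might look like this (deserialize is just an illustrative UDF name):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

// Register a UDF that parses the raw Kafka value bytes with the generated
// protobuf parser and renders the message as text.
sparkSession.udf().register("deserialize",
        (UDF1<byte[], String>) bytes -> Person.parseFrom(bytes).toString(),
        DataTypes.StringType);

Dataset<Row> dataset = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", topic)
        .load()
        // values always arrive as binary; deserialize them explicitly
        .select(callUDF("deserialize", col("value")).as("person"));

dataset.writeStream()
        .format("console")
        .start()
        .awaitTermination();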

You need to cast the bytes to the String data type:

dataset.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

Then you can use the function from_json(dataset.col("value"), StructType) to get the actual DataFrame back.
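
Note that this route only applies when the value is JSON text rather than protobuf bytes. A minimal sketch, with a hypothetical name/id/email schema that you would replace with your actual fields:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

// Hypothetical schema; the fields must match your actual payload.
StructType schema = new StructType()
        .add("name", DataTypes.StringType)
        .add("id", DataTypes.IntegerType)
        .add("email", DataTypes.StringType);

Dataset<Row> parsed = dataset
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        // parse the JSON string into a struct column, then flatten it
        .select(from_json(col("value"), schema).as("person"))
        .select("person.*");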

Happy coding :)