无法转换 Kafka Json Spark 结构化流中的数据
Can't Tranform Kafka Json Data in Spark Structured Streaming
我正在尝试获取 Kafka 消息并使用 Spark 独立处理它。 Kafka 将数据存储为 json 格式。我可以获取 Kafka 消息,但无法使用定义模式解析 json 数据。
当我运行bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_kafka_topic --from-beginning
命令查看kafka主题中的kafka消息时,输出如下:
"{\"timestamp\":1553792312117,\"values\":[{\"id\":\"Simulation.Simulator.Temperature\",\"v\":21,\"q\":true,\"t\":1553792311686}]}"
"{\"timestamp\":1553792317117,\"values\":[{\"id\":\"Simulation.Simulator.Temperature\",\"v\":22,\"q\":true,\"t\":1553792316688}]}"
而且我可以在 Spark 中使用此代码块成功获取此数据:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "my_kafka_topic") \
.load() \
.select(col("value").cast("string"))
架构是这样的:
df.printSchema()
root
|-- value: string (nullable = true)
然后将此数据帧写入控制台并打印 kafka 消息:
Batch: 9
-------------------------------------------
+--------------------+
| value|
+--------------------+
|"{\"timestamp\":1...|
+--------------------+
但我想解析 json 数据来定义模式和我尝试这样做的代码块:
schema = StructType([ StructField("timestamp", LongType(), False), StructField("values", ArrayType( StructType([ StructField("id", StringType(), True), StructField("v", IntegerType(), False), StructField("q", BooleanType(), False), StructField("t", LongType(), False) ]), True ), True) ])
parsed = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "my_kafka_topic") \
.load() \
.select(from_json(col("value").cast("string"), schema).alias("opc"))
以及 parsed
数据框的架构:
parsed.printSchema()
root
|-- opc: struct (nullable = true)
| |-- timestamp: string (nullable = true)
| |-- values: struct (nullable = true)
| | |-- id: string (nullable = true)
| | |-- v: integer (nullable = true)
| | |-- q: boolean (nullable = true)
| | |-- t: string (nullable = true)
这些代码块运行没有错误。但是当我想将 parsed
数据帧写入控制台时:
query = parsed\
.writeStream\
.format("console")\
.start()
query.awaitTermination()
它在控制台中这样写null
:
+----+
| opc|
+----+
|null|
+----+
所以,似乎解析 json 数据有问题,但无法弄清楚。
你能告诉我哪里出了问题吗?
似乎架构不适合您的情况,请尝试应用下一个:
schema = StructType([
StructField("timestamp", LongType(), False),
StructField("values", ArrayType(
StructType([StructField("id", StringType(), True),
StructField("v", IntegerType(), False),
StructField("q", BooleanType(), False),
StructField("t", LongType(), False)]), True), True)])
另请记住,inferSchema
选项效果很好,因此您可以让 Spark 发现架构并保存它。
另一个问题是您的 json 数据有前导和尾随双引号 "
它还包含 \
那些无效的 JSON 阻止了 Spark 解析消息。
为了删除无效字符,您的代码应如下修改:
parsed = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "my_kafka_topic") \
.load() \
.withColumn("value", regexp_replace(col("value").cast("string"), "\\", "")) \
.withColumn("value", regexp_replace(col("value"), "^\"|\"$", "")) \
.select(from_json(col("value"), schema).alias("opc"))
现在你的输出应该是:
+------------------------------------------------------------------------------------------------------------------+
|value |
+------------------------------------------------------------------------------------------------------------------+
|{"timestamp":1553588718638,"values":[{"id":"Simulation.Simulator.Temperature","v":26,"q":true,"t":1553588717036}]}|
+------------------------------------------------------------------------------------------------------------------+
祝你好运!
我正在尝试获取 Kafka 消息并使用 Spark 独立处理它。 Kafka 将数据存储为 json 格式。我可以获取 Kafka 消息,但无法使用定义模式解析 json 数据。
当我运行bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_kafka_topic --from-beginning
命令查看kafka主题中的kafka消息时,输出如下:
"{\"timestamp\":1553792312117,\"values\":[{\"id\":\"Simulation.Simulator.Temperature\",\"v\":21,\"q\":true,\"t\":1553792311686}]}"
"{\"timestamp\":1553792317117,\"values\":[{\"id\":\"Simulation.Simulator.Temperature\",\"v\":22,\"q\":true,\"t\":1553792316688}]}"
而且我可以在 Spark 中使用此代码块成功获取此数据:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "my_kafka_topic") \
.load() \
.select(col("value").cast("string"))
架构是这样的:
df.printSchema()
root
|-- value: string (nullable = true)
然后将此数据帧写入控制台并打印 kafka 消息:
Batch: 9
-------------------------------------------
+--------------------+
| value|
+--------------------+
|"{\"timestamp\":1...|
+--------------------+
但我想解析 json 数据来定义模式和我尝试这样做的代码块:
schema = StructType([ StructField("timestamp", LongType(), False), StructField("values", ArrayType( StructType([ StructField("id", StringType(), True), StructField("v", IntegerType(), False), StructField("q", BooleanType(), False), StructField("t", LongType(), False) ]), True ), True) ])
parsed = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "my_kafka_topic") \
.load() \
.select(from_json(col("value").cast("string"), schema).alias("opc"))
以及 parsed
数据框的架构:
parsed.printSchema()
root
|-- opc: struct (nullable = true)
| |-- timestamp: string (nullable = true)
| |-- values: struct (nullable = true)
| | |-- id: string (nullable = true)
| | |-- v: integer (nullable = true)
| | |-- q: boolean (nullable = true)
| | |-- t: string (nullable = true)
这些代码块运行没有错误。但是当我想将 parsed
数据帧写入控制台时:
query = parsed\
.writeStream\
.format("console")\
.start()
query.awaitTermination()
它在控制台中这样写null
:
+----+
| opc|
+----+
|null|
+----+
所以,似乎解析 json 数据有问题,但无法弄清楚。
你能告诉我哪里出了问题吗?
似乎架构不适合您的情况,请尝试应用下一个:
schema = StructType([
StructField("timestamp", LongType(), False),
StructField("values", ArrayType(
StructType([StructField("id", StringType(), True),
StructField("v", IntegerType(), False),
StructField("q", BooleanType(), False),
StructField("t", LongType(), False)]), True), True)])
另请记住,inferSchema
选项效果很好,因此您可以让 Spark 发现架构并保存它。
另一个问题是您的 json 数据有前导和尾随双引号 "
它还包含 \
那些无效的 JSON 阻止了 Spark 解析消息。
为了删除无效字符,您的代码应如下修改:
parsed = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "my_kafka_topic") \
.load() \
.withColumn("value", regexp_replace(col("value").cast("string"), "\\", "")) \
.withColumn("value", regexp_replace(col("value"), "^\"|\"$", "")) \
.select(from_json(col("value"), schema).alias("opc"))
现在你的输出应该是:
+------------------------------------------------------------------------------------------------------------------+
|value |
+------------------------------------------------------------------------------------------------------------------+
|{"timestamp":1553588718638,"values":[{"id":"Simulation.Simulator.Temperature","v":26,"q":true,"t":1553588717036}]}|
+------------------------------------------------------------------------------------------------------------------+
祝你好运!