How to parse a json string column in pyspark's DataStreamReader and create a Data Frame
I'm reading messages from a Kafka topic:
messageDFRaw = spark.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("subscribe", "test-message")\
.load()
messageDF = messageDFRaw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as dict")
When I print the DataFrame from the query above, I get the following console output:
|key|dict|
|#badbunny |{"channel": "#badbunny", "username": "mgat22", "message": "cool"}|
How can I create a DataFrame from the DataStreamReader that has the columns |key|channel|username|message|?
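To make the target shape concrete, here is a plain-Python sketch (no Spark involved) of what the job should do to each record: keep the key and expand the JSON string in value into three columns. The function name flatten_record and the FIELDS tuple are hypothetical; in Spark the same work is done by from_json followed by a select on the resulting struct's fields.

```python
import json

# Fields expected in the JSON payload (mirrors the StructType schema used below).
FIELDS = ("channel", "username", "message")


def flatten_record(key, value):
    """Return (key, channel, username, message) for one Kafka record.

    key and value are the already-cast string forms. Fields missing from the
    JSON come back as None, similar to the nulls from_json produces for them.
    """
    payload = json.loads(value)
    return (key,) + tuple(payload.get(field) for field in FIELDS)


row = flatten_record(
    "#badbunny",
    '{"channel": "#badbunny", "username": "mgat22", "message": "cool"}',
)
print(row)  # ('#badbunny', '#badbunny', 'mgat22', 'cool')
```

Unlike this sketch, which raises on malformed JSON, Spark's from_json in its default mode returns null for a record it cannot parse.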
I tried following the accepted answer in a related question:
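from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

# Schema of the JSON payload carried in the Kafka value
struct = StructType([
    StructField("channel", StringType()),
    StructField("username", StringType()),
    StructField("message", StringType()),
])
messageDFRaw.select(from_json("CAST(value AS STRING)", struct))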
However, from_json() gives me the warning Expected type 'StructField', got 'StructType' instead.
I ignored the warning Expected type 'StructField', got 'StructType' instead on from_json().
However, I had to cast the value of the Kafka message to a string first, and only then parse it with the JSON schema:
messageDF = messageDFRaw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# struct is the StructType schema defined above; keep key next to the parsed JSON
messageParsedDF = messageDF.select("key", from_json("value", struct).alias("data"))
messageFlattenedDF = messageParsedDF.selectExpr("key", "data.channel", "data.username", "data.message")