How to read multiple nested JSON objects from one file into a dataframe with PySpark in Azure Databricks?

I have .log files in ADLS that contain multiple nested JSON objects, as below:

{"EventType":3735091736,"Timestamp":"2019-03-19","Data":{"Id":"event-c2","Level":2,"MessageTemplate":"Test1","Properties":{"CorrId":"d69b7489","ActionId":"d0e2c3fd"}},"Id":"event-c20b9c7eac0808d6321106d901000000"}
{"EventType":3735091737,"Timestamp":"2019-03-18","Data":{"Id":"event-d2","Level":2,"MessageTemplate":"Test1","Properties":{"CorrId":"f69b7489","ActionId":"d0f2c3fd"}},"Id":"event-d20b9c7eac0808d6321106d901000000"}
{"EventType":3735091738,"Timestamp":"2019-03-17","Data":{"Id":"event-e2","Level":1,"MessageTemplate":"Test1","Properties":{"CorrId":"g69b7489","ActionId":"d0d2c3fd"}},"Id":"event-e20b9c7eac0808d6321106d901000000"}

I need to read these multiple nested JSON objects in PySpark and convert them to a dataframe as follows:

EventType     Timestamp     Data.Id     .....   Data.Properties.CorrId    Data.Properties.ActionId
3735091736    2019-03-19    event-c2    .....   d69b7489                  d0e2c3fd
3735091737    2019-03-18    event-d2    .....   f69b7489                  d0f2c3fd
3735091738    2019-03-17    event-e2    .....   g69b7489                  d0d2c3fd

For the above I am using ADLS and PySpark in Azure Databricks.

Does anyone know a general way to handle this? Thanks!

  1. You can read the file into an RDD first. It will come in as a list of strings.
  2. Convert each JSON string into native Python data types using json.loads().
  3. Then convert the RDD into a dataframe directly with toDF().
  4. Let Spark infer the schema.
  5. Using the answer from the linked question, you can explode the Data column into multiple columns, given that your Id column is unique. Note that explode returns key and value columns for each entry in the map type.
  6. You can repeat step 5 to expand the Properties column.

Solution:

import json
from pyspark.sql import functions as F

rdd = sc.textFile("demo_files/Test20191023.log")
# Parse each JSON line into a Python dict, then let toDF() infer the schema
df = rdd.map(lambda x: json.loads(x)).toDF()
df.show()
# +--------------------+----------+--------------------+----------+
# |                Data| EventType|                  Id| Timestamp|
# +--------------------+----------+--------------------+----------+
# |[MessageTemplate ...|3735091736|event-c20b9c7eac0...|2019-03-19|
# |[MessageTemplate ...|3735091737|event-d20b9c7eac0...|2019-03-18|
# |[MessageTemplate ...|3735091738|event-e20b9c7eac0...|2019-03-17|
# +--------------------+----------+--------------------+----------+

# Explode the Data map into key/value rows, then pivot the keys back into columns
data_exploded = df.select('Id', 'EventType', 'Timestamp', F.explode('Data'))\
    .groupBy('Id', 'EventType', 'Timestamp').pivot('key').agg(F.first('value'))
# There is a duplicate Id column, which might cause ambiguity problems
data_exploded.show()
data_exploded.show()

# +--------------------+----------+----------+--------+-----+---------------+--------------------+
# |                  Id| EventType| Timestamp|      Id|Level|MessageTemplate|          Properties|
# +--------------------+----------+----------+--------+-----+---------------+--------------------+
# |event-c20b9c7eac0...|3735091736|2019-03-19|event-c2|    2|          Test1|{CorrId=d69b7489,...|
# |event-d20b9c7eac0...|3735091737|2019-03-18|event-d2|    2|          Test1|{CorrId=f69b7489,...|
# |event-e20b9c7eac0...|3735091738|2019-03-17|event-e2|    1|          Test1|{CorrId=g69b7489,...|
# +--------------------+----------+----------+--------+-----+---------------+--------------------+
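Note that with this inferred schema the nested Properties entry may come back as a plain string rather than a map (see the show() output above), so repeating the explode on it directly may not work. A minimal alternative sketch, assuming the same file path, is to flatten the nested dicts in Python before calling toDF(); the Data_ / Data_Properties_ column prefixes are just an illustrative naming choice:

import json

def flatten(line):
    # Parse one JSON line and pull the nested dicts up to the top level
    obj = json.loads(line)
    data = obj.pop('Data', {})
    props = data.pop('Properties', {})
    obj.update({'Data_' + k: v for k, v in data.items()})
    obj.update({'Data_Properties_' + k: v for k, v in props.items()})
    return obj

flat_df = sc.textFile("demo_files/Test20191023.log").map(flatten).toDF()
flat_df.show()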

I was able to read the data with the following code.

from pyspark.sql.functions import *

# spark.read.json infers the nested schema, so Data and Data.Properties become struct columns
DF = spark.read.json("demo_files/Test20191023.log")

DF.select(col('Id'), col('EventType'), col('Timestamp'), col('Data.Id'), col('Data.Level'), col('Data.MessageTemplate'),
          col('Data.Properties.CorrId'), col('Data.Properties.ActionId'))\
  .show()

***Result*** 

+--------------------+----------+----------+--------+-----+---------------+--------+--------+
|                  Id| EventType| Timestamp|      Id|Level|MessageTemplate|  CorrId|ActionId|
+--------------------+----------+----------+--------+-----+---------------+--------+--------+
|event-c20b9c7eac0...|3735091736|2019-03-19|event-c2|    2|          Test1|d69b7489|d0e2c3fd|
|event-d20b9c7eac0...|3735091737|2019-03-18|event-d2|    2|          Test1|f69b7489|d0f2c3fd|
|event-e20b9c7eac0...|3735091738|2019-03-17|event-e2|    1|          Test1|g69b7489|d0d2c3fd|
+--------------------+----------+----------+--------+-----+---------------+--------+--------+
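As a follow-up, if you'd rather not list every nested field by hand, the struct columns produced by spark.read.json can be expanded with a star. This is a minimal sketch, with the top-level Id aliased to avoid the duplicate column name:

from pyspark.sql.functions import col

DF = spark.read.json("demo_files/Test20191023.log")
# First expand the Data struct, then the Properties struct nested inside it
flat = DF.select(col('Id').alias('EventId'), 'EventType', 'Timestamp', 'Data.*')
flat = flat.select('EventId', 'EventType', 'Timestamp', 'Id', 'Level',
                   'MessageTemplate', 'Properties.*')
flat.show()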