如何将 JSON 对象数组解析为 Spark Dataframe?

How do you parse an array of JSON objects into a Spark Dataframe?

我有一个包含 Twitter 数据的 JSON 文件集合,我想将它们用作 Databricks/Spark 中结构化流式传输的数据源。 JSON 文件具有以下结构:

[{...tweet data...},{...tweet data...},{...tweet data...},...]

我的 PySpark 代码:

# Stream from the /tmp/tweets folder
tweetstore = "/tmp/tweets/"

# Set up the folder as a streaming source
streamingInputDF = (
  spark \
    .readStream \
    .schema(json_schema) \
    .json(tweetstore)
)

# Check
streamingInputDF.isStreaming

# Access the DF using SQL
streamingQuery = streamingInputDF \
  .select("run_stamp", "user", "id", "source", "favorite_count", "retweet_count")\
  .writeStream \
  .format("memory") \
  .queryName("tweetstream") \
  .outputMode("append")\
  .start()

streamingDF = spark.sql("select * from tweetstream order by 1 desc")

我的输出如下所示:

Number of entries in dataframe: 3875046
+---------+----+----+------+--------------+-------------+
|run_stamp|user|id  |source|favorite_count|retweet_count|
+---------+----+----+------+--------------+-------------+
|null     |null|null|null  |null          |null         |
|null     |null|null|null  |null          |null         |
|null     |null|null|null  |null          |null         |

据我所知,我可能需要使用 UDFexplode() 来正确解析 JSON 数组,但到目前为止还没有完全弄清楚如何。

它在示例数据上对我来说效果很好-

 val data = """[{"id":1,"name":"abc1"},{"id":2,"name":"abc2"},{"id":3,"name":"abc3"}]"""
    val df = spark.read.json(Seq(data).toDS())
    df.show(false)
    df.printSchema()

    /**
      * +---+----+
      * |id |name|
      * +---+----+
      * |1  |abc1|
      * |2  |abc2|
      * |3  |abc3|
      * +---+----+
      *
      * root
      * |-- id: long (nullable = true)
      * |-- name: string (nullable = true)
      */

为可能偶然发现此问题的其他人记录答案:我意识到 JSON 没有像 Spark 期望的那样每行一个对象。然后,关键是添加 .option("multiline", True),即:

streamingInputDF = (
  spark \
    .readStream \
    .option("multiline", True) \
    .schema(json_schema) \
    .json(tweetstore)
)