将 PySpark DataFrame 序列化为 JSON 数组
Serialize PySpark DataFrame as JSON array
我在 Azure Databricks 中使用 PySpark 获得了 DataFrame。我需要将它序列化为 JSON 到一个或多个文件中。这些文件最终将上传到 Cosmos,因此 JSON 格式正确至关重要。
我知道如何直接连接到 Cosmos 以直接序列化数据,但我需要创建 JSON 文件以便稍后上传到 Cosmos。
我无法从我的实际 DataFrame 中提供数据,但结构很复杂。每行都有嵌入式对象,其中一些有自己的嵌入式对象和对象数组。
我认为问题在于我如何尝试序列化数据,而不是我如何转换它。我创建了这个简单的 DataFrame df
,我认为它足以作为示例。
+---------+-------------+
|property1| array1|
+---------+-------------+
| value1|["a","b","c"]|
| value2|["x","y","z"]|
+---------+-------------+
我像这样将它序列化到 Azure Data Lake Storage Gen2。
df.coalesce(1).write.json(outpath, lineSep=",")
文件将包含此 JSON。这些行不是数组中的元素,最后一行有尾随逗号,因此 JSON 不会与 Cosmos 合作。
{"property1":"value1","array1":["a","b","c"]},
{"property1":"value2","array1":["x","y","z"]},
这 JSON 按预期上传。
[{"property1":"value1","array1":["a","b","c"]},
{"property1":"value2","array1":["x","y","z"]}]
我已经成功上传了单个 JSON 对象(即没有 []
包围它们)所以任何将每个 DataFrame 行写入其自己的 JSON 文件的解决方案都是潜在的赢家。
我已经 试过了,但总是有包含多行的文件。
我想到了两个方法。
首先使用 df.toJSON().collect()
创建一个 JSON 字符串行的列表,将数组切片成批,然后构建一个 JSON 数组字符串。
def batchWriteDataFrame(dataframe):
rows = dataframe.toJSON().collect()
batches = [rows[i * batch_size:(i + 1) * batch_size] for i in range((len(rows) + batch_size - 1) // batch_size)] # slice the rows into batches
batch_num = 1
for batch in batches:
dbutils.fs.put(outpath + "batch/" + str(batch_num) + ".json", "[" + ",".join([row for row in batch]) + "]")
batch_num += 1
第二个将每一行写入其自己的文件。
def writeDataFrameRows(dataframe):
i = 0
for row in dataframe.toJSON().collect():
dbutils.fs.put(outpath + "single/" + str(i) + ".json", row)
i += 1
我在 Azure Databricks 中使用 PySpark 获得了 DataFrame。我需要将它序列化为 JSON 到一个或多个文件中。这些文件最终将上传到 Cosmos,因此 JSON 格式正确至关重要。
我知道如何直接连接到 Cosmos 以直接序列化数据,但我需要创建 JSON 文件以便稍后上传到 Cosmos。
我无法从我的实际 DataFrame 中提供数据,但结构很复杂。每行都有嵌入式对象,其中一些有自己的嵌入式对象和对象数组。
我认为问题在于我如何尝试序列化数据,而不是我如何转换它。我创建了这个简单的 DataFrame df
,我认为它足以作为示例。
+---------+-------------+
|property1| array1|
+---------+-------------+
| value1|["a","b","c"]|
| value2|["x","y","z"]|
+---------+-------------+
我像这样将它序列化到 Azure Data Lake Storage Gen2。
df.coalesce(1).write.json(outpath, lineSep=",")
文件将包含此 JSON。这些行不是数组中的元素,最后一行有尾随逗号,因此 JSON 不会与 Cosmos 合作。
{"property1":"value1","array1":["a","b","c"]},
{"property1":"value2","array1":["x","y","z"]},
这 JSON 按预期上传。
[{"property1":"value1","array1":["a","b","c"]},
{"property1":"value2","array1":["x","y","z"]}]
我已经成功上传了单个 JSON 对象(即没有 []
包围它们)所以任何将每个 DataFrame 行写入其自己的 JSON 文件的解决方案都是潜在的赢家。
我已经
我想到了两个方法。
首先使用 df.toJSON().collect()
创建一个 JSON 字符串行的列表,将数组切片成批,然后构建一个 JSON 数组字符串。
def batchWriteDataFrame(dataframe):
rows = dataframe.toJSON().collect()
batches = [rows[i * batch_size:(i + 1) * batch_size] for i in range((len(rows) + batch_size - 1) // batch_size)] # slice the rows into batches
batch_num = 1
for batch in batches:
dbutils.fs.put(outpath + "batch/" + str(batch_num) + ".json", "[" + ",".join([row for row in batch]) + "]")
batch_num += 1
第二个将每一行写入其自己的文件。
def writeDataFrameRows(dataframe):
i = 0
for row in dataframe.toJSON().collect():
dbutils.fs.put(outpath + "single/" + str(i) + ".json", row)
i += 1