Best way to process a huge JSON file in parallel using Python
I have a huge JSON file of about 5 GB with 2 million records. I am trying to merge some of the records based on a condition. In the example below, I am trying to create a single record for each section and add all the student information as nested JSON. I also want to split the subjects field and convert it into an array. There are many more operations like this.
Original JSON:
[{"section": "abc-abc-abc", "student_id": "ss-23235", "subjects": "physics;maths;chemistry"},
{"section": "abc-abc-abc", "student_id": "ss-33237", "subjects": "physics;maths;chemistry"},
{"section": "xyz-xyz-xyz", "student_id": "ss-13632", "subjects": "physics;maths;chemistry"},
{"section": "xyz-xyz-xyz", "student_id": "ss-13265", "subjects": "physics;maths;chemistry"}]
I want to convert it into the form below:
[
{
"section":"abc-abc-abc",
"students":[
{
"student_id":"ss-23235",
"subjects":[
"physics",
"maths",
"chemistry"
]
},
{
"student_id":"ss-33237",
"subjects":[
"physics",
"maths",
"chemistry"
]
}
]
},
{
"section":"xyz-xyz-xyz",
"students":[
{
"student_id":"ss-13632",
"subjects":[
"physics",
"maths",
"chemistry"
]
},
{
"student_id":"ss-13265",
"subjects":[
"physics",
"maths",
"chemistry"
]
}
]
}
]
I tried loading the data into Spark, collecting the distinct sections into a list, and then starting the processing with Python's multiprocessing.pool as shown below.
import itertools
import json
from multiprocessing.pool import ThreadPool

pool = ThreadPool(8)

def process(section_part, student_df):
    # process the sections in section_part and collect the merged records
    processed_data_list = []
    for section_row in section_part:
        section_id = section_row['section']
        students = student_df.filter(student_df.section == section_id)
        updated_info = students.first().asDict()
        nested_stu_list = []
        for student in students.collect()[1:]:
            ind_info = student.asDict()
            # process each record and store the data in ind_info
            # ind_info["subjects"]: ["physics", "maths", "chemistry"]
            nested_stu_list.append(ind_info)
        updated_info["students"] = nested_stu_list
        processed_data_list.append(updated_info)
    return processed_data_list

uniq_section_list = student_df.select("section").distinct().collect()
# create a list of lists with 10000 sections each
section_parts = [uniq_section_list[i:i+10000] for i in range(0, len(uniq_section_list), 10000)]
# process each sublist, passing the DataFrame along with the section chunk
result_lists = pool.map(lambda part: process(part, student_df), section_parts)
# merge all the result lists into one bigger list
final_datalist = list(itertools.chain.from_iterable(result_lists))
# save as a new json file
with open('result.json', 'w') as fout:
    json.dump(final_datalist, fout)
I am running this on a machine with 16 GB RAM and an 8-core CPU. For a sample of 200,000 records it takes more than 12 hours. What is the best way to do this faster? I am open to using any library.
You can use Spark itself to process and aggregate the JSON, which avoids the per-section filter() and collect() round-trips of your current approach (each of which scans the whole DataFrame):
import pyspark.sql.functions as F
result = df.groupBy('section').agg(
F.collect_list(
F.struct(
'student_id',
F.split('subjects', ';').alias('subjects')
)
).alias('students')
)
result.show(truncate=False)
+-----------+----------------------------------------------------------------------------------+
|section |students |
+-----------+----------------------------------------------------------------------------------+
|xyz-xyz-xyz|[[ss-13632, [physics, maths, chemistry]], [ss-13265, [physics, maths, chemistry]]]|
|abc-abc-abc|[[ss-23235, [physics, maths, chemistry]], [ss-33237, [physics, maths, chemistry]]]|
+-----------+----------------------------------------------------------------------------------+
result.coalesce(1).write.json('result')
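The snippet above assumes df has already been loaded. A minimal sketch of reading the source file, assuming it is one big 5 GB JSON array (hence multiLine=True; the path input.json is a placeholder):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('merge-students').getOrCreate()

# multiLine=True lets Spark parse a file that is a single JSON array;
# for newline-delimited JSON (one record per line) it can be dropped.
df = spark.read.json('input.json', multiLine=True)

Also note that write.json produces a directory of newline-delimited JSON part files (a single part file with coalesce(1)), rather than one indented JSON array like the example output above.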