Best way to process the huge JSON in parallel using Python

I have a huge JSON file, about 5 GB with 2 million records. I'm trying to merge some records based on a condition. In the example below, I try to create a single record per section and add all the student information as nested JSON. I also want to split the subjects field and turn it into an array. There are many more operations like this.

Original JSON:

[{"section": "abc-abc-abc", "student_id": "ss-23235", "subjects": "physics;maths;chemistry"},
{"section": "abc-abc-abc", "student_id": "ss-33237", "subjects": "physics;maths;chemistry"},
{"section": "xyz-xyz-xyz", "student_id": "ss-13632", "subjects": "physics;maths;chemistry"},
{"section": "xyz-xyz-xyz", "student_id": "ss-13265", "subjects": "physics;maths;chemistry"}]

I want to convert it into the following form:

[
   {
      "section":"abc-abc-abc",
      "students":[
         {
            "student_id":"ss-23235",
            "subjects":[
               "physics",
               "maths",
               "chemistry"
            ]
         },
         {
            "student_id":"ss-33237",
            "subjects":[
               "physics",
               "maths",
               "chemistry"
            ]
         }
      ]
   },
   {
      "section":"xyz-xyz-xyz",
      "students":[
         {
            "student_id":"ss-13632",
            "subjects":[
               "physics",
               "maths",
               "chemistry"
            ]
         },
         {
            "student_id":"ss-13265",
            "subjects":[
               "physics",
               "maths",
               "chemistry"
            ]
         }
      ]
   }
]
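For reference, the transformation itself (independent of any parallelism) can be expressed in pure Python with a dictionary keyed by section. This is a minimal sketch using the sample records above:

```python
import json
from collections import defaultdict

records = [
    {"section": "abc-abc-abc", "student_id": "ss-23235", "subjects": "physics;maths;chemistry"},
    {"section": "abc-abc-abc", "student_id": "ss-33237", "subjects": "physics;maths;chemistry"},
    {"section": "xyz-xyz-xyz", "student_id": "ss-13632", "subjects": "physics;maths;chemistry"},
    {"section": "xyz-xyz-xyz", "student_id": "ss-13265", "subjects": "physics;maths;chemistry"},
]

# group students under their section and split the subjects string into a list
by_section = defaultdict(list)
for rec in records:
    by_section[rec["section"]].append({
        "student_id": rec["student_id"],
        "subjects": rec["subjects"].split(";"),
    })

result = [{"section": s, "students": studs} for s, studs in by_section.items()]
```

This produces the nested structure shown above; at 2 million records a single dictionary pass like this is O(n), whereas filtering the full DataFrame once per section is what makes the approach below so slow.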

I tried loading the data in Spark, collecting the distinct sections into a list, and then processing them with Python's multiprocessing.pool as follows.

import itertools
import json
from multiprocessing.pool import ThreadPool

pool = ThreadPool(8)

def process(section_part):
    # process each section in section_part and store the merged records in a list
    processed_data_list = []
    for section_id in section_part:
        students = student_df.filter(student_df.section == section_id)
        updated_info = students.first().asDict()
        nested_stu_list = []
        for student in students.collect()[1:]:
            ind_info = student.asDict()
            # process each record and store the data in ind_info
            # ind_info["subjects"]: ["physics", "maths", "chemistry"]
            nested_stu_list.append(ind_info)
        updated_info["students"] = nested_stu_list
        processed_data_list.append(updated_info)
    return processed_data_list

uniq_section_list = student_df.select("section").distinct().collect()

# split the sections into chunks of 10000
section_parts = [uniq_section_list[i:i+10000] for i in range(0, len(uniq_section_list), 10000)]

# process each chunk in a worker thread
result_lists = pool.map(process, section_parts)

# merge all the result lists into one bigger list
final_datalist = list(itertools.chain.from_iterable(result_lists))

# save as a new json file
with open('result.json', 'w') as fout:
    json.dump(final_datalist, fout)

I'm running this on a machine with 16 GB RAM and an 8-core CPU. For a sample of 200,000 records it takes more than 12 hours. What is the best way to achieve this faster? I'm open to using any library.

You can use Spark itself to process and aggregate the JSON:

import pyspark.sql.functions as F

result = df.groupBy('section').agg(
    F.collect_list(
        F.struct(
            'student_id', 
            F.split('subjects', ';').alias('subjects')
        )
    ).alias('students')
)

result.show(truncate=False)
+-----------+----------------------------------------------------------------------------------+
|section    |students                                                                          |
+-----------+----------------------------------------------------------------------------------+
|xyz-xyz-xyz|[[ss-13632, [physics, maths, chemistry]], [ss-13265, [physics, maths, chemistry]]]|
|abc-abc-abc|[[ss-23235, [physics, maths, chemistry]], [ss-33237, [physics, maths, chemistry]]]|
+-----------+----------------------------------------------------------------------------------+

result.coalesce(1).write.json('result')
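One caveat: `write.json` produces JSON Lines output (one JSON object per line inside the part file), not a single JSON array. If the array form shown in the question is required, the part file can be converted with a small post-processing step; a sketch, where `jsonl_to_array` and the paths are illustrative names, not part of any API:

```python
import json

def jsonl_to_array(src_path, dst_path):
    # read a JSON Lines file: one JSON object per non-empty line
    with open(src_path) as fin:
        records = [json.loads(line) for line in fin if line.strip()]
    # rewrite the records as a single JSON array
    with open(dst_path, 'w') as fout:
        json.dump(records, fout)

# e.g. jsonl_to_array('result/part-00000.json', 'result_array.json'),
# where the part file name is whatever Spark generated in the 'result' directory
```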