如何在 Python 中实现多处理?
How to implement multiprocessing in Python?
我想在 Python 中使用多处理来对独立列表进行排序。
例如,我有一个 int 字典作为键,一个列表作为值。
我试图实现一个简单的程序,但我很难再次将排序后的列表存储在 defaultdict 中,然后 return 将其存储到主模块中。
from multiprocessing import Process
def fun(id, user_data):
user_data.sort()
return user_data
# users_data is a defaultdict of id as key and a list as a value
if __name__ == '__main__':
for id,user_data in users_data.items():
P= Process(target=fun,args=(id,user_data))
P.start()
P.join()
您需要使用管理器在进程之间共享数据。
此外,正如@Tomerikoo 在评论中提到的那样,您现在执行此操作的方式实际上不会导致多处理,因为 P.join()
在 P.start()
之后脚本将暂停以让该进程完成,从而导致串行执行流程而不是并行。
你可以这样做:
from multiprocessing import Process, Manager
def sort_list(user_id, user_data, interprocess_dict):
user_data.sort()
interprocess_dict[user_id] = user_data
users_data = {}
users_data[1] = [5, 2, 1]
users_data[3] = [10, 12, 1]
def main():
interprocess_dict = Manager().dict()
processes = []
for user_id, user_data in users_data.items():
proc = Process(target=sort_list, args=(user_id, user_data, interprocess_dict,))
processes.append(proc)
proc.start()
for proc in processes:
proc.join()
for user_id, user_data in interprocess_dict.items():
print('{}: {}'.format(user_id, user_data))
if __name__ == '__main__':
main()
编辑:
最好将进程数量限制为可用的硬件数量 CPU,因为对列表进行排序是 100% CPU 绑定操作。
import multiprocessing as mp
def sort_list(user_id, user_data, interprocess_dict):
user_data.sort()
interprocess_dict[user_id] = user_data
def prepare_data():
users_data = {}
for i in range(1000):
users_data[i] = list(range(10000, 0, -1))
return users_data
def main():
# mp.set_start_method('spawn') # Only valid on OSX
interprocess_dict = mp.Manager().dict()
pool = mp.Pool(mp.cpu_count())
users_data = prepare_data()
for user_id, user_data in users_data.items():
pool.apply_async(sort_list, args = (user_id, user_data, interprocess_dict,))
pool.close()
pool.join()
for user_id, user_data in interprocess_dict.items():
print('{}: {}'.format(user_id, user_data))
if __name__ == '__main__':
main()
我想在 Python 中使用多处理来对独立列表进行排序。
例如,我有一个 int 字典作为键,一个列表作为值。
我试图实现一个简单的程序,但我很难再次将排序后的列表存储在 defaultdict 中,然后 return 将其存储到主模块中。
from multiprocessing import Process
def fun(id, user_data):
user_data.sort()
return user_data
# users_data is a defaultdict of id as key and a list as a value
if __name__ == '__main__':
for id,user_data in users_data.items():
P= Process(target=fun,args=(id,user_data))
P.start()
P.join()
您需要使用管理器在进程之间共享数据。
此外,正如@Tomerikoo 在评论中提到的那样,您现在执行此操作的方式实际上不会导致多处理,因为 P.join()
在 P.start()
之后脚本将暂停以让该进程完成,从而导致串行执行流程而不是并行。
你可以这样做:
from multiprocessing import Process, Manager
def sort_list(user_id, user_data, interprocess_dict):
user_data.sort()
interprocess_dict[user_id] = user_data
users_data = {}
users_data[1] = [5, 2, 1]
users_data[3] = [10, 12, 1]
def main():
interprocess_dict = Manager().dict()
processes = []
for user_id, user_data in users_data.items():
proc = Process(target=sort_list, args=(user_id, user_data, interprocess_dict,))
processes.append(proc)
proc.start()
for proc in processes:
proc.join()
for user_id, user_data in interprocess_dict.items():
print('{}: {}'.format(user_id, user_data))
if __name__ == '__main__':
main()
编辑:
最好将进程数量限制为可用的硬件数量 CPU,因为对列表进行排序是 100% CPU 绑定操作。
import multiprocessing as mp
def sort_list(user_id, user_data, interprocess_dict):
user_data.sort()
interprocess_dict[user_id] = user_data
def prepare_data():
users_data = {}
for i in range(1000):
users_data[i] = list(range(10000, 0, -1))
return users_data
def main():
# mp.set_start_method('spawn') # Only valid on OSX
interprocess_dict = mp.Manager().dict()
pool = mp.Pool(mp.cpu_count())
users_data = prepare_data()
for user_id, user_data in users_data.items():
pool.apply_async(sort_list, args = (user_id, user_data, interprocess_dict,))
pool.close()
pool.join()
for user_id, user_data in interprocess_dict.items():
print('{}: {}'.format(user_id, user_data))
if __name__ == '__main__':
main()