使用并发期货将多线程 Python 转换为多进程
Convert a multi-threaded Python to a multi-process one using concurrent futures
我有以下工作代码 (Python 3.5),它使用并发 futures 以线程方式解析文件,然后在结果返回时对结果进行一些 post 处理(以任何顺序)。
from concurrent import futures
with futures.ThreadPoolExecutor(max_workers=4) as executor:
# A dictionary which will contain a list the future info in the key, and the filename in the value
jobs = {}
# Loop through the files, and run the parse function for each file, sending the file-name to it, along with the kwargs of parser_variables.
# The results of the functions can come back in any order.
for this_file in files_list:
job = executor.submit(parse_log_file.parse, this_file, **parser_variables)
jobs[job] = this_file
# Get the completed jobs whenever they are done
for job in futures.as_completed(jobs):
debug.checkpointer("Multi-threaded Parsing File finishing")
# Send the result of the file the job is based on (jobs[job]) and the job (job.result)
result_content = job.result()
this_file = jobs[job]
我想将其转换为使用进程而不是线程,因为线程不提供任何加速。理论上我只需要将 ThreadPoolExecutor 更改为 ProcessPoolExecutor 即可。
问题是,如果我这样做,我会得到这个异常:
Process Process-2:
Traceback (most recent call last):
File "C:\Python35\lib\multiprocessing\process.py", line 254, in _bootstrap
self.run()
File "C:\Python35\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "C:\Python35\lib\concurrent\futures\process.py", line 169, in _process_worker
call_item = call_queue.get(block=True)
File "C:\Python35\lib\multiprocessing\queues.py", line 113, in get
return ForkingPickler.loads(res)
TypeError: Required argument 'fileno' (pos 1) not found
Traceback (most recent call last):
File "c:/myscript/main.py", line 89, in <module>
main()
File "c:/myscript/main.py", line 59, in main
system_counters = process_system(system, filename)
File "c:\myscript\per_system.py", line 208, in process_system
system_counters = process_filelist(**file_handling_variables)
File "c:\myscript\per_logfile.py", line 31, in process_filelist
results_list = job.result()
File "C:\Python35\lib\concurrent\futures\_base.py", line 398, in result
return self.__get_result()
File "C:\Python35\lib\concurrent\futures\_base.py", line 357, in __get_result
raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
我认为这可能与酸洗有关,但谷歌搜索没有发现任何错误。
如何将上述转换为使用多进程?
事实证明这是因为我在 parser_variables
中传递的东西之一是 class(来自 third-party 模块的 reader)。如果我删除 class,上面的工作正常。
无论出于何种原因,pickle 似乎无法处理这个特定对象。
我有以下工作代码 (Python 3.5),它使用并发 futures 以线程方式解析文件,然后在结果返回时对结果进行一些 post 处理(以任何顺序)。
from concurrent import futures
with futures.ThreadPoolExecutor(max_workers=4) as executor:
# A dictionary which will contain a list the future info in the key, and the filename in the value
jobs = {}
# Loop through the files, and run the parse function for each file, sending the file-name to it, along with the kwargs of parser_variables.
# The results of the functions can come back in any order.
for this_file in files_list:
job = executor.submit(parse_log_file.parse, this_file, **parser_variables)
jobs[job] = this_file
# Get the completed jobs whenever they are done
for job in futures.as_completed(jobs):
debug.checkpointer("Multi-threaded Parsing File finishing")
# Send the result of the file the job is based on (jobs[job]) and the job (job.result)
result_content = job.result()
this_file = jobs[job]
我想将其转换为使用进程而不是线程,因为线程不提供任何加速。理论上我只需要将 ThreadPoolExecutor 更改为 ProcessPoolExecutor 即可。 问题是,如果我这样做,我会得到这个异常:
Process Process-2:
Traceback (most recent call last):
File "C:\Python35\lib\multiprocessing\process.py", line 254, in _bootstrap
self.run()
File "C:\Python35\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "C:\Python35\lib\concurrent\futures\process.py", line 169, in _process_worker
call_item = call_queue.get(block=True)
File "C:\Python35\lib\multiprocessing\queues.py", line 113, in get
return ForkingPickler.loads(res)
TypeError: Required argument 'fileno' (pos 1) not found
Traceback (most recent call last):
File "c:/myscript/main.py", line 89, in <module>
main()
File "c:/myscript/main.py", line 59, in main
system_counters = process_system(system, filename)
File "c:\myscript\per_system.py", line 208, in process_system
system_counters = process_filelist(**file_handling_variables)
File "c:\myscript\per_logfile.py", line 31, in process_filelist
results_list = job.result()
File "C:\Python35\lib\concurrent\futures\_base.py", line 398, in result
return self.__get_result()
File "C:\Python35\lib\concurrent\futures\_base.py", line 357, in __get_result
raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
我认为这可能与酸洗有关,但谷歌搜索没有发现任何错误。
如何将上述转换为使用多进程?
事实证明这是因为我在 parser_variables
中传递的东西之一是 class(来自 third-party 模块的 reader)。如果我删除 class,上面的工作正常。
无论出于何种原因,pickle 似乎无法处理这个特定对象。