multiprocessing 在做任何工作之前退出
multiprocessing exits before doing any job
我继承了某个解析器,该解析器应该解析 10 个文件,每个文件大约有 400 万行。
代码写在Python2,我更新了。
有一个多处理逻辑,我似乎无法开始工作。
from multiprocessing.pool import ThreadPool
import glob
DATADIR = 'home/my_dir/where/all/my/files/are'
def process_file(filepath):
# read line by line, parse and insert to postgres database.
def process_directory(dirpath):
pattern = f'{dirpath}/*dat' # files have .dat extension.
tp = ThreadPool(10)
for filepath in glob.glob(pattern):
print(filepath)
tp.apply_async(process_file, filepath)
tp.close()
tp.join()
if __name__ == '__main__':
process_directory(DATADIR)
我已经查看了很多文档和一些类似的问题,但它似乎不起作用。
使用解析器代码时,我确实在控制台上打印了我需要解析的文件的所有路径,但仅此而已,程序不会执行任何其他操作。
问题在于您的调用方式 apply_async
。我对你的问题做了一个简单的重现器,但稍作调整以获得每次调用的结果:
from multiprocessing.pool import ThreadPool
def func(f):
print("hey " + f)
return f + "1"
l = ["name", "name2", "name3"]
pool = ThreadPool(3)
out = []
for a in l:
print(a)
out.append(pool.apply_async(func, a))
# Check the response from each `apply_async` call
for a in out:
a.get()
pool.close()
pool.join()
这returns一个错误:
Traceback (most recent call last):
File "a.py", line 16, in <module>
a.get()
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 599, in get
raise self._value
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
TypeError: func() takes 1 positional argument but 4 were given
它认为您传递的是四个位置参数,而不是一个。这是因为 apply_async
希望所有的参数都在一个元组中传递,像这样:
pool.apply_async(func, (a,))
如果您在调用 apply_async
时将 filepath
放入元组中,我认为您会得到预期的行为。
还值得注意的是,您的用例非常适合使用 pool.map
而不是 apply_async
,后者更简洁:
pool.map(process_file, glob.glob(pattern))
我继承了某个解析器,该解析器应该解析 10 个文件,每个文件大约有 400 万行。
代码写在Python2,我更新了。
有一个多处理逻辑,我似乎无法开始工作。
from multiprocessing.pool import ThreadPool
import glob
DATADIR = 'home/my_dir/where/all/my/files/are'
def process_file(filepath):
# read line by line, parse and insert to postgres database.
def process_directory(dirpath):
pattern = f'{dirpath}/*dat' # files have .dat extension.
tp = ThreadPool(10)
for filepath in glob.glob(pattern):
print(filepath)
tp.apply_async(process_file, filepath)
tp.close()
tp.join()
if __name__ == '__main__':
process_directory(DATADIR)
我已经查看了很多文档和一些类似的问题,但它似乎不起作用。
使用解析器代码时,我确实在控制台上打印了我需要解析的文件的所有路径,但仅此而已,程序不会执行任何其他操作。
问题在于您的调用方式 apply_async
。我对你的问题做了一个简单的重现器,但稍作调整以获得每次调用的结果:
from multiprocessing.pool import ThreadPool
def func(f):
print("hey " + f)
return f + "1"
l = ["name", "name2", "name3"]
pool = ThreadPool(3)
out = []
for a in l:
print(a)
out.append(pool.apply_async(func, a))
# Check the response from each `apply_async` call
for a in out:
a.get()
pool.close()
pool.join()
这returns一个错误:
Traceback (most recent call last):
File "a.py", line 16, in <module>
a.get()
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 599, in get
raise self._value
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
TypeError: func() takes 1 positional argument but 4 were given
它认为您传递的是四个位置参数,而不是一个。这是因为 apply_async
希望所有的参数都在一个元组中传递,像这样:
pool.apply_async(func, (a,))
如果您在调用 apply_async
时将 filepath
放入元组中,我认为您会得到预期的行为。
还值得注意的是,您的用例非常适合使用 pool.map
而不是 apply_async
,后者更简洁:
pool.map(process_file, glob.glob(pattern))