Python:我如何定义一个全局变量,可从命令行参数的多处理池访问?
Python: How do I define a global variable accessible by a multiprocessing pool from command line arguments?
我有一个从大型数据集生成文件的脚本,所以我使用多处理来加快处理速度。我遇到的问题是我的脚本使用 argparse
库接受多个命令行参数,这会改变结果,我正在努力将命令行参数传递给我的多处理池调用的函数。
我确定解决此问题的方法非常简单,只是我没有看到。我想我会创建一个全局变量来更新以反映命令行参数,但池调用的我的函数仍然具有旧值。我试图在下面说明我的问题:
output_dir = 'default'
def do_task(item):
print(output_dir) # Prints 'default'
result = process_item(item)
write_to_file(data=result, location=os.path.join(output_dir, item.name))
def do_multi_threaded_work(data_path):
print(output_dir) # Prints command line argument
data = read_from_file(args.input_file)
pool = multiprocessing.Pool()
for i, _ in enumerate(pool.imap_unordered(do_task, data):
print('Completed task %d/%d' % (i, len(data)))
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-o', '--output-dir')
parser.add_argument('-i', '--input-file')
args = parser.parse_args()
output_dir = args.output_dir
do_multithreaded_work(args.input_file)
如何确保根据命令行参数将文件保存到正确的目录?
编辑:有人建议我做类似下面代码的事情,但是考虑到我有很多常量(在这个例子中我将其简化为 1)实际代码这看起来非常混乱且违反直觉。真的没有更好的方法来设置一个可由 do_task
函数访问的全局常量,而不对值进行硬编码吗?
from itertools import repeat
...
def do_multi_threaded_work(data_path):
...
for i, _ in enumerate(pool.imap_unordered(do_task, zip(data, repeat(output_dir))):
如果我以正确的方式理解了你的问题,你可以执行以下操作以使用主要数据向你的函数发送额外的参数:
# my toy example:
import multiprocessing as mp
def do_job(x) -> int:
# x[0] - is a real data # x[1], x[2] imagine the are parameters to tune fuction
return x[0]**2 + x[1] + x[2]
if __name__ == '__main__':
jobs = [1, 2, 3, 4, 5, 6, 7, 8] # number 0 argument - data you want to process
# rules to work with jobs - tune parameters
number_one_arg = 100
number_two_arg = 2000
# create structure to accompany data with tune parameters
x_for_do_job = [(i, number_one_arg, number_two_arg,) for i in jobs]
print(x_for_do_job) # show what we have now
pool_ = mp.Pool(4)
results = pool_.map(do_job, x_for_do_job)
print(results)
最终找到了涉及使用 functools
库的 partial
功能的解决方案。这使我能够通过创建具有指定参数的部分函数来指定任何常量参数。然后,我将该部分函数与可迭代对象一起传递给池。
from functools import partial
def do_task(output_dir, item):
print(output_dir) # Prints 'default'
result = process_item(item)
write_to_file(data=result, location=os.path.join(output_dir, item.name))
def do_multi_threaded_work(data_path):
print(output_dir) # Prints command line argument
data = read_from_file(args.input_file)
func = partial(do_task, output_dir)
pool = multiprocessing.Pool()
for i, _ in enumerate(pool.imap_unordered(func, data):
print('Completed task %d/%d' % (i, len(data)))
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-o', '--output-dir')
parser.add_argument('-i', '--input-file')
args = parser.parse_args()
output_dir = args.output_dir
do_multithreaded_work(args.input_file)
我有一个从大型数据集生成文件的脚本,所以我使用多处理来加快处理速度。我遇到的问题是我的脚本使用 argparse
库接受多个命令行参数,这会改变结果,我正在努力将命令行参数传递给我的多处理池调用的函数。
我确定解决此问题的方法非常简单,只是我没有看到。我想我会创建一个全局变量来更新以反映命令行参数,但池调用的我的函数仍然具有旧值。我试图在下面说明我的问题:
output_dir = 'default'
def do_task(item):
print(output_dir) # Prints 'default'
result = process_item(item)
write_to_file(data=result, location=os.path.join(output_dir, item.name))
def do_multi_threaded_work(data_path):
print(output_dir) # Prints command line argument
data = read_from_file(args.input_file)
pool = multiprocessing.Pool()
for i, _ in enumerate(pool.imap_unordered(do_task, data):
print('Completed task %d/%d' % (i, len(data)))
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-o', '--output-dir')
parser.add_argument('-i', '--input-file')
args = parser.parse_args()
output_dir = args.output_dir
do_multithreaded_work(args.input_file)
如何确保根据命令行参数将文件保存到正确的目录?
编辑:有人建议我做类似下面代码的事情,但是考虑到我有很多常量(在这个例子中我将其简化为 1)实际代码这看起来非常混乱且违反直觉。真的没有更好的方法来设置一个可由 do_task
函数访问的全局常量,而不对值进行硬编码吗?
from itertools import repeat
...
def do_multi_threaded_work(data_path):
...
for i, _ in enumerate(pool.imap_unordered(do_task, zip(data, repeat(output_dir))):
如果我以正确的方式理解了你的问题,你可以执行以下操作以使用主要数据向你的函数发送额外的参数:
# my toy example:
import multiprocessing as mp
def do_job(x) -> int:
# x[0] - is a real data # x[1], x[2] imagine the are parameters to tune fuction
return x[0]**2 + x[1] + x[2]
if __name__ == '__main__':
jobs = [1, 2, 3, 4, 5, 6, 7, 8] # number 0 argument - data you want to process
# rules to work with jobs - tune parameters
number_one_arg = 100
number_two_arg = 2000
# create structure to accompany data with tune parameters
x_for_do_job = [(i, number_one_arg, number_two_arg,) for i in jobs]
print(x_for_do_job) # show what we have now
pool_ = mp.Pool(4)
results = pool_.map(do_job, x_for_do_job)
print(results)
最终找到了涉及使用 functools
库的 partial
功能的解决方案。这使我能够通过创建具有指定参数的部分函数来指定任何常量参数。然后,我将该部分函数与可迭代对象一起传递给池。
from functools import partial
def do_task(output_dir, item):
print(output_dir) # Prints 'default'
result = process_item(item)
write_to_file(data=result, location=os.path.join(output_dir, item.name))
def do_multi_threaded_work(data_path):
print(output_dir) # Prints command line argument
data = read_from_file(args.input_file)
func = partial(do_task, output_dir)
pool = multiprocessing.Pool()
for i, _ in enumerate(pool.imap_unordered(func, data):
print('Completed task %d/%d' % (i, len(data)))
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-o', '--output-dir')
parser.add_argument('-i', '--input-file')
args = parser.parse_args()
output_dir = args.output_dir
do_multithreaded_work(args.input_file)