Python 使用地图进行多处理
Python multiprocessing using map
#!/usr/bin/env python
import multiprocessing
def tempx((the_time)):
return int(the_time)*int(the_time)
def tempy((the_time, foobar)):
return int(the_time)/(float(foobar))+100
def mp_handler():
p = multiprocessing.Pool(2)
foo = p.map(tempx, [('2')])
print foo
foo = p.map(tempy, [('100', '100000')])
print foo
if __name__ == '__main__':
mp_handler()
我有两种输入参数不同的方法。第一种方法只有:the_time
,第二种方法有the_time and foobar
我需要按特定顺序排列的结果,因此我使用了 map 函数。但是,上面的代码根本没有使用多进程模块,因为我使用了两个 map
函数。我对吗?
最终目标是同时拥有两种方法运行。
我在这里错过了什么?
达诺,这是我正在做的一个例子
import multiprocessing
def print_timing(func):
def wrapper(*arg):
t1 = time.time()
res = func(*arg)
t2 = time.time()
print '%s took %0.3f ms' % (func.func_name, (t2-t1)*1000.0)
return res
return wrapper
@print_timing
def case_one(power_pred, power_core, num_thrs, possible_frequency, clamp_range):
ds1_cur_freq = list()
ds1_freq_index = list()
ds1_cur_clamp = list()
return ds1_cur_freq, ds1_freq_index, ds1_cur_clamp
@print_timing
def case_two(cpower_pred, power_core, num_thrs, possible_frequency, TT_index, DT_index, clamp_range):
ds2_cur_freq = list()
ds2_freq_index = list()
ds2_cur_clamp = list()
return ds2_cur_freq, ds2_freq_index, ds2_cur_clamp
def defs_finder():
cpower_pred = list()
power_pred = list()
power_core = list()
num_thrs = 3
possible_frequency = list()
clamp_range= list()
DT_index =1
TT_index = 0
p = multiprocessing.Pool(2)
#Case 1: DS1
# ds1_cur_freq, ds1_freq_index, ds1_cur_clamp =
ds1 = p.apply_async(case_one, args=(power_pred, power_core, num_thrs, possible_frequency))
#Case 1: DS1
# ds1_cur_freq, ds1_freq_index, ds1_cur_clamp = case_one(power_pred, power_core, num_thrs, possible_frequency, clamp_range)
#Case 2: DS2
# ds2_cur_freq, ds2_freq_index, ds2_cur_clamp = case_two(cpower_pred, power_core, num_thrs, possible_frequency, TT_index, DT_index, clamp_range)
ds2 = p.apply_async(case_two, args=(cpower_pred, power_core, num_thrs, possible_frequency, TT_index, DT_index))
print ds1
print ds2
print ds1.get()
print ds2.get()
# ds1_cur_freq = ds1.get()[0]
# ds1_freq_index = ds1.get()[1]
# ds1_cur_clamp = ds1.get()[2]
# ds2_cur_freq = ds2.get()[0]
# ds2_freq_index = ds2.get()[1]
# ds2_cur_clamp = ds2.get()[2]
defs_finder()
这就是它现在的实现方式和重现错误的方式
不,p.map
会很好地分配您的计算:它将您传递给它的函数的计算与您传递给它的参数并行化。但是,在您的代码中,p.map
对 tempx
和 tempy
的应用不是并行的,如果这是您想要的。
但是,与普通的 map
或列表推导式 ([function(x) for x in your list]
) 不同,p.map
是异步的,不会 return 您按特定顺序获得结果.之后您需要根据某个键对结果进行排序。
Pool.map
如果您需要 运行 并行地对可迭代对象的所有元素执行特定函数,并阻塞直到处理完整个可迭代对象,则此方法很有用。在你的例子中,你只是在迭代中传递一个项目,所以你只是在子进程中 运行ning 一个函数,并阻塞直到它完成。这比仅 运行 在父进程中设置函数要慢,因为您增加了 IPC 的开销。
如果您的目标是 运行 tempx
和 tempy
与一组参数并行,Pool.apply_async
是更好的选择:
import multiprocessing
def tempx(the_time):
return int(the_time)*int(the_time)
def tempy(the_time, foobar):
return int(the_time)/(float(foobar))+100
def mp_handler():
p = multiprocessing.Pool(2)
foox = p.apply_async(tempx, args=('2',))
fooy = p.apply_async(tempy, args=('100', '100000'))
print foox.get()
print fooy.get()
if __name__ == '__main__':
mp_handler()
apply_async
是非阻塞的;它立即 returns 一个 AsyncResult
对象,稍后您可以通过调用 AsyncResult.get
使用它来实际获取异步操作的结果。因此,我们只需在两个函数上调用 apply_async
以在后台启动它们,然后在每个 AsyncResult
上调用 get()
以等待它们完成。
这里要注意的另一件事是:在您的示例中,您在子进程中所做的工作非常轻松 - 完成任一函数都不会花费很长时间。相比之下,生成后台进程并通过 IPC 将您的函数及其参数传递给这些后台进程,然后将结果发回的成本很高。您可能会发现使用 multiprocessing
比在父进程中按顺序执行这些函数要慢。为了使 multiprocessing
值得使用,您需要在 tempx
和 tempy
.
中进行更昂贵的计算
#!/usr/bin/env python
import multiprocessing
def tempx((the_time)):
return int(the_time)*int(the_time)
def tempy((the_time, foobar)):
return int(the_time)/(float(foobar))+100
def mp_handler():
p = multiprocessing.Pool(2)
foo = p.map(tempx, [('2')])
print foo
foo = p.map(tempy, [('100', '100000')])
print foo
if __name__ == '__main__':
mp_handler()
我有两种输入参数不同的方法。第一种方法只有:the_time
,第二种方法有the_time and foobar
我需要按特定顺序排列的结果,因此我使用了 map 函数。但是,上面的代码根本没有使用多进程模块,因为我使用了两个 map
函数。我对吗?
最终目标是同时拥有两种方法运行。
我在这里错过了什么?
达诺,这是我正在做的一个例子
import multiprocessing
def print_timing(func):
def wrapper(*arg):
t1 = time.time()
res = func(*arg)
t2 = time.time()
print '%s took %0.3f ms' % (func.func_name, (t2-t1)*1000.0)
return res
return wrapper
@print_timing
def case_one(power_pred, power_core, num_thrs, possible_frequency, clamp_range):
ds1_cur_freq = list()
ds1_freq_index = list()
ds1_cur_clamp = list()
return ds1_cur_freq, ds1_freq_index, ds1_cur_clamp
@print_timing
def case_two(cpower_pred, power_core, num_thrs, possible_frequency, TT_index, DT_index, clamp_range):
ds2_cur_freq = list()
ds2_freq_index = list()
ds2_cur_clamp = list()
return ds2_cur_freq, ds2_freq_index, ds2_cur_clamp
def defs_finder():
cpower_pred = list()
power_pred = list()
power_core = list()
num_thrs = 3
possible_frequency = list()
clamp_range= list()
DT_index =1
TT_index = 0
p = multiprocessing.Pool(2)
#Case 1: DS1
# ds1_cur_freq, ds1_freq_index, ds1_cur_clamp =
ds1 = p.apply_async(case_one, args=(power_pred, power_core, num_thrs, possible_frequency))
#Case 1: DS1
# ds1_cur_freq, ds1_freq_index, ds1_cur_clamp = case_one(power_pred, power_core, num_thrs, possible_frequency, clamp_range)
#Case 2: DS2
# ds2_cur_freq, ds2_freq_index, ds2_cur_clamp = case_two(cpower_pred, power_core, num_thrs, possible_frequency, TT_index, DT_index, clamp_range)
ds2 = p.apply_async(case_two, args=(cpower_pred, power_core, num_thrs, possible_frequency, TT_index, DT_index))
print ds1
print ds2
print ds1.get()
print ds2.get()
# ds1_cur_freq = ds1.get()[0]
# ds1_freq_index = ds1.get()[1]
# ds1_cur_clamp = ds1.get()[2]
# ds2_cur_freq = ds2.get()[0]
# ds2_freq_index = ds2.get()[1]
# ds2_cur_clamp = ds2.get()[2]
defs_finder()
这就是它现在的实现方式和重现错误的方式
不,p.map
会很好地分配您的计算:它将您传递给它的函数的计算与您传递给它的参数并行化。但是,在您的代码中,p.map
对 tempx
和 tempy
的应用不是并行的,如果这是您想要的。
但是,与普通的 map
或列表推导式 ([function(x) for x in your list]
) 不同,p.map
是异步的,不会 return 您按特定顺序获得结果.之后您需要根据某个键对结果进行排序。
Pool.map
如果您需要 运行 并行地对可迭代对象的所有元素执行特定函数,并阻塞直到处理完整个可迭代对象,则此方法很有用。在你的例子中,你只是在迭代中传递一个项目,所以你只是在子进程中 运行ning 一个函数,并阻塞直到它完成。这比仅 运行 在父进程中设置函数要慢,因为您增加了 IPC 的开销。
如果您的目标是 运行 tempx
和 tempy
与一组参数并行,Pool.apply_async
是更好的选择:
import multiprocessing
def tempx(the_time):
return int(the_time)*int(the_time)
def tempy(the_time, foobar):
return int(the_time)/(float(foobar))+100
def mp_handler():
p = multiprocessing.Pool(2)
foox = p.apply_async(tempx, args=('2',))
fooy = p.apply_async(tempy, args=('100', '100000'))
print foox.get()
print fooy.get()
if __name__ == '__main__':
mp_handler()
apply_async
是非阻塞的;它立即 returns 一个 AsyncResult
对象,稍后您可以通过调用 AsyncResult.get
使用它来实际获取异步操作的结果。因此,我们只需在两个函数上调用 apply_async
以在后台启动它们,然后在每个 AsyncResult
上调用 get()
以等待它们完成。
这里要注意的另一件事是:在您的示例中,您在子进程中所做的工作非常轻松 - 完成任一函数都不会花费很长时间。相比之下,生成后台进程并通过 IPC 将您的函数及其参数传递给这些后台进程,然后将结果发回的成本很高。您可能会发现使用 multiprocessing
比在父进程中按顺序执行这些函数要慢。为了使 multiprocessing
值得使用,您需要在 tempx
和 tempy
.