Run function with positional and optional arguments in parallel in python (follow up)
This is a follow-up to the question: Python: How can I run python functions in parallel?
Minimal working example:
'''
Created on 06.05.2015
'''
from multiprocessing import Process
import time

def runInParallel(*fns):
    proc = []
    for fn in fns:
        p = Process(target=fn)
        p.start()
        proc.append(p)
    for p in proc:
        p.join()

def func1():
    s = time.time()
    print 'func1: starting', s
    for i in xrange(1000000000):
        if i == i:
            pass
    e = time.time()
    print 'func1: finishing', e
    print 'duration', e-s

if __name__ == '__main__':
    s = time.time()
    runInParallel(func1, func1, func1, func1, func1)
    print time.time()-s
This produces the following output (exactly what I want):
func1: starting 1430920678.09
func1: starting 1430920678.53
func1: starting 1430920679.02
func1: starting 1430920679.57
func1: starting 1430920680.55
func1: finishing 1430920729.68
duration 51.1449999809
func1: finishing 1430920729.78
duration 51.6889998913
func1: finishing 1430920730.69
duration 51.1239998341
func1: finishing 1430920748.64
duration 69.6180000305
func1: finishing 1430920749.25
duration 68.7009999752
71.5629999638
However, my real functions take many arguments, so I tested it like this, where func1(a) now takes an argument:
'''
Created on 06.05.2015
'''
from multiprocessing import Process
import time

def runInParallel(*fns):
    proc = []
    for fn in fns:
        p = Process(target=fn)
        p.start()
        proc.append(p)
    for p in proc:
        p.join()

def func1(a):
    s = time.time()
    print 'func1: starting', s
    for i in xrange(a):
        if i == i:
            pass
    e = time.time()
    print 'func1: finishing', e
    print 'duration', e-s

if __name__ == '__main__':
    s = time.time()
    g = s
    runInParallel(func1(1000000000), func1(1000000000),
                  func1(1000000000), func1(1000000000),
                  func1(1000000000))
    print time.time()-s
Now this happens:
func1: starting 1430921299.08
func1: finishing 1430921327.84
duration 28.760999918
func1: starting 1430921327.84
func1: finishing 1430921357.68
duration 29.8410000801
func1: starting 1430921357.68
func1: finishing 1430921387.14
duration 29.4619998932
func1: starting 1430921387.14
func1: finishing 1430921416.52
duration 29.3849999905
func1: starting 1430921416.52
func1: finishing 1430921447.39
duration 30.864000082
151.392999887
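The processes now run sequentially rather than in parallel, and I don't understand why! What am I missing, and what am I doing wrong?
Edit: Also, what would the example look like if some of the arguments are positional and others are optional?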
You have to pass your arguments to Process through its args parameter. For example:
def runInParallel(*fns):
    proc = []
    for fn, arg in fns:
        p = Process(target=fn, args=(arg,))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()
Then call the function with:
runInParallel((func1, 10**9),
              (func1, 10**9),
              (func1, 10**9))
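The (fn, arg) pairs above carry exactly one positional argument. For the follow-up about mixing positional and optional arguments, here is a minimal Python 3 sketch that accepts (fn,), (fn, args) or (fn, args, kwargs) tuples and forwards them to Process. The helper names run_in_parallel and unpack_job, and the label parameter of func1, are hypothetical, not part of multiprocessing or the original code.

```python
from multiprocessing import Process
import time


def unpack_job(job):
    """Hypothetical helper: normalize (fn,), (fn, args) or (fn, args, kwargs)."""
    fn = job[0]
    args = tuple(job[1]) if len(job) > 1 else ()
    kwargs = dict(job[2]) if len(job) > 2 else {}
    return fn, args, kwargs


def run_in_parallel(*jobs):
    """Start one Process per job, then wait for all of them to finish."""
    procs = []
    for job in jobs:
        fn, args, kwargs = unpack_job(job)
        p = Process(target=fn, args=args, kwargs=kwargs)
        p.start()
        procs.append(p)
    for p in procs:
        p.join()


def func1(a, label="func1"):
    # a is positional; label is optional (a keyword argument with a default)
    s = time.time()
    print(label, "starting", s)
    for _ in range(a):
        pass
    e = time.time()
    print(label, "finishing", e, "duration", e - s)


if __name__ == "__main__":
    run_in_parallel(
        (func1, (10**6,)),                     # positional argument only
        (func1, (10**6,), {"label": "job2"}),  # positional + keyword
        (func1, (10**6,), {"label": "job3"}),
    )
```

Each tuple is unpacked into Process(target=..., args=..., kwargs=...), so the child process calls fn(*args, **kwargs).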
You could also consider using a Pool instead:
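from multiprocessing import Pool
pool = Pool()
pool.apply_async(func1, (10**9,))
pool.apply_async(func1, (10**9,))
pool.apply_async(func1, (10**9,))
pool.close()  # no more tasks will be submitted
pool.join()   # wait for the submitted tasks to finish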
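apply_async returns an AsyncResult immediately; calling .get() on it waits for the task and returns the function's return value. A minimal Python 3 sketch, assuming a hypothetical count_up task that returns its result instead of printing it:

```python
from multiprocessing import Pool


def count_up(n, step=1):
    """Hypothetical CPU-bound task: sum range(0, n, step)."""
    return sum(range(0, n, step))


def main():
    # Leaving the with-block terminates the pool, so collect results inside it.
    with Pool() as pool:
        pending = [
            pool.apply_async(count_up, (10**6,)),
            pool.apply_async(count_up, (10**6,), {"step": 2}),
            pool.apply_async(count_up, (10,)),
        ]
        return [r.get() for r in pending]  # .get() blocks until each task is done


if __name__ == "__main__":
    print(main())
```

The three tasks run concurrently in the pool's worker processes; only the final list comprehension waits for them.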
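Edit:
Process and Pool.apply_async work the same way: both take two optional parameters for positional and keyword arguments (args and kwargs for Process, args and kwds for apply_async). These follow Python's standard convention for passing positional and keyword arguments: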
f(1, 2, a=3, b=4) # is equivalent to
args, kwargs = (1, 2), {"a":3, "b":4}
f(*args, **kwargs)
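A quick check of that equivalence, using a hypothetical f that has two required positional arguments, two optional ones, and catch-alls for anything extra:

```python
def f(x, y, a=0, b=0, *rest, **extra):
    # Hypothetical function: returns everything it received.
    return (x, y, a, b, rest, extra)


args, kwargs = (1, 2), {"a": 3, "b": 4}
direct = f(1, 2, a=3, b=4)
unpacked = f(*args, **kwargs)
print(direct == unpacked)  # True
print(unpacked)            # (1, 2, 3, 4, (), {})
```

Because the call is rebuilt from an ordinary tuple and dict, any mix of positional and optional arguments can be handed to Process or apply_async the same way.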
The same example with multiprocessing:
args, kwargs = (1, 2), {"a":3, "b":4}
Process(target=f, args=args, kwargs=kwargs).start()
# Or
pool = Pool()
args, kwargs = (1, 2), {"a":3, "b":4}
pool.apply_async(f, args, kwargs)
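When the same function must run many times with different positional arguments but a fixed optional argument, Pool.starmap (Python 3.3+) combined with functools.partial is a compact alternative. A sketch, where scaled_sum is a hypothetical task:

```python
from functools import partial
from multiprocessing import Pool


def scaled_sum(x, y, scale=1):
    """Hypothetical task: two positional arguments and one optional keyword."""
    return (x + y) * scale


def main():
    positional = [(1, 2), (3, 4), (5, 6)]  # one tuple of positional args per call
    task = partial(scaled_sum, scale=10)   # bind the keyword argument once
    with Pool() as pool:
        # starmap unpacks each tuple: task(1, 2), task(3, 4), task(5, 6)
        return pool.starmap(task, positional)


if __name__ == "__main__":
    print(main())
```

starmap has no per-call kwargs parameter, which is why the keyword argument is bound with partial here; for per-call keyword arguments, apply_async with an explicit kwds dict is the more direct fit.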
Problem
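I think your problem is that in the first example you pass a function object, whereas in the second example you call the function directly. Writing func1(1000000000) runs func1 immediately in the parent process, one call after another, and only its return value (None) is passed to runInParallel.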
That is, func1 is not equivalent to func1().
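The difference can be seen without any multiprocessing at all: Python evaluates every argument expression before it enters the called function. A sketch with hypothetical stand-ins that only record the order of events:

```python
events = []


def func1(a):
    # Stand-in for the real func1: records that it ran and returns None.
    events.append(("func1 ran", a))


def run_in_parallel(*fns):
    # Stand-in for runInParallel: records what it actually received.
    events.append(("run_in_parallel got", fns))


run_in_parallel(func1(1), func1(2))  # like the second example
run_in_parallel(func1, func1)        # like the first example
for e in events:
    print(e)
```

In the first call both func1 calls finish before run_in_parallel starts, and it receives (None, None); in the second call it receives the function objects themselves, which is what Process(target=...) needs.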
Solution
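According to the Python 2 documentation for multiprocessing.Process (docs.python.org/2/library/multiprocessing.html#multiprocessing.Process), you have to pass your arguments separately, like this:
p = Process(target=fn, args=(10000000,))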
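Alternatively, to keep the original runInParallel(*fns) signature unchanged, the arguments can be bound up front with functools.partial, which produces a not-yet-called callable that Process can use as its target. Partial objects wrapping module-level functions are picklable, which matters on platforms that spawn rather than fork child processes. A Python 3 sketch; the label parameter is hypothetical:

```python
from functools import partial
from multiprocessing import Process


def runInParallel(*fns):
    proc = []
    for fn in fns:
        p = Process(target=fn)  # fn is a zero-argument callable
        p.start()
        proc.append(p)
    for p in proc:
        p.join()


def func1(a, label="func1"):
    total = sum(range(a))
    print(label, "done:", total)


# partial(func1, 10**6) behaves like func1(10**6) but has not been called yet.
jobs = [
    partial(func1, 10**6),
    partial(func1, 10**6, label="job2"),
]

if __name__ == "__main__":
    runInParallel(*jobs)
```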
Hope this helps.
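If you don't mind using a fork of multiprocessing, you can do some pretty cool things with multiple arguments for the target of a parallel map. Here I build a function that requires 2 arguments, but also has an optional argument plus *args and **kwds. I'll build a list of random-length inputs and run them in parallel.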
>>> from pathos.multiprocessing import ProcessingPool as PPool
>>> pmap = PPool().map
>>> from pathos.multiprocessing import ThreadingPool as TPool
>>> tmap = TPool().map
>>> import numpy
>>>
>>> # build a function with multiple arguments, some optional
>>> def do_it(x,y,z=1,*args,**kwds):
... import time
... import random
... s = time.time()
... print 'starting', s
... time.sleep(random.random())
... res = sum([x,y,z]+list(args)+kwds.values())
... e = time.time()
... print 'finishing', e
... print 'duration', e-s
... return res
...
>>> # create a bunch of random-length arrays as input for do_it
>>> input = map(numpy.random.random, tmap(numpy.random.randint, [2]*5, [6]*5))
>>> input
[array([ 0.25178071, 0.68871176, 0.92305523, 0.47103722]), array([ 0.14214278, 0.16747431, 0.59177496, 0.79984192]), array([ 0.20061353, 0.94339813, 0.67396539, 0.99919187]), array([ 0.63974882, 0.46868301, 0.59963679, 0.97704561]), array([ 0.14515633, 0.97824495, 0.57832663, 0.34167116])]
Now, let's get the results...
>>> # call do_it in parallel, with random-length inputs
>>> result = pmap(do_it, *input)
starting 1431039902.85
starting 1431039902.85
starting 1431039902.85
starting 1431039902.85
finishing 1431039903.21
finishing 1431039903.21
duration 0.358909130096
duration 0.35973405838
finishing 1431039903.21
finishing 1431039903.21
duration 0.359538078308
duration 0.358761072159
>>> result
[1.379442164896775, 3.2465121635066176, 3.3667590048477187, 3.5887877829029042]
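For readers who prefer to stay in the standard library, concurrent.futures.ProcessPoolExecutor.map also accepts several iterables and, like the built-in map, calls the function with one item from each iterable, stopping at the shortest. That appears to be the same transposition pmap(do_it, *input) performs above: each call takes one element from every array in input, which fills x, y, z and then *args. A Python 3 sketch with a simplified, hypothetical do_it and fixed inputs instead of random arrays (note list(kwds.values()), since dict.values() returns a view in Python 3):

```python
from concurrent.futures import ProcessPoolExecutor


def do_it(x, y, z=1, *args, **kwds):
    # Simplified, hypothetical version of do_it: no printing or sleeping.
    return sum([x, y, z] + list(args) + list(kwds.values()))


# Each inner list plays the role of one array in `input`.
columns = [
    [1, 2, 3],
    [10, 20, 30],
    [100, 200, 300],
    [1000, 2000],  # shortest iterable: only two calls are made
]


def main():
    with ProcessPoolExecutor() as executor:
        # Runs do_it(1, 10, 100, 1000) and do_it(2, 20, 200, 2000) in workers.
        return list(executor.map(do_it, *columns))


if __name__ == "__main__":
    print(main())
```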
Of course, if you want to get fancy, you can run a triple-nested map all in one line.
>>> # do it, all in one line
>>> result = pmap(do_it, *map(numpy.random.random, tmap(numpy.random.randint, [2]*5, [6]*5)))
starting 1431040673.62
starting 1431040673.62
starting 1431040673.62
starting 1431040673.62
starting 1431040673.62
finishing 1431040673.73
finishing 1431040673.73
duration 0.110394001007
duration 0.111043930054
finishing 1431040673.73
duration 0.110962152481
finishing 1431040673.73
duration 0.110266923904
finishing 1431040673.74
duration 0.110939025879
>>> result
[1.9904591398425764, 1.932317817954369, 2.6365732054048432, 2.5168248011900047, 2.0410734229587968]
And you can avoid blocking or serial maps altogether, and things run very fast (I'm ignoring numpy random seeding here).
>>> # get a non-blocking thread map and an asynchronous processing map
>>> itmap = TPool().imap
>>> apmap = PPool().amap
>>>
>>> # do it!
>>> result = apmap(do_it, *itmap(numpy.random.random, itmap(numpy.random.randint, [2]*5, [6]*5)))
starting 1431041250.33
starting 1431041250.33
starting 1431041250.33
finishing 1431041250.44
duration 0.110985040665
finishing 1431041250.44
duration 0.110254049301
finishing 1431041250.45
duration 0.110941886902
>>> result.get()
[3.6386644432719697, 0.43038222983159957, 3.6220901279963318]
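The standard-library counterpart of this non-blocking pattern is Pool.starmap_async (Python 3.3+), which returns an AsyncResult right away; .get() blocks only when the values are finally needed. A sketch with hypothetical fixed inputs, using zip(*columns) to turn per-argument lists into per-call tuples:

```python
from multiprocessing import Pool


def do_it(x, y, z=1, *args, **kwds):
    # Simplified, hypothetical version of do_it from the answer above.
    return sum([x, y, z] + list(args) + list(kwds.values()))


columns = [[1, 2, 3], [10, 20, 30], [100, 200, 300]]


def main():
    with Pool() as pool:
        # zip(*columns) -> (1, 10, 100), (2, 20, 200), (3, 30, 300)
        pending = pool.starmap_async(do_it, zip(*columns))
        # ... the parent process is free to do other work here ...
        return pending.get()  # blocks until all calls have finished


if __name__ == "__main__":
    print(main())
```

Unlike pathos, this does not pipe one non-blocking map into another; the argument tuples are built in the parent before the work is submitted.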
Get pathos here: https://github.com/uqfoundation