运行 在 python 中并行使用位置参数和可选参数的函数(跟进)

Run function with positional and optional arguments in parallel in python (follow up)

这是以下问题的后续问题:Python: How can I run python functions in parallel?

最小工作示例:

'''
Created on 06.05.2015

'''
from multiprocessing import Process
import time

def runInParallel(*fns):
    proc = []
    for fn in fns:
        p = Process(target=fn)
        p.start()
        proc.append(p)
    for p in proc:
        p.join()

def func1():
    s=time.time()
    print 'func1: starting', s 
    for i in xrange(1000000000):
        if i==i:
            pass
    e = time.time()
    print 'func1: finishing', e
    print 'duration', e-s

if __name__ == '__main__':
    s =time.time()
    runInParallel(func1, func1, func1, func1, func1)
    print time.time()-s

这导致了这个(这正是我想要的):

func1: starting 1430920678.09

func1: starting 1430920678.53

func1: starting 1430920679.02

func1: starting 1430920679.57

func1: starting 1430920680.55

func1: finishing 1430920729.68

duration 51.1449999809

func1: finishing 1430920729.78

duration 51.6889998913

func1: finishing 1430920730.69

duration 51.1239998341

func1: finishing 1430920748.64

duration 69.6180000305

func1: finishing 1430920749.25

duration 68.7009999752

71.5629999638

但是,我的函数有很多参数,所以我这样测试它:

-> func1(a) 现在可以传递参数。

'''
Created on 06.05.2015

'''
from multiprocessing import Process
import time

def runInParallel(*fns):
    proc = []
    for fn in fns:
        p = Process(target=fn)
        p.start()
        proc.append(p)
    for p in proc:
        p.join()

def func1(a):
    s=time.time()
    print 'func1: starting', s 
    for i in xrange(a):
        if i==i:
            pass
    e = time.time()
    print 'func1: finishing', e
    print 'duration', e-s

if __name__ == '__main__':
    s =time.time()
    g=s
    runInParallel(func1(1000000000), func1(1000000000),
                  func1(1000000000), func1(1000000000),
                  func1(1000000000))
    print time.time()-s

所以现在发生了这样的事情:

func1: starting 1430921299.08

func1: finishing 1430921327.84

duration 28.760999918

func1: starting 1430921327.84

func1: finishing 1430921357.68

duration 29.8410000801

func1: starting 1430921357.68

func1: finishing 1430921387.14

duration 29.4619998932

func1: starting 1430921387.14

func1: finishing 1430921416.52

duration 29.3849999905

func1: starting 1430921416.52

func1: finishing 1430921447.39

duration 30.864000082

151.392999887

流程现在是顺序的,不再是并行的,我不明白为什么!我错过了什么和做错了什么?

编辑:另外,如果一些参数是位置参数而其他参数是可选的,示例会是什么样子?

您必须使用参数 args 将您的参数传递给 Process。例如:

def runInParallel(*fns):
    proc = []
    for fn, arg in fns:
        p = Process(target=fn, args=(arg,))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()

然后调用函数使用:

runInParallel((func1, 10**9),
              (func1, 10**9),
              (func1, 10**9))

此外,您可以考虑改用 Pool

from multiprocessing import Pool

pool = Pool()
pool.apply_async(func1, (10**9,))
pool.apply_async(func1, (10**9,))
pool.apply_async(func1, (10**9,))

编辑:

ProcessPool.apply_asynch 的工作方式相同。它们有两个可选参数 argskwargs。这些是 python:

中位置参数和关键字参数的标准变量
f(1, 2, a=3, b=4)  # is equivalent to
args, kwargs = (1, 2), {"a":3, "b":4}
f(*args, **kwargs)

multiprocessing 相同的示例:

args, kwargs = (1, 2), {"a":3, "b":4}
Process(target=f, args=args, kwargs=kwargs).start()
# Or
pool = Pool()
args, kwargs = (1, 2), {"a":3, "b":4}
pool.apply_async(f, args, kwargs)

问题

我认为您的问题是因为您在第一个示例中提供了一个函数处理程序,而在第二个示例中直接计算了该函数。

func1

不等同于

func1 ()

解决方案

根据 s://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process 你必须像

一样单独给出你的论点
p = Process(target=fn, args=(10000000,))

希望对您有所帮助

如果您不介意使用 multiprocessing 的分支,您可以为并行 map 的目标使用多个参数来做一些非常酷的事情。在这里,我构建了一个函数,它需要 2 个参数,但也有一个可选参数以及 *args**kwds。我将构建一个具有随机长度的输入列表,运行 那些是并行的。

>>> from pathos.multiprocessing import ProcessingPool as PPool
>>> pmap = PPool().map
>>> from pathos.multiprocessing import ThreadingPool as TPool
>>> tmap = TPool().map
>>> import numpy
>>>
>>> # build a function with multiple arguments, some optional
>>> def do_it(x,y,z=1,*args,**kwds):
...   import time
...   import random
...   s = time.time()
...   print 'starting', s
...   time.sleep(random.random())
...   res = sum([x,y,z]+list(args)+kwds.values())
...   e = time.time()
...   print 'finishing', e
...   print 'duration', e-s
...   return res
... 
>>> # create a bunch of random-length arrays as input for do_it
>>> input = map(numpy.random.random, tmap(numpy.random.randint, [2]*5, [6]*5))
>>> input
[array([ 0.25178071,  0.68871176,  0.92305523,  0.47103722]), array([ 0.14214278,  0.16747431,  0.59177496,  0.79984192]), array([ 0.20061353,  0.94339813,  0.67396539,  0.99919187]), array([ 0.63974882,  0.46868301,  0.59963679,  0.97704561]), array([ 0.14515633,  0.97824495,  0.57832663,  0.34167116])] 

现在,让我们得到结果...

>>> # call do_it in parallel, with random-length inputs
>>> result = pmap(do_it, *input)
starting 1431039902.85
starting 1431039902.85
starting 1431039902.85
starting 1431039902.85
finishing 1431039903.21
finishing 1431039903.21
duration 0.358909130096
duration 0.35973405838
finishing 1431039903.21
finishing 1431039903.21
duration 0.359538078308
duration 0.358761072159
>>> result
[1.379442164896775, 3.2465121635066176, 3.3667590048477187, 3.5887877829029042]

当然,如果你想变得狡猾,你可以 运行 一个三层嵌套的映射全部在一行中。

>>> # do it, all in one line
>>> result = pmap(do_it, *map(numpy.random.random, tmap(numpy.random.randint, [2]*5, [6]*5)))
starting 1431040673.62
starting 1431040673.62
starting 1431040673.62
starting 1431040673.62
starting 1431040673.62
finishing 1431040673.73
finishing 1431040673.73
duration 0.110394001007
duration 0.111043930054
finishing 1431040673.73
duration 0.110962152481
finishing 1431040673.73
duration 0.110266923904
finishing 1431040673.74
duration 0.110939025879
>>> result
[1.9904591398425764, 1.932317817954369, 2.6365732054048432, 2.5168248011900047, 2.0410734229587968]

而且,您可能根本不使用阻塞或串行 map,事情会非常快(我在这里忽略了 numpy 随机播种)。

>>> # get a non-blocking thread map and an asynchronous processing map
>>> itmap = TPool().imap
>>> apmap = Pool().amap
>>>
>>> # do it!
>>> result = apmap(do_it, *itmap(numpy.random.random, itmap(numpy.random.randint, [2]*5, [6]*5)))
starting 1431041250.33
starting 1431041250.33
starting 1431041250.33
finishing 1431041250.44
duration 0.110985040665
finishing 1431041250.44
duration 0.110254049301
finishing 1431041250.45
duration 0.110941886902
>>> result.get()
[3.6386644432719697, 0.43038222983159957, 3.6220901279963318]

在此处获取 pathoshttps://github.com/uqfoundation