Two functions in parallel with multiple arguments and return values

I have two independent functions. Each of them takes quite a long time to execute.

def function1(arg):
    # do_some_stuff_here (long-running work that computes result1)
    return result1

def function2(arg1, arg2, arg3):
    # do_some_stuff_here (long-running work that computes result2)
    return result2

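I want to launch them in parallel, get their results (knowing which is which) and then process the results. As far as I understand, in Python 2.7 multiprocessing is more efficient than threading for CPU-bound work (because of the GIL). But I'm a bit lost as to whether it's better to use Process, Pool or Queue, and how to implement them in the correct pythonic way for my use case.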

Any help is appreciated ;)

First of all, Process, Pool and Queue each have different use cases.

Process is used to spawn a single child process: you create a Process object with a target function and call start() on it.

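from multiprocessing import Process

def method1():
    print("in method1")
    print("in method1")

def method2():
    print("in method2")
    print("in method2")

if __name__ == '__main__':          # needed when children are spawned (e.g. on Windows)
    p1 = Process(target=method1)    # create a process object p1
    p1.start()                      # starts the process p1
    p2 = Process(target=method2)
    p2.start()
    p1.join()                       # wait for both processes to finish
    p2.join()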
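A Process does not hand the return value of its target back to the parent; the parent can only wait with join() and inspect exitcode. A minimal sketch, assuming Python 3, with a hypothetical slow_add function standing in for real work and showing how arguments are passed with args=:

```python
from multiprocessing import Process


def slow_add(a, b):
    # hypothetical stand-in for a long-running function with several arguments
    return a + b  # this return value is discarded by Process


if __name__ == '__main__':
    p = Process(target=slow_add, args=(1, 2))  # positional arguments go in args=
    p.start()
    p.join()           # block until the child process finishes
    print(p.exitcode)  # 0 means the target returned without raising
```

This is why the Queue approach further down is needed when you want the results back.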

Pool is used to parallelize the execution of a function across multiple input values.

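from multiprocessing import Pool

def method1(x):
    print(x)
    print(x**2)
    return x**2

if __name__ == '__main__':
    p = Pool(3)                          # pool of 3 worker processes
    result = p.map(method1, [1, 4, 9])   # blocks until done; results keep the input order
    print(result)                        # prints [1, 16, 81]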
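Pool.map passes exactly one argument to the function. For a function like function2(arg1, arg2, arg3) from the question, Python 3.3+ offers Pool.starmap, which unpacks each tuple into positional arguments. A minimal sketch, with a hypothetical combine function standing in for function2:

```python
from multiprocessing import Pool


def combine(a, b, c):
    # hypothetical stand-in for function2(arg1, arg2, arg3)
    return a * 100 + b * 10 + c


if __name__ == '__main__':
    arg_tuples = [(1, 2, 3), (4, 5, 6)]
    with Pool(2) as pool:
        # each tuple is unpacked: combine(1, 2, 3), combine(4, 5, 6)
        results = pool.starmap(combine, arg_tuples)
    print(results)  # [123, 456]
```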

Queue is used to communicate between processes, for example to send a result from a child process back to the parent.

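from multiprocessing import Process, Queue

def method1(x, l1):
    print("in method1")
    print("in method1")
    l1.put(x**2)        # send the result back to the parent through the queue
    return x            # the return value of a Process target is discarded

def method2(x, l2):
    print("in method2")
    print("in method2")
    l2.put(x**3)
    return x

if __name__ == '__main__':
    l1 = Queue()
    p1 = Process(target=method1, args=(4, l1))
    l2 = Queue()
    p2 = Process(target=method2, args=(2, l2))
    p1.start()
    p2.start()
    print(l1.get())     # prints 16
    print(l2.get())     # prints 8
    p1.join()           # join after get(): joining first can hang while the queue holds unread data
    p2.join()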
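To collect results from several processes on a single queue and still know which is which, each child can put a (name, value) pair. A minimal sketch, assuming Python 3; the worker helper and the 'square'/'cube' labels are hypothetical:

```python
from multiprocessing import Process, Queue


def worker(name, func, x, q):
    # tag the result with a name so the parent can tell the results apart
    q.put((name, func(x)))


def square(x):
    return x ** 2


def cube(x):
    return x ** 3


if __name__ == '__main__':
    q = Queue()
    procs = [
        Process(target=worker, args=('square', square, 4, q)),
        Process(target=worker, args=('cube', cube, 2, q)),
    ]
    for p in procs:
        p.start()
    # results arrive in completion order, so store them by tag
    results = dict(q.get() for _ in procs)
    for p in procs:
        p.join()
    print(results['square'], results['cube'])  # 16 8
```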

Now, for your case, you can either use Process & Queue (the third approach), or adapt the Pool approach to work with several functions, as below:

from multiprocessing import Pool
import sys

def method1(x):
    print(x)
    print(x**2)
    return x**2

def method2(x):
    print(x)
    print(x**3)
    return x**3

def unzip_func(a, b):
    return a, b

def distributor(option_args):
    option, args = unzip_func(*option_args)    # unzip option and args

    # build the function name from the option number, e.g. 1 -> "method1"
    attr_name = "method" + str(option)

    # look up the function named attr_name in this module and call it with args
    value = getattr(sys.modules[__name__], attr_name)(args)

    return value


if __name__ == '__main__':
    option_list = [1, 2]      # selects the method number
    args_list = [4, 2]        # argument for the corresponding method (4 goes to method1)

    p = Pool(3)               # create a pool of 3 processes

    # call distributor with the pairs (option1, arg1), (option2, arg2)
    result = p.map(distributor, zip(option_list, args_list))
    print(result)             # prints [16, 8]
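Instead of dispatching by function name with getattr, you can hand each function and its own argument tuple to Pool.apply_async, which returns one AsyncResult per call, so you always know which result belongs to which function. A minimal sketch, assuming Python 3; function1 and function2 here are hypothetical stand-ins for the question's functions:

```python
from multiprocessing import Pool


def function1(arg):
    # hypothetical stand-in for a long-running function with one argument
    return arg ** 2


def function2(arg1, arg2, arg3):
    # hypothetical stand-in for a long-running function with three arguments
    return arg1 + arg2 + arg3


if __name__ == '__main__':
    with Pool(2) as pool:
        # both calls are submitted immediately and run in parallel
        async1 = pool.apply_async(function1, (4,))
        async2 = pool.apply_async(function2, (1, 2, 3))
        # get() blocks until that particular call has finished
        result1 = async1.get()
        result2 = async2.get()
    print(result1, result2)  # 16 6
```

Call get() before leaving the with block: Pool.__exit__ calls terminate(), which would kill calls that have not finished yet.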

Hope this helps.

Here is another example I just found. I hope it helps; it's nice and simple ;)

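from multiprocessing import Pool

def square(x):
    return x * x

def cube(y):
    return y * y * y

if __name__ == '__main__':
    pool = Pool(processes=20)

    # map_async returns immediately, so both jobs are queued on the pool at once
    result_squares = pool.map_async(square, range(10))
    result_cubes = pool.map_async(cube, range(10))

    # get() blocks until the results are ready (raises multiprocessing.TimeoutError after 3 s)
    print(result_squares.get(timeout=3))
    print(result_cubes.get(timeout=3))
    pool.close()
    pool.join()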
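On Python 3.2+, the standard library's concurrent.futures.ProcessPoolExecutor is an alternative to Pool (a different API from the one used above, swapped in here for comparison). A minimal sketch reusing square and cube: submit() accepts any positional and keyword arguments and returns one Future per call, and executor.map behaves like Pool.map:

```python
from concurrent.futures import ProcessPoolExecutor


def square(x):
    return x * x


def cube(y):
    return y * y * y


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2) as executor:
        # submit() schedules one call and returns a Future for it
        future_square = executor.submit(square, 4)
        future_cube = executor.submit(cube, 3)
        # executor.map preserves the input order, like Pool.map
        squares = list(executor.map(square, range(5)))
        # result() blocks until that call has finished
        print(future_square.result(), future_cube.result())  # 16 27
    print(squares)  # [0, 1, 4, 9, 16]
```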

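Here is an example that shows how to:

  1. Run a single function over multiple inputs in parallel using a Pool (the square function)
  2. Run multiple functions with different inputs (args and kwargs) and collect their results using a Pool (the pf1, pf2, pf3 functions)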
import datetime
import multiprocessing
import time
import random

from multiprocessing import Pool

def square(x):
    # calculate the square of the value of x
    print(x, x*x)
    return x*x

def pf1(*args, **kwargs):
    sleep_time = random.randint(3, 6)
    print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n" % (multiprocessing.current_process().name, "pf1", args, sleep_time, datetime.datetime.now()))
    print("Keyword Args from pf1: %s" % kwargs)
    time.sleep(sleep_time)
    print(multiprocessing.current_process().name, "\tpf1 done at %s\n" % datetime.datetime.now())
    return (sum(*args), kwargs)

def pf2(*args):
    sleep_time = random.randint(7, 10)
    print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n" % (multiprocessing.current_process().name, "pf2", args, sleep_time, datetime.datetime.now()))
    time.sleep(sleep_time)
    print(multiprocessing.current_process().name, "\tpf2 done at %s\n" % datetime.datetime.now())
    return sum(*args)

def pf3(*args):
    sleep_time = random.randint(0, 3)
    print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n" % (multiprocessing.current_process().name, "pf3", args, sleep_time, datetime.datetime.now()))
    time.sleep(sleep_time)
    print(multiprocessing.current_process().name, "\tpf3 done at %s\n" % datetime.datetime.now())
    return sum(*args)

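def smap(f, *arg):
    # helper for pool.starmap: each task is (f, args) or (f, args, kwargs)
    if len(arg) == 2:
        args, kwargs = arg
        return f(list(args), **kwargs)  # args list as one positional argument, plus keyword args
    elif len(arg) == 1:
        args = arg
        return f(*args)                 # args list as one positional argument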


if __name__ == '__main__':

    # Define the dataset
    dataset = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

    # Output the dataset
    print ('Dataset: ' + str(dataset))

    # Run this with a pool of 5 agents having a chunksize of 3 until finished
    agents = 5
    chunksize = 3
    with Pool(processes=agents) as pool:
        result = pool.map(square, dataset, chunksize)
    print("Result of Squares : %s\n\n" % result)
    with Pool(processes=3) as pool:
        result = pool.starmap(smap, [(pf1, [1,2,3], {'a':123, 'b':456}), (pf2, [11,22,33]), (pf3, [111,222,333])])

    # Output the result
    print ('Result: %s ' % result)


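Output (the five pool workers print concurrently, so the order of the square lines varies between runs; in this run the "5 25" and "9 81" lines got interleaved):
*******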

Dataset: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
1 1
2 4
3 9
4 16
6 36
7 49
8 64
59 81
 25
10 100
11 121
12 144
13 169
14 196
Result of Squares : [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196]


Process : ForkPoolWorker-6  Function : pf1  Args: ([1, 2, 3],)  sleeping for 3  Time : 2020-07-20 00:51:56.477299

Keyword Args from pf1: {'a': 123, 'b': 456}
Process : ForkPoolWorker-7  Function : pf2  Args: ([11, 22, 33],)   sleeping for 8  Time : 2020-07-20 00:51:56.477371

Process : ForkPoolWorker-8  Function : pf3  Args: ([111, 222, 333],)    sleeping for 1  Time : 2020-07-20 00:51:56.477918

ForkPoolWorker-8    pf3 done at 2020-07-20 00:51:57.478808

ForkPoolWorker-6    pf1 done at 2020-07-20 00:51:59.478877

ForkPoolWorker-7    pf2 done at 2020-07-20 00:52:04.478016

Result: [(6, {'a': 123, 'b': 456}), 66, 666] 

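The smap helper above treats the one- and two-element cases differently and always passes the argument list as a single positional argument. A more uniform variant (a sketch, not the code above; the call helper and the add/scale functions are hypothetical) sends every task as (func, args, kwargs) and unpacks both parts:

```python
from multiprocessing import Pool


def call(func, args=(), kwargs=None):
    # hypothetical dispatch helper: unpack positional and keyword arguments
    return func(*args, **(kwargs or {}))


def add(a, b, c):
    return a + b + c


def scale(x, factor=1):
    return x * factor


if __name__ == '__main__':
    tasks = [
        (add, (1, 2, 3), {}),
        (scale, (5,), {'factor': 10}),
        (add, (10, 20, 30), None),
    ]
    with Pool(processes=3) as pool:
        # starmap unpacks each task tuple into call(func, args, kwargs)
        results = pool.starmap(call, tasks)
    print(results)  # [6, 50, 60]
```

Because each function receives its arguments unpacked, the target functions can keep ordinary signatures such as add(a, b, c) instead of *args.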