多处理 python 中的一个函数,它有多个参数

Multiprocess a function in python that got multiple parameters

我正在尝试使用 python 中的多处理库,但遇到了一些困难:

def request_solr(limit=10, offset=10):
    # build my facets here using limit and offset
    # request solr
    return response.json()

def get_list_event_per_user_per_mpm(limit=100):
    nb_unique_user = get_unique_user()
    print "Unique user: ", nb_unique_user
    processor_pool = multiprocessing.Pool(4)
    offset = range(0, nb_unique_user, limit)
    list_event_per_user = processor_pool.map(request_solr(limit), offset)
    return list_event_per_user

我不确定如何将第二个参数传递给函数。我怎样才能让它工作。我遇到以下错误:

TypeError: 'dict' object is not callable

您需要为此使用 lambda。你现在这样做的方式是,它试图将 request_solr 的结果映射为一个以 offset 作为参数的函数。

这应该可以解决问题。

processor_pool.map(lambda x: request_solr(limit, x), offset)

请注意,这仅适用于 3.x。在 2.x 中,您需要创建一个函数对象。例如:

class RequestSolrCaller:
    def __init__(self, limit)
        self.limit = limit
    def __call__(self, offset)
        return request_solr(self.limit, offset)

processor_pool.map(RequestSolrCaller(limit), offset)

您看到该错误是因为您在 调用函数之前 将其传递给多处理。

我建议你使用starmap in combination with itertools.repeat:

import itertools as it

# rest of your code

processor_pool = multiprocessing.Pool(4)
offset = range(0, nb_unique_user, limit)
list_event_per_user = processor_pool.starmap(request_solr, zip(it.repeat(limit), offset))

Starmap 将调用您的函数,将这对值扩展为两个参数。 repeat(limit) 只是生成一个所有元素都等于 limit.

的可迭代对象

这适用于任意数量的参数:

def my_function(a, b, c, d, e):
    return a+b+c+d+e

pool = Pool()
pool.starmap(my_function, [(1,2,3,4,5)])   # calls my_function(1,2,3,4,5)

由于您使用的是旧版本的 python,您必须通过修改函数或使用包装函数来解决此问题:

def wrapper(arguments):
    return request_solr(*arguments)

# later:

pool.map(wrapper, zip(repeat(limit), offset))

我曾经使用生成器来生成关键字。这是我的simple_multiproc.py的内容。

注意 request_solr 级别模块的重要性。

import multiprocessing

MAX=5

def _get_pool_args(**kw):
    for _ in range(MAX):
        r = {"limit": 10, "offset": 10}
        r.update(kw)
        yield r


def request_solr(limit=10, offset=10):
    # build my facets here using limit and offset
    # request solr
    print(locals())
    response.json()

if __name__ == "__main__":
    pool = multiprocessing.Pool(MAX)
    pool.map(request_solr, _get_pool_args())