Python 多处理模块

Python Multiprocessing Modules

编辑:更新了环境信息(参见第一部分)

环境

我正在使用 Python 2.7

Ubuntu 16.04

问题

我有一个应用程序,我已将其简化为一个三阶段过程:

  1. 从多个数据源(HTTP 请求、系统信息等)收集数据
  2. 根据此数据计算指标
  3. 以各种格式输出这些指标

这些阶段中的每一个都必须在进入下一阶段之前完成,但是每个阶段都包含多个可以并行 运行 的子任务(我可以发送 3 个 HTTP 请求并读取系统日志在等待他们 return)

我将阶段划分为模块,将子任务划分为子模块,所以我的项目层次结构如下所示:

+ datasources
|-- __init__.py
|-- data_one.py
|-- data_two.py
|-- data_three.py
+ metrics
|-- __init__.py
|-- metric_one.py
|-- metric_two.py
+ outputs
|-- output_one.py
|-- output_two.py
- app.py

app.py 看起来大致像这样(为简洁起见的伪代码):

import datasources
import metrics
import outputs

for datasource in dir(datasources):
    datasource.refresh()
for metric in dir(metrics):
    metric.calculate()
for output in dir(outputs):
    output.dump()

(有额外的代码包装 dir 忽略系统模块的调用,有异常处理等——但这是它的要点)

每个数据源子模块大致如下所示:

data = []

def refresh():
    # Populate the "data" member somehow
    data = [1, 2, 3]
    return

每个指标子模块大致如下所示:

import datasources.data_one as data_one
import datasources.data_two as data_two

data = []

def calculate():
    # Use the datasources to compute the metric
    data = [sum(x) for x in zip(data_one, data_two)]
    return

为了并行化第一阶段(数据源),我写了一些简单的东西,如下所示:

def run_thread(datasource):
    datasource.refresh()

threads = []
for datasource in dir(datasources):
    thread = threading.Thread(target=run_thread, args=(datasource))
    threads.append(thread)
    thread.start()
for thread in threads:
    thread.join()

这行得通,之后我可以计算任何指标并填充 datasources.x.data 属性

为了并行化第二阶段(指标),因为它较少依赖 I/O 而更多地依赖 CPU,我觉得简单的线程实际上不会加快速度,我需要多处理模块,以利用多核。我写了以下内容:

def run_pool(calculate):
    calculate()

pool = multiprocessing.Pool()
pool.map(run_pool, [m.calculate for m in dir(metrics)]
pool.close()
pool.join()

此代码 运行s 持续了几秒钟(所以我认为它有效?)但是当我尝试时:

metrics.metric_one.data

it returns [],就像模块从来没有 运行

通过使用 multiprocessing 模块,它似乎以某种方式限定了线程的范围,以便它们不再共享数据属性。我应该如何重写它,以便我可以并行计算每个指标,利用多核,但在完成后仍然可以访问数据?

ProcessThread 在 python 中的表现完全不同。如果你想使用多处理,你将需要使用同步数据类型来传递信息。

例如,您可以使用 multiprocessing.Array,它可以在您的进程之间共享。

有关详细信息,请参阅文档:https://docs.python.org/2/library/multiprocessing.html#sharing-state-between-processes

根据评论再次更新: 由于您使用的是 2.7,并且您正在处理模块而不是对象,因此您在挑选所需内容时遇到了问题。解决方法不是很好。它涉及将每个模块的名称传递给您的操作函数。我更新了 partial 部分,还更新了删除 with 语法。

几件事:

首先,一般来说,多核比多线程好。使用线程,你总是 运行 有处理全局解释器锁的风险,这可能是非常低效的。如果您使用多核,这就不是问题了。

其次,您的概念是正确的,但是您通过拥有一个全局到模块的数据成员使它变得奇怪。使您的来源 return 成为您感兴趣的数据,并使您的指标(和输出)将数据列表作为输入并输出结果列表。

这会将您的伪代码变成如下所示:

app.py:

import datasources
import metrics
import outputs

pool = multiprocessing.Pool()
data_list = pool.map(lambda o: o.refresh, list(dir(datasources)))
pool.close()
pool.join()

pool = multiprocessing.Pool()
metrics_funcs = [(m, data_list) for m in dir(metrics)]
metrics_list = pool.map(lambda m: m[0].calculate(m[1]), metrics_funcs)
pool.close()
pool.join()

pool = multiprocessing.Pool()
output_funcs = [(o, data_list, metrics_list) for o in dir(outputs)]
output_list = pool.map(lambda o: o[0].dump(o[1], o[2]), output_funcs)
pool.close()
pool.join()

执行此操作后,您的数据源将如下所示:

def refresh():
    # Populate the "data" member somehow
    return [1, 2, 3]

而您的指标将如下所示:

def calculate(data_list):
    # Use the datasources to compute the metric
    return [sum(x) for x in zip(data_list)]

最后,您的输出可能如下所示:

def dump(data_list, metrics_list):
    # do whatever; you now have all the information

删除数据 "global" 并传递它使每个部分更清晰(并且更容易测试)。这突出了使每件作品完全独立。如您所见,我所做的只是更改传递给 map 的列表中的内容,在这种情况下,我通过将它们作为元组传递并在功能。当然,您不必使用 lambda。您可以单独定义每个函数,但实际上没有太多要定义的。但是,如果您确实定义了每个函数,则可以使用 partial 函数来减少传递的参数数量。我经常使用该模式,在您更复杂的代码中,您可能需要这样做。这是一个例子:

from functools import partial

do_dump(module_name, data_list, metrics_list):
    globals()[module_name].dump(data_list, metrics_list)

invoke = partial(do_dump, data_list=data_list, metrics_list=metrics_list)
with multiprocessing.Pool() as pool:
    output_list = pool.map(invoke, [o.__name__ for o in dir(outputs)])
    pool.close()
    pool.join()

更新,根据评论:

当您使用 map 时,您可以保证输入的顺序与输出的顺序相匹配,即 data_list[i] 是 运行ning dir(datasources)[i].refresh() 的输出。我不会将数据源模块导入指标,而是将此更改为 app.py:

data_list = ...
pool.close()
pool.join()
data_map = {name: data_list[i] for i, name in enumerate(dir(datasources))}

然后将 data_map 传递给每个指标。然后指标通过名称获取它想要的数据,例如

d1 = data_map['data_one']
d2 = data_map['data_two']
return [sum(x) for x in zip([d1, d2])]