Python,在并行循环中将 key:value 添加到字典

Python, add key:value to dictionary in parallelised loop

我已经编写了一些代码来并行执行一些计算 (joblib) 并使用计算结果更新字典。该代码由一个主函数组成,该函数并行调用生成器函数和计算函数 运行。计算结果(key:value 对)由计算函数的每个实例添加到在主函数和市场中创建的字典中作为全局。

下面是我的代码的简化版本,说明了上述过程。

当一切运行s时,结果字典(d_result)是空的,但它应该已经填充了计算函数生成的结果。为什么会这样?

import numpy as np
from joblib import Parallel, delayed


def do_calc(d, r, pair_index): # function to be run in parallel

    data_1 = d[str(r)][pair_index, 1]
    data_2 = d[str(r)][pair_index, 2]
    result_name = str(data_1) + " ^ " + str(data_2)
    result = data_1 ** data_2
    d_result[result_name] = result
    # d_result.setdefault(result_name, []).append(result)  ## same result as above


def compute_indices(d): # generator function

    for r in d:
        num_pairs = d[str(r)].shape[0]
        for pair_index in range(num_pairs):
            yield r, pair_index


def process(): # main function

    global d_result
    d_result = {}
    r1 = np.array([['ab', 1, 2], ['vw', 10, 12]], dtype=object)
    r2 = np.array([['ac', 1, 3], ['vx', 10, 13]], dtype=object)
    r3 = np.array([['ad', 1, 4], ['vy', 10, 14]], dtype=object)
    r4 = np.array([['ae', 1, 5], ['vz', 10, 15]], dtype=object)
    d = {'r1': r1, 'r2': r2, 'r3': r3, 'r4': r4}
    Parallel(n_jobs=4)(delayed(do_calc)(d, r, pair_index) for r, pair_index in (compute_indices)(d))
    print(d_result)


process()

好的,我知道了。答案和新代码如下:

do_calc() 函数现在生成一个空字典,然后用单个 key:value 对和 returns 字典填充它。

process() 中的并行位默认创建从 do_calc() 返回的列表。所以在并行化 do_calc() 之后我得到的是一个字典列表。

我真正想要的是一个字典,所以使用字典理解我将字典列表转换为字典,wala,她很好!

这有帮助:python convert list of single key dictionaries into a single dictionary

import numpy as np
from joblib import Parallel, delayed


def do_calc(d, r, pair_index):  # calculation function to be run in parallel

    data_1 = d[str(r)][pair_index, 1]
    data_2 = d[str(r)][pair_index, 2]
    result_name = str(data_1) + " ^ " + str(data_2)
    result = data_1 ** data_2
    d_result = {}  # create empty dict
    d_result[result_name] = result  #add key:value pair to dict
    return d_result  # return dict


def compute_indices(d):  # generator function

    for r in d:
        num_pairs = d[str(r)].shape[0]
        for pair_index in range(num_pairs):
            yield r, pair_index


def process():  # main function

    r1 = np.array([['ab', 1, 2], ['vw', 10, 12]], dtype=object)
    r2 = np.array([['ac', 1, 3], ['vx', 10, 13]], dtype=object)
    r3 = np.array([['ad', 1, 4], ['vy', 10, 14]], dtype=object)
    r4 = np.array([['ae', 1, 5], ['vz', 10, 15]], dtype=object)
    d = {'r1': r1, 'r2': r2, 'r3': r3, 'r4': r4}
    # parallelised calc.  Each run returns dict, final output is list of dicts
    d_result = Parallel(n_jobs=4)(delayed(do_calc)(d, r, pair_index) for r, pair_index in (compute_indices)(d))
    # transform list of dicts to dict
    d_result = {k: v for x in d_result for k, v in x.items()}
    print(d_result)

process()

很高兴您的程序可以运行。但是我认为您忽略了一些重要的事情,如果您将您的示例用作更大程序的基础,您可能 运行 会遇到麻烦。

我扫描了 joblib 的文档,发现它是基于 Python 多处理模块构建的。所以 multiprocessing programming guidelines 适用。

起初我不明白为什么你的新程序 运行 成功了,而原来的程序却没有。原因如下(来自上面的 link): "Bear in mind that if code run in a child process tries to access a global variable, then the value it sees (if any) may not be the same as the value in the parent process at the time that Process.start was called." 这是因为每个子进程至少在概念上都有自己的 Python 解释器副本。在每个子进程中,必须导入该进程使用的代码。如果该代码声明了全局变量,那么这两个进程将具有这些全局变量的 单独副本 ,即使在您阅读代码时看起来并非如此。所以当你原来程序的子进程往全局d_result里放数据的时候,其实和父进程里的d_result是一个不同的对象。再次来自文档:“确保主模块可以由新的 Python 解释器安全导入,而不会导致意外的副作用(例如启动新进程)。

例如,在 Windows 运行ning 下,以下模块将因运行时错误而失败:

from multiprocessing import Process

def foo():
    print 'hello'

p = Process(target=foo)
p.start()

相反,应该使用 if __name__ == '__main__' 来保护程序的入口点。"

所以在你的程序(第二个版本)中添加一行代码很重要,就在最后一行之前:

if __name__ == "__main__":
    process()

如果不这样做,可能会导致一些您不想花时间处理的严重错误。