在 Python 2.7 中并行化 for 循环

Parallelizing for loop in Python 2.7

我对 Python(和一般编码)还很陌生,我需要帮助并行化下面的代码。我环顾四周,发现了一些可能有用的软件包(例如 Multiprocessing 和 JobLib)。

但是,我在示例中使用它时遇到了问题。我的代码制作了一个输出文件,并在循环中更新它。因此它不能直接paralisable,所以我想我需要制作更小的文件。在此之后,我可以将文件合并在一起。

我找不到办法做到这一点,有好心人给我一个好的开始吗?

感谢您的帮助, 代码新手

代码:

def delta(graph,n,t,nx,OutExt):
    fout_=open(OutExt+'Delta'+str(t)+'.txt','w')
    temp=nx.Graph(graph)
    for u in range(0,n):
        #print "stamp: "+str(t)+" node: "+str(u)
        for v in range(u+1,n):
            #print str(u)+"\t"+str(v)
            Stat = dict()
            temp.add_edge(u,v)
            MineDeltaGraphletTransitionsFromDynamicNetwork(graph,temp,Stat,u,v)
            for a in Stat:
                for b in Stat[a]:
                    fout_.write(str(t)+"\t"+str(u)+"\t"+str(v)+"\t"+str(a)+"\t"+str(b)+"\t"+str(Stat[a][b])+"\n")
            if not graph.has_edge(u,v):
                temp.remove_edge(u,v)
    del temp
    fout_.close()

最佳使用pool.map。这是一个显示您需要做什么的示例。这里有一个多处理如何与池一起工作的简单示例:

单线程,基本功能:

def f(x):
    return x*x

if __name__ == '__main__':
     print(map(f, [1, 2, 3]))

>> [1, 4, 9]

使用多个处理器:

from multiprocessing import Pool 

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(3) # 3 parallel pools
    print(p.map(f, [1, 2, 3]))

使用 1 个处理器

from multiprocessing.pool import ThreadPool as Pool 

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(3) # 3 parallel pools
    print(p.map(f, [1, 2, 3]))

当您使用 map 时,您可以轻松地从函数的结果中返回一个列表。

首先,找到您希望能够与某些东西(可能与同一函数的其他调用)并行执行的代码部分。然后,弄清楚如何让这段代码与其他任何东西共享可变状态。

可变状态是并行执行的敌人。如果两段代码并行执行并共享可变状态,您将不知道结果会是什么(并且每次 运行 程序的结果都会不同)。这是因为您不知道并行执行中的代码将 运行 的顺序是什么。也许第一个会改变一些东西,然后第二个会计算一些东西。或者也许第二个会计算一些东西,然后第一个会改变它。谁知道?这个问题有解决方案,但它们涉及 fine-grained 锁定和仔细推理可以更改的内容和时间。

在你拥有一个核心不共享可变状态的算法后,将其分解为一个单独的函数(将局部变量转换为参数)。

最后,使用类似 threading(如果您的计算主要在具有良好 GIL 行为的 CPython 扩展模块中)或 multiprocessing(否则)模块来执行算法核心函数(您已经抽象出来)在某种程度上的并行性。

您共享的特定代码示例是一个挑战,因为您使用了 NetworkX 库和大量共享的可变状态。显然,循环的每次迭代都取决于前一次的结果。这显然不是您 可以 并行化的东西。然而,也许如果你更抽象地考虑你的目标,你将能够想出一种方法来实现它(记住,关键是能够表达你的算法 without using shared mutable州)。

您的函数名为 delta。或许您可以将图表拆分为 sub-graphs 并并行计算每个图表的增量(现在不再 共享 )。

如果最外层循环中的代码是并发安全的(我不知道是否是),您可以像这样重写它以并行执行:

from multiprocessing import Pool

def do_one_step(nx, graph, n, t, OutExt, u):
    # Create a separate output file for this set of results.
    name = "{}Delta{}-{}.txt".format(OutExt, t, u)
    fout_ = open(name, 'w')
    temp = nx.Graph(graph)

    for v in range(u+1,n):
        Stat = dict()
        temp.add_edge(u,v)
        MineDeltaGraphletTransitionsFromDynamicNetwork(graph,temp,Stat,u,v)
        for a in Stat:
            for b in Stat[a]:
                fout_.write(str(t)+"\t"+str(u)+"\t"+str(v)+"\t"+str(a)+"\t"+str(b)+"\t"+str(Stat[a][b])+"\n")
        if not graph.has_edge(u,v):
            temp.remove_edge(u,v)
    fout_.close()

def delta(graph,n,t,nx,OutExt):
    pool = Pool()
    pool.map(
        partial(
            do_one_step,
            nx,
            graph,
            n,
            t,
            OutExt,
        ),
        range(0,n),
    )

这假设所有参数都可以跨进程序列化(对于传递给使用 multiprocessing 调用的函数的任何参数都是必需的)。我怀疑 nxgraph 可能有问题,但我不知道它们是什么。

再说一次,这假定并发执行内循环实际上是正确