在 Python 2.7 中并行化 for 循环
Parallelizing for loop in Python 2.7
我对 Python(和一般编码)还很陌生,我需要帮助并行化下面的代码。我环顾四周,发现了一些可能有用的软件包(例如 Multiprocessing 和 JobLib)。
但是,我在示例中使用它时遇到了问题。我的代码制作了一个输出文件,并在循环中更新它。因此它不能直接paralisable,所以我想我需要制作更小的文件。在此之后,我可以将文件合并在一起。
我找不到办法做到这一点,有好心人给我一个好的开始吗?
感谢您的帮助,
代码新手
代码:
def delta(graph,n,t,nx,OutExt):
fout_=open(OutExt+'Delta'+str(t)+'.txt','w')
temp=nx.Graph(graph)
for u in range(0,n):
#print "stamp: "+str(t)+" node: "+str(u)
for v in range(u+1,n):
#print str(u)+"\t"+str(v)
Stat = dict()
temp.add_edge(u,v)
MineDeltaGraphletTransitionsFromDynamicNetwork(graph,temp,Stat,u,v)
for a in Stat:
for b in Stat[a]:
fout_.write(str(t)+"\t"+str(u)+"\t"+str(v)+"\t"+str(a)+"\t"+str(b)+"\t"+str(Stat[a][b])+"\n")
if not graph.has_edge(u,v):
temp.remove_edge(u,v)
del temp
fout_.close()
最佳使用pool.map。这是一个显示您需要做什么的示例。这里有一个多处理如何与池一起工作的简单示例:
单线程,基本功能:
def f(x):
return x*x
if __name__ == '__main__':
print(map(f, [1, 2, 3]))
>> [1, 4, 9]
使用多个处理器:
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
p = Pool(3) # 3 parallel pools
print(p.map(f, [1, 2, 3]))
使用 1 个处理器
from multiprocessing.pool import ThreadPool as Pool
def f(x):
return x*x
if __name__ == '__main__':
p = Pool(3) # 3 parallel pools
print(p.map(f, [1, 2, 3]))
当您使用 map 时,您可以轻松地从函数的结果中返回一个列表。
首先,找到您希望能够与某些东西(可能与同一函数的其他调用)并行执行的代码部分。然后,弄清楚如何让这段代码不与其他任何东西共享可变状态。
可变状态是并行执行的敌人。如果两段代码并行执行并共享可变状态,您将不知道结果会是什么(并且每次 运行 程序的结果都会不同)。这是因为您不知道并行执行中的代码将 运行 的顺序是什么。也许第一个会改变一些东西,然后第二个会计算一些东西。或者也许第二个会计算一些东西,然后第一个会改变它。谁知道?这个问题有解决方案,但它们涉及 fine-grained 锁定和仔细推理可以更改的内容和时间。
在你拥有一个核心不共享可变状态的算法后,将其分解为一个单独的函数(将局部变量转换为参数)。
最后,使用类似 threading
(如果您的计算主要在具有良好 GIL 行为的 CPython 扩展模块中)或 multiprocessing
(否则)模块来执行算法核心函数(您已经抽象出来)在某种程度上的并行性。
您共享的特定代码示例是一个挑战,因为您使用了 NetworkX 库和大量共享的可变状态。显然,循环的每次迭代都取决于前一次的结果。这显然不是您 可以 并行化的东西。然而,也许如果你更抽象地考虑你的目标,你将能够想出一种方法来实现它(记住,关键是能够表达你的算法 without using shared mutable州)。
您的函数名为 delta
。或许您可以将图表拆分为 sub-graphs 并并行计算每个图表的增量(现在不再 共享 )。
如果最外层循环中的代码是并发安全的(我不知道是否是),您可以像这样重写它以并行执行:
from multiprocessing import Pool
def do_one_step(nx, graph, n, t, OutExt, u):
# Create a separate output file for this set of results.
name = "{}Delta{}-{}.txt".format(OutExt, t, u)
fout_ = open(name, 'w')
temp = nx.Graph(graph)
for v in range(u+1,n):
Stat = dict()
temp.add_edge(u,v)
MineDeltaGraphletTransitionsFromDynamicNetwork(graph,temp,Stat,u,v)
for a in Stat:
for b in Stat[a]:
fout_.write(str(t)+"\t"+str(u)+"\t"+str(v)+"\t"+str(a)+"\t"+str(b)+"\t"+str(Stat[a][b])+"\n")
if not graph.has_edge(u,v):
temp.remove_edge(u,v)
fout_.close()
def delta(graph,n,t,nx,OutExt):
pool = Pool()
pool.map(
partial(
do_one_step,
nx,
graph,
n,
t,
OutExt,
),
range(0,n),
)
这假设所有参数都可以跨进程序列化(对于传递给使用 multiprocessing
调用的函数的任何参数都是必需的)。我怀疑 nx
和 graph
可能有问题,但我不知道它们是什么。
再说一次,这假定并发执行内循环实际上是正确。
我对 Python(和一般编码)还很陌生,我需要帮助并行化下面的代码。我环顾四周,发现了一些可能有用的软件包(例如 Multiprocessing 和 JobLib)。
但是,我在示例中使用它时遇到了问题。我的代码制作了一个输出文件,并在循环中更新它。因此它不能直接paralisable,所以我想我需要制作更小的文件。在此之后,我可以将文件合并在一起。
我找不到办法做到这一点,有好心人给我一个好的开始吗?
感谢您的帮助, 代码新手
代码:
def delta(graph,n,t,nx,OutExt):
fout_=open(OutExt+'Delta'+str(t)+'.txt','w')
temp=nx.Graph(graph)
for u in range(0,n):
#print "stamp: "+str(t)+" node: "+str(u)
for v in range(u+1,n):
#print str(u)+"\t"+str(v)
Stat = dict()
temp.add_edge(u,v)
MineDeltaGraphletTransitionsFromDynamicNetwork(graph,temp,Stat,u,v)
for a in Stat:
for b in Stat[a]:
fout_.write(str(t)+"\t"+str(u)+"\t"+str(v)+"\t"+str(a)+"\t"+str(b)+"\t"+str(Stat[a][b])+"\n")
if not graph.has_edge(u,v):
temp.remove_edge(u,v)
del temp
fout_.close()
最佳使用pool.map。这是一个显示您需要做什么的示例。这里有一个多处理如何与池一起工作的简单示例:
单线程,基本功能:
def f(x):
return x*x
if __name__ == '__main__':
print(map(f, [1, 2, 3]))
>> [1, 4, 9]
使用多个处理器:
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
p = Pool(3) # 3 parallel pools
print(p.map(f, [1, 2, 3]))
使用 1 个处理器
from multiprocessing.pool import ThreadPool as Pool
def f(x):
return x*x
if __name__ == '__main__':
p = Pool(3) # 3 parallel pools
print(p.map(f, [1, 2, 3]))
当您使用 map 时,您可以轻松地从函数的结果中返回一个列表。
首先,找到您希望能够与某些东西(可能与同一函数的其他调用)并行执行的代码部分。然后,弄清楚如何让这段代码不与其他任何东西共享可变状态。
可变状态是并行执行的敌人。如果两段代码并行执行并共享可变状态,您将不知道结果会是什么(并且每次 运行 程序的结果都会不同)。这是因为您不知道并行执行中的代码将 运行 的顺序是什么。也许第一个会改变一些东西,然后第二个会计算一些东西。或者也许第二个会计算一些东西,然后第一个会改变它。谁知道?这个问题有解决方案,但它们涉及 fine-grained 锁定和仔细推理可以更改的内容和时间。
在你拥有一个核心不共享可变状态的算法后,将其分解为一个单独的函数(将局部变量转换为参数)。
最后,使用类似 threading
(如果您的计算主要在具有良好 GIL 行为的 CPython 扩展模块中)或 multiprocessing
(否则)模块来执行算法核心函数(您已经抽象出来)在某种程度上的并行性。
您共享的特定代码示例是一个挑战,因为您使用了 NetworkX 库和大量共享的可变状态。显然,循环的每次迭代都取决于前一次的结果。这显然不是您 可以 并行化的东西。然而,也许如果你更抽象地考虑你的目标,你将能够想出一种方法来实现它(记住,关键是能够表达你的算法 without using shared mutable州)。
您的函数名为 delta
。或许您可以将图表拆分为 sub-graphs 并并行计算每个图表的增量(现在不再 共享 )。
如果最外层循环中的代码是并发安全的(我不知道是否是),您可以像这样重写它以并行执行:
from multiprocessing import Pool
def do_one_step(nx, graph, n, t, OutExt, u):
# Create a separate output file for this set of results.
name = "{}Delta{}-{}.txt".format(OutExt, t, u)
fout_ = open(name, 'w')
temp = nx.Graph(graph)
for v in range(u+1,n):
Stat = dict()
temp.add_edge(u,v)
MineDeltaGraphletTransitionsFromDynamicNetwork(graph,temp,Stat,u,v)
for a in Stat:
for b in Stat[a]:
fout_.write(str(t)+"\t"+str(u)+"\t"+str(v)+"\t"+str(a)+"\t"+str(b)+"\t"+str(Stat[a][b])+"\n")
if not graph.has_edge(u,v):
temp.remove_edge(u,v)
fout_.close()
def delta(graph,n,t,nx,OutExt):
pool = Pool()
pool.map(
partial(
do_one_step,
nx,
graph,
n,
t,
OutExt,
),
range(0,n),
)
这假设所有参数都可以跨进程序列化(对于传递给使用 multiprocessing
调用的函数的任何参数都是必需的)。我怀疑 nx
和 graph
可能有问题,但我不知道它们是什么。
再说一次,这假定并发执行内循环实际上是正确。