似乎 multiprocessing.pool 两次采用相同的参数
It seems that multiprocessing.pool take the same argument two times
我正在使用 multiprocessing.pool
并行执行多个集成。
在这个程序中,我通过生成 dW
3D 阵列,为不同的噪声实现整合了一个运动方程。程序的第一部分只是定义参数和生成计算所需的数组。
我在函数外部生成 dW
,因为我知道否则我必须在每个过程中重新播种才能获得相同的随机序列。
Euler(replica)
函数是必须并行化的函数。这包括一个 for
循环,用于数值积分的单个过程。 arg replica
是存储在 "replicas" 数组中的我的系统副本的编号,这是在 pool.map
.
中传递的参数
import numpy as np
from multiprocessing import Pool
# parameters
N = 30 # number of sites
T = 1 # total time
dt = 0.1 # time step
l = 0 # initially localized state on site l
e = 0.0 # site energy
v = 1.0 # hopping coefficient
mu, sigma = 0, 1.0 # average and variance of the gaussian distribution
num_replicas = 8 # number of replicas of the system
processes=2 # number of processes
# identity vector which represents the diagonal of the Hamiltonian
E = np.ones(N) * e
# vector which represents the upper/lower diagonal terms of the Hopping Matrix and the Hamiltonian
V = np.ones(N-1) * v
# definition of the tight-binding Hamiltonian (tridiagonal)
H = np.diag(E) + np.diag(V, k=1) + np.diag(V, k=-1)
# corner elements of the Hamiltonian
H[0, -1] = v
H[-1, 0] = v
# time array
time_array = np.arange(0, T, dt)
# site array
site_array = np.arange(N)
# initial state
psi_0 = np.zeros((N), dtype=complex)
psi_0[l] = 1. + 0.j
#initialization of the state array
Psi = np.zeros((len(time_array), N), dtype=complex)
Psi[0,:] = psi_0
# replicas 1D array
replicas = np.arange(0, num_replicas)
# random 2D array
dW = np.random.normal(mu, 1.0, (len(time_array), num_replicas, N)) * np.sqrt(dt)
def Euler(replica):
psi_0 = np.zeros((N), dtype=complex)
psi_0[l] = 1. + 0.j
psi = psi_0
for i in np.arange(1, len(time_array)):
psi += -1.j * (H @ psi) * dt - 1.j * sigma * psi * dW[i,replica,:] - 0.5 * (sigma**2) * psi * dt
psi /= np.sqrt(psi @ np.conj(psi))
Psi[i,:] = psi
return Psi
pool = Pool(processes)
Psi = pool.map(Euler, replicas)
Psi = np.asarray(Psi)
Psi = np.swapaxes(Psi,0,1)
print(Psi)
凭经验我发现,如果num_replicas > 4 * processes
如pool.map
函数中所表达的那样,似乎两个进程采用相同的参数,就好像相同的计算重复了两次一样。相反,从 'num_replicas <= 4*processes` 我得到了预期的结果:每个进程都不同于其他进程。
这不是由于随机矩阵 dW
的生成,因为每一行都是不相关的,所以我将此行为归因于我对 multiprocessing.pool
.
的使用
我认为您应该在 Euler 函数中初始化 psi_0 和 "Psi"。
我试图重现您的结果,事实上,我发现当 num_replicas > 4 * processes
时您从多个处理器获得相同的结果。但我认为这是因为 Psi,在你的例子中,它是一个全局变量。
按如下方式修改代码,每个 num_replicas 都会给出不同的结果(顺便说一下,为什么要使用 site_array?它没有在任何地方使用)。
import numpy as np
from multiprocessing import Pool
# parameters
N = 3 # number of sites
T = 1 # total time
dt = 0.1 # time step
l = 0 # initially localized state on site l
e = 0.0 # site energy
v = 1.0 # hopping coefficient
mu, sigma = 0, 1.0 # average and variance of the gaussian distribution
num_replicas = 10 # number of replicas of the system
processes=2 # number of processes
# identity vector which represents the diagonal of the Hamiltonian
E = np.ones(N) * e
# vector which represents the upper/lower diagonal terms of the Hopping Matrix and the Hamiltonian
V = np.ones(N-1) * v
# definition of the tight-binding Hamiltonian (tridiagonal)
H = np.diag(E) + np.diag(V, k=1) + np.diag(V, k=-1)
# corner elements of the Hamiltonian
H[0, -1] = v
H[-1, 0] = v
# time array
time_array = np.arange(0, T, dt)
## site array
#site_array = np.arange(N)
# replicas 1D array
replicas = np.arange(0, num_replicas)
# random 2D array
dW = np.random.normal(mu, 1.0, (len(time_array), num_replicas, N)) * np.sqrt(dt)
#dW = np.random.normal(mu, 1.0, (len(time_array), num_replicas, N)) * np.sqrt(dt)
def Euler(replica):
# initial state
psi_0 = np.zeros((N), dtype=complex)
psi_0[l] = 1. + 0.j
#initialization of the state array
Psi = np.zeros((len(time_array), N), dtype=complex)
Psi[0,:] = psi_0
psi_0 = np.zeros((N), dtype=complex)
psi_0[l] = 1. + 0.j
psi = psi_0
# print(dW[0,replica,0])
for i in np.arange(1, len(time_array)):
psi += -1.j * (H @ psi) * dt - 1.j * sigma * psi * dW[i,replica,:] - 0.5 * (sigma**2) * psi * dt
psi /= np.sqrt(psi @ np.conj(psi))
Psi[i,:] = psi
return Psi
pool = Pool(processes)
Psi = pool.map(Euler, replicas)
Psi = np.asarray(Psi)
Psi = np.swapaxes(Psi,0,1)
print(Psi)
正如@Fabrizio 指出的那样,Psi
在 Euler
的调用之间共享。这通常是一件坏事,也是为什么 "global mutable state" 不是一个好主意的另一个例子。事情太容易以意想不到的方式崩溃
在这种情况下它失败的原因是微妙的,并且由于在每个过程中 Pool.map
accumulates several results 在酸洗它们并将它们 return 到 parent/controlling 过程之前的方式。您可以通过将 map
的 chunksize
参数设置为 1
来看到这一点,使其立即产生 return 结果,因此不会覆盖中间结果
它等效于以下最小工作示例:
from multiprocessing import Pool
arr = [None]
def mutate_global(i):
arr[0] = i
return arr
with Pool(2) as pool:
out = pool.map(mutate_global, range(10), chunksize=5)
print(out)
上次我 运行 我得到了这个:
[[4], [4], [4], [4], [4], [9], [9], [9], [9], [9]]
您可以更改 chunksize
参数以了解它在做什么,或者 运行 使用以下版本:
def mutate_local(i):
arr = [None]
arr[0] = i
return arr
"just works",并且与@Fabrizio 描述的在 Euler
中创建 Phi
而不是使用单个全局变量
我正在使用 multiprocessing.pool
并行执行多个集成。
在这个程序中,我通过生成 dW
3D 阵列,为不同的噪声实现整合了一个运动方程。程序的第一部分只是定义参数和生成计算所需的数组。
我在函数外部生成 dW
,因为我知道否则我必须在每个过程中重新播种才能获得相同的随机序列。
Euler(replica)
函数是必须并行化的函数。这包括一个 for
循环,用于数值积分的单个过程。 arg replica
是存储在 "replicas" 数组中的我的系统副本的编号,这是在 pool.map
.
import numpy as np
from multiprocessing import Pool
# parameters
N = 30 # number of sites
T = 1 # total time
dt = 0.1 # time step
l = 0 # initially localized state on site l
e = 0.0 # site energy
v = 1.0 # hopping coefficient
mu, sigma = 0, 1.0 # average and variance of the gaussian distribution
num_replicas = 8 # number of replicas of the system
processes=2 # number of processes
# identity vector which represents the diagonal of the Hamiltonian
E = np.ones(N) * e
# vector which represents the upper/lower diagonal terms of the Hopping Matrix and the Hamiltonian
V = np.ones(N-1) * v
# definition of the tight-binding Hamiltonian (tridiagonal)
H = np.diag(E) + np.diag(V, k=1) + np.diag(V, k=-1)
# corner elements of the Hamiltonian
H[0, -1] = v
H[-1, 0] = v
# time array
time_array = np.arange(0, T, dt)
# site array
site_array = np.arange(N)
# initial state
psi_0 = np.zeros((N), dtype=complex)
psi_0[l] = 1. + 0.j
#initialization of the state array
Psi = np.zeros((len(time_array), N), dtype=complex)
Psi[0,:] = psi_0
# replicas 1D array
replicas = np.arange(0, num_replicas)
# random 2D array
dW = np.random.normal(mu, 1.0, (len(time_array), num_replicas, N)) * np.sqrt(dt)
def Euler(replica):
psi_0 = np.zeros((N), dtype=complex)
psi_0[l] = 1. + 0.j
psi = psi_0
for i in np.arange(1, len(time_array)):
psi += -1.j * (H @ psi) * dt - 1.j * sigma * psi * dW[i,replica,:] - 0.5 * (sigma**2) * psi * dt
psi /= np.sqrt(psi @ np.conj(psi))
Psi[i,:] = psi
return Psi
pool = Pool(processes)
Psi = pool.map(Euler, replicas)
Psi = np.asarray(Psi)
Psi = np.swapaxes(Psi,0,1)
print(Psi)
凭经验我发现,如果num_replicas > 4 * processes
如pool.map
函数中所表达的那样,似乎两个进程采用相同的参数,就好像相同的计算重复了两次一样。相反,从 'num_replicas <= 4*processes` 我得到了预期的结果:每个进程都不同于其他进程。
这不是由于随机矩阵 dW
的生成,因为每一行都是不相关的,所以我将此行为归因于我对 multiprocessing.pool
.
我认为您应该在 Euler 函数中初始化 psi_0 和 "Psi"。
我试图重现您的结果,事实上,我发现当 num_replicas > 4 * processes
时您从多个处理器获得相同的结果。但我认为这是因为 Psi,在你的例子中,它是一个全局变量。
按如下方式修改代码,每个 num_replicas 都会给出不同的结果(顺便说一下,为什么要使用 site_array?它没有在任何地方使用)。
import numpy as np
from multiprocessing import Pool
# parameters
N = 3 # number of sites
T = 1 # total time
dt = 0.1 # time step
l = 0 # initially localized state on site l
e = 0.0 # site energy
v = 1.0 # hopping coefficient
mu, sigma = 0, 1.0 # average and variance of the gaussian distribution
num_replicas = 10 # number of replicas of the system
processes=2 # number of processes
# identity vector which represents the diagonal of the Hamiltonian
E = np.ones(N) * e
# vector which represents the upper/lower diagonal terms of the Hopping Matrix and the Hamiltonian
V = np.ones(N-1) * v
# definition of the tight-binding Hamiltonian (tridiagonal)
H = np.diag(E) + np.diag(V, k=1) + np.diag(V, k=-1)
# corner elements of the Hamiltonian
H[0, -1] = v
H[-1, 0] = v
# time array
time_array = np.arange(0, T, dt)
## site array
#site_array = np.arange(N)
# replicas 1D array
replicas = np.arange(0, num_replicas)
# random 2D array
dW = np.random.normal(mu, 1.0, (len(time_array), num_replicas, N)) * np.sqrt(dt)
#dW = np.random.normal(mu, 1.0, (len(time_array), num_replicas, N)) * np.sqrt(dt)
def Euler(replica):
# initial state
psi_0 = np.zeros((N), dtype=complex)
psi_0[l] = 1. + 0.j
#initialization of the state array
Psi = np.zeros((len(time_array), N), dtype=complex)
Psi[0,:] = psi_0
psi_0 = np.zeros((N), dtype=complex)
psi_0[l] = 1. + 0.j
psi = psi_0
# print(dW[0,replica,0])
for i in np.arange(1, len(time_array)):
psi += -1.j * (H @ psi) * dt - 1.j * sigma * psi * dW[i,replica,:] - 0.5 * (sigma**2) * psi * dt
psi /= np.sqrt(psi @ np.conj(psi))
Psi[i,:] = psi
return Psi
pool = Pool(processes)
Psi = pool.map(Euler, replicas)
Psi = np.asarray(Psi)
Psi = np.swapaxes(Psi,0,1)
print(Psi)
正如@Fabrizio 指出的那样,Psi
在 Euler
的调用之间共享。这通常是一件坏事,也是为什么 "global mutable state" 不是一个好主意的另一个例子。事情太容易以意想不到的方式崩溃
在这种情况下它失败的原因是微妙的,并且由于在每个过程中 Pool.map
accumulates several results 在酸洗它们并将它们 return 到 parent/controlling 过程之前的方式。您可以通过将 map
的 chunksize
参数设置为 1
来看到这一点,使其立即产生 return 结果,因此不会覆盖中间结果
它等效于以下最小工作示例:
from multiprocessing import Pool
arr = [None]
def mutate_global(i):
arr[0] = i
return arr
with Pool(2) as pool:
out = pool.map(mutate_global, range(10), chunksize=5)
print(out)
上次我 运行 我得到了这个:
[[4], [4], [4], [4], [4], [9], [9], [9], [9], [9]]
您可以更改 chunksize
参数以了解它在做什么,或者 运行 使用以下版本:
def mutate_local(i):
arr = [None]
arr[0] = i
return arr
"just works",并且与@Fabrizio 描述的在 Euler
中创建 Phi
而不是使用单个全局变量