使用 Xarray 和 Numpy 数组进行多处理
multiprocessing with Xarray and Numpy array
所以我正在尝试实施已经描述过的解决方案 here,但我正在对其进行一些更改。我不是仅仅尝试通过操作更改数组,而是尝试使用 xarray 从 NetCDF 文件中读取,然后使用多处理模块写入共享的 numpy 数组。
我觉得我已经很接近了,但是出了点问题。我在下面粘贴了一个可重现的简单 copy/paste 示例。如您所见,当我 运行 进程时,它们都可以读取我创建的文件,但它们没有正确更新我尝试写入的共享 numpy 数组。任何帮助将不胜感激。
代码
import ctypes
import logging
import multiprocessing as mp
import xarray as xr
from contextlib import closing
import numpy as np
info = mp.get_logger().info
def main():
data = np.arange(10)
for i in range(4):
ds = xr.Dataset({'x': data})
ds.to_netcdf('test_{}.nc'.format(i))
ds.close()
logger = mp.log_to_stderr()
logger.setLevel(logging.INFO)
# create shared array
N, M = 4, 10
shared_arr = mp.Array(ctypes.c_float, N * M)
arr = tonumpyarray(shared_arr, dtype=np.float32)
arr = arr.reshape((N, M))
# Fill with random values
arr[:, :] = np.zeros((N, M))
arr_orig = arr.copy()
files = ['test_0.nc', 'test_1.nc', 'test_2.nc', 'test_3.nc']
parameter_tuples = [
(files[0], 0),
(files[1], 1),
(files[2], 2),
(files[3], 3)
]
# write to arr from different processes
with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
# many processes access different slices of the same array
p.map_async(g, parameter_tuples)
p.join()
print(arr_orig)
print(tonumpyarray(shared_arr, np.float32).reshape(N, M))
def init(shared_arr_):
global shared_arr
shared_arr = shared_arr_ # must be inherited, not passed as an argument
def tonumpyarray(mp_arr, dtype=np.float64):
return np.frombuffer(mp_arr.get_obj(), dtype)
def g(params):
"""no synchronization."""
print("Current File Name: ", params[0])
tmp_dataset = xr.open_dataset(params[0])
print(tmp_dataset["x"].data[:])
arr = tonumpyarray(shared_arr)
arr[params[1], :] = tmp_dataset["x"].data[:]
tmp_dataset.close()
if __name__ == '__main__':
mp.freeze_support()
main()
怎么了?
1.You tonumpyarray
.
后忘记重塑背部
2.You 在 tonumpyarray
中使用了错误的 dtype
。
代码
import ctypes
import logging
import multiprocessing as mp
import xarray as xr
from contextlib import closing
import numpy as np
info = mp.get_logger().info
def main():
data = np.arange(10)
for i in range(4):
ds = xr.Dataset({'x': data})
ds.to_netcdf('test_{}.nc'.format(i))
ds.close()
logger = mp.log_to_stderr()
logger.setLevel(logging.INFO)
# create shared array
N, M = 4, 10
shared_arr = mp.Array(ctypes.c_float, N * M)
arr = tonumpyarray(shared_arr, dtype=np.float32)
arr = arr.reshape((N, M))
# Fill with random values
arr[:, :] = np.zeros((N, M))
arr_orig = arr.copy()
files = ['test_0.nc', 'test_1.nc', 'test_2.nc', 'test_3.nc']
parameter_tuples = [
(files[0], 0),
(files[1], 1),
(files[2], 2),
(files[3], 3)
]
# write to arr from different processes
with closing(mp.Pool(initializer=init, initargs=(shared_arr, N, M))) as p:
# many processes access different slices of the same array
p.map_async(g, parameter_tuples)
p.join()
print(arr_orig)
print(tonumpyarray(shared_arr, np.float32).reshape(N, M))
def init(shared_arr_, N_, M_): # add shape
global shared_arr
global N, M
shared_arr = shared_arr_ # must be inherited, not passed as an argument
N = N_
M = M_
def tonumpyarray(mp_arr, dtype=np.float32): # change type
return np.frombuffer(mp_arr.get_obj(), dtype)
def g(params):
"""no synchronization."""
print("Current File Name: ", params[0])
tmp_dataset = xr.open_dataset(params[0])
print(tmp_dataset["x"].data[:])
arr = tonumpyarray(shared_arr).reshape(N, M) # reshape
arr[params[1], :] = tmp_dataset["x"].data[:]
tmp_dataset.close()
if __name__ == '__main__':
mp.freeze_support()
main()
所以我正在尝试实施已经描述过的解决方案 here,但我正在对其进行一些更改。我不是仅仅尝试通过操作更改数组,而是尝试使用 xarray 从 NetCDF 文件中读取,然后使用多处理模块写入共享的 numpy 数组。
我觉得我已经很接近了,但是出了点问题。我在下面粘贴了一个可重现的简单 copy/paste 示例。如您所见,当我 运行 进程时,它们都可以读取我创建的文件,但它们没有正确更新我尝试写入的共享 numpy 数组。任何帮助将不胜感激。
代码
import ctypes
import logging
import multiprocessing as mp
import xarray as xr
from contextlib import closing
import numpy as np
info = mp.get_logger().info
def main():
data = np.arange(10)
for i in range(4):
ds = xr.Dataset({'x': data})
ds.to_netcdf('test_{}.nc'.format(i))
ds.close()
logger = mp.log_to_stderr()
logger.setLevel(logging.INFO)
# create shared array
N, M = 4, 10
shared_arr = mp.Array(ctypes.c_float, N * M)
arr = tonumpyarray(shared_arr, dtype=np.float32)
arr = arr.reshape((N, M))
# Fill with random values
arr[:, :] = np.zeros((N, M))
arr_orig = arr.copy()
files = ['test_0.nc', 'test_1.nc', 'test_2.nc', 'test_3.nc']
parameter_tuples = [
(files[0], 0),
(files[1], 1),
(files[2], 2),
(files[3], 3)
]
# write to arr from different processes
with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
# many processes access different slices of the same array
p.map_async(g, parameter_tuples)
p.join()
print(arr_orig)
print(tonumpyarray(shared_arr, np.float32).reshape(N, M))
def init(shared_arr_):
global shared_arr
shared_arr = shared_arr_ # must be inherited, not passed as an argument
def tonumpyarray(mp_arr, dtype=np.float64):
return np.frombuffer(mp_arr.get_obj(), dtype)
def g(params):
"""no synchronization."""
print("Current File Name: ", params[0])
tmp_dataset = xr.open_dataset(params[0])
print(tmp_dataset["x"].data[:])
arr = tonumpyarray(shared_arr)
arr[params[1], :] = tmp_dataset["x"].data[:]
tmp_dataset.close()
if __name__ == '__main__':
mp.freeze_support()
main()
怎么了?
1.You tonumpyarray
.
后忘记重塑背部
2.You 在 tonumpyarray
中使用了错误的 dtype
。
代码
import ctypes
import logging
import multiprocessing as mp
import xarray as xr
from contextlib import closing
import numpy as np
info = mp.get_logger().info
def main():
data = np.arange(10)
for i in range(4):
ds = xr.Dataset({'x': data})
ds.to_netcdf('test_{}.nc'.format(i))
ds.close()
logger = mp.log_to_stderr()
logger.setLevel(logging.INFO)
# create shared array
N, M = 4, 10
shared_arr = mp.Array(ctypes.c_float, N * M)
arr = tonumpyarray(shared_arr, dtype=np.float32)
arr = arr.reshape((N, M))
# Fill with random values
arr[:, :] = np.zeros((N, M))
arr_orig = arr.copy()
files = ['test_0.nc', 'test_1.nc', 'test_2.nc', 'test_3.nc']
parameter_tuples = [
(files[0], 0),
(files[1], 1),
(files[2], 2),
(files[3], 3)
]
# write to arr from different processes
with closing(mp.Pool(initializer=init, initargs=(shared_arr, N, M))) as p:
# many processes access different slices of the same array
p.map_async(g, parameter_tuples)
p.join()
print(arr_orig)
print(tonumpyarray(shared_arr, np.float32).reshape(N, M))
def init(shared_arr_, N_, M_): # add shape
global shared_arr
global N, M
shared_arr = shared_arr_ # must be inherited, not passed as an argument
N = N_
M = M_
def tonumpyarray(mp_arr, dtype=np.float32): # change type
return np.frombuffer(mp_arr.get_obj(), dtype)
def g(params):
"""no synchronization."""
print("Current File Name: ", params[0])
tmp_dataset = xr.open_dataset(params[0])
print(tmp_dataset["x"].data[:])
arr = tonumpyarray(shared_arr).reshape(N, M) # reshape
arr[params[1], :] = tmp_dataset["x"].data[:]
tmp_dataset.close()
if __name__ == '__main__':
mp.freeze_support()
main()