Multiprocessing with Xarray and NumPy arrays

So I'm trying to implement the solution already described here, but with a few changes. Rather than just modifying an array with an operation, I'm trying to read from NetCDF files with xarray and then write into a shared numpy array using the multiprocessing module.

I feel like I'm close, but something is going wrong. I've pasted a simple, reproducible copy/paste example below. As you can see, when I run the processes they can all read the files I created, but they don't correctly update the shared numpy array I'm trying to write to. Any help would be appreciated.

Code

import ctypes
import logging
import multiprocessing as mp
import xarray as xr

from contextlib import closing

import numpy as np

info = mp.get_logger().info


def main():

    data = np.arange(10)

    for i in range(4):
        ds = xr.Dataset({'x': data})
        ds.to_netcdf('test_{}.nc'.format(i))

        ds.close()


    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 4, 10
    shared_arr = mp.Array(ctypes.c_float, N * M)
    arr = tonumpyarray(shared_arr, dtype=np.float32)
    arr = arr.reshape((N, M))

    # Fill with zeros and keep a copy for comparison
    arr[:, :] = np.zeros((N, M))
    arr_orig = arr.copy()

    files = ['test_0.nc', 'test_1.nc', 'test_2.nc', 'test_3.nc']

    parameter_tuples = [
        (files[0], 0),
        (files[1], 1),
        (files[2], 2),
        (files[3], 3)
    ]

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access different slices of the same array
        p.map_async(g, parameter_tuples)
    p.join()

    print(arr_orig)
    print(tonumpyarray(shared_arr, np.float32).reshape(N, M))


def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_  # must be inherited, not passed as an argument


def tonumpyarray(mp_arr, dtype=np.float64):
    return np.frombuffer(mp_arr.get_obj(), dtype)


def g(params):
    """no synchronization."""
    print("Current File Name: ", params[0])

    tmp_dataset = xr.open_dataset(params[0])

    print(tmp_dataset["x"].data[:])

    arr = tonumpyarray(shared_arr)
    arr[params[1], :] = tmp_dataset["x"].data[:]

    tmp_dataset.close()


if __name__ == '__main__':
    mp.freeze_support()
    main()

What went wrong?

1. You forgot to reshape back after tonumpyarray, so the view inside g is one-dimensional.
2. You used the wrong dtype in tonumpyarray: the shared array holds c_float (float32), but np.frombuffer defaults to float64.
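
To see the second point concretely, here is a minimal standalone sketch (my illustration, not part of the fix) of what the dtype mismatch does. The shared buffer holds 40 four-byte c_float values (160 bytes); np.frombuffer with its float64 default reinterprets those bytes as 20 eight-byte floats, so row indexing as done in g raises an IndexError in every worker, and those exceptions are silently discarded because the map_async result is never retrieved. That is why the processes appear to read fine but never update the array.

import ctypes
import multiprocessing as mp

import numpy as np

# 4 x 10 float32 shared array: 40 elements * 4 bytes = 160 bytes.
shared = mp.Array(ctypes.c_float, 4 * 10)

# Wrong: np.frombuffer defaults to float64, so the 160 bytes are read
# as 20 eight-byte floats -- wrong length, meaningless values.
wrong = np.frombuffer(shared.get_obj(), dtype=np.float64)
print(wrong.shape)   # (20,) -- cannot even be reshaped to (4, 10)

# wrong[0, :] is effectively what g() did: two indices into a 1-D
# view raise IndexError, which the pool swallowed silently.

# Right: matching the dtype recovers all 40 elements, which reshape
# cleanly to the intended (4, 10) layout.
right = np.frombuffer(shared.get_obj(), dtype=np.float32).reshape(4, 10)
print(right.shape)   # (4, 10)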

Code

import ctypes
import logging
import multiprocessing as mp
import xarray as xr

from contextlib import closing

import numpy as np

info = mp.get_logger().info


def main():

    data = np.arange(10)

    for i in range(4):
        ds = xr.Dataset({'x': data})
        ds.to_netcdf('test_{}.nc'.format(i))

        ds.close()


    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 4, 10
    shared_arr = mp.Array(ctypes.c_float, N * M)
    arr = tonumpyarray(shared_arr, dtype=np.float32)
    arr = arr.reshape((N, M))

    # Fill with zeros and keep a copy for comparison
    arr[:, :] = np.zeros((N, M))
    arr_orig = arr.copy()

    files = ['test_0.nc', 'test_1.nc', 'test_2.nc', 'test_3.nc']

    parameter_tuples = [
        (files[0], 0),
        (files[1], 1),
        (files[2], 2),
        (files[3], 3)
    ]

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr, N, M))) as p:
        # many processes access different slices of the same array
        p.map_async(g, parameter_tuples)
    p.join()

    print(arr_orig)
    print(tonumpyarray(shared_arr, np.float32).reshape(N, M))


def init(shared_arr_, N_, M_):    # add shape
    global shared_arr
    global N, M
    shared_arr = shared_arr_  # must be inherited, not passed as an argument
    N = N_
    M = M_


def tonumpyarray(mp_arr, dtype=np.float32):  # change type
    return np.frombuffer(mp_arr.get_obj(), dtype)


def g(params):
    """no synchronization."""
    print("Current File Name: ", params[0])

    tmp_dataset = xr.open_dataset(params[0])

    print(tmp_dataset["x"].data[:])

    arr = tonumpyarray(shared_arr).reshape(N, M)   # reshape
    arr[params[1], :] = tmp_dataset["x"].data[:]

    tmp_dataset.close()


if __name__ == '__main__':
    mp.freeze_support()
    main()
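
As a final hardening step (a suggestion of mine, not part of the fix above), keep the AsyncResult that map_async returns and call .get() on it. The original symptom was invisible precisely because the workers' IndexErrors were never retrieved; .get() re-raises them in the parent process. A sketch of the pool section of main(), assuming the same names as above:

    with closing(mp.Pool(initializer=init, initargs=(shared_arr, N, M))) as p:
        # Keep the AsyncResult; .get() blocks until all tasks finish
        # and re-raises any exception that occurred inside g in a worker.
        result = p.map_async(g, parameter_tuples)
        result.get()
    p.join()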