Dask 和 numpy - numpy 数组和 dask 数组之间的缓慢转换
Dask and numpy - slow conversion between numpy array and dask array
我需要从一个大的 numpy 数组中保存一个 dask 数组。下面是一个显示该过程的最小工作示例。请注意,a
仅针对此 mwe 使用 numpy.random
创建,不幸的是我无法使用 dask 创建数组。
import numpy as np
import dask.array as da
from dask.distributed import Client
a = numpy.random.randint(0,2,size=4000000*8000).reshape((4000000,8000))
# here the conversion and saving
client = Client(n_workers=90, threads_per_worker=20, processes=True)
dask_array = da.from_array( a, chunks = 100000)
da.to_npy_stack(‘my-folder/’, dask_array)
client.close()
我面临的问题是 a
内存占用大约 100GB,但是当 运行 dask 部分使用的内存开始上升,直到它几乎填满可用 ram,即超过 300GB。然后它进行一些计算,一段时间后(比如 10 分钟)出现内存错误。我需要 dask 保存的数组,因为我有另一个使用 dask 数组的管道(不能直接连接到该管道),要从内存中读取 dask 数组,需要 info
文件(如果有任何其他方法)转储数组并创建 info
文件我打开试试)。
关于如何加速和解决这个任务有什么建议吗?
在主进程中创建所有数据,然后将其上传到工作进程是一个坏主意!您应该始终努力 load/create 直接在 worker 中处理数据,这将 a) 避免重复工作和复制数据 b) 保持数据惰性,仅在需要时将其具体化到内存中:
在这种情况下,这可能看起来像
arr = da.random.randint(0, 2, size=4000000*8000, chunks=100000).reshape((4000000,8000))
如果您在一台机器上,那么我建议使用标准线程调度程序而不是 dask.distributed.Client
。您将以这种方式将所有数据保存在同一个进程中,并且无需复制大型 Numpy 数组。
我需要从一个大的 numpy 数组中保存一个 dask 数组。下面是一个显示该过程的最小工作示例。请注意,a
仅针对此 mwe 使用 numpy.random
创建,不幸的是我无法使用 dask 创建数组。
import numpy as np
import dask.array as da
from dask.distributed import Client
a = numpy.random.randint(0,2,size=4000000*8000).reshape((4000000,8000))
# here the conversion and saving
client = Client(n_workers=90, threads_per_worker=20, processes=True)
dask_array = da.from_array( a, chunks = 100000)
da.to_npy_stack(‘my-folder/’, dask_array)
client.close()
我面临的问题是 a
内存占用大约 100GB,但是当 运行 dask 部分使用的内存开始上升,直到它几乎填满可用 ram,即超过 300GB。然后它进行一些计算,一段时间后(比如 10 分钟)出现内存错误。我需要 dask 保存的数组,因为我有另一个使用 dask 数组的管道(不能直接连接到该管道),要从内存中读取 dask 数组,需要 info
文件(如果有任何其他方法)转储数组并创建 info
文件我打开试试)。
关于如何加速和解决这个任务有什么建议吗?
在主进程中创建所有数据,然后将其上传到工作进程是一个坏主意!您应该始终努力 load/create 直接在 worker 中处理数据,这将 a) 避免重复工作和复制数据 b) 保持数据惰性,仅在需要时将其具体化到内存中:
在这种情况下,这可能看起来像
arr = da.random.randint(0, 2, size=4000000*8000, chunks=100000).reshape((4000000,8000))
如果您在一台机器上,那么我建议使用标准线程调度程序而不是 dask.distributed.Client
。您将以这种方式将所有数据保存在同一个进程中,并且无需复制大型 Numpy 数组。