分块并行地从 dask 区域加载数据
Load data from dask area in parellel in chunks
我在 dask 数据框中加载了一个大型 xarray 数据集,其中包含相当大的空间和时间范围内的数据。我想要做的是使用 dask 将这些数据加载到内存中,方法是将其拆分为更小的块并并行加载这些数据。下面是我正在尝试做的示例代码:
import numpy as np
import xarray as xr
def chunk(ds,x_ends,y_ends):
'''
Function which takes a large dataset which has been lazily loaded and specified
indices within the dataset, and cuts out the chunk and loads it into memory.
'''
chunk = ds.isel(x=slice(x_ends[0],x_ends[1]),y=slice(y_vals[0],y_vals[1]))
with ProgressBar():
chunk = chunk.compute()
return chunk
dval = np.random.randint(5,size=[10,100,100])
x = np.linspace(0,100,101,dtype=int)
y = np.linspace(0,100,101,dtype=int)
time = np.linspace(0,10,11,dtype=int)
data = xr.DataArray(dval,coords=[time,x,y],dims=['time','x','y'])
x_vals = np.arange(0,len(data.x),1000)
x_vals = np.append(x_vals,len(data.x))
y_vals = np.arange(0,len(data.y),1000)
y_vals = np.append(y_vals,len(data.y))
for i in range(len(x_vals)-1):
for j in range(len(y_vals)-1):
chunk(data,[x_vals[i],x_vals[i+1]],[y_vals[j],y_vals[j+1]])
这完成了我想要它做的事情,但显然不是并行的,并且不适合双 for 循环。这将插入一个更大的函数,其中数据将应用其他操作。我还意识到示例中的 DataArray 不是暗数组。
我之前尝试使用 dask.distributed 客户端 class,但这破坏了 .compute() 函数。我觉得可能有一个相对简单的答案,我只是在错误的地方寻找。
存在以下函数来制作 Xarray 的 daskified、分块版本:http://xarray.pydata.org/en/stable/generated/xarray.DataArray.chunk.html
那会做你想要的玩具例子。然后对该数据的操作将逐块完成,并且可能很好地并行化。
但是,您通常希望在加载时对数据进行分块,而不是拆分内存中已有的数组。大多数 xarray 加载函数允许您指定 chunks=
,这将自动使内部数据模型变暗,并为您提供并行 and/or 核外处理。 zarr
格式对这种操作特别友好,因为每个数据块都存储在不同的文件中,并且可以根据需要从远程存储系统无缝加载。
我在 dask 数据框中加载了一个大型 xarray 数据集,其中包含相当大的空间和时间范围内的数据。我想要做的是使用 dask 将这些数据加载到内存中,方法是将其拆分为更小的块并并行加载这些数据。下面是我正在尝试做的示例代码:
import numpy as np
import xarray as xr
def chunk(ds,x_ends,y_ends):
'''
Function which takes a large dataset which has been lazily loaded and specified
indices within the dataset, and cuts out the chunk and loads it into memory.
'''
chunk = ds.isel(x=slice(x_ends[0],x_ends[1]),y=slice(y_vals[0],y_vals[1]))
with ProgressBar():
chunk = chunk.compute()
return chunk
dval = np.random.randint(5,size=[10,100,100])
x = np.linspace(0,100,101,dtype=int)
y = np.linspace(0,100,101,dtype=int)
time = np.linspace(0,10,11,dtype=int)
data = xr.DataArray(dval,coords=[time,x,y],dims=['time','x','y'])
x_vals = np.arange(0,len(data.x),1000)
x_vals = np.append(x_vals,len(data.x))
y_vals = np.arange(0,len(data.y),1000)
y_vals = np.append(y_vals,len(data.y))
for i in range(len(x_vals)-1):
for j in range(len(y_vals)-1):
chunk(data,[x_vals[i],x_vals[i+1]],[y_vals[j],y_vals[j+1]])
这完成了我想要它做的事情,但显然不是并行的,并且不适合双 for 循环。这将插入一个更大的函数,其中数据将应用其他操作。我还意识到示例中的 DataArray 不是暗数组。
我之前尝试使用 dask.distributed 客户端 class,但这破坏了 .compute() 函数。我觉得可能有一个相对简单的答案,我只是在错误的地方寻找。
存在以下函数来制作 Xarray 的 daskified、分块版本:http://xarray.pydata.org/en/stable/generated/xarray.DataArray.chunk.html 那会做你想要的玩具例子。然后对该数据的操作将逐块完成,并且可能很好地并行化。
但是,您通常希望在加载时对数据进行分块,而不是拆分内存中已有的数组。大多数 xarray 加载函数允许您指定 chunks=
,这将自动使内部数据模型变暗,并为您提供并行 and/or 核外处理。 zarr
格式对这种操作特别友好,因为每个数据块都存储在不同的文件中,并且可以根据需要从远程存储系统无缝加载。