使用 dask.distributed 强制或显式数据重新平衡
Forced or explicit data rebalancing with dask.distributed
我有一个 Dask-MPI 集群,有 4 个工作人员,一个 3D 网格数据集加载到 Dask 数组中,并分成 4 个块。我的应用程序要求我 运行 每个工作人员恰好完成一项任务,最好是每个任务一个块。我遇到的问题是以可靠、可重现的方式让块分布在整个集群中。具体来说,如果我 运行 array.map_blocks(foo),foo 运行s 在每个块的同一个工人上。
Client.rebalance() 似乎应该做我想做的事,但它仍然将所有或大部分块留在同一个工人身上。作为测试,我尝试将数据重新分块为 128 个块并再次 运行ning,这导致 7 或 8 个块移动到不同的数据集。这暗示 Dask 正在使用启发式方法来决定何时自动移动块,但没有给我一种强制均匀分布块的方法。
这是我一直在尝试的一个简单测试脚本(连接到具有 4 workers/ranks 的集群)。
#connect to the Dask scheduler
from dask.distributed import Client, Sub, Pub, fire_and_forget
client = Client(scheduler_file='../scheduler.json', set_as_default=True)
#load data into a numpy array
import numpy as np
npvol = np.array(np.fromfile('/home/nleaf/data/RegGrid/Vorts_t50_128x128x128_f32.raw', dtype=np.float32))
npvol = npvol.reshape([128,128,128])
#convert numpy array to a dask array
import dask.array as da
ar = da.from_array(npvol).rechunk([npvol.shape[0], npvol.shape[1], npvol.shape[2]/N])
def test(ar):
from mpi4py import MPI
rank = MPI.COMM_WORLD.Get_rank()
return np.array([rank], ndmin=3, dtype=np.int)
client.rebalance()
print(client.persist(ar.map_blocks(test, chunks=(1,1,1))).compute())
经过几十次测试 运行s,这段代码一次返回了排名 3 的块,否则所有块都在排名 0。
由于您的总数据集不是那么大,对 from_array 的初始调用仅创建一个块,因此它会转到一个工作人员(您可以使用 chunks=
指定其他方式)。如果可能,以下重新分块不希望移动数据。
假设每个工作人员都可以访问您的文件,您最好首先在工作人员中加载块。
你需要像
这样的函数
def get_chunk(fn, offset, count, shape, dtype):
with open(fn, 'rb') as f:
f.seek(offset)
return np.fromfile(f, dtype=dtype, count=count).reshape(shape)
并为每个块传递不同的偏移量。
parts = [da.from_delayed(dask.delayed(get_chunk)(fn, offset, count, shape, dtype), shape, dtype) for offset in [...]]
arr = da.concat(parts)
这与 npy source in Intake, code: https://github.com/intake/intake/blob/master/intake/source/npy.py#L11
自动完成的非常相似
我有一个 Dask-MPI 集群,有 4 个工作人员,一个 3D 网格数据集加载到 Dask 数组中,并分成 4 个块。我的应用程序要求我 运行 每个工作人员恰好完成一项任务,最好是每个任务一个块。我遇到的问题是以可靠、可重现的方式让块分布在整个集群中。具体来说,如果我 运行 array.map_blocks(foo),foo 运行s 在每个块的同一个工人上。
Client.rebalance() 似乎应该做我想做的事,但它仍然将所有或大部分块留在同一个工人身上。作为测试,我尝试将数据重新分块为 128 个块并再次 运行ning,这导致 7 或 8 个块移动到不同的数据集。这暗示 Dask 正在使用启发式方法来决定何时自动移动块,但没有给我一种强制均匀分布块的方法。
这是我一直在尝试的一个简单测试脚本(连接到具有 4 workers/ranks 的集群)。
#connect to the Dask scheduler
from dask.distributed import Client, Sub, Pub, fire_and_forget
client = Client(scheduler_file='../scheduler.json', set_as_default=True)
#load data into a numpy array
import numpy as np
npvol = np.array(np.fromfile('/home/nleaf/data/RegGrid/Vorts_t50_128x128x128_f32.raw', dtype=np.float32))
npvol = npvol.reshape([128,128,128])
#convert numpy array to a dask array
import dask.array as da
ar = da.from_array(npvol).rechunk([npvol.shape[0], npvol.shape[1], npvol.shape[2]/N])
def test(ar):
from mpi4py import MPI
rank = MPI.COMM_WORLD.Get_rank()
return np.array([rank], ndmin=3, dtype=np.int)
client.rebalance()
print(client.persist(ar.map_blocks(test, chunks=(1,1,1))).compute())
经过几十次测试 运行s,这段代码一次返回了排名 3 的块,否则所有块都在排名 0。
由于您的总数据集不是那么大,对 from_array 的初始调用仅创建一个块,因此它会转到一个工作人员(您可以使用 chunks=
指定其他方式)。如果可能,以下重新分块不希望移动数据。
假设每个工作人员都可以访问您的文件,您最好首先在工作人员中加载块。
你需要像
这样的函数def get_chunk(fn, offset, count, shape, dtype):
with open(fn, 'rb') as f:
f.seek(offset)
return np.fromfile(f, dtype=dtype, count=count).reshape(shape)
并为每个块传递不同的偏移量。
parts = [da.from_delayed(dask.delayed(get_chunk)(fn, offset, count, shape, dtype), shape, dtype) for offset in [...]]
arr = da.concat(parts)
这与 npy source in Intake, code: https://github.com/intake/intake/blob/master/intake/source/npy.py#L11
自动完成的非常相似