Dask 分布式调度器和大型函数

Dask Distributed Scheduler and Large Functions

在带有 LocalCluster 的 Dask 分布式调度程序的上下文中:有人可以帮助我理解具有大型(堆)映射函数的动态吗?

例如,考虑 Dask 数据帧 ddfmap_partitions 操作:

def mapper():
  resource=... #load some large resource eg 50MB

  def inner(pdf):
    return pdf.apply(lambda x: ..., axis=1)

  return inner

mapper_fn = mapper() #50MB on heap
ddf.map_partitions(mapper_fn)

这里发生了什么? Dask 将序列化 mapper_fn 并发送给所有任务?比如说,我有 n 个分区,所以有 n 个任务。

根据经验,我观察到,如果我有 40 个任务和一个 50MB 的映射器,那么开始工作大约需要 70 秒,集群似乎有点满 CPU,但是仪表板什么也没显示。这里发生了什么?在 dish 分布式调度程序中使用大型(堆)函数会产生什么后果?

Dask 使用 cloudpickle 序列化非平凡函数,并在每个任务中包含这些函数的序列化版本。这是非常低效的。我们建议您不要这样做,而是显式传递数据。

resource = ...

ddf.map_partitions(func, resource=resource)

这样会更有效率。