Dask 分布式调度器和大型函数
Dask Distributed Scheduler and Large Functions
在带有 LocalCluster
的 Dask 分布式调度程序的上下文中:有人可以帮助我理解具有大型(堆)映射函数的动态吗?
例如,考虑 Dask 数据帧 ddf
和 map_partitions
操作:
def mapper():
resource=... #load some large resource eg 50MB
def inner(pdf):
return pdf.apply(lambda x: ..., axis=1)
return inner
mapper_fn = mapper() #50MB on heap
ddf.map_partitions(mapper_fn)
这里发生了什么? Dask 将序列化 mapper_fn
并发送给所有任务?比如说,我有 n
个分区,所以有 n
个任务。
根据经验,我观察到,如果我有 40 个任务和一个 50MB 的映射器,那么开始工作大约需要 70 秒,集群似乎有点满 CPU,但是仪表板什么也没显示。这里发生了什么?在 dish 分布式调度程序中使用大型(堆)函数会产生什么后果?
Dask 使用 cloudpickle 序列化非平凡函数,并在每个任务中包含这些函数的序列化版本。这是非常低效的。我们建议您不要这样做,而是显式传递数据。
resource = ...
ddf.map_partitions(func, resource=resource)
这样会更有效率。
在带有 LocalCluster
的 Dask 分布式调度程序的上下文中:有人可以帮助我理解具有大型(堆)映射函数的动态吗?
例如,考虑 Dask 数据帧 ddf
和 map_partitions
操作:
def mapper():
resource=... #load some large resource eg 50MB
def inner(pdf):
return pdf.apply(lambda x: ..., axis=1)
return inner
mapper_fn = mapper() #50MB on heap
ddf.map_partitions(mapper_fn)
这里发生了什么? Dask 将序列化 mapper_fn
并发送给所有任务?比如说,我有 n
个分区,所以有 n
个任务。
根据经验,我观察到,如果我有 40 个任务和一个 50MB 的映射器,那么开始工作大约需要 70 秒,集群似乎有点满 CPU,但是仪表板什么也没显示。这里发生了什么?在 dish 分布式调度程序中使用大型(堆)函数会产生什么后果?
Dask 使用 cloudpickle 序列化非平凡函数,并在每个任务中包含这些函数的序列化版本。这是非常低效的。我们建议您不要这样做,而是显式传递数据。
resource = ...
ddf.map_partitions(func, resource=resource)
这样会更有效率。