Dask 表演:工作流疑惑

Dask performances: workflow doubts

我对如何从 dask 中获得最佳效果感到困惑。

问题 我有一个包含多个时间序列的数据框(每个时间序列都有自己的 key),我需要在每个时间序列上 运行 一个函数 my_fun。用 pandas 解决它的一种方法涉及 df = list(df.groupby("key")) 然后应用 my_fun 与多处理。尽管 RAM 使用量很大,但性能在我的机器上非常好,在 google 云计算上很糟糕。

在 Dask 上,我当前的工作流程是:

import dask.dataframe as dd
from dask.multiprocessing import get
  1. 从S3读取数据。 14 个文件 -> 14 个分区
  2. `df.groupby("key").apply(my_fun).to_frame.compute(get=get)

因为我没有设置索引 df.known_divisionsFalse

结果图是 我不明白我看到的是否是瓶颈。

问题:

  1. df.npartitionsncpu 的倍数更好还是无所谓?
  2. this看来还是把index作为key比较好。我猜我可以做类似

    的事情

    df["key2"] = df["key"] df = df.set_index("key2")

但是,同样,我不知道这是否是最好的方法。

对于像 Dask 中的 "what is taking time" 这样的问题,通常建议您使用 "distributed" scheduler 而不是多处理 - 您可以 运行 使用任意数量的 processes/threads ,但您可以通过诊断仪表板获得更多信息。

对于您的具体问题,如果您对一个没有很好地在分区之间拆分的列进行分组并应用除简单聚合之外的任何其他内容,您将不可避免地需要洗牌。设置索引会作为一个显式步骤为您执行此洗牌,或者您会在任务图中看到隐式洗牌。这是一个多对多的操作,每个聚合任务都需要来自每个原始分区的输入,因此存在瓶颈。这是无可避免的。

至于分区数量,是的,您可以有次优条件,例如 8 个核心上的 9 个分区(您将计算 8 个任务,然后可能在一个核心上阻塞最后一个任务,而其他核心空闲);但一般来说,只要您不使用非常少量的分区,您就可以依靠 dask 做出合理的调度决策。在许多情况下,这并不重要。