如何确定重复(意外)`repartition-split-repartition-merge` 任务的原因?

How do I identify the cause of repeated (unexpected) `repartition-split-repartition-merge` tasks?

在 Dask 通过 ddf.visualize() 输出的任务图中,我看到许多 *-repartition-split-repartition-merge 任务,其中 * 可能是 joinrename 或一些我从我的应用程序中识别出的其他任务。

我正在尝试确定这些数据的来源,它们是否会对性能产生影响(我经常想象 repartitioning/splitting/merging 数据会产生成本,但不会直接帮助我的计算目标),如果会,如何影响我可以删除它们。

它们似乎在 distributed 提供的 performance_report 中支配计算时间。

查看 Dask 源代码,我可以在 dask.dataframe.core 源代码中看到 DataFrame 方法 repartition 将这些值放入 HighLevelGraph:

[...]
        tmp = "repartition-split-" + token
        out = "repartition-merge-" + token
        dsk = repartition_divisions(
            df.divisions, divisions, df._name, tmp, out, force=force
        )
        graph = HighLevelGraph.from_collections(out, dsk, dependencies=[df])
        return new_dd_object(graph, out, df._meta, divisions)
[...]

鉴于我并没有特别要求 Dask 在我的应用程序中重新分区,我如何才能找出是什么触发了这个?

我试过在这段 Dask 代码中设置断点,但我似乎没有成功。

就我而言,这是因为我的应用程序处理许多不同的数据帧,并在执行结束时合并它们。

作为此合并的一部分,Dask 需要对齐 divisions/partitions,并通过 DataFrame.repartition() 方法执行此操作。在该方法中,看起来至少创建了两个不同的任务 - repartition-split(进行分区并将其分解为 n 其他任务)和 repartition-merge(加入 m分割成一个)。