如何确定重复(意外)`repartition-split-repartition-merge` 任务的原因?
How do I identify the cause of repeated (unexpected) `repartition-split-repartition-merge` tasks?
在 Dask 通过 ddf.visualize()
输出的任务图中,我看到许多 *-repartition-split-repartition-merge
任务,其中 *
可能是 join
、rename
或一些我从我的应用程序中识别出的其他任务。
我正在尝试确定这些数据的来源,它们是否会对性能产生影响(我经常想象 repartitioning/splitting/merging 数据会产生成本,但不会直接帮助我的计算目标),如果会,如何影响我可以删除它们。
它们似乎在 distributed
提供的 performance_report
中支配计算时间。
查看 Dask 源代码,我可以在 dask.dataframe.core
源代码中看到 DataFrame
方法 repartition
将这些值放入 HighLevelGraph
:
[...]
tmp = "repartition-split-" + token
out = "repartition-merge-" + token
dsk = repartition_divisions(
df.divisions, divisions, df._name, tmp, out, force=force
)
graph = HighLevelGraph.from_collections(out, dsk, dependencies=[df])
return new_dd_object(graph, out, df._meta, divisions)
[...]
鉴于我并没有特别要求 Dask 在我的应用程序中重新分区,我如何才能找出是什么触发了这个?
我试过在这段 Dask 代码中设置断点,但我似乎没有成功。
就我而言,这是因为我的应用程序处理许多不同的数据帧,并在执行结束时合并它们。
作为此合并的一部分,Dask 需要对齐 divisions/partitions,并通过 DataFrame.repartition()
方法执行此操作。在该方法中,看起来至少创建了两个不同的任务 - repartition-split
(进行分区并将其分解为 n
其他任务)和 repartition-merge
(加入 m
分割成一个)。
在 Dask 通过 ddf.visualize()
输出的任务图中,我看到许多 *-repartition-split-repartition-merge
任务,其中 *
可能是 join
、rename
或一些我从我的应用程序中识别出的其他任务。
我正在尝试确定这些数据的来源,它们是否会对性能产生影响(我经常想象 repartitioning/splitting/merging 数据会产生成本,但不会直接帮助我的计算目标),如果会,如何影响我可以删除它们。
它们似乎在 distributed
提供的 performance_report
中支配计算时间。
查看 Dask 源代码,我可以在 dask.dataframe.core
源代码中看到 DataFrame
方法 repartition
将这些值放入 HighLevelGraph
:
[...]
tmp = "repartition-split-" + token
out = "repartition-merge-" + token
dsk = repartition_divisions(
df.divisions, divisions, df._name, tmp, out, force=force
)
graph = HighLevelGraph.from_collections(out, dsk, dependencies=[df])
return new_dd_object(graph, out, df._meta, divisions)
[...]
鉴于我并没有特别要求 Dask 在我的应用程序中重新分区,我如何才能找出是什么触发了这个?
我试过在这段 Dask 代码中设置断点,但我似乎没有成功。
就我而言,这是因为我的应用程序处理许多不同的数据帧,并在执行结束时合并它们。
作为此合并的一部分,Dask 需要对齐 divisions/partitions,并通过 DataFrame.repartition()
方法执行此操作。在该方法中,看起来至少创建了两个不同的任务 - repartition-split
(进行分区并将其分解为 n
其他任务)和 repartition-merge
(加入 m
分割成一个)。