在 Dask Dataframe 上使用 set_index() 并写入 parquet 会导致内存爆炸

Using set_index() on a Dask Dataframe and writing to parquet causes memory explosion

我有一大组 Parquet 文件,我正在尝试按列排序。未压缩的数据约为 14Gb,因此 Dask 似乎是完成这项工作的合适工具。我对 Dask 所做的一切是:

  1. 正在读取 parquet 文件
  2. 按其中一列(称为“朋友”)排序
  3. 在单独的目录中作为镶木地板文件写入

如果没有 Dask 进程(只有一个,我正在使用同步调度程序)我无法执行此操作运行 内存不足并被杀死。这让我感到惊讶,因为没有一个分区超过 ~300 MB 未压缩。

我写了一个小脚本来分析 Dask 的数据集,我注意到 Dask 的内存消耗随着输入的大小而变化。这是脚本:

import os
import dask
import dask.dataframe as dd
from dask.diagnostics import ResourceProfiler, ProgressBar

def run(input_path, output_path, input_limit):
    dask.config.set(scheduler="synchronous")

    filenames = os.listdir(input_path)
    full_filenames = [os.path.join(input_path, f) for f in filenames]

    rprof = ResourceProfiler()
    with rprof, ProgressBar():
        df = dd.read_parquet(full_filenames[:input_limit])
        df = df.set_index("friend")
        df.to_parquet(output_path)

    rprof.visualize(file_path=f"profiles/input-limit-{input_limit}.html")

以下是 visualize() 调用生成的图表:

输入限制 = 2

输入限制 = 4

输入限制 = 8

输入限制 = 16

完整的数据集大约有 50 个输入文件,所以以这种增长速度,我对作业耗尽我 32gb 机器上的所有内存并不感到惊讶。

我的理解是,Dask 的全部意义在于允许您对大于内存的数据集进行操作。我的印象是人们正在使用 Dask 处理比我的 ~14gb 大得多的数据集。他们如何通过缩放内存消耗来避免这个问题?我在这里做错了什么?

此时我对使用不同的调度程序或并行性不感兴趣。我只想知道为什么 Dask 消耗的内存比我想象的要多。

事实证明这是 Dask 中的性能回归,已在 2021.03.0 版本中修复。

查看此 Github issue 了解更多信息。