dask dataframe groupby 导致一个分区内存问题

dask dataframe groupby resulting in one partition memory issue

我正在将 64 个压缩的 csv 文件(可能是 70-80 GB)读入一个 dask 数据帧,然后 运行 groupby 聚合。

作业从未完成,因为 groupby 显然创建了一个只有一个分区的数据框。

This post and this post 已经解决了这个问题,但关注的是计算图,而不是你 运行 遇到的内存问题,当你的结果数据帧太大时。

我尝试了重新分区的解决方法,但作业仍然无法完成。

我做错了什么,我必须使用 map_partition 吗?这非常令人困惑,因为我希望 Dask 即使在聚合操作之后也会负责对所有内容进行分区。

    from dask.distributed import Client, progress
    client = Client(n_workers=4, threads_per_worker=1, memory_limit='8GB',diagnostics_port=5000)
    client

    dask.config.set(scheduler='processes')
    dB3 = dd.read_csv("boden/expansion*.csv",  # read in parallel
                 blocksize=None, # 64 files
                 sep=',',
                 compression='gzip'
    )

    aggs = {
      'boden': ['count','min']
    }
    dBSelect=dB3.groupby(['lng','lat']).agg(aggs).repartition(npartitions=64) 
    dBSelect=dBSelect.reset_index()
    dBSelect.columns=['lng','lat','bodenCount','boden']
    dBSelect=dBSelect.drop('bodenCount',axis=1)
    with ProgressBar(dt=30): dBSelect.compute().to_parquet('boden/final/boden_final.parq',compression=None)  

大多数 groupby 聚合输出都很小,很容易放在一个分区中。显然,您的情况并非如此。

要解决此问题,您应该使用 groupby 聚合的 split_out= 参数来请求一定数量的输出分区。

df.groupby(['x', 'y', 'z']).mean(split_out=10)

请注意,使用 split_out= 会显着增加任务图的大小(它必须稍微提前 shuffle/sort 您的数据),因此可能会增加调度开销。