dask dataframe groupby 导致一个分区内存问题
dask dataframe groupby resulting in one partition memory issue
我正在将 64 个压缩的 csv 文件(可能是 70-80 GB)读入一个 dask 数据帧,然后 运行 groupby 聚合。
作业从未完成,因为 groupby 显然创建了一个只有一个分区的数据框。
This post and this post 已经解决了这个问题,但关注的是计算图,而不是你 运行 遇到的内存问题,当你的结果数据帧太大时。
我尝试了重新分区的解决方法,但作业仍然无法完成。
我做错了什么,我必须使用 map_partition 吗?这非常令人困惑,因为我希望 Dask 即使在聚合操作之后也会负责对所有内容进行分区。
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1, memory_limit='8GB',diagnostics_port=5000)
client
dask.config.set(scheduler='processes')
dB3 = dd.read_csv("boden/expansion*.csv", # read in parallel
blocksize=None, # 64 files
sep=',',
compression='gzip'
)
aggs = {
'boden': ['count','min']
}
dBSelect=dB3.groupby(['lng','lat']).agg(aggs).repartition(npartitions=64)
dBSelect=dBSelect.reset_index()
dBSelect.columns=['lng','lat','bodenCount','boden']
dBSelect=dBSelect.drop('bodenCount',axis=1)
with ProgressBar(dt=30): dBSelect.compute().to_parquet('boden/final/boden_final.parq',compression=None)
大多数 groupby 聚合输出都很小,很容易放在一个分区中。显然,您的情况并非如此。
要解决此问题,您应该使用 groupby 聚合的 split_out=
参数来请求一定数量的输出分区。
df.groupby(['x', 'y', 'z']).mean(split_out=10)
请注意,使用 split_out=
会显着增加任务图的大小(它必须稍微提前 shuffle/sort 您的数据),因此可能会增加调度开销。
我正在将 64 个压缩的 csv 文件(可能是 70-80 GB)读入一个 dask 数据帧,然后 运行 groupby 聚合。
作业从未完成,因为 groupby 显然创建了一个只有一个分区的数据框。
This post and this post 已经解决了这个问题,但关注的是计算图,而不是你 运行 遇到的内存问题,当你的结果数据帧太大时。
我尝试了重新分区的解决方法,但作业仍然无法完成。
我做错了什么,我必须使用 map_partition 吗?这非常令人困惑,因为我希望 Dask 即使在聚合操作之后也会负责对所有内容进行分区。
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1, memory_limit='8GB',diagnostics_port=5000)
client
dask.config.set(scheduler='processes')
dB3 = dd.read_csv("boden/expansion*.csv", # read in parallel
blocksize=None, # 64 files
sep=',',
compression='gzip'
)
aggs = {
'boden': ['count','min']
}
dBSelect=dB3.groupby(['lng','lat']).agg(aggs).repartition(npartitions=64)
dBSelect=dBSelect.reset_index()
dBSelect.columns=['lng','lat','bodenCount','boden']
dBSelect=dBSelect.drop('bodenCount',axis=1)
with ProgressBar(dt=30): dBSelect.compute().to_parquet('boden/final/boden_final.parq',compression=None)
大多数 groupby 聚合输出都很小,很容易放在一个分区中。显然,您的情况并非如此。
要解决此问题,您应该使用 groupby 聚合的 split_out=
参数来请求一定数量的输出分区。
df.groupby(['x', 'y', 'z']).mean(split_out=10)
请注意,使用 split_out=
会显着增加任务图的大小(它必须稍微提前 shuffle/sort 您的数据),因此可能会增加调度开销。